rpc加入压缩功能

This commit is contained in:
duanhf2012
2023-07-10 14:22:23 +08:00
parent 962016d476
commit 85a8ec58e5
4 changed files with 154 additions and 12 deletions

View File

@@ -26,6 +26,7 @@ type NodeInfo struct {
Private bool Private bool
ListenAddr string ListenAddr string
MaxRpcParamLen uint32 //最大Rpc参数长度 MaxRpcParamLen uint32 //最大Rpc参数长度
CompressBytesLen int //超过字节进行压缩的长度
ServiceList []string //所有的有序服务列表 ServiceList []string //所有的有序服务列表
PublicServiceList []string //对外公开的服务列表 PublicServiceList []string //对外公开的服务列表
DiscoveryService []string //筛选发现的服务,如果不配置,不进行筛选 DiscoveryService []string //筛选发现的服务,如果不配置,不进行筛选
@@ -73,7 +74,7 @@ func SetServiceDiscovery(serviceDiscovery IServiceDiscovery) {
} }
func (cls *Cluster) Start() { func (cls *Cluster) Start() {
cls.rpcServer.Start(cls.localNodeInfo.ListenAddr, cls.localNodeInfo.MaxRpcParamLen) cls.rpcServer.Start(cls.localNodeInfo.ListenAddr, cls.localNodeInfo.MaxRpcParamLen,cls.localNodeInfo.CompressBytesLen)
} }
func (cls *Cluster) Stop() { func (cls *Cluster) Stop() {
@@ -195,7 +196,7 @@ func (cls *Cluster) serviceDiscoverySetNodeInfo(nodeInfo *NodeInfo) {
rpcInfo := NodeRpcInfo{} rpcInfo := NodeRpcInfo{}
rpcInfo.nodeInfo = *nodeInfo rpcInfo.nodeInfo = *nodeInfo
rpcInfo.client =rpc.NewRClient(nodeInfo.NodeId, nodeInfo.ListenAddr, nodeInfo.MaxRpcParamLen,cls.triggerRpcEvent) rpcInfo.client =rpc.NewRClient(nodeInfo.NodeId, nodeInfo.ListenAddr, nodeInfo.MaxRpcParamLen,nodeInfo.CompressBytesLen,cls.triggerRpcEvent)
cls.mapRpc[nodeInfo.NodeId] = rpcInfo cls.mapRpc[nodeInfo.NodeId] = rpcInfo
} }

69
rpc/compressor.go Normal file
View File

@@ -0,0 +1,69 @@
package rpc
import (
"runtime"
"errors"
"github.com/pierrec/lz4/v4"
"fmt"
)
type ICompressor interface {
CompressBlock(src, dst []byte) (int, error) //dst如果有预申请使用dst内存传入nil时内部申请
UncompressBlock(src []byte, dst []byte) (int, error)//dst如果有预申请使用dst内存传入nil时内部申请
CompressBlockBound(n int) int
UnCompressBlockBound(n int) int
}
var compressor ICompressor
func init(){
SetCompressor(&Lz4Compressor{})
}
func SetCompressor(cp ICompressor){
compressor = cp
}
type Lz4Compressor struct {
}
func (lc *Lz4Compressor) CompressBlock(src, dst []byte) (cnt int, err error) {
defer func() {
if r := recover(); r != nil {
buf := make([]byte, 4096)
l := runtime.Stack(buf, false)
errString := fmt.Sprint(r)
err = errors.New("core dump info[" + errString + "]\n" + string(buf[:l]))
}
}()
var c lz4.Compressor
cnt, err = c.CompressBlock(src, dst)
return
}
func (lc *Lz4Compressor) UncompressBlock(src, dst []byte) (cnt int, err error) {
defer func() {
if r := recover(); r != nil {
buf := make([]byte, 4096)
l := runtime.Stack(buf, false)
errString := fmt.Sprint(r)
err = errors.New("core dump info[" + errString + "]\n" + string(buf[:l]))
}
}()
cnt, err = lz4.UncompressBlock(src, dst)
return
}
func (lc *Lz4Compressor) CompressBlockBound(n int) int{
return lz4.CompressBlockBound(n)
}
func (lc *Lz4Compressor) UnCompressBlockBound(n int) int{
return n*10
}

View File

@@ -13,6 +13,9 @@ import (
//跨结点连接的Client //跨结点连接的Client
type RClient struct { type RClient struct {
compressBuff []byte
compressBytesLen int
selfClient *Client selfClient *Client
network.TCPClient network.TCPClient
conn *network.TCPConn conn *network.TCPConn
@@ -80,11 +83,24 @@ func (rc *RClient) RawGo(rpcHandler IRpcHandler,processor IRpcProcessor, noReply
return call return call
} }
bCompress := uint8(0x7f)
if rc.compressBytesLen > 0 && len(bytes) >= rc.compressBytesLen {
cnt,cErr := compressor.CompressBlock(bytes,rc.compressBuff[:])
if cErr != nil {
call.Seq = 0
log.SError(err.Error())
call.DoError(err)
return call
}
bytes = rc.compressBuff[:cnt]
bCompress = 0xff
}
if noReply == false { if noReply == false {
rc.selfClient.AddPending(call) rc.selfClient.AddPending(call)
} }
err = conn.WriteMsg([]byte{uint8(processor.GetProcessorType())}, bytes) err = conn.WriteMsg([]byte{uint8(processor.GetProcessorType())&bCompress}, bytes)
if err != nil { if err != nil {
rc.selfClient.RemovePending(call.Seq) rc.selfClient.RemovePending(call.Seq)
@@ -127,6 +143,16 @@ func (rc *RClient) asyncCall(rpcHandler IRpcHandler, serviceMethod string, callb
return errors.New("Rpc server is disconnect,call " + serviceMethod) return errors.New("Rpc server is disconnect,call " + serviceMethod)
} }
bCompress := uint8(0x7f)
if rc.compressBytesLen>0 &&len(bytes) >= rc.compressBytesLen {
cnt,cErr := compressor.CompressBlock(bytes,rc.compressBuff[:])
if cErr != nil {
return cErr
}
bytes = rc.compressBuff[:cnt]
bCompress = 0xff
}
call := MakeCall() call := MakeCall()
call.Reply = replyParam call.Reply = replyParam
call.callback = &callback call.callback = &callback
@@ -135,7 +161,7 @@ func (rc *RClient) asyncCall(rpcHandler IRpcHandler, serviceMethod string, callb
call.Seq = seq call.Seq = seq
rc.selfClient.AddPending(call) rc.selfClient.AddPending(call)
err = conn.WriteMsg([]byte{uint8(processorType)}, bytes) err = conn.WriteMsg([]byte{uint8(processorType)&bCompress}, bytes)
if err != nil { if err != nil {
rc.selfClient.RemovePending(call.Seq) rc.selfClient.RemovePending(call.Seq)
ReleaseCall(call) ReleaseCall(call)
@@ -145,6 +171,8 @@ func (rc *RClient) asyncCall(rpcHandler IRpcHandler, serviceMethod string, callb
return nil return nil
} }
func (rc *RClient) Run() { func (rc *RClient) Run() {
defer func() { defer func() {
if r := recover(); r != nil { if r := recover(); r != nil {
@@ -163,7 +191,8 @@ func (rc *RClient) Run() {
return return
} }
processor := GetProcessor(bytes[0]) bCompress := (bytes[0]>>7) > 0
processor := GetProcessor(bytes[0]&0x7f)
if processor == nil { if processor == nil {
rc.conn.ReleaseReadMsg(bytes) rc.conn.ReleaseReadMsg(bytes)
log.SError("rpcClient ", rc.Addr, " ReadMsg head error:", err.Error()) log.SError("rpcClient ", rc.Addr, " ReadMsg head error:", err.Error())
@@ -174,7 +203,19 @@ func (rc *RClient) Run() {
response := RpcResponse{} response := RpcResponse{}
response.RpcResponseData = processor.MakeRpcResponse(0, "", nil) response.RpcResponseData = processor.MakeRpcResponse(0, "", nil)
err = processor.Unmarshal(bytes[1:], response.RpcResponseData) //解压缩
byteData := bytes[1:]
if bCompress == true {
cnt,unCompressErr := compressor.UncompressBlock(byteData,rc.compressBuff)
if unCompressErr!= nil {
rc.conn.ReleaseReadMsg(bytes)
log.SError("rpcClient ", rc.Addr, " ReadMsg head error:", err.Error())
return
}
byteData = rc.compressBuff[:cnt]
}
err = processor.Unmarshal(byteData, response.RpcResponseData)
rc.conn.ReleaseReadMsg(bytes) rc.conn.ReleaseReadMsg(bytes)
if err != nil { if err != nil {
processor.ReleaseRpcResponse(response.RpcResponseData) processor.ReleaseRpcResponse(response.RpcResponseData)
@@ -214,14 +255,14 @@ func (rc *RClient) OnClose() {
rc.TriggerRpcConnEvent(false, rc.selfClient.GetClientId(), rc.selfClient.GetNodeId()) rc.TriggerRpcConnEvent(false, rc.selfClient.GetClientId(), rc.selfClient.GetNodeId())
} }
func NewRClient(nodeId int, addr string, maxRpcParamLen uint32,triggerRpcConnEvent TriggerRpcConnEvent) *Client{ func NewRClient(nodeId int, addr string, maxRpcParamLen uint32,compressBytesLen int,triggerRpcConnEvent TriggerRpcConnEvent) *Client{
client := &Client{} client := &Client{}
client.clientId = atomic.AddUint32(&clientSeq, 1) client.clientId = atomic.AddUint32(&clientSeq, 1)
client.nodeId = nodeId client.nodeId = nodeId
client.maxCheckCallRpcCount = DefaultMaxCheckCallRpcCount client.maxCheckCallRpcCount = DefaultMaxCheckCallRpcCount
client.callRpcTimeout = DefaultRpcTimeout client.callRpcTimeout = DefaultRpcTimeout
c:= &RClient{} c:= &RClient{}
c.compressBytesLen = compressBytesLen
c.selfClient = client c.selfClient = client
c.Addr = addr c.Addr = addr
c.ConnectInterval = DefaultConnectInterval c.ConnectInterval = DefaultConnectInterval
@@ -236,6 +277,8 @@ func NewRClient(nodeId int, addr string, maxRpcParamLen uint32,triggerRpcConnEve
c.LittleEndian = LittleEndian c.LittleEndian = LittleEndian
c.NewAgent = client.NewClientAgent c.NewAgent = client.NewClientAgent
c.compressBuff = make([]byte, compressor.UnCompressBlockBound(int(maxRpcParamLen)))
if maxRpcParamLen > 0 { if maxRpcParamLen > 0 {
c.MaxMsgLen = maxRpcParamLen c.MaxMsgLen = maxRpcParamLen
} else { } else {

View File

@@ -27,12 +27,16 @@ type Server struct {
functions map[interface{}]interface{} functions map[interface{}]interface{}
rpcHandleFinder RpcHandleFinder rpcHandleFinder RpcHandleFinder
rpcServer *network.TCPServer rpcServer *network.TCPServer
compressBytesLen int
} }
type RpcAgent struct { type RpcAgent struct {
conn network.Conn conn network.Conn
rpcServer *Server rpcServer *Server
userData interface{} userData interface{}
compressBuff []byte
} }
func AppendProcessor(rpcProcessor IRpcProcessor) { func AppendProcessor(rpcProcessor IRpcProcessor) {
@@ -64,7 +68,7 @@ func (server *Server) Init(rpcHandleFinder RpcHandleFinder) {
const Default_ReadWriteDeadline = 15*time.Second const Default_ReadWriteDeadline = 15*time.Second
func (server *Server) Start(listenAddr string, maxRpcParamLen uint32) { func (server *Server) Start(listenAddr string, maxRpcParamLen uint32,compressBytesLen int) {
splitAddr := strings.Split(listenAddr, ":") splitAddr := strings.Split(listenAddr, ":")
if len(splitAddr) != 2 { if len(splitAddr) != 2 {
log.SFatal("listen addr is error :", listenAddr) log.SFatal("listen addr is error :", listenAddr)
@@ -72,6 +76,7 @@ func (server *Server) Start(listenAddr string, maxRpcParamLen uint32) {
server.rpcServer.Addr = ":" + splitAddr[1] server.rpcServer.Addr = ":" + splitAddr[1]
server.rpcServer.MinMsgLen = 2 server.rpcServer.MinMsgLen = 2
server.compressBytesLen = compressBytesLen
if maxRpcParamLen > 0 { if maxRpcParamLen > 0 {
server.rpcServer.MaxMsgLen = maxRpcParamLen server.rpcServer.MaxMsgLen = maxRpcParamLen
} else { } else {
@@ -112,7 +117,18 @@ func (agent *RpcAgent) WriteResponse(processor IRpcProcessor, serviceMethod stri
return return
} }
errM = agent.conn.WriteMsg([]byte{uint8(processor.GetProcessorType())}, bytes) bCompress := uint8(0x7f)
if agent.rpcServer.compressBytesLen >0 && len(bytes) >= agent.rpcServer.compressBytesLen {
cnt,cErr := compressor.CompressBlock(bytes,agent.compressBuff[:])
if cErr != nil {
log.SError("service method ", serviceMethod, " CompressBlock error:", errM.Error())
return
}
bytes = agent.compressBuff[:cnt]
bCompress = 0xff
}
errM = agent.conn.WriteMsg([]byte{uint8(processor.GetProcessorType())&bCompress}, bytes)
if errM != nil { if errM != nil {
log.SError("Rpc ", serviceMethod, " return is error:", errM.Error()) log.SError("Rpc ", serviceMethod, " return is error:", errM.Error())
} }
@@ -127,7 +143,8 @@ func (agent *RpcAgent) Run() {
break break
} }
processor := GetProcessor(data[0]) bCompress := (data[0]>>7) > 0
processor := GetProcessor(data[0]&0x7f)
if processor == nil { if processor == nil {
agent.conn.ReleaseReadMsg(data) agent.conn.ReleaseReadMsg(data)
log.SError("remote rpc ", agent.conn.RemoteAddr(), " cannot find processor:", data[0]) log.SError("remote rpc ", agent.conn.RemoteAddr(), " cannot find processor:", data[0])
@@ -135,8 +152,19 @@ func (agent *RpcAgent) Run() {
} }
//解析head //解析head
byteData := data[1:]
if bCompress == true {
cnt,unCompressErr := compressor.UncompressBlock(byteData,agent.compressBuff)
if unCompressErr!= nil {
agent.conn.ReleaseReadMsg(data)
log.SError("rpcClient ", agent.conn.RemoteAddr(), " ReadMsg head error:", err.Error())
return
}
byteData = agent.compressBuff[:cnt]
}
req := MakeRpcRequest(processor, 0, 0, "", false, nil) req := MakeRpcRequest(processor, 0, 0, "", false, nil)
err = processor.Unmarshal(data[1:], req.RpcRequestData) err = processor.Unmarshal(byteData, req.RpcRequestData)
agent.conn.ReleaseReadMsg(data) agent.conn.ReleaseReadMsg(data)
if err != nil { if err != nil {
log.SError("rpc Unmarshal request is error:", err.Error()) log.SError("rpc Unmarshal request is error:", err.Error())
@@ -233,6 +261,7 @@ func (agent *RpcAgent) Destroy() {
func (server *Server) NewAgent(c *network.TCPConn) network.Agent { func (server *Server) NewAgent(c *network.TCPConn) network.Agent {
agent := &RpcAgent{conn: c, rpcServer: server} agent := &RpcAgent{conn: c, rpcServer: server}
agent.compressBuff = make([]byte, compressor.UnCompressBlockBound(int(server.rpcServer.MaxMsgLen)))
return agent return agent
} }