diff --git a/cluster/cluster.go b/cluster/cluster.go
index fcda60f..602a779 100644
--- a/cluster/cluster.go
+++ b/cluster/cluster.go
@@ -26,6 +26,7 @@ type NodeInfo struct {
 	Private bool
 	ListenAddr string
 	MaxRpcParamLen uint32 // maximum length of RPC parameters
+	CompressBytesLen int  // payloads of at least this many bytes are compressed
 	ServiceList []string       // ordered list of all services
 	PublicServiceList []string // services exposed to other nodes
 	DiscoveryService []string  // services to discover; if empty, no filtering is applied
@@ -73,7 +74,7 @@ func SetServiceDiscovery(serviceDiscovery IServiceDiscovery) {
 }
 
 func (cls *Cluster) Start() {
-	cls.rpcServer.Start(cls.localNodeInfo.ListenAddr, cls.localNodeInfo.MaxRpcParamLen)
+	cls.rpcServer.Start(cls.localNodeInfo.ListenAddr, cls.localNodeInfo.MaxRpcParamLen, cls.localNodeInfo.CompressBytesLen)
 }
 
 func (cls *Cluster) Stop() {
@@ -195,7 +196,7 @@ func (cls *Cluster) serviceDiscoverySetNodeInfo(nodeInfo *NodeInfo) {
 
 	rpcInfo := NodeRpcInfo{}
 	rpcInfo.nodeInfo = *nodeInfo
-	rpcInfo.client =rpc.NewRClient(nodeInfo.NodeId, nodeInfo.ListenAddr, nodeInfo.MaxRpcParamLen,cls.triggerRpcEvent)
+	rpcInfo.client = rpc.NewRClient(nodeInfo.NodeId, nodeInfo.ListenAddr, nodeInfo.MaxRpcParamLen, nodeInfo.CompressBytesLen, cls.triggerRpcEvent)
 
 	cls.mapRpc[nodeInfo.NodeId] = rpcInfo
 }
diff --git a/rpc/compressor.go b/rpc/compressor.go
new file mode 100644
index 0000000..d1fba28
--- /dev/null
+++ b/rpc/compressor.go
@@ -0,0 +1,68 @@
+package rpc
+
+import (
+	"errors"
+	"fmt"
+	"runtime"
+
+	"github.com/pierrec/lz4/v4"
+)
+
+type ICompressor interface {
+	CompressBlock(src, dst []byte) (int, error)   // uses dst if preallocated; pass nil to allocate internally
+	UncompressBlock(src, dst []byte) (int, error) // uses dst if preallocated; pass nil to allocate internally
+
+	CompressBlockBound(n int) int
+	UnCompressBlockBound(n int) int
+}
+
+var compressor ICompressor
+
+func init() {
+	SetCompressor(&Lz4Compressor{})
+}
+
+func SetCompressor(cp ICompressor) {
+	compressor = cp
+}
+
+type Lz4Compressor struct{}
+
+func (lc *Lz4Compressor) CompressBlock(src, dst []byte) (cnt int, err error) {
+	// convert a panic inside lz4 into an error carrying the stack trace
+	defer func() {
+		if r := recover(); r != nil {
+			buf := make([]byte, 4096)
+			l := runtime.Stack(buf, false)
+			errString := fmt.Sprint(r)
+			err = errors.New("core dump info[" + errString + "]\n" + string(buf[:l]))
+		}
+	}()
+
+	var c lz4.Compressor
+	cnt, err = c.CompressBlock(src, dst)
+
+	return
+}
+
+func (lc *Lz4Compressor) UncompressBlock(src, dst []byte) (cnt int, err error) {
+	// convert a panic inside lz4 into an error carrying the stack trace
+	defer func() {
+		if r := recover(); r != nil {
+			buf := make([]byte, 4096)
+			l := runtime.Stack(buf, false)
+			errString := fmt.Sprint(r)
+			err = errors.New("core dump info[" + errString + "]\n" + string(buf[:l]))
+		}
+	}()
+
+	cnt, err = lz4.UncompressBlock(src, dst)
+	return
+}
+
+func (lc *Lz4Compressor) CompressBlockBound(n int) int {
+	return lz4.CompressBlockBound(n)
+}
+
+func (lc *Lz4Compressor) UnCompressBlockBound(n int) int {
+	// heuristic upper bound: assume data grows at most 10x when decompressed
+	return n * 10
+}
diff --git a/rpc/rclient.go b/rpc/rclient.go
index 6296724..b0eaa45 100644
--- a/rpc/rclient.go
+++ b/rpc/rclient.go
@@ -13,6 +13,9 @@ import (
 
 // client for a cross-node connection
 type RClient struct {
+	compressBuff []byte
+
+	compressBytesLen int
 	selfClient *Client
 	network.TCPClient
 	conn *network.TCPConn
@@ -80,11 +83,27 @@ func (rc *RClient) RawGo(rpcHandler IRpcHandler, processor IRpcProcessor, noReply bool,
 		return call
 	}
 
+	bCompress := uint8(0)
+	if rc.compressBytesLen > 0 && len(bytes) >= rc.compressBytesLen {
+		cnt, cErr := compressor.CompressBlock(bytes, rc.compressBuff)
+		if cErr != nil {
+			call.Seq = 0
+			log.SError(cErr.Error())
+			call.DoError(cErr)
+			return call
+		}
+		// lz4 reports incompressible input as cnt == 0 with a nil error; send raw in that case
+		if cnt > 0 {
+			bytes = rc.compressBuff[:cnt]
+			bCompress = 0x80
+		}
+	}
+
 	if noReply == false {
 		rc.selfClient.AddPending(call)
 	}
 
-	err = conn.WriteMsg([]byte{uint8(processor.GetProcessorType())}, bytes)
+	err = conn.WriteMsg([]byte{uint8(processor.GetProcessorType()) | bCompress}, bytes)
 	if err != nil {
 		rc.selfClient.RemovePending(call.Seq)
@@ -127,6 +146,19 @@ func (rc *RClient) asyncCall(rpcHandler IRpcHandler, serviceMethod string, callb
 		return errors.New("Rpc server is disconnect,call " + serviceMethod)
 	}
 
+	bCompress := uint8(0)
+	if rc.compressBytesLen > 0 && len(bytes) >= rc.compressBytesLen {
+		cnt, cErr := compressor.CompressBlock(bytes, rc.compressBuff)
+		if cErr != nil {
+			return cErr
+		}
+		// fall back to the raw payload when lz4 reports the input incompressible (cnt == 0)
+		if cnt > 0 {
+			bytes = rc.compressBuff[:cnt]
+			bCompress = 0x80
+		}
+	}
+
 	call := MakeCall()
 	call.Reply = replyParam
 	call.callback = &callback
@@ -135,7 +167,7 @@ func (rc *RClient) asyncCall(rpcHandler IRpcHandler, serviceMethod string, callb
 	call.Seq = seq
 	rc.selfClient.AddPending(call)
 
-	err = conn.WriteMsg([]byte{uint8(processorType)}, bytes)
+	err = conn.WriteMsg([]byte{uint8(processorType) | bCompress}, bytes)
 	if err != nil {
 		rc.selfClient.RemovePending(call.Seq)
 		ReleaseCall(call)
@@ -163,7 +195,8 @@ func (rc *RClient) Run() {
 			return
 		}
 
-		processor := GetProcessor(bytes[0])
+		bCompress := (bytes[0] >> 7) > 0
+		processor := GetProcessor(bytes[0] & 0x7f)
 		if processor == nil {
 			rc.conn.ReleaseReadMsg(bytes)
 			log.SError("rpcClient ", rc.Addr, " ReadMsg head error:", err.Error())
@@ -174,7 +207,19 @@ func (rc *RClient) Run() {
 
 		response := RpcResponse{}
 		response.RpcResponseData = processor.MakeRpcResponse(0, "", nil)
-		err = processor.Unmarshal(bytes[1:], response.RpcResponseData)
+		// decompress the payload when the header's high bit is set
+		byteData := bytes[1:]
+		if bCompress {
+			cnt, unCompressErr := compressor.UncompressBlock(byteData, rc.compressBuff)
+			if unCompressErr != nil {
+				rc.conn.ReleaseReadMsg(bytes)
+				log.SError("rpcClient ", rc.Addr, " UncompressBlock error:", unCompressErr.Error())
+				return
+			}
+			byteData = rc.compressBuff[:cnt]
+		}
+
+		err = processor.Unmarshal(byteData, response.RpcResponseData)
 		rc.conn.ReleaseReadMsg(bytes)
 		if err != nil {
 			processor.ReleaseRpcResponse(response.RpcResponseData)
@@ -214,14 +259,15 @@ func (rc *RClient) OnClose() {
 	rc.TriggerRpcConnEvent(false, rc.selfClient.GetClientId(), rc.selfClient.GetNodeId())
 }
 
-func NewRClient(nodeId int, addr string, maxRpcParamLen uint32,triggerRpcConnEvent TriggerRpcConnEvent) *Client{
+func NewRClient(nodeId int, addr string, maxRpcParamLen uint32, compressBytesLen int, triggerRpcConnEvent TriggerRpcConnEvent) *Client {
 	client := &Client{}
 	client.clientId = atomic.AddUint32(&clientSeq, 1)
 	client.nodeId = nodeId
 	client.maxCheckCallRpcCount = DefaultMaxCheckCallRpcCount
 	client.callRpcTimeout = DefaultRpcTimeout
 
 	c:= &RClient{}
+	c.compressBytesLen = compressBytesLen
 	c.selfClient = client
 	c.Addr = addr
 	c.ConnectInterval = DefaultConnectInterval
@@ -236,6 +282,8 @@ func NewRClient(nodeId int, addr string, maxRpcParamLen uint32,triggerRpcConnEve
 	c.LittleEndian = LittleEndian
 	c.NewAgent = client.NewClientAgent
 
+	c.compressBuff = make([]byte, compressor.UnCompressBlockBound(int(maxRpcParamLen)))
+
 	if maxRpcParamLen > 0 {
 		c.MaxMsgLen = maxRpcParamLen
 	} else {
diff --git a/rpc/server.go b/rpc/server.go
index 7da1844..6487e98 100644
--- a/rpc/server.go
+++ b/rpc/server.go
@@ -27,12 +27,16 @@ type Server struct {
 	functions map[interface{}]interface{}
 	rpcHandleFinder RpcHandleFinder
 	rpcServer *network.TCPServer
+
+	compressBytesLen int
 }
 
 type RpcAgent struct {
 	conn network.Conn
 	rpcServer *Server
 	userData interface{}
+
+	compressBuff []byte
 }
 
 func AppendProcessor(rpcProcessor IRpcProcessor) {
@@ -64,7 +68,7 @@ func (server *Server) Init(rpcHandleFinder RpcHandleFinder) {
 
 const Default_ReadWriteDeadline = 15*time.Second
 
-func (server *Server) Start(listenAddr string, maxRpcParamLen uint32) {
+func (server *Server) Start(listenAddr string, maxRpcParamLen uint32, compressBytesLen int) {
 	splitAddr := strings.Split(listenAddr, ":")
 	if len(splitAddr) != 2 {
 		log.SFatal("listen addr is error :", listenAddr)
@@ -72,6 +76,7 @@ func (server *Server) Start(listenAddr string, maxRpcParamLen uint32, compressBy
 
 	server.rpcServer.Addr = ":" + splitAddr[1]
 	server.rpcServer.MinMsgLen = 2
+	server.compressBytesLen = compressBytesLen
 	if maxRpcParamLen > 0 {
 		server.rpcServer.MaxMsgLen = maxRpcParamLen
 	} else {
@@ -112,7 +117,21 @@ func (agent *RpcAgent) WriteResponse(processor IRpcProcessor, serviceMethod stri
 		return
 	}
 
-	errM = agent.conn.WriteMsg([]byte{uint8(processor.GetProcessorType())}, bytes)
+	bCompress := uint8(0)
+	if agent.rpcServer.compressBytesLen > 0 && len(bytes) >= agent.rpcServer.compressBytesLen {
+		cnt, cErr := compressor.CompressBlock(bytes, agent.compressBuff)
+		if cErr != nil {
+			log.SError("service method ", serviceMethod, " CompressBlock error:", cErr.Error())
+			return
+		}
+		// cnt == 0 means lz4 found the data incompressible; send it uncompressed
+		if cnt > 0 {
+			bytes = agent.compressBuff[:cnt]
+			bCompress = 0x80
+		}
+	}
+
+	errM = agent.conn.WriteMsg([]byte{uint8(processor.GetProcessorType()) | bCompress}, bytes)
 	if errM != nil {
 		log.SError("Rpc ", serviceMethod, " return is error:", errM.Error())
 	}
@@ -127,7 +146,8 @@ func (agent *RpcAgent) Run() {
 			break
 		}
 
-		processor := GetProcessor(data[0])
+		bCompress := (data[0] >> 7) > 0
+		processor := GetProcessor(data[0] & 0x7f)
 		if processor == nil {
 			agent.conn.ReleaseReadMsg(data)
 			log.SError("remote rpc ", agent.conn.RemoteAddr(), " cannot find processor:", data[0])
@@ -135,8 +155,19 @@ func (agent *RpcAgent) Run() {
 		}
 
 		// parse the request head
+		byteData := data[1:]
+		if bCompress {
+			cnt, unCompressErr := compressor.UncompressBlock(byteData, agent.compressBuff)
+			if unCompressErr != nil {
+				agent.conn.ReleaseReadMsg(data)
+				log.SError("remote rpc ", agent.conn.RemoteAddr(), " UncompressBlock error:", unCompressErr.Error())
+				return
+			}
+			byteData = agent.compressBuff[:cnt]
+		}
+
 		req := MakeRpcRequest(processor, 0, 0, "", false, nil)
-		err = processor.Unmarshal(data[1:], req.RpcRequestData)
+		err = processor.Unmarshal(byteData, req.RpcRequestData)
 		agent.conn.ReleaseReadMsg(data)
 		if err != nil {
 			log.SError("rpc Unmarshal request is error:", err.Error())
@@ -233,6 +264,7 @@ func (agent *RpcAgent) Destroy() {
 
 func (server *Server) NewAgent(c *network.TCPConn) network.Agent {
 	agent := &RpcAgent{conn: c, rpcServer: server}
+	agent.compressBuff = make([]byte, compressor.UnCompressBlockBound(int(server.rpcServer.MaxMsgLen)))
 
 	return agent
 }
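
Note on the header layout: the patch packs a compression flag into the existing one-byte processor-type header, with the low 7 bits carrying the processor type and the high bit marking an lz4-compressed payload. The flag has to be ORed into the type byte on the write path; ANDing a mask can only clear bits, so it can never set the compression bit. A minimal standalone sketch of the convention (the pack/unpack helpers below are illustrative, not part of the patch):

package main

import "fmt"

// compressedFlag mirrors the header convention above: low 7 bits carry the
// processor type, the high bit marks an lz4-compressed payload.
const compressedFlag uint8 = 0x80

func packHeader(processorType uint8, compressed bool) uint8 {
	h := processorType & 0x7f // processor types must stay below 128
	if compressed {
		h |= compressedFlag
	}
	return h
}

func unpackHeader(h uint8) (processorType uint8, compressed bool) {
	return h & 0x7f, h>>7 > 0
}

func main() {
	h := packHeader(2, true)
	pt, ok := unpackHeader(h)
	fmt.Printf("header=0x%02x type=%d compressed=%v\n", h, pt, ok) // header=0x82 type=2 compressed=true
}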
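
Note on the cnt > 0 fallbacks: pierrec/lz4 v4 block compression returns (0, nil) when the input is incompressible, so a sender must detect that case and transmit the raw bytes with the flag cleared. A self-contained round-trip sketch, assuming only the lz4 module (buffer sizing here is illustrative; the patch sizes its receive buffer via UnCompressBlockBound instead):

package main

import (
	"bytes"
	"fmt"

	"github.com/pierrec/lz4/v4"
)

func main() {
	src := bytes.Repeat([]byte("rpc payload "), 64) // highly compressible input
	dst := make([]byte, lz4.CompressBlockBound(len(src)))

	var c lz4.Compressor
	n, err := c.CompressBlock(src, dst)
	if err != nil {
		panic(err)
	}
	if n == 0 {
		// lz4 convention: (0, nil) means the input is incompressible;
		// a sender must fall back to the raw bytes with the flag cleared.
		fmt.Println("incompressible, send raw")
		return
	}

	// the receiver must size this buffer from configuration; here the
	// original length happens to be known
	out := make([]byte, len(src))
	m, err := lz4.UncompressBlock(dst[:n], out)
	if err != nil {
		panic(err)
	}
	fmt.Printf("compressed %d -> %d bytes, round-trip ok: %v\n", len(src), n, bytes.Equal(src, out[:m]))
}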