From ec1c2b451757df46eb394af52d33acafc163d01d Mon Sep 17 00:00:00 2001 From: duanhf2012 <6549168@qq.com> Date: Fri, 21 Jul 2023 15:28:52 +0800 Subject: [PATCH] =?UTF-8?q?node=E6=94=AF=E6=8C=81rpc=E5=8E=8B=E7=BC=A9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- cluster/cluster.go | 2 +- cluster/dynamicdiscovery.go | 10 ++++++++++ rpc/compressor.go | 24 ++++++++++++++++++------ rpc/rclient.go | 37 ++++++++++++++++++++++++------------- rpc/server.go | 21 ++++++++++++++------- 5 files changed, 67 insertions(+), 27 deletions(-) diff --git a/cluster/cluster.go b/cluster/cluster.go index 602a779..763e564 100644 --- a/cluster/cluster.go +++ b/cluster/cluster.go @@ -196,7 +196,7 @@ func (cls *Cluster) serviceDiscoverySetNodeInfo(nodeInfo *NodeInfo) { rpcInfo := NodeRpcInfo{} rpcInfo.nodeInfo = *nodeInfo - rpcInfo.client =rpc.NewRClient(nodeInfo.NodeId, nodeInfo.ListenAddr, nodeInfo.MaxRpcParamLen,nodeInfo.CompressBytesLen,cls.triggerRpcEvent) + rpcInfo.client =rpc.NewRClient(nodeInfo.NodeId, nodeInfo.ListenAddr, nodeInfo.MaxRpcParamLen,cls.localNodeInfo.CompressBytesLen,cls.triggerRpcEvent) cls.mapRpc[nodeInfo.NodeId] = rpcInfo } diff --git a/cluster/dynamicdiscovery.go b/cluster/dynamicdiscovery.go index 7db631a..4ecdcf9 100644 --- a/cluster/dynamicdiscovery.go +++ b/cluster/dynamicdiscovery.go @@ -5,6 +5,8 @@ import ( "github.com/duanhf2012/origin/log" "github.com/duanhf2012/origin/rpc" "github.com/duanhf2012/origin/service" + "time" + "github.com/duanhf2012/origin/util/timer" ) const DynamicDiscoveryMasterName = "DiscoveryMaster" @@ -341,6 +343,10 @@ func (dc *DynamicDiscoveryClient) isDiscoverNode(nodeId int) bool { } func (dc *DynamicDiscoveryClient) OnNodeConnected(nodeId int) { + dc.regServiceDiscover(nodeId) +} + +func (dc *DynamicDiscoveryClient) regServiceDiscover(nodeId int){ nodeInfo := cluster.GetMasterDiscoveryNodeInfo(nodeId) if nodeInfo == nil { return @@ -364,6 +370,10 @@ func (dc *DynamicDiscoveryClient) OnNodeConnected(nodeId int) { err := dc.AsyncCallNode(nodeId, RegServiceDiscover, &req, func(res *rpc.Empty, err error) { if err != nil { log.SError("call ", RegServiceDiscover, " is fail :", err.Error()) + dc.AfterFunc(time.Second*3, func(timer *timer.Timer) { + dc.regServiceDiscover(nodeId) + }) + return } }) diff --git a/rpc/compressor.go b/rpc/compressor.go index d1fba28..bf834d8 100644 --- a/rpc/compressor.go +++ b/rpc/compressor.go @@ -8,8 +8,8 @@ import ( ) type ICompressor interface { - CompressBlock(src, dst []byte) (int, error) //dst如果有预申请使用dst内存,传入nil时内部申请 - UncompressBlock(src []byte, dst []byte) (int, error)//dst如果有预申请使用dst内存,传入nil时内部申请 + CompressBlock(src, dst []byte) ([]byte,int, error) //dst如果有预申请使用dst内存,传入nil时内部申请 + UncompressBlock(src []byte, dst []byte) ([]byte,int, error)//dst如果有预申请使用dst内存,传入nil时内部申请 CompressBlockBound(n int) int UnCompressBlockBound(n int) int @@ -28,7 +28,7 @@ func SetCompressor(cp ICompressor){ type Lz4Compressor struct { } -func (lc *Lz4Compressor) CompressBlock(src, dst []byte) (cnt int, err error) { +func (lc *Lz4Compressor) CompressBlock(src, dst []byte) (dest []byte,cnt int, err error) { defer func() { if r := recover(); r != nil { buf := make([]byte, 4096) @@ -38,13 +38,19 @@ func (lc *Lz4Compressor) CompressBlock(src, dst []byte) (cnt int, err error) { } }() + dest = dst var c lz4.Compressor - cnt, err = c.CompressBlock(src, dst) + maxCompressSize := lc.CompressBlockBound(len(src)) + if len(dest) < maxCompressSize { + dest = make([]byte,maxCompressSize) + } + + cnt, err = c.CompressBlock(src, dest) return } -func (lc *Lz4Compressor) UncompressBlock(src, dst []byte) (cnt int, err error) { +func (lc *Lz4Compressor) UncompressBlock(src, dst []byte) (dest []byte,cnt int, err error) { defer func() { if r := recover(); r != nil { buf := make([]byte, 4096) @@ -54,7 +60,13 @@ func (lc *Lz4Compressor) UncompressBlock(src, dst []byte) (cnt int, err error) { } }() - cnt, err = lz4.UncompressBlock(src, dst) + dest = dst + maxUncompressSize := lc.UnCompressBlockBound(len(src)) + if len(dest) < maxUncompressSize { + dest = make([]byte,maxUncompressSize) + } + + cnt, err = lz4.UncompressBlock(src, dest) return } diff --git a/rpc/rclient.go b/rpc/rclient.go index 84f5748..abc6112 100644 --- a/rpc/rclient.go +++ b/rpc/rclient.go @@ -84,24 +84,28 @@ func (rc *RClient) RawGo(rpcHandler IRpcHandler,processor IRpcProcessor, noReply return call } - bCompress := uint8(0x7f) + bCompress := uint8(0) if rc.compressBytesLen > 0 && len(bytes) >= rc.compressBytesLen { - cnt,cErr := compressor.CompressBlock(bytes,rc.compressBuff[:]) + var cnt int + var cErr error + rc.compressBuff,cnt,cErr = compressor.CompressBlock(bytes,rc.compressBuff[:]) if cErr != nil { call.Seq = 0 log.SError(err.Error()) call.DoError(err) return call } - bytes = rc.compressBuff[:cnt] - bCompress = 0xff + if cnt < len(bytes) { + bytes = rc.compressBuff[:cnt] + bCompress = 1<<7 + } } if noReply == false { rc.selfClient.AddPending(call) } - err = conn.WriteMsg([]byte{uint8(processor.GetProcessorType())&bCompress}, bytes) + err = conn.WriteMsg([]byte{uint8(processor.GetProcessorType())|bCompress}, bytes) if err != nil { rc.selfClient.RemovePending(call.Seq) @@ -144,14 +148,20 @@ func (rc *RClient) asyncCall(timeout time.Duration,rpcHandler IRpcHandler, servi return emptyCancelRpc,errors.New("Rpc server is disconnect,call " + serviceMethod) } - bCompress := uint8(0x7f) + bCompress := uint8(0) if rc.compressBytesLen>0 &&len(bytes) >= rc.compressBytesLen { - cnt,cErr := compressor.CompressBlock(bytes,rc.compressBuff[:]) + var cnt int + var cErr error + + rc.compressBuff,cnt,cErr = compressor.CompressBlock(bytes,rc.compressBuff[:]) if cErr != nil { return emptyCancelRpc,cErr } - bytes = rc.compressBuff[:cnt] - bCompress = 0xff + + if cnt < len(bytes) { + bytes = rc.compressBuff[:cnt] + bCompress = 1<<7 + } } call := MakeCall() @@ -163,7 +173,7 @@ func (rc *RClient) asyncCall(timeout time.Duration,rpcHandler IRpcHandler, servi call.TimeOut = timeout rc.selfClient.AddPending(call) - err = conn.WriteMsg([]byte{uint8(processorType)&bCompress}, bytes) + err = conn.WriteMsg([]byte{uint8(processorType)|bCompress}, bytes) if err != nil { rc.selfClient.RemovePending(call.Seq) ReleaseCall(call) @@ -212,7 +222,10 @@ func (rc *RClient) Run() { //解压缩 byteData := bytes[1:] if bCompress == true { - cnt,unCompressErr := compressor.UncompressBlock(byteData,rc.compressBuff) + var cnt int + var unCompressErr error + + rc.compressBuff,cnt,unCompressErr = compressor.UncompressBlock(byteData,rc.compressBuff[:]) if unCompressErr!= nil { rc.conn.ReleaseReadMsg(bytes) log.SError("rpcClient ", rc.Addr, " ReadMsg head error:", err.Error()) @@ -283,8 +296,6 @@ func NewRClient(nodeId int, addr string, maxRpcParamLen uint32,compressBytesLen c.LittleEndian = LittleEndian c.NewAgent = client.NewClientAgent - c.compressBuff = make([]byte, compressor.UnCompressBlockBound(int(maxRpcParamLen))) - if maxRpcParamLen > 0 { c.MaxMsgLen = maxRpcParamLen } else { diff --git a/rpc/server.go b/rpc/server.go index 47be5be..ac30760 100644 --- a/rpc/server.go +++ b/rpc/server.go @@ -117,18 +117,23 @@ func (agent *RpcAgent) WriteResponse(processor IRpcProcessor, serviceMethod stri return } - bCompress := uint8(0x7f) + bCompress := uint8(0) if agent.rpcServer.compressBytesLen >0 && len(bytes) >= agent.rpcServer.compressBytesLen { - cnt,cErr := compressor.CompressBlock(bytes,agent.compressBuff[:]) + var cnt int + var cErr error + + agent.compressBuff,cnt,cErr = compressor.CompressBlock(bytes,agent.compressBuff[:]) if cErr != nil { log.SError("service method ", serviceMethod, " CompressBlock error:", errM.Error()) return } - bytes = agent.compressBuff[:cnt] - bCompress = 0xff + if cnt < len(bytes) { + bytes = agent.compressBuff[:cnt] + bCompress = 1<<7 + } } - errM = agent.conn.WriteMsg([]byte{uint8(processor.GetProcessorType())&bCompress}, bytes) + errM = agent.conn.WriteMsg([]byte{uint8(processor.GetProcessorType())|bCompress}, bytes) if errM != nil { log.SError("Rpc ", serviceMethod, " return is error:", errM.Error()) } @@ -154,7 +159,10 @@ func (agent *RpcAgent) Run() { //解析head byteData := data[1:] if bCompress == true { - cnt,unCompressErr := compressor.UncompressBlock(byteData,agent.compressBuff) + var cnt int + var unCompressErr error + + agent.compressBuff,cnt,unCompressErr = compressor.UncompressBlock(byteData,agent.compressBuff[:]) if unCompressErr!= nil { agent.conn.ReleaseReadMsg(data) log.SError("rpcClient ", agent.conn.RemoteAddr(), " ReadMsg head error:", err.Error()) @@ -261,7 +269,6 @@ func (agent *RpcAgent) Destroy() { func (server *Server) NewAgent(c *network.TCPConn) network.Agent { agent := &RpcAgent{conn: c, rpcServer: server} - agent.compressBuff = make([]byte, compressor.UnCompressBlockBound(int(server.rpcServer.MaxMsgLen))) return agent }