diff --git a/rpc/compressor.go b/rpc/compressor.go index 32eda99..3f28c6e 100644 --- a/rpc/compressor.go +++ b/rpc/compressor.go @@ -11,11 +11,11 @@ import ( var memPool network.INetMempool = network.NewMemAreaPool() type ICompressor interface { - CompressBlock(src []byte) ([]byte, error) //dst如果有预申请使用dst内存,传入nil时内部申请 - UncompressBlock(src []byte) ([]byte, error)//dst如果有预申请使用dst内存,传入nil时内部申请 + CompressBlock(src []byte) ([]byte, error) //dst如果有预申请使用dst内存,传入nil时内部申请 + UncompressBlock(src []byte) ([]byte, error) //dst如果有预申请使用dst内存,传入nil时内部申请 - CompressBufferCollection(buffer []byte) //压缩的Buffer内存回收 - UnCompressBufferCollection(buffer []byte) //解压缩的Buffer内存回收 + CompressBufferCollection(buffer []byte) //压缩的Buffer内存回收 + UnCompressBufferCollection(buffer []byte) //解压缩的Buffer内存回收 } var compressor ICompressor @@ -99,4 +99,4 @@ func (lc *Lz4Compressor) CompressBufferCollection(buffer []byte){ func (lc *Lz4Compressor) UnCompressBufferCollection(buffer []byte) { memPool.ReleaseByteSlice(buffer) -} \ No newline at end of file +} diff --git a/rpc/rclient.go b/rpc/rclient.go index dafbccd..774a9f9 100644 --- a/rpc/rclient.go +++ b/rpc/rclient.go @@ -55,7 +55,6 @@ func (rc *RClient) Go(timeout time.Duration,rpcHandler IRpcHandler,noReply bool, return rc.RawGo(rpcHandler,processor, noReply, 0, serviceMethod, InParam, reply) } - func (rc *RClient) RawGo(rpcHandler IRpcHandler,processor IRpcProcessor, noReply bool, rpcMethodId uint32, serviceMethod string, rawArgs []byte, reply interface{}) *Call { call := MakeCall() call.ServiceMethod = serviceMethod @@ -107,9 +106,6 @@ func (rc *RClient) RawGo(rpcHandler IRpcHandler,processor IRpcProcessor, noReply if cap(compressBuff) >0 { compressor.CompressBufferCollection(compressBuff) } - if cap(compressBuff) >0 { - compressor.CompressBufferCollection(compressBuff) - } if err != nil { rc.selfClient.RemovePending(call.Seq) @@ -228,9 +224,9 @@ func (rc *RClient) Run() { //解压缩 byteData := bytes[1:] var compressBuff []byte + if bCompress == true { var unCompressErr error - compressBuff,unCompressErr = compressor.UncompressBlock(byteData) if unCompressErr!= nil { rc.conn.ReleaseReadMsg(bytes) @@ -244,13 +240,14 @@ func (rc *RClient) Run() { if cap(compressBuff) > 0 { compressor.UnCompressBufferCollection(compressBuff) } + rc.conn.ReleaseReadMsg(bytes) if err != nil { processor.ReleaseRpcResponse(response.RpcResponseData) log.SError("rpcClient Unmarshal head error:", err.Error()) continue } - + v := rc.selfClient.RemovePending(response.RpcResponseData.GetSeq()) if v == nil { log.SError("rpcClient cannot find seq ", response.RpcResponseData.GetSeq(), " in pending") diff --git a/rpc/server.go b/rpc/server.go index 6e68d1f..a4d6202 100644 --- a/rpc/server.go +++ b/rpc/server.go @@ -153,7 +153,7 @@ func (agent *RpcAgent) Run() { processor := GetProcessor(data[0]&0x7f) if processor == nil { agent.conn.ReleaseReadMsg(data) - log.SError("remote rpc ", agent.conn.RemoteAddr(), " cannot find processor:", data[0]) + log.SError("remote rpc ", agent.conn.RemoteAddr().String(), " cannot find processor:", data[0]) return } @@ -166,7 +166,7 @@ func (agent *RpcAgent) Run() { compressBuff,unCompressErr = compressor.UncompressBlock(byteData) if unCompressErr!= nil { agent.conn.ReleaseReadMsg(data) - log.SError("rpcClient ", agent.conn.RemoteAddr(), " ReadMsg head error:", unCompressErr.Error()) + log.SError("rpcClient ", agent.conn.RemoteAddr().String(), " ReadMsg head error:", unCompressErr.Error()) return } byteData = compressBuff