优化新增Rpc压缩与自定义超时功能

This commit is contained in:
duanhf2012
2023-07-25 17:36:47 +08:00
parent 1890b300ee
commit 6ef98a2104
3 changed files with 10 additions and 13 deletions

View File

@@ -11,11 +11,11 @@ import (
var memPool network.INetMempool = network.NewMemAreaPool()
type ICompressor interface {
CompressBlock(src []byte) ([]byte, error) //dst如果有预申请使用dst内存传入nil时内部申请
UncompressBlock(src []byte) ([]byte, error)//dst如果有预申请使用dst内存传入nil时内部申请
CompressBlock(src []byte) ([]byte, error) //dst如果有预申请使用dst内存传入nil时内部申请
UncompressBlock(src []byte) ([]byte, error) //dst如果有预申请使用dst内存传入nil时内部申请
CompressBufferCollection(buffer []byte) //压缩的Buffer内存回收
UnCompressBufferCollection(buffer []byte) //解压缩的Buffer内存回收
CompressBufferCollection(buffer []byte) //压缩的Buffer内存回收
UnCompressBufferCollection(buffer []byte) //解压缩的Buffer内存回收
}
var compressor ICompressor
@@ -99,4 +99,4 @@ func (lc *Lz4Compressor) CompressBufferCollection(buffer []byte){
func (lc *Lz4Compressor) UnCompressBufferCollection(buffer []byte) {
memPool.ReleaseByteSlice(buffer)
}
}

View File

@@ -55,7 +55,6 @@ func (rc *RClient) Go(timeout time.Duration,rpcHandler IRpcHandler,noReply bool,
return rc.RawGo(rpcHandler,processor, noReply, 0, serviceMethod, InParam, reply)
}
func (rc *RClient) RawGo(rpcHandler IRpcHandler,processor IRpcProcessor, noReply bool, rpcMethodId uint32, serviceMethod string, rawArgs []byte, reply interface{}) *Call {
call := MakeCall()
call.ServiceMethod = serviceMethod
@@ -107,9 +106,6 @@ func (rc *RClient) RawGo(rpcHandler IRpcHandler,processor IRpcProcessor, noReply
if cap(compressBuff) >0 {
compressor.CompressBufferCollection(compressBuff)
}
if cap(compressBuff) >0 {
compressor.CompressBufferCollection(compressBuff)
}
if err != nil {
rc.selfClient.RemovePending(call.Seq)
@@ -228,9 +224,9 @@ func (rc *RClient) Run() {
//解压缩
byteData := bytes[1:]
var compressBuff []byte
if bCompress == true {
var unCompressErr error
compressBuff,unCompressErr = compressor.UncompressBlock(byteData)
if unCompressErr!= nil {
rc.conn.ReleaseReadMsg(bytes)
@@ -244,13 +240,14 @@ func (rc *RClient) Run() {
if cap(compressBuff) > 0 {
compressor.UnCompressBufferCollection(compressBuff)
}
rc.conn.ReleaseReadMsg(bytes)
if err != nil {
processor.ReleaseRpcResponse(response.RpcResponseData)
log.SError("rpcClient Unmarshal head error:", err.Error())
continue
}
v := rc.selfClient.RemovePending(response.RpcResponseData.GetSeq())
if v == nil {
log.SError("rpcClient cannot find seq ", response.RpcResponseData.GetSeq(), " in pending")

View File

@@ -153,7 +153,7 @@ func (agent *RpcAgent) Run() {
processor := GetProcessor(data[0]&0x7f)
if processor == nil {
agent.conn.ReleaseReadMsg(data)
log.SError("remote rpc ", agent.conn.RemoteAddr(), " cannot find processor:", data[0])
log.SError("remote rpc ", agent.conn.RemoteAddr().String(), " cannot find processor:", data[0])
return
}
@@ -166,7 +166,7 @@ func (agent *RpcAgent) Run() {
compressBuff,unCompressErr = compressor.UncompressBlock(byteData)
if unCompressErr!= nil {
agent.conn.ReleaseReadMsg(data)
log.SError("rpcClient ", agent.conn.RemoteAddr(), " ReadMsg head error:", unCompressErr.Error())
log.SError("rpcClient ", agent.conn.RemoteAddr().String(), " ReadMsg head error:", unCompressErr.Error())
return
}
byteData = compressBuff