From 1890b300ee7d241bdbf8d1b435b670950193d72c Mon Sep 17 00:00:00 2001 From: duanhf2012 <6549168@qq.com> Date: Tue, 25 Jul 2023 15:13:58 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=E6=96=B0=E5=A2=9Erpc?= =?UTF-8?q?=E5=8E=8B=E7=BC=A9=E5=8A=9F=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- network/slicepool.go | 2 +- rpc/compressor.go | 67 +++++++++++++++++++++++++++++--------------- rpc/rclient.go | 43 +++++++++++++++++----------- rpc/server.go | 26 +++++++++-------- 4 files changed, 86 insertions(+), 52 deletions(-) diff --git a/network/slicepool.go b/network/slicepool.go index 13e22fe..873f4df 100644 --- a/network/slicepool.go +++ b/network/slicepool.go @@ -16,7 +16,7 @@ type memAreaPool struct { pool []sync.Pool } -var memAreaPoolList = [3]*memAreaPool{&memAreaPool{minAreaValue: 1, maxAreaValue: 4096, growthValue: 512}, &memAreaPool{minAreaValue: 4097, maxAreaValue: 40960, growthValue: 4096}, &memAreaPool{minAreaValue: 40961, maxAreaValue: 417792, growthValue: 16384}} +var memAreaPoolList = [4]*memAreaPool{&memAreaPool{minAreaValue: 1, maxAreaValue: 4096, growthValue: 512}, &memAreaPool{minAreaValue: 4097, maxAreaValue: 40960, growthValue: 4096}, &memAreaPool{minAreaValue: 40961, maxAreaValue: 417792, growthValue: 16384}, &memAreaPool{minAreaValue: 417793, maxAreaValue: 1925120, growthValue: 65536}} func init() { for i := 0; i < len(memAreaPoolList); i++ { diff --git a/rpc/compressor.go b/rpc/compressor.go index bf834d8..32eda99 100644 --- a/rpc/compressor.go +++ b/rpc/compressor.go @@ -5,18 +5,20 @@ import ( "errors" "github.com/pierrec/lz4/v4" "fmt" + "github.com/duanhf2012/origin/network" ) -type ICompressor interface { - CompressBlock(src, dst []byte) ([]byte,int, error) //dst如果有预申请使用dst内存,传入nil时内部申请 - UncompressBlock(src []byte, dst []byte) ([]byte,int, error)//dst如果有预申请使用dst内存,传入nil时内部申请 +var memPool network.INetMempool = network.NewMemAreaPool() - CompressBlockBound(n int) int - UnCompressBlockBound(n int) int +type ICompressor interface { + CompressBlock(src []byte) ([]byte, error) //dst如果有预申请使用dst内存,传入nil时内部申请 + UncompressBlock(src []byte) ([]byte, error)//dst如果有预申请使用dst内存,传入nil时内部申请 + + CompressBufferCollection(buffer []byte) //压缩的Buffer内存回收 + UnCompressBufferCollection(buffer []byte) //解压缩的Buffer内存回收 } var compressor ICompressor - func init(){ SetCompressor(&Lz4Compressor{}) } @@ -28,7 +30,7 @@ func SetCompressor(cp ICompressor){ type Lz4Compressor struct { } -func (lc *Lz4Compressor) CompressBlock(src, dst []byte) (dest []byte,cnt int, err error) { +func (lc *Lz4Compressor) CompressBlock(src []byte) (dest []byte, err error) { defer func() { if r := recover(); r != nil { buf := make([]byte, 4096) @@ -38,19 +40,31 @@ func (lc *Lz4Compressor) CompressBlock(src, dst []byte) (dest []byte,cnt int, er } }() - dest = dst var c lz4.Compressor - maxCompressSize := lc.CompressBlockBound(len(src)) - if len(dest) < maxCompressSize { - dest = make([]byte,maxCompressSize) + var cnt int + dest = memPool.MakeByteSlice(lz4.CompressBlockBound(len(src))+1) + cnt, err = c.CompressBlock(src, dest[1:]) + if err != nil { + memPool.ReleaseByteSlice(dest) + return nil,err } - cnt, err = c.CompressBlock(src, dest) + ratio := len(src) / cnt + if len(src) % cnt > 0 { + ratio += 1 + } + if ratio > 255 { + memPool.ReleaseByteSlice(dest) + return nil,fmt.Errorf("Impermissible errors") + } + + dest[0] = uint8(ratio) + dest = dest[:cnt+1] return } -func (lc *Lz4Compressor) UncompressBlock(src, dst []byte) (dest []byte,cnt int, err error) { +func (lc *Lz4Compressor) UncompressBlock(src []byte) (dest []byte, err error) { defer func() { if r := recover(); r != nil { buf := make([]byte, 4096) @@ -60,22 +74,29 @@ func (lc *Lz4Compressor) UncompressBlock(src, dst []byte) (dest []byte,cnt int, } }() - dest = dst - maxUncompressSize := lc.UnCompressBlockBound(len(src)) - if len(dest) < maxUncompressSize { - dest = make([]byte,maxUncompressSize) + radio := uint8(src[0]) + if radio == 0 { + return nil,fmt.Errorf("Impermissible errors") } - cnt, err = lz4.UncompressBlock(src, dest) - return + dest = memPool.MakeByteSlice(len(src)*int(radio)) + cnt, err := lz4.UncompressBlock(src[1:], dest) + if err != nil { + memPool.ReleaseByteSlice(dest) + return nil,err + } + + return dest[:cnt],nil } -func (lc *Lz4Compressor) CompressBlockBound(n int) int{ +func (lc *Lz4Compressor) compressBlockBound(n int) int{ return lz4.CompressBlockBound(n) } -func (lc *Lz4Compressor) UnCompressBlockBound(n int) int{ - return n*10 +func (lc *Lz4Compressor) CompressBufferCollection(buffer []byte){ + memPool.ReleaseByteSlice(buffer) } - +func (lc *Lz4Compressor) UnCompressBufferCollection(buffer []byte) { + memPool.ReleaseByteSlice(buffer) +} \ No newline at end of file diff --git a/rpc/rclient.go b/rpc/rclient.go index abc6112..dafbccd 100644 --- a/rpc/rclient.go +++ b/rpc/rclient.go @@ -14,8 +14,6 @@ import ( //跨结点连接的Client type RClient struct { - compressBuff []byte - compressBytesLen int selfClient *Client network.TCPClient @@ -84,19 +82,19 @@ func (rc *RClient) RawGo(rpcHandler IRpcHandler,processor IRpcProcessor, noReply return call } + var compressBuff[]byte bCompress := uint8(0) if rc.compressBytesLen > 0 && len(bytes) >= rc.compressBytesLen { - var cnt int var cErr error - rc.compressBuff,cnt,cErr = compressor.CompressBlock(bytes,rc.compressBuff[:]) + compressBuff,cErr = compressor.CompressBlock(bytes) if cErr != nil { call.Seq = 0 - log.SError(err.Error()) - call.DoError(err) + log.SError(cErr.Error()) + call.DoError(cErr) return call } - if cnt < len(bytes) { - bytes = rc.compressBuff[:cnt] + if len(compressBuff) < len(bytes) { + bytes = compressBuff bCompress = 1<<7 } } @@ -106,6 +104,12 @@ func (rc *RClient) RawGo(rpcHandler IRpcHandler,processor IRpcProcessor, noReply } err = conn.WriteMsg([]byte{uint8(processor.GetProcessorType())|bCompress}, bytes) + if cap(compressBuff) >0 { + compressor.CompressBufferCollection(compressBuff) + } + if cap(compressBuff) >0 { + compressor.CompressBufferCollection(compressBuff) + } if err != nil { rc.selfClient.RemovePending(call.Seq) @@ -148,18 +152,17 @@ func (rc *RClient) asyncCall(timeout time.Duration,rpcHandler IRpcHandler, servi return emptyCancelRpc,errors.New("Rpc server is disconnect,call " + serviceMethod) } + var compressBuff[]byte bCompress := uint8(0) if rc.compressBytesLen>0 &&len(bytes) >= rc.compressBytesLen { - var cnt int var cErr error - - rc.compressBuff,cnt,cErr = compressor.CompressBlock(bytes,rc.compressBuff[:]) + compressBuff,cErr = compressor.CompressBlock(bytes) if cErr != nil { return emptyCancelRpc,cErr } - if cnt < len(bytes) { - bytes = rc.compressBuff[:cnt] + if len(compressBuff) < len(bytes) { + bytes = compressBuff bCompress = 1<<7 } } @@ -174,6 +177,9 @@ func (rc *RClient) asyncCall(timeout time.Duration,rpcHandler IRpcHandler, servi rc.selfClient.AddPending(call) err = conn.WriteMsg([]byte{uint8(processorType)|bCompress}, bytes) + if cap(compressBuff) >0 { + compressor.CompressBufferCollection(compressBuff) + } if err != nil { rc.selfClient.RemovePending(call.Seq) ReleaseCall(call) @@ -221,20 +227,23 @@ func (rc *RClient) Run() { //解压缩 byteData := bytes[1:] + var compressBuff []byte if bCompress == true { - var cnt int var unCompressErr error - rc.compressBuff,cnt,unCompressErr = compressor.UncompressBlock(byteData,rc.compressBuff[:]) + compressBuff,unCompressErr = compressor.UncompressBlock(byteData) if unCompressErr!= nil { rc.conn.ReleaseReadMsg(bytes) - log.SError("rpcClient ", rc.Addr, " ReadMsg head error:", err.Error()) + log.SError("rpcClient ", rc.Addr, " ReadMsg head error:", unCompressErr.Error()) return } - byteData = rc.compressBuff[:cnt] + byteData = compressBuff } err = processor.Unmarshal(byteData, response.RpcResponseData) + if cap(compressBuff) > 0 { + compressor.UnCompressBufferCollection(compressBuff) + } rc.conn.ReleaseReadMsg(bytes) if err != nil { processor.ReleaseRpcResponse(response.RpcResponseData) diff --git a/rpc/server.go b/rpc/server.go index ac30760..6e68d1f 100644 --- a/rpc/server.go +++ b/rpc/server.go @@ -35,8 +35,6 @@ type RpcAgent struct { conn network.Conn rpcServer *Server userData interface{} - - compressBuff []byte } func AppendProcessor(rpcProcessor IRpcProcessor) { @@ -117,23 +115,26 @@ func (agent *RpcAgent) WriteResponse(processor IRpcProcessor, serviceMethod stri return } + var compressBuff[]byte bCompress := uint8(0) if agent.rpcServer.compressBytesLen >0 && len(bytes) >= agent.rpcServer.compressBytesLen { - var cnt int var cErr error - agent.compressBuff,cnt,cErr = compressor.CompressBlock(bytes,agent.compressBuff[:]) + compressBuff,cErr = compressor.CompressBlock(bytes) if cErr != nil { - log.SError("service method ", serviceMethod, " CompressBlock error:", errM.Error()) + log.SError("service method ", serviceMethod, " CompressBlock error:", cErr.Error()) return } - if cnt < len(bytes) { - bytes = agent.compressBuff[:cnt] + if len(compressBuff) < len(bytes) { + bytes = compressBuff bCompress = 1<<7 } } errM = agent.conn.WriteMsg([]byte{uint8(processor.GetProcessorType())|bCompress}, bytes) + if cap(compressBuff) >0 { + compressor.CompressBufferCollection(compressBuff) + } if errM != nil { log.SError("Rpc ", serviceMethod, " return is error:", errM.Error()) } @@ -157,22 +158,25 @@ func (agent *RpcAgent) Run() { } //解析head + var compressBuff []byte byteData := data[1:] if bCompress == true { - var cnt int var unCompressErr error - agent.compressBuff,cnt,unCompressErr = compressor.UncompressBlock(byteData,agent.compressBuff[:]) + compressBuff,unCompressErr = compressor.UncompressBlock(byteData) if unCompressErr!= nil { agent.conn.ReleaseReadMsg(data) - log.SError("rpcClient ", agent.conn.RemoteAddr(), " ReadMsg head error:", err.Error()) + log.SError("rpcClient ", agent.conn.RemoteAddr(), " ReadMsg head error:", unCompressErr.Error()) return } - byteData = agent.compressBuff[:cnt] + byteData = compressBuff } req := MakeRpcRequest(processor, 0, 0, "", false, nil) err = processor.Unmarshal(byteData, req.RpcRequestData) + if cap(compressBuff) > 0 { + compressor.UnCompressBufferCollection(compressBuff) + } agent.conn.ReleaseReadMsg(data) if err != nil { log.SError("rpc Unmarshal request is error:", err.Error())