优化新增rpc压缩功能

This commit is contained in:
duanhf2012
2023-07-25 15:13:58 +08:00
parent 6fea2226e1
commit 1890b300ee
4 changed files with 86 additions and 52 deletions

View File

@@ -16,7 +16,7 @@ type memAreaPool struct {
pool []sync.Pool
}
var memAreaPoolList = [3]*memAreaPool{&memAreaPool{minAreaValue: 1, maxAreaValue: 4096, growthValue: 512}, &memAreaPool{minAreaValue: 4097, maxAreaValue: 40960, growthValue: 4096}, &memAreaPool{minAreaValue: 40961, maxAreaValue: 417792, growthValue: 16384}}
var memAreaPoolList = [4]*memAreaPool{&memAreaPool{minAreaValue: 1, maxAreaValue: 4096, growthValue: 512}, &memAreaPool{minAreaValue: 4097, maxAreaValue: 40960, growthValue: 4096}, &memAreaPool{minAreaValue: 40961, maxAreaValue: 417792, growthValue: 16384}, &memAreaPool{minAreaValue: 417793, maxAreaValue: 1925120, growthValue: 65536}}
func init() {
for i := 0; i < len(memAreaPoolList); i++ {

View File

@@ -5,18 +5,20 @@ import (
"errors"
"github.com/pierrec/lz4/v4"
"fmt"
"github.com/duanhf2012/origin/network"
)
type ICompressor interface {
CompressBlock(src, dst []byte) ([]byte,int, error) //dst如果有预申请使用dst内存传入nil时内部申请
UncompressBlock(src []byte, dst []byte) ([]byte,int, error)//dst如果有预申请使用dst内存传入nil时内部申请
var memPool network.INetMempool = network.NewMemAreaPool()
CompressBlockBound(n int) int
UnCompressBlockBound(n int) int
type ICompressor interface {
CompressBlock(src []byte) ([]byte, error) //dst如果有预申请使用dst内存传入nil时内部申请
UncompressBlock(src []byte) ([]byte, error)//dst如果有预申请使用dst内存传入nil时内部申请
CompressBufferCollection(buffer []byte) //压缩的Buffer内存回收
UnCompressBufferCollection(buffer []byte) //解压缩的Buffer内存回收
}
var compressor ICompressor
func init(){
SetCompressor(&Lz4Compressor{})
}
@@ -28,7 +30,7 @@ func SetCompressor(cp ICompressor){
type Lz4Compressor struct {
}
func (lc *Lz4Compressor) CompressBlock(src, dst []byte) (dest []byte,cnt int, err error) {
func (lc *Lz4Compressor) CompressBlock(src []byte) (dest []byte, err error) {
defer func() {
if r := recover(); r != nil {
buf := make([]byte, 4096)
@@ -38,19 +40,31 @@ func (lc *Lz4Compressor) CompressBlock(src, dst []byte) (dest []byte,cnt int, er
}
}()
dest = dst
var c lz4.Compressor
maxCompressSize := lc.CompressBlockBound(len(src))
if len(dest) < maxCompressSize {
dest = make([]byte,maxCompressSize)
var cnt int
dest = memPool.MakeByteSlice(lz4.CompressBlockBound(len(src))+1)
cnt, err = c.CompressBlock(src, dest[1:])
if err != nil {
memPool.ReleaseByteSlice(dest)
return nil,err
}
cnt, err = c.CompressBlock(src, dest)
ratio := len(src) / cnt
if len(src) % cnt > 0 {
ratio += 1
}
if ratio > 255 {
memPool.ReleaseByteSlice(dest)
return nil,fmt.Errorf("Impermissible errors")
}
dest[0] = uint8(ratio)
dest = dest[:cnt+1]
return
}
func (lc *Lz4Compressor) UncompressBlock(src, dst []byte) (dest []byte,cnt int, err error) {
func (lc *Lz4Compressor) UncompressBlock(src []byte) (dest []byte, err error) {
defer func() {
if r := recover(); r != nil {
buf := make([]byte, 4096)
@@ -60,22 +74,29 @@ func (lc *Lz4Compressor) UncompressBlock(src, dst []byte) (dest []byte,cnt int,
}
}()
dest = dst
maxUncompressSize := lc.UnCompressBlockBound(len(src))
if len(dest) < maxUncompressSize {
dest = make([]byte,maxUncompressSize)
radio := uint8(src[0])
if radio == 0 {
return nil,fmt.Errorf("Impermissible errors")
}
cnt, err = lz4.UncompressBlock(src, dest)
return
dest = memPool.MakeByteSlice(len(src)*int(radio))
cnt, err := lz4.UncompressBlock(src[1:], dest)
if err != nil {
memPool.ReleaseByteSlice(dest)
return nil,err
}
return dest[:cnt],nil
}
func (lc *Lz4Compressor) CompressBlockBound(n int) int{
func (lc *Lz4Compressor) compressBlockBound(n int) int{
return lz4.CompressBlockBound(n)
}
func (lc *Lz4Compressor) UnCompressBlockBound(n int) int{
return n*10
func (lc *Lz4Compressor) CompressBufferCollection(buffer []byte){
memPool.ReleaseByteSlice(buffer)
}
func (lc *Lz4Compressor) UnCompressBufferCollection(buffer []byte) {
memPool.ReleaseByteSlice(buffer)
}

View File

@@ -14,8 +14,6 @@ import (
//跨结点连接的Client
type RClient struct {
compressBuff []byte
compressBytesLen int
selfClient *Client
network.TCPClient
@@ -84,19 +82,19 @@ func (rc *RClient) RawGo(rpcHandler IRpcHandler,processor IRpcProcessor, noReply
return call
}
var compressBuff[]byte
bCompress := uint8(0)
if rc.compressBytesLen > 0 && len(bytes) >= rc.compressBytesLen {
var cnt int
var cErr error
rc.compressBuff,cnt,cErr = compressor.CompressBlock(bytes,rc.compressBuff[:])
compressBuff,cErr = compressor.CompressBlock(bytes)
if cErr != nil {
call.Seq = 0
log.SError(err.Error())
call.DoError(err)
log.SError(cErr.Error())
call.DoError(cErr)
return call
}
if cnt < len(bytes) {
bytes = rc.compressBuff[:cnt]
if len(compressBuff) < len(bytes) {
bytes = compressBuff
bCompress = 1<<7
}
}
@@ -106,6 +104,12 @@ func (rc *RClient) RawGo(rpcHandler IRpcHandler,processor IRpcProcessor, noReply
}
err = conn.WriteMsg([]byte{uint8(processor.GetProcessorType())|bCompress}, bytes)
if cap(compressBuff) >0 {
compressor.CompressBufferCollection(compressBuff)
}
if cap(compressBuff) >0 {
compressor.CompressBufferCollection(compressBuff)
}
if err != nil {
rc.selfClient.RemovePending(call.Seq)
@@ -148,18 +152,17 @@ func (rc *RClient) asyncCall(timeout time.Duration,rpcHandler IRpcHandler, servi
return emptyCancelRpc,errors.New("Rpc server is disconnect,call " + serviceMethod)
}
var compressBuff[]byte
bCompress := uint8(0)
if rc.compressBytesLen>0 &&len(bytes) >= rc.compressBytesLen {
var cnt int
var cErr error
rc.compressBuff,cnt,cErr = compressor.CompressBlock(bytes,rc.compressBuff[:])
compressBuff,cErr = compressor.CompressBlock(bytes)
if cErr != nil {
return emptyCancelRpc,cErr
}
if cnt < len(bytes) {
bytes = rc.compressBuff[:cnt]
if len(compressBuff) < len(bytes) {
bytes = compressBuff
bCompress = 1<<7
}
}
@@ -174,6 +177,9 @@ func (rc *RClient) asyncCall(timeout time.Duration,rpcHandler IRpcHandler, servi
rc.selfClient.AddPending(call)
err = conn.WriteMsg([]byte{uint8(processorType)|bCompress}, bytes)
if cap(compressBuff) >0 {
compressor.CompressBufferCollection(compressBuff)
}
if err != nil {
rc.selfClient.RemovePending(call.Seq)
ReleaseCall(call)
@@ -221,20 +227,23 @@ func (rc *RClient) Run() {
//解压缩
byteData := bytes[1:]
var compressBuff []byte
if bCompress == true {
var cnt int
var unCompressErr error
rc.compressBuff,cnt,unCompressErr = compressor.UncompressBlock(byteData,rc.compressBuff[:])
compressBuff,unCompressErr = compressor.UncompressBlock(byteData)
if unCompressErr!= nil {
rc.conn.ReleaseReadMsg(bytes)
log.SError("rpcClient ", rc.Addr, " ReadMsg head error:", err.Error())
log.SError("rpcClient ", rc.Addr, " ReadMsg head error:", unCompressErr.Error())
return
}
byteData = rc.compressBuff[:cnt]
byteData = compressBuff
}
err = processor.Unmarshal(byteData, response.RpcResponseData)
if cap(compressBuff) > 0 {
compressor.UnCompressBufferCollection(compressBuff)
}
rc.conn.ReleaseReadMsg(bytes)
if err != nil {
processor.ReleaseRpcResponse(response.RpcResponseData)

View File

@@ -35,8 +35,6 @@ type RpcAgent struct {
conn network.Conn
rpcServer *Server
userData interface{}
compressBuff []byte
}
func AppendProcessor(rpcProcessor IRpcProcessor) {
@@ -117,23 +115,26 @@ func (agent *RpcAgent) WriteResponse(processor IRpcProcessor, serviceMethod stri
return
}
var compressBuff[]byte
bCompress := uint8(0)
if agent.rpcServer.compressBytesLen >0 && len(bytes) >= agent.rpcServer.compressBytesLen {
var cnt int
var cErr error
agent.compressBuff,cnt,cErr = compressor.CompressBlock(bytes,agent.compressBuff[:])
compressBuff,cErr = compressor.CompressBlock(bytes)
if cErr != nil {
log.SError("service method ", serviceMethod, " CompressBlock error:", errM.Error())
log.SError("service method ", serviceMethod, " CompressBlock error:", cErr.Error())
return
}
if cnt < len(bytes) {
bytes = agent.compressBuff[:cnt]
if len(compressBuff) < len(bytes) {
bytes = compressBuff
bCompress = 1<<7
}
}
errM = agent.conn.WriteMsg([]byte{uint8(processor.GetProcessorType())|bCompress}, bytes)
if cap(compressBuff) >0 {
compressor.CompressBufferCollection(compressBuff)
}
if errM != nil {
log.SError("Rpc ", serviceMethod, " return is error:", errM.Error())
}
@@ -157,22 +158,25 @@ func (agent *RpcAgent) Run() {
}
//解析head
var compressBuff []byte
byteData := data[1:]
if bCompress == true {
var cnt int
var unCompressErr error
agent.compressBuff,cnt,unCompressErr = compressor.UncompressBlock(byteData,agent.compressBuff[:])
compressBuff,unCompressErr = compressor.UncompressBlock(byteData)
if unCompressErr!= nil {
agent.conn.ReleaseReadMsg(data)
log.SError("rpcClient ", agent.conn.RemoteAddr(), " ReadMsg head error:", err.Error())
log.SError("rpcClient ", agent.conn.RemoteAddr(), " ReadMsg head error:", unCompressErr.Error())
return
}
byteData = agent.compressBuff[:cnt]
byteData = compressBuff
}
req := MakeRpcRequest(processor, 0, 0, "", false, nil)
err = processor.Unmarshal(byteData, req.RpcRequestData)
if cap(compressBuff) > 0 {
compressor.UnCompressBufferCollection(compressBuff)
}
agent.conn.ReleaseReadMsg(data)
if err != nil {
log.SError("rpc Unmarshal request is error:", err.Error())