node支持rpc压缩

This commit is contained in:
duanhf2012
2023-07-21 15:28:52 +08:00
parent 4b84d9a1d5
commit ec1c2b4517
5 changed files with 67 additions and 27 deletions

View File

@@ -196,7 +196,7 @@ func (cls *Cluster) serviceDiscoverySetNodeInfo(nodeInfo *NodeInfo) {
rpcInfo := NodeRpcInfo{} rpcInfo := NodeRpcInfo{}
rpcInfo.nodeInfo = *nodeInfo rpcInfo.nodeInfo = *nodeInfo
rpcInfo.client =rpc.NewRClient(nodeInfo.NodeId, nodeInfo.ListenAddr, nodeInfo.MaxRpcParamLen,nodeInfo.CompressBytesLen,cls.triggerRpcEvent) rpcInfo.client =rpc.NewRClient(nodeInfo.NodeId, nodeInfo.ListenAddr, nodeInfo.MaxRpcParamLen,cls.localNodeInfo.CompressBytesLen,cls.triggerRpcEvent)
cls.mapRpc[nodeInfo.NodeId] = rpcInfo cls.mapRpc[nodeInfo.NodeId] = rpcInfo
} }

View File

@@ -5,6 +5,8 @@ import (
"github.com/duanhf2012/origin/log" "github.com/duanhf2012/origin/log"
"github.com/duanhf2012/origin/rpc" "github.com/duanhf2012/origin/rpc"
"github.com/duanhf2012/origin/service" "github.com/duanhf2012/origin/service"
"time"
"github.com/duanhf2012/origin/util/timer"
) )
const DynamicDiscoveryMasterName = "DiscoveryMaster" const DynamicDiscoveryMasterName = "DiscoveryMaster"
@@ -341,6 +343,10 @@ func (dc *DynamicDiscoveryClient) isDiscoverNode(nodeId int) bool {
} }
func (dc *DynamicDiscoveryClient) OnNodeConnected(nodeId int) { func (dc *DynamicDiscoveryClient) OnNodeConnected(nodeId int) {
dc.regServiceDiscover(nodeId)
}
func (dc *DynamicDiscoveryClient) regServiceDiscover(nodeId int){
nodeInfo := cluster.GetMasterDiscoveryNodeInfo(nodeId) nodeInfo := cluster.GetMasterDiscoveryNodeInfo(nodeId)
if nodeInfo == nil { if nodeInfo == nil {
return return
@@ -364,6 +370,10 @@ func (dc *DynamicDiscoveryClient) OnNodeConnected(nodeId int) {
err := dc.AsyncCallNode(nodeId, RegServiceDiscover, &req, func(res *rpc.Empty, err error) { err := dc.AsyncCallNode(nodeId, RegServiceDiscover, &req, func(res *rpc.Empty, err error) {
if err != nil { if err != nil {
log.SError("call ", RegServiceDiscover, " is fail :", err.Error()) log.SError("call ", RegServiceDiscover, " is fail :", err.Error())
dc.AfterFunc(time.Second*3, func(timer *timer.Timer) {
dc.regServiceDiscover(nodeId)
})
return return
} }
}) })

View File

@@ -8,8 +8,8 @@ import (
) )
type ICompressor interface { type ICompressor interface {
CompressBlock(src, dst []byte) (int, error) //dst如果有预申请使用dst内存传入nil时内部申请 CompressBlock(src, dst []byte) ([]byte,int, error) //dst如果有预申请使用dst内存传入nil时内部申请
UncompressBlock(src []byte, dst []byte) (int, error)//dst如果有预申请使用dst内存传入nil时内部申请 UncompressBlock(src []byte, dst []byte) ([]byte,int, error)//dst如果有预申请使用dst内存传入nil时内部申请
CompressBlockBound(n int) int CompressBlockBound(n int) int
UnCompressBlockBound(n int) int UnCompressBlockBound(n int) int
@@ -28,7 +28,7 @@ func SetCompressor(cp ICompressor){
type Lz4Compressor struct { type Lz4Compressor struct {
} }
func (lc *Lz4Compressor) CompressBlock(src, dst []byte) (cnt int, err error) { func (lc *Lz4Compressor) CompressBlock(src, dst []byte) (dest []byte,cnt int, err error) {
defer func() { defer func() {
if r := recover(); r != nil { if r := recover(); r != nil {
buf := make([]byte, 4096) buf := make([]byte, 4096)
@@ -38,13 +38,19 @@ func (lc *Lz4Compressor) CompressBlock(src, dst []byte) (cnt int, err error) {
} }
}() }()
dest = dst
var c lz4.Compressor var c lz4.Compressor
cnt, err = c.CompressBlock(src, dst) maxCompressSize := lc.CompressBlockBound(len(src))
if len(dest) < maxCompressSize {
dest = make([]byte,maxCompressSize)
}
cnt, err = c.CompressBlock(src, dest)
return return
} }
func (lc *Lz4Compressor) UncompressBlock(src, dst []byte) (cnt int, err error) { func (lc *Lz4Compressor) UncompressBlock(src, dst []byte) (dest []byte,cnt int, err error) {
defer func() { defer func() {
if r := recover(); r != nil { if r := recover(); r != nil {
buf := make([]byte, 4096) buf := make([]byte, 4096)
@@ -54,7 +60,13 @@ func (lc *Lz4Compressor) UncompressBlock(src, dst []byte) (cnt int, err error) {
} }
}() }()
cnt, err = lz4.UncompressBlock(src, dst) dest = dst
maxUncompressSize := lc.UnCompressBlockBound(len(src))
if len(dest) < maxUncompressSize {
dest = make([]byte,maxUncompressSize)
}
cnt, err = lz4.UncompressBlock(src, dest)
return return
} }

View File

@@ -84,24 +84,28 @@ func (rc *RClient) RawGo(rpcHandler IRpcHandler,processor IRpcProcessor, noReply
return call return call
} }
bCompress := uint8(0x7f) bCompress := uint8(0)
if rc.compressBytesLen > 0 && len(bytes) >= rc.compressBytesLen { if rc.compressBytesLen > 0 && len(bytes) >= rc.compressBytesLen {
cnt,cErr := compressor.CompressBlock(bytes,rc.compressBuff[:]) var cnt int
var cErr error
rc.compressBuff,cnt,cErr = compressor.CompressBlock(bytes,rc.compressBuff[:])
if cErr != nil { if cErr != nil {
call.Seq = 0 call.Seq = 0
log.SError(err.Error()) log.SError(err.Error())
call.DoError(err) call.DoError(err)
return call return call
} }
bytes = rc.compressBuff[:cnt] if cnt < len(bytes) {
bCompress = 0xff bytes = rc.compressBuff[:cnt]
bCompress = 1<<7
}
} }
if noReply == false { if noReply == false {
rc.selfClient.AddPending(call) rc.selfClient.AddPending(call)
} }
err = conn.WriteMsg([]byte{uint8(processor.GetProcessorType())&bCompress}, bytes) err = conn.WriteMsg([]byte{uint8(processor.GetProcessorType())|bCompress}, bytes)
if err != nil { if err != nil {
rc.selfClient.RemovePending(call.Seq) rc.selfClient.RemovePending(call.Seq)
@@ -144,14 +148,20 @@ func (rc *RClient) asyncCall(timeout time.Duration,rpcHandler IRpcHandler, servi
return emptyCancelRpc,errors.New("Rpc server is disconnect,call " + serviceMethod) return emptyCancelRpc,errors.New("Rpc server is disconnect,call " + serviceMethod)
} }
bCompress := uint8(0x7f) bCompress := uint8(0)
if rc.compressBytesLen>0 &&len(bytes) >= rc.compressBytesLen { if rc.compressBytesLen>0 &&len(bytes) >= rc.compressBytesLen {
cnt,cErr := compressor.CompressBlock(bytes,rc.compressBuff[:]) var cnt int
var cErr error
rc.compressBuff,cnt,cErr = compressor.CompressBlock(bytes,rc.compressBuff[:])
if cErr != nil { if cErr != nil {
return emptyCancelRpc,cErr return emptyCancelRpc,cErr
} }
bytes = rc.compressBuff[:cnt]
bCompress = 0xff if cnt < len(bytes) {
bytes = rc.compressBuff[:cnt]
bCompress = 1<<7
}
} }
call := MakeCall() call := MakeCall()
@@ -163,7 +173,7 @@ func (rc *RClient) asyncCall(timeout time.Duration,rpcHandler IRpcHandler, servi
call.TimeOut = timeout call.TimeOut = timeout
rc.selfClient.AddPending(call) rc.selfClient.AddPending(call)
err = conn.WriteMsg([]byte{uint8(processorType)&bCompress}, bytes) err = conn.WriteMsg([]byte{uint8(processorType)|bCompress}, bytes)
if err != nil { if err != nil {
rc.selfClient.RemovePending(call.Seq) rc.selfClient.RemovePending(call.Seq)
ReleaseCall(call) ReleaseCall(call)
@@ -212,7 +222,10 @@ func (rc *RClient) Run() {
//解压缩 //解压缩
byteData := bytes[1:] byteData := bytes[1:]
if bCompress == true { if bCompress == true {
cnt,unCompressErr := compressor.UncompressBlock(byteData,rc.compressBuff) var cnt int
var unCompressErr error
rc.compressBuff,cnt,unCompressErr = compressor.UncompressBlock(byteData,rc.compressBuff[:])
if unCompressErr!= nil { if unCompressErr!= nil {
rc.conn.ReleaseReadMsg(bytes) rc.conn.ReleaseReadMsg(bytes)
log.SError("rpcClient ", rc.Addr, " ReadMsg head error:", err.Error()) log.SError("rpcClient ", rc.Addr, " ReadMsg head error:", err.Error())
@@ -283,8 +296,6 @@ func NewRClient(nodeId int, addr string, maxRpcParamLen uint32,compressBytesLen
c.LittleEndian = LittleEndian c.LittleEndian = LittleEndian
c.NewAgent = client.NewClientAgent c.NewAgent = client.NewClientAgent
c.compressBuff = make([]byte, compressor.UnCompressBlockBound(int(maxRpcParamLen)))
if maxRpcParamLen > 0 { if maxRpcParamLen > 0 {
c.MaxMsgLen = maxRpcParamLen c.MaxMsgLen = maxRpcParamLen
} else { } else {

View File

@@ -117,18 +117,23 @@ func (agent *RpcAgent) WriteResponse(processor IRpcProcessor, serviceMethod stri
return return
} }
bCompress := uint8(0x7f) bCompress := uint8(0)
if agent.rpcServer.compressBytesLen >0 && len(bytes) >= agent.rpcServer.compressBytesLen { if agent.rpcServer.compressBytesLen >0 && len(bytes) >= agent.rpcServer.compressBytesLen {
cnt,cErr := compressor.CompressBlock(bytes,agent.compressBuff[:]) var cnt int
var cErr error
agent.compressBuff,cnt,cErr = compressor.CompressBlock(bytes,agent.compressBuff[:])
if cErr != nil { if cErr != nil {
log.SError("service method ", serviceMethod, " CompressBlock error:", errM.Error()) log.SError("service method ", serviceMethod, " CompressBlock error:", errM.Error())
return return
} }
bytes = agent.compressBuff[:cnt] if cnt < len(bytes) {
bCompress = 0xff bytes = agent.compressBuff[:cnt]
bCompress = 1<<7
}
} }
errM = agent.conn.WriteMsg([]byte{uint8(processor.GetProcessorType())&bCompress}, bytes) errM = agent.conn.WriteMsg([]byte{uint8(processor.GetProcessorType())|bCompress}, bytes)
if errM != nil { if errM != nil {
log.SError("Rpc ", serviceMethod, " return is error:", errM.Error()) log.SError("Rpc ", serviceMethod, " return is error:", errM.Error())
} }
@@ -154,7 +159,10 @@ func (agent *RpcAgent) Run() {
//解析head //解析head
byteData := data[1:] byteData := data[1:]
if bCompress == true { if bCompress == true {
cnt,unCompressErr := compressor.UncompressBlock(byteData,agent.compressBuff) var cnt int
var unCompressErr error
agent.compressBuff,cnt,unCompressErr = compressor.UncompressBlock(byteData,agent.compressBuff[:])
if unCompressErr!= nil { if unCompressErr!= nil {
agent.conn.ReleaseReadMsg(data) agent.conn.ReleaseReadMsg(data)
log.SError("rpcClient ", agent.conn.RemoteAddr(), " ReadMsg head error:", err.Error()) log.SError("rpcClient ", agent.conn.RemoteAddr(), " ReadMsg head error:", err.Error())
@@ -261,7 +269,6 @@ func (agent *RpcAgent) Destroy() {
func (server *Server) NewAgent(c *network.TCPConn) network.Agent { func (server *Server) NewAgent(c *network.TCPConn) network.Agent {
agent := &RpcAgent{conn: c, rpcServer: server} agent := &RpcAgent{conn: c, rpcServer: server}
agent.compressBuff = make([]byte, compressor.UnCompressBlockBound(int(server.rpcServer.MaxMsgLen)))
return agent return agent
} }