优化rpc内存

This commit is contained in:
boyce
2020-12-18 10:50:54 +08:00
parent 04ff1cc4fb
commit 20626c8c65
4 changed files with 98 additions and 88 deletions

View File

@@ -76,12 +76,15 @@ func (client *Client) startCheckRpcCallTimer(){
} }
func (client *Client) makeCallFail(call *Call){ func (client *Client) makeCallFail(call *Call){
client.removePending(call.Seq)
if call.callback!=nil && call.callback.IsValid() { if call.callback!=nil && call.callback.IsValid() {
call.rpcHandler.(*RpcHandler).callResponseCallBack <-call call.rpcHandler.(*RpcHandler).callResponseCallBack <-call
}else{ }else{
call.done <- call call.done <- call
} }
client.removePending(call.Seq)
} }
func (client *Client) checkRpcCallTimeout(){ func (client *Client) checkRpcCallTimeout(){
@@ -128,6 +131,9 @@ func (client *Client) AddPending(call *Call){
} }
func (client *Client) RemovePending(seq uint64) *Call{ func (client *Client) RemovePending(seq uint64) *Call{
if seq == 0 {
return nil
}
client.pendingLock.Lock() client.pendingLock.Lock()
call := client.removePending(seq) call := client.removePending(seq)
client.pendingLock.Unlock() client.pendingLock.Unlock()
@@ -139,9 +145,10 @@ func (client *Client) removePending(seq uint64) *Call{
if ok == false{ if ok == false{
return nil return nil
} }
call := v.Value.(*Call)
client.pendingTimer.Remove(v) client.pendingTimer.Remove(v)
delete(client.pending,seq) delete(client.pending,seq)
return v.Value.(*Call) return call
} }
func (client *Client) FindPending(seq uint64) *Call{ func (client *Client) FindPending(seq uint64) *Call{
@@ -163,75 +170,71 @@ func (client *Client) generateSeq() uint64{
} }
func (client *Client) AsyncCall(rpcHandler IRpcHandler,serviceMethod string,callback reflect.Value, args interface{},replyParam interface{}) error { func (client *Client) AsyncCall(rpcHandler IRpcHandler,serviceMethod string,callback reflect.Value, args interface{},replyParam interface{}) error {
processorType, processor := GetProcessorType(args)
InParam,herr := processor.Marshal(args)
if herr != nil {
return herr
}
seq := client.generateSeq()
request:=MakeRpcRequest(processor,seq,serviceMethod,false,InParam)
bytes,err := processor.Marshal(request.RpcRequestData)
ReleaseRpcRequest(request)
if err != nil {
return err
}
if client.conn == nil {
return fmt.Errorf("Rpc server is disconnect,call %s is fail!",serviceMethod)
}
call := MakeCall() call := MakeCall()
call.Reply = replyParam call.Reply = replyParam
call.callback = &callback call.callback = &callback
call.rpcHandler = rpcHandler call.rpcHandler = rpcHandler
call.ServiceMethod = serviceMethod call.ServiceMethod = serviceMethod
call.Seq = seq
processorType, processor := GetProcessorType(args)
InParam,herr := processor.Marshal(args)
if herr != nil {
ReleaseCall(call)
return herr
}
request := &RpcRequest{}
call.Seq = client.generateSeq()
request.RpcRequestData = processor.MakeRpcRequest(client.startSeq,serviceMethod,false,InParam)
client.AddPending(call) client.AddPending(call)
bytes,err := processor.Marshal(request.RpcRequestData) err = client.conn.WriteMsg([]byte{uint8(processorType)},bytes)
processor.ReleaseRpcRequest(request.RpcRequestData)
if err != nil { if err != nil {
client.RemovePending(call.Seq) client.RemovePending(call.Seq)
ReleaseCall(call) ReleaseCall(call)
return err return err
} }
if client.conn == nil { return nil
client.RemovePending(call.Seq)
ReleaseCall(call)
return fmt.Errorf("Rpc server is disconnect,call %s is fail!",serviceMethod)
}
err = client.conn.WriteMsg([]byte{uint8(processorType)},bytes)
if err != nil {
client.RemovePending(call.Seq)
ReleaseCall(call)
}
return err
} }
func (client *Client) RawGo(processor IRpcProcessor,noReply bool,serviceMethod string,args []byte,reply interface{}) *Call { func (client *Client) RawGo(processor IRpcProcessor,noReply bool,serviceMethod string,args []byte,reply interface{}) *Call {
call := MakeCall() call := MakeCall()
call.ServiceMethod = serviceMethod call.ServiceMethod = serviceMethod
call.Reply = reply call.Reply = reply
request := &RpcRequest{}
call.Seq = client.generateSeq() call.Seq = client.generateSeq()
if noReply == false {
client.AddPending(call) request := MakeRpcRequest(processor,call.Seq,serviceMethod,noReply,args)
}
request.RpcRequestData = processor.MakeRpcRequest(client.startSeq,serviceMethod,noReply,args)
bytes,err := processor.Marshal(request.RpcRequestData) bytes,err := processor.Marshal(request.RpcRequestData)
processor.ReleaseRpcRequest(request.RpcRequestData) ReleaseRpcRequest(request)
if err != nil { if err != nil {
call.Seq = 0
call.Err = err call.Err = err
client.RemovePending(call.Seq)
return call return call
} }
if client.conn == nil { if client.conn == nil {
call.Seq = 0
call.Err = fmt.Errorf("call %s is fail,rpc client is disconnect.",serviceMethod) call.Err = fmt.Errorf("call %s is fail,rpc client is disconnect.",serviceMethod)
client.RemovePending(call.Seq)
return call return call
} }
if noReply == false {
client.AddPending(call)
}
err = client.conn.WriteMsg([]byte{uint8(processor.GetProcessorType())},bytes) err = client.conn.WriteMsg([]byte{uint8(processor.GetProcessorType())},bytes)
if err != nil { if err != nil {
client.RemovePending(call.Seq) client.RemovePending(call.Seq)
call.Seq = 0
call.Err = err call.Err = err
} }
@@ -244,6 +247,7 @@ func (client *Client) Go(noReply bool,serviceMethod string, args interface{},rep
if err != nil { if err != nil {
call := MakeCall() call := MakeCall()
call.Err = err call.Err = err
return call
} }
return client.RawGo(processor,noReply,serviceMethod,InParam,reply) return client.RawGo(processor,noReply,serviceMethod,InParam,reply)

View File

@@ -7,6 +7,7 @@ import (
) )
type RpcRequest struct { type RpcRequest struct {
ref bool
RpcRequestData IRpcRequestData RpcRequestData IRpcRequestData
bLocalRequest bool bLocalRequest bool
@@ -59,6 +60,7 @@ type RpcHandleFinder interface {
type RequestHandler func(Returns interface{},Err RpcError) type RequestHandler func(Returns interface{},Err RpcError)
type Call struct { type Call struct {
ref bool
Seq uint64 Seq uint64
ServiceMethod string ServiceMethod string
Reply interface{} Reply interface{}
@@ -118,19 +120,35 @@ func (call *Call) Done() *Call{
return <-call.done return <-call.done
} }
func MakeRpcRequest() *RpcRequest{ func MakeRpcRequest(rpcProcessor IRpcProcessor,seq uint64,serviceMethod string,noReply bool,inParam []byte) *RpcRequest{
return rpcRequestPool.Get().(*RpcRequest).Clear() rpcRequest := rpcRequestPool.Get().(*RpcRequest).Clear()
} rpcRequest.rpcProcessor = rpcProcessor
rpcRequest.RpcRequestData = rpcRequest.rpcProcessor.MakeRpcRequest(seq,serviceMethod,noReply,inParam)
rpcRequest.ref = true
func MakeCall() *Call { return rpcRequest
return rpcCallPool.Get().(*Call).Clear()
} }
func ReleaseRpcRequest(rpcRequest *RpcRequest){ func ReleaseRpcRequest(rpcRequest *RpcRequest){
if rpcRequest.ref == false {
panic("Duplicate memory release!")
}
rpcRequest.ref = false
rpcRequest.rpcProcessor.ReleaseRpcRequest(rpcRequest.RpcRequestData)
rpcRequestPool.Put(rpcRequest) rpcRequestPool.Put(rpcRequest)
} }
func MakeCall() *Call {
call := rpcCallPool.Get().(*Call).Clear()
call.ref = true
return call
}
func ReleaseCall(call *Call){ func ReleaseCall(call *Call){
if call.ref == false {
panic("Duplicate memory release!")
}
call.ref = false
rpcCallPool.Put(call) rpcCallPool.Put(call)
} }

View File

@@ -238,7 +238,6 @@ func (handler *RpcHandler) HandlerRpcRequest(request *RpcRequest) {
} }
}() }()
defer ReleaseRpcRequest(request) defer ReleaseRpcRequest(request)
defer request.rpcProcessor.ReleaseRpcRequest(request.RpcRequestData)
if request.inputArgs!=nil { if request.inputArgs!=nil {
defer request.inputArgs.DoGc() defer request.inputArgs.DoGc()
} }
@@ -265,6 +264,7 @@ func (handler *RpcHandler) HandlerRpcRequest(request *RpcRequest) {
if request.bLocalRequest == false { if request.bLocalRequest == false {
if iParam == nil { if iParam == nil {
//原始调用
iParam = request.RpcRequestData.GetInParam() iParam = request.RpcRequestData.GetInParam()
}else{ }else{
err = request.rpcProcessor.Unmarshal(request.RpcRequestData.GetInParam(),iParam) err = request.rpcProcessor.Unmarshal(request.RpcRequestData.GetInParam(),iParam)
@@ -394,6 +394,7 @@ func (handler *RpcHandler) goRpc(processor IRpcProcessor,bCast bool,nodeId int,s
if pCall.Err!=nil { if pCall.Err!=nil {
err = pCall.Err err = pCall.Err
} }
pClientList[i].RemovePending(pCall.Seq)
ReleaseCall(pCall) ReleaseCall(pCall)
continue continue
} }
@@ -403,6 +404,7 @@ func (handler *RpcHandler) goRpc(processor IRpcProcessor,bCast bool,nodeId int,s
if pCall.Err!=nil { if pCall.Err!=nil {
err = pCall.Err err = pCall.Err
} }
pClientList[i].RemovePending(pCall.Seq)
ReleaseCall(pCall) ReleaseCall(pCall)
} }
@@ -448,9 +450,11 @@ func (handler *RpcHandler) callRpc(nodeId int,serviceMethod string,args interfac
//跨node调用 //跨node调用
pCall := pClient.Go(false,serviceMethod,args,reply) pCall := pClient.Go(false,serviceMethod,args,reply)
pClient.RemovePending(pCall.Seq)
if pCall.Err != nil { if pCall.Err != nil {
err = pCall.Err
ReleaseCall(pCall) ReleaseCall(pCall)
return pCall.Err return err
} }
err = pCall.Done().Err err = pCall.Done().Err
ReleaseCall(pCall) ReleaseCall(pCall)
@@ -518,19 +522,11 @@ func (handler *RpcHandler) asyncCallRpc(nodeid int,serviceMethod string,args int
} }
//其他的rpcHandler的处理器 //其他的rpcHandler的处理器
if callback!=nil { err = pLocalRpcServer.selfNodeRpcHandlerAsyncGo(pClient, handler,false,serviceName,serviceMethod,args,reply,fVal)
err = pLocalRpcServer.selfNodeRpcHandlerAsyncGo(pClient, handler,false,serviceName,serviceMethod,args,reply,fVal) if err != nil {
if err != nil { fVal.Call([]reflect.Value{reflect.ValueOf(reply),reflect.ValueOf(err)})
fVal.Call([]reflect.Value{reflect.ValueOf(reply),reflect.ValueOf(err)})
}
return nil
} }
pCall := pLocalRpcServer.selfNodeRpcHandlerGo(nil,pClient,false,serviceName,serviceMethod,args,reply,nil) return nil
err = pCall.Done().Err
pClient.RemovePending(pCall.Seq)
ReleaseCall(pCall)
return err
} }
//跨node调用 //跨node调用
@@ -621,6 +617,7 @@ func (handler *RpcHandler) RawGoNode(rpcProcessorType RpcProcessorType,nodeId in
if pCall.Err!=nil { if pCall.Err!=nil {
err = pCall.Err err = pCall.Err
} }
pClientList[i].RemovePending(pCall.Seq)
ReleaseCall(pCall) ReleaseCall(pCall)
continue continue
} }
@@ -631,6 +628,7 @@ func (handler *RpcHandler) RawGoNode(rpcProcessorType RpcProcessorType,nodeId in
if pCall.Err!=nil { if pCall.Err!=nil {
err = pCall.Err err = pCall.Err
} }
pClientList[i].RemovePending(pCall.Seq)
ReleaseCall(pCall) ReleaseCall(pCall)
} }

View File

@@ -127,9 +127,7 @@ func (agent *RpcAgent) Run() {
} }
//解析head //解析head
req := MakeRpcRequest() req := MakeRpcRequest(processor,0,"",false,nil)
req.rpcProcessor = processor
req.RpcRequestData = processor.MakeRpcRequest(0,"",false,nil)
err = processor.Unmarshal(data[1:],req.RpcRequestData) err = processor.Unmarshal(data[1:],req.RpcRequestData)
agent.conn.ReleaseReadMsg(data) agent.conn.ReleaseReadMsg(data)
if err != nil { if err != nil {
@@ -137,12 +135,10 @@ func (agent *RpcAgent) Run() {
if req.RpcRequestData.GetSeq()>0 { if req.RpcRequestData.GetSeq()>0 {
rpcError := RpcError(err.Error()) rpcError := RpcError(err.Error())
agent.WriteResponse(processor,req.RpcRequestData.GetServiceMethod(),req.RpcRequestData.GetSeq(),nil,rpcError) agent.WriteResponse(processor,req.RpcRequestData.GetServiceMethod(),req.RpcRequestData.GetSeq(),nil,rpcError)
processor.ReleaseRpcRequest(req.RpcRequestData)
ReleaseRpcRequest(req) ReleaseRpcRequest(req)
continue continue
}else{ }else{
//will close tcpconn //will close tcpconn
processor.ReleaseRpcRequest(req.RpcRequestData)
ReleaseRpcRequest(req) ReleaseRpcRequest(req)
break break
} }
@@ -153,7 +149,6 @@ func (agent *RpcAgent) Run() {
if len(serviceMethod)!=2 { if len(serviceMethod)!=2 {
rpcError := RpcError("rpc request req.ServiceMethod is error") rpcError := RpcError("rpc request req.ServiceMethod is error")
agent.WriteResponse(processor,req.RpcRequestData.GetServiceMethod(),req.RpcRequestData.GetSeq(),nil,rpcError) agent.WriteResponse(processor,req.RpcRequestData.GetServiceMethod(),req.RpcRequestData.GetSeq(),nil,rpcError)
processor.ReleaseRpcRequest(req.RpcRequestData)
ReleaseRpcRequest(req) ReleaseRpcRequest(req)
log.Debug("rpc request req.ServiceMethod is error") log.Debug("rpc request req.ServiceMethod is error")
continue continue
@@ -163,7 +158,6 @@ func (agent *RpcAgent) Run() {
if rpcHandler== nil { if rpcHandler== nil {
rpcError := RpcError(fmt.Sprintf("service method %s not config!", req.RpcRequestData.GetServiceMethod())) rpcError := RpcError(fmt.Sprintf("service method %s not config!", req.RpcRequestData.GetServiceMethod()))
agent.WriteResponse(processor,req.RpcRequestData.GetServiceMethod(),req.RpcRequestData.GetSeq(),nil,rpcError) agent.WriteResponse(processor,req.RpcRequestData.GetServiceMethod(),req.RpcRequestData.GetSeq(),nil,rpcError)
processor.ReleaseRpcRequest(req.RpcRequestData)
ReleaseRpcRequest(req) ReleaseRpcRequest(req)
log.Error("service method %s not config!", req.RpcRequestData.GetServiceMethod()) log.Error("service method %s not config!", req.RpcRequestData.GetServiceMethod())
continue continue
@@ -183,7 +177,6 @@ func (agent *RpcAgent) Run() {
agent.WriteResponse(processor,req.RpcRequestData.GetServiceMethod(),req.RpcRequestData.GetSeq(),nil,rpcError) agent.WriteResponse(processor,req.RpcRequestData.GetServiceMethod(),req.RpcRequestData.GetSeq(),nil,rpcError)
} }
processor.ReleaseRpcRequest(req.RpcRequestData)
ReleaseRpcRequest(req) ReleaseRpcRequest(req)
} }
} }
@@ -238,30 +231,30 @@ func (server *Server) selfNodeRpcHandlerGo(processor IRpcProcessor,client *Clien
if inputArgs!= nil { if inputArgs!= nil {
inputArgs.DoGc() inputArgs.DoGc()
} }
pCall.Seq = 0
pCall.Err = fmt.Errorf("service method %s not config!", serviceMethod) pCall.Err = fmt.Errorf("service method %s not config!", serviceMethod)
log.Error("%s",pCall.Err.Error()) log.Error("%s",pCall.Err.Error())
pCall.done <- pCall pCall.done <- pCall
return pCall return pCall
} }
req := MakeRpcRequest()
if processor == nil {
_,processor = GetProcessorType(args)
}
req := MakeRpcRequest(processor,0, serviceMethod,noReply,nil)
req.bLocalRequest = true req.bLocalRequest = true
req.localParam = args req.localParam = args
req.localReply = reply req.localReply = reply
req.inputArgs = inputArgs req.inputArgs = inputArgs
if processor == nil {
_,processor = GetProcessorType(args)
}
req.RpcRequestData = processor.MakeRpcRequest(0, serviceMethod,noReply,nil)
req.rpcProcessor = processor
if noReply == false { if noReply == false {
client.AddPending(pCall) client.AddPending(pCall)
req.requestHandle = func(Returns interface{},Err RpcError){ req.requestHandle = func(Returns interface{},Err RpcError){
v := client.RemovePending(pCall.Seq) v := client.RemovePending(pCall.Seq)
if v == nil { if v == nil {
log.Error("rpcClient cannot find seq %d in pending",pCall.Seq) log.Error("rpcClient cannot find seq %d in pending",pCall.Seq)
ReleaseCall(pCall)
return return
} }
if len(Err) == 0 { if len(Err) == 0 {
@@ -275,7 +268,6 @@ func (server *Server) selfNodeRpcHandlerGo(processor IRpcProcessor,client *Clien
err := rpcHandler.PushRequest(req) err := rpcHandler.PushRequest(req)
if err != nil { if err != nil {
processor.ReleaseRpcRequest(req.RpcRequestData)
ReleaseRpcRequest(req) ReleaseRpcRequest(req)
pCall.Err = err pCall.Err = err
pCall.done <- pCall pCall.done <- pCall
@@ -285,35 +277,35 @@ func (server *Server) selfNodeRpcHandlerGo(processor IRpcProcessor,client *Clien
} }
func (server *Server) selfNodeRpcHandlerAsyncGo(client *Client,callerRpcHandler IRpcHandler,noReply bool,handlerName string,serviceMethod string,args interface{},reply interface{},callback reflect.Value) error { func (server *Server) selfNodeRpcHandlerAsyncGo(client *Client,callerRpcHandler IRpcHandler,noReply bool,handlerName string,serviceMethod string,args interface{},reply interface{},callback reflect.Value) error {
pCall := MakeCall()
pCall.Seq = client.generateSeq()
pCall.rpcHandler = callerRpcHandler
pCall.callback = &callback
pCall.Reply = reply
rpcHandler := server.rpcHandleFinder.FindRpcHandler(handlerName) rpcHandler := server.rpcHandleFinder.FindRpcHandler(handlerName)
if rpcHandler== nil { if rpcHandler== nil {
err := fmt.Errorf("service method %s not config!", serviceMethod) err := fmt.Errorf("service method %s not config!", serviceMethod)
log.Error("%+v",err) log.Error("%+v",err)
ReleaseCall(pCall)
return err return err
} }
req := MakeRpcRequest() _,processor := GetProcessorType(args)
req := MakeRpcRequest(processor,0,serviceMethod,noReply,nil)
req.localParam = args req.localParam = args
req.localReply = reply req.localReply = reply
req.bLocalRequest = true req.bLocalRequest = true
_,processor := GetProcessorType(args)
req.rpcProcessor =processor //req.rpcProcessor =processor
req.RpcRequestData = processor.MakeRpcRequest(0,serviceMethod,noReply,nil) //req.RpcRequestData = processor.MakeRpcRequest(0,serviceMethod,noReply,nil)
if noReply == false { if noReply == false {
callSeq := client.generateSeq()
pCall := MakeCall()
pCall.Seq = callSeq
pCall.rpcHandler = callerRpcHandler
pCall.callback = &callback
pCall.Reply = reply
client.AddPending(pCall) client.AddPending(pCall)
req.requestHandle = func(Returns interface{},Err RpcError){ req.requestHandle = func(Returns interface{},Err RpcError){
v := client.RemovePending(pCall.Seq) v := client.RemovePending(callSeq)
if v == nil { if v == nil {
log.Error("rpcClient cannot find seq %d in pending",pCall.Seq) log.Error("rpcClient cannot find seq %d in pending",pCall.Seq)
ReleaseCall(pCall) //ReleaseCall(pCall)
processor.ReleaseRpcRequest(req.RpcRequestData)
ReleaseRpcRequest(req)
return return
} }
if len(Err) == 0 { if len(Err) == 0 {
@@ -331,9 +323,7 @@ func (server *Server) selfNodeRpcHandlerAsyncGo(client *Client,callerRpcHandler
err := rpcHandler.PushRequest(req) err := rpcHandler.PushRequest(req)
if err != nil { if err != nil {
processor.ReleaseRpcRequest(req.RpcRequestData)
ReleaseRpcRequest(req) ReleaseRpcRequest(req)
return err return err
} }