From 20626c8c6529e84eb1f4183d7aeb7e01d62a6ed2 Mon Sep 17 00:00:00 2001 From: boyce Date: Fri, 18 Dec 2020 10:50:54 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96rpc=E5=86=85=E5=AD=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- rpc/client.go | 78 +++++++++++++++++++++++++---------------------- rpc/rpc.go | 28 ++++++++++++++--- rpc/rpchandler.go | 26 ++++++++-------- rpc/server.go | 54 +++++++++++++------------------- 4 files changed, 98 insertions(+), 88 deletions(-) diff --git a/rpc/client.go b/rpc/client.go index 9ba73fb..76fda36 100644 --- a/rpc/client.go +++ b/rpc/client.go @@ -76,12 +76,15 @@ func (client *Client) startCheckRpcCallTimer(){ } func (client *Client) makeCallFail(call *Call){ + client.removePending(call.Seq) if call.callback!=nil && call.callback.IsValid() { call.rpcHandler.(*RpcHandler).callResponseCallBack <-call }else{ call.done <- call } - client.removePending(call.Seq) + + + } func (client *Client) checkRpcCallTimeout(){ @@ -128,6 +131,9 @@ func (client *Client) AddPending(call *Call){ } func (client *Client) RemovePending(seq uint64) *Call{ + if seq == 0 { + return nil + } client.pendingLock.Lock() call := client.removePending(seq) client.pendingLock.Unlock() @@ -139,9 +145,10 @@ func (client *Client) removePending(seq uint64) *Call{ if ok == false{ return nil } + call := v.Value.(*Call) client.pendingTimer.Remove(v) delete(client.pending,seq) - return v.Value.(*Call) + return call } func (client *Client) FindPending(seq uint64) *Call{ @@ -163,75 +170,71 @@ func (client *Client) generateSeq() uint64{ } func (client *Client) AsyncCall(rpcHandler IRpcHandler,serviceMethod string,callback reflect.Value, args interface{},replyParam interface{}) error { + processorType, processor := GetProcessorType(args) + InParam,herr := processor.Marshal(args) + if herr != nil { + return herr + } + + seq := client.generateSeq() + request:=MakeRpcRequest(processor,seq,serviceMethod,false,InParam) + bytes,err := processor.Marshal(request.RpcRequestData) + ReleaseRpcRequest(request) + if err != nil { + return err + } + + if client.conn == nil { + return fmt.Errorf("Rpc server is disconnect,call %s is fail!",serviceMethod) + } + call := MakeCall() call.Reply = replyParam call.callback = &callback call.rpcHandler = rpcHandler call.ServiceMethod = serviceMethod - - processorType, processor := GetProcessorType(args) - InParam,herr := processor.Marshal(args) - if herr != nil { - ReleaseCall(call) - return herr - } - - request := &RpcRequest{} - call.Seq = client.generateSeq() - request.RpcRequestData = processor.MakeRpcRequest(client.startSeq,serviceMethod,false,InParam) + call.Seq = seq client.AddPending(call) - bytes,err := processor.Marshal(request.RpcRequestData) - processor.ReleaseRpcRequest(request.RpcRequestData) + err = client.conn.WriteMsg([]byte{uint8(processorType)},bytes) if err != nil { client.RemovePending(call.Seq) ReleaseCall(call) return err } - if client.conn == nil { - client.RemovePending(call.Seq) - ReleaseCall(call) - return fmt.Errorf("Rpc server is disconnect,call %s is fail!",serviceMethod) - } - - err = client.conn.WriteMsg([]byte{uint8(processorType)},bytes) - if err != nil { - client.RemovePending(call.Seq) - ReleaseCall(call) - } - - return err + return nil } func (client *Client) RawGo(processor IRpcProcessor,noReply bool,serviceMethod string,args []byte,reply interface{}) *Call { call := MakeCall() call.ServiceMethod = serviceMethod call.Reply = reply - - request := &RpcRequest{} call.Seq = client.generateSeq() - if noReply == false { - client.AddPending(call) - } - request.RpcRequestData = processor.MakeRpcRequest(client.startSeq,serviceMethod,noReply,args) + + request := MakeRpcRequest(processor,call.Seq,serviceMethod,noReply,args) bytes,err := processor.Marshal(request.RpcRequestData) - processor.ReleaseRpcRequest(request.RpcRequestData) + ReleaseRpcRequest(request) if err != nil { + call.Seq = 0 call.Err = err - client.RemovePending(call.Seq) return call } if client.conn == nil { + call.Seq = 0 call.Err = fmt.Errorf("call %s is fail,rpc client is disconnect.",serviceMethod) - client.RemovePending(call.Seq) return call } + if noReply == false { + client.AddPending(call) + } + err = client.conn.WriteMsg([]byte{uint8(processor.GetProcessorType())},bytes) if err != nil { client.RemovePending(call.Seq) + call.Seq = 0 call.Err = err } @@ -244,6 +247,7 @@ func (client *Client) Go(noReply bool,serviceMethod string, args interface{},rep if err != nil { call := MakeCall() call.Err = err + return call } return client.RawGo(processor,noReply,serviceMethod,InParam,reply) diff --git a/rpc/rpc.go b/rpc/rpc.go index c4911d0..b5d18fe 100644 --- a/rpc/rpc.go +++ b/rpc/rpc.go @@ -7,6 +7,7 @@ import ( ) type RpcRequest struct { + ref bool RpcRequestData IRpcRequestData bLocalRequest bool @@ -59,6 +60,7 @@ type RpcHandleFinder interface { type RequestHandler func(Returns interface{},Err RpcError) type Call struct { + ref bool Seq uint64 ServiceMethod string Reply interface{} @@ -118,19 +120,35 @@ func (call *Call) Done() *Call{ return <-call.done } -func MakeRpcRequest() *RpcRequest{ - return rpcRequestPool.Get().(*RpcRequest).Clear() -} +func MakeRpcRequest(rpcProcessor IRpcProcessor,seq uint64,serviceMethod string,noReply bool,inParam []byte) *RpcRequest{ + rpcRequest := rpcRequestPool.Get().(*RpcRequest).Clear() + rpcRequest.rpcProcessor = rpcProcessor + rpcRequest.RpcRequestData = rpcRequest.rpcProcessor.MakeRpcRequest(seq,serviceMethod,noReply,inParam) + rpcRequest.ref = true -func MakeCall() *Call { - return rpcCallPool.Get().(*Call).Clear() + return rpcRequest } func ReleaseRpcRequest(rpcRequest *RpcRequest){ + if rpcRequest.ref == false { + panic("Duplicate memory release!") + } + rpcRequest.ref = false + rpcRequest.rpcProcessor.ReleaseRpcRequest(rpcRequest.RpcRequestData) rpcRequestPool.Put(rpcRequest) } +func MakeCall() *Call { + call := rpcCallPool.Get().(*Call).Clear() + call.ref = true + return call +} + func ReleaseCall(call *Call){ + if call.ref == false { + panic("Duplicate memory release!") + } + call.ref = false rpcCallPool.Put(call) } diff --git a/rpc/rpchandler.go b/rpc/rpchandler.go index 67aa995..577f49e 100644 --- a/rpc/rpchandler.go +++ b/rpc/rpchandler.go @@ -238,7 +238,6 @@ func (handler *RpcHandler) HandlerRpcRequest(request *RpcRequest) { } }() defer ReleaseRpcRequest(request) - defer request.rpcProcessor.ReleaseRpcRequest(request.RpcRequestData) if request.inputArgs!=nil { defer request.inputArgs.DoGc() } @@ -265,6 +264,7 @@ func (handler *RpcHandler) HandlerRpcRequest(request *RpcRequest) { if request.bLocalRequest == false { if iParam == nil { + //原始调用 iParam = request.RpcRequestData.GetInParam() }else{ err = request.rpcProcessor.Unmarshal(request.RpcRequestData.GetInParam(),iParam) @@ -394,6 +394,7 @@ func (handler *RpcHandler) goRpc(processor IRpcProcessor,bCast bool,nodeId int,s if pCall.Err!=nil { err = pCall.Err } + pClientList[i].RemovePending(pCall.Seq) ReleaseCall(pCall) continue } @@ -403,6 +404,7 @@ func (handler *RpcHandler) goRpc(processor IRpcProcessor,bCast bool,nodeId int,s if pCall.Err!=nil { err = pCall.Err } + pClientList[i].RemovePending(pCall.Seq) ReleaseCall(pCall) } @@ -448,9 +450,11 @@ func (handler *RpcHandler) callRpc(nodeId int,serviceMethod string,args interfac //跨node调用 pCall := pClient.Go(false,serviceMethod,args,reply) + pClient.RemovePending(pCall.Seq) if pCall.Err != nil { + err = pCall.Err ReleaseCall(pCall) - return pCall.Err + return err } err = pCall.Done().Err ReleaseCall(pCall) @@ -518,19 +522,11 @@ func (handler *RpcHandler) asyncCallRpc(nodeid int,serviceMethod string,args int } //其他的rpcHandler的处理器 - if callback!=nil { - err = pLocalRpcServer.selfNodeRpcHandlerAsyncGo(pClient, handler,false,serviceName,serviceMethod,args,reply,fVal) - if err != nil { - fVal.Call([]reflect.Value{reflect.ValueOf(reply),reflect.ValueOf(err)}) - } - return nil + err = pLocalRpcServer.selfNodeRpcHandlerAsyncGo(pClient, handler,false,serviceName,serviceMethod,args,reply,fVal) + if err != nil { + fVal.Call([]reflect.Value{reflect.ValueOf(reply),reflect.ValueOf(err)}) } - pCall := pLocalRpcServer.selfNodeRpcHandlerGo(nil,pClient,false,serviceName,serviceMethod,args,reply,nil) - err = pCall.Done().Err - pClient.RemovePending(pCall.Seq) - ReleaseCall(pCall) - - return err + return nil } //跨node调用 @@ -621,6 +617,7 @@ func (handler *RpcHandler) RawGoNode(rpcProcessorType RpcProcessorType,nodeId in if pCall.Err!=nil { err = pCall.Err } + pClientList[i].RemovePending(pCall.Seq) ReleaseCall(pCall) continue } @@ -631,6 +628,7 @@ func (handler *RpcHandler) RawGoNode(rpcProcessorType RpcProcessorType,nodeId in if pCall.Err!=nil { err = pCall.Err } + pClientList[i].RemovePending(pCall.Seq) ReleaseCall(pCall) } diff --git a/rpc/server.go b/rpc/server.go index 38352c7..10dc3ee 100644 --- a/rpc/server.go +++ b/rpc/server.go @@ -127,9 +127,7 @@ func (agent *RpcAgent) Run() { } //解析head - req := MakeRpcRequest() - req.rpcProcessor = processor - req.RpcRequestData = processor.MakeRpcRequest(0,"",false,nil) + req := MakeRpcRequest(processor,0,"",false,nil) err = processor.Unmarshal(data[1:],req.RpcRequestData) agent.conn.ReleaseReadMsg(data) if err != nil { @@ -137,12 +135,10 @@ func (agent *RpcAgent) Run() { if req.RpcRequestData.GetSeq()>0 { rpcError := RpcError(err.Error()) agent.WriteResponse(processor,req.RpcRequestData.GetServiceMethod(),req.RpcRequestData.GetSeq(),nil,rpcError) - processor.ReleaseRpcRequest(req.RpcRequestData) ReleaseRpcRequest(req) continue }else{ //will close tcpconn - processor.ReleaseRpcRequest(req.RpcRequestData) ReleaseRpcRequest(req) break } @@ -153,7 +149,6 @@ func (agent *RpcAgent) Run() { if len(serviceMethod)!=2 { rpcError := RpcError("rpc request req.ServiceMethod is error") agent.WriteResponse(processor,req.RpcRequestData.GetServiceMethod(),req.RpcRequestData.GetSeq(),nil,rpcError) - processor.ReleaseRpcRequest(req.RpcRequestData) ReleaseRpcRequest(req) log.Debug("rpc request req.ServiceMethod is error") continue @@ -163,7 +158,6 @@ func (agent *RpcAgent) Run() { if rpcHandler== nil { rpcError := RpcError(fmt.Sprintf("service method %s not config!", req.RpcRequestData.GetServiceMethod())) agent.WriteResponse(processor,req.RpcRequestData.GetServiceMethod(),req.RpcRequestData.GetSeq(),nil,rpcError) - processor.ReleaseRpcRequest(req.RpcRequestData) ReleaseRpcRequest(req) log.Error("service method %s not config!", req.RpcRequestData.GetServiceMethod()) continue @@ -183,7 +177,6 @@ func (agent *RpcAgent) Run() { agent.WriteResponse(processor,req.RpcRequestData.GetServiceMethod(),req.RpcRequestData.GetSeq(),nil,rpcError) } - processor.ReleaseRpcRequest(req.RpcRequestData) ReleaseRpcRequest(req) } } @@ -238,30 +231,30 @@ func (server *Server) selfNodeRpcHandlerGo(processor IRpcProcessor,client *Clien if inputArgs!= nil { inputArgs.DoGc() } + pCall.Seq = 0 pCall.Err = fmt.Errorf("service method %s not config!", serviceMethod) log.Error("%s",pCall.Err.Error()) pCall.done <- pCall + return pCall } - req := MakeRpcRequest() + + if processor == nil { + _,processor = GetProcessorType(args) + } + req := MakeRpcRequest(processor,0, serviceMethod,noReply,nil) req.bLocalRequest = true req.localParam = args req.localReply = reply req.inputArgs = inputArgs - if processor == nil { - _,processor = GetProcessorType(args) - } - req.RpcRequestData = processor.MakeRpcRequest(0, serviceMethod,noReply,nil) - req.rpcProcessor = processor if noReply == false { client.AddPending(pCall) req.requestHandle = func(Returns interface{},Err RpcError){ v := client.RemovePending(pCall.Seq) if v == nil { log.Error("rpcClient cannot find seq %d in pending",pCall.Seq) - ReleaseCall(pCall) return } if len(Err) == 0 { @@ -275,7 +268,6 @@ func (server *Server) selfNodeRpcHandlerGo(processor IRpcProcessor,client *Clien err := rpcHandler.PushRequest(req) if err != nil { - processor.ReleaseRpcRequest(req.RpcRequestData) ReleaseRpcRequest(req) pCall.Err = err pCall.done <- pCall @@ -285,35 +277,35 @@ func (server *Server) selfNodeRpcHandlerGo(processor IRpcProcessor,client *Clien } func (server *Server) selfNodeRpcHandlerAsyncGo(client *Client,callerRpcHandler IRpcHandler,noReply bool,handlerName string,serviceMethod string,args interface{},reply interface{},callback reflect.Value) error { - pCall := MakeCall() - pCall.Seq = client.generateSeq() - pCall.rpcHandler = callerRpcHandler - pCall.callback = &callback - pCall.Reply = reply rpcHandler := server.rpcHandleFinder.FindRpcHandler(handlerName) if rpcHandler== nil { err := fmt.Errorf("service method %s not config!", serviceMethod) log.Error("%+v",err) - ReleaseCall(pCall) return err } - req := MakeRpcRequest() + _,processor := GetProcessorType(args) + req := MakeRpcRequest(processor,0,serviceMethod,noReply,nil) req.localParam = args req.localReply = reply req.bLocalRequest = true - _,processor := GetProcessorType(args) - req.rpcProcessor =processor - req.RpcRequestData = processor.MakeRpcRequest(0,serviceMethod,noReply,nil) + + //req.rpcProcessor =processor + //req.RpcRequestData = processor.MakeRpcRequest(0,serviceMethod,noReply,nil) if noReply == false { + callSeq := client.generateSeq() + pCall := MakeCall() + pCall.Seq = callSeq + pCall.rpcHandler = callerRpcHandler + pCall.callback = &callback + pCall.Reply = reply + client.AddPending(pCall) req.requestHandle = func(Returns interface{},Err RpcError){ - v := client.RemovePending(pCall.Seq) + v := client.RemovePending(callSeq) if v == nil { log.Error("rpcClient cannot find seq %d in pending",pCall.Seq) - ReleaseCall(pCall) - processor.ReleaseRpcRequest(req.RpcRequestData) - ReleaseRpcRequest(req) + //ReleaseCall(pCall) return } if len(Err) == 0 { @@ -331,9 +323,7 @@ func (server *Server) selfNodeRpcHandlerAsyncGo(client *Client,callerRpcHandler err := rpcHandler.PushRequest(req) if err != nil { - processor.ReleaseRpcRequest(req.RpcRequestData) ReleaseRpcRequest(req) - return err }