From dd4aaf9c571d40e4634bddd69d180f0d8c8cfc5d Mon Sep 17 00:00:00 2001 From: duanhf2012 <6549168@qq.com> Date: Fri, 28 Jul 2023 17:38:52 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96rpc=E8=B6=85=E6=97=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- rpc/client.go | 2 +- rpc/lclient.go | 15 ++++++++------- rpc/rclient.go | 5 +++-- rpc/rpc.go | 2 ++ rpc/rpchandler.go | 2 +- 5 files changed, 15 insertions(+), 11 deletions(-) diff --git a/rpc/client.go b/rpc/client.go index d7ba594..41768e5 100644 --- a/rpc/client.go +++ b/rpc/client.go @@ -32,7 +32,7 @@ type IRealClient interface { AsyncCall(timeout time.Duration,rpcHandler IRpcHandler, serviceMethod string, callback reflect.Value, args interface{}, replyParam interface{},cancelable bool) (CancelRpc,error) Go(timeout time.Duration,rpcHandler IRpcHandler, noReply bool, serviceMethod string, args interface{}, reply interface{}) *Call - RawGo(rpcHandler IRpcHandler,processor IRpcProcessor, noReply bool, rpcMethodId uint32, serviceMethod string, rawArgs []byte, reply interface{}) *Call + RawGo(timeout time.Duration,rpcHandler IRpcHandler,processor IRpcProcessor, noReply bool, rpcMethodId uint32, serviceMethod string, rawArgs []byte, reply interface{}) *Call IsConnected() bool Run() diff --git a/rpc/lclient.go b/rpc/lclient.go index 9ea3c90..b49c545 100644 --- a/rpc/lclient.go +++ b/rpc/lclient.go @@ -70,15 +70,16 @@ func (lc *LClient) Go(timeout time.Duration,rpcHandler IRpcHandler,noReply bool, } -func (rc *LClient) RawGo(rpcHandler IRpcHandler,processor IRpcProcessor, noReply bool, rpcMethodId uint32, serviceName string, rawArgs []byte, reply interface{}) *Call { +func (rc *LClient) RawGo(timeout time.Duration,rpcHandler IRpcHandler,processor IRpcProcessor, noReply bool, rpcMethodId uint32, serviceName string, rawArgs []byte, reply interface{}) *Call { pLocalRpcServer := rpcHandler.GetRpcServer()() - - call := MakeCall() - call.ServiceMethod = serviceName - call.Reply = reply - + //服务自我调用 if serviceName == rpcHandler.GetName() { + call := MakeCall() + call.ServiceMethod = serviceName + call.Reply = reply + call.TimeOut = timeout + err := pLocalRpcServer.myselfRpcHandlerGo(rc.selfClient,serviceName, serviceName, rawArgs, requestHandlerNull,nil) call.Err = err call.done <- call @@ -87,7 +88,7 @@ func (rc *LClient) RawGo(rpcHandler IRpcHandler,processor IRpcProcessor, noReply } //其他的rpcHandler的处理器 - return pLocalRpcServer.selfNodeRpcHandlerGo(DefaultRpcTimeout,processor,rc.selfClient, true, serviceName, rpcMethodId, serviceName, nil, nil, rawArgs) + return pLocalRpcServer.selfNodeRpcHandlerGo(timeout,processor,rc.selfClient, true, serviceName, rpcMethodId, serviceName, nil, nil, rawArgs) } diff --git a/rpc/rclient.go b/rpc/rclient.go index 774a9f9..990c727 100644 --- a/rpc/rclient.go +++ b/rpc/rclient.go @@ -52,14 +52,15 @@ func (rc *RClient) Go(timeout time.Duration,rpcHandler IRpcHandler,noReply bool, return call } - return rc.RawGo(rpcHandler,processor, noReply, 0, serviceMethod, InParam, reply) + return rc.RawGo(timeout,rpcHandler,processor, noReply, 0, serviceMethod, InParam, reply) } -func (rc *RClient) RawGo(rpcHandler IRpcHandler,processor IRpcProcessor, noReply bool, rpcMethodId uint32, serviceMethod string, rawArgs []byte, reply interface{}) *Call { +func (rc *RClient) RawGo(timeout time.Duration,rpcHandler IRpcHandler,processor IRpcProcessor, noReply bool, rpcMethodId uint32, serviceMethod string, rawArgs []byte, reply interface{}) *Call { call := MakeCall() call.ServiceMethod = serviceMethod call.Reply = reply call.Seq = rc.selfClient.generateSeq() + call.TimeOut = timeout request := MakeRpcRequest(processor, call.Seq, rpcMethodId, serviceMethod, noReply, rawArgs) bytes, err := processor.Marshal(request.RpcRequestData) diff --git a/rpc/rpc.go b/rpc/rpc.go index 5ebb9c6..04f91d2 100644 --- a/rpc/rpc.go +++ b/rpc/rpc.go @@ -133,6 +133,8 @@ func (call *Call) Clear() *Call{ call.connId = 0 call.callback = nil call.rpcHandler = nil + call.TimeOut = 0 + return call } diff --git a/rpc/rpchandler.go b/rpc/rpchandler.go index 3c54456..b02634a 100644 --- a/rpc/rpchandler.go +++ b/rpc/rpchandler.go @@ -599,7 +599,7 @@ func (handler *RpcHandler) RawGoNode(rpcProcessorType RpcProcessorType, nodeId i //如果调用本结点服务 for i := 0; i < count; i++ { //跨node调用 - pCall := handler.pClientList[i].RawGo(handler.rpcHandler,processor, true, rpcMethodId, serviceName, rawArgs, nil) + pCall := handler.pClientList[i].RawGo(DefaultRpcTimeout,handler.rpcHandler,processor, true, rpcMethodId, serviceName, rawArgs, nil) if pCall.Err != nil { err = pCall.Err }