From 9c9f72d559048928b48a13b1fcc4bfd13e5ed357 Mon Sep 17 00:00:00 2001 From: duanhf2012 Date: Tue, 31 Mar 2020 19:28:16 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=E5=BC=82=E6=AD=A5=E8=B0=83?= =?UTF-8?q?=E7=94=A8,=E9=94=99=E8=AF=AF=E9=80=9A=E8=BF=87=E5=9B=9E?= =?UTF-8?q?=E8=B0=83=E8=BF=94=E5=9B=9E?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- rpc/rpchandler.go | 39 +++++++++++++++++++++++++++------------ rpc/server.go | 5 ++++- 2 files changed, 31 insertions(+), 13 deletions(-) diff --git a/rpc/rpchandler.go b/rpc/rpchandler.go index 8068566..9a36551 100644 --- a/rpc/rpchandler.go +++ b/rpc/rpchandler.go @@ -33,7 +33,7 @@ type IRpcHandler interface { GetName() string InitRpcHandler(rpcHandler IRpcHandler,getClientFun FuncRpcClient,getServerFun FuncRpcServer) GetRpcHandler() IRpcHandler - PushRequest(callinfo *RpcRequest) + PushRequest(callinfo *RpcRequest) error HandlerRpcRequest(request *RpcRequest) HandlerRpcResponeCB(call *Call) @@ -131,8 +131,13 @@ func (slf *RpcHandler) RegisterRpc(rpcHandler IRpcHandler) error { return nil } -func (slf *RpcHandler) PushRequest(req *RpcRequest) { +func (slf *RpcHandler) PushRequest(req *RpcRequest) error{ + if len(slf.callRequest) >= cap(slf.callRequest){ + return fmt.Errorf("RpcHandler %s Rpc Channel is full.",slf.GetName()) + } + slf.callRequest <- req + return nil } func (slf *RpcHandler) GetRpcRequestChan() (chan *RpcRequest) { @@ -181,11 +186,8 @@ func (slf *RpcHandler) HandlerRpcRequest(request *RpcRequest) { var oParam reflect.Value paramList = append(paramList,reflect.ValueOf(slf.GetRpcHandler())) //接受者 - if request.localReply!=nil { - oParam = reflect.ValueOf(request.localReply) - }else{ - oParam = reflect.New(v.oParam.Type().Elem()) - } + oParam = reflect.New(v.oParam.Type().Elem()) + paramList = append(paramList,reflect.ValueOf(v.iparam)) paramList = append(paramList,oParam) //输出参数 @@ -324,12 +326,16 @@ func (slf *RpcHandler) asyncCallRpc(nodeid int,serviceMethod string,mutiCoroutin reply := reflect.New(fVal.Type().In(0).Elem()).Interface() pClientList,err := slf.funcRpcClient(nodeid,serviceMethod) if err != nil { + fVal.Call([]reflect.Value{reflect.ValueOf(reply),reflect.ValueOf(err)}) log.Error("Call serviceMethod is error:%+v!",err) - return err + return nil } + if len(pClientList) > 1 { + err := fmt.Errorf("Cannot call more then 1 node!") + fVal.Call([]reflect.Value{reflect.ValueOf(reply),reflect.ValueOf(err)}) log.Error("Cannot call more then 1 node!") - return fmt.Errorf("Cannot call more then 1 node!") + return nil } //2.rpcclient调用 @@ -341,8 +347,9 @@ func (slf *RpcHandler) asyncCallRpc(nodeid int,serviceMethod string,mutiCoroutin sMethod := strings.Split(serviceMethod,".") if len(sMethod)!=2 { err := fmt.Errorf("Call serviceMethod %s is error!",serviceMethod) + fVal.Call([]reflect.Value{reflect.ValueOf(reply),reflect.ValueOf(err)}) log.Error("%+v",err) - return err + return nil } //调用自己rpcHandler处理器 if sMethod[0] == slf.rpcHandler.GetName() { //自己服务调用 @@ -357,7 +364,11 @@ func (slf *RpcHandler) asyncCallRpc(nodeid int,serviceMethod string,mutiCoroutin //其他的rpcHandler的处理器 if callback!=nil { - return pLocalRpcServer.rpcHandlerAsyncGo(slf,false,mutiCoroutine,sMethod[0],sMethod[1],args,reply,fVal) + err = pLocalRpcServer.rpcHandlerAsyncGo(slf,false,mutiCoroutine,sMethod[0],sMethod[1],args,reply,fVal) + if err != nil { + fVal.Call([]reflect.Value{reflect.ValueOf(reply),reflect.ValueOf(err)}) + } + return nil } pCall := pLocalRpcServer.rpcHandlerGo(false,mutiCoroutine,sMethod[0],sMethod[1],args,reply) pResult := pCall.Done() @@ -365,7 +376,11 @@ func (slf *RpcHandler) asyncCallRpc(nodeid int,serviceMethod string,mutiCoroutin } //跨node调用 - return pClient.AsycGo(slf,mutiCoroutine,serviceMethod,fVal,args,reply) + err = pClient.AsycGo(slf,mutiCoroutine,serviceMethod,fVal,args,reply) + if err != nil { + fVal.Call([]reflect.Value{reflect.ValueOf(reply),reflect.ValueOf(err)}) + } + return nil } func (slf *RpcHandler) GetName() string{ diff --git a/rpc/server.go b/rpc/server.go index 341ae7f..62003d3 100644 --- a/rpc/server.go +++ b/rpc/server.go @@ -253,7 +253,10 @@ func (slf *Server) rpcHandlerAsyncGo(callerRpcHandler IRpcHandler,noReply bool,m if mutiCoroutine == true { go rpcHandler.HandlerRpcRequest(&req) }else{ - rpcHandler.PushRequest(&req) + err := rpcHandler.PushRequest(&req) + if err != nil { + return err + } } return nil