From da18cf3158b35edb1ffadd0e47d61aba51cb688b Mon Sep 17 00:00:00 2001 From: orgin Date: Mon, 24 Oct 2022 16:51:04 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=E6=9C=8D=E5=8A=A1=E5=86=85?= =?UTF-8?q?=E7=9A=84RPC=E8=B0=83=E7=94=A8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- rpc/rpchandler.go | 72 +++++++++++++++++++++++++++++++++++------------ rpc/server.go | 6 ++-- 2 files changed, 58 insertions(+), 20 deletions(-) diff --git a/rpc/rpchandler.go b/rpc/rpchandler.go index ba0438d..b2305e0 100644 --- a/rpc/rpchandler.go +++ b/rpc/rpchandler.go @@ -85,7 +85,7 @@ type IRpcHandler interface { GetRpcHandler() IRpcHandler HandlerRpcRequest(request *RpcRequest) HandlerRpcResponseCB(call *Call) - CallMethod(ServiceMethod string, param interface{}, reply interface{}) error + CallMethod(ServiceMethod string, param interface{},callBack reflect.Value, reply interface{}) error AsyncCall(serviceMethod string, args interface{}, callback interface{}) error Call(serviceMethod string, args interface{}, reply interface{}) error Go(serviceMethod string, args interface{}) error @@ -299,7 +299,7 @@ func (handler *RpcHandler) HandlerRpcRequest(request *RpcRequest) { } } -func (handler *RpcHandler) CallMethod(ServiceMethod string, param interface{}, reply interface{}) error { +func (handler *RpcHandler) CallMethod(ServiceMethod string, param interface{},callBack reflect.Value, reply interface{}) error { var err error v, ok := handler.mapFunctions[ServiceMethod] if ok == false { @@ -309,16 +309,57 @@ func (handler *RpcHandler) CallMethod(ServiceMethod string, param interface{}, r } var paramList []reflect.Value - paramList = append(paramList, reflect.ValueOf(handler.GetRpcHandler())) //接受者 - paramList = append(paramList, reflect.ValueOf(param)) - paramList = append(paramList, reflect.ValueOf(reply)) //输出参数 + var returnValues []reflect.Value + if v.hasResponder == true { + paramList = append(paramList, reflect.ValueOf(handler.GetRpcHandler())) //接受者 + //有返回值时 + if reply != nil { + hander :=func(Returns interface{}, Err RpcError) { + _, processor := GetProcessorType(Returns) + bytes,errs := processor.Marshal(Returns) + if errs != nil { + callBack.Call([]reflect.Value{reflect.ValueOf(reply), reflect.ValueOf(errs)}) + } - returnValues := v.method.Func.Call(paramList) - errInter := returnValues[0].Interface() - if errInter != nil { - err = errInter.(error) + errs = processor.Unmarshal(bytes,reply) + + if len(Err)==0{ + callBack.Call([]reflect.Value{reflect.ValueOf(reply), nilError}) + }else{ + callBack.Call([]reflect.Value{reflect.ValueOf(reply), reflect.ValueOf(errors.New(Err.Error()))}) + } + + } + + paramList = append(paramList, reflect.ValueOf(hander)) + }else{//无返回值时,是一个requestHandlerNull空回调 + paramList = append(paramList, callBack) + } + paramList = append(paramList, reflect.ValueOf(param)) + + //rpc函数被调用 + returnValues = v.method.Func.Call(paramList) + + //判断返回值是否错误,有错误时则回调 + errInter := returnValues[0].Interface() + if errInter != nil { + err = errInter.(error) + callBack.Call([]reflect.Value{reflect.ValueOf(reply), reflect.ValueOf(err)}) + } + }else{ + paramList = append(paramList, reflect.ValueOf(handler.GetRpcHandler())) //接受者 + paramList = append(paramList, reflect.ValueOf(param)) + paramList = append(paramList, reflect.ValueOf(reply)) //输出参数 + returnValues = v.method.Func.Call(paramList) + errInter := returnValues[0].Interface() + if errInter != nil { + err = errInter.(error) + callBack.Call([]reflect.Value{reflect.ValueOf(reply), reflect.ValueOf(err)}) + } } + + return err } @@ -356,7 +397,7 @@ func (handler *RpcHandler) goRpc(processor IRpcProcessor, bCast bool, nodeId int serviceName := serviceMethod[:findIndex] if serviceName == handler.rpcHandler.GetName() { //自己服务调用 //调用自己rpcHandler处理器 - return pLocalRpcServer.myselfRpcHandlerGo(serviceName, serviceMethod, args, nil) + return pLocalRpcServer.myselfRpcHandlerGo(serviceName, serviceMethod, args, requestHandlerNull,nil) } //其他的rpcHandler的处理器 pCall := pLocalRpcServer.selfNodeRpcHandlerGo(processor, pClientList[i], true, serviceName, 0, serviceMethod, args, nil, nil) @@ -410,7 +451,7 @@ func (handler *RpcHandler) callRpc(nodeId int, serviceMethod string, args interf serviceName := serviceMethod[:findIndex] if serviceName == handler.rpcHandler.GetName() { //自己服务调用 //调用自己rpcHandler处理器 - return pLocalRpcServer.myselfRpcHandlerGo(serviceName, serviceMethod, args, reply) + return pLocalRpcServer.myselfRpcHandlerGo(serviceName, serviceMethod, args,requestHandlerNull, reply) } //其他的rpcHandler的处理器 pCall := pLocalRpcServer.selfNodeRpcHandlerGo(nil, pClient, false, serviceName, 0, serviceMethod, args, reply, nil) @@ -489,12 +530,7 @@ func (handler *RpcHandler) asyncCallRpc(nodeId int, serviceMethod string, args i serviceName := serviceMethod[:findIndex] //调用自己rpcHandler处理器 if serviceName == handler.rpcHandler.GetName() { //自己服务调用 - err := pLocalRpcServer.myselfRpcHandlerGo(serviceName, serviceMethod, args, reply) - if err == nil { - fVal.Call([]reflect.Value{reflect.ValueOf(reply), nilError}) - } else { - fVal.Call([]reflect.Value{reflect.ValueOf(reply), reflect.ValueOf(err)}) - } + return pLocalRpcServer.myselfRpcHandlerGo(serviceName, serviceMethod, args,fVal ,reply) } //其他的rpcHandler的处理器 @@ -571,7 +607,7 @@ func (handler *RpcHandler) RawGoNode(rpcProcessorType RpcProcessorType, nodeId i pLocalRpcServer := handler.funcRpcServer() //调用自己rpcHandler处理器 if serviceName == handler.rpcHandler.GetName() { //自己服务调用 - err := pLocalRpcServer.myselfRpcHandlerGo(serviceName, serviceName, rawArgs.GetRawData(), nil) + err := pLocalRpcServer.myselfRpcHandlerGo(serviceName, serviceName, rawArgs.GetRawData(), requestHandlerNull,nil) //args.DoGc() return err } diff --git a/rpc/server.go b/rpc/server.go index 8c89a74..3bacd6a 100644 --- a/rpc/server.go +++ b/rpc/server.go @@ -234,7 +234,7 @@ func (server *Server) NewAgent(c *network.TCPConn) network.Agent { return agent } -func (server *Server) myselfRpcHandlerGo(handlerName string, serviceMethod string, args interface{}, reply interface{}) error { +func (server *Server) myselfRpcHandlerGo(handlerName string, serviceMethod string, args interface{},callBack reflect.Value, reply interface{}) error { rpcHandler := server.rpcHandleFinder.FindRpcHandler(handlerName) if rpcHandler == nil { err := errors.New("service method " + serviceMethod + " not config!") @@ -242,7 +242,9 @@ func (server *Server) myselfRpcHandlerGo(handlerName string, serviceMethod strin return err } - return rpcHandler.CallMethod(serviceMethod, args, reply) + + + return rpcHandler.CallMethod(serviceMethod, args,callBack, reply) } func (server *Server) selfNodeRpcHandlerGo(processor IRpcProcessor, client *Client, noReply bool, handlerName string, rpcMethodId uint32, serviceMethod string, args interface{}, reply interface{}, rawArgs []byte) *Call {