diff --git a/rpc/rpchandler.go b/rpc/rpchandler.go index b2305e0..0e0403b 100644 --- a/rpc/rpchandler.go +++ b/rpc/rpchandler.go @@ -85,7 +85,7 @@ type IRpcHandler interface { GetRpcHandler() IRpcHandler HandlerRpcRequest(request *RpcRequest) HandlerRpcResponseCB(call *Call) - CallMethod(ServiceMethod string, param interface{},callBack reflect.Value, reply interface{}) error + CallMethod(client *Client,ServiceMethod string, param interface{},callBack reflect.Value, reply interface{}) error AsyncCall(serviceMethod string, args interface{}, callback interface{}) error Call(serviceMethod string, args interface{}, reply interface{}) error Go(serviceMethod string, args interface{}) error @@ -299,7 +299,7 @@ func (handler *RpcHandler) HandlerRpcRequest(request *RpcRequest) { } } -func (handler *RpcHandler) CallMethod(ServiceMethod string, param interface{},callBack reflect.Value, reply interface{}) error { +func (handler *RpcHandler) CallMethod(client *Client,ServiceMethod string, param interface{},callBack reflect.Value, reply interface{}) error { var err error v, ok := handler.mapFunctions[ServiceMethod] if ok == false { @@ -310,27 +310,60 @@ func (handler *RpcHandler) CallMethod(ServiceMethod string, param interface{},ca var paramList []reflect.Value var returnValues []reflect.Value + var pCall *Call + var callSeq uint64 if v.hasResponder == true { paramList = append(paramList, reflect.ValueOf(handler.GetRpcHandler())) //接受者 + pCall = MakeCall() + pCall.Seq = client.generateSeq() + callSeq = pCall.Seq + client.AddPending(pCall) + //有返回值时 if reply != nil { + //如果是Call同步调用 hander :=func(Returns interface{}, Err RpcError) { + //如果返回错误 + if len(Err)!=0 { + if callBack!=requestHandlerNull { + callBack.Call([]reflect.Value{reflect.ValueOf(reply), reflect.ValueOf(errors.New(Err.Error()))}) + }else{ + rpcCall := client.FindPending(callSeq) + //如果找不到,说明已经超时 + if rpcCall!= nil { + rpcCall.Err = errors.New(Err.Error()) + rpcCall.Done() + }else{ + log.SError("cannot find call seq ",callSeq) + } + } + return + } + + //解析数据 _, processor := GetProcessorType(Returns) bytes,errs := processor.Marshal(Returns) - if errs != nil { - callBack.Call([]reflect.Value{reflect.ValueOf(reply), reflect.ValueOf(errs)}) + if errs == nil { + errs = processor.Unmarshal(bytes,reply) } - errs = processor.Unmarshal(bytes,reply) + if callBack!=requestHandlerNull { + if errs != nil { + callBack.Call([]reflect.Value{reflect.ValueOf(reply), reflect.ValueOf(errs)}) + } else{ + callBack.Call([]reflect.Value{reflect.ValueOf(reply), nilError}) + } + } - if len(Err)==0{ - callBack.Call([]reflect.Value{reflect.ValueOf(reply), nilError}) + rpcCall := client.FindPending(callSeq) + //如果找不到,说明已经超时 + if rpcCall!= nil { + rpcCall.Err = errs + rpcCall.done<-rpcCall }else{ - callBack.Call([]reflect.Value{reflect.ValueOf(reply), reflect.ValueOf(errors.New(Err.Error()))}) + log.SError("cannot find call seq ",callSeq) } - } - paramList = append(paramList, reflect.ValueOf(hander)) }else{//无返回值时,是一个requestHandlerNull空回调 paramList = append(paramList, callBack) @@ -342,23 +375,45 @@ func (handler *RpcHandler) CallMethod(ServiceMethod string, param interface{},ca //判断返回值是否错误,有错误时则回调 errInter := returnValues[0].Interface() - if errInter != nil { + if errInter != nil && callBack!=requestHandlerNull{ err = errInter.(error) callBack.Call([]reflect.Value{reflect.ValueOf(reply), reflect.ValueOf(err)}) } }else{ paramList = append(paramList, reflect.ValueOf(handler.GetRpcHandler())) //接受者 paramList = append(paramList, reflect.ValueOf(param)) - paramList = append(paramList, reflect.ValueOf(reply)) //输出参数 + + //被调用RPC函数有返回值时 + if v.outParamValue.IsValid() { + //不带返回值参数的RPC函数 + if reply == nil { + paramList = append(paramList, reflect.New(v.outParamValue.Type().Elem())) + }else{ + //带返回值参数的RPC函数 + paramList = append(paramList, reflect.ValueOf(reply)) //输出参数 + } + } + returnValues = v.method.Func.Call(paramList) errInter := returnValues[0].Interface() - if errInter != nil { - err = errInter.(error) - callBack.Call([]reflect.Value{reflect.ValueOf(reply), reflect.ValueOf(err)}) + + //如果无回调 + if callBack != requestHandlerNull { + valErr := nilError + if errInter != nil { + err = errInter.(error) + valErr = reflect.ValueOf(err) + } + + callBack.Call([]reflect.Value{reflect.ValueOf(reply),valErr }) } } - + if pCall != nil { + err = pCall.Done().Err + client.RemovePending(pCall.Seq) + ReleaseCall(pCall) + } return err } @@ -397,7 +452,7 @@ func (handler *RpcHandler) goRpc(processor IRpcProcessor, bCast bool, nodeId int serviceName := serviceMethod[:findIndex] if serviceName == handler.rpcHandler.GetName() { //自己服务调用 //调用自己rpcHandler处理器 - return pLocalRpcServer.myselfRpcHandlerGo(serviceName, serviceMethod, args, requestHandlerNull,nil) + return pLocalRpcServer.myselfRpcHandlerGo(pClientList[i],serviceName, serviceMethod, args, requestHandlerNull,nil) } //其他的rpcHandler的处理器 pCall := pLocalRpcServer.selfNodeRpcHandlerGo(processor, pClientList[i], true, serviceName, 0, serviceMethod, args, nil, nil) @@ -451,7 +506,7 @@ func (handler *RpcHandler) callRpc(nodeId int, serviceMethod string, args interf serviceName := serviceMethod[:findIndex] if serviceName == handler.rpcHandler.GetName() { //自己服务调用 //调用自己rpcHandler处理器 - return pLocalRpcServer.myselfRpcHandlerGo(serviceName, serviceMethod, args,requestHandlerNull, reply) + return pLocalRpcServer.myselfRpcHandlerGo(pClient,serviceName, serviceMethod, args,requestHandlerNull, reply) } //其他的rpcHandler的处理器 pCall := pLocalRpcServer.selfNodeRpcHandlerGo(nil, pClient, false, serviceName, 0, serviceMethod, args, reply, nil) @@ -530,7 +585,7 @@ func (handler *RpcHandler) asyncCallRpc(nodeId int, serviceMethod string, args i serviceName := serviceMethod[:findIndex] //调用自己rpcHandler处理器 if serviceName == handler.rpcHandler.GetName() { //自己服务调用 - return pLocalRpcServer.myselfRpcHandlerGo(serviceName, serviceMethod, args,fVal ,reply) + return pLocalRpcServer.myselfRpcHandlerGo(pClient,serviceName, serviceMethod, args,fVal ,reply) } //其他的rpcHandler的处理器 @@ -607,7 +662,7 @@ func (handler *RpcHandler) RawGoNode(rpcProcessorType RpcProcessorType, nodeId i pLocalRpcServer := handler.funcRpcServer() //调用自己rpcHandler处理器 if serviceName == handler.rpcHandler.GetName() { //自己服务调用 - err := pLocalRpcServer.myselfRpcHandlerGo(serviceName, serviceName, rawArgs.GetRawData(), requestHandlerNull,nil) + err := pLocalRpcServer.myselfRpcHandlerGo(handler.pClientList[i],serviceName, serviceName, rawArgs.GetRawData(), requestHandlerNull,nil) //args.DoGc() return err } diff --git a/rpc/server.go b/rpc/server.go index 3bacd6a..6d5dcc5 100644 --- a/rpc/server.go +++ b/rpc/server.go @@ -9,6 +9,7 @@ import ( "net" "reflect" "strings" + "time" ) type RpcProcessorType uint8 @@ -62,6 +63,7 @@ func (server *Server) Init(rpcHandleFinder RpcHandleFinder) { server.rpcServer = &network.TCPServer{} } +const Default_ReadWriteDeadline = 10*time.Second func (server *Server) Start(listenAddr string, maxRpcParamLen uint32) { splitAddr := strings.Split(listenAddr, ":") @@ -82,6 +84,8 @@ func (server *Server) Start(listenAddr string, maxRpcParamLen uint32) { server.rpcServer.PendingWriteNum = 2000000 server.rpcServer.NewAgent = server.NewAgent server.rpcServer.LittleEndian = LittleEndian + server.rpcServer.WriteDeadline = network.Default_WriteDeadline + server.rpcServer.ReadDeadline = network.Default_WriteDeadline server.rpcServer.Start() } @@ -234,7 +238,7 @@ func (server *Server) NewAgent(c *network.TCPConn) network.Agent { return agent } -func (server *Server) myselfRpcHandlerGo(handlerName string, serviceMethod string, args interface{},callBack reflect.Value, reply interface{}) error { +func (server *Server) myselfRpcHandlerGo(client *Client,handlerName string, serviceMethod string, args interface{},callBack reflect.Value, reply interface{}) error { rpcHandler := server.rpcHandleFinder.FindRpcHandler(handlerName) if rpcHandler == nil { err := errors.New("service method " + serviceMethod + " not config!") @@ -244,7 +248,7 @@ func (server *Server) myselfRpcHandlerGo(handlerName string, serviceMethod strin - return rpcHandler.CallMethod(serviceMethod, args,callBack, reply) + return rpcHandler.CallMethod(client,serviceMethod, args,callBack, reply) } func (server *Server) selfNodeRpcHandlerGo(processor IRpcProcessor, client *Client, noReply bool, handlerName string, rpcMethodId uint32, serviceMethod string, args interface{}, reply interface{}, rawArgs []byte) *Call { @@ -255,8 +259,8 @@ func (server *Server) selfNodeRpcHandlerGo(processor IRpcProcessor, client *Clie if rpcHandler == nil { pCall.Seq = 0 pCall.Err = errors.New("service method " + serviceMethod + " not config!") - log.SError(pCall.Err.Error()) pCall.done <- pCall + log.SError(pCall.Err.Error()) return pCall } @@ -280,33 +284,34 @@ func (server *Server) selfNodeRpcHandlerGo(processor IRpcProcessor, client *Clie if noReply == false { client.AddPending(pCall) + callSeq := pCall.Seq req.requestHandle = func(Returns interface{}, Err RpcError) { if reply != nil && Returns != reply && Returns != nil { byteReturns, err := req.rpcProcessor.Marshal(Returns) if err != nil { - log.SError("returns data cannot be marshal ", pCall.Seq) + log.SError("returns data cannot be marshal ", callSeq) ReleaseRpcRequest(req) } err = req.rpcProcessor.Unmarshal(byteReturns, reply) if err != nil { - log.SError("returns data cannot be Unmarshal ", pCall.Seq) + log.SError("returns data cannot be Unmarshal ", callSeq) ReleaseRpcRequest(req) } } - v := client.RemovePending(pCall.Seq) + v := client.RemovePending(callSeq) if v == nil { - log.SError("rpcClient cannot find seq ", pCall.Seq, " in pending") + log.SError("rpcClient cannot find seq ",callSeq, " in pending") ReleaseRpcRequest(req) return } if len(Err) == 0 { - pCall.Err = nil + v.Err = nil } else { - pCall.Err = Err + v.Err = Err } - pCall.done <- pCall + v.done <- v ReleaseRpcRequest(req) } }