diff --git a/rpc/rpc.go b/rpc/rpc.go index 291b5d3..372d8e9 100644 --- a/rpc/rpc.go +++ b/rpc/rpc.go @@ -10,10 +10,8 @@ type RpcRequest struct { ref bool RpcRequestData IRpcRequestData - bLocalRequest bool + inParam interface{} localReply interface{} - localParam interface{} //本地调用的参数列表 - inputArgs IRawInputArgs requestHandle RequestHandler callback *reflect.Value @@ -87,11 +85,9 @@ func init(){ func (slf *RpcRequest) Clear() *RpcRequest{ slf.RpcRequestData = nil slf.localReply = nil - slf.localParam = nil + slf.inParam = nil slf.requestHandle = nil slf.callback = nil - slf.bLocalRequest = false - slf.inputArgs = nil slf.rpcProcessor = nil return slf } diff --git a/rpc/rpchandler.go b/rpc/rpchandler.go index 21ee0fa..1b8e3c2 100644 --- a/rpc/rpchandler.go +++ b/rpc/rpchandler.go @@ -1,6 +1,7 @@ package rpc import ( + "errors" "fmt" "github.com/duanhf2012/origin/log" "reflect" @@ -45,7 +46,12 @@ type RpcMethodInfo struct { rpcProcessorType RpcProcessorType } -type RawRpcCallBack func(rawData []byte) +type RawRpcCallBack interface { + Unmarshal(data []byte) (interface{},error) + CB(data interface{}) +} + +//type RawRpcCallBack func(rawData []byte) type RpcHandler struct { callRequest chan *RpcRequest rpcHandler IRpcHandler @@ -81,11 +87,13 @@ type IRpcHandler interface { AsyncCallNode(nodeId int,serviceMethod string,args interface{},callback interface{}) error CallNode(nodeId int,serviceMethod string,args interface{},reply interface{}) error GoNode(nodeId int,serviceMethod string,args interface{}) error - RawGoNode(rpcProcessorType RpcProcessorType,nodeId int,rpcMethodId uint32,serviceName string,args IRawInputArgs) error + RawGoNode(rpcProcessorType RpcProcessorType,nodeId int,rpcMethodId uint32,serviceName string,rawArgs []byte) error CastGo(serviceMethod string,args interface{}) IsSingleCoroutine() bool + UnmarshalInParam(rpcProcessor IRpcProcessor,serviceMethod string,rawRpcMethodId uint32,inParam []byte) (interface{},error) } + func reqHandlerNull(Returns interface{},Err RpcError) { } @@ -240,9 +248,6 @@ func (handler *RpcHandler) HandlerRpcRequest(request *RpcRequest) { ReleaseRpcRequest(request) } }() - if request.inputArgs!=nil { - defer request.inputArgs.DoGc() - } //如果是原始RPC请求 rawRpcId := request.RpcRequestData.GetRpcMethodId() @@ -253,12 +258,8 @@ func (handler *RpcHandler) HandlerRpcRequest(request *RpcRequest) { log.Error(err) return } - if request.inputArgs != nil { - v(request.inputArgs.GetRawData()) - }else{ - v(request.RpcRequestData.GetInParam()) - } + v.CB(request.inParam) return } @@ -275,21 +276,6 @@ func (handler *RpcHandler) HandlerRpcRequest(request *RpcRequest) { var paramList []reflect.Value var err error - iParam := reflect.New(v.inParamValue.Type().Elem()).Interface() - if request.bLocalRequest == false { - err = request.rpcProcessor.Unmarshal(request.RpcRequestData.GetInParam(),iParam) - if err!=nil { - rErr := "Call Rpc "+request.RpcRequestData.GetServiceMethod()+" Param error "+err.Error() - log.Error(rErr) - if request.requestHandle!=nil { - request.requestHandle(nil, RpcError(rErr)) - } - return - } - }else { - iParam = request.localParam - } - //生成Call参数 paramList = append(paramList,reflect.ValueOf(handler.GetRpcHandler())) //接受者 if v.hasResponder == true { @@ -301,7 +287,7 @@ func (handler *RpcHandler) HandlerRpcRequest(request *RpcRequest) { } } - paramList = append(paramList,reflect.ValueOf(iParam)) + paramList = append(paramList,reflect.ValueOf(request.inParam)) var oParam reflect.Value if v.outParamValue.IsValid() { if request.localReply!=nil { @@ -567,17 +553,17 @@ func (handler *RpcHandler) CastGo(serviceMethod string,args interface{}) { handler.goRpc(nil,true,0,serviceMethod,args) } -func (handler *RpcHandler) RawGoNode(rpcProcessorType RpcProcessorType,nodeId int,rpcMethodId uint32,serviceName string,args IRawInputArgs) error { +func (handler *RpcHandler) RawGoNode(rpcProcessorType RpcProcessorType,nodeId int,rpcMethodId uint32,serviceName string,rawArgs []byte) error { processor := GetProcessor(uint8(rpcProcessorType)) var pClientList [maxClusterNode]*Client err,count := handler.funcRpcClient(nodeId,serviceName,pClientList[:]) if count==0||err != nil { - args.DoGc() + //args.DoGc() log.Error("Call serviceMethod is error:%+v!",err) return err } if count > 1 { - args.DoGc() + //args.DoGc() log.Error("Cannot call more then 1 node!") return fmt.Errorf("Cannot call more then 1 node!") } @@ -589,13 +575,13 @@ func (handler *RpcHandler) RawGoNode(rpcProcessorType RpcProcessorType,nodeId in pLocalRpcServer:= handler.funcRpcServer() //调用自己rpcHandler处理器 if serviceName == handler.rpcHandler.GetName() { //自己服务调用 - err:= pLocalRpcServer.myselfRpcHandlerGo(serviceName,serviceName,args,nil) - args.DoGc() + err:= pLocalRpcServer.myselfRpcHandlerGo(serviceName,serviceName,rawArgs,nil) + //args.DoGc() return err } //其他的rpcHandler的处理器 - pCall := pLocalRpcServer.selfNodeRpcHandlerGo(processor,pClientList[i],true,serviceName,rpcMethodId,serviceName,nil,nil,args) + pCall := pLocalRpcServer.selfNodeRpcHandlerGo(processor,pClientList[i],true,serviceName,rpcMethodId,serviceName,nil,nil,rawArgs) if pCall.Err!=nil { err = pCall.Err } @@ -605,8 +591,7 @@ func (handler *RpcHandler) RawGoNode(rpcProcessorType RpcProcessorType,nodeId in } //跨node调用 - pCall := pClientList[i].RawGo(processor,true,rpcMethodId,serviceName,args.GetRawData(),nil) - args.DoGc() + pCall := pClientList[i].RawGo(processor,true,rpcMethodId,serviceName,rawArgs,nil) if pCall.Err!=nil { err = pCall.Err } @@ -620,3 +605,33 @@ func (handler *RpcHandler) RawGoNode(rpcProcessorType RpcProcessorType,nodeId in func (handler *RpcHandler) RegRawRpc(rpcMethodId uint32,rawRpcCB RawRpcCallBack){ handler.mapRawFunctions[rpcMethodId] = rawRpcCB } + +func (handler *RpcHandler) UnmarshalInParam(rpcProcessor IRpcProcessor,serviceMethod string,rawRpcMethodId uint32,inParam []byte) (interface{},error){ + if rawRpcMethodId>0 { + v,ok := handler.mapRawFunctions[rawRpcMethodId] + if ok == false { + err := fmt.Errorf("RpcHandler cannot find request rpc id %d!",rawRpcMethodId) + log.Error(err.Error()) + return nil,err + } + + msg,err := v.Unmarshal(inParam) + if err != nil { + err := fmt.Errorf("RpcHandler cannot Unmarshal rpc id %d!",rawRpcMethodId) + log.Error(err.Error()) + return nil,err + } + + return msg,err + } + + v,ok := handler.mapFunctions[serviceMethod] + if ok == false { + return nil,errors.New( "RpcHandler "+handler.rpcHandler.GetName()+"cannot find "+serviceMethod) + } + + var err error + param := reflect.New(v.inParamValue.Type().Elem()).Interface() + err = rpcProcessor.Unmarshal(inParam,inParam) + return param,err +} diff --git a/rpc/server.go b/rpc/server.go index 9e8145e..a6b7596 100644 --- a/rpc/server.go +++ b/rpc/server.go @@ -132,7 +132,9 @@ func (agent *RpcAgent) Run() { log.Error("rpc Unmarshal request is error: %v", err) if req.RpcRequestData.GetSeq()>0 { rpcError := RpcError(err.Error()) - agent.WriteResponse(processor,req.RpcRequestData.GetServiceMethod(),req.RpcRequestData.GetSeq(),nil,rpcError) + if req.RpcRequestData.IsNoReply()==false { + agent.WriteResponse(processor, req.RpcRequestData.GetServiceMethod(), req.RpcRequestData.GetSeq(), nil, rpcError) + } ReleaseRpcRequest(req) continue }else{ @@ -146,7 +148,9 @@ func (agent *RpcAgent) Run() { serviceMethod := strings.Split(req.RpcRequestData.GetServiceMethod(),".") if len(serviceMethod) < 1 { rpcError := RpcError("rpc request req.ServiceMethod is error") - agent.WriteResponse(processor,req.RpcRequestData.GetServiceMethod(),req.RpcRequestData.GetSeq(),nil,rpcError) + if req.RpcRequestData.IsNoReply()==false { + agent.WriteResponse(processor, req.RpcRequestData.GetServiceMethod(), req.RpcRequestData.GetSeq(), nil, rpcError) + } ReleaseRpcRequest(req) log.Debug("rpc request req.ServiceMethod is error") continue @@ -155,7 +159,9 @@ func (agent *RpcAgent) Run() { rpcHandler := agent.rpcServer.rpcHandleFinder.FindRpcHandler(serviceMethod[0]) if rpcHandler== nil { rpcError := RpcError(fmt.Sprintf("service method %s not config!", req.RpcRequestData.GetServiceMethod())) - agent.WriteResponse(processor,req.RpcRequestData.GetServiceMethod(),req.RpcRequestData.GetSeq(),nil,rpcError) + if req.RpcRequestData.IsNoReply()==false { + agent.WriteResponse(processor,req.RpcRequestData.GetServiceMethod(),req.RpcRequestData.GetSeq(),nil,rpcError) + } ReleaseRpcRequest(req) log.Error("service method %s not config!", req.RpcRequestData.GetServiceMethod()) continue @@ -168,6 +174,17 @@ func (agent *RpcAgent) Run() { } } + req.inParam,err = rpcHandler.UnmarshalInParam(req.rpcProcessor,req.RpcRequestData.GetServiceMethod(),req.RpcRequestData.GetRpcMethodId(),req.RpcRequestData.GetInParam()) + if err != nil { + rErr := "Call Rpc "+req.RpcRequestData.GetServiceMethod()+" Param error "+err.Error() + if req.requestHandle!=nil { + req.requestHandle(nil, RpcError(rErr)) + } + log.Error(rErr) + ReleaseRpcRequest(req) + continue + } + err = rpcHandler.PushRequest(req) if err != nil { rpcError := RpcError(err.Error()) @@ -221,15 +238,12 @@ func (server *Server) myselfRpcHandlerGo(handlerName string,serviceMethod string } -func (server *Server) selfNodeRpcHandlerGo(processor IRpcProcessor,client *Client,noReply bool,handlerName string,rpcMethodId uint32,serviceMethod string, args interface{},reply interface{},inputArgs IRawInputArgs) *Call { +func (server *Server) selfNodeRpcHandlerGo(processor IRpcProcessor,client *Client,noReply bool,handlerName string,rpcMethodId uint32,serviceMethod string, args interface{},reply interface{},rawArgs []byte) *Call { pCall := MakeCall() pCall.Seq = client.generateSeq() rpcHandler := server.rpcHandleFinder.FindRpcHandler(handlerName) if rpcHandler== nil { - if inputArgs!= nil { - inputArgs.DoGc() - } pCall.Seq = 0 pCall.Err = fmt.Errorf("service method %s not config!", serviceMethod) log.Error("%s",pCall.Err.Error()) @@ -242,11 +256,18 @@ func (server *Server) selfNodeRpcHandlerGo(processor IRpcProcessor,client *Clien _,processor = GetProcessorType(args) } req := MakeRpcRequest(processor,0,rpcMethodId, serviceMethod,noReply,nil) - - req.bLocalRequest = true - req.localParam = args + req.inParam = args req.localReply = reply - req.inputArgs = inputArgs + if rawArgs!=nil { + var err error + req.inParam,err = rpcHandler.UnmarshalInParam(processor,serviceMethod,rpcMethodId,rawArgs) + if err != nil { + ReleaseRpcRequest(req) + pCall.Err = err + pCall.done <- pCall + } + } + //req.inputArgs = inputArgs if noReply == false { client.AddPending(pCall) @@ -285,12 +306,9 @@ func (server *Server) selfNodeRpcHandlerAsyncGo(client *Client,callerRpcHandler _,processor := GetProcessorType(args) req := MakeRpcRequest(processor,0,0,serviceMethod,noReply,nil) - req.localParam = args + req.inParam = args req.localReply = reply - req.bLocalRequest = true - //req.rpcProcessor =processor - //req.RpcRequestData = processor.MakeRpcRequest(0,serviceMethod,noReply,nil) if noReply == false { callSeq := client.generateSeq() pCall := MakeCall()