From 6edc5a07626213f3aa1335cd60ad32d9a27e1f99 Mon Sep 17 00:00:00 2001 From: boyce Date: Thu, 8 Oct 2020 14:12:57 +0800 Subject: [PATCH] =?UTF-8?q?=E6=96=B0=E5=A2=9E=E5=AF=B9rpc=E5=8F=82?= =?UTF-8?q?=E6=95=B0=E6=95=B0=E6=8D=AE=E7=B1=BB=E5=9E=8B=E8=87=AA=E5=8A=A8?= =?UTF-8?q?=E8=AF=86=E5=88=AB=EF=BC=88=E5=90=8C=E6=97=B6=E6=94=AF=E6=8C=81?= =?UTF-8?q?json=E4=B8=8Eprotobuf=E6=96=B9=E5=BC=8F=E5=BA=8F=E5=88=97?= =?UTF-8?q?=E5=8C=96=E4=B8=8E=E5=8F=8D=E5=BA=8F=E5=88=97=E5=8C=96=EF=BC=89?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- rpc/client.go | 17 ++++++--- rpc/jsonprocessor.go | 6 ++++ rpc/pbprocessor.go | 6 +++- rpc/processor.go | 1 + rpc/rpc.go | 1 + rpc/rpchandler.go | 42 ++++++++++++---------- rpc/server.go | 62 +++++++++++++++++++++++++++------ sysservice/tcpgateway/Router.go | 4 +-- 8 files changed, 102 insertions(+), 37 deletions(-) diff --git a/rpc/client.go b/rpc/client.go index 7cf67e6..062a9a5 100644 --- a/rpc/client.go +++ b/rpc/client.go @@ -164,6 +164,7 @@ func (slf *Client) AsycCall(rpcHandler IRpcHandler,serviceMethod string,callback call.rpcHandler = rpcHandler call.ServiceMethod = serviceMethod + processorType, processor := GetProcessorType(args) InParam,herr := processor.Marshal(args) if herr != nil { ReleaseCall(call) @@ -190,7 +191,7 @@ func (slf *Client) AsycCall(rpcHandler IRpcHandler,serviceMethod string,callback return fmt.Errorf("Rpc server is disconnect,call %s is fail!",serviceMethod) } - err = slf.conn.WriteMsg(bytes) + err = slf.conn.WriteMsg([]byte{uint8(processorType)},bytes) if err != nil { slf.RemovePending(call.Seq) ReleaseCall(call) @@ -199,7 +200,7 @@ func (slf *Client) AsycCall(rpcHandler IRpcHandler,serviceMethod string,callback return err } -func (slf *Client) RawGo(noReply bool,serviceMethod string,args []byte,additionParam interface{},reply interface{}) *Call { +func (slf *Client) RawGo(processor IRpcProcessor,noReply bool,serviceMethod string,args []byte,additionParam interface{},reply interface{}) *Call { call := MakeCall() call.ServiceMethod = serviceMethod call.Reply = reply @@ -235,13 +236,14 @@ func (slf *Client) RawGo(noReply bool,serviceMethod string,args []byte,additionP } func (slf *Client) Go(noReply bool,serviceMethod string, args interface{},reply interface{}) *Call { + _,processor := GetProcessorType(args) InParam,err := processor.Marshal(args) if err != nil { call := MakeCall() call.Err = err } - return slf.RawGo(noReply,serviceMethod,InParam,nil,reply) + return slf.RawGo(processor,noReply,serviceMethod,InParam,nil,reply) } func (slf *Client) Run(){ @@ -260,11 +262,18 @@ func (slf *Client) Run(){ log.Error("rpcClient %s ReadMsg error:%+v",slf.Addr,err) return } + + processor := GetProcessor(uint8(bytes[0])) + if processor==nil { + log.Error("rpcClient %s ReadMsg head error:%+v",slf.Addr,err) + return + } + //1.解析head respone := &RpcResponse{} respone.RpcResponeData =processor.MakeRpcResponse(0,nil,nil) - err = processor.Unmarshal(bytes,respone.RpcResponeData) + err = processor.Unmarshal(bytes[1:],respone.RpcResponeData) if err != nil { processor.ReleaseRpcRespose(respone.RpcResponeData) log.Error("rpcClient Unmarshal head error,error:%+v",err) diff --git a/rpc/jsonprocessor.go b/rpc/jsonprocessor.go index a7d511d..de038ab 100644 --- a/rpc/jsonprocessor.go +++ b/rpc/jsonprocessor.go @@ -81,6 +81,12 @@ func (slf *JsonProcessor) ReleaseRpcRespose(rpcRequestData IRpcResponseData){ rpcJsonResponeDataPool.Put(rpcRequestData) } +func (slf *JsonProcessor) IsParse(param interface{}) bool { + _,err := json.Marshal(param) + return err==nil +} + + func (slf *JsonRpcRequestData) IsNoReply() bool{ return slf.NoReply } diff --git a/rpc/pbprocessor.go b/rpc/pbprocessor.go index e481839..d0ff233 100644 --- a/rpc/pbprocessor.go +++ b/rpc/pbprocessor.go @@ -128,6 +128,11 @@ func (slf *PBProcessor) ReleaseRpcRespose(rpcRequestData IRpcResponseData){ rpcPbResponeDataPool.Put(rpcRequestData) } +func (slf *PBProcessor) IsParse(param interface{}) bool { + _,ok := param.(proto.Message) + return ok +} + func (slf *PBRpcRequestData) IsNoReply() bool{ return slf.GetNoReply() @@ -147,4 +152,3 @@ func (slf *PBRpcResponseData) GetErr() *RpcError { - diff --git a/rpc/processor.go b/rpc/processor.go index b21cb31..591f9bc 100644 --- a/rpc/processor.go +++ b/rpc/processor.go @@ -8,6 +8,7 @@ type IRpcProcessor interface { ReleaseRpcRequest(rpcRequestData IRpcRequestData) ReleaseRpcRespose(rpcRequestData IRpcResponseData) + IsParse(param interface{}) bool //是否可解析 } diff --git a/rpc/rpc.go b/rpc/rpc.go index 61f0c2d..726dc87 100644 --- a/rpc/rpc.go +++ b/rpc/rpc.go @@ -15,6 +15,7 @@ type RpcRequest struct { localRawParam []byte requestHandle RequestHandler callback *reflect.Value + rpcProcessor IRpcProcessor } type RpcResponse struct { diff --git a/rpc/rpchandler.go b/rpc/rpchandler.go index dc29c7e..7bf562c 100644 --- a/rpc/rpchandler.go +++ b/rpc/rpchandler.go @@ -46,6 +46,7 @@ type RpcMethodInfo struct { additionParam reflect.Value //addition *IRawAdditionParam hashAdditionParam bool + rpcProcessorType RpcProcessorType } type RpcHandler struct { @@ -76,8 +77,8 @@ type IRpcHandler interface { AsyncCallNode(nodeId int,serviceMethod string,args interface{},callback interface{}) error CallNode(nodeId int,serviceMethod string,args interface{},reply interface{}) error GoNode(nodeId int,serviceMethod string,args interface{}) error - RawGoNode(nodeId int,serviceMethod string,args []byte,additionParam interface{}) error - RawCastGo(serviceMethod string,args []byte,additionParam interface{}) + RawGoNode(rpcProcessorType RpcProcessorType,nodeId int,serviceMethod string,args []byte,additionParam interface{}) error + RawCastGo(rpcProcessorType RpcProcessorType,serviceMethod string,args []byte,additionParam interface{}) } var rawAdditionParamValueNull reflect.Value @@ -152,6 +153,9 @@ func (slf *RpcHandler) suitableMethods(method reflect.Method) error { } rpcMethodInfo.iparam = reflect.New(typ.In(parIdx).Elem()) //append(rpcMethodInfo.iparam,) + pt,_ := GetProcessorType(rpcMethodInfo.iparam.Interface()) + rpcMethodInfo.rpcProcessorType = pt + parIdx++ if parIdx< typ.NumIn() { rpcMethodInfo.oParam = reflect.New(typ.In(parIdx).Elem()) @@ -225,7 +229,7 @@ func (slf *RpcHandler) HandlerRpcRequest(request *RpcRequest) { } }() defer ReleaseRpcRequest(request) - defer processor.ReleaseRpcRequest(request.RpcRequestData) + defer request.rpcProcessor.ReleaseRpcRequest(request.RpcRequestData) v,ok := slf.mapfunctons[request.RpcRequestData.GetServiceMethod()] if ok == false { @@ -241,7 +245,7 @@ func (slf *RpcHandler) HandlerRpcRequest(request *RpcRequest) { var err error iparam := reflect.New(v.iparam.Type().Elem()).Interface() if request.bLocalRequest == false { - err = processor.Unmarshal(request.RpcRequestData.GetInParam(),iparam) + err = request.rpcProcessor.Unmarshal(request.RpcRequestData.GetInParam(),iparam) if err!=nil { rerr := Errorf("Call Rpc %s Param error %+v",request.RpcRequestData.GetServiceMethod(),err) log.Error("%s",rerr.Error()) @@ -252,7 +256,7 @@ func (slf *RpcHandler) HandlerRpcRequest(request *RpcRequest) { } }else { if request.localRawParam!=nil { - err = processor.Unmarshal(request.localRawParam,iparam) + err = request.rpcProcessor.Unmarshal(request.localRawParam,iparam) if err!=nil { rerr := Errorf("Call Rpc %s Param error %+v",request.RpcRequestData.GetServiceMethod(),err) log.Error("%s",rerr.Error()) @@ -327,7 +331,7 @@ func (slf *RpcHandler) CallMethod(ServiceMethod string,param interface{},reply i return err } -func (slf *RpcHandler) goRpc(bCast bool,nodeId int,serviceMethod string,args interface{}) error { +func (slf *RpcHandler) goRpc(processor IRpcProcessor,bCast bool,nodeId int,serviceMethod string,args interface{}) error { var pClientList []*Client err := slf.funcRpcClient(nodeId,serviceMethod,&pClientList) if err != nil { @@ -360,7 +364,7 @@ func (slf *RpcHandler) goRpc(bCast bool,nodeId int,serviceMethod string,args int return pLocalRpcServer.myselfRpcHandlerGo(sMethod[0],sMethod[1],args,nil) } //其他的rpcHandler的处理器 - pCall := pLocalRpcServer.selfNodeRpcHandlerGo(pClient,true,sMethod[0],sMethod[1],args,nil,nil,nil) + pCall := pLocalRpcServer.selfNodeRpcHandlerGo(processor,pClient,true,sMethod[0],sMethod[1],args,nil,nil,nil) if pCall.Err!=nil { err = pCall.Err } @@ -381,7 +385,7 @@ func (slf *RpcHandler) goRpc(bCast bool,nodeId int,serviceMethod string,args int -func (slf *RpcHandler) rawGoRpc(bCast bool,nodeId int,serviceMethod string,args []byte,additionParam interface{}) error { +func (slf *RpcHandler) rawGoRpc(processor IRpcProcessor,bCast bool,nodeId int,serviceMethod string,args []byte,additionParam interface{}) error { var pClientList []*Client err := slf.funcRpcClient(nodeId,serviceMethod,&pClientList) if err != nil { @@ -414,7 +418,7 @@ func (slf *RpcHandler) rawGoRpc(bCast bool,nodeId int,serviceMethod string,args return pLocalRpcServer.myselfRpcHandlerGo(sMethod[0],sMethod[1],args,nil) } //其他的rpcHandler的处理器 - pCall := pLocalRpcServer.selfNodeRpcHandlerGo(pClient,true,sMethod[0],sMethod[1],nil,args,nil,additionParam) + pCall := pLocalRpcServer.selfNodeRpcHandlerGo(processor,pClient,true,sMethod[0],sMethod[1],nil,args,nil,additionParam) if pCall.Err!=nil { err = pCall.Err } @@ -423,7 +427,7 @@ func (slf *RpcHandler) rawGoRpc(bCast bool,nodeId int,serviceMethod string,args } //跨node调用 - pCall := pClient.RawGo(true,serviceMethod,args,additionParam,nil) + pCall := pClient.RawGo(processor,true,serviceMethod,args,additionParam,nil) if pCall.Err!=nil { err = pCall.Err } @@ -464,7 +468,7 @@ func (slf *RpcHandler) callRpc(nodeId int,serviceMethod string,args interface{}, return pLocalRpcServer.myselfRpcHandlerGo(sMethod[0],sMethod[1],args,reply) } //其他的rpcHandler的处理器 - pCall := pLocalRpcServer.selfNodeRpcHandlerGo(pClient,false,sMethod[0],sMethod[1],args,nil,reply,nil) + pCall := pLocalRpcServer.selfNodeRpcHandlerGo(nil,pClient,false,sMethod[0],sMethod[1],args,nil,reply,nil) err = pCall.Done().Err pClient.RemovePending(pCall.Seq) ReleaseCall(pCall) @@ -549,7 +553,7 @@ func (slf *RpcHandler) asyncCallRpc(nodeid int,serviceMethod string,args interfa } return nil } - pCall := pLocalRpcServer.selfNodeRpcHandlerGo(pClient,false,sMethod[0],sMethod[1],args,nil,reply,nil) + pCall := pLocalRpcServer.selfNodeRpcHandlerGo(nil,pClient,false,sMethod[0],sMethod[1],args,nil,reply,nil) err = pCall.Done().Err pClient.RemovePending(pCall.Seq) ReleaseCall(pCall) @@ -584,7 +588,7 @@ func (slf *RpcHandler) Call(serviceMethod string,args interface{},reply interfac func (slf *RpcHandler) Go(serviceMethod string,args interface{}) error { - return slf.goRpc(false,0,serviceMethod,args) + return slf.goRpc(nil,false,0,serviceMethod,args) } func (slf *RpcHandler) AsyncCallNode(nodeId int,serviceMethod string,args interface{},callback interface{}) error { @@ -596,18 +600,18 @@ func (slf *RpcHandler) CallNode(nodeId int,serviceMethod string,args interface{} } func (slf *RpcHandler) GoNode(nodeId int,serviceMethod string,args interface{}) error { - return slf.goRpc(false,nodeId,serviceMethod,args) + return slf.goRpc(nil,false,nodeId,serviceMethod,args) } func (slf *RpcHandler) CastGo(serviceMethod string,args interface{}) { - slf.goRpc(true,0,serviceMethod,args) + slf.goRpc(nil,true,0,serviceMethod,args) } -func (slf *RpcHandler) RawGoNode(nodeId int,serviceMethod string,args []byte,additionParam interface{}) error { - return slf.rawGoRpc(false,nodeId,serviceMethod,args,additionParam) +func (slf *RpcHandler) RawGoNode(rpcProcessorType RpcProcessorType,nodeId int,serviceMethod string,args []byte,additionParam interface{}) error { + return slf.rawGoRpc(GetProcessor(uint8(rpcProcessorType)),false,nodeId,serviceMethod,args,additionParam) } -func (slf *RpcHandler) RawCastGo(serviceMethod string,args []byte,additionParam interface{}) { - slf.goRpc(true,0,serviceMethod,args) +func (slf *RpcHandler) RawCastGo(rpcProcessorType RpcProcessorType,serviceMethod string,args []byte,additionParam interface{}) { + slf.goRpc(GetProcessor(uint8(rpcProcessorType)),true,0,serviceMethod,args) } diff --git a/rpc/server.go b/rpc/server.go index 92dd258..6b34db3 100644 --- a/rpc/server.go +++ b/rpc/server.go @@ -9,8 +9,16 @@ import ( "reflect" "strings" ) +type RpcProcessorType uint8 -var processor IRpcProcessor = &JsonProcessor{} +const ( + RPC_PROCESSOR_JSON RpcProcessorType = 0 + RPC_PROCESSOR_PB RpcProcessorType = 1 +) + +//var processor IRpcProcessor = &JsonProcessor{} +var arrayProcessor = []IRpcProcessor{&JsonProcessor{},&PBProcessor{}} +var arrayProcessorLen uint8 = 2 var LittleEndian bool type Server struct { @@ -20,8 +28,26 @@ type Server struct { rpcserver *network.TCPServer } -func SetProcessor(proc IRpcProcessor) { - processor = proc +func AppendProcessor(rpcProcessor IRpcProcessor) { + arrayProcessor = append(arrayProcessor,rpcProcessor) + arrayProcessorLen++ +} + +func GetProcessorType(param interface{}) (RpcProcessorType,IRpcProcessor){ + for i:=uint8(1);i=arrayProcessorLen{ + return nil + } + return arrayProcessor[processorType] } func (slf *Server) Init(rpcHandleFinder RpcHandleFinder) { @@ -56,7 +82,7 @@ type RpcAgent struct { } -func (agent *RpcAgent) WriteRespone(serviceMethod string,seq uint64,reply interface{},err *RpcError) { +func (agent *RpcAgent) WriteRespone(processor IRpcProcessor,serviceMethod string,seq uint64,reply interface{},err *RpcError) { var mReply []byte var rpcError *RpcError var errM error @@ -98,15 +124,22 @@ func (agent *RpcAgent) Run() { //will close tcpconn break } + processor := GetProcessor(uint8(data[0])) + if processor==nil { + log.Error("remote rpc %s data head error:%+v",agent.conn.RemoteAddr(),err) + return + } + //解析head req := MakeRpcRequest() + req.rpcProcessor = processor req.RpcRequestData = processor.MakeRpcRequest(0,"",false,nil,nil) - err = processor.Unmarshal(data,req.RpcRequestData) + err = processor.Unmarshal(data[1:],req.RpcRequestData) if err != nil { log.Error("rpc Unmarshal request is error: %v", err) if req.RpcRequestData.GetSeq()>0 { rpcError := RpcError(err.Error()) - agent.WriteRespone(req.RpcRequestData.GetServiceMethod(),req.RpcRequestData.GetSeq(),nil,&rpcError) + agent.WriteRespone(processor,req.RpcRequestData.GetServiceMethod(),req.RpcRequestData.GetSeq(),nil,&rpcError) processor.ReleaseRpcRequest(req.RpcRequestData) ReleaseRpcRequest(req) continue @@ -122,7 +155,7 @@ func (agent *RpcAgent) Run() { serviceMethod := strings.Split(req.RpcRequestData.GetServiceMethod(),".") if len(serviceMethod)!=2 { rpcError := RpcError("rpc request req.ServiceMethod is error") - agent.WriteRespone(req.RpcRequestData.GetServiceMethod(),req.RpcRequestData.GetSeq(),nil,&rpcError) + agent.WriteRespone(processor,req.RpcRequestData.GetServiceMethod(),req.RpcRequestData.GetSeq(),nil,&rpcError) processor.ReleaseRpcRequest(req.RpcRequestData) ReleaseRpcRequest(req) log.Debug("rpc request req.ServiceMethod is error") @@ -132,7 +165,7 @@ func (agent *RpcAgent) Run() { rpcHandler := agent.rpcserver.rpcHandleFinder.FindRpcHandler(serviceMethod[0]) if rpcHandler== nil { rpcError := RpcError(fmt.Sprintf("service method %s not config!", req.RpcRequestData.GetServiceMethod())) - agent.WriteRespone(req.RpcRequestData.GetServiceMethod(),req.RpcRequestData.GetSeq(),nil,&rpcError) + agent.WriteRespone(processor,req.RpcRequestData.GetServiceMethod(),req.RpcRequestData.GetSeq(),nil,&rpcError) processor.ReleaseRpcRequest(req.RpcRequestData) ReleaseRpcRequest(req) log.Error("service method %s not config!", req.RpcRequestData.GetServiceMethod()) @@ -141,7 +174,7 @@ func (agent *RpcAgent) Run() { if req.RpcRequestData.IsNoReply()==false { req.requestHandle = func(Returns interface{},Err *RpcError){ - agent.WriteRespone(req.RpcRequestData.GetServiceMethod(),req.RpcRequestData.GetSeq(),Returns,Err) + agent.WriteRespone(processor,req.RpcRequestData.GetServiceMethod(),req.RpcRequestData.GetSeq(),Returns,Err) } } @@ -150,7 +183,7 @@ func (agent *RpcAgent) Run() { rpcError := RpcError(err.Error()) if req.RpcRequestData.IsNoReply() { - agent.WriteRespone(req.RpcRequestData.GetServiceMethod(),req.RpcRequestData.GetSeq(),nil,&rpcError) + agent.WriteRespone(processor,req.RpcRequestData.GetServiceMethod(),req.RpcRequestData.GetSeq(),nil,&rpcError) } processor.ReleaseRpcRequest(req.RpcRequestData) @@ -200,7 +233,7 @@ func (slf *Server) myselfRpcHandlerGo(handlerName string,methodName string, args } -func (slf *Server) selfNodeRpcHandlerGo(client *Client,noReply bool,handlerName string,methodName string, args interface{},rawArgs []byte,reply interface{},additionParam interface{}) *Call { +func (slf *Server) selfNodeRpcHandlerGo(processor IRpcProcessor,client *Client,noReply bool,handlerName string,methodName string, args interface{},rawArgs []byte,reply interface{},additionParam interface{}) *Call { pCall := MakeCall() pCall.Seq = client.generateSeq() @@ -217,7 +250,12 @@ func (slf *Server) selfNodeRpcHandlerGo(client *Client,noReply bool,handlerName req.localParam = args req.localReply = reply req.localRawParam = rawArgs + if processor == nil { + _,processor = GetProcessorType(args) + } + req.RpcRequestData = processor.MakeRpcRequest(0,fmt.Sprintf("%s.%s",handlerName,methodName),noReply,nil,additionParam) + req.rpcProcessor = processor if noReply == false { client.AddPending(pCall) req.requestHandle = func(Returns interface{},Err *RpcError){ @@ -267,6 +305,8 @@ func (slf *Server) selfNodeRpcHandlerAsyncGo(client *Client,callerRpcHandler IRp req.localParam = args req.localReply = reply req.bLocalRequest = true + _,processor := GetProcessorType(args) + req.rpcProcessor =processor req.RpcRequestData = processor.MakeRpcRequest(0,fmt.Sprintf("%s.%s",handlerName,methodName),noReply,nil,nil) if noReply == false { client.AddPending(pCall) diff --git a/sysservice/tcpgateway/Router.go b/sysservice/tcpgateway/Router.go index 12717de..5aee98e 100644 --- a/sysservice/tcpgateway/Router.go +++ b/sysservice/tcpgateway/Router.go @@ -201,7 +201,7 @@ func (slf *Router) RouterMessage(clientId uint64,msgType uint16,msg []byte) { } if routerId>0 { - slf.rpcHandler.RawGoNode(routerId,routerInfo.Rpc,msg,proto.Uint64(clientId)) + slf.rpcHandler.RawGoNode(rpc.RPC_PROCESSOR_PB,routerId,routerInfo.Rpc,msg,proto.Uint64(clientId)) } } @@ -222,7 +222,7 @@ func (slf *Router) RouterEvent(clientId uint64,eventType string) bool{ } if routerId>0 { - slf.rpcHandler.RawGoNode(routerId,routerInfo.Rpc,[]byte{},proto.Uint64(clientId)) + slf.rpcHandler.RawGoNode(rpc.RPC_PROCESSOR_PB,routerId,routerInfo.Rpc,[]byte{},proto.Uint64(clientId)) return true }