From e91dccca7d382fab4c89384f4116efbb55ef4c2f Mon Sep 17 00:00:00 2001 From: boyce Date: Wed, 4 Nov 2020 16:56:16 +0800 Subject: [PATCH] =?UTF-8?q?gc=E4=BC=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- network/tcp_client.go | 2 +- rpc/client.go | 2 +- rpc/jsonprocessor.go | 2 +- rpc/pbprocessor.go | 8 +++----- rpc/processor.go | 2 +- rpc/rpc.go | 2 +- rpc/rpchandler.go | 44 +++++++++++++++++++---------------------- rpc/server.go | 46 ++++++++++++++++--------------------------- 8 files changed, 45 insertions(+), 63 deletions(-) diff --git a/network/tcp_client.go b/network/tcp_client.go index e1c7249..e800c24 100644 --- a/network/tcp_client.go +++ b/network/tcp_client.go @@ -49,7 +49,7 @@ func (client *TCPClient) init() { log.Release("invalid ConnectInterval, reset to %v", client.ConnectInterval) } if client.PendingWriteNum <= 0 { - client.PendingWriteNum = 100 + client.PendingWriteNum = 1000 log.Release("invalid PendingWriteNum, reset to %v", client.PendingWriteNum) } if client.NewAgent == nil { diff --git a/rpc/client.go b/rpc/client.go index ca62954..361d5aa 100644 --- a/rpc/client.go +++ b/rpc/client.go @@ -273,7 +273,7 @@ func (client *Client) Run(){ //1.解析head response := RpcResponse{} - response.RpcResponseData =processor.MakeRpcResponse(0,nil,nil) + response.RpcResponseData =processor.MakeRpcResponse(0,RpcError(""),nil) err = processor.Unmarshal(bytes[1:], response.RpcResponseData) client.conn.ReleaseReadMsg(bytes) diff --git a/rpc/jsonprocessor.go b/rpc/jsonprocessor.go index 2df6190..34fbb2c 100644 --- a/rpc/jsonprocessor.go +++ b/rpc/jsonprocessor.go @@ -60,7 +60,7 @@ func (jsonProcessor *JsonProcessor) MakeRpcRequest(seq uint64,serviceMethod stri return jsonRpcRequestData } -func (jsonProcessor *JsonProcessor) MakeRpcResponse(seq uint64,err *RpcError,reply []byte) IRpcResponseData { +func (jsonProcessor *JsonProcessor) MakeRpcResponse(seq uint64,err RpcError,reply []byte) IRpcResponseData { jsonRpcResponseData := rpcJsonResponseDataPool.Get().(*JsonRpcResponseData) jsonRpcResponseData.Seq = seq jsonRpcResponseData.Err = err.Error() diff --git a/rpc/pbprocessor.go b/rpc/pbprocessor.go index 5effaa3..905ada2 100644 --- a/rpc/pbprocessor.go +++ b/rpc/pbprocessor.go @@ -88,11 +88,9 @@ func (slf *PBRpcRequestData) MakeRequest(seq uint64,serviceMethod string,noReply return slf } -func (slf *PBRpcResponseData) MakeRespone(seq uint64,err *RpcError,reply []byte) *PBRpcResponseData{ +func (slf *PBRpcResponseData) MakeRespone(seq uint64,err RpcError,reply []byte) *PBRpcResponseData{ slf.Seq = proto.Uint64(seq) - if err != nil { - slf.Error = proto.String(err.Error()) - } + slf.Error = proto.String(err.Error()) slf.Reply = reply return slf @@ -113,7 +111,7 @@ func (slf *PBProcessor) MakeRpcRequest(seq uint64,serviceMethod string,noReply b return pPbRpcRequestData } -func (slf *PBProcessor) MakeRpcResponse(seq uint64,err *RpcError,reply []byte) IRpcResponseData { +func (slf *PBProcessor) MakeRpcResponse(seq uint64,err RpcError,reply []byte) IRpcResponseData { pPBRpcResponseData := rpcPbResponseDataPool.Get().(*PBRpcResponseData) pPBRpcResponseData.MakeRespone(seq,err,reply) return pPBRpcResponseData diff --git a/rpc/processor.go b/rpc/processor.go index 564010c..2557a60 100644 --- a/rpc/processor.go +++ b/rpc/processor.go @@ -4,7 +4,7 @@ type IRpcProcessor interface { Marshal(v interface{}) ([]byte, error) //b表示自定义缓冲区,可以填nil,由系统自动分配 Unmarshal(data []byte, v interface{}) error MakeRpcRequest(seq uint64,serviceMethod string,noReply bool,inParam []byte,additionParam interface{}) IRpcRequestData - MakeRpcResponse(seq uint64,err *RpcError,reply []byte) IRpcResponseData + MakeRpcResponse(seq uint64,err RpcError,reply []byte) IRpcResponseData ReleaseRpcRequest(rpcRequestData IRpcRequestData) ReleaseRpcResponse(rpcRequestData IRpcResponseData) diff --git a/rpc/rpc.go b/rpc/rpc.go index da82272..dc425b1 100644 --- a/rpc/rpc.go +++ b/rpc/rpc.go @@ -55,7 +55,7 @@ type RpcHandleFinder interface { FindRpcHandler(serviceMethod string) IRpcHandler } -type RequestHandler func(Returns interface{},Err *RpcError) +type RequestHandler func(Returns interface{},Err RpcError) type RawAdditionParamNull struct { } diff --git a/rpc/rpchandler.go b/rpc/rpchandler.go index cc6cf24..dede617 100644 --- a/rpc/rpchandler.go +++ b/rpc/rpchandler.go @@ -17,21 +17,17 @@ var NilError = reflect.Zero(reflect.TypeOf((*error)(nil)).Elem()) type RpcError string -func (e *RpcError) Error() string { +func (e RpcError) Error() string { + return string(e) +} + +func ConvertError(e error) RpcError{ if e == nil { return "" } - return string(*e) -} - -func ConvertError(e error) *RpcError{ - if e == nil { - return nil - } - rpcErr := RpcError(e.Error()) - return &rpcErr + return rpcErr } func Errorf(format string, a ...interface{}) *RpcError { @@ -224,7 +220,7 @@ func (handler *RpcHandler) HandlerRpcRequest(request *RpcRequest) { log.Error("Handler Rpc %s Core dump info:%+v\n",request.RpcRequestData.GetServiceMethod(),err) rpcErr := RpcError("call error : core dumps") if request.requestHandle!=nil { - request.requestHandle(nil,&rpcErr) + request.requestHandle(nil,rpcErr) } } }() @@ -236,10 +232,10 @@ func (handler *RpcHandler) HandlerRpcRequest(request *RpcRequest) { v,ok := handler.mapFunctions[request.RpcRequestData.GetServiceMethod()] if ok == false { - err := Errorf("RpcHandler %s cannot find %s", handler.rpcHandler.GetName(),request.RpcRequestData.GetServiceMethod()) - log.Error("%s",err.Error()) + err := "RpcHandler "+handler.rpcHandler.GetName()+"cannot find "+request.RpcRequestData.GetServiceMethod() + log.Error(err) if request.requestHandle!=nil { - request.requestHandle(nil,err) + request.requestHandle(nil,RpcError(err)) } return } @@ -257,10 +253,10 @@ func (handler *RpcHandler) HandlerRpcRequest(request *RpcRequest) { if request.bLocalRequest == false { err = request.rpcProcessor.Unmarshal(request.RpcRequestData.GetInParam(),iParam) if err!=nil { - rerr := Errorf("Call Rpc %s Param error %+v",request.RpcRequestData.GetServiceMethod(),err) - log.Error("%s",rerr.Error()) + rErr := "Call Rpc "+request.RpcRequestData.GetServiceMethod()+" Param error "+err.Error() + log.Error(rErr) if request.requestHandle!=nil { - request.requestHandle(nil, rerr) + request.requestHandle(nil, RpcError(rErr)) } return } @@ -268,10 +264,10 @@ func (handler *RpcHandler) HandlerRpcRequest(request *RpcRequest) { if request.inputArgs!=nil { err = request.rpcProcessor.Unmarshal(request.inputArgs.GetRawData(),iParam) if err!=nil { - rErr := Errorf("Call Rpc %s Param error %+v",request.RpcRequestData.GetServiceMethod(),err) - log.Error("%s", rErr.Error()) + rErr := "Call Rpc "+request.RpcRequestData.GetServiceMethod()+" Param error "+err.Error() + log.Error(rErr) if request.requestHandle!=nil { - request.requestHandle(nil, rErr) + request.requestHandle(nil, RpcError(rErr)) } return } @@ -302,10 +298,10 @@ func (handler *RpcHandler) HandlerRpcRequest(request *RpcRequest) { oParam = reflect.New(v.outParamValue.Type().Elem()) } paramList = append(paramList,oParam) //输出参数 - }else if(request.requestHandle != nil){ //调用方有返回值,但被调用函数没有返回参数 - rErr := Errorf("Call Rpc %s without return parameter!",request.RpcRequestData.GetServiceMethod()) - log.Error("%s",rErr.Error()) - request.requestHandle(nil, rErr) + }else if request.requestHandle != nil { //调用方有返回值,但被调用函数没有返回参数 + rErr := "Call Rpc "+request.RpcRequestData.GetServiceMethod()+"without return parameter!" + log.Error(rErr) + request.requestHandle(nil, RpcError(rErr)) return } returnValues := v.method.Func.Call(paramList) diff --git a/rpc/server.go b/rpc/server.go index 39aee49..534d005 100644 --- a/rpc/server.go +++ b/rpc/server.go @@ -81,19 +81,16 @@ func (server *Server) Start(listenAddr string) { func (agent *RpcAgent) OnDestroy() {} -func (agent *RpcAgent) WriteResponse(processor IRpcProcessor,serviceMethod string,seq uint64,reply interface{},err *RpcError) { +func (agent *RpcAgent) WriteResponse(processor IRpcProcessor,serviceMethod string,seq uint64,reply interface{},err RpcError) { var mReply []byte - var rpcError *RpcError + var rpcError RpcError var errM error - if err != nil { - rpcError = err - } else { - if reply!=nil { - mReply,errM = processor.Marshal(reply) - if errM != nil { - rpcError = ConvertError(errM) - } + + if reply!=nil { + mReply,errM = processor.Marshal(reply) + if errM != nil { + rpcError = ConvertError(errM) } } @@ -139,7 +136,7 @@ func (agent *RpcAgent) Run() { log.Error("rpc Unmarshal request is error: %v", err) if req.RpcRequestData.GetSeq()>0 { rpcError := RpcError(err.Error()) - agent.WriteResponse(processor,req.RpcRequestData.GetServiceMethod(),req.RpcRequestData.GetSeq(),nil,&rpcError) + agent.WriteResponse(processor,req.RpcRequestData.GetServiceMethod(),req.RpcRequestData.GetSeq(),nil,rpcError) processor.ReleaseRpcRequest(req.RpcRequestData) ReleaseRpcRequest(req) continue @@ -155,7 +152,7 @@ func (agent *RpcAgent) Run() { serviceMethod := strings.Split(req.RpcRequestData.GetServiceMethod(),".") if len(serviceMethod)!=2 { rpcError := RpcError("rpc request req.ServiceMethod is error") - agent.WriteResponse(processor,req.RpcRequestData.GetServiceMethod(),req.RpcRequestData.GetSeq(),nil,&rpcError) + agent.WriteResponse(processor,req.RpcRequestData.GetServiceMethod(),req.RpcRequestData.GetSeq(),nil,rpcError) processor.ReleaseRpcRequest(req.RpcRequestData) ReleaseRpcRequest(req) log.Debug("rpc request req.ServiceMethod is error") @@ -165,7 +162,7 @@ func (agent *RpcAgent) Run() { rpcHandler := agent.rpcServer.rpcHandleFinder.FindRpcHandler(serviceMethod[0]) if rpcHandler== nil { rpcError := RpcError(fmt.Sprintf("service method %s not config!", req.RpcRequestData.GetServiceMethod())) - agent.WriteResponse(processor,req.RpcRequestData.GetServiceMethod(),req.RpcRequestData.GetSeq(),nil,&rpcError) + agent.WriteResponse(processor,req.RpcRequestData.GetServiceMethod(),req.RpcRequestData.GetSeq(),nil,rpcError) processor.ReleaseRpcRequest(req.RpcRequestData) ReleaseRpcRequest(req) log.Error("service method %s not config!", req.RpcRequestData.GetServiceMethod()) @@ -173,7 +170,7 @@ func (agent *RpcAgent) Run() { } if req.RpcRequestData.IsNoReply()==false { - req.requestHandle = func(Returns interface{},Err *RpcError){ + req.requestHandle = func(Returns interface{},Err RpcError){ agent.WriteResponse(processor,req.RpcRequestData.GetServiceMethod(),req.RpcRequestData.GetSeq(),Returns,Err) } } @@ -183,7 +180,7 @@ func (agent *RpcAgent) Run() { rpcError := RpcError(err.Error()) if req.RpcRequestData.IsNoReply() { - agent.WriteResponse(processor,req.RpcRequestData.GetServiceMethod(),req.RpcRequestData.GetSeq(),nil,&rpcError) + agent.WriteResponse(processor,req.RpcRequestData.GetServiceMethod(),req.RpcRequestData.GetSeq(),nil,rpcError) } processor.ReleaseRpcRequest(req.RpcRequestData) @@ -264,20 +261,14 @@ func (server *Server) selfNodeRpcHandlerGo(processor IRpcProcessor,client *Clien req.rpcProcessor = processor if noReply == false { client.AddPending(pCall) - req.requestHandle = func(Returns interface{},Err *RpcError){ + req.requestHandle = func(Returns interface{},Err RpcError){ v := client.RemovePending(pCall.Seq) if v == nil { log.Error("rpcClient cannot find seq %d in pending",pCall.Seq) ReleaseCall(pCall) return } - - if Err!=nil { - pCall.Err = Err - }else{ - pCall.Err = nil - } - + pCall.Err = Err pCall.done <- pCall } } @@ -316,7 +307,7 @@ func (server *Server) selfNodeRpcHandlerAsyncGo(client *Client,callerRpcHandler req.RpcRequestData = processor.MakeRpcRequest(0,serviceMethod,noReply,nil,nil) if noReply == false { client.AddPending(pCall) - req.requestHandle = func(Returns interface{},Err *RpcError){ + req.requestHandle = func(Returns interface{},Err RpcError){ v := client.RemovePending(pCall.Seq) if v == nil { log.Error("rpcClient cannot find seq %d in pending",pCall.Seq) @@ -326,11 +317,8 @@ func (server *Server) selfNodeRpcHandlerAsyncGo(client *Client,callerRpcHandler return } - if Err == nil { - pCall.Err = nil - }else{ - pCall.Err = Err - } + pCall.Err = Err + if Returns!=nil { pCall.Reply = Returns