From 323e5313fbbe320e0bf19b8464d39d130ff3b4e8 Mon Sep 17 00:00:00 2001 From: boyce Date: Thu, 9 Jul 2020 20:01:45 +0800 Subject: [PATCH] =?UTF-8?q?rpc=E4=B8=B4=E6=97=B6=E5=86=85=E5=AD=98?= =?UTF-8?q?=E6=B1=A0=E4=BC=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- rpc/client.go | 54 +++------------------ rpc/jsonprocessor.go | 7 +++ rpc/msgpprocessor.go | 44 ++++++++++++++--- rpc/pbprocessor.go | 10 ++++ rpc/processor.go | 39 ++------------- rpc/rpc.go | 113 +++++++++++++++++++++++++++++++++++++++++++ rpc/rpchandler.go | 21 +++++--- rpc/server.go | 89 +++++++++++++++------------------- 8 files changed, 234 insertions(+), 143 deletions(-) create mode 100644 rpc/rpc.go diff --git a/rpc/client.go b/rpc/client.go index 8b7142d..755b89a 100644 --- a/rpc/client.go +++ b/rpc/client.go @@ -56,7 +56,7 @@ func (slf *Client) Connect(addr string) error { } func (slf *Client) AsycGo(rpcHandler IRpcHandler,serviceMethod string,callback reflect.Value, args interface{},replyParam interface{}) error { - call := new(Call) + call := MakeCall() call.Reply = replyParam call.callback = &callback call.rpcHandler = rpcHandler @@ -72,16 +72,13 @@ func (slf *Client) AsycGo(rpcHandler IRpcHandler,serviceMethod string,callback r slf.pendingLock.Lock() slf.startSeq += 1 call.Seq = slf.startSeq - //request.Seq = slf.startSeq - //request.NoReply = false - //request.ServiceMethod = serviceMethod request.RpcRequestData = processor.MakeRpcRequest(slf.startSeq,serviceMethod,false,InParam) - slf.pending[call.Seq] = call + slf.pending[call.Seq] = call//如果下面发送失败,将会一一直存在这里 slf.pendingLock.Unlock() - - bytes,err := processor.Marshal(request.RpcRequestData) + processor.ReleaseRpcRequest(request.RpcRequestData) + if err != nil { return err } @@ -97,8 +94,7 @@ func (slf *Client) AsycGo(rpcHandler IRpcHandler,serviceMethod string,callback r } func (slf *Client) Go(noReply bool,serviceMethod string, args interface{},reply interface{}) *Call { - call := new(Call) - call.done = make(chan *Call,1) + call := MakeCall() call.Reply = reply call.ServiceMethod = serviceMethod @@ -109,19 +105,15 @@ func (slf *Client) Go(noReply bool,serviceMethod string, args interface{},reply } request := &RpcRequest{} - //request.NoReply = noReply call.Arg = args slf.pendingLock.Lock() slf.startSeq += 1 call.Seq = slf.startSeq - //request.Seq = slf.startSeq - // request.ServiceMethod = serviceMethod request.RpcRequestData = processor.MakeRpcRequest(slf.startSeq,serviceMethod,noReply,InParam) - slf.pending[call.Seq] = call + //slf.pending[call.Seq] = call slf.pendingLock.Unlock() - - bytes,err := processor.Marshal(request.RpcRequestData) + processor.ReleaseRpcRequest(request.RpcRequestData) if err != nil { call.Err = err return call @@ -140,37 +132,6 @@ func (slf *Client) Go(noReply bool,serviceMethod string, args interface{},reply return call } -type RequestHandler func(Returns interface{},Err *RpcError) - -type RpcRequest struct { - RpcRequestData IRpcRequestData - - //packhead - /*Seq uint64 // sequence number chosen by client - ServiceMethod string // format: "Service.Method" - NoReply bool //是否需要返回 - //packbody - InParam []byte -*/ - //other data - localReply interface{} - localParam interface{} //本地调用的参数列表 - requestHandle RequestHandler - callback *reflect.Value -} - -type RpcResponse struct { - RpcResponeData IRpcResponseData - /* - //head - Seq uint64 // sequence number chosen by client - Err *RpcError - - //returns - Reply []byte*/ -} - - func (slf *Client) Run(){ defer func() { if r := recover(); r != nil { @@ -190,6 +151,7 @@ func (slf *Client) Run(){ //1.解析head respone := &RpcResponse{} respone.RpcResponeData =processor.MakeRpcResponse(0,nil,nil) + defer processor.ReleaseRpcRespose(respone.RpcResponeData) err = processor.Unmarshal(bytes,respone.RpcResponeData) if err != nil { log.Error("rpcClient Unmarshal head error,error:%+v",err) diff --git a/rpc/jsonprocessor.go b/rpc/jsonprocessor.go index 0e5ae21..26c44bf 100644 --- a/rpc/jsonprocessor.go +++ b/rpc/jsonprocessor.go @@ -44,6 +44,13 @@ func (slf *JsonProcessor) MakeRpcResponse(seq uint64,err *RpcError,reply []byte) } } +func (slf *JsonProcessor) ReleaseRpcRequest(rpcRequestData IRpcRequestData){ + +} +func (slf *JsonProcessor) ReleaseRpcRespose(rpcRequestData IRpcResponseData){ + +} + func (slf *JsonRpcRequestData) IsReply() bool{ return slf.NoReply } diff --git a/rpc/msgpprocessor.go b/rpc/msgpprocessor.go index f5387ae..6b36bc5 100644 --- a/rpc/msgpprocessor.go +++ b/rpc/msgpprocessor.go @@ -1,11 +1,27 @@ package rpc +import "sync" + type IMsgp interface { UnmarshalMsg(bts []byte) (o []byte, err error) MarshalMsg(b []byte) (o []byte, err error) } +var rpcResponeDataPool sync.Pool +var rpcRequestDataPool sync.Pool + type MsgpProcessor struct { + +} + +func init(){ + rpcResponeDataPool.New = func()interface{}{ + return &MsgpRpcResponseData{} + } + + rpcRequestDataPool.New = func()interface{}{ + return &MsgpRpcRequestData{} + } } //go:generate msgp @@ -31,6 +47,7 @@ type MsgpRpcResponseData struct { func (slf *MsgpProcessor) Marshal(v interface{}) ([]byte, error){ msgp := v.(IMsgp) + return msgp.MarshalMsg(nil) } @@ -41,15 +58,30 @@ func (slf *MsgpProcessor) Unmarshal(data []byte, v interface{}) error{ } func (slf *MsgpProcessor) MakeRpcRequest(seq uint64,serviceMethod string,noReply bool,inParam []byte) IRpcRequestData{ - return &MsgpRpcRequestData{Seq:seq,ServiceMethod:serviceMethod,NoReply:noReply,InParam:inParam} + rpcRequestData := rpcRequestDataPool.Get().(*MsgpRpcRequestData) + rpcRequestData.Seq = seq + rpcRequestData.ServiceMethod = serviceMethod + rpcRequestData.NoReply = noReply + rpcRequestData.InParam = inParam + + return rpcRequestData//&MsgpRpcRequestData{Seq:seq,ServiceMethod:serviceMethod,NoReply:noReply,InParam:inParam} } func (slf *MsgpProcessor) MakeRpcResponse(seq uint64,err *RpcError,reply []byte) IRpcResponseData { - return &MsgpRpcResponseData{ - Seq: seq, - Err: err.Error(), - Reply: reply, - } + rpcRequestData := rpcResponeDataPool.Get().(*MsgpRpcResponseData) + rpcRequestData.Seq = seq + rpcRequestData.Err = err.Error() + rpcRequestData.Reply = reply + + return rpcRequestData +} + +func (slf *MsgpProcessor) ReleaseRpcRequest(rpcRequestData IRpcRequestData){ + rpcRequestDataPool.Put(rpcRequestData) +} + +func (slf *MsgpProcessor) ReleaseRpcRespose(rpcRequestData IRpcResponseData){ + rpcResponeDataPool.Put(rpcRequestData) } func (slf *MsgpRpcRequestData) IsReply() bool{ diff --git a/rpc/pbprocessor.go b/rpc/pbprocessor.go index 561a784..2f475f6 100644 --- a/rpc/pbprocessor.go +++ b/rpc/pbprocessor.go @@ -22,6 +22,16 @@ func (slf *PBRpcResponseData) MakeRespone(seq uint64,err *RpcError,reply []byte) return slf } + +func (slf *PBRpcRequestData) ReleaseRpcRequest(rpcRequestData IRpcRequestData){ + +} + +func (slf *PBRpcRequestData) ReleaseRpcRespose(rpcRequestData IRpcRequestData){ + +} + + func (slf *PBProcessor) Marshal(v interface{}) ([]byte, error){ return proto.Marshal(v.(proto.Message)) } diff --git a/rpc/processor.go b/rpc/processor.go index cc84a10..808048d 100644 --- a/rpc/processor.go +++ b/rpc/processor.go @@ -1,43 +1,14 @@ package rpc -/* - Seq uint64 // sequence number chosen by client - ServiceMethod string // format: "Service.Method" - NoReply bool //是否需要返回 - //packbody - InParam []byte - - - - -type RpcResponse struct { - //head - Seq uint64 // sequence number chosen by client - Err *RpcError - - //returns - Reply []byte -} -*/ -type IRpcRequestData interface { - GetSeq() uint64 - GetServiceMethod() string - GetInParam() []byte - IsReply() bool -} - -type IRpcResponseData interface { - GetSeq() uint64 - GetErr() *RpcError - GetReply() []byte -} - type IRpcProcessor interface { - Marshal(v interface{}) ([]byte, error) + Marshal(v interface{}) ([]byte, error) //b表示自定义缓冲区,可以填nil,由系统自动分配 Unmarshal(data []byte, v interface{}) error - MakeRpcRequest(seq uint64,serviceMethod string,noReply bool,inParam []byte) IRpcRequestData MakeRpcResponse(seq uint64,err *RpcError,reply []byte) IRpcResponseData + + ReleaseRpcRequest(rpcRequestData IRpcRequestData) + ReleaseRpcRespose(rpcRequestData IRpcResponseData) } + diff --git a/rpc/rpc.go b/rpc/rpc.go new file mode 100644 index 0000000..acee113 --- /dev/null +++ b/rpc/rpc.go @@ -0,0 +1,113 @@ +package rpc + +import ( + "reflect" + "sync" +) + +type RpcRequest struct { + RpcRequestData IRpcRequestData + + localReply interface{} + localParam interface{} //本地调用的参数列表 + requestHandle RequestHandler + callback *reflect.Value +} + +type RpcResponse struct { + RpcResponeData IRpcResponseData +} + +type IRpcRequestData interface { + GetSeq() uint64 + GetServiceMethod() string + GetInParam() []byte + IsReply() bool +} + +type IRpcResponseData interface { + GetSeq() uint64 + GetErr() *RpcError + GetReply() []byte +} + +type RequestHandler func(Returns interface{},Err *RpcError) + + +type Call struct { + Seq uint64 + ServiceMethod string + Arg interface{} + Reply interface{} + Respone *RpcResponse + Err error + done chan *Call // Strobes when call is complete. + connid int + callback *reflect.Value + rpcHandler IRpcHandler +} + +func (slf *Call) Clear(){ + slf.Seq = 0 + slf.ServiceMethod = "" + slf.Arg = nil + slf.Reply = nil + slf.Respone = nil + slf.Err = nil + slf.connid = 0 + slf.callback = nil + slf.rpcHandler = nil +} + +func (slf *Call) Done() *Call{ + return <-slf.done +} + +type RpcHandleFinder interface { + FindRpcHandler(serviceMethod string) IRpcHandler +} + + +var rpcResponePool sync.Pool +var rpcRequestPool sync.Pool +var rpcCallPool sync.Pool + +func init(){ + rpcResponePool.New = func()interface{}{ + return &RpcResponse{} + } + + rpcRequestPool.New = func() interface{} { + return &RpcRequest{} + } + + rpcCallPool.New = func() interface{} { + return &Call{done:make(chan *Call,1)} + } +} + +func MakeRpcResponse() *RpcResponse{ + return rpcResponePool.Get().(*RpcResponse) +} + +func MakeRpcRequest() *RpcRequest{ + return rpcRequestPool.Get().(*RpcRequest) +} + +func MakeCall() *Call { + call := rpcCallPool.Get().(*Call) + + return call +} + +func ReleaseRpcResponse(rpcRespone *RpcResponse){ + rpcResponePool.Put(rpcRespone) +} + +func ReleaseRpcRequest(rpcRequest *RpcRequest){ + rpcRequestPool.Put(rpcRequest) +} + +func ReleaseCall(call *Call){ + rpcCallPool.Put(call) +} \ No newline at end of file diff --git a/rpc/rpchandler.go b/rpc/rpchandler.go index dac257b..0ffd63b 100644 --- a/rpc/rpchandler.go +++ b/rpc/rpchandler.go @@ -186,7 +186,7 @@ func (slf *RpcHandler) HandlerRpcResponeCB(call *Call){ }else{ call.callback.Call([]reflect.Value{reflect.ValueOf(call.Reply),reflect.ValueOf(call.Err)}) } - + ReleaseCall(call) } @@ -203,6 +203,8 @@ func (slf *RpcHandler) HandlerRpcRequest(request *RpcRequest) { } } }() + defer processor.ReleaseRpcRequest(request.RpcRequestData) + defer ReleaseRpcRequest(request) v,ok := slf.mapfunctons[request.RpcRequestData.GetServiceMethod()] if ok == false { @@ -309,6 +311,7 @@ func (slf *RpcHandler) goRpc(bCast bool,nodeId int,serviceMethod string,args int } //其他的rpcHandler的处理器 pCall := pLocalRpcServer.rpcHandlerGo(true,sMethod[0],sMethod[1],args,nil) + defer ReleaseCall(pCall) if pCall.Err!=nil { err = pCall.Err } @@ -316,11 +319,11 @@ func (slf *RpcHandler) goRpc(bCast bool,nodeId int,serviceMethod string,args int } //跨node调用 - pCall := pClient.Go(true,serviceMethod,args,nil) + pCall := pClient.Go(false,serviceMethod,args,nil) if pCall.Err!=nil { err = pCall.Err } - + ReleaseCall(pCall) } return err @@ -356,8 +359,9 @@ func (slf *RpcHandler) callRpc(nodeId int,serviceMethod string,args interface{}, } //其他的rpcHandler的处理器 pCall := pLocalRpcServer.rpcHandlerGo(false,sMethod[0],sMethod[1],args,reply) - pResult := pCall.Done() - return pResult.Err + err = pCall.Done().Err + ReleaseCall(pCall) + return err } //跨node调用 @@ -365,8 +369,9 @@ func (slf *RpcHandler) callRpc(nodeId int,serviceMethod string,args interface{}, if pCall.Err != nil { return pCall.Err } - pResult := pCall.Done() - return pResult.Err + err = pCall.Done().Err + ReleaseCall(pCall) + return err } func (slf *RpcHandler) asyncCallRpc(nodeid int,serviceMethod string,args interface{},callback interface{}) error { @@ -437,6 +442,8 @@ func (slf *RpcHandler) asyncCallRpc(nodeid int,serviceMethod string,args interfa return nil } pCall := pLocalRpcServer.rpcHandlerGo(false,sMethod[0],sMethod[1],args,reply) + defer ReleaseCall(pCall) + pResult := pCall.Done() return pResult.Err } diff --git a/rpc/server.go b/rpc/server.go index fe0f435..056ce9e 100644 --- a/rpc/server.go +++ b/rpc/server.go @@ -13,46 +13,19 @@ import ( var processor IRpcProcessor = &JsonProcessor{} var LittleEndian bool - - -type Call struct { - Seq uint64 - ServiceMethod string - Arg interface{} - Reply interface{} - Respone *RpcResponse - Err error - done chan *Call // Strobes when call is complete. - connid int - callback *reflect.Value - rpcHandler IRpcHandler -} - -func (slf *Call) Done() *Call{ - return <-slf.done -} - - - type Server struct { functions map[interface{}]interface{} - cmdchannel chan *Call - rpcHandleFinder RpcHandleFinder rpcserver *network.TCPServer } -type RpcHandleFinder interface { - FindRpcHandler(serviceMethod string) IRpcHandler -} - func SetProcessor(proc IRpcProcessor) { processor = proc } func (slf *Server) Init(rpcHandleFinder RpcHandleFinder) { - slf.cmdchannel = make(chan *Call,10000) + slf.cmdchannel = make(chan *Call,100000) slf.rpcHandleFinder = rpcHandleFinder slf.rpcserver = &network.TCPServer{} } @@ -84,13 +57,10 @@ type RpcAgent struct { func (agent *RpcAgent) WriteRespone(serviceMethod string,seq uint64,reply interface{},err *RpcError) { - var rpcRespone RpcResponse - - //rpcRespone.Seq = seq - //rpcRespone.Err = err var mReply []byte var rpcError *RpcError var errM error + if err != nil { rpcError = err } else { @@ -102,13 +72,15 @@ func (agent *RpcAgent) WriteRespone(serviceMethod string,seq uint64,reply interf } } - rpcRespone.RpcResponeData = processor.MakeRpcResponse(seq,rpcError,mReply) - bytes,errM := processor.Marshal(rpcRespone.RpcResponeData) + var rpcResponse RpcResponse + rpcResponse.RpcResponeData = processor.MakeRpcResponse(seq,rpcError,mReply) + bytes,errM := processor.Marshal(rpcResponse.RpcResponeData) if errM != nil { - log.Error("service method %s %+v Marshal error:%+v!", serviceMethod,rpcRespone,errM) + log.Error("service method %s %+v Marshal error:%+v!", serviceMethod,rpcResponse,errM) return } + processor.ReleaseRpcRespose(rpcResponse.RpcResponeData) errM = agent.conn.WriteMsg(bytes) if errM != nil { log.Error("Rpc %s return is error:%+v",serviceMethod,errM) @@ -126,17 +98,21 @@ func (agent *RpcAgent) Run() { } //解析head - var req RpcRequest + req := MakeRpcRequest() req.RpcRequestData = processor.MakeRpcRequest(0,"",false,nil) err = processor.Unmarshal(data,req.RpcRequestData) if err != nil { if req.RpcRequestData.GetSeq()>0 { rpcError := RpcError("rpc Unmarshal request is error") agent.WriteRespone(req.RpcRequestData.GetServiceMethod(),req.RpcRequestData.GetSeq(),nil,&rpcError) + processor.ReleaseRpcRequest(req.RpcRequestData) + ReleaseRpcRequest(req) continue }else{ log.Error("rpc Unmarshal request is error: %v", err) //will close tcpconn + processor.ReleaseRpcRequest(req.RpcRequestData) + ReleaseRpcRequest(req) break } } @@ -146,6 +122,8 @@ func (agent *RpcAgent) Run() { if len(serviceMethod)!=2 { rpcError := RpcError("rpc request req.ServiceMethod is error") agent.WriteRespone(req.RpcRequestData.GetServiceMethod(),req.RpcRequestData.GetSeq(),nil,&rpcError) + processor.ReleaseRpcRequest(req.RpcRequestData) + ReleaseRpcRequest(req) log.Debug("rpc request req.ServiceMethod is error") continue } @@ -154,6 +132,8 @@ func (agent *RpcAgent) Run() { if rpcHandler== nil { rpcError := RpcError(fmt.Sprintf("service method %s not config!", req.RpcRequestData.GetServiceMethod())) agent.WriteRespone(req.RpcRequestData.GetServiceMethod(),req.RpcRequestData.GetSeq(),nil,&rpcError) + processor.ReleaseRpcRequest(req.RpcRequestData) + ReleaseRpcRequest(req) log.Error("service method %s not config!", req.RpcRequestData.GetServiceMethod()) continue } @@ -164,7 +144,13 @@ func (agent *RpcAgent) Run() { } } - rpcHandler.PushRequest(&req) + err = rpcHandler.PushRequest(req) + if err != nil { + rpcError := RpcError(err.Error()) + agent.WriteRespone(req.RpcRequestData.GetServiceMethod(),req.RpcRequestData.GetSeq(),nil,&rpcError) + processor.ReleaseRpcRequest(req.RpcRequestData) + ReleaseRpcRequest(req) + } } } @@ -210,8 +196,8 @@ func (slf *Server) myselfRpcHandlerGo(handlerName string,methodName string, args func (slf *Server) rpcHandlerGo(noReply bool,handlerName string,methodName string, args interface{},reply interface{}) *Call { - pCall := &Call{} - pCall.done = make( chan *Call,1) + pCall := MakeCall()//&Call{} + //pCall.done = make( chan *Call,1) rpcHandler := slf.rpcHandleFinder.FindRpcHandler(handlerName) if rpcHandler== nil { pCall.Err = fmt.Errorf("service method %s.%s not config!", handlerName,methodName) @@ -219,12 +205,10 @@ func (slf *Server) rpcHandlerGo(noReply bool,handlerName string,methodName strin pCall.done <- pCall return pCall } - var req RpcRequest + req := MakeRpcRequest() - //req.ServiceMethod = fmt.Sprintf("%s.%s",handlerName,methodName) req.localParam = args req.localReply = reply - //req.NoReply = noReply req.RpcRequestData = processor.MakeRpcRequest(0,fmt.Sprintf("%s.%s",handlerName,methodName),noReply,nil) if noReply == false { req.requestHandle = func(Returns interface{},Err *RpcError){ @@ -238,15 +222,19 @@ func (slf *Server) rpcHandlerGo(noReply bool,handlerName string,methodName strin } } - - rpcHandler.PushRequest(&req) + err := rpcHandler.PushRequest(req) + if err != nil { + processor.ReleaseRpcRequest(req.RpcRequestData) + ReleaseRpcRequest(req) + pCall.Err = err + pCall.done <- pCall + } return pCall } func (slf *Server) rpcHandlerAsyncGo(callerRpcHandler IRpcHandler,noReply bool,handlerName string,methodName string,args interface{},reply interface{},callback reflect.Value) error { - pCall := &Call{} - //pCall.done = make( chan *Call,1) + pCall := MakeCall() pCall.rpcHandler = callerRpcHandler pCall.callback = &callback pCall.Reply = reply @@ -257,11 +245,9 @@ func (slf *Server) rpcHandlerAsyncGo(callerRpcHandler IRpcHandler,noReply bool,h return err } - var req RpcRequest - //req.ServiceMethod = fmt.Sprintf("%s.%s",handlerName,methodName) + req := MakeRpcRequest() req.localParam = args req.localReply = reply - //req.NoReply = noReply req.RpcRequestData = processor.MakeRpcRequest(0,fmt.Sprintf("%s.%s",handlerName,methodName),noReply,nil) if noReply == false { req.requestHandle = func(Returns interface{},Err *RpcError){ @@ -279,8 +265,11 @@ func (slf *Server) rpcHandlerAsyncGo(callerRpcHandler IRpcHandler,noReply bool,h } - err := rpcHandler.PushRequest(&req) + err := rpcHandler.PushRequest(req) if err != nil { + processor.ReleaseRpcRequest(req.RpcRequestData) + ReleaseRpcRequest(req) + return err }