From c7a1d860394fdccd61c86aa76fc8e3bd411c21ae Mon Sep 17 00:00:00 2001 From: boyce Date: Fri, 10 Jul 2020 14:09:52 +0800 Subject: [PATCH] =?UTF-8?q?1.=E4=BC=98=E5=8C=96pending=E5=AD=98=E5=82=A8?= =?UTF-8?q?=E7=BB=93=E6=9E=84=EF=BC=8C=E6=94=AF=E6=8C=81=E8=B6=85=E6=97=B6?= =?UTF-8?q?=E5=88=A4=E6=96=AD=202.protobuf=E4=B8=8Ejson=E6=94=AF=E6=8C=81?= =?UTF-8?q?=E4=B8=B4=E6=97=B6=E5=86=85=E5=AD=98=E6=B1=A0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- rpc/client.go | 97 +++++++++++++++++++++++++++++++------------- rpc/jsonprocessor.go | 44 ++++++++++++++++---- rpc/pbprocessor.go | 41 ++++++++++++++----- rpc/server.go | 4 +- 4 files changed, 137 insertions(+), 49 deletions(-) diff --git a/rpc/client.go b/rpc/client.go index 755b89a..e421c89 100644 --- a/rpc/client.go +++ b/rpc/client.go @@ -1,6 +1,7 @@ package rpc import ( + "container/list" "fmt" "github.com/duanhf2012/origin/log" "github.com/duanhf2012/origin/network" @@ -9,6 +10,7 @@ import ( "runtime" "strings" "sync" + "sync/atomic" "time" ) @@ -19,12 +21,14 @@ type Client struct { pendingLock sync.RWMutex startSeq uint64 - pending map[uint64]*Call + pending map[uint64]*list.Element + pendingTimer *list.List } func (slf *Client) NewClientAgent(conn *network.TCPConn) network.Agent { slf.conn = conn - slf.pending = map[uint64]*Call{} + slf.ResetPending() + return slf } @@ -38,23 +42,69 @@ func (slf *Client) Connect(addr string) error { slf.ConnectInterval = time.Second*2 slf.PendingWriteNum = 10000 slf.AutoReconnect = true - slf.LenMsgLen =2 + slf.LenMsgLen = 2 slf.MinMsgLen = 2 slf.MaxMsgLen = math.MaxUint16 - slf.NewAgent =slf.NewClientAgent + slf.NewAgent = slf.NewClientAgent slf.LittleEndian = LittleEndian - - slf.pendingLock.Lock() - for _,v := range slf.pending { - v.Err = fmt.Errorf("node is disconnect.") - v.done <- v - } - slf.pending = map[uint64]*Call{} - slf.pendingLock.Unlock() + slf.ResetPending() slf.Start() return nil } +func (slf *Client) ResetPending(){ + slf.pendingLock.Lock() + if slf.pending != nil { + for _,v := range slf.pending { + v.Value.(*Call).Err = fmt.Errorf("node is disconnect.") + v.Value.(*Call).done <- v.Value.(*Call) + } + } + + slf.pending = map[uint64]*list.Element{} + slf.pendingTimer = list.New() + slf.pendingLock.Unlock() +} + +func (slf *Client) AddPending(call *Call){ + slf.pendingLock.Lock() + elemTimer := slf.pendingTimer.PushBack(call) + slf.pending[call.Seq] = elemTimer//如果下面发送失败,将会一一直存在这里 + slf.pendingLock.Unlock() +} + +func (slf *Client) RemovePending(seq uint64){ + slf.pendingLock.Lock() + + v,ok := slf.pending[seq] + if ok == false{ + slf.pendingLock.Unlock() + return + } + slf.pendingTimer.Remove(v) + delete(slf.pending,seq) + + slf.pendingLock.Unlock() +} + +func (slf *Client) FindPending(seq uint64) *Call{ + slf.pendingLock.Lock() + v,ok := slf.pending[seq] + if ok == false { + slf.pendingLock.Unlock() + return nil + } + + pCall := v.Value.(*Call) + slf.pendingLock.Unlock() + + return pCall +} + +func (slf *Client) generateSeq() uint64{ + return atomic.AddUint64(&slf.startSeq,1) +} + func (slf *Client) AsycGo(rpcHandler IRpcHandler,serviceMethod string,callback reflect.Value, args interface{},replyParam interface{}) error { call := MakeCall() call.Reply = replyParam @@ -69,12 +119,9 @@ func (slf *Client) AsycGo(rpcHandler IRpcHandler,serviceMethod string,callback r request := &RpcRequest{} call.Arg = args - slf.pendingLock.Lock() - slf.startSeq += 1 - call.Seq = slf.startSeq + call.Seq = slf.generateSeq() request.RpcRequestData = processor.MakeRpcRequest(slf.startSeq,serviceMethod,false,InParam) - slf.pending[call.Seq] = call//如果下面发送失败,将会一一直存在这里 - slf.pendingLock.Unlock() + slf.AddPending(call) bytes,err := processor.Marshal(request.RpcRequestData) processor.ReleaseRpcRequest(request.RpcRequestData) @@ -106,12 +153,8 @@ func (slf *Client) Go(noReply bool,serviceMethod string, args interface{},reply request := &RpcRequest{} call.Arg = args - slf.pendingLock.Lock() - slf.startSeq += 1 - call.Seq = slf.startSeq + call.Seq = slf.generateSeq() request.RpcRequestData = processor.MakeRpcRequest(slf.startSeq,serviceMethod,noReply,InParam) - //slf.pending[call.Seq] = call - slf.pendingLock.Unlock() bytes,err := processor.Marshal(request.RpcRequestData) processor.ReleaseRpcRequest(request.RpcRequestData) if err != nil { @@ -158,14 +201,12 @@ func (slf *Client) Run(){ continue } - slf.pendingLock.Lock() - v,ok := slf.pending[respone.RpcResponeData.GetSeq()] - if ok == false { + + v := slf.FindPending(respone.RpcResponeData.GetSeq()) + if v == nil { log.Error("rpcClient cannot find seq %d in pending",respone.RpcResponeData.GetSeq()) - slf.pendingLock.Unlock() }else { - delete(slf.pending,respone.RpcResponeData.GetSeq()) - slf.pendingLock.Unlock() + slf.RemovePending(respone.RpcResponeData.GetSeq()) v.Err = nil if len(respone.RpcResponeData.GetReply()) >0 { diff --git a/rpc/jsonprocessor.go b/rpc/jsonprocessor.go index 26c44bf..99cf66a 100644 --- a/rpc/jsonprocessor.go +++ b/rpc/jsonprocessor.go @@ -1,6 +1,9 @@ package rpc -import "encoding/json" +import ( + "encoding/json" + "sync" +) type JsonProcessor struct { } @@ -24,6 +27,22 @@ type JsonRpcResponseData struct { } + +var rpcJsonResponeDataPool sync.Pool +var rpcJsonRequestDataPool sync.Pool + + + +func init(){ + rpcJsonResponeDataPool.New = func()interface{}{ + return &JsonRpcResponseData{} + } + + rpcJsonRequestDataPool.New = func()interface{}{ + return &JsonRpcRequestData{} + } +} + func (slf *JsonProcessor) Marshal(v interface{}) ([]byte, error){ return json.Marshal(v) } @@ -33,22 +52,29 @@ func (slf *JsonProcessor) Unmarshal(data []byte, v interface{}) error{ } func (slf *JsonProcessor) MakeRpcRequest(seq uint64,serviceMethod string,noReply bool,inParam []byte) IRpcRequestData{ - return &JsonRpcRequestData{Seq:seq,ServiceMethod:serviceMethod,NoReply:noReply,InParam:inParam} + jsonRpcRequestData := rpcJsonRequestDataPool.Get().(*JsonRpcRequestData) + jsonRpcRequestData.Seq = seq + jsonRpcRequestData.ServiceMethod = serviceMethod + jsonRpcRequestData.NoReply = noReply + jsonRpcRequestData.InParam = inParam + + return jsonRpcRequestData } func (slf *JsonProcessor) MakeRpcResponse(seq uint64,err *RpcError,reply []byte) IRpcResponseData { - return &JsonRpcResponseData{ - Seq: seq, - Err: err.Error(), - Reply: reply, - } + jsonRpcResponseData := rpcJsonResponeDataPool.Get().(*JsonRpcResponseData) + jsonRpcResponseData.Seq = seq + jsonRpcResponseData.Err = err.Error() + jsonRpcResponseData.Reply = reply + return jsonRpcResponseData } func (slf *JsonProcessor) ReleaseRpcRequest(rpcRequestData IRpcRequestData){ - + rpcJsonRequestDataPool.Put(rpcRequestData) } -func (slf *JsonProcessor) ReleaseRpcRespose(rpcRequestData IRpcResponseData){ +func (slf *JsonProcessor) ReleaseRpcRespose(rpcRequestData IRpcResponseData){ + rpcJsonResponeDataPool.Put(rpcRequestData) } func (slf *JsonRpcRequestData) IsReply() bool{ diff --git a/rpc/pbprocessor.go b/rpc/pbprocessor.go index 2f475f6..d397695 100644 --- a/rpc/pbprocessor.go +++ b/rpc/pbprocessor.go @@ -2,11 +2,26 @@ package rpc import ( "github.com/golang/protobuf/proto" + "sync" ) type PBProcessor struct { } +var rpcPbResponeDataPool sync.Pool +var rpcPbRequestDataPool sync.Pool + + +func init(){ + rpcPbResponeDataPool.New = func()interface{}{ + return &JsonRpcResponseData{} + } + + rpcPbRequestDataPool.New = func()interface{}{ + return &JsonRpcRequestData{} + } +} + func (slf *PBRpcRequestData) MakeRequest(seq uint64,serviceMethod string,noReply bool,inParam []byte) *PBRpcRequestData{ slf.Seq = proto.Uint64(seq) slf.ServiceMethod = proto.String(serviceMethod) @@ -23,15 +38,6 @@ func (slf *PBRpcResponseData) MakeRespone(seq uint64,err *RpcError,reply []byte) return slf } -func (slf *PBRpcRequestData) ReleaseRpcRequest(rpcRequestData IRpcRequestData){ - -} - -func (slf *PBRpcRequestData) ReleaseRpcRespose(rpcRequestData IRpcRequestData){ - -} - - func (slf *PBProcessor) Marshal(v interface{}) ([]byte, error){ return proto.Marshal(v.(proto.Message)) } @@ -43,13 +49,26 @@ func (slf *PBProcessor) Unmarshal(data []byte, msg interface{}) error{ func (slf *PBProcessor) MakeRpcRequest(seq uint64,serviceMethod string,noReply bool,inParam []byte) IRpcRequestData{ - return (&PBRpcRequestData{}).MakeRequest(seq,serviceMethod,noReply,inParam) + pPbRpcRequestData := rpcPbRequestDataPool.Get().(*PBRpcRequestData) + pPbRpcRequestData.MakeRequest(seq,serviceMethod,noReply,inParam) + return pPbRpcRequestData } func (slf *PBProcessor) MakeRpcResponse(seq uint64,err *RpcError,reply []byte) IRpcResponseData { - return (&PBRpcResponseData{}).MakeRespone(seq,err,reply) + pPBRpcResponseData := rpcPbResponeDataPool.Get().(*PBRpcResponseData) + pPBRpcResponseData.MakeRespone(seq,err,reply) + return pPBRpcResponseData } +func (slf *PBProcessor) ReleaseRpcRequest(rpcRequestData IRpcRequestData){ + rpcPbRequestDataPool.Put(rpcRequestData) +} + +func (slf *PBProcessor) ReleaseRpcRespose(rpcRequestData IRpcRequestData){ + rpcPbResponeDataPool.Put(rpcRequestData) +} + + func (slf *PBRpcRequestData) IsReply() bool{ return slf.GetNoReply() } diff --git a/rpc/server.go b/rpc/server.go index 056ce9e..00cc320 100644 --- a/rpc/server.go +++ b/rpc/server.go @@ -75,12 +75,14 @@ func (agent *RpcAgent) WriteRespone(serviceMethod string,seq uint64,reply interf var rpcResponse RpcResponse rpcResponse.RpcResponeData = processor.MakeRpcResponse(seq,rpcError,mReply) bytes,errM := processor.Marshal(rpcResponse.RpcResponeData) + defer processor.ReleaseRpcRespose(rpcResponse.RpcResponeData) + if errM != nil { log.Error("service method %s %+v Marshal error:%+v!", serviceMethod,rpcResponse,errM) return } - processor.ReleaseRpcRespose(rpcResponse.RpcResponeData) + errM = agent.conn.WriteMsg(bytes) if errM != nil { log.Error("Rpc %s return is error:%+v",serviceMethod,errM)