From c98de9b1e9113ef3695751d2b8d01b7e18a113f7 Mon Sep 17 00:00:00 2001 From: boyce Date: Mon, 2 Nov 2020 14:50:54 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96rpc=E8=B6=85=E6=97=B6-?= =?UTF-8?q?=E4=BD=BF=E7=94=A8=E6=97=B6=E9=97=B4=E8=BD=AE=E5=AE=9A=E6=97=B6?= =?UTF-8?q?=E5=99=A8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- rpc/client.go | 37 ++++++++++++++++++++----------------- rpc/jsonprocessor.go | 7 ++++--- rpc/pbprocessor.go | 9 +++------ rpc/processor.go | 2 +- rpc/rpc.go | 14 +------------- rpc/server.go | 2 +- util/timewheel/timewheel.go | 4 ++++ 7 files changed, 34 insertions(+), 41 deletions(-) diff --git a/rpc/client.go b/rpc/client.go index f0bf474..bb825b6 100644 --- a/rpc/client.go +++ b/rpc/client.go @@ -5,6 +5,7 @@ import ( "fmt" "github.com/duanhf2012/origin/log" "github.com/duanhf2012/origin/network" + "github.com/duanhf2012/origin/util/timewheel" "math" "reflect" "runtime" @@ -35,7 +36,7 @@ func (client *Client) NewClientAgent(conn *network.TCPConn) network.Agent { func (client *Client) Connect(addr string) error { client.Addr = addr - client.maxCheckCallRpcCount = 100 + client.maxCheckCallRpcCount = 1000 client.callRpcTimeout = 15*time.Second client.ConnNum = 1 client.ConnectInterval = time.Second*2 @@ -58,16 +59,18 @@ func (client *Client) Connect(addr string) error { } func (client *Client) startCheckRpcCallTimer(){ - tick :=time.NewTicker( 3 * time.Second) - + timer:=timewheel.NewTimer(3*time.Second) for{ select { - case <- tick.C: + case <- timer.C: + timewheel.ReleaseTimer(timer) + timer=timewheel.NewTimer(3*time.Second) client.checkRpcCallTimeout() } } - tick.Stop() + timer.Close() + timewheel.ReleaseTimer(timer) } func (client *Client) makeCallFail(call *Call){ @@ -109,7 +112,7 @@ func (client *Client) ResetPending(){ } } - client.pending = map[uint64]*list.Element{} + client.pending = make(map[uint64]*list.Element,4096) client.pendingTimer = list.New() client.pendingLock.Unlock() } @@ -271,32 +274,32 @@ func (client *Client) Run(){ } //1.解析head - respone := &RpcResponse{} - respone.RpcResponseData =processor.MakeRpcResponse(0,nil,nil) + response := RpcResponse{} + response.RpcResponseData =processor.MakeRpcResponse(0,nil,nil) - err = processor.Unmarshal(bytes[1:],respone.RpcResponseData) + err = processor.Unmarshal(bytes[1:], response.RpcResponseData) client.conn.ReleaseReadMsg(bytes) if err != nil { - processor.ReleaseRpcRespose(respone.RpcResponseData) + processor.ReleaseRpcResponse(response.RpcResponseData) log.Error("rpcClient Unmarshal head error,error:%+v",err) continue } - v := client.RemovePending(respone.RpcResponseData.GetSeq()) + v := client.RemovePending(response.RpcResponseData.GetSeq()) if v == nil { - log.Error("rpcClient cannot find seq %d in pending",respone.RpcResponseData.GetSeq()) + log.Error("rpcClient cannot find seq %d in pending", response.RpcResponseData.GetSeq()) }else { v.Err = nil - if len(respone.RpcResponseData.GetReply()) >0 { - err = processor.Unmarshal(respone.RpcResponseData.GetReply(),v.Reply) + if len(response.RpcResponseData.GetReply()) >0 { + err = processor.Unmarshal(response.RpcResponseData.GetReply(),v.Reply) if err != nil { log.Error("rpcClient Unmarshal body error,error:%+v",err) v.Err = err } } - if respone.RpcResponseData.GetErr() != nil { - v.Err= respone.RpcResponseData.GetErr() + if response.RpcResponseData.GetErr() != nil { + v.Err= response.RpcResponseData.GetErr() } if v.callback!=nil && v.callback.IsValid() { @@ -306,7 +309,7 @@ func (client *Client) Run(){ } } - processor.ReleaseRpcRespose(respone.RpcResponseData) + processor.ReleaseRpcResponse(response.RpcResponseData) } } diff --git a/rpc/jsonprocessor.go b/rpc/jsonprocessor.go index 4e1fe0d..2df6190 100644 --- a/rpc/jsonprocessor.go +++ b/rpc/jsonprocessor.go @@ -43,7 +43,7 @@ func init(){ } func (jsonProcessor *JsonProcessor) Marshal(v interface{}) ([]byte, error){ - return jsonProcessor.Marshal(v) + return json.Marshal(v) } func (jsonProcessor *JsonProcessor) Unmarshal(data []byte, v interface{}) error{ @@ -65,6 +65,7 @@ func (jsonProcessor *JsonProcessor) MakeRpcResponse(seq uint64,err *RpcError,rep jsonRpcResponseData.Seq = seq jsonRpcResponseData.Err = err.Error() jsonRpcResponseData.Reply = reply + return jsonRpcResponseData } @@ -72,8 +73,8 @@ func (jsonProcessor *JsonProcessor) ReleaseRpcRequest(rpcRequestData IRpcRequest rpcJsonRequestDataPool.Put(rpcRequestData) } -func (jsonProcessor *JsonProcessor) ReleaseRpcRespose(rpcRequestData IRpcResponseData){ - rpcJsonResponseDataPool.Put(rpcRequestData) +func (jsonProcessor *JsonProcessor) ReleaseRpcResponse(rpcResponseData IRpcResponseData){ + rpcJsonResponseDataPool.Put(rpcResponseData) } func (jsonProcessor *JsonProcessor) IsParse(param interface{}) bool { diff --git a/rpc/pbprocessor.go b/rpc/pbprocessor.go index 56fb5ad..d562808 100644 --- a/rpc/pbprocessor.go +++ b/rpc/pbprocessor.go @@ -1,8 +1,8 @@ package rpc import ( - "github.com/golang/protobuf/proto" "fmt" + "github.com/golang/protobuf/proto" "sync" ) @@ -107,7 +107,6 @@ func (slf *PBProcessor) Unmarshal(data []byte, msg interface{}) error{ return proto.Unmarshal(data, protoMsg) } - func (slf *PBProcessor) MakeRpcRequest(seq uint64,serviceMethod string,noReply bool,inParam []byte,inAdditionParam interface{}) IRpcRequestData{ pPbRpcRequestData := rpcPbRequestDataPool.Get().(*PBRpcRequestData) pPbRpcRequestData.MakeRequest(seq,serviceMethod,noReply,inParam,inAdditionParam) @@ -124,8 +123,8 @@ func (slf *PBProcessor) ReleaseRpcRequest(rpcRequestData IRpcRequestData){ rpcPbRequestDataPool.Put(rpcRequestData) } -func (slf *PBProcessor) ReleaseRpcRespose(rpcRequestData IRpcResponseData){ - rpcPbResponseDataPool.Put(rpcRequestData) +func (slf *PBProcessor) ReleaseRpcResponse(rpcResponseData IRpcResponseData){ + rpcPbResponseDataPool.Put(rpcResponseData) } func (slf *PBProcessor) IsParse(param interface{}) bool { @@ -133,12 +132,10 @@ func (slf *PBProcessor) IsParse(param interface{}) bool { return ok } - func (slf *PBProcessor) GetProcessorType() RpcProcessorType{ return RpcProcessorPb } - func (slf *PBRpcRequestData) IsNoReply() bool{ return slf.GetNoReply() } diff --git a/rpc/processor.go b/rpc/processor.go index 42c3570..564010c 100644 --- a/rpc/processor.go +++ b/rpc/processor.go @@ -7,7 +7,7 @@ type IRpcProcessor interface { MakeRpcResponse(seq uint64,err *RpcError,reply []byte) IRpcResponseData ReleaseRpcRequest(rpcRequestData IRpcRequestData) - ReleaseRpcRespose(rpcRequestData IRpcResponseData) + ReleaseRpcResponse(rpcRequestData IRpcResponseData) IsParse(param interface{}) bool //是否可解析 GetProcessorType() RpcProcessorType } diff --git a/rpc/rpc.go b/rpc/rpc.go index 3784072..238de0e 100644 --- a/rpc/rpc.go +++ b/rpc/rpc.go @@ -22,7 +22,7 @@ type RpcResponse struct { RpcResponseData IRpcResponseData } -var rpcResponsePool sync.Pool +//var rpcResponsePool sync.Pool var rpcRequestPool sync.Pool var rpcCallPool sync.Pool @@ -67,10 +67,6 @@ type Call struct { } func init(){ - rpcResponsePool.New = func()interface{}{ - return &RpcResponse{} - } - rpcRequestPool.New = func() interface{} { return &RpcRequest{} } @@ -111,10 +107,6 @@ func (call *Call) Done() *Call{ return <-call.done } -func MakeRpcResponse() *RpcResponse{ - return rpcResponsePool.Get().(*RpcResponse).Clear() -} - func MakeRpcRequest() *RpcRequest{ return rpcRequestPool.Get().(*RpcRequest).Clear() } @@ -123,10 +115,6 @@ func MakeCall() *Call { return rpcCallPool.Get().(*Call).Clear() } -func ReleaseRpcResponse(rpcResponse *RpcResponse){ - rpcResponsePool.Put(rpcResponse) -} - func ReleaseRpcRequest(rpcRequest *RpcRequest){ rpcRequestPool.Put(rpcRequest) } diff --git a/rpc/server.go b/rpc/server.go index 0ac4cb9..1745b2c 100644 --- a/rpc/server.go +++ b/rpc/server.go @@ -100,7 +100,7 @@ func (agent *RpcAgent) WriteResponse(processor IRpcProcessor,serviceMethod strin var rpcResponse RpcResponse rpcResponse.RpcResponseData = processor.MakeRpcResponse(seq,rpcError,mReply) bytes,errM := processor.Marshal(rpcResponse.RpcResponseData) - defer processor.ReleaseRpcRespose(rpcResponse.RpcResponseData) + defer processor.ReleaseRpcResponse(rpcResponse.RpcResponseData) if errM != nil { log.Error("service method %s %+v Marshal error:%+v!", serviceMethod,rpcResponse,errM) diff --git a/util/timewheel/timewheel.go b/util/timewheel/timewheel.go index e0370a1..b2e9402 100644 --- a/util/timewheel/timewheel.go +++ b/util/timewheel/timewheel.go @@ -109,6 +109,10 @@ type Timer struct { //停止停时器 func (timer *Timer) Close(){ timer.bClose = true + if timer.bClose == true { + return + } + //将关闭标志设为1关闭状态 if atomic.SwapInt32(&timer.end,1) == 0 { chanStopTimer<-timer