优化rpc超时-使用时间轮定时器

This commit is contained in:
boyce
2020-11-02 14:50:54 +08:00
parent 59c9d20071
commit c98de9b1e9
7 changed files with 34 additions and 41 deletions

View File

@@ -5,6 +5,7 @@ import (
"fmt"
"github.com/duanhf2012/origin/log"
"github.com/duanhf2012/origin/network"
"github.com/duanhf2012/origin/util/timewheel"
"math"
"reflect"
"runtime"
@@ -35,7 +36,7 @@ func (client *Client) NewClientAgent(conn *network.TCPConn) network.Agent {
func (client *Client) Connect(addr string) error {
client.Addr = addr
client.maxCheckCallRpcCount = 100
client.maxCheckCallRpcCount = 1000
client.callRpcTimeout = 15*time.Second
client.ConnNum = 1
client.ConnectInterval = time.Second*2
@@ -58,16 +59,18 @@ func (client *Client) Connect(addr string) error {
}
func (client *Client) startCheckRpcCallTimer(){
tick :=time.NewTicker( 3 * time.Second)
timer:=timewheel.NewTimer(3*time.Second)
for{
select {
case <- tick.C:
case <- timer.C:
timewheel.ReleaseTimer(timer)
timer=timewheel.NewTimer(3*time.Second)
client.checkRpcCallTimeout()
}
}
tick.Stop()
timer.Close()
timewheel.ReleaseTimer(timer)
}
func (client *Client) makeCallFail(call *Call){
@@ -109,7 +112,7 @@ func (client *Client) ResetPending(){
}
}
client.pending = map[uint64]*list.Element{}
client.pending = make(map[uint64]*list.Element,4096)
client.pendingTimer = list.New()
client.pendingLock.Unlock()
}
@@ -271,32 +274,32 @@ func (client *Client) Run(){
}
//1.解析head
respone := &RpcResponse{}
respone.RpcResponseData =processor.MakeRpcResponse(0,nil,nil)
response := RpcResponse{}
response.RpcResponseData =processor.MakeRpcResponse(0,nil,nil)
err = processor.Unmarshal(bytes[1:],respone.RpcResponseData)
err = processor.Unmarshal(bytes[1:], response.RpcResponseData)
client.conn.ReleaseReadMsg(bytes)
if err != nil {
processor.ReleaseRpcRespose(respone.RpcResponseData)
processor.ReleaseRpcResponse(response.RpcResponseData)
log.Error("rpcClient Unmarshal head error,error:%+v",err)
continue
}
v := client.RemovePending(respone.RpcResponseData.GetSeq())
v := client.RemovePending(response.RpcResponseData.GetSeq())
if v == nil {
log.Error("rpcClient cannot find seq %d in pending",respone.RpcResponseData.GetSeq())
log.Error("rpcClient cannot find seq %d in pending", response.RpcResponseData.GetSeq())
}else {
v.Err = nil
if len(respone.RpcResponseData.GetReply()) >0 {
err = processor.Unmarshal(respone.RpcResponseData.GetReply(),v.Reply)
if len(response.RpcResponseData.GetReply()) >0 {
err = processor.Unmarshal(response.RpcResponseData.GetReply(),v.Reply)
if err != nil {
log.Error("rpcClient Unmarshal body error,error:%+v",err)
v.Err = err
}
}
if respone.RpcResponseData.GetErr() != nil {
v.Err= respone.RpcResponseData.GetErr()
if response.RpcResponseData.GetErr() != nil {
v.Err= response.RpcResponseData.GetErr()
}
if v.callback!=nil && v.callback.IsValid() {
@@ -306,7 +309,7 @@ func (client *Client) Run(){
}
}
processor.ReleaseRpcRespose(respone.RpcResponseData)
processor.ReleaseRpcResponse(response.RpcResponseData)
}
}

View File

@@ -43,7 +43,7 @@ func init(){
}
func (jsonProcessor *JsonProcessor) Marshal(v interface{}) ([]byte, error){
return jsonProcessor.Marshal(v)
return json.Marshal(v)
}
func (jsonProcessor *JsonProcessor) Unmarshal(data []byte, v interface{}) error{
@@ -65,6 +65,7 @@ func (jsonProcessor *JsonProcessor) MakeRpcResponse(seq uint64,err *RpcError,rep
jsonRpcResponseData.Seq = seq
jsonRpcResponseData.Err = err.Error()
jsonRpcResponseData.Reply = reply
return jsonRpcResponseData
}
@@ -72,8 +73,8 @@ func (jsonProcessor *JsonProcessor) ReleaseRpcRequest(rpcRequestData IRpcRequest
rpcJsonRequestDataPool.Put(rpcRequestData)
}
func (jsonProcessor *JsonProcessor) ReleaseRpcRespose(rpcRequestData IRpcResponseData){
rpcJsonResponseDataPool.Put(rpcRequestData)
func (jsonProcessor *JsonProcessor) ReleaseRpcResponse(rpcResponseData IRpcResponseData){
rpcJsonResponseDataPool.Put(rpcResponseData)
}
func (jsonProcessor *JsonProcessor) IsParse(param interface{}) bool {

View File

@@ -1,8 +1,8 @@
package rpc
import (
"github.com/golang/protobuf/proto"
"fmt"
"github.com/golang/protobuf/proto"
"sync"
)
@@ -107,7 +107,6 @@ func (slf *PBProcessor) Unmarshal(data []byte, msg interface{}) error{
return proto.Unmarshal(data, protoMsg)
}
func (slf *PBProcessor) MakeRpcRequest(seq uint64,serviceMethod string,noReply bool,inParam []byte,inAdditionParam interface{}) IRpcRequestData{
pPbRpcRequestData := rpcPbRequestDataPool.Get().(*PBRpcRequestData)
pPbRpcRequestData.MakeRequest(seq,serviceMethod,noReply,inParam,inAdditionParam)
@@ -124,8 +123,8 @@ func (slf *PBProcessor) ReleaseRpcRequest(rpcRequestData IRpcRequestData){
rpcPbRequestDataPool.Put(rpcRequestData)
}
func (slf *PBProcessor) ReleaseRpcRespose(rpcRequestData IRpcResponseData){
rpcPbResponseDataPool.Put(rpcRequestData)
func (slf *PBProcessor) ReleaseRpcResponse(rpcResponseData IRpcResponseData){
rpcPbResponseDataPool.Put(rpcResponseData)
}
func (slf *PBProcessor) IsParse(param interface{}) bool {
@@ -133,12 +132,10 @@ func (slf *PBProcessor) IsParse(param interface{}) bool {
return ok
}
func (slf *PBProcessor) GetProcessorType() RpcProcessorType{
return RpcProcessorPb
}
func (slf *PBRpcRequestData) IsNoReply() bool{
return slf.GetNoReply()
}

View File

@@ -7,7 +7,7 @@ type IRpcProcessor interface {
MakeRpcResponse(seq uint64,err *RpcError,reply []byte) IRpcResponseData
ReleaseRpcRequest(rpcRequestData IRpcRequestData)
ReleaseRpcRespose(rpcRequestData IRpcResponseData)
ReleaseRpcResponse(rpcRequestData IRpcResponseData)
IsParse(param interface{}) bool //是否可解析
GetProcessorType() RpcProcessorType
}

View File

@@ -22,7 +22,7 @@ type RpcResponse struct {
RpcResponseData IRpcResponseData
}
var rpcResponsePool sync.Pool
//var rpcResponsePool sync.Pool
var rpcRequestPool sync.Pool
var rpcCallPool sync.Pool
@@ -67,10 +67,6 @@ type Call struct {
}
func init(){
rpcResponsePool.New = func()interface{}{
return &RpcResponse{}
}
rpcRequestPool.New = func() interface{} {
return &RpcRequest{}
}
@@ -111,10 +107,6 @@ func (call *Call) Done() *Call{
return <-call.done
}
func MakeRpcResponse() *RpcResponse{
return rpcResponsePool.Get().(*RpcResponse).Clear()
}
func MakeRpcRequest() *RpcRequest{
return rpcRequestPool.Get().(*RpcRequest).Clear()
}
@@ -123,10 +115,6 @@ func MakeCall() *Call {
return rpcCallPool.Get().(*Call).Clear()
}
func ReleaseRpcResponse(rpcResponse *RpcResponse){
rpcResponsePool.Put(rpcResponse)
}
func ReleaseRpcRequest(rpcRequest *RpcRequest){
rpcRequestPool.Put(rpcRequest)
}

View File

@@ -100,7 +100,7 @@ func (agent *RpcAgent) WriteResponse(processor IRpcProcessor,serviceMethod strin
var rpcResponse RpcResponse
rpcResponse.RpcResponseData = processor.MakeRpcResponse(seq,rpcError,mReply)
bytes,errM := processor.Marshal(rpcResponse.RpcResponseData)
defer processor.ReleaseRpcRespose(rpcResponse.RpcResponseData)
defer processor.ReleaseRpcResponse(rpcResponse.RpcResponseData)
if errM != nil {
log.Error("service method %s %+v Marshal error:%+v!", serviceMethod,rpcResponse,errM)

View File

@@ -109,6 +109,10 @@ type Timer struct {
//停止停时器
func (timer *Timer) Close(){
timer.bClose = true
if timer.bClose == true {
return
}
//将关闭标志设为1关闭状态
if atomic.SwapInt32(&timer.end,1) == 0 {
chanStopTimer<-timer