优化rpc定时器

This commit is contained in:
boyce
2020-07-11 10:27:01 +08:00
parent 93b9c4f89a
commit 688d2d0bb4
6 changed files with 107 additions and 31 deletions

View File

@@ -69,8 +69,8 @@ func (slf *Cluster) Init(currentNodeId int) error{
rpcinfo.nodeinfo = nodeinfo
rpcinfo.client = &rpc.Client{}
if nodeinfo.NodeId == currentNodeId {
//rpcinfo.client.Connect("localhost")
rpcinfo.client.Connect(nodeinfo.ListenAddr)
rpcinfo.client.Connect("")
//rpcinfo.client.Connect(nodeinfo.ListenAddr)
}else{
rpcinfo.client.Connect(nodeinfo.ListenAddr)
}

View File

@@ -8,14 +8,13 @@ import (
"math"
"reflect"
"runtime"
"strings"
"sync"
"sync/atomic"
"time"
)
type Client struct {
blocalhost bool
bSelfNode bool
network.TCPClient
conn *network.TCPConn
@@ -23,6 +22,8 @@ type Client struct {
startSeq uint64
pending map[uint64]*list.Element
pendingTimer *list.List
callRpcTimerout time.Duration
maxCheckCallRpcCount int
}
func (slf *Client) NewClientAgent(conn *network.TCPConn) network.Agent {
@@ -34,10 +35,8 @@ func (slf *Client) NewClientAgent(conn *network.TCPConn) network.Agent {
func (slf *Client) Connect(addr string) error {
slf.Addr = addr
if strings.Index(addr,"localhost") == 0 {
slf.blocalhost = true
return nil
}
slf.maxCheckCallRpcCount = 100
slf.callRpcTimerout = 10*time.Second
slf.ConnNum = 1
slf.ConnectInterval = time.Second*2
slf.PendingWriteNum = 10000
@@ -48,10 +47,59 @@ func (slf *Client) Connect(addr string) error {
slf.NewAgent = slf.NewClientAgent
slf.LittleEndian = LittleEndian
slf.ResetPending()
go slf.startCheckRpcCallTimer()
if addr == "" {
slf.bSelfNode = true
return nil
}
slf.Start()
return nil
}
func (slf *Client) startCheckRpcCallTimer(){
tick :=time.NewTicker( 3 * time.Second)
for{
select {
case <- tick.C:
slf.checkRpcCallTimerout()
}
}
tick.Stop()
}
func (slf *Client) makeCallFail(call *Call){
if call.callback!=nil && call.callback.IsValid() {
call.rpcHandler.(*RpcHandler).callResponeCallBack<-call
}else{
call.done <- call
}
slf.removePending(call.Seq)
}
func (slf *Client) checkRpcCallTimerout(){
tnow := time.Now()
for i:=0;i<slf.maxCheckCallRpcCount;i++ {
slf.pendingLock.Lock()
pElem := slf.pendingTimer.Front()
if pElem == nil {
slf.pendingLock.Unlock()
break
}
pCall := pElem.Value.(*Call)
if tnow.Sub(pCall.calltime) > slf.callRpcTimerout {
pCall.Err = fmt.Errorf("RPC call takes more than %d seconds!",slf.callRpcTimerout/time.Second)
slf.makeCallFail(pCall)
slf.pendingLock.Unlock()
continue
}
slf.pendingLock.Unlock()
}
}
func (slf *Client) ResetPending(){
slf.pendingLock.Lock()
if slf.pending != nil {
@@ -68,6 +116,7 @@ func (slf *Client) ResetPending(){
func (slf *Client) AddPending(call *Call){
slf.pendingLock.Lock()
call.calltime = time.Now()
elemTimer := slf.pendingTimer.PushBack(call)
slf.pending[call.Seq] = elemTimer//如果下面发送失败,将会一一直存在这里
slf.pendingLock.Unlock()
@@ -75,18 +124,20 @@ func (slf *Client) AddPending(call *Call){
func (slf *Client) RemovePending(seq uint64){
slf.pendingLock.Lock()
slf.removePending(seq)
slf.pendingLock.Unlock()
}
func (slf *Client) removePending(seq uint64){
v,ok := slf.pending[seq]
if ok == false{
slf.pendingLock.Unlock()
return
}
slf.pendingTimer.Remove(v)
delete(slf.pending,seq)
slf.pendingLock.Unlock()
}
func (slf *Client) FindPending(seq uint64) *Call{
slf.pendingLock.Lock()
v,ok := slf.pending[seq]

View File

@@ -3,6 +3,7 @@ package rpc
import (
"reflect"
"sync"
"time"
)
type RpcRequest struct {
@@ -45,6 +46,7 @@ type Call struct {
connid int
callback *reflect.Value
rpcHandler IRpcHandler
calltime time.Time
}
func (slf *Call) Clear(){

View File

@@ -237,12 +237,13 @@ func (slf *RpcHandler) HandlerRpcRequest(request *RpcRequest) {
var oParam reflect.Value
paramList = append(paramList,reflect.ValueOf(slf.GetRpcHandler())) //接受者
oParam = reflect.New(v.oParam.Type().Elem())
paramList = append(paramList,reflect.ValueOf(iparam))
if request.localReply!=nil {
oParam = reflect.ValueOf(request.localReply) //输出参数
}else{
oParam = reflect.New(v.oParam.Type().Elem())
}
paramList = append(paramList,oParam) //输出参数
returnValues := v.method.Func.Call(paramList)
errInter := returnValues[0].Interface()
if errInter != nil {
@@ -293,7 +294,7 @@ func (slf *RpcHandler) goRpc(bCast bool,nodeId int,serviceMethod string,args int
//2.rpcclient调用
//如果调用本结点服务
for _,pClient := range pClientList {
if pClient.blocalhost == true {
if pClient.bSelfNode == true {
pLocalRpcServer:=slf.funcRpcServer()
//判断是否是同一服务
sMethod := strings.Split(serviceMethod,".")
@@ -311,11 +312,11 @@ func (slf *RpcHandler) goRpc(bCast bool,nodeId int,serviceMethod string,args int
return pLocalRpcServer.myselfRpcHandlerGo(sMethod[0],sMethod[1],args,nil)
}
//其他的rpcHandler的处理器
pCall := pLocalRpcServer.rpcHandlerGo(true,sMethod[0],sMethod[1],args,nil)
defer ReleaseCall(pCall)
pCall := pLocalRpcServer.selfNodeRpcHandlerGo(pClient,true,sMethod[0],sMethod[1],args,nil)
if pCall.Err!=nil {
err = pCall.Err
}
ReleaseCall(pCall)
continue
}
@@ -345,7 +346,7 @@ func (slf *RpcHandler) callRpc(nodeId int,serviceMethod string,args interface{},
//2.rpcclient调用
//如果调用本结点服务
pClient := pClientList[0]
if pClient.blocalhost == true {
if pClient.bSelfNode == true {
pLocalRpcServer:=slf.funcRpcServer()
//判断是否是同一服务
sMethod := strings.Split(serviceMethod,".")
@@ -360,8 +361,9 @@ func (slf *RpcHandler) callRpc(nodeId int,serviceMethod string,args interface{},
return pLocalRpcServer.myselfRpcHandlerGo(sMethod[0],sMethod[1],args,reply)
}
//其他的rpcHandler的处理器
pCall := pLocalRpcServer.rpcHandlerGo(false,sMethod[0],sMethod[1],args,reply)
pCall := pLocalRpcServer.selfNodeRpcHandlerGo(pClient,false,sMethod[0],sMethod[1],args,reply)
err = pCall.Done().Err
pClient.RemovePending(pCall.Seq)
ReleaseCall(pCall)
return err
}
@@ -416,7 +418,7 @@ func (slf *RpcHandler) asyncCallRpc(nodeid int,serviceMethod string,args interfa
//2.rpcclient调用
//如果调用本结点服务
pClient := pClientList[0]
if pClient.blocalhost == true {
if pClient.bSelfNode == true {
pLocalRpcServer:=slf.funcRpcServer()
//判断是否是同一服务
sMethod := strings.Split(serviceMethod,".")
@@ -439,17 +441,18 @@ func (slf *RpcHandler) asyncCallRpc(nodeid int,serviceMethod string,args interfa
//其他的rpcHandler的处理器
if callback!=nil {
err = pLocalRpcServer.rpcHandlerAsyncGo(slf,false,sMethod[0],sMethod[1],args,reply,fVal)
err = pLocalRpcServer.selfNodeRpcHandlerAsyncGo(pClient,slf,false,sMethod[0],sMethod[1],args,reply,fVal)
if err != nil {
fVal.Call([]reflect.Value{reflect.ValueOf(reply),reflect.ValueOf(err)})
}
return nil
}
pCall := pLocalRpcServer.rpcHandlerGo(false,sMethod[0],sMethod[1],args,reply)
defer ReleaseCall(pCall)
pCall := pLocalRpcServer.selfNodeRpcHandlerGo(pClient,false,sMethod[0],sMethod[1],args,reply)
err = pCall.Done().Err
pClient.RemovePending(pCall.Seq)
ReleaseCall(pCall)
pResult := pCall.Done()
return pResult.Err
return err
}
//跨node调用

View File

@@ -201,9 +201,10 @@ func (slf *Server) myselfRpcHandlerGo(handlerName string,methodName string, args
}
func (slf *Server) rpcHandlerGo(noReply bool,handlerName string,methodName string, args interface{},reply interface{}) *Call {
pCall := MakeCall()//&Call{}
//pCall.done = make( chan *Call,1)
func (slf *Server) selfNodeRpcHandlerGo(client *Client,noReply bool,handlerName string,methodName string, args interface{},reply interface{}) *Call {
pCall := MakeCall()
pCall.Seq = client.generateSeq()
rpcHandler := slf.rpcHandleFinder.FindRpcHandler(handlerName)
if rpcHandler== nil {
pCall.Err = fmt.Errorf("service method %s.%s not config!", handlerName,methodName)
@@ -217,7 +218,15 @@ func (slf *Server) rpcHandlerGo(noReply bool,handlerName string,methodName strin
req.localReply = reply
req.RpcRequestData = processor.MakeRpcRequest(0,fmt.Sprintf("%s.%s",handlerName,methodName),noReply,nil)
if noReply == false {
client.AddPending(pCall)
req.requestHandle = func(Returns interface{},Err *RpcError){
v := client.FindPending(pCall.Seq)
if v == nil {
log.Error("rpcClient cannot find seq %d in pending",pCall.Seq)
ReleaseCall(pCall)
return
}
if Err!=nil {
pCall.Err = Err
}else{
@@ -239,8 +248,9 @@ func (slf *Server) rpcHandlerGo(noReply bool,handlerName string,methodName strin
return pCall
}
func (slf *Server) rpcHandlerAsyncGo(callerRpcHandler IRpcHandler,noReply bool,handlerName string,methodName string,args interface{},reply interface{},callback reflect.Value) error {
func (slf *Server) selfNodeRpcHandlerAsyncGo(client *Client,callerRpcHandler IRpcHandler,noReply bool,handlerName string,methodName string,args interface{},reply interface{},callback reflect.Value) error {
pCall := MakeCall()
pCall.Seq = client.generateSeq()
pCall.rpcHandler = callerRpcHandler
pCall.callback = &callback
pCall.Reply = reply
@@ -256,7 +266,18 @@ func (slf *Server) rpcHandlerAsyncGo(callerRpcHandler IRpcHandler,noReply bool,h
req.localReply = reply
req.RpcRequestData = processor.MakeRpcRequest(0,fmt.Sprintf("%s.%s",handlerName,methodName),noReply,nil)
if noReply == false {
client.AddPending(pCall)
req.requestHandle = func(Returns interface{},Err *RpcError){
//processor.ReleaseRpcRequest(req.RpcRequestData)
//ReleaseRpcRequest(req)
v := client.FindPending(pCall.Seq)
if v == nil {
log.Error("rpcClient cannot find seq %d in pending",pCall.Seq)
ReleaseCall(pCall)
return
}
if Err == nil {
pCall.Err = nil
}else{

View File

@@ -37,7 +37,6 @@ type Service struct {
Module
rpc.RpcHandler //rpc
name string //service name
closeSig chan bool
wg sync.WaitGroup
serviceCfg interface{}
gorouterNum int32