优化rpc-减少gc

This commit is contained in:
boyce
2020-11-04 15:06:39 +08:00
parent 2fc1217b18
commit 4d088d66da
4 changed files with 51 additions and 37 deletions

View File

@@ -182,24 +182,25 @@ func (cls *Cluster) GetRpcClient(nodeId int) *rpc.Client {
return c.client return c.client
} }
func GetRpcClient(nodeId int,serviceMethod string,clientList *[]*rpc.Client) error { func GetRpcClient(nodeId int,serviceMethod string,clientList []*rpc.Client) (error,int) {
if nodeId>0 { if nodeId>0 {
pClient := GetCluster().GetRpcClient(nodeId) pClient := GetCluster().GetRpcClient(nodeId)
if pClient==nil { if pClient==nil {
return fmt.Errorf("cannot find nodeid %d!",nodeId) return fmt.Errorf("cannot find nodeid %d!",nodeId),0
} }
*clientList = append(*clientList,pClient) clientList[0] = pClient
return nil return nil,1
} }
serviceAndMethod := strings.Split(serviceMethod,".")
if len(serviceAndMethod)!=2 { findIndex := strings.Index(serviceMethod,".")
return fmt.Errorf("servicemethod param %s is error!",serviceMethod) if findIndex==-1 {
return fmt.Errorf("servicemethod param %s is error!",serviceMethod),0
} }
serviceName := serviceMethod[:findIndex]
//1.找到对应的rpcNodeid //1.找到对应的rpcNodeid
GetCluster().GetNodeIdByService(serviceAndMethod[0],clientList) return GetCluster().GetNodeIdByService(serviceName,clientList)
return nil
} }
func GetRpcServer() *rpc.Server{ func GetRpcServer() *rpc.Server{

View File

@@ -194,10 +194,11 @@ func (cls *Cluster) IsConfigService(serviceName string) bool {
return false return false
} }
func (cls *Cluster) GetNodeIdByService(serviceName string,rpcClientList *[]*rpc.Client) { func (cls *Cluster) GetNodeIdByService(serviceName string,rpcClientList []*rpc.Client) (error,int) {
cls.locker.RLock() cls.locker.RLock()
defer cls.locker.RUnlock() defer cls.locker.RUnlock()
nodeIdList,ok := cls.mapServiceNode[serviceName] nodeIdList,ok := cls.mapServiceNode[serviceName]
count := 0
if ok == true { if ok == true {
for _,nodeId := range nodeIdList { for _,nodeId := range nodeIdList {
pClient := GetCluster().GetRpcClient(nodeId) pClient := GetCluster().GetRpcClient(nodeId)
@@ -205,9 +206,15 @@ func (cls *Cluster) GetNodeIdByService(serviceName string,rpcClientList *[]*rpc.
log.Error("Cannot connect node id %d",nodeId) log.Error("Cannot connect node id %d",nodeId)
continue continue
} }
*rpcClientList = append(*rpcClientList,pClient) rpcClientList[count] = pClient
count++
if count>=cap(rpcClientList) {
break
}
} }
} }
return nil,count
} }
func (cls *Cluster) getServiceCfg(serviceName string) interface{}{ func (cls *Cluster) getServiceCfg(serviceName string) interface{}{

View File

@@ -10,7 +10,8 @@ import (
"unicode/utf8" "unicode/utf8"
) )
type FuncRpcClient func(nodeId int,serviceMethod string,client *[]*Client) error const maxClusterNode int = 128
type FuncRpcClient func(nodeId int,serviceMethod string,client []*Client) (error,int)
type FuncRpcServer func() (*Server) type FuncRpcServer func() (*Server)
var NilError = reflect.Zero(reflect.TypeOf((*error)(nil)).Elem()) var NilError = reflect.Zero(reflect.TypeOf((*error)(nil)).Elem())
@@ -343,21 +344,21 @@ func (handler *RpcHandler) CallMethod(ServiceMethod string,param interface{},rep
} }
func (handler *RpcHandler) goRpc(processor IRpcProcessor,bCast bool,nodeId int,serviceMethod string,args interface{}) error { func (handler *RpcHandler) goRpc(processor IRpcProcessor,bCast bool,nodeId int,serviceMethod string,args interface{}) error {
var pClientList []*Client var pClientList [maxClusterNode]*Client
err := handler.funcRpcClient(nodeId,serviceMethod,&pClientList) err,count := handler.funcRpcClient(nodeId,serviceMethod,pClientList[:])
if err != nil { if count==0||err != nil {
log.Error("Call serviceMethod is error:%+v!",err) log.Error("Call serviceMethod is error:%+v!",err)
return err return err
} }
if len(pClientList) > 1 && bCast == false{ if count > 1 && bCast == false{
log.Error("Cannot call more then 1 node!") log.Error("Cannot call more then 1 node!")
return fmt.Errorf("Cannot call more then 1 node!") return fmt.Errorf("Cannot call more then 1 node!")
} }
//2.rpcclient调用 //2.rpcclient调用
//如果调用本结点服务 //如果调用本结点服务
for _,pClient := range pClientList { for i:=0;i<count;i++{
if pClient.bSelfNode == true { if pClientList[i].bSelfNode == true {
pLocalRpcServer:= handler.funcRpcServer() pLocalRpcServer:= handler.funcRpcServer()
//判断是否是同一服务 //判断是否是同一服务
findIndex := strings.Index(serviceMethod,".") findIndex := strings.Index(serviceMethod,".")
@@ -376,7 +377,7 @@ func (handler *RpcHandler) goRpc(processor IRpcProcessor,bCast bool,nodeId int,s
return pLocalRpcServer.myselfRpcHandlerGo(serviceName,method,args,nil) return pLocalRpcServer.myselfRpcHandlerGo(serviceName,method,args,nil)
} }
//其他的rpcHandler的处理器 //其他的rpcHandler的处理器
pCall := pLocalRpcServer.selfNodeRpcHandlerGo(processor,pClient,true,serviceName,method,args,nil,nil) pCall := pLocalRpcServer.selfNodeRpcHandlerGo(processor,pClientList[i],true,serviceName,method,args,nil,nil)
if pCall.Err!=nil { if pCall.Err!=nil {
err = pCall.Err err = pCall.Err
} }
@@ -385,7 +386,7 @@ func (handler *RpcHandler) goRpc(processor IRpcProcessor,bCast bool,nodeId int,s
} }
//跨node调用 //跨node调用
pCall := pClient.Go(true,serviceMethod,args,nil) pCall := pClientList[i].Go(true,serviceMethod,args,nil)
if pCall.Err!=nil { if pCall.Err!=nil {
err = pCall.Err err = pCall.Err
} }
@@ -396,13 +397,13 @@ func (handler *RpcHandler) goRpc(processor IRpcProcessor,bCast bool,nodeId int,s
} }
func (handler *RpcHandler) callRpc(nodeId int,serviceMethod string,args interface{},reply interface{}) error { func (handler *RpcHandler) callRpc(nodeId int,serviceMethod string,args interface{},reply interface{}) error {
var pClientList []*Client var pClientList [maxClusterNode]*Client
err := handler.funcRpcClient(nodeId,serviceMethod,&pClientList) err,count := handler.funcRpcClient(nodeId,serviceMethod,pClientList[:])
if err != nil { if count==0||err != nil {
log.Error("Call serviceMethod is error:%+v!",err) log.Error("Call serviceMethod is error:%+v!",err)
return err return err
} }
if len(pClientList) > 1 { if count > 1 {
log.Error("Cannot call more then 1 node!") log.Error("Cannot call more then 1 node!")
return fmt.Errorf("Cannot call more then 1 node!") return fmt.Errorf("Cannot call more then 1 node!")
} }
@@ -465,15 +466,15 @@ func (handler *RpcHandler) asyncCallRpc(nodeid int,serviceMethod string,args int
} }
reply := reflect.New(fVal.Type().In(0).Elem()).Interface() reply := reflect.New(fVal.Type().In(0).Elem()).Interface()
var pClientList []*Client var pClientList [maxClusterNode]*Client
err := handler.funcRpcClient(nodeid,serviceMethod,&pClientList) err,count := handler.funcRpcClient(nodeid,serviceMethod,pClientList[:])
if err != nil { if count==0||err != nil {
fVal.Call([]reflect.Value{reflect.ValueOf(reply),reflect.ValueOf(err)}) fVal.Call([]reflect.Value{reflect.ValueOf(reply),reflect.ValueOf(err)})
log.Error("Call serviceMethod is error:%+v!",err) log.Error("Call serviceMethod is error:%+v!",err)
return nil return nil
} }
if pClientList== nil || len(pClientList) > 1 { if count > 1 {
err := fmt.Errorf("Cannot call more then 1 node!") err := fmt.Errorf("Cannot call more then 1 node!")
fVal.Call([]reflect.Value{reflect.ValueOf(reply),reflect.ValueOf(err)}) fVal.Call([]reflect.Value{reflect.ValueOf(reply),reflect.ValueOf(err)})
log.Error("Cannot call more then 1 node!") log.Error("Cannot call more then 1 node!")
@@ -565,16 +566,17 @@ func (handler *RpcHandler) CastGo(serviceMethod string,args interface{}) {
handler.goRpc(nil,true,0,serviceMethod,args) handler.goRpc(nil,true,0,serviceMethod,args)
} }
func (handler *RpcHandler) RawGoNode(rpcProcessorType RpcProcessorType,nodeId int,serviceMethod string,args IRawInputArgs) error { func (handler *RpcHandler) RawGoNode(rpcProcessorType RpcProcessorType,nodeId int,serviceMethod string,args IRawInputArgs) error {
processor := GetProcessor(uint8(rpcProcessorType)) processor := GetProcessor(uint8(rpcProcessorType))
var pClientList []*Client var pClientList [maxClusterNode]*Client
err := handler.funcRpcClient(nodeId,serviceMethod,&pClientList) err,count := handler.funcRpcClient(nodeId,serviceMethod,pClientList[:])
if err != nil { if count==0||err != nil {
args.DoGc() args.DoGc()
log.Error("Call serviceMethod is error:%+v!",err) log.Error("Call serviceMethod is error:%+v!",err)
return err return err
} }
if len(pClientList) > 1 { if count > 1 {
args.DoGc() args.DoGc()
log.Error("Cannot call more then 1 node!") log.Error("Cannot call more then 1 node!")
return fmt.Errorf("Cannot call more then 1 node!") return fmt.Errorf("Cannot call more then 1 node!")
@@ -582,8 +584,9 @@ func (handler *RpcHandler) RawGoNode(rpcProcessorType RpcProcessorType,nodeId in
//2.rpcclient调用 //2.rpcclient调用
//如果调用本结点服务 //如果调用本结点服务
for _,pClient := range pClientList { for i:=0;i<count;i++{
if pClient.bSelfNode == true { //for _,pClient := range pClientList {
if pClientList[i].bSelfNode == true {
pLocalRpcServer:= handler.funcRpcServer() pLocalRpcServer:= handler.funcRpcServer()
//判断是否是同一服务 //判断是否是同一服务
findIndex := strings.Index(serviceMethod,".") findIndex := strings.Index(serviceMethod,".")
@@ -605,7 +608,7 @@ func (handler *RpcHandler) RawGoNode(rpcProcessorType RpcProcessorType,nodeId in
} }
//其他的rpcHandler的处理器 //其他的rpcHandler的处理器
pCall := pLocalRpcServer.selfNodeRpcHandlerGo(processor,pClient,true,serviceName,method,nil,nil,args) pCall := pLocalRpcServer.selfNodeRpcHandlerGo(processor,pClientList[i],true,serviceName,method,nil,nil,args)
if pCall.Err!=nil { if pCall.Err!=nil {
err = pCall.Err err = pCall.Err
} }
@@ -614,7 +617,7 @@ func (handler *RpcHandler) RawGoNode(rpcProcessorType RpcProcessorType,nodeId in
} }
//跨node调用 //跨node调用
pCall := pClient.RawGo(processor,true,serviceMethod,args.GetRawData(),args.GetAdditionParam(),nil) pCall := pClientList[i].RawGo(processor,true,serviceMethod,args.GetRawData(),args.GetAdditionParam(),nil)
args.DoGc() args.DoGc()
if pCall.Err!=nil { if pCall.Err!=nil {
err = pCall.Err err = pCall.Err

View File

@@ -30,7 +30,6 @@ type TcpGateService struct {
func (gateService *TcpGateService) OnInit() error { func (gateService *TcpGateService) OnInit() error {
gateService.OnLoad() gateService.OnLoad()
//注册监听客户连接断开事件 //注册监听客户连接断开事件
gateService.processor.SetDisConnectedHandler(gateService.router.OnDisconnected) gateService.processor.SetDisConnectedHandler(gateService.router.OnDisconnected)
//注册监听客户连接事件 //注册监听客户连接事件
@@ -44,6 +43,10 @@ func (gateService *TcpGateService) OnInit() error {
return nil return nil
} }
func (gateService *TcpGateService) SetEventChannel(channelNum int){
gateService.GetEventProcessor().SetEventChannel(channelNum)
}
func (gateService *TcpGateService) OnLoad() { func (gateService *TcpGateService) OnLoad() {
//设置默认LoadBalance //设置默认LoadBalance
if gateService.loadBalance == nil { if gateService.loadBalance == nil {