diff --git a/cluster/cluster.go b/cluster/cluster.go index ee35ee2..85e3e06 100644 --- a/cluster/cluster.go +++ b/cluster/cluster.go @@ -182,24 +182,25 @@ func (cls *Cluster) GetRpcClient(nodeId int) *rpc.Client { return c.client } -func GetRpcClient(nodeId int,serviceMethod string,clientList *[]*rpc.Client) error { +func GetRpcClient(nodeId int,serviceMethod string,clientList []*rpc.Client) (error,int) { if nodeId>0 { pClient := GetCluster().GetRpcClient(nodeId) if pClient==nil { - return fmt.Errorf("cannot find nodeid %d!",nodeId) + return fmt.Errorf("cannot find nodeid %d!",nodeId),0 } - *clientList = append(*clientList,pClient) - return nil + clientList[0] = pClient + return nil,1 } - serviceAndMethod := strings.Split(serviceMethod,".") - if len(serviceAndMethod)!=2 { - return fmt.Errorf("servicemethod param %s is error!",serviceMethod) + + findIndex := strings.Index(serviceMethod,".") + if findIndex==-1 { + return fmt.Errorf("servicemethod param %s is error!",serviceMethod),0 } + serviceName := serviceMethod[:findIndex] //1.找到对应的rpcNodeid - GetCluster().GetNodeIdByService(serviceAndMethod[0],clientList) - return nil + return GetCluster().GetNodeIdByService(serviceName,clientList) } func GetRpcServer() *rpc.Server{ diff --git a/cluster/parsecfg.go b/cluster/parsecfg.go index 9b0ef7c..316adb1 100644 --- a/cluster/parsecfg.go +++ b/cluster/parsecfg.go @@ -194,10 +194,11 @@ func (cls *Cluster) IsConfigService(serviceName string) bool { return false } -func (cls *Cluster) GetNodeIdByService(serviceName string,rpcClientList *[]*rpc.Client) { +func (cls *Cluster) GetNodeIdByService(serviceName string,rpcClientList []*rpc.Client) (error,int) { cls.locker.RLock() defer cls.locker.RUnlock() nodeIdList,ok := cls.mapServiceNode[serviceName] + count := 0 if ok == true { for _,nodeId := range nodeIdList { pClient := GetCluster().GetRpcClient(nodeId) @@ -205,9 +206,15 @@ func (cls *Cluster) GetNodeIdByService(serviceName string,rpcClientList *[]*rpc. log.Error("Cannot connect node id %d",nodeId) continue } - *rpcClientList = append(*rpcClientList,pClient) + rpcClientList[count] = pClient + count++ + if count>=cap(rpcClientList) { + break + } } } + + return nil,count } func (cls *Cluster) getServiceCfg(serviceName string) interface{}{ diff --git a/rpc/rpchandler.go b/rpc/rpchandler.go index dcb45a6..d467a16 100644 --- a/rpc/rpchandler.go +++ b/rpc/rpchandler.go @@ -10,7 +10,8 @@ import ( "unicode/utf8" ) -type FuncRpcClient func(nodeId int,serviceMethod string,client *[]*Client) error +const maxClusterNode int = 128 +type FuncRpcClient func(nodeId int,serviceMethod string,client []*Client) (error,int) type FuncRpcServer func() (*Server) var NilError = reflect.Zero(reflect.TypeOf((*error)(nil)).Elem()) @@ -343,21 +344,21 @@ func (handler *RpcHandler) CallMethod(ServiceMethod string,param interface{},rep } func (handler *RpcHandler) goRpc(processor IRpcProcessor,bCast bool,nodeId int,serviceMethod string,args interface{}) error { - var pClientList []*Client - err := handler.funcRpcClient(nodeId,serviceMethod,&pClientList) - if err != nil { + var pClientList [maxClusterNode]*Client + err,count := handler.funcRpcClient(nodeId,serviceMethod,pClientList[:]) + if count==0||err != nil { log.Error("Call serviceMethod is error:%+v!",err) return err } - if len(pClientList) > 1 && bCast == false{ + if count > 1 && bCast == false{ log.Error("Cannot call more then 1 node!") return fmt.Errorf("Cannot call more then 1 node!") } //2.rpcclient调用 //如果调用本结点服务 - for _,pClient := range pClientList { - if pClient.bSelfNode == true { + for i:=0;i 1 { + if count > 1 { log.Error("Cannot call more then 1 node!") return fmt.Errorf("Cannot call more then 1 node!") } @@ -465,15 +466,15 @@ func (handler *RpcHandler) asyncCallRpc(nodeid int,serviceMethod string,args int } reply := reflect.New(fVal.Type().In(0).Elem()).Interface() - var pClientList []*Client - err := handler.funcRpcClient(nodeid,serviceMethod,&pClientList) - if err != nil { + var pClientList [maxClusterNode]*Client + err,count := handler.funcRpcClient(nodeid,serviceMethod,pClientList[:]) + if count==0||err != nil { fVal.Call([]reflect.Value{reflect.ValueOf(reply),reflect.ValueOf(err)}) log.Error("Call serviceMethod is error:%+v!",err) return nil } - if pClientList== nil || len(pClientList) > 1 { + if count > 1 { err := fmt.Errorf("Cannot call more then 1 node!") fVal.Call([]reflect.Value{reflect.ValueOf(reply),reflect.ValueOf(err)}) log.Error("Cannot call more then 1 node!") @@ -565,16 +566,17 @@ func (handler *RpcHandler) CastGo(serviceMethod string,args interface{}) { handler.goRpc(nil,true,0,serviceMethod,args) } + func (handler *RpcHandler) RawGoNode(rpcProcessorType RpcProcessorType,nodeId int,serviceMethod string,args IRawInputArgs) error { processor := GetProcessor(uint8(rpcProcessorType)) - var pClientList []*Client - err := handler.funcRpcClient(nodeId,serviceMethod,&pClientList) - if err != nil { + var pClientList [maxClusterNode]*Client + err,count := handler.funcRpcClient(nodeId,serviceMethod,pClientList[:]) + if count==0||err != nil { args.DoGc() log.Error("Call serviceMethod is error:%+v!",err) return err } - if len(pClientList) > 1 { + if count > 1 { args.DoGc() log.Error("Cannot call more then 1 node!") return fmt.Errorf("Cannot call more then 1 node!") @@ -582,8 +584,9 @@ func (handler *RpcHandler) RawGoNode(rpcProcessorType RpcProcessorType,nodeId in //2.rpcclient调用 //如果调用本结点服务 - for _,pClient := range pClientList { - if pClient.bSelfNode == true { + for i:=0;i