diff --git a/cluster/cluster.go b/cluster/cluster.go index d4ec358..64d4443 100644 --- a/cluster/cluster.go +++ b/cluster/cluster.go @@ -106,14 +106,24 @@ func (slf *Cluster) GetRpcClient(nodeid int) *rpc.Client { return c.client } -func GetRpcClient(serviceMethod string) ([]*rpc.Client,error) { +func GetRpcClient(nodeId int,serviceMethod string) ([]*rpc.Client,error) { + var rpcClientList []*rpc.Client + if nodeId>0 { + pClient := GetCluster().GetRpcClient(nodeId) + if pClient==nil { + return rpcClientList,fmt.Errorf("cannot find nodeid %d!",nodeId) + } + rpcClientList = append(rpcClientList,pClient) + return rpcClientList,nil + } + serviceAndMethod := strings.Split(serviceMethod,".") if len(serviceAndMethod)!=2 { return nil,fmt.Errorf("servicemethod param %s is error!",serviceMethod) } //1.找到对应的rpcnodeid - var rpcClientList []*rpc.Client + nodeidList := GetCluster().GetNodeIdByService(serviceAndMethod[0]) if len(nodeidList) ==0 { return rpcClientList,fmt.Errorf("Cannot Find %s nodeid",serviceMethod) diff --git a/rpc/rpchandler.go b/rpc/rpchandler.go index a6b6098..625e2d9 100644 --- a/rpc/rpchandler.go +++ b/rpc/rpchandler.go @@ -9,7 +9,7 @@ import ( "unicode/utf8" ) -type FuncRpcClient func(serviceMethod string) ([]*Client,error) +type FuncRpcClient func(nodeid int,serviceMethod string) ([]*Client,error) type FuncRpcServer func() (*Server) var NilError = reflect.Zero(reflect.TypeOf((*error)(nil)).Elem()) @@ -212,8 +212,8 @@ func (slf *RpcHandler) CallMethod(ServiceMethod string,param interface{},reply i return err } -func (slf *RpcHandler) goRpc(serviceMethod string,mutiCoroutine bool,args interface{}) error { - pClientList,err := slf.funcRpcClient(serviceMethod) +func (slf *RpcHandler) goRpc(nodeId int,serviceMethod string,mutiCoroutine bool,args interface{}) error { + pClientList,err := slf.funcRpcClient(nodeId,serviceMethod) if err != nil { log.Error("Call serviceMethod is error:%+v!",err) return err @@ -250,8 +250,8 @@ func (slf *RpcHandler) goRpc(serviceMethod string,mutiCoroutine bool,args interf return pCall.Err } -func (slf *RpcHandler) callRpc(serviceMethod string,mutiCoroutine bool,args interface{},reply interface{}) error { - pClientList,err := slf.funcRpcClient(serviceMethod) +func (slf *RpcHandler) callRpc(nodeId int,serviceMethod string,mutiCoroutine bool,args interface{},reply interface{}) error { + pClientList,err := slf.funcRpcClient(nodeId,serviceMethod) if err != nil { log.Error("Call serviceMethod is error:%+v!",err) return err @@ -290,14 +290,14 @@ func (slf *RpcHandler) callRpc(serviceMethod string,mutiCoroutine bool,args inte return pResult.Err } -func (slf *RpcHandler) asyncCallRpc(serviceMethod string,mutiCoroutine bool,args interface{},callback interface{}) error { +func (slf *RpcHandler) asyncCallRpc(nodeid int,serviceMethod string,mutiCoroutine bool,args interface{},callback interface{}) error { fVal := reflect.ValueOf(callback) if fVal.Kind()!=reflect.Func{ return fmt.Errorf("input function is error!") } reply := reflect.New(fVal.Type().In(0).Elem()).Interface() - pClientList,err := slf.funcRpcClient(serviceMethod) + pClientList,err := slf.funcRpcClient(nodeid,serviceMethod) if err != nil { log.Error("Call serviceMethod is error:%+v!",err) return err @@ -353,25 +353,40 @@ func (slf *RpcHandler) GetName() string{ //func (slf *RpcHandler) goRpc(serviceMethod string,mutiCoroutine bool,args ...interface{}) error { //(reply *int,err error) {} func (slf *RpcHandler) AsyncCall(serviceMethod string,args interface{},callback interface{}) error { - return slf.asyncCallRpc(serviceMethod,false,args,callback) + return slf.asyncCallRpc(0,serviceMethod,false,args,callback) } func (slf *RpcHandler) GRAsyncCall(serviceMethod string,args interface{},callback interface{}) error { - return slf.asyncCallRpc(serviceMethod,true,args,callback) + return slf.asyncCallRpc(0,serviceMethod,true,args,callback) } func (slf *RpcHandler) Call(serviceMethod string,args interface{},reply interface{}) error { - return slf.callRpc(serviceMethod,false,args,reply) + return slf.callRpc(0,serviceMethod,false,args,reply) } func (slf *RpcHandler) GRCall(serviceMethod string,args interface{},reply interface{}) error { - return slf.callRpc(serviceMethod,true,args,reply) + return slf.callRpc(0,serviceMethod,true,args,reply) } func (slf *RpcHandler) Go(serviceMethod string,args interface{}) error { - return slf.goRpc(serviceMethod,false,args) + return slf.goRpc(0,serviceMethod,false,args) } func (slf *RpcHandler) GRGo(serviceMethod string,args interface{}) error { - return slf.goRpc(serviceMethod,true,args) -} \ No newline at end of file + return slf.goRpc(0,serviceMethod,true,args) +} + + +func (slf *RpcHandler) AsyncCallNode(nodeId int,serviceMethod string,args interface{},callback interface{}) error { + return slf.asyncCallRpc(nodeId,serviceMethod,false,args,callback) +} + + +func (slf *RpcHandler) CallNode(nodeId int,serviceMethod string,args interface{},reply interface{}) error { + return slf.callRpc(nodeId,serviceMethod,false,args,reply) +} + +func (slf *RpcHandler) GoNode(nodeId int,serviceMethod string,args interface{}) error { + return slf.goRpc(nodeId,serviceMethod,false,args) +} +