diff --git a/rpc/rpchandler.go b/rpc/rpchandler.go index 5fdc825..c62902c 100644 --- a/rpc/rpchandler.go +++ b/rpc/rpchandler.go @@ -222,42 +222,52 @@ func (slf *RpcHandler) CallMethod(ServiceMethod string,param interface{},reply i return err } -func (slf *RpcHandler) goRpc(nodeId int,serviceMethod string,mutiCoroutine bool,args interface{}) error { +func (slf *RpcHandler) goRpc(bCast bool,nodeId int,serviceMethod string,mutiCoroutine bool,args interface{}) error { pClientList,err := slf.funcRpcClient(nodeId,serviceMethod) if err != nil { log.Error("Call serviceMethod is error:%+v!",err) return err } - if len(pClientList) > 1 { + if len(pClientList) > 1 && bCast == false{ log.Error("Cannot call more then 1 node!") return fmt.Errorf("Cannot call more then 1 node!") } //2.rpcclient调用 //如果调用本结点服务 - pClient := pClientList[0] - if pClient.blocalhost == true { - pLocalRpcServer:=slf.funcRpcServer() - //判断是否是同一服务 - sMethod := strings.Split(serviceMethod,".") - if len(sMethod)!=2 { - err := fmt.Errorf("Call serviceMethod %s is error!",serviceMethod) - log.Error("%+v",err) - return err + for _,pClient := range pClientList { + if pClient.blocalhost == true { + pLocalRpcServer:=slf.funcRpcServer() + //判断是否是同一服务 + sMethod := strings.Split(serviceMethod,".") + if len(sMethod)!=2 { + serr := fmt.Errorf("Call serviceMethod %s is error!",serviceMethod) + log.Error("%+v",serr) + if serr!= nil { + err = serr + } + continue + } + //调用自己rpcHandler处理器 + if sMethod[0] == slf.rpcHandler.GetName() { //自己服务调用 + // + return pLocalRpcServer.myselfRpcHandlerGo(sMethod[0],sMethod[1],args,nil) + } + //其他的rpcHandler的处理器 + pCall := pLocalRpcServer.rpcHandlerGo(true,mutiCoroutine,sMethod[0],sMethod[1],args,nil) + if pCall.Err!=nil { + err = pCall.Err + } } - //调用自己rpcHandler处理器 - if sMethod[0] == slf.rpcHandler.GetName() { //自己服务调用 - // - return pLocalRpcServer.myselfRpcHandlerGo(sMethod[0],sMethod[1],args,nil) + + //跨node调用 + pCall := pClient.Go(true,mutiCoroutine,serviceMethod,args,nil) + if pCall.Err!=nil { + err = pCall.Err } - //其他的rpcHandler的处理器 - pCall := pLocalRpcServer.rpcHandlerGo(true,mutiCoroutine,sMethod[0],sMethod[1],args,nil) - return pCall.Err } - //跨node调用 - pCall := pClient.Go(true,mutiCoroutine,serviceMethod,args,nil) - return pCall.Err + return err } func (slf *RpcHandler) callRpc(nodeId int,serviceMethod string,mutiCoroutine bool,args interface{},reply interface{}) error { @@ -379,11 +389,11 @@ func (slf *RpcHandler) GRCall(serviceMethod string,args interface{},reply interf } func (slf *RpcHandler) Go(serviceMethod string,args interface{}) error { - return slf.goRpc(0,serviceMethod,false,args) + return slf.goRpc(false,0,serviceMethod,false,args) } func (slf *RpcHandler) GRGo(serviceMethod string,args interface{}) error { - return slf.goRpc(0,serviceMethod,true,args) + return slf.goRpc(false,0,serviceMethod,true,args) } @@ -397,6 +407,9 @@ func (slf *RpcHandler) CallNode(nodeId int,serviceMethod string,args interface{} } func (slf *RpcHandler) GoNode(nodeId int,serviceMethod string,args interface{}) error { - return slf.goRpc(nodeId,serviceMethod,false,args) + return slf.goRpc(false,nodeId,serviceMethod,false,args) } +func (slf *RpcHandler) CastGo(serviceMethod string,args interface{}) { + slf.goRpc(true,0,serviceMethod,false,args) +} diff --git a/sysservice/tcpservice.go b/sysservice/tcpservice.go index 98c274e..5ae5a0b 100644 --- a/sysservice/tcpservice.go +++ b/sysservice/tcpservice.go @@ -152,4 +152,21 @@ func (slf *TcpService) SendMsg(clientid uint64,msg interface{}) error{ return err } return client.tcpConn.WriteMsg(bytes) -} \ No newline at end of file +} + +func (slf *TcpService) Close(clientid uint64) { + // + slf.mapClientLocker.Lock() + defer slf.mapClientLocker.Unlock() + + client,ok := slf.mapClient[clientid] + if ok == false{ + return + } + + if client.tcpConn!=nil { + client.tcpConn.Close() + } + + return +}