diff --git a/cluster/cluster.go b/cluster/cluster.go index 420680a..7cfe639 100644 --- a/cluster/cluster.go +++ b/cluster/cluster.go @@ -19,7 +19,10 @@ type RpcClient struct { nodeid int pclient *rpc.Client serverAddr string - isConnect bool +} + +func (slf *RpcClient) IsConnected() bool { + return (slf.pclient != nil) && (slf.pclient.IsClosed() == false) } type CCluster struct { @@ -131,7 +134,7 @@ func (slf *CCluster) ConnService() error { if node.NodeID == slf.cfg.currentNode.NodeID { continue } - slf.nodeclient[node.NodeID] = &RpcClient{node.NodeID, nil, node.ServerAddr, false} + slf.nodeclient[node.NodeID] = &RpcClient{node.NodeID, nil, node.ServerAddr} } } @@ -139,14 +142,13 @@ func (slf *CCluster) ConnService() error { for _, rpcClient := range slf.nodeclient { //连接状态发送ping - if rpcClient.isConnect == true { + if rpcClient.IsConnected() == true { ping.TimeStamp = 0 err := rpcClient.pclient.Call("CPing.Ping", &ping, &pong) if err != nil { rpcClient.pclient.Close() rpcClient.pclient = nil - rpcClient.isConnect = false continue } @@ -168,10 +170,9 @@ func (slf *CCluster) ConnService() error { v, _ := slf.nodeclient[rpcClient.nodeid] v.pclient = client - v.isConnect = true } - time.Sleep(time.Second * 2) + time.Sleep(time.Second * 4) } return nil @@ -305,7 +306,7 @@ func (slf *CCluster) GetNodeIdByServiceName(servicename string, bOnline bool) [] if bOnline { ret := make([]int, 0, len(nodeIDList)) for _, nodeid := range nodeIDList { - if slf.CheckNodeIsOnlineByID(nodeid) { + if slf.CheckNodeIsConnectedByID(nodeid) { ret = append(ret, nodeid) } } @@ -315,8 +316,22 @@ func (slf *CCluster) GetNodeIdByServiceName(servicename string, bOnline bool) [] return slf.cfg.GetIdByService(servicename) } -func (slf *CCluster) CheckNodeIsOnlineByID(nodeid int) bool { - return true +func (slf *CCluster) CheckNodeIsConnectedByID(nodeid int) bool { + pclient := slf.GetRpcClientByNodeId(nodeid) + if pclient == nil { + return false + } + + return pclient.IsConnected() +} + +func (slf *CCluster) GetRpcClientByNodeId(nodeid int) *RpcClient { + pclient, ok := slf.nodeclient[nodeid] + if ok == false { + return nil + } + + return pclient } func (slf *CCluster) Go(bCast bool, NodeServiceMethod string, args interface{}, queueModle bool) error { diff --git a/rpc/client.go b/rpc/client.go index bda7a33..7825359 100644 --- a/rpc/client.go +++ b/rpc/client.go @@ -70,6 +70,12 @@ type ClientCodec interface { Close() error } +func (client *Client) IsClosed() bool { + client.reqMutex.Lock() + defer client.reqMutex.Unlock() + return client.shutdown || client.closing +} + func (client *Client) send(call *Call, queueMode bool) { client.reqMutex.Lock() defer client.reqMutex.Unlock()