From a900f327305816c30abd7183072e2ea5c13fd0fa Mon Sep 17 00:00:00 2001 From: boyce Date: Sat, 20 Apr 2019 11:51:02 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0rpc=20client=E7=8A=B6?= =?UTF-8?q?=E6=80=81=E5=88=A4=E6=96=AD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- cluster/cluster.go | 33 ++++++++++++++++++++++++--------- rpc/client.go | 6 ++++++ 2 files changed, 30 insertions(+), 9 deletions(-) diff --git a/cluster/cluster.go b/cluster/cluster.go index 420680a..7cfe639 100644 --- a/cluster/cluster.go +++ b/cluster/cluster.go @@ -19,7 +19,10 @@ type RpcClient struct { nodeid int pclient *rpc.Client serverAddr string - isConnect bool +} + +func (slf *RpcClient) IsConnected() bool { + return (slf.pclient != nil) && (slf.pclient.IsClosed() == false) } type CCluster struct { @@ -131,7 +134,7 @@ func (slf *CCluster) ConnService() error { if node.NodeID == slf.cfg.currentNode.NodeID { continue } - slf.nodeclient[node.NodeID] = &RpcClient{node.NodeID, nil, node.ServerAddr, false} + slf.nodeclient[node.NodeID] = &RpcClient{node.NodeID, nil, node.ServerAddr} } } @@ -139,14 +142,13 @@ func (slf *CCluster) ConnService() error { for _, rpcClient := range slf.nodeclient { //连接状态发送ping - if rpcClient.isConnect == true { + if rpcClient.IsConnected() == true { ping.TimeStamp = 0 err := rpcClient.pclient.Call("CPing.Ping", &ping, &pong) if err != nil { rpcClient.pclient.Close() rpcClient.pclient = nil - rpcClient.isConnect = false continue } @@ -168,10 +170,9 @@ func (slf *CCluster) ConnService() error { v, _ := slf.nodeclient[rpcClient.nodeid] v.pclient = client - v.isConnect = true } - time.Sleep(time.Second * 2) + time.Sleep(time.Second * 4) } return nil @@ -305,7 +306,7 @@ func (slf *CCluster) GetNodeIdByServiceName(servicename string, bOnline bool) [] if bOnline { ret := make([]int, 0, len(nodeIDList)) for _, nodeid := range nodeIDList { - if slf.CheckNodeIsOnlineByID(nodeid) { + if slf.CheckNodeIsConnectedByID(nodeid) { ret = append(ret, nodeid) } } @@ -315,8 +316,22 @@ func (slf *CCluster) GetNodeIdByServiceName(servicename string, bOnline bool) [] return slf.cfg.GetIdByService(servicename) } -func (slf *CCluster) CheckNodeIsOnlineByID(nodeid int) bool { - return true +func (slf *CCluster) CheckNodeIsConnectedByID(nodeid int) bool { + pclient := slf.GetRpcClientByNodeId(nodeid) + if pclient == nil { + return false + } + + return pclient.IsConnected() +} + +func (slf *CCluster) GetRpcClientByNodeId(nodeid int) *RpcClient { + pclient, ok := slf.nodeclient[nodeid] + if ok == false { + return nil + } + + return pclient } func (slf *CCluster) Go(bCast bool, NodeServiceMethod string, args interface{}, queueModle bool) error { diff --git a/rpc/client.go b/rpc/client.go index bda7a33..7825359 100644 --- a/rpc/client.go +++ b/rpc/client.go @@ -70,6 +70,12 @@ type ClientCodec interface { Close() error } +func (client *Client) IsClosed() bool { + client.reqMutex.Lock() + defer client.reqMutex.Unlock() + return client.shutdown || client.closing +} + func (client *Client) send(call *Call, queueMode bool) { client.reqMutex.Lock() defer client.reqMutex.Unlock()