增加rpc client状态判断

This commit is contained in:
boyce
2019-04-20 11:51:02 +08:00
parent 49545403b1
commit a900f32730
2 changed files with 30 additions and 9 deletions

View File

@@ -19,7 +19,10 @@ type RpcClient struct {
nodeid int nodeid int
pclient *rpc.Client pclient *rpc.Client
serverAddr string serverAddr string
isConnect bool }
func (slf *RpcClient) IsConnected() bool {
return (slf.pclient != nil) && (slf.pclient.IsClosed() == false)
} }
type CCluster struct { type CCluster struct {
@@ -131,7 +134,7 @@ func (slf *CCluster) ConnService() error {
if node.NodeID == slf.cfg.currentNode.NodeID { if node.NodeID == slf.cfg.currentNode.NodeID {
continue continue
} }
slf.nodeclient[node.NodeID] = &RpcClient{node.NodeID, nil, node.ServerAddr, false} slf.nodeclient[node.NodeID] = &RpcClient{node.NodeID, nil, node.ServerAddr}
} }
} }
@@ -139,14 +142,13 @@ func (slf *CCluster) ConnService() error {
for _, rpcClient := range slf.nodeclient { for _, rpcClient := range slf.nodeclient {
//连接状态发送ping //连接状态发送ping
if rpcClient.isConnect == true { if rpcClient.IsConnected() == true {
ping.TimeStamp = 0 ping.TimeStamp = 0
err := rpcClient.pclient.Call("CPing.Ping", &ping, &pong) err := rpcClient.pclient.Call("CPing.Ping", &ping, &pong)
if err != nil { if err != nil {
rpcClient.pclient.Close() rpcClient.pclient.Close()
rpcClient.pclient = nil rpcClient.pclient = nil
rpcClient.isConnect = false
continue continue
} }
@@ -168,10 +170,9 @@ func (slf *CCluster) ConnService() error {
v, _ := slf.nodeclient[rpcClient.nodeid] v, _ := slf.nodeclient[rpcClient.nodeid]
v.pclient = client v.pclient = client
v.isConnect = true
} }
time.Sleep(time.Second * 2) time.Sleep(time.Second * 4)
} }
return nil return nil
@@ -305,7 +306,7 @@ func (slf *CCluster) GetNodeIdByServiceName(servicename string, bOnline bool) []
if bOnline { if bOnline {
ret := make([]int, 0, len(nodeIDList)) ret := make([]int, 0, len(nodeIDList))
for _, nodeid := range nodeIDList { for _, nodeid := range nodeIDList {
if slf.CheckNodeIsOnlineByID(nodeid) { if slf.CheckNodeIsConnectedByID(nodeid) {
ret = append(ret, nodeid) ret = append(ret, nodeid)
} }
} }
@@ -315,8 +316,22 @@ func (slf *CCluster) GetNodeIdByServiceName(servicename string, bOnline bool) []
return slf.cfg.GetIdByService(servicename) return slf.cfg.GetIdByService(servicename)
} }
func (slf *CCluster) CheckNodeIsOnlineByID(nodeid int) bool { func (slf *CCluster) CheckNodeIsConnectedByID(nodeid int) bool {
return true pclient := slf.GetRpcClientByNodeId(nodeid)
if pclient == nil {
return false
}
return pclient.IsConnected()
}
func (slf *CCluster) GetRpcClientByNodeId(nodeid int) *RpcClient {
pclient, ok := slf.nodeclient[nodeid]
if ok == false {
return nil
}
return pclient
} }
func (slf *CCluster) Go(bCast bool, NodeServiceMethod string, args interface{}, queueModle bool) error { func (slf *CCluster) Go(bCast bool, NodeServiceMethod string, args interface{}, queueModle bool) error {

View File

@@ -70,6 +70,12 @@ type ClientCodec interface {
Close() error Close() error
} }
func (client *Client) IsClosed() bool {
client.reqMutex.Lock()
defer client.reqMutex.Unlock()
return client.shutdown || client.closing
}
func (client *Client) send(call *Call, queueMode bool) { func (client *Client) send(call *Call, queueMode bool) {
client.reqMutex.Lock() client.reqMutex.Lock()
defer client.reqMutex.Unlock() defer client.reqMutex.Unlock()