整理优化cluster

This commit is contained in:
orgin
2022-07-11 10:55:57 +08:00
parent 769f680b17
commit 3763f7d848

View File

@@ -46,10 +46,11 @@ type Cluster struct {
globalCfg interface{} //全局配置
localServiceCfg map[string]interface{} //map[serviceName]配置数据*
mapRpc map[int]NodeRpcInfo //nodeId
serviceDiscovery IServiceDiscovery //服务发现接口
locker sync.RWMutex //结点与服务关系保护锁
mapRpc map[int]NodeRpcInfo //nodeId
mapIdNode map[int]NodeInfo //map[NodeId]NodeInfo
mapServiceNode map[string]map[int]struct{} //map[serviceName]map[NodeId]
@@ -95,9 +96,10 @@ func (cls *Cluster) DelNode(nodeId int, immediately bool) {
return
}
cls.locker.Lock()
defer cls.locker.Unlock()
nodeInfo, ok := cls.mapIdNode[nodeId]
if ok == false {
cls.locker.Unlock()
return
}
@@ -113,7 +115,6 @@ func (cls *Cluster) DelNode(nodeId int, immediately bool) {
if rpc.client.IsConnected() {
nodeInfo.status = Discard
rpc.client.Unlock()
cls.locker.Unlock()
log.SRelease("Discard node ", nodeInfo.NodeId, " ", nodeInfo.ListenAddr)
return
}
@@ -127,7 +128,6 @@ func (cls *Cluster) DelNode(nodeId int, immediately bool) {
delete(cls.mapIdNode, nodeId)
delete(cls.mapRpc, nodeId)
cls.locker.Unlock()
if ok == true {
rpc.client.Close(false)
}
@@ -368,6 +368,7 @@ func (cls *Cluster) triggerRpcEvent(bConnect bool, clientSeq uint32, nodeId int)
cls.locker.Unlock()
cls.rpcEventLocker.Lock()
defer cls.rpcEventLocker.Unlock()
for serviceName, _ := range cls.mapServiceListenRpcEvent {
ser := service.GetService(serviceName)
if ser == nil {
@@ -380,7 +381,6 @@ func (cls *Cluster) triggerRpcEvent(bConnect bool, clientSeq uint32, nodeId int)
eventData.NodeId = nodeId
ser.(service.IModule).NotifyEvent(&eventData)
}
cls.rpcEventLocker.Unlock()
}
@@ -441,13 +441,7 @@ func (cls *Cluster) UnReDiscoveryEvent(serviceName string) {
cls.rpcEventLocker.Unlock()
}
func (cls *Cluster) FetchAllNodeId(fetchFun func(nodeId int)) {
cls.locker.Lock()
for nodeId, _ := range cls.mapIdNode {
fetchFun(nodeId)
}
cls.locker.Unlock()
}
func HasService(nodeId int, serviceName string) bool {
cluster.locker.RLock()
@@ -485,8 +479,8 @@ func (cls *Cluster) GetGlobalCfg() interface{} {
func (cls *Cluster) GetNodeInfo(nodeId int) (NodeInfo,bool) {
cls.locker.Lock()
defer cls.locker.Unlock()
cls.locker.RLock()
defer cls.locker.RUnlock()
nodeInfo,ok:= cls.mapIdNode[nodeId]
return nodeInfo,ok