From 3763f7d8484849950ada377b8eea87921a696931 Mon Sep 17 00:00:00 2001 From: orgin Date: Mon, 11 Jul 2022 10:55:57 +0800 Subject: [PATCH] =?UTF-8?q?=E6=95=B4=E7=90=86=E4=BC=98=E5=8C=96cluster?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- cluster/cluster.go | 22 ++++++++-------------- 1 file changed, 8 insertions(+), 14 deletions(-) diff --git a/cluster/cluster.go b/cluster/cluster.go index 9d44dfb..49273bc 100644 --- a/cluster/cluster.go +++ b/cluster/cluster.go @@ -46,10 +46,11 @@ type Cluster struct { globalCfg interface{} //全局配置 localServiceCfg map[string]interface{} //map[serviceName]配置数据* - mapRpc map[int]NodeRpcInfo //nodeId serviceDiscovery IServiceDiscovery //服务发现接口 + locker sync.RWMutex //结点与服务关系保护锁 + mapRpc map[int]NodeRpcInfo //nodeId mapIdNode map[int]NodeInfo //map[NodeId]NodeInfo mapServiceNode map[string]map[int]struct{} //map[serviceName]map[NodeId] @@ -95,9 +96,10 @@ func (cls *Cluster) DelNode(nodeId int, immediately bool) { return } cls.locker.Lock() + defer cls.locker.Unlock() + nodeInfo, ok := cls.mapIdNode[nodeId] if ok == false { - cls.locker.Unlock() return } @@ -113,7 +115,6 @@ func (cls *Cluster) DelNode(nodeId int, immediately bool) { if rpc.client.IsConnected() { nodeInfo.status = Discard rpc.client.Unlock() - cls.locker.Unlock() log.SRelease("Discard node ", nodeInfo.NodeId, " ", nodeInfo.ListenAddr) return } @@ -127,7 +128,6 @@ func (cls *Cluster) DelNode(nodeId int, immediately bool) { delete(cls.mapIdNode, nodeId) delete(cls.mapRpc, nodeId) - cls.locker.Unlock() if ok == true { rpc.client.Close(false) } @@ -368,6 +368,7 @@ func (cls *Cluster) triggerRpcEvent(bConnect bool, clientSeq uint32, nodeId int) cls.locker.Unlock() cls.rpcEventLocker.Lock() + defer cls.rpcEventLocker.Unlock() for serviceName, _ := range cls.mapServiceListenRpcEvent { ser := service.GetService(serviceName) if ser == nil { @@ -380,7 +381,6 @@ func (cls *Cluster) triggerRpcEvent(bConnect bool, clientSeq uint32, nodeId int) eventData.NodeId = nodeId ser.(service.IModule).NotifyEvent(&eventData) } - cls.rpcEventLocker.Unlock() } @@ -441,13 +441,7 @@ func (cls *Cluster) UnReDiscoveryEvent(serviceName string) { cls.rpcEventLocker.Unlock() } -func (cls *Cluster) FetchAllNodeId(fetchFun func(nodeId int)) { - cls.locker.Lock() - for nodeId, _ := range cls.mapIdNode { - fetchFun(nodeId) - } - cls.locker.Unlock() -} + func HasService(nodeId int, serviceName string) bool { cluster.locker.RLock() @@ -485,8 +479,8 @@ func (cls *Cluster) GetGlobalCfg() interface{} { func (cls *Cluster) GetNodeInfo(nodeId int) (NodeInfo,bool) { - cls.locker.Lock() - defer cls.locker.Unlock() + cls.locker.RLock() + defer cls.locker.RUnlock() nodeInfo,ok:= cls.mapIdNode[nodeId] return nodeInfo,ok