From c6ade7d3e1b1537c43bea84f98f781e5c83bc9a8 Mon Sep 17 00:00:00 2001 From: boyce Date: Thu, 25 Apr 2024 16:36:10 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96origin=E9=BB=98=E8=AE=A4?= =?UTF-8?q?=E6=9C=8D=E5=8A=A1=E5=8F=91=E7=8E=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- cluster/origindiscovery.go | 57 ++++++++++++++++++++++++++------------ 1 file changed, 39 insertions(+), 18 deletions(-) diff --git a/cluster/origindiscovery.go b/cluster/origindiscovery.go index 06b6625..505bd76 100644 --- a/cluster/origindiscovery.go +++ b/cluster/origindiscovery.go @@ -35,9 +35,10 @@ type OriginDiscoveryClient struct { funSetNode FunSetNode localNodeId string - mapDiscovery map[string]map[string]struct{} //map[masterNodeId]map[nodeId]struct{} - mapMasterNetwork map[string]string + mapDiscovery map[string]map[string][]string //map[masterNodeId]map[nodeId]struct{} + //mapMasterNetwork map[string]string bRetire bool + isRegisterOk bool } var masterService OriginDiscoveryMaster @@ -98,6 +99,7 @@ func (ds *OriginDiscoveryMaster) removeNodeInfo(nodeId string) { } } + ds.nsTTL.removeNode(nodeId) delete(ds.mapNodeInfo,nodeId) } @@ -187,12 +189,13 @@ func (ds *OriginDiscoveryMaster) RpcCastGo(serviceMethod string, args interface{ } func (ds *OriginDiscoveryMaster) RPC_Ping(req *rpc.Ping, res *rpc.Pong) error { + log.Debug("ping",log.String("nodeId",req.NodeId)) if ds.isRegNode(req.NodeId) == false{ res.Ok = false return nil } - return nil + //return nil res.Ok = true ds.nsTTL.addAndRefreshNode(req.NodeId) return nil @@ -256,22 +259,38 @@ func (ds *OriginDiscoveryMaster) RPC_RegServiceDiscover(req *rpc.RegServiceDisco return nil } +func (ds *OriginDiscoveryMaster) RPC_UnRegServiceDiscover(req *rpc.UnRegServiceDiscoverReq, res *rpc.Empty) error { + log.Debug("RPC_UnRegServiceDiscover",log.String("nodeId",req.NodeId)) + ds.OnNodeDisconnect(req.NodeId) + return nil +} + func (dc *OriginDiscoveryClient) OnInit() error { dc.RegNodeConnListener(dc) dc.RegNatsConnListener(dc) - dc.mapDiscovery = map[string]map[string]struct{}{} - dc.mapMasterNetwork = map[string]string{} + dc.mapDiscovery = map[string]map[string][]string{} + //dc.mapMasterNetwork = map[string]string{} return nil } -func (dc *OriginDiscoveryClient) addMasterNode(masterNodeId string, nodeId string) { +func (dc *OriginDiscoveryClient) addMasterNode(masterNodeId string, nodeId string,serviceList []string) { _, ok := dc.mapDiscovery[masterNodeId] if ok == false { - dc.mapDiscovery[masterNodeId] = map[string]struct{}{} + dc.mapDiscovery[masterNodeId] = map[string][]string{} } - dc.mapDiscovery[masterNodeId][nodeId] = struct{}{} + dc.mapDiscovery[masterNodeId][nodeId] = serviceList +} + +func (dc *OriginDiscoveryClient) getNodePublicService(masterNodeId string,nodeId string) []string{ + mapNodeId, ok := dc.mapDiscovery[masterNodeId] + if ok == false { + return nil + } + + publicService := mapNodeId[nodeId] + return publicService } func (dc *OriginDiscoveryClient) removeMasterNode(masterNodeId string, nodeId string) { @@ -295,10 +314,6 @@ func (dc *OriginDiscoveryClient) findNodeId(nodeId string) bool { } func (dc *OriginDiscoveryClient) ping(){ - if cluster.IsNatsMode() == false { - return - } - interval := time.Duration(cluster.GetOriginDiscovery().TTLSecond)*time.Second interval = interval /3 if interval < time.Second { @@ -306,6 +321,9 @@ func (dc *OriginDiscoveryClient) ping(){ } dc.NewTicker(interval,func(t *timer.Ticker){ + if cluster.IsNatsMode() == false || dc.isRegisterOk == false{ + return + } var ping rpc.Ping ping.NodeId = cluster.GetLocalNodeInfo().NodeId masterNodes := GetCluster().GetOriginDiscovery().MasterNodeList @@ -315,7 +333,7 @@ func (dc *OriginDiscoveryClient) ping(){ } masterNodeId := masterNodes[i].NodeId - dc.AsyncCallNode(masterNodeId,RpcPingMethod,&ping, func(empty *rpc.Pong,err error) { + dc.AsyncCallNodeWithTimeout(3*time.Second,masterNodeId,RpcPingMethod,&ping, func(empty *rpc.Pong,err error) { if err == nil && empty.Ok == false{ //断开master重 dc.regServiceDiscover(masterNodeId) @@ -349,7 +367,7 @@ func (dc *OriginDiscoveryClient) fullCompareDiffNode(masterNodeId string, mapNod } diffNodeIdSlice := make([]string, 0, len(mapNodeInfo)) - mapNodeId := map[string]struct{}{} + mapNodeId := map[string][]string{} mapNodeId, ok := dc.mapDiscovery[masterNodeId] if ok == false { return nil @@ -368,6 +386,7 @@ func (dc *OriginDiscoveryClient) fullCompareDiffNode(masterNodeId string, mapNod //订阅发现的服务通知 func (dc *OriginDiscoveryClient) RPC_SubServiceDiscover(req *rpc.SubscribeDiscoverNotify) error { + log.Debug("RPC_SubServiceDiscover",log.String("masterNodeId",req.MasterNodeId),log.String("delNodeId",req.GetDelNodeId())) mapNodeInfo := map[string]*rpc.NodeInfo{} for _, nodeInfo := range req.NodeInfo { //不对本地结点或者不存在任何公开服务的结点 @@ -414,15 +433,14 @@ func (dc *OriginDiscoveryClient) RPC_SubServiceDiscover(req *rpc.SubscribeDiscov //删除不必要的结点 for _, nodeId := range willDelNodeId { - cluster.TriggerDiscoveryEvent(false,nodeId,nil) + cluster.TriggerDiscoveryEvent(false,nodeId,dc.getNodePublicService(req.MasterNodeId, nodeId)) + dc.funDelNode(nodeId, true) dc.removeMasterNode(req.MasterNodeId, nodeId) - if dc.findNodeId(nodeId) == false { - dc.funDelNode(nodeId, false) - } } //设置新结点 for _, nodeInfo := range mapNodeInfo { + dc.addMasterNode(req.MasterNodeId, nodeInfo.NodeId,nodeInfo.PublicServiceList) bSet := dc.setNodeInfo(req.MasterNodeId,nodeInfo) if bSet == false { continue @@ -439,6 +457,8 @@ func (dc *OriginDiscoveryClient) OnNodeConnected(nodeId string) { } func (dc *OriginDiscoveryClient) OnRelease(){ + log.Debug("OriginDiscoveryClient") + //取消注册 var nodeRetireReq rpc.UnRegServiceDiscoverReq nodeRetireReq.NodeId = cluster.GetLocalNodeInfo().NodeId @@ -510,6 +530,7 @@ func (dc *OriginDiscoveryClient) regServiceDiscover(nodeId string){ return } + dc.isRegisterOk = true dc.RPC_SubServiceDiscover(res) })