From 1d970de0f94e828a4611ccf1f5e05661c959fd0b Mon Sep 17 00:00:00 2001 From: boyce Date: Thu, 25 Apr 2024 18:27:47 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96etcd=E6=9C=8D=E5=8A=A1?= =?UTF-8?q?=E5=8F=91=E7=8E=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- cluster/cluster.go | 21 +++++++-------------- cluster/etcddiscovery.go | 15 +++++---------- cluster/origindiscovery.go | 9 +++------ cluster/servicediscovery.go | 2 +- 4 files changed, 16 insertions(+), 31 deletions(-) diff --git a/cluster/cluster.go b/cluster/cluster.go index ffbe020..75008a5 100644 --- a/cluster/cluster.go +++ b/cluster/cluster.go @@ -100,11 +100,11 @@ func (cls *Cluster) DiscardNode(nodeId string) { cls.locker.Unlock() if bDel { - cls.DelNode(nodeId, true) + cls.DelNode(nodeId) } } -func (cls *Cluster) DelNode(nodeId string, immediately bool) { +func (cls *Cluster) DelNode(nodeId string) { //MasterDiscover结点与本地结点不删除 if cls.IsOriginMasterDiscoveryNode(nodeId) || nodeId == cls.localNodeInfo.NodeId { return @@ -117,15 +117,7 @@ func (cls *Cluster) DelNode(nodeId string, immediately bool) { return } - if immediately ==false { - //正在连接中不主动断开,只断开没有连接中的 - if rpc.client.IsConnected() { - rpc.nodeInfo.status = Discard - log.Info("Discard node",log.String("nodeId",rpc.nodeInfo.NodeId),log.String("ListenAddr", rpc.nodeInfo.ListenAddr)) - return - } - } - + cls.TriggerDiscoveryEvent(false,nodeId,rpc.nodeInfo.ServiceList) for _, serviceName := range rpc.nodeInfo.ServiceList { cls.delServiceNode(serviceName, nodeId) } @@ -138,8 +130,8 @@ func (cls *Cluster) DelNode(nodeId string, immediately bool) { log.Info("remove node ",log.String("NodeId", rpc.nodeInfo.NodeId),log.String("ListenAddr", rpc.nodeInfo.ListenAddr)) } -func (cls *Cluster) serviceDiscoveryDelNode(nodeId string, immediately bool) { - cls.DelNode(nodeId, immediately) +func (cls *Cluster) serviceDiscoveryDelNode(nodeId string) { + cls.DelNode(nodeId) } func (cls *Cluster) delServiceNode(serviceName string, nodeId string) { @@ -171,6 +163,7 @@ func (cls *Cluster) serviceDiscoverySetNodeInfo(nodeInfo *NodeInfo) { } } + cluster.TriggerDiscoveryEvent(true,nodeInfo.NodeId,nodeInfo.PublicServiceList) //再重新组装 mapDuplicate := map[string]interface{}{} //预防重复数据 for _, serviceName := range nodeInfo.PublicServiceList { @@ -202,7 +195,7 @@ func (cls *Cluster) serviceDiscoverySetNodeInfo(nodeInfo *NodeInfo) { rpcInfo.client =rpc.NewRClient(nodeInfo.NodeId, nodeInfo.ListenAddr, nodeInfo.MaxRpcParamLen,cls.localNodeInfo.CompressBytesLen,&cls.callSet,cls.NotifyAllService) } cls.mapRpc[nodeInfo.NodeId] = &rpcInfo - if cls.IsNatsMode() == true { + if cls.IsNatsMode() == true || cls.discoveryInfo.discoveryType!=OriginType { log.Info("Discovery nodeId and new rpc client",log.String("NodeId", nodeInfo.NodeId),log.Any("services:", nodeInfo.PublicServiceList),log.Bool("Retire",nodeInfo.Retire)) }else{ log.Info("Discovery nodeId and new rpc client",log.String("NodeId", nodeInfo.NodeId),log.Any("services:", nodeInfo.PublicServiceList),log.Bool("Retire",nodeInfo.Retire),log.String("nodeListenAddr",nodeInfo.ListenAddr)) diff --git a/cluster/etcddiscovery.go b/cluster/etcddiscovery.go index c89e7da..eaedf4e 100644 --- a/cluster/etcddiscovery.go +++ b/cluster/etcddiscovery.go @@ -15,6 +15,7 @@ import ( "context" "errors" "fmt" + "go.uber.org/zap" "path" "runtime" "strings" @@ -93,6 +94,7 @@ func (ed *EtcdDiscoveryService) OnInit() error { client, cerr := clientv3.New(clientv3.Config{ Endpoints: etcdDiscoveryCfg.EtcdList[i].Endpoints, DialTimeout: etcdDiscoveryCfg.DialTimeoutMillisecond, + Logger: zap.NewNop(), }) if cerr != nil { @@ -102,7 +104,6 @@ func (ed *EtcdDiscoveryService) OnInit() error { ctx,_:=context.WithTimeout(context.Background(),time.Second*3) _,err = client.Leases(ctx) - //_,err = client.Put(ctx,testKey,"") if err != nil { log.Error("etcd discovery init fail",log.Any("endpoint",etcdDiscoveryCfg.EtcdList[i].Endpoints),log.ErrorAttr("err",err)) return err @@ -111,7 +112,6 @@ func (ed *EtcdDiscoveryService) OnInit() error { ec := &etcdClientInfo{} for _, networkName := range etcdDiscoveryCfg.EtcdList[i].NetworkName { ec.watchKeys = append(ec.watchKeys,fmt.Sprintf("%s/%s",originDir,networkName)) - //ec.etcdKey = append(ec.etcdKey,fmt.Sprintf("%s/%s/%s",originDir,networkName,nd.localNodeId)) } ed.mapClient[client] = ec @@ -334,7 +334,6 @@ func (ed *EtcdDiscoveryService) watcher(client *clientv3.Client,etcdClient *etcd } }() - log.Debug(">>try watcher") rch := client.Watch(context.Background(), watchKey, clientv3.WithPrefix()) if ed.getServices(client,etcdClient,watchKey) == false { @@ -374,7 +373,7 @@ func (ed *EtcdDiscoveryService) delNode(fullKey string) string{ return "" } - ed.funDelNode(nodeId,false) + ed.funDelNode(nodeId) return nodeId } @@ -437,29 +436,25 @@ func (ed *EtcdDiscoveryService) OnEventGets(watchKey string,Kvs []*mvccpb.KeyVal mapNode[nodeId] = struct{}{} ed.addNodeId(watchKey,nodeId) } - - log.Debug(">>etcd OnEventGets",log.String("watchKey",watchKey),log.Any("mapNode",mapNode)) + // 此段代码为遍历并删除过期节点的逻辑。 // 对于mapDiscoveryNodeId中与watchKey关联的所有节点ID,遍历该集合。 // 如果某个节点ID不在mapNode中且不是本地节点ID,则调用funDelNode函数删除该节点。 mapLastNodeId := ed.mapDiscoveryNodeId[watchKey] // 根据watchKey获取对应的节点ID集合 for nodeId := range mapLastNodeId { // 遍历所有节点ID if _,ok := mapNode[nodeId];ok == false && nodeId != ed.localNodeId { // 检查节点是否不存在于mapNode且不是本地节点 - ed.funDelNode(nodeId,false) // 调用函数删除该节点 + ed.funDelNode(nodeId) // 调用函数删除该节点 delete(ed.mapDiscoveryNodeId[watchKey],nodeId) - log.Debug(">>etcd OnEventGets Delete",log.String("watchKey",watchKey),log.String("nodeId",nodeId)) } } } func (ed *EtcdDiscoveryService) OnEventPut(watchKey string,Kv *mvccpb.KeyValue) { - log.Debug(">>etcd OnEventPut",log.String("watchKey",watchKey),log.String("nodeId",ed.getNodeId(string(Kv.Key)))) nodeId := ed.setNode(ed.getNetworkNameByFullKey(string(Kv.Key)), Kv.Value) ed.addNodeId(watchKey,nodeId) } func (ed *EtcdDiscoveryService) OnEventDelete(watchKey string,Kv *mvccpb.KeyValue) { - log.Debug(">>etcd OnEventDelete",log.String("watchKey",watchKey),log.String("nodeId",ed.getNodeId(string(Kv.Key)))) nodeId := ed.delNode(string(Kv.Key)) delete(ed.mapDiscoveryNodeId[watchKey],nodeId) } diff --git a/cluster/origindiscovery.go b/cluster/origindiscovery.go index bdfcb1d..048654d 100644 --- a/cluster/origindiscovery.go +++ b/cluster/origindiscovery.go @@ -175,7 +175,7 @@ func (ds *OriginDiscoveryMaster) OnNodeDisconnect(nodeId string) { notifyDiscover.DelNodeId = nodeId //删除结点 - cluster.DelNode(nodeId, true) + cluster.DelNode(nodeId) //无注册过的结点不广播,避免非当前Master网络中的连接断开时通知到本网络 ds.CastGo(SubServiceDiscover, ¬ifyDiscover) @@ -244,7 +244,7 @@ func (ds *OriginDiscoveryMaster) RPC_RegServiceDiscover(req *rpc.RegServiceDisco nodeInfo.Retire = req.NodeInfo.Retire //主动删除已经存在的结点,确保先断开,再连接 - cluster.serviceDiscoveryDelNode(nodeInfo.NodeId, true) + cluster.serviceDiscoveryDelNode(nodeInfo.NodeId) //加入到本地Cluster模块中,将连接该结点 cluster.serviceDiscoverySetNodeInfo(&nodeInfo) @@ -429,8 +429,7 @@ func (dc *OriginDiscoveryClient) RPC_SubServiceDiscover(req *rpc.SubscribeDiscov //删除不必要的结点 for _, nodeId := range willDelNodeId { - cluster.TriggerDiscoveryEvent(false,nodeId,dc.getNodePublicService(req.MasterNodeId, nodeId)) - dc.funDelNode(nodeId, true) + dc.funDelNode(nodeId) dc.removeMasterNode(req.MasterNodeId, nodeId) } @@ -441,8 +440,6 @@ func (dc *OriginDiscoveryClient) RPC_SubServiceDiscover(req *rpc.SubscribeDiscov if bSet == false { continue } - - cluster.TriggerDiscoveryEvent(true,nodeInfo.NodeId,nodeInfo.PublicServiceList) } return nil diff --git a/cluster/servicediscovery.go b/cluster/servicediscovery.go index 9fb6c39..2e01a5d 100644 --- a/cluster/servicediscovery.go +++ b/cluster/servicediscovery.go @@ -3,7 +3,7 @@ package cluster type OperType int -type FunDelNode func (nodeId string,immediately bool) +type FunDelNode func (nodeId string) type FunSetNode func(nodeInfo *NodeInfo) type IServiceDiscovery interface {