From d365dde8c02b6ce7c00cb0b8d955ae334d1292db Mon Sep 17 00:00:00 2001 From: boyce <6549168@qq.com> Date: Thu, 11 Dec 2025 09:56:39 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=E6=9C=8D=E5=8A=A1=E5=8F=91?= =?UTF-8?q?=E7=8E=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- cluster/etcddiscovery.go | 95 +++++++++++++++++++++++++------------- cluster/origindiscovery.go | 17 ++++++- cluster/parsecfg.go | 55 +++++++++++----------- 3 files changed, 107 insertions(+), 60 deletions(-) diff --git a/cluster/etcddiscovery.go b/cluster/etcddiscovery.go index 5dc4d84..4c39ed2 100644 --- a/cluster/etcddiscovery.go +++ b/cluster/etcddiscovery.go @@ -6,6 +6,12 @@ import ( "crypto/x509" "errors" "fmt" + "os" + "path" + "strings" + "sync/atomic" + "time" + "github.com/duanhf2012/origin/v2/event" "github.com/duanhf2012/origin/v2/log" "github.com/duanhf2012/origin/v2/rpc" @@ -14,19 +20,15 @@ import ( "go.etcd.io/etcd/api/v3/mvccpb" "go.etcd.io/etcd/client/v3" "google.golang.org/protobuf/proto" - "os" - "path" - "strings" - "sync/atomic" - "time" ) const originDir = "/origin" type etcdClientInfo struct { - watchKeys []string - leaseID clientv3.LeaseID - keepAliveChan <-chan *clientv3.LeaseKeepAliveResponse + isLocalNetwork bool + watchKeys []string + leaseID clientv3.LeaseID + keepAliveChan <-chan *clientv3.LeaseKeepAliveResponse } type EtcdDiscoveryService struct { @@ -93,6 +95,7 @@ func (ed *EtcdDiscoveryService) OnInit() error { return errors.New("etcd discovery config is nil.") } + var hasLocalNetwork bool for i := 0; i < len(etcdDiscoveryCfg.EtcdList); i++ { var client *clientv3.Client var tlsConfig *tls.Config @@ -141,13 +144,25 @@ func (ed *EtcdDiscoveryService) OnInit() error { } ec := &etcdClientInfo{} - for _, networkName := range etcdDiscoveryCfg.EtcdList[i].NetworkName { - ec.watchKeys = append(ec.watchKeys, fmt.Sprintf("%s/%s", originDir, networkName)) + + if etcdDiscoveryCfg.EtcdList[i].LocalNetworkName != "" { + hasLocalNetwork = true + ec.isLocalNetwork = true + ec.watchKeys = append(ec.watchKeys, fmt.Sprintf("%s/%s", originDir, etcdDiscoveryCfg.EtcdList[i].LocalNetworkName)) + } else { + ec.isLocalNetwork = false + for _, networkName := range etcdDiscoveryCfg.EtcdList[i].NeighborNetworkName { + ec.watchKeys = append(ec.watchKeys, fmt.Sprintf("%s/%s", originDir, networkName)) + } } ed.mapClient[client] = ec } + if !hasLocalNetwork { + return errors.New("etcd discovery init fail,cannot find local network") + } + return nil } @@ -167,14 +182,18 @@ func (ed *EtcdDiscoveryService) registerServiceByClient(client *clientv3.Client, } etcdClient.leaseID = resp.ID - for _, watchKey := range etcdClient.watchKeys { - // 注册服务节点到 etcd - _, err = client.Put(context.Background(), ed.getRegisterKey(watchKey), ed.byteLocalNodeInfo, clientv3.WithLease(resp.ID)) - if err != nil { - log.Error("etcd Put fail", log.ErrorField("err", err)) - ed.tryRegisterService(client, etcdClient) - return - } + + // 注册服务节点到 etcd,LocalNetwork时才会注册,且etcdClient.watchKeys必然>0 + if len(etcdClient.watchKeys) != 1 { + log.Error("LocalNetwork watchkey is error") + return + } + + _, err = client.Put(context.Background(), ed.getRegisterKey(etcdClient.watchKeys[0]), ed.byteLocalNodeInfo, clientv3.WithLease(resp.ID)) + if err != nil { + log.Error("etcd Put fail", log.ErrorField("err", err)) + ed.tryRegisterService(client, etcdClient) + return } etcdClient.keepAliveChan, err = client.KeepAlive(context.Background(), etcdClient.leaseID) @@ -204,6 +223,10 @@ func (ed *EtcdDiscoveryService) tryRegisterService(client *clientv3.Client, etcd return } + if !etcdClient.isLocalNetwork { + return + } + ed.AfterFunc(time.Second*3, func(t *timer.Timer) { ed.registerServiceByClient(client, etcdClient) }) @@ -229,13 +252,18 @@ func (ed *EtcdDiscoveryService) tryLaterRetire() { func (ed *EtcdDiscoveryService) retire() error { //从etcd中更新 for c, ec := range ed.mapClient { - for _, watchKey := range ec.watchKeys { - // 注册服务节点到 etcd - _, err := c.Put(context.Background(), ed.getRegisterKey(watchKey), ed.byteLocalNodeInfo, clientv3.WithLease(ec.leaseID)) - if err != nil { - log.Error("etcd Put fail", log.ErrorField("err", err)) - return err - } + if !ec.isLocalNetwork { + continue + } + + if len(ec.watchKeys)!=1 { + log.Error("LocalNetwork watchkey is error") + continue + } + _, err := c.Put(context.Background(), ed.getRegisterKey(ec.watchKeys[0]), ed.byteLocalNodeInfo, clientv3.WithLease(ec.leaseID)) + if err != nil { + log.Error("etcd Put fail", log.ErrorField("err", err)) + return err } } @@ -292,7 +320,7 @@ func (ed *EtcdDiscoveryService) setNodeInfo(networkName string, nodeInfo *rpc.No //筛选关注的服务 var discoverServiceSlice = make([]string, 0, 24) for _, pubService := range nodeInfo.PublicServiceList { - if cluster.CanDiscoveryService(networkName, "",nodeInfo.NodeId,pubService) == true { + if cluster.CanDiscoveryService(networkName, "", nodeInfo.NodeId, pubService) == true { discoverServiceSlice = append(discoverServiceSlice, pubService) } } @@ -503,11 +531,16 @@ func (ed *EtcdDiscoveryService) RPC_ServiceRecord(etcdServiceRecord *service.Etc //写入到etcd中 for c, info := range ed.mapClient { - for _, watchKey := range info.watchKeys { - if ed.getNetworkNameByWatchKey(watchKey) == etcdServiceRecord.NetworkName { - client = c - break - } + if !info.isLocalNetwork { + continue + } + + if len(info.watchKeys)!=1 { + log.Error("") + } + if ed.getNetworkNameByWatchKey(info.watchKeys[0]) == etcdServiceRecord.NetworkName { + client = c + break } } diff --git a/cluster/origindiscovery.go b/cluster/origindiscovery.go index a7e86ce..7d129b2 100644 --- a/cluster/origindiscovery.go +++ b/cluster/origindiscovery.go @@ -508,11 +508,17 @@ func (dc *OriginDiscoveryClient) regServiceDiscover(nodeId string) { if nodeId == cluster.GetLocalNodeInfo().NodeId { return } + nodeInfo := cluster.getOriginMasterDiscoveryNodeInfo(nodeId) if nodeInfo == nil { return } + // 只能往本地Master结点上注册 + if !cluster.IsLocalMasterNodeId(nodeId) { + return + } + var req rpc.RegServiceDiscoverReq req.NodeInfo = &rpc.NodeInfo{} req.NodeInfo.NodeId = cluster.localNodeInfo.NodeId @@ -617,13 +623,22 @@ func (cls *Cluster) IsOriginMasterDiscoveryNode(nodeId string) bool { return cls.getOriginMasterDiscoveryNodeInfo(nodeId) != nil } +func (cls *Cluster) IsLocalMasterNodeId(nodeId string) bool { + if cls.discoveryInfo.Origin == nil { + return false + } + + return cls.discoveryInfo.Origin.LocalMasterNodeId == nodeId +} func (cls *Cluster) getOriginMasterDiscoveryNodeInfo(nodeId string) *NodeInfo { if cls.discoveryInfo.Origin == nil { return nil } for i := 0; i < len(cls.discoveryInfo.Origin.MasterNodeList); i++ { - if cls.discoveryInfo.Origin.MasterNodeList[i].NodeId == nodeId { + + + if cls.discoveryInfo.Origin.MasterNodeList[i].NodeId == nodeId { return &cls.discoveryInfo.Origin.MasterNodeList[i] } } diff --git a/cluster/parsecfg.go b/cluster/parsecfg.go index 564f78a..1a776e0 100644 --- a/cluster/parsecfg.go +++ b/cluster/parsecfg.go @@ -16,27 +16,27 @@ import ( var json = jsoniter.ConfigCompatibleWithStandardLibrary - type EtcdList struct { - NetworkName []string - Endpoints []string - UserName string - Password string - Cert string - CertKey string - Ca string + LocalNetworkName string // 如果配置,则为本地网络,必需配置一个本地网络 + NeighborNetworkName []string + Endpoints []string + UserName string + Password string + Cert string + CertKey string + Ca string } type EtcdDiscovery struct { DialTimeoutMillisecond time.Duration TTLSecond int64 - - EtcdList []EtcdList + EtcdList []EtcdList } type OriginDiscovery struct { - TTLSecond int64 - MasterNodeList []NodeInfo + TTLSecond int64 + LocalMasterNodeId string + MasterNodeList []NodeInfo } type DiscoveryType int @@ -72,7 +72,7 @@ type NodeInfoList struct { } func validConfigFile(f string) bool { - return strings.HasSuffix(f, ".json")|| strings.HasSuffix(f, ".yml") || strings.HasSuffix(f, ".yaml") + return strings.HasSuffix(f, ".json") || strings.HasSuffix(f, ".yml") || strings.HasSuffix(f, ".yaml") } func yamlToJson(data []byte, v interface{}) ([]byte, error) { @@ -131,21 +131,20 @@ func (d *DiscoveryInfo) setEtcd(etcd *EtcdDiscovery) error { return fmt.Errorf("repeat configuration of Discovery") } - //Endpoints不允许重复 - mapAddr := make(map[string]struct{}) for _, n := range etcd.EtcdList { - for _, endPoint := range n.Endpoints { - if _, ok := mapAddr[endPoint]; ok == true { - return fmt.Errorf("etcd discovery config Etcd.EtcdList.Endpoints %+v is repeat", endPoint) - } - mapAddr[endPoint] = struct{}{} - } - //networkName不允许重复 mapNetworkName := make(map[string]struct{}) - for _, netName := range n.NetworkName { + if n.LocalNetworkName != "" { + if _, ok := mapNetworkName[n.LocalNetworkName]; ok == true { + return fmt.Errorf("etcd discovery config Etcd.EtcdList.LocalNetworkName %+v is repeat", n.LocalNetworkName) + } + mapNetworkName[n.LocalNetworkName] = struct{}{} + continue + } + + for _, netName := range n.NeighborNetworkName { if _, ok := mapNetworkName[netName]; ok == true { - return fmt.Errorf("etcd discovery config Etcd.EtcdList.NetworkName %+v is repeat", n.NetworkName) + return fmt.Errorf("etcd discovery config Etcd.EtcdList.NetworkName %+v is repeat", netName) } mapNetworkName[netName] = struct{}{} @@ -275,7 +274,7 @@ func (cls *Cluster) readLocalClusterConfig(nodeId string) (DiscoveryInfo, []Node var rpcMode RpcMode //读取任何文件,只读符合格式的配置,目录下的文件可以自定义分文件 - err := filepath.Walk(configDir, func(path string, info fs.FileInfo, err error)error { + err := filepath.Walk(configDir, func(path string, info fs.FileInfo, err error) error { if info.IsDir() { return nil } @@ -340,7 +339,7 @@ func (cls *Cluster) readLocalService(localNodeId string) error { nodeService := map[string]interface{}{} //读取任何文件,只读符合格式的配置,目录下的文件可以自定义分文件 - err := filepath.Walk(configDir, func(path string, info fs.FileInfo, err error)error{ + err := filepath.Walk(configDir, func(path string, info fs.FileInfo, err error) error { if info.IsDir() { return nil } @@ -432,7 +431,7 @@ func (cls *Cluster) readLocalService(localNodeId string) error { return nil } -func (cls *Cluster) parseLocalCfg() error{ +func (cls *Cluster) parseLocalCfg() error { rpcInfo := NodeRpcInfo{} rpcInfo.nodeInfo = cls.localNodeInfo rpcInfo.client = rpc.NewLClient(rpcInfo.nodeInfo.NodeId, &cls.callSet) @@ -454,7 +453,7 @@ func (cls *Cluster) parseLocalCfg() error{ cls.mapServiceNode[serviceName] = make(map[string]struct{}) } - if _,ok:=cls.mapServiceNode[serviceName][cls.localNodeInfo.NodeId];ok { + if _, ok := cls.mapServiceNode[serviceName][cls.localNodeInfo.NodeId]; ok { return fmt.Errorf("duplicate service %s is configured in node %s", serviceName, cls.localNodeInfo.NodeId) } cls.mapServiceNode[serviceName][cls.localNodeInfo.NodeId] = struct{}{}