package cluster

import (
	"context"
	"crypto/tls"
	"crypto/x509"
	"errors"
	"fmt"
	"os"
	"path"
	"strings"
	"sync/atomic"
	"time"

	"github.com/duanhf2012/origin/v2/event"
	"github.com/duanhf2012/origin/v2/log"
	"github.com/duanhf2012/origin/v2/rpc"
	"github.com/duanhf2012/origin/v2/service"
	"github.com/duanhf2012/origin/v2/util/timer"
	"go.etcd.io/etcd/api/v3/mvccpb"
	clientv3 "go.etcd.io/etcd/client/v3"
	"google.golang.org/protobuf/proto"
)

// originDir is the root etcd key; each network lives under originDir/<networkName>.
const originDir = "/origin"

const (
	// etcdDiscoveryOpTimeout bounds one-shot etcd requests (init probe, lease
	// revoke on shutdown, record writes) so an unreachable cluster cannot block
	// the caller indefinitely.
	etcdDiscoveryOpTimeout = time.Second * 3

	// etcdDiscoveryRetryDelay is the delay before a failed registration or
	// watch is attempted again.
	etcdDiscoveryRetryDelay = time.Second * 3
)

// etcdClientInfo is the per-client state kept by EtcdDiscoveryService.
type etcdClientInfo struct {
	// isLocalNetwork is true for the client configured with LocalNetworkName;
	// only such clients register the local node.
	isLocalNetwork bool
	// watchKeys are the network directories (originDir/<networkName>) watched
	// through this client. A local-network client has exactly one.
	watchKeys []string
	// leaseID is the lease attached to the local node's registration key;
	// 0 until registerServiceByClient has successfully granted one.
	leaseID clientv3.LeaseID
	// keepAliveChan receives keep-alive responses for leaseID.
	keepAliveChan <-chan *clientv3.LeaseKeepAliveResponse
}

// EtcdDiscoveryService registers the local node in etcd under
// originDir/<LocalNetworkName>/<localNodeId> with a leased key, and watches
// the configured network directories to report other nodes through
// funSetNode / funDelNode.
type EtcdDiscoveryService struct {
	service.Service

	funDelNode FunDelNode
	funSetNode FunSetNode

	localNodeId       string
	byteLocalNodeInfo string // proto-marshalled rpc.NodeInfo of the local node
	mapClient         map[*clientv3.Client]*etcdClientInfo
	isClose           int32 // set to 1 (atomically) once OnRelease runs
	bRetire           bool

	// mapDiscoveryNodeId tracks the node ids discovered per watch key:
	// map[watchKey]set of nodeId.
	mapDiscoveryNodeId map[string]map[string]struct{}
}

var etcdDiscovery *EtcdDiscoveryService

// getEtcdDiscovery returns the package-level etcd discovery service, creating
// it on first use.
func getEtcdDiscovery() IServiceDiscovery {
	if etcdDiscovery == nil {
		etcdDiscovery = &EtcdDiscoveryService{}
	}
	return etcdDiscovery
}

// InitDiscovery stores the local node id and the callbacks used to report
// discovered (funSetNode) and vanished (funDelNode) nodes. It never fails.
func (ed *EtcdDiscoveryService) InitDiscovery(localNodeId string, funDelNode FunDelNode, funSetNode FunSetNode) error {
	ed.localNodeId = localNodeId
	ed.funDelNode = funDelNode
	ed.funSetNode = funSetNode
	return nil
}

// Kinds of etcdDiscoveryEvent.
const (
	eeGets   = 0 // full snapshot of a watch key (Kvs holds every entry)
	eePut    = 1 // one key created or updated (Kvs holds one entry)
	eeDelete = 2 // one key deleted (Kvs holds one entry)
)

// etcdDiscoveryEvent carries etcd results from watcher goroutines to the
// service's event handler (OnEtcdDiscovery).
type etcdDiscoveryEvent struct {
	typ      int
	watchKey string
	Kvs      []*mvccpb.KeyValue
}

// GetEventType reports the event type under which OnInit registers the handler.
func (ee *etcdDiscoveryEvent) GetEventType() event.EventType {
	return event.Sys_Event_EtcdDiscovery
}

// OnInit creates one etcd client per configured cluster and probes each one.
// It also works out which network directories each client watches.
// Exactly one configured cluster must name the local network; otherwise an
// error is returned.
func (ed *EtcdDiscoveryService) OnInit() error {
	ed.mapClient = make(map[*clientv3.Client]*etcdClientInfo, 1)
	ed.mapDiscoveryNodeId = make(map[string]map[string]struct{})
	ed.GetEventProcessor().RegEventReceiverFunc(event.Sys_Event_EtcdDiscovery, ed.GetEventHandler(), ed.OnEtcdDiscovery)

	err := ed.marshalNodeInfo()
	if err != nil {
		return err
	}

	etcdDiscoveryCfg := cluster.GetEtcdDiscovery()
	if etcdDiscoveryCfg == nil {
		return errors.New("etcd discovery config is nil.")
	}

	var hasLocalNetwork bool
	for i := 0; i < len(etcdDiscoveryCfg.EtcdList); i++ {
		etcdCfg := etcdDiscoveryCfg.EtcdList[i]

		var tlsConfig *tls.Config
		if etcdCfg.Cert != "" {
			tlsConfig, err = loadEtcdDiscoveryTLSConfig(etcdCfg.Cert, etcdCfg.CertKey, etcdCfg.Ca)
			if err != nil {
				return err
			}
		}

		var client *clientv3.Client
		client, err = clientv3.New(clientv3.Config{
			Endpoints:   etcdCfg.Endpoints,
			DialTimeout: etcdDiscoveryCfg.DialTimeoutMillisecond,
			Username:    etcdCfg.UserName,
			Password:    etcdCfg.Password,
			Logger:      log.GetLogger().Logger,
			TLS:         tlsConfig,
		})
		if err != nil {
			log.Error("etcd discovery init fail", log.ErrorField("err", err))
			return err
		}

		// Probe the cluster so an unreachable endpoint fails init right away.
		ctx, cancel := context.WithTimeout(context.Background(), etcdDiscoveryOpTimeout)
		_, err = client.Leases(ctx)
		cancel()
		if err != nil {
			log.Error("etcd discovery init fail", log.Any("endpoint", etcdCfg.Endpoints), log.ErrorField("err", err))
			_ = client.Close() // best effort; the probe error is what gets reported
			return err
		}

		ec := &etcdClientInfo{}
		if etcdCfg.LocalNetworkName != "" {
			// The local network is both registered into and watched.
			hasLocalNetwork = true
			ec.isLocalNetwork = true
			ec.watchKeys = append(ec.watchKeys, fmt.Sprintf("%s/%s", originDir, etcdCfg.LocalNetworkName))
		} else {
			// Neighbor networks are only watched.
			for _, networkName := range etcdCfg.NeighborNetworkName {
				ec.watchKeys = append(ec.watchKeys, fmt.Sprintf("%s/%s", originDir, networkName))
			}
		}
		ed.mapClient[client] = ec
	}

	if !hasLocalNetwork {
		return errors.New("etcd discovery init fail,cannot find local network")
	}
	return nil
}

// loadEtcdDiscoveryTLSConfig builds a client TLS config from a certificate /
// key pair and a PEM root-CA bundle, all given as file paths. A CA file that
// contains no usable PEM certificate is reported as an error.
func loadEtcdDiscoveryTLSConfig(certFile, keyFile, caFile string) (*tls.Config, error) {
	cert, err := tls.LoadX509KeyPair(certFile, keyFile)
	if err != nil {
		log.Error("load cert error", log.ErrorField("err", err))
		return nil, err
	}

	caData, err := os.ReadFile(caFile)
	if err != nil {
		log.Error("load root ca error", log.ErrorField("err", err))
		return nil, err
	}

	pool := x509.NewCertPool()
	if !pool.AppendCertsFromPEM(caData) {
		err = fmt.Errorf("no valid PEM certificate found in root ca %q", caFile)
		log.Error("load root ca error", log.ErrorField("err", err))
		return nil, err
	}

	return &tls.Config{
		Certificates: []tls.Certificate{cert},
		RootCAs:      pool,
	}, nil
}

// getRegisterKey returns the key under which the local node registers itself
// inside the network directory watchkey.
func (ed *EtcdDiscoveryService) getRegisterKey(watchkey string) string {
	return watchkey + "/" + ed.localNodeId
}

// getWatchPrefix returns the prefix that matches exactly the node keys of the
// network directory watchKey. The trailing "/" keeps sibling networks that
// share a name prefix (e.g. "net1" vs "net10") from matching. It also excludes
// the bare directory key itself, which has no node id.
func (ed *EtcdDiscoveryService) getWatchPrefix(watchKey string) string {
	return watchKey + "/"
}

// registerServiceByClient grants a lease and writes the local node info under
// it. It then keeps the lease alive. Any failure, including later loss of the
// keep-alive stream, schedules another attempt via tryRegisterService.
func (ed *EtcdDiscoveryService) registerServiceByClient(client *clientv3.Client, etcdClient *etcdClientInfo) {
	// Only local-network clients register, and they carry exactly one watch
	// key. Check before granting so a bad configuration does not leak a lease.
	if len(etcdClient.watchKeys) != 1 {
		log.Error("LocalNetwork watchkey is error")
		return
	}

	resp, err := client.Grant(context.Background(), cluster.GetEtcdDiscovery().TTLSecond)
	if err != nil {
		log.Error("etcd registerService fail", log.ErrorField("err", err))
		ed.tryRegisterService(client, etcdClient)
		return
	}
	etcdClient.leaseID = resp.ID

	_, err = client.Put(context.Background(), ed.getRegisterKey(etcdClient.watchKeys[0]), ed.byteLocalNodeInfo, clientv3.WithLease(resp.ID))
	if err != nil {
		log.Error("etcd Put fail", log.ErrorField("err", err))
		ed.tryRegisterService(client, etcdClient)
		return
	}

	etcdClient.keepAliveChan, err = client.KeepAlive(context.Background(), etcdClient.leaseID)
	if err != nil {
		log.Error("etcd KeepAlive fail", log.ErrorField("err", err))
		ed.tryRegisterService(client, etcdClient)
		return
	}

	keepAliveChan := etcdClient.keepAliveChan
	go func() {
		for range keepAliveChan {
			// Acknowledgements need no handling; only the channel closing matters.
		}
		log.Error("etcd keepAliveChan fail", log.Any("watchKeys", etcdClient.watchKeys))
		ed.tryRegisterService(client, etcdClient)
	}()
}

// tryRegisterService schedules registerServiceByClient after
// etcdDiscoveryRetryDelay. It does nothing once the service is stopping or for
// clients that do not own the local network.
func (ed *EtcdDiscoveryService) tryRegisterService(client *clientv3.Client, etcdClient *etcdClientInfo) {
	if ed.isStop() || !etcdClient.isLocalNetwork {
		return
	}
	ed.AfterFunc(etcdDiscoveryRetryDelay, func(*timer.Timer) {
		ed.registerServiceByClient(client, etcdClient)
	})
}

// tryWatch schedules watchers for all of the client's watch keys after
// etcdDiscoveryRetryDelay, unless the service is stopping.
func (ed *EtcdDiscoveryService) tryWatch(client *clientv3.Client, etcdClient *etcdClientInfo) {
	if ed.isStop() {
		return
	}
	ed.AfterFunc(etcdDiscoveryRetryDelay, func(*timer.Timer) {
		ed.watchByClient(client, etcdClient)
	})
}

// tryWatchKey schedules a new watcher for a single watch key after
// etcdDiscoveryRetryDelay, unless the service is stopping. Failed watchers
// retry through this rather than tryWatch so the client's other, still-running
// watchers are not duplicated.
func (ed *EtcdDiscoveryService) tryWatchKey(client *clientv3.Client, etcdClient *etcdClientInfo, watchKey string) {
	if ed.isStop() {
		return
	}
	ed.AfterFunc(etcdDiscoveryRetryDelay, func(*timer.Timer) {
		go ed.watcher(client, etcdClient, watchKey)
	})
}

// tryLaterRetire retries retire every second until it succeeds.
func (ed *EtcdDiscoveryService) tryLaterRetire() {
	ed.AfterFunc(time.Second, func(*timer.Timer) {
		if ed.retire() != nil {
			ed.tryLaterRetire()
		}
	})
}

// retire rewrites the local node's registration with the current
// byteLocalNodeInfo. It returns the first Put error.
func (ed *EtcdDiscoveryService) retire() error {
	for c, ec := range ed.mapClient {
		if !ec.isLocalNetwork {
			continue
		}
		if len(ec.watchKeys) != 1 {
			log.Error("LocalNetwork watchkey is error")
			continue
		}
		// Without a lease the registration has not succeeded yet; when it
		// does, it publishes the already-updated byteLocalNodeInfo. Writing
		// now with lease 0 would create a key that never expires.
		if ec.leaseID == 0 {
			continue
		}

		_, err := c.Put(context.Background(), ed.getRegisterKey(ec.watchKeys[0]), ed.byteLocalNodeInfo, clientv3.WithLease(ec.leaseID))
		if err != nil {
			log.Error("etcd Put fail", log.ErrorField("err", err))
			return err
		}
	}
	return nil
}

// OnRetire marks the local node as retired, re-marshals its info and
// publishes it, retrying in the background if the write fails.
func (ed *EtcdDiscoveryService) OnRetire() {
	ed.bRetire = true
	if err := ed.marshalNodeInfo(); err != nil {
		log.Error("etcd marshal node info fail", log.ErrorField("err", err))
	}

	if ed.retire() != nil {
		ed.tryLaterRetire()
	}
}

// OnRelease flags the service as stopped (suppressing further retries) and
// releases every etcd client.
func (ed *EtcdDiscoveryService) OnRelease() {
	atomic.StoreInt32(&ed.isClose, 1)
	ed.close()
}

// isStop reports whether OnRelease has run.
func (ed *EtcdDiscoveryService) isStop() bool {
	return atomic.LoadInt32(&ed.isClose) == 1
}

// OnStart schedules registration (local network only) and watching for every client.
func (ed *EtcdDiscoveryService) OnStart() {
	for c, ec := range ed.mapClient {
		ed.tryRegisterService(c, ec)
		ed.tryWatch(c, ec)
	}
}

// marshalNodeInfo serializes the local node description (including the retire
// flag) into byteLocalNodeInfo. On error byteLocalNodeInfo is left unchanged.
func (ed *EtcdDiscoveryService) marshalNodeInfo() error {
	nInfo := cluster.GetLocalNodeInfo()
	var nodeInfo rpc.NodeInfo
	nodeInfo.NodeId = nInfo.NodeId
	nodeInfo.ListenAddr = nInfo.ListenAddr
	nodeInfo.Retire = ed.bRetire
	nodeInfo.PublicServiceList = nInfo.PublicServiceList
	nodeInfo.MaxRpcParamLen = nInfo.MaxRpcParamLen

	byteLocalNodeInfo, err := proto.Marshal(&nodeInfo)
	if err == nil {
		ed.byteLocalNodeInfo = string(byteLocalNodeInfo)
	}
	return err
}

// setNodeInfo reports a remote node through funSetNode, restricted to the
// public services the cluster config allows discovering on networkName.
// It returns false, and reports nothing, for nil, private, or local nodes and
// for nodes with no discoverable service.
func (ed *EtcdDiscoveryService) setNodeInfo(networkName string, nodeInfo *rpc.NodeInfo) bool {
	if nodeInfo == nil || nodeInfo.Private || nodeInfo.NodeId == ed.localNodeId {
		return false
	}

	discoverServiceSlice := make([]string, 0, len(nodeInfo.PublicServiceList))
	for _, pubService := range nodeInfo.PublicServiceList {
		if cluster.CanDiscoveryService(networkName, "", nodeInfo.NodeId, pubService) {
			discoverServiceSlice = append(discoverServiceSlice, pubService)
		}
	}
	if len(discoverServiceSlice) == 0 {
		return false
	}

	var nInfo NodeInfo
	nInfo.ServiceList = discoverServiceSlice
	nInfo.PublicServiceList = discoverServiceSlice
	nInfo.NodeId = nodeInfo.NodeId
	nInfo.ListenAddr = nodeInfo.ListenAddr
	nInfo.MaxRpcParamLen = nodeInfo.MaxRpcParamLen
	nInfo.Retire = nodeInfo.Retire
	nInfo.Private = nodeInfo.Private
	ed.funSetNode(&nInfo)
	return true
}

// close revokes the registration lease where one was granted, then closes
// each client's watcher and the client itself. Errors are logged, not returned.
func (ed *EtcdDiscoveryService) close() {
	for c, ec := range ed.mapClient {
		// Only clients that registered the local node hold a lease.
		if ec.leaseID != 0 {
			ctx, cancel := context.WithTimeout(context.Background(), etcdDiscoveryOpTimeout)
			_, err := c.Revoke(ctx, ec.leaseID)
			cancel()
			if err != nil {
				log.Error("etcd Revoke fail", log.ErrorField("err", err))
			}
		}

		// Closing the watcher ends the watch channels consumed by watcher goroutines.
		c.Watcher.Close()
		if err := c.Close(); err != nil {
			log.Error("etcd Close fail", log.ErrorField("err", err))
		}
	}
}

// getServices loads the current entries under watchKey and posts them as an
// eeGets snapshot. On failure it schedules a retry of this key's watcher and
// returns false.
func (ed *EtcdDiscoveryService) getServices(client *clientv3.Client, etcdClient *etcdClientInfo, watchKey string) bool {
	resp, err := client.Get(context.Background(), ed.getWatchPrefix(watchKey), clientv3.WithPrefix())
	if err != nil {
		log.Error("etcd Get fail", log.ErrorField("err", err))
		ed.tryWatchKey(client, etcdClient, watchKey)
		return false
	}

	ed.notifyGets(watchKey, resp.Kvs)
	return true
}

// watchByClient starts one watcher goroutine per watch key of the client.
func (ed *EtcdDiscoveryService) watchByClient(client *clientv3.Client, etcdClient *etcdClientInfo) {
	for _, watchKey := range etcdClient.watchKeys {
		go ed.watcher(client, etcdClient, watchKey)
	}
}

// watcher watches the node keys under watchKey and forwards puts and deletes
// as events, after posting an initial snapshot. When the watch ends or
// panics, it schedules a new watcher for the same key.
func (ed *EtcdDiscoveryService) watcher(client *clientv3.Client, etcdClient *etcdClientInfo, watchKey string) {
	defer func() {
		if r := recover(); r != nil {
			log.StackError(fmt.Sprint(r))
			ed.tryWatchKey(client, etcdClient, watchKey)
		}
	}()

	// Cancelling on every exit path closes the watch stream, including the
	// early return when the initial snapshot cannot be loaded.
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	rch := client.Watch(ctx, ed.getWatchPrefix(watchKey), clientv3.WithPrefix())
	if !ed.getServices(client, etcdClient, watchKey) {
		return
	}

	for wresp := range rch {
		if err := wresp.Err(); err != nil {
			log.Error("etcd watch fail", log.String("watchKey", watchKey), log.ErrorField("err", err))
		}
		for _, ev := range wresp.Events {
			switch ev.Type {
			case clientv3.EventTypePut: // created or updated
				ed.notifyPut(watchKey, ev.Kv)
			case clientv3.EventTypeDelete:
				ed.notifyDelete(watchKey, ev.Kv)
			}
		}
	}

	ed.tryWatchKey(client, etcdClient, watchKey)
}

// setNode unmarshals a stored rpc.NodeInfo and reports it via setNodeInfo.
// It returns the node id, or "" if the value cannot be unmarshalled.
func (ed *EtcdDiscoveryService) setNode(netWorkName string, byteNode []byte) string {
	var nodeInfo rpc.NodeInfo
	err := proto.Unmarshal(byteNode, &nodeInfo)
	if err != nil {
		log.Error("Unmarshal fail", log.String("netWorkName", netWorkName), log.ErrorField("err", err))
		return ""
	}

	ed.setNodeInfo(netWorkName, &nodeInfo)
	return nodeInfo.NodeId
}

// delNode reports the node named by the last segment of fullKey through
// funDelNode and returns its id. The local node is never deleted; "" is
// returned for it.
func (ed *EtcdDiscoveryService) delNode(fullKey string) string {
	nodeId := ed.getNodeId(fullKey)
	if nodeId == ed.localNodeId {
		return ""
	}

	ed.funDelNode(nodeId)
	return nodeId
}

// getNetworkNameByWatchKey returns the last path segment of watchKey.
func (ed *EtcdDiscoveryService) getNetworkNameByWatchKey(watchKey string) string {
	return watchKey[strings.LastIndex(watchKey, "/")+1:]
}

// getNetworkNameByFullKey extracts <network> from "/origin/<network>/<nodeId>".
// It returns "" for keys too short to have that shape instead of panicking.
func (ed *EtcdDiscoveryService) getNetworkNameByFullKey(fullKey string) string {
	start := len(originDir) + 1
	end := strings.LastIndex(fullKey, "/")
	if end < start {
		return ""
	}
	return fullKey[start:end]
}

// getNodeId returns the last path segment of fullKey.
func (ed *EtcdDiscoveryService) getNodeId(fullKey string) string {
	return fullKey[strings.LastIndex(fullKey, "/")+1:]
}

// OnEtcdDiscovery dispatches an etcdDiscoveryEvent to the matching handler.
func (ed *EtcdDiscoveryService) OnEtcdDiscovery(ev event.IEvent) {
	disEvent := ev.(*etcdDiscoveryEvent)
	switch disEvent.typ {
	case eeGets:
		ed.OnEventGets(disEvent.watchKey, disEvent.Kvs)
	case eePut:
		if len(disEvent.Kvs) == 1 {
			ed.OnEventPut(disEvent.watchKey, disEvent.Kvs[0])
		}
	case eeDelete:
		if len(disEvent.Kvs) == 1 {
			ed.OnEventDelete(disEvent.watchKey, disEvent.Kvs[0])
		}
	}
}

// notifyGets posts a full snapshot of watchKey to the service's event handler.
func (ed *EtcdDiscoveryService) notifyGets(watchKey string, Kvs []*mvccpb.KeyValue) {
	ed.NotifyEvent(&etcdDiscoveryEvent{typ: eeGets, watchKey: watchKey, Kvs: Kvs})
}

// notifyPut posts a single created/updated key to the service's event handler.
func (ed *EtcdDiscoveryService) notifyPut(watchKey string, kv *mvccpb.KeyValue) {
	ed.NotifyEvent(&etcdDiscoveryEvent{typ: eePut, watchKey: watchKey, Kvs: []*mvccpb.KeyValue{kv}})
}

// notifyDelete posts a single deleted key to the service's event handler.
func (ed *EtcdDiscoveryService) notifyDelete(watchKey string, kv *mvccpb.KeyValue) {
	ed.NotifyEvent(&etcdDiscoveryEvent{typ: eeDelete, watchKey: watchKey, Kvs: []*mvccpb.KeyValue{kv}})
}

// OnEventGets applies a full snapshot of watchKey. Every entry is (re)reported
// and recorded. Nodes recorded for watchKey but absent from the snapshot are
// deleted, except the local node.
func (ed *EtcdDiscoveryService) OnEventGets(watchKey string, Kvs []*mvccpb.KeyValue) {
	mapNode := make(map[string]struct{}, len(Kvs))
	for _, kv := range Kvs {
		nodeId := ed.setNode(ed.getNetworkNameByFullKey(string(kv.Key)), kv.Value)
		if nodeId == "" {
			continue // unparseable value; nothing to record
		}
		mapNode[nodeId] = struct{}{}
		ed.addNodeId(watchKey, nodeId)
	}

	// Deleting from a map while ranging over it is safe in Go.
	for nodeId := range ed.mapDiscoveryNodeId[watchKey] {
		if _, ok := mapNode[nodeId]; !ok && nodeId != ed.localNodeId {
			ed.funDelNode(nodeId)
			delete(ed.mapDiscoveryNodeId[watchKey], nodeId)
		}
	}
}

// OnEventPut reports a created/updated node and records it under watchKey.
func (ed *EtcdDiscoveryService) OnEventPut(watchKey string, Kv *mvccpb.KeyValue) {
	nodeId := ed.setNode(ed.getNetworkNameByFullKey(string(Kv.Key)), Kv.Value)
	if nodeId == "" {
		return
	}
	ed.addNodeId(watchKey, nodeId)
}

// OnEventDelete reports a deleted node and forgets it under watchKey.
func (ed *EtcdDiscoveryService) OnEventDelete(watchKey string, Kv *mvccpb.KeyValue) {
	nodeId := ed.delNode(string(Kv.Key))
	delete(ed.mapDiscoveryNodeId[watchKey], nodeId)
}

// addNodeId records nodeId as discovered under watchKey.
func (ed *EtcdDiscoveryService) addNodeId(watchKey string, nodeId string) {
	if _, ok := ed.mapDiscoveryNodeId[watchKey]; !ok {
		ed.mapDiscoveryNodeId[watchKey] = make(map[string]struct{})
	}
	ed.mapDiscoveryNodeId[watchKey][nodeId] = struct{}{}
}

// OnNodeDisconnect removes a discarded node from the cluster.
func (ed *EtcdDiscoveryService) OnNodeDisconnect(nodeId string) {
	cluster.DiscardNode(nodeId)
}

// RPC_ServiceRecord writes an arbitrary record to originDir/<RecordKey>.
// The write goes through the local-network client whose network matches
// NetworkName. When TTLSecond > 0 the record is attached to a fresh lease of
// that TTL. It returns nil on success.
func (ed *EtcdDiscoveryService) RPC_ServiceRecord(etcdServiceRecord *service.EtcdServiceRecordEvent, empty *service.Empty) error {
	var client *clientv3.Client
	for c, info := range ed.mapClient {
		if !info.isLocalNetwork {
			continue
		}
		if len(info.watchKeys) != 1 {
			log.Error("LocalNetwork watchkey is error")
			continue
		}
		if ed.getNetworkNameByWatchKey(info.watchKeys[0]) == etcdServiceRecord.NetworkName {
			client = c
			break
		}
	}
	if client == nil {
		log.Error("etcd record fail,cannot find network name", log.String("networkName", etcdServiceRecord.NetworkName))
		return errors.New("cannot find network name")
	}

	var opts []clientv3.OpOption
	if etcdServiceRecord.TTLSecond > 0 {
		ctx, cancel := context.WithTimeout(context.Background(), etcdDiscoveryOpTimeout)
		lg, err := client.Grant(ctx, etcdServiceRecord.TTLSecond)
		cancel()
		if err != nil {
			log.Error("etcd record fail,cannot grant lease", log.ErrorField("err", err))
			return errors.New("cannot grant lease")
		}
		opts = append(opts, clientv3.WithLease(lg.ID))
	}

	ctx, cancel := context.WithTimeout(context.Background(), etcdDiscoveryOpTimeout)
	defer cancel()
	_, err := client.Put(ctx, path.Join(originDir, etcdServiceRecord.RecordKey), etcdServiceRecord.RecordInfo, opts...)
	if err != nil {
		log.Error("etcd record fail,cannot put record", log.ErrorField("err", err))
		return errors.New("cannot put record")
	}
	return nil
}