mirror of
https://github.com/duanhf2012/origin.git
synced 2026-02-04 06:54:45 +08:00
优化etcd服务发现
This commit is contained in:
@@ -100,11 +100,11 @@ func (cls *Cluster) DiscardNode(nodeId string) {
|
||||
cls.locker.Unlock()
|
||||
|
||||
if bDel {
|
||||
cls.DelNode(nodeId, true)
|
||||
cls.DelNode(nodeId)
|
||||
}
|
||||
}
|
||||
|
||||
func (cls *Cluster) DelNode(nodeId string, immediately bool) {
|
||||
func (cls *Cluster) DelNode(nodeId string) {
|
||||
//MasterDiscover结点与本地结点不删除
|
||||
if cls.IsOriginMasterDiscoveryNode(nodeId) || nodeId == cls.localNodeInfo.NodeId {
|
||||
return
|
||||
@@ -117,15 +117,7 @@ func (cls *Cluster) DelNode(nodeId string, immediately bool) {
|
||||
return
|
||||
}
|
||||
|
||||
if immediately ==false {
|
||||
//正在连接中不主动断开,只断开没有连接中的
|
||||
if rpc.client.IsConnected() {
|
||||
rpc.nodeInfo.status = Discard
|
||||
log.Info("Discard node",log.String("nodeId",rpc.nodeInfo.NodeId),log.String("ListenAddr", rpc.nodeInfo.ListenAddr))
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
cls.TriggerDiscoveryEvent(false,nodeId,rpc.nodeInfo.ServiceList)
|
||||
for _, serviceName := range rpc.nodeInfo.ServiceList {
|
||||
cls.delServiceNode(serviceName, nodeId)
|
||||
}
|
||||
@@ -138,8 +130,8 @@ func (cls *Cluster) DelNode(nodeId string, immediately bool) {
|
||||
log.Info("remove node ",log.String("NodeId", rpc.nodeInfo.NodeId),log.String("ListenAddr", rpc.nodeInfo.ListenAddr))
|
||||
}
|
||||
|
||||
func (cls *Cluster) serviceDiscoveryDelNode(nodeId string, immediately bool) {
|
||||
cls.DelNode(nodeId, immediately)
|
||||
func (cls *Cluster) serviceDiscoveryDelNode(nodeId string) {
|
||||
cls.DelNode(nodeId)
|
||||
}
|
||||
|
||||
func (cls *Cluster) delServiceNode(serviceName string, nodeId string) {
|
||||
@@ -171,6 +163,7 @@ func (cls *Cluster) serviceDiscoverySetNodeInfo(nodeInfo *NodeInfo) {
|
||||
}
|
||||
}
|
||||
|
||||
cluster.TriggerDiscoveryEvent(true,nodeInfo.NodeId,nodeInfo.PublicServiceList)
|
||||
//再重新组装
|
||||
mapDuplicate := map[string]interface{}{} //预防重复数据
|
||||
for _, serviceName := range nodeInfo.PublicServiceList {
|
||||
@@ -202,7 +195,7 @@ func (cls *Cluster) serviceDiscoverySetNodeInfo(nodeInfo *NodeInfo) {
|
||||
rpcInfo.client =rpc.NewRClient(nodeInfo.NodeId, nodeInfo.ListenAddr, nodeInfo.MaxRpcParamLen,cls.localNodeInfo.CompressBytesLen,&cls.callSet,cls.NotifyAllService)
|
||||
}
|
||||
cls.mapRpc[nodeInfo.NodeId] = &rpcInfo
|
||||
if cls.IsNatsMode() == true {
|
||||
if cls.IsNatsMode() == true || cls.discoveryInfo.discoveryType!=OriginType {
|
||||
log.Info("Discovery nodeId and new rpc client",log.String("NodeId", nodeInfo.NodeId),log.Any("services:", nodeInfo.PublicServiceList),log.Bool("Retire",nodeInfo.Retire))
|
||||
}else{
|
||||
log.Info("Discovery nodeId and new rpc client",log.String("NodeId", nodeInfo.NodeId),log.Any("services:", nodeInfo.PublicServiceList),log.Bool("Retire",nodeInfo.Retire),log.String("nodeListenAddr",nodeInfo.ListenAddr))
|
||||
|
||||
@@ -15,6 +15,7 @@ import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"go.uber.org/zap"
|
||||
"path"
|
||||
"runtime"
|
||||
"strings"
|
||||
@@ -93,6 +94,7 @@ func (ed *EtcdDiscoveryService) OnInit() error {
|
||||
client, cerr := clientv3.New(clientv3.Config{
|
||||
Endpoints: etcdDiscoveryCfg.EtcdList[i].Endpoints,
|
||||
DialTimeout: etcdDiscoveryCfg.DialTimeoutMillisecond,
|
||||
Logger: zap.NewNop(),
|
||||
})
|
||||
|
||||
if cerr != nil {
|
||||
@@ -102,7 +104,6 @@ func (ed *EtcdDiscoveryService) OnInit() error {
|
||||
|
||||
ctx,_:=context.WithTimeout(context.Background(),time.Second*3)
|
||||
_,err = client.Leases(ctx)
|
||||
//_,err = client.Put(ctx,testKey,"")
|
||||
if err != nil {
|
||||
log.Error("etcd discovery init fail",log.Any("endpoint",etcdDiscoveryCfg.EtcdList[i].Endpoints),log.ErrorAttr("err",err))
|
||||
return err
|
||||
@@ -111,7 +112,6 @@ func (ed *EtcdDiscoveryService) OnInit() error {
|
||||
ec := &etcdClientInfo{}
|
||||
for _, networkName := range etcdDiscoveryCfg.EtcdList[i].NetworkName {
|
||||
ec.watchKeys = append(ec.watchKeys,fmt.Sprintf("%s/%s",originDir,networkName))
|
||||
//ec.etcdKey = append(ec.etcdKey,fmt.Sprintf("%s/%s/%s",originDir,networkName,nd.localNodeId))
|
||||
}
|
||||
|
||||
ed.mapClient[client] = ec
|
||||
@@ -334,7 +334,6 @@ func (ed *EtcdDiscoveryService) watcher(client *clientv3.Client,etcdClient *etcd
|
||||
}
|
||||
}()
|
||||
|
||||
log.Debug(">>try watcher")
|
||||
rch := client.Watch(context.Background(), watchKey, clientv3.WithPrefix())
|
||||
|
||||
if ed.getServices(client,etcdClient,watchKey) == false {
|
||||
@@ -374,7 +373,7 @@ func (ed *EtcdDiscoveryService) delNode(fullKey string) string{
|
||||
return ""
|
||||
}
|
||||
|
||||
ed.funDelNode(nodeId,false)
|
||||
ed.funDelNode(nodeId)
|
||||
return nodeId
|
||||
}
|
||||
|
||||
@@ -437,29 +436,25 @@ func (ed *EtcdDiscoveryService) OnEventGets(watchKey string,Kvs []*mvccpb.KeyVal
|
||||
mapNode[nodeId] = struct{}{}
|
||||
ed.addNodeId(watchKey,nodeId)
|
||||
}
|
||||
|
||||
log.Debug(">>etcd OnEventGets",log.String("watchKey",watchKey),log.Any("mapNode",mapNode))
|
||||
|
||||
// 此段代码为遍历并删除过期节点的逻辑。
|
||||
// 对于mapDiscoveryNodeId中与watchKey关联的所有节点ID,遍历该集合。
|
||||
// 如果某个节点ID不在mapNode中且不是本地节点ID,则调用funDelNode函数删除该节点。
|
||||
mapLastNodeId := ed.mapDiscoveryNodeId[watchKey] // 根据watchKey获取对应的节点ID集合
|
||||
for nodeId := range mapLastNodeId { // 遍历所有节点ID
|
||||
if _,ok := mapNode[nodeId];ok == false && nodeId != ed.localNodeId { // 检查节点是否不存在于mapNode且不是本地节点
|
||||
ed.funDelNode(nodeId,false) // 调用函数删除该节点
|
||||
ed.funDelNode(nodeId) // 调用函数删除该节点
|
||||
delete(ed.mapDiscoveryNodeId[watchKey],nodeId)
|
||||
log.Debug(">>etcd OnEventGets Delete",log.String("watchKey",watchKey),log.String("nodeId",nodeId))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (ed *EtcdDiscoveryService) OnEventPut(watchKey string,Kv *mvccpb.KeyValue) {
|
||||
log.Debug(">>etcd OnEventPut",log.String("watchKey",watchKey),log.String("nodeId",ed.getNodeId(string(Kv.Key))))
|
||||
nodeId := ed.setNode(ed.getNetworkNameByFullKey(string(Kv.Key)), Kv.Value)
|
||||
ed.addNodeId(watchKey,nodeId)
|
||||
}
|
||||
|
||||
func (ed *EtcdDiscoveryService) OnEventDelete(watchKey string,Kv *mvccpb.KeyValue) {
|
||||
log.Debug(">>etcd OnEventDelete",log.String("watchKey",watchKey),log.String("nodeId",ed.getNodeId(string(Kv.Key))))
|
||||
nodeId := ed.delNode(string(Kv.Key))
|
||||
delete(ed.mapDiscoveryNodeId[watchKey],nodeId)
|
||||
}
|
||||
|
||||
@@ -175,7 +175,7 @@ func (ds *OriginDiscoveryMaster) OnNodeDisconnect(nodeId string) {
|
||||
notifyDiscover.DelNodeId = nodeId
|
||||
|
||||
//删除结点
|
||||
cluster.DelNode(nodeId, true)
|
||||
cluster.DelNode(nodeId)
|
||||
|
||||
//无注册过的结点不广播,避免非当前Master网络中的连接断开时通知到本网络
|
||||
ds.CastGo(SubServiceDiscover, ¬ifyDiscover)
|
||||
@@ -244,7 +244,7 @@ func (ds *OriginDiscoveryMaster) RPC_RegServiceDiscover(req *rpc.RegServiceDisco
|
||||
nodeInfo.Retire = req.NodeInfo.Retire
|
||||
|
||||
//主动删除已经存在的结点,确保先断开,再连接
|
||||
cluster.serviceDiscoveryDelNode(nodeInfo.NodeId, true)
|
||||
cluster.serviceDiscoveryDelNode(nodeInfo.NodeId)
|
||||
|
||||
//加入到本地Cluster模块中,将连接该结点
|
||||
cluster.serviceDiscoverySetNodeInfo(&nodeInfo)
|
||||
@@ -429,8 +429,7 @@ func (dc *OriginDiscoveryClient) RPC_SubServiceDiscover(req *rpc.SubscribeDiscov
|
||||
|
||||
//删除不必要的结点
|
||||
for _, nodeId := range willDelNodeId {
|
||||
cluster.TriggerDiscoveryEvent(false,nodeId,dc.getNodePublicService(req.MasterNodeId, nodeId))
|
||||
dc.funDelNode(nodeId, true)
|
||||
dc.funDelNode(nodeId)
|
||||
dc.removeMasterNode(req.MasterNodeId, nodeId)
|
||||
}
|
||||
|
||||
@@ -441,8 +440,6 @@ func (dc *OriginDiscoveryClient) RPC_SubServiceDiscover(req *rpc.SubscribeDiscov
|
||||
if bSet == false {
|
||||
continue
|
||||
}
|
||||
|
||||
cluster.TriggerDiscoveryEvent(true,nodeInfo.NodeId,nodeInfo.PublicServiceList)
|
||||
}
|
||||
|
||||
return nil
|
||||
|
||||
@@ -3,7 +3,7 @@ package cluster
|
||||
|
||||
type OperType int
|
||||
|
||||
type FunDelNode func (nodeId string,immediately bool)
|
||||
type FunDelNode func (nodeId string)
|
||||
type FunSetNode func(nodeInfo *NodeInfo)
|
||||
|
||||
type IServiceDiscovery interface {
|
||||
|
||||
Reference in New Issue
Block a user