修改nodeid为字符串

This commit is contained in:
boyce
2024-04-10 17:30:52 +08:00
parent 161a67c8a1
commit 0cf935ffa4
16 changed files with 283 additions and 283 deletions

View File

@@ -21,13 +21,13 @@ const (
)
type MasterDiscoveryService struct {
MasterNodeId int32 //要筛选的主结点Id如果不配置或者配置成0表示针对所有的主结点
MasterNodeId string //要筛选的主结点Id如果不配置或者配置成0表示针对所有的主结点
DiscoveryService []string //只发现的服务列表
}
type NodeInfo struct {
NodeId int
NodeName string
NodeId string
//NodeName string
Private bool
ListenAddr string
MaxRpcParamLen uint32 //最大Rpc参数长度
@@ -56,9 +56,9 @@ type Cluster struct {
locker sync.RWMutex //结点与服务关系保护锁
mapRpc map[int]*NodeRpcInfo //nodeId
mapRpc map[string]*NodeRpcInfo //nodeId
//mapIdNode map[int]NodeInfo //map[NodeId]NodeInfo
mapServiceNode map[string]map[int]struct{} //map[serviceName]map[NodeId]
mapServiceNode map[string]map[string]struct{} //map[serviceName]map[NodeId]
rpcServer rpc.Server
rpcEventLocker sync.RWMutex //Rpc事件监听保护锁
@@ -86,7 +86,7 @@ func (cls *Cluster) Stop() {
cls.serviceDiscovery.OnNodeStop()
}
func (cls *Cluster) DiscardNode(nodeId int) {
func (cls *Cluster) DiscardNode(nodeId string) {
cls.locker.Lock()
nodeInfo, ok := cls.mapRpc[nodeId]
bDel := (ok == true) && nodeInfo.nodeInfo.status == Discard
@@ -97,7 +97,7 @@ func (cls *Cluster) DiscardNode(nodeId int) {
}
}
func (cls *Cluster) DelNode(nodeId int, immediately bool) {
func (cls *Cluster) DelNode(nodeId string, immediately bool) {
//MasterDiscover结点与本地结点不删除
if cls.GetMasterDiscoveryNodeInfo(nodeId) != nil || nodeId == cls.localNodeInfo.NodeId {
return
@@ -114,7 +114,7 @@ func (cls *Cluster) DelNode(nodeId int, immediately bool) {
//正在连接中不主动断开,只断开没有连接中的
if rpc.client.IsConnected() {
rpc.nodeInfo.status = Discard
log.Info("Discard node",log.Int("nodeId",rpc.nodeInfo.NodeId),log.String("ListenAddr", rpc.nodeInfo.ListenAddr))
log.Info("Discard node",log.String("nodeId",rpc.nodeInfo.NodeId),log.String("ListenAddr", rpc.nodeInfo.ListenAddr))
return
}
}
@@ -128,18 +128,18 @@ func (cls *Cluster) DelNode(nodeId int, immediately bool) {
rpc.client.Close(false)
}
log.Info("remove node ",log.Int("NodeId", rpc.nodeInfo.NodeId),log.String("ListenAddr", rpc.nodeInfo.ListenAddr))
log.Info("remove node ",log.String("NodeId", rpc.nodeInfo.NodeId),log.String("ListenAddr", rpc.nodeInfo.ListenAddr))
}
func (cls *Cluster) serviceDiscoveryDelNode(nodeId int, immediately bool) {
if nodeId == 0 {
return
}
func (cls *Cluster) serviceDiscoveryDelNode(nodeId string, immediately bool) {
//if nodeId == "" {
// return
//}
cls.DelNode(nodeId, immediately)
}
func (cls *Cluster) delServiceNode(serviceName string, nodeId int) {
func (cls *Cluster) delServiceNode(serviceName string, nodeId string) {
if nodeId == cls.localNodeInfo.NodeId {
return
}
@@ -178,13 +178,13 @@ func (cls *Cluster) serviceDiscoverySetNodeInfo(nodeInfo *NodeInfo) {
}
mapDuplicate[serviceName] = nil
if _, ok := cls.mapServiceNode[serviceName]; ok == false {
cls.mapServiceNode[serviceName] = make(map[int]struct{}, 1)
cls.mapServiceNode[serviceName] = make(map[string]struct{}, 1)
}
cls.mapServiceNode[serviceName][nodeInfo.NodeId] = struct{}{}
}
if lastNodeInfo != nil {
log.Info("Discovery nodeId",log.Int("NodeId", nodeInfo.NodeId),log.Any("services:", nodeInfo.PublicServiceList),log.Bool("Retire",nodeInfo.Retire))
log.Info("Discovery nodeId",log.String("NodeId", nodeInfo.NodeId),log.Any("services:", nodeInfo.PublicServiceList),log.Bool("Retire",nodeInfo.Retire))
lastNodeInfo.nodeInfo = *nodeInfo
return
}
@@ -194,12 +194,12 @@ func (cls *Cluster) serviceDiscoverySetNodeInfo(nodeInfo *NodeInfo) {
rpcInfo.nodeInfo = *nodeInfo
rpcInfo.client =rpc.NewRClient(nodeInfo.NodeId, nodeInfo.ListenAddr, nodeInfo.MaxRpcParamLen,cls.localNodeInfo.CompressBytesLen,cls.triggerRpcEvent)
cls.mapRpc[nodeInfo.NodeId] = &rpcInfo
log.Info("Discovery nodeId and new rpc client",log.Int("NodeId", nodeInfo.NodeId),log.Any("services:", nodeInfo.PublicServiceList),log.Bool("Retire",nodeInfo.Retire),log.String("nodeListenAddr",nodeInfo.ListenAddr))
log.Info("Discovery nodeId and new rpc client",log.String("NodeId", nodeInfo.NodeId),log.Any("services:", nodeInfo.PublicServiceList),log.Bool("Retire",nodeInfo.Retire),log.String("nodeListenAddr",nodeInfo.ListenAddr))
}
func (cls *Cluster) Init(localNodeId int, setupServiceFun SetupServiceFun) error {
func (cls *Cluster) Init(localNodeId string, setupServiceFun SetupServiceFun) error {
//1.初始化配置
err := cls.InitCfg(localNodeId)
if err != nil {
@@ -223,7 +223,7 @@ func (cls *Cluster) Init(localNodeId int, setupServiceFun SetupServiceFun) error
return nil
}
func (cls *Cluster) checkDynamicDiscovery(localNodeId int) (bool, bool) {
func (cls *Cluster) checkDynamicDiscovery(localNodeId string) (bool, bool) {
var localMaster bool //本结点是否为Master结点
var hasMaster bool //是否配置Master服务
@@ -247,7 +247,7 @@ func (cls *Cluster) AddDynamicDiscoveryService(serviceName string, bPublicServic
}
if _, ok := cls.mapServiceNode[serviceName]; ok == false {
cls.mapServiceNode[serviceName] = map[int]struct{}{}
cls.mapServiceNode[serviceName] = map[string]struct{}{}
}
cls.mapServiceNode[serviceName][cls.localNodeInfo.NodeId] = struct{}{}
}
@@ -256,7 +256,7 @@ func (cls *Cluster) GetDiscoveryNodeList() []NodeInfo {
return cls.masterDiscoveryNodeList
}
func (cls *Cluster) GetMasterDiscoveryNodeInfo(nodeId int) *NodeInfo {
func (cls *Cluster) GetMasterDiscoveryNodeInfo(nodeId string) *NodeInfo {
for i := 0; i < len(cls.masterDiscoveryNodeList); i++ {
if cls.masterDiscoveryNodeList[i].NodeId == nodeId {
return &cls.masterDiscoveryNodeList[i]
@@ -270,7 +270,7 @@ func (cls *Cluster) IsMasterDiscoveryNode() bool {
return cls.GetMasterDiscoveryNodeInfo(cls.GetLocalNodeInfo().NodeId) != nil
}
func (cls *Cluster) SetupServiceDiscovery(localNodeId int, setupServiceFun SetupServiceFun) {
func (cls *Cluster) SetupServiceDiscovery(localNodeId string, setupServiceFun SetupServiceFun) {
if cls.serviceDiscovery != nil {
return
}
@@ -300,7 +300,7 @@ func (cls *Cluster) FindRpcHandler(serviceName string) rpc.IRpcHandler {
return pService.GetRpcHandler()
}
func (cls *Cluster) getRpcClient(nodeId int) (*rpc.Client,bool) {
func (cls *Cluster) getRpcClient(nodeId string) (*rpc.Client,bool) {
c, ok := cls.mapRpc[nodeId]
if ok == false {
return nil,false
@@ -309,14 +309,14 @@ func (cls *Cluster) getRpcClient(nodeId int) (*rpc.Client,bool) {
return c.client,c.nodeInfo.Retire
}
func (cls *Cluster) GetRpcClient(nodeId int) (*rpc.Client,bool) {
func (cls *Cluster) GetRpcClient(nodeId string) (*rpc.Client,bool) {
cls.locker.RLock()
defer cls.locker.RUnlock()
return cls.getRpcClient(nodeId)
}
func GetRpcClient(nodeId int, serviceMethod string,filterRetire bool, clientList []*rpc.Client) (error, int) {
if nodeId > 0 {
func GetRpcClient(nodeId string, serviceMethod string,filterRetire bool, clientList []*rpc.Client) (error, int) {
if nodeId != rpc.NodeIdNull {
pClient,retire := GetCluster().GetRpcClient(nodeId)
if pClient == nil {
return fmt.Errorf("cannot find nodeid %d!", nodeId), 0
@@ -345,12 +345,12 @@ func GetRpcServer() *rpc.Server {
return &cluster.rpcServer
}
func (cls *Cluster) IsNodeConnected(nodeId int) bool {
func (cls *Cluster) IsNodeConnected(nodeId string) bool {
pClient,_ := cls.GetRpcClient(nodeId)
return pClient != nil && pClient.IsConnected()
}
func (cls *Cluster) IsNodeRetire(nodeId int) bool {
func (cls *Cluster) IsNodeRetire(nodeId string) bool {
cls.locker.RLock()
defer cls.locker.RUnlock()
@@ -359,7 +359,7 @@ func (cls *Cluster) IsNodeRetire(nodeId int) bool {
}
func (cls *Cluster) triggerRpcEvent(bConnect bool, clientId uint32, nodeId int) {
func (cls *Cluster) triggerRpcEvent(bConnect bool, clientId uint32, nodeId string) {
cls.locker.Lock()
nodeInfo, ok := cls.mapRpc[nodeId]
if ok == false || nodeInfo.client == nil || nodeInfo.client.GetClientId() != clientId {
@@ -384,7 +384,7 @@ func (cls *Cluster) triggerRpcEvent(bConnect bool, clientId uint32, nodeId int)
}
}
func (cls *Cluster) TriggerDiscoveryEvent(bDiscovery bool, nodeId int, serviceName []string) {
func (cls *Cluster) TriggerDiscoveryEvent(bDiscovery bool, nodeId string, serviceName []string) {
cls.rpcEventLocker.Lock()
defer cls.rpcEventLocker.Unlock()
@@ -443,7 +443,7 @@ func (cls *Cluster) UnReDiscoveryEvent(serviceName string) {
func HasService(nodeId int, serviceName string) bool {
func HasService(nodeId string, serviceName string) bool {
cluster.locker.RLock()
defer cluster.locker.RUnlock()
@@ -456,7 +456,7 @@ func HasService(nodeId int, serviceName string) bool {
return false
}
func GetNodeByServiceName(serviceName string) map[int]struct{} {
func GetNodeByServiceName(serviceName string) map[string]struct{} {
cluster.locker.RLock()
defer cluster.locker.RUnlock()
@@ -465,7 +465,7 @@ func GetNodeByServiceName(serviceName string) map[int]struct{} {
return nil
}
mapNodeId := map[int]struct{}{}
mapNodeId := map[string]struct{}{}
for nodeId,_ := range mapNode {
mapNodeId[nodeId] = struct{}{}
}
@@ -477,7 +477,7 @@ func (cls *Cluster) GetGlobalCfg() interface{} {
return cls.globalCfg
}
func (cls *Cluster) GetNodeInfo(nodeId int) (NodeInfo,bool) {
func (cls *Cluster) GetNodeInfo(nodeId string) (NodeInfo,bool) {
cls.locker.RLock()
defer cls.locker.RUnlock()

View File

@@ -1,19 +1,21 @@
package cluster
import "github.com/duanhf2012/origin/v2/rpc"
type ConfigDiscovery struct {
funDelService FunDelNode
funSetService FunSetNodeInfo
localNodeId int
localNodeId string
}
func (discovery *ConfigDiscovery) InitDiscovery(localNodeId int,funDelNode FunDelNode,funSetNodeInfo FunSetNodeInfo) error{
func (discovery *ConfigDiscovery) InitDiscovery(localNodeId string,funDelNode FunDelNode,funSetNodeInfo FunSetNodeInfo) error{
discovery.localNodeId = localNodeId
discovery.funDelService = funDelNode
discovery.funSetService = funSetNodeInfo
//解析本地其他服务配置
_,nodeInfoList,err := GetCluster().readLocalClusterConfig(0)
_,nodeInfoList,err := GetCluster().readLocalClusterConfig(rpc.NodeIdNull)
if err != nil {
return err
}

View File

@@ -20,7 +20,7 @@ const NodeRetireRpcMethod = DynamicDiscoveryMasterName+".RPC_NodeRetire"
type DynamicDiscoveryMaster struct {
service.Service
mapNodeInfo map[int32]struct{}
mapNodeInfo map[string]struct{}
nodeInfo []*rpc.NodeInfo
}
@@ -29,9 +29,9 @@ type DynamicDiscoveryClient struct {
funDelService FunDelNode
funSetService FunSetNodeInfo
localNodeId int
localNodeId string
mapDiscovery map[int32]map[int32]struct{} //map[masterNodeId]map[nodeId]struct{}
mapDiscovery map[string]map[string]struct{} //map[masterNodeId]map[nodeId]struct{}
bRetire bool
}
@@ -47,7 +47,7 @@ func init() {
clientService.SetName(DynamicDiscoveryClientName)
}
func (ds *DynamicDiscoveryMaster) isRegNode(nodeId int32) bool {
func (ds *DynamicDiscoveryMaster) isRegNode(nodeId string) bool {
_, ok := ds.mapNodeInfo[nodeId]
return ok
}
@@ -81,7 +81,7 @@ func (ds *DynamicDiscoveryMaster) addNodeInfo(nInfo *rpc.NodeInfo) {
ds.nodeInfo = append(ds.nodeInfo, nodeInfo)
}
func (ds *DynamicDiscoveryMaster) removeNodeInfo(nodeId int32) {
func (ds *DynamicDiscoveryMaster) removeNodeInfo(nodeId string) {
if _,ok:= ds.mapNodeInfo[nodeId];ok == false {
return
}
@@ -97,7 +97,7 @@ func (ds *DynamicDiscoveryMaster) removeNodeInfo(nodeId int32) {
}
func (ds *DynamicDiscoveryMaster) OnInit() error {
ds.mapNodeInfo = make(map[int32]struct{}, 20)
ds.mapNodeInfo = make(map[string]struct{}, 20)
ds.RegRpcListener(ds)
return nil
@@ -106,8 +106,7 @@ func (ds *DynamicDiscoveryMaster) OnInit() error {
func (ds *DynamicDiscoveryMaster) OnStart() {
var nodeInfo rpc.NodeInfo
localNodeInfo := cluster.GetLocalNodeInfo()
nodeInfo.NodeId = int32(localNodeInfo.NodeId)
nodeInfo.NodeName = localNodeInfo.NodeName
nodeInfo.NodeId = localNodeInfo.NodeId
nodeInfo.ListenAddr = localNodeInfo.ListenAddr
nodeInfo.PublicServiceList = localNodeInfo.PublicServiceList
nodeInfo.MaxRpcParamLen = localNodeInfo.MaxRpcParamLen
@@ -117,9 +116,9 @@ func (ds *DynamicDiscoveryMaster) OnStart() {
ds.addNodeInfo(&nodeInfo)
}
func (ds *DynamicDiscoveryMaster) OnNodeConnected(nodeId int) {
func (ds *DynamicDiscoveryMaster) OnNodeConnected(nodeId string) {
//没注册过结点不通知
if ds.isRegNode(int32(nodeId)) == false {
if ds.isRegNode(nodeId) == false {
return
}
@@ -127,21 +126,21 @@ func (ds *DynamicDiscoveryMaster) OnNodeConnected(nodeId int) {
var notifyDiscover rpc.SubscribeDiscoverNotify
notifyDiscover.IsFull = true
notifyDiscover.NodeInfo = ds.nodeInfo
notifyDiscover.MasterNodeId = int32(cluster.GetLocalNodeInfo().NodeId)
notifyDiscover.MasterNodeId = cluster.GetLocalNodeInfo().NodeId
ds.GoNode(nodeId, SubServiceDiscover, &notifyDiscover)
}
func (ds *DynamicDiscoveryMaster) OnNodeDisconnect(nodeId int) {
if ds.isRegNode(int32(nodeId)) == false {
func (ds *DynamicDiscoveryMaster) OnNodeDisconnect(nodeId string) {
if ds.isRegNode(nodeId) == false {
return
}
ds.removeNodeInfo(int32(nodeId))
ds.removeNodeInfo(nodeId)
var notifyDiscover rpc.SubscribeDiscoverNotify
notifyDiscover.MasterNodeId = int32(cluster.GetLocalNodeInfo().NodeId)
notifyDiscover.DelNodeId = int32(nodeId)
notifyDiscover.MasterNodeId = cluster.GetLocalNodeInfo().NodeId
notifyDiscover.DelNodeId = nodeId
//删除结点
cluster.DelNode(nodeId, true)
@@ -151,17 +150,17 @@ func (ds *DynamicDiscoveryMaster) OnNodeDisconnect(nodeId int) {
func (ds *DynamicDiscoveryMaster) RpcCastGo(serviceMethod string, args interface{}) {
for nodeId, _ := range ds.mapNodeInfo {
ds.GoNode(int(nodeId), serviceMethod, args)
ds.GoNode(nodeId, serviceMethod, args)
}
}
func (ds *DynamicDiscoveryMaster) RPC_NodeRetire(req *rpc.NodeRetireReq, res *rpc.Empty) error {
log.Info("node is retire",log.Int32("nodeId",req.NodeInfo.NodeId),log.Bool("retire",req.NodeInfo.Retire))
log.Info("node is retire",log.String("nodeId",req.NodeInfo.NodeId),log.Bool("retire",req.NodeInfo.Retire))
ds.updateNodeInfo(req.NodeInfo)
var notifyDiscover rpc.SubscribeDiscoverNotify
notifyDiscover.MasterNodeId = int32(cluster.GetLocalNodeInfo().NodeId)
notifyDiscover.MasterNodeId = cluster.GetLocalNodeInfo().NodeId
notifyDiscover.NodeInfo = append(notifyDiscover.NodeInfo, req.NodeInfo)
ds.RpcCastGo(SubServiceDiscover, &notifyDiscover)
@@ -179,7 +178,7 @@ func (ds *DynamicDiscoveryMaster) RPC_RegServiceDiscover(req *rpc.ServiceDiscove
//广播给其他所有结点
var notifyDiscover rpc.SubscribeDiscoverNotify
notifyDiscover.MasterNodeId = int32(cluster.GetLocalNodeInfo().NodeId)
notifyDiscover.MasterNodeId = cluster.GetLocalNodeInfo().NodeId
notifyDiscover.NodeInfo = append(notifyDiscover.NodeInfo, req.NodeInfo)
ds.RpcCastGo(SubServiceDiscover, &notifyDiscover)
@@ -188,8 +187,7 @@ func (ds *DynamicDiscoveryMaster) RPC_RegServiceDiscover(req *rpc.ServiceDiscove
//初始化结点信息
var nodeInfo NodeInfo
nodeInfo.NodeId = int(req.NodeInfo.NodeId)
nodeInfo.NodeName = req.NodeInfo.NodeName
nodeInfo.NodeId = req.NodeInfo.NodeId
nodeInfo.Private = req.NodeInfo.Private
nodeInfo.ServiceList = req.NodeInfo.PublicServiceList
nodeInfo.PublicServiceList = req.NodeInfo.PublicServiceList
@@ -208,19 +206,19 @@ func (ds *DynamicDiscoveryMaster) RPC_RegServiceDiscover(req *rpc.ServiceDiscove
func (dc *DynamicDiscoveryClient) OnInit() error {
dc.RegRpcListener(dc)
dc.mapDiscovery = map[int32]map[int32]struct{}{}
dc.mapDiscovery = map[string]map[string]struct{}{}
return nil
}
func (dc *DynamicDiscoveryClient) addMasterNode(masterNodeId int32, nodeId int32) {
func (dc *DynamicDiscoveryClient) addMasterNode(masterNodeId string, nodeId string) {
_, ok := dc.mapDiscovery[masterNodeId]
if ok == false {
dc.mapDiscovery[masterNodeId] = map[int32]struct{}{}
dc.mapDiscovery[masterNodeId] = map[string]struct{}{}
}
dc.mapDiscovery[masterNodeId][nodeId] = struct{}{}
}
func (dc *DynamicDiscoveryClient) removeMasterNode(masterNodeId int32, nodeId int32) {
func (dc *DynamicDiscoveryClient) removeMasterNode(masterNodeId string, nodeId string) {
mapNodeId, ok := dc.mapDiscovery[masterNodeId]
if ok == false {
return
@@ -229,7 +227,7 @@ func (dc *DynamicDiscoveryClient) removeMasterNode(masterNodeId int32, nodeId in
delete(mapNodeId, nodeId)
}
func (dc *DynamicDiscoveryClient) findNodeId(nodeId int32) bool {
func (dc *DynamicDiscoveryClient) findNodeId(nodeId string) bool {
for _, mapNodeId := range dc.mapDiscovery {
_, ok := mapNodeId[nodeId]
if ok == true {
@@ -255,13 +253,13 @@ func (dc *DynamicDiscoveryClient) addDiscoveryMaster() {
}
}
func (dc *DynamicDiscoveryClient) fullCompareDiffNode(masterNodeId int32, mapNodeInfo map[int32]*rpc.NodeInfo) []int32 {
func (dc *DynamicDiscoveryClient) fullCompareDiffNode(masterNodeId string, mapNodeInfo map[string]*rpc.NodeInfo) []string {
if mapNodeInfo == nil {
return nil
}
diffNodeIdSlice := make([]int32, 0, len(mapNodeInfo))
mapNodeId := map[int32]struct{}{}
diffNodeIdSlice := make([]string, 0, len(mapNodeInfo))
mapNodeId := map[string]struct{}{}
mapNodeId, ok := dc.mapDiscovery[masterNodeId]
if ok == false {
return nil
@@ -280,10 +278,10 @@ func (dc *DynamicDiscoveryClient) fullCompareDiffNode(masterNodeId int32, mapNod
//订阅发现的服务通知
func (dc *DynamicDiscoveryClient) RPC_SubServiceDiscover(req *rpc.SubscribeDiscoverNotify) error {
mapNodeInfo := map[int32]*rpc.NodeInfo{}
mapNodeInfo := map[string]*rpc.NodeInfo{}
for _, nodeInfo := range req.NodeInfo {
//不对本地结点或者不存在任何公开服务的结点
if int(nodeInfo.NodeId) == dc.localNodeId {
if nodeInfo.NodeId == dc.localNodeId {
continue
}
@@ -298,7 +296,6 @@ func (dc *DynamicDiscoveryClient) RPC_SubServiceDiscover(req *rpc.SubscribeDisco
if nInfo == nil {
nInfo = &rpc.NodeInfo{}
nInfo.NodeId = nodeInfo.NodeId
nInfo.NodeName = nodeInfo.NodeName
nInfo.ListenAddr = nodeInfo.ListenAddr
nInfo.MaxRpcParamLen = nodeInfo.MaxRpcParamLen
nInfo.Retire = nodeInfo.Retire
@@ -312,7 +309,7 @@ func (dc *DynamicDiscoveryClient) RPC_SubServiceDiscover(req *rpc.SubscribeDisco
}
//如果为完整同步,则找出差异的结点
var willDelNodeId []int32
var willDelNodeId []string
if req.IsFull == true {
diffNode := dc.fullCompareDiffNode(req.MasterNodeId, mapNodeInfo)
if len(diffNode) > 0 {
@@ -321,16 +318,16 @@ func (dc *DynamicDiscoveryClient) RPC_SubServiceDiscover(req *rpc.SubscribeDisco
}
//指定删除结点
if req.DelNodeId > 0 && req.DelNodeId != int32(dc.localNodeId) {
if req.DelNodeId != rpc.NodeIdNull && req.DelNodeId != dc.localNodeId {
willDelNodeId = append(willDelNodeId, req.DelNodeId)
}
//删除不必要的结点
for _, nodeId := range willDelNodeId {
cluster.TriggerDiscoveryEvent(false,int(nodeId),nil)
dc.removeMasterNode(req.MasterNodeId, int32(nodeId))
cluster.TriggerDiscoveryEvent(false,nodeId,nil)
dc.removeMasterNode(req.MasterNodeId, nodeId)
if dc.findNodeId(nodeId) == false {
dc.funDelService(int(nodeId), false)
dc.funDelService(nodeId, false)
}
}
@@ -341,13 +338,13 @@ func (dc *DynamicDiscoveryClient) RPC_SubServiceDiscover(req *rpc.SubscribeDisco
continue
}
cluster.TriggerDiscoveryEvent(true,int(nodeInfo.NodeId),nodeInfo.PublicServiceList)
cluster.TriggerDiscoveryEvent(true,nodeInfo.NodeId,nodeInfo.PublicServiceList)
}
return nil
}
func (dc *DynamicDiscoveryClient) isDiscoverNode(nodeId int) bool {
func (dc *DynamicDiscoveryClient) isDiscoverNode(nodeId string) bool {
for i := 0; i < len(cluster.masterDiscoveryNodeList); i++ {
if cluster.masterDiscoveryNodeList[i].NodeId == nodeId {
return true
@@ -357,7 +354,7 @@ func (dc *DynamicDiscoveryClient) isDiscoverNode(nodeId int) bool {
return false
}
func (dc *DynamicDiscoveryClient) OnNodeConnected(nodeId int) {
func (dc *DynamicDiscoveryClient) OnNodeConnected(nodeId string) {
dc.regServiceDiscover(nodeId)
}
@@ -369,22 +366,21 @@ func (dc *DynamicDiscoveryClient) OnRetire(){
var nodeRetireReq rpc.NodeRetireReq
nodeRetireReq.NodeInfo = &rpc.NodeInfo{}
nodeRetireReq.NodeInfo.NodeId = int32(cluster.localNodeInfo.NodeId)
nodeRetireReq.NodeInfo.NodeName = cluster.localNodeInfo.NodeName
nodeRetireReq.NodeInfo.NodeId = cluster.localNodeInfo.NodeId
nodeRetireReq.NodeInfo.ListenAddr = cluster.localNodeInfo.ListenAddr
nodeRetireReq.NodeInfo.MaxRpcParamLen = cluster.localNodeInfo.MaxRpcParamLen
nodeRetireReq.NodeInfo.PublicServiceList = cluster.localNodeInfo.PublicServiceList
nodeRetireReq.NodeInfo.Retire = dc.bRetire
nodeRetireReq.NodeInfo.Private = cluster.localNodeInfo.Private
err := dc.GoNode(int(masterNodeList[i].NodeId),NodeRetireRpcMethod,&nodeRetireReq)
err := dc.GoNode(masterNodeList[i].NodeId,NodeRetireRpcMethod,&nodeRetireReq)
if err!= nil {
log.Error("call "+NodeRetireRpcMethod+" is fail",log.ErrorAttr("err",err))
}
}
}
func (dc *DynamicDiscoveryClient) regServiceDiscover(nodeId int){
func (dc *DynamicDiscoveryClient) regServiceDiscover(nodeId string){
nodeInfo := cluster.GetMasterDiscoveryNodeInfo(nodeId)
if nodeInfo == nil {
return
@@ -392,8 +388,7 @@ func (dc *DynamicDiscoveryClient) regServiceDiscover(nodeId int){
var req rpc.ServiceDiscoverReq
req.NodeInfo = &rpc.NodeInfo{}
req.NodeInfo.NodeId = int32(cluster.localNodeInfo.NodeId)
req.NodeInfo.NodeName = cluster.localNodeInfo.NodeName
req.NodeInfo.NodeId = cluster.localNodeInfo.NodeId
req.NodeInfo.ListenAddr = cluster.localNodeInfo.ListenAddr
req.NodeInfo.MaxRpcParamLen = cluster.localNodeInfo.MaxRpcParamLen
req.NodeInfo.PublicServiceList = cluster.localNodeInfo.PublicServiceList
@@ -416,13 +411,13 @@ func (dc *DynamicDiscoveryClient) regServiceDiscover(nodeId int){
}
}
func (dc *DynamicDiscoveryClient) canDiscoveryService(fromMasterNodeId int32,serviceName string) bool{
func (dc *DynamicDiscoveryClient) canDiscoveryService(fromMasterNodeId string,serviceName string) bool{
canDiscovery := true
for i:=0;i<len(cluster.GetLocalNodeInfo().MasterDiscoveryService);i++{
masterNodeId := cluster.GetLocalNodeInfo().MasterDiscoveryService[i].MasterNodeId
if masterNodeId == fromMasterNodeId || masterNodeId == 0 {
if masterNodeId == fromMasterNodeId || masterNodeId == rpc.NodeIdNull {
canDiscovery = false
for _,discoveryService := range cluster.GetLocalNodeInfo().MasterDiscoveryService[i].DiscoveryService {
@@ -436,8 +431,8 @@ func (dc *DynamicDiscoveryClient) canDiscoveryService(fromMasterNodeId int32,ser
return canDiscovery
}
func (dc *DynamicDiscoveryClient) setNodeInfo(masterNodeId int32,nodeInfo *rpc.NodeInfo) bool{
if nodeInfo == nil || nodeInfo.Private == true || int(nodeInfo.NodeId) == dc.localNodeId {
func (dc *DynamicDiscoveryClient) setNodeInfo(masterNodeId string,nodeInfo *rpc.NodeInfo) bool{
if nodeInfo == nil || nodeInfo.Private == true || nodeInfo.NodeId == dc.localNodeId {
return false
}
@@ -456,8 +451,7 @@ func (dc *DynamicDiscoveryClient) setNodeInfo(masterNodeId int32,nodeInfo *rpc.N
var nInfo NodeInfo
nInfo.ServiceList = discoverServiceSlice
nInfo.PublicServiceList = discoverServiceSlice
nInfo.NodeId = int(nodeInfo.NodeId)
nInfo.NodeName = nodeInfo.NodeName
nInfo.NodeId = nodeInfo.NodeId
nInfo.ListenAddr = nodeInfo.ListenAddr
nInfo.MaxRpcParamLen = nodeInfo.MaxRpcParamLen
nInfo.Retire = nodeInfo.Retire
@@ -468,12 +462,12 @@ func (dc *DynamicDiscoveryClient) setNodeInfo(masterNodeId int32,nodeInfo *rpc.N
return true
}
func (dc *DynamicDiscoveryClient) OnNodeDisconnect(nodeId int) {
func (dc *DynamicDiscoveryClient) OnNodeDisconnect(nodeId string) {
//将Discard结点清理
cluster.DiscardNode(nodeId)
}
func (dc *DynamicDiscoveryClient) InitDiscovery(localNodeId int, funDelNode FunDelNode, funSetNodeInfo FunSetNodeInfo) error {
func (dc *DynamicDiscoveryClient) InitDiscovery(localNodeId string, funDelNode FunDelNode, funSetNodeInfo FunSetNodeInfo) error {
dc.localNodeId = localNodeId
dc.funDelService = funDelNode
dc.funSetService = funSetNodeInfo

View File

@@ -31,7 +31,7 @@ func (cls *Cluster) ReadClusterConfig(filepath string) (*NodeInfoList, error) {
return c, nil
}
func (cls *Cluster) readServiceConfig(filepath string) (interface{}, map[string]interface{}, map[int]map[string]interface{}, error) {
func (cls *Cluster) readServiceConfig(filepath string) (interface{}, map[string]interface{}, map[string]map[string]interface{}, error) {
c := map[string]interface{}{}
//读取配置
d, err := os.ReadFile(filepath)
@@ -50,7 +50,7 @@ func (cls *Cluster) readServiceConfig(filepath string) (interface{}, map[string]
serviceConfig = serviceCfg.(map[string]interface{})
}
mapNodeService := map[int]map[string]interface{}{}
mapNodeService := map[string]map[string]interface{}{}
nodeServiceCfg, ok := c["NodeService"]
if ok == true {
nodeServiceList := nodeServiceCfg.([]interface{})
@@ -60,13 +60,13 @@ func (cls *Cluster) readServiceConfig(filepath string) (interface{}, map[string]
if ok == false {
log.Fatal("NodeService list not find nodeId field")
}
mapNodeService[int(nodeId.(float64))] = serviceCfg
mapNodeService[nodeId.(string)] = serviceCfg
}
}
return GlobalCfg, serviceConfig, mapNodeService, nil
}
func (cls *Cluster) readLocalClusterConfig(nodeId int) ([]NodeInfo, []NodeInfo, error) {
func (cls *Cluster) readLocalClusterConfig(nodeId string) ([]NodeInfo, []NodeInfo, error) {
var nodeInfoList []NodeInfo
var masterDiscoverNodeList []NodeInfo
clusterCfgPath := strings.TrimRight(configDir, "/") + "/cluster"
@@ -85,14 +85,14 @@ func (cls *Cluster) readLocalClusterConfig(nodeId int) ([]NodeInfo, []NodeInfo,
}
masterDiscoverNodeList = append(masterDiscoverNodeList, localNodeInfoList.MasterDiscoveryNode...)
for _, nodeInfo := range localNodeInfoList.NodeList {
if nodeInfo.NodeId == nodeId || nodeId == 0 {
if nodeInfo.NodeId == nodeId || nodeId == rpc.NodeIdNull {
nodeInfoList = append(nodeInfoList, nodeInfo)
}
}
}
}
if nodeId != 0 && (len(nodeInfoList) != 1) {
if nodeId != rpc.NodeIdNull && (len(nodeInfoList) != 1) {
return nil, nil, fmt.Errorf("%d configurations were found for the configuration with node ID %d!", len(nodeInfoList), nodeId)
}
@@ -110,7 +110,7 @@ func (cls *Cluster) readLocalClusterConfig(nodeId int) ([]NodeInfo, []NodeInfo,
return masterDiscoverNodeList, nodeInfoList, nil
}
func (cls *Cluster) readLocalService(localNodeId int) error {
func (cls *Cluster) readLocalService(localNodeId string) error {
clusterCfgPath := strings.TrimRight(configDir, "/") + "/cluster"
fileInfoList, err := os.ReadDir(clusterCfgPath)
if err != nil {
@@ -207,7 +207,7 @@ func (cls *Cluster) parseLocalCfg() {
for _, sName := range cls.localNodeInfo.ServiceList {
if _, ok := cls.mapServiceNode[sName]; ok == false {
cls.mapServiceNode[sName] = make(map[int]struct{})
cls.mapServiceNode[sName] = make(map[string]struct{})
}
cls.mapServiceNode[sName][cls.localNodeInfo.NodeId] = struct{}{}
@@ -227,10 +227,10 @@ func (cls *Cluster) checkDiscoveryNodeList(discoverMasterNode []NodeInfo) bool {
return true
}
func (cls *Cluster) InitCfg(localNodeId int) error {
func (cls *Cluster) InitCfg(localNodeId string) error {
cls.localServiceCfg = map[string]interface{}{}
cls.mapRpc = map[int]*NodeRpcInfo{}
cls.mapServiceNode = map[string]map[int]struct{}{}
cls.mapRpc = map[string]*NodeRpcInfo{}
cls.mapServiceNode = map[string]map[string]struct{}{}
//加载本地结点的NodeList配置
discoveryNode, nodeInfoList, err := cls.readLocalClusterConfig(localNodeId)

View File

@@ -2,11 +2,11 @@ package cluster
type OperType int
type FunDelNode func (nodeId int,immediately bool)
type FunDelNode func (nodeId string,immediately bool)
type FunSetNodeInfo func(nodeInfo *NodeInfo)
type IServiceDiscovery interface {
InitDiscovery(localNodeId int,funDelNode FunDelNode,funSetNodeInfo FunSetNodeInfo) error
InitDiscovery(localNodeId string,funDelNode FunDelNode,funSetNodeInfo FunSetNodeInfo) error
OnNodeStop()
}