diff --git a/cluster/cluster.go b/cluster/cluster.go index 58a4508..d5c28f6 100644 --- a/cluster/cluster.go +++ b/cluster/cluster.go @@ -13,6 +13,12 @@ var configDir = "./config/" type SetupServiceFun func(s ...service.IService) +type NodeStatus int +const( + Normal NodeStatus = 0 //正常 + Discard NodeStatus = 1 //丢弃 +) + type NodeInfo struct { NodeId int NodeName string @@ -21,6 +27,7 @@ type NodeInfo struct { ServiceList []string //所有的服务列表 PublicServiceList []string //对外公开的服务列表 DiscoveryService []string //筛选发现的服务,如果不配置,不进行筛选 + status NodeStatus } type NodeRpcInfo struct { @@ -66,7 +73,17 @@ func (cls *Cluster) Stop() { cls.serviceDiscovery.OnNodeStop() } -func (cls *Cluster) DelNode(nodeId int){ +func (cls *Cluster) DiscardNode(nodeId int){ + cls.locker.Lock() + nodeInfo,ok := cls.mapIdNode[nodeId] + cls.locker.Unlock() + + if ok==true && nodeInfo.status == Discard { + cls.DelNode(nodeId,true) + } +} + +func (cls *Cluster) DelNode(nodeId int,immediately bool){ cls.locker.Lock() nodeInfo,ok := cls.mapIdNode[nodeId] if ok == false { @@ -74,26 +91,43 @@ func (cls *Cluster) DelNode(nodeId int){ return } + rpc,ok := cls.mapRpc[nodeId] + for{ + //立即删除 + if immediately || ok == false { + break + } + + rpc.client.Lock() + //正在连接中不主动断开,只断开没有连接中的 + if rpc.client.IsConnected() { + nodeInfo.status = Discard + rpc.client.Unlock() + cls.locker.Unlock() + return + } + rpc.client.Unlock() + break + } + for _,serviceName := range nodeInfo.ServiceList{ cls.delServiceNode(serviceName,nodeId) } - rpc,ok := cls.mapRpc[nodeId] delete(cls.mapIdNode,nodeId) delete(cls.mapRpc,nodeId) - cls.locker.Unlock() if ok == true { rpc.client.Close(false) } } -func (cls *Cluster) serviceDiscoveryDelNode (nodeId int){ +func (cls *Cluster) serviceDiscoveryDelNode (nodeId int,immediately bool){ if nodeId == 0 { return } - cls.DelNode(nodeId) + cls.DelNode(nodeId,immediately) } func (cls *Cluster) delServiceNode(serviceName string,nodeId int){ @@ -151,6 +185,10 @@ func (cls *Cluster) serviceDiscoverySetNodeInfo (nodeInfo *NodeInfo){ rpcInfo.client.TriggerRpcEvent = cls.triggerRpcEvent rpcInfo.client.Connect(nodeInfo.NodeId,nodeInfo.ListenAddr) cls.mapRpc[nodeInfo.NodeId] = rpcInfo + + //debug + fmt.Printf("xxxxxxxxxxxx") + fmt.Println(nodeInfo) } func (cls *Cluster) buildLocalRpc(){ @@ -339,4 +377,17 @@ func (cls *Cluster) FetchAllNodeId(fetchFun func(nodeId int)){ fetchFun(nodeId) } cls.locker.Unlock() -} \ No newline at end of file +} + +func HasService(nodeId int,serviceName string) bool{ + cluster.locker.RLock() + defer cluster.locker.RUnlock() + + mapNode,_ := cluster.mapServiceNode[serviceName] + if mapNode!=nil { + _,ok := mapNode[nodeId] + return ok + } + + return false +} diff --git a/cluster/dynamicdiscovery.go b/cluster/dynamicdiscovery.go index 2fc131f..a6538ee 100644 --- a/cluster/dynamicdiscovery.go +++ b/cluster/dynamicdiscovery.go @@ -84,7 +84,7 @@ func (ds *DynamicDiscoveryMaster) OnNodeDisconnect(nodeId int){ var notifyDiscover rpc.SubscribeDiscoverNotify notifyDiscover.DelNodeId = int32(nodeId) //删除结点 - cluster.DelNode(nodeId) + cluster.DelNode(nodeId,true) ds.CastGo(DynamicDiscoveryClientNameRpcMethod,¬ifyDiscover) } @@ -115,7 +115,7 @@ func (ds *DynamicDiscoveryMaster) RPC_RegServiceDiscover(req *rpc.ServiceDiscove nodeInfo.ListenAddr = req.NodeInfo.ListenAddr //主动删除已经存在的结点,确保先断开,再连接 - cluster.serviceDiscoveryDelNode(nodeInfo.NodeId) + cluster.serviceDiscoveryDelNode(nodeInfo.NodeId,true) //加入到本地Cluster模块中,将连接该结点 //如果本结点不为master结点,而且没有可使用的服务,不加入 @@ -174,7 +174,7 @@ func (dc *DynamicDiscoveryClient) RPC_SubServiceDiscover(req *rpc.SubscribeDisco //删除不必要的结点 for _,nodeId := range willDelNodeId { - dc.funDelService(nodeId) + dc.funDelService(nodeId,false) } //发现新结点 @@ -210,6 +210,7 @@ func (dc *DynamicDiscoveryClient) OnNodeConnected(nodeId int) { //如果是连接发现主服成功,则同步服务信息 err := dc.AsyncCallNode(nodeId, DynamicDiscoveryMasterNameRpcMethod, &req, func(res *rpc.ServiceDiscoverRes, err error) { if err != nil { + cluster.DelNode(nodeId,true) log.Error("call %s is fail :%s", DynamicDiscoveryMasterNameRpcMethod, err.Error()) return } @@ -253,6 +254,8 @@ func (dc *DynamicDiscoveryClient) setNodeInfo(nodeInfo *rpc.NodeInfo){ func (dc *DynamicDiscoveryClient) OnNodeDisconnect(nodeId int){ + //将Discard结点清理 + cluster.DiscardNode(nodeId) } func (dc *DynamicDiscoveryClient) InitDiscovery(localNodeId int,funDelNode FunDelNode,funSetNodeInfo FunSetNodeInfo) error{ diff --git a/cluster/parsecfg.go b/cluster/parsecfg.go index 4c276f5..756047c 100644 --- a/cluster/parsecfg.go +++ b/cluster/parsecfg.go @@ -167,7 +167,7 @@ func (cls *Cluster) parseLocalCfg(){ func (cls *Cluster) checkDiscoveryNodeList(discoverMasterNode []NodeInfo) bool{ for i:=0;i