diff --git a/cluster/cluster.go b/cluster/cluster.go index 572d099..f05d750 100644 --- a/cluster/cluster.go +++ b/cluster/cluster.go @@ -85,7 +85,7 @@ func (cls *Cluster) DiscardNode(nodeId int){ } func (cls *Cluster) DelNode(nodeId int,immediately bool){ - //不删除 + //MasterDiscover结点与本地结点不删除 if cls.GetMasterDiscoveryNodeInfo(nodeId)!=nil || nodeId == cls.localNodeInfo.NodeId { return } diff --git a/cluster/dynamicdiscovery.go b/cluster/dynamicdiscovery.go index b321660..d3a418d 100644 --- a/cluster/dynamicdiscovery.go +++ b/cluster/dynamicdiscovery.go @@ -5,22 +5,19 @@ import ( "github.com/duanhf2012/origin/log" "github.com/duanhf2012/origin/rpc" "github.com/duanhf2012/origin/service" - "time" ) -const maxTryCount = 30 //最大重试次数 -const perTrySecond = 2*time.Second //每次重试间隔2秒 const DynamicDiscoveryMasterName = "DiscoveryMaster" const DynamicDiscoveryClientName = "DiscoveryClient" -const RegServiceDiscover = DynamicDiscoveryMasterName+".RPC_RegServiceDiscover" -const SubServiceDiscover = DynamicDiscoveryClientName+".RPC_SubServiceDiscover" -const AddSubServiceDiscover = DynamicDiscoveryMasterName+".RPC_AddSubServiceDiscover" +const RegServiceDiscover = DynamicDiscoveryMasterName + ".RPC_RegServiceDiscover" +const SubServiceDiscover = DynamicDiscoveryClientName + ".RPC_SubServiceDiscover" +const AddSubServiceDiscover = DynamicDiscoveryMasterName + ".RPC_AddSubServiceDiscover" type DynamicDiscoveryMaster struct { service.Service mapNodeInfo map[int32]struct{} - nodeInfo []*rpc.NodeInfo + nodeInfo []*rpc.NodeInfo } type DynamicDiscoveryClient struct { @@ -28,48 +25,49 @@ type DynamicDiscoveryClient struct { funDelService FunDelNode funSetService FunSetNodeInfo - localNodeId int -} + localNodeId int + mapDiscovery map[int32]map[int32]struct{} //map[masterNodeId]map[nodeId]struct{} +} var masterService DynamicDiscoveryMaster var clientService DynamicDiscoveryClient -func getDynamicDiscovery() IServiceDiscovery{ +func getDynamicDiscovery() IServiceDiscovery { return &clientService } -func init(){ +func init() { masterService.SetName(DynamicDiscoveryMasterName) clientService.SetName(DynamicDiscoveryClientName) } -func (ds *DynamicDiscoveryMaster) addNodeInfo(nodeInfo *rpc.NodeInfo){ - if len(nodeInfo.PublicServiceList)==0 { +func (ds *DynamicDiscoveryMaster) isRegNode(nodeId int32) bool { + _, ok := ds.mapNodeInfo[nodeId] + return ok +} + +func (ds *DynamicDiscoveryMaster) addNodeInfo(nodeInfo *rpc.NodeInfo) { + if len(nodeInfo.PublicServiceList) == 0 { return } - _,ok := ds.mapNodeInfo[nodeInfo.NodeId] + _, ok := ds.mapNodeInfo[nodeInfo.NodeId] if ok == true { return } ds.mapNodeInfo[nodeInfo.NodeId] = struct{}{} - ds.nodeInfo = append(ds.nodeInfo,nodeInfo) + ds.nodeInfo = append(ds.nodeInfo, nodeInfo) } -func (ds *DynamicDiscoveryMaster) RPC_AddSubServiceDiscover(nodeInfo *rpc.NodeInfo,ret *rpc.Empty) error{ - ds.addNodeInfo(nodeInfo) - return nil -} - -func (ds *DynamicDiscoveryMaster) OnInit() error{ - ds.mapNodeInfo = make(map[int32] struct{},20) +func (ds *DynamicDiscoveryMaster) OnInit() error { + ds.mapNodeInfo = make(map[int32]struct{}, 20) ds.RegRpcListener(ds) return nil } -func (ds *DynamicDiscoveryMaster) OnStart(){ +func (ds *DynamicDiscoveryMaster) OnStart() { var nodeInfo rpc.NodeInfo localNodeInfo := cluster.GetLocalNodeInfo() if localNodeInfo.Private == true { @@ -84,26 +82,44 @@ func (ds *DynamicDiscoveryMaster) OnStart(){ ds.addNodeInfo(&nodeInfo) } -func (ds *DynamicDiscoveryMaster) OnNodeConnected(nodeId int){ +func (ds *DynamicDiscoveryMaster) OnNodeConnected(nodeId int) { + //没注册过结点不通知 + if ds.isRegNode(int32(nodeId)) == false { + return + } + //向它发布所有服务列表信息 var notifyDiscover rpc.SubscribeDiscoverNotify notifyDiscover.IsFull = true notifyDiscover.NodeInfo = ds.nodeInfo notifyDiscover.MasterNodeId = int32(cluster.GetLocalNodeInfo().NodeId) - ds.GoNode(nodeId,SubServiceDiscover,¬ifyDiscover) + + ds.GoNode(nodeId, SubServiceDiscover, ¬ifyDiscover) } -func (ds *DynamicDiscoveryMaster) OnNodeDisconnect(nodeId int){ +func (ds *DynamicDiscoveryMaster) OnNodeDisconnect(nodeId int) { + if ds.isRegNode(int32(nodeId)) == false { + return + } + var notifyDiscover rpc.SubscribeDiscoverNotify notifyDiscover.MasterNodeId = int32(cluster.GetLocalNodeInfo().NodeId) notifyDiscover.DelNodeId = int32(nodeId) //删除结点 - cluster.DelNode(nodeId,true) - ds.CastGo(SubServiceDiscover,¬ifyDiscover) + cluster.DelNode(nodeId, true) + + //无注册过的结点不广播,避免非当前Master网络中的连接断开时通知到本网络 + ds.CastGo(SubServiceDiscover, ¬ifyDiscover) +} + +func (ds *DynamicDiscoveryMaster) RpcCastGo(serviceMethod string, args interface{}) { + for nodeId, _ := range ds.mapNodeInfo { + ds.GoNode(int(nodeId), serviceMethod, args) + } } // 收到注册过来的结点 -func (ds *DynamicDiscoveryMaster) RPC_RegServiceDiscover(req *rpc.ServiceDiscoverReq, res *rpc.Empty) error{ +func (ds *DynamicDiscoveryMaster) RPC_RegServiceDiscover(req *rpc.ServiceDiscoverReq, res *rpc.Empty) error { if req.NodeInfo == nil { err := fmt.Errorf("RPC_RegServiceDiscover req is error.") log.Error(err.Error()) @@ -114,8 +130,8 @@ func (ds *DynamicDiscoveryMaster) RPC_RegServiceDiscover(req *rpc.ServiceDiscove //广播给其他所有结点 var notifyDiscover rpc.SubscribeDiscoverNotify notifyDiscover.MasterNodeId = int32(cluster.GetLocalNodeInfo().NodeId) - notifyDiscover.NodeInfo = append(notifyDiscover.NodeInfo,req.NodeInfo) - ds.CastGo(SubServiceDiscover,¬ifyDiscover) + notifyDiscover.NodeInfo = append(notifyDiscover.NodeInfo, req.NodeInfo) + ds.RpcCastGo(SubServiceDiscover, ¬ifyDiscover) //存入本地 ds.addNodeInfo(req.NodeInfo) @@ -130,28 +146,56 @@ func (ds *DynamicDiscoveryMaster) RPC_RegServiceDiscover(req *rpc.ServiceDiscove nodeInfo.ListenAddr = req.NodeInfo.ListenAddr //主动删除已经存在的结点,确保先断开,再连接 - cluster.serviceDiscoveryDelNode(nodeInfo.NodeId,true) + cluster.serviceDiscoveryDelNode(nodeInfo.NodeId, true) //加入到本地Cluster模块中,将连接该结点 - //如果本结点不为master结点,而且没有可使用的服务,不加入 cluster.serviceDiscoverySetNodeInfo(&nodeInfo) return nil } -func (dc *DynamicDiscoveryClient) OnInit() error{ +func (dc *DynamicDiscoveryClient) OnInit() error { dc.RegRpcListener(dc) + dc.mapDiscovery = map[int32]map[int32]struct{}{} return nil } -func (dc *DynamicDiscoveryClient) OnStart(){ +func (dc *DynamicDiscoveryClient) addMasterNode(masterNodeId int32, nodeId int32) { + _, ok := dc.mapDiscovery[masterNodeId] + if ok == false { + dc.mapDiscovery[masterNodeId] = map[int32]struct{}{} + } + dc.mapDiscovery[masterNodeId][nodeId] = struct{}{} +} + +func (dc *DynamicDiscoveryClient) removeMasterNode(masterNodeId int32, nodeId int32) { + mapNodeId, ok := dc.mapDiscovery[masterNodeId] + if ok == false { + return + } + + delete(mapNodeId, nodeId) +} + +func (dc *DynamicDiscoveryClient) findNodeId(nodeId int32) bool { + for _, mapNodeId := range dc.mapDiscovery { + _, ok := mapNodeId[nodeId] + if ok == true { + return true + } + } + + return false +} + +func (dc *DynamicDiscoveryClient) OnStart() { //2.添加并连接发现主结点 dc.addDiscoveryMaster() } -func (dc *DynamicDiscoveryClient) addDiscoveryMaster(){ +func (dc *DynamicDiscoveryClient) addDiscoveryMaster() { discoveryNodeList := cluster.GetDiscoveryNodeList() - for i:=0;i 0 { - willDelNodeId = append(willDelNodeId, int(req.DelNodeId)) - } - - //删除不必要的结点 - for _, nodeId := range willDelNodeId { - dc.funDelService(nodeId, false) - } - } - for _, nodeInfo := range req.NodeInfo { //不对本地结点或者不存在任何公开服务的结点 if int(nodeInfo.NodeId) == dc.localNodeId { continue } + if cluster.IsMasterDiscoveryNode() == false && len(nodeInfo.PublicServiceList) == 1 && + nodeInfo.PublicServiceList[0] == DynamicDiscoveryClientName { + continue + } + //遍历所有的公开服务,并筛选之 for _, serviceName := range nodeInfo.PublicServiceList { //只有存在配置时才做筛选 - if len(mapMasterDiscoveryService)>0 { + if len(mapMasterDiscoveryService) > 0 { if _, ok := mapMasterDiscoveryService[serviceName]; ok == false { continue } @@ -228,19 +271,40 @@ func (dc *DynamicDiscoveryClient) RPC_SubServiceDiscover(req *rpc.SubscribeDisco } } + //如果为完整同步,则找出差异的结点 + var willDelNodeId []int32 + //如果不是邻居结点,则做筛选 + if req.IsFull == true { + diffNode := dc.fullCompareDiffNode(req.MasterNodeId, mapNodeInfo) + if len(diffNode) > 0 { + willDelNodeId = append(willDelNodeId, diffNode...) + } + } + //指定删除结点 + if req.DelNodeId > 0 && req.DelNodeId != int32(dc.localNodeId) { + willDelNodeId = append(willDelNodeId, req.DelNodeId) + } + + //删除不必要的结点 + for _, nodeId := range willDelNodeId { + dc.removeMasterNode(req.MasterNodeId, int32(nodeId)) + if dc.findNodeId(nodeId) == false { + dc.funDelService(int(nodeId), false) + } + } //设置新结点 for _, nodeInfo := range mapNodeInfo { + dc.addMasterNode(req.MasterNodeId, nodeInfo.NodeId) dc.setNodeInfo(nodeInfo) } - return nil } -func (dc *DynamicDiscoveryClient) isDiscoverNode(nodeId int) bool{ - for i:=0;i< len(cluster.masterDiscoveryNodeList);i++{ +func (dc *DynamicDiscoveryClient) isDiscoverNode(nodeId int) bool { + for i := 0; i < len(cluster.masterDiscoveryNodeList); i++ { if cluster.masterDiscoveryNodeList[i].NodeId == nodeId { return true } @@ -260,11 +324,12 @@ func (dc *DynamicDiscoveryClient) OnNodeConnected(nodeId int) { req.NodeInfo.NodeId = int32(cluster.localNodeInfo.NodeId) req.NodeInfo.NodeName = cluster.localNodeInfo.NodeName req.NodeInfo.ListenAddr = cluster.localNodeInfo.ListenAddr - //DiscoveryNode配置中没有配置NeighborService,则同步当前结点所有服务 - if len(nodeInfo.NeighborService)==0{ + + //MasterDiscoveryNode配置中没有配置NeighborService,则同步当前结点所有服务 + if len(nodeInfo.NeighborService) == 0 { req.NodeInfo.PublicServiceList = cluster.localNodeInfo.PublicServiceList - }else{ - req.NodeInfo.PublicServiceList = append(req.NodeInfo.PublicServiceList,DynamicDiscoveryClientName) + } else { + req.NodeInfo.PublicServiceList = append(req.NodeInfo.PublicServiceList, DynamicDiscoveryClientName) } //向Master服务同步本Node服务信息 @@ -279,34 +344,29 @@ func (dc *DynamicDiscoveryClient) OnNodeConnected(nodeId int) { } } -func (dc *DynamicDiscoveryClient) setNodeInfo(nodeInfo *rpc.NodeInfo){ - if nodeInfo==nil || nodeInfo.Private == true || int(nodeInfo.NodeId) == dc.localNodeId{ +func (dc *DynamicDiscoveryClient) setNodeInfo(nodeInfo *rpc.NodeInfo) { + if nodeInfo == nil || nodeInfo.Private == true || int(nodeInfo.NodeId) == dc.localNodeId { return } //筛选关注的服务 localNodeInfo := cluster.GetLocalNodeInfo() - if len(localNodeInfo.DiscoveryService) >0 { - var discoverServiceSlice = make([]string,0,24) - for _,pubService := range nodeInfo.PublicServiceList { + if len(localNodeInfo.DiscoveryService) > 0 { + var discoverServiceSlice = make([]string, 0, 24) + for _, pubService := range nodeInfo.PublicServiceList { for _, discoverService := range localNodeInfo.DiscoveryService { if pubService == discoverService { - discoverServiceSlice = append(discoverServiceSlice,pubService) + discoverServiceSlice = append(discoverServiceSlice, pubService) } } } nodeInfo.PublicServiceList = discoverServiceSlice } - if len(nodeInfo.PublicServiceList)==0{ + if len(nodeInfo.PublicServiceList) == 0 { return } - if cluster.IsMasterDiscoveryNode() { - var ret rpc.Empty - dc.Call(AddSubServiceDiscover,nodeInfo,&ret) - } - var nInfo NodeInfo nInfo.ServiceList = nodeInfo.PublicServiceList nInfo.PublicServiceList = nodeInfo.PublicServiceList @@ -316,13 +376,12 @@ func (dc *DynamicDiscoveryClient) setNodeInfo(nodeInfo *rpc.NodeInfo){ dc.funSetService(&nInfo) } - -func (dc *DynamicDiscoveryClient) OnNodeDisconnect(nodeId int){ +func (dc *DynamicDiscoveryClient) OnNodeDisconnect(nodeId int) { //将Discard结点清理 cluster.DiscardNode(nodeId) } -func (dc *DynamicDiscoveryClient) InitDiscovery(localNodeId int,funDelNode FunDelNode,funSetNodeInfo FunSetNodeInfo) error{ +func (dc *DynamicDiscoveryClient) InitDiscovery(localNodeId int, funDelNode FunDelNode, funSetNodeInfo FunSetNodeInfo) error { dc.localNodeId = localNodeId dc.funDelService = funDelNode dc.funSetService = funSetNodeInfo @@ -330,7 +389,5 @@ func (dc *DynamicDiscoveryClient) InitDiscovery(localNodeId int,funDelNode FunDe return nil } -func (dc *DynamicDiscoveryClient) OnNodeStop(){ - +func (dc *DynamicDiscoveryClient) OnNodeStop() { } -