package cluster import ( "errors" "github.com/duanhf2012/origin/log" "github.com/duanhf2012/origin/rpc" "github.com/duanhf2012/origin/service" ) const DynamicDiscoveryMasterName = "DiscoveryMaster" const DynamicDiscoveryClientName = "DiscoveryClient" const RegServiceDiscover = DynamicDiscoveryMasterName + ".RPC_RegServiceDiscover" const SubServiceDiscover = DynamicDiscoveryClientName + ".RPC_SubServiceDiscover" const AddSubServiceDiscover = DynamicDiscoveryMasterName + ".RPC_AddSubServiceDiscover" type DynamicDiscoveryMaster struct { service.Service mapNodeInfo map[int32]struct{} nodeInfo []*rpc.NodeInfo } type DynamicDiscoveryClient struct { service.Service funDelService FunDelNode funSetService FunSetNodeInfo localNodeId int mapDiscovery map[int32]map[int32]struct{} //map[masterNodeId]map[nodeId]struct{} } var masterService DynamicDiscoveryMaster var clientService DynamicDiscoveryClient func getDynamicDiscovery() IServiceDiscovery { return &clientService } func init() { masterService.SetName(DynamicDiscoveryMasterName) clientService.SetName(DynamicDiscoveryClientName) } func (ds *DynamicDiscoveryMaster) isRegNode(nodeId int32) bool { _, ok := ds.mapNodeInfo[nodeId] return ok } func (ds *DynamicDiscoveryMaster) addNodeInfo(nodeInfo *rpc.NodeInfo) { if len(nodeInfo.PublicServiceList) == 0 { return } _, ok := ds.mapNodeInfo[nodeInfo.NodeId] if ok == true { return } ds.mapNodeInfo[nodeInfo.NodeId] = struct{}{} ds.nodeInfo = append(ds.nodeInfo, nodeInfo) } func (ds *DynamicDiscoveryMaster) removeNodeInfo(nodeId int32) { if _,ok:= ds.mapNodeInfo[nodeId];ok == false { return } for i:=0;i 0 { if _, ok := mapMasterDiscoveryService[serviceName]; ok == false { continue } } nInfo := mapNodeInfo[nodeInfo.NodeId] if nInfo == nil { nInfo = &rpc.NodeInfo{} nInfo.NodeId = nodeInfo.NodeId nInfo.NodeName = nodeInfo.NodeName nInfo.ListenAddr = nodeInfo.ListenAddr nInfo.MaxRpcParamLen = nodeInfo.MaxRpcParamLen mapNodeInfo[nodeInfo.NodeId] = nInfo } nInfo.PublicServiceList = append(nInfo.PublicServiceList, serviceName) } } //如果为完整同步,则找出差异的结点 var willDelNodeId []int32 //如果不是邻居结点,则做筛选 if req.IsFull == true { diffNode := dc.fullCompareDiffNode(req.MasterNodeId, mapNodeInfo) if len(diffNode) > 0 { willDelNodeId = append(willDelNodeId, diffNode...) } } //指定删除结点 if req.DelNodeId > 0 && req.DelNodeId != int32(dc.localNodeId) { willDelNodeId = append(willDelNodeId, req.DelNodeId) } //删除不必要的结点 for _, nodeId := range willDelNodeId { nodeInfo,_ := cluster.GetNodeInfo(int(nodeId)) cluster.TriggerDiscoveryEvent(false,int(nodeId),nodeInfo.PublicServiceList) dc.removeMasterNode(req.MasterNodeId, int32(nodeId)) if dc.findNodeId(nodeId) == false { dc.funDelService(int(nodeId), false) } } //设置新结点 for _, nodeInfo := range mapNodeInfo { dc.addMasterNode(req.MasterNodeId, nodeInfo.NodeId) dc.setNodeInfo(nodeInfo) if len(nodeInfo.PublicServiceList) == 0 { continue } cluster.TriggerDiscoveryEvent(true,int(nodeInfo.NodeId),nodeInfo.PublicServiceList) } return nil } func (dc *DynamicDiscoveryClient) isDiscoverNode(nodeId int) bool { for i := 0; i < len(cluster.masterDiscoveryNodeList); i++ { if cluster.masterDiscoveryNodeList[i].NodeId == nodeId { return true } } return false } func (dc *DynamicDiscoveryClient) OnNodeConnected(nodeId int) { nodeInfo := cluster.GetMasterDiscoveryNodeInfo(nodeId) if nodeInfo == nil { return } var req rpc.ServiceDiscoverReq req.NodeInfo = &rpc.NodeInfo{} req.NodeInfo.NodeId = int32(cluster.localNodeInfo.NodeId) req.NodeInfo.NodeName = cluster.localNodeInfo.NodeName req.NodeInfo.ListenAddr = cluster.localNodeInfo.ListenAddr req.NodeInfo.MaxRpcParamLen = cluster.localNodeInfo.MaxRpcParamLen //MasterDiscoveryNode配置中没有配置NeighborService,则同步当前结点所有服务 if len(nodeInfo.NeighborService) == 0 { req.NodeInfo.PublicServiceList = cluster.localNodeInfo.PublicServiceList } else { req.NodeInfo.PublicServiceList = append(req.NodeInfo.PublicServiceList, DynamicDiscoveryClientName) } //向Master服务同步本Node服务信息 err := dc.AsyncCallNode(nodeId, RegServiceDiscover, &req, func(res *rpc.Empty, err error) { if err != nil { log.SError("call ", RegServiceDiscover, " is fail :", err.Error()) return } }) if err != nil { log.SError("call ", RegServiceDiscover, " is fail :", err.Error()) } } func (dc *DynamicDiscoveryClient) setNodeInfo(nodeInfo *rpc.NodeInfo) { if nodeInfo == nil || nodeInfo.Private == true || int(nodeInfo.NodeId) == dc.localNodeId { return } //筛选关注的服务 localNodeInfo := cluster.GetLocalNodeInfo() if len(localNodeInfo.DiscoveryService) > 0 { var discoverServiceSlice = make([]string, 0, 24) for _, pubService := range nodeInfo.PublicServiceList { for _, discoverService := range localNodeInfo.DiscoveryService { if pubService == discoverService { discoverServiceSlice = append(discoverServiceSlice, pubService) } } } nodeInfo.PublicServiceList = discoverServiceSlice } if len(nodeInfo.PublicServiceList) == 0 { return } var nInfo NodeInfo nInfo.ServiceList = nodeInfo.PublicServiceList nInfo.PublicServiceList = nodeInfo.PublicServiceList nInfo.NodeId = int(nodeInfo.NodeId) nInfo.NodeName = nodeInfo.NodeName nInfo.ListenAddr = nodeInfo.ListenAddr nInfo.MaxRpcParamLen = nodeInfo.MaxRpcParamLen dc.funSetService(&nInfo) } func (dc *DynamicDiscoveryClient) OnNodeDisconnect(nodeId int) { //将Discard结点清理 cluster.DiscardNode(nodeId) } func (dc *DynamicDiscoveryClient) InitDiscovery(localNodeId int, funDelNode FunDelNode, funSetNodeInfo FunSetNodeInfo) error { dc.localNodeId = localNodeId dc.funDelService = funDelNode dc.funSetService = funSetNodeInfo return nil } func (dc *DynamicDiscoveryClient) OnNodeStop() { }