package cluster import ( "errors" "github.com/duanhf2012/origin/log" "github.com/duanhf2012/origin/rpc" "github.com/duanhf2012/origin/service" "time" "github.com/duanhf2012/origin/util/timer" ) const DynamicDiscoveryMasterName = "DiscoveryMaster" const DynamicDiscoveryClientName = "DiscoveryClient" const RegServiceDiscover = DynamicDiscoveryMasterName + ".RPC_RegServiceDiscover" const SubServiceDiscover = DynamicDiscoveryClientName + ".RPC_SubServiceDiscover" const AddSubServiceDiscover = DynamicDiscoveryMasterName + ".RPC_AddSubServiceDiscover" type DynamicDiscoveryMaster struct { service.Service mapNodeInfo map[int32]struct{} nodeInfo []*rpc.NodeInfo } type DynamicDiscoveryClient struct { service.Service funDelService FunDelNode funSetService FunSetNodeInfo localNodeId int mapDiscovery map[int32]map[int32]struct{} //map[masterNodeId]map[nodeId]struct{} } var masterService DynamicDiscoveryMaster var clientService DynamicDiscoveryClient func getDynamicDiscovery() IServiceDiscovery { return &clientService } func init() { masterService.SetName(DynamicDiscoveryMasterName) clientService.SetName(DynamicDiscoveryClientName) } func (ds *DynamicDiscoveryMaster) isRegNode(nodeId int32) bool { _, ok := ds.mapNodeInfo[nodeId] return ok } func (ds *DynamicDiscoveryMaster) addNodeInfo(nodeInfo *rpc.NodeInfo) { if len(nodeInfo.PublicServiceList) == 0 { return } _, ok := ds.mapNodeInfo[nodeInfo.NodeId] if ok == true { return } ds.mapNodeInfo[nodeInfo.NodeId] = struct{}{} ds.nodeInfo = append(ds.nodeInfo, nodeInfo) } func (ds *DynamicDiscoveryMaster) removeNodeInfo(nodeId int32) { if _,ok:= ds.mapNodeInfo[nodeId];ok == false { return } for i:=0;i 0 { willDelNodeId = append(willDelNodeId, diffNode...) } } //指定删除结点 if req.DelNodeId > 0 && req.DelNodeId != int32(dc.localNodeId) { willDelNodeId = append(willDelNodeId, req.DelNodeId) } //删除不必要的结点 for _, nodeId := range willDelNodeId { nodeInfo,_ := cluster.GetNodeInfo(int(nodeId)) cluster.TriggerDiscoveryEvent(false,int(nodeId),nodeInfo.PublicServiceList) dc.removeMasterNode(req.MasterNodeId, int32(nodeId)) if dc.findNodeId(nodeId) == false { dc.funDelService(int(nodeId), false) } } //设置新结点 for _, nodeInfo := range mapNodeInfo { bSet := dc.setNodeInfo(req.MasterNodeId,nodeInfo) if bSet == false { continue } cluster.TriggerDiscoveryEvent(true,int(nodeInfo.NodeId),nodeInfo.PublicServiceList) } return nil } func (dc *DynamicDiscoveryClient) isDiscoverNode(nodeId int) bool { for i := 0; i < len(cluster.masterDiscoveryNodeList); i++ { if cluster.masterDiscoveryNodeList[i].NodeId == nodeId { return true } } return false } func (dc *DynamicDiscoveryClient) OnNodeConnected(nodeId int) { dc.regServiceDiscover(nodeId) } func (dc *DynamicDiscoveryClient) regServiceDiscover(nodeId int){ nodeInfo := cluster.GetMasterDiscoveryNodeInfo(nodeId) if nodeInfo == nil { return } var req rpc.ServiceDiscoverReq req.NodeInfo = &rpc.NodeInfo{} req.NodeInfo.NodeId = int32(cluster.localNodeInfo.NodeId) req.NodeInfo.NodeName = cluster.localNodeInfo.NodeName req.NodeInfo.ListenAddr = cluster.localNodeInfo.ListenAddr req.NodeInfo.MaxRpcParamLen = cluster.localNodeInfo.MaxRpcParamLen req.NodeInfo.PublicServiceList = cluster.localNodeInfo.PublicServiceList //向Master服务同步本Node服务信息 err := dc.AsyncCallNode(nodeId, RegServiceDiscover, &req, func(res *rpc.Empty, err error) { if err != nil { log.Error("call "+RegServiceDiscover+" is fail :"+ err.Error()) dc.AfterFunc(time.Second*3, func(timer *timer.Timer) { dc.regServiceDiscover(nodeId) }) return } }) if err != nil { log.Error("call "+ RegServiceDiscover+" is fail :"+ err.Error()) } } func (dc *DynamicDiscoveryClient) canDiscoveryService(fromMasterNodeId int32,serviceName string) bool{ canDiscovery := true for i:=0;i