diff --git a/cluster/cluster.go b/cluster/cluster.go index e502961..572d099 100644 --- a/cluster/cluster.go +++ b/cluster/cluster.go @@ -150,6 +150,7 @@ func (cls *Cluster) delServiceNode(serviceName string,nodeId int){ } } + func (cls *Cluster) serviceDiscoverySetNodeInfo (nodeInfo *NodeInfo){ //本地结点不加入 if nodeInfo.NodeId == cls.localNodeInfo.NodeId { @@ -272,6 +273,10 @@ func (cls *Cluster) GetMasterDiscoveryNodeInfo(nodeId int) *NodeInfo{ return nil } +func (cls *Cluster) IsMasterDiscoveryNode() bool{ + return cls.GetMasterDiscoveryNodeInfo(cls.GetLocalNodeInfo().NodeId)!=nil +} + func (cls *Cluster) SetupServiceDiscovery(localNodeId int,setupServiceFun SetupServiceFun) { if cls.serviceDiscovery!=nil { return diff --git a/cluster/dynamicdiscovery.go b/cluster/dynamicdiscovery.go index 87b92bf..b321660 100644 --- a/cluster/dynamicdiscovery.go +++ b/cluster/dynamicdiscovery.go @@ -12,8 +12,10 @@ const maxTryCount = 30 //最大重试次数 const perTrySecond = 2*time.Second //每次重试间隔2秒 const DynamicDiscoveryMasterName = "DiscoveryMaster" const DynamicDiscoveryClientName = "DiscoveryClient" -const DynamicDiscoveryMasterNameRpcMethod = DynamicDiscoveryMasterName+".RPC_RegServiceDiscover" -const DynamicDiscoveryClientNameRpcMethod = DynamicDiscoveryClientName+".RPC_SubServiceDiscover" +const RegServiceDiscover = DynamicDiscoveryMasterName+".RPC_RegServiceDiscover" +const SubServiceDiscover = DynamicDiscoveryClientName+".RPC_SubServiceDiscover" +const AddSubServiceDiscover = DynamicDiscoveryMasterName+".RPC_AddSubServiceDiscover" + type DynamicDiscoveryMaster struct { service.Service @@ -43,6 +45,10 @@ func init(){ } func (ds *DynamicDiscoveryMaster) addNodeInfo(nodeInfo *rpc.NodeInfo){ + if len(nodeInfo.PublicServiceList)==0 { + return + } + _,ok := ds.mapNodeInfo[nodeInfo.NodeId] if ok == true { return @@ -51,6 +57,11 @@ func (ds *DynamicDiscoveryMaster) addNodeInfo(nodeInfo *rpc.NodeInfo){ ds.nodeInfo = append(ds.nodeInfo,nodeInfo) } +func (ds *DynamicDiscoveryMaster) RPC_AddSubServiceDiscover(nodeInfo *rpc.NodeInfo,ret *rpc.Empty) error{ + ds.addNodeInfo(nodeInfo) + return nil +} + func (ds *DynamicDiscoveryMaster) OnInit() error{ ds.mapNodeInfo = make(map[int32] struct{},20) ds.RegRpcListener(ds) @@ -79,7 +90,7 @@ func (ds *DynamicDiscoveryMaster) OnNodeConnected(nodeId int){ notifyDiscover.IsFull = true notifyDiscover.NodeInfo = ds.nodeInfo notifyDiscover.MasterNodeId = int32(cluster.GetLocalNodeInfo().NodeId) - ds.GoNode(nodeId,DynamicDiscoveryClientNameRpcMethod,¬ifyDiscover) + ds.GoNode(nodeId,SubServiceDiscover,¬ifyDiscover) } func (ds *DynamicDiscoveryMaster) OnNodeDisconnect(nodeId int){ @@ -88,7 +99,7 @@ func (ds *DynamicDiscoveryMaster) OnNodeDisconnect(nodeId int){ notifyDiscover.DelNodeId = int32(nodeId) //删除结点 cluster.DelNode(nodeId,true) - ds.CastGo(DynamicDiscoveryClientNameRpcMethod,¬ifyDiscover) + ds.CastGo(SubServiceDiscover,¬ifyDiscover) } // 收到注册过来的结点 @@ -104,7 +115,7 @@ func (ds *DynamicDiscoveryMaster) RPC_RegServiceDiscover(req *rpc.ServiceDiscove var notifyDiscover rpc.SubscribeDiscoverNotify notifyDiscover.MasterNodeId = int32(cluster.GetLocalNodeInfo().NodeId) notifyDiscover.NodeInfo = append(notifyDiscover.NodeInfo,req.NodeInfo) - ds.CastGo(DynamicDiscoveryClientNameRpcMethod,¬ifyDiscover) + ds.CastGo(SubServiceDiscover,¬ifyDiscover) //存入本地 ds.addNodeInfo(req.NodeInfo) @@ -145,7 +156,6 @@ func (dc *DynamicDiscoveryClient) addDiscoveryMaster(){ continue } dc.funSetService(&discoveryNodeList[i]) - //cluster.serviceDiscoverySetNodeInfo(&discoveryNodeList[i]) } } @@ -218,11 +228,14 @@ func (dc *DynamicDiscoveryClient) RPC_SubServiceDiscover(req *rpc.SubscribeDisco } } + + //设置新结点 for _, nodeInfo := range mapNodeInfo { dc.setNodeInfo(nodeInfo) } + return nil } @@ -250,17 +263,19 @@ func (dc *DynamicDiscoveryClient) OnNodeConnected(nodeId int) { //DiscoveryNode配置中没有配置NeighborService,则同步当前结点所有服务 if len(nodeInfo.NeighborService)==0{ req.NodeInfo.PublicServiceList = cluster.localNodeInfo.PublicServiceList + }else{ + req.NodeInfo.PublicServiceList = append(req.NodeInfo.PublicServiceList,DynamicDiscoveryClientName) } //向Master服务同步本Node服务信息 - err := dc.AsyncCallNode(nodeId, DynamicDiscoveryMasterNameRpcMethod, &req, func(res *rpc.Empty, err error) { + err := dc.AsyncCallNode(nodeId, RegServiceDiscover, &req, func(res *rpc.Empty, err error) { if err != nil { - log.Error("call %s is fail :%s", DynamicDiscoveryMasterNameRpcMethod, err.Error()) + log.Error("call %s is fail :%s", RegServiceDiscover, err.Error()) return } }) if err != nil { - log.Error("call %s is fail :%s", DynamicDiscoveryMasterNameRpcMethod, err.Error()) + log.Error("call %s is fail :%s", RegServiceDiscover, err.Error()) } } @@ -287,6 +302,11 @@ func (dc *DynamicDiscoveryClient) setNodeInfo(nodeInfo *rpc.NodeInfo){ return } + if cluster.IsMasterDiscoveryNode() { + var ret rpc.Empty + dc.Call(AddSubServiceDiscover,nodeInfo,&ret) + } + var nInfo NodeInfo nInfo.ServiceList = nodeInfo.PublicServiceList nInfo.PublicServiceList = nodeInfo.PublicServiceList diff --git a/rpc/rpchandler.go b/rpc/rpchandler.go index a7df1df..35786d3 100644 --- a/rpc/rpchandler.go +++ b/rpc/rpchandler.go @@ -396,11 +396,14 @@ func (handler *RpcHandler) goRpc(processor IRpcProcessor,bCast bool,nodeId int,s func (handler *RpcHandler) callRpc(nodeId int,serviceMethod string,args interface{},reply interface{}) error { var pClientList [maxClusterNode]*Client err,count := handler.funcRpcClient(nodeId,serviceMethod,pClientList[:]) - if count==0||err != nil { + if err != nil { log.Error("Call serviceMethod is error:%+v!",err) return err - } - if count > 1 { + }else if count <=0 { + err = fmt.Errorf("Call serviceMethod is error:cannot find %s",serviceMethod) + log.Error("%s",err.Error()) + return err + }else if count > 1 { log.Error("Cannot call more then 1 node!") return fmt.Errorf("Cannot call more then 1 node!") }