From a60ad1cccf29ddf6e0ca8773d72d5ad5c575f335 Mon Sep 17 00:00:00 2001 From: boyce Date: Thu, 29 Apr 2021 17:18:13 +0800 Subject: [PATCH] =?UTF-8?q?1.=E6=94=AF=E6=8C=81=E5=8A=A8=E6=80=81=E6=9C=8D?= =?UTF-8?q?=E5=8A=A1=E5=8F=91=E7=8E=B0=E5=8A=9F=E8=83=BD=202.Service?= =?UTF-8?q?=E4=B8=AD=E6=94=AF=E6=8C=81=E5=AF=B9RPC=E7=BB=93=E7=82=B9?= =?UTF-8?q?=E8=BF=9E=E6=8E=A5=E6=88=96=E6=96=AD=E5=BC=80=E4=BA=8B=E4=BB=B6?= =?UTF-8?q?=E7=9B=91=E5=90=AC?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- cluster/cluster.go | 253 +++++--- cluster/configdiscovery.go | 28 +- cluster/dynamicdiscovery.go | 269 ++++++++ cluster/parsecfg.go | 74 ++- cluster/servicediscovery.go | 4 +- event/eventtype.go | 1 + network/tcp_client.go | 2 +- node/node.go | 2 +- rpc/client.go | 19 +- rpc/dynamicdiscover.pb.go | 1181 +++++++++++++++++++++++++++++++++++ rpc/dynamicdiscover.proto | 29 + rpc/gogorpc.pb.go | 43 +- rpc/rpchandler.go | 5 +- service/module.go | 4 +- service/service.go | 38 +- service/servicemgr.go | 3 + 16 files changed, 1797 insertions(+), 158 deletions(-) create mode 100644 cluster/dynamicdiscovery.go create mode 100644 rpc/dynamicdiscover.pb.go create mode 100644 rpc/dynamicdiscover.proto diff --git a/cluster/cluster.go b/cluster/cluster.go index d300785..58a4508 100644 --- a/cluster/cluster.go +++ b/cluster/cluster.go @@ -11,13 +11,16 @@ import ( var configDir = "./config/" +type SetupServiceFun func(s ...service.IService) type NodeInfo struct { NodeId int NodeName string Private bool ListenAddr string - ServiceList []string + ServiceList []string //所有的服务列表 + PublicServiceList []string //对外公开的服务列表 + DiscoveryService []string //筛选发现的服务,如果不配置,不进行筛选 } type NodeRpcInfo struct { @@ -27,16 +30,24 @@ type NodeRpcInfo struct { var cluster Cluster type Cluster struct { - localNodeInfo NodeInfo - localServiceCfg map[string]interface{} //map[serviceName]配置数据* - mapRpc map[int] NodeRpcInfo //nodeId - serviceDiscovery IServiceDiscovery //服务发现接口 - mapIdNode map[int]NodeInfo //map[NodeId]NodeInfo - mapServiceNode map[string][]int //map[serviceName]NodeInfo - locker sync.RWMutex - rpcServer rpc.Server + localNodeInfo NodeInfo //本结点配置信息 + discoveryNodeList []NodeInfo //配置发现Master结点 - rpcListerList []rpc.IRpcListener + localServiceCfg map[string]interface{} //map[serviceName]配置数据* + mapRpc map[int] NodeRpcInfo //nodeId + serviceDiscovery IServiceDiscovery //服务发现接口 + + locker sync.RWMutex //结点与服务关系保护锁 + mapIdNode map[int]NodeInfo //map[NodeId]NodeInfo + mapServiceNode map[string]map[int]struct{} //map[serviceName]map[NodeId] + + rpcServer rpc.Server + rpcEventLocker sync.RWMutex //Rpc事件监听保护锁 + mapServiceListenRpcEvent map[string]struct{} //ServiceName +} + +func GetCluster() *Cluster{ + return &cluster } func SetConfigDir(cfgDir string){ @@ -47,23 +58,19 @@ func SetServiceDiscovery(serviceDiscovery IServiceDiscovery) { cluster.serviceDiscovery = serviceDiscovery } -func (cls *Cluster) serviceDiscoveryDelNode (nodeId int){ - cls.locker.Lock() - defer cls.locker.Unlock() - - cls.delNode(nodeId) +func (cls *Cluster) Start() { + cls.rpcServer.Start(cls.localNodeInfo.ListenAddr) } -func (cls *Cluster) delNode(nodeId int){ - //删除rpc连接关系 - rpc,ok := cls.mapRpc[nodeId] - if ok == true { - delete(cls.mapRpc,nodeId) - rpc.client.Close(false) - } +func (cls *Cluster) Stop() { + cls.serviceDiscovery.OnNodeStop() +} +func (cls *Cluster) DelNode(nodeId int){ + cls.locker.Lock() nodeInfo,ok := cls.mapIdNode[nodeId] if ok == false { + cls.locker.Unlock() return } @@ -71,43 +78,73 @@ func (cls *Cluster) delNode(nodeId int){ cls.delServiceNode(serviceName,nodeId) } + rpc,ok := cls.mapRpc[nodeId] delete(cls.mapIdNode,nodeId) + delete(cls.mapRpc,nodeId) + + cls.locker.Unlock() + if ok == true { + rpc.client.Close(false) + } +} + +func (cls *Cluster) serviceDiscoveryDelNode (nodeId int){ + if nodeId == 0 { + return + } + + cls.DelNode(nodeId) } func (cls *Cluster) delServiceNode(serviceName string,nodeId int){ - nodeList := cls.mapServiceNode[serviceName] - for idx,nId := range nodeList { - if nId == nodeId { - cls.mapServiceNode[serviceName] = append(nodeList[:idx],nodeList[idx+1:]...) - return - } + if nodeId == cls.localNodeInfo.NodeId{ + return + } + + mapNode := cls.mapServiceNode[serviceName] + delete(mapNode,nodeId) + if len(mapNode)==0 { + delete(cls.mapServiceNode,serviceName) } } func (cls *Cluster) serviceDiscoverySetNodeInfo (nodeInfo *NodeInfo){ - if nodeInfo.NodeId == cls.localNodeInfo.NodeId || len(nodeInfo.ServiceList)==0 || nodeInfo.Private == true { + //本地结点不加入 + if nodeInfo.NodeId == cls.localNodeInfo.NodeId { return } cls.locker.Lock() defer cls.locker.Unlock() - //先清理删除 - cls.delNode(nodeInfo.NodeId) + //先清一次的NodeId对应的所有服务清理 + lastNodeInfo,ok := cls.mapIdNode[nodeInfo.NodeId] + if ok == true{ + for _,serviceName := range lastNodeInfo.ServiceList{ + cls.delServiceNode(serviceName,nodeInfo.NodeId) + } + } //再重新组装 mapDuplicate := map[string]interface{}{} //预防重复数据 - for _,serviceName := range nodeInfo.ServiceList { + for _,serviceName := range nodeInfo.PublicServiceList { if _,ok := mapDuplicate[serviceName];ok == true { //存在重复 log.Error("Bad duplicate Service Cfg.") continue } mapDuplicate[serviceName] = nil - cls.mapServiceNode[serviceName] = append(cls.mapServiceNode[serviceName],nodeInfo.NodeId) + if _,ok:=cls.mapServiceNode[serviceName];ok==false { + cls.mapServiceNode[serviceName] = make(map[int]struct{},1) + } + cls.mapServiceNode[serviceName][nodeInfo.NodeId] = struct{}{} } - cls.mapIdNode[nodeInfo.NodeId] = *nodeInfo + + //已经存在连接,则不需要进行设置 + if _,rpcInfoOK := cls.mapRpc[nodeInfo.NodeId];rpcInfoOK == true { + return + } rpcInfo := NodeRpcInfo{} rpcInfo.nodeInfo = *nodeInfo rpcInfo.client = &rpc.Client{} @@ -125,29 +162,21 @@ func (cls *Cluster) buildLocalRpc(){ cls.mapRpc[cls.localNodeInfo.NodeId] = rpcInfo } -func (cls *Cluster) Init(localNodeId int) error{ - cls.locker.Lock() - - //1.处理服务发现接口 - if cls.serviceDiscovery == nil { - cls.serviceDiscovery = &ConfigDiscovery{} - } - - //2.初始化配置 +func (cls *Cluster) Init(localNodeId int,setupServiceFun SetupServiceFun) error{ + //1.初始化配置 err := cls.InitCfg(localNodeId) if err != nil { - cls.locker.Unlock() return err } cls.rpcServer.Init(cls) cls.buildLocalRpc() - cls.serviceDiscovery.RegFunDelNode(cls.serviceDiscoveryDelNode) - cls.serviceDiscovery.RegFunSetNode(cls.serviceDiscoverySetNodeInfo) - cls.locker.Unlock() + //2.安装服务发现结点 + cls.SetupServiceDiscovery(localNodeId,setupServiceFun) + service.RegRpcEventFun = cls.RegRpcEvent - err = cls.serviceDiscovery.Init(localNodeId) + err = cls.serviceDiscovery.InitDiscovery(localNodeId,cls.serviceDiscoveryDelNode,cls.serviceDiscoverySetNodeInfo) if err != nil { return err } @@ -155,6 +184,56 @@ func (cls *Cluster) Init(localNodeId int) error{ return nil } +func (cls *Cluster) checkDynamicDiscovery(localNodeId int) (bool,bool){ + var localMaster bool //本结点是否为Master结点 + var hasMaster bool //是否配置Master服务 + + //遍历所有结点 + for _,nodeInfo := range cls.discoveryNodeList{ + if nodeInfo.NodeId == localNodeId { + localMaster = true + } + hasMaster = true + } + + //返回查询结果 + return localMaster,hasMaster +} + +func (cls *Cluster) appendService(serviceName string){ + cls.localNodeInfo.ServiceList = append(cls.localNodeInfo.ServiceList,serviceName) + if _,ok:=cls.mapServiceNode[serviceName];ok==false { + cls.mapServiceNode[serviceName] = map[int]struct{}{} + } + cls.mapServiceNode[serviceName][cls.localNodeInfo.NodeId]= struct{}{} +} + +func (cls *Cluster) GetDiscoveryNodeList() []NodeInfo{ + return cls.discoveryNodeList +} + +func (cls *Cluster) SetupServiceDiscovery(localNodeId int,setupServiceFun SetupServiceFun) { + if cls.serviceDiscovery!=nil { + return + } + + //1.如果没有配置DiscoveryNode配置,则使用默认配置文件发现服务 + localMaster,hasMaster := cls.checkDynamicDiscovery(localNodeId) + if hasMaster == false { + cls.serviceDiscovery = &ConfigDiscovery{} + return + } + setupServiceFun(&masterService,&clientService) + + //2.如果为动态服务发现安装本地发现服务 + cls.serviceDiscovery = getDynamicDiscovery() + if localMaster == true { + cls.appendService(DynamicDiscoveryMasterName) + } + cls.appendService(DynamicDiscoveryClientName) + +} + func (cls *Cluster) FindRpcHandler(serviceName string) rpc.IRpcHandler { pService := service.GetService(serviceName) if pService == nil { @@ -164,21 +243,7 @@ func (cls *Cluster) FindRpcHandler(serviceName string) rpc.IRpcHandler { return pService.GetRpcHandler() } -func (cls *Cluster) Start() { - cls.rpcServer.Start(cls.localNodeInfo.ListenAddr) -} - -func (cls *Cluster) Stop() { - cls.serviceDiscovery.OnNodeStop() -} - -func GetCluster() *Cluster{ - return &cluster -} - -func (cls *Cluster) GetRpcClient(nodeId int) *rpc.Client { - cls.locker.RLock() - defer cls.locker.RUnlock() +func (cls *Cluster) getRpcClient(nodeId int) *rpc.Client { c,ok := cls.mapRpc[nodeId] if ok == false { return nil @@ -187,6 +252,12 @@ func (cls *Cluster) GetRpcClient(nodeId int) *rpc.Client { return c.client } +func (cls *Cluster) GetRpcClient(nodeId int) *rpc.Client { + cls.locker.RLock() + defer cls.locker.RUnlock() + return cls.getRpcClient(nodeId) +} + func GetRpcClient(nodeId int,serviceMethod string,clientList []*rpc.Client) (error,int) { if nodeId>0 { pClient := GetCluster().GetRpcClient(nodeId) @@ -217,21 +288,55 @@ func (cls *Cluster) IsNodeConnected (nodeId int) bool { return pClient!=nil && pClient.IsConnected() } -func (cls *Cluster) RegisterRpcListener (rpcLister rpc.IRpcListener) { - cls.rpcListerList = append(cls.rpcListerList,rpcLister) -} - -func (cls *Cluster) triggerRpcEvent (bConnect bool,nodeId int) { - for _,lister := range cls.rpcListerList { - if bConnect { - lister.OnRpcConnected(nodeId) - }else{ - lister.OnRpcDisconnect(nodeId) - } +func (cls *Cluster) triggerRpcEvent (bConnect bool,clientSeq uint32,nodeId int) { + cls.locker.Lock() + nodeInfo,ok := cls.mapRpc[nodeId] + if ok == false || nodeInfo.client==nil || nodeInfo.client.GetClientSeq()!=clientSeq { + cls.locker.Unlock() + return } + cls.locker.Unlock() + + cls.rpcEventLocker.Lock() + for serviceName,_:= range cls.mapServiceListenRpcEvent{ + ser := service.GetService(serviceName) + if ser == nil { + log.Error("cannot find service name %s",serviceName) + continue + } + + var eventData service.RpcEventData + eventData.IsConnect = bConnect + eventData.NodeId = nodeId + ser.(service.IModule).NotifyEvent(&eventData) + } + cls.rpcEventLocker.Unlock() } func (cls *Cluster) GetLocalNodeInfo() *NodeInfo { return &cls.localNodeInfo } +func (cls *Cluster) RegRpcEvent(serviceName string){ + cls.rpcEventLocker.Lock() + if cls.mapServiceListenRpcEvent == nil { + cls.mapServiceListenRpcEvent = map[string]struct{}{} + } + + cls.mapServiceListenRpcEvent[serviceName] = struct{}{} + cls.rpcEventLocker.Unlock() +} + +func (cls *Cluster) UnRegRpcEvent(serviceName string){ + cls.rpcEventLocker.Lock() + delete(cls.mapServiceListenRpcEvent,serviceName) + cls.rpcEventLocker.Unlock() +} + +func (cls *Cluster) FetchAllNodeId(fetchFun func(nodeId int)){ + cls.locker.Lock() + for nodeId,_:= range cls.mapIdNode { + fetchFun(nodeId) + } + cls.locker.Unlock() +} \ No newline at end of file diff --git a/cluster/configdiscovery.go b/cluster/configdiscovery.go index 1735f21..5d2fd96 100644 --- a/cluster/configdiscovery.go +++ b/cluster/configdiscovery.go @@ -1,29 +1,19 @@ package cluster -import "strings" - type ConfigDiscovery struct { funDelService FunDelNode funSetService FunSetNodeInfo localNodeId int } -func (discovery *ConfigDiscovery) privateService(nodeInfo *NodeInfo){ - var serviceList []string - for _,s := range nodeInfo.ServiceList { - if strings.HasPrefix(s,"_") { - continue - } - serviceList = append(serviceList,s) - } - nodeInfo.ServiceList = serviceList -} -func (discovery *ConfigDiscovery) Init(localNodeId int) error{ +func (discovery *ConfigDiscovery) InitDiscovery(localNodeId int,funDelNode FunDelNode,funSetNodeInfo FunSetNodeInfo) error{ discovery.localNodeId = localNodeId + discovery.funDelService = funDelNode + discovery.funSetService = funSetNodeInfo //解析本地其他服务配置 - nodeInfoList,err := GetCluster().readLocalClusterConfig(0) + _,nodeInfoList,err := GetCluster().readLocalClusterConfig(0) if err != nil { return err } @@ -32,8 +22,7 @@ func (discovery *ConfigDiscovery) Init(localNodeId int) error{ if nodeInfo.NodeId == localNodeId { continue } - //去除私有服务 - discovery.privateService(&nodeInfo) + discovery.funSetService(&nodeInfo) } @@ -43,10 +32,3 @@ func (discovery *ConfigDiscovery) Init(localNodeId int) error{ func (discovery *ConfigDiscovery) OnNodeStop(){ } -func (discovery *ConfigDiscovery) RegFunDelNode(funDelNode FunDelNode){ - discovery.funDelService = funDelNode -} - -func (discovery *ConfigDiscovery) RegFunSetNode(funSetNodeInfo FunSetNodeInfo){ - discovery.funSetService = funSetNodeInfo -} diff --git a/cluster/dynamicdiscovery.go b/cluster/dynamicdiscovery.go new file mode 100644 index 0000000..82efab4 --- /dev/null +++ b/cluster/dynamicdiscovery.go @@ -0,0 +1,269 @@ +package cluster + +import ( + "fmt" + "github.com/duanhf2012/origin/log" + "github.com/duanhf2012/origin/rpc" + "github.com/duanhf2012/origin/service" + "time" +) + +const maxTryCount = 30 //最大重试次数 +const perTrySecond = 2*time.Second //每次重试间隔2秒 +const DynamicDiscoveryMasterName = "DiscoveryMaster" +const DynamicDiscoveryClientName = "DiscoveryClient" +const DynamicDiscoveryMasterNameRpcMethod = DynamicDiscoveryMasterName+".RPC_RegServiceDiscover" +const DynamicDiscoveryClientNameRpcMethod = DynamicDiscoveryClientName+".RPC_SubServiceDiscover" +type DynamicDiscoveryMaster struct { + service.Service + + mapNodeInfo map[int32] *rpc.NodeInfo + nodeInfo []*rpc.NodeInfo +} + +type DynamicDiscoveryClient struct { + service.Service + + funDelService FunDelNode + funSetService FunSetNodeInfo + localNodeId int +} + + +var masterService DynamicDiscoveryMaster +var clientService DynamicDiscoveryClient + +func getDynamicDiscovery() IServiceDiscovery{ + return &clientService +} + +func init(){ + masterService.SetName(DynamicDiscoveryMasterName) + clientService.SetName(DynamicDiscoveryClientName) +} + +func (ds *DynamicDiscoveryMaster) addNodeInfo(nodeInfo *rpc.NodeInfo){ + _,ok := ds.mapNodeInfo[nodeInfo.NodeId] + if ok == true { + return + } + ds.nodeInfo = append(ds.nodeInfo,nodeInfo) +} + +func (ds *DynamicDiscoveryMaster) OnInit() error{ + ds.mapNodeInfo = make(map[int32] *rpc.NodeInfo,20) + ds.RegisterRpcListener(ds) + + return nil +} + +func (ds *DynamicDiscoveryMaster) OnStart(){ + var nodeInfo rpc.NodeInfo + localNodeInfo := cluster.GetLocalNodeInfo() + if localNodeInfo.Private == true { + return + } + + nodeInfo.NodeId = int32(localNodeInfo.NodeId) + nodeInfo.NodeName = localNodeInfo.NodeName + nodeInfo.ListenAddr = localNodeInfo.ListenAddr + nodeInfo.PublicServiceList = localNodeInfo.PublicServiceList + + ds.addNodeInfo(&nodeInfo) +} + +func (ds *DynamicDiscoveryMaster) OnRpcConnected(nodeId int){ + //向它发布所有服务列表信息 + var notifyDiscover rpc.SubscribeDiscoverNotify + notifyDiscover.IsFull = true + notifyDiscover.NodeInfo = ds.nodeInfo + ds.GoNode(nodeId,DynamicDiscoveryClientNameRpcMethod,¬ifyDiscover) +} + +func (ds *DynamicDiscoveryMaster) OnRpcDisconnect(nodeId int){ + var notifyDiscover rpc.SubscribeDiscoverNotify + notifyDiscover.DelNodeId = int32(nodeId) + //删除结点 + cluster.DelNode(nodeId) + ds.CastGo(DynamicDiscoveryClientNameRpcMethod,¬ifyDiscover) +} + +// 收到注册过来的结点 +func (ds *DynamicDiscoveryMaster) RPC_RegServiceDiscover(req *rpc.ServiceDiscoverReq, res *rpc.ServiceDiscoverRes) error{ + if req.NodeInfo == nil { + err := fmt.Errorf("RPC_RegServiceDiscover req is error.") + log.Error(err.Error()) + + return err + } + + //广播给其他所有结点 + var notifyDiscover rpc.SubscribeDiscoverNotify + notifyDiscover.NodeInfo = append(notifyDiscover.NodeInfo,req.NodeInfo) + ds.CastGo(DynamicDiscoveryClientNameRpcMethod,¬ifyDiscover) + + //存入本地 + ds.addNodeInfo(req.NodeInfo) + + //初始化结点信息 + var nodeInfo NodeInfo + nodeInfo.NodeId = int(req.NodeInfo.NodeId) + nodeInfo.NodeName = req.NodeInfo.NodeName + nodeInfo.Private = req.NodeInfo.Private + nodeInfo.ServiceList = req.NodeInfo.PublicServiceList + nodeInfo.PublicServiceList = req.NodeInfo.PublicServiceList + nodeInfo.ListenAddr = req.NodeInfo.ListenAddr + + //主动删除已经存在的结点,确保先断开,再连接 + cluster.serviceDiscoveryDelNode(nodeInfo.NodeId) + + //加入到本地Cluster模块中,将连接该结点 + //如果本结点不为master结点,而且没有可使用的服务,不加入 + cluster.serviceDiscoverySetNodeInfo(&nodeInfo) + res.NodeInfo = ds.nodeInfo + + return nil +} + +func (dc *DynamicDiscoveryClient) OnInit() error{ + dc.RegisterRpcListener(dc) + return nil +} + +func (dc *DynamicDiscoveryClient) OnStart(){ + //2.添加并连接发现主结点 + localNodeInfo := cluster.GetLocalNodeInfo() + localNodeInfo.PublicServiceList = append(localNodeInfo.PublicServiceList,DynamicDiscoveryClientName) + dc.addDiscoveryMaster() +} + +func (dc *DynamicDiscoveryClient) addDiscoveryMaster(){ + discoveryNodeList := cluster.GetDiscoveryNodeList() + for i:=0;i0 { + willDelNodeId = append(willDelNodeId, int(req.DelNodeId)) + } + + //删除不必要的结点 + for _,nodeId := range willDelNodeId { + dc.funDelService(nodeId) + } + + //发现新结点 + for _, nodeInfo := range req.NodeInfo { + dc.setNodeInfo(nodeInfo) + } + + return nil +} + +func (dc *DynamicDiscoveryClient) isDiscoverNode(nodeId int) bool{ + for i:=0;i< len(cluster.discoveryNodeList);i++{ + if cluster.discoveryNodeList[i].NodeId == nodeId { + return true + } + } + + return false +} + +func (dc *DynamicDiscoveryClient) OnRpcConnected(nodeId int) { + if dc.isDiscoverNode(nodeId) == false { + return + } + + var req rpc.ServiceDiscoverReq + req.NodeInfo = &rpc.NodeInfo{} + req.NodeInfo.NodeId = int32(cluster.localNodeInfo.NodeId) + req.NodeInfo.NodeName = cluster.localNodeInfo.NodeName + req.NodeInfo.ListenAddr = cluster.localNodeInfo.ListenAddr + req.NodeInfo.PublicServiceList = cluster.localNodeInfo.PublicServiceList + + //如果是连接发现主服成功,则同步服务信息 + err := dc.AsyncCallNode(nodeId, DynamicDiscoveryMasterNameRpcMethod, &req, func(res *rpc.ServiceDiscoverRes, err error) { + if err != nil { + log.Error("call %s is fail :%s", DynamicDiscoveryMasterNameRpcMethod, err.Error()) + return + } + }) + if err != nil { + log.Error("call %s is fail :%s", DynamicDiscoveryMasterNameRpcMethod, err.Error()) + } +} + +func (dc *DynamicDiscoveryClient) setNodeInfo(nodeInfo *rpc.NodeInfo){ + if nodeInfo==nil || nodeInfo.Private == true || int(nodeInfo.NodeId) == dc.localNodeId{ + return + } + + //筛选关注的服务 + localNodeInfo := cluster.GetLocalNodeInfo() + if len(localNodeInfo.DiscoveryService) >0 { + var discoverServiceSlice = make([]string,0,24) + for _,pubService := range nodeInfo.PublicServiceList { + for _, discoverService := range localNodeInfo.DiscoveryService { + if pubService == discoverService { + discoverServiceSlice = append(discoverServiceSlice,pubService) + } + } + } + nodeInfo.PublicServiceList = discoverServiceSlice + } + + if len(nodeInfo.PublicServiceList)==0{ + return + } + + var nInfo NodeInfo + nInfo.ServiceList = nodeInfo.PublicServiceList + nInfo.PublicServiceList = nodeInfo.PublicServiceList + nInfo.NodeId = int(nodeInfo.NodeId) + nInfo.NodeName = nodeInfo.NodeName + nInfo.ListenAddr = nodeInfo.ListenAddr + dc.funSetService(&nInfo) +} + + +func (dc *DynamicDiscoveryClient) OnRpcDisconnect(nodeId int){ +} + +func (dc *DynamicDiscoveryClient) InitDiscovery(localNodeId int,funDelNode FunDelNode,funSetNodeInfo FunSetNodeInfo) error{ + dc.localNodeId = localNodeId + dc.funDelService = funDelNode + dc.funSetService = funSetNodeInfo + + return nil +} + +func (dc *DynamicDiscoveryClient) OnNodeStop(){ + +} + diff --git a/cluster/parsecfg.go b/cluster/parsecfg.go index b459682..4c276f5 100644 --- a/cluster/parsecfg.go +++ b/cluster/parsecfg.go @@ -11,6 +11,7 @@ import ( var json = jsoniter.ConfigCompatibleWithStandardLibrary type NodeInfoList struct { + DiscoveryNode []NodeInfo //用于服务发现Node NodeList []NodeInfo } @@ -62,12 +63,13 @@ func (cls *Cluster) readServiceConfig(filepath string) (map[string]interface{}, return serviceConfig,mapNodeService,nil } -func (cls *Cluster) readLocalClusterConfig(nodeId int) ([]NodeInfo,error) { - var nodeInfoList [] NodeInfo +func (cls *Cluster) readLocalClusterConfig(nodeId int) ([]NodeInfo,[]NodeInfo,error) { + var nodeInfoList []NodeInfo + var discoverNodeList []NodeInfo clusterCfgPath :=strings.TrimRight(configDir,"/") +"/cluster" fileInfoList,err := ioutil.ReadDir(clusterCfgPath) if err != nil { - return nil,fmt.Errorf("Read dir %s is fail :%+v",clusterCfgPath,err) + return nil,nil,fmt.Errorf("Read dir %s is fail :%+v",clusterCfgPath,err) } //读取任何文件,只读符合格式的配置,目录下的文件可以自定义分文件 @@ -76,9 +78,9 @@ func (cls *Cluster) readLocalClusterConfig(nodeId int) ([]NodeInfo,error) { filePath := strings.TrimRight(strings.TrimRight(clusterCfgPath,"/"),"\\")+"/"+f.Name() localNodeInfoList,err := cls.ReadClusterConfig(filePath) if err != nil { - return nil,fmt.Errorf("read file path %s is error:%+v" ,filePath,err) + return nil,nil,fmt.Errorf("read file path %s is error:%+v" ,filePath,err) } - + discoverNodeList = append(discoverNodeList,localNodeInfoList.DiscoveryNode...) for _,nodeInfo := range localNodeInfoList.NodeList { if nodeInfo.NodeId == nodeId || nodeId == 0 { nodeInfoList = append(nodeInfoList,nodeInfo) @@ -88,10 +90,22 @@ func (cls *Cluster) readLocalClusterConfig(nodeId int) ([]NodeInfo,error) { } if nodeId != 0 && (len(nodeInfoList)!=1){ - return nil,fmt.Errorf("%d configurations were found for the configuration with node ID %d!",len(nodeInfoList),nodeId) + return nil,nil,fmt.Errorf("%d configurations were found for the configuration with node ID %d!",len(nodeInfoList),nodeId) } - return nodeInfoList,nil + for i,_ := range nodeInfoList{ + for j,s := range nodeInfoList[i].ServiceList{ + //私有结点不加入到Public服务列表中 + if strings.HasPrefix(s,"_") == false && nodeInfoList[i].Private==false { + nodeInfoList[i].PublicServiceList = append(nodeInfoList[i].PublicServiceList,strings.TrimLeft(s,"_")) + }else{ + nodeInfoList[i].ServiceList[j] = strings.TrimLeft(s,"_") + } + } + } + + + return discoverNodeList,nodeInfoList,nil } func (cls *Cluster) readLocalService(localNodeId int) error { @@ -142,29 +156,44 @@ func (cls *Cluster) parseLocalCfg(){ cls.mapIdNode[cls.localNodeInfo.NodeId] = cls.localNodeInfo for _,sName := range cls.localNodeInfo.ServiceList{ - cls.mapServiceNode[sName] = append(cls.mapServiceNode[sName], cls.localNodeInfo.NodeId) + if _,ok:=cls.mapServiceNode[sName];ok==false{ + cls.mapServiceNode[sName] = make(map[int]struct{}) + } + + cls.mapServiceNode[sName][cls.localNodeInfo.NodeId]= struct{}{} } } -func (cls *Cluster) localPrivateService(localNodeInfo *NodeInfo){ - for i:=0;iMaster +type ServiceDiscoverReq struct { + NodeInfo *NodeInfo `protobuf:"bytes,1,opt,name=nodeInfo,proto3" json:"nodeInfo,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *ServiceDiscoverReq) Reset() { *m = ServiceDiscoverReq{} } +func (m *ServiceDiscoverReq) String() string { return proto.CompactTextString(m) } +func (*ServiceDiscoverReq) ProtoMessage() {} +func (*ServiceDiscoverReq) Descriptor() ([]byte, []int) { + return fileDescriptor_9bfdd3ec0419520f, []int{1} +} +func (m *ServiceDiscoverReq) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *ServiceDiscoverReq) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_ServiceDiscoverReq.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *ServiceDiscoverReq) XXX_Merge(src proto.Message) { + xxx_messageInfo_ServiceDiscoverReq.Merge(m, src) +} +func (m *ServiceDiscoverReq) XXX_Size() int { + return m.Size() +} +func (m *ServiceDiscoverReq) XXX_DiscardUnknown() { + xxx_messageInfo_ServiceDiscoverReq.DiscardUnknown(m) +} + +var xxx_messageInfo_ServiceDiscoverReq proto.InternalMessageInfo + +func (m *ServiceDiscoverReq) GetNodeInfo() *NodeInfo { + if m != nil { + return m.NodeInfo + } + return nil +} + +//Master->Client +type SubscribeDiscoverNotify struct { + IsFull bool `protobuf:"varint,1,opt,name=IsFull,proto3" json:"IsFull,omitempty"` + DelNodeId int32 `protobuf:"varint,2,opt,name=DelNodeId,proto3" json:"DelNodeId,omitempty"` + NodeInfo []*NodeInfo `protobuf:"bytes,3,rep,name=nodeInfo,proto3" json:"nodeInfo,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *SubscribeDiscoverNotify) Reset() { *m = SubscribeDiscoverNotify{} } +func (m *SubscribeDiscoverNotify) String() string { return proto.CompactTextString(m) } +func (*SubscribeDiscoverNotify) ProtoMessage() {} +func (*SubscribeDiscoverNotify) Descriptor() ([]byte, []int) { + return fileDescriptor_9bfdd3ec0419520f, []int{2} +} +func (m *SubscribeDiscoverNotify) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *SubscribeDiscoverNotify) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_SubscribeDiscoverNotify.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *SubscribeDiscoverNotify) XXX_Merge(src proto.Message) { + xxx_messageInfo_SubscribeDiscoverNotify.Merge(m, src) +} +func (m *SubscribeDiscoverNotify) XXX_Size() int { + return m.Size() +} +func (m *SubscribeDiscoverNotify) XXX_DiscardUnknown() { + xxx_messageInfo_SubscribeDiscoverNotify.DiscardUnknown(m) +} + +var xxx_messageInfo_SubscribeDiscoverNotify proto.InternalMessageInfo + +func (m *SubscribeDiscoverNotify) GetIsFull() bool { + if m != nil { + return m.IsFull + } + return false +} + +func (m *SubscribeDiscoverNotify) GetDelNodeId() int32 { + if m != nil { + return m.DelNodeId + } + return 0 +} + +func (m *SubscribeDiscoverNotify) GetNodeInfo() []*NodeInfo { + if m != nil { + return m.NodeInfo + } + return nil +} + +//Master->Client +type ServiceDiscoverRes struct { + NodeInfo []*NodeInfo `protobuf:"bytes,1,rep,name=nodeInfo,proto3" json:"nodeInfo,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *ServiceDiscoverRes) Reset() { *m = ServiceDiscoverRes{} } +func (m *ServiceDiscoverRes) String() string { return proto.CompactTextString(m) } +func (*ServiceDiscoverRes) ProtoMessage() {} +func (*ServiceDiscoverRes) Descriptor() ([]byte, []int) { + return fileDescriptor_9bfdd3ec0419520f, []int{3} +} +func (m *ServiceDiscoverRes) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *ServiceDiscoverRes) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_ServiceDiscoverRes.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *ServiceDiscoverRes) XXX_Merge(src proto.Message) { + xxx_messageInfo_ServiceDiscoverRes.Merge(m, src) +} +func (m *ServiceDiscoverRes) XXX_Size() int { + return m.Size() +} +func (m *ServiceDiscoverRes) XXX_DiscardUnknown() { + xxx_messageInfo_ServiceDiscoverRes.DiscardUnknown(m) +} + +var xxx_messageInfo_ServiceDiscoverRes proto.InternalMessageInfo + +func (m *ServiceDiscoverRes) GetNodeInfo() []*NodeInfo { + if m != nil { + return m.NodeInfo + } + return nil +} + +func init() { + proto.RegisterType((*NodeInfo)(nil), "rpc.NodeInfo") + proto.RegisterType((*ServiceDiscoverReq)(nil), "rpc.ServiceDiscoverReq") + proto.RegisterType((*SubscribeDiscoverNotify)(nil), "rpc.SubscribeDiscoverNotify") + proto.RegisterType((*ServiceDiscoverRes)(nil), "rpc.ServiceDiscoverRes") +} + +func init() { proto.RegisterFile("rpc/dynamicdiscover.proto", fileDescriptor_9bfdd3ec0419520f) } + +var fileDescriptor_9bfdd3ec0419520f = []byte{ + // 295 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x7c, 0x91, 0xcd, 0x4a, 0xfc, 0x30, + 0x14, 0xc5, 0xc9, 0xf4, 0xdf, 0xf9, 0xb7, 0x57, 0x5c, 0x18, 0x41, 0xa3, 0x48, 0x29, 0x5d, 0x55, + 0x90, 0x0e, 0xe8, 0x03, 0x88, 0x32, 0x08, 0x03, 0x52, 0x86, 0xcc, 0xce, 0x5d, 0x9b, 0x64, 0x20, + 0xd0, 0x69, 0x6a, 0xfa, 0x01, 0xe3, 0x03, 0xf9, 0x2c, 0x2e, 0x7d, 0x04, 0xe9, 0x93, 0xc8, 0xc4, + 0x4c, 0xfd, 0x64, 0x76, 0xf9, 0xdd, 0x9b, 0x9c, 0x7b, 0xce, 0x0d, 0x9c, 0xe8, 0x8a, 0x4d, 0xf8, + 0xba, 0xcc, 0x56, 0x92, 0x71, 0x59, 0x33, 0xd5, 0x09, 0x9d, 0x54, 0x5a, 0x35, 0x0a, 0x3b, 0xba, + 0x62, 0xd1, 0x33, 0x02, 0x2f, 0x55, 0x5c, 0xcc, 0xca, 0xa5, 0xc2, 0x47, 0x30, 0x36, 0x67, 0x4e, + 0x50, 0x88, 0x62, 0x97, 0x5a, 0xc2, 0xa7, 0x1f, 0x77, 0xd2, 0x6c, 0x25, 0xc8, 0x28, 0x44, 0xb1, + 0x4f, 0x07, 0xc6, 0x01, 0xc0, 0xbd, 0xac, 0x1b, 0x51, 0xde, 0x70, 0xae, 0x89, 0x63, 0xba, 0x5f, + 0x2a, 0x98, 0xc0, 0xff, 0xb9, 0x96, 0x5d, 0xd6, 0x08, 0xf2, 0x2f, 0x44, 0xb1, 0x47, 0xb7, 0x88, + 0x2f, 0xe0, 0x60, 0xde, 0xe6, 0x85, 0x64, 0x0b, 0xa1, 0x3b, 0xc9, 0xc4, 0xe6, 0x11, 0x71, 0x43, + 0x27, 0xf6, 0xe9, 0xef, 0x46, 0x74, 0x0d, 0xd8, 0xe2, 0xd4, 0xc6, 0xa0, 0xe2, 0x11, 0x9f, 0x83, + 0x57, 0x5a, 0xf7, 0xc6, 0xf3, 0xde, 0xe5, 0x7e, 0xa2, 0x2b, 0x96, 0x6c, 0x23, 0xd1, 0xa1, 0x1d, + 0x3d, 0xc1, 0xf1, 0xa2, 0xcd, 0x6b, 0xa6, 0x65, 0x3e, 0x48, 0xa4, 0xaa, 0x91, 0xcb, 0xf5, 0x26, + 0xf7, 0xac, 0xbe, 0x6b, 0x8b, 0xc2, 0x68, 0x78, 0xd4, 0x12, 0x3e, 0x03, 0x7f, 0x2a, 0x0a, 0xbb, + 0x92, 0x91, 0x59, 0xc9, 0x67, 0xe1, 0xdb, 0x6c, 0x27, 0x74, 0x76, 0xcd, 0xfe, 0xcb, 0x7c, 0xfd, + 0xc3, 0xfc, 0x2e, 0x81, 0xdb, 0xc3, 0x97, 0x3e, 0x40, 0xaf, 0x7d, 0x80, 0xde, 0xfa, 0x00, 0x3d, + 0xb8, 0xc9, 0x44, 0x57, 0x2c, 0x1f, 0x9b, 0x7f, 0xbc, 0x7a, 0x0f, 0x00, 0x00, 0xff, 0xff, 0x9d, + 0x3f, 0x5b, 0x31, 0xe4, 0x01, 0x00, 0x00, +} + +func (m *NodeInfo) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *NodeInfo) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *NodeInfo) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if m.XXX_unrecognized != nil { + i -= len(m.XXX_unrecognized) + copy(dAtA[i:], m.XXX_unrecognized) + } + if len(m.PublicServiceList) > 0 { + for iNdEx := len(m.PublicServiceList) - 1; iNdEx >= 0; iNdEx-- { + i -= len(m.PublicServiceList[iNdEx]) + copy(dAtA[i:], m.PublicServiceList[iNdEx]) + i = encodeVarintDynamicdiscover(dAtA, i, uint64(len(m.PublicServiceList[iNdEx]))) + i-- + dAtA[i] = 0x2a + } + } + if m.Private { + i-- + if m.Private { + dAtA[i] = 1 + } else { + dAtA[i] = 0 + } + i-- + dAtA[i] = 0x20 + } + if len(m.ListenAddr) > 0 { + i -= len(m.ListenAddr) + copy(dAtA[i:], m.ListenAddr) + i = encodeVarintDynamicdiscover(dAtA, i, uint64(len(m.ListenAddr))) + i-- + dAtA[i] = 0x1a + } + if len(m.NodeName) > 0 { + i -= len(m.NodeName) + copy(dAtA[i:], m.NodeName) + i = encodeVarintDynamicdiscover(dAtA, i, uint64(len(m.NodeName))) + i-- + dAtA[i] = 0x12 + } + if m.NodeId != 0 { + i = encodeVarintDynamicdiscover(dAtA, i, uint64(m.NodeId)) + i-- + dAtA[i] = 0x8 + } + return len(dAtA) - i, nil +} + +func (m *ServiceDiscoverReq) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *ServiceDiscoverReq) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *ServiceDiscoverReq) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if m.XXX_unrecognized != nil { + i -= len(m.XXX_unrecognized) + copy(dAtA[i:], m.XXX_unrecognized) + } + if m.NodeInfo != nil { + { + size, err := m.NodeInfo.MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarintDynamicdiscover(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0xa + } + return len(dAtA) - i, nil +} + +func (m *SubscribeDiscoverNotify) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *SubscribeDiscoverNotify) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *SubscribeDiscoverNotify) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if m.XXX_unrecognized != nil { + i -= len(m.XXX_unrecognized) + copy(dAtA[i:], m.XXX_unrecognized) + } + if len(m.NodeInfo) > 0 { + for iNdEx := len(m.NodeInfo) - 1; iNdEx >= 0; iNdEx-- { + { + size, err := m.NodeInfo[iNdEx].MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarintDynamicdiscover(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0x1a + } + } + if m.DelNodeId != 0 { + i = encodeVarintDynamicdiscover(dAtA, i, uint64(m.DelNodeId)) + i-- + dAtA[i] = 0x10 + } + if m.IsFull { + i-- + if m.IsFull { + dAtA[i] = 1 + } else { + dAtA[i] = 0 + } + i-- + dAtA[i] = 0x8 + } + return len(dAtA) - i, nil +} + +func (m *ServiceDiscoverRes) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *ServiceDiscoverRes) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *ServiceDiscoverRes) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if m.XXX_unrecognized != nil { + i -= len(m.XXX_unrecognized) + copy(dAtA[i:], m.XXX_unrecognized) + } + if len(m.NodeInfo) > 0 { + for iNdEx := len(m.NodeInfo) - 1; iNdEx >= 0; iNdEx-- { + { + size, err := m.NodeInfo[iNdEx].MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarintDynamicdiscover(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0xa + } + } + return len(dAtA) - i, nil +} + +func encodeVarintDynamicdiscover(dAtA []byte, offset int, v uint64) int { + offset -= sovDynamicdiscover(v) + base := offset + for v >= 1<<7 { + dAtA[offset] = uint8(v&0x7f | 0x80) + v >>= 7 + offset++ + } + dAtA[offset] = uint8(v) + return base +} +func (m *NodeInfo) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if m.NodeId != 0 { + n += 1 + sovDynamicdiscover(uint64(m.NodeId)) + } + l = len(m.NodeName) + if l > 0 { + n += 1 + l + sovDynamicdiscover(uint64(l)) + } + l = len(m.ListenAddr) + if l > 0 { + n += 1 + l + sovDynamicdiscover(uint64(l)) + } + if m.Private { + n += 2 + } + if len(m.PublicServiceList) > 0 { + for _, s := range m.PublicServiceList { + l = len(s) + n += 1 + l + sovDynamicdiscover(uint64(l)) + } + } + if m.XXX_unrecognized != nil { + n += len(m.XXX_unrecognized) + } + return n +} + +func (m *ServiceDiscoverReq) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if m.NodeInfo != nil { + l = m.NodeInfo.Size() + n += 1 + l + sovDynamicdiscover(uint64(l)) + } + if m.XXX_unrecognized != nil { + n += len(m.XXX_unrecognized) + } + return n +} + +func (m *SubscribeDiscoverNotify) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if m.IsFull { + n += 2 + } + if m.DelNodeId != 0 { + n += 1 + sovDynamicdiscover(uint64(m.DelNodeId)) + } + if len(m.NodeInfo) > 0 { + for _, e := range m.NodeInfo { + l = e.Size() + n += 1 + l + sovDynamicdiscover(uint64(l)) + } + } + if m.XXX_unrecognized != nil { + n += len(m.XXX_unrecognized) + } + return n +} + +func (m *ServiceDiscoverRes) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if len(m.NodeInfo) > 0 { + for _, e := range m.NodeInfo { + l = e.Size() + n += 1 + l + sovDynamicdiscover(uint64(l)) + } + } + if m.XXX_unrecognized != nil { + n += len(m.XXX_unrecognized) + } + return n +} + +func sovDynamicdiscover(x uint64) (n int) { + return (math_bits.Len64(x|1) + 6) / 7 +} +func sozDynamicdiscover(x uint64) (n int) { + return sovDynamicdiscover(uint64((x << 1) ^ uint64((int64(x) >> 63)))) +} +func (m *NodeInfo) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowDynamicdiscover + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: NodeInfo: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: NodeInfo: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field NodeId", wireType) + } + m.NodeId = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowDynamicdiscover + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.NodeId |= int32(b&0x7F) << shift + if b < 0x80 { + break + } + } + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field NodeName", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowDynamicdiscover + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthDynamicdiscover + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthDynamicdiscover + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.NodeName = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + case 3: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field ListenAddr", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowDynamicdiscover + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthDynamicdiscover + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthDynamicdiscover + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.ListenAddr = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + case 4: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Private", wireType) + } + var v int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowDynamicdiscover + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + v |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + m.Private = bool(v != 0) + case 5: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field PublicServiceList", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowDynamicdiscover + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthDynamicdiscover + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthDynamicdiscover + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.PublicServiceList = append(m.PublicServiceList, string(dAtA[iNdEx:postIndex])) + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipDynamicdiscover(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthDynamicdiscover + } + if (iNdEx + skippy) < 0 { + return ErrInvalidLengthDynamicdiscover + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + m.XXX_unrecognized = append(m.XXX_unrecognized, dAtA[iNdEx:iNdEx+skippy]...) + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *ServiceDiscoverReq) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowDynamicdiscover + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: ServiceDiscoverReq: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: ServiceDiscoverReq: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field NodeInfo", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowDynamicdiscover + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthDynamicdiscover + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthDynamicdiscover + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.NodeInfo == nil { + m.NodeInfo = &NodeInfo{} + } + if err := m.NodeInfo.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipDynamicdiscover(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthDynamicdiscover + } + if (iNdEx + skippy) < 0 { + return ErrInvalidLengthDynamicdiscover + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + m.XXX_unrecognized = append(m.XXX_unrecognized, dAtA[iNdEx:iNdEx+skippy]...) + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *SubscribeDiscoverNotify) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowDynamicdiscover + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: SubscribeDiscoverNotify: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: SubscribeDiscoverNotify: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field IsFull", wireType) + } + var v int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowDynamicdiscover + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + v |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + m.IsFull = bool(v != 0) + case 2: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field DelNodeId", wireType) + } + m.DelNodeId = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowDynamicdiscover + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.DelNodeId |= int32(b&0x7F) << shift + if b < 0x80 { + break + } + } + case 3: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field NodeInfo", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowDynamicdiscover + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthDynamicdiscover + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthDynamicdiscover + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.NodeInfo = append(m.NodeInfo, &NodeInfo{}) + if err := m.NodeInfo[len(m.NodeInfo)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipDynamicdiscover(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthDynamicdiscover + } + if (iNdEx + skippy) < 0 { + return ErrInvalidLengthDynamicdiscover + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + m.XXX_unrecognized = append(m.XXX_unrecognized, dAtA[iNdEx:iNdEx+skippy]...) + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *ServiceDiscoverRes) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowDynamicdiscover + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: ServiceDiscoverRes: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: ServiceDiscoverRes: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field NodeInfo", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowDynamicdiscover + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthDynamicdiscover + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthDynamicdiscover + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.NodeInfo = append(m.NodeInfo, &NodeInfo{}) + if err := m.NodeInfo[len(m.NodeInfo)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipDynamicdiscover(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthDynamicdiscover + } + if (iNdEx + skippy) < 0 { + return ErrInvalidLengthDynamicdiscover + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + m.XXX_unrecognized = append(m.XXX_unrecognized, dAtA[iNdEx:iNdEx+skippy]...) + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func skipDynamicdiscover(dAtA []byte) (n int, err error) { + l := len(dAtA) + iNdEx := 0 + depth := 0 + for iNdEx < l { + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowDynamicdiscover + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + wireType := int(wire & 0x7) + switch wireType { + case 0: + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowDynamicdiscover + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + iNdEx++ + if dAtA[iNdEx-1] < 0x80 { + break + } + } + case 1: + iNdEx += 8 + case 2: + var length int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowDynamicdiscover + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + length |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if length < 0 { + return 0, ErrInvalidLengthDynamicdiscover + } + iNdEx += length + case 3: + depth++ + case 4: + if depth == 0 { + return 0, ErrUnexpectedEndOfGroupDynamicdiscover + } + depth-- + case 5: + iNdEx += 4 + default: + return 0, fmt.Errorf("proto: illegal wireType %d", wireType) + } + if iNdEx < 0 { + return 0, ErrInvalidLengthDynamicdiscover + } + if depth == 0 { + return iNdEx, nil + } + } + return 0, io.ErrUnexpectedEOF +} + +var ( + ErrInvalidLengthDynamicdiscover = fmt.Errorf("proto: negative length found during unmarshaling") + ErrIntOverflowDynamicdiscover = fmt.Errorf("proto: integer overflow") + ErrUnexpectedEndOfGroupDynamicdiscover = fmt.Errorf("proto: unexpected end of group") +) diff --git a/rpc/dynamicdiscover.proto b/rpc/dynamicdiscover.proto new file mode 100644 index 0000000..a8bd01e --- /dev/null +++ b/rpc/dynamicdiscover.proto @@ -0,0 +1,29 @@ +syntax = "proto3"; +package rpc; +option go_package = "./rpc"; + +message NodeInfo{ + int32 NodeId = 1; + string NodeName = 2; + string ListenAddr = 3; + bool Private = 4; + repeated string PublicServiceList = 5; +} + +//Client->Master +message ServiceDiscoverReq{ + NodeInfo nodeInfo = 1; +} + +//Master->Client +message SubscribeDiscoverNotify{ + bool IsFull = 1; + int32 DelNodeId = 2; + repeated NodeInfo nodeInfo = 3; +} + + +//Master->Client +message ServiceDiscoverRes{ + repeated NodeInfo nodeInfo = 1; +} \ No newline at end of file diff --git a/rpc/gogorpc.pb.go b/rpc/gogorpc.pb.go index 2826d1a..6d2faa1 100644 --- a/rpc/gogorpc.pb.go +++ b/rpc/gogorpc.pb.go @@ -1,5 +1,5 @@ // Code generated by protoc-gen-gogo. DO NOT EDIT. -// source: bin/gogopb/gogorpc.proto +// source: rpc/gogorpc.proto package rpc @@ -37,7 +37,7 @@ func (m *GoGoPBRpcRequestData) Reset() { *m = GoGoPBRpcRequestData{} } func (m *GoGoPBRpcRequestData) String() string { return proto.CompactTextString(m) } func (*GoGoPBRpcRequestData) ProtoMessage() {} func (*GoGoPBRpcRequestData) Descriptor() ([]byte, []int) { - return fileDescriptor_b3b900b0f45d7fb5, []int{0} + return fileDescriptor_38afb24c36168563, []int{0} } func (m *GoGoPBRpcRequestData) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -114,7 +114,7 @@ func (m *GoGoPBRpcResponseData) Reset() { *m = GoGoPBRpcResponseData{} } func (m *GoGoPBRpcResponseData) String() string { return proto.CompactTextString(m) } func (*GoGoPBRpcResponseData) ProtoMessage() {} func (*GoGoPBRpcResponseData) Descriptor() ([]byte, []int) { - return fileDescriptor_b3b900b0f45d7fb5, []int{1} + return fileDescriptor_38afb24c36168563, []int{1} } func (m *GoGoPBRpcResponseData) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -169,26 +169,25 @@ func init() { proto.RegisterType((*GoGoPBRpcResponseData)(nil), "rpc.GoGoPBRpcResponseData") } -func init() { proto.RegisterFile("bin/gogopb/gogorpc.proto", fileDescriptor_b3b900b0f45d7fb5) } +func init() { proto.RegisterFile("rpc/gogorpc.proto", fileDescriptor_38afb24c36168563) } -var fileDescriptor_b3b900b0f45d7fb5 = []byte{ - // 241 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0x92, 0x48, 0xca, 0xcc, 0xd3, - 0x4f, 0xcf, 0x4f, 0xcf, 0x2f, 0x48, 0x02, 0x53, 0x45, 0x05, 0xc9, 0x7a, 0x05, 0x45, 0xf9, 0x25, - 0xf9, 0x42, 0xcc, 0x45, 0x05, 0xc9, 0x4a, 0x4b, 0x18, 0xb9, 0x44, 0xdc, 0xf3, 0xdd, 0xf3, 0x03, - 0x9c, 0x82, 0x0a, 0x92, 0x83, 0x52, 0x0b, 0x4b, 0x53, 0x8b, 0x4b, 0x5c, 0x12, 0x4b, 0x12, 0x85, - 0x04, 0xb8, 0x98, 0x83, 0x53, 0x0b, 0x25, 0x18, 0x15, 0x18, 0x35, 0x58, 0x82, 0x40, 0x4c, 0x21, - 0x05, 0x2e, 0xee, 0xa0, 0x82, 0x64, 0xdf, 0xd4, 0x92, 0x8c, 0xfc, 0x14, 0xcf, 0x14, 0x09, 0x26, - 0x05, 0x46, 0x0d, 0xde, 0x20, 0x64, 0x21, 0x21, 0x15, 0x2e, 0xde, 0xe0, 0xd4, 0xa2, 0xb2, 0xcc, - 0xe4, 0x54, 0x88, 0x90, 0x04, 0xb3, 0x02, 0xa3, 0x06, 0x67, 0x10, 0xaa, 0xa0, 0x90, 0x04, 0x17, - 0xbb, 0x5f, 0x7e, 0x50, 0x6a, 0x41, 0x4e, 0xa5, 0x04, 0x8b, 0x02, 0xa3, 0x06, 0x47, 0x10, 0x8c, - 0x0b, 0x92, 0xf1, 0xcc, 0x0b, 0x48, 0x2c, 0x4a, 0xcc, 0x95, 0x60, 0x55, 0x60, 0xd4, 0xe0, 0x09, - 0x82, 0x71, 0x95, 0x42, 0xb9, 0x44, 0x91, 0x5c, 0x59, 0x5c, 0x90, 0x9f, 0x57, 0x9c, 0x8a, 0xc3, - 0x99, 0x22, 0x5c, 0xac, 0xae, 0x45, 0x45, 0xf9, 0x45, 0x60, 0x07, 0x72, 0x06, 0x41, 0x38, 0x20, - 0x51, 0x88, 0x95, 0xcc, 0x60, 0x83, 0x21, 0x1c, 0x27, 0xe1, 0x13, 0x8f, 0xe4, 0x18, 0x2f, 0x3c, - 0x92, 0x63, 0x7c, 0xf0, 0x48, 0x8e, 0x31, 0x8a, 0x55, 0x4f, 0xbf, 0xa8, 0x20, 0x39, 0x89, 0x0d, - 0x1c, 0x3c, 0xc6, 0x80, 0x00, 0x00, 0x00, 0xff, 0xff, 0xe8, 0x18, 0x52, 0x1a, 0x3a, 0x01, 0x00, - 0x00, +var fileDescriptor_38afb24c36168563 = []byte{ + // 237 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0x12, 0x2c, 0x2a, 0x48, 0xd6, + 0x4f, 0xcf, 0x4f, 0xcf, 0x2f, 0x2a, 0x48, 0xd6, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0x62, 0x2e, + 0x2a, 0x48, 0x56, 0x5a, 0xc2, 0xc8, 0x25, 0xe2, 0x9e, 0xef, 0x9e, 0x1f, 0xe0, 0x14, 0x54, 0x90, + 0x1c, 0x94, 0x5a, 0x58, 0x9a, 0x5a, 0x5c, 0xe2, 0x92, 0x58, 0x92, 0x28, 0x24, 0xc0, 0xc5, 0x1c, + 0x9c, 0x5a, 0x28, 0xc1, 0xa8, 0xc0, 0xa8, 0xc1, 0x12, 0x04, 0x62, 0x0a, 0x29, 0x70, 0x71, 0x07, + 0x15, 0x24, 0xfb, 0xa6, 0x96, 0x64, 0xe4, 0xa7, 0x78, 0xa6, 0x48, 0x30, 0x29, 0x30, 0x6a, 0xf0, + 0x06, 0x21, 0x0b, 0x09, 0xa9, 0x70, 0xf1, 0x06, 0xa7, 0x16, 0x95, 0x65, 0x26, 0xa7, 0x42, 0x84, + 0x24, 0x98, 0x15, 0x18, 0x35, 0x38, 0x83, 0x50, 0x05, 0x85, 0x24, 0xb8, 0xd8, 0xfd, 0xf2, 0x83, + 0x52, 0x0b, 0x72, 0x2a, 0x25, 0x58, 0x14, 0x18, 0x35, 0x38, 0x82, 0x60, 0x5c, 0x90, 0x8c, 0x67, + 0x5e, 0x40, 0x62, 0x51, 0x62, 0xae, 0x04, 0xab, 0x02, 0xa3, 0x06, 0x4f, 0x10, 0x8c, 0xab, 0x14, + 0xca, 0x25, 0x8a, 0xe4, 0xca, 0xe2, 0x82, 0xfc, 0xbc, 0xe2, 0x54, 0x1c, 0xce, 0x14, 0xe1, 0x62, + 0x75, 0x2d, 0x2a, 0xca, 0x2f, 0x02, 0x3b, 0x90, 0x33, 0x08, 0xc2, 0x01, 0x89, 0x42, 0xac, 0x64, + 0x06, 0x1b, 0x0c, 0xe1, 0x38, 0x09, 0x9f, 0x78, 0x24, 0xc7, 0x78, 0xe1, 0x91, 0x1c, 0xe3, 0x83, + 0x47, 0x72, 0x8c, 0x51, 0xac, 0x7a, 0xfa, 0x45, 0x05, 0xc9, 0x49, 0x6c, 0xe0, 0xe0, 0x31, 0x06, + 0x04, 0x00, 0x00, 0xff, 0xff, 0xc7, 0xfc, 0x50, 0x87, 0x33, 0x01, 0x00, 0x00, } func (m *GoGoPBRpcRequestData) Marshal() (dAtA []byte, err error) { diff --git a/rpc/rpchandler.go b/rpc/rpchandler.go index 529e2e6..2b45c37 100644 --- a/rpc/rpchandler.go +++ b/rpc/rpchandler.go @@ -63,7 +63,7 @@ type RpcHandler struct { callResponseCallBack chan *Call //异步返回的回调 } -type TriggerRpcEvent func(bConnect bool,nodeId int) +type TriggerRpcEvent func(bConnect bool,clientSeq uint32,nodeId int) type IRpcListener interface { OnRpcConnected(nodeId int) OnRpcDisconnect(nodeId int) @@ -467,6 +467,9 @@ func (handler *RpcHandler) asyncCallRpc(nodeid int,serviceMethod string,args int var pClientList [maxClusterNode]*Client err,count := handler.funcRpcClient(nodeid,serviceMethod,pClientList[:]) if count==0||err != nil { + if err == nil { + err = fmt.Errorf("cannot find rpcclient from nodeid %d serviceMethod %s",nodeid,serviceMethod) + } fVal.Call([]reflect.Value{reflect.ValueOf(reply),reflect.ValueOf(err)}) log.Error("Call serviceMethod is error:%+v!",err) return nil diff --git a/service/module.go b/service/module.go index cce979f..9db603f 100644 --- a/service/module.go +++ b/service/module.go @@ -26,7 +26,7 @@ type IModule interface { GetService() IService GetModuleName() string GetEventProcessor()event.IEventProcessor - NotifyEvent(ev *event.Event) + NotifyEvent(ev event.IEvent) } type IModuleTimer interface { @@ -213,7 +213,7 @@ func (m *Module) GetEventProcessor() event.IEventProcessor{ return m.eventHandler.GetEventProcessor() } -func (m *Module) NotifyEvent(ev *event.Event){ +func (m *Module) NotifyEvent(ev event.IEvent){ m.eventHandler.NotifyEvent(ev) } diff --git a/service/service.go b/service/service.go index 829b156..073aadb 100644 --- a/service/service.go +++ b/service/service.go @@ -23,6 +23,7 @@ type IService interface { GetName() string OnSetup(iService IService) OnInit() error + OnStart() OnRelease() Wait() Start() @@ -42,6 +43,16 @@ type Service struct { startStatus bool eventProcessor event.IEventProcessor profiler *profiler.Profiler //性能分析器 + rpcEventLister rpc.IRpcListener +} + +type RpcEventData struct{ + IsConnect bool + NodeId int +} + +func (rpcEventData *RpcEventData) GetEventType() event.EventType{ + return event.Sys_Event_Rpc_Event } func (s *Service) OnSetup(iService IService){ @@ -99,6 +110,7 @@ func (s *Service) Run() { log.Debug("Start running Service %s.", s.GetName()) defer s.wg.Done() var bStop = false + s.self.(IService).OnStart() for{ rpcRequestChan := s.GetRpcRequestChan() rpcResponseCallBack := s.GetRpcResponseChan() @@ -211,4 +223,28 @@ func (s *Service) IsSingleCoroutine() bool { func (s *Service) RegRawRpc(rpcMethodId uint32,rawRpcCB rpc.RawRpcCallBack){ s.rpcHandler.RegRawRpc(rpcMethodId,rawRpcCB) -} \ No newline at end of file +} + +func (s *Service) OnStart(){ +} + +func (s *Service) OnRpcEvent(ev event.IEvent){ + event := ev.(*RpcEventData) + if event.IsConnect { + s.rpcEventLister.OnRpcConnected(event.NodeId) + }else{ + s.rpcEventLister.OnRpcDisconnect(event.NodeId) + } +} + +func (s *Service) RegisterRpcListener (rpcEventLister rpc.IRpcListener) { + s.rpcEventLister = rpcEventLister + s.RegEventReceiverFunc(event.Sys_Event_Rpc_Event,s.GetEventHandler(),s.OnRpcEvent) + RegRpcEventFun(s.GetName()) +} + +func (s *Service) UnRegisterRpcListener (rpcLister rpc.IRpcListener) { + s.UnRegEventReceiverFunc(event.Sys_Event_Rpc_Event,s.GetEventHandler()) + RegRpcEventFun(s.GetName()) +} + diff --git a/service/servicemgr.go b/service/servicemgr.go index a6a112b..2654ec2 100644 --- a/service/servicemgr.go +++ b/service/servicemgr.go @@ -4,6 +4,9 @@ package service var mapServiceName map[string]IService var setupServiceList []IService +type RegRpcEventFunType func(serviceName string) +var RegRpcEventFun RegRpcEventFunType + func init(){ mapServiceName = map[string]IService{} setupServiceList = []IService{}