增加邻居master服务发现功能

This commit is contained in:
boyce
2021-05-07 15:22:54 +08:00
parent bbab004f65
commit 212798dc04
6 changed files with 211 additions and 171 deletions

View File

@@ -17,7 +17,7 @@ const DynamicDiscoveryClientNameRpcMethod = DynamicDiscoveryClientName+".RPC_Sub
type DynamicDiscoveryMaster struct {
service.Service
mapNodeInfo map[int32] *rpc.NodeInfo
mapNodeInfo map[int32]struct{}
nodeInfo []*rpc.NodeInfo
}
@@ -47,11 +47,12 @@ func (ds *DynamicDiscoveryMaster) addNodeInfo(nodeInfo *rpc.NodeInfo){
if ok == true {
return
}
ds.mapNodeInfo[nodeInfo.NodeId] = struct{}{}
ds.nodeInfo = append(ds.nodeInfo,nodeInfo)
}
func (ds *DynamicDiscoveryMaster) OnInit() error{
ds.mapNodeInfo = make(map[int32] *rpc.NodeInfo,20)
ds.mapNodeInfo = make(map[int32] struct{},20)
ds.RegRpcListener(ds)
return nil
@@ -77,11 +78,13 @@ func (ds *DynamicDiscoveryMaster) OnNodeConnected(nodeId int){
var notifyDiscover rpc.SubscribeDiscoverNotify
notifyDiscover.IsFull = true
notifyDiscover.NodeInfo = ds.nodeInfo
notifyDiscover.MasterNodeId = int32(cluster.GetLocalNodeInfo().NodeId)
ds.GoNode(nodeId,DynamicDiscoveryClientNameRpcMethod,&notifyDiscover)
}
func (ds *DynamicDiscoveryMaster) OnNodeDisconnect(nodeId int){
var notifyDiscover rpc.SubscribeDiscoverNotify
notifyDiscover.MasterNodeId = int32(cluster.GetLocalNodeInfo().NodeId)
notifyDiscover.DelNodeId = int32(nodeId)
//删除结点
cluster.DelNode(nodeId,true)
@@ -89,7 +92,7 @@ func (ds *DynamicDiscoveryMaster) OnNodeDisconnect(nodeId int){
}
// 收到注册过来的结点
func (ds *DynamicDiscoveryMaster) RPC_RegServiceDiscover(req *rpc.ServiceDiscoverReq, res *rpc.ServiceDiscoverRes) error{
func (ds *DynamicDiscoveryMaster) RPC_RegServiceDiscover(req *rpc.ServiceDiscoverReq, res *rpc.Empty) error{
if req.NodeInfo == nil {
err := fmt.Errorf("RPC_RegServiceDiscover req is error.")
log.Error(err.Error())
@@ -99,6 +102,7 @@ func (ds *DynamicDiscoveryMaster) RPC_RegServiceDiscover(req *rpc.ServiceDiscove
//广播给其他所有结点
var notifyDiscover rpc.SubscribeDiscoverNotify
notifyDiscover.MasterNodeId = int32(cluster.GetLocalNodeInfo().NodeId)
notifyDiscover.NodeInfo = append(notifyDiscover.NodeInfo,req.NodeInfo)
ds.CastGo(DynamicDiscoveryClientNameRpcMethod,&notifyDiscover)
@@ -120,7 +124,6 @@ func (ds *DynamicDiscoveryMaster) RPC_RegServiceDiscover(req *rpc.ServiceDiscove
//加入到本地Cluster模块中将连接该结点
//如果本结点不为master结点而且没有可使用的服务不加入
cluster.serviceDiscoverySetNodeInfo(&nodeInfo)
res.NodeInfo = ds.nodeInfo
return nil
}
@@ -132,8 +135,6 @@ func (dc *DynamicDiscoveryClient) OnInit() error{
func (dc *DynamicDiscoveryClient) OnStart(){
//2.添加并连接发现主结点
localNodeInfo := cluster.GetLocalNodeInfo()
localNodeInfo.PublicServiceList = append(localNodeInfo.PublicServiceList,DynamicDiscoveryClientName)
dc.addDiscoveryMaster()
}
@@ -149,36 +150,76 @@ func (dc *DynamicDiscoveryClient) addDiscoveryMaster(){
}
//订阅发现的服务通知
func (dc *DynamicDiscoveryClient) RPC_SubServiceDiscover(req *rpc.SubscribeDiscoverNotify) error{
//如果为完整同步,则找出差异的结点
var willDelNodeId []int
if req.IsFull {
mapNodeId := make(map[int32]struct{},len(req.NodeInfo))
for _,nodeInfo:= range req.NodeInfo{
mapNodeId[nodeInfo.NodeId] = struct{}{}
func (dc *DynamicDiscoveryClient) RPC_SubServiceDiscover(req *rpc.SubscribeDiscoverNotify) error {
DiscoveryNodeInfo := cluster.GetMasterDiscoveryNodeInfo(int(req.MasterNodeId))
mapMasterDiscoveryService := map[string]struct{}{}
if DiscoveryNodeInfo != nil {
for i := 0; i < len(DiscoveryNodeInfo.NeighborService); i++ {
mapMasterDiscoveryService[DiscoveryNodeInfo.NeighborService[i]] = struct{}{}
}
}
mapNodeInfo := map[int32]*rpc.NodeInfo{}
//如果Master没有配置发现的服务
if len(mapMasterDiscoveryService) == 0 {
//如果为完整同步,则找出差异的结点
var willDelNodeId []int
if req.IsFull {
mapNodeId := make(map[int32]struct{}, len(req.NodeInfo))
for _, nodeInfo := range req.NodeInfo {
mapNodeId[nodeInfo.NodeId] = struct{}{}
}
cluster.FetchAllNodeId(func(nodeId int) {
if nodeId != dc.localNodeId {
if _, ok := mapNodeId[int32(nodeId)]; ok == false {
willDelNodeId = append(willDelNodeId, nodeId)
}
}
})
}
cluster.FetchAllNodeId(func(nodeId int){
if nodeId != dc.localNodeId {
if _, ok := mapNodeId[int32(nodeId)]; ok == false {
willDelNodeId = append(willDelNodeId, nodeId)
//忽略本地结点
if req.DelNodeId != int32(dc.localNodeId) && req.DelNodeId > 0 {
willDelNodeId = append(willDelNodeId, int(req.DelNodeId))
}
//删除不必要的结点
for _, nodeId := range willDelNodeId {
dc.funDelService(nodeId, false)
}
}
for _, nodeInfo := range req.NodeInfo {
//不对本地结点或者不存在任何公开服务的结点
if int(nodeInfo.NodeId) == dc.localNodeId {
continue
}
//遍历所有的公开服务,并筛选之
for _, serviceName := range nodeInfo.PublicServiceList {
//只有存在配置时才做筛选
if len(mapMasterDiscoveryService)>0 {
if _, ok := mapMasterDiscoveryService[serviceName]; ok == false {
continue
}
}
})
nInfo := mapNodeInfo[nodeInfo.NodeId]
if nInfo == nil {
nInfo = &rpc.NodeInfo{}
nInfo.NodeId = nodeInfo.NodeId
nInfo.NodeName = nodeInfo.NodeName
nInfo.ListenAddr = nodeInfo.ListenAddr
mapNodeInfo[nodeInfo.NodeId] = nInfo
}
nInfo.PublicServiceList = append(nInfo.PublicServiceList, serviceName)
}
}
//忽略本地结点
if req.DelNodeId != int32(dc.localNodeId) && req.DelNodeId>0 {
willDelNodeId = append(willDelNodeId, int(req.DelNodeId))
}
//删除不必要的结点
for _,nodeId := range willDelNodeId {
dc.funDelService(nodeId,false)
}
//发现新结点
for _, nodeInfo := range req.NodeInfo {
//设置新结点
for _, nodeInfo := range mapNodeInfo {
dc.setNodeInfo(nodeInfo)
}
@@ -186,8 +227,8 @@ func (dc *DynamicDiscoveryClient) RPC_SubServiceDiscover(req *rpc.SubscribeDisco
}
func (dc *DynamicDiscoveryClient) isDiscoverNode(nodeId int) bool{
for i:=0;i< len(cluster.discoveryNodeList);i++{
if cluster.discoveryNodeList[i].NodeId == nodeId {
for i:=0;i< len(cluster.masterDiscoveryNodeList);i++{
if cluster.masterDiscoveryNodeList[i].NodeId == nodeId {
return true
}
}
@@ -196,7 +237,8 @@ func (dc *DynamicDiscoveryClient) isDiscoverNode(nodeId int) bool{
}
func (dc *DynamicDiscoveryClient) OnNodeConnected(nodeId int) {
if dc.isDiscoverNode(nodeId) == false {
nodeInfo := cluster.GetMasterDiscoveryNodeInfo(nodeId)
if nodeInfo == nil {
return
}
@@ -205,12 +247,14 @@ func (dc *DynamicDiscoveryClient) OnNodeConnected(nodeId int) {
req.NodeInfo.NodeId = int32(cluster.localNodeInfo.NodeId)
req.NodeInfo.NodeName = cluster.localNodeInfo.NodeName
req.NodeInfo.ListenAddr = cluster.localNodeInfo.ListenAddr
req.NodeInfo.PublicServiceList = cluster.localNodeInfo.PublicServiceList
//DiscoveryNode配置中没有配置NeighborService则同步当前结点所有服务
if len(nodeInfo.NeighborService)==0{
req.NodeInfo.PublicServiceList = cluster.localNodeInfo.PublicServiceList
}
//如果是连接发现主服成功,则同步服务信息
err := dc.AsyncCallNode(nodeId, DynamicDiscoveryMasterNameRpcMethod, &req, func(res *rpc.ServiceDiscoverRes, err error) {
//向Master服务同步本Node服务信息
err := dc.AsyncCallNode(nodeId, DynamicDiscoveryMasterNameRpcMethod, &req, func(res *rpc.Empty, err error) {
if err != nil {
cluster.DelNode(nodeId,true)
log.Error("call %s is fail :%s", DynamicDiscoveryMasterNameRpcMethod, err.Error())
return
}