mirror of
https://github.com/duanhf2012/origin.git
synced 2026-03-06 22:27:36 +08:00
优化筛选服务发现---支持对指定主master结点进行筛选
This commit is contained in:
@@ -20,6 +20,11 @@ const (
|
|||||||
Discard NodeStatus = 1 //丢弃
|
Discard NodeStatus = 1 //丢弃
|
||||||
)
|
)
|
||||||
|
|
||||||
|
type MasterDiscoveryService struct {
|
||||||
|
MasterNodeId int32 //要筛选的主结点Id,如果不配置或者配置成0,表示针对所有的主结点
|
||||||
|
DiscoveryService []string //只发现的服务列表
|
||||||
|
}
|
||||||
|
|
||||||
type NodeInfo struct {
|
type NodeInfo struct {
|
||||||
NodeId int
|
NodeId int
|
||||||
NodeName string
|
NodeName string
|
||||||
@@ -29,8 +34,7 @@ type NodeInfo struct {
|
|||||||
CompressBytesLen int //超过字节进行压缩的长度
|
CompressBytesLen int //超过字节进行压缩的长度
|
||||||
ServiceList []string //所有的有序服务列表
|
ServiceList []string //所有的有序服务列表
|
||||||
PublicServiceList []string //对外公开的服务列表
|
PublicServiceList []string //对外公开的服务列表
|
||||||
DiscoveryService []string //筛选发现的服务,如果不配置,不进行筛选
|
MasterDiscoveryService []MasterDiscoveryService //筛选发现的服务,如果不配置,不进行筛选
|
||||||
NeighborService []string
|
|
||||||
status NodeStatus
|
status NodeStatus
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -248,15 +248,6 @@ func (dc *DynamicDiscoveryClient) fullCompareDiffNode(masterNodeId int32, mapNod
|
|||||||
|
|
||||||
//订阅发现的服务通知
|
//订阅发现的服务通知
|
||||||
func (dc *DynamicDiscoveryClient) RPC_SubServiceDiscover(req *rpc.SubscribeDiscoverNotify) error {
|
func (dc *DynamicDiscoveryClient) RPC_SubServiceDiscover(req *rpc.SubscribeDiscoverNotify) error {
|
||||||
//整理当前master结点需要筛选的NeighborService
|
|
||||||
masterDiscoveryNodeInfo := cluster.GetMasterDiscoveryNodeInfo(int(req.MasterNodeId))
|
|
||||||
mapMasterDiscoveryService := map[string]struct{}{}
|
|
||||||
if masterDiscoveryNodeInfo != nil {
|
|
||||||
for i := 0; i < len(masterDiscoveryNodeInfo.NeighborService); i++ {
|
|
||||||
mapMasterDiscoveryService[masterDiscoveryNodeInfo.NeighborService[i]] = struct{}{}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
mapNodeInfo := map[int32]*rpc.NodeInfo{}
|
mapNodeInfo := map[int32]*rpc.NodeInfo{}
|
||||||
for _, nodeInfo := range req.NodeInfo {
|
for _, nodeInfo := range req.NodeInfo {
|
||||||
//不对本地结点或者不存在任何公开服务的结点
|
//不对本地结点或者不存在任何公开服务的结点
|
||||||
@@ -271,13 +262,6 @@ func (dc *DynamicDiscoveryClient) RPC_SubServiceDiscover(req *rpc.SubscribeDisco
|
|||||||
|
|
||||||
//遍历所有的公开服务,并筛选之
|
//遍历所有的公开服务,并筛选之
|
||||||
for _, serviceName := range nodeInfo.PublicServiceList {
|
for _, serviceName := range nodeInfo.PublicServiceList {
|
||||||
//只有存在配置时才做筛选
|
|
||||||
if len(mapMasterDiscoveryService) > 0 {
|
|
||||||
if _, ok := mapMasterDiscoveryService[serviceName]; ok == false {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
nInfo := mapNodeInfo[nodeInfo.NodeId]
|
nInfo := mapNodeInfo[nodeInfo.NodeId]
|
||||||
if nInfo == nil {
|
if nInfo == nil {
|
||||||
nInfo = &rpc.NodeInfo{}
|
nInfo = &rpc.NodeInfo{}
|
||||||
@@ -319,10 +303,8 @@ func (dc *DynamicDiscoveryClient) RPC_SubServiceDiscover(req *rpc.SubscribeDisco
|
|||||||
|
|
||||||
//设置新结点
|
//设置新结点
|
||||||
for _, nodeInfo := range mapNodeInfo {
|
for _, nodeInfo := range mapNodeInfo {
|
||||||
dc.addMasterNode(req.MasterNodeId, nodeInfo.NodeId)
|
bSet := dc.setNodeInfo(req.MasterNodeId,nodeInfo)
|
||||||
dc.setNodeInfo(nodeInfo)
|
if bSet == false {
|
||||||
|
|
||||||
if len(nodeInfo.PublicServiceList) == 0 {
|
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -358,13 +340,8 @@ func (dc *DynamicDiscoveryClient) regServiceDiscover(nodeId int){
|
|||||||
req.NodeInfo.NodeName = cluster.localNodeInfo.NodeName
|
req.NodeInfo.NodeName = cluster.localNodeInfo.NodeName
|
||||||
req.NodeInfo.ListenAddr = cluster.localNodeInfo.ListenAddr
|
req.NodeInfo.ListenAddr = cluster.localNodeInfo.ListenAddr
|
||||||
req.NodeInfo.MaxRpcParamLen = cluster.localNodeInfo.MaxRpcParamLen
|
req.NodeInfo.MaxRpcParamLen = cluster.localNodeInfo.MaxRpcParamLen
|
||||||
|
req.NodeInfo.PublicServiceList = cluster.localNodeInfo.PublicServiceList
|
||||||
|
|
||||||
//MasterDiscoveryNode配置中没有配置NeighborService,则同步当前结点所有服务
|
|
||||||
if len(nodeInfo.NeighborService) == 0 {
|
|
||||||
req.NodeInfo.PublicServiceList = cluster.localNodeInfo.PublicServiceList
|
|
||||||
} else {
|
|
||||||
req.NodeInfo.PublicServiceList = append(req.NodeInfo.PublicServiceList, DynamicDiscoveryClientName)
|
|
||||||
}
|
|
||||||
|
|
||||||
//向Master服务同步本Node服务信息
|
//向Master服务同步本Node服务信息
|
||||||
err := dc.AsyncCallNode(nodeId, RegServiceDiscover, &req, func(res *rpc.Empty, err error) {
|
err := dc.AsyncCallNode(nodeId, RegServiceDiscover, &req, func(res *rpc.Empty, err error) {
|
||||||
@@ -382,37 +359,53 @@ func (dc *DynamicDiscoveryClient) regServiceDiscover(nodeId int){
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (dc *DynamicDiscoveryClient) setNodeInfo(nodeInfo *rpc.NodeInfo) {
|
func (dc *DynamicDiscoveryClient) canDiscoveryService(fromMasterNodeId int32,serviceName string) bool{
|
||||||
if nodeInfo == nil || nodeInfo.Private == true || int(nodeInfo.NodeId) == dc.localNodeId {
|
canDiscovery := true
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
//筛选关注的服务
|
for i:=0;i<len(cluster.GetLocalNodeInfo().MasterDiscoveryService);i++{
|
||||||
localNodeInfo := cluster.GetLocalNodeInfo()
|
masterNodeId := cluster.GetLocalNodeInfo().MasterDiscoveryService[i].MasterNodeId
|
||||||
if len(localNodeInfo.DiscoveryService) > 0 {
|
|
||||||
var discoverServiceSlice = make([]string, 0, 24)
|
if masterNodeId == fromMasterNodeId || masterNodeId == 0 {
|
||||||
for _, pubService := range nodeInfo.PublicServiceList {
|
canDiscovery = false
|
||||||
for _, discoverService := range localNodeInfo.DiscoveryService {
|
|
||||||
if pubService == discoverService {
|
for _,discoveryService := range cluster.GetLocalNodeInfo().MasterDiscoveryService[i].DiscoveryService {
|
||||||
discoverServiceSlice = append(discoverServiceSlice, pubService)
|
if discoveryService == serviceName {
|
||||||
|
return true
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
nodeInfo.PublicServiceList = discoverServiceSlice
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(nodeInfo.PublicServiceList) == 0 {
|
return canDiscovery
|
||||||
return
|
}
|
||||||
|
|
||||||
|
func (dc *DynamicDiscoveryClient) setNodeInfo(masterNodeId int32,nodeInfo *rpc.NodeInfo) bool{
|
||||||
|
if nodeInfo == nil || nodeInfo.Private == true || int(nodeInfo.NodeId) == dc.localNodeId {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
//筛选关注的服务
|
||||||
|
var discoverServiceSlice = make([]string, 0, 24)
|
||||||
|
for _, pubService := range nodeInfo.PublicServiceList {
|
||||||
|
if dc.canDiscoveryService(masterNodeId,pubService) == true {
|
||||||
|
discoverServiceSlice = append(discoverServiceSlice,pubService)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(discoverServiceSlice) == 0 {
|
||||||
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
var nInfo NodeInfo
|
var nInfo NodeInfo
|
||||||
nInfo.ServiceList = nodeInfo.PublicServiceList
|
nInfo.ServiceList = discoverServiceSlice
|
||||||
nInfo.PublicServiceList = nodeInfo.PublicServiceList
|
nInfo.PublicServiceList = discoverServiceSlice
|
||||||
nInfo.NodeId = int(nodeInfo.NodeId)
|
nInfo.NodeId = int(nodeInfo.NodeId)
|
||||||
nInfo.NodeName = nodeInfo.NodeName
|
nInfo.NodeName = nodeInfo.NodeName
|
||||||
nInfo.ListenAddr = nodeInfo.ListenAddr
|
nInfo.ListenAddr = nodeInfo.ListenAddr
|
||||||
nInfo.MaxRpcParamLen = nodeInfo.MaxRpcParamLen
|
nInfo.MaxRpcParamLen = nodeInfo.MaxRpcParamLen
|
||||||
dc.funSetService(&nInfo)
|
dc.funSetService(&nInfo)
|
||||||
|
|
||||||
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
func (dc *DynamicDiscoveryClient) OnNodeDisconnect(nodeId int) {
|
func (dc *DynamicDiscoveryClient) OnNodeDisconnect(nodeId int) {
|
||||||
|
|||||||
Reference in New Issue
Block a user