优化origin默认服务发现

This commit is contained in:
boyce
2024-04-25 16:36:10 +08:00
parent bba5eb2929
commit c6ade7d3e1

View File

@@ -35,9 +35,10 @@ type OriginDiscoveryClient struct {
funSetNode FunSetNode
localNodeId string
mapDiscovery map[string]map[string]struct{} //map[masterNodeId]map[nodeId]struct{}
mapMasterNetwork map[string]string
mapDiscovery map[string]map[string][]string //map[masterNodeId]map[nodeId]struct{}
//mapMasterNetwork map[string]string
bRetire bool
isRegisterOk bool
}
var masterService OriginDiscoveryMaster
@@ -98,6 +99,7 @@ func (ds *OriginDiscoveryMaster) removeNodeInfo(nodeId string) {
}
}
ds.nsTTL.removeNode(nodeId)
delete(ds.mapNodeInfo,nodeId)
}
@@ -187,12 +189,13 @@ func (ds *OriginDiscoveryMaster) RpcCastGo(serviceMethod string, args interface{
}
func (ds *OriginDiscoveryMaster) RPC_Ping(req *rpc.Ping, res *rpc.Pong) error {
log.Debug("ping",log.String("nodeId",req.NodeId))
if ds.isRegNode(req.NodeId) == false{
res.Ok = false
return nil
}
return nil
//return nil
res.Ok = true
ds.nsTTL.addAndRefreshNode(req.NodeId)
return nil
@@ -256,22 +259,38 @@ func (ds *OriginDiscoveryMaster) RPC_RegServiceDiscover(req *rpc.RegServiceDisco
return nil
}
func (ds *OriginDiscoveryMaster) RPC_UnRegServiceDiscover(req *rpc.UnRegServiceDiscoverReq, res *rpc.Empty) error {
log.Debug("RPC_UnRegServiceDiscover",log.String("nodeId",req.NodeId))
ds.OnNodeDisconnect(req.NodeId)
return nil
}
func (dc *OriginDiscoveryClient) OnInit() error {
dc.RegNodeConnListener(dc)
dc.RegNatsConnListener(dc)
dc.mapDiscovery = map[string]map[string]struct{}{}
dc.mapMasterNetwork = map[string]string{}
dc.mapDiscovery = map[string]map[string][]string{}
//dc.mapMasterNetwork = map[string]string{}
return nil
}
func (dc *OriginDiscoveryClient) addMasterNode(masterNodeId string, nodeId string) {
func (dc *OriginDiscoveryClient) addMasterNode(masterNodeId string, nodeId string,serviceList []string) {
_, ok := dc.mapDiscovery[masterNodeId]
if ok == false {
dc.mapDiscovery[masterNodeId] = map[string]struct{}{}
dc.mapDiscovery[masterNodeId] = map[string][]string{}
}
dc.mapDiscovery[masterNodeId][nodeId] = struct{}{}
dc.mapDiscovery[masterNodeId][nodeId] = serviceList
}
func (dc *OriginDiscoveryClient) getNodePublicService(masterNodeId string,nodeId string) []string{
mapNodeId, ok := dc.mapDiscovery[masterNodeId]
if ok == false {
return nil
}
publicService := mapNodeId[nodeId]
return publicService
}
func (dc *OriginDiscoveryClient) removeMasterNode(masterNodeId string, nodeId string) {
@@ -295,10 +314,6 @@ func (dc *OriginDiscoveryClient) findNodeId(nodeId string) bool {
}
func (dc *OriginDiscoveryClient) ping(){
if cluster.IsNatsMode() == false {
return
}
interval := time.Duration(cluster.GetOriginDiscovery().TTLSecond)*time.Second
interval = interval /3
if interval < time.Second {
@@ -306,6 +321,9 @@ func (dc *OriginDiscoveryClient) ping(){
}
dc.NewTicker(interval,func(t *timer.Ticker){
if cluster.IsNatsMode() == false || dc.isRegisterOk == false{
return
}
var ping rpc.Ping
ping.NodeId = cluster.GetLocalNodeInfo().NodeId
masterNodes := GetCluster().GetOriginDiscovery().MasterNodeList
@@ -315,7 +333,7 @@ func (dc *OriginDiscoveryClient) ping(){
}
masterNodeId := masterNodes[i].NodeId
dc.AsyncCallNode(masterNodeId,RpcPingMethod,&ping, func(empty *rpc.Pong,err error) {
dc.AsyncCallNodeWithTimeout(3*time.Second,masterNodeId,RpcPingMethod,&ping, func(empty *rpc.Pong,err error) {
if err == nil && empty.Ok == false{
//断开master重
dc.regServiceDiscover(masterNodeId)
@@ -349,7 +367,7 @@ func (dc *OriginDiscoveryClient) fullCompareDiffNode(masterNodeId string, mapNod
}
diffNodeIdSlice := make([]string, 0, len(mapNodeInfo))
mapNodeId := map[string]struct{}{}
mapNodeId := map[string][]string{}
mapNodeId, ok := dc.mapDiscovery[masterNodeId]
if ok == false {
return nil
@@ -368,6 +386,7 @@ func (dc *OriginDiscoveryClient) fullCompareDiffNode(masterNodeId string, mapNod
//订阅发现的服务通知
func (dc *OriginDiscoveryClient) RPC_SubServiceDiscover(req *rpc.SubscribeDiscoverNotify) error {
log.Debug("RPC_SubServiceDiscover",log.String("masterNodeId",req.MasterNodeId),log.String("delNodeId",req.GetDelNodeId()))
mapNodeInfo := map[string]*rpc.NodeInfo{}
for _, nodeInfo := range req.NodeInfo {
//不对本地结点或者不存在任何公开服务的结点
@@ -414,15 +433,14 @@ func (dc *OriginDiscoveryClient) RPC_SubServiceDiscover(req *rpc.SubscribeDiscov
//删除不必要的结点
for _, nodeId := range willDelNodeId {
cluster.TriggerDiscoveryEvent(false,nodeId,nil)
cluster.TriggerDiscoveryEvent(false,nodeId,dc.getNodePublicService(req.MasterNodeId, nodeId))
dc.funDelNode(nodeId, true)
dc.removeMasterNode(req.MasterNodeId, nodeId)
if dc.findNodeId(nodeId) == false {
dc.funDelNode(nodeId, false)
}
}
//设置新结点
for _, nodeInfo := range mapNodeInfo {
dc.addMasterNode(req.MasterNodeId, nodeInfo.NodeId,nodeInfo.PublicServiceList)
bSet := dc.setNodeInfo(req.MasterNodeId,nodeInfo)
if bSet == false {
continue
@@ -439,6 +457,8 @@ func (dc *OriginDiscoveryClient) OnNodeConnected(nodeId string) {
}
func (dc *OriginDiscoveryClient) OnRelease(){
log.Debug("OriginDiscoveryClient")
//取消注册
var nodeRetireReq rpc.UnRegServiceDiscoverReq
nodeRetireReq.NodeId = cluster.GetLocalNodeInfo().NodeId
@@ -510,6 +530,7 @@ func (dc *OriginDiscoveryClient) regServiceDiscover(nodeId string){
return
}
dc.isRegisterOk = true
dc.RPC_SubServiceDiscover(res)
})