新增retire命令参数用于将置于服务退休状态,使得服务软关闭

This commit is contained in:
duanhf2012
2023-10-09 15:27:01 +08:00
parent ba019ac466
commit c0971a46a7
16 changed files with 504 additions and 203 deletions

View File

@@ -36,6 +36,7 @@ type NodeInfo struct {
PublicServiceList []string //对外公开的服务列表
MasterDiscoveryService []MasterDiscoveryService //筛选发现的服务,如果不配置,不进行筛选
status NodeStatus
Retire bool
}
type NodeRpcInfo struct {
@@ -55,8 +56,8 @@ type Cluster struct {
locker sync.RWMutex //结点与服务关系保护锁
mapRpc map[int]NodeRpcInfo //nodeId
mapIdNode map[int]NodeInfo //map[NodeId]NodeInfo
mapRpc map[int]*NodeRpcInfo //nodeId
//mapIdNode map[int]NodeInfo //map[NodeId]NodeInfo
mapServiceNode map[string]map[int]struct{} //map[serviceName]map[NodeId]
rpcServer rpc.Server
@@ -87,10 +88,11 @@ func (cls *Cluster) Stop() {
func (cls *Cluster) DiscardNode(nodeId int) {
cls.locker.Lock()
nodeInfo, ok := cls.mapIdNode[nodeId]
nodeInfo, ok := cls.mapRpc[nodeId]
bDel := (ok == true) && nodeInfo.nodeInfo.status == Discard
cls.locker.Unlock()
if ok == true && nodeInfo.status == Discard {
if bDel {
cls.DelNode(nodeId, true)
}
}
@@ -103,39 +105,30 @@ func (cls *Cluster) DelNode(nodeId int, immediately bool) {
cls.locker.Lock()
defer cls.locker.Unlock()
nodeInfo, ok := cls.mapIdNode[nodeId]
rpc, ok := cls.mapRpc[nodeId]
if ok == false {
return
}
rpc, ok := cls.mapRpc[nodeId]
for {
//立即删除
if immediately || ok == false {
break
}
if immediately ==false {
//正在连接中不主动断开,只断开没有连接中的
if rpc.client.IsConnected() {
nodeInfo.status = Discard
log.Info("Discard node",log.Int("nodeId",nodeInfo.NodeId),log.String("ListenAddr", nodeInfo.ListenAddr))
rpc.nodeInfo.status = Discard
log.Info("Discard node",log.Int("nodeId",rpc.nodeInfo.NodeId),log.String("ListenAddr", rpc.nodeInfo.ListenAddr))
return
}
break
}
for _, serviceName := range nodeInfo.ServiceList {
for _, serviceName := range rpc.nodeInfo.ServiceList {
cls.delServiceNode(serviceName, nodeId)
}
delete(cls.mapIdNode, nodeId)
delete(cls.mapRpc, nodeId)
if ok == true {
rpc.client.Close(false)
}
log.Info("remove node ",log.Int("NodeId", nodeInfo.NodeId),log.String("ListenAddr", nodeInfo.ListenAddr))
log.Info("remove node ",log.Int("NodeId", rpc.nodeInfo.NodeId),log.String("ListenAddr", rpc.nodeInfo.ListenAddr))
}
func (cls *Cluster) serviceDiscoveryDelNode(nodeId int, immediately bool) {
@@ -168,9 +161,9 @@ func (cls *Cluster) serviceDiscoverySetNodeInfo(nodeInfo *NodeInfo) {
defer cls.locker.Unlock()
//先清一次的NodeId对应的所有服务清理
lastNodeInfo, ok := cls.mapIdNode[nodeInfo.NodeId]
lastNodeInfo, ok := cls.mapRpc[nodeInfo.NodeId]
if ok == true {
for _, serviceName := range lastNodeInfo.ServiceList {
for _, serviceName := range lastNodeInfo.nodeInfo.ServiceList {
cls.delServiceNode(serviceName, nodeInfo.NodeId)
}
}
@@ -189,27 +182,22 @@ func (cls *Cluster) serviceDiscoverySetNodeInfo(nodeInfo *NodeInfo) {
}
cls.mapServiceNode[serviceName][nodeInfo.NodeId] = struct{}{}
}
cls.mapIdNode[nodeInfo.NodeId] = *nodeInfo
log.Info("Discovery nodeId",log.Int("NodeId", nodeInfo.NodeId),log.Any("services:", nodeInfo.PublicServiceList))
//已经存在连接,则不需要进行设置
if _, rpcInfoOK := cls.mapRpc[nodeInfo.NodeId]; rpcInfoOK == true {
if lastNodeInfo != nil {
log.Info("Discovery nodeId",log.Int("NodeId", nodeInfo.NodeId),log.Any("services:", nodeInfo.PublicServiceList),log.Bool("Retire",nodeInfo.Retire))
lastNodeInfo.nodeInfo = *nodeInfo
return
}
//不存在时,则建立连接
rpcInfo := NodeRpcInfo{}
rpcInfo.nodeInfo = *nodeInfo
rpcInfo.client =rpc.NewRClient(nodeInfo.NodeId, nodeInfo.ListenAddr, nodeInfo.MaxRpcParamLen,cls.localNodeInfo.CompressBytesLen,cls.triggerRpcEvent)
cls.mapRpc[nodeInfo.NodeId] = rpcInfo
cls.mapRpc[nodeInfo.NodeId] = &rpcInfo
log.Info("Discovery nodeId and new rpc client",log.Int("NodeId", nodeInfo.NodeId),log.Any("services:", nodeInfo.PublicServiceList),log.Bool("Retire",nodeInfo.Retire),log.String("nodeListenAddr",nodeInfo.ListenAddr))
}
func (cls *Cluster) buildLocalRpc() {
rpcInfo := NodeRpcInfo{}
rpcInfo.nodeInfo = cls.localNodeInfo
rpcInfo.client = rpc.NewLClient(rpcInfo.nodeInfo.NodeId)
cls.mapRpc[cls.localNodeInfo.NodeId] = rpcInfo
}
func (cls *Cluster) Init(localNodeId int, setupServiceFun SetupServiceFun) error {
//1.初始化配置
@@ -219,7 +207,6 @@ func (cls *Cluster) Init(localNodeId int, setupServiceFun SetupServiceFun) error
}
cls.rpcServer.Init(cls)
cls.buildLocalRpc()
//2.安装服务发现结点
cls.SetupServiceDiscovery(localNodeId, setupServiceFun)
@@ -313,27 +300,33 @@ func (cls *Cluster) FindRpcHandler(serviceName string) rpc.IRpcHandler {
return pService.GetRpcHandler()
}
func (cls *Cluster) getRpcClient(nodeId int) *rpc.Client {
func (cls *Cluster) getRpcClient(nodeId int) (*rpc.Client,bool) {
c, ok := cls.mapRpc[nodeId]
if ok == false {
return nil
return nil,false
}
return c.client
return c.client,c.nodeInfo.Retire
}
func (cls *Cluster) GetRpcClient(nodeId int) *rpc.Client {
func (cls *Cluster) GetRpcClient(nodeId int) (*rpc.Client,bool) {
cls.locker.RLock()
defer cls.locker.RUnlock()
return cls.getRpcClient(nodeId)
}
func GetRpcClient(nodeId int, serviceMethod string, clientList []*rpc.Client) (error, int) {
func GetRpcClient(nodeId int, serviceMethod string,filterRetire bool, clientList []*rpc.Client) (error, int) {
if nodeId > 0 {
pClient := GetCluster().GetRpcClient(nodeId)
pClient,retire := GetCluster().GetRpcClient(nodeId)
if pClient == nil {
return fmt.Errorf("cannot find nodeid %d!", nodeId), 0
}
//如果需要筛选掉退休结点
if filterRetire == true && retire == true {
return fmt.Errorf("cannot find nodeid %d!", nodeId), 0
}
clientList[0] = pClient
return nil, 1
}
@@ -345,7 +338,7 @@ func GetRpcClient(nodeId int, serviceMethod string, clientList []*rpc.Client) (e
serviceName := serviceMethod[:findIndex]
//1.找到对应的rpcNodeid
return GetCluster().GetNodeIdByService(serviceName, clientList, true)
return GetCluster().GetNodeIdByService(serviceName, clientList, filterRetire)
}
func GetRpcServer() *rpc.Server {
@@ -353,7 +346,7 @@ func GetRpcServer() *rpc.Server {
}
func (cls *Cluster) IsNodeConnected(nodeId int) bool {
pClient := cls.GetRpcClient(nodeId)
pClient,_ := cls.GetRpcClient(nodeId)
return pClient != nil && pClient.IsConnected()
}
@@ -475,11 +468,3 @@ func (cls *Cluster) GetGlobalCfg() interface{} {
return cls.globalCfg
}
func (cls *Cluster) GetNodeInfo(nodeId int) (NodeInfo,bool) {
cls.locker.RLock()
defer cls.locker.RUnlock()
nodeInfo,ok:= cls.mapIdNode[nodeId]
return nodeInfo,ok
}

View File

@@ -7,6 +7,7 @@ import (
"github.com/duanhf2012/origin/service"
"time"
"github.com/duanhf2012/origin/util/timer"
"google.golang.org/protobuf/proto"
)
const DynamicDiscoveryMasterName = "DiscoveryMaster"
@@ -14,6 +15,7 @@ const DynamicDiscoveryClientName = "DiscoveryClient"
const RegServiceDiscover = DynamicDiscoveryMasterName + ".RPC_RegServiceDiscover"
const SubServiceDiscover = DynamicDiscoveryClientName + ".RPC_SubServiceDiscover"
const AddSubServiceDiscover = DynamicDiscoveryMasterName + ".RPC_AddSubServiceDiscover"
const NodeRetireRpcMethod = DynamicDiscoveryMasterName+".RPC_NodeRetire"
type DynamicDiscoveryMaster struct {
service.Service
@@ -30,6 +32,7 @@ type DynamicDiscoveryClient struct {
localNodeId int
mapDiscovery map[int32]map[int32]struct{} //map[masterNodeId]map[nodeId]struct{}
bRetire bool
}
var masterService DynamicDiscoveryMaster
@@ -49,16 +52,32 @@ func (ds *DynamicDiscoveryMaster) isRegNode(nodeId int32) bool {
return ok
}
func (ds *DynamicDiscoveryMaster) addNodeInfo(nodeInfo *rpc.NodeInfo) {
if len(nodeInfo.PublicServiceList) == 0 {
func (ds *DynamicDiscoveryMaster) updateNodeInfo(nInfo *rpc.NodeInfo) {
if _,ok:= ds.mapNodeInfo[nInfo.NodeId];ok == false {
return
}
_, ok := ds.mapNodeInfo[nodeInfo.NodeId]
nodeInfo := proto.Clone(nInfo).(*rpc.NodeInfo)
for i:=0;i<len(ds.nodeInfo);i++ {
if ds.nodeInfo[i].NodeId == nodeInfo.NodeId {
ds.nodeInfo[i] = nodeInfo
break
}
}
}
func (ds *DynamicDiscoveryMaster) addNodeInfo(nInfo *rpc.NodeInfo) {
if len(nInfo.PublicServiceList) == 0 {
return
}
_, ok := ds.mapNodeInfo[nInfo.NodeId]
if ok == true {
return
}
ds.mapNodeInfo[nodeInfo.NodeId] = struct{}{}
ds.mapNodeInfo[nInfo.NodeId] = struct{}{}
nodeInfo := proto.Clone(nInfo).(*rpc.NodeInfo)
ds.nodeInfo = append(ds.nodeInfo, nodeInfo)
}
@@ -87,16 +106,14 @@ func (ds *DynamicDiscoveryMaster) OnInit() error {
func (ds *DynamicDiscoveryMaster) OnStart() {
var nodeInfo rpc.NodeInfo
localNodeInfo := cluster.GetLocalNodeInfo()
if localNodeInfo.Private == true {
return
}
nodeInfo.NodeId = int32(localNodeInfo.NodeId)
nodeInfo.NodeName = localNodeInfo.NodeName
nodeInfo.ListenAddr = localNodeInfo.ListenAddr
nodeInfo.PublicServiceList = localNodeInfo.PublicServiceList
nodeInfo.MaxRpcParamLen = localNodeInfo.MaxRpcParamLen
nodeInfo.Private = localNodeInfo.Private
nodeInfo.Retire = localNodeInfo.Retire
ds.addNodeInfo(&nodeInfo)
}
@@ -138,6 +155,19 @@ func (ds *DynamicDiscoveryMaster) RpcCastGo(serviceMethod string, args interface
}
}
func (ds *DynamicDiscoveryMaster) RPC_NodeRetire(req *rpc.NodeRetireReq, res *rpc.Empty) error {
log.Info("node is retire",log.Int32("nodeId",req.NodeInfo.NodeId),log.Bool("retire",req.NodeInfo.Retire))
ds.updateNodeInfo(req.NodeInfo)
var notifyDiscover rpc.SubscribeDiscoverNotify
notifyDiscover.MasterNodeId = int32(cluster.GetLocalNodeInfo().NodeId)
notifyDiscover.NodeInfo = append(notifyDiscover.NodeInfo, req.NodeInfo)
ds.RpcCastGo(SubServiceDiscover, &notifyDiscover)
return nil
}
// 收到注册过来的结点
func (ds *DynamicDiscoveryMaster) RPC_RegServiceDiscover(req *rpc.ServiceDiscoverReq, res *rpc.Empty) error {
if req.NodeInfo == nil {
@@ -165,6 +195,8 @@ func (ds *DynamicDiscoveryMaster) RPC_RegServiceDiscover(req *rpc.ServiceDiscove
nodeInfo.PublicServiceList = req.NodeInfo.PublicServiceList
nodeInfo.ListenAddr = req.NodeInfo.ListenAddr
nodeInfo.MaxRpcParamLen = req.NodeInfo.MaxRpcParamLen
nodeInfo.Retire = req.NodeInfo.Retire
//主动删除已经存在的结点,确保先断开,再连接
cluster.serviceDiscoveryDelNode(nodeInfo.NodeId, true)
@@ -269,6 +301,9 @@ func (dc *DynamicDiscoveryClient) RPC_SubServiceDiscover(req *rpc.SubscribeDisco
nInfo.NodeName = nodeInfo.NodeName
nInfo.ListenAddr = nodeInfo.ListenAddr
nInfo.MaxRpcParamLen = nodeInfo.MaxRpcParamLen
nInfo.Retire = nodeInfo.Retire
nInfo.Private = nodeInfo.Private
mapNodeInfo[nodeInfo.NodeId] = nInfo
}
@@ -278,7 +313,6 @@ func (dc *DynamicDiscoveryClient) RPC_SubServiceDiscover(req *rpc.SubscribeDisco
//如果为完整同步,则找出差异的结点
var willDelNodeId []int32
//如果不是邻居结点,则做筛选
if req.IsFull == true {
diffNode := dc.fullCompareDiffNode(req.MasterNodeId, mapNodeInfo)
if len(diffNode) > 0 {
@@ -293,8 +327,7 @@ func (dc *DynamicDiscoveryClient) RPC_SubServiceDiscover(req *rpc.SubscribeDisco
//删除不必要的结点
for _, nodeId := range willDelNodeId {
nodeInfo,_ := cluster.GetNodeInfo(int(nodeId))
cluster.TriggerDiscoveryEvent(false,int(nodeId),nodeInfo.PublicServiceList)
cluster.TriggerDiscoveryEvent(false,int(nodeId),nil)
dc.removeMasterNode(req.MasterNodeId, int32(nodeId))
if dc.findNodeId(nodeId) == false {
dc.funDelService(int(nodeId), false)
@@ -328,6 +361,29 @@ func (dc *DynamicDiscoveryClient) OnNodeConnected(nodeId int) {
dc.regServiceDiscover(nodeId)
}
func (dc *DynamicDiscoveryClient) OnRetire(){
dc.bRetire = true
masterNodeList := cluster.GetDiscoveryNodeList()
for i:=0;i<len(masterNodeList);i++{
var nodeRetireReq rpc.NodeRetireReq
nodeRetireReq.NodeInfo = &rpc.NodeInfo{}
nodeRetireReq.NodeInfo.NodeId = int32(cluster.localNodeInfo.NodeId)
nodeRetireReq.NodeInfo.NodeName = cluster.localNodeInfo.NodeName
nodeRetireReq.NodeInfo.ListenAddr = cluster.localNodeInfo.ListenAddr
nodeRetireReq.NodeInfo.MaxRpcParamLen = cluster.localNodeInfo.MaxRpcParamLen
nodeRetireReq.NodeInfo.PublicServiceList = cluster.localNodeInfo.PublicServiceList
nodeRetireReq.NodeInfo.Retire = dc.bRetire
nodeRetireReq.NodeInfo.Private = cluster.localNodeInfo.Private
err := dc.GoNode(int(masterNodeList[i].NodeId),NodeRetireRpcMethod,&nodeRetireReq)
if err!= nil {
log.Error("call "+NodeRetireRpcMethod+" is fail",log.ErrorAttr("err",err))
}
}
}
func (dc *DynamicDiscoveryClient) regServiceDiscover(nodeId int){
nodeInfo := cluster.GetMasterDiscoveryNodeInfo(nodeId)
if nodeInfo == nil {
@@ -341,7 +397,8 @@ func (dc *DynamicDiscoveryClient) regServiceDiscover(nodeId int){
req.NodeInfo.ListenAddr = cluster.localNodeInfo.ListenAddr
req.NodeInfo.MaxRpcParamLen = cluster.localNodeInfo.MaxRpcParamLen
req.NodeInfo.PublicServiceList = cluster.localNodeInfo.PublicServiceList
req.NodeInfo.Retire = dc.bRetire
req.NodeInfo.Private = cluster.localNodeInfo.Private
//向Master服务同步本Node服务信息
err := dc.AsyncCallNode(nodeId, RegServiceDiscover, &req, func(res *rpc.Empty, err error) {
@@ -403,6 +460,9 @@ func (dc *DynamicDiscoveryClient) setNodeInfo(masterNodeId int32,nodeInfo *rpc.N
nInfo.NodeName = nodeInfo.NodeName
nInfo.ListenAddr = nodeInfo.ListenAddr
nInfo.MaxRpcParamLen = nodeInfo.MaxRpcParamLen
nInfo.Retire = nodeInfo.Retire
nInfo.Private = nodeInfo.Private
dc.funSetService(&nInfo)
return true

View File

@@ -199,7 +199,11 @@ func (cls *Cluster) readLocalService(localNodeId int) error {
}
func (cls *Cluster) parseLocalCfg() {
cls.mapIdNode[cls.localNodeInfo.NodeId] = cls.localNodeInfo
rpcInfo := NodeRpcInfo{}
rpcInfo.nodeInfo = cls.localNodeInfo
rpcInfo.client = rpc.NewLClient(rpcInfo.nodeInfo.NodeId)
cls.mapRpc[cls.localNodeInfo.NodeId] = &rpcInfo
for _, sName := range cls.localNodeInfo.ServiceList {
if _, ok := cls.mapServiceNode[sName]; ok == false {
@@ -225,8 +229,7 @@ func (cls *Cluster) checkDiscoveryNodeList(discoverMasterNode []NodeInfo) bool {
func (cls *Cluster) InitCfg(localNodeId int) error {
cls.localServiceCfg = map[string]interface{}{}
cls.mapRpc = map[int]NodeRpcInfo{}
cls.mapIdNode = map[int]NodeInfo{}
cls.mapRpc = map[int]*NodeRpcInfo{}
cls.mapServiceNode = map[string]map[int]struct{}{}
//加载本地结点的NodeList配置
@@ -263,17 +266,24 @@ func (cls *Cluster) IsConfigService(serviceName string) bool {
return ok
}
func (cls *Cluster) GetNodeIdByService(serviceName string, rpcClientList []*rpc.Client, bAll bool) (error, int) {
func (cls *Cluster) GetNodeIdByService(serviceName string, rpcClientList []*rpc.Client, filterRetire bool) (error, int) {
cls.locker.RLock()
defer cls.locker.RUnlock()
mapNodeId, ok := cls.mapServiceNode[serviceName]
count := 0
if ok == true {
for nodeId, _ := range mapNodeId {
pClient := GetCluster().getRpcClient(nodeId)
if pClient == nil || (bAll == false && pClient.IsConnected() == false) {
pClient,retire := GetCluster().getRpcClient(nodeId)
if pClient == nil || pClient.IsConnected() == false {
continue
}
//如果需要筛选掉退休的对retire状态的结点略过
if filterRetire == true && retire == true {
continue
}
rpcClientList[count] = pClient
count++
if count >= cap(rpcClientList) {