Files
origin/cluster/cluster.go
2025-12-12 10:52:08 +08:00

544 lines
14 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package cluster
import (
"errors"
"fmt"
"reflect"
"strings"
"sync"
"regexp"
"github.com/duanhf2012/origin/v2/event"
"github.com/duanhf2012/origin/v2/log"
"github.com/duanhf2012/origin/v2/rpc"
"github.com/duanhf2012/origin/v2/service"
)
// configDir holds the configuration directory path. It defaults to
// "./config/" and is replaced by SetConfigDir.
var configDir = "./config/"
// SetupServiceFun is a callback that receives zero or more services.
// NOTE(review): presumably used to install services on the node; Init hands
// it to setupDiscovery — confirm against that function.
type SetupServiceFun func(s ...service.IService)
// NodeStatus is the state recorded for a node in NodeInfo.status.
type NodeStatus int
const (
Normal NodeStatus = 0 // normal
Discard NodeStatus = 1 // discarded; DiscardNode only removes nodes in this state
)
// AllowDiscovery describes one rule of network services that are allowed to
// be discovered. CanDiscoveryService evaluates every field as a regular
// expression pattern.
type AllowDiscovery struct {
MasterNodeId string // supports regular expressions
NetworkName string // supports regular expressions
NodeIdList []string // supports regular expressions
ServiceList []string // each entry is matched as a regular expression by CanDiscoveryService
}
// NodeInfo is the configuration and state kept for a single node.
type NodeInfo struct {
NodeId string
Private bool
ListenAddr string
MaxRpcParamLen uint32 // maximum RPC parameter length
CompressBytesLen int // payloads longer than this many bytes are compressed
ServiceList []string // all services, in order
PublicServiceList []string // services exposed to other nodes
AllowDiscovery []AllowDiscovery // network services that are allowed to be discovered
status NodeStatus
Retire bool
}
// NodeRpcInfo pairs the information known about a node with the RPC client
// used to reach it.
type NodeRpcInfo struct {
nodeInfo NodeInfo
client *rpc.Client
}
// cluster is the package-level Cluster instance returned by GetCluster and
// used directly by the package-level helper functions.
var cluster Cluster
// Cluster holds the local node's configuration together with the tables that
// map remote nodes to RPC clients and services to nodes.
type Cluster struct {
localNodeInfo NodeInfo // configuration of the local node
discoveryInfo DiscoveryInfo // service discovery configuration
rpcMode RpcMode
globalCfg interface{} // global configuration
localServiceCfg map[string]interface{} // map[serviceName] -> configuration data
serviceDiscovery IServiceDiscovery // service discovery interface
locker sync.RWMutex // guards the node/service relationship tables below
mapRpc map[string]*NodeRpcInfo // keyed by nodeId
mapServiceNode map[string]map[string]struct{} // map[serviceName] -> set of nodeId
mapTemplateServiceNode map[string]map[string]struct{} // map[templateServiceName] -> set of serviceName
callSet rpc.CallSet
rpcNats rpc.RpcNats
rpcServer rpc.IServer
rpcEventLocker sync.RWMutex // guards mapServiceListenRpcEvent
mapServiceListenRpcEvent map[string]struct{} // set of service names registered through RegRpcEvent
}
// GetCluster returns a pointer to the package-level Cluster instance.
func GetCluster() *Cluster {
return &cluster
}
// SetConfigDir replaces the package-level configuration directory with cfgDir.
// The value is stored as given; no validation or normalization is performed.
func SetConfigDir(cfgDir string) {
configDir = cfgDir
}
// SetServiceDiscovery installs serviceDiscovery on the package-level cluster.
// Init later calls InitDiscovery on whatever implementation is stored here.
func SetServiceDiscovery(serviceDiscovery IServiceDiscovery) {
cluster.serviceDiscovery = serviceDiscovery
}
// Start starts the RPC server selected by Init and returns its error.
func (cls *Cluster) Start() error {
return cls.rpcServer.Start()
}
// Stop stops the RPC server selected by Init.
func (cls *Cluster) Stop() {
cls.rpcServer.Stop()
}
// DiscardNode removes the node identified by nodeId, but only when that node
// is known and its status is Discard. The status is inspected under
// cls.locker; the lock is released before DelNode is called because DelNode
// acquires the same lock itself.
func (cls *Cluster) DiscardNode(nodeId string) {
	cls.locker.Lock()
	rpcInfo, found := cls.mapRpc[nodeId]
	discarded := found && rpcInfo.nodeInfo.status == Discard
	cls.locker.Unlock()

	if !discarded {
		return
	}
	cls.DelNode(nodeId)
}
// DelNode removes the node identified by nodeId from the cluster tables.
//
// The origin master discovery node and the local node are never removed.
// For any other known node, a "not discovered" event is triggered for its
// services, every service -> node link is dropped, the node is deleted from
// mapRpc and its RPC client is closed. Unknown node ids are ignored.
func (cls *Cluster) DelNode(nodeId string) {
	// Master discovery nodes and the local node are kept.
	if cls.IsOriginMasterDiscoveryNode(nodeId) || nodeId == cls.localNodeInfo.NodeId {
		return
	}

	cls.locker.Lock()
	defer cls.locker.Unlock()

	target, found := cls.mapRpc[nodeId]
	if !found {
		return
	}

	info := &target.nodeInfo
	cls.TriggerDiscoveryEvent(false, nodeId, info.ServiceList)
	for _, name := range info.ServiceList {
		cls.delServiceNode(name, nodeId)
	}

	delete(cls.mapRpc, nodeId)
	target.client.Close(false)

	log.Info("remove node ", log.String("NodeId", info.NodeId), log.String("ListenAddr", info.ListenAddr))
}
// serviceDiscoveryDelNode is the node-removal callback that Init passes to
// the service discovery implementation; it simply forwards to DelNode.
func (cls *Cluster) serviceDiscoveryDelNode(nodeId string) {
cls.DelNode(nodeId)
}
// delServiceNode removes nodeId from the set of nodes that provide
// serviceName and drops index entries that become empty as a result.
//
// serviceName may have the form "name:template"; "name" is then the real
// service name and "template" the template service it belongs to. The
// template -> service link is removed only once no node provides the service
// any more, so a service that is still hosted by another node remains
// reachable through its template name.
//
// Entries for the local node are never removed. The method does no locking
// itself; the callers in this file invoke it with cls.locker held.
func (cls *Cluster) delServiceNode(serviceName string, nodeId string) {
	if nodeId == cls.localNodeInfo.NodeId {
		return
	}

	// Split a template service name into real service name and template name.
	isTemplate := false
	templateServiceName := ""
	if parts := strings.Split(serviceName, ":"); len(parts) == 2 {
		isTemplate = true
		serviceName, templateServiceName = parts[0], parts[1]
	}

	mapNode := cls.mapServiceNode[serviceName]
	delete(mapNode, nodeId)
	if len(mapNode) > 0 {
		// Other nodes still provide this service; keep all index entries.
		return
	}
	delete(cls.mapServiceNode, serviceName)

	if !isTemplate {
		return
	}

	// No provider is left, so the template no longer resolves to this service.
	mapService := cls.mapTemplateServiceNode[templateServiceName]
	delete(mapService, serviceName)
	if len(mapService) == 0 {
		delete(cls.mapTemplateServiceNode, templateServiceName)
	}
}
// serviceDiscoverySetNodeInfo is the callback that Init passes to the service
// discovery implementation for reporting a new or updated node.
//
// It ignores the local node. For any other node it:
//  1. removes every service link previously recorded for the node id,
//  2. triggers a discovery event carrying the node's public services,
//  3. rebuilds the service and template-service indexes from
//     nodeInfo.PublicServiceList (duplicates are logged and skipped),
//  4. updates the stored NodeInfo when the node is already known, or
//     otherwise creates an RPC client (NATS or direct, depending on the
//     configured mode) and registers the node in mapRpc.
//
// All table changes happen under cls.locker.
func (cls *Cluster) serviceDiscoverySetNodeInfo(nodeInfo *NodeInfo) {
	// The local node is never added.
	if nodeInfo.NodeId == cls.localNodeInfo.NodeId {
		return
	}

	cls.locker.Lock()
	defer cls.locker.Unlock()

	// First clear every service recorded for this node id.
	lastNodeInfo, ok := cls.mapRpc[nodeInfo.NodeId]
	if ok {
		for _, serviceName := range lastNodeInfo.nodeInfo.ServiceList {
			cls.delServiceNode(serviceName, nodeInfo.NodeId)
		}
	}

	// Notify through the receiver rather than the package-level instance so
	// the event reaches the listeners of the cluster being updated.
	cls.TriggerDiscoveryEvent(true, nodeInfo.NodeId, nodeInfo.PublicServiceList)

	// Then rebuild the indexes.
	seen := make(map[string]struct{}, len(nodeInfo.PublicServiceList)) // guards against duplicate entries
	for _, serviceName := range nodeInfo.PublicServiceList {
		if _, dup := seen[serviceName]; dup {
			log.Error("Bad duplicate Service Cfg.", log.String("ServiceName", serviceName))
			continue
		}
		seen[serviceName] = struct{}{}

		// A template service is named "service:template"; record the link.
		if parts := strings.Split(serviceName, ":"); len(parts) == 2 {
			serviceName = parts[0]
			templateServiceName := parts[1]

			if _, exists := cls.mapTemplateServiceNode[templateServiceName]; !exists {
				cls.mapTemplateServiceNode[templateServiceName] = map[string]struct{}{}
			}
			cls.mapTemplateServiceNode[templateServiceName][serviceName] = struct{}{}
		}

		if _, exists := cls.mapServiceNode[serviceName]; !exists {
			cls.mapServiceNode[serviceName] = make(map[string]struct{}, 1)
		}
		cls.mapServiceNode[serviceName][nodeInfo.NodeId] = struct{}{}
	}

	// Known node: refresh the stored information and keep the existing client.
	if lastNodeInfo != nil {
		log.Info("Discovery nodeId", log.String("NodeId", nodeInfo.NodeId), log.Any("services:", nodeInfo.PublicServiceList), log.Bool("Retire", nodeInfo.Retire))
		lastNodeInfo.nodeInfo = *nodeInfo
		return
	}

	// Unknown node: create the RPC client and register the node.
	rpcInfo := NodeRpcInfo{}
	rpcInfo.nodeInfo = *nodeInfo

	if cls.IsNatsMode() {
		rpcInfo.client = cls.rpcNats.NewNatsClient(nodeInfo.NodeId, cls.GetLocalNodeInfo().NodeId, &cls.callSet, cls.NotifyAllService)
	} else {
		rpcInfo.client = rpc.NewRClient(nodeInfo.NodeId, nodeInfo.ListenAddr, nodeInfo.MaxRpcParamLen, cls.localNodeInfo.CompressBytesLen, &cls.callSet, cls.NotifyAllService)
	}
	cls.mapRpc[nodeInfo.NodeId] = &rpcInfo

	// The listen address is only logged for direct RPC with origin discovery.
	if cls.IsNatsMode() || cls.discoveryInfo.discoveryType != OriginType {
		log.Info("Discovery node and new rpc client", log.String("NodeId", nodeInfo.NodeId), log.Any("services:", nodeInfo.PublicServiceList), log.Bool("Retire", nodeInfo.Retire))
	} else {
		log.Info("Discovery node and new rpc client", log.String("NodeId", nodeInfo.NodeId), log.Any("services:", nodeInfo.PublicServiceList), log.Bool("Retire", nodeInfo.Retire), log.String("nodeListenAddr", nodeInfo.ListenAddr))
	}
}
// Init prepares the cluster for the node identified by localNodeId.
//
// It loads the configuration, initializes the call set, selects the RPC
// server (NATS-backed or a direct rpc.Server, depending on the configured
// mode), sets up the discovery services, installs the RPC-event
// registration hooks on the service package and finally initializes the
// service discovery implementation with this cluster's node callbacks.
//
// The first error encountered is returned.
func (cls *Cluster) Init(localNodeId string, setupServiceFun SetupServiceFun) error {
	// 1. Load configuration.
	if err := cls.InitCfg(localNodeId); err != nil {
		return err
	}

	cls.callSet.Init()
	if cls.IsNatsMode() {
		// The notification callback is bound to the receiver, not to the
		// package-level instance, so events reach this cluster's listeners.
		cls.rpcNats.Init(cls.rpcMode.Nats.NatsUrl, cls.rpcMode.Nats.NoRandomize, cls.GetLocalNodeInfo().NodeId, cls.localNodeInfo.CompressBytesLen, cls, cls.NotifyAllService)
		cls.rpcServer = &cls.rpcNats
	} else {
		s := &rpc.Server{}
		s.Init(cls.localNodeInfo.ListenAddr, cls.localNodeInfo.MaxRpcParamLen, cls.localNodeInfo.CompressBytesLen, cls)
		cls.rpcServer = s
	}

	// 2. Install the service discovery node.
	if err := cls.setupDiscovery(localNodeId, setupServiceFun); err != nil {
		log.Error("setupDiscovery fail", log.ErrorField("err", err))
		return err
	}

	service.RegRpcEventFun = cls.RegRpcEvent
	service.UnRegRpcEventFun = cls.UnRegRpcEvent

	return cls.serviceDiscovery.InitDiscovery(localNodeId, cls.serviceDiscoveryDelNode, cls.serviceDiscoverySetNodeInfo)
}
// FindRpcHandler looks up the service registered under serviceName and
// returns its RPC handler, or nil when no such service exists.
func (cls *Cluster) FindRpcHandler(serviceName string) rpc.IRpcHandler {
	if svc := service.GetService(serviceName); svc != nil {
		return svc.GetRpcHandler()
	}
	return nil
}
// getRpcClient returns the RPC client recorded for nodeId together with the
// node's Retire flag. It returns (nil, false) for an unknown node. No lock is
// taken here; the callers in this file hold cls.locker.
func (cls *Cluster) getRpcClient(nodeId string) (*rpc.Client, bool) {
	if rpcInfo, found := cls.mapRpc[nodeId]; found {
		return rpcInfo.client, rpcInfo.nodeInfo.Retire
	}
	return nil, false
}
// GetRpcClient returns the RPC client recorded for nodeId and the node's
// Retire flag, reading the node table under the read lock. It returns
// (nil, false) when the node is unknown.
func (cls *Cluster) GetRpcClient(nodeId string) (*rpc.Client, bool) {
cls.locker.RLock()
defer cls.locker.RUnlock()
return cls.getRpcClient(nodeId)
}
// GetNodeIdByTemplateService forwards to the method of the same name on the
// package-level cluster, passing all arguments through unchanged.
func GetNodeIdByTemplateService(templateServiceName string, rpcClientList []*rpc.Client, filterRetire bool) (error, []*rpc.Client) {
return GetCluster().GetNodeIdByTemplateService(templateServiceName, rpcClientList, filterRetire)
}
// GetRpcClient resolves the RPC clients to use for a call.
//
// When nodeId is not rpc.NodeIdNull, the client of exactly that node is
// appended to clientList; an error is returned if the node is unknown, or if
// it is retired and filterRetire is set.
//
// When nodeId is rpc.NodeIdNull, the service name is taken from the part of
// serviceMethod before the first "." and the lookup is delegated to
// GetNodeIdByService. A serviceMethod without "." is an error.
func GetRpcClient(nodeId string, serviceMethod string, filterRetire bool, clientList []*rpc.Client) (error, []*rpc.Client) {
	if nodeId == rpc.NodeIdNull {
		// No explicit node: resolve by service name.
		dot := strings.Index(serviceMethod, ".")
		if dot == -1 {
			return fmt.Errorf("servicemethod param %s is error!", serviceMethod), nil
		}
		return GetCluster().GetNodeIdByService(serviceMethod[:dot], clientList, filterRetire)
	}

	client, retired := GetCluster().GetRpcClient(nodeId)
	// Unknown nodes and filtered-out retired nodes produce the same error.
	if client == nil || (filterRetire && retired) {
		return fmt.Errorf("cannot find nodeid %s", nodeId), nil
	}

	return nil, append(clientList, client)
}
// GetRpcServer returns the RPC server of the package-level cluster. The value
// is assigned by Init.
func GetRpcServer() rpc.IServer {
return cluster.rpcServer
}
// IsNodeConnected reports whether an RPC client exists for nodeId and that
// client reports itself as connected.
func (cls *Cluster) IsNodeConnected(nodeId string) bool {
	client, _ := cls.GetRpcClient(nodeId)
	if client == nil {
		return false
	}
	return client.IsConnected()
}
// IsNodeRetire reports the Retire flag recorded for nodeId, read under the
// read lock. It returns false when the node is unknown.
func (cls *Cluster) IsNodeRetire(nodeId string) bool {
cls.locker.RLock()
defer cls.locker.RUnlock()
_, retire := cls.getRpcClient(nodeId)
return retire
}
// NotifyAllService delivers event to every service that registered for RPC
// events through RegRpcEvent. A registered name with no matching service is
// logged and skipped. rpcEventLocker is held for the whole delivery.
func (cls *Cluster) NotifyAllService(event event.IEvent) {
	cls.rpcEventLocker.Lock()
	defer cls.rpcEventLocker.Unlock()

	for name := range cls.mapServiceListenRpcEvent {
		if svc := service.GetService(name); svc != nil {
			svc.(service.IModule).NotifyEvent(event)
		} else {
			log.Error("cannot find service name " + name)
		}
	}
}
// TriggerDiscoveryEvent builds a service.DiscoveryServiceEvent from the
// arguments and delivers it through NotifyAllService. bDiscovery is stored
// as IsDiscovery; callers in this file pass true when a node is reported and
// false when it is removed.
func (cls *Cluster) TriggerDiscoveryEvent(bDiscovery bool, nodeId string, serviceName []string) {
	cls.NotifyAllService(&service.DiscoveryServiceEvent{
		IsDiscovery: bDiscovery,
		NodeId:      nodeId,
		ServiceName: serviceName,
	})
}
// GetLocalNodeInfo returns a pointer to the cluster's own NodeInfo. The
// pointer refers to the stored value, not to a copy.
func (cls *Cluster) GetLocalNodeInfo() *NodeInfo {
return &cls.localNodeInfo
}
// RegRpcEvent registers serviceName as a receiver of the events delivered by
// NotifyAllService. The listener set is created on first use.
func (cls *Cluster) RegRpcEvent(serviceName string) {
	cls.rpcEventLocker.Lock()
	defer cls.rpcEventLocker.Unlock()

	if cls.mapServiceListenRpcEvent == nil {
		cls.mapServiceListenRpcEvent = make(map[string]struct{})
	}
	cls.mapServiceListenRpcEvent[serviceName] = struct{}{}
}
// UnRegRpcEvent removes serviceName from the set of RPC event receivers.
// Removing a name that was never registered is a no-op.
func (cls *Cluster) UnRegRpcEvent(serviceName string) {
	cls.rpcEventLocker.Lock()
	defer cls.rpcEventLocker.Unlock()

	delete(cls.mapServiceListenRpcEvent, serviceName)
}
// HasService reports whether the package-level cluster has recorded nodeId
// as a provider of serviceName.
func HasService(nodeId string, serviceName string) bool {
	cluster.locker.RLock()
	defer cluster.locker.RUnlock()

	// Indexing a missing (nil) inner map yields the zero value, so an
	// unknown service simply reports false.
	_, found := cluster.mapServiceNode[serviceName][nodeId]
	return found
}
// GetNodeByServiceName returns a copy of the set of node ids recorded as
// providers of serviceName in the package-level cluster, or nil when the
// service is unknown. The caller owns the returned map.
func GetNodeByServiceName(serviceName string) map[string]struct{} {
	cluster.locker.RLock()
	defer cluster.locker.RUnlock()

	providers, found := cluster.mapServiceNode[serviceName]
	if !found {
		return nil
	}

	result := make(map[string]struct{}, len(providers))
	for id := range providers {
		result[id] = struct{}{}
	}
	return result
}
// GetNodeByTemplateServiceName resolves a template service name to the real
// services created from it. It returns map[real service name]nodeId, built
// from the package-level cluster under the read lock.
//
// The result is empty (never nil) when the template is unknown. A service
// that currently has no provider node is omitted instead of discarding the
// entries found for the other services. When several nodes provide the same
// service, only one of them is reported, and which one is unspecified
// because Go map iteration order is random.
func GetNodeByTemplateServiceName(templateServiceName string) map[string]string {
	cluster.locker.RLock()
	defer cluster.locker.RUnlock()

	mapServiceName := cluster.mapTemplateServiceNode[templateServiceName]
	mapNodeId := make(map[string]string, len(mapServiceName))
	for serviceName := range mapServiceName {
		// Ranging over a missing (nil) inner map does nothing, which skips
		// services without providers.
		for nodeId := range cluster.mapServiceNode[serviceName] {
			mapNodeId[serviceName] = nodeId
		}
	}
	return mapNodeId
}
// GetGlobalCfg returns the stored global configuration value as is; it may
// be nil.
func (cls *Cluster) GetGlobalCfg() interface{} {
return cls.globalCfg
}
// ParseGlobalCfg decodes the stored global configuration into cfg by
// marshalling it to JSON and unmarshalling the result into cfg.
//
// It returns an error when no global configuration is stored (nil interface
// or nil pointer), or when marshalling or unmarshalling fails.
func (cls *Cluster) ParseGlobalCfg(cfg interface{}) error {
	if cls.globalCfg == nil {
		return errors.New("no service configuration found")
	}

	// A typed nil pointer stored in the interface is also "no configuration".
	if v := reflect.ValueOf(cls.globalCfg); v.Kind() == reflect.Ptr && v.IsNil() {
		return errors.New("no service configuration found")
	}

	raw, err := json.Marshal(cls.globalCfg)
	if err != nil {
		return err
	}

	return json.Unmarshal(raw, cfg)
}
// GetNodeInfo returns a copy of the NodeInfo recorded for nodeId and true,
// or a zero NodeInfo and false when the node is unknown. The node table is
// read under the read lock.
func (cls *Cluster) GetNodeInfo(nodeId string) (NodeInfo, bool) {
	cls.locker.RLock()
	defer cls.locker.RUnlock()

	// A missing key yields a nil pointer, so one nil check covers both cases.
	if rpcInfo := cls.mapRpc[nodeId]; rpcInfo != nil {
		return rpcInfo.nodeInfo, true
	}
	return NodeInfo{}, false
}
// CanDiscoveryService reports whether a service announced by a remote node
// may be discovered by the local node.
//
// serviceName may have the form "service:template"; only the "service" part
// is checked. When the local node has no AllowDiscovery rules, everything is
// allowed. Otherwise the service is allowed if at least one rule matches:
//   - when fromNetworkName is non-empty it must match the rule's NetworkName
//     pattern; otherwise, when fromMasterNodeId is non-empty, it must match
//     the rule's MasterNodeId pattern;
//   - when the rule lists node ids, fromNodeId must match one of them;
//   - when the rule lists services, the service name must match one of them.
//
// All rule fields are regular expressions evaluated with regexp.MatchString,
// so they are unanchored and an empty pattern matches anything. A pattern
// that fails to compile is logged and treated as not matching.
func (cls *Cluster) CanDiscoveryService(fromNetworkName string, fromMasterNodeId string, fromNodeId string, serviceName string) bool {
	// Rules apply to the real service name of a template service.
	if parts := strings.Split(serviceName, ":"); len(parts) == 2 {
		serviceName = parts[0]
	}

	// Rules are only enforced when at least one is configured.
	allowList := cls.GetLocalNodeInfo().AllowDiscovery
	if len(allowList) == 0 {
		return true
	}

	for i := range allowList {
		rule := &allowList[i]

		// The network name takes precedence over the master node id.
		if fromNetworkName != "" {
			if !matchDiscoveryPattern(rule.NetworkName, fromNetworkName) {
				continue
			}
		} else if fromMasterNodeId != "" {
			if !matchDiscoveryPattern(rule.MasterNodeId, fromMasterNodeId) {
				continue
			}
		}

		if len(rule.NodeIdList) > 0 && !matchAnyDiscoveryPattern(rule.NodeIdList, fromNodeId) {
			continue
		}
		if len(rule.ServiceList) > 0 && !matchAnyDiscoveryPattern(rule.ServiceList, serviceName) {
			continue
		}

		return true
	}

	return false
}

// matchDiscoveryPattern reports whether s matches the regular expression
// pattern. An invalid pattern is logged and reported as no match.
func matchDiscoveryPattern(pattern string, s string) bool {
	matched, err := regexp.MatchString(pattern, s)
	if err != nil {
		log.Error("invalid AllowDiscovery pattern", log.String("pattern", pattern), log.ErrorField("err", err))
		return false
	}
	return matched
}

// matchAnyDiscoveryPattern reports whether s matches at least one of the
// regular expressions in patterns.
func matchAnyDiscoveryPattern(patterns []string, s string) bool {
	for _, pattern := range patterns {
		if matchDiscoveryPattern(pattern, s) {
			return true
		}
	}
	return false
}