mirror of
https://github.com/duanhf2012/origin.git
synced 2026-02-13 07:04:44 +08:00
Compare commits
6 Commits
dependabot
...
v2.2.0
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
857e3c4311 | ||
|
|
210a3fa982 | ||
|
|
d365dde8c0 | ||
|
|
e0d412810f | ||
|
|
593abd263e | ||
|
|
75660bdec0 |
74
README.md
74
README.md
@@ -80,8 +80,22 @@ Etcd方式示例:
|
||||
"DialTimeoutMillisecond": 3000,
|
||||
"EtcdList": [
|
||||
{
|
||||
"NetworkName": ["network1"],
|
||||
"Endpoints": ["http://192.168.13.24:12379"]
|
||||
"LocalNetworkName": "network_Area1",
|
||||
"Endpoints": ["http://127.0.0.1:12379"],
|
||||
"UserName": "",
|
||||
"Password": "",
|
||||
"Cert": "",
|
||||
"CertKey": "",
|
||||
"Ca": ""
|
||||
},
|
||||
{
|
||||
"NeighborNetworkName": ["network_Area2"],
|
||||
"Endpoints": ["http://127.0.0.1:12379"],
|
||||
"UserName": "",
|
||||
"Password": "",
|
||||
"Cert": "",
|
||||
"CertKey": "",
|
||||
"Ca": ""
|
||||
}
|
||||
]
|
||||
}
|
||||
@@ -92,12 +106,16 @@ TTLSecond:表示健康检查TTL失效时间10秒
|
||||
|
||||
DialTimeoutMillisecond: 与etcd连接超时时间
|
||||
|
||||
EtcdList:Etcd列表,可以多个Etcd服务器连接
|
||||
EtcdList:Etcd列表,可以多个Etcd服务器连接(注意:列表中必需有一个LocalNetworkName项,表示当前所有的Node归属当前网络名为network_Area1)Node下所有的服务会往network_Area1中注册。监听该网络的结点可以发现该网络中的Service。本地网络会默认监听本地网络中所有的服务。
|
||||
|
||||
NetworkName:所在的网络名称,可以配置多个。node会往对应的网络名称中注册、监听发现Service。NetworkName也起到发现隔离的作用。
|
||||
NeighborNetworkName:表示监听的邻居网络名,可以发现该网络中所有Service
|
||||
|
||||
Endpoints:Etcd服务器地址
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
Origin方式示例:
|
||||
|
||||
```json
|
||||
@@ -105,10 +123,15 @@ Origin方式示例:
|
||||
"Discovery": {
|
||||
"Origin":{
|
||||
"TTLSecond": 10,
|
||||
"LocalMasterNodeId": "bot",
|
||||
"MasterNodeList": [
|
||||
{
|
||||
"NodeId": "test_1",
|
||||
"ListenAddr": "127.0.0.1:8801"
|
||||
"NodeId": "bot",
|
||||
"ListenAddr": "127.0.0.1:11001"
|
||||
},
|
||||
{
|
||||
"NodeId": "mp1server",
|
||||
"ListenAddr": "127.0.0.1:11000"
|
||||
}
|
||||
]
|
||||
}
|
||||
@@ -118,8 +141,12 @@ Origin方式示例:
|
||||
|
||||
TTLSecond:表示健康检查TTL失效时间10秒
|
||||
|
||||
LocalMasterNodeId:本地所有的Node归属当前Master NodeId。归属当前的所有的Node会往该Master Node中注册服务。MasterNode会自动同步给所有的监听结点。本地Node会默认监听本地Master Node中所有的服务。注意LocalMasterNodeId配置的NodeId要在MasterNodeList列表中。
|
||||
|
||||
MasterNodeList:指定哪些Node为服务发现Master结点,需要配置NodeId与ListenAddr,注意它们要与实际的Node配置一致。
|
||||
|
||||
|
||||
|
||||
### RpcMode部分
|
||||
|
||||
默认模式
|
||||
@@ -947,20 +974,45 @@ origin引擎默认使用读取所有结点配置的进行确认结点有哪些Se
|
||||
"Private": false,
|
||||
"remark": "//以_打头的,表示只在本机进程,不对整个子网开发",
|
||||
"ServiceList": ["_TestService1", "TestService9", "TestService10"],
|
||||
"DiscoveryService": [
|
||||
"AllowDiscovery": [
|
||||
{
|
||||
"MasterNodeId": "nodeid_1",
|
||||
"NetworkName":"networkname1"
|
||||
"DiscoveryService": ["TestService8"]
|
||||
"NetworkName":"networkname1",
|
||||
"NodeIdList":[".*server"],
|
||||
"ServiceList": ["TestService8"]
|
||||
}
|
||||
]
|
||||
}]
|
||||
}
|
||||
```
|
||||
|
||||
DiscoveryService:在当前nodeid为nodeid_test的结点中,只发现 MasterNodeId为nodeid_1或NetworkName为networkname1网络中的TestService8服务。
|
||||
以上,如果是使用Etcd发现模式,则表示可以发现网络名networkname1,NodeId为server结尾,服务名为TestService8的服务。
|
||||
|
||||
AllowDiscovery:可以配置发现的规则,如果只配置MasterNodeId或NetworkName时(如果使用Etcd则只配置NetworkName,Origin则只配置MasterNodeId),则会筛选指定网络的所有服务,如下:
|
||||
|
||||
```
|
||||
"AllowDiscovery": [
|
||||
{
|
||||
"NetworkName":"networkname1",
|
||||
}
|
||||
]
|
||||
```
|
||||
|
||||
则以上,只匹配networkname1网络名的所有服务,支持正则表达式,例如可以配置为"NetworkName":".*name1",则可以发现网络名为name1结尾的所有服务。
|
||||
|
||||
如果只发现NodeId为server结尾的所有服务,可以使用以下配置方式:
|
||||
|
||||
```
|
||||
"AllowDiscovery": [
|
||||
{
|
||||
"NodeIdList":[".*server"]
|
||||
}
|
||||
]
|
||||
```
|
||||
|
||||
筛选服务也是同上。也可以组合配置NetworkName和NodeIdList配置。
|
||||
|
||||
|
||||
**注意**:MasterNodeId与NetworkName只配置一个,分别在模式为origin或者etcd服务发现类型时。
|
||||
|
||||
第八章:HttpService使用
|
||||
-----------------------
|
||||
|
||||
@@ -7,6 +7,8 @@ import (
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
"regexp"
|
||||
|
||||
"github.com/duanhf2012/origin/v2/event"
|
||||
"github.com/duanhf2012/origin/v2/log"
|
||||
"github.com/duanhf2012/origin/v2/rpc"
|
||||
@@ -24,21 +26,23 @@ const (
|
||||
Discard NodeStatus = 1 //丢弃
|
||||
)
|
||||
|
||||
type DiscoveryService struct {
|
||||
MasterNodeId string //要筛选的主结点Id,如果不配置或者配置成0,表示针对所有的主结点
|
||||
NetworkName string //如果是etcd,指定要筛选的网络名中的服务,不配置,表示所有的网络
|
||||
ServiceList []string //只发现的服务列表
|
||||
// AllowDiscovery 允许发现的网络服务
|
||||
type AllowDiscovery struct {
|
||||
MasterNodeId string // 支持正则表达式
|
||||
NetworkName string // 支持正则表达式
|
||||
NodeIdList []string // 支持正则表达式
|
||||
ServiceList []string
|
||||
}
|
||||
|
||||
type NodeInfo struct {
|
||||
NodeId string
|
||||
Private bool
|
||||
ListenAddr string
|
||||
MaxRpcParamLen uint32 //最大Rpc参数长度
|
||||
CompressBytesLen int //超过字节进行压缩的长度
|
||||
ServiceList []string //所有的有序服务列表
|
||||
PublicServiceList []string //对外公开的服务列表
|
||||
DiscoveryService []DiscoveryService //筛选发现的服务,如果不配置,不进行筛选
|
||||
MaxRpcParamLen uint32 //最大Rpc参数长度
|
||||
CompressBytesLen int //超过字节进行压缩的长度
|
||||
ServiceList []string //所有的有序服务列表
|
||||
PublicServiceList []string //对外公开的服务列表
|
||||
AllowDiscovery []AllowDiscovery //允许发现的网络服务
|
||||
status NodeStatus
|
||||
Retire bool
|
||||
}
|
||||
@@ -223,9 +227,9 @@ func (cls *Cluster) serviceDiscoverySetNodeInfo(nodeInfo *NodeInfo) {
|
||||
}
|
||||
cls.mapRpc[nodeInfo.NodeId] = &rpcInfo
|
||||
if cls.IsNatsMode() == true || cls.discoveryInfo.discoveryType != OriginType {
|
||||
log.Info("Discovery nodeId and new rpc client", log.String("NodeId", nodeInfo.NodeId), log.Any("services:", nodeInfo.PublicServiceList), log.Bool("Retire", nodeInfo.Retire))
|
||||
log.Info("Discovery node and new rpc client", log.String("NodeId", nodeInfo.NodeId), log.Any("services:", nodeInfo.PublicServiceList), log.Bool("Retire", nodeInfo.Retire))
|
||||
} else {
|
||||
log.Info("Discovery nodeId and new rpc client", log.String("NodeId", nodeInfo.NodeId), log.Any("services:", nodeInfo.PublicServiceList), log.Bool("Retire", nodeInfo.Retire), log.String("nodeListenAddr", nodeInfo.ListenAddr))
|
||||
log.Info("Discovery node and new rpc client", log.String("NodeId", nodeInfo.NodeId), log.Any("services:", nodeInfo.PublicServiceList), log.Bool("Retire", nodeInfo.Retire), log.String("nodeListenAddr", nodeInfo.ListenAddr))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -462,28 +466,76 @@ func (cls *Cluster) GetNodeInfo(nodeId string) (NodeInfo, bool) {
|
||||
return nodeInfo.nodeInfo, true
|
||||
}
|
||||
|
||||
func (cls *Cluster) CanDiscoveryService(fromMasterNodeId string, serviceName string) bool {
|
||||
func (cls *Cluster) CanDiscoveryService(fromNetworkName string, fromMasterNodeId string, fromNodeId string, serviceName string) bool {
|
||||
canDiscovery := true
|
||||
|
||||
// 筛选允许的服务
|
||||
splitServiceName := strings.Split(serviceName, ":")
|
||||
if len(splitServiceName) == 2 {
|
||||
serviceName = splitServiceName[0]
|
||||
}
|
||||
|
||||
for i := 0; i < len(cls.GetLocalNodeInfo().DiscoveryService); i++ {
|
||||
masterNodeId := cls.GetLocalNodeInfo().DiscoveryService[i].MasterNodeId
|
||||
//无效的配置,则跳过
|
||||
if masterNodeId == rpc.NodeIdNull && len(cls.GetLocalNodeInfo().DiscoveryService[i].ServiceList) == 0 {
|
||||
continue
|
||||
}
|
||||
// 先筛选允许的网络,有配置才会检测
|
||||
if len(cls.GetLocalNodeInfo().AllowDiscovery) > 0 {
|
||||
allowNetwork := false
|
||||
for i := 0; i < len(cls.GetLocalNodeInfo().AllowDiscovery); i++ {
|
||||
masterNodeId := cls.GetLocalNodeInfo().AllowDiscovery[i].MasterNodeId
|
||||
networkName := cls.GetLocalNodeInfo().AllowDiscovery[i].NetworkName
|
||||
nodeIdList := cls.GetLocalNodeInfo().AllowDiscovery[i].NodeIdList
|
||||
serviceList := cls.GetLocalNodeInfo().AllowDiscovery[i].ServiceList
|
||||
|
||||
canDiscovery = false
|
||||
if masterNodeId == fromMasterNodeId || masterNodeId == rpc.NodeIdNull {
|
||||
for _, discoveryService := range cls.GetLocalNodeInfo().DiscoveryService[i].ServiceList {
|
||||
if discoveryService == serviceName {
|
||||
return true
|
||||
// 如果配置了网络及Master结点,则匹配之
|
||||
if fromNetworkName != "" {
|
||||
matchNetWork, _ := regexp.MatchString(networkName, fromNetworkName)
|
||||
if !matchNetWork {
|
||||
continue
|
||||
}
|
||||
} else if fromMasterNodeId != "" {
|
||||
matchMasterNode, _ := regexp.MatchString(masterNodeId, fromMasterNodeId)
|
||||
if !matchMasterNode {
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
// 如果配置了
|
||||
if len(nodeIdList) > 0 {
|
||||
hasNode := false
|
||||
for _, nodeId := range nodeIdList {
|
||||
matchNodeId, _ := regexp.MatchString(nodeId, fromNodeId)
|
||||
if !matchNodeId {
|
||||
continue
|
||||
}
|
||||
hasNode = true
|
||||
break
|
||||
}
|
||||
|
||||
if !hasNode {
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
// 如果配置了服务,则匹配之
|
||||
if len(serviceList) > 0 {
|
||||
hasService := false
|
||||
for _, service := range serviceList {
|
||||
// service按正则表达式匹配serviceName
|
||||
matched, _ := regexp.MatchString(service, serviceName)
|
||||
if matched {
|
||||
hasService = true
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if !hasService {
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
allowNetwork = true
|
||||
break
|
||||
}
|
||||
|
||||
if !allowNetwork {
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -6,6 +6,12 @@ import (
|
||||
"crypto/x509"
|
||||
"errors"
|
||||
"fmt"
|
||||
"os"
|
||||
"path"
|
||||
"strings"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/duanhf2012/origin/v2/event"
|
||||
"github.com/duanhf2012/origin/v2/log"
|
||||
"github.com/duanhf2012/origin/v2/rpc"
|
||||
@@ -14,19 +20,15 @@ import (
|
||||
"go.etcd.io/etcd/api/v3/mvccpb"
|
||||
"go.etcd.io/etcd/client/v3"
|
||||
"google.golang.org/protobuf/proto"
|
||||
"os"
|
||||
"path"
|
||||
"strings"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
)
|
||||
|
||||
const originDir = "/origin"
|
||||
|
||||
type etcdClientInfo struct {
|
||||
watchKeys []string
|
||||
leaseID clientv3.LeaseID
|
||||
keepAliveChan <-chan *clientv3.LeaseKeepAliveResponse
|
||||
isLocalNetwork bool
|
||||
watchKeys []string
|
||||
leaseID clientv3.LeaseID
|
||||
keepAliveChan <-chan *clientv3.LeaseKeepAliveResponse
|
||||
}
|
||||
|
||||
type EtcdDiscoveryService struct {
|
||||
@@ -93,6 +95,7 @@ func (ed *EtcdDiscoveryService) OnInit() error {
|
||||
return errors.New("etcd discovery config is nil.")
|
||||
}
|
||||
|
||||
var hasLocalNetwork bool
|
||||
for i := 0; i < len(etcdDiscoveryCfg.EtcdList); i++ {
|
||||
var client *clientv3.Client
|
||||
var tlsConfig *tls.Config
|
||||
@@ -141,13 +144,25 @@ func (ed *EtcdDiscoveryService) OnInit() error {
|
||||
}
|
||||
|
||||
ec := &etcdClientInfo{}
|
||||
for _, networkName := range etcdDiscoveryCfg.EtcdList[i].NetworkName {
|
||||
ec.watchKeys = append(ec.watchKeys, fmt.Sprintf("%s/%s", originDir, networkName))
|
||||
|
||||
if etcdDiscoveryCfg.EtcdList[i].LocalNetworkName != "" {
|
||||
hasLocalNetwork = true
|
||||
ec.isLocalNetwork = true
|
||||
ec.watchKeys = append(ec.watchKeys, fmt.Sprintf("%s/%s", originDir, etcdDiscoveryCfg.EtcdList[i].LocalNetworkName))
|
||||
} else {
|
||||
ec.isLocalNetwork = false
|
||||
for _, networkName := range etcdDiscoveryCfg.EtcdList[i].NeighborNetworkName {
|
||||
ec.watchKeys = append(ec.watchKeys, fmt.Sprintf("%s/%s", originDir, networkName))
|
||||
}
|
||||
}
|
||||
|
||||
ed.mapClient[client] = ec
|
||||
}
|
||||
|
||||
if !hasLocalNetwork {
|
||||
return errors.New("etcd discovery init fail,cannot find local network")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -167,14 +182,18 @@ func (ed *EtcdDiscoveryService) registerServiceByClient(client *clientv3.Client,
|
||||
}
|
||||
|
||||
etcdClient.leaseID = resp.ID
|
||||
for _, watchKey := range etcdClient.watchKeys {
|
||||
// 注册服务节点到 etcd
|
||||
_, err = client.Put(context.Background(), ed.getRegisterKey(watchKey), ed.byteLocalNodeInfo, clientv3.WithLease(resp.ID))
|
||||
if err != nil {
|
||||
log.Error("etcd Put fail", log.ErrorField("err", err))
|
||||
ed.tryRegisterService(client, etcdClient)
|
||||
return
|
||||
}
|
||||
|
||||
// 注册服务节点到 etcd,LocalNetwork时才会注册,且etcdClient.watchKeys必然>0
|
||||
if len(etcdClient.watchKeys) != 1 {
|
||||
log.Error("LocalNetwork watchkey is error")
|
||||
return
|
||||
}
|
||||
|
||||
_, err = client.Put(context.Background(), ed.getRegisterKey(etcdClient.watchKeys[0]), ed.byteLocalNodeInfo, clientv3.WithLease(resp.ID))
|
||||
if err != nil {
|
||||
log.Error("etcd Put fail", log.ErrorField("err", err))
|
||||
ed.tryRegisterService(client, etcdClient)
|
||||
return
|
||||
}
|
||||
|
||||
etcdClient.keepAliveChan, err = client.KeepAlive(context.Background(), etcdClient.leaseID)
|
||||
@@ -204,6 +223,10 @@ func (ed *EtcdDiscoveryService) tryRegisterService(client *clientv3.Client, etcd
|
||||
return
|
||||
}
|
||||
|
||||
if !etcdClient.isLocalNetwork {
|
||||
return
|
||||
}
|
||||
|
||||
ed.AfterFunc(time.Second*3, func(t *timer.Timer) {
|
||||
ed.registerServiceByClient(client, etcdClient)
|
||||
})
|
||||
@@ -229,13 +252,18 @@ func (ed *EtcdDiscoveryService) tryLaterRetire() {
|
||||
func (ed *EtcdDiscoveryService) retire() error {
|
||||
//从etcd中更新
|
||||
for c, ec := range ed.mapClient {
|
||||
for _, watchKey := range ec.watchKeys {
|
||||
// 注册服务节点到 etcd
|
||||
_, err := c.Put(context.Background(), ed.getRegisterKey(watchKey), ed.byteLocalNodeInfo, clientv3.WithLease(ec.leaseID))
|
||||
if err != nil {
|
||||
log.Error("etcd Put fail", log.ErrorField("err", err))
|
||||
return err
|
||||
}
|
||||
if !ec.isLocalNetwork {
|
||||
continue
|
||||
}
|
||||
|
||||
if len(ec.watchKeys)!=1 {
|
||||
log.Error("LocalNetwork watchkey is error")
|
||||
continue
|
||||
}
|
||||
_, err := c.Put(context.Background(), ed.getRegisterKey(ec.watchKeys[0]), ed.byteLocalNodeInfo, clientv3.WithLease(ec.leaseID))
|
||||
if err != nil {
|
||||
log.Error("etcd Put fail", log.ErrorField("err", err))
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
@@ -292,7 +320,7 @@ func (ed *EtcdDiscoveryService) setNodeInfo(networkName string, nodeInfo *rpc.No
|
||||
//筛选关注的服务
|
||||
var discoverServiceSlice = make([]string, 0, 24)
|
||||
for _, pubService := range nodeInfo.PublicServiceList {
|
||||
if cluster.CanDiscoveryService(networkName, pubService) == true {
|
||||
if cluster.CanDiscoveryService(networkName, "", nodeInfo.NodeId, pubService) == true {
|
||||
discoverServiceSlice = append(discoverServiceSlice, pubService)
|
||||
}
|
||||
}
|
||||
@@ -503,11 +531,16 @@ func (ed *EtcdDiscoveryService) RPC_ServiceRecord(etcdServiceRecord *service.Etc
|
||||
|
||||
//写入到etcd中
|
||||
for c, info := range ed.mapClient {
|
||||
for _, watchKey := range info.watchKeys {
|
||||
if ed.getNetworkNameByWatchKey(watchKey) == etcdServiceRecord.NetworkName {
|
||||
client = c
|
||||
break
|
||||
}
|
||||
if !info.isLocalNetwork {
|
||||
continue
|
||||
}
|
||||
|
||||
if len(info.watchKeys)!=1 {
|
||||
log.Error("")
|
||||
}
|
||||
if ed.getNetworkNameByWatchKey(info.watchKeys[0]) == etcdServiceRecord.NetworkName {
|
||||
client = c
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -508,11 +508,17 @@ func (dc *OriginDiscoveryClient) regServiceDiscover(nodeId string) {
|
||||
if nodeId == cluster.GetLocalNodeInfo().NodeId {
|
||||
return
|
||||
}
|
||||
|
||||
nodeInfo := cluster.getOriginMasterDiscoveryNodeInfo(nodeId)
|
||||
if nodeInfo == nil {
|
||||
return
|
||||
}
|
||||
|
||||
// 只能往本地Master结点上注册
|
||||
if !cluster.IsLocalMasterNodeId(nodeId) {
|
||||
return
|
||||
}
|
||||
|
||||
var req rpc.RegServiceDiscoverReq
|
||||
req.NodeInfo = &rpc.NodeInfo{}
|
||||
req.NodeInfo.NodeId = cluster.localNodeInfo.NodeId
|
||||
@@ -548,7 +554,7 @@ func (dc *OriginDiscoveryClient) setNodeInfo(masterNodeId string, nodeInfo *rpc.
|
||||
//筛选关注的服务
|
||||
var discoverServiceSlice = make([]string, 0, 24)
|
||||
for _, pubService := range nodeInfo.PublicServiceList {
|
||||
if cluster.CanDiscoveryService(masterNodeId, pubService) == true {
|
||||
if cluster.CanDiscoveryService("",masterNodeId, nodeInfo.NodeId,pubService) == true {
|
||||
discoverServiceSlice = append(discoverServiceSlice, pubService)
|
||||
}
|
||||
}
|
||||
@@ -617,13 +623,22 @@ func (cls *Cluster) IsOriginMasterDiscoveryNode(nodeId string) bool {
|
||||
return cls.getOriginMasterDiscoveryNodeInfo(nodeId) != nil
|
||||
}
|
||||
|
||||
func (cls *Cluster) IsLocalMasterNodeId(nodeId string) bool {
|
||||
if cls.discoveryInfo.Origin == nil {
|
||||
return false
|
||||
}
|
||||
|
||||
return cls.discoveryInfo.Origin.LocalMasterNodeId == nodeId
|
||||
}
|
||||
func (cls *Cluster) getOriginMasterDiscoveryNodeInfo(nodeId string) *NodeInfo {
|
||||
if cls.discoveryInfo.Origin == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
for i := 0; i < len(cls.discoveryInfo.Origin.MasterNodeList); i++ {
|
||||
if cls.discoveryInfo.Origin.MasterNodeList[i].NodeId == nodeId {
|
||||
|
||||
|
||||
if cls.discoveryInfo.Origin.MasterNodeList[i].NodeId == nodeId {
|
||||
return &cls.discoveryInfo.Origin.MasterNodeList[i]
|
||||
}
|
||||
}
|
||||
|
||||
@@ -16,27 +16,27 @@ import (
|
||||
|
||||
var json = jsoniter.ConfigCompatibleWithStandardLibrary
|
||||
|
||||
|
||||
type EtcdList struct {
|
||||
NetworkName []string
|
||||
Endpoints []string
|
||||
UserName string
|
||||
Password string
|
||||
Cert string
|
||||
CertKey string
|
||||
Ca string
|
||||
LocalNetworkName string // 如果配置,则为本地网络,必需配置一个本地网络
|
||||
NeighborNetworkName []string
|
||||
Endpoints []string
|
||||
UserName string
|
||||
Password string
|
||||
Cert string
|
||||
CertKey string
|
||||
Ca string
|
||||
}
|
||||
|
||||
type EtcdDiscovery struct {
|
||||
DialTimeoutMillisecond time.Duration
|
||||
TTLSecond int64
|
||||
|
||||
EtcdList []EtcdList
|
||||
EtcdList []EtcdList
|
||||
}
|
||||
|
||||
type OriginDiscovery struct {
|
||||
TTLSecond int64
|
||||
MasterNodeList []NodeInfo
|
||||
TTLSecond int64
|
||||
LocalMasterNodeId string
|
||||
MasterNodeList []NodeInfo
|
||||
}
|
||||
|
||||
type DiscoveryType int
|
||||
@@ -72,7 +72,7 @@ type NodeInfoList struct {
|
||||
}
|
||||
|
||||
func validConfigFile(f string) bool {
|
||||
return strings.HasSuffix(f, ".json")|| strings.HasSuffix(f, ".yml") || strings.HasSuffix(f, ".yaml")
|
||||
return strings.HasSuffix(f, ".json") || strings.HasSuffix(f, ".yml") || strings.HasSuffix(f, ".yaml")
|
||||
}
|
||||
|
||||
func yamlToJson(data []byte, v interface{}) ([]byte, error) {
|
||||
@@ -131,21 +131,20 @@ func (d *DiscoveryInfo) setEtcd(etcd *EtcdDiscovery) error {
|
||||
return fmt.Errorf("repeat configuration of Discovery")
|
||||
}
|
||||
|
||||
//Endpoints不允许重复
|
||||
mapAddr := make(map[string]struct{})
|
||||
for _, n := range etcd.EtcdList {
|
||||
for _, endPoint := range n.Endpoints {
|
||||
if _, ok := mapAddr[endPoint]; ok == true {
|
||||
return fmt.Errorf("etcd discovery config Etcd.EtcdList.Endpoints %+v is repeat", endPoint)
|
||||
}
|
||||
mapAddr[endPoint] = struct{}{}
|
||||
}
|
||||
|
||||
//networkName不允许重复
|
||||
mapNetworkName := make(map[string]struct{})
|
||||
for _, netName := range n.NetworkName {
|
||||
if n.LocalNetworkName != "" {
|
||||
if _, ok := mapNetworkName[n.LocalNetworkName]; ok == true {
|
||||
return fmt.Errorf("etcd discovery config Etcd.EtcdList.LocalNetworkName %+v is repeat", n.LocalNetworkName)
|
||||
}
|
||||
mapNetworkName[n.LocalNetworkName] = struct{}{}
|
||||
continue
|
||||
}
|
||||
|
||||
for _, netName := range n.NeighborNetworkName {
|
||||
if _, ok := mapNetworkName[netName]; ok == true {
|
||||
return fmt.Errorf("etcd discovery config Etcd.EtcdList.NetworkName %+v is repeat", n.NetworkName)
|
||||
return fmt.Errorf("etcd discovery config Etcd.EtcdList.NetworkName %+v is repeat", netName)
|
||||
}
|
||||
|
||||
mapNetworkName[netName] = struct{}{}
|
||||
@@ -275,7 +274,7 @@ func (cls *Cluster) readLocalClusterConfig(nodeId string) (DiscoveryInfo, []Node
|
||||
var rpcMode RpcMode
|
||||
|
||||
//读取任何文件,只读符合格式的配置,目录下的文件可以自定义分文件
|
||||
err := filepath.Walk(configDir, func(path string, info fs.FileInfo, err error)error {
|
||||
err := filepath.Walk(configDir, func(path string, info fs.FileInfo, err error) error {
|
||||
if info.IsDir() {
|
||||
return nil
|
||||
}
|
||||
@@ -340,7 +339,7 @@ func (cls *Cluster) readLocalService(localNodeId string) error {
|
||||
nodeService := map[string]interface{}{}
|
||||
|
||||
//读取任何文件,只读符合格式的配置,目录下的文件可以自定义分文件
|
||||
err := filepath.Walk(configDir, func(path string, info fs.FileInfo, err error)error{
|
||||
err := filepath.Walk(configDir, func(path string, info fs.FileInfo, err error) error {
|
||||
if info.IsDir() {
|
||||
return nil
|
||||
}
|
||||
@@ -432,7 +431,7 @@ func (cls *Cluster) readLocalService(localNodeId string) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (cls *Cluster) parseLocalCfg() error{
|
||||
func (cls *Cluster) parseLocalCfg() error {
|
||||
rpcInfo := NodeRpcInfo{}
|
||||
rpcInfo.nodeInfo = cls.localNodeInfo
|
||||
rpcInfo.client = rpc.NewLClient(rpcInfo.nodeInfo.NodeId, &cls.callSet)
|
||||
@@ -454,7 +453,7 @@ func (cls *Cluster) parseLocalCfg() error{
|
||||
cls.mapServiceNode[serviceName] = make(map[string]struct{})
|
||||
}
|
||||
|
||||
if _,ok:=cls.mapServiceNode[serviceName][cls.localNodeInfo.NodeId];ok {
|
||||
if _, ok := cls.mapServiceNode[serviceName][cls.localNodeInfo.NodeId]; ok {
|
||||
return fmt.Errorf("duplicate service %s is configured in node %s", serviceName, cls.localNodeInfo.NodeId)
|
||||
}
|
||||
cls.mapServiceNode[serviceName][cls.localNodeInfo.NodeId] = struct{}{}
|
||||
|
||||
@@ -119,7 +119,6 @@ func (ws *WSModule) recyclerReaderBytes([]byte) {
|
||||
func (ws *WSModule) NewWSClient(conn *network.WSConn) network.Agent {
|
||||
ws.mapClientLocker.Lock()
|
||||
defer ws.mapClientLocker.Unlock()
|
||||
|
||||
pClient := &WSClient{wsConn: conn, id: primitive.NewObjectID().Hex()}
|
||||
pClient.wsModule = ws
|
||||
ws.mapClient[pClient.id] = pClient
|
||||
@@ -159,6 +158,22 @@ func (ws *WSModule) GetProcessor() processor.IRawProcessor {
|
||||
return ws.process
|
||||
}
|
||||
|
||||
func (ws *WSModule) GetClientHeader(clientId string,key string) string {
|
||||
ws.mapClientLocker.Lock()
|
||||
defer ws.mapClientLocker.Unlock()
|
||||
|
||||
pClient, ok := ws.mapClient[clientId]
|
||||
if ok == false || pClient.wsConn == nil {
|
||||
return ""
|
||||
}
|
||||
|
||||
if pClient.wsConn.GetHeader() == nil {
|
||||
log.Warn("clientId header is nil", log.String("clientId", clientId))
|
||||
}
|
||||
|
||||
return pClient.wsConn.GetHeader().Get(key)
|
||||
}
|
||||
|
||||
func (ws *WSModule) GetClientIp(clientId string) string {
|
||||
ws.mapClientLocker.Lock()
|
||||
defer ws.mapClientLocker.Unlock()
|
||||
|
||||
@@ -69,6 +69,8 @@ func (bm *Blueprint) regSysNodes() {
|
||||
bm.RegisterExecNode(NewExecNode[EqualSwitch]())
|
||||
bm.RegisterExecNode(NewExecNode[Probability]())
|
||||
bm.RegisterExecNode(NewExecNode[CreateTimer]())
|
||||
bm.RegisterExecNode(NewExecNode[AppendIntReturn]())
|
||||
bm.RegisterExecNode(NewExecNode[AppendStringReturn]())
|
||||
}
|
||||
|
||||
|
||||
|
||||
@@ -142,6 +142,17 @@ func (gc *graphConfig) GetNodeByID(nodeID string) *nodeConfig {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (gr *Graph) GetAndCreateReturnPort() IPort {
|
||||
p, ok := gr.globalVariables[ReturnVarial]
|
||||
if ok && p != nil {
|
||||
return p
|
||||
}
|
||||
|
||||
p = NewPortArray()
|
||||
gr.globalVariables[ReturnVarial] = p
|
||||
return p
|
||||
}
|
||||
|
||||
func (gr *Graph) Do(entranceID int64, args ...any) (Port_Array, error) {
|
||||
if IsDebug {
|
||||
log.Debug("Graph Do", log.String("graphName", gr.graphFileName), log.Int64("graphID", gr.graphID), log.Int64("entranceID", entranceID))
|
||||
@@ -157,6 +168,8 @@ func (gr *Graph) Do(entranceID int64, args ...any) (Port_Array, error) {
|
||||
|
||||
if gr.globalVariables == nil {
|
||||
gr.globalVariables = map[string]IPort{}
|
||||
}else {
|
||||
gr.globalVariables[ReturnVarial] = nil
|
||||
}
|
||||
|
||||
err := entranceNode.Do(gr, args...)
|
||||
|
||||
@@ -681,7 +681,7 @@ func (em *CreateTimer) Exec() (int, error) {
|
||||
|
||||
array, ok := em.GetInPortArray(1)
|
||||
if !ok {
|
||||
return -1, fmt.Errorf("CreateTimer inParam 0 error")
|
||||
return -1, fmt.Errorf("CreateTimer inParam 1 error")
|
||||
}
|
||||
|
||||
var timerId uint64
|
||||
@@ -718,7 +718,7 @@ func (em *CloseTimer) GetName() string {
|
||||
func (em *CloseTimer) Exec() (int, error) {
|
||||
timerID, ok := em.GetInPortInt(1)
|
||||
if !ok {
|
||||
return -1, fmt.Errorf("CreateTimer inParam 0 error")
|
||||
return -1, fmt.Errorf("CreateTimer inParam 1 error")
|
||||
}
|
||||
|
||||
id := uint64(timerID)
|
||||
@@ -729,3 +729,56 @@ func (em *CloseTimer) Exec() (int, error) {
|
||||
|
||||
return 0, nil
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
// AppendIntReturn 追加返回结果(Int)
|
||||
type AppendIntReturn struct {
|
||||
BaseExecNode
|
||||
}
|
||||
|
||||
func (em *AppendIntReturn) GetName() string {
|
||||
return "AppendIntReturn"
|
||||
}
|
||||
|
||||
func (em *AppendIntReturn) Exec() (int, error) {
|
||||
val, ok := em.GetInPortInt(1)
|
||||
if !ok {
|
||||
return -1, fmt.Errorf("AppendIntReturn inParam 1 error")
|
||||
}
|
||||
|
||||
returnPort := em.gr.GetAndCreateReturnPort()
|
||||
if returnPort == nil {
|
||||
return -1, fmt.Errorf("GetAndCreateReturnPort fail")
|
||||
}
|
||||
returnPort.AppendArrayValInt(val)
|
||||
|
||||
return -0, nil
|
||||
}
|
||||
|
||||
|
||||
|
||||
// AppendStringReturn 追加返回结果(String)
|
||||
type AppendStringReturn struct {
|
||||
BaseExecNode
|
||||
}
|
||||
|
||||
func (em *AppendStringReturn) GetName() string {
|
||||
return "AppendStringReturn"
|
||||
}
|
||||
|
||||
func (em *AppendStringReturn) Exec() (int, error) {
|
||||
val, ok := em.GetInPortStr(1)
|
||||
if !ok {
|
||||
return -1, fmt.Errorf("AppendStringReturn inParam 1 error")
|
||||
}
|
||||
|
||||
returnPort := em.gr.GetAndCreateReturnPort()
|
||||
if returnPort == nil {
|
||||
return -1, fmt.Errorf("GetAndCreateReturnPort fail")
|
||||
}
|
||||
returnPort.AppendArrayValStr(val)
|
||||
|
||||
return -0, nil
|
||||
}
|
||||
Reference in New Issue
Block a user