mirror of
https://github.com/duanhf2012/origin.git
synced 2026-02-12 22:54:43 +08:00
Compare commits
19 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
857e3c4311 | ||
|
|
210a3fa982 | ||
|
|
d365dde8c0 | ||
|
|
e0d412810f | ||
|
|
593abd263e | ||
|
|
75660bdec0 | ||
|
|
eaf20c4e3a | ||
|
|
027e83b706 | ||
|
|
f9be55e98d | ||
|
|
d7c4cfb1ef | ||
|
|
4cb6882a1a | ||
|
|
b78d9721f2 | ||
|
|
f8953d1764 | ||
|
|
fac7a323e1 | ||
|
|
1995d91cfc | ||
|
|
21e9b2cd4b | ||
|
|
969fbe818c | ||
|
|
f3ea9d7c7f | ||
|
|
70389b644d |
74
README.md
74
README.md
@@ -80,8 +80,22 @@ Etcd方式示例:
|
|||||||
"DialTimeoutMillisecond": 3000,
|
"DialTimeoutMillisecond": 3000,
|
||||||
"EtcdList": [
|
"EtcdList": [
|
||||||
{
|
{
|
||||||
"NetworkName": ["network1"],
|
"LocalNetworkName": "network_Area1",
|
||||||
"Endpoints": ["http://192.168.13.24:12379"]
|
"Endpoints": ["http://127.0.0.1:12379"],
|
||||||
|
"UserName": "",
|
||||||
|
"Password": "",
|
||||||
|
"Cert": "",
|
||||||
|
"CertKey": "",
|
||||||
|
"Ca": ""
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"NeighborNetworkName": ["network_Area2"],
|
||||||
|
"Endpoints": ["http://127.0.0.1:12379"],
|
||||||
|
"UserName": "",
|
||||||
|
"Password": "",
|
||||||
|
"Cert": "",
|
||||||
|
"CertKey": "",
|
||||||
|
"Ca": ""
|
||||||
}
|
}
|
||||||
]
|
]
|
||||||
}
|
}
|
||||||
@@ -92,12 +106,16 @@ TTLSecond:表示健康检查TTL失效时间10秒
|
|||||||
|
|
||||||
DialTimeoutMillisecond: 与etcd连接超时时间
|
DialTimeoutMillisecond: 与etcd连接超时时间
|
||||||
|
|
||||||
EtcdList:Etcd列表,可以多个Etcd服务器连接
|
EtcdList:Etcd列表,可以多个Etcd服务器连接(注意:列表中必需有一个LocalNetworkName项,表示当前所有的Node归属当前网络名为network_Area1)Node下所有的服务会往network_Area1中注册。监听该网络的结点可以发现该网络中的Service。本地网络会默认监听本地网络中所有的服务。
|
||||||
|
|
||||||
NetworkName:所在的网络名称,可以配置多个。node会往对应的网络名称中注册、监听发现Service。NetworkName也起到发现隔离的作用。
|
NeighborNetworkName:表示监听的邻居网络名,可以发现该网络中所有Service
|
||||||
|
|
||||||
Endpoints:Etcd服务器地址
|
Endpoints:Etcd服务器地址
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
Origin方式示例:
|
Origin方式示例:
|
||||||
|
|
||||||
```json
|
```json
|
||||||
@@ -105,10 +123,15 @@ Origin方式示例:
|
|||||||
"Discovery": {
|
"Discovery": {
|
||||||
"Origin":{
|
"Origin":{
|
||||||
"TTLSecond": 10,
|
"TTLSecond": 10,
|
||||||
|
"LocalMasterNodeId": "bot",
|
||||||
"MasterNodeList": [
|
"MasterNodeList": [
|
||||||
{
|
{
|
||||||
"NodeId": "test_1",
|
"NodeId": "bot",
|
||||||
"ListenAddr": "127.0.0.1:8801"
|
"ListenAddr": "127.0.0.1:11001"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"NodeId": "mp1server",
|
||||||
|
"ListenAddr": "127.0.0.1:11000"
|
||||||
}
|
}
|
||||||
]
|
]
|
||||||
}
|
}
|
||||||
@@ -118,8 +141,12 @@ Origin方式示例:
|
|||||||
|
|
||||||
TTLSecond:表示健康检查TTL失效时间10秒
|
TTLSecond:表示健康检查TTL失效时间10秒
|
||||||
|
|
||||||
|
LocalMasterNodeId:本地所有的Node归属当前Master NodeId。归属当前的所有的Node会往该Master Node中注册服务。MasterNode会自动同步给所有的监听结点。本地Node会默认监听本地Master Node中所有的服务。注意LocalMasterNodeId配置的NodeId要在MasterNodeList列表中。
|
||||||
|
|
||||||
MasterNodeList:指定哪些Node为服务发现Master结点,需要配置NodeId与ListenAddr,注意它们要与实际的Node配置一致。
|
MasterNodeList:指定哪些Node为服务发现Master结点,需要配置NodeId与ListenAddr,注意它们要与实际的Node配置一致。
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
### RpcMode部分
|
### RpcMode部分
|
||||||
|
|
||||||
默认模式
|
默认模式
|
||||||
@@ -947,20 +974,45 @@ origin引擎默认使用读取所有结点配置的进行确认结点有哪些Se
|
|||||||
"Private": false,
|
"Private": false,
|
||||||
"remark": "//以_打头的,表示只在本机进程,不对整个子网开发",
|
"remark": "//以_打头的,表示只在本机进程,不对整个子网开发",
|
||||||
"ServiceList": ["_TestService1", "TestService9", "TestService10"],
|
"ServiceList": ["_TestService1", "TestService9", "TestService10"],
|
||||||
"DiscoveryService": [
|
"AllowDiscovery": [
|
||||||
{
|
{
|
||||||
"MasterNodeId": "nodeid_1",
|
"MasterNodeId": "nodeid_1",
|
||||||
"NetworkName":"networkname1"
|
"NetworkName":"networkname1",
|
||||||
"DiscoveryService": ["TestService8"]
|
"NodeIdList":[".*server"],
|
||||||
|
"ServiceList": ["TestService8"]
|
||||||
}
|
}
|
||||||
]
|
]
|
||||||
}]
|
}]
|
||||||
}
|
}
|
||||||
```
|
```
|
||||||
|
|
||||||
DiscoveryService:在当前nodeid为nodeid_test的结点中,只发现 MasterNodeId为nodeid_1或NetworkName为networkname1网络中的TestService8服务。
|
以上,如果是使用Etcd发现模式,则表示可以发现网络名networkname1,NodeId为server结尾,服务名为TestService8的服务。
|
||||||
|
|
||||||
|
AllowDiscovery:可以配置发现的规则,如果只配置MasterNodeId或NetworkName时(如果使用Etcd则只配置NetworkName,Origin则只配置MasterNodeId),则会筛选指定网络的所有服务,如下:
|
||||||
|
|
||||||
|
```
|
||||||
|
"AllowDiscovery": [
|
||||||
|
{
|
||||||
|
"NetworkName":"networkname1",
|
||||||
|
}
|
||||||
|
]
|
||||||
|
```
|
||||||
|
|
||||||
|
则以上,只匹配networkname1网络名的所有服务,支持正则表达式,例如可以配置为"NetworkName":".*name1",则可以发现网络名为name1结尾的所有服务。
|
||||||
|
|
||||||
|
如果只发现NodeId为server结尾的所有服务,可以使用以下配置方式:
|
||||||
|
|
||||||
|
```
|
||||||
|
"AllowDiscovery": [
|
||||||
|
{
|
||||||
|
"NodeIdList":[".*server"]
|
||||||
|
}
|
||||||
|
]
|
||||||
|
```
|
||||||
|
|
||||||
|
筛选服务也是同上。也可以组合配置NetworkName和NodeIdList配置。
|
||||||
|
|
||||||
|
|
||||||
**注意**:MasterNodeId与NetworkName只配置一个,分别在模式为origin或者etcd服务发现类型时。
|
|
||||||
|
|
||||||
第八章:HttpService使用
|
第八章:HttpService使用
|
||||||
-----------------------
|
-----------------------
|
||||||
|
|||||||
@@ -3,13 +3,16 @@ package cluster
|
|||||||
import (
|
import (
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"reflect"
|
||||||
|
"strings"
|
||||||
|
"sync"
|
||||||
|
|
||||||
|
"regexp"
|
||||||
|
|
||||||
"github.com/duanhf2012/origin/v2/event"
|
"github.com/duanhf2012/origin/v2/event"
|
||||||
"github.com/duanhf2012/origin/v2/log"
|
"github.com/duanhf2012/origin/v2/log"
|
||||||
"github.com/duanhf2012/origin/v2/rpc"
|
"github.com/duanhf2012/origin/v2/rpc"
|
||||||
"github.com/duanhf2012/origin/v2/service"
|
"github.com/duanhf2012/origin/v2/service"
|
||||||
"reflect"
|
|
||||||
"strings"
|
|
||||||
"sync"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
var configDir = "./config/"
|
var configDir = "./config/"
|
||||||
@@ -23,21 +26,23 @@ const (
|
|||||||
Discard NodeStatus = 1 //丢弃
|
Discard NodeStatus = 1 //丢弃
|
||||||
)
|
)
|
||||||
|
|
||||||
type DiscoveryService struct {
|
// AllowDiscovery 允许发现的网络服务
|
||||||
MasterNodeId string //要筛选的主结点Id,如果不配置或者配置成0,表示针对所有的主结点
|
type AllowDiscovery struct {
|
||||||
NetworkName string //如果是etcd,指定要筛选的网络名中的服务,不配置,表示所有的网络
|
MasterNodeId string // 支持正则表达式
|
||||||
ServiceList []string //只发现的服务列表
|
NetworkName string // 支持正则表达式
|
||||||
|
NodeIdList []string // 支持正则表达式
|
||||||
|
ServiceList []string
|
||||||
}
|
}
|
||||||
|
|
||||||
type NodeInfo struct {
|
type NodeInfo struct {
|
||||||
NodeId string
|
NodeId string
|
||||||
Private bool
|
Private bool
|
||||||
ListenAddr string
|
ListenAddr string
|
||||||
MaxRpcParamLen uint32 //最大Rpc参数长度
|
MaxRpcParamLen uint32 //最大Rpc参数长度
|
||||||
CompressBytesLen int //超过字节进行压缩的长度
|
CompressBytesLen int //超过字节进行压缩的长度
|
||||||
ServiceList []string //所有的有序服务列表
|
ServiceList []string //所有的有序服务列表
|
||||||
PublicServiceList []string //对外公开的服务列表
|
PublicServiceList []string //对外公开的服务列表
|
||||||
DiscoveryService []DiscoveryService //筛选发现的服务,如果不配置,不进行筛选
|
AllowDiscovery []AllowDiscovery //允许发现的网络服务
|
||||||
status NodeStatus
|
status NodeStatus
|
||||||
Retire bool
|
Retire bool
|
||||||
}
|
}
|
||||||
@@ -222,9 +227,9 @@ func (cls *Cluster) serviceDiscoverySetNodeInfo(nodeInfo *NodeInfo) {
|
|||||||
}
|
}
|
||||||
cls.mapRpc[nodeInfo.NodeId] = &rpcInfo
|
cls.mapRpc[nodeInfo.NodeId] = &rpcInfo
|
||||||
if cls.IsNatsMode() == true || cls.discoveryInfo.discoveryType != OriginType {
|
if cls.IsNatsMode() == true || cls.discoveryInfo.discoveryType != OriginType {
|
||||||
log.Info("Discovery nodeId and new rpc client", log.String("NodeId", nodeInfo.NodeId), log.Any("services:", nodeInfo.PublicServiceList), log.Bool("Retire", nodeInfo.Retire))
|
log.Info("Discovery node and new rpc client", log.String("NodeId", nodeInfo.NodeId), log.Any("services:", nodeInfo.PublicServiceList), log.Bool("Retire", nodeInfo.Retire))
|
||||||
} else {
|
} else {
|
||||||
log.Info("Discovery nodeId and new rpc client", log.String("NodeId", nodeInfo.NodeId), log.Any("services:", nodeInfo.PublicServiceList), log.Bool("Retire", nodeInfo.Retire), log.String("nodeListenAddr", nodeInfo.ListenAddr))
|
log.Info("Discovery node and new rpc client", log.String("NodeId", nodeInfo.NodeId), log.Any("services:", nodeInfo.PublicServiceList), log.Bool("Retire", nodeInfo.Retire), log.String("nodeListenAddr", nodeInfo.ListenAddr))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -461,28 +466,76 @@ func (cls *Cluster) GetNodeInfo(nodeId string) (NodeInfo, bool) {
|
|||||||
return nodeInfo.nodeInfo, true
|
return nodeInfo.nodeInfo, true
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cls *Cluster) CanDiscoveryService(fromMasterNodeId string, serviceName string) bool {
|
func (cls *Cluster) CanDiscoveryService(fromNetworkName string, fromMasterNodeId string, fromNodeId string, serviceName string) bool {
|
||||||
canDiscovery := true
|
canDiscovery := true
|
||||||
|
// 筛选允许的服务
|
||||||
splitServiceName := strings.Split(serviceName, ":")
|
splitServiceName := strings.Split(serviceName, ":")
|
||||||
if len(splitServiceName) == 2 {
|
if len(splitServiceName) == 2 {
|
||||||
serviceName = splitServiceName[0]
|
serviceName = splitServiceName[0]
|
||||||
}
|
}
|
||||||
|
|
||||||
for i := 0; i < len(cls.GetLocalNodeInfo().DiscoveryService); i++ {
|
// 先筛选允许的网络,有配置才会检测
|
||||||
masterNodeId := cls.GetLocalNodeInfo().DiscoveryService[i].MasterNodeId
|
if len(cls.GetLocalNodeInfo().AllowDiscovery) > 0 {
|
||||||
//无效的配置,则跳过
|
allowNetwork := false
|
||||||
if masterNodeId == rpc.NodeIdNull && len(cls.GetLocalNodeInfo().DiscoveryService[i].ServiceList) == 0 {
|
for i := 0; i < len(cls.GetLocalNodeInfo().AllowDiscovery); i++ {
|
||||||
continue
|
masterNodeId := cls.GetLocalNodeInfo().AllowDiscovery[i].MasterNodeId
|
||||||
}
|
networkName := cls.GetLocalNodeInfo().AllowDiscovery[i].NetworkName
|
||||||
|
nodeIdList := cls.GetLocalNodeInfo().AllowDiscovery[i].NodeIdList
|
||||||
|
serviceList := cls.GetLocalNodeInfo().AllowDiscovery[i].ServiceList
|
||||||
|
|
||||||
canDiscovery = false
|
// 如果配置了网络及Master结点,则匹配之
|
||||||
if masterNodeId == fromMasterNodeId || masterNodeId == rpc.NodeIdNull {
|
if fromNetworkName != "" {
|
||||||
for _, discoveryService := range cls.GetLocalNodeInfo().DiscoveryService[i].ServiceList {
|
matchNetWork, _ := regexp.MatchString(networkName, fromNetworkName)
|
||||||
if discoveryService == serviceName {
|
if !matchNetWork {
|
||||||
return true
|
continue
|
||||||
|
}
|
||||||
|
} else if fromMasterNodeId != "" {
|
||||||
|
matchMasterNode, _ := regexp.MatchString(masterNodeId, fromMasterNodeId)
|
||||||
|
if !matchMasterNode {
|
||||||
|
continue
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// 如果配置了
|
||||||
|
if len(nodeIdList) > 0 {
|
||||||
|
hasNode := false
|
||||||
|
for _, nodeId := range nodeIdList {
|
||||||
|
matchNodeId, _ := regexp.MatchString(nodeId, fromNodeId)
|
||||||
|
if !matchNodeId {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
hasNode = true
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
|
if !hasNode {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// 如果配置了服务,则匹配之
|
||||||
|
if len(serviceList) > 0 {
|
||||||
|
hasService := false
|
||||||
|
for _, service := range serviceList {
|
||||||
|
// service按正则表达式匹配serviceName
|
||||||
|
matched, _ := regexp.MatchString(service, serviceName)
|
||||||
|
if matched {
|
||||||
|
hasService = true
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if !hasService {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
allowNetwork = true
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
|
if !allowNetwork {
|
||||||
|
return false
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -6,6 +6,12 @@ import (
|
|||||||
"crypto/x509"
|
"crypto/x509"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"os"
|
||||||
|
"path"
|
||||||
|
"strings"
|
||||||
|
"sync/atomic"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/duanhf2012/origin/v2/event"
|
"github.com/duanhf2012/origin/v2/event"
|
||||||
"github.com/duanhf2012/origin/v2/log"
|
"github.com/duanhf2012/origin/v2/log"
|
||||||
"github.com/duanhf2012/origin/v2/rpc"
|
"github.com/duanhf2012/origin/v2/rpc"
|
||||||
@@ -14,19 +20,15 @@ import (
|
|||||||
"go.etcd.io/etcd/api/v3/mvccpb"
|
"go.etcd.io/etcd/api/v3/mvccpb"
|
||||||
"go.etcd.io/etcd/client/v3"
|
"go.etcd.io/etcd/client/v3"
|
||||||
"google.golang.org/protobuf/proto"
|
"google.golang.org/protobuf/proto"
|
||||||
"os"
|
|
||||||
"path"
|
|
||||||
"strings"
|
|
||||||
"sync/atomic"
|
|
||||||
"time"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
const originDir = "/origin"
|
const originDir = "/origin"
|
||||||
|
|
||||||
type etcdClientInfo struct {
|
type etcdClientInfo struct {
|
||||||
watchKeys []string
|
isLocalNetwork bool
|
||||||
leaseID clientv3.LeaseID
|
watchKeys []string
|
||||||
keepAliveChan <-chan *clientv3.LeaseKeepAliveResponse
|
leaseID clientv3.LeaseID
|
||||||
|
keepAliveChan <-chan *clientv3.LeaseKeepAliveResponse
|
||||||
}
|
}
|
||||||
|
|
||||||
type EtcdDiscoveryService struct {
|
type EtcdDiscoveryService struct {
|
||||||
@@ -93,6 +95,7 @@ func (ed *EtcdDiscoveryService) OnInit() error {
|
|||||||
return errors.New("etcd discovery config is nil.")
|
return errors.New("etcd discovery config is nil.")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var hasLocalNetwork bool
|
||||||
for i := 0; i < len(etcdDiscoveryCfg.EtcdList); i++ {
|
for i := 0; i < len(etcdDiscoveryCfg.EtcdList); i++ {
|
||||||
var client *clientv3.Client
|
var client *clientv3.Client
|
||||||
var tlsConfig *tls.Config
|
var tlsConfig *tls.Config
|
||||||
@@ -141,13 +144,25 @@ func (ed *EtcdDiscoveryService) OnInit() error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
ec := &etcdClientInfo{}
|
ec := &etcdClientInfo{}
|
||||||
for _, networkName := range etcdDiscoveryCfg.EtcdList[i].NetworkName {
|
|
||||||
ec.watchKeys = append(ec.watchKeys, fmt.Sprintf("%s/%s", originDir, networkName))
|
if etcdDiscoveryCfg.EtcdList[i].LocalNetworkName != "" {
|
||||||
|
hasLocalNetwork = true
|
||||||
|
ec.isLocalNetwork = true
|
||||||
|
ec.watchKeys = append(ec.watchKeys, fmt.Sprintf("%s/%s", originDir, etcdDiscoveryCfg.EtcdList[i].LocalNetworkName))
|
||||||
|
} else {
|
||||||
|
ec.isLocalNetwork = false
|
||||||
|
for _, networkName := range etcdDiscoveryCfg.EtcdList[i].NeighborNetworkName {
|
||||||
|
ec.watchKeys = append(ec.watchKeys, fmt.Sprintf("%s/%s", originDir, networkName))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
ed.mapClient[client] = ec
|
ed.mapClient[client] = ec
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if !hasLocalNetwork {
|
||||||
|
return errors.New("etcd discovery init fail,cannot find local network")
|
||||||
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -167,14 +182,18 @@ func (ed *EtcdDiscoveryService) registerServiceByClient(client *clientv3.Client,
|
|||||||
}
|
}
|
||||||
|
|
||||||
etcdClient.leaseID = resp.ID
|
etcdClient.leaseID = resp.ID
|
||||||
for _, watchKey := range etcdClient.watchKeys {
|
|
||||||
// 注册服务节点到 etcd
|
// 注册服务节点到 etcd,LocalNetwork时才会注册,且etcdClient.watchKeys必然>0
|
||||||
_, err = client.Put(context.Background(), ed.getRegisterKey(watchKey), ed.byteLocalNodeInfo, clientv3.WithLease(resp.ID))
|
if len(etcdClient.watchKeys) != 1 {
|
||||||
if err != nil {
|
log.Error("LocalNetwork watchkey is error")
|
||||||
log.Error("etcd Put fail", log.ErrorField("err", err))
|
return
|
||||||
ed.tryRegisterService(client, etcdClient)
|
}
|
||||||
return
|
|
||||||
}
|
_, err = client.Put(context.Background(), ed.getRegisterKey(etcdClient.watchKeys[0]), ed.byteLocalNodeInfo, clientv3.WithLease(resp.ID))
|
||||||
|
if err != nil {
|
||||||
|
log.Error("etcd Put fail", log.ErrorField("err", err))
|
||||||
|
ed.tryRegisterService(client, etcdClient)
|
||||||
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
etcdClient.keepAliveChan, err = client.KeepAlive(context.Background(), etcdClient.leaseID)
|
etcdClient.keepAliveChan, err = client.KeepAlive(context.Background(), etcdClient.leaseID)
|
||||||
@@ -204,6 +223,10 @@ func (ed *EtcdDiscoveryService) tryRegisterService(client *clientv3.Client, etcd
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if !etcdClient.isLocalNetwork {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
ed.AfterFunc(time.Second*3, func(t *timer.Timer) {
|
ed.AfterFunc(time.Second*3, func(t *timer.Timer) {
|
||||||
ed.registerServiceByClient(client, etcdClient)
|
ed.registerServiceByClient(client, etcdClient)
|
||||||
})
|
})
|
||||||
@@ -229,13 +252,18 @@ func (ed *EtcdDiscoveryService) tryLaterRetire() {
|
|||||||
func (ed *EtcdDiscoveryService) retire() error {
|
func (ed *EtcdDiscoveryService) retire() error {
|
||||||
//从etcd中更新
|
//从etcd中更新
|
||||||
for c, ec := range ed.mapClient {
|
for c, ec := range ed.mapClient {
|
||||||
for _, watchKey := range ec.watchKeys {
|
if !ec.isLocalNetwork {
|
||||||
// 注册服务节点到 etcd
|
continue
|
||||||
_, err := c.Put(context.Background(), ed.getRegisterKey(watchKey), ed.byteLocalNodeInfo, clientv3.WithLease(ec.leaseID))
|
}
|
||||||
if err != nil {
|
|
||||||
log.Error("etcd Put fail", log.ErrorField("err", err))
|
if len(ec.watchKeys)!=1 {
|
||||||
return err
|
log.Error("LocalNetwork watchkey is error")
|
||||||
}
|
continue
|
||||||
|
}
|
||||||
|
_, err := c.Put(context.Background(), ed.getRegisterKey(ec.watchKeys[0]), ed.byteLocalNodeInfo, clientv3.WithLease(ec.leaseID))
|
||||||
|
if err != nil {
|
||||||
|
log.Error("etcd Put fail", log.ErrorField("err", err))
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -292,7 +320,7 @@ func (ed *EtcdDiscoveryService) setNodeInfo(networkName string, nodeInfo *rpc.No
|
|||||||
//筛选关注的服务
|
//筛选关注的服务
|
||||||
var discoverServiceSlice = make([]string, 0, 24)
|
var discoverServiceSlice = make([]string, 0, 24)
|
||||||
for _, pubService := range nodeInfo.PublicServiceList {
|
for _, pubService := range nodeInfo.PublicServiceList {
|
||||||
if cluster.CanDiscoveryService(networkName, pubService) == true {
|
if cluster.CanDiscoveryService(networkName, "", nodeInfo.NodeId, pubService) == true {
|
||||||
discoverServiceSlice = append(discoverServiceSlice, pubService)
|
discoverServiceSlice = append(discoverServiceSlice, pubService)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -503,11 +531,16 @@ func (ed *EtcdDiscoveryService) RPC_ServiceRecord(etcdServiceRecord *service.Etc
|
|||||||
|
|
||||||
//写入到etcd中
|
//写入到etcd中
|
||||||
for c, info := range ed.mapClient {
|
for c, info := range ed.mapClient {
|
||||||
for _, watchKey := range info.watchKeys {
|
if !info.isLocalNetwork {
|
||||||
if ed.getNetworkNameByWatchKey(watchKey) == etcdServiceRecord.NetworkName {
|
continue
|
||||||
client = c
|
}
|
||||||
break
|
|
||||||
}
|
if len(info.watchKeys)!=1 {
|
||||||
|
log.Error("")
|
||||||
|
}
|
||||||
|
if ed.getNetworkNameByWatchKey(info.watchKeys[0]) == etcdServiceRecord.NetworkName {
|
||||||
|
client = c
|
||||||
|
break
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -508,11 +508,17 @@ func (dc *OriginDiscoveryClient) regServiceDiscover(nodeId string) {
|
|||||||
if nodeId == cluster.GetLocalNodeInfo().NodeId {
|
if nodeId == cluster.GetLocalNodeInfo().NodeId {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
nodeInfo := cluster.getOriginMasterDiscoveryNodeInfo(nodeId)
|
nodeInfo := cluster.getOriginMasterDiscoveryNodeInfo(nodeId)
|
||||||
if nodeInfo == nil {
|
if nodeInfo == nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// 只能往本地Master结点上注册
|
||||||
|
if !cluster.IsLocalMasterNodeId(nodeId) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
var req rpc.RegServiceDiscoverReq
|
var req rpc.RegServiceDiscoverReq
|
||||||
req.NodeInfo = &rpc.NodeInfo{}
|
req.NodeInfo = &rpc.NodeInfo{}
|
||||||
req.NodeInfo.NodeId = cluster.localNodeInfo.NodeId
|
req.NodeInfo.NodeId = cluster.localNodeInfo.NodeId
|
||||||
@@ -548,7 +554,7 @@ func (dc *OriginDiscoveryClient) setNodeInfo(masterNodeId string, nodeInfo *rpc.
|
|||||||
//筛选关注的服务
|
//筛选关注的服务
|
||||||
var discoverServiceSlice = make([]string, 0, 24)
|
var discoverServiceSlice = make([]string, 0, 24)
|
||||||
for _, pubService := range nodeInfo.PublicServiceList {
|
for _, pubService := range nodeInfo.PublicServiceList {
|
||||||
if cluster.CanDiscoveryService(masterNodeId, pubService) == true {
|
if cluster.CanDiscoveryService("",masterNodeId, nodeInfo.NodeId,pubService) == true {
|
||||||
discoverServiceSlice = append(discoverServiceSlice, pubService)
|
discoverServiceSlice = append(discoverServiceSlice, pubService)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -617,13 +623,22 @@ func (cls *Cluster) IsOriginMasterDiscoveryNode(nodeId string) bool {
|
|||||||
return cls.getOriginMasterDiscoveryNodeInfo(nodeId) != nil
|
return cls.getOriginMasterDiscoveryNodeInfo(nodeId) != nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (cls *Cluster) IsLocalMasterNodeId(nodeId string) bool {
|
||||||
|
if cls.discoveryInfo.Origin == nil {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
return cls.discoveryInfo.Origin.LocalMasterNodeId == nodeId
|
||||||
|
}
|
||||||
func (cls *Cluster) getOriginMasterDiscoveryNodeInfo(nodeId string) *NodeInfo {
|
func (cls *Cluster) getOriginMasterDiscoveryNodeInfo(nodeId string) *NodeInfo {
|
||||||
if cls.discoveryInfo.Origin == nil {
|
if cls.discoveryInfo.Origin == nil {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
for i := 0; i < len(cls.discoveryInfo.Origin.MasterNodeList); i++ {
|
for i := 0; i < len(cls.discoveryInfo.Origin.MasterNodeList); i++ {
|
||||||
if cls.discoveryInfo.Origin.MasterNodeList[i].NodeId == nodeId {
|
|
||||||
|
|
||||||
|
if cls.discoveryInfo.Origin.MasterNodeList[i].NodeId == nodeId {
|
||||||
return &cls.discoveryInfo.Origin.MasterNodeList[i]
|
return &cls.discoveryInfo.Origin.MasterNodeList[i]
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -16,27 +16,27 @@ import (
|
|||||||
|
|
||||||
var json = jsoniter.ConfigCompatibleWithStandardLibrary
|
var json = jsoniter.ConfigCompatibleWithStandardLibrary
|
||||||
|
|
||||||
|
|
||||||
type EtcdList struct {
|
type EtcdList struct {
|
||||||
NetworkName []string
|
LocalNetworkName string // 如果配置,则为本地网络,必需配置一个本地网络
|
||||||
Endpoints []string
|
NeighborNetworkName []string
|
||||||
UserName string
|
Endpoints []string
|
||||||
Password string
|
UserName string
|
||||||
Cert string
|
Password string
|
||||||
CertKey string
|
Cert string
|
||||||
Ca string
|
CertKey string
|
||||||
|
Ca string
|
||||||
}
|
}
|
||||||
|
|
||||||
type EtcdDiscovery struct {
|
type EtcdDiscovery struct {
|
||||||
DialTimeoutMillisecond time.Duration
|
DialTimeoutMillisecond time.Duration
|
||||||
TTLSecond int64
|
TTLSecond int64
|
||||||
|
EtcdList []EtcdList
|
||||||
EtcdList []EtcdList
|
|
||||||
}
|
}
|
||||||
|
|
||||||
type OriginDiscovery struct {
|
type OriginDiscovery struct {
|
||||||
TTLSecond int64
|
TTLSecond int64
|
||||||
MasterNodeList []NodeInfo
|
LocalMasterNodeId string
|
||||||
|
MasterNodeList []NodeInfo
|
||||||
}
|
}
|
||||||
|
|
||||||
type DiscoveryType int
|
type DiscoveryType int
|
||||||
@@ -72,7 +72,7 @@ type NodeInfoList struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func validConfigFile(f string) bool {
|
func validConfigFile(f string) bool {
|
||||||
return strings.HasSuffix(f, ".json")|| strings.HasSuffix(f, ".yml") || strings.HasSuffix(f, ".yaml")
|
return strings.HasSuffix(f, ".json") || strings.HasSuffix(f, ".yml") || strings.HasSuffix(f, ".yaml")
|
||||||
}
|
}
|
||||||
|
|
||||||
func yamlToJson(data []byte, v interface{}) ([]byte, error) {
|
func yamlToJson(data []byte, v interface{}) ([]byte, error) {
|
||||||
@@ -131,21 +131,20 @@ func (d *DiscoveryInfo) setEtcd(etcd *EtcdDiscovery) error {
|
|||||||
return fmt.Errorf("repeat configuration of Discovery")
|
return fmt.Errorf("repeat configuration of Discovery")
|
||||||
}
|
}
|
||||||
|
|
||||||
//Endpoints不允许重复
|
|
||||||
mapAddr := make(map[string]struct{})
|
|
||||||
for _, n := range etcd.EtcdList {
|
for _, n := range etcd.EtcdList {
|
||||||
for _, endPoint := range n.Endpoints {
|
|
||||||
if _, ok := mapAddr[endPoint]; ok == true {
|
|
||||||
return fmt.Errorf("etcd discovery config Etcd.EtcdList.Endpoints %+v is repeat", endPoint)
|
|
||||||
}
|
|
||||||
mapAddr[endPoint] = struct{}{}
|
|
||||||
}
|
|
||||||
|
|
||||||
//networkName不允许重复
|
//networkName不允许重复
|
||||||
mapNetworkName := make(map[string]struct{})
|
mapNetworkName := make(map[string]struct{})
|
||||||
for _, netName := range n.NetworkName {
|
if n.LocalNetworkName != "" {
|
||||||
|
if _, ok := mapNetworkName[n.LocalNetworkName]; ok == true {
|
||||||
|
return fmt.Errorf("etcd discovery config Etcd.EtcdList.LocalNetworkName %+v is repeat", n.LocalNetworkName)
|
||||||
|
}
|
||||||
|
mapNetworkName[n.LocalNetworkName] = struct{}{}
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, netName := range n.NeighborNetworkName {
|
||||||
if _, ok := mapNetworkName[netName]; ok == true {
|
if _, ok := mapNetworkName[netName]; ok == true {
|
||||||
return fmt.Errorf("etcd discovery config Etcd.EtcdList.NetworkName %+v is repeat", n.NetworkName)
|
return fmt.Errorf("etcd discovery config Etcd.EtcdList.NetworkName %+v is repeat", netName)
|
||||||
}
|
}
|
||||||
|
|
||||||
mapNetworkName[netName] = struct{}{}
|
mapNetworkName[netName] = struct{}{}
|
||||||
@@ -275,7 +274,7 @@ func (cls *Cluster) readLocalClusterConfig(nodeId string) (DiscoveryInfo, []Node
|
|||||||
var rpcMode RpcMode
|
var rpcMode RpcMode
|
||||||
|
|
||||||
//读取任何文件,只读符合格式的配置,目录下的文件可以自定义分文件
|
//读取任何文件,只读符合格式的配置,目录下的文件可以自定义分文件
|
||||||
err := filepath.Walk(configDir, func(path string, info fs.FileInfo, err error)error {
|
err := filepath.Walk(configDir, func(path string, info fs.FileInfo, err error) error {
|
||||||
if info.IsDir() {
|
if info.IsDir() {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@@ -340,7 +339,7 @@ func (cls *Cluster) readLocalService(localNodeId string) error {
|
|||||||
nodeService := map[string]interface{}{}
|
nodeService := map[string]interface{}{}
|
||||||
|
|
||||||
//读取任何文件,只读符合格式的配置,目录下的文件可以自定义分文件
|
//读取任何文件,只读符合格式的配置,目录下的文件可以自定义分文件
|
||||||
err := filepath.Walk(configDir, func(path string, info fs.FileInfo, err error)error{
|
err := filepath.Walk(configDir, func(path string, info fs.FileInfo, err error) error {
|
||||||
if info.IsDir() {
|
if info.IsDir() {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@@ -432,7 +431,7 @@ func (cls *Cluster) readLocalService(localNodeId string) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cls *Cluster) parseLocalCfg() error{
|
func (cls *Cluster) parseLocalCfg() error {
|
||||||
rpcInfo := NodeRpcInfo{}
|
rpcInfo := NodeRpcInfo{}
|
||||||
rpcInfo.nodeInfo = cls.localNodeInfo
|
rpcInfo.nodeInfo = cls.localNodeInfo
|
||||||
rpcInfo.client = rpc.NewLClient(rpcInfo.nodeInfo.NodeId, &cls.callSet)
|
rpcInfo.client = rpc.NewLClient(rpcInfo.nodeInfo.NodeId, &cls.callSet)
|
||||||
@@ -454,7 +453,7 @@ func (cls *Cluster) parseLocalCfg() error{
|
|||||||
cls.mapServiceNode[serviceName] = make(map[string]struct{})
|
cls.mapServiceNode[serviceName] = make(map[string]struct{})
|
||||||
}
|
}
|
||||||
|
|
||||||
if _,ok:=cls.mapServiceNode[serviceName][cls.localNodeInfo.NodeId];ok {
|
if _, ok := cls.mapServiceNode[serviceName][cls.localNodeInfo.NodeId]; ok {
|
||||||
return fmt.Errorf("duplicate service %s is configured in node %s", serviceName, cls.localNodeInfo.NodeId)
|
return fmt.Errorf("duplicate service %s is configured in node %s", serviceName, cls.localNodeInfo.NodeId)
|
||||||
}
|
}
|
||||||
cls.mapServiceNode[serviceName][cls.localNodeInfo.NodeId] = struct{}{}
|
cls.mapServiceNode[serviceName][cls.localNodeInfo.NodeId] = struct{}{}
|
||||||
|
|||||||
@@ -10,7 +10,7 @@ const (
|
|||||||
Sys_Event_Tcp EventType = -3
|
Sys_Event_Tcp EventType = -3
|
||||||
Sys_Event_Http_Event EventType = -4
|
Sys_Event_Http_Event EventType = -4
|
||||||
Sys_Event_WebSocket EventType = -5
|
Sys_Event_WebSocket EventType = -5
|
||||||
Sys_Event_Kcp EventType = -6
|
Sys_Event_Kcp EventType = -6
|
||||||
Sys_Event_Node_Conn_Event EventType = -7
|
Sys_Event_Node_Conn_Event EventType = -7
|
||||||
Sys_Event_Nats_Conn_Event EventType = -8
|
Sys_Event_Nats_Conn_Event EventType = -8
|
||||||
Sys_Event_DiscoverService EventType = -9
|
Sys_Event_DiscoverService EventType = -9
|
||||||
@@ -18,6 +18,6 @@ const (
|
|||||||
Sys_Event_EtcdDiscovery EventType = -11
|
Sys_Event_EtcdDiscovery EventType = -11
|
||||||
Sys_Event_Gin_Event EventType = -12
|
Sys_Event_Gin_Event EventType = -12
|
||||||
Sys_Event_FrameTick EventType = -13
|
Sys_Event_FrameTick EventType = -13
|
||||||
|
Sys_Event_ReloadBlueprint EventType = -14
|
||||||
Sys_Event_User_Define EventType = 1
|
Sys_Event_User_Define EventType = 1
|
||||||
)
|
)
|
||||||
|
|||||||
2
go.mod
2
go.mod
@@ -12,7 +12,7 @@ require (
|
|||||||
github.com/goccy/go-json v0.10.2
|
github.com/goccy/go-json v0.10.2
|
||||||
github.com/gomodule/redigo v1.8.8
|
github.com/gomodule/redigo v1.8.8
|
||||||
github.com/google/uuid v1.6.0
|
github.com/google/uuid v1.6.0
|
||||||
github.com/gorilla/websocket v1.5.0
|
github.com/gorilla/websocket v1.5.3
|
||||||
github.com/json-iterator/go v1.1.12
|
github.com/json-iterator/go v1.1.12
|
||||||
github.com/nats-io/nats.go v1.34.1
|
github.com/nats-io/nats.go v1.34.1
|
||||||
github.com/pierrec/lz4/v4 v4.1.21
|
github.com/pierrec/lz4/v4 v4.1.21
|
||||||
|
|||||||
4
go.sum
4
go.sum
@@ -88,8 +88,8 @@ github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
|
|||||||
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
|
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
|
||||||
github.com/gorilla/securecookie v1.1.1/go.mod h1:ra0sb63/xPlUeL+yeDciTfxMRAA+MP+HVt/4epWDjd4=
|
github.com/gorilla/securecookie v1.1.1/go.mod h1:ra0sb63/xPlUeL+yeDciTfxMRAA+MP+HVt/4epWDjd4=
|
||||||
github.com/gorilla/sessions v1.2.1/go.mod h1:dk2InVEVJ0sfLlnXv9EAgkf6ecYs/i80K/zI+bUmuGM=
|
github.com/gorilla/sessions v1.2.1/go.mod h1:dk2InVEVJ0sfLlnXv9EAgkf6ecYs/i80K/zI+bUmuGM=
|
||||||
github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc=
|
github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg=
|
||||||
github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
|
github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
|
||||||
github.com/hashicorp/errwrap v1.0.0 h1:hLrqtEDnRye3+sgx6z4qVLNuviH3MR5aQ0ykNJa/UYA=
|
github.com/hashicorp/errwrap v1.0.0 h1:hLrqtEDnRye3+sgx6z4qVLNuviH3MR5aQ0ykNJa/UYA=
|
||||||
github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
|
github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
|
||||||
github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo=
|
github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo=
|
||||||
|
|||||||
@@ -3,12 +3,13 @@ package network
|
|||||||
import (
|
import (
|
||||||
"crypto/tls"
|
"crypto/tls"
|
||||||
"errors"
|
"errors"
|
||||||
"github.com/duanhf2012/origin/v2/log"
|
|
||||||
"github.com/gorilla/websocket"
|
|
||||||
"net"
|
"net"
|
||||||
"net/http"
|
"net/http"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/duanhf2012/origin/v2/log"
|
||||||
|
"github.com/gorilla/websocket"
|
||||||
)
|
)
|
||||||
|
|
||||||
type WSServer struct {
|
type WSServer struct {
|
||||||
@@ -16,13 +17,16 @@ type WSServer struct {
|
|||||||
MaxConnNum int
|
MaxConnNum int
|
||||||
PendingWriteNum int
|
PendingWriteNum int
|
||||||
MaxMsgLen uint32
|
MaxMsgLen uint32
|
||||||
HTTPTimeout time.Duration
|
|
||||||
CertFile string
|
CertFile string
|
||||||
KeyFile string
|
KeyFile string
|
||||||
NewAgent func(*WSConn) Agent
|
NewAgent func(*WSConn) Agent
|
||||||
ln net.Listener
|
ln net.Listener
|
||||||
handler *WSHandler
|
handler *WSHandler
|
||||||
messageType int
|
messageType int
|
||||||
|
|
||||||
|
HandshakeTimeout time.Duration
|
||||||
|
ReadTimeout time.Duration
|
||||||
|
WriteTimeout time.Duration
|
||||||
}
|
}
|
||||||
|
|
||||||
type WSHandler struct {
|
type WSHandler struct {
|
||||||
@@ -73,14 +77,14 @@ func (handler *WSHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
|||||||
}
|
}
|
||||||
handler.conns[conn] = struct{}{}
|
handler.conns[conn] = struct{}{}
|
||||||
handler.mutexConns.Unlock()
|
handler.mutexConns.Unlock()
|
||||||
c,ok:=conn.NetConn().(*net.TCPConn)
|
c, ok := conn.NetConn().(*net.TCPConn)
|
||||||
if !ok {
|
if !ok {
|
||||||
tlsConn,ok := conn.NetConn().(*tls.Conn)
|
tlsConn, ok := conn.NetConn().(*tls.Conn)
|
||||||
if !ok {
|
if !ok {
|
||||||
log.Error("conn error")
|
log.Error("conn error")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
c,ok = tlsConn.NetConn().(*net.TCPConn)
|
c, ok = tlsConn.NetConn().(*net.TCPConn)
|
||||||
if !ok {
|
if !ok {
|
||||||
log.Error("conn error")
|
log.Error("conn error")
|
||||||
return
|
return
|
||||||
@@ -127,10 +131,19 @@ func (server *WSServer) Start() error {
|
|||||||
server.MaxMsgLen = 4096
|
server.MaxMsgLen = 4096
|
||||||
log.Info("invalid MaxMsgLen", log.Uint32("reset", server.MaxMsgLen))
|
log.Info("invalid MaxMsgLen", log.Uint32("reset", server.MaxMsgLen))
|
||||||
}
|
}
|
||||||
if server.HTTPTimeout <= 0 {
|
if server.HandshakeTimeout <= 0 {
|
||||||
server.HTTPTimeout = 10 * time.Second
|
server.HandshakeTimeout = 15 * time.Second
|
||||||
log.Info("invalid HTTPTimeout", log.Duration("reset", server.HTTPTimeout))
|
log.Info("invalid HandshakeTimeout", log.Duration("reset", server.HandshakeTimeout))
|
||||||
}
|
}
|
||||||
|
if server.ReadTimeout <= 0 {
|
||||||
|
server.ReadTimeout = 15 * time.Second
|
||||||
|
log.Info("invalid ReadTimeout", log.Duration("reset", server.ReadTimeout))
|
||||||
|
}
|
||||||
|
if server.WriteTimeout <= 0 {
|
||||||
|
server.WriteTimeout = 15 * time.Second
|
||||||
|
log.Info("invalid WriteTimeout", log.Duration("reset", server.WriteTimeout))
|
||||||
|
}
|
||||||
|
|
||||||
if server.NewAgent == nil {
|
if server.NewAgent == nil {
|
||||||
log.Error("NewAgent must not be nil")
|
log.Error("NewAgent must not be nil")
|
||||||
return errors.New("NewAgent must not be nil")
|
return errors.New("NewAgent must not be nil")
|
||||||
@@ -159,7 +172,7 @@ func (server *WSServer) Start() error {
|
|||||||
conns: make(WebsocketConnSet),
|
conns: make(WebsocketConnSet),
|
||||||
messageType: server.messageType,
|
messageType: server.messageType,
|
||||||
upgrader: websocket.Upgrader{
|
upgrader: websocket.Upgrader{
|
||||||
HandshakeTimeout: server.HTTPTimeout,
|
HandshakeTimeout: server.HandshakeTimeout,
|
||||||
CheckOrigin: func(_ *http.Request) bool { return true },
|
CheckOrigin: func(_ *http.Request) bool { return true },
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
@@ -167,8 +180,8 @@ func (server *WSServer) Start() error {
|
|||||||
httpServer := &http.Server{
|
httpServer := &http.Server{
|
||||||
Addr: server.Addr,
|
Addr: server.Addr,
|
||||||
Handler: server.handler,
|
Handler: server.handler,
|
||||||
ReadTimeout: server.HTTPTimeout,
|
ReadTimeout: server.ReadTimeout,
|
||||||
WriteTimeout: server.HTTPTimeout,
|
WriteTimeout: server.WriteTimeout,
|
||||||
MaxHeaderBytes: 1024,
|
MaxHeaderBytes: 1024,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -2,13 +2,15 @@ package wsmodule
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/duanhf2012/origin/v2/event"
|
"github.com/duanhf2012/origin/v2/event"
|
||||||
"github.com/duanhf2012/origin/v2/log"
|
"github.com/duanhf2012/origin/v2/log"
|
||||||
"github.com/duanhf2012/origin/v2/network"
|
"github.com/duanhf2012/origin/v2/network"
|
||||||
"github.com/duanhf2012/origin/v2/network/processor"
|
"github.com/duanhf2012/origin/v2/network/processor"
|
||||||
"github.com/duanhf2012/origin/v2/service"
|
"github.com/duanhf2012/origin/v2/service"
|
||||||
"go.mongodb.org/mongo-driver/bson/primitive"
|
"go.mongodb.org/mongo-driver/bson/primitive"
|
||||||
"sync"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
type WSModule struct {
|
type WSModule struct {
|
||||||
@@ -36,6 +38,10 @@ type WSCfg struct {
|
|||||||
LittleEndian bool //是否小端序
|
LittleEndian bool //是否小端序
|
||||||
KeyFile string
|
KeyFile string
|
||||||
CertFile string
|
CertFile string
|
||||||
|
|
||||||
|
HandshakeTimeoutSecond time.Duration
|
||||||
|
ReadTimeoutSecond time.Duration
|
||||||
|
WriteTimeoutSecond time.Duration
|
||||||
}
|
}
|
||||||
|
|
||||||
type WSPackType int8
|
type WSPackType int8
|
||||||
@@ -63,6 +69,9 @@ func (ws *WSModule) OnInit() error {
|
|||||||
ws.WSServer.PendingWriteNum = ws.wsCfg.PendingWriteNum
|
ws.WSServer.PendingWriteNum = ws.wsCfg.PendingWriteNum
|
||||||
ws.WSServer.MaxMsgLen = ws.wsCfg.MaxMsgLen
|
ws.WSServer.MaxMsgLen = ws.wsCfg.MaxMsgLen
|
||||||
ws.WSServer.Addr = ws.wsCfg.ListenAddr
|
ws.WSServer.Addr = ws.wsCfg.ListenAddr
|
||||||
|
ws.WSServer.HandshakeTimeout = ws.wsCfg.HandshakeTimeoutSecond*time.Second
|
||||||
|
ws.WSServer.ReadTimeout = ws.wsCfg.ReadTimeoutSecond*time.Second
|
||||||
|
ws.WSServer.WriteTimeout = ws.wsCfg.WriteTimeoutSecond*time.Second
|
||||||
|
|
||||||
if ws.wsCfg.KeyFile != "" && ws.wsCfg.CertFile != "" {
|
if ws.wsCfg.KeyFile != "" && ws.wsCfg.CertFile != "" {
|
||||||
ws.WSServer.KeyFile = ws.wsCfg.KeyFile
|
ws.WSServer.KeyFile = ws.wsCfg.KeyFile
|
||||||
@@ -110,7 +119,6 @@ func (ws *WSModule) recyclerReaderBytes([]byte) {
|
|||||||
func (ws *WSModule) NewWSClient(conn *network.WSConn) network.Agent {
|
func (ws *WSModule) NewWSClient(conn *network.WSConn) network.Agent {
|
||||||
ws.mapClientLocker.Lock()
|
ws.mapClientLocker.Lock()
|
||||||
defer ws.mapClientLocker.Unlock()
|
defer ws.mapClientLocker.Unlock()
|
||||||
|
|
||||||
pClient := &WSClient{wsConn: conn, id: primitive.NewObjectID().Hex()}
|
pClient := &WSClient{wsConn: conn, id: primitive.NewObjectID().Hex()}
|
||||||
pClient.wsModule = ws
|
pClient.wsModule = ws
|
||||||
ws.mapClient[pClient.id] = pClient
|
ws.mapClient[pClient.id] = pClient
|
||||||
@@ -150,6 +158,22 @@ func (ws *WSModule) GetProcessor() processor.IRawProcessor {
|
|||||||
return ws.process
|
return ws.process
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (ws *WSModule) GetClientHeader(clientId string,key string) string {
|
||||||
|
ws.mapClientLocker.Lock()
|
||||||
|
defer ws.mapClientLocker.Unlock()
|
||||||
|
|
||||||
|
pClient, ok := ws.mapClient[clientId]
|
||||||
|
if ok == false || pClient.wsConn == nil {
|
||||||
|
return ""
|
||||||
|
}
|
||||||
|
|
||||||
|
if pClient.wsConn.GetHeader() == nil {
|
||||||
|
log.Warn("clientId header is nil", log.String("clientId", clientId))
|
||||||
|
}
|
||||||
|
|
||||||
|
return pClient.wsConn.GetHeader().Get(key)
|
||||||
|
}
|
||||||
|
|
||||||
func (ws *WSModule) GetClientIp(clientId string) string {
|
func (ws *WSModule) GetClientIp(clientId string) string {
|
||||||
ws.mapClientLocker.Lock()
|
ws.mapClientLocker.Lock()
|
||||||
defer ws.mapClientLocker.Unlock()
|
defer ws.mapClientLocker.Unlock()
|
||||||
|
|||||||
@@ -3,38 +3,156 @@ package blueprint
|
|||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
|
"github.com/duanhf2012/origin/v2/log"
|
||||||
)
|
)
|
||||||
|
|
||||||
type Blueprint struct {
|
type Blueprint struct {
|
||||||
execPool ExecPool
|
execNodes []IExecNode // 注册的定义执行结点
|
||||||
graphPool GraphPool
|
execNodeList []func() IExecNode
|
||||||
|
execPool *ExecPool
|
||||||
|
graphPool *GraphPool
|
||||||
|
|
||||||
blueprintModule IBlueprintModule
|
blueprintModule IBlueprintModule
|
||||||
mapGraph map[int64]IGraph
|
mapGraph map[int64]IGraph
|
||||||
seedID int64
|
seedID int64
|
||||||
cancelTimer func(*uint64)bool
|
cancelTimer func(*uint64) bool
|
||||||
|
|
||||||
|
execDefFilePath string // 执行结点定义文件路径
|
||||||
|
graphFilePath string // 蓝图文件路径
|
||||||
}
|
}
|
||||||
|
|
||||||
func (bm *Blueprint) Init(execDefFilePath string, graphFilePath string, blueprintModule IBlueprintModule,cancelTimer func(*uint64)bool) error {
|
func (bm *Blueprint) RegisterExecNode(execNodeFunc func() IExecNode) {
|
||||||
err := bm.execPool.Load(execDefFilePath)
|
bm.execNodeList = append(bm.execNodeList, execNodeFunc)
|
||||||
|
}
|
||||||
|
|
||||||
|
type IExecNodeType[T any] interface {
|
||||||
|
*T
|
||||||
|
IExecNode
|
||||||
|
}
|
||||||
|
|
||||||
|
// 生成一个泛型函数,返回func() IExecNode类型
|
||||||
|
func NewExecNode[T any, P IExecNodeType[T]]() func() IExecNode {
|
||||||
|
return func() IExecNode {
|
||||||
|
var t T
|
||||||
|
return P(&t)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (bm *Blueprint) regSysNodes() {
|
||||||
|
bm.RegisterExecNode(NewExecNode[AddInt]())
|
||||||
|
bm.RegisterExecNode(NewExecNode[SubInt]())
|
||||||
|
bm.RegisterExecNode(NewExecNode[MulInt]())
|
||||||
|
bm.RegisterExecNode(NewExecNode[DivInt]())
|
||||||
|
bm.RegisterExecNode(NewExecNode[ModInt]())
|
||||||
|
bm.RegisterExecNode(NewExecNode[RandNumber]())
|
||||||
|
|
||||||
|
bm.RegisterExecNode(NewExecNode[Entrance_ArrayParam]())
|
||||||
|
bm.RegisterExecNode(NewExecNode[Entrance_IntParam]())
|
||||||
|
bm.RegisterExecNode(NewExecNode[Entrance_Timer]())
|
||||||
|
bm.RegisterExecNode(NewExecNode[DebugOutput]())
|
||||||
|
bm.RegisterExecNode(NewExecNode[Sequence]())
|
||||||
|
bm.RegisterExecNode(NewExecNode[Foreach]())
|
||||||
|
bm.RegisterExecNode(NewExecNode[ForeachIntArray]())
|
||||||
|
|
||||||
|
bm.RegisterExecNode(NewExecNode[GetArrayInt]())
|
||||||
|
bm.RegisterExecNode(NewExecNode[GetArrayString]())
|
||||||
|
bm.RegisterExecNode(NewExecNode[GetArrayLen]())
|
||||||
|
bm.RegisterExecNode(NewExecNode[CreateIntArray]())
|
||||||
|
bm.RegisterExecNode(NewExecNode[CreateStringArray]())
|
||||||
|
bm.RegisterExecNode(NewExecNode[AppendIntegerToArray]())
|
||||||
|
bm.RegisterExecNode(NewExecNode[AppendStringToArray]())
|
||||||
|
bm.RegisterExecNode(NewExecNode[BoolIf]())
|
||||||
|
bm.RegisterExecNode(NewExecNode[GreaterThanInteger]())
|
||||||
|
bm.RegisterExecNode(NewExecNode[LessThanInteger]())
|
||||||
|
bm.RegisterExecNode(NewExecNode[EqualInteger]())
|
||||||
|
bm.RegisterExecNode(NewExecNode[RangeCompare]())
|
||||||
|
bm.RegisterExecNode(NewExecNode[EqualSwitch]())
|
||||||
|
bm.RegisterExecNode(NewExecNode[Probability]())
|
||||||
|
bm.RegisterExecNode(NewExecNode[CreateTimer]())
|
||||||
|
bm.RegisterExecNode(NewExecNode[AppendIntReturn]())
|
||||||
|
bm.RegisterExecNode(NewExecNode[AppendStringReturn]())
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
func (bm *Blueprint) StartHotReload() (func(),error) {
|
||||||
|
var execPool ExecPool
|
||||||
|
var graphPool GraphPool
|
||||||
|
|
||||||
|
// 加载配置结点生成名字对应的innerExecNode
|
||||||
|
err := execPool.Load(bm.execDefFilePath)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// 将注册的实际执行结点与innerExecNode进行关联
|
||||||
|
for _, newExec := range bm.execNodeList {
|
||||||
|
e := newExec()
|
||||||
|
if !execPool.Register(e) {
|
||||||
|
return nil,fmt.Errorf("register exec failed,exec:%s", e.GetName())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// 加载所有的vgf蓝图文件
|
||||||
|
err = graphPool.Load(&execPool, bm.graphFilePath, bm.blueprintModule)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// 返回配置加载后的刷新内存处理
|
||||||
|
return func() {
|
||||||
|
// 替换旧的执行池和图池
|
||||||
|
bm.execPool = &execPool
|
||||||
|
bm.graphPool = &graphPool
|
||||||
|
|
||||||
|
for _, gh := range bm.mapGraph {
|
||||||
|
gFileName := gh.GetGraphFileName()
|
||||||
|
bGraph := bm.graphPool.GetBaseGraph(gFileName)
|
||||||
|
if bGraph == nil {
|
||||||
|
log.Warn("GetBaseGraph fail", log.String("graph file name", gFileName))
|
||||||
|
bGraph = &baseGraph{entrance: map[int64]*execNode{}}
|
||||||
|
}
|
||||||
|
|
||||||
|
gh.HotReload(bGraph)
|
||||||
|
}
|
||||||
|
}, nil
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
func (bm *Blueprint) Init(execDefFilePath string, graphFilePath string, blueprintModule IBlueprintModule, cancelTimer func(*uint64) bool) error {
|
||||||
|
var execPool ExecPool
|
||||||
|
var graphPool GraphPool
|
||||||
|
|
||||||
|
// 加载配置结点生成名字对应的innerExecNode
|
||||||
|
err := execPool.Load(execDefFilePath)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, e := range execNodes {
|
// 注册系统执行结点
|
||||||
if !bm.execPool.Register(e) {
|
bm.regSysNodes()
|
||||||
|
|
||||||
|
// 将注册的实际执行结点与innerExecNode进行关联
|
||||||
|
for _, newExec := range bm.execNodeList {
|
||||||
|
e := newExec()
|
||||||
|
if !execPool.Register(e) {
|
||||||
return fmt.Errorf("register exec failed,exec:%s", e.GetName())
|
return fmt.Errorf("register exec failed,exec:%s", e.GetName())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
err = bm.graphPool.Load(&bm.execPool, graphFilePath, blueprintModule)
|
// 加载所有的vgf蓝图文件
|
||||||
|
err = graphPool.Load(&execPool, graphFilePath, blueprintModule)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bm.execPool = &execPool
|
||||||
|
bm.graphPool = &graphPool
|
||||||
bm.cancelTimer = cancelTimer
|
bm.cancelTimer = cancelTimer
|
||||||
bm.blueprintModule = blueprintModule
|
bm.blueprintModule = blueprintModule
|
||||||
bm.mapGraph = make(map[int64]IGraph,128)
|
bm.mapGraph = make(map[int64]IGraph, 128)
|
||||||
|
bm.execDefFilePath = execDefFilePath
|
||||||
|
bm.graphFilePath = graphFilePath
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -42,32 +160,41 @@ func (bm *Blueprint) Create(graphName string) int64 {
|
|||||||
if graphName == "" {
|
if graphName == "" {
|
||||||
return 0
|
return 0
|
||||||
}
|
}
|
||||||
|
|
||||||
graphID := atomic.AddInt64(&bm.seedID, 1)
|
graphID := atomic.AddInt64(&bm.seedID, 1)
|
||||||
bm.mapGraph[graphID] = bm.graphPool.Create(graphName, graphID)
|
gr := bm.graphPool.Create(graphName, graphID)
|
||||||
|
if gr == nil {
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
|
||||||
|
bm.mapGraph[graphID] = gr
|
||||||
return graphID
|
return graphID
|
||||||
}
|
}
|
||||||
|
|
||||||
func (bm *Blueprint) TriggerEvent(graphID int64, eventID int64, args ...any) error{
|
func (bm *Blueprint) TriggerEvent(graphID int64, eventID int64, args ...any) error {
|
||||||
graph := bm.mapGraph[graphID]
|
graph := bm.mapGraph[graphID]
|
||||||
if graph == nil {
|
if graph == nil {
|
||||||
return fmt.Errorf("can not find graph:%d", graphID)
|
return fmt.Errorf("can not find graph:%d", graphID)
|
||||||
}
|
}
|
||||||
|
|
||||||
_,err:= graph.Do(eventID, args...)
|
_, err := graph.Do(eventID, args...)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (bm *Blueprint) Do(graphID int64, entranceID int64, args ...any) (Port_Array,error){
|
func (bm *Blueprint) Do(graphID int64, entranceID int64, args ...any) (Port_Array, error) {
|
||||||
graph := bm.mapGraph[graphID]
|
graph := bm.mapGraph[graphID]
|
||||||
if graph == nil {
|
if graph == nil {
|
||||||
return nil,fmt.Errorf("can not find graph:%d", graphID)
|
return nil, fmt.Errorf("can not find graph:%d", graphID)
|
||||||
}
|
}
|
||||||
|
|
||||||
return graph.Do(entranceID, args...)
|
return graph.Do(entranceID, args...)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (bm *Blueprint) ReleaseGraph(graphID int64) {
|
func (bm *Blueprint) ReleaseGraph(graphID int64) {
|
||||||
|
if graphID == 0 {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
defer delete(bm.mapGraph, graphID)
|
defer delete(bm.mapGraph, graphID)
|
||||||
graph := bm.mapGraph[graphID]
|
graph := bm.mapGraph[graphID]
|
||||||
if graph == nil {
|
if graph == nil {
|
||||||
@@ -77,7 +204,7 @@ func (bm *Blueprint) ReleaseGraph(graphID int64) {
|
|||||||
graph.Release()
|
graph.Release()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (bm *Blueprint) CancelTimerId(graphID int64, timerId *uint64) bool{
|
func (bm *Blueprint) CancelTimerId(graphID int64, timerId *uint64) bool {
|
||||||
tId := *timerId
|
tId := *timerId
|
||||||
bm.cancelTimer(timerId)
|
bm.cancelTimer(timerId)
|
||||||
|
|
||||||
@@ -86,11 +213,11 @@ func (bm *Blueprint) CancelTimerId(graphID int64, timerId *uint64) bool{
|
|||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
gr,ok := graph.(*Graph)
|
gr, ok := graph.(*Graph)
|
||||||
if !ok {
|
if !ok {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
delete(gr.mapTimerID, tId)
|
delete(gr.mapTimerID, tId)
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -6,17 +6,11 @@ import (
|
|||||||
|
|
||||||
func TestExecMgr(t *testing.T) {
|
func TestExecMgr(t *testing.T) {
|
||||||
var bp Blueprint
|
var bp Blueprint
|
||||||
err := bp.Init("D:\\Develop\\OriginNodeEditor\\json", "D:\\Develop\\OriginNodeEditor\\vgf", nil)
|
err := bp.Init("E:\\WorkSpace\\c4\\OriginNodeEditor\\json", "E:\\WorkSpace\\c4\\OriginNodeEditor\\vgf", nil, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("init failed,err:%v", err)
|
t.Fatalf("Init failed,err:%v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
graphTest1 := bp.Create("testArrayOperator", 0)
|
|
||||||
err = graphTest1.Do(EntranceID_IntParam, 20, 1, 3)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("Do EntranceID_IntParam failed,err:%v", err)
|
|
||||||
}
|
|
||||||
graphTest1.Release()
|
|
||||||
//graphTest2 := bp.Create("testForeach")
|
//graphTest2 := bp.Create("testForeach")
|
||||||
//err = graphTest2.Do(EntranceID_IntParam, 1, 2, 3)
|
//err = graphTest2.Do(EntranceID_IntParam, 1, 2, 3)
|
||||||
//if err != nil {
|
//if err != nil {
|
||||||
|
|||||||
@@ -2,15 +2,7 @@ package blueprint
|
|||||||
|
|
||||||
import "fmt"
|
import "fmt"
|
||||||
|
|
||||||
type IBaseExecNode interface {
|
// IInnerExecNode 配置生成的结点
|
||||||
initInnerExecNode(innerNode *innerExecNode)
|
|
||||||
initExecNode(gr *Graph, en *execNode) error
|
|
||||||
GetPorts() ([]IPort, []IPort)
|
|
||||||
getExecNodeInfo() (*ExecContext, *execNode)
|
|
||||||
setExecNodeInfo(gr *ExecContext, en *execNode)
|
|
||||||
GetBlueprintModule() IBlueprintModule
|
|
||||||
}
|
|
||||||
|
|
||||||
type IInnerExecNode interface {
|
type IInnerExecNode interface {
|
||||||
GetName() string
|
GetName() string
|
||||||
SetExec(exec IExecNode)
|
SetExec(exec IExecNode)
|
||||||
@@ -23,37 +15,46 @@ type IInnerExecNode interface {
|
|||||||
GetInPort(index int) IPort
|
GetInPort(index int) IPort
|
||||||
GetOutPort(index int) IPort
|
GetOutPort(index int) IPort
|
||||||
|
|
||||||
GetInPortParamStartIndex() int
|
|
||||||
GetOutPortParamStartIndex() int
|
GetOutPortParamStartIndex() int
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// IBaseExecNode 实际注册的执行结点的基础结构体
|
||||||
|
type IBaseExecNode interface {
|
||||||
|
initInnerExecNode(innerNode *innerExecNode)
|
||||||
|
initExecNode(gr *Graph, en *execNode) error
|
||||||
|
GetPorts() ([]IPort, []IPort)
|
||||||
|
getExecNodeInfo() (*ExecContext, *execNode)
|
||||||
|
setExecNodeInfo(gr *ExecContext, en *execNode)
|
||||||
|
GetBlueprintModule() IBlueprintModule
|
||||||
|
}
|
||||||
|
|
||||||
|
// IExecNode 实际注册的执行结点
|
||||||
type IExecNode interface {
|
type IExecNode interface {
|
||||||
|
IBaseExecNode
|
||||||
GetName() string
|
GetName() string
|
||||||
DoNext(index int) error
|
DoNext(index int) error
|
||||||
Exec() (int, error) // 返回后续执行的Node的Index
|
Exec() (int, error) // 返回后续执行的Node的Index
|
||||||
GetNextExecLen() int
|
GetNextExecLen() int
|
||||||
getInnerExecNode() IInnerExecNode
|
getInnerExecNode() IInnerExecNode
|
||||||
|
|
||||||
setVariableName(name string) bool
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// 配置对应的基础信息+端口数据
|
||||||
type innerExecNode struct {
|
type innerExecNode struct {
|
||||||
Name string
|
Name string
|
||||||
Title string
|
Title string
|
||||||
Package string
|
Package string
|
||||||
Description string
|
Description string
|
||||||
|
|
||||||
inPort []IPort
|
inPort []IPort // 下标即为portId
|
||||||
outPort []IPort
|
outPort []IPort // 下标即为portId
|
||||||
|
|
||||||
inPortParamStartIndex int // 输入参数的起始索引,用于排除执行入口
|
|
||||||
outPortParamStartIndex int // 输出参数的起始索引,用于排除执行出口
|
outPortParamStartIndex int // 输出参数的起始索引,用于排除执行出口
|
||||||
|
|
||||||
IExecNode
|
IExecNode // 实际注册的执行结点
|
||||||
}
|
}
|
||||||
|
|
||||||
type BaseExecNode struct {
|
type BaseExecNode struct {
|
||||||
*innerExecNode
|
*innerExecNode // 内部注册的执行结点
|
||||||
|
|
||||||
// 执行时初始化的数据
|
// 执行时初始化的数据
|
||||||
*ExecContext
|
*ExecContext
|
||||||
@@ -67,49 +68,90 @@ type InputConfig struct {
|
|||||||
DataType string `json:"data_type"`
|
DataType string `json:"data_type"`
|
||||||
HasInput bool `json:"has_input"`
|
HasInput bool `json:"has_input"`
|
||||||
PinWidget string `json:"pin_widget"`
|
PinWidget string `json:"pin_widget"`
|
||||||
|
PortId int `json:"port_id"`
|
||||||
}
|
}
|
||||||
|
|
||||||
type OutInputConfig struct {
|
type OutputConfig struct {
|
||||||
Name string `json:"name"`
|
Name string `json:"name"`
|
||||||
PortType string `json:"type"`
|
PortType string `json:"type"`
|
||||||
DataType string `json:"data_type"`
|
DataType string `json:"data_type"`
|
||||||
HasInput bool `json:"has_input"`
|
HasInput bool `json:"has_input"`
|
||||||
|
PortId int `json:"port_id"`
|
||||||
}
|
}
|
||||||
|
|
||||||
type BaseExecConfig struct {
|
type BaseExecConfig struct {
|
||||||
Name string `json:"name"`
|
Name string `json:"name"`
|
||||||
Title string `json:"title"`
|
Title string `json:"title"`
|
||||||
Package string `json:"package"`
|
Package string `json:"package"`
|
||||||
Description string `json:"description"`
|
Description string `json:"description"`
|
||||||
IsPure bool `json:"is_pure"`
|
IsPure bool `json:"is_pure"`
|
||||||
Inputs []InputConfig `json:"inputs"`
|
Inputs []InputConfig `json:"inputs"`
|
||||||
Outputs []OutInputConfig `json:"outputs"`
|
Outputs []OutputConfig `json:"outputs"`
|
||||||
}
|
}
|
||||||
|
|
||||||
func (em *innerExecNode) AppendInPort(port ...IPort) {
|
func (bc *BaseExecConfig) GetMaxInPortId() int {
|
||||||
if len(em.inPort) == 0 {
|
maxPortId := -1
|
||||||
em.inPortParamStartIndex = -1
|
for i := range bc.Inputs {
|
||||||
}
|
if bc.Inputs[i].PortId > maxPortId {
|
||||||
|
maxPortId = bc.Inputs[i].PortId
|
||||||
for i := 0; i < len(port); i++ {
|
|
||||||
if !port[i].IsPortExec() && em.inPortParamStartIndex < 0 {
|
|
||||||
em.inPortParamStartIndex = len(em.inPort)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
em.inPort = append(em.inPort, port[i])
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return maxPortId
|
||||||
}
|
}
|
||||||
|
|
||||||
func (em *innerExecNode) AppendOutPort(port ...IPort) {
|
func (bc *BaseExecConfig) GetMaxOutPortId() int {
|
||||||
if len(em.outPort) == 0 {
|
maxPortId := -1
|
||||||
em.outPortParamStartIndex = -1
|
for i := range bc.Outputs {
|
||||||
}
|
if bc.Outputs[i].PortId > maxPortId {
|
||||||
for i := 0; i < len(port); i++ {
|
maxPortId = bc.Outputs[i].PortId
|
||||||
if !port[i].IsPortExec() && em.outPortParamStartIndex < 0 {
|
|
||||||
em.outPortParamStartIndex = len(em.outPort)
|
|
||||||
}
|
}
|
||||||
em.outPort = append(em.outPort, port[i])
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return maxPortId
|
||||||
|
}
|
||||||
|
|
||||||
|
func (em *innerExecNode) PrepareMaxInPortId(maxInPortId int) {
|
||||||
|
em.inPort = make([]IPort, maxInPortId+1)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (em *innerExecNode) PrepareMaxOutPortId(maxOutPortId int) {
|
||||||
|
em.outPort = make([]IPort, maxOutPortId+1)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (em *innerExecNode) SetInPortById(id int, port IPort) bool {
|
||||||
|
if id < 0 || id >= len(em.inPort) {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
em.inPort[id] = port
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
func (em *innerExecNode) SetOutPortById(id int, port IPort) bool {
|
||||||
|
if id < 0 || id >= len(em.outPort) {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
em.outPort[id] = port
|
||||||
|
|
||||||
|
// 分析执行的
|
||||||
|
em.outPortParamStartIndex = -1
|
||||||
|
for i := range em.outPort {
|
||||||
|
if em.outPort[i] == nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// 遇到非Exec结点,即为输出参数开始位置
|
||||||
|
if !em.outPort[i].IsPortExec() {
|
||||||
|
em.outPortParamStartIndex = i
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
func (em *innerExecNode) GetOutPortParamStartIndex() int {
|
||||||
|
return em.outPortParamStartIndex
|
||||||
}
|
}
|
||||||
|
|
||||||
func (em *innerExecNode) GetName() string {
|
func (em *innerExecNode) GetName() string {
|
||||||
@@ -123,6 +165,10 @@ func (em *innerExecNode) SetExec(exec IExecNode) {
|
|||||||
func (em *innerExecNode) CloneInOutPort() ([]IPort, []IPort) {
|
func (em *innerExecNode) CloneInOutPort() ([]IPort, []IPort) {
|
||||||
inPorts := make([]IPort, 0, 2)
|
inPorts := make([]IPort, 0, 2)
|
||||||
for _, port := range em.inPort {
|
for _, port := range em.inPort {
|
||||||
|
if port == nil {
|
||||||
|
inPorts = append(inPorts, nil)
|
||||||
|
}
|
||||||
|
|
||||||
if port.IsPortExec() {
|
if port.IsPortExec() {
|
||||||
// 执行入口, 不需要克隆,占位处理
|
// 执行入口, 不需要克隆,占位处理
|
||||||
inPorts = append(inPorts, nil)
|
inPorts = append(inPorts, nil)
|
||||||
@@ -134,6 +180,10 @@ func (em *innerExecNode) CloneInOutPort() ([]IPort, []IPort) {
|
|||||||
outPorts := make([]IPort, 0, 2)
|
outPorts := make([]IPort, 0, 2)
|
||||||
|
|
||||||
for _, port := range em.outPort {
|
for _, port := range em.outPort {
|
||||||
|
if port == nil {
|
||||||
|
outPorts = append(outPorts, nil)
|
||||||
|
}
|
||||||
|
|
||||||
if port.IsPortExec() {
|
if port.IsPortExec() {
|
||||||
outPorts = append(outPorts, nil)
|
outPorts = append(outPorts, nil)
|
||||||
continue
|
continue
|
||||||
@@ -182,12 +232,8 @@ func (em *innerExecNode) GetOutPort(index int) IPort {
|
|||||||
return em.outPort[index]
|
return em.outPort[index]
|
||||||
}
|
}
|
||||||
|
|
||||||
func (em *innerExecNode) GetInPortParamStartIndex() int {
|
func (en *BaseExecNode) GetVariableName() string {
|
||||||
return em.inPortParamStartIndex
|
return en.execNode.variableName
|
||||||
}
|
|
||||||
|
|
||||||
func (em *innerExecNode) GetOutPortParamStartIndex() int {
|
|
||||||
return em.outPortParamStartIndex
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (en *BaseExecNode) GetBluePrintModule() IBlueprintModule {
|
func (en *BaseExecNode) GetBluePrintModule() IBlueprintModule {
|
||||||
@@ -526,12 +572,7 @@ func (en *BaseExecNode) getInnerExecNode() IInnerExecNode {
|
|||||||
return en.innerExecNode.IExecNode.(IInnerExecNode)
|
return en.innerExecNode.IExecNode.(IInnerExecNode)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (en *BaseExecNode) setVariableName(name string) bool {
|
func (en *BaseExecNode) GetBlueprintModule() IBlueprintModule {
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
func (en *BaseExecNode) GetBlueprintModule() IBlueprintModule{
|
|
||||||
if en.gr == nil {
|
if en.gr == nil {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -5,18 +5,19 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
|
"sort"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
)
|
)
|
||||||
|
|
||||||
// 格式说明Entrance_ID
|
// Entrance 格式:Entrance_XXXX_ID
|
||||||
const (
|
const (
|
||||||
Entrance = "Entrance_"
|
Entrance = "Entrance_"
|
||||||
)
|
)
|
||||||
|
|
||||||
type ExecPool struct {
|
type ExecPool struct {
|
||||||
innerExecNodeMap map[string]IInnerExecNode
|
innerExecNodeMap map[string]IInnerExecNode // 所有配置对应的结点信息
|
||||||
execNodeMap map[string]IExecNode
|
execNodeMap map[string]IExecNode // 实际注册的执行结点
|
||||||
}
|
}
|
||||||
|
|
||||||
func (em *ExecPool) Load(execDefFilePath string) error {
|
func (em *ExecPool) Load(execDefFilePath string) error {
|
||||||
@@ -48,16 +49,19 @@ func (em *ExecPool) Load(execDefFilePath string) error {
|
|||||||
|
|
||||||
// 只处理JSON文件
|
// 只处理JSON文件
|
||||||
if filepath.Ext(path) == ".json" {
|
if filepath.Ext(path) == ".json" {
|
||||||
|
// 将配置的结点初始化为innerExecNode将加入到innerExecNodeMap中
|
||||||
return em.processJSONFile(path)
|
return em.processJSONFile(path)
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed to walk path %s: %v", execDefFilePath, err)
|
return fmt.Errorf("failed to walk path %s: %v", execDefFilePath, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return em.loadSysExec()
|
// 生成变量配置对应的配置结点GetVar_类型、SetVar_类型
|
||||||
|
return em.regVariablesNode()
|
||||||
}
|
}
|
||||||
|
|
||||||
// 处理单个JSON文件
|
// 处理单个JSON文件
|
||||||
@@ -83,12 +87,24 @@ func (em *ExecPool) processJSONFile(filePath string) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
for i := range baseExecConfig {
|
for i := range baseExecConfig {
|
||||||
exec, err := em.createExecFromJSON(baseExecConfig[i])
|
// 对PortId进行排序
|
||||||
if err != nil {
|
sort.Slice(baseExecConfig[i].Inputs, func(left, right int) bool {
|
||||||
return err
|
return baseExecConfig[i].Inputs[left].PortId < baseExecConfig[i].Inputs[right].PortId
|
||||||
|
})
|
||||||
|
// 对PortId进行排序
|
||||||
|
sort.Slice(baseExecConfig[i].Outputs, func(left, right int) bool {
|
||||||
|
return baseExecConfig[i].Outputs[left].PortId < baseExecConfig[i].Outputs[right].PortId
|
||||||
|
})
|
||||||
|
|
||||||
|
// 根据配置的结点信息,创建innerExecNode
|
||||||
|
var execError error
|
||||||
|
exec, execError := em.createExecFromJSON(baseExecConfig[i])
|
||||||
|
if execError != nil {
|
||||||
|
return execError
|
||||||
}
|
}
|
||||||
|
|
||||||
if !em.loadBaseExec(exec) {
|
// 加载到innerExecNodeMap中
|
||||||
|
if !em.addInnerExec(exec) {
|
||||||
return fmt.Errorf("exec %s already registered", exec.GetName())
|
return fmt.Errorf("exec %s already registered", exec.GetName())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -116,6 +132,7 @@ func (em *ExecPool) createPortByDataType(nodeName, portName, dataType string) (I
|
|||||||
func (em *ExecPool) createExecFromJSON(baseExecConfig BaseExecConfig) (IInnerExecNode, error) {
|
func (em *ExecPool) createExecFromJSON(baseExecConfig BaseExecConfig) (IInnerExecNode, error) {
|
||||||
var baseExec innerExecNode
|
var baseExec innerExecNode
|
||||||
|
|
||||||
|
// 如果是入口名,则按入口名Entrance_ArrayParam_000002生成结点名:Entrance_ArrayParam
|
||||||
entranceName, _, ok := getEntranceNodeNameAndID(baseExecConfig.Name)
|
entranceName, _, ok := getEntranceNodeNameAndID(baseExecConfig.Name)
|
||||||
if ok {
|
if ok {
|
||||||
baseExec.Name = entranceName
|
baseExec.Name = entranceName
|
||||||
@@ -125,8 +142,10 @@ func (em *ExecPool) createExecFromJSON(baseExecConfig BaseExecConfig) (IInnerExe
|
|||||||
baseExec.Title = baseExecConfig.Title
|
baseExec.Title = baseExecConfig.Title
|
||||||
baseExec.Package = baseExecConfig.Package
|
baseExec.Package = baseExecConfig.Package
|
||||||
baseExec.Description = baseExecConfig.Description
|
baseExec.Description = baseExecConfig.Description
|
||||||
|
baseExec.PrepareMaxInPortId(baseExecConfig.GetMaxInPortId())
|
||||||
|
baseExec.PrepareMaxOutPortId(baseExecConfig.GetMaxOutPortId())
|
||||||
|
|
||||||
// exec数量
|
// 初始化所有的输入端口
|
||||||
inExecNum := 0
|
inExecNum := 0
|
||||||
for index, input := range baseExecConfig.Inputs {
|
for index, input := range baseExecConfig.Inputs {
|
||||||
portType := strings.ToLower(input.PortType)
|
portType := strings.ToLower(input.PortType)
|
||||||
@@ -134,6 +153,7 @@ func (em *ExecPool) createExecFromJSON(baseExecConfig BaseExecConfig) (IInnerExe
|
|||||||
return nil, fmt.Errorf("input %s data type %s not support", input.Name, input.DataType)
|
return nil, fmt.Errorf("input %s data type %s not support", input.Name, input.DataType)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// 输入执行结点只能有一个,且只能放在第一个
|
||||||
if portType == Config_PortType_Exec {
|
if portType == Config_PortType_Exec {
|
||||||
if inExecNum > 0 {
|
if inExecNum > 0 {
|
||||||
return nil, fmt.Errorf("inPort only allows one Execute,node name %s", baseExec.Name)
|
return nil, fmt.Errorf("inPort only allows one Execute,node name %s", baseExec.Name)
|
||||||
@@ -143,18 +163,22 @@ func (em *ExecPool) createExecFromJSON(baseExecConfig BaseExecConfig) (IInnerExe
|
|||||||
}
|
}
|
||||||
|
|
||||||
inExecNum++
|
inExecNum++
|
||||||
baseExec.AppendInPort(NewPortExec())
|
// 设置执行端口
|
||||||
|
baseExec.SetInPortById(input.PortId, NewPortExec())
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// 根据类型设置对应的端口
|
||||||
port, err := em.createPortByDataType(baseExec.Name, input.Name, input.DataType)
|
port, err := em.createPortByDataType(baseExec.Name, input.Name, input.DataType)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
baseExec.AppendInPort(port)
|
// 根据PortId设置端口
|
||||||
|
baseExec.SetInPortById(input.PortId, port)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// 初始化所有的输出端口
|
||||||
hasData := false
|
hasData := false
|
||||||
for _, output := range baseExecConfig.Outputs {
|
for _, output := range baseExecConfig.Outputs {
|
||||||
portType := strings.ToLower(output.PortType)
|
portType := strings.ToLower(output.PortType)
|
||||||
@@ -167,22 +191,25 @@ func (em *ExecPool) createExecFromJSON(baseExecConfig BaseExecConfig) (IInnerExe
|
|||||||
return nil, fmt.Errorf("the exec port can only be placed at the front,node name %s", baseExec.Name)
|
return nil, fmt.Errorf("the exec port can only be placed at the front,node name %s", baseExec.Name)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// 设置执行端口
|
||||||
if portType == Config_PortType_Exec {
|
if portType == Config_PortType_Exec {
|
||||||
baseExec.AppendOutPort(NewPortExec())
|
baseExec.SetOutPortById(output.PortId, NewPortExec())
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// 根据类型设置数据端口
|
||||||
hasData = true
|
hasData = true
|
||||||
port, err := em.createPortByDataType(baseExec.Name, output.Name, output.DataType)
|
port, err := em.createPortByDataType(baseExec.Name, output.Name, output.DataType)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
baseExec.AppendOutPort(port)
|
baseExec.SetOutPortById(output.PortId, port)
|
||||||
}
|
}
|
||||||
return &baseExec, nil
|
return &baseExec, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (em *ExecPool) loadBaseExec(exec IInnerExecNode) bool {
|
func (em *ExecPool) addInnerExec(exec IInnerExecNode) bool {
|
||||||
if _, ok := em.innerExecNodeMap[exec.GetName()]; ok {
|
if _, ok := em.innerExecNodeMap[exec.GetName()]; ok {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
@@ -201,7 +228,7 @@ func (em *ExecPool) Register(exec IExecNode) bool {
|
|||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
if _, ok := em.execNodeMap[innerNode.GetName()]; ok {
|
if _, ok = em.execNodeMap[innerNode.GetName()]; ok {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -210,10 +237,14 @@ func (em *ExecPool) Register(exec IExecNode) bool {
|
|||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// 设置实际执行结点中innerExecNode变量,BaseExecNode.innerExecNode = innerNode
|
||||||
baseExecNode.initInnerExecNode(innerNode.(*innerExecNode))
|
baseExecNode.initInnerExecNode(innerNode.(*innerExecNode))
|
||||||
|
|
||||||
|
// innerNode设置实际的exec变量,innerExecNode.IExecNode = exec
|
||||||
innerNode.SetExec(exec)
|
innerNode.SetExec(exec)
|
||||||
|
|
||||||
em.execNodeMap[baseExec.GetName()] = baseExec
|
// 将实际的执行结点保存到execNodeMap中
|
||||||
|
em.execNodeMap[baseExec.GetName()] = exec
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -224,7 +255,8 @@ func (em *ExecPool) GetExec(name string) IInnerExecNode {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (em *ExecPool) loadSysExec() error {
|
// regVariablesNode 注册变量结点GetVar_类型、SetVar_类型
|
||||||
|
func (em *ExecPool) regVariablesNode() error {
|
||||||
var err error
|
var err error
|
||||||
if err = em.regGetVariables(Config_DataType_Int); err != nil {
|
if err = em.regGetVariables(Config_DataType_Int); err != nil {
|
||||||
return err
|
return err
|
||||||
@@ -275,17 +307,18 @@ func (em *ExecPool) loadSysExec() error {
|
|||||||
func (em *ExecPool) regGetVariables(typ string) error {
|
func (em *ExecPool) regGetVariables(typ string) error {
|
||||||
var baseExec innerExecNode
|
var baseExec innerExecNode
|
||||||
baseExec.Name = genGetVariablesNodeName(typ)
|
baseExec.Name = genGetVariablesNodeName(typ)
|
||||||
|
baseExec.PrepareMaxOutPortId(0)
|
||||||
|
|
||||||
outPort := NewPortByType(typ)
|
outPort := NewPortByType(typ)
|
||||||
if outPort == nil {
|
if outPort == nil {
|
||||||
return fmt.Errorf("invalid type %s", typ)
|
return fmt.Errorf("invalid type %s", typ)
|
||||||
}
|
}
|
||||||
baseExec.AppendOutPort(outPort)
|
baseExec.SetOutPortById(0, outPort)
|
||||||
|
|
||||||
var getVariablesNode GetVariablesNode
|
var getVariablesNode GetVariablesNode
|
||||||
getVariablesNode.nodeName = baseExec.GetName()
|
getVariablesNode.nodeName = baseExec.GetName()
|
||||||
|
|
||||||
if !em.loadBaseExec(&baseExec) {
|
if !em.addInnerExec(&baseExec) {
|
||||||
return fmt.Errorf("exec %s already registered", baseExec.GetName())
|
return fmt.Errorf("exec %s already registered", baseExec.GetName())
|
||||||
}
|
}
|
||||||
if !em.Register(&getVariablesNode) {
|
if !em.Register(&getVariablesNode) {
|
||||||
@@ -311,12 +344,17 @@ func (em *ExecPool) regSetVariables(typ string) error {
|
|||||||
inPort := NewPortByType(typ)
|
inPort := NewPortByType(typ)
|
||||||
outExecPort := NewPortByType(Config_PortType_Exec)
|
outExecPort := NewPortByType(Config_PortType_Exec)
|
||||||
outPort := NewPortByType(typ)
|
outPort := NewPortByType(typ)
|
||||||
|
baseExec.PrepareMaxInPortId(1)
|
||||||
|
baseExec.PrepareMaxOutPortId(1)
|
||||||
|
|
||||||
baseExec.AppendInPort(inExecPort, inPort)
|
baseExec.SetInPortById(0, inExecPort)
|
||||||
baseExec.AppendOutPort(outExecPort, outPort)
|
baseExec.SetInPortById(1, inPort)
|
||||||
|
|
||||||
|
baseExec.SetOutPortById(0, outExecPort)
|
||||||
|
baseExec.SetOutPortById(1, outPort)
|
||||||
|
|
||||||
baseExec.IExecNode = &SetVariablesNode{nodeName: baseExec.GetName()}
|
baseExec.IExecNode = &SetVariablesNode{nodeName: baseExec.GetName()}
|
||||||
if !em.loadBaseExec(&baseExec) {
|
if !em.addInnerExec(&baseExec) {
|
||||||
return fmt.Errorf("exec %s already registered", baseExec.GetName())
|
return fmt.Errorf("exec %s already registered", baseExec.GetName())
|
||||||
}
|
}
|
||||||
if !em.Register(baseExec.IExecNode) {
|
if !em.Register(baseExec.IExecNode) {
|
||||||
|
|||||||
@@ -2,21 +2,28 @@ package blueprint
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"github.com/goccy/go-json"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/duanhf2012/origin/v2/log"
|
||||||
"github.com/duanhf2012/origin/v2/service"
|
"github.com/duanhf2012/origin/v2/service"
|
||||||
|
"github.com/goccy/go-json"
|
||||||
)
|
)
|
||||||
|
|
||||||
const ReturnVarial = "g_Return"
|
const ReturnVarial = "g_Return"
|
||||||
|
|
||||||
|
var IsDebug = false
|
||||||
|
|
||||||
type IGraph interface {
|
type IGraph interface {
|
||||||
Do(entranceID int64, args ...any) (Port_Array,error)
|
Do(entranceID int64, args ...any) (Port_Array, error)
|
||||||
Release()
|
Release()
|
||||||
|
GetGraphFileName() string
|
||||||
|
HotReload(newBaseGraph *baseGraph)
|
||||||
}
|
}
|
||||||
|
|
||||||
type IBlueprintModule interface {
|
type IBlueprintModule interface {
|
||||||
SafeAfterFunc(timerId *uint64, d time.Duration, AdditionData interface{}, cb func(uint64, interface{}))
|
SafeAfterFunc(timerId *uint64, d time.Duration, AdditionData interface{}, cb func(uint64, interface{}))
|
||||||
TriggerEvent(graphID int64, eventID int64, args ...any) error
|
TriggerEvent(graphID int64, eventID int64, args ...any) error
|
||||||
CancelTimerId(graphID int64,timerId *uint64) bool
|
CancelTimerId(graphID int64, timerId *uint64) bool
|
||||||
GetGameService() service.IService
|
GetGameService() service.IService
|
||||||
GetBattleService() service.IService
|
GetBattleService() service.IService
|
||||||
}
|
}
|
||||||
@@ -26,7 +33,8 @@ type baseGraph struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type Graph struct {
|
type Graph struct {
|
||||||
graphID int64
|
graphFileName string
|
||||||
|
graphID int64
|
||||||
*baseGraph
|
*baseGraph
|
||||||
graphContext
|
graphContext
|
||||||
IBlueprintModule
|
IBlueprintModule
|
||||||
@@ -52,8 +60,8 @@ type edgeConfig struct {
|
|||||||
SourceNodeID string `json:"source_node_id"`
|
SourceNodeID string `json:"source_node_id"`
|
||||||
DesNodeId string `json:"des_node_id"`
|
DesNodeId string `json:"des_node_id"`
|
||||||
|
|
||||||
SourcePortIndex int `json:"source_port_index"`
|
SourcePortId int `json:"source_port_id"`
|
||||||
DesPortIndex int `json:"des_port_index"`
|
DesPortId int `json:"des_port_id"`
|
||||||
}
|
}
|
||||||
|
|
||||||
type MultiTypeValue struct {
|
type MultiTypeValue struct {
|
||||||
@@ -134,15 +142,34 @@ func (gc *graphConfig) GetNodeByID(nodeID string) *nodeConfig {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (gr *Graph) Do(entranceID int64, args ...any) (Port_Array,error) {
|
func (gr *Graph) GetAndCreateReturnPort() IPort {
|
||||||
|
p, ok := gr.globalVariables[ReturnVarial]
|
||||||
|
if ok && p != nil {
|
||||||
|
return p
|
||||||
|
}
|
||||||
|
|
||||||
|
p = NewPortArray()
|
||||||
|
gr.globalVariables[ReturnVarial] = p
|
||||||
|
return p
|
||||||
|
}
|
||||||
|
|
||||||
|
func (gr *Graph) Do(entranceID int64, args ...any) (Port_Array, error) {
|
||||||
|
if IsDebug {
|
||||||
|
log.Debug("Graph Do", log.String("graphName", gr.graphFileName), log.Int64("graphID", gr.graphID), log.Int64("entranceID", entranceID))
|
||||||
|
}
|
||||||
|
|
||||||
entranceNode := gr.entrance[entranceID]
|
entranceNode := gr.entrance[entranceID]
|
||||||
if entranceNode == nil {
|
if entranceNode == nil {
|
||||||
return nil,fmt.Errorf("entranceID:%d not found", entranceID)
|
return nil, fmt.Errorf("entranceID:%d not found", entranceID)
|
||||||
}
|
}
|
||||||
|
|
||||||
gr.variables = map[string]IPort{}
|
gr.variables = map[string]IPort{}
|
||||||
|
gr.context = map[string]*ExecContext{}
|
||||||
|
|
||||||
if gr.globalVariables == nil {
|
if gr.globalVariables == nil {
|
||||||
gr.globalVariables = map[string]IPort{}
|
gr.globalVariables = map[string]IPort{}
|
||||||
|
}else {
|
||||||
|
gr.globalVariables[ReturnVarial] = nil
|
||||||
}
|
}
|
||||||
|
|
||||||
err := entranceNode.Do(gr, args...)
|
err := entranceNode.Do(gr, args...)
|
||||||
@@ -150,17 +177,17 @@ func (gr *Graph) Do(entranceID int64, args ...any) (Port_Array,error) {
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
if gr.globalVariables!= nil {
|
if gr.globalVariables != nil {
|
||||||
port := gr.globalVariables[ReturnVarial]
|
port := gr.globalVariables[ReturnVarial]
|
||||||
if port != nil {
|
if port != nil {
|
||||||
array,ok := port.GetArray()
|
array, ok := port.GetArray()
|
||||||
if ok{
|
if ok {
|
||||||
return array,nil
|
return array, nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil,nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (gr *Graph) GetNodeInPortValue(nodeID string, inPortIndex int) IPort {
|
func (gr *Graph) GetNodeInPortValue(nodeID string, inPortIndex int) IPort {
|
||||||
@@ -194,3 +221,11 @@ func (gr *Graph) Release() {
|
|||||||
// 清理掉所有数据
|
// 清理掉所有数据
|
||||||
*gr = Graph{}
|
*gr = Graph{}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (gr *Graph) HotReload(newBaseGraph *baseGraph) {
|
||||||
|
gr.baseGraph = newBaseGraph
|
||||||
|
}
|
||||||
|
|
||||||
|
func (gr *Graph) GetGraphFileName() string{
|
||||||
|
return gr.graphFileName
|
||||||
|
}
|
||||||
|
|||||||
@@ -2,10 +2,11 @@ package blueprint
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"github.com/goccy/go-json"
|
|
||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
|
"github.com/goccy/go-json"
|
||||||
)
|
)
|
||||||
|
|
||||||
type GraphPool struct {
|
type GraphPool struct {
|
||||||
@@ -51,6 +52,15 @@ func (gp *GraphPool) Load(execPool *ExecPool, graphFilePath string, blueprintMod
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (gp *GraphPool) GetBaseGraph(graphName string) *baseGraph {
|
||||||
|
gr, ok := gp.mapGraphs[graphName]
|
||||||
|
if !ok {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return gr
|
||||||
|
}
|
||||||
|
|
||||||
func (gp *GraphPool) Create(graphName string, graphID int64) IGraph {
|
func (gp *GraphPool) Create(graphName string, graphID int64) IGraph {
|
||||||
gr, ok := gp.mapGraphs[graphName]
|
gr, ok := gp.mapGraphs[graphName]
|
||||||
if !ok {
|
if !ok {
|
||||||
@@ -60,6 +70,7 @@ func (gp *GraphPool) Create(graphName string, graphID int64) IGraph {
|
|||||||
var graph Graph
|
var graph Graph
|
||||||
graph.baseGraph = gr
|
graph.baseGraph = gr
|
||||||
graph.graphID = graphID
|
graph.graphID = graphID
|
||||||
|
graph.graphFileName = graphName
|
||||||
graph.context = make(map[string]*ExecContext, 4)
|
graph.context = make(map[string]*ExecContext, 4)
|
||||||
graph.IBlueprintModule = gp.blueprintModule
|
graph.IBlueprintModule = gp.blueprintModule
|
||||||
return &graph
|
return &graph
|
||||||
@@ -71,21 +82,25 @@ func (gp *GraphPool) processJSONFile(filePath string) error {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed to open file %s: %v", filePath, err)
|
return fmt.Errorf("failed to open file %s: %v", filePath, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
defer func() {
|
defer func() {
|
||||||
if err := file.Close(); err != nil {
|
if err = file.Close(); err != nil {
|
||||||
fmt.Printf("关闭文件 %s 时出错: %v\n", filePath, err)
|
fmt.Printf("关闭文件 %s 时出错: %v\n", filePath, err)
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
fileName := filepath.Base(filePath)
|
fileName := filepath.Base(filePath)
|
||||||
ext := filepath.Ext(fileName) // 获取".html"
|
ext := filepath.Ext(fileName) // 获取".vgf"
|
||||||
name := strings.TrimSuffix(fileName, ext) // 获取"name"
|
name := strings.TrimSuffix(fileName, ext) // 获取"name"
|
||||||
|
|
||||||
|
// 解析文件
|
||||||
var gConfig graphConfig
|
var gConfig graphConfig
|
||||||
decoder := json.NewDecoder(file)
|
decoder := json.NewDecoder(file)
|
||||||
if err := decoder.Decode(&gConfig); err != nil {
|
if err = decoder.Decode(&gConfig); err != nil {
|
||||||
return fmt.Errorf("failed to decode JSON from file %s: %v", filePath, err)
|
return fmt.Errorf("failed to decode JSON from file %s: %v", filePath, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// 预处理蓝图
|
||||||
return gp.prepareGraph(name, &gConfig)
|
return gp.prepareGraph(name, &gConfig)
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -136,8 +151,6 @@ func (gp *GraphPool) genVarExec(nodeCfg *nodeConfig, graphConfig *graphConfig) (
|
|||||||
}
|
}
|
||||||
|
|
||||||
e := gp.execPool.GetExec(nodeName)
|
e := gp.execPool.GetExec(nodeName)
|
||||||
e.(IExecNode).setVariableName(varName)
|
|
||||||
|
|
||||||
return e, varName
|
return e, varName
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -173,7 +186,7 @@ func (gp *GraphPool) genAllNode(graphConfig *graphConfig) (map[string]*execNode,
|
|||||||
|
|
||||||
func (gp *GraphPool) prepareOneNode(mapNodeExec map[string]*execNode, nodeExec *execNode, graphConfig *graphConfig, recursion *int) error {
|
func (gp *GraphPool) prepareOneNode(mapNodeExec map[string]*execNode, nodeExec *execNode, graphConfig *graphConfig, recursion *int) error {
|
||||||
*recursion++
|
*recursion++
|
||||||
if *recursion > 100 {
|
if *recursion > 256 {
|
||||||
return fmt.Errorf("recursion too deep")
|
return fmt.Errorf("recursion too deep")
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -206,7 +219,7 @@ func (gp *GraphPool) prepareOneNode(mapNodeExec map[string]*execNode, nodeExec *
|
|||||||
func (gp *GraphPool) findOutNextNode(graphConfig *graphConfig, mapNodeExec map[string]*execNode, sourceNodeID string, sourcePortIdx int) *execNode {
|
func (gp *GraphPool) findOutNextNode(graphConfig *graphConfig, mapNodeExec map[string]*execNode, sourceNodeID string, sourcePortIdx int) *execNode {
|
||||||
// 找到出口的NodeID
|
// 找到出口的NodeID
|
||||||
for _, edge := range graphConfig.Edges {
|
for _, edge := range graphConfig.Edges {
|
||||||
if edge.SourceNodeID == sourceNodeID && edge.SourcePortIndex == sourcePortIdx {
|
if edge.SourceNodeID == sourceNodeID && edge.SourcePortId == sourcePortIdx {
|
||||||
return mapNodeExec[edge.DesNodeId]
|
return mapNodeExec[edge.DesNodeId]
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -254,7 +267,7 @@ func (gp *GraphPool) prepareOneEntrance(graphName string, entranceID int64, node
|
|||||||
|
|
||||||
func (gp *GraphPool) findPreInPortNode(mapNodes map[string]*execNode, nodeExec *execNode, graphConfig *graphConfig, portIdx int) *prePortNode {
|
func (gp *GraphPool) findPreInPortNode(mapNodes map[string]*execNode, nodeExec *execNode, graphConfig *graphConfig, portIdx int) *prePortNode {
|
||||||
for _, edge := range graphConfig.Edges {
|
for _, edge := range graphConfig.Edges {
|
||||||
if edge.DesNodeId == nodeExec.Id && edge.DesPortIndex == portIdx {
|
if edge.DesNodeId == nodeExec.Id && edge.DesPortId == portIdx {
|
||||||
srcNode := mapNodes[edge.SourceNodeID]
|
srcNode := mapNodes[edge.SourceNodeID]
|
||||||
if srcNode == nil {
|
if srcNode == nil {
|
||||||
return nil
|
return nil
|
||||||
@@ -262,7 +275,7 @@ func (gp *GraphPool) findPreInPortNode(mapNodes map[string]*execNode, nodeExec *
|
|||||||
|
|
||||||
var preNode prePortNode
|
var preNode prePortNode
|
||||||
preNode.node = srcNode
|
preNode.node = srcNode
|
||||||
preNode.outPortIndex = edge.SourcePortIndex
|
preNode.outPortId = edge.SourcePortId
|
||||||
|
|
||||||
return &preNode
|
return &preNode
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -5,14 +5,6 @@ import (
|
|||||||
"math/rand"
|
"math/rand"
|
||||||
)
|
)
|
||||||
|
|
||||||
func init() {
|
|
||||||
RegExecNode(&AddInt{})
|
|
||||||
RegExecNode(&SubInt{})
|
|
||||||
RegExecNode(&MulInt{})
|
|
||||||
RegExecNode(&DivInt{})
|
|
||||||
RegExecNode(&ModInt{})
|
|
||||||
RegExecNode(&RandNumber{})
|
|
||||||
}
|
|
||||||
|
|
||||||
// AddInt 加(int)
|
// AddInt 加(int)
|
||||||
type AddInt struct {
|
type AddInt struct {
|
||||||
|
|||||||
@@ -2,11 +2,12 @@ package blueprint
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"github.com/duanhf2012/origin/v2/log"
|
||||||
)
|
)
|
||||||
|
|
||||||
type prePortNode struct {
|
type prePortNode struct {
|
||||||
node *execNode // 上个结点
|
node *execNode // 上个结点
|
||||||
outPortIndex int // 对应上一个结点的OutPort索引
|
outPortId int // 对应上一个结点的OutPortId
|
||||||
}
|
}
|
||||||
|
|
||||||
type execNode struct {
|
type execNode struct {
|
||||||
@@ -102,20 +103,6 @@ func (en *execNode) exec(gr *Graph) (int, error) {
|
|||||||
return -1, err
|
return -1, err
|
||||||
}
|
}
|
||||||
|
|
||||||
//defer func() {
|
|
||||||
inPort, outPort := node.GetPorts()
|
|
||||||
debugString := "inPort:"
|
|
||||||
for i := 0; i < len(inPort); i++ {
|
|
||||||
debugString += fmt.Sprintf("%+v,", inPort[i])
|
|
||||||
}
|
|
||||||
debugString += " outPort:"
|
|
||||||
for i := 0; i < len(outPort); i++ {
|
|
||||||
debugString += fmt.Sprintf("%+v,", outPort[i])
|
|
||||||
}
|
|
||||||
|
|
||||||
fmt.Printf("exec node %s,%s\n", en.execNode.GetName(), debugString)
|
|
||||||
//}()
|
|
||||||
|
|
||||||
return e.Exec()
|
return e.Exec()
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -142,9 +129,9 @@ func (en *execNode) doSetInPort(gr *Graph, index int, inPort IPort) error {
|
|||||||
|
|
||||||
// 判断上一个结点是否已经执行过
|
// 判断上一个结点是否已经执行过
|
||||||
if _, ok := gr.context[preNode.node.Id]; ok {
|
if _, ok := gr.context[preNode.node.Id]; ok {
|
||||||
outPort := gr.GetNodeOutPortValue(preNode.node.Id, preNode.outPortIndex)
|
outPort := gr.GetNodeOutPortValue(preNode.node.Id, preNode.outPortId)
|
||||||
if outPort == nil {
|
if outPort == nil {
|
||||||
return fmt.Errorf("pre node %s out port index %d not found", preNode.node.Id, preNode.outPortIndex)
|
return fmt.Errorf("pre node %s out port index %d not found", preNode.node.Id, preNode.outPortId)
|
||||||
}
|
}
|
||||||
|
|
||||||
inPort.SetValue(outPort)
|
inPort.SetValue(outPort)
|
||||||
@@ -155,6 +142,10 @@ func (en *execNode) doSetInPort(gr *Graph, index int, inPort IPort) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (en *execNode) Do(gr *Graph, outPortArgs ...any) error {
|
func (en *execNode) Do(gr *Graph, outPortArgs ...any) error {
|
||||||
|
if IsDebug {
|
||||||
|
log.Debug("Start ExecNode", log.String("Name",en.execNode.GetName()))
|
||||||
|
}
|
||||||
|
|
||||||
// 重新初始化上下文
|
// 重新初始化上下文
|
||||||
inPorts, outPorts := en.execNode.CloneInOutPort()
|
inPorts, outPorts := en.execNode.CloneInOutPort()
|
||||||
gr.context[en.Id] = &ExecContext{
|
gr.context[en.Id] = &ExecContext{
|
||||||
@@ -164,7 +155,7 @@ func (en *execNode) Do(gr *Graph, outPortArgs ...any) error {
|
|||||||
|
|
||||||
startOutIdx := en.execNode.GetOutPortParamStartIndex()
|
startOutIdx := en.execNode.GetOutPortParamStartIndex()
|
||||||
for i := 0; i < len(outPortArgs); i++ {
|
for i := 0; i < len(outPortArgs); i++ {
|
||||||
if i >= len(outPorts) {
|
if i+startOutIdx >= len(outPorts) {
|
||||||
return fmt.Errorf("args %d not found in node %s", i, en.execNode.GetName())
|
return fmt.Errorf("args %d not found in node %s", i, en.execNode.GetName())
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -175,12 +166,12 @@ func (en *execNode) Do(gr *Graph, outPortArgs ...any) error {
|
|||||||
|
|
||||||
// 处理InPort结点值
|
// 处理InPort结点值
|
||||||
var err error
|
var err error
|
||||||
for index := range inPorts {
|
for portId := range inPorts {
|
||||||
if en.execNode.IsInPortExec(index) {
|
if en.execNode.IsInPortExec(portId) {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
err = en.doSetInPort(gr, index, inPorts[index])
|
err = en.doSetInPort(gr, portId, inPorts[portId])
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@@ -194,6 +185,10 @@ func (en *execNode) Do(gr *Graph, outPortArgs ...any) error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if IsDebug {
|
||||||
|
log.Debug("End ExecNode", log.String("Name",en.execNode.GetName()),log.Any("InPort",inPorts ),log.Any("OutPort",outPorts))
|
||||||
|
}
|
||||||
|
|
||||||
if nextIndex == -1 || en.nextNode == nil {
|
if nextIndex == -1 || en.nextNode == nil {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -213,7 +213,7 @@ func (em *Port[T]) convertInt64(v any) (int64, bool) {
|
|||||||
|
|
||||||
func (em *Port[T]) setAnyVale(v any) error {
|
func (em *Port[T]) setAnyVale(v any) error {
|
||||||
switch v.(type) {
|
switch v.(type) {
|
||||||
case int, int64:
|
case int8,int16,int32,int, int64,uint8,uint16,uint32,uint, uint64:
|
||||||
val, ok := em.convertInt64(v)
|
val, ok := em.convertInt64(v)
|
||||||
if !ok {
|
if !ok {
|
||||||
return fmt.Errorf("port type is %T, but value is %v", em.PortVal, v)
|
return fmt.Errorf("port type is %T, but value is %v", em.PortVal, v)
|
||||||
|
|||||||
@@ -1,7 +0,0 @@
|
|||||||
package blueprint
|
|
||||||
|
|
||||||
var execNodes []IExecNode
|
|
||||||
|
|
||||||
func RegExecNode(exec IExecNode) {
|
|
||||||
execNodes = append(execNodes, exec)
|
|
||||||
}
|
|
||||||
@@ -2,9 +2,10 @@ package blueprint
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"github.com/duanhf2012/origin/v2/log"
|
|
||||||
"math/rand/v2"
|
"math/rand/v2"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/duanhf2012/origin/v2/log"
|
||||||
)
|
)
|
||||||
|
|
||||||
// 系统入口ID定义,1000以内
|
// 系统入口ID定义,1000以内
|
||||||
@@ -14,30 +15,6 @@ const (
|
|||||||
EntranceID_Timer = 3
|
EntranceID_Timer = 3
|
||||||
)
|
)
|
||||||
|
|
||||||
func init() {
|
|
||||||
RegExecNode(&Entrance_ArrayParam{})
|
|
||||||
RegExecNode(&Entrance_IntParam{})
|
|
||||||
RegExecNode(&Entrance_Timer{})
|
|
||||||
RegExecNode(&Output{})
|
|
||||||
RegExecNode(&Sequence{})
|
|
||||||
RegExecNode(&Foreach{})
|
|
||||||
RegExecNode(&GetArrayInt{})
|
|
||||||
RegExecNode(&GetArrayString{})
|
|
||||||
RegExecNode(&GetArrayLen{})
|
|
||||||
RegExecNode(&CreateIntArray{})
|
|
||||||
RegExecNode(&CreateStringArray{})
|
|
||||||
RegExecNode(&AppendIntegerToArray{})
|
|
||||||
RegExecNode(&AppendStringToArray{})
|
|
||||||
|
|
||||||
RegExecNode(&BoolIf{})
|
|
||||||
RegExecNode(&GreaterThanInteger{})
|
|
||||||
RegExecNode(&LessThanInteger{})
|
|
||||||
RegExecNode(&EqualInteger{})
|
|
||||||
RegExecNode(&RangeCompare{})
|
|
||||||
RegExecNode(&Probability{})
|
|
||||||
RegExecNode(&CreateTimer{})
|
|
||||||
}
|
|
||||||
|
|
||||||
type Entrance_ArrayParam struct {
|
type Entrance_ArrayParam struct {
|
||||||
BaseExecNode
|
BaseExecNode
|
||||||
}
|
}
|
||||||
@@ -74,15 +51,15 @@ func (em *Entrance_Timer) Exec() (int, error) {
|
|||||||
return 0, nil
|
return 0, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
type Output struct {
|
type DebugOutput struct {
|
||||||
BaseExecNode
|
BaseExecNode
|
||||||
}
|
}
|
||||||
|
|
||||||
func (em *Output) GetName() string {
|
func (em *DebugOutput) GetName() string {
|
||||||
return "Output"
|
return "DebugOutput"
|
||||||
}
|
}
|
||||||
|
|
||||||
func (em *Output) Exec() (int, error) {
|
func (em *DebugOutput) Exec() (int, error) {
|
||||||
val, ok := em.GetInPortInt(1)
|
val, ok := em.GetInPortInt(1)
|
||||||
if !ok {
|
if !ok {
|
||||||
return 0, fmt.Errorf("output Exec inParam not found")
|
return 0, fmt.Errorf("output Exec inParam not found")
|
||||||
@@ -98,7 +75,7 @@ func (em *Output) Exec() (int, error) {
|
|||||||
return 0, fmt.Errorf("output Exec inParam not found")
|
return 0, fmt.Errorf("output Exec inParam not found")
|
||||||
}
|
}
|
||||||
|
|
||||||
fmt.Printf("output Exec inParam [%d] [%s] [%v]\n", val, valStr, valArray)
|
log.Debug("DebugOutput Exec", log.Any("param1", val), log.Any("param2", valStr), log.Any("param3", valArray))
|
||||||
return 0, nil
|
return 0, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -125,6 +102,37 @@ func (em *Sequence) Exec() (int, error) {
|
|||||||
return -1, nil
|
return -1, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type ForeachIntArray struct {
|
||||||
|
BaseExecNode
|
||||||
|
}
|
||||||
|
|
||||||
|
func (em *ForeachIntArray) GetName() string {
|
||||||
|
return "ForeachIntArray"
|
||||||
|
}
|
||||||
|
|
||||||
|
func (em *ForeachIntArray) Exec() (int, error) {
|
||||||
|
array, ok := em.GetInPortArray(1)
|
||||||
|
if !ok {
|
||||||
|
return 0, fmt.Errorf("ForeachIntArray Exec inParam 1 not found")
|
||||||
|
}
|
||||||
|
|
||||||
|
for i := range array {
|
||||||
|
em.ExecContext.OutputPorts[2].SetInt(Port_Int(i))
|
||||||
|
em.ExecContext.OutputPorts[3].SetInt(array[i].IntVal)
|
||||||
|
err := em.DoNext(0)
|
||||||
|
if err != nil {
|
||||||
|
return -1, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
err := em.DoNext(1)
|
||||||
|
if err != nil {
|
||||||
|
return -1, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return -1, nil
|
||||||
|
}
|
||||||
|
|
||||||
type Foreach struct {
|
type Foreach struct {
|
||||||
BaseExecNode
|
BaseExecNode
|
||||||
}
|
}
|
||||||
@@ -462,6 +470,40 @@ func (em *RangeCompare) Exec() (int, error) {
|
|||||||
return 0, nil
|
return 0, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// EqualSwitch 等于分支==
|
||||||
|
type EqualSwitch struct {
|
||||||
|
BaseExecNode
|
||||||
|
}
|
||||||
|
|
||||||
|
func (em *EqualSwitch) GetName() string {
|
||||||
|
return "EqualSwitch"
|
||||||
|
}
|
||||||
|
|
||||||
|
func (em *EqualSwitch) Exec() (int, error) {
|
||||||
|
inPortA := em.GetInPort(1)
|
||||||
|
if inPortA == nil {
|
||||||
|
return -1, fmt.Errorf("GreaterThanInteger inParam 1 not found")
|
||||||
|
}
|
||||||
|
|
||||||
|
ret, ok := inPortA.GetInt()
|
||||||
|
if !ok {
|
||||||
|
return -1, fmt.Errorf("GreaterThanInteger inParam 1 error")
|
||||||
|
}
|
||||||
|
|
||||||
|
intArray := em.execNode.GetInPortDefaultIntArrayValue(2)
|
||||||
|
if intArray == nil {
|
||||||
|
return 0, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
for i := 0; i < len(intArray) && i < em.GetOutPortCount()-2; i++ {
|
||||||
|
if ret == intArray[i] {
|
||||||
|
return i + 2, nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0, nil
|
||||||
|
}
|
||||||
|
|
||||||
// Probability 概率判断(万分比)
|
// Probability 概率判断(万分比)
|
||||||
type Probability struct {
|
type Probability struct {
|
||||||
BaseExecNode
|
BaseExecNode
|
||||||
@@ -639,7 +681,7 @@ func (em *CreateTimer) Exec() (int, error) {
|
|||||||
|
|
||||||
array, ok := em.GetInPortArray(1)
|
array, ok := em.GetInPortArray(1)
|
||||||
if !ok {
|
if !ok {
|
||||||
return -1, fmt.Errorf("CreateTimer inParam 0 error")
|
return -1, fmt.Errorf("CreateTimer inParam 1 error")
|
||||||
}
|
}
|
||||||
|
|
||||||
var timerId uint64
|
var timerId uint64
|
||||||
@@ -649,8 +691,8 @@ func (em *CreateTimer) Exec() (int, error) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
log.Warnf("CreateTimer SafeAfterFunc error timerId:%d err:%v", timerId, err)
|
log.Warnf("CreateTimer SafeAfterFunc error timerId:%d err:%v", timerId, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
em.gr.IBlueprintModule.CancelTimerId(graphID,&timerId)
|
em.gr.IBlueprintModule.CancelTimerId(graphID, &timerId)
|
||||||
})
|
})
|
||||||
|
|
||||||
em.gr.mapTimerID[timerId] = struct{}{}
|
em.gr.mapTimerID[timerId] = struct{}{}
|
||||||
@@ -676,7 +718,7 @@ func (em *CloseTimer) GetName() string {
|
|||||||
func (em *CloseTimer) Exec() (int, error) {
|
func (em *CloseTimer) Exec() (int, error) {
|
||||||
timerID, ok := em.GetInPortInt(1)
|
timerID, ok := em.GetInPortInt(1)
|
||||||
if !ok {
|
if !ok {
|
||||||
return -1, fmt.Errorf("CreateTimer inParam 0 error")
|
return -1, fmt.Errorf("CreateTimer inParam 1 error")
|
||||||
}
|
}
|
||||||
|
|
||||||
id := uint64(timerID)
|
id := uint64(timerID)
|
||||||
@@ -687,3 +729,56 @@ func (em *CloseTimer) Exec() (int, error) {
|
|||||||
|
|
||||||
return 0, nil
|
return 0, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
// AppendIntReturn 追加返回结果(Int)
|
||||||
|
type AppendIntReturn struct {
|
||||||
|
BaseExecNode
|
||||||
|
}
|
||||||
|
|
||||||
|
func (em *AppendIntReturn) GetName() string {
|
||||||
|
return "AppendIntReturn"
|
||||||
|
}
|
||||||
|
|
||||||
|
func (em *AppendIntReturn) Exec() (int, error) {
|
||||||
|
val, ok := em.GetInPortInt(1)
|
||||||
|
if !ok {
|
||||||
|
return -1, fmt.Errorf("AppendIntReturn inParam 1 error")
|
||||||
|
}
|
||||||
|
|
||||||
|
returnPort := em.gr.GetAndCreateReturnPort()
|
||||||
|
if returnPort == nil {
|
||||||
|
return -1, fmt.Errorf("GetAndCreateReturnPort fail")
|
||||||
|
}
|
||||||
|
returnPort.AppendArrayValInt(val)
|
||||||
|
|
||||||
|
return -0, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
// AppendStringReturn 追加返回结果(String)
|
||||||
|
type AppendStringReturn struct {
|
||||||
|
BaseExecNode
|
||||||
|
}
|
||||||
|
|
||||||
|
func (em *AppendStringReturn) GetName() string {
|
||||||
|
return "AppendStringReturn"
|
||||||
|
}
|
||||||
|
|
||||||
|
func (em *AppendStringReturn) Exec() (int, error) {
|
||||||
|
val, ok := em.GetInPortStr(1)
|
||||||
|
if !ok {
|
||||||
|
return -1, fmt.Errorf("AppendStringReturn inParam 1 error")
|
||||||
|
}
|
||||||
|
|
||||||
|
returnPort := em.gr.GetAndCreateReturnPort()
|
||||||
|
if returnPort == nil {
|
||||||
|
return -1, fmt.Errorf("GetAndCreateReturnPort fail")
|
||||||
|
}
|
||||||
|
returnPort.AppendArrayValStr(val)
|
||||||
|
|
||||||
|
return -0, nil
|
||||||
|
}
|
||||||
@@ -12,13 +12,11 @@ const globalVariablesPrefix = "g_"
|
|||||||
type GetVariablesNode struct {
|
type GetVariablesNode struct {
|
||||||
BaseExecNode
|
BaseExecNode
|
||||||
nodeName string
|
nodeName string
|
||||||
varName string
|
|
||||||
}
|
}
|
||||||
|
|
||||||
type SetVariablesNode struct {
|
type SetVariablesNode struct {
|
||||||
BaseExecNode
|
BaseExecNode
|
||||||
nodeName string
|
nodeName string
|
||||||
varName string
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (g *GetVariablesNode) GetName() string {
|
func (g *GetVariablesNode) GetName() string {
|
||||||
@@ -27,14 +25,14 @@ func (g *GetVariablesNode) GetName() string {
|
|||||||
|
|
||||||
func (g *GetVariablesNode) Exec() (int, error) {
|
func (g *GetVariablesNode) Exec() (int, error) {
|
||||||
var port IPort
|
var port IPort
|
||||||
if strings.HasPrefix(g.varName, globalVariablesPrefix) {
|
if strings.HasPrefix(g.GetVariableName(), globalVariablesPrefix) {
|
||||||
port = g.gr.globalVariables[g.varName]
|
port = g.gr.globalVariables[g.GetVariableName()]
|
||||||
} else {
|
} else {
|
||||||
port = g.gr.variables[g.varName]
|
port = g.gr.variables[g.GetVariableName()]
|
||||||
}
|
}
|
||||||
|
|
||||||
if port == nil {
|
if port == nil {
|
||||||
return -1, fmt.Errorf("variable %s not found,node name %s", g.varName, g.nodeName)
|
return -1, fmt.Errorf("variable %s not found,node name %s", g.GetVariableName(), g.nodeName)
|
||||||
}
|
}
|
||||||
|
|
||||||
if !g.SetOutPort(0, port) {
|
if !g.SetOutPort(0, port) {
|
||||||
@@ -44,10 +42,7 @@ func (g *GetVariablesNode) Exec() (int, error) {
|
|||||||
return 0, nil
|
return 0, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (g *GetVariablesNode) setVariableName(name string) bool {
|
|
||||||
g.varName = name
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
|
|
||||||
func (g *SetVariablesNode) GetName() string {
|
func (g *SetVariablesNode) GetName() string {
|
||||||
return g.nodeName
|
return g.nodeName
|
||||||
@@ -60,10 +55,10 @@ func (g *SetVariablesNode) Exec() (int, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
varPort := port.Clone()
|
varPort := port.Clone()
|
||||||
if strings.HasPrefix(g.varName, globalVariablesPrefix) {
|
if strings.HasPrefix(g.GetVariableName(), globalVariablesPrefix) {
|
||||||
g.gr.globalVariables[g.varName] = varPort
|
g.gr.globalVariables[g.GetVariableName()] = varPort
|
||||||
} else {
|
} else {
|
||||||
g.gr.variables[g.varName] = varPort
|
g.gr.variables[g.GetVariableName()] = varPort
|
||||||
}
|
}
|
||||||
|
|
||||||
if !g.SetOutPort(1, varPort) {
|
if !g.SetOutPort(1, varPort) {
|
||||||
@@ -73,7 +68,3 @@ func (g *SetVariablesNode) Exec() (int, error) {
|
|||||||
return 0, nil
|
return 0, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (g *SetVariablesNode) setVariableName(name string) bool {
|
|
||||||
g.varName = name
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
|
|||||||
Reference in New Issue
Block a user