Files
origin/util/blueprint/execpool.go
2025-10-28 11:11:36 +08:00

366 lines
9.3 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package blueprint
import (
"encoding/json"
"fmt"
"os"
"path/filepath"
"strconv"
"strings"
"sort"
)
// 格式说明Entrance_ID
const (
Entrance = "Entrance_"
)
type ExecPool struct {
innerExecNodeMap map[string]IInnerExecNode
execNodeMap map[string]IExecNode
}
func (em *ExecPool) Load(execDefFilePath string) error {
em.innerExecNodeMap = make(map[string]IInnerExecNode, 512)
em.execNodeMap = make(map[string]IExecNode, 512)
// 检查路径是否存在
stat, err := os.Stat(execDefFilePath)
if err != nil {
return fmt.Errorf("failed to access path %s: %v", execDefFilePath, err)
}
// 如果是单个文件,直接处理
if !stat.IsDir() {
return fmt.Errorf("%s is not a directory", execDefFilePath)
}
// 遍历目录及其子目录
err = filepath.Walk(execDefFilePath, func(path string, info os.FileInfo, err error) error {
if err != nil {
fmt.Printf("访问路径出错 %s: %v\n", path, err)
return nil // 继续遍历其他文件
}
// 如果是目录,继续遍历
if info.IsDir() {
return nil
}
// 只处理JSON文件
if filepath.Ext(path) == ".json" {
return em.processJSONFile(path)
}
return nil
})
if err != nil {
return fmt.Errorf("failed to walk path %s: %v", execDefFilePath, err)
}
return em.loadSysExec()
}
// 处理单个JSON文件
func (em *ExecPool) processJSONFile(filePath string) error {
// 打开文件
file, err := os.Open(filePath)
if err != nil {
return fmt.Errorf("failed to open file %s: %v", filePath, err)
}
defer func(file *os.File) {
err = file.Close()
if err != nil {
fmt.Printf("failed to close file %s: %v\n", filePath, err)
return
}
}(file)
var baseExecConfig []BaseExecConfig
decoder := json.NewDecoder(file)
if err = decoder.Decode(&baseExecConfig); err != nil {
return fmt.Errorf("failed to decode JSON from file %s: %v", filePath, err)
}
for i := range baseExecConfig {
// 排序
sort.Slice(baseExecConfig[i].Inputs, func(left, right int) bool {
return baseExecConfig[i].Inputs[left].PortId < baseExecConfig[i].Inputs[right].PortId
})
sort.Slice(baseExecConfig[i].Outputs, func(left, right int) bool {
return baseExecConfig[i].Outputs[left].PortId < baseExecConfig[i].Outputs[right].PortId
})
exec, err := em.createExecFromJSON(baseExecConfig[i])
if err != nil {
return err
}
if !em.loadBaseExec(exec) {
return fmt.Errorf("exec %s already registered", exec.GetName())
}
}
return nil
}
func (em *ExecPool) createPortByDataType(nodeName, portName, dataType string) (IPort, error) {
switch strings.ToLower(dataType) {
case Config_DataType_Int, Config_DataType_Integer:
return NewPortInt(), nil
case Config_DataType_Float:
return NewPortFloat(), nil
case Config_DataType_Str:
return NewPortStr(), nil
case Config_DataType_Boolean, Config_DataType_Bool:
return NewPortBool(), nil
case Config_DataType_Array:
return NewPortArray(), nil
}
return nil, fmt.Errorf("invalid data type %s,node %s port %s", dataType, nodeName, portName)
}
func (em *ExecPool) createExecFromJSON(baseExecConfig BaseExecConfig) (IInnerExecNode, error) {
var baseExec innerExecNode
entranceName, _, ok := getEntranceNodeNameAndID(baseExecConfig.Name)
if ok {
baseExec.Name = entranceName
} else {
baseExec.Name = baseExecConfig.Name
}
baseExec.Title = baseExecConfig.Title
baseExec.Package = baseExecConfig.Package
baseExec.Description = baseExecConfig.Description
baseExec.PrepareMaxInPortId(baseExecConfig.GetMaxInPortId())
baseExec.PrepareMaxOutPortId(baseExecConfig.GetMaxOutPortId())
// exec数量
inExecNum := 0
for index, input := range baseExecConfig.Inputs {
portType := strings.ToLower(input.PortType)
if portType != Config_PortType_Exec && portType != Config_PortType_Data {
return nil, fmt.Errorf("input %s data type %s not support", input.Name, input.DataType)
}
if portType == Config_PortType_Exec {
if inExecNum > 0 {
return nil, fmt.Errorf("inPort only allows one Execute,node name %s", baseExec.Name)
}
if index > 0 {
return nil, fmt.Errorf("the exec port is only allowed to be placed on the first one,node name %s", baseExec.Name)
}
inExecNum++
baseExec.SetInPortById(input.PortId, NewPortExec())
// baseExec.AppendInPort(NewPortExec())
continue
}
port, err := em.createPortByDataType(baseExec.Name, input.Name, input.DataType)
if err != nil {
return nil, err
}
baseExec.SetInPortById(input.PortId, port)
}
hasData := false
for _, output := range baseExecConfig.Outputs {
portType := strings.ToLower(output.PortType)
if portType != Config_PortType_Exec && portType != Config_PortType_Data {
return nil, fmt.Errorf("output %s data type %s not support,node name %s", output.Name, output.DataType, baseExec.Name)
}
// Exec出口只能先Exec再Data不能穿插如果是Data类型但遇到Exec入口则不允许
if hasData && portType == Config_PortType_Exec {
return nil, fmt.Errorf("the exec port can only be placed at the front,node name %s", baseExec.Name)
}
if portType == Config_PortType_Exec {
baseExec.SetOutPortById(output.PortId, NewPortExec())
continue
}
hasData = true
port, err := em.createPortByDataType(baseExec.Name, output.Name, output.DataType)
if err != nil {
return nil, err
}
baseExec.SetOutPortById(output.PortId, port)
}
return &baseExec, nil
}
func (em *ExecPool) loadBaseExec(exec IInnerExecNode) bool {
if _, ok := em.innerExecNodeMap[exec.GetName()]; ok {
return false
}
em.innerExecNodeMap[exec.GetName()] = exec
return true
}
func (em *ExecPool) Register(exec IExecNode) bool {
baseExec, ok := exec.(IExecNode)
if !ok {
return false
}
innerNode, ok := em.innerExecNodeMap[baseExec.GetName()]
if !ok {
return false
}
if _, ok = em.execNodeMap[innerNode.GetName()]; ok {
return false
}
baseExecNode, ok := exec.(IBaseExecNode)
if !ok {
return false
}
baseExecNode.initInnerExecNode(innerNode.(*innerExecNode))
innerNode.SetExec(exec)
em.execNodeMap[baseExec.GetName()] = baseExec
return true
}
func (em *ExecPool) GetExec(name string) IInnerExecNode {
if exec, ok := em.execNodeMap[name]; ok {
return exec.getInnerExecNode()
}
return nil
}
func (em *ExecPool) loadSysExec() error {
var err error
if err = em.regGetVariables(Config_DataType_Int); err != nil {
return err
}
if err = em.regGetVariables(Config_DataType_Integer); err != nil {
return err
}
if err = em.regGetVariables(Config_DataType_Float); err != nil {
return err
}
if err = em.regGetVariables(Config_DataType_Str); err != nil {
return err
}
if err = em.regGetVariables(Config_DataType_Boolean); err != nil {
return err
}
if err = em.regGetVariables(Config_DataType_Bool); err != nil {
return err
}
if err = em.regGetVariables(Config_DataType_Array); err != nil {
return err
}
if err = em.regSetVariables(Config_DataType_Int); err != nil {
return err
}
if err = em.regSetVariables(Config_DataType_Integer); err != nil {
return err
}
if err = em.regSetVariables(Config_DataType_Float); err != nil {
return err
}
if err = em.regSetVariables(Config_DataType_Str); err != nil {
return err
}
if err = em.regSetVariables(Config_DataType_Boolean); err != nil {
return err
}
if err = em.regSetVariables(Config_DataType_Bool); err != nil {
return err
}
if err = em.regSetVariables(Config_DataType_Array); err != nil {
return err
}
return nil
}
func (em *ExecPool) regGetVariables(typ string) error {
var baseExec innerExecNode
baseExec.Name = genGetVariablesNodeName(typ)
baseExec.PrepareMaxOutPortId(0)
outPort := NewPortByType(typ)
if outPort == nil {
return fmt.Errorf("invalid type %s", typ)
}
baseExec.SetOutPortById(0, outPort)
var getVariablesNode GetVariablesNode
getVariablesNode.nodeName = baseExec.GetName()
if !em.loadBaseExec(&baseExec) {
return fmt.Errorf("exec %s already registered", baseExec.GetName())
}
if !em.Register(&getVariablesNode) {
return fmt.Errorf("exec %s already registered", baseExec.GetName())
}
return nil
}
func genSetVariablesNodeName(typ string) string {
return fmt.Sprintf("%s_%s", SetVariables, typ)
}
func genGetVariablesNodeName(typ string) string {
return fmt.Sprintf("%s_%s", GetVariables, typ)
}
func (em *ExecPool) regSetVariables(typ string) error {
var baseExec innerExecNode
baseExec.Name = genSetVariablesNodeName(typ)
inExecPort := NewPortByType(Config_PortType_Exec)
inPort := NewPortByType(typ)
outExecPort := NewPortByType(Config_PortType_Exec)
outPort := NewPortByType(typ)
baseExec.PrepareMaxInPortId(1)
baseExec.PrepareMaxOutPortId(1)
baseExec.SetInPortById(0, inExecPort)
baseExec.SetInPortById(1, inPort)
baseExec.SetOutPortById(0, outExecPort)
baseExec.SetOutPortById(1, outPort)
baseExec.IExecNode = &SetVariablesNode{nodeName: baseExec.GetName()}
if !em.loadBaseExec(&baseExec) {
return fmt.Errorf("exec %s already registered", baseExec.GetName())
}
if !em.Register(baseExec.IExecNode) {
return fmt.Errorf("exec %s already registered", baseExec.GetName())
}
return nil
}
func getEntranceNodeNameAndID(className string) (string, int64, bool) {
if !strings.HasPrefix(className, Entrance) {
return "", 0, false
}
parts := strings.Split(className, "_")
if len(parts) != 3 {
return "", 0, false
}
entranceID, err := strconv.Atoi(parts[2])
if err != nil {
return "", 0, false
}
return parts[0] + "_" + parts[1], int64(entranceID), true
}