From 3bcce31a86871d09d79866b4f48a0003cd91716c Mon Sep 17 00:00:00 2001 From: boyce <6549168@qq.com> Date: Sun, 21 Sep 2025 18:41:03 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- util/blueprint/blueprint.go | 19 ++++- util/blueprint/blueprint_test.go | 26 ++++++- util/blueprint/exec.go | 94 ++++++++++++++++-------- util/blueprint/execpool.go | 120 ++++++++++++++++++++++--------- util/blueprint/graph.go | 8 +-- util/blueprint/graphpool.go | 52 +++++++------- util/blueprint/node.go | 36 +++++----- util/blueprint/port.go | 4 +- util/blueprint/variables.go | 16 ++--- 9 files changed, 247 insertions(+), 128 deletions(-) diff --git a/util/blueprint/blueprint.go b/util/blueprint/blueprint.go index fab1fc9..18aa262 100644 --- a/util/blueprint/blueprint.go +++ b/util/blueprint/blueprint.go @@ -5,7 +5,20 @@ type Blueprint struct { graphPool GraphPool } -func (bm *Blueprint) Init(execDefFilePath string, graphFilePath string) { - bm.execPool.Load(execDefFilePath) - bm.graphPool.Load(graphFilePath) +func (bm *Blueprint) Init(execDefFilePath string, graphFilePath string, onRegister func(execPool *ExecPool) error) error { + err := bm.execPool.Load(execDefFilePath) + if err != nil { + return err + } + + err = onRegister(&bm.execPool) + if err != nil { + return err + } + + err = bm.graphPool.Load(&bm.execPool, graphFilePath) + if err != nil { + return err + } + return nil } diff --git a/util/blueprint/blueprint_test.go b/util/blueprint/blueprint_test.go index 75d5136..c9ce498 100644 --- a/util/blueprint/blueprint_test.go +++ b/util/blueprint/blueprint_test.go @@ -4,6 +4,28 @@ import ( "testing" ) -func TestExecMgr(t *testing.T) { - +type Entrance_IntParam struct { + BaseExecNode +} + +func (em *Entrance_IntParam) GetName() string { + return "Entrance_IntParam" +} + +func (em *Entrance_IntParam) Exec() (int, error) { + return 0, nil +} + +func OnRegister(bm *ExecPool) error { + bm.Register(&Entrance_IntParam{}) + return nil +} + +func TestExecMgr(t *testing.T) { + // + var bp Blueprint + err := bp.Init("./json/", "./vgf/", OnRegister) + if err != nil { + t.Fatalf("init failed,err:%v", err) + } } diff --git a/util/blueprint/exec.go b/util/blueprint/exec.go index e68161a..3e7c344 100644 --- a/util/blueprint/exec.go +++ b/util/blueprint/exec.go @@ -3,12 +3,13 @@ package blueprint import "fmt" type IBaseExecNode interface { - initExecNode(gr *graph, nodeId string, variableName string, nodeName string) error + initInnerExecNode(innerNode *innerExecNode) + initExecNode(gr *graph, en *execNode) error } -type IBaseExec interface { +type IInnerExecNode interface { GetName() string - SetExec(exec IExec) + SetExec(exec IExecNode) IsInPortExec(index int) bool IsOutPortExec(index int) bool GetInPortCount() int @@ -18,15 +19,15 @@ type IBaseExec interface { GetOutPort(index int) IPort } -type IExec interface { +type IExecNode interface { GetName() string - Exec() error + DoNext(index int) error + Exec() (int, error) // 返回后续执行的Node的Index + GetNextExecLen() int + getInnerExecNode() IInnerExecNode } -type IExecData interface { -} - -type BaseExec struct { +type innerExecNode struct { Name string Title string Package string @@ -34,7 +35,16 @@ type BaseExec struct { InPort []IPort OutPort []IPort - IExec + IExecNode +} + +type BaseExecNode struct { + *innerExecNode + + // 执行时初始化的数据 + *ExecContext + gr *graph + execNode *execNode } type InputConfig struct { @@ -62,23 +72,23 @@ type BaseExecConfig struct { Outputs []OutInputConfig `json:"outputs"` } -func (em *BaseExec) AppendInPort(port ...IPort) { +func (em *innerExecNode) AppendInPort(port ...IPort) { em.InPort = append(em.InPort, port...) } -func (em *BaseExec) AppendOutPort(port ...IPort) { +func (em *innerExecNode) AppendOutPort(port ...IPort) { em.OutPort = append(em.OutPort, port...) } -func (em *BaseExec) GetName() string { +func (em *innerExecNode) GetName() string { return em.Name } -func (em *BaseExec) SetExec(exec IExec) { - em.IExec = exec +func (em *innerExecNode) SetExec(exec IExecNode) { + em.IExecNode = exec } -func (em *BaseExec) CloneInOutPort() ([]IPort, []IPort) { +func (em *innerExecNode) CloneInOutPort() ([]IPort, []IPort) { inPorts := make([]IPort, 0, 2) for _, port := range em.InPort { if port.IsPortExec() { @@ -99,14 +109,14 @@ func (em *BaseExec) CloneInOutPort() ([]IPort, []IPort) { return inPorts, outPorts } -func (em *BaseExec) IsInPortExec(index int) bool { +func (em *innerExecNode) IsInPortExec(index int) bool { if index >= len(em.InPort) || index < 0 { return false } return em.InPort[index].IsPortExec() } -func (em *BaseExec) IsOutPortExec(index int) bool { +func (em *innerExecNode) IsOutPortExec(index int) bool { if index >= len(em.OutPort) || index < 0 { return false } @@ -114,40 +124,37 @@ func (em *BaseExec) IsOutPortExec(index int) bool { return em.OutPort[index].IsPortExec() } -func (em *BaseExec) GetInPortCount() int { +func (em *innerExecNode) GetInPortCount() int { return len(em.InPort) } -func (em *BaseExec) GetInPort(index int) IPort { +func (em *innerExecNode) GetInPort(index int) IPort { if index >= len(em.InPort) || index < 0 { return nil } return em.InPort[index] } -func (em *BaseExec) GetOutPort(index int) IPort { +func (em *innerExecNode) GetOutPort(index int) IPort { if index >= len(em.OutPort) || index < 0 { return nil } return em.OutPort[index] } -type BaseExecNode struct { - *ExecContext - gr *graph - variableName string - nodeName string +func (en *BaseExecNode) initInnerExecNode(innerNode *innerExecNode) { + en.innerExecNode = innerNode } -func (en *BaseExecNode) initExecNode(gr *graph, nodeId string, variableName string, nodeName string) error { - ctx, ok := gr.context[nodeId] +func (en *BaseExecNode) initExecNode(gr *graph, node *execNode) error { + ctx, ok := gr.context[node.Id] if !ok { - return fmt.Errorf("node %s not found", nodeId) + return fmt.Errorf("node %s not found", node.Id) } + en.ExecContext = ctx en.gr = gr - en.variableName = variableName - en.nodeName = nodeName + en.execNode = node return nil } @@ -412,3 +419,28 @@ func (en *BaseExecNode) GetOutPortArrayLen(index int) int { } return port.GetArrayLen() } + +func (en *BaseExecNode) DoNext(index int) error { + // -1 表示中断运行 + if index == -1 { + return nil + } + + if index < 0 || index >= len(en.execNode.nextNode) { + return fmt.Errorf("next index %d not found", index) + } + + return en.execNode.nextNode[index].Do(en.gr) +} + +func (en *BaseExecNode) GetNextExecLen() int { + return len(en.execNode.nextNode) +} + +func (en *BaseExecNode) getInnerExecNode() IInnerExecNode { + innerNode, ok := en.execNode.execNode.(IInnerExecNode) + if ok { + return innerNode + } + return nil +} diff --git a/util/blueprint/execpool.go b/util/blueprint/execpool.go index d86d9cf..bb01a1c 100644 --- a/util/blueprint/execpool.go +++ b/util/blueprint/execpool.go @@ -5,16 +5,23 @@ import ( "fmt" "os" "path/filepath" + "strconv" "strings" ) +// 格式说明Entrance_ID +const ( + Entrance = "Entrance_" +) + type ExecPool struct { - baseExecMap map[string]IBaseExec - execMap map[string]IBaseExec + innerExecNodeMap map[string]IInnerExecNode + execNodeMap map[string]IExecNode } func (em *ExecPool) Load(execDefFilePath string) error { - em.baseExecMap = make(map[string]IBaseExec, 512) + em.innerExecNodeMap = make(map[string]IInnerExecNode, 512) + em.execNodeMap = make(map[string]IExecNode, 512) // 检查路径是否存在 stat, err := os.Stat(execDefFilePath) @@ -69,19 +76,21 @@ func (em *ExecPool) processJSONFile(filePath string) error { } }(file) - var baseExecConfig BaseExecConfig + var baseExecConfig []BaseExecConfig decoder := json.NewDecoder(file) - if err := decoder.Decode(&baseExecConfig); err != nil { + if err = decoder.Decode(&baseExecConfig); err != nil { return fmt.Errorf("failed to decode JSON from file %s: %v", filePath, err) } - exec, err := em.createExecFromJSON(baseExecConfig) - if err != nil { - return err - } + for i := range baseExecConfig { + exec, err := em.createExecFromJSON(baseExecConfig[i]) + if err != nil { + return err + } - if !em.loadBaseExec(exec) { - return fmt.Errorf("exec %s already registered", exec.GetName()) + if !em.loadBaseExec(exec) { + return fmt.Errorf("exec %s already registered", exec.GetName()) + } } return nil @@ -104,9 +113,15 @@ func (em *ExecPool) createPortByDataType(nodeName, portName, dataType string) (I return nil, fmt.Errorf("invalid data type %s,node %s port %s", dataType, nodeName, portName) } -func (em *ExecPool) createExecFromJSON(baseExecConfig BaseExecConfig) (IBaseExec, error) { - var baseExec BaseExec - baseExec.Name = baseExecConfig.Name +func (em *ExecPool) createExecFromJSON(baseExecConfig BaseExecConfig) (IInnerExecNode, error) { + var baseExec innerExecNode + + entranceName, _, ok := getEntranceNodeNameAndID(baseExecConfig.Name) + if ok { + baseExec.Name = entranceName + } else { + baseExec.Name = baseExecConfig.Name + } baseExec.Title = baseExecConfig.Title baseExec.Package = baseExecConfig.Package baseExec.Description = baseExecConfig.Description @@ -114,7 +129,7 @@ func (em *ExecPool) createExecFromJSON(baseExecConfig BaseExecConfig) (IBaseExec // exec数量 inExecNum := 0 for index, input := range baseExecConfig.Inputs { - portType := strings.ToLower(input.DataType) + portType := strings.ToLower(input.PortType) if portType != Config_PortType_Exec && portType != Config_PortType_Data { return nil, fmt.Errorf("input %s data type %s not support", input.Name, input.DataType) } @@ -142,7 +157,7 @@ func (em *ExecPool) createExecFromJSON(baseExecConfig BaseExecConfig) (IBaseExec hasData := false for _, output := range baseExecConfig.Outputs { - portType := strings.ToLower(output.DataType) + portType := strings.ToLower(output.PortType) if portType != Config_PortType_Exec && portType != Config_PortType_Data { return nil, fmt.Errorf("output %s data type %s not support,node name %s", output.Name, output.DataType, baseExec.Name) } @@ -153,7 +168,7 @@ func (em *ExecPool) createExecFromJSON(baseExecConfig BaseExecConfig) (IBaseExec } if portType == Config_PortType_Exec { - baseExec.AppendInPort(NewPortExec()) + baseExec.AppendOutPort(NewPortExec()) continue } hasData = true @@ -167,32 +182,44 @@ func (em *ExecPool) createExecFromJSON(baseExecConfig BaseExecConfig) (IBaseExec return &baseExec, nil } -func (em *ExecPool) loadBaseExec(exec IBaseExec) bool { - if _, ok := em.baseExecMap[exec.GetName()]; ok { +func (em *ExecPool) loadBaseExec(exec IInnerExecNode) bool { + if _, ok := em.innerExecNodeMap[exec.GetName()]; ok { return false } - em.baseExecMap[exec.GetName()] = exec + em.innerExecNodeMap[exec.GetName()] = exec return true } -func (em *ExecPool) Register(exec IExec) bool { - baseExec, ok := exec.(IBaseExec) +func (em *ExecPool) Register(exec IExecNode) bool { + baseExec, ok := exec.(IExecNode) if !ok { return false } - if _, ok := em.execMap[baseExec.GetName()]; ok { + innerNode, ok := em.innerExecNodeMap[baseExec.GetName()] + if !ok { return false } - baseExec.SetExec(exec) - em.execMap[baseExec.GetName()] = baseExec + if _, ok := em.execNodeMap[innerNode.GetName()]; ok { + return false + } + + baseExecNode, ok := exec.(IBaseExecNode) + if !ok { + return false + } + + baseExecNode.initInnerExecNode(innerNode.(*innerExecNode)) + innerNode.SetExec(exec) + + em.execNodeMap[baseExec.GetName()] = baseExec return true } -func (em *ExecPool) GetExec(name string) IBaseExec { - if exec, ok := em.execMap[name]; ok { - return exec +func (em *ExecPool) GetExec(name string) IInnerExecNode { + if exec, ok := em.execNodeMap[name]; ok { + return exec.getInnerExecNode() } return nil } @@ -245,17 +272,24 @@ func (em *ExecPool) loadSysExec() error { } func (em *ExecPool) regGetVariables(typ string) error { - var baseExec BaseExec + var baseExec innerExecNode baseExec.Name = genGetVariablesNodeName(typ) outPort := NewPortByType(typ) + if outPort == nil { + return fmt.Errorf("invalid type %s", typ) + } baseExec.AppendOutPort(outPort) - baseExec.IExec = &GetVariablesNode{nodeName: baseExec.GetName()} + + var getVariablesNode GetVariablesNode + getVariablesNode.nodeName = baseExec.GetName() + //getVariablesNode.execNode = &baseExec + //baseExec.IExecNode = &getVariablesNode if !em.loadBaseExec(&baseExec) { return fmt.Errorf("exec %s already registered", baseExec.GetName()) } - if !em.Register(baseExec.IExec) { + if !em.Register(&getVariablesNode) { return fmt.Errorf("exec %s already registered", baseExec.GetName()) } @@ -271,7 +305,7 @@ func genGetVariablesNodeName(typ string) string { } func (em *ExecPool) regSetVariables(typ string) error { - var baseExec BaseExec + var baseExec innerExecNode baseExec.Name = genSetVariablesNodeName(typ) inPort := NewPortByType(typ) @@ -280,13 +314,31 @@ func (em *ExecPool) regSetVariables(typ string) error { baseExec.AppendInPort(inPort) baseExec.AppendOutPort(outPort) - baseExec.IExec = &SetVariablesNode{nodeName: baseExec.GetName()} + baseExec.IExecNode = &SetVariablesNode{nodeName: baseExec.GetName()} if !em.loadBaseExec(&baseExec) { return fmt.Errorf("exec %s already registered", baseExec.GetName()) } - if !em.Register(baseExec.IExec) { + if !em.Register(baseExec.IExecNode) { return fmt.Errorf("exec %s already registered", baseExec.GetName()) } return nil } + +func getEntranceNodeNameAndID(className string) (string, int64, bool) { + if !strings.HasPrefix(className, Entrance) { + return "", 0, false + } + + parts := strings.Split(className, "_") + if len(parts) != 3 { + return "", 0, false + } + + entranceID, err := strconv.Atoi(parts[2]) + if err != nil { + return "", 0, false + } + + return parts[0] + "_" + parts[1], int64(entranceID), true +} diff --git a/util/blueprint/graph.go b/util/blueprint/graph.go index c785ef4..7b6973f 100644 --- a/util/blueprint/graph.go +++ b/util/blueprint/graph.go @@ -14,10 +14,10 @@ type graph struct { } type nodeConfig struct { - Id string `json:"id"` - Class string `json:"class"` - Module string `json:"module"` - Pos []float64 `json:"pos"` + Id string `json:"id"` + Class string `json:"class"` + Module string `json:"module"` + //Pos []float64 `json:"pos"` PortDefault map[string]interface{} `json:"port_defaultv"` } diff --git a/util/blueprint/graphpool.go b/util/blueprint/graphpool.go index 1c3d95e..4d412f3 100644 --- a/util/blueprint/graphpool.go +++ b/util/blueprint/graphpool.go @@ -5,21 +5,17 @@ import ( "github.com/goccy/go-json" "os" "path/filepath" - "strconv" "strings" ) -// 格式说明Entrance_ID -const ( - Entrance = "Entrance_" -) - type GraphPool struct { mapGraphs map[string]graph execPool *ExecPool } -func (gp *GraphPool) Load(graphFilePath string) error { +func (gp *GraphPool) Load(execPool *ExecPool, graphFilePath string) error { + gp.execPool = execPool + // 检查路径是否存在 stat, err := os.Stat(graphFilePath) if err != nil { @@ -67,7 +63,11 @@ func (gp *GraphPool) processJSONFile(filePath string) error { if err != nil { return fmt.Errorf("failed to open file %s: %v", filePath, err) } - defer file.Close() + defer func() { + if err := file.Close(); err != nil { + fmt.Printf("关闭文件 %s 时出错: %v\n", filePath, err) + } + }() fileName := filepath.Base(filePath) ext := filepath.Ext(fileName) // 获取".html" @@ -84,25 +84,23 @@ func (gp *GraphPool) processJSONFile(filePath string) error { func (gp *GraphPool) prepareGraph(graphName string, graphConfig *graphConfig) error { // 找到所有的入口 for _, node := range graphConfig.Nodes { - if strings.HasPrefix(node.Class, Entrance) { - // 取得ID - id := strings.TrimPrefix(node.Class, Entrance) - entranceID, err := strconv.Atoi(id) - if err != nil { - return err - } - // 对入口进行预处理 - err = gp.prepareOneEntrance(graphName, int64(entranceID), &node, graphConfig) - if err != nil { - return err - } + _, entranceID, ok := getEntranceNodeNameAndID(node.Class) + if !ok { + continue } + + // 对入口进行预处理 + err := gp.prepareOneEntrance(graphName, entranceID, &node, graphConfig) + if err != nil { + return err + } + } return nil } -func (gp *GraphPool) getVarExec(nodeCfg *nodeConfig, graphConfig *graphConfig) (IBaseExec, string) { +func (gp *GraphPool) getVarExec(nodeCfg *nodeConfig, graphConfig *graphConfig) (IInnerExecNode, string) { // 是否为Get_或Set_开头 if strings.HasPrefix(nodeCfg.Class, "Get_") || strings.HasPrefix(nodeCfg.Class, "Set_") { return gp.execPool.GetExec(nodeCfg.Class), "" @@ -141,13 +139,13 @@ func (gp *GraphPool) genAllNode(graphConfig *graphConfig) (map[string]*execNode, if exec == nil { exec, varName = gp.getVarExec(&node, graphConfig) if exec == nil { - return nil, fmt.Errorf("no exec found for node %s", node.Class) + return nil, fmt.Errorf("%s node has not been registered", node.Class) } } - + nodes[node.Id] = &execNode{ Id: node.Id, - baseExec: exec, + execNode: exec, preInPort: make([]*prePortNode, exec.GetInPortCount()), inPortDefaultValue: node.PortDefault, variableName: varName, @@ -160,7 +158,7 @@ func (gp *GraphPool) genAllNode(graphConfig *graphConfig) (map[string]*execNode, func (gp *GraphPool) prepareOneNode(mapNodeExec map[string]*execNode, nodeExec *execNode, graphConfig *graphConfig) error { // 找到所有出口 var idx int - for ; nodeExec.baseExec.IsOutPortExec(idx); idx++ { + for ; nodeExec.execNode.IsOutPortExec(idx); idx++ { // 找到出口结点 nextExecNode := gp.findOutNextNode(graphConfig, mapNodeExec, nodeExec.Id, idx) nodeExec.nextNode = append(nodeExec.nextNode, nextExecNode) @@ -245,9 +243,9 @@ func (gp *GraphPool) findPreInPortNode(mapNodes map[string]*execNode, nodeExec * func (gp *GraphPool) preparePreInPortNode(mapNodes map[string]*execNode, nodeExec *execNode, graphConfig *graphConfig) error { // 找到当前结点的所有inPort的前一个端口 - for i := 0; i < nodeExec.baseExec.GetInPortCount(); i++ { + for i := 0; i < nodeExec.execNode.GetInPortCount(); i++ { // 如果是执行结点,则跳过 - if nodeExec.baseExec.IsInPortExec(i) { + if nodeExec.execNode.IsInPortExec(i) { continue } diff --git a/util/blueprint/node.go b/util/blueprint/node.go index d88f25a..eea8488 100644 --- a/util/blueprint/node.go +++ b/util/blueprint/node.go @@ -9,7 +9,7 @@ type prePortNode struct { type execNode struct { Id string - baseExec IBaseExec + execNode IInnerExecNode nextNode []*execNode nextIdx int @@ -37,19 +37,19 @@ func (en *execNode) Next() *execNode { return en.nextNode[en.nextIdx] } -func (en *execNode) exec(gr *graph) error { - e, ok := en.baseExec.(IExec) +func (en *execNode) exec(gr *graph) (int, error) { + e, ok := en.execNode.(IExecNode) if !ok { - return fmt.Errorf("exec node %s not exec", en.baseExec.GetName()) + return -1, fmt.Errorf("exec node %s not exec", en.execNode.GetName()) } - node, ok := en.baseExec.(IBaseExecNode) + node, ok := en.execNode.(IBaseExecNode) if !ok { - return fmt.Errorf("exec node %s not exec", en.baseExec.GetName()) + return -1, fmt.Errorf("exec node %s not exec", en.execNode.GetName()) } - if err := node.initExecNode(gr, en.Id, en.variableName, en.baseExec.GetName()); err != nil { - return err + if err := node.initExecNode(gr, en); err != nil { + return -1, err } return e.Exec() @@ -84,7 +84,7 @@ func (en *execNode) doSetInPort(gr *graph, index int, inPort IPort) error { func (en *execNode) Do(gr *graph) error { // 重新初始化上下文 - inPorts, outPorts := en.baseExec.CloneInOutPort() + inPorts, outPorts := en.execNode.CloneInOutPort() gr.context[en.Id] = &ExecContext{ InputPorts: inPorts, OutputPorts: outPorts, @@ -93,7 +93,7 @@ func (en *execNode) Do(gr *graph) error { // 处理InPort结点值 var err error for index := range inPorts { - if en.baseExec.IsInPortExec(index) { + if en.execNode.IsInPortExec(index) { continue } @@ -106,16 +106,18 @@ func (en *execNode) Do(gr *graph) error { // 设置执行器相关的上下文信息 // 如果是变量设置变量名 // 执行本结点 - if err = en.exec(gr); err != nil { + nextIndex, err := en.exec(gr) + if err != nil { return err } - for _, nextNode := range en.nextNode { - err = nextNode.Do(gr) - if err != nil { - return err - } + if nextIndex == -1 { + return nil } - return nil + if nextIndex < 0 || nextIndex >= len(en.nextNode) { + return fmt.Errorf("next index %d not found", nextIndex) + } + + return en.nextNode[nextIndex].Do(gr) } diff --git a/util/blueprint/port.go b/util/blueprint/port.go index 2e75af6..31110f4 100644 --- a/util/blueprint/port.go +++ b/util/blueprint/port.go @@ -278,13 +278,13 @@ func NewPortByType(typ string) IPort { switch typ { case Config_PortType_Exec: return NewPortExec() - case Config_DataType_Int: + case Config_DataType_Int, Config_DataType_Integer: return NewPortInt() case Config_DataType_Float: return NewPortFloat() case Config_DataType_Str: return NewPortStr() - case Config_DataType_Bool: + case Config_DataType_Bool, Config_DataType_Boolean: return NewPortBool() case Config_DataType_Array: return NewPortArray() diff --git a/util/blueprint/variables.go b/util/blueprint/variables.go index c3baa2d..dd7915b 100644 --- a/util/blueprint/variables.go +++ b/util/blueprint/variables.go @@ -21,33 +21,33 @@ func (g *GetVariablesNode) GetName() string { return g.nodeName } -func (g *GetVariablesNode) Exec() error { +func (g *GetVariablesNode) Exec() (int, error) { port := g.gr.variables[g.varName] if port == nil { - return fmt.Errorf("variable %s not found,node name %s", g.varName, g.nodeName) + return -1, fmt.Errorf("variable %s not found,node name %s", g.varName, g.nodeName) } if !g.SetOutPort(0, port) { - return fmt.Errorf("set out port failed,node name %s", g.nodeName) + return -1, fmt.Errorf("set out port failed,node name %s", g.nodeName) } - return nil + return 0, nil } func (g *SetVariablesNode) GetName() string { return g.nodeName } -func (g *SetVariablesNode) Exec() error { +func (g *SetVariablesNode) Exec() (int, error) { port := g.GetInPort(0) if port == nil { - return fmt.Errorf("get in port failed,node name %s", g.nodeName) + return -1, fmt.Errorf("get in port failed,node name %s", g.nodeName) } g.gr.variables[g.varName] = port if !g.SetOutPort(0, port) { - return fmt.Errorf("set out port failed,node name %s", g.nodeName) + return -1, fmt.Errorf("set out port failed,node name %s", g.nodeName) } - return nil + return 0, nil }