diff --git a/util/blueprint/blueprint.go b/util/blueprint/blueprint.go index 18aa262..50b15b2 100644 --- a/util/blueprint/blueprint.go +++ b/util/blueprint/blueprint.go @@ -20,5 +20,10 @@ func (bm *Blueprint) Init(execDefFilePath string, graphFilePath string, onRegist if err != nil { return err } + return nil } + +func (bm *Blueprint) Create(graphName string) IGraph { + return bm.graphPool.Create(graphName) +} diff --git a/util/blueprint/blueprint_test.go b/util/blueprint/blueprint_test.go index c9ce498..d424600 100644 --- a/util/blueprint/blueprint_test.go +++ b/util/blueprint/blueprint_test.go @@ -1,6 +1,7 @@ package blueprint import ( + "fmt" "testing" ) @@ -16,11 +17,33 @@ func (em *Entrance_IntParam) Exec() (int, error) { return 0, nil } +type Output struct { + BaseExecNode +} + +func (em *Output) GetName() string { + return "Output" +} +func (em *Output) Exec() (int, error) { + val, ok := em.GetInPortInt(0) + if !ok { + return 0, fmt.Errorf("Output Exec inParam not found") + } + + fmt.Printf("Output Exec inParam %d", val) + return 0, nil +} + func OnRegister(bm *ExecPool) error { bm.Register(&Entrance_IntParam{}) + bm.Register(&Output{}) return nil } +const ( + EntranceID_IntParam = 3 +) + func TestExecMgr(t *testing.T) { // var bp Blueprint @@ -28,4 +51,13 @@ func TestExecMgr(t *testing.T) { if err != nil { t.Fatalf("init failed,err:%v", err) } + + graph := bp.Create("test1") + + err = graph.Do(EntranceID_IntParam, 1, 2, 3) + if err != nil { + t.Fatalf("do failed,err:%v", err) + } + + graph.Release() } diff --git a/util/blueprint/exec.go b/util/blueprint/exec.go index 3e7c344..b2f8860 100644 --- a/util/blueprint/exec.go +++ b/util/blueprint/exec.go @@ -4,7 +4,7 @@ import "fmt" type IBaseExecNode interface { initInnerExecNode(innerNode *innerExecNode) - initExecNode(gr *graph, en *execNode) error + initExecNode(gr *Graph, en *execNode) error } type IInnerExecNode interface { @@ -13,6 +13,7 @@ type IInnerExecNode interface { IsInPortExec(index int) bool IsOutPortExec(index int) bool GetInPortCount() int + GetOutPortCount() int CloneInOutPort() ([]IPort, []IPort) GetInPort(index int) IPort @@ -43,7 +44,7 @@ type BaseExecNode struct { // 执行时初始化的数据 *ExecContext - gr *graph + gr *Graph execNode *execNode } @@ -128,6 +129,10 @@ func (em *innerExecNode) GetInPortCount() int { return len(em.InPort) } +func (em *innerExecNode) GetOutPortCount() int { + return len(em.OutPort) +} + func (em *innerExecNode) GetInPort(index int) IPort { if index >= len(em.InPort) || index < 0 { return nil @@ -146,7 +151,7 @@ func (en *BaseExecNode) initInnerExecNode(innerNode *innerExecNode) { en.innerExecNode = innerNode } -func (en *BaseExecNode) initExecNode(gr *graph, node *execNode) error { +func (en *BaseExecNode) initExecNode(gr *Graph, node *execNode) error { ctx, ok := gr.context[node.Id] if !ok { return fmt.Errorf("node %s not found", node.Id) @@ -159,6 +164,10 @@ func (en *BaseExecNode) initExecNode(gr *graph, node *execNode) error { } func (en *BaseExecNode) GetInPort(index int) IPort { + if en.InputPorts == nil { + return nil + } + if index >= len(en.InputPorts) || index < 0 { return nil } @@ -166,6 +175,9 @@ func (en *BaseExecNode) GetInPort(index int) IPort { } func (en *BaseExecNode) GetOutPort(index int) IPort { + if en.OutputPorts == nil { + return nil + } if index >= len(en.OutputPorts) || index < 0 { return nil } @@ -438,9 +450,5 @@ func (en *BaseExecNode) GetNextExecLen() int { } func (en *BaseExecNode) getInnerExecNode() IInnerExecNode { - innerNode, ok := en.execNode.execNode.(IInnerExecNode) - if ok { - return innerNode - } - return nil + return en.innerExecNode.IExecNode.(IInnerExecNode) } diff --git a/util/blueprint/execpool.go b/util/blueprint/execpool.go index bb01a1c..8466f22 100644 --- a/util/blueprint/execpool.go +++ b/util/blueprint/execpool.go @@ -247,9 +247,7 @@ func (em *ExecPool) loadSysExec() error { if err = em.regGetVariables(Config_DataType_Array); err != nil { return err } - if err = em.regSetVariables(Config_DataType_Int); err != nil { - return err - } + if err = em.regSetVariables(Config_DataType_Integer); err != nil { return err } @@ -283,8 +281,6 @@ func (em *ExecPool) regGetVariables(typ string) error { var getVariablesNode GetVariablesNode getVariablesNode.nodeName = baseExec.GetName() - //getVariablesNode.execNode = &baseExec - //baseExec.IExecNode = &getVariablesNode if !em.loadBaseExec(&baseExec) { return fmt.Errorf("exec %s already registered", baseExec.GetName()) diff --git a/util/blueprint/graph.go b/util/blueprint/graph.go index 7b6973f..5333193 100644 --- a/util/blueprint/graph.go +++ b/util/blueprint/graph.go @@ -3,16 +3,24 @@ package blueprint import "fmt" type IGraph interface { - Do(entranceID int64) error + Do(entranceID int64, args ...any) error + Release() } -type graph struct { +type baseGraph struct { + entrance map[int64]*execNode // 入口 +} + +type Graph struct { + *baseGraph + graphContext +} + +type graphContext struct { context map[string]*ExecContext // 上下文 - entrance map[int64]*execNode // 入口 variables map[string]IPort // 变量 globalVariables map[string]IPort // 全局变量 } - type nodeConfig struct { Id string `json:"id"` Class string `json:"class"` @@ -65,7 +73,7 @@ func (gc *graphConfig) GetNodeByID(nodeID string) *nodeConfig { return nil } -func (gr *graph) Do(entranceID int64) error { +func (gr *Graph) Do(entranceID int64, args ...any) error { entranceNode := gr.entrance[entranceID] if entranceNode == nil { return fmt.Errorf("entranceID:%d not found", entranceID) @@ -76,10 +84,10 @@ func (gr *graph) Do(entranceID int64) error { gr.globalVariables = map[string]IPort{} } - return entranceNode.Do(gr) + return entranceNode.Do(gr, args...) } -func (gr *graph) GetNodeInPortValue(nodeID string, inPortIndex int) IPort { +func (gr *Graph) GetNodeInPortValue(nodeID string, inPortIndex int) IPort { if ctx, ok := gr.context[nodeID]; ok { if inPortIndex >= len(ctx.InputPorts) || inPortIndex < 0 { return nil @@ -90,7 +98,7 @@ func (gr *graph) GetNodeInPortValue(nodeID string, inPortIndex int) IPort { return nil } -func (gr *graph) GetNodeOutPortValue(nodeID string, outPortIndex int) IPort { +func (gr *Graph) GetNodeOutPortValue(nodeID string, outPortIndex int) IPort { if ctx, ok := gr.context[nodeID]; ok { if outPortIndex >= len(ctx.OutputPorts) || outPortIndex < 0 { return nil @@ -99,3 +107,10 @@ func (gr *graph) GetNodeOutPortValue(nodeID string, outPortIndex int) IPort { } return nil } + +func (gr *Graph) Release() { + // 有定时器关闭定时器 + + // 清理掉所有数据 + *gr = Graph{} +} diff --git a/util/blueprint/graphpool.go b/util/blueprint/graphpool.go index 4d412f3..8d4124d 100644 --- a/util/blueprint/graphpool.go +++ b/util/blueprint/graphpool.go @@ -9,12 +9,13 @@ import ( ) type GraphPool struct { - mapGraphs map[string]graph + mapGraphs map[string]*baseGraph execPool *ExecPool } func (gp *GraphPool) Load(execPool *ExecPool, graphFilePath string) error { gp.execPool = execPool + gp.mapGraphs = make(map[string]*baseGraph, 1024) // 检查路径是否存在 stat, err := os.Stat(graphFilePath) @@ -53,8 +54,12 @@ func (gp *GraphPool) Create(graphName string) IGraph { if !ok { return nil } + + var graph Graph + graph.baseGraph = gr + graph.context = make(map[string]*ExecContext, 4) - return &gr + return &graph } func (gp *GraphPool) processJSONFile(filePath string) error { @@ -134,8 +139,13 @@ func (gp *GraphPool) genAllNode(graphConfig *graphConfig) (map[string]*execNode, nodes := make(map[string]*execNode) for _, node := range graphConfig.Nodes { var varName string + className := node.Class + if name, _, ok := getEntranceNodeNameAndID(className); ok { + className = name + } + // 获取不到node,则获取变量node - exec := gp.execPool.GetExec(node.Class) + exec := gp.execPool.GetExec(className) if exec == nil { exec, varName = gp.getVarExec(&node, graphConfig) if exec == nil { @@ -155,19 +165,27 @@ func (gp *GraphPool) genAllNode(graphConfig *graphConfig) (map[string]*execNode, return nodes, nil } -func (gp *GraphPool) prepareOneNode(mapNodeExec map[string]*execNode, nodeExec *execNode, graphConfig *graphConfig) error { +func (gp *GraphPool) prepareOneNode(mapNodeExec map[string]*execNode, nodeExec *execNode, graphConfig *graphConfig, recursion *int) error { + *recursion++ + if *recursion > 100 { + return fmt.Errorf("recursion too deep") + } + // 找到所有出口 var idx int - for ; nodeExec.execNode.IsOutPortExec(idx); idx++ { + for ; nodeExec.execNode.IsOutPortExec(idx) && idx < nodeExec.execNode.GetOutPortCount(); idx++ { // 找到出口结点 nextExecNode := gp.findOutNextNode(graphConfig, mapNodeExec, nodeExec.Id, idx) + if nextExecNode == nil { + continue + } nodeExec.nextNode = append(nodeExec.nextNode, nextExecNode) } // 将所有的next填充next for _, nextOne := range nodeExec.nextNode { // 对出口进行预处理 - err := gp.prepareOneNode(mapNodeExec, nextOne, graphConfig) + err := gp.prepareOneNode(mapNodeExec, nextOne, graphConfig, recursion) if err != nil { return err } @@ -201,7 +219,7 @@ func (gp *GraphPool) prepareOneEntrance(graphName string, entranceID int64, node return fmt.Errorf("entrance node %s not found", nodeCfg.Id) } - err = gp.prepareOneNode(mapNodes, nodeExec, graphConfig) + err = gp.prepareOneNode(mapNodes, nodeExec, graphConfig, new(int)) if err != nil { return err } @@ -212,12 +230,15 @@ func (gp *GraphPool) prepareOneEntrance(graphName string, entranceID int64, node return err } - var gr graph + var gr baseGraph gr.entrance = make(map[int64]*execNode, 16) - gr.context = make(map[string]*ExecContext, 16) - gr.entrance[entranceID] = nodeExec - gp.mapGraphs[graphName] = gr + + if _, ok := gp.mapGraphs[graphName]; ok { + return fmt.Errorf("baseGraph %s already exists", graphName) + } + + gp.mapGraphs[graphName] = &gr return nil } @@ -270,6 +291,11 @@ func (gp *GraphPool) prepareInPort(mapNodeExec map[string]*execNode, nodeExec *e if err != nil { return err } + + err = gp.prepareInPort(mapNodeExec, nextNode, graphConfig) + if err != nil { + return err + } } return nil diff --git a/util/blueprint/node.go b/util/blueprint/node.go index eea8488..aa53089 100644 --- a/util/blueprint/node.go +++ b/util/blueprint/node.go @@ -37,7 +37,7 @@ func (en *execNode) Next() *execNode { return en.nextNode[en.nextIdx] } -func (en *execNode) exec(gr *graph) (int, error) { +func (en *execNode) exec(gr *Graph) (int, error) { e, ok := en.execNode.(IExecNode) if !ok { return -1, fmt.Errorf("exec node %s not exec", en.execNode.GetName()) @@ -55,7 +55,7 @@ func (en *execNode) exec(gr *graph) (int, error) { return e.Exec() } -func (en *execNode) doSetInPort(gr *graph, index int, inPort IPort) error { +func (en *execNode) doSetInPort(gr *Graph, index int, inPort IPort) error { // 找到当前Node的InPort的index的前一个结点 preNode := en.preInPort[index] // 如果前一个结点为空,则填充默认值 @@ -79,10 +79,10 @@ func (en *execNode) doSetInPort(gr *graph, index int, inPort IPort) error { } // 如果前一个结点没有执行过,则递归执行前一个结点 - return preNode.node.Do(gr) + return preNode.node.Do(gr, nil) } -func (en *execNode) Do(gr *graph) error { +func (en *execNode) Do(gr *Graph, outPortArgs ...any) error { // 重新初始化上下文 inPorts, outPorts := en.execNode.CloneInOutPort() gr.context[en.Id] = &ExecContext{ @@ -90,6 +90,16 @@ func (en *execNode) Do(gr *graph) error { OutputPorts: outPorts, } + for i := 0; i < len(outPortArgs); i++ { + if i >= len(outPorts) { + return fmt.Errorf("args %d not found in node %s", i, en.execNode.GetName()) + } + + if err := outPorts[i].setAnyVale(outPortArgs[i]); err != nil { + return fmt.Errorf("args %d set value error: %w", i, err) + } + } + // 处理InPort结点值 var err error for index := range inPorts { @@ -111,7 +121,7 @@ func (en *execNode) Do(gr *graph) error { return err } - if nextIndex == -1 { + if nextIndex == -1 || en.nextNode == nil { return nil } diff --git a/util/blueprint/port.go b/util/blueprint/port.go index 31110f4..e8e892b 100644 --- a/util/blueprint/port.go +++ b/util/blueprint/port.go @@ -161,8 +161,53 @@ func (em *Port[T]) IsPortExec() bool { return ok } +func (em *Port[T]) convertInt64(v any) (int64, bool) { + switch v.(type) { + case int: + return int64(v.(int)), true + case int64: + return v.(int64), true + case int32: + return int64(v.(int32)), true + case int16: + return int64(v.(int16)), true + case int8: + return int64(v.(int8)), true + case uint64: + return int64(v.(uint64)), true + case uint32: + return int64(v.(uint32)), true + case uint16: + return int64(v.(uint16)), true + case uint8: + return int64(v.(uint8)), true + case uint: + return int64(v.(uint)), true + default: + return 0, false + } +} + func (em *Port[T]) setAnyVale(v any) error { switch v.(type) { + case int, int64: + val, ok := em.convertInt64(v) + if !ok { + return fmt.Errorf("port type is %T, but value is %v", em.PortVal, v) + } + + switch any(em.PortVal).(type) { + case Port_Int: + em.SetInt(val) + case Port_Float: + em.SetFloat(Port_Float(val)) + case Port_Str: + em.SetStr(fmt.Sprintf("%d", int64(val))) + case Port_Bool: + em.SetBool(int64(val) != 0) + default: + return fmt.Errorf("port type is %T, but value is %v", em.PortVal, v) + } case float64: fV := v.(float64) switch any(em.PortVal).(type) { @@ -174,6 +219,8 @@ func (em *Port[T]) setAnyVale(v any) error { em.SetStr(fmt.Sprintf("%d", int64(fV))) case Port_Bool: em.SetBool(int64(fV) != 0) + default: + return fmt.Errorf("port type is %T, but value is %v", em.PortVal, v) } case string: strV := v.(string) @@ -198,6 +245,8 @@ func (em *Port[T]) setAnyVale(v any) error { return err } em.SetBool(val) + default: + return fmt.Errorf("port type is %T, but value is %v", em.PortVal, v) } case bool: strV := v.(bool) @@ -210,8 +259,11 @@ func (em *Port[T]) setAnyVale(v any) error { return fmt.Errorf("port type is string, but value is %v", strV) case Port_Bool: em.SetBool(strV) + default: + return fmt.Errorf("port type is %T, but value is %v", em.PortVal, v) } } + return nil }