优化代码

This commit is contained in:
boyce
2025-09-23 10:20:03 +08:00
parent 3bcce31a86
commit 77e2986ffb
8 changed files with 181 additions and 37 deletions

View File

@@ -20,5 +20,10 @@ func (bm *Blueprint) Init(execDefFilePath string, graphFilePath string, onRegist
if err != nil { if err != nil {
return err return err
} }
return nil return nil
} }
func (bm *Blueprint) Create(graphName string) IGraph {
return bm.graphPool.Create(graphName)
}

View File

@@ -1,6 +1,7 @@
package blueprint package blueprint
import ( import (
"fmt"
"testing" "testing"
) )
@@ -16,11 +17,33 @@ func (em *Entrance_IntParam) Exec() (int, error) {
return 0, nil return 0, nil
} }
type Output struct {
BaseExecNode
}
func (em *Output) GetName() string {
return "Output"
}
func (em *Output) Exec() (int, error) {
val, ok := em.GetInPortInt(0)
if !ok {
return 0, fmt.Errorf("Output Exec inParam not found")
}
fmt.Printf("Output Exec inParam %d", val)
return 0, nil
}
func OnRegister(bm *ExecPool) error { func OnRegister(bm *ExecPool) error {
bm.Register(&Entrance_IntParam{}) bm.Register(&Entrance_IntParam{})
bm.Register(&Output{})
return nil return nil
} }
const (
EntranceID_IntParam = 3
)
func TestExecMgr(t *testing.T) { func TestExecMgr(t *testing.T) {
// //
var bp Blueprint var bp Blueprint
@@ -28,4 +51,13 @@ func TestExecMgr(t *testing.T) {
if err != nil { if err != nil {
t.Fatalf("init failed,err:%v", err) t.Fatalf("init failed,err:%v", err)
} }
graph := bp.Create("test1")
err = graph.Do(EntranceID_IntParam, 1, 2, 3)
if err != nil {
t.Fatalf("do failed,err:%v", err)
}
graph.Release()
} }

View File

@@ -4,7 +4,7 @@ import "fmt"
type IBaseExecNode interface { type IBaseExecNode interface {
initInnerExecNode(innerNode *innerExecNode) initInnerExecNode(innerNode *innerExecNode)
initExecNode(gr *graph, en *execNode) error initExecNode(gr *Graph, en *execNode) error
} }
type IInnerExecNode interface { type IInnerExecNode interface {
@@ -13,6 +13,7 @@ type IInnerExecNode interface {
IsInPortExec(index int) bool IsInPortExec(index int) bool
IsOutPortExec(index int) bool IsOutPortExec(index int) bool
GetInPortCount() int GetInPortCount() int
GetOutPortCount() int
CloneInOutPort() ([]IPort, []IPort) CloneInOutPort() ([]IPort, []IPort)
GetInPort(index int) IPort GetInPort(index int) IPort
@@ -43,7 +44,7 @@ type BaseExecNode struct {
// 执行时初始化的数据 // 执行时初始化的数据
*ExecContext *ExecContext
gr *graph gr *Graph
execNode *execNode execNode *execNode
} }
@@ -128,6 +129,10 @@ func (em *innerExecNode) GetInPortCount() int {
return len(em.InPort) return len(em.InPort)
} }
func (em *innerExecNode) GetOutPortCount() int {
return len(em.OutPort)
}
func (em *innerExecNode) GetInPort(index int) IPort { func (em *innerExecNode) GetInPort(index int) IPort {
if index >= len(em.InPort) || index < 0 { if index >= len(em.InPort) || index < 0 {
return nil return nil
@@ -146,7 +151,7 @@ func (en *BaseExecNode) initInnerExecNode(innerNode *innerExecNode) {
en.innerExecNode = innerNode en.innerExecNode = innerNode
} }
func (en *BaseExecNode) initExecNode(gr *graph, node *execNode) error { func (en *BaseExecNode) initExecNode(gr *Graph, node *execNode) error {
ctx, ok := gr.context[node.Id] ctx, ok := gr.context[node.Id]
if !ok { if !ok {
return fmt.Errorf("node %s not found", node.Id) return fmt.Errorf("node %s not found", node.Id)
@@ -159,6 +164,10 @@ func (en *BaseExecNode) initExecNode(gr *graph, node *execNode) error {
} }
func (en *BaseExecNode) GetInPort(index int) IPort { func (en *BaseExecNode) GetInPort(index int) IPort {
if en.InputPorts == nil {
return nil
}
if index >= len(en.InputPorts) || index < 0 { if index >= len(en.InputPorts) || index < 0 {
return nil return nil
} }
@@ -166,6 +175,9 @@ func (en *BaseExecNode) GetInPort(index int) IPort {
} }
func (en *BaseExecNode) GetOutPort(index int) IPort { func (en *BaseExecNode) GetOutPort(index int) IPort {
if en.OutputPorts == nil {
return nil
}
if index >= len(en.OutputPorts) || index < 0 { if index >= len(en.OutputPorts) || index < 0 {
return nil return nil
} }
@@ -438,9 +450,5 @@ func (en *BaseExecNode) GetNextExecLen() int {
} }
func (en *BaseExecNode) getInnerExecNode() IInnerExecNode { func (en *BaseExecNode) getInnerExecNode() IInnerExecNode {
innerNode, ok := en.execNode.execNode.(IInnerExecNode) return en.innerExecNode.IExecNode.(IInnerExecNode)
if ok {
return innerNode
}
return nil
} }

View File

@@ -247,9 +247,7 @@ func (em *ExecPool) loadSysExec() error {
if err = em.regGetVariables(Config_DataType_Array); err != nil { if err = em.regGetVariables(Config_DataType_Array); err != nil {
return err return err
} }
if err = em.regSetVariables(Config_DataType_Int); err != nil {
return err
}
if err = em.regSetVariables(Config_DataType_Integer); err != nil { if err = em.regSetVariables(Config_DataType_Integer); err != nil {
return err return err
} }
@@ -283,8 +281,6 @@ func (em *ExecPool) regGetVariables(typ string) error {
var getVariablesNode GetVariablesNode var getVariablesNode GetVariablesNode
getVariablesNode.nodeName = baseExec.GetName() getVariablesNode.nodeName = baseExec.GetName()
//getVariablesNode.execNode = &baseExec
//baseExec.IExecNode = &getVariablesNode
if !em.loadBaseExec(&baseExec) { if !em.loadBaseExec(&baseExec) {
return fmt.Errorf("exec %s already registered", baseExec.GetName()) return fmt.Errorf("exec %s already registered", baseExec.GetName())

View File

@@ -3,16 +3,24 @@ package blueprint
import "fmt" import "fmt"
type IGraph interface { type IGraph interface {
Do(entranceID int64) error Do(entranceID int64, args ...any) error
Release()
} }
type graph struct { type baseGraph struct {
entrance map[int64]*execNode // 入口
}
type Graph struct {
*baseGraph
graphContext
}
type graphContext struct {
context map[string]*ExecContext // 上下文 context map[string]*ExecContext // 上下文
entrance map[int64]*execNode // 入口
variables map[string]IPort // 变量 variables map[string]IPort // 变量
globalVariables map[string]IPort // 全局变量 globalVariables map[string]IPort // 全局变量
} }
type nodeConfig struct { type nodeConfig struct {
Id string `json:"id"` Id string `json:"id"`
Class string `json:"class"` Class string `json:"class"`
@@ -65,7 +73,7 @@ func (gc *graphConfig) GetNodeByID(nodeID string) *nodeConfig {
return nil return nil
} }
func (gr *graph) Do(entranceID int64) error { func (gr *Graph) Do(entranceID int64, args ...any) error {
entranceNode := gr.entrance[entranceID] entranceNode := gr.entrance[entranceID]
if entranceNode == nil { if entranceNode == nil {
return fmt.Errorf("entranceID:%d not found", entranceID) return fmt.Errorf("entranceID:%d not found", entranceID)
@@ -76,10 +84,10 @@ func (gr *graph) Do(entranceID int64) error {
gr.globalVariables = map[string]IPort{} gr.globalVariables = map[string]IPort{}
} }
return entranceNode.Do(gr) return entranceNode.Do(gr, args...)
} }
func (gr *graph) GetNodeInPortValue(nodeID string, inPortIndex int) IPort { func (gr *Graph) GetNodeInPortValue(nodeID string, inPortIndex int) IPort {
if ctx, ok := gr.context[nodeID]; ok { if ctx, ok := gr.context[nodeID]; ok {
if inPortIndex >= len(ctx.InputPorts) || inPortIndex < 0 { if inPortIndex >= len(ctx.InputPorts) || inPortIndex < 0 {
return nil return nil
@@ -90,7 +98,7 @@ func (gr *graph) GetNodeInPortValue(nodeID string, inPortIndex int) IPort {
return nil return nil
} }
func (gr *graph) GetNodeOutPortValue(nodeID string, outPortIndex int) IPort { func (gr *Graph) GetNodeOutPortValue(nodeID string, outPortIndex int) IPort {
if ctx, ok := gr.context[nodeID]; ok { if ctx, ok := gr.context[nodeID]; ok {
if outPortIndex >= len(ctx.OutputPorts) || outPortIndex < 0 { if outPortIndex >= len(ctx.OutputPorts) || outPortIndex < 0 {
return nil return nil
@@ -99,3 +107,10 @@ func (gr *graph) GetNodeOutPortValue(nodeID string, outPortIndex int) IPort {
} }
return nil return nil
} }
func (gr *Graph) Release() {
// 有定时器关闭定时器
// 清理掉所有数据
*gr = Graph{}
}

View File

@@ -9,12 +9,13 @@ import (
) )
type GraphPool struct { type GraphPool struct {
mapGraphs map[string]graph mapGraphs map[string]*baseGraph
execPool *ExecPool execPool *ExecPool
} }
func (gp *GraphPool) Load(execPool *ExecPool, graphFilePath string) error { func (gp *GraphPool) Load(execPool *ExecPool, graphFilePath string) error {
gp.execPool = execPool gp.execPool = execPool
gp.mapGraphs = make(map[string]*baseGraph, 1024)
// 检查路径是否存在 // 检查路径是否存在
stat, err := os.Stat(graphFilePath) stat, err := os.Stat(graphFilePath)
@@ -54,7 +55,11 @@ func (gp *GraphPool) Create(graphName string) IGraph {
return nil return nil
} }
return &gr var graph Graph
graph.baseGraph = gr
graph.context = make(map[string]*ExecContext, 4)
return &graph
} }
func (gp *GraphPool) processJSONFile(filePath string) error { func (gp *GraphPool) processJSONFile(filePath string) error {
@@ -134,8 +139,13 @@ func (gp *GraphPool) genAllNode(graphConfig *graphConfig) (map[string]*execNode,
nodes := make(map[string]*execNode) nodes := make(map[string]*execNode)
for _, node := range graphConfig.Nodes { for _, node := range graphConfig.Nodes {
var varName string var varName string
className := node.Class
if name, _, ok := getEntranceNodeNameAndID(className); ok {
className = name
}
// 获取不到node则获取变量node // 获取不到node则获取变量node
exec := gp.execPool.GetExec(node.Class) exec := gp.execPool.GetExec(className)
if exec == nil { if exec == nil {
exec, varName = gp.getVarExec(&node, graphConfig) exec, varName = gp.getVarExec(&node, graphConfig)
if exec == nil { if exec == nil {
@@ -155,19 +165,27 @@ func (gp *GraphPool) genAllNode(graphConfig *graphConfig) (map[string]*execNode,
return nodes, nil return nodes, nil
} }
func (gp *GraphPool) prepareOneNode(mapNodeExec map[string]*execNode, nodeExec *execNode, graphConfig *graphConfig) error { func (gp *GraphPool) prepareOneNode(mapNodeExec map[string]*execNode, nodeExec *execNode, graphConfig *graphConfig, recursion *int) error {
*recursion++
if *recursion > 100 {
return fmt.Errorf("recursion too deep")
}
// 找到所有出口 // 找到所有出口
var idx int var idx int
for ; nodeExec.execNode.IsOutPortExec(idx); idx++ { for ; nodeExec.execNode.IsOutPortExec(idx) && idx < nodeExec.execNode.GetOutPortCount(); idx++ {
// 找到出口结点 // 找到出口结点
nextExecNode := gp.findOutNextNode(graphConfig, mapNodeExec, nodeExec.Id, idx) nextExecNode := gp.findOutNextNode(graphConfig, mapNodeExec, nodeExec.Id, idx)
if nextExecNode == nil {
continue
}
nodeExec.nextNode = append(nodeExec.nextNode, nextExecNode) nodeExec.nextNode = append(nodeExec.nextNode, nextExecNode)
} }
// 将所有的next填充next // 将所有的next填充next
for _, nextOne := range nodeExec.nextNode { for _, nextOne := range nodeExec.nextNode {
// 对出口进行预处理 // 对出口进行预处理
err := gp.prepareOneNode(mapNodeExec, nextOne, graphConfig) err := gp.prepareOneNode(mapNodeExec, nextOne, graphConfig, recursion)
if err != nil { if err != nil {
return err return err
} }
@@ -201,7 +219,7 @@ func (gp *GraphPool) prepareOneEntrance(graphName string, entranceID int64, node
return fmt.Errorf("entrance node %s not found", nodeCfg.Id) return fmt.Errorf("entrance node %s not found", nodeCfg.Id)
} }
err = gp.prepareOneNode(mapNodes, nodeExec, graphConfig) err = gp.prepareOneNode(mapNodes, nodeExec, graphConfig, new(int))
if err != nil { if err != nil {
return err return err
} }
@@ -212,12 +230,15 @@ func (gp *GraphPool) prepareOneEntrance(graphName string, entranceID int64, node
return err return err
} }
var gr graph var gr baseGraph
gr.entrance = make(map[int64]*execNode, 16) gr.entrance = make(map[int64]*execNode, 16)
gr.context = make(map[string]*ExecContext, 16)
gr.entrance[entranceID] = nodeExec gr.entrance[entranceID] = nodeExec
gp.mapGraphs[graphName] = gr
if _, ok := gp.mapGraphs[graphName]; ok {
return fmt.Errorf("baseGraph %s already exists", graphName)
}
gp.mapGraphs[graphName] = &gr
return nil return nil
} }
@@ -270,6 +291,11 @@ func (gp *GraphPool) prepareInPort(mapNodeExec map[string]*execNode, nodeExec *e
if err != nil { if err != nil {
return err return err
} }
err = gp.prepareInPort(mapNodeExec, nextNode, graphConfig)
if err != nil {
return err
}
} }
return nil return nil

View File

@@ -37,7 +37,7 @@ func (en *execNode) Next() *execNode {
return en.nextNode[en.nextIdx] return en.nextNode[en.nextIdx]
} }
func (en *execNode) exec(gr *graph) (int, error) { func (en *execNode) exec(gr *Graph) (int, error) {
e, ok := en.execNode.(IExecNode) e, ok := en.execNode.(IExecNode)
if !ok { if !ok {
return -1, fmt.Errorf("exec node %s not exec", en.execNode.GetName()) return -1, fmt.Errorf("exec node %s not exec", en.execNode.GetName())
@@ -55,7 +55,7 @@ func (en *execNode) exec(gr *graph) (int, error) {
return e.Exec() return e.Exec()
} }
func (en *execNode) doSetInPort(gr *graph, index int, inPort IPort) error { func (en *execNode) doSetInPort(gr *Graph, index int, inPort IPort) error {
// 找到当前Node的InPort的index的前一个结点 // 找到当前Node的InPort的index的前一个结点
preNode := en.preInPort[index] preNode := en.preInPort[index]
// 如果前一个结点为空,则填充默认值 // 如果前一个结点为空,则填充默认值
@@ -79,10 +79,10 @@ func (en *execNode) doSetInPort(gr *graph, index int, inPort IPort) error {
} }
// 如果前一个结点没有执行过,则递归执行前一个结点 // 如果前一个结点没有执行过,则递归执行前一个结点
return preNode.node.Do(gr) return preNode.node.Do(gr, nil)
} }
func (en *execNode) Do(gr *graph) error { func (en *execNode) Do(gr *Graph, outPortArgs ...any) error {
// 重新初始化上下文 // 重新初始化上下文
inPorts, outPorts := en.execNode.CloneInOutPort() inPorts, outPorts := en.execNode.CloneInOutPort()
gr.context[en.Id] = &ExecContext{ gr.context[en.Id] = &ExecContext{
@@ -90,6 +90,16 @@ func (en *execNode) Do(gr *graph) error {
OutputPorts: outPorts, OutputPorts: outPorts,
} }
for i := 0; i < len(outPortArgs); i++ {
if i >= len(outPorts) {
return fmt.Errorf("args %d not found in node %s", i, en.execNode.GetName())
}
if err := outPorts[i].setAnyVale(outPortArgs[i]); err != nil {
return fmt.Errorf("args %d set value error: %w", i, err)
}
}
// 处理InPort结点值 // 处理InPort结点值
var err error var err error
for index := range inPorts { for index := range inPorts {
@@ -111,7 +121,7 @@ func (en *execNode) Do(gr *graph) error {
return err return err
} }
if nextIndex == -1 { if nextIndex == -1 || en.nextNode == nil {
return nil return nil
} }

View File

@@ -161,8 +161,53 @@ func (em *Port[T]) IsPortExec() bool {
return ok return ok
} }
func (em *Port[T]) convertInt64(v any) (int64, bool) {
switch v.(type) {
case int:
return int64(v.(int)), true
case int64:
return v.(int64), true
case int32:
return int64(v.(int32)), true
case int16:
return int64(v.(int16)), true
case int8:
return int64(v.(int8)), true
case uint64:
return int64(v.(uint64)), true
case uint32:
return int64(v.(uint32)), true
case uint16:
return int64(v.(uint16)), true
case uint8:
return int64(v.(uint8)), true
case uint:
return int64(v.(uint)), true
default:
return 0, false
}
}
func (em *Port[T]) setAnyVale(v any) error { func (em *Port[T]) setAnyVale(v any) error {
switch v.(type) { switch v.(type) {
case int, int64:
val, ok := em.convertInt64(v)
if !ok {
return fmt.Errorf("port type is %T, but value is %v", em.PortVal, v)
}
switch any(em.PortVal).(type) {
case Port_Int:
em.SetInt(val)
case Port_Float:
em.SetFloat(Port_Float(val))
case Port_Str:
em.SetStr(fmt.Sprintf("%d", int64(val)))
case Port_Bool:
em.SetBool(int64(val) != 0)
default:
return fmt.Errorf("port type is %T, but value is %v", em.PortVal, v)
}
case float64: case float64:
fV := v.(float64) fV := v.(float64)
switch any(em.PortVal).(type) { switch any(em.PortVal).(type) {
@@ -174,6 +219,8 @@ func (em *Port[T]) setAnyVale(v any) error {
em.SetStr(fmt.Sprintf("%d", int64(fV))) em.SetStr(fmt.Sprintf("%d", int64(fV)))
case Port_Bool: case Port_Bool:
em.SetBool(int64(fV) != 0) em.SetBool(int64(fV) != 0)
default:
return fmt.Errorf("port type is %T, but value is %v", em.PortVal, v)
} }
case string: case string:
strV := v.(string) strV := v.(string)
@@ -198,6 +245,8 @@ func (em *Port[T]) setAnyVale(v any) error {
return err return err
} }
em.SetBool(val) em.SetBool(val)
default:
return fmt.Errorf("port type is %T, but value is %v", em.PortVal, v)
} }
case bool: case bool:
strV := v.(bool) strV := v.(bool)
@@ -210,8 +259,11 @@ func (em *Port[T]) setAnyVale(v any) error {
return fmt.Errorf("port type is string, but value is %v", strV) return fmt.Errorf("port type is string, but value is %v", strV)
case Port_Bool: case Port_Bool:
em.SetBool(strV) em.SetBool(strV)
default:
return fmt.Errorf("port type is %T, but value is %v", em.PortVal, v)
} }
} }
return nil return nil
} }