From 027e83b70663e940973fddf6602131a76d036e02 Mon Sep 17 00:00:00 2001 From: boyce <6549168@qq.com> Date: Mon, 17 Nov 2025 10:55:17 +0800 Subject: [PATCH] =?UTF-8?q?=E6=95=B4=E7=90=86=E4=BC=98=E5=8C=96=E4=BB=A3?= =?UTF-8?q?=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- cluster/cluster.go | 7 +++-- util/blueprint/blueprint.go | 12 +++++-- util/blueprint/blueprint_test.go | 5 +++ util/blueprint/exec.go | 27 +++++++++------- util/blueprint/execpool.go | 54 +++++++++++++++++++++----------- util/blueprint/graphpool.go | 12 ++++--- 6 files changed, 77 insertions(+), 40 deletions(-) diff --git a/cluster/cluster.go b/cluster/cluster.go index 319f484..3ac2359 100644 --- a/cluster/cluster.go +++ b/cluster/cluster.go @@ -3,13 +3,14 @@ package cluster import ( "errors" "fmt" + "reflect" + "strings" + "sync" + "github.com/duanhf2012/origin/v2/event" "github.com/duanhf2012/origin/v2/log" "github.com/duanhf2012/origin/v2/rpc" "github.com/duanhf2012/origin/v2/service" - "reflect" - "strings" - "sync" ) var configDir = "./config/" diff --git a/util/blueprint/blueprint.go b/util/blueprint/blueprint.go index d16de12..d9aa14a 100644 --- a/util/blueprint/blueprint.go +++ b/util/blueprint/blueprint.go @@ -56,26 +56,32 @@ func (bm *Blueprint) regSysNode() { } func (bm *Blueprint) Init(execDefFilePath string, graphFilePath string, blueprintModule IBlueprintModule, cancelTimer func(*uint64) bool) error { - bm.regSysNode() + // 加载配置结点生成名字对应的innerExecNode err := bm.execPool.Load(execDefFilePath) if err != nil { return err } + // 注册系统执行结点 + bm.regSysNode() + + // 将注册的实际执行结点与innerExecNode进行关联 for _, e := range bm.execNodes { if !bm.execPool.Register(e) { return fmt.Errorf("register exec failed,exec:%s", e.GetName()) } } + // 加载所有的vgf蓝图文件 err = bm.graphPool.Load(&bm.execPool, graphFilePath, blueprintModule) if err != nil { return err } - + bm.cancelTimer = cancelTimer bm.blueprintModule = blueprintModule bm.mapGraph = make(map[int64]IGraph, 128) + return nil } @@ -117,7 +123,7 @@ func (bm *Blueprint) ReleaseGraph(graphID int64) { if graphID == 0 { return } - + defer delete(bm.mapGraph, graphID) graph := bm.mapGraph[graphID] if graph == nil { diff --git a/util/blueprint/blueprint_test.go b/util/blueprint/blueprint_test.go index deb92cf..3e54b7a 100644 --- a/util/blueprint/blueprint_test.go +++ b/util/blueprint/blueprint_test.go @@ -5,6 +5,11 @@ import ( ) func TestExecMgr(t *testing.T) { + var bp Blueprint + err := bp.Init("E:\\WorkSpace\\c4\\OriginNodeEditor\\json", "E:\\WorkSpace\\c4\\OriginNodeEditor\\vgf", nil, nil) + if err != nil { + t.Fatalf("Init failed,err:%v", err) + } //graphTest2 := bp.Create("testForeach") //err = graphTest2.Do(EntranceID_IntParam, 1, 2, 3) diff --git a/util/blueprint/exec.go b/util/blueprint/exec.go index a4480e5..6373e77 100644 --- a/util/blueprint/exec.go +++ b/util/blueprint/exec.go @@ -2,15 +2,7 @@ package blueprint import "fmt" -type IBaseExecNode interface { - initInnerExecNode(innerNode *innerExecNode) - initExecNode(gr *Graph, en *execNode) error - GetPorts() ([]IPort, []IPort) - getExecNodeInfo() (*ExecContext, *execNode) - setExecNodeInfo(gr *ExecContext, en *execNode) - GetBlueprintModule() IBlueprintModule -} - +// IInnerExecNode 配置生成的结点 type IInnerExecNode interface { GetName() string SetExec(exec IExecNode) @@ -26,7 +18,19 @@ type IInnerExecNode interface { GetOutPortParamStartIndex() int } +// IBaseExecNode 实际注册的执行结点的基础结构体 +type IBaseExecNode interface { + initInnerExecNode(innerNode *innerExecNode) + initExecNode(gr *Graph, en *execNode) error + GetPorts() ([]IPort, []IPort) + getExecNodeInfo() (*ExecContext, *execNode) + setExecNodeInfo(gr *ExecContext, en *execNode) + GetBlueprintModule() IBlueprintModule +} + +// IExecNode 实际注册的执行结点 type IExecNode interface { + IBaseExecNode GetName() string DoNext(index int) error Exec() (int, error) // 返回后续执行的Node的Index @@ -34,6 +38,7 @@ type IExecNode interface { getInnerExecNode() IInnerExecNode } +// 配置对应的基础信息+端口数据 type innerExecNode struct { Name string Title string @@ -45,7 +50,7 @@ type innerExecNode struct { outPortParamStartIndex int // 输出参数的起始索引,用于排除执行出口 - IExecNode + IExecNode // 实际注册的执行结点 } type BaseExecNode struct { @@ -106,7 +111,6 @@ func (bc *BaseExecConfig) GetMaxOutPortId() int { return maxPortId } - func (em *innerExecNode) PrepareMaxInPortId(maxInPortId int) { em.inPort = make([]IPort, maxInPortId+1) } @@ -568,7 +572,6 @@ func (en *BaseExecNode) getInnerExecNode() IInnerExecNode { return en.innerExecNode.IExecNode.(IInnerExecNode) } - func (en *BaseExecNode) GetBlueprintModule() IBlueprintModule { if en.gr == nil { return nil diff --git a/util/blueprint/execpool.go b/util/blueprint/execpool.go index 25fa4d7..504e797 100644 --- a/util/blueprint/execpool.go +++ b/util/blueprint/execpool.go @@ -5,19 +5,19 @@ import ( "fmt" "os" "path/filepath" + "sort" "strconv" "strings" - "sort" ) -// 格式说明Entrance_ID +// Entrance 格式:Entrance_XXXX_ID const ( Entrance = "Entrance_" ) type ExecPool struct { - innerExecNodeMap map[string]IInnerExecNode - execNodeMap map[string]IExecNode + innerExecNodeMap map[string]IInnerExecNode // 所有配置对应的结点信息 + execNodeMap map[string]IExecNode // 实际注册的执行结点 } func (em *ExecPool) Load(execDefFilePath string) error { @@ -49,16 +49,19 @@ func (em *ExecPool) Load(execDefFilePath string) error { // 只处理JSON文件 if filepath.Ext(path) == ".json" { + // 将配置的结点初始化为innerExecNode将加入到innerExecNodeMap中 return em.processJSONFile(path) } return nil }) + if err != nil { return fmt.Errorf("failed to walk path %s: %v", execDefFilePath, err) } - return em.loadSysExec() + // 生成变量配置对应的配置结点GetVar_类型、SetVar_类型 + return em.regVariablesNode() } // 处理单个JSON文件 @@ -84,21 +87,24 @@ func (em *ExecPool) processJSONFile(filePath string) error { } for i := range baseExecConfig { - // 排序 + // 对PortId进行排序 sort.Slice(baseExecConfig[i].Inputs, func(left, right int) bool { return baseExecConfig[i].Inputs[left].PortId < baseExecConfig[i].Inputs[right].PortId }) - + // 对PortId进行排序 sort.Slice(baseExecConfig[i].Outputs, func(left, right int) bool { return baseExecConfig[i].Outputs[left].PortId < baseExecConfig[i].Outputs[right].PortId }) - exec, err := em.createExecFromJSON(baseExecConfig[i]) - if err != nil { - return err + // 根据配置的结点信息,创建innerExecNode + var execError error + exec, execError := em.createExecFromJSON(baseExecConfig[i]) + if execError != nil { + return execError } - if !em.loadBaseExec(exec) { + // 加载到innerExecNodeMap中 + if !em.addInnerExec(exec) { return fmt.Errorf("exec %s already registered", exec.GetName()) } } @@ -126,6 +132,7 @@ func (em *ExecPool) createPortByDataType(nodeName, portName, dataType string) (I func (em *ExecPool) createExecFromJSON(baseExecConfig BaseExecConfig) (IInnerExecNode, error) { var baseExec innerExecNode + // 如果是入口名,则按入口名Entrance_ArrayParam_000002生成结点名:Entrance_ArrayParam entranceName, _, ok := getEntranceNodeNameAndID(baseExecConfig.Name) if ok { baseExec.Name = entranceName @@ -138,7 +145,7 @@ func (em *ExecPool) createExecFromJSON(baseExecConfig BaseExecConfig) (IInnerExe baseExec.PrepareMaxInPortId(baseExecConfig.GetMaxInPortId()) baseExec.PrepareMaxOutPortId(baseExecConfig.GetMaxOutPortId()) - // exec数量 + // 初始化所有的输入端口 inExecNum := 0 for index, input := range baseExecConfig.Inputs { portType := strings.ToLower(input.PortType) @@ -146,6 +153,7 @@ func (em *ExecPool) createExecFromJSON(baseExecConfig BaseExecConfig) (IInnerExe return nil, fmt.Errorf("input %s data type %s not support", input.Name, input.DataType) } + // 输入执行结点只能有一个,且只能放在第一个 if portType == Config_PortType_Exec { if inExecNum > 0 { return nil, fmt.Errorf("inPort only allows one Execute,node name %s", baseExec.Name) @@ -155,19 +163,22 @@ func (em *ExecPool) createExecFromJSON(baseExecConfig BaseExecConfig) (IInnerExe } inExecNum++ + // 设置执行端口 baseExec.SetInPortById(input.PortId, NewPortExec()) - // baseExec.AppendInPort(NewPortExec()) continue } + // 根据类型设置对应的端口 port, err := em.createPortByDataType(baseExec.Name, input.Name, input.DataType) if err != nil { return nil, err } + // 根据PortId设置端口 baseExec.SetInPortById(input.PortId, port) } + // 初始化所有的输出端口 hasData := false for _, output := range baseExecConfig.Outputs { portType := strings.ToLower(output.PortType) @@ -180,11 +191,13 @@ func (em *ExecPool) createExecFromJSON(baseExecConfig BaseExecConfig) (IInnerExe return nil, fmt.Errorf("the exec port can only be placed at the front,node name %s", baseExec.Name) } + // 设置执行端口 if portType == Config_PortType_Exec { baseExec.SetOutPortById(output.PortId, NewPortExec()) continue } + // 根据类型设置数据端口 hasData = true port, err := em.createPortByDataType(baseExec.Name, output.Name, output.DataType) if err != nil { @@ -196,7 +209,7 @@ func (em *ExecPool) createExecFromJSON(baseExecConfig BaseExecConfig) (IInnerExe return &baseExec, nil } -func (em *ExecPool) loadBaseExec(exec IInnerExecNode) bool { +func (em *ExecPool) addInnerExec(exec IInnerExecNode) bool { if _, ok := em.innerExecNodeMap[exec.GetName()]; ok { return false } @@ -224,10 +237,14 @@ func (em *ExecPool) Register(exec IExecNode) bool { return false } + // 设置实际执行结点中innerExecNode变量,BaseExecNode.innerExecNode = innerNode baseExecNode.initInnerExecNode(innerNode.(*innerExecNode)) + + // innerNode设置实际的exec变量,innerExecNode.IExecNode = exec innerNode.SetExec(exec) - em.execNodeMap[baseExec.GetName()] = baseExec + // 将实际的执行结点保存到execNodeMap中 + em.execNodeMap[baseExec.GetName()] = exec return true } @@ -238,7 +255,8 @@ func (em *ExecPool) GetExec(name string) IInnerExecNode { return nil } -func (em *ExecPool) loadSysExec() error { +// regVariablesNode 注册变量结点GetVar_类型、SetVar_类型 +func (em *ExecPool) regVariablesNode() error { var err error if err = em.regGetVariables(Config_DataType_Int); err != nil { return err @@ -300,7 +318,7 @@ func (em *ExecPool) regGetVariables(typ string) error { var getVariablesNode GetVariablesNode getVariablesNode.nodeName = baseExec.GetName() - if !em.loadBaseExec(&baseExec) { + if !em.addInnerExec(&baseExec) { return fmt.Errorf("exec %s already registered", baseExec.GetName()) } if !em.Register(&getVariablesNode) { @@ -336,7 +354,7 @@ func (em *ExecPool) regSetVariables(typ string) error { baseExec.SetOutPortById(1, outPort) baseExec.IExecNode = &SetVariablesNode{nodeName: baseExec.GetName()} - if !em.loadBaseExec(&baseExec) { + if !em.addInnerExec(&baseExec) { return fmt.Errorf("exec %s already registered", baseExec.GetName()) } if !em.Register(baseExec.IExecNode) { diff --git a/util/blueprint/graphpool.go b/util/blueprint/graphpool.go index 2e1a953..72a40d2 100644 --- a/util/blueprint/graphpool.go +++ b/util/blueprint/graphpool.go @@ -73,21 +73,25 @@ func (gp *GraphPool) processJSONFile(filePath string) error { if err != nil { return fmt.Errorf("failed to open file %s: %v", filePath, err) } + defer func() { - if err := file.Close(); err != nil { + if err = file.Close(); err != nil { fmt.Printf("关闭文件 %s 时出错: %v\n", filePath, err) } }() fileName := filepath.Base(filePath) - ext := filepath.Ext(fileName) // 获取".html" + ext := filepath.Ext(fileName) // 获取".vgf" name := strings.TrimSuffix(fileName, ext) // 获取"name" + + // 解析文件 var gConfig graphConfig decoder := json.NewDecoder(file) - if err := decoder.Decode(&gConfig); err != nil { + if err = decoder.Decode(&gConfig); err != nil { return fmt.Errorf("failed to decode JSON from file %s: %v", filePath, err) } + // 预处理蓝图 return gp.prepareGraph(name, &gConfig) } @@ -173,7 +177,7 @@ func (gp *GraphPool) genAllNode(graphConfig *graphConfig) (map[string]*execNode, func (gp *GraphPool) prepareOneNode(mapNodeExec map[string]*execNode, nodeExec *execNode, graphConfig *graphConfig, recursion *int) error { *recursion++ - if *recursion > 100 { + if *recursion > 256 { return fmt.Errorf("recursion too deep") }