diff --git a/node/node.go b/node/node.go index c46ff49..b8f8932 100644 --- a/node/node.go +++ b/node/node.go @@ -49,7 +49,7 @@ func init() { signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM, SingleStop, SignalRetire) console.RegisterCommandBool("help", false, "<-help> This help.", usage) - console.RegisterCommandString("name", "", "<-name nodeName> Node's name.", setName) + console.RegisterCommandString("name", "", "<-name nodeName> node's name.", setName) console.RegisterCommandString("start", "", "<-start nodeid=nodeid> Run originserver.", startNode) console.RegisterCommandString("stop", "", "<-stop nodeid=nodeid> Stop originserver process.", stopNode) console.RegisterCommandString("retire", "", "<-retire nodeid=nodeid> retire originserver process.", retireNode) diff --git a/util/blueprint/blueprint.go b/util/blueprint/blueprint.go new file mode 100644 index 0000000..fab1fc9 --- /dev/null +++ b/util/blueprint/blueprint.go @@ -0,0 +1,11 @@ +package blueprint + +type Blueprint struct { + execPool ExecPool + graphPool GraphPool +} + +func (bm *Blueprint) Init(execDefFilePath string, graphFilePath string) { + bm.execPool.Load(execDefFilePath) + bm.graphPool.Load(graphFilePath) +} diff --git a/util/blueprint/blueprint_test.go b/util/blueprint/blueprint_test.go new file mode 100644 index 0000000..75d5136 --- /dev/null +++ b/util/blueprint/blueprint_test.go @@ -0,0 +1,9 @@ +package blueprint + +import ( + "testing" +) + +func TestExecMgr(t *testing.T) { + +} diff --git a/util/blueprint/context.go b/util/blueprint/context.go new file mode 100644 index 0000000..bd4b5cd --- /dev/null +++ b/util/blueprint/context.go @@ -0,0 +1,10 @@ +package blueprint + +type ExecContext struct { + InputPorts []IPort + OutputPorts []IPort +} + +func (ec *ExecContext) Reset() { + *ec = ExecContext{} +} diff --git a/util/blueprint/exec.go b/util/blueprint/exec.go new file mode 100644 index 0000000..e68161a --- /dev/null +++ b/util/blueprint/exec.go @@ -0,0 +1,414 @@ +package blueprint + +import "fmt" + +type IBaseExecNode interface { + initExecNode(gr *graph, nodeId string, variableName string, nodeName string) error +} + +type IBaseExec interface { + GetName() string + SetExec(exec IExec) + IsInPortExec(index int) bool + IsOutPortExec(index int) bool + GetInPortCount() int + CloneInOutPort() ([]IPort, []IPort) + + GetInPort(index int) IPort + GetOutPort(index int) IPort +} + +type IExec interface { + GetName() string + Exec() error +} + +type IExecData interface { +} + +type BaseExec struct { + Name string + Title string + Package string + Description string + + InPort []IPort + OutPort []IPort + IExec +} + +type InputConfig struct { + Name string `json:"name"` + PortType string `json:"type"` + DataType string `json:"data_type"` + HasInput bool `json:"has_input"` + PinWidget string `json:"pin_widget"` +} + +type OutInputConfig struct { + Name string `json:"name"` + PortType string `json:"type"` + DataType string `json:"data_type"` + HasInput bool `json:"has_input"` +} + +type BaseExecConfig struct { + Name string `json:"name"` + Title string `json:"title"` + Package string `json:"package"` + Description string `json:"description"` + IsPure bool `json:"is_pure"` + Inputs []InputConfig `json:"inputs"` + Outputs []OutInputConfig `json:"outputs"` +} + +func (em *BaseExec) AppendInPort(port ...IPort) { + em.InPort = append(em.InPort, port...) +} + +func (em *BaseExec) AppendOutPort(port ...IPort) { + em.OutPort = append(em.OutPort, port...) +} + +func (em *BaseExec) GetName() string { + return em.Name +} + +func (em *BaseExec) SetExec(exec IExec) { + em.IExec = exec +} + +func (em *BaseExec) CloneInOutPort() ([]IPort, []IPort) { + inPorts := make([]IPort, 0, 2) + for _, port := range em.InPort { + if port.IsPortExec() { + continue + } + + inPorts = append(inPorts, port.Clone()) + } + outPorts := make([]IPort, 0, 2) + + for _, port := range em.OutPort { + if port.IsPortExec() { + continue + } + outPorts = append(outPorts, port.Clone()) + } + + return inPorts, outPorts +} + +func (em *BaseExec) IsInPortExec(index int) bool { + if index >= len(em.InPort) || index < 0 { + return false + } + + return em.InPort[index].IsPortExec() +} +func (em *BaseExec) IsOutPortExec(index int) bool { + if index >= len(em.OutPort) || index < 0 { + return false + } + + return em.OutPort[index].IsPortExec() +} + +func (em *BaseExec) GetInPortCount() int { + return len(em.InPort) +} + +func (em *BaseExec) GetInPort(index int) IPort { + if index >= len(em.InPort) || index < 0 { + return nil + } + return em.InPort[index] +} + +func (em *BaseExec) GetOutPort(index int) IPort { + if index >= len(em.OutPort) || index < 0 { + return nil + } + return em.OutPort[index] +} + +type BaseExecNode struct { + *ExecContext + gr *graph + variableName string + nodeName string +} + +func (en *BaseExecNode) initExecNode(gr *graph, nodeId string, variableName string, nodeName string) error { + ctx, ok := gr.context[nodeId] + if !ok { + return fmt.Errorf("node %s not found", nodeId) + } + en.ExecContext = ctx + en.gr = gr + en.variableName = variableName + en.nodeName = nodeName + return nil +} + +func (en *BaseExecNode) GetInPort(index int) IPort { + if index >= len(en.InputPorts) || index < 0 { + return nil + } + return en.InputPorts[index] +} + +func (en *BaseExecNode) GetOutPort(index int) IPort { + if index >= len(en.OutputPorts) || index < 0 { + return nil + } + return en.OutputPorts[index] +} + +func (en *BaseExecNode) SetOutPort(index int, val IPort) bool { + if index >= len(en.OutputPorts) || index < 0 { + return false + } + en.OutputPorts[index] = val + return true +} + +func (en *BaseExecNode) GetInPortInt(index int) (Port_Int, bool) { + port := en.GetInPort(index) + if port == nil { + return 0, false + } + return port.GetInt() +} + +func (en *BaseExecNode) GetInPortFloat(index int) (Port_Float, bool) { + port := en.GetInPort(index) + if port == nil { + return 0, false + } + return port.GetFloat() +} + +func (en *BaseExecNode) GetInPortStr(index int) (Port_Str, bool) { + port := en.GetInPort(index) + if port == nil { + return "", false + } + return port.GetStr() +} + +func (en *BaseExecNode) GetInPortArrayValInt(index int, idx int) (Port_Int, bool) { + port := en.GetInPort(index) + if port == nil { + return 0, false + } + return port.GetArrayValInt(idx) +} + +func (en *BaseExecNode) GetInPortArrayValStr(idx int) (Port_Str, bool) { + port := en.GetInPort(idx) + if port == nil { + return "", false + } + return port.GetArrayValStr(idx) +} + +func (en *BaseExecNode) GetInPortBool(index int) (Port_Bool, bool) { + port := en.GetInPort(index) + if port == nil { + return false, false + } + return port.GetBool() +} + +func (en *BaseExecNode) GetOutPortInt(index int) (Port_Int, bool) { + port := en.GetOutPort(index) + if port == nil { + return 0, false + } + return port.GetInt() +} + +func (en *BaseExecNode) GetOutPortFloat(index int) (Port_Float, bool) { + port := en.GetOutPort(index) + if port == nil { + return 0, false + } + return port.GetFloat() +} + +func (en *BaseExecNode) GetOutPortStr(index int) (Port_Str, bool) { + port := en.GetOutPort(index) + if port == nil { + return "", false + } + return port.GetStr() +} + +func (en *BaseExecNode) GetOutPortArrayValInt(index int, idx int) (Port_Int, bool) { + port := en.GetOutPort(index) + if port == nil { + return 0, false + } + return port.GetArrayValInt(idx) +} + +func (en *BaseExecNode) GetOutPortArrayValStr(index int, idx int) (Port_Str, bool) { + port := en.GetOutPort(index) + if port == nil { + return "", false + } + return port.GetArrayValStr(idx) +} + +func (en *BaseExecNode) GetOutPortBool(index int) (Port_Bool, bool) { + port := en.GetInPort(index) + if port == nil { + return false, false + } + return port.GetBool() +} + +func (en *BaseExecNode) SetInPortInt(index int, val Port_Int) bool { + port := en.GetInPort(index) + if port == nil { + return false + } + return port.SetInt(val) +} + +func (en *BaseExecNode) SetInPortFloat(index int, val Port_Float) bool { + port := en.GetInPort(index) + if port == nil { + return false + } + return port.SetFloat(val) +} + +func (en *BaseExecNode) SetInPortStr(index int, val Port_Str) bool { + port := en.GetInPort(index) + if port == nil { + return false + } + return port.SetStr(val) +} + +func (en *BaseExecNode) SetInBool(index int, val Port_Bool) bool { + port := en.GetInPort(index) + if port == nil { + return false + } + return port.SetBool(val) +} + +func (en *BaseExecNode) SetInPortArrayValInt(index int, idx int, val Port_Int) bool { + port := en.GetInPort(index) + if port == nil { + return false + } + return port.SetArrayValInt(idx, val) +} + +func (en *BaseExecNode) SetInPortArrayValStr(index int, idx int, val Port_Str) bool { + port := en.GetInPort(index) + if port == nil { + return false + } + return port.SetArrayValStr(idx, val) +} + +func (en *BaseExecNode) AppendInPortArrayValInt(index int, val Port_Int) bool { + port := en.GetInPort(index) + if port == nil { + return false + } + return port.AppendArrayValInt(val) +} + +func (en *BaseExecNode) AppendInPortArrayValStr(index int, val Port_Str) bool { + port := en.GetInPort(index) + if port == nil { + return false + } + return port.AppendArrayValStr(val) +} + +func (en *BaseExecNode) GetInPortArrayLen(index int) int { + port := en.GetInPort(index) + if port == nil { + return 0 + } + return port.GetArrayLen() +} + +func (en *BaseExecNode) SetOutPortInt(index int, val Port_Int) bool { + port := en.GetOutPort(index) + if port == nil { + return false + } + return port.SetInt(val) +} + +func (en *BaseExecNode) SetOutPortFloat(index int, val Port_Float) bool { + port := en.GetOutPort(index) + if port == nil { + return false + } + return port.SetFloat(val) +} + +func (en *BaseExecNode) SetOutPortStr(index int, val Port_Str) bool { + port := en.GetOutPort(index) + if port == nil { + return false + } + return port.SetStr(val) +} + +func (en *BaseExecNode) SetOutPortBool(index int, val Port_Bool) bool { + port := en.GetOutPort(index) + if port == nil { + return false + } + return port.SetBool(val) +} + +func (en *BaseExecNode) SetOutPortArrayValInt(index int, idx int, val Port_Int) bool { + port := en.GetOutPort(index) + if port == nil { + return false + } + return port.SetArrayValInt(idx, val) +} + +func (en *BaseExecNode) SetOutPortArrayValStr(index int, idx int, val Port_Str) bool { + port := en.GetOutPort(index) + if port == nil { + return false + } + return port.SetArrayValStr(idx, val) +} + +func (en *BaseExecNode) AppendOutPortArrayValInt(index int, val Port_Int) bool { + port := en.GetOutPort(index) + if port == nil { + return false + } + return port.AppendArrayValInt(val) +} + +func (en *BaseExecNode) AppendOutPortArrayValStr(index int, val Port_Str) bool { + port := en.GetOutPort(index) + if port == nil { + return false + } + return port.AppendArrayValStr(val) +} + +func (en *BaseExecNode) GetOutPortArrayLen(index int) int { + port := en.GetOutPort(index) + if port == nil { + return 0 + } + return port.GetArrayLen() +} diff --git a/util/blueprint/execpool.go b/util/blueprint/execpool.go new file mode 100644 index 0000000..d86d9cf --- /dev/null +++ b/util/blueprint/execpool.go @@ -0,0 +1,292 @@ +package blueprint + +import ( + "encoding/json" + "fmt" + "os" + "path/filepath" + "strings" +) + +type ExecPool struct { + baseExecMap map[string]IBaseExec + execMap map[string]IBaseExec +} + +func (em *ExecPool) Load(execDefFilePath string) error { + em.baseExecMap = make(map[string]IBaseExec, 512) + + // 检查路径是否存在 + stat, err := os.Stat(execDefFilePath) + if err != nil { + return fmt.Errorf("failed to access path %s: %v", execDefFilePath, err) + } + + // 如果是单个文件,直接处理 + if !stat.IsDir() { + return fmt.Errorf("%s is not a directory", execDefFilePath) + } + + // 遍历目录及其子目录 + err = filepath.Walk(execDefFilePath, func(path string, info os.FileInfo, err error) error { + if err != nil { + fmt.Printf("访问路径出错 %s: %v\n", path, err) + return nil // 继续遍历其他文件 + } + + // 如果是目录,继续遍历 + if info.IsDir() { + return nil + } + + // 只处理JSON文件 + if filepath.Ext(path) == ".json" { + return em.processJSONFile(path) + } + + return nil + }) + if err != nil { + return fmt.Errorf("failed to walk path %s: %v", execDefFilePath, err) + } + + return em.loadSysExec() +} + +// 处理单个JSON文件 +func (em *ExecPool) processJSONFile(filePath string) error { + // 打开文件 + file, err := os.Open(filePath) + if err != nil { + return fmt.Errorf("failed to open file %s: %v", filePath, err) + } + + defer func(file *os.File) { + err = file.Close() + if err != nil { + fmt.Printf("failed to close file %s: %v\n", filePath, err) + return + } + }(file) + + var baseExecConfig BaseExecConfig + decoder := json.NewDecoder(file) + if err := decoder.Decode(&baseExecConfig); err != nil { + return fmt.Errorf("failed to decode JSON from file %s: %v", filePath, err) + } + + exec, err := em.createExecFromJSON(baseExecConfig) + if err != nil { + return err + } + + if !em.loadBaseExec(exec) { + return fmt.Errorf("exec %s already registered", exec.GetName()) + } + + return nil +} + +func (em *ExecPool) createPortByDataType(nodeName, portName, dataType string) (IPort, error) { + switch strings.ToLower(dataType) { + case Config_DataType_Int, Config_DataType_Integer: + return NewPortInt(), nil + case Config_DataType_Float: + return NewPortFloat(), nil + case Config_DataType_Str: + return NewPortStr(), nil + case Config_DataType_Boolean, Config_DataType_Bool: + return NewPortBool(), nil + case Config_DataType_Array: + return NewPortArray(), nil + } + + return nil, fmt.Errorf("invalid data type %s,node %s port %s", dataType, nodeName, portName) +} + +func (em *ExecPool) createExecFromJSON(baseExecConfig BaseExecConfig) (IBaseExec, error) { + var baseExec BaseExec + baseExec.Name = baseExecConfig.Name + baseExec.Title = baseExecConfig.Title + baseExec.Package = baseExecConfig.Package + baseExec.Description = baseExecConfig.Description + + // exec数量 + inExecNum := 0 + for index, input := range baseExecConfig.Inputs { + portType := strings.ToLower(input.DataType) + if portType != Config_PortType_Exec && portType != Config_PortType_Data { + return nil, fmt.Errorf("input %s data type %s not support", input.Name, input.DataType) + } + + if portType == Config_PortType_Exec { + if inExecNum > 0 { + return nil, fmt.Errorf("inPort only allows one Execute,node name %s", baseExec.Name) + } + if index > 0 { + return nil, fmt.Errorf("the exec port is only allowed to be placed on the first one,node name %s", baseExec.Name) + } + + inExecNum++ + baseExec.AppendInPort(NewPortExec()) + continue + } + + port, err := em.createPortByDataType(baseExec.Name, input.Name, input.DataType) + if err != nil { + return nil, err + } + + baseExec.AppendInPort(port) + } + + hasData := false + for _, output := range baseExecConfig.Outputs { + portType := strings.ToLower(output.DataType) + if portType != Config_PortType_Exec && portType != Config_PortType_Data { + return nil, fmt.Errorf("output %s data type %s not support,node name %s", output.Name, output.DataType, baseExec.Name) + } + + // Exec出口只能先Exec,再Data,不能穿插,如果是Data类型,但遇到Exec入口则不允许 + if hasData && portType == Config_PortType_Exec { + return nil, fmt.Errorf("the exec port can only be placed at the front,node name %s", baseExec.Name) + } + + if portType == Config_PortType_Exec { + baseExec.AppendInPort(NewPortExec()) + continue + } + hasData = true + port, err := em.createPortByDataType(baseExec.Name, output.Name, output.DataType) + if err != nil { + return nil, err + } + + baseExec.AppendOutPort(port) + } + return &baseExec, nil +} + +func (em *ExecPool) loadBaseExec(exec IBaseExec) bool { + if _, ok := em.baseExecMap[exec.GetName()]; ok { + return false + } + em.baseExecMap[exec.GetName()] = exec + return true +} + +func (em *ExecPool) Register(exec IExec) bool { + baseExec, ok := exec.(IBaseExec) + if !ok { + return false + } + + if _, ok := em.execMap[baseExec.GetName()]; ok { + return false + } + + baseExec.SetExec(exec) + em.execMap[baseExec.GetName()] = baseExec + return true +} + +func (em *ExecPool) GetExec(name string) IBaseExec { + if exec, ok := em.execMap[name]; ok { + return exec + } + return nil +} + +func (em *ExecPool) loadSysExec() error { + var err error + if err = em.regGetVariables(Config_DataType_Int); err != nil { + return err + } + if err = em.regGetVariables(Config_DataType_Integer); err != nil { + return err + } + if err = em.regGetVariables(Config_DataType_Float); err != nil { + return err + } + if err = em.regGetVariables(Config_DataType_Str); err != nil { + return err + } + if err = em.regGetVariables(Config_DataType_Boolean); err != nil { + return err + } + if err = em.regGetVariables(Config_DataType_Bool); err != nil { + return err + } + if err = em.regGetVariables(Config_DataType_Array); err != nil { + return err + } + if err = em.regSetVariables(Config_DataType_Int); err != nil { + return err + } + if err = em.regSetVariables(Config_DataType_Integer); err != nil { + return err + } + if err = em.regSetVariables(Config_DataType_Float); err != nil { + return err + } + if err = em.regSetVariables(Config_DataType_Str); err != nil { + return err + } + if err = em.regSetVariables(Config_DataType_Boolean); err != nil { + return err + } + if err = em.regSetVariables(Config_DataType_Bool); err != nil { + return err + } + if err = em.regSetVariables(Config_DataType_Array); err != nil { + return err + } + return nil +} + +func (em *ExecPool) regGetVariables(typ string) error { + var baseExec BaseExec + baseExec.Name = genGetVariablesNodeName(typ) + + outPort := NewPortByType(typ) + baseExec.AppendOutPort(outPort) + baseExec.IExec = &GetVariablesNode{nodeName: baseExec.GetName()} + + if !em.loadBaseExec(&baseExec) { + return fmt.Errorf("exec %s already registered", baseExec.GetName()) + } + if !em.Register(baseExec.IExec) { + return fmt.Errorf("exec %s already registered", baseExec.GetName()) + } + + return nil +} + +func genSetVariablesNodeName(typ string) string { + return fmt.Sprintf("%s_%s", SetVariables, typ) +} + +func genGetVariablesNodeName(typ string) string { + return fmt.Sprintf("%s_%s", GetVariables, typ) +} + +func (em *ExecPool) regSetVariables(typ string) error { + var baseExec BaseExec + baseExec.Name = genSetVariablesNodeName(typ) + + inPort := NewPortByType(typ) + outPort := NewPortByType(typ) + + baseExec.AppendInPort(inPort) + baseExec.AppendOutPort(outPort) + + baseExec.IExec = &SetVariablesNode{nodeName: baseExec.GetName()} + if !em.loadBaseExec(&baseExec) { + return fmt.Errorf("exec %s already registered", baseExec.GetName()) + } + if !em.Register(baseExec.IExec) { + return fmt.Errorf("exec %s already registered", baseExec.GetName()) + } + + return nil +} diff --git a/util/blueprint/graph.go b/util/blueprint/graph.go new file mode 100644 index 0000000..c785ef4 --- /dev/null +++ b/util/blueprint/graph.go @@ -0,0 +1,101 @@ +package blueprint + +import "fmt" + +type IGraph interface { + Do(entranceID int64) error +} + +type graph struct { + context map[string]*ExecContext // 上下文 + entrance map[int64]*execNode // 入口 + variables map[string]IPort // 变量 + globalVariables map[string]IPort // 全局变量 +} + +type nodeConfig struct { + Id string `json:"id"` + Class string `json:"class"` + Module string `json:"module"` + Pos []float64 `json:"pos"` + PortDefault map[string]interface{} `json:"port_defaultv"` +} + +type edgeConfig struct { + EdgeID string `json:"edge_id"` + SourceNodeID string `json:"source_node_id"` + DesNodeId string `json:"des_node_id"` + + SourcePortIndex int `json:"source_port_index"` + DesPortIndex int `json:"des_port_index"` +} + +type variablesConfig struct { + Name string `json:"name"` + Type string `json:"type"` + Value string `json:"value"` +} + +type graphConfig struct { + GraphName string `json:"graph_name"` + Time string `json:"time"` + + Nodes []nodeConfig `json:"nodes"` + Edges []edgeConfig `json:"edges"` + Variables []variablesConfig `json:"variables"` +} + +func (gc *graphConfig) GetVariablesByName(varName string) *variablesConfig { + for _, varCfg := range gc.Variables { + if varCfg.Name == varName { + return &varCfg + } + } + + return nil +} + +func (gc *graphConfig) GetNodeByID(nodeID string) *nodeConfig { + for _, node := range gc.Nodes { + if node.Id == nodeID { + return &node + } + } + + return nil +} + +func (gr *graph) Do(entranceID int64) error { + entranceNode := gr.entrance[entranceID] + if entranceNode == nil { + return fmt.Errorf("entranceID:%d not found", entranceID) + } + + gr.variables = map[string]IPort{} + if gr.globalVariables == nil { + gr.globalVariables = map[string]IPort{} + } + + return entranceNode.Do(gr) +} + +func (gr *graph) GetNodeInPortValue(nodeID string, inPortIndex int) IPort { + if ctx, ok := gr.context[nodeID]; ok { + if inPortIndex >= len(ctx.InputPorts) || inPortIndex < 0 { + return nil + } + + return ctx.InputPorts[inPortIndex] + } + return nil +} + +func (gr *graph) GetNodeOutPortValue(nodeID string, outPortIndex int) IPort { + if ctx, ok := gr.context[nodeID]; ok { + if outPortIndex >= len(ctx.OutputPorts) || outPortIndex < 0 { + return nil + } + return ctx.OutputPorts[outPortIndex] + } + return nil +} diff --git a/util/blueprint/graphpool.go b/util/blueprint/graphpool.go new file mode 100644 index 0000000..1c3d95e --- /dev/null +++ b/util/blueprint/graphpool.go @@ -0,0 +1,278 @@ +package blueprint + +import ( + "fmt" + "github.com/goccy/go-json" + "os" + "path/filepath" + "strconv" + "strings" +) + +// 格式说明Entrance_ID +const ( + Entrance = "Entrance_" +) + +type GraphPool struct { + mapGraphs map[string]graph + execPool *ExecPool +} + +func (gp *GraphPool) Load(graphFilePath string) error { + // 检查路径是否存在 + stat, err := os.Stat(graphFilePath) + if err != nil { + return fmt.Errorf("failed to access path %s: %v", graphFilePath, err) + } + + // 如果是单个文件,直接处理 + if !stat.IsDir() { + return fmt.Errorf("%s is not a directory", graphFilePath) + } + + // 遍历目录及其子目录 + return filepath.Walk(graphFilePath, func(path string, info os.FileInfo, err error) error { + if err != nil { + fmt.Printf("访问路径出错 %s: %v\n", path, err) + return nil // 继续遍历其他文件 + } + + // 如果是目录,继续遍历 + if info.IsDir() { + return nil + } + + // 只处理JSON文件 + if filepath.Ext(path) == ".vgf" { + return gp.processJSONFile(path) + } + + return nil + }) +} + +func (gp *GraphPool) Create(graphName string) IGraph { + gr, ok := gp.mapGraphs[graphName] + if !ok { + return nil + } + + return &gr +} + +func (gp *GraphPool) processJSONFile(filePath string) error { + // 打开文件 + file, err := os.Open(filePath) + if err != nil { + return fmt.Errorf("failed to open file %s: %v", filePath, err) + } + defer file.Close() + + fileName := filepath.Base(filePath) + ext := filepath.Ext(fileName) // 获取".html" + name := strings.TrimSuffix(fileName, ext) // 获取"name" + var gConfig graphConfig + decoder := json.NewDecoder(file) + if err := decoder.Decode(&gConfig); err != nil { + return fmt.Errorf("failed to decode JSON from file %s: %v", filePath, err) + } + + return gp.prepareGraph(name, &gConfig) +} + +func (gp *GraphPool) prepareGraph(graphName string, graphConfig *graphConfig) error { + // 找到所有的入口 + for _, node := range graphConfig.Nodes { + if strings.HasPrefix(node.Class, Entrance) { + // 取得ID + id := strings.TrimPrefix(node.Class, Entrance) + entranceID, err := strconv.Atoi(id) + if err != nil { + return err + } + // 对入口进行预处理 + err = gp.prepareOneEntrance(graphName, int64(entranceID), &node, graphConfig) + if err != nil { + return err + } + } + } + + return nil +} + +func (gp *GraphPool) getVarExec(nodeCfg *nodeConfig, graphConfig *graphConfig) (IBaseExec, string) { + // 是否为Get_或Set_开头 + if strings.HasPrefix(nodeCfg.Class, "Get_") || strings.HasPrefix(nodeCfg.Class, "Set_") { + return gp.execPool.GetExec(nodeCfg.Class), "" + } + + // 获取Get_或Set_结尾字符串 + var nodeName string + var varName string + if strings.HasSuffix(nodeCfg.Class, "Get_") { + var typ string + varName = strings.TrimSuffix(nodeCfg.Class, "Get_") + varCfg := graphConfig.GetVariablesByName(varName) + if varCfg != nil { + typ = varCfg.Type + } + nodeName = genGetVariablesNodeName(typ) + } else if strings.HasSuffix(nodeCfg.Class, "Set_") { + var typ string + varName = strings.TrimSuffix(nodeCfg.Class, "Set_") + varCfg := graphConfig.GetVariablesByName(varName) + if varCfg != nil { + typ = varCfg.Type + } + nodeName = genSetVariablesNodeName(typ) + } + + return gp.execPool.GetExec(nodeName), varName +} + +func (gp *GraphPool) genAllNode(graphConfig *graphConfig) (map[string]*execNode, error) { + nodes := make(map[string]*execNode) + for _, node := range graphConfig.Nodes { + var varName string + // 获取不到node,则获取变量node + exec := gp.execPool.GetExec(node.Class) + if exec == nil { + exec, varName = gp.getVarExec(&node, graphConfig) + if exec == nil { + return nil, fmt.Errorf("no exec found for node %s", node.Class) + } + } + + nodes[node.Id] = &execNode{ + Id: node.Id, + baseExec: exec, + preInPort: make([]*prePortNode, exec.GetInPortCount()), + inPortDefaultValue: node.PortDefault, + variableName: varName, + } + } + + return nodes, nil +} + +func (gp *GraphPool) prepareOneNode(mapNodeExec map[string]*execNode, nodeExec *execNode, graphConfig *graphConfig) error { + // 找到所有出口 + var idx int + for ; nodeExec.baseExec.IsOutPortExec(idx); idx++ { + // 找到出口结点 + nextExecNode := gp.findOutNextNode(graphConfig, mapNodeExec, nodeExec.Id, idx) + nodeExec.nextNode = append(nodeExec.nextNode, nextExecNode) + } + + // 将所有的next填充next + for _, nextOne := range nodeExec.nextNode { + // 对出口进行预处理 + err := gp.prepareOneNode(mapNodeExec, nextOne, graphConfig) + if err != nil { + return err + } + } + + return nil +} + +func (gp *GraphPool) findOutNextNode(graphConfig *graphConfig, mapNodeExec map[string]*execNode, sourceNodeID string, sourcePortIdx int) *execNode { + // 找到出口的NodeID + for _, edge := range graphConfig.Edges { + if edge.SourceNodeID == sourceNodeID && edge.SourcePortIndex == sourcePortIdx { + return mapNodeExec[edge.DesNodeId] + } + } + + return nil +} + +// prepareOneEntrance 先处理执行Exec入出口连线 +func (gp *GraphPool) prepareOneEntrance(graphName string, entranceID int64, nodeCfg *nodeConfig, graphConfig *graphConfig) error { + // 将所有的Node执行结点生成出来 + mapNodes, err := gp.genAllNode(graphConfig) + if err != nil { + return err + } + + // 从入口结点开始做预处理,将next结点都统一生成 + nodeExec := mapNodes[nodeCfg.Id] + if nodeExec == nil { + return fmt.Errorf("entrance node %s not found", nodeCfg.Id) + } + + err = gp.prepareOneNode(mapNodes, nodeExec, graphConfig) + if err != nil { + return err + } + + // 处理inPort前置结点 + err = gp.prepareInPort(mapNodes, nodeExec, graphConfig) + if err != nil { + return err + } + + var gr graph + gr.entrance = make(map[int64]*execNode, 16) + gr.context = make(map[string]*ExecContext, 16) + + gr.entrance[entranceID] = nodeExec + gp.mapGraphs[graphName] = gr + + return nil +} + +func (gp *GraphPool) findPreInPortNode(mapNodes map[string]*execNode, nodeExec *execNode, graphConfig *graphConfig, portIdx int) *prePortNode { + for _, edge := range graphConfig.Edges { + if edge.DesNodeId == nodeExec.Id && edge.DesPortIndex == portIdx { + srcNode := mapNodes[edge.SourceNodeID] + if srcNode == nil { + return nil + } + + var preNode prePortNode + preNode.node = srcNode + preNode.outPortIndex = edge.SourcePortIndex + + return &preNode + } + } + + return nil +} + +func (gp *GraphPool) preparePreInPortNode(mapNodes map[string]*execNode, nodeExec *execNode, graphConfig *graphConfig) error { + // 找到当前结点的所有inPort的前一个端口 + for i := 0; i < nodeExec.baseExec.GetInPortCount(); i++ { + // 如果是执行结点,则跳过 + if nodeExec.baseExec.IsInPortExec(i) { + continue + } + + // 找到入口的上一个结点 + preNode := gp.findPreInPortNode(mapNodes, nodeExec, graphConfig, i) + if preNode == nil { + continue + } + nodeExec.preInPort[i] = preNode + } + return nil +} + +func (gp *GraphPool) prepareInPort(mapNodeExec map[string]*execNode, nodeExec *execNode, graphConfig *graphConfig) error { + for _, nextNode := range nodeExec.nextNode { + if nextNode == nil { + continue + } + + // 对nextNode结点的入口进行预处理 + err := gp.preparePreInPortNode(mapNodeExec, nextNode, graphConfig) + if err != nil { + return err + } + } + + return nil +} diff --git a/util/blueprint/node.go b/util/blueprint/node.go new file mode 100644 index 0000000..d88f25a --- /dev/null +++ b/util/blueprint/node.go @@ -0,0 +1,121 @@ +package blueprint + +import "fmt" + +type prePortNode struct { + node *execNode // 上个结点 + outPortIndex int // 对应上一个结点的OutPort索引 +} + +type execNode struct { + Id string + baseExec IBaseExec + + nextNode []*execNode + nextIdx int + + preInPort []*prePortNode // Port的上一个结点 + inPortDefaultValue map[string]any + + variableName string // 如果是变量,则有变量名 +} + +func (en *execNode) GetInPortDefaultValue(index int) any { + key := fmt.Sprintf("%d", index) + v, ok := en.inPortDefaultValue[key] + if !ok { + return nil + } + return v +} + +func (en *execNode) Next() *execNode { + if en.nextIdx >= len(en.nextNode) { + return nil + } + + return en.nextNode[en.nextIdx] +} + +func (en *execNode) exec(gr *graph) error { + e, ok := en.baseExec.(IExec) + if !ok { + return fmt.Errorf("exec node %s not exec", en.baseExec.GetName()) + } + + node, ok := en.baseExec.(IBaseExecNode) + if !ok { + return fmt.Errorf("exec node %s not exec", en.baseExec.GetName()) + } + + if err := node.initExecNode(gr, en.Id, en.variableName, en.baseExec.GetName()); err != nil { + return err + } + + return e.Exec() +} + +func (en *execNode) doSetInPort(gr *graph, index int, inPort IPort) error { + // 找到当前Node的InPort的index的前一个结点 + preNode := en.preInPort[index] + // 如果前一个结点为空,则填充默认值 + if preNode == nil { + err := inPort.setAnyVale(en.GetInPortDefaultValue(index)) + if err != nil { + return err + } + return nil + } + + // 判断上一个结点是否已经执行过 + if _, ok := gr.context[preNode.node.Id]; ok { + outPort := gr.GetNodeOutPortValue(preNode.node.Id, preNode.outPortIndex) + if outPort == nil { + return fmt.Errorf("pre node %s out port index %d not found", preNode.node.Id, preNode.outPortIndex) + } + + inPort.SetValue(outPort) + return nil + } + + // 如果前一个结点没有执行过,则递归执行前一个结点 + return preNode.node.Do(gr) +} + +func (en *execNode) Do(gr *graph) error { + // 重新初始化上下文 + inPorts, outPorts := en.baseExec.CloneInOutPort() + gr.context[en.Id] = &ExecContext{ + InputPorts: inPorts, + OutputPorts: outPorts, + } + + // 处理InPort结点值 + var err error + for index := range inPorts { + if en.baseExec.IsInPortExec(index) { + continue + } + + err = en.doSetInPort(gr, index, inPorts[index]) + if err != nil { + return err + } + } + + // 设置执行器相关的上下文信息 + // 如果是变量设置变量名 + // 执行本结点 + if err = en.exec(gr); err != nil { + return err + } + + for _, nextNode := range en.nextNode { + err = nextNode.Do(gr) + if err != nil { + return err + } + } + + return nil +} diff --git a/util/blueprint/port.go b/util/blueprint/port.go new file mode 100644 index 0000000..2e75af6 --- /dev/null +++ b/util/blueprint/port.go @@ -0,0 +1,294 @@ +package blueprint + +import ( + "fmt" + "strconv" +) + +const ( + Config_PortType_Exec = "exec" + Config_PortType_Data = "data" + Config_DataType_Int = "int" + Config_DataType_Integer = "integer" + Config_DataType_Float = "float" + Config_DataType_Str = "string" + Config_DataType_Boolean = "boolean" + Config_DataType_Bool = "bool" + Config_DataType_Array = "array" +) + +type Port[T iPortType] struct { + PortVal T +} + +func (em *Port[T]) Clone() IPort { + return &Port[T]{ + PortVal: em.PortVal, + } +} + +func (em *Port[T]) Reset() { + var v T + em.PortVal = v +} + +func (em *Port[T]) GetInt() (Port_Int, bool) { + if t, ok := any(em.PortVal).(Port_Int); ok { + return t, true + } + return 0, false +} + +func (em *Port[T]) GetFloat() (Port_Float, bool) { + if t, ok := any(em.PortVal).(Port_Float); ok { + return t, true + } + return 0, false +} + +func (em *Port[T]) GetStr() (Port_Str, bool) { + if t, ok := any(em.PortVal).(Port_Str); ok { + return t, true + } + return "", false +} + +func (em *Port[T]) GetArrayValInt(idx int) (Port_Int, bool) { + if t, ok := any(em.PortVal).(Port_Array); ok { + if idx >= 0 && idx < len(t) { + return t[idx].IntVal, true + } + } + + return 0, false +} + +func (em *Port[T]) GetArrayValStr(idx int) (string, bool) { + if t, ok := any(em.PortVal).(Port_Array); ok { + if idx >= 0 && idx < len(t) { + return t[idx].StrVal, true + } + } + + return "", false +} + +func (em *Port[T]) GetBool() (Port_Bool, bool) { + if t, ok := any(em.PortVal).(Port_Bool); ok { + return t, true + } + return false, false +} + +func (em *Port[T]) SetInt(val Port_Int) bool { + if t, ok := any(&em.PortVal).(*Port_Int); ok { + *t = val + return true + } + return false +} + +func (em *Port[T]) SetFloat(val Port_Float) bool { + if t, ok := any(&em.PortVal).(*Port_Float); ok { + *t = val + return true + } + return false +} + +func (em *Port[T]) SetStr(val Port_Str) bool { + if t, ok := any(&em.PortVal).(*Port_Str); ok { + *t = val + return true + } + return false +} + +func (em *Port[T]) SetBool(val Port_Bool) bool { + if t, ok := any(&em.PortVal).(*Port_Bool); ok { + *t = val + return true + } + return false +} + +func (em *Port[T]) SetArrayValInt(idx int, val Port_Int) bool { + if t, ok := any(em.PortVal).(Port_Array); ok { + if idx >= 0 && idx < len(t) { + t[idx].IntVal = val + return true + } + } + return false +} + +func (em *Port[T]) SetArrayValStr(idx int, val Port_Str) bool { + if t, ok := any(em.PortVal).(Port_Array); ok { + if idx >= 0 && idx < len(t) { + (t)[idx].StrVal = val + return true + } + } + return false +} + +func (em *Port[T]) AppendArrayValInt(val Port_Int) bool { + if t, ok := any(&em.PortVal).(*Port_Array); ok { + *t = append(*t, ArrayData{IntVal: val}) + return true + } + return false +} + +func (em *Port[T]) AppendArrayValStr(val Port_Str) bool { + if t, ok := any(&em.PortVal).(*Port_Array); ok { + *t = append(*t, ArrayData{StrVal: val}) + return true + } + return false +} + +func (em *Port[T]) GetArrayLen() int { + if t, ok := any(&em.PortVal).(*Port_Array); ok { + return len(*t) + } + + return 0 +} + +func (em *Port[T]) IsPortExec() bool { + _, ok := any(em.PortVal).(Port_Exec) + return ok +} + +func (em *Port[T]) setAnyVale(v any) error { + switch v.(type) { + case float64: + fV := v.(float64) + switch any(em.PortVal).(type) { + case Port_Int: + em.SetInt(Port_Int(fV)) + case Port_Float: + em.SetFloat(fV) + case Port_Str: + em.SetStr(fmt.Sprintf("%d", int64(fV))) + case Port_Bool: + em.SetBool(int64(fV) != 0) + } + case string: + strV := v.(string) + switch any(em.PortVal).(type) { + case Port_Int: + val, err := strconv.Atoi(strV) + if err != nil { + return err + } + em.SetInt(Port_Int(val)) + case Port_Float: + fV, err := strconv.ParseFloat(strV, 64) + if err != nil { + return err + } + em.SetFloat(fV) + case Port_Str: + em.SetStr(strV) + case Port_Bool: + val, err := strconv.ParseBool(strV) + if err != nil { + return err + } + em.SetBool(val) + } + case bool: + strV := v.(bool) + switch any(em.PortVal).(type) { + case Port_Int: + return fmt.Errorf("port type is int, but value is %v", strV) + case Port_Float: + return fmt.Errorf("port type is float, but value is %v", strV) + case Port_Str: + return fmt.Errorf("port type is string, but value is %v", strV) + case Port_Bool: + em.SetBool(strV) + } + } + return nil +} + +func (em *Port[T]) SetValue(val IPort) bool { + valT, ok := val.(*Port[T]) + if !ok { + return false + } + em.PortVal = valT.PortVal + return true +} + +type IPort interface { + GetInt() (Port_Int, bool) + GetFloat() (Port_Float, bool) + GetStr() (Port_Str, bool) + GetArrayValInt(idx int) (Port_Int, bool) + GetArrayValStr(idx int) (Port_Str, bool) + GetBool() (Port_Bool, bool) + + SetInt(val Port_Int) bool + SetFloat(val Port_Float) bool + SetStr(val Port_Str) bool + SetBool(val Port_Bool) bool + SetArrayValInt(idx int, val Port_Int) bool + SetArrayValStr(idx int, val Port_Str) bool + AppendArrayValInt(val Port_Int) bool + AppendArrayValStr(val Port_Str) bool + GetArrayLen() int + Clone() IPort + Reset() + + IsPortExec() bool + + setAnyVale(v any) error + SetValue(val IPort) bool +} + +func NewPortExec() IPort { + return &Port[Port_Exec]{} +} + +func NewPortInt() IPort { + return &Port[Port_Int]{} +} + +func NewPortFloat() IPort { + return &Port[Port_Float]{} +} + +func NewPortStr() IPort { + return &Port[Port_Str]{} +} + +func NewPortBool() IPort { + return &Port[Port_Bool]{} +} + +func NewPortArray() IPort { + return &Port[Port_Array]{} +} + +func NewPortByType(typ string) IPort { + switch typ { + case Config_PortType_Exec: + return NewPortExec() + case Config_DataType_Int: + return NewPortInt() + case Config_DataType_Float: + return NewPortFloat() + case Config_DataType_Str: + return NewPortStr() + case Config_DataType_Bool: + return NewPortBool() + case Config_DataType_Array: + return NewPortArray() + default: + return nil + } +} diff --git a/util/blueprint/typedef.go b/util/blueprint/typedef.go new file mode 100644 index 0000000..e58dd99 --- /dev/null +++ b/util/blueprint/typedef.go @@ -0,0 +1,25 @@ +package blueprint + +type ArrayElement struct { + IntVal int64 + StrVal string +} + +type PortExec struct { +} +type ArrayData struct { + IntVal int64 + StrVal string +} + +type Port_Exec = PortExec +type Port_Int = int64 +type Port_Float = float64 +type Port_Str = string +type Port_Bool = bool + +type Port_Array []ArrayData + +type iPortType interface { + Port_Exec | Port_Int | Port_Float | Port_Str | Port_Bool | Port_Array +} diff --git a/util/blueprint/variables.go b/util/blueprint/variables.go new file mode 100644 index 0000000..c3baa2d --- /dev/null +++ b/util/blueprint/variables.go @@ -0,0 +1,53 @@ +package blueprint + +import "fmt" + +const GetVariables = "GetVar" +const SetVariables = "SetVar" + +type GetVariablesNode struct { + BaseExecNode + nodeName string + varName string +} + +type SetVariablesNode struct { + BaseExecNode + nodeName string + varName string +} + +func (g *GetVariablesNode) GetName() string { + return g.nodeName +} + +func (g *GetVariablesNode) Exec() error { + port := g.gr.variables[g.varName] + if port == nil { + return fmt.Errorf("variable %s not found,node name %s", g.varName, g.nodeName) + } + + if !g.SetOutPort(0, port) { + return fmt.Errorf("set out port failed,node name %s", g.nodeName) + } + + return nil +} + +func (g *SetVariablesNode) GetName() string { + return g.nodeName +} + +func (g *SetVariablesNode) Exec() error { + port := g.GetInPort(0) + if port == nil { + return fmt.Errorf("get in port failed,node name %s", g.nodeName) + } + + g.gr.variables[g.varName] = port + if !g.SetOutPort(0, port) { + return fmt.Errorf("set out port failed,node name %s", g.nodeName) + } + + return nil +}