diff --git a/sysmodule/blueprintmodule/BlueprintModule.go b/sysmodule/blueprintmodule/BlueprintModule.go new file mode 100644 index 0000000..ea198d3 --- /dev/null +++ b/sysmodule/blueprintmodule/BlueprintModule.go @@ -0,0 +1,74 @@ +package blueprintmodule + +import ( + "fmt" + "github.com/duanhf2012/origin/v2/service" + "github.com/duanhf2012/origin/v2/util/blueprint" + "sync/atomic" +) + +type BlueprintModule struct { + service.Module + + bp blueprint.Blueprint + + execDefFilePath string + graphFilePath string + + seedGraphID int64 + + mapGraph map[int64]blueprint.IGraph +} + +func (m *BlueprintModule) Init(execDefFilePath string, graphFilePath string) error { + m.execDefFilePath = execDefFilePath + m.graphFilePath = graphFilePath + + m.mapGraph = make(map[int64]blueprint.IGraph, 1024) + return nil +} + +func (m *BlueprintModule) OnInit() error { + if m.execDefFilePath == "" || m.graphFilePath == "" { + return fmt.Errorf("execDefFilePath or graphFilePath is empty") + } + + m.seedGraphID = 1 + return m.bp.Init(m.execDefFilePath, m.graphFilePath, m) +} + +func (m *BlueprintModule) CreateGraph(graphName string) int64 { + graphID := atomic.AddInt64(&m.seedGraphID, 1) + graph := m.bp.Create(graphName, graphID) + if graph == nil { + return 0 + } + m.mapGraph[graphID] = graph + + return graphID +} + +func (m *BlueprintModule) GetGraph(graphID int64) (blueprint.IGraph, error) { + graph, ok := m.mapGraph[graphID] + if !ok { + return nil, fmt.Errorf("graph not found,graphID:%d", graphID) + } + return graph, nil +} + +func (m *BlueprintModule) Do(graphID int64, entranceID int64, args ...any) error { + graph, err := m.GetGraph(graphID) + if err != nil { + return err + } + return graph.Do(entranceID, args...) +} + +func (m *BlueprintModule) TriggerEvent(graphID int64, eventID int64, args ...any) error { + graph, err := m.GetGraph(graphID) + if err != nil { + return err + } + + return graph.Do(eventID, args...) +} diff --git a/sysmodule/mysqlmodule/timernode.go b/sysmodule/mysqlmodule/timernode.go new file mode 100644 index 0000000..d6332c8 --- /dev/null +++ b/sysmodule/mysqlmodule/timernode.go @@ -0,0 +1 @@ +package mysqlmodule diff --git a/util/blueprint/blueprint.go b/util/blueprint/blueprint.go index c566360..451473b 100644 --- a/util/blueprint/blueprint.go +++ b/util/blueprint/blueprint.go @@ -7,28 +7,31 @@ import ( type Blueprint struct { execPool ExecPool graphPool GraphPool + + blueprintModule IBlueprintModule } -func (bm *Blueprint) Init(execDefFilePath string, graphFilePath string) error { +func (bm *Blueprint) Init(execDefFilePath string, graphFilePath string, blueprintModule IBlueprintModule) error { err := bm.execPool.Load(execDefFilePath) if err != nil { return err } - + for _, e := range execNodes { if !bm.execPool.Register(e) { return fmt.Errorf("register exec failed,exec:%s", e.GetName()) } } - err = bm.graphPool.Load(&bm.execPool, graphFilePath) + err = bm.graphPool.Load(&bm.execPool, graphFilePath, blueprintModule) if err != nil { return err } + bm.blueprintModule = blueprintModule return nil } -func (bm *Blueprint) Create(graphName string) IGraph { - return bm.graphPool.Create(graphName) +func (bm *Blueprint) Create(graphName string, graphID int64) IGraph { + return bm.graphPool.Create(graphName, graphID) } diff --git a/util/blueprint/blueprint_test.go b/util/blueprint/blueprint_test.go index dcaae70..df2e940 100644 --- a/util/blueprint/blueprint_test.go +++ b/util/blueprint/blueprint_test.go @@ -6,12 +6,12 @@ import ( func TestExecMgr(t *testing.T) { var bp Blueprint - err := bp.Init("D:\\Develop\\OriginNodeEditor\\json", "D:\\Develop\\OriginNodeEditor\\vgf") + err := bp.Init("D:\\Develop\\OriginNodeEditor\\json", "D:\\Develop\\OriginNodeEditor\\vgf", nil) if err != nil { t.Fatalf("init failed,err:%v", err) } - graphTest1 := bp.Create("testArrayOperator") + graphTest1 := bp.Create("testArrayOperator", 0) err = graphTest1.Do(EntranceID_IntParam, 20, 1, 3) if err != nil { t.Fatalf("Do EntranceID_IntParam failed,err:%v", err) diff --git a/util/blueprint/exec.go b/util/blueprint/exec.go index 6918a49..bae1447 100644 --- a/util/blueprint/exec.go +++ b/util/blueprint/exec.go @@ -189,6 +189,10 @@ func (em *innerExecNode) GetOutPortParamStartIndex() int { return em.outPortParamStartIndex } +func (en *BaseExecNode) GetBluePrintModule() IBlueprintModule { + return en.gr.IBlueprintModule +} + func (en *BaseExecNode) initInnerExecNode(innerNode *innerExecNode) { en.innerExecNode = innerNode } @@ -272,6 +276,14 @@ func (en *BaseExecNode) GetInPortStr(index int) (Port_Str, bool) { return port.GetStr() } +func (en *BaseExecNode) GetInPortArray(index int) (Port_Array, bool) { + port := en.GetInPort(index) + if port == nil { + return nil, false + } + return port.GetArray() +} + func (en *BaseExecNode) GetInPortArrayValInt(index int, idx int) (Port_Int, bool) { port := en.GetInPort(index) if port == nil { diff --git a/util/blueprint/graph.go b/util/blueprint/graph.go index 27818e7..7329544 100644 --- a/util/blueprint/graph.go +++ b/util/blueprint/graph.go @@ -3,6 +3,7 @@ package blueprint import ( "fmt" "github.com/goccy/go-json" + "time" ) type IGraph interface { @@ -10,13 +11,21 @@ type IGraph interface { Release() } +type IBlueprintModule interface { + SafeAfterFunc(timerId *uint64, d time.Duration, AdditionData interface{}, cb func(uint64, interface{})) + CancelTimerId(timerId *uint64) bool + TriggerEvent(graphID int64, eventID int64, args ...any) error +} + type baseGraph struct { entrance map[int64]*execNode // 入口 } type Graph struct { + graphID int64 *baseGraph graphContext + IBlueprintModule } type graphContext struct { diff --git a/util/blueprint/graphpool.go b/util/blueprint/graphpool.go index 08fda1f..31a9a29 100644 --- a/util/blueprint/graphpool.go +++ b/util/blueprint/graphpool.go @@ -9,13 +9,15 @@ import ( ) type GraphPool struct { - mapGraphs map[string]*baseGraph - execPool *ExecPool + mapGraphs map[string]*baseGraph + execPool *ExecPool + blueprintModule IBlueprintModule } -func (gp *GraphPool) Load(execPool *ExecPool, graphFilePath string) error { +func (gp *GraphPool) Load(execPool *ExecPool, graphFilePath string, blueprintModule IBlueprintModule) error { gp.execPool = execPool gp.mapGraphs = make(map[string]*baseGraph, 1024) + gp.blueprintModule = blueprintModule // 检查路径是否存在 stat, err := os.Stat(graphFilePath) @@ -49,7 +51,7 @@ func (gp *GraphPool) Load(execPool *ExecPool, graphFilePath string) error { }) } -func (gp *GraphPool) Create(graphName string) IGraph { +func (gp *GraphPool) Create(graphName string, graphID int64) IGraph { gr, ok := gp.mapGraphs[graphName] if !ok { return nil @@ -57,8 +59,9 @@ func (gp *GraphPool) Create(graphName string) IGraph { var graph Graph graph.baseGraph = gr + graph.graphID = graphID graph.context = make(map[string]*ExecContext, 4) - + graph.IBlueprintModule = gp.blueprintModule return &graph } @@ -237,15 +240,14 @@ func (gp *GraphPool) prepareOneEntrance(graphName string, entranceID int64, node return err } - var gr baseGraph - gr.entrance = make(map[int64]*execNode, 16) - gr.entrance[entranceID] = nodeExec - - if _, ok := gp.mapGraphs[graphName]; ok { - return fmt.Errorf("baseGraph %s already exists", graphName) + gr, ok := gp.mapGraphs[graphName] + if !ok { + gr = &baseGraph{} + gr.entrance = make(map[int64]*execNode, 16) + gp.mapGraphs[graphName] = gr } - gp.mapGraphs[graphName] = &gr + gr.entrance[entranceID] = nodeExec return nil } diff --git a/util/blueprint/port.go b/util/blueprint/port.go index 2a58759..42ab9f3 100644 --- a/util/blueprint/port.go +++ b/util/blueprint/port.go @@ -162,6 +162,15 @@ func (em *Port[T]) AppendArrayValStr(val Port_Str) bool { return false } +func (em *Port[T]) AppendArrayData(val ArrayData) bool { + if t, ok := any(&em.PortVal).(*Port_Array); ok { + *t = append(*t, val) + return true + } + + return false +} + func (em *Port[T]) GetArrayLen() Port_Int { if t, ok := any(&em.PortVal).(*Port_Array); ok { return Port_Int(len(*t)) @@ -321,6 +330,11 @@ func (em *Port[T]) setAnyVale(v any) error { for _, val := range arr { em.AppendArrayValStr(val) } + case Port_Array: + arr := v.(Port_Array) + for _, val := range arr { + em.AppendArrayValInt(val.IntVal) + } } return nil diff --git a/util/blueprint/sysnodes.go b/util/blueprint/sysnodes.go index bf99e44..15f7244 100644 --- a/util/blueprint/sysnodes.go +++ b/util/blueprint/sysnodes.go @@ -4,17 +4,20 @@ import ( "fmt" "github.com/duanhf2012/origin/v2/log" "math/rand/v2" + "time" ) // 系统入口ID定义,1000以内 const ( - EntranceID_ArrayParam = 2 EntranceID_IntParam = 1 + EntranceID_ArrayParam = 2 + EntranceID_Timer = 3 ) func init() { RegExecNode(&Entrance_ArrayParam{}) RegExecNode(&Entrance_IntParam{}) + RegExecNode(&Entrance_Timer{}) RegExecNode(&Output{}) RegExecNode(&Sequence{}) RegExecNode(&Foreach{}) @@ -32,6 +35,7 @@ func init() { RegExecNode(&EqualInteger{}) RegExecNode(&RangeCompare{}) RegExecNode(&Probability{}) + RegExecNode(&CreateTimer{}) } type Entrance_ArrayParam struct { @@ -58,6 +62,18 @@ func (em *Entrance_IntParam) Exec() (int, error) { return 0, nil } +type Entrance_Timer struct { + BaseExecNode +} + +func (em *Entrance_Timer) GetName() string { + return "Entrance_Timer" +} + +func (em *Entrance_Timer) Exec() (int, error) { + return 0, nil +} + type Output struct { BaseExecNode } @@ -69,10 +85,20 @@ func (em *Output) GetName() string { func (em *Output) Exec() (int, error) { val, ok := em.GetInPortInt(1) if !ok { - return 0, fmt.Errorf("Output Exec inParam not found") + return 0, fmt.Errorf("output Exec inParam not found") } - fmt.Printf("Output Exec inParam %d\n", val) + valStr, ok := em.GetInPortStr(2) + if !ok { + return 0, fmt.Errorf("output Exec inParam not found") + } + + valArray, ok := em.GetInPortArray(3) + if !ok { + return 0, fmt.Errorf("output Exec inParam not found") + } + + fmt.Printf("output Exec inParam [%d] [%s] [%v]\n", val, valStr, valArray) return 0, nil } @@ -595,3 +621,65 @@ func (em *AppendStringToArray) Exec() (int, error) { return -1, nil } + +// CreateTimer 创建定时器 +type CreateTimer struct { + BaseExecNode +} + +func (em *CreateTimer) GetName() string { + return "CreateTimer" +} + +func (em *CreateTimer) Exec() (int, error) { + delay, ok := em.GetInPortInt(0) + if !ok { + return -1, fmt.Errorf("CreateTimer inParam 0 error") + } + + array, ok := em.GetInPortArray(1) + if !ok { + return -1, fmt.Errorf("CreateTimer inParam 0 error") + } + + var timerId uint64 + graphID := em.gr.graphID + em.gr.IBlueprintModule.SafeAfterFunc(&timerId, time.Duration(delay)*time.Millisecond, nil, func(timerId uint64, additionData interface{}) { + err := em.gr.IBlueprintModule.TriggerEvent(graphID, EntranceID_Timer, array) + if err != nil { + log.Warnf("CreateTimer SafeAfterFunc error timerId:%d err:%v", timerId, err) + } + }) + + outPort := em.GetOutPort(1) + if outPort == nil { + return -1, fmt.Errorf("CreateTimer outParam 1 not found") + } + + outPort.SetInt(int64(timerId)) + return 0, nil +} + +// CloseTimer 关闭定时器 +type CloseTimer struct { + BaseExecNode +} + +func (em *CloseTimer) GetName() string { + return "CloseTimer" +} + +func (em *CloseTimer) Exec() (int, error) { + timerID, ok := em.GetInPortInt(1) + if !ok { + return -1, fmt.Errorf("CreateTimer inParam 0 error") + } + + id := uint64(timerID) + ok = em.gr.IBlueprintModule.CancelTimerId(&id) + if !ok { + log.Warnf("CloseTimer CancelTimerId:%d", id) + } + + return 0, nil +}