添加模块与定时器功能

This commit is contained in:
boyce
2025-10-08 16:16:08 +08:00
parent 7a34fafdc8
commit 419e7ee0c4
9 changed files with 225 additions and 22 deletions

View File

@@ -0,0 +1,74 @@
package blueprintmodule
import (
"fmt"
"github.com/duanhf2012/origin/v2/service"
"github.com/duanhf2012/origin/v2/util/blueprint"
"sync/atomic"
)
type BlueprintModule struct {
service.Module
bp blueprint.Blueprint
execDefFilePath string
graphFilePath string
seedGraphID int64
mapGraph map[int64]blueprint.IGraph
}
func (m *BlueprintModule) Init(execDefFilePath string, graphFilePath string) error {
m.execDefFilePath = execDefFilePath
m.graphFilePath = graphFilePath
m.mapGraph = make(map[int64]blueprint.IGraph, 1024)
return nil
}
func (m *BlueprintModule) OnInit() error {
if m.execDefFilePath == "" || m.graphFilePath == "" {
return fmt.Errorf("execDefFilePath or graphFilePath is empty")
}
m.seedGraphID = 1
return m.bp.Init(m.execDefFilePath, m.graphFilePath, m)
}
func (m *BlueprintModule) CreateGraph(graphName string) int64 {
graphID := atomic.AddInt64(&m.seedGraphID, 1)
graph := m.bp.Create(graphName, graphID)
if graph == nil {
return 0
}
m.mapGraph[graphID] = graph
return graphID
}
func (m *BlueprintModule) GetGraph(graphID int64) (blueprint.IGraph, error) {
graph, ok := m.mapGraph[graphID]
if !ok {
return nil, fmt.Errorf("graph not found,graphID:%d", graphID)
}
return graph, nil
}
func (m *BlueprintModule) Do(graphID int64, entranceID int64, args ...any) error {
graph, err := m.GetGraph(graphID)
if err != nil {
return err
}
return graph.Do(entranceID, args...)
}
func (m *BlueprintModule) TriggerEvent(graphID int64, eventID int64, args ...any) error {
graph, err := m.GetGraph(graphID)
if err != nil {
return err
}
return graph.Do(eventID, args...)
}

View File

@@ -0,0 +1 @@
package mysqlmodule

View File

@@ -7,28 +7,31 @@ import (
type Blueprint struct { type Blueprint struct {
execPool ExecPool execPool ExecPool
graphPool GraphPool graphPool GraphPool
blueprintModule IBlueprintModule
} }
func (bm *Blueprint) Init(execDefFilePath string, graphFilePath string) error { func (bm *Blueprint) Init(execDefFilePath string, graphFilePath string, blueprintModule IBlueprintModule) error {
err := bm.execPool.Load(execDefFilePath) err := bm.execPool.Load(execDefFilePath)
if err != nil { if err != nil {
return err return err
} }
for _, e := range execNodes { for _, e := range execNodes {
if !bm.execPool.Register(e) { if !bm.execPool.Register(e) {
return fmt.Errorf("register exec failed,exec:%s", e.GetName()) return fmt.Errorf("register exec failed,exec:%s", e.GetName())
} }
} }
err = bm.graphPool.Load(&bm.execPool, graphFilePath) err = bm.graphPool.Load(&bm.execPool, graphFilePath, blueprintModule)
if err != nil { if err != nil {
return err return err
} }
bm.blueprintModule = blueprintModule
return nil return nil
} }
func (bm *Blueprint) Create(graphName string) IGraph { func (bm *Blueprint) Create(graphName string, graphID int64) IGraph {
return bm.graphPool.Create(graphName) return bm.graphPool.Create(graphName, graphID)
} }

View File

@@ -6,12 +6,12 @@ import (
func TestExecMgr(t *testing.T) { func TestExecMgr(t *testing.T) {
var bp Blueprint var bp Blueprint
err := bp.Init("D:\\Develop\\OriginNodeEditor\\json", "D:\\Develop\\OriginNodeEditor\\vgf") err := bp.Init("D:\\Develop\\OriginNodeEditor\\json", "D:\\Develop\\OriginNodeEditor\\vgf", nil)
if err != nil { if err != nil {
t.Fatalf("init failed,err:%v", err) t.Fatalf("init failed,err:%v", err)
} }
graphTest1 := bp.Create("testArrayOperator") graphTest1 := bp.Create("testArrayOperator", 0)
err = graphTest1.Do(EntranceID_IntParam, 20, 1, 3) err = graphTest1.Do(EntranceID_IntParam, 20, 1, 3)
if err != nil { if err != nil {
t.Fatalf("Do EntranceID_IntParam failed,err:%v", err) t.Fatalf("Do EntranceID_IntParam failed,err:%v", err)

View File

@@ -189,6 +189,10 @@ func (em *innerExecNode) GetOutPortParamStartIndex() int {
return em.outPortParamStartIndex return em.outPortParamStartIndex
} }
func (en *BaseExecNode) GetBluePrintModule() IBlueprintModule {
return en.gr.IBlueprintModule
}
func (en *BaseExecNode) initInnerExecNode(innerNode *innerExecNode) { func (en *BaseExecNode) initInnerExecNode(innerNode *innerExecNode) {
en.innerExecNode = innerNode en.innerExecNode = innerNode
} }
@@ -272,6 +276,14 @@ func (en *BaseExecNode) GetInPortStr(index int) (Port_Str, bool) {
return port.GetStr() return port.GetStr()
} }
func (en *BaseExecNode) GetInPortArray(index int) (Port_Array, bool) {
port := en.GetInPort(index)
if port == nil {
return nil, false
}
return port.GetArray()
}
func (en *BaseExecNode) GetInPortArrayValInt(index int, idx int) (Port_Int, bool) { func (en *BaseExecNode) GetInPortArrayValInt(index int, idx int) (Port_Int, bool) {
port := en.GetInPort(index) port := en.GetInPort(index)
if port == nil { if port == nil {

View File

@@ -3,6 +3,7 @@ package blueprint
import ( import (
"fmt" "fmt"
"github.com/goccy/go-json" "github.com/goccy/go-json"
"time"
) )
type IGraph interface { type IGraph interface {
@@ -10,13 +11,21 @@ type IGraph interface {
Release() Release()
} }
type IBlueprintModule interface {
SafeAfterFunc(timerId *uint64, d time.Duration, AdditionData interface{}, cb func(uint64, interface{}))
CancelTimerId(timerId *uint64) bool
TriggerEvent(graphID int64, eventID int64, args ...any) error
}
type baseGraph struct { type baseGraph struct {
entrance map[int64]*execNode // 入口 entrance map[int64]*execNode // 入口
} }
type Graph struct { type Graph struct {
graphID int64
*baseGraph *baseGraph
graphContext graphContext
IBlueprintModule
} }
type graphContext struct { type graphContext struct {

View File

@@ -9,13 +9,15 @@ import (
) )
type GraphPool struct { type GraphPool struct {
mapGraphs map[string]*baseGraph mapGraphs map[string]*baseGraph
execPool *ExecPool execPool *ExecPool
blueprintModule IBlueprintModule
} }
func (gp *GraphPool) Load(execPool *ExecPool, graphFilePath string) error { func (gp *GraphPool) Load(execPool *ExecPool, graphFilePath string, blueprintModule IBlueprintModule) error {
gp.execPool = execPool gp.execPool = execPool
gp.mapGraphs = make(map[string]*baseGraph, 1024) gp.mapGraphs = make(map[string]*baseGraph, 1024)
gp.blueprintModule = blueprintModule
// 检查路径是否存在 // 检查路径是否存在
stat, err := os.Stat(graphFilePath) stat, err := os.Stat(graphFilePath)
@@ -49,7 +51,7 @@ func (gp *GraphPool) Load(execPool *ExecPool, graphFilePath string) error {
}) })
} }
func (gp *GraphPool) Create(graphName string) IGraph { func (gp *GraphPool) Create(graphName string, graphID int64) IGraph {
gr, ok := gp.mapGraphs[graphName] gr, ok := gp.mapGraphs[graphName]
if !ok { if !ok {
return nil return nil
@@ -57,8 +59,9 @@ func (gp *GraphPool) Create(graphName string) IGraph {
var graph Graph var graph Graph
graph.baseGraph = gr graph.baseGraph = gr
graph.graphID = graphID
graph.context = make(map[string]*ExecContext, 4) graph.context = make(map[string]*ExecContext, 4)
graph.IBlueprintModule = gp.blueprintModule
return &graph return &graph
} }
@@ -237,15 +240,14 @@ func (gp *GraphPool) prepareOneEntrance(graphName string, entranceID int64, node
return err return err
} }
var gr baseGraph gr, ok := gp.mapGraphs[graphName]
gr.entrance = make(map[int64]*execNode, 16) if !ok {
gr.entrance[entranceID] = nodeExec gr = &baseGraph{}
gr.entrance = make(map[int64]*execNode, 16)
if _, ok := gp.mapGraphs[graphName]; ok { gp.mapGraphs[graphName] = gr
return fmt.Errorf("baseGraph %s already exists", graphName)
} }
gp.mapGraphs[graphName] = &gr gr.entrance[entranceID] = nodeExec
return nil return nil
} }

View File

@@ -162,6 +162,15 @@ func (em *Port[T]) AppendArrayValStr(val Port_Str) bool {
return false return false
} }
func (em *Port[T]) AppendArrayData(val ArrayData) bool {
if t, ok := any(&em.PortVal).(*Port_Array); ok {
*t = append(*t, val)
return true
}
return false
}
func (em *Port[T]) GetArrayLen() Port_Int { func (em *Port[T]) GetArrayLen() Port_Int {
if t, ok := any(&em.PortVal).(*Port_Array); ok { if t, ok := any(&em.PortVal).(*Port_Array); ok {
return Port_Int(len(*t)) return Port_Int(len(*t))
@@ -321,6 +330,11 @@ func (em *Port[T]) setAnyVale(v any) error {
for _, val := range arr { for _, val := range arr {
em.AppendArrayValStr(val) em.AppendArrayValStr(val)
} }
case Port_Array:
arr := v.(Port_Array)
for _, val := range arr {
em.AppendArrayValInt(val.IntVal)
}
} }
return nil return nil

View File

@@ -4,17 +4,20 @@ import (
"fmt" "fmt"
"github.com/duanhf2012/origin/v2/log" "github.com/duanhf2012/origin/v2/log"
"math/rand/v2" "math/rand/v2"
"time"
) )
// 系统入口ID定义1000以内 // 系统入口ID定义1000以内
const ( const (
EntranceID_ArrayParam = 2
EntranceID_IntParam = 1 EntranceID_IntParam = 1
EntranceID_ArrayParam = 2
EntranceID_Timer = 3
) )
func init() { func init() {
RegExecNode(&Entrance_ArrayParam{}) RegExecNode(&Entrance_ArrayParam{})
RegExecNode(&Entrance_IntParam{}) RegExecNode(&Entrance_IntParam{})
RegExecNode(&Entrance_Timer{})
RegExecNode(&Output{}) RegExecNode(&Output{})
RegExecNode(&Sequence{}) RegExecNode(&Sequence{})
RegExecNode(&Foreach{}) RegExecNode(&Foreach{})
@@ -32,6 +35,7 @@ func init() {
RegExecNode(&EqualInteger{}) RegExecNode(&EqualInteger{})
RegExecNode(&RangeCompare{}) RegExecNode(&RangeCompare{})
RegExecNode(&Probability{}) RegExecNode(&Probability{})
RegExecNode(&CreateTimer{})
} }
type Entrance_ArrayParam struct { type Entrance_ArrayParam struct {
@@ -58,6 +62,18 @@ func (em *Entrance_IntParam) Exec() (int, error) {
return 0, nil return 0, nil
} }
type Entrance_Timer struct {
BaseExecNode
}
func (em *Entrance_Timer) GetName() string {
return "Entrance_Timer"
}
func (em *Entrance_Timer) Exec() (int, error) {
return 0, nil
}
type Output struct { type Output struct {
BaseExecNode BaseExecNode
} }
@@ -69,10 +85,20 @@ func (em *Output) GetName() string {
func (em *Output) Exec() (int, error) { func (em *Output) Exec() (int, error) {
val, ok := em.GetInPortInt(1) val, ok := em.GetInPortInt(1)
if !ok { if !ok {
return 0, fmt.Errorf("Output Exec inParam not found") return 0, fmt.Errorf("output Exec inParam not found")
} }
fmt.Printf("Output Exec inParam %d\n", val) valStr, ok := em.GetInPortStr(2)
if !ok {
return 0, fmt.Errorf("output Exec inParam not found")
}
valArray, ok := em.GetInPortArray(3)
if !ok {
return 0, fmt.Errorf("output Exec inParam not found")
}
fmt.Printf("output Exec inParam [%d] [%s] [%v]\n", val, valStr, valArray)
return 0, nil return 0, nil
} }
@@ -595,3 +621,65 @@ func (em *AppendStringToArray) Exec() (int, error) {
return -1, nil return -1, nil
} }
// CreateTimer 创建定时器
type CreateTimer struct {
BaseExecNode
}
func (em *CreateTimer) GetName() string {
return "CreateTimer"
}
func (em *CreateTimer) Exec() (int, error) {
delay, ok := em.GetInPortInt(0)
if !ok {
return -1, fmt.Errorf("CreateTimer inParam 0 error")
}
array, ok := em.GetInPortArray(1)
if !ok {
return -1, fmt.Errorf("CreateTimer inParam 0 error")
}
var timerId uint64
graphID := em.gr.graphID
em.gr.IBlueprintModule.SafeAfterFunc(&timerId, time.Duration(delay)*time.Millisecond, nil, func(timerId uint64, additionData interface{}) {
err := em.gr.IBlueprintModule.TriggerEvent(graphID, EntranceID_Timer, array)
if err != nil {
log.Warnf("CreateTimer SafeAfterFunc error timerId:%d err:%v", timerId, err)
}
})
outPort := em.GetOutPort(1)
if outPort == nil {
return -1, fmt.Errorf("CreateTimer outParam 1 not found")
}
outPort.SetInt(int64(timerId))
return 0, nil
}
// CloseTimer 关闭定时器
type CloseTimer struct {
BaseExecNode
}
func (em *CloseTimer) GetName() string {
return "CloseTimer"
}
func (em *CloseTimer) Exec() (int, error) {
timerID, ok := em.GetInPortInt(1)
if !ok {
return -1, fmt.Errorf("CreateTimer inParam 0 error")
}
id := uint64(timerID)
ok = em.gr.IBlueprintModule.CancelTimerId(&id)
if !ok {
log.Warnf("CloseTimer CancelTimerId:%d", id)
}
return 0, nil
}