mirror of
https://github.com/YspCoder/clawgo.git
synced 2026-04-28 09:57:29 +08:00
fix bug
This commit is contained in:
340
pkg/tools/orchestrator.go
Normal file
340
pkg/tools/orchestrator.go
Normal file
@@ -0,0 +1,340 @@
|
||||
package tools
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"sort"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
type PipelineStatus string
|
||||
|
||||
const (
|
||||
PipelinePending PipelineStatus = "pending"
|
||||
PipelineRunning PipelineStatus = "running"
|
||||
PipelineCompleted PipelineStatus = "completed"
|
||||
PipelineFailed PipelineStatus = "failed"
|
||||
)
|
||||
|
||||
type TaskStatus string
|
||||
|
||||
const (
|
||||
TaskPending TaskStatus = "pending"
|
||||
TaskRunning TaskStatus = "running"
|
||||
TaskCompleted TaskStatus = "completed"
|
||||
TaskFailed TaskStatus = "failed"
|
||||
)
|
||||
|
||||
type PipelineTask struct {
|
||||
ID string `json:"id"`
|
||||
Role string `json:"role"`
|
||||
Goal string `json:"goal"`
|
||||
DependsOn []string `json:"depends_on,omitempty"`
|
||||
Status TaskStatus `json:"status"`
|
||||
Result string `json:"result,omitempty"`
|
||||
Error string `json:"error,omitempty"`
|
||||
CreatedAt int64 `json:"created_at"`
|
||||
UpdatedAt int64 `json:"updated_at"`
|
||||
}
|
||||
|
||||
type Pipeline struct {
|
||||
ID string `json:"id"`
|
||||
Label string `json:"label"`
|
||||
Objective string `json:"objective"`
|
||||
Status PipelineStatus `json:"status"`
|
||||
OriginChannel string `json:"origin_channel"`
|
||||
OriginChatID string `json:"origin_chat_id"`
|
||||
SharedState map[string]interface{} `json:"shared_state"`
|
||||
Tasks map[string]*PipelineTask
|
||||
CreatedAt int64 `json:"created_at"`
|
||||
UpdatedAt int64 `json:"updated_at"`
|
||||
}
|
||||
|
||||
type PipelineSpec struct {
|
||||
ID string `json:"id"`
|
||||
Role string `json:"role"`
|
||||
Goal string `json:"goal"`
|
||||
DependsOn []string `json:"depends_on,omitempty"`
|
||||
}
|
||||
|
||||
type Orchestrator struct {
|
||||
mu sync.RWMutex
|
||||
pipelines map[string]*Pipeline
|
||||
nextID int
|
||||
}
|
||||
|
||||
func NewOrchestrator() *Orchestrator {
|
||||
return &Orchestrator{
|
||||
pipelines: make(map[string]*Pipeline),
|
||||
nextID: 1,
|
||||
}
|
||||
}
|
||||
|
||||
func (o *Orchestrator) CreatePipeline(label, objective, originChannel, originChatID string, tasks []PipelineSpec) (*Pipeline, error) {
|
||||
o.mu.Lock()
|
||||
defer o.mu.Unlock()
|
||||
|
||||
if strings.TrimSpace(objective) == "" {
|
||||
return nil, fmt.Errorf("objective is required")
|
||||
}
|
||||
if len(tasks) == 0 {
|
||||
return nil, fmt.Errorf("at least one task is required")
|
||||
}
|
||||
|
||||
id := fmt.Sprintf("pipe-%d", o.nextID)
|
||||
o.nextID++
|
||||
|
||||
now := time.Now().UnixMilli()
|
||||
p := &Pipeline{
|
||||
ID: id,
|
||||
Label: strings.TrimSpace(label),
|
||||
Objective: strings.TrimSpace(objective),
|
||||
Status: PipelinePending,
|
||||
OriginChannel: originChannel,
|
||||
OriginChatID: originChatID,
|
||||
SharedState: make(map[string]interface{}),
|
||||
Tasks: make(map[string]*PipelineTask, len(tasks)),
|
||||
CreatedAt: now,
|
||||
UpdatedAt: now,
|
||||
}
|
||||
|
||||
for _, task := range tasks {
|
||||
taskID := strings.TrimSpace(task.ID)
|
||||
if taskID == "" {
|
||||
return nil, fmt.Errorf("task id is required")
|
||||
}
|
||||
if _, exists := p.Tasks[taskID]; exists {
|
||||
return nil, fmt.Errorf("duplicate task id: %s", taskID)
|
||||
}
|
||||
|
||||
p.Tasks[taskID] = &PipelineTask{
|
||||
ID: taskID,
|
||||
Role: strings.TrimSpace(task.Role),
|
||||
Goal: strings.TrimSpace(task.Goal),
|
||||
DependsOn: normalizeDepends(task.DependsOn),
|
||||
Status: TaskPending,
|
||||
CreatedAt: now,
|
||||
UpdatedAt: now,
|
||||
}
|
||||
}
|
||||
|
||||
for taskID, task := range p.Tasks {
|
||||
for _, dep := range task.DependsOn {
|
||||
if dep == taskID {
|
||||
return nil, fmt.Errorf("task %s cannot depend on itself", taskID)
|
||||
}
|
||||
if _, exists := p.Tasks[dep]; !exists {
|
||||
return nil, fmt.Errorf("task %s depends on missing task %s", taskID, dep)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
o.pipelines[p.ID] = p
|
||||
return clonePipeline(p), nil
|
||||
}
|
||||
|
||||
func (o *Orchestrator) MarkTaskRunning(pipelineID, taskID string) error {
|
||||
o.mu.Lock()
|
||||
defer o.mu.Unlock()
|
||||
|
||||
p, t, err := o.getTaskLocked(pipelineID, taskID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if t.Status == TaskCompleted || t.Status == TaskFailed {
|
||||
return nil
|
||||
}
|
||||
|
||||
t.Status = TaskRunning
|
||||
t.UpdatedAt = time.Now().UnixMilli()
|
||||
p.Status = PipelineRunning
|
||||
p.UpdatedAt = t.UpdatedAt
|
||||
return nil
|
||||
}
|
||||
|
||||
func (o *Orchestrator) MarkTaskDone(pipelineID, taskID, result string, runErr error) error {
|
||||
o.mu.Lock()
|
||||
defer o.mu.Unlock()
|
||||
|
||||
p, t, err := o.getTaskLocked(pipelineID, taskID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
now := time.Now().UnixMilli()
|
||||
t.UpdatedAt = now
|
||||
t.Result = strings.TrimSpace(result)
|
||||
if runErr != nil {
|
||||
t.Status = TaskFailed
|
||||
t.Error = runErr.Error()
|
||||
p.Status = PipelineFailed
|
||||
} else {
|
||||
t.Status = TaskCompleted
|
||||
t.Error = ""
|
||||
p.Status = o.computePipelineStatusLocked(p)
|
||||
}
|
||||
p.UpdatedAt = now
|
||||
return nil
|
||||
}
|
||||
|
||||
func (o *Orchestrator) SetSharedState(pipelineID, key string, value interface{}) error {
|
||||
o.mu.Lock()
|
||||
defer o.mu.Unlock()
|
||||
|
||||
p, ok := o.pipelines[pipelineID]
|
||||
if !ok {
|
||||
return fmt.Errorf("pipeline not found: %s", pipelineID)
|
||||
}
|
||||
k := strings.TrimSpace(key)
|
||||
if k == "" {
|
||||
return fmt.Errorf("state key is required")
|
||||
}
|
||||
p.SharedState[k] = value
|
||||
p.UpdatedAt = time.Now().UnixMilli()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (o *Orchestrator) GetPipeline(pipelineID string) (*Pipeline, bool) {
|
||||
o.mu.RLock()
|
||||
defer o.mu.RUnlock()
|
||||
|
||||
p, ok := o.pipelines[pipelineID]
|
||||
if !ok {
|
||||
return nil, false
|
||||
}
|
||||
return clonePipeline(p), true
|
||||
}
|
||||
|
||||
func (o *Orchestrator) ListPipelines() []*Pipeline {
|
||||
o.mu.RLock()
|
||||
defer o.mu.RUnlock()
|
||||
|
||||
items := make([]*Pipeline, 0, len(o.pipelines))
|
||||
for _, p := range o.pipelines {
|
||||
items = append(items, clonePipeline(p))
|
||||
}
|
||||
|
||||
sort.Slice(items, func(i, j int) bool {
|
||||
return items[i].CreatedAt > items[j].CreatedAt
|
||||
})
|
||||
return items
|
||||
}
|
||||
|
||||
func (o *Orchestrator) ReadyTasks(pipelineID string) ([]*PipelineTask, error) {
|
||||
o.mu.RLock()
|
||||
defer o.mu.RUnlock()
|
||||
|
||||
p, ok := o.pipelines[pipelineID]
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("pipeline not found: %s", pipelineID)
|
||||
}
|
||||
|
||||
var ready []*PipelineTask
|
||||
for _, t := range p.Tasks {
|
||||
if t.Status != TaskPending {
|
||||
continue
|
||||
}
|
||||
if depsDone(p, t.DependsOn) {
|
||||
ready = append(ready, cloneTask(t))
|
||||
}
|
||||
}
|
||||
sort.Slice(ready, func(i, j int) bool { return ready[i].ID < ready[j].ID })
|
||||
return ready, nil
|
||||
}
|
||||
|
||||
func (o *Orchestrator) SnapshotJSON(pipelineID string) (string, error) {
|
||||
p, ok := o.GetPipeline(pipelineID)
|
||||
if !ok {
|
||||
return "", fmt.Errorf("pipeline not found: %s", pipelineID)
|
||||
}
|
||||
data, err := json.MarshalIndent(p, "", " ")
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
return string(data), nil
|
||||
}
|
||||
|
||||
func (o *Orchestrator) getTaskLocked(pipelineID, taskID string) (*Pipeline, *PipelineTask, error) {
|
||||
p, ok := o.pipelines[pipelineID]
|
||||
if !ok {
|
||||
return nil, nil, fmt.Errorf("pipeline not found: %s", pipelineID)
|
||||
}
|
||||
t, ok := p.Tasks[taskID]
|
||||
if !ok {
|
||||
return nil, nil, fmt.Errorf("task %s not found in pipeline %s", taskID, pipelineID)
|
||||
}
|
||||
return p, t, nil
|
||||
}
|
||||
|
||||
func (o *Orchestrator) computePipelineStatusLocked(p *Pipeline) PipelineStatus {
|
||||
allDone := true
|
||||
for _, t := range p.Tasks {
|
||||
if t.Status == TaskFailed {
|
||||
return PipelineFailed
|
||||
}
|
||||
if t.Status != TaskCompleted {
|
||||
allDone = false
|
||||
}
|
||||
}
|
||||
if allDone {
|
||||
return PipelineCompleted
|
||||
}
|
||||
return PipelineRunning
|
||||
}
|
||||
|
||||
func depsDone(p *Pipeline, dependsOn []string) bool {
|
||||
for _, dep := range dependsOn {
|
||||
t, ok := p.Tasks[dep]
|
||||
if !ok || t.Status != TaskCompleted {
|
||||
return false
|
||||
}
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
func normalizeDepends(in []string) []string {
|
||||
uniq := make(map[string]struct{})
|
||||
out := make([]string, 0, len(in))
|
||||
for _, item := range in {
|
||||
v := strings.TrimSpace(item)
|
||||
if v == "" {
|
||||
continue
|
||||
}
|
||||
if _, ok := uniq[v]; ok {
|
||||
continue
|
||||
}
|
||||
uniq[v] = struct{}{}
|
||||
out = append(out, v)
|
||||
}
|
||||
sort.Strings(out)
|
||||
return out
|
||||
}
|
||||
|
||||
func cloneTask(in *PipelineTask) *PipelineTask {
|
||||
if in == nil {
|
||||
return nil
|
||||
}
|
||||
deps := make([]string, len(in.DependsOn))
|
||||
copy(deps, in.DependsOn)
|
||||
out := *in
|
||||
out.DependsOn = deps
|
||||
return &out
|
||||
}
|
||||
|
||||
func clonePipeline(in *Pipeline) *Pipeline {
|
||||
if in == nil {
|
||||
return nil
|
||||
}
|
||||
out := *in
|
||||
out.SharedState = make(map[string]interface{}, len(in.SharedState))
|
||||
for k, v := range in.SharedState {
|
||||
out.SharedState[k] = v
|
||||
}
|
||||
out.Tasks = make(map[string]*PipelineTask, len(in.Tasks))
|
||||
for id, t := range in.Tasks {
|
||||
out.Tasks[id] = cloneTask(t)
|
||||
}
|
||||
return &out
|
||||
}
|
||||
Reference in New Issue
Block a user