Remove obsolete pipeline files

This commit is contained in:
lpf
2026-03-06 17:45:43 +08:00
parent 7d9ca89476
commit ad7316c9b6
5 changed files with 0 additions and 1238 deletions

View File

@@ -1,340 +0,0 @@
package tools
import (
"encoding/json"
"fmt"
"sort"
"strings"
"sync"
"time"
)
type PipelineStatus string
const (
PipelinePending PipelineStatus = "pending"
PipelineRunning PipelineStatus = "running"
PipelineCompleted PipelineStatus = "completed"
PipelineFailed PipelineStatus = "failed"
)
type TaskStatus string
const (
TaskPending TaskStatus = "pending"
TaskRunning TaskStatus = "running"
TaskCompleted TaskStatus = "completed"
TaskFailed TaskStatus = "failed"
)
type PipelineTask struct {
ID string `json:"id"`
Role string `json:"role"`
Goal string `json:"goal"`
DependsOn []string `json:"depends_on,omitempty"`
Status TaskStatus `json:"status"`
Result string `json:"result,omitempty"`
Error string `json:"error,omitempty"`
CreatedAt int64 `json:"created_at"`
UpdatedAt int64 `json:"updated_at"`
}
type Pipeline struct {
ID string `json:"id"`
Label string `json:"label"`
Objective string `json:"objective"`
Status PipelineStatus `json:"status"`
OriginChannel string `json:"origin_channel"`
OriginChatID string `json:"origin_chat_id"`
SharedState map[string]interface{} `json:"shared_state"`
Tasks map[string]*PipelineTask
CreatedAt int64 `json:"created_at"`
UpdatedAt int64 `json:"updated_at"`
}
type PipelineSpec struct {
ID string `json:"id"`
Role string `json:"role"`
Goal string `json:"goal"`
DependsOn []string `json:"depends_on,omitempty"`
}
type Orchestrator struct {
mu sync.RWMutex
pipelines map[string]*Pipeline
nextID int
}
func NewOrchestrator() *Orchestrator {
return &Orchestrator{
pipelines: make(map[string]*Pipeline),
nextID: 1,
}
}
func (o *Orchestrator) CreatePipeline(label, objective, originChannel, originChatID string, tasks []PipelineSpec) (*Pipeline, error) {
o.mu.Lock()
defer o.mu.Unlock()
if strings.TrimSpace(objective) == "" {
return nil, fmt.Errorf("objective is required")
}
if len(tasks) == 0 {
return nil, fmt.Errorf("at least one task is required")
}
id := fmt.Sprintf("pipe-%d", o.nextID)
o.nextID++
now := time.Now().UnixMilli()
p := &Pipeline{
ID: id,
Label: strings.TrimSpace(label),
Objective: strings.TrimSpace(objective),
Status: PipelinePending,
OriginChannel: originChannel,
OriginChatID: originChatID,
SharedState: make(map[string]interface{}),
Tasks: make(map[string]*PipelineTask, len(tasks)),
CreatedAt: now,
UpdatedAt: now,
}
for _, task := range tasks {
taskID := strings.TrimSpace(task.ID)
if taskID == "" {
return nil, fmt.Errorf("task id is required")
}
if _, exists := p.Tasks[taskID]; exists {
return nil, fmt.Errorf("duplicate task id: %s", taskID)
}
p.Tasks[taskID] = &PipelineTask{
ID: taskID,
Role: strings.TrimSpace(task.Role),
Goal: strings.TrimSpace(task.Goal),
DependsOn: normalizeDepends(task.DependsOn),
Status: TaskPending,
CreatedAt: now,
UpdatedAt: now,
}
}
for taskID, task := range p.Tasks {
for _, dep := range task.DependsOn {
if dep == taskID {
return nil, fmt.Errorf("task %s cannot depend on itself", taskID)
}
if _, exists := p.Tasks[dep]; !exists {
return nil, fmt.Errorf("task %s depends on missing task %s", taskID, dep)
}
}
}
o.pipelines[p.ID] = p
return clonePipeline(p), nil
}
func (o *Orchestrator) MarkTaskRunning(pipelineID, taskID string) error {
o.mu.Lock()
defer o.mu.Unlock()
p, t, err := o.getTaskLocked(pipelineID, taskID)
if err != nil {
return err
}
if t.Status == TaskCompleted || t.Status == TaskFailed {
return nil
}
t.Status = TaskRunning
t.UpdatedAt = time.Now().UnixMilli()
p.Status = PipelineRunning
p.UpdatedAt = t.UpdatedAt
return nil
}
func (o *Orchestrator) MarkTaskDone(pipelineID, taskID, result string, runErr error) error {
o.mu.Lock()
defer o.mu.Unlock()
p, t, err := o.getTaskLocked(pipelineID, taskID)
if err != nil {
return err
}
now := time.Now().UnixMilli()
t.UpdatedAt = now
t.Result = strings.TrimSpace(result)
if runErr != nil {
t.Status = TaskFailed
t.Error = runErr.Error()
p.Status = PipelineFailed
} else {
t.Status = TaskCompleted
t.Error = ""
p.Status = o.computePipelineStatusLocked(p)
}
p.UpdatedAt = now
return nil
}
func (o *Orchestrator) SetSharedState(pipelineID, key string, value interface{}) error {
o.mu.Lock()
defer o.mu.Unlock()
p, ok := o.pipelines[pipelineID]
if !ok {
return fmt.Errorf("pipeline not found: %s", pipelineID)
}
k := strings.TrimSpace(key)
if k == "" {
return fmt.Errorf("state key is required")
}
p.SharedState[k] = value
p.UpdatedAt = time.Now().UnixMilli()
return nil
}
func (o *Orchestrator) GetPipeline(pipelineID string) (*Pipeline, bool) {
o.mu.RLock()
defer o.mu.RUnlock()
p, ok := o.pipelines[pipelineID]
if !ok {
return nil, false
}
return clonePipeline(p), true
}
func (o *Orchestrator) ListPipelines() []*Pipeline {
o.mu.RLock()
defer o.mu.RUnlock()
items := make([]*Pipeline, 0, len(o.pipelines))
for _, p := range o.pipelines {
items = append(items, clonePipeline(p))
}
sort.Slice(items, func(i, j int) bool {
return items[i].CreatedAt > items[j].CreatedAt
})
return items
}
func (o *Orchestrator) ReadyTasks(pipelineID string) ([]*PipelineTask, error) {
o.mu.RLock()
defer o.mu.RUnlock()
p, ok := o.pipelines[pipelineID]
if !ok {
return nil, fmt.Errorf("pipeline not found: %s", pipelineID)
}
var ready []*PipelineTask
for _, t := range p.Tasks {
if t.Status != TaskPending {
continue
}
if depsDone(p, t.DependsOn) {
ready = append(ready, cloneTask(t))
}
}
sort.Slice(ready, func(i, j int) bool { return ready[i].ID < ready[j].ID })
return ready, nil
}
func (o *Orchestrator) SnapshotJSON(pipelineID string) (string, error) {
p, ok := o.GetPipeline(pipelineID)
if !ok {
return "", fmt.Errorf("pipeline not found: %s", pipelineID)
}
data, err := json.MarshalIndent(p, "", " ")
if err != nil {
return "", err
}
return string(data), nil
}
func (o *Orchestrator) getTaskLocked(pipelineID, taskID string) (*Pipeline, *PipelineTask, error) {
p, ok := o.pipelines[pipelineID]
if !ok {
return nil, nil, fmt.Errorf("pipeline not found: %s", pipelineID)
}
t, ok := p.Tasks[taskID]
if !ok {
return nil, nil, fmt.Errorf("task %s not found in pipeline %s", taskID, pipelineID)
}
return p, t, nil
}
func (o *Orchestrator) computePipelineStatusLocked(p *Pipeline) PipelineStatus {
allDone := true
for _, t := range p.Tasks {
if t.Status == TaskFailed {
return PipelineFailed
}
if t.Status != TaskCompleted {
allDone = false
}
}
if allDone {
return PipelineCompleted
}
return PipelineRunning
}
func depsDone(p *Pipeline, dependsOn []string) bool {
for _, dep := range dependsOn {
t, ok := p.Tasks[dep]
if !ok || t.Status != TaskCompleted {
return false
}
}
return true
}
func normalizeDepends(in []string) []string {
uniq := make(map[string]struct{})
out := make([]string, 0, len(in))
for _, item := range in {
v := strings.TrimSpace(item)
if v == "" {
continue
}
if _, ok := uniq[v]; ok {
continue
}
uniq[v] = struct{}{}
out = append(out, v)
}
sort.Strings(out)
return out
}
func cloneTask(in *PipelineTask) *PipelineTask {
if in == nil {
return nil
}
deps := make([]string, len(in.DependsOn))
copy(deps, in.DependsOn)
out := *in
out.DependsOn = deps
return &out
}
func clonePipeline(in *Pipeline) *Pipeline {
if in == nil {
return nil
}
out := *in
out.SharedState = make(map[string]interface{}, len(in.SharedState))
for k, v := range in.SharedState {
out.SharedState[k] = v
}
out.Tasks = make(map[string]*PipelineTask, len(in.Tasks))
for id, t := range in.Tasks {
out.Tasks[id] = cloneTask(t)
}
return &out
}

View File

@@ -1,354 +0,0 @@
package tools
import (
"context"
"encoding/json"
"fmt"
"strings"
)
type PipelineCreateTool struct {
orc *Orchestrator
}
func NewPipelineCreateTool(orc *Orchestrator) *PipelineCreateTool {
return &PipelineCreateTool{orc: orc}
}
func (t *PipelineCreateTool) Name() string { return "pipeline_create" }
func (t *PipelineCreateTool) Description() string {
return "Create a multi-agent pipeline with standardized task protocol (role/goal/dependencies/shared state)."
}
func (t *PipelineCreateTool) Parameters() map[string]interface{} {
return map[string]interface{}{
"type": "object",
"properties": map[string]interface{}{
"label": map[string]interface{}{
"type": "string",
"description": "Optional short pipeline label",
},
"objective": map[string]interface{}{
"type": "string",
"description": "Top-level objective for this pipeline",
},
"tasks": map[string]interface{}{
"type": "array",
"description": "Task list with id/role/goal/depends_on",
"items": map[string]interface{}{
"type": "object",
"properties": map[string]interface{}{
"id": map[string]interface{}{"type": "string"},
"role": map[string]interface{}{
"type": "string",
"description": "Agent role, e.g. research/coding/testing",
},
"goal": map[string]interface{}{"type": "string"},
"depends_on": map[string]interface{}{
"type": "array",
"items": map[string]interface{}{"type": "string"},
},
},
"required": []string{"id", "goal"},
},
},
"channel": map[string]interface{}{
"type": "string",
"description": "Optional origin channel for completion notifications (auto-injected in normal chat flow)",
},
"chat_id": map[string]interface{}{
"type": "string",
"description": "Optional origin chat ID for completion notifications (auto-injected in normal chat flow)",
},
},
"required": []string{"objective", "tasks"},
}
}
func (t *PipelineCreateTool) Execute(_ context.Context, args map[string]interface{}) (string, error) {
if t.orc == nil {
return "", fmt.Errorf("orchestrator is not configured")
}
objective, _ := args["objective"].(string)
label, _ := args["label"].(string)
rawTasks, ok := args["tasks"].([]interface{})
if !ok || len(rawTasks) == 0 {
return "", fmt.Errorf("tasks is required")
}
specs := make([]PipelineSpec, 0, len(rawTasks))
for i, item := range rawTasks {
m, ok := item.(map[string]interface{})
if !ok {
return "", fmt.Errorf("tasks[%d] must be object", i)
}
id, _ := m["id"].(string)
role, _ := m["role"].(string)
goal, _ := m["goal"].(string)
deps := make([]string, 0)
if rawDeps, ok := m["depends_on"].([]interface{}); ok {
for _, dep := range rawDeps {
if depS, ok := dep.(string); ok {
deps = append(deps, depS)
}
}
}
specs = append(specs, PipelineSpec{
ID: id,
Role: role,
Goal: goal,
DependsOn: deps,
})
}
originChannel, originChatID := resolvePipelineOrigin(args, "tool", "tool")
p, err := t.orc.CreatePipeline(label, objective, originChannel, originChatID, specs)
if err != nil {
return "", err
}
return fmt.Sprintf("Pipeline created: %s (%d tasks)\nUse spawn with pipeline_id/task_id to run tasks.\nUse pipeline_dispatch to dispatch ready tasks.",
p.ID, len(p.Tasks)), nil
}
type PipelineStatusTool struct {
orc *Orchestrator
}
func NewPipelineStatusTool(orc *Orchestrator) *PipelineStatusTool {
return &PipelineStatusTool{orc: orc}
}
func (t *PipelineStatusTool) Name() string { return "pipeline_status" }
func (t *PipelineStatusTool) Description() string {
return "Get pipeline status, tasks progress, and shared state. If pipeline_id is empty, list recent pipelines."
}
func (t *PipelineStatusTool) Parameters() map[string]interface{} {
return map[string]interface{}{
"type": "object",
"properties": map[string]interface{}{
"pipeline_id": map[string]interface{}{
"type": "string",
"description": "Pipeline ID",
},
},
}
}
func (t *PipelineStatusTool) Execute(_ context.Context, args map[string]interface{}) (string, error) {
if t.orc == nil {
return "", fmt.Errorf("orchestrator is not configured")
}
pipelineID, _ := args["pipeline_id"].(string)
pipelineID = strings.TrimSpace(pipelineID)
if pipelineID == "" {
items := t.orc.ListPipelines()
if len(items) == 0 {
return "No pipelines found.", nil
}
var sb strings.Builder
sb.WriteString("Pipelines:\n")
for _, p := range items {
sb.WriteString(fmt.Sprintf("- %s [%s] %s\n", p.ID, p.Status, p.Label))
}
return sb.String(), nil
}
return t.orc.SnapshotJSON(pipelineID)
}
type PipelineStateSetTool struct {
orc *Orchestrator
}
func NewPipelineStateSetTool(orc *Orchestrator) *PipelineStateSetTool {
return &PipelineStateSetTool{orc: orc}
}
func (t *PipelineStateSetTool) Name() string { return "pipeline_state_set" }
func (t *PipelineStateSetTool) Description() string {
return "Set shared state key/value for a pipeline, allowing sub-agents to share intermediate results."
}
func (t *PipelineStateSetTool) Parameters() map[string]interface{} {
return map[string]interface{}{
"type": "object",
"properties": map[string]interface{}{
"pipeline_id": map[string]interface{}{"type": "string"},
"key": map[string]interface{}{"type": "string"},
"value": map[string]interface{}{
"description": "Any JSON-serializable value",
},
},
"required": []string{"pipeline_id", "key", "value"},
}
}
func (t *PipelineStateSetTool) Execute(_ context.Context, args map[string]interface{}) (string, error) {
if t.orc == nil {
return "", fmt.Errorf("orchestrator is not configured")
}
pipelineID, _ := args["pipeline_id"].(string)
key, _ := args["key"].(string)
value, ok := args["value"]
if !ok {
return "", fmt.Errorf("value is required")
}
if err := t.orc.SetSharedState(strings.TrimSpace(pipelineID), strings.TrimSpace(key), value); err != nil {
return "", err
}
return fmt.Sprintf("Updated pipeline shared state: %s.%s", pipelineID, key), nil
}
type PipelineDispatchTool struct {
orc *Orchestrator
spawn *SubagentManager
}
func NewPipelineDispatchTool(orc *Orchestrator, spawn *SubagentManager) *PipelineDispatchTool {
return &PipelineDispatchTool{orc: orc, spawn: spawn}
}
func (t *PipelineDispatchTool) Name() string { return "pipeline_dispatch" }
func (t *PipelineDispatchTool) Description() string {
return "Dispatch all dependency-ready tasks in a pipeline by spawning subagents automatically."
}
func (t *PipelineDispatchTool) Parameters() map[string]interface{} {
return map[string]interface{}{
"type": "object",
"properties": map[string]interface{}{
"pipeline_id": map[string]interface{}{
"type": "string",
"description": "Pipeline ID",
},
"max_dispatch": map[string]interface{}{
"type": "integer",
"description": "Maximum number of tasks to dispatch in this call (default 3)",
"default": 3,
},
"channel": map[string]interface{}{
"type": "string",
"description": "Optional origin channel override for spawned subagents",
},
"chat_id": map[string]interface{}{
"type": "string",
"description": "Optional origin chat ID override for spawned subagents",
},
},
"required": []string{"pipeline_id"},
}
}
func (t *PipelineDispatchTool) Execute(ctx context.Context, args map[string]interface{}) (string, error) {
if t.orc == nil || t.spawn == nil {
return "", fmt.Errorf("pipeline dispatcher is not configured")
}
pipelineID, _ := args["pipeline_id"].(string)
pipelineID = strings.TrimSpace(pipelineID)
if pipelineID == "" {
return "", fmt.Errorf("pipeline_id is required")
}
maxDispatch := 3
if raw, ok := args["max_dispatch"].(float64); ok && raw > 0 {
maxDispatch = int(raw)
}
originChannel, originChatID := resolvePipelineOrigin(args, "", "")
if p, ok := t.orc.GetPipeline(pipelineID); ok && p != nil {
if strings.TrimSpace(originChannel) == "" {
originChannel = strings.TrimSpace(p.OriginChannel)
}
if strings.TrimSpace(originChatID) == "" {
originChatID = strings.TrimSpace(p.OriginChatID)
}
}
if strings.TrimSpace(originChannel) == "" {
originChannel = "tool"
}
if strings.TrimSpace(originChatID) == "" {
originChatID = "tool"
}
ready, err := t.orc.ReadyTasks(pipelineID)
if err != nil {
return "", err
}
if len(ready) == 0 {
return fmt.Sprintf("No ready tasks for pipeline %s", pipelineID), nil
}
dispatched := 0
var lines []string
for _, task := range ready {
if dispatched >= maxDispatch {
break
}
shared := map[string]interface{}{}
if p, ok := t.orc.GetPipeline(pipelineID); ok {
for k, v := range p.SharedState {
shared[k] = v
}
}
payload := task.Goal
if len(shared) > 0 {
sharedJSON, _ := json.Marshal(shared)
payload = fmt.Sprintf("%s\n\nShared State:\n%s", payload, string(sharedJSON))
}
label := task.ID
if task.Role != "" {
label = fmt.Sprintf("%s:%s", task.Role, task.ID)
}
agentID := strings.TrimSpace(task.Role)
if agentID == "" {
agentID = strings.TrimSpace(task.ID)
}
if _, err := t.spawn.Spawn(ctx, SubagentSpawnOptions{
Task: payload,
Label: label,
Role: task.Role,
AgentID: agentID,
OriginChannel: originChannel,
OriginChatID: originChatID,
PipelineID: pipelineID,
PipelineTask: task.ID,
}); err != nil {
lines = append(lines, fmt.Sprintf("- %s failed: %v", task.ID, err))
continue
}
dispatched++
lines = append(lines, fmt.Sprintf("- %s dispatched", task.ID))
}
if len(lines) == 0 {
return "No tasks dispatched.", nil
}
return fmt.Sprintf("Pipeline %s dispatch result:\n%s", pipelineID, strings.Join(lines, "\n")), nil
}
func resolvePipelineOrigin(args map[string]interface{}, defaultChannel, defaultChatID string) (string, string) {
originChannel, _ := args["channel"].(string)
originChatID, _ := args["chat_id"].(string)
originChannel = strings.TrimSpace(originChannel)
originChatID = strings.TrimSpace(originChatID)
if originChannel == "" {
originChannel = strings.TrimSpace(defaultChannel)
}
if originChatID == "" {
originChatID = strings.TrimSpace(defaultChatID)
}
return originChannel, originChatID
}