diff --git a/docs/master-subagent-development.md b/docs/master-subagent-development.md deleted file mode 100644 index b289abb..0000000 --- a/docs/master-subagent-development.md +++ /dev/null @@ -1,291 +0,0 @@ -# 主 Agent + 子 Agent 架构开发文档 - -## 1. 目标与范围 - -本文档定义在 ClawGo 中实现“单一主 Agent 对话入口 + 可动态创建子 Agent 执行任务”的开发方案。 - -目标: - -- 由主 Agent 统一接收用户对话并调度子 Agent。 -- 子 Agent 可按职责分工(如 coding、docs、testing)。 -- 子 Agent 具备各自独立记忆与会话上下文。 -- 所有 Agent 共享同一工作空间(文件系统)。 -- 支持主 Agent 在一次对话中创建、调用、监控子 Agent 并汇总结果。 - -非目标: - -- 不在本阶段引入独立进程/独立容器隔离。 -- 不在本阶段实现跨机器分布式调度。 - ---- - -## 2. 现状(基于当前代码) - -已具备能力: - -- `spawn` 工具可创建后台子任务。 -- `subagents` 工具可 list/info/kill/steer/resume。 -- `Orchestrator` + `pipeline_*` 具备任务依赖编排模型。 - -关键差距: - -- 目前子任务偏“临时任务实例”,缺少“长期角色子 Agent”概念。 -- `spawn.role` 参数未完整进入任务状态与调度决策。 -- 子任务会话键未与 agent/task 强绑定,存在上下文串线风险。 -- `pipeline_*` 工具已实现但尚未在主循环注册使用。 -- 记忆目前以工作区全局为主,缺少“子 Agent 命名空间隔离”。 - ---- - -## 3. 总体架构 - -### 3.1 角色分层 - -- 主 Agent(Orchestrator Agent) - - 职责:需求理解、任务拆分、分派、进度追踪、结果汇总、用户回复。 - - 不直接承载全部执行细节。 -- 子 Agent(Worker Agent) - - 职责:按角色执行具体任务(代码实现、文档编写、测试验证等)。 - - 具备独立的会话与记忆上下文。 - -### 3.2 共享与隔离策略 - -- 共享:workspace 文件系统、工具注册中心、pipeline shared_state。 -- 隔离:子 Agent 的 session history、长期记忆、每日记忆。 - -### 3.3 运行模型 - -- 主 Agent 发起任务后,创建/复用子 Agent profile。 -- 通过 `pipeline_create` 建立任务 DAG。 -- 通过 `pipeline_dispatch` 调度 dependency-ready 任务到子 Agent。 -- 子 Agent 完成后回填结果,主 Agent 汇总并回复用户。 - ---- - -## 4. 数据模型设计 - -## 4.1 SubagentProfile(新增,持久化) - -建议存储位置:`workspace/agents/profiles/.json` - -字段: - -- `agent_id`: 全局唯一 ID(如 `coder`, `writer`, `tester`)。 -- `name`: 展示名。 -- `role`: 角色(coding/docs/testing/research...)。 -- `system_prompt`: 角色系统提示词模板。 -- `tool_allowlist`: 可调用工具白名单。 -- `memory_namespace`: 记忆命名空间,默认同 `agent_id`。 -- `status`: `active|disabled`。 -- `created_at` / `updated_at`。 - -## 4.2 SubagentTask(扩展现有) - -文件:`pkg/tools/subagent.go` - -已有字段基础上强化: - -- `Role`:落库并展示。 -- `AgentID`:绑定具体子 Agent profile。 -- `SessionKey`:固定记录实际执行 session key。 -- `MemoryNamespace`:记录使用的记忆命名空间。 - -## 4.3 记忆目录结构(新增约定) - -- 主 Agent(保持现状): - - `workspace/MEMORY.md` - - `workspace/memory/YYYY-MM-DD.md` -- 子 Agent ``: - - `workspace/agents//MEMORY.md` - - `workspace/agents//memory/YYYY-MM-DD.md` - ---- - -## 5. 工具与接口设计 - -## 5.1 现有工具接入调整 - -- `spawn` - - 新增/强化参数:`agent_id`、`role`。 - - 优先级:`agent_id` > `role`(role 可映射默认 profile)。 -- `subagents` - - `list/info` 输出包含:`agent_id`、`role`、`session_key`、`memory_namespace`。 - -## 5.2 新增工具建议 - -- `subagent_profile` - - action: `create|get|list|update|disable|enable|delete` - - 用途:由主 Agent 在对话中动态创建/管理子 Agent。 -- `subagent_dispatch`(可选) - - 对单个 profile 下发任务,内部调用 spawn 并处理 profile 绑定。 - -## 5.3 Pipeline 工具注册 - -需在 `pkg/agent/loop.go` 中注册: - -- `pipeline_create` -- `pipeline_status` -- `pipeline_state_set` -- `pipeline_dispatch` - -使主 Agent 能直接调用现有编排能力。 - ---- - -## 6. 执行流程(主对话驱动) - -1. 用户下达复合目标(如“先实现功能,再生成文档”)。 -2. 主 Agent 判断是否已有对应 profile: - - 无则 `subagent_profile.create`。 - - 有则复用。 -3. 主 Agent 生成 pipeline 任务图(含依赖)。 -4. 主 Agent 调用 `pipeline_dispatch` 派发可运行任务。 -5. 子 Agent 执行并回填结果到任务状态/共享状态。 -6. 主 Agent 轮询 `pipeline_status`,直到完成或失败。 -7. 主 Agent 汇总结果,面向用户输出最终响应。 - ---- - -## 7. 会话与记忆隔离实现要点 - -## 7.1 Session Key 规范 - -禁止使用进程级固定 key。 - -建议格式: - -- `subagent::` - -效果: - -- 同一子 Agent 不同任务可隔离上下文。 -- 可追溯某次任务完整对话与工具链。 - -## 7.2 ContextBuilder 多命名空间支持 - -为 `MemoryStore` 增加 namespace 参数,按 namespace 读取不同路径: - -- namespace=`main` -> 当前全局路径(兼容)。 -- namespace=`` -> `workspace/agents//...` - -在子 Agent 执行入口按任务绑定 namespace 构建上下文。 - -## 7.3 Memory Tool 命名空间透传 - -`memory_search` / `memory_get` / `memory_write` 增加可选参数: - -- `namespace`(默认 `main`) - -子 Agent 默认注入自身 namespace,避免写入全局记忆。 - ---- - -## 8. 调度与并发控制 - -- 同会话调度仍受 `SessionScheduler` 约束。 -- pipeline 维度使用依赖图保证顺序正确。 -- 对写冲突资源建议引入 `resource_keys`(文件路径、模块名)串行化。 -- 子 Agent 失败后可支持: - - `resume` - - 自动重试(指数退避) - - 主 Agent 降级接管 - ---- - -## 9. 安全与治理 - -- 子 Agent 继承 `AGENTS.md` 安全策略。 -- 外部/破坏性动作必须经主 Agent 显式确认。 -- 子 Agent 工具权限建议最小化(allowlist)。 -- 审计日志需记录: - - 谁创建了哪个子 Agent - - 子 Agent 执行了什么任务 - - 任务结果和错误 - ---- - -## 10. 实施顺序(不含工期) - -### 阶段 A:打通主调度闭环 - -- 修正子任务 session key 规范化。 -- 将 `Role`、`AgentID` 写入任务结构与 `subagents` 输出。 -- 在主循环注册 `pipeline_*` 工具并验证端到端可调度。 - -### 阶段 B:子 Agent 记忆隔离 - -- 实现 `SubagentProfile` 持久化与管理工具。 -- `MemoryStore` 增加 namespace。 -- memory 系列工具支持 namespace。 - -### 阶段 C:稳定性与治理增强 - -- 增加资源冲突控制(resource keys)。 -- 增加失败重试、超时、配额与审计维度。 -- 增加 WebUI 可视化(profile/task/pipeline/memory namespace)。 - ---- - -## 13. 当前实现状态(代码已落地) - -### 13.1 已完成 - -- 主循环已注册: - - `subagent_profile` - - `pipeline_create` - - `pipeline_status` - - `pipeline_state_set` - - `pipeline_dispatch` -- `spawn` 已支持: - - `agent_id` - - `role` - - `agent_id > role` 解析优先级 - - 当仅提供 `role` 时,按 role 自动映射已存在 profile(最近更新优先) -- 子任务会话隔离: - - `session_key` 使用 `subagent::` -- 子任务记忆隔离: - - `memory namespace` 已接入 `ContextBuilder` 与 `memory_search/get/write` - - 子 Agent 运行时会自动注入其 namespace -- 子任务治理信息增强: - - `subagents list/info/log` 可显示 `agent_id/role/session_key/memory namespace/tool allowlist` -- 安全治理: - - `tool_allowlist` 已在执行期强制拦截 - - `parallel` 工具的内部子调用也会被白名单校验 - - `tool_allowlist` 已支持分组策略(`group:` / `@` / 组名直写) - - disabled profile 会阻止 `spawn` -- 稳定性治理: - - 子 Agent 已支持 profile 级重试/退避/超时/配额控制(`max_retries` / `retry_backoff_ms` / `timeout_sec` / `max_task_chars` / `max_result_chars`) - - 子任务元数据与 system 回填中包含重试/超时信息,便于审计追踪 -- WebUI 可视化: - - 已提供 subagent profile 管理页 - - 已提供 subagent runtime 列表/详情/控制页(spawn/kill/resume/steer) - - 已提供 pipeline 列表/详情/dispatch/创建入口 - -### 13.2 待继续增强 - -- (当前版本)无阻塞项;可继续按需增强: - - allowlist 分组支持自定义组配置(当前为内置组)。 - - pipeline / subagent 运行态持久化与历史回放(当前为进程内实时视图)。 - ---- - -## 11. 验收标准 - -- 用户可通过自然语言要求主 Agent 创建指定角色子 Agent。 -- 主 Agent 可在一次对话内完成“创建 -> 派发 -> 监控 -> 汇总”。 -- 子 Agent 记忆互不污染,主 Agent 与子 Agent 记忆边界清晰。 -- 同一 workspace 下可稳定并发执行多子任务,无明显上下文串线。 -- 任务失败可追踪、可恢复,且审计信息完整。 - ---- - -## 12. 与现有代码的对应关系 - -- 子任务管理:`pkg/tools/subagent.go` -- 子任务工具:`pkg/tools/subagents_tool.go` -- 子任务创建:`pkg/tools/spawn.go` -- 流水线编排:`pkg/tools/orchestrator.go` -- 流水线工具:`pkg/tools/pipeline_tools.go` -- 主循环工具注册:`pkg/agent/loop.go` -- 上下文与记忆:`pkg/agent/context.go`, `pkg/agent/memory.go` -- memory 工具:`pkg/tools/memory*.go` diff --git a/pkg/agent/loop_tool_context_test.go b/pkg/agent/loop_tool_context_test.go deleted file mode 100644 index 851f516..0000000 --- a/pkg/agent/loop_tool_context_test.go +++ /dev/null @@ -1,44 +0,0 @@ -package agent - -import "testing" - -func TestWithToolContextArgsPipelineAutoInject(t *testing.T) { - args := map[string]interface{}{ - "objective": "build feature", - } - got := withToolContextArgs("pipeline_create", args, "telegram", "123") - if got["channel"] != "telegram" { - t.Fatalf("expected channel auto-injected for pipeline_create, got %v", got["channel"]) - } - if got["chat_id"] != "123" { - t.Fatalf("expected chat_id auto-injected for pipeline_create, got %v", got["chat_id"]) - } -} - -func TestWithToolContextArgsPipelineRespectsExplicitTarget(t *testing.T) { - args := map[string]interface{}{ - "pipeline_id": "p-1", - "channel": "webui", - "chat_id": "panel", - } - got := withToolContextArgs("pipeline_dispatch", args, "telegram", "123") - if got["channel"] != "webui" { - t.Fatalf("expected explicit channel preserved, got %v", got["channel"]) - } - if got["chat_id"] != "panel" { - t.Fatalf("expected explicit chat_id preserved, got %v", got["chat_id"]) - } -} - -func TestWithToolContextArgsIgnoresUnrelatedTools(t *testing.T) { - args := map[string]interface{}{ - "path": "README.md", - } - got := withToolContextArgs("read_file", args, "telegram", "123") - if _, ok := got["channel"]; ok { - t.Fatalf("did not expect channel for unrelated tool") - } - if _, ok := got["chat_id"]; ok { - t.Fatalf("did not expect chat_id for unrelated tool") - } -} diff --git a/pkg/tools/orchestrator.go b/pkg/tools/orchestrator.go deleted file mode 100644 index de9c032..0000000 --- a/pkg/tools/orchestrator.go +++ /dev/null @@ -1,340 +0,0 @@ -package tools - -import ( - "encoding/json" - "fmt" - "sort" - "strings" - "sync" - "time" -) - -type PipelineStatus string - -const ( - PipelinePending PipelineStatus = "pending" - PipelineRunning PipelineStatus = "running" - PipelineCompleted PipelineStatus = "completed" - PipelineFailed PipelineStatus = "failed" -) - -type TaskStatus string - -const ( - TaskPending TaskStatus = "pending" - TaskRunning TaskStatus = "running" - TaskCompleted TaskStatus = "completed" - TaskFailed TaskStatus = "failed" -) - -type PipelineTask struct { - ID string `json:"id"` - Role string `json:"role"` - Goal string `json:"goal"` - DependsOn []string `json:"depends_on,omitempty"` - Status TaskStatus `json:"status"` - Result string `json:"result,omitempty"` - Error string `json:"error,omitempty"` - CreatedAt int64 `json:"created_at"` - UpdatedAt int64 `json:"updated_at"` -} - -type Pipeline struct { - ID string `json:"id"` - Label string `json:"label"` - Objective string `json:"objective"` - Status PipelineStatus `json:"status"` - OriginChannel string `json:"origin_channel"` - OriginChatID string `json:"origin_chat_id"` - SharedState map[string]interface{} `json:"shared_state"` - Tasks map[string]*PipelineTask - CreatedAt int64 `json:"created_at"` - UpdatedAt int64 `json:"updated_at"` -} - -type PipelineSpec struct { - ID string `json:"id"` - Role string `json:"role"` - Goal string `json:"goal"` - DependsOn []string `json:"depends_on,omitempty"` -} - -type Orchestrator struct { - mu sync.RWMutex - pipelines map[string]*Pipeline - nextID int -} - -func NewOrchestrator() *Orchestrator { - return &Orchestrator{ - pipelines: make(map[string]*Pipeline), - nextID: 1, - } -} - -func (o *Orchestrator) CreatePipeline(label, objective, originChannel, originChatID string, tasks []PipelineSpec) (*Pipeline, error) { - o.mu.Lock() - defer o.mu.Unlock() - - if strings.TrimSpace(objective) == "" { - return nil, fmt.Errorf("objective is required") - } - if len(tasks) == 0 { - return nil, fmt.Errorf("at least one task is required") - } - - id := fmt.Sprintf("pipe-%d", o.nextID) - o.nextID++ - - now := time.Now().UnixMilli() - p := &Pipeline{ - ID: id, - Label: strings.TrimSpace(label), - Objective: strings.TrimSpace(objective), - Status: PipelinePending, - OriginChannel: originChannel, - OriginChatID: originChatID, - SharedState: make(map[string]interface{}), - Tasks: make(map[string]*PipelineTask, len(tasks)), - CreatedAt: now, - UpdatedAt: now, - } - - for _, task := range tasks { - taskID := strings.TrimSpace(task.ID) - if taskID == "" { - return nil, fmt.Errorf("task id is required") - } - if _, exists := p.Tasks[taskID]; exists { - return nil, fmt.Errorf("duplicate task id: %s", taskID) - } - - p.Tasks[taskID] = &PipelineTask{ - ID: taskID, - Role: strings.TrimSpace(task.Role), - Goal: strings.TrimSpace(task.Goal), - DependsOn: normalizeDepends(task.DependsOn), - Status: TaskPending, - CreatedAt: now, - UpdatedAt: now, - } - } - - for taskID, task := range p.Tasks { - for _, dep := range task.DependsOn { - if dep == taskID { - return nil, fmt.Errorf("task %s cannot depend on itself", taskID) - } - if _, exists := p.Tasks[dep]; !exists { - return nil, fmt.Errorf("task %s depends on missing task %s", taskID, dep) - } - } - } - - o.pipelines[p.ID] = p - return clonePipeline(p), nil -} - -func (o *Orchestrator) MarkTaskRunning(pipelineID, taskID string) error { - o.mu.Lock() - defer o.mu.Unlock() - - p, t, err := o.getTaskLocked(pipelineID, taskID) - if err != nil { - return err - } - if t.Status == TaskCompleted || t.Status == TaskFailed { - return nil - } - - t.Status = TaskRunning - t.UpdatedAt = time.Now().UnixMilli() - p.Status = PipelineRunning - p.UpdatedAt = t.UpdatedAt - return nil -} - -func (o *Orchestrator) MarkTaskDone(pipelineID, taskID, result string, runErr error) error { - o.mu.Lock() - defer o.mu.Unlock() - - p, t, err := o.getTaskLocked(pipelineID, taskID) - if err != nil { - return err - } - - now := time.Now().UnixMilli() - t.UpdatedAt = now - t.Result = strings.TrimSpace(result) - if runErr != nil { - t.Status = TaskFailed - t.Error = runErr.Error() - p.Status = PipelineFailed - } else { - t.Status = TaskCompleted - t.Error = "" - p.Status = o.computePipelineStatusLocked(p) - } - p.UpdatedAt = now - return nil -} - -func (o *Orchestrator) SetSharedState(pipelineID, key string, value interface{}) error { - o.mu.Lock() - defer o.mu.Unlock() - - p, ok := o.pipelines[pipelineID] - if !ok { - return fmt.Errorf("pipeline not found: %s", pipelineID) - } - k := strings.TrimSpace(key) - if k == "" { - return fmt.Errorf("state key is required") - } - p.SharedState[k] = value - p.UpdatedAt = time.Now().UnixMilli() - return nil -} - -func (o *Orchestrator) GetPipeline(pipelineID string) (*Pipeline, bool) { - o.mu.RLock() - defer o.mu.RUnlock() - - p, ok := o.pipelines[pipelineID] - if !ok { - return nil, false - } - return clonePipeline(p), true -} - -func (o *Orchestrator) ListPipelines() []*Pipeline { - o.mu.RLock() - defer o.mu.RUnlock() - - items := make([]*Pipeline, 0, len(o.pipelines)) - for _, p := range o.pipelines { - items = append(items, clonePipeline(p)) - } - - sort.Slice(items, func(i, j int) bool { - return items[i].CreatedAt > items[j].CreatedAt - }) - return items -} - -func (o *Orchestrator) ReadyTasks(pipelineID string) ([]*PipelineTask, error) { - o.mu.RLock() - defer o.mu.RUnlock() - - p, ok := o.pipelines[pipelineID] - if !ok { - return nil, fmt.Errorf("pipeline not found: %s", pipelineID) - } - - var ready []*PipelineTask - for _, t := range p.Tasks { - if t.Status != TaskPending { - continue - } - if depsDone(p, t.DependsOn) { - ready = append(ready, cloneTask(t)) - } - } - sort.Slice(ready, func(i, j int) bool { return ready[i].ID < ready[j].ID }) - return ready, nil -} - -func (o *Orchestrator) SnapshotJSON(pipelineID string) (string, error) { - p, ok := o.GetPipeline(pipelineID) - if !ok { - return "", fmt.Errorf("pipeline not found: %s", pipelineID) - } - data, err := json.MarshalIndent(p, "", " ") - if err != nil { - return "", err - } - return string(data), nil -} - -func (o *Orchestrator) getTaskLocked(pipelineID, taskID string) (*Pipeline, *PipelineTask, error) { - p, ok := o.pipelines[pipelineID] - if !ok { - return nil, nil, fmt.Errorf("pipeline not found: %s", pipelineID) - } - t, ok := p.Tasks[taskID] - if !ok { - return nil, nil, fmt.Errorf("task %s not found in pipeline %s", taskID, pipelineID) - } - return p, t, nil -} - -func (o *Orchestrator) computePipelineStatusLocked(p *Pipeline) PipelineStatus { - allDone := true - for _, t := range p.Tasks { - if t.Status == TaskFailed { - return PipelineFailed - } - if t.Status != TaskCompleted { - allDone = false - } - } - if allDone { - return PipelineCompleted - } - return PipelineRunning -} - -func depsDone(p *Pipeline, dependsOn []string) bool { - for _, dep := range dependsOn { - t, ok := p.Tasks[dep] - if !ok || t.Status != TaskCompleted { - return false - } - } - return true -} - -func normalizeDepends(in []string) []string { - uniq := make(map[string]struct{}) - out := make([]string, 0, len(in)) - for _, item := range in { - v := strings.TrimSpace(item) - if v == "" { - continue - } - if _, ok := uniq[v]; ok { - continue - } - uniq[v] = struct{}{} - out = append(out, v) - } - sort.Strings(out) - return out -} - -func cloneTask(in *PipelineTask) *PipelineTask { - if in == nil { - return nil - } - deps := make([]string, len(in.DependsOn)) - copy(deps, in.DependsOn) - out := *in - out.DependsOn = deps - return &out -} - -func clonePipeline(in *Pipeline) *Pipeline { - if in == nil { - return nil - } - out := *in - out.SharedState = make(map[string]interface{}, len(in.SharedState)) - for k, v := range in.SharedState { - out.SharedState[k] = v - } - out.Tasks = make(map[string]*PipelineTask, len(in.Tasks)) - for id, t := range in.Tasks { - out.Tasks[id] = cloneTask(t) - } - return &out -} diff --git a/pkg/tools/pipeline_tools.go b/pkg/tools/pipeline_tools.go deleted file mode 100644 index 5b3e327..0000000 --- a/pkg/tools/pipeline_tools.go +++ /dev/null @@ -1,354 +0,0 @@ -package tools - -import ( - "context" - "encoding/json" - "fmt" - "strings" -) - -type PipelineCreateTool struct { - orc *Orchestrator -} - -func NewPipelineCreateTool(orc *Orchestrator) *PipelineCreateTool { - return &PipelineCreateTool{orc: orc} -} - -func (t *PipelineCreateTool) Name() string { return "pipeline_create" } - -func (t *PipelineCreateTool) Description() string { - return "Create a multi-agent pipeline with standardized task protocol (role/goal/dependencies/shared state)." -} - -func (t *PipelineCreateTool) Parameters() map[string]interface{} { - return map[string]interface{}{ - "type": "object", - "properties": map[string]interface{}{ - "label": map[string]interface{}{ - "type": "string", - "description": "Optional short pipeline label", - }, - "objective": map[string]interface{}{ - "type": "string", - "description": "Top-level objective for this pipeline", - }, - "tasks": map[string]interface{}{ - "type": "array", - "description": "Task list with id/role/goal/depends_on", - "items": map[string]interface{}{ - "type": "object", - "properties": map[string]interface{}{ - "id": map[string]interface{}{"type": "string"}, - "role": map[string]interface{}{ - "type": "string", - "description": "Agent role, e.g. research/coding/testing", - }, - "goal": map[string]interface{}{"type": "string"}, - "depends_on": map[string]interface{}{ - "type": "array", - "items": map[string]interface{}{"type": "string"}, - }, - }, - "required": []string{"id", "goal"}, - }, - }, - "channel": map[string]interface{}{ - "type": "string", - "description": "Optional origin channel for completion notifications (auto-injected in normal chat flow)", - }, - "chat_id": map[string]interface{}{ - "type": "string", - "description": "Optional origin chat ID for completion notifications (auto-injected in normal chat flow)", - }, - }, - "required": []string{"objective", "tasks"}, - } -} - -func (t *PipelineCreateTool) Execute(_ context.Context, args map[string]interface{}) (string, error) { - if t.orc == nil { - return "", fmt.Errorf("orchestrator is not configured") - } - - objective, _ := args["objective"].(string) - label, _ := args["label"].(string) - - rawTasks, ok := args["tasks"].([]interface{}) - if !ok || len(rawTasks) == 0 { - return "", fmt.Errorf("tasks is required") - } - - specs := make([]PipelineSpec, 0, len(rawTasks)) - for i, item := range rawTasks { - m, ok := item.(map[string]interface{}) - if !ok { - return "", fmt.Errorf("tasks[%d] must be object", i) - } - id, _ := m["id"].(string) - role, _ := m["role"].(string) - goal, _ := m["goal"].(string) - - deps := make([]string, 0) - if rawDeps, ok := m["depends_on"].([]interface{}); ok { - for _, dep := range rawDeps { - if depS, ok := dep.(string); ok { - deps = append(deps, depS) - } - } - } - specs = append(specs, PipelineSpec{ - ID: id, - Role: role, - Goal: goal, - DependsOn: deps, - }) - } - - originChannel, originChatID := resolvePipelineOrigin(args, "tool", "tool") - p, err := t.orc.CreatePipeline(label, objective, originChannel, originChatID, specs) - if err != nil { - return "", err - } - - return fmt.Sprintf("Pipeline created: %s (%d tasks)\nUse spawn with pipeline_id/task_id to run tasks.\nUse pipeline_dispatch to dispatch ready tasks.", - p.ID, len(p.Tasks)), nil -} - -type PipelineStatusTool struct { - orc *Orchestrator -} - -func NewPipelineStatusTool(orc *Orchestrator) *PipelineStatusTool { - return &PipelineStatusTool{orc: orc} -} - -func (t *PipelineStatusTool) Name() string { return "pipeline_status" } - -func (t *PipelineStatusTool) Description() string { - return "Get pipeline status, tasks progress, and shared state. If pipeline_id is empty, list recent pipelines." -} - -func (t *PipelineStatusTool) Parameters() map[string]interface{} { - return map[string]interface{}{ - "type": "object", - "properties": map[string]interface{}{ - "pipeline_id": map[string]interface{}{ - "type": "string", - "description": "Pipeline ID", - }, - }, - } -} - -func (t *PipelineStatusTool) Execute(_ context.Context, args map[string]interface{}) (string, error) { - if t.orc == nil { - return "", fmt.Errorf("orchestrator is not configured") - } - pipelineID, _ := args["pipeline_id"].(string) - pipelineID = strings.TrimSpace(pipelineID) - - if pipelineID == "" { - items := t.orc.ListPipelines() - if len(items) == 0 { - return "No pipelines found.", nil - } - var sb strings.Builder - sb.WriteString("Pipelines:\n") - for _, p := range items { - sb.WriteString(fmt.Sprintf("- %s [%s] %s\n", p.ID, p.Status, p.Label)) - } - return sb.String(), nil - } - - return t.orc.SnapshotJSON(pipelineID) -} - -type PipelineStateSetTool struct { - orc *Orchestrator -} - -func NewPipelineStateSetTool(orc *Orchestrator) *PipelineStateSetTool { - return &PipelineStateSetTool{orc: orc} -} - -func (t *PipelineStateSetTool) Name() string { return "pipeline_state_set" } - -func (t *PipelineStateSetTool) Description() string { - return "Set shared state key/value for a pipeline, allowing sub-agents to share intermediate results." -} - -func (t *PipelineStateSetTool) Parameters() map[string]interface{} { - return map[string]interface{}{ - "type": "object", - "properties": map[string]interface{}{ - "pipeline_id": map[string]interface{}{"type": "string"}, - "key": map[string]interface{}{"type": "string"}, - "value": map[string]interface{}{ - "description": "Any JSON-serializable value", - }, - }, - "required": []string{"pipeline_id", "key", "value"}, - } -} - -func (t *PipelineStateSetTool) Execute(_ context.Context, args map[string]interface{}) (string, error) { - if t.orc == nil { - return "", fmt.Errorf("orchestrator is not configured") - } - pipelineID, _ := args["pipeline_id"].(string) - key, _ := args["key"].(string) - value, ok := args["value"] - if !ok { - return "", fmt.Errorf("value is required") - } - if err := t.orc.SetSharedState(strings.TrimSpace(pipelineID), strings.TrimSpace(key), value); err != nil { - return "", err - } - return fmt.Sprintf("Updated pipeline shared state: %s.%s", pipelineID, key), nil -} - -type PipelineDispatchTool struct { - orc *Orchestrator - spawn *SubagentManager -} - -func NewPipelineDispatchTool(orc *Orchestrator, spawn *SubagentManager) *PipelineDispatchTool { - return &PipelineDispatchTool{orc: orc, spawn: spawn} -} - -func (t *PipelineDispatchTool) Name() string { return "pipeline_dispatch" } - -func (t *PipelineDispatchTool) Description() string { - return "Dispatch all dependency-ready tasks in a pipeline by spawning subagents automatically." -} - -func (t *PipelineDispatchTool) Parameters() map[string]interface{} { - return map[string]interface{}{ - "type": "object", - "properties": map[string]interface{}{ - "pipeline_id": map[string]interface{}{ - "type": "string", - "description": "Pipeline ID", - }, - "max_dispatch": map[string]interface{}{ - "type": "integer", - "description": "Maximum number of tasks to dispatch in this call (default 3)", - "default": 3, - }, - "channel": map[string]interface{}{ - "type": "string", - "description": "Optional origin channel override for spawned subagents", - }, - "chat_id": map[string]interface{}{ - "type": "string", - "description": "Optional origin chat ID override for spawned subagents", - }, - }, - "required": []string{"pipeline_id"}, - } -} - -func (t *PipelineDispatchTool) Execute(ctx context.Context, args map[string]interface{}) (string, error) { - if t.orc == nil || t.spawn == nil { - return "", fmt.Errorf("pipeline dispatcher is not configured") - } - - pipelineID, _ := args["pipeline_id"].(string) - pipelineID = strings.TrimSpace(pipelineID) - if pipelineID == "" { - return "", fmt.Errorf("pipeline_id is required") - } - - maxDispatch := 3 - if raw, ok := args["max_dispatch"].(float64); ok && raw > 0 { - maxDispatch = int(raw) - } - originChannel, originChatID := resolvePipelineOrigin(args, "", "") - if p, ok := t.orc.GetPipeline(pipelineID); ok && p != nil { - if strings.TrimSpace(originChannel) == "" { - originChannel = strings.TrimSpace(p.OriginChannel) - } - if strings.TrimSpace(originChatID) == "" { - originChatID = strings.TrimSpace(p.OriginChatID) - } - } - if strings.TrimSpace(originChannel) == "" { - originChannel = "tool" - } - if strings.TrimSpace(originChatID) == "" { - originChatID = "tool" - } - - ready, err := t.orc.ReadyTasks(pipelineID) - if err != nil { - return "", err - } - if len(ready) == 0 { - return fmt.Sprintf("No ready tasks for pipeline %s", pipelineID), nil - } - - dispatched := 0 - var lines []string - - for _, task := range ready { - if dispatched >= maxDispatch { - break - } - shared := map[string]interface{}{} - if p, ok := t.orc.GetPipeline(pipelineID); ok { - for k, v := range p.SharedState { - shared[k] = v - } - } - - payload := task.Goal - if len(shared) > 0 { - sharedJSON, _ := json.Marshal(shared) - payload = fmt.Sprintf("%s\n\nShared State:\n%s", payload, string(sharedJSON)) - } - - label := task.ID - if task.Role != "" { - label = fmt.Sprintf("%s:%s", task.Role, task.ID) - } - agentID := strings.TrimSpace(task.Role) - if agentID == "" { - agentID = strings.TrimSpace(task.ID) - } - if _, err := t.spawn.Spawn(ctx, SubagentSpawnOptions{ - Task: payload, - Label: label, - Role: task.Role, - AgentID: agentID, - OriginChannel: originChannel, - OriginChatID: originChatID, - PipelineID: pipelineID, - PipelineTask: task.ID, - }); err != nil { - lines = append(lines, fmt.Sprintf("- %s failed: %v", task.ID, err)) - continue - } - dispatched++ - lines = append(lines, fmt.Sprintf("- %s dispatched", task.ID)) - } - - if len(lines) == 0 { - return "No tasks dispatched.", nil - } - return fmt.Sprintf("Pipeline %s dispatch result:\n%s", pipelineID, strings.Join(lines, "\n")), nil -} - -func resolvePipelineOrigin(args map[string]interface{}, defaultChannel, defaultChatID string) (string, string) { - originChannel, _ := args["channel"].(string) - originChatID, _ := args["chat_id"].(string) - originChannel = strings.TrimSpace(originChannel) - originChatID = strings.TrimSpace(originChatID) - if originChannel == "" { - originChannel = strings.TrimSpace(defaultChannel) - } - if originChatID == "" { - originChatID = strings.TrimSpace(defaultChatID) - } - return originChannel, originChatID -} diff --git a/webui/src/pages/Pipelines.tsx b/webui/src/pages/Pipelines.tsx deleted file mode 100644 index 43e22f0..0000000 --- a/webui/src/pages/Pipelines.tsx +++ /dev/null @@ -1,209 +0,0 @@ -import React, { useEffect, useMemo, useState } from 'react'; -import { useTranslation } from 'react-i18next'; -import { useAppContext } from '../context/AppContext'; -import { useUI } from '../context/UIContext'; - -type PipelineTask = { - id: string; - role?: string; - goal?: string; - status?: string; - depends_on?: string[]; - result?: string; - error?: string; -}; - -type Pipeline = { - id: string; - label?: string; - objective?: string; - status?: string; - tasks?: Record; -}; - -const Pipelines: React.FC = () => { - const { t } = useTranslation(); - const { q } = useAppContext(); - const ui = useUI(); - - const [items, setItems] = useState([]); - const [selectedID, setSelectedID] = useState(''); - const [detail, setDetail] = useState(null); - const [maxDispatch, setMaxDispatch] = useState(3); - const [createLabel, setCreateLabel] = useState(''); - const [createObjective, setCreateObjective] = useState(''); - const [tasksJSON, setTasksJSON] = useState('[\n {"id":"coding","role":"coding","goal":"Implement feature"},\n {"id":"docs","role":"docs","goal":"Write docs","depends_on":["coding"]}\n]'); - - const apiPath = '/webui/api/pipelines'; - const withAction = (action: string) => `${apiPath}${q}${q ? '&' : '?'}action=${encodeURIComponent(action)}`; - - const callAction = async (payload: Record) => { - const r = await fetch(`${apiPath}${q}`, { - method: 'POST', - headers: { 'Content-Type': 'application/json' }, - body: JSON.stringify(payload), - }); - if (!r.ok) { - await ui.notify({ title: t('requestFailed'), message: await r.text() }); - return null; - } - return r.json(); - }; - - const loadList = async () => { - const r = await fetch(withAction('list')); - if (!r.ok) throw new Error(await r.text()); - const j = await r.json(); - const arr = Array.isArray(j?.result?.items) ? j.result.items : []; - setItems(arr); - if (arr.length === 0) { - setSelectedID(''); - setDetail(null); - return; - } - const id = selectedID && arr.find((x: Pipeline) => x.id === selectedID) ? selectedID : arr[0].id; - setSelectedID(id); - }; - - const loadDetail = async (pipelineID: string) => { - if (!pipelineID) { - setDetail(null); - return; - } - const u = `${withAction('get')}&pipeline_id=${encodeURIComponent(pipelineID)}`; - const r = await fetch(u); - if (!r.ok) throw new Error(await r.text()); - const j = await r.json(); - setDetail(j?.result?.pipeline || null); - }; - - useEffect(() => { - loadList().catch(() => {}); - }, [q]); - - useEffect(() => { - if (!selectedID) { - setDetail(null); - return; - } - loadDetail(selectedID).catch(() => {}); - }, [selectedID, q]); - - const sortedTasks = useMemo(() => { - const entries = Object.values(detail?.tasks || {}); - entries.sort((a, b) => (a.id || '').localeCompare(b.id || '')); - return entries; - }, [detail]); - - const dispatch = async () => { - if (!detail?.id) return; - const data = await callAction({ action: 'dispatch', pipeline_id: detail.id, max_dispatch: maxDispatch }); - if (!data) return; - await loadList(); - await loadDetail(detail.id); - }; - - const createPipeline = async () => { - if (!createObjective.trim()) { - await ui.notify({ title: t('requestFailed'), message: 'objective is required' }); - return; - } - let tasks: any[] = []; - try { - const parsed = JSON.parse(tasksJSON); - tasks = Array.isArray(parsed) ? parsed : []; - } catch { - await ui.notify({ title: t('requestFailed'), message: 'tasks JSON parse failed' }); - return; - } - const data = await callAction({ - action: 'create', - label: createLabel, - objective: createObjective, - tasks, - }); - if (!data) return; - setCreateLabel(''); - setCreateObjective(''); - await loadList(); - }; - - return ( -
-
-

{t('pipelines')}

- -
- -
-
-
{t('pipelines')}
-
- {items.map((it) => ( - - ))} - {items.length === 0 &&
No pipelines.
} -
-
- -
-
-
{t('pipelineDetail')}
- {!detail &&
{t('selectTask')}
} - {detail && ( - <> -
-
ID: {detail.id}
-
Status: {detail.status}
-
Label: {detail.label || '-'}
-
-
Objective
-
{detail.objective || '-'}
- -
- setMaxDispatch(Math.max(1, Number(e.target.value) || 1))} - className="w-24 px-2 py-1 text-xs bg-zinc-900 border border-zinc-700 rounded" - /> - -
- -
Tasks
-
- {sortedTasks.map((task) => ( -
-
{task.id} · {task.status}
-
{task.role || '-'} · deps: {(task.depends_on || []).join(', ') || '-'}
-
{task.goal || '-'}
- {task.error &&
{task.error}
} -
- ))} -
- - )} -
- -
-
{t('createPipeline')}
- setCreateLabel(e.target.value)} placeholder="label" className="w-full px-2 py-1 text-xs bg-zinc-900 border border-zinc-700 rounded" /> - setCreateObjective(e.target.value)} placeholder="objective" className="w-full px-2 py-1 text-xs bg-zinc-900 border border-zinc-700 rounded" /> -