From 1218d68b7ec171aaa42482783a6e823698a6caf6 Mon Sep 17 00:00:00 2001 From: lpf Date: Sat, 7 Mar 2026 11:52:36 +0800 Subject: [PATCH] Add internal subagent stream and notify policy --- pkg/agent/runtime_admin.go | 74 ++++++++ pkg/agent/runtime_admin_test.go | 73 ++++++++ pkg/config/config.go | 1 + pkg/config/validate.go | 7 + pkg/config/validate_test.go | 18 ++ pkg/tools/subagent.go | 199 +++++++++++++++++---- pkg/tools/subagent_config_manager.go | 4 + pkg/tools/subagent_config_tool_test.go | 4 + pkg/tools/subagent_profile.go | 8 + pkg/tools/subagent_runtime_control_test.go | 151 +++++++++++++++- webui/src/pages/SubagentProfiles.tsx | 19 ++ webui/src/pages/Subagents.tsx | 124 ++++++++++++- 12 files changed, 641 insertions(+), 41 deletions(-) diff --git a/pkg/agent/runtime_admin.go b/pkg/agent/runtime_admin.go index 5c7eb33..cbc71e8 100644 --- a/pkg/agent/runtime_admin.go +++ b/pkg/agent/runtime_admin.go @@ -131,6 +131,7 @@ func (al *AgentLoop) HandleSubagentRuntime(ctx context.Context, action string, a "transport": fallbackString(strings.TrimSpace(subcfg.Transport), "local"), "node_id": strings.TrimSpace(subcfg.NodeID), "parent_agent_id": strings.TrimSpace(subcfg.ParentAgentID), + "notify_main_policy": fallbackString(strings.TrimSpace(subcfg.NotifyMainPolicy), "final_only"), "display_name": subcfg.DisplayName, "role": subcfg.Role, "description": subcfg.Description, @@ -157,6 +158,7 @@ func (al *AgentLoop) HandleSubagentRuntime(ctx context.Context, action string, a "transport": profile.Transport, "node_id": profile.NodeID, "parent_agent_id": profile.ParentAgentID, + "notify_main_policy": fallbackString(strings.TrimSpace(profile.NotifyMainPolicy), "final_only"), "display_name": profile.Name, "role": profile.Role, "description": "Node-registered remote main agent branch", @@ -360,6 +362,37 @@ func (al *AgentLoop) HandleSubagentRuntime(ctx context.Context, action string, a return nil, err } return map[string]interface{}{"found": true, "thread": thread, "messages": items}, nil + case "stream": + taskID, err := resolveSubagentTaskIDForRuntime(sm, runtimeStringArg(args, "id")) + if err != nil { + return nil, err + } + task, ok := sm.GetTask(taskID) + if !ok { + return map[string]interface{}{"found": false}, nil + } + events, err := sm.Events(taskID, runtimeIntArg(args, "limit", 100)) + if err != nil { + return nil, err + } + var thread *tools.AgentThread + var messages []tools.AgentMessage + if strings.TrimSpace(task.ThreadID) != "" { + if th, ok := sm.Thread(task.ThreadID); ok { + thread = th + } + messages, err = sm.ThreadMessages(task.ThreadID, runtimeIntArg(args, "limit", 100)) + if err != nil { + return nil, err + } + } + stream := mergeSubagentStream(events, messages) + return map[string]interface{}{ + "found": true, + "task": cloneSubagentTask(task), + "thread": thread, + "items": stream, + }, nil case "inbox": agentID := runtimeStringArg(args, "agent_id") if agentID == "" { @@ -386,6 +419,47 @@ func (al *AgentLoop) HandleSubagentRuntime(ctx context.Context, action string, a } } +func mergeSubagentStream(events []tools.SubagentRunEvent, messages []tools.AgentMessage) []map[string]interface{} { + items := make([]map[string]interface{}, 0, len(events)+len(messages)) + for _, evt := range events { + items = append(items, map[string]interface{}{ + "kind": "event", + "at": evt.At, + "run_id": evt.RunID, + "agent_id": evt.AgentID, + "event_type": evt.Type, + "status": evt.Status, + "message": evt.Message, + "retry_count": evt.RetryCount, + }) + } + for _, msg := range messages { + items = append(items, map[string]interface{}{ + "kind": "message", + "at": msg.CreatedAt, + "message_id": msg.MessageID, + "thread_id": msg.ThreadID, + "from_agent": msg.FromAgent, + "to_agent": msg.ToAgent, + "reply_to": msg.ReplyTo, + "correlation_id": msg.CorrelationID, + "message_type": msg.Type, + "content": msg.Content, + "status": msg.Status, + "requires_reply": msg.RequiresReply, + }) + } + sort.Slice(items, func(i, j int) bool { + left, _ := items[i]["at"].(int64) + right, _ := items[j]["at"].(int64) + if left != right { + return left < right + } + return fmt.Sprintf("%v", items[i]["kind"]) < fmt.Sprintf("%v", items[j]["kind"]) + }) + return items +} + func cloneSubagentTask(in *tools.SubagentTask) *tools.SubagentTask { if in == nil { return nil diff --git a/pkg/agent/runtime_admin_test.go b/pkg/agent/runtime_admin_test.go index c59893b..7a486a8 100644 --- a/pkg/agent/runtime_admin_test.go +++ b/pkg/agent/runtime_admin_test.go @@ -5,6 +5,7 @@ import ( "os" "path/filepath" "testing" + "time" "clawgo/pkg/config" "clawgo/pkg/runtimecfg" @@ -75,6 +76,7 @@ func TestHandleSubagentRuntimeUpsertConfigSubagent(t *testing.T) { out, err := loop.HandleSubagentRuntime(context.Background(), "upsert_config_subagent", map[string]interface{}{ "agent_id": "reviewer", "role": "testing", + "notify_main_policy": "internal_only", "display_name": "Review Agent", "system_prompt": "review changes", "system_prompt_file": "agents/reviewer/AGENT.md", @@ -99,6 +101,9 @@ func TestHandleSubagentRuntimeUpsertConfigSubagent(t *testing.T) { if subcfg.SystemPromptFile != "agents/reviewer/AGENT.md" { t.Fatalf("expected system_prompt_file to persist, got %+v", subcfg) } + if subcfg.NotifyMainPolicy != "internal_only" { + t.Fatalf("expected notify_main_policy to persist, got %+v", subcfg) + } if len(reloaded.Agents.Router.Rules) == 0 { t.Fatalf("expected router rules to be persisted") } @@ -316,3 +321,71 @@ func TestHandleSubagentRuntimeProtectsMainAgent(t *testing.T) { t.Fatalf("expected deleting main agent to fail") } } + +func TestHandleSubagentRuntimeStream(t *testing.T) { + workspace := t.TempDir() + manager := tools.NewSubagentManager(nil, workspace, nil) + manager.SetRunFunc(func(ctx context.Context, task *tools.SubagentTask) (string, error) { + return "stream-result", nil + }) + loop := &AgentLoop{ + workspace: workspace, + subagentManager: manager, + subagentRouter: tools.NewSubagentRouter(manager), + } + + out, err := loop.HandleSubagentRuntime(context.Background(), "spawn", map[string]interface{}{ + "task": "prepare streamable task", + "agent_id": "coder", + "channel": "webui", + "chat_id": "webui", + }) + if err != nil { + t.Fatalf("spawn failed: %v", err) + } + payload, ok := out.(map[string]interface{}) + if !ok { + t.Fatalf("unexpected spawn payload: %T", out) + } + _ = payload + var task *tools.SubagentTask + for i := 0; i < 50; i++ { + tasks := manager.ListTasks() + if len(tasks) > 0 && tasks[0].Status == "completed" { + task = tasks[0] + break + } + time.Sleep(10 * time.Millisecond) + } + if task == nil { + t.Fatalf("expected completed task") + } + + out, err = loop.HandleSubagentRuntime(context.Background(), "stream", map[string]interface{}{ + "id": task.ID, + }) + if err != nil { + t.Fatalf("stream failed: %v", err) + } + streamPayload, ok := out.(map[string]interface{}) + if !ok || streamPayload["found"] != true { + t.Fatalf("unexpected stream payload: %#v", out) + } + items, ok := streamPayload["items"].([]map[string]interface{}) + if !ok || len(items) == 0 { + t.Fatalf("expected merged stream items, got %#v", streamPayload["items"]) + } + foundEvent := false + foundMessage := false + for _, item := range items { + switch item["kind"] { + case "event": + foundEvent = true + case "message": + foundMessage = true + } + } + if !foundEvent || !foundMessage { + t.Fatalf("expected merged event and message items, got %#v", items) + } +} diff --git a/pkg/config/config.go b/pkg/config/config.go index 67d7884..a608317 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -73,6 +73,7 @@ type SubagentConfig struct { Transport string `json:"transport,omitempty"` NodeID string `json:"node_id,omitempty"` ParentAgentID string `json:"parent_agent_id,omitempty"` + NotifyMainPolicy string `json:"notify_main_policy,omitempty"` DisplayName string `json:"display_name,omitempty"` Role string `json:"role,omitempty"` Description string `json:"description,omitempty"` diff --git a/pkg/config/validate.go b/pkg/config/validate.go index b3b31db..b00fba7 100644 --- a/pkg/config/validate.go +++ b/pkg/config/validate.go @@ -309,6 +309,13 @@ func validateSubagents(cfg *Config) []error { errs = append(errs, fmt.Errorf("agents.subagents.%s.transport must be one of: local, node", id)) } } + if policy := strings.TrimSpace(raw.NotifyMainPolicy); policy != "" { + switch policy { + case "final_only", "milestone", "on_blocked", "always", "internal_only": + default: + errs = append(errs, fmt.Errorf("agents.subagents.%s.notify_main_policy must be one of: final_only, milestone, on_blocked, always, internal_only", id)) + } + } if transport == "node" && strings.TrimSpace(raw.NodeID) == "" { errs = append(errs, fmt.Errorf("agents.subagents.%s.node_id is required when transport=node", id)) } diff --git a/pkg/config/validate_test.go b/pkg/config/validate_test.go index 910e7f7..5902a6b 100644 --- a/pkg/config/validate_test.go +++ b/pkg/config/validate_test.go @@ -110,3 +110,21 @@ func TestValidateNodeBackedSubagentAllowsMissingPromptFile(t *testing.T) { t.Fatalf("expected node-backed config to be valid, got %v", errs) } } + +func TestValidateSubagentsRejectsInvalidNotifyMainPolicy(t *testing.T) { + t.Parallel() + + cfg := DefaultConfig() + cfg.Agents.Subagents["coder"] = SubagentConfig{ + Enabled: true, + SystemPromptFile: "agents/coder/AGENT.md", + NotifyMainPolicy: "loud", + Runtime: SubagentRuntimeConfig{ + Proxy: "proxy", + }, + } + + if errs := Validate(cfg); len(errs) == 0 { + t.Fatalf("expected validation errors") + } +} diff --git a/pkg/tools/subagent.go b/pkg/tools/subagent.go index a4108f9..1021c2f 100644 --- a/pkg/tools/subagent.go +++ b/pkg/tools/subagent.go @@ -2,6 +2,7 @@ package tools import ( "context" + "errors" "fmt" "os" "path/filepath" @@ -10,6 +11,7 @@ import ( "time" "clawgo/pkg/bus" + "clawgo/pkg/ekg" "clawgo/pkg/providers" ) @@ -22,6 +24,7 @@ type SubagentTask struct { Transport string `json:"transport,omitempty"` NodeID string `json:"node_id,omitempty"` ParentAgentID string `json:"parent_agent_id,omitempty"` + NotifyMainPolicy string `json:"notify_main_policy,omitempty"` SessionKey string `json:"session_key"` MemoryNS string `json:"memory_ns"` SystemPrompt string `json:"system_prompt,omitempty"` @@ -62,23 +65,25 @@ type SubagentManager struct { profileStore *SubagentProfileStore runStore *SubagentRunStore mailboxStore *AgentMailboxStore + ekg *ekg.Engine } type SubagentSpawnOptions struct { - Task string - Label string - Role string - AgentID string - MaxRetries int - RetryBackoff int - TimeoutSec int - MaxTaskChars int - MaxResultChars int - OriginChannel string - OriginChatID string - ThreadID string - CorrelationID string - ParentRunID string + Task string + Label string + Role string + AgentID string + NotifyMainPolicy string + MaxRetries int + RetryBackoff int + TimeoutSec int + MaxTaskChars int + MaxResultChars int + OriginChannel string + OriginChatID string + ThreadID string + CorrelationID string + ParentRunID string } func NewSubagentManager(provider providers.LLMProvider, workspace string, bus *bus.MessageBus) *SubagentManager { @@ -96,6 +101,7 @@ func NewSubagentManager(provider providers.LLMProvider, workspace string, bus *b profileStore: store, runStore: runStore, mailboxStore: mailboxStore, + ekg: ekg.New(workspace), } if runStore != nil { for _, task := range runStore.List() { @@ -167,6 +173,7 @@ func (sm *SubagentManager) spawnTask(ctx context.Context, opts SubagentSpawnOpti transport := "local" nodeID := "" parentAgentID := "" + notifyMainPolicy := "final_only" toolAllowlist := []string(nil) maxRetries := 0 retryBackoff := 1000 @@ -199,6 +206,7 @@ func (sm *SubagentManager) spawnTask(ctx context.Context, opts SubagentSpawnOpti } nodeID = strings.TrimSpace(profile.NodeID) parentAgentID = strings.TrimSpace(profile.ParentAgentID) + notifyMainPolicy = normalizeNotifyMainPolicy(profile.NotifyMainPolicy) systemPrompt = strings.TrimSpace(profile.SystemPrompt) systemPromptFile = strings.TrimSpace(profile.SystemPromptFile) toolAllowlist = append([]string(nil), profile.ToolAllowlist...) @@ -236,6 +244,9 @@ func (sm *SubagentManager) spawnTask(ctx context.Context, opts SubagentSpawnOpti } originChannel := strings.TrimSpace(opts.OriginChannel) originChatID := strings.TrimSpace(opts.OriginChatID) + if raw := strings.TrimSpace(opts.NotifyMainPolicy); raw != "" { + notifyMainPolicy = normalizeNotifyMainPolicy(raw) + } threadID := strings.TrimSpace(opts.ThreadID) correlationID := strings.TrimSpace(opts.CorrelationID) parentRunID := strings.TrimSpace(opts.ParentRunID) @@ -274,6 +285,7 @@ func (sm *SubagentManager) spawnTask(ctx context.Context, opts SubagentSpawnOpti Transport: transport, NodeID: nodeID, ParentAgentID: parentAgentID, + NotifyMainPolicy: notifyMainPolicy, SessionKey: sessionKey, MemoryNS: memoryNS, SystemPrompt: systemPrompt, @@ -368,20 +380,11 @@ func (sm *SubagentManager) runTask(ctx context.Context, task *SubagentTask) { } sm.mu.Unlock() - // 2. Result broadcast (keep existing behavior) - if sm.bus != nil { - prefix := "Task completed" - if runErr != nil { - prefix = "Task failed" - } - if task.Label != "" { - if runErr != nil { - prefix = fmt.Sprintf("Task '%s' failed", task.Label) - } else { - prefix = fmt.Sprintf("Task '%s' completed", task.Label) - } - } - announceContent := fmt.Sprintf("%s.\n\nResult:\n%s", prefix, task.Result) + sm.recordEKG(task, runErr) + + // 2. Result broadcast + if sm.bus != nil && shouldNotifyMainOnFinal(task.NotifyMainPolicy, runErr, task) { + announceContent, notifyReason := buildSubagentMainNotification(task, runErr) sm.bus.PublishInbound(bus.InboundMessage{ Channel: "system", SenderID: fmt.Sprintf("subagent:%s", task.ID), @@ -389,20 +392,142 @@ func (sm *SubagentManager) runTask(ctx context.Context, task *SubagentTask) { SessionKey: task.SessionKey, Content: announceContent, Metadata: map[string]string{ - "trigger": "subagent", - "subagent_id": task.ID, - "agent_id": task.AgentID, - "role": task.Role, - "session_key": task.SessionKey, - "memory_ns": task.MemoryNS, - "retry_count": fmt.Sprintf("%d", task.RetryCount), - "timeout_sec": fmt.Sprintf("%d", task.TimeoutSec), - "status": task.Status, + "trigger": "subagent", + "subagent_id": task.ID, + "agent_id": task.AgentID, + "role": task.Role, + "session_key": task.SessionKey, + "memory_ns": task.MemoryNS, + "retry_count": fmt.Sprintf("%d", task.RetryCount), + "timeout_sec": fmt.Sprintf("%d", task.TimeoutSec), + "status": task.Status, + "notify_reason": notifyReason, }, }) } } +func (sm *SubagentManager) recordEKG(task *SubagentTask, runErr error) { + if sm == nil || sm.ekg == nil || task == nil { + return + } + status := "success" + logText := strings.TrimSpace(task.Result) + if runErr != nil { + status = "error" + if isBlockedSubagentError(runErr) { + logText = "blocked: " + strings.TrimSpace(task.Result) + } + } + sm.ekg.Record(ekg.Event{ + TaskID: task.ID, + Session: task.SessionKey, + Channel: task.OriginChannel, + Source: "subagent", + Status: status, + Log: logText, + }) +} + +func normalizeNotifyMainPolicy(v string) string { + switch strings.ToLower(strings.TrimSpace(v)) { + case "", "final_only": + return "final_only" + case "milestone", "on_blocked", "always", "internal_only": + return strings.ToLower(strings.TrimSpace(v)) + default: + return "final_only" + } +} + +func shouldNotifyMainOnFinal(policy string, runErr error, task *SubagentTask) bool { + switch normalizeNotifyMainPolicy(policy) { + case "internal_only": + return false + case "always", "final_only": + return true + case "on_blocked": + return isBlockedSubagentError(runErr) + case "milestone": + return false + default: + return true + } +} + +func buildSubagentMainNotification(task *SubagentTask, runErr error) (string, string) { + status := "completed" + reason := "final" + if runErr != nil { + status = "failed" + if isBlockedSubagentError(runErr) { + status = "blocked" + reason = "blocked" + } + } + return fmt.Sprintf( + "Subagent update\nagent: %s\nrun: %s\nstatus: %s\nreason: %s\ntask: %s\nsummary: %s", + strings.TrimSpace(task.AgentID), + strings.TrimSpace(task.ID), + status, + reason, + summarizeSubagentText(firstNonEmpty(task.Label, task.Task), 120), + summarizeSubagentText(task.Result, 280), + ), reason +} + +func isBlockedSubagentError(err error) bool { + if err == nil { + return false + } + if errors.Is(err, context.DeadlineExceeded) { + return true + } + msg := strings.ToLower(strings.TrimSpace(err.Error())) + if msg == "" { + return false + } + blockedHints := []string{ + "timeout", + "deadline exceeded", + "quota", + "rate limit", + "too many requests", + "permission denied", + "requires input", + "waiting for reply", + "blocked", + } + for _, hint := range blockedHints { + if strings.Contains(msg, hint) { + return true + } + } + return false +} + +func summarizeSubagentText(s string, max int) string { + s = strings.TrimSpace(strings.ReplaceAll(s, "\r\n", "\n")) + s = strings.ReplaceAll(s, "\n", " ") + s = strings.Join(strings.Fields(s), " ") + if s == "" { + return "(empty)" + } + if max > 0 && len(s) > max { + return strings.TrimSpace(s[:max-3]) + "..." + } + return s +} + +func firstNonEmpty(values ...string) string { + for _, v := range values { + if strings.TrimSpace(v) != "" { + return strings.TrimSpace(v) + } + } + return "" +} + func (sm *SubagentManager) runWithRetry(ctx context.Context, task *SubagentTask) (string, error) { maxRetries := normalizePositiveBound(task.MaxRetries, 0, 8) backoffMs := normalizePositiveBound(task.RetryBackoff, 500, 120000) diff --git a/pkg/tools/subagent_config_manager.go b/pkg/tools/subagent_config_manager.go index 2ac1521..d761934 100644 --- a/pkg/tools/subagent_config_manager.go +++ b/pkg/tools/subagent_config_manager.go @@ -27,6 +27,7 @@ func DraftConfigSubagent(description, agentIDHint string) map[string]interface{} "role": role, "display_name": displayName, "description": desc, + "notify_main_policy": "final_only", "system_prompt": systemPrompt, "system_prompt_file": "agents/" + agentID + "/AGENT.md", "memory_namespace": agentID, @@ -80,6 +81,9 @@ func UpsertConfigSubagent(configPath string, args map[string]interface{}) (map[s if v := stringArgFromMap(args, "display_name"); v != "" { subcfg.DisplayName = v } + if v := stringArgFromMap(args, "notify_main_policy"); v != "" { + subcfg.NotifyMainPolicy = v + } if v := stringArgFromMap(args, "description"); v != "" { subcfg.Description = v } diff --git a/pkg/tools/subagent_config_tool_test.go b/pkg/tools/subagent_config_tool_test.go index 8470887..651425f 100644 --- a/pkg/tools/subagent_config_tool_test.go +++ b/pkg/tools/subagent_config_tool_test.go @@ -32,6 +32,7 @@ func TestSubagentConfigToolUpsert(t *testing.T) { "action": "upsert", "agent_id": "reviewer", "role": "testing", + "notify_main_policy": "internal_only", "display_name": "Review Agent", "description": "负责回归与评审", "system_prompt": "review changes", @@ -56,6 +57,9 @@ func TestSubagentConfigToolUpsert(t *testing.T) { if reloaded.Agents.Subagents["reviewer"].DisplayName != "Review Agent" { t.Fatalf("expected config to persist reviewer, got %+v", reloaded.Agents.Subagents["reviewer"]) } + if reloaded.Agents.Subagents["reviewer"].NotifyMainPolicy != "internal_only" { + t.Fatalf("expected notify_main_policy to persist, got %+v", reloaded.Agents.Subagents["reviewer"]) + } if len(reloaded.Agents.Router.Rules) == 0 { t.Fatalf("expected router rules to persist") } diff --git a/pkg/tools/subagent_profile.go b/pkg/tools/subagent_profile.go index 17fd4fd..07d3ad3 100644 --- a/pkg/tools/subagent_profile.go +++ b/pkg/tools/subagent_profile.go @@ -22,6 +22,7 @@ type SubagentProfile struct { Transport string `json:"transport,omitempty"` NodeID string `json:"node_id,omitempty"` ParentAgentID string `json:"parent_agent_id,omitempty"` + NotifyMainPolicy string `json:"notify_main_policy,omitempty"` Role string `json:"role,omitempty"` SystemPrompt string `json:"system_prompt,omitempty"` SystemPromptFile string `json:"system_prompt_file,omitempty"` @@ -188,6 +189,7 @@ func normalizeSubagentProfile(in SubagentProfile) SubagentProfile { p.Transport = normalizeProfileTransport(p.Transport) p.NodeID = strings.TrimSpace(p.NodeID) p.ParentAgentID = normalizeSubagentIdentifier(p.ParentAgentID) + p.NotifyMainPolicy = normalizeNotifyMainPolicy(p.NotifyMainPolicy) p.Role = strings.TrimSpace(p.Role) p.SystemPrompt = strings.TrimSpace(p.SystemPrompt) p.SystemPromptFile = strings.TrimSpace(p.SystemPromptFile) @@ -404,6 +406,7 @@ func profileFromConfig(agentID string, subcfg config.SubagentConfig) SubagentPro Transport: strings.TrimSpace(subcfg.Transport), NodeID: strings.TrimSpace(subcfg.NodeID), ParentAgentID: strings.TrimSpace(subcfg.ParentAgentID), + NotifyMainPolicy: strings.TrimSpace(subcfg.NotifyMainPolicy), Role: strings.TrimSpace(subcfg.Role), SystemPrompt: strings.TrimSpace(subcfg.SystemPrompt), SystemPromptFile: strings.TrimSpace(subcfg.SystemPromptFile), @@ -507,6 +510,7 @@ func (t *SubagentProfileTool) Parameters() map[string]interface{} { "description": "Unique subagent id, e.g. coder/writer/tester", }, "name": map[string]interface{}{"type": "string"}, + "notify_main_policy": map[string]interface{}{"type": "string", "description": "final_only|internal_only|milestone|on_blocked|always"}, "role": map[string]interface{}{"type": "string"}, "system_prompt": map[string]interface{}{"type": "string"}, "system_prompt_file": map[string]interface{}{"type": "string"}, @@ -577,6 +581,7 @@ func (t *SubagentProfileTool) Execute(ctx context.Context, args map[string]inter p := SubagentProfile{ AgentID: agentID, Name: stringArg(args, "name"), + NotifyMainPolicy: stringArg(args, "notify_main_policy"), Role: stringArg(args, "role"), SystemPrompt: stringArg(args, "system_prompt"), SystemPromptFile: stringArg(args, "system_prompt_file"), @@ -612,6 +617,9 @@ func (t *SubagentProfileTool) Execute(ctx context.Context, args map[string]inter if _, ok := args["role"]; ok { next.Role = stringArg(args, "role") } + if _, ok := args["notify_main_policy"]; ok { + next.NotifyMainPolicy = stringArg(args, "notify_main_policy") + } if _, ok := args["system_prompt"]; ok { next.SystemPrompt = stringArg(args, "system_prompt") } diff --git a/pkg/tools/subagent_runtime_control_test.go b/pkg/tools/subagent_runtime_control_test.go index 6090b39..4886ea3 100644 --- a/pkg/tools/subagent_runtime_control_test.go +++ b/pkg/tools/subagent_runtime_control_test.go @@ -145,8 +145,11 @@ func TestSubagentBroadcastIncludesFailureStatus(t *testing.T) { if got := strings.TrimSpace(msg.Metadata["status"]); got != "failed" { t.Fatalf("expected metadata status=failed, got %q", got) } - if !strings.Contains(strings.ToLower(msg.Content), "failed") { - t.Fatalf("expected failure wording in content, got %q", msg.Content) + if !strings.Contains(strings.ToLower(msg.Content), "status: failed") { + t.Fatalf("expected structured failure status in content, got %q", msg.Content) + } + if got := strings.TrimSpace(msg.Metadata["notify_reason"]); got != "final" { + t.Fatalf("expected notify_reason=final, got %q", got) } } @@ -205,6 +208,150 @@ func TestSubagentManagerRestoresPersistedRuns(t *testing.T) { time.Sleep(100 * time.Millisecond) } +func TestSubagentManagerInternalOnlySuppressesMainNotification(t *testing.T) { + workspace := t.TempDir() + msgBus := bus.NewMessageBus() + manager := NewSubagentManager(nil, workspace, msgBus) + manager.SetRunFunc(func(ctx context.Context, task *SubagentTask) (string, error) { + return "silent-result", nil + }) + store := manager.ProfileStore() + if store == nil { + t.Fatalf("expected profile store") + } + if _, err := store.Upsert(SubagentProfile{ + AgentID: "coder", + Name: "Code Agent", + NotifyMainPolicy: "internal_only", + SystemPromptFile: "agents/coder/AGENT.md", + Status: "active", + }); err != nil { + t.Fatalf("profile upsert failed: %v", err) + } + + _, err := manager.Spawn(context.Background(), SubagentSpawnOptions{ + Task: "internal-only task", + AgentID: "coder", + OriginChannel: "cli", + OriginChatID: "direct", + }) + if err != nil { + t.Fatalf("spawn failed: %v", err) + } + task := waitSubagentDone(t, manager, 4*time.Second) + if task.Status != "completed" { + t.Fatalf("expected completed task, got %s", task.Status) + } + + ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond) + defer cancel() + if msg, ok := msgBus.ConsumeInbound(ctx); ok { + t.Fatalf("did not expect main notification, got %+v", msg) + } +} + +func TestSubagentManagerOnBlockedNotifiesOnlyBlockedFailures(t *testing.T) { + workspace := t.TempDir() + msgBus := bus.NewMessageBus() + manager := NewSubagentManager(nil, workspace, msgBus) + manager.SetRunFunc(func(ctx context.Context, task *SubagentTask) (string, error) { + switch task.Task { + case "blocked-task": + return "", errors.New("command tick timeout exceeded: 600s") + default: + return "done", nil + } + }) + store := manager.ProfileStore() + if store == nil { + t.Fatalf("expected profile store") + } + if _, err := store.Upsert(SubagentProfile{ + AgentID: "pm", + Name: "Product Manager", + NotifyMainPolicy: "on_blocked", + SystemPromptFile: "agents/pm/AGENT.md", + Status: "active", + }); err != nil { + t.Fatalf("profile upsert failed: %v", err) + } + + _, err := manager.Spawn(context.Background(), SubagentSpawnOptions{ + Task: "successful-task", + AgentID: "pm", + OriginChannel: "cli", + OriginChatID: "direct", + }) + if err != nil { + t.Fatalf("spawn success case failed: %v", err) + } + _ = waitSubagentDone(t, manager, 4*time.Second) + + ctxSilent, cancelSilent := context.WithTimeout(context.Background(), 200*time.Millisecond) + defer cancelSilent() + if msg, ok := msgBus.ConsumeInbound(ctxSilent); ok { + t.Fatalf("did not expect success notification for on_blocked, got %+v", msg) + } + + _, err = manager.Spawn(context.Background(), SubagentSpawnOptions{ + Task: "blocked-task", + AgentID: "pm", + OriginChannel: "cli", + OriginChatID: "direct", + }) + if err != nil { + t.Fatalf("spawn blocked case failed: %v", err) + } + _ = waitSubagentDone(t, manager, 4*time.Second) + + ctxBlocked, cancelBlocked := context.WithTimeout(context.Background(), 2*time.Second) + defer cancelBlocked() + msg, ok := msgBus.ConsumeInbound(ctxBlocked) + if !ok { + t.Fatalf("expected blocked notification") + } + if got := strings.TrimSpace(msg.Metadata["notify_reason"]); got != "blocked" { + t.Fatalf("expected notify_reason=blocked, got %q", got) + } + if !strings.Contains(strings.ToLower(msg.Content), "blocked") { + t.Fatalf("expected blocked wording in content, got %q", msg.Content) + } +} + +func TestSubagentManagerRecordsFailuresToEKG(t *testing.T) { + workspace := t.TempDir() + manager := NewSubagentManager(nil, workspace, nil) + manager.SetRunFunc(func(ctx context.Context, task *SubagentTask) (string, error) { + return "", errors.New("rate limit exceeded") + }) + + _, err := manager.Spawn(context.Background(), SubagentSpawnOptions{ + Task: "ekg failure", + AgentID: "coder", + OriginChannel: "cli", + OriginChatID: "direct", + }) + if err != nil { + t.Fatalf("spawn failed: %v", err) + } + _ = waitSubagentDone(t, manager, 4*time.Second) + + data, err := os.ReadFile(filepath.Join(workspace, "memory", "ekg-events.jsonl")) + if err != nil { + t.Fatalf("expected ekg events to be written: %v", err) + } + text := string(data) + if !strings.Contains(text, "\"source\":\"subagent\"") { + t.Fatalf("expected subagent source in ekg log, got %s", text) + } + if !strings.Contains(text, "\"status\":\"error\"") { + t.Fatalf("expected error status in ekg log, got %s", text) + } + if !strings.Contains(strings.ToLower(text), "rate limit exceeded") { + t.Fatalf("expected failure text in ekg log, got %s", text) + } +} + func TestSubagentManagerAutoRecoversRunningTaskAfterRestart(t *testing.T) { workspace := t.TempDir() block := make(chan struct{}) diff --git a/webui/src/pages/SubagentProfiles.tsx b/webui/src/pages/SubagentProfiles.tsx index 304be7a..2da7c87 100644 --- a/webui/src/pages/SubagentProfiles.tsx +++ b/webui/src/pages/SubagentProfiles.tsx @@ -6,6 +6,7 @@ import { useUI } from '../context/UIContext'; type SubagentProfile = { agent_id: string; name?: string; + notify_main_policy?: string; role?: string; system_prompt?: string; system_prompt_file?: string; @@ -31,6 +32,7 @@ type ToolAllowlistGroup = { const emptyDraft: SubagentProfile = { agent_id: '', name: '', + notify_main_policy: 'final_only', role: '', system_prompt: '', system_prompt_file: '', @@ -79,6 +81,7 @@ const SubagentProfiles: React.FC = () => { setDraft({ agent_id: next.agent_id || '', name: next.name || '', + notify_main_policy: next.notify_main_policy || 'final_only', role: next.role || '', system_prompt: next.system_prompt || '', system_prompt_file: next.system_prompt_file || '', @@ -140,6 +143,7 @@ const SubagentProfiles: React.FC = () => { setDraft({ agent_id: p.agent_id || '', name: p.name || '', + notify_main_policy: p.notify_main_policy || 'final_only', role: p.role || '', system_prompt: p.system_prompt || '', system_prompt_file: p.system_prompt_file || '', @@ -193,6 +197,7 @@ const SubagentProfiles: React.FC = () => { action, agent_id: agentId, name: draft.name || '', + notify_main_policy: draft.notify_main_policy || 'final_only', role: draft.role || '', system_prompt: draft.system_prompt || '', system_prompt_file: draft.system_prompt_file || '', @@ -350,6 +355,20 @@ const SubagentProfiles: React.FC = () => { +
+
notify_main_policy
+ +
system_prompt_file
52 ? `${text.slice(0, 49)}...` : text; } +function formatStreamTime(ts?: number): string { + if (!ts) return '--:--:--'; + return new Date(ts).toLocaleTimeString([], { hour12: false }); +} + function bezierCurve(x1: number, y1: number, x2: number, y2: number): string { const offset = Math.max(Math.abs(y2 - y1) * 0.5, 60); return `M ${x1} ${y1} C ${x1} ${y1 + offset} ${x2} ${y2 - offset} ${x2} ${y2}`; @@ -337,6 +363,8 @@ const Subagents: React.FC = () => { const [registryItems, setRegistryItems] = useState([]); const [promptFileContent, setPromptFileContent] = useState(''); const [promptFileFound, setPromptFileFound] = useState(false); + const [streamItems, setStreamItems] = useState([]); + const [streamTask, setStreamTask] = useState(null); const [selectedTopologyBranch, setSelectedTopologyBranch] = useState(''); const [topologyFilter, setTopologyFilter] = useState<'all' | 'running' | 'failed' | 'local' | 'remote'>('all'); const [topologyZoom, setTopologyZoom] = useState(0.9); @@ -428,6 +456,10 @@ const Subagents: React.FC = () => { [...selectedAgentTasks].sort((a, b) => Math.max(b.updated || 0, b.created || 0) - Math.max(a.updated || 0, a.created || 0))[0] || null, [selectedAgentTasks] ); + const selectedAgentDisplayName = useMemo( + () => selectedRegistryItem?.display_name || selectedRegistryItem?.agent_id || selectedAgentID || '', + [selectedRegistryItem, selectedAgentID] + ); const parsedNodeTrees = useMemo(() => { try { const parsed = JSON.parse(nodeTrees); @@ -523,6 +555,7 @@ const Subagents: React.FC = () => { `children=${localChildren.length + remoteClusters.length}`, `total=${localMainStats.total} running=${localMainStats.running}`, `waiting=${localMainStats.waiting} failed=${localMainStats.failed}`, + `notify=${normalizeTitle(registryItems.find((item) => item.agent_id === localRoot.agent_id)?.notify_main_policy, 'final_only')}`, `transport=${normalizeTitle(localRoot.transport, 'local')} type=${normalizeTitle(localRoot.type, 'router')}`, localMainStats.active[0] ? `task: ${localMainStats.active[0].title}` : t('noLiveTasks'), ], @@ -531,6 +564,7 @@ const Subagents: React.FC = () => { scale, onClick: () => { setSelectedTopologyBranch(localBranch); + setSelectedAgentID(normalizeTitle(localRoot.agent_id, 'main')); if (localMainTask?.id) setSelectedId(localMainTask.id); }, }; @@ -558,6 +592,7 @@ const Subagents: React.FC = () => { meta: [ `total=${stats.total} running=${stats.running}`, `waiting=${stats.waiting} failed=${stats.failed}`, + `notify=${normalizeTitle(registryItems.find((item) => item.agent_id === child.agent_id)?.notify_main_policy, 'final_only')}`, `transport=${normalizeTitle(child.transport, 'local')} type=${normalizeTitle(child.type, 'worker')}`, stats.active[0] ? `task: ${stats.active[0].title}` : task ? `last: ${summarizeTask(task.task, task.label)}` : t('noLiveTasks'), ], @@ -566,6 +601,7 @@ const Subagents: React.FC = () => { scale, onClick: () => { setSelectedTopologyBranch(localBranch); + setSelectedAgentID(normalizeTitle(child.agent_id, '')); if (task?.id) setSelectedId(task.id); }, }); @@ -602,7 +638,11 @@ const Subagents: React.FC = () => { accent: tree.online ? 'bg-fuchsia-400' : 'bg-zinc-500', clickable: true, scale, - onClick: () => setSelectedTopologyBranch(branch), + onClick: () => { + setSelectedTopologyBranch(branch); + setSelectedAgentID(normalizeTitle(treeRoot.agent_id, '')); + setSelectedId(''); + }, }; cards.push(rootCard); lines.push({ @@ -633,7 +673,11 @@ const Subagents: React.FC = () => { accent: 'bg-violet-400', clickable: true, scale, - onClick: () => setSelectedTopologyBranch(branch), + onClick: () => { + setSelectedTopologyBranch(branch); + setSelectedAgentID(normalizeTitle(child.agent_id, '')); + setSelectedId(''); + }, }); lines.push({ path: bezierCurve(rootCard.x + cardWidth / 2, rootCard.y + cardHeight / 2, childX + cardWidth / 2, childY + cardHeight / 2), @@ -895,6 +939,26 @@ const Subagents: React.FC = () => { loadThreadAndInbox(selected).catch(() => { }); }, [selectedId, q, items]); + const loadStream = async (task: SubagentTask | null) => { + if (!task?.id) { + setStreamTask(null); + setStreamItems([]); + return; + } + try { + const streamRes = await callAction({ action: 'stream', id: task.id, limit: 100 }); + setStreamTask(streamRes?.result?.task || task); + setStreamItems(Array.isArray(streamRes?.result?.items) ? streamRes.result.items : []); + } catch { + setStreamTask(task); + setStreamItems([]); + } + }; + + useEffect(() => { + loadStream(selectedAgentLatestTask).catch(() => { }); + }, [selectedAgentLatestTask?.id, q, items.length]); + return (
@@ -1106,6 +1170,62 @@ const Subagents: React.FC = () => {
)} + {selectedAgentID && ( +
+
+
+
Internal Stream
+
{selectedAgentDisplayName}
+
{selectedAgentID}
+
+ +
+
+ {streamTask?.id ? ( +
+
run={streamTask.id}
+
status={streamTask.status || '-'} · thread={streamTask.thread_id || '-'}
+
+ ) : ( +
No persisted run for this agent yet.
+ )} +
+
+ {streamItems.length === 0 ? ( +
No internal stream events yet.
+ ) : streamItems.map((item, idx) => ( +
+
+
+ {item.kind === 'event' + ? `${item.event_type || 'event'}${item.status ? ` · ${item.status}` : ''}` + : `${item.from_agent || '-'} -> ${item.to_agent || '-'} · ${item.message_type || 'message'}`} +
+
{formatStreamTime(item.at)}
+
+
+ {item.kind === 'event' ? (item.message || '(no event message)') : (item.content || '(empty message)')} +
+
+ {item.kind === 'event' + ? `run=${item.run_id || '-'}${item.retry_count ? ` · retry=${item.retry_count}` : ''}` + : `status=${item.status || '-'}${item.reply_to ? ` · reply_to=${item.reply_to}` : ''}`} +
+
+ ))} +
+
+ )}