2 Commits

Author SHA1 Message Date
lpf
1218d68b7e Add internal subagent stream and notify policy 2026-03-07 11:52:36 +08:00
lpf
557633b698 Refresh agent topology state after task restart 2026-03-07 01:53:18 +08:00
12 changed files with 661 additions and 47 deletions

View File

@@ -131,6 +131,7 @@ func (al *AgentLoop) HandleSubagentRuntime(ctx context.Context, action string, a
"transport": fallbackString(strings.TrimSpace(subcfg.Transport), "local"),
"node_id": strings.TrimSpace(subcfg.NodeID),
"parent_agent_id": strings.TrimSpace(subcfg.ParentAgentID),
"notify_main_policy": fallbackString(strings.TrimSpace(subcfg.NotifyMainPolicy), "final_only"),
"display_name": subcfg.DisplayName,
"role": subcfg.Role,
"description": subcfg.Description,
@@ -157,6 +158,7 @@ func (al *AgentLoop) HandleSubagentRuntime(ctx context.Context, action string, a
"transport": profile.Transport,
"node_id": profile.NodeID,
"parent_agent_id": profile.ParentAgentID,
"notify_main_policy": fallbackString(strings.TrimSpace(profile.NotifyMainPolicy), "final_only"),
"display_name": profile.Name,
"role": profile.Role,
"description": "Node-registered remote main agent branch",
@@ -360,6 +362,37 @@ func (al *AgentLoop) HandleSubagentRuntime(ctx context.Context, action string, a
return nil, err
}
return map[string]interface{}{"found": true, "thread": thread, "messages": items}, nil
case "stream":
taskID, err := resolveSubagentTaskIDForRuntime(sm, runtimeStringArg(args, "id"))
if err != nil {
return nil, err
}
task, ok := sm.GetTask(taskID)
if !ok {
return map[string]interface{}{"found": false}, nil
}
events, err := sm.Events(taskID, runtimeIntArg(args, "limit", 100))
if err != nil {
return nil, err
}
var thread *tools.AgentThread
var messages []tools.AgentMessage
if strings.TrimSpace(task.ThreadID) != "" {
if th, ok := sm.Thread(task.ThreadID); ok {
thread = th
}
messages, err = sm.ThreadMessages(task.ThreadID, runtimeIntArg(args, "limit", 100))
if err != nil {
return nil, err
}
}
stream := mergeSubagentStream(events, messages)
return map[string]interface{}{
"found": true,
"task": cloneSubagentTask(task),
"thread": thread,
"items": stream,
}, nil
case "inbox":
agentID := runtimeStringArg(args, "agent_id")
if agentID == "" {
@@ -386,6 +419,47 @@ func (al *AgentLoop) HandleSubagentRuntime(ctx context.Context, action string, a
}
}
func mergeSubagentStream(events []tools.SubagentRunEvent, messages []tools.AgentMessage) []map[string]interface{} {
items := make([]map[string]interface{}, 0, len(events)+len(messages))
for _, evt := range events {
items = append(items, map[string]interface{}{
"kind": "event",
"at": evt.At,
"run_id": evt.RunID,
"agent_id": evt.AgentID,
"event_type": evt.Type,
"status": evt.Status,
"message": evt.Message,
"retry_count": evt.RetryCount,
})
}
for _, msg := range messages {
items = append(items, map[string]interface{}{
"kind": "message",
"at": msg.CreatedAt,
"message_id": msg.MessageID,
"thread_id": msg.ThreadID,
"from_agent": msg.FromAgent,
"to_agent": msg.ToAgent,
"reply_to": msg.ReplyTo,
"correlation_id": msg.CorrelationID,
"message_type": msg.Type,
"content": msg.Content,
"status": msg.Status,
"requires_reply": msg.RequiresReply,
})
}
sort.Slice(items, func(i, j int) bool {
left, _ := items[i]["at"].(int64)
right, _ := items[j]["at"].(int64)
if left != right {
return left < right
}
return fmt.Sprintf("%v", items[i]["kind"]) < fmt.Sprintf("%v", items[j]["kind"])
})
return items
}
func cloneSubagentTask(in *tools.SubagentTask) *tools.SubagentTask {
if in == nil {
return nil

View File

@@ -5,6 +5,7 @@ import (
"os"
"path/filepath"
"testing"
"time"
"clawgo/pkg/config"
"clawgo/pkg/runtimecfg"
@@ -75,6 +76,7 @@ func TestHandleSubagentRuntimeUpsertConfigSubagent(t *testing.T) {
out, err := loop.HandleSubagentRuntime(context.Background(), "upsert_config_subagent", map[string]interface{}{
"agent_id": "reviewer",
"role": "testing",
"notify_main_policy": "internal_only",
"display_name": "Review Agent",
"system_prompt": "review changes",
"system_prompt_file": "agents/reviewer/AGENT.md",
@@ -99,6 +101,9 @@ func TestHandleSubagentRuntimeUpsertConfigSubagent(t *testing.T) {
if subcfg.SystemPromptFile != "agents/reviewer/AGENT.md" {
t.Fatalf("expected system_prompt_file to persist, got %+v", subcfg)
}
if subcfg.NotifyMainPolicy != "internal_only" {
t.Fatalf("expected notify_main_policy to persist, got %+v", subcfg)
}
if len(reloaded.Agents.Router.Rules) == 0 {
t.Fatalf("expected router rules to be persisted")
}
@@ -316,3 +321,71 @@ func TestHandleSubagentRuntimeProtectsMainAgent(t *testing.T) {
t.Fatalf("expected deleting main agent to fail")
}
}
func TestHandleSubagentRuntimeStream(t *testing.T) {
workspace := t.TempDir()
manager := tools.NewSubagentManager(nil, workspace, nil)
manager.SetRunFunc(func(ctx context.Context, task *tools.SubagentTask) (string, error) {
return "stream-result", nil
})
loop := &AgentLoop{
workspace: workspace,
subagentManager: manager,
subagentRouter: tools.NewSubagentRouter(manager),
}
out, err := loop.HandleSubagentRuntime(context.Background(), "spawn", map[string]interface{}{
"task": "prepare streamable task",
"agent_id": "coder",
"channel": "webui",
"chat_id": "webui",
})
if err != nil {
t.Fatalf("spawn failed: %v", err)
}
payload, ok := out.(map[string]interface{})
if !ok {
t.Fatalf("unexpected spawn payload: %T", out)
}
_ = payload
var task *tools.SubagentTask
for i := 0; i < 50; i++ {
tasks := manager.ListTasks()
if len(tasks) > 0 && tasks[0].Status == "completed" {
task = tasks[0]
break
}
time.Sleep(10 * time.Millisecond)
}
if task == nil {
t.Fatalf("expected completed task")
}
out, err = loop.HandleSubagentRuntime(context.Background(), "stream", map[string]interface{}{
"id": task.ID,
})
if err != nil {
t.Fatalf("stream failed: %v", err)
}
streamPayload, ok := out.(map[string]interface{})
if !ok || streamPayload["found"] != true {
t.Fatalf("unexpected stream payload: %#v", out)
}
items, ok := streamPayload["items"].([]map[string]interface{})
if !ok || len(items) == 0 {
t.Fatalf("expected merged stream items, got %#v", streamPayload["items"])
}
foundEvent := false
foundMessage := false
for _, item := range items {
switch item["kind"] {
case "event":
foundEvent = true
case "message":
foundMessage = true
}
}
if !foundEvent || !foundMessage {
t.Fatalf("expected merged event and message items, got %#v", items)
}
}

View File

@@ -73,6 +73,7 @@ type SubagentConfig struct {
Transport string `json:"transport,omitempty"`
NodeID string `json:"node_id,omitempty"`
ParentAgentID string `json:"parent_agent_id,omitempty"`
NotifyMainPolicy string `json:"notify_main_policy,omitempty"`
DisplayName string `json:"display_name,omitempty"`
Role string `json:"role,omitempty"`
Description string `json:"description,omitempty"`

View File

@@ -309,6 +309,13 @@ func validateSubagents(cfg *Config) []error {
errs = append(errs, fmt.Errorf("agents.subagents.%s.transport must be one of: local, node", id))
}
}
if policy := strings.TrimSpace(raw.NotifyMainPolicy); policy != "" {
switch policy {
case "final_only", "milestone", "on_blocked", "always", "internal_only":
default:
errs = append(errs, fmt.Errorf("agents.subagents.%s.notify_main_policy must be one of: final_only, milestone, on_blocked, always, internal_only", id))
}
}
if transport == "node" && strings.TrimSpace(raw.NodeID) == "" {
errs = append(errs, fmt.Errorf("agents.subagents.%s.node_id is required when transport=node", id))
}

View File

@@ -110,3 +110,21 @@ func TestValidateNodeBackedSubagentAllowsMissingPromptFile(t *testing.T) {
t.Fatalf("expected node-backed config to be valid, got %v", errs)
}
}
func TestValidateSubagentsRejectsInvalidNotifyMainPolicy(t *testing.T) {
t.Parallel()
cfg := DefaultConfig()
cfg.Agents.Subagents["coder"] = SubagentConfig{
Enabled: true,
SystemPromptFile: "agents/coder/AGENT.md",
NotifyMainPolicy: "loud",
Runtime: SubagentRuntimeConfig{
Proxy: "proxy",
},
}
if errs := Validate(cfg); len(errs) == 0 {
t.Fatalf("expected validation errors")
}
}

View File

@@ -2,6 +2,7 @@ package tools
import (
"context"
"errors"
"fmt"
"os"
"path/filepath"
@@ -10,6 +11,7 @@ import (
"time"
"clawgo/pkg/bus"
"clawgo/pkg/ekg"
"clawgo/pkg/providers"
)
@@ -22,6 +24,7 @@ type SubagentTask struct {
Transport string `json:"transport,omitempty"`
NodeID string `json:"node_id,omitempty"`
ParentAgentID string `json:"parent_agent_id,omitempty"`
NotifyMainPolicy string `json:"notify_main_policy,omitempty"`
SessionKey string `json:"session_key"`
MemoryNS string `json:"memory_ns"`
SystemPrompt string `json:"system_prompt,omitempty"`
@@ -62,23 +65,25 @@ type SubagentManager struct {
profileStore *SubagentProfileStore
runStore *SubagentRunStore
mailboxStore *AgentMailboxStore
ekg *ekg.Engine
}
type SubagentSpawnOptions struct {
Task string
Label string
Role string
AgentID string
MaxRetries int
RetryBackoff int
TimeoutSec int
MaxTaskChars int
MaxResultChars int
OriginChannel string
OriginChatID string
ThreadID string
CorrelationID string
ParentRunID string
Task string
Label string
Role string
AgentID string
NotifyMainPolicy string
MaxRetries int
RetryBackoff int
TimeoutSec int
MaxTaskChars int
MaxResultChars int
OriginChannel string
OriginChatID string
ThreadID string
CorrelationID string
ParentRunID string
}
func NewSubagentManager(provider providers.LLMProvider, workspace string, bus *bus.MessageBus) *SubagentManager {
@@ -96,6 +101,7 @@ func NewSubagentManager(provider providers.LLMProvider, workspace string, bus *b
profileStore: store,
runStore: runStore,
mailboxStore: mailboxStore,
ekg: ekg.New(workspace),
}
if runStore != nil {
for _, task := range runStore.List() {
@@ -167,6 +173,7 @@ func (sm *SubagentManager) spawnTask(ctx context.Context, opts SubagentSpawnOpti
transport := "local"
nodeID := ""
parentAgentID := ""
notifyMainPolicy := "final_only"
toolAllowlist := []string(nil)
maxRetries := 0
retryBackoff := 1000
@@ -199,6 +206,7 @@ func (sm *SubagentManager) spawnTask(ctx context.Context, opts SubagentSpawnOpti
}
nodeID = strings.TrimSpace(profile.NodeID)
parentAgentID = strings.TrimSpace(profile.ParentAgentID)
notifyMainPolicy = normalizeNotifyMainPolicy(profile.NotifyMainPolicy)
systemPrompt = strings.TrimSpace(profile.SystemPrompt)
systemPromptFile = strings.TrimSpace(profile.SystemPromptFile)
toolAllowlist = append([]string(nil), profile.ToolAllowlist...)
@@ -236,6 +244,9 @@ func (sm *SubagentManager) spawnTask(ctx context.Context, opts SubagentSpawnOpti
}
originChannel := strings.TrimSpace(opts.OriginChannel)
originChatID := strings.TrimSpace(opts.OriginChatID)
if raw := strings.TrimSpace(opts.NotifyMainPolicy); raw != "" {
notifyMainPolicy = normalizeNotifyMainPolicy(raw)
}
threadID := strings.TrimSpace(opts.ThreadID)
correlationID := strings.TrimSpace(opts.CorrelationID)
parentRunID := strings.TrimSpace(opts.ParentRunID)
@@ -274,6 +285,7 @@ func (sm *SubagentManager) spawnTask(ctx context.Context, opts SubagentSpawnOpti
Transport: transport,
NodeID: nodeID,
ParentAgentID: parentAgentID,
NotifyMainPolicy: notifyMainPolicy,
SessionKey: sessionKey,
MemoryNS: memoryNS,
SystemPrompt: systemPrompt,
@@ -368,20 +380,11 @@ func (sm *SubagentManager) runTask(ctx context.Context, task *SubagentTask) {
}
sm.mu.Unlock()
// 2. Result broadcast (keep existing behavior)
if sm.bus != nil {
prefix := "Task completed"
if runErr != nil {
prefix = "Task failed"
}
if task.Label != "" {
if runErr != nil {
prefix = fmt.Sprintf("Task '%s' failed", task.Label)
} else {
prefix = fmt.Sprintf("Task '%s' completed", task.Label)
}
}
announceContent := fmt.Sprintf("%s.\n\nResult:\n%s", prefix, task.Result)
sm.recordEKG(task, runErr)
// 2. Result broadcast
if sm.bus != nil && shouldNotifyMainOnFinal(task.NotifyMainPolicy, runErr, task) {
announceContent, notifyReason := buildSubagentMainNotification(task, runErr)
sm.bus.PublishInbound(bus.InboundMessage{
Channel: "system",
SenderID: fmt.Sprintf("subagent:%s", task.ID),
@@ -389,20 +392,142 @@ func (sm *SubagentManager) runTask(ctx context.Context, task *SubagentTask) {
SessionKey: task.SessionKey,
Content: announceContent,
Metadata: map[string]string{
"trigger": "subagent",
"subagent_id": task.ID,
"agent_id": task.AgentID,
"role": task.Role,
"session_key": task.SessionKey,
"memory_ns": task.MemoryNS,
"retry_count": fmt.Sprintf("%d", task.RetryCount),
"timeout_sec": fmt.Sprintf("%d", task.TimeoutSec),
"status": task.Status,
"trigger": "subagent",
"subagent_id": task.ID,
"agent_id": task.AgentID,
"role": task.Role,
"session_key": task.SessionKey,
"memory_ns": task.MemoryNS,
"retry_count": fmt.Sprintf("%d", task.RetryCount),
"timeout_sec": fmt.Sprintf("%d", task.TimeoutSec),
"status": task.Status,
"notify_reason": notifyReason,
},
})
}
}
func (sm *SubagentManager) recordEKG(task *SubagentTask, runErr error) {
if sm == nil || sm.ekg == nil || task == nil {
return
}
status := "success"
logText := strings.TrimSpace(task.Result)
if runErr != nil {
status = "error"
if isBlockedSubagentError(runErr) {
logText = "blocked: " + strings.TrimSpace(task.Result)
}
}
sm.ekg.Record(ekg.Event{
TaskID: task.ID,
Session: task.SessionKey,
Channel: task.OriginChannel,
Source: "subagent",
Status: status,
Log: logText,
})
}
func normalizeNotifyMainPolicy(v string) string {
switch strings.ToLower(strings.TrimSpace(v)) {
case "", "final_only":
return "final_only"
case "milestone", "on_blocked", "always", "internal_only":
return strings.ToLower(strings.TrimSpace(v))
default:
return "final_only"
}
}
func shouldNotifyMainOnFinal(policy string, runErr error, task *SubagentTask) bool {
switch normalizeNotifyMainPolicy(policy) {
case "internal_only":
return false
case "always", "final_only":
return true
case "on_blocked":
return isBlockedSubagentError(runErr)
case "milestone":
return false
default:
return true
}
}
func buildSubagentMainNotification(task *SubagentTask, runErr error) (string, string) {
status := "completed"
reason := "final"
if runErr != nil {
status = "failed"
if isBlockedSubagentError(runErr) {
status = "blocked"
reason = "blocked"
}
}
return fmt.Sprintf(
"Subagent update\nagent: %s\nrun: %s\nstatus: %s\nreason: %s\ntask: %s\nsummary: %s",
strings.TrimSpace(task.AgentID),
strings.TrimSpace(task.ID),
status,
reason,
summarizeSubagentText(firstNonEmpty(task.Label, task.Task), 120),
summarizeSubagentText(task.Result, 280),
), reason
}
func isBlockedSubagentError(err error) bool {
if err == nil {
return false
}
if errors.Is(err, context.DeadlineExceeded) {
return true
}
msg := strings.ToLower(strings.TrimSpace(err.Error()))
if msg == "" {
return false
}
blockedHints := []string{
"timeout",
"deadline exceeded",
"quota",
"rate limit",
"too many requests",
"permission denied",
"requires input",
"waiting for reply",
"blocked",
}
for _, hint := range blockedHints {
if strings.Contains(msg, hint) {
return true
}
}
return false
}
func summarizeSubagentText(s string, max int) string {
s = strings.TrimSpace(strings.ReplaceAll(s, "\r\n", "\n"))
s = strings.ReplaceAll(s, "\n", " ")
s = strings.Join(strings.Fields(s), " ")
if s == "" {
return "(empty)"
}
if max > 0 && len(s) > max {
return strings.TrimSpace(s[:max-3]) + "..."
}
return s
}
func firstNonEmpty(values ...string) string {
for _, v := range values {
if strings.TrimSpace(v) != "" {
return strings.TrimSpace(v)
}
}
return ""
}
func (sm *SubagentManager) runWithRetry(ctx context.Context, task *SubagentTask) (string, error) {
maxRetries := normalizePositiveBound(task.MaxRetries, 0, 8)
backoffMs := normalizePositiveBound(task.RetryBackoff, 500, 120000)

View File

@@ -27,6 +27,7 @@ func DraftConfigSubagent(description, agentIDHint string) map[string]interface{}
"role": role,
"display_name": displayName,
"description": desc,
"notify_main_policy": "final_only",
"system_prompt": systemPrompt,
"system_prompt_file": "agents/" + agentID + "/AGENT.md",
"memory_namespace": agentID,
@@ -80,6 +81,9 @@ func UpsertConfigSubagent(configPath string, args map[string]interface{}) (map[s
if v := stringArgFromMap(args, "display_name"); v != "" {
subcfg.DisplayName = v
}
if v := stringArgFromMap(args, "notify_main_policy"); v != "" {
subcfg.NotifyMainPolicy = v
}
if v := stringArgFromMap(args, "description"); v != "" {
subcfg.Description = v
}

View File

@@ -32,6 +32,7 @@ func TestSubagentConfigToolUpsert(t *testing.T) {
"action": "upsert",
"agent_id": "reviewer",
"role": "testing",
"notify_main_policy": "internal_only",
"display_name": "Review Agent",
"description": "负责回归与评审",
"system_prompt": "review changes",
@@ -56,6 +57,9 @@ func TestSubagentConfigToolUpsert(t *testing.T) {
if reloaded.Agents.Subagents["reviewer"].DisplayName != "Review Agent" {
t.Fatalf("expected config to persist reviewer, got %+v", reloaded.Agents.Subagents["reviewer"])
}
if reloaded.Agents.Subagents["reviewer"].NotifyMainPolicy != "internal_only" {
t.Fatalf("expected notify_main_policy to persist, got %+v", reloaded.Agents.Subagents["reviewer"])
}
if len(reloaded.Agents.Router.Rules) == 0 {
t.Fatalf("expected router rules to persist")
}

View File

@@ -22,6 +22,7 @@ type SubagentProfile struct {
Transport string `json:"transport,omitempty"`
NodeID string `json:"node_id,omitempty"`
ParentAgentID string `json:"parent_agent_id,omitempty"`
NotifyMainPolicy string `json:"notify_main_policy,omitempty"`
Role string `json:"role,omitempty"`
SystemPrompt string `json:"system_prompt,omitempty"`
SystemPromptFile string `json:"system_prompt_file,omitempty"`
@@ -188,6 +189,7 @@ func normalizeSubagentProfile(in SubagentProfile) SubagentProfile {
p.Transport = normalizeProfileTransport(p.Transport)
p.NodeID = strings.TrimSpace(p.NodeID)
p.ParentAgentID = normalizeSubagentIdentifier(p.ParentAgentID)
p.NotifyMainPolicy = normalizeNotifyMainPolicy(p.NotifyMainPolicy)
p.Role = strings.TrimSpace(p.Role)
p.SystemPrompt = strings.TrimSpace(p.SystemPrompt)
p.SystemPromptFile = strings.TrimSpace(p.SystemPromptFile)
@@ -404,6 +406,7 @@ func profileFromConfig(agentID string, subcfg config.SubagentConfig) SubagentPro
Transport: strings.TrimSpace(subcfg.Transport),
NodeID: strings.TrimSpace(subcfg.NodeID),
ParentAgentID: strings.TrimSpace(subcfg.ParentAgentID),
NotifyMainPolicy: strings.TrimSpace(subcfg.NotifyMainPolicy),
Role: strings.TrimSpace(subcfg.Role),
SystemPrompt: strings.TrimSpace(subcfg.SystemPrompt),
SystemPromptFile: strings.TrimSpace(subcfg.SystemPromptFile),
@@ -507,6 +510,7 @@ func (t *SubagentProfileTool) Parameters() map[string]interface{} {
"description": "Unique subagent id, e.g. coder/writer/tester",
},
"name": map[string]interface{}{"type": "string"},
"notify_main_policy": map[string]interface{}{"type": "string", "description": "final_only|internal_only|milestone|on_blocked|always"},
"role": map[string]interface{}{"type": "string"},
"system_prompt": map[string]interface{}{"type": "string"},
"system_prompt_file": map[string]interface{}{"type": "string"},
@@ -577,6 +581,7 @@ func (t *SubagentProfileTool) Execute(ctx context.Context, args map[string]inter
p := SubagentProfile{
AgentID: agentID,
Name: stringArg(args, "name"),
NotifyMainPolicy: stringArg(args, "notify_main_policy"),
Role: stringArg(args, "role"),
SystemPrompt: stringArg(args, "system_prompt"),
SystemPromptFile: stringArg(args, "system_prompt_file"),
@@ -612,6 +617,9 @@ func (t *SubagentProfileTool) Execute(ctx context.Context, args map[string]inter
if _, ok := args["role"]; ok {
next.Role = stringArg(args, "role")
}
if _, ok := args["notify_main_policy"]; ok {
next.NotifyMainPolicy = stringArg(args, "notify_main_policy")
}
if _, ok := args["system_prompt"]; ok {
next.SystemPrompt = stringArg(args, "system_prompt")
}

View File

@@ -145,8 +145,11 @@ func TestSubagentBroadcastIncludesFailureStatus(t *testing.T) {
if got := strings.TrimSpace(msg.Metadata["status"]); got != "failed" {
t.Fatalf("expected metadata status=failed, got %q", got)
}
if !strings.Contains(strings.ToLower(msg.Content), "failed") {
t.Fatalf("expected failure wording in content, got %q", msg.Content)
if !strings.Contains(strings.ToLower(msg.Content), "status: failed") {
t.Fatalf("expected structured failure status in content, got %q", msg.Content)
}
if got := strings.TrimSpace(msg.Metadata["notify_reason"]); got != "final" {
t.Fatalf("expected notify_reason=final, got %q", got)
}
}
@@ -205,6 +208,150 @@ func TestSubagentManagerRestoresPersistedRuns(t *testing.T) {
time.Sleep(100 * time.Millisecond)
}
func TestSubagentManagerInternalOnlySuppressesMainNotification(t *testing.T) {
workspace := t.TempDir()
msgBus := bus.NewMessageBus()
manager := NewSubagentManager(nil, workspace, msgBus)
manager.SetRunFunc(func(ctx context.Context, task *SubagentTask) (string, error) {
return "silent-result", nil
})
store := manager.ProfileStore()
if store == nil {
t.Fatalf("expected profile store")
}
if _, err := store.Upsert(SubagentProfile{
AgentID: "coder",
Name: "Code Agent",
NotifyMainPolicy: "internal_only",
SystemPromptFile: "agents/coder/AGENT.md",
Status: "active",
}); err != nil {
t.Fatalf("profile upsert failed: %v", err)
}
_, err := manager.Spawn(context.Background(), SubagentSpawnOptions{
Task: "internal-only task",
AgentID: "coder",
OriginChannel: "cli",
OriginChatID: "direct",
})
if err != nil {
t.Fatalf("spawn failed: %v", err)
}
task := waitSubagentDone(t, manager, 4*time.Second)
if task.Status != "completed" {
t.Fatalf("expected completed task, got %s", task.Status)
}
ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
defer cancel()
if msg, ok := msgBus.ConsumeInbound(ctx); ok {
t.Fatalf("did not expect main notification, got %+v", msg)
}
}
func TestSubagentManagerOnBlockedNotifiesOnlyBlockedFailures(t *testing.T) {
workspace := t.TempDir()
msgBus := bus.NewMessageBus()
manager := NewSubagentManager(nil, workspace, msgBus)
manager.SetRunFunc(func(ctx context.Context, task *SubagentTask) (string, error) {
switch task.Task {
case "blocked-task":
return "", errors.New("command tick timeout exceeded: 600s")
default:
return "done", nil
}
})
store := manager.ProfileStore()
if store == nil {
t.Fatalf("expected profile store")
}
if _, err := store.Upsert(SubagentProfile{
AgentID: "pm",
Name: "Product Manager",
NotifyMainPolicy: "on_blocked",
SystemPromptFile: "agents/pm/AGENT.md",
Status: "active",
}); err != nil {
t.Fatalf("profile upsert failed: %v", err)
}
_, err := manager.Spawn(context.Background(), SubagentSpawnOptions{
Task: "successful-task",
AgentID: "pm",
OriginChannel: "cli",
OriginChatID: "direct",
})
if err != nil {
t.Fatalf("spawn success case failed: %v", err)
}
_ = waitSubagentDone(t, manager, 4*time.Second)
ctxSilent, cancelSilent := context.WithTimeout(context.Background(), 200*time.Millisecond)
defer cancelSilent()
if msg, ok := msgBus.ConsumeInbound(ctxSilent); ok {
t.Fatalf("did not expect success notification for on_blocked, got %+v", msg)
}
_, err = manager.Spawn(context.Background(), SubagentSpawnOptions{
Task: "blocked-task",
AgentID: "pm",
OriginChannel: "cli",
OriginChatID: "direct",
})
if err != nil {
t.Fatalf("spawn blocked case failed: %v", err)
}
_ = waitSubagentDone(t, manager, 4*time.Second)
ctxBlocked, cancelBlocked := context.WithTimeout(context.Background(), 2*time.Second)
defer cancelBlocked()
msg, ok := msgBus.ConsumeInbound(ctxBlocked)
if !ok {
t.Fatalf("expected blocked notification")
}
if got := strings.TrimSpace(msg.Metadata["notify_reason"]); got != "blocked" {
t.Fatalf("expected notify_reason=blocked, got %q", got)
}
if !strings.Contains(strings.ToLower(msg.Content), "blocked") {
t.Fatalf("expected blocked wording in content, got %q", msg.Content)
}
}
func TestSubagentManagerRecordsFailuresToEKG(t *testing.T) {
workspace := t.TempDir()
manager := NewSubagentManager(nil, workspace, nil)
manager.SetRunFunc(func(ctx context.Context, task *SubagentTask) (string, error) {
return "", errors.New("rate limit exceeded")
})
_, err := manager.Spawn(context.Background(), SubagentSpawnOptions{
Task: "ekg failure",
AgentID: "coder",
OriginChannel: "cli",
OriginChatID: "direct",
})
if err != nil {
t.Fatalf("spawn failed: %v", err)
}
_ = waitSubagentDone(t, manager, 4*time.Second)
data, err := os.ReadFile(filepath.Join(workspace, "memory", "ekg-events.jsonl"))
if err != nil {
t.Fatalf("expected ekg events to be written: %v", err)
}
text := string(data)
if !strings.Contains(text, "\"source\":\"subagent\"") {
t.Fatalf("expected subagent source in ekg log, got %s", text)
}
if !strings.Contains(text, "\"status\":\"error\"") {
t.Fatalf("expected error status in ekg log, got %s", text)
}
if !strings.Contains(strings.ToLower(text), "rate limit exceeded") {
t.Fatalf("expected failure text in ekg log, got %s", text)
}
}
func TestSubagentManagerAutoRecoversRunningTaskAfterRestart(t *testing.T) {
workspace := t.TempDir()
block := make(chan struct{})

View File

@@ -6,6 +6,7 @@ import { useUI } from '../context/UIContext';
type SubagentProfile = {
agent_id: string;
name?: string;
notify_main_policy?: string;
role?: string;
system_prompt?: string;
system_prompt_file?: string;
@@ -31,6 +32,7 @@ type ToolAllowlistGroup = {
const emptyDraft: SubagentProfile = {
agent_id: '',
name: '',
notify_main_policy: 'final_only',
role: '',
system_prompt: '',
system_prompt_file: '',
@@ -79,6 +81,7 @@ const SubagentProfiles: React.FC = () => {
setDraft({
agent_id: next.agent_id || '',
name: next.name || '',
notify_main_policy: next.notify_main_policy || 'final_only',
role: next.role || '',
system_prompt: next.system_prompt || '',
system_prompt_file: next.system_prompt_file || '',
@@ -140,6 +143,7 @@ const SubagentProfiles: React.FC = () => {
setDraft({
agent_id: p.agent_id || '',
name: p.name || '',
notify_main_policy: p.notify_main_policy || 'final_only',
role: p.role || '',
system_prompt: p.system_prompt || '',
system_prompt_file: p.system_prompt_file || '',
@@ -193,6 +197,7 @@ const SubagentProfiles: React.FC = () => {
action,
agent_id: agentId,
name: draft.name || '',
notify_main_policy: draft.notify_main_policy || 'final_only',
role: draft.role || '',
system_prompt: draft.system_prompt || '',
system_prompt_file: draft.system_prompt_file || '',
@@ -350,6 +355,20 @@ const SubagentProfiles: React.FC = () => {
<option value="disabled">disabled</option>
</select>
</div>
<div>
<div className="text-xs text-zinc-400 mb-1">notify_main_policy</div>
<select
value={draft.notify_main_policy || 'final_only'}
onChange={(e) => setDraft({ ...draft, notify_main_policy: e.target.value })}
className="w-full px-2 py-1 text-xs bg-zinc-900 border border-zinc-700 rounded"
>
<option value="final_only">final_only</option>
<option value="internal_only">internal_only</option>
<option value="milestone">milestone</option>
<option value="on_blocked">on_blocked</option>
<option value="always">always</option>
</select>
</div>
<div className="md:col-span-2">
<div className="text-xs text-zinc-400 mb-1">system_prompt_file</div>
<input

View File

@@ -60,6 +60,26 @@ type AgentMessage = {
created_at?: number;
};
type StreamItem = {
kind?: 'event' | 'message' | string;
at?: number;
run_id?: string;
agent_id?: string;
event_type?: string;
message?: string;
retry_count?: number;
message_id?: string;
thread_id?: string;
from_agent?: string;
to_agent?: string;
reply_to?: string;
correlation_id?: string;
message_type?: string;
content?: string;
status?: string;
requires_reply?: boolean;
};
type RegistrySubagent = {
agent_id?: string;
enabled?: boolean;
@@ -68,6 +88,7 @@ type RegistrySubagent = {
node_id?: string;
parent_agent_id?: string;
managed_by?: string;
notify_main_policy?: string;
display_name?: string;
role?: string;
description?: string;
@@ -115,6 +136,8 @@ type AgentTaskStats = {
running: number;
failed: number;
waiting: number;
latestStatus: string;
latestUpdated: number;
active: Array<{ id: string; status: string; title: string }>;
};
@@ -184,6 +207,11 @@ function summarizeTask(task?: string, label?: string): string {
return text.length > 52 ? `${text.slice(0, 49)}...` : text;
}
function formatStreamTime(ts?: number): string {
if (!ts) return '--:--:--';
return new Date(ts).toLocaleTimeString([], { hour12: false });
}
function bezierCurve(x1: number, y1: number, x2: number, y2: number): string {
const offset = Math.max(Math.abs(y2 - y1) * 0.5, 60);
return `M ${x1} ${y1} C ${x1} ${y1 + offset} ${x2} ${y2 - offset} ${x2} ${y2}`;
@@ -199,13 +227,18 @@ function buildTaskStats(tasks: SubagentTask[]): Record<string, AgentTaskStats> {
const agentID = normalizeTitle(task.agent_id, '');
if (!agentID) return acc;
if (!acc[agentID]) {
acc[agentID] = { total: 0, running: 0, failed: 0, waiting: 0, active: [] };
acc[agentID] = { total: 0, running: 0, failed: 0, waiting: 0, latestStatus: '', latestUpdated: 0, active: [] };
}
const item = acc[agentID];
item.total += 1;
if (task.status === 'running') item.running += 1;
if (task.status === 'failed') item.failed += 1;
if (task.waiting_for_reply) item.waiting += 1;
const updatedAt = Math.max(task.updated || 0, task.created || 0);
if (updatedAt >= item.latestUpdated) {
item.latestUpdated = updatedAt;
item.latestStatus = normalizeTitle(task.status, '');
item.failed = task.status === 'failed' ? 1 : 0;
}
if (task.status === 'running' || task.waiting_for_reply) {
item.active.push({
id: task.id,
@@ -330,6 +363,8 @@ const Subagents: React.FC = () => {
const [registryItems, setRegistryItems] = useState<RegistrySubagent[]>([]);
const [promptFileContent, setPromptFileContent] = useState('');
const [promptFileFound, setPromptFileFound] = useState(false);
const [streamItems, setStreamItems] = useState<StreamItem[]>([]);
const [streamTask, setStreamTask] = useState<SubagentTask | null>(null);
const [selectedTopologyBranch, setSelectedTopologyBranch] = useState('');
const [topologyFilter, setTopologyFilter] = useState<'all' | 'running' | 'failed' | 'local' | 'remote'>('all');
const [topologyZoom, setTopologyZoom] = useState(0.9);
@@ -400,6 +435,13 @@ const Subagents: React.FC = () => {
load().catch(() => { });
}, [q, selectedAgentID]);
useEffect(() => {
const timer = window.setInterval(() => {
load().catch(() => { });
}, 5000);
return () => window.clearInterval(timer);
}, [q, selectedAgentID]);
const selected = useMemo(() => items.find((x) => x.id === selectedId) || null, [items, selectedId]);
const selectedRegistryItem = useMemo(
() => registryItems.find((x) => x.agent_id === selectedAgentID) || null,
@@ -414,6 +456,10 @@ const Subagents: React.FC = () => {
[...selectedAgentTasks].sort((a, b) => Math.max(b.updated || 0, b.created || 0) - Math.max(a.updated || 0, a.created || 0))[0] || null,
[selectedAgentTasks]
);
const selectedAgentDisplayName = useMemo(
() => selectedRegistryItem?.display_name || selectedRegistryItem?.agent_id || selectedAgentID || '',
[selectedRegistryItem, selectedAgentID]
);
const parsedNodeTrees = useMemo<NodeTree[]>(() => {
try {
const parsed = JSON.parse(nodeTrees);
@@ -489,7 +535,7 @@ const Subagents: React.FC = () => {
failed: 0,
};
const localMainStats = taskStats[normalizeTitle(localRoot.agent_id, 'main')] || { total: 0, running: 0, failed: 0, waiting: 0, active: [] };
const localMainStats = taskStats[normalizeTitle(localRoot.agent_id, 'main')] || { total: 0, running: 0, failed: 0, waiting: 0, latestStatus: '', latestUpdated: 0, active: [] };
const localMainTask = recentTaskByAgent[normalizeTitle(localRoot.agent_id, 'main')];
localBranchStats.running += localMainStats.running;
localBranchStats.failed += localMainStats.failed;
@@ -509,14 +555,16 @@ const Subagents: React.FC = () => {
`children=${localChildren.length + remoteClusters.length}`,
`total=${localMainStats.total} running=${localMainStats.running}`,
`waiting=${localMainStats.waiting} failed=${localMainStats.failed}`,
`notify=${normalizeTitle(registryItems.find((item) => item.agent_id === localRoot.agent_id)?.notify_main_policy, 'final_only')}`,
`transport=${normalizeTitle(localRoot.transport, 'local')} type=${normalizeTitle(localRoot.type, 'router')}`,
localMainStats.active[0] ? `task: ${localMainStats.active[0].title}` : t('noLiveTasks'),
],
accent: 'bg-amber-400',
accent: localMainStats.running > 0 ? 'bg-emerald-500' : localMainStats.latestStatus === 'failed' ? 'bg-red-500' : 'bg-amber-400',
clickable: true,
scale,
onClick: () => {
setSelectedTopologyBranch(localBranch);
setSelectedAgentID(normalizeTitle(localRoot.agent_id, 'main'));
if (localMainTask?.id) setSelectedId(localMainTask.id);
},
};
@@ -525,7 +573,7 @@ const Subagents: React.FC = () => {
localChildren.forEach((child, idx) => {
const childX = localOriginX + idx * (cardWidth + clusterGap);
const childY = childStartY;
const stats = taskStats[normalizeTitle(child.agent_id, '')] || { total: 0, running: 0, failed: 0, waiting: 0, active: [] };
const stats = taskStats[normalizeTitle(child.agent_id, '')] || { total: 0, running: 0, failed: 0, waiting: 0, latestStatus: '', latestUpdated: 0, active: [] };
const task = recentTaskByAgent[normalizeTitle(child.agent_id, '')];
localBranchStats.running += stats.running;
localBranchStats.failed += stats.failed;
@@ -544,14 +592,16 @@ const Subagents: React.FC = () => {
meta: [
`total=${stats.total} running=${stats.running}`,
`waiting=${stats.waiting} failed=${stats.failed}`,
`notify=${normalizeTitle(registryItems.find((item) => item.agent_id === child.agent_id)?.notify_main_policy, 'final_only')}`,
`transport=${normalizeTitle(child.transport, 'local')} type=${normalizeTitle(child.type, 'worker')}`,
stats.active[0] ? `task: ${stats.active[0].title}` : task ? `last: ${summarizeTask(task.task, task.label)}` : t('noLiveTasks'),
],
accent: stats.running > 0 ? 'bg-emerald-500' : stats.failed > 0 ? 'bg-red-500' : 'bg-sky-400',
accent: stats.running > 0 ? 'bg-emerald-500' : stats.latestStatus === 'failed' ? 'bg-red-500' : 'bg-sky-400',
clickable: true,
scale,
onClick: () => {
setSelectedTopologyBranch(localBranch);
setSelectedAgentID(normalizeTitle(child.agent_id, ''));
if (task?.id) setSelectedId(task.id);
},
});
@@ -588,7 +638,11 @@ const Subagents: React.FC = () => {
accent: tree.online ? 'bg-fuchsia-400' : 'bg-zinc-500',
clickable: true,
scale,
onClick: () => setSelectedTopologyBranch(branch),
onClick: () => {
setSelectedTopologyBranch(branch);
setSelectedAgentID(normalizeTitle(treeRoot.agent_id, ''));
setSelectedId('');
},
};
cards.push(rootCard);
lines.push({
@@ -619,7 +673,11 @@ const Subagents: React.FC = () => {
accent: 'bg-violet-400',
clickable: true,
scale,
onClick: () => setSelectedTopologyBranch(branch),
onClick: () => {
setSelectedTopologyBranch(branch);
setSelectedAgentID(normalizeTitle(child.agent_id, ''));
setSelectedId('');
},
});
lines.push({
path: bezierCurve(rootCard.x + cardWidth / 2, rootCard.y + cardHeight / 2, childX + cardWidth / 2, childY + cardHeight / 2),
@@ -881,6 +939,26 @@ const Subagents: React.FC = () => {
loadThreadAndInbox(selected).catch(() => { });
}, [selectedId, q, items]);
const loadStream = async (task: SubagentTask | null) => {
if (!task?.id) {
setStreamTask(null);
setStreamItems([]);
return;
}
try {
const streamRes = await callAction({ action: 'stream', id: task.id, limit: 100 });
setStreamTask(streamRes?.result?.task || task);
setStreamItems(Array.isArray(streamRes?.result?.items) ? streamRes.result.items : []);
} catch {
setStreamTask(task);
setStreamItems([]);
}
};
useEffect(() => {
loadStream(selectedAgentLatestTask).catch(() => { });
}, [selectedAgentLatestTask?.id, q, items.length]);
return (
<div className="h-full p-4 md:p-6 flex flex-col gap-4">
<div className="flex items-center justify-between">
@@ -1092,6 +1170,62 @@ const Subagents: React.FC = () => {
</div>
</div>
)}
{selectedAgentID && (
<div className="absolute top-4 right-4 bottom-4 z-20 w-[360px] rounded-2xl border border-zinc-800 bg-zinc-950/92 shadow-2xl shadow-black/40 backdrop-blur-md overflow-hidden flex flex-col">
<div className="flex items-center justify-between px-4 py-3 border-b border-zinc-800">
<div className="min-w-0">
<div className="text-xs text-zinc-500 uppercase tracking-wider">Internal Stream</div>
<div className="text-sm font-semibold text-zinc-100 truncate">{selectedAgentDisplayName}</div>
<div className="text-[11px] text-zinc-500 truncate">{selectedAgentID}</div>
</div>
<button
onClick={() => {
setSelectedAgentID('');
setSelectedId('');
setStreamTask(null);
setStreamItems([]);
}}
className="px-2 py-1 rounded bg-zinc-800 hover:bg-zinc-700 text-[11px] text-zinc-200"
>
Close
</button>
</div>
<div className="px-4 py-3 border-b border-zinc-800 text-xs text-zinc-400">
{streamTask?.id ? (
<div className="space-y-1">
<div>run={streamTask.id}</div>
<div>status={streamTask.status || '-'} · thread={streamTask.thread_id || '-'}</div>
</div>
) : (
<div>No persisted run for this agent yet.</div>
)}
</div>
<div className="flex-1 overflow-y-auto px-4 py-3 space-y-3">
{streamItems.length === 0 ? (
<div className="text-sm text-zinc-500">No internal stream events yet.</div>
) : streamItems.map((item, idx) => (
<div key={`${item.kind || 'item'}-${item.at || 0}-${idx}`} className="rounded-xl border border-zinc-800 bg-zinc-900/70 p-3">
<div className="flex items-center justify-between gap-2 mb-2">
<div className="text-xs font-medium text-zinc-200">
{item.kind === 'event'
? `${item.event_type || 'event'}${item.status ? ` · ${item.status}` : ''}`
: `${item.from_agent || '-'} -> ${item.to_agent || '-'} · ${item.message_type || 'message'}`}
</div>
<div className="text-[11px] text-zinc-500">{formatStreamTime(item.at)}</div>
</div>
<div className="text-xs text-zinc-300 whitespace-pre-wrap break-words">
{item.kind === 'event' ? (item.message || '(no event message)') : (item.content || '(empty message)')}
</div>
<div className="mt-2 text-[11px] text-zinc-500">
{item.kind === 'event'
? `run=${item.run_id || '-'}${item.retry_count ? ` · retry=${item.retry_count}` : ''}`
: `status=${item.status || '-'}${item.reply_to ? ` · reply_to=${item.reply_to}` : ''}`}
</div>
</div>
))}
</div>
</div>
)}
</div>
</div>