7 Commits

Author SHA1 Message Date
lpf
1218d68b7e Add internal subagent stream and notify policy 2026-03-07 11:52:36 +08:00
lpf
557633b698 Refresh agent topology state after task restart 2026-03-07 01:53:18 +08:00
lpf
823f96be5a Stop planned task spam after cancellation 2026-03-06 21:31:21 +08:00
lpf
9d0ab54a97 Recover running subagent tasks after restart 2026-03-06 20:05:03 +08:00
lpf
ee9326b2f2 Reduce planned task progress noise 2026-03-06 19:57:54 +08:00
lpf
b4cf4a123b Fix planned task splitting and progress summaries 2026-03-06 19:22:57 +08:00
lpf
623b401850 Fix config reload and subagent config feedback 2026-03-06 18:45:43 +08:00
20 changed files with 1163 additions and 203 deletions

View File

@@ -154,8 +154,12 @@ func gatewayCmd() {
}
return out
})
reloadReqCh := make(chan struct{}, 1)
registryServer.SetConfigAfterHook(func() {
_ = requestGatewayReloadSignal()
select {
case reloadReqCh <- struct{}{}:
default:
}
})
registryServer.SetSubagentHandler(func(cctx context.Context, action string, args map[string]interface{}) (interface{}, error) {
return agentLoop.HandleSubagentRuntime(cctx, action, args)
@@ -309,77 +313,35 @@ func gatewayCmd() {
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, gatewayNotifySignals()...)
for {
sig := <-sigChan
switch {
case isGatewayReloadSignal(sig):
fmt.Println("\n↻ Reloading config...")
newCfg, err := config.LoadConfig(getConfigPath())
if err != nil {
fmt.Printf("✗ Reload failed (load config): %v\n", err)
continue
}
if strings.EqualFold(strings.TrimSpace(os.Getenv(envRootGranted)), "1") || strings.EqualFold(strings.TrimSpace(os.Getenv(envRootGranted)), "true") {
applyMaximumPermissionPolicy(newCfg)
}
configureCronServiceRuntime(cronService, newCfg)
heartbeatService.Stop()
heartbeatService = buildHeartbeatService(newCfg, msgBus)
if err := heartbeatService.Start(); err != nil {
fmt.Printf("Error starting heartbeat service: %v\n", err)
}
applyReload := func() {
fmt.Println("\n↻ Reloading config...")
newCfg, err := config.LoadConfig(getConfigPath())
if err != nil {
fmt.Printf(" Reload failed (load config): %v\n", err)
return
}
if strings.EqualFold(strings.TrimSpace(os.Getenv(envRootGranted)), "1") || strings.EqualFold(strings.TrimSpace(os.Getenv(envRootGranted)), "true") {
applyMaximumPermissionPolicy(newCfg)
}
configureCronServiceRuntime(cronService, newCfg)
heartbeatService.Stop()
heartbeatService = buildHeartbeatService(newCfg, msgBus)
if err := heartbeatService.Start(); err != nil {
fmt.Printf("Error starting heartbeat service: %v\n", err)
}
if reflect.DeepEqual(cfg, newCfg) {
fmt.Println("✓ Config unchanged, skip reload")
continue
}
if reflect.DeepEqual(cfg, newCfg) {
fmt.Println("✓ Config unchanged, skip reload")
return
}
runtimeSame := reflect.DeepEqual(cfg.Agents, newCfg.Agents) &&
reflect.DeepEqual(cfg.Providers, newCfg.Providers) &&
reflect.DeepEqual(cfg.Tools, newCfg.Tools) &&
reflect.DeepEqual(cfg.Channels, newCfg.Channels)
runtimeSame := reflect.DeepEqual(cfg.Agents, newCfg.Agents) &&
reflect.DeepEqual(cfg.Providers, newCfg.Providers) &&
reflect.DeepEqual(cfg.Tools, newCfg.Tools) &&
reflect.DeepEqual(cfg.Channels, newCfg.Channels)
if runtimeSame {
configureLogging(newCfg)
sentinelService.Stop()
sentinelService = sentinel.NewService(
getConfigPath(),
newCfg.WorkspacePath(),
newCfg.Sentinel.IntervalSec,
newCfg.Sentinel.AutoHeal,
func(message string) {
if newCfg.Sentinel.NotifyChannel != "" && newCfg.Sentinel.NotifyChatID != "" {
msgBus.PublishOutbound(bus.OutboundMessage{
Channel: newCfg.Sentinel.NotifyChannel,
ChatID: newCfg.Sentinel.NotifyChatID,
Content: "[Sentinel] " + message,
})
}
},
)
if newCfg.Sentinel.Enabled {
sentinelService.SetManager(channelManager)
sentinelService.Start()
}
cfg = newCfg
runtimecfg.Set(cfg)
fmt.Println("✓ Config hot-reload applied (logging/metadata only)")
continue
}
newAgentLoop, newChannelManager, err := buildGatewayRuntime(ctx, newCfg, msgBus, cronService)
if err != nil {
fmt.Printf("✗ Reload failed (init runtime): %v\n", err)
continue
}
channelManager.StopAll(ctx)
agentLoop.Stop()
channelManager = newChannelManager
agentLoop = newAgentLoop
cfg = newCfg
runtimecfg.Set(cfg)
if runtimeSame {
configureLogging(newCfg)
sentinelService.Stop()
sentinelService = sentinel.NewService(
getConfigPath(),
@@ -397,27 +359,77 @@ func gatewayCmd() {
},
)
if newCfg.Sentinel.Enabled {
sentinelService.SetManager(channelManager)
sentinelService.Start()
}
sentinelService.SetManager(channelManager)
if err := channelManager.StartAll(ctx); err != nil {
fmt.Printf("✗ Reload failed (start channels): %v\n", err)
continue
}
go agentLoop.Run(ctx)
fmt.Println("✓ Config hot-reload applied")
default:
fmt.Println("\nShutting down...")
cancel()
heartbeatService.Stop()
sentinelService.Stop()
cronService.Stop()
agentLoop.Stop()
channelManager.StopAll(ctx)
fmt.Println("✓ Gateway stopped")
cfg = newCfg
runtimecfg.Set(cfg)
fmt.Println("✓ Config hot-reload applied (logging/metadata only)")
return
}
newAgentLoop, newChannelManager, err := buildGatewayRuntime(ctx, newCfg, msgBus, cronService)
if err != nil {
fmt.Printf("✗ Reload failed (init runtime): %v\n", err)
return
}
channelManager.StopAll(ctx)
agentLoop.Stop()
channelManager = newChannelManager
agentLoop = newAgentLoop
cfg = newCfg
runtimecfg.Set(cfg)
sentinelService.Stop()
sentinelService = sentinel.NewService(
getConfigPath(),
newCfg.WorkspacePath(),
newCfg.Sentinel.IntervalSec,
newCfg.Sentinel.AutoHeal,
func(message string) {
if newCfg.Sentinel.NotifyChannel != "" && newCfg.Sentinel.NotifyChatID != "" {
msgBus.PublishOutbound(bus.OutboundMessage{
Channel: newCfg.Sentinel.NotifyChannel,
ChatID: newCfg.Sentinel.NotifyChatID,
Content: "[Sentinel] " + message,
})
}
},
)
if newCfg.Sentinel.Enabled {
sentinelService.Start()
}
sentinelService.SetManager(channelManager)
if err := channelManager.StartAll(ctx); err != nil {
fmt.Printf("✗ Reload failed (start channels): %v\n", err)
return
}
go agentLoop.Run(ctx)
fmt.Println("✓ Config hot-reload applied")
}
for {
select {
case <-reloadReqCh:
applyReload()
case sig := <-sigChan:
switch {
case isGatewayReloadSignal(sig):
applyReload()
default:
fmt.Println("\nShutting down...")
cancel()
heartbeatService.Stop()
sentinelService.Stop()
cronService.Stop()
agentLoop.Stop()
channelManager.StopAll(ctx)
fmt.Println("✓ Gateway stopped")
return
}
}
}
}

View File

@@ -9,6 +9,7 @@ package agent
import (
"context"
"encoding/json"
"errors"
"fmt"
"hash/fnv"
"math"
@@ -544,6 +545,11 @@ func (al *AgentLoop) processInbound(ctx context.Context, msg bus.InboundMessage)
response, err := al.processPlannedMessage(ctx, msg)
if err != nil {
if errors.Is(err, context.Canceled) {
al.audit.Record(al.getTrigger(msg), msg.Channel, msg.SessionKey, true, err)
al.appendTaskAudit(taskID, msg, started, err, true)
return
}
response = fmt.Sprintf("Error processing message: %v", err)
}

View File

@@ -131,6 +131,7 @@ func (al *AgentLoop) HandleSubagentRuntime(ctx context.Context, action string, a
"transport": fallbackString(strings.TrimSpace(subcfg.Transport), "local"),
"node_id": strings.TrimSpace(subcfg.NodeID),
"parent_agent_id": strings.TrimSpace(subcfg.ParentAgentID),
"notify_main_policy": fallbackString(strings.TrimSpace(subcfg.NotifyMainPolicy), "final_only"),
"display_name": subcfg.DisplayName,
"role": subcfg.Role,
"description": subcfg.Description,
@@ -157,6 +158,7 @@ func (al *AgentLoop) HandleSubagentRuntime(ctx context.Context, action string, a
"transport": profile.Transport,
"node_id": profile.NodeID,
"parent_agent_id": profile.ParentAgentID,
"notify_main_policy": fallbackString(strings.TrimSpace(profile.NotifyMainPolicy), "final_only"),
"display_name": profile.Name,
"role": profile.Role,
"description": "Node-registered remote main agent branch",
@@ -360,6 +362,37 @@ func (al *AgentLoop) HandleSubagentRuntime(ctx context.Context, action string, a
return nil, err
}
return map[string]interface{}{"found": true, "thread": thread, "messages": items}, nil
case "stream":
taskID, err := resolveSubagentTaskIDForRuntime(sm, runtimeStringArg(args, "id"))
if err != nil {
return nil, err
}
task, ok := sm.GetTask(taskID)
if !ok {
return map[string]interface{}{"found": false}, nil
}
events, err := sm.Events(taskID, runtimeIntArg(args, "limit", 100))
if err != nil {
return nil, err
}
var thread *tools.AgentThread
var messages []tools.AgentMessage
if strings.TrimSpace(task.ThreadID) != "" {
if th, ok := sm.Thread(task.ThreadID); ok {
thread = th
}
messages, err = sm.ThreadMessages(task.ThreadID, runtimeIntArg(args, "limit", 100))
if err != nil {
return nil, err
}
}
stream := mergeSubagentStream(events, messages)
return map[string]interface{}{
"found": true,
"task": cloneSubagentTask(task),
"thread": thread,
"items": stream,
}, nil
case "inbox":
agentID := runtimeStringArg(args, "agent_id")
if agentID == "" {
@@ -386,6 +419,47 @@ func (al *AgentLoop) HandleSubagentRuntime(ctx context.Context, action string, a
}
}
// mergeSubagentStream flattens run events and thread messages into one
// chronologically ordered list of map payloads for the "stream" runtime
// action. Each item carries a "kind" discriminator ("event" or "message")
// and an "at" timestamp used as the primary sort key; ties are broken by
// kind so the ordering stays deterministic.
func mergeSubagentStream(events []tools.SubagentRunEvent, messages []tools.AgentMessage) []map[string]interface{} {
	merged := make([]map[string]interface{}, 0, len(events)+len(messages))
	for _, e := range events {
		merged = append(merged, map[string]interface{}{
			"kind":        "event",
			"at":          e.At,
			"run_id":      e.RunID,
			"agent_id":    e.AgentID,
			"event_type":  e.Type,
			"status":      e.Status,
			"message":     e.Message,
			"retry_count": e.RetryCount,
		})
	}
	for _, m := range messages {
		merged = append(merged, map[string]interface{}{
			"kind":           "message",
			"at":             m.CreatedAt,
			"message_id":     m.MessageID,
			"thread_id":      m.ThreadID,
			"from_agent":     m.FromAgent,
			"to_agent":       m.ToAgent,
			"reply_to":       m.ReplyTo,
			"correlation_id": m.CorrelationID,
			"message_type":   m.Type,
			"content":        m.Content,
			"status":         m.Status,
			"requires_reply": m.RequiresReply,
		})
	}
	sort.Slice(merged, func(i, j int) bool {
		// NOTE(review): "at" is asserted to int64; any non-int64 timestamp
		// compares as zero and falls through to the kind tiebreaker —
		// assumes At/CreatedAt are int64, confirm against the tools package.
		ti, _ := merged[i]["at"].(int64)
		tj, _ := merged[j]["at"].(int64)
		if ti != tj {
			return ti < tj
		}
		return fmt.Sprintf("%v", merged[i]["kind"]) < fmt.Sprintf("%v", merged[j]["kind"])
	})
	return merged
}
func cloneSubagentTask(in *tools.SubagentTask) *tools.SubagentTask {
if in == nil {
return nil

View File

@@ -5,6 +5,7 @@ import (
"os"
"path/filepath"
"testing"
"time"
"clawgo/pkg/config"
"clawgo/pkg/runtimecfg"
@@ -75,6 +76,7 @@ func TestHandleSubagentRuntimeUpsertConfigSubagent(t *testing.T) {
out, err := loop.HandleSubagentRuntime(context.Background(), "upsert_config_subagent", map[string]interface{}{
"agent_id": "reviewer",
"role": "testing",
"notify_main_policy": "internal_only",
"display_name": "Review Agent",
"system_prompt": "review changes",
"system_prompt_file": "agents/reviewer/AGENT.md",
@@ -99,6 +101,9 @@ func TestHandleSubagentRuntimeUpsertConfigSubagent(t *testing.T) {
if subcfg.SystemPromptFile != "agents/reviewer/AGENT.md" {
t.Fatalf("expected system_prompt_file to persist, got %+v", subcfg)
}
if subcfg.NotifyMainPolicy != "internal_only" {
t.Fatalf("expected notify_main_policy to persist, got %+v", subcfg)
}
if len(reloaded.Agents.Router.Rules) == 0 {
t.Fatalf("expected router rules to be persisted")
}
@@ -316,3 +321,71 @@ func TestHandleSubagentRuntimeProtectsMainAgent(t *testing.T) {
t.Fatalf("expected deleting main agent to fail")
}
}
// TestHandleSubagentRuntimeStream spawns a task through the runtime handler,
// waits for it to complete, then verifies that the "stream" action returns a
// merged timeline containing both lifecycle events and thread messages.
func TestHandleSubagentRuntimeStream(t *testing.T) {
	workspace := t.TempDir()
	manager := tools.NewSubagentManager(nil, workspace, nil)
	// Stub the run function so the spawned task completes immediately
	// without needing a real LLM provider.
	manager.SetRunFunc(func(ctx context.Context, task *tools.SubagentTask) (string, error) {
		return "stream-result", nil
	})
	loop := &AgentLoop{
		workspace:       workspace,
		subagentManager: manager,
		subagentRouter:  tools.NewSubagentRouter(manager),
	}
	out, err := loop.HandleSubagentRuntime(context.Background(), "spawn", map[string]interface{}{
		"task":     "prepare streamable task",
		"agent_id": "coder",
		"channel":  "webui",
		"chat_id":  "webui",
	})
	if err != nil {
		t.Fatalf("spawn failed: %v", err)
	}
	payload, ok := out.(map[string]interface{})
	if !ok {
		t.Fatalf("unexpected spawn payload: %T", out)
	}
	// Only the payload's type is asserted here; its contents are covered by
	// other tests.
	_ = payload
	// Poll for the background task to reach "completed" (50 × 10ms ≈ 500ms cap).
	var task *tools.SubagentTask
	for i := 0; i < 50; i++ {
		tasks := manager.ListTasks()
		if len(tasks) > 0 && tasks[0].Status == "completed" {
			task = tasks[0]
			break
		}
		time.Sleep(10 * time.Millisecond)
	}
	if task == nil {
		t.Fatalf("expected completed task")
	}
	out, err = loop.HandleSubagentRuntime(context.Background(), "stream", map[string]interface{}{
		"id": task.ID,
	})
	if err != nil {
		t.Fatalf("stream failed: %v", err)
	}
	streamPayload, ok := out.(map[string]interface{})
	if !ok || streamPayload["found"] != true {
		t.Fatalf("unexpected stream payload: %#v", out)
	}
	items, ok := streamPayload["items"].([]map[string]interface{})
	if !ok || len(items) == 0 {
		t.Fatalf("expected merged stream items, got %#v", streamPayload["items"])
	}
	// The merged stream must contain at least one item of each kind.
	foundEvent := false
	foundMessage := false
	for _, item := range items {
		switch item["kind"] {
		case "event":
			foundEvent = true
		case "message":
			foundMessage = true
		}
	}
	if !foundEvent || !foundMessage {
		t.Fatalf("expected merged event and message items, got %#v", items)
	}
}

View File

@@ -4,7 +4,9 @@ import (
"bufio"
"context"
"encoding/json"
"errors"
"fmt"
"math"
"os"
"path/filepath"
"regexp"
@@ -26,6 +28,7 @@ type plannedTaskResult struct {
Index int
Task plannedTask
Output string
Err error
ErrText string
}
@@ -103,7 +106,10 @@ func splitPlannedSegments(content string) []string {
return bullet
}
replaced := strings.NewReplacer("", ";", "\n", ";", "。然后", ";", " 然后 ", ";", " and then ", ";")
// Only split implicit plans on strong separators. Plain newlines are often
// just formatting inside a single request, and "然后/and then" frequently
// describes execution order inside one task rather than separate tasks.
replaced := strings.NewReplacer("", ";")
norm := replaced.Replace(content)
parts := strings.Split(norm, ";")
out := make([]string, 0, len(parts))
@@ -120,6 +126,11 @@ func splitPlannedSegments(content string) []string {
func (al *AgentLoop) runPlannedTasks(ctx context.Context, msg bus.InboundMessage, tasks []plannedTask) (string, error) {
results := make([]plannedTaskResult, len(tasks))
var wg sync.WaitGroup
var progressMu sync.Mutex
completed := 0
failed := 0
milestones := plannedProgressMilestones(len(tasks))
notified := make(map[int]struct{}, len(milestones))
for i, task := range tasks {
wg.Add(1)
go func(index int, t plannedTask) {
@@ -134,15 +145,33 @@ func (al *AgentLoop) runPlannedTasks(ctx context.Context, msg bus.InboundMessage
subMsg.Metadata["planned_task_index"] = fmt.Sprintf("%d", t.Index)
subMsg.Metadata["planned_task_total"] = fmt.Sprintf("%d", len(tasks))
out, err := al.processMessage(ctx, subMsg)
res := plannedTaskResult{Index: index, Task: t, Output: strings.TrimSpace(out)}
res := plannedTaskResult{Index: index, Task: t, Output: strings.TrimSpace(out), Err: err}
if err != nil {
res.ErrText = err.Error()
}
results[index] = res
al.publishPlannedTaskProgress(msg, len(tasks), res)
progressMu.Lock()
completed++
if res.ErrText != "" && !isPlannedTaskCancellation(ctx, res) {
failed++
}
snapshotCompleted := completed
snapshotFailed := failed
shouldNotify := shouldPublishPlannedTaskProgress(ctx, len(tasks), snapshotCompleted, res, milestones, notified)
if shouldNotify && res.ErrText == "" {
notified[snapshotCompleted] = struct{}{}
}
progressMu.Unlock()
if shouldNotify {
al.publishPlannedTaskProgress(msg, len(tasks), snapshotCompleted, snapshotFailed, res)
}
}(i, task)
}
wg.Wait()
if err := ctx.Err(); err != nil {
return "", err
}
var b strings.Builder
b.WriteString(fmt.Sprintf("已自动拆解为 %d 个任务并执行:\n\n", len(results)))
for _, r := range results {
@@ -160,7 +189,63 @@ func (al *AgentLoop) runPlannedTasks(ctx context.Context, msg bus.InboundMessage
return strings.TrimSpace(b.String()), nil
}
func (al *AgentLoop) publishPlannedTaskProgress(msg bus.InboundMessage, total int, res plannedTaskResult) {
// plannedProgressMilestones returns the intermediate completion counts at
// which a progress notification should fire for a batch of `total` planned
// tasks (roughly the 33% and 66% marks). Batches of three or fewer tasks get
// no milestones; steps that would land at 0 or at the final task, or that
// collide after rounding, are dropped.
func plannedProgressMilestones(total int) []int {
	if total <= 3 {
		return nil
	}
	var (
		out  = make([]int, 0, 2)
		seen = make(map[int]struct{}, 2)
	)
	for _, fraction := range []float64{0.33, 0.66} {
		step := int(math.Round(float64(total) * fraction))
		if step <= 0 || step >= total {
			continue
		}
		if _, dup := seen[step]; dup {
			continue
		}
		seen[step] = struct{}{}
		out = append(out, step)
	}
	return out
}
// shouldPublishPlannedTaskProgress decides whether a progress notification
// should be sent after one planned task finishes. Failures (other than
// cancellations) always notify; successes notify only the first time a
// milestone count is reached. Single-task batches, cancellations, and the
// final completion (covered by the aggregate summary) stay silent.
func shouldPublishPlannedTaskProgress(ctx context.Context, total, completed int, res plannedTaskResult, milestones []int, notified map[int]struct{}) bool {
	if total <= 1 || isPlannedTaskCancellation(ctx, res) {
		return false
	}
	// Genuine failures are always worth surfacing.
	if strings.TrimSpace(res.ErrText) != "" {
		return true
	}
	// The last success is reported by the final summary, not a progress ping.
	if completed >= total {
		return false
	}
	for _, step := range milestones {
		if completed == step {
			_, already := notified[step]
			return !already
		}
	}
	return false
}
// isPlannedTaskCancellation reports whether a planned task's outcome should
// be treated as a cancellation rather than a genuine failure: the task's own
// error is context.Canceled (detected directly or by its message text), or
// the parent context has already been canceled.
func isPlannedTaskCancellation(ctx context.Context, res plannedTaskResult) bool {
	switch {
	case res.Err != nil && errors.Is(res.Err, context.Canceled):
		return true
	case strings.EqualFold(strings.TrimSpace(res.ErrText), context.Canceled.Error()):
		return true
	default:
		// Any error surfaced after the parent was canceled is collateral.
		return ctx != nil && errors.Is(ctx.Err(), context.Canceled)
	}
}
func (al *AgentLoop) publishPlannedTaskProgress(msg bus.InboundMessage, total, completed, failed int, res plannedTaskResult) {
if al == nil || al.bus == nil || total <= 1 {
return
}
@@ -180,8 +265,8 @@ func (al *AgentLoop) publishPlannedTaskProgress(msg bus.InboundMessage, total in
if body == "" {
body = "(无输出)"
}
body = truncate(strings.ReplaceAll(body, "\n", " "), 280)
content := fmt.Sprintf("进度 %d/%d任务%d已%s\n%s", idx, total, idx, status, body)
body = summarizePlannedTaskProgressBody(body, 6, 320)
content := fmt.Sprintf("阶段进度 %d/%d(失败 %d\n最近任务%d 已%s\n%s", completed, total, failed, idx, status, body)
al.bus.PublishOutbound(bus.OutboundMessage{
Channel: msg.Channel,
ChatID: msg.ChatID,
@@ -189,6 +274,37 @@ func (al *AgentLoop) publishPlannedTaskProgress(msg bus.InboundMessage, total in
})
}
// summarizePlannedTaskProgressBody condenses a task's output for inclusion in
// a progress notification: CRLFs are normalized, blank lines are dropped, the
// result is capped at maxLines non-empty lines and maxChars characters, and
// an elided tail is marked with "...". Empty input yields "(无输出)".
func summarizePlannedTaskProgressBody(body string, maxLines, maxChars int) string {
	const placeholder = "(无输出)"
	normalized := strings.TrimSpace(strings.ReplaceAll(body, "\r\n", "\n"))
	if normalized == "" {
		return placeholder
	}
	rawLines := strings.Split(normalized, "\n")
	kept := make([]string, 0, len(rawLines))
	for _, raw := range rawLines {
		if trimmed := strings.TrimSpace(raw); trimmed != "" {
			kept = append(kept, trimmed)
			if maxLines > 0 && len(kept) >= maxLines {
				break
			}
		}
	}
	if len(kept) == 0 {
		return placeholder
	}
	joined := strings.Join(kept, "\n")
	if maxChars > 0 && len(joined) > maxChars {
		joined = truncate(joined, maxChars)
	}
	// NOTE(review): rawLines still counts blank lines, so blank-only tails can
	// trigger the ellipsis marker; preserved for behavior parity.
	if len(rawLines) > len(kept) && !strings.HasSuffix(joined, "...") {
		joined += "\n..."
	}
	return joined
}
func (al *AgentLoop) enrichTaskContentWithMemoryAndEKG(ctx context.Context, task plannedTask) string {
base := strings.TrimSpace(task.Content)
if base == "" {

View File

@@ -0,0 +1,63 @@
package agent
import (
"context"
"errors"
"testing"
)
// TestPlannedProgressMilestones checks that a 12-task batch yields the
// 33%/66% milestone counts (4 and 8).
func TestPlannedProgressMilestones(t *testing.T) {
	t.Parallel()
	got := plannedProgressMilestones(12)
	if len(got) != 2 || got[0] != 4 || got[1] != 8 {
		t.Fatalf("unexpected milestones: %#v", got)
	}
}
// TestShouldPublishPlannedTaskProgress covers the notification policy:
// milestones fire exactly once, failures always notify, and cancellations
// (direct or via a canceled parent context) stay silent.
func TestShouldPublishPlannedTaskProgress(t *testing.T) {
	t.Parallel()
	milestones := plannedProgressMilestones(12)
	notified := map[int]struct{}{}
	// Non-milestone success: no notification.
	if shouldPublishPlannedTaskProgress(context.Background(), 12, 1, plannedTaskResult{}, milestones, notified) {
		t.Fatalf("did not expect early success notification")
	}
	// First time a milestone count is reached: notify.
	if !shouldPublishPlannedTaskProgress(context.Background(), 12, 4, plannedTaskResult{}, milestones, notified) {
		t.Fatalf("expected milestone notification")
	}
	notified[4] = struct{}{}
	// Same milestone a second time: suppressed.
	if shouldPublishPlannedTaskProgress(context.Background(), 12, 4, plannedTaskResult{}, milestones, notified) {
		t.Fatalf("did not expect duplicate milestone notification")
	}
	// A failed task notifies regardless of milestones.
	if !shouldPublishPlannedTaskProgress(context.Background(), 12, 5, plannedTaskResult{ErrText: "boom"}, milestones, notified) {
		t.Fatalf("expected failure notification")
	}
	// Final completion is reported by the summary, not a progress ping.
	if shouldPublishPlannedTaskProgress(context.Background(), 3, 3, plannedTaskResult{}, plannedProgressMilestones(3), map[int]struct{}{}) {
		t.Fatalf("did not expect final success notification")
	}
	// A direct context.Canceled error is suppressed.
	if shouldPublishPlannedTaskProgress(context.Background(), 12, 5, plannedTaskResult{Err: context.Canceled, ErrText: context.Canceled.Error()}, milestones, notified) {
		t.Fatalf("did not expect cancellation notification")
	}
	ctx, cancel := context.WithCancel(context.Background())
	cancel()
	// Any error after the parent context was canceled is also suppressed.
	if shouldPublishPlannedTaskProgress(ctx, 12, 5, plannedTaskResult{Err: errors.New("worker exited after parent stop"), ErrText: "worker exited after parent stop"}, milestones, notified) {
		t.Fatalf("did not expect notification after parent cancellation")
	}
}
// TestIsPlannedTaskCancellation checks the three cancellation-detection
// paths: a direct context.Canceled error, any error under an already-canceled
// parent context, and the negative case for ordinary errors.
func TestIsPlannedTaskCancellation(t *testing.T) {
	t.Parallel()
	if !isPlannedTaskCancellation(context.Background(), plannedTaskResult{Err: context.Canceled, ErrText: context.Canceled.Error()}) {
		t.Fatalf("expected direct context cancellation to be detected")
	}
	ctx, cancel := context.WithCancel(context.Background())
	cancel()
	// An unrelated error message still counts as cancellation once the
	// parent context is canceled.
	if !isPlannedTaskCancellation(ctx, plannedTaskResult{Err: errors.New("worker exited after parent stop"), ErrText: "worker exited after parent stop"}) {
		t.Fatalf("expected canceled parent context to suppress planned task result")
	}
	if isPlannedTaskCancellation(context.Background(), plannedTaskResult{Err: errors.New("boom"), ErrText: "boom"}) {
		t.Fatalf("did not expect non-cancellation error to be suppressed")
	}
}

View File

@@ -0,0 +1,33 @@
package agent
import "testing"
// TestSplitPlannedSegmentsDoesNotSplitPlainNewlines verifies that a
// multi-line request without strong separators stays a single planned task.
func TestSplitPlannedSegmentsDoesNotSplitPlainNewlines(t *testing.T) {
	t.Parallel()
	content := "编写ai漫画创作平台demo\n让产品出方案方案出完让前端后端开始编写写完后交个测试过一下"
	got := splitPlannedSegments(content)
	if len(got) != 1 {
		t.Fatalf("expected 1 segment, got %d: %#v", len(got), got)
	}
}
// TestSplitPlannedSegmentsStillSplitsBullets verifies that numbered bullet
// lines are still split into separate planned tasks.
func TestSplitPlannedSegmentsStillSplitsBullets(t *testing.T) {
	t.Parallel()
	content := "1. 先实现前端\n2. 再补测试"
	got := splitPlannedSegments(content)
	if len(got) != 2 {
		t.Fatalf("expected 2 segments, got %d: %#v", len(got), got)
	}
}
// TestSplitPlannedSegmentsStillSplitsSemicolons verifies that semicolons
// remain a strong separator between planned tasks.
func TestSplitPlannedSegmentsStillSplitsSemicolons(t *testing.T) {
	t.Parallel()
	content := "先实现前端;再补测试"
	got := splitPlannedSegments(content)
	if len(got) != 2 {
		t.Fatalf("expected 2 segments, got %d: %#v", len(got), got)
	}
}

View File

@@ -0,0 +1,23 @@
package agent
import (
"strings"
"testing"
)
// TestSummarizePlannedTaskProgressBodyPreservesUsefulLines verifies that the
// progress-body summarizer keeps the leading title and key detail lines and
// preserves multi-line formatting instead of collapsing lines onto one line.
func TestSummarizePlannedTaskProgressBodyPreservesUsefulLines(t *testing.T) {
	t.Parallel()
	body := "subagent 已写入 config.json。\npath: /root/.clawgo/config.json\nagent_id: tester\nrole: testing\ndisplay_name: Test Agent\ntool_allowlist: [filesystem shell]\nrouting_keywords: [test qa]\nsystem_prompt_file: agents/tester/AGENT.md"
	out := summarizePlannedTaskProgressBody(body, 6, 320)
	if !strings.Contains(out, "subagent 已写入 config.json。") {
		t.Fatalf("expected title line, got:\n%s", out)
	}
	if !strings.Contains(out, "agent_id: tester") {
		t.Fatalf("expected agent id line, got:\n%s", out)
	}
	// A flattened single-line rendering would join the title and path with a
	// space; that must not happen.
	if strings.Contains(out, "subagent 已写入 config.json。 path:") {
		t.Fatalf("expected multi-line formatting, got:\n%s", out)
	}
}

View File

@@ -88,15 +88,49 @@ func extractSubagentDescription(content string) string {
}
// formatCreatedSubagentForUser renders a human-readable confirmation after a
// subagent definition has been written to config.json. It reads the display
// fields from the nested "subagent" map and the routing keywords from the
// router rule matching the new agent_id, so the message does not show <nil>
// placeholders for fields that live one level down in the result payload.
//
// Fix: the format string has exactly seven verbs, so exactly seven arguments
// are passed (the stale flat result["role"]/result["display_name"]/... lookups
// that duplicated the nested values — and made Sprintf emit %!(EXTRA ...) —
// are removed).
func formatCreatedSubagentForUser(result map[string]interface{}, configPath string) string {
	subagent, _ := result["subagent"].(map[string]interface{})
	role := ""
	displayName := ""
	toolAllowlist := interface{}(nil)
	systemPromptFile := ""
	if subagent != nil {
		if v, _ := subagent["role"].(string); v != "" {
			role = v
		}
		if v, _ := subagent["display_name"].(string); v != "" {
			displayName = v
		}
		if tools, ok := subagent["tools"].(map[string]interface{}); ok {
			toolAllowlist = tools["allowlist"]
		}
		if v, _ := subagent["system_prompt_file"].(string); v != "" {
			systemPromptFile = v
		}
	}
	// Routing keywords live on the router rule whose agent_id matches the
	// freshly created subagent.
	routingKeywords := interface{}(nil)
	if rules, ok := result["rules"].([]interface{}); ok {
		agentID, _ := result["agent_id"].(string)
		for _, raw := range rules {
			rule, ok := raw.(map[string]interface{})
			if !ok {
				continue
			}
			if strings.TrimSpace(fmt.Sprint(rule["agent_id"])) != agentID {
				continue
			}
			routingKeywords = rule["keywords"]
			break
		}
	}
	return fmt.Sprintf(
		"subagent 已写入 config.json。\npath: %s\nagent_id: %v\nrole: %v\ndisplay_name: %v\ntool_allowlist: %v\nrouting_keywords: %v\nsystem_prompt_file: %v",
		configPath,
		result["agent_id"],
		role,
		displayName,
		toolAllowlist,
		routingKeywords,
		systemPromptFile,
	)
}

View File

@@ -1,71 +1,43 @@
package agent
import (
"context"
"path/filepath"
"strings"
"testing"
"clawgo/pkg/bus"
"clawgo/pkg/config"
"clawgo/pkg/runtimecfg"
)
func TestMaybeHandleSubagentConfigIntentCreatePersistsImmediately(t *testing.T) {
workspace := t.TempDir()
configPath := filepath.Join(workspace, "config.json")
cfg := config.DefaultConfig()
cfg.Agents.Router.Enabled = true
cfg.Agents.Subagents["main"] = config.SubagentConfig{
Enabled: true,
Type: "router",
Role: "orchestrator",
SystemPromptFile: "agents/main/AGENT.md",
}
if err := config.SaveConfig(configPath, cfg); err != nil {
t.Fatalf("save config failed: %v", err)
}
runtimecfg.Set(cfg)
t.Cleanup(func() { runtimecfg.Set(config.DefaultConfig()) })
func TestFormatCreatedSubagentForUserReadsNestedFields(t *testing.T) {
t.Parallel()
loop := &AgentLoop{configPath: configPath}
out, handled, err := loop.maybeHandleSubagentConfigIntent(context.Background(), bus.InboundMessage{
SessionKey: "main",
Channel: "cli",
Content: "创建一个负责回归测试和验证修复结果的子代理",
})
if err != nil {
t.Fatalf("create subagent failed: %v", err)
}
if !handled || !strings.Contains(out, "已写入 config.json") {
t.Fatalf("expected immediate persist response, got handled=%v out=%q", handled, out)
}
if !strings.Contains(out, configPath) {
t.Fatalf("expected response to include config path, got %q", out)
}
out := formatCreatedSubagentForUser(map[string]interface{}{
"agent_id": "coder",
"subagent": map[string]interface{}{
"role": "coding",
"display_name": "Code Agent",
"system_prompt_file": "agents/coder/AGENT.md",
"tools": map[string]interface{}{
"allowlist": []interface{}{"filesystem", "shell"},
},
},
"rules": []interface{}{
map[string]interface{}{
"agent_id": "coder",
"keywords": []interface{}{"code", "fix"},
},
},
}, "/tmp/config.json")
reloaded, err := config.LoadConfig(configPath)
if err != nil {
t.Fatalf("reload config failed: %v", err)
}
if _, ok := reloaded.Agents.Subagents["tester"]; !ok {
t.Fatalf("expected tester subagent to persist, got %+v", reloaded.Agents.Subagents)
}
}
func TestMaybeHandleSubagentConfigIntentConfirmCancelNoLongerHandled(t *testing.T) {
loop := &AgentLoop{}
for _, content := range []string{"确认创建", "取消创建"} {
out, handled, err := loop.maybeHandleSubagentConfigIntent(context.Background(), bus.InboundMessage{
SessionKey: "main",
Channel: "cli",
Content: content,
})
if err != nil {
t.Fatalf("unexpected error for %q: %v", content, err)
}
if handled || out != "" {
t.Fatalf("expected %q to pass through, got handled=%v out=%q", content, handled, out)
for _, want := range []string{
"agent_id: coder",
"role: coding",
"display_name: Code Agent",
"system_prompt_file: agents/coder/AGENT.md",
"routing_keywords: [code fix]",
} {
if !strings.Contains(out, want) {
t.Fatalf("expected output to contain %q, got:\n%s", want, out)
}
}
if strings.Contains(out, "<nil>") {
t.Fatalf("did not expect nil placeholders, got:\n%s", out)
}
}

View File

@@ -73,6 +73,7 @@ type SubagentConfig struct {
Transport string `json:"transport,omitempty"`
NodeID string `json:"node_id,omitempty"`
ParentAgentID string `json:"parent_agent_id,omitempty"`
NotifyMainPolicy string `json:"notify_main_policy,omitempty"`
DisplayName string `json:"display_name,omitempty"`
Role string `json:"role,omitempty"`
Description string `json:"description,omitempty"`

View File

@@ -309,6 +309,13 @@ func validateSubagents(cfg *Config) []error {
errs = append(errs, fmt.Errorf("agents.subagents.%s.transport must be one of: local, node", id))
}
}
if policy := strings.TrimSpace(raw.NotifyMainPolicy); policy != "" {
switch policy {
case "final_only", "milestone", "on_blocked", "always", "internal_only":
default:
errs = append(errs, fmt.Errorf("agents.subagents.%s.notify_main_policy must be one of: final_only, milestone, on_blocked, always, internal_only", id))
}
}
if transport == "node" && strings.TrimSpace(raw.NodeID) == "" {
errs = append(errs, fmt.Errorf("agents.subagents.%s.node_id is required when transport=node", id))
}

View File

@@ -110,3 +110,21 @@ func TestValidateNodeBackedSubagentAllowsMissingPromptFile(t *testing.T) {
t.Fatalf("expected node-backed config to be valid, got %v", errs)
}
}
// TestValidateSubagentsRejectsInvalidNotifyMainPolicy verifies that an
// unrecognized notify_main_policy value fails config validation.
func TestValidateSubagentsRejectsInvalidNotifyMainPolicy(t *testing.T) {
	t.Parallel()
	cfg := DefaultConfig()
	cfg.Agents.Subagents["coder"] = SubagentConfig{
		Enabled:          true,
		SystemPromptFile: "agents/coder/AGENT.md",
		NotifyMainPolicy: "loud", // not one of the allowed policy values
		Runtime: SubagentRuntimeConfig{
			Proxy: "proxy",
		},
	}
	if errs := Validate(cfg); len(errs) == 0 {
		t.Fatalf("expected validation errors")
	}
}

View File

@@ -2,6 +2,7 @@ package tools
import (
"context"
"errors"
"fmt"
"os"
"path/filepath"
@@ -10,6 +11,7 @@ import (
"time"
"clawgo/pkg/bus"
"clawgo/pkg/ekg"
"clawgo/pkg/providers"
)
@@ -22,6 +24,7 @@ type SubagentTask struct {
Transport string `json:"transport,omitempty"`
NodeID string `json:"node_id,omitempty"`
ParentAgentID string `json:"parent_agent_id,omitempty"`
NotifyMainPolicy string `json:"notify_main_policy,omitempty"`
SessionKey string `json:"session_key"`
MemoryNS string `json:"memory_ns"`
SystemPrompt string `json:"system_prompt,omitempty"`
@@ -51,6 +54,7 @@ type SubagentTask struct {
type SubagentManager struct {
tasks map[string]*SubagentTask
cancelFuncs map[string]context.CancelFunc
recoverableTaskIDs []string
archiveAfterMinute int64
mu sync.RWMutex
provider providers.LLMProvider
@@ -61,23 +65,25 @@ type SubagentManager struct {
profileStore *SubagentProfileStore
runStore *SubagentRunStore
mailboxStore *AgentMailboxStore
ekg *ekg.Engine
}
// SubagentSpawnOptions describes a request to spawn a subagent task. Zero
// values fall back to profile/config defaults at spawn time.
//
// Fix: the struct previously contained the stale pre-NotifyMainPolicy field
// list interleaved with the new one, declaring every field twice (a compile
// error); only the current field set is kept.
type SubagentSpawnOptions struct {
	Task             string // task prompt handed to the subagent
	Label            string // optional human-readable label used in result announcements
	Role             string
	AgentID          string
	NotifyMainPolicy string // overrides the profile's notify_main_policy when non-empty
	MaxRetries       int
	RetryBackoff     int
	TimeoutSec       int
	MaxTaskChars     int
	MaxResultChars   int
	OriginChannel    string
	OriginChatID     string
	ThreadID         string
	CorrelationID    string
	ParentRunID      string
}
func NewSubagentManager(provider providers.LLMProvider, workspace string, bus *bus.MessageBus) *SubagentManager {
@@ -95,13 +101,18 @@ func NewSubagentManager(provider providers.LLMProvider, workspace string, bus *b
profileStore: store,
runStore: runStore,
mailboxStore: mailboxStore,
ekg: ekg.New(workspace),
}
if runStore != nil {
for _, task := range runStore.List() {
mgr.tasks[task.ID] = task
if task.Status == "running" {
mgr.recoverableTaskIDs = append(mgr.recoverableTaskIDs, task.ID)
}
}
mgr.nextID = runStore.NextIDSeed()
}
go mgr.resumeRecoveredTasks()
return mgr
}
@@ -162,6 +173,7 @@ func (sm *SubagentManager) spawnTask(ctx context.Context, opts SubagentSpawnOpti
transport := "local"
nodeID := ""
parentAgentID := ""
notifyMainPolicy := "final_only"
toolAllowlist := []string(nil)
maxRetries := 0
retryBackoff := 1000
@@ -194,6 +206,7 @@ func (sm *SubagentManager) spawnTask(ctx context.Context, opts SubagentSpawnOpti
}
nodeID = strings.TrimSpace(profile.NodeID)
parentAgentID = strings.TrimSpace(profile.ParentAgentID)
notifyMainPolicy = normalizeNotifyMainPolicy(profile.NotifyMainPolicy)
systemPrompt = strings.TrimSpace(profile.SystemPrompt)
systemPromptFile = strings.TrimSpace(profile.SystemPromptFile)
toolAllowlist = append([]string(nil), profile.ToolAllowlist...)
@@ -231,6 +244,9 @@ func (sm *SubagentManager) spawnTask(ctx context.Context, opts SubagentSpawnOpti
}
originChannel := strings.TrimSpace(opts.OriginChannel)
originChatID := strings.TrimSpace(opts.OriginChatID)
if raw := strings.TrimSpace(opts.NotifyMainPolicy); raw != "" {
notifyMainPolicy = normalizeNotifyMainPolicy(raw)
}
threadID := strings.TrimSpace(opts.ThreadID)
correlationID := strings.TrimSpace(opts.CorrelationID)
parentRunID := strings.TrimSpace(opts.ParentRunID)
@@ -269,6 +285,7 @@ func (sm *SubagentManager) spawnTask(ctx context.Context, opts SubagentSpawnOpti
Transport: transport,
NodeID: nodeID,
ParentAgentID: parentAgentID,
NotifyMainPolicy: notifyMainPolicy,
SessionKey: sessionKey,
MemoryNS: memoryNS,
SystemPrompt: systemPrompt,
@@ -363,20 +380,11 @@ func (sm *SubagentManager) runTask(ctx context.Context, task *SubagentTask) {
}
sm.mu.Unlock()
// 2. Result broadcast (keep existing behavior)
if sm.bus != nil {
prefix := "Task completed"
if runErr != nil {
prefix = "Task failed"
}
if task.Label != "" {
if runErr != nil {
prefix = fmt.Sprintf("Task '%s' failed", task.Label)
} else {
prefix = fmt.Sprintf("Task '%s' completed", task.Label)
}
}
announceContent := fmt.Sprintf("%s.\n\nResult:\n%s", prefix, task.Result)
sm.recordEKG(task, runErr)
// 2. Result broadcast
if sm.bus != nil && shouldNotifyMainOnFinal(task.NotifyMainPolicy, runErr, task) {
announceContent, notifyReason := buildSubagentMainNotification(task, runErr)
sm.bus.PublishInbound(bus.InboundMessage{
Channel: "system",
SenderID: fmt.Sprintf("subagent:%s", task.ID),
@@ -384,20 +392,142 @@ func (sm *SubagentManager) runTask(ctx context.Context, task *SubagentTask) {
SessionKey: task.SessionKey,
Content: announceContent,
Metadata: map[string]string{
"trigger": "subagent",
"subagent_id": task.ID,
"agent_id": task.AgentID,
"role": task.Role,
"session_key": task.SessionKey,
"memory_ns": task.MemoryNS,
"retry_count": fmt.Sprintf("%d", task.RetryCount),
"timeout_sec": fmt.Sprintf("%d", task.TimeoutSec),
"status": task.Status,
"trigger": "subagent",
"subagent_id": task.ID,
"agent_id": task.AgentID,
"role": task.Role,
"session_key": task.SessionKey,
"memory_ns": task.MemoryNS,
"retry_count": fmt.Sprintf("%d", task.RetryCount),
"timeout_sec": fmt.Sprintf("%d", task.TimeoutSec),
"status": task.Status,
"notify_reason": notifyReason,
},
})
}
}
// recordEKG appends a terminal event for task to the experience knowledge
// graph (EKG) log. Failures are tagged with status "error", and blocked
// failures additionally get a "blocked: " prefix on the log text so they can
// be filtered apart downstream. No-op when the manager, EKG, or task is nil.
func (sm *SubagentManager) recordEKG(task *SubagentTask, runErr error) {
	if sm == nil || sm.ekg == nil || task == nil {
		return
	}
	result := strings.TrimSpace(task.Result)
	status := "success"
	if runErr != nil {
		status = "error"
		if isBlockedSubagentError(runErr) {
			// Mark blocked runs so operators can distinguish them from
			// plain failures in the event log.
			result = "blocked: " + result
		}
	}
	sm.ekg.Record(ekg.Event{
		TaskID:  task.ID,
		Session: task.SessionKey,
		Channel: task.OriginChannel,
		Source:  "subagent",
		Status:  status,
		Log:     result,
	})
}
// normalizeNotifyMainPolicy maps a raw policy string onto one of the
// supported notify-main policies. Input is trimmed and lowercased once;
// empty and unrecognized values fall back to the default "final_only".
func normalizeNotifyMainPolicy(v string) string {
	// Normalize once instead of recomputing in each case arm.
	policy := strings.ToLower(strings.TrimSpace(v))
	switch policy {
	case "milestone", "on_blocked", "always", "internal_only":
		return policy
	default:
		// "", "final_only", and anything unknown collapse to the default.
		return "final_only"
	}
}
// shouldNotifyMainOnFinal reports whether the main agent should be notified
// when a subagent run reaches a terminal state, based on the configured
// notify policy and (for on_blocked) whether the failure looks like a
// blocker. The task argument is currently unused but kept for signature
// stability.
func shouldNotifyMainOnFinal(policy string, runErr error, task *SubagentTask) bool {
	normalized := normalizeNotifyMainPolicy(policy)
	if normalized == "internal_only" || normalized == "milestone" {
		// internal_only never surfaces finals; milestone updates are
		// delivered mid-run, not at completion.
		return false
	}
	if normalized == "on_blocked" {
		return isBlockedSubagentError(runErr)
	}
	// "always" and "final_only" (the normalized default) notify on final.
	return true
}
// buildSubagentMainNotification renders the structured update sent to the
// main agent when a subagent run finishes and returns it together with the
// notify reason ("final" for normal completion/failure, "blocked" when the
// error looks like a blocker).
func buildSubagentMainNotification(task *SubagentTask, runErr error) (string, string) {
	status, reason := "completed", "final"
	switch {
	case runErr == nil:
		// Success keeps the defaults.
	case isBlockedSubagentError(runErr):
		status, reason = "blocked", "blocked"
	default:
		status = "failed"
	}
	content := fmt.Sprintf(
		"Subagent update\nagent: %s\nrun: %s\nstatus: %s\nreason: %s\ntask: %s\nsummary: %s",
		strings.TrimSpace(task.AgentID),
		strings.TrimSpace(task.ID),
		status,
		reason,
		summarizeSubagentText(firstNonEmpty(task.Label, task.Task), 120),
		summarizeSubagentText(task.Result, 280),
	)
	return content, reason
}
func isBlockedSubagentError(err error) bool {
if err == nil {
return false
}
if errors.Is(err, context.DeadlineExceeded) {
return true
}
msg := strings.ToLower(strings.TrimSpace(err.Error()))
if msg == "" {
return false
}
blockedHints := []string{
"timeout",
"deadline exceeded",
"quota",
"rate limit",
"too many requests",
"permission denied",
"requires input",
"waiting for reply",
"blocked",
}
for _, hint := range blockedHints {
if strings.Contains(msg, hint) {
return true
}
}
return false
}
func summarizeSubagentText(s string, max int) string {
s = strings.TrimSpace(strings.ReplaceAll(s, "\r\n", "\n"))
s = strings.ReplaceAll(s, "\n", " ")
s = strings.Join(strings.Fields(s), " ")
if s == "" {
return "(empty)"
}
if max > 0 && len(s) > max {
return strings.TrimSpace(s[:max-3]) + "..."
}
return s
}
// firstNonEmpty returns the first value whose trimmed form is non-empty,
// with surrounding whitespace removed. It returns "" when every value is
// blank (or no values are given).
func firstNonEmpty(values ...string) string {
	for _, v := range values {
		// Trim once instead of twice per candidate.
		if trimmed := strings.TrimSpace(v); trimmed != "" {
			return trimmed
		}
	}
	return ""
}
func (sm *SubagentManager) runWithRetry(ctx context.Context, task *SubagentTask) (string, error) {
maxRetries := normalizePositiveBound(task.MaxRetries, 0, 8)
backoffMs := normalizePositiveBound(task.RetryBackoff, 500, 120000)
@@ -534,6 +664,7 @@ func (sm *SubagentManager) SetRunFunc(f SubagentRunFunc) {
sm.mu.Lock()
defer sm.mu.Unlock()
sm.runFunc = f
go sm.resumeRecoveredTasks()
}
func (sm *SubagentManager) ProfileStore() *SubagentProfileStore {
@@ -542,6 +673,38 @@ func (sm *SubagentManager) ProfileStore() *SubagentProfileStore {
return sm.profileStore
}
// resumeRecoveredTasks restarts tasks persisted as "running" before a
// process restart. It is a no-op until a runFunc or provider is available,
// so both the constructor and SetRunFunc invoke it; whichever call finds the
// runtime ready drains recoverableTaskIDs exactly once.
func (sm *SubagentManager) resumeRecoveredTasks() {
	if sm == nil {
		return
	}
	sm.mu.Lock()
	// Nothing can execute tasks yet: keep recoverableTaskIDs intact for a
	// later call (e.g. from SetRunFunc).
	if sm.runFunc == nil && sm.provider == nil {
		sm.mu.Unlock()
		return
	}
	// Drain the pending IDs under the lock so concurrent callers cannot
	// resume the same task twice.
	taskIDs := append([]string(nil), sm.recoverableTaskIDs...)
	sm.recoverableTaskIDs = nil
	toResume := make([]*SubagentTask, 0, len(taskIDs))
	for _, taskID := range taskIDs {
		// Skip tasks that vanished or reached a terminal state meanwhile.
		task, ok := sm.tasks[taskID]
		if !ok || task == nil || task.Status != "running" {
			continue
		}
		task.Updated = time.Now().UnixMilli()
		sm.persistTaskLocked(task, "recovered", "auto-resumed after restart")
		toResume = append(toResume, task)
	}
	sm.mu.Unlock()
	// Launch outside the lock; runTask acquires sm.mu itself.
	for _, task := range toResume {
		taskCtx, cancel := context.WithCancel(context.Background())
		sm.mu.Lock()
		sm.cancelFuncs[task.ID] = cancel
		sm.mu.Unlock()
		go sm.runTask(taskCtx, task)
	}
}
func (sm *SubagentManager) NextTaskSequence() int {
sm.mu.RLock()
defer sm.mu.RUnlock()

View File

@@ -27,6 +27,7 @@ func DraftConfigSubagent(description, agentIDHint string) map[string]interface{}
"role": role,
"display_name": displayName,
"description": desc,
"notify_main_policy": "final_only",
"system_prompt": systemPrompt,
"system_prompt_file": "agents/" + agentID + "/AGENT.md",
"memory_namespace": agentID,
@@ -80,6 +81,9 @@ func UpsertConfigSubagent(configPath string, args map[string]interface{}) (map[s
if v := stringArgFromMap(args, "display_name"); v != "" {
subcfg.DisplayName = v
}
if v := stringArgFromMap(args, "notify_main_policy"); v != "" {
subcfg.NotifyMainPolicy = v
}
if v := stringArgFromMap(args, "description"); v != "" {
subcfg.Description = v
}

View File

@@ -32,6 +32,7 @@ func TestSubagentConfigToolUpsert(t *testing.T) {
"action": "upsert",
"agent_id": "reviewer",
"role": "testing",
"notify_main_policy": "internal_only",
"display_name": "Review Agent",
"description": "负责回归与评审",
"system_prompt": "review changes",
@@ -56,6 +57,9 @@ func TestSubagentConfigToolUpsert(t *testing.T) {
if reloaded.Agents.Subagents["reviewer"].DisplayName != "Review Agent" {
t.Fatalf("expected config to persist reviewer, got %+v", reloaded.Agents.Subagents["reviewer"])
}
if reloaded.Agents.Subagents["reviewer"].NotifyMainPolicy != "internal_only" {
t.Fatalf("expected notify_main_policy to persist, got %+v", reloaded.Agents.Subagents["reviewer"])
}
if len(reloaded.Agents.Router.Rules) == 0 {
t.Fatalf("expected router rules to persist")
}

View File

@@ -22,6 +22,7 @@ type SubagentProfile struct {
Transport string `json:"transport,omitempty"`
NodeID string `json:"node_id,omitempty"`
ParentAgentID string `json:"parent_agent_id,omitempty"`
NotifyMainPolicy string `json:"notify_main_policy,omitempty"`
Role string `json:"role,omitempty"`
SystemPrompt string `json:"system_prompt,omitempty"`
SystemPromptFile string `json:"system_prompt_file,omitempty"`
@@ -188,6 +189,7 @@ func normalizeSubagentProfile(in SubagentProfile) SubagentProfile {
p.Transport = normalizeProfileTransport(p.Transport)
p.NodeID = strings.TrimSpace(p.NodeID)
p.ParentAgentID = normalizeSubagentIdentifier(p.ParentAgentID)
p.NotifyMainPolicy = normalizeNotifyMainPolicy(p.NotifyMainPolicy)
p.Role = strings.TrimSpace(p.Role)
p.SystemPrompt = strings.TrimSpace(p.SystemPrompt)
p.SystemPromptFile = strings.TrimSpace(p.SystemPromptFile)
@@ -404,6 +406,7 @@ func profileFromConfig(agentID string, subcfg config.SubagentConfig) SubagentPro
Transport: strings.TrimSpace(subcfg.Transport),
NodeID: strings.TrimSpace(subcfg.NodeID),
ParentAgentID: strings.TrimSpace(subcfg.ParentAgentID),
NotifyMainPolicy: strings.TrimSpace(subcfg.NotifyMainPolicy),
Role: strings.TrimSpace(subcfg.Role),
SystemPrompt: strings.TrimSpace(subcfg.SystemPrompt),
SystemPromptFile: strings.TrimSpace(subcfg.SystemPromptFile),
@@ -507,6 +510,7 @@ func (t *SubagentProfileTool) Parameters() map[string]interface{} {
"description": "Unique subagent id, e.g. coder/writer/tester",
},
"name": map[string]interface{}{"type": "string"},
"notify_main_policy": map[string]interface{}{"type": "string", "description": "final_only|internal_only|milestone|on_blocked|always"},
"role": map[string]interface{}{"type": "string"},
"system_prompt": map[string]interface{}{"type": "string"},
"system_prompt_file": map[string]interface{}{"type": "string"},
@@ -577,6 +581,7 @@ func (t *SubagentProfileTool) Execute(ctx context.Context, args map[string]inter
p := SubagentProfile{
AgentID: agentID,
Name: stringArg(args, "name"),
NotifyMainPolicy: stringArg(args, "notify_main_policy"),
Role: stringArg(args, "role"),
SystemPrompt: stringArg(args, "system_prompt"),
SystemPromptFile: stringArg(args, "system_prompt_file"),
@@ -612,6 +617,9 @@ func (t *SubagentProfileTool) Execute(ctx context.Context, args map[string]inter
if _, ok := args["role"]; ok {
next.Role = stringArg(args, "role")
}
if _, ok := args["notify_main_policy"]; ok {
next.NotifyMainPolicy = stringArg(args, "notify_main_policy")
}
if _, ok := args["system_prompt"]; ok {
next.SystemPrompt = stringArg(args, "system_prompt")
}

View File

@@ -145,8 +145,11 @@ func TestSubagentBroadcastIncludesFailureStatus(t *testing.T) {
if got := strings.TrimSpace(msg.Metadata["status"]); got != "failed" {
t.Fatalf("expected metadata status=failed, got %q", got)
}
if !strings.Contains(strings.ToLower(msg.Content), "failed") {
t.Fatalf("expected failure wording in content, got %q", msg.Content)
if !strings.Contains(strings.ToLower(msg.Content), "status: failed") {
t.Fatalf("expected structured failure status in content, got %q", msg.Content)
}
if got := strings.TrimSpace(msg.Metadata["notify_reason"]); got != "final" {
t.Fatalf("expected notify_reason=final, got %q", got)
}
}
@@ -205,6 +208,199 @@ func TestSubagentManagerRestoresPersistedRuns(t *testing.T) {
time.Sleep(100 * time.Millisecond)
}
// TestSubagentManagerInternalOnlySuppressesMainNotification verifies that a
// profile with notify_main_policy=internal_only completes its task without
// publishing any inbound message to the main bus.
func TestSubagentManagerInternalOnlySuppressesMainNotification(t *testing.T) {
	workspace := t.TempDir()
	msgBus := bus.NewMessageBus()
	manager := NewSubagentManager(nil, workspace, msgBus)
	manager.SetRunFunc(func(ctx context.Context, task *SubagentTask) (string, error) {
		return "silent-result", nil
	})
	store := manager.ProfileStore()
	if store == nil {
		t.Fatalf("expected profile store")
	}
	// Register a profile whose policy should suppress final notifications.
	if _, err := store.Upsert(SubagentProfile{
		AgentID:          "coder",
		Name:             "Code Agent",
		NotifyMainPolicy: "internal_only",
		SystemPromptFile: "agents/coder/AGENT.md",
		Status:           "active",
	}); err != nil {
		t.Fatalf("profile upsert failed: %v", err)
	}
	_, err := manager.Spawn(context.Background(), SubagentSpawnOptions{
		Task:          "internal-only task",
		AgentID:       "coder",
		OriginChannel: "cli",
		OriginChatID:  "direct",
	})
	if err != nil {
		t.Fatalf("spawn failed: %v", err)
	}
	task := waitSubagentDone(t, manager, 4*time.Second)
	if task.Status != "completed" {
		t.Fatalf("expected completed task, got %s", task.Status)
	}
	// A short consume window suffices: any notification would already be
	// queued once the task is done.
	ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
	defer cancel()
	if msg, ok := msgBus.ConsumeInbound(ctx); ok {
		t.Fatalf("did not expect main notification, got %+v", msg)
	}
}
// TestSubagentManagerOnBlockedNotifiesOnlyBlockedFailures verifies the
// on_blocked policy: successful runs stay silent, while a failure whose
// error text matches a blocked hint ("timeout" here) publishes a
// notification tagged notify_reason=blocked.
func TestSubagentManagerOnBlockedNotifiesOnlyBlockedFailures(t *testing.T) {
	workspace := t.TempDir()
	msgBus := bus.NewMessageBus()
	manager := NewSubagentManager(nil, workspace, msgBus)
	// The stub run function fails only for the "blocked-task" payload.
	manager.SetRunFunc(func(ctx context.Context, task *SubagentTask) (string, error) {
		switch task.Task {
		case "blocked-task":
			return "", errors.New("command tick timeout exceeded: 600s")
		default:
			return "done", nil
		}
	})
	store := manager.ProfileStore()
	if store == nil {
		t.Fatalf("expected profile store")
	}
	if _, err := store.Upsert(SubagentProfile{
		AgentID:          "pm",
		Name:             "Product Manager",
		NotifyMainPolicy: "on_blocked",
		SystemPromptFile: "agents/pm/AGENT.md",
		Status:           "active",
	}); err != nil {
		t.Fatalf("profile upsert failed: %v", err)
	}
	// Case 1: success — no notification expected.
	_, err := manager.Spawn(context.Background(), SubagentSpawnOptions{
		Task:          "successful-task",
		AgentID:       "pm",
		OriginChannel: "cli",
		OriginChatID:  "direct",
	})
	if err != nil {
		t.Fatalf("spawn success case failed: %v", err)
	}
	_ = waitSubagentDone(t, manager, 4*time.Second)
	ctxSilent, cancelSilent := context.WithTimeout(context.Background(), 200*time.Millisecond)
	defer cancelSilent()
	if msg, ok := msgBus.ConsumeInbound(ctxSilent); ok {
		t.Fatalf("did not expect success notification for on_blocked, got %+v", msg)
	}
	// Case 2: blocked failure — notification expected.
	_, err = manager.Spawn(context.Background(), SubagentSpawnOptions{
		Task:          "blocked-task",
		AgentID:       "pm",
		OriginChannel: "cli",
		OriginChatID:  "direct",
	})
	if err != nil {
		t.Fatalf("spawn blocked case failed: %v", err)
	}
	_ = waitSubagentDone(t, manager, 4*time.Second)
	ctxBlocked, cancelBlocked := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancelBlocked()
	msg, ok := msgBus.ConsumeInbound(ctxBlocked)
	if !ok {
		t.Fatalf("expected blocked notification")
	}
	if got := strings.TrimSpace(msg.Metadata["notify_reason"]); got != "blocked" {
		t.Fatalf("expected notify_reason=blocked, got %q", got)
	}
	if !strings.Contains(strings.ToLower(msg.Content), "blocked") {
		t.Fatalf("expected blocked wording in content, got %q", msg.Content)
	}
}
// TestSubagentManagerRecordsFailuresToEKG verifies that a failed subagent
// run is appended to the EKG event log (memory/ekg-events.jsonl) with
// source=subagent, status=error, and the failure text preserved.
func TestSubagentManagerRecordsFailuresToEKG(t *testing.T) {
	workspace := t.TempDir()
	manager := NewSubagentManager(nil, workspace, nil)
	manager.SetRunFunc(func(ctx context.Context, task *SubagentTask) (string, error) {
		return "", errors.New("rate limit exceeded")
	})
	_, err := manager.Spawn(context.Background(), SubagentSpawnOptions{
		Task:          "ekg failure",
		AgentID:       "coder",
		OriginChannel: "cli",
		OriginChatID:  "direct",
	})
	if err != nil {
		t.Fatalf("spawn failed: %v", err)
	}
	_ = waitSubagentDone(t, manager, 4*time.Second)
	// The EKG log is a JSONL file under the workspace memory directory.
	data, err := os.ReadFile(filepath.Join(workspace, "memory", "ekg-events.jsonl"))
	if err != nil {
		t.Fatalf("expected ekg events to be written: %v", err)
	}
	text := string(data)
	if !strings.Contains(text, "\"source\":\"subagent\"") {
		t.Fatalf("expected subagent source in ekg log, got %s", text)
	}
	if !strings.Contains(text, "\"status\":\"error\"") {
		t.Fatalf("expected error status in ekg log, got %s", text)
	}
	if !strings.Contains(strings.ToLower(text), "rate limit exceeded") {
		t.Fatalf("expected failure text in ekg log, got %s", text)
	}
}
// TestSubagentManagerAutoRecoversRunningTaskAfterRestart simulates a process
// restart: the first manager's run function blocks so the task persists as
// "running", then a second manager built on the same workspace must
// auto-resume that task once its run function is installed.
func TestSubagentManagerAutoRecoversRunningTaskAfterRestart(t *testing.T) {
	workspace := t.TempDir()
	block := make(chan struct{})
	manager := NewSubagentManager(nil, workspace, nil)
	// This run function parks until the test releases it, keeping the task
	// in "running" state on disk.
	manager.SetRunFunc(func(ctx context.Context, task *SubagentTask) (string, error) {
		<-block
		return "should-not-complete-here", nil
	})
	_, err := manager.Spawn(context.Background(), SubagentSpawnOptions{
		Task:          "recover me",
		AgentID:       "coder",
		OriginChannel: "cli",
		OriginChatID:  "direct",
	})
	if err != nil {
		t.Fatalf("spawn failed: %v", err)
	}
	// Give the first manager time to persist the running task.
	time.Sleep(80 * time.Millisecond)
	recovered := make(chan string, 1)
	// "Restart": a fresh manager on the same workspace should pick up the
	// persisted running task and re-execute it via the new run function.
	reloaded := NewSubagentManager(nil, workspace, nil)
	reloaded.SetRunFunc(func(ctx context.Context, task *SubagentTask) (string, error) {
		recovered <- task.ID
		return "recovered-ok", nil
	})
	select {
	case taskID := <-recovered:
		if taskID != "subagent-1" {
			t.Fatalf("expected recovered task id subagent-1, got %s", taskID)
		}
	case <-time.After(2 * time.Second):
		t.Fatalf("expected running task to auto-recover after restart")
	}
	got, ok := reloaded.GetTask("subagent-1")
	if !ok {
		t.Fatalf("expected recovered task to exist")
	}
	if got.Status != "completed" || got.Result != "recovered-ok" {
		t.Fatalf("unexpected recovered task: %+v", got)
	}
	// Unblock the original manager so its goroutine can finish cleanly.
	close(block)
	_ = waitSubagentDone(t, manager, 4*time.Second)
	time.Sleep(100 * time.Millisecond)
}
func TestSubagentManagerPersistsEvents(t *testing.T) {
workspace := t.TempDir()
manager := NewSubagentManager(nil, workspace, nil)

View File

@@ -6,6 +6,7 @@ import { useUI } from '../context/UIContext';
type SubagentProfile = {
agent_id: string;
name?: string;
notify_main_policy?: string;
role?: string;
system_prompt?: string;
system_prompt_file?: string;
@@ -31,6 +32,7 @@ type ToolAllowlistGroup = {
const emptyDraft: SubagentProfile = {
agent_id: '',
name: '',
notify_main_policy: 'final_only',
role: '',
system_prompt: '',
system_prompt_file: '',
@@ -79,6 +81,7 @@ const SubagentProfiles: React.FC = () => {
setDraft({
agent_id: next.agent_id || '',
name: next.name || '',
notify_main_policy: next.notify_main_policy || 'final_only',
role: next.role || '',
system_prompt: next.system_prompt || '',
system_prompt_file: next.system_prompt_file || '',
@@ -140,6 +143,7 @@ const SubagentProfiles: React.FC = () => {
setDraft({
agent_id: p.agent_id || '',
name: p.name || '',
notify_main_policy: p.notify_main_policy || 'final_only',
role: p.role || '',
system_prompt: p.system_prompt || '',
system_prompt_file: p.system_prompt_file || '',
@@ -193,6 +197,7 @@ const SubagentProfiles: React.FC = () => {
action,
agent_id: agentId,
name: draft.name || '',
notify_main_policy: draft.notify_main_policy || 'final_only',
role: draft.role || '',
system_prompt: draft.system_prompt || '',
system_prompt_file: draft.system_prompt_file || '',
@@ -350,6 +355,20 @@ const SubagentProfiles: React.FC = () => {
<option value="disabled">disabled</option>
</select>
</div>
<div>
<div className="text-xs text-zinc-400 mb-1">notify_main_policy</div>
<select
value={draft.notify_main_policy || 'final_only'}
onChange={(e) => setDraft({ ...draft, notify_main_policy: e.target.value })}
className="w-full px-2 py-1 text-xs bg-zinc-900 border border-zinc-700 rounded"
>
<option value="final_only">final_only</option>
<option value="internal_only">internal_only</option>
<option value="milestone">milestone</option>
<option value="on_blocked">on_blocked</option>
<option value="always">always</option>
</select>
</div>
<div className="md:col-span-2">
<div className="text-xs text-zinc-400 mb-1">system_prompt_file</div>
<input

View File

@@ -60,6 +60,26 @@ type AgentMessage = {
created_at?: number;
};
// One entry in a subagent's internal stream panel. Two shapes share this
// type, discriminated by `kind`: runtime events (kind === 'event', rendered
// from event_type/message/retry_count) and inter-agent messages
// (kind === 'message', rendered from from_agent/to_agent/content). All
// fields are optional; the backend omits those not relevant to the kind.
type StreamItem = {
  kind?: 'event' | 'message' | string;
  at?: number; // timestamp; passed to new Date() — presumably epoch ms, confirm against backend
  run_id?: string;
  agent_id?: string;
  event_type?: string;
  message?: string;
  retry_count?: number;
  message_id?: string;
  thread_id?: string;
  from_agent?: string;
  to_agent?: string;
  reply_to?: string;
  correlation_id?: string;
  message_type?: string;
  content?: string;
  status?: string;
  requires_reply?: boolean;
};
type RegistrySubagent = {
agent_id?: string;
enabled?: boolean;
@@ -68,6 +88,7 @@ type RegistrySubagent = {
node_id?: string;
parent_agent_id?: string;
managed_by?: string;
notify_main_policy?: string;
display_name?: string;
role?: string;
description?: string;
@@ -115,6 +136,8 @@ type AgentTaskStats = {
running: number;
failed: number;
waiting: number;
latestStatus: string;
latestUpdated: number;
active: Array<{ id: string; status: string; title: string }>;
};
@@ -184,6 +207,11 @@ function summarizeTask(task?: string, label?: string): string {
return text.length > 52 ? `${text.slice(0, 49)}...` : text;
}
// Render a timestamp as a 24-hour local time string, falling back to a
// placeholder when the timestamp is absent or zero.
function formatStreamTime(ts?: number): string {
  if (!ts) {
    return '--:--:--';
  }
  const when = new Date(ts);
  return when.toLocaleTimeString([], { hour12: false });
}
function bezierCurve(x1: number, y1: number, x2: number, y2: number): string {
const offset = Math.max(Math.abs(y2 - y1) * 0.5, 60);
return `M ${x1} ${y1} C ${x1} ${y1 + offset} ${x2} ${y2 - offset} ${x2} ${y2}`;
@@ -199,13 +227,18 @@ function buildTaskStats(tasks: SubagentTask[]): Record<string, AgentTaskStats> {
const agentID = normalizeTitle(task.agent_id, '');
if (!agentID) return acc;
if (!acc[agentID]) {
acc[agentID] = { total: 0, running: 0, failed: 0, waiting: 0, active: [] };
acc[agentID] = { total: 0, running: 0, failed: 0, waiting: 0, latestStatus: '', latestUpdated: 0, active: [] };
}
const item = acc[agentID];
item.total += 1;
if (task.status === 'running') item.running += 1;
if (task.status === 'failed') item.failed += 1;
if (task.waiting_for_reply) item.waiting += 1;
const updatedAt = Math.max(task.updated || 0, task.created || 0);
if (updatedAt >= item.latestUpdated) {
item.latestUpdated = updatedAt;
item.latestStatus = normalizeTitle(task.status, '');
item.failed = task.status === 'failed' ? 1 : 0;
}
if (task.status === 'running' || task.waiting_for_reply) {
item.active.push({
id: task.id,
@@ -330,6 +363,8 @@ const Subagents: React.FC = () => {
const [registryItems, setRegistryItems] = useState<RegistrySubagent[]>([]);
const [promptFileContent, setPromptFileContent] = useState('');
const [promptFileFound, setPromptFileFound] = useState(false);
const [streamItems, setStreamItems] = useState<StreamItem[]>([]);
const [streamTask, setStreamTask] = useState<SubagentTask | null>(null);
const [selectedTopologyBranch, setSelectedTopologyBranch] = useState('');
const [topologyFilter, setTopologyFilter] = useState<'all' | 'running' | 'failed' | 'local' | 'remote'>('all');
const [topologyZoom, setTopologyZoom] = useState(0.9);
@@ -400,6 +435,13 @@ const Subagents: React.FC = () => {
load().catch(() => { });
}, [q, selectedAgentID]);
useEffect(() => {
const timer = window.setInterval(() => {
load().catch(() => { });
}, 5000);
return () => window.clearInterval(timer);
}, [q, selectedAgentID]);
const selected = useMemo(() => items.find((x) => x.id === selectedId) || null, [items, selectedId]);
const selectedRegistryItem = useMemo(
() => registryItems.find((x) => x.agent_id === selectedAgentID) || null,
@@ -414,6 +456,10 @@ const Subagents: React.FC = () => {
[...selectedAgentTasks].sort((a, b) => Math.max(b.updated || 0, b.created || 0) - Math.max(a.updated || 0, a.created || 0))[0] || null,
[selectedAgentTasks]
);
const selectedAgentDisplayName = useMemo(
() => selectedRegistryItem?.display_name || selectedRegistryItem?.agent_id || selectedAgentID || '',
[selectedRegistryItem, selectedAgentID]
);
const parsedNodeTrees = useMemo<NodeTree[]>(() => {
try {
const parsed = JSON.parse(nodeTrees);
@@ -489,7 +535,7 @@ const Subagents: React.FC = () => {
failed: 0,
};
const localMainStats = taskStats[normalizeTitle(localRoot.agent_id, 'main')] || { total: 0, running: 0, failed: 0, waiting: 0, active: [] };
const localMainStats = taskStats[normalizeTitle(localRoot.agent_id, 'main')] || { total: 0, running: 0, failed: 0, waiting: 0, latestStatus: '', latestUpdated: 0, active: [] };
const localMainTask = recentTaskByAgent[normalizeTitle(localRoot.agent_id, 'main')];
localBranchStats.running += localMainStats.running;
localBranchStats.failed += localMainStats.failed;
@@ -509,14 +555,16 @@ const Subagents: React.FC = () => {
`children=${localChildren.length + remoteClusters.length}`,
`total=${localMainStats.total} running=${localMainStats.running}`,
`waiting=${localMainStats.waiting} failed=${localMainStats.failed}`,
`notify=${normalizeTitle(registryItems.find((item) => item.agent_id === localRoot.agent_id)?.notify_main_policy, 'final_only')}`,
`transport=${normalizeTitle(localRoot.transport, 'local')} type=${normalizeTitle(localRoot.type, 'router')}`,
localMainStats.active[0] ? `task: ${localMainStats.active[0].title}` : t('noLiveTasks'),
],
accent: 'bg-amber-400',
accent: localMainStats.running > 0 ? 'bg-emerald-500' : localMainStats.latestStatus === 'failed' ? 'bg-red-500' : 'bg-amber-400',
clickable: true,
scale,
onClick: () => {
setSelectedTopologyBranch(localBranch);
setSelectedAgentID(normalizeTitle(localRoot.agent_id, 'main'));
if (localMainTask?.id) setSelectedId(localMainTask.id);
},
};
@@ -525,7 +573,7 @@ const Subagents: React.FC = () => {
localChildren.forEach((child, idx) => {
const childX = localOriginX + idx * (cardWidth + clusterGap);
const childY = childStartY;
const stats = taskStats[normalizeTitle(child.agent_id, '')] || { total: 0, running: 0, failed: 0, waiting: 0, active: [] };
const stats = taskStats[normalizeTitle(child.agent_id, '')] || { total: 0, running: 0, failed: 0, waiting: 0, latestStatus: '', latestUpdated: 0, active: [] };
const task = recentTaskByAgent[normalizeTitle(child.agent_id, '')];
localBranchStats.running += stats.running;
localBranchStats.failed += stats.failed;
@@ -544,14 +592,16 @@ const Subagents: React.FC = () => {
meta: [
`total=${stats.total} running=${stats.running}`,
`waiting=${stats.waiting} failed=${stats.failed}`,
`notify=${normalizeTitle(registryItems.find((item) => item.agent_id === child.agent_id)?.notify_main_policy, 'final_only')}`,
`transport=${normalizeTitle(child.transport, 'local')} type=${normalizeTitle(child.type, 'worker')}`,
stats.active[0] ? `task: ${stats.active[0].title}` : task ? `last: ${summarizeTask(task.task, task.label)}` : t('noLiveTasks'),
],
accent: stats.running > 0 ? 'bg-emerald-500' : stats.failed > 0 ? 'bg-red-500' : 'bg-sky-400',
accent: stats.running > 0 ? 'bg-emerald-500' : stats.latestStatus === 'failed' ? 'bg-red-500' : 'bg-sky-400',
clickable: true,
scale,
onClick: () => {
setSelectedTopologyBranch(localBranch);
setSelectedAgentID(normalizeTitle(child.agent_id, ''));
if (task?.id) setSelectedId(task.id);
},
});
@@ -588,7 +638,11 @@ const Subagents: React.FC = () => {
accent: tree.online ? 'bg-fuchsia-400' : 'bg-zinc-500',
clickable: true,
scale,
onClick: () => setSelectedTopologyBranch(branch),
onClick: () => {
setSelectedTopologyBranch(branch);
setSelectedAgentID(normalizeTitle(treeRoot.agent_id, ''));
setSelectedId('');
},
};
cards.push(rootCard);
lines.push({
@@ -619,7 +673,11 @@ const Subagents: React.FC = () => {
accent: 'bg-violet-400',
clickable: true,
scale,
onClick: () => setSelectedTopologyBranch(branch),
onClick: () => {
setSelectedTopologyBranch(branch);
setSelectedAgentID(normalizeTitle(child.agent_id, ''));
setSelectedId('');
},
});
lines.push({
path: bezierCurve(rootCard.x + cardWidth / 2, rootCard.y + cardHeight / 2, childX + cardWidth / 2, childY + cardHeight / 2),
@@ -881,6 +939,26 @@ const Subagents: React.FC = () => {
loadThreadAndInbox(selected).catch(() => { });
}, [selectedId, q, items]);
// Fetch the persisted internal stream (runtime events plus inter-agent
// messages) for a task and mirror it into panel state. Clears the panel
// when no task is selected; on request failure keeps the task header but
// empties the item list.
const loadStream = async (task: SubagentTask | null) => {
  if (!task?.id) {
    setStreamTask(null);
    setStreamItems([]);
    return;
  }
  try {
    const streamRes = await callAction({ action: 'stream', id: task.id, limit: 100 });
    // Prefer the server's view of the task; fall back to the local one.
    setStreamTask(streamRes?.result?.task || task);
    setStreamItems(Array.isArray(streamRes?.result?.items) ? streamRes.result.items : []);
  } catch {
    setStreamTask(task);
    setStreamItems([]);
  }
};
// Reload the stream whenever the selected agent's latest task, the query,
// or the task list size changes.
useEffect(() => {
  loadStream(selectedAgentLatestTask).catch(() => { });
}, [selectedAgentLatestTask?.id, q, items.length]);
return (
<div className="h-full p-4 md:p-6 flex flex-col gap-4">
<div className="flex items-center justify-between">
@@ -1092,6 +1170,62 @@ const Subagents: React.FC = () => {
</div>
</div>
)}
{selectedAgentID && (
<div className="absolute top-4 right-4 bottom-4 z-20 w-[360px] rounded-2xl border border-zinc-800 bg-zinc-950/92 shadow-2xl shadow-black/40 backdrop-blur-md overflow-hidden flex flex-col">
<div className="flex items-center justify-between px-4 py-3 border-b border-zinc-800">
<div className="min-w-0">
<div className="text-xs text-zinc-500 uppercase tracking-wider">Internal Stream</div>
<div className="text-sm font-semibold text-zinc-100 truncate">{selectedAgentDisplayName}</div>
<div className="text-[11px] text-zinc-500 truncate">{selectedAgentID}</div>
</div>
<button
onClick={() => {
setSelectedAgentID('');
setSelectedId('');
setStreamTask(null);
setStreamItems([]);
}}
className="px-2 py-1 rounded bg-zinc-800 hover:bg-zinc-700 text-[11px] text-zinc-200"
>
Close
</button>
</div>
<div className="px-4 py-3 border-b border-zinc-800 text-xs text-zinc-400">
{streamTask?.id ? (
<div className="space-y-1">
<div>run={streamTask.id}</div>
<div>status={streamTask.status || '-'} · thread={streamTask.thread_id || '-'}</div>
</div>
) : (
<div>No persisted run for this agent yet.</div>
)}
</div>
<div className="flex-1 overflow-y-auto px-4 py-3 space-y-3">
{streamItems.length === 0 ? (
<div className="text-sm text-zinc-500">No internal stream events yet.</div>
) : streamItems.map((item, idx) => (
<div key={`${item.kind || 'item'}-${item.at || 0}-${idx}`} className="rounded-xl border border-zinc-800 bg-zinc-900/70 p-3">
<div className="flex items-center justify-between gap-2 mb-2">
<div className="text-xs font-medium text-zinc-200">
{item.kind === 'event'
? `${item.event_type || 'event'}${item.status ? ` · ${item.status}` : ''}`
: `${item.from_agent || '-'} -> ${item.to_agent || '-'} · ${item.message_type || 'message'}`}
</div>
<div className="text-[11px] text-zinc-500">{formatStreamTime(item.at)}</div>
</div>
<div className="text-xs text-zinc-300 whitespace-pre-wrap break-words">
{item.kind === 'event' ? (item.message || '(no event message)') : (item.content || '(empty message)')}
</div>
<div className="mt-2 text-[11px] text-zinc-500">
{item.kind === 'event'
? `run=${item.run_id || '-'}${item.retry_count ? ` · retry=${item.retry_count}` : ''}`
: `status=${item.status || '-'}${item.reply_to ? ` · reply_to=${item.reply_to}` : ''}`}
</div>
</div>
))}
</div>
</div>
)}
</div>
</div>