6 Commits

Author SHA1 Message Date
lpf
0b1fdecd68 Separate main chat from subagent group stream 2026-03-07 12:12:12 +08:00
lpf
1218d68b7e Add internal subagent stream and notify policy 2026-03-07 11:52:36 +08:00
lpf
557633b698 Refresh agent topology state after task restart 2026-03-07 01:53:18 +08:00
lpf
823f96be5a Stop planned task spam after cancellation 2026-03-06 21:31:21 +08:00
lpf
9d0ab54a97 Recover running subagent tasks after restart 2026-03-06 20:05:03 +08:00
lpf
ee9326b2f2 Reduce planned task progress noise 2026-03-06 19:57:54 +08:00
17 changed files with 1155 additions and 68 deletions

View File

@@ -9,6 +9,7 @@ package agent
import (
"context"
"encoding/json"
"errors"
"fmt"
"hash/fnv"
"math"
@@ -544,6 +545,11 @@ func (al *AgentLoop) processInbound(ctx context.Context, msg bus.InboundMessage)
response, err := al.processPlannedMessage(ctx, msg)
if err != nil {
if errors.Is(err, context.Canceled) {
al.audit.Record(al.getTrigger(msg), msg.Channel, msg.SessionKey, true, err)
al.appendTaskAudit(taskID, msg, started, err, true)
return
}
response = fmt.Sprintf("Error processing message: %v", err)
}

View File

@@ -131,6 +131,7 @@ func (al *AgentLoop) HandleSubagentRuntime(ctx context.Context, action string, a
"transport": fallbackString(strings.TrimSpace(subcfg.Transport), "local"),
"node_id": strings.TrimSpace(subcfg.NodeID),
"parent_agent_id": strings.TrimSpace(subcfg.ParentAgentID),
"notify_main_policy": fallbackString(strings.TrimSpace(subcfg.NotifyMainPolicy), "final_only"),
"display_name": subcfg.DisplayName,
"role": subcfg.Role,
"description": subcfg.Description,
@@ -157,6 +158,7 @@ func (al *AgentLoop) HandleSubagentRuntime(ctx context.Context, action string, a
"transport": profile.Transport,
"node_id": profile.NodeID,
"parent_agent_id": profile.ParentAgentID,
"notify_main_policy": fallbackString(strings.TrimSpace(profile.NotifyMainPolicy), "final_only"),
"display_name": profile.Name,
"role": profile.Role,
"description": "Node-registered remote main agent branch",
@@ -360,6 +362,53 @@ func (al *AgentLoop) HandleSubagentRuntime(ctx context.Context, action string, a
return nil, err
}
return map[string]interface{}{"found": true, "thread": thread, "messages": items}, nil
case "stream":
taskID, err := resolveSubagentTaskIDForRuntime(sm, runtimeStringArg(args, "id"))
if err != nil {
return nil, err
}
task, ok := sm.GetTask(taskID)
if !ok {
return map[string]interface{}{"found": false}, nil
}
events, err := sm.Events(taskID, runtimeIntArg(args, "limit", 100))
if err != nil {
return nil, err
}
var thread *tools.AgentThread
var messages []tools.AgentMessage
if strings.TrimSpace(task.ThreadID) != "" {
if th, ok := sm.Thread(task.ThreadID); ok {
thread = th
}
messages, err = sm.ThreadMessages(task.ThreadID, runtimeIntArg(args, "limit", 100))
if err != nil {
return nil, err
}
}
stream := mergeSubagentStream(events, messages)
return map[string]interface{}{
"found": true,
"task": cloneSubagentTask(task),
"thread": thread,
"items": stream,
}, nil
case "stream_all":
tasks := sm.ListTasks()
sort.Slice(tasks, func(i, j int) bool {
left := maxInt64(tasks[i].Updated, tasks[i].Created)
right := maxInt64(tasks[j].Updated, tasks[j].Created)
if left != right {
return left > right
}
return tasks[i].ID > tasks[j].ID
})
taskLimit := runtimeIntArg(args, "task_limit", 16)
if taskLimit > 0 && len(tasks) > taskLimit {
tasks = tasks[:taskLimit]
}
items := mergeAllSubagentStreams(sm, tasks, runtimeIntArg(args, "limit", 200))
return map[string]interface{}{"found": true, "items": items}, nil
case "inbox":
agentID := runtimeStringArg(args, "agent_id")
if agentID == "" {
@@ -386,6 +435,140 @@ func (al *AgentLoop) HandleSubagentRuntime(ctx context.Context, action string, a
}
}
func mergeSubagentStream(events []tools.SubagentRunEvent, messages []tools.AgentMessage) []map[string]interface{} {
items := make([]map[string]interface{}, 0, len(events)+len(messages))
for _, evt := range events {
items = append(items, map[string]interface{}{
"kind": "event",
"at": evt.At,
"run_id": evt.RunID,
"agent_id": evt.AgentID,
"event_type": evt.Type,
"status": evt.Status,
"message": evt.Message,
"retry_count": evt.RetryCount,
})
}
for _, msg := range messages {
items = append(items, map[string]interface{}{
"kind": "message",
"at": msg.CreatedAt,
"message_id": msg.MessageID,
"thread_id": msg.ThreadID,
"from_agent": msg.FromAgent,
"to_agent": msg.ToAgent,
"reply_to": msg.ReplyTo,
"correlation_id": msg.CorrelationID,
"message_type": msg.Type,
"content": msg.Content,
"status": msg.Status,
"requires_reply": msg.RequiresReply,
})
}
sort.Slice(items, func(i, j int) bool {
left, _ := items[i]["at"].(int64)
right, _ := items[j]["at"].(int64)
if left != right {
return left < right
}
return fmt.Sprintf("%v", items[i]["kind"]) < fmt.Sprintf("%v", items[j]["kind"])
})
return items
}
func mergeAllSubagentStreams(sm *tools.SubagentManager, tasks []*tools.SubagentTask, limit int) []map[string]interface{} {
if sm == nil || len(tasks) == 0 {
return nil
}
items := make([]map[string]interface{}, 0)
seenEvents := map[string]struct{}{}
seenMessages := map[string]struct{}{}
for _, task := range tasks {
if task == nil {
continue
}
if events, err := sm.Events(task.ID, limit); err == nil {
for _, evt := range events {
key := fmt.Sprintf("%s:%s:%d:%s", evt.RunID, evt.Type, evt.At, evt.Message)
if _, ok := seenEvents[key]; ok {
continue
}
seenEvents[key] = struct{}{}
items = append(items, map[string]interface{}{
"kind": "event",
"at": evt.At,
"task_id": task.ID,
"label": task.Label,
"run_id": evt.RunID,
"agent_id": firstNonEmptyString(evt.AgentID, task.AgentID),
"event_type": evt.Type,
"status": evt.Status,
"message": evt.Message,
"retry_count": evt.RetryCount,
})
}
}
if strings.TrimSpace(task.ThreadID) == "" {
continue
}
if messages, err := sm.ThreadMessages(task.ThreadID, limit); err == nil {
for _, msg := range messages {
if _, ok := seenMessages[msg.MessageID]; ok {
continue
}
seenMessages[msg.MessageID] = struct{}{}
items = append(items, map[string]interface{}{
"kind": "message",
"at": msg.CreatedAt,
"task_id": task.ID,
"label": task.Label,
"message_id": msg.MessageID,
"thread_id": msg.ThreadID,
"from_agent": msg.FromAgent,
"to_agent": msg.ToAgent,
"reply_to": msg.ReplyTo,
"correlation_id": msg.CorrelationID,
"message_type": msg.Type,
"content": msg.Content,
"status": msg.Status,
"requires_reply": msg.RequiresReply,
})
}
}
}
sort.Slice(items, func(i, j int) bool {
left, _ := items[i]["at"].(int64)
right, _ := items[j]["at"].(int64)
if left != right {
return left < right
}
return fmt.Sprintf("%v", items[i]["task_id"]) < fmt.Sprintf("%v", items[j]["task_id"])
})
if limit > 0 && len(items) > limit {
items = items[len(items)-limit:]
}
return items
}
func maxInt64(values ...int64) int64 {
var out int64
for _, v := range values {
if v > out {
out = v
}
}
return out
}
func firstNonEmptyString(values ...string) string {
for _, v := range values {
if strings.TrimSpace(v) != "" {
return strings.TrimSpace(v)
}
}
return ""
}
func cloneSubagentTask(in *tools.SubagentTask) *tools.SubagentTask {
if in == nil {
return nil

View File

@@ -5,6 +5,7 @@ import (
"os"
"path/filepath"
"testing"
"time"
"clawgo/pkg/config"
"clawgo/pkg/runtimecfg"
@@ -75,6 +76,7 @@ func TestHandleSubagentRuntimeUpsertConfigSubagent(t *testing.T) {
out, err := loop.HandleSubagentRuntime(context.Background(), "upsert_config_subagent", map[string]interface{}{
"agent_id": "reviewer",
"role": "testing",
"notify_main_policy": "internal_only",
"display_name": "Review Agent",
"system_prompt": "review changes",
"system_prompt_file": "agents/reviewer/AGENT.md",
@@ -99,6 +101,9 @@ func TestHandleSubagentRuntimeUpsertConfigSubagent(t *testing.T) {
if subcfg.SystemPromptFile != "agents/reviewer/AGENT.md" {
t.Fatalf("expected system_prompt_file to persist, got %+v", subcfg)
}
if subcfg.NotifyMainPolicy != "internal_only" {
t.Fatalf("expected notify_main_policy to persist, got %+v", subcfg)
}
if len(reloaded.Agents.Router.Rules) == 0 {
t.Fatalf("expected router rules to be persisted")
}
@@ -316,3 +321,116 @@ func TestHandleSubagentRuntimeProtectsMainAgent(t *testing.T) {
t.Fatalf("expected deleting main agent to fail")
}
}
func TestHandleSubagentRuntimeStream(t *testing.T) {
workspace := t.TempDir()
manager := tools.NewSubagentManager(nil, workspace, nil)
manager.SetRunFunc(func(ctx context.Context, task *tools.SubagentTask) (string, error) {
return "stream-result", nil
})
loop := &AgentLoop{
workspace: workspace,
subagentManager: manager,
subagentRouter: tools.NewSubagentRouter(manager),
}
out, err := loop.HandleSubagentRuntime(context.Background(), "spawn", map[string]interface{}{
"task": "prepare streamable task",
"agent_id": "coder",
"channel": "webui",
"chat_id": "webui",
})
if err != nil {
t.Fatalf("spawn failed: %v", err)
}
payload, ok := out.(map[string]interface{})
if !ok {
t.Fatalf("unexpected spawn payload: %T", out)
}
_ = payload
var task *tools.SubagentTask
for i := 0; i < 50; i++ {
tasks := manager.ListTasks()
if len(tasks) > 0 && tasks[0].Status == "completed" {
task = tasks[0]
break
}
time.Sleep(10 * time.Millisecond)
}
if task == nil {
t.Fatalf("expected completed task")
}
out, err = loop.HandleSubagentRuntime(context.Background(), "stream", map[string]interface{}{
"id": task.ID,
})
if err != nil {
t.Fatalf("stream failed: %v", err)
}
streamPayload, ok := out.(map[string]interface{})
if !ok || streamPayload["found"] != true {
t.Fatalf("unexpected stream payload: %#v", out)
}
items, ok := streamPayload["items"].([]map[string]interface{})
if !ok || len(items) == 0 {
t.Fatalf("expected merged stream items, got %#v", streamPayload["items"])
}
foundEvent := false
foundMessage := false
for _, item := range items {
switch item["kind"] {
case "event":
foundEvent = true
case "message":
foundMessage = true
}
}
if !foundEvent || !foundMessage {
t.Fatalf("expected merged event and message items, got %#v", items)
}
}
func TestHandleSubagentRuntimeStreamAll(t *testing.T) {
workspace := t.TempDir()
manager := tools.NewSubagentManager(nil, workspace, nil)
manager.SetRunFunc(func(ctx context.Context, task *tools.SubagentTask) (string, error) {
return "stream-all-result", nil
})
loop := &AgentLoop{
workspace: workspace,
subagentManager: manager,
subagentRouter: tools.NewSubagentRouter(manager),
}
if _, err := loop.HandleSubagentRuntime(context.Background(), "spawn", map[string]interface{}{
"task": "prepare grouped stream task",
"agent_id": "coder",
"channel": "webui",
"chat_id": "webui",
}); err != nil {
t.Fatalf("spawn failed: %v", err)
}
for i := 0; i < 50; i++ {
tasks := manager.ListTasks()
if len(tasks) > 0 && tasks[0].Status == "completed" {
break
}
time.Sleep(10 * time.Millisecond)
}
out, err := loop.HandleSubagentRuntime(context.Background(), "stream_all", map[string]interface{}{
"limit": 100,
"task_limit": 10,
})
if err != nil {
t.Fatalf("stream_all failed: %v", err)
}
payload, ok := out.(map[string]interface{})
if !ok || payload["found"] != true {
t.Fatalf("unexpected stream_all payload: %#v", out)
}
items, ok := payload["items"].([]map[string]interface{})
if !ok || len(items) == 0 {
t.Fatalf("expected grouped stream items, got %#v", payload["items"])
}
}

View File

@@ -4,7 +4,9 @@ import (
"bufio"
"context"
"encoding/json"
"errors"
"fmt"
"math"
"os"
"path/filepath"
"regexp"
@@ -26,6 +28,7 @@ type plannedTaskResult struct {
Index int
Task plannedTask
Output string
Err error
ErrText string
}
@@ -123,6 +126,11 @@ func splitPlannedSegments(content string) []string {
func (al *AgentLoop) runPlannedTasks(ctx context.Context, msg bus.InboundMessage, tasks []plannedTask) (string, error) {
results := make([]plannedTaskResult, len(tasks))
var wg sync.WaitGroup
var progressMu sync.Mutex
completed := 0
failed := 0
milestones := plannedProgressMilestones(len(tasks))
notified := make(map[int]struct{}, len(milestones))
for i, task := range tasks {
wg.Add(1)
go func(index int, t plannedTask) {
@@ -137,15 +145,33 @@ func (al *AgentLoop) runPlannedTasks(ctx context.Context, msg bus.InboundMessage
subMsg.Metadata["planned_task_index"] = fmt.Sprintf("%d", t.Index)
subMsg.Metadata["planned_task_total"] = fmt.Sprintf("%d", len(tasks))
out, err := al.processMessage(ctx, subMsg)
res := plannedTaskResult{Index: index, Task: t, Output: strings.TrimSpace(out)}
res := plannedTaskResult{Index: index, Task: t, Output: strings.TrimSpace(out), Err: err}
if err != nil {
res.ErrText = err.Error()
}
results[index] = res
al.publishPlannedTaskProgress(msg, len(tasks), res)
progressMu.Lock()
completed++
if res.ErrText != "" && !isPlannedTaskCancellation(ctx, res) {
failed++
}
snapshotCompleted := completed
snapshotFailed := failed
shouldNotify := shouldPublishPlannedTaskProgress(ctx, len(tasks), snapshotCompleted, res, milestones, notified)
if shouldNotify && res.ErrText == "" {
notified[snapshotCompleted] = struct{}{}
}
progressMu.Unlock()
if shouldNotify {
al.publishPlannedTaskProgress(msg, len(tasks), snapshotCompleted, snapshotFailed, res)
}
}(i, task)
}
wg.Wait()
if err := ctx.Err(); err != nil {
return "", err
}
var b strings.Builder
b.WriteString(fmt.Sprintf("已自动拆解为 %d 个任务并执行:\n\n", len(results)))
for _, r := range results {
@@ -163,7 +189,63 @@ func (al *AgentLoop) runPlannedTasks(ctx context.Context, msg bus.InboundMessage
return strings.TrimSpace(b.String()), nil
}
func (al *AgentLoop) publishPlannedTaskProgress(msg bus.InboundMessage, total int, res plannedTaskResult) {
func plannedProgressMilestones(total int) []int {
if total <= 3 {
return nil
}
points := []float64{0.33, 0.66}
out := make([]int, 0, len(points))
seen := map[int]struct{}{}
for _, p := range points {
step := int(math.Round(float64(total) * p))
if step <= 0 || step >= total {
continue
}
if _, ok := seen[step]; ok {
continue
}
seen[step] = struct{}{}
out = append(out, step)
}
return out
}
func shouldPublishPlannedTaskProgress(ctx context.Context, total, completed int, res plannedTaskResult, milestones []int, notified map[int]struct{}) bool {
if total <= 1 {
return false
}
if isPlannedTaskCancellation(ctx, res) {
return false
}
if strings.TrimSpace(res.ErrText) != "" {
return true
}
if completed >= total {
return false
}
for _, step := range milestones {
if completed != step {
continue
}
if _, ok := notified[step]; ok {
return false
}
return true
}
return false
}
func isPlannedTaskCancellation(ctx context.Context, res plannedTaskResult) bool {
if res.Err != nil && errors.Is(res.Err, context.Canceled) {
return true
}
if strings.EqualFold(strings.TrimSpace(res.ErrText), context.Canceled.Error()) {
return true
}
return ctx != nil && errors.Is(ctx.Err(), context.Canceled)
}
func (al *AgentLoop) publishPlannedTaskProgress(msg bus.InboundMessage, total, completed, failed int, res plannedTaskResult) {
if al == nil || al.bus == nil || total <= 1 {
return
}
@@ -184,7 +266,7 @@ func (al *AgentLoop) publishPlannedTaskProgress(msg bus.InboundMessage, total in
body = "(无输出)"
}
body = summarizePlannedTaskProgressBody(body, 6, 320)
content := fmt.Sprintf("进度 %d/%d任务%d已%s\n%s", idx, total, idx, status, body)
content := fmt.Sprintf("阶段进度 %d/%d(失败 %d\n最近任务%d 已%s\n%s", completed, total, failed, idx, status, body)
al.bus.PublishOutbound(bus.OutboundMessage{
Channel: msg.Channel,
ChatID: msg.ChatID,

View File

@@ -0,0 +1,63 @@
package agent
import (
"context"
"errors"
"testing"
)
func TestPlannedProgressMilestones(t *testing.T) {
t.Parallel()
got := plannedProgressMilestones(12)
if len(got) != 2 || got[0] != 4 || got[1] != 8 {
t.Fatalf("unexpected milestones: %#v", got)
}
}
func TestShouldPublishPlannedTaskProgress(t *testing.T) {
t.Parallel()
milestones := plannedProgressMilestones(12)
notified := map[int]struct{}{}
if shouldPublishPlannedTaskProgress(context.Background(), 12, 1, plannedTaskResult{}, milestones, notified) {
t.Fatalf("did not expect early success notification")
}
if !shouldPublishPlannedTaskProgress(context.Background(), 12, 4, plannedTaskResult{}, milestones, notified) {
t.Fatalf("expected milestone notification")
}
notified[4] = struct{}{}
if shouldPublishPlannedTaskProgress(context.Background(), 12, 4, plannedTaskResult{}, milestones, notified) {
t.Fatalf("did not expect duplicate milestone notification")
}
if !shouldPublishPlannedTaskProgress(context.Background(), 12, 5, plannedTaskResult{ErrText: "boom"}, milestones, notified) {
t.Fatalf("expected failure notification")
}
if shouldPublishPlannedTaskProgress(context.Background(), 3, 3, plannedTaskResult{}, plannedProgressMilestones(3), map[int]struct{}{}) {
t.Fatalf("did not expect final success notification")
}
if shouldPublishPlannedTaskProgress(context.Background(), 12, 5, plannedTaskResult{Err: context.Canceled, ErrText: context.Canceled.Error()}, milestones, notified) {
t.Fatalf("did not expect cancellation notification")
}
ctx, cancel := context.WithCancel(context.Background())
cancel()
if shouldPublishPlannedTaskProgress(ctx, 12, 5, plannedTaskResult{Err: errors.New("worker exited after parent stop"), ErrText: "worker exited after parent stop"}, milestones, notified) {
t.Fatalf("did not expect notification after parent cancellation")
}
}
func TestIsPlannedTaskCancellation(t *testing.T) {
t.Parallel()
if !isPlannedTaskCancellation(context.Background(), plannedTaskResult{Err: context.Canceled, ErrText: context.Canceled.Error()}) {
t.Fatalf("expected direct context cancellation to be detected")
}
ctx, cancel := context.WithCancel(context.Background())
cancel()
if !isPlannedTaskCancellation(ctx, plannedTaskResult{Err: errors.New("worker exited after parent stop"), ErrText: "worker exited after parent stop"}) {
t.Fatalf("expected canceled parent context to suppress planned task result")
}
if isPlannedTaskCancellation(context.Background(), plannedTaskResult{Err: errors.New("boom"), ErrText: "boom"}) {
t.Fatalf("did not expect non-cancellation error to be suppressed")
}
}

View File

@@ -73,6 +73,7 @@ type SubagentConfig struct {
Transport string `json:"transport,omitempty"`
NodeID string `json:"node_id,omitempty"`
ParentAgentID string `json:"parent_agent_id,omitempty"`
NotifyMainPolicy string `json:"notify_main_policy,omitempty"`
DisplayName string `json:"display_name,omitempty"`
Role string `json:"role,omitempty"`
Description string `json:"description,omitempty"`

View File

@@ -309,6 +309,13 @@ func validateSubagents(cfg *Config) []error {
errs = append(errs, fmt.Errorf("agents.subagents.%s.transport must be one of: local, node", id))
}
}
if policy := strings.TrimSpace(raw.NotifyMainPolicy); policy != "" {
switch policy {
case "final_only", "milestone", "on_blocked", "always", "internal_only":
default:
errs = append(errs, fmt.Errorf("agents.subagents.%s.notify_main_policy must be one of: final_only, milestone, on_blocked, always, internal_only", id))
}
}
if transport == "node" && strings.TrimSpace(raw.NodeID) == "" {
errs = append(errs, fmt.Errorf("agents.subagents.%s.node_id is required when transport=node", id))
}

View File

@@ -110,3 +110,21 @@ func TestValidateNodeBackedSubagentAllowsMissingPromptFile(t *testing.T) {
t.Fatalf("expected node-backed config to be valid, got %v", errs)
}
}
func TestValidateSubagentsRejectsInvalidNotifyMainPolicy(t *testing.T) {
t.Parallel()
cfg := DefaultConfig()
cfg.Agents.Subagents["coder"] = SubagentConfig{
Enabled: true,
SystemPromptFile: "agents/coder/AGENT.md",
NotifyMainPolicy: "loud",
Runtime: SubagentRuntimeConfig{
Proxy: "proxy",
},
}
if errs := Validate(cfg); len(errs) == 0 {
t.Fatalf("expected validation errors")
}
}

View File

@@ -2,6 +2,7 @@ package tools
import (
"context"
"errors"
"fmt"
"os"
"path/filepath"
@@ -10,6 +11,7 @@ import (
"time"
"clawgo/pkg/bus"
"clawgo/pkg/ekg"
"clawgo/pkg/providers"
)
@@ -22,6 +24,7 @@ type SubagentTask struct {
Transport string `json:"transport,omitempty"`
NodeID string `json:"node_id,omitempty"`
ParentAgentID string `json:"parent_agent_id,omitempty"`
NotifyMainPolicy string `json:"notify_main_policy,omitempty"`
SessionKey string `json:"session_key"`
MemoryNS string `json:"memory_ns"`
SystemPrompt string `json:"system_prompt,omitempty"`
@@ -51,6 +54,7 @@ type SubagentTask struct {
type SubagentManager struct {
tasks map[string]*SubagentTask
cancelFuncs map[string]context.CancelFunc
recoverableTaskIDs []string
archiveAfterMinute int64
mu sync.RWMutex
provider providers.LLMProvider
@@ -61,23 +65,25 @@ type SubagentManager struct {
profileStore *SubagentProfileStore
runStore *SubagentRunStore
mailboxStore *AgentMailboxStore
ekg *ekg.Engine
}
type SubagentSpawnOptions struct {
Task string
Label string
Role string
AgentID string
MaxRetries int
RetryBackoff int
TimeoutSec int
MaxTaskChars int
MaxResultChars int
OriginChannel string
OriginChatID string
ThreadID string
CorrelationID string
ParentRunID string
Task string
Label string
Role string
AgentID string
NotifyMainPolicy string
MaxRetries int
RetryBackoff int
TimeoutSec int
MaxTaskChars int
MaxResultChars int
OriginChannel string
OriginChatID string
ThreadID string
CorrelationID string
ParentRunID string
}
func NewSubagentManager(provider providers.LLMProvider, workspace string, bus *bus.MessageBus) *SubagentManager {
@@ -95,13 +101,18 @@ func NewSubagentManager(provider providers.LLMProvider, workspace string, bus *b
profileStore: store,
runStore: runStore,
mailboxStore: mailboxStore,
ekg: ekg.New(workspace),
}
if runStore != nil {
for _, task := range runStore.List() {
mgr.tasks[task.ID] = task
if task.Status == "running" {
mgr.recoverableTaskIDs = append(mgr.recoverableTaskIDs, task.ID)
}
}
mgr.nextID = runStore.NextIDSeed()
}
go mgr.resumeRecoveredTasks()
return mgr
}
@@ -162,6 +173,7 @@ func (sm *SubagentManager) spawnTask(ctx context.Context, opts SubagentSpawnOpti
transport := "local"
nodeID := ""
parentAgentID := ""
notifyMainPolicy := "final_only"
toolAllowlist := []string(nil)
maxRetries := 0
retryBackoff := 1000
@@ -194,6 +206,7 @@ func (sm *SubagentManager) spawnTask(ctx context.Context, opts SubagentSpawnOpti
}
nodeID = strings.TrimSpace(profile.NodeID)
parentAgentID = strings.TrimSpace(profile.ParentAgentID)
notifyMainPolicy = normalizeNotifyMainPolicy(profile.NotifyMainPolicy)
systemPrompt = strings.TrimSpace(profile.SystemPrompt)
systemPromptFile = strings.TrimSpace(profile.SystemPromptFile)
toolAllowlist = append([]string(nil), profile.ToolAllowlist...)
@@ -231,6 +244,9 @@ func (sm *SubagentManager) spawnTask(ctx context.Context, opts SubagentSpawnOpti
}
originChannel := strings.TrimSpace(opts.OriginChannel)
originChatID := strings.TrimSpace(opts.OriginChatID)
if raw := strings.TrimSpace(opts.NotifyMainPolicy); raw != "" {
notifyMainPolicy = normalizeNotifyMainPolicy(raw)
}
threadID := strings.TrimSpace(opts.ThreadID)
correlationID := strings.TrimSpace(opts.CorrelationID)
parentRunID := strings.TrimSpace(opts.ParentRunID)
@@ -269,6 +285,7 @@ func (sm *SubagentManager) spawnTask(ctx context.Context, opts SubagentSpawnOpti
Transport: transport,
NodeID: nodeID,
ParentAgentID: parentAgentID,
NotifyMainPolicy: notifyMainPolicy,
SessionKey: sessionKey,
MemoryNS: memoryNS,
SystemPrompt: systemPrompt,
@@ -363,20 +380,11 @@ func (sm *SubagentManager) runTask(ctx context.Context, task *SubagentTask) {
}
sm.mu.Unlock()
// 2. Result broadcast (keep existing behavior)
if sm.bus != nil {
prefix := "Task completed"
if runErr != nil {
prefix = "Task failed"
}
if task.Label != "" {
if runErr != nil {
prefix = fmt.Sprintf("Task '%s' failed", task.Label)
} else {
prefix = fmt.Sprintf("Task '%s' completed", task.Label)
}
}
announceContent := fmt.Sprintf("%s.\n\nResult:\n%s", prefix, task.Result)
sm.recordEKG(task, runErr)
// 2. Result broadcast
if sm.bus != nil && shouldNotifyMainOnFinal(task.NotifyMainPolicy, runErr, task) {
announceContent, notifyReason := buildSubagentMainNotification(task, runErr)
sm.bus.PublishInbound(bus.InboundMessage{
Channel: "system",
SenderID: fmt.Sprintf("subagent:%s", task.ID),
@@ -384,20 +392,142 @@ func (sm *SubagentManager) runTask(ctx context.Context, task *SubagentTask) {
SessionKey: task.SessionKey,
Content: announceContent,
Metadata: map[string]string{
"trigger": "subagent",
"subagent_id": task.ID,
"agent_id": task.AgentID,
"role": task.Role,
"session_key": task.SessionKey,
"memory_ns": task.MemoryNS,
"retry_count": fmt.Sprintf("%d", task.RetryCount),
"timeout_sec": fmt.Sprintf("%d", task.TimeoutSec),
"status": task.Status,
"trigger": "subagent",
"subagent_id": task.ID,
"agent_id": task.AgentID,
"role": task.Role,
"session_key": task.SessionKey,
"memory_ns": task.MemoryNS,
"retry_count": fmt.Sprintf("%d", task.RetryCount),
"timeout_sec": fmt.Sprintf("%d", task.TimeoutSec),
"status": task.Status,
"notify_reason": notifyReason,
},
})
}
}
func (sm *SubagentManager) recordEKG(task *SubagentTask, runErr error) {
if sm == nil || sm.ekg == nil || task == nil {
return
}
status := "success"
logText := strings.TrimSpace(task.Result)
if runErr != nil {
status = "error"
if isBlockedSubagentError(runErr) {
logText = "blocked: " + strings.TrimSpace(task.Result)
}
}
sm.ekg.Record(ekg.Event{
TaskID: task.ID,
Session: task.SessionKey,
Channel: task.OriginChannel,
Source: "subagent",
Status: status,
Log: logText,
})
}
func normalizeNotifyMainPolicy(v string) string {
switch strings.ToLower(strings.TrimSpace(v)) {
case "", "final_only":
return "final_only"
case "milestone", "on_blocked", "always", "internal_only":
return strings.ToLower(strings.TrimSpace(v))
default:
return "final_only"
}
}
func shouldNotifyMainOnFinal(policy string, runErr error, task *SubagentTask) bool {
switch normalizeNotifyMainPolicy(policy) {
case "internal_only":
return false
case "always", "final_only":
return true
case "on_blocked":
return isBlockedSubagentError(runErr)
case "milestone":
return false
default:
return true
}
}
func buildSubagentMainNotification(task *SubagentTask, runErr error) (string, string) {
status := "completed"
reason := "final"
if runErr != nil {
status = "failed"
if isBlockedSubagentError(runErr) {
status = "blocked"
reason = "blocked"
}
}
return fmt.Sprintf(
"Subagent update\nagent: %s\nrun: %s\nstatus: %s\nreason: %s\ntask: %s\nsummary: %s",
strings.TrimSpace(task.AgentID),
strings.TrimSpace(task.ID),
status,
reason,
summarizeSubagentText(firstNonEmpty(task.Label, task.Task), 120),
summarizeSubagentText(task.Result, 280),
), reason
}
func isBlockedSubagentError(err error) bool {
if err == nil {
return false
}
if errors.Is(err, context.DeadlineExceeded) {
return true
}
msg := strings.ToLower(strings.TrimSpace(err.Error()))
if msg == "" {
return false
}
blockedHints := []string{
"timeout",
"deadline exceeded",
"quota",
"rate limit",
"too many requests",
"permission denied",
"requires input",
"waiting for reply",
"blocked",
}
for _, hint := range blockedHints {
if strings.Contains(msg, hint) {
return true
}
}
return false
}
func summarizeSubagentText(s string, max int) string {
s = strings.TrimSpace(strings.ReplaceAll(s, "\r\n", "\n"))
s = strings.ReplaceAll(s, "\n", " ")
s = strings.Join(strings.Fields(s), " ")
if s == "" {
return "(empty)"
}
if max > 0 && len(s) > max {
return strings.TrimSpace(s[:max-3]) + "..."
}
return s
}
func firstNonEmpty(values ...string) string {
for _, v := range values {
if strings.TrimSpace(v) != "" {
return strings.TrimSpace(v)
}
}
return ""
}
func (sm *SubagentManager) runWithRetry(ctx context.Context, task *SubagentTask) (string, error) {
maxRetries := normalizePositiveBound(task.MaxRetries, 0, 8)
backoffMs := normalizePositiveBound(task.RetryBackoff, 500, 120000)
@@ -534,6 +664,7 @@ func (sm *SubagentManager) SetRunFunc(f SubagentRunFunc) {
sm.mu.Lock()
defer sm.mu.Unlock()
sm.runFunc = f
go sm.resumeRecoveredTasks()
}
func (sm *SubagentManager) ProfileStore() *SubagentProfileStore {
@@ -542,6 +673,38 @@ func (sm *SubagentManager) ProfileStore() *SubagentProfileStore {
return sm.profileStore
}
func (sm *SubagentManager) resumeRecoveredTasks() {
if sm == nil {
return
}
sm.mu.Lock()
if sm.runFunc == nil && sm.provider == nil {
sm.mu.Unlock()
return
}
taskIDs := append([]string(nil), sm.recoverableTaskIDs...)
sm.recoverableTaskIDs = nil
toResume := make([]*SubagentTask, 0, len(taskIDs))
for _, taskID := range taskIDs {
task, ok := sm.tasks[taskID]
if !ok || task == nil || task.Status != "running" {
continue
}
task.Updated = time.Now().UnixMilli()
sm.persistTaskLocked(task, "recovered", "auto-resumed after restart")
toResume = append(toResume, task)
}
sm.mu.Unlock()
for _, task := range toResume {
taskCtx, cancel := context.WithCancel(context.Background())
sm.mu.Lock()
sm.cancelFuncs[task.ID] = cancel
sm.mu.Unlock()
go sm.runTask(taskCtx, task)
}
}
func (sm *SubagentManager) NextTaskSequence() int {
sm.mu.RLock()
defer sm.mu.RUnlock()

View File

@@ -27,6 +27,7 @@ func DraftConfigSubagent(description, agentIDHint string) map[string]interface{}
"role": role,
"display_name": displayName,
"description": desc,
"notify_main_policy": "final_only",
"system_prompt": systemPrompt,
"system_prompt_file": "agents/" + agentID + "/AGENT.md",
"memory_namespace": agentID,
@@ -80,6 +81,9 @@ func UpsertConfigSubagent(configPath string, args map[string]interface{}) (map[s
if v := stringArgFromMap(args, "display_name"); v != "" {
subcfg.DisplayName = v
}
if v := stringArgFromMap(args, "notify_main_policy"); v != "" {
subcfg.NotifyMainPolicy = v
}
if v := stringArgFromMap(args, "description"); v != "" {
subcfg.Description = v
}

View File

@@ -32,6 +32,7 @@ func TestSubagentConfigToolUpsert(t *testing.T) {
"action": "upsert",
"agent_id": "reviewer",
"role": "testing",
"notify_main_policy": "internal_only",
"display_name": "Review Agent",
"description": "负责回归与评审",
"system_prompt": "review changes",
@@ -56,6 +57,9 @@ func TestSubagentConfigToolUpsert(t *testing.T) {
if reloaded.Agents.Subagents["reviewer"].DisplayName != "Review Agent" {
t.Fatalf("expected config to persist reviewer, got %+v", reloaded.Agents.Subagents["reviewer"])
}
if reloaded.Agents.Subagents["reviewer"].NotifyMainPolicy != "internal_only" {
t.Fatalf("expected notify_main_policy to persist, got %+v", reloaded.Agents.Subagents["reviewer"])
}
if len(reloaded.Agents.Router.Rules) == 0 {
t.Fatalf("expected router rules to persist")
}

View File

@@ -22,6 +22,7 @@ type SubagentProfile struct {
Transport string `json:"transport,omitempty"`
NodeID string `json:"node_id,omitempty"`
ParentAgentID string `json:"parent_agent_id,omitempty"`
NotifyMainPolicy string `json:"notify_main_policy,omitempty"`
Role string `json:"role,omitempty"`
SystemPrompt string `json:"system_prompt,omitempty"`
SystemPromptFile string `json:"system_prompt_file,omitempty"`
@@ -188,6 +189,7 @@ func normalizeSubagentProfile(in SubagentProfile) SubagentProfile {
p.Transport = normalizeProfileTransport(p.Transport)
p.NodeID = strings.TrimSpace(p.NodeID)
p.ParentAgentID = normalizeSubagentIdentifier(p.ParentAgentID)
p.NotifyMainPolicy = normalizeNotifyMainPolicy(p.NotifyMainPolicy)
p.Role = strings.TrimSpace(p.Role)
p.SystemPrompt = strings.TrimSpace(p.SystemPrompt)
p.SystemPromptFile = strings.TrimSpace(p.SystemPromptFile)
@@ -404,6 +406,7 @@ func profileFromConfig(agentID string, subcfg config.SubagentConfig) SubagentPro
Transport: strings.TrimSpace(subcfg.Transport),
NodeID: strings.TrimSpace(subcfg.NodeID),
ParentAgentID: strings.TrimSpace(subcfg.ParentAgentID),
NotifyMainPolicy: strings.TrimSpace(subcfg.NotifyMainPolicy),
Role: strings.TrimSpace(subcfg.Role),
SystemPrompt: strings.TrimSpace(subcfg.SystemPrompt),
SystemPromptFile: strings.TrimSpace(subcfg.SystemPromptFile),
@@ -507,6 +510,7 @@ func (t *SubagentProfileTool) Parameters() map[string]interface{} {
"description": "Unique subagent id, e.g. coder/writer/tester",
},
"name": map[string]interface{}{"type": "string"},
"notify_main_policy": map[string]interface{}{"type": "string", "description": "final_only|internal_only|milestone|on_blocked|always"},
"role": map[string]interface{}{"type": "string"},
"system_prompt": map[string]interface{}{"type": "string"},
"system_prompt_file": map[string]interface{}{"type": "string"},
@@ -577,6 +581,7 @@ func (t *SubagentProfileTool) Execute(ctx context.Context, args map[string]inter
p := SubagentProfile{
AgentID: agentID,
Name: stringArg(args, "name"),
NotifyMainPolicy: stringArg(args, "notify_main_policy"),
Role: stringArg(args, "role"),
SystemPrompt: stringArg(args, "system_prompt"),
SystemPromptFile: stringArg(args, "system_prompt_file"),
@@ -612,6 +617,9 @@ func (t *SubagentProfileTool) Execute(ctx context.Context, args map[string]inter
if _, ok := args["role"]; ok {
next.Role = stringArg(args, "role")
}
if _, ok := args["notify_main_policy"]; ok {
next.NotifyMainPolicy = stringArg(args, "notify_main_policy")
}
if _, ok := args["system_prompt"]; ok {
next.SystemPrompt = stringArg(args, "system_prompt")
}

View File

@@ -145,8 +145,11 @@ func TestSubagentBroadcastIncludesFailureStatus(t *testing.T) {
if got := strings.TrimSpace(msg.Metadata["status"]); got != "failed" {
t.Fatalf("expected metadata status=failed, got %q", got)
}
if !strings.Contains(strings.ToLower(msg.Content), "failed") {
t.Fatalf("expected failure wording in content, got %q", msg.Content)
if !strings.Contains(strings.ToLower(msg.Content), "status: failed") {
t.Fatalf("expected structured failure status in content, got %q", msg.Content)
}
if got := strings.TrimSpace(msg.Metadata["notify_reason"]); got != "final" {
t.Fatalf("expected notify_reason=final, got %q", got)
}
}
@@ -205,6 +208,199 @@ func TestSubagentManagerRestoresPersistedRuns(t *testing.T) {
time.Sleep(100 * time.Millisecond)
}
func TestSubagentManagerInternalOnlySuppressesMainNotification(t *testing.T) {
workspace := t.TempDir()
msgBus := bus.NewMessageBus()
manager := NewSubagentManager(nil, workspace, msgBus)
manager.SetRunFunc(func(ctx context.Context, task *SubagentTask) (string, error) {
return "silent-result", nil
})
store := manager.ProfileStore()
if store == nil {
t.Fatalf("expected profile store")
}
if _, err := store.Upsert(SubagentProfile{
AgentID: "coder",
Name: "Code Agent",
NotifyMainPolicy: "internal_only",
SystemPromptFile: "agents/coder/AGENT.md",
Status: "active",
}); err != nil {
t.Fatalf("profile upsert failed: %v", err)
}
_, err := manager.Spawn(context.Background(), SubagentSpawnOptions{
Task: "internal-only task",
AgentID: "coder",
OriginChannel: "cli",
OriginChatID: "direct",
})
if err != nil {
t.Fatalf("spawn failed: %v", err)
}
task := waitSubagentDone(t, manager, 4*time.Second)
if task.Status != "completed" {
t.Fatalf("expected completed task, got %s", task.Status)
}
ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
defer cancel()
if msg, ok := msgBus.ConsumeInbound(ctx); ok {
t.Fatalf("did not expect main notification, got %+v", msg)
}
}
func TestSubagentManagerOnBlockedNotifiesOnlyBlockedFailures(t *testing.T) {
workspace := t.TempDir()
msgBus := bus.NewMessageBus()
manager := NewSubagentManager(nil, workspace, msgBus)
manager.SetRunFunc(func(ctx context.Context, task *SubagentTask) (string, error) {
switch task.Task {
case "blocked-task":
return "", errors.New("command tick timeout exceeded: 600s")
default:
return "done", nil
}
})
store := manager.ProfileStore()
if store == nil {
t.Fatalf("expected profile store")
}
if _, err := store.Upsert(SubagentProfile{
AgentID: "pm",
Name: "Product Manager",
NotifyMainPolicy: "on_blocked",
SystemPromptFile: "agents/pm/AGENT.md",
Status: "active",
}); err != nil {
t.Fatalf("profile upsert failed: %v", err)
}
_, err := manager.Spawn(context.Background(), SubagentSpawnOptions{
Task: "successful-task",
AgentID: "pm",
OriginChannel: "cli",
OriginChatID: "direct",
})
if err != nil {
t.Fatalf("spawn success case failed: %v", err)
}
_ = waitSubagentDone(t, manager, 4*time.Second)
ctxSilent, cancelSilent := context.WithTimeout(context.Background(), 200*time.Millisecond)
defer cancelSilent()
if msg, ok := msgBus.ConsumeInbound(ctxSilent); ok {
t.Fatalf("did not expect success notification for on_blocked, got %+v", msg)
}
_, err = manager.Spawn(context.Background(), SubagentSpawnOptions{
Task: "blocked-task",
AgentID: "pm",
OriginChannel: "cli",
OriginChatID: "direct",
})
if err != nil {
t.Fatalf("spawn blocked case failed: %v", err)
}
_ = waitSubagentDone(t, manager, 4*time.Second)
ctxBlocked, cancelBlocked := context.WithTimeout(context.Background(), 2*time.Second)
defer cancelBlocked()
msg, ok := msgBus.ConsumeInbound(ctxBlocked)
if !ok {
t.Fatalf("expected blocked notification")
}
if got := strings.TrimSpace(msg.Metadata["notify_reason"]); got != "blocked" {
t.Fatalf("expected notify_reason=blocked, got %q", got)
}
if !strings.Contains(strings.ToLower(msg.Content), "blocked") {
t.Fatalf("expected blocked wording in content, got %q", msg.Content)
}
}
func TestSubagentManagerRecordsFailuresToEKG(t *testing.T) {
workspace := t.TempDir()
manager := NewSubagentManager(nil, workspace, nil)
manager.SetRunFunc(func(ctx context.Context, task *SubagentTask) (string, error) {
return "", errors.New("rate limit exceeded")
})
_, err := manager.Spawn(context.Background(), SubagentSpawnOptions{
Task: "ekg failure",
AgentID: "coder",
OriginChannel: "cli",
OriginChatID: "direct",
})
if err != nil {
t.Fatalf("spawn failed: %v", err)
}
_ = waitSubagentDone(t, manager, 4*time.Second)
data, err := os.ReadFile(filepath.Join(workspace, "memory", "ekg-events.jsonl"))
if err != nil {
t.Fatalf("expected ekg events to be written: %v", err)
}
text := string(data)
if !strings.Contains(text, "\"source\":\"subagent\"") {
t.Fatalf("expected subagent source in ekg log, got %s", text)
}
if !strings.Contains(text, "\"status\":\"error\"") {
t.Fatalf("expected error status in ekg log, got %s", text)
}
if !strings.Contains(strings.ToLower(text), "rate limit exceeded") {
t.Fatalf("expected failure text in ekg log, got %s", text)
}
}
func TestSubagentManagerAutoRecoversRunningTaskAfterRestart(t *testing.T) {
workspace := t.TempDir()
block := make(chan struct{})
manager := NewSubagentManager(nil, workspace, nil)
manager.SetRunFunc(func(ctx context.Context, task *SubagentTask) (string, error) {
<-block
return "should-not-complete-here", nil
})
_, err := manager.Spawn(context.Background(), SubagentSpawnOptions{
Task: "recover me",
AgentID: "coder",
OriginChannel: "cli",
OriginChatID: "direct",
})
if err != nil {
t.Fatalf("spawn failed: %v", err)
}
time.Sleep(80 * time.Millisecond)
recovered := make(chan string, 1)
reloaded := NewSubagentManager(nil, workspace, nil)
reloaded.SetRunFunc(func(ctx context.Context, task *SubagentTask) (string, error) {
recovered <- task.ID
return "recovered-ok", nil
})
select {
case taskID := <-recovered:
if taskID != "subagent-1" {
t.Fatalf("expected recovered task id subagent-1, got %s", taskID)
}
case <-time.After(2 * time.Second):
t.Fatalf("expected running task to auto-recover after restart")
}
got, ok := reloaded.GetTask("subagent-1")
if !ok {
t.Fatalf("expected recovered task to exist")
}
if got.Status != "completed" || got.Result != "recovered-ok" {
t.Fatalf("unexpected recovered task: %+v", got)
}
close(block)
_ = waitSubagentDone(t, manager, 4*time.Second)
time.Sleep(100 * time.Millisecond)
}
func TestSubagentManagerPersistsEvents(t *testing.T) {
workspace := t.TempDir()
manager := NewSubagentManager(nil, workspace, nil)

View File

@@ -116,6 +116,10 @@ const resources = {
noCronJobs: 'No cron jobs found',
noNodes: 'No nodes available',
sessions: 'Sessions',
mainChat: 'Main Chat',
subagentGroup: 'Subagent Group',
noSubagentStream: 'No subagent internal stream yet.',
subagentGroupReadonly: 'Subagent group is read-only.',
startConversation: 'Start a conversation',
typeMessage: 'Type a message...',
configuration: 'Configuration',
@@ -571,6 +575,10 @@ const resources = {
noCronJobs: '未找到定时任务',
noNodes: '无可用节点',
sessions: '会话',
mainChat: '主对话',
subagentGroup: '子代理群组',
noSubagentStream: '当前还没有子代理内部流。',
subagentGroupReadonly: '子代理群组为只读视图。',
startConversation: '开始对话',
typeMessage: '输入消息...',
configuration: '配置',

View File

@@ -5,12 +5,28 @@ import { useTranslation } from 'react-i18next';
import { useAppContext } from '../context/AppContext';
import { ChatItem } from '../types';
type StreamItem = {
kind?: string;
at?: number;
task_id?: string;
label?: string;
agent_id?: string;
event_type?: string;
message?: string;
message_type?: string;
content?: string;
from_agent?: string;
to_agent?: string;
status?: string;
};
const Chat: React.FC = () => {
const { t } = useTranslation();
const { q, sessions } = useAppContext();
const [chat, setChat] = useState<ChatItem[]>([]);
const [msg, setMsg] = useState('');
const [fileSelected, setFileSelected] = useState(false);
const [chatTab, setChatTab] = useState<'main' | 'subagents'>('main');
const [sessionKey, setSessionKey] = useState('main');
const chatEndRef = useRef<HTMLDivElement>(null);
@@ -51,6 +67,34 @@ const Chat: React.FC = () => {
}
};
const loadSubagentGroup = async () => {
try {
const r = await fetch(`/webui/api/subagents_runtime${q}`, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ action: 'stream_all', limit: 200, task_limit: 24 }),
});
if (!r.ok) return;
const j = await r.json();
const arr = Array.isArray(j?.result?.items) ? j.result.items : [];
const mapped: ChatItem[] = arr.map((item: StreamItem) => {
const isEvent = item.kind === 'event';
const label = isEvent
? `${item.agent_id || 'subagent'} · ${item.event_type || 'event'}`
: `${item.from_agent || '-'} -> ${item.to_agent || '-'} · ${item.message_type || 'message'}`;
const body = isEvent ? (item.message || '') : (item.content || '');
return {
role: 'assistant',
label,
text: `${body}${item.status ? `\n\nstatus: ${item.status}` : ''}`,
};
});
setChat(mapped);
} catch (e) {
console.error(e);
}
};
async function send() {
if (!msg.trim() && !fileSelected) return;
@@ -111,27 +155,55 @@ const Chat: React.FC = () => {
}
useEffect(() => {
loadHistory();
}, [q, sessionKey]);
if (chatTab === 'main') {
loadHistory();
return;
}
loadSubagentGroup();
}, [q, chatTab, sessionKey]);
useEffect(() => {
if (!sessions || sessions.length === 0) return;
if (!sessions.some(s => s.key === sessionKey)) {
setSessionKey(sessions[0].key);
if (chatTab !== 'subagents') return;
const timer = window.setInterval(() => {
loadSubagentGroup();
}, 5000);
return () => window.clearInterval(timer);
}, [q, chatTab]);
const userSessions = (sessions || []).filter((s: any) => !String(s?.key || '').startsWith('subagent:'));
useEffect(() => {
if (chatTab !== 'main') return;
if (!userSessions.length) return;
if (!userSessions.some((s: any) => s.key === sessionKey)) {
setSessionKey(userSessions[0].key);
}
}, [sessions]);
}, [chatTab, sessionKey, userSessions]);
return (
<div className="flex h-full">
<div className="flex-1 flex flex-col bg-zinc-950/50">
<div className="px-4 py-3 border-b border-zinc-800 flex items-center justify-between gap-3 flex-wrap">
<div className="flex items-center gap-2">
<h2 className="text-sm text-zinc-300 font-medium">{t('session')}</h2>
<select value={sessionKey} onChange={(e)=>setSessionKey(e.target.value)} className="bg-zinc-900 border border-zinc-700 rounded px-2 py-1 text-xs text-zinc-200">
{(sessions || []).map((s:any)=> <option key={s.key} value={s.key}>{s.key}</option>)}
</select>
<button
onClick={() => setChatTab('main')}
className={`px-3 py-1.5 rounded-lg text-xs ${chatTab === 'main' ? 'bg-indigo-600 text-white' : 'bg-zinc-900 border border-zinc-700 text-zinc-300'}`}
>
Main Chat
</button>
<button
onClick={() => setChatTab('subagents')}
className={`px-3 py-1.5 rounded-lg text-xs ${chatTab === 'subagents' ? 'bg-amber-600 text-white' : 'bg-zinc-900 border border-zinc-700 text-zinc-300'}`}
>
{t('subagentGroup')}
</button>
{chatTab === 'main' && (
<select value={sessionKey} onChange={(e) => setSessionKey(e.target.value)} className="bg-zinc-900 border border-zinc-700 rounded px-2 py-1 text-xs text-zinc-200">
{userSessions.map((s: any) => <option key={s.key} value={s.key}>{s.title || s.key}</option>)}
</select>
)}
</div>
<button onClick={loadHistory} className="flex items-center gap-1 px-2 py-1 text-xs rounded bg-zinc-800 hover:bg-zinc-700"><RefreshCw className="w-3 h-3"/>{t('reloadHistory')}</button>
<button onClick={chatTab === 'main' ? loadHistory : loadSubagentGroup} className="flex items-center gap-1 px-2 py-1 text-xs rounded bg-zinc-800 hover:bg-zinc-700"><RefreshCw className="w-3 h-3"/>{t('reloadHistory')}</button>
</div>
<div className="flex-1 overflow-y-auto p-6 space-y-6">
@@ -140,7 +212,7 @@ const Chat: React.FC = () => {
<div className="w-16 h-16 rounded-2xl bg-zinc-900 flex items-center justify-center border border-zinc-800">
<MessageSquare className="w-8 h-8 text-zinc-600" />
</div>
<p className="text-sm font-medium">{t('startConversation')}</p>
<p className="text-sm font-medium">{chatTab === 'main' ? t('startConversation') : t('noSubagentStream')}</p>
</div>
) : (
chat.map((m, i) => {
@@ -203,13 +275,14 @@ const Chat: React.FC = () => {
<input
value={msg}
onChange={(e) => setMsg(e.target.value)}
onKeyDown={(e) => e.key === 'Enter' && send()}
placeholder={t('typeMessage')}
className="w-full bg-zinc-900 border border-zinc-800 rounded-full pl-14 pr-14 py-3.5 text-[15px] focus:outline-none focus:border-indigo-500 focus:ring-1 focus:ring-indigo-500 transition-all placeholder:text-zinc-500 shadow-sm"
onKeyDown={(e) => chatTab === 'main' && e.key === 'Enter' && send()}
placeholder={chatTab === 'main' ? t('typeMessage') : t('subagentGroupReadonly')}
disabled={chatTab !== 'main'}
className="w-full bg-zinc-900 border border-zinc-800 rounded-full pl-14 pr-14 py-3.5 text-[15px] focus:outline-none focus:border-indigo-500 focus:ring-1 focus:ring-indigo-500 transition-all placeholder:text-zinc-500 shadow-sm disabled:opacity-60"
/>
<button
onClick={send}
disabled={!msg.trim() && !fileSelected}
disabled={chatTab !== 'main' || (!msg.trim() && !fileSelected)}
className="absolute right-2 p-2.5 bg-indigo-600 hover:bg-indigo-500 disabled:opacity-50 disabled:hover:bg-indigo-600 text-white rounded-full transition-colors shadow-sm"
>
<Send className="w-4 h-4 ml-0.5" />

View File

@@ -6,6 +6,7 @@ import { useUI } from '../context/UIContext';
type SubagentProfile = {
agent_id: string;
name?: string;
notify_main_policy?: string;
role?: string;
system_prompt?: string;
system_prompt_file?: string;
@@ -31,6 +32,7 @@ type ToolAllowlistGroup = {
const emptyDraft: SubagentProfile = {
agent_id: '',
name: '',
notify_main_policy: 'final_only',
role: '',
system_prompt: '',
system_prompt_file: '',
@@ -79,6 +81,7 @@ const SubagentProfiles: React.FC = () => {
setDraft({
agent_id: next.agent_id || '',
name: next.name || '',
notify_main_policy: next.notify_main_policy || 'final_only',
role: next.role || '',
system_prompt: next.system_prompt || '',
system_prompt_file: next.system_prompt_file || '',
@@ -140,6 +143,7 @@ const SubagentProfiles: React.FC = () => {
setDraft({
agent_id: p.agent_id || '',
name: p.name || '',
notify_main_policy: p.notify_main_policy || 'final_only',
role: p.role || '',
system_prompt: p.system_prompt || '',
system_prompt_file: p.system_prompt_file || '',
@@ -193,6 +197,7 @@ const SubagentProfiles: React.FC = () => {
action,
agent_id: agentId,
name: draft.name || '',
notify_main_policy: draft.notify_main_policy || 'final_only',
role: draft.role || '',
system_prompt: draft.system_prompt || '',
system_prompt_file: draft.system_prompt_file || '',
@@ -350,6 +355,20 @@ const SubagentProfiles: React.FC = () => {
<option value="disabled">disabled</option>
</select>
</div>
<div>
<div className="text-xs text-zinc-400 mb-1">notify_main_policy</div>
<select
value={draft.notify_main_policy || 'final_only'}
onChange={(e) => setDraft({ ...draft, notify_main_policy: e.target.value })}
className="w-full px-2 py-1 text-xs bg-zinc-900 border border-zinc-700 rounded"
>
<option value="final_only">final_only</option>
<option value="internal_only">internal_only</option>
<option value="milestone">milestone</option>
<option value="on_blocked">on_blocked</option>
<option value="always">always</option>
</select>
</div>
<div className="md:col-span-2">
<div className="text-xs text-zinc-400 mb-1">system_prompt_file</div>
<input

View File

@@ -60,6 +60,26 @@ type AgentMessage = {
created_at?: number;
};
type StreamItem = {
kind?: 'event' | 'message' | string;
at?: number;
run_id?: string;
agent_id?: string;
event_type?: string;
message?: string;
retry_count?: number;
message_id?: string;
thread_id?: string;
from_agent?: string;
to_agent?: string;
reply_to?: string;
correlation_id?: string;
message_type?: string;
content?: string;
status?: string;
requires_reply?: boolean;
};
type RegistrySubagent = {
agent_id?: string;
enabled?: boolean;
@@ -68,6 +88,7 @@ type RegistrySubagent = {
node_id?: string;
parent_agent_id?: string;
managed_by?: string;
notify_main_policy?: string;
display_name?: string;
role?: string;
description?: string;
@@ -115,6 +136,8 @@ type AgentTaskStats = {
running: number;
failed: number;
waiting: number;
latestStatus: string;
latestUpdated: number;
active: Array<{ id: string; status: string; title: string }>;
};
@@ -184,6 +207,11 @@ function summarizeTask(task?: string, label?: string): string {
return text.length > 52 ? `${text.slice(0, 49)}...` : text;
}
function formatStreamTime(ts?: number): string {
if (!ts) return '--:--:--';
return new Date(ts).toLocaleTimeString([], { hour12: false });
}
function bezierCurve(x1: number, y1: number, x2: number, y2: number): string {
const offset = Math.max(Math.abs(y2 - y1) * 0.5, 60);
return `M ${x1} ${y1} C ${x1} ${y1 + offset} ${x2} ${y2 - offset} ${x2} ${y2}`;
@@ -199,13 +227,18 @@ function buildTaskStats(tasks: SubagentTask[]): Record<string, AgentTaskStats> {
const agentID = normalizeTitle(task.agent_id, '');
if (!agentID) return acc;
if (!acc[agentID]) {
acc[agentID] = { total: 0, running: 0, failed: 0, waiting: 0, active: [] };
acc[agentID] = { total: 0, running: 0, failed: 0, waiting: 0, latestStatus: '', latestUpdated: 0, active: [] };
}
const item = acc[agentID];
item.total += 1;
if (task.status === 'running') item.running += 1;
if (task.status === 'failed') item.failed += 1;
if (task.waiting_for_reply) item.waiting += 1;
const updatedAt = Math.max(task.updated || 0, task.created || 0);
if (updatedAt >= item.latestUpdated) {
item.latestUpdated = updatedAt;
item.latestStatus = normalizeTitle(task.status, '');
item.failed = task.status === 'failed' ? 1 : 0;
}
if (task.status === 'running' || task.waiting_for_reply) {
item.active.push({
id: task.id,
@@ -330,6 +363,8 @@ const Subagents: React.FC = () => {
const [registryItems, setRegistryItems] = useState<RegistrySubagent[]>([]);
const [promptFileContent, setPromptFileContent] = useState('');
const [promptFileFound, setPromptFileFound] = useState(false);
const [streamItems, setStreamItems] = useState<StreamItem[]>([]);
const [streamTask, setStreamTask] = useState<SubagentTask | null>(null);
const [selectedTopologyBranch, setSelectedTopologyBranch] = useState('');
const [topologyFilter, setTopologyFilter] = useState<'all' | 'running' | 'failed' | 'local' | 'remote'>('all');
const [topologyZoom, setTopologyZoom] = useState(0.9);
@@ -400,6 +435,13 @@ const Subagents: React.FC = () => {
load().catch(() => { });
}, [q, selectedAgentID]);
useEffect(() => {
const timer = window.setInterval(() => {
load().catch(() => { });
}, 5000);
return () => window.clearInterval(timer);
}, [q, selectedAgentID]);
const selected = useMemo(() => items.find((x) => x.id === selectedId) || null, [items, selectedId]);
const selectedRegistryItem = useMemo(
() => registryItems.find((x) => x.agent_id === selectedAgentID) || null,
@@ -414,6 +456,10 @@ const Subagents: React.FC = () => {
[...selectedAgentTasks].sort((a, b) => Math.max(b.updated || 0, b.created || 0) - Math.max(a.updated || 0, a.created || 0))[0] || null,
[selectedAgentTasks]
);
const selectedAgentDisplayName = useMemo(
() => selectedRegistryItem?.display_name || selectedRegistryItem?.agent_id || selectedAgentID || '',
[selectedRegistryItem, selectedAgentID]
);
const parsedNodeTrees = useMemo<NodeTree[]>(() => {
try {
const parsed = JSON.parse(nodeTrees);
@@ -489,7 +535,7 @@ const Subagents: React.FC = () => {
failed: 0,
};
const localMainStats = taskStats[normalizeTitle(localRoot.agent_id, 'main')] || { total: 0, running: 0, failed: 0, waiting: 0, active: [] };
const localMainStats = taskStats[normalizeTitle(localRoot.agent_id, 'main')] || { total: 0, running: 0, failed: 0, waiting: 0, latestStatus: '', latestUpdated: 0, active: [] };
const localMainTask = recentTaskByAgent[normalizeTitle(localRoot.agent_id, 'main')];
localBranchStats.running += localMainStats.running;
localBranchStats.failed += localMainStats.failed;
@@ -509,14 +555,16 @@ const Subagents: React.FC = () => {
`children=${localChildren.length + remoteClusters.length}`,
`total=${localMainStats.total} running=${localMainStats.running}`,
`waiting=${localMainStats.waiting} failed=${localMainStats.failed}`,
`notify=${normalizeTitle(registryItems.find((item) => item.agent_id === localRoot.agent_id)?.notify_main_policy, 'final_only')}`,
`transport=${normalizeTitle(localRoot.transport, 'local')} type=${normalizeTitle(localRoot.type, 'router')}`,
localMainStats.active[0] ? `task: ${localMainStats.active[0].title}` : t('noLiveTasks'),
],
accent: 'bg-amber-400',
accent: localMainStats.running > 0 ? 'bg-emerald-500' : localMainStats.latestStatus === 'failed' ? 'bg-red-500' : 'bg-amber-400',
clickable: true,
scale,
onClick: () => {
setSelectedTopologyBranch(localBranch);
setSelectedAgentID(normalizeTitle(localRoot.agent_id, 'main'));
if (localMainTask?.id) setSelectedId(localMainTask.id);
},
};
@@ -525,7 +573,7 @@ const Subagents: React.FC = () => {
localChildren.forEach((child, idx) => {
const childX = localOriginX + idx * (cardWidth + clusterGap);
const childY = childStartY;
const stats = taskStats[normalizeTitle(child.agent_id, '')] || { total: 0, running: 0, failed: 0, waiting: 0, active: [] };
const stats = taskStats[normalizeTitle(child.agent_id, '')] || { total: 0, running: 0, failed: 0, waiting: 0, latestStatus: '', latestUpdated: 0, active: [] };
const task = recentTaskByAgent[normalizeTitle(child.agent_id, '')];
localBranchStats.running += stats.running;
localBranchStats.failed += stats.failed;
@@ -544,14 +592,16 @@ const Subagents: React.FC = () => {
meta: [
`total=${stats.total} running=${stats.running}`,
`waiting=${stats.waiting} failed=${stats.failed}`,
`notify=${normalizeTitle(registryItems.find((item) => item.agent_id === child.agent_id)?.notify_main_policy, 'final_only')}`,
`transport=${normalizeTitle(child.transport, 'local')} type=${normalizeTitle(child.type, 'worker')}`,
stats.active[0] ? `task: ${stats.active[0].title}` : task ? `last: ${summarizeTask(task.task, task.label)}` : t('noLiveTasks'),
],
accent: stats.running > 0 ? 'bg-emerald-500' : stats.failed > 0 ? 'bg-red-500' : 'bg-sky-400',
accent: stats.running > 0 ? 'bg-emerald-500' : stats.latestStatus === 'failed' ? 'bg-red-500' : 'bg-sky-400',
clickable: true,
scale,
onClick: () => {
setSelectedTopologyBranch(localBranch);
setSelectedAgentID(normalizeTitle(child.agent_id, ''));
if (task?.id) setSelectedId(task.id);
},
});
@@ -588,7 +638,11 @@ const Subagents: React.FC = () => {
accent: tree.online ? 'bg-fuchsia-400' : 'bg-zinc-500',
clickable: true,
scale,
onClick: () => setSelectedTopologyBranch(branch),
onClick: () => {
setSelectedTopologyBranch(branch);
setSelectedAgentID(normalizeTitle(treeRoot.agent_id, ''));
setSelectedId('');
},
};
cards.push(rootCard);
lines.push({
@@ -619,7 +673,11 @@ const Subagents: React.FC = () => {
accent: 'bg-violet-400',
clickable: true,
scale,
onClick: () => setSelectedTopologyBranch(branch),
onClick: () => {
setSelectedTopologyBranch(branch);
setSelectedAgentID(normalizeTitle(child.agent_id, ''));
setSelectedId('');
},
});
lines.push({
path: bezierCurve(rootCard.x + cardWidth / 2, rootCard.y + cardHeight / 2, childX + cardWidth / 2, childY + cardHeight / 2),
@@ -881,6 +939,26 @@ const Subagents: React.FC = () => {
loadThreadAndInbox(selected).catch(() => { });
}, [selectedId, q, items]);
const loadStream = async (task: SubagentTask | null) => {
if (!task?.id) {
setStreamTask(null);
setStreamItems([]);
return;
}
try {
const streamRes = await callAction({ action: 'stream', id: task.id, limit: 100 });
setStreamTask(streamRes?.result?.task || task);
setStreamItems(Array.isArray(streamRes?.result?.items) ? streamRes.result.items : []);
} catch {
setStreamTask(task);
setStreamItems([]);
}
};
useEffect(() => {
loadStream(selectedAgentLatestTask).catch(() => { });
}, [selectedAgentLatestTask?.id, q, items.length]);
return (
<div className="h-full p-4 md:p-6 flex flex-col gap-4">
<div className="flex items-center justify-between">
@@ -1092,6 +1170,62 @@ const Subagents: React.FC = () => {
</div>
</div>
)}
{selectedAgentID && (
<div className="absolute top-4 right-4 bottom-4 z-20 w-[360px] rounded-2xl border border-zinc-800 bg-zinc-950/92 shadow-2xl shadow-black/40 backdrop-blur-md overflow-hidden flex flex-col">
<div className="flex items-center justify-between px-4 py-3 border-b border-zinc-800">
<div className="min-w-0">
<div className="text-xs text-zinc-500 uppercase tracking-wider">Internal Stream</div>
<div className="text-sm font-semibold text-zinc-100 truncate">{selectedAgentDisplayName}</div>
<div className="text-[11px] text-zinc-500 truncate">{selectedAgentID}</div>
</div>
<button
onClick={() => {
setSelectedAgentID('');
setSelectedId('');
setStreamTask(null);
setStreamItems([]);
}}
className="px-2 py-1 rounded bg-zinc-800 hover:bg-zinc-700 text-[11px] text-zinc-200"
>
Close
</button>
</div>
<div className="px-4 py-3 border-b border-zinc-800 text-xs text-zinc-400">
{streamTask?.id ? (
<div className="space-y-1">
<div>run={streamTask.id}</div>
<div>status={streamTask.status || '-'} · thread={streamTask.thread_id || '-'}</div>
</div>
) : (
<div>No persisted run for this agent yet.</div>
)}
</div>
<div className="flex-1 overflow-y-auto px-4 py-3 space-y-3">
{streamItems.length === 0 ? (
<div className="text-sm text-zinc-500">No internal stream events yet.</div>
) : streamItems.map((item, idx) => (
<div key={`${item.kind || 'item'}-${item.at || 0}-${idx}`} className="rounded-xl border border-zinc-800 bg-zinc-900/70 p-3">
<div className="flex items-center justify-between gap-2 mb-2">
<div className="text-xs font-medium text-zinc-200">
{item.kind === 'event'
? `${item.event_type || 'event'}${item.status ? ` · ${item.status}` : ''}`
: `${item.from_agent || '-'} -> ${item.to_agent || '-'} · ${item.message_type || 'message'}`}
</div>
<div className="text-[11px] text-zinc-500">{formatStreamTime(item.at)}</div>
</div>
<div className="text-xs text-zinc-300 whitespace-pre-wrap break-words">
{item.kind === 'event' ? (item.message || '(no event message)') : (item.content || '(empty message)')}
</div>
<div className="mt-2 text-[11px] text-zinc-500">
{item.kind === 'event'
? `run=${item.run_id || '-'}${item.retry_count ? ` · retry=${item.retry_count}` : ''}`
: `status=${item.status || '-'}${item.reply_to ? ` · reply_to=${item.reply_to}` : ''}`}
</div>
</div>
))}
</div>
</div>
)}
</div>
</div>