Compare commits

...

2 Commits

Author SHA1 Message Date
lpf
0b1fdecd68 Separate main chat from subagent group stream 2026-03-07 12:12:12 +08:00
lpf
1218d68b7e Add internal subagent stream and notify policy 2026-03-07 11:52:36 +08:00
14 changed files with 893 additions and 58 deletions

View File

@@ -131,6 +131,7 @@ func (al *AgentLoop) HandleSubagentRuntime(ctx context.Context, action string, a
"transport": fallbackString(strings.TrimSpace(subcfg.Transport), "local"),
"node_id": strings.TrimSpace(subcfg.NodeID),
"parent_agent_id": strings.TrimSpace(subcfg.ParentAgentID),
"notify_main_policy": fallbackString(strings.TrimSpace(subcfg.NotifyMainPolicy), "final_only"),
"display_name": subcfg.DisplayName,
"role": subcfg.Role,
"description": subcfg.Description,
@@ -157,6 +158,7 @@ func (al *AgentLoop) HandleSubagentRuntime(ctx context.Context, action string, a
"transport": profile.Transport,
"node_id": profile.NodeID,
"parent_agent_id": profile.ParentAgentID,
"notify_main_policy": fallbackString(strings.TrimSpace(profile.NotifyMainPolicy), "final_only"),
"display_name": profile.Name,
"role": profile.Role,
"description": "Node-registered remote main agent branch",
@@ -360,6 +362,53 @@ func (al *AgentLoop) HandleSubagentRuntime(ctx context.Context, action string, a
return nil, err
}
return map[string]interface{}{"found": true, "thread": thread, "messages": items}, nil
case "stream":
taskID, err := resolveSubagentTaskIDForRuntime(sm, runtimeStringArg(args, "id"))
if err != nil {
return nil, err
}
task, ok := sm.GetTask(taskID)
if !ok {
return map[string]interface{}{"found": false}, nil
}
events, err := sm.Events(taskID, runtimeIntArg(args, "limit", 100))
if err != nil {
return nil, err
}
var thread *tools.AgentThread
var messages []tools.AgentMessage
if strings.TrimSpace(task.ThreadID) != "" {
if th, ok := sm.Thread(task.ThreadID); ok {
thread = th
}
messages, err = sm.ThreadMessages(task.ThreadID, runtimeIntArg(args, "limit", 100))
if err != nil {
return nil, err
}
}
stream := mergeSubagentStream(events, messages)
return map[string]interface{}{
"found": true,
"task": cloneSubagentTask(task),
"thread": thread,
"items": stream,
}, nil
case "stream_all":
tasks := sm.ListTasks()
sort.Slice(tasks, func(i, j int) bool {
left := maxInt64(tasks[i].Updated, tasks[i].Created)
right := maxInt64(tasks[j].Updated, tasks[j].Created)
if left != right {
return left > right
}
return tasks[i].ID > tasks[j].ID
})
taskLimit := runtimeIntArg(args, "task_limit", 16)
if taskLimit > 0 && len(tasks) > taskLimit {
tasks = tasks[:taskLimit]
}
items := mergeAllSubagentStreams(sm, tasks, runtimeIntArg(args, "limit", 200))
return map[string]interface{}{"found": true, "items": items}, nil
case "inbox":
agentID := runtimeStringArg(args, "agent_id")
if agentID == "" {
@@ -386,6 +435,140 @@ func (al *AgentLoop) HandleSubagentRuntime(ctx context.Context, action string, a
}
}
func mergeSubagentStream(events []tools.SubagentRunEvent, messages []tools.AgentMessage) []map[string]interface{} {
items := make([]map[string]interface{}, 0, len(events)+len(messages))
for _, evt := range events {
items = append(items, map[string]interface{}{
"kind": "event",
"at": evt.At,
"run_id": evt.RunID,
"agent_id": evt.AgentID,
"event_type": evt.Type,
"status": evt.Status,
"message": evt.Message,
"retry_count": evt.RetryCount,
})
}
for _, msg := range messages {
items = append(items, map[string]interface{}{
"kind": "message",
"at": msg.CreatedAt,
"message_id": msg.MessageID,
"thread_id": msg.ThreadID,
"from_agent": msg.FromAgent,
"to_agent": msg.ToAgent,
"reply_to": msg.ReplyTo,
"correlation_id": msg.CorrelationID,
"message_type": msg.Type,
"content": msg.Content,
"status": msg.Status,
"requires_reply": msg.RequiresReply,
})
}
sort.Slice(items, func(i, j int) bool {
left, _ := items[i]["at"].(int64)
right, _ := items[j]["at"].(int64)
if left != right {
return left < right
}
return fmt.Sprintf("%v", items[i]["kind"]) < fmt.Sprintf("%v", items[j]["kind"])
})
return items
}
func mergeAllSubagentStreams(sm *tools.SubagentManager, tasks []*tools.SubagentTask, limit int) []map[string]interface{} {
if sm == nil || len(tasks) == 0 {
return nil
}
items := make([]map[string]interface{}, 0)
seenEvents := map[string]struct{}{}
seenMessages := map[string]struct{}{}
for _, task := range tasks {
if task == nil {
continue
}
if events, err := sm.Events(task.ID, limit); err == nil {
for _, evt := range events {
key := fmt.Sprintf("%s:%s:%d:%s", evt.RunID, evt.Type, evt.At, evt.Message)
if _, ok := seenEvents[key]; ok {
continue
}
seenEvents[key] = struct{}{}
items = append(items, map[string]interface{}{
"kind": "event",
"at": evt.At,
"task_id": task.ID,
"label": task.Label,
"run_id": evt.RunID,
"agent_id": firstNonEmptyString(evt.AgentID, task.AgentID),
"event_type": evt.Type,
"status": evt.Status,
"message": evt.Message,
"retry_count": evt.RetryCount,
})
}
}
if strings.TrimSpace(task.ThreadID) == "" {
continue
}
if messages, err := sm.ThreadMessages(task.ThreadID, limit); err == nil {
for _, msg := range messages {
if _, ok := seenMessages[msg.MessageID]; ok {
continue
}
seenMessages[msg.MessageID] = struct{}{}
items = append(items, map[string]interface{}{
"kind": "message",
"at": msg.CreatedAt,
"task_id": task.ID,
"label": task.Label,
"message_id": msg.MessageID,
"thread_id": msg.ThreadID,
"from_agent": msg.FromAgent,
"to_agent": msg.ToAgent,
"reply_to": msg.ReplyTo,
"correlation_id": msg.CorrelationID,
"message_type": msg.Type,
"content": msg.Content,
"status": msg.Status,
"requires_reply": msg.RequiresReply,
})
}
}
}
sort.Slice(items, func(i, j int) bool {
left, _ := items[i]["at"].(int64)
right, _ := items[j]["at"].(int64)
if left != right {
return left < right
}
return fmt.Sprintf("%v", items[i]["task_id"]) < fmt.Sprintf("%v", items[j]["task_id"])
})
if limit > 0 && len(items) > limit {
items = items[len(items)-limit:]
}
return items
}
func maxInt64(values ...int64) int64 {
var out int64
for _, v := range values {
if v > out {
out = v
}
}
return out
}
func firstNonEmptyString(values ...string) string {
for _, v := range values {
if strings.TrimSpace(v) != "" {
return strings.TrimSpace(v)
}
}
return ""
}
func cloneSubagentTask(in *tools.SubagentTask) *tools.SubagentTask {
if in == nil {
return nil

View File

@@ -5,6 +5,7 @@ import (
"os"
"path/filepath"
"testing"
"time"
"clawgo/pkg/config"
"clawgo/pkg/runtimecfg"
@@ -75,6 +76,7 @@ func TestHandleSubagentRuntimeUpsertConfigSubagent(t *testing.T) {
out, err := loop.HandleSubagentRuntime(context.Background(), "upsert_config_subagent", map[string]interface{}{
"agent_id": "reviewer",
"role": "testing",
"notify_main_policy": "internal_only",
"display_name": "Review Agent",
"system_prompt": "review changes",
"system_prompt_file": "agents/reviewer/AGENT.md",
@@ -99,6 +101,9 @@ func TestHandleSubagentRuntimeUpsertConfigSubagent(t *testing.T) {
if subcfg.SystemPromptFile != "agents/reviewer/AGENT.md" {
t.Fatalf("expected system_prompt_file to persist, got %+v", subcfg)
}
if subcfg.NotifyMainPolicy != "internal_only" {
t.Fatalf("expected notify_main_policy to persist, got %+v", subcfg)
}
if len(reloaded.Agents.Router.Rules) == 0 {
t.Fatalf("expected router rules to be persisted")
}
@@ -316,3 +321,116 @@ func TestHandleSubagentRuntimeProtectsMainAgent(t *testing.T) {
t.Fatalf("expected deleting main agent to fail")
}
}
func TestHandleSubagentRuntimeStream(t *testing.T) {
workspace := t.TempDir()
manager := tools.NewSubagentManager(nil, workspace, nil)
manager.SetRunFunc(func(ctx context.Context, task *tools.SubagentTask) (string, error) {
return "stream-result", nil
})
loop := &AgentLoop{
workspace: workspace,
subagentManager: manager,
subagentRouter: tools.NewSubagentRouter(manager),
}
out, err := loop.HandleSubagentRuntime(context.Background(), "spawn", map[string]interface{}{
"task": "prepare streamable task",
"agent_id": "coder",
"channel": "webui",
"chat_id": "webui",
})
if err != nil {
t.Fatalf("spawn failed: %v", err)
}
payload, ok := out.(map[string]interface{})
if !ok {
t.Fatalf("unexpected spawn payload: %T", out)
}
_ = payload
var task *tools.SubagentTask
for i := 0; i < 50; i++ {
tasks := manager.ListTasks()
if len(tasks) > 0 && tasks[0].Status == "completed" {
task = tasks[0]
break
}
time.Sleep(10 * time.Millisecond)
}
if task == nil {
t.Fatalf("expected completed task")
}
out, err = loop.HandleSubagentRuntime(context.Background(), "stream", map[string]interface{}{
"id": task.ID,
})
if err != nil {
t.Fatalf("stream failed: %v", err)
}
streamPayload, ok := out.(map[string]interface{})
if !ok || streamPayload["found"] != true {
t.Fatalf("unexpected stream payload: %#v", out)
}
items, ok := streamPayload["items"].([]map[string]interface{})
if !ok || len(items) == 0 {
t.Fatalf("expected merged stream items, got %#v", streamPayload["items"])
}
foundEvent := false
foundMessage := false
for _, item := range items {
switch item["kind"] {
case "event":
foundEvent = true
case "message":
foundMessage = true
}
}
if !foundEvent || !foundMessage {
t.Fatalf("expected merged event and message items, got %#v", items)
}
}
func TestHandleSubagentRuntimeStreamAll(t *testing.T) {
workspace := t.TempDir()
manager := tools.NewSubagentManager(nil, workspace, nil)
manager.SetRunFunc(func(ctx context.Context, task *tools.SubagentTask) (string, error) {
return "stream-all-result", nil
})
loop := &AgentLoop{
workspace: workspace,
subagentManager: manager,
subagentRouter: tools.NewSubagentRouter(manager),
}
if _, err := loop.HandleSubagentRuntime(context.Background(), "spawn", map[string]interface{}{
"task": "prepare grouped stream task",
"agent_id": "coder",
"channel": "webui",
"chat_id": "webui",
}); err != nil {
t.Fatalf("spawn failed: %v", err)
}
for i := 0; i < 50; i++ {
tasks := manager.ListTasks()
if len(tasks) > 0 && tasks[0].Status == "completed" {
break
}
time.Sleep(10 * time.Millisecond)
}
out, err := loop.HandleSubagentRuntime(context.Background(), "stream_all", map[string]interface{}{
"limit": 100,
"task_limit": 10,
})
if err != nil {
t.Fatalf("stream_all failed: %v", err)
}
payload, ok := out.(map[string]interface{})
if !ok || payload["found"] != true {
t.Fatalf("unexpected stream_all payload: %#v", out)
}
items, ok := payload["items"].([]map[string]interface{})
if !ok || len(items) == 0 {
t.Fatalf("expected grouped stream items, got %#v", payload["items"])
}
}

View File

@@ -73,6 +73,7 @@ type SubagentConfig struct {
Transport string `json:"transport,omitempty"`
NodeID string `json:"node_id,omitempty"`
ParentAgentID string `json:"parent_agent_id,omitempty"`
NotifyMainPolicy string `json:"notify_main_policy,omitempty"`
DisplayName string `json:"display_name,omitempty"`
Role string `json:"role,omitempty"`
Description string `json:"description,omitempty"`

View File

@@ -309,6 +309,13 @@ func validateSubagents(cfg *Config) []error {
errs = append(errs, fmt.Errorf("agents.subagents.%s.transport must be one of: local, node", id))
}
}
if policy := strings.TrimSpace(raw.NotifyMainPolicy); policy != "" {
switch policy {
case "final_only", "milestone", "on_blocked", "always", "internal_only":
default:
errs = append(errs, fmt.Errorf("agents.subagents.%s.notify_main_policy must be one of: final_only, milestone, on_blocked, always, internal_only", id))
}
}
if transport == "node" && strings.TrimSpace(raw.NodeID) == "" {
errs = append(errs, fmt.Errorf("agents.subagents.%s.node_id is required when transport=node", id))
}

View File

@@ -110,3 +110,21 @@ func TestValidateNodeBackedSubagentAllowsMissingPromptFile(t *testing.T) {
t.Fatalf("expected node-backed config to be valid, got %v", errs)
}
}
func TestValidateSubagentsRejectsInvalidNotifyMainPolicy(t *testing.T) {
t.Parallel()
cfg := DefaultConfig()
cfg.Agents.Subagents["coder"] = SubagentConfig{
Enabled: true,
SystemPromptFile: "agents/coder/AGENT.md",
NotifyMainPolicy: "loud",
Runtime: SubagentRuntimeConfig{
Proxy: "proxy",
},
}
if errs := Validate(cfg); len(errs) == 0 {
t.Fatalf("expected validation errors")
}
}

View File

@@ -2,6 +2,7 @@ package tools
import (
"context"
"errors"
"fmt"
"os"
"path/filepath"
@@ -10,6 +11,7 @@ import (
"time"
"clawgo/pkg/bus"
"clawgo/pkg/ekg"
"clawgo/pkg/providers"
)
@@ -22,6 +24,7 @@ type SubagentTask struct {
Transport string `json:"transport,omitempty"`
NodeID string `json:"node_id,omitempty"`
ParentAgentID string `json:"parent_agent_id,omitempty"`
NotifyMainPolicy string `json:"notify_main_policy,omitempty"`
SessionKey string `json:"session_key"`
MemoryNS string `json:"memory_ns"`
SystemPrompt string `json:"system_prompt,omitempty"`
@@ -62,23 +65,25 @@ type SubagentManager struct {
profileStore *SubagentProfileStore
runStore *SubagentRunStore
mailboxStore *AgentMailboxStore
ekg *ekg.Engine
}
type SubagentSpawnOptions struct {
Task string
Label string
Role string
AgentID string
MaxRetries int
RetryBackoff int
TimeoutSec int
MaxTaskChars int
MaxResultChars int
OriginChannel string
OriginChatID string
ThreadID string
CorrelationID string
ParentRunID string
Task string
Label string
Role string
AgentID string
NotifyMainPolicy string
MaxRetries int
RetryBackoff int
TimeoutSec int
MaxTaskChars int
MaxResultChars int
OriginChannel string
OriginChatID string
ThreadID string
CorrelationID string
ParentRunID string
}
func NewSubagentManager(provider providers.LLMProvider, workspace string, bus *bus.MessageBus) *SubagentManager {
@@ -96,6 +101,7 @@ func NewSubagentManager(provider providers.LLMProvider, workspace string, bus *b
profileStore: store,
runStore: runStore,
mailboxStore: mailboxStore,
ekg: ekg.New(workspace),
}
if runStore != nil {
for _, task := range runStore.List() {
@@ -167,6 +173,7 @@ func (sm *SubagentManager) spawnTask(ctx context.Context, opts SubagentSpawnOpti
transport := "local"
nodeID := ""
parentAgentID := ""
notifyMainPolicy := "final_only"
toolAllowlist := []string(nil)
maxRetries := 0
retryBackoff := 1000
@@ -199,6 +206,7 @@ func (sm *SubagentManager) spawnTask(ctx context.Context, opts SubagentSpawnOpti
}
nodeID = strings.TrimSpace(profile.NodeID)
parentAgentID = strings.TrimSpace(profile.ParentAgentID)
notifyMainPolicy = normalizeNotifyMainPolicy(profile.NotifyMainPolicy)
systemPrompt = strings.TrimSpace(profile.SystemPrompt)
systemPromptFile = strings.TrimSpace(profile.SystemPromptFile)
toolAllowlist = append([]string(nil), profile.ToolAllowlist...)
@@ -236,6 +244,9 @@ func (sm *SubagentManager) spawnTask(ctx context.Context, opts SubagentSpawnOpti
}
originChannel := strings.TrimSpace(opts.OriginChannel)
originChatID := strings.TrimSpace(opts.OriginChatID)
if raw := strings.TrimSpace(opts.NotifyMainPolicy); raw != "" {
notifyMainPolicy = normalizeNotifyMainPolicy(raw)
}
threadID := strings.TrimSpace(opts.ThreadID)
correlationID := strings.TrimSpace(opts.CorrelationID)
parentRunID := strings.TrimSpace(opts.ParentRunID)
@@ -274,6 +285,7 @@ func (sm *SubagentManager) spawnTask(ctx context.Context, opts SubagentSpawnOpti
Transport: transport,
NodeID: nodeID,
ParentAgentID: parentAgentID,
NotifyMainPolicy: notifyMainPolicy,
SessionKey: sessionKey,
MemoryNS: memoryNS,
SystemPrompt: systemPrompt,
@@ -368,20 +380,11 @@ func (sm *SubagentManager) runTask(ctx context.Context, task *SubagentTask) {
}
sm.mu.Unlock()
// 2. Result broadcast (keep existing behavior)
if sm.bus != nil {
prefix := "Task completed"
if runErr != nil {
prefix = "Task failed"
}
if task.Label != "" {
if runErr != nil {
prefix = fmt.Sprintf("Task '%s' failed", task.Label)
} else {
prefix = fmt.Sprintf("Task '%s' completed", task.Label)
}
}
announceContent := fmt.Sprintf("%s.\n\nResult:\n%s", prefix, task.Result)
sm.recordEKG(task, runErr)
// 2. Result broadcast
if sm.bus != nil && shouldNotifyMainOnFinal(task.NotifyMainPolicy, runErr, task) {
announceContent, notifyReason := buildSubagentMainNotification(task, runErr)
sm.bus.PublishInbound(bus.InboundMessage{
Channel: "system",
SenderID: fmt.Sprintf("subagent:%s", task.ID),
@@ -389,20 +392,142 @@ func (sm *SubagentManager) runTask(ctx context.Context, task *SubagentTask) {
SessionKey: task.SessionKey,
Content: announceContent,
Metadata: map[string]string{
"trigger": "subagent",
"subagent_id": task.ID,
"agent_id": task.AgentID,
"role": task.Role,
"session_key": task.SessionKey,
"memory_ns": task.MemoryNS,
"retry_count": fmt.Sprintf("%d", task.RetryCount),
"timeout_sec": fmt.Sprintf("%d", task.TimeoutSec),
"status": task.Status,
"trigger": "subagent",
"subagent_id": task.ID,
"agent_id": task.AgentID,
"role": task.Role,
"session_key": task.SessionKey,
"memory_ns": task.MemoryNS,
"retry_count": fmt.Sprintf("%d", task.RetryCount),
"timeout_sec": fmt.Sprintf("%d", task.TimeoutSec),
"status": task.Status,
"notify_reason": notifyReason,
},
})
}
}
func (sm *SubagentManager) recordEKG(task *SubagentTask, runErr error) {
if sm == nil || sm.ekg == nil || task == nil {
return
}
status := "success"
logText := strings.TrimSpace(task.Result)
if runErr != nil {
status = "error"
if isBlockedSubagentError(runErr) {
logText = "blocked: " + strings.TrimSpace(task.Result)
}
}
sm.ekg.Record(ekg.Event{
TaskID: task.ID,
Session: task.SessionKey,
Channel: task.OriginChannel,
Source: "subagent",
Status: status,
Log: logText,
})
}
func normalizeNotifyMainPolicy(v string) string {
switch strings.ToLower(strings.TrimSpace(v)) {
case "", "final_only":
return "final_only"
case "milestone", "on_blocked", "always", "internal_only":
return strings.ToLower(strings.TrimSpace(v))
default:
return "final_only"
}
}
func shouldNotifyMainOnFinal(policy string, runErr error, task *SubagentTask) bool {
switch normalizeNotifyMainPolicy(policy) {
case "internal_only":
return false
case "always", "final_only":
return true
case "on_blocked":
return isBlockedSubagentError(runErr)
case "milestone":
return false
default:
return true
}
}
func buildSubagentMainNotification(task *SubagentTask, runErr error) (string, string) {
status := "completed"
reason := "final"
if runErr != nil {
status = "failed"
if isBlockedSubagentError(runErr) {
status = "blocked"
reason = "blocked"
}
}
return fmt.Sprintf(
"Subagent update\nagent: %s\nrun: %s\nstatus: %s\nreason: %s\ntask: %s\nsummary: %s",
strings.TrimSpace(task.AgentID),
strings.TrimSpace(task.ID),
status,
reason,
summarizeSubagentText(firstNonEmpty(task.Label, task.Task), 120),
summarizeSubagentText(task.Result, 280),
), reason
}
func isBlockedSubagentError(err error) bool {
if err == nil {
return false
}
if errors.Is(err, context.DeadlineExceeded) {
return true
}
msg := strings.ToLower(strings.TrimSpace(err.Error()))
if msg == "" {
return false
}
blockedHints := []string{
"timeout",
"deadline exceeded",
"quota",
"rate limit",
"too many requests",
"permission denied",
"requires input",
"waiting for reply",
"blocked",
}
for _, hint := range blockedHints {
if strings.Contains(msg, hint) {
return true
}
}
return false
}
func summarizeSubagentText(s string, max int) string {
s = strings.TrimSpace(strings.ReplaceAll(s, "\r\n", "\n"))
s = strings.ReplaceAll(s, "\n", " ")
s = strings.Join(strings.Fields(s), " ")
if s == "" {
return "(empty)"
}
if max > 0 && len(s) > max {
return strings.TrimSpace(s[:max-3]) + "..."
}
return s
}
func firstNonEmpty(values ...string) string {
for _, v := range values {
if strings.TrimSpace(v) != "" {
return strings.TrimSpace(v)
}
}
return ""
}
func (sm *SubagentManager) runWithRetry(ctx context.Context, task *SubagentTask) (string, error) {
maxRetries := normalizePositiveBound(task.MaxRetries, 0, 8)
backoffMs := normalizePositiveBound(task.RetryBackoff, 500, 120000)

View File

@@ -27,6 +27,7 @@ func DraftConfigSubagent(description, agentIDHint string) map[string]interface{}
"role": role,
"display_name": displayName,
"description": desc,
"notify_main_policy": "final_only",
"system_prompt": systemPrompt,
"system_prompt_file": "agents/" + agentID + "/AGENT.md",
"memory_namespace": agentID,
@@ -80,6 +81,9 @@ func UpsertConfigSubagent(configPath string, args map[string]interface{}) (map[s
if v := stringArgFromMap(args, "display_name"); v != "" {
subcfg.DisplayName = v
}
if v := stringArgFromMap(args, "notify_main_policy"); v != "" {
subcfg.NotifyMainPolicy = v
}
if v := stringArgFromMap(args, "description"); v != "" {
subcfg.Description = v
}

View File

@@ -32,6 +32,7 @@ func TestSubagentConfigToolUpsert(t *testing.T) {
"action": "upsert",
"agent_id": "reviewer",
"role": "testing",
"notify_main_policy": "internal_only",
"display_name": "Review Agent",
"description": "负责回归与评审",
"system_prompt": "review changes",
@@ -56,6 +57,9 @@ func TestSubagentConfigToolUpsert(t *testing.T) {
if reloaded.Agents.Subagents["reviewer"].DisplayName != "Review Agent" {
t.Fatalf("expected config to persist reviewer, got %+v", reloaded.Agents.Subagents["reviewer"])
}
if reloaded.Agents.Subagents["reviewer"].NotifyMainPolicy != "internal_only" {
t.Fatalf("expected notify_main_policy to persist, got %+v", reloaded.Agents.Subagents["reviewer"])
}
if len(reloaded.Agents.Router.Rules) == 0 {
t.Fatalf("expected router rules to persist")
}

View File

@@ -22,6 +22,7 @@ type SubagentProfile struct {
Transport string `json:"transport,omitempty"`
NodeID string `json:"node_id,omitempty"`
ParentAgentID string `json:"parent_agent_id,omitempty"`
NotifyMainPolicy string `json:"notify_main_policy,omitempty"`
Role string `json:"role,omitempty"`
SystemPrompt string `json:"system_prompt,omitempty"`
SystemPromptFile string `json:"system_prompt_file,omitempty"`
@@ -188,6 +189,7 @@ func normalizeSubagentProfile(in SubagentProfile) SubagentProfile {
p.Transport = normalizeProfileTransport(p.Transport)
p.NodeID = strings.TrimSpace(p.NodeID)
p.ParentAgentID = normalizeSubagentIdentifier(p.ParentAgentID)
p.NotifyMainPolicy = normalizeNotifyMainPolicy(p.NotifyMainPolicy)
p.Role = strings.TrimSpace(p.Role)
p.SystemPrompt = strings.TrimSpace(p.SystemPrompt)
p.SystemPromptFile = strings.TrimSpace(p.SystemPromptFile)
@@ -404,6 +406,7 @@ func profileFromConfig(agentID string, subcfg config.SubagentConfig) SubagentPro
Transport: strings.TrimSpace(subcfg.Transport),
NodeID: strings.TrimSpace(subcfg.NodeID),
ParentAgentID: strings.TrimSpace(subcfg.ParentAgentID),
NotifyMainPolicy: strings.TrimSpace(subcfg.NotifyMainPolicy),
Role: strings.TrimSpace(subcfg.Role),
SystemPrompt: strings.TrimSpace(subcfg.SystemPrompt),
SystemPromptFile: strings.TrimSpace(subcfg.SystemPromptFile),
@@ -507,6 +510,7 @@ func (t *SubagentProfileTool) Parameters() map[string]interface{} {
"description": "Unique subagent id, e.g. coder/writer/tester",
},
"name": map[string]interface{}{"type": "string"},
"notify_main_policy": map[string]interface{}{"type": "string", "description": "final_only|internal_only|milestone|on_blocked|always"},
"role": map[string]interface{}{"type": "string"},
"system_prompt": map[string]interface{}{"type": "string"},
"system_prompt_file": map[string]interface{}{"type": "string"},
@@ -577,6 +581,7 @@ func (t *SubagentProfileTool) Execute(ctx context.Context, args map[string]inter
p := SubagentProfile{
AgentID: agentID,
Name: stringArg(args, "name"),
NotifyMainPolicy: stringArg(args, "notify_main_policy"),
Role: stringArg(args, "role"),
SystemPrompt: stringArg(args, "system_prompt"),
SystemPromptFile: stringArg(args, "system_prompt_file"),
@@ -612,6 +617,9 @@ func (t *SubagentProfileTool) Execute(ctx context.Context, args map[string]inter
if _, ok := args["role"]; ok {
next.Role = stringArg(args, "role")
}
if _, ok := args["notify_main_policy"]; ok {
next.NotifyMainPolicy = stringArg(args, "notify_main_policy")
}
if _, ok := args["system_prompt"]; ok {
next.SystemPrompt = stringArg(args, "system_prompt")
}

View File

@@ -145,8 +145,11 @@ func TestSubagentBroadcastIncludesFailureStatus(t *testing.T) {
if got := strings.TrimSpace(msg.Metadata["status"]); got != "failed" {
t.Fatalf("expected metadata status=failed, got %q", got)
}
if !strings.Contains(strings.ToLower(msg.Content), "failed") {
t.Fatalf("expected failure wording in content, got %q", msg.Content)
if !strings.Contains(strings.ToLower(msg.Content), "status: failed") {
t.Fatalf("expected structured failure status in content, got %q", msg.Content)
}
if got := strings.TrimSpace(msg.Metadata["notify_reason"]); got != "final" {
t.Fatalf("expected notify_reason=final, got %q", got)
}
}
@@ -205,6 +208,150 @@ func TestSubagentManagerRestoresPersistedRuns(t *testing.T) {
time.Sleep(100 * time.Millisecond)
}
func TestSubagentManagerInternalOnlySuppressesMainNotification(t *testing.T) {
workspace := t.TempDir()
msgBus := bus.NewMessageBus()
manager := NewSubagentManager(nil, workspace, msgBus)
manager.SetRunFunc(func(ctx context.Context, task *SubagentTask) (string, error) {
return "silent-result", nil
})
store := manager.ProfileStore()
if store == nil {
t.Fatalf("expected profile store")
}
if _, err := store.Upsert(SubagentProfile{
AgentID: "coder",
Name: "Code Agent",
NotifyMainPolicy: "internal_only",
SystemPromptFile: "agents/coder/AGENT.md",
Status: "active",
}); err != nil {
t.Fatalf("profile upsert failed: %v", err)
}
_, err := manager.Spawn(context.Background(), SubagentSpawnOptions{
Task: "internal-only task",
AgentID: "coder",
OriginChannel: "cli",
OriginChatID: "direct",
})
if err != nil {
t.Fatalf("spawn failed: %v", err)
}
task := waitSubagentDone(t, manager, 4*time.Second)
if task.Status != "completed" {
t.Fatalf("expected completed task, got %s", task.Status)
}
ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
defer cancel()
if msg, ok := msgBus.ConsumeInbound(ctx); ok {
t.Fatalf("did not expect main notification, got %+v", msg)
}
}
func TestSubagentManagerOnBlockedNotifiesOnlyBlockedFailures(t *testing.T) {
workspace := t.TempDir()
msgBus := bus.NewMessageBus()
manager := NewSubagentManager(nil, workspace, msgBus)
manager.SetRunFunc(func(ctx context.Context, task *SubagentTask) (string, error) {
switch task.Task {
case "blocked-task":
return "", errors.New("command tick timeout exceeded: 600s")
default:
return "done", nil
}
})
store := manager.ProfileStore()
if store == nil {
t.Fatalf("expected profile store")
}
if _, err := store.Upsert(SubagentProfile{
AgentID: "pm",
Name: "Product Manager",
NotifyMainPolicy: "on_blocked",
SystemPromptFile: "agents/pm/AGENT.md",
Status: "active",
}); err != nil {
t.Fatalf("profile upsert failed: %v", err)
}
_, err := manager.Spawn(context.Background(), SubagentSpawnOptions{
Task: "successful-task",
AgentID: "pm",
OriginChannel: "cli",
OriginChatID: "direct",
})
if err != nil {
t.Fatalf("spawn success case failed: %v", err)
}
_ = waitSubagentDone(t, manager, 4*time.Second)
ctxSilent, cancelSilent := context.WithTimeout(context.Background(), 200*time.Millisecond)
defer cancelSilent()
if msg, ok := msgBus.ConsumeInbound(ctxSilent); ok {
t.Fatalf("did not expect success notification for on_blocked, got %+v", msg)
}
_, err = manager.Spawn(context.Background(), SubagentSpawnOptions{
Task: "blocked-task",
AgentID: "pm",
OriginChannel: "cli",
OriginChatID: "direct",
})
if err != nil {
t.Fatalf("spawn blocked case failed: %v", err)
}
_ = waitSubagentDone(t, manager, 4*time.Second)
ctxBlocked, cancelBlocked := context.WithTimeout(context.Background(), 2*time.Second)
defer cancelBlocked()
msg, ok := msgBus.ConsumeInbound(ctxBlocked)
if !ok {
t.Fatalf("expected blocked notification")
}
if got := strings.TrimSpace(msg.Metadata["notify_reason"]); got != "blocked" {
t.Fatalf("expected notify_reason=blocked, got %q", got)
}
if !strings.Contains(strings.ToLower(msg.Content), "blocked") {
t.Fatalf("expected blocked wording in content, got %q", msg.Content)
}
}
func TestSubagentManagerRecordsFailuresToEKG(t *testing.T) {
workspace := t.TempDir()
manager := NewSubagentManager(nil, workspace, nil)
manager.SetRunFunc(func(ctx context.Context, task *SubagentTask) (string, error) {
return "", errors.New("rate limit exceeded")
})
_, err := manager.Spawn(context.Background(), SubagentSpawnOptions{
Task: "ekg failure",
AgentID: "coder",
OriginChannel: "cli",
OriginChatID: "direct",
})
if err != nil {
t.Fatalf("spawn failed: %v", err)
}
_ = waitSubagentDone(t, manager, 4*time.Second)
data, err := os.ReadFile(filepath.Join(workspace, "memory", "ekg-events.jsonl"))
if err != nil {
t.Fatalf("expected ekg events to be written: %v", err)
}
text := string(data)
if !strings.Contains(text, "\"source\":\"subagent\"") {
t.Fatalf("expected subagent source in ekg log, got %s", text)
}
if !strings.Contains(text, "\"status\":\"error\"") {
t.Fatalf("expected error status in ekg log, got %s", text)
}
if !strings.Contains(strings.ToLower(text), "rate limit exceeded") {
t.Fatalf("expected failure text in ekg log, got %s", text)
}
}
func TestSubagentManagerAutoRecoversRunningTaskAfterRestart(t *testing.T) {
workspace := t.TempDir()
block := make(chan struct{})

View File

@@ -116,6 +116,10 @@ const resources = {
noCronJobs: 'No cron jobs found',
noNodes: 'No nodes available',
sessions: 'Sessions',
mainChat: 'Main Chat',
subagentGroup: 'Subagent Group',
noSubagentStream: 'No subagent internal stream yet.',
subagentGroupReadonly: 'Subagent group is read-only.',
startConversation: 'Start a conversation',
typeMessage: 'Type a message...',
configuration: 'Configuration',
@@ -571,6 +575,10 @@ const resources = {
noCronJobs: '未找到定时任务',
noNodes: '无可用节点',
sessions: '会话',
mainChat: '主对话',
subagentGroup: '子代理群组',
noSubagentStream: '当前还没有子代理内部流。',
subagentGroupReadonly: '子代理群组为只读视图。',
startConversation: '开始对话',
typeMessage: '输入消息...',
configuration: '配置',

View File

@@ -5,12 +5,28 @@ import { useTranslation } from 'react-i18next';
import { useAppContext } from '../context/AppContext';
import { ChatItem } from '../types';
type StreamItem = {
kind?: string;
at?: number;
task_id?: string;
label?: string;
agent_id?: string;
event_type?: string;
message?: string;
message_type?: string;
content?: string;
from_agent?: string;
to_agent?: string;
status?: string;
};
const Chat: React.FC = () => {
const { t } = useTranslation();
const { q, sessions } = useAppContext();
const [chat, setChat] = useState<ChatItem[]>([]);
const [msg, setMsg] = useState('');
const [fileSelected, setFileSelected] = useState(false);
const [chatTab, setChatTab] = useState<'main' | 'subagents'>('main');
const [sessionKey, setSessionKey] = useState('main');
const chatEndRef = useRef<HTMLDivElement>(null);
@@ -51,6 +67,34 @@ const Chat: React.FC = () => {
}
};
const loadSubagentGroup = async () => {
try {
const r = await fetch(`/webui/api/subagents_runtime${q}`, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ action: 'stream_all', limit: 200, task_limit: 24 }),
});
if (!r.ok) return;
const j = await r.json();
const arr = Array.isArray(j?.result?.items) ? j.result.items : [];
const mapped: ChatItem[] = arr.map((item: StreamItem) => {
const isEvent = item.kind === 'event';
const label = isEvent
? `${item.agent_id || 'subagent'} · ${item.event_type || 'event'}`
: `${item.from_agent || '-'} -> ${item.to_agent || '-'} · ${item.message_type || 'message'}`;
const body = isEvent ? (item.message || '') : (item.content || '');
return {
role: 'assistant',
label,
text: `${body}${item.status ? `\n\nstatus: ${item.status}` : ''}`,
};
});
setChat(mapped);
} catch (e) {
console.error(e);
}
};
async function send() {
if (!msg.trim() && !fileSelected) return;
@@ -111,27 +155,55 @@ const Chat: React.FC = () => {
}
useEffect(() => {
loadHistory();
}, [q, sessionKey]);
if (chatTab === 'main') {
loadHistory();
return;
}
loadSubagentGroup();
}, [q, chatTab, sessionKey]);
useEffect(() => {
if (!sessions || sessions.length === 0) return;
if (!sessions.some(s => s.key === sessionKey)) {
setSessionKey(sessions[0].key);
if (chatTab !== 'subagents') return;
const timer = window.setInterval(() => {
loadSubagentGroup();
}, 5000);
return () => window.clearInterval(timer);
}, [q, chatTab]);
const userSessions = (sessions || []).filter((s: any) => !String(s?.key || '').startsWith('subagent:'));
useEffect(() => {
if (chatTab !== 'main') return;
if (!userSessions.length) return;
if (!userSessions.some((s: any) => s.key === sessionKey)) {
setSessionKey(userSessions[0].key);
}
}, [sessions]);
}, [chatTab, sessionKey, userSessions]);
return (
<div className="flex h-full">
<div className="flex-1 flex flex-col bg-zinc-950/50">
<div className="px-4 py-3 border-b border-zinc-800 flex items-center justify-between gap-3 flex-wrap">
<div className="flex items-center gap-2">
<h2 className="text-sm text-zinc-300 font-medium">{t('session')}</h2>
<select value={sessionKey} onChange={(e)=>setSessionKey(e.target.value)} className="bg-zinc-900 border border-zinc-700 rounded px-2 py-1 text-xs text-zinc-200">
{(sessions || []).map((s:any)=> <option key={s.key} value={s.key}>{s.key}</option>)}
</select>
<button
onClick={() => setChatTab('main')}
className={`px-3 py-1.5 rounded-lg text-xs ${chatTab === 'main' ? 'bg-indigo-600 text-white' : 'bg-zinc-900 border border-zinc-700 text-zinc-300'}`}
>
Main Chat
</button>
<button
onClick={() => setChatTab('subagents')}
className={`px-3 py-1.5 rounded-lg text-xs ${chatTab === 'subagents' ? 'bg-amber-600 text-white' : 'bg-zinc-900 border border-zinc-700 text-zinc-300'}`}
>
{t('subagentGroup')}
</button>
{chatTab === 'main' && (
<select value={sessionKey} onChange={(e) => setSessionKey(e.target.value)} className="bg-zinc-900 border border-zinc-700 rounded px-2 py-1 text-xs text-zinc-200">
{userSessions.map((s: any) => <option key={s.key} value={s.key}>{s.title || s.key}</option>)}
</select>
)}
</div>
<button onClick={loadHistory} className="flex items-center gap-1 px-2 py-1 text-xs rounded bg-zinc-800 hover:bg-zinc-700"><RefreshCw className="w-3 h-3"/>{t('reloadHistory')}</button>
<button onClick={chatTab === 'main' ? loadHistory : loadSubagentGroup} className="flex items-center gap-1 px-2 py-1 text-xs rounded bg-zinc-800 hover:bg-zinc-700"><RefreshCw className="w-3 h-3"/>{t('reloadHistory')}</button>
</div>
<div className="flex-1 overflow-y-auto p-6 space-y-6">
@@ -140,7 +212,7 @@ const Chat: React.FC = () => {
<div className="w-16 h-16 rounded-2xl bg-zinc-900 flex items-center justify-center border border-zinc-800">
<MessageSquare className="w-8 h-8 text-zinc-600" />
</div>
<p className="text-sm font-medium">{t('startConversation')}</p>
<p className="text-sm font-medium">{chatTab === 'main' ? t('startConversation') : t('noSubagentStream')}</p>
</div>
) : (
chat.map((m, i) => {
@@ -203,13 +275,14 @@ const Chat: React.FC = () => {
<input
value={msg}
onChange={(e) => setMsg(e.target.value)}
onKeyDown={(e) => e.key === 'Enter' && send()}
placeholder={t('typeMessage')}
className="w-full bg-zinc-900 border border-zinc-800 rounded-full pl-14 pr-14 py-3.5 text-[15px] focus:outline-none focus:border-indigo-500 focus:ring-1 focus:ring-indigo-500 transition-all placeholder:text-zinc-500 shadow-sm"
onKeyDown={(e) => chatTab === 'main' && e.key === 'Enter' && send()}
placeholder={chatTab === 'main' ? t('typeMessage') : t('subagentGroupReadonly')}
disabled={chatTab !== 'main'}
className="w-full bg-zinc-900 border border-zinc-800 rounded-full pl-14 pr-14 py-3.5 text-[15px] focus:outline-none focus:border-indigo-500 focus:ring-1 focus:ring-indigo-500 transition-all placeholder:text-zinc-500 shadow-sm disabled:opacity-60"
/>
<button
onClick={send}
disabled={!msg.trim() && !fileSelected}
disabled={chatTab !== 'main' || (!msg.trim() && !fileSelected)}
className="absolute right-2 p-2.5 bg-indigo-600 hover:bg-indigo-500 disabled:opacity-50 disabled:hover:bg-indigo-600 text-white rounded-full transition-colors shadow-sm"
>
<Send className="w-4 h-4 ml-0.5" />

View File

@@ -6,6 +6,7 @@ import { useUI } from '../context/UIContext';
type SubagentProfile = {
agent_id: string;
name?: string;
notify_main_policy?: string;
role?: string;
system_prompt?: string;
system_prompt_file?: string;
@@ -31,6 +32,7 @@ type ToolAllowlistGroup = {
const emptyDraft: SubagentProfile = {
agent_id: '',
name: '',
notify_main_policy: 'final_only',
role: '',
system_prompt: '',
system_prompt_file: '',
@@ -79,6 +81,7 @@ const SubagentProfiles: React.FC = () => {
setDraft({
agent_id: next.agent_id || '',
name: next.name || '',
notify_main_policy: next.notify_main_policy || 'final_only',
role: next.role || '',
system_prompt: next.system_prompt || '',
system_prompt_file: next.system_prompt_file || '',
@@ -140,6 +143,7 @@ const SubagentProfiles: React.FC = () => {
setDraft({
agent_id: p.agent_id || '',
name: p.name || '',
notify_main_policy: p.notify_main_policy || 'final_only',
role: p.role || '',
system_prompt: p.system_prompt || '',
system_prompt_file: p.system_prompt_file || '',
@@ -193,6 +197,7 @@ const SubagentProfiles: React.FC = () => {
action,
agent_id: agentId,
name: draft.name || '',
notify_main_policy: draft.notify_main_policy || 'final_only',
role: draft.role || '',
system_prompt: draft.system_prompt || '',
system_prompt_file: draft.system_prompt_file || '',
@@ -350,6 +355,20 @@ const SubagentProfiles: React.FC = () => {
<option value="disabled">disabled</option>
</select>
</div>
<div>
<div className="text-xs text-zinc-400 mb-1">notify_main_policy</div>
<select
value={draft.notify_main_policy || 'final_only'}
onChange={(e) => setDraft({ ...draft, notify_main_policy: e.target.value })}
className="w-full px-2 py-1 text-xs bg-zinc-900 border border-zinc-700 rounded"
>
<option value="final_only">final_only</option>
<option value="internal_only">internal_only</option>
<option value="milestone">milestone</option>
<option value="on_blocked">on_blocked</option>
<option value="always">always</option>
</select>
</div>
<div className="md:col-span-2">
<div className="text-xs text-zinc-400 mb-1">system_prompt_file</div>
<input

View File

@@ -60,6 +60,26 @@ type AgentMessage = {
created_at?: number;
};
type StreamItem = {
kind?: 'event' | 'message' | string;
at?: number;
run_id?: string;
agent_id?: string;
event_type?: string;
message?: string;
retry_count?: number;
message_id?: string;
thread_id?: string;
from_agent?: string;
to_agent?: string;
reply_to?: string;
correlation_id?: string;
message_type?: string;
content?: string;
status?: string;
requires_reply?: boolean;
};
type RegistrySubagent = {
agent_id?: string;
enabled?: boolean;
@@ -68,6 +88,7 @@ type RegistrySubagent = {
node_id?: string;
parent_agent_id?: string;
managed_by?: string;
notify_main_policy?: string;
display_name?: string;
role?: string;
description?: string;
@@ -186,6 +207,11 @@ function summarizeTask(task?: string, label?: string): string {
return text.length > 52 ? `${text.slice(0, 49)}...` : text;
}
function formatStreamTime(ts?: number): string {
if (!ts) return '--:--:--';
return new Date(ts).toLocaleTimeString([], { hour12: false });
}
function bezierCurve(x1: number, y1: number, x2: number, y2: number): string {
const offset = Math.max(Math.abs(y2 - y1) * 0.5, 60);
return `M ${x1} ${y1} C ${x1} ${y1 + offset} ${x2} ${y2 - offset} ${x2} ${y2}`;
@@ -337,6 +363,8 @@ const Subagents: React.FC = () => {
const [registryItems, setRegistryItems] = useState<RegistrySubagent[]>([]);
const [promptFileContent, setPromptFileContent] = useState('');
const [promptFileFound, setPromptFileFound] = useState(false);
const [streamItems, setStreamItems] = useState<StreamItem[]>([]);
const [streamTask, setStreamTask] = useState<SubagentTask | null>(null);
const [selectedTopologyBranch, setSelectedTopologyBranch] = useState('');
const [topologyFilter, setTopologyFilter] = useState<'all' | 'running' | 'failed' | 'local' | 'remote'>('all');
const [topologyZoom, setTopologyZoom] = useState(0.9);
@@ -428,6 +456,10 @@ const Subagents: React.FC = () => {
[...selectedAgentTasks].sort((a, b) => Math.max(b.updated || 0, b.created || 0) - Math.max(a.updated || 0, a.created || 0))[0] || null,
[selectedAgentTasks]
);
const selectedAgentDisplayName = useMemo(
() => selectedRegistryItem?.display_name || selectedRegistryItem?.agent_id || selectedAgentID || '',
[selectedRegistryItem, selectedAgentID]
);
const parsedNodeTrees = useMemo<NodeTree[]>(() => {
try {
const parsed = JSON.parse(nodeTrees);
@@ -523,6 +555,7 @@ const Subagents: React.FC = () => {
`children=${localChildren.length + remoteClusters.length}`,
`total=${localMainStats.total} running=${localMainStats.running}`,
`waiting=${localMainStats.waiting} failed=${localMainStats.failed}`,
`notify=${normalizeTitle(registryItems.find((item) => item.agent_id === localRoot.agent_id)?.notify_main_policy, 'final_only')}`,
`transport=${normalizeTitle(localRoot.transport, 'local')} type=${normalizeTitle(localRoot.type, 'router')}`,
localMainStats.active[0] ? `task: ${localMainStats.active[0].title}` : t('noLiveTasks'),
],
@@ -531,6 +564,7 @@ const Subagents: React.FC = () => {
scale,
onClick: () => {
setSelectedTopologyBranch(localBranch);
setSelectedAgentID(normalizeTitle(localRoot.agent_id, 'main'));
if (localMainTask?.id) setSelectedId(localMainTask.id);
},
};
@@ -558,6 +592,7 @@ const Subagents: React.FC = () => {
meta: [
`total=${stats.total} running=${stats.running}`,
`waiting=${stats.waiting} failed=${stats.failed}`,
`notify=${normalizeTitle(registryItems.find((item) => item.agent_id === child.agent_id)?.notify_main_policy, 'final_only')}`,
`transport=${normalizeTitle(child.transport, 'local')} type=${normalizeTitle(child.type, 'worker')}`,
stats.active[0] ? `task: ${stats.active[0].title}` : task ? `last: ${summarizeTask(task.task, task.label)}` : t('noLiveTasks'),
],
@@ -566,6 +601,7 @@ const Subagents: React.FC = () => {
scale,
onClick: () => {
setSelectedTopologyBranch(localBranch);
setSelectedAgentID(normalizeTitle(child.agent_id, ''));
if (task?.id) setSelectedId(task.id);
},
});
@@ -602,7 +638,11 @@ const Subagents: React.FC = () => {
accent: tree.online ? 'bg-fuchsia-400' : 'bg-zinc-500',
clickable: true,
scale,
onClick: () => setSelectedTopologyBranch(branch),
onClick: () => {
setSelectedTopologyBranch(branch);
setSelectedAgentID(normalizeTitle(treeRoot.agent_id, ''));
setSelectedId('');
},
};
cards.push(rootCard);
lines.push({
@@ -633,7 +673,11 @@ const Subagents: React.FC = () => {
accent: 'bg-violet-400',
clickable: true,
scale,
onClick: () => setSelectedTopologyBranch(branch),
onClick: () => {
setSelectedTopologyBranch(branch);
setSelectedAgentID(normalizeTitle(child.agent_id, ''));
setSelectedId('');
},
});
lines.push({
path: bezierCurve(rootCard.x + cardWidth / 2, rootCard.y + cardHeight / 2, childX + cardWidth / 2, childY + cardHeight / 2),
@@ -895,6 +939,26 @@ const Subagents: React.FC = () => {
loadThreadAndInbox(selected).catch(() => { });
}, [selectedId, q, items]);
const loadStream = async (task: SubagentTask | null) => {
if (!task?.id) {
setStreamTask(null);
setStreamItems([]);
return;
}
try {
const streamRes = await callAction({ action: 'stream', id: task.id, limit: 100 });
setStreamTask(streamRes?.result?.task || task);
setStreamItems(Array.isArray(streamRes?.result?.items) ? streamRes.result.items : []);
} catch {
setStreamTask(task);
setStreamItems([]);
}
};
useEffect(() => {
loadStream(selectedAgentLatestTask).catch(() => { });
}, [selectedAgentLatestTask?.id, q, items.length]);
return (
<div className="h-full p-4 md:p-6 flex flex-col gap-4">
<div className="flex items-center justify-between">
@@ -1106,6 +1170,62 @@ const Subagents: React.FC = () => {
</div>
</div>
)}
{selectedAgentID && (
<div className="absolute top-4 right-4 bottom-4 z-20 w-[360px] rounded-2xl border border-zinc-800 bg-zinc-950/92 shadow-2xl shadow-black/40 backdrop-blur-md overflow-hidden flex flex-col">
<div className="flex items-center justify-between px-4 py-3 border-b border-zinc-800">
<div className="min-w-0">
<div className="text-xs text-zinc-500 uppercase tracking-wider">Internal Stream</div>
<div className="text-sm font-semibold text-zinc-100 truncate">{selectedAgentDisplayName}</div>
<div className="text-[11px] text-zinc-500 truncate">{selectedAgentID}</div>
</div>
<button
onClick={() => {
setSelectedAgentID('');
setSelectedId('');
setStreamTask(null);
setStreamItems([]);
}}
className="px-2 py-1 rounded bg-zinc-800 hover:bg-zinc-700 text-[11px] text-zinc-200"
>
Close
</button>
</div>
<div className="px-4 py-3 border-b border-zinc-800 text-xs text-zinc-400">
{streamTask?.id ? (
<div className="space-y-1">
<div>run={streamTask.id}</div>
<div>status={streamTask.status || '-'} · thread={streamTask.thread_id || '-'}</div>
</div>
) : (
<div>No persisted run for this agent yet.</div>
)}
</div>
<div className="flex-1 overflow-y-auto px-4 py-3 space-y-3">
{streamItems.length === 0 ? (
<div className="text-sm text-zinc-500">No internal stream events yet.</div>
) : streamItems.map((item, idx) => (
<div key={`${item.kind || 'item'}-${item.at || 0}-${idx}`} className="rounded-xl border border-zinc-800 bg-zinc-900/70 p-3">
<div className="flex items-center justify-between gap-2 mb-2">
<div className="text-xs font-medium text-zinc-200">
{item.kind === 'event'
? `${item.event_type || 'event'}${item.status ? ` · ${item.status}` : ''}`
: `${item.from_agent || '-'} -> ${item.to_agent || '-'} · ${item.message_type || 'message'}`}
</div>
<div className="text-[11px] text-zinc-500">{formatStreamTime(item.at)}</div>
</div>
<div className="text-xs text-zinc-300 whitespace-pre-wrap break-words">
{item.kind === 'event' ? (item.message || '(no event message)') : (item.content || '(empty message)')}
</div>
<div className="mt-2 text-[11px] text-zinc-500">
{item.kind === 'event'
? `run=${item.run_id || '-'}${item.retry_count ? ` · retry=${item.retry_count}` : ''}`
: `status=${item.status || '-'}${item.reply_to ? ` · reply_to=${item.reply_to}` : ''}`}
</div>
</div>
))}
</div>
</div>
)}
</div>
</div>