From 0b1fdecd683db4470c91ce9f70332850de5402f5 Mon Sep 17 00:00:00 2001 From: lpf Date: Sat, 7 Mar 2026 12:12:12 +0800 Subject: [PATCH] Separate main chat from subagent group stream --- pkg/agent/runtime_admin.go | 109 ++++++++++++++++++++++++++++++++ pkg/agent/runtime_admin_test.go | 45 +++++++++++++ webui/src/i18n/index.ts | 8 +++ webui/src/pages/Chat.tsx | 107 ++++++++++++++++++++++++++----- 4 files changed, 252 insertions(+), 17 deletions(-) diff --git a/pkg/agent/runtime_admin.go b/pkg/agent/runtime_admin.go index cbc71e8..d448302 100644 --- a/pkg/agent/runtime_admin.go +++ b/pkg/agent/runtime_admin.go @@ -393,6 +393,22 @@ func (al *AgentLoop) HandleSubagentRuntime(ctx context.Context, action string, a "thread": thread, "items": stream, }, nil + case "stream_all": + tasks := sm.ListTasks() + sort.Slice(tasks, func(i, j int) bool { + left := maxInt64(tasks[i].Updated, tasks[i].Created) + right := maxInt64(tasks[j].Updated, tasks[j].Created) + if left != right { + return left > right + } + return tasks[i].ID > tasks[j].ID + }) + taskLimit := runtimeIntArg(args, "task_limit", 16) + if taskLimit > 0 && len(tasks) > taskLimit { + tasks = tasks[:taskLimit] + } + items := mergeAllSubagentStreams(sm, tasks, runtimeIntArg(args, "limit", 200)) + return map[string]interface{}{"found": true, "items": items}, nil case "inbox": agentID := runtimeStringArg(args, "agent_id") if agentID == "" { @@ -460,6 +476,99 @@ func mergeSubagentStream(events []tools.SubagentRunEvent, messages []tools.Agent return items } +func mergeAllSubagentStreams(sm *tools.SubagentManager, tasks []*tools.SubagentTask, limit int) []map[string]interface{} { + if sm == nil || len(tasks) == 0 { + return nil + } + items := make([]map[string]interface{}, 0) + seenEvents := map[string]struct{}{} + seenMessages := map[string]struct{}{} + for _, task := range tasks { + if task == nil { + continue + } + if events, err := sm.Events(task.ID, limit); err == nil { + for _, evt := range events { + key := fmt.Sprintf("%s:%s:%d:%s", evt.RunID, evt.Type, evt.At, evt.Message) + if _, ok := seenEvents[key]; ok { + continue + } + seenEvents[key] = struct{}{} + items = append(items, map[string]interface{}{ + "kind": "event", + "at": evt.At, + "task_id": task.ID, + "label": task.Label, + "run_id": evt.RunID, + "agent_id": firstNonEmptyString(evt.AgentID, task.AgentID), + "event_type": evt.Type, + "status": evt.Status, + "message": evt.Message, + "retry_count": evt.RetryCount, + }) + } + } + if strings.TrimSpace(task.ThreadID) == "" { + continue + } + if messages, err := sm.ThreadMessages(task.ThreadID, limit); err == nil { + for _, msg := range messages { + if _, ok := seenMessages[msg.MessageID]; ok { + continue + } + seenMessages[msg.MessageID] = struct{}{} + items = append(items, map[string]interface{}{ + "kind": "message", + "at": msg.CreatedAt, + "task_id": task.ID, + "label": task.Label, + "message_id": msg.MessageID, + "thread_id": msg.ThreadID, + "from_agent": msg.FromAgent, + "to_agent": msg.ToAgent, + "reply_to": msg.ReplyTo, + "correlation_id": msg.CorrelationID, + "message_type": msg.Type, + "content": msg.Content, + "status": msg.Status, + "requires_reply": msg.RequiresReply, + }) + } + } + } + sort.Slice(items, func(i, j int) bool { + left, _ := items[i]["at"].(int64) + right, _ := items[j]["at"].(int64) + if left != right { + return left < right + } + return fmt.Sprintf("%v", items[i]["task_id"]) < fmt.Sprintf("%v", items[j]["task_id"]) + }) + if limit > 0 && len(items) > limit { + items = items[len(items)-limit:] + } + return items +} + +func maxInt64(values ...int64) int64 { + var out int64 + for _, v := range values { + if v > out { + out = v + } + } + return out +} + +func firstNonEmptyString(values ...string) string { + for _, v := range values { + if strings.TrimSpace(v) != "" { + return strings.TrimSpace(v) + } + } + return "" +} + func cloneSubagentTask(in *tools.SubagentTask) *tools.SubagentTask { if in == nil { return nil diff --git a/pkg/agent/runtime_admin_test.go b/pkg/agent/runtime_admin_test.go index 7a486a8..3cbc019 100644 --- a/pkg/agent/runtime_admin_test.go +++ b/pkg/agent/runtime_admin_test.go @@ -389,3 +389,48 @@ func TestHandleSubagentRuntimeStream(t *testing.T) { t.Fatalf("expected merged event and message items, got %#v", items) } } + +func TestHandleSubagentRuntimeStreamAll(t *testing.T) { + workspace := t.TempDir() + manager := tools.NewSubagentManager(nil, workspace, nil) + manager.SetRunFunc(func(ctx context.Context, task *tools.SubagentTask) (string, error) { + return "stream-all-result", nil + }) + loop := &AgentLoop{ + workspace: workspace, + subagentManager: manager, + subagentRouter: tools.NewSubagentRouter(manager), + } + + if _, err := loop.HandleSubagentRuntime(context.Background(), "spawn", map[string]interface{}{ + "task": "prepare grouped stream task", + "agent_id": "coder", + "channel": "webui", + "chat_id": "webui", + }); err != nil { + t.Fatalf("spawn failed: %v", err) + } + for i := 0; i < 50; i++ { + tasks := manager.ListTasks() + if len(tasks) > 0 && tasks[0].Status == "completed" { + break + } + time.Sleep(10 * time.Millisecond) + } + + out, err := loop.HandleSubagentRuntime(context.Background(), "stream_all", map[string]interface{}{ + "limit": 100, + "task_limit": 10, + }) + if err != nil { + t.Fatalf("stream_all failed: %v", err) + } + payload, ok := out.(map[string]interface{}) + if !ok || payload["found"] != true { + t.Fatalf("unexpected stream_all payload: %#v", out) + } + items, ok := payload["items"].([]map[string]interface{}) + if !ok || len(items) == 0 { + t.Fatalf("expected grouped stream items, got %#v", payload["items"]) + } +} diff --git a/webui/src/i18n/index.ts b/webui/src/i18n/index.ts index b0b65f3..c68951a 100644 --- a/webui/src/i18n/index.ts +++ b/webui/src/i18n/index.ts @@ -116,6 +116,10 @@ const resources = { noCronJobs: 'No cron jobs found', noNodes: 'No nodes available', sessions: 'Sessions', + mainChat: 'Main Chat', + subagentGroup: 'Subagent Group', + noSubagentStream: 'No subagent internal stream yet.', + subagentGroupReadonly: 'Subagent group is read-only.', startConversation: 'Start a conversation', typeMessage: 'Type a message...', configuration: 'Configuration', @@ -571,6 +575,10 @@ const resources = { noCronJobs: '未找到定时任务', noNodes: '无可用节点', sessions: '会话', + mainChat: '主对话', + subagentGroup: '子代理群组', + noSubagentStream: '当前还没有子代理内部流。', + subagentGroupReadonly: '子代理群组为只读视图。', startConversation: '开始对话', typeMessage: '输入消息...', configuration: '配置', diff --git a/webui/src/pages/Chat.tsx b/webui/src/pages/Chat.tsx index c820c97..02c3d23 100644 --- a/webui/src/pages/Chat.tsx +++ b/webui/src/pages/Chat.tsx @@ -5,12 +5,28 @@ import { useTranslation } from 'react-i18next'; import { useAppContext } from '../context/AppContext'; import { ChatItem } from '../types'; +type StreamItem = { + kind?: string; + at?: number; + task_id?: string; + label?: string; + agent_id?: string; + event_type?: string; + message?: string; + message_type?: string; + content?: string; + from_agent?: string; + to_agent?: string; + status?: string; +}; + const Chat: React.FC = () => { const { t } = useTranslation(); const { q, sessions } = useAppContext(); const [chat, setChat] = useState([]); const [msg, setMsg] = useState(''); const [fileSelected, setFileSelected] = useState(false); + const [chatTab, setChatTab] = useState<'main' | 'subagents'>('main'); const [sessionKey, setSessionKey] = useState('main'); const chatEndRef = useRef(null); @@ -51,6 +67,34 @@ const Chat: React.FC = () => { } }; + const loadSubagentGroup = async () => { + try { + const r = await fetch(`/webui/api/subagents_runtime${q}`, { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify({ action: 'stream_all', limit: 200, task_limit: 24 }), + }); + if (!r.ok) return; + const j = await r.json(); + const arr = Array.isArray(j?.result?.items) ? j.result.items : []; + const mapped: ChatItem[] = arr.map((item: StreamItem) => { + const isEvent = item.kind === 'event'; + const label = isEvent + ? `${item.agent_id || 'subagent'} · ${item.event_type || 'event'}` + : `${item.from_agent || '-'} -> ${item.to_agent || '-'} · ${item.message_type || 'message'}`; + const body = isEvent ? (item.message || '') : (item.content || ''); + return { + role: 'assistant', + label, + text: `${body}${item.status ? `\n\nstatus: ${item.status}` : ''}`, + }; + }); + setChat(mapped); + } catch (e) { + console.error(e); + } + }; + async function send() { if (!msg.trim() && !fileSelected) return; @@ -111,27 +155,55 @@ const Chat: React.FC = () => { } useEffect(() => { - loadHistory(); - }, [q, sessionKey]); - + if (chatTab === 'main') { + loadHistory(); + return; + } + loadSubagentGroup(); + }, [q, chatTab, sessionKey]); useEffect(() => { - if (!sessions || sessions.length === 0) return; - if (!sessions.some(s => s.key === sessionKey)) { - setSessionKey(sessions[0].key); + if (chatTab !== 'subagents') return; + const timer = window.setInterval(() => { + loadSubagentGroup(); + }, 5000); + return () => window.clearInterval(timer); + }, [q, chatTab]); + + const userSessions = (sessions || []).filter((s: any) => !String(s?.key || '').startsWith('subagent:')); + + useEffect(() => { + if (chatTab !== 'main') return; + if (!userSessions.length) return; + if (!userSessions.some((s: any) => s.key === sessionKey)) { + setSessionKey(userSessions[0].key); } - }, [sessions]); + }, [chatTab, sessionKey, userSessions]); + return (
-

{t('session')}

- + + + {chatTab === 'main' && ( + + )}
- +
@@ -140,7 +212,7 @@ const Chat: React.FC = () => {
-

{t('startConversation')}

+

{chatTab === 'main' ? t('startConversation') : t('noSubagentStream')}

) : ( chat.map((m, i) => { @@ -203,13 +275,14 @@ const Chat: React.FC = () => { setMsg(e.target.value)} - onKeyDown={(e) => e.key === 'Enter' && send()} - placeholder={t('typeMessage')} - className="w-full bg-zinc-900 border border-zinc-800 rounded-full pl-14 pr-14 py-3.5 text-[15px] focus:outline-none focus:border-indigo-500 focus:ring-1 focus:ring-indigo-500 transition-all placeholder:text-zinc-500 shadow-sm" + onKeyDown={(e) => chatTab === 'main' && e.key === 'Enter' && send()} + placeholder={chatTab === 'main' ? t('typeMessage') : t('subagentGroupReadonly')} + disabled={chatTab !== 'main'} + className="w-full bg-zinc-900 border border-zinc-800 rounded-full pl-14 pr-14 py-3.5 text-[15px] focus:outline-none focus:border-indigo-500 focus:ring-1 focus:ring-indigo-500 transition-all placeholder:text-zinc-500 shadow-sm disabled:opacity-60" />