From f274acf818810081563ab0d555650518d224bf87 Mon Sep 17 00:00:00 2001 From: DBT Date: Sat, 28 Feb 2026 02:41:05 +0000 Subject: [PATCH] task-queue: unified task audit events, queue API, and bilingual split-view webui --- pkg/agent/loop.go | 40 +++++++++++++++---------- pkg/nodes/registry_server.go | 55 +++++++++++++++++++++++++++++++++++ webui/src/i18n/index.ts | 6 ++++ webui/src/pages/TaskAudit.tsx | 22 +++++++++----- 4 files changed, 101 insertions(+), 22 deletions(-) diff --git a/pkg/agent/loop.go b/pkg/agent/loop.go index e2ce00f..7145ae5 100644 --- a/pkg/agent/loop.go +++ b/pkg/agent/loop.go @@ -312,6 +312,7 @@ func (al *AgentLoop) lockSessionRun(sessionKey string) func() { func (al *AgentLoop) processInbound(ctx context.Context, msg bus.InboundMessage) { taskID := fmt.Sprintf("%s-%d", shortSessionKey(msg.SessionKey), time.Now().Unix()%100000) started := time.Now() + al.appendTaskAuditEvent(taskID, msg, "running", started, 0, "started", false) response, err := al.processMessage(ctx, msg) if err != nil { @@ -342,27 +343,36 @@ func shortSessionKey(s string) string { } func (al *AgentLoop) appendTaskAudit(taskID string, msg bus.InboundMessage, started time.Time, runErr error, suppressed bool) { + status := "success" + logText := "completed" + if runErr != nil { + status = "error" + logText = runErr.Error() + } else if suppressed { + status = "suppressed" + logText = "suppressed" + } + al.appendTaskAuditEvent(taskID, msg, status, started, int(time.Since(started).Milliseconds()), logText, suppressed) +} + +func (al *AgentLoop) appendTaskAuditEvent(taskID string, msg bus.InboundMessage, status string, started time.Time, durationMs int, logText string, suppressed bool) { if al.workspace == "" { return } path := filepath.Join(al.workspace, "memory", "task-audit.jsonl") _ = os.MkdirAll(filepath.Dir(path), 0755) - status := "success" - if runErr != nil { - status = "error" - } else if suppressed { - status = "suppressed" - } row := map[string]interface{}{ - "task_id": taskID, - "time": time.Now().UTC().Format(time.RFC3339), - "channel": msg.Channel, - "session": msg.SessionKey, - "chat_id": msg.ChatID, - "sender_id": msg.SenderID, - "status": status, - "duration_ms": int(time.Since(started).Milliseconds()), - "error": func() string { if runErr != nil { return runErr.Error() }; return "" }(), + "task_id": taskID, + "time": time.Now().UTC().Format(time.RFC3339), + "channel": msg.Channel, + "session": msg.SessionKey, + "chat_id": msg.ChatID, + "sender_id": msg.SenderID, + "status": status, + "duration_ms": durationMs, + "suppressed": suppressed, + "retry_count": 0, + "log": logText, "input_preview": truncate(strings.ReplaceAll(msg.Content, "\n", " "), 180), } b, _ := json.Marshal(row) diff --git a/pkg/nodes/registry_server.go b/pkg/nodes/registry_server.go index 780dd41..ac5b32f 100644 --- a/pkg/nodes/registry_server.go +++ b/pkg/nodes/registry_server.go @@ -89,6 +89,7 @@ func (s *RegistryServer) Start(ctx context.Context) error { mux.HandleFunc("/webui/api/sessions", s.handleWebUISessions) mux.HandleFunc("/webui/api/memory", s.handleWebUIMemory) mux.HandleFunc("/webui/api/task_audit", s.handleWebUITaskAudit) + mux.HandleFunc("/webui/api/task_queue", s.handleWebUITaskQueue) mux.HandleFunc("/webui/api/exec_approvals", s.handleWebUIExecApprovals) mux.HandleFunc("/webui/api/logs/stream", s.handleWebUILogsStream) mux.HandleFunc("/webui/api/logs/recent", s.handleWebUILogsRecent) @@ -1384,6 +1385,60 @@ func (s *RegistryServer) handleWebUITaskAudit(w http.ResponseWriter, r *http.Req _ = json.NewEncoder(w).Encode(map[string]interface{}{"ok": true, "items": items}) } +func (s *RegistryServer) handleWebUITaskQueue(w http.ResponseWriter, r *http.Request) { + if !s.checkAuth(r) { + http.Error(w, "unauthorized", http.StatusUnauthorized) + return + } + if r.Method != http.MethodGet { + http.Error(w, "method not allowed", http.StatusMethodNotAllowed) + return + } + path := filepath.Join(strings.TrimSpace(s.workspacePath), "memory", "task-audit.jsonl") + b, err := os.ReadFile(path) + if err != nil { + _ = json.NewEncoder(w).Encode(map[string]interface{}{"ok": true, "running": []map[string]interface{}{}, "items": []map[string]interface{}{}}) + return + } + lines := strings.Split(string(b), "\n") + type agg struct { + Last map[string]interface{} + Logs []string + } + m := map[string]*agg{} + for _, ln := range lines { + if ln == "" { + continue + } + var row map[string]interface{} + if err := json.Unmarshal([]byte(ln), &row); err != nil { + continue + } + id := fmt.Sprintf("%v", row["task_id"]) + if id == "" { + continue + } + if _, ok := m[id]; !ok { + m[id] = &agg{Last: row, Logs: []string{}} + } + m[id].Last = row + if lg := fmt.Sprintf("%v", row["log"]); lg != "" { + m[id].Logs = append(m[id].Logs, lg) + } + } + items := make([]map[string]interface{}, 0, len(m)) + running := make([]map[string]interface{}, 0) + for _, a := range m { + row := a.Last + row["logs"] = a.Logs + items = append(items, row) + if fmt.Sprintf("%v", row["status"]) == "running" { + running = append(running, row) + } + } + _ = json.NewEncoder(w).Encode(map[string]interface{}{"ok": true, "running": running, "items": items}) +} + func (s *RegistryServer) handleWebUIExecApprovals(w http.ResponseWriter, r *http.Request) { if !s.checkAuth(r) { http.Error(w, "unauthorized", http.StatusUnauthorized) diff --git a/webui/src/i18n/index.ts b/webui/src/i18n/index.ts index d2e2454..faa8296 100644 --- a/webui/src/i18n/index.ts +++ b/webui/src/i18n/index.ts @@ -16,6 +16,9 @@ const resources = { taskAudit: 'Task Audit', taskList: 'Task List', taskDetail: 'Task Detail', + taskQueue: 'Task Queue', + taskLogs: 'Task Logs', + error: 'Error', noTaskAudit: 'No task audit records', selectTask: 'Select a task from the left list', loading: 'Loading...', @@ -165,6 +168,9 @@ const resources = { taskAudit: '任务审计', taskList: '任务列表', taskDetail: '任务详情', + taskQueue: '任务队列', + taskLogs: '任务日志', + error: '错误', noTaskAudit: '暂无任务审计记录', selectTask: '请从左侧选择任务', loading: '加载中...', diff --git a/webui/src/pages/TaskAudit.tsx b/webui/src/pages/TaskAudit.tsx index 87ffc0f..f22d3c1 100644 --- a/webui/src/pages/TaskAudit.tsx +++ b/webui/src/pages/TaskAudit.tsx @@ -11,8 +11,10 @@ type TaskAuditItem = { sender_id?: string; status?: string; duration_ms?: number; + retry_count?: number; error?: string; input_preview?: string; + logs?: string[]; [key: string]: any; }; @@ -26,13 +28,14 @@ const TaskAudit: React.FC = () => { const fetchData = async () => { setLoading(true); try { - const url = `/webui/api/task_audit${q ? `${q}&limit=300` : '?limit=300'}`; + const url = `/webui/api/task_queue${q ? `${q}&limit=300` : '?limit=300'}`; const r = await fetch(url); if (!r.ok) throw new Error(await r.text()); const j = await r.json(); const arr = Array.isArray(j.items) ? j.items : []; - setItems(arr.reverse()); - if (arr.length > 0) setSelected(arr[arr.length - 1]); + const sorted = arr.sort((a: any, b: any) => String(b.time || '').localeCompare(String(a.time || ''))); + setItems(sorted); + if (sorted.length > 0) setSelected(sorted[0]); } catch (e) { console.error(e); setItems([]); @@ -55,7 +58,7 @@ const TaskAudit: React.FC = () => {
-
{t('taskList')}
+
{t('taskQueue')}
{items.length === 0 ? (
{t('noTaskAudit')}
@@ -68,7 +71,7 @@ const TaskAudit: React.FC = () => { className={`w-full text-left px-3 py-2 border-b border-zinc-800/60 hover:bg-zinc-800/40 ${active ? 'bg-indigo-500/15' : ''}`} >
{it.task_id || `task-${idx + 1}`}
-
{it.channel} · {it.status} · {it.duration_ms || 0}ms
+
{it.channel} · {it.status} · {it.duration_ms || 0}ms · retry:{it.retry_count || 0}
{it.time}
); @@ -98,12 +101,17 @@ const TaskAudit: React.FC = () => {
-
Error
+
{t('error')}
{selected.error || '-'}
-
Raw JSON
+
{t('taskLogs')}
+
{Array.isArray(selected.logs) && selected.logs.length ? selected.logs.join('\n') : '-'}
+
+ +
+
{t('rawJson')}
{selectedPretty}