From b91d33b6b8107c75a942d35a5d0aeabbe2198878 Mon Sep 17 00:00:00 2001 From: DBT Date: Sun, 1 Mar 2026 04:48:50 +0000 Subject: [PATCH] webui ekg: add ekg stats api and task-audit insights panel (top errsig/provider, escalation count) --- pkg/nodes/registry_server.go | 88 +++++++++++++++++++++++++++++++++++ webui/src/pages/TaskAudit.tsx | 39 ++++++++++++++++ 2 files changed, 127 insertions(+) diff --git a/pkg/nodes/registry_server.go b/pkg/nodes/registry_server.go index 0a64f86..509f429 100644 --- a/pkg/nodes/registry_server.go +++ b/pkg/nodes/registry_server.go @@ -93,6 +93,7 @@ func (s *RegistryServer) Start(ctx context.Context) error { mux.HandleFunc("/webui/api/task_queue", s.handleWebUITaskQueue) mux.HandleFunc("/webui/api/tasks", s.handleWebUITasks) mux.HandleFunc("/webui/api/task_daily_summary", s.handleWebUITaskDailySummary) + mux.HandleFunc("/webui/api/ekg_stats", s.handleWebUIEKGStats) mux.HandleFunc("/webui/api/exec_approvals", s.handleWebUIExecApprovals) mux.HandleFunc("/webui/api/logs/stream", s.handleWebUILogsStream) mux.HandleFunc("/webui/api/logs/recent", s.handleWebUILogsRecent) @@ -1617,6 +1618,93 @@ func (s *RegistryServer) handleWebUITaskDailySummary(w http.ResponseWriter, r *h _ = json.NewEncoder(w).Encode(map[string]interface{}{"ok": true, "date": date, "report": report}) } +func (s *RegistryServer) handleWebUIEKGStats(w http.ResponseWriter, r *http.Request) { + if !s.checkAuth(r) { + http.Error(w, "unauthorized", http.StatusUnauthorized) + return + } + if r.Method != http.MethodGet { + http.Error(w, "method not allowed", http.StatusMethodNotAllowed) + return + } + workspace := strings.TrimSpace(s.workspacePath) + ekgPath := filepath.Join(workspace, "memory", "ekg-events.jsonl") + b, _ := os.ReadFile(ekgPath) + lines := strings.Split(string(b), "\n") + if len(lines) > 0 && lines[len(lines)-1] == "" { + lines = lines[:len(lines)-1] + } + if len(lines) > 3000 { + lines = lines[len(lines)-3000:] + } + providerScore := map[string]float64{} + errSigCount := map[string]int{} + for _, ln := range lines { + if strings.TrimSpace(ln) == "" { + continue + } + var row map[string]interface{} + if json.Unmarshal([]byte(ln), &row) != nil { + continue + } + provider := strings.TrimSpace(fmt.Sprintf("%v", row["provider"])) + status := strings.ToLower(strings.TrimSpace(fmt.Sprintf("%v", row["status"]))) + errSig := strings.TrimSpace(fmt.Sprintf("%v", row["errsig"])) + if provider != "" { + switch status { + case "success": + providerScore[provider] += 1 + case "suppressed": + providerScore[provider] += 0.2 + case "error": + providerScore[provider] -= 1 + } + } + if errSig != "" { + errSigCount[errSig]++ + } + } + type kv struct { + Key string `json:"key"` + Score float64 `json:"score,omitempty"` + Count int `json:"count,omitempty"` + } + providerTop := make([]kv, 0, len(providerScore)) + for k, v := range providerScore { + providerTop = append(providerTop, kv{Key: k, Score: v}) + } + sort.Slice(providerTop, func(i, j int) bool { return providerTop[i].Score > providerTop[j].Score }) + if len(providerTop) > 5 { + providerTop = providerTop[:5] + } + errTop := make([]kv, 0, len(errSigCount)) + for k, v := range errSigCount { + errTop = append(errTop, kv{Key: k, Count: v}) + } + sort.Slice(errTop, func(i, j int) bool { return errTop[i].Count > errTop[j].Count }) + if len(errTop) > 5 { + errTop = errTop[:5] + } + escalations := 0 + tasksPath := filepath.Join(workspace, "memory", "tasks.json") + if tb, err := os.ReadFile(tasksPath); err == nil { + var tasks []map[string]interface{} + if json.Unmarshal(tb, &tasks) == nil { + for _, t := range tasks { + if strings.TrimSpace(fmt.Sprintf("%v", t["block_reason"])) == "repeated_error_signature" { + escalations++ + } + } + } + } + _ = json.NewEncoder(w).Encode(map[string]interface{}{ + "ok": true, + "provider_top": providerTop, + "errsig_top": errTop, + "escalation_count": escalations, + }) +} + func (s *RegistryServer) handleWebUITasks(w http.ResponseWriter, r *http.Request) { if !s.checkAuth(r) { http.Error(w, "unauthorized", http.StatusUnauthorized) diff --git a/webui/src/pages/TaskAudit.tsx b/webui/src/pages/TaskAudit.tsx index 5654e8a..36c3129 100644 --- a/webui/src/pages/TaskAudit.tsx +++ b/webui/src/pages/TaskAudit.tsx @@ -2,6 +2,8 @@ import React, { useEffect, useMemo, useState } from 'react'; import { useTranslation } from 'react-i18next'; import { useAppContext } from '../context/AppContext'; +type EKGKV = { key?: string; score?: number; count?: number }; + type TaskAuditItem = { task_id?: string; time?: string; @@ -37,6 +39,9 @@ const TaskAudit: React.FC = () => { const [dailyReport, setDailyReport] = useState(''); const [reportDate, setReportDate] = useState(new Date().toISOString().slice(0,10)); const [showDailyReport, setShowDailyReport] = useState(false); + const [ekgProviderTop, setEkgProviderTop] = useState([]); + const [ekgErrsigTop, setEkgErrsigTop] = useState([]); + const [ekgEscalationCount, setEkgEscalationCount] = useState(0); const fetchData = async () => { setLoading(true); @@ -57,6 +62,14 @@ const TaskAudit: React.FC = () => { } else { setDailyReport(''); } + const ekgUrl = `/webui/api/ekg_stats${q || ''}`; + const er = await fetch(ekgUrl); + if (er.ok) { + const ej = await er.json(); + setEkgProviderTop(Array.isArray(ej.provider_top) ? ej.provider_top : []); + setEkgErrsigTop(Array.isArray(ej.errsig_top) ? ej.errsig_top : []); + setEkgEscalationCount(Number(ej.escalation_count || 0)); + } } catch (e) { console.error(e); setItems([]); @@ -125,6 +138,32 @@ const TaskAudit: React.FC = () => { +
+
EKG Insights
+
+
+
Escalations
+
{ekgEscalationCount}
+
+
+
Top Providers
+
+ {ekgProviderTop.length === 0 ?
-
: ekgProviderTop.map((x, i) => ( +
{x.key} ({Number(x.score || 0).toFixed(2)})
+ ))} +
+
+
+
+
Top Error Signatures
+
+ {ekgErrsigTop.length === 0 ?
-
: ekgErrsigTop.map((x, i) => ( +
{x.key} (x{x.count || 0})
+ ))} +
+
+
+
{t('dailySummary')}