task-queue: unified task audit events, queue API, and bilingual split-view webui

This commit is contained in:
DBT
2026-02-28 02:41:05 +00:00
parent 4dfd6ce11f
commit f274acf818
4 changed files with 101 additions and 22 deletions

View File

@@ -312,6 +312,7 @@ func (al *AgentLoop) lockSessionRun(sessionKey string) func() {
func (al *AgentLoop) processInbound(ctx context.Context, msg bus.InboundMessage) {
taskID := fmt.Sprintf("%s-%d", shortSessionKey(msg.SessionKey), time.Now().Unix()%100000)
started := time.Now()
al.appendTaskAuditEvent(taskID, msg, "running", started, 0, "started", false)
response, err := al.processMessage(ctx, msg)
if err != nil {
@@ -342,27 +343,36 @@ func shortSessionKey(s string) string {
}
func (al *AgentLoop) appendTaskAudit(taskID string, msg bus.InboundMessage, started time.Time, runErr error, suppressed bool) {
status := "success"
logText := "completed"
if runErr != nil {
status = "error"
logText = runErr.Error()
} else if suppressed {
status = "suppressed"
logText = "suppressed"
}
al.appendTaskAuditEvent(taskID, msg, status, started, int(time.Since(started).Milliseconds()), logText, suppressed)
}
func (al *AgentLoop) appendTaskAuditEvent(taskID string, msg bus.InboundMessage, status string, started time.Time, durationMs int, logText string, suppressed bool) {
if al.workspace == "" {
return
}
path := filepath.Join(al.workspace, "memory", "task-audit.jsonl")
_ = os.MkdirAll(filepath.Dir(path), 0755)
status := "success"
if runErr != nil {
status = "error"
} else if suppressed {
status = "suppressed"
}
row := map[string]interface{}{
"task_id": taskID,
"time": time.Now().UTC().Format(time.RFC3339),
"channel": msg.Channel,
"session": msg.SessionKey,
"chat_id": msg.ChatID,
"sender_id": msg.SenderID,
"status": status,
"duration_ms": int(time.Since(started).Milliseconds()),
"error": func() string { if runErr != nil { return runErr.Error() }; return "" }(),
"task_id": taskID,
"time": time.Now().UTC().Format(time.RFC3339),
"channel": msg.Channel,
"session": msg.SessionKey,
"chat_id": msg.ChatID,
"sender_id": msg.SenderID,
"status": status,
"duration_ms": durationMs,
"suppressed": suppressed,
"retry_count": 0,
"log": logText,
"input_preview": truncate(strings.ReplaceAll(msg.Content, "\n", " "), 180),
}
b, _ := json.Marshal(row)

View File

@@ -89,6 +89,7 @@ func (s *RegistryServer) Start(ctx context.Context) error {
mux.HandleFunc("/webui/api/sessions", s.handleWebUISessions)
mux.HandleFunc("/webui/api/memory", s.handleWebUIMemory)
mux.HandleFunc("/webui/api/task_audit", s.handleWebUITaskAudit)
mux.HandleFunc("/webui/api/task_queue", s.handleWebUITaskQueue)
mux.HandleFunc("/webui/api/exec_approvals", s.handleWebUIExecApprovals)
mux.HandleFunc("/webui/api/logs/stream", s.handleWebUILogsStream)
mux.HandleFunc("/webui/api/logs/recent", s.handleWebUILogsRecent)
@@ -1384,6 +1385,60 @@ func (s *RegistryServer) handleWebUITaskAudit(w http.ResponseWriter, r *http.Req
_ = json.NewEncoder(w).Encode(map[string]interface{}{"ok": true, "items": items})
}
func (s *RegistryServer) handleWebUITaskQueue(w http.ResponseWriter, r *http.Request) {
if !s.checkAuth(r) {
http.Error(w, "unauthorized", http.StatusUnauthorized)
return
}
if r.Method != http.MethodGet {
http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
return
}
path := filepath.Join(strings.TrimSpace(s.workspacePath), "memory", "task-audit.jsonl")
b, err := os.ReadFile(path)
if err != nil {
_ = json.NewEncoder(w).Encode(map[string]interface{}{"ok": true, "running": []map[string]interface{}{}, "items": []map[string]interface{}{}})
return
}
lines := strings.Split(string(b), "\n")
type agg struct {
Last map[string]interface{}
Logs []string
}
m := map[string]*agg{}
for _, ln := range lines {
if ln == "" {
continue
}
var row map[string]interface{}
if err := json.Unmarshal([]byte(ln), &row); err != nil {
continue
}
id := fmt.Sprintf("%v", row["task_id"])
if id == "" {
continue
}
if _, ok := m[id]; !ok {
m[id] = &agg{Last: row, Logs: []string{}}
}
m[id].Last = row
if lg := fmt.Sprintf("%v", row["log"]); lg != "" {
m[id].Logs = append(m[id].Logs, lg)
}
}
items := make([]map[string]interface{}, 0, len(m))
running := make([]map[string]interface{}, 0)
for _, a := range m {
row := a.Last
row["logs"] = a.Logs
items = append(items, row)
if fmt.Sprintf("%v", row["status"]) == "running" {
running = append(running, row)
}
}
_ = json.NewEncoder(w).Encode(map[string]interface{}{"ok": true, "running": running, "items": items})
}
func (s *RegistryServer) handleWebUIExecApprovals(w http.ResponseWriter, r *http.Request) {
if !s.checkAuth(r) {
http.Error(w, "unauthorized", http.StatusUnauthorized)