From 6608456fbf6f151cf439b993b2eb6567833cca36 Mon Sep 17 00:00:00 2001 From: DBT Date: Sun, 1 Mar 2026 05:33:47 +0000 Subject: [PATCH] task-audit cleanup: filter heartbeat by default and merge recurring autonomy runs via stable task ids; add channel-level inbound dedupe --- pkg/agent/loop.go | 27 ++++++++++++++++++++++++++- pkg/nodes/registry_server.go | 10 ++++++++++ 2 files changed, 36 insertions(+), 1 deletion(-) diff --git a/pkg/agent/loop.go b/pkg/agent/loop.go index feac6f9..fef27b1 100644 --- a/pkg/agent/loop.go +++ b/pkg/agent/loop.go @@ -405,7 +405,7 @@ func (al *AgentLoop) getSessionProvider(sessionKey string) string { } func (al *AgentLoop) processInbound(ctx context.Context, msg bus.InboundMessage) { - taskID := fmt.Sprintf("%s-%d", shortSessionKey(msg.SessionKey), time.Now().Unix()%100000) + taskID := buildAuditTaskID(msg) started := time.Now() al.appendTaskAuditEvent(taskID, msg, "running", started, 0, "started", false) @@ -437,6 +437,31 @@ func shortSessionKey(s string) string { return s[:8] } +func buildAuditTaskID(msg bus.InboundMessage) string { + trigger := "" + if msg.Metadata != nil { + trigger = strings.ToLower(strings.TrimSpace(msg.Metadata["trigger"])) + } + sessionPart := shortSessionKey(msg.SessionKey) + switch trigger { + case "heartbeat": + if sessionPart == "" { + sessionPart = "default" + } + return "heartbeat:" + sessionPart + case "autonomy": + norm := strings.ToLower(strings.TrimSpace(strings.ReplaceAll(msg.Content, "\n", " "))) + if len(norm) > 180 { + norm = norm[:180] + } + h := fnv.New32a() + _, _ = h.Write([]byte(msg.SessionKey + "|" + norm)) + return fmt.Sprintf("autonomy:%08x", h.Sum32()) + default: + return fmt.Sprintf("%s-%d", sessionPart, time.Now().Unix()%100000) + } +} + func (al *AgentLoop) appendTaskAudit(taskID string, msg bus.InboundMessage, started time.Time, runErr error, suppressed bool) { status := "success" logText := "completed" diff --git a/pkg/nodes/registry_server.go b/pkg/nodes/registry_server.go index 4636f5a..b4b5336 100644 --- a/pkg/nodes/registry_server.go +++ b/pkg/nodes/registry_server.go @@ -1447,6 +1447,7 @@ func (s *RegistryServer) handleWebUITaskAudit(w http.ResponseWriter, r *http.Req } path := filepath.Join(strings.TrimSpace(s.workspacePath), "memory", "task-audit.jsonl") + includeHeartbeat := r.URL.Query().Get("include_heartbeat") == "1" limit := 100 if v := r.URL.Query().Get("limit"); v != "" { if n, err := strconv.Atoi(v); err == nil && n > 0 { @@ -1475,6 +1476,10 @@ path := filepath.Join(strings.TrimSpace(s.workspacePath), "memory", "task-audit. } var row map[string]interface{} if err := json.Unmarshal([]byte(ln), &row); err == nil { + source := strings.ToLower(strings.TrimSpace(fmt.Sprintf("%v", row["source"]))) + if !includeHeartbeat && source == "heartbeat" { + continue + } items = append(items, row) } } @@ -1491,6 +1496,7 @@ func (s *RegistryServer) handleWebUITaskQueue(w http.ResponseWriter, r *http.Req return } path := filepath.Join(strings.TrimSpace(s.workspacePath), "memory", "task-audit.jsonl") + includeHeartbeat := r.URL.Query().Get("include_heartbeat") == "1" b, err := os.ReadFile(path) if err != nil { _ = json.NewEncoder(w).Encode(map[string]interface{}{"ok": true, "running": []map[string]interface{}{}, "items": []map[string]interface{}{}}) @@ -1510,6 +1516,10 @@ func (s *RegistryServer) handleWebUITaskQueue(w http.ResponseWriter, r *http.Req if err := json.Unmarshal([]byte(ln), &row); err != nil { continue } + source := strings.ToLower(strings.TrimSpace(fmt.Sprintf("%v", row["source"]))) + if !includeHeartbeat && source == "heartbeat" { + continue + } id := fmt.Sprintf("%v", row["task_id"]) if id == "" { continue