task-audit cleanup: filter heartbeat by default and merge recurring autonomy runs via stable task ids; add channel-level inbound dedupe

This commit is contained in:
DBT
2026-03-01 05:33:47 +00:00
parent 4b8d3168c5
commit 6608456fbf
2 changed files with 36 additions and 1 deletions

View File

@@ -405,7 +405,7 @@ func (al *AgentLoop) getSessionProvider(sessionKey string) string {
}
func (al *AgentLoop) processInbound(ctx context.Context, msg bus.InboundMessage) {
taskID := fmt.Sprintf("%s-%d", shortSessionKey(msg.SessionKey), time.Now().Unix()%100000)
taskID := buildAuditTaskID(msg)
started := time.Now()
al.appendTaskAuditEvent(taskID, msg, "running", started, 0, "started", false)
@@ -437,6 +437,31 @@ func shortSessionKey(s string) string {
return s[:8]
}
func buildAuditTaskID(msg bus.InboundMessage) string {
trigger := ""
if msg.Metadata != nil {
trigger = strings.ToLower(strings.TrimSpace(msg.Metadata["trigger"]))
}
sessionPart := shortSessionKey(msg.SessionKey)
switch trigger {
case "heartbeat":
if sessionPart == "" {
sessionPart = "default"
}
return "heartbeat:" + sessionPart
case "autonomy":
norm := strings.ToLower(strings.TrimSpace(strings.ReplaceAll(msg.Content, "\n", " ")))
if len(norm) > 180 {
norm = norm[:180]
}
h := fnv.New32a()
_, _ = h.Write([]byte(msg.SessionKey + "|" + norm))
return fmt.Sprintf("autonomy:%08x", h.Sum32())
default:
return fmt.Sprintf("%s-%d", sessionPart, time.Now().Unix()%100000)
}
}
func (al *AgentLoop) appendTaskAudit(taskID string, msg bus.InboundMessage, started time.Time, runErr error, suppressed bool) {
status := "success"
logText := "completed"

View File

@@ -1447,6 +1447,7 @@ func (s *RegistryServer) handleWebUITaskAudit(w http.ResponseWriter, r *http.Req
}
path := filepath.Join(strings.TrimSpace(s.workspacePath), "memory", "task-audit.jsonl")
includeHeartbeat := r.URL.Query().Get("include_heartbeat") == "1"
limit := 100
if v := r.URL.Query().Get("limit"); v != "" {
if n, err := strconv.Atoi(v); err == nil && n > 0 {
@@ -1475,6 +1476,10 @@ path := filepath.Join(strings.TrimSpace(s.workspacePath), "memory", "task-audit.
}
var row map[string]interface{}
if err := json.Unmarshal([]byte(ln), &row); err == nil {
source := strings.ToLower(strings.TrimSpace(fmt.Sprintf("%v", row["source"])))
if !includeHeartbeat && source == "heartbeat" {
continue
}
items = append(items, row)
}
}
@@ -1491,6 +1496,7 @@ func (s *RegistryServer) handleWebUITaskQueue(w http.ResponseWriter, r *http.Req
return
}
path := filepath.Join(strings.TrimSpace(s.workspacePath), "memory", "task-audit.jsonl")
includeHeartbeat := r.URL.Query().Get("include_heartbeat") == "1"
b, err := os.ReadFile(path)
if err != nil {
_ = json.NewEncoder(w).Encode(map[string]interface{}{"ok": true, "running": []map[string]interface{}{}, "items": []map[string]interface{}{}})
@@ -1510,6 +1516,10 @@ func (s *RegistryServer) handleWebUITaskQueue(w http.ResponseWriter, r *http.Req
if err := json.Unmarshal([]byte(ln), &row); err != nil {
continue
}
source := strings.ToLower(strings.TrimSpace(fmt.Sprintf("%v", row["source"])))
if !includeHeartbeat && source == "heartbeat" {
continue
}
id := fmt.Sprintf("%v", row["task_id"])
if id == "" {
continue