From eb4f1dc82ae7501d81b9d0be6d15fb11d878d492 Mon Sep 17 00:00:00 2001 From: DBT Date: Mon, 2 Mar 2026 00:18:46 +0000 Subject: [PATCH] autonomy persistence: keep task lifecycle records with parent links, attempts, and memory/audit references instead of transient queue semantics --- pkg/autonomy/engine.go | 67 +++++++++++++++++++++++++++++++++++--- pkg/autonomy/task_store.go | 39 ++++++++++++++-------- 2 files changed, 88 insertions(+), 18 deletions(-) diff --git a/pkg/autonomy/engine.go b/pkg/autonomy/engine.go index 0059507..7959d61 100644 --- a/pkg/autonomy/engine.go +++ b/pkg/autonomy/engine.go @@ -336,6 +336,7 @@ func (e *Engine) tick() { e.releaseLocksLocked(st.ID) if outcome == "success" || outcome == "suppressed" { st.Status = "completed" + e.appendTaskAttemptLocked(st.ID, "done", "autonomy:"+st.ID, "run outcome success") e.writeReflectLog("complete", st, "marked completed by run outcome") e.enqueueInferredNextTasksLocked(st) continue @@ -356,6 +357,7 @@ func (e *Engine) tick() { continue } st.BlockReason = "last_run_error" + e.appendTaskAttemptLocked(st.ID, "blocked", "autonomy:"+st.ID, "run outcome error") st.RetryAfter = now.Add(blockedRetryBackoff(st.ConsecutiveStall+1, e.opts.MinRunIntervalSec)) e.sendFailureNotification(st, "last run ended with error") continue @@ -416,6 +418,7 @@ func (e *Engine) tick() { continue } e.dispatchTask(st) + e.appendTaskAttemptLocked(st.ID, "running", "autonomy:"+st.ID, "dispatch") st.Status = "running" st.WaitAttempts = 0 st.BlockReason = "" @@ -733,7 +736,7 @@ func (e *Engine) enqueueInferredNextTasksLocked(st *taskState) { } id := hashID(candidate) e.state[id] = &taskState{ID: id, Content: candidate, Priority: "normal", Status: "idle"} - items = append(items, TaskItem{ID: id, Content: candidate, Priority: "normal", Status: "todo", Source: "autonomy_infer", UpdatedAt: now}) + items = append(items, TaskItem{ID: id, ParentTaskID: st.ID, Content: candidate, Priority: "normal", Status: "todo", Source: "autonomy_infer", UpdatedAt: now}) existing[candidate] = true added++ } @@ -765,7 +768,7 @@ func (e *Engine) enqueueAutoRepairTaskLocked(st *taskState, errSig string) { } id := hashID(content) e.state[id] = &taskState{ID: id, Content: content, Priority: "high", Status: "idle"} - items = append(items, TaskItem{ID: id, Content: content, Priority: "high", Status: "todo", Source: "autonomy_repair", UpdatedAt: nowRFC3339()}) + items = append(items, TaskItem{ID: id, ParentTaskID: st.ID, Content: content, Priority: "high", Status: "todo", Source: "autonomy_repair", UpdatedAt: nowRFC3339()}) _ = e.taskStore.Save(items) e.writeReflectLog("infer", st, "generated auto-repair task due to repeated error signature") } @@ -816,8 +819,50 @@ func (e *Engine) appendMemoryIncidentLocked(st *taskState, errSig string, reason _, _ = f.WriteString(line + "\n") } dayPath := filepath.Join(e.opts.Workspace, "memory", now.Format("2006-01-02")+".md") + memoryMain := filepath.Join(e.opts.Workspace, "MEMORY.md") appendIfDue(dayPath) - appendIfDue(filepath.Join(e.opts.Workspace, "MEMORY.md")) + appendIfDue(memoryMain) + items, _ := e.taskStore.Load() + for i := range items { + if items[i].ID != st.ID { + continue + } + items[i].MemoryRefs = append(items[i].MemoryRefs, dayPath, memoryMain) + if len(items[i].MemoryRefs) > 10 { + items[i].MemoryRefs = items[i].MemoryRefs[len(items[i].MemoryRefs)-10:] + } + break + } + _ = e.taskStore.Save(items) +} + +func (e *Engine) appendTaskAttemptLocked(taskID, status, session, note string) { + taskID = strings.TrimSpace(taskID) + if taskID == "" { + return + } + items, _ := e.taskStore.Load() + for i := range items { + if items[i].ID != taskID { + continue + } + items[i].Attempts = append(items[i].Attempts, TaskAttempt{ + Time: time.Now().UTC().Format(time.RFC3339), + Status: strings.TrimSpace(status), + Session: strings.TrimSpace(session), + Note: shortTask(note), + }) + if len(items[i].Attempts) > 30 { + items[i].Attempts = items[i].Attempts[len(items[i].Attempts)-30:] + } + items[i].AuditRefs = append(items[i].AuditRefs, time.Now().UTC().Format(time.RFC3339)) + if len(items[i].AuditRefs) > 60 { + items[i].AuditRefs = items[i].AuditRefs[len(items[i].AuditRefs)-60:] + } + items[i].UpdatedAt = nowRFC3339() + break + } + _ = e.taskStore.Save(items) } func (e *Engine) sendFailureNotification(st *taskState, reason string) { @@ -978,6 +1023,11 @@ func inQuietHours(now time.Time, spec string) bool { } func (e *Engine) persistStateLocked() { + existing, _ := e.taskStore.Load() + existingMap := map[string]TaskItem{} + for _, it := range existing { + existingMap[it.ID] = it + } items := make([]TaskItem, 0, len(e.state)) for _, st := range e.state { status := "todo" @@ -1001,19 +1051,28 @@ func (e *Engine) persistStateLocked() { if !st.LastPauseAt.IsZero() { lastPauseAt = st.LastPauseAt.UTC().Format(time.RFC3339) } + prev := existingMap[st.ID] + source := prev.Source + if strings.TrimSpace(source) == "" { + source = "memory_todo" + } items = append(items, TaskItem{ ID: st.ID, + ParentTaskID: prev.ParentTaskID, Content: st.Content, Priority: st.Priority, DueAt: st.DueAt, Status: status, BlockReason: st.BlockReason, RetryAfter: retryAfter, - Source: "memory_todo", + Source: source, DedupeHits: st.DedupeHits, ResourceKeys: append([]string(nil), st.ResourceKeys...), LastPauseReason: st.LastPauseReason, LastPauseAt: lastPauseAt, + MemoryRefs: append([]string(nil), prev.MemoryRefs...), + AuditRefs: append([]string(nil), prev.AuditRefs...), + Attempts: append([]TaskAttempt(nil), prev.Attempts...), UpdatedAt: nowRFC3339(), }) } diff --git a/pkg/autonomy/task_store.go b/pkg/autonomy/task_store.go index 726689e..c141717 100644 --- a/pkg/autonomy/task_store.go +++ b/pkg/autonomy/task_store.go @@ -9,20 +9,31 @@ import ( "time" ) +type TaskAttempt struct { + Time string `json:"time"` + Status string `json:"status"` + Session string `json:"session,omitempty"` + Note string `json:"note,omitempty"` +} + type TaskItem struct { - ID string `json:"id"` - Content string `json:"content"` - Priority string `json:"priority"` - DueAt string `json:"due_at,omitempty"` - Status string `json:"status"` // todo|doing|waiting|blocked|done - BlockReason string `json:"block_reason,omitempty"` - RetryAfter string `json:"retry_after,omitempty"` - Source string `json:"source"` - DedupeHits int `json:"dedupe_hits,omitempty"` - ResourceKeys []string `json:"resource_keys,omitempty"` - LastPauseReason string `json:"last_pause_reason,omitempty"` - LastPauseAt string `json:"last_pause_at,omitempty"` - UpdatedAt string `json:"updated_at"` + ID string `json:"id"` + ParentTaskID string `json:"parent_task_id,omitempty"` + Content string `json:"content"` + Priority string `json:"priority"` + DueAt string `json:"due_at,omitempty"` + Status string `json:"status"` // todo|doing|waiting|blocked|done|paused|canceled + BlockReason string `json:"block_reason,omitempty"` + RetryAfter string `json:"retry_after,omitempty"` + Source string `json:"source"` + DedupeHits int `json:"dedupe_hits,omitempty"` + ResourceKeys []string `json:"resource_keys,omitempty"` + LastPauseReason string `json:"last_pause_reason,omitempty"` + LastPauseAt string `json:"last_pause_at,omitempty"` + MemoryRefs []string `json:"memory_refs,omitempty"` + AuditRefs []string `json:"audit_refs,omitempty"` + Attempts []TaskAttempt `json:"attempts,omitempty"` + UpdatedAt string `json:"updated_at"` } type TaskStore struct { @@ -65,7 +76,7 @@ func (s *TaskStore) Save(items []TaskItem) error { func normalizeStatus(v string) string { s := strings.ToLower(strings.TrimSpace(v)) switch s { - case "todo", "doing", "blocked", "done": + case "todo", "doing", "waiting", "blocked", "done", "paused", "canceled": return s default: return "todo"