Files
clawgo/pkg/autonomy/engine.go
2026-03-05 12:55:30 +08:00

1508 lines
41 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package autonomy
import (
"bufio"
"context"
"crypto/sha1"
"encoding/hex"
"encoding/json"
"fmt"
"os"
"path/filepath"
"sort"
"strconv"
"strings"
"sync"
"time"
"clawgo/pkg/bus"
"clawgo/pkg/ekg"
"clawgo/pkg/lifecycle"
"clawgo/pkg/scheduling"
)
// Options configures the autonomy Engine. NewEngine normalizes several of the
// numeric fields; the defaults it applies are noted on each field.
type Options struct {
	// Enabled gates Engine.Start: when false, Start does nothing.
	Enabled bool

	// TickIntervalSec is the period of the scheduling loop (default 30 when <= 0).
	// MinRunIntervalSec is the minimum gap between two dispatches of the same
	// task and the base passed to blockedRetryBackoff (default 20 when <= 0).
	TickIntervalSec, MinRunIntervalSec int

	// MaxPendingDurationSec is how long a dispatched task may stay "running"
	// without a recorded outcome before it counts as stalled (default 180 when <= 0).
	MaxPendingDurationSec int

	// MaxConsecutiveStalls is the stall count above which a task is blocked
	// (default 3 when <= 0).
	// MaxDispatchPerTick caps dispatches per tick; 0 means no cap, and a
	// negative value is replaced by 2.
	MaxConsecutiveStalls, MaxDispatchPerTick int

	// Workspace is the root directory holding MEMORY.md and the memory/ folder.
	Workspace string

	// DefaultNotifyChannel and DefaultNotifyChatID address outbound
	// notifications; if either is blank no notification is sent.
	DefaultNotifyChannel, DefaultNotifyChatID string

	// NotifyAllowFrom, when non-empty, must contain DefaultNotifyChatID for
	// notifications to be sent.
	NotifyAllowFrom []string

	// NotifyCooldownSec is the per-key notification cooldown (default 300 when <= 0).
	// NotifySameReasonCooldownSec is the cooldown for the same key and reason
	// (default 900 when <= 0).
	NotifyCooldownSec, NotifySameReasonCooldownSec int

	// QuietHours is an "HH:MM-HH:MM" window during which notifications are
	// suppressed; an empty or malformed value disables the window.
	QuietHours string

	// UserIdleResumeSec defaults to 20 when <= 0.
	// NOTE(review): presumably consumed by the user-activity check, which is
	// defined outside this section — confirm.
	// WaitingResumeDebounceSec is the minimum time a task stays waiting before
	// it may resume (default 5 when <= 0).
	UserIdleResumeSec, WaitingResumeDebounceSec int

	// IdleRoundBudgetReleaseSec, when > 0, lets a task paused by the idle round
	// budget resume after waiting this long (negative values become 0).
	// MaxRoundsWithoutUser, when > 0, caps the dispatch counter that the
	// manual-pause branch of tick resets (negative values become 0).
	IdleRoundBudgetReleaseSec, MaxRoundsWithoutUser int

	// TaskHistoryRetentionDays is how many days of audit rows and terminal
	// tasks are kept by the history cleanup (default 3 when <= 0).
	TaskHistoryRetentionDays int

	// AllowedTaskKeywords is not read in this section.
	// NOTE(review): presumably used by allowTaskByPolicy — confirm.
	AllowedTaskKeywords []string

	// EKGConsecutiveErrorThreshold is forwarded to the EKG engine (default 3 when <= 0).
	EKGConsecutiveErrorThreshold int
}
// taskState is the in-memory scheduling record the Engine keeps per task.
type taskState struct {
	// ID keys the task in Engine.state; Content, Priority and DueAt are
	// refreshed from the todo source on every tick.
	ID, Content, Priority, DueAt string

	Status string // idle|running|waiting|blocked|completed

	// BlockReason explains a waiting/blocked status (e.g. "resource_lock",
	// "idle_round_budget", "last_run_error").
	BlockReason string

	// WaitingSince is when the task entered "waiting".
	// LastRunAt and LastAutonomyAt are set to the tick time on dispatch.
	// RetryAfter is the earliest time a blocked or lock-waiting task is retried.
	WaitingSince, LastRunAt, LastAutonomyAt, RetryAfter time.Time

	// ConsecutiveStall counts pending timeouts of a running task.
	// DedupeHits counts duplicate todo entries merged into this task.
	ConsecutiveStall, DedupeHits int

	// ResourceKeys are the lock keys the task must own while running.
	ResourceKeys []string

	// WaitAttempts counts failed lock acquisitions since the last dispatch.
	WaitAttempts int

	// LastPauseReason and LastPauseAt record the most recent pause caused by
	// the manual switch or by user activity.
	LastPauseReason string
	LastPauseAt     time.Time
}
// Engine periodically scans todo sources, tracks per-task scheduling state and
// dispatches tasks onto the message bus.
type Engine struct {
// opts is the normalized configuration produced by NewEngine.
opts Options
// bus carries dispatched tasks (inbound) and notifications (outbound).
bus *bus.MessageBus
// runner drives runLoop between Start and Stop.
runner *lifecycle.LoopRunner
// taskStore loads and saves the structured task list.
taskStore *TaskStore
// mu is held by tick while it reads and mutates the fields below.
mu sync.Mutex
// state maps task ID to its scheduling record.
state map[string]*taskState
// lastNotify maps a notification key to the time it was last sent.
lastNotify map[string]time.Time
// lockOwners maps a resource key to the ID of the task holding it.
lockOwners map[string]string
// roundsWithoutUser counts dispatches since the counter was last reset.
roundsWithoutUser int
// lastDailyReportDate is the UTC date (YYYY-MM-DD) of the last report attempt.
lastDailyReportDate string
// lastHistoryCleanupAt is when the history cleanup last ran.
lastHistoryCleanupAt time.Time
// ekg supplies escalation advice for repeated error signatures; may be nil.
ekg *ekg.Engine
}
// NewEngine builds an Engine from opts after normalizing its numeric fields,
// wiring in msgBus, a fresh loop runner, a task store and an EKG engine rooted
// at opts.Workspace. The engine is not started; call Start.
func NewEngine(opts Options, msgBus *bus.MessageBus) *Engine {
	// positiveOr replaces a non-positive setting with its fallback.
	positiveOr := func(field *int, fallback int) {
		if *field <= 0 {
			*field = fallback
		}
	}
	// nonNegative clamps a negative setting to zero.
	nonNegative := func(field *int) {
		if *field < 0 {
			*field = 0
		}
	}

	positiveOr(&opts.TickIntervalSec, 30)
	positiveOr(&opts.MinRunIntervalSec, 20)
	positiveOr(&opts.MaxPendingDurationSec, 180)
	positiveOr(&opts.MaxConsecutiveStalls, 3)
	// Zero is kept as-is (tick treats 0 as "no per-tick cap"); only a negative
	// value is replaced by the default of 2.
	if opts.MaxDispatchPerTick < 0 {
		opts.MaxDispatchPerTick = 2
	}
	positiveOr(&opts.NotifyCooldownSec, 300)
	positiveOr(&opts.UserIdleResumeSec, 20)
	positiveOr(&opts.NotifySameReasonCooldownSec, 900)
	positiveOr(&opts.WaitingResumeDebounceSec, 5)
	nonNegative(&opts.MaxRoundsWithoutUser)
	nonNegative(&opts.IdleRoundBudgetReleaseSec)
	positiveOr(&opts.TaskHistoryRetentionDays, 3)
	positiveOr(&opts.EKGConsecutiveErrorThreshold, 3)

	eng := &Engine{
		opts:       opts,
		bus:        msgBus,
		runner:     lifecycle.NewLoopRunner(),
		taskStore:  NewTaskStore(opts.Workspace),
		state:      map[string]*taskState{},
		lastNotify: map[string]time.Time{},
		lockOwners: map[string]string{},
		ekg:        ekg.New(opts.Workspace),
	}
	if eng.ekg == nil {
		return eng
	}
	eng.ekg.SetConsecutiveErrorThreshold(opts.EKGConsecutiveErrorThreshold)
	return eng
}
// Start hands runLoop to the loop runner when the engine is enabled; it does
// nothing when Options.Enabled is false.
func (e *Engine) Start() {
	if e.opts.Enabled {
		e.runner.Start(e.runLoop)
	}
}
// Stop asks the loop runner to stop the loop started by Start.
func (e *Engine) Stop() {
e.runner.Stop()
}
// runLoop calls tick every TickIntervalSec seconds until stopCh is closed or
// receives a value.
func (e *Engine) runLoop(stopCh <-chan struct{}) {
	period := time.Duration(e.opts.TickIntervalSec) * time.Second
	tk := time.NewTicker(period)
	defer tk.Stop()

	for {
		select {
		case <-tk.C:
			e.tick()
		case <-stopCh:
			return
		}
	}
}
// tick runs one scheduling round:
//
//  1. scan todo sources and load the structured task store;
//  2. if the manual pause switch is set, or the user was recently active,
//     move every running task to "waiting", persist and return;
//  3. merge todos into e.state, marking tasks that vanished from the todo
//     source as completed;
//  4. walk the tasks in scheduling-score order and resume, unblock, stall-check
//     and dispatch them, subject to the per-tick and idle-round budgets;
//  5. persist state, then run the daily report and history cleanup hooks.
//
// The engine mutex is held from step 2 until the function returns.
func (e *Engine) tick() {
	todos := e.scanTodos()
	now := time.Now()
	stored, _ := e.taskStore.Load()

	// A single critical section: the lock is never dropped between the pause
	// checks and the scheduling pass.
	e.mu.Lock()
	defer e.mu.Unlock()

	// pauseRunning moves every running task to "waiting" with the given reason.
	pauseRunning := func(reason, note string) {
		for _, st := range e.state {
			if st.Status != "running" {
				continue
			}
			e.releaseLocksLocked(st.ID)
			st.Status = "waiting"
			st.BlockReason = reason
			st.WaitingSince = now
			st.LastPauseReason = reason
			st.LastPauseAt = now
			e.writeReflectLog("waiting", st, note)
			e.writeTriggerAudit("waiting", st, reason)
		}
	}

	if e.hasManualPause() {
		pauseRunning("manual_pause", "paused by manual switch")
		e.roundsWithoutUser = 0
		e.persistStateLocked()
		return
	}
	if e.hasRecentUserActivity(now) {
		pauseRunning("active_user", "paused due to active user conversation")
		e.persistStateLocked()
		return
	}

	storedMap := map[string]TaskItem{}
	for _, it := range stored {
		storedMap[it.ID] = it
	}
	known := map[string]struct{}{}
	for _, t := range todos {
		known[t.ID] = struct{}{}
	}
	// Merge structured tasks.json entries as todo source too (for WebUI CRUD-created tasks).
	for _, old := range stored {
		if old.ID == "" {
			continue
		}
		if _, ok := known[old.ID]; ok {
			continue
		}
		st := strings.ToLower(old.Status)
		if st == "done" || st == "completed" {
			continue
		}
		todos = append(todos, todoItem{ID: old.ID, Content: old.Content, Priority: old.Priority, DueAt: old.DueAt})
		known[old.ID] = struct{}{}
	}

	for _, t := range todos {
		st, ok := e.state[t.ID]
		if !ok {
			// First sighting: seed the record, restoring what the store remembers.
			status := "idle"
			retryAfter := time.Time{}
			resourceKeys := deriveResourceKeys(t.Content)
			lastPauseReason := ""
			lastPauseAt := time.Time{}
			if old, ok := storedMap[t.ID]; ok {
				if old.Status == "blocked" {
					status = "blocked"
				}
				if strings.TrimSpace(old.RetryAfter) != "" {
					if rt, err := time.Parse(time.RFC3339, old.RetryAfter); err == nil {
						retryAfter = rt
					}
				}
				if len(old.ResourceKeys) > 0 {
					resourceKeys = append([]string(nil), old.ResourceKeys...)
				}
				lastPauseReason = old.LastPauseReason
				if old.LastPauseAt != "" {
					if pt, err := time.Parse(time.RFC3339, old.LastPauseAt); err == nil {
						lastPauseAt = pt
					}
				}
			}
			e.state[t.ID] = &taskState{ID: t.ID, Content: t.Content, Priority: t.Priority, DueAt: t.DueAt, Status: status, RetryAfter: retryAfter, DedupeHits: t.DedupeHits, ResourceKeys: resourceKeys, LastPauseReason: lastPauseReason, LastPauseAt: lastPauseAt}
			continue
		}
		st.Content = t.Content
		st.Priority = t.Priority
		st.DueAt = t.DueAt
		st.DedupeHits = t.DedupeHits
		st.ResourceKeys = deriveResourceKeys(t.Content)
	}

	// completed when removed from todo source
	//
	// Snapshot the removed tasks before acting on them:
	// enqueueInferredNextTasksLocked inserts follow-up tasks into e.state, and
	// entries added to a map while ranging over it may be visited by that same
	// range. The follow-ups are not in known, so ranging over e.state directly
	// could mark them completed in the tick that created them.
	var removed []*taskState
	for id, st := range e.state {
		if _, ok := known[id]; !ok && st.Status != "completed" {
			removed = append(removed, st)
		}
	}
	for _, st := range removed {
		e.releaseLocksLocked(st.ID)
		st.Status = "completed"
		e.sendCompletionNotification(st)
		e.enqueueInferredNextTasksLocked(st)
	}

	ordered := make([]*taskState, 0, len(e.state))
	for _, st := range e.state {
		ordered = append(ordered, st)
	}
	// Highest score first; ties broken by ID so the order is deterministic.
	sort.Slice(ordered, func(i, j int) bool {
		si := schedulingScore(ordered[i], now)
		sj := schedulingScore(ordered[j], now)
		if si != sj {
			return si > sj
		}
		return ordered[i].ID < ordered[j].ID
	})

	dispatched := 0
	for _, st := range ordered {
		if e.opts.MaxDispatchPerTick > 0 && dispatched >= e.opts.MaxDispatchPerTick {
			break
		}
		if st.Status == "completed" {
			continue
		}
		if st.Status == "waiting" {
			if st.BlockReason == "resource_lock" && !st.RetryAfter.IsZero() && now.Before(st.RetryAfter) {
				continue
			}
			if st.BlockReason == "idle_round_budget" && e.opts.MaxRoundsWithoutUser > 0 && e.roundsWithoutUser >= e.opts.MaxRoundsWithoutUser {
				// Optional auto-release without user dialog: allow one round after configured cooldown.
				if e.opts.IdleRoundBudgetReleaseSec > 0 && !st.WaitingSince.IsZero() && now.Sub(st.WaitingSince) >= time.Duration(e.opts.IdleRoundBudgetReleaseSec)*time.Second {
					e.roundsWithoutUser = e.opts.MaxRoundsWithoutUser - 1
					e.writeReflectLog("resume", st, fmt.Sprintf("autonomy auto-resumed from idle round budget after %ds", e.opts.IdleRoundBudgetReleaseSec))
					e.writeTriggerAudit("resume", st, "idle_round_budget_auto_release")
				} else {
					// Stay waiting until user activity resets round budget.
					continue
				}
			}
			// Debounce waiting/resume flapping
			if !st.WaitingSince.IsZero() && now.Sub(st.WaitingSince) < time.Duration(e.opts.WaitingResumeDebounceSec)*time.Second {
				continue
			}
			reason := st.BlockReason
			st.Status = "idle"
			st.BlockReason = ""
			st.WaitingSince = time.Time{}
			pausedFor := 0
			if !st.LastPauseAt.IsZero() {
				pausedFor = int(now.Sub(st.LastPauseAt).Seconds())
			}
			e.writeReflectLog("resume", st, fmt.Sprintf("autonomy resumed from waiting (reason=%s paused_for=%ds)", reason, pausedFor))
			e.writeTriggerAudit("resume", st, reason)
		}
		if st.Status == "running" {
			if outcome, ok := e.detectRunOutcome(st.ID, st.LastRunAt); ok {
				e.releaseLocksLocked(st.ID)
				if outcome == "success" || outcome == "suppressed" {
					st.Status = "completed"
					e.appendTaskAttemptLocked(st.ID, "done", "autonomy:"+st.ID, "run outcome success")
					e.writeReflectLog("complete", st, "marked completed by run outcome")
					e.enqueueInferredNextTasksLocked(st)
					continue
				}
				if outcome == "error" {
					errSig := e.latestErrorSignature(st.ID, st.LastRunAt)
					advice := ekg.Advice{}
					if e.ekg != nil {
						advice = e.ekg.GetAdvice(ekg.SignalContext{TaskID: st.ID, ErrSig: errSig, Source: "autonomy", Channel: "system"})
					}
					st.Status = "blocked"
					if advice.ShouldEscalate {
						st.BlockReason = "repeated_error_signature"
						st.RetryAfter = now.Add(5 * time.Minute)
						e.enqueueAutoRepairTaskLocked(st, errSig)
						e.appendMemoryIncidentLocked(st, errSig, advice.Reason)
						e.sendFailureNotification(st, "repeated error signature detected; escalate")
						continue
					}
					st.BlockReason = "last_run_error"
					e.appendTaskAttemptLocked(st.ID, "blocked", "autonomy:"+st.ID, "run outcome error")
					st.RetryAfter = now.Add(blockedRetryBackoff(st.ConsecutiveStall+1, e.opts.MinRunIntervalSec))
					e.sendFailureNotification(st, "last run ended with error")
					continue
				}
			}
			// Keep running tasks intact across ticks/restarts until pending timeout,
			// so a gateway restart won't immediately redispatch from scratch.
			if !st.LastRunAt.IsZero() && now.Sub(st.LastRunAt) <= time.Duration(e.opts.MaxPendingDurationSec)*time.Second {
				continue
			}
		}
		if st.Status == "blocked" {
			e.releaseLocksLocked(st.ID)
			if !st.RetryAfter.IsZero() && now.Before(st.RetryAfter) {
				continue
			}
			if now.Sub(st.LastRunAt) >= blockedRetryBackoff(st.ConsecutiveStall, e.opts.MinRunIntervalSec) {
				st.Status = "idle"
				st.BlockReason = ""
			} else {
				continue
			}
		}
		if !st.LastRunAt.IsZero() && now.Sub(st.LastRunAt) < time.Duration(e.opts.MinRunIntervalSec)*time.Second {
			continue
		}
		if !e.allowTaskByPolicy(st.Content) {
			continue
		}
		// A task still "running" here has exceeded the pending timeout: count a stall.
		if st.Status == "running" && now.Sub(st.LastRunAt) > time.Duration(e.opts.MaxPendingDurationSec)*time.Second {
			st.ConsecutiveStall++
			if st.ConsecutiveStall > e.opts.MaxConsecutiveStalls {
				st.Status = "blocked"
				st.BlockReason = "max_consecutive_stalls"
				st.RetryAfter = now.Add(blockedRetryBackoff(st.ConsecutiveStall, e.opts.MinRunIntervalSec))
				e.sendFailureNotification(st, "max consecutive stalls reached")
				continue
			}
		}
		if e.opts.MaxRoundsWithoutUser > 0 && e.roundsWithoutUser >= e.opts.MaxRoundsWithoutUser {
			if !(st.Status == "waiting" && st.BlockReason == "idle_round_budget") {
				st.Status = "waiting"
				st.BlockReason = "idle_round_budget"
				st.WaitingSince = now
				e.writeReflectLog("waiting", st, fmt.Sprintf("paused by idle round budget (%d)", e.opts.MaxRoundsWithoutUser))
				e.writeTriggerAudit("waiting", st, "idle_round_budget")
			}
			continue
		}
		if !e.tryAcquireLocksLocked(st) {
			st.Status = "waiting"
			st.BlockReason = "resource_lock"
			st.WaitingSince = now
			st.RetryAfter = now.Add(30 * time.Second)
			st.WaitAttempts++
			e.writeTriggerAudit("waiting", st, "resource_lock")
			continue
		}
		e.dispatchTask(st)
		e.appendTaskAttemptLocked(st.ID, "running", "autonomy:"+st.ID, "dispatch")
		st.Status = "running"
		st.WaitAttempts = 0
		st.BlockReason = ""
		st.WaitingSince = time.Time{}
		st.LastPauseReason = ""
		st.LastRunAt = now
		st.LastAutonomyAt = now
		e.writeReflectLog("dispatch", st, "task dispatched to agent loop")
		e.writeTriggerAudit("dispatch", st, "")
		e.roundsWithoutUser++
		dispatched++
	}

	e.persistStateLocked()
	e.maybeWriteDailyReportLocked(now)
	e.maybeCleanupTaskHistoryLocked(now)
}
// tryAcquireLocksLocked claims every resource key of st for st.ID, or none of
// them. A task without resource keys locks the single key "task:<ID>". It
// returns false when st is nil or any key is owned by another task.
func (e *Engine) tryAcquireLocksLocked(st *taskState) bool {
	if st == nil {
		return false
	}
	wanted := st.ResourceKeys
	if len(wanted) == 0 {
		wanted = []string{"task:" + st.ID}
	}
	// First pass: refuse if any key is held by a different task.
	for _, key := range wanted {
		holder, held := e.lockOwners[key]
		if held && holder != "" && holder != st.ID {
			return false
		}
	}
	// Second pass: take ownership of all keys.
	for _, key := range wanted {
		e.lockOwners[key] = st.ID
	}
	return true
}
// releaseLocksLocked removes every lock entry owned by taskID. A blank taskID
// is ignored.
func (e *Engine) releaseLocksLocked(taskID string) {
	if strings.TrimSpace(taskID) == "" {
		return
	}
	for key, holder := range e.lockOwners {
		if holder != taskID {
			continue
		}
		delete(e.lockOwners, key)
	}
}
// schedulingScore ranks a task for dispatch; higher scores go first. The base
// is priorityWeight*100 + dueWeight*10. A task waiting on a resource lock also
// gains one point per 10 seconds waited and five points per failed lock
// attempt. A nil task scores 0.
func schedulingScore(st *taskState, now time.Time) int {
	if st == nil {
		return 0
	}
	total := priorityWeight(st.Priority)*100 + int(dueWeight(st.DueAt))*10

	lockWaiting := st.Status == "waiting" && st.BlockReason == "resource_lock" && !st.WaitingSince.IsZero()
	if !lockWaiting {
		return total
	}
	if waited := int(now.Sub(st.WaitingSince).Seconds()); waited > 0 {
		total += waited / 10
	}
	return total + st.WaitAttempts*5
}
// deriveResourceKeys returns the resource lock keys for a task's content by
// delegating to scheduling.DeriveResourceKeys.
func deriveResourceKeys(content string) []string {
return scheduling.DeriveResourceKeys(content)
}
// todoItem is one open todo produced by scanTodos.
type todoItem struct {
	// ID identifies the task; Content is its text; Priority and DueAt are the
	// scheduling attributes attached to it.
	ID, Content, Priority, DueAt string

	// DedupeHits counts how many duplicate entries were merged into this item.
	DedupeHits int
}
// scanTodos collects the open todos for this tick from two sources:
//
//  1. markdown lines starting with "- [ ]" or "todo:" (case-insensitive) in
//     <workspace>/MEMORY.md and today's <workspace>/memory/<date>.md;
//  2. items in the structured task store that are not finished.
//
// Entries sharing an ID are merged: the higher priority wins, a missing due
// date is filled in, and DedupeHits is incremented. IDs that the store marks
// "done" or "completed" are dropped. The result order is unspecified. It
// returns nil when no workspace is configured.
func (e *Engine) scanTodos() []todoItem {
	if strings.TrimSpace(e.opts.Workspace) == "" {
		return nil
	}
	// isFinished is the single definition of a terminal stored status, so the
	// ID filter and the structured-items pass cannot disagree.
	isFinished := func(status string) bool {
		s := strings.ToLower(strings.TrimSpace(status))
		return s == "done" || s == "completed"
	}

	storedItems, _ := e.taskStore.Load()
	doneIDs := map[string]bool{}
	for _, it := range storedItems {
		if isFinished(it.Status) {
			doneIDs[strings.TrimSpace(it.ID)] = true
		}
	}

	merged := map[string]todoItem{}
	merge := func(it todoItem) {
		if strings.TrimSpace(it.ID) == "" || strings.TrimSpace(it.Content) == "" {
			return
		}
		if doneIDs[strings.TrimSpace(it.ID)] {
			return
		}
		cur, ok := merged[it.ID]
		if !ok {
			merged[it.ID] = it
			return
		}
		if priorityWeight(it.Priority) > priorityWeight(cur.Priority) {
			cur.Priority = it.Priority
		}
		if cur.DueAt == "" && it.DueAt != "" {
			cur.DueAt = it.DueAt
		}
		cur.DedupeHits++
		merged[it.ID] = cur
	}

	// 1) Parse markdown todos from MEMORY + today's daily memory.
	paths := []string{
		filepath.Join(e.opts.Workspace, "MEMORY.md"),
		filepath.Join(e.opts.Workspace, "memory", time.Now().Format("2006-01-02")+".md"),
	}
	for _, p := range paths {
		data, err := os.ReadFile(p)
		if err != nil {
			// A missing or unreadable file simply contributes no todos.
			continue
		}
		for _, line := range strings.Split(string(data), "\n") {
			t := strings.TrimSpace(line)
			var raw string
			switch {
			case strings.HasPrefix(t, "- [ ]"):
				raw = strings.TrimPrefix(t, "- [ ]")
			case strings.HasPrefix(strings.ToLower(t), "todo:"):
				raw = t[5:]
			default:
				continue
			}
			priority, dueAt, normalized := parseTodoAttributes(raw)
			merge(todoItem{ID: hashID(normalized), Content: normalized, Priority: priority, DueAt: dueAt})
		}
	}

	// 2) Merge structured tasks.json items (manual injections / prior state).
	for _, it := range storedItems {
		// Skip "completed" as well as "done": a finished item with an empty ID
		// would otherwise be re-added under hashID(content).
		if isFinished(it.Status) {
			continue
		}
		content := it.Content
		if content == "" {
			continue
		}
		id := strings.TrimSpace(it.ID)
		if id == "" {
			id = hashID(content)
		}
		priority := strings.TrimSpace(it.Priority)
		if priority == "" {
			priority = "normal"
		}
		merge(todoItem{ID: id, Content: content, Priority: priority, DueAt: it.DueAt})
	}

	out := make([]todoItem, 0, len(merged))
	for _, it := range merged {
		out = append(out, it)
	}
	return out
}
// dispatchTask publishes st on the bus as an inbound "system" message from
// sender "autonomy", using session key "autonomy:<task ID>".
func (e *Engine) dispatchTask(st *taskState) {
	meta := map[string]string{
		"trigger": "autonomy",
		"task_id": st.ID,
		"source":  "memory_todo",
	}
	msg := bus.InboundMessage{
		Channel:    "system",
		SenderID:   "autonomy",
		ChatID:     "internal:autonomy",
		Content:    fmt.Sprintf("Autonomy task:\n%s", st.Content),
		SessionKey: "autonomy:" + st.ID,
		Metadata:   meta,
	}
	e.bus.PublishInbound(msg)
}
// sendCompletionNotification logs the completion of st (reflect log and
// trigger audit) and, when shouldNotify allows it, publishes a completion
// message to the default notify channel.
func (e *Engine) sendCompletionNotification(st *taskState) {
	e.writeReflectLog("complete", st, "task marked completed")
	e.writeTriggerAudit("complete", st, "")

	if e.shouldNotify("done:"+st.ID, "") {
		text := fmt.Sprintf(
			"✅ Completed: %s\nNext step: reply \"continue %s\" if you want me to proceed.",
			shortTask(st.Content), shortTask(st.Content),
		)
		e.bus.PublishOutbound(bus.OutboundMessage{
			Channel: e.opts.DefaultNotifyChannel,
			ChatID:  e.opts.DefaultNotifyChatID,
			Content: text,
		})
	}
}
// enqueueInferredNextTasksLocked generates follow-up tasks for a finished task
// whose content looks like a study/analysis task. Up to two "[auto-next]"
// tasks are added to e.state and saved to the task store, skipping any whose
// content already exists in either place. Tasks that are themselves
// "[auto-next]" never spawn follow-ups.
func (e *Engine) enqueueInferredNextTasksLocked(st *taskState) {
	if st == nil {
		return
	}
	content := strings.TrimSpace(st.Content)
	if content == "" || strings.Contains(content, "[auto-next]") {
		return
	}

	// Only study-like tasks get follow-ups.
	lowered := strings.ToLower(content)
	studyLike := false
	for _, kw := range []string{"学习", "研究", "analy", "study", "代码", "codebase"} {
		if strings.Contains(lowered, kw) {
			studyLike = true
			break
		}
	}
	if !studyLike {
		return
	}

	candidates := []string{
		fmt.Sprintf("[auto-next] 基于「%s」输出架构摘要与改进点 Top5含收益/风险评估)", shortTask(content)),
		fmt.Sprintf("[auto-next] 基于「%s」拆解 3-5 个可执行开发任务(含优先级、验收标准),并写入任务队列", shortTask(content)),
	}

	// seen holds the trimmed content of every task already known.
	seen := map[string]bool{}
	for _, known := range e.state {
		seen[strings.TrimSpace(known.Content)] = true
	}
	items, _ := e.taskStore.Load()
	for _, it := range items {
		seen[strings.TrimSpace(it.Content)] = true
	}

	stamp := nowRFC3339()
	created := 0
	for _, next := range candidates {
		next = strings.TrimSpace(next)
		if next == "" || seen[next] {
			continue
		}
		id := hashID(next)
		e.state[id] = &taskState{ID: id, Content: next, Priority: "normal", Status: "idle"}
		items = append(items, TaskItem{ID: id, ParentTaskID: st.ID, Content: next, Priority: "normal", Status: "todo", Source: "autonomy_infer", UpdatedAt: stamp})
		seen[next] = true
		created++
	}
	if created == 0 {
		return
	}
	_ = e.taskStore.Save(items)
	e.writeReflectLog("infer", st, fmt.Sprintf("generated %d follow-up task(s)", created))
}
// enqueueAutoRepairTaskLocked adds one high-priority "[auto-repair]" task for
// st and errSig to e.state and the task store, unless a task with the same
// content already exists. A blank errSig is recorded as
// "unknown_error_signature".
func (e *Engine) enqueueAutoRepairTaskLocked(st *taskState, errSig string) {
	if st == nil {
		return
	}
	if errSig = strings.TrimSpace(errSig); errSig == "" {
		errSig = "unknown_error_signature"
	}
	content := fmt.Sprintf("[auto-repair] 排查任务 %s 的重复错误签名并给出修复步骤errsig=%s", shortTask(st.Content), shortTask(errSig))

	// seen holds the trimmed content of every task already known.
	seen := map[string]bool{}
	for _, known := range e.state {
		seen[strings.TrimSpace(known.Content)] = true
	}
	items, _ := e.taskStore.Load()
	for _, it := range items {
		seen[strings.TrimSpace(it.Content)] = true
	}
	if seen[content] {
		return
	}

	id := hashID(content)
	e.state[id] = &taskState{ID: id, Content: content, Priority: "high", Status: "idle"}
	repair := TaskItem{ID: id, ParentTaskID: st.ID, Content: content, Priority: "high", Status: "todo", Source: "autonomy_repair", UpdatedAt: nowRFC3339()}
	items = append(items, repair)
	_ = e.taskStore.Save(items)
	e.writeReflectLog("infer", st, "generated auto-repair task due to repeated error signature")
}
// appendMemoryIncidentLocked records an "[EKG_INCIDENT]" line for st in
// today's (UTC) daily memory file and in MEMORY.md, then attaches both paths
// to the task's MemoryRefs (capped at the 10 most recent) in the task store.
//
// A file is skipped when it already holds an incident line for the same
// normalized signature that is younger than 6 hours, or whose timestamp is
// missing or unparseable. File errors are ignored: this is best-effort logging.
func (e *Engine) appendMemoryIncidentLocked(st *taskState, errSig string, reasons []string) {
	if st == nil || strings.TrimSpace(e.opts.Workspace) == "" {
		return
	}
	errSig = ekg.NormalizeErrorSignature(errSig)
	if errSig == "" {
		errSig = "unknown_error_signature"
	}
	marker := "[EKG_INCIDENT] errsig=" + errSig
	now := time.Now().UTC()
	line := fmt.Sprintf("- [EKG_INCIDENT] errsig=%s task=%s reason=%s time=%s", errSig, shortTask(st.Content), strings.Join(reasons, ";"), now.Format(time.RFC3339))
	cooldown := 6 * time.Hour

	// hasRecentIncident reports whether content already has a line for this
	// signature that is inside the cooldown. Lines without a readable
	// timestamp count as recent so the file is not spammed.
	hasRecentIncident := func(content string) bool {
		for _, ln := range strings.Split(content, "\n") {
			if !strings.Contains(ln, marker) {
				continue
			}
			idx := strings.LastIndex(ln, "time=")
			if idx < 0 {
				return true
			}
			ts := strings.TrimSpace(ln[idx+len("time="):])
			tm, err := time.Parse(time.RFC3339, ts)
			if err != nil {
				return true
			}
			if now.Sub(tm) < cooldown {
				return true
			}
		}
		return false
	}
	appendIfDue := func(path string) {
		_ = os.MkdirAll(filepath.Dir(path), 0755)
		b, _ := os.ReadFile(path) // a missing file reads as empty
		if hasRecentIncident(string(b)) {
			return
		}
		f, err := os.OpenFile(path, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0644)
		if err != nil {
			return
		}
		defer f.Close()
		_, _ = f.WriteString(line + "\n")
	}

	dayPath := filepath.Join(e.opts.Workspace, "memory", now.Format("2006-01-02")+".md")
	memoryMain := filepath.Join(e.opts.Workspace, "MEMORY.md")
	appendIfDue(dayPath)
	appendIfDue(memoryMain)

	items, _ := e.taskStore.Load()
	updated := false
	for i := range items {
		if items[i].ID != st.ID {
			continue
		}
		items[i].MemoryRefs = append(items[i].MemoryRefs, dayPath, memoryMain)
		if len(items[i].MemoryRefs) > 10 {
			items[i].MemoryRefs = items[i].MemoryRefs[len(items[i].MemoryRefs)-10:]
		}
		updated = true
		break
	}
	// Save only when a task was actually changed. Saving unconditionally would
	// overwrite the store with an empty list whenever Load failed.
	if updated {
		_ = e.taskStore.Save(items)
	}
}
// appendTaskAttemptLocked appends an attempt record (status, session, note)
// and an audit timestamp to the stored task with the given ID, keeping the 30
// most recent attempts and 60 most recent audit refs. It does nothing when
// taskID is blank or the task is not in the store.
func (e *Engine) appendTaskAttemptLocked(taskID, status, session, note string) {
	taskID = strings.TrimSpace(taskID)
	if taskID == "" {
		return
	}
	items, _ := e.taskStore.Load()
	updated := false
	for i := range items {
		if items[i].ID != taskID {
			continue
		}
		// One timestamp for both records so the attempt and its audit ref match.
		stamp := time.Now().UTC().Format(time.RFC3339)
		items[i].Attempts = append(items[i].Attempts, TaskAttempt{
			Time:    stamp,
			Status:  strings.TrimSpace(status),
			Session: strings.TrimSpace(session),
			Note:    shortTask(note),
		})
		if len(items[i].Attempts) > 30 {
			items[i].Attempts = items[i].Attempts[len(items[i].Attempts)-30:]
		}
		items[i].AuditRefs = append(items[i].AuditRefs, stamp)
		if len(items[i].AuditRefs) > 60 {
			items[i].AuditRefs = items[i].AuditRefs[len(items[i].AuditRefs)-60:]
		}
		items[i].UpdatedAt = nowRFC3339()
		updated = true
		break
	}
	// Save only when a task was actually changed. Saving unconditionally would
	// overwrite the store with an empty list whenever Load failed.
	if updated {
		_ = e.taskStore.Save(items)
	}
}
// sendFailureNotification logs that st is blocked for reason (reflect log and
// trigger audit) and, when shouldNotify allows it, publishes a "task blocked"
// message to the default notify channel.
func (e *Engine) sendFailureNotification(st *taskState, reason string) {
	e.writeReflectLog("blocked", st, reason)
	e.writeTriggerAudit("blocked", st, reason)

	if e.shouldNotify("blocked:"+st.ID, reason) {
		text := fmt.Sprintf(
			"⚠️ Task blocked: %s\nReason: %s\nSuggestion: reply \"continue %s\" and I will retry from current state.",
			shortTask(st.Content), reason, shortTask(st.Content),
		)
		e.bus.PublishOutbound(bus.OutboundMessage{
			Channel: e.opts.DefaultNotifyChannel,
			ChatID:  e.opts.DefaultNotifyChatID,
			Content: text,
		})
	}
}
// shouldNotify decides whether a notification identified by key (and optional
// reason) may be sent now. It returns false when no default channel/chat is
// configured, when NotifyAllowFrom is non-empty and lacks the default chat ID,
// during quiet hours, or while the per-key or per-reason cooldown is running.
// When it returns true it records the current time for the key and, if a
// reason was given, for the key+reason pair.
func (e *Engine) shouldNotify(key string, reason string) bool {
	target := e.opts.DefaultNotifyChatID
	if strings.TrimSpace(e.opts.DefaultNotifyChannel) == "" || strings.TrimSpace(target) == "" {
		return false
	}
	if allow := e.opts.NotifyAllowFrom; len(allow) > 0 {
		permitted := false
		for _, id := range allow {
			if id == target {
				permitted = true
				break
			}
		}
		if !permitted {
			return false
		}
	}

	now := time.Now()
	if inQuietHours(now, e.opts.QuietHours) {
		return false
	}
	// coolingDown reports whether name was notified less than sec seconds ago.
	coolingDown := func(name string, sec int) bool {
		last, seen := e.lastNotify[name]
		return seen && now.Sub(last) < time.Duration(sec)*time.Second
	}
	if coolingDown(key, e.opts.NotifyCooldownSec) {
		return false
	}
	if why := strings.ToLower(strings.TrimSpace(reason)); why != "" {
		reasonKey := key + ":reason:" + strings.ReplaceAll(why, " ", "_")
		if coolingDown(reasonKey, e.opts.NotifySameReasonCooldownSec) {
			return false
		}
		e.lastNotify[reasonKey] = now
	}
	e.lastNotify[key] = now
	return true
}
// writeTriggerAudit appends one JSON row describing action for st to
// <workspace>/memory/trigger-audit.jsonl and bumps the matching counters in
// <workspace>/memory/trigger-stats.json. errText, when non-empty, is stored as
// the row's "error" and also used as a counter suffix. All file errors are
// ignored; nothing happens without a workspace or with a nil st.
func (e *Engine) writeTriggerAudit(action string, st *taskState, errText string) {
	if st == nil || strings.TrimSpace(e.opts.Workspace) == "" {
		return
	}
	memDir := filepath.Join(e.opts.Workspace, "memory")
	auditFile := filepath.Join(memDir, "trigger-audit.jsonl")
	_ = os.MkdirAll(filepath.Dir(auditFile), 0755)

	entry := map[string]interface{}{
		"time":    time.Now().UTC().Format(time.RFC3339),
		"trigger": "autonomy",
		"action":  action,
		"session": "autonomy:" + st.ID,
	}
	if errText != "" {
		entry["error"] = errText
	}
	if encoded, err := json.Marshal(entry); err == nil {
		if out, err := os.OpenFile(auditFile, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644); err == nil {
			_, _ = out.Write(append(encoded, '\n'))
			_ = out.Close()
		}
	}

	// Read-modify-write of the aggregate counters.
	statsFile := filepath.Join(memDir, "trigger-stats.json")
	var stats struct {
		UpdatedAt string         `json:"updated_at"`
		Counts    map[string]int `json:"counts"`
	}
	stats.Counts = map[string]int{}
	if prior, err := os.ReadFile(statsFile); err == nil {
		_ = json.Unmarshal(prior, &stats)
		if stats.Counts == nil {
			stats.Counts = map[string]int{}
		}
	}
	stats.Counts["autonomy"]++
	if act := strings.ToLower(strings.TrimSpace(action)); act != "" {
		stats.Counts["autonomy:"+act]++
		if why := strings.ToLower(errText); why != "" {
			why = strings.NewReplacer(" ", "_", ":", "_").Replace(why)
			stats.Counts["autonomy:"+act+":"+why]++
		}
	}
	stats.UpdatedAt = time.Now().UTC().Format(time.RFC3339)
	if encoded, err := json.MarshalIndent(stats, "", " "); err == nil {
		_ = os.WriteFile(statsFile, encoded, 0644)
	}
}
// writeReflectLog appends a one-line "[autonomy][stage]" entry for st to
// today's (local date) <workspace>/memory/<date>.md. It is a no-op without a
// workspace or with a nil st, and silently gives up if the file cannot be opened.
func (e *Engine) writeReflectLog(stage string, st *taskState, outcome string) {
	if st == nil || strings.TrimSpace(e.opts.Workspace) == "" {
		return
	}
	dir := filepath.Join(e.opts.Workspace, "memory")
	_ = os.MkdirAll(dir, 0755)
	target := filepath.Join(dir, time.Now().Format("2006-01-02")+".md")
	entry := fmt.Sprintf("- [%s] [autonomy][%s] task=%s status=%s outcome=%s\n", time.Now().Format("15:04"), stage, st.Content, st.Status, outcome)

	out, err := os.OpenFile(target, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
	if err != nil {
		return
	}
	defer out.Close()
	_, _ = out.WriteString(entry)
}
// inQuietHours reports whether now falls inside the window described by spec,
// written as "HH:MM-HH:MM". Both endpoints are inclusive at minute
// resolution, and a window whose start is later than its end wraps past
// midnight. An empty or malformed spec yields false.
func inQuietHours(now time.Time, spec string) bool {
	bounds := strings.Split(strings.TrimSpace(spec), "-")
	if len(bounds) != 2 {
		return false
	}
	// minuteOfDay converts "HH:MM" into minutes since midnight.
	minuteOfDay := func(text string) (int, bool) {
		fields := strings.Split(strings.TrimSpace(text), ":")
		if len(fields) != 2 {
			return 0, false
		}
		hour, hourErr := strconv.Atoi(fields[0])
		minute, minErr := strconv.Atoi(fields[1])
		switch {
		case hourErr != nil, minErr != nil:
			return 0, false
		case hour < 0, hour > 23, minute < 0, minute > 59:
			return 0, false
		}
		return hour*60 + minute, true
	}
	from, fromOK := minuteOfDay(bounds[0])
	to, toOK := minuteOfDay(bounds[1])
	if !(fromOK && toOK) {
		return false
	}
	current := now.Hour()*60 + now.Minute()
	if from > to {
		// Window wraps midnight, e.g. 22:00-07:00.
		return current >= from || current <= to
	}
	return current >= from && current <= to
}
// persistStateLocked writes the in-memory task state to the task store. Each
// state entry becomes a TaskItem whose parent, source, memory refs, audit refs
// and attempts are carried over from the stored item with the same ID (source
// defaults to "memory_todo"). Stored items that are no longer in memory are
// kept only if their status is done, canceled or paused.
func (e *Engine) persistStateLocked() {
	existing, _ := e.taskStore.Load()
	byID := make(map[string]TaskItem, len(existing))
	for _, it := range existing {
		byID[it.ID] = it
	}

	// Engine status -> stored status; anything unlisted is stored as "todo".
	storedStatus := map[string]string{
		"running":   "doing",
		"waiting":   "waiting",
		"blocked":   "blocked",
		"completed": "done",
	}
	// stamp renders a non-zero time as UTC RFC3339 and a zero time as "".
	stamp := func(tm time.Time) string {
		if tm.IsZero() {
			return ""
		}
		return tm.UTC().Format(time.RFC3339)
	}

	items := make([]TaskItem, 0, len(e.state))
	written := map[string]struct{}{}
	for _, st := range e.state {
		status, mapped := storedStatus[st.Status]
		if !mapped {
			status = "todo"
		}
		prev := byID[st.ID]
		source := prev.Source
		if strings.TrimSpace(source) == "" {
			source = "memory_todo"
		}
		written[st.ID] = struct{}{}
		items = append(items, TaskItem{
			ID:              st.ID,
			ParentTaskID:    prev.ParentTaskID,
			Content:         st.Content,
			Priority:        st.Priority,
			DueAt:           st.DueAt,
			Status:          status,
			BlockReason:     st.BlockReason,
			RetryAfter:      stamp(st.RetryAfter),
			Source:          source,
			DedupeHits:      st.DedupeHits,
			ResourceKeys:    append([]string(nil), st.ResourceKeys...),
			LastPauseReason: st.LastPauseReason,
			LastPauseAt:     stamp(st.LastPauseAt),
			MemoryRefs:      append([]string(nil), prev.MemoryRefs...),
			AuditRefs:       append([]string(nil), prev.AuditRefs...),
			Attempts:        append([]TaskAttempt(nil), prev.Attempts...),
			UpdatedAt:       nowRFC3339(),
		})
	}

	// Carry over finished/parked items the engine no longer tracks.
	for _, old := range existing {
		if old.ID == "" {
			continue
		}
		if _, dup := written[old.ID]; dup {
			continue
		}
		switch strings.ToLower(strings.TrimSpace(old.Status)) {
		case "done", "canceled", "paused":
			items = append(items, old)
		}
	}
	_ = e.taskStore.Save(items)
}
// maybeWriteDailyReportLocked writes, at most once per UTC date, an "Autonomy
// Daily Report" section into <workspace>/memory/<date>.md. The report
// summarizes the rows of memory/task-audit.jsonl whose source is "autonomy"
// and whose time starts with that date: status counts, the three longest
// tasks and the three most frequent error reasons.
//
// Nothing is written (and the date is not marked as handled) when no
// workspace is set or the audit file cannot be opened. When the audit file has
// no matching rows the date is marked as handled without writing.
func (e *Engine) maybeWriteDailyReportLocked(now time.Time) {
	date := now.UTC().Format("2006-01-02")
	if e.lastDailyReportDate == date {
		return
	}
	workspace := e.opts.Workspace
	if workspace == "" {
		return
	}
	auditPath := filepath.Join(workspace, "memory", "task-audit.jsonl")
	f, err := os.Open(auditPath)
	if err != nil {
		return
	}
	defer f.Close()

	// text renders a decoded JSON field as a string. Missing and null fields
	// become "" — a bare fmt.Sprintf("%v", nil) would yield "<nil>", which
	// defeats every emptiness check below.
	text := func(v interface{}) string {
		if v == nil {
			return ""
		}
		return fmt.Sprintf("%v", v)
	}

	counts := map[string]int{"total": 0, "success": 0, "error": 0, "suppressed": 0, "running": 0}
	errorReasons := map[string]int{}
	type topTask struct {
		TaskID   string
		Duration int
		Status   string
	}
	top := make([]topTask, 0, 32)

	s := bufio.NewScanner(f)
	// Audit rows can carry long "log" fields; raise the 64 KiB default token
	// limit so one long row does not end the scan early.
	s.Buffer(make([]byte, 0, 64*1024), 4*1024*1024)
	for s.Scan() {
		var row map[string]interface{}
		if json.Unmarshal(s.Bytes(), &row) != nil {
			continue
		}
		if text(row["source"]) != "autonomy" {
			continue
		}
		ts := text(row["time"])
		if len(ts) < 10 || ts[:10] != date {
			continue
		}
		counts["total"]++
		st := text(row["status"])
		if _, ok := counts[st]; ok {
			counts[st]++
		}
		if st == "error" {
			errText := text(row["error"])
			if errText == "" {
				errText = text(row["log"])
			}
			errText = shortTask(strings.ReplaceAll(errText, "\n", " "))
			if errText != "" {
				errorReasons[errText]++
			}
		}
		dur := 0
		switch v := row["duration_ms"].(type) {
		case float64:
			dur = int(v)
		case int:
			dur = v
		case string:
			if n, err := strconv.Atoi(v); err == nil {
				dur = n
			}
		}
		top = append(top, topTask{TaskID: text(row["task_id"]), Duration: dur, Status: st})
	}
	if counts["total"] == 0 {
		e.lastDailyReportDate = date
		return
	}

	// Longest first; stable so equal durations keep file order.
	sort.SliceStable(top, func(i, j int) bool { return top[i].Duration > top[j].Duration })
	maxTop := 3
	if len(top) < maxTop {
		maxTop = len(top)
	}
	topLines := make([]string, 0, maxTop)
	for i := 0; i < maxTop; i++ {
		if top[i].TaskID == "" {
			continue
		}
		topLines = append(topLines, fmt.Sprintf("- %s (%dms, %s)", top[i].TaskID, top[i].Duration, top[i].Status))
	}

	type kv struct {
		K string
		V int
	}
	reasons := make([]kv, 0, len(errorReasons))
	for k, v := range errorReasons {
		reasons = append(reasons, kv{K: k, V: v})
	}
	// Most frequent first; ties broken by text so the output does not depend
	// on map iteration order.
	sort.Slice(reasons, func(i, j int) bool {
		if reasons[i].V != reasons[j].V {
			return reasons[i].V > reasons[j].V
		}
		return reasons[i].K < reasons[j].K
	})
	maxR := 3
	if len(reasons) < maxR {
		maxR = len(reasons)
	}
	reasonLines := make([]string, 0, maxR)
	for i := 0; i < maxR; i++ {
		reasonLines = append(reasonLines, fmt.Sprintf("- %s (x%d)", reasons[i].K, reasons[i].V))
	}

	reportLine := fmt.Sprintf("\n## Autonomy Daily Report (%s)\n- total: %d\n- success: %d\n- error: %d\n- suppressed: %d\n- running: %d\n\n### Top Duration Tasks\n%s\n\n### Top Error Reasons\n%s\n", date, counts["total"], counts["success"], counts["error"], counts["suppressed"], counts["running"], strings.Join(topLines, "\n"), strings.Join(reasonLines, "\n"))
	dailyPath := filepath.Join(workspace, "memory", date+".md")
	_ = os.MkdirAll(filepath.Dir(dailyPath), 0755)
	_ = appendUniqueReport(dailyPath, reportLine, date)
	e.lastDailyReportDate = date
}
// appendUniqueReport appends content to the file at path unless the file
// already contains the heading "Autonomy Daily Report (<date>)". The file is
// created if missing. It returns the open or write error, or nil when the
// report was already present.
func appendUniqueReport(path, content, date string) error {
	current, _ := os.ReadFile(path) // a missing file reads as empty
	if strings.Contains(string(current), "Autonomy Daily Report ("+date+")") {
		return nil
	}
	out, err := os.OpenFile(path, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0644)
	if err != nil {
		return err
	}
	defer out.Close()
	if _, err = out.WriteString(content); err != nil {
		return err
	}
	return nil
}
// maybeCleanupTaskHistoryLocked prunes old history, at most once per hour:
//
//   - memory/task-audit.jsonl: drops blank lines, lines that are not valid
//     JSON, and rows whose RFC3339 "time" is not after the retention cutoff;
//   - memory/tasks.json: drops items in a terminal status (done, completed,
//     suppressed, error) whose RFC3339 UpdatedAt is not after the cutoff.
//
// Rows and items with a missing or unparseable timestamp are kept. Nothing
// happens when retention is disabled or no workspace is set.
func (e *Engine) maybeCleanupTaskHistoryLocked(now time.Time) {
	retention := e.opts.TaskHistoryRetentionDays
	if retention <= 0 {
		return
	}
	if last := e.lastHistoryCleanupAt; !last.IsZero() && now.Sub(last) < time.Hour {
		return
	}
	root := e.opts.Workspace
	if root == "" {
		return
	}
	cutoff := now.AddDate(0, 0, -retention)
	// expired reports whether ts parses as RFC3339 and is not after cutoff.
	expired := func(ts string) bool {
		tm, err := time.Parse(time.RFC3339, ts)
		return err == nil && !tm.After(cutoff)
	}

	// Cleanup task-audit.jsonl by event time
	auditFile := filepath.Join(root, "memory", "task-audit.jsonl")
	if raw, err := os.ReadFile(auditFile); err == nil {
		var survivors []string
		for _, ln := range strings.Split(string(raw), "\n") {
			if ln == "" {
				continue
			}
			var row map[string]interface{}
			if json.Unmarshal([]byte(ln), &row) != nil {
				continue
			}
			if expired(fmt.Sprintf("%v", row["time"])) {
				continue
			}
			survivors = append(survivors, ln)
		}
		_ = os.WriteFile(auditFile, []byte(strings.Join(survivors, "\n")+"\n"), 0644)
	}

	// Cleanup tasks.json old terminal states
	tasksFile := filepath.Join(root, "memory", "tasks.json")
	if raw, err := os.ReadFile(tasksFile); err == nil {
		var items []TaskItem
		if json.Unmarshal(raw, &items) == nil {
			survivors := make([]TaskItem, 0, len(items))
			for _, it := range items {
				terminal := false
				switch strings.ToLower(it.Status) {
				case "done", "completed", "suppressed", "error":
					terminal = true
				}
				stamp := strings.TrimSpace(it.UpdatedAt)
				if terminal && stamp != "" && expired(stamp) {
					continue
				}
				survivors = append(survivors, it)
			}
			if encoded, err := json.MarshalIndent(survivors, "", " "); err == nil {
				_ = os.WriteFile(tasksFile, encoded, 0644)
			}
		}
	}
	e.lastHistoryCleanupAt = now
}
// detectRunOutcome scans memory/task-audit.jsonl for rows of session
// "autonomy:<taskID>" and returns the status of the latest one that is not
// "running", has a status, and (when since is non-zero) is not older than
// since. The boolean is false when no such row exists, no workspace or task ID
// is given, or the audit file cannot be opened.
func (e *Engine) detectRunOutcome(taskID string, since time.Time) (string, bool) {
	if e.opts.Workspace == "" || taskID == "" {
		return "", false
	}
	path := filepath.Join(e.opts.Workspace, "memory", "task-audit.jsonl")
	f, err := os.Open(path)
	if err != nil {
		return "", false
	}
	defer f.Close()

	// text renders a decoded JSON field as a string. Missing and null fields
	// become "" — fmt.Sprintf("%v", nil) would yield "<nil>", which would slip
	// past the empty-status check and be reported as an outcome.
	text := func(v interface{}) string {
		if v == nil {
			return ""
		}
		return fmt.Sprintf("%v", v)
	}

	sessionKey := "autonomy:" + taskID
	latest := ""
	latestAt := time.Time{}
	s := bufio.NewScanner(f)
	// Raise the 64 KiB default token limit so one long row does not end the
	// scan before later rows are seen.
	s.Buffer(make([]byte, 0, 64*1024), 4*1024*1024)
	for s.Scan() {
		var row map[string]interface{}
		if json.Unmarshal(s.Bytes(), &row) != nil {
			continue
		}
		if text(row["session"]) != sessionKey {
			continue
		}
		st := text(row["status"])
		if st == "" || st == "running" {
			continue
		}
		tm, err := time.Parse(time.RFC3339, text(row["time"]))
		if err != nil {
			continue
		}
		if !since.IsZero() && tm.Before(since) {
			continue
		}
		if latestAt.IsZero() || tm.After(latestAt) {
			latestAt = tm
			latest = st
		}
	}
	if latest == "" {
		return "", false
	}
	return latest, true
}
// latestErrorSignature returns the normalized signature of the newest error
// logged for the autonomy session of taskID in memory/task-audit.jsonl.
//
// Only rows whose "session" equals "autonomy:<taskID>" and whose "status" is
// "error" are considered; rows with a non-RFC3339 "time", or older than since
// (when since is non-zero), are skipped. The "log" text of the newest such row
// is passed to ekg.NormalizeErrorSignature. When no row qualifies the empty
// string is normalized instead.
//
// It returns "" when the workspace or task ID is empty or the log cannot be
// opened.
func (e *Engine) latestErrorSignature(taskID string, since time.Time) string {
	if e.opts.Workspace == "" || taskID == "" {
		return ""
	}
	path := filepath.Join(e.opts.Workspace, "memory", "task-audit.jsonl")
	f, err := os.Open(path)
	if err != nil {
		return ""
	}
	defer f.Close()

	// field renders a decoded JSON value as text. A missing or null key yields
	// "" instead of the literal "<nil>" that the %v verb prints for nil, so an
	// error row without a "log" does not produce a signature for "<nil>".
	field := func(row map[string]interface{}, key string) string {
		switch v := row[key].(type) {
		case nil:
			return ""
		case string:
			return v
		default:
			return fmt.Sprintf("%v", v)
		}
	}

	sessionKey := "autonomy:" + taskID
	var latestAt time.Time
	latestErr := ""

	s := bufio.NewScanner(f)
	// The default Scanner token limit is 64 KiB; a single longer audit row
	// would stop the scan and hide every row after it. Allow much longer rows.
	s.Buffer(make([]byte, 0, 64*1024), 8*1024*1024)
	for s.Scan() {
		var row map[string]interface{}
		if json.Unmarshal(s.Bytes(), &row) != nil {
			continue
		}
		if field(row, "session") != sessionKey {
			continue
		}
		if field(row, "status") != "error" {
			continue
		}
		tm, err := time.Parse(time.RFC3339, field(row, "time"))
		if err != nil {
			continue
		}
		if !since.IsZero() && tm.Before(since) {
			continue
		}
		if latestAt.IsZero() || tm.After(latestAt) {
			latestAt = tm
			latestErr = field(row, "log")
		}
	}
	return ekg.NormalizeErrorSignature(latestErr)
}
// parseTodoAttributes splits a raw TODO line into its priority, due date and
// remaining task text.
//
// A leading marker (matched case-insensitively) selects the priority and is
// removed from the text:
//
//	"[high]"   or "p1:" -> "high"
//	"[medium]" or "p2:" -> "normal"
//	"[low]"    or "p3:" -> "low"
//
// Without a marker the priority is "normal" and the text is left untouched.
// If the remaining text contains " due:" (case-insensitive) anywhere after
// its first byte, everything after the tag becomes dueAt and everything
// before it becomes the task text; both are whitespace-trimmed. If stripping
// leaves no text at all, normalized falls back to the original content.
func parseTodoAttributes(content string) (priority, dueAt, normalized string) {
	priority = "normal"
	normalized = content

	// Each marker is stripped by its own length. Using one fixed length for
	// both the bracket and the "pN:" spelling cuts into the task text and
	// panics on inputs shorter than the bracket form.
	markers := []struct {
		prefix   string
		priority string
	}{
		{"[high]", "high"},
		{"p1:", "high"},
		{"[medium]", "normal"},
		{"p2:", "normal"},
		{"[low]", "low"},
		{"p3:", "low"},
	}
	for _, m := range markers {
		n := len(m.prefix)
		if len(normalized) >= n && strings.EqualFold(normalized[:n], m.prefix) {
			priority = m.priority
			normalized = strings.TrimSpace(normalized[n:])
			break
		}
	}

	// Search the original text directly instead of a lowercased copy:
	// lowercasing can change byte lengths for some non-ASCII characters, which
	// would make an index found in the copy point at the wrong byte here.
	const dueTag = " due:"
	idx := -1
	for i := 0; i+len(dueTag) <= len(normalized); i++ {
		if strings.EqualFold(normalized[i:i+len(dueTag)], dueTag) {
			idx = i
			break
		}
	}
	if idx > 0 {
		dueAt = strings.TrimSpace(normalized[idx+len(dueTag):])
		normalized = strings.TrimSpace(normalized[:idx])
	}

	if normalized == "" {
		normalized = content
	}
	return priority, dueAt, normalized
}
// priorityWeight maps a priority label to a sort weight. The label is trimmed
// and compared case-insensitively: "high" weighs 3, "low" weighs 1, and every
// other value (including "normal", "medium" and unknown labels) weighs 2.
func priorityWeight(p string) int {
	label := strings.ToLower(strings.TrimSpace(p))
	if label == "high" {
		return 3
	}
	if label == "low" {
		return 1
	}
	// "normal", "medium" and anything unrecognized share the middle weight.
	return 2
}
// dueWeight converts a due-date string into a sort score. The trimmed input is
// tried against the layouts "2006-01-02", RFC3339 and RFC3339Nano in that
// order; the first layout that parses yields the negated Unix timestamp, so an
// earlier due date produces a larger score. Empty or unparseable input
// scores 0.
func dueWeight(dueAt string) int64 {
	raw := strings.TrimSpace(dueAt)
	if raw == "" {
		return 0
	}
	for _, layout := range [...]string{"2006-01-02", time.RFC3339, time.RFC3339Nano} {
		parsed, err := time.Parse(layout, raw)
		if err != nil {
			continue
		}
		// Negate so that earlier deadlines rank first in a descending sort.
		return -parsed.Unix()
	}
	return 0
}
// pauseFilePath returns the path of the "autonomy.pause" marker file under
// <workspace>/memory, or "" when the configured workspace is blank.
func (e *Engine) pauseFilePath() string {
	ws := e.opts.Workspace
	if strings.TrimSpace(ws) != "" {
		return filepath.Join(ws, "memory", "autonomy.pause")
	}
	return ""
}
// controlFilePath returns the path of the "autonomy.control.json" file under
// <workspace>/memory, or "" when the configured workspace is blank.
func (e *Engine) controlFilePath() string {
	ws := e.opts.Workspace
	if strings.TrimSpace(ws) != "" {
		return filepath.Join(ws, "memory", "autonomy.control.json")
	}
	return ""
}
// hasManualPause reports whether autonomy has been paused through files in the
// workspace.
//
// It returns true when the pause marker file can be stat'ed. Otherwise it
// reads the control file and returns the negation of its JSON "enabled" field.
// It returns false when no workspace path is available, or when the control
// file cannot be read or decoded.
func (e *Engine) hasManualPause() bool {
	pausePath := e.pauseFilePath()
	if pausePath == "" {
		return false
	}
	if _, statErr := os.Stat(pausePath); statErr == nil {
		// The marker file exists: paused regardless of the control file.
		return true
	}

	ctrlPath := e.controlFilePath()
	if ctrlPath == "" {
		return false
	}
	raw, readErr := os.ReadFile(ctrlPath)
	if readErr != nil {
		return false
	}
	control := struct {
		Enabled bool `json:"enabled"`
	}{}
	if json.Unmarshal(raw, &control) != nil {
		return false
	}
	return !control.Enabled
}
// hasRecentUserActivity reports whether any session of kind "main" contains a
// user message newer than now minus UserIdleResumeSec seconds.
//
// The session index is read from <workspace parent>/agents/main/sessions/
// sessions.json, falling back to <workspace parent>/sessions/sessions.json.
// The result is false when UserIdleResumeSec is not positive, the workspace is
// blank, or no index can be read or decoded.
func (e *Engine) hasRecentUserActivity(now time.Time) bool {
	idleSec := e.opts.UserIdleResumeSec
	if idleSec <= 0 {
		return false
	}
	if strings.TrimSpace(e.opts.Workspace) == "" {
		return false
	}

	root := filepath.Dir(e.opts.Workspace)
	raw, err := os.ReadFile(filepath.Join(root, "agents", "main", "sessions", "sessions.json"))
	if err != nil {
		// Fall back to the legacy index location.
		raw, err = os.ReadFile(filepath.Join(root, "sessions", "sessions.json"))
	}
	if err != nil {
		return false
	}

	type sessionEntry struct {
		Kind        string `json:"kind"`
		SessionFile string `json:"sessionFile"`
	}
	var sessions map[string]sessionEntry
	if json.Unmarshal(raw, &sessions) != nil {
		return false
	}

	threshold := now.Add(-time.Duration(idleSec) * time.Second)
	for _, entry := range sessions {
		kind := strings.ToLower(strings.TrimSpace(entry.Kind))
		if kind != "main" || strings.TrimSpace(entry.SessionFile) == "" {
			continue
		}
		last := latestUserMessageTime(entry.SessionFile)
		if last.IsZero() {
			continue
		}
		if last.After(threshold) {
			return true
		}
	}
	return false
}
// allowTaskByPolicy reports whether content passes the keyword allow-list.
// With no configured keywords every task is allowed; otherwise the content
// must contain at least one non-empty keyword, compared case-insensitively.
func (e *Engine) allowTaskByPolicy(content string) bool {
	keywords := e.opts.AllowedTaskKeywords
	if len(keywords) == 0 {
		return true
	}
	haystack := strings.ToLower(content)
	for i := range keywords {
		needle := keywords[i]
		if needle != "" && strings.Contains(haystack, strings.ToLower(needle)) {
			return true
		}
	}
	return false
}
// latestUserMessageTime returns the time of the most recent user message in
// the JSON-lines session file at path, or the zero time when the file cannot
// be opened or holds no user message.
//
// Two line shapes are understood:
//
//   - Event lines with a "message" object: when message.role is "user"
//     (trimmed, case-insensitive) the line's "timestamp" is parsed as
//     RFC3339/RFC3339Nano. Event lines never fall through to the legacy
//     handling, even if their timestamp is unusable.
//   - Legacy lines with a top-level "role": these carry no timestamp, so a
//     user line is dated with the file's modification time.
func latestUserMessageTime(path string) time.Time {
	f, err := os.Open(path)
	if err != nil {
		return time.Time{}
	}
	defer f.Close()

	// Legacy lines have no timestamp of their own. Dating them "now" would make
	// any old legacy user message look like current activity on every call, so
	// use the file's modification time as the best available bound. Fall back
	// to the current time only if the file cannot be stat'ed.
	legacyAt := time.Now().UTC()
	if info, statErr := f.Stat(); statErr == nil {
		legacyAt = info.ModTime().UTC()
	}

	var latest time.Time
	s := bufio.NewScanner(f)
	// The default Scanner token limit is 64 KiB; one longer session line would
	// stop the scan and hide every later message. Allow much longer lines.
	s.Buffer(make([]byte, 0, 64*1024), 16*1024*1024)
	for s.Scan() {
		line := s.Bytes()

		// OpenClaw-like event line
		var ev struct {
			Type      string `json:"type"`
			Timestamp string `json:"timestamp"`
			Message   *struct {
				Role string `json:"role"`
			} `json:"message"`
		}
		if err := json.Unmarshal(line, &ev); err == nil && ev.Message != nil {
			if strings.ToLower(strings.TrimSpace(ev.Message.Role)) == "user" {
				stamp := strings.TrimSpace(ev.Timestamp)
				for _, layout := range [...]string{time.RFC3339Nano, time.RFC3339} {
					t, perr := time.Parse(layout, stamp)
					if perr != nil {
						continue
					}
					if t.After(latest) {
						latest = t
					}
					break
				}
			}
			continue
		}

		// Legacy line
		var msg struct {
			Role string `json:"role"`
		}
		if err := json.Unmarshal(line, &msg); err == nil {
			if strings.ToLower(strings.TrimSpace(msg.Role)) == "user" && legacyAt.After(latest) {
				latest = legacyAt
			}
		}
	}
	return latest
}
// blockedRetryBackoff returns how long to wait before retrying a blocked task.
// The wait is the base interval (minRunIntervalSec seconds, or 20 seconds when
// that is not positive) multiplied by 2^stalls, with stalls clamped to the
// range 1..5 — i.e. between 2x and 32x the base interval.
func blockedRetryBackoff(stalls int, minRunIntervalSec int) time.Duration {
	intervalSec := minRunIntervalSec
	if intervalSec <= 0 {
		intervalSec = 20
	}

	// Clamp the exponent so the multiplier stays within 2..32.
	exponent := stalls
	switch {
	case exponent < 1:
		exponent = 1
	case exponent > 5:
		exponent = 5
	}

	multiplier := time.Duration(1) << exponent
	return multiplier * (time.Duration(intervalSec) * time.Second)
}
// _min returns the smaller of a and b.
func _min(a, b int) int {
	if b <= a {
		return b
	}
	return a
}
// shortTask returns a compact form of s for display: the whitespace-trimmed
// text when it is at most 32 bytes long, otherwise a prefix of at most 32
// bytes followed by "...". The prefix always ends on a rune boundary, so
// multi-byte UTF-8 characters are never cut in half.
func shortTask(s string) string {
	const maxBytes = 32
	s = strings.TrimSpace(s)
	if len(s) <= maxBytes {
		return s
	}
	// Ranging over a string yields the byte offset at which each rune starts;
	// keep the last such offset that still fits in the byte budget.
	cut := 0
	for i := range s {
		if i > maxBytes {
			break
		}
		cut = i
	}
	return s[:cut] + "..."
}
// hashID derives a short identifier from s: the input is whitespace-trimmed
// and lowercased, hashed with SHA-1, and the first 12 hex characters of the
// digest are returned.
func hashID(s string) string {
	canonical := strings.ToLower(strings.TrimSpace(s))
	digest := sha1.Sum([]byte(canonical))
	// Six digest bytes encode to exactly twelve hex characters.
	return hex.EncodeToString(digest[:6])
}
// RunOnce executes a single engine tick. It does nothing when engine is nil or
// when ctx has already been cancelled or has expired.
func RunOnce(ctx context.Context, engine *Engine) {
	if engine == nil {
		return
	}
	if ctx.Err() != nil {
		// Context already done: skip the tick.
		return
	}
	engine.tick()
}