Files
clawgo/pkg/autonomy/engine.go

1247 lines
32 KiB
Go

package autonomy
import (
"bufio"
"context"
"crypto/sha1"
"encoding/hex"
"encoding/json"
"fmt"
"os"
"path/filepath"
"sort"
"strconv"
"strings"
"sync"
"time"
"clawgo/pkg/bus"
"clawgo/pkg/lifecycle"
)
// Options configures an Engine. NewEngine replaces non-positive values of
// most numeric fields with built-in defaults (noted per field below).
type Options struct {
	// Enabled gates Start: when false, Start returns without launching the loop.
	Enabled bool

	// TickIntervalSec is the period of the scheduling loop (default 30).
	TickIntervalSec int
	// MinRunIntervalSec is the minimum gap between two dispatches of the same
	// task and the base unit of the blocked-task backoff (default 20).
	MinRunIntervalSec int
	// MaxPendingDurationSec is how long a running task may go without a new
	// dispatch before tick counts a stall (default 180).
	MaxPendingDurationSec int
	// MaxConsecutiveStalls is the stall count above which a task becomes
	// blocked (default 3).
	MaxConsecutiveStalls int
	// MaxDispatchPerTick caps the number of tasks dispatched per tick (default 2).
	MaxDispatchPerTick int

	// Workspace is the directory holding MEMORY.md and the memory/ folder.
	// Most file-based features are skipped when it is blank.
	Workspace string

	// DefaultNotifyChannel and DefaultNotifyChatID address outbound
	// notifications; shouldNotify refuses to notify when either is blank.
	DefaultNotifyChannel string
	DefaultNotifyChatID  string
	// NotifyAllowFrom, when non-empty, must contain DefaultNotifyChatID for
	// notifications to be sent.
	NotifyAllowFrom []string
	// NotifyCooldownSec is the minimum gap between notifications sharing a
	// key (default 300).
	NotifyCooldownSec int
	// NotifySameReasonCooldownSec is the minimum gap between notifications
	// sharing a key and a reason (default 900).
	NotifySameReasonCooldownSec int
	// QuietHours is an "HH:MM-HH:MM" window in which notifications are
	// suppressed; blank or malformed values disable the window.
	QuietHours string

	// UserIdleResumeSec is how recent a user message must be for tick to
	// pause running tasks (default 20).
	UserIdleResumeSec int
	// WaitingResumeDebounceSec is the minimum time a task stays in "waiting"
	// before tick may resume it (default 5).
	WaitingResumeDebounceSec int
	// MaxRoundsWithoutUser limits dispatches between counter resets; 0
	// disables the limit and negative values are normalized to 0.
	MaxRoundsWithoutUser int
	// TaskHistoryRetentionDays is the age after which audit rows and
	// terminal tasks are pruned (default 3).
	TaskHistoryRetentionDays int

	// AllowedTaskKeywords, when non-empty, restricts dispatch to tasks whose
	// content contains at least one keyword (case-insensitive).
	AllowedTaskKeywords []string
	// ImportantKeywords marks completions worth notifying about; a built-in
	// list is used when empty.
	ImportantKeywords []string

	// CompletionTemplate and BlockedTemplate are fmt templates for the
	// notification texts; blank values select the built-in messages.
	CompletionTemplate string
	BlockedTemplate    string
}
// taskState is the engine's in-memory record for one task, keyed by ID in
// Engine.state.
type taskState struct {
	// Identity and attributes copied from the todo source on every tick.
	ID, Content, Priority, DueAt string

	// Status is one of: idle|running|waiting|blocked|completed.
	Status string
	// BlockReason names why the task is waiting or blocked
	// (e.g. "manual_pause", "active_user", "resource_lock").
	BlockReason string

	// WaitingSince is when the task last entered "waiting".
	WaitingSince time.Time
	// LastRunAt and LastAutonomyAt are both set to the tick time on dispatch.
	LastRunAt, LastAutonomyAt time.Time
	// RetryAfter is the earliest time a blocked or lock-waiting task is
	// reconsidered.
	RetryAfter time.Time

	// ConsecutiveStall counts stall detections for a running task.
	ConsecutiveStall int
	// DedupeHits is the number of duplicate todo entries merged into this task.
	DedupeHits int
	// ResourceKeys are the lock keys the task must own before dispatch.
	ResourceKeys []string
	// WaitAttempts counts failed lock acquisitions since the last dispatch.
	WaitAttempts int

	// LastPauseReason and LastPauseAt record the most recent pause.
	LastPauseReason string
	LastPauseAt     time.Time
}
// Engine periodically scans todo sources, tracks per-task state and
// dispatches tasks onto the message bus.
type Engine struct {
	// opts holds the options after NewEngine applied its defaults.
	opts Options
	// bus receives dispatched tasks (inbound) and notifications (outbound).
	bus *bus.MessageBus
	// runner drives runLoop; it is started by Start and stopped by Stop.
	runner *lifecycle.LoopRunner
	// taskStore loads and saves the structured task list for opts.Workspace.
	taskStore *TaskStore
	// mu is held by tick while it reads or mutates the fields below.
	mu sync.Mutex
	// state maps task ID to its in-memory state.
	state map[string]*taskState
	// lastNotify maps a notification key to the time it was last sent.
	lastNotify map[string]time.Time
	// lockOwners maps a resource key to the ID of the task holding it.
	lockOwners map[string]string
	// roundsWithoutUser counts dispatches since the counter was last reset;
	// tick compares it against opts.MaxRoundsWithoutUser.
	roundsWithoutUser int
	// lastDailyReportDate is the UTC date (YYYY-MM-DD) of the last daily
	// report attempt.
	lastDailyReportDate string
	// lastHistoryCleanupAt is when task history was last pruned.
	lastHistoryCleanupAt time.Time
}
// NewEngine builds an Engine from opts, substituting defaults for numeric
// options that are not positive. The engine is idle until Start is called.
func NewEngine(opts Options, msgBus *bus.MessageBus) *Engine {
	// orDefault overwrites *field with def when the configured value is <= 0.
	orDefault := func(field *int, def int) {
		if *field <= 0 {
			*field = def
		}
	}
	orDefault(&opts.TickIntervalSec, 30)
	orDefault(&opts.MinRunIntervalSec, 20)
	orDefault(&opts.MaxPendingDurationSec, 180)
	orDefault(&opts.MaxConsecutiveStalls, 3)
	orDefault(&opts.MaxDispatchPerTick, 2)
	orDefault(&opts.NotifyCooldownSec, 300)
	orDefault(&opts.UserIdleResumeSec, 20)
	orDefault(&opts.NotifySameReasonCooldownSec, 900)
	orDefault(&opts.WaitingResumeDebounceSec, 5)
	orDefault(&opts.TaskHistoryRetentionDays, 3)

	// Zero is a valid value here (limit disabled); only negatives are fixed up.
	if opts.MaxRoundsWithoutUser < 0 {
		opts.MaxRoundsWithoutUser = 0
	}

	engine := &Engine{
		opts:       opts,
		bus:        msgBus,
		runner:     lifecycle.NewLoopRunner(),
		taskStore:  NewTaskStore(opts.Workspace),
		state:      map[string]*taskState{},
		lastNotify: map[string]time.Time{},
		lockOwners: map[string]string{},
	}
	return engine
}
// Start launches the scheduling loop on the lifecycle runner. It does
// nothing when the engine is disabled in its options.
func (e *Engine) Start() {
	if e.opts.Enabled {
		e.runner.Start(e.runLoop)
	}
}
// Stop asks the lifecycle runner to stop.
// NOTE(review): presumably this signals runLoop through its stop channel —
// confirm against lifecycle.LoopRunner.
func (e *Engine) Stop() {
	e.runner.Stop()
}
// runLoop calls tick once per TickIntervalSec until stopCh is signalled.
// The first tick happens one full interval after the loop starts.
func (e *Engine) runLoop(stopCh <-chan struct{}) {
	interval := time.Duration(e.opts.TickIntervalSec) * time.Second
	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			e.tick()
		case <-stopCh:
			return
		}
	}
}
// tick runs one scheduling round:
//
//  1. Collect todos (markdown + tasks.json) and the stored task list.
//  2. If a manual pause or recent user activity is detected, park every
//     running task in "waiting", persist, and stop.
//  3. Reconcile the in-memory state with the todo set (create, refresh,
//     mark completed).
//  4. Walk tasks in descending scheduling score and dispatch up to
//     MaxDispatchPerTick of them, subject to debounce, backoff, policy,
//     the idle-round budget and resource locks.
//  5. Persist state and run the daily-report / history-cleanup housekeeping.
//
// The engine mutex is held from step 2 to the end of the round.
func (e *Engine) tick() {
	todos := e.scanTodos()
	now := time.Now()
	// The load error is deliberately ignored so a broken tasks.json does not
	// stop scheduling. NOTE(review): confirm TaskStore.Load returns an empty
	// list on failure.
	stored, _ := e.taskStore.Load()

	e.mu.Lock()
	defer e.mu.Unlock()

	if e.hasManualPause() {
		e.pauseRunningLocked("manual_pause", "paused by manual switch", now)
		e.roundsWithoutUser = 0
		e.persistStateLocked()
		return
	}
	if e.hasRecentUserActivity(now) {
		e.pauseRunningLocked("active_user", "paused due to active user conversation", now)
		// The user is interacting again, so the "rounds without user" budget
		// starts over. Without this reset the budget, once exhausted, could
		// only ever be cleared by a manual pause.
		e.roundsWithoutUser = 0
		e.persistStateLocked()
		return
	}

	storedMap := make(map[string]TaskItem, len(stored))
	for _, it := range stored {
		storedMap[it.ID] = it
	}
	known := make(map[string]struct{}, len(todos))
	for _, t := range todos {
		known[t.ID] = struct{}{}
	}

	// Merge structured tasks.json entries as todo source too (for WebUI
	// CRUD-created tasks), skipping entries that are already finished.
	for _, old := range stored {
		if old.ID == "" {
			continue
		}
		if _, ok := known[old.ID]; ok {
			continue
		}
		st := strings.ToLower(old.Status)
		if st == "done" || st == "completed" {
			continue
		}
		todos = append(todos, todoItem{ID: old.ID, Content: old.Content, Priority: old.Priority, DueAt: old.DueAt})
		known[old.ID] = struct{}{}
	}

	// Create state for new todos (seeded from the stored entry when one
	// exists) and refresh the attributes of todos already tracked.
	for _, t := range todos {
		st, ok := e.state[t.ID]
		if !ok {
			status := "idle"
			retryAfter := time.Time{}
			resourceKeys := deriveResourceKeys(t.Content)
			lastPauseReason := ""
			lastPauseAt := time.Time{}
			if old, ok := storedMap[t.ID]; ok {
				// Only "blocked" survives a restart; every other stored
				// status starts over as idle.
				if old.Status == "blocked" {
					status = "blocked"
				}
				if strings.TrimSpace(old.RetryAfter) != "" {
					if rt, err := time.Parse(time.RFC3339, old.RetryAfter); err == nil {
						retryAfter = rt
					}
				}
				if len(old.ResourceKeys) > 0 {
					resourceKeys = append([]string(nil), old.ResourceKeys...)
				}
				lastPauseReason = old.LastPauseReason
				if old.LastPauseAt != "" {
					if pt, err := time.Parse(time.RFC3339, old.LastPauseAt); err == nil {
						lastPauseAt = pt
					}
				}
			}
			e.state[t.ID] = &taskState{ID: t.ID, Content: t.Content, Priority: t.Priority, DueAt: t.DueAt, Status: status, RetryAfter: retryAfter, DedupeHits: t.DedupeHits, ResourceKeys: resourceKeys, LastPauseReason: lastPauseReason, LastPauseAt: lastPauseAt}
			continue
		}
		st.Content = t.Content
		st.Priority = t.Priority
		st.DueAt = t.DueAt
		st.DedupeHits = t.DedupeHits
		st.ResourceKeys = deriveResourceKeys(t.Content)
		// A todo that reappears after completion is scheduled again.
		if st.Status == "completed" {
			st.Status = "idle"
		}
	}

	// A tracked task is completed once it disappears from the todo sources.
	for id, st := range e.state {
		if _, ok := known[id]; !ok {
			if st.Status != "completed" {
				e.releaseLocksLocked(st.ID)
				st.Status = "completed"
				e.sendCompletionNotification(st)
			}
		}
	}

	// Highest score first; the ID makes the order deterministic on ties.
	ordered := make([]*taskState, 0, len(e.state))
	for _, st := range e.state {
		ordered = append(ordered, st)
	}
	sort.Slice(ordered, func(i, j int) bool {
		si := schedulingScore(ordered[i], now)
		sj := schedulingScore(ordered[j], now)
		if si != sj {
			return si > sj
		}
		return ordered[i].ID < ordered[j].ID
	})

	dispatched := 0
	for _, st := range ordered {
		if dispatched >= e.opts.MaxDispatchPerTick {
			break
		}
		if st.Status == "completed" {
			continue
		}
		if st.Status == "waiting" {
			if st.BlockReason == "resource_lock" && !st.RetryAfter.IsZero() && now.Before(st.RetryAfter) {
				continue
			}
			// Debounce waiting/resume flapping.
			if !st.WaitingSince.IsZero() && now.Sub(st.WaitingSince) < time.Duration(e.opts.WaitingResumeDebounceSec)*time.Second {
				continue
			}
			reason := st.BlockReason
			st.Status = "idle"
			st.BlockReason = ""
			st.WaitingSince = time.Time{}
			pausedFor := 0
			if !st.LastPauseAt.IsZero() {
				pausedFor = int(now.Sub(st.LastPauseAt).Seconds())
			}
			e.writeReflectLog("resume", st, fmt.Sprintf("autonomy resumed from waiting (reason=%s paused_for=%ds)", reason, pausedFor))
			e.writeTriggerAudit("resume", st, reason)
		}
		if st.Status == "blocked" {
			e.releaseLocksLocked(st.ID)
			if !st.RetryAfter.IsZero() && now.Before(st.RetryAfter) {
				continue
			}
			if now.Sub(st.LastRunAt) >= blockedRetryBackoff(st.ConsecutiveStall, e.opts.MinRunIntervalSec) {
				st.Status = "idle"
				st.BlockReason = ""
			} else {
				continue
			}
		}
		if !st.LastRunAt.IsZero() && now.Sub(st.LastRunAt) < time.Duration(e.opts.MinRunIntervalSec)*time.Second {
			continue
		}
		if !e.allowTaskByPolicy(st.Content) {
			continue
		}
		// A running task that has not been re-dispatched within the pending
		// window counts as a stall; too many stalls block the task.
		if st.Status == "running" && now.Sub(st.LastRunAt) > time.Duration(e.opts.MaxPendingDurationSec)*time.Second {
			st.ConsecutiveStall++
			if st.ConsecutiveStall > e.opts.MaxConsecutiveStalls {
				st.Status = "blocked"
				st.BlockReason = "max_consecutive_stalls"
				st.RetryAfter = now.Add(blockedRetryBackoff(st.ConsecutiveStall, e.opts.MinRunIntervalSec))
				e.sendFailureNotification(st, "max consecutive stalls reached")
				continue
			}
		}
		if e.opts.MaxRoundsWithoutUser > 0 && e.roundsWithoutUser >= e.opts.MaxRoundsWithoutUser {
			st.Status = "waiting"
			st.BlockReason = "idle_round_budget"
			st.WaitingSince = now
			e.writeReflectLog("waiting", st, fmt.Sprintf("paused by idle round budget (%d)", e.opts.MaxRoundsWithoutUser))
			e.writeTriggerAudit("waiting", st, "idle_round_budget")
			continue
		}
		if !e.tryAcquireLocksLocked(st) {
			st.Status = "waiting"
			st.BlockReason = "resource_lock"
			st.WaitingSince = now
			st.RetryAfter = now.Add(30 * time.Second)
			st.WaitAttempts++
			e.writeTriggerAudit("waiting", st, "resource_lock")
			continue
		}
		e.dispatchTask(st)
		st.Status = "running"
		st.WaitAttempts = 0
		st.BlockReason = ""
		st.WaitingSince = time.Time{}
		st.LastPauseReason = ""
		st.LastRunAt = now
		st.LastAutonomyAt = now
		e.writeReflectLog("dispatch", st, "task dispatched to agent loop")
		e.writeTriggerAudit("dispatch", st, "")
		e.roundsWithoutUser++
		dispatched++
	}

	e.persistStateLocked()
	e.maybeWriteDailyReportLocked(now)
	e.maybeCleanupTaskHistoryLocked(now)
}

// pauseRunningLocked moves every running task to "waiting" with the given
// reason, releases its resource locks and records the pause in the reflect
// log (using note as the outcome text) and the trigger audit. The caller
// must hold e.mu.
func (e *Engine) pauseRunningLocked(reason, note string, now time.Time) {
	for _, st := range e.state {
		if st.Status != "running" {
			continue
		}
		e.releaseLocksLocked(st.ID)
		st.Status = "waiting"
		st.BlockReason = reason
		st.WaitingSince = now
		st.LastPauseReason = reason
		st.LastPauseAt = now
		e.writeReflectLog("waiting", st, note)
		e.writeTriggerAudit("waiting", st, reason)
	}
}
// tryAcquireLocksLocked claims every resource key of st for st.ID and
// reports whether it succeeded. The claim is all-or-nothing: if any key is
// owned by a different task, no key is taken. A task without resource keys
// locks the synthetic key "task:<ID>". The caller must hold e.mu.
func (e *Engine) tryAcquireLocksLocked(st *taskState) bool {
	if st == nil {
		return false
	}
	wanted := st.ResourceKeys
	if len(wanted) == 0 {
		wanted = []string{"task:" + st.ID}
	}

	// First pass: look for a conflicting holder (a missing entry reads as "").
	for _, key := range wanted {
		if holder := e.lockOwners[key]; holder != "" && holder != st.ID {
			return false
		}
	}
	// Second pass: no conflicts, take ownership of everything.
	for _, key := range wanted {
		e.lockOwners[key] = st.ID
	}
	return true
}
// releaseLocksLocked drops every resource lock owned by taskID. A blank
// taskID is ignored. The caller must hold e.mu.
func (e *Engine) releaseLocksLocked(taskID string) {
	if strings.TrimSpace(taskID) == "" {
		return
	}
	for key, holder := range e.lockOwners {
		if holder != taskID {
			continue
		}
		delete(e.lockOwners, key)
	}
}
// schedulingScore ranks a task for dispatch; higher scores run first.
//
// The score is the sum of:
//   - priority: priorityWeight * 100;
//   - due-date urgency: 10..90 for tasks with a parsable due date (90 when
//     overdue or due within a day, one step of 10 less per additional day,
//     never below 10), 0 without a due date;
//   - anti-starvation boost for tasks waiting on a resource lock: one point
//     per 10 seconds waited plus 5 per failed acquisition attempt.
//
// The due-date term is bounded below one priority step. The previous
// implementation added the raw negated Unix timestamp times ten, which
// dwarfed the priority term and ranked every task with a due date below
// every task without one.
func schedulingScore(st *taskState, now time.Time) int {
	if st == nil {
		return 0
	}
	score := priorityWeight(st.Priority) * 100

	// dueWeight yields the negated Unix time of the due date, or 0 when the
	// date is absent or unparsable (a due date at exactly the epoch is
	// therefore treated as absent).
	if w := dueWeight(st.DueAt); w != 0 {
		due := time.Unix(-w, 0)
		daysLeft := int(due.Sub(now).Hours() / 24)
		urgency := 9 - daysLeft
		if urgency > 9 {
			urgency = 9
		}
		if urgency < 1 {
			urgency = 1
		}
		score += urgency * 10
	}

	if st.Status == "waiting" && st.BlockReason == "resource_lock" && !st.WaitingSince.IsZero() {
		if waitSec := int(now.Sub(st.WaitingSince).Seconds()); waitSec > 0 {
			score += waitSec / 10
		}
		score += st.WaitAttempts * 5
	}
	return score
}
// deriveResourceKeys returns the lock keys a task needs, based on its text.
//
// An explicit "[keys: a, b]" annotation wins when it yields at least one key.
// Otherwise each whitespace-separated, lowercased token (with surrounding
// punctuation trimmed) may contribute:
//   - "repo:<token>" when it mentions "github.com/" or "gitea.";
//   - "file:<token>" when it contains "/" or ends in .go/.md/.json/.yaml/.yml;
//   - "branch:<name>" when it is "main" or starts with "branch:".
//
// Tasks that name no repository get "repo:default", so the result is never
// empty for non-empty content. Empty content returns nil.
func deriveResourceKeys(content string) []string {
	if content == "" {
		return nil
	}
	if explicit := parseExplicitResourceKeys(content); len(explicit) > 0 {
		return explicit
	}

	fileSuffixes := []string{".go", ".md", ".json", ".yaml", ".yml"}
	looksLikeFile := func(tok string) bool {
		if strings.Contains(tok, "/") {
			return true
		}
		for _, suffix := range fileSuffixes {
			if strings.HasSuffix(tok, suffix) {
				return true
			}
		}
		return false
	}

	keys := make([]string, 0, 8)
	hasRepo := false
	for _, token := range strings.Fields(strings.ToLower(content)) {
		t := strings.Trim(token, "`'\"()[]{}:;,,。!?")
		if strings.Contains(t, "github.com/") || strings.Contains(t, "gitea.") {
			keys = append(keys, "repo:"+t)
			hasRepo = true
		}
		if looksLikeFile(t) {
			keys = append(keys, "file:"+t)
		}
		if t == "main" || strings.HasPrefix(t, "branch:") {
			keys = append(keys, "branch:"+strings.TrimPrefix(t, "branch:"))
		}
	}
	// Tasks without an explicit repository all contend for the default one.
	if !hasRepo {
		keys = append(keys, "repo:default")
	}
	return normalizeResourceKeys(keys)
}
// parseExplicitResourceKeys extracts keys from the first "[keys: a, b]"
// annotation in content (the marker is matched case-insensitively). Each
// comma-separated entry is trimmed and lowercased; entries without a
// "kind:" prefix are treated as files ("file:<entry>"). It returns nil when
// there is no annotation, no closing bracket, or no usable entry.
func parseExplicitResourceKeys(content string) []string {
	const marker = "[keys:"

	// All slicing is done on the lowercased copy. Lowercasing can change a
	// string's byte length, so an offset found in the lowercased text must
	// not be applied to the original; the keys are lowercased anyway.
	lower := strings.ToLower(content)
	start := strings.Index(lower, marker)
	if start < 0 {
		return nil
	}
	rest := lower[start+len(marker):]
	end := strings.Index(rest, "]")
	if end < 0 {
		return nil
	}
	body := rest[:end]
	if body == "" {
		return nil
	}

	parts := strings.Split(body, ",")
	keys := make([]string, 0, len(parts))
	for _, p := range parts {
		k := strings.TrimSpace(p)
		if k == "" {
			continue
		}
		if !strings.Contains(k, ":") {
			k = "file:" + k
		}
		keys = append(keys, k)
	}
	return normalizeResourceKeys(keys)
}
// normalizeResourceKeys returns the keys trimmed, lowercased, sorted and
// de-duplicated, dropping blank entries. It returns nil when nothing
// remains. The input slice is not modified.
func normalizeResourceKeys(keys []string) []string {
	// Normalize first: sorting the raw values would leave equal keys that
	// differ only in case or padding non-adjacent, defeating the
	// adjacent-duplicate removal below.
	cleaned := make([]string, 0, len(keys))
	for _, k := range keys {
		if k = strings.ToLower(strings.TrimSpace(k)); k != "" {
			cleaned = append(cleaned, k)
		}
	}
	if len(cleaned) == 0 {
		return nil
	}
	sort.Strings(cleaned)

	// Compact in place; the write position never overtakes the read position.
	uniq := cleaned[:1]
	for _, k := range cleaned[1:] {
		if k != uniq[len(uniq)-1] {
			uniq = append(uniq, k)
		}
	}
	return uniq[:len(uniq):len(uniq)]
}
// todoItem is one open todo discovered by scanTodos.
type todoItem struct {
	// ID identifies the todo; for markdown todos it is a hash of the content.
	ID string
	// Content is the todo text with priority and due annotations removed.
	Content string
	// Priority is "high", "normal" or "low" for markdown todos; structured
	// tasks carry whatever the store provides ("normal" when blank).
	Priority string
	// DueAt is the raw due-date text, empty when none was given.
	DueAt string
	// DedupeHits counts how many duplicate entries were merged into this one.
	DedupeHits int
}
// scanTodos collects the open todos for this workspace from two sources:
//
//  1. Markdown lines in MEMORY.md and today's memory/<YYYY-MM-DD>.md that
//     start with "- [ ]" or "todo:" (case-insensitive).
//  2. Entries in the task store whose status is not terminal.
//
// Items sharing an ID are merged: the higher priority wins, a missing due
// date is filled in, and DedupeHits is incremented. Unreadable files and
// store errors are skipped. The result order is unspecified. It returns nil
// when no workspace is configured.
func (e *Engine) scanTodos() []todoItem {
	if strings.TrimSpace(e.opts.Workspace) == "" {
		return nil
	}

	merged := map[string]todoItem{}
	merge := func(it todoItem) {
		if strings.TrimSpace(it.ID) == "" || strings.TrimSpace(it.Content) == "" {
			return
		}
		cur, seen := merged[it.ID]
		if !seen {
			merged[it.ID] = it
			return
		}
		if priorityWeight(it.Priority) > priorityWeight(cur.Priority) {
			cur.Priority = it.Priority
		}
		if cur.DueAt == "" && it.DueAt != "" {
			cur.DueAt = it.DueAt
		}
		cur.DedupeHits++
		merged[it.ID] = cur
	}
	// mergeMarkdown parses the text following a todo marker and merges it
	// under an ID derived from the normalized content.
	mergeMarkdown := func(raw string) {
		priority, dueAt, normalized := parseTodoAttributes(raw)
		merge(todoItem{ID: hashID(normalized), Content: normalized, Priority: priority, DueAt: dueAt})
	}

	// 1) Parse markdown todos from MEMORY + today's daily memory.
	const todoPrefix = "todo:"
	paths := []string{
		filepath.Join(e.opts.Workspace, "MEMORY.md"),
		filepath.Join(e.opts.Workspace, "memory", time.Now().Format("2006-01-02")+".md"),
	}
	for _, p := range paths {
		data, err := os.ReadFile(p)
		if err != nil {
			continue
		}
		for _, line := range strings.Split(string(data), "\n") {
			t := strings.TrimSpace(line)
			switch {
			case strings.HasPrefix(t, "- [ ]"):
				mergeMarkdown(strings.TrimPrefix(t, "- [ ]"))
			case len(t) >= len(todoPrefix) && strings.EqualFold(t[:len(todoPrefix)], todoPrefix):
				mergeMarkdown(t[len(todoPrefix):])
			}
		}
	}

	// 2) Merge structured tasks.json items (manual injections / prior state).
	if items, err := e.taskStore.Load(); err == nil {
		for _, it := range items {
			// Both spellings are terminal, matching the check tick applies
			// when it merges stored tasks; accepting only "done" here would
			// resurrect tasks stored as "completed".
			status := strings.ToLower(strings.TrimSpace(it.Status))
			if status == "done" || status == "completed" {
				continue
			}
			content := it.Content
			if content == "" {
				continue
			}
			id := strings.TrimSpace(it.ID)
			if id == "" {
				id = hashID(content)
			}
			priority := strings.TrimSpace(it.Priority)
			if priority == "" {
				priority = "normal"
			}
			merge(todoItem{ID: id, Content: content, Priority: priority, DueAt: it.DueAt})
		}
	}

	out := make([]todoItem, 0, len(merged))
	for _, it := range merged {
		out = append(out, it)
	}
	return out
}
// dispatchTask publishes the task to the bus as an inbound "system" message
// from the "autonomy" sender, using a per-task session key
// ("autonomy:<task ID>") and metadata identifying the trigger and task.
func (e *Engine) dispatchTask(st *taskState) {
	prompt := fmt.Sprintf("Autonomy task (Plan -> Act -> Reflect):\n- Goal: %s\n- Requirements: concise progress update\n- If blocked, explain blocker and next retry hint", st.Content)
	meta := map[string]string{
		"trigger": "autonomy",
		"task_id": st.ID,
		"source":  "memory_todo",
	}
	msg := bus.InboundMessage{
		Channel:    "system",
		SenderID:   "autonomy",
		ChatID:     "internal:autonomy",
		Content:    prompt,
		SessionKey: "autonomy:" + st.ID,
		Metadata:   meta,
	}
	e.bus.PublishInbound(msg)
}
// sendCompletionNotification records the completion in the reflect log and
// trigger audit, then notifies the default chat when the task is high-value
// and the notification throttle allows it. The template receives the
// shortened task text twice.
func (e *Engine) sendCompletionNotification(st *taskState) {
	e.writeReflectLog("complete", st, "task marked completed")
	e.writeTriggerAudit("complete", st, "")

	// shouldNotify updates throttle state, so it is consulted only for
	// high-value completions.
	if !e.isHighValueCompletion(st) || !e.shouldNotify("done:"+st.ID, "") {
		return
	}

	template := strings.TrimSpace(e.opts.CompletionTemplate)
	if template == "" {
		template = "✅ Completed: %s\nNext step: reply \"continue %s\" if you want me to proceed."
	}
	summary := shortTask(st.Content)
	e.bus.PublishOutbound(bus.OutboundMessage{
		Channel: e.opts.DefaultNotifyChannel,
		ChatID:  e.opts.DefaultNotifyChatID,
		Content: fmt.Sprintf(template, summary, summary),
	})
}
// sendFailureNotification records the blocked task in the reflect log and
// trigger audit, then notifies the default chat unless the notification
// throttle suppresses it. The template receives the shortened task text,
// the reason, and the shortened task text again.
func (e *Engine) sendFailureNotification(st *taskState, reason string) {
	e.writeReflectLog("blocked", st, reason)
	e.writeTriggerAudit("blocked", st, reason)

	if !e.shouldNotify("blocked:"+st.ID, reason) {
		return
	}

	template := strings.TrimSpace(e.opts.BlockedTemplate)
	if template == "" {
		template = "⚠️ Task blocked: %s\nReason: %s\nSuggestion: reply \"continue %s\" and I will retry from current state."
	}
	summary := shortTask(st.Content)
	e.bus.PublishOutbound(bus.OutboundMessage{
		Channel: e.opts.DefaultNotifyChannel,
		ChatID:  e.opts.DefaultNotifyChatID,
		Content: fmt.Sprintf(template, summary, reason, summary),
	})
}
// shouldNotify decides whether a notification identified by key (and
// optionally reason) may be sent now, and records the send time when it may.
//
// It returns false when the default channel or chat ID is blank, when an
// allow-list is configured and does not contain the chat ID, during quiet
// hours, or while the per-key or per-key-and-reason cooldown is running.
func (e *Engine) shouldNotify(key string, reason string) bool {
	channel := strings.TrimSpace(e.opts.DefaultNotifyChannel)
	chatID := strings.TrimSpace(e.opts.DefaultNotifyChatID)
	if channel == "" || chatID == "" {
		return false
	}

	if allow := e.opts.NotifyAllowFrom; len(allow) > 0 {
		permitted := false
		for _, id := range allow {
			// Compared against the untrimmed configured chat ID.
			if id == e.opts.DefaultNotifyChatID {
				permitted = true
				break
			}
		}
		if !permitted {
			return false
		}
	}

	now := time.Now()
	if inQuietHours(now, e.opts.QuietHours) {
		return false
	}

	// cooledDown reports whether at least sec seconds passed since the last
	// notification stored under k (or none was ever sent).
	cooledDown := func(k string, sec int) bool {
		last, seen := e.lastNotify[k]
		return !seen || now.Sub(last) >= time.Duration(sec)*time.Second
	}

	if !cooledDown(key, e.opts.NotifyCooldownSec) {
		return false
	}
	if r := strings.ToLower(strings.TrimSpace(reason)); r != "" {
		reasonKey := key + ":reason:" + strings.ReplaceAll(r, " ", "_")
		if !cooledDown(reasonKey, e.opts.NotifySameReasonCooldownSec) {
			return false
		}
		e.lastNotify[reasonKey] = now
	}
	e.lastNotify[key] = now
	return true
}
// writeTriggerAudit appends one JSON row describing action to
// memory/trigger-audit.jsonl and bumps the matching counters in
// memory/trigger-stats.json. errText, when non-empty, is stored as the
// row's "error" and also used as a reason suffix for the counters. All I/O
// is best-effort: failures are ignored. Nothing happens without a workspace
// or task.
func (e *Engine) writeTriggerAudit(action string, st *taskState, errText string) {
	if st == nil || strings.TrimSpace(e.opts.Workspace) == "" {
		return
	}
	memDir := filepath.Join(e.opts.Workspace, "memory")
	_ = os.MkdirAll(memDir, 0755)

	// --- audit row -------------------------------------------------------
	row := map[string]interface{}{
		"time":    time.Now().UTC().Format(time.RFC3339),
		"trigger": "autonomy",
		"action":  action,
		"session": "autonomy:" + st.ID,
	}
	if errText != "" {
		row["error"] = errText
	}
	if encoded, err := json.Marshal(row); err == nil {
		auditPath := filepath.Join(memDir, "trigger-audit.jsonl")
		if f, err := os.OpenFile(auditPath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644); err == nil {
			_, _ = f.Write(append(encoded, '\n'))
			_ = f.Close()
		}
	}

	// --- aggregated counters -----------------------------------------------
	statsPath := filepath.Join(memDir, "trigger-stats.json")
	var stats struct {
		UpdatedAt string         `json:"updated_at"`
		Counts    map[string]int `json:"counts"`
	}
	if raw, err := os.ReadFile(statsPath); err == nil {
		_ = json.Unmarshal(raw, &stats)
	}
	if stats.Counts == nil {
		stats.Counts = map[string]int{}
	}

	stats.Counts["autonomy"]++
	if act := strings.ToLower(strings.TrimSpace(action)); act != "" {
		stats.Counts["autonomy:"+act]++
		if reason := strings.ToLower(errText); reason != "" {
			// Spaces and colons are replaced so the reason is one key segment.
			reason = strings.ReplaceAll(reason, " ", "_")
			reason = strings.ReplaceAll(reason, ":", "_")
			stats.Counts["autonomy:"+act+":"+reason]++
		}
	}
	stats.UpdatedAt = time.Now().UTC().Format(time.RFC3339)
	if encoded, err := json.MarshalIndent(stats, "", " "); err == nil {
		_ = os.WriteFile(statsPath, encoded, 0644)
	}
}
// writeReflectLog appends a one-line note about the task to today's memory
// file (memory/<YYYY-MM-DD>.md, local date). The line carries the local
// time, the stage, the task content, its current status and the outcome.
// Failures are ignored; nothing happens without a workspace or task.
func (e *Engine) writeReflectLog(stage string, st *taskState, outcome string) {
	if st == nil || strings.TrimSpace(e.opts.Workspace) == "" {
		return
	}
	memDir := filepath.Join(e.opts.Workspace, "memory")
	_ = os.MkdirAll(memDir, 0755)

	dayFile := filepath.Join(memDir, time.Now().Format("2006-01-02")+".md")
	entry := fmt.Sprintf("- [%s] [autonomy][%s] task=%s status=%s outcome=%s\n", time.Now().Format("15:04"), stage, st.Content, st.Status, outcome)

	out, err := os.OpenFile(dayFile, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
	if err != nil {
		return
	}
	defer out.Close()
	_, _ = out.WriteString(entry)
}
// inQuietHours reports whether now falls inside the window described by
// spec, written as "HH:MM-HH:MM" (24-hour clock, both ends inclusive, to
// minute precision, evaluated in now's location). A window whose start is
// later than its end wraps past midnight. A blank or malformed spec never
// matches.
func inQuietHours(now time.Time, spec string) bool {
	bounds := strings.Split(strings.TrimSpace(spec), "-")
	if len(bounds) != 2 {
		return false
	}

	// minuteOfDay converts "HH:MM" into minutes since midnight.
	minuteOfDay := func(v string) (int, bool) {
		fields := strings.Split(strings.TrimSpace(v), ":")
		if len(fields) != 2 {
			return 0, false
		}
		hour, hErr := strconv.Atoi(fields[0])
		minute, mErr := strconv.Atoi(fields[1])
		switch {
		case hErr != nil, mErr != nil:
			return 0, false
		case hour < 0, hour > 23, minute < 0, minute > 59:
			return 0, false
		}
		return hour*60 + minute, true
	}

	from, fromOK := minuteOfDay(bounds[0])
	until, untilOK := minuteOfDay(bounds[1])
	if !(fromOK && untilOK) {
		return false
	}

	current := now.Hour()*60 + now.Minute()
	afterStart := current >= from
	beforeEnd := current <= until
	if from > until {
		// Window wraps midnight: late evening or early morning both count.
		return afterStart || beforeEnd
	}
	return afterStart && beforeEnd
}
// persistStateLocked writes the whole in-memory task state to the task
// store, translating engine statuses into their stored names (running →
// doing, completed → done, anything unknown → todo). The save error is
// ignored. The caller must hold e.mu.
func (e *Engine) persistStateLocked() {
	storedStatus := map[string]string{
		"running":   "doing",
		"waiting":   "waiting",
		"blocked":   "blocked",
		"completed": "done",
	}
	// stamp renders a time as RFC 3339 in UTC, or "" for the zero time.
	stamp := func(t time.Time) string {
		if t.IsZero() {
			return ""
		}
		return t.UTC().Format(time.RFC3339)
	}

	items := make([]TaskItem, 0, len(e.state))
	for _, st := range e.state {
		status, known := storedStatus[st.Status]
		if !known {
			status = "todo"
		}
		item := TaskItem{
			ID:              st.ID,
			Content:         st.Content,
			Priority:        st.Priority,
			DueAt:           st.DueAt,
			Status:          status,
			BlockReason:     st.BlockReason,
			RetryAfter:      stamp(st.RetryAfter),
			Source:          "memory_todo",
			DedupeHits:      st.DedupeHits,
			ResourceKeys:    append([]string(nil), st.ResourceKeys...),
			LastPauseReason: st.LastPauseReason,
			LastPauseAt:     stamp(st.LastPauseAt),
			UpdatedAt:       nowRFC3339(),
		}
		items = append(items, item)
	}
	_ = e.taskStore.Save(items)
}
// maybeWriteDailyReportLocked appends an "Autonomy Daily Report" section for
// the current UTC date to memory/<date>.md, at most once per date.
//
// The report is built from the rows of memory/task-audit.jsonl whose
// "source" is "autonomy" and whose "time" starts with the date. It lists
// totals per status, the three longest tasks and the three most frequent
// error reasons. Nothing is written when the audit file is missing or holds
// no matching rows. The caller must hold e.mu.
//
// NOTE(review): the date is marked as handled on the first tick that finds
// the audit file, so the report only covers rows written before that tick —
// confirm whether reporting on the previous day was intended.
func (e *Engine) maybeWriteDailyReportLocked(now time.Time) {
	date := now.UTC().Format("2006-01-02")
	if e.lastDailyReportDate == date {
		return
	}
	workspace := e.opts.Workspace
	if workspace == "" {
		return
	}
	auditPath := filepath.Join(workspace, "memory", "task-audit.jsonl")
	f, err := os.Open(auditPath)
	if err != nil {
		return
	}
	defer f.Close()

	// field returns row[key] as text and "" when the key is missing or null.
	// Formatting a nil value with %v would produce the literal "<nil>",
	// which defeats the emptiness checks below.
	field := func(row map[string]interface{}, key string) string {
		switch v := row[key].(type) {
		case nil:
			return ""
		case string:
			return v
		default:
			return fmt.Sprintf("%v", v)
		}
	}

	type topTask struct {
		TaskID   string
		Duration int
		Status   string
	}
	counts := map[string]int{"total": 0, "success": 0, "error": 0, "suppressed": 0, "running": 0}
	errorReasons := map[string]int{}
	top := make([]topTask, 0, 32)

	s := bufio.NewScanner(f)
	// Audit rows may embed long logs; raise the 64 KiB default token limit so
	// one oversized row does not silently end the scan.
	s.Buffer(make([]byte, 0, 64*1024), 4*1024*1024)
	for s.Scan() {
		var row map[string]interface{}
		if json.Unmarshal(s.Bytes(), &row) != nil {
			continue
		}
		if field(row, "source") != "autonomy" {
			continue
		}
		ts := field(row, "time")
		if len(ts) < 10 || ts[:10] != date {
			continue
		}
		counts["total"]++
		status := field(row, "status")
		if _, ok := counts[status]; ok {
			counts[status]++
		}
		if status == "error" {
			errText := field(row, "error")
			if errText == "" {
				errText = field(row, "log")
			}
			errText = shortTask(strings.ReplaceAll(errText, "\n", " "))
			if errText != "" {
				errorReasons[errText]++
			}
		}
		dur := 0
		// JSON numbers decode as float64; a quoted number is accepted too.
		switch v := row["duration_ms"].(type) {
		case float64:
			dur = int(v)
		case string:
			if n, err := strconv.Atoi(v); err == nil {
				dur = n
			}
		}
		// Rows without a task ID cannot be listed meaningfully.
		if id := field(row, "task_id"); id != "" {
			top = append(top, topTask{TaskID: id, Duration: dur, Status: status})
		}
	}
	// A scan error (I/O failure or an oversized row) is not fatal: the report
	// is built from the rows read so far.

	if counts["total"] == 0 {
		e.lastDailyReportDate = date
		return
	}

	// Longest first; ties are broken by task ID so the output is stable.
	sort.Slice(top, func(i, j int) bool {
		if top[i].Duration != top[j].Duration {
			return top[i].Duration > top[j].Duration
		}
		return top[i].TaskID < top[j].TaskID
	})
	if len(top) > 3 {
		top = top[:3]
	}
	topLines := make([]string, 0, len(top))
	for _, tt := range top {
		topLines = append(topLines, fmt.Sprintf("- %s (%dms, %s)", tt.TaskID, tt.Duration, tt.Status))
	}

	type kv struct {
		K string
		V int
	}
	reasons := make([]kv, 0, len(errorReasons))
	for k, v := range errorReasons {
		reasons = append(reasons, kv{K: k, V: v})
	}
	// Most frequent first; ties are broken alphabetically.
	sort.Slice(reasons, func(i, j int) bool {
		if reasons[i].V != reasons[j].V {
			return reasons[i].V > reasons[j].V
		}
		return reasons[i].K < reasons[j].K
	})
	if len(reasons) > 3 {
		reasons = reasons[:3]
	}
	reasonLines := make([]string, 0, len(reasons))
	for _, r := range reasons {
		reasonLines = append(reasonLines, fmt.Sprintf("- %s (x%d)", r.K, r.V))
	}

	reportLine := fmt.Sprintf("\n## Autonomy Daily Report (%s)\n- total: %d\n- success: %d\n- error: %d\n- suppressed: %d\n- running: %d\n\n### Top Duration Tasks\n%s\n\n### Top Error Reasons\n%s\n", date, counts["total"], counts["success"], counts["error"], counts["suppressed"], counts["running"], strings.Join(topLines, "\n"), strings.Join(reasonLines, "\n"))
	dailyPath := filepath.Join(workspace, "memory", date+".md")
	_ = os.MkdirAll(filepath.Dir(dailyPath), 0755)
	// Best effort: a failed write is not retried because the date is marked
	// as handled below.
	_ = appendUniqueReport(dailyPath, reportLine, date)
	e.lastDailyReportDate = date
}
// appendUniqueReport appends content to the file at path unless the file
// already contains the heading marker "Autonomy Daily Report (<date>)". The
// file is created when missing. It returns the open or write error, and nil
// when the report is already present.
func appendUniqueReport(path, content, date string) error {
	// A read failure (typically: file not created yet) counts as "no report".
	current, _ := os.ReadFile(path)
	if strings.Contains(string(current), "Autonomy Daily Report ("+date+")") {
		return nil
	}

	out, err := os.OpenFile(path, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0644)
	if err != nil {
		return err
	}
	defer out.Close()

	if _, err := out.WriteString(content); err != nil {
		return err
	}
	return nil
}
// maybeCleanupTaskHistoryLocked prunes old history, at most once per hour:
//
//   - memory/task-audit.jsonl: rows whose "time" is a valid RFC 3339
//     timestamp not after the cutoff are removed. Blank lines and lines that
//     are not valid JSON are removed as well; rows with an unparsable time
//     are kept.
//   - memory/tasks.json: entries in a terminal status (done, completed,
//     suppressed, error) whose UpdatedAt is a valid timestamp not after the
//     cutoff are removed.
//
// The cutoff is now minus TaskHistoryRetentionDays. Files that cannot be
// read are left alone and write errors are ignored. The caller must hold
// e.mu.
func (e *Engine) maybeCleanupTaskHistoryLocked(now time.Time) {
	if e.opts.TaskHistoryRetentionDays <= 0 {
		return
	}
	if !e.lastHistoryCleanupAt.IsZero() && now.Sub(e.lastHistoryCleanupAt) < time.Hour {
		return
	}
	workspace := e.opts.Workspace
	if workspace == "" {
		return
	}
	cutoff := now.AddDate(0, 0, -e.opts.TaskHistoryRetentionDays)

	// withinRetention reports whether an entry stamped ts must be kept:
	// either the stamp cannot be parsed or it is newer than the cutoff.
	withinRetention := func(ts string) bool {
		stamp, err := time.Parse(time.RFC3339, ts)
		return err != nil || stamp.After(cutoff)
	}

	// Cleanup task-audit.jsonl by event time.
	auditPath := filepath.Join(workspace, "memory", "task-audit.jsonl")
	if raw, err := os.ReadFile(auditPath); err == nil {
		rows := strings.Split(string(raw), "\n")
		survivors := make([]string, 0, len(rows))
		for _, rowText := range rows {
			if rowText == "" {
				continue
			}
			var row map[string]interface{}
			if json.Unmarshal([]byte(rowText), &row) != nil {
				continue
			}
			if withinRetention(fmt.Sprintf("%v", row["time"])) {
				survivors = append(survivors, rowText)
			}
		}
		_ = os.WriteFile(auditPath, []byte(strings.Join(survivors, "\n")+"\n"), 0644)
	}

	// Cleanup tasks.json old terminal states.
	tasksPath := filepath.Join(workspace, "memory", "tasks.json")
	if raw, err := os.ReadFile(tasksPath); err == nil {
		var items []TaskItem
		if json.Unmarshal(raw, &items) == nil {
			survivors := make([]TaskItem, 0, len(items))
			for _, it := range items {
				terminal := false
				switch strings.ToLower(it.Status) {
				case "done", "completed", "suppressed", "error":
					terminal = true
				}
				updated := strings.TrimSpace(it.UpdatedAt)
				if !terminal || updated == "" || withinRetention(updated) {
					survivors = append(survivors, it)
				}
			}
			if encoded, err := json.MarshalIndent(survivors, "", " "); err == nil {
				_ = os.WriteFile(tasksPath, encoded, 0644)
			}
		}
	}

	e.lastHistoryCleanupAt = now
}
// parseTodoAttributes splits the text of a todo into its priority, due date
// and remaining content.
//
// Surrounding whitespace is ignored. A leading tag selects the priority
// (case-insensitive): "[high]" or "p1:" → "high", "[low]" or "p3:" → "low",
// "[medium]" or "p2:" → "normal"; without a tag the priority is "normal".
// Everything after the first " due:" (case-insensitive) is returned as
// dueAt. normalized is the text with the tag and due part removed, or the
// trimmed input when nothing else remains.
func parseTodoAttributes(content string) (priority, dueAt, normalized string) {
	priority = "normal"
	// Callers pass the text right after a "- [ ]" or "todo:" marker, which
	// normally starts with a space; trim first so the tags are recognized.
	normalized = strings.TrimSpace(content)

	// Each tag is stripped using its own length. Slicing "p1:" with the
	// length of "[high]" would drop task text or panic on short input.
	tags := []struct{ prefix, priority string }{
		{"[high]", "high"},
		{"p1:", "high"},
		{"[low]", "low"},
		{"p3:", "low"},
		{"[medium]", "normal"},
		{"p2:", "normal"},
	}
	for _, tag := range tags {
		n := len(tag.prefix)
		if len(normalized) >= n && strings.EqualFold(normalized[:n], tag.prefix) {
			priority = tag.priority
			normalized = strings.TrimSpace(normalized[n:])
			break
		}
	}

	// indexFold finds an ASCII needle case-insensitively and returns a byte
	// offset valid for s itself. Lowercasing s first can change its byte
	// length and would make the offset unreliable.
	indexFold := func(s, needle string) int {
		for i := 0; i+len(needle) <= len(s); i++ {
			if strings.EqualFold(s[i:i+len(needle)], needle) {
				return i
			}
		}
		return -1
	}
	const dueMarker = " due:"
	if idx := indexFold(normalized, dueMarker); idx > 0 {
		dueAt = strings.TrimSpace(normalized[idx+len(dueMarker):])
		normalized = strings.TrimSpace(normalized[:idx])
	}

	if normalized == "" {
		normalized = strings.TrimSpace(content)
	}
	return priority, dueAt, normalized
}
// priorityWeight maps a priority label to a numeric weight: "high" is 3,
// "low" is 1, and everything else (including "normal", "medium" and
// unknown labels) is 2. Case and surrounding whitespace are ignored.
func priorityWeight(p string) int {
	label := strings.ToLower(strings.TrimSpace(p))
	if label == "high" {
		return 3
	}
	if label == "low" {
		return 1
	}
	return 2
}
// dueWeight converts a due date into a sortable weight: the negated Unix
// time of the date, so that an earlier due date yields a larger value. It
// accepts "YYYY-MM-DD" (taken as UTC midnight) and RFC 3339 timestamps, and
// returns 0 when the text is blank or matches neither format.
func dueWeight(dueAt string) int64 {
	text := strings.TrimSpace(dueAt)
	if text == "" {
		return 0
	}
	layouts := [...]string{"2006-01-02", time.RFC3339, time.RFC3339Nano}
	for i := 0; i < len(layouts); i++ {
		due, err := time.Parse(layouts[i], text)
		if err != nil {
			continue
		}
		return -due.Unix()
	}
	return 0
}
// pauseFilePath returns the location of the manual pause marker
// (<workspace>/memory/autonomy.pause), or "" when no workspace is set.
func (e *Engine) pauseFilePath() string {
	workspace := e.opts.Workspace
	if strings.TrimSpace(workspace) != "" {
		return filepath.Join(workspace, "memory", "autonomy.pause")
	}
	return ""
}
// controlFilePath returns the location of the JSON control file
// (<workspace>/memory/autonomy.control.json), or "" when no workspace is set.
func (e *Engine) controlFilePath() string {
	workspace := e.opts.Workspace
	if strings.TrimSpace(workspace) != "" {
		return filepath.Join(workspace, "memory", "autonomy.control.json")
	}
	return ""
}
// hasManualPause reports whether autonomy has been paused by hand: either
// the pause marker file exists, or the control file is readable, valid JSON
// and its "enabled" field is false (a missing field counts as false). It
// returns false without a workspace and when the control file is missing or
// malformed.
func (e *Engine) hasManualPause() bool {
	markerPath := e.pauseFilePath()
	if markerPath == "" {
		return false
	}
	if _, err := os.Stat(markerPath); err == nil {
		return true
	}

	controlPath := e.controlFilePath()
	if controlPath == "" {
		return false
	}
	raw, err := os.ReadFile(controlPath)
	if err != nil {
		return false
	}
	var control struct {
		Enabled bool `json:"enabled"`
	}
	if json.Unmarshal(raw, &control) != nil {
		return false
	}
	return !control.Enabled
}
// hasRecentUserActivity reports whether any "main" session contains a user
// message newer than now minus UserIdleResumeSec.
//
// The session index is read from <parent of workspace>/agents/main/sessions/
// sessions.json, falling back to <parent of workspace>/sessions/sessions.json.
// Each entry's sessionFile path is opened as recorded in the index. It
// returns false when the feature is disabled, no workspace is set, or the
// index cannot be read or decoded.
func (e *Engine) hasRecentUserActivity(now time.Time) bool {
	if e.opts.UserIdleResumeSec <= 0 || strings.TrimSpace(e.opts.Workspace) == "" {
		return false
	}

	root := filepath.Dir(e.opts.Workspace)
	candidates := []string{
		filepath.Join(root, "agents", "main", "sessions", "sessions.json"),
		filepath.Join(root, "sessions", "sessions.json"), // legacy layout
	}
	var raw []byte
	loaded := false
	for _, candidate := range candidates {
		if b, err := os.ReadFile(candidate); err == nil {
			raw, loaded = b, true
			break
		}
	}
	if !loaded {
		return false
	}

	var index map[string]struct {
		Kind        string `json:"kind"`
		SessionFile string `json:"sessionFile"`
	}
	if json.Unmarshal(raw, &index) != nil {
		return false
	}

	threshold := now.Add(-time.Duration(e.opts.UserIdleResumeSec) * time.Second)
	for _, entry := range index {
		isMain := strings.ToLower(strings.TrimSpace(entry.Kind)) == "main"
		if !isMain || strings.TrimSpace(entry.SessionFile) == "" {
			continue
		}
		last := latestUserMessageTime(entry.SessionFile)
		if !last.IsZero() && last.After(threshold) {
			return true
		}
	}
	return false
}
// allowTaskByPolicy reports whether a task may be dispatched under the
// keyword allow-list. With no AllowedTaskKeywords every task is allowed;
// otherwise the content must contain at least one non-empty keyword
// (case-insensitive substring match).
func (e *Engine) allowTaskByPolicy(content string) bool {
	allowed := e.opts.AllowedTaskKeywords
	if len(allowed) == 0 {
		return true
	}
	haystack := strings.ToLower(content)
	for _, keyword := range allowed {
		if keyword != "" && strings.Contains(haystack, strings.ToLower(keyword)) {
			return true
		}
	}
	return false
}
// latestUserMessageTime scans the JSON-lines session file at path and
// returns the time of the most recent user message, or the zero time when
// the file cannot be opened or holds no user message.
//
// Two line shapes are understood:
//   - event lines with a "message" object and an RFC 3339 "timestamp"; the
//     timestamp of lines whose message role is "user" is used;
//   - legacy lines with a top-level "role" and no usable timestamp; a user
//     line is dated with the file's modification time, the latest moment it
//     could have been written.
func latestUserMessageTime(path string) time.Time {
	f, err := os.Open(path)
	if err != nil {
		return time.Time{}
	}
	defer f.Close()

	// legacyTime lazily resolves the stand-in time for legacy lines. Using
	// the modification time rather than time.Now() keeps an old session that
	// contains legacy user lines from looking active forever.
	var legacyStamp time.Time
	legacyTime := func() time.Time {
		if legacyStamp.IsZero() {
			if info, statErr := f.Stat(); statErr == nil {
				legacyStamp = info.ModTime().UTC()
			} else {
				legacyStamp = time.Now().UTC()
			}
		}
		return legacyStamp
	}
	isUser := func(role string) bool {
		return strings.ToLower(strings.TrimSpace(role)) == "user"
	}

	var latest time.Time
	s := bufio.NewScanner(f)
	// Session lines carry whole messages and routinely exceed the scanner's
	// 64 KiB default, which would end the scan early and hide every later
	// message. Allow lines of up to 16 MiB.
	s.Buffer(make([]byte, 0, 64*1024), 16*1024*1024)
	for s.Scan() {
		line := s.Bytes()

		// OpenClaw-like event line.
		var ev struct {
			Type      string `json:"type"`
			Timestamp string `json:"timestamp"`
			Message   *struct {
				Role string `json:"role"`
			} `json:"message"`
		}
		if err := json.Unmarshal(line, &ev); err == nil && ev.Message != nil {
			if isUser(ev.Message.Role) {
				stamp := strings.TrimSpace(ev.Timestamp)
				for _, layout := range []string{time.RFC3339Nano, time.RFC3339} {
					if t, err := time.Parse(layout, stamp); err == nil {
						if t.After(latest) {
							latest = t
						}
						break
					}
				}
			}
			continue
		}

		// Legacy line.
		var msg struct {
			Role string `json:"role"`
		}
		if err := json.Unmarshal(line, &msg); err == nil && isUser(msg.Role) {
			if t := legacyTime(); t.After(latest) {
				latest = t
			}
		}
	}
	// A scan error leaves latest at the newest message seen so far.
	return latest
}
// blockedRetryBackoff returns how long a blocked task waits before it is
// retried: the run interval multiplied by 2^stalls, with stalls clamped to
// the range 1..5 (so between 2x and 32x). A non-positive interval falls
// back to 20 seconds.
func blockedRetryBackoff(stalls int, minRunIntervalSec int) time.Duration {
	intervalSec := minRunIntervalSec
	if intervalSec <= 0 {
		intervalSec = 20
	}

	exponent := stalls
	switch {
	case exponent < 1:
		exponent = 1
	case exponent > 5:
		exponent = 5
	}

	multiplier := 1 << uint(exponent)
	return time.Duration(multiplier) * (time.Duration(intervalSec) * time.Second)
}
// min returns the smaller of a and b.
func min(a, b int) int {
	if b < a {
		return b
	}
	return a
}
// isHighValueCompletion reports whether finishing st deserves a
// notification: the task has high priority, carries a due date, or its
// content contains one of the important keywords (case-insensitive). When
// no ImportantKeywords are configured a built-in list is used.
func (e *Engine) isHighValueCompletion(st *taskState) bool {
	if st == nil {
		return false
	}
	if priorityWeight(st.Priority) >= 3 || strings.TrimSpace(st.DueAt) != "" {
		return true
	}

	important := e.opts.ImportantKeywords
	if len(important) == 0 {
		important = []string{"urgent", "payment", "release", "deadline", "p0", "asap"}
	}
	text := strings.ToLower(st.Content)
	for _, raw := range important {
		keyword := strings.ToLower(strings.TrimSpace(raw))
		if keyword == "" {
			continue
		}
		if strings.Contains(text, keyword) {
			return true
		}
	}
	return false
}
// shortTask trims s and shortens it for display. Text of at most 32 bytes
// is returned unchanged; longer text is cut at the last character boundary
// at or before byte 32 and suffixed with "...". Cutting on a character
// boundary keeps multi-byte UTF-8 text (e.g. CJK task descriptions) valid.
func shortTask(s string) string {
	const limit = 32
	s = strings.TrimSpace(s)
	if len(s) <= limit {
		return s
	}
	// Ranging over a string yields the byte offset of each character start;
	// keep the largest one that does not exceed the limit.
	cut := 0
	for i := range s {
		if i > limit {
			break
		}
		cut = i
	}
	return s[:cut] + "..."
}
// hashID derives a stable 12-character hexadecimal ID from s: the first six
// bytes of the SHA-1 digest of the trimmed, lowercased text. Inputs that
// differ only in case or surrounding whitespace share an ID.
func hashID(s string) string {
	canonical := strings.ToLower(strings.TrimSpace(s))
	digest := sha1.Sum([]byte(canonical))
	return hex.EncodeToString(digest[:6])
}
// RunOnce executes a single scheduling round on engine, regardless of
// whether the engine is enabled or started. It does nothing when engine is
// nil or ctx is already done. The context is only checked up front; a round
// in progress is not interrupted.
func RunOnce(ctx context.Context, engine *Engine) {
	if engine == nil {
		return
	}
	select {
	case <-ctx.Done():
		return
	default:
	}
	engine.tick()
}