mirror of
https://github.com/YspCoder/clawgo.git
synced 2026-04-13 20:47:49 +08:00
1166 lines
30 KiB
Go
1166 lines
30 KiB
Go
package autonomy
|
|
|
|
import (
|
|
"bufio"
|
|
"context"
|
|
"crypto/sha1"
|
|
"encoding/hex"
|
|
"encoding/json"
|
|
"fmt"
|
|
"os"
|
|
"path/filepath"
|
|
"sort"
|
|
"strconv"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"clawgo/pkg/bus"
|
|
"clawgo/pkg/lifecycle"
|
|
)
|
|
|
|
type Options struct {
|
|
Enabled bool
|
|
TickIntervalSec int
|
|
MinRunIntervalSec int
|
|
MaxPendingDurationSec int
|
|
MaxConsecutiveStalls int
|
|
MaxDispatchPerTick int
|
|
Workspace string
|
|
DefaultNotifyChannel string
|
|
DefaultNotifyChatID string
|
|
NotifyCooldownSec int
|
|
NotifySameReasonCooldownSec int
|
|
QuietHours string
|
|
UserIdleResumeSec int
|
|
WaitingResumeDebounceSec int
|
|
MaxRoundsWithoutUser int
|
|
AllowedTaskKeywords []string
|
|
ImportantKeywords []string
|
|
CompletionTemplate string
|
|
BlockedTemplate string
|
|
}
|
|
|
|
type taskState struct {
|
|
ID string
|
|
Content string
|
|
Priority string
|
|
DueAt string
|
|
Status string // idle|running|waiting|blocked|completed
|
|
BlockReason string
|
|
WaitingSince time.Time
|
|
LastRunAt time.Time
|
|
LastAutonomyAt time.Time
|
|
RetryAfter time.Time
|
|
ConsecutiveStall int
|
|
DedupeHits int
|
|
ResourceKeys []string
|
|
WaitAttempts int
|
|
LastPauseReason string
|
|
LastPauseAt time.Time
|
|
}
|
|
|
|
type Engine struct {
|
|
opts Options
|
|
bus *bus.MessageBus
|
|
runner *lifecycle.LoopRunner
|
|
taskStore *TaskStore
|
|
|
|
mu sync.Mutex
|
|
state map[string]*taskState
|
|
lastNotify map[string]time.Time
|
|
lockOwners map[string]string
|
|
roundsWithoutUser int
|
|
lastDailyReportDate string
|
|
}
|
|
|
|
func NewEngine(opts Options, msgBus *bus.MessageBus) *Engine {
|
|
if opts.TickIntervalSec <= 0 {
|
|
opts.TickIntervalSec = 30
|
|
}
|
|
if opts.MinRunIntervalSec <= 0 {
|
|
opts.MinRunIntervalSec = 20
|
|
}
|
|
if opts.MaxPendingDurationSec <= 0 {
|
|
opts.MaxPendingDurationSec = 180
|
|
}
|
|
if opts.MaxConsecutiveStalls <= 0 {
|
|
opts.MaxConsecutiveStalls = 3
|
|
}
|
|
if opts.MaxDispatchPerTick <= 0 {
|
|
opts.MaxDispatchPerTick = 2
|
|
}
|
|
if opts.NotifyCooldownSec <= 0 {
|
|
opts.NotifyCooldownSec = 300
|
|
}
|
|
if opts.UserIdleResumeSec <= 0 {
|
|
opts.UserIdleResumeSec = 20
|
|
}
|
|
if opts.NotifySameReasonCooldownSec <= 0 {
|
|
opts.NotifySameReasonCooldownSec = 900
|
|
}
|
|
if opts.WaitingResumeDebounceSec <= 0 {
|
|
opts.WaitingResumeDebounceSec = 5
|
|
}
|
|
if opts.MaxRoundsWithoutUser < 0 {
|
|
opts.MaxRoundsWithoutUser = 0
|
|
}
|
|
return &Engine{
|
|
opts: opts,
|
|
bus: msgBus,
|
|
runner: lifecycle.NewLoopRunner(),
|
|
taskStore: NewTaskStore(opts.Workspace),
|
|
state: map[string]*taskState{},
|
|
lastNotify: map[string]time.Time{},
|
|
lockOwners: map[string]string{},
|
|
}
|
|
}
|
|
|
|
func (e *Engine) Start() {
|
|
if !e.opts.Enabled {
|
|
return
|
|
}
|
|
e.runner.Start(e.runLoop)
|
|
}
|
|
|
|
func (e *Engine) Stop() {
|
|
e.runner.Stop()
|
|
}
|
|
|
|
func (e *Engine) runLoop(stopCh <-chan struct{}) {
|
|
ticker := time.NewTicker(time.Duration(e.opts.TickIntervalSec) * time.Second)
|
|
defer ticker.Stop()
|
|
for {
|
|
select {
|
|
case <-stopCh:
|
|
return
|
|
case <-ticker.C:
|
|
e.tick()
|
|
}
|
|
}
|
|
}
|
|
|
|
func (e *Engine) tick() {
|
|
todos := e.scanTodos()
|
|
now := time.Now()
|
|
stored, _ := e.taskStore.Load()
|
|
|
|
e.mu.Lock()
|
|
if e.hasManualPause() {
|
|
for _, st := range e.state {
|
|
if st.Status == "running" {
|
|
e.releaseLocksLocked(st.ID)
|
|
st.Status = "waiting"
|
|
st.BlockReason = "manual_pause"
|
|
st.WaitingSince = now
|
|
st.LastPauseReason = "manual_pause"
|
|
st.LastPauseAt = now
|
|
e.writeReflectLog("waiting", st, "paused by manual switch")
|
|
e.writeTriggerAudit("waiting", st, "manual_pause")
|
|
}
|
|
}
|
|
e.roundsWithoutUser = 0
|
|
e.persistStateLocked()
|
|
e.mu.Unlock()
|
|
return
|
|
}
|
|
if e.hasRecentUserActivity(now) {
|
|
for _, st := range e.state {
|
|
if st.Status == "running" {
|
|
e.releaseLocksLocked(st.ID)
|
|
st.Status = "waiting"
|
|
st.BlockReason = "active_user"
|
|
st.WaitingSince = now
|
|
st.LastPauseReason = "active_user"
|
|
st.LastPauseAt = now
|
|
e.writeReflectLog("waiting", st, "paused due to active user conversation")
|
|
e.writeTriggerAudit("waiting", st, "active_user")
|
|
}
|
|
}
|
|
e.persistStateLocked()
|
|
e.mu.Unlock()
|
|
return
|
|
}
|
|
e.mu.Unlock()
|
|
|
|
e.mu.Lock()
|
|
defer e.mu.Unlock()
|
|
|
|
storedMap := map[string]TaskItem{}
|
|
for _, it := range stored {
|
|
storedMap[it.ID] = it
|
|
}
|
|
|
|
known := map[string]struct{}{}
|
|
for _, t := range todos {
|
|
known[t.ID] = struct{}{}
|
|
}
|
|
// Merge structured tasks.json entries as todo source too (for WebUI CRUD-created tasks).
|
|
for _, old := range stored {
|
|
if old.ID == "" {
|
|
continue
|
|
}
|
|
if _, ok := known[old.ID]; ok {
|
|
continue
|
|
}
|
|
st := strings.ToLower(old.Status)
|
|
if st == "done" || st == "completed" {
|
|
continue
|
|
}
|
|
todos = append(todos, todoItem{ID: old.ID, Content: old.Content, Priority: old.Priority, DueAt: old.DueAt})
|
|
known[old.ID] = struct{}{}
|
|
}
|
|
for _, t := range todos {
|
|
st, ok := e.state[t.ID]
|
|
if !ok {
|
|
status := "idle"
|
|
retryAfter := time.Time{}
|
|
resourceKeys := deriveResourceKeys(t.Content)
|
|
lastPauseReason := ""
|
|
lastPauseAt := time.Time{}
|
|
if old, ok := storedMap[t.ID]; ok {
|
|
if old.Status == "blocked" {
|
|
status = "blocked"
|
|
}
|
|
if strings.TrimSpace(old.RetryAfter) != "" {
|
|
if rt, err := time.Parse(time.RFC3339, old.RetryAfter); err == nil {
|
|
retryAfter = rt
|
|
}
|
|
}
|
|
if len(old.ResourceKeys) > 0 {
|
|
resourceKeys = append([]string(nil), old.ResourceKeys...)
|
|
}
|
|
lastPauseReason = old.LastPauseReason
|
|
if old.LastPauseAt != "" {
|
|
if pt, err := time.Parse(time.RFC3339, old.LastPauseAt); err == nil {
|
|
lastPauseAt = pt
|
|
}
|
|
}
|
|
}
|
|
e.state[t.ID] = &taskState{ID: t.ID, Content: t.Content, Priority: t.Priority, DueAt: t.DueAt, Status: status, RetryAfter: retryAfter, DedupeHits: t.DedupeHits, ResourceKeys: resourceKeys, LastPauseReason: lastPauseReason, LastPauseAt: lastPauseAt}
|
|
continue
|
|
}
|
|
st.Content = t.Content
|
|
st.Priority = t.Priority
|
|
st.DueAt = t.DueAt
|
|
st.DedupeHits = t.DedupeHits
|
|
st.ResourceKeys = deriveResourceKeys(t.Content)
|
|
if st.Status == "completed" {
|
|
st.Status = "idle"
|
|
}
|
|
}
|
|
|
|
// completed when removed from todo source
|
|
for id, st := range e.state {
|
|
if _, ok := known[id]; !ok {
|
|
if st.Status != "completed" {
|
|
e.releaseLocksLocked(st.ID)
|
|
st.Status = "completed"
|
|
e.sendCompletionNotification(st)
|
|
}
|
|
}
|
|
}
|
|
|
|
ordered := make([]*taskState, 0, len(e.state))
|
|
for _, st := range e.state {
|
|
ordered = append(ordered, st)
|
|
}
|
|
sort.Slice(ordered, func(i, j int) bool {
|
|
si := schedulingScore(ordered[i], now)
|
|
sj := schedulingScore(ordered[j], now)
|
|
if si != sj {
|
|
return si > sj
|
|
}
|
|
return ordered[i].ID < ordered[j].ID
|
|
})
|
|
|
|
dispatched := 0
|
|
for _, st := range ordered {
|
|
if dispatched >= e.opts.MaxDispatchPerTick {
|
|
break
|
|
}
|
|
if st.Status == "completed" {
|
|
continue
|
|
}
|
|
if st.Status == "waiting" {
|
|
if st.BlockReason == "resource_lock" && !st.RetryAfter.IsZero() && now.Before(st.RetryAfter) {
|
|
continue
|
|
}
|
|
// Debounce waiting/resume flapping
|
|
if !st.WaitingSince.IsZero() && now.Sub(st.WaitingSince) < time.Duration(e.opts.WaitingResumeDebounceSec)*time.Second {
|
|
continue
|
|
}
|
|
reason := st.BlockReason
|
|
st.Status = "idle"
|
|
st.BlockReason = ""
|
|
st.WaitingSince = time.Time{}
|
|
pausedFor := 0
|
|
if !st.LastPauseAt.IsZero() {
|
|
pausedFor = int(now.Sub(st.LastPauseAt).Seconds())
|
|
}
|
|
e.writeReflectLog("resume", st, fmt.Sprintf("autonomy resumed from waiting (reason=%s paused_for=%ds)", reason, pausedFor))
|
|
e.writeTriggerAudit("resume", st, reason)
|
|
}
|
|
if st.Status == "blocked" {
|
|
e.releaseLocksLocked(st.ID)
|
|
if !st.RetryAfter.IsZero() && now.Before(st.RetryAfter) {
|
|
continue
|
|
}
|
|
if now.Sub(st.LastRunAt) >= blockedRetryBackoff(st.ConsecutiveStall, e.opts.MinRunIntervalSec) {
|
|
st.Status = "idle"
|
|
st.BlockReason = ""
|
|
} else {
|
|
continue
|
|
}
|
|
}
|
|
if !st.LastRunAt.IsZero() && now.Sub(st.LastRunAt) < time.Duration(e.opts.MinRunIntervalSec)*time.Second {
|
|
continue
|
|
}
|
|
if !e.allowTaskByPolicy(st.Content) {
|
|
continue
|
|
}
|
|
if st.Status == "running" && now.Sub(st.LastRunAt) > time.Duration(e.opts.MaxPendingDurationSec)*time.Second {
|
|
st.ConsecutiveStall++
|
|
if st.ConsecutiveStall > e.opts.MaxConsecutiveStalls {
|
|
st.Status = "blocked"
|
|
st.BlockReason = "max_consecutive_stalls"
|
|
st.RetryAfter = now.Add(blockedRetryBackoff(st.ConsecutiveStall, e.opts.MinRunIntervalSec))
|
|
e.sendFailureNotification(st, "max consecutive stalls reached")
|
|
continue
|
|
}
|
|
}
|
|
|
|
if e.opts.MaxRoundsWithoutUser > 0 && e.roundsWithoutUser >= e.opts.MaxRoundsWithoutUser {
|
|
st.Status = "waiting"
|
|
st.BlockReason = "idle_round_budget"
|
|
st.WaitingSince = now
|
|
e.writeReflectLog("waiting", st, fmt.Sprintf("paused by idle round budget (%d)", e.opts.MaxRoundsWithoutUser))
|
|
e.writeTriggerAudit("waiting", st, "idle_round_budget")
|
|
continue
|
|
}
|
|
if !e.tryAcquireLocksLocked(st) {
|
|
st.Status = "waiting"
|
|
st.BlockReason = "resource_lock"
|
|
st.WaitingSince = now
|
|
st.RetryAfter = now.Add(30 * time.Second)
|
|
st.WaitAttempts++
|
|
e.writeTriggerAudit("waiting", st, "resource_lock")
|
|
continue
|
|
}
|
|
e.dispatchTask(st)
|
|
st.Status = "running"
|
|
st.WaitAttempts = 0
|
|
st.BlockReason = ""
|
|
st.WaitingSince = time.Time{}
|
|
st.LastPauseReason = ""
|
|
st.LastRunAt = now
|
|
st.LastAutonomyAt = now
|
|
e.writeReflectLog("dispatch", st, "task dispatched to agent loop")
|
|
e.writeTriggerAudit("dispatch", st, "")
|
|
e.roundsWithoutUser++
|
|
dispatched++
|
|
}
|
|
e.persistStateLocked()
|
|
e.maybeWriteDailyReportLocked(now)
|
|
}
|
|
|
|
func (e *Engine) tryAcquireLocksLocked(st *taskState) bool {
|
|
if st == nil {
|
|
return false
|
|
}
|
|
keys := st.ResourceKeys
|
|
if len(keys) == 0 {
|
|
keys = []string{"task:" + st.ID}
|
|
}
|
|
for _, k := range keys {
|
|
if owner, ok := e.lockOwners[k]; ok && owner != "" && owner != st.ID {
|
|
return false
|
|
}
|
|
}
|
|
for _, k := range keys {
|
|
e.lockOwners[k] = st.ID
|
|
}
|
|
return true
|
|
}
|
|
|
|
func (e *Engine) releaseLocksLocked(taskID string) {
|
|
if strings.TrimSpace(taskID) == "" {
|
|
return
|
|
}
|
|
for k, v := range e.lockOwners {
|
|
if v == taskID {
|
|
delete(e.lockOwners, k)
|
|
}
|
|
}
|
|
}
|
|
|
|
func schedulingScore(st *taskState, now time.Time) int {
|
|
if st == nil {
|
|
return 0
|
|
}
|
|
score := priorityWeight(st.Priority)*100 + int(dueWeight(st.DueAt))*10
|
|
if st.Status == "waiting" && st.BlockReason == "resource_lock" && !st.WaitingSince.IsZero() {
|
|
waitSec := int(now.Sub(st.WaitingSince).Seconds())
|
|
if waitSec > 0 {
|
|
score += waitSec / 10
|
|
}
|
|
score += st.WaitAttempts * 5
|
|
}
|
|
return score
|
|
}
|
|
|
|
func deriveResourceKeys(content string) []string {
|
|
raw := content
|
|
if raw == "" {
|
|
return nil
|
|
}
|
|
if explicit := parseExplicitResourceKeys(raw); len(explicit) > 0 {
|
|
return explicit
|
|
}
|
|
content = strings.ToLower(raw)
|
|
keys := make([]string, 0, 8)
|
|
hasRepo := false
|
|
for _, token := range strings.Fields(content) {
|
|
t := strings.Trim(token, "`'\"()[]{}:;,,。!?")
|
|
if strings.Contains(t, "gitea.") || strings.Contains(t, "github.com") || strings.Count(t, "/") >= 1 {
|
|
if strings.Contains(t, "github.com/") || strings.Contains(t, "gitea.") {
|
|
keys = append(keys, "repo:"+t)
|
|
hasRepo = true
|
|
}
|
|
}
|
|
if strings.Contains(t, "/") || strings.HasSuffix(t, ".go") || strings.HasSuffix(t, ".md") || strings.HasSuffix(t, ".json") || strings.HasSuffix(t, ".yaml") || strings.HasSuffix(t, ".yml") {
|
|
keys = append(keys, "file:"+t)
|
|
}
|
|
if t == "main" || strings.HasPrefix(t, "branch:") {
|
|
keys = append(keys, "branch:"+strings.TrimPrefix(t, "branch:"))
|
|
}
|
|
}
|
|
if !hasRepo {
|
|
keys = append(keys, "repo:default")
|
|
}
|
|
if len(keys) == 0 {
|
|
keys = append(keys, "scope:general")
|
|
}
|
|
return normalizeResourceKeys(keys)
|
|
}
|
|
|
|
func parseExplicitResourceKeys(content string) []string {
|
|
lower := strings.ToLower(content)
|
|
start := strings.Index(lower, "[keys:")
|
|
if start < 0 {
|
|
return nil
|
|
}
|
|
rest := content[start+6:]
|
|
end := strings.Index(rest, "]")
|
|
if end < 0 {
|
|
return nil
|
|
}
|
|
body := rest[:end]
|
|
if body == "" {
|
|
return nil
|
|
}
|
|
parts := strings.Split(body, ",")
|
|
keys := make([]string, 0, len(parts))
|
|
for _, p := range parts {
|
|
k := strings.ToLower(strings.TrimSpace(p))
|
|
if k == "" {
|
|
continue
|
|
}
|
|
if !strings.Contains(k, ":") {
|
|
k = "file:" + k
|
|
}
|
|
keys = append(keys, k)
|
|
}
|
|
return normalizeResourceKeys(keys)
|
|
}
|
|
|
|
func normalizeResourceKeys(keys []string) []string {
|
|
if len(keys) == 0 {
|
|
return nil
|
|
}
|
|
sort.Strings(keys)
|
|
uniq := keys[:0]
|
|
for _, k := range keys {
|
|
k = strings.TrimSpace(strings.ToLower(k))
|
|
if k == "" {
|
|
continue
|
|
}
|
|
if len(uniq) == 0 || k != uniq[len(uniq)-1] {
|
|
uniq = append(uniq, k)
|
|
}
|
|
}
|
|
return append([]string(nil), uniq...)
|
|
}
|
|
|
|
type todoItem struct {
|
|
ID string
|
|
Content string
|
|
Priority string
|
|
DueAt string
|
|
DedupeHits int
|
|
}
|
|
|
|
func (e *Engine) scanTodos() []todoItem {
|
|
if strings.TrimSpace(e.opts.Workspace) == "" {
|
|
return nil
|
|
}
|
|
merged := map[string]todoItem{}
|
|
merge := func(it todoItem) {
|
|
if strings.TrimSpace(it.ID) == "" || strings.TrimSpace(it.Content) == "" {
|
|
return
|
|
}
|
|
if cur, ok := merged[it.ID]; ok {
|
|
if priorityWeight(it.Priority) > priorityWeight(cur.Priority) {
|
|
cur.Priority = it.Priority
|
|
}
|
|
if cur.DueAt == "" && it.DueAt != "" {
|
|
cur.DueAt = it.DueAt
|
|
}
|
|
cur.DedupeHits++
|
|
merged[it.ID] = cur
|
|
return
|
|
}
|
|
merged[it.ID] = it
|
|
}
|
|
|
|
// 1) Parse markdown todos from MEMORY + today's daily memory.
|
|
paths := []string{
|
|
filepath.Join(e.opts.Workspace, "MEMORY.md"),
|
|
filepath.Join(e.opts.Workspace, "memory", time.Now().Format("2006-01-02")+".md"),
|
|
}
|
|
for _, p := range paths {
|
|
data, err := os.ReadFile(p)
|
|
if err != nil {
|
|
continue
|
|
}
|
|
for _, line := range strings.Split(string(data), "\n") {
|
|
t := strings.TrimSpace(line)
|
|
if strings.HasPrefix(t, "- [ ]") {
|
|
content := strings.TrimPrefix(t, "- [ ]")
|
|
priority, dueAt, normalized := parseTodoAttributes(content)
|
|
merge(todoItem{ID: hashID(normalized), Content: normalized, Priority: priority, DueAt: dueAt})
|
|
continue
|
|
}
|
|
if strings.HasPrefix(strings.ToLower(t), "todo:") {
|
|
content := t[5:]
|
|
priority, dueAt, normalized := parseTodoAttributes(content)
|
|
merge(todoItem{ID: hashID(normalized), Content: normalized, Priority: priority, DueAt: dueAt})
|
|
}
|
|
}
|
|
}
|
|
|
|
// 2) Merge structured tasks.json items (manual injections / prior state).
|
|
if items, err := e.taskStore.Load(); err == nil {
|
|
for _, it := range items {
|
|
status := strings.ToLower(strings.TrimSpace(it.Status))
|
|
if status == "done" {
|
|
continue
|
|
}
|
|
content := it.Content
|
|
if content == "" {
|
|
continue
|
|
}
|
|
id := strings.TrimSpace(it.ID)
|
|
if id == "" {
|
|
id = hashID(content)
|
|
}
|
|
priority := strings.TrimSpace(it.Priority)
|
|
if priority == "" {
|
|
priority = "normal"
|
|
}
|
|
merge(todoItem{ID: id, Content: content, Priority: priority, DueAt: it.DueAt})
|
|
}
|
|
}
|
|
|
|
out := make([]todoItem, 0, len(merged))
|
|
for _, it := range merged {
|
|
out = append(out, it)
|
|
}
|
|
return out
|
|
}
|
|
|
|
func (e *Engine) dispatchTask(st *taskState) {
|
|
content := fmt.Sprintf("Autonomy task (Plan -> Act -> Reflect):\n- Goal: %s\n- Requirements: concise progress update\n- If blocked, explain blocker and next retry hint", st.Content)
|
|
e.bus.PublishInbound(bus.InboundMessage{
|
|
Channel: "system",
|
|
SenderID: "autonomy",
|
|
ChatID: "internal:autonomy",
|
|
Content: content,
|
|
SessionKey: "autonomy:" + st.ID,
|
|
Metadata: map[string]string{
|
|
"trigger": "autonomy",
|
|
"task_id": st.ID,
|
|
"source": "memory_todo",
|
|
},
|
|
})
|
|
}
|
|
|
|
func (e *Engine) sendCompletionNotification(st *taskState) {
|
|
e.writeReflectLog("complete", st, "task marked completed")
|
|
e.writeTriggerAudit("complete", st, "")
|
|
if !e.isHighValueCompletion(st) {
|
|
return
|
|
}
|
|
if !e.shouldNotify("done:"+st.ID, "") {
|
|
return
|
|
}
|
|
tpl := strings.TrimSpace(e.opts.CompletionTemplate)
|
|
if tpl == "" {
|
|
tpl = "✅ Completed: %s\nNext step: reply \"continue %s\" if you want me to proceed."
|
|
}
|
|
e.bus.PublishOutbound(bus.OutboundMessage{
|
|
Channel: e.opts.DefaultNotifyChannel,
|
|
ChatID: e.opts.DefaultNotifyChatID,
|
|
Content: fmt.Sprintf(tpl, shortTask(st.Content), shortTask(st.Content)),
|
|
})
|
|
}
|
|
|
|
func (e *Engine) sendFailureNotification(st *taskState, reason string) {
|
|
e.writeReflectLog("blocked", st, reason)
|
|
e.writeTriggerAudit("blocked", st, reason)
|
|
if !e.shouldNotify("blocked:"+st.ID, reason) {
|
|
return
|
|
}
|
|
tpl := strings.TrimSpace(e.opts.BlockedTemplate)
|
|
if tpl == "" {
|
|
tpl = "⚠️ Task blocked: %s\nReason: %s\nSuggestion: reply \"continue %s\" and I will retry from current state."
|
|
}
|
|
e.bus.PublishOutbound(bus.OutboundMessage{
|
|
Channel: e.opts.DefaultNotifyChannel,
|
|
ChatID: e.opts.DefaultNotifyChatID,
|
|
Content: fmt.Sprintf(tpl, shortTask(st.Content), reason, shortTask(st.Content)),
|
|
})
|
|
}
|
|
|
|
func (e *Engine) shouldNotify(key string, reason string) bool {
|
|
if strings.TrimSpace(e.opts.DefaultNotifyChannel) == "" || strings.TrimSpace(e.opts.DefaultNotifyChatID) == "" {
|
|
return false
|
|
}
|
|
now := time.Now()
|
|
if inQuietHours(now, e.opts.QuietHours) {
|
|
return false
|
|
}
|
|
if last, ok := e.lastNotify[key]; ok {
|
|
if now.Sub(last) < time.Duration(e.opts.NotifyCooldownSec)*time.Second {
|
|
return false
|
|
}
|
|
}
|
|
r := strings.ToLower(strings.TrimSpace(reason))
|
|
if r != "" {
|
|
rk := key + ":reason:" + strings.ReplaceAll(r, " ", "_")
|
|
if last, ok := e.lastNotify[rk]; ok {
|
|
if now.Sub(last) < time.Duration(e.opts.NotifySameReasonCooldownSec)*time.Second {
|
|
return false
|
|
}
|
|
}
|
|
e.lastNotify[rk] = now
|
|
}
|
|
e.lastNotify[key] = now
|
|
return true
|
|
}
|
|
|
|
func (e *Engine) writeTriggerAudit(action string, st *taskState, errText string) {
|
|
if strings.TrimSpace(e.opts.Workspace) == "" || st == nil {
|
|
return
|
|
}
|
|
memDir := filepath.Join(e.opts.Workspace, "memory")
|
|
path := filepath.Join(memDir, "trigger-audit.jsonl")
|
|
_ = os.MkdirAll(filepath.Dir(path), 0755)
|
|
row := map[string]interface{}{
|
|
"time": time.Now().UTC().Format(time.RFC3339),
|
|
"trigger": "autonomy",
|
|
"action": action,
|
|
"session": "autonomy:" + st.ID,
|
|
}
|
|
if errText != "" {
|
|
row["error"] = errText
|
|
}
|
|
if b, err := json.Marshal(row); err == nil {
|
|
f, oErr := os.OpenFile(path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
|
|
if oErr == nil {
|
|
_, _ = f.Write(append(b, '\n'))
|
|
_ = f.Close()
|
|
}
|
|
}
|
|
|
|
statsPath := filepath.Join(memDir, "trigger-stats.json")
|
|
stats := struct {
|
|
UpdatedAt string `json:"updated_at"`
|
|
Counts map[string]int `json:"counts"`
|
|
}{Counts: map[string]int{}}
|
|
if raw, rErr := os.ReadFile(statsPath); rErr == nil {
|
|
_ = json.Unmarshal(raw, &stats)
|
|
if stats.Counts == nil {
|
|
stats.Counts = map[string]int{}
|
|
}
|
|
}
|
|
stats.Counts["autonomy"]++
|
|
act := strings.ToLower(strings.TrimSpace(action))
|
|
if act != "" {
|
|
stats.Counts["autonomy:"+act]++
|
|
reason := strings.ToLower(errText)
|
|
if reason != "" {
|
|
reason = strings.ReplaceAll(reason, " ", "_")
|
|
reason = strings.ReplaceAll(reason, ":", "_")
|
|
stats.Counts["autonomy:"+act+":"+reason]++
|
|
}
|
|
}
|
|
stats.UpdatedAt = time.Now().UTC().Format(time.RFC3339)
|
|
if raw, mErr := json.MarshalIndent(stats, "", " "); mErr == nil {
|
|
_ = os.WriteFile(statsPath, raw, 0644)
|
|
}
|
|
}
|
|
|
|
func (e *Engine) writeReflectLog(stage string, st *taskState, outcome string) {
|
|
if strings.TrimSpace(e.opts.Workspace) == "" || st == nil {
|
|
return
|
|
}
|
|
memDir := filepath.Join(e.opts.Workspace, "memory")
|
|
_ = os.MkdirAll(memDir, 0755)
|
|
path := filepath.Join(memDir, time.Now().Format("2006-01-02")+".md")
|
|
line := fmt.Sprintf("- [%s] [autonomy][%s] task=%s status=%s outcome=%s\n", time.Now().Format("15:04"), stage, st.Content, st.Status, outcome)
|
|
f, err := os.OpenFile(path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
|
|
if err != nil {
|
|
return
|
|
}
|
|
defer f.Close()
|
|
_, _ = f.WriteString(line)
|
|
}
|
|
|
|
func inQuietHours(now time.Time, spec string) bool {
|
|
spec = strings.TrimSpace(spec)
|
|
if spec == "" {
|
|
return false
|
|
}
|
|
parts := strings.Split(spec, "-")
|
|
if len(parts) != 2 {
|
|
return false
|
|
}
|
|
parseHM := func(v string) (int, bool) {
|
|
hm := strings.Split(strings.TrimSpace(v), ":")
|
|
if len(hm) != 2 {
|
|
return 0, false
|
|
}
|
|
h, err1 := strconv.Atoi(hm[0])
|
|
m, err2 := strconv.Atoi(hm[1])
|
|
if err1 != nil || err2 != nil || h < 0 || h > 23 || m < 0 || m > 59 {
|
|
return 0, false
|
|
}
|
|
return h*60 + m, true
|
|
}
|
|
start, ok1 := parseHM(parts[0])
|
|
end, ok2 := parseHM(parts[1])
|
|
if !ok1 || !ok2 {
|
|
return false
|
|
}
|
|
nowMin := now.Hour()*60 + now.Minute()
|
|
if start <= end {
|
|
return nowMin >= start && nowMin <= end
|
|
}
|
|
return nowMin >= start || nowMin <= end
|
|
}
|
|
|
|
func (e *Engine) persistStateLocked() {
|
|
items := make([]TaskItem, 0, len(e.state))
|
|
for _, st := range e.state {
|
|
status := "todo"
|
|
switch st.Status {
|
|
case "running":
|
|
status = "doing"
|
|
case "waiting":
|
|
status = "waiting"
|
|
case "blocked":
|
|
status = "blocked"
|
|
case "completed":
|
|
status = "done"
|
|
default:
|
|
status = "todo"
|
|
}
|
|
retryAfter := ""
|
|
if !st.RetryAfter.IsZero() {
|
|
retryAfter = st.RetryAfter.UTC().Format(time.RFC3339)
|
|
}
|
|
lastPauseAt := ""
|
|
if !st.LastPauseAt.IsZero() {
|
|
lastPauseAt = st.LastPauseAt.UTC().Format(time.RFC3339)
|
|
}
|
|
items = append(items, TaskItem{
|
|
ID: st.ID,
|
|
Content: st.Content,
|
|
Priority: st.Priority,
|
|
DueAt: st.DueAt,
|
|
Status: status,
|
|
BlockReason: st.BlockReason,
|
|
RetryAfter: retryAfter,
|
|
Source: "memory_todo",
|
|
DedupeHits: st.DedupeHits,
|
|
ResourceKeys: append([]string(nil), st.ResourceKeys...),
|
|
LastPauseReason: st.LastPauseReason,
|
|
LastPauseAt: lastPauseAt,
|
|
UpdatedAt: nowRFC3339(),
|
|
})
|
|
}
|
|
_ = e.taskStore.Save(items)
|
|
}
|
|
|
|
func (e *Engine) maybeWriteDailyReportLocked(now time.Time) {
|
|
date := now.UTC().Format("2006-01-02")
|
|
if e.lastDailyReportDate == date {
|
|
return
|
|
}
|
|
workspace := e.opts.Workspace
|
|
if workspace == "" {
|
|
return
|
|
}
|
|
auditPath := filepath.Join(workspace, "memory", "task-audit.jsonl")
|
|
f, err := os.Open(auditPath)
|
|
if err != nil {
|
|
return
|
|
}
|
|
defer f.Close()
|
|
counts := map[string]int{"total": 0, "success": 0, "error": 0, "suppressed": 0, "running": 0}
|
|
errorReasons := map[string]int{}
|
|
type topTask struct { TaskID string; Duration int; Status string }
|
|
top := make([]topTask, 0, 32)
|
|
s := bufio.NewScanner(f)
|
|
for s.Scan() {
|
|
line := s.Bytes()
|
|
var row map[string]interface{}
|
|
if json.Unmarshal(line, &row) != nil {
|
|
continue
|
|
}
|
|
if fmt.Sprintf("%v", row["source"]) != "autonomy" {
|
|
continue
|
|
}
|
|
ts := fmt.Sprintf("%v", row["time"])
|
|
if len(ts) < 10 || ts[:10] != date {
|
|
continue
|
|
}
|
|
counts["total"]++
|
|
st := fmt.Sprintf("%v", row["status"])
|
|
if _, ok := counts[st]; ok {
|
|
counts[st]++
|
|
}
|
|
if st == "error" {
|
|
errText := fmt.Sprintf("%v", row["error"])
|
|
if errText == "" {
|
|
errText = fmt.Sprintf("%v", row["log"])
|
|
}
|
|
errText = shortTask(strings.ReplaceAll(errText, "\n", " "))
|
|
if errText != "" {
|
|
errorReasons[errText]++
|
|
}
|
|
}
|
|
dur := 0
|
|
switch v := row["duration_ms"].(type) {
|
|
case float64:
|
|
dur = int(v)
|
|
case int:
|
|
dur = v
|
|
case string:
|
|
if n, err := strconv.Atoi(v); err == nil { dur = n }
|
|
}
|
|
top = append(top, topTask{TaskID: fmt.Sprintf("%v", row["task_id"]), Duration: dur, Status: st})
|
|
}
|
|
if counts["total"] == 0 {
|
|
e.lastDailyReportDate = date
|
|
return
|
|
}
|
|
sort.Slice(top, func(i, j int) bool { return top[i].Duration > top[j].Duration })
|
|
maxTop := 3
|
|
if len(top) < maxTop { maxTop = len(top) }
|
|
topLines := make([]string, 0, maxTop)
|
|
for i := 0; i < maxTop; i++ {
|
|
if top[i].TaskID == "" { continue }
|
|
topLines = append(topLines, fmt.Sprintf("- %s (%dms, %s)", top[i].TaskID, top[i].Duration, top[i].Status))
|
|
}
|
|
type kv struct { K string; V int }
|
|
reasons := make([]kv, 0, len(errorReasons))
|
|
for k, v := range errorReasons { reasons = append(reasons, kv{K:k, V:v}) }
|
|
sort.Slice(reasons, func(i, j int) bool { return reasons[i].V > reasons[j].V })
|
|
maxR := 3
|
|
if len(reasons) < maxR { maxR = len(reasons) }
|
|
reasonLines := make([]string, 0, maxR)
|
|
for i := 0; i < maxR; i++ {
|
|
reasonLines = append(reasonLines, fmt.Sprintf("- %s (x%d)", reasons[i].K, reasons[i].V))
|
|
}
|
|
reportLine := fmt.Sprintf("\n## Autonomy Daily Report (%s)\n- total: %d\n- success: %d\n- error: %d\n- suppressed: %d\n- running: %d\n\n### Top Duration Tasks\n%s\n\n### Top Error Reasons\n%s\n", date, counts["total"], counts["success"], counts["error"], counts["suppressed"], counts["running"], strings.Join(topLines, "\n"), strings.Join(reasonLines, "\n"))
|
|
dailyPath := filepath.Join(workspace, "memory", date+".md")
|
|
_ = os.MkdirAll(filepath.Dir(dailyPath), 0755)
|
|
_ = appendUniqueReport(dailyPath, reportLine, date)
|
|
e.lastDailyReportDate = date
|
|
}
|
|
|
|
func appendUniqueReport(path, content, date string) error {
|
|
existing, _ := os.ReadFile(path)
|
|
marker := "Autonomy Daily Report (" + date + ")"
|
|
if strings.Contains(string(existing), marker) {
|
|
return nil
|
|
}
|
|
f, err := os.OpenFile(path, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0644)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer f.Close()
|
|
_, err = f.WriteString(content)
|
|
return err
|
|
}
|
|
|
|
func parseTodoAttributes(content string) (priority, dueAt, normalized string) {
|
|
priority = "normal"
|
|
normalized = content
|
|
l := strings.ToLower(normalized)
|
|
if strings.HasPrefix(l, "[high]") || strings.HasPrefix(l, "p1:") {
|
|
priority = "high"
|
|
normalized = strings.TrimSpace(normalized[6:])
|
|
} else if strings.HasPrefix(l, "[low]") || strings.HasPrefix(l, "p3:") {
|
|
priority = "low"
|
|
normalized = strings.TrimSpace(normalized[5:])
|
|
} else if strings.HasPrefix(l, "[medium]") || strings.HasPrefix(l, "p2:") {
|
|
priority = "normal"
|
|
if strings.HasPrefix(l, "[medium]") {
|
|
normalized = strings.TrimSpace(normalized[8:])
|
|
} else {
|
|
normalized = strings.TrimSpace(normalized[3:])
|
|
}
|
|
}
|
|
if idx := strings.Index(strings.ToLower(normalized), " due:"); idx > 0 {
|
|
dueAt = strings.TrimSpace(normalized[idx+5:])
|
|
normalized = strings.TrimSpace(normalized[:idx])
|
|
}
|
|
if normalized == "" {
|
|
normalized = content
|
|
}
|
|
return priority, dueAt, normalized
|
|
}
|
|
|
|
func priorityWeight(p string) int {
|
|
switch strings.ToLower(strings.TrimSpace(p)) {
|
|
case "high":
|
|
return 3
|
|
case "normal", "medium":
|
|
return 2
|
|
case "low":
|
|
return 1
|
|
default:
|
|
return 2
|
|
}
|
|
}
|
|
|
|
func dueWeight(dueAt string) int64 {
|
|
dueAt = strings.TrimSpace(dueAt)
|
|
if dueAt == "" {
|
|
return 0
|
|
}
|
|
layouts := []string{"2006-01-02", time.RFC3339, time.RFC3339Nano}
|
|
for _, layout := range layouts {
|
|
if t, err := time.Parse(layout, dueAt); err == nil {
|
|
return -t.Unix() // earlier due => bigger score after descending sort
|
|
}
|
|
}
|
|
return 0
|
|
}
|
|
|
|
func (e *Engine) pauseFilePath() string {
|
|
if strings.TrimSpace(e.opts.Workspace) == "" {
|
|
return ""
|
|
}
|
|
return filepath.Join(e.opts.Workspace, "memory", "autonomy.pause")
|
|
}
|
|
|
|
func (e *Engine) controlFilePath() string {
|
|
if strings.TrimSpace(e.opts.Workspace) == "" {
|
|
return ""
|
|
}
|
|
return filepath.Join(e.opts.Workspace, "memory", "autonomy.control.json")
|
|
}
|
|
|
|
func (e *Engine) hasManualPause() bool {
|
|
p := e.pauseFilePath()
|
|
if p == "" {
|
|
return false
|
|
}
|
|
_, err := os.Stat(p)
|
|
if err == nil {
|
|
return true
|
|
}
|
|
ctrl := e.controlFilePath()
|
|
if ctrl == "" {
|
|
return false
|
|
}
|
|
data, rErr := os.ReadFile(ctrl)
|
|
if rErr != nil {
|
|
return false
|
|
}
|
|
var c struct {
|
|
Enabled bool `json:"enabled"`
|
|
}
|
|
if jErr := json.Unmarshal(data, &c); jErr != nil {
|
|
return false
|
|
}
|
|
return !c.Enabled
|
|
}
|
|
|
|
func (e *Engine) hasRecentUserActivity(now time.Time) bool {
|
|
if e.opts.UserIdleResumeSec <= 0 || strings.TrimSpace(e.opts.Workspace) == "" {
|
|
return false
|
|
}
|
|
sessionsPath := filepath.Join(filepath.Dir(e.opts.Workspace), "agents", "main", "sessions", "sessions.json")
|
|
data, err := os.ReadFile(sessionsPath)
|
|
if err != nil {
|
|
legacy := filepath.Join(filepath.Dir(e.opts.Workspace), "sessions", "sessions.json")
|
|
data, err = os.ReadFile(legacy)
|
|
if err != nil {
|
|
return false
|
|
}
|
|
}
|
|
var index map[string]struct {
|
|
Kind string `json:"kind"`
|
|
SessionFile string `json:"sessionFile"`
|
|
}
|
|
if err := json.Unmarshal(data, &index); err != nil {
|
|
return false
|
|
}
|
|
cutoff := now.Add(-time.Duration(e.opts.UserIdleResumeSec) * time.Second)
|
|
for _, row := range index {
|
|
if strings.ToLower(strings.TrimSpace(row.Kind)) != "main" {
|
|
continue
|
|
}
|
|
if strings.TrimSpace(row.SessionFile) == "" {
|
|
continue
|
|
}
|
|
if ts := latestUserMessageTime(row.SessionFile); !ts.IsZero() && ts.After(cutoff) {
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
|
|
func (e *Engine) allowTaskByPolicy(content string) bool {
|
|
if len(e.opts.AllowedTaskKeywords) == 0 {
|
|
return true
|
|
}
|
|
v := strings.ToLower(content)
|
|
for _, kw := range e.opts.AllowedTaskKeywords {
|
|
if kw == "" {
|
|
continue
|
|
}
|
|
if strings.Contains(v, strings.ToLower(kw)) {
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
|
|
func latestUserMessageTime(path string) time.Time {
|
|
f, err := os.Open(path)
|
|
if err != nil {
|
|
return time.Time{}
|
|
}
|
|
defer f.Close()
|
|
|
|
var latest time.Time
|
|
s := bufio.NewScanner(f)
|
|
for s.Scan() {
|
|
line := s.Bytes()
|
|
|
|
// OpenClaw-like event line
|
|
var ev struct {
|
|
Type string `json:"type"`
|
|
Timestamp string `json:"timestamp"`
|
|
Message *struct {
|
|
Role string `json:"role"`
|
|
} `json:"message"`
|
|
}
|
|
if err := json.Unmarshal(line, &ev); err == nil && ev.Message != nil {
|
|
if strings.ToLower(strings.TrimSpace(ev.Message.Role)) == "user" {
|
|
if t, err := time.Parse(time.RFC3339Nano, strings.TrimSpace(ev.Timestamp)); err == nil && t.After(latest) {
|
|
latest = t
|
|
} else if t, err := time.Parse(time.RFC3339, strings.TrimSpace(ev.Timestamp)); err == nil && t.After(latest) {
|
|
latest = t
|
|
}
|
|
}
|
|
continue
|
|
}
|
|
|
|
// Legacy line
|
|
var msg struct {
|
|
Role string `json:"role"`
|
|
}
|
|
if err := json.Unmarshal(line, &msg); err == nil {
|
|
if strings.ToLower(strings.TrimSpace(msg.Role)) == "user" {
|
|
latest = time.Now().UTC()
|
|
}
|
|
}
|
|
}
|
|
return latest
|
|
}
|
|
|
|
func blockedRetryBackoff(stalls int, minRunIntervalSec int) time.Duration {
|
|
if minRunIntervalSec <= 0 {
|
|
minRunIntervalSec = 20
|
|
}
|
|
if stalls < 1 {
|
|
stalls = 1
|
|
}
|
|
base := time.Duration(minRunIntervalSec) * time.Second
|
|
factor := 1 << min(stalls, 5)
|
|
return time.Duration(factor) * base
|
|
}
|
|
|
|
func min(a, b int) int {
|
|
if a < b {
|
|
return a
|
|
}
|
|
return b
|
|
}
|
|
|
|
func (e *Engine) isHighValueCompletion(st *taskState) bool {
|
|
if st == nil {
|
|
return false
|
|
}
|
|
if priorityWeight(st.Priority) >= 3 {
|
|
return true
|
|
}
|
|
if strings.TrimSpace(st.DueAt) != "" {
|
|
return true
|
|
}
|
|
s := strings.ToLower(st.Content)
|
|
keywords := e.opts.ImportantKeywords
|
|
if len(keywords) == 0 {
|
|
keywords = []string{"urgent", "payment", "release", "deadline", "p0", "asap"}
|
|
}
|
|
for _, k := range keywords {
|
|
kk := strings.ToLower(strings.TrimSpace(k))
|
|
if kk != "" && strings.Contains(s, kk) {
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
|
|
func shortTask(s string) string {
|
|
s = strings.TrimSpace(s)
|
|
if len(s) <= 32 {
|
|
return s
|
|
}
|
|
return s[:32] + "..."
|
|
}
|
|
|
|
func hashID(s string) string {
|
|
sum := sha1.Sum([]byte(strings.ToLower(strings.TrimSpace(s))))
|
|
return hex.EncodeToString(sum[:])[:12]
|
|
}
|
|
|
|
func RunOnce(ctx context.Context, engine *Engine) {
|
|
if engine == nil {
|
|
return
|
|
}
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
default:
|
|
engine.tick()
|
|
}
|
|
}
|