From f644ae83ff5d0a00a5b40bdca2b926f4d94e0c14 Mon Sep 17 00:00:00 2001 From: DBT Date: Tue, 24 Feb 2026 00:19:18 +0000 Subject: [PATCH] upgrade autonomy with task-state persistence and proactive notify cooldown --- cmd/clawgo/cmd_gateway.go | 1 + config.example.json | 1 + pkg/autonomy/engine.go | 85 ++++++++++++++++++++++++++++++++------ pkg/autonomy/task_store.go | 71 +++++++++++++++++++++++++++++++ pkg/config/config.go | 2 + pkg/config/validate.go | 3 ++ 6 files changed, 151 insertions(+), 12 deletions(-) create mode 100644 pkg/autonomy/task_store.go diff --git a/cmd/clawgo/cmd_gateway.go b/cmd/clawgo/cmd_gateway.go index 7dde504..3136553 100644 --- a/cmd/clawgo/cmd_gateway.go +++ b/cmd/clawgo/cmd_gateway.go @@ -648,6 +648,7 @@ func buildAutonomyEngine(cfg *config.Config, msgBus *bus.MessageBus) *autonomy.E MaxPendingDurationSec: a.MaxPendingDurationSec, MaxConsecutiveStalls: a.MaxConsecutiveStalls, MaxDispatchPerTick: a.MaxDispatchPerTick, + NotifyCooldownSec: a.NotifyCooldownSec, Workspace: cfg.WorkspacePath(), DefaultNotifyChannel: a.NotifyChannel, DefaultNotifyChatID: a.NotifyChatID, diff --git a/config.example.json b/config.example.json index ece649d..ca971cc 100644 --- a/config.example.json +++ b/config.example.json @@ -20,6 +20,7 @@ "max_pending_duration_sec": 180, "max_consecutive_stalls": 3, "max_dispatch_per_tick": 2, + "notify_cooldown_sec": 300, "notify_channel": "", "notify_chat_id": "" }, diff --git a/pkg/autonomy/engine.go b/pkg/autonomy/engine.go index 1d89457..e46a82e 100644 --- a/pkg/autonomy/engine.go +++ b/pkg/autonomy/engine.go @@ -25,6 +25,7 @@ type Options struct { Workspace string DefaultNotifyChannel string DefaultNotifyChatID string + NotifyCooldownSec int } type taskState struct { @@ -37,12 +38,14 @@ type taskState struct { } type Engine struct { - opts Options - bus *bus.MessageBus - runner *lifecycle.LoopRunner + opts Options + bus *bus.MessageBus + runner *lifecycle.LoopRunner + taskStore *TaskStore - mu sync.Mutex - state map[string]*taskState + mu sync.Mutex + state map[string]*taskState + lastNotify map[string]time.Time } func NewEngine(opts Options, msgBus *bus.MessageBus) *Engine { @@ -61,11 +64,16 @@ func NewEngine(opts Options, msgBus *bus.MessageBus) *Engine { if opts.MaxDispatchPerTick <= 0 { opts.MaxDispatchPerTick = 2 } + if opts.NotifyCooldownSec <= 0 { + opts.NotifyCooldownSec = 300 + } return &Engine{ - opts: opts, - bus: msgBus, - runner: lifecycle.NewLoopRunner(), - state: map[string]*taskState{}, + opts: opts, + bus: msgBus, + runner: lifecycle.NewLoopRunner(), + taskStore: NewTaskStore(opts.Workspace), + state: map[string]*taskState{}, + lastNotify: map[string]time.Time{}, } } @@ -96,16 +104,28 @@ func (e *Engine) runLoop(stopCh <-chan struct{}) { func (e *Engine) tick() { todos := e.scanTodos() now := time.Now() + stored, _ := e.taskStore.Load() e.mu.Lock() defer e.mu.Unlock() + storedMap := map[string]TaskItem{} + for _, it := range stored { + storedMap[it.ID] = it + } + known := map[string]struct{}{} for _, t := range todos { known[t.ID] = struct{}{} st, ok := e.state[t.ID] if !ok { - e.state[t.ID] = &taskState{ID: t.ID, Content: t.Content, Status: "idle"} + status := "idle" + if old, ok := storedMap[t.ID]; ok { + if old.Status == "blocked" { + status = "blocked" + } + } + e.state[t.ID] = &taskState{ID: t.ID, Content: t.Content, Status: status} continue } st.Content = t.Content @@ -150,6 +170,7 @@ func (e *Engine) tick() { st.LastAutonomyAt = now dispatched++ } + e.persistStateLocked() } type todoItem struct { @@ -210,7 +231,7 @@ func (e *Engine) dispatchTask(st *taskState) { } func (e *Engine) sendCompletionNotification(st *taskState) { - if strings.TrimSpace(e.opts.DefaultNotifyChannel) == "" || strings.TrimSpace(e.opts.DefaultNotifyChatID) == "" { + if !e.shouldNotify("done:" + st.ID) { return } e.bus.PublishOutbound(bus.OutboundMessage{ @@ -221,7 +242,7 @@ func (e *Engine) sendCompletionNotification(st *taskState) { } func (e *Engine) sendFailureNotification(st *taskState, reason string) { - if strings.TrimSpace(e.opts.DefaultNotifyChannel) == "" || strings.TrimSpace(e.opts.DefaultNotifyChatID) == "" { + if !e.shouldNotify("blocked:" + st.ID) { return } e.bus.PublishOutbound(bus.OutboundMessage{ @@ -231,6 +252,46 @@ func (e *Engine) sendFailureNotification(st *taskState, reason string) { }) } +func (e *Engine) shouldNotify(key string) bool { + if strings.TrimSpace(e.opts.DefaultNotifyChannel) == "" || strings.TrimSpace(e.opts.DefaultNotifyChatID) == "" { + return false + } + now := time.Now() + if last, ok := e.lastNotify[key]; ok { + if now.Sub(last) < time.Duration(e.opts.NotifyCooldownSec)*time.Second { + return false + } + } + e.lastNotify[key] = now + return true +} + +func (e *Engine) persistStateLocked() { + items := make([]TaskItem, 0, len(e.state)) + for _, st := range e.state { + status := "todo" + switch st.Status { + case "running": + status = "doing" + case "blocked": + status = "blocked" + case "completed": + status = "done" + default: + status = "todo" + } + items = append(items, TaskItem{ + ID: st.ID, + Content: st.Content, + Priority: "normal", + Status: status, + Source: "memory_todo", + UpdatedAt: nowRFC3339(), + }) + } + _ = e.taskStore.Save(items) +} + func hashID(s string) string { sum := sha1.Sum([]byte(strings.ToLower(strings.TrimSpace(s)))) return hex.EncodeToString(sum[:])[:12] diff --git a/pkg/autonomy/task_store.go b/pkg/autonomy/task_store.go new file mode 100644 index 0000000..bf56e27 --- /dev/null +++ b/pkg/autonomy/task_store.go @@ -0,0 +1,71 @@ +package autonomy + +import ( + "encoding/json" + "os" + "path/filepath" + "sort" + "strings" + "time" +) + +type TaskItem struct { + ID string `json:"id"` + Content string `json:"content"` + Priority string `json:"priority"` + DueAt string `json:"due_at,omitempty"` + Status string `json:"status"` // todo|doing|blocked|done + Source string `json:"source"` + UpdatedAt string `json:"updated_at"` +} + +type TaskStore struct { + workspace string +} + +func NewTaskStore(workspace string) *TaskStore { + return &TaskStore{workspace: workspace} +} + +func (s *TaskStore) path() string { + return filepath.Join(s.workspace, "memory", "tasks.json") +} + +func (s *TaskStore) Load() ([]TaskItem, error) { + data, err := os.ReadFile(s.path()) + if err != nil { + if os.IsNotExist(err) { + return []TaskItem{}, nil + } + return nil, err + } + var items []TaskItem + if err := json.Unmarshal(data, &items); err != nil { + return nil, err + } + return items, nil +} + +func (s *TaskStore) Save(items []TaskItem) error { + _ = os.MkdirAll(filepath.Dir(s.path()), 0755) + sort.Slice(items, func(i, j int) bool { return items[i].UpdatedAt > items[j].UpdatedAt }) + data, err := json.MarshalIndent(items, "", " ") + if err != nil { + return err + } + return os.WriteFile(s.path(), data, 0644) +} + +func normalizeStatus(v string) string { + s := strings.ToLower(strings.TrimSpace(v)) + switch s { + case "todo", "doing", "blocked", "done": + return s + default: + return "todo" + } +} + +func nowRFC3339() string { + return time.Now().UTC().Format(time.RFC3339) +} diff --git a/pkg/config/config.go b/pkg/config/config.go index 0a8a388..295fb8d 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -51,6 +51,7 @@ type AutonomyConfig struct { MaxPendingDurationSec int `json:"max_pending_duration_sec" env:"CLAWGO_AGENTS_DEFAULTS_AUTONOMY_MAX_PENDING_DURATION_SEC"` MaxConsecutiveStalls int `json:"max_consecutive_stalls" env:"CLAWGO_AGENTS_DEFAULTS_AUTONOMY_MAX_CONSECUTIVE_STALLS"` MaxDispatchPerTick int `json:"max_dispatch_per_tick" env:"CLAWGO_AGENTS_DEFAULTS_AUTONOMY_MAX_DISPATCH_PER_TICK"` + NotifyCooldownSec int `json:"notify_cooldown_sec" env:"CLAWGO_AGENTS_DEFAULTS_AUTONOMY_NOTIFY_COOLDOWN_SEC"` NotifyChannel string `json:"notify_channel" env:"CLAWGO_AGENTS_DEFAULTS_AUTONOMY_NOTIFY_CHANNEL"` NotifyChatID string `json:"notify_chat_id" env:"CLAWGO_AGENTS_DEFAULTS_AUTONOMY_NOTIFY_CHAT_ID"` } @@ -303,6 +304,7 @@ func DefaultConfig() *Config { MaxPendingDurationSec: 180, MaxConsecutiveStalls: 3, MaxDispatchPerTick: 2, + NotifyCooldownSec: 300, NotifyChannel: "", NotifyChatID: "", }, diff --git a/pkg/config/validate.go b/pkg/config/validate.go index 66865e7..bf8d2de 100644 --- a/pkg/config/validate.go +++ b/pkg/config/validate.go @@ -99,6 +99,9 @@ func Validate(cfg *Config) []error { if aut.MaxDispatchPerTick <= 0 { errs = append(errs, fmt.Errorf("agents.defaults.autonomy.max_dispatch_per_tick must be > 0 when enabled=true")) } + if aut.NotifyCooldownSec <= 0 { + errs = append(errs, fmt.Errorf("agents.defaults.autonomy.notify_cooldown_sec must be > 0 when enabled=true")) + } } texts := cfg.Agents.Defaults.Texts if strings.TrimSpace(texts.NoResponseFallback) == "" {