diff --git a/pkg/autonomy/engine.go b/pkg/autonomy/engine.go index f600de9..2a44858 100644 --- a/pkg/autonomy/engine.go +++ b/pkg/autonomy/engine.go @@ -52,6 +52,7 @@ type taskState struct { RetryAfter time.Time ConsecutiveStall int DedupeHits int + ResourceKeys []string } type Engine struct { @@ -63,6 +64,7 @@ type Engine struct { mu sync.Mutex state map[string]*taskState lastNotify map[string]time.Time + lockOwners map[string]string } func NewEngine(opts Options, msgBus *bus.MessageBus) *Engine { @@ -100,6 +102,7 @@ func NewEngine(opts Options, msgBus *bus.MessageBus) *Engine { taskStore: NewTaskStore(opts.Workspace), state: map[string]*taskState{}, lastNotify: map[string]time.Time{}, + lockOwners: map[string]string{}, } } @@ -136,6 +139,7 @@ func (e *Engine) tick() { if e.hasManualPause() { for _, st := range e.state { if st.Status == "running" { + e.releaseLocksLocked(st.ID) st.Status = "waiting" st.BlockReason = "manual_pause" st.WaitingSince = now @@ -150,6 +154,7 @@ func (e *Engine) tick() { if e.hasRecentUserActivity(now) { for _, st := range e.state { if st.Status == "running" { + e.releaseLocksLocked(st.ID) st.Status = "waiting" st.BlockReason = "active_user" st.WaitingSince = now @@ -188,13 +193,14 @@ func (e *Engine) tick() { } } } - e.state[t.ID] = &taskState{ID: t.ID, Content: t.Content, Priority: t.Priority, DueAt: t.DueAt, Status: status, RetryAfter: retryAfter, DedupeHits: t.DedupeHits} + e.state[t.ID] = &taskState{ID: t.ID, Content: t.Content, Priority: t.Priority, DueAt: t.DueAt, Status: status, RetryAfter: retryAfter, DedupeHits: t.DedupeHits, ResourceKeys: deriveResourceKeys(t.Content)} continue } st.Content = t.Content st.Priority = t.Priority st.DueAt = t.DueAt st.DedupeHits = t.DedupeHits + st.ResourceKeys = deriveResourceKeys(t.Content) if st.Status == "completed" { st.Status = "idle" } @@ -204,6 +210,7 @@ func (e *Engine) tick() { for id, st := range e.state { if _, ok := known[id]; !ok { if st.Status != "completed" { + e.releaseLocksLocked(st.ID) st.Status = "completed" e.sendCompletionNotification(st) } @@ -237,6 +244,9 @@ func (e *Engine) tick() { continue } if st.Status == "waiting" { + if st.BlockReason == "resource_lock" && !st.RetryAfter.IsZero() && now.Before(st.RetryAfter) { + continue + } // Debounce waiting/resume flapping if !st.WaitingSince.IsZero() && now.Sub(st.WaitingSince) < time.Duration(e.opts.WaitingResumeDebounceSec)*time.Second { continue @@ -249,6 +259,7 @@ func (e *Engine) tick() { e.writeTriggerAudit("resume", st, reason) } if st.Status == "blocked" { + e.releaseLocksLocked(st.ID) if !st.RetryAfter.IsZero() && now.Before(st.RetryAfter) { continue } @@ -273,6 +284,14 @@ func (e *Engine) tick() { } } + if !e.tryAcquireLocksLocked(st) { + st.Status = "waiting" + st.BlockReason = "resource_lock" + st.WaitingSince = now + st.RetryAfter = now.Add(30 * time.Second) + e.writeTriggerAudit("waiting", st, "resource_lock") + continue + } e.dispatchTask(st) st.Status = "running" st.BlockReason = "" @@ -286,6 +305,61 @@ func (e *Engine) tick() { e.persistStateLocked() } +func (e *Engine) tryAcquireLocksLocked(st *taskState) bool { + if st == nil { + return false + } + keys := st.ResourceKeys + if len(keys) == 0 { + keys = []string{"task:" + st.ID} + } + for _, k := range keys { + if owner, ok := e.lockOwners[k]; ok && owner != "" && owner != st.ID { + return false + } + } + for _, k := range keys { + e.lockOwners[k] = st.ID + } + return true +} + +func (e *Engine) releaseLocksLocked(taskID string) { + if strings.TrimSpace(taskID) == "" { + return + } + for k, v := range e.lockOwners { + if v == taskID { + delete(e.lockOwners, k) + } + } +} + +func deriveResourceKeys(content string) []string { + content = strings.TrimSpace(strings.ToLower(content)) + if content == "" { + return nil + } + keys := make([]string, 0, 4) + for _, token := range strings.Fields(content) { + t := strings.Trim(token, "`'\"()[]{}:;,,。!?") + if strings.Contains(t, "/") || strings.HasSuffix(t, ".go") || strings.HasSuffix(t, ".md") || strings.HasSuffix(t, ".json") || strings.HasSuffix(t, ".yaml") || strings.HasSuffix(t, ".yml") { + keys = append(keys, "file:"+t) + } + } + if len(keys) == 0 { + keys = append(keys, "scope:general") + } + sort.Strings(keys) + uniq := keys[:0] + for i, k := range keys { + if i == 0 || k != keys[i-1] { + uniq = append(uniq, k) + } + } + return append([]string(nil), uniq...) +} + type todoItem struct { ID string Content string