add resource-key lock scheduling with 30s retry for autonomy task dispatch

This commit is contained in:
DBT
2026-02-24 12:04:45 +00:00
parent f05853472a
commit 149253f45c

View File

@@ -52,6 +52,7 @@ type taskState struct {
RetryAfter time.Time RetryAfter time.Time
ConsecutiveStall int ConsecutiveStall int
DedupeHits int DedupeHits int
ResourceKeys []string
} }
type Engine struct { type Engine struct {
@@ -63,6 +64,7 @@ type Engine struct {
mu sync.Mutex mu sync.Mutex
state map[string]*taskState state map[string]*taskState
lastNotify map[string]time.Time lastNotify map[string]time.Time
lockOwners map[string]string
} }
func NewEngine(opts Options, msgBus *bus.MessageBus) *Engine { func NewEngine(opts Options, msgBus *bus.MessageBus) *Engine {
@@ -100,6 +102,7 @@ func NewEngine(opts Options, msgBus *bus.MessageBus) *Engine {
taskStore: NewTaskStore(opts.Workspace), taskStore: NewTaskStore(opts.Workspace),
state: map[string]*taskState{}, state: map[string]*taskState{},
lastNotify: map[string]time.Time{}, lastNotify: map[string]time.Time{},
lockOwners: map[string]string{},
} }
} }
@@ -136,6 +139,7 @@ func (e *Engine) tick() {
if e.hasManualPause() { if e.hasManualPause() {
for _, st := range e.state { for _, st := range e.state {
if st.Status == "running" { if st.Status == "running" {
e.releaseLocksLocked(st.ID)
st.Status = "waiting" st.Status = "waiting"
st.BlockReason = "manual_pause" st.BlockReason = "manual_pause"
st.WaitingSince = now st.WaitingSince = now
@@ -150,6 +154,7 @@ func (e *Engine) tick() {
if e.hasRecentUserActivity(now) { if e.hasRecentUserActivity(now) {
for _, st := range e.state { for _, st := range e.state {
if st.Status == "running" { if st.Status == "running" {
e.releaseLocksLocked(st.ID)
st.Status = "waiting" st.Status = "waiting"
st.BlockReason = "active_user" st.BlockReason = "active_user"
st.WaitingSince = now st.WaitingSince = now
@@ -188,13 +193,14 @@ func (e *Engine) tick() {
} }
} }
} }
e.state[t.ID] = &taskState{ID: t.ID, Content: t.Content, Priority: t.Priority, DueAt: t.DueAt, Status: status, RetryAfter: retryAfter, DedupeHits: t.DedupeHits} e.state[t.ID] = &taskState{ID: t.ID, Content: t.Content, Priority: t.Priority, DueAt: t.DueAt, Status: status, RetryAfter: retryAfter, DedupeHits: t.DedupeHits, ResourceKeys: deriveResourceKeys(t.Content)}
continue continue
} }
st.Content = t.Content st.Content = t.Content
st.Priority = t.Priority st.Priority = t.Priority
st.DueAt = t.DueAt st.DueAt = t.DueAt
st.DedupeHits = t.DedupeHits st.DedupeHits = t.DedupeHits
st.ResourceKeys = deriveResourceKeys(t.Content)
if st.Status == "completed" { if st.Status == "completed" {
st.Status = "idle" st.Status = "idle"
} }
@@ -204,6 +210,7 @@ func (e *Engine) tick() {
for id, st := range e.state { for id, st := range e.state {
if _, ok := known[id]; !ok { if _, ok := known[id]; !ok {
if st.Status != "completed" { if st.Status != "completed" {
e.releaseLocksLocked(st.ID)
st.Status = "completed" st.Status = "completed"
e.sendCompletionNotification(st) e.sendCompletionNotification(st)
} }
@@ -237,6 +244,9 @@ func (e *Engine) tick() {
continue continue
} }
if st.Status == "waiting" { if st.Status == "waiting" {
if st.BlockReason == "resource_lock" && !st.RetryAfter.IsZero() && now.Before(st.RetryAfter) {
continue
}
// Debounce waiting/resume flapping // Debounce waiting/resume flapping
if !st.WaitingSince.IsZero() && now.Sub(st.WaitingSince) < time.Duration(e.opts.WaitingResumeDebounceSec)*time.Second { if !st.WaitingSince.IsZero() && now.Sub(st.WaitingSince) < time.Duration(e.opts.WaitingResumeDebounceSec)*time.Second {
continue continue
@@ -249,6 +259,7 @@ func (e *Engine) tick() {
e.writeTriggerAudit("resume", st, reason) e.writeTriggerAudit("resume", st, reason)
} }
if st.Status == "blocked" { if st.Status == "blocked" {
e.releaseLocksLocked(st.ID)
if !st.RetryAfter.IsZero() && now.Before(st.RetryAfter) { if !st.RetryAfter.IsZero() && now.Before(st.RetryAfter) {
continue continue
} }
@@ -273,6 +284,14 @@ func (e *Engine) tick() {
} }
} }
if !e.tryAcquireLocksLocked(st) {
st.Status = "waiting"
st.BlockReason = "resource_lock"
st.WaitingSince = now
st.RetryAfter = now.Add(30 * time.Second)
e.writeTriggerAudit("waiting", st, "resource_lock")
continue
}
e.dispatchTask(st) e.dispatchTask(st)
st.Status = "running" st.Status = "running"
st.BlockReason = "" st.BlockReason = ""
@@ -286,6 +305,61 @@ func (e *Engine) tick() {
e.persistStateLocked() e.persistStateLocked()
} }
func (e *Engine) tryAcquireLocksLocked(st *taskState) bool {
if st == nil {
return false
}
keys := st.ResourceKeys
if len(keys) == 0 {
keys = []string{"task:" + st.ID}
}
for _, k := range keys {
if owner, ok := e.lockOwners[k]; ok && owner != "" && owner != st.ID {
return false
}
}
for _, k := range keys {
e.lockOwners[k] = st.ID
}
return true
}
func (e *Engine) releaseLocksLocked(taskID string) {
if strings.TrimSpace(taskID) == "" {
return
}
for k, v := range e.lockOwners {
if v == taskID {
delete(e.lockOwners, k)
}
}
}
func deriveResourceKeys(content string) []string {
content = strings.TrimSpace(strings.ToLower(content))
if content == "" {
return nil
}
keys := make([]string, 0, 4)
for _, token := range strings.Fields(content) {
t := strings.Trim(token, "`'\"()[]{}:;,,。!?")
if strings.Contains(t, "/") || strings.HasSuffix(t, ".go") || strings.HasSuffix(t, ".md") || strings.HasSuffix(t, ".json") || strings.HasSuffix(t, ".yaml") || strings.HasSuffix(t, ".yml") {
keys = append(keys, "file:"+t)
}
}
if len(keys) == 0 {
keys = append(keys, "scope:general")
}
sort.Strings(keys)
uniq := keys[:0]
for i, k := range keys {
if i == 0 || k != keys[i-1] {
uniq = append(uniq, k)
}
}
return append([]string(nil), uniq...)
}
type todoItem struct { type todoItem struct {
ID string ID string
Content string Content string