mirror of
https://github.com/YspCoder/clawgo.git
synced 2026-04-14 19:37:31 +08:00
track autonomy retry windows and dedupe metrics in task/status views
This commit is contained in:
@@ -39,7 +39,9 @@ type taskState struct {
|
||||
Status string // idle|running|waiting|blocked|completed
|
||||
LastRunAt time.Time
|
||||
LastAutonomyAt time.Time
|
||||
RetryAfter time.Time
|
||||
ConsecutiveStall int
|
||||
DedupeHits int
|
||||
}
|
||||
|
||||
type Engine struct {
|
||||
@@ -125,17 +127,24 @@ func (e *Engine) tick() {
|
||||
st, ok := e.state[t.ID]
|
||||
if !ok {
|
||||
status := "idle"
|
||||
retryAfter := time.Time{}
|
||||
if old, ok := storedMap[t.ID]; ok {
|
||||
if old.Status == "blocked" {
|
||||
status = "blocked"
|
||||
}
|
||||
if strings.TrimSpace(old.RetryAfter) != "" {
|
||||
if rt, err := time.Parse(time.RFC3339, old.RetryAfter); err == nil {
|
||||
retryAfter = rt
|
||||
}
|
||||
}
|
||||
}
|
||||
e.state[t.ID] = &taskState{ID: t.ID, Content: t.Content, Priority: t.Priority, DueAt: t.DueAt, Status: status}
|
||||
e.state[t.ID] = &taskState{ID: t.ID, Content: t.Content, Priority: t.Priority, DueAt: t.DueAt, Status: status, RetryAfter: retryAfter, DedupeHits: t.DedupeHits}
|
||||
continue
|
||||
}
|
||||
st.Content = t.Content
|
||||
st.Priority = t.Priority
|
||||
st.DueAt = t.DueAt
|
||||
st.DedupeHits = t.DedupeHits
|
||||
if st.Status == "completed" {
|
||||
st.Status = "idle"
|
||||
}
|
||||
@@ -178,6 +187,9 @@ func (e *Engine) tick() {
|
||||
continue
|
||||
}
|
||||
if st.Status == "blocked" {
|
||||
if !st.RetryAfter.IsZero() && now.Before(st.RetryAfter) {
|
||||
continue
|
||||
}
|
||||
if now.Sub(st.LastRunAt) >= blockedRetryBackoff(st.ConsecutiveStall, e.opts.MinRunIntervalSec) {
|
||||
st.Status = "idle"
|
||||
} else {
|
||||
@@ -191,6 +203,7 @@ func (e *Engine) tick() {
|
||||
st.ConsecutiveStall++
|
||||
if st.ConsecutiveStall > e.opts.MaxConsecutiveStalls {
|
||||
st.Status = "blocked"
|
||||
st.RetryAfter = now.Add(blockedRetryBackoff(st.ConsecutiveStall, e.opts.MinRunIntervalSec))
|
||||
e.sendFailureNotification(st, "max consecutive stalls reached")
|
||||
continue
|
||||
}
|
||||
@@ -207,10 +220,11 @@ func (e *Engine) tick() {
|
||||
}
|
||||
|
||||
type todoItem struct {
|
||||
ID string
|
||||
Content string
|
||||
Priority string
|
||||
DueAt string
|
||||
ID string
|
||||
Content string
|
||||
Priority string
|
||||
DueAt string
|
||||
DedupeHits int
|
||||
}
|
||||
|
||||
func (e *Engine) scanTodos() []todoItem {
|
||||
@@ -229,6 +243,7 @@ func (e *Engine) scanTodos() []todoItem {
|
||||
if cur.DueAt == "" && it.DueAt != "" {
|
||||
cur.DueAt = it.DueAt
|
||||
}
|
||||
cur.DedupeHits++
|
||||
merged[it.ID] = cur
|
||||
return
|
||||
}
|
||||
@@ -411,14 +426,20 @@ func (e *Engine) persistStateLocked() {
|
||||
default:
|
||||
status = "todo"
|
||||
}
|
||||
retryAfter := ""
|
||||
if !st.RetryAfter.IsZero() {
|
||||
retryAfter = st.RetryAfter.UTC().Format(time.RFC3339)
|
||||
}
|
||||
items = append(items, TaskItem{
|
||||
ID: st.ID,
|
||||
Content: st.Content,
|
||||
Priority: st.Priority,
|
||||
DueAt: st.DueAt,
|
||||
Status: status,
|
||||
Source: "memory_todo",
|
||||
UpdatedAt: nowRFC3339(),
|
||||
ID: st.ID,
|
||||
Content: st.Content,
|
||||
Priority: st.Priority,
|
||||
DueAt: st.DueAt,
|
||||
Status: status,
|
||||
RetryAfter: retryAfter,
|
||||
Source: "memory_todo",
|
||||
DedupeHits: st.DedupeHits,
|
||||
UpdatedAt: nowRFC3339(),
|
||||
})
|
||||
}
|
||||
_ = e.taskStore.Save(items)
|
||||
|
||||
Reference in New Issue
Block a user