mirror of
https://github.com/YspCoder/clawgo.git
synced 2026-05-18 12:27:29 +08:00
persist autonomy resource keys and expose lock wait stats in status
This commit is contained in:
@@ -144,11 +144,14 @@ func statusCmd() {
|
|||||||
cfg.Agents.Defaults.Autonomy.NotifyCooldownSec,
|
cfg.Agents.Defaults.Autonomy.NotifyCooldownSec,
|
||||||
cfg.Agents.Defaults.Autonomy.NotifySameReasonCooldownSec,
|
cfg.Agents.Defaults.Autonomy.NotifySameReasonCooldownSec,
|
||||||
)
|
)
|
||||||
if summary, prio, reasons, nextRetry, dedupeHits, err := collectAutonomyTaskSummary(filepath.Join(workspace, "memory", "tasks.json")); err == nil {
|
if summary, prio, reasons, nextRetry, dedupeHits, waitingLocks, lockKeys, err := collectAutonomyTaskSummary(filepath.Join(workspace, "memory", "tasks.json")); err == nil {
|
||||||
fmt.Printf("Autonomy Tasks: todo=%d doing=%d waiting=%d blocked=%d done=%d dedupe_hits=%d\n", summary["todo"], summary["doing"], summary["waiting"], summary["blocked"], summary["done"], dedupeHits)
|
fmt.Printf("Autonomy Tasks: todo=%d doing=%d waiting=%d blocked=%d done=%d dedupe_hits=%d\n", summary["todo"], summary["doing"], summary["waiting"], summary["blocked"], summary["done"], dedupeHits)
|
||||||
fmt.Printf("Autonomy Priority: high=%d normal=%d low=%d\n", prio["high"], prio["normal"], prio["low"])
|
fmt.Printf("Autonomy Priority: high=%d normal=%d low=%d\n", prio["high"], prio["normal"], prio["low"])
|
||||||
if reasons["active_user"] > 0 || reasons["manual_pause"] > 0 || reasons["max_consecutive_stalls"] > 0 {
|
if reasons["active_user"] > 0 || reasons["manual_pause"] > 0 || reasons["max_consecutive_stalls"] > 0 || reasons["resource_lock"] > 0 {
|
||||||
fmt.Printf("Autonomy Block Reasons: active_user=%d manual_pause=%d max_stalls=%d\n", reasons["active_user"], reasons["manual_pause"], reasons["max_consecutive_stalls"])
|
fmt.Printf("Autonomy Block Reasons: active_user=%d manual_pause=%d max_stalls=%d resource_lock=%d\n", reasons["active_user"], reasons["manual_pause"], reasons["max_consecutive_stalls"], reasons["resource_lock"])
|
||||||
|
}
|
||||||
|
if waitingLocks > 0 || lockKeys > 0 {
|
||||||
|
fmt.Printf("Autonomy Locks: waiting=%d unique_keys=%d\n", waitingLocks, lockKeys)
|
||||||
}
|
}
|
||||||
if nextRetry != "" {
|
if nextRetry != "" {
|
||||||
fmt.Printf("Autonomy Next Retry: %s\n", nextRetry)
|
fmt.Printf("Autonomy Next Retry: %s\n", nextRetry)
|
||||||
@@ -343,30 +346,33 @@ func collectTriggerErrorCounts(path string) (map[string]int, error) {
|
|||||||
return counts, nil
|
return counts, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func collectAutonomyTaskSummary(path string) (map[string]int, map[string]int, map[string]int, string, int, error) {
|
func collectAutonomyTaskSummary(path string) (map[string]int, map[string]int, map[string]int, string, int, int, int, error) {
|
||||||
data, err := os.ReadFile(path)
|
data, err := os.ReadFile(path)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if os.IsNotExist(err) {
|
if os.IsNotExist(err) {
|
||||||
return map[string]int{"todo": 0, "doing": 0, "waiting": 0, "blocked": 0, "done": 0}, map[string]int{"high": 0, "normal": 0, "low": 0}, map[string]int{"active_user": 0, "manual_pause": 0, "max_consecutive_stalls": 0}, "", 0, nil
|
return map[string]int{"todo": 0, "doing": 0, "waiting": 0, "blocked": 0, "done": 0}, map[string]int{"high": 0, "normal": 0, "low": 0}, map[string]int{"active_user": 0, "manual_pause": 0, "max_consecutive_stalls": 0, "resource_lock": 0}, "", 0, 0, 0, nil
|
||||||
}
|
}
|
||||||
return nil, nil, nil, "", 0, err
|
return nil, nil, nil, "", 0, 0, 0, err
|
||||||
}
|
}
|
||||||
var items []struct {
|
var items []struct {
|
||||||
Status string `json:"status"`
|
Status string `json:"status"`
|
||||||
Priority string `json:"priority"`
|
Priority string `json:"priority"`
|
||||||
BlockReason string `json:"block_reason"`
|
BlockReason string `json:"block_reason"`
|
||||||
RetryAfter string `json:"retry_after"`
|
RetryAfter string `json:"retry_after"`
|
||||||
DedupeHits int `json:"dedupe_hits"`
|
DedupeHits int `json:"dedupe_hits"`
|
||||||
|
ResourceKeys []string `json:"resource_keys"`
|
||||||
}
|
}
|
||||||
if err := json.Unmarshal(data, &items); err != nil {
|
if err := json.Unmarshal(data, &items); err != nil {
|
||||||
return nil, nil, nil, "", 0, err
|
return nil, nil, nil, "", 0, 0, 0, err
|
||||||
}
|
}
|
||||||
summary := map[string]int{"todo": 0, "doing": 0, "waiting": 0, "blocked": 0, "done": 0}
|
summary := map[string]int{"todo": 0, "doing": 0, "waiting": 0, "blocked": 0, "done": 0}
|
||||||
priorities := map[string]int{"high": 0, "normal": 0, "low": 0}
|
priorities := map[string]int{"high": 0, "normal": 0, "low": 0}
|
||||||
reasons := map[string]int{"active_user": 0, "manual_pause": 0, "max_consecutive_stalls": 0}
|
reasons := map[string]int{"active_user": 0, "manual_pause": 0, "max_consecutive_stalls": 0, "resource_lock": 0}
|
||||||
nextRetry := ""
|
nextRetry := ""
|
||||||
nextRetryAt := time.Time{}
|
nextRetryAt := time.Time{}
|
||||||
totalDedupe := 0
|
totalDedupe := 0
|
||||||
|
waitingLocks := 0
|
||||||
|
lockKeySet := map[string]struct{}{}
|
||||||
for _, it := range items {
|
for _, it := range items {
|
||||||
s := strings.ToLower(strings.TrimSpace(it.Status))
|
s := strings.ToLower(strings.TrimSpace(it.Status))
|
||||||
if _, ok := summary[s]; ok {
|
if _, ok := summary[s]; ok {
|
||||||
@@ -377,6 +383,15 @@ func collectAutonomyTaskSummary(path string) (map[string]int, map[string]int, ma
|
|||||||
if _, ok := reasons[r]; ok {
|
if _, ok := reasons[r]; ok {
|
||||||
reasons[r]++
|
reasons[r]++
|
||||||
}
|
}
|
||||||
|
if s == "waiting" && r == "resource_lock" {
|
||||||
|
waitingLocks++
|
||||||
|
for _, k := range it.ResourceKeys {
|
||||||
|
kk := strings.TrimSpace(strings.ToLower(k))
|
||||||
|
if kk != "" {
|
||||||
|
lockKeySet[kk] = struct{}{}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
p := strings.ToLower(strings.TrimSpace(it.Priority))
|
p := strings.ToLower(strings.TrimSpace(it.Priority))
|
||||||
if _, ok := priorities[p]; ok {
|
if _, ok := priorities[p]; ok {
|
||||||
priorities[p]++
|
priorities[p]++
|
||||||
@@ -392,7 +407,7 @@ func collectAutonomyTaskSummary(path string) (map[string]int, map[string]int, ma
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return summary, priorities, reasons, nextRetry, totalDedupe, nil
|
return summary, priorities, reasons, nextRetry, totalDedupe, waitingLocks, len(lockKeySet), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func collectSkillExecStats(path string) (int, int, int, float64, string, error) {
|
func collectSkillExecStats(path string) (int, int, int, float64, string, error) {
|
||||||
|
|||||||
@@ -184,6 +184,7 @@ func (e *Engine) tick() {
|
|||||||
if !ok {
|
if !ok {
|
||||||
status := "idle"
|
status := "idle"
|
||||||
retryAfter := time.Time{}
|
retryAfter := time.Time{}
|
||||||
|
resourceKeys := deriveResourceKeys(t.Content)
|
||||||
if old, ok := storedMap[t.ID]; ok {
|
if old, ok := storedMap[t.ID]; ok {
|
||||||
if old.Status == "blocked" {
|
if old.Status == "blocked" {
|
||||||
status = "blocked"
|
status = "blocked"
|
||||||
@@ -193,8 +194,11 @@ func (e *Engine) tick() {
|
|||||||
retryAfter = rt
|
retryAfter = rt
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if len(old.ResourceKeys) > 0 {
|
||||||
|
resourceKeys = append([]string(nil), old.ResourceKeys...)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
e.state[t.ID] = &taskState{ID: t.ID, Content: t.Content, Priority: t.Priority, DueAt: t.DueAt, Status: status, RetryAfter: retryAfter, DedupeHits: t.DedupeHits, ResourceKeys: deriveResourceKeys(t.Content)}
|
e.state[t.ID] = &taskState{ID: t.ID, Content: t.Content, Priority: t.Priority, DueAt: t.DueAt, Status: status, RetryAfter: retryAfter, DedupeHits: t.DedupeHits, ResourceKeys: resourceKeys}
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
st.Content = t.Content
|
st.Content = t.Content
|
||||||
@@ -675,16 +679,17 @@ func (e *Engine) persistStateLocked() {
|
|||||||
retryAfter = st.RetryAfter.UTC().Format(time.RFC3339)
|
retryAfter = st.RetryAfter.UTC().Format(time.RFC3339)
|
||||||
}
|
}
|
||||||
items = append(items, TaskItem{
|
items = append(items, TaskItem{
|
||||||
ID: st.ID,
|
ID: st.ID,
|
||||||
Content: st.Content,
|
Content: st.Content,
|
||||||
Priority: st.Priority,
|
Priority: st.Priority,
|
||||||
DueAt: st.DueAt,
|
DueAt: st.DueAt,
|
||||||
Status: status,
|
Status: status,
|
||||||
BlockReason: st.BlockReason,
|
BlockReason: st.BlockReason,
|
||||||
RetryAfter: retryAfter,
|
RetryAfter: retryAfter,
|
||||||
Source: "memory_todo",
|
Source: "memory_todo",
|
||||||
DedupeHits: st.DedupeHits,
|
DedupeHits: st.DedupeHits,
|
||||||
UpdatedAt: nowRFC3339(),
|
ResourceKeys: append([]string(nil), st.ResourceKeys...),
|
||||||
|
UpdatedAt: nowRFC3339(),
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
_ = e.taskStore.Save(items)
|
_ = e.taskStore.Save(items)
|
||||||
|
|||||||
@@ -18,8 +18,9 @@ type TaskItem struct {
|
|||||||
BlockReason string `json:"block_reason,omitempty"`
|
BlockReason string `json:"block_reason,omitempty"`
|
||||||
RetryAfter string `json:"retry_after,omitempty"`
|
RetryAfter string `json:"retry_after,omitempty"`
|
||||||
Source string `json:"source"`
|
Source string `json:"source"`
|
||||||
DedupeHits int `json:"dedupe_hits,omitempty"`
|
DedupeHits int `json:"dedupe_hits,omitempty"`
|
||||||
UpdatedAt string `json:"updated_at"`
|
ResourceKeys []string `json:"resource_keys,omitempty"`
|
||||||
|
UpdatedAt string `json:"updated_at"`
|
||||||
}
|
}
|
||||||
|
|
||||||
type TaskStore struct {
|
type TaskStore struct {
|
||||||
|
|||||||
Reference in New Issue
Block a user