ekg m1: add execution knowledge graph design + error-signature advice to suppress autonomy retry loops

This commit is contained in:
DBT
2026-03-01 04:20:24 +00:00
parent eecb90c76b
commit 0e8d267d78
5 changed files with 362 additions and 1 deletions

View File

@@ -23,6 +23,7 @@ import (
"clawgo/pkg/bus"
"clawgo/pkg/config"
"clawgo/pkg/cron"
"clawgo/pkg/ekg"
"clawgo/pkg/logger"
"clawgo/pkg/nodes"
"clawgo/pkg/providers"
@@ -60,6 +61,7 @@ type AgentLoop struct {
sessionRunLocks map[string]*sync.Mutex
providerNames []string
providerPool map[string]providers.LLMProvider
ekg *ekg.Engine
}
// StartupCompactionReport provides startup memory/session maintenance stats.
@@ -236,6 +238,7 @@ func NewAgentLoop(cfg *config.Config, msgBus *bus.MessageBus, provider providers
running: false,
intentHints: map[string]string{},
sessionRunLocks: map[string]*sync.Mutex{},
ekg: ekg.New(workspace),
}
// Initialize provider fallback chain (primary + proxy_fallbacks).
@@ -435,6 +438,17 @@ func (al *AgentLoop) appendTaskAuditEvent(taskID string, msg bus.InboundMessage,
"media_count": len(msg.MediaItems),
"media_items": msg.MediaItems,
}
if al.ekg != nil {
al.ekg.Record(ekg.Event{
TaskID: taskID,
Session: msg.SessionKey,
Channel: msg.Channel,
Source: source,
Status: status,
Log: logText,
})
}
b, _ := json.Marshal(row)
f, err := os.OpenFile(path, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0644)
if err != nil {

View File

@@ -16,6 +16,7 @@ import (
"time"
"clawgo/pkg/bus"
"clawgo/pkg/ekg"
"clawgo/pkg/lifecycle"
)
@@ -41,6 +42,7 @@ type Options struct {
ImportantKeywords []string
CompletionTemplate string
BlockedTemplate string
EKGConsecutiveErrorThreshold int
}
type taskState struct {
@@ -75,6 +77,7 @@ type Engine struct {
roundsWithoutUser int
lastDailyReportDate string
lastHistoryCleanupAt time.Time
ekg *ekg.Engine
}
func NewEngine(opts Options, msgBus *bus.MessageBus) *Engine {
@@ -111,7 +114,10 @@ func NewEngine(opts Options, msgBus *bus.MessageBus) *Engine {
if opts.TaskHistoryRetentionDays <= 0 {
opts.TaskHistoryRetentionDays = 3
}
return &Engine{
if opts.EKGConsecutiveErrorThreshold <= 0 {
opts.EKGConsecutiveErrorThreshold = 3
}
eng := &Engine{
opts: opts,
bus: msgBus,
runner: lifecycle.NewLoopRunner(),
@@ -119,7 +125,12 @@ func NewEngine(opts Options, msgBus *bus.MessageBus) *Engine {
state: map[string]*taskState{},
lastNotify: map[string]time.Time{},
lockOwners: map[string]string{},
ekg: ekg.New(opts.Workspace),
}
if eng.ekg != nil {
eng.ekg.SetConsecutiveErrorThreshold(opts.EKGConsecutiveErrorThreshold)
}
return eng
}
func (e *Engine) Start() {
@@ -319,7 +330,18 @@ func (e *Engine) tick() {
continue
}
if outcome == "error" {
errSig := e.latestErrorSignature(st.ID, st.LastRunAt)
advice := ekg.Advice{}
if e.ekg != nil {
advice = e.ekg.GetAdvice(ekg.SignalContext{TaskID: st.ID, ErrSig: errSig, Source: "autonomy", Channel: "system"})
}
st.Status = "blocked"
if advice.ShouldEscalate {
st.BlockReason = "repeated_error_signature"
st.RetryAfter = now.Add(5 * time.Minute)
e.sendFailureNotification(st, "repeated error signature detected; escalate")
continue
}
st.BlockReason = "last_run_error"
st.RetryAfter = now.Add(blockedRetryBackoff(st.ConsecutiveStall+1, e.opts.MinRunIntervalSec))
e.sendFailureNotification(st, "last run ended with error")
@@ -1113,6 +1135,47 @@ func (e *Engine) detectRunOutcome(taskID string, since time.Time) (string, bool)
return latest, true
}
func (e *Engine) latestErrorSignature(taskID string, since time.Time) string {
if e.opts.Workspace == "" || taskID == "" {
return ""
}
path := filepath.Join(e.opts.Workspace, "memory", "task-audit.jsonl")
f, err := os.Open(path)
if err != nil {
return ""
}
defer f.Close()
sessionKey := "autonomy:" + taskID
latestAt := time.Time{}
latestErr := ""
s := bufio.NewScanner(f)
for s.Scan() {
var row map[string]interface{}
if json.Unmarshal(s.Bytes(), &row) != nil {
continue
}
if fmt.Sprintf("%v", row["session"]) != sessionKey {
continue
}
if fmt.Sprintf("%v", row["status"]) != "error" {
continue
}
ts := fmt.Sprintf("%v", row["time"])
tm, err := time.Parse(time.RFC3339, ts)
if err != nil {
continue
}
if !since.IsZero() && tm.Before(since) {
continue
}
if latestAt.IsZero() || tm.After(latestAt) {
latestAt = tm
latestErr = fmt.Sprintf("%v", row["log"])
}
}
return ekg.NormalizeErrorSignature(latestErr)
}
func parseTodoAttributes(content string) (priority, dueAt, normalized string) {
priority = "normal"
normalized = content

186
pkg/ekg/engine.go Normal file
View File

@@ -0,0 +1,186 @@
package ekg
import (
"bufio"
"encoding/json"
"os"
"path/filepath"
"regexp"
"strings"
"time"
)
type Event struct {
Time string `json:"time"`
TaskID string `json:"task_id,omitempty"`
Session string `json:"session,omitempty"`
Channel string `json:"channel,omitempty"`
Source string `json:"source,omitempty"`
Status string `json:"status"` // success|error|suppressed
ErrSig string `json:"errsig,omitempty"`
Log string `json:"log,omitempty"`
}
type SignalContext struct {
TaskID string
ErrSig string
Source string
Channel string
}
type Advice struct {
ShouldEscalate bool `json:"should_escalate"`
RetryBackoffSec int `json:"retry_backoff_sec"`
Reason []string `json:"reason"`
}
type Engine struct {
path string
recentLines int
consecutiveErrorThreshold int
}
func New(workspace string) *Engine {
p := filepath.Join(strings.TrimSpace(workspace), "memory", "ekg-events.jsonl")
return &Engine{path: p, recentLines: 2000, consecutiveErrorThreshold: 3}
}
func (e *Engine) SetConsecutiveErrorThreshold(v int) {
if e == nil {
return
}
if v <= 0 {
v = 3
}
e.consecutiveErrorThreshold = v
}
func (e *Engine) Record(ev Event) {
if e == nil || strings.TrimSpace(e.path) == "" {
return
}
if strings.TrimSpace(ev.Time) == "" {
ev.Time = time.Now().UTC().Format(time.RFC3339)
}
ev.TaskID = strings.TrimSpace(ev.TaskID)
ev.Session = strings.TrimSpace(ev.Session)
ev.Channel = strings.TrimSpace(ev.Channel)
ev.Source = strings.TrimSpace(ev.Source)
ev.Status = strings.TrimSpace(strings.ToLower(ev.Status))
if ev.ErrSig == "" && ev.Log != "" {
ev.ErrSig = NormalizeErrorSignature(ev.Log)
}
if ev.ErrSig != "" {
ev.ErrSig = NormalizeErrorSignature(ev.ErrSig)
}
_ = os.MkdirAll(filepath.Dir(e.path), 0o755)
b, err := json.Marshal(ev)
if err != nil {
return
}
f, err := os.OpenFile(e.path, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0o644)
if err != nil {
return
}
defer f.Close()
_, _ = f.Write(append(b, '\n'))
}
func (e *Engine) GetAdvice(ctx SignalContext) Advice {
adv := Advice{ShouldEscalate: false, RetryBackoffSec: 30, Reason: []string{}}
if e == nil {
return adv
}
taskID := strings.TrimSpace(ctx.TaskID)
errSig := NormalizeErrorSignature(ctx.ErrSig)
if taskID == "" || errSig == "" {
return adv
}
events := e.readRecentEvents()
if len(events) == 0 {
return adv
}
consecutive := 0
for i := len(events) - 1; i >= 0; i-- {
ev := events[i]
if strings.TrimSpace(ev.TaskID) != taskID {
continue
}
evErr := NormalizeErrorSignature(ev.ErrSig)
if evErr == "" {
evErr = NormalizeErrorSignature(ev.Log)
}
if evErr != errSig {
continue
}
if strings.ToLower(strings.TrimSpace(ev.Status)) == "error" {
consecutive++
if consecutive >= e.consecutiveErrorThreshold {
adv.ShouldEscalate = true
adv.RetryBackoffSec = 300
adv.Reason = append(adv.Reason, "repeated_error_signature")
adv.Reason = append(adv.Reason, "same task and error signature exceeded threshold")
return adv
}
continue
}
// Same signature but success/suppressed encountered: reset chain.
break
}
return adv
}
func (e *Engine) readRecentEvents() []Event {
if strings.TrimSpace(e.path) == "" {
return nil
}
f, err := os.Open(e.path)
if err != nil {
return nil
}
defer f.Close()
lines := make([]string, 0, e.recentLines)
s := bufio.NewScanner(f)
for s.Scan() {
line := strings.TrimSpace(s.Text())
if line == "" {
continue
}
lines = append(lines, line)
if len(lines) > e.recentLines {
lines = lines[1:]
}
}
out := make([]Event, 0, len(lines))
for _, l := range lines {
var ev Event
if json.Unmarshal([]byte(l), &ev) == nil {
out = append(out, ev)
}
}
return out
}
var (
rePathNum = regexp.MustCompile(`\b\d+\b`)
rePathHex = regexp.MustCompile(`\b0x[0-9a-fA-F]+\b`)
rePathWin = regexp.MustCompile(`[a-zA-Z]:\\[^\s]+`)
rePathNix = regexp.MustCompile(`/[^\s]+`)
reSpace = regexp.MustCompile(`\s+`)
)
func NormalizeErrorSignature(s string) string {
s = strings.TrimSpace(strings.ToLower(s))
if s == "" {
return ""
}
s = rePathWin.ReplaceAllString(s, "<path>")
s = rePathNix.ReplaceAllString(s, "<path>")
s = rePathHex.ReplaceAllString(s, "<hex>")
s = rePathNum.ReplaceAllString(s, "<n>")
s = reSpace.ReplaceAllString(s, " ")
if len(s) > 240 {
s = s[:240]
}
return s
}