From cc48c028cadeb8fbbff2e6a04b5ff33ed09f9e48 Mon Sep 17 00:00:00 2001 From: DBT Date: Sun, 1 Mar 2026 06:21:04 +0000 Subject: [PATCH] ekg-memory integration: record repeated-error incidents to memory and use memory-linked signatures for earlier escalation --- README.md | 1 + README_EN.md | 1 + pkg/autonomy/engine.go | 29 +++++++++++++++++++++++++++++ pkg/ekg/engine.go | 41 +++++++++++++++++++++++++++++++++++++++-- 4 files changed, 70 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index a40b1cd..8c039b8 100644 --- a/README.md +++ b/README.md @@ -79,6 +79,7 @@ ClawGo 现已内置执行知识图谱能力(轻量 JSONL 事件流,不依赖 - provider fallback 按历史效果排序(含 errsig-aware) - 任务审计支持 provider/model 可观测 - EKG 统计按 source/channel 分层(heartbeat 与 workload 分离) +- EKG 与 Memory 联动:重复错误升级时自动写入 `memory/YYYY-MM-DD.md` 与 `MEMORY.md` 的结构化 incident,后续 advice 可提前触发抑制 > 为什么需要时间窗口: > 历史全量统计会被旧数据与 heartbeat 噪音稀释,导致当前阶段决策失真。建议默认观察近 24h(或 6h/7d 可切换),让 fallback 和告警更贴近“当前”系统状态。 diff --git a/README_EN.md b/README_EN.md index 82110ad..ed6a95f 100644 --- a/README_EN.md +++ b/README_EN.md @@ -79,6 +79,7 @@ ClawGo now includes a built-in execution knowledge graph (lightweight JSONL even - Provider fallback ranking by historical outcomes (errsig-aware) - Task-audit visibility for provider/model - Source/channel-stratified EKG stats (heartbeat separated from workload) +- EKG-memory integration: repeated-error escalation writes structured incidents to `memory/YYYY-MM-DD.md` and `MEMORY.md`, and future advice can escalate earlier for known signatures > Why time windows matter: > Full-history stats get diluted by stale data and heartbeat noise, which degrades current decisions. A recent window (e.g., 24h, optionally 6h/7d) keeps fallback and alerts aligned with present runtime behavior. diff --git a/pkg/autonomy/engine.go b/pkg/autonomy/engine.go index 78034a9..0e5e09f 100644 --- a/pkg/autonomy/engine.go +++ b/pkg/autonomy/engine.go @@ -340,6 +340,7 @@ func (e *Engine) tick() { st.BlockReason = "repeated_error_signature" st.RetryAfter = now.Add(5 * time.Minute) e.enqueueAutoRepairTaskLocked(st, errSig) + e.appendMemoryIncidentLocked(st, errSig, advice.Reason) e.sendFailureNotification(st, "repeated error signature detected; escalate") continue } @@ -753,6 +754,34 @@ func (e *Engine) enqueueAutoRepairTaskLocked(st *taskState, errSig string) { e.writeReflectLog("infer", st, "generated auto-repair task due to repeated error signature") } +func (e *Engine) appendMemoryIncidentLocked(st *taskState, errSig string, reasons []string) { + if st == nil || strings.TrimSpace(e.opts.Workspace) == "" { + return + } + errSig = ekg.NormalizeErrorSignature(errSig) + if errSig == "" { + errSig = "unknown_error_signature" + } + marker := "[EKG_INCIDENT] errsig=" + errSig + line := fmt.Sprintf("- [EKG_INCIDENT] errsig=%s task=%s reason=%s time=%s", errSig, shortTask(st.Content), strings.Join(reasons, ";"), time.Now().UTC().Format(time.RFC3339)) + appendIfMissing := func(path string) { + _ = os.MkdirAll(filepath.Dir(path), 0755) + b, _ := os.ReadFile(path) + if strings.Contains(string(b), marker) { + return + } + f, err := os.OpenFile(path, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0644) + if err != nil { + return + } + defer f.Close() + _, _ = f.WriteString(line + "\n") + } + dayPath := filepath.Join(e.opts.Workspace, "memory", time.Now().UTC().Format("2006-01-02")+".md") + appendIfMissing(dayPath) + appendIfMissing(filepath.Join(e.opts.Workspace, "MEMORY.md")) +} + func (e *Engine) sendFailureNotification(st *taskState, reason string) { e.writeReflectLog("blocked", st, reason) e.writeTriggerAudit("blocked", st, reason) diff --git a/pkg/ekg/engine.go b/pkg/ekg/engine.go index 9017167..ec12fee 100644 --- a/pkg/ekg/engine.go +++ b/pkg/ekg/engine.go @@ -38,14 +38,16 @@ type Advice struct { } type Engine struct { + workspace string path string recentLines int consecutiveErrorThreshold int } func New(workspace string) *Engine { - p := filepath.Join(strings.TrimSpace(workspace), "memory", "ekg-events.jsonl") - return &Engine{path: p, recentLines: 2000, consecutiveErrorThreshold: 3} + ws := strings.TrimSpace(workspace) + p := filepath.Join(ws, "memory", "ekg-events.jsonl") + return &Engine{workspace: ws, path: p, recentLines: 2000, consecutiveErrorThreshold: 3} } func (e *Engine) SetConsecutiveErrorThreshold(v int) { @@ -127,6 +129,14 @@ func (e *Engine) GetAdvice(ctx SignalContext) Advice { adv.Reason = append(adv.Reason, "same task and error signature exceeded threshold") return adv } + // Memory-linked fast path: if this errsig was documented as incident, escalate one step earlier. + if consecutive >= e.consecutiveErrorThreshold-1 && e.hasMemoryIncident(errSig) { + adv.ShouldEscalate = true + adv.RetryBackoffSec = 300 + adv.Reason = append(adv.Reason, "memory_linked_repeated_error_signature") + adv.Reason = append(adv.Reason, "same errsig already recorded in memory incident") + return adv + } continue } // Same signature but success/suppressed encountered: reset chain. @@ -225,6 +235,33 @@ func (e *Engine) RankProvidersForError(candidates []string, errSig string) []str return ordered } +func (e *Engine) hasMemoryIncident(errSig string) bool { + if e == nil || strings.TrimSpace(e.workspace) == "" { + return false + } + errSig = NormalizeErrorSignature(errSig) + if errSig == "" { + return false + } + needle := "[EKG_INCIDENT]" + candidates := []string{ + filepath.Join(e.workspace, "MEMORY.md"), + filepath.Join(e.workspace, "memory", time.Now().UTC().Format("2006-01-02")+".md"), + filepath.Join(e.workspace, "memory", time.Now().UTC().AddDate(0, 0, -1).Format("2006-01-02")+".md"), + } + for _, p := range candidates { + b, err := os.ReadFile(p) + if err != nil || len(b) == 0 { + continue + } + txt := strings.ToLower(string(b)) + if strings.Contains(txt, strings.ToLower(needle)) && strings.Contains(txt, errSig) { + return true + } + } + return false +} + func NormalizeErrorSignature(s string) string { s = strings.TrimSpace(strings.ToLower(s)) if s == "" {