From 548f25a8a1eafb0d3b444a9039de7f5470a21af5 Mon Sep 17 00:00:00 2001 From: DBT Date: Sun, 1 Mar 2026 06:41:29 +0000 Subject: [PATCH] ekg hardening: add snapshot cache and throttle memory incident writes per errsig --- README.md | 2 ++ README_EN.md | 2 ++ pkg/autonomy/engine.go | 34 ++++++++++++++++++---- pkg/ekg/engine.go | 64 ++++++++++++++++++++++++++++++++++++++++-- 4 files changed, 93 insertions(+), 9 deletions(-) diff --git a/README.md b/README.md index 8c039b8..8fe5134 100644 --- a/README.md +++ b/README.md @@ -80,6 +80,8 @@ ClawGo 现已内置执行知识图谱能力(轻量 JSONL 事件流,不依赖 - 任务审计支持 provider/model 可观测 - EKG 统计按 source/channel 分层(heartbeat 与 workload 分离) - EKG 与 Memory 联动:重复错误升级时自动写入 `memory/YYYY-MM-DD.md` 与 `MEMORY.md` 的结构化 incident,后续 advice 可提前触发抑制 +- EKG 快照:自动维护 `memory/ekg-snapshot.json`,降低重启后扫描 JSONL 的开销 +- Incident 写入节流:同 errsig 默认 6 小时内不重复写 memory > 为什么需要时间窗口: > 历史全量统计会被旧数据与 heartbeat 噪音稀释,导致当前阶段决策失真。建议默认观察近 24h(或 6h/7d 可切换),让 fallback 和告警更贴近“当前”系统状态。 diff --git a/README_EN.md b/README_EN.md index ed6a95f..313457a 100644 --- a/README_EN.md +++ b/README_EN.md @@ -80,6 +80,8 @@ ClawGo now includes a built-in execution knowledge graph (lightweight JSONL even - Task-audit visibility for provider/model - Source/channel-stratified EKG stats (heartbeat separated from workload) - EKG-memory integration: repeated-error escalation writes structured incidents to `memory/YYYY-MM-DD.md` and `MEMORY.md`, and future advice can escalate earlier for known signatures +- EKG snapshot: auto-maintains `memory/ekg-snapshot.json` to reduce cold-start JSONL scanning cost +- Incident write throttling: same errsig is not re-written to memory within a default 6-hour cooldown > Why time windows matter: > Full-history stats get diluted by stale data and heartbeat noise, which degrades current decisions. A recent window (e.g., 24h, optionally 6h/7d) keeps fallback and alerts aligned with present runtime behavior. diff --git a/pkg/autonomy/engine.go b/pkg/autonomy/engine.go index 0e5e09f..25e4a2e 100644 --- a/pkg/autonomy/engine.go +++ b/pkg/autonomy/engine.go @@ -763,11 +763,33 @@ func (e *Engine) appendMemoryIncidentLocked(st *taskState, errSig string, reason errSig = "unknown_error_signature" } marker := "[EKG_INCIDENT] errsig=" + errSig - line := fmt.Sprintf("- [EKG_INCIDENT] errsig=%s task=%s reason=%s time=%s", errSig, shortTask(st.Content), strings.Join(reasons, ";"), time.Now().UTC().Format(time.RFC3339)) - appendIfMissing := func(path string) { + now := time.Now().UTC() + line := fmt.Sprintf("- [EKG_INCIDENT] errsig=%s task=%s reason=%s time=%s", errSig, shortTask(st.Content), strings.Join(reasons, ";"), now.Format(time.RFC3339)) + cooldown := 6 * time.Hour + hasRecentIncident := func(content string) bool { + for _, ln := range strings.Split(content, "\n") { + if !strings.Contains(ln, marker) { + continue + } + idx := strings.LastIndex(ln, "time=") + if idx < 0 { + return true + } + ts := strings.TrimSpace(ln[idx+len("time="):]) + if tm, err := time.Parse(time.RFC3339, ts); err == nil { + if now.Sub(tm) < cooldown { + return true + } + continue + } + return true + } + return false + } + appendIfDue := func(path string) { _ = os.MkdirAll(filepath.Dir(path), 0755) b, _ := os.ReadFile(path) - if strings.Contains(string(b), marker) { + if hasRecentIncident(string(b)) { return } f, err := os.OpenFile(path, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0644) @@ -777,9 +799,9 @@ func (e *Engine) appendMemoryIncidentLocked(st *taskState, errSig string, reason defer f.Close() _, _ = f.WriteString(line + "\n") } - dayPath := filepath.Join(e.opts.Workspace, "memory", time.Now().UTC().Format("2006-01-02")+".md") - appendIfMissing(dayPath) - appendIfMissing(filepath.Join(e.opts.Workspace, "MEMORY.md")) + dayPath := filepath.Join(e.opts.Workspace, "memory", now.Format("2006-01-02")+".md") + appendIfDue(dayPath) + appendIfDue(filepath.Join(e.opts.Workspace, "MEMORY.md")) } func (e *Engine) sendFailureNotification(st *taskState, reason string) { diff --git a/pkg/ekg/engine.go b/pkg/ekg/engine.go index ec12fee..7a789e6 100644 --- a/pkg/ekg/engine.go +++ b/pkg/ekg/engine.go @@ -40,6 +40,7 @@ type Advice struct { type Engine struct { workspace string path string + snapshotPath string recentLines int consecutiveErrorThreshold int } @@ -47,7 +48,8 @@ type Engine struct { func New(workspace string) *Engine { ws := strings.TrimSpace(workspace) p := filepath.Join(ws, "memory", "ekg-events.jsonl") - return &Engine{workspace: ws, path: p, recentLines: 2000, consecutiveErrorThreshold: 3} + sp := filepath.Join(ws, "memory", "ekg-snapshot.json") + return &Engine{workspace: ws, path: p, snapshotPath: sp, recentLines: 2000, consecutiveErrorThreshold: 3} } func (e *Engine) SetConsecutiveErrorThreshold(v int) { @@ -149,14 +151,20 @@ func (e *Engine) readRecentEvents() []Event { if strings.TrimSpace(e.path) == "" { return nil } + snapshotEvents, snapshotLines := e.readSnapshot() f, err := os.Open(e.path) if err != nil { - return nil + return snapshotEvents } defer f.Close() lines := make([]string, 0, e.recentLines) s := bufio.NewScanner(f) + lineNo := 0 for s.Scan() { + lineNo++ + if lineNo <= snapshotLines { + continue + } line := strings.TrimSpace(s.Text()) if line == "" { continue @@ -166,16 +174,66 @@ func (e *Engine) readRecentEvents() []Event { lines = lines[1:] } } - out := make([]Event, 0, len(lines)) + out := append([]Event{}, snapshotEvents...) for _, l := range lines { var ev Event if json.Unmarshal([]byte(l), &ev) == nil { out = append(out, ev) } } + if len(out) > e.recentLines { + out = out[len(out)-e.recentLines:] + } + e.writeSnapshot(out, lineNo) return out } +type snapshotFile struct { + UpdatedAt string `json:"updated_at"` + LineCount int `json:"line_count"` + Events []Event `json:"events"` +} + +func (e *Engine) readSnapshot() ([]Event, int) { + if e == nil || strings.TrimSpace(e.snapshotPath) == "" { + return nil, 0 + } + b, err := os.ReadFile(e.snapshotPath) + if err != nil || len(b) == 0 { + return nil, 0 + } + var snap snapshotFile + if json.Unmarshal(b, &snap) != nil { + return nil, 0 + } + if snap.LineCount < 0 { + snap.LineCount = 0 + } + if len(snap.Events) > e.recentLines { + snap.Events = snap.Events[len(snap.Events)-e.recentLines:] + } + return snap.Events, snap.LineCount +} + +func (e *Engine) writeSnapshot(events []Event, lineCount int) { + if e == nil || strings.TrimSpace(e.snapshotPath) == "" || lineCount <= 0 { + return + } + if len(events) > e.recentLines { + events = events[len(events)-e.recentLines:] + } + snap := snapshotFile{UpdatedAt: time.Now().UTC().Format(time.RFC3339), LineCount: lineCount, Events: events} + b, err := json.MarshalIndent(snap, "", " ") + if err != nil { + return + } + _ = os.MkdirAll(filepath.Dir(e.snapshotPath), 0o755) + tmp := e.snapshotPath + ".tmp" + if os.WriteFile(tmp, append(b, '\n'), 0o644) == nil { + _ = os.Rename(tmp, e.snapshotPath) + } +} + var ( rePathNum = regexp.MustCompile(`\b\d+\b`) rePathHex = regexp.MustCompile(`\b0x[0-9a-fA-F]+\b`)