ekg hardening: add snapshot cache and throttle memory incident writes per errsig

This commit is contained in:
DBT
2026-03-01 06:41:29 +00:00
parent cc48c028ca
commit 548f25a8a1
4 changed files with 93 additions and 9 deletions

View File

@@ -763,11 +763,33 @@ func (e *Engine) appendMemoryIncidentLocked(st *taskState, errSig string, reason
errSig = "unknown_error_signature"
}
marker := "[EKG_INCIDENT] errsig=" + errSig
line := fmt.Sprintf("- [EKG_INCIDENT] errsig=%s task=%s reason=%s time=%s", errSig, shortTask(st.Content), strings.Join(reasons, ";"), time.Now().UTC().Format(time.RFC3339))
appendIfMissing := func(path string) {
now := time.Now().UTC()
line := fmt.Sprintf("- [EKG_INCIDENT] errsig=%s task=%s reason=%s time=%s", errSig, shortTask(st.Content), strings.Join(reasons, ";"), now.Format(time.RFC3339))
cooldown := 6 * time.Hour
hasRecentIncident := func(content string) bool {
for _, ln := range strings.Split(content, "\n") {
if !strings.Contains(ln, marker) {
continue
}
idx := strings.LastIndex(ln, "time=")
if idx < 0 {
return true
}
ts := strings.TrimSpace(ln[idx+len("time="):])
if tm, err := time.Parse(time.RFC3339, ts); err == nil {
if now.Sub(tm) < cooldown {
return true
}
continue
}
return true
}
return false
}
appendIfDue := func(path string) {
_ = os.MkdirAll(filepath.Dir(path), 0755)
b, _ := os.ReadFile(path)
if strings.Contains(string(b), marker) {
if hasRecentIncident(string(b)) {
return
}
f, err := os.OpenFile(path, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0644)
@@ -777,9 +799,9 @@ func (e *Engine) appendMemoryIncidentLocked(st *taskState, errSig string, reason
defer f.Close()
_, _ = f.WriteString(line + "\n")
}
dayPath := filepath.Join(e.opts.Workspace, "memory", time.Now().UTC().Format("2006-01-02")+".md")
appendIfMissing(dayPath)
appendIfMissing(filepath.Join(e.opts.Workspace, "MEMORY.md"))
dayPath := filepath.Join(e.opts.Workspace, "memory", now.Format("2006-01-02")+".md")
appendIfDue(dayPath)
appendIfDue(filepath.Join(e.opts.Workspace, "MEMORY.md"))
}
func (e *Engine) sendFailureNotification(st *taskState, reason string) {

View File

@@ -40,6 +40,7 @@ type Advice struct {
// Engine carries the file locations and limits used when reading EKG events.
type Engine struct {
	// workspace is the whitespace-trimmed workspace root given to New.
	// path is the JSONL event log under <workspace>/memory.
	// snapshotPath is the JSON snapshot cache stored next to the event log.
	workspace, path, snapshotPath string

	// recentLines caps how many events are kept when reading the log or
	// the snapshot (New sets it to 2000).
	// consecutiveErrorThreshold is initialised to 3 by New.
	// NOTE(review): presumably adjustable via SetConsecutiveErrorThreshold — confirm.
	recentLines, consecutiveErrorThreshold int
}
@@ -47,7 +48,8 @@ type Engine struct {
// New builds an Engine rooted at workspace.
//
// Surrounding whitespace is trimmed from workspace. The event log is placed at
// <workspace>/memory/ekg-events.jsonl and the snapshot cache at
// <workspace>/memory/ekg-snapshot.json. The engine keeps at most 2000 recent
// events and starts with a consecutive-error threshold of 3.
func New(workspace string) *Engine {
	ws := strings.TrimSpace(workspace)
	p := filepath.Join(ws, "memory", "ekg-events.jsonl")
	sp := filepath.Join(ws, "memory", "ekg-snapshot.json")
	// Single return: an earlier return here would leave snapshotPath unset
	// and silently disable the snapshot cache.
	return &Engine{
		workspace:                 ws,
		path:                      p,
		snapshotPath:              sp,
		recentLines:               2000,
		consecutiveErrorThreshold: 3,
	}
}
func (e *Engine) SetConsecutiveErrorThreshold(v int) {
@@ -149,14 +151,20 @@ func (e *Engine) readRecentEvents() []Event {
if strings.TrimSpace(e.path) == "" {
return nil
}
snapshotEvents, snapshotLines := e.readSnapshot()
f, err := os.Open(e.path)
if err != nil {
return nil
return snapshotEvents
}
defer f.Close()
lines := make([]string, 0, e.recentLines)
s := bufio.NewScanner(f)
lineNo := 0
for s.Scan() {
lineNo++
if lineNo <= snapshotLines {
continue
}
line := strings.TrimSpace(s.Text())
if line == "" {
continue
@@ -166,16 +174,66 @@ func (e *Engine) readRecentEvents() []Event {
lines = lines[1:]
}
}
out := make([]Event, 0, len(lines))
out := append([]Event{}, snapshotEvents...)
for _, l := range lines {
var ev Event
if json.Unmarshal([]byte(l), &ev) == nil {
out = append(out, ev)
}
}
if len(out) > e.recentLines {
out = out[len(out)-e.recentLines:]
}
e.writeSnapshot(out, lineNo)
return out
}
// snapshotFile is the JSON document stored at Engine.snapshotPath.
type snapshotFile struct {
	// UpdatedAt is the time the snapshot was written, formatted as RFC 3339 in UTC.
	UpdatedAt string `json:"updated_at"`
	// LineCount is the number of event-log lines already folded into Events;
	// the reader skips that many lines before parsing new ones.
	LineCount int `json:"line_count"`
	// Events holds the cached events, limited to the most recent entries.
	Events []Event `json:"events"`
}
// readSnapshot loads the cached events and the count of event-log lines they
// cover from e.snapshotPath.
//
// It returns (nil, 0) when the receiver is nil, the snapshot path is blank,
// or the file is missing, empty, or not valid JSON. A negative stored line
// count is reported as 0, and only the last e.recentLines events are returned.
func (e *Engine) readSnapshot() ([]Event, int) {
	if e == nil {
		return nil, 0
	}
	if strings.TrimSpace(e.snapshotPath) == "" {
		return nil, 0
	}

	raw, readErr := os.ReadFile(e.snapshotPath)
	if readErr != nil || len(raw) == 0 {
		return nil, 0
	}

	var cached snapshotFile
	if decodeErr := json.Unmarshal(raw, &cached); decodeErr != nil {
		return nil, 0
	}

	events, lineCount := cached.Events, cached.LineCount
	if lineCount < 0 {
		lineCount = 0
	}
	// Drop the oldest entries so that at most recentLines events remain.
	if overflow := len(events) - e.recentLines; overflow > 0 {
		events = events[overflow:]
	}
	return events, lineCount
}
// writeSnapshot stores events and the number of event-log lines they cover at
// e.snapshotPath.
//
// It does nothing when the receiver is nil, the snapshot path is blank, or
// lineCount is not positive. Only the last e.recentLines events are stored.
// The snapshot is a cache, so every failure is swallowed: the caller can
// always rebuild its state from the event log.
func (e *Engine) writeSnapshot(events []Event, lineCount int) {
	if e == nil || strings.TrimSpace(e.snapshotPath) == "" || lineCount <= 0 {
		return
	}
	if len(events) > e.recentLines {
		events = events[len(events)-e.recentLines:]
	}
	snap := snapshotFile{UpdatedAt: time.Now().UTC().Format(time.RFC3339), LineCount: lineCount, Events: events}
	b, err := json.MarshalIndent(snap, "", " ")
	if err != nil {
		return
	}
	// Best-effort: if the directory cannot be created the write below fails
	// and is handled there.
	_ = os.MkdirAll(filepath.Dir(e.snapshotPath), 0o755)

	// Write to a sibling temp file first and rename it into place, so the
	// existing snapshot is only replaced once the new one is fully written.
	tmp := e.snapshotPath + ".tmp"
	if err := os.WriteFile(tmp, append(b, '\n'), 0o644); err != nil {
		// Do not leave a partial temp file behind.
		_ = os.Remove(tmp)
		return
	}
	if err := os.Rename(tmp, e.snapshotPath); err != nil {
		// The rename failed, so the temp file would otherwise linger.
		_ = os.Remove(tmp)
	}
}
var (
rePathNum = regexp.MustCompile(`\b\d+\b`)
rePathHex = regexp.MustCompile(`\b0x[0-9a-fA-F]+\b`)