diff --git a/cmd/clawgo/cmd_gateway.go b/cmd/clawgo/cmd_gateway.go index 4658e84..2db5fa9 100644 --- a/cmd/clawgo/cmd_gateway.go +++ b/cmd/clawgo/cmd_gateway.go @@ -972,6 +972,7 @@ func buildAutonomyEngine(cfg *config.Config, msgBus *bus.MessageBus) *autonomy.E ImportantKeywords: cfg.Agents.Defaults.Texts.AutonomyImportantKeywords, CompletionTemplate: cfg.Agents.Defaults.Texts.AutonomyCompletionTemplate, BlockedTemplate: cfg.Agents.Defaults.Texts.AutonomyBlockedTemplate, + EKGConsecutiveErrorThreshold: 3, Workspace: cfg.WorkspacePath(), DefaultNotifyChannel: notifyChannel, DefaultNotifyChatID: notifyChatID, diff --git a/docs/ekg-design.md b/docs/ekg-design.md new file mode 100644 index 0000000..47ba293 --- /dev/null +++ b/docs/ekg-design.md @@ -0,0 +1,97 @@ +# EKG 设计稿(Execution Knowledge Graph) + +> 目标:在不引入重型图数据库的前提下,为 ClawGo 提供“可审计、可回放、可降错”的执行知识图谱能力,优先降低 agent 重复报错与自治死循环。 + +## 1. 范围与阶段 + +### M1(本次实现) +- 记录执行结果事件(成功/失败/抑制)到 `memory/ekg-events.jsonl` +- 对错误文本做签名归一化(errsig) +- 在自治引擎中读取 advice:同任务同 errsig 连续失败达到阈值时,直接阻断重试(避免死循环) + +### M2(后续) +- provider/model/tool 维度的成功率建议(preferred / banned) +- channel/source 维度的策略分层 + +### M3(后续) +- WAL + 快照(snapshot) +- WebUI 可视化(errsig 热点、抑制命中率) + +--- + +## 2. 数据模型(接口草图) + +```go +type Event struct { + Time string `json:"time"` + TaskID string `json:"task_id,omitempty"` + Session string `json:"session,omitempty"` + Channel string `json:"channel,omitempty"` + Source string `json:"source,omitempty"` + Status string `json:"status"` // success|error|suppressed + ErrSig string `json:"errsig,omitempty"` + Log string `json:"log,omitempty"` +} + +type Advice struct { + ShouldEscalate bool `json:"should_escalate"` + RetryBackoffSec int `json:"retry_backoff_sec"` + Reason []string `json:"reason"` +} + +type SignalContext struct { + TaskID string + ErrSig string + Source string + Channel string +} +``` + +--- + +## 3. 存储与性能 + +- 存储:`memory/ekg-events.jsonl`(append-only) +- 读取:仅扫描最近窗口(默认 2000 行) +- 复杂度:O(N_recent) +- 设计取舍:M1 以正确性优先,后续再加入 snapshot 与索引 + +--- + +## 4. 规则(M1) + +- 错误签名归一化: + - 路径归一化 `` + - 数字归一化 `` + - hex 归一化 `` + - 空白压缩 +- 阈值规则: + - 若 `task_id + errsig` 连续 `>=3` 次 error,则 + - `ShouldEscalate=true`,自治任务进入 `blocked:repeated_error_signature` + +--- + +## 5. 接入点 + +1) `pkg/agent/loop.go` +- 在 `appendTaskAuditEvent` 处同步写入 EKG 事件(与 task-audit 同步) + +2) `pkg/autonomy/engine.go` +- 在运行结果为 error 的分支读取 EKG advice +- 命中升级条件时,直接阻断重试并标记 block reason + +--- + +## 6. 风险与回滚 + +- 风险:阈值过低导致过早阻断 +- 缓解:默认阈值 3,且仅在同 task+同 errsig 命中时触发 +- 回滚:移除 advice 判断即可恢复原重试路径 + +--- + +## 7. 验收标准(M1) + +- 能生成并追加 `memory/ekg-events.jsonl` +- 相同任务在相同错误签名下连续失败 3 次后,自治不再继续循环 dispatch +- `make test`(Docker compile)通过 diff --git a/pkg/agent/loop.go b/pkg/agent/loop.go index 414400c..77c26ef 100644 --- a/pkg/agent/loop.go +++ b/pkg/agent/loop.go @@ -23,6 +23,7 @@ import ( "clawgo/pkg/bus" "clawgo/pkg/config" "clawgo/pkg/cron" + "clawgo/pkg/ekg" "clawgo/pkg/logger" "clawgo/pkg/nodes" "clawgo/pkg/providers" @@ -60,6 +61,7 @@ type AgentLoop struct { sessionRunLocks map[string]*sync.Mutex providerNames []string providerPool map[string]providers.LLMProvider + ekg *ekg.Engine } // StartupCompactionReport provides startup memory/session maintenance stats. @@ -236,6 +238,7 @@ func NewAgentLoop(cfg *config.Config, msgBus *bus.MessageBus, provider providers running: false, intentHints: map[string]string{}, sessionRunLocks: map[string]*sync.Mutex{}, + ekg: ekg.New(workspace), } // Initialize provider fallback chain (primary + proxy_fallbacks). @@ -435,6 +438,17 @@ func (al *AgentLoop) appendTaskAuditEvent(taskID string, msg bus.InboundMessage, "media_count": len(msg.MediaItems), "media_items": msg.MediaItems, } + if al.ekg != nil { + al.ekg.Record(ekg.Event{ + TaskID: taskID, + Session: msg.SessionKey, + Channel: msg.Channel, + Source: source, + Status: status, + Log: logText, + }) + } + b, _ := json.Marshal(row) f, err := os.OpenFile(path, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0644) if err != nil { diff --git a/pkg/autonomy/engine.go b/pkg/autonomy/engine.go index e5956be..0559eb3 100644 --- a/pkg/autonomy/engine.go +++ b/pkg/autonomy/engine.go @@ -16,6 +16,7 @@ import ( "time" "clawgo/pkg/bus" + "clawgo/pkg/ekg" "clawgo/pkg/lifecycle" ) @@ -41,6 +42,7 @@ type Options struct { ImportantKeywords []string CompletionTemplate string BlockedTemplate string + EKGConsecutiveErrorThreshold int } type taskState struct { @@ -75,6 +77,7 @@ type Engine struct { roundsWithoutUser int lastDailyReportDate string lastHistoryCleanupAt time.Time + ekg *ekg.Engine } func NewEngine(opts Options, msgBus *bus.MessageBus) *Engine { @@ -111,7 +114,10 @@ func NewEngine(opts Options, msgBus *bus.MessageBus) *Engine { if opts.TaskHistoryRetentionDays <= 0 { opts.TaskHistoryRetentionDays = 3 } - return &Engine{ + if opts.EKGConsecutiveErrorThreshold <= 0 { + opts.EKGConsecutiveErrorThreshold = 3 + } + eng := &Engine{ opts: opts, bus: msgBus, runner: lifecycle.NewLoopRunner(), @@ -119,7 +125,12 @@ func NewEngine(opts Options, msgBus *bus.MessageBus) *Engine { state: map[string]*taskState{}, lastNotify: map[string]time.Time{}, lockOwners: map[string]string{}, + ekg: ekg.New(opts.Workspace), } + if eng.ekg != nil { + eng.ekg.SetConsecutiveErrorThreshold(opts.EKGConsecutiveErrorThreshold) + } + return eng } func (e *Engine) Start() { @@ -319,7 +330,18 @@ func (e *Engine) tick() { continue } if outcome == "error" { + errSig := e.latestErrorSignature(st.ID, st.LastRunAt) + advice := ekg.Advice{} + if e.ekg != nil { + advice = e.ekg.GetAdvice(ekg.SignalContext{TaskID: st.ID, ErrSig: errSig, Source: "autonomy", Channel: "system"}) + } st.Status = "blocked" + if advice.ShouldEscalate { + st.BlockReason = "repeated_error_signature" + st.RetryAfter = now.Add(5 * time.Minute) + e.sendFailureNotification(st, "repeated error signature detected; escalate") + continue + } st.BlockReason = "last_run_error" st.RetryAfter = now.Add(blockedRetryBackoff(st.ConsecutiveStall+1, e.opts.MinRunIntervalSec)) e.sendFailureNotification(st, "last run ended with error") @@ -1113,6 +1135,47 @@ func (e *Engine) detectRunOutcome(taskID string, since time.Time) (string, bool) return latest, true } +func (e *Engine) latestErrorSignature(taskID string, since time.Time) string { + if e.opts.Workspace == "" || taskID == "" { + return "" + } + path := filepath.Join(e.opts.Workspace, "memory", "task-audit.jsonl") + f, err := os.Open(path) + if err != nil { + return "" + } + defer f.Close() + sessionKey := "autonomy:" + taskID + latestAt := time.Time{} + latestErr := "" + s := bufio.NewScanner(f) + for s.Scan() { + var row map[string]interface{} + if json.Unmarshal(s.Bytes(), &row) != nil { + continue + } + if fmt.Sprintf("%v", row["session"]) != sessionKey { + continue + } + if fmt.Sprintf("%v", row["status"]) != "error" { + continue + } + ts := fmt.Sprintf("%v", row["time"]) + tm, err := time.Parse(time.RFC3339, ts) + if err != nil { + continue + } + if !since.IsZero() && tm.Before(since) { + continue + } + if latestAt.IsZero() || tm.After(latestAt) { + latestAt = tm + latestErr = fmt.Sprintf("%v", row["log"]) + } + } + return ekg.NormalizeErrorSignature(latestErr) +} + func parseTodoAttributes(content string) (priority, dueAt, normalized string) { priority = "normal" normalized = content diff --git a/pkg/ekg/engine.go b/pkg/ekg/engine.go new file mode 100644 index 0000000..0a9a2ba --- /dev/null +++ b/pkg/ekg/engine.go @@ -0,0 +1,186 @@ +package ekg + +import ( + "bufio" + "encoding/json" + "os" + "path/filepath" + "regexp" + "strings" + "time" +) + +type Event struct { + Time string `json:"time"` + TaskID string `json:"task_id,omitempty"` + Session string `json:"session,omitempty"` + Channel string `json:"channel,omitempty"` + Source string `json:"source,omitempty"` + Status string `json:"status"` // success|error|suppressed + ErrSig string `json:"errsig,omitempty"` + Log string `json:"log,omitempty"` +} + +type SignalContext struct { + TaskID string + ErrSig string + Source string + Channel string +} + +type Advice struct { + ShouldEscalate bool `json:"should_escalate"` + RetryBackoffSec int `json:"retry_backoff_sec"` + Reason []string `json:"reason"` +} + +type Engine struct { + path string + recentLines int + consecutiveErrorThreshold int +} + +func New(workspace string) *Engine { + p := filepath.Join(strings.TrimSpace(workspace), "memory", "ekg-events.jsonl") + return &Engine{path: p, recentLines: 2000, consecutiveErrorThreshold: 3} +} + +func (e *Engine) SetConsecutiveErrorThreshold(v int) { + if e == nil { + return + } + if v <= 0 { + v = 3 + } + e.consecutiveErrorThreshold = v +} + +func (e *Engine) Record(ev Event) { + if e == nil || strings.TrimSpace(e.path) == "" { + return + } + if strings.TrimSpace(ev.Time) == "" { + ev.Time = time.Now().UTC().Format(time.RFC3339) + } + ev.TaskID = strings.TrimSpace(ev.TaskID) + ev.Session = strings.TrimSpace(ev.Session) + ev.Channel = strings.TrimSpace(ev.Channel) + ev.Source = strings.TrimSpace(ev.Source) + ev.Status = strings.TrimSpace(strings.ToLower(ev.Status)) + if ev.ErrSig == "" && ev.Log != "" { + ev.ErrSig = NormalizeErrorSignature(ev.Log) + } + if ev.ErrSig != "" { + ev.ErrSig = NormalizeErrorSignature(ev.ErrSig) + } + _ = os.MkdirAll(filepath.Dir(e.path), 0o755) + b, err := json.Marshal(ev) + if err != nil { + return + } + f, err := os.OpenFile(e.path, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0o644) + if err != nil { + return + } + defer f.Close() + _, _ = f.Write(append(b, '\n')) +} + +func (e *Engine) GetAdvice(ctx SignalContext) Advice { + adv := Advice{ShouldEscalate: false, RetryBackoffSec: 30, Reason: []string{}} + if e == nil { + return adv + } + taskID := strings.TrimSpace(ctx.TaskID) + errSig := NormalizeErrorSignature(ctx.ErrSig) + if taskID == "" || errSig == "" { + return adv + } + events := e.readRecentEvents() + if len(events) == 0 { + return adv + } + consecutive := 0 + for i := len(events) - 1; i >= 0; i-- { + ev := events[i] + if strings.TrimSpace(ev.TaskID) != taskID { + continue + } + evErr := NormalizeErrorSignature(ev.ErrSig) + if evErr == "" { + evErr = NormalizeErrorSignature(ev.Log) + } + if evErr != errSig { + continue + } + if strings.ToLower(strings.TrimSpace(ev.Status)) == "error" { + consecutive++ + if consecutive >= e.consecutiveErrorThreshold { + adv.ShouldEscalate = true + adv.RetryBackoffSec = 300 + adv.Reason = append(adv.Reason, "repeated_error_signature") + adv.Reason = append(adv.Reason, "same task and error signature exceeded threshold") + return adv + } + continue + } + // Same signature but success/suppressed encountered: reset chain. + break + } + return adv +} + +func (e *Engine) readRecentEvents() []Event { + if strings.TrimSpace(e.path) == "" { + return nil + } + f, err := os.Open(e.path) + if err != nil { + return nil + } + defer f.Close() + lines := make([]string, 0, e.recentLines) + s := bufio.NewScanner(f) + for s.Scan() { + line := strings.TrimSpace(s.Text()) + if line == "" { + continue + } + lines = append(lines, line) + if len(lines) > e.recentLines { + lines = lines[1:] + } + } + out := make([]Event, 0, len(lines)) + for _, l := range lines { + var ev Event + if json.Unmarshal([]byte(l), &ev) == nil { + out = append(out, ev) + } + } + return out +} + +var ( + rePathNum = regexp.MustCompile(`\b\d+\b`) + rePathHex = regexp.MustCompile(`\b0x[0-9a-fA-F]+\b`) + rePathWin = regexp.MustCompile(`[a-zA-Z]:\\[^\s]+`) + rePathNix = regexp.MustCompile(`/[^\s]+`) + reSpace = regexp.MustCompile(`\s+`) +) + +func NormalizeErrorSignature(s string) string { + s = strings.TrimSpace(strings.ToLower(s)) + if s == "" { + return "" + } + s = rePathWin.ReplaceAllString(s, "") + s = rePathNix.ReplaceAllString(s, "") + s = rePathHex.ReplaceAllString(s, "") + s = rePathNum.ReplaceAllString(s, "") + s = reSpace.ReplaceAllString(s, " ") + if len(s) > 240 { + s = s[:240] + } + return s +}