ekg p0: configurable threshold, auto-repair task on repeated errsig, and provider/model audit visibility

This commit is contained in:
DBT
2026-03-01 04:43:21 +00:00
parent ed9e4203d3
commit 78e3179135
7 changed files with 80 additions and 9 deletions

View File

@@ -972,7 +972,7 @@ func buildAutonomyEngine(cfg *config.Config, msgBus *bus.MessageBus) *autonomy.E
ImportantKeywords: cfg.Agents.Defaults.Texts.AutonomyImportantKeywords,
CompletionTemplate: cfg.Agents.Defaults.Texts.AutonomyCompletionTemplate,
BlockedTemplate: cfg.Agents.Defaults.Texts.AutonomyBlockedTemplate,
EKGConsecutiveErrorThreshold: 3,
EKGConsecutiveErrorThreshold: a.EKGConsecutiveErrorThreshold,
Workspace: cfg.WorkspacePath(),
DefaultNotifyChannel: notifyChannel,
DefaultNotifyChatID: notifyChatID,

View File

@@ -27,7 +27,8 @@
"max_rounds_without_user": 12,
"task_history_retention_days": 3,
"waiting_resume_debounce_sec": 5,
"allowed_task_keywords": []
"allowed_task_keywords": [],
"ekg_consecutive_error_threshold": 3
},
"texts": {
"no_response_fallback": "I've completed processing but have no response to give.",

View File

@@ -62,6 +62,8 @@ type AgentLoop struct {
providerNames []string
providerPool map[string]providers.LLMProvider
ekg *ekg.Engine
providerMu sync.RWMutex
sessionProvider map[string]string
}
// StartupCompactionReport provides startup memory/session maintenance stats.
@@ -239,6 +241,7 @@ func NewAgentLoop(cfg *config.Config, msgBus *bus.MessageBus, provider providers
intentHints: map[string]string{},
sessionRunLocks: map[string]*sync.Mutex{},
ekg: ekg.New(workspace),
sessionProvider: map[string]string{},
}
// Initialize provider fallback chain (primary + proxy_fallbacks).
@@ -343,9 +346,9 @@ func (al *AgentLoop) lockSessionRun(sessionKey string) func() {
return func() { mu.Unlock() }
}
func (al *AgentLoop) tryFallbackProviders(ctx context.Context, msg bus.InboundMessage, messages []providers.Message, toolDefs []providers.ToolDefinition, options map[string]interface{}, primaryErr error) (*providers.LLMResponse, error) {
func (al *AgentLoop) tryFallbackProviders(ctx context.Context, msg bus.InboundMessage, messages []providers.Message, toolDefs []providers.ToolDefinition, options map[string]interface{}, primaryErr error) (*providers.LLMResponse, string, error) {
if len(al.providerNames) <= 1 {
return nil, primaryErr
return nil, "", primaryErr
}
lastErr := primaryErr
candidates := append([]string(nil), al.providerNames[1:]...)
@@ -375,11 +378,30 @@ func (al *AgentLoop) tryFallbackProviders(ctx context.Context, msg bus.InboundMe
}
if err == nil {
logger.WarnCF("agent", "LLM fallback provider switched", map[string]interface{}{"provider": name})
return resp, nil
return resp, name, nil
}
lastErr = err
}
return nil, lastErr
return nil, "", lastErr
}
// setSessionProvider remembers which provider name is associated with a
// session. Both arguments are whitespace-trimmed first; when either one is
// empty after trimming the call does nothing. The map write is performed
// while holding the write side of providerMu.
func (al *AgentLoop) setSessionProvider(sessionKey, provider string) {
	sessionKey = strings.TrimSpace(sessionKey)
	provider = strings.TrimSpace(provider)
	if sessionKey == "" || provider == "" {
		return
	}

	al.providerMu.Lock()
	al.sessionProvider[sessionKey] = provider
	al.providerMu.Unlock()
}
// getSessionProvider looks up the provider name stored for a session. The
// key is whitespace-trimmed before the lookup; an empty key, or a key with no
// stored entry, yields "". The map read happens under providerMu's read lock.
func (al *AgentLoop) getSessionProvider(sessionKey string) string {
	trimmed := strings.TrimSpace(sessionKey)
	if trimmed == "" {
		return ""
	}

	al.providerMu.RLock()
	defer al.providerMu.RUnlock()
	return al.sessionProvider[trimmed]
}
func (al *AgentLoop) processInbound(ctx context.Context, msg bus.InboundMessage) {
@@ -455,6 +477,8 @@ func (al *AgentLoop) appendTaskAuditEvent(taskID string, msg bus.InboundMessage,
"input_preview": truncate(strings.ReplaceAll(msg.Content, "\n", " "), 180),
"media_count": len(msg.MediaItems),
"media_items": msg.MediaItems,
"provider": al.getSessionProvider(msg.SessionKey),
"model": al.model,
}
if al.ekg != nil {
al.ekg.Record(ekg.Event{
@@ -591,6 +615,9 @@ func (al *AgentLoop) processMessage(ctx context.Context, msg bus.InboundMessage)
}
unlock := al.lockSessionRun(msg.SessionKey)
defer unlock()
if len(al.providerNames) > 0 {
al.setSessionProvider(msg.SessionKey, al.providerNames[0])
}
// Add message preview to log
preview := truncate(msg.Content, 80)
logger.InfoCF("agent", fmt.Sprintf("Processing message from %s:%s: %s", msg.Channel, msg.SenderID, preview),
@@ -771,9 +798,12 @@ func (al *AgentLoop) processMessage(ctx context.Context, msg bus.InboundMessage)
}
if err != nil {
if fb, ferr := al.tryFallbackProviders(ctx, msg, messages, providerToolDefs, options, err); ferr == nil && fb != nil {
if fb, fbProvider, ferr := al.tryFallbackProviders(ctx, msg, messages, providerToolDefs, options, err); ferr == nil && fb != nil {
response = fb
err = nil
if fbProvider != "" {
al.setSessionProvider(msg.SessionKey, fbProvider)
}
} else {
err = ferr
}
@@ -1114,9 +1144,12 @@ func (al *AgentLoop) processSystemMessage(ctx context.Context, msg bus.InboundMe
response, err := al.provider.Chat(ctx, messages, providerToolDefs, al.model, options)
if err != nil {
if fb, ferr := al.tryFallbackProviders(ctx, msg, messages, providerToolDefs, options, err); ferr == nil && fb != nil {
if fb, fbProvider, ferr := al.tryFallbackProviders(ctx, msg, messages, providerToolDefs, options, err); ferr == nil && fb != nil {
response = fb
err = nil
if fbProvider != "" {
al.setSessionProvider(msg.SessionKey, fbProvider)
}
} else {
err = ferr
}

View File

@@ -339,6 +339,7 @@ func (e *Engine) tick() {
if advice.ShouldEscalate {
st.BlockReason = "repeated_error_signature"
st.RetryAfter = now.Add(5 * time.Minute)
e.enqueueAutoRepairTaskLocked(st, errSig)
e.sendFailureNotification(st, "repeated error signature detected; escalate")
continue
}
@@ -725,6 +726,33 @@ func (e *Engine) enqueueInferredNextTasksLocked(st *taskState) {
}
}
// enqueueAutoRepairTaskLocked adds a high-priority "[auto-repair]" task that
// asks for an investigation of the error signature errSig seen on task st.
//
// A nil st is ignored. A blank errSig is replaced by
// "unknown_error_signature". If a task with the same content already exists
// in the in-memory state or in the task store, nothing is added. Otherwise the
// task is registered in e.state (status "idle") and appended to the persisted
// task list (status "todo", source "autonomy_repair").
//
// NOTE(review): the "Locked" suffix suggests the caller must already hold the
// engine's lock; this function takes no lock itself — confirm at call sites.
func (e *Engine) enqueueAutoRepairTaskLocked(st *taskState, errSig string) {
	if st == nil {
		return
	}
	errSig = strings.TrimSpace(errSig)
	if errSig == "" {
		errSig = "unknown_error_signature"
	}
	content := fmt.Sprintf("[auto-repair] 排查任务 %s 的重复错误签名并给出修复步骤errsig=%s", shortTask(st.Content), shortTask(errSig))

	// Skip if an identical task is already tracked in memory.
	for _, cur := range e.state {
		if strings.TrimSpace(cur.Content) == content {
			return
		}
	}

	// If the persisted list cannot be read, do not continue: appending to an
	// empty slice and saving it would replace every stored task with just the
	// repair task.
	items, err := e.taskStore.Load()
	if err != nil {
		e.writeReflectLog("infer", st, fmt.Sprintf("skipped auto-repair task: task store load failed: %v", err))
		return
	}
	// Skip if an identical task is already persisted.
	for _, it := range items {
		if strings.TrimSpace(it.Content) == content {
			return
		}
	}

	id := hashID(content)
	e.state[id] = &taskState{ID: id, Content: content, Priority: "high", Status: "idle"}
	items = append(items, TaskItem{ID: id, Content: content, Priority: "high", Status: "todo", Source: "autonomy_repair", UpdatedAt: nowRFC3339()})
	if err := e.taskStore.Save(items); err != nil {
		// The task stays in e.state so it can still run in this process;
		// surface the persistence failure instead of dropping it silently.
		e.writeReflectLog("infer", st, fmt.Sprintf("generated auto-repair task but failed to persist it: %v", err))
		return
	}
	e.writeReflectLog("infer", st, "generated auto-repair task due to repeated error signature")
}
func (e *Engine) sendFailureNotification(st *taskState, reason string) {
e.writeReflectLog("blocked", st, reason)
e.writeTriggerAudit("blocked", st, reason)

View File

@@ -59,6 +59,7 @@ type AutonomyConfig struct {
TaskHistoryRetentionDays int `json:"task_history_retention_days" env:"CLAWGO_AGENTS_DEFAULTS_AUTONOMY_TASK_HISTORY_RETENTION_DAYS"`
WaitingResumeDebounceSec int `json:"waiting_resume_debounce_sec" env:"CLAWGO_AGENTS_DEFAULTS_AUTONOMY_WAITING_RESUME_DEBOUNCE_SEC"`
AllowedTaskKeywords []string `json:"allowed_task_keywords" env:"CLAWGO_AGENTS_DEFAULTS_AUTONOMY_ALLOWED_TASK_KEYWORDS"`
EKGConsecutiveErrorThreshold int `json:"ekg_consecutive_error_threshold" env:"CLAWGO_AGENTS_DEFAULTS_AUTONOMY_EKG_CONSECUTIVE_ERROR_THRESHOLD"`
// Deprecated: kept for backward compatibility with existing config files.
NotifyChannel string `json:"notify_channel,omitempty"`
// Deprecated: kept for backward compatibility with existing config files.
@@ -332,6 +333,7 @@ func DefaultConfig() *Config {
TaskHistoryRetentionDays: 3,
WaitingResumeDebounceSec: 5,
AllowedTaskKeywords: []string{},
EKGConsecutiveErrorThreshold: 3,
},
Texts: AgentTextConfig{
NoResponseFallback: "I've completed processing but have no response to give.",

View File

@@ -120,6 +120,9 @@ func Validate(cfg *Config) []error {
if aut.TaskHistoryRetentionDays <= 0 {
errs = append(errs, fmt.Errorf("agents.defaults.autonomy.task_history_retention_days must be > 0 when enabled=true"))
}
if aut.EKGConsecutiveErrorThreshold <= 0 {
errs = append(errs, fmt.Errorf("agents.defaults.autonomy.ekg_consecutive_error_threshold must be > 0 when enabled=true"))
}
}
texts := cfg.Agents.Defaults.Texts
if strings.TrimSpace(texts.NoResponseFallback) == "" {

View File

@@ -18,6 +18,8 @@ type TaskAuditItem = {
duration_ms?: number;
retry_count?: number;
error?: string;
provider?: string;
model?: string;
input_preview?: string;
logs?: string[];
media_items?: Array<{ source?: string; type?: string; ref?: string; path?: string; channel?: string }>;
@@ -152,7 +154,7 @@ const TaskAudit: React.FC = () => {
className={`w-full text-left px-3 py-2 border-b border-zinc-800/60 hover:bg-zinc-800/40 ${active ? 'bg-indigo-500/15' : ''}`}
>
<div className="text-sm font-medium text-zinc-100 truncate">{it.task_id || `task-${idx + 1}`}</div>
<div className="text-xs text-zinc-400 truncate">{it.channel || '-'} · {it.status} · {it.duration_ms || 0}ms · retry:{it.retry_count || 0} · {it.source || '-'}</div>
<div className="text-xs text-zinc-400 truncate">{it.channel || '-'} · {it.status} · {it.duration_ms || 0}ms · retry:{it.retry_count || 0} · {it.source || '-'} · {it.provider || '-'} / {it.model || '-'}</div>
<div className="text-[11px] text-zinc-500 truncate">{it.time}</div>
</button>
);
@@ -182,6 +184,8 @@ const TaskAudit: React.FC = () => {
<div><div className="text-zinc-500 text-xs">Duration</div><div>{selected.duration_ms || 0}ms</div></div>
<div><div className="text-zinc-500 text-xs">Channel</div><div>{selected.channel}</div></div>
<div><div className="text-zinc-500 text-xs">Session</div><div className="font-mono break-all">{selected.session}</div></div>
<div><div className="text-zinc-500 text-xs">Provider</div><div>{selected.provider || '-'}</div></div>
<div><div className="text-zinc-500 text-xs">Model</div><div>{selected.model || '-'}</div></div>
<div><div className="text-zinc-500 text-xs">Time</div><div>{selected.time}</div></div>
</div>