diff --git a/cmd/clawgo/cmd_gateway.go b/cmd/clawgo/cmd_gateway.go index 2db5fa9..75ed886 100644 --- a/cmd/clawgo/cmd_gateway.go +++ b/cmd/clawgo/cmd_gateway.go @@ -972,7 +972,7 @@ func buildAutonomyEngine(cfg *config.Config, msgBus *bus.MessageBus) *autonomy.E ImportantKeywords: cfg.Agents.Defaults.Texts.AutonomyImportantKeywords, CompletionTemplate: cfg.Agents.Defaults.Texts.AutonomyCompletionTemplate, BlockedTemplate: cfg.Agents.Defaults.Texts.AutonomyBlockedTemplate, - EKGConsecutiveErrorThreshold: 3, + EKGConsecutiveErrorThreshold: a.EKGConsecutiveErrorThreshold, Workspace: cfg.WorkspacePath(), DefaultNotifyChannel: notifyChannel, DefaultNotifyChatID: notifyChatID, diff --git a/config.example.json b/config.example.json index fb8eaf5..055519e 100644 --- a/config.example.json +++ b/config.example.json @@ -27,7 +27,8 @@ "max_rounds_without_user": 12, "task_history_retention_days": 3, "waiting_resume_debounce_sec": 5, - "allowed_task_keywords": [] + "allowed_task_keywords": [], + "ekg_consecutive_error_threshold": 3 }, "texts": { "no_response_fallback": "I've completed processing but have no response to give.", diff --git a/pkg/agent/loop.go b/pkg/agent/loop.go index 778d9e1..feac6f9 100644 --- a/pkg/agent/loop.go +++ b/pkg/agent/loop.go @@ -62,6 +62,8 @@ type AgentLoop struct { providerNames []string providerPool map[string]providers.LLMProvider ekg *ekg.Engine + providerMu sync.RWMutex + sessionProvider map[string]string } // StartupCompactionReport provides startup memory/session maintenance stats. @@ -239,6 +241,7 @@ func NewAgentLoop(cfg *config.Config, msgBus *bus.MessageBus, provider providers intentHints: map[string]string{}, sessionRunLocks: map[string]*sync.Mutex{}, ekg: ekg.New(workspace), + sessionProvider: map[string]string{}, } // Initialize provider fallback chain (primary + proxy_fallbacks). @@ -343,9 +346,9 @@ func (al *AgentLoop) lockSessionRun(sessionKey string) func() { return func() { mu.Unlock() } } -func (al *AgentLoop) tryFallbackProviders(ctx context.Context, msg bus.InboundMessage, messages []providers.Message, toolDefs []providers.ToolDefinition, options map[string]interface{}, primaryErr error) (*providers.LLMResponse, error) { +func (al *AgentLoop) tryFallbackProviders(ctx context.Context, msg bus.InboundMessage, messages []providers.Message, toolDefs []providers.ToolDefinition, options map[string]interface{}, primaryErr error) (*providers.LLMResponse, string, error) { if len(al.providerNames) <= 1 { - return nil, primaryErr + return nil, "", primaryErr } lastErr := primaryErr candidates := append([]string(nil), al.providerNames[1:]...) @@ -375,11 +378,30 @@ func (al *AgentLoop) tryFallbackProviders(ctx context.Context, msg bus.InboundMe } if err == nil { logger.WarnCF("agent", "LLM fallback provider switched", map[string]interface{}{"provider": name}) - return resp, nil + return resp, name, nil } lastErr = err } - return nil, lastErr + return nil, "", lastErr +} + +func (al *AgentLoop) setSessionProvider(sessionKey, provider string) { + key := strings.TrimSpace(sessionKey) + if key == "" { return } + provider = strings.TrimSpace(provider) + if provider == "" { return } + al.providerMu.Lock() + al.sessionProvider[key] = provider + al.providerMu.Unlock() +} + +func (al *AgentLoop) getSessionProvider(sessionKey string) string { + key := strings.TrimSpace(sessionKey) + if key == "" { return "" } + al.providerMu.RLock() + v := al.sessionProvider[key] + al.providerMu.RUnlock() + return v } func (al *AgentLoop) processInbound(ctx context.Context, msg bus.InboundMessage) { @@ -455,6 +477,8 @@ func (al *AgentLoop) appendTaskAuditEvent(taskID string, msg bus.InboundMessage, "input_preview": truncate(strings.ReplaceAll(msg.Content, "\n", " "), 180), "media_count": len(msg.MediaItems), "media_items": msg.MediaItems, + "provider": al.getSessionProvider(msg.SessionKey), + "model": al.model, } if al.ekg != nil { al.ekg.Record(ekg.Event{ @@ -591,6 +615,9 @@ func (al *AgentLoop) processMessage(ctx context.Context, msg bus.InboundMessage) } unlock := al.lockSessionRun(msg.SessionKey) defer unlock() + if len(al.providerNames) > 0 { + al.setSessionProvider(msg.SessionKey, al.providerNames[0]) + } // Add message preview to log preview := truncate(msg.Content, 80) logger.InfoCF("agent", fmt.Sprintf("Processing message from %s:%s: %s", msg.Channel, msg.SenderID, preview), @@ -771,9 +798,12 @@ func (al *AgentLoop) processMessage(ctx context.Context, msg bus.InboundMessage) } if err != nil { - if fb, ferr := al.tryFallbackProviders(ctx, msg, messages, providerToolDefs, options, err); ferr == nil && fb != nil { + if fb, fbProvider, ferr := al.tryFallbackProviders(ctx, msg, messages, providerToolDefs, options, err); ferr == nil && fb != nil { response = fb err = nil + if fbProvider != "" { + al.setSessionProvider(msg.SessionKey, fbProvider) + } } else { err = ferr } @@ -1114,9 +1144,12 @@ func (al *AgentLoop) processSystemMessage(ctx context.Context, msg bus.InboundMe response, err := al.provider.Chat(ctx, messages, providerToolDefs, al.model, options) if err != nil { - if fb, ferr := al.tryFallbackProviders(ctx, msg, messages, providerToolDefs, options, err); ferr == nil && fb != nil { + if fb, fbProvider, ferr := al.tryFallbackProviders(ctx, msg, messages, providerToolDefs, options, err); ferr == nil && fb != nil { response = fb err = nil + if fbProvider != "" { + al.setSessionProvider(msg.SessionKey, fbProvider) + } } else { err = ferr } diff --git a/pkg/autonomy/engine.go b/pkg/autonomy/engine.go index 0559eb3..78034a9 100644 --- a/pkg/autonomy/engine.go +++ b/pkg/autonomy/engine.go @@ -339,6 +339,7 @@ func (e *Engine) tick() { if advice.ShouldEscalate { st.BlockReason = "repeated_error_signature" st.RetryAfter = now.Add(5 * time.Minute) + e.enqueueAutoRepairTaskLocked(st, errSig) e.sendFailureNotification(st, "repeated error signature detected; escalate") continue } @@ -725,6 +726,33 @@ func (e *Engine) enqueueInferredNextTasksLocked(st *taskState) { } } +func (e *Engine) enqueueAutoRepairTaskLocked(st *taskState, errSig string) { + if st == nil { + return + } + errSig = strings.TrimSpace(errSig) + if errSig == "" { + errSig = "unknown_error_signature" + } + content := fmt.Sprintf("[auto-repair] 排查任务 %s 的重复错误签名并给出修复步骤(errsig=%s)", shortTask(st.Content), shortTask(errSig)) + existing := map[string]bool{} + for _, cur := range e.state { + existing[strings.TrimSpace(cur.Content)] = true + } + items, _ := e.taskStore.Load() + for _, it := range items { + existing[strings.TrimSpace(it.Content)] = true + } + if existing[content] { + return + } + id := hashID(content) + e.state[id] = &taskState{ID: id, Content: content, Priority: "high", Status: "idle"} + items = append(items, TaskItem{ID: id, Content: content, Priority: "high", Status: "todo", Source: "autonomy_repair", UpdatedAt: nowRFC3339()}) + _ = e.taskStore.Save(items) + e.writeReflectLog("infer", st, "generated auto-repair task due to repeated error signature") +} + func (e *Engine) sendFailureNotification(st *taskState, reason string) { e.writeReflectLog("blocked", st, reason) e.writeTriggerAudit("blocked", st, reason) diff --git a/pkg/config/config.go b/pkg/config/config.go index eb00e94..49e4f69 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -59,6 +59,7 @@ type AutonomyConfig struct { TaskHistoryRetentionDays int `json:"task_history_retention_days" env:"CLAWGO_AGENTS_DEFAULTS_AUTONOMY_TASK_HISTORY_RETENTION_DAYS"` WaitingResumeDebounceSec int `json:"waiting_resume_debounce_sec" env:"CLAWGO_AGENTS_DEFAULTS_AUTONOMY_WAITING_RESUME_DEBOUNCE_SEC"` AllowedTaskKeywords []string `json:"allowed_task_keywords" env:"CLAWGO_AGENTS_DEFAULTS_AUTONOMY_ALLOWED_TASK_KEYWORDS"` + EKGConsecutiveErrorThreshold int `json:"ekg_consecutive_error_threshold" env:"CLAWGO_AGENTS_DEFAULTS_AUTONOMY_EKG_CONSECUTIVE_ERROR_THRESHOLD"` // Deprecated: kept for backward compatibility with existing config files. NotifyChannel string `json:"notify_channel,omitempty"` // Deprecated: kept for backward compatibility with existing config files. @@ -332,6 +333,7 @@ func DefaultConfig() *Config { TaskHistoryRetentionDays: 3, WaitingResumeDebounceSec: 5, AllowedTaskKeywords: []string{}, + EKGConsecutiveErrorThreshold: 3, }, Texts: AgentTextConfig{ NoResponseFallback: "I've completed processing but have no response to give.", diff --git a/pkg/config/validate.go b/pkg/config/validate.go index fd63654..1108b64 100644 --- a/pkg/config/validate.go +++ b/pkg/config/validate.go @@ -120,6 +120,9 @@ func Validate(cfg *Config) []error { if aut.TaskHistoryRetentionDays <= 0 { errs = append(errs, fmt.Errorf("agents.defaults.autonomy.task_history_retention_days must be > 0 when enabled=true")) } + if aut.EKGConsecutiveErrorThreshold <= 0 { + errs = append(errs, fmt.Errorf("agents.defaults.autonomy.ekg_consecutive_error_threshold must be > 0 when enabled=true")) + } } texts := cfg.Agents.Defaults.Texts if strings.TrimSpace(texts.NoResponseFallback) == "" { diff --git a/webui/src/pages/TaskAudit.tsx b/webui/src/pages/TaskAudit.tsx index ab8617b..5654e8a 100644 --- a/webui/src/pages/TaskAudit.tsx +++ b/webui/src/pages/TaskAudit.tsx @@ -18,6 +18,8 @@ type TaskAuditItem = { duration_ms?: number; retry_count?: number; error?: string; + provider?: string; + model?: string; input_preview?: string; logs?: string[]; media_items?: Array<{ source?: string; type?: string; ref?: string; path?: string; channel?: string }>; @@ -152,7 +154,7 @@ const TaskAudit: React.FC = () => { className={`w-full text-left px-3 py-2 border-b border-zinc-800/60 hover:bg-zinc-800/40 ${active ? 'bg-indigo-500/15' : ''}`} >
{it.task_id || `task-${idx + 1}`}
-
{it.channel || '-'} · {it.status} · {it.duration_ms || 0}ms · retry:{it.retry_count || 0} · {it.source || '-'}
+
{it.channel || '-'} · {it.status} · {it.duration_ms || 0}ms · retry:{it.retry_count || 0} · {it.source || '-'} · {it.provider || '-'} / {it.model || '-'}
{it.time}
); @@ -182,6 +184,8 @@ const TaskAudit: React.FC = () => {
Duration
{selected.duration_ms || 0}ms
Channel
{selected.channel}
Session
{selected.session}
+
Provider
{selected.provider || '-'}
+
Model
{selected.model || '-'}
Time
{selected.time}