From 1d5454066a3bfd93f8ae627b6ef6fdf11e2bc469 Mon Sep 17 00:00:00 2001 From: DBT Date: Sun, 1 Mar 2026 04:27:24 +0000 Subject: [PATCH] ekg m2: rank fallback providers by recent success/error history and record provider outcomes --- pkg/agent/loop.go | 22 ++++++++++++++---- pkg/ekg/engine.go | 58 ++++++++++++++++++++++++++++++++++++++++------- 2 files changed, 67 insertions(+), 13 deletions(-) diff --git a/pkg/agent/loop.go b/pkg/agent/loop.go index 77c26ef..03d5bb7 100644 --- a/pkg/agent/loop.go +++ b/pkg/agent/loop.go @@ -343,18 +343,30 @@ func (al *AgentLoop) lockSessionRun(sessionKey string) func() { return func() { mu.Unlock() } } -func (al *AgentLoop) tryFallbackProviders(ctx context.Context, messages []providers.Message, toolDefs []providers.ToolDefinition, options map[string]interface{}, primaryErr error) (*providers.LLMResponse, error) { +func (al *AgentLoop) tryFallbackProviders(ctx context.Context, msg bus.InboundMessage, messages []providers.Message, toolDefs []providers.ToolDefinition, options map[string]interface{}, primaryErr error) (*providers.LLMResponse, error) { if len(al.providerNames) <= 1 { return nil, primaryErr } lastErr := primaryErr - for i := 1; i < len(al.providerNames); i++ { - name := al.providerNames[i] + candidates := append([]string(nil), al.providerNames[1:]...) + if al.ekg != nil { + candidates = al.ekg.RankProviders(candidates) + } + for _, name := range candidates { p, ok := al.providerPool[name] if !ok || p == nil { continue } resp, err := p.Chat(ctx, messages, toolDefs, al.model, options) + if al.ekg != nil { + st := "success" + lg := "fallback provider success" + if err != nil { + st = "error" + lg = err.Error() + } + al.ekg.Record(ekg.Event{Session: msg.SessionKey, Channel: msg.Channel, Source: "provider_fallback", Status: st, Provider: name, Model: al.model, Log: lg}) + } if err == nil { logger.WarnCF("agent", "LLM fallback provider switched", map[string]interface{}{"provider": name}) return resp, nil @@ -753,7 +765,7 @@ func (al *AgentLoop) processMessage(ctx context.Context, msg bus.InboundMessage) } if err != nil { - if fb, ferr := al.tryFallbackProviders(ctx, messages, providerToolDefs, options, err); ferr == nil && fb != nil { + if fb, ferr := al.tryFallbackProviders(ctx, msg, messages, providerToolDefs, options, err); ferr == nil && fb != nil { response = fb err = nil } else { @@ -1096,7 +1108,7 @@ func (al *AgentLoop) processSystemMessage(ctx context.Context, msg bus.InboundMe response, err := al.provider.Chat(ctx, messages, providerToolDefs, al.model, options) if err != nil { - if fb, ferr := al.tryFallbackProviders(ctx, messages, providerToolDefs, options, err); ferr == nil && fb != nil { + if fb, ferr := al.tryFallbackProviders(ctx, msg, messages, providerToolDefs, options, err); ferr == nil && fb != nil { response = fb err = nil } else { diff --git a/pkg/ekg/engine.go b/pkg/ekg/engine.go index 0a9a2ba..d7ea5ca 100644 --- a/pkg/ekg/engine.go +++ b/pkg/ekg/engine.go @@ -6,19 +6,22 @@ import ( "os" "path/filepath" "regexp" + "sort" "strings" "time" ) type Event struct { - Time string `json:"time"` - TaskID string `json:"task_id,omitempty"` - Session string `json:"session,omitempty"` - Channel string `json:"channel,omitempty"` - Source string `json:"source,omitempty"` - Status string `json:"status"` // success|error|suppressed - ErrSig string `json:"errsig,omitempty"` - Log string `json:"log,omitempty"` + Time string `json:"time"` + TaskID string `json:"task_id,omitempty"` + Session string `json:"session,omitempty"` + Channel string `json:"channel,omitempty"` + Source string `json:"source,omitempty"` + Status string `json:"status"` // success|error|suppressed + Provider string `json:"provider,omitempty"` + Model string `json:"model,omitempty"` + ErrSig string `json:"errsig,omitempty"` + Log string `json:"log,omitempty"` } type SignalContext struct { @@ -67,6 +70,8 @@ func (e *Engine) Record(ev Event) { ev.Channel = strings.TrimSpace(ev.Channel) ev.Source = strings.TrimSpace(ev.Source) ev.Status = strings.TrimSpace(strings.ToLower(ev.Status)) + ev.Provider = strings.TrimSpace(ev.Provider) + ev.Model = strings.TrimSpace(ev.Model) if ev.ErrSig == "" && ev.Log != "" { ev.ErrSig = NormalizeErrorSignature(ev.Log) } @@ -169,6 +174,43 @@ var ( reSpace = regexp.MustCompile(`\s+`) ) +func (e *Engine) RankProviders(candidates []string) []string { + if len(candidates) <= 1 || e == nil { + return append([]string(nil), candidates...) + } + events := e.readRecentEvents() + score := map[string]float64{} + for _, c := range candidates { + score[c] = 0 + } + for _, ev := range events { + p := strings.TrimSpace(ev.Provider) + if p == "" { + continue + } + if _, ok := score[p]; !ok { + continue + } + switch strings.ToLower(strings.TrimSpace(ev.Status)) { + case "success": + score[p] += 1.0 + case "suppressed": + score[p] += 0.2 + case "error": + score[p] -= 1.0 + } + } + ordered := append([]string(nil), candidates...) + sort.SliceStable(ordered, func(i, j int) bool { + si, sj := score[ordered[i]], score[ordered[j]] + if si == sj { + return ordered[i] < ordered[j] + } + return si > sj + }) + return ordered +} + func NormalizeErrorSignature(s string) string { s = strings.TrimSpace(strings.ToLower(s)) if s == "" {