ekg m2: rank fallback providers by recent success/error history and record provider outcomes

This commit is contained in:
DBT
2026-03-01 04:27:24 +00:00
parent 0e8d267d78
commit 1d5454066a
2 changed files with 67 additions and 13 deletions

View File

@@ -343,18 +343,30 @@ func (al *AgentLoop) lockSessionRun(sessionKey string) func() {
return func() { mu.Unlock() } return func() { mu.Unlock() }
} }
func (al *AgentLoop) tryFallbackProviders(ctx context.Context, messages []providers.Message, toolDefs []providers.ToolDefinition, options map[string]interface{}, primaryErr error) (*providers.LLMResponse, error) { func (al *AgentLoop) tryFallbackProviders(ctx context.Context, msg bus.InboundMessage, messages []providers.Message, toolDefs []providers.ToolDefinition, options map[string]interface{}, primaryErr error) (*providers.LLMResponse, error) {
if len(al.providerNames) <= 1 { if len(al.providerNames) <= 1 {
return nil, primaryErr return nil, primaryErr
} }
lastErr := primaryErr lastErr := primaryErr
for i := 1; i < len(al.providerNames); i++ { candidates := append([]string(nil), al.providerNames[1:]...)
name := al.providerNames[i] if al.ekg != nil {
candidates = al.ekg.RankProviders(candidates)
}
for _, name := range candidates {
p, ok := al.providerPool[name] p, ok := al.providerPool[name]
if !ok || p == nil { if !ok || p == nil {
continue continue
} }
resp, err := p.Chat(ctx, messages, toolDefs, al.model, options) resp, err := p.Chat(ctx, messages, toolDefs, al.model, options)
if al.ekg != nil {
st := "success"
lg := "fallback provider success"
if err != nil {
st = "error"
lg = err.Error()
}
al.ekg.Record(ekg.Event{Session: msg.SessionKey, Channel: msg.Channel, Source: "provider_fallback", Status: st, Provider: name, Model: al.model, Log: lg})
}
if err == nil { if err == nil {
logger.WarnCF("agent", "LLM fallback provider switched", map[string]interface{}{"provider": name}) logger.WarnCF("agent", "LLM fallback provider switched", map[string]interface{}{"provider": name})
return resp, nil return resp, nil
@@ -753,7 +765,7 @@ func (al *AgentLoop) processMessage(ctx context.Context, msg bus.InboundMessage)
} }
if err != nil { if err != nil {
if fb, ferr := al.tryFallbackProviders(ctx, messages, providerToolDefs, options, err); ferr == nil && fb != nil { if fb, ferr := al.tryFallbackProviders(ctx, msg, messages, providerToolDefs, options, err); ferr == nil && fb != nil {
response = fb response = fb
err = nil err = nil
} else { } else {
@@ -1096,7 +1108,7 @@ func (al *AgentLoop) processSystemMessage(ctx context.Context, msg bus.InboundMe
response, err := al.provider.Chat(ctx, messages, providerToolDefs, al.model, options) response, err := al.provider.Chat(ctx, messages, providerToolDefs, al.model, options)
if err != nil { if err != nil {
if fb, ferr := al.tryFallbackProviders(ctx, messages, providerToolDefs, options, err); ferr == nil && fb != nil { if fb, ferr := al.tryFallbackProviders(ctx, msg, messages, providerToolDefs, options, err); ferr == nil && fb != nil {
response = fb response = fb
err = nil err = nil
} else { } else {

View File

@@ -6,19 +6,22 @@ import (
"os" "os"
"path/filepath" "path/filepath"
"regexp" "regexp"
"sort"
"strings" "strings"
"time" "time"
) )
type Event struct { type Event struct {
Time string `json:"time"` Time string `json:"time"`
TaskID string `json:"task_id,omitempty"` TaskID string `json:"task_id,omitempty"`
Session string `json:"session,omitempty"` Session string `json:"session,omitempty"`
Channel string `json:"channel,omitempty"` Channel string `json:"channel,omitempty"`
Source string `json:"source,omitempty"` Source string `json:"source,omitempty"`
Status string `json:"status"` // success|error|suppressed Status string `json:"status"` // success|error|suppressed
ErrSig string `json:"errsig,omitempty"` Provider string `json:"provider,omitempty"`
Log string `json:"log,omitempty"` Model string `json:"model,omitempty"`
ErrSig string `json:"errsig,omitempty"`
Log string `json:"log,omitempty"`
} }
type SignalContext struct { type SignalContext struct {
@@ -67,6 +70,8 @@ func (e *Engine) Record(ev Event) {
ev.Channel = strings.TrimSpace(ev.Channel) ev.Channel = strings.TrimSpace(ev.Channel)
ev.Source = strings.TrimSpace(ev.Source) ev.Source = strings.TrimSpace(ev.Source)
ev.Status = strings.TrimSpace(strings.ToLower(ev.Status)) ev.Status = strings.TrimSpace(strings.ToLower(ev.Status))
ev.Provider = strings.TrimSpace(ev.Provider)
ev.Model = strings.TrimSpace(ev.Model)
if ev.ErrSig == "" && ev.Log != "" { if ev.ErrSig == "" && ev.Log != "" {
ev.ErrSig = NormalizeErrorSignature(ev.Log) ev.ErrSig = NormalizeErrorSignature(ev.Log)
} }
@@ -169,6 +174,43 @@ var (
reSpace = regexp.MustCompile(`\s+`) reSpace = regexp.MustCompile(`\s+`)
) )
func (e *Engine) RankProviders(candidates []string) []string {
if len(candidates) <= 1 || e == nil {
return append([]string(nil), candidates...)
}
events := e.readRecentEvents()
score := map[string]float64{}
for _, c := range candidates {
score[c] = 0
}
for _, ev := range events {
p := strings.TrimSpace(ev.Provider)
if p == "" {
continue
}
if _, ok := score[p]; !ok {
continue
}
switch strings.ToLower(strings.TrimSpace(ev.Status)) {
case "success":
score[p] += 1.0
case "suppressed":
score[p] += 0.2
case "error":
score[p] -= 1.0
}
}
ordered := append([]string(nil), candidates...)
sort.SliceStable(ordered, func(i, j int) bool {
si, sj := score[ordered[i]], score[ordered[j]]
if si == sj {
return ordered[i] < ordered[j]
}
return si > sj
})
return ordered
}
func NormalizeErrorSignature(s string) string { func NormalizeErrorSignature(s string) string {
s = strings.TrimSpace(strings.ToLower(s)) s = strings.TrimSpace(strings.ToLower(s))
if s == "" { if s == "" {