Add multi-service provider mode

This commit is contained in:
lpf
2026-02-18 21:58:03 +08:00
parent 3c27a0be27
commit ddca0605c4
9 changed files with 776 additions and 171 deletions

View File

@@ -14,6 +14,7 @@ import (
"os"
"path/filepath"
"regexp"
"sort"
"strconv"
"strings"
"sync"
@@ -67,25 +68,28 @@ type autonomySession struct {
}
type AgentLoop struct {
bus *bus.MessageBus
provider providers.LLMProvider
workspace string
model string
modelFallbacks []string
maxIterations int
sessions *session.SessionManager
contextBuilder *ContextBuilder
tools *tools.ToolRegistry
orchestrator *tools.Orchestrator
running atomic.Bool
compactionCfg config.ContextCompactionConfig
llmCallTimeout time.Duration
workersMu sync.Mutex
workers map[string]*sessionWorker
autoLearnMu sync.Mutex
autoLearners map[string]*autoLearner
autonomyMu sync.Mutex
autonomyBySess map[string]*autonomySession
bus *bus.MessageBus
provider providers.LLMProvider
providersByProxy map[string]providers.LLMProvider
modelsByProxy map[string][]string
proxy string
proxyFallbacks []string
workspace string
model string
maxIterations int
sessions *session.SessionManager
contextBuilder *ContextBuilder
tools *tools.ToolRegistry
orchestrator *tools.Orchestrator
running atomic.Bool
compactionCfg config.ContextCompactionConfig
llmCallTimeout time.Duration
workersMu sync.Mutex
workers map[string]*sessionWorker
autoLearnMu sync.Mutex
autoLearners map[string]*autoLearner
autonomyMu sync.Mutex
autonomyBySess map[string]*autonomySession
}
type taskExecutionDirectives struct {
@@ -218,22 +222,51 @@ func NewAgentLoop(cfg *config.Config, msgBus *bus.MessageBus, provider providers
sessionsManager := session.NewSessionManager(filepath.Join(filepath.Dir(cfg.WorkspacePath()), "sessions"))
providersByProxy, err := providers.CreateProviders(cfg)
if err != nil {
logger.WarnCF("agent", "Create providers map failed, fallback to single provider mode", map[string]interface{}{
logger.FieldError: err.Error(),
})
providersByProxy = map[string]providers.LLMProvider{
"proxy": provider,
}
}
modelsByProxy := map[string][]string{}
for _, name := range providers.ListProviderNames(cfg) {
modelsByProxy[name] = providers.GetProviderModels(cfg, name)
}
primaryProxy := strings.TrimSpace(cfg.Agents.Defaults.Proxy)
if primaryProxy == "" {
primaryProxy = "proxy"
}
if p, ok := providersByProxy[primaryProxy]; ok {
provider = p
} else if p, ok := providersByProxy["proxy"]; ok {
primaryProxy = "proxy"
provider = p
}
defaultModel := defaultModelFromModels(modelsByProxy[primaryProxy], provider)
loop := &AgentLoop{
bus: msgBus,
provider: provider,
workspace: workspace,
model: cfg.Agents.Defaults.Model,
modelFallbacks: cfg.Agents.Defaults.ModelFallbacks,
maxIterations: cfg.Agents.Defaults.MaxToolIterations,
sessions: sessionsManager,
contextBuilder: NewContextBuilder(workspace, cfg.Memory, func() []string { return toolsRegistry.GetSummaries() }),
tools: toolsRegistry,
orchestrator: orchestrator,
compactionCfg: cfg.Agents.Defaults.ContextCompaction,
llmCallTimeout: time.Duration(cfg.Providers.Proxy.TimeoutSec) * time.Second,
workers: make(map[string]*sessionWorker),
autoLearners: make(map[string]*autoLearner),
autonomyBySess: make(map[string]*autonomySession),
bus: msgBus,
provider: provider,
providersByProxy: providersByProxy,
modelsByProxy: modelsByProxy,
proxy: primaryProxy,
proxyFallbacks: parseStringList(cfg.Agents.Defaults.ProxyFallbacks),
workspace: workspace,
model: defaultModel,
maxIterations: cfg.Agents.Defaults.MaxToolIterations,
sessions: sessionsManager,
contextBuilder: NewContextBuilder(workspace, cfg.Memory, func() []string { return toolsRegistry.GetSummaries() }),
tools: toolsRegistry,
orchestrator: orchestrator,
compactionCfg: cfg.Agents.Defaults.ContextCompaction,
llmCallTimeout: time.Duration(cfg.Providers.Proxy.TimeoutSec) * time.Second,
workers: make(map[string]*sessionWorker),
autoLearners: make(map[string]*autoLearner),
autonomyBySess: make(map[string]*autonomySession),
}
// 注入递归运行逻辑,使 subagent 具备 full tool-calling 能力
@@ -1652,38 +1685,102 @@ func (al *AgentLoop) callLLMWithModelFallback(
tools []providers.ToolDefinition,
options map[string]interface{},
) (*providers.LLMResponse, error) {
candidates := al.modelCandidates()
if len(al.providersByProxy) == 0 {
candidates := al.modelCandidates()
var lastErr error
for idx, model := range candidates {
response, err := al.provider.Chat(ctx, messages, tools, model, options)
if err == nil {
if al.model != model {
logger.WarnCF("agent", "Model switched after quota/rate-limit error", map[string]interface{}{
"from_model": al.model,
"to_model": model,
})
al.model = model
}
return response, nil
}
lastErr = err
if !shouldRetryWithFallbackModel(err) {
return nil, err
}
if idx < len(candidates)-1 {
logger.DebugCF("agent", "Model request failed, trying fallback model", map[string]interface{}{
"failed_model": model,
"next_model": candidates[idx+1],
logger.FieldError: err.Error(),
})
continue
}
}
return nil, fmt.Errorf("all configured models failed; last error: %w", lastErr)
}
proxyCandidates := al.proxyCandidates()
var lastErr error
for idx, model := range candidates {
response, err := al.provider.Chat(ctx, messages, tools, model, options)
if err == nil {
if al.model != model {
logger.WarnCF("agent", "Model switched after quota/rate-limit error", map[string]interface{}{
"from_model": al.model,
"to_model": model,
})
al.model = model
}
return response, nil
}
lastErr = err
if !shouldRetryWithFallbackModel(err) {
return nil, err
}
if idx < len(candidates)-1 {
logger.DebugCF("agent", "Model request failed, trying fallback model", map[string]interface{}{
"failed_model": model,
"next_model": candidates[idx+1],
logger.FieldError: err.Error(),
})
for pidx, proxyName := range proxyCandidates {
proxyProvider, ok := al.providersByProxy[proxyName]
if !ok || proxyProvider == nil {
continue
}
modelCandidates := al.modelCandidatesForProxy(proxyName)
if len(modelCandidates) == 0 {
continue
}
for midx, model := range modelCandidates {
response, err := proxyProvider.Chat(ctx, messages, tools, model, options)
if err == nil {
if al.proxy != proxyName {
logger.WarnCF("agent", "Proxy switched after model unavailability", map[string]interface{}{
"from_proxy": al.proxy,
"to_proxy": proxyName,
})
al.proxy = proxyName
al.provider = proxyProvider
}
if al.model != model {
logger.WarnCF("agent", "Model switched after availability error", map[string]interface{}{
"from_model": al.model,
"to_model": model,
"proxy": proxyName,
})
al.model = model
}
return response, nil
}
lastErr = err
if !shouldRetryWithFallbackModel(err) {
return nil, err
}
if midx < len(modelCandidates)-1 {
logger.DebugCF("agent", "Model request failed, trying next model in proxy", map[string]interface{}{
"proxy": proxyName,
"failed_model": model,
"next_model": modelCandidates[midx+1],
logger.FieldError: err.Error(),
})
continue
}
if pidx < len(proxyCandidates)-1 {
logger.DebugCF("agent", "All models failed in proxy, trying next proxy", map[string]interface{}{
"failed_proxy": proxyName,
"next_proxy": proxyCandidates[pidx+1],
logger.FieldError: err.Error(),
})
}
}
}
return nil, fmt.Errorf("all configured models failed; last error: %w", lastErr)
return nil, fmt.Errorf("all configured proxies/models failed; last error: %w", lastErr)
}
func (al *AgentLoop) modelCandidates() []string {
@@ -1700,13 +1797,67 @@ func (al *AgentLoop) modelCandidates() []string {
}
add(al.model)
for _, m := range al.modelFallbacks {
return candidates
}
func (al *AgentLoop) modelCandidatesForProxy(proxyName string) []string {
candidates := []string{}
seen := map[string]bool{}
add := func(model string) {
m := strings.TrimSpace(model)
if m == "" || seen[m] {
return
}
seen[m] = true
candidates = append(candidates, m)
}
add(al.model)
models := al.modelsByProxy[proxyName]
for _, m := range models {
add(m)
}
return candidates
}
func (al *AgentLoop) proxyCandidates() []string {
candidates := []string{}
seen := map[string]bool{}
add := func(name string) {
n := strings.TrimSpace(name)
if n == "" || seen[n] {
return
}
if _, ok := al.providersByProxy[n]; !ok {
return
}
seen[n] = true
candidates = append(candidates, n)
}
add(al.proxy)
for _, n := range al.proxyFallbacks {
add(n)
}
rest := make([]string, 0, len(al.providersByProxy))
for name := range al.providersByProxy {
if seen[name] {
continue
}
rest = append(rest, name)
}
sort.Strings(rest)
for _, name := range rest {
add(name)
}
return candidates
}
func isQuotaOrRateLimitError(err error) bool {
if err == nil {
return false
@@ -2421,7 +2572,7 @@ func (al *AgentLoop) handleSlashCommand(ctx context.Context, msg bus.InboundMess
return true, "", fmt.Errorf("status failed: %w", err)
}
return true, fmt.Sprintf("Model: %s\nAPI Base: %s\nLogging: %v\nConfig: %s",
cfg.Agents.Defaults.Model,
al.model,
cfg.Providers.Proxy.APIBase,
cfg.Logging.Enabled,
al.getConfigPathForCommands(),
@@ -2702,18 +2853,34 @@ func (al *AgentLoop) triggerGatewayReloadFromAgent() (bool, error) {
func (al *AgentLoop) applyRuntimeModelConfig(path string, value interface{}) {
switch path {
case "agents.defaults.model":
newModel := strings.TrimSpace(fmt.Sprintf("%v", value))
if newModel == "" {
return
case "agents.defaults.proxy":
newProxy := strings.TrimSpace(fmt.Sprintf("%v", value))
if newProxy != "" {
al.proxy = newProxy
if p, ok := al.providersByProxy[newProxy]; ok {
al.provider = p
}
al.model = defaultModelFromModels(al.modelsByProxy[newProxy], al.provider)
}
al.model = newModel
case "agents.defaults.model_fallbacks":
al.modelFallbacks = parseModelFallbacks(value)
case "agents.defaults.proxy_fallbacks":
al.proxyFallbacks = parseStringList(value)
}
}
func parseModelFallbacks(value interface{}) []string {
func defaultModelFromModels(models []string, provider providers.LLMProvider) string {
for _, m := range models {
model := strings.TrimSpace(m)
if model != "" {
return model
}
}
if provider != nil {
return strings.TrimSpace(provider.GetDefaultModel())
}
return ""
}
func parseStringList(value interface{}) []string {
switch v := value.(type) {
case []string:
out := make([]string, 0, len(v))