diff --git a/pkg/agent/loop.go b/pkg/agent/loop.go index e47efce..e5bdec8 100644 --- a/pkg/agent/loop.go +++ b/pkg/agent/loop.go @@ -63,6 +63,8 @@ const finalizeHeuristicLowThreshold = 0.48 const reflectionCooldownRounds = 2 const toolSummaryMaxRecords = 4 const maxSelfRepairPasses = 2 +const compactionAttemptTimeout = 8 * time.Second +const compactionRetryPerCandidate = 1 type sessionWorker struct { queue chan bus.InboundMessage @@ -2256,6 +2258,23 @@ func addTokenUsageToContext(ctx context.Context, usage *providers.UsageInfo) { } func formatTokenUsageSuffix(totals *tokenUsageTotals) string { + formatTokenUnit := func(v int) string { + switch { + case v >= 1_000_000: + f := float64(v) / 1_000_000 + s := strconv.FormatFloat(f, 'f', 1, 64) + s = strings.TrimSuffix(s, ".0") + return s + "m" + case v >= 1_000: + f := float64(v) / 1_000 + s := strconv.FormatFloat(f, 'f', 1, 64) + s = strings.TrimSuffix(s, ".0") + return s + "k" + default: + return strconv.Itoa(v) + } + } + input := 0 output := 0 total := 0 @@ -2264,8 +2283,8 @@ func formatTokenUsageSuffix(totals *tokenUsageTotals) string { output = totals.output total = totals.total } - return fmt.Sprintf("\n\nUsage: in %d, out %d, total %d", - input, output, total) + return fmt.Sprintf("\n\nUsage: in %s, out %s, total %s", + formatTokenUnit(input), formatTokenUnit(output), formatTokenUnit(total)) } func withUserLanguageHint(ctx context.Context, sessionKey, content string) context.Context { @@ -4556,7 +4575,7 @@ func isForbiddenModelPermissionError(err error) bool { } func shouldRetryWithFallbackModel(err error) bool { - return isQuotaOrRateLimitError(err) || isModelProviderSelectionError(err) || isForbiddenModelPermissionError(err) || isGatewayTransientError(err) || isUpstreamAuthRoutingError(err) + return isQuotaOrRateLimitError(err) || isModelProviderSelectionError(err) || isForbiddenModelPermissionError(err) || isGatewayTransientError(err) || isUpstreamAuthRoutingError(err) || isRequestTimeoutOrTransientNetworkError(err) } func isGatewayTransientError(err error) bool { @@ -4608,6 +4627,34 @@ func isUpstreamAuthRoutingError(err error) bool { return false } +func isRequestTimeoutOrTransientNetworkError(err error) bool { + if err == nil { + return false + } + + msg := strings.ToLower(err.Error()) + keywords := []string{ + "context deadline exceeded", + "deadline exceeded", + "client.timeout exceeded", + "i/o timeout", + "connection reset by peer", + "connection refused", + "unexpected eof", + "eof", + "no such host", + "network is unreachable", + "temporary network error", + } + + for _, keyword := range keywords { + if strings.Contains(msg, keyword) { + return true + } + } + return false +} + func buildProviderToolDefs(toolDefs []map[string]interface{}) ([]providers.ToolDefinition, error) { providerToolDefs := make([]providers.ToolDefinition, 0, len(toolDefs)) for i, td := range toolDefs { @@ -4738,21 +4785,21 @@ func (al *AgentLoop) buildCompactedSummary( } if mode == "responses_compact" || mode == "hybrid" { - if compactor, ok := al.provider.(providers.ResponsesCompactor); ok && compactor.SupportsResponsesCompact() { - compactSummary, err := compactor.BuildSummaryViaResponsesCompact(ctx, al.model, existingSummary, messages, maxSummaryChars) - if err == nil && strings.TrimSpace(compactSummary) != "" { - if mode == "responses_compact" { - return compactSummary, nil - } - existingSummary = strings.TrimSpace(existingSummary + "\n\n" + compactSummary) - } else if mode == "responses_compact" { - if err != nil { - return "", err - } - return "", fmt.Errorf("responses_compact produced empty summary") + compactSummary, err := al.buildSummaryViaResponsesCompactWithFallback(ctx, existingSummary, messages, maxSummaryChars) + if err == nil && strings.TrimSpace(compactSummary) != "" { + if mode == "responses_compact" { + return compactSummary, nil } + existingSummary = strings.TrimSpace(existingSummary + "\n\n" + compactSummary) } else if mode == "responses_compact" { - return "", fmt.Errorf("responses_compact mode requires provider support and protocol=responses") + if err != nil { + return "", err + } + return "", fmt.Errorf("responses_compact produced empty summary") + } else if err != nil { + logger.DebugCF("agent", "responses_compact failed in hybrid mode, fallback to summary mode", map[string]interface{}{ + logger.FieldError: err.Error(), + }) } } @@ -4773,6 +4820,146 @@ func (al *AgentLoop) buildCompactedSummary( return resp.Content, nil } +func (al *AgentLoop) buildSummaryViaResponsesCompactWithFallback( + ctx context.Context, + existingSummary string, + messages []providers.Message, + maxSummaryChars int, +) (string, error) { + if len(al.providersByProxy) == 0 { + compactor, ok := al.provider.(providers.ResponsesCompactor) + if !ok || !compactor.SupportsResponsesCompact() { + return "", fmt.Errorf("responses_compact mode requires provider support and protocol=responses") + } + + modelCandidates := al.modelCandidates() + var lastErr error + for _, model := range modelCandidates { + summary, err := al.tryResponsesCompactionOnCandidate(ctx, compactor, model, existingSummary, messages, maxSummaryChars) + if err == nil { + if al.model != model { + logger.WarnCF("agent", "Compaction model switched after availability error", map[string]interface{}{ + "from_model": al.model, + "to_model": model, + }) + al.model = model + } + return summary, nil + } + lastErr = err + if !shouldRetryWithFallbackModel(err) { + return "", err + } + } + if lastErr != nil { + return "", fmt.Errorf("all configured models failed for responses_compact; last error: %w", lastErr) + } + return "", fmt.Errorf("responses_compact produced empty summary") + } + + proxyCandidates := al.proxyCandidates() + var lastErr error + foundCompactor := false + for _, proxyName := range proxyCandidates { + proxyProvider, ok := al.providersByProxy[proxyName] + if !ok || proxyProvider == nil { + continue + } + compactor, ok := proxyProvider.(providers.ResponsesCompactor) + if !ok || !compactor.SupportsResponsesCompact() { + continue + } + foundCompactor = true + modelCandidates := al.modelCandidatesForProxy(proxyName) + for _, model := range modelCandidates { + summary, err := al.tryResponsesCompactionOnCandidate(ctx, compactor, model, existingSummary, messages, maxSummaryChars) + if err == nil { + if al.proxy != proxyName { + logger.WarnCF("agent", "Compaction proxy switched after availability error", map[string]interface{}{ + "from_proxy": al.proxy, + "to_proxy": proxyName, + }) + al.proxy = proxyName + al.provider = proxyProvider + } + if al.model != model { + logger.WarnCF("agent", "Compaction model switched after availability error", map[string]interface{}{ + "from_model": al.model, + "to_model": model, + "proxy": proxyName, + }) + al.model = model + } + return summary, nil + } + lastErr = err + if !shouldRetryWithFallbackModel(err) { + return "", err + } + } + } + if !foundCompactor { + return "", fmt.Errorf("responses_compact mode requires provider support and protocol=responses") + } + if lastErr != nil { + return "", fmt.Errorf("all configured proxies/models failed for responses_compact; last error: %w", lastErr) + } + return "", fmt.Errorf("responses_compact produced empty summary") +} + +func (al *AgentLoop) tryResponsesCompactionOnCandidate( + ctx context.Context, + compactor providers.ResponsesCompactor, + model string, + existingSummary string, + messages []providers.Message, + maxSummaryChars int, +) (string, error) { + var lastErr error + for attempt := 0; attempt <= compactionRetryPerCandidate; attempt++ { + remaining := remainingTimeForCompaction(ctx) + if remaining <= 0 { + if lastErr != nil { + return "", lastErr + } + return "", context.DeadlineExceeded + } + attemptCtx, cancel := context.WithTimeout(ctx, remaining) + summary, err := compactor.BuildSummaryViaResponsesCompact(attemptCtx, model, existingSummary, messages, maxSummaryChars) + cancel() + if err == nil { + if strings.TrimSpace(summary) == "" { + return "", fmt.Errorf("responses_compact produced empty summary") + } + return summary, nil + } + lastErr = err + if attempt < compactionRetryPerCandidate && shouldRetryWithFallbackModel(err) { + continue + } + return "", err + } + if lastErr != nil { + return "", lastErr + } + return "", fmt.Errorf("responses_compact failed") +} + +func remainingTimeForCompaction(ctx context.Context) time.Duration { + deadline, ok := ctx.Deadline() + if !ok { + return compactionAttemptTimeout + } + remaining := time.Until(deadline) + if remaining <= 0 { + return 0 + } + if remaining > compactionAttemptTimeout { + return compactionAttemptTimeout + } + return remaining +} + func normalizeCompactionMode(raw string) string { switch strings.TrimSpace(raw) { case "", "summary": diff --git a/pkg/agent/loop_compaction_test.go b/pkg/agent/loop_compaction_test.go index f6a14a8..ac5750a 100644 --- a/pkg/agent/loop_compaction_test.go +++ b/pkg/agent/loop_compaction_test.go @@ -1,6 +1,7 @@ package agent import ( + "context" "fmt" "strings" "testing" @@ -8,6 +9,35 @@ import ( "clawgo/pkg/providers" ) +type compactionTestProvider struct { + errByModel map[string]error + summaryByModel map[string]string + calledModels []string +} + +func (p *compactionTestProvider) Chat(ctx context.Context, messages []providers.Message, tools []providers.ToolDefinition, model string, options map[string]interface{}) (*providers.LLMResponse, error) { + return &providers.LLMResponse{Content: "summary-fallback"}, nil +} + +func (p *compactionTestProvider) GetDefaultModel() string { + return "" +} + +func (p *compactionTestProvider) SupportsResponsesCompact() bool { + return true +} + +func (p *compactionTestProvider) BuildSummaryViaResponsesCompact(ctx context.Context, model string, existingSummary string, messages []providers.Message, maxSummaryChars int) (string, error) { + p.calledModels = append(p.calledModels, model) + if err := p.errByModel[model]; err != nil { + return "", err + } + if out := strings.TrimSpace(p.summaryByModel[model]); out != "" { + return out, nil + } + return "", fmt.Errorf(`responses compact request failed (status 400): {"error":{"message":"model not found"}}`) +} + func TestShouldCompactBySize(t *testing.T) { history := []providers.Message{ {Role: "user", Content: strings.Repeat("a", 80)}, @@ -58,3 +88,45 @@ func TestFormatCompactionTranscript_TrimsToolPayloadMoreAggressively(t *testing. t.Fatalf("expected tool content to be trimmed aggressively, got length %d", len(out)) } } + +func TestBuildCompactedSummary_ResponsesCompactFallsBackToBackupProxyOnTimeout(t *testing.T) { + primary := &compactionTestProvider{ + errByModel: map[string]error{ + "gpt-4o-mini": fmt.Errorf("failed to send request: Post \"https://primary/v1/chat/completions\": context deadline exceeded"), + }, + } + backup := &compactionTestProvider{ + summaryByModel: map[string]string{ + "deepseek-chat": "compacted summary", + }, + } + + al := &AgentLoop{ + provider: primary, + proxy: "primary", + proxyFallbacks: []string{"backup"}, + model: "gpt-4o-mini", + providersByProxy: map[string]providers.LLMProvider{ + "primary": primary, + "backup": backup, + }, + modelsByProxy: map[string][]string{ + "primary": []string{"gpt-4o-mini"}, + "backup": []string{"deepseek-chat"}, + }, + } + + out, err := al.buildCompactedSummary(context.Background(), "", []providers.Message{{Role: "user", Content: "a"}}, 2000, 1200, "responses_compact") + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if strings.TrimSpace(out) != "compacted summary" { + t.Fatalf("unexpected summary: %q", out) + } + if al.proxy != "backup" { + t.Fatalf("expected proxy switched to backup, got %q", al.proxy) + } + if al.model != "deepseek-chat" { + t.Fatalf("expected model switched to deepseek-chat, got %q", al.model) + } +} diff --git a/pkg/agent/loop_fallback_test.go b/pkg/agent/loop_fallback_test.go index caaba61..2fd8ae1 100644 --- a/pkg/agent/loop_fallback_test.go +++ b/pkg/agent/loop_fallback_test.go @@ -277,3 +277,10 @@ func TestShouldRetryWithFallbackModel_AuthUnavailableError(t *testing.T) { t.Fatalf("expected auth_unavailable error to trigger fallback retry") } } + +func TestShouldRetryWithFallbackModel_ContextDeadlineExceeded(t *testing.T) { + err := fmt.Errorf("failed to send request: Post \"https://v2.kkkk.dev/v1/chat/completions\": context deadline exceeded") + if !shouldRetryWithFallbackModel(err) { + t.Fatalf("expected context deadline exceeded to trigger fallback retry") + } +}