This commit is contained in:
lpf
2026-02-19 15:40:27 +08:00
parent 75e678061a
commit 15bd337c49
3 changed files with 282 additions and 16 deletions

View File

@@ -63,6 +63,8 @@ const finalizeHeuristicLowThreshold = 0.48
const reflectionCooldownRounds = 2
const toolSummaryMaxRecords = 4
const maxSelfRepairPasses = 2
const compactionAttemptTimeout = 8 * time.Second
const compactionRetryPerCandidate = 1
type sessionWorker struct {
queue chan bus.InboundMessage
@@ -2256,6 +2258,23 @@ func addTokenUsageToContext(ctx context.Context, usage *providers.UsageInfo) {
}
func formatTokenUsageSuffix(totals *tokenUsageTotals) string {
formatTokenUnit := func(v int) string {
switch {
case v >= 1_000_000:
f := float64(v) / 1_000_000
s := strconv.FormatFloat(f, 'f', 1, 64)
s = strings.TrimSuffix(s, ".0")
return s + "m"
case v >= 1_000:
f := float64(v) / 1_000
s := strconv.FormatFloat(f, 'f', 1, 64)
s = strings.TrimSuffix(s, ".0")
return s + "k"
default:
return strconv.Itoa(v)
}
}
input := 0
output := 0
total := 0
@@ -2264,8 +2283,8 @@ func formatTokenUsageSuffix(totals *tokenUsageTotals) string {
output = totals.output
total = totals.total
}
return fmt.Sprintf("\n\nUsage: in %d, out %d, total %d",
input, output, total)
return fmt.Sprintf("\n\nUsage: in %s, out %s, total %s",
formatTokenUnit(input), formatTokenUnit(output), formatTokenUnit(total))
}
func withUserLanguageHint(ctx context.Context, sessionKey, content string) context.Context {
@@ -4556,7 +4575,7 @@ func isForbiddenModelPermissionError(err error) bool {
}
func shouldRetryWithFallbackModel(err error) bool {
return isQuotaOrRateLimitError(err) || isModelProviderSelectionError(err) || isForbiddenModelPermissionError(err) || isGatewayTransientError(err) || isUpstreamAuthRoutingError(err)
return isQuotaOrRateLimitError(err) || isModelProviderSelectionError(err) || isForbiddenModelPermissionError(err) || isGatewayTransientError(err) || isUpstreamAuthRoutingError(err) || isRequestTimeoutOrTransientNetworkError(err)
}
func isGatewayTransientError(err error) bool {
@@ -4608,6 +4627,34 @@ func isUpstreamAuthRoutingError(err error) bool {
return false
}
func isRequestTimeoutOrTransientNetworkError(err error) bool {
if err == nil {
return false
}
msg := strings.ToLower(err.Error())
keywords := []string{
"context deadline exceeded",
"deadline exceeded",
"client.timeout exceeded",
"i/o timeout",
"connection reset by peer",
"connection refused",
"unexpected eof",
"eof",
"no such host",
"network is unreachable",
"temporary network error",
}
for _, keyword := range keywords {
if strings.Contains(msg, keyword) {
return true
}
}
return false
}
func buildProviderToolDefs(toolDefs []map[string]interface{}) ([]providers.ToolDefinition, error) {
providerToolDefs := make([]providers.ToolDefinition, 0, len(toolDefs))
for i, td := range toolDefs {
@@ -4738,21 +4785,21 @@ func (al *AgentLoop) buildCompactedSummary(
}
if mode == "responses_compact" || mode == "hybrid" {
if compactor, ok := al.provider.(providers.ResponsesCompactor); ok && compactor.SupportsResponsesCompact() {
compactSummary, err := compactor.BuildSummaryViaResponsesCompact(ctx, al.model, existingSummary, messages, maxSummaryChars)
if err == nil && strings.TrimSpace(compactSummary) != "" {
if mode == "responses_compact" {
return compactSummary, nil
}
existingSummary = strings.TrimSpace(existingSummary + "\n\n" + compactSummary)
} else if mode == "responses_compact" {
if err != nil {
return "", err
}
return "", fmt.Errorf("responses_compact produced empty summary")
compactSummary, err := al.buildSummaryViaResponsesCompactWithFallback(ctx, existingSummary, messages, maxSummaryChars)
if err == nil && strings.TrimSpace(compactSummary) != "" {
if mode == "responses_compact" {
return compactSummary, nil
}
existingSummary = strings.TrimSpace(existingSummary + "\n\n" + compactSummary)
} else if mode == "responses_compact" {
return "", fmt.Errorf("responses_compact mode requires provider support and protocol=responses")
if err != nil {
return "", err
}
return "", fmt.Errorf("responses_compact produced empty summary")
} else if err != nil {
logger.DebugCF("agent", "responses_compact failed in hybrid mode, fallback to summary mode", map[string]interface{}{
logger.FieldError: err.Error(),
})
}
}
@@ -4773,6 +4820,146 @@ func (al *AgentLoop) buildCompactedSummary(
return resp.Content, nil
}
func (al *AgentLoop) buildSummaryViaResponsesCompactWithFallback(
ctx context.Context,
existingSummary string,
messages []providers.Message,
maxSummaryChars int,
) (string, error) {
if len(al.providersByProxy) == 0 {
compactor, ok := al.provider.(providers.ResponsesCompactor)
if !ok || !compactor.SupportsResponsesCompact() {
return "", fmt.Errorf("responses_compact mode requires provider support and protocol=responses")
}
modelCandidates := al.modelCandidates()
var lastErr error
for _, model := range modelCandidates {
summary, err := al.tryResponsesCompactionOnCandidate(ctx, compactor, model, existingSummary, messages, maxSummaryChars)
if err == nil {
if al.model != model {
logger.WarnCF("agent", "Compaction model switched after availability error", map[string]interface{}{
"from_model": al.model,
"to_model": model,
})
al.model = model
}
return summary, nil
}
lastErr = err
if !shouldRetryWithFallbackModel(err) {
return "", err
}
}
if lastErr != nil {
return "", fmt.Errorf("all configured models failed for responses_compact; last error: %w", lastErr)
}
return "", fmt.Errorf("responses_compact produced empty summary")
}
proxyCandidates := al.proxyCandidates()
var lastErr error
foundCompactor := false
for _, proxyName := range proxyCandidates {
proxyProvider, ok := al.providersByProxy[proxyName]
if !ok || proxyProvider == nil {
continue
}
compactor, ok := proxyProvider.(providers.ResponsesCompactor)
if !ok || !compactor.SupportsResponsesCompact() {
continue
}
foundCompactor = true
modelCandidates := al.modelCandidatesForProxy(proxyName)
for _, model := range modelCandidates {
summary, err := al.tryResponsesCompactionOnCandidate(ctx, compactor, model, existingSummary, messages, maxSummaryChars)
if err == nil {
if al.proxy != proxyName {
logger.WarnCF("agent", "Compaction proxy switched after availability error", map[string]interface{}{
"from_proxy": al.proxy,
"to_proxy": proxyName,
})
al.proxy = proxyName
al.provider = proxyProvider
}
if al.model != model {
logger.WarnCF("agent", "Compaction model switched after availability error", map[string]interface{}{
"from_model": al.model,
"to_model": model,
"proxy": proxyName,
})
al.model = model
}
return summary, nil
}
lastErr = err
if !shouldRetryWithFallbackModel(err) {
return "", err
}
}
}
if !foundCompactor {
return "", fmt.Errorf("responses_compact mode requires provider support and protocol=responses")
}
if lastErr != nil {
return "", fmt.Errorf("all configured proxies/models failed for responses_compact; last error: %w", lastErr)
}
return "", fmt.Errorf("responses_compact produced empty summary")
}
func (al *AgentLoop) tryResponsesCompactionOnCandidate(
ctx context.Context,
compactor providers.ResponsesCompactor,
model string,
existingSummary string,
messages []providers.Message,
maxSummaryChars int,
) (string, error) {
var lastErr error
for attempt := 0; attempt <= compactionRetryPerCandidate; attempt++ {
remaining := remainingTimeForCompaction(ctx)
if remaining <= 0 {
if lastErr != nil {
return "", lastErr
}
return "", context.DeadlineExceeded
}
attemptCtx, cancel := context.WithTimeout(ctx, remaining)
summary, err := compactor.BuildSummaryViaResponsesCompact(attemptCtx, model, existingSummary, messages, maxSummaryChars)
cancel()
if err == nil {
if strings.TrimSpace(summary) == "" {
return "", fmt.Errorf("responses_compact produced empty summary")
}
return summary, nil
}
lastErr = err
if attempt < compactionRetryPerCandidate && shouldRetryWithFallbackModel(err) {
continue
}
return "", err
}
if lastErr != nil {
return "", lastErr
}
return "", fmt.Errorf("responses_compact failed")
}
func remainingTimeForCompaction(ctx context.Context) time.Duration {
deadline, ok := ctx.Deadline()
if !ok {
return compactionAttemptTimeout
}
remaining := time.Until(deadline)
if remaining <= 0 {
return 0
}
if remaining > compactionAttemptTimeout {
return compactionAttemptTimeout
}
return remaining
}
func normalizeCompactionMode(raw string) string {
switch strings.TrimSpace(raw) {
case "", "summary":

View File

@@ -1,6 +1,7 @@
package agent
import (
"context"
"fmt"
"strings"
"testing"
@@ -8,6 +9,35 @@ import (
"clawgo/pkg/providers"
)
type compactionTestProvider struct {
errByModel map[string]error
summaryByModel map[string]string
calledModels []string
}
func (p *compactionTestProvider) Chat(ctx context.Context, messages []providers.Message, tools []providers.ToolDefinition, model string, options map[string]interface{}) (*providers.LLMResponse, error) {
return &providers.LLMResponse{Content: "summary-fallback"}, nil
}
func (p *compactionTestProvider) GetDefaultModel() string {
return ""
}
func (p *compactionTestProvider) SupportsResponsesCompact() bool {
return true
}
func (p *compactionTestProvider) BuildSummaryViaResponsesCompact(ctx context.Context, model string, existingSummary string, messages []providers.Message, maxSummaryChars int) (string, error) {
p.calledModels = append(p.calledModels, model)
if err := p.errByModel[model]; err != nil {
return "", err
}
if out := strings.TrimSpace(p.summaryByModel[model]); out != "" {
return out, nil
}
return "", fmt.Errorf(`responses compact request failed (status 400): {"error":{"message":"model not found"}}`)
}
func TestShouldCompactBySize(t *testing.T) {
history := []providers.Message{
{Role: "user", Content: strings.Repeat("a", 80)},
@@ -58,3 +88,45 @@ func TestFormatCompactionTranscript_TrimsToolPayloadMoreAggressively(t *testing.
t.Fatalf("expected tool content to be trimmed aggressively, got length %d", len(out))
}
}
func TestBuildCompactedSummary_ResponsesCompactFallsBackToBackupProxyOnTimeout(t *testing.T) {
primary := &compactionTestProvider{
errByModel: map[string]error{
"gpt-4o-mini": fmt.Errorf("failed to send request: Post \"https://primary/v1/chat/completions\": context deadline exceeded"),
},
}
backup := &compactionTestProvider{
summaryByModel: map[string]string{
"deepseek-chat": "compacted summary",
},
}
al := &AgentLoop{
provider: primary,
proxy: "primary",
proxyFallbacks: []string{"backup"},
model: "gpt-4o-mini",
providersByProxy: map[string]providers.LLMProvider{
"primary": primary,
"backup": backup,
},
modelsByProxy: map[string][]string{
"primary": []string{"gpt-4o-mini"},
"backup": []string{"deepseek-chat"},
},
}
out, err := al.buildCompactedSummary(context.Background(), "", []providers.Message{{Role: "user", Content: "a"}}, 2000, 1200, "responses_compact")
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if strings.TrimSpace(out) != "compacted summary" {
t.Fatalf("unexpected summary: %q", out)
}
if al.proxy != "backup" {
t.Fatalf("expected proxy switched to backup, got %q", al.proxy)
}
if al.model != "deepseek-chat" {
t.Fatalf("expected model switched to deepseek-chat, got %q", al.model)
}
}

View File

@@ -277,3 +277,10 @@ func TestShouldRetryWithFallbackModel_AuthUnavailableError(t *testing.T) {
t.Fatalf("expected auth_unavailable error to trigger fallback retry")
}
}
func TestShouldRetryWithFallbackModel_ContextDeadlineExceeded(t *testing.T) {
err := fmt.Errorf("failed to send request: Post \"https://v2.kkkk.dev/v1/chat/completions\": context deadline exceeded")
if !shouldRetryWithFallbackModel(err) {
t.Fatalf("expected context deadline exceeded to trigger fallback retry")
}
}