mirror of
https://github.com/YspCoder/clawgo.git
synced 2026-04-20 16:07:29 +08:00
186 lines
5.9 KiB
Go
186 lines
5.9 KiB
Go
package providers
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"net/http"
|
|
"time"
|
|
)
|
|
|
|
func newProviderExecutionError(code, message, stage string, retryable bool, source string) *ProviderExecutionError {
|
|
return &ProviderExecutionError{
|
|
Code: code,
|
|
Message: message,
|
|
Stage: stage,
|
|
Retryable: retryable,
|
|
Source: source,
|
|
}
|
|
}
|
|
|
|
func NewProviderExecutionError(code, message, stage string, retryable bool, source string) *ProviderExecutionError {
|
|
return newProviderExecutionError(code, message, stage, retryable, source)
|
|
}
|
|
|
|
func ExecutionErrorCode(err error) string {
|
|
var execErr *ProviderExecutionError
|
|
if errors.As(err, &execErr) && execErr != nil {
|
|
return execErr.Code
|
|
}
|
|
return ""
|
|
}
|
|
|
|
func (p *HTTPProvider) executeJSONAttempts(ctx context.Context, endpoint string, payload interface{}, mutate func(*http.Request, authAttempt), classify func(int, []byte) (oauthFailureReason, bool)) (ProviderExecutionResult, error) {
|
|
jsonData, err := json.Marshal(payload)
|
|
if err != nil {
|
|
return ProviderExecutionResult{Error: newProviderExecutionError("marshal_failed", err.Error(), "marshal", false, p.providerName)}, fmt.Errorf("failed to marshal request: %w", err)
|
|
}
|
|
attempts, err := p.authAttempts(ctx)
|
|
if err != nil {
|
|
return ProviderExecutionResult{Error: newProviderExecutionError("auth_unavailable", err.Error(), "auth", false, p.providerName)}, err
|
|
}
|
|
var last ProviderExecutionResult
|
|
for _, attempt := range attempts {
|
|
req, err := http.NewRequestWithContext(ctx, http.MethodPost, endpoint, bytes.NewReader(jsonData))
|
|
if err != nil {
|
|
return ProviderExecutionResult{Error: newProviderExecutionError("request_build_failed", err.Error(), "request", false, p.providerName)}, fmt.Errorf("failed to create request: %w", err)
|
|
}
|
|
req.Header.Set("Content-Type", "application/json")
|
|
req.Header.Set("Accept", "application/json")
|
|
applyAttemptAuth(req, attempt)
|
|
applyAttemptProviderHeaders(req, attempt, p, false)
|
|
if mutate != nil {
|
|
mutate(req, attempt)
|
|
}
|
|
body, status, ctype, err := p.doJSONAttempt(req, attempt)
|
|
if err != nil {
|
|
execErr := newProviderExecutionError("request_failed", err.Error(), "request", false, p.providerName)
|
|
return ProviderExecutionResult{
|
|
StatusCode: status,
|
|
ContentType: ctype,
|
|
AttemptKind: attempt.kind,
|
|
Error: execErr,
|
|
}, execErr
|
|
}
|
|
reason, retry := classify(status, body)
|
|
last = ProviderExecutionResult{
|
|
Body: body,
|
|
StatusCode: status,
|
|
ContentType: ctype,
|
|
AttemptKind: attempt.kind,
|
|
Retryable: retry,
|
|
Failure: reason,
|
|
}
|
|
if !retry {
|
|
p.markAttemptSuccess(attempt)
|
|
return last, nil
|
|
}
|
|
applyAttemptFailure(p, attempt, reason, nil)
|
|
}
|
|
return last, nil
|
|
}
|
|
|
|
func (p *HTTPProvider) executeStreamAttempts(ctx context.Context, endpoint string, payload interface{}, mutate func(*http.Request, authAttempt), onEvent func(string)) (ProviderExecutionResult, error) {
|
|
jsonData, err := json.Marshal(payload)
|
|
if err != nil {
|
|
return ProviderExecutionResult{Error: newProviderExecutionError("marshal_failed", err.Error(), "marshal", false, p.providerName)}, fmt.Errorf("failed to marshal request: %w", err)
|
|
}
|
|
attempts, err := p.authAttempts(ctx)
|
|
if err != nil {
|
|
return ProviderExecutionResult{Error: newProviderExecutionError("auth_unavailable", err.Error(), "auth", false, p.providerName)}, err
|
|
}
|
|
var last ProviderExecutionResult
|
|
for _, attempt := range attempts {
|
|
attemptCtx, cancel := context.WithCancel(ctx)
|
|
req, err := http.NewRequestWithContext(attemptCtx, http.MethodPost, endpoint, bytes.NewReader(jsonData))
|
|
if err != nil {
|
|
cancel()
|
|
return ProviderExecutionResult{Error: newProviderExecutionError("request_build_failed", err.Error(), "request", false, p.providerName)}, fmt.Errorf("failed to create request: %w", err)
|
|
}
|
|
req.Header.Set("Content-Type", "application/json")
|
|
req.Header.Set("Accept", "text/event-stream")
|
|
applyAttemptAuth(req, attempt)
|
|
applyAttemptProviderHeaders(req, attempt, p, true)
|
|
if mutate != nil {
|
|
mutate(req, attempt)
|
|
}
|
|
streamOptions := streamAttemptTimeouts(ctx)
|
|
body, status, ctype, quotaHit, err := p.doStreamAttempt(req, attempt, onEvent, streamOptions, cancel)
|
|
cancel()
|
|
if err != nil {
|
|
code := "stream_failed"
|
|
if streamOptions.staleTriggered {
|
|
code = "stream_stale"
|
|
}
|
|
execErr := newProviderExecutionError(code, err.Error(), "request", true, p.providerName)
|
|
return ProviderExecutionResult{
|
|
StatusCode: status,
|
|
ContentType: ctype,
|
|
AttemptKind: attempt.kind,
|
|
Error: execErr,
|
|
}, execErr
|
|
}
|
|
reason, _ := classifyOAuthFailure(status, body)
|
|
last = ProviderExecutionResult{
|
|
Body: body,
|
|
StatusCode: status,
|
|
ContentType: ctype,
|
|
AttemptKind: attempt.kind,
|
|
Retryable: quotaHit,
|
|
Failure: reason,
|
|
}
|
|
if !quotaHit {
|
|
p.markAttemptSuccess(attempt)
|
|
return last, nil
|
|
}
|
|
applyAttemptFailure(p, attempt, reason, nil)
|
|
}
|
|
return last, nil
|
|
}
|
|
|
|
// streamAttemptOptions carries the per-attempt timing settings that
// streamAttemptTimeouts computes and executeStreamAttempts passes (by
// pointer) to doStreamAttempt.
type streamAttemptOptions struct {
	// firstDeltaTimeout defaults to 20s and is scaled down when the context
	// deadline is near. Presumably the longest wait for the first streamed
	// delta; the consumer is not in this file section, so confirm there.
	firstDeltaTimeout time.Duration

	// idleDeltaTimeout defaults to 45s and is scaled down when the context
	// deadline is near. Presumably the longest gap tolerated between
	// deltas; confirm against the consumer.
	idleDeltaTimeout time.Duration

	// staleTriggered is read after a failed stream attempt to report the
	// error code "stream_stale" instead of "stream_failed". It is not set
	// anywhere in this section; presumably the stream reader flips it when
	// one of the timeouts above fires.
	staleTriggered bool
}
|
|
|
|
func streamAttemptTimeouts(ctx context.Context) *streamAttemptOptions {
|
|
opts := &streamAttemptOptions{
|
|
firstDeltaTimeout: 20 * time.Second,
|
|
idleDeltaTimeout: 45 * time.Second,
|
|
}
|
|
if deadline, ok := ctx.Deadline(); ok {
|
|
remaining := time.Until(deadline)
|
|
if remaining > 0 {
|
|
first := remaining / 4
|
|
idle := remaining / 3
|
|
if remaining < 5*time.Second {
|
|
first = maxDuration(100*time.Millisecond, first)
|
|
} else if first < 5*time.Second {
|
|
first = 5 * time.Second
|
|
}
|
|
if remaining < 15*time.Second {
|
|
idle = maxDuration(250*time.Millisecond, idle)
|
|
} else if idle < 15*time.Second {
|
|
idle = 15 * time.Second
|
|
}
|
|
if first < opts.firstDeltaTimeout {
|
|
opts.firstDeltaTimeout = first
|
|
}
|
|
if idle < opts.idleDeltaTimeout {
|
|
opts.idleDeltaTimeout = idle
|
|
}
|
|
}
|
|
}
|
|
return opts
|
|
}
|
|
|
|
// maxDuration returns the larger of a and b; when they are equal, b is
// returned (indistinguishable from a for a plain duration value).
func maxDuration(a, b time.Duration) time.Duration {
	if a <= b {
		return b
	}
	return a
}
|