fix(telegram-stream): chunked streaming with retry/rate-limit and suppress duplicate final send

This commit is contained in:
lpf
2026-03-04 12:00:25 +08:00
parent d639a5113f
commit ba8cfbe131
2 changed files with 291 additions and 71 deletions

View File

@@ -66,6 +66,8 @@ type AgentLoop struct {
ekg *ekg.Engine
providerMu sync.RWMutex
sessionProvider map[string]string
streamMu sync.Mutex
sessionStreamed map[string]bool
}
// StartupCompactionReport provides startup memory/session maintenance stats.
@@ -244,6 +246,7 @@ func NewAgentLoop(cfg *config.Config, msgBus *bus.MessageBus, provider providers
sessionRunLocks: map[string]*sync.Mutex{},
ekg: ekg.New(workspace),
sessionProvider: map[string]string{},
sessionStreamed: map[string]bool{},
providerResponses: map[string]config.ProviderResponsesConfig{},
telegramStreaming: cfg.Channels.Telegram.Streaming,
}
@@ -422,6 +425,28 @@ func (al *AgentLoop) getSessionProvider(sessionKey string) string {
return v
}
func (al *AgentLoop) markSessionStreamed(sessionKey string) {
key := strings.TrimSpace(sessionKey)
if key == "" {
return
}
al.streamMu.Lock()
al.sessionStreamed[key] = true
al.streamMu.Unlock()
}
func (al *AgentLoop) consumeSessionStreamed(sessionKey string) bool {
key := strings.TrimSpace(sessionKey)
if key == "" {
return false
}
al.streamMu.Lock()
defer al.streamMu.Unlock()
v := al.sessionStreamed[key]
delete(al.sessionStreamed, key)
return v
}
func (al *AgentLoop) processInbound(ctx context.Context, msg bus.InboundMessage) {
taskID := buildAuditTaskID(msg)
started := time.Now()
@@ -435,7 +460,9 @@ func (al *AgentLoop) processInbound(ctx context.Context, msg bus.InboundMessage)
trigger := al.getTrigger(msg)
suppressed := false
if response != "" {
if outbound, ok := al.prepareOutbound(msg, response); ok {
if msg.Channel == "telegram" && al.telegramStreaming && al.consumeSessionStreamed(msg.SessionKey) {
suppressed = true
} else if outbound, ok := al.prepareOutbound(msg, response); ok {
al.bus.PublishOutbound(outbound)
} else {
suppressed = true
@@ -833,6 +860,7 @@ func (al *AgentLoop) processMessage(ctx context.Context, msg bus.InboundMessage)
replyID = msg.Metadata["message_id"]
}
al.bus.PublishOutbound(bus.OutboundMessage{Channel: msg.Channel, ChatID: msg.ChatID, Content: streamText, Action: "stream", ReplyToID: replyID})
al.markSessionStreamed(msg.SessionKey)
})
} else {
response, err = al.provider.Chat(ctx, messages, providerToolDefs, al.model, options)

View File

@@ -18,6 +18,7 @@ import (
"github.com/mymmrac/telego"
"github.com/mymmrac/telego/telegoutil"
"golang.org/x/time/rate"
"clawgo/pkg/bus"
"clawgo/pkg/config"
@@ -30,7 +31,9 @@ const (
telegramMaxConcurrentHandlers = 32
telegramStopWaitHandlersPeriod = 5 * time.Second
telegramSafeHTMLMaxRunes = 3500
telegramStreamPreviewMaxRunes = 3200
telegramSafeMDV2MaxRunes = 3800
telegramStreamSplitMaxRunes = 3000
telegramStreamMaxRetries = 4
)
type TelegramChannel struct {
@@ -46,11 +49,18 @@ type TelegramChannel struct {
botUsername string
streamMu sync.Mutex
streamState map[string]telegramStreamState
streamLimit *rate.Limiter
}
type telegramStreamState struct {
MessageID int
LastContent string
MessageIDs []int
LastPayloads []string
LastModes []string
}
type telegramRenderedChunk struct {
payload string
parseMode string
}
func (c *TelegramChannel) SupportsAction(action string) bool {
@@ -77,6 +87,7 @@ func NewTelegramChannel(cfg config.TelegramConfig, bus *bus.MessageBus) (*Telegr
chatIDs: make(map[string]int64),
handleSem: make(chan struct{}, telegramMaxConcurrentHandlers),
streamState: make(map[string]telegramStreamState),
streamLimit: rate.NewLimiter(rate.Limit(2), 1),
}, nil
}
@@ -754,17 +765,6 @@ func telegramStreamKey(chatID int64, replyToID string) string {
return fmt.Sprintf("%d:%s", chatID, strings.TrimSpace(replyToID))
}
func tailRunes(s string, maxRunes int) string {
if maxRunes <= 0 {
return s
}
r := []rune(s)
if len(r) <= maxRunes {
return s
}
return "…\n" + string(r[len(r)-maxRunes:])
}
func clampTelegramHTML(markdown string, maxRunes int) string {
if maxRunes <= 0 {
maxRunes = telegramSafeHTMLMaxRunes
@@ -782,84 +782,276 @@ func clampTelegramHTML(markdown string, maxRunes int) string {
func (c *TelegramChannel) handleStreamAction(ctx context.Context, chatID int64, msg bus.OutboundMessage) error {
streamKey := telegramStreamKey(chatID, msg.ReplyToID)
content := tailRunes(msg.Content, telegramStreamPreviewMaxRunes)
htmlContent := clampTelegramHTML(content, telegramSafeHTMLMaxRunes)
if strings.TrimSpace(htmlContent) == "" {
chunks := renderTelegramStreamChunks(msg.Content)
if len(chunks) == 0 {
return nil
}
c.streamMu.Lock()
state := c.streamState[streamKey]
if state.LastContent == htmlContent {
c.streamMu.Unlock()
return nil
}
messageIDs := append([]int(nil), state.MessageIDs...)
lastPayloads := append([]string(nil), state.LastPayloads...)
lastModes := append([]string(nil), state.LastModes...)
c.streamMu.Unlock()
if state.MessageID == 0 {
sendParams := telegoutil.Message(telegoutil.ID(chatID), htmlContent).WithParseMode(telego.ModeHTML)
if replyID, ok := parseTelegramMessageID(msg.ReplyToID); ok {
sendParams.ReplyParameters = &telego.ReplyParameters{MessageID: replyID}
// Ensure each chunk has a message slot (send if missing, edit if changed).
for i, ch := range chunks {
if i < len(messageIDs) {
prevPayload := ""
prevMode := ""
if i < len(lastPayloads) {
prevPayload = lastPayloads[i]
}
if i < len(lastModes) {
prevMode = lastModes[i]
}
if prevPayload == ch.payload && prevMode == ch.parseMode {
continue
}
if err := c.editTelegramMessageWithRetry(ctx, chatID, messageIDs[i], ch.payload, ch.parseMode); err != nil {
return err
}
continue
}
sendCtx, cancel := withTelegramAPITimeout(ctx)
sent, err := c.bot.SendMessage(sendCtx, sendParams)
cancel()
replyToID := ""
if i == 0 {
replyToID = msg.ReplyToID
}
sentID, err := c.sendTelegramMessageWithRetry(ctx, chatID, replyToID, ch.payload, ch.parseMode)
if err != nil {
plain := tailRunes(plainTextFromTelegramHTML(htmlContent), telegramSafeHTMLMaxRunes)
if strings.TrimSpace(plain) == "" {
return err
}
sendPlainCtx, cancelPlain := withTelegramAPITimeout(ctx)
sent, err = c.bot.SendMessage(sendPlainCtx, telegoutil.Message(telegoutil.ID(chatID), plain))
cancelPlain()
if err != nil {
return err
}
return err
}
c.streamMu.Lock()
c.streamState[streamKey] = telegramStreamState{MessageID: sent.MessageID, LastContent: htmlContent}
c.streamMu.Unlock()
messageIDs = append(messageIDs, sentID)
}
// Remove stale tail chunks when content shrinks.
if len(messageIDs) > len(chunks) {
for i := len(chunks); i < len(messageIDs); i++ {
_ = c.deleteTelegramMessageWithRetry(ctx, chatID, messageIDs[i])
}
messageIDs = messageIDs[:len(chunks)]
}
nextPayloads := make([]string, 0, len(chunks))
nextModes := make([]string, 0, len(chunks))
for _, ch := range chunks {
nextPayloads = append(nextPayloads, ch.payload)
nextModes = append(nextModes, ch.parseMode)
}
c.streamMu.Lock()
c.streamState[streamKey] = telegramStreamState{
MessageIDs: messageIDs,
LastPayloads: nextPayloads,
LastModes: nextModes,
}
c.streamMu.Unlock()
return nil
}
func renderTelegramStreamChunks(content string) []telegramRenderedChunk {
raw := strings.TrimSpace(content)
if raw == "" {
return nil
}
mode, body := detectTelegramStreamMode(raw)
if strings.TrimSpace(body) == "" {
return nil
}
editCtx, cancel := withTelegramAPITimeout(ctx)
_, err := c.bot.EditMessageText(editCtx, &telego.EditMessageTextParams{
ChatID: telegoutil.ID(chatID),
MessageID: state.MessageID,
Text: htmlContent,
ParseMode: telego.ModeHTML,
})
cancel()
if err != nil {
var parts []string
switch mode {
case telego.ModeMarkdownV2:
parts = splitTelegramMarkdown(body, telegramStreamSplitMaxRunes)
case telego.ModeHTML:
parts = splitTelegramText(body, telegramSafeHTMLMaxRunes)
default:
parts = splitTelegramMarkdown(body, telegramStreamSplitMaxRunes)
mode = "auto_markdown"
}
out := make([]telegramRenderedChunk, 0, len(parts))
for _, p := range parts {
trimmed := strings.TrimSpace(p)
if trimmed == "" {
continue
}
switch mode {
case telego.ModeHTML:
payload := sanitizeTelegramHTML(trimmed)
if len([]rune(payload)) > telegramSafeHTMLMaxRunes {
payload = splitTelegramText(payload, telegramSafeHTMLMaxRunes)[0]
}
if strings.TrimSpace(payload) != "" {
out = append(out, telegramRenderedChunk{payload: payload, parseMode: telego.ModeHTML})
}
case telego.ModeMarkdownV2:
payload := trimmed
if len([]rune(payload)) > telegramSafeMDV2MaxRunes {
payload = splitTelegramText(payload, telegramSafeMDV2MaxRunes)[0]
}
if strings.TrimSpace(payload) != "" {
out = append(out, telegramRenderedChunk{payload: payload, parseMode: telego.ModeMarkdownV2})
}
default:
payload := sanitizeTelegramHTML(markdownToTelegramHTML(trimmed))
if len([]rune(payload)) > telegramSafeHTMLMaxRunes {
payload = clampTelegramHTML(trimmed, telegramSafeHTMLMaxRunes)
}
if strings.TrimSpace(payload) != "" {
out = append(out, telegramRenderedChunk{payload: payload, parseMode: telego.ModeHTML})
}
}
}
return out
}
func detectTelegramStreamMode(content string) (mode string, body string) {
trimmed := strings.TrimSpace(content)
switch {
case strings.HasPrefix(strings.ToLower(trimmed), "[mode:html]"):
return telego.ModeHTML, strings.TrimSpace(trimmed[len("[mode:html]"):])
case strings.HasPrefix(strings.ToLower(trimmed), "[mode:markdownv2]"):
return telego.ModeMarkdownV2, strings.TrimSpace(trimmed[len("[mode:markdownv2]"):])
default:
return "auto_markdown", content
}
}
func (c *TelegramChannel) withTelegramRetry(ctx context.Context, opName string, fn func(context.Context) error) error {
var lastErr error
for attempt := 1; attempt <= telegramStreamMaxRetries; attempt++ {
if c.streamLimit != nil {
if err := c.streamLimit.Wait(ctx); err != nil {
return err
}
}
opCtx, cancel := withTelegramAPITimeout(ctx)
err := fn(opCtx)
cancel()
if err == nil {
return nil
}
lastErr = err
errText := strings.ToLower(err.Error())
if strings.Contains(errText, "message is not modified") {
return nil
}
if strings.Contains(errText, "message is too long") || strings.Contains(errText, "can't parse entities") {
plain := tailRunes(plainTextFromTelegramHTML(htmlContent), telegramSafeHTMLMaxRunes)
if strings.TrimSpace(plain) == "" {
return err
}
editPlainCtx, cancelPlain := withTelegramAPITimeout(ctx)
_, err = c.bot.EditMessageText(editPlainCtx, &telego.EditMessageTextParams{
if attempt == telegramStreamMaxRetries {
break
}
if !shouldRetryTelegramError(err) {
break
}
waitFor := retryAfterFromTelegramError(err)
if waitFor <= 0 {
waitFor = time.Duration(attempt) * 500 * time.Millisecond
}
select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(waitFor):
}
logger.WarnCF("telegram", logger.C0063, map[string]interface{}{
"op": opName,
"attempt": attempt,
"retry": true,
"error": err.Error(),
})
}
return lastErr
}
func shouldRetryTelegramError(err error) bool {
if err == nil {
return false
}
text := strings.ToLower(err.Error())
return strings.Contains(text, "too many requests") ||
strings.Contains(text, "retry after") ||
strings.Contains(text, "429") ||
strings.Contains(text, "timeout") ||
strings.Contains(text, "connection reset") ||
strings.Contains(text, "502") ||
strings.Contains(text, "503") ||
strings.Contains(text, "504")
}
func retryAfterFromTelegramError(err error) time.Duration {
if err == nil {
return 0
}
re := regexp.MustCompile(`(?i)retry after\s+(\d+)`)
m := re.FindStringSubmatch(err.Error())
if len(m) < 2 {
return 0
}
sec, convErr := strconv.Atoi(strings.TrimSpace(m[1]))
if convErr != nil || sec <= 0 {
return 0
}
return time.Duration(sec) * time.Second
}
func (c *TelegramChannel) sendTelegramMessageWithRetry(ctx context.Context, chatID int64, replyToID string, payload, parseMode string) (int, error) {
var sent *telego.Message
err := c.withTelegramRetry(ctx, "send", func(callCtx context.Context) error {
params := telegoutil.Message(telegoutil.ID(chatID), payload)
if strings.TrimSpace(parseMode) != "" {
params.WithParseMode(parseMode)
}
if replyID, ok := parseTelegramMessageID(replyToID); ok {
params.ReplyParameters = &telego.ReplyParameters{MessageID: replyID}
}
msg, sendErr := c.bot.SendMessage(callCtx, params)
if sendErr != nil && parseMode == telego.ModeHTML {
plain := plainTextFromTelegramHTML(payload)
msg, sendErr = c.bot.SendMessage(callCtx, telegoutil.Message(telegoutil.ID(chatID), plain))
}
sent = msg
return sendErr
})
if err != nil {
return 0, err
}
if sent == nil {
return 0, fmt.Errorf("send returned empty response")
}
return sent.MessageID, nil
}
func (c *TelegramChannel) editTelegramMessageWithRetry(ctx context.Context, chatID int64, messageID int, payload, parseMode string) error {
return c.withTelegramRetry(ctx, "edit", func(callCtx context.Context) error {
params := &telego.EditMessageTextParams{
ChatID: telegoutil.ID(chatID),
MessageID: messageID,
Text: payload,
}
if strings.TrimSpace(parseMode) != "" {
params.ParseMode = parseMode
}
_, editErr := c.bot.EditMessageText(callCtx, params)
if editErr != nil && parseMode == telego.ModeHTML {
plain := plainTextFromTelegramHTML(payload)
_, editErr = c.bot.EditMessageText(callCtx, &telego.EditMessageTextParams{
ChatID: telegoutil.ID(chatID),
MessageID: state.MessageID,
MessageID: messageID,
Text: plain,
})
cancelPlain()
if err != nil && !strings.Contains(strings.ToLower(err.Error()), "message is not modified") {
return err
}
} else {
return err
}
}
return editErr
})
}
c.streamMu.Lock()
state.LastContent = htmlContent
c.streamState[streamKey] = state
c.streamMu.Unlock()
return nil
func (c *TelegramChannel) deleteTelegramMessageWithRetry(ctx context.Context, chatID int64, messageID int) error {
return c.withTelegramRetry(ctx, "delete", func(callCtx context.Context) error {
return c.bot.DeleteMessage(callCtx, &telego.DeleteMessageParams{
ChatID: telegoutil.ID(chatID),
MessageID: messageID,
})
})
}
func (c *TelegramChannel) handleAction(ctx context.Context, chatID int64, action string, msg bus.OutboundMessage) error {