diff --git a/pkg/agent/loop.go b/pkg/agent/loop.go index 6551e10..a51aa93 100644 --- a/pkg/agent/loop.go +++ b/pkg/agent/loop.go @@ -66,6 +66,8 @@ type AgentLoop struct { ekg *ekg.Engine providerMu sync.RWMutex sessionProvider map[string]string + streamMu sync.Mutex + sessionStreamed map[string]bool } // StartupCompactionReport provides startup memory/session maintenance stats. @@ -244,6 +246,7 @@ func NewAgentLoop(cfg *config.Config, msgBus *bus.MessageBus, provider providers sessionRunLocks: map[string]*sync.Mutex{}, ekg: ekg.New(workspace), sessionProvider: map[string]string{}, + sessionStreamed: map[string]bool{}, providerResponses: map[string]config.ProviderResponsesConfig{}, telegramStreaming: cfg.Channels.Telegram.Streaming, } @@ -422,6 +425,28 @@ func (al *AgentLoop) getSessionProvider(sessionKey string) string { return v } +func (al *AgentLoop) markSessionStreamed(sessionKey string) { + key := strings.TrimSpace(sessionKey) + if key == "" { + return + } + al.streamMu.Lock() + al.sessionStreamed[key] = true + al.streamMu.Unlock() +} + +func (al *AgentLoop) consumeSessionStreamed(sessionKey string) bool { + key := strings.TrimSpace(sessionKey) + if key == "" { + return false + } + al.streamMu.Lock() + defer al.streamMu.Unlock() + v := al.sessionStreamed[key] + delete(al.sessionStreamed, key) + return v +} + func (al *AgentLoop) processInbound(ctx context.Context, msg bus.InboundMessage) { taskID := buildAuditTaskID(msg) started := time.Now() @@ -435,7 +460,9 @@ func (al *AgentLoop) processInbound(ctx context.Context, msg bus.InboundMessage) trigger := al.getTrigger(msg) suppressed := false if response != "" { - if outbound, ok := al.prepareOutbound(msg, response); ok { + if msg.Channel == "telegram" && al.telegramStreaming && al.consumeSessionStreamed(msg.SessionKey) { + suppressed = true + } else if outbound, ok := al.prepareOutbound(msg, response); ok { al.bus.PublishOutbound(outbound) } else { suppressed = true @@ -833,6 +860,7 @@ func (al *AgentLoop) processMessage(ctx context.Context, msg bus.InboundMessage) replyID = msg.Metadata["message_id"] } al.bus.PublishOutbound(bus.OutboundMessage{Channel: msg.Channel, ChatID: msg.ChatID, Content: streamText, Action: "stream", ReplyToID: replyID}) + al.markSessionStreamed(msg.SessionKey) }) } else { response, err = al.provider.Chat(ctx, messages, providerToolDefs, al.model, options) diff --git a/pkg/channels/telegram.go b/pkg/channels/telegram.go index fa8c33a..1773835 100644 --- a/pkg/channels/telegram.go +++ b/pkg/channels/telegram.go @@ -18,6 +18,7 @@ import ( "github.com/mymmrac/telego" "github.com/mymmrac/telego/telegoutil" + "golang.org/x/time/rate" "clawgo/pkg/bus" "clawgo/pkg/config" @@ -30,7 +31,9 @@ const ( telegramMaxConcurrentHandlers = 32 telegramStopWaitHandlersPeriod = 5 * time.Second telegramSafeHTMLMaxRunes = 3500 - telegramStreamPreviewMaxRunes = 3200 + telegramSafeMDV2MaxRunes = 3800 + telegramStreamSplitMaxRunes = 3000 + telegramStreamMaxRetries = 4 ) type TelegramChannel struct { @@ -46,11 +49,18 @@ type TelegramChannel struct { botUsername string streamMu sync.Mutex streamState map[string]telegramStreamState + streamLimit *rate.Limiter } type telegramStreamState struct { - MessageID int - LastContent string + MessageIDs []int + LastPayloads []string + LastModes []string +} + +type telegramRenderedChunk struct { + payload string + parseMode string } func (c *TelegramChannel) SupportsAction(action string) bool { @@ -77,6 +87,7 @@ func NewTelegramChannel(cfg config.TelegramConfig, bus *bus.MessageBus) (*Telegr chatIDs: make(map[string]int64), handleSem: make(chan struct{}, telegramMaxConcurrentHandlers), streamState: make(map[string]telegramStreamState), + streamLimit: rate.NewLimiter(rate.Limit(2), 1), }, nil } @@ -754,17 +765,6 @@ func telegramStreamKey(chatID int64, replyToID string) string { return fmt.Sprintf("%d:%s", chatID, strings.TrimSpace(replyToID)) } -func tailRunes(s string, maxRunes int) string { - if maxRunes <= 0 { - return s - } - r := []rune(s) - if len(r) <= maxRunes { - return s - } - return "…\n" + string(r[len(r)-maxRunes:]) -} - func clampTelegramHTML(markdown string, maxRunes int) string { if maxRunes <= 0 { maxRunes = telegramSafeHTMLMaxRunes @@ -782,84 +782,276 @@ func clampTelegramHTML(markdown string, maxRunes int) string { func (c *TelegramChannel) handleStreamAction(ctx context.Context, chatID int64, msg bus.OutboundMessage) error { streamKey := telegramStreamKey(chatID, msg.ReplyToID) - content := tailRunes(msg.Content, telegramStreamPreviewMaxRunes) - htmlContent := clampTelegramHTML(content, telegramSafeHTMLMaxRunes) - if strings.TrimSpace(htmlContent) == "" { + chunks := renderTelegramStreamChunks(msg.Content) + if len(chunks) == 0 { return nil } c.streamMu.Lock() state := c.streamState[streamKey] - if state.LastContent == htmlContent { - c.streamMu.Unlock() - return nil - } + messageIDs := append([]int(nil), state.MessageIDs...) + lastPayloads := append([]string(nil), state.LastPayloads...) + lastModes := append([]string(nil), state.LastModes...) c.streamMu.Unlock() - if state.MessageID == 0 { - sendParams := telegoutil.Message(telegoutil.ID(chatID), htmlContent).WithParseMode(telego.ModeHTML) - if replyID, ok := parseTelegramMessageID(msg.ReplyToID); ok { - sendParams.ReplyParameters = &telego.ReplyParameters{MessageID: replyID} + // Ensure each chunk has a message slot (send if missing, edit if changed). + for i, ch := range chunks { + if i < len(messageIDs) { + prevPayload := "" + prevMode := "" + if i < len(lastPayloads) { + prevPayload = lastPayloads[i] + } + if i < len(lastModes) { + prevMode = lastModes[i] + } + if prevPayload == ch.payload && prevMode == ch.parseMode { + continue + } + if err := c.editTelegramMessageWithRetry(ctx, chatID, messageIDs[i], ch.payload, ch.parseMode); err != nil { + return err + } + continue } - sendCtx, cancel := withTelegramAPITimeout(ctx) - sent, err := c.bot.SendMessage(sendCtx, sendParams) - cancel() + + replyToID := "" + if i == 0 { + replyToID = msg.ReplyToID + } + sentID, err := c.sendTelegramMessageWithRetry(ctx, chatID, replyToID, ch.payload, ch.parseMode) if err != nil { - plain := tailRunes(plainTextFromTelegramHTML(htmlContent), telegramSafeHTMLMaxRunes) - if strings.TrimSpace(plain) == "" { - return err - } - sendPlainCtx, cancelPlain := withTelegramAPITimeout(ctx) - sent, err = c.bot.SendMessage(sendPlainCtx, telegoutil.Message(telegoutil.ID(chatID), plain)) - cancelPlain() - if err != nil { - return err - } + return err } - c.streamMu.Lock() - c.streamState[streamKey] = telegramStreamState{MessageID: sent.MessageID, LastContent: htmlContent} - c.streamMu.Unlock() + messageIDs = append(messageIDs, sentID) + } + + // Remove stale tail chunks when content shrinks. + if len(messageIDs) > len(chunks) { + for i := len(chunks); i < len(messageIDs); i++ { + _ = c.deleteTelegramMessageWithRetry(ctx, chatID, messageIDs[i]) + } + messageIDs = messageIDs[:len(chunks)] + } + + nextPayloads := make([]string, 0, len(chunks)) + nextModes := make([]string, 0, len(chunks)) + for _, ch := range chunks { + nextPayloads = append(nextPayloads, ch.payload) + nextModes = append(nextModes, ch.parseMode) + } + + c.streamMu.Lock() + c.streamState[streamKey] = telegramStreamState{ + MessageIDs: messageIDs, + LastPayloads: nextPayloads, + LastModes: nextModes, + } + c.streamMu.Unlock() + return nil +} + +func renderTelegramStreamChunks(content string) []telegramRenderedChunk { + raw := strings.TrimSpace(content) + if raw == "" { + return nil + } + mode, body := detectTelegramStreamMode(raw) + if strings.TrimSpace(body) == "" { return nil } - editCtx, cancel := withTelegramAPITimeout(ctx) - _, err := c.bot.EditMessageText(editCtx, &telego.EditMessageTextParams{ - ChatID: telegoutil.ID(chatID), - MessageID: state.MessageID, - Text: htmlContent, - ParseMode: telego.ModeHTML, - }) - cancel() - if err != nil { + var parts []string + switch mode { + case telego.ModeMarkdownV2: + parts = splitTelegramMarkdown(body, telegramStreamSplitMaxRunes) + case telego.ModeHTML: + parts = splitTelegramText(body, telegramSafeHTMLMaxRunes) + default: + parts = splitTelegramMarkdown(body, telegramStreamSplitMaxRunes) + mode = "auto_markdown" + } + + out := make([]telegramRenderedChunk, 0, len(parts)) + for _, p := range parts { + trimmed := strings.TrimSpace(p) + if trimmed == "" { + continue + } + switch mode { + case telego.ModeHTML: + payload := sanitizeTelegramHTML(trimmed) + if len([]rune(payload)) > telegramSafeHTMLMaxRunes { + payload = splitTelegramText(payload, telegramSafeHTMLMaxRunes)[0] + } + if strings.TrimSpace(payload) != "" { + out = append(out, telegramRenderedChunk{payload: payload, parseMode: telego.ModeHTML}) + } + case telego.ModeMarkdownV2: + payload := trimmed + if len([]rune(payload)) > telegramSafeMDV2MaxRunes { + payload = splitTelegramText(payload, telegramSafeMDV2MaxRunes)[0] + } + if strings.TrimSpace(payload) != "" { + out = append(out, telegramRenderedChunk{payload: payload, parseMode: telego.ModeMarkdownV2}) + } + default: + payload := sanitizeTelegramHTML(markdownToTelegramHTML(trimmed)) + if len([]rune(payload)) > telegramSafeHTMLMaxRunes { + payload = clampTelegramHTML(trimmed, telegramSafeHTMLMaxRunes) + } + if strings.TrimSpace(payload) != "" { + out = append(out, telegramRenderedChunk{payload: payload, parseMode: telego.ModeHTML}) + } + } + } + return out +} + +func detectTelegramStreamMode(content string) (mode string, body string) { + trimmed := strings.TrimSpace(content) + switch { + case strings.HasPrefix(strings.ToLower(trimmed), "[mode:html]"): + return telego.ModeHTML, strings.TrimSpace(trimmed[len("[mode:html]"):]) + case strings.HasPrefix(strings.ToLower(trimmed), "[mode:markdownv2]"): + return telego.ModeMarkdownV2, strings.TrimSpace(trimmed[len("[mode:markdownv2]"):]) + default: + return "auto_markdown", content + } +} + +func (c *TelegramChannel) withTelegramRetry(ctx context.Context, opName string, fn func(context.Context) error) error { + var lastErr error + for attempt := 1; attempt <= telegramStreamMaxRetries; attempt++ { + if c.streamLimit != nil { + if err := c.streamLimit.Wait(ctx); err != nil { + return err + } + } + opCtx, cancel := withTelegramAPITimeout(ctx) + err := fn(opCtx) + cancel() + if err == nil { + return nil + } + + lastErr = err errText := strings.ToLower(err.Error()) if strings.Contains(errText, "message is not modified") { return nil } - if strings.Contains(errText, "message is too long") || strings.Contains(errText, "can't parse entities") { - plain := tailRunes(plainTextFromTelegramHTML(htmlContent), telegramSafeHTMLMaxRunes) - if strings.TrimSpace(plain) == "" { - return err - } - editPlainCtx, cancelPlain := withTelegramAPITimeout(ctx) - _, err = c.bot.EditMessageText(editPlainCtx, &telego.EditMessageTextParams{ + if attempt == telegramStreamMaxRetries { + break + } + if !shouldRetryTelegramError(err) { + break + } + waitFor := retryAfterFromTelegramError(err) + if waitFor <= 0 { + waitFor = time.Duration(attempt) * 500 * time.Millisecond + } + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(waitFor): + } + logger.WarnCF("telegram", logger.C0063, map[string]interface{}{ + "op": opName, + "attempt": attempt, + "retry": true, + "error": err.Error(), + }) + } + return lastErr +} + +func shouldRetryTelegramError(err error) bool { + if err == nil { + return false + } + text := strings.ToLower(err.Error()) + return strings.Contains(text, "too many requests") || + strings.Contains(text, "retry after") || + strings.Contains(text, "429") || + strings.Contains(text, "timeout") || + strings.Contains(text, "connection reset") || + strings.Contains(text, "502") || + strings.Contains(text, "503") || + strings.Contains(text, "504") +} + +func retryAfterFromTelegramError(err error) time.Duration { + if err == nil { + return 0 + } + re := regexp.MustCompile(`(?i)retry after\s+(\d+)`) + m := re.FindStringSubmatch(err.Error()) + if len(m) < 2 { + return 0 + } + sec, convErr := strconv.Atoi(strings.TrimSpace(m[1])) + if convErr != nil || sec <= 0 { + return 0 + } + return time.Duration(sec) * time.Second +} + +func (c *TelegramChannel) sendTelegramMessageWithRetry(ctx context.Context, chatID int64, replyToID string, payload, parseMode string) (int, error) { + var sent *telego.Message + err := c.withTelegramRetry(ctx, "send", func(callCtx context.Context) error { + params := telegoutil.Message(telegoutil.ID(chatID), payload) + if strings.TrimSpace(parseMode) != "" { + params.WithParseMode(parseMode) + } + if replyID, ok := parseTelegramMessageID(replyToID); ok { + params.ReplyParameters = &telego.ReplyParameters{MessageID: replyID} + } + msg, sendErr := c.bot.SendMessage(callCtx, params) + if sendErr != nil && parseMode == telego.ModeHTML { + plain := plainTextFromTelegramHTML(payload) + msg, sendErr = c.bot.SendMessage(callCtx, telegoutil.Message(telegoutil.ID(chatID), plain)) + } + sent = msg + return sendErr + }) + if err != nil { + return 0, err + } + if sent == nil { + return 0, fmt.Errorf("send returned empty response") + } + return sent.MessageID, nil +} + +func (c *TelegramChannel) editTelegramMessageWithRetry(ctx context.Context, chatID int64, messageID int, payload, parseMode string) error { + return c.withTelegramRetry(ctx, "edit", func(callCtx context.Context) error { + params := &telego.EditMessageTextParams{ + ChatID: telegoutil.ID(chatID), + MessageID: messageID, + Text: payload, + } + if strings.TrimSpace(parseMode) != "" { + params.ParseMode = parseMode + } + _, editErr := c.bot.EditMessageText(callCtx, params) + if editErr != nil && parseMode == telego.ModeHTML { + plain := plainTextFromTelegramHTML(payload) + _, editErr = c.bot.EditMessageText(callCtx, &telego.EditMessageTextParams{ ChatID: telegoutil.ID(chatID), - MessageID: state.MessageID, + MessageID: messageID, Text: plain, }) - cancelPlain() - if err != nil && !strings.Contains(strings.ToLower(err.Error()), "message is not modified") { - return err - } - } else { - return err } - } + return editErr + }) +} - c.streamMu.Lock() - state.LastContent = htmlContent - c.streamState[streamKey] = state - c.streamMu.Unlock() - return nil +func (c *TelegramChannel) deleteTelegramMessageWithRetry(ctx context.Context, chatID int64, messageID int) error { + return c.withTelegramRetry(ctx, "delete", func(callCtx context.Context) error { + return c.bot.DeleteMessage(callCtx, &telego.DeleteMessageParams{ + ChatID: telegoutil.ID(chatID), + MessageID: messageID, + }) + }) } func (c *TelegramChannel) handleAction(ctx context.Context, chatID int64, action string, msg bus.OutboundMessage) error {