From 8a422ea922fc620a9089623522918b1f8e18bd76 Mon Sep 17 00:00:00 2001 From: DBT Date: Fri, 27 Feb 2026 01:01:46 +0000 Subject: [PATCH] telegram rollback: remove thinking placeholder/stream/finalize action paths --- pkg/channels/telegram.go | 117 ++------------------------------------- 1 file changed, 4 insertions(+), 113 deletions(-) diff --git a/pkg/channels/telegram.go b/pkg/channels/telegram.go index f7c5cad..9545de2 100644 --- a/pkg/channels/telegram.go +++ b/pkg/channels/telegram.go @@ -39,8 +39,6 @@ type TelegramChannel struct { chatIDsMu sync.RWMutex updates <-chan telego.Update runCancel cancelGuard - placeholders sync.Map // chatID -> messageID - stopThinking sync.Map // chatID -> chan struct{} handleSem chan struct{} handleWG sync.WaitGroup botUsername string @@ -48,7 +46,7 @@ type TelegramChannel struct { func (c *TelegramChannel) SupportsAction(action string) bool { switch strings.ToLower(strings.TrimSpace(action)) { - case "", "send", "stream", "finalize", "edit", "delete", "react": + case "", "send", "edit", "delete", "react": return true default: return false @@ -67,10 +65,8 @@ func NewTelegramChannel(cfg config.TelegramConfig, bus *bus.MessageBus) (*Telegr BaseChannel: base, bot: bot, config: cfg, - chatIDs: make(map[string]int64), - placeholders: sync.Map{}, - stopThinking: sync.Map{}, - handleSem: make(chan struct{}, telegramMaxConcurrentHandlers), + chatIDs: make(map[string]int64), + handleSem: make(chan struct{}, telegramMaxConcurrentHandlers), }, nil } @@ -180,16 +176,6 @@ func (c *TelegramChannel) Stop(ctx context.Context) error { logger.WarnC("telegram", "Timeout waiting for telegram message handlers to stop") } - c.stopThinking.Range(func(key, value interface{}) bool { - safeCloseSignal(value) - c.stopThinking.Delete(key) - return true - }) - c.placeholders.Range(func(key, _ interface{}) bool { - c.placeholders.Delete(key) - return true - }) - return nil } @@ -263,23 +249,7 @@ func (c *TelegramChannel) Send(ctx context.Context, msg bus.OutboundMessage) err if action == "" { action = "send" } - if action == "send" { - if stop, ok := c.stopThinking.LoadAndDelete(msg.ChatID); ok { - safeCloseSignal(stop) - } - } - if action == "finalize" { - if stop, ok := c.stopThinking.LoadAndDelete(msg.ChatID); ok { - safeCloseSignal(stop) - } - if pID, ok := c.placeholders.LoadAndDelete(msg.ChatID); ok { - delCtx, cancel := withTelegramAPITimeout(ctx) - _ = c.bot.DeleteMessage(delCtx, &telego.DeleteMessageParams{ChatID: chatID, MessageID: pID.(int)}) - cancel() - } - return nil - } - if action != "send" && action != "stream" { + if action != "send" { return c.handleAction(ctx, chatIDInt, action, msg) } @@ -302,39 +272,6 @@ func (c *TelegramChannel) Send(ctx context.Context, msg bus.OutboundMessage) err markup = telegoutil.InlineKeyboard(rows...) } - if pID, ok := c.placeholders.Load(msg.ChatID); ok { - htmlForEdit := htmlContent - if len([]rune(htmlForEdit)) > 3500 { - htmlForEdit = sanitizeTelegramHTML(markdownToTelegramHTML(splitTelegramText(plainTextFromTelegramHTML(htmlForEdit), 3500)[0])) - } - editCtx, cancelEdit := withTelegramAPITimeout(ctx) - params := &telego.EditMessageTextParams{ - ChatID: chatID, - MessageID: pID.(int), - Text: htmlForEdit, - ParseMode: telego.ModeHTML, - ReplyMarkup: markup, - } - _, err := c.bot.EditMessageText(editCtx, params) - cancelEdit() - - if err == nil { - if action == "send" { - c.placeholders.Delete(msg.ChatID) - } - return nil - } - logger.WarnCF("telegram", "Placeholder update failed; fallback to new message", map[string]interface{}{ - logger.FieldChatID: msg.ChatID, - logger.FieldError: err.Error(), - }) - } - - if action == "stream" { - // stream updates should target existing placeholder only - return nil - } - if len([]rune(htmlContent)) > 3500 { plain := plainTextFromTelegramHTML(htmlContent) chunks := splitTelegramText(plain, 3500) @@ -657,52 +594,6 @@ func (c *TelegramChannel) handleMessage(runCtx context.Context, message *telego. c.HandleMessage(senderID, fmt.Sprintf("%d", chatID), content, mediaPaths, metadata) } -func (c *TelegramChannel) startThinkingPlaceholder(runCtx context.Context, chatID int64, replyTo int) { - chatKey := fmt.Sprintf("%d", chatID) - if stop, ok := c.stopThinking.LoadAndDelete(chatKey); ok { - safeCloseSignal(stop) - } - - sendCtx, cancelSend := withTelegramAPITimeout(runCtx) - params := telegoutil.Message(telegoutil.ID(chatID), "⏳ Thinking...") - if replyTo > 0 { - params.ReplyParameters = &telego.ReplyParameters{MessageID: replyTo} - } - sent, err := c.bot.SendMessage(sendCtx, params) - cancelSend() - if err != nil || sent == nil { - return - } - - c.placeholders.Store(chatKey, sent.MessageID) - stopCh := make(chan struct{}) - c.stopThinking.Store(chatKey, stopCh) - - go func(chatID int64, messageID int, stop <-chan struct{}) { - frames := []string{"⏳ Thinking.", "⏳ Thinking..", "⏳ Thinking..."} - ticker := time.NewTicker(4 * time.Second) - defer ticker.Stop() - i := 0 - for { - select { - case <-runCtx.Done(): - return - case <-stop: - return - case <-ticker.C: - editCtx, cancel := withTelegramAPITimeout(runCtx) - _, _ = c.bot.EditMessageText(editCtx, &telego.EditMessageTextParams{ - ChatID: telegoutil.ID(chatID), - MessageID: messageID, - Text: frames[i%len(frames)], - }) - cancel() - i++ - } - } - }(chatID, sent.MessageID, stopCh) -} - func (c *TelegramChannel) downloadFile(runCtx context.Context, fileID, ext, fileName string) string { getFileCtx, cancelGetFile := context.WithTimeout(runCtx, telegramAPICallTimeout) file, err := c.bot.GetFile(getFileCtx, &telego.GetFileParams{FileID: fileID})