telegram rollback: remove thinking placeholder/stream/finalize action paths

This commit is contained in:
DBT
2026-02-27 01:01:46 +00:00
parent d51fdb634c
commit 8a422ea922

View File

@@ -39,8 +39,6 @@ type TelegramChannel struct {
chatIDsMu sync.RWMutex
updates <-chan telego.Update
runCancel cancelGuard
placeholders sync.Map // chatID -> messageID
stopThinking sync.Map // chatID -> chan struct{}
handleSem chan struct{}
handleWG sync.WaitGroup
botUsername string
@@ -48,7 +46,7 @@ type TelegramChannel struct {
func (c *TelegramChannel) SupportsAction(action string) bool {
switch strings.ToLower(strings.TrimSpace(action)) {
case "", "send", "stream", "finalize", "edit", "delete", "react":
case "", "send", "edit", "delete", "react":
return true
default:
return false
@@ -67,10 +65,8 @@ func NewTelegramChannel(cfg config.TelegramConfig, bus *bus.MessageBus) (*Telegr
BaseChannel: base,
bot: bot,
config: cfg,
chatIDs: make(map[string]int64),
placeholders: sync.Map{},
stopThinking: sync.Map{},
handleSem: make(chan struct{}, telegramMaxConcurrentHandlers),
chatIDs: make(map[string]int64),
handleSem: make(chan struct{}, telegramMaxConcurrentHandlers),
}, nil
}
@@ -180,16 +176,6 @@ func (c *TelegramChannel) Stop(ctx context.Context) error {
logger.WarnC("telegram", "Timeout waiting for telegram message handlers to stop")
}
c.stopThinking.Range(func(key, value interface{}) bool {
safeCloseSignal(value)
c.stopThinking.Delete(key)
return true
})
c.placeholders.Range(func(key, _ interface{}) bool {
c.placeholders.Delete(key)
return true
})
return nil
}
@@ -263,23 +249,7 @@ func (c *TelegramChannel) Send(ctx context.Context, msg bus.OutboundMessage) err
if action == "" {
action = "send"
}
if action == "send" {
if stop, ok := c.stopThinking.LoadAndDelete(msg.ChatID); ok {
safeCloseSignal(stop)
}
}
if action == "finalize" {
if stop, ok := c.stopThinking.LoadAndDelete(msg.ChatID); ok {
safeCloseSignal(stop)
}
if pID, ok := c.placeholders.LoadAndDelete(msg.ChatID); ok {
delCtx, cancel := withTelegramAPITimeout(ctx)
_ = c.bot.DeleteMessage(delCtx, &telego.DeleteMessageParams{ChatID: chatID, MessageID: pID.(int)})
cancel()
}
return nil
}
if action != "send" && action != "stream" {
if action != "send" {
return c.handleAction(ctx, chatIDInt, action, msg)
}
@@ -302,39 +272,6 @@ func (c *TelegramChannel) Send(ctx context.Context, msg bus.OutboundMessage) err
markup = telegoutil.InlineKeyboard(rows...)
}
if pID, ok := c.placeholders.Load(msg.ChatID); ok {
htmlForEdit := htmlContent
if len([]rune(htmlForEdit)) > 3500 {
htmlForEdit = sanitizeTelegramHTML(markdownToTelegramHTML(splitTelegramText(plainTextFromTelegramHTML(htmlForEdit), 3500)[0]))
}
editCtx, cancelEdit := withTelegramAPITimeout(ctx)
params := &telego.EditMessageTextParams{
ChatID: chatID,
MessageID: pID.(int),
Text: htmlForEdit,
ParseMode: telego.ModeHTML,
ReplyMarkup: markup,
}
_, err := c.bot.EditMessageText(editCtx, params)
cancelEdit()
if err == nil {
if action == "send" {
c.placeholders.Delete(msg.ChatID)
}
return nil
}
logger.WarnCF("telegram", "Placeholder update failed; fallback to new message", map[string]interface{}{
logger.FieldChatID: msg.ChatID,
logger.FieldError: err.Error(),
})
}
if action == "stream" {
// stream updates should target existing placeholder only
return nil
}
if len([]rune(htmlContent)) > 3500 {
plain := plainTextFromTelegramHTML(htmlContent)
chunks := splitTelegramText(plain, 3500)
@@ -657,52 +594,6 @@ func (c *TelegramChannel) handleMessage(runCtx context.Context, message *telego.
c.HandleMessage(senderID, fmt.Sprintf("%d", chatID), content, mediaPaths, metadata)
}
func (c *TelegramChannel) startThinkingPlaceholder(runCtx context.Context, chatID int64, replyTo int) {
chatKey := fmt.Sprintf("%d", chatID)
if stop, ok := c.stopThinking.LoadAndDelete(chatKey); ok {
safeCloseSignal(stop)
}
sendCtx, cancelSend := withTelegramAPITimeout(runCtx)
params := telegoutil.Message(telegoutil.ID(chatID), "⏳ Thinking...")
if replyTo > 0 {
params.ReplyParameters = &telego.ReplyParameters{MessageID: replyTo}
}
sent, err := c.bot.SendMessage(sendCtx, params)
cancelSend()
if err != nil || sent == nil {
return
}
c.placeholders.Store(chatKey, sent.MessageID)
stopCh := make(chan struct{})
c.stopThinking.Store(chatKey, stopCh)
go func(chatID int64, messageID int, stop <-chan struct{}) {
frames := []string{"⏳ Thinking.", "⏳ Thinking..", "⏳ Thinking..."}
ticker := time.NewTicker(4 * time.Second)
defer ticker.Stop()
i := 0
for {
select {
case <-runCtx.Done():
return
case <-stop:
return
case <-ticker.C:
editCtx, cancel := withTelegramAPITimeout(runCtx)
_, _ = c.bot.EditMessageText(editCtx, &telego.EditMessageTextParams{
ChatID: telegoutil.ID(chatID),
MessageID: messageID,
Text: frames[i%len(frames)],
})
cancel()
i++
}
}
}(chatID, sent.MessageID, stopCh)
}
func (c *TelegramChannel) downloadFile(runCtx context.Context, fileID, ext, fileName string) string {
getFileCtx, cancelGetFile := context.WithTimeout(runCtx, telegramAPICallTimeout)
file, err := c.bot.GetFile(getFileCtx, &telego.GetFileParams{FileID: fileID})