refactor(responses): remove chat_completions and wire responses features end-to-end

This commit is contained in:
lpf
2026-03-04 10:25:50 +08:00
parent d42b6a6f10
commit 6c78ef3b1c
14 changed files with 726 additions and 486 deletions

View File

@@ -29,6 +29,8 @@ const (
telegramAPICallTimeout = 15 * time.Second
telegramMaxConcurrentHandlers = 32
telegramStopWaitHandlersPeriod = 5 * time.Second
telegramSafeHTMLMaxRunes = 3500
telegramStreamPreviewMaxRunes = 3200
)
type TelegramChannel struct {
@@ -42,11 +44,18 @@ type TelegramChannel struct {
handleSem chan struct{}
handleWG sync.WaitGroup
botUsername string
streamMu sync.Mutex
streamState map[string]telegramStreamState
}
type telegramStreamState struct {
MessageID int
LastContent string
}
func (c *TelegramChannel) SupportsAction(action string) bool {
switch strings.ToLower(strings.TrimSpace(action)) {
case "", "send", "edit", "delete", "react":
case "", "send", "edit", "delete", "react", "stream":
return true
default:
return false
@@ -67,6 +76,7 @@ func NewTelegramChannel(cfg config.TelegramConfig, bus *bus.MessageBus) (*Telegr
config: cfg,
chatIDs: make(map[string]int64),
handleSem: make(chan struct{}, telegramMaxConcurrentHandlers),
streamState: make(map[string]telegramStreamState),
}, nil
}
@@ -252,6 +262,7 @@ func (c *TelegramChannel) Send(ctx context.Context, msg bus.OutboundMessage) err
if action != "send" {
return c.handleAction(ctx, chatIDInt, action, msg)
}
streamKey := telegramStreamKey(chatIDInt, msg.ReplyToID)
htmlContent := sanitizeTelegramHTML(markdownToTelegramHTML(msg.Content))
@@ -292,6 +303,9 @@ func (c *TelegramChannel) Send(ctx context.Context, msg bus.OutboundMessage) err
return err
}
}
c.streamMu.Lock()
delete(c.streamState, streamKey)
c.streamMu.Unlock()
return nil
}
@@ -325,8 +339,14 @@ func (c *TelegramChannel) Send(ctx context.Context, msg bus.OutboundMessage) err
return err
}
}
c.streamMu.Lock()
delete(c.streamState, streamKey)
c.streamMu.Unlock()
return nil
}
c.streamMu.Lock()
delete(c.streamState, streamKey)
c.streamMu.Unlock()
return nil
}
@@ -730,21 +750,132 @@ func parseChatID(chatIDStr string) (int64, error) {
return id, err
}
func telegramStreamKey(chatID int64, replyToID string) string {
return fmt.Sprintf("%d:%s", chatID, strings.TrimSpace(replyToID))
}
func tailRunes(s string, maxRunes int) string {
if maxRunes <= 0 {
return s
}
r := []rune(s)
if len(r) <= maxRunes {
return s
}
return "…\n" + string(r[len(r)-maxRunes:])
}
func clampTelegramHTML(markdown string, maxRunes int) string {
if maxRunes <= 0 {
maxRunes = telegramSafeHTMLMaxRunes
}
htmlContent := sanitizeTelegramHTML(markdownToTelegramHTML(markdown))
if len([]rune(htmlContent)) <= maxRunes {
return htmlContent
}
chunks := splitTelegramMarkdown(markdown, maxRunes-400)
if len(chunks) == 0 {
return ""
}
return sanitizeTelegramHTML(markdownToTelegramHTML(chunks[0]))
}
func (c *TelegramChannel) handleStreamAction(ctx context.Context, chatID int64, msg bus.OutboundMessage) error {
streamKey := telegramStreamKey(chatID, msg.ReplyToID)
content := tailRunes(msg.Content, telegramStreamPreviewMaxRunes)
htmlContent := clampTelegramHTML(content, telegramSafeHTMLMaxRunes)
if strings.TrimSpace(htmlContent) == "" {
return nil
}
c.streamMu.Lock()
state := c.streamState[streamKey]
if state.LastContent == htmlContent {
c.streamMu.Unlock()
return nil
}
c.streamMu.Unlock()
if state.MessageID == 0 {
sendParams := telegoutil.Message(telegoutil.ID(chatID), htmlContent).WithParseMode(telego.ModeHTML)
if replyID, ok := parseTelegramMessageID(msg.ReplyToID); ok {
sendParams.ReplyParameters = &telego.ReplyParameters{MessageID: replyID}
}
sendCtx, cancel := withTelegramAPITimeout(ctx)
sent, err := c.bot.SendMessage(sendCtx, sendParams)
cancel()
if err != nil {
plain := tailRunes(plainTextFromTelegramHTML(htmlContent), telegramSafeHTMLMaxRunes)
if strings.TrimSpace(plain) == "" {
return err
}
sendPlainCtx, cancelPlain := withTelegramAPITimeout(ctx)
sent, err = c.bot.SendMessage(sendPlainCtx, telegoutil.Message(telegoutil.ID(chatID), plain))
cancelPlain()
if err != nil {
return err
}
}
c.streamMu.Lock()
c.streamState[streamKey] = telegramStreamState{MessageID: sent.MessageID, LastContent: htmlContent}
c.streamMu.Unlock()
return nil
}
editCtx, cancel := withTelegramAPITimeout(ctx)
_, err := c.bot.EditMessageText(editCtx, &telego.EditMessageTextParams{
ChatID: telegoutil.ID(chatID),
MessageID: state.MessageID,
Text: htmlContent,
ParseMode: telego.ModeHTML,
})
cancel()
if err != nil {
errText := strings.ToLower(err.Error())
if strings.Contains(errText, "message is not modified") {
return nil
}
if strings.Contains(errText, "message is too long") || strings.Contains(errText, "can't parse entities") {
plain := tailRunes(plainTextFromTelegramHTML(htmlContent), telegramSafeHTMLMaxRunes)
if strings.TrimSpace(plain) == "" {
return err
}
editPlainCtx, cancelPlain := withTelegramAPITimeout(ctx)
_, err = c.bot.EditMessageText(editPlainCtx, &telego.EditMessageTextParams{
ChatID: telegoutil.ID(chatID),
MessageID: state.MessageID,
Text: plain,
})
cancelPlain()
if err != nil && !strings.Contains(strings.ToLower(err.Error()), "message is not modified") {
return err
}
} else {
return err
}
}
c.streamMu.Lock()
state.LastContent = htmlContent
c.streamState[streamKey] = state
c.streamMu.Unlock()
return nil
}
func (c *TelegramChannel) handleAction(ctx context.Context, chatID int64, action string, msg bus.OutboundMessage) error {
messageID, ok := parseTelegramMessageID(msg.MessageID)
if !ok && action != "send" {
if !ok && action != "send" && action != "stream" {
return fmt.Errorf("message_id required for action=%s", action)
}
switch action {
case "edit":
htmlContent := sanitizeTelegramHTML(markdownToTelegramHTML(msg.Content))
if len([]rune(htmlContent)) > 3500 {
htmlContent = sanitizeTelegramHTML(markdownToTelegramHTML(splitTelegramMarkdown(msg.Content, 3000)[0]))
}
htmlContent := clampTelegramHTML(msg.Content, telegramSafeHTMLMaxRunes)
editCtx, cancel := withTelegramAPITimeout(ctx)
defer cancel()
_, err := c.bot.EditMessageText(editCtx, &telego.EditMessageTextParams{ChatID: telegoutil.ID(chatID), MessageID: messageID, Text: htmlContent, ParseMode: telego.ModeHTML})
return err
case "stream":
return c.handleStreamAction(ctx, chatID, msg)
case "delete":
delCtx, cancel := withTelegramAPITimeout(ctx)
defer cancel()