mirror of
https://github.com/YspCoder/clawgo.git
synced 2026-04-14 18:17:29 +08:00
fix bug
This commit is contained in:
@@ -247,29 +247,39 @@ func (al *AgentLoop) runSessionWorker(ctx context.Context, sessionKey string, wo
|
||||
al.removeWorker(sessionKey, worker)
|
||||
return
|
||||
case msg := <-worker.queue:
|
||||
taskCtx, cancel := context.WithCancel(ctx)
|
||||
worker.cancelMu.Lock()
|
||||
worker.cancel = cancel
|
||||
worker.cancelMu.Unlock()
|
||||
func() {
|
||||
taskCtx, cancel := context.WithCancel(ctx)
|
||||
worker.cancelMu.Lock()
|
||||
worker.cancel = cancel
|
||||
worker.cancelMu.Unlock()
|
||||
|
||||
response, err := al.processMessage(taskCtx, msg)
|
||||
cancel()
|
||||
al.clearWorkerCancel(worker)
|
||||
defer func() {
|
||||
cancel()
|
||||
al.clearWorkerCancel(worker)
|
||||
if r := recover(); r != nil {
|
||||
logger.ErrorCF("agent", "Session worker recovered from panic", map[string]interface{}{
|
||||
"session_key": sessionKey,
|
||||
"panic": fmt.Sprintf("%v", r),
|
||||
})
|
||||
}
|
||||
}()
|
||||
|
||||
if err != nil {
|
||||
if errors.Is(err, context.Canceled) {
|
||||
continue
|
||||
response, err := al.processMessage(taskCtx, msg)
|
||||
if err != nil {
|
||||
if errors.Is(err, context.Canceled) {
|
||||
return
|
||||
}
|
||||
response = fmt.Sprintf("Error processing message: %v", err)
|
||||
}
|
||||
response = fmt.Sprintf("Error processing message: %v", err)
|
||||
}
|
||||
|
||||
if response != "" {
|
||||
al.bus.PublishOutbound(bus.OutboundMessage{
|
||||
Channel: msg.Channel,
|
||||
ChatID: msg.ChatID,
|
||||
Content: response,
|
||||
})
|
||||
}
|
||||
if response != "" {
|
||||
al.bus.PublishOutbound(bus.OutboundMessage{
|
||||
Channel: msg.Channel,
|
||||
ChatID: msg.ChatID,
|
||||
Content: response,
|
||||
})
|
||||
}
|
||||
}()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -21,16 +21,26 @@ import (
|
||||
"clawgo/pkg/voice"
|
||||
)
|
||||
|
||||
const (
|
||||
telegramDownloadTimeout = 30 * time.Second
|
||||
telegramAPICallTimeout = 15 * time.Second
|
||||
telegramMaxConcurrentHandlers = 32
|
||||
telegramStopWaitHandlersPeriod = 5 * time.Second
|
||||
)
|
||||
|
||||
type TelegramChannel struct {
|
||||
*BaseChannel
|
||||
bot *telego.Bot
|
||||
config config.TelegramConfig
|
||||
chatIDs map[string]int64
|
||||
chatIDsMu sync.RWMutex
|
||||
updates <-chan telego.Update
|
||||
runCancel cancelGuard
|
||||
transcriber *voice.GroqTranscriber
|
||||
placeholders sync.Map // chatID -> messageID
|
||||
stopThinking sync.Map // chatID -> chan struct{}
|
||||
handleSem chan struct{}
|
||||
handleWG sync.WaitGroup
|
||||
}
|
||||
|
||||
func NewTelegramChannel(cfg config.TelegramConfig, bus *bus.MessageBus) (*TelegramChannel, error) {
|
||||
@@ -49,6 +59,7 @@ func NewTelegramChannel(cfg config.TelegramConfig, bus *bus.MessageBus) (*Telegr
|
||||
transcriber: nil,
|
||||
placeholders: sync.Map{},
|
||||
stopThinking: sync.Map{},
|
||||
handleSem: make(chan struct{}, telegramMaxConcurrentHandlers),
|
||||
}, nil
|
||||
}
|
||||
|
||||
@@ -112,7 +123,7 @@ func (c *TelegramChannel) Start(ctx context.Context) error {
|
||||
continue
|
||||
}
|
||||
if update.Message != nil {
|
||||
c.handleMessage(update.Message)
|
||||
c.dispatchHandleMessage(runCtx, update.Message)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -129,6 +140,17 @@ func (c *TelegramChannel) Stop(ctx context.Context) error {
|
||||
c.setRunning(false)
|
||||
c.runCancel.cancelAndClear()
|
||||
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
c.handleWG.Wait()
|
||||
close(done)
|
||||
}()
|
||||
select {
|
||||
case <-done:
|
||||
case <-time.After(telegramStopWaitHandlersPeriod):
|
||||
logger.WarnC("telegram", "Timeout waiting for telegram message handlers to stop")
|
||||
}
|
||||
|
||||
c.stopThinking.Range(func(key, value interface{}) bool {
|
||||
safeCloseSignal(value)
|
||||
c.stopThinking.Delete(key)
|
||||
@@ -146,6 +168,32 @@ func (c *TelegramChannel) Stop(ctx context.Context) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *TelegramChannel) dispatchHandleMessage(runCtx context.Context, message *telego.Message) {
|
||||
if message == nil {
|
||||
return
|
||||
}
|
||||
c.handleWG.Add(1)
|
||||
go func(msg *telego.Message) {
|
||||
defer c.handleWG.Done()
|
||||
|
||||
select {
|
||||
case <-runCtx.Done():
|
||||
return
|
||||
case c.handleSem <- struct{}{}:
|
||||
}
|
||||
defer func() { <-c.handleSem }()
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
logger.ErrorCF("telegram", "Recovered panic in telegram message handler", map[string]interface{}{
|
||||
"panic": fmt.Sprintf("%v", r),
|
||||
})
|
||||
}
|
||||
}()
|
||||
|
||||
c.handleMessage(runCtx, msg)
|
||||
}(message)
|
||||
}
|
||||
|
||||
func (c *TelegramChannel) Send(ctx context.Context, msg bus.OutboundMessage) error {
|
||||
if !c.IsRunning() {
|
||||
return fmt.Errorf("telegram bot not running")
|
||||
@@ -224,7 +272,7 @@ func (c *TelegramChannel) Send(ctx context.Context, msg bus.OutboundMessage) err
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *TelegramChannel) handleMessage(message *telego.Message) {
|
||||
func (c *TelegramChannel) handleMessage(runCtx context.Context, message *telego.Message) {
|
||||
if message == nil {
|
||||
return
|
||||
}
|
||||
@@ -237,7 +285,9 @@ func (c *TelegramChannel) handleMessage(message *telego.Message) {
|
||||
senderID := fmt.Sprintf("%d", user.ID)
|
||||
|
||||
chatID := message.Chat.ID
|
||||
c.chatIDsMu.Lock()
|
||||
c.chatIDs[senderID] = chatID
|
||||
c.chatIDsMu.Unlock()
|
||||
|
||||
content := ""
|
||||
mediaPaths := []string{}
|
||||
@@ -255,7 +305,7 @@ func (c *TelegramChannel) handleMessage(message *telego.Message) {
|
||||
|
||||
if message.Photo != nil && len(message.Photo) > 0 {
|
||||
photo := message.Photo[len(message.Photo)-1]
|
||||
photoPath := c.downloadFile(photo.FileID, ".jpg")
|
||||
photoPath := c.downloadFile(runCtx, photo.FileID, ".jpg")
|
||||
if photoPath != "" {
|
||||
mediaPaths = append(mediaPaths, photoPath)
|
||||
if content != "" {
|
||||
@@ -266,13 +316,13 @@ func (c *TelegramChannel) handleMessage(message *telego.Message) {
|
||||
}
|
||||
|
||||
if message.Voice != nil {
|
||||
voicePath := c.downloadFile(message.Voice.FileID, ".ogg")
|
||||
voicePath := c.downloadFile(runCtx, message.Voice.FileID, ".ogg")
|
||||
if voicePath != "" {
|
||||
mediaPaths = append(mediaPaths, voicePath)
|
||||
|
||||
transcribedText := ""
|
||||
if c.transcriber != nil && c.transcriber.IsAvailable() {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
|
||||
ctx, cancel := context.WithTimeout(runCtx, 30*time.Second)
|
||||
defer cancel()
|
||||
|
||||
result, err := c.transcriber.Transcribe(ctx, voicePath)
|
||||
@@ -299,7 +349,7 @@ func (c *TelegramChannel) handleMessage(message *telego.Message) {
|
||||
}
|
||||
|
||||
if message.Audio != nil {
|
||||
audioPath := c.downloadFile(message.Audio.FileID, ".mp3")
|
||||
audioPath := c.downloadFile(runCtx, message.Audio.FileID, ".mp3")
|
||||
if audioPath != "" {
|
||||
mediaPaths = append(mediaPaths, audioPath)
|
||||
if content != "" {
|
||||
@@ -310,7 +360,7 @@ func (c *TelegramChannel) handleMessage(message *telego.Message) {
|
||||
}
|
||||
|
||||
if message.Document != nil {
|
||||
docPath := c.downloadFile(message.Document.FileID, "")
|
||||
docPath := c.downloadFile(runCtx, message.Document.FileID, "")
|
||||
if docPath != "" {
|
||||
mediaPaths = append(mediaPaths, docPath)
|
||||
if content != "" {
|
||||
@@ -338,10 +388,12 @@ func (c *TelegramChannel) handleMessage(message *telego.Message) {
|
||||
}
|
||||
|
||||
// Thinking indicator
|
||||
_ = c.bot.SendChatAction(context.Background(), &telego.SendChatActionParams{
|
||||
apiCtx, cancelAPI := context.WithTimeout(runCtx, telegramAPICallTimeout)
|
||||
_ = c.bot.SendChatAction(apiCtx, &telego.SendChatActionParams{
|
||||
ChatID: telegoutil.ID(chatID),
|
||||
Action: telego.ChatActionTyping,
|
||||
})
|
||||
cancelAPI()
|
||||
|
||||
stopChan := make(chan struct{})
|
||||
if prev, ok := c.stopThinking.LoadAndDelete(fmt.Sprintf("%d", chatID)); ok {
|
||||
@@ -353,7 +405,9 @@ func (c *TelegramChannel) handleMessage(message *telego.Message) {
|
||||
logger.FieldChatID: chatID,
|
||||
})
|
||||
|
||||
pMsg, err := c.bot.SendMessage(context.Background(), telegoutil.Message(telegoutil.ID(chatID), "Thinking... 💭"))
|
||||
sendCtx, cancelSend := context.WithTimeout(runCtx, telegramAPICallTimeout)
|
||||
pMsg, err := c.bot.SendMessage(sendCtx, telegoutil.Message(telegoutil.ID(chatID), "Thinking... 💭"))
|
||||
cancelSend()
|
||||
if err == nil {
|
||||
pID := pMsg.MessageID
|
||||
c.placeholders.Store(fmt.Sprintf("%d", chatID), pID)
|
||||
@@ -362,7 +416,7 @@ func (c *TelegramChannel) handleMessage(message *telego.Message) {
|
||||
"message_id": pID,
|
||||
})
|
||||
|
||||
go func(cid int64, mid int, stop <-chan struct{}) {
|
||||
go func(cid int64, mid int, stop <-chan struct{}, parentCtx context.Context) {
|
||||
dots := []string{".", "..", "..."}
|
||||
emotes := []string{"💭", "🤔", "☁️"}
|
||||
i := 0
|
||||
@@ -370,6 +424,8 @@ func (c *TelegramChannel) handleMessage(message *telego.Message) {
|
||||
defer ticker.Stop()
|
||||
for {
|
||||
select {
|
||||
case <-parentCtx.Done():
|
||||
return
|
||||
case <-stop:
|
||||
logger.DebugCF("telegram", "Telegram thinking animation stopped", map[string]interface{}{
|
||||
logger.FieldChatID: cid,
|
||||
@@ -378,11 +434,14 @@ func (c *TelegramChannel) handleMessage(message *telego.Message) {
|
||||
case <-ticker.C:
|
||||
i++
|
||||
text := fmt.Sprintf("Thinking%s %s", dots[i%len(dots)], emotes[i%len(emotes)])
|
||||
if _, err := c.bot.EditMessageText(context.Background(), &telego.EditMessageTextParams{
|
||||
editCtx, cancelEdit := context.WithTimeout(parentCtx, telegramAPICallTimeout)
|
||||
_, err := c.bot.EditMessageText(editCtx, &telego.EditMessageTextParams{
|
||||
ChatID: telegoutil.ID(cid),
|
||||
MessageID: mid,
|
||||
Text: text,
|
||||
}); err != nil {
|
||||
})
|
||||
cancelEdit()
|
||||
if err != nil {
|
||||
logger.DebugCF("telegram", "Telegram thinking animation edit failed", map[string]interface{}{
|
||||
logger.FieldChatID: cid,
|
||||
"message_id": mid,
|
||||
@@ -391,7 +450,7 @@ func (c *TelegramChannel) handleMessage(message *telego.Message) {
|
||||
}
|
||||
}
|
||||
}
|
||||
}(chatID, pID, stopChan)
|
||||
}(chatID, pID, stopChan, runCtx)
|
||||
} else {
|
||||
logger.WarnCF("telegram", "Telegram thinking placeholder create failed", map[string]interface{}{
|
||||
logger.FieldChatID: chatID,
|
||||
@@ -410,8 +469,10 @@ func (c *TelegramChannel) handleMessage(message *telego.Message) {
|
||||
c.HandleMessage(senderID, fmt.Sprintf("%d", chatID), content, mediaPaths, metadata)
|
||||
}
|
||||
|
||||
func (c *TelegramChannel) downloadFile(fileID, ext string) string {
|
||||
file, err := c.bot.GetFile(context.Background(), &telego.GetFileParams{FileID: fileID})
|
||||
func (c *TelegramChannel) downloadFile(runCtx context.Context, fileID, ext string) string {
|
||||
getFileCtx, cancelGetFile := context.WithTimeout(runCtx, telegramAPICallTimeout)
|
||||
file, err := c.bot.GetFile(getFileCtx, &telego.GetFileParams{FileID: fileID})
|
||||
cancelGetFile()
|
||||
if err != nil {
|
||||
logger.WarnCF("telegram", "Failed to get file", map[string]interface{}{
|
||||
logger.FieldError: err.Error(),
|
||||
@@ -439,7 +500,7 @@ func (c *TelegramChannel) downloadFile(fileID, ext string) string {
|
||||
|
||||
localPath := filepath.Join(mediaDir, fileID[:min(16, len(fileID))]+ext)
|
||||
|
||||
if err := c.downloadFromURL(url, localPath); err != nil {
|
||||
if err := c.downloadFromURL(runCtx, url, localPath); err != nil {
|
||||
logger.WarnCF("telegram", "Failed to download file", map[string]interface{}{
|
||||
logger.FieldError: err.Error(),
|
||||
})
|
||||
@@ -456,8 +517,17 @@ func min(a, b int) int {
|
||||
return b
|
||||
}
|
||||
|
||||
func (c *TelegramChannel) downloadFromURL(url, localPath string) error {
|
||||
resp, err := http.Get(url)
|
||||
func (c *TelegramChannel) downloadFromURL(runCtx context.Context, url, localPath string) error {
|
||||
downloadCtx, cancelDownload := context.WithTimeout(runCtx, telegramDownloadTimeout)
|
||||
defer cancelDownload()
|
||||
|
||||
req, err := http.NewRequestWithContext(downloadCtx, http.MethodGet, url, nil)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to create request: %w", err)
|
||||
}
|
||||
|
||||
client := &http.Client{Timeout: telegramDownloadTimeout}
|
||||
resp, err := client.Do(req)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to download: %w", err)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user