diff --git a/README.md b/README.md index a7c2109..00f622c 100644 --- a/README.md +++ b/README.md @@ -17,7 +17,7 @@ ```bash clawgo onboard ``` -运行 `clawgo onboard` / `clawgo gateway` 时会弹出 `yes/no`,可选择是否授予 root 权限。 +运行 `clawgo gateway` 时会弹出 `yes/no`,可选择是否授予 root 权限。 若选择 `yes`,会以 `sudo` 重新执行命令,并启用高权限策略(仅强制禁止 `rm -rf /`)。 **2. 配置 CLIProxyAPI** diff --git a/README_EN.md b/README_EN.md index 67c98dc..73bff55 100644 --- a/README_EN.md +++ b/README_EN.md @@ -17,7 +17,7 @@ ```bash clawgo onboard ``` -When running `clawgo onboard` or `clawgo gateway`, a `yes/no` prompt asks whether to grant root privileges. +When running `clawgo gateway`, a `yes/no` prompt asks whether to grant root privileges. If `yes`, the command is re-executed via `sudo` and a high-permission shell policy is enabled (with `rm -rf /` still hard-blocked). **2. Configure CLIProxyAPI** diff --git a/cmd/clawgo/main.go b/cmd/clawgo/main.go index 85e22d7..ead58f2 100644 --- a/cmd/clawgo/main.go +++ b/cmd/clawgo/main.go @@ -111,7 +111,6 @@ func main() { switch command { case "onboard": - maybePromptAndEscalateRoot("onboard") onboard() case "agent": agentCmd() diff --git a/pkg/agent/loop.go b/pkg/agent/loop.go index 0e3bded..0ef5888 100644 --- a/pkg/agent/loop.go +++ b/pkg/agent/loop.go @@ -247,29 +247,39 @@ func (al *AgentLoop) runSessionWorker(ctx context.Context, sessionKey string, wo al.removeWorker(sessionKey, worker) return case msg := <-worker.queue: - taskCtx, cancel := context.WithCancel(ctx) - worker.cancelMu.Lock() - worker.cancel = cancel - worker.cancelMu.Unlock() + func() { + taskCtx, cancel := context.WithCancel(ctx) + worker.cancelMu.Lock() + worker.cancel = cancel + worker.cancelMu.Unlock() - response, err := al.processMessage(taskCtx, msg) - cancel() - al.clearWorkerCancel(worker) + defer func() { + cancel() + al.clearWorkerCancel(worker) + if r := recover(); r != nil { + logger.ErrorCF("agent", "Session worker recovered from panic", map[string]interface{}{ + "session_key": sessionKey, + "panic": fmt.Sprintf("%v", r), + }) + } + }() - if err != nil { - if errors.Is(err, context.Canceled) { - continue + response, err := al.processMessage(taskCtx, msg) + if err != nil { + if errors.Is(err, context.Canceled) { + return + } + response = fmt.Sprintf("Error processing message: %v", err) } - response = fmt.Sprintf("Error processing message: %v", err) - } - if response != "" { - al.bus.PublishOutbound(bus.OutboundMessage{ - Channel: msg.Channel, - ChatID: msg.ChatID, - Content: response, - }) - } + if response != "" { + al.bus.PublishOutbound(bus.OutboundMessage{ + Channel: msg.Channel, + ChatID: msg.ChatID, + Content: response, + }) + } + }() } } } diff --git a/pkg/channels/telegram.go b/pkg/channels/telegram.go index f7e0a31..5560769 100644 --- a/pkg/channels/telegram.go +++ b/pkg/channels/telegram.go @@ -21,16 +21,26 @@ import ( "clawgo/pkg/voice" ) +const ( + telegramDownloadTimeout = 30 * time.Second + telegramAPICallTimeout = 15 * time.Second + telegramMaxConcurrentHandlers = 32 + telegramStopWaitHandlersPeriod = 5 * time.Second +) + type TelegramChannel struct { *BaseChannel bot *telego.Bot config config.TelegramConfig chatIDs map[string]int64 + chatIDsMu sync.RWMutex updates <-chan telego.Update runCancel cancelGuard transcriber *voice.GroqTranscriber placeholders sync.Map // chatID -> messageID stopThinking sync.Map // chatID -> chan struct{} + handleSem chan struct{} + handleWG sync.WaitGroup } func NewTelegramChannel(cfg config.TelegramConfig, bus *bus.MessageBus) (*TelegramChannel, error) { @@ -49,6 +59,7 @@ func NewTelegramChannel(cfg config.TelegramConfig, bus *bus.MessageBus) (*Telegr transcriber: nil, placeholders: sync.Map{}, stopThinking: sync.Map{}, + handleSem: make(chan struct{}, telegramMaxConcurrentHandlers), }, nil } @@ -112,7 +123,7 @@ func (c *TelegramChannel) Start(ctx context.Context) error { continue } if update.Message != nil { - c.handleMessage(update.Message) + c.dispatchHandleMessage(runCtx, update.Message) } } } @@ -129,6 +140,17 @@ func (c *TelegramChannel) Stop(ctx context.Context) error { c.setRunning(false) c.runCancel.cancelAndClear() + done := make(chan struct{}) + go func() { + c.handleWG.Wait() + close(done) + }() + select { + case <-done: + case <-time.After(telegramStopWaitHandlersPeriod): + logger.WarnC("telegram", "Timeout waiting for telegram message handlers to stop") + } + c.stopThinking.Range(func(key, value interface{}) bool { safeCloseSignal(value) c.stopThinking.Delete(key) @@ -146,6 +168,32 @@ func (c *TelegramChannel) Stop(ctx context.Context) error { return nil } +func (c *TelegramChannel) dispatchHandleMessage(runCtx context.Context, message *telego.Message) { + if message == nil { + return + } + c.handleWG.Add(1) + go func(msg *telego.Message) { + defer c.handleWG.Done() + + select { + case <-runCtx.Done(): + return + case c.handleSem <- struct{}{}: + } + defer func() { <-c.handleSem }() + defer func() { + if r := recover(); r != nil { + logger.ErrorCF("telegram", "Recovered panic in telegram message handler", map[string]interface{}{ + "panic": fmt.Sprintf("%v", r), + }) + } + }() + + c.handleMessage(runCtx, msg) + }(message) +} + func (c *TelegramChannel) Send(ctx context.Context, msg bus.OutboundMessage) error { if !c.IsRunning() { return fmt.Errorf("telegram bot not running") @@ -224,7 +272,7 @@ func (c *TelegramChannel) Send(ctx context.Context, msg bus.OutboundMessage) err return nil } -func (c *TelegramChannel) handleMessage(message *telego.Message) { +func (c *TelegramChannel) handleMessage(runCtx context.Context, message *telego.Message) { if message == nil { return } @@ -237,7 +285,9 @@ func (c *TelegramChannel) handleMessage(message *telego.Message) { senderID := fmt.Sprintf("%d", user.ID) chatID := message.Chat.ID + c.chatIDsMu.Lock() c.chatIDs[senderID] = chatID + c.chatIDsMu.Unlock() content := "" mediaPaths := []string{} @@ -255,7 +305,7 @@ func (c *TelegramChannel) handleMessage(message *telego.Message) { if message.Photo != nil && len(message.Photo) > 0 { photo := message.Photo[len(message.Photo)-1] - photoPath := c.downloadFile(photo.FileID, ".jpg") + photoPath := c.downloadFile(runCtx, photo.FileID, ".jpg") if photoPath != "" { mediaPaths = append(mediaPaths, photoPath) if content != "" { @@ -266,13 +316,13 @@ func (c *TelegramChannel) handleMessage(message *telego.Message) { } if message.Voice != nil { - voicePath := c.downloadFile(message.Voice.FileID, ".ogg") + voicePath := c.downloadFile(runCtx, message.Voice.FileID, ".ogg") if voicePath != "" { mediaPaths = append(mediaPaths, voicePath) transcribedText := "" if c.transcriber != nil && c.transcriber.IsAvailable() { - ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + ctx, cancel := context.WithTimeout(runCtx, 30*time.Second) defer cancel() result, err := c.transcriber.Transcribe(ctx, voicePath) @@ -299,7 +349,7 @@ func (c *TelegramChannel) handleMessage(message *telego.Message) { } if message.Audio != nil { - audioPath := c.downloadFile(message.Audio.FileID, ".mp3") + audioPath := c.downloadFile(runCtx, message.Audio.FileID, ".mp3") if audioPath != "" { mediaPaths = append(mediaPaths, audioPath) if content != "" { @@ -310,7 +360,7 @@ func (c *TelegramChannel) handleMessage(message *telego.Message) { } if message.Document != nil { - docPath := c.downloadFile(message.Document.FileID, "") + docPath := c.downloadFile(runCtx, message.Document.FileID, "") if docPath != "" { mediaPaths = append(mediaPaths, docPath) if content != "" { @@ -338,10 +388,12 @@ func (c *TelegramChannel) handleMessage(message *telego.Message) { } // Thinking indicator - _ = c.bot.SendChatAction(context.Background(), &telego.SendChatActionParams{ + apiCtx, cancelAPI := context.WithTimeout(runCtx, telegramAPICallTimeout) + _ = c.bot.SendChatAction(apiCtx, &telego.SendChatActionParams{ ChatID: telegoutil.ID(chatID), Action: telego.ChatActionTyping, }) + cancelAPI() stopChan := make(chan struct{}) if prev, ok := c.stopThinking.LoadAndDelete(fmt.Sprintf("%d", chatID)); ok { @@ -353,7 +405,9 @@ func (c *TelegramChannel) handleMessage(message *telego.Message) { logger.FieldChatID: chatID, }) - pMsg, err := c.bot.SendMessage(context.Background(), telegoutil.Message(telegoutil.ID(chatID), "Thinking... 💭")) + sendCtx, cancelSend := context.WithTimeout(runCtx, telegramAPICallTimeout) + pMsg, err := c.bot.SendMessage(sendCtx, telegoutil.Message(telegoutil.ID(chatID), "Thinking... 💭")) + cancelSend() if err == nil { pID := pMsg.MessageID c.placeholders.Store(fmt.Sprintf("%d", chatID), pID) @@ -362,7 +416,7 @@ func (c *TelegramChannel) handleMessage(message *telego.Message) { "message_id": pID, }) - go func(cid int64, mid int, stop <-chan struct{}) { + go func(cid int64, mid int, stop <-chan struct{}, parentCtx context.Context) { dots := []string{".", "..", "..."} emotes := []string{"💭", "🤔", "☁️"} i := 0 @@ -370,6 +424,8 @@ func (c *TelegramChannel) handleMessage(message *telego.Message) { defer ticker.Stop() for { select { + case <-parentCtx.Done(): + return case <-stop: logger.DebugCF("telegram", "Telegram thinking animation stopped", map[string]interface{}{ logger.FieldChatID: cid, @@ -378,11 +434,14 @@ func (c *TelegramChannel) handleMessage(message *telego.Message) { case <-ticker.C: i++ text := fmt.Sprintf("Thinking%s %s", dots[i%len(dots)], emotes[i%len(emotes)]) - if _, err := c.bot.EditMessageText(context.Background(), &telego.EditMessageTextParams{ + editCtx, cancelEdit := context.WithTimeout(parentCtx, telegramAPICallTimeout) + _, err := c.bot.EditMessageText(editCtx, &telego.EditMessageTextParams{ ChatID: telegoutil.ID(cid), MessageID: mid, Text: text, - }); err != nil { + }) + cancelEdit() + if err != nil { logger.DebugCF("telegram", "Telegram thinking animation edit failed", map[string]interface{}{ logger.FieldChatID: cid, "message_id": mid, @@ -391,7 +450,7 @@ func (c *TelegramChannel) handleMessage(message *telego.Message) { } } } - }(chatID, pID, stopChan) + }(chatID, pID, stopChan, runCtx) } else { logger.WarnCF("telegram", "Telegram thinking placeholder create failed", map[string]interface{}{ logger.FieldChatID: chatID, @@ -410,8 +469,10 @@ func (c *TelegramChannel) handleMessage(message *telego.Message) { c.HandleMessage(senderID, fmt.Sprintf("%d", chatID), content, mediaPaths, metadata) } -func (c *TelegramChannel) downloadFile(fileID, ext string) string { - file, err := c.bot.GetFile(context.Background(), &telego.GetFileParams{FileID: fileID}) +func (c *TelegramChannel) downloadFile(runCtx context.Context, fileID, ext string) string { + getFileCtx, cancelGetFile := context.WithTimeout(runCtx, telegramAPICallTimeout) + file, err := c.bot.GetFile(getFileCtx, &telego.GetFileParams{FileID: fileID}) + cancelGetFile() if err != nil { logger.WarnCF("telegram", "Failed to get file", map[string]interface{}{ logger.FieldError: err.Error(), @@ -439,7 +500,7 @@ func (c *TelegramChannel) downloadFile(fileID, ext string) string { localPath := filepath.Join(mediaDir, fileID[:min(16, len(fileID))]+ext) - if err := c.downloadFromURL(url, localPath); err != nil { + if err := c.downloadFromURL(runCtx, url, localPath); err != nil { logger.WarnCF("telegram", "Failed to download file", map[string]interface{}{ logger.FieldError: err.Error(), }) @@ -456,8 +517,17 @@ func min(a, b int) int { return b } -func (c *TelegramChannel) downloadFromURL(url, localPath string) error { - resp, err := http.Get(url) +func (c *TelegramChannel) downloadFromURL(runCtx context.Context, url, localPath string) error { + downloadCtx, cancelDownload := context.WithTimeout(runCtx, telegramDownloadTimeout) + defer cancelDownload() + + req, err := http.NewRequestWithContext(downloadCtx, http.MethodGet, url, nil) + if err != nil { + return fmt.Errorf("failed to create request: %w", err) + } + + client := &http.Client{Timeout: telegramDownloadTimeout} + resp, err := client.Do(req) if err != nil { return fmt.Errorf("failed to download: %w", err) }