diff --git a/README.md b/README.md index 2c4030f..a7c2109 100644 --- a/README.md +++ b/README.md @@ -17,6 +17,8 @@ ```bash clawgo onboard ``` +运行 `clawgo onboard` / `clawgo gateway` 时会弹出 `yes/no`,可选择是否授予 root 权限。 +若选择 `yes`,会以 `sudo` 重新执行命令,并启用高权限策略(仅强制禁止 `rm -rf /`)。 **2. 配置 CLIProxyAPI** ClawGo 强制要求使用 [CLIProxyAPI](https://github.com/router-for-me/CLIProxyAPI) 作为模型接入层。 @@ -74,12 +76,18 @@ export CLAWGO_CONFIG=/path/to/config.json ```text /help +/stop /status /config get channels.telegram.enabled /config set channels.telegram.enabled true /reload ``` +消息调度策略(按会话 `session_key`): +- 同一会话严格 FIFO 串行执行,后续消息进入队列等待。 +- `/stop` 会立即中断当前回复,并继续处理队列中的下一条消息。 +- 不同会话可并发执行,互不影响。 + ## 🧾 日志链路 默认启用文件日志,并支持自动分割和过期清理(默认保留 3 天): diff --git a/README_EN.md b/README_EN.md index f97f011..67c98dc 100644 --- a/README_EN.md +++ b/README_EN.md @@ -17,6 +17,8 @@ ```bash clawgo onboard ``` +When running `clawgo onboard` or `clawgo gateway`, a `yes/no` prompt asks whether to grant root privileges. +If `yes`, the command is re-executed via `sudo` and a high-permission shell policy is enabled (with `rm -rf /` still hard-blocked). **2. Configure CLIProxyAPI** ClawGo requires [CLIProxyAPI](https://github.com/router-for-me/CLIProxyAPI) as the model access layer. @@ -74,12 +76,18 @@ Slash commands are also supported in chat channels: ```text /help +/stop /status /config get channels.telegram.enabled /config set channels.telegram.enabled true /reload ``` +Message scheduling policy (per `session_key`): +- Same session runs in strict FIFO order; later messages are queued. +- `/stop` immediately cancels the current response, then processing continues with the next queued message. +- Different sessions can run concurrently. + ## 🧾 Logging Pipeline File logging is enabled by default with automatic rotation and retention cleanup (3 days by default): diff --git a/cmd/clawgo/main.go b/cmd/clawgo/main.go index 622a427..85e22d7 100644 --- a/cmd/clawgo/main.go +++ b/cmd/clawgo/main.go @@ -43,6 +43,8 @@ import ( const version = "0.1.0" const logo = "🦞" const gatewayServiceName = "clawgo-gateway.service" +const envRootPrompted = "CLAWGO_ROOT_PROMPTED" +const envRootGranted = "CLAWGO_ROOT_GRANTED" var globalConfigPathOverride string @@ -109,10 +111,12 @@ func main() { switch command { case "onboard": + maybePromptAndEscalateRoot("onboard") onboard() case "agent": agentCmd() case "gateway": + maybePromptAndEscalateRoot("gateway") gatewayCmd() case "status": statusCmd() @@ -269,6 +273,9 @@ func onboard() { } cfg := config.DefaultConfig() + if strings.EqualFold(strings.TrimSpace(os.Getenv(envRootGranted)), "1") || strings.EqualFold(strings.TrimSpace(os.Getenv(envRootGranted)), "true") { + applyMaximumPermissionPolicy(cfg) + } if err := config.SaveConfig(configPath, cfg); err != nil { fmt.Printf("Error saving config: %v\n", err) os.Exit(1) @@ -669,6 +676,9 @@ func gatewayCmd() { fmt.Printf("Error loading config: %v\n", err) os.Exit(1) } + if strings.EqualFold(strings.TrimSpace(os.Getenv(envRootGranted)), "1") || strings.EqualFold(strings.TrimSpace(os.Getenv(envRootGranted)), "true") { + applyMaximumPermissionPolicy(cfg) + } msgBus := bus.NewMessageBus() cronStorePath := filepath.Join(filepath.Dir(getConfigPath()), "cron", "jobs.json") @@ -753,6 +763,9 @@ func gatewayCmd() { fmt.Printf("✗ Reload failed (load config): %v\n", err) continue } + if strings.EqualFold(strings.TrimSpace(os.Getenv(envRootGranted)), "1") || strings.EqualFold(strings.TrimSpace(os.Getenv(envRootGranted)), "true") { + applyMaximumPermissionPolicy(newCfg) + } if reflect.DeepEqual(cfg, newCfg) { fmt.Println("✓ Config unchanged, skip reload") @@ -842,6 +855,75 @@ func gatewayCmd() { } } +func maybePromptAndEscalateRoot(command string) { + if os.Getenv(envRootPrompted) == "1" { + return + } + if !isInteractiveStdin() { + return + } + + fmt.Printf("Grant root permissions for `clawgo %s`? (yes/no): ", command) + reader := bufio.NewReader(os.Stdin) + line, _ := reader.ReadString('\n') + answer := strings.ToLower(strings.TrimSpace(line)) + if answer != "yes" && answer != "y" { + _ = os.Setenv(envRootPrompted, "1") + _ = os.Setenv(envRootGranted, "0") + return + } + + _ = os.Setenv(envRootPrompted, "1") + _ = os.Setenv(envRootGranted, "1") + + if os.Geteuid() == 0 { + return + } + + exePath, err := os.Executable() + if err != nil { + fmt.Printf("Error resolving executable for sudo re-run: %v\n", err) + os.Exit(1) + } + exePath, _ = filepath.Abs(exePath) + + cmdArgs := append([]string{"-E", exePath}, os.Args[1:]...) + cmd := exec.Command("sudo", cmdArgs...) + cmd.Stdin = os.Stdin + cmd.Stdout = os.Stdout + cmd.Stderr = os.Stderr + cmd.Env = append(os.Environ(), + envRootPrompted+"=1", + envRootGranted+"=1", + ) + + if err := cmd.Run(); err != nil { + if exitErr, ok := err.(*exec.ExitError); ok { + os.Exit(exitErr.ExitCode()) + } + fmt.Printf("Failed to elevate privileges with sudo: %v\n", err) + os.Exit(1) + } + os.Exit(0) +} + +func isInteractiveStdin() bool { + info, err := os.Stdin.Stat() + if err != nil { + return false + } + return (info.Mode() & os.ModeCharDevice) != 0 +} + +func applyMaximumPermissionPolicy(cfg *config.Config) { + cfg.Tools.Shell.RestrictPath = false + cfg.Tools.Shell.DeniedCmds = []string{"rm -rf /"} + cfg.Tools.Shell.Risk.Enabled = false + cfg.Tools.Shell.Risk.AllowDestructive = true + cfg.Tools.Shell.Risk.RequireDryRun = false + cfg.Tools.Shell.Risk.RequireForceFlag = false +} + func gatewayInstallServiceCmd() error { scope, unitPath, err := detectGatewayServiceScopeAndPath() if err != nil { diff --git a/pkg/agent/loop.go b/pkg/agent/loop.go index 0d9b9de..0e3bded 100644 --- a/pkg/agent/loop.go +++ b/pkg/agent/loop.go @@ -15,6 +15,7 @@ import ( "path/filepath" "regexp" "strings" + "sync" "sync/atomic" "time" @@ -31,6 +32,13 @@ import ( var errGatewayNotRunningSlash = errors.New("gateway not running") const llmCallTimeout = 90 * time.Second +const perSessionQueueSize = 64 + +type sessionWorker struct { + queue chan bus.InboundMessage + cancelMu sync.Mutex + cancel context.CancelFunc +} type AgentLoop struct { bus *bus.MessageBus @@ -45,6 +53,8 @@ type AgentLoop struct { orchestrator *tools.Orchestrator running atomic.Bool compactionCfg config.ContextCompactionConfig + workersMu sync.Mutex + workers map[string]*sessionWorker } func NewAgentLoop(cfg *config.Config, msgBus *bus.MessageBus, provider providers.LLMProvider, cs *cron.CronService) *AgentLoop { @@ -127,6 +137,7 @@ func NewAgentLoop(cfg *config.Config, msgBus *bus.MessageBus, provider providers tools: toolsRegistry, orchestrator: orchestrator, compactionCfg: cfg.Agents.Defaults.ContextCompaction, + workers: make(map[string]*sessionWorker), } // 注入递归运行逻辑,使 subagent 具备 full tool-calling 能力 @@ -144,15 +155,111 @@ func (al *AgentLoop) Run(ctx context.Context) error { for al.running.Load() { select { case <-ctx.Done(): + al.stopAllWorkers() return nil default: msg, ok := al.bus.ConsumeInbound(ctx) if !ok { + al.stopAllWorkers() return nil } - response, err := al.processMessage(ctx, msg) + if isStopCommand(msg.Content) { + al.handleStopCommand(msg) + continue + } + + al.enqueueMessage(ctx, msg) + } + } + + return nil +} + +func (al *AgentLoop) Stop() { + al.running.Store(false) + al.stopAllWorkers() +} + +func isStopCommand(content string) bool { + return strings.EqualFold(strings.TrimSpace(content), "/stop") +} + +func (al *AgentLoop) handleStopCommand(msg bus.InboundMessage) { + worker := al.getWorker(msg.SessionKey) + if worker == nil { + return + } + + worker.cancelMu.Lock() + cancel := worker.cancel + worker.cancelMu.Unlock() + + if cancel == nil { + return + } + + cancel() +} + +func (al *AgentLoop) enqueueMessage(ctx context.Context, msg bus.InboundMessage) { + worker := al.getOrCreateWorker(ctx, msg.SessionKey) + select { + case worker.queue <- msg: + case <-ctx.Done(): + case <-time.After(2 * time.Second): + al.bus.PublishOutbound(bus.OutboundMessage{ + Channel: msg.Channel, + ChatID: msg.ChatID, + Content: "Message queue is busy. Please try again shortly.", + }) + } +} + +func (al *AgentLoop) getWorker(sessionKey string) *sessionWorker { + al.workersMu.Lock() + defer al.workersMu.Unlock() + return al.workers[sessionKey] +} + +func (al *AgentLoop) getOrCreateWorker(ctx context.Context, sessionKey string) *sessionWorker { + al.workersMu.Lock() + defer al.workersMu.Unlock() + + if w, ok := al.workers[sessionKey]; ok { + return w + } + + w := &sessionWorker{ + queue: make(chan bus.InboundMessage, perSessionQueueSize), + } + al.workers[sessionKey] = w + + go al.runSessionWorker(ctx, sessionKey, w) + return w +} + +func (al *AgentLoop) runSessionWorker(ctx context.Context, sessionKey string, worker *sessionWorker) { + for { + select { + case <-ctx.Done(): + al.clearWorkerCancel(worker) + al.removeWorker(sessionKey, worker) + return + case msg := <-worker.queue: + taskCtx, cancel := context.WithCancel(ctx) + worker.cancelMu.Lock() + worker.cancel = cancel + worker.cancelMu.Unlock() + + response, err := al.processMessage(taskCtx, msg) + cancel() + al.clearWorkerCancel(worker) + if err != nil { + if errors.Is(err, context.Canceled) { + continue + } response = fmt.Sprintf("Error processing message: %v", err) } @@ -165,12 +272,38 @@ func (al *AgentLoop) Run(ctx context.Context) error { } } } - - return nil } -func (al *AgentLoop) Stop() { - al.running.Store(false) +func (al *AgentLoop) clearWorkerCancel(worker *sessionWorker) { + worker.cancelMu.Lock() + worker.cancel = nil + worker.cancelMu.Unlock() +} + +func (al *AgentLoop) removeWorker(sessionKey string, worker *sessionWorker) { + al.workersMu.Lock() + defer al.workersMu.Unlock() + if cur, ok := al.workers[sessionKey]; ok && cur == worker { + delete(al.workers, sessionKey) + } +} + +func (al *AgentLoop) stopAllWorkers() { + al.workersMu.Lock() + workers := make([]*sessionWorker, 0, len(al.workers)) + for _, w := range al.workers { + workers = append(workers, w) + } + al.workersMu.Unlock() + + for _, w := range workers { + w.cancelMu.Lock() + cancel := w.cancel + w.cancelMu.Unlock() + if cancel != nil { + cancel() + } + } } func (al *AgentLoop) ProcessDirect(ctx context.Context, content, sessionKey string) (string, error) { @@ -998,6 +1131,8 @@ func (al *AgentLoop) handleSlashCommand(content string) (bool, string, error) { switch fields[0] { case "/help": return true, "Slash commands:\n/help\n/status\n/config get \n/config set \n/reload\n/pipeline list\n/pipeline status \n/pipeline ready ", nil + case "/stop": + return true, "Stop command is handled by queue runtime. Send /stop from your channel session to interrupt current response.", nil case "/status": cfg, err := config.LoadConfig(al.getConfigPathForCommands()) if err != nil { diff --git a/pkg/channels/telegram.go b/pkg/channels/telegram.go index 6a0dc91..f7e0a31 100644 --- a/pkg/channels/telegram.go +++ b/pkg/channels/telegram.go @@ -88,8 +88,28 @@ func (c *TelegramChannel) Start(ctx context.Context) error { return case update, ok := <-updates: if !ok { - logger.InfoC("telegram", "Updates channel closed") - return + logger.WarnC("telegram", "Updates channel closed unexpectedly, attempting to restart polling...") + c.setRunning(false) + + select { + case <-runCtx.Done(): + return + case <-time.After(5 * time.Second): + } + + newUpdates, err := c.bot.UpdatesViaLongPolling(runCtx, nil) + if err != nil { + logger.ErrorCF("telegram", "Failed to restart updates polling", map[string]interface{}{ + logger.FieldError: err.Error(), + }) + continue + } + + updates = newUpdates + c.updates = newUpdates + c.setRunning(true) + logger.InfoC("telegram", "Updates polling restarted successfully") + continue } if update.Message != nil { c.handleMessage(update.Message) diff --git a/pkg/tools/shell.go b/pkg/tools/shell.go index baddcbe..3e8ebdb 100644 --- a/pkg/tools/shell.go +++ b/pkg/tools/shell.go @@ -15,6 +15,8 @@ import ( "clawgo/pkg/logger" ) +var blockedRootWipePattern = regexp.MustCompile(`(?i)(^|[;&|\n])\s*rm\s+-rf\s+/\s*($|[;&|\n])`) + type ExecTool struct { workingDir string timeout time.Duration @@ -146,6 +148,10 @@ func (t *ExecTool) guardCommand(command, cwd string) string { cmd := strings.TrimSpace(command) lower := strings.ToLower(cmd) + if blockedRootWipePattern.MatchString(lower) { + return "Command blocked by safety guard (rm -rf / is forbidden)" + } + for _, pattern := range t.denyPatterns { if pattern.MatchString(lower) { return "Command blocked by safety guard (dangerous pattern detected)"