From 085c265319a263a439bcc9cd11b7059d6511afee Mon Sep 17 00:00:00 2001 From: lpf Date: Fri, 13 Feb 2026 13:50:09 +0800 Subject: [PATCH] fix --- README.md | 46 +++ README_EN.md | 46 +++ cmd/clawgo/main.go | 693 +++++++++++++++++++++++++++++++----- config.example.json | 38 +- pkg/agent/loop.go | 376 +++++++++++++++++-- pkg/browser/browser.go | 129 +++++++ pkg/channels/manager.go | 13 +- pkg/config/config.go | 28 ++ pkg/config/validate.go | 79 ++++ pkg/logger/logger.go | 118 +++++- pkg/session/manager.go | 29 +- pkg/tools/browser.go | 46 +-- pkg/tools/parallel.go | 8 +- pkg/tools/parallel_fetch.go | 6 + pkg/tools/web.go | 9 +- 15 files changed, 1485 insertions(+), 179 deletions(-) create mode 100644 pkg/config/validate.go diff --git a/README.md b/README.md index 7da3937..162daf6 100644 --- a/README.md +++ b/README.md @@ -33,6 +33,52 @@ clawgo agent clawgo gateway ``` +## ⚙️ 配置管理与热更新 + +ClawGo 支持直接通过命令修改 `config.json`,并向运行中的网关发送热更新信号: + +```bash +# 设置配置(支持 enable -> enabled 自动映射) +clawgo config set channels.telegram.enable true + +# 读取配置 +clawgo config get channels.telegram.enabled + +# 校验配置 +clawgo config check + +# 手动触发热更新(向 gateway 发送 SIGHUP) +clawgo config reload +``` + +全局支持自定义配置文件: + +```bash +clawgo --config /path/to/config.json status +``` + +也可使用环境变量: + +```bash +export CLAWGO_CONFIG=/path/to/config.json +``` + +`config set` 采用原子写入,并在网关运行且热更新失败时自动回滚到备份,避免配置损坏导致服务不可用。 + +## 🧾 日志链路 + +默认启用文件日志,并支持自动分割和过期清理(默认保留 3 天): + +```json +"logging": { + "enabled": true, + "dir": "~/.clawgo/logs", + "filename": "clawgo.log", + "max_size_mb": 20, + "retention_days": 3 +} +``` + ## 📦 迁移与技能 ClawGo 现在集成了原 OpenClaw 的所有核心扩展能力: diff --git a/README_EN.md b/README_EN.md index d3191ae..be40654 100644 --- a/README_EN.md +++ b/README_EN.md @@ -31,6 +31,52 @@ clawgo agent clawgo gateway ``` +## ⚙️ Config Management & Hot Reload + +ClawGo can update `config.json` from CLI and trigger hot reload for a running gateway: + +```bash +# Set config value (supports enable -> enabled alias) +clawgo config set channels.telegram.enable true + +# Read config value +clawgo config get channels.telegram.enabled + +# Validate config +clawgo config check + +# Trigger hot reload manually (sends SIGHUP to gateway) +clawgo config reload +``` + +Global custom config path: + +```bash +clawgo --config /path/to/config.json status +``` + +Or via environment variable: + +```bash +export CLAWGO_CONFIG=/path/to/config.json +``` + +`config set` now uses atomic write, and if gateway is running but hot reload fails, it rolls back to backup automatically. + +## 🧾 Logging Pipeline + +File logging is enabled by default with automatic rotation and retention cleanup (3 days by default): + +```json +"logging": { + "enabled": true, + "dir": "~/.clawgo/logs", + "filename": "clawgo.log", + "max_size_mb": 20, + "retention_days": 3 +} +``` + ## 📦 Migration & Skills ClawGo now integrates all core extended capabilities from the original OpenClaw: diff --git a/cmd/clawgo/main.go b/cmd/clawgo/main.go index a107c0d..5ad3f5d 100644 --- a/cmd/clawgo/main.go +++ b/cmd/clawgo/main.go @@ -9,13 +9,18 @@ package main import ( "bufio" "context" + "encoding/json" + "errors" "fmt" "io" "os" "os/signal" "path/filepath" + "reflect" + "strconv" "strings" + "syscall" "time" "clawgo/pkg/agent" @@ -35,6 +40,10 @@ import ( const version = "0.1.0" const logo = "🦞" +var globalConfigPathOverride string + +var errGatewayNotRunning = errors.New("gateway not running") + func copyDirectory(src, dst string) error { return filepath.Walk(src, func(path string, info os.FileInfo, err error) error { if err != nil { @@ -70,6 +79,8 @@ func copyDirectory(src, dst string) error { } func main() { + globalConfigPathOverride = detectConfigPathFromArgs(os.Args) + // Detect debug mode early for _, arg := range os.Args { if arg == "--debug" || arg == "-d" { @@ -79,6 +90,9 @@ func main() { } } + // Normalize global flags so command can appear after --config/--debug. + os.Args = normalizeCLIArgs(os.Args) + if len(os.Args) < 2 { printHelp() os.Exit(1) @@ -98,6 +112,8 @@ func main() { gatewayCmd() case "status": statusCmd() + case "config": + configCmd() case "cron": cronCmd() case "login": @@ -162,20 +178,63 @@ func main() { } } +func normalizeCLIArgs(args []string) []string { + if len(args) == 0 { + return args + } + + normalized := []string{args[0]} + for i := 1; i < len(args); i++ { + arg := args[i] + if arg == "--debug" || arg == "-d" { + continue + } + if arg == "--config" { + if i+1 < len(args) { + i++ + } + continue + } + if strings.HasPrefix(arg, "--config=") { + continue + } + normalized = append(normalized, arg) + } + return normalized +} + +func detectConfigPathFromArgs(args []string) string { + for i := 0; i < len(args); i++ { + arg := args[i] + if arg == "--config" && i+1 < len(args) { + return strings.TrimSpace(args[i+1]) + } + if strings.HasPrefix(arg, "--config=") { + return strings.TrimSpace(strings.TrimPrefix(arg, "--config=")) + } + } + return "" +} + func printHelp() { fmt.Printf("%s clawgo - Personal AI Assistant v%s\n\n", logo, version) - fmt.Println("Usage: clawgo ") + fmt.Println("Usage: clawgo [options]") fmt.Println() fmt.Println("Commands:") fmt.Println(" onboard Initialize clawgo configuration and workspace") fmt.Println(" agent Interact with the agent directly") fmt.Println(" gateway Start clawgo gateway") fmt.Println(" status Show clawgo status") + fmt.Println(" config Get/set config values") fmt.Println(" cron Manage scheduled tasks") fmt.Println(" login Configure CLIProxyAPI upstream") fmt.Println(" channel Test and manage messaging channels") fmt.Println(" skills Manage skills (install, list, remove)") fmt.Println(" version Show version information") + fmt.Println() + fmt.Println("Global options:") + fmt.Println(" --config Use custom config file") + fmt.Println(" --debug, -d Enable debug logging") } func onboard() { @@ -199,11 +258,23 @@ func onboard() { } workspace := cfg.WorkspacePath() - os.MkdirAll(workspace, 0755) - os.MkdirAll(filepath.Join(workspace, "memory"), 0755) - os.MkdirAll(filepath.Join(workspace, "skills"), 0755) + if err := os.MkdirAll(workspace, 0755); err != nil { + fmt.Printf("Error creating workspace: %v\n", err) + os.Exit(1) + } + if err := os.MkdirAll(filepath.Join(workspace, "memory"), 0755); err != nil { + fmt.Printf("Error creating memory directory: %v\n", err) + os.Exit(1) + } + if err := os.MkdirAll(filepath.Join(workspace, "skills"), 0755); err != nil { + fmt.Printf("Error creating skills directory: %v\n", err) + os.Exit(1) + } - createWorkspaceTemplates(workspace) + if err := createWorkspaceTemplates(workspace); err != nil { + fmt.Printf("Error creating workspace templates: %v\n", err) + os.Exit(1) + } fmt.Printf("%s clawgo is ready!\n", logo) fmt.Println("\nNext steps:") @@ -212,7 +283,7 @@ func onboard() { fmt.Println(" 2. Chat: clawgo agent -m \"Hello!\"") } -func createWorkspaceTemplates(workspace string) { +func createWorkspaceTemplates(workspace string) error { templates := map[string]string{ "AGENTS.md": `# Agent Instructions @@ -329,13 +400,17 @@ Discussions: https://github.com/sipeed/clawgo/discussions for filename, content := range templates { filePath := filepath.Join(workspace, filename) if _, err := os.Stat(filePath); os.IsNotExist(err) { - os.WriteFile(filePath, []byte(content), 0644) + if err := os.WriteFile(filePath, []byte(content), 0644); err != nil { + return fmt.Errorf("failed to write %s: %w", filename, err) + } fmt.Printf(" Created %s\n", filename) } } memoryDir := filepath.Join(workspace, "memory") - os.MkdirAll(memoryDir, 0755) + if err := os.MkdirAll(memoryDir, 0755); err != nil { + return fmt.Errorf("failed to create memory directory: %w", err) + } memoryFile := filepath.Join(memoryDir, "MEMORY.md") if _, err := os.Stat(memoryFile); os.IsNotExist(err) { memoryContent := `# Long-term Memory @@ -360,23 +435,20 @@ This file stores important information that should persist across sessions. - Channel settings - Skills enabled ` - os.WriteFile(memoryFile, []byte(memoryContent), 0644) + if err := os.WriteFile(memoryFile, []byte(memoryContent), 0644); err != nil { + return fmt.Errorf("failed to write memory file: %w", err) + } fmt.Println(" Created memory/MEMORY.md") skillsDir := filepath.Join(workspace, "skills") if _, err := os.Stat(skillsDir); os.IsNotExist(err) { - os.MkdirAll(skillsDir, 0755) + if err := os.MkdirAll(skillsDir, 0755); err != nil { + return fmt.Errorf("failed to create skills directory: %w", err) + } fmt.Println(" Created skills/") } } - - for filename, content := range templates { - filePath := filepath.Join(workspace, filename) - if _, err := os.Stat(filePath); os.IsNotExist(err) { - os.WriteFile(filePath, []byte(content), 0644) - fmt.Printf(" Created %s\n", filename) - } - } + return nil } func agentCmd() { @@ -548,39 +620,9 @@ func gatewayCmd() { os.Exit(1) } - provider, err := providers.CreateProvider(cfg) - if err != nil { - fmt.Printf("Error creating provider: %v\n", err) - os.Exit(1) - } - msgBus := bus.NewMessageBus() - cronStorePath := filepath.Join(filepath.Dir(getConfigPath()), "cron", "jobs.json") cronService := cron.NewCronService(cronStorePath, nil) - - agentLoop := agent.NewAgentLoop(cfg, msgBus, provider, cronService) - - // Print agent startup info - fmt.Println("\n📦 Agent Status:") - startupInfo := agentLoop.GetStartupInfo() - toolsInfo := startupInfo["tools"].(map[string]interface{}) - skillsInfo := startupInfo["skills"].(map[string]interface{}) - fmt.Printf(" • Tools: %d loaded\n", toolsInfo["count"]) - fmt.Printf(" • Skills: %d/%d available\n", - skillsInfo["available"], - skillsInfo["total"]) - - // Log to file as well - logger.InfoCF("agent", "Agent initialized", - map[string]interface{}{ - "tools_count": toolsInfo["count"], - "skills_total": skillsInfo["total"], - "skills_available": skillsInfo["available"], - }) - - // Cron service initialized earlier - heartbeatService := heartbeat.NewHeartbeatService( cfg.WorkspacePath(), nil, @@ -588,10 +630,137 @@ func gatewayCmd() { true, ) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + agentLoop, channelManager, err := buildGatewayRuntime(ctx, cfg, msgBus, cronService) + if err != nil { + fmt.Printf("Error initializing gateway runtime: %v\n", err) + os.Exit(1) + } + + pidFile := filepath.Join(filepath.Dir(getConfigPath()), "gateway.pid") + if err := os.WriteFile(pidFile, []byte(fmt.Sprintf("%d\n", os.Getpid())), 0644); err != nil { + fmt.Printf("Warning: failed to write PID file: %v\n", err) + } else { + defer os.Remove(pidFile) + } + + enabledChannels := channelManager.GetEnabledChannels() + if len(enabledChannels) > 0 { + fmt.Printf("✓ Channels enabled: %s\n", enabledChannels) + } else { + fmt.Println("⚠ Warning: No channels enabled") + } + + fmt.Printf("✓ Gateway started on %s:%d\n", cfg.Gateway.Host, cfg.Gateway.Port) + fmt.Println("Press Ctrl+C to stop. Send SIGHUP to hot-reload config.") + + if err := cronService.Start(); err != nil { + fmt.Printf("Error starting cron service: %v\n", err) + } + fmt.Println("✓ Cron service started") + + if err := heartbeatService.Start(); err != nil { + fmt.Printf("Error starting heartbeat service: %v\n", err) + } + fmt.Println("✓ Heartbeat service started") + + if err := channelManager.StartAll(ctx); err != nil { + fmt.Printf("Error starting channels: %v\n", err) + } + + go agentLoop.Run(ctx) + + sigChan := make(chan os.Signal, 1) + signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM, syscall.SIGHUP) + for { + sig := <-sigChan + switch sig { + case syscall.SIGHUP: + fmt.Println("\n↻ Reloading config...") + newCfg, err := config.LoadConfig(getConfigPath()) + if err != nil { + fmt.Printf("✗ Reload failed (load config): %v\n", err) + continue + } + + if reflect.DeepEqual(cfg, newCfg) { + fmt.Println("✓ Config unchanged, skip reload") + continue + } + + runtimeSame := reflect.DeepEqual(cfg.Agents, newCfg.Agents) && + reflect.DeepEqual(cfg.Providers, newCfg.Providers) && + reflect.DeepEqual(cfg.Tools, newCfg.Tools) && + reflect.DeepEqual(cfg.Channels, newCfg.Channels) + + if runtimeSame { + configureLogging(newCfg) + cfg = newCfg + fmt.Println("✓ Config hot-reload applied (logging/metadata only)") + continue + } + + newAgentLoop, newChannelManager, err := buildGatewayRuntime(ctx, newCfg, msgBus, cronService) + if err != nil { + fmt.Printf("✗ Reload failed (init runtime): %v\n", err) + continue + } + + channelManager.StopAll(ctx) + agentLoop.Stop() + + channelManager = newChannelManager + agentLoop = newAgentLoop + cfg = newCfg + + if err := channelManager.StartAll(ctx); err != nil { + fmt.Printf("✗ Reload failed (start channels): %v\n", err) + continue + } + go agentLoop.Run(ctx) + fmt.Println("✓ Config hot-reload applied") + default: + fmt.Println("\nShutting down...") + cancel() + heartbeatService.Stop() + cronService.Stop() + agentLoop.Stop() + channelManager.StopAll(ctx) + fmt.Println("✓ Gateway stopped") + return + } + } +} + +func buildGatewayRuntime(ctx context.Context, cfg *config.Config, msgBus *bus.MessageBus, cronService *cron.CronService) (*agent.AgentLoop, *channels.Manager, error) { + provider, err := providers.CreateProvider(cfg) + if err != nil { + return nil, nil, fmt.Errorf("create provider: %w", err) + } + + agentLoop := agent.NewAgentLoop(cfg, msgBus, provider, cronService) + + startupInfo := agentLoop.GetStartupInfo() + toolsInfo := startupInfo["tools"].(map[string]interface{}) + skillsInfo := startupInfo["skills"].(map[string]interface{}) + fmt.Println("\n📦 Agent Status:") + fmt.Printf(" • Tools: %d loaded\n", toolsInfo["count"]) + fmt.Printf(" • Skills: %d/%d available\n", + skillsInfo["available"], + skillsInfo["total"]) + + logger.InfoCF("agent", "Agent initialized", + map[string]interface{}{ + "tools_count": toolsInfo["count"], + "skills_total": skillsInfo["total"], + "skills_available": skillsInfo["available"], + }) + channelManager, err := channels.NewManager(cfg, msgBus) if err != nil { - fmt.Printf("Error creating channel manager: %v\n", err) - os.Exit(1) + return nil, nil, fmt.Errorf("create channel manager: %w", err) } var transcriber *voice.GroqTranscriber @@ -615,46 +784,343 @@ func gatewayCmd() { } } - enabledChannels := channelManager.GetEnabledChannels() - if len(enabledChannels) > 0 { - fmt.Printf("✓ Channels enabled: %s\n", enabledChannels) + return agentLoop, channelManager, nil +} + +func configCmd() { + if len(os.Args) < 3 { + configHelp() + return + } + + switch os.Args[2] { + case "set": + configSetCmd() + case "get": + configGetCmd() + case "check": + configCheckCmd() + case "reload": + configReloadCmd() + default: + fmt.Printf("Unknown config command: %s\n", os.Args[2]) + configHelp() + } +} + +func configHelp() { + fmt.Println("\nConfig commands:") + fmt.Println(" set Set config value and trigger hot reload") + fmt.Println(" get Get config value") + fmt.Println(" check Validate current config") + fmt.Println(" reload Trigger gateway hot reload") + fmt.Println() + fmt.Println("Examples:") + fmt.Println(" clawgo config set channels.telegram.enabled true") + fmt.Println(" clawgo config set channels.telegram.enable true") + fmt.Println(" clawgo config get providers.proxy.api_base") + fmt.Println(" clawgo config check") + fmt.Println(" clawgo config reload") +} + +func configSetCmd() { + if len(os.Args) < 5 { + fmt.Println("Usage: clawgo config set ") + return + } + + configPath := getConfigPath() + cfgMap, err := loadConfigAsMap(configPath) + if err != nil { + fmt.Printf("Error loading config: %v\n", err) + return + } + + path := normalizeConfigPath(os.Args[3]) + args := os.Args[4:] + valueParts := make([]string, 0, len(args)) + for i := 0; i < len(args); i++ { + part := args[i] + if part == "--debug" || part == "-d" { + continue + } + if part == "--config" { + i++ + continue + } + if strings.HasPrefix(part, "--config=") { + continue + } + valueParts = append(valueParts, part) + } + if len(valueParts) == 0 { + fmt.Println("Error: value is required") + return + } + value := parseConfigValue(strings.Join(valueParts, " ")) + if err := setMapValueByPath(cfgMap, path, value); err != nil { + fmt.Printf("Error setting value: %v\n", err) + return + } + + data, err := json.MarshalIndent(cfgMap, "", " ") + if err != nil { + fmt.Printf("Error serializing config: %v\n", err) + return + } + backupPath, err := writeConfigAtomicWithBackup(configPath, data) + if err != nil { + fmt.Printf("Error writing config: %v\n", err) + return + } + + fmt.Printf("✓ Updated %s = %v\n", path, value) + running, err := triggerGatewayReload() + if err != nil { + if running { + if rbErr := rollbackConfigFromBackup(configPath, backupPath); rbErr != nil { + fmt.Printf("Hot reload failed and rollback failed: %v\n", rbErr) + } else { + fmt.Printf("Hot reload failed, config rolled back: %v\n", err) + } + return + } + fmt.Printf("Updated config file. Hot reload not applied: %v\n", err) } else { - fmt.Println("⚠ Warning: No channels enabled") + fmt.Println("✓ Gateway hot reload signal sent") + } +} + +func configGetCmd() { + if len(os.Args) < 4 { + fmt.Println("Usage: clawgo config get ") + return } - fmt.Printf("✓ Gateway started on %s:%d\n", cfg.Gateway.Host, cfg.Gateway.Port) - fmt.Println("Press Ctrl+C to stop") - - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - if err := cronService.Start(); err != nil { - fmt.Printf("Error starting cron service: %v\n", err) - } - fmt.Println("✓ Cron service started") - - if err := heartbeatService.Start(); err != nil { - fmt.Printf("Error starting heartbeat service: %v\n", err) - } - fmt.Println("✓ Heartbeat service started") - - if err := channelManager.StartAll(ctx); err != nil { - fmt.Printf("Error starting channels: %v\n", err) + configPath := getConfigPath() + cfgMap, err := loadConfigAsMap(configPath) + if err != nil { + fmt.Printf("Error loading config: %v\n", err) + return } - go agentLoop.Run(ctx) + path := normalizeConfigPath(os.Args[3]) + value, ok := getMapValueByPath(cfgMap, path) + if !ok { + fmt.Printf("Path not found: %s\n", path) + return + } - sigChan := make(chan os.Signal, 1) - signal.Notify(sigChan, os.Interrupt) - <-sigChan + data, err := json.Marshal(value) + if err != nil { + fmt.Printf("%v\n", value) + return + } + fmt.Println(string(data)) +} - fmt.Println("\nShutting down...") - cancel() - heartbeatService.Stop() - cronService.Stop() - agentLoop.Stop() - channelManager.StopAll(ctx) - fmt.Println("✓ Gateway stopped") +func configReloadCmd() { + if _, err := triggerGatewayReload(); err != nil { + fmt.Printf("Hot reload not applied: %v\n", err) + return + } + fmt.Println("✓ Gateway hot reload signal sent") +} + +func configCheckCmd() { + cfg, err := config.LoadConfig(getConfigPath()) + if err != nil { + fmt.Printf("Config load failed: %v\n", err) + return + } + validationErrors := config.Validate(cfg) + if len(validationErrors) == 0 { + fmt.Println("✓ Config validation passed") + return + } + + fmt.Println("✗ Config validation failed:") + for _, ve := range validationErrors { + fmt.Printf(" - %v\n", ve) + } +} + +func loadConfigAsMap(path string) (map[string]interface{}, error) { + data, err := os.ReadFile(path) + if err != nil { + if os.IsNotExist(err) { + defaultCfg := config.DefaultConfig() + defData, mErr := json.Marshal(defaultCfg) + if mErr != nil { + return nil, mErr + } + var cfgMap map[string]interface{} + if uErr := json.Unmarshal(defData, &cfgMap); uErr != nil { + return nil, uErr + } + return cfgMap, nil + } + return nil, err + } + + var cfgMap map[string]interface{} + if err := json.Unmarshal(data, &cfgMap); err != nil { + return nil, err + } + return cfgMap, nil +} + +func normalizeConfigPath(path string) string { + p := strings.TrimSpace(path) + p = strings.Trim(p, ".") + parts := strings.Split(p, ".") + for i, part := range parts { + if part == "enable" { + parts[i] = "enabled" + } + } + return strings.Join(parts, ".") +} + +func parseConfigValue(raw string) interface{} { + v := strings.TrimSpace(raw) + lv := strings.ToLower(v) + if lv == "true" { + return true + } + if lv == "false" { + return false + } + if lv == "null" { + return nil + } + if i, err := strconv.ParseInt(v, 10, 64); err == nil { + return i + } + if f, err := strconv.ParseFloat(v, 64); err == nil && strings.Contains(v, ".") { + return f + } + if len(v) >= 2 && ((v[0] == '"' && v[len(v)-1] == '"') || (v[0] == '\'' && v[len(v)-1] == '\'')) { + return v[1 : len(v)-1] + } + return v +} + +func setMapValueByPath(root map[string]interface{}, path string, value interface{}) error { + if path == "" { + return fmt.Errorf("path is empty") + } + parts := strings.Split(path, ".") + cur := root + for i := 0; i < len(parts)-1; i++ { + key := parts[i] + if key == "" { + return fmt.Errorf("invalid path: %s", path) + } + next, ok := cur[key] + if !ok { + child := map[string]interface{}{} + cur[key] = child + cur = child + continue + } + child, ok := next.(map[string]interface{}) + if !ok { + return fmt.Errorf("path segment is not object: %s", key) + } + cur = child + } + last := parts[len(parts)-1] + if last == "" { + return fmt.Errorf("invalid path: %s", path) + } + cur[last] = value + return nil +} + +func getMapValueByPath(root map[string]interface{}, path string) (interface{}, bool) { + if path == "" { + return nil, false + } + parts := strings.Split(path, ".") + var cur interface{} = root + for _, key := range parts { + obj, ok := cur.(map[string]interface{}) + if !ok { + return nil, false + } + next, ok := obj[key] + if !ok { + return nil, false + } + cur = next + } + return cur, true +} + +func writeConfigAtomicWithBackup(configPath string, data []byte) (string, error) { + if err := os.MkdirAll(filepath.Dir(configPath), 0755); err != nil { + return "", err + } + + backupPath := configPath + ".bak" + if oldData, err := os.ReadFile(configPath); err == nil { + if err := os.WriteFile(backupPath, oldData, 0644); err != nil { + return "", fmt.Errorf("write backup failed: %w", err) + } + } else if !os.IsNotExist(err) { + return "", fmt.Errorf("read existing config failed: %w", err) + } + + tmpPath := configPath + ".tmp" + if err := os.WriteFile(tmpPath, data, 0644); err != nil { + return "", fmt.Errorf("write temp config failed: %w", err) + } + if err := os.Rename(tmpPath, configPath); err != nil { + _ = os.Remove(tmpPath) + return "", fmt.Errorf("atomic replace config failed: %w", err) + } + return backupPath, nil +} + +func rollbackConfigFromBackup(configPath, backupPath string) error { + backupData, err := os.ReadFile(backupPath) + if err != nil { + return fmt.Errorf("read backup failed: %w", err) + } + + tmpPath := configPath + ".rollback.tmp" + if err := os.WriteFile(tmpPath, backupData, 0644); err != nil { + return fmt.Errorf("write rollback temp failed: %w", err) + } + if err := os.Rename(tmpPath, configPath); err != nil { + _ = os.Remove(tmpPath) + return fmt.Errorf("rollback replace failed: %w", err) + } + return nil +} + +func triggerGatewayReload() (bool, error) { + pidPath := filepath.Join(filepath.Dir(getConfigPath()), "gateway.pid") + data, err := os.ReadFile(pidPath) + if err != nil { + return false, fmt.Errorf("%w (pid file not found: %s)", errGatewayNotRunning, pidPath) + } + + pidStr := strings.TrimSpace(string(data)) + pid, err := strconv.Atoi(pidStr) + if err != nil || pid <= 0 { + return true, fmt.Errorf("invalid gateway pid: %q", pidStr) + } + + proc, err := os.FindProcess(pid) + if err != nil { + return true, fmt.Errorf("find process failed: %w", err) + } + if err := proc.Signal(syscall.SIGHUP); err != nil { + return true, fmt.Errorf("send SIGHUP failed: %w", err) + } + return true, nil } func statusCmd() { @@ -690,15 +1156,54 @@ func statusCmd() { status = "✓" } fmt.Printf("CLIProxyAPI Key: %s\n", status) + fmt.Printf("Logging: %v\n", cfg.Logging.Enabled) + if cfg.Logging.Enabled { + fmt.Printf("Log File: %s\n", cfg.LogFilePath()) + fmt.Printf("Log Max Size: %d MB\n", cfg.Logging.MaxSizeMB) + fmt.Printf("Log Retention: %d days\n", cfg.Logging.RetentionDays) + } } } func getConfigPath() string { + if strings.TrimSpace(globalConfigPathOverride) != "" { + return globalConfigPathOverride + } + if fromEnv := strings.TrimSpace(os.Getenv("CLAWGO_CONFIG")); fromEnv != "" { + return fromEnv + } + args := os.Args + for i := 0; i < len(args); i++ { + arg := args[i] + if arg == "--config" && i+1 < len(args) { + return args[i+1] + } + if strings.HasPrefix(arg, "--config=") { + return strings.TrimPrefix(arg, "--config=") + } + } return filepath.Join(config.GetConfigDir(), "config.json") } func loadConfig() (*config.Config, error) { - return config.LoadConfig(getConfigPath()) + cfg, err := config.LoadConfig(getConfigPath()) + if err != nil { + return nil, err + } + configureLogging(cfg) + return cfg, nil +} + +func configureLogging(cfg *config.Config) { + if !cfg.Logging.Enabled { + logger.DisableFileLogging() + return + } + + logFile := cfg.LogFilePath() + if err := logger.EnableFileLoggingWithRotation(logFile, cfg.Logging.MaxSizeMB, cfg.Logging.RetentionDays); err != nil { + fmt.Printf("Warning: failed to enable file logging: %v\n", err) + } } func cronCmd() { @@ -1025,7 +1530,7 @@ func skillsRemoveCmd(installer *skills.SkillInstaller, skillName string) { } func skillsInstallBuiltinCmd(workspace string) { - builtinSkillsDir := "./clawgo/skills" + builtinSkillsDir := detectBuiltinSkillsDir(workspace) workspaceSkillsDir := filepath.Join(workspace, "skills") fmt.Printf("Copying builtin skills to workspace...\n") @@ -1066,7 +1571,7 @@ func skillsListBuiltinCmd() { fmt.Printf("Error loading config: %v\n", err) return } - builtinSkillsDir := filepath.Join(filepath.Dir(cfg.WorkspacePath()), "clawgo", "skills") + builtinSkillsDir := detectBuiltinSkillsDir(cfg.WorkspacePath()) fmt.Println("\nAvailable Builtin Skills:") fmt.Println("-----------------------") @@ -1112,6 +1617,21 @@ func skillsListBuiltinCmd() { } } +func detectBuiltinSkillsDir(workspace string) string { + candidates := []string{ + filepath.Join(".", "skills"), + filepath.Join(filepath.Dir(workspace), "clawgo", "skills"), + filepath.Join(config.GetConfigDir(), "clawgo", "skills"), + } + for _, dir := range candidates { + if info, err := os.Stat(dir); err == nil && info.IsDir() { + return dir + } + } + // Fallback to repository-style path for error output consistency. + return filepath.Join(".", "skills") +} + func skillsSearchCmd(installer *skills.SkillInstaller) { fmt.Println("Searching for available skills...") @@ -1276,4 +1796,3 @@ func channelTestCmd() { fmt.Println("✓ Test message sent successfully!") } - diff --git a/config.example.json b/config.example.json index d5b0604..72e86db 100644 --- a/config.example.json +++ b/config.example.json @@ -47,33 +47,10 @@ } }, "providers": { - "anthropic": { - "api_key": "", - "api_base": "" - }, - "openai": { - "api_key": "", - "api_base": "" - }, - "openrouter": { - "api_key": "sk-or-v1-xxx", - "api_base": "" - }, - "groq": { - "api_key": "gsk_xxx", - "api_base": "" - }, - "zhipu": { - "api_key": "YOUR_ZHIPU_API_KEY", - "api_base": "" - }, - "gemini": { - "api_key": "", - "api_base": "" - }, - "vllm": { - "api_key": "", - "api_base": "" + "proxy": { + "api_key": "YOUR_CLIPROXYAPI_KEY", + "api_base": "http://localhost:8080/v1", + "auth": "bearer" } }, "tools": { @@ -87,5 +64,12 @@ "gateway": { "host": "0.0.0.0", "port": 18790 + }, + "logging": { + "enabled": true, + "dir": "~/.clawgo/logs", + "filename": "clawgo.log", + "max_size_mb": 20, + "retention_days": 3 } } diff --git a/pkg/agent/loop.go b/pkg/agent/loop.go index 5826d86..35601b1 100644 --- a/pkg/agent/loop.go +++ b/pkg/agent/loop.go @@ -9,11 +9,14 @@ package agent import ( "context" "encoding/json" + "errors" "fmt" "os" "path/filepath" "regexp" + "strconv" "strings" + "syscall" "clawgo/pkg/bus" "clawgo/pkg/config" @@ -24,6 +27,8 @@ import ( "clawgo/pkg/tools" ) +var errGatewayNotRunningSlash = errors.New("gateway not running") + type AgentLoop struct { bus *bus.MessageBus provider providers.LLMProvider @@ -180,6 +185,11 @@ func (al *AgentLoop) processMessage(ctx context.Context, msg bus.InboundMessage) return al.processSystemMessage(ctx, msg) } + // Built-in slash commands (deterministic, no LLM roundtrip) + if handled, result, err := al.handleSlashCommand(msg.Content); handled { + return result, err + } + // Update tool contexts if tool, ok := al.tools.Get("message"); ok { if mt, ok := tool.(*tools.MessageTool); ok { @@ -216,17 +226,9 @@ func (al *AgentLoop) processMessage(ctx context.Context, msg bus.InboundMessage) "max": al.maxIterations, }) - toolDefs := al.tools.GetDefinitions() - providerToolDefs := make([]providers.ToolDefinition, 0, len(toolDefs)) - for _, td := range toolDefs { - providerToolDefs = append(providerToolDefs, providers.ToolDefinition{ - Type: td["type"].(string), - Function: providers.ToolFunctionDefinition{ - Name: td["function"].(map[string]interface{})["name"].(string), - Description: td["function"].(map[string]interface{})["description"].(string), - Parameters: td["function"].(map[string]interface{})["parameters"].(map[string]interface{}), - }, - }) + providerToolDefs, err := buildProviderToolDefs(al.tools.GetDefinitions()) + if err != nil { + return "", fmt.Errorf("invalid tool definition: %w", err) } // Log LLM request details @@ -356,7 +358,12 @@ func (al *AgentLoop) processMessage(ctx context.Context, msg bus.InboundMessage) Content: userContent, }) - al.sessions.Save(al.sessions.GetOrCreate(msg.SessionKey)) + if err := al.sessions.Save(al.sessions.GetOrCreate(msg.SessionKey)); err != nil { + logger.WarnCF("agent", "Failed to save session metadata", map[string]interface{}{ + "session_key": msg.SessionKey, + "error": err.Error(), + }) + } // Log response preview (original content) responsePreview := truncate(finalContent, 120) @@ -426,17 +433,9 @@ func (al *AgentLoop) processSystemMessage(ctx context.Context, msg bus.InboundMe for iteration < al.maxIterations { iteration++ - toolDefs := al.tools.GetDefinitions() - providerToolDefs := make([]providers.ToolDefinition, 0, len(toolDefs)) - for _, td := range toolDefs { - providerToolDefs = append(providerToolDefs, providers.ToolDefinition{ - Type: td["type"].(string), - Function: providers.ToolFunctionDefinition{ - Name: td["function"].(map[string]interface{})["name"].(string), - Description: td["function"].(map[string]interface{})["description"].(string), - Parameters: td["function"].(map[string]interface{})["parameters"].(map[string]interface{}), - }, - }) + providerToolDefs, err := buildProviderToolDefs(al.tools.GetDefinitions()) + if err != nil { + return "", fmt.Errorf("invalid tool definition: %w", err) } // Log LLM request details @@ -530,7 +529,12 @@ func (al *AgentLoop) processSystemMessage(ctx context.Context, msg bus.InboundMe Content: finalContent, }) - al.sessions.Save(al.sessions.GetOrCreate(sessionKey)) + if err := al.sessions.Save(al.sessions.GetOrCreate(sessionKey)); err != nil { + logger.WarnCF("agent", "Failed to save session metadata", map[string]interface{}{ + "session_key": sessionKey, + "error": err.Error(), + }) + } logger.InfoCF("agent", "System message processing completed", map[string]interface{}{ @@ -690,6 +694,46 @@ func isQuotaOrRateLimitError(err error) bool { return false } +func buildProviderToolDefs(toolDefs []map[string]interface{}) ([]providers.ToolDefinition, error) { + providerToolDefs := make([]providers.ToolDefinition, 0, len(toolDefs)) + for i, td := range toolDefs { + toolType, ok := td["type"].(string) + if !ok || strings.TrimSpace(toolType) == "" { + return nil, fmt.Errorf("tool[%d] missing/invalid type", i) + } + + fnRaw, ok := td["function"].(map[string]interface{}) + if !ok { + return nil, fmt.Errorf("tool[%d] missing/invalid function object", i) + } + + name, ok := fnRaw["name"].(string) + if !ok || strings.TrimSpace(name) == "" { + return nil, fmt.Errorf("tool[%d] missing/invalid function.name", i) + } + + description, ok := fnRaw["description"].(string) + if !ok { + return nil, fmt.Errorf("tool[%d] missing/invalid function.description", i) + } + + parameters, ok := fnRaw["parameters"].(map[string]interface{}) + if !ok { + return nil, fmt.Errorf("tool[%d] missing/invalid function.parameters", i) + } + + providerToolDefs = append(providerToolDefs, providers.ToolDefinition{ + Type: toolType, + Function: providers.ToolFunctionDefinition{ + Name: name, + Description: description, + Parameters: parameters, + }, + }) + } + return providerToolDefs, nil +} + // formatToolsForLog formats tool definitions for logging func formatToolsForLog(tools []providers.ToolDefinition) string { if len(tools) == 0 { @@ -709,6 +753,290 @@ func formatToolsForLog(tools []providers.ToolDefinition) string { return result } +func (al *AgentLoop) handleSlashCommand(content string) (bool, string, error) { + text := strings.TrimSpace(content) + if !strings.HasPrefix(text, "/") { + return false, "", nil + } + + fields := strings.Fields(text) + if len(fields) == 0 { + return true, "", nil + } + + switch fields[0] { + case "/help": + return true, "Slash commands:\n/help\n/status\n/config get \n/config set \n/reload", nil + case "/status": + cfg, err := config.LoadConfig(al.getConfigPathForCommands()) + if err != nil { + return true, "", fmt.Errorf("status failed: %w", err) + } + return true, fmt.Sprintf("Model: %s\nAPI Base: %s\nLogging: %v\nConfig: %s", + cfg.Agents.Defaults.Model, + cfg.Providers.Proxy.APIBase, + cfg.Logging.Enabled, + al.getConfigPathForCommands(), + ), nil + case "/reload": + running, err := al.triggerGatewayReloadFromAgent() + if err != nil { + if running { + return true, "", err + } + return true, fmt.Sprintf("Hot reload not applied: %v", err), nil + } + return true, "Gateway hot reload signal sent", nil + case "/config": + if len(fields) < 2 { + return true, "Usage: /config get | /config set ", nil + } + + switch fields[1] { + case "get": + if len(fields) < 3 { + return true, "Usage: /config get ", nil + } + cfgMap, err := al.loadConfigAsMapForAgent() + if err != nil { + return true, "", err + } + path := al.normalizeConfigPathForAgent(fields[2]) + value, ok := al.getMapValueByPathForAgent(cfgMap, path) + if !ok { + return true, fmt.Sprintf("Path not found: %s", path), nil + } + data, err := json.Marshal(value) + if err != nil { + return true, fmt.Sprintf("%v", value), nil + } + return true, string(data), nil + case "set": + if len(fields) < 4 { + return true, "Usage: /config set ", nil + } + cfgMap, err := al.loadConfigAsMapForAgent() + if err != nil { + return true, "", err + } + path := al.normalizeConfigPathForAgent(fields[2]) + value := al.parseConfigValueForAgent(strings.Join(fields[3:], " ")) + if err := al.setMapValueByPathForAgent(cfgMap, path, value); err != nil { + return true, "", err + } + + data, err := json.MarshalIndent(cfgMap, "", " ") + if err != nil { + return true, "", err + } + + configPath := al.getConfigPathForCommands() + backupPath, err := al.writeConfigAtomicWithBackupForAgent(configPath, data) + if err != nil { + return true, "", err + } + + running, err := al.triggerGatewayReloadFromAgent() + if err != nil { + if running { + if rbErr := al.rollbackConfigFromBackupForAgent(configPath, backupPath); rbErr != nil { + return true, "", fmt.Errorf("hot reload failed and rollback failed: %w", rbErr) + } + return true, "", fmt.Errorf("hot reload failed, config rolled back: %w", err) + } + return true, fmt.Sprintf("Updated %s = %v\nHot reload not applied: %v", path, value, err), nil + } + return true, fmt.Sprintf("Updated %s = %v\nGateway hot reload signal sent", path, value), nil + default: + return true, "Usage: /config get | /config set ", nil + } + default: + return false, "", nil + } +} + +func (al *AgentLoop) getConfigPathForCommands() string { + if fromEnv := strings.TrimSpace(os.Getenv("CLAWGO_CONFIG")); fromEnv != "" { + return fromEnv + } + return filepath.Join(config.GetConfigDir(), "config.json") +} + +func (al *AgentLoop) normalizeConfigPathForAgent(path string) string { + p := strings.TrimSpace(path) + p = strings.Trim(p, ".") + parts := strings.Split(p, ".") + for i, part := range parts { + if part == "enable" { + parts[i] = "enabled" + } + } + return strings.Join(parts, ".") +} + +func (al *AgentLoop) parseConfigValueForAgent(raw string) interface{} { + v := strings.TrimSpace(raw) + lv := strings.ToLower(v) + if lv == "true" { + return true + } + if lv == "false" { + return false + } + if lv == "null" { + return nil + } + if i, err := strconv.ParseInt(v, 10, 64); err == nil { + return i + } + if f, err := strconv.ParseFloat(v, 64); err == nil && strings.Contains(v, ".") { + return f + } + if len(v) >= 2 && ((v[0] == '"' && v[len(v)-1] == '"') || (v[0] == '\'' && v[len(v)-1] == '\'')) { + return v[1 : len(v)-1] + } + return v +} + +func (al *AgentLoop) loadConfigAsMapForAgent() (map[string]interface{}, error) { + configPath := al.getConfigPathForCommands() + data, err := os.ReadFile(configPath) + if err != nil { + if os.IsNotExist(err) { + defaultCfg := config.DefaultConfig() + defData, mErr := json.Marshal(defaultCfg) + if mErr != nil { + return nil, mErr + } + var cfgMap map[string]interface{} + if uErr := json.Unmarshal(defData, &cfgMap); uErr != nil { + return nil, uErr + } + return cfgMap, nil + } + return nil, err + } + var cfgMap map[string]interface{} + if err := json.Unmarshal(data, &cfgMap); err != nil { + return nil, err + } + return cfgMap, nil +} + +func (al *AgentLoop) setMapValueByPathForAgent(root map[string]interface{}, path string, value interface{}) error { + if path == "" { + return fmt.Errorf("path is empty") + } + parts := strings.Split(path, ".") + cur := root + for i := 0; i < len(parts)-1; i++ { + key := parts[i] + if key == "" { + return fmt.Errorf("invalid path: %s", path) + } + next, ok := cur[key] + if !ok { + child := map[string]interface{}{} + cur[key] = child + cur = child + continue + } + child, ok := next.(map[string]interface{}) + if !ok { + return fmt.Errorf("path segment is not object: %s", key) + } + cur = child + } + last := parts[len(parts)-1] + if last == "" { + return fmt.Errorf("invalid path: %s", path) + } + cur[last] = value + return nil +} + +func (al *AgentLoop) getMapValueByPathForAgent(root map[string]interface{}, path string) (interface{}, bool) { + if path == "" { + return nil, false + } + parts := strings.Split(path, ".") + var cur interface{} = root + for _, key := range parts { + obj, ok := cur.(map[string]interface{}) + if !ok { + return nil, false + } + next, ok := obj[key] + if !ok { + return nil, false + } + cur = next + } + return cur, true +} + +func (al *AgentLoop) writeConfigAtomicWithBackupForAgent(configPath string, data []byte) (string, error) { + if err := os.MkdirAll(filepath.Dir(configPath), 0755); err != nil { + return "", err + } + + backupPath := configPath + ".bak" + if oldData, err := os.ReadFile(configPath); err == nil { + if err := os.WriteFile(backupPath, oldData, 0644); err != nil { + return "", fmt.Errorf("write backup failed: %w", err) + } + } else if !os.IsNotExist(err) { + return "", fmt.Errorf("read existing config failed: %w", err) + } + + tmpPath := configPath + ".tmp" + if err := os.WriteFile(tmpPath, data, 0644); err != nil { + return "", fmt.Errorf("write temp config failed: %w", err) + } + if err := os.Rename(tmpPath, configPath); err != nil { + _ = os.Remove(tmpPath) + return "", fmt.Errorf("atomic replace config failed: %w", err) + } + return backupPath, nil +} + +func (al *AgentLoop) rollbackConfigFromBackupForAgent(configPath, backupPath string) error { + backupData, err := os.ReadFile(backupPath) + if err != nil { + return fmt.Errorf("read backup failed: %w", err) + } + tmpPath := configPath + ".rollback.tmp" + if err := os.WriteFile(tmpPath, backupData, 0644); err != nil { + return fmt.Errorf("write rollback temp failed: %w", err) + } + if err := os.Rename(tmpPath, configPath); err != nil { + _ = os.Remove(tmpPath) + return fmt.Errorf("rollback replace failed: %w", err) + } + return nil +} + +func (al *AgentLoop) triggerGatewayReloadFromAgent() (bool, error) { + pidPath := filepath.Join(filepath.Dir(al.getConfigPathForCommands()), "gateway.pid") + data, err := os.ReadFile(pidPath) + if err != nil { + return false, fmt.Errorf("%w (pid file not found: %s)", errGatewayNotRunningSlash, pidPath) + } + pidStr := strings.TrimSpace(string(data)) + pid, err := strconv.Atoi(pidStr) + if err != nil || pid <= 0 { + return true, fmt.Errorf("invalid gateway pid: %q", pidStr) + } + proc, err := os.FindProcess(pid) + if err != nil { + return true, fmt.Errorf("find process failed: %w", err) + } + if err := proc.Signal(syscall.SIGHUP); err != nil { + return true, fmt.Errorf("send SIGHUP failed: %w", err) + } + return true, nil +} + // truncateString truncates a string to max length func truncateString(s string, maxLen int) string { if len(s) <= maxLen { diff --git a/pkg/browser/browser.go b/pkg/browser/browser.go index e69de29..fa93b66 100644 --- a/pkg/browser/browser.go +++ b/pkg/browser/browser.go @@ -0,0 +1,129 @@ +package browser + +import ( + "context" + "fmt" + "os/exec" + "strings" + "time" +) + +const ( + defaultTimeout = 30 * time.Second + defaultUserAgent = "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) clawgo/1.0 Safari/537.36" +) + +// Browser provides a small, dependency-free wrapper around a Chromium-compatible +// binary. It is intentionally simple to keep clawgo deployable as a single Go +// binary without extra Go browser dependencies. +type Browser struct { + binPath string + timeout time.Duration + userAgent string +} + +func New() *Browser { + return &Browser{ + binPath: detectChromeBinary(), + timeout: defaultTimeout, + userAgent: defaultUserAgent, + } +} + +func (b *Browser) SetTimeout(timeout time.Duration) { + if timeout > 0 { + b.timeout = timeout + } +} + +func (b *Browser) Available() bool { + return b.binPath != "" +} + +func (b *Browser) Screenshot(ctx context.Context, url, outputPath string) error { + if strings.TrimSpace(url) == "" { + return fmt.Errorf("url is required") + } + if strings.TrimSpace(outputPath) == "" { + return fmt.Errorf("output path is required") + } + if b.binPath == "" { + return fmt.Errorf("no chromium-compatible browser found (tried chromium-browser/chromium/google-chrome/chrome)") + } + + ctx, cancel := context.WithTimeout(ctx, b.timeout) + defer cancel() + + cmd := exec.CommandContext(ctx, b.binPath, + "--headless=new", + "--disable-gpu", + "--no-sandbox", + "--hide-scrollbars", + "--window-size=1280,720", + "--user-agent="+b.userAgent, + "--screenshot="+outputPath, + url, + ) + + if out, err := cmd.CombinedOutput(); err != nil { + return fmt.Errorf("screenshot failed: %w, output=%s", err, truncate(string(out), 512)) + } + return nil +} + +func (b *Browser) Content(ctx context.Context, url string) (string, error) { + if strings.TrimSpace(url) == "" { + return "", fmt.Errorf("url is required") + } + if b.binPath == "" { + return "", fmt.Errorf("no chromium-compatible browser found (tried chromium-browser/chromium/google-chrome/chrome)") + } + + ctx, cancel := context.WithTimeout(ctx, b.timeout) + defer cancel() + + cmd := exec.CommandContext(ctx, b.binPath, + "--headless=new", + "--disable-gpu", + "--no-sandbox", + "--hide-scrollbars", + "--user-agent="+b.userAgent, + "--dump-dom", + url, + ) + + output, err := cmd.Output() + if err != nil { + if ee, ok := err.(*exec.ExitError); ok { + return "", fmt.Errorf("content fetch failed: %w, stderr=%s", err, truncate(string(ee.Stderr), 512)) + } + return "", fmt.Errorf("content fetch failed: %w", err) + } + + return string(output), nil +} + +func detectChromeBinary() string { + candidates := []string{ + "chromium-browser", + "chromium", + "google-chrome", + "chrome", + } + for _, name := range candidates { + if path, err := exec.LookPath(name); err == nil { + return path + } + } + return "" +} + +func truncate(s string, max int) string { + if len(s) <= max { + return s + } + if max <= 3 { + return s[:max] + } + return s[:max-3] + "..." +} diff --git a/pkg/channels/manager.go b/pkg/channels/manager.go index f2d08dd..a87d596 100644 --- a/pkg/channels/manager.go +++ b/pkg/channels/manager.go @@ -21,6 +21,7 @@ type Manager struct { bus *bus.MessageBus config *config.Config dispatchTask *asyncTask + dispatchSem chan struct{} mu sync.RWMutex } @@ -33,6 +34,8 @@ func NewManager(cfg *config.Config, messageBus *bus.MessageBus) (*Manager, error channels: make(map[string]Channel), bus: messageBus, config: cfg, + // Limit concurrent outbound sends to avoid unbounded goroutine growth. + dispatchSem: make(chan struct{}, 32), } if err := m.initChannels(); err != nil { @@ -239,11 +242,13 @@ func (m *Manager) dispatchOutbound(ctx context.Context) { continue } - // 使用 goroutine 实现并发消息分发,避免单个通道延迟阻塞全局 dispatcher - go func(c Channel, m bus.OutboundMessage) { - if err := c.Send(ctx, m); err != nil { + // Bound fan-out concurrency to prevent goroutine explosion under burst traffic. + m.dispatchSem <- struct{}{} + go func(c Channel, outbound bus.OutboundMessage) { + defer func() { <-m.dispatchSem }() + if err := c.Send(ctx, outbound); err != nil { logger.ErrorCF("channels", "Error sending message to channel", map[string]interface{}{ - "channel": m.Channel, + "channel": outbound.Channel, "error": err.Error(), }) } diff --git a/pkg/config/config.go b/pkg/config/config.go index 6e47e7e..4e86a8f 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -16,6 +16,7 @@ type Config struct { Providers ProvidersConfig `json:"providers"` Gateway GatewayConfig `json:"gateway"` Tools ToolsConfig `json:"tools"` + Logging LoggingConfig `json:"logging"` mu sync.RWMutex } @@ -140,6 +141,14 @@ type ToolsConfig struct { Filesystem FilesystemConfig `json:"filesystem"` } +type LoggingConfig struct { + Enabled bool `json:"enabled" env:"CLAWGO_LOGGING_ENABLED"` + Dir string `json:"dir" env:"CLAWGO_LOGGING_DIR"` + Filename string `json:"filename" env:"CLAWGO_LOGGING_FILENAME"` + MaxSizeMB int `json:"max_size_mb" env:"CLAWGO_LOGGING_MAX_SIZE_MB"` + RetentionDays int `json:"retention_days" env:"CLAWGO_LOGGING_RETENTION_DAYS"` +} + var ( isDebug bool muDebug sync.RWMutex @@ -253,6 +262,13 @@ func DefaultConfig() *Config { DeniedPaths: []string{"/etc/shadow", "/etc/passwd"}, }, }, + Logging: LoggingConfig{ + Enabled: true, + Dir: filepath.Join(configDir, "logs"), + Filename: "clawgo.log", + MaxSizeMB: 20, + RetentionDays: 3, + }, } } @@ -313,6 +329,18 @@ func (c *Config) GetAPIBase() string { return c.Providers.Proxy.APIBase } +func (c *Config) LogFilePath() string { + c.mu.RLock() + defer c.mu.RUnlock() + + dir := expandHome(c.Logging.Dir) + filename := c.Logging.Filename + if filename == "" { + filename = "clawgo.log" + } + return filepath.Join(dir, filename) +} + func expandHome(path string) string { if path == "" { return path diff --git a/pkg/config/validate.go b/pkg/config/validate.go new file mode 100644 index 0000000..5b66e06 --- /dev/null +++ b/pkg/config/validate.go @@ -0,0 +1,79 @@ +package config + +import "fmt" + +// Validate returns configuration problems found in cfg. +// It does not mutate cfg. +func Validate(cfg *Config) []error { + if cfg == nil { + return []error{fmt.Errorf("config is nil")} + } + + var errs []error + + if cfg.Agents.Defaults.Model == "" { + errs = append(errs, fmt.Errorf("agents.defaults.model is required")) + } + if cfg.Agents.Defaults.MaxToolIterations <= 0 { + errs = append(errs, fmt.Errorf("agents.defaults.max_tool_iterations must be > 0")) + } + + if cfg.Providers.Proxy.APIBase == "" { + errs = append(errs, fmt.Errorf("providers.proxy.api_base is required")) + } + + if cfg.Gateway.Port <= 0 || cfg.Gateway.Port > 65535 { + errs = append(errs, fmt.Errorf("gateway.port must be in 1..65535")) + } + + if cfg.Logging.Enabled { + if cfg.Logging.Dir == "" { + errs = append(errs, fmt.Errorf("logging.dir is required when logging.enabled=true")) + } + if cfg.Logging.Filename == "" { + errs = append(errs, fmt.Errorf("logging.filename is required when logging.enabled=true")) + } + if cfg.Logging.MaxSizeMB <= 0 { + errs = append(errs, fmt.Errorf("logging.max_size_mb must be > 0")) + } + if cfg.Logging.RetentionDays <= 0 { + errs = append(errs, fmt.Errorf("logging.retention_days must be > 0")) + } + } + + if cfg.Channels.Telegram.Enabled && cfg.Channels.Telegram.Token == "" { + errs = append(errs, fmt.Errorf("channels.telegram.token is required when channels.telegram.enabled=true")) + } + if cfg.Channels.Discord.Enabled && cfg.Channels.Discord.Token == "" { + errs = append(errs, fmt.Errorf("channels.discord.token is required when channels.discord.enabled=true")) + } + if cfg.Channels.WhatsApp.Enabled && cfg.Channels.WhatsApp.BridgeURL == "" { + errs = append(errs, fmt.Errorf("channels.whatsapp.bridge_url is required when channels.whatsapp.enabled=true")) + } + if cfg.Channels.DingTalk.Enabled { + if cfg.Channels.DingTalk.ClientID == "" { + errs = append(errs, fmt.Errorf("channels.dingtalk.client_id is required when channels.dingtalk.enabled=true")) + } + if cfg.Channels.DingTalk.ClientSecret == "" { + errs = append(errs, fmt.Errorf("channels.dingtalk.client_secret is required when channels.dingtalk.enabled=true")) + } + } + if cfg.Channels.Feishu.Enabled { + if cfg.Channels.Feishu.AppID == "" { + errs = append(errs, fmt.Errorf("channels.feishu.app_id is required when channels.feishu.enabled=true")) + } + if cfg.Channels.Feishu.AppSecret == "" { + errs = append(errs, fmt.Errorf("channels.feishu.app_secret is required when channels.feishu.enabled=true")) + } + } + if cfg.Channels.QQ.Enabled { + if cfg.Channels.QQ.AppID == "" { + errs = append(errs, fmt.Errorf("channels.qq.app_id is required when channels.qq.enabled=true")) + } + if cfg.Channels.QQ.AppSecret == "" { + errs = append(errs, fmt.Errorf("channels.qq.app_secret is required when channels.qq.enabled=true")) + } + } + + return errs +} diff --git a/pkg/logger/logger.go b/pkg/logger/logger.go index 22f6682..91f9348 100644 --- a/pkg/logger/logger.go +++ b/pkg/logger/logger.go @@ -5,6 +5,7 @@ import ( "fmt" "log" "os" + "path/filepath" "runtime" "strings" "sync" @@ -37,7 +38,11 @@ var ( ) type Logger struct { - file *os.File + file *os.File + filePath string + maxSizeBytes int64 + maxAgeDays int + fileMu sync.Mutex } type LogEntry struct { @@ -68,9 +73,25 @@ func GetLevel() LogLevel { } func EnableFileLogging(filePath string) error { + return EnableFileLoggingWithRotation(filePath, 20, 3) +} + +func EnableFileLoggingWithRotation(filePath string, maxSizeMB, maxAgeDays int) error { mu.Lock() defer mu.Unlock() + if maxSizeMB <= 0 { + maxSizeMB = 20 + } + if maxAgeDays <= 0 { + maxAgeDays = 3 + } + + dir := filepath.Dir(filePath) + if err := os.MkdirAll(dir, 0755); err != nil { + return fmt.Errorf("failed to create log directory: %w", err) + } + file, err := os.OpenFile(filePath, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644) if err != nil { return fmt.Errorf("failed to open log file: %w", err) @@ -81,6 +102,12 @@ func EnableFileLogging(filePath string) error { } logger.file = file + logger.filePath = filePath + logger.maxSizeBytes = int64(maxSizeMB) * 1024 * 1024 + logger.maxAgeDays = maxAgeDays + if err := logger.cleanupOldLogFiles(); err != nil { + log.Println("Failed to clean up old log files:", err) + } log.Println("File logging enabled:", filePath) return nil } @@ -92,6 +119,9 @@ func DisableFileLogging() { if logger.file != nil { logger.file.Close() logger.file = nil + logger.filePath = "" + logger.maxSizeBytes = 0 + logger.maxAgeDays = 0 log.Println("File logging disabled") } } @@ -119,7 +149,9 @@ func logMessage(level LogLevel, component string, message string, fields map[str if logger.file != nil { jsonData, err := json.Marshal(entry) if err == nil { - logger.file.WriteString(string(jsonData) + "\n") + if err := logger.writeLine(append(jsonData, '\n')); err != nil { + log.Println("Failed to write file log:", err) + } } } @@ -143,6 +175,88 @@ func logMessage(level LogLevel, component string, message string, fields map[str } } +func (l *Logger) writeLine(line []byte) error { + l.fileMu.Lock() + defer l.fileMu.Unlock() + + if l.file == nil { + return nil + } + + if l.maxSizeBytes > 0 { + if err := l.rotateIfNeeded(int64(len(line))); err != nil { + return err + } + } + + _, err := l.file.Write(line) + return err +} + +func (l *Logger) rotateIfNeeded(nextWrite int64) error { + info, err := l.file.Stat() + if err != nil { + return err + } + + if info.Size()+nextWrite <= l.maxSizeBytes { + return nil + } + + if err := l.file.Close(); err != nil { + return err + } + + backupPath := fmt.Sprintf("%s.%s", l.filePath, time.Now().UTC().Format("20060102-150405")) + if err := os.Rename(l.filePath, backupPath); err != nil { + return err + } + + file, err := os.OpenFile(l.filePath, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644) + if err != nil { + return err + } + l.file = file + + return l.cleanupOldLogFiles() +} + +func (l *Logger) cleanupOldLogFiles() error { + if l.maxAgeDays <= 0 || l.filePath == "" { + return nil + } + + dir := filepath.Dir(l.filePath) + base := filepath.Base(l.filePath) + entries, err := os.ReadDir(dir) + if err != nil { + return err + } + + cutoff := time.Now().AddDate(0, 0, -l.maxAgeDays) + for _, entry := range entries { + if entry.IsDir() { + continue + } + + name := entry.Name() + // Only delete rotated files like clawgo.log.20260213-120000 + if !strings.HasPrefix(name, base+".") { + continue + } + + info, err := entry.Info() + if err != nil { + continue + } + if info.ModTime().Before(cutoff) { + _ = os.Remove(filepath.Join(dir, name)) + } + } + + return nil +} + func formatComponent(component string) string { if component == "" { return "" diff --git a/pkg/session/manager.go b/pkg/session/manager.go index 06d5244..d7694ff 100644 --- a/pkg/session/manager.go +++ b/pkg/session/manager.go @@ -9,6 +9,7 @@ import ( "sync" "time" + "clawgo/pkg/logger" "clawgo/pkg/providers" ) @@ -34,7 +35,12 @@ func NewSessionManager(storage string) *SessionManager { } if storage != "" { - os.MkdirAll(storage, 0755) + if err := os.MkdirAll(storage, 0755); err != nil { + logger.ErrorCF("session", "Failed to create session storage", map[string]interface{}{ + "storage": storage, + "error": err.Error(), + }) + } sm.loadSessions() } @@ -85,7 +91,12 @@ func (sm *SessionManager) AddMessageFull(sessionKey string, msg providers.Messag session.mu.Unlock() // 立即持久化 (Append-only) - sm.appendMessage(sessionKey, msg) + if err := sm.appendMessage(sessionKey, msg); err != nil { + logger.ErrorCF("session", "Failed to persist session message", map[string]interface{}{ + "session_key": sessionKey, + "error": err.Error(), + }) + } } func (sm *SessionManager) appendMessage(sessionKey string, msg providers.Message) error { @@ -188,7 +199,10 @@ func (sm *SessionManager) Save(session *Session) error { "updated": session.Updated, "created": session.Created, } - data, _ := json.MarshalIndent(meta, "", " ") + data, err := json.MarshalIndent(meta, "", " ") + if err != nil { + return err + } return os.WriteFile(metaPath, data, 0644) } @@ -212,7 +226,6 @@ func (sm *SessionManager) loadSessions() error { if err != nil { continue } - scanner := bufio.NewScanner(f) session.mu.Lock() for scanner.Scan() { @@ -222,7 +235,13 @@ func (sm *SessionManager) loadSessions() error { } } session.mu.Unlock() - f.Close() + if err := scanner.Err(); err != nil { + logger.WarnCF("session", "Error while scanning session history", map[string]interface{}{ + "file": file.Name(), + "error": err.Error(), + }) + } + _ = f.Close() } // 处理元数据 diff --git a/pkg/tools/browser.go b/pkg/tools/browser.go index 59e8f5a..ad76373 100644 --- a/pkg/tools/browser.go +++ b/pkg/tools/browser.go @@ -3,18 +3,23 @@ package tools import ( "context" "fmt" - "os/exec" + "path/filepath" "time" + + "clawgo/pkg/browser" ) type BrowserTool struct { - chromePath string - timeout time.Duration + client *browser.Browser } func NewBrowserTool() *BrowserTool { + client := browser.New() + timeout := 30 * time.Second + client.SetTimeout(timeout) + return &BrowserTool{ - timeout: 30 * time.Second, + client: client, } } @@ -57,35 +62,22 @@ func (t *BrowserTool) Execute(ctx context.Context, args map[string]interface{}) } func (t *BrowserTool) takeScreenshot(ctx context.Context, url string) (string, error) { - // 基于 CLI 的简单实现:使用 chromium-browser --headless outputPath := fmt.Sprintf("/tmp/screenshot_%d.png", time.Now().UnixNano()) - cmd := exec.CommandContext(ctx, "chromium-browser", - "--headless", - "--disable-gpu", - "--no-sandbox", - "--screenshot="+outputPath, - url) - - if err := cmd.Run(); err != nil { - return "", fmt.Errorf("failed to take screenshot: %w (ensure chromium-browser is installed)", err) + if !t.client.Available() { + return "", fmt.Errorf("failed to take screenshot: no chromium-compatible browser available") } - return fmt.Sprintf("Screenshot saved to: %s", outputPath), nil + if err := t.client.Screenshot(ctx, url, outputPath); err != nil { + return "", err + } + + return fmt.Sprintf("Screenshot saved to: %s", filepath.Clean(outputPath)), nil } func (t *BrowserTool) fetchDynamicContent(ctx context.Context, url string) (string, error) { - // 简单实现:dump-dom - cmd := exec.CommandContext(ctx, "chromium-browser", - "--headless", - "--disable-gpu", - "--no-sandbox", - "--dump-dom", - url) - - output, err := cmd.Output() - if err != nil { - return "", fmt.Errorf("failed to fetch content: %w", err) + if !t.client.Available() { + return "", fmt.Errorf("failed to fetch content: no chromium-compatible browser available") } - return string(output), nil + return t.client.Content(ctx, url) } diff --git a/pkg/tools/parallel.go b/pkg/tools/parallel.go index d7a26d4..bb2c36b 100644 --- a/pkg/tools/parallel.go +++ b/pkg/tools/parallel.go @@ -6,6 +6,8 @@ import ( "sync" ) +const maxParallelToolCalls = 8 + type ParallelTool struct { registry *ToolRegistry } @@ -61,6 +63,7 @@ func (t *ParallelTool) Execute(ctx context.Context, args map[string]interface{}) results := make(map[string]string) var mu sync.Mutex var wg sync.WaitGroup + sem := make(chan struct{}, maxParallelToolCalls) for i, c := range callsRaw { call, ok := c.(map[string]interface{}) @@ -78,8 +81,11 @@ func (t *ParallelTool) Execute(ctx context.Context, args map[string]interface{}) wg.Add(1) go func(id, name string, args map[string]interface{}) { defer wg.Done() + sem <- struct{}{} + defer func() { <-sem }() + res, err := t.registry.Execute(ctx, name, args) - + mu.Lock() defer mu.Unlock() if err != nil { diff --git a/pkg/tools/parallel_fetch.go b/pkg/tools/parallel_fetch.go index ed42ff9..76903aa 100644 --- a/pkg/tools/parallel_fetch.go +++ b/pkg/tools/parallel_fetch.go @@ -6,6 +6,8 @@ import ( "sync" ) +const maxParallelFetchCalls = 8 + type ParallelFetchTool struct { fetcher *WebFetchTool } @@ -46,6 +48,7 @@ func (t *ParallelFetchTool) Execute(ctx context.Context, args map[string]interfa results := make([]string, len(urlsRaw)) var wg sync.WaitGroup + sem := make(chan struct{}, maxParallelFetchCalls) for i, u := range urlsRaw { urlStr, ok := u.(string) @@ -56,6 +59,9 @@ func (t *ParallelFetchTool) Execute(ctx context.Context, args map[string]interfa wg.Add(1) go func(index int, url string) { defer wg.Done() + sem <- struct{}{} + defer func() { <-sem }() + res, err := t.fetcher.Execute(ctx, map[string]interface{}{"url": url}) if err != nil { results[index] = fmt.Sprintf("Error fetching %s: %v", url, err) diff --git a/pkg/tools/web.go b/pkg/tools/web.go index 218815e..462d83d 100644 --- a/pkg/tools/web.go +++ b/pkg/tools/web.go @@ -13,7 +13,8 @@ import ( ) const ( - userAgent = "Mozilla/5.0 (compatible; clawgo/1.0)" + userAgent = "Mozilla/5.0 (compatible; clawgo/1.0)" + maxFetchResponseBytes = 8 * 1024 * 1024 ) type WebSearchTool struct { @@ -93,10 +94,14 @@ func (t *WebSearchTool) Execute(ctx context.Context, args map[string]interface{} } defer resp.Body.Close() - body, err := io.ReadAll(resp.Body) + limitedReader := io.LimitReader(resp.Body, maxFetchResponseBytes+1) + body, err := io.ReadAll(limitedReader) if err != nil { return "", fmt.Errorf("failed to read response: %w", err) } + if len(body) > maxFetchResponseBytes { + return "", fmt.Errorf("response body too large (>%d bytes)", maxFetchResponseBytes) + } var searchResp struct { Web struct {