From 154ab3f7f9836ad9a4c9d6fcb5fb38d8f1eabcf0 Mon Sep 17 00:00:00 2001 From: lpf Date: Wed, 4 Mar 2026 12:44:31 +0800 Subject: [PATCH] feat: add session auto-planning and resource-based concurrency scheduling --- README.md | 247 +++++++---------- README_EN.md | 249 +++++++---------- config.example.json | 4 + pkg/agent/loop.go | 73 +++-- pkg/agent/loop_session_regression_test.go | 31 +++ pkg/agent/session_planner.go | 182 +++++++++++++ pkg/agent/session_planner_test.go | 54 ++++ pkg/agent/session_scheduler.go | 265 +++++++++++++++++++ pkg/agent/session_scheduler_test.go | 103 +++++++ pkg/autonomy/engine.go | 175 ++++-------- pkg/autonomy/engine_notify_allowlist_test.go | 9 +- pkg/config/config.go | 58 ++-- pkg/config/validate.go | 6 + pkg/scheduling/resource_keys.go | 166 ++++++++++++ pkg/scheduling/resource_keys_test.go | 49 ++++ webui/src/i18n/index.ts | 8 + 16 files changed, 1193 insertions(+), 486 deletions(-) create mode 100644 pkg/agent/session_planner.go create mode 100644 pkg/agent/session_planner_test.go create mode 100644 pkg/agent/session_scheduler.go create mode 100644 pkg/agent/session_scheduler_test.go create mode 100644 pkg/scheduling/resource_keys.go create mode 100644 pkg/scheduling/resource_keys_test.go diff --git a/README.md b/README.md index 2d78596..68fda44 100644 --- a/README.md +++ b/README.md @@ -1,104 +1,67 @@ -# ClawGo +# ClawGo 🚀 -高性能、可长期运行的 Go 原生 AI Agent(支持多平台构建与多通道接入)。 +一个 Go 写的长期运行 AI Agent:轻量、可审计、可多通道接入。 [English](./README_EN.md) --- -## 一句话介绍 +## 它能做什么 ✨ -**ClawGo = 单二进制网关 + 多通道消息 + 工具调用 + 自治执行 + 可审计记忆。** - -适合: -- 私有化 AI 助手 -- 持续巡检/自动任务 -- 多通道机器人(Telegram/Feishu/Discord 等) -- 需要可控、可追踪、可回滚的 Agent 系统 +- 🤖 本地对话模式:`clawgo agent` +- 🌐 网关服务模式:`clawgo gateway` +- 💬 多通道接入:Telegram / Feishu / Discord / WhatsApp / QQ / DingTalk / MaixCam +- 🧰 工具调用、技能系统、子任务协同 +- 🧠 自治任务(队列、审计、冲突锁) +- 📊 WebUI 可视化(Chat / Logs / Config / Cron / Tasks / EKG) --- -## 核心能力 +## 并发调度(新)⚙️ -- **双模式运行** - - `clawgo agent`:本地交互模式 - - `clawgo gateway`:服务化网关模式(推荐长期运行) - -- **多通道支持** - - Telegram / Feishu / Discord / WhatsApp / QQ / DingTalk / MaixCam - -- **工具与技能体系** - - 内置工具调用、技能安装与执行 - - 支持任务编排与子任务协同 - -- **自治与任务治理** - - 会话级自治(idle 预算、暂停/恢复) - - Task Queue / Task Audit 分层治理 - - 自治任务冲突锁(resource_keys) - -- **记忆与上下文治理** - - `memory_search` / 分层记忆 - - 自动上下文压缩 - - 启动自检与任务续跑 - -- **可靠性增强** - - Provider fallback(含 errsig-aware 排序) - - 入站/出站去重(防重复收发) - - 审计可观测(provider/model/source/channel) +- 🧩 一条复合消息会先自动拆成多个子任务(可配置上限) +- 🔀 同一会话内:无资源冲突的子任务可并发执行 +- 🔒 同一会话内:有资源冲突的子任务自动串行(避免互相踩) +- 🏷️ 默认会自动推断 `resource_keys`,无需手动填写 --- -## EKG(Execution Knowledge Graph) +## 3 分钟上手 ⚡ -ClawGo 内置轻量 EKG(无需外部图数据库),用于降低重复错误与无效重试: - -- 事件流:`memory/ekg-events.jsonl` -- 快照:`memory/ekg-snapshot.json` -- 错误签名归一化(路径/数字/hex 去噪) -- 重复错误抑制(可配置阈值) -- Provider fallback 历史打分(含错误签名维度) -- 与 Memory 联动(`[EKG_INCIDENT]` 结构化沉淀,支持提前拦截) -- WebUI 支持按 `6h/24h/7d` 时间窗口查看 - ---- - -## 快速开始 - -### 一键安装(install.sh) - -- GitHub 脚本链接: -- 一键安装命令: +### 1) 安装 ```bash curl -fsSL https://raw.githubusercontent.com/YspCoder/clawgo/main/install.sh | bash ``` -### 1) 初始化 +### 2) 初始化 ```bash clawgo onboard ``` -### 2) 配置上游模型/代理 +### 3) 配置模型 ```bash clawgo login ``` -### 3) 查看状态 +### 4) 看状态 ```bash clawgo status ``` -### 4) 本地模式 +### 5) 开始使用 + +本地模式: ```bash clawgo agent clawgo agent -m "Hello" ``` -### 5) 网关模式 +网关模式: ```bash # 注册并启用 systemd 服务 @@ -106,21 +69,22 @@ clawgo gateway clawgo gateway start clawgo gateway status -# 前台运行 +# 或前台运行 clawgo gateway run ``` --- -## WebUI +## WebUI 🖥️ -访问: +访问地址: ```text http://:/webui?token= ``` 主要页面: + - Dashboard - Chat - Logs @@ -135,88 +99,7 @@ http://:/webui?token= --- -## 多平台构建(Make) - -### 构建所有默认平台 - -```bash -make build-all -``` - -### Linux 专项瘦身构建(不禁用通道) - -```bash -make build-linux-slim -``` - -说明(仅 Linux): -- 在不禁用任何通道能力前提下,启用 `purego,netgo,osusergo` 与 `CGO_ENABLED=0`,降低体积并减少动态库依赖。 -- 可选叠加 `COMPRESS_BINARY=1`(若安装 upx)做进一步压缩。 - -默认矩阵: -- linux/amd64 -- linux/arm64 -- linux/riscv64 -- darwin/amd64 -- darwin/arm64 -- windows/amd64 -- windows/arm64 - -### 自定义平台矩阵 - -```bash -make build-all BUILD_TARGETS="linux/amd64 linux/arm64 darwin/arm64 windows/amd64" -``` - -### 极致瘦身构建(目标 <10MB) - -```bash -make build COMPRESS_BINARY=1 -``` - -说明: -- 默认已启用 `-trimpath -buildvcs=false -s -w`,可减少路径与符号信息。 -- `COMPRESS_BINARY=1` 时会尝试使用 `upx --best --lzma` 进一步压缩可执行文件。 -- 若环境未安装 `upx`,会自动跳过并给出提示,不影响构建成功。 - -### 打包与校验 - -```bash -make package-all -``` - -输出: -- `build/*.tar.gz`(Linux/macOS) -- `build/*.zip`(Windows) -- `build/checksums.txt` - ---- - -## GitHub Release 自动发布 - -已内置 `.github/workflows/release.yml`: - -触发方式: -- 推送 tag:`v*`(如 `v0.0.1`) -- 手动触发(workflow_dispatch) - -自动完成: -- 多平台编译 -- 产物打包 -- checksums 生成 -- WebUI dist 打包 -- 发布到 GitHub Releases - -示例: - -```bash -git tag v0.0.2 -git push origin v0.0.2 -``` - ---- - -## 常用命令 +## 常用命令 📌 ```text clawgo onboard @@ -233,26 +116,74 @@ clawgo uninstall [--purge] [--remove-bin] --- -## 配置与热更新 +## 构建与发布 🛠️ -- 支持 `clawgo config set/get/check/reload` -- 严格 JSON 解析(未知字段会报错) -- 配置热更新失败自动回滚备份 -- Provider 接口已统一为 `responses` +构建全部默认平台: +```bash +make build-all +``` + +Linux 瘦身构建: + +```bash +make build-linux-slim +``` + +自定义平台矩阵: + +```bash +make build-all BUILD_TARGETS="linux/amd64 linux/arm64 darwin/arm64 windows/amd64" +``` + +打包并生成校验: + +```bash +make package-all +``` + +产物: + +- `build/*.tar.gz`(Linux/macOS) +- `build/*.zip`(Windows) +- `build/checksums.txt` --- -## 稳定性与审计建议 +## 自动发布(GitHub Release)📦 -生产建议开启: -- 通道去重窗口配置 -- task-audit heartbeat 默认过滤 -- EKG 时间窗口观察(默认 24h) -- 定期查看 EKG Top errsig 与 provider 分数 +仓库内置 `.github/workflows/release.yml`。 + +触发方式: + +- 推送 tag(如 `v0.0.2`) +- 手动触发 workflow_dispatch + +示例: + +```bash +git tag v0.0.2 +git push origin v0.0.2 +``` --- -## License +## 配置说明 ⚙️ -请参考仓库中的 License 文件。 +- 配置文件严格 JSON 校验(未知字段会报错) +- 支持热更新:`clawgo config reload` +- 热更新失败会自动回滚备份 + +--- + +## 稳定运行建议 ✅ + +- 打开通道去重窗口(防重复消息) +- 定期看 Task Audit 和 EKG(查慢任务与高频错误) +- 长期运行推荐使用 `gateway` 服务模式 + +--- + +## License 📄 + +见仓库中的 `LICENSE` 文件。 diff --git a/README_EN.md b/README_EN.md index 85f7752..ed37ba7 100644 --- a/README_EN.md +++ b/README_EN.md @@ -1,126 +1,90 @@ -# ClawGo +# ClawGo 🚀 -A high-performance, long-running Go-native AI Agent with multi-platform builds and multi-channel messaging. +A long-running AI Agent written in Go: lightweight, auditable, and multi-channel ready. [中文](./README.md) --- -## What is ClawGo? +## What It Does ✨ -**ClawGo = single-binary gateway + channel integrations + tool calling + autonomy + auditable memory.** - -Best for: -- Self-hosted AI assistants -- Continuous automation / inspections -- Multi-channel bots (Telegram/Feishu/Discord/...) -- Agent systems requiring control, traceability, and rollback safety +- 🤖 Local chat mode: `clawgo agent` +- 🌐 Gateway service mode: `clawgo gateway` +- 💬 Multi-channel support: Telegram / Feishu / Discord / WhatsApp / QQ / DingTalk / MaixCam +- 🧰 Tool calling, skills, and sub-task collaboration +- 🧠 Autonomous tasks (queue, audit, conflict locks) +- 📊 WebUI for visibility (Chat / Logs / Config / Cron / Tasks / EKG) --- -## Core Capabilities +## Concurrency Scheduling (New) ⚙️ -- **Dual runtime modes** - - `clawgo agent`: local interactive mode - - `clawgo gateway`: service mode for long-running workloads - -- **Multi-channel support** - - Telegram / Feishu / Discord / WhatsApp / QQ / DingTalk / MaixCam - -- **Tools & skills** - - Built-in tool-calling and skill execution - - Task orchestration support - -- **Autonomy & task governance** - - Session-level autonomy (idle budget, pause/resume) - - Task Queue + Task Audit governance - - Resource-key locking for conflict control - -- **Memory & context governance** - - `memory_search` and layered memory - - Automatic context compaction - - Startup self-check and task continuation - -- **Reliability hardening** - - Provider fallback (errsig-aware ranking) - - Inbound/outbound dedupe protection - - Better observability (provider/model/source/channel) +- 🧩 A composite message is auto-split into sub-tasks (configurable max) +- 🔀 Within the same session: non-conflicting tasks run in parallel +- 🔒 Within the same session: conflicting tasks run serially (to avoid collisions) +- 🏷️ `resource_keys` are inferred automatically by default (no manual input needed) --- -## EKG (Execution Knowledge Graph) +## 3-Minute Quick Start ⚡ -ClawGo includes a lightweight EKG (no external graph DB required): - -- Event log: `memory/ekg-events.jsonl` -- Snapshot cache: `memory/ekg-snapshot.json` -- Normalized error signatures (path/number/hex denoise) -- Repeated-error suppression (configurable threshold) -- Historical provider scoring (including error-signature dimension) -- Memory linkage (`[EKG_INCIDENT]` structured notes for earlier suppression) -- WebUI time windows: `6h / 24h / 7d` - ---- - -## Quick Start - -### One-Click Install (install.sh) - -- GitHub script link: -- One-click install command: +### 1) Install ```bash curl -fsSL https://raw.githubusercontent.com/YspCoder/clawgo/main/install.sh | bash ``` -### 1) Initialize +### 2) Initialize ```bash clawgo onboard ``` -### 2) Configure upstream model/proxy +### 3) Configure model/proxy ```bash clawgo login ``` -### 3) Check status +### 4) Check status ```bash clawgo status ``` -### 4) Local mode +### 5) Start using it + +Local mode: ```bash clawgo agent clawgo agent -m "Hello" ``` -### 5) Gateway mode +Gateway mode: ```bash -# register + enable systemd service +# Register + enable systemd service clawgo gateway clawgo gateway start clawgo gateway status -# foreground mode +# Or run in foreground clawgo gateway run ``` --- -## WebUI +## WebUI 🖥️ -Access: +Open: ```text http://:/webui?token= ``` Main pages: + - Dashboard - Chat - Logs @@ -135,88 +99,7 @@ Main pages: --- -## Multi-Platform Build (Make) - -### Build all default targets - -```bash -make build-all -``` - -### Linux slim build (without disabling channels) - -```bash -make build-linux-slim -``` - -Notes (Linux only): -- Keeps all channel capabilities while enabling `purego,netgo,osusergo` with `CGO_ENABLED=0` to reduce size and dynamic library coupling. -- Optionally combine with `COMPRESS_BINARY=1` (if `upx` is installed) for additional compression. - -Default matrix: -- linux/amd64 -- linux/arm64 -- linux/riscv64 -- darwin/amd64 -- darwin/arm64 -- windows/amd64 -- windows/arm64 - -### Custom build matrix - -```bash -make build-all BUILD_TARGETS="linux/amd64 linux/arm64 darwin/arm64 windows/amd64" -``` - -### Ultra-slim build (target <10MB) - -```bash -make build COMPRESS_BINARY=1 -``` - -Notes: -- Default build now uses `-trimpath -buildvcs=false -s -w` to remove path/symbol overhead. -- With `COMPRESS_BINARY=1`, the build will try `upx --best --lzma` for further executable compression. -- If `upx` is unavailable, build still succeeds and prints a warning. - -### Package + checksums - -```bash -make package-all -``` - -Outputs: -- `build/*.tar.gz` (Linux/macOS) -- `build/*.zip` (Windows) -- `build/checksums.txt` - ---- - -## GitHub Release Automation - -Built-in workflow: `.github/workflows/release.yml` - -Triggers: -- tag push: `v*` (e.g. `v0.0.1`) -- manual dispatch (workflow_dispatch) - -Pipeline includes: -- Multi-platform compilation -- Artifact packaging -- Checksum generation -- WebUI dist packaging -- GitHub Releases publishing - -Example: - -```bash -git tag v0.0.2 -git push origin v0.0.2 -``` - ---- - -## Common Commands +## Common Commands 📌 ```text clawgo onboard @@ -233,26 +116,74 @@ clawgo uninstall [--purge] [--remove-bin] --- -## Config & Hot Reload +## Build & Release 🛠️ -- Supports `clawgo config set/get/check/reload` -- Strict JSON parsing (unknown fields fail fast) -- Auto rollback on failed hot reload -- Provider interface is now `responses` only +Build all default targets: +```bash +make build-all +``` + +Linux slim build: + +```bash +make build-linux-slim +``` + +Custom target matrix: + +```bash +make build-all BUILD_TARGETS="linux/amd64 linux/arm64 darwin/arm64 windows/amd64" +``` + +Package + checksums: + +```bash +make package-all +``` + +Outputs: + +- `build/*.tar.gz` (Linux/macOS) +- `build/*.zip` (Windows) +- `build/checksums.txt` --- -## Stability / Operations Notes +## GitHub Release Automation 📦 -Recommended for production: -- tune channel dedupe windows -- keep heartbeat filtered in task-audit by default -- monitor EKG with 24h window baseline -- review Top errsig/provider score periodically +Built-in workflow: `.github/workflows/release.yml` + +Triggers: + +- Tag push (e.g. `v0.0.2`) +- Manual `workflow_dispatch` + +Example: + +```bash +git tag v0.0.2 +git push origin v0.0.2 +``` --- -## License +## Config Notes ⚙️ -See repository License file. +- Strict JSON validation (unknown fields fail fast) +- Supports hot reload: `clawgo config reload` +- Auto rollback if hot reload fails + +--- + +## Production Tips ✅ + +- Enable channel dedupe windows +- Monitor Task Audit and EKG regularly +- Use gateway mode for long-running workloads + +--- + +## License 📄 + +See `LICENSE` in this repository. diff --git a/config.example.json b/config.example.json index 77da77a..726b3ed 100644 --- a/config.example.json +++ b/config.example.json @@ -69,6 +69,10 @@ "run_state_max": 500, "tool_parallel_safe_names": ["read_file", "list_files", "find_files", "grep_files", "memory_search", "web_search", "repo_map", "system_info"], "tool_max_parallel_calls": 2, + "session_resource_scheduling_enabled": true, + "session_max_parallel_runs": 4, + "session_auto_plan_enabled": true, + "session_auto_plan_max_tasks": 4, "system_summary": { "marker": "## System Task Summary", "completed_prefix": "- Completed:", diff --git a/pkg/agent/loop.go b/pkg/agent/loop.go index a51aa93..3a1be33 100644 --- a/pkg/agent/loop.go +++ b/pkg/agent/loop.go @@ -27,6 +27,7 @@ import ( "clawgo/pkg/logger" "clawgo/pkg/nodes" "clawgo/pkg/providers" + "clawgo/pkg/scheduling" "clawgo/pkg/session" "clawgo/pkg/tools" ) @@ -53,12 +54,13 @@ type AgentLoop struct { runtimeCompactionNote string startupCompactionNote string systemRewriteTemplate string + sessionAutoPlan bool + sessionAutoPlanMax int audit *triggerAudit running bool intentMu sync.RWMutex intentHints map[string]string - sessionRunMu sync.Mutex - sessionRunLocks map[string]*sync.Mutex + sessionScheduler *SessionScheduler providerNames []string providerPool map[string]providers.LLMProvider providerResponses map[string]config.ProviderResponsesConfig @@ -240,16 +242,24 @@ func NewAgentLoop(cfg *config.Config, msgBus *bus.MessageBus, provider providers runtimeCompactionNote: cfg.Agents.Defaults.Texts.RuntimeCompactionNote, startupCompactionNote: cfg.Agents.Defaults.Texts.StartupCompactionNote, systemRewriteTemplate: cfg.Agents.Defaults.Texts.SystemRewriteTemplate, + sessionAutoPlan: cfg.Agents.Defaults.RuntimeControl.SessionAutoPlanEnabled, + sessionAutoPlanMax: cfg.Agents.Defaults.RuntimeControl.SessionAutoPlanMaxTasks, audit: newTriggerAudit(workspace), running: false, intentHints: map[string]string{}, - sessionRunLocks: map[string]*sync.Mutex{}, + sessionScheduler: NewSessionScheduler(cfg.Agents.Defaults.RuntimeControl.SessionMaxParallelRuns), ekg: ekg.New(workspace), sessionProvider: map[string]string{}, sessionStreamed: map[string]bool{}, providerResponses: map[string]config.ProviderResponsesConfig{}, telegramStreaming: cfg.Channels.Telegram.Streaming, } + if !cfg.Agents.Defaults.RuntimeControl.SessionResourceSchedulingEnabled { + loop.sessionScheduler = nil + } + if loop.sessionAutoPlanMax <= 0 { + loop.sessionAutoPlanMax = 4 + } // Initialize provider fallback chain (primary + proxy_fallbacks). loop.providerPool = map[string]providers.LLMProvider{} @@ -345,22 +355,6 @@ func (al *AgentLoop) buildSessionShards(ctx context.Context) []chan bus.InboundM return shards } -func (al *AgentLoop) lockSessionRun(sessionKey string) func() { - key := strings.TrimSpace(sessionKey) - if key == "" { - key = "default" - } - al.sessionRunMu.Lock() - mu, ok := al.sessionRunLocks[key] - if !ok { - mu = &sync.Mutex{} - al.sessionRunLocks[key] = mu - } - al.sessionRunMu.Unlock() - mu.Lock() - return func() { mu.Unlock() } -} - func (al *AgentLoop) tryFallbackProviders(ctx context.Context, msg bus.InboundMessage, messages []providers.Message, toolDefs []providers.ToolDefinition, options map[string]interface{}, primaryErr error) (*providers.LLMResponse, string, error) { if len(al.providerNames) <= 1 { return nil, "", primaryErr @@ -452,7 +446,7 @@ func (al *AgentLoop) processInbound(ctx context.Context, msg bus.InboundMessage) started := time.Now() al.appendTaskAuditEvent(taskID, msg, "running", started, 0, "started", false) - response, err := al.processMessage(ctx, msg) + response, err := al.processPlannedMessage(ctx, msg) if err != nil { response = fmt.Sprintf("Error processing message: %v", err) } @@ -676,7 +670,7 @@ func (al *AgentLoop) ProcessDirect(ctx context.Context, content, sessionKey stri SessionKey: sessionKey, } - return al.processMessage(ctx, msg) + return al.processPlannedMessage(ctx, msg) } func (al *AgentLoop) GetSessionHistory(sessionKey string) []providers.Message { @@ -687,8 +681,11 @@ func (al *AgentLoop) processMessage(ctx context.Context, msg bus.InboundMessage) if msg.SessionKey == "" { msg.SessionKey = "main" } - unlock := al.lockSessionRun(msg.SessionKey) - defer unlock() + release, err := al.acquireSessionResources(ctx, &msg) + if err != nil { + return "", err + } + defer release() if len(al.providerNames) > 0 { al.setSessionProvider(msg.SessionKey, al.providerNames[0]) } @@ -1038,6 +1035,36 @@ func (al *AgentLoop) processMessage(ctx context.Context, msg bus.InboundMessage) return userContent, nil } +func (al *AgentLoop) acquireSessionResources(ctx context.Context, msg *bus.InboundMessage) (func(), error) { + if al == nil || msg == nil || al.sessionScheduler == nil { + return func() {}, nil + } + keys, cleaned := al.resolveMessageResourceKeys(msg) + msg.Content = cleaned + return al.sessionScheduler.Acquire(ctx, msg.SessionKey, keys) +} + +func (al *AgentLoop) resolveMessageResourceKeys(msg *bus.InboundMessage) ([]string, string) { + if msg == nil { + return nil, "" + } + content := msg.Content + if msg.Metadata != nil { + if raw := strings.TrimSpace(msg.Metadata["resource_keys"]); raw != "" { + if explicit := scheduling.ParseResourceKeyList(raw); len(explicit) > 0 { + return explicit, content + } + } + } + if explicit, cleaned, ok := scheduling.ExtractResourceKeysDirective(content); ok { + if strings.TrimSpace(cleaned) != "" { + content = cleaned + } + return explicit, content + } + return scheduling.DeriveResourceKeys(content), content +} + func (al *AgentLoop) appendDailySummaryLog(msg bus.InboundMessage, response string) { if strings.TrimSpace(al.workspace) == "" { return diff --git a/pkg/agent/loop_session_regression_test.go b/pkg/agent/loop_session_regression_test.go index 21e025f..294ef27 100644 --- a/pkg/agent/loop_session_regression_test.go +++ b/pkg/agent/loop_session_regression_test.go @@ -138,3 +138,34 @@ func summarizeUsers(msgs []providers.Message) []string { } return out } + +func TestResolveMessageResourceKeys_FromMetadata(t *testing.T) { + loop := &AgentLoop{} + msg := &bus.InboundMessage{ + Content: "do task", + Metadata: map[string]string{ + "resource_keys": "repo:acme/app,file:pkg/a.go", + }, + } + keys, cleaned := loop.resolveMessageResourceKeys(msg) + if cleaned != "do task" { + t.Fatalf("unexpected cleaned content: %q", cleaned) + } + if len(keys) != 2 { + t.Fatalf("unexpected keys: %#v", keys) + } +} + +func TestResolveMessageResourceKeys_FromContentDirective(t *testing.T) { + loop := &AgentLoop{} + msg := &bus.InboundMessage{ + Content: "[resource_keys: repo:acme/app,file:pkg/a.go]\nplease fix", + } + keys, cleaned := loop.resolveMessageResourceKeys(msg) + if len(keys) != 2 { + t.Fatalf("unexpected keys: %#v", keys) + } + if cleaned != "please fix" { + t.Fatalf("unexpected cleaned content: %q", cleaned) + } +} diff --git a/pkg/agent/session_planner.go b/pkg/agent/session_planner.go new file mode 100644 index 0000000..f76a419 --- /dev/null +++ b/pkg/agent/session_planner.go @@ -0,0 +1,182 @@ +package agent + +import ( + "context" + "fmt" + "regexp" + "sort" + "strings" + "sync" + + "clawgo/pkg/bus" + "clawgo/pkg/scheduling" +) + +const defaultSessionAutoPlanMaxTasks = 4 + +type plannedTask struct { + Index int + Content string + ResourceKeys []string +} + +type plannedTaskResult struct { + Index int + Task plannedTask + Output string + ErrText string +} + +var reLeadingNumber = regexp.MustCompile(`^\d+[\.)、]\s*`) + +func (al *AgentLoop) processPlannedMessage(ctx context.Context, msg bus.InboundMessage) (string, error) { + tasks := al.planSessionTasks(msg) + if len(tasks) <= 1 { + return al.processMessage(ctx, msg) + } + return al.runPlannedTasks(ctx, msg, tasks) +} + +func (al *AgentLoop) planSessionTasks(msg bus.InboundMessage) []plannedTask { + base := strings.TrimSpace(msg.Content) + if base == "" { + return nil + } + if !al.sessionAutoPlan { + return []plannedTask{{Index: 1, Content: base, ResourceKeys: scheduling.DeriveResourceKeys(base)}} + } + if msg.Channel == "system" || msg.Channel == "internal" { + return []plannedTask{{Index: 1, Content: base, ResourceKeys: scheduling.DeriveResourceKeys(base)}} + } + if msg.Metadata != nil { + if strings.TrimSpace(msg.Metadata["trigger"]) != "" { + return []plannedTask{{Index: 1, Content: base, ResourceKeys: scheduling.DeriveResourceKeys(base)}} + } + } + if strings.HasPrefix(base, "/") { + return []plannedTask{{Index: 1, Content: base, ResourceKeys: scheduling.DeriveResourceKeys(base)}} + } + + segments := splitPlannedSegments(base) + if len(segments) <= 1 { + return []plannedTask{{Index: 1, Content: base, ResourceKeys: scheduling.DeriveResourceKeys(base)}} + } + + maxTasks := al.sessionAutoPlanMax + if maxTasks <= 0 { + maxTasks = defaultSessionAutoPlanMaxTasks + } + if len(segments) > maxTasks { + segments = segments[:maxTasks] + } + + out := make([]plannedTask, 0, len(segments)) + for i, seg := range segments { + content := strings.TrimSpace(seg) + if content == "" { + continue + } + out = append(out, plannedTask{ + Index: i + 1, + Content: content, + ResourceKeys: scheduling.DeriveResourceKeys(content), + }) + } + if len(out) == 0 { + return []plannedTask{{Index: 1, Content: base, ResourceKeys: scheduling.DeriveResourceKeys(base)}} + } + if len(out) == 1 { + out[0].Content = base + out[0].ResourceKeys = scheduling.DeriveResourceKeys(base) + } + return out +} + +func splitPlannedSegments(content string) []string { + lines := strings.Split(content, "\n") + bullet := make([]string, 0, len(lines)) + for _, line := range lines { + t := strings.TrimSpace(line) + if t == "" { + continue + } + if strings.HasPrefix(t, "- ") || strings.HasPrefix(t, "* ") { + bullet = append(bullet, strings.TrimSpace(t[2:])) + continue + } + if reLeadingNumber.MatchString(t) { + bullet = append(bullet, strings.TrimSpace(reLeadingNumber.ReplaceAllString(t, ""))) + } + } + if len(bullet) >= 2 { + return bullet + } + + replaced := strings.NewReplacer(";", ";", "\n", ";", "。然后", ";", " 然后 ", ";", " and then ", ";") + norm := replaced.Replace(content) + parts := strings.Split(norm, ";") + out := make([]string, 0, len(parts)) + for _, p := range parts { + t := strings.TrimSpace(p) + if t == "" { + continue + } + out = append(out, t) + } + return out +} + +func (al *AgentLoop) runPlannedTasks(ctx context.Context, msg bus.InboundMessage, tasks []plannedTask) (string, error) { + results := make([]plannedTaskResult, len(tasks)) + var wg sync.WaitGroup + for i, task := range tasks { + wg.Add(1) + go func(index int, t plannedTask) { + defer wg.Done() + subMsg := msg + subMsg.Content = t.Content + subMsg.Metadata = cloneMetadata(msg.Metadata) + if subMsg.Metadata == nil { + subMsg.Metadata = map[string]string{} + } + subMsg.Metadata["resource_keys"] = strings.Join(t.ResourceKeys, ",") + subMsg.Metadata["planned_task_index"] = fmt.Sprintf("%d", t.Index) + subMsg.Metadata["planned_task_total"] = fmt.Sprintf("%d", len(tasks)) + out, err := al.processMessage(ctx, subMsg) + res := plannedTaskResult{Index: index, Task: t, Output: strings.TrimSpace(out)} + if err != nil { + res.ErrText = err.Error() + } + results[index] = res + }(i, task) + } + wg.Wait() + + sort.SliceStable(results, func(i, j int) bool { return results[i].Task.Index < results[j].Task.Index }) + var b strings.Builder + b.WriteString(fmt.Sprintf("已自动拆解为 %d 个任务并执行:\n\n", len(results))) + for _, r := range results { + b.WriteString(fmt.Sprintf("[%d] %s\n", r.Task.Index, r.Task.Content)) + if r.ErrText != "" { + b.WriteString("执行失败:" + r.ErrText + "\n\n") + continue + } + if r.Output == "" { + b.WriteString("(无输出)\n\n") + continue + } + b.WriteString(r.Output + "\n\n") + } + return strings.TrimSpace(b.String()), nil +} + +func cloneMetadata(m map[string]string) map[string]string { + if len(m) == 0 { + return nil + } + out := make(map[string]string, len(m)) + for k, v := range m { + out[k] = v + } + return out +} diff --git a/pkg/agent/session_planner_test.go b/pkg/agent/session_planner_test.go new file mode 100644 index 0000000..990d372 --- /dev/null +++ b/pkg/agent/session_planner_test.go @@ -0,0 +1,54 @@ +package agent + +import ( + "context" + "testing" + + "clawgo/pkg/bus" + "clawgo/pkg/providers" +) + +func TestSplitPlannedSegments_Bullets(t *testing.T) { + parts := splitPlannedSegments("- 修复 a.go\n- 补充 b.go 测试") + if len(parts) != 2 { + t.Fatalf("unexpected parts: %#v", parts) + } +} + +func TestPlanSessionTasks_Semicolon(t *testing.T) { + loop := &AgentLoop{sessionAutoPlan: true, sessionAutoPlanMax: 4} + tasks := loop.planSessionTasks(bus.InboundMessage{Channel: "cli", Content: "修复 pkg/a.go;修复 pkg/b.go"}) + if len(tasks) != 2 { + t.Fatalf("expected 2 tasks, got %#v", tasks) + } + if tasks[0].Content == tasks[1].Content { + t.Fatalf("expected distinct tasks: %#v", tasks) + } +} + +func TestProcessPlannedMessage_AggregatesResults(t *testing.T) { + rp := &recordingProvider{responses: []providers.LLMResponse{ + {Content: "done-a", FinishReason: "stop"}, + {Content: "done-b", FinishReason: "stop"}, + }} + loop := setupLoop(t, rp) + loop.sessionAutoPlan = true + loop.sessionAutoPlanMax = 4 + + resp, err := loop.processPlannedMessage(context.Background(), bus.InboundMessage{ + Channel: "cli", + SenderID: "u", + ChatID: "direct", + SessionKey: "sess-plan", + Content: "修复 pkg/a.go;补充 pkg/b.go 测试", + }) + if err != nil { + t.Fatalf("processPlannedMessage error: %v", err) + } + if len(rp.calls) != 2 { + t.Fatalf("expected 2 provider calls, got %d", len(rp.calls)) + } + if resp == "" { + t.Fatalf("expected aggregate response") + } +} diff --git a/pkg/agent/session_scheduler.go b/pkg/agent/session_scheduler.go new file mode 100644 index 0000000..2eb461e --- /dev/null +++ b/pkg/agent/session_scheduler.go @@ -0,0 +1,265 @@ +package agent + +import ( + "context" + "errors" + "strings" + "sync" + "sync/atomic" + + "clawgo/pkg/scheduling" +) + +const ( + defaultSessionMaxParallelRuns = 4 + maxSessionMaxParallelRuns = 16 +) + +var errSessionSchedulerClosed = errors.New("session scheduler closed") + +type sessionWaiter struct { + id uint64 + keys []string + ch chan struct{} +} + +type sessionState struct { + running int + owners map[string]uint64 + inflight map[uint64][]string + waiters []*sessionWaiter +} + +type SessionScheduler struct { + maxParallel int + + mu sync.Mutex + sessions map[string]*sessionState + closed bool + nextID uint64 +} + +func NewSessionScheduler(maxParallel int) *SessionScheduler { + if maxParallel <= 0 { + maxParallel = defaultSessionMaxParallelRuns + } + if maxParallel < 1 { + maxParallel = 1 + } + if maxParallel > maxSessionMaxParallelRuns { + maxParallel = maxSessionMaxParallelRuns + } + return &SessionScheduler{ + maxParallel: maxParallel, + sessions: map[string]*sessionState{}, + } +} + +func (s *SessionScheduler) Acquire(ctx context.Context, sessionKey string, keys []string) (func(), error) { + if s == nil { + return func() {}, nil + } + if ctx == nil { + ctx = context.Background() + } + sessionKey = normalizeSessionKey(sessionKey) + keys = scheduling.NormalizeResourceKeys(keys) + if len(keys) == 0 { + keys = []string{"session:" + sessionKey} + } + + s.mu.Lock() + if s.closed { + s.mu.Unlock() + return nil, errSessionSchedulerClosed + } + st := s.ensureSessionLocked(sessionKey) + runID := atomic.AddUint64(&s.nextID, 1) + if s.canRunLocked(st, keys) { + s.grantLocked(st, runID, keys) + s.mu.Unlock() + return s.releaseFunc(sessionKey, runID), nil + } + + w := &sessionWaiter{id: runID, keys: keys, ch: make(chan struct{}, 1)} + st.waiters = append(st.waiters, w) + s.mu.Unlock() + + for { + select { + case <-ctx.Done(): + s.mu.Lock() + if st = s.sessions[sessionKey]; st != nil { + s.removeWaiterLocked(st, runID) + s.pruneSessionLocked(sessionKey, st) + } + s.mu.Unlock() + return nil, ctx.Err() + case <-w.ch: + s.mu.Lock() + st = s.sessions[sessionKey] + if st != nil { + if _, ok := st.inflight[runID]; ok { + s.mu.Unlock() + return s.releaseFunc(sessionKey, runID), nil + } + } + if s.closed { + s.mu.Unlock() + return nil, errSessionSchedulerClosed + } + s.mu.Unlock() + } + } +} + +func (s *SessionScheduler) Close() { + if s == nil { + return + } + s.mu.Lock() + defer s.mu.Unlock() + if s.closed { + return + } + s.closed = true + for _, st := range s.sessions { + for _, w := range st.waiters { + select { + case w.ch <- struct{}{}: + default: + } + } + } +} + +func (s *SessionScheduler) releaseFunc(sessionKey string, runID uint64) func() { + var once sync.Once + return func() { + once.Do(func() { + s.release(sessionKey, runID) + }) + } +} + +func (s *SessionScheduler) release(sessionKey string, runID uint64) { + if s == nil { + return + } + s.mu.Lock() + defer s.mu.Unlock() + st := s.sessions[sessionKey] + if st == nil { + return + } + keys, ok := st.inflight[runID] + if !ok { + return + } + for _, k := range keys { + delete(st.owners, k) + } + delete(st.inflight, runID) + if st.running > 0 { + st.running-- + } + s.scheduleWaitersLocked(st) + s.pruneSessionLocked(sessionKey, st) +} + +func (s *SessionScheduler) ensureSessionLocked(sessionKey string) *sessionState { + st := s.sessions[sessionKey] + if st != nil { + return st + } + st = &sessionState{ + owners: map[string]uint64{}, + inflight: map[uint64][]string{}, + waiters: make([]*sessionWaiter, 0, 4), + } + s.sessions[sessionKey] = st + return st +} + +func (s *SessionScheduler) canRunLocked(st *sessionState, keys []string) bool { + if st == nil { + return false + } + if st.running >= s.maxParallel { + return false + } + for _, k := range keys { + if _, ok := st.owners[k]; ok { + return false + } + } + return true +} + +func (s *SessionScheduler) grantLocked(st *sessionState, runID uint64, keys []string) { + if st == nil { + return + } + st.running++ + st.inflight[runID] = append([]string(nil), keys...) + for _, k := range keys { + st.owners[k] = runID + } +} + +func (s *SessionScheduler) scheduleWaitersLocked(st *sessionState) { + if st == nil || len(st.waiters) == 0 { + return + } + for { + progress := false + for i := 0; i < len(st.waiters); { + w := st.waiters[i] + if !s.canRunLocked(st, w.keys) { + i++ + continue + } + s.grantLocked(st, w.id, w.keys) + st.waiters = append(st.waiters[:i], st.waiters[i+1:]...) + select { + case w.ch <- struct{}{}: + default: + } + progress = true + } + if !progress { + break + } + } +} + +func (s *SessionScheduler) removeWaiterLocked(st *sessionState, runID uint64) { + if st == nil || len(st.waiters) == 0 { + return + } + for i, w := range st.waiters { + if w.id != runID { + continue + } + st.waiters = append(st.waiters[:i], st.waiters[i+1:]...) + return + } +} + +func (s *SessionScheduler) pruneSessionLocked(sessionKey string, st *sessionState) { + if st == nil { + delete(s.sessions, sessionKey) + return + } + if st.running == 0 && len(st.waiters) == 0 { + delete(s.sessions, sessionKey) + } +} + +func normalizeSessionKey(sessionKey string) string { + sessionKey = strings.TrimSpace(sessionKey) + if sessionKey == "" { + return "default" + } + return sessionKey +} diff --git a/pkg/agent/session_scheduler_test.go b/pkg/agent/session_scheduler_test.go new file mode 100644 index 0000000..acb7a22 --- /dev/null +++ b/pkg/agent/session_scheduler_test.go @@ -0,0 +1,103 @@ +package agent + +import ( + "context" + "testing" + "time" +) + +func TestSessionSchedulerConflictSerializes(t *testing.T) { + s := NewSessionScheduler(4) + release1, err := s.Acquire(context.Background(), "sess-a", []string{"file:a.go"}) + if err != nil { + t.Fatalf("acquire first: %v", err) + } + defer release1() + + acquired2 := make(chan struct{}, 1) + done2 := make(chan struct{}, 1) + go func() { + defer func() { done2 <- struct{}{} }() + release2, err := s.Acquire(context.Background(), "sess-a", []string{"file:a.go"}) + if err != nil { + return + } + acquired2 <- struct{}{} + release2() + }() + + select { + case <-acquired2: + t.Fatalf("second conflicting run should wait") + case <-time.After(80 * time.Millisecond): + } + + release1() + + select { + case <-acquired2: + case <-time.After(time.Second): + t.Fatalf("second run should acquire after release") + } + <-done2 +} + +func TestSessionSchedulerNonConflictingCanRunInParallel(t *testing.T) { + s := NewSessionScheduler(4) + release1, err := s.Acquire(context.Background(), "sess-a", []string{"file:a.go"}) + if err != nil { + t.Fatalf("acquire first: %v", err) + } + defer release1() + + acquired2 := make(chan struct{}, 1) + done2 := make(chan struct{}, 1) + go func() { + defer func() { done2 <- struct{}{} }() + release2, err := s.Acquire(context.Background(), "sess-a", []string{"file:b.go"}) + if err != nil { + return + } + acquired2 <- struct{}{} + release2() + }() + + select { + case <-acquired2: + case <-time.After(time.Second): + t.Fatalf("second non-conflicting run should acquire immediately") + } + <-done2 +} + +func TestSessionSchedulerHonorsSessionMaxParallel(t *testing.T) { + s := NewSessionScheduler(1) + release1, err := s.Acquire(context.Background(), "sess-a", []string{"file:a.go"}) + if err != nil { + t.Fatalf("acquire first: %v", err) + } + defer release1() + + acquired2 := make(chan struct{}, 1) + go func() { + release2, err := s.Acquire(context.Background(), "sess-a", []string{"file:b.go"}) + if err != nil { + return + } + acquired2 <- struct{}{} + release2() + }() + + select { + case <-acquired2: + t.Fatalf("second run should wait when max parallel is 1") + case <-time.After(80 * time.Millisecond): + } + + release1() + select { + case <-acquired2: + case <-time.After(time.Second): + t.Fatalf("second run should continue after first release") + } +} diff --git a/pkg/autonomy/engine.go b/pkg/autonomy/engine.go index 9efcfc2..daac913 100644 --- a/pkg/autonomy/engine.go +++ b/pkg/autonomy/engine.go @@ -18,31 +18,32 @@ import ( "clawgo/pkg/bus" "clawgo/pkg/ekg" "clawgo/pkg/lifecycle" + "clawgo/pkg/scheduling" ) type Options struct { - Enabled bool - TickIntervalSec int - MinRunIntervalSec int - MaxPendingDurationSec int - MaxConsecutiveStalls int - MaxDispatchPerTick int - Workspace string - DefaultNotifyChannel string - DefaultNotifyChatID string - NotifyAllowFrom []string - NotifyCooldownSec int - NotifySameReasonCooldownSec int - QuietHours string - UserIdleResumeSec int - WaitingResumeDebounceSec int - IdleRoundBudgetReleaseSec int - MaxRoundsWithoutUser int - TaskHistoryRetentionDays int - AllowedTaskKeywords []string - ImportantKeywords []string - CompletionTemplate string - BlockedTemplate string + Enabled bool + TickIntervalSec int + MinRunIntervalSec int + MaxPendingDurationSec int + MaxConsecutiveStalls int + MaxDispatchPerTick int + Workspace string + DefaultNotifyChannel string + DefaultNotifyChatID string + NotifyAllowFrom []string + NotifyCooldownSec int + NotifySameReasonCooldownSec int + QuietHours string + UserIdleResumeSec int + WaitingResumeDebounceSec int + IdleRoundBudgetReleaseSec int + MaxRoundsWithoutUser int + TaskHistoryRetentionDays int + AllowedTaskKeywords []string + ImportantKeywords []string + CompletionTemplate string + BlockedTemplate string EKGConsecutiveErrorThreshold int } @@ -61,8 +62,8 @@ type taskState struct { DedupeHits int ResourceKeys []string WaitAttempts int - LastPauseReason string - LastPauseAt time.Time + LastPauseReason string + LastPauseAt time.Time } type Engine struct { @@ -71,14 +72,14 @@ type Engine struct { runner *lifecycle.LoopRunner taskStore *TaskStore - mu sync.Mutex - state map[string]*taskState - lastNotify map[string]time.Time - lockOwners map[string]string - roundsWithoutUser int - lastDailyReportDate string + mu sync.Mutex + state map[string]*taskState + lastNotify map[string]time.Time + lockOwners map[string]string + roundsWithoutUser int + lastDailyReportDate string lastHistoryCleanupAt time.Time - ekg *ekg.Engine + ekg *ekg.Engine } func NewEngine(opts Options, msgBus *bus.MessageBus) *Engine { @@ -482,86 +483,7 @@ func schedulingScore(st *taskState, now time.Time) int { } func deriveResourceKeys(content string) []string { - raw := content - if raw == "" { - return nil - } - if explicit := parseExplicitResourceKeys(raw); len(explicit) > 0 { - return explicit - } - content = strings.ToLower(raw) - keys := make([]string, 0, 8) - hasRepo := false - for _, token := range strings.Fields(content) { - t := strings.Trim(token, "`'\"()[]{}:;,,。!?") - if strings.Contains(t, "gitea.") || strings.Contains(t, "github.com") || strings.Count(t, "/") >= 1 { - if strings.Contains(t, "github.com/") || strings.Contains(t, "gitea.") { - keys = append(keys, "repo:"+t) - hasRepo = true - } - } - if strings.Contains(t, "/") || strings.HasSuffix(t, ".go") || strings.HasSuffix(t, ".md") || strings.HasSuffix(t, ".json") || strings.HasSuffix(t, ".yaml") || strings.HasSuffix(t, ".yml") { - keys = append(keys, "file:"+t) - } - if t == "main" || strings.HasPrefix(t, "branch:") { - keys = append(keys, "branch:"+strings.TrimPrefix(t, "branch:")) - } - } - if !hasRepo { - keys = append(keys, "repo:default") - } - if len(keys) == 0 { - keys = append(keys, "scope:general") - } - return normalizeResourceKeys(keys) -} - -func parseExplicitResourceKeys(content string) []string { - lower := strings.ToLower(content) - start := strings.Index(lower, "[keys:") - if start < 0 { - return nil - } - rest := content[start+6:] - end := strings.Index(rest, "]") - if end < 0 { - return nil - } - body := rest[:end] - if body == "" { - return nil - } - parts := strings.Split(body, ",") - keys := make([]string, 0, len(parts)) - for _, p := range parts { - k := strings.ToLower(strings.TrimSpace(p)) - if k == "" { - continue - } - if !strings.Contains(k, ":") { - k = "file:" + k - } - keys = append(keys, k) - } - return normalizeResourceKeys(keys) -} - -func normalizeResourceKeys(keys []string) []string { - if len(keys) == 0 { - return nil - } - sort.Strings(keys) - uniq := keys[:0] - for _, k := range keys { - k = strings.TrimSpace(strings.ToLower(k)) - if k == "" { - continue - } - if len(uniq) == 0 || k != uniq[len(uniq)-1] { - uniq = append(uniq, k) - } - } - return append([]string(nil), uniq...) + return scheduling.DeriveResourceKeys(content) } type todoItem struct { @@ -1110,7 +1032,11 @@ func (e *Engine) maybeWriteDailyReportLocked(now time.Time) { defer f.Close() counts := map[string]int{"total": 0, "success": 0, "error": 0, "suppressed": 0, "running": 0} errorReasons := map[string]int{} - type topTask struct { TaskID string; Duration int; Status string } + type topTask struct { + TaskID string + Duration int + Status string + } top := make([]topTask, 0, 32) s := bufio.NewScanner(f) for s.Scan() { @@ -1148,7 +1074,9 @@ func (e *Engine) maybeWriteDailyReportLocked(now time.Time) { case int: dur = v case string: - if n, err := strconv.Atoi(v); err == nil { dur = n } + if n, err := strconv.Atoi(v); err == nil { + dur = n + } } top = append(top, topTask{TaskID: fmt.Sprintf("%v", row["task_id"]), Duration: dur, Status: st}) } @@ -1158,18 +1086,29 @@ func (e *Engine) maybeWriteDailyReportLocked(now time.Time) { } sort.Slice(top, func(i, j int) bool { return top[i].Duration > top[j].Duration }) maxTop := 3 - if len(top) < maxTop { maxTop = len(top) } + if len(top) < maxTop { + maxTop = len(top) + } topLines := make([]string, 0, maxTop) for i := 0; i < maxTop; i++ { - if top[i].TaskID == "" { continue } + if top[i].TaskID == "" { + continue + } topLines = append(topLines, fmt.Sprintf("- %s (%dms, %s)", top[i].TaskID, top[i].Duration, top[i].Status)) } - type kv struct { K string; V int } + type kv struct { + K string + V int + } reasons := make([]kv, 0, len(errorReasons)) - for k, v := range errorReasons { reasons = append(reasons, kv{K:k, V:v}) } + for k, v := range errorReasons { + reasons = append(reasons, kv{K: k, V: v}) + } sort.Slice(reasons, func(i, j int) bool { return reasons[i].V > reasons[j].V }) maxR := 3 - if len(reasons) < maxR { maxR = len(reasons) } + if len(reasons) < maxR { + maxR = len(reasons) + } reasonLines := make([]string, 0, maxR) for i := 0; i < maxR; i++ { reasonLines = append(reasonLines, fmt.Sprintf("- %s (x%d)", reasons[i].K, reasons[i].V)) diff --git a/pkg/autonomy/engine_notify_allowlist_test.go b/pkg/autonomy/engine_notify_allowlist_test.go index f4bee8f..e4fbaea 100644 --- a/pkg/autonomy/engine_notify_allowlist_test.go +++ b/pkg/autonomy/engine_notify_allowlist_test.go @@ -1,15 +1,18 @@ package autonomy -import "testing" +import ( + "testing" + "time" +) func TestShouldNotify_RespectsNotifyAllowFrom(t *testing.T) { e := &Engine{opts: Options{ DefaultNotifyChannel: "telegram", DefaultNotifyChatID: "chat-1", - NotifyAllowFrom: []string{"chat-2", "chat-3"}, + NotifyAllowFrom: []string{"chat-2", "chat-3"}, NotifyCooldownSec: 1, NotifySameReasonCooldownSec: 1, - }} + }, lastNotify: map[string]time.Time{}} if e.shouldNotify("k1", "") { t.Fatalf("expected notify to be blocked when chat not in allowlist") } diff --git a/pkg/config/config.go b/pkg/config/config.go index cd42e4e..5bcb56d 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -93,19 +93,23 @@ type HeartbeatConfig struct { } type RuntimeControlConfig struct { - IntentMaxInputChars int `json:"intent_max_input_chars" env:"CLAWGO_INTENT_MAX_INPUT_CHARS"` - AutonomyTickIntervalSec int `json:"autonomy_tick_interval_sec" env:"CLAWGO_AUTONOMY_TICK_INTERVAL_SEC"` - AutonomyMinRunIntervalSec int `json:"autonomy_min_run_interval_sec" env:"CLAWGO_AUTONOMY_MIN_RUN_INTERVAL_SEC"` - AutonomyIdleThresholdSec int `json:"autonomy_idle_threshold_sec" env:"CLAWGO_AUTONOMY_IDLE_THRESHOLD_SEC"` - AutonomyMaxRoundsWithoutUser int `json:"autonomy_max_rounds_without_user" env:"CLAWGO_AUTONOMY_MAX_ROUNDS_WITHOUT_USER"` - AutonomyMaxPendingDurationSec int `json:"autonomy_max_pending_duration_sec" env:"CLAWGO_AUTONOMY_MAX_PENDING_DURATION_SEC"` - AutonomyMaxConsecutiveStalls int `json:"autonomy_max_consecutive_stalls" env:"CLAWGO_AUTONOMY_MAX_STALLS"` - AutoLearnMaxRoundsWithoutUser int `json:"autolearn_max_rounds_without_user" env:"CLAWGO_AUTOLEARN_MAX_ROUNDS_WITHOUT_USER"` - RunStateTTLSeconds int `json:"run_state_ttl_seconds" env:"CLAWGO_RUN_STATE_TTL_SECONDS"` - RunStateMax int `json:"run_state_max" env:"CLAWGO_RUN_STATE_MAX"` - ToolParallelSafeNames []string `json:"tool_parallel_safe_names"` - ToolMaxParallelCalls int `json:"tool_max_parallel_calls"` - SystemSummary SystemSummaryPolicyConfig `json:"system_summary"` + IntentMaxInputChars int `json:"intent_max_input_chars" env:"CLAWGO_INTENT_MAX_INPUT_CHARS"` + AutonomyTickIntervalSec int `json:"autonomy_tick_interval_sec" env:"CLAWGO_AUTONOMY_TICK_INTERVAL_SEC"` + AutonomyMinRunIntervalSec int `json:"autonomy_min_run_interval_sec" env:"CLAWGO_AUTONOMY_MIN_RUN_INTERVAL_SEC"` + AutonomyIdleThresholdSec int `json:"autonomy_idle_threshold_sec" env:"CLAWGO_AUTONOMY_IDLE_THRESHOLD_SEC"` + AutonomyMaxRoundsWithoutUser int `json:"autonomy_max_rounds_without_user" env:"CLAWGO_AUTONOMY_MAX_ROUNDS_WITHOUT_USER"` + AutonomyMaxPendingDurationSec int `json:"autonomy_max_pending_duration_sec" env:"CLAWGO_AUTONOMY_MAX_PENDING_DURATION_SEC"` + AutonomyMaxConsecutiveStalls int `json:"autonomy_max_consecutive_stalls" env:"CLAWGO_AUTONOMY_MAX_STALLS"` + AutoLearnMaxRoundsWithoutUser int `json:"autolearn_max_rounds_without_user" env:"CLAWGO_AUTOLEARN_MAX_ROUNDS_WITHOUT_USER"` + RunStateTTLSeconds int `json:"run_state_ttl_seconds" env:"CLAWGO_RUN_STATE_TTL_SECONDS"` + RunStateMax int `json:"run_state_max" env:"CLAWGO_RUN_STATE_MAX"` + ToolParallelSafeNames []string `json:"tool_parallel_safe_names"` + ToolMaxParallelCalls int `json:"tool_max_parallel_calls"` + SessionResourceSchedulingEnabled bool `json:"session_resource_scheduling_enabled" env:"CLAWGO_SESSION_RESOURCE_SCHEDULING_ENABLED"` + SessionMaxParallelRuns int `json:"session_max_parallel_runs" env:"CLAWGO_SESSION_MAX_PARALLEL_RUNS"` + SessionAutoPlanEnabled bool `json:"session_auto_plan_enabled" env:"CLAWGO_SESSION_AUTO_PLAN_ENABLED"` + SessionAutoPlanMaxTasks int `json:"session_auto_plan_max_tasks" env:"CLAWGO_SESSION_AUTO_PLAN_MAX_TASKS"` + SystemSummary SystemSummaryPolicyConfig `json:"system_summary"` } type SystemSummaryPolicyConfig struct { @@ -419,18 +423,22 @@ func DefaultConfig() *Config { MaxTranscriptChars: 20000, }, RuntimeControl: RuntimeControlConfig{ - IntentMaxInputChars: 1200, - AutonomyTickIntervalSec: 20, - AutonomyMinRunIntervalSec: 20, - AutonomyIdleThresholdSec: 20, - AutonomyMaxRoundsWithoutUser: 120, - AutonomyMaxPendingDurationSec: 180, - AutonomyMaxConsecutiveStalls: 3, - AutoLearnMaxRoundsWithoutUser: 200, - RunStateTTLSeconds: 1800, - RunStateMax: 500, - ToolParallelSafeNames: []string{"read_file", "list_files", "find_files", "grep_files", "memory_search", "web_search", "repo_map", "system_info"}, - ToolMaxParallelCalls: 2, + IntentMaxInputChars: 1200, + AutonomyTickIntervalSec: 20, + AutonomyMinRunIntervalSec: 20, + AutonomyIdleThresholdSec: 20, + AutonomyMaxRoundsWithoutUser: 120, + AutonomyMaxPendingDurationSec: 180, + AutonomyMaxConsecutiveStalls: 3, + AutoLearnMaxRoundsWithoutUser: 200, + RunStateTTLSeconds: 1800, + RunStateMax: 500, + ToolParallelSafeNames: []string{"read_file", "list_files", "find_files", "grep_files", "memory_search", "web_search", "repo_map", "system_info"}, + ToolMaxParallelCalls: 2, + SessionResourceSchedulingEnabled: true, + SessionMaxParallelRuns: 4, + SessionAutoPlanEnabled: true, + SessionAutoPlanMaxTasks: 4, SystemSummary: SystemSummaryPolicyConfig{ Marker: "## System Task Summary", CompletedPrefix: "- Completed:", diff --git a/pkg/config/validate.go b/pkg/config/validate.go index 0825089..b5bbf87 100644 --- a/pkg/config/validate.go +++ b/pkg/config/validate.go @@ -52,6 +52,12 @@ func Validate(cfg *Config) []error { if rc.ToolMaxParallelCalls <= 0 { errs = append(errs, fmt.Errorf("agents.defaults.runtime_control.tool_max_parallel_calls must be > 0")) } + if rc.SessionMaxParallelRuns <= 0 { + errs = append(errs, fmt.Errorf("agents.defaults.runtime_control.session_max_parallel_runs must be > 0")) + } + if rc.SessionAutoPlanMaxTasks <= 0 { + errs = append(errs, fmt.Errorf("agents.defaults.runtime_control.session_auto_plan_max_tasks must be > 0")) + } if strings.TrimSpace(rc.SystemSummary.Marker) == "" { errs = append(errs, fmt.Errorf("agents.defaults.runtime_control.system_summary.marker must be non-empty")) } diff --git a/pkg/scheduling/resource_keys.go b/pkg/scheduling/resource_keys.go new file mode 100644 index 0000000..0bb294f --- /dev/null +++ b/pkg/scheduling/resource_keys.go @@ -0,0 +1,166 @@ +package scheduling + +import ( + "regexp" + "strings" +) + +var ( + reBracketResourceKeys = regexp.MustCompile(`(?i)\[\s*resource[_\s-]*keys?\s*:\s*([^\]]+)\]`) + reBracketKeys = regexp.MustCompile(`(?i)\[\s*keys?\s*:\s*([^\]]+)\]`) + reLineResourceKeys = regexp.MustCompile(`(?i)^\s*resource[_\s-]*keys?\s*[:=]\s*(.+)$`) +) + +// DeriveResourceKeys derives lock keys from content. +// Explicit directives win; otherwise lightweight heuristic extraction is used. +func DeriveResourceKeys(content string) []string { + raw := strings.TrimSpace(content) + if raw == "" { + return nil + } + if explicit := ParseExplicitResourceKeys(raw); len(explicit) > 0 { + return explicit + } + + lower := strings.ToLower(raw) + keys := make([]string, 0, 8) + hasRepo := false + for _, token := range strings.Fields(lower) { + t := strings.Trim(token, "`'\"()[]{}:;,,。!?") + if t == "" { + continue + } + if strings.Contains(t, "gitea.") || strings.Contains(t, "github.com") || strings.Count(t, "/") >= 1 { + if strings.Contains(t, "github.com/") || strings.Contains(t, "gitea.") { + keys = append(keys, "repo:"+t) + hasRepo = true + } + } + if strings.Contains(t, "/") || strings.HasSuffix(t, ".go") || strings.HasSuffix(t, ".md") || strings.HasSuffix(t, ".json") || strings.HasSuffix(t, ".yaml") || strings.HasSuffix(t, ".yml") { + keys = append(keys, "file:"+t) + } + if t == "main" || strings.HasPrefix(t, "branch:") { + keys = append(keys, "branch:"+strings.TrimPrefix(t, "branch:")) + } + } + if !hasRepo { + keys = append(keys, "repo:default") + } + if len(keys) == 0 { + keys = append(keys, "scope:general") + } + return NormalizeResourceKeys(keys) +} + +// ParseExplicitResourceKeys parses directive-style keys from content. +func ParseExplicitResourceKeys(content string) []string { + raw := strings.TrimSpace(content) + if raw == "" { + return nil + } + if m := reBracketResourceKeys.FindStringSubmatch(raw); len(m) == 2 { + return ParseResourceKeyList(m[1]) + } + if m := reBracketKeys.FindStringSubmatch(raw); len(m) == 2 { + return ParseResourceKeyList(m[1]) + } + for _, line := range strings.Split(raw, "\n") { + m := reLineResourceKeys.FindStringSubmatch(strings.TrimSpace(line)) + if len(m) == 2 { + return ParseResourceKeyList(m[1]) + } + } + return nil +} + +// ExtractResourceKeysDirective returns explicit keys and content without directive text. +func ExtractResourceKeysDirective(content string) (keys []string, cleaned string, found bool) { + raw := strings.TrimSpace(content) + if raw == "" { + return nil, "", false + } + if m := reBracketResourceKeys.FindStringSubmatch(raw); len(m) == 2 { + keys = ParseResourceKeyList(m[1]) + if len(keys) == 0 { + return nil, raw, false + } + cleaned = strings.TrimSpace(reBracketResourceKeys.ReplaceAllString(raw, "")) + return keys, cleaned, true + } + if m := reBracketKeys.FindStringSubmatch(raw); len(m) == 2 { + keys = ParseResourceKeyList(m[1]) + if len(keys) == 0 { + return nil, raw, false + } + cleaned = strings.TrimSpace(reBracketKeys.ReplaceAllString(raw, "")) + return keys, cleaned, true + } + lines := strings.Split(raw, "\n") + for i, line := range lines { + m := reLineResourceKeys.FindStringSubmatch(strings.TrimSpace(line)) + if len(m) != 2 { + continue + } + keys = ParseResourceKeyList(m[1]) + if len(keys) == 0 { + break + } + trimmed := make([]string, 0, len(lines)-1) + trimmed = append(trimmed, lines[:i]...) + trimmed = append(trimmed, lines[i+1:]...) + cleaned = strings.TrimSpace(strings.Join(trimmed, "\n")) + return keys, cleaned, true + } + return nil, raw, false +} + +// ParseResourceKeyList parses comma/newline/space separated keys. +func ParseResourceKeyList(raw string) []string { + if strings.TrimSpace(raw) == "" { + return nil + } + replacer := strings.NewReplacer("\n", ",", ";", ",", ";", ",", ",", ",") + normalized := replacer.Replace(raw) + parts := strings.Split(normalized, ",") + keys := make([]string, 0, len(parts)) + for _, p := range parts { + k := strings.TrimSpace(strings.Trim(p, "`'\"")) + if k == "" { + continue + } + if !strings.Contains(k, ":") { + k = "file:" + k + } + keys = append(keys, k) + } + if len(keys) == 0 { + for _, p := range strings.Fields(raw) { + if !strings.Contains(p, ":") { + p = "file:" + p + } + keys = append(keys, p) + } + } + return NormalizeResourceKeys(keys) +} + +// NormalizeResourceKeys lowercases, trims and deduplicates keys. +func NormalizeResourceKeys(keys []string) []string { + if len(keys) == 0 { + return nil + } + out := make([]string, 0, len(keys)) + seen := make(map[string]struct{}, len(keys)) + for _, k := range keys { + n := strings.ToLower(strings.TrimSpace(k)) + if n == "" { + continue + } + if _, ok := seen[n]; ok { + continue + } + seen[n] = struct{}{} + out = append(out, n) + } + return out +} diff --git a/pkg/scheduling/resource_keys_test.go b/pkg/scheduling/resource_keys_test.go new file mode 100644 index 0000000..1e516b5 --- /dev/null +++ b/pkg/scheduling/resource_keys_test.go @@ -0,0 +1,49 @@ +package scheduling + +import "testing" + +func TestExtractResourceKeysDirective(t *testing.T) { + keys, cleaned, ok := ExtractResourceKeysDirective("[resource_keys: repo:acme/app,file:pkg/a.go]\nplease check") + if !ok { + t.Fatalf("expected directive") + } + if len(keys) != 2 || keys[0] != "repo:acme/app" || keys[1] != "file:pkg/a.go" { + t.Fatalf("unexpected keys: %#v", keys) + } + if cleaned != "please check" { + t.Fatalf("unexpected cleaned content: %q", cleaned) + } +} + +func TestDeriveResourceKeysHeuristic(t *testing.T) { + keys := DeriveResourceKeys("update pkg/agent/loop.go on main") + if len(keys) == 0 { + t.Fatalf("expected non-empty keys") + } + foundFile := false + foundBranch := false + for _, k := range keys { + if k == "file:pkg/agent/loop.go" { + foundFile = true + } + if k == "branch:main" { + foundBranch = true + } + } + if !foundFile { + t.Fatalf("expected file key in %#v", keys) + } + if !foundBranch { + t.Fatalf("expected branch key in %#v", keys) + } +} + +func TestParseResourceKeyListAddsFilePrefix(t *testing.T) { + keys := ParseResourceKeyList("pkg/a.go, repo:acme/app") + if len(keys) != 2 { + t.Fatalf("unexpected len: %#v", keys) + } + if keys[0] != "file:pkg/a.go" && keys[1] != "file:pkg/a.go" { + t.Fatalf("expected file-prefixed key in %#v", keys) + } +} diff --git a/webui/src/i18n/index.ts b/webui/src/i18n/index.ts index cdc59d3..ee55420 100644 --- a/webui/src/i18n/index.ts +++ b/webui/src/i18n/index.ts @@ -374,6 +374,10 @@ const resources = { run_state_max: 'Run State Max', tool_parallel_safe_names: 'Tool Parallel Safe Names', tool_max_parallel_calls: 'Tool Max Parallel Calls', + session_resource_scheduling_enabled: 'Session Resource Scheduling Enabled', + session_max_parallel_runs: 'Session Max Parallel Runs', + session_auto_plan_enabled: 'Session Auto Plan Enabled', + session_auto_plan_max_tasks: 'Session Auto Plan Max Tasks', system_summary: 'System Summary', marker: 'Summary Marker', completed_prefix: 'Completed Prefix', @@ -796,6 +800,10 @@ const resources = { run_state_max: '运行状态上限', tool_parallel_safe_names: '工具并行安全名单', tool_max_parallel_calls: '工具最大并行调用数', + session_resource_scheduling_enabled: '会话资源调度开关', + session_max_parallel_runs: '会话最大并行数', + session_auto_plan_enabled: '会话自动拆解开关', + session_auto_plan_max_tasks: '会话自动拆解最大任务数', system_summary: '系统摘要', marker: '摘要标记', completed_prefix: '完成前缀',