diff --git a/cmd/clawgo/cmd_gateway.go b/cmd/clawgo/cmd_gateway.go index 5381266..e27d0ce 100644 --- a/cmd/clawgo/cmd_gateway.go +++ b/cmd/clawgo/cmd_gateway.go @@ -955,10 +955,6 @@ func buildHeartbeatService(cfg *config.Config, msgBus *bus.MessageBus) *heartbea func buildAutonomyEngine(cfg *config.Config, msgBus *bus.MessageBus) *autonomy.Engine { a := cfg.Agents.Defaults.Autonomy - maxRoundsWithoutUser := a.MaxRoundsWithoutUser - if maxRoundsWithoutUser == 0 && cfg.Agents.Defaults.RuntimeControl.AutonomyMaxRoundsWithoutUser > 0 { - maxRoundsWithoutUser = cfg.Agents.Defaults.RuntimeControl.AutonomyMaxRoundsWithoutUser - } idleRoundBudgetReleaseSec := a.IdleRoundBudgetReleaseSec if idleRoundBudgetReleaseSec == 0 { idleRoundBudgetReleaseSec = 1800 @@ -990,7 +986,7 @@ func buildAutonomyEngine(cfg *config.Config, msgBus *bus.MessageBus) *autonomy.E NotifySameReasonCooldownSec: a.NotifySameReasonCooldownSec, QuietHours: a.QuietHours, UserIdleResumeSec: a.UserIdleResumeSec, - MaxRoundsWithoutUser: maxRoundsWithoutUser, + MaxRoundsWithoutUser: a.MaxRoundsWithoutUser, TaskHistoryRetentionDays: a.TaskHistoryRetentionDays, WaitingResumeDebounceSec: a.WaitingResumeDebounceSec, IdleRoundBudgetReleaseSec: idleRoundBudgetReleaseSec, diff --git a/config.example.json b/config.example.json index 77da77a..4705c70 100644 --- a/config.example.json +++ b/config.example.json @@ -17,14 +17,14 @@ "enabled": false, "tick_interval_sec": 30, "min_run_interval_sec": 20, - "max_pending_duration_sec": 180, + "max_pending_duration_sec": 900, "max_consecutive_stalls": 3, - "max_dispatch_per_tick": 2, + "max_dispatch_per_tick": 0, "notify_cooldown_sec": 300, "notify_same_reason_cooldown_sec": 900, "quiet_hours": "23:00-08:00", "user_idle_resume_sec": 20, - "max_rounds_without_user": 12, + "max_rounds_without_user": 0, "task_history_retention_days": 3, "waiting_resume_debounce_sec": 5, "idle_round_budget_release_sec": 1800, @@ -61,8 +61,8 @@ "autonomy_tick_interval_sec": 20, 
"autonomy_min_run_interval_sec": 20, "autonomy_idle_threshold_sec": 20, - "autonomy_max_rounds_without_user": 120, - "autonomy_max_pending_duration_sec": 180, + "autonomy_max_rounds_without_user": 0, + "autonomy_max_pending_duration_sec": 900, "autonomy_max_consecutive_stalls": 3, "autolearn_max_rounds_without_user": 200, "run_state_ttl_seconds": 1800, diff --git a/pkg/agent/loop.go b/pkg/agent/loop.go index 51b8408..bcaf5eb 100644 --- a/pkg/agent/loop.go +++ b/pkg/agent/loop.go @@ -734,23 +734,6 @@ func (al *AgentLoop) processMessage(ctx context.Context, msg bus.InboundMessage) return fmt.Sprintf(tpl, lang), nil } - // Update tool contexts - if tool, ok := al.tools.Get("message"); ok { - if mt, ok := tool.(*tools.MessageTool); ok { - mt.SetContext(msg.Channel, msg.ChatID) - } - } - if tool, ok := al.tools.Get("spawn"); ok { - if st, ok := tool.(*tools.SpawnTool); ok { - st.SetContext(msg.Channel, msg.ChatID) - } - } - if tool, ok := al.tools.Get("remind"); ok { - if rt, ok := tool.(*tools.RemindTool); ok { - rt.SetContext(msg.Channel, msg.ChatID) - } - } - history := al.sessions.GetHistory(msg.SessionKey) summary := al.sessions.GetSummary(msg.SessionKey) memoryRecallUsed := false @@ -948,7 +931,8 @@ func (al *AgentLoop) processMessage(ctx context.Context, msg bus.InboundMessage) "iteration": iteration, }) - result, err := al.tools.Execute(ctx, tc.Name, tc.Arguments) + execArgs := withToolContextArgs(tc.Name, tc.Arguments, msg.Channel, msg.ChatID) + result, err := al.tools.Execute(ctx, tc.Name, execArgs) if err != nil { result = fmt.Sprintf("Error: %v", err) } @@ -1168,18 +1152,6 @@ func (al *AgentLoop) processSystemMessage(ctx context.Context, msg bus.InboundMe // Use the origin session for context sessionKey := fmt.Sprintf("%s:%s", originChannel, originChatID) - // Update tool contexts to original channel/chatID - if tool, ok := al.tools.Get("message"); ok { - if mt, ok := tool.(*tools.MessageTool); ok { - mt.SetContext(originChannel, originChatID) - } - } - if 
// withToolContextArgs returns the argument map a tool call should be executed
// with, filling in the current conversation's routing information.
//
// Only the "message", "spawn" and "remind" tools are touched, and only when
// both channel and chatID are non-empty; in every other case args is returned
// as-is. When defaults are added the caller's map is copied first, so args is
// never mutated.
//
// Keys already present in args always win. For the "message" tool an explicit
// "to" also counts as a target, so no "chat_id" is added alongside it.
func withToolContextArgs(toolName string, args map[string]interface{}, channel, chatID string) map[string]interface{} {
	contextAware := toolName == "message" || toolName == "spawn" || toolName == "remind"
	if !contextAware || channel == "" || chatID == "" {
		return args
	}

	merged := make(map[string]interface{}, len(args)+2)
	for key, value := range args {
		merged[key] = value
	}

	if _, present := merged["channel"]; !present {
		merged["channel"] = channel
	}

	_, hasChatID := merged["chat_id"]
	_, hasTo := merged["to"]
	// Only "message" understands "to" as an alternative way to name the target.
	explicitTarget := hasChatID || (toolName == "message" && hasTo)
	if !explicitTarget {
		merged["chat_id"] = chatID
	}
	return merged
}
TestWithToolContextArgsInjectsDefaults(t *testing.T) { + args := map[string]interface{}{"message": "hello"} + got := withToolContextArgs("message", args, "telegram", "chat-1") + if got["channel"] != "telegram" { + t.Fatalf("expected channel injected, got %v", got["channel"]) + } + if got["chat_id"] != "chat-1" { + t.Fatalf("expected chat_id injected, got %v", got["chat_id"]) + } +} + +func TestWithToolContextArgsPreservesExplicitTarget(t *testing.T) { + args := map[string]interface{}{"message": "hello", "to": "target-2"} + got := withToolContextArgs("message", args, "telegram", "chat-1") + if _, ok := got["chat_id"]; ok { + t.Fatalf("chat_id should not be injected when 'to' is provided") + } + if got["to"] != "target-2" { + t.Fatalf("expected to preserved, got %v", got["to"]) + } +} + +func TestWithToolContextArgsSkipsUnrelatedTools(t *testing.T) { + args := map[string]interface{}{"query": "x"} + got := withToolContextArgs("memory_search", args, "telegram", "chat-1") + if len(got) != len(args) { + t.Fatalf("expected unchanged args for unrelated tool") + } + if _, ok := got["channel"]; ok { + t.Fatalf("unexpected channel key for unrelated tool") + } +} diff --git a/pkg/agent/session_planner.go b/pkg/agent/session_planner.go index 74d2cbf..f238436 100644 --- a/pkg/agent/session_planner.go +++ b/pkg/agent/session_planner.go @@ -8,7 +8,6 @@ import ( "os" "path/filepath" "regexp" - "sort" "strings" "sync" @@ -140,11 +139,10 @@ func (al *AgentLoop) runPlannedTasks(ctx context.Context, msg bus.InboundMessage res.ErrText = err.Error() } results[index] = res + al.publishPlannedTaskProgress(msg, len(tasks), res) }(i, task) } wg.Wait() - - sort.SliceStable(results, func(i, j int) bool { return results[i].Task.Index < results[j].Task.Index }) var b strings.Builder b.WriteString(fmt.Sprintf("已自动拆解为 %d 个任务并执行:\n\n", len(results))) for _, r := range results { @@ -162,6 +160,35 @@ func (al *AgentLoop) runPlannedTasks(ctx context.Context, msg bus.InboundMessage return 
strings.TrimSpace(b.String()), nil } +func (al *AgentLoop) publishPlannedTaskProgress(msg bus.InboundMessage, total int, res plannedTaskResult) { + if al == nil || al.bus == nil || total <= 1 { + return + } + if msg.Channel == "system" || msg.Channel == "internal" { + return + } + idx := res.Task.Index + if idx <= 0 { + idx = res.Index + 1 + } + status := "完成" + body := strings.TrimSpace(res.Output) + if res.ErrText != "" { + status = "失败" + body = strings.TrimSpace(res.ErrText) + } + if body == "" { + body = "(无输出)" + } + body = truncate(strings.ReplaceAll(body, "\n", " "), 280) + content := fmt.Sprintf("进度 %d/%d:任务%d已%s\n%s", idx, total, idx, status, body) + al.bus.PublishOutbound(bus.OutboundMessage{ + Channel: msg.Channel, + ChatID: msg.ChatID, + Content: content, + }) +} + func (al *AgentLoop) enrichTaskContentWithMemoryAndEKG(ctx context.Context, task plannedTask) string { base := strings.TrimSpace(task.Content) if base == "" { diff --git a/pkg/agent/session_planner_test.go b/pkg/agent/session_planner_test.go index 11059be..f6d2d36 100644 --- a/pkg/agent/session_planner_test.go +++ b/pkg/agent/session_planner_test.go @@ -4,9 +4,14 @@ import ( "context" "os" "path/filepath" + "strconv" + "strings" + "sync" "testing" + "time" "clawgo/pkg/bus" + "clawgo/pkg/config" "clawgo/pkg/ekg" "clawgo/pkg/providers" ) @@ -54,6 +59,115 @@ func TestProcessPlannedMessage_AggregatesResults(t *testing.T) { } } +type probeProvider struct { + mu sync.Mutex + inFlight int + maxInFlight int + delayPerCall time.Duration + responseCount int +} + +func (p *probeProvider) Chat(_ context.Context, _ []providers.Message, _ []providers.ToolDefinition, _ string, _ map[string]interface{}) (*providers.LLMResponse, error) { + p.mu.Lock() + p.inFlight++ + if p.inFlight > p.maxInFlight { + p.maxInFlight = p.inFlight + } + p.responseCount++ + p.mu.Unlock() + + time.Sleep(p.delayPerCall) + + p.mu.Lock() + n := p.responseCount + p.inFlight-- + p.mu.Unlock() + resp := providers.LLMResponse{Content: 
"done-" + strconv.Itoa(n), FinishReason: "stop"} + return &resp, nil +} + +func (p *probeProvider) GetDefaultModel() string { return "test-model" } + +func TestRunPlannedTasks_NonConflictingKeysCanRunInParallel(t *testing.T) { + p := &probeProvider{delayPerCall: 100 * time.Millisecond} + cfg := config.DefaultConfig() + cfg.Agents.Defaults.Workspace = filepath.Join(t.TempDir(), "workspace") + cfg.Agents.Defaults.MaxToolIterations = 2 + cfg.Agents.Defaults.ContextCompaction.Enabled = false + loop := NewAgentLoop(cfg, bus.NewMessageBus(), p, nil) + + _, err := loop.processPlannedMessage(context.Background(), bus.InboundMessage{ + Channel: "cli", + SenderID: "u", + ChatID: "direct", + SessionKey: "sess-plan-parallel", + Content: "[resource_keys: file:pkg/a.go] 修复 a;[resource_keys: file:pkg/b.go] 修复 b", + }) + if err != nil { + t.Fatalf("processPlannedMessage error: %v", err) + } + if p.maxInFlight < 2 { + t.Fatalf("expected parallel execution for non-conflicting keys, got maxInFlight=%d", p.maxInFlight) + } +} + +func TestRunPlannedTasks_ConflictingKeysMutuallyExclusive(t *testing.T) { + p := &probeProvider{delayPerCall: 100 * time.Millisecond} + cfg := config.DefaultConfig() + cfg.Agents.Defaults.Workspace = filepath.Join(t.TempDir(), "workspace") + cfg.Agents.Defaults.MaxToolIterations = 2 + cfg.Agents.Defaults.ContextCompaction.Enabled = false + loop := NewAgentLoop(cfg, bus.NewMessageBus(), p, nil) + + _, err := loop.processPlannedMessage(context.Background(), bus.InboundMessage{ + Channel: "cli", + SenderID: "u", + ChatID: "direct", + SessionKey: "sess-plan-locked", + Content: "[resource_keys: file:pkg/a.go] 修复 a;[resource_keys: file:pkg/a.go] 补测试", + }) + if err != nil { + t.Fatalf("processPlannedMessage error: %v", err) + } + if p.maxInFlight != 1 { + t.Fatalf("expected mutual exclusion for conflicting keys, got maxInFlight=%d", p.maxInFlight) + } +} + +func TestRunPlannedTasks_PublishesStepProgress(t *testing.T) { + rp := &recordingProvider{responses: 
[]providers.LLMResponse{ + {Content: "done-a", FinishReason: "stop"}, + {Content: "done-b", FinishReason: "stop"}, + }} + loop := setupLoop(t, rp) + + _, err := loop.processPlannedMessage(context.Background(), bus.InboundMessage{ + Channel: "cli", + SenderID: "u", + ChatID: "direct", + SessionKey: "sess-plan-progress", + Content: "修复 pkg/a.go;补充 pkg/b.go 测试", + }) + if err != nil { + t.Fatalf("processPlannedMessage error: %v", err) + } + + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + out1, ok := loop.bus.SubscribeOutbound(ctx) + if !ok { + t.Fatalf("expected first progress outbound") + } + out2, ok := loop.bus.SubscribeOutbound(ctx) + if !ok { + t.Fatalf("expected second progress outbound") + } + all := out1.Content + "\n" + out2.Content + if !strings.Contains(all, "进度 1/2") || !strings.Contains(all, "进度 2/2") { + t.Fatalf("unexpected progress outputs:\n%s", all) + } +} + func TestFindRecentRelatedErrorEvent(t *testing.T) { ws := filepath.Join(t.TempDir(), "workspace") _ = os.MkdirAll(filepath.Join(ws, "memory"), 0o755) diff --git a/pkg/autonomy/engine.go b/pkg/autonomy/engine.go index daac913..5adfa7a 100644 --- a/pkg/autonomy/engine.go +++ b/pkg/autonomy/engine.go @@ -95,7 +95,8 @@ func NewEngine(opts Options, msgBus *bus.MessageBus) *Engine { if opts.MaxConsecutiveStalls <= 0 { opts.MaxConsecutiveStalls = 3 } - if opts.MaxDispatchPerTick <= 0 { + // max_dispatch_per_tick == 0 means "unlimited dispatch per tick"; negative values fall back to the default of 2.
+ if opts.MaxDispatchPerTick < 0 { opts.MaxDispatchPerTick = 2 } if opts.NotifyCooldownSec <= 0 { @@ -296,7 +297,7 @@ func (e *Engine) tick() { dispatched := 0 for _, st := range ordered { - if dispatched >= e.opts.MaxDispatchPerTick { + if e.opts.MaxDispatchPerTick > 0 && dispatched >= e.opts.MaxDispatchPerTick { break } if st.Status == "completed" { @@ -601,9 +602,6 @@ func (e *Engine) dispatchTask(st *taskState) { func (e *Engine) sendCompletionNotification(st *taskState) { e.writeReflectLog("complete", st, "task marked completed") e.writeTriggerAudit("complete", st, "") - if !e.isHighValueCompletion(st) { - return - } if !e.shouldNotify("done:"+st.ID, "") { return } diff --git a/pkg/channels/feishu.go b/pkg/channels/feishu.go index d9f8af0..5339a58 100644 --- a/pkg/channels/feishu.go +++ b/pkg/channels/feishu.go @@ -948,26 +948,26 @@ func extractFeishuMessageContent(message *larkim.EventMessage) (string, []string } switch msgType { - case string(larkim.MsgTypeText): + case larkim.MsgTypeText: var textPayload struct { Text string `json:"text"` } if err := json.Unmarshal([]byte(raw), &textPayload); err == nil { return textPayload.Text, nil } - case string(larkim.MsgTypePost): + case larkim.MsgTypePost: md, media := parseFeishuPostToMarkdown(raw) if md != "" || len(media) > 0 { return md, media } - case string(larkim.MsgTypeImage): + case larkim.MsgTypeImage: var img struct { ImageKey string `json:"image_key"` } if err := json.Unmarshal([]byte(raw), &img); err == nil && img.ImageKey != "" { return "[image]", []string{"feishu:image:" + img.ImageKey} } - case string(larkim.MsgTypeFile): + case larkim.MsgTypeFile: var f struct { FileKey string `json:"file_key"` FileName string `json:"file_name"` diff --git a/pkg/config/validate.go b/pkg/config/validate.go index 0825089..0621fcc 100644 --- a/pkg/config/validate.go +++ b/pkg/config/validate.go @@ -30,8 +30,8 @@ func Validate(cfg *Config) []error { if rc.AutonomyIdleThresholdSec < 5 { errs = append(errs, 
fmt.Errorf("agents.defaults.runtime_control.autonomy_idle_threshold_sec must be >= 5")) } - if rc.AutonomyMaxRoundsWithoutUser <= 0 { - errs = append(errs, fmt.Errorf("agents.defaults.runtime_control.autonomy_max_rounds_without_user must be > 0")) + if rc.AutonomyMaxRoundsWithoutUser < 0 { + errs = append(errs, fmt.Errorf("agents.defaults.runtime_control.autonomy_max_rounds_without_user must be >= 0")) } if rc.AutonomyMaxPendingDurationSec < 10 { errs = append(errs, fmt.Errorf("agents.defaults.runtime_control.autonomy_max_pending_duration_sec must be >= 10")) @@ -96,8 +96,8 @@ func Validate(cfg *Config) []error { if aut.MaxConsecutiveStalls <= 0 { errs = append(errs, fmt.Errorf("agents.defaults.autonomy.max_consecutive_stalls must be > 0 when enabled=true")) } - if aut.MaxDispatchPerTick <= 0 { - errs = append(errs, fmt.Errorf("agents.defaults.autonomy.max_dispatch_per_tick must be > 0 when enabled=true")) + if aut.MaxDispatchPerTick < 0 { + errs = append(errs, fmt.Errorf("agents.defaults.autonomy.max_dispatch_per_tick must be >= 0 when enabled=true")) } if aut.NotifyCooldownSec <= 0 { errs = append(errs, fmt.Errorf("agents.defaults.autonomy.notify_cooldown_sec must be > 0 when enabled=true")) diff --git a/pkg/nodes/registry_server.go b/pkg/nodes/registry_server.go index b0b6dc4..3179480 100644 --- a/pkg/nodes/registry_server.go +++ b/pkg/nodes/registry_server.go @@ -1929,7 +1929,7 @@ func (s *RegistryServer) handleWebUITaskAudit(w http.ResponseWriter, r *http.Req http.Error(w, "unauthorized", http.StatusUnauthorized) return } - if r.Method != http.MethodGet { + if r.Method != http.MethodGet && r.Method != http.MethodPost { http.Error(w, "method not allowed", http.StatusMethodNotAllowed) return } @@ -2052,11 +2052,10 @@ func (s *RegistryServer) handleWebUITaskQueue(w http.ResponseWriter, r *http.Req path := filepath.Join(strings.TrimSpace(s.workspacePath), "memory", "task-audit.jsonl") includeHeartbeat := r.URL.Query().Get("include_heartbeat") == "1" b, err := 
os.ReadFile(path) - if err != nil { - _ = json.NewEncoder(w).Encode(map[string]interface{}{"ok": true, "running": []map[string]interface{}{}, "items": []map[string]interface{}{}}) - return + lines := []string{} + if err == nil { + lines = strings.Split(string(b), "\n") } - lines := strings.Split(string(b), "\n") type agg struct { Last map[string]interface{} Logs []string @@ -2144,6 +2143,104 @@ func (s *RegistryServer) handleWebUITaskQueue(w http.ResponseWriter, r *http.Req } } + // Merge command watchdog queue from memory/task_queue.json for visibility. + queuePath := filepath.Join(strings.TrimSpace(s.workspacePath), "memory", "task_queue.json") + if qb, qErr := os.ReadFile(queuePath); qErr == nil { + var q map[string]interface{} + if json.Unmarshal(qb, &q) == nil { + if arr, ok := q["running"].([]interface{}); ok { + for _, item := range arr { + row, ok := item.(map[string]interface{}) + if !ok { + continue + } + id := fmt.Sprintf("%v", row["id"]) + if strings.TrimSpace(id) == "" { + continue + } + label := fmt.Sprintf("%v", row["label"]) + source := strings.TrimSpace(fmt.Sprintf("%v", row["source"])) + if source == "" { + source = "command_watchdog" + } + rec := map[string]interface{}{ + "task_id": "cmd:" + id, + "time": fmt.Sprintf("%v", row["started_at"]), + "status": "running", + "source": "command_watchdog", + "channel": source, + "session": "watchdog:" + id, + "input_preview": label, + "duration_ms": 0, + "attempts": 1, + "retry_count": 0, + "logs": []string{ + fmt.Sprintf("watchdog source=%s heavy=%v", source, row["heavy"]), + fmt.Sprintf("next_check_at=%v stalled_rounds=%v/%v", row["next_check_at"], row["stalled_rounds"], row["stall_round_limit"]), + }, + "idle_run": true, + } + items = append(items, rec) + running = append(running, rec) + } + } + if arr, ok := q["waiting"].([]interface{}); ok { + for _, item := range arr { + row, ok := item.(map[string]interface{}) + if !ok { + continue + } + id := fmt.Sprintf("%v", row["id"]) + if strings.TrimSpace(id) 
== "" { + continue + } + label := fmt.Sprintf("%v", row["label"]) + source := strings.TrimSpace(fmt.Sprintf("%v", row["source"])) + if source == "" { + source = "command_watchdog" + } + rec := map[string]interface{}{ + "task_id": "cmd:" + id, + "time": fmt.Sprintf("%v", row["enqueued_at"]), + "status": "waiting", + "source": "command_watchdog", + "channel": source, + "session": "watchdog:" + id, + "input_preview": label, + "duration_ms": 0, + "attempts": 1, + "retry_count": 0, + "logs": []string{ + fmt.Sprintf("watchdog source=%s heavy=%v", source, row["heavy"]), + fmt.Sprintf("enqueued_at=%v", row["enqueued_at"]), + }, + "idle_run": true, + } + items = append(items, rec) + } + } + if wd, ok := q["watchdog"].(map[string]interface{}); ok { + items = append(items, map[string]interface{}{ + "task_id": "cmd:watchdog", + "time": fmt.Sprintf("%v", q["time"]), + "status": "running", + "source": "command_watchdog", + "channel": "watchdog", + "session": "watchdog:stats", + "input_preview": "command watchdog capacity snapshot", + "duration_ms": 0, + "attempts": 1, + "retry_count": 0, + "logs": []string{ + fmt.Sprintf("cpu_total=%v usage_ratio=%v reserve_pct=%v", wd["cpu_total"], wd["usage_ratio"], wd["reserve_pct"]), + fmt.Sprintf("active=%v/%v heavy=%v/%v waiting=%v running=%v", wd["active"], wd["max_active"], wd["active_heavy"], wd["max_heavy"], wd["waiting"], wd["running"]), + }, + "idle_run": true, + }) + } + } + } + sort.Slice(items, func(i, j int) bool { return fmt.Sprintf("%v", items[i]["time"]) > fmt.Sprintf("%v", items[j]["time"]) }) stats := map[string]int{"total": len(items), "running": len(running), "idle_round_budget": 0, "active_user": 0, "manual_pause": 0} for _, it := range items { diff --git a/pkg/tools/command_tick.go b/pkg/tools/command_tick.go new file mode 100644 index 0000000..13a8183 --- /dev/null +++ b/pkg/tools/command_tick.go @@ -0,0 +1,797 @@ +package tools + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "os" + "os/exec" + 
"path/filepath" + "regexp" + "runtime" + "strconv" + "strings" + "sync" + "sync/atomic" + "time" +) + +const ( + minCommandTick = 1 * time.Second + maxCommandTick = 45 * time.Second + watchdogTick = 1 * time.Second + minWorldCycle = 10 * time.Second + maxWorldCycle = 60 * time.Second +) + +var ErrCommandNoProgress = errors.New("command no progress across tick rounds") + +type commandRuntimePolicy struct { + BaseTick time.Duration + StallRoundLimit int + MaxRestarts int + Difficulty int +} + +func buildCommandRuntimePolicy(command string, baseTick time.Duration) commandRuntimePolicy { + diff := commandDifficulty(command) + cpu := runtime.NumCPU() + + // Baseline: kill/restart after 5 unchanged-progress ticks. + stallLimit := 5 + // Difficulty adjustment (1..4) => +0..6 rounds. + stallLimit += (diff - 1) * 2 + // Hardware adjustment: weaker CPU gets more patience. + switch { + case cpu <= 4: + stallLimit += 5 + case cpu <= 8: + stallLimit += 3 + case cpu <= 16: + stallLimit += 1 + } + if stallLimit < 5 { + stallLimit = 5 + } + if stallLimit > 24 { + stallLimit = 24 + } + + // Restart budget: heavier tasks and weaker CPUs allow extra retries. 
// deriveWatchdogLimits computes the watchdog's static limits from a CPU count.
//
// A non-positive cpu is treated as 2. The results are:
//   - maxActive:  the CPU count clamped to the range [2, 12]
//   - maxHeavy:   cpu/4 + 1 clamped to the range [1, 4]
//   - queueLimit: 8 * maxActive, but never less than 16
func deriveWatchdogLimits(cpu int) (maxActive, maxHeavy, queueLimit int) {
	cores := cpu
	if cores <= 0 {
		cores = 2
	}

	active := cores
	switch {
	case active < 2:
		active = 2
	case active > 12:
		active = 12
	}

	heavy := 1 + cores/4
	switch {
	case heavy < 1:
		heavy = 1
	case heavy > 4:
		heavy = 4
	}

	queue := 8 * active
	if queue < 16 {
		queue = 16
	}
	return active, heavy, queue
}
+ cycle := 20 * time.Second + pending := len(wd.waiters) + if pending > 0 { + cycle -= time.Duration(minInt(pending, 8)) * time.Second + } + if wd.active > wd.baseActive/2 { + cycle -= 3 * time.Second + } + if wd.active == 0 && pending == 0 { + cycle += 10 * time.Second + } + if cycle < minWorldCycle { + cycle = minWorldCycle + } + if cycle > maxWorldCycle { + cycle = maxWorldCycle + } + return cycle +} + +func (wd *commandWatchdog) register(cmd *exec.Cmd, baseTick time.Duration, stallRoundLimit int, progressFn func() int, stallNotify chan int, heavy bool, source, label string) func() { + if wd == nil || cmd == nil { + return func() {} + } + base := normalizeCommandTick(baseTick) + id := atomic.AddUint64(&wd.nextID, 1) + w := &watchedCommand{ + id: id, + cmd: cmd, + startedAt: time.Now(), + baseTick: base, + stallRoundLimit: stallRoundLimit, + nextCheckAt: time.Now().Add(base), + lastProgress: safeProgress(progressFn), + progressFn: progressFn, + stallNotify: stallNotify, + heavy: heavy, + source: strings.TrimSpace(source), + label: strings.TrimSpace(label), + } + + wd.mu.Lock() + wd.watches[id] = w + snap := wd.buildQueueSnapshotLocked() + wd.mu.Unlock() + wd.writeQueueSnapshot(snap) + + var once sync.Once + return func() { + once.Do(func() { + wd.mu.Lock() + delete(wd.watches, id) + snap := wd.buildQueueSnapshotLocked() + wd.mu.Unlock() + wd.writeQueueSnapshot(snap) + }) + } +} + +func (wd *commandWatchdog) setQueuePath(path string) { + if wd == nil { + return + } + path = strings.TrimSpace(path) + if path != "" { + path = filepath.Clean(path) + } + wd.mu.Lock() + changed := wd.queuePath != path + wd.queuePath = path + snap := wd.buildQueueSnapshotLocked() + wd.mu.Unlock() + if changed { + wd.writeQueueSnapshot(snap) + } +} + +func (wd *commandWatchdog) acquireSlot(ctx context.Context, heavy bool, source, label string) (func(), error) { + if wd == nil { + return func() {}, nil + } + if ctx == nil { + ctx = context.Background() + } + wd.mu.Lock() + if 
wd.canAcquireSlotLocked(heavy) { + wd.grantSlotLocked(heavy) + snap := wd.buildQueueSnapshotLocked() + wd.mu.Unlock() + wd.writeQueueSnapshot(snap) + return wd.releaseSlotFunc(heavy), nil + } + // Queue when slots are full; wait until a slot is available or context cancels. + waitID := atomic.AddUint64(&wd.nextID, 1) + w := &watchWaiter{ + id: waitID, + heavy: heavy, + ready: make(chan struct{}, 1), + source: strings.TrimSpace(source), + label: strings.TrimSpace(label), + enqueuedAt: time.Now(), + } + wd.waiters = append(wd.waiters, w) + snap := wd.buildQueueSnapshotLocked() + wd.mu.Unlock() + wd.writeQueueSnapshot(snap) + + for { + select { + case <-ctx.Done(): + wd.mu.Lock() + wd.removeWaiterLocked(waitID) + snap := wd.buildQueueSnapshotLocked() + wd.mu.Unlock() + wd.writeQueueSnapshot(snap) + return nil, ctx.Err() + case <-w.ready: + return wd.releaseSlotFunc(heavy), nil + } + } +} + +func (wd *commandWatchdog) releaseSlotFunc(heavy bool) func() { + var once sync.Once + return func() { + once.Do(func() { + wd.mu.Lock() + if wd.active > 0 { + wd.active-- + } + if heavy && wd.activeHeavy > 0 { + wd.activeHeavy-- + } + wd.scheduleWaitersLocked() + snap := wd.buildQueueSnapshotLocked() + wd.mu.Unlock() + wd.writeQueueSnapshot(snap) + }) + } +} + +func (wd *commandWatchdog) canAcquireSlotLocked(heavy bool) bool { + maxActive, maxHeavy := wd.dynamicLimitsLocked() + if wd.active >= maxActive { + return false + } + if heavy && wd.activeHeavy >= maxHeavy { + return false + } + return true +} + +func (wd *commandWatchdog) grantSlotLocked(heavy bool) { + wd.active++ + if heavy { + wd.activeHeavy++ + } +} + +func (wd *commandWatchdog) dynamicLimitsLocked() (maxActive, maxHeavy int) { + if wd == nil { + return 1, 1 + } + maxActive = computeDynamicActiveSlots(wd.cpuTotal, wd.reservePct, wd.usageRatio, wd.baseActive) + maxHeavy = computeDynamicHeavySlots(maxActive, wd.baseHeavy) + return +} + +func computeDynamicActiveSlots(cpu int, reservePct, usageRatio float64, baseActive 
// computeDynamicHeavySlots returns how many of the currently allowed active
// slots may be used by heavy commands.
//
// The starting point is maxActive/2 + 1. It is then capped by baseHeavy (when
// baseHeavy is positive) and finally by maxActive itself. A non-positive
// maxActive always yields 1.
func computeDynamicHeavySlots(maxActive, baseHeavy int) int {
	if maxActive <= 0 {
		return 1
	}

	// maxActive is at least 1 here, so this value is never below 1.
	slots := 1 + maxActive/2
	if baseHeavy > 0 && baseHeavy < slots {
		slots = baseHeavy
	}
	if maxActive < slots {
		slots = maxActive
	}
	return slots
}
// safeProgress calls progressFn and returns its result as a non-negative
// progress value.
//
// It returns 0 when progressFn is nil, when it reports a negative number, or
// when it panics; the panic is recovered so a misbehaving callback cannot take
// down the caller.
func safeProgress(progressFn func() int) (progress int) {
	if progressFn == nil {
		return 0
	}

	defer func() {
		if r := recover(); r != nil {
			progress = 0
		}
	}()

	// progress stays at its zero value unless the callback reports a positive number.
	if reported := progressFn(); reported > 0 {
		progress = reported
	}
	return progress
}
globalCommandWatchdog.register(cmd, base, stallRoundLimit, progressFn, stallNotify, heavy, source, label) + defer unwatch() + + for { + select { + case err := <-done: + return err + case stalledRounds := <-stallNotify: + select { + case err := <-done: + return fmt.Errorf("%w: %d ticks without progress (%v)", ErrCommandNoProgress, stalledRounds, err) + case <-time.After(2 * time.Second): + return fmt.Errorf("%w: %d ticks without progress", ErrCommandNoProgress, stalledRounds) + } + case <-ctx.Done(): + if cmd.Process != nil { + _ = cmd.Process.Kill() + } + select { + case err := <-done: + if err != nil { + return err + } + case <-time.After(2 * time.Second): + } + return ctx.Err() + } + } +} + +func (wd *commandWatchdog) buildQueueSnapshotLocked() map[string]interface{} { + if wd == nil { + return nil + } + maxActive, maxHeavy := wd.dynamicLimitsLocked() + running := make([]map[string]interface{}, 0, len(wd.watches)) + for _, w := range wd.watches { + if w == nil { + continue + } + running = append(running, map[string]interface{}{ + "id": w.id, + "source": queueNonEmpty(w.source, "exec"), + "label": w.label, + "heavy": w.heavy, + "status": "running", + "started_at": w.startedAt.UTC().Format(time.RFC3339), + "next_check_at": w.nextCheckAt.UTC().Format(time.RFC3339), + "stalled_rounds": w.stalledRounds, + "stall_round_limit": w.stallRoundLimit, + "last_progress": w.lastProgress, + }) + } + waiting := make([]map[string]interface{}, 0, len(wd.waiters)) + for _, w := range wd.waiters { + if w == nil { + continue + } + waiting = append(waiting, map[string]interface{}{ + "id": w.id, + "source": queueNonEmpty(w.source, "exec"), + "label": w.label, + "heavy": w.heavy, + "status": "waiting", + "enqueued_at": w.enqueuedAt.UTC().Format(time.RFC3339), + }) + } + return map[string]interface{}{ + "time": time.Now().UTC().Format(time.RFC3339), + "watchdog": map[string]interface{}{ + "cpu_total": wd.cpuTotal, + "reserve_pct": wd.reservePct, + "usage_ratio": wd.usageRatio, + 
"world_cycle_sec": int(wd.worldCycle.Seconds()), + "next_sample_at": func() string { + if wd.nextSampleAt.IsZero() { + return "" + } + return wd.nextSampleAt.UTC().Format(time.RFC3339) + }(), + "max_active": maxActive, + "max_heavy": maxHeavy, + "active": wd.active, + "active_heavy": wd.activeHeavy, + "waiting": len(waiting), + "running": len(running), + }, + "running": running, + "waiting": waiting, + } +} + +func (wd *commandWatchdog) writeQueueSnapshot(snap map[string]interface{}) { + if wd == nil || snap == nil { + return + } + wd.mu.Lock() + path := strings.TrimSpace(wd.queuePath) + wd.mu.Unlock() + if path == "" { + return + } + raw, err := json.MarshalIndent(snap, "", " ") + if err != nil { + return + } + _ = os.MkdirAll(filepath.Dir(path), 0755) + _ = os.WriteFile(path, raw, 0644) +} + +func queueNonEmpty(v, fallback string) string { + v = strings.TrimSpace(v) + if v == "" { + return fallback + } + return v +} + +func minInt(a, b int) int { + if a < b { + return a + } + return b +} + +func nextCommandTick(baseTick, elapsed time.Duration) time.Duration { + base := normalizeCommandTick(baseTick) + if elapsed < 0 { + elapsed = 0 + } + next := base + elapsed/8 + if next > maxCommandTick { + return maxCommandTick + } + if next < base { + return base + } + return next +} + +func normalizeCommandTick(baseTick time.Duration) time.Duration { + if baseTick < minCommandTick { + return minCommandTick + } + if baseTick > maxCommandTick { + return maxCommandTick + } + return baseTick +} + +func commandDifficulty(command string) int { + cmd := strings.ToLower(strings.TrimSpace(command)) + if cmd == "" { + return 1 + } + // 4: very heavy build / container graph. + for _, p := range []string{"docker build", "docker compose build", "bazel build", "gradle build", "mvn package"} { + if strings.Contains(cmd, p) { + return 4 + } + } + // 3: compile/test/install heavy workloads. 
+ for _, p := range []string{"go test", "go build", "cargo build", "npm install", "npm ci", "pnpm install", "yarn install", "npm run build", "pnpm build", "yarn build"} { + if strings.Contains(cmd, p) { + return 3 + } + } + // 2: medium multi-step shell chains. + if strings.Contains(cmd, "&&") || strings.Contains(cmd, "|") { + return 2 + } + return 1 +} + +func sampleSystemUsageRatio(cpu int) float64 { + if cpu <= 0 { + cpu = 1 + } + load1, ok := readLoadAverage1() + if !ok { + return 0 + } + ratio := load1 / float64(cpu) + if ratio < 0 { + return 0 + } + if ratio > 0.95 { + return 0.95 + } + return ratio +} + +func readLoadAverage1() (float64, bool) { + // Linux fast path. + if b, err := os.ReadFile("/proc/loadavg"); err == nil { + fields := strings.Fields(strings.TrimSpace(string(b))) + if len(fields) > 0 { + if v, err := strconv.ParseFloat(fields[0], 64); err == nil && v >= 0 { + return v, true + } + } + } + + // macOS/BSD fallback. + if out, err := runCommandOutputWithTimeout(300*time.Millisecond, "sysctl", "-n", "vm.loadavg"); err == nil { + fields := strings.Fields(strings.Trim(strings.TrimSpace(string(out)), "{}")) + if len(fields) > 0 { + if v, err := strconv.ParseFloat(strings.ReplaceAll(fields[0], ",", "."), 64); err == nil && v >= 0 { + return v, true + } + } + } + if out, err := runCommandOutputWithTimeout(300*time.Millisecond, "uptime"); err == nil { + m := reLoadAverage.FindStringSubmatch(strings.ToLower(string(out))) + if len(m) >= 2 { + if v, err := strconv.ParseFloat(strings.ReplaceAll(m[1], ",", "."), 64); err == nil && v >= 0 { + return v, true + } + } + } + return 0, false +} + +func runCommandOutputWithTimeout(timeout time.Duration, name string, args ...string) ([]byte, error) { + if timeout <= 0 { + timeout = 300 * time.Millisecond + } + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + return exec.CommandContext(ctx, name, args...).Output() +} diff --git a/pkg/tools/command_tick_limits_test.go 
b/pkg/tools/command_tick_limits_test.go new file mode 100644 index 0000000..9bdb02a --- /dev/null +++ b/pkg/tools/command_tick_limits_test.go @@ -0,0 +1,24 @@ +package tools + +import "testing" + +func TestComputeDynamicActiveSlots_ReservesTwentyPercent(t *testing.T) { + got := computeDynamicActiveSlots(10, 0.20, 0.0, 12) + if got != 8 { + t.Fatalf("expected 8 active slots with 20%% reserve on 10 CPU, got %d", got) + } +} + +func TestComputeDynamicActiveSlots_ReducesWithSystemUsage(t *testing.T) { + got := computeDynamicActiveSlots(10, 0.20, 0.5, 12) + if got != 3 { + t.Fatalf("expected 3 active slots when system usage is 50%%, got %d", got) + } +} + +func TestComputeDynamicActiveSlots_AlwaysKeepsOne(t *testing.T) { + got := computeDynamicActiveSlots(8, 0.20, 0.95, 12) + if got != 1 { + t.Fatalf("expected at least 1 active slot under high system usage, got %d", got) + } +} diff --git a/pkg/tools/message.go b/pkg/tools/message.go index bc154c2..13faaf9 100644 --- a/pkg/tools/message.go +++ b/pkg/tools/message.go @@ -12,6 +12,7 @@ import ( type SendCallback func(channel, chatID, action, content, media, messageID, emoji string, buttons [][]bus.Button) error type MessageTool struct { + mu sync.RWMutex sendCallback SendCallback defaultChannel string defaultChatID string @@ -104,11 +105,15 @@ func (t *MessageTool) Parameters() map[string]interface{} { } func (t *MessageTool) SetContext(channel, chatID string) { + t.mu.Lock() + defer t.mu.Unlock() t.defaultChannel = channel t.defaultChatID = chatID } func (t *MessageTool) SetSendCallback(callback SendCallback) { + t.mu.Lock() + defer t.mu.Unlock() t.sendCallback = callback } @@ -168,18 +173,24 @@ func (t *MessageTool) Execute(ctx context.Context, args map[string]interface{}) chatID = to } + t.mu.RLock() + defaultChannel := t.defaultChannel + defaultChatID := t.defaultChatID + sendCallback := t.sendCallback + t.mu.RUnlock() + if channel == "" { - channel = t.defaultChannel + channel = defaultChannel } if chatID == "" { - 
chatID = t.defaultChatID + chatID = defaultChatID } if channel == "" || chatID == "" { return "Error: No target channel/chat specified", nil } - if t.sendCallback == nil { + if sendCallback == nil { return "Error: Message sending not configured", nil } @@ -207,7 +218,7 @@ func (t *MessageTool) Execute(ctx context.Context, args map[string]interface{}) } } - if err := t.sendCallback(channel, chatID, action, content, media, messageID, emoji, buttons); err != nil { + if err := sendCallback(channel, chatID, action, content, media, messageID, emoji, buttons); err != nil { return fmt.Sprintf("Error sending message: %v", err), nil } diff --git a/pkg/tools/remind.go b/pkg/tools/remind.go index e83057d..fd8fde9 100644 --- a/pkg/tools/remind.go +++ b/pkg/tools/remind.go @@ -3,12 +3,14 @@ package tools import ( "context" "fmt" + "sync" "time" "clawgo/pkg/cron" ) type RemindTool struct { + mu sync.RWMutex cs *cron.CronService defaultChannel string defaultChatID string @@ -19,6 +21,8 @@ func NewRemindTool(cs *cron.CronService) *RemindTool { } func (t *RemindTool) SetContext(channel, chatID string) { + t.mu.Lock() + defer t.mu.Unlock() t.defaultChannel = channel t.defaultChatID = chatID } @@ -43,6 +47,14 @@ func (t *RemindTool) Parameters() map[string]interface{} { "type": "string", "description": "When to remind (e.g., '10m', '1h', '2026-02-12 10:00')", }, + "channel": map[string]interface{}{ + "type": "string", + "description": "Optional destination channel override", + }, + "chat_id": map[string]interface{}{ + "type": "string", + "description": "Optional destination chat ID override", + }, }, "required": []string{"message", "time_expr"}, } @@ -63,6 +75,21 @@ func (t *RemindTool) Execute(ctx context.Context, args map[string]interface{}) ( return "", fmt.Errorf("time_expr is required") } + channel, _ := args["channel"].(string) + chatID, _ := args["chat_id"].(string) + if channel == "" || chatID == "" { + t.mu.RLock() + defaultChannel := t.defaultChannel + defaultChatID := 
t.defaultChatID + t.mu.RUnlock() + if channel == "" { + channel = defaultChannel + } + if chatID == "" { + chatID = defaultChatID + } + } + // Try duration first (e.g., "10m", "1h30m") if d, err := time.ParseDuration(timeExpr); err == nil { at := time.Now().Add(d).UnixMilli() @@ -70,7 +97,7 @@ func (t *RemindTool) Execute(ctx context.Context, args map[string]interface{}) ( Kind: "at", AtMS: &at, } - job, err := t.cs.AddJob("Reminder", schedule, message, true, t.defaultChannel, t.defaultChatID) + job, err := t.cs.AddJob("Reminder", schedule, message, true, channel, chatID) if err != nil { return "", fmt.Errorf("failed to schedule reminder: %w", err) } @@ -120,7 +147,7 @@ func (t *RemindTool) Execute(ctx context.Context, args map[string]interface{}) ( AtMS: &at, } - job, err := t.cs.AddJob("Reminder", schedule, message, true, t.defaultChannel, t.defaultChatID) + job, err := t.cs.AddJob("Reminder", schedule, message, true, channel, chatID) if err != nil { return "", fmt.Errorf("failed to schedule reminder: %w", err) } diff --git a/pkg/tools/shell.go b/pkg/tools/shell.go index 41bb3e6..4be5617 100644 --- a/pkg/tools/shell.go +++ b/pkg/tools/shell.go @@ -1,8 +1,8 @@ package tools import ( - "bytes" "context" + "errors" "fmt" "os" "os/exec" @@ -16,12 +16,12 @@ import ( ) type ExecTool struct { - workingDir string - timeout time.Duration - sandboxEnabled bool - sandboxImage string + workingDir string + timeout time.Duration + sandboxEnabled bool + sandboxImage string autoInstallMissing bool - procManager *ProcessManager + procManager *ProcessManager } func NewExecTool(cfg config.ShellConfig, workspace string, pm *ProcessManager) *ExecTool { @@ -81,6 +81,11 @@ func (t *ExecTool) Execute(ctx context.Context, args map[string]interface{}) (st cwd = wd } } + queueBase := strings.TrimSpace(t.workingDir) + if queueBase == "" { + queueBase = cwd + } + globalCommandWatchdog.setQueuePath(resolveCommandQueuePath(queueBase)) if bg, _ := args["background"].(bool); bg { if 
t.procManager == nil { @@ -112,26 +117,38 @@ func (t *ExecTool) executeInSandbox(ctx context.Context, command, cwd string) (s t.sandboxImage, "sh", "-c", command, } - - cmdCtx, cancel := context.WithTimeout(ctx, t.timeout) - defer cancel() - - cmd := exec.CommandContext(cmdCtx, "docker", dockerArgs...) - var stdout, stderr bytes.Buffer - cmd.Stdout = &stdout - cmd.Stderr = &stderr - - err := cmd.Run() - output := stdout.String() - if stderr.Len() > 0 { - output += "\nSTDERR:\n" + stderr.String() + policy := buildCommandRuntimePolicy(command, t.commandTickBase(command)) + var merged strings.Builder + for attempt := 0; attempt <= policy.MaxRestarts; attempt++ { + cmd := exec.CommandContext(ctx, "docker", dockerArgs...) + var stdout, stderr trackedOutput + cmd.Stdout = &stdout + cmd.Stderr = &stderr + err := runCommandWithDynamicTick(ctx, cmd, "exec:sandbox", command, policy.Difficulty, policy.BaseTick, policy.StallRoundLimit, func() int { + return stdout.Len() + stderr.Len() + }) + out := stdout.String() + if stderr.Len() > 0 { + out += "\nSTDERR:\n" + stderr.String() + } + if strings.TrimSpace(out) != "" { + if merged.Len() > 0 { + merged.WriteString("\n") + } + merged.WriteString(out) + } + if err == nil { + return merged.String(), nil + } + if errors.Is(err, ErrCommandNoProgress) && ctx.Err() == nil && attempt < policy.MaxRestarts { + merged.WriteString(fmt.Sprintf("\n[RESTART] no progress for %d ticks, restarting (%d/%d)\n", + policy.StallRoundLimit, attempt+1, policy.MaxRestarts)) + continue + } + merged.WriteString(fmt.Sprintf("\nSandbox Exit code: %v", err)) + return merged.String(), nil } - - if err != nil { - output += fmt.Sprintf("\nSandbox Exit code: %v", err) - } - - return output, nil + return merged.String(), nil } func (t *ExecTool) SetTimeout(timeout time.Duration) { @@ -139,19 +156,13 @@ func (t *ExecTool) SetTimeout(timeout time.Duration) { } func (t *ExecTool) executeCommand(ctx context.Context, command, cwd string) (string, error) { - output, err, 
timedOut := t.runShellCommand(ctx, command, cwd) - if timedOut { - return fmt.Sprintf("Error: Command timed out after %v", t.timeout), nil - } + output, err := t.runShellCommand(ctx, command, cwd) if err != nil && t.autoInstallMissing { if missingCmd := detectMissingCommandFromOutput(output); missingCmd != "" { if installLog, installed := t.tryAutoInstallMissingCommand(ctx, missingCmd, cwd); installed { output += "\n[AUTO-INSTALL]\n" + installLog - retryOutput, retryErr, retryTimedOut := t.runShellCommand(ctx, command, cwd) - if retryTimedOut { - return fmt.Sprintf("Error: Command timed out after %v", t.timeout), nil - } + retryOutput, retryErr := t.runShellCommand(ctx, command, cwd) output += "\n[RETRY]\n" + retryOutput err = retryErr } @@ -173,32 +184,44 @@ func (t *ExecTool) executeCommand(ctx context.Context, command, cwd string) (str return output, nil } -func (t *ExecTool) runShellCommand(ctx context.Context, command, cwd string) (string, error, bool) { - cmdCtx, cancel := context.WithTimeout(ctx, t.timeout) - defer cancel() - - cmd := exec.CommandContext(cmdCtx, "sh", "-c", command) - cmd.Env = buildExecEnv() - if cwd != "" { - cmd.Dir = cwd - } - - var stdout, stderr bytes.Buffer - cmd.Stdout = &stdout - cmd.Stderr = &stderr - - err := cmd.Run() - output := stdout.String() - if stderr.Len() > 0 { - output += "\nSTDERR:\n" + stderr.String() - } - - if err != nil { - if cmdCtx.Err() == context.DeadlineExceeded { - return output, err, true +func (t *ExecTool) runShellCommand(ctx context.Context, command, cwd string) (string, error) { + policy := buildCommandRuntimePolicy(command, t.commandTickBase(command)) + var merged strings.Builder + for attempt := 0; attempt <= policy.MaxRestarts; attempt++ { + cmd := exec.CommandContext(ctx, "sh", "-c", command) + cmd.Env = buildExecEnv() + if cwd != "" { + cmd.Dir = cwd } + + var stdout, stderr trackedOutput + cmd.Stdout = &stdout + cmd.Stderr = &stderr + + err := runCommandWithDynamicTick(ctx, cmd, "exec", command, 
policy.Difficulty, policy.BaseTick, policy.StallRoundLimit, func() int { + return stdout.Len() + stderr.Len() + }) + out := stdout.String() + if stderr.Len() > 0 { + out += "\nSTDERR:\n" + stderr.String() + } + if strings.TrimSpace(out) != "" { + if merged.Len() > 0 { + merged.WriteString("\n") + } + merged.WriteString(out) + } + if err == nil { + return merged.String(), nil + } + if errors.Is(err, ErrCommandNoProgress) && ctx.Err() == nil && attempt < policy.MaxRestarts { + merged.WriteString(fmt.Sprintf("\n[RESTART] no progress for %d ticks, restarting (%d/%d)\n", + policy.StallRoundLimit, attempt+1, policy.MaxRestarts)) + continue + } + return merged.String(), err } - return output, err, false + return merged.String(), nil } func buildExecEnv() []string { @@ -212,6 +235,70 @@ func buildExecEnv() []string { return append(env, "PATH="+current+":"+fallback) } +func (t *ExecTool) commandTickBase(command string) time.Duration { + base := 2 * time.Second + if isHeavyCommand(command) { + base = 4 * time.Second + } + // Reuse configured timeout as a pacing hint (not a kill deadline). 
+ if t.timeout > 0 { + derived := t.timeout / 30 + if derived > base { + base = derived + } + } + if base > 12*time.Second { + base = 12 * time.Second + } + return base +} + +func resolveCommandQueuePath(cwd string) string { + cwd = strings.TrimSpace(cwd) + if cwd == "" { + if wd, err := os.Getwd(); err == nil { + cwd = wd + } + } + if cwd == "" { + return "" + } + abs, err := filepath.Abs(cwd) + if err != nil { + return "" + } + return filepath.Join(abs, "memory", "task_queue.json") +} + +func isHeavyCommand(command string) bool { + cmd := strings.ToLower(strings.TrimSpace(command)) + if cmd == "" { + return false + } + heavyPatterns := []string{ + "docker build", + "docker compose build", + "go build", + "go test", + "npm install", + "npm ci", + "npm run build", + "pnpm install", + "pnpm build", + "yarn install", + "yarn build", + "cargo build", + "mvn package", + "gradle build", + } + for _, p := range heavyPatterns { + if strings.Contains(cmd, p) { + return true + } + } + return false +} + func detectMissingCommandFromOutput(output string) string { patterns := []*regexp.Regexp{ regexp.MustCompile(`(?m)(?:^|[:\s])([a-zA-Z0-9._+-]+): not found`), @@ -278,19 +365,8 @@ func (t *ExecTool) tryAutoInstallMissingCommand(ctx context.Context, commandName return fmt.Sprintf("No supported package manager found to install missing command: %s", name), false } - timeout := 5 * time.Minute - if t.timeout > 0 && t.timeout < timeout { - timeout = t.timeout - } - for _, installCmd := range candidates { - installCtx, cancel := context.WithTimeout(ctx, timeout) - output, err, timedOut := t.runShellCommand(installCtx, installCmd, cwd) - cancel() - - if timedOut { - continue - } + output, err := t.runShellCommand(ctx, installCmd, cwd) if err == nil && commandExists(name) { return fmt.Sprintf("Installed %s using: %s\n%s", name, installCmd, output), true } diff --git a/pkg/tools/shell_timeout_test.go b/pkg/tools/shell_timeout_test.go new file mode 100644 index 0000000..4cc3810 --- 
/dev/null +++ b/pkg/tools/shell_timeout_test.go @@ -0,0 +1,48 @@ +package tools + +import ( + "testing" + "time" +) + +func TestIsHeavyCommand(t *testing.T) { + tests := []struct { + command string + heavy bool + }{ + {command: "docker build -t app .", heavy: true}, + {command: "docker compose build api", heavy: true}, + {command: "go test ./...", heavy: true}, + {command: "npm run build", heavy: true}, + {command: "echo hello", heavy: false}, + } + + for _, tt := range tests { + if got := isHeavyCommand(tt.command); got != tt.heavy { + t.Fatalf("isHeavyCommand(%q)=%v want %v", tt.command, got, tt.heavy) + } + } +} + +func TestCommandTickBase(t *testing.T) { + light := (&ExecTool{}).commandTickBase("echo hello") + heavy := (&ExecTool{}).commandTickBase("docker build -t app .") + if heavy <= light { + t.Fatalf("expected heavy command base tick > light, got heavy=%v light=%v", heavy, light) + } +} + +func TestNextCommandTick(t *testing.T) { + base := 2 * time.Second + t1 := nextCommandTick(base, 30*time.Second) + t2 := nextCommandTick(base, 5*time.Minute) + if t1 < base { + t.Fatalf("tick should not shrink below base: %v", t1) + } + if t2 <= t1 { + t.Fatalf("tick should grow with elapsed time: t1=%v t2=%v", t1, t2) + } + if t2 > 45*time.Second { + t.Fatalf("tick should be capped, got %v", t2) + } +} diff --git a/pkg/tools/skill_exec.go b/pkg/tools/skill_exec.go index 20cf741..f03167d 100644 --- a/pkg/tools/skill_exec.go +++ b/pkg/tools/skill_exec.go @@ -2,6 +2,7 @@ package tools import ( "context" + "errors" "fmt" "os" "os/exec" @@ -45,8 +46,8 @@ func (t *SkillExecTool) Parameters() map[string]interface{} { }, "timeout_sec": map[string]interface{}{ "type": "integer", - "default": 60, - "description": "Execution timeout in seconds", + "default": 0, + "description": "Deprecated. 
No hard timeout is enforced.", }, "reason": map[string]interface{}{ "type": "string", @@ -70,10 +71,8 @@ func (t *SkillExecTool) Execute(ctx context.Context, args map[string]interface{} t.writeAudit(skill, script, reason, false, err.Error()) return "", err } - - timeoutSec := 60 - if raw, ok := args["timeout_sec"].(float64); ok && raw > 0 { - timeoutSec = int(raw) + if strings.TrimSpace(t.workspace) != "" { + globalCommandWatchdog.setQueuePath(filepath.Join(strings.TrimSpace(t.workspace), "memory", "task_queue.json")) } skillDir, err := t.resolveSkillDir(skill) @@ -115,22 +114,55 @@ func (t *SkillExecTool) Execute(ctx context.Context, args map[string]interface{} } } - runCtx, cancel := context.WithTimeout(ctx, time.Duration(timeoutSec)*time.Second) - defer cancel() - - cmd, err := buildSkillCommand(runCtx, scriptPath, cmdArgs) - if err != nil { - t.writeAudit(skill, script, reason, false, err.Error()) - return "", err + commandLabel := relScript + if len(cmdArgs) > 0 { + commandLabel += " " + strings.Join(cmdArgs, " ") } - cmd.Dir = skillDir - output, err := cmd.CombinedOutput() - if err != nil { - t.writeAudit(skill, script, reason, false, err.Error()) - return "", fmt.Errorf("skill execution failed: %w\n%s", err, string(output)) + policy := buildCommandRuntimePolicy(commandLabel, 2*time.Second) + var merged strings.Builder + var runErr error + for attempt := 0; attempt <= policy.MaxRestarts; attempt++ { + cmd, err := buildSkillCommand(ctx, scriptPath, cmdArgs) + if err != nil { + t.writeAudit(skill, script, reason, false, err.Error()) + return "", err + } + cmd.Dir = skillDir + var stdout, stderr trackedOutput + cmd.Stdout = &stdout + cmd.Stderr = &stderr + err = runCommandWithDynamicTick(ctx, cmd, "skill_exec", commandLabel, policy.Difficulty, policy.BaseTick, policy.StallRoundLimit, func() int { + return stdout.Len() + stderr.Len() + }) + out := stdout.String() + if stderr.Len() > 0 { + out += "\nSTDERR:\n" + stderr.String() + } + if strings.TrimSpace(out) != 
"" { + if merged.Len() > 0 { + merged.WriteString("\n") + } + merged.WriteString(out) + } + if err == nil { + runErr = nil + break + } + runErr = err + if errors.Is(err, ErrCommandNoProgress) && ctx.Err() == nil && attempt < policy.MaxRestarts { + merged.WriteString(fmt.Sprintf("\n[RESTART] no progress for %d ticks, restarting (%d/%d)\n", + policy.StallRoundLimit, attempt+1, policy.MaxRestarts)) + continue + } + break + } + output := merged.String() + if runErr != nil { + t.writeAudit(skill, script, reason, false, runErr.Error()) + return "", fmt.Errorf("skill execution failed: %w\n%s", runErr, output) } - out := strings.TrimSpace(string(output)) + out := strings.TrimSpace(output) if out == "" { out = "(no output)" } diff --git a/pkg/tools/spawn.go b/pkg/tools/spawn.go index b597e26..06e5944 100644 --- a/pkg/tools/spawn.go +++ b/pkg/tools/spawn.go @@ -3,9 +3,11 @@ package tools import ( "context" "fmt" + "sync" ) type SpawnTool struct { + mu sync.RWMutex manager *SubagentManager originChannel string originChatID string @@ -51,12 +53,22 @@ func (t *SpawnTool) Parameters() map[string]interface{} { "type": "string", "description": "Optional task ID under the pipeline", }, + "channel": map[string]interface{}{ + "type": "string", + "description": "Optional origin channel override", + }, + "chat_id": map[string]interface{}{ + "type": "string", + "description": "Optional origin chat ID override", + }, }, "required": []string{"task"}, } } func (t *SpawnTool) SetContext(channel, chatID string) { + t.mu.Lock() + defer t.mu.Unlock() t.originChannel = channel t.originChatID = chatID } @@ -79,7 +91,22 @@ func (t *SpawnTool) Execute(ctx context.Context, args map[string]interface{}) (s return "Error: Subagent manager not configured", nil } - result, err := t.manager.Spawn(ctx, task, label, t.originChannel, t.originChatID, pipelineID, taskID) + originChannel, _ := args["channel"].(string) + originChatID, _ := args["chat_id"].(string) + if originChannel == "" || originChatID == "" 
{ + t.mu.RLock() + defaultChannel := t.originChannel + defaultChatID := t.originChatID + t.mu.RUnlock() + if originChannel == "" { + originChannel = defaultChannel + } + if originChatID == "" { + originChatID = defaultChatID + } + } + + result, err := t.manager.Spawn(ctx, task, label, originChannel, originChatID, pipelineID, taskID) if err != nil { return "", fmt.Errorf("failed to spawn subagent: %w", err) } diff --git a/pkg/tools/tracked_output.go b/pkg/tools/tracked_output.go new file mode 100644 index 0000000..39ee41a --- /dev/null +++ b/pkg/tools/tracked_output.go @@ -0,0 +1,43 @@ +package tools + +import ( + "bytes" + "sync" + "sync/atomic" +) + +// trackedOutput is a thread-safe writer+buffer pair used by command progress checks. +type trackedOutput struct { + mu sync.Mutex + buf bytes.Buffer + size atomic.Int64 +} + +func (t *trackedOutput) Write(p []byte) (int, error) { + if t == nil { + return 0, nil + } + t.mu.Lock() + n, err := t.buf.Write(p) + t.mu.Unlock() + if n > 0 { + t.size.Add(int64(n)) + } + return n, err +} + +func (t *trackedOutput) Len() int { + if t == nil { + return 0 + } + return int(t.size.Load()) +} + +func (t *trackedOutput) String() string { + if t == nil { + return "" + } + t.mu.Lock() + defer t.mu.Unlock() + return t.buf.String() +} diff --git a/webui/src/pages/TaskAudit.tsx b/webui/src/pages/TaskAudit.tsx index dcb248c..faad674 100644 --- a/webui/src/pages/TaskAudit.tsx +++ b/webui/src/pages/TaskAudit.tsx @@ -119,7 +119,7 @@ const TaskAudit: React.FC = () => { if (!ok) return; } try { - const url = `/webui/api/task_queue${q}`; + const url = `/webui/api/task_audit${q}`; const r = await fetch(url, { method: 'POST', headers: { 'Content-Type': 'application/json' }, body: JSON.stringify({ action, task_id: selected.task_id }) }); if (!r.ok) throw new Error(await r.text()); await fetchData(); @@ -129,6 +129,7 @@ const TaskAudit: React.FC = () => { }; const selectedPretty = useMemo(() => selected ? 
JSON.stringify(selected, null, 2) : '', [selected]); + const selectedReadonly = selected?.source === 'command_watchdog'; return (
@@ -140,6 +141,7 @@ const TaskAudit: React.FC = () => { +