diff --git a/pkg/api/server.go b/pkg/api/server.go index 2d2b8eb..1edbd5f 100644 --- a/pkg/api/server.go +++ b/pkg/api/server.go @@ -2535,13 +2535,13 @@ func (s *Server) handleWebUITaskQueue(w http.ResponseWriter, r *http.Request) { label := fmt.Sprintf("%v", row["label"]) source := strings.TrimSpace(fmt.Sprintf("%v", row["source"])) if source == "" { - source = "command_watchdog" + source = "task_watchdog" } rec := map[string]interface{}{ "task_id": "cmd:" + id, "time": fmt.Sprintf("%v", row["started_at"]), "status": "running", - "source": "command_watchdog", + "source": "task_watchdog", "channel": source, "session": "watchdog:" + id, "input_preview": label, @@ -2571,13 +2571,13 @@ func (s *Server) handleWebUITaskQueue(w http.ResponseWriter, r *http.Request) { label := fmt.Sprintf("%v", row["label"]) source := strings.TrimSpace(fmt.Sprintf("%v", row["source"])) if source == "" { - source = "command_watchdog" + source = "task_watchdog" } rec := map[string]interface{}{ "task_id": "cmd:" + id, "time": fmt.Sprintf("%v", row["enqueued_at"]), "status": "waiting", - "source": "command_watchdog", + "source": "task_watchdog", "channel": source, "session": "watchdog:" + id, "input_preview": label, @@ -2598,10 +2598,10 @@ func (s *Server) handleWebUITaskQueue(w http.ResponseWriter, r *http.Request) { "task_id": "cmd:watchdog", "time": fmt.Sprintf("%v", q["time"]), "status": "running", - "source": "command_watchdog", + "source": "task_watchdog", "channel": "watchdog", "session": "watchdog:stats", - "input_preview": "command watchdog capacity snapshot", + "input_preview": "task watchdog capacity snapshot", "duration_ms": 0, "attempts": 1, "retry_count": 0, diff --git a/pkg/tools/subagent.go b/pkg/tools/subagent.go index 1021c2f..7353e0f 100644 --- a/pkg/tools/subagent.go +++ b/pkg/tools/subagent.go @@ -535,10 +535,18 @@ func (sm *SubagentManager) runWithRetry(ctx context.Context, task *SubagentTask) var lastErr error for attempt := 0; attempt <= maxRetries; attempt++ { - result, err := runStringTaskWithCommandTickTimeout( + result, err := runStringTaskWithTaskWatchdog( ctx, timeoutSec, 2*time.Second, + stringTaskWatchdogOptions{ + ProgressFn: func() int { + return sm.taskWatchdogProgress(task) + }, + CanExtend: func() bool { + return sm.taskCanAutoExtend(task) + }, + }, func(runCtx context.Context) (string, error) { return sm.executeTaskOnce(runCtx, task) }, @@ -572,6 +580,35 @@ func (sm *SubagentManager) runWithRetry(ctx context.Context, task *SubagentTask) return "", lastErr } +func (sm *SubagentManager) taskWatchdogProgress(task *SubagentTask) int { + if sm == nil || task == nil { + return 0 + } + sm.mu.RLock() + defer sm.mu.RUnlock() + current, ok := sm.tasks[task.ID] + if !ok || current == nil { + current = task + } + if current.Updated <= 0 { + return 0 + } + return int(current.Updated) +} + +func (sm *SubagentManager) taskCanAutoExtend(task *SubagentTask) bool { + if sm == nil || task == nil { + return false + } + sm.mu.RLock() + defer sm.mu.RUnlock() + current, ok := sm.tasks[task.ID] + if !ok || current == nil { + current = task + } + return strings.EqualFold(strings.TrimSpace(current.Status), "running") +} + func (sm *SubagentManager) executeTaskOnce(ctx context.Context, task *SubagentTask) (string, error) { if task == nil { return "", fmt.Errorf("subagent task is nil") diff --git a/pkg/tools/subagent_runtime_control_test.go b/pkg/tools/subagent_runtime_control_test.go index 4886ea3..b5e3a69 100644 --- a/pkg/tools/subagent_runtime_control_test.go +++ b/pkg/tools/subagent_runtime_control_test.go @@ -79,7 +79,7 @@ func TestSubagentRunWithRetryEventuallySucceeds(t *testing.T) { } } -func TestSubagentRunWithTimeoutFails(t *testing.T) { +func TestSubagentRunAutoExtendsWhileStillRunning(t *testing.T) { workspace := t.TempDir() manager := NewSubagentManager(nil, workspace, nil) manager.SetRunFunc(func(ctx context.Context, task *SubagentTask) (string, error) { @@ -87,7 +87,7 @@ func TestSubagentRunWithTimeoutFails(t *testing.T) { case <-ctx.Done(): return "", ctx.Err() case <-time.After(2 * time.Second): - return "unexpected", nil + return "completed after extension", nil } }) @@ -103,12 +103,15 @@ func TestSubagentRunWithTimeoutFails(t *testing.T) { } task := waitSubagentDone(t, manager, 4*time.Second) - if task.Status != "failed" { - t.Fatalf("expected failed task on timeout, got %s", task.Status) + if task.Status != "completed" { + t.Fatalf("expected completed task after watchdog extension, got %s", task.Status) } if task.RetryCount != 0 { t.Fatalf("expected retry_count=0, got %d", task.RetryCount) } + if !strings.Contains(task.Result, "completed after extension") { + t.Fatalf("expected extended result, got %q", task.Result) + } } func TestSubagentBroadcastIncludesFailureStatus(t *testing.T) { diff --git a/pkg/tools/command_tick.go b/pkg/tools/task_watchdog.go similarity index 93% rename from pkg/tools/command_tick.go rename to pkg/tools/task_watchdog.go index 09032d2..6075380 100644 --- a/pkg/tools/command_tick.go +++ b/pkg/tools/task_watchdog.go @@ -26,7 +26,7 @@ const ( ) var ErrCommandNoProgress = errors.New("command no progress across tick rounds") -var ErrCommandTickTimeout = errors.New("command tick timeout exceeded") +var ErrTaskWatchdogTimeout = errors.New("task watchdog timeout exceeded") type commandRuntimePolicy struct { BaseTick time.Duration @@ -600,13 +600,19 @@ type stringTaskResult struct { err error } -// runStringTaskWithCommandTickTimeout executes a string-returning task with a -// command-tick-based timeout loop so timeout behavior stays consistent with the -// command watchdog pacing policy. -func runStringTaskWithCommandTickTimeout( +type stringTaskWatchdogOptions struct { + ProgressFn func() int + CanExtend func() bool +} + +// runStringTaskWithTaskWatchdog executes a string-returning task with the same +// tick pacing as the command watchdog, but only times out after a full timeout +// window without observable progress or an allowed extension signal. +func runStringTaskWithTaskWatchdog( ctx context.Context, timeoutSec int, baseTick time.Duration, + opts stringTaskWatchdogOptions, run func(context.Context) (string, error), ) (string, error) { if run == nil { @@ -620,7 +626,8 @@ func runStringTaskWithCommandTickTimeout( } timeout := time.Duration(timeoutSec) * time.Second - started := time.Now() + lastProgressAt := time.Now() + lastProgress := safeProgress(opts.ProgressFn) tick := normalizeCommandTick(baseTick) if tick <= 0 { tick = 2 * time.Second @@ -646,19 +653,31 @@ func runStringTaskWithCommandTickTimeout( case res := <-done: return res.output, res.err case <-timer.C: - elapsed := time.Since(started) - if elapsed >= timeout { - cancel() - select { - case res := <-done: - if res.err != nil { - return "", fmt.Errorf("%w: %v", ErrCommandTickTimeout, res.err) - } - case <-time.After(2 * time.Second): - } - return "", fmt.Errorf("%w: %ds", ErrCommandTickTimeout, timeoutSec) + if cur := safeProgress(opts.ProgressFn); cur > lastProgress { + lastProgress = cur + lastProgressAt = time.Now() + } + stalledFor := time.Since(lastProgressAt) + if stalledFor >= timeout { + if opts.CanExtend != nil && opts.CanExtend() { + lastProgressAt = time.Now() + stalledFor = 0 + } else { + cancel() + select { + case res := <-done: + if res.err != nil { + return "", fmt.Errorf("%w: %v", ErrTaskWatchdogTimeout, res.err) + } + case <-time.After(2 * time.Second): + } + return "", fmt.Errorf("%w: %ds", ErrTaskWatchdogTimeout, timeoutSec) + } + } + next := nextCommandTick(tick, stalledFor) + if next <= 0 { + next = tick } - next := nextCommandTick(tick, elapsed) timer.Reset(next) } } diff --git a/pkg/tools/task_watchdog_test.go b/pkg/tools/task_watchdog_test.go new file mode 100644 index 0000000..3ac6083 --- /dev/null +++ b/pkg/tools/task_watchdog_test.go @@ -0,0 +1,105 @@ +package tools + +import ( + "context" + "errors" + "sync/atomic" + "testing" + "time" +) + +func TestRunStringTaskWithTaskWatchdogTimesOutWithoutExtension(t *testing.T) { + t.Parallel() + + started := time.Now() + _, err := runStringTaskWithTaskWatchdog( + context.Background(), + 1, + 100*time.Millisecond, + stringTaskWatchdogOptions{}, + func(ctx context.Context) (string, error) { + <-ctx.Done() + return "", ctx.Err() + }, + ) + if !errors.Is(err, ErrTaskWatchdogTimeout) { + t.Fatalf("expected ErrTaskWatchdogTimeout, got %v", err) + } + if elapsed := time.Since(started); elapsed > 3*time.Second { + t.Fatalf("expected watchdog timeout quickly, took %v", elapsed) + } +} + +func TestRunStringTaskWithTaskWatchdogAutoExtendsWhileRunning(t *testing.T) { + t.Parallel() + + started := time.Now() + out, err := runStringTaskWithTaskWatchdog( + context.Background(), + 1, + 100*time.Millisecond, + stringTaskWatchdogOptions{ + CanExtend: func() bool { return true }, + }, + func(ctx context.Context) (string, error) { + select { + case <-ctx.Done(): + return "", ctx.Err() + case <-time.After(1500 * time.Millisecond): + return "ok", nil + } + }, + ) + if err != nil { + t.Fatalf("expected auto-extended task to finish, got %v", err) + } + if out != "ok" { + t.Fatalf("expected output ok, got %q", out) + } + if elapsed := time.Since(started); elapsed < time.Second { + t.Fatalf("expected task to run past initial timeout window, took %v", elapsed) + } +} + +func TestRunStringTaskWithTaskWatchdogExtendsOnProgress(t *testing.T) { + t.Parallel() + + var progress atomic.Int64 + done := make(chan struct{}) + go func() { + ticker := time.NewTicker(400 * time.Millisecond) + defer ticker.Stop() + for { + select { + case <-done: + return + case <-ticker.C: + progress.Add(1) + } + } + }() + defer close(done) + + out, err := runStringTaskWithTaskWatchdog( + context.Background(), + 1, + 100*time.Millisecond, + stringTaskWatchdogOptions{ + ProgressFn: func() int { return int(progress.Load()) }, + }, + func(ctx context.Context) (string, error) { + select { + case <-ctx.Done(): + return "", ctx.Err() + case <-time.After(1500 * time.Millisecond): + return "done", nil + } + }, + ) + if err != nil { + t.Fatalf("expected progress-based extension to finish, got %v", err) + } + if out != "done" { + t.Fatalf("expected output done, got %q", out) + } +} diff --git a/webui/src/pages/TaskAudit.tsx b/webui/src/pages/TaskAudit.tsx index c86de10..d840b3b 100644 --- a/webui/src/pages/TaskAudit.tsx +++ b/webui/src/pages/TaskAudit.tsx @@ -75,7 +75,7 @@ const TaskAudit: React.FC = () => { - +