Unify task watchdog timeout handling

lpf
2026-03-07 12:27:34 +08:00
parent 0b1fdecd68
commit ee4a1a1775
6 changed files with 194 additions and 30 deletions

View File

@@ -2535,13 +2535,13 @@ func (s *Server) handleWebUITaskQueue(w http.ResponseWriter, r *http.Request) {
 	label := fmt.Sprintf("%v", row["label"])
 	source := strings.TrimSpace(fmt.Sprintf("%v", row["source"]))
 	if source == "" {
-		source = "command_watchdog"
+		source = "task_watchdog"
 	}
 	rec := map[string]interface{}{
 		"task_id": "cmd:" + id,
 		"time": fmt.Sprintf("%v", row["started_at"]),
 		"status": "running",
-		"source": "command_watchdog",
+		"source": "task_watchdog",
 		"channel": source,
 		"session": "watchdog:" + id,
 		"input_preview": label,
@@ -2571,13 +2571,13 @@ func (s *Server) handleWebUITaskQueue(w http.ResponseWriter, r *http.Request) {
 	label := fmt.Sprintf("%v", row["label"])
 	source := strings.TrimSpace(fmt.Sprintf("%v", row["source"]))
 	if source == "" {
-		source = "command_watchdog"
+		source = "task_watchdog"
 	}
 	rec := map[string]interface{}{
 		"task_id": "cmd:" + id,
 		"time": fmt.Sprintf("%v", row["enqueued_at"]),
 		"status": "waiting",
-		"source": "command_watchdog",
+		"source": "task_watchdog",
 		"channel": source,
 		"session": "watchdog:" + id,
 		"input_preview": label,
@@ -2598,10 +2598,10 @@ func (s *Server) handleWebUITaskQueue(w http.ResponseWriter, r *http.Request) {
 		"task_id": "cmd:watchdog",
 		"time": fmt.Sprintf("%v", q["time"]),
 		"status": "running",
-		"source": "command_watchdog",
+		"source": "task_watchdog",
 		"channel": "watchdog",
 		"session": "watchdog:stats",
-		"input_preview": "command watchdog capacity snapshot",
+		"input_preview": "task watchdog capacity snapshot",
 		"duration_ms": 0,
 		"attempts": 1,
 		"retry_count": 0,

View File

@@ -535,10 +535,18 @@ func (sm *SubagentManager) runWithRetry(ctx context.Context, task *SubagentTask)
 	var lastErr error
 	for attempt := 0; attempt <= maxRetries; attempt++ {
-		result, err := runStringTaskWithCommandTickTimeout(
+		result, err := runStringTaskWithTaskWatchdog(
 			ctx,
 			timeoutSec,
 			2*time.Second,
+			stringTaskWatchdogOptions{
+				ProgressFn: func() int {
+					return sm.taskWatchdogProgress(task)
+				},
+				CanExtend: func() bool {
+					return sm.taskCanAutoExtend(task)
+				},
+			},
 			func(runCtx context.Context) (string, error) {
 				return sm.executeTaskOnce(runCtx, task)
 			},
@@ -572,6 +580,35 @@ func (sm *SubagentManager) runWithRetry(ctx context.Context, task *SubagentTask)
 	return "", lastErr
 }
 
+func (sm *SubagentManager) taskWatchdogProgress(task *SubagentTask) int {
+	if sm == nil || task == nil {
+		return 0
+	}
+	sm.mu.RLock()
+	defer sm.mu.RUnlock()
+	current, ok := sm.tasks[task.ID]
+	if !ok || current == nil {
+		current = task
+	}
+	if current.Updated <= 0 {
+		return 0
+	}
+	return int(current.Updated)
+}
+
+func (sm *SubagentManager) taskCanAutoExtend(task *SubagentTask) bool {
+	if sm == nil || task == nil {
+		return false
+	}
+	sm.mu.RLock()
+	defer sm.mu.RUnlock()
+	current, ok := sm.tasks[task.ID]
+	if !ok || current == nil {
+		current = task
+	}
+	return strings.EqualFold(strings.TrimSpace(current.Status), "running")
+}
+
 func (sm *SubagentManager) executeTaskOnce(ctx context.Context, task *SubagentTask) (string, error) {
 	if task == nil {
 		return "", fmt.Errorf("subagent task is nil")

View File

@@ -79,7 +79,7 @@ func TestSubagentRunWithRetryEventuallySucceeds(t *testing.T) {
 	}
 }
 
-func TestSubagentRunWithTimeoutFails(t *testing.T) {
+func TestSubagentRunAutoExtendsWhileStillRunning(t *testing.T) {
 	workspace := t.TempDir()
 	manager := NewSubagentManager(nil, workspace, nil)
 	manager.SetRunFunc(func(ctx context.Context, task *SubagentTask) (string, error) {
@@ -87,7 +87,7 @@ func TestSubagentRunWithTimeoutFails(t *testing.T) {
 		case <-ctx.Done():
 			return "", ctx.Err()
 		case <-time.After(2 * time.Second):
-			return "unexpected", nil
+			return "completed after extension", nil
 		}
 	})
@@ -103,12 +103,15 @@ func TestSubagentRunWithTimeoutFails(t *testing.T) {
 	}
 
 	task := waitSubagentDone(t, manager, 4*time.Second)
-	if task.Status != "failed" {
-		t.Fatalf("expected failed task on timeout, got %s", task.Status)
+	if task.Status != "completed" {
+		t.Fatalf("expected completed task after watchdog extension, got %s", task.Status)
 	}
 	if task.RetryCount != 0 {
 		t.Fatalf("expected retry_count=0, got %d", task.RetryCount)
 	}
+	if !strings.Contains(task.Result, "completed after extension") {
+		t.Fatalf("expected extended result, got %q", task.Result)
+	}
 }
 
 func TestSubagentBroadcastIncludesFailureStatus(t *testing.T) {

View File

@@ -26,7 +26,7 @@ const (
 )
 
 var ErrCommandNoProgress = errors.New("command no progress across tick rounds")
-var ErrCommandTickTimeout = errors.New("command tick timeout exceeded")
+var ErrTaskWatchdogTimeout = errors.New("task watchdog timeout exceeded")
 
 type commandRuntimePolicy struct {
 	BaseTick time.Duration
@@ -600,13 +600,19 @@ type stringTaskResult struct {
 	err error
 }
 
-// runStringTaskWithCommandTickTimeout executes a string-returning task with a
-// command-tick-based timeout loop so timeout behavior stays consistent with the
-// command watchdog pacing policy.
-func runStringTaskWithCommandTickTimeout(
+type stringTaskWatchdogOptions struct {
+	ProgressFn func() int
+	CanExtend  func() bool
+}
+
+// runStringTaskWithTaskWatchdog executes a string-returning task with the same
+// tick pacing as the command watchdog, but only times out after a full timeout
+// window without observable progress or an allowed extension signal.
+func runStringTaskWithTaskWatchdog(
 	ctx context.Context,
 	timeoutSec int,
 	baseTick time.Duration,
+	opts stringTaskWatchdogOptions,
 	run func(context.Context) (string, error),
 ) (string, error) {
 	if run == nil {
@@ -620,7 +626,8 @@ func runStringTaskWithCommandTickTimeout(
 	}
 	timeout := time.Duration(timeoutSec) * time.Second
-	started := time.Now()
+	lastProgressAt := time.Now()
+	lastProgress := safeProgress(opts.ProgressFn)
 	tick := normalizeCommandTick(baseTick)
 	if tick <= 0 {
 		tick = 2 * time.Second
@@ -646,19 +653,31 @@
 		case res := <-done:
 			return res.output, res.err
 		case <-timer.C:
-			elapsed := time.Since(started)
-			if elapsed >= timeout {
-				cancel()
-				select {
-				case res := <-done:
-					if res.err != nil {
-						return "", fmt.Errorf("%w: %v", ErrCommandTickTimeout, res.err)
-					}
-				case <-time.After(2 * time.Second):
-				}
-				return "", fmt.Errorf("%w: %ds", ErrCommandTickTimeout, timeoutSec)
+			if cur := safeProgress(opts.ProgressFn); cur > lastProgress {
+				lastProgress = cur
+				lastProgressAt = time.Now()
+			}
+			stalledFor := time.Since(lastProgressAt)
+			if stalledFor >= timeout {
+				if opts.CanExtend != nil && opts.CanExtend() {
+					lastProgressAt = time.Now()
+					stalledFor = 0
+				} else {
+					cancel()
+					select {
+					case res := <-done:
+						if res.err != nil {
+							return "", fmt.Errorf("%w: %v", ErrTaskWatchdogTimeout, res.err)
+						}
+					case <-time.After(2 * time.Second):
+					}
+					return "", fmt.Errorf("%w: %ds", ErrTaskWatchdogTimeout, timeoutSec)
+				}
+			}
+			next := nextCommandTick(tick, stalledFor)
+			if next <= 0 {
+				next = tick
 			}
-			next := nextCommandTick(tick, elapsed)
 			timer.Reset(next)
 		}
 	}
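
To make the new call shape concrete, here is a minimal sketch (not part of the commit) of how code in the same package could wrap a long-running job with the watchdog. runStringTaskWithTaskWatchdog, stringTaskWatchdogOptions, and ErrTaskWatchdogTimeout come from the diff above; the example function, its worker, and the progress counter are hypothetical.

package tools

import (
	"context"
	"errors"
	"sync/atomic"
	"time"
)

// exampleTaskWatchdogUsage is a hypothetical caller: it runs a worker under the
// task watchdog and reports progress through an atomic counter, so the watchdog
// only times out after a full window with no observable progress.
func exampleTaskWatchdogUsage(ctx context.Context) (string, error) {
	var progress atomic.Int64 // bumped by the worker whenever it makes headway

	out, err := runStringTaskWithTaskWatchdog(
		ctx,
		300,           // timeoutSec: give up only after 300s without progress or extension
		2*time.Second, // baseTick: same pacing the command watchdog uses
		stringTaskWatchdogOptions{
			ProgressFn: func() int { return int(progress.Load()) },
			CanExtend:  func() bool { return false }, // no auto-extension in this sketch
		},
		func(runCtx context.Context) (string, error) {
			// Hypothetical worker: ten units of work, reporting after each one.
			for i := 0; i < 10; i++ {
				select {
				case <-runCtx.Done():
					return "", runCtx.Err()
				case <-time.After(time.Second):
					progress.Add(1)
				}
			}
			return "done", nil
		},
	)
	if errors.Is(err, ErrTaskWatchdogTimeout) {
		// The task stalled for a full timeout window and could not extend.
		return "", err
	}
	return out, err
}

Because the stall clock resets on every progress increment (or when CanExtend allows it), a worker that keeps reporting can legitimately run well past timeoutSec, which is the behavior the subagent tests above rely on.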

View File

@@ -0,0 +1,105 @@
+package tools
+
+import (
+	"context"
+	"errors"
+	"sync/atomic"
+	"testing"
+	"time"
+)
+
+func TestRunStringTaskWithTaskWatchdogTimesOutWithoutExtension(t *testing.T) {
+	t.Parallel()
+
+	started := time.Now()
+	_, err := runStringTaskWithTaskWatchdog(
+		context.Background(),
+		1,
+		100*time.Millisecond,
+		stringTaskWatchdogOptions{},
+		func(ctx context.Context) (string, error) {
+			<-ctx.Done()
+			return "", ctx.Err()
+		},
+	)
+	if !errors.Is(err, ErrTaskWatchdogTimeout) {
+		t.Fatalf("expected ErrTaskWatchdogTimeout, got %v", err)
+	}
+	if elapsed := time.Since(started); elapsed > 3*time.Second {
+		t.Fatalf("expected watchdog timeout quickly, took %v", elapsed)
+	}
+}
+
+func TestRunStringTaskWithTaskWatchdogAutoExtendsWhileRunning(t *testing.T) {
+	t.Parallel()
+
+	started := time.Now()
+	out, err := runStringTaskWithTaskWatchdog(
+		context.Background(),
+		1,
+		100*time.Millisecond,
+		stringTaskWatchdogOptions{
+			CanExtend: func() bool { return true },
+		},
+		func(ctx context.Context) (string, error) {
+			select {
+			case <-ctx.Done():
+				return "", ctx.Err()
+			case <-time.After(1500 * time.Millisecond):
+				return "ok", nil
+			}
+		},
+	)
+	if err != nil {
+		t.Fatalf("expected auto-extended task to finish, got %v", err)
+	}
+	if out != "ok" {
+		t.Fatalf("expected output ok, got %q", out)
+	}
+	if elapsed := time.Since(started); elapsed < time.Second {
+		t.Fatalf("expected task to run past initial timeout window, took %v", elapsed)
+	}
+}
+
+func TestRunStringTaskWithTaskWatchdogExtendsOnProgress(t *testing.T) {
+	t.Parallel()
+
+	var progress atomic.Int64
+	done := make(chan struct{})
+	go func() {
+		ticker := time.NewTicker(400 * time.Millisecond)
+		defer ticker.Stop()
+		for {
+			select {
+			case <-done:
+				return
+			case <-ticker.C:
+				progress.Add(1)
+			}
+		}
+	}()
+	defer close(done)
+
+	out, err := runStringTaskWithTaskWatchdog(
+		context.Background(),
+		1,
+		100*time.Millisecond,
+		stringTaskWatchdogOptions{
+			ProgressFn: func() int { return int(progress.Load()) },
+		},
+		func(ctx context.Context) (string, error) {
+			select {
+			case <-ctx.Done():
+				return "", ctx.Err()
+			case <-time.After(1500 * time.Millisecond):
+				return "done", nil
+			}
+		},
+	)
+	if err != nil {
+		t.Fatalf("expected progress-based extension to finish, got %v", err)
+	}
+	if out != "done" {
+		t.Fatalf("expected output done, got %q", out)
+	}
+}

View File

@@ -75,7 +75,7 @@ const TaskAudit: React.FC = () => {
         <option value="all">{t('allSources')}</option>
         <option value="direct">{t('sourceDirect')}</option>
         <option value="memory_todo">{t('sourceMemoryTodo')}</option>
-        <option value="command_watchdog">command_watchdog</option>
+        <option value="task_watchdog">task_watchdog</option>
         <option value="-">-</option>
       </select>
       <select value={statusFilter} onChange={(e)=>setStatusFilter(e.target.value)} className="bg-zinc-900 border border-zinc-700 rounded px-2 py-1 text-xs">