// Patch summary (unified diff of Go sources; leading text before "diff --git" is commentary only).
//
// Purpose: resume subagent tasks that were persisted with Status "running" when a
// new SubagentManager is constructed over the same run store.
//
// pkg/tools/subagent.go
//   - SubagentManager gains recoverableTaskIDs ([]string).
//   - NewSubagentManager: while copying runStore.List() into mgr.tasks, records the ID
//     of every task whose Status is "running", then starts `go mgr.resumeRecoveredTasks()`.
//   - SetRunFunc: after storing runFunc it also starts `go sm.resumeRecoveredTasks()`.
//     The goroutine is launched while sm.mu is still held (Unlock is deferred), so its
//     own sm.mu.Lock() cannot proceed until SetRunFunc has returned.
//   - resumeRecoveredTasks: no-op on a nil receiver. Under sm.mu it returns without
//     touching recoverableTaskIDs when both runFunc and provider are nil; otherwise it
//     copies and clears the ID list, keeps only tasks still present with Status
//     "running", stamps Updated with time.Now().UnixMilli(), and calls
//     persistTaskLocked(task, "recovered", "auto-resumed after restart"). After
//     unlocking, each kept task gets a context.WithCancel(context.Background()), its
//     cancel func is stored in cancelFuncs[task.ID] under the lock, and runTask is
//     started in a new goroutine.
//   - Because the ID list is cleared under the lock, the constructor-started and the
//     SetRunFunc-started goroutines cannot both resume the same ID.
//
// pkg/tools/subagent_runtime_control_test.go
//   - TestSubagentManagerAutoRecoversRunningTaskAfterRestart spawns a task whose run
//     func blocks on a channel, builds a second manager on the same workspace, and
//     expects the second manager's run func to be invoked for "subagent-1".
//
// NOTE(review): between sm.mu.Unlock() and the later cancelFuncs[task.ID] assignment a
//   resumed task is already persisted as "recovered" but has no cancel func registered;
//   consider creating the context and registering cancel inside the first critical
//   section. TODO confirm whether any caller can cancel in that window.
// NOTE(review): when provider is non-nil, the goroutine started by NewSubagentManager can
//   consume recoverableTaskIDs before SetRunFunc is called; how runTask behaves with a
//   nil runFunc is not visible in this patch -- confirm this ordering is intended.
// NOTE(review): the test's recovered run func sends on `recovered` before it returns
//   "recovered-ok", yet the test asserts Status == "completed" and Result ==
//   "recovered-ok" immediately after receiving. Completion bookkeeping happens outside
//   this patch, so this looks racy -- consider waiting on the reloaded manager before
//   asserting. TODO confirm.
// NOTE(review): both managers share one workspace; the first manager's run func returns
//   "should-not-complete-here" once close(block) runs. What it persists afterwards is
//   not visible here -- verify it cannot overwrite the recovered result.
// NOTE(review): the fixed time.Sleep calls (80ms / 100ms) are presumably there to let
//   background persistence settle; verify they are sufficient on slow CI.
diff --git a/pkg/tools/subagent.go b/pkg/tools/subagent.go index 9347e4f..a4108f9 100644 --- a/pkg/tools/subagent.go +++ b/pkg/tools/subagent.go @@ -51,6 +51,7 @@ type SubagentTask struct { type SubagentManager struct { tasks map[string]*SubagentTask cancelFuncs map[string]context.CancelFunc + recoverableTaskIDs []string archiveAfterMinute int64 mu sync.RWMutex provider providers.LLMProvider @@ -99,9 +100,13 @@ func NewSubagentManager(provider providers.LLMProvider, workspace string, bus *b if runStore != nil { for _, task := range runStore.List() { mgr.tasks[task.ID] = task + if task.Status == "running" { + mgr.recoverableTaskIDs = append(mgr.recoverableTaskIDs, task.ID) + } } mgr.nextID = runStore.NextIDSeed() } + go mgr.resumeRecoveredTasks() return mgr } @@ -534,6 +539,7 @@ func (sm *SubagentManager) SetRunFunc(f SubagentRunFunc) { sm.mu.Lock() defer sm.mu.Unlock() sm.runFunc = f + go sm.resumeRecoveredTasks() } func (sm *SubagentManager) ProfileStore() *SubagentProfileStore { @@ -542,6 +548,38 @@ func (sm *SubagentManager) ProfileStore() *SubagentProfileStore { return sm.profileStore } +func (sm *SubagentManager) resumeRecoveredTasks() { + if sm == nil { + return + } + sm.mu.Lock() + if sm.runFunc == nil && sm.provider == nil { + sm.mu.Unlock() + return + } + taskIDs := append([]string(nil), sm.recoverableTaskIDs...) 
+ sm.recoverableTaskIDs = nil + toResume := make([]*SubagentTask, 0, len(taskIDs)) + for _, taskID := range taskIDs { + task, ok := sm.tasks[taskID] + if !ok || task == nil || task.Status != "running" { + continue + } + task.Updated = time.Now().UnixMilli() + sm.persistTaskLocked(task, "recovered", "auto-resumed after restart") + toResume = append(toResume, task) + } + sm.mu.Unlock() + + for _, task := range toResume { + taskCtx, cancel := context.WithCancel(context.Background()) + sm.mu.Lock() + sm.cancelFuncs[task.ID] = cancel + sm.mu.Unlock() + go sm.runTask(taskCtx, task) + } +} + func (sm *SubagentManager) NextTaskSequence() int { sm.mu.RLock() defer sm.mu.RUnlock() diff --git a/pkg/tools/subagent_runtime_control_test.go b/pkg/tools/subagent_runtime_control_test.go index 6a0e47f..6090b39 100644 --- a/pkg/tools/subagent_runtime_control_test.go +++ b/pkg/tools/subagent_runtime_control_test.go @@ -205,6 +205,55 @@ func TestSubagentManagerRestoresPersistedRuns(t *testing.T) { time.Sleep(100 * time.Millisecond) } +func TestSubagentManagerAutoRecoversRunningTaskAfterRestart(t *testing.T) { + workspace := t.TempDir() + block := make(chan struct{}) + manager := NewSubagentManager(nil, workspace, nil) + manager.SetRunFunc(func(ctx context.Context, task *SubagentTask) (string, error) { + <-block + return "should-not-complete-here", nil + }) + + _, err := manager.Spawn(context.Background(), SubagentSpawnOptions{ + Task: "recover me", + AgentID: "coder", + OriginChannel: "cli", + OriginChatID: "direct", + }) + if err != nil { + t.Fatalf("spawn failed: %v", err) + } + time.Sleep(80 * time.Millisecond) + + recovered := make(chan string, 1) + reloaded := NewSubagentManager(nil, workspace, nil) + reloaded.SetRunFunc(func(ctx context.Context, task *SubagentTask) (string, error) { + recovered <- task.ID + return "recovered-ok", nil + }) + + select { + case taskID := <-recovered: + if taskID != "subagent-1" { + t.Fatalf("expected recovered task id subagent-1, got %s", taskID) + 
} + case <-time.After(2 * time.Second): + t.Fatalf("expected running task to auto-recover after restart") + } + + got, ok := reloaded.GetTask("subagent-1") + if !ok { + t.Fatalf("expected recovered task to exist") + } + if got.Status != "completed" || got.Result != "recovered-ok" { + t.Fatalf("unexpected recovered task: %+v", got) + } + + close(block) + _ = waitSubagentDone(t, manager, 4*time.Second) + time.Sleep(100 * time.Millisecond) +} + func TestSubagentManagerPersistsEvents(t *testing.T) { workspace := t.TempDir() manager := NewSubagentManager(nil, workspace, nil)