mirror of
https://github.com/YspCoder/clawgo.git
synced 2026-05-19 17:37:29 +08:00
Recover running subagent tasks after restart
This commit is contained in:
@@ -51,6 +51,7 @@ type SubagentTask struct {
|
|||||||
type SubagentManager struct {
|
type SubagentManager struct {
|
||||||
tasks map[string]*SubagentTask
|
tasks map[string]*SubagentTask
|
||||||
cancelFuncs map[string]context.CancelFunc
|
cancelFuncs map[string]context.CancelFunc
|
||||||
|
recoverableTaskIDs []string
|
||||||
archiveAfterMinute int64
|
archiveAfterMinute int64
|
||||||
mu sync.RWMutex
|
mu sync.RWMutex
|
||||||
provider providers.LLMProvider
|
provider providers.LLMProvider
|
||||||
@@ -99,9 +100,13 @@ func NewSubagentManager(provider providers.LLMProvider, workspace string, bus *b
|
|||||||
if runStore != nil {
|
if runStore != nil {
|
||||||
for _, task := range runStore.List() {
|
for _, task := range runStore.List() {
|
||||||
mgr.tasks[task.ID] = task
|
mgr.tasks[task.ID] = task
|
||||||
|
if task.Status == "running" {
|
||||||
|
mgr.recoverableTaskIDs = append(mgr.recoverableTaskIDs, task.ID)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
mgr.nextID = runStore.NextIDSeed()
|
mgr.nextID = runStore.NextIDSeed()
|
||||||
}
|
}
|
||||||
|
go mgr.resumeRecoveredTasks()
|
||||||
return mgr
|
return mgr
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -534,6 +539,7 @@ func (sm *SubagentManager) SetRunFunc(f SubagentRunFunc) {
|
|||||||
sm.mu.Lock()
|
sm.mu.Lock()
|
||||||
defer sm.mu.Unlock()
|
defer sm.mu.Unlock()
|
||||||
sm.runFunc = f
|
sm.runFunc = f
|
||||||
|
go sm.resumeRecoveredTasks()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (sm *SubagentManager) ProfileStore() *SubagentProfileStore {
|
func (sm *SubagentManager) ProfileStore() *SubagentProfileStore {
|
||||||
@@ -542,6 +548,38 @@ func (sm *SubagentManager) ProfileStore() *SubagentProfileStore {
|
|||||||
return sm.profileStore
|
return sm.profileStore
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (sm *SubagentManager) resumeRecoveredTasks() {
|
||||||
|
if sm == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
sm.mu.Lock()
|
||||||
|
if sm.runFunc == nil && sm.provider == nil {
|
||||||
|
sm.mu.Unlock()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
taskIDs := append([]string(nil), sm.recoverableTaskIDs...)
|
||||||
|
sm.recoverableTaskIDs = nil
|
||||||
|
toResume := make([]*SubagentTask, 0, len(taskIDs))
|
||||||
|
for _, taskID := range taskIDs {
|
||||||
|
task, ok := sm.tasks[taskID]
|
||||||
|
if !ok || task == nil || task.Status != "running" {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
task.Updated = time.Now().UnixMilli()
|
||||||
|
sm.persistTaskLocked(task, "recovered", "auto-resumed after restart")
|
||||||
|
toResume = append(toResume, task)
|
||||||
|
}
|
||||||
|
sm.mu.Unlock()
|
||||||
|
|
||||||
|
for _, task := range toResume {
|
||||||
|
taskCtx, cancel := context.WithCancel(context.Background())
|
||||||
|
sm.mu.Lock()
|
||||||
|
sm.cancelFuncs[task.ID] = cancel
|
||||||
|
sm.mu.Unlock()
|
||||||
|
go sm.runTask(taskCtx, task)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func (sm *SubagentManager) NextTaskSequence() int {
|
func (sm *SubagentManager) NextTaskSequence() int {
|
||||||
sm.mu.RLock()
|
sm.mu.RLock()
|
||||||
defer sm.mu.RUnlock()
|
defer sm.mu.RUnlock()
|
||||||
|
|||||||
@@ -205,6 +205,55 @@ func TestSubagentManagerRestoresPersistedRuns(t *testing.T) {
|
|||||||
time.Sleep(100 * time.Millisecond)
|
time.Sleep(100 * time.Millisecond)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestSubagentManagerAutoRecoversRunningTaskAfterRestart(t *testing.T) {
|
||||||
|
workspace := t.TempDir()
|
||||||
|
block := make(chan struct{})
|
||||||
|
manager := NewSubagentManager(nil, workspace, nil)
|
||||||
|
manager.SetRunFunc(func(ctx context.Context, task *SubagentTask) (string, error) {
|
||||||
|
<-block
|
||||||
|
return "should-not-complete-here", nil
|
||||||
|
})
|
||||||
|
|
||||||
|
_, err := manager.Spawn(context.Background(), SubagentSpawnOptions{
|
||||||
|
Task: "recover me",
|
||||||
|
AgentID: "coder",
|
||||||
|
OriginChannel: "cli",
|
||||||
|
OriginChatID: "direct",
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("spawn failed: %v", err)
|
||||||
|
}
|
||||||
|
time.Sleep(80 * time.Millisecond)
|
||||||
|
|
||||||
|
recovered := make(chan string, 1)
|
||||||
|
reloaded := NewSubagentManager(nil, workspace, nil)
|
||||||
|
reloaded.SetRunFunc(func(ctx context.Context, task *SubagentTask) (string, error) {
|
||||||
|
recovered <- task.ID
|
||||||
|
return "recovered-ok", nil
|
||||||
|
})
|
||||||
|
|
||||||
|
select {
|
||||||
|
case taskID := <-recovered:
|
||||||
|
if taskID != "subagent-1" {
|
||||||
|
t.Fatalf("expected recovered task id subagent-1, got %s", taskID)
|
||||||
|
}
|
||||||
|
case <-time.After(2 * time.Second):
|
||||||
|
t.Fatalf("expected running task to auto-recover after restart")
|
||||||
|
}
|
||||||
|
|
||||||
|
got, ok := reloaded.GetTask("subagent-1")
|
||||||
|
if !ok {
|
||||||
|
t.Fatalf("expected recovered task to exist")
|
||||||
|
}
|
||||||
|
if got.Status != "completed" || got.Result != "recovered-ok" {
|
||||||
|
t.Fatalf("unexpected recovered task: %+v", got)
|
||||||
|
}
|
||||||
|
|
||||||
|
close(block)
|
||||||
|
_ = waitSubagentDone(t, manager, 4*time.Second)
|
||||||
|
time.Sleep(100 * time.Millisecond)
|
||||||
|
}
|
||||||
|
|
||||||
func TestSubagentManagerPersistsEvents(t *testing.T) {
|
func TestSubagentManagerPersistsEvents(t *testing.T) {
|
||||||
workspace := t.TempDir()
|
workspace := t.TempDir()
|
||||||
manager := NewSubagentManager(nil, workspace, nil)
|
manager := NewSubagentManager(nil, workspace, nil)
|
||||||
|
|||||||
Reference in New Issue
Block a user