refactor: stabilize runtime and unify config

This commit is contained in:
lpf
2026-03-14 21:40:12 +08:00
parent 60eee65fec
commit 341e578c9f
75 changed files with 3081 additions and 1627 deletions

View File

@@ -107,7 +107,7 @@ func NewSubagentManager(provider providers.LLMProvider, workspace string, bus *b
if runStore != nil {
for _, task := range runStore.List() {
mgr.tasks[task.ID] = task
if task.Status == "running" {
if task.Status == RuntimeStatusRunning {
mgr.recoverableTaskIDs = append(mgr.recoverableTaskIDs, task.ID)
}
}
@@ -300,7 +300,7 @@ func (sm *SubagentManager) spawnTask(ctx context.Context, opts SubagentSpawnOpti
ParentRunID: parentRunID,
OriginChannel: originChannel,
OriginChatID: originChatID,
Status: "running",
Status: RuntimeStatusRouting,
Created: now,
Updated: now,
}
@@ -332,7 +332,7 @@ func (sm *SubagentManager) runTask(ctx context.Context, task *SubagentTask) {
}()
sm.mu.Lock()
task.Status = "running"
task.Status = RuntimeStatusRunning
task.Created = time.Now().UnixMilli()
task.Updated = task.Created
sm.persistTaskLocked(task, "started", "")
@@ -341,7 +341,7 @@ func (sm *SubagentManager) runTask(ctx context.Context, task *SubagentTask) {
result, runErr := sm.runWithRetry(ctx, task)
sm.mu.Lock()
if runErr != nil {
task.Status = "failed"
task.Status = RuntimeStatusFailed
task.Result = fmt.Sprintf("Error: %v", runErr)
task.Result = applySubagentResultQuota(task.Result, task.MaxResultChars)
task.Updated = time.Now().UnixMilli()
@@ -357,10 +357,10 @@ func (sm *SubagentManager) runTask(ctx context.Context, task *SubagentTask) {
Status: "delivered",
CreatedAt: task.Updated,
})
sm.persistTaskLocked(task, "completed", task.Result)
sm.persistTaskLocked(task, "failed", task.Result)
sm.notifyTaskWaitersLocked(task.ID)
} else {
task.Status = "completed"
task.Status = RuntimeStatusCompleted
task.Result = applySubagentResultQuota(result, task.MaxResultChars)
task.Updated = time.Now().UnixMilli()
task.WaitingReply = false
@@ -789,8 +789,8 @@ func (sm *SubagentManager) KillTask(taskID string) bool {
cancel()
delete(sm.cancelFuncs, taskID)
}
if t.Status == "running" {
t.Status = "killed"
if !IsTerminalRuntimeStatus(t.Status) {
t.Status = RuntimeStatusCancelled
t.WaitingReply = false
t.Updated = time.Now().UnixMilli()
sm.persistTaskLocked(t, "killed", "")
@@ -885,6 +885,53 @@ func (sm *SubagentManager) Events(taskID string, limit int) ([]SubagentRunEvent,
return sm.runStore.Events(taskID, limit)
}
func (sm *SubagentManager) RuntimeSnapshot(limit int) RuntimeSnapshot {
if sm == nil {
return RuntimeSnapshot{}
}
tasks := sm.ListTasks()
snapshot := RuntimeSnapshot{
Tasks: make([]TaskRecord, 0, len(tasks)),
Runs: make([]RunRecord, 0, len(tasks)),
}
seenThreads := map[string]struct{}{}
for _, task := range tasks {
snapshot.Tasks = append(snapshot.Tasks, taskToTaskRecord(task))
snapshot.Runs = append(snapshot.Runs, taskToRunRecord(task))
if evts, err := sm.Events(task.ID, limit); err == nil {
for _, evt := range evts {
snapshot.Events = append(snapshot.Events, EventRecord{
ID: EventRecordID(evt.RunID, evt.Type, evt.At),
RunID: evt.RunID,
TaskID: evt.RunID,
AgentID: evt.AgentID,
Type: evt.Type,
Status: evt.Status,
Message: evt.Message,
RetryCount: evt.RetryCount,
At: evt.At,
})
}
}
threadID := strings.TrimSpace(task.ThreadID)
if threadID == "" {
continue
}
if _, ok := seenThreads[threadID]; !ok {
if thread, found := sm.Thread(threadID); found {
snapshot.Threads = append(snapshot.Threads, threadToThreadRecord(thread))
}
seenThreads[threadID] = struct{}{}
}
if msgs, err := sm.ThreadMessages(threadID, limit); err == nil {
for _, msg := range msgs {
snapshot.Artifacts = append(snapshot.Artifacts, messageToArtifactRecord(msg))
}
}
}
return snapshot
}
func (sm *SubagentManager) Thread(threadID string) (*AgentThread, bool) {
if sm.mailboxStore == nil {
return nil, false
@@ -929,7 +976,7 @@ func (sm *SubagentManager) pruneArchivedLocked() {
}
cutoff := time.Now().Add(-time.Duration(sm.archiveAfterMinute) * time.Minute).UnixMilli()
for id, t := range sm.tasks {
if t.Status == "running" {
if !IsTerminalRuntimeStatus(t.Status) {
continue
}
if t.Updated > 0 && t.Updated < cutoff {
@@ -1035,13 +1082,13 @@ func (sm *SubagentManager) WaitTask(ctx context.Context, taskID string) (*Subage
task, ok := sm.tasks[taskID]
if !ok && sm.runStore != nil {
if persisted, found := sm.runStore.Get(taskID); found && persisted != nil {
if strings.TrimSpace(persisted.Status) != "running" {
if IsTerminalRuntimeStatus(persisted.Status) {
sm.mu.Unlock()
return persisted, true, nil
}
}
}
if ok && task != nil && strings.TrimSpace(task.Status) != "running" {
if ok && task != nil && IsTerminalRuntimeStatus(task.Status) {
cp := cloneSubagentTask(task)
sm.mu.Unlock()
return cp, true, nil
@@ -1063,13 +1110,13 @@ func (sm *SubagentManager) WaitTask(ctx context.Context, taskID string) (*Subage
sm.mu.Lock()
sm.pruneArchivedLocked()
task, ok := sm.tasks[taskID]
if ok && task != nil && strings.TrimSpace(task.Status) != "running" {
if ok && task != nil && IsTerminalRuntimeStatus(task.Status) {
cp := cloneSubagentTask(task)
sm.mu.Unlock()
return cp, true, nil
}
if !ok && sm.runStore != nil {
if persisted, found := sm.runStore.Get(taskID); found && persisted != nil && strings.TrimSpace(persisted.Status) != "running" {
if persisted, found := sm.runStore.Get(taskID); found && persisted != nil && IsTerminalRuntimeStatus(persisted.Status) {
sm.mu.Unlock()
return persisted, true, nil
}