From 947816182295bd0ca6f42e39383d8a4252c24369 Mon Sep 17 00:00:00 2001 From: DBT Date: Tue, 24 Feb 2026 08:10:53 +0000 Subject: [PATCH] persist process session metadata for restart-safe process tool state --- pkg/agent/loop.go | 2 +- pkg/tools/process_manager.go | 106 ++++++++++++++++++++++++++++++++++- 2 files changed, 104 insertions(+), 4 deletions(-) diff --git a/pkg/agent/loop.go b/pkg/agent/loop.go index 6499e4b..dcdbb5c 100644 --- a/pkg/agent/loop.go +++ b/pkg/agent/loop.go @@ -63,7 +63,7 @@ func NewAgentLoop(cfg *config.Config, msgBus *bus.MessageBus, provider providers sessionsManager := session.NewSessionManager(filepath.Join(filepath.Dir(cfg.WorkspacePath()), "sessions")) toolsRegistry := tools.NewToolRegistry() - processManager := tools.NewProcessManager() + processManager := tools.NewProcessManager(workspace) readTool := tools.NewReadFileTool(workspace) writeTool := tools.NewWriteFileTool(workspace) listTool := tools.NewListDirTool(workspace) diff --git a/pkg/tools/process_manager.go b/pkg/tools/process_manager.go index 3c85edb..594e72e 100644 --- a/pkg/tools/process_manager.go +++ b/pkg/tools/process_manager.go @@ -2,10 +2,14 @@ package tools import ( "bytes" + "encoding/json" "fmt" + "os" "os/exec" + "path/filepath" "sort" "strconv" + "strings" "sync" "sync/atomic" "time" @@ -27,10 +31,18 @@ type ProcessManager struct { mu sync.RWMutex sessions map[string]*processSession seq uint64 + metaPath string } -func NewProcessManager() *ProcessManager { - return &ProcessManager{sessions: map[string]*processSession{}} +func NewProcessManager(workspace string) *ProcessManager { + m := &ProcessManager{sessions: map[string]*processSession{}} + if workspace != "" { + memDir := filepath.Join(workspace, "memory") + _ = os.MkdirAll(memDir, 0755) + m.metaPath = filepath.Join(memDir, "process-sessions.json") + m.load() + } + return m } func (m *ProcessManager) Start(command, cwd string) (string, error) { @@ -52,6 +64,7 @@ func (m *ProcessManager) Start(command, cwd string) (string, error) { m.mu.Lock() m.sessions[id] = s m.mu.Unlock() + m.persist() if err := cmd.Start(); err != nil { m.mu.Lock() @@ -76,6 +89,7 @@ func (m *ProcessManager) Start(command, cwd string) (string, error) { s.EndedAt = time.Now().UTC() s.ExitCode = &code s.mu.Unlock() + m.persist() close(s.done) }() @@ -162,5 +176,91 @@ func (m *ProcessManager) Kill(id string) error { if cmd.Process == nil { return fmt.Errorf("process not started") } - return cmd.Process.Kill() + err := cmd.Process.Kill() + m.persist() + return err +} + +type processSessionMeta struct { + ID string `json:"id"` + Command string `json:"command"` + StartedAt string `json:"started_at"` + EndedAt string `json:"ended_at,omitempty"` + ExitCode *int `json:"exit_code,omitempty"` + Recovered bool `json:"recovered"` +} + +func (m *ProcessManager) persist() { + if m.metaPath == "" { + return + } + m.mu.RLock() + items := make([]processSessionMeta, 0, len(m.sessions)) + for _, s := range m.sessions { + s.mu.RLock() + row := processSessionMeta{ + ID: s.ID, + Command: s.Command, + StartedAt: s.StartedAt.Format(time.RFC3339), + Recovered: s.cmd == nil, + } + if !s.EndedAt.IsZero() { + row.EndedAt = s.EndedAt.Format(time.RFC3339) + } + if s.ExitCode != nil { + code := *s.ExitCode + row.ExitCode = &code + } + s.mu.RUnlock() + items = append(items, row) + } + m.mu.RUnlock() + data, err := json.MarshalIndent(items, "", " ") + if err != nil { + return + } + _ = os.WriteFile(m.metaPath, data, 0644) +} + +func (m *ProcessManager) load() { + if m.metaPath == "" { + return + } + data, err := os.ReadFile(m.metaPath) + if err != nil { + return + } + var items []processSessionMeta + if err := json.Unmarshal(data, &items); err != nil { + return + } + maxSeq := uint64(0) + for _, it := range items { + s := &processSession{ID: it.ID, Command: it.Command, done: make(chan struct{})} + if t, err := time.Parse(time.RFC3339, it.StartedAt); err == nil { + s.StartedAt = t + } + if it.EndedAt != "" { + if t, err := time.Parse(time.RFC3339, it.EndedAt); err == nil { + s.EndedAt = t + } + } + if it.ExitCode != nil { + code := *it.ExitCode + s.ExitCode = &code + close(s.done) + } else { + code := -2 + s.ExitCode = &code + s.EndedAt = time.Now().UTC() + close(s.done) + } + m.sessions[s.ID] = s + if strings.HasPrefix(s.ID, "p-") { + if n, err := strconv.ParseUint(strings.TrimPrefix(s.ID, "p-"), 10, 64); err == nil && n > maxSeq { + maxSeq = n + } + } + } + m.seq = maxSeq }