apply go-centric architecture optimizations and document them in readme

This commit is contained in:
DBT
2026-02-24 09:25:12 +00:00
parent 3dd918d429
commit 4105eeb0db
7 changed files with 209 additions and 12 deletions

View File

@@ -2,6 +2,7 @@ package tools
import (
"bytes"
"context"
"encoding/json"
"fmt"
"os"
@@ -13,6 +14,8 @@ import (
"sync"
"sync/atomic"
"time"
"clawgo/pkg/events"
)
type processSession struct {
@@ -22,10 +25,12 @@ type processSession struct {
EndedAt time.Time
ExitCode *int
cmd *exec.Cmd
cancel context.CancelFunc
done chan struct{}
mu sync.RWMutex
log bytes.Buffer
logPath string
logQueue chan []byte
}
type ProcessManager struct {
@@ -33,22 +38,42 @@ type ProcessManager struct {
sessions map[string]*processSession
seq uint64
metaPath string
persistQ chan struct{}
events *events.TypedBus[ProcessEvent]
}
// ProcessEvent is a typed lifecycle event for process sessions.
type ProcessEvent struct {
Type string `json:"type"`
SessionID string `json:"session_id"`
Command string `json:"command,omitempty"`
At time.Time `json:"at"`
ExitCode *int `json:"exit_code,omitempty"`
}
func NewProcessManager(workspace string) *ProcessManager {
m := &ProcessManager{sessions: map[string]*processSession{}}
m := &ProcessManager{
sessions: map[string]*processSession{},
persistQ: make(chan struct{}, 1),
events: events.NewTypedBus[ProcessEvent](),
}
if workspace != "" {
memDir := filepath.Join(workspace, "memory")
_ = os.MkdirAll(memDir, 0755)
m.metaPath = filepath.Join(memDir, "process-sessions.json")
m.load()
}
go m.persistLoop()
return m
}
func (m *ProcessManager) Start(command, cwd string) (string, error) {
func (m *ProcessManager) Start(parent context.Context, command, cwd string) (string, error) {
id := "p-" + strconv.FormatUint(atomic.AddUint64(&m.seq, 1), 10)
cmd := exec.Command("sh", "-c", command)
if parent == nil {
parent = context.Background()
}
procCtx, cancel := context.WithCancel(parent)
cmd := exec.CommandContext(procCtx, "sh", "-c", command)
if cwd != "" {
cmd.Dir = cwd
}
@@ -60,7 +85,7 @@ func (m *ProcessManager) Start(command, cwd string) (string, error) {
if err != nil {
return "", err
}
s := &processSession{ID: id, Command: command, StartedAt: time.Now().UTC(), cmd: cmd, done: make(chan struct{})}
s := &processSession{ID: id, Command: command, StartedAt: time.Now().UTC(), cmd: cmd, cancel: cancel, done: make(chan struct{}), logQueue: make(chan []byte, 128)}
if m.metaPath != "" {
s.logPath = filepath.Join(filepath.Dir(m.metaPath), "process-"+id+".log")
}
@@ -68,9 +93,12 @@ func (m *ProcessManager) Start(command, cwd string) (string, error) {
m.mu.Lock()
m.sessions[id] = s
m.mu.Unlock()
go m.logWriter(s)
m.persist()
m.events.Publish(ProcessEvent{Type: "start", SessionID: id, Command: command, At: time.Now().UTC()})
if err := cmd.Start(); err != nil {
cancel()
m.mu.Lock()
delete(m.sessions, id)
m.mu.Unlock()
@@ -93,7 +121,11 @@ func (m *ProcessManager) Start(command, cwd string) (string, error) {
s.EndedAt = time.Now().UTC()
s.ExitCode = &code
s.mu.Unlock()
if s.logQueue != nil {
close(s.logQueue)
}
m.persist()
m.events.Publish(ProcessEvent{Type: "exit", SessionID: s.ID, Command: s.Command, At: s.EndedAt, ExitCode: &code})
close(s.done)
}()
@@ -105,17 +137,17 @@ func (m *ProcessManager) capture(s *processSession, r interface{ Read([]byte) (i
for {
n, err := r.Read(buf)
if n > 0 {
chunk := append([]byte(nil), buf[:n]...)
s.mu.Lock()
chunk := buf[:n]
_, _ = s.log.Write(chunk)
if s.logPath != "" {
f, err := os.OpenFile(s.logPath, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0644)
if err == nil {
_, _ = f.Write(chunk)
_ = f.Close()
s.mu.Unlock()
if s.logQueue != nil {
select {
case s.logQueue <- chunk:
default:
// backpressure: drop to keep process capture non-blocking
}
}
s.mu.Unlock()
}
if err != nil {
return
@@ -193,11 +225,65 @@ func (m *ProcessManager) Kill(id string) error {
if cmd.Process == nil {
return fmt.Errorf("process not started")
}
if s.cancel != nil {
s.cancel()
}
err := cmd.Process.Kill()
m.persist()
m.events.Publish(ProcessEvent{Type: "kill", SessionID: s.ID, Command: s.Command, At: time.Now().UTC()})
return err
}
func (m *ProcessManager) SubscribeEvents(buffer int) <-chan ProcessEvent {
return m.events.Subscribe(buffer)
}
func (m *ProcessManager) logWriter(s *processSession) {
if s == nil || s.logPath == "" || s.logQueue == nil {
return
}
_ = os.MkdirAll(filepath.Dir(s.logPath), 0755)
f, err := os.OpenFile(s.logPath, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0644)
if err != nil {
return
}
defer f.Close()
ticker := time.NewTicker(200 * time.Millisecond)
defer ticker.Stop()
buf := bytes.Buffer{}
flush := func() {
if buf.Len() == 0 {
return
}
_, _ = f.Write(buf.Bytes())
_ = f.Sync()
buf.Reset()
}
for {
select {
case chunk, ok := <-s.logQueue:
if !ok {
flush()
return
}
_, _ = buf.Write(chunk)
if buf.Len() >= 4096 {
flush()
}
case <-ticker.C:
flush()
}
}
}
func (m *ProcessManager) persistLoop() {
for range m.persistQ {
m.persistNow()
}
}
type processSessionMeta struct {
ID string `json:"id"`
Command string `json:"command"`
@@ -209,6 +295,16 @@ type processSessionMeta struct {
}
func (m *ProcessManager) persist() {
if m.metaPath == "" {
return
}
select {
case m.persistQ <- struct{}{}:
default:
}
}
func (m *ProcessManager) persistNow() {
if m.metaPath == "" {
return
}

View File

@@ -86,7 +86,7 @@ func (t *ExecTool) Execute(ctx context.Context, args map[string]interface{}) (st
if t.procManager == nil {
return "", fmt.Errorf("background process manager not configured")
}
sid, err := t.procManager.Start(command, cwd)
sid, err := t.procManager.Start(ctx, command, cwd)
if err != nil {
return "", err
}