From 4105eeb0db2ef7567d09a828caf33c9029d9c0a3 Mon Sep 17 00:00:00 2001 From: DBT Date: Tue, 24 Feb 2026 09:25:12 +0000 Subject: [PATCH] apply go-centric architecture optimizations and document them in readme --- README.md | 18 ++++++ README_EN.md | 18 ++++++ cmd/clawgo/cmd_gateway.go | 4 ++ pkg/events/typed_bus.go | 36 +++++++++++ pkg/runtimecfg/snapshot.go | 25 ++++++++ pkg/tools/process_manager.go | 118 +++++++++++++++++++++++++++++++---- pkg/tools/shell.go | 2 +- 7 files changed, 209 insertions(+), 12 deletions(-) create mode 100644 pkg/events/typed_bus.go create mode 100644 pkg/runtimecfg/snapshot.go diff --git a/README.md b/README.md index 9de45d0..bc1c641 100644 --- a/README.md +++ b/README.md @@ -15,6 +15,24 @@ - **稳定性保障**:Sentinel 巡检与自动修复能力。 - **技能扩展**:支持内置技能与 GitHub 技能安装,支持原子脚本执行。 +## 🧠 架构级优化(Go 特性) + +近期已完成一轮架构增强,重点利用 Go 并发与类型系统能力: + +1. **Actor 化关键路径(process)** + - process 元数据持久化改为异步队列(`persistQ`)串行落盘。 +2. **Typed Events 事件总线** + - 新增 `pkg/events/typed_bus.go` 泛型事件总线。 + - process 生命周期事件(start/exit/kill)可发布订阅。 +3. **日志批量刷盘** + - process 日志由 `logWriter` 批量 flush(时间片 + 大小阈值),减少高频 I/O。 +4. **Context 分层取消传播** + - 后台进程改为 `exec.CommandContext`,通过父 `ctx` 统一取消。 +5. **原子配置快照** + - 新增 `pkg/runtimecfg/snapshot.go`,网关启动与热重载时原子替换配置快照。 + +这些优化提升了高并发场景下的稳定性、可观测性与可维护性。 + ## 🏁 快速开始 1. 初始化配置与工作区 diff --git a/README_EN.md b/README_EN.md index b7bacbe..996b98c 100644 --- a/README_EN.md +++ b/README_EN.md @@ -15,6 +15,24 @@ - **Stability controls**: Sentinel inspection and auto-heal support. - **Skill extensibility**: built-in skills plus GitHub skill installation and atomic script execution. +## 🧠 Architecture-Level Optimizations (Go) + +A recent architecture pass leveraged core Go strengths: + +1. **Actor-style process path** + - Process metadata persistence is serialized via async queue (`persistQ`). +2. **Typed Events bus** + - Added generic typed pub/sub bus (`pkg/events/typed_bus.go`). + - Process lifecycle events (start/exit/kill) are now publishable. +3. **Batched log flushing** + - Process logs are flushed by `logWriter` with time/size thresholds to reduce I/O churn. +4. **Context hierarchy + cancellation propagation** + - Background exec now uses `exec.CommandContext` with parent `ctx` propagation. +5. **Atomic runtime config snapshot** + - Added `pkg/runtimecfg/snapshot.go`; gateway startup/reload atomically swaps config snapshot. + +These changes improve stability, observability, and maintainability under concurrency. + ## 🏁 Quick Start 1. Initialize config and workspace diff --git a/cmd/clawgo/cmd_gateway.go b/cmd/clawgo/cmd_gateway.go index 2badeb0..99333ae 100644 --- a/cmd/clawgo/cmd_gateway.go +++ b/cmd/clawgo/cmd_gateway.go @@ -24,6 +24,7 @@ import ( "clawgo/pkg/heartbeat" "clawgo/pkg/logger" "clawgo/pkg/providers" + "clawgo/pkg/runtimecfg" "clawgo/pkg/sentinel" ) @@ -62,6 +63,7 @@ func gatewayCmd() { fmt.Printf("Error loading config: %v\n", err) os.Exit(1) } + runtimecfg.Set(cfg) if strings.EqualFold(strings.TrimSpace(os.Getenv(envRootGranted)), "1") || strings.EqualFold(strings.TrimSpace(os.Getenv(envRootGranted)), "true") { applyMaximumPermissionPolicy(cfg) } @@ -238,6 +240,7 @@ func gatewayCmd() { sentinelService.Start() } cfg = newCfg + runtimecfg.Set(cfg) if len(templateChanges) > 0 { fmt.Printf("↻ Dialog template changes: %s\n", strings.Join(templateChanges, ", ")) } @@ -260,6 +263,7 @@ func gatewayCmd() { channelManager = newChannelManager agentLoop = newAgentLoop cfg = newCfg + runtimecfg.Set(cfg) sentinelService.Stop() sentinelService = sentinel.NewService( getConfigPath(), diff --git a/pkg/events/typed_bus.go b/pkg/events/typed_bus.go new file mode 100644 index 0000000..4df924c --- /dev/null +++ b/pkg/events/typed_bus.go @@ -0,0 +1,36 @@ +package events + +import "sync" + +// TypedBus is a lightweight generic pub/sub bus for internal architecture events. +type TypedBus[T any] struct { + mu sync.RWMutex + subs []chan T +} + +func NewTypedBus[T any]() *TypedBus[T] { + return &TypedBus[T]{subs: make([]chan T, 0)} +} + +func (b *TypedBus[T]) Subscribe(buffer int) <-chan T { + if buffer <= 0 { + buffer = 8 + } + ch := make(chan T, buffer) + b.mu.Lock() + b.subs = append(b.subs, ch) + b.mu.Unlock() + return ch +} + +func (b *TypedBus[T]) Publish(v T) { + b.mu.RLock() + defer b.mu.RUnlock() + for _, ch := range b.subs { + select { + case ch <- v: + default: + // drop on backpressure to keep publisher non-blocking + } + } +} diff --git a/pkg/runtimecfg/snapshot.go b/pkg/runtimecfg/snapshot.go new file mode 100644 index 0000000..12db9a6 --- /dev/null +++ b/pkg/runtimecfg/snapshot.go @@ -0,0 +1,25 @@ +package runtimecfg + +import ( + "sync/atomic" + + "clawgo/pkg/config" +) + +var current atomic.Value // *config.Config + +func Set(cfg *config.Config) { + if cfg == nil { + return + } + current.Store(cfg) +} + +func Get() *config.Config { + v := current.Load() + if v == nil { + return nil + } + cfg, _ := v.(*config.Config) + return cfg +} diff --git a/pkg/tools/process_manager.go b/pkg/tools/process_manager.go index 8b4114f..304dab1 100644 --- a/pkg/tools/process_manager.go +++ b/pkg/tools/process_manager.go @@ -2,6 +2,7 @@ package tools import ( "bytes" + "context" "encoding/json" "fmt" "os" @@ -13,6 +14,8 @@ import ( "sync" "sync/atomic" "time" + + "clawgo/pkg/events" ) type processSession struct { @@ -22,10 +25,12 @@ type processSession struct { EndedAt time.Time ExitCode *int cmd *exec.Cmd + cancel context.CancelFunc done chan struct{} mu sync.RWMutex log bytes.Buffer logPath string + logQueue chan []byte } type ProcessManager struct { @@ -33,22 +38,42 @@ type ProcessManager struct { sessions map[string]*processSession seq uint64 metaPath string + persistQ chan struct{} + events *events.TypedBus[ProcessEvent] +} + +// ProcessEvent is a typed lifecycle event for process sessions. +type ProcessEvent struct { + Type string `json:"type"` + SessionID string `json:"session_id"` + Command string `json:"command,omitempty"` + At time.Time `json:"at"` + ExitCode *int `json:"exit_code,omitempty"` } func NewProcessManager(workspace string) *ProcessManager { - m := &ProcessManager{sessions: map[string]*processSession{}} + m := &ProcessManager{ + sessions: map[string]*processSession{}, + persistQ: make(chan struct{}, 1), + events: events.NewTypedBus[ProcessEvent](), + } if workspace != "" { memDir := filepath.Join(workspace, "memory") _ = os.MkdirAll(memDir, 0755) m.metaPath = filepath.Join(memDir, "process-sessions.json") m.load() } + go m.persistLoop() return m } -func (m *ProcessManager) Start(command, cwd string) (string, error) { +func (m *ProcessManager) Start(parent context.Context, command, cwd string) (string, error) { id := "p-" + strconv.FormatUint(atomic.AddUint64(&m.seq, 1), 10) - cmd := exec.Command("sh", "-c", command) + if parent == nil { + parent = context.Background() + } + procCtx, cancel := context.WithCancel(parent) + cmd := exec.CommandContext(procCtx, "sh", "-c", command) if cwd != "" { cmd.Dir = cwd } @@ -60,7 +85,7 @@ func (m *ProcessManager) Start(command, cwd string) (string, error) { if err != nil { return "", err } - s := &processSession{ID: id, Command: command, StartedAt: time.Now().UTC(), cmd: cmd, done: make(chan struct{})} + s := &processSession{ID: id, Command: command, StartedAt: time.Now().UTC(), cmd: cmd, cancel: cancel, done: make(chan struct{}), logQueue: make(chan []byte, 128)} if m.metaPath != "" { s.logPath = filepath.Join(filepath.Dir(m.metaPath), "process-"+id+".log") } @@ -68,9 +93,12 @@ func (m *ProcessManager) Start(command, cwd string) (string, error) { m.mu.Lock() m.sessions[id] = s m.mu.Unlock() + go m.logWriter(s) m.persist() + m.events.Publish(ProcessEvent{Type: "start", SessionID: id, Command: command, At: time.Now().UTC()}) if err := cmd.Start(); err != nil { + cancel() m.mu.Lock() delete(m.sessions, id) m.mu.Unlock() @@ -93,7 +121,11 @@ func (m *ProcessManager) Start(command, cwd string) (string, error) { s.EndedAt = time.Now().UTC() s.ExitCode = &code s.mu.Unlock() + if s.logQueue != nil { + close(s.logQueue) + } m.persist() + m.events.Publish(ProcessEvent{Type: "exit", SessionID: s.ID, Command: s.Command, At: s.EndedAt, ExitCode: &code}) close(s.done) }() @@ -105,17 +137,17 @@ func (m *ProcessManager) capture(s *processSession, r interface{ Read([]byte) (i for { n, err := r.Read(buf) if n > 0 { + chunk := append([]byte(nil), buf[:n]...) s.mu.Lock() - chunk := buf[:n] _, _ = s.log.Write(chunk) - if s.logPath != "" { - f, err := os.OpenFile(s.logPath, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0644) - if err == nil { - _, _ = f.Write(chunk) - _ = f.Close() + s.mu.Unlock() + if s.logQueue != nil { + select { + case s.logQueue <- chunk: + default: + // backpressure: drop to keep process capture non-blocking } } - s.mu.Unlock() } if err != nil { return @@ -193,11 +225,65 @@ func (m *ProcessManager) Kill(id string) error { if cmd.Process == nil { return fmt.Errorf("process not started") } + if s.cancel != nil { + s.cancel() + } err := cmd.Process.Kill() m.persist() + m.events.Publish(ProcessEvent{Type: "kill", SessionID: s.ID, Command: s.Command, At: time.Now().UTC()}) return err } +func (m *ProcessManager) SubscribeEvents(buffer int) <-chan ProcessEvent { + return m.events.Subscribe(buffer) +} + +func (m *ProcessManager) logWriter(s *processSession) { + if s == nil || s.logPath == "" || s.logQueue == nil { + return + } + _ = os.MkdirAll(filepath.Dir(s.logPath), 0755) + f, err := os.OpenFile(s.logPath, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0644) + if err != nil { + return + } + defer f.Close() + + ticker := time.NewTicker(200 * time.Millisecond) + defer ticker.Stop() + buf := bytes.Buffer{} + flush := func() { + if buf.Len() == 0 { + return + } + _, _ = f.Write(buf.Bytes()) + _ = f.Sync() + buf.Reset() + } + + for { + select { + case chunk, ok := <-s.logQueue: + if !ok { + flush() + return + } + _, _ = buf.Write(chunk) + if buf.Len() >= 4096 { + flush() + } + case <-ticker.C: + flush() + } + } +} + +func (m *ProcessManager) persistLoop() { + for range m.persistQ { + m.persistNow() + } +} + type processSessionMeta struct { ID string `json:"id"` Command string `json:"command"` @@ -209,6 +295,16 @@ type processSessionMeta struct { } func (m *ProcessManager) persist() { + if m.metaPath == "" { + return + } + select { + case m.persistQ <- struct{}{}: + default: + } +} + +func (m *ProcessManager) persistNow() { if m.metaPath == "" { return } diff --git a/pkg/tools/shell.go b/pkg/tools/shell.go index 706b8b1..41bb3e6 100644 --- a/pkg/tools/shell.go +++ b/pkg/tools/shell.go @@ -86,7 +86,7 @@ func (t *ExecTool) Execute(ctx context.Context, args map[string]interface{}) (st if t.procManager == nil { return "", fmt.Errorf("background process manager not configured") } - sid, err := t.procManager.Start(command, cwd) + sid, err := t.procManager.Start(ctx, command, cwd) if err != nil { return "", err }