From 8f75bb9ed8ea909ab8f6af25aab7162eb1b4a997 Mon Sep 17 00:00:00 2001 From: DBT Date: Tue, 24 Feb 2026 09:34:13 +0000 Subject: [PATCH] optimize channel orchestration with errgroup and rate limiter --- README.md | 2 + README_EN.md | 2 + go.mod | 3 +- pkg/channels/manager.go | 98 +++++++++++++++++++++++++---------------- 4 files changed, 67 insertions(+), 38 deletions(-) diff --git a/README.md b/README.md index bc1c641..4fb59c6 100644 --- a/README.md +++ b/README.md @@ -21,11 +21,13 @@ 1. **Actor 化关键路径(process)** - process 元数据持久化改为异步队列(`persistQ`)串行落盘。 + - channel 启停编排使用 `errgroup.WithContext` 并发+统一取消。 2. **Typed Events 事件总线** - 新增 `pkg/events/typed_bus.go` 泛型事件总线。 - process 生命周期事件(start/exit/kill)可发布订阅。 3. **日志批量刷盘** - process 日志由 `logWriter` 批量 flush(时间片 + 大小阈值),减少高频 I/O。 + - outbound 分发增加 `rate.Limiter`(令牌桶)平滑突发流量。 4. **Context 分层取消传播** - 后台进程改为 `exec.CommandContext`,通过父 `ctx` 统一取消。 5. **原子配置快照** diff --git a/README_EN.md b/README_EN.md index 996b98c..5a612b6 100644 --- a/README_EN.md +++ b/README_EN.md @@ -21,11 +21,13 @@ A recent architecture pass leveraged core Go strengths: 1. **Actor-style process path** - Process metadata persistence is serialized via async queue (`persistQ`). + - Channel start/stop orchestration uses `errgroup.WithContext` for concurrent + unified cancellation. 2. **Typed Events bus** - Added generic typed pub/sub bus (`pkg/events/typed_bus.go`). - Process lifecycle events (start/exit/kill) are now publishable. 3. **Batched log flushing** - Process logs are flushed by `logWriter` with time/size thresholds to reduce I/O churn. + - Outbound dispatch adds a token-bucket `rate.Limiter` for burst smoothing. 4. **Context hierarchy + cancellation propagation** - Background exec now uses `exec.CommandContext` with parent `ctx` propagation. 5. 
**Atomic runtime config snapshot** diff --git a/go.mod b/go.mod index 8a5b0e0..fed9881 100644 --- a/go.mod +++ b/go.mod @@ -12,6 +12,8 @@ require ( github.com/open-dingtalk/dingtalk-stream-sdk-go v0.9.1 github.com/tencent-connect/botgo v0.2.1 golang.org/x/oauth2 v0.35.0 + golang.org/x/sync v0.19.0 + golang.org/x/time v0.12.0 ) require ( @@ -36,6 +38,5 @@ require ( golang.org/x/arch v0.0.0-20210923205945-b76863e36670 // indirect golang.org/x/crypto v0.48.0 // indirect golang.org/x/net v0.50.0 // indirect - golang.org/x/sync v0.19.0 // indirect golang.org/x/sys v0.41.0 // indirect ) diff --git a/pkg/channels/manager.go b/pkg/channels/manager.go index 8c9310e..ee29c0e 100644 --- a/pkg/channels/manager.go +++ b/pkg/channels/manager.go @@ -15,15 +15,18 @@ import ( "clawgo/pkg/bus" "clawgo/pkg/config" "clawgo/pkg/logger" + "golang.org/x/sync/errgroup" + "golang.org/x/time/rate" ) type Manager struct { - channels map[string]Channel - bus *bus.MessageBus - config *config.Config - dispatchTask *asyncTask - dispatchSem chan struct{} - mu sync.RWMutex + channels map[string]Channel + bus *bus.MessageBus + config *config.Config + dispatchTask *asyncTask + dispatchSem chan struct{} + outboundLimit *rate.Limiter + mu sync.RWMutex } type asyncTask struct { @@ -36,7 +39,8 @@ func NewManager(cfg *config.Config, messageBus *bus.MessageBus) (*Manager, error bus: messageBus, config: cfg, // Limit concurrent outbound sends to avoid unbounded goroutine growth. 
- dispatchSem: make(chan struct{}, 32), + dispatchSem: make(chan struct{}, 32), + outboundLimit: rate.NewLimiter(rate.Limit(40), 80), } if err := m.initChannels(); err != nil { @@ -161,59 +165,73 @@ func (m *Manager) initChannels() error { func (m *Manager) StartAll(ctx context.Context) error { m.mu.Lock() - defer m.mu.Unlock() - if len(m.channels) == 0 { + m.mu.Unlock() logger.WarnC("channels", "No channels enabled") return nil } - - logger.InfoC("channels", "Starting all channels") - + channelsSnapshot := make(map[string]Channel, len(m.channels)) + for k, v := range m.channels { + channelsSnapshot[k] = v + } dispatchCtx, cancel := context.WithCancel(ctx) m.dispatchTask = &asyncTask{cancel: cancel} + m.mu.Unlock() + logger.InfoC("channels", "Starting all channels") go m.dispatchOutbound(dispatchCtx) - for name, channel := range m.channels { - logger.InfoCF("channels", "Starting channel", map[string]interface{}{ - logger.FieldChannel: name, + var g errgroup.Group // plain Group on purpose: a ctx from errgroup.WithContext is cancelled once Wait returns, which would cancel any channel that keeps using its Start ctx + for name, channel := range channelsSnapshot { + name := name + channel := channel + g.Go(func() error { + logger.InfoCF("channels", "Starting channel", map[string]interface{}{logger.FieldChannel: name}) + if err := channel.Start(ctx); err != nil { + logger.ErrorCF("channels", "Failed to start channel", map[string]interface{}{logger.FieldChannel: name, logger.FieldError: err.Error()}) + return fmt.Errorf("%s: %w", name, err) + } + return nil }) - if err := channel.Start(ctx); err != nil { - logger.ErrorCF("channels", "Failed to start channel", map[string]interface{}{ - logger.FieldChannel: name, - logger.FieldError: err.Error(), - }) - } } - + if err := g.Wait(); err != nil { + return err + } logger.InfoC("channels", "All channels started") return nil } func (m *Manager) StopAll(ctx context.Context) error { m.mu.Lock() - defer m.mu.Unlock() + channelsSnapshot := make(map[string]Channel, len(m.channels)) + for k, v := range m.channels { + channelsSnapshot[k] = v + } + task :=
m.dispatchTask + m.dispatchTask = nil + m.mu.Unlock() logger.InfoC("channels", "Stopping all channels") - - if m.dispatchTask != nil { - m.dispatchTask.cancel() - m.dispatchTask = nil + if task != nil { + task.cancel() } - for name, channel := range m.channels { - logger.InfoCF("channels", "Stopping channel", map[string]interface{}{ - logger.FieldChannel: name, + var g errgroup.Group // plain Group on purpose: one failing Stop must not cancel the ctx the remaining channels are still stopping with + for name, channel := range channelsSnapshot { + name := name + channel := channel + g.Go(func() error { + logger.InfoCF("channels", "Stopping channel", map[string]interface{}{logger.FieldChannel: name}) + if err := channel.Stop(ctx); err != nil { + logger.ErrorCF("channels", "Error stopping channel", map[string]interface{}{logger.FieldChannel: name, logger.FieldError: err.Error()}) + return fmt.Errorf("%s: %w", name, err) + } + return nil }) - if err := channel.Stop(ctx); err != nil { - logger.ErrorCF("channels", "Error stopping channel", map[string]interface{}{ - logger.FieldChannel: name, - logger.FieldError: err.Error(), - }) - } } - + if err := g.Wait(); err != nil { + return err + } logger.InfoC("channels", "All channels stopped") return nil } @@ -283,6 +301,12 @@ func (m *Manager) dispatchOutbound(ctx context.Context) { } } + if m.outboundLimit != nil { + if err := m.outboundLimit.Wait(ctx); err != nil { + logger.WarnCF("channels", "Outbound rate limiter canceled", map[string]interface{}{logger.FieldError: err.Error()}) + continue + } + } // Bound fan-out concurrency to prevent goroutine explosion under burst traffic. m.dispatchSem <- struct{}{} go func(c Channel, outbound bus.OutboundMessage) {