This commit is contained in:
lpf
2026-02-13 13:50:09 +08:00
parent f88a78ef8b
commit 085c265319
15 changed files with 1485 additions and 179 deletions

View File

@@ -21,6 +21,7 @@ type Manager struct {
bus *bus.MessageBus
config *config.Config
dispatchTask *asyncTask
dispatchSem chan struct{}
mu sync.RWMutex
}
@@ -33,6 +34,8 @@ func NewManager(cfg *config.Config, messageBus *bus.MessageBus) (*Manager, error
channels: make(map[string]Channel),
bus: messageBus,
config: cfg,
// Limit concurrent outbound sends to avoid unbounded goroutine growth.
dispatchSem: make(chan struct{}, 32),
}
if err := m.initChannels(); err != nil {
@@ -239,11 +242,13 @@ func (m *Manager) dispatchOutbound(ctx context.Context) {
continue
}
// 使用 goroutine 实现并发消息分发,避免单个通道延迟阻塞全局 dispatcher
go func(c Channel, m bus.OutboundMessage) {
if err := c.Send(ctx, m); err != nil {
// Bound fan-out concurrency to prevent goroutine explosion under burst traffic.
m.dispatchSem <- struct{}{}
go func(c Channel, outbound bus.OutboundMessage) {
defer func() { <-m.dispatchSem }()
if err := c.Send(ctx, outbound); err != nil {
logger.ErrorCF("channels", "Error sending message to channel", map[string]interface{}{
"channel": m.Channel,
"channel": outbound.Channel,
"error": err.Error(),
})
}