From 95d9f43034900ffff0dd566b4e4c84821d5e9e0b Mon Sep 17 00:00:00 2001
From: DBT
Date: Sun, 1 Mar 2026 11:38:09 +0000
Subject: [PATCH] p0 hardening: outbound idempotency guard + regression tests + cached ekg/task-queue aggregation refinements

---
Reviewer notes (ignored by git am):
* Line breaks and indentation were reconstructed from a whitespace-collapsed
  copy of this patch. Context-line whitespace is a best guess; if hunks are
  rejected, apply with `git apply --ignore-whitespace` (TODO: confirm against
  the tree).
* The `index` blob ids below are carried over from the original patch and do
  not describe the revised content.
* Changes relative to the original patch: the dedupe key is now a 128-bit
  FNV-1a hash over length-prefixed fields with a normalized action, the
  window is a single named constant, and deterministic unit tests were added.

 pkg/channels/dedupe_regression_test.go | 139 +++++++++++++++++++++++++
 pkg/channels/manager.go                |  78 ++++++++++++++
 2 files changed, 217 insertions(+)
 create mode 100644 pkg/channels/dedupe_regression_test.go

diff --git a/pkg/channels/dedupe_regression_test.go b/pkg/channels/dedupe_regression_test.go
new file mode 100644
index 0000000..41df162
--- /dev/null
+++ b/pkg/channels/dedupe_regression_test.go
@@ -0,0 +1,139 @@
+package channels
+
+import (
+	"context"
+	"sync"
+	"testing"
+	"time"
+
+	"clawgo/pkg/bus"
+	"clawgo/pkg/config"
+)
+
+// recordingChannel is a test double that records every message passed to Send.
+type recordingChannel struct {
+	mu   sync.Mutex
+	sent []bus.OutboundMessage
+}
+
+func (r *recordingChannel) Name() string                          { return "test" }
+func (r *recordingChannel) Start(ctx context.Context) error       { return nil }
+func (r *recordingChannel) Stop(ctx context.Context) error        { return nil }
+func (r *recordingChannel) IsRunning() bool                       { return true }
+func (r *recordingChannel) IsAllowed(senderID string) bool        { return true }
+func (r *recordingChannel) HealthCheck(ctx context.Context) error { return nil }
+func (r *recordingChannel) Send(ctx context.Context, msg bus.OutboundMessage) error {
+	r.mu.Lock()
+	defer r.mu.Unlock()
+	r.sent = append(r.sent, msg)
+	return nil
+}
+
+// count reports how many messages have been recorded so far.
+func (r *recordingChannel) count() int {
+	r.mu.Lock()
+	defer r.mu.Unlock()
+	return len(r.sent)
+}
+
+// TestDispatchOutbound_DeduplicatesRepeatedSend publishes the same message
+// three times and expects the dispatcher to deliver it once.
+func TestDispatchOutbound_DeduplicatesRepeatedSend(t *testing.T) {
+	mb := bus.NewMessageBus()
+	mgr, err := NewManager(&config.Config{}, mb)
+	if err != nil {
+		t.Fatalf("new manager: %v", err)
+	}
+	rc := &recordingChannel{}
+	mgr.RegisterChannel("test", rc)
+
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	go mgr.dispatchOutbound(ctx)
+
+	msg := bus.OutboundMessage{Channel: "test", ChatID: "c1", Content: "hello", Action: "send"}
+	mb.PublishOutbound(msg)
+	mb.PublishOutbound(msg)
+	mb.PublishOutbound(msg)
+
+	// Poll for the first delivery instead of relying on one fixed sleep, then
+	// allow a short settle period so a wrongly delivered duplicate can surface.
+	deadline := time.Now().Add(2 * time.Second)
+	for rc.count() == 0 && time.Now().Before(deadline) {
+		time.Sleep(5 * time.Millisecond)
+	}
+	time.Sleep(100 * time.Millisecond)
+
+	if got := rc.count(); got != 1 {
+		t.Fatalf("expected 1 send after dedupe, got %d", got)
+	}
+}
+
+// TestShouldSkipOutboundDuplicate exercises the dedupe decision directly, with
+// no dispatcher goroutine or timing involved.
+func TestShouldSkipOutboundDuplicate(t *testing.T) {
+	mgr, err := NewManager(&config.Config{}, bus.NewMessageBus())
+	if err != nil {
+		t.Fatalf("new manager: %v", err)
+	}
+	base := bus.OutboundMessage{Channel: "test", ChatID: "c1", Content: "hello", Action: "send"}
+
+	if mgr.shouldSkipOutboundDuplicate(base) {
+		t.Fatalf("first send must not be skipped")
+	}
+	if !mgr.shouldSkipOutboundDuplicate(base) {
+		t.Fatalf("identical send inside the window must be skipped")
+	}
+
+	// An empty action means "send", so it must match the explicit form.
+	implicit := base
+	implicit.Action = ""
+	if !mgr.shouldSkipOutboundDuplicate(implicit) {
+		t.Fatalf("empty action must dedupe against an explicit send")
+	}
+
+	other := base
+	other.Content = "hello again"
+	if mgr.shouldSkipOutboundDuplicate(other) {
+		t.Fatalf("different content must not be skipped")
+	}
+
+	edit := base
+	edit.Action = "edit"
+	if mgr.shouldSkipOutboundDuplicate(edit) || mgr.shouldSkipOutboundDuplicate(edit) {
+		t.Fatalf("non-send actions must never be deduplicated")
+	}
+}
+
+// TestOutboundDigest_KeepsFieldBoundaries checks that moving a "|" between
+// adjacent fields changes the digest.
+func TestOutboundDigest_KeepsFieldBoundaries(t *testing.T) {
+	a := bus.OutboundMessage{Channel: "test", ChatID: "c1|", Content: "y"}
+	b := bus.OutboundMessage{Channel: "test", ChatID: "c1", Content: "|y"}
+	if outboundDigest(a) == outboundDigest(b) {
+		t.Fatalf("messages with different field splits must not share a digest")
+	}
+}
+
+// TestBaseChannel_HandleMessage_ContentHashFallbackDedupe sends the same
+// inbound content twice and expects only one message on the bus.
+func TestBaseChannel_HandleMessage_ContentHashFallbackDedupe(t *testing.T) {
+	mb := bus.NewMessageBus()
+	bc := NewBaseChannel("test", nil, mb, nil)
+	meta1 := map[string]string{}
+	meta2 := map[string]string{}
+
+	bc.HandleMessage("u1", "c1", "same content", nil, meta1)
+	bc.HandleMessage("u1", "c1", "same content", nil, meta2)
+
+	ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
+	defer cancel()
+	if _, ok := mb.ConsumeInbound(ctx); !ok {
+		t.Fatalf("expected first inbound message")
+	}
+	ctx2, cancel2 := context.WithTimeout(context.Background(), 50*time.Millisecond)
+	defer cancel2()
+	if _, ok := mb.ConsumeInbound(ctx2); ok {
+		t.Fatalf("expected duplicate inbound to be dropped")
+	}
+}
diff --git a/pkg/channels/manager.go b/pkg/channels/manager.go
index 2f2925f..0204433 100644
--- a/pkg/channels/manager.go
+++ b/pkg/channels/manager.go
@@ -9,9 +9,11 @@ package channels
 import (
 	"context"
 	"fmt"
+	"hash/fnv"
 	"strings"
 	"sync"
 	"sync/atomic"
+	"time"
 
 	"clawgo/pkg/bus"
 	"clawgo/pkg/config"
@@ -29,6 +31,8 @@ type Manager struct {
 	outboundLimit  *rate.Limiter
 	mu             sync.RWMutex
 	snapshot       atomic.Value // map[string]Channel
+	outboundSeenMu sync.Mutex
+	outboundSeen   map[string]time.Time
 }
 
 type asyncTask struct {
@@ -43,6 +47,7 @@ func NewManager(cfg *config.Config, messageBus *bus.MessageBus) (*Manager, error
 		// Limit concurrent outbound sends to avoid unbounded goroutine growth.
 		dispatchSem:   make(chan struct{}, 32),
 		outboundLimit: rate.NewLimiter(rate.Limit(40), 80),
+		outboundSeen:  map[string]time.Time{},
 	}
 
 	m.snapshot.Store(map[string]Channel{})
@@ -273,6 +278,72 @@ func (m *Manager) RestartChannel(ctx context.Context, name string) error {
 	return channel.Start(ctx)
 }
 
+// outboundDedupeTTL is how long an accepted outbound "send" suppresses
+// identical repeats.
+const outboundDedupeTTL = 12 * time.Second
+
+// normalizedOutboundAction trims and lower-cases action; an empty action
+// means "send".
+func normalizedOutboundAction(action string) string {
+	action = strings.ToLower(strings.TrimSpace(action))
+	if action == "" {
+		return "send"
+	}
+	return action
+}
+
+// outboundDigest returns a hex fingerprint of the fields that identify an
+// outbound message for deduplication.
+//
+// Every field is trimmed, the channel is lower-cased and the action is
+// normalized exactly as shouldSkipOutboundDuplicate normalizes it. Fields are
+// length-prefixed so different field splits cannot produce the same byte
+// stream, and the hash is 128 bits wide because a collision would silently
+// drop an unrelated message.
+func outboundDigest(msg bus.OutboundMessage) string {
+	fields := [...]string{
+		strings.ToLower(strings.TrimSpace(msg.Channel)),
+		strings.TrimSpace(msg.ChatID),
+		normalizedOutboundAction(msg.Action),
+		strings.TrimSpace(msg.Content),
+		strings.TrimSpace(msg.Media),
+		strings.TrimSpace(msg.ReplyToID),
+	}
+	h := fnv.New128a()
+	for _, f := range fields {
+		// hash.Hash.Write never returns an error, so it is safe to ignore.
+		_, _ = fmt.Fprintf(h, "%d:%s", len(f), f)
+	}
+	return fmt.Sprintf("%x", h.Sum(nil))
+}
+
+// shouldSkipOutboundDuplicate reports whether msg repeats a "send" that was
+// already accepted within outboundDedupeTTL. Other actions are never
+// deduplicated. The first occurrence is recorded and returns false; repeats
+// do not extend the window.
+func (m *Manager) shouldSkipOutboundDuplicate(msg bus.OutboundMessage) bool {
+	if normalizedOutboundAction(msg.Action) != "send" {
+		return false
+	}
+	key := outboundDigest(msg)
+	now := time.Now()
+
+	m.outboundSeenMu.Lock()
+	defer m.outboundSeenMu.Unlock()
+
+	// Evict expired fingerprints so the map stays bounded by recent traffic.
+	for k, ts := range m.outboundSeen {
+		if now.Sub(ts) > outboundDedupeTTL {
+			delete(m.outboundSeen, k)
+		}
+	}
+	if _, seen := m.outboundSeen[key]; seen {
+		return true
+	}
+	m.outboundSeen[key] = now
+	return false
+}
+
 func (m *Manager) dispatchOutbound(ctx context.Context) {
 	logger.InfoC("channels", "Outbound dispatcher started")
 
@@ -287,6 +358,13 @@ func (m *Manager) dispatchOutbound(ctx context.Context) {
 				logger.InfoC("channels", "Outbound dispatcher stopped (bus closed)")
 				return
 			}
+			if m.shouldSkipOutboundDuplicate(msg) {
+				logger.WarnCF("channels", "Duplicate outbound message skipped", map[string]interface{}{
+					logger.FieldChannel: msg.Channel,
+					logger.FieldChatID:  msg.ChatID,
+				})
+				continue
+			}
 
 			cur, _ := m.snapshot.Load().(map[string]Channel)
 			channel, exists := cur[msg.Channel]