From 9f5191eef3540ed4f7a2b6cf7b479216dddc64d3 Mon Sep 17 00:00:00 2001 From: DBT Date: Sun, 1 Mar 2026 12:27:38 +0000 Subject: [PATCH] p0 dedupe tune: configurable dedupe windows, include buttons in outbound idempotency key, and add regression coverage --- config.example.json | 3 ++ pkg/channels/base.go | 30 ++++++++++++++------ pkg/channels/dedupe_regression_test.go | 25 +++++++++++++++++ pkg/channels/manager.go | 38 ++++++++++++++++++++------ pkg/config/config.go | 20 +++++++++----- pkg/config/validate.go | 9 ++++++ 6 files changed, 101 insertions(+), 24 deletions(-) diff --git a/config.example.json b/config.example.json index 055519e..07297fc 100644 --- a/config.example.json +++ b/config.example.json @@ -81,6 +81,9 @@ } }, "channels": { + "inbound_message_id_dedupe_ttl_seconds": 600, + "inbound_content_dedupe_window_seconds": 12, + "outbound_dedupe_window_seconds": 12, "telegram": { "enabled": false, "token": "YOUR_TELEGRAM_BOT_TOKEN", diff --git a/pkg/channels/base.go b/pkg/channels/base.go index c988f33..3f28e99 100644 --- a/pkg/channels/base.go +++ b/pkg/channels/base.go @@ -29,12 +29,26 @@ type ActionCapable interface { SupportsAction(action string) bool } +var ( + inboundMessageIDDedupeTTL = 10 * time.Minute + inboundContentDedupeTTL = 12 * time.Second +) + +func setInboundDedupeWindows(messageIDTTL, contentTTL time.Duration) { + if messageIDTTL > 0 { + inboundMessageIDDedupeTTL = messageIDTTL + } + if contentTTL > 0 { + inboundContentDedupeTTL = contentTTL + } +} + type BaseChannel struct { - config interface{} - bus *bus.MessageBus - running atomic.Bool - name string - allowList []string + config interface{} + bus *bus.MessageBus + running atomic.Bool + name string + allowList []string recentMsgMu sync.Mutex recentMsg map[string]time.Time } @@ -93,7 +107,7 @@ func (c *BaseChannel) seenRecently(key string, ttl time.Duration) bool { c.recentMsgMu.Lock() defer c.recentMsgMu.Unlock() for id, ts := range c.recentMsg { - if now.Sub(ts) > 10*time.Minute { + if now.Sub(ts) > inboundMessageIDDedupeTTL { delete(c.recentMsg, id) } } @@ -125,7 +139,7 @@ func (c *BaseChannel) HandleMessage(senderID, chatID, content string, media []st if metadata != nil { if messageID := strings.TrimSpace(metadata["message_id"]); messageID != "" { - if c.seenRecently(c.name+":"+messageID, 10*time.Minute) { + if c.seenRecently(c.name+":"+messageID, inboundMessageIDDedupeTTL) { logger.WarnCF("channels", "Duplicate inbound message skipped", map[string]interface{}{ logger.FieldChannel: c.name, "message_id": messageID, @@ -137,7 +151,7 @@ func (c *BaseChannel) HandleMessage(senderID, chatID, content string, media []st } // Fallback dedupe when platform omits/changes message_id (short window, same sender/chat/content). contentKey := c.name + ":content:" + chatID + ":" + senderID + ":" + messageDigest(content) - if c.seenRecently(contentKey, 12*time.Second) { + if c.seenRecently(contentKey, inboundContentDedupeTTL) { logger.WarnCF("channels", "Duplicate inbound content skipped", map[string]interface{}{ logger.FieldChannel: c.name, logger.FieldChatID: chatID, diff --git a/pkg/channels/dedupe_regression_test.go b/pkg/channels/dedupe_regression_test.go index 41df162..d15b229 100644 --- a/pkg/channels/dedupe_regression_test.go +++ b/pkg/channels/dedupe_regression_test.go @@ -77,3 +77,28 @@ func TestBaseChannel_HandleMessage_ContentHashFallbackDedupe(t *testing.T) { t.Fatalf("expected duplicate inbound to be dropped") } } + + +func TestDispatchOutbound_DifferentButtonsShouldNotDeduplicate(t *testing.T) { + mb := bus.NewMessageBus() + mgr, err := NewManager(&config.Config{}, mb) + if err != nil { + t.Fatalf("new manager: %v", err) + } + rc := &recordingChannel{} + mgr.RegisterChannel("test", rc) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + go mgr.dispatchOutbound(ctx) + + msg1 := bus.OutboundMessage{Channel: "test", ChatID: "c1", Content: "choose", Action: "send", Buttons: [][]bus.Button{{{Text: "A", Data: "a"}}}} + msg2 := bus.OutboundMessage{Channel: "test", ChatID: "c1", Content: "choose", Action: "send", Buttons: [][]bus.Button{{{Text: "B", Data: "b"}}}} + mb.PublishOutbound(msg1) + mb.PublishOutbound(msg2) + time.Sleep(220 * time.Millisecond) + + if got := rc.count(); got != 2 { + t.Fatalf("expected 2 sends when buttons differ, got %d", got) + } +} diff --git a/pkg/channels/manager.go b/pkg/channels/manager.go index 0204433..562b0ab 100644 --- a/pkg/channels/manager.go +++ b/pkg/channels/manager.go @@ -8,6 +8,7 @@ package channels import ( "context" + "encoding/json" "fmt" "hash/fnv" "strings" @@ -23,16 +24,17 @@ import ( ) type Manager struct { - channels map[string]Channel - bus *bus.MessageBus - config *config.Config - dispatchTask *asyncTask - dispatchSem chan struct{} - outboundLimit *rate.Limiter - mu sync.RWMutex - snapshot atomic.Value // map[string]Channel + channels map[string]Channel + bus *bus.MessageBus + config *config.Config + dispatchTask *asyncTask + dispatchSem chan struct{} + outboundLimit *rate.Limiter + mu sync.RWMutex + snapshot atomic.Value // map[string]Channel outboundSeenMu sync.Mutex outboundSeen map[string]time.Time + outboundTTL time.Duration } type asyncTask struct { @@ -48,8 +50,18 @@ func NewManager(cfg *config.Config, messageBus *bus.MessageBus) (*Manager, error dispatchSem: make(chan struct{}, 32), outboundLimit: rate.NewLimiter(rate.Limit(40), 80), outboundSeen: map[string]time.Time{}, + outboundTTL: 12 * time.Second, } m.snapshot.Store(map[string]Channel{}) + if cfg != nil { + if v := cfg.Channels.OutboundDedupeWindowSeconds; v > 0 { + m.outboundTTL = time.Duration(v) * time.Second + } + setInboundDedupeWindows( + time.Duration(cfg.Channels.InboundMessageIDDedupeTTLSeconds)*time.Second, + time.Duration(cfg.Channels.InboundContentDedupeWindowSeconds)*time.Second, + ) + } if err := m.initChannels(); err != nil { return nil, err @@ -286,6 +298,11 @@ func outboundDigest(msg bus.OutboundMessage) string { _, _ = h.Write([]byte("|" + strings.TrimSpace(msg.Content))) _, _ = h.Write([]byte("|" + strings.TrimSpace(msg.Media))) _, _ = h.Write([]byte("|" + strings.TrimSpace(msg.ReplyToID))) + if len(msg.Buttons) > 0 { + if b, err := json.Marshal(msg.Buttons); err == nil { + _, _ = h.Write([]byte("|" + string(b))) + } + } return fmt.Sprintf("%08x", h.Sum32()) } @@ -299,7 +316,10 @@ func (m *Manager) shouldSkipOutboundDuplicate(msg bus.OutboundMessage) bool { } key := outboundDigest(msg) now := time.Now() - const ttl = 12 * time.Second + ttl := m.outboundTTL + if ttl <= 0 { + ttl = 12 * time.Second + } m.outboundSeenMu.Lock() defer m.outboundSeenMu.Unlock() for k, ts := range m.outboundSeen { diff --git a/pkg/config/config.go b/pkg/config/config.go index 49e4f69..9de0b90 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -127,13 +127,16 @@ type ContextCompactionConfig struct { } type ChannelsConfig struct { - WhatsApp WhatsAppConfig `json:"whatsapp"` - Telegram TelegramConfig `json:"telegram"` - Feishu FeishuConfig `json:"feishu"` - Discord DiscordConfig `json:"discord"` - MaixCam MaixCamConfig `json:"maixcam"` - QQ QQConfig `json:"qq"` - DingTalk DingTalkConfig `json:"dingtalk"` + InboundMessageIDDedupeTTLSeconds int `json:"inbound_message_id_dedupe_ttl_seconds" env:"CLAWGO_CHANNELS_INBOUND_MESSAGE_ID_DEDUPE_TTL_SECONDS"` + InboundContentDedupeWindowSeconds int `json:"inbound_content_dedupe_window_seconds" env:"CLAWGO_CHANNELS_INBOUND_CONTENT_DEDUPE_WINDOW_SECONDS"` + OutboundDedupeWindowSeconds int `json:"outbound_dedupe_window_seconds" env:"CLAWGO_CHANNELS_OUTBOUND_DEDUPE_WINDOW_SECONDS"` + WhatsApp WhatsAppConfig `json:"whatsapp"` + Telegram TelegramConfig `json:"telegram"` + Feishu FeishuConfig `json:"feishu"` + Discord DiscordConfig `json:"discord"` + MaixCam MaixCamConfig `json:"maixcam"` + QQ QQConfig `json:"qq"` + DingTalk DingTalkConfig `json:"dingtalk"` } type WhatsAppConfig struct { @@ -386,6 +389,9 @@ func DefaultConfig() *Config { }, }, Channels: ChannelsConfig{ + InboundMessageIDDedupeTTLSeconds: 600, + InboundContentDedupeWindowSeconds: 12, + OutboundDedupeWindowSeconds: 12, WhatsApp: WhatsAppConfig{ Enabled: false, BridgeURL: "ws://localhost:3001", diff --git a/pkg/config/validate.go b/pkg/config/validate.go index 1108b64..73f15ff 100644 --- a/pkg/config/validate.go +++ b/pkg/config/validate.go @@ -260,6 +260,15 @@ func Validate(cfg *Config) []error { errs = append(errs, fmt.Errorf("memory.recent_days must be > 0")) } + if cfg.Channels.InboundMessageIDDedupeTTLSeconds <= 0 { + errs = append(errs, fmt.Errorf("channels.inbound_message_id_dedupe_ttl_seconds must be > 0")) + } + if cfg.Channels.InboundContentDedupeWindowSeconds <= 0 { + errs = append(errs, fmt.Errorf("channels.inbound_content_dedupe_window_seconds must be > 0")) + } + if cfg.Channels.OutboundDedupeWindowSeconds <= 0 { + errs = append(errs, fmt.Errorf("channels.outbound_dedupe_window_seconds must be > 0")) + } if cfg.Channels.Telegram.Enabled && cfg.Channels.Telegram.Token == "" { errs = append(errs, fmt.Errorf("channels.telegram.token is required when channels.telegram.enabled=true")) }