p0 dedupe tune: configurable dedupe windows, include buttons in outbound idempotency key, and add regression coverage

This commit is contained in:
DBT
2026-03-01 12:27:38 +00:00
parent 95d9f43034
commit 9f5191eef3
6 changed files with 101 additions and 24 deletions

View File

@@ -29,12 +29,26 @@ type ActionCapable interface {
SupportsAction(action string) bool
}
var (
inboundMessageIDDedupeTTL = 10 * time.Minute
inboundContentDedupeTTL = 12 * time.Second
)
func setInboundDedupeWindows(messageIDTTL, contentTTL time.Duration) {
if messageIDTTL > 0 {
inboundMessageIDDedupeTTL = messageIDTTL
}
if contentTTL > 0 {
inboundContentDedupeTTL = contentTTL
}
}
type BaseChannel struct {
config interface{}
bus *bus.MessageBus
running atomic.Bool
name string
allowList []string
config interface{}
bus *bus.MessageBus
running atomic.Bool
name string
allowList []string
recentMsgMu sync.Mutex
recentMsg map[string]time.Time
}
@@ -93,7 +107,7 @@ func (c *BaseChannel) seenRecently(key string, ttl time.Duration) bool {
c.recentMsgMu.Lock()
defer c.recentMsgMu.Unlock()
for id, ts := range c.recentMsg {
if now.Sub(ts) > 10*time.Minute {
if now.Sub(ts) > inboundMessageIDDedupeTTL {
delete(c.recentMsg, id)
}
}
@@ -125,7 +139,7 @@ func (c *BaseChannel) HandleMessage(senderID, chatID, content string, media []st
if metadata != nil {
if messageID := strings.TrimSpace(metadata["message_id"]); messageID != "" {
if c.seenRecently(c.name+":"+messageID, 10*time.Minute) {
if c.seenRecently(c.name+":"+messageID, inboundMessageIDDedupeTTL) {
logger.WarnCF("channels", "Duplicate inbound message skipped", map[string]interface{}{
logger.FieldChannel: c.name,
"message_id": messageID,
@@ -137,7 +151,7 @@ func (c *BaseChannel) HandleMessage(senderID, chatID, content string, media []st
}
// Fallback dedupe when platform omits/changes message_id (short window, same sender/chat/content).
contentKey := c.name + ":content:" + chatID + ":" + senderID + ":" + messageDigest(content)
if c.seenRecently(contentKey, 12*time.Second) {
if c.seenRecently(contentKey, inboundContentDedupeTTL) {
logger.WarnCF("channels", "Duplicate inbound content skipped", map[string]interface{}{
logger.FieldChannel: c.name,
logger.FieldChatID: chatID,

View File

@@ -77,3 +77,28 @@ func TestBaseChannel_HandleMessage_ContentHashFallbackDedupe(t *testing.T) {
t.Fatalf("expected duplicate inbound to be dropped")
}
}
func TestDispatchOutbound_DifferentButtonsShouldNotDeduplicate(t *testing.T) {
mb := bus.NewMessageBus()
mgr, err := NewManager(&config.Config{}, mb)
if err != nil {
t.Fatalf("new manager: %v", err)
}
rc := &recordingChannel{}
mgr.RegisterChannel("test", rc)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go mgr.dispatchOutbound(ctx)
msg1 := bus.OutboundMessage{Channel: "test", ChatID: "c1", Content: "choose", Action: "send", Buttons: [][]bus.Button{{{Text: "A", Data: "a"}}}}
msg2 := bus.OutboundMessage{Channel: "test", ChatID: "c1", Content: "choose", Action: "send", Buttons: [][]bus.Button{{{Text: "B", Data: "b"}}}}
mb.PublishOutbound(msg1)
mb.PublishOutbound(msg2)
time.Sleep(220 * time.Millisecond)
if got := rc.count(); got != 2 {
t.Fatalf("expected 2 sends when buttons differ, got %d", got)
}
}

View File

@@ -8,6 +8,7 @@ package channels
import (
"context"
"encoding/json"
"fmt"
"hash/fnv"
"strings"
@@ -23,16 +24,17 @@ import (
)
type Manager struct {
channels map[string]Channel
bus *bus.MessageBus
config *config.Config
dispatchTask *asyncTask
dispatchSem chan struct{}
outboundLimit *rate.Limiter
mu sync.RWMutex
snapshot atomic.Value // map[string]Channel
channels map[string]Channel
bus *bus.MessageBus
config *config.Config
dispatchTask *asyncTask
dispatchSem chan struct{}
outboundLimit *rate.Limiter
mu sync.RWMutex
snapshot atomic.Value // map[string]Channel
outboundSeenMu sync.Mutex
outboundSeen map[string]time.Time
outboundTTL time.Duration
}
type asyncTask struct {
@@ -48,8 +50,18 @@ func NewManager(cfg *config.Config, messageBus *bus.MessageBus) (*Manager, error
dispatchSem: make(chan struct{}, 32),
outboundLimit: rate.NewLimiter(rate.Limit(40), 80),
outboundSeen: map[string]time.Time{},
outboundTTL: 12 * time.Second,
}
m.snapshot.Store(map[string]Channel{})
if cfg != nil {
if v := cfg.Channels.OutboundDedupeWindowSeconds; v > 0 {
m.outboundTTL = time.Duration(v) * time.Second
}
setInboundDedupeWindows(
time.Duration(cfg.Channels.InboundMessageIDDedupeTTLSeconds)*time.Second,
time.Duration(cfg.Channels.InboundContentDedupeWindowSeconds)*time.Second,
)
}
if err := m.initChannels(); err != nil {
return nil, err
@@ -286,6 +298,11 @@ func outboundDigest(msg bus.OutboundMessage) string {
_, _ = h.Write([]byte("|" + strings.TrimSpace(msg.Content)))
_, _ = h.Write([]byte("|" + strings.TrimSpace(msg.Media)))
_, _ = h.Write([]byte("|" + strings.TrimSpace(msg.ReplyToID)))
if len(msg.Buttons) > 0 {
if b, err := json.Marshal(msg.Buttons); err == nil {
_, _ = h.Write([]byte("|" + string(b)))
}
}
return fmt.Sprintf("%08x", h.Sum32())
}
@@ -299,7 +316,10 @@ func (m *Manager) shouldSkipOutboundDuplicate(msg bus.OutboundMessage) bool {
}
key := outboundDigest(msg)
now := time.Now()
const ttl = 12 * time.Second
ttl := m.outboundTTL
if ttl <= 0 {
ttl = 12 * time.Second
}
m.outboundSeenMu.Lock()
defer m.outboundSeenMu.Unlock()
for k, ts := range m.outboundSeen {

View File

@@ -127,13 +127,16 @@ type ContextCompactionConfig struct {
}
type ChannelsConfig struct {
WhatsApp WhatsAppConfig `json:"whatsapp"`
Telegram TelegramConfig `json:"telegram"`
Feishu FeishuConfig `json:"feishu"`
Discord DiscordConfig `json:"discord"`
MaixCam MaixCamConfig `json:"maixcam"`
QQ QQConfig `json:"qq"`
DingTalk DingTalkConfig `json:"dingtalk"`
InboundMessageIDDedupeTTLSeconds int `json:"inbound_message_id_dedupe_ttl_seconds" env:"CLAWGO_CHANNELS_INBOUND_MESSAGE_ID_DEDUPE_TTL_SECONDS"`
InboundContentDedupeWindowSeconds int `json:"inbound_content_dedupe_window_seconds" env:"CLAWGO_CHANNELS_INBOUND_CONTENT_DEDUPE_WINDOW_SECONDS"`
OutboundDedupeWindowSeconds int `json:"outbound_dedupe_window_seconds" env:"CLAWGO_CHANNELS_OUTBOUND_DEDUPE_WINDOW_SECONDS"`
WhatsApp WhatsAppConfig `json:"whatsapp"`
Telegram TelegramConfig `json:"telegram"`
Feishu FeishuConfig `json:"feishu"`
Discord DiscordConfig `json:"discord"`
MaixCam MaixCamConfig `json:"maixcam"`
QQ QQConfig `json:"qq"`
DingTalk DingTalkConfig `json:"dingtalk"`
}
type WhatsAppConfig struct {
@@ -386,6 +389,9 @@ func DefaultConfig() *Config {
},
},
Channels: ChannelsConfig{
InboundMessageIDDedupeTTLSeconds: 600,
InboundContentDedupeWindowSeconds: 12,
OutboundDedupeWindowSeconds: 12,
WhatsApp: WhatsAppConfig{
Enabled: false,
BridgeURL: "ws://localhost:3001",

View File

@@ -260,6 +260,15 @@ func Validate(cfg *Config) []error {
errs = append(errs, fmt.Errorf("memory.recent_days must be > 0"))
}
if cfg.Channels.InboundMessageIDDedupeTTLSeconds <= 0 {
errs = append(errs, fmt.Errorf("channels.inbound_message_id_dedupe_ttl_seconds must be > 0"))
}
if cfg.Channels.InboundContentDedupeWindowSeconds <= 0 {
errs = append(errs, fmt.Errorf("channels.inbound_content_dedupe_window_seconds must be > 0"))
}
if cfg.Channels.OutboundDedupeWindowSeconds <= 0 {
errs = append(errs, fmt.Errorf("channels.outbound_dedupe_window_seconds must be > 0"))
}
if cfg.Channels.Telegram.Enabled && cfg.Channels.Telegram.Token == "" {
errs = append(errs, fmt.Errorf("channels.telegram.token is required when channels.telegram.enabled=true"))
}