mirror of
https://github.com/YspCoder/clawgo.git
synced 2026-05-20 20:07:28 +08:00
inbound dedupe + ekg stratification: channel-level message_id dedupe and source/channel-separated ekg stats
This commit is contained in:
@@ -5,7 +5,9 @@ import (
|
||||
"fmt"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"clawgo/pkg/bus"
|
||||
"clawgo/pkg/logger"
|
||||
@@ -32,6 +34,8 @@ type BaseChannel struct {
|
||||
running atomic.Bool
|
||||
name string
|
||||
allowList []string
|
||||
recentMsgMu sync.Mutex
|
||||
recentMsg map[string]time.Time
|
||||
}
|
||||
|
||||
func NewBaseChannel(name string, config interface{}, bus *bus.MessageBus, allowList []string) *BaseChannel {
|
||||
@@ -40,6 +44,7 @@ func NewBaseChannel(name string, config interface{}, bus *bus.MessageBus, allowL
|
||||
bus: bus,
|
||||
name: name,
|
||||
allowList: allowList,
|
||||
recentMsg: map[string]time.Time{},
|
||||
}
|
||||
}
|
||||
|
||||
@@ -78,6 +83,29 @@ func (c *BaseChannel) IsAllowed(senderID string) bool {
|
||||
return false
|
||||
}
|
||||
|
||||
func (c *BaseChannel) isDuplicateInboundMessage(messageID string) bool {
|
||||
messageID = strings.TrimSpace(messageID)
|
||||
if messageID == "" {
|
||||
return false
|
||||
}
|
||||
now := time.Now()
|
||||
const ttl = 10 * time.Minute
|
||||
c.recentMsgMu.Lock()
|
||||
defer c.recentMsgMu.Unlock()
|
||||
for id, ts := range c.recentMsg {
|
||||
if now.Sub(ts) > ttl {
|
||||
delete(c.recentMsg, id)
|
||||
}
|
||||
}
|
||||
if ts, ok := c.recentMsg[messageID]; ok {
|
||||
if now.Sub(ts) <= ttl {
|
||||
return true
|
||||
}
|
||||
}
|
||||
c.recentMsg[messageID] = now
|
||||
return false
|
||||
}
|
||||
|
||||
func (c *BaseChannel) HandleMessage(senderID, chatID, content string, media []string, metadata map[string]string) {
|
||||
if !c.IsAllowed(senderID) {
|
||||
logger.WarnCF("channels", "Message rejected by allowlist", map[string]interface{}{
|
||||
@@ -88,6 +116,19 @@ func (c *BaseChannel) HandleMessage(senderID, chatID, content string, media []st
|
||||
return
|
||||
}
|
||||
|
||||
if metadata != nil {
|
||||
if messageID := strings.TrimSpace(metadata["message_id"]); messageID != "" {
|
||||
if c.isDuplicateInboundMessage(c.name + ":" + messageID) {
|
||||
logger.WarnCF("channels", "Duplicate inbound message skipped", map[string]interface{}{
|
||||
logger.FieldChannel: c.name,
|
||||
"message_id": messageID,
|
||||
logger.FieldChatID: chatID,
|
||||
})
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Build session key: channel:chatID
|
||||
sessionKey := fmt.Sprintf("%s:%s", c.name, chatID)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user