This commit is contained in:
lpf
2026-02-13 14:27:49 +08:00
parent 1deb9e6888
commit 308d69271a
3 changed files with 40 additions and 6 deletions

View File

@@ -1,8 +1,10 @@
package bus package bus
import ( import (
"clawgo/pkg/logger"
"context" "context"
"sync" "sync"
"time"
) )
type MessageBus struct { type MessageBus struct {
@@ -12,6 +14,8 @@ type MessageBus struct {
mu sync.RWMutex mu sync.RWMutex
} }
const queueWriteTimeout = 2 * time.Second
func NewMessageBus() *MessageBus { func NewMessageBus() *MessageBus {
return &MessageBus{ return &MessageBus{
inbound: make(chan InboundMessage, 100), inbound: make(chan InboundMessage, 100),
@@ -21,7 +25,15 @@ func NewMessageBus() *MessageBus {
} }
func (mb *MessageBus) PublishInbound(msg InboundMessage) { func (mb *MessageBus) PublishInbound(msg InboundMessage) {
mb.inbound <- msg select {
case mb.inbound <- msg:
case <-time.After(queueWriteTimeout):
logger.ErrorCF("bus", "PublishInbound timeout (queue full)", map[string]interface{}{
"channel": msg.Channel,
"chat_id": msg.ChatID,
"session_key": msg.SessionKey,
})
}
} }
func (mb *MessageBus) ConsumeInbound(ctx context.Context) (InboundMessage, bool) { func (mb *MessageBus) ConsumeInbound(ctx context.Context) (InboundMessage, bool) {
@@ -34,7 +46,14 @@ func (mb *MessageBus) ConsumeInbound(ctx context.Context) (InboundMessage, bool)
} }
func (mb *MessageBus) PublishOutbound(msg OutboundMessage) { func (mb *MessageBus) PublishOutbound(msg OutboundMessage) {
mb.outbound <- msg select {
case mb.outbound <- msg:
case <-time.After(queueWriteTimeout):
logger.ErrorCF("bus", "PublishOutbound timeout (queue full)", map[string]interface{}{
"channel": msg.Channel,
"chat_id": msg.ChatID,
})
}
} }
func (mb *MessageBus) SubscribeOutbound(ctx context.Context) (OutboundMessage, bool) { func (mb *MessageBus) SubscribeOutbound(ctx context.Context) (OutboundMessage, bool) {

View File

@@ -3,8 +3,10 @@ package channels
import ( import (
"context" "context"
"fmt" "fmt"
"strings"
"clawgo/pkg/bus" "clawgo/pkg/bus"
"clawgo/pkg/logger"
) )
type Channel interface { type Channel interface {
@@ -47,8 +49,14 @@ func (c *BaseChannel) IsAllowed(senderID string) bool {
return true return true
} }
// Normalize sender id for channels that include display suffix, e.g. "12345|alice".
rawSenderID := senderID
if idx := strings.Index(senderID, "|"); idx > 0 {
rawSenderID = senderID[:idx]
}
for _, allowed := range c.allowList { for _, allowed := range c.allowList {
if senderID == allowed { if senderID == allowed || rawSenderID == allowed {
return true return true
} }
} }
@@ -58,6 +66,11 @@ func (c *BaseChannel) IsAllowed(senderID string) bool {
func (c *BaseChannel) HandleMessage(senderID, chatID, content string, media []string, metadata map[string]string) { func (c *BaseChannel) HandleMessage(senderID, chatID, content string, media []string, metadata map[string]string) {
if !c.IsAllowed(senderID) { if !c.IsAllowed(senderID) {
logger.WarnCF("channels", "Message rejected by allowlist", map[string]interface{}{
"channel": c.name,
"sender_id": senderID,
"chat_id": chatID,
})
return return
} }

View File

@@ -172,9 +172,6 @@ func (c *TelegramChannel) handleMessage(message *telego.Message) {
} }
senderID := fmt.Sprintf("%d", user.ID) senderID := fmt.Sprintf("%d", user.ID)
if user.Username != "" {
senderID = fmt.Sprintf("%d|%s", user.ID, user.Username)
}
chatID := message.Chat.ID chatID := message.Chat.ID
c.chatIDs[senderID] = chatID c.chatIDs[senderID] = chatID
@@ -262,6 +259,11 @@ func (c *TelegramChannel) handleMessage(message *telego.Message) {
log.Printf("Telegram message from %s: %s...", senderID, truncateString(content, 50)) log.Printf("Telegram message from %s: %s...", senderID, truncateString(content, 50))
if !c.IsAllowed(senderID) {
log.Printf("Telegram message rejected by allowlist: sender=%s chat=%d", senderID, chatID)
return
}
// Thinking indicator // Thinking indicator
_ = c.bot.SendChatAction(context.Background(), &telego.SendChatActionParams{ _ = c.bot.SendChatAction(context.Background(), &telego.SendChatActionParams{
ChatID: telegoutil.ID(chatID), ChatID: telegoutil.ID(chatID),