diff --git a/pkg/bus/bus.go b/pkg/bus/bus.go index 6283251..3175296 100644 --- a/pkg/bus/bus.go +++ b/pkg/bus/bus.go @@ -1,8 +1,10 @@ package bus import ( + "clawgo/pkg/logger" "context" "sync" + "time" ) type MessageBus struct { @@ -12,6 +14,8 @@ type MessageBus struct { mu sync.RWMutex } +const queueWriteTimeout = 2 * time.Second + func NewMessageBus() *MessageBus { return &MessageBus{ inbound: make(chan InboundMessage, 100), @@ -21,7 +25,15 @@ func NewMessageBus() *MessageBus { } func (mb *MessageBus) PublishInbound(msg InboundMessage) { - mb.inbound <- msg + select { + case mb.inbound <- msg: + case <-time.After(queueWriteTimeout): + logger.ErrorCF("bus", "PublishInbound timeout (queue full)", map[string]interface{}{ + "channel": msg.Channel, + "chat_id": msg.ChatID, + "session_key": msg.SessionKey, + }) + } } func (mb *MessageBus) ConsumeInbound(ctx context.Context) (InboundMessage, bool) { @@ -34,7 +46,14 @@ func (mb *MessageBus) ConsumeInbound(ctx context.Context) (InboundMessage, bool) } func (mb *MessageBus) PublishOutbound(msg OutboundMessage) { - mb.outbound <- msg + select { + case mb.outbound <- msg: + case <-time.After(queueWriteTimeout): + logger.ErrorCF("bus", "PublishOutbound timeout (queue full)", map[string]interface{}{ + "channel": msg.Channel, + "chat_id": msg.ChatID, + }) + } } func (mb *MessageBus) SubscribeOutbound(ctx context.Context) (OutboundMessage, bool) { diff --git a/pkg/channels/base.go b/pkg/channels/base.go index 888d63c..0772e2b 100644 --- a/pkg/channels/base.go +++ b/pkg/channels/base.go @@ -3,8 +3,10 @@ package channels import ( "context" "fmt" + "strings" "clawgo/pkg/bus" + "clawgo/pkg/logger" ) type Channel interface { @@ -47,8 +49,14 @@ func (c *BaseChannel) IsAllowed(senderID string) bool { return true } + // Normalize sender id for channels that include display suffix, e.g. "12345|alice". + rawSenderID := senderID + if idx := strings.Index(senderID, "|"); idx > 0 { + rawSenderID = senderID[:idx] + } + for _, allowed := range c.allowList { - if senderID == allowed { + if senderID == allowed || rawSenderID == allowed { return true } } @@ -58,6 +66,11 @@ func (c *BaseChannel) IsAllowed(senderID string) bool { func (c *BaseChannel) HandleMessage(senderID, chatID, content string, media []string, metadata map[string]string) { if !c.IsAllowed(senderID) { + logger.WarnCF("channels", "Message rejected by allowlist", map[string]interface{}{ + "channel": c.name, + "sender_id": senderID, + "chat_id": chatID, + }) return } diff --git a/pkg/channels/telegram.go b/pkg/channels/telegram.go index 69fc23e..a0768f5 100644 --- a/pkg/channels/telegram.go +++ b/pkg/channels/telegram.go @@ -172,9 +172,6 @@ func (c *TelegramChannel) handleMessage(message *telego.Message) { } senderID := fmt.Sprintf("%d", user.ID) - if user.Username != "" { - senderID = fmt.Sprintf("%d|%s", user.ID, user.Username) - } chatID := message.Chat.ID c.chatIDs[senderID] = chatID @@ -262,6 +259,11 @@ func (c *TelegramChannel) handleMessage(message *telego.Message) { log.Printf("Telegram message from %s: %s...", senderID, truncateString(content, 50)) + if !c.IsAllowed(senderID) { + log.Printf("Telegram message rejected by allowlist: sender=%s chat=%d", senderID, chatID) + return + } + // Thinking indicator _ = c.bot.SendChatAction(context.Background(), &telego.SendChatActionParams{ ChatID: telegoutil.ID(chatID),