This commit is contained in:
lpf
2026-03-03 10:36:53 +08:00
parent 35b0ad1bfd
commit bd93c12edc
30 changed files with 1311 additions and 262 deletions

View File

@@ -129,7 +129,7 @@ func messageDigest(s string) string {
func (c *BaseChannel) HandleMessage(senderID, chatID, content string, media []string, metadata map[string]string) {
if !c.IsAllowed(senderID) {
logger.WarnCF("channels", "Message rejected by allowlist", map[string]interface{}{
logger.WarnCF("channels", logger.C0001, map[string]interface{}{
logger.FieldChannel: c.name,
logger.FieldSenderID: senderID,
logger.FieldChatID: chatID,
@@ -140,10 +140,10 @@ func (c *BaseChannel) HandleMessage(senderID, chatID, content string, media []st
if metadata != nil {
if messageID := strings.TrimSpace(metadata["message_id"]); messageID != "" {
if c.seenRecently(c.name+":"+messageID, inboundMessageIDDedupeTTL) {
logger.WarnCF("channels", "Duplicate inbound message skipped", map[string]interface{}{
logger.WarnCF("channels", logger.C0002, map[string]interface{}{
logger.FieldChannel: c.name,
"message_id": messageID,
logger.FieldChatID: chatID,
"message_id": messageID,
logger.FieldChatID: chatID,
})
return
}
@@ -152,7 +152,7 @@ func (c *BaseChannel) HandleMessage(senderID, chatID, content string, media []st
// Fallback dedupe when platform omits/changes message_id (short window, same sender/chat/content).
contentKey := c.name + ":content:" + chatID + ":" + senderID + ":" + messageDigest(content)
if c.seenRecently(contentKey, inboundContentDedupeTTL) {
logger.WarnCF("channels", "Duplicate inbound content skipped", map[string]interface{}{
logger.WarnCF("channels", logger.C0003, map[string]interface{}{
logger.FieldChannel: c.name,
logger.FieldChatID: chatID,
})

View File

@@ -49,7 +49,7 @@ func (c *DingTalkChannel) Start(ctx context.Context) error {
if c.IsRunning() {
return nil
}
logger.InfoC("dingtalk", "Starting DingTalk channel (Stream Mode)")
logger.InfoC("dingtalk", logger.C0115)
runCtx, cancel := context.WithCancel(ctx)
c.runCancel.set(cancel)
@@ -72,7 +72,7 @@ func (c *DingTalkChannel) Start(ctx context.Context) error {
}
c.setRunning(true)
logger.InfoC("dingtalk", "DingTalk channel started (Stream Mode)")
logger.InfoC("dingtalk", logger.C0116)
return nil
}
@@ -81,7 +81,7 @@ func (c *DingTalkChannel) Stop(ctx context.Context) error {
if !c.IsRunning() {
return nil
}
logger.InfoC("dingtalk", "Stopping DingTalk channel")
logger.InfoC("dingtalk", logger.C0117)
c.runCancel.cancelAndClear()
@@ -90,7 +90,7 @@ func (c *DingTalkChannel) Stop(ctx context.Context) error {
}
c.setRunning(false)
logger.InfoC("dingtalk", "DingTalk channel stopped")
logger.InfoC("dingtalk", logger.C0118)
return nil
}
@@ -111,7 +111,7 @@ func (c *DingTalkChannel) Send(ctx context.Context, msg bus.OutboundMessage) err
return fmt.Errorf("invalid session_webhook type for chat %s", msg.ChatID)
}
logger.InfoCF("dingtalk", "DingTalk outbound message", map[string]interface{}{
logger.InfoCF("dingtalk", logger.C0119, map[string]interface{}{
logger.FieldChatID: msg.ChatID,
logger.FieldPreview: truncateString(msg.Content, 100),
"platform": "dingtalk",
@@ -159,7 +159,7 @@ func (c *DingTalkChannel) onChatBotMessageReceived(ctx context.Context, data *ch
"session_webhook": data.SessionWebhook,
}
logger.InfoCF("dingtalk", "DingTalk inbound message", map[string]interface{}{
logger.InfoCF("dingtalk", logger.C0120, map[string]interface{}{
"sender_name": senderNick,
logger.FieldSenderID: senderID,
logger.FieldChatID: chatID,

View File

@@ -37,7 +37,7 @@ func NewDiscordChannel(cfg config.DiscordConfig, bus *bus.MessageBus) (*DiscordC
}
func (c *DiscordChannel) Start(ctx context.Context) error {
logger.InfoC("discord", "Starting Discord bot")
logger.InfoC("discord", logger.C0069)
c.session.AddHandler(c.handleMessage)
@@ -51,7 +51,7 @@ func (c *DiscordChannel) Start(ctx context.Context) error {
if err != nil {
return fmt.Errorf("failed to get bot user: %w", err)
}
logger.InfoCF("discord", "Discord bot connected", map[string]interface{}{
logger.InfoCF("discord", logger.C0070, map[string]interface{}{
"username": botUser.Username,
"user_id": botUser.ID,
})
@@ -60,7 +60,7 @@ func (c *DiscordChannel) Start(ctx context.Context) error {
}
func (c *DiscordChannel) Stop(ctx context.Context) error {
logger.InfoC("discord", "Stopping Discord bot")
logger.InfoC("discord", logger.C0071)
c.setRunning(false)
if err := c.session.Close(); err != nil {
@@ -145,7 +145,7 @@ func (c *DiscordChannel) handleMessage(s *discordgo.Session, m *discordgo.Messag
content = "[media only]"
}
logger.DebugCF("discord", "Received message", map[string]interface{}{
logger.DebugCF("discord", logger.C0072, map[string]interface{}{
"sender_name": senderName,
logger.FieldSenderID: senderID,
logger.FieldPreview: truncateString(content, 50),
@@ -186,7 +186,7 @@ func isAudioFile(filename, contentType string) bool {
func (c *DiscordChannel) downloadAttachment(url, filename string) string {
mediaDir := filepath.Join(os.TempDir(), "clawgo_media")
if err := os.MkdirAll(mediaDir, 0755); err != nil {
logger.WarnCF("discord", "Failed to create media directory", map[string]interface{}{
logger.WarnCF("discord", logger.C0073, map[string]interface{}{
logger.FieldError: err.Error(),
})
return ""
@@ -196,7 +196,7 @@ func (c *DiscordChannel) downloadAttachment(url, filename string) string {
resp, err := http.Get(url)
if err != nil {
logger.WarnCF("discord", "Failed to download attachment", map[string]interface{}{
logger.WarnCF("discord", logger.C0074, map[string]interface{}{
logger.FieldError: err.Error(),
})
return ""
@@ -204,7 +204,7 @@ func (c *DiscordChannel) downloadAttachment(url, filename string) string {
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
logger.WarnCF("discord", "Attachment download returned non-200", map[string]interface{}{
logger.WarnCF("discord", logger.C0075, map[string]interface{}{
"status_code": resp.StatusCode,
})
return ""
@@ -212,7 +212,7 @@ func (c *DiscordChannel) downloadAttachment(url, filename string) string {
out, err := os.Create(localPath)
if err != nil {
logger.WarnCF("discord", "Failed to create local attachment file", map[string]interface{}{
logger.WarnCF("discord", logger.C0076, map[string]interface{}{
logger.FieldError: err.Error(),
})
return ""
@@ -221,13 +221,13 @@ func (c *DiscordChannel) downloadAttachment(url, filename string) string {
_, err = io.Copy(out, resp.Body)
if err != nil {
logger.WarnCF("discord", "Failed to write local attachment file", map[string]interface{}{
logger.WarnCF("discord", logger.C0077, map[string]interface{}{
logger.FieldError: err.Error(),
})
return ""
}
logger.DebugCF("discord", "Attachment downloaded successfully", map[string]interface{}{
logger.DebugCF("discord", logger.C0078, map[string]interface{}{
"path": localPath,
})
return localPath

View File

@@ -86,7 +86,7 @@ func (c *FeishuChannel) Start(ctx context.Context) error {
c.mu.Unlock()
c.setRunning(true)
logger.InfoC("feishu", "Feishu channel started (websocket mode)")
logger.InfoC("feishu", logger.C0043)
runChannelTask("feishu", "websocket", func() error {
return wsClient.Start(runCtx)
@@ -107,7 +107,7 @@ func (c *FeishuChannel) Stop(ctx context.Context) error {
c.runCancel.cancelAndClear()
c.setRunning(false)
logger.InfoC("feishu", "Feishu channel stopped")
logger.InfoC("feishu", logger.C0044)
return nil
}
@@ -132,7 +132,7 @@ func (c *FeishuChannel) Send(ctx context.Context, msg bus.OutboundMessage) error
for i, t := range tables {
link, lerr := c.createFeishuSheetFromTable(ctx, t.Name, t.Rows)
if lerr != nil {
logger.WarnCF("feishu", "create sheet from markdown table failed", map[string]interface{}{logger.FieldError: lerr.Error(), logger.FieldChatID: msg.ChatID})
logger.WarnCF("feishu", logger.C0045, map[string]interface{}{logger.FieldError: lerr.Error(), logger.FieldChatID: msg.ChatID})
continue
}
links = append(links, fmt.Sprintf("表格%d: %s", i+1, link))
@@ -160,7 +160,7 @@ func (c *FeishuChannel) Send(ctx context.Context, msg bus.OutboundMessage) error
return err
}
logger.InfoCF("feishu", "Feishu message sent", map[string]interface{}{
logger.InfoCF("feishu", logger.C0046, map[string]interface{}{
logger.FieldChatID: msg.ChatID,
"msg_type": msgType,
"has_media": strings.TrimSpace(workMsg.Media) != "",
@@ -184,7 +184,7 @@ func (c *FeishuChannel) handleMessageReceive(ctx context.Context, event *larkim.
}
chatType := strings.ToLower(strings.TrimSpace(stringValue(message.ChatType)))
if !c.isAllowedChat(chatID, chatType) {
logger.WarnCF("feishu", "Feishu message rejected by chat allowlist", map[string]interface{}{
logger.WarnCF("feishu", logger.C0047, map[string]interface{}{
logger.FieldSenderID: extractFeishuSenderID(sender),
logger.FieldChatID: chatID,
"chat_type": chatType,
@@ -202,7 +202,7 @@ func (c *FeishuChannel) handleMessageReceive(ctx context.Context, event *larkim.
content = "[empty message]"
}
if !c.shouldHandleGroupMessage(chatType, content) {
logger.DebugCF("feishu", "Ignoring group message without mention/command", map[string]interface{}{
logger.DebugCF("feishu", logger.C0048, map[string]interface{}{
logger.FieldSenderID: senderID,
logger.FieldChatID: chatID,
})
@@ -223,7 +223,7 @@ func (c *FeishuChannel) handleMessageReceive(ctx context.Context, event *larkim.
metadata["tenant_key"] = *sender.TenantKey
}
logger.InfoCF("feishu", "Feishu message received", map[string]interface{}{
logger.InfoCF("feishu", logger.C0049, map[string]interface{}{
logger.FieldSenderID: senderID,
logger.FieldChatID: chatID,
logger.FieldPreview: truncateString(content, 80),
@@ -246,7 +246,7 @@ func (c *FeishuChannel) resolveInboundMedia(ctx context.Context, mediaRefs []str
if path, err := c.downloadFeishuMediaByKey(ctx, "image", key); err == nil {
out = append(out, path)
} else {
logger.WarnCF("feishu", "download inbound image failed", map[string]interface{}{logger.FieldError: err.Error(), "image_key": key})
logger.WarnCF("feishu", logger.C0050, map[string]interface{}{logger.FieldError: err.Error(), "image_key": key})
out = append(out, ref)
}
continue
@@ -256,7 +256,7 @@ func (c *FeishuChannel) resolveInboundMedia(ctx context.Context, mediaRefs []str
if path, err := c.downloadFeishuMediaByKey(ctx, "file", key); err == nil {
out = append(out, path)
} else {
logger.WarnCF("feishu", "download inbound file failed", map[string]interface{}{logger.FieldError: err.Error(), "file_key": key})
logger.WarnCF("feishu", logger.C0051, map[string]interface{}{logger.FieldError: err.Error(), "file_key": key})
out = append(out, ref)
}
continue
@@ -674,7 +674,7 @@ func (c *FeishuChannel) createFeishuSheetFromTable(ctx context.Context, name str
}
}
if err := c.setFeishuSheetPublicEditable(ctx, spToken); err != nil {
logger.WarnCF("feishu", "set sheet permission failed", map[string]interface{}{logger.FieldError: err.Error(), "sheet_token": spToken})
logger.WarnCF("feishu", logger.C0052, map[string]interface{}{logger.FieldError: err.Error(), "sheet_token": spToken})
}
if createResp.Data.Spreadsheet.Url != nil && strings.TrimSpace(*createResp.Data.Spreadsheet.Url) != "" {
return strings.TrimSpace(*createResp.Data.Spreadsheet.Url), nil

View File

@@ -38,7 +38,7 @@ func NewMaixCamChannel(cfg config.MaixCamConfig, bus *bus.MessageBus) (*MaixCamC
}
func (c *MaixCamChannel) Start(ctx context.Context) error {
logger.InfoC("maixcam", "Starting MaixCam channel server")
logger.InfoC("maixcam", logger.C0079)
addr := fmt.Sprintf("%s:%d", c.config.Host, c.config.Port)
listener, err := net.Listen("tcp", addr)
@@ -49,7 +49,7 @@ func (c *MaixCamChannel) Start(ctx context.Context) error {
c.listener = listener
c.setRunning(true)
logger.InfoCF("maixcam", "MaixCam server listening", map[string]interface{}{
logger.InfoCF("maixcam", logger.C0080, map[string]interface{}{
"host": c.config.Host,
"port": c.config.Port,
})
@@ -60,25 +60,25 @@ func (c *MaixCamChannel) Start(ctx context.Context) error {
}
func (c *MaixCamChannel) acceptConnections(ctx context.Context) {
logger.DebugC("maixcam", "Starting connection acceptor")
logger.DebugC("maixcam", logger.C0081)
for {
select {
case <-ctx.Done():
logger.InfoC("maixcam", "Stopping connection acceptor")
logger.InfoC("maixcam", logger.C0082)
return
default:
conn, err := c.listener.Accept()
if err != nil {
if c.IsRunning() {
logger.ErrorCF("maixcam", "Failed to accept connection", map[string]interface{}{
logger.ErrorCF("maixcam", logger.C0083, map[string]interface{}{
logger.FieldError: err.Error(),
})
}
return
}
logger.InfoCF("maixcam", "New connection from MaixCam device", map[string]interface{}{
logger.InfoCF("maixcam", logger.C0084, map[string]interface{}{
"remote_addr": conn.RemoteAddr().String(),
})
@@ -92,14 +92,14 @@ func (c *MaixCamChannel) acceptConnections(ctx context.Context) {
}
func (c *MaixCamChannel) handleConnection(conn net.Conn, ctx context.Context) {
logger.DebugC("maixcam", "Handling MaixCam connection")
logger.DebugC("maixcam", logger.C0085)
defer func() {
conn.Close()
c.clientsMux.Lock()
delete(c.clients, conn)
c.clientsMux.Unlock()
logger.DebugC("maixcam", "Connection closed")
logger.DebugC("maixcam", logger.C0086)
}()
decoder := json.NewDecoder(conn)
@@ -112,7 +112,7 @@ func (c *MaixCamChannel) handleConnection(conn net.Conn, ctx context.Context) {
var msg MaixCamMessage
if err := decoder.Decode(&msg); err != nil {
if err.Error() != "EOF" {
logger.ErrorCF("maixcam", "Failed to decode message", map[string]interface{}{
logger.ErrorCF("maixcam", logger.C0087, map[string]interface{}{
logger.FieldError: err.Error(),
})
}
@@ -129,18 +129,18 @@ func (c *MaixCamChannel) processMessage(msg MaixCamMessage, conn net.Conn) {
case "person_detected":
c.handlePersonDetection(msg)
case "heartbeat":
logger.DebugC("maixcam", "Received heartbeat")
logger.DebugC("maixcam", logger.C0088)
case "status":
c.handleStatusUpdate(msg)
default:
logger.WarnCF("maixcam", "Unknown message type", map[string]interface{}{
logger.WarnCF("maixcam", logger.C0089, map[string]interface{}{
"message_type": msg.Type,
})
}
}
func (c *MaixCamChannel) handlePersonDetection(msg MaixCamMessage) {
logger.InfoCF("maixcam", "Person detected event", map[string]interface{}{
logger.InfoCF("maixcam", logger.C0090, map[string]interface{}{
logger.FieldSenderID: "maixcam",
logger.FieldChatID: "default",
"timestamp": msg.Timestamp,
@@ -178,13 +178,13 @@ func (c *MaixCamChannel) handlePersonDetection(msg MaixCamMessage) {
}
func (c *MaixCamChannel) handleStatusUpdate(msg MaixCamMessage) {
logger.InfoCF("maixcam", "Status update from MaixCam", map[string]interface{}{
logger.InfoCF("maixcam", logger.C0091, map[string]interface{}{
"status": msg.Data,
})
}
func (c *MaixCamChannel) Stop(ctx context.Context) error {
logger.InfoC("maixcam", "Stopping MaixCam channel")
logger.InfoC("maixcam", logger.C0092)
c.setRunning(false)
if c.listener != nil {
@@ -199,7 +199,7 @@ func (c *MaixCamChannel) Stop(ctx context.Context) error {
}
c.clients = make(map[net.Conn]bool)
logger.InfoC("maixcam", "MaixCam channel stopped")
logger.InfoC("maixcam", logger.C0093)
return nil
}
@@ -212,7 +212,7 @@ func (c *MaixCamChannel) Send(ctx context.Context, msg bus.OutboundMessage) erro
defer c.clientsMux.RUnlock()
if len(c.clients) == 0 {
logger.WarnC("maixcam", "No MaixCam devices connected")
logger.WarnC("maixcam", logger.C0094)
return fmt.Errorf("no connected MaixCam devices")
}
@@ -231,7 +231,7 @@ func (c *MaixCamChannel) Send(ctx context.Context, msg bus.OutboundMessage) erro
var sendErr error
for conn := range c.clients {
if _, err := conn.Write(data); err != nil {
logger.ErrorCF("maixcam", "Failed to send to client", map[string]interface{}{
logger.ErrorCF("maixcam", logger.C0095, map[string]interface{}{
"client": conn.RemoteAddr().String(),
logger.FieldError: err.Error(),
})

View File

@@ -71,39 +71,39 @@ func NewManager(cfg *config.Config, messageBus *bus.MessageBus) (*Manager, error
}
func (m *Manager) initChannels() error {
logger.InfoC("channels", "Initializing channel manager")
logger.InfoC("channels", logger.C0004)
if m.config.Channels.Telegram.Enabled {
logger.DebugCF("channels", "Attempting to initialize Telegram channel", map[string]interface{}{
logger.DebugCF("channels", logger.C0005, map[string]interface{}{
"has_token": m.config.Channels.Telegram.Token != "",
})
if m.config.Channels.Telegram.Token == "" {
logger.WarnC("channels", "Telegram token is empty, skipping")
logger.WarnC("channels", logger.C0006)
} else {
telegram, err := NewTelegramChannel(m.config.Channels.Telegram, m.bus)
if err != nil {
logger.ErrorCF("channels", "Failed to initialize Telegram channel", map[string]interface{}{
logger.ErrorCF("channels", logger.C0007, map[string]interface{}{
logger.FieldError: err.Error(),
})
} else {
m.channels["telegram"] = telegram
logger.InfoC("channels", "Telegram channel enabled successfully")
logger.InfoC("channels", logger.C0008)
}
}
}
if m.config.Channels.WhatsApp.Enabled {
if m.config.Channels.WhatsApp.BridgeURL == "" {
logger.WarnC("channels", "WhatsApp bridge URL is empty, skipping")
logger.WarnC("channels", logger.C0009)
} else {
whatsapp, err := NewWhatsAppChannel(m.config.Channels.WhatsApp, m.bus)
if err != nil {
logger.ErrorCF("channels", "Failed to initialize WhatsApp channel", map[string]interface{}{
logger.ErrorCF("channels", logger.C0010, map[string]interface{}{
logger.FieldError: err.Error(),
})
} else {
m.channels["whatsapp"] = whatsapp
logger.InfoC("channels", "WhatsApp channel enabled successfully")
logger.InfoC("channels", logger.C0011)
}
}
}
@@ -111,27 +111,27 @@ func (m *Manager) initChannels() error {
if m.config.Channels.Feishu.Enabled {
feishu, err := NewFeishuChannel(m.config.Channels.Feishu, m.bus)
if err != nil {
logger.ErrorCF("channels", "Failed to initialize Feishu channel", map[string]interface{}{
logger.ErrorCF("channels", logger.C0012, map[string]interface{}{
logger.FieldError: err.Error(),
})
} else {
m.channels["feishu"] = feishu
logger.InfoC("channels", "Feishu channel enabled successfully")
logger.InfoC("channels", logger.C0013)
}
}
if m.config.Channels.Discord.Enabled {
if m.config.Channels.Discord.Token == "" {
logger.WarnC("channels", "Discord token is empty, skipping")
logger.WarnC("channels", logger.C0014)
} else {
discord, err := NewDiscordChannel(m.config.Channels.Discord, m.bus)
if err != nil {
logger.ErrorCF("channels", "Failed to initialize Discord channel", map[string]interface{}{
logger.ErrorCF("channels", logger.C0015, map[string]interface{}{
logger.FieldError: err.Error(),
})
} else {
m.channels["discord"] = discord
logger.InfoC("channels", "Discord channel enabled successfully")
logger.InfoC("channels", logger.C0016)
}
}
}
@@ -139,44 +139,44 @@ func (m *Manager) initChannels() error {
if m.config.Channels.MaixCam.Enabled {
maixcam, err := NewMaixCamChannel(m.config.Channels.MaixCam, m.bus)
if err != nil {
logger.ErrorCF("channels", "Failed to initialize MaixCam channel", map[string]interface{}{
logger.ErrorCF("channels", logger.C0017, map[string]interface{}{
logger.FieldError: err.Error(),
})
} else {
m.channels["maixcam"] = maixcam
logger.InfoC("channels", "MaixCam channel enabled successfully")
logger.InfoC("channels", logger.C0018)
}
}
if m.config.Channels.QQ.Enabled {
qq, err := NewQQChannel(m.config.Channels.QQ, m.bus)
if err != nil {
logger.ErrorCF("channels", "Failed to initialize QQ channel", map[string]interface{}{
logger.ErrorCF("channels", logger.C0019, map[string]interface{}{
logger.FieldError: err.Error(),
})
} else {
m.channels["qq"] = qq
logger.InfoC("channels", "QQ channel enabled successfully")
logger.InfoC("channels", logger.C0020)
}
}
if m.config.Channels.DingTalk.Enabled {
if m.config.Channels.DingTalk.ClientID == "" {
logger.WarnC("channels", "DingTalk Client ID is empty, skipping")
logger.WarnC("channels", logger.C0021)
} else {
dingtalk, err := NewDingTalkChannel(m.config.Channels.DingTalk, m.bus)
if err != nil {
logger.ErrorCF("channels", "Failed to initialize DingTalk channel", map[string]interface{}{
logger.ErrorCF("channels", logger.C0022, map[string]interface{}{
logger.FieldError: err.Error(),
})
} else {
m.channels["dingtalk"] = dingtalk
logger.InfoC("channels", "DingTalk channel enabled successfully")
logger.InfoC("channels", logger.C0023)
}
}
}
logger.InfoCF("channels", "Channel initialization completed", map[string]interface{}{
logger.InfoCF("channels", logger.C0024, map[string]interface{}{
"enabled_channels": len(m.channels),
})
m.refreshSnapshot()
@@ -196,7 +196,7 @@ func (m *Manager) StartAll(ctx context.Context) error {
m.mu.Lock()
if len(m.channels) == 0 {
m.mu.Unlock()
logger.WarnC("channels", "No channels enabled")
logger.WarnC("channels", logger.C0025)
return nil
}
channelsSnapshot := make(map[string]Channel, len(m.channels))
@@ -207,7 +207,7 @@ func (m *Manager) StartAll(ctx context.Context) error {
m.dispatchTask = &asyncTask{cancel: cancel}
m.mu.Unlock()
logger.InfoC("channels", "Starting all channels")
logger.InfoC("channels", logger.C0026)
go m.dispatchOutbound(dispatchCtx)
var g errgroup.Group
@@ -215,9 +215,9 @@ func (m *Manager) StartAll(ctx context.Context) error {
name := name
channel := channel
g.Go(func() error {
logger.InfoCF("channels", "Starting channel", map[string]interface{}{logger.FieldChannel: name})
logger.InfoCF("channels", logger.C0027, map[string]interface{}{logger.FieldChannel: name})
if err := channel.Start(ctx); err != nil {
logger.ErrorCF("channels", "Failed to start channel", map[string]interface{}{logger.FieldChannel: name, logger.FieldError: err.Error()})
logger.ErrorCF("channels", logger.C0028, map[string]interface{}{logger.FieldChannel: name, logger.FieldError: err.Error()})
return fmt.Errorf("%s: %w", name, err)
}
return nil
@@ -226,7 +226,7 @@ func (m *Manager) StartAll(ctx context.Context) error {
if err := g.Wait(); err != nil {
return err
}
logger.InfoC("channels", "All channels started")
logger.InfoC("channels", logger.C0029)
return nil
}
@@ -240,7 +240,7 @@ func (m *Manager) StopAll(ctx context.Context) error {
m.dispatchTask = nil
m.mu.Unlock()
logger.InfoC("channels", "Stopping all channels")
logger.InfoC("channels", logger.C0030)
if task != nil {
task.cancel()
}
@@ -250,9 +250,9 @@ func (m *Manager) StopAll(ctx context.Context) error {
name := name
channel := channel
g.Go(func() error {
logger.InfoCF("channels", "Stopping channel", map[string]interface{}{logger.FieldChannel: name})
logger.InfoCF("channels", logger.C0031, map[string]interface{}{logger.FieldChannel: name})
if err := channel.Stop(ctx); err != nil {
logger.ErrorCF("channels", "Error stopping channel", map[string]interface{}{logger.FieldChannel: name, logger.FieldError: err.Error()})
logger.ErrorCF("channels", logger.C0032, map[string]interface{}{logger.FieldChannel: name, logger.FieldError: err.Error()})
return fmt.Errorf("%s: %w", name, err)
}
return nil
@@ -261,7 +261,7 @@ func (m *Manager) StopAll(ctx context.Context) error {
if err := g.Wait(); err != nil {
return err
}
logger.InfoC("channels", "All channels stopped")
logger.InfoC("channels", logger.C0033)
return nil
}
@@ -285,7 +285,7 @@ func (m *Manager) RestartChannel(ctx context.Context, name string) error {
return fmt.Errorf("channel %s not found", name)
}
logger.InfoCF("channels", "Restarting channel", map[string]interface{}{"channel": name})
logger.InfoCF("channels", logger.C0034, map[string]interface{}{"channel": name})
_ = channel.Stop(ctx)
return channel.Start(ctx)
}
@@ -335,21 +335,21 @@ func (m *Manager) shouldSkipOutboundDuplicate(msg bus.OutboundMessage) bool {
}
func (m *Manager) dispatchOutbound(ctx context.Context) {
logger.InfoC("channels", "Outbound dispatcher started")
logger.InfoC("channels", logger.C0035)
for {
select {
case <-ctx.Done():
logger.InfoC("channels", "Outbound dispatcher stopped")
logger.InfoC("channels", logger.C0036)
return
default:
msg, ok := m.bus.SubscribeOutbound(ctx)
if !ok {
logger.InfoC("channels", "Outbound dispatcher stopped (bus closed)")
logger.InfoC("channels", logger.C0037)
return
}
if m.shouldSkipOutboundDuplicate(msg) {
logger.WarnCF("channels", "Duplicate outbound message skipped", map[string]interface{}{
logger.WarnCF("channels", logger.C0038, map[string]interface{}{
logger.FieldChannel: msg.Channel,
logger.FieldChatID: msg.ChatID,
})
@@ -365,7 +365,7 @@ func (m *Manager) dispatchOutbound(ctx context.Context) {
// Internal/system pseudo channels are not externally dispatchable.
continue
}
logger.WarnCF("channels", "Unknown channel for outbound message", map[string]interface{}{
logger.WarnCF("channels", logger.C0039, map[string]interface{}{
logger.FieldChannel: msg.Channel,
})
continue
@@ -377,9 +377,9 @@ func (m *Manager) dispatchOutbound(ctx context.Context) {
}
if action != "send" {
if ac, ok := channel.(ActionCapable); !ok || !ac.SupportsAction(action) {
logger.WarnCF("channels", "Channel does not support outbound action", map[string]interface{}{
logger.WarnCF("channels", logger.C0040, map[string]interface{}{
logger.FieldChannel: msg.Channel,
"action": action,
"action": action,
})
continue
}
@@ -387,7 +387,7 @@ func (m *Manager) dispatchOutbound(ctx context.Context) {
if m.outboundLimit != nil {
if err := m.outboundLimit.Wait(ctx); err != nil {
logger.WarnCF("channels", "Outbound rate limiter canceled", map[string]interface{}{logger.FieldError: err.Error()})
logger.WarnCF("channels", logger.C0041, map[string]interface{}{logger.FieldError: err.Error()})
continue
}
}
@@ -396,7 +396,7 @@ func (m *Manager) dispatchOutbound(ctx context.Context) {
go func(c Channel, outbound bus.OutboundMessage) {
defer func() { <-m.dispatchSem }()
if err := c.Send(ctx, outbound); err != nil {
logger.ErrorCF("channels", "Error sending message to channel", map[string]interface{}{
logger.ErrorCF("channels", logger.C0042, map[string]interface{}{
logger.FieldChannel: outbound.Channel,
logger.FieldError: err.Error(),
})

View File

@@ -47,7 +47,7 @@ func (c *QQChannel) Start(ctx context.Context) error {
return fmt.Errorf("QQ app_id and app_secret not configured")
}
logger.InfoC("qq", "Starting QQ bot (WebSocket mode)")
logger.InfoC("qq", logger.C0099)
// Create token source
credentials := &token.QQBotCredentials{
@@ -80,7 +80,7 @@ func (c *QQChannel) Start(ctx context.Context) error {
return fmt.Errorf("failed to get websocket info: %w", err)
}
logger.InfoCF("qq", "Got WebSocket info", map[string]interface{}{
logger.InfoCF("qq", logger.C0100, map[string]interface{}{
"shards": wsInfo.Shards,
})
@@ -95,7 +95,7 @@ func (c *QQChannel) Start(ctx context.Context) error {
})
c.setRunning(true)
logger.InfoC("qq", "QQ bot started successfully")
logger.InfoC("qq", logger.C0101)
return nil
}
@@ -104,7 +104,7 @@ func (c *QQChannel) Stop(ctx context.Context) error {
if !c.IsRunning() {
return nil
}
logger.InfoC("qq", "Stopping QQ bot")
logger.InfoC("qq", logger.C0102)
c.setRunning(false)
c.runCancel.cancelAndClear()
@@ -124,7 +124,7 @@ func (c *QQChannel) Send(ctx context.Context, msg bus.OutboundMessage) error {
// Send C2C message
_, err := c.api.PostC2CMessage(ctx, msg.ChatID, msgToCreate)
if err != nil {
logger.ErrorCF("qq", "Failed to send C2C message", map[string]interface{}{
logger.ErrorCF("qq", logger.C0103, map[string]interface{}{
logger.FieldError: err.Error(),
})
return err
@@ -146,18 +146,18 @@ func (c *QQChannel) handleC2CMessage() event.C2CMessageEventHandler {
if data.Author != nil && data.Author.ID != "" {
senderID = data.Author.ID
} else {
logger.WarnC("qq", "Received message with no sender ID")
logger.WarnC("qq", logger.C0104)
return nil
}
// Extract message content
content := data.Content
if content == "" {
logger.DebugC("qq", "Received empty message, ignoring")
logger.DebugC("qq", logger.C0105)
return nil
}
logger.InfoCF("qq", "Received C2C message", map[string]interface{}{
logger.InfoCF("qq", logger.C0106, map[string]interface{}{
logger.FieldSenderID: senderID,
logger.FieldChatID: senderID,
logger.FieldMessageContentLength: len(content),
@@ -187,18 +187,18 @@ func (c *QQChannel) handleGroupATMessage() event.GroupATMessageEventHandler {
if data.Author != nil && data.Author.ID != "" {
senderID = data.Author.ID
} else {
logger.WarnC("qq", "Received group message with no sender ID")
logger.WarnC("qq", logger.C0107)
return nil
}
// Extract message content (remove bot mention)
content := data.Content
if content == "" {
logger.DebugC("qq", "Received empty group message, ignoring")
logger.DebugC("qq", logger.C0108)
return nil
}
logger.InfoCF("qq", "Received group AT message", map[string]interface{}{
logger.InfoCF("qq", logger.C0109, map[string]interface{}{
logger.FieldSenderID: senderID,
"group_id": data.GroupID,
logger.FieldMessageContentLength: len(content),

View File

@@ -33,15 +33,15 @@ const (
type TelegramChannel struct {
*BaseChannel
bot *telego.Bot
config config.TelegramConfig
chatIDs map[string]int64
chatIDsMu sync.RWMutex
updates <-chan telego.Update
runCancel cancelGuard
handleSem chan struct{}
handleWG sync.WaitGroup
botUsername string
bot *telego.Bot
config config.TelegramConfig
chatIDs map[string]int64
chatIDsMu sync.RWMutex
updates <-chan telego.Update
runCancel cancelGuard
handleSem chan struct{}
handleWG sync.WaitGroup
botUsername string
}
func (c *TelegramChannel) SupportsAction(action string) bool {
@@ -62,11 +62,11 @@ func NewTelegramChannel(cfg config.TelegramConfig, bus *bus.MessageBus) (*Telegr
base := NewBaseChannel("telegram", cfg, bus, cfg.AllowFrom)
return &TelegramChannel{
BaseChannel: base,
bot: bot,
config: cfg,
chatIDs: make(map[string]int64),
handleSem: make(chan struct{}, telegramMaxConcurrentHandlers),
BaseChannel: base,
bot: bot,
config: cfg,
chatIDs: make(map[string]int64),
handleSem: make(chan struct{}, telegramMaxConcurrentHandlers),
}, nil
}
@@ -91,7 +91,7 @@ func (c *TelegramChannel) Start(ctx context.Context) error {
if c.IsRunning() {
return nil
}
logger.InfoC("telegram", "Starting Telegram bot (polling mode)")
logger.InfoC("telegram", logger.C0054)
runCtx, cancel := context.WithCancel(ctx)
c.runCancel.set(cancel)
@@ -111,7 +111,7 @@ func (c *TelegramChannel) Start(ctx context.Context) error {
return fmt.Errorf("failed to get bot info: %w", err)
}
c.botUsername = strings.ToLower(strings.TrimSpace(botInfo.Username))
logger.InfoCF("telegram", "Telegram bot connected", map[string]interface{}{
logger.InfoCF("telegram", logger.C0055, map[string]interface{}{
"username": botInfo.Username,
})
@@ -122,7 +122,7 @@ func (c *TelegramChannel) Start(ctx context.Context) error {
return
case update, ok := <-updates:
if !ok {
logger.WarnC("telegram", "Updates channel closed unexpectedly, attempting to restart polling...")
logger.WarnC("telegram", logger.C0056)
c.setRunning(false)
select {
@@ -133,7 +133,7 @@ func (c *TelegramChannel) Start(ctx context.Context) error {
newUpdates, err := c.bot.UpdatesViaLongPolling(runCtx, nil)
if err != nil {
logger.ErrorCF("telegram", "Failed to restart updates polling", map[string]interface{}{
logger.ErrorCF("telegram", logger.C0057, map[string]interface{}{
logger.FieldError: err.Error(),
})
continue
@@ -142,7 +142,7 @@ func (c *TelegramChannel) Start(ctx context.Context) error {
updates = newUpdates
c.updates = newUpdates
c.setRunning(true)
logger.InfoC("telegram", "Updates polling restarted successfully")
logger.InfoC("telegram", logger.C0058)
continue
}
if update.Message != nil {
@@ -161,7 +161,7 @@ func (c *TelegramChannel) Stop(ctx context.Context) error {
if !c.IsRunning() {
return nil
}
logger.InfoC("telegram", "Stopping Telegram bot")
logger.InfoC("telegram", logger.C0059)
c.setRunning(false)
c.runCancel.cancelAndClear()
@@ -173,7 +173,7 @@ func (c *TelegramChannel) Stop(ctx context.Context) error {
select {
case <-done:
case <-time.After(telegramStopWaitHandlersPeriod):
logger.WarnC("telegram", "Timeout waiting for telegram message handlers to stop")
logger.WarnC("telegram", logger.C0060)
}
return nil
@@ -195,7 +195,7 @@ func (c *TelegramChannel) dispatchHandleMessage(runCtx context.Context, message
defer func() { <-c.handleSem }()
defer func() {
if r := recover(); r != nil {
logger.ErrorCF("telegram", "Recovered panic in telegram message handler", map[string]interface{}{
logger.ErrorCF("telegram", logger.C0061, map[string]interface{}{
"panic": fmt.Sprintf("%v", r),
})
}
@@ -219,7 +219,7 @@ func (c *TelegramChannel) handleCallbackQuery(ctx context.Context, query *telego
})
cancel()
logger.InfoCF("telegram", "Callback query received", map[string]interface{}{
logger.InfoCF("telegram", logger.C0062, map[string]interface{}{
"sender_id": senderID,
"data": query.Data,
})
@@ -308,7 +308,7 @@ func (c *TelegramChannel) Send(ctx context.Context, msg bus.OutboundMessage) err
cancelSend()
if err != nil {
logger.WarnCF("telegram", "HTML parse failed, fallback to plain text", map[string]interface{}{
logger.WarnCF("telegram", logger.C0063, map[string]interface{}{
logger.FieldError: err.Error(),
})
plain := plainTextFromTelegramHTML(htmlContent)
@@ -472,7 +472,7 @@ func (c *TelegramChannel) handleMessage(runCtx context.Context, message *telego.
return
}
if user.IsBot {
logger.DebugCF("telegram", "Ignoring bot-originated message", map[string]interface{}{
logger.DebugCF("telegram", logger.C0064, map[string]interface{}{
"user_id": user.ID,
})
return
@@ -549,27 +549,27 @@ func (c *TelegramChannel) handleMessage(runCtx context.Context, message *telego.
}
if !c.isAllowedChat(chatID, message.Chat.Type) {
logger.WarnCF("telegram", "Telegram message rejected by chat allowlist", map[string]interface{}{
logger.WarnCF("telegram", logger.C0065, map[string]interface{}{
logger.FieldSenderID: senderID,
logger.FieldChatID: chatID,
})
return
}
if !c.shouldHandleGroupMessage(message, content) {
logger.DebugCF("telegram", "Ignoring group message without mention/command", map[string]interface{}{
logger.DebugCF("telegram", logger.C0048, map[string]interface{}{
logger.FieldSenderID: senderID,
logger.FieldChatID: chatID,
})
return
}
logger.InfoCF("telegram", "Telegram message received", map[string]interface{}{
logger.InfoCF("telegram", logger.C0066, map[string]interface{}{
logger.FieldSenderID: senderID,
logger.FieldPreview: truncateString(content, 50),
})
if !c.IsAllowed(senderID) {
logger.WarnCF("telegram", "Telegram message rejected by allowlist", map[string]interface{}{
logger.WarnCF("telegram", logger.C0067, map[string]interface{}{
logger.FieldSenderID: senderID,
logger.FieldChatID: chatID,
})
@@ -633,7 +633,6 @@ func min(a, b int) int {
return b
}
func splitTelegramMarkdown(s string, maxRunes int) []string {
if s == "" {
return []string{""}

View File

@@ -52,12 +52,14 @@ func runChannelTask(name, taskName string, task func() error, onFailure func(err
go func() {
if err := task(); err != nil {
if errors.Is(err, context.Canceled) {
logger.InfoCF(name, taskName+" stopped", map[string]interface{}{
"reason": "context canceled",
logger.InfoCF(name, logger.C0168, map[string]interface{}{
"task_name": taskName,
"reason": "context canceled",
})
return
}
logger.ErrorCF(name, taskName+" failed", map[string]interface{}{
logger.ErrorCF(name, logger.C0169, map[string]interface{}{
"task_name": taskName,
logger.FieldError: err.Error(),
})
if onFailure != nil {

View File

@@ -41,7 +41,7 @@ func (c *WhatsAppChannel) Start(ctx context.Context) error {
if c.IsRunning() {
return nil
}
logger.InfoCF("whatsapp", "Starting WhatsApp channel", map[string]interface{}{
logger.InfoCF("whatsapp", logger.C0121, map[string]interface{}{
"url": c.url,
})
runCtx, cancel := context.WithCancel(ctx)
@@ -61,7 +61,7 @@ func (c *WhatsAppChannel) Start(ctx context.Context) error {
c.mu.Unlock()
c.setRunning(true)
logger.InfoC("whatsapp", "WhatsApp channel connected")
logger.InfoC("whatsapp", logger.C0122)
go c.listen(runCtx)
@@ -72,7 +72,7 @@ func (c *WhatsAppChannel) Stop(ctx context.Context) error {
if !c.IsRunning() {
return nil
}
logger.InfoC("whatsapp", "Stopping WhatsApp channel")
logger.InfoC("whatsapp", logger.C0123)
c.runCancel.cancelAndClear()
c.mu.Lock()
@@ -80,7 +80,7 @@ func (c *WhatsAppChannel) Stop(ctx context.Context) error {
if c.conn != nil {
if err := c.conn.Close(); err != nil {
logger.WarnCF("whatsapp", "Error closing WhatsApp connection", map[string]interface{}{
logger.WarnCF("whatsapp", logger.C0124, map[string]interface{}{
logger.FieldError: err.Error(),
})
}
@@ -146,12 +146,12 @@ func (c *WhatsAppChannel) listen(ctx context.Context) {
return
}
if websocket.IsCloseError(err, websocket.CloseNormalClosure, websocket.CloseGoingAway) || errors.Is(err, net.ErrClosed) {
logger.InfoCF("whatsapp", "WhatsApp connection closed", map[string]interface{}{
logger.InfoCF("whatsapp", logger.C0125, map[string]interface{}{
logger.FieldError: err.Error(),
})
return
}
logger.WarnCF("whatsapp", "WhatsApp read error", map[string]interface{}{
logger.WarnCF("whatsapp", logger.C0126, map[string]interface{}{
logger.FieldError: err.Error(),
})
if !sleepWithContext(ctx, backoff) {
@@ -164,7 +164,7 @@ func (c *WhatsAppChannel) listen(ctx context.Context) {
var msg map[string]interface{}
if err := json.Unmarshal(message, &msg); err != nil {
logger.WarnCF("whatsapp", "Failed to unmarshal WhatsApp message", map[string]interface{}{
logger.WarnCF("whatsapp", logger.C0127, map[string]interface{}{
logger.FieldError: err.Error(),
})
continue
@@ -216,7 +216,7 @@ func (c *WhatsAppChannel) handleIncomingMessage(msg map[string]interface{}) {
metadata["user_name"] = userName
}
logger.InfoCF("whatsapp", "WhatsApp message received", map[string]interface{}{
logger.InfoCF("whatsapp", logger.C0128, map[string]interface{}{
logger.FieldSenderID: senderID,
logger.FieldPreview: truncateString(content, 50),
})