//go:build !omit_dingtalk // ClawGo - Ultra-lightweight personal AI agent // DingTalk channel implementation using Stream Mode package channels import ( "context" "fmt" "sync" "github.com/YspCoder/clawgo/pkg/bus" "github.com/YspCoder/clawgo/pkg/config" "github.com/YspCoder/clawgo/pkg/logger" "github.com/open-dingtalk/dingtalk-stream-sdk-go/chatbot" "github.com/open-dingtalk/dingtalk-stream-sdk-go/client" ) // DingTalkChannel implements the Channel interface for DingTalk. // It uses WebSocket for receiving messages via stream mode and API for sending type DingTalkChannel struct { *BaseChannel config config.DingTalkConfig clientID string clientSecret string streamClient *client.StreamClient runCancel cancelGuard // Map to store session webhooks for each chat sessionWebhooks sync.Map // chatID -> sessionWebhook } const dingtalkCompiled = true // NewDingTalkChannel creates a new DingTalk channel instance func NewDingTalkChannel(cfg config.DingTalkConfig, messageBus *bus.MessageBus) (*DingTalkChannel, error) { if cfg.ClientID == "" || cfg.ClientSecret == "" { return nil, fmt.Errorf("dingtalk client_id and client_secret are required") } base := NewBaseChannel("dingtalk", cfg, messageBus, cfg.AllowFrom) return &DingTalkChannel{ BaseChannel: base, config: cfg, clientID: cfg.ClientID, clientSecret: cfg.ClientSecret, }, nil } // Start initializes the DingTalk channel with Stream Mode func (c *DingTalkChannel) Start(ctx context.Context) error { if c.IsRunning() { return nil } logger.InfoC("dingtalk", logger.C0115) // The upstream SDK has a race in StreamClient.Close() that can panic with // "send on closed channel". Keep the existing websocket alive across local // stop/start cycles instead of tearing it down here. if c.streamClient != nil { c.streamClient.AutoReconnect = true c.setRunning(true) logger.InfoC("dingtalk", logger.C0116) return nil } runCtx, cancel := context.WithCancel(ctx) c.runCancel.set(cancel) // Create credential config cred := client.NewAppCredentialConfig(c.clientID, c.clientSecret) // Create the stream client with options c.streamClient = client.NewStreamClient( client.WithAppCredential(cred), client.WithAutoReconnect(true), ) // Register chatbot callback handler (IChatBotMessageHandler is a function type) c.streamClient.RegisterChatBotCallbackRouter(c.onChatBotMessageReceived) // Start the stream client if err := c.streamClient.Start(runCtx); err != nil { return fmt.Errorf("failed to start stream client: %w", err) } c.setRunning(true) logger.InfoC("dingtalk", logger.C0116) return nil } // Stop gracefully stops the DingTalk channel func (c *DingTalkChannel) Stop(ctx context.Context) error { if !c.IsRunning() { return nil } logger.InfoC("dingtalk", logger.C0117) c.runCancel.cancelAndClear() if c.streamClient != nil { // Avoid StreamClient.Close(): the SDK can panic internally during shutdown. // Disabling auto-reconnect plus the running flag is enough for our local // lifecycle, and callbacks below will ignore messages while stopped. c.streamClient.AutoReconnect = false } c.setRunning(false) logger.InfoC("dingtalk", logger.C0118) return nil } // Send sends a message to DingTalk via the chatbot reply API func (c *DingTalkChannel) Send(ctx context.Context, msg bus.OutboundMessage) error { if !c.IsRunning() { return fmt.Errorf("dingtalk channel not running") } // Get session webhook from storage sessionWebhookRaw, ok := c.sessionWebhooks.Load(msg.ChatID) if !ok { return fmt.Errorf("no session_webhook found for chat %s, cannot send message", msg.ChatID) } sessionWebhook, ok := sessionWebhookRaw.(string) if !ok { return fmt.Errorf("invalid session_webhook type for chat %s", msg.ChatID) } logger.InfoCF("dingtalk", logger.C0119, map[string]interface{}{ logger.FieldChatID: msg.ChatID, logger.FieldPreview: truncateString(msg.Content, 100), "platform": "dingtalk", }) // Use the session webhook to send the reply return c.SendDirectReply(sessionWebhook, msg.Content) } // onChatBotMessageReceived implements the IChatBotMessageHandler function signature // This is called by the Stream SDK when a new message arrives // IChatBotMessageHandler is: func(c context.Context, data *chatbot.BotCallbackDataModel) ([]byte, error) func (c *DingTalkChannel) onChatBotMessageReceived(ctx context.Context, data *chatbot.BotCallbackDataModel) ([]byte, error) { if !c.IsRunning() { return nil, nil } // Extract message content from Text field content := data.Text.Content if content == "" { // Try to extract from Content interface{} if Text is empty if contentMap, ok := data.Content.(map[string]interface{}); ok { if textContent, ok := contentMap["content"].(string); ok { content = textContent } } } if content == "" { return nil, nil // Ignore empty messages } senderID := data.SenderStaffId senderNick := data.SenderNick chatID := senderID if data.ConversationType != "1" { // For group chats chatID = data.ConversationId } // Store the session webhook for this chat so we can reply later c.sessionWebhooks.Store(chatID, data.SessionWebhook) metadata := map[string]string{ "sender_name": senderNick, "conversation_id": data.ConversationId, "conversation_type": data.ConversationType, "platform": "dingtalk", "session_webhook": data.SessionWebhook, } logger.InfoCF("dingtalk", logger.C0120, map[string]interface{}{ "sender_name": senderNick, logger.FieldSenderID: senderID, logger.FieldChatID: chatID, logger.FieldPreview: truncateString(content, 50), }) // Handle the message through the base channel c.HandleMessage(senderID, chatID, content, nil, metadata) // Return nil to indicate we've handled the message asynchronously // The response will be sent through the message bus return nil, nil } // SendDirectReply sends a direct reply using the session webhook func (c *DingTalkChannel) SendDirectReply(sessionWebhook, content string) error { replier := chatbot.NewChatbotReplier() // Convert string content to []byte for the API contentBytes := []byte(content) titleBytes := []byte("ClawGo") // Send markdown formatted reply err := replier.SimpleReplyMarkdown( context.Background(), sessionWebhook, titleBytes, contentBytes, ) if err != nil { return fmt.Errorf("failed to send reply: %w", err) } return nil }