mirror of
https://github.com/YspCoder/clawgo.git
synced 2026-05-18 19:47:35 +08:00
Initial commit for ClawGo
This commit is contained in:
82
pkg/channels/base.go
Normal file
82
pkg/channels/base.go
Normal file
@@ -0,0 +1,82 @@
|
||||
package channels
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
"gitea.kkkk.dev/DBT/clawgo/pkg/bus"
|
||||
)
|
||||
|
||||
type Channel interface {
|
||||
Name() string
|
||||
Start(ctx context.Context) error
|
||||
Stop(ctx context.Context) error
|
||||
Send(ctx context.Context, msg bus.OutboundMessage) error
|
||||
IsRunning() bool
|
||||
IsAllowed(senderID string) bool
|
||||
}
|
||||
|
||||
type BaseChannel struct {
|
||||
config interface{}
|
||||
bus *bus.MessageBus
|
||||
running bool
|
||||
name string
|
||||
allowList []string
|
||||
}
|
||||
|
||||
func NewBaseChannel(name string, config interface{}, bus *bus.MessageBus, allowList []string) *BaseChannel {
|
||||
return &BaseChannel{
|
||||
config: config,
|
||||
bus: bus,
|
||||
name: name,
|
||||
allowList: allowList,
|
||||
running: false,
|
||||
}
|
||||
}
|
||||
|
||||
func (c *BaseChannel) Name() string {
|
||||
return c.name
|
||||
}
|
||||
|
||||
func (c *BaseChannel) IsRunning() bool {
|
||||
return c.running
|
||||
}
|
||||
|
||||
func (c *BaseChannel) IsAllowed(senderID string) bool {
|
||||
if len(c.allowList) == 0 {
|
||||
return true
|
||||
}
|
||||
|
||||
for _, allowed := range c.allowList {
|
||||
if senderID == allowed {
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
|
||||
func (c *BaseChannel) HandleMessage(senderID, chatID, content string, media []string, metadata map[string]string) {
|
||||
if !c.IsAllowed(senderID) {
|
||||
return
|
||||
}
|
||||
|
||||
// 生成 SessionKey: channel:chatID
|
||||
sessionKey := fmt.Sprintf("%s:%s", c.name, chatID)
|
||||
|
||||
msg := bus.InboundMessage{
|
||||
Channel: c.name,
|
||||
SenderID: senderID,
|
||||
ChatID: chatID,
|
||||
Content: content,
|
||||
Media: media,
|
||||
Metadata: metadata,
|
||||
SessionKey: sessionKey,
|
||||
}
|
||||
|
||||
c.bus.PublishInbound(msg)
|
||||
}
|
||||
|
||||
func (c *BaseChannel) setRunning(running bool) {
|
||||
c.running = running
|
||||
}
|
||||
193
pkg/channels/dingtalk.go
Normal file
193
pkg/channels/dingtalk.go
Normal file
@@ -0,0 +1,193 @@
|
||||
// ClawGo - Ultra-lightweight personal AI agent
|
||||
// DingTalk channel implementation using Stream Mode
|
||||
|
||||
package channels
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"log"
|
||||
"sync"
|
||||
|
||||
"github.com/open-dingtalk/dingtalk-stream-sdk-go/chatbot"
|
||||
"github.com/open-dingtalk/dingtalk-stream-sdk-go/client"
|
||||
"gitea.kkkk.dev/DBT/clawgo/pkg/bus"
|
||||
"gitea.kkkk.dev/DBT/clawgo/pkg/config"
|
||||
)
|
||||
|
||||
// DingTalkChannel implements the Channel interface for DingTalk (钉钉)
|
||||
// It uses WebSocket for receiving messages via stream mode and API for sending
|
||||
type DingTalkChannel struct {
|
||||
*BaseChannel
|
||||
config config.DingTalkConfig
|
||||
clientID string
|
||||
clientSecret string
|
||||
streamClient *client.StreamClient
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
// Map to store session webhooks for each chat
|
||||
sessionWebhooks sync.Map // chatID -> sessionWebhook
|
||||
}
|
||||
|
||||
// NewDingTalkChannel creates a new DingTalk channel instance
|
||||
func NewDingTalkChannel(cfg config.DingTalkConfig, messageBus *bus.MessageBus) (*DingTalkChannel, error) {
|
||||
if cfg.ClientID == "" || cfg.ClientSecret == "" {
|
||||
return nil, fmt.Errorf("dingtalk client_id and client_secret are required")
|
||||
}
|
||||
|
||||
base := NewBaseChannel("dingtalk", cfg, messageBus, cfg.AllowFrom)
|
||||
|
||||
return &DingTalkChannel{
|
||||
BaseChannel: base,
|
||||
config: cfg,
|
||||
clientID: cfg.ClientID,
|
||||
clientSecret: cfg.ClientSecret,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Start initializes the DingTalk channel with Stream Mode
|
||||
func (c *DingTalkChannel) Start(ctx context.Context) error {
|
||||
log.Printf("Starting DingTalk channel (Stream Mode)...")
|
||||
|
||||
c.ctx, c.cancel = context.WithCancel(ctx)
|
||||
|
||||
// Create credential config
|
||||
cred := client.NewAppCredentialConfig(c.clientID, c.clientSecret)
|
||||
|
||||
// Create the stream client with options
|
||||
c.streamClient = client.NewStreamClient(
|
||||
client.WithAppCredential(cred),
|
||||
client.WithAutoReconnect(true),
|
||||
)
|
||||
|
||||
// Register chatbot callback handler (IChatBotMessageHandler is a function type)
|
||||
c.streamClient.RegisterChatBotCallbackRouter(c.onChatBotMessageReceived)
|
||||
|
||||
// Start the stream client
|
||||
if err := c.streamClient.Start(c.ctx); err != nil {
|
||||
return fmt.Errorf("failed to start stream client: %w", err)
|
||||
}
|
||||
|
||||
c.setRunning(true)
|
||||
log.Println("DingTalk channel started (Stream Mode)")
|
||||
return nil
|
||||
}
|
||||
|
||||
// Stop gracefully stops the DingTalk channel
|
||||
func (c *DingTalkChannel) Stop(ctx context.Context) error {
|
||||
log.Println("Stopping DingTalk channel...")
|
||||
|
||||
if c.cancel != nil {
|
||||
c.cancel()
|
||||
}
|
||||
|
||||
if c.streamClient != nil {
|
||||
c.streamClient.Close()
|
||||
}
|
||||
|
||||
c.setRunning(false)
|
||||
log.Println("DingTalk channel stopped")
|
||||
return nil
|
||||
}
|
||||
|
||||
// Send sends a message to DingTalk via the chatbot reply API
|
||||
func (c *DingTalkChannel) Send(ctx context.Context, msg bus.OutboundMessage) error {
|
||||
if !c.IsRunning() {
|
||||
return fmt.Errorf("dingtalk channel not running")
|
||||
}
|
||||
|
||||
// Get session webhook from storage
|
||||
sessionWebhookRaw, ok := c.sessionWebhooks.Load(msg.ChatID)
|
||||
if !ok {
|
||||
return fmt.Errorf("no session_webhook found for chat %s, cannot send message", msg.ChatID)
|
||||
}
|
||||
|
||||
sessionWebhook, ok := sessionWebhookRaw.(string)
|
||||
if !ok {
|
||||
return fmt.Errorf("invalid session_webhook type for chat %s", msg.ChatID)
|
||||
}
|
||||
|
||||
log.Printf("DingTalk message to %s: %s", msg.ChatID, truncateStringDingTalk(msg.Content, 100))
|
||||
|
||||
// Use the session webhook to send the reply
|
||||
return c.SendDirectReply(sessionWebhook, msg.Content)
|
||||
}
|
||||
|
||||
// onChatBotMessageReceived implements the IChatBotMessageHandler function signature
|
||||
// This is called by the Stream SDK when a new message arrives
|
||||
// IChatBotMessageHandler is: func(c context.Context, data *chatbot.BotCallbackDataModel) ([]byte, error)
|
||||
func (c *DingTalkChannel) onChatBotMessageReceived(ctx context.Context, data *chatbot.BotCallbackDataModel) ([]byte, error) {
|
||||
// Extract message content from Text field
|
||||
content := data.Text.Content
|
||||
if content == "" {
|
||||
// Try to extract from Content interface{} if Text is empty
|
||||
if contentMap, ok := data.Content.(map[string]interface{}); ok {
|
||||
if textContent, ok := contentMap["content"].(string); ok {
|
||||
content = textContent
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if content == "" {
|
||||
return nil, nil // Ignore empty messages
|
||||
}
|
||||
|
||||
senderID := data.SenderStaffId
|
||||
senderNick := data.SenderNick
|
||||
chatID := senderID
|
||||
if data.ConversationType != "1" {
|
||||
// For group chats
|
||||
chatID = data.ConversationId
|
||||
}
|
||||
|
||||
// Store the session webhook for this chat so we can reply later
|
||||
c.sessionWebhooks.Store(chatID, data.SessionWebhook)
|
||||
|
||||
metadata := map[string]string{
|
||||
"sender_name": senderNick,
|
||||
"conversation_id": data.ConversationId,
|
||||
"conversation_type": data.ConversationType,
|
||||
"platform": "dingtalk",
|
||||
"session_webhook": data.SessionWebhook,
|
||||
}
|
||||
|
||||
log.Printf("DingTalk message from %s (%s): %s", senderNick, senderID, truncateStringDingTalk(content, 50))
|
||||
|
||||
// Handle the message through the base channel
|
||||
c.HandleMessage(senderID, chatID, content, nil, metadata)
|
||||
|
||||
// Return nil to indicate we've handled the message asynchronously
|
||||
// The response will be sent through the message bus
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// SendDirectReply sends a direct reply using the session webhook
|
||||
func (c *DingTalkChannel) SendDirectReply(sessionWebhook, content string) error {
|
||||
replier := chatbot.NewChatbotReplier()
|
||||
|
||||
// Convert string content to []byte for the API
|
||||
contentBytes := []byte(content)
|
||||
titleBytes := []byte("ClawGo")
|
||||
|
||||
// Send markdown formatted reply
|
||||
err := replier.SimpleReplyMarkdown(
|
||||
context.Background(),
|
||||
sessionWebhook,
|
||||
titleBytes,
|
||||
contentBytes,
|
||||
)
|
||||
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to send reply: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// truncateStringDingTalk truncates a string to max length for logging (avoiding name collision with telegram.go)
|
||||
func truncateStringDingTalk(s string, maxLen int) string {
|
||||
if len(s) <= maxLen {
|
||||
return s
|
||||
}
|
||||
return s[:maxLen]
|
||||
}
|
||||
246
pkg/channels/discord.go
Normal file
246
pkg/channels/discord.go
Normal file
@@ -0,0 +1,246 @@
|
||||
package channels
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"log"
|
||||
"net/http"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/bwmarrin/discordgo"
|
||||
"gitea.kkkk.dev/DBT/clawgo/pkg/bus"
|
||||
"gitea.kkkk.dev/DBT/clawgo/pkg/config"
|
||||
"gitea.kkkk.dev/DBT/clawgo/pkg/logger"
|
||||
"gitea.kkkk.dev/DBT/clawgo/pkg/voice"
|
||||
)
|
||||
|
||||
type DiscordChannel struct {
|
||||
*BaseChannel
|
||||
session *discordgo.Session
|
||||
config config.DiscordConfig
|
||||
transcriber *voice.GroqTranscriber
|
||||
}
|
||||
|
||||
func NewDiscordChannel(cfg config.DiscordConfig, bus *bus.MessageBus) (*DiscordChannel, error) {
|
||||
session, err := discordgo.New("Bot " + cfg.Token)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to create discord session: %w", err)
|
||||
}
|
||||
|
||||
base := NewBaseChannel("discord", cfg, bus, cfg.AllowFrom)
|
||||
|
||||
return &DiscordChannel{
|
||||
BaseChannel: base,
|
||||
session: session,
|
||||
config: cfg,
|
||||
transcriber: nil,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (c *DiscordChannel) SetTranscriber(transcriber *voice.GroqTranscriber) {
|
||||
c.transcriber = transcriber
|
||||
}
|
||||
|
||||
func (c *DiscordChannel) Start(ctx context.Context) error {
|
||||
logger.InfoC("discord", "Starting Discord bot")
|
||||
|
||||
c.session.AddHandler(c.handleMessage)
|
||||
|
||||
if err := c.session.Open(); err != nil {
|
||||
return fmt.Errorf("failed to open discord session: %w", err)
|
||||
}
|
||||
|
||||
c.setRunning(true)
|
||||
|
||||
botUser, err := c.session.User("@me")
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to get bot user: %w", err)
|
||||
}
|
||||
logger.InfoCF("discord", "Discord bot connected", map[string]interface{}{
|
||||
"username": botUser.Username,
|
||||
"user_id": botUser.ID,
|
||||
})
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *DiscordChannel) Stop(ctx context.Context) error {
|
||||
logger.InfoC("discord", "Stopping Discord bot")
|
||||
c.setRunning(false)
|
||||
|
||||
if err := c.session.Close(); err != nil {
|
||||
return fmt.Errorf("failed to close discord session: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *DiscordChannel) Send(ctx context.Context, msg bus.OutboundMessage) error {
|
||||
if !c.IsRunning() {
|
||||
return fmt.Errorf("discord bot not running")
|
||||
}
|
||||
|
||||
channelID := msg.ChatID
|
||||
if channelID == "" {
|
||||
return fmt.Errorf("channel ID is empty")
|
||||
}
|
||||
|
||||
message := msg.Content
|
||||
|
||||
if _, err := c.session.ChannelMessageSend(channelID, message); err != nil {
|
||||
return fmt.Errorf("failed to send discord message: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *DiscordChannel) handleMessage(s *discordgo.Session, m *discordgo.MessageCreate) {
|
||||
if m == nil || m.Author == nil {
|
||||
return
|
||||
}
|
||||
|
||||
if m.Author.ID == s.State.User.ID {
|
||||
return
|
||||
}
|
||||
|
||||
senderID := m.Author.ID
|
||||
senderName := m.Author.Username
|
||||
if m.Author.Discriminator != "" && m.Author.Discriminator != "0" {
|
||||
senderName += "#" + m.Author.Discriminator
|
||||
}
|
||||
|
||||
content := m.Content
|
||||
mediaPaths := []string{}
|
||||
|
||||
for _, attachment := range m.Attachments {
|
||||
isAudio := isAudioFile(attachment.Filename, attachment.ContentType)
|
||||
|
||||
if isAudio {
|
||||
localPath := c.downloadAttachment(attachment.URL, attachment.Filename)
|
||||
if localPath != "" {
|
||||
mediaPaths = append(mediaPaths, localPath)
|
||||
|
||||
transcribedText := ""
|
||||
if c.transcriber != nil && c.transcriber.IsAvailable() {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
|
||||
defer cancel()
|
||||
|
||||
result, err := c.transcriber.Transcribe(ctx, localPath)
|
||||
if err != nil {
|
||||
log.Printf("Voice transcription failed: %v", err)
|
||||
transcribedText = fmt.Sprintf("[audio: %s (transcription failed)]", localPath)
|
||||
} else {
|
||||
transcribedText = fmt.Sprintf("[audio transcription: %s]", result.Text)
|
||||
log.Printf("Audio transcribed successfully: %s", result.Text)
|
||||
}
|
||||
} else {
|
||||
transcribedText = fmt.Sprintf("[audio: %s]", localPath)
|
||||
}
|
||||
|
||||
if content != "" {
|
||||
content += "\n"
|
||||
}
|
||||
content += transcribedText
|
||||
} else {
|
||||
mediaPaths = append(mediaPaths, attachment.URL)
|
||||
if content != "" {
|
||||
content += "\n"
|
||||
}
|
||||
content += fmt.Sprintf("[attachment: %s]", attachment.URL)
|
||||
}
|
||||
} else {
|
||||
mediaPaths = append(mediaPaths, attachment.URL)
|
||||
if content != "" {
|
||||
content += "\n"
|
||||
}
|
||||
content += fmt.Sprintf("[attachment: %s]", attachment.URL)
|
||||
}
|
||||
}
|
||||
|
||||
if content == "" && len(mediaPaths) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
if content == "" {
|
||||
content = "[media only]"
|
||||
}
|
||||
|
||||
logger.DebugCF("discord", "Received message", map[string]interface{}{
|
||||
"sender_name": senderName,
|
||||
"sender_id": senderID,
|
||||
"preview": truncateString(content, 50),
|
||||
})
|
||||
|
||||
metadata := map[string]string{
|
||||
"message_id": m.ID,
|
||||
"user_id": senderID,
|
||||
"username": m.Author.Username,
|
||||
"display_name": senderName,
|
||||
"guild_id": m.GuildID,
|
||||
"channel_id": m.ChannelID,
|
||||
"is_dm": fmt.Sprintf("%t", m.GuildID == ""),
|
||||
}
|
||||
|
||||
c.HandleMessage(senderID, m.ChannelID, content, mediaPaths, metadata)
|
||||
}
|
||||
|
||||
func isAudioFile(filename, contentType string) bool {
|
||||
audioExtensions := []string{".mp3", ".wav", ".ogg", ".m4a", ".flac", ".aac", ".wma"}
|
||||
audioTypes := []string{"audio/", "application/ogg", "application/x-ogg"}
|
||||
|
||||
for _, ext := range audioExtensions {
|
||||
if strings.HasSuffix(strings.ToLower(filename), ext) {
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
for _, audioType := range audioTypes {
|
||||
if strings.HasPrefix(strings.ToLower(contentType), audioType) {
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
|
||||
func (c *DiscordChannel) downloadAttachment(url, filename string) string {
|
||||
mediaDir := filepath.Join(os.TempDir(), "clawgo_media")
|
||||
if err := os.MkdirAll(mediaDir, 0755); err != nil {
|
||||
log.Printf("Failed to create media directory: %v", err)
|
||||
return ""
|
||||
}
|
||||
|
||||
localPath := filepath.Join(mediaDir, filename)
|
||||
|
||||
resp, err := http.Get(url)
|
||||
if err != nil {
|
||||
log.Printf("Failed to download attachment: %v", err)
|
||||
return ""
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
log.Printf("Failed to download attachment, status: %d", resp.StatusCode)
|
||||
return ""
|
||||
}
|
||||
|
||||
out, err := os.Create(localPath)
|
||||
if err != nil {
|
||||
log.Printf("Failed to create file: %v", err)
|
||||
return ""
|
||||
}
|
||||
defer out.Close()
|
||||
|
||||
_, err = io.Copy(out, resp.Body)
|
||||
if err != nil {
|
||||
log.Printf("Failed to write file: %v", err)
|
||||
return ""
|
||||
}
|
||||
|
||||
log.Printf("Attachment downloaded successfully to: %s", localPath)
|
||||
return localPath
|
||||
}
|
||||
215
pkg/channels/feishu.go
Normal file
215
pkg/channels/feishu.go
Normal file
@@ -0,0 +1,215 @@
|
||||
package channels
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
lark "github.com/larksuite/oapi-sdk-go/v3"
|
||||
larkdispatcher "github.com/larksuite/oapi-sdk-go/v3/event/dispatcher"
|
||||
larkim "github.com/larksuite/oapi-sdk-go/v3/service/im/v1"
|
||||
larkws "github.com/larksuite/oapi-sdk-go/v3/ws"
|
||||
|
||||
"gitea.kkkk.dev/DBT/clawgo/pkg/bus"
|
||||
"gitea.kkkk.dev/DBT/clawgo/pkg/config"
|
||||
"gitea.kkkk.dev/DBT/clawgo/pkg/logger"
|
||||
)
|
||||
|
||||
type FeishuChannel struct {
|
||||
*BaseChannel
|
||||
config config.FeishuConfig
|
||||
client *lark.Client
|
||||
wsClient *larkws.Client
|
||||
|
||||
mu sync.Mutex
|
||||
cancel context.CancelFunc
|
||||
}
|
||||
|
||||
func NewFeishuChannel(cfg config.FeishuConfig, bus *bus.MessageBus) (*FeishuChannel, error) {
|
||||
base := NewBaseChannel("feishu", cfg, bus, cfg.AllowFrom)
|
||||
|
||||
return &FeishuChannel{
|
||||
BaseChannel: base,
|
||||
config: cfg,
|
||||
client: lark.NewClient(cfg.AppID, cfg.AppSecret),
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (c *FeishuChannel) Start(ctx context.Context) error {
|
||||
if c.config.AppID == "" || c.config.AppSecret == "" {
|
||||
return fmt.Errorf("feishu app_id or app_secret is empty")
|
||||
}
|
||||
|
||||
dispatcher := larkdispatcher.NewEventDispatcher(c.config.VerificationToken, c.config.EncryptKey).
|
||||
OnP2MessageReceiveV1(c.handleMessageReceive)
|
||||
|
||||
runCtx, cancel := context.WithCancel(ctx)
|
||||
|
||||
c.mu.Lock()
|
||||
c.cancel = cancel
|
||||
c.wsClient = larkws.NewClient(
|
||||
c.config.AppID,
|
||||
c.config.AppSecret,
|
||||
larkws.WithEventHandler(dispatcher),
|
||||
)
|
||||
wsClient := c.wsClient
|
||||
c.mu.Unlock()
|
||||
|
||||
c.setRunning(true)
|
||||
logger.InfoC("feishu", "Feishu channel started (websocket mode)")
|
||||
|
||||
go func() {
|
||||
if err := wsClient.Start(runCtx); err != nil {
|
||||
logger.ErrorCF("feishu", "Feishu websocket stopped with error", map[string]interface{}{
|
||||
"error": err.Error(),
|
||||
})
|
||||
}
|
||||
}()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *FeishuChannel) Stop(ctx context.Context) error {
|
||||
c.mu.Lock()
|
||||
if c.cancel != nil {
|
||||
c.cancel()
|
||||
c.cancel = nil
|
||||
}
|
||||
c.wsClient = nil
|
||||
c.mu.Unlock()
|
||||
|
||||
c.setRunning(false)
|
||||
logger.InfoC("feishu", "Feishu channel stopped")
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *FeishuChannel) Send(ctx context.Context, msg bus.OutboundMessage) error {
|
||||
if !c.IsRunning() {
|
||||
return fmt.Errorf("feishu channel not running")
|
||||
}
|
||||
|
||||
if msg.ChatID == "" {
|
||||
return fmt.Errorf("chat ID is empty")
|
||||
}
|
||||
|
||||
payload, err := json.Marshal(map[string]string{"text": msg.Content})
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to marshal feishu content: %w", err)
|
||||
}
|
||||
|
||||
req := larkim.NewCreateMessageReqBuilder().
|
||||
ReceiveIdType(larkim.ReceiveIdTypeChatId).
|
||||
Body(larkim.NewCreateMessageReqBodyBuilder().
|
||||
ReceiveId(msg.ChatID).
|
||||
MsgType(larkim.MsgTypeText).
|
||||
Content(string(payload)).
|
||||
Uuid(fmt.Sprintf("clawgo-%d", time.Now().UnixNano())).
|
||||
Build()).
|
||||
Build()
|
||||
|
||||
resp, err := c.client.Im.V1.Message.Create(ctx, req)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to send feishu message: %w", err)
|
||||
}
|
||||
|
||||
if !resp.Success() {
|
||||
return fmt.Errorf("feishu api error: code=%d msg=%s", resp.Code, resp.Msg)
|
||||
}
|
||||
|
||||
logger.DebugCF("feishu", "Feishu message sent", map[string]interface{}{
|
||||
"chat_id": msg.ChatID,
|
||||
})
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *FeishuChannel) handleMessageReceive(_ context.Context, event *larkim.P2MessageReceiveV1) error {
|
||||
if event == nil || event.Event == nil || event.Event.Message == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
message := event.Event.Message
|
||||
sender := event.Event.Sender
|
||||
|
||||
chatID := stringValue(message.ChatId)
|
||||
if chatID == "" {
|
||||
return nil
|
||||
}
|
||||
|
||||
senderID := extractFeishuSenderID(sender)
|
||||
if senderID == "" {
|
||||
senderID = "unknown"
|
||||
}
|
||||
|
||||
content := extractFeishuMessageContent(message)
|
||||
if content == "" {
|
||||
content = "[empty message]"
|
||||
}
|
||||
|
||||
metadata := map[string]string{}
|
||||
if messageID := stringValue(message.MessageId); messageID != "" {
|
||||
metadata["message_id"] = messageID
|
||||
}
|
||||
if messageType := stringValue(message.MessageType); messageType != "" {
|
||||
metadata["message_type"] = messageType
|
||||
}
|
||||
if chatType := stringValue(message.ChatType); chatType != "" {
|
||||
metadata["chat_type"] = chatType
|
||||
}
|
||||
if sender != nil && sender.TenantKey != nil {
|
||||
metadata["tenant_key"] = *sender.TenantKey
|
||||
}
|
||||
|
||||
logger.InfoCF("feishu", "Feishu message received", map[string]interface{}{
|
||||
"sender_id": senderID,
|
||||
"chat_id": chatID,
|
||||
"preview": truncateString(content, 80),
|
||||
})
|
||||
|
||||
c.HandleMessage(senderID, chatID, content, nil, metadata)
|
||||
return nil
|
||||
}
|
||||
|
||||
func extractFeishuSenderID(sender *larkim.EventSender) string {
|
||||
if sender == nil || sender.SenderId == nil {
|
||||
return ""
|
||||
}
|
||||
|
||||
if sender.SenderId.UserId != nil && *sender.SenderId.UserId != "" {
|
||||
return *sender.SenderId.UserId
|
||||
}
|
||||
if sender.SenderId.OpenId != nil && *sender.SenderId.OpenId != "" {
|
||||
return *sender.SenderId.OpenId
|
||||
}
|
||||
if sender.SenderId.UnionId != nil && *sender.SenderId.UnionId != "" {
|
||||
return *sender.SenderId.UnionId
|
||||
}
|
||||
|
||||
return ""
|
||||
}
|
||||
|
||||
func extractFeishuMessageContent(message *larkim.EventMessage) string {
|
||||
if message == nil || message.Content == nil || *message.Content == "" {
|
||||
return ""
|
||||
}
|
||||
|
||||
if message.MessageType != nil && *message.MessageType == larkim.MsgTypeText {
|
||||
var textPayload struct {
|
||||
Text string `json:"text"`
|
||||
}
|
||||
if err := json.Unmarshal([]byte(*message.Content), &textPayload); err == nil {
|
||||
return textPayload.Text
|
||||
}
|
||||
}
|
||||
|
||||
return *message.Content
|
||||
}
|
||||
|
||||
func stringValue(v *string) string {
|
||||
if v == nil {
|
||||
return ""
|
||||
}
|
||||
return *v
|
||||
}
|
||||
243
pkg/channels/maixcam.go
Normal file
243
pkg/channels/maixcam.go
Normal file
@@ -0,0 +1,243 @@
|
||||
package channels
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net"
|
||||
"sync"
|
||||
|
||||
"gitea.kkkk.dev/DBT/clawgo/pkg/bus"
|
||||
"gitea.kkkk.dev/DBT/clawgo/pkg/config"
|
||||
"gitea.kkkk.dev/DBT/clawgo/pkg/logger"
|
||||
)
|
||||
|
||||
type MaixCamChannel struct {
|
||||
*BaseChannel
|
||||
config config.MaixCamConfig
|
||||
listener net.Listener
|
||||
clients map[net.Conn]bool
|
||||
clientsMux sync.RWMutex
|
||||
running bool
|
||||
}
|
||||
|
||||
type MaixCamMessage struct {
|
||||
Type string `json:"type"`
|
||||
Tips string `json:"tips"`
|
||||
Timestamp float64 `json:"timestamp"`
|
||||
Data map[string]interface{} `json:"data"`
|
||||
}
|
||||
|
||||
func NewMaixCamChannel(cfg config.MaixCamConfig, bus *bus.MessageBus) (*MaixCamChannel, error) {
|
||||
base := NewBaseChannel("maixcam", cfg, bus, cfg.AllowFrom)
|
||||
|
||||
return &MaixCamChannel{
|
||||
BaseChannel: base,
|
||||
config: cfg,
|
||||
clients: make(map[net.Conn]bool),
|
||||
running: false,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (c *MaixCamChannel) Start(ctx context.Context) error {
|
||||
logger.InfoC("maixcam", "Starting MaixCam channel server")
|
||||
|
||||
addr := fmt.Sprintf("%s:%d", c.config.Host, c.config.Port)
|
||||
listener, err := net.Listen("tcp", addr)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to listen on %s: %w", addr, err)
|
||||
}
|
||||
|
||||
c.listener = listener
|
||||
c.setRunning(true)
|
||||
|
||||
logger.InfoCF("maixcam", "MaixCam server listening", map[string]interface{}{
|
||||
"host": c.config.Host,
|
||||
"port": c.config.Port,
|
||||
})
|
||||
|
||||
go c.acceptConnections(ctx)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *MaixCamChannel) acceptConnections(ctx context.Context) {
|
||||
logger.DebugC("maixcam", "Starting connection acceptor")
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
logger.InfoC("maixcam", "Stopping connection acceptor")
|
||||
return
|
||||
default:
|
||||
conn, err := c.listener.Accept()
|
||||
if err != nil {
|
||||
if c.running {
|
||||
logger.ErrorCF("maixcam", "Failed to accept connection", map[string]interface{}{
|
||||
"error": err.Error(),
|
||||
})
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
logger.InfoCF("maixcam", "New connection from MaixCam device", map[string]interface{}{
|
||||
"remote_addr": conn.RemoteAddr().String(),
|
||||
})
|
||||
|
||||
c.clientsMux.Lock()
|
||||
c.clients[conn] = true
|
||||
c.clientsMux.Unlock()
|
||||
|
||||
go c.handleConnection(conn, ctx)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (c *MaixCamChannel) handleConnection(conn net.Conn, ctx context.Context) {
|
||||
logger.DebugC("maixcam", "Handling MaixCam connection")
|
||||
|
||||
defer func() {
|
||||
conn.Close()
|
||||
c.clientsMux.Lock()
|
||||
delete(c.clients, conn)
|
||||
c.clientsMux.Unlock()
|
||||
logger.DebugC("maixcam", "Connection closed")
|
||||
}()
|
||||
|
||||
decoder := json.NewDecoder(conn)
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
default:
|
||||
var msg MaixCamMessage
|
||||
if err := decoder.Decode(&msg); err != nil {
|
||||
if err.Error() != "EOF" {
|
||||
logger.ErrorCF("maixcam", "Failed to decode message", map[string]interface{}{
|
||||
"error": err.Error(),
|
||||
})
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
c.processMessage(msg, conn)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (c *MaixCamChannel) processMessage(msg MaixCamMessage, conn net.Conn) {
|
||||
switch msg.Type {
|
||||
case "person_detected":
|
||||
c.handlePersonDetection(msg)
|
||||
case "heartbeat":
|
||||
logger.DebugC("maixcam", "Received heartbeat")
|
||||
case "status":
|
||||
c.handleStatusUpdate(msg)
|
||||
default:
|
||||
logger.WarnCF("maixcam", "Unknown message type", map[string]interface{}{
|
||||
"type": msg.Type,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func (c *MaixCamChannel) handlePersonDetection(msg MaixCamMessage) {
|
||||
logger.InfoCF("maixcam", "", map[string]interface{}{
|
||||
"timestamp": msg.Timestamp,
|
||||
"data": msg.Data,
|
||||
})
|
||||
|
||||
senderID := "maixcam"
|
||||
chatID := "default"
|
||||
|
||||
classInfo, ok := msg.Data["class_name"].(string)
|
||||
if !ok {
|
||||
classInfo = "person"
|
||||
}
|
||||
|
||||
score, _ := msg.Data["score"].(float64)
|
||||
x, _ := msg.Data["x"].(float64)
|
||||
y, _ := msg.Data["y"].(float64)
|
||||
w, _ := msg.Data["w"].(float64)
|
||||
h, _ := msg.Data["h"].(float64)
|
||||
|
||||
content := fmt.Sprintf("📷 Person detected!\nClass: %s\nConfidence: %.2f%%\nPosition: (%.0f, %.0f)\nSize: %.0fx%.0f",
|
||||
classInfo, score*100, x, y, w, h)
|
||||
|
||||
metadata := map[string]string{
|
||||
"timestamp": fmt.Sprintf("%.0f", msg.Timestamp),
|
||||
"class_id": fmt.Sprintf("%.0f", msg.Data["class_id"]),
|
||||
"score": fmt.Sprintf("%.2f", score),
|
||||
"x": fmt.Sprintf("%.0f", x),
|
||||
"y": fmt.Sprintf("%.0f", y),
|
||||
"w": fmt.Sprintf("%.0f", w),
|
||||
"h": fmt.Sprintf("%.0f", h),
|
||||
}
|
||||
|
||||
c.HandleMessage(senderID, chatID, content, []string{}, metadata)
|
||||
}
|
||||
|
||||
func (c *MaixCamChannel) handleStatusUpdate(msg MaixCamMessage) {
|
||||
logger.InfoCF("maixcam", "Status update from MaixCam", map[string]interface{}{
|
||||
"status": msg.Data,
|
||||
})
|
||||
}
|
||||
|
||||
func (c *MaixCamChannel) Stop(ctx context.Context) error {
|
||||
logger.InfoC("maixcam", "Stopping MaixCam channel")
|
||||
c.setRunning(false)
|
||||
|
||||
if c.listener != nil {
|
||||
c.listener.Close()
|
||||
}
|
||||
|
||||
c.clientsMux.Lock()
|
||||
defer c.clientsMux.Unlock()
|
||||
|
||||
for conn := range c.clients {
|
||||
conn.Close()
|
||||
}
|
||||
c.clients = make(map[net.Conn]bool)
|
||||
|
||||
logger.InfoC("maixcam", "MaixCam channel stopped")
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *MaixCamChannel) Send(ctx context.Context, msg bus.OutboundMessage) error {
|
||||
if !c.IsRunning() {
|
||||
return fmt.Errorf("maixcam channel not running")
|
||||
}
|
||||
|
||||
c.clientsMux.RLock()
|
||||
defer c.clientsMux.RUnlock()
|
||||
|
||||
if len(c.clients) == 0 {
|
||||
logger.WarnC("maixcam", "No MaixCam devices connected")
|
||||
return fmt.Errorf("no connected MaixCam devices")
|
||||
}
|
||||
|
||||
response := map[string]interface{}{
|
||||
"type": "command",
|
||||
"timestamp": float64(0),
|
||||
"message": msg.Content,
|
||||
"chat_id": msg.ChatID,
|
||||
}
|
||||
|
||||
data, err := json.Marshal(response)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to marshal response: %w", err)
|
||||
}
|
||||
|
||||
var sendErr error
|
||||
for conn := range c.clients {
|
||||
if _, err := conn.Write(data); err != nil {
|
||||
logger.ErrorCF("maixcam", "Failed to send to client", map[string]interface{}{
|
||||
"client": conn.RemoteAddr().String(),
|
||||
"error": err.Error(),
|
||||
})
|
||||
sendErr = err
|
||||
}
|
||||
}
|
||||
|
||||
return sendErr
|
||||
}
|
||||
300
pkg/channels/manager.go
Normal file
300
pkg/channels/manager.go
Normal file
@@ -0,0 +1,300 @@
|
||||
// ClawGo - Ultra-lightweight personal AI agent
|
||||
// Inspired by and based on nanobot: https://github.com/HKUDS/nanobot
|
||||
// License: MIT
|
||||
//
|
||||
// Copyright (c) 2026 ClawGo contributors
|
||||
|
||||
package channels
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"sync"
|
||||
|
||||
"gitea.kkkk.dev/DBT/clawgo/pkg/bus"
|
||||
"gitea.kkkk.dev/DBT/clawgo/pkg/config"
|
||||
"gitea.kkkk.dev/DBT/clawgo/pkg/logger"
|
||||
)
|
||||
|
||||
type Manager struct {
|
||||
channels map[string]Channel
|
||||
bus *bus.MessageBus
|
||||
config *config.Config
|
||||
dispatchTask *asyncTask
|
||||
mu sync.RWMutex
|
||||
}
|
||||
|
||||
type asyncTask struct {
|
||||
cancel context.CancelFunc
|
||||
}
|
||||
|
||||
func NewManager(cfg *config.Config, messageBus *bus.MessageBus) (*Manager, error) {
|
||||
m := &Manager{
|
||||
channels: make(map[string]Channel),
|
||||
bus: messageBus,
|
||||
config: cfg,
|
||||
}
|
||||
|
||||
if err := m.initChannels(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return m, nil
|
||||
}
|
||||
|
||||
func (m *Manager) initChannels() error {
|
||||
logger.InfoC("channels", "Initializing channel manager")
|
||||
|
||||
if m.config.Channels.Telegram.Enabled && m.config.Channels.Telegram.Token != "" {
|
||||
logger.DebugC("channels", "Attempting to initialize Telegram channel")
|
||||
telegram, err := NewTelegramChannel(m.config.Channels.Telegram, m.bus)
|
||||
if err != nil {
|
||||
logger.ErrorCF("channels", "Failed to initialize Telegram channel", map[string]interface{}{
|
||||
"error": err.Error(),
|
||||
})
|
||||
} else {
|
||||
m.channels["telegram"] = telegram
|
||||
logger.InfoC("channels", "Telegram channel enabled successfully")
|
||||
}
|
||||
}
|
||||
|
||||
if m.config.Channels.WhatsApp.Enabled && m.config.Channels.WhatsApp.BridgeURL != "" {
|
||||
logger.DebugC("channels", "Attempting to initialize WhatsApp channel")
|
||||
whatsapp, err := NewWhatsAppChannel(m.config.Channels.WhatsApp, m.bus)
|
||||
if err != nil {
|
||||
logger.ErrorCF("channels", "Failed to initialize WhatsApp channel", map[string]interface{}{
|
||||
"error": err.Error(),
|
||||
})
|
||||
} else {
|
||||
m.channels["whatsapp"] = whatsapp
|
||||
logger.InfoC("channels", "WhatsApp channel enabled successfully")
|
||||
}
|
||||
}
|
||||
|
||||
if m.config.Channels.Feishu.Enabled {
|
||||
logger.DebugC("channels", "Attempting to initialize Feishu channel")
|
||||
feishu, err := NewFeishuChannel(m.config.Channels.Feishu, m.bus)
|
||||
if err != nil {
|
||||
logger.ErrorCF("channels", "Failed to initialize Feishu channel", map[string]interface{}{
|
||||
"error": err.Error(),
|
||||
})
|
||||
} else {
|
||||
m.channels["feishu"] = feishu
|
||||
logger.InfoC("channels", "Feishu channel enabled successfully")
|
||||
}
|
||||
}
|
||||
|
||||
if m.config.Channels.Discord.Enabled && m.config.Channels.Discord.Token != "" {
|
||||
logger.DebugC("channels", "Attempting to initialize Discord channel")
|
||||
discord, err := NewDiscordChannel(m.config.Channels.Discord, m.bus)
|
||||
if err != nil {
|
||||
logger.ErrorCF("channels", "Failed to initialize Discord channel", map[string]interface{}{
|
||||
"error": err.Error(),
|
||||
})
|
||||
} else {
|
||||
m.channels["discord"] = discord
|
||||
logger.InfoC("channels", "Discord channel enabled successfully")
|
||||
}
|
||||
}
|
||||
|
||||
if m.config.Channels.MaixCam.Enabled {
|
||||
logger.DebugC("channels", "Attempting to initialize MaixCam channel")
|
||||
maixcam, err := NewMaixCamChannel(m.config.Channels.MaixCam, m.bus)
|
||||
if err != nil {
|
||||
logger.ErrorCF("channels", "Failed to initialize MaixCam channel", map[string]interface{}{
|
||||
"error": err.Error(),
|
||||
})
|
||||
} else {
|
||||
m.channels["maixcam"] = maixcam
|
||||
logger.InfoC("channels", "MaixCam channel enabled successfully")
|
||||
}
|
||||
}
|
||||
|
||||
if m.config.Channels.QQ.Enabled {
|
||||
logger.DebugC("channels", "Attempting to initialize QQ channel")
|
||||
qq, err := NewQQChannel(m.config.Channels.QQ, m.bus)
|
||||
if err != nil {
|
||||
logger.ErrorCF("channels", "Failed to initialize QQ channel", map[string]interface{}{
|
||||
"error": err.Error(),
|
||||
})
|
||||
} else {
|
||||
m.channels["qq"] = qq
|
||||
logger.InfoC("channels", "QQ channel enabled successfully")
|
||||
}
|
||||
}
|
||||
|
||||
if m.config.Channels.DingTalk.Enabled && m.config.Channels.DingTalk.ClientID != "" {
|
||||
logger.DebugC("channels", "Attempting to initialize DingTalk channel")
|
||||
dingtalk, err := NewDingTalkChannel(m.config.Channels.DingTalk, m.bus)
|
||||
if err != nil {
|
||||
logger.ErrorCF("channels", "Failed to initialize DingTalk channel", map[string]interface{}{
|
||||
"error": err.Error(),
|
||||
})
|
||||
} else {
|
||||
m.channels["dingtalk"] = dingtalk
|
||||
logger.InfoC("channels", "DingTalk channel enabled successfully")
|
||||
}
|
||||
}
|
||||
|
||||
logger.InfoCF("channels", "Channel initialization completed", map[string]interface{}{
|
||||
"enabled_channels": len(m.channels),
|
||||
})
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *Manager) StartAll(ctx context.Context) error {
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
|
||||
if len(m.channels) == 0 {
|
||||
logger.WarnC("channels", "No channels enabled")
|
||||
return nil
|
||||
}
|
||||
|
||||
logger.InfoC("channels", "Starting all channels")
|
||||
|
||||
dispatchCtx, cancel := context.WithCancel(ctx)
|
||||
m.dispatchTask = &asyncTask{cancel: cancel}
|
||||
|
||||
go m.dispatchOutbound(dispatchCtx)
|
||||
|
||||
for name, channel := range m.channels {
|
||||
logger.InfoCF("channels", "Starting channel", map[string]interface{}{
|
||||
"channel": name,
|
||||
})
|
||||
if err := channel.Start(ctx); err != nil {
|
||||
logger.ErrorCF("channels", "Failed to start channel", map[string]interface{}{
|
||||
"channel": name,
|
||||
"error": err.Error(),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
logger.InfoC("channels", "All channels started")
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *Manager) StopAll(ctx context.Context) error {
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
|
||||
logger.InfoC("channels", "Stopping all channels")
|
||||
|
||||
if m.dispatchTask != nil {
|
||||
m.dispatchTask.cancel()
|
||||
m.dispatchTask = nil
|
||||
}
|
||||
|
||||
for name, channel := range m.channels {
|
||||
logger.InfoCF("channels", "Stopping channel", map[string]interface{}{
|
||||
"channel": name,
|
||||
})
|
||||
if err := channel.Stop(ctx); err != nil {
|
||||
logger.ErrorCF("channels", "Error stopping channel", map[string]interface{}{
|
||||
"channel": name,
|
||||
"error": err.Error(),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
logger.InfoC("channels", "All channels stopped")
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *Manager) dispatchOutbound(ctx context.Context) {
|
||||
logger.InfoC("channels", "Outbound dispatcher started")
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
logger.InfoC("channels", "Outbound dispatcher stopped")
|
||||
return
|
||||
default:
|
||||
msg, ok := m.bus.SubscribeOutbound(ctx)
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
|
||||
m.mu.RLock()
|
||||
channel, exists := m.channels[msg.Channel]
|
||||
m.mu.RUnlock()
|
||||
|
||||
if !exists {
|
||||
logger.WarnCF("channels", "Unknown channel for outbound message", map[string]interface{}{
|
||||
"channel": msg.Channel,
|
||||
})
|
||||
continue
|
||||
}
|
||||
|
||||
if err := channel.Send(ctx, msg); err != nil {
|
||||
logger.ErrorCF("channels", "Error sending message to channel", map[string]interface{}{
|
||||
"channel": msg.Channel,
|
||||
"error": err.Error(),
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (m *Manager) GetChannel(name string) (Channel, bool) {
|
||||
m.mu.RLock()
|
||||
defer m.mu.RUnlock()
|
||||
channel, ok := m.channels[name]
|
||||
return channel, ok
|
||||
}
|
||||
|
||||
func (m *Manager) GetStatus() map[string]interface{} {
|
||||
m.mu.RLock()
|
||||
defer m.mu.RUnlock()
|
||||
|
||||
status := make(map[string]interface{})
|
||||
for name, channel := range m.channels {
|
||||
status[name] = map[string]interface{}{
|
||||
"enabled": true,
|
||||
"running": channel.IsRunning(),
|
||||
}
|
||||
}
|
||||
return status
|
||||
}
|
||||
|
||||
func (m *Manager) GetEnabledChannels() []string {
|
||||
m.mu.RLock()
|
||||
defer m.mu.RUnlock()
|
||||
|
||||
names := make([]string, 0, len(m.channels))
|
||||
for name := range m.channels {
|
||||
names = append(names, name)
|
||||
}
|
||||
return names
|
||||
}
|
||||
|
||||
func (m *Manager) RegisterChannel(name string, channel Channel) {
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
m.channels[name] = channel
|
||||
}
|
||||
|
||||
func (m *Manager) UnregisterChannel(name string) {
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
delete(m.channels, name)
|
||||
}
|
||||
|
||||
func (m *Manager) SendToChannel(ctx context.Context, channelName, chatID, content string) error {
|
||||
m.mu.RLock()
|
||||
channel, exists := m.channels[channelName]
|
||||
m.mu.RUnlock()
|
||||
|
||||
if !exists {
|
||||
return fmt.Errorf("channel %s not found", channelName)
|
||||
}
|
||||
|
||||
msg := bus.OutboundMessage{
|
||||
Channel: channelName,
|
||||
ChatID: chatID,
|
||||
Content: content,
|
||||
}
|
||||
|
||||
return channel.Send(ctx, msg)
|
||||
}
|
||||
243
pkg/channels/qq.go
Normal file
243
pkg/channels/qq.go
Normal file
@@ -0,0 +1,243 @@
|
||||
package channels
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/tencent-connect/botgo"
|
||||
"github.com/tencent-connect/botgo/dto"
|
||||
"github.com/tencent-connect/botgo/event"
|
||||
"github.com/tencent-connect/botgo/openapi"
|
||||
"github.com/tencent-connect/botgo/token"
|
||||
"golang.org/x/oauth2"
|
||||
|
||||
"gitea.kkkk.dev/DBT/clawgo/pkg/bus"
|
||||
"gitea.kkkk.dev/DBT/clawgo/pkg/config"
|
||||
"gitea.kkkk.dev/DBT/clawgo/pkg/logger"
|
||||
)
|
||||
|
||||
type QQChannel struct {
|
||||
*BaseChannel
|
||||
config config.QQConfig
|
||||
api openapi.OpenAPI
|
||||
tokenSource oauth2.TokenSource
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
sessionManager botgo.SessionManager
|
||||
processedIDs map[string]bool
|
||||
mu sync.RWMutex
|
||||
}
|
||||
|
||||
func NewQQChannel(cfg config.QQConfig, messageBus *bus.MessageBus) (*QQChannel, error) {
|
||||
base := NewBaseChannel("qq", cfg, messageBus, cfg.AllowFrom)
|
||||
|
||||
return &QQChannel{
|
||||
BaseChannel: base,
|
||||
config: cfg,
|
||||
processedIDs: make(map[string]bool),
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (c *QQChannel) Start(ctx context.Context) error {
|
||||
if c.config.AppID == "" || c.config.AppSecret == "" {
|
||||
return fmt.Errorf("QQ app_id and app_secret not configured")
|
||||
}
|
||||
|
||||
logger.InfoC("qq", "Starting QQ bot (WebSocket mode)")
|
||||
|
||||
// 创建 token source
|
||||
credentials := &token.QQBotCredentials{
|
||||
AppID: c.config.AppID,
|
||||
AppSecret: c.config.AppSecret,
|
||||
}
|
||||
c.tokenSource = token.NewQQBotTokenSource(credentials)
|
||||
|
||||
// 创建子 context
|
||||
c.ctx, c.cancel = context.WithCancel(ctx)
|
||||
|
||||
// 启动自动刷新 token 协程
|
||||
if err := token.StartRefreshAccessToken(c.ctx, c.tokenSource); err != nil {
|
||||
return fmt.Errorf("failed to start token refresh: %w", err)
|
||||
}
|
||||
|
||||
// 初始化 OpenAPI 客户端
|
||||
c.api = botgo.NewOpenAPI(c.config.AppID, c.tokenSource).WithTimeout(5 * time.Second)
|
||||
|
||||
// 注册事件处理器
|
||||
intent := event.RegisterHandlers(
|
||||
c.handleC2CMessage(),
|
||||
c.handleGroupATMessage(),
|
||||
)
|
||||
|
||||
// 获取 WebSocket 接入点
|
||||
wsInfo, err := c.api.WS(c.ctx, nil, "")
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to get websocket info: %w", err)
|
||||
}
|
||||
|
||||
logger.InfoCF("qq", "Got WebSocket info", map[string]interface{}{
|
||||
"shards": wsInfo.Shards,
|
||||
})
|
||||
|
||||
// 创建并保存 sessionManager
|
||||
c.sessionManager = botgo.NewSessionManager()
|
||||
|
||||
// 在 goroutine 中启动 WebSocket 连接,避免阻塞
|
||||
go func() {
|
||||
if err := c.sessionManager.Start(wsInfo, c.tokenSource, &intent); err != nil {
|
||||
logger.ErrorCF("qq", "WebSocket session error", map[string]interface{}{
|
||||
"error": err.Error(),
|
||||
})
|
||||
c.setRunning(false)
|
||||
}
|
||||
}()
|
||||
|
||||
c.setRunning(true)
|
||||
logger.InfoC("qq", "QQ bot started successfully")
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *QQChannel) Stop(ctx context.Context) error {
|
||||
logger.InfoC("qq", "Stopping QQ bot")
|
||||
c.setRunning(false)
|
||||
|
||||
if c.cancel != nil {
|
||||
c.cancel()
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *QQChannel) Send(ctx context.Context, msg bus.OutboundMessage) error {
|
||||
if !c.IsRunning() {
|
||||
return fmt.Errorf("QQ bot not running")
|
||||
}
|
||||
|
||||
// 构造消息
|
||||
msgToCreate := &dto.MessageToCreate{
|
||||
Content: msg.Content,
|
||||
}
|
||||
|
||||
// C2C 消息发送
|
||||
_, err := c.api.PostC2CMessage(ctx, msg.ChatID, msgToCreate)
|
||||
if err != nil {
|
||||
logger.ErrorCF("qq", "Failed to send C2C message", map[string]interface{}{
|
||||
"error": err.Error(),
|
||||
})
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// handleC2CMessage 处理 QQ 私聊消息
|
||||
func (c *QQChannel) handleC2CMessage() event.C2CMessageEventHandler {
|
||||
return func(event *dto.WSPayload, data *dto.WSC2CMessageData) error {
|
||||
// 去重检查
|
||||
if c.isDuplicate(data.ID) {
|
||||
return nil
|
||||
}
|
||||
|
||||
// 提取用户信息
|
||||
var senderID string
|
||||
if data.Author != nil && data.Author.ID != "" {
|
||||
senderID = data.Author.ID
|
||||
} else {
|
||||
logger.WarnC("qq", "Received message with no sender ID")
|
||||
return nil
|
||||
}
|
||||
|
||||
// 提取消息内容
|
||||
content := data.Content
|
||||
if content == "" {
|
||||
logger.DebugC("qq", "Received empty message, ignoring")
|
||||
return nil
|
||||
}
|
||||
|
||||
logger.InfoCF("qq", "Received C2C message", map[string]interface{}{
|
||||
"sender": senderID,
|
||||
"length": len(content),
|
||||
})
|
||||
|
||||
// 转发到消息总线
|
||||
metadata := map[string]string{
|
||||
"message_id": data.ID,
|
||||
}
|
||||
|
||||
c.HandleMessage(senderID, senderID, content, []string{}, metadata)
|
||||
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// handleGroupATMessage 处理群@消息
|
||||
func (c *QQChannel) handleGroupATMessage() event.GroupATMessageEventHandler {
|
||||
return func(event *dto.WSPayload, data *dto.WSGroupATMessageData) error {
|
||||
// 去重检查
|
||||
if c.isDuplicate(data.ID) {
|
||||
return nil
|
||||
}
|
||||
|
||||
// 提取用户信息
|
||||
var senderID string
|
||||
if data.Author != nil && data.Author.ID != "" {
|
||||
senderID = data.Author.ID
|
||||
} else {
|
||||
logger.WarnC("qq", "Received group message with no sender ID")
|
||||
return nil
|
||||
}
|
||||
|
||||
// 提取消息内容(去掉 @ 机器人部分)
|
||||
content := data.Content
|
||||
if content == "" {
|
||||
logger.DebugC("qq", "Received empty group message, ignoring")
|
||||
return nil
|
||||
}
|
||||
|
||||
logger.InfoCF("qq", "Received group AT message", map[string]interface{}{
|
||||
"sender": senderID,
|
||||
"group": data.GroupID,
|
||||
"length": len(content),
|
||||
})
|
||||
|
||||
// 转发到消息总线(使用 GroupID 作为 ChatID)
|
||||
metadata := map[string]string{
|
||||
"message_id": data.ID,
|
||||
"group_id": data.GroupID,
|
||||
}
|
||||
|
||||
c.HandleMessage(senderID, data.GroupID, content, []string{}, metadata)
|
||||
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// isDuplicate 检查消息是否重复
|
||||
func (c *QQChannel) isDuplicate(messageID string) bool {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
|
||||
if c.processedIDs[messageID] {
|
||||
return true
|
||||
}
|
||||
|
||||
c.processedIDs[messageID] = true
|
||||
|
||||
// 简单清理:限制 map 大小
|
||||
if len(c.processedIDs) > 10000 {
|
||||
// 清空一半
|
||||
count := 0
|
||||
for id := range c.processedIDs {
|
||||
if count >= 5000 {
|
||||
break
|
||||
}
|
||||
delete(c.processedIDs, id)
|
||||
count++
|
||||
}
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
475
pkg/channels/telegram.go
Normal file
475
pkg/channels/telegram.go
Normal file
@@ -0,0 +1,475 @@
|
||||
package channels
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"log"
|
||||
"net/http"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"regexp"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/mymmrac/telego"
|
||||
"github.com/mymmrac/telego/telegoutil"
|
||||
|
||||
"gitea.kkkk.dev/DBT/clawgo/pkg/bus"
|
||||
"gitea.kkkk.dev/DBT/clawgo/pkg/config"
|
||||
"gitea.kkkk.dev/DBT/clawgo/pkg/voice"
|
||||
)
|
||||
|
||||
type TelegramChannel struct {
|
||||
*BaseChannel
|
||||
bot *telego.Bot
|
||||
config config.TelegramConfig
|
||||
chatIDs map[string]int64
|
||||
updates <-chan telego.Update
|
||||
transcriber *voice.GroqTranscriber
|
||||
placeholders sync.Map // chatID -> messageID
|
||||
stopThinking sync.Map // chatID -> chan struct{}
|
||||
}
|
||||
|
||||
func NewTelegramChannel(cfg config.TelegramConfig, bus *bus.MessageBus) (*TelegramChannel, error) {
|
||||
bot, err := telego.NewBot(cfg.Token, telego.WithDefaultLogger(false, false))
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to create telegram bot: %w", err)
|
||||
}
|
||||
|
||||
base := NewBaseChannel("telegram", cfg, bus, cfg.AllowFrom)
|
||||
|
||||
return &TelegramChannel{
|
||||
BaseChannel: base,
|
||||
bot: bot,
|
||||
config: cfg,
|
||||
chatIDs: make(map[string]int64),
|
||||
transcriber: nil,
|
||||
placeholders: sync.Map{},
|
||||
stopThinking: sync.Map{},
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (c *TelegramChannel) SetTranscriber(transcriber *voice.GroqTranscriber) {
|
||||
c.transcriber = transcriber
|
||||
}
|
||||
|
||||
func (c *TelegramChannel) Start(ctx context.Context) error {
|
||||
log.Printf("Starting Telegram bot (polling mode)...")
|
||||
|
||||
updates, err := c.bot.UpdatesViaLongPolling(nil)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to start updates polling: %w", err)
|
||||
}
|
||||
c.updates = updates
|
||||
|
||||
c.setRunning(true)
|
||||
|
||||
botInfo, err := c.bot.GetMe(context.Background())
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to get bot info: %w", err)
|
||||
}
|
||||
log.Printf("Telegram bot @%s connected", botInfo.Username)
|
||||
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case update, ok := <-updates:
|
||||
if !ok {
|
||||
log.Printf("Updates channel closed")
|
||||
return
|
||||
}
|
||||
if update.Message != nil {
|
||||
c.handleMessage(update.Message)
|
||||
}
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *TelegramChannel) Stop(ctx context.Context) error {
|
||||
log.Println("Stopping Telegram bot...")
|
||||
c.setRunning(false)
|
||||
|
||||
if c.updates != nil {
|
||||
c.bot.StopLongPolling()
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *TelegramChannel) Send(ctx context.Context, msg bus.OutboundMessage) error {
|
||||
if !c.IsRunning() {
|
||||
return fmt.Errorf("telegram bot not running")
|
||||
}
|
||||
|
||||
chatIDInt, err := parseChatID(msg.ChatID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("invalid chat ID: %w", err)
|
||||
}
|
||||
chatID := telegoutil.ID(chatIDInt)
|
||||
|
||||
// Stop thinking animation
|
||||
if stop, ok := c.stopThinking.Load(msg.ChatID); ok {
|
||||
close(stop.(chan struct{}))
|
||||
c.stopThinking.Delete(msg.ChatID)
|
||||
}
|
||||
|
||||
htmlContent := markdownToTelegramHTML(msg.Content)
|
||||
|
||||
// Try to edit placeholder
|
||||
if pID, ok := c.placeholders.Load(msg.ChatID); ok {
|
||||
c.placeholders.Delete(msg.ChatID)
|
||||
|
||||
_, err := c.bot.EditMessageText(ctx, &telego.EditMessageTextParams{
|
||||
ChatID: chatID,
|
||||
MessageID: pID.(int),
|
||||
Text: htmlContent,
|
||||
ParseMode: telego.ModeHTML,
|
||||
})
|
||||
|
||||
if err == nil {
|
||||
return nil
|
||||
}
|
||||
// Fallback to new message if edit fails
|
||||
}
|
||||
|
||||
_, err = c.bot.SendMessage(ctx, telegoutil.Message(chatID, htmlContent).WithParseMode(telego.ModeHTML))
|
||||
|
||||
if err != nil {
|
||||
log.Printf("HTML parse failed, falling back to plain text: %v", err)
|
||||
_, err = c.bot.SendMessage(ctx, telegoutil.Message(chatID, msg.Content))
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *TelegramChannel) handleMessage(message *telego.Message) {
|
||||
if message == nil {
|
||||
return
|
||||
}
|
||||
|
||||
user := message.From
|
||||
if user == nil {
|
||||
return
|
||||
}
|
||||
|
||||
senderID := fmt.Sprintf("%d", user.ID)
|
||||
if user.Username != "" {
|
||||
senderID = fmt.Sprintf("%d|%s", user.ID, user.Username)
|
||||
}
|
||||
|
||||
chatID := message.Chat.ID
|
||||
c.chatIDs[senderID] = chatID
|
||||
|
||||
content := ""
|
||||
mediaPaths := []string{}
|
||||
|
||||
if message.Text != "" {
|
||||
content += message.Text
|
||||
}
|
||||
|
||||
if message.Caption != "" {
|
||||
if content != "" {
|
||||
content += "\n"
|
||||
}
|
||||
content += message.Caption
|
||||
}
|
||||
|
||||
if message.Photo != nil && len(message.Photo) > 0 {
|
||||
photo := message.Photo[len(message.Photo)-1]
|
||||
photoPath := c.downloadFile(photo.FileID, ".jpg")
|
||||
if photoPath != "" {
|
||||
mediaPaths = append(mediaPaths, photoPath)
|
||||
if content != "" {
|
||||
content += "\n"
|
||||
}
|
||||
content += fmt.Sprintf("[image: %s]", photoPath)
|
||||
}
|
||||
}
|
||||
|
||||
if message.Voice != nil {
|
||||
voicePath := c.downloadFile(message.Voice.FileID, ".ogg")
|
||||
if voicePath != "" {
|
||||
mediaPaths = append(mediaPaths, voicePath)
|
||||
|
||||
transcribedText := ""
|
||||
if c.transcriber != nil && c.transcriber.IsAvailable() {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
|
||||
defer cancel()
|
||||
|
||||
result, err := c.transcriber.Transcribe(ctx, voicePath)
|
||||
if err != nil {
|
||||
log.Printf("Voice transcription failed: %v", err)
|
||||
transcribedText = fmt.Sprintf("[voice: %s (transcription failed)]", voicePath)
|
||||
} else {
|
||||
transcribedText = fmt.Sprintf("[voice transcription: %s]", result.Text)
|
||||
log.Printf("Voice transcribed successfully: %s", result.Text)
|
||||
}
|
||||
} else {
|
||||
transcribedText = fmt.Sprintf("[voice: %s]", voicePath)
|
||||
}
|
||||
|
||||
if content != "" {
|
||||
content += "\n"
|
||||
}
|
||||
content += transcribedText
|
||||
}
|
||||
}
|
||||
|
||||
if message.Audio != nil {
|
||||
audioPath := c.downloadFile(message.Audio.FileID, ".mp3")
|
||||
if audioPath != "" {
|
||||
mediaPaths = append(mediaPaths, audioPath)
|
||||
if content != "" {
|
||||
content += "\n"
|
||||
}
|
||||
content += fmt.Sprintf("[audio: %s]", audioPath)
|
||||
}
|
||||
}
|
||||
|
||||
if message.Document != nil {
|
||||
docPath := c.downloadFile(message.Document.FileID, "")
|
||||
if docPath != "" {
|
||||
mediaPaths = append(mediaPaths, docPath)
|
||||
if content != "" {
|
||||
content += "\n"
|
||||
}
|
||||
content += fmt.Sprintf("[file: %s]", docPath)
|
||||
}
|
||||
}
|
||||
|
||||
if content == "" {
|
||||
content = "[empty message]"
|
||||
}
|
||||
|
||||
log.Printf("Telegram message from %s: %s...", senderID, truncateString(content, 50))
|
||||
|
||||
// Thinking indicator
|
||||
_ = c.bot.SendChatAction(context.Background(), &telego.SendChatActionParams{
|
||||
ChatID: telegoutil.ID(chatID),
|
||||
Action: telego.ChatActionTyping,
|
||||
})
|
||||
|
||||
stopChan := make(chan struct{})
|
||||
c.stopThinking.Store(fmt.Sprintf("%d", chatID), stopChan)
|
||||
|
||||
pMsg, err := c.bot.SendMessage(context.Background(), telegoutil.Message(telegoutil.ID(chatID), "Thinking... 💭"))
|
||||
if err == nil {
|
||||
pID := pMsg.MessageID
|
||||
c.placeholders.Store(fmt.Sprintf("%d", chatID), pID)
|
||||
|
||||
go func(cid int64, mid int, stop <-chan struct{}) {
|
||||
dots := []string{".", "..", "..."}
|
||||
emotes := []string{"💭", "🤔", "☁️"}
|
||||
i := 0
|
||||
ticker := time.NewTicker(2000 * time.Millisecond)
|
||||
defer ticker.Stop()
|
||||
for {
|
||||
select {
|
||||
case <-stop:
|
||||
return
|
||||
case <-ticker.C:
|
||||
i++
|
||||
text := fmt.Sprintf("Thinking%s %s", dots[i%len(dots)], emotes[i%len(emotes)])
|
||||
_, _ = c.bot.EditMessageText(context.Background(), &telego.EditMessageTextParams{
|
||||
ChatID: telegoutil.ID(cid),
|
||||
MessageID: mid,
|
||||
Text: text,
|
||||
})
|
||||
}
|
||||
}
|
||||
}(chatID, pID, stopChan)
|
||||
}
|
||||
|
||||
metadata := map[string]string{
|
||||
"message_id": fmt.Sprintf("%d", message.MessageID),
|
||||
"user_id": fmt.Sprintf("%d", user.ID),
|
||||
"username": user.Username,
|
||||
"first_name": user.FirstName,
|
||||
"is_group": fmt.Sprintf("%t", message.Chat.Type != telego.ChatTypePrivate),
|
||||
}
|
||||
|
||||
c.HandleMessage(senderID, fmt.Sprintf("%d", chatID), content, mediaPaths, metadata)
|
||||
}
|
||||
|
||||
func (c *TelegramChannel) downloadFile(fileID, ext string) string {
|
||||
file, err := c.bot.GetFile(context.Background(), &telego.GetFileParams{FileID: fileID})
|
||||
if err != nil {
|
||||
log.Printf("Failed to get file: %v", err)
|
||||
return ""
|
||||
}
|
||||
|
||||
if file.FilePath == "" {
|
||||
return ""
|
||||
}
|
||||
|
||||
// In telego, we can use Link() or just build the URL
|
||||
url := fmt.Sprintf("https://api.telegram.org/file/bot%s/%s", c.config.Token, file.FilePath)
|
||||
log.Printf("File URL: %s", url)
|
||||
|
||||
mediaDir := filepath.Join(os.TempDir(), "clawgo_media")
|
||||
if err := os.MkdirAll(mediaDir, 0755); err != nil {
|
||||
log.Printf("Failed to create media directory: %v", err)
|
||||
return ""
|
||||
}
|
||||
|
||||
localPath := filepath.Join(mediaDir, fileID[:min(16, len(fileID))]+ext)
|
||||
|
||||
if err := c.downloadFromURL(url, localPath); err != nil {
|
||||
log.Printf("Failed to download file: %v", err)
|
||||
return ""
|
||||
}
|
||||
|
||||
return localPath
|
||||
}
|
||||
|
||||
func min(a, b int) int {
|
||||
if a < b {
|
||||
return a
|
||||
}
|
||||
return b
|
||||
}
|
||||
|
||||
func (c *TelegramChannel) downloadFromURL(url, localPath string) error {
|
||||
resp, err := http.Get(url)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to download: %w", err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
return fmt.Errorf("download failed with status: %d", resp.StatusCode)
|
||||
}
|
||||
|
||||
out, err := os.Create(localPath)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to create file: %w", err)
|
||||
}
|
||||
defer out.Close()
|
||||
|
||||
_, err = io.Copy(out, resp.Body)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to write file: %w", err)
|
||||
}
|
||||
|
||||
log.Printf("File downloaded successfully to: %s", localPath)
|
||||
return nil
|
||||
}
|
||||
|
||||
func parseChatID(chatIDStr string) (int64, error) {
|
||||
var id int64
|
||||
_, err := fmt.Sscanf(chatIDStr, "%d", &id)
|
||||
return id, err
|
||||
}
|
||||
|
||||
func truncateString(s string, maxLen int) string {
|
||||
if len(s) <= maxLen {
|
||||
return s
|
||||
}
|
||||
return s[:maxLen]
|
||||
}
|
||||
|
||||
func markdownToTelegramHTML(text string) string {
|
||||
if text == "" {
|
||||
return ""
|
||||
}
|
||||
|
||||
codeBlocks := extractCodeBlocks(text)
|
||||
text = codeBlocks.text
|
||||
|
||||
inlineCodes := extractInlineCodes(text)
|
||||
text = inlineCodes.text
|
||||
|
||||
text = regexp.MustCompile(`^#{1,6}\s+(.+)$`).ReplaceAllString(text, "$1")
|
||||
|
||||
text = regexp.MustCompile(`^>\s*(.*)$`).ReplaceAllString(text, "$1")
|
||||
|
||||
text = escapeHTML(text)
|
||||
|
||||
text = regexp.MustCompile(`\[([^\]]+)\]\(([^)]+)\)`).ReplaceAllString(text, `<a href="$2">$1</a>`)
|
||||
|
||||
text = regexp.MustCompile(`\*\*(.+?)\*\*`).ReplaceAllString(text, "<b>$1</b>")
|
||||
|
||||
text = regexp.MustCompile(`__(.+?)__`).ReplaceAllString(text, "<b>$1</b>")
|
||||
|
||||
reItalic := regexp.MustCompile(`_([^_]+)_`)
|
||||
text = reItalic.ReplaceAllStringFunc(text, func(s string) string {
|
||||
match := reItalic.FindStringSubmatch(s)
|
||||
if len(match) < 2 {
|
||||
return s
|
||||
}
|
||||
return "<i>" + match[1] + "</i>"
|
||||
})
|
||||
|
||||
text = regexp.MustCompile(`~~(.+?)~~`).ReplaceAllString(text, "<s>$1</s>")
|
||||
|
||||
text = regexp.MustCompile(`^[-*]\s+`).ReplaceAllString(text, "• ")
|
||||
|
||||
for i, code := range inlineCodes.codes {
|
||||
escaped := escapeHTML(code)
|
||||
text = strings.ReplaceAll(text, fmt.Sprintf("\x00IC%d\x00", i), fmt.Sprintf("<code>%s</code>", escaped))
|
||||
}
|
||||
|
||||
for i, code := range codeBlocks.codes {
|
||||
escaped := escapeHTML(code)
|
||||
text = strings.ReplaceAll(text, fmt.Sprintf("\x00CB%d\x00", i), fmt.Sprintf("<pre><code>%s</code></pre>", escaped))
|
||||
}
|
||||
|
||||
return text
|
||||
}
|
||||
|
||||
type codeBlockMatch struct {
|
||||
text string
|
||||
codes []string
|
||||
}
|
||||
|
||||
func extractCodeBlocks(text string) codeBlockMatch {
|
||||
re := regexp.MustCompile("```[\\w]*\\n?([\\s\\S]*?)```")
|
||||
matches := re.FindAllStringSubmatch(text, -1)
|
||||
|
||||
codes := make([]string, 0, len(matches))
|
||||
for _, match := range matches {
|
||||
codes = append(codes, match[1])
|
||||
}
|
||||
|
||||
text = re.ReplaceAllStringFunc(text, func(m string) string {
|
||||
return fmt.Sprintf("\x00CB%d\x00", len(codes)-1)
|
||||
})
|
||||
|
||||
return codeBlockMatch{text: text, codes: codes}
|
||||
}
|
||||
|
||||
type inlineCodeMatch struct {
|
||||
text string
|
||||
codes []string
|
||||
}
|
||||
|
||||
func extractInlineCodes(text string) inlineCodeMatch {
|
||||
re := regexp.MustCompile("`([^`]+)`")
|
||||
matches := re.FindAllStringSubmatch(text, -1)
|
||||
|
||||
codes := make([]string, 0, len(matches))
|
||||
for _, match := range matches {
|
||||
codes = append(codes, match[1])
|
||||
}
|
||||
|
||||
text = re.ReplaceAllStringFunc(text, func(m string) string {
|
||||
return fmt.Sprintf("\x00IC%d\x00", len(codes)-1)
|
||||
})
|
||||
|
||||
return inlineCodeMatch{text: text, codes: codes}
|
||||
}
|
||||
|
||||
func escapeHTML(text string) string {
|
||||
text = strings.ReplaceAll(text, "&", "&")
|
||||
text = strings.ReplaceAll(text, "<", "<")
|
||||
text = strings.ReplaceAll(text, ">", ">")
|
||||
return text
|
||||
}
|
||||
183
pkg/channels/whatsapp.go
Normal file
183
pkg/channels/whatsapp.go
Normal file
@@ -0,0 +1,183 @@
|
||||
package channels
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"log"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/gorilla/websocket"
|
||||
|
||||
"gitea.kkkk.dev/DBT/clawgo/pkg/bus"
|
||||
"gitea.kkkk.dev/DBT/clawgo/pkg/config"
|
||||
)
|
||||
|
||||
type WhatsAppChannel struct {
|
||||
*BaseChannel
|
||||
conn *websocket.Conn
|
||||
config config.WhatsAppConfig
|
||||
url string
|
||||
mu sync.Mutex
|
||||
connected bool
|
||||
}
|
||||
|
||||
func NewWhatsAppChannel(cfg config.WhatsAppConfig, bus *bus.MessageBus) (*WhatsAppChannel, error) {
|
||||
base := NewBaseChannel("whatsapp", cfg, bus, cfg.AllowFrom)
|
||||
|
||||
return &WhatsAppChannel{
|
||||
BaseChannel: base,
|
||||
config: cfg,
|
||||
url: cfg.BridgeURL,
|
||||
connected: false,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (c *WhatsAppChannel) Start(ctx context.Context) error {
|
||||
log.Printf("Starting WhatsApp channel connecting to %s...", c.url)
|
||||
|
||||
dialer := websocket.DefaultDialer
|
||||
dialer.HandshakeTimeout = 10 * time.Second
|
||||
|
||||
conn, _, err := dialer.Dial(c.url, nil)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to connect to WhatsApp bridge: %w", err)
|
||||
}
|
||||
|
||||
c.mu.Lock()
|
||||
c.conn = conn
|
||||
c.connected = true
|
||||
c.mu.Unlock()
|
||||
|
||||
c.setRunning(true)
|
||||
log.Println("WhatsApp channel connected")
|
||||
|
||||
go c.listen(ctx)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *WhatsAppChannel) Stop(ctx context.Context) error {
|
||||
log.Println("Stopping WhatsApp channel...")
|
||||
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
|
||||
if c.conn != nil {
|
||||
if err := c.conn.Close(); err != nil {
|
||||
log.Printf("Error closing WhatsApp connection: %v", err)
|
||||
}
|
||||
c.conn = nil
|
||||
}
|
||||
|
||||
c.connected = false
|
||||
c.setRunning(false)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *WhatsAppChannel) Send(ctx context.Context, msg bus.OutboundMessage) error {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
|
||||
if c.conn == nil {
|
||||
return fmt.Errorf("whatsapp connection not established")
|
||||
}
|
||||
|
||||
payload := map[string]interface{}{
|
||||
"type": "message",
|
||||
"to": msg.ChatID,
|
||||
"content": msg.Content,
|
||||
}
|
||||
|
||||
data, err := json.Marshal(payload)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to marshal message: %w", err)
|
||||
}
|
||||
|
||||
if err := c.conn.WriteMessage(websocket.TextMessage, data); err != nil {
|
||||
return fmt.Errorf("failed to send message: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *WhatsAppChannel) listen(ctx context.Context) {
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
default:
|
||||
c.mu.Lock()
|
||||
conn := c.conn
|
||||
c.mu.Unlock()
|
||||
|
||||
if conn == nil {
|
||||
time.Sleep(1 * time.Second)
|
||||
continue
|
||||
}
|
||||
|
||||
_, message, err := conn.ReadMessage()
|
||||
if err != nil {
|
||||
log.Printf("WhatsApp read error: %v", err)
|
||||
time.Sleep(2 * time.Second)
|
||||
continue
|
||||
}
|
||||
|
||||
var msg map[string]interface{}
|
||||
if err := json.Unmarshal(message, &msg); err != nil {
|
||||
log.Printf("Failed to unmarshal WhatsApp message: %v", err)
|
||||
continue
|
||||
}
|
||||
|
||||
msgType, ok := msg["type"].(string)
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
|
||||
if msgType == "message" {
|
||||
c.handleIncomingMessage(msg)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (c *WhatsAppChannel) handleIncomingMessage(msg map[string]interface{}) {
|
||||
senderID, ok := msg["from"].(string)
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
|
||||
chatID, ok := msg["chat"].(string)
|
||||
if !ok {
|
||||
chatID = senderID
|
||||
}
|
||||
|
||||
content, ok := msg["content"].(string)
|
||||
if !ok {
|
||||
content = ""
|
||||
}
|
||||
|
||||
var mediaPaths []string
|
||||
if mediaData, ok := msg["media"].([]interface{}); ok {
|
||||
mediaPaths = make([]string, 0, len(mediaData))
|
||||
for _, m := range mediaData {
|
||||
if path, ok := m.(string); ok {
|
||||
mediaPaths = append(mediaPaths, path)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
metadata := make(map[string]string)
|
||||
if messageID, ok := msg["id"].(string); ok {
|
||||
metadata["message_id"] = messageID
|
||||
}
|
||||
if userName, ok := msg["from_name"].(string); ok {
|
||||
metadata["user_name"] = userName
|
||||
}
|
||||
|
||||
log.Printf("WhatsApp message from %s: %s...", senderID, truncateString(content, 50))
|
||||
|
||||
c.HandleMessage(senderID, chatID, content, mediaPaths, metadata)
|
||||
}
|
||||
Reference in New Issue
Block a user