mirror of
https://github.com/YspCoder/clawgo.git
synced 2026-05-08 22:57:29 +08:00
Compare commits
5 Commits
b3b7c3edfd
...
5dfa0f4d72
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
5dfa0f4d72 | ||
|
|
4ed24b57e3 | ||
|
|
1ac0fb123b | ||
|
|
1e4bf34fac | ||
|
|
f56005246d |
BIN
clawgo_new
Executable file
BIN
clawgo_new
Executable file
Binary file not shown.
@@ -714,6 +714,7 @@ func gatewayCmd() {
|
||||
fmt.Printf("Error initializing gateway runtime: %v\n", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
sentinelService.SetManager(channelManager)
|
||||
|
||||
pidFile := filepath.Join(filepath.Dir(getConfigPath()), "gateway.pid")
|
||||
if err := os.WriteFile(pidFile, []byte(fmt.Sprintf("%d\n", os.Getpid())), 0644); err != nil {
|
||||
@@ -797,6 +798,7 @@ func gatewayCmd() {
|
||||
},
|
||||
)
|
||||
if newCfg.Sentinel.Enabled {
|
||||
sentinelService.SetManager(channelManager)
|
||||
sentinelService.Start()
|
||||
}
|
||||
cfg = newCfg
|
||||
@@ -835,6 +837,7 @@ func gatewayCmd() {
|
||||
if newCfg.Sentinel.Enabled {
|
||||
sentinelService.Start()
|
||||
}
|
||||
sentinelService.SetManager(channelManager)
|
||||
|
||||
if err := channelManager.StartAll(ctx); err != nil {
|
||||
fmt.Printf("✗ Reload failed (start channels): %v\n", err)
|
||||
|
||||
@@ -82,8 +82,9 @@ func NewAgentLoop(cfg *config.Config, msgBus *bus.MessageBus, provider providers
|
||||
|
||||
// Register message tool
|
||||
messageTool := tools.NewMessageTool()
|
||||
messageTool.SetSendCallback(func(channel, chatID, content string) error {
|
||||
messageTool.SetSendCallback(func(channel, chatID, content string, buttons [][]bus.Button) error {
|
||||
msgBus.PublishOutbound(bus.OutboundMessage{
|
||||
Buttons: buttons,
|
||||
Channel: channel,
|
||||
ChatID: chatID,
|
||||
Content: content,
|
||||
@@ -210,6 +211,7 @@ func (al *AgentLoop) enqueueMessage(ctx context.Context, msg bus.InboundMessage)
|
||||
case <-ctx.Done():
|
||||
case <-time.After(2 * time.Second):
|
||||
al.bus.PublishOutbound(bus.OutboundMessage{
|
||||
Buttons: nil,
|
||||
Channel: msg.Channel,
|
||||
ChatID: msg.ChatID,
|
||||
Content: "Message queue is busy. Please try again shortly.",
|
||||
@@ -275,6 +277,7 @@ func (al *AgentLoop) runSessionWorker(ctx context.Context, sessionKey string, wo
|
||||
|
||||
if response != "" {
|
||||
al.bus.PublishOutbound(bus.OutboundMessage{
|
||||
Buttons: nil,
|
||||
Channel: msg.Channel,
|
||||
ChatID: msg.ChatID,
|
||||
Content: response,
|
||||
|
||||
@@ -10,10 +10,16 @@ type InboundMessage struct {
|
||||
Metadata map[string]string `json:"metadata,omitempty"`
|
||||
}
|
||||
|
||||
type Button struct {
|
||||
Text string `json:"text"`
|
||||
Data string `json:"data"`
|
||||
}
|
||||
|
||||
type OutboundMessage struct {
|
||||
Channel string `json:"channel"`
|
||||
ChatID string `json:"chat_id"`
|
||||
Content string `json:"content"`
|
||||
Channel string `json:"channel"`
|
||||
ChatID string `json:"chat_id"`
|
||||
Content string `json:"content"`
|
||||
Buttons [][]Button `json:"buttons,omitempty"`
|
||||
}
|
||||
|
||||
type MessageHandler func(InboundMessage) error
|
||||
|
||||
@@ -17,6 +17,7 @@ type Channel interface {
|
||||
Send(ctx context.Context, msg bus.OutboundMessage) error
|
||||
IsRunning() bool
|
||||
IsAllowed(senderID string) bool
|
||||
HealthCheck(ctx context.Context) error
|
||||
}
|
||||
|
||||
type BaseChannel struct {
|
||||
@@ -44,6 +45,13 @@ func (c *BaseChannel) IsRunning() bool {
|
||||
return c.running.Load()
|
||||
}
|
||||
|
||||
func (c *BaseChannel) HealthCheck(ctx context.Context) error {
|
||||
if !c.IsRunning() {
|
||||
return fmt.Errorf("%s channel not running", c.name)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *BaseChannel) IsAllowed(senderID string) bool {
|
||||
if len(c.allowList) == 0 {
|
||||
return true
|
||||
|
||||
@@ -217,6 +217,31 @@ func (m *Manager) StopAll(ctx context.Context) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *Manager) CheckHealth(ctx context.Context) map[string]error {
|
||||
m.mu.RLock()
|
||||
defer m.mu.RUnlock()
|
||||
|
||||
results := make(map[string]error)
|
||||
for name, channel := range m.channels {
|
||||
results[name] = channel.HealthCheck(ctx)
|
||||
}
|
||||
return results
|
||||
}
|
||||
|
||||
func (m *Manager) RestartChannel(ctx context.Context, name string) error {
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
|
||||
channel, ok := m.channels[name]
|
||||
if !ok {
|
||||
return fmt.Errorf("channel %s not found", name)
|
||||
}
|
||||
|
||||
logger.InfoCF("channels", "Restarting channel", map[string]interface{}{"channel": name})
|
||||
_ = channel.Stop(ctx)
|
||||
return channel.Start(ctx)
|
||||
}
|
||||
|
||||
func (m *Manager) dispatchOutbound(ctx context.Context) {
|
||||
logger.InfoC("channels", "Outbound dispatcher started")
|
||||
|
||||
@@ -270,11 +295,8 @@ func (m *Manager) GetStatus() map[string]interface{} {
|
||||
defer m.mu.RUnlock()
|
||||
|
||||
status := make(map[string]interface{})
|
||||
for name, channel := range m.channels {
|
||||
status[name] = map[string]interface{}{
|
||||
"enabled": true,
|
||||
"running": channel.IsRunning(),
|
||||
}
|
||||
for name := range m.channels {
|
||||
status[name] = map[string]interface{}{}
|
||||
}
|
||||
return status
|
||||
}
|
||||
|
||||
@@ -74,6 +74,16 @@ func (c *TelegramChannel) SetTranscriber(transcriber *voice.GroqTranscriber) {
|
||||
c.transcriber = transcriber
|
||||
}
|
||||
|
||||
func (c *TelegramChannel) HealthCheck(ctx context.Context) error {
|
||||
if !c.IsRunning() {
|
||||
return fmt.Errorf("telegram bot not running")
|
||||
}
|
||||
hCtx, cancel := withTelegramAPITimeout(ctx)
|
||||
defer cancel()
|
||||
_, err := c.bot.GetMe(hCtx)
|
||||
return err
|
||||
}
|
||||
|
||||
func (c *TelegramChannel) Start(ctx context.Context) error {
|
||||
if c.IsRunning() {
|
||||
return nil
|
||||
@@ -133,6 +143,8 @@ func (c *TelegramChannel) Start(ctx context.Context) error {
|
||||
}
|
||||
if update.Message != nil {
|
||||
c.dispatchHandleMessage(runCtx, update.Message)
|
||||
} else if update.CallbackQuery != nil {
|
||||
c.handleCallbackQuery(runCtx, update.CallbackQuery)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -170,10 +182,6 @@ func (c *TelegramChannel) Stop(ctx context.Context) error {
|
||||
return true
|
||||
})
|
||||
|
||||
// In telego v1.x, the long polling is stopped by canceling the context
|
||||
// passed to UpdatesViaLongPolling. We don't need a separate Stop call
|
||||
// if we use the parent context correctly.
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -203,6 +211,35 @@ func (c *TelegramChannel) dispatchHandleMessage(runCtx context.Context, message
|
||||
}(message)
|
||||
}
|
||||
|
||||
func (c *TelegramChannel) handleCallbackQuery(ctx context.Context, query *telego.CallbackQuery) {
|
||||
if query == nil || query.Message == nil {
|
||||
return
|
||||
}
|
||||
|
||||
senderID := fmt.Sprintf("%d", query.From.ID)
|
||||
chatID := fmt.Sprintf("%d", query.Message.GetChat().ID)
|
||||
|
||||
answerCtx, cancel := withTelegramAPITimeout(ctx)
|
||||
_ = c.bot.AnswerCallbackQuery(answerCtx, &telego.AnswerCallbackQueryParams{
|
||||
CallbackQueryID: query.ID,
|
||||
})
|
||||
cancel()
|
||||
|
||||
logger.InfoCF("telegram", "Callback query received", map[string]interface{}{
|
||||
"sender_id": senderID,
|
||||
"data": query.Data,
|
||||
})
|
||||
|
||||
if !c.IsAllowed(senderID) {
|
||||
return
|
||||
}
|
||||
|
||||
c.HandleMessage(senderID, chatID, query.Data, nil, map[string]string{
|
||||
"is_callback": "true",
|
||||
"callback_id": query.ID,
|
||||
})
|
||||
}
|
||||
|
||||
func (c *TelegramChannel) Send(ctx context.Context, msg bus.OutboundMessage) error {
|
||||
if !c.IsRunning() {
|
||||
return fmt.Errorf("telegram bot not running")
|
||||
@@ -214,57 +251,54 @@ func (c *TelegramChannel) Send(ctx context.Context, msg bus.OutboundMessage) err
|
||||
}
|
||||
chatID := telegoutil.ID(chatIDInt)
|
||||
|
||||
// Stop thinking animation first to avoid animation/update races.
|
||||
if stop, ok := c.stopThinking.LoadAndDelete(msg.ChatID); ok {
|
||||
logger.DebugCF("telegram", "Telegram thinking stop signal", map[string]interface{}{
|
||||
logger.FieldChatID: msg.ChatID,
|
||||
})
|
||||
safeCloseSignal(stop)
|
||||
} else {
|
||||
logger.DebugCF("telegram", "Telegram thinking stop skipped (not found)", map[string]interface{}{
|
||||
logger.FieldChatID: msg.ChatID,
|
||||
})
|
||||
}
|
||||
|
||||
htmlContent := sanitizeTelegramHTML(markdownToTelegramHTML(msg.Content))
|
||||
|
||||
// Try to edit placeholder
|
||||
if pID, ok := c.placeholders.Load(msg.ChatID); ok {
|
||||
// Always reset placeholder state even when edit/send fails.
|
||||
defer c.placeholders.Delete(msg.ChatID)
|
||||
logger.DebugCF("telegram", "Telegram editing thinking placeholder", map[string]interface{}{
|
||||
logger.FieldChatID: msg.ChatID,
|
||||
"message_id": pID.(int),
|
||||
})
|
||||
var markup *telego.InlineKeyboardMarkup
|
||||
if len(msg.Buttons) > 0 {
|
||||
var rows [][]telego.InlineKeyboardButton
|
||||
for _, row := range msg.Buttons {
|
||||
var buttons []telego.InlineKeyboardButton
|
||||
for _, btn := range row {
|
||||
buttons = append(buttons, telegoutil.InlineKeyboardButton(btn.Text).WithCallbackData(btn.Data))
|
||||
}
|
||||
rows = append(rows, buttons)
|
||||
}
|
||||
markup = telegoutil.InlineKeyboard(rows...)
|
||||
}
|
||||
|
||||
if pID, ok := c.placeholders.Load(msg.ChatID); ok {
|
||||
defer c.placeholders.Delete(msg.ChatID)
|
||||
editCtx, cancelEdit := withTelegramAPITimeout(ctx)
|
||||
_, err := c.bot.EditMessageText(editCtx, &telego.EditMessageTextParams{
|
||||
ChatID: chatID,
|
||||
MessageID: pID.(int),
|
||||
Text: htmlContent,
|
||||
ParseMode: telego.ModeHTML,
|
||||
})
|
||||
params := &telego.EditMessageTextParams{
|
||||
ChatID: chatID,
|
||||
MessageID: pID.(int),
|
||||
Text: htmlContent,
|
||||
ParseMode: telego.ModeHTML,
|
||||
ReplyMarkup: markup,
|
||||
}
|
||||
_, err := c.bot.EditMessageText(editCtx, params)
|
||||
cancelEdit()
|
||||
|
||||
if err == nil {
|
||||
logger.DebugCF("telegram", "Telegram placeholder updated", map[string]interface{}{
|
||||
logger.FieldChatID: msg.ChatID,
|
||||
})
|
||||
return nil
|
||||
}
|
||||
logger.WarnCF("telegram", "Telegram placeholder update failed; fallback to new message", map[string]interface{}{
|
||||
logger.WarnCF("telegram", "Placeholder update failed; fallback to new message", map[string]interface{}{
|
||||
logger.FieldChatID: msg.ChatID,
|
||||
logger.FieldError: err.Error(),
|
||||
})
|
||||
// Fallback to new message if edit fails
|
||||
} else {
|
||||
logger.DebugCF("telegram", "Telegram placeholder not found, sending new message", map[string]interface{}{
|
||||
logger.FieldChatID: msg.ChatID,
|
||||
})
|
||||
}
|
||||
|
||||
sendParams := telegoutil.Message(chatID, htmlContent).WithParseMode(telego.ModeHTML)
|
||||
if markup != nil {
|
||||
sendParams.WithReplyMarkup(markup)
|
||||
}
|
||||
|
||||
sendCtx, cancelSend := withTelegramAPITimeout(ctx)
|
||||
_, err = c.bot.SendMessage(sendCtx, telegoutil.Message(chatID, htmlContent).WithParseMode(telego.ModeHTML))
|
||||
_, err = c.bot.SendMessage(sendCtx, sendParams)
|
||||
cancelSend()
|
||||
|
||||
if err != nil {
|
||||
@@ -272,15 +306,13 @@ func (c *TelegramChannel) Send(ctx context.Context, msg bus.OutboundMessage) err
|
||||
logger.FieldError: err.Error(),
|
||||
})
|
||||
plain := plainTextFromTelegramHTML(htmlContent)
|
||||
sendPlainCtx, cancelSendPlain := withTelegramAPITimeout(ctx)
|
||||
_, err = c.bot.SendMessage(sendPlainCtx, telegoutil.Message(chatID, plain))
|
||||
cancelSendPlain()
|
||||
if err != nil {
|
||||
logger.ErrorCF("telegram", "Telegram plain-text fallback send failed", map[string]interface{}{
|
||||
logger.FieldChatID: msg.ChatID,
|
||||
logger.FieldError: err.Error(),
|
||||
})
|
||||
sendPlainParams := telegoutil.Message(chatID, plain)
|
||||
if markup != nil {
|
||||
sendPlainParams.WithReplyMarkup(markup)
|
||||
}
|
||||
sendPlainCtx, cancelSendPlain := withTelegramAPITimeout(ctx)
|
||||
_, err = c.bot.SendMessage(sendPlainCtx, sendPlainParams)
|
||||
cancelSendPlain()
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -402,7 +434,6 @@ func (c *TelegramChannel) handleMessage(runCtx context.Context, message *telego.
|
||||
return
|
||||
}
|
||||
|
||||
// Thinking indicator
|
||||
apiCtx, cancelAPI := context.WithTimeout(runCtx, telegramAPICallTimeout)
|
||||
_ = c.bot.SendChatAction(apiCtx, &telego.SendChatActionParams{
|
||||
ChatID: telegoutil.ID(chatID),
|
||||
@@ -412,13 +443,9 @@ func (c *TelegramChannel) handleMessage(runCtx context.Context, message *telego.
|
||||
|
||||
stopChan := make(chan struct{})
|
||||
if prev, ok := c.stopThinking.LoadAndDelete(fmt.Sprintf("%d", chatID)); ok {
|
||||
// Ensure previous animation loop exits before replacing channel.
|
||||
safeCloseSignal(prev)
|
||||
}
|
||||
c.stopThinking.Store(fmt.Sprintf("%d", chatID), stopChan)
|
||||
logger.DebugCF("telegram", "Telegram thinking started", map[string]interface{}{
|
||||
logger.FieldChatID: chatID,
|
||||
})
|
||||
|
||||
sendCtx, cancelSend := context.WithTimeout(runCtx, telegramAPICallTimeout)
|
||||
pMsg, err := c.bot.SendMessage(sendCtx, telegoutil.Message(telegoutil.ID(chatID), "Thinking... 💭"))
|
||||
@@ -426,10 +453,6 @@ func (c *TelegramChannel) handleMessage(runCtx context.Context, message *telego.
|
||||
if err == nil {
|
||||
pID := pMsg.MessageID
|
||||
c.placeholders.Store(fmt.Sprintf("%d", chatID), pID)
|
||||
logger.DebugCF("telegram", "Telegram thinking placeholder created", map[string]interface{}{
|
||||
logger.FieldChatID: chatID,
|
||||
"message_id": pID,
|
||||
})
|
||||
|
||||
go func(cid int64, mid int, stop <-chan struct{}, parentCtx context.Context) {
|
||||
dots := []string{".", "..", "..."}
|
||||
@@ -442,9 +465,6 @@ func (c *TelegramChannel) handleMessage(runCtx context.Context, message *telego.
|
||||
case <-parentCtx.Done():
|
||||
return
|
||||
case <-stop:
|
||||
logger.DebugCF("telegram", "Telegram thinking animation stopped", map[string]interface{}{
|
||||
logger.FieldChatID: cid,
|
||||
})
|
||||
return
|
||||
case <-ticker.C:
|
||||
i++
|
||||
@@ -457,20 +477,11 @@ func (c *TelegramChannel) handleMessage(runCtx context.Context, message *telego.
|
||||
})
|
||||
cancelEdit()
|
||||
if err != nil {
|
||||
logger.DebugCF("telegram", "Telegram thinking animation edit failed", map[string]interface{}{
|
||||
logger.FieldChatID: cid,
|
||||
"message_id": mid,
|
||||
logger.FieldError: err.Error(),
|
||||
})
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}(chatID, pID, stopChan, runCtx)
|
||||
} else {
|
||||
logger.WarnCF("telegram", "Telegram thinking placeholder create failed", map[string]interface{}{
|
||||
logger.FieldChatID: chatID,
|
||||
logger.FieldError: err.Error(),
|
||||
})
|
||||
}
|
||||
|
||||
metadata := map[string]string{
|
||||
@@ -489,9 +500,6 @@ func (c *TelegramChannel) downloadFile(runCtx context.Context, fileID, ext strin
|
||||
file, err := c.bot.GetFile(getFileCtx, &telego.GetFileParams{FileID: fileID})
|
||||
cancelGetFile()
|
||||
if err != nil {
|
||||
logger.WarnCF("telegram", "Failed to get file", map[string]interface{}{
|
||||
logger.FieldError: err.Error(),
|
||||
})
|
||||
return ""
|
||||
}
|
||||
|
||||
@@ -499,26 +507,12 @@ func (c *TelegramChannel) downloadFile(runCtx context.Context, fileID, ext strin
|
||||
return ""
|
||||
}
|
||||
|
||||
// In telego, we can use Link() or just build the URL
|
||||
url := fmt.Sprintf("https://api.telegram.org/file/bot%s/%s", c.config.Token, file.FilePath)
|
||||
logger.DebugCF("telegram", "Telegram file URL resolved", map[string]interface{}{
|
||||
"url": url,
|
||||
})
|
||||
|
||||
mediaDir := filepath.Join(os.TempDir(), "clawgo_media")
|
||||
if err := os.MkdirAll(mediaDir, 0755); err != nil {
|
||||
logger.WarnCF("telegram", "Failed to create media directory", map[string]interface{}{
|
||||
logger.FieldError: err.Error(),
|
||||
})
|
||||
return ""
|
||||
}
|
||||
|
||||
_ = os.MkdirAll(mediaDir, 0755)
|
||||
localPath := filepath.Join(mediaDir, fileID[:min(16, len(fileID))]+ext)
|
||||
|
||||
if err := c.downloadFromURL(runCtx, url, localPath); err != nil {
|
||||
logger.WarnCF("telegram", "Failed to download file", map[string]interface{}{
|
||||
logger.FieldError: err.Error(),
|
||||
})
|
||||
return ""
|
||||
}
|
||||
|
||||
@@ -538,35 +532,28 @@ func (c *TelegramChannel) downloadFromURL(runCtx context.Context, url, localPath
|
||||
|
||||
req, err := http.NewRequestWithContext(downloadCtx, http.MethodGet, url, nil)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to create request: %w", err)
|
||||
return err
|
||||
}
|
||||
|
||||
client := &http.Client{Timeout: telegramDownloadTimeout}
|
||||
resp, err := client.Do(req)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to download: %w", err)
|
||||
return err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
return fmt.Errorf("download failed with status: %d", resp.StatusCode)
|
||||
return fmt.Errorf("status: %d", resp.StatusCode)
|
||||
}
|
||||
|
||||
out, err := os.Create(localPath)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to create file: %w", err)
|
||||
return err
|
||||
}
|
||||
defer out.Close()
|
||||
|
||||
_, err = io.Copy(out, resp.Body)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to write file: %w", err)
|
||||
}
|
||||
|
||||
logger.DebugCF("telegram", "File downloaded successfully", map[string]interface{}{
|
||||
"path": localPath,
|
||||
})
|
||||
return nil
|
||||
return err
|
||||
}
|
||||
|
||||
func parseChatID(chatIDStr string) (int64, error) {
|
||||
@@ -588,23 +575,16 @@ func markdownToTelegramHTML(text string) string {
|
||||
|
||||
text = escapeHTML(text)
|
||||
|
||||
text = regexp.MustCompile(`(?m)^#{1,6}\s+(.+)$`).ReplaceAllString(text, "<b>$1</b>")
|
||||
|
||||
text = regexp.MustCompile(`(?m)^>\s*(.*)$`).ReplaceAllString(text, "│ $1")
|
||||
|
||||
text = regexp.MustCompile(`\[([^\]]+)\]\(([^)]+)\)`).ReplaceAllString(text, `<a href="$2">$1</a>`)
|
||||
|
||||
text = regexp.MustCompile(`\*\*(.+?)\*\*`).ReplaceAllString(text, "<b>$1</b>")
|
||||
|
||||
text = regexp.MustCompile(`__(.+?)__`).ReplaceAllString(text, "<b>$1</b>")
|
||||
|
||||
text = regexp.MustCompile(`\*([^*\n]+)\*`).ReplaceAllString(text, "<i>$1</i>")
|
||||
text = regexp.MustCompile(`_([^_\n]+)_`).ReplaceAllString(text, "<i>$1</i>")
|
||||
|
||||
text = regexp.MustCompile(`~~(.+?)~~`).ReplaceAllString(text, "<s>$1</s>")
|
||||
|
||||
text = regexp.MustCompile(`(?m)^[-*]\s+`).ReplaceAllString(text, "• ")
|
||||
text = regexp.MustCompile(`(?m)^\d+\.\s+`).ReplaceAllString(text, "• ")
|
||||
text = regexp.MustCompile("(?m)^#{1,6}\\s+(.+)$").ReplaceAllString(text, "<b>$1</b>")
|
||||
text = regexp.MustCompile("(?m)^>\\s*(.*)$").ReplaceAllString(text, "│ $1")
|
||||
text = regexp.MustCompile("\\[([^\\]]+)\\]\\(([^)]+)\\)").ReplaceAllString(text, `<a href="$2">$1</a>`)
|
||||
text = regexp.MustCompile("\\*\\*(.+?)\\*\\*").ReplaceAllString(text, "<b>$1</b>")
|
||||
text = regexp.MustCompile("__(.+?)__").ReplaceAllString(text, "<b>$1</b>")
|
||||
text = regexp.MustCompile("\\*([^*\\n]+)\\*").ReplaceAllString(text, "<i>$1</i>")
|
||||
text = regexp.MustCompile("_([^_\\n]+)_").ReplaceAllString(text, "<i>$1</i>")
|
||||
text = regexp.MustCompile("~~(.+?)~~").ReplaceAllString(text, "<s>$1</s>")
|
||||
text = regexp.MustCompile("(?m)^[-*]\\s+").ReplaceAllString(text, "• ")
|
||||
text = regexp.MustCompile("(?m)^\\d+\\.\\s+").ReplaceAllString(text, "• ")
|
||||
|
||||
for i, code := range inlineCodes.codes {
|
||||
escaped := escapeHTML(code)
|
||||
@@ -693,8 +673,8 @@ func sanitizeTelegramHTML(input string) string {
|
||||
return ""
|
||||
}
|
||||
|
||||
tagRe := regexp.MustCompile(`(?is)<\s*(/?)\s*([a-z0-9]+)([^>]*)>`)
|
||||
hrefRe := regexp.MustCompile(`(?is)\bhref\s*=\s*"([^"]+)"`)
|
||||
tagRe := regexp.MustCompile("(?is)<\\s*(/?)\\s*([a-z0-9]+)([^>]*)>")
|
||||
hrefRe := regexp.MustCompile("(?is)\\bhref\\s*=\\s*\"([^\"]+)\"")
|
||||
|
||||
var out strings.Builder
|
||||
stack := make([]string, 0, 16)
|
||||
@@ -719,7 +699,6 @@ func sanitizeTelegramHTML(input string) string {
|
||||
}
|
||||
|
||||
if isClose {
|
||||
// Ensure tag stack remains balanced; drop unmatched close tags.
|
||||
found := -1
|
||||
for i := len(stack) - 1; i >= 0; i-- {
|
||||
if stack[i] == tagName {
|
||||
@@ -739,11 +718,9 @@ func sanitizeTelegramHTML(input string) string {
|
||||
continue
|
||||
}
|
||||
|
||||
// Normalize opening tags; only <a href="..."> can carry attributes.
|
||||
if tagName == "a" {
|
||||
hrefMatch := hrefRe.FindStringSubmatch(attrRaw)
|
||||
if len(hrefMatch) < 2 {
|
||||
// Invalid anchor tag -> degrade to escaped text to avoid parse errors.
|
||||
out.WriteString("<a>")
|
||||
pos = end
|
||||
continue
|
||||
@@ -783,8 +760,7 @@ func escapeHTMLAttr(text string) string {
|
||||
}
|
||||
|
||||
func plainTextFromTelegramHTML(text string) string {
|
||||
// Best-effort fallback for parse failures: drop tags and keep readable content.
|
||||
tagRe := regexp.MustCompile(`(?is)<[^>]+>`)
|
||||
tagRe := regexp.MustCompile("(?is)<[^>]+>")
|
||||
plain := tagRe.ReplaceAllString(text, "")
|
||||
plain = strings.ReplaceAll(plain, "<", "<")
|
||||
plain = strings.ReplaceAll(plain, ">", ">")
|
||||
|
||||
@@ -1,12 +1,14 @@
|
||||
package sentinel
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"clawgo/pkg/channels"
|
||||
"clawgo/pkg/config"
|
||||
"clawgo/pkg/lifecycle"
|
||||
"clawgo/pkg/logger"
|
||||
@@ -23,6 +25,8 @@ type Service struct {
|
||||
runner *lifecycle.LoopRunner
|
||||
mu sync.RWMutex
|
||||
lastAlerts map[string]time.Time
|
||||
mgr *channels.Manager
|
||||
healingChannels map[string]bool
|
||||
}
|
||||
|
||||
func NewService(cfgPath, workspace string, intervalSec int, autoHeal bool, onAlert AlertFunc) *Service {
|
||||
@@ -37,9 +41,14 @@ func NewService(cfgPath, workspace string, intervalSec int, autoHeal bool, onAle
|
||||
onAlert: onAlert,
|
||||
runner: lifecycle.NewLoopRunner(),
|
||||
lastAlerts: map[string]time.Time{},
|
||||
healingChannels: map[string]bool{},
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Service) SetManager(mgr *channels.Manager) {
|
||||
s.mgr = mgr
|
||||
}
|
||||
|
||||
func (s *Service) Start() {
|
||||
if !s.runner.Start(s.loop) {
|
||||
return
|
||||
@@ -76,6 +85,7 @@ func (s *Service) runChecks() {
|
||||
issues := s.checkConfig()
|
||||
issues = append(issues, s.checkMemory()...)
|
||||
issues = append(issues, s.checkLogs()...)
|
||||
issues = append(issues, s.checkChannels()...)
|
||||
|
||||
if len(issues) == 0 {
|
||||
return
|
||||
@@ -86,6 +96,48 @@ func (s *Service) runChecks() {
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Service) checkChannels() []string {
|
||||
if s.mgr == nil {
|
||||
return nil
|
||||
}
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||
defer cancel()
|
||||
|
||||
health := s.mgr.CheckHealth(ctx)
|
||||
var issues []string
|
||||
for name, err := range health {
|
||||
if err != nil {
|
||||
msg := fmt.Sprintf("sentinel: channel %s health check failed: %v", name, err)
|
||||
issues = append(issues, msg)
|
||||
if s.autoHeal {
|
||||
s.mu.Lock()
|
||||
if s.healingChannels[name] {
|
||||
s.mu.Unlock()
|
||||
continue
|
||||
}
|
||||
s.healingChannels[name] = true
|
||||
s.mu.Unlock()
|
||||
|
||||
go func(n string) {
|
||||
defer func() {
|
||||
s.mu.Lock()
|
||||
delete(s.healingChannels, n)
|
||||
s.mu.Unlock()
|
||||
}()
|
||||
logger.InfoCF("sentinel", "Attempting auto-heal for channel", map[string]interface{}{"channel": n})
|
||||
// Use a fresh context for restart to avoid being canceled by sentinel loop
|
||||
if rErr := s.mgr.RestartChannel(context.Background(), n); rErr != nil {
|
||||
logger.ErrorCF("sentinel", "Auto-heal restart failed", map[string]interface{}{"channel": n, "error": rErr.Error()})
|
||||
} else {
|
||||
logger.InfoCF("sentinel", "Auto-heal successful", map[string]interface{}{"channel": n})
|
||||
}
|
||||
}(name)
|
||||
}
|
||||
}
|
||||
}
|
||||
return issues
|
||||
}
|
||||
|
||||
func (s *Service) checkConfig() []string {
|
||||
_, err := os.Stat(s.cfgPath)
|
||||
if err != nil {
|
||||
|
||||
@@ -29,7 +29,7 @@ func (s *Server) Start() error {
|
||||
addr := fmt.Sprintf("%s:%d", s.config.Gateway.Host, s.config.Gateway.Port)
|
||||
s.server = &http.Server{
|
||||
Addr: addr,
|
||||
Handler: mux,
|
||||
Handler: s.withCORS(mux),
|
||||
}
|
||||
|
||||
logger.InfoCF("server", "Starting HTTP server", map[string]interface{}{
|
||||
@@ -67,3 +67,19 @@ func (s *Server) handleRoot(w http.ResponseWriter, r *http.Request) {
|
||||
w.WriteHeader(http.StatusOK)
|
||||
fmt.Fprintf(w, "ClawGo Gateway Running\nTime: %s", time.Now().Format(time.RFC3339))
|
||||
}
|
||||
|
||||
func (s *Server) withCORS(next http.Handler) http.Handler {
|
||||
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
w.Header().Set("Access-Control-Allow-Origin", "*")
|
||||
w.Header().Set("Access-Control-Allow-Methods", "GET, POST, PUT, PATCH, DELETE, OPTIONS")
|
||||
w.Header().Set("Access-Control-Allow-Headers", "Content-Type, Authorization, X-Requested-With")
|
||||
w.Header().Set("Access-Control-Max-Age", "86400")
|
||||
|
||||
if r.Method == http.MethodOptions {
|
||||
w.WriteHeader(http.StatusNoContent)
|
||||
return
|
||||
}
|
||||
|
||||
next.ServeHTTP(w, r)
|
||||
})
|
||||
}
|
||||
|
||||
@@ -3,9 +3,10 @@ package tools
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"clawgo/pkg/bus"
|
||||
)
|
||||
|
||||
type SendCallback func(channel, chatID, content string) error
|
||||
type SendCallback func(channel, chatID, content string, buttons [][]bus.Button) error
|
||||
|
||||
type MessageTool struct {
|
||||
sendCallback SendCallback
|
||||
@@ -41,6 +42,21 @@ func (t *MessageTool) Parameters() map[string]interface{} {
|
||||
"type": "string",
|
||||
"description": "Optional: target chat/user ID",
|
||||
},
|
||||
"buttons": map[string]interface{}{
|
||||
"type": "array",
|
||||
"description": "Optional: buttons to include in the message (2D array: rows of buttons)",
|
||||
"items": map[string]interface{}{
|
||||
"type": "array",
|
||||
"items": map[string]interface{}{
|
||||
"type": "object",
|
||||
"properties": map[string]interface{}{
|
||||
"text": map[string]interface{}{"type": "string", "description": "Button text"},
|
||||
"data": map[string]interface{}{"type": "string", "description": "Callback data"},
|
||||
},
|
||||
"required": []string{"text", "data"},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
"required": []string{"content"},
|
||||
}
|
||||
@@ -79,7 +95,28 @@ func (t *MessageTool) Execute(ctx context.Context, args map[string]interface{})
|
||||
return "Error: Message sending not configured", nil
|
||||
}
|
||||
|
||||
if err := t.sendCallback(channel, chatID, content); err != nil {
|
||||
var buttons [][]bus.Button
|
||||
if btns, ok := args["buttons"].([]interface{}); ok {
|
||||
for _, row := range btns {
|
||||
if rowArr, ok := row.([]interface{}); ok {
|
||||
var buttonRow []bus.Button
|
||||
for _, b := range rowArr {
|
||||
if bMap, ok := b.(map[string]interface{}); ok {
|
||||
text, _ := bMap["text"].(string)
|
||||
data, _ := bMap["data"].(string)
|
||||
if text != "" && data != "" {
|
||||
buttonRow = append(buttonRow, bus.Button{Text: text, Data: data})
|
||||
}
|
||||
}
|
||||
}
|
||||
if len(buttonRow) > 0 {
|
||||
buttons = append(buttons, buttonRow)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if err := t.sendCallback(channel, chatID, content, buttons); err != nil {
|
||||
return fmt.Sprintf("Error sending message: %v", err), nil
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user