diff --git a/pkg/agent/loop.go b/pkg/agent/loop.go index dcdbb5c..d1d2e51 100644 --- a/pkg/agent/loop.go +++ b/pkg/agent/loop.go @@ -101,12 +101,15 @@ func NewAgentLoop(cfg *config.Config, msgBus *bus.MessageBus, provider providers // Register message tool messageTool := tools.NewMessageTool() - messageTool.SetSendCallback(func(channel, chatID, content string, buttons [][]bus.Button) error { + messageTool.SetSendCallback(func(channel, chatID, action, content, messageID, emoji string, buttons [][]bus.Button) error { msgBus.PublishOutbound(bus.OutboundMessage{ - Channel: channel, - ChatID: chatID, - Content: content, - Buttons: buttons, + Channel: channel, + ChatID: chatID, + Content: content, + Buttons: buttons, + Action: action, + MessageID: messageID, + Emoji: emoji, }) return nil }) diff --git a/pkg/bus/types.go b/pkg/bus/types.go index bad831a..592f591 100644 --- a/pkg/bus/types.go +++ b/pkg/bus/types.go @@ -21,6 +21,9 @@ type OutboundMessage struct { Content string `json:"content"` ReplyToID string `json:"reply_to_id,omitempty"` Buttons [][]Button `json:"buttons,omitempty"` + Action string `json:"action,omitempty"` + MessageID string `json:"message_id,omitempty"` + Emoji string `json:"emoji,omitempty"` } type MessageHandler func(InboundMessage) error diff --git a/pkg/channels/telegram.go b/pkg/channels/telegram.go index 1807870..4b25222 100644 --- a/pkg/channels/telegram.go +++ b/pkg/channels/telegram.go @@ -248,6 +248,14 @@ func (c *TelegramChannel) Send(ctx context.Context, msg bus.OutboundMessage) err safeCloseSignal(stop) } + action := strings.ToLower(strings.TrimSpace(msg.Action)) + if action == "" { + action = "send" + } + if action != "send" { + return c.handleAction(ctx, chatIDInt, action, msg) + } + htmlContent := sanitizeTelegramHTML(markdownToTelegramHTML(msg.Content)) var markup *telego.InlineKeyboardMarkup @@ -507,6 +515,33 @@ func parseChatID(chatIDStr string) (int64, error) { return id, err } +func (c *TelegramChannel) handleAction(ctx context.Context, chatID int64, action string, msg bus.OutboundMessage) error { + messageID, ok := parseTelegramMessageID(msg.MessageID) + if !ok && action != "send" { + return fmt.Errorf("message_id required for action=%s", action) + } + switch action { + case "edit": + htmlContent := sanitizeTelegramHTML(markdownToTelegramHTML(msg.Content)) + editCtx, cancel := withTelegramAPITimeout(ctx) + defer cancel() + _, err := c.bot.EditMessageText(editCtx, &telego.EditMessageTextParams{ChatID: telegoutil.ID(chatID), MessageID: messageID, Text: htmlContent, ParseMode: telego.ModeHTML}) + return err + case "delete": + delCtx, cancel := withTelegramAPITimeout(ctx) + defer cancel() + return c.bot.DeleteMessage(delCtx, &telego.DeleteMessageParams{ChatID: telegoutil.ID(chatID), MessageID: messageID}) + case "react": + emoji := strings.TrimSpace(msg.Emoji) + if emoji == "" { + return fmt.Errorf("emoji required for react action") + } + return fmt.Errorf("telegram react action not supported by current telego version") + default: + return fmt.Errorf("unsupported telegram action: %s", action) + } +} + func parseTelegramMessageID(raw string) (int, bool) { raw = strings.TrimSpace(raw) if raw == "" { diff --git a/pkg/tools/message.go b/pkg/tools/message.go index a5a585e..0f5c003 100644 --- a/pkg/tools/message.go +++ b/pkg/tools/message.go @@ -6,7 +6,7 @@ import ( "clawgo/pkg/bus" ) -type SendCallback func(channel, chatID, content string, buttons [][]bus.Button) error +type SendCallback func(channel, chatID, action, content, messageID, emoji string, buttons [][]bus.Button) error type MessageTool struct { sendCallback SendCallback @@ -32,7 +32,7 @@ func (t *MessageTool) Parameters() map[string]interface{} { "properties": map[string]interface{}{ "action": map[string]interface{}{ "type": "string", - "description": "Action type: send (supported), edit/delete/react (reserved)", + "description": "Action type: send|edit|delete|react", }, "message": map[string]interface{}{ "type": "string", @@ -54,6 +54,14 @@ func (t *MessageTool) Parameters() map[string]interface{} { "type": "string", "description": "Optional: target chat/user ID", }, + "message_id": map[string]interface{}{ + "type": "string", + "description": "Target message id for edit/delete/react", + }, + "emoji": map[string]interface{}{ + "type": "string", + "description": "Emoji for react action", + }, "buttons": map[string]interface{}{ "type": "array", "description": "Optional: buttons to include in the message (2D array: rows of buttons)", @@ -88,16 +96,32 @@ func (t *MessageTool) Execute(ctx context.Context, args map[string]interface{}) if action == "" { action = "send" } - if action != "send" { - return fmt.Sprintf("Unsupported action: %s (currently only send is implemented)", action), nil - } - content, _ := args["content"].(string) if msg, _ := args["message"].(string); msg != "" { content = msg } - if content == "" { - return "", fmt.Errorf("message/content is required for action=send") + messageID, _ := args["message_id"].(string) + emoji, _ := args["emoji"].(string) + + switch action { + case "send": + if content == "" { + return "", fmt.Errorf("message/content is required for action=send") + } + case "edit": + if messageID == "" || content == "" { + return "", fmt.Errorf("message_id and message/content are required for action=edit") + } + case "delete": + if messageID == "" { + return "", fmt.Errorf("message_id is required for action=delete") + } + case "react": + if messageID == "" || emoji == "" { + return "", fmt.Errorf("message_id and emoji are required for action=react") + } + default: + return fmt.Sprintf("Unsupported action: %s", action), nil } channel, _ := args["channel"].(string) @@ -142,9 +166,9 @@ func (t *MessageTool) Execute(ctx context.Context, args map[string]interface{}) } } - if err := t.sendCallback(channel, chatID, content, buttons); err != nil { + if err := t.sendCallback(channel, chatID, action, content, messageID, emoji, buttons); err != nil { return fmt.Sprintf("Error sending message: %v", err), nil } - return fmt.Sprintf("Message sent to %s:%s", channel, chatID), nil + return fmt.Sprintf("Message action=%s sent to %s:%s", action, channel, chatID), nil } diff --git a/pkg/tools/process_manager.go b/pkg/tools/process_manager.go index 594e72e..8b4114f 100644 --- a/pkg/tools/process_manager.go +++ b/pkg/tools/process_manager.go @@ -25,6 +25,7 @@ type processSession struct { done chan struct{} mu sync.RWMutex log bytes.Buffer + logPath string } type ProcessManager struct { @@ -60,6 +61,9 @@ func (m *ProcessManager) Start(command, cwd string) (string, error) { return "", err } s := &processSession{ID: id, Command: command, StartedAt: time.Now().UTC(), cmd: cmd, done: make(chan struct{})} + if m.metaPath != "" { + s.logPath = filepath.Join(filepath.Dir(m.metaPath), "process-"+id+".log") + } m.mu.Lock() m.sessions[id] = s @@ -102,7 +106,15 @@ func (m *ProcessManager) capture(s *processSession, r interface{ Read([]byte) (i n, err := r.Read(buf) if n > 0 { s.mu.Lock() - _, _ = s.log.Write(buf[:n]) + chunk := buf[:n] + _, _ = s.log.Write(chunk) + if s.logPath != "" { + f, err := os.OpenFile(s.logPath, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0644) + if err == nil { + _, _ = f.Write(chunk) + _ = f.Close() + } + } s.mu.Unlock() } if err != nil { @@ -148,6 +160,11 @@ func (m *ProcessManager) Log(id string, offset, limit int) (string, error) { s.mu.RLock() defer s.mu.RUnlock() b := s.log.Bytes() + if len(b) == 0 && s.logPath != "" { + if data, err := os.ReadFile(s.logPath); err == nil { + b = data + } + } if offset < 0 { offset = 0 } @@ -188,6 +205,7 @@ type processSessionMeta struct { EndedAt string `json:"ended_at,omitempty"` ExitCode *int `json:"exit_code,omitempty"` Recovered bool `json:"recovered"` + LogPath string `json:"log_path,omitempty"` } func (m *ProcessManager) persist() { @@ -203,6 +221,7 @@ func (m *ProcessManager) persist() { Command: s.Command, StartedAt: s.StartedAt.Format(time.RFC3339), Recovered: s.cmd == nil, + LogPath: s.logPath, } if !s.EndedAt.IsZero() { row.EndedAt = s.EndedAt.Format(time.RFC3339) @@ -236,7 +255,10 @@ func (m *ProcessManager) load() { } maxSeq := uint64(0) for _, it := range items { - s := &processSession{ID: it.ID, Command: it.Command, done: make(chan struct{})} + s := &processSession{ID: it.ID, Command: it.Command, done: make(chan struct{}), logPath: it.LogPath} + if s.logPath == "" && m.metaPath != "" { + s.logPath = filepath.Join(filepath.Dir(m.metaPath), "process-"+s.ID+".log") + } if t, err := time.Parse(time.RFC3339, it.StartedAt); err == nil { s.StartedAt = t }