diff --git a/cmd/clawgo/main.go b/cmd/clawgo/main.go index 5026c0d..0a16b9f 100644 --- a/cmd/clawgo/main.go +++ b/cmd/clawgo/main.go @@ -714,6 +714,7 @@ func gatewayCmd() { fmt.Printf("Error initializing gateway runtime: %v\n", err) os.Exit(1) } + sentinelService.SetManager(channelManager) pidFile := filepath.Join(filepath.Dir(getConfigPath()), "gateway.pid") if err := os.WriteFile(pidFile, []byte(fmt.Sprintf("%d\n", os.Getpid())), 0644); err != nil { @@ -797,6 +798,7 @@ func gatewayCmd() { }, ) if newCfg.Sentinel.Enabled { + sentinelService.SetManager(channelManager) sentinelService.Start() } cfg = newCfg @@ -835,6 +837,7 @@ func gatewayCmd() { if newCfg.Sentinel.Enabled { sentinelService.Start() } + sentinelService.SetManager(channelManager) if err := channelManager.StartAll(ctx); err != nil { fmt.Printf("✗ Reload failed (start channels): %v\n", err) diff --git a/pkg/agent/loop.go b/pkg/agent/loop.go index 58f1363..15ce279 100644 --- a/pkg/agent/loop.go +++ b/pkg/agent/loop.go @@ -82,8 +82,9 @@ func NewAgentLoop(cfg *config.Config, msgBus *bus.MessageBus, provider providers // Register message tool messageTool := tools.NewMessageTool() - messageTool.SetSendCallback(func(channel, chatID, content string) error { + messageTool.SetSendCallback(func(channel, chatID, content string, buttons [][]bus.Button) error { msgBus.PublishOutbound(bus.OutboundMessage{ + Buttons: buttons, Channel: channel, ChatID: chatID, Content: content, @@ -210,6 +211,7 @@ func (al *AgentLoop) enqueueMessage(ctx context.Context, msg bus.InboundMessage) case <-ctx.Done(): case <-time.After(2 * time.Second): al.bus.PublishOutbound(bus.OutboundMessage{ + Buttons: buttons, Channel: msg.Channel, ChatID: msg.ChatID, Content: "Message queue is busy. Please try again shortly.", @@ -275,6 +277,7 @@ func (al *AgentLoop) runSessionWorker(ctx context.Context, sessionKey string, wo if response != "" { al.bus.PublishOutbound(bus.OutboundMessage{ + Buttons: buttons, Channel: msg.Channel, ChatID: msg.ChatID, Content: response, diff --git a/pkg/sentinel/service.go b/pkg/sentinel/service.go index e438e04..ec342bf 100644 --- a/pkg/sentinel/service.go +++ b/pkg/sentinel/service.go @@ -25,7 +25,8 @@ type Service struct { runner *lifecycle.LoopRunner mu sync.RWMutex lastAlerts map[string]time.Time - mgr *channels.Manager + mgr *channels.Manager + healingChannels map[string]bool } func NewService(cfgPath, workspace string, intervalSec int, autoHeal bool, onAlert AlertFunc) *Service { @@ -40,6 +41,7 @@ func NewService(cfgPath, workspace string, intervalSec int, autoHeal bool, onAle onAlert: onAlert, runner: lifecycle.NewLoopRunner(), lastAlerts: map[string]time.Time{}, + healingChannels: map[string]bool{}, } } @@ -108,8 +110,22 @@ func (s *Service) checkChannels() []string { msg := fmt.Sprintf("sentinel: channel %s health check failed: %v", name, err) issues = append(issues, msg) if s.autoHeal { + s.mu.Lock() + if s.healingChannels[name] { + s.mu.Unlock() + continue + } + s.healingChannels[name] = true + s.mu.Unlock() + go func(n string) { + defer func() { + s.mu.Lock() + delete(s.healingChannels, n) + s.mu.Unlock() + }() logger.InfoCF("sentinel", "Attempting auto-heal for channel", map[string]interface{}{"channel": n}) + // Use a fresh context for restart to avoid being canceled by sentinel loop if rErr := s.mgr.RestartChannel(context.Background(), n); rErr != nil { logger.ErrorCF("sentinel", "Auto-heal restart failed", map[string]interface{}{"channel": n, "error": rErr.Error()}) } else { diff --git a/pkg/tools/message.go b/pkg/tools/message.go index e090234..6fdaffd 100644 --- a/pkg/tools/message.go +++ b/pkg/tools/message.go @@ -3,9 +3,10 @@ package tools import ( "context" "fmt" + "clawgo/pkg/bus" ) -type SendCallback func(channel, chatID, content string) error +type SendCallback func(channel, chatID, content string, buttons [][]bus.Button) error type MessageTool struct { sendCallback SendCallback @@ -41,6 +42,21 @@ func (t *MessageTool) Parameters() map[string]interface{} { "type": "string", "description": "Optional: target chat/user ID", }, + "buttons": map[string]interface{}{ + "type": "array", + "description": "Optional: buttons to include in the message (2D array: rows of buttons)", + "items": map[string]interface{}{ + "type": "array", + "items": map[string]interface{}{ + "type": "object", + "properties": map[string]interface{}{ + "text": map[string]interface{}{"type": "string", "description": "Button text"}, + "data": map[string]interface{}{"type": "string", "description": "Callback data"}, + }, + "required": []string{"text", "data"}, + }, + }, + }, }, "required": []string{"content"}, } @@ -79,7 +95,28 @@ func (t *MessageTool) Execute(ctx context.Context, args map[string]interface{}) return "Error: Message sending not configured", nil } - if err := t.sendCallback(channel, chatID, content); err != nil { + var buttons [][]bus.Button + if btns, ok := args["buttons"].([]interface{}); ok { + for _, row := range btns { + if rowArr, ok := row.([]interface{}); ok { + var buttonRow []bus.Button + for _, b := range rowArr { + if bMap, ok := b.(map[string]interface{}); ok { + text, _ := bMap["text"].(string) + data, _ := bMap["data"].(string) + if text != "" && data != "" { + buttonRow = append(buttonRow, bus.Button{Text: text, Data: data}) + } + } + } + if len(buttonRow) > 0 { + buttons = append(buttons, buttonRow) + } + } + } + } + + if err := t.sendCallback(channel, chatID, content, buttons); err != nil { return fmt.Sprintf("Error sending message: %v", err), nil }