Ensure cron reminders deliver to explicit targets

This commit is contained in:
野生派Coder~
2026-03-03 00:10:20 +08:00
parent d1c277d3c0
commit 8b97064255
5 changed files with 151 additions and 41 deletions

View File

@@ -70,42 +70,7 @@ func gatewayCmd() {
msgBus := bus.NewMessageBus()
cronStorePath := filepath.Join(filepath.Dir(getConfigPath()), "cron", "jobs.json")
cronService := cron.NewCronService(cronStorePath, func(job *cron.CronJob) (string, error) {
if job == nil {
return "", nil
}
targetChannel := strings.TrimSpace(job.Payload.Channel)
targetChatID := strings.TrimSpace(job.Payload.To)
message := strings.TrimSpace(job.Payload.Message)
if job.Payload.Deliver && targetChannel != "" && targetChatID != "" && message != "" {
msgBus.PublishOutbound(bus.OutboundMessage{
Channel: targetChannel,
ChatID: targetChatID,
Content: message,
})
return "delivered", nil
}
if message == "" {
return "", nil
}
if targetChannel == "" || targetChatID == "" {
targetChannel = "internal"
targetChatID = "cron"
}
msgBus.PublishInbound(bus.InboundMessage{
Channel: "system",
SenderID: "cron",
ChatID: fmt.Sprintf("%s:%s", targetChannel, targetChatID),
Content: message,
SessionKey: fmt.Sprintf("cron:%s", job.ID),
Metadata: map[string]string{
"trigger": "cron",
"job_id": job.ID,
},
})
return "scheduled", nil
return dispatchCronJob(msgBus, job), nil
})
configureCronServiceRuntime(cronService, cfg)
heartbeatService := buildHeartbeatService(cfg, msgBus)
@@ -904,6 +869,56 @@ func buildGatewayRuntime(ctx context.Context, cfg *config.Config, msgBus *bus.Me
return agentLoop, channelManager, nil
}
func normalizeCronTargetChatID(channel, chatID string) string {
ch := strings.ToLower(strings.TrimSpace(channel))
target := strings.TrimSpace(chatID)
if ch == "" || target == "" {
return target
}
prefix := ch + ":"
if strings.HasPrefix(strings.ToLower(target), prefix) {
return strings.TrimSpace(target[len(prefix):])
}
return target
}
func dispatchCronJob(msgBus *bus.MessageBus, job *cron.CronJob) string {
if job == nil {
return ""
}
message := strings.TrimSpace(job.Payload.Message)
if message == "" {
return ""
}
targetChannel := strings.TrimSpace(job.Payload.Channel)
targetChatID := normalizeCronTargetChatID(targetChannel, job.Payload.To)
if targetChannel != "" && targetChatID != "" {
msgBus.PublishOutbound(bus.OutboundMessage{
Channel: targetChannel,
ChatID: targetChatID,
Content: message,
})
if job.Payload.Deliver {
return "delivered"
}
return "delivered_targeted"
}
msgBus.PublishInbound(bus.InboundMessage{
Channel: "system",
SenderID: "cron",
ChatID: "internal:cron",
Content: message,
SessionKey: fmt.Sprintf("cron:%s", job.ID),
Metadata: map[string]string{
"trigger": "cron",
"job_id": job.ID,
},
})
return "scheduled"
}
func configureCronServiceRuntime(cs *cron.CronService, cfg *config.Config) {
if cs == nil || cfg == nil {
return

View File

@@ -0,0 +1,68 @@
package main
import (
"context"
"testing"
"time"
"clawgo/pkg/bus"
"clawgo/pkg/cron"
)
func TestNormalizeCronTargetChatID(t *testing.T) {
if got := normalizeCronTargetChatID("telegram", "telegram:12345"); got != "12345" {
t.Fatalf("expected 12345, got %q", got)
}
if got := normalizeCronTargetChatID("telegram", "12345"); got != "12345" {
t.Fatalf("expected unchanged chat id, got %q", got)
}
}
func TestDispatchCronJob_DeliversTargetedMessageEvenWhenDeliverFalse(t *testing.T) {
mb := bus.NewMessageBus()
defer mb.Close()
status := dispatchCronJob(mb, &cron.CronJob{
ID: "job-1",
Payload: cron.CronPayload{
Message: "time to sleep",
Deliver: false,
Channel: "telegram",
To: "telegram:5988738763",
},
})
if status != "delivered_targeted" {
t.Fatalf("unexpected status: %s", status)
}
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
out, ok := mb.SubscribeOutbound(ctx)
if !ok {
t.Fatal("expected outbound message")
}
if out.Channel != "telegram" || out.ChatID != "5988738763" || out.Content != "time to sleep" {
t.Fatalf("unexpected outbound: %#v", out)
}
}
func TestDispatchCronJob_FallsBackToSystemInboundWithoutTarget(t *testing.T) {
mb := bus.NewMessageBus()
defer mb.Close()
status := dispatchCronJob(mb, &cron.CronJob{ID: "job-2", Payload: cron.CronPayload{Message: "tick"}})
if status != "scheduled" {
t.Fatalf("unexpected status: %s", status)
}
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
in, ok := mb.ConsumeInbound(ctx)
if !ok {
t.Fatal("expected inbound message")
}
if in.Channel != "system" || in.ChatID != "internal:cron" {
t.Fatalf("unexpected inbound: %#v", in)
}
}