diff --git a/cmd/clawgo/cmd_gateway.go b/cmd/clawgo/cmd_gateway.go index 3076b28..f316d4f 100644 --- a/cmd/clawgo/cmd_gateway.go +++ b/cmd/clawgo/cmd_gateway.go @@ -70,42 +70,7 @@ func gatewayCmd() { msgBus := bus.NewMessageBus() cronStorePath := filepath.Join(filepath.Dir(getConfigPath()), "cron", "jobs.json") cronService := cron.NewCronService(cronStorePath, func(job *cron.CronJob) (string, error) { - if job == nil { - return "", nil - } - - targetChannel := strings.TrimSpace(job.Payload.Channel) - targetChatID := strings.TrimSpace(job.Payload.To) - message := strings.TrimSpace(job.Payload.Message) - - if job.Payload.Deliver && targetChannel != "" && targetChatID != "" && message != "" { - msgBus.PublishOutbound(bus.OutboundMessage{ - Channel: targetChannel, - ChatID: targetChatID, - Content: message, - }) - return "delivered", nil - } - - if message == "" { - return "", nil - } - if targetChannel == "" || targetChatID == "" { - targetChannel = "internal" - targetChatID = "cron" - } - msgBus.PublishInbound(bus.InboundMessage{ - Channel: "system", - SenderID: "cron", - ChatID: fmt.Sprintf("%s:%s", targetChannel, targetChatID), - Content: message, - SessionKey: fmt.Sprintf("cron:%s", job.ID), - Metadata: map[string]string{ - "trigger": "cron", - "job_id": job.ID, - }, - }) - return "scheduled", nil + return dispatchCronJob(msgBus, job), nil }) configureCronServiceRuntime(cronService, cfg) heartbeatService := buildHeartbeatService(cfg, msgBus) @@ -904,6 +869,56 @@ func buildGatewayRuntime(ctx context.Context, cfg *config.Config, msgBus *bus.Me return agentLoop, channelManager, nil } +func normalizeCronTargetChatID(channel, chatID string) string { + ch := strings.ToLower(strings.TrimSpace(channel)) + target := strings.TrimSpace(chatID) + if ch == "" || target == "" { + return target + } + prefix := ch + ":" + if strings.HasPrefix(strings.ToLower(target), prefix) { + return strings.TrimSpace(target[len(prefix):]) + } + return target +} + +func dispatchCronJob(msgBus *bus.MessageBus, job *cron.CronJob) string { + if job == nil { + return "" + } + message := strings.TrimSpace(job.Payload.Message) + if message == "" { + return "" + } + targetChannel := strings.TrimSpace(job.Payload.Channel) + targetChatID := normalizeCronTargetChatID(targetChannel, job.Payload.To) + + if targetChannel != "" && targetChatID != "" { + msgBus.PublishOutbound(bus.OutboundMessage{ + Channel: targetChannel, + ChatID: targetChatID, + Content: message, + }) + if job.Payload.Deliver { + return "delivered" + } + return "delivered_targeted" + } + + msgBus.PublishInbound(bus.InboundMessage{ + Channel: "system", + SenderID: "cron", + ChatID: "internal:cron", + Content: message, + SessionKey: fmt.Sprintf("cron:%s", job.ID), + Metadata: map[string]string{ + "trigger": "cron", + "job_id": job.ID, + }, + }) + return "scheduled" +} + func configureCronServiceRuntime(cs *cron.CronService, cfg *config.Config) { if cs == nil || cfg == nil { return diff --git a/cmd/clawgo/cmd_gateway_test.go b/cmd/clawgo/cmd_gateway_test.go new file mode 100644 index 0000000..77946b9 --- /dev/null +++ b/cmd/clawgo/cmd_gateway_test.go @@ -0,0 +1,68 @@ +package main + +import ( + "context" + "testing" + "time" + + "clawgo/pkg/bus" + "clawgo/pkg/cron" +) + +func TestNormalizeCronTargetChatID(t *testing.T) { + if got := normalizeCronTargetChatID("telegram", "telegram:12345"); got != "12345" { + t.Fatalf("expected 12345, got %q", got) + } + if got := normalizeCronTargetChatID("telegram", "12345"); got != "12345" { + t.Fatalf("expected unchanged chat id, got %q", got) + } +} + +func TestDispatchCronJob_DeliversTargetedMessageEvenWhenDeliverFalse(t *testing.T) { + mb := bus.NewMessageBus() + defer mb.Close() + + status := dispatchCronJob(mb, &cron.CronJob{ + ID: "job-1", + Payload: cron.CronPayload{ + Message: "time to sleep", + Deliver: false, + Channel: "telegram", + To: "telegram:5988738763", + }, + }) + + if status != "delivered_targeted" { + t.Fatalf("unexpected status: %s", status) + } + + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + out, ok := mb.SubscribeOutbound(ctx) + if !ok { + t.Fatal("expected outbound message") + } + if out.Channel != "telegram" || out.ChatID != "5988738763" || out.Content != "time to sleep" { + t.Fatalf("unexpected outbound: %#v", out) + } +} + +func TestDispatchCronJob_FallsBackToSystemInboundWithoutTarget(t *testing.T) { + mb := bus.NewMessageBus() + defer mb.Close() + + status := dispatchCronJob(mb, &cron.CronJob{ID: "job-2", Payload: cron.CronPayload{Message: "tick"}}) + if status != "scheduled" { + t.Fatalf("unexpected status: %s", status) + } + + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + in, ok := mb.ConsumeInbound(ctx) + if !ok { + t.Fatal("expected inbound message") + } + if in.Channel != "system" || in.ChatID != "internal:cron" { + t.Fatalf("unexpected inbound: %#v", in) + } +} diff --git a/webui/src/i18n/index.ts b/webui/src/i18n/index.ts index 982a1ec..cde2723 100644 --- a/webui/src/i18n/index.ts +++ b/webui/src/i18n/index.ts @@ -97,6 +97,7 @@ const resources = { kind: 'Kind', everyMs: 'Interval (ms)', cronExpression: 'Cron Expression', + runAt: 'Run At', message: 'Message', deliver: 'Deliver', channel: 'Channel', @@ -280,6 +281,7 @@ const resources = { kind: '类型', everyMs: '间隔 (毫秒)', cronExpression: 'Cron 表达式', + runAt: '执行时间', message: '消息', deliver: '投递', channel: '频道', diff --git a/webui/src/pages/Cron.tsx b/webui/src/pages/Cron.tsx index 2f5d0d1..070f8c3 100644 --- a/webui/src/pages/Cron.tsx +++ b/webui/src/pages/Cron.tsx @@ -32,6 +32,20 @@ const isNonGroupRecipient = (channel: string, id: string) => { return true; }; +const formatSchedule = (job: CronJob, t: (key: string) => string) => { + const kind = String(job.schedule?.kind || '').toLowerCase(); + if (kind === 'at' && job.schedule?.atMs) { + return { + label: t('runAt'), + value: new Date(job.schedule.atMs).toLocaleString(), + }; + } + return { + label: t('cronExpression'), + value: job.expr || '-', + }; +}; + const Cron: React.FC = () => { const { t } = useTranslation(); const { cron, refreshCron, q, cfg } = useAppContext(); @@ -76,10 +90,11 @@ const Cron: React.FC = () => { const r = await fetch(`/webui/api/cron${q}&id=${job.id}`); if (r.ok) { const details = await r.json(); + const isAtSchedule = String(details.job?.schedule?.kind || '').toLowerCase() === 'at'; setEditingCron(details.job); setCronForm({ name: details.job.name || '', - expr: details.job.expr || '', + expr: isAtSchedule ? '' : (details.job.expr || ''), message: details.job.message || '', deliver: details.job.deliver || false, channel: details.job.channel || 'telegram', @@ -141,7 +156,9 @@ const Cron: React.FC = () => {