This commit is contained in:
lpf
2026-02-26 12:26:04 +08:00
parent 2615a6b196
commit dea7f361f2
9 changed files with 321 additions and 268 deletions

View File

@@ -1,53 +0,0 @@
package cron
import (
"fmt"
"strings"
"time"
)
// SimpleCronParser is a minimal cron expression parser (minute hour day month dayOfWeek).
// It approximates standard cron behavior to provide missing Expr scheduling support.
type SimpleCronParser struct {
expr string
}
func NewSimpleCronParser(expr string) *SimpleCronParser {
return &SimpleCronParser{expr: expr}
}
func (p *SimpleCronParser) Next(from time.Time) time.Time {
fields := strings.Fields(p.expr)
if len(fields) != 5 {
return time.Time{} // Invalid format
}
// This minimal implementation only supports "*" and exact numbers.
// For production, use github.com/robfig/cron/v3.
next := from.Add(1 * time.Minute).Truncate(time.Minute)
// Simplified logic: if it is not "*" and does not match, keep incrementing until matched
// (up to one year of search).
for i := 0; i < 525600; i++ {
if p.match(next, fields) {
return next
}
next = next.Add(1 * time.Minute)
}
return time.Time{}
}
func (p *SimpleCronParser) match(t time.Time, fields []string) bool {
return matchField(fmt.Sprintf("%d", t.Minute()), fields[0]) &&
matchField(fmt.Sprintf("%d", t.Hour()), fields[1]) &&
matchField(fmt.Sprintf("%d", t.Day()), fields[2]) &&
matchField(fmt.Sprintf("%d", int(t.Month())), fields[3]) &&
matchField(fmt.Sprintf("%d", int(t.Weekday())), fields[4])
}
func matchField(val, field string) bool {
if field == "*" {
return true
}
return val == field
}

View File

@@ -6,10 +6,12 @@ import (
"math"
"os"
"path/filepath"
"strings"
"sync"
"time"
"clawgo/pkg/lifecycle"
robcron "github.com/robfig/cron/v3"
)
const (
@@ -342,6 +344,10 @@ func (cs *CronService) computeNextRun(schedule *CronSchedule, nowMS int64) *int6
}
func (cs *CronService) computeNextRunAfter(schedule *CronSchedule, baseMS int64, nowMS int64) *int64 {
if schedule == nil {
return nil
}
if schedule.Kind == "at" {
if schedule.AtMS != nil && *schedule.AtMS > nowMS {
return schedule.AtMS
@@ -349,26 +355,126 @@ func (cs *CronService) computeNextRunAfter(schedule *CronSchedule, baseMS int64,
return nil
}
if schedule.Kind == "every" {
if schedule.EveryMS == nil || *schedule.EveryMS <= 0 {
return nil
}
next := computeAlignedEveryNext(baseMS, nowMS, *schedule.EveryMS)
return &next
expr, ok := cronExprFromSchedule(schedule)
if !ok {
return nil
}
compiled, err := parseCronSchedule(expr, strings.TrimSpace(schedule.TZ))
if err != nil {
return nil
}
if schedule.Kind == "cron" && schedule.Expr != "" {
parser := NewSimpleCronParser(schedule.Expr)
next := parser.Next(time.UnixMilli(nowMS))
if !next.IsZero() {
ms := next.UnixMilli()
return &ms
}
base := time.UnixMilli(baseMS)
now := time.UnixMilli(nowMS)
next := compiled.Next(base)
for i := 0; i < 1024 && !next.After(now); i++ {
next = compiled.Next(next)
}
if next.After(now) {
ms := next.UnixMilli()
return &ms
}
return nil
}
func cronExprFromSchedule(schedule *CronSchedule) (string, bool) {
if schedule == nil {
return "", false
}
if expr := strings.TrimSpace(schedule.Expr); expr != "" {
return expr, true
}
if schedule.Kind == "every" && schedule.EveryMS != nil && *schedule.EveryMS > 0 {
d := time.Duration(*schedule.EveryMS) * time.Millisecond
return "@every " + d.String(), true
}
return "", false
}
func parseCronSchedule(expr, tz string) (robcron.Schedule, error) {
spec := strings.TrimSpace(expr)
if spec == "" {
return nil, fmt.Errorf("empty cron expression")
}
tz = strings.TrimSpace(tz)
if tz != "" && !strings.HasPrefix(spec, "CRON_TZ=") && !strings.HasPrefix(spec, "TZ=") {
spec = "CRON_TZ=" + tz + " " + spec
}
parser := robcron.NewParser(
robcron.Minute |
robcron.Hour |
robcron.Dom |
robcron.Month |
robcron.Dow |
robcron.Descriptor,
)
return parser.Parse(spec)
}
func normalizeSchedule(schedule CronSchedule) CronSchedule {
schedule.Kind = strings.ToLower(strings.TrimSpace(schedule.Kind))
schedule.Expr = strings.TrimSpace(schedule.Expr)
schedule.TZ = strings.TrimSpace(schedule.TZ)
if schedule.Expr != "" {
schedule.Kind = "cron"
schedule.AtMS = nil
schedule.EveryMS = nil
return schedule
}
if schedule.Kind == "every" && schedule.EveryMS != nil && *schedule.EveryMS > 0 {
d := time.Duration(*schedule.EveryMS) * time.Millisecond
schedule.Expr = "@every " + d.String()
schedule.Kind = "cron"
schedule.EveryMS = nil
return schedule
}
if schedule.Kind == "at" {
schedule.EveryMS = nil
return schedule
}
if schedule.AtMS != nil {
schedule.Kind = "at"
return schedule
}
if schedule.EveryMS != nil && *schedule.EveryMS > 0 {
d := time.Duration(*schedule.EveryMS) * time.Millisecond
schedule.Expr = "@every " + d.String()
schedule.Kind = "cron"
schedule.EveryMS = nil
return schedule
}
if schedule.Expr != "" {
schedule.Kind = "cron"
}
return schedule
}
func validateSchedule(schedule CronSchedule) error {
if schedule.Kind == "at" {
if schedule.AtMS == nil || *schedule.AtMS <= time.Now().UnixMilli() {
return fmt.Errorf("invalid one-time schedule")
}
return nil
}
expr, ok := cronExprFromSchedule(&schedule)
if !ok {
return fmt.Errorf("cron expression is required")
}
_, err := parseCronSchedule(expr, schedule.TZ)
if err != nil {
return fmt.Errorf("invalid cron expression: %w", err)
}
return nil
}
func (cs *CronService) recomputeNextRuns() bool {
changed := false
now := time.Now().UnixMilli()
@@ -427,7 +533,13 @@ func (cs *CronService) loadStore() error {
return err
}
return json.Unmarshal(data, cs.store)
if err := json.Unmarshal(data, cs.store); err != nil {
return err
}
for i := range cs.store.Jobs {
cs.store.Jobs[i].Schedule = normalizeSchedule(cs.store.Jobs[i].Schedule)
}
return nil
}
func (cs *CronService) saveStore() error {
@@ -479,6 +591,11 @@ func (cs *CronService) AddJob(name string, schedule CronSchedule, message string
cs.mu.Lock()
defer cs.mu.Unlock()
schedule = normalizeSchedule(schedule)
if err := validateSchedule(schedule); err != nil {
return nil, err
}
now := time.Now().UnixMilli()
job := CronJob{
@@ -589,18 +706,6 @@ func (cs *CronService) unmarkJobRunning(jobID string) {
delete(cs.running, jobID)
}
func computeAlignedEveryNext(baseMS, nowMS, intervalMS int64) int64 {
if intervalMS <= 0 {
return nowMS
}
next := baseMS + intervalMS
if next > nowMS {
return next
}
miss := (nowMS-next)/intervalMS + 1
return next + miss*intervalMS
}
func computeRetryBackoff(consecutiveFailures int64, base, max time.Duration) time.Duration {
if base <= 0 {
base = defaultRetryBackoffBase
@@ -688,7 +793,11 @@ func (cs *CronService) UpdateJob(jobID string, in UpdateJobInput) (*CronJob, err
job.Name = *in.Name
}
if in.Schedule != nil {
job.Schedule = *in.Schedule
nextSchedule := normalizeSchedule(*in.Schedule)
if err := validateSchedule(nextSchedule); err != nil {
return nil, err
}
job.Schedule = nextSchedule
if job.Enabled {
now := time.Now().UnixMilli()
job.State.NextRunAtMS = cs.computeNextRun(&job.Schedule, now)