diff --git a/cmd/clawgo/main.go b/cmd/clawgo/main.go index 0a16b9f..c429613 100644 --- a/cmd/clawgo/main.go +++ b/cmd/clawgo/main.go @@ -535,6 +535,7 @@ func agentCmd() { // Initialize CronService for tools (shared storage with gateway) cronStorePath := filepath.Join(filepath.Dir(getConfigPath()), "cron", "jobs.json") cronService := cron.NewCronService(cronStorePath, nil) + configureCronServiceRuntime(cronService, cfg) agentLoop := agent.NewAgentLoop(cfg, msgBus, provider, cronService) @@ -684,6 +685,7 @@ func gatewayCmd() { msgBus := bus.NewMessageBus() cronStorePath := filepath.Join(filepath.Dir(getConfigPath()), "cron", "jobs.json") cronService := cron.NewCronService(cronStorePath, nil) + configureCronServiceRuntime(cronService, cfg) heartbeatService := heartbeat.NewHeartbeatService( cfg.WorkspacePath(), nil, @@ -768,6 +770,7 @@ func gatewayCmd() { if strings.EqualFold(strings.TrimSpace(os.Getenv(envRootGranted)), "1") || strings.EqualFold(strings.TrimSpace(os.Getenv(envRootGranted)), "true") { applyMaximumPermissionPolicy(newCfg) } + configureCronServiceRuntime(cronService, newCfg) if reflect.DeepEqual(cfg, newCfg) { fmt.Println("✓ Config unchanged, skip reload") @@ -1121,6 +1124,20 @@ func buildGatewayRuntime(ctx context.Context, cfg *config.Config, msgBus *bus.Me return agentLoop, channelManager, nil } +func configureCronServiceRuntime(cs *cron.CronService, cfg *config.Config) { + if cs == nil || cfg == nil { + return + } + cs.SetRuntimeOptions(cron.RuntimeOptions{ + RunLoopMinSleep: time.Duration(cfg.Cron.MinSleepSec) * time.Second, + RunLoopMaxSleep: time.Duration(cfg.Cron.MaxSleepSec) * time.Second, + RetryBackoffBase: time.Duration(cfg.Cron.RetryBackoffBaseSec) * time.Second, + RetryBackoffMax: time.Duration(cfg.Cron.RetryBackoffMaxSec) * time.Second, + MaxConsecutiveFailureRetries: int64(cfg.Cron.MaxConsecutiveFailureRetries), + MaxWorkers: cfg.Cron.MaxWorkers, + }) +} + func configCmd() { if len(os.Args) < 3 { configHelp() diff --git a/config.example.json b/config.example.json index 14f5279..3da2777 100644 --- a/config.example.json +++ b/config.example.json @@ -66,6 +66,14 @@ "host": "0.0.0.0", "port": 18790 }, + "cron": { + "min_sleep_sec": 1, + "max_sleep_sec": 30, + "retry_backoff_base_sec": 30, + "retry_backoff_max_sec": 1800, + "max_consecutive_failure_retries": 5, + "max_workers": 4 + }, "logging": { "enabled": true, "dir": "~/.clawgo/logs", diff --git a/pkg/config/config.go b/pkg/config/config.go index 07eed03..247eb9b 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -15,6 +15,7 @@ type Config struct { Channels ChannelsConfig `json:"channels"` Providers ProvidersConfig `json:"providers"` Gateway GatewayConfig `json:"gateway"` + Cron CronConfig `json:"cron"` Tools ToolsConfig `json:"tools"` Logging LoggingConfig `json:"logging"` Sentinel SentinelConfig `json:"sentinel"` @@ -118,6 +119,15 @@ type GatewayConfig struct { Port int `json:"port" env:"CLAWGO_GATEWAY_PORT"` } +type CronConfig struct { + MinSleepSec int `json:"min_sleep_sec" env:"CLAWGO_CRON_MIN_SLEEP_SEC"` + MaxSleepSec int `json:"max_sleep_sec" env:"CLAWGO_CRON_MAX_SLEEP_SEC"` + RetryBackoffBaseSec int `json:"retry_backoff_base_sec" env:"CLAWGO_CRON_RETRY_BACKOFF_BASE_SEC"` + RetryBackoffMaxSec int `json:"retry_backoff_max_sec" env:"CLAWGO_CRON_RETRY_BACKOFF_MAX_SEC"` + MaxConsecutiveFailureRetries int `json:"max_consecutive_failure_retries" env:"CLAWGO_CRON_MAX_CONSECUTIVE_FAILURE_RETRIES"` + MaxWorkers int `json:"max_workers" env:"CLAWGO_CRON_MAX_WORKERS"` +} + type WebSearchConfig struct { APIKey string `json:"api_key" env:"CLAWGO_TOOLS_WEB_SEARCH_API_KEY"` MaxResults int `json:"max_results" env:"CLAWGO_TOOLS_WEB_SEARCH_MAX_RESULTS"` @@ -285,6 +295,14 @@ func DefaultConfig() *Config { Host: "0.0.0.0", Port: 18790, }, + Cron: CronConfig{ + MinSleepSec: 1, + MaxSleepSec: 30, + RetryBackoffBaseSec: 30, + RetryBackoffMaxSec: 1800, + MaxConsecutiveFailureRetries: 5, + MaxWorkers: 4, + }, Tools: ToolsConfig{ Web: WebToolsConfig{ Search: WebSearchConfig{ diff --git a/pkg/config/validate.go b/pkg/config/validate.go index bef09d1..f466527 100644 --- a/pkg/config/validate.go +++ b/pkg/config/validate.go @@ -46,6 +46,30 @@ func Validate(cfg *Config) []error { if cfg.Gateway.Port <= 0 || cfg.Gateway.Port > 65535 { errs = append(errs, fmt.Errorf("gateway.port must be in 1..65535")) } + if cfg.Cron.MinSleepSec <= 0 { + errs = append(errs, fmt.Errorf("cron.min_sleep_sec must be > 0")) + } + if cfg.Cron.MaxSleepSec <= 0 { + errs = append(errs, fmt.Errorf("cron.max_sleep_sec must be > 0")) + } + if cfg.Cron.MinSleepSec > cfg.Cron.MaxSleepSec { + errs = append(errs, fmt.Errorf("cron.min_sleep_sec must be <= cron.max_sleep_sec")) + } + if cfg.Cron.RetryBackoffBaseSec <= 0 { + errs = append(errs, fmt.Errorf("cron.retry_backoff_base_sec must be > 0")) + } + if cfg.Cron.RetryBackoffMaxSec <= 0 { + errs = append(errs, fmt.Errorf("cron.retry_backoff_max_sec must be > 0")) + } + if cfg.Cron.RetryBackoffBaseSec > cfg.Cron.RetryBackoffMaxSec { + errs = append(errs, fmt.Errorf("cron.retry_backoff_base_sec must be <= cron.retry_backoff_max_sec")) + } + if cfg.Cron.MaxConsecutiveFailureRetries < 0 { + errs = append(errs, fmt.Errorf("cron.max_consecutive_failure_retries must be >= 0")) + } + if cfg.Cron.MaxWorkers <= 0 { + errs = append(errs, fmt.Errorf("cron.max_workers must be > 0")) + } if cfg.Logging.Enabled { if cfg.Logging.Dir == "" { diff --git a/pkg/cron/service.go b/pkg/cron/service.go index d22c2bc..2b5d864 100644 --- a/pkg/cron/service.go +++ b/pkg/cron/service.go @@ -3,6 +3,7 @@ package cron import ( "encoding/json" "fmt" + "math" "os" "path/filepath" "sync" @@ -11,6 +12,64 @@ import ( "clawgo/pkg/lifecycle" ) +const ( + defaultRunLoopMinSleep = 1 * time.Second + defaultRunLoopMaxSleep = 30 * time.Second + defaultRetryBackoffBase = 30 * time.Second + defaultRetryBackoffMax = 30 * time.Minute + defaultMaxConsecutiveFailureRetries = 5 +) + +type RuntimeOptions struct { + RunLoopMinSleep time.Duration + RunLoopMaxSleep time.Duration + RetryBackoffBase time.Duration + RetryBackoffMax time.Duration + MaxConsecutiveFailureRetries int64 + MaxWorkers int +} + +func DefaultRuntimeOptions() RuntimeOptions { + return RuntimeOptions{ + RunLoopMinSleep: defaultRunLoopMinSleep, + RunLoopMaxSleep: defaultRunLoopMaxSleep, + RetryBackoffBase: defaultRetryBackoffBase, + RetryBackoffMax: defaultRetryBackoffMax, + MaxConsecutiveFailureRetries: defaultMaxConsecutiveFailureRetries, + MaxWorkers: 1, + } +} + +func normalizeRuntimeOptions(opts RuntimeOptions) RuntimeOptions { + def := DefaultRuntimeOptions() + + if opts.RunLoopMinSleep <= 0 { + opts.RunLoopMinSleep = def.RunLoopMinSleep + } + if opts.RunLoopMaxSleep <= 0 { + opts.RunLoopMaxSleep = def.RunLoopMaxSleep + } + if opts.RunLoopMinSleep > opts.RunLoopMaxSleep { + opts.RunLoopMinSleep = opts.RunLoopMaxSleep + } + if opts.RetryBackoffBase <= 0 { + opts.RetryBackoffBase = def.RetryBackoffBase + } + if opts.RetryBackoffMax <= 0 { + opts.RetryBackoffMax = def.RetryBackoffMax + } + if opts.RetryBackoffBase > opts.RetryBackoffMax { + opts.RetryBackoffBase = opts.RetryBackoffMax + } + if opts.MaxConsecutiveFailureRetries < 0 { + opts.MaxConsecutiveFailureRetries = def.MaxConsecutiveFailureRetries + } + if opts.MaxWorkers <= 0 { + opts.MaxWorkers = def.MaxWorkers + } + return opts +} + type CronSchedule struct { Kind string `json:"kind"` AtMS *int64 `json:"atMs,omitempty"` @@ -28,10 +87,15 @@ type CronPayload struct { } type CronJobState struct { - NextRunAtMS *int64 `json:"nextRunAtMs,omitempty"` - LastRunAtMS *int64 `json:"lastRunAtMs,omitempty"` - LastStatus string `json:"lastStatus,omitempty"` - LastError string `json:"lastError,omitempty"` + NextRunAtMS *int64 `json:"nextRunAtMs,omitempty"` + LastRunAtMS *int64 `json:"lastRunAtMs,omitempty"` + LastStatus string `json:"lastStatus,omitempty"` + LastError string `json:"lastError,omitempty"` + LastDurationMS int64 `json:"lastDurationMs,omitempty"` + LastScheduleDelayMS int64 `json:"lastScheduleDelayMs,omitempty"` + TotalRuns int64 `json:"totalRuns,omitempty"` + TotalFailures int64 `json:"totalFailures,omitempty"` + ConsecutiveFailures int64 `json:"consecutiveFailures,omitempty"` } type CronJob struct { @@ -57,6 +121,8 @@ type CronService struct { storePath string store *CronStore onJob JobHandler + opts RuntimeOptions + running map[string]struct{} mu sync.RWMutex runner *lifecycle.LoopRunner } @@ -65,12 +131,20 @@ func NewCronService(storePath string, onJob JobHandler) *CronService { cs := &CronService{ storePath: storePath, onJob: onJob, + opts: DefaultRuntimeOptions(), + running: make(map[string]struct{}), runner: lifecycle.NewLoopRunner(), } cs.loadStore() return cs } +func (cs *CronService) SetRuntimeOptions(opts RuntimeOptions) { + cs.mu.Lock() + defer cs.mu.Unlock() + cs.opts = normalizeRuntimeOptions(opts) +} + func (cs *CronService) Start() error { if cs.runner.Running() { return nil @@ -83,9 +157,10 @@ func (cs *CronService) Start() error { return fmt.Errorf("failed to load store: %w", err) } - cs.recomputeNextRuns() - if err := cs.saveStore(); err != nil { - return fmt.Errorf("failed to save store: %w", err) + if changed := cs.recomputeNextRuns(); changed { + if err := cs.saveStore(); err != nil { + return fmt.Errorf("failed to save store: %w", err) + } } cs.runner.Start(cs.runLoop) @@ -98,14 +173,19 @@ func (cs *CronService) Stop() { } func (cs *CronService) runLoop(stopCh <-chan struct{}) { - ticker := time.NewTicker(1 * time.Second) - defer ticker.Stop() - for { + sleepFor := cs.nextSleepDuration(time.Now()) + timer := time.NewTimer(sleepFor) select { case <-stopCh: + if !timer.Stop() { + select { + case <-timer.C: + default: + } + } return - case <-ticker.C: + case <-timer.C: cs.checkJobs() } } @@ -118,61 +198,134 @@ func (cs *CronService) checkJobs() { cs.mu.RLock() now := time.Now().UnixMilli() - var dueJobs []*CronJob + dueIDs := make([]string, 0) for i := range cs.store.Jobs { job := &cs.store.Jobs[i] if job.Enabled && job.State.NextRunAtMS != nil && *job.State.NextRunAtMS <= now { - dueJobs = append(dueJobs, job) + dueIDs = append(dueIDs, job.ID) } } cs.mu.RUnlock() - for _, job := range dueJobs { - cs.executeJob(job) + if len(dueIDs) == 0 { + return + } + + opts := cs.getRuntimeOptions() + changed := false + changedMu := sync.Mutex{} + sem := make(chan struct{}, opts.MaxWorkers) + var wg sync.WaitGroup + for _, jobID := range dueIDs { + if !cs.markJobRunning(jobID) { + continue + } + wg.Add(1) + go func(id string) { + defer wg.Done() + sem <- struct{}{} + defer func() { + <-sem + cs.unmarkJobRunning(id) + }() + if cs.executeJobByID(id) { + changedMu.Lock() + changed = true + changedMu.Unlock() + } + }(jobID) + } + wg.Wait() + + if !changed { + return } cs.mu.Lock() defer cs.mu.Unlock() - cs.saveStore() + _ = cs.saveStore() } -func (cs *CronService) executeJob(job *CronJob) { +func (cs *CronService) executeJobByID(jobID string) bool { + cs.mu.RLock() + idx := cs.findJobIndexByIDLocked(jobID) + if idx < 0 { + cs.mu.RUnlock() + return false + } + + jobSnapshot := cs.store.Jobs[idx] + if !jobSnapshot.Enabled || jobSnapshot.State.NextRunAtMS == nil { + cs.mu.RUnlock() + return false + } + plannedRun := *jobSnapshot.State.NextRunAtMS + cs.mu.RUnlock() + startTime := time.Now().UnixMilli() + execStart := time.Now() var err error if cs.onJob != nil { - _, err = cs.onJob(job) + _, err = cs.onJob(&jobSnapshot) } + durationMS := time.Since(execStart).Milliseconds() cs.mu.Lock() defer cs.mu.Unlock() + idx = cs.findJobIndexByIDLocked(jobID) + if idx < 0 { + return false + } + job := &cs.store.Jobs[idx] + if !job.Enabled { + return false + } + job.State.LastRunAtMS = &startTime job.UpdatedAtMS = time.Now().UnixMilli() + job.State.LastDurationMS = durationMS + job.State.LastScheduleDelayMS = maxInt64(0, startTime-plannedRun) + job.State.TotalRuns++ if err != nil { job.State.LastStatus = "error" job.State.LastError = err.Error() + job.State.TotalFailures++ + job.State.ConsecutiveFailures++ } else { job.State.LastStatus = "ok" job.State.LastError = "" + job.State.ConsecutiveFailures = 0 + } + + if err != nil && job.State.ConsecutiveFailures <= cs.opts.MaxConsecutiveFailureRetries { + retryAt := time.Now().Add(computeRetryBackoff(job.State.ConsecutiveFailures, cs.opts.RetryBackoffBase, cs.opts.RetryBackoffMax)).UnixMilli() + job.State.NextRunAtMS = &retryAt + return true } if job.Schedule.Kind == "at" { if job.DeleteAfterRun { - cs.removeJobUnsafe(job.ID) + cs.removeJobByIDUnsafe(job.ID) } else { job.Enabled = false job.State.NextRunAtMS = nil } } else { - nextRun := cs.computeNextRun(&job.Schedule, time.Now().UnixMilli()) + nextRun := cs.computeNextRunAfter(&job.Schedule, plannedRun, time.Now().UnixMilli()) job.State.NextRunAtMS = nextRun } + return true } func (cs *CronService) computeNextRun(schedule *CronSchedule, nowMS int64) *int64 { + return cs.computeNextRunAfter(schedule, nowMS, nowMS) +} + +func (cs *CronService) computeNextRunAfter(schedule *CronSchedule, baseMS int64, nowMS int64) *int64 { if schedule.Kind == "at" { if schedule.AtMS != nil && *schedule.AtMS > nowMS { return schedule.AtMS @@ -184,7 +337,7 @@ func (cs *CronService) computeNextRun(schedule *CronSchedule, nowMS int64) *int6 if schedule.EveryMS == nil || *schedule.EveryMS <= 0 { return nil } - next := nowMS + *schedule.EveryMS + next := computeAlignedEveryNext(baseMS, nowMS, *schedule.EveryMS) return &next } @@ -200,14 +353,32 @@ func (cs *CronService) computeNextRun(schedule *CronSchedule, nowMS int64) *int6 return nil } -func (cs *CronService) recomputeNextRuns() { +func (cs *CronService) recomputeNextRuns() bool { + changed := false now := time.Now().UnixMilli() for i := range cs.store.Jobs { job := &cs.store.Jobs[i] + oldNext := int64(0) + oldSet := false + if job.State.NextRunAtMS != nil { + oldSet = true + oldNext = *job.State.NextRunAtMS + } + if job.Enabled { - job.State.NextRunAtMS = cs.computeNextRun(&job.Schedule, now) + job.State.NextRunAtMS = cs.computeNextRunAfter(&job.Schedule, now, now) + } + + newSet := job.State.NextRunAtMS != nil + newNext := int64(0) + if newSet { + newNext = *job.State.NextRunAtMS + } + if oldSet != newSet || oldNext != newNext { + changed = true } } + return changed } func (cs *CronService) getNextWakeMS() *int64 { @@ -254,7 +425,38 @@ func (cs *CronService) saveStore() error { return err } - return os.WriteFile(cs.storePath, data, 0644) + tmpPath := cs.storePath + ".tmp" + tmpFile, err := os.OpenFile(tmpPath, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0644) + if err != nil { + return err + } + + if _, err := tmpFile.Write(data); err != nil { + tmpFile.Close() + _ = os.Remove(tmpPath) + return err + } + if err := tmpFile.Sync(); err != nil { + tmpFile.Close() + _ = os.Remove(tmpPath) + return err + } + if err := tmpFile.Close(); err != nil { + _ = os.Remove(tmpPath) + return err + } + + if err := os.Rename(tmpPath, cs.storePath); err != nil { + _ = os.Remove(tmpPath) + return err + } + + dirFile, err := os.Open(dir) + if err == nil { + _ = dirFile.Sync() + _ = dirFile.Close() + } + return nil } func (cs *CronService) AddJob(name string, schedule CronSchedule, message string, deliver bool, channel, to string) (*CronJob, error) { @@ -299,21 +501,117 @@ func (cs *CronService) RemoveJob(jobID string) bool { } func (cs *CronService) removeJobUnsafe(jobID string) bool { - before := len(cs.store.Jobs) - var jobs []CronJob - for _, job := range cs.store.Jobs { - if job.ID != jobID { - jobs = append(jobs, job) + removed := cs.removeJobByIDUnsafe(jobID) + if removed { + _ = cs.saveStore() + } + return removed +} + +func (cs *CronService) removeJobByIDUnsafe(jobID string) bool { + idx := cs.findJobIndexByIDLocked(jobID) + if idx < 0 { + return false + } + lastIdx := len(cs.store.Jobs) - 1 + cs.store.Jobs[idx] = cs.store.Jobs[lastIdx] + cs.store.Jobs = cs.store.Jobs[:lastIdx] + return true +} + +func (cs *CronService) findJobIndexByIDLocked(jobID string) int { + for i := range cs.store.Jobs { + if cs.store.Jobs[i].ID == jobID { + return i } } - cs.store.Jobs = jobs - removed := len(cs.store.Jobs) < before + return -1 +} - if removed { - cs.saveStore() +func (cs *CronService) nextSleepDuration(now time.Time) time.Duration { + cs.mu.RLock() + defer cs.mu.RUnlock() + + nextWake := cs.getNextWakeMS() + if nextWake == nil { + return cs.opts.RunLoopMaxSleep } - return removed + sleep := time.Until(time.UnixMilli(*nextWake)) + if sleep < cs.opts.RunLoopMinSleep { + return cs.opts.RunLoopMinSleep + } + if sleep > cs.opts.RunLoopMaxSleep { + return cs.opts.RunLoopMaxSleep + } + return sleep +} + +func (cs *CronService) getRuntimeOptions() RuntimeOptions { + cs.mu.RLock() + defer cs.mu.RUnlock() + return cs.opts +} + +func (cs *CronService) markJobRunning(jobID string) bool { + cs.mu.Lock() + defer cs.mu.Unlock() + if _, ok := cs.running[jobID]; ok { + return false + } + cs.running[jobID] = struct{}{} + return true +} + +func (cs *CronService) unmarkJobRunning(jobID string) { + cs.mu.Lock() + defer cs.mu.Unlock() + delete(cs.running, jobID) +} + +func computeAlignedEveryNext(baseMS, nowMS, intervalMS int64) int64 { + if intervalMS <= 0 { + return nowMS + } + next := baseMS + intervalMS + if next > nowMS { + return next + } + miss := (nowMS-next)/intervalMS + 1 + return next + miss*intervalMS +} + +func computeRetryBackoff(consecutiveFailures int64, base, max time.Duration) time.Duration { + if base <= 0 { + base = defaultRetryBackoffBase + } + if max <= 0 { + max = defaultRetryBackoffMax + } + if base > max { + base = max + } + + if consecutiveFailures <= 0 { + return base + } + shift := consecutiveFailures - 1 + if shift > 16 { + shift = 16 + } + mult := math.Pow(2, float64(shift)) + backoff := time.Duration(float64(base) * mult) + if backoff > max { + return max + } + return backoff +} + +func maxInt64(a, b int64) int64 { + if a > b { + return a + } + return b } func (cs *CronService) EnableJob(jobID string, enabled bool) *CronJob { @@ -363,16 +661,33 @@ func (cs *CronService) Status() map[string]interface{} { defer cs.mu.RUnlock() var enabledCount int + var totalRuns int64 + var totalFailures int64 + var latestDelayMS int64 + var latestDurationMS int64 for _, job := range cs.store.Jobs { if job.Enabled { enabledCount++ } + totalRuns += job.State.TotalRuns + totalFailures += job.State.TotalFailures + if job.State.LastScheduleDelayMS > latestDelayMS { + latestDelayMS = job.State.LastScheduleDelayMS + } + if job.State.LastDurationMS > latestDurationMS { + latestDurationMS = job.State.LastDurationMS + } } return map[string]interface{}{ - "enabled": cs.runner.Running(), - "jobs": len(cs.store.Jobs), - "nextWakeAtMS": cs.getNextWakeMS(), + "enabled": cs.runner.Running(), + "jobs": len(cs.store.Jobs), + "nextWakeAtMS": cs.getNextWakeMS(), + "enabledJobs": enabledCount, + "totalRuns": totalRuns, + "totalFailures": totalFailures, + "latestDelayMs": latestDelayMS, + "latestDurationMs": latestDurationMS, } } diff --git a/pkg/cron/service_test.go b/pkg/cron/service_test.go new file mode 100644 index 0000000..ef64fee --- /dev/null +++ b/pkg/cron/service_test.go @@ -0,0 +1,236 @@ +package cron + +import ( + "encoding/json" + "fmt" + "os" + "path/filepath" + "sync/atomic" + "testing" + "time" +) + +func TestComputeAlignedEveryNext(t *testing.T) { + base := int64(1_000) + interval := int64(1_000) + + next := computeAlignedEveryNext(base, 1_100, interval) + if next != 2_000 { + t.Fatalf("unexpected next: %d", next) + } + + next = computeAlignedEveryNext(base, 2_500, interval) + if next != 3_000 { + t.Fatalf("unexpected next after missed windows: %d", next) + } +} + +func TestComputeRetryBackoff(t *testing.T) { + opts := DefaultRuntimeOptions() + if got := computeRetryBackoff(1, opts.RetryBackoffBase, opts.RetryBackoffMax); got != opts.RetryBackoffBase { + t.Fatalf("unexpected backoff for 1: %s", got) + } + + if got := computeRetryBackoff(2, opts.RetryBackoffBase, opts.RetryBackoffMax); got != 2*opts.RetryBackoffBase { + t.Fatalf("unexpected backoff for 2: %s", got) + } + + got := computeRetryBackoff(20, opts.RetryBackoffBase, opts.RetryBackoffMax) + if got != opts.RetryBackoffMax { + t.Fatalf("backoff should cap at %s, got %s", opts.RetryBackoffMax, got) + } +} + +func TestNextSleepDuration(t *testing.T) { + cs := &CronService{ + opts: DefaultRuntimeOptions(), + running: map[string]struct{}{}, + store: &CronStore{ + Jobs: []CronJob{}, + }, + } + + if got := cs.nextSleepDuration(time.Now()); got != cs.opts.RunLoopMaxSleep { + t.Fatalf("expected max sleep when no jobs, got %s", got) + } + + nowMS := time.Now().UnixMilli() + soon := nowMS + 100 + cs.store.Jobs = []CronJob{ + { + ID: "1", + Enabled: true, + State: CronJobState{ + NextRunAtMS: &soon, + }, + }, + } + + got := cs.nextSleepDuration(time.Now()) + if got != cs.opts.RunLoopMinSleep { + t.Fatalf("expected min sleep for near due jobs, got %s", got) + } +} + +func TestCheckJobs_NoConcurrentRunForSameJob(t *testing.T) { + var running int32 + var maxRunning int32 + var calls int32 + + storePath := filepath.Join(t.TempDir(), "jobs.json") + cs := NewCronService(storePath, func(job *CronJob) (string, error) { + cur := atomic.AddInt32(&running, 1) + for { + prev := atomic.LoadInt32(&maxRunning) + if cur <= prev || atomic.CompareAndSwapInt32(&maxRunning, prev, cur) { + break + } + } + time.Sleep(120 * time.Millisecond) + atomic.AddInt32(&running, -1) + atomic.AddInt32(&calls, 1) + return "ok", nil + }) + cs.SetRuntimeOptions(RuntimeOptions{ + RunLoopMinSleep: time.Second, + RunLoopMaxSleep: 2 * time.Second, + RetryBackoffBase: time.Second, + RetryBackoffMax: 5 * time.Second, + MaxConsecutiveFailureRetries: 1, + MaxWorkers: 4, + }) + + now := time.Now().UnixMilli() + every := int64(60_000) + cs.mu.Lock() + cs.store.Jobs = []CronJob{ + { + ID: "job-1", + Enabled: true, + Schedule: CronSchedule{ + Kind: "every", + EveryMS: &every, + }, + State: CronJobState{ + NextRunAtMS: &now, + }, + }, + } + cs.mu.Unlock() + + cs.runner.Start(func(stop <-chan struct{}) { <-stop }) + defer cs.runner.Stop() + + go cs.checkJobs() + time.Sleep(10 * time.Millisecond) + cs.checkJobs() + time.Sleep(220 * time.Millisecond) + + if atomic.LoadInt32(&maxRunning) > 1 { + t.Fatalf("same job executed concurrently, max running=%d", atomic.LoadInt32(&maxRunning)) + } + if atomic.LoadInt32(&calls) != 1 { + t.Fatalf("expected exactly one execution, got %d", atomic.LoadInt32(&calls)) + } +} + +func TestSetRuntimeOptions_AffectsRetryBackoff(t *testing.T) { + storePath := filepath.Join(t.TempDir(), "jobs.json") + cs := NewCronService(storePath, func(job *CronJob) (string, error) { + return "", fmt.Errorf("fail") + }) + cs.SetRuntimeOptions(RuntimeOptions{ + RunLoopMinSleep: time.Second, + RunLoopMaxSleep: 2 * time.Second, + RetryBackoffBase: 2 * time.Second, + RetryBackoffMax: 2 * time.Second, + MaxConsecutiveFailureRetries: 10, + MaxWorkers: 1, + }) + + now := time.Now().UnixMilli() + every := int64(60_000) + cs.mu.Lock() + cs.store.Jobs = []CronJob{ + { + ID: "job-1", + Enabled: true, + Schedule: CronSchedule{ + Kind: "every", + EveryMS: &every, + }, + State: CronJobState{ + NextRunAtMS: &now, + }, + }, + } + cs.mu.Unlock() + + cs.runner.Start(func(stop <-chan struct{}) { <-stop }) + defer cs.runner.Stop() + + before := time.Now().UnixMilli() + cs.checkJobs() + cs.mu.RLock() + next1 := *cs.store.Jobs[0].State.NextRunAtMS + cs.mu.RUnlock() + delta1 := next1 - before + if delta1 < 1800 || delta1 > 3500 { + t.Fatalf("expected retry around 2s, got %dms", delta1) + } + + cs.SetRuntimeOptions(RuntimeOptions{ + RunLoopMinSleep: time.Second, + RunLoopMaxSleep: 2 * time.Second, + RetryBackoffBase: 5 * time.Second, + RetryBackoffMax: 5 * time.Second, + MaxConsecutiveFailureRetries: 10, + MaxWorkers: 1, + }) + + now2 := time.Now().UnixMilli() + cs.mu.Lock() + cs.store.Jobs[0].State.NextRunAtMS = &now2 + cs.mu.Unlock() + + before = time.Now().UnixMilli() + cs.checkJobs() + cs.mu.RLock() + next2 := *cs.store.Jobs[0].State.NextRunAtMS + cs.mu.RUnlock() + delta2 := next2 - before + if delta2 < 4800 || delta2 > 6500 { + t.Fatalf("expected retry around 5s after hot update, got %dms", delta2) + } +} + +func TestSaveStore_IsAtomicAndValidJSON(t *testing.T) { + dir := t.TempDir() + storePath := filepath.Join(dir, "jobs.json") + cs := NewCronService(storePath, nil) + + at := time.Now().Add(10 * time.Minute).UnixMilli() + _, err := cs.AddJob("atomic-write", CronSchedule{ + Kind: "at", + AtMS: &at, + }, "hello", false, "", "") + if err != nil { + t.Fatalf("AddJob failed: %v", err) + } + + if _, err := os.Stat(storePath + ".tmp"); err == nil { + t.Fatalf("unexpected temp file left behind") + } + + data, err := os.ReadFile(storePath) + if err != nil { + t.Fatalf("read store failed: %v", err) + } + var parsed CronStore + if err := json.Unmarshal(data, &parsed); err != nil { + t.Fatalf("invalid json store: %v", err) + } + if len(parsed.Jobs) != 1 { + t.Fatalf("expected 1 job, got %d", len(parsed.Jobs)) + } +}