diff --git a/pkg/cron/service.go b/pkg/cron/service.go index 2b5d864..13c16f2 100644 --- a/pkg/cron/service.go +++ b/pkg/cron/service.go @@ -118,13 +118,14 @@ type CronStore struct { type JobHandler func(job *CronJob) (string, error) type CronService struct { - storePath string - store *CronStore - onJob JobHandler - opts RuntimeOptions - running map[string]struct{} - mu sync.RWMutex - runner *lifecycle.LoopRunner + storePath string + store *CronStore + onJob JobHandler + opts RuntimeOptions + running map[string]struct{} + lastSaveError string + mu sync.RWMutex + runner *lifecycle.LoopRunner } func NewCronService(storePath string, onJob JobHandler) *CronService { @@ -244,7 +245,11 @@ func (cs *CronService) checkJobs() { cs.mu.Lock() defer cs.mu.Unlock() - _ = cs.saveStore() + if err := cs.saveStore(); err != nil { + cs.lastSaveError = err.Error() + } else { + cs.lastSaveError = "" + } } func (cs *CronService) executeJobByID(jobID string) bool { @@ -503,7 +508,11 @@ func (cs *CronService) RemoveJob(jobID string) bool { func (cs *CronService) removeJobUnsafe(jobID string) bool { removed := cs.removeJobByIDUnsafe(jobID) if removed { - _ = cs.saveStore() + if err := cs.saveStore(); err != nil { + cs.lastSaveError = err.Error() + } else { + cs.lastSaveError = "" + } } return removed } @@ -537,7 +546,7 @@ func (cs *CronService) nextSleepDuration(now time.Time) time.Duration { return cs.opts.RunLoopMaxSleep } - sleep := time.Until(time.UnixMilli(*nextWake)) + sleep := time.UnixMilli(*nextWake).Sub(now) if sleep < cs.opts.RunLoopMinSleep { return cs.opts.RunLoopMinSleep } @@ -630,7 +639,11 @@ func (cs *CronService) EnableJob(jobID string, enabled bool) *CronJob { job.State.NextRunAtMS = nil } - cs.saveStore() + if err := cs.saveStore(); err != nil { + cs.lastSaveError = err.Error() + } else { + cs.lastSaveError = "" + } return job } } @@ -688,6 +701,7 @@ func (cs *CronService) Status() map[string]interface{} { "totalFailures": totalFailures, "latestDelayMs": latestDelayMS, "latestDurationMs": latestDurationMS, + "lastSaveError": cs.lastSaveError, } } diff --git a/pkg/cron/service_test.go b/pkg/cron/service_test.go index ef64fee..a8f0b4b 100644 --- a/pkg/cron/service_test.go +++ b/pkg/cron/service_test.go @@ -72,6 +72,31 @@ func TestNextSleepDuration(t *testing.T) { } } +func TestNextSleepDuration_UsesProvidedNow(t *testing.T) { + cs := &CronService{ + opts: RuntimeOptions{ + RunLoopMinSleep: 1 * time.Second, + RunLoopMaxSleep: 30 * time.Second, + }, + running: map[string]struct{}{}, + store: &CronStore{Jobs: []CronJob{}}, + } + + now := time.UnixMilli(10_000) + next := int64(15_000) + cs.store.Jobs = []CronJob{{ + ID: "1", + Enabled: true, + State: CronJobState{ + NextRunAtMS: &next, + }, + }} + + if got := cs.nextSleepDuration(now); got != 5*time.Second { + t.Fatalf("expected 5s sleep from provided now, got %s", got) + } +} + func TestCheckJobs_NoConcurrentRunForSameJob(t *testing.T) { var running int32 var maxRunning int32