fix: harden cron persistence error visibility and timing testability

This commit is contained in:
野生派Coder~
2026-02-15 23:27:13 +08:00
parent 05821ff2fd
commit 7d89ee51be
2 changed files with 50 additions and 11 deletions

View File

@@ -118,13 +118,14 @@ type CronStore struct {
type JobHandler func(job *CronJob) (string, error) type JobHandler func(job *CronJob) (string, error)
type CronService struct { type CronService struct {
storePath string storePath string
store *CronStore store *CronStore
onJob JobHandler onJob JobHandler
opts RuntimeOptions opts RuntimeOptions
running map[string]struct{} running map[string]struct{}
mu sync.RWMutex lastSaveError string
runner *lifecycle.LoopRunner mu sync.RWMutex
runner *lifecycle.LoopRunner
} }
func NewCronService(storePath string, onJob JobHandler) *CronService { func NewCronService(storePath string, onJob JobHandler) *CronService {
@@ -244,7 +245,11 @@ func (cs *CronService) checkJobs() {
cs.mu.Lock() cs.mu.Lock()
defer cs.mu.Unlock() defer cs.mu.Unlock()
_ = cs.saveStore() if err := cs.saveStore(); err != nil {
cs.lastSaveError = err.Error()
} else {
cs.lastSaveError = ""
}
} }
func (cs *CronService) executeJobByID(jobID string) bool { func (cs *CronService) executeJobByID(jobID string) bool {
@@ -503,7 +508,11 @@ func (cs *CronService) RemoveJob(jobID string) bool {
func (cs *CronService) removeJobUnsafe(jobID string) bool { func (cs *CronService) removeJobUnsafe(jobID string) bool {
removed := cs.removeJobByIDUnsafe(jobID) removed := cs.removeJobByIDUnsafe(jobID)
if removed { if removed {
_ = cs.saveStore() if err := cs.saveStore(); err != nil {
cs.lastSaveError = err.Error()
} else {
cs.lastSaveError = ""
}
} }
return removed return removed
} }
@@ -537,7 +546,7 @@ func (cs *CronService) nextSleepDuration(now time.Time) time.Duration {
return cs.opts.RunLoopMaxSleep return cs.opts.RunLoopMaxSleep
} }
sleep := time.Until(time.UnixMilli(*nextWake)) sleep := time.UnixMilli(*nextWake).Sub(now)
if sleep < cs.opts.RunLoopMinSleep { if sleep < cs.opts.RunLoopMinSleep {
return cs.opts.RunLoopMinSleep return cs.opts.RunLoopMinSleep
} }
@@ -630,7 +639,11 @@ func (cs *CronService) EnableJob(jobID string, enabled bool) *CronJob {
job.State.NextRunAtMS = nil job.State.NextRunAtMS = nil
} }
cs.saveStore() if err := cs.saveStore(); err != nil {
cs.lastSaveError = err.Error()
} else {
cs.lastSaveError = ""
}
return job return job
} }
} }
@@ -688,6 +701,7 @@ func (cs *CronService) Status() map[string]interface{} {
"totalFailures": totalFailures, "totalFailures": totalFailures,
"latestDelayMs": latestDelayMS, "latestDelayMs": latestDelayMS,
"latestDurationMs": latestDurationMS, "latestDurationMs": latestDurationMS,
"lastSaveError": cs.lastSaveError,
} }
} }

View File

@@ -72,6 +72,31 @@ func TestNextSleepDuration(t *testing.T) {
} }
} }
func TestNextSleepDuration_UsesProvidedNow(t *testing.T) {
cs := &CronService{
opts: RuntimeOptions{
RunLoopMinSleep: 1 * time.Second,
RunLoopMaxSleep: 30 * time.Second,
},
running: map[string]struct{}{},
store: &CronStore{Jobs: []CronJob{}},
}
now := time.UnixMilli(10_000)
next := int64(15_000)
cs.store.Jobs = []CronJob{{
ID: "1",
Enabled: true,
State: CronJobState{
NextRunAtMS: &next,
},
}}
if got := cs.nextSleepDuration(now); got != 5*time.Second {
t.Fatalf("expected 5s sleep from provided now, got %s", got)
}
}
func TestCheckJobs_NoConcurrentRunForSameJob(t *testing.T) { func TestCheckJobs_NoConcurrentRunForSameJob(t *testing.T) {
var running int32 var running int32
var maxRunning int32 var maxRunning int32