Files
clawgo/pkg/cron/service_test.go

262 lines
6.4 KiB
Go

package cron
import (
"encoding/json"
"fmt"
"os"
"path/filepath"
"sync/atomic"
"testing"
"time"
)
// TestComputeAlignedEveryNext pins two results of computeAlignedEveryNext for
// a base of 1_000 and an interval of 1_000: a "now" of 1_100 must yield 2_000,
// and a "now" of 2_500 (more than one interval past the base) must yield 3_000.
func TestComputeAlignedEveryNext(t *testing.T) {
	const (
		anchor = int64(1_000)
		step   = int64(1_000)
	)

	// Slightly past the anchor: the next slot is one full step later.
	if got := computeAlignedEveryNext(anchor, 1_100, step); got != 2_000 {
		t.Fatalf("unexpected next: %d", got)
	}

	// More than one step past the anchor: the already-passed 2_000 slot must
	// not be returned.
	if got := computeAlignedEveryNext(anchor, 2_500, step); got != 3_000 {
		t.Fatalf("unexpected next after missed windows: %d", got)
	}
}
// TestComputeRetryBackoff checks computeRetryBackoff against the default
// runtime options: attempt 1 yields the base delay, attempt 2 yields twice the
// base, and attempt 20 is capped at the configured maximum.
func TestComputeRetryBackoff(t *testing.T) {
	defaults := DefaultRuntimeOptions()
	base, ceiling := defaults.RetryBackoffBase, defaults.RetryBackoffMax

	first := computeRetryBackoff(1, base, ceiling)
	if first != base {
		t.Fatalf("unexpected backoff for 1: %s", first)
	}

	second := computeRetryBackoff(2, base, ceiling)
	if second != 2*base {
		t.Fatalf("unexpected backoff for 2: %s", second)
	}

	// A large attempt count must not grow past the ceiling.
	if capped := computeRetryBackoff(20, base, ceiling); capped != ceiling {
		t.Fatalf("backoff should cap at %s, got %s", ceiling, capped)
	}
}
// TestNextSleepDuration checks the two clamped results of nextSleepDuration
// under the default runtime options: with no jobs it must return
// RunLoopMaxSleep, and with an enabled job due 100ms after "now" it must
// return RunLoopMinSleep.
//
// A single instant is captured and reused for both the job's due time and the
// "now" argument, so the 100ms gap is exact and does not depend on how long
// the test takes to run between clock reads.
func TestNextSleepDuration(t *testing.T) {
	cs := &CronService{
		opts:    DefaultRuntimeOptions(),
		running: map[string]struct{}{},
		store: &CronStore{
			Jobs: []CronJob{},
		},
	}

	now := time.Now()
	if got := cs.nextSleepDuration(now); got != cs.opts.RunLoopMaxSleep {
		t.Fatalf("expected max sleep when no jobs, got %s", got)
	}

	// Due time is derived from the same instant that is passed in below.
	soon := now.UnixMilli() + 100
	cs.store.Jobs = []CronJob{
		{
			ID:      "1",
			Enabled: true,
			State: CronJobState{
				NextRunAtMS: &soon,
			},
		},
	}

	if got := cs.nextSleepDuration(now); got != cs.opts.RunLoopMinSleep {
		t.Fatalf("expected min sleep for near due jobs, got %s", got)
	}
}
// TestNextSleepDuration_UsesProvidedNow verifies that nextSleepDuration
// measures from the time value it is given: with "now" at 10_000ms and the
// only enabled job due at 15_000ms, and sleep bounds of 1s..30s, the result
// must be exactly 5s.
func TestNextSleepDuration_UsesProvidedNow(t *testing.T) {
	dueAt := int64(15_000)
	svc := &CronService{
		opts: RuntimeOptions{
			RunLoopMinSleep: 1 * time.Second,
			RunLoopMaxSleep: 30 * time.Second,
		},
		running: map[string]struct{}{},
		store: &CronStore{
			Jobs: []CronJob{
				{
					ID:      "1",
					Enabled: true,
					State:   CronJobState{NextRunAtMS: &dueAt},
				},
			},
		},
	}

	// A fixed, synthetic instant: the expected value cannot come from the
	// wall clock.
	fixedNow := time.UnixMilli(10_000)

	got := svc.nextSleepDuration(fixedNow)
	if got != 5*time.Second {
		t.Fatalf("expected 5s sleep from provided now, got %s", got)
	}
}
// TestCheckJobs_NoConcurrentRunForSameJob calls checkJobs twice for a single
// due job, with the second call issued while the job handler from the first
// call is still executing. It asserts that the handler never ran more than
// once at a time and was invoked exactly once overall.
//
// The handler tracks its own concurrency with atomics: running is the number
// of in-flight invocations, maxRunning the highest value running ever reached,
// and calls the number of completed invocations.
func TestCheckJobs_NoConcurrentRunForSameJob(t *testing.T) {
	var (
		running    int32
		maxRunning int32
		calls      int32
	)

	// started receives a token when the handler is entered. It is buffered and
	// written with a non-blocking send so that an unexpected second invocation
	// cannot block inside the handler.
	started := make(chan struct{}, 1)

	storePath := filepath.Join(t.TempDir(), "jobs.json")
	cs := NewCronService(storePath, func(job *CronJob) (string, error) {
		cur := atomic.AddInt32(&running, 1)
		select {
		case started <- struct{}{}:
		default:
		}
		// Raise maxRunning to cur unless another invocation already recorded
		// an equal or higher value; retry if the CAS loses a race.
		for {
			prev := atomic.LoadInt32(&maxRunning)
			if cur <= prev || atomic.CompareAndSwapInt32(&maxRunning, prev, cur) {
				break
			}
		}
		// Hold the job "in flight" long enough for the second checkJobs call
		// to observe it.
		time.Sleep(120 * time.Millisecond)
		atomic.AddInt32(&running, -1)
		atomic.AddInt32(&calls, 1)
		return "ok", nil
	})
	cs.SetRuntimeOptions(RuntimeOptions{
		RunLoopMinSleep:              time.Second,
		RunLoopMaxSleep:              2 * time.Second,
		RetryBackoffBase:             time.Second,
		RetryBackoffMax:              5 * time.Second,
		MaxConsecutiveFailureRetries: 1,
		MaxWorkers:                   4,
	})

	// One enabled "every 60s" job whose next run is due right now.
	now := time.Now().UnixMilli()
	every := int64(60_000)
	cs.mu.Lock()
	cs.store.Jobs = []CronJob{
		{
			ID:      "job-1",
			Enabled: true,
			Schedule: CronSchedule{
				Kind:    "every",
				EveryMS: &every,
			},
			State: CronJobState{
				NextRunAtMS: &now,
			},
		},
	}
	cs.mu.Unlock()

	// The runner body only blocks until stopped; checkJobs is driven manually.
	cs.runner.Start(func(stop <-chan struct{}) { <-stop })
	defer cs.runner.Stop()

	// First pass runs in the background; firstDone lets the test join it so
	// the goroutine cannot outlive the test or race with runner.Stop.
	firstDone := make(chan struct{})
	go func() {
		defer close(firstDone)
		cs.checkJobs()
	}()

	// Wait until the handler is actually executing instead of guessing with a
	// fixed sleep, so the second pass deterministically overlaps the first.
	select {
	case <-started:
	case <-time.After(2 * time.Second):
		t.Fatalf("job handler was not invoked by the first checkJobs call")
	}

	cs.checkJobs()

	select {
	case <-firstDone:
	case <-time.After(2 * time.Second):
		t.Fatalf("first checkJobs call did not return")
	}

	// Settle time for any handler invocation that may still be in flight
	// (for example one wrongly started by the second pass).
	time.Sleep(220 * time.Millisecond)

	if atomic.LoadInt32(&maxRunning) > 1 {
		t.Fatalf("same job executed concurrently, max running=%d", atomic.LoadInt32(&maxRunning))
	}
	if atomic.LoadInt32(&calls) != 1 {
		t.Fatalf("expected exactly one execution, got %d", atomic.LoadInt32(&calls))
	}
}
// TestSetRuntimeOptions_AffectsRetryBackoff runs a job whose handler always
// fails and checks how far in the future checkJobs reschedules it. With
// backoff base and max both 2s the next run must land roughly 2s out; after
// SetRuntimeOptions switches both to 5s, a second failing run must land
// roughly 5s out.
func TestSetRuntimeOptions_AffectsRetryBackoff(t *testing.T) {
	storePath := filepath.Join(t.TempDir(), "jobs.json")
	svc := NewCronService(storePath, func(job *CronJob) (string, error) {
		return "", fmt.Errorf("fail")
	})

	// withBackoff builds the options used in both phases; only the backoff
	// (base and max set to the same value) differs between them.
	withBackoff := func(backoff time.Duration) RuntimeOptions {
		return RuntimeOptions{
			RunLoopMinSleep:              time.Second,
			RunLoopMaxSleep:              2 * time.Second,
			RetryBackoffBase:             backoff,
			RetryBackoffMax:              backoff,
			MaxConsecutiveFailureRetries: 10,
			MaxWorkers:                   1,
		}
	}

	// checkAndMeasure runs one checkJobs pass and returns the distance, in
	// milliseconds, from just before the pass to the job's new next-run time.
	checkAndMeasure := func() int64 {
		startMS := time.Now().UnixMilli()
		svc.checkJobs()
		svc.mu.RLock()
		defer svc.mu.RUnlock()
		return *svc.store.Jobs[0].State.NextRunAtMS - startMS
	}

	svc.SetRuntimeOptions(withBackoff(2 * time.Second))

	// Seed one enabled "every 60s" job that is due immediately.
	dueMS := time.Now().UnixMilli()
	intervalMS := int64(60_000)
	svc.mu.Lock()
	svc.store.Jobs = []CronJob{{
		ID:       "job-1",
		Enabled:  true,
		Schedule: CronSchedule{Kind: "every", EveryMS: &intervalMS},
		State:    CronJobState{NextRunAtMS: &dueMS},
	}}
	svc.mu.Unlock()

	// The runner body only blocks until stopped; checkJobs is driven manually.
	svc.runner.Start(func(stop <-chan struct{}) { <-stop })
	defer svc.runner.Stop()

	if delta := checkAndMeasure(); delta < 1800 || delta > 3500 {
		t.Fatalf("expected retry around 2s, got %dms", delta)
	}

	// Change the backoff on the live service, make the job due again, and
	// confirm the new value is the one applied.
	svc.SetRuntimeOptions(withBackoff(5 * time.Second))
	dueAgainMS := time.Now().UnixMilli()
	svc.mu.Lock()
	svc.store.Jobs[0].State.NextRunAtMS = &dueAgainMS
	svc.mu.Unlock()

	if delta := checkAndMeasure(); delta < 4800 || delta > 6500 {
		t.Fatalf("expected retry around 5s after hot update, got %dms", delta)
	}
}
// TestSaveStore_IsAtomicAndValidJSON adds one job through AddJob and then
// inspects the store on disk: no "<store>.tmp" file may remain, and the store
// file must parse as a CronStore containing exactly one job.
func TestSaveStore_IsAtomicAndValidJSON(t *testing.T) {
	storePath := filepath.Join(t.TempDir(), "jobs.json")
	svc := NewCronService(storePath, nil)

	// A one-shot schedule ten minutes out.
	runAt := time.Now().Add(10 * time.Minute).UnixMilli()
	schedule := CronSchedule{
		Kind: "at",
		AtMS: &runAt,
	}
	if _, err := svc.AddJob("atomic-write", schedule, "hello", false, "", ""); err != nil {
		t.Fatalf("AddJob failed: %v", err)
	}

	// A successful Stat means the temporary file still exists.
	tmpPath := storePath + ".tmp"
	if _, statErr := os.Stat(tmpPath); statErr == nil {
		t.Fatalf("unexpected temp file left behind")
	}

	raw, err := os.ReadFile(storePath)
	if err != nil {
		t.Fatalf("read store failed: %v", err)
	}

	var onDisk CronStore
	if err := json.Unmarshal(raw, &onDisk); err != nil {
		t.Fatalf("invalid json store: %v", err)
	}
	if n := len(onDisk.Jobs); n != 1 {
		t.Fatalf("expected 1 job, got %d", n)
	}
}