Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for using mock clock in cron #473

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 17 additions & 6 deletions chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"runtime"
"sync"
"time"

"github.com/benbjohnson/clock"
)

// JobWrapper decorates the given Job with some behavior.
Expand All @@ -24,9 +26,12 @@ func NewChain(c ...JobWrapper) Chain {
// Then decorates the given job with all JobWrappers in the chain.
//
// This:
// NewChain(m1, m2, m3).Then(job)
//
// NewChain(m1, m2, m3).Then(job)
//
// is equivalent to:
// m1(m2(m3(job)))
//
// m1(m2(m3(job)))
func (c Chain) Then(j Job) Job {
for i := range c.wrappers {
j = c.wrappers[len(c.wrappers)-i-1](j)
Expand Down Expand Up @@ -59,13 +64,19 @@ func Recover(logger Logger) JobWrapper {
// previous one is complete. Jobs running after a delay of more than a minute
// have the delay logged at Info.
func DelayIfStillRunning(logger Logger) JobWrapper {
return DelayIfStillRunningWithClock(logger, clock.New())
}

// DelayIfStillRunningWithClock behaves identically to DelayIfStillRunning but
// uses the provided Clock for measuring the delay, for use in testing.
func DelayIfStillRunningWithClock(logger Logger, clk clock.Clock) JobWrapper {
return func(j Job) Job {
var mu sync.Mutex
return FuncJob(func() {
start := time.Now()
start := clk.Now()
mu.Lock()
defer mu.Unlock()
if dur := time.Since(start); dur > time.Minute {
if dur := clk.Since(start); dur > time.Minute {
logger.Info("delay", "duration", dur)
}
j.Run()
Expand All @@ -77,13 +88,13 @@ func DelayIfStillRunning(logger Logger) JobWrapper {
// still running. It logs skips to the given logger at Info level.
func SkipIfStillRunning(logger Logger) JobWrapper {
return func(j Job) Job {
var ch = make(chan struct{}, 1)
ch := make(chan struct{}, 1)
ch <- struct{}{}
return FuncJob(func() {
select {
case v := <-ch:
defer func() { ch <- v }()
j.Run()
ch <- v
default:
logger.Info("skip")
}
Expand Down
32 changes: 16 additions & 16 deletions chain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func TestChainDelayIfStillRunning(t *testing.T) {
var j countJob
wrappedJob := NewChain(DelayIfStillRunning(DiscardLogger)).Then(&j)
go wrappedJob.Run()
time.Sleep(2 * time.Millisecond) // Give the job 2ms to complete.
time.Sleep(50 * time.Millisecond) // Give the job 50 ms to complete.
if c := j.Done(); c != 1 {
t.Errorf("expected job run once, immediately, got %d", c)
}
Expand All @@ -115,35 +115,35 @@ func TestChainDelayIfStillRunning(t *testing.T) {
wrappedJob := NewChain(DelayIfStillRunning(DiscardLogger)).Then(&j)
go func() {
go wrappedJob.Run()
time.Sleep(time.Millisecond)
time.Sleep(10* time.Millisecond)
go wrappedJob.Run()
}()
time.Sleep(3 * time.Millisecond) // Give both jobs 3ms to complete.
time.Sleep(100 * time.Millisecond) // Give both jobs 100 ms to complete.
if c := j.Done(); c != 2 {
t.Errorf("expected job run twice, immediately, got %d", c)
}
})

t.Run("second run delayed if first not done", func(t *testing.T) {
var j countJob
j.delay = 10 * time.Millisecond
j.delay = 100 * time.Millisecond
wrappedJob := NewChain(DelayIfStillRunning(DiscardLogger)).Then(&j)
go func() {
go wrappedJob.Run()
time.Sleep(time.Millisecond)
time.Sleep(10 * time.Millisecond)
go wrappedJob.Run()
}()

// After 5ms, the first job is still in progress, and the second job was
// After 50 ms, the first job is still in progress, and the second job was
// run but should be waiting for it to finish.
time.Sleep(5 * time.Millisecond)
time.Sleep(50 * time.Millisecond)
started, done := j.Started(), j.Done()
if started != 1 || done != 0 {
t.Error("expected first job started, but not finished, got", started, done)
}

// Verify that the second job completes.
time.Sleep(25 * time.Millisecond)
time.Sleep(200 * time.Millisecond)
started, done = j.Started(), j.Done()
if started != 2 || done != 2 {
t.Error("expected both jobs done, got", started, done)
Expand All @@ -158,7 +158,7 @@ func TestChainSkipIfStillRunning(t *testing.T) {
var j countJob
wrappedJob := NewChain(SkipIfStillRunning(DiscardLogger)).Then(&j)
go wrappedJob.Run()
time.Sleep(2 * time.Millisecond) // Give the job 2ms to complete.
time.Sleep(50 * time.Millisecond) // Give the job 50ms to complete.
if c := j.Done(); c != 1 {
t.Errorf("expected job run once, immediately, got %d", c)
}
Expand All @@ -169,35 +169,35 @@ func TestChainSkipIfStillRunning(t *testing.T) {
wrappedJob := NewChain(SkipIfStillRunning(DiscardLogger)).Then(&j)
go func() {
go wrappedJob.Run()
time.Sleep(time.Millisecond)
time.Sleep(10* time.Millisecond)
go wrappedJob.Run()
}()
time.Sleep(3 * time.Millisecond) // Give both jobs 3ms to complete.
time.Sleep(100 * time.Millisecond) // Give both jobs 100ms to complete.
if c := j.Done(); c != 2 {
t.Errorf("expected job run twice, immediately, got %d", c)
}
})

t.Run("second run skipped if first not done", func(t *testing.T) {
var j countJob
j.delay = 10 * time.Millisecond
j.delay = 100 * time.Millisecond
wrappedJob := NewChain(SkipIfStillRunning(DiscardLogger)).Then(&j)
go func() {
go wrappedJob.Run()
time.Sleep(time.Millisecond)
time.Sleep(10 * time.Millisecond)
go wrappedJob.Run()
}()

// After 5ms, the first job is still in progress, and the second job was
// After 50ms, the first job is still in progress, and the second job was
// aleady skipped.
time.Sleep(5 * time.Millisecond)
time.Sleep(50 * time.Millisecond)
started, done := j.Started(), j.Done()
if started != 1 || done != 0 {
t.Error("expected first job started, but not finished, got", started, done)
}

// Verify that the first job completes and second does not run.
time.Sleep(25 * time.Millisecond)
time.Sleep(200 * time.Millisecond)
started, done = j.Started(), j.Done()
if started != 1 || done != 1 {
t.Error("expected second job skipped, got", started, done)
Expand Down
32 changes: 18 additions & 14 deletions cron.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"sort"
"sync"
"time"

"github.com/benbjohnson/clock"
)

// Cron keeps track of any number of entries, invoking the associated func as
Expand All @@ -24,6 +26,7 @@ type Cron struct {
parser ScheduleParser
nextID EntryID
jobWaiter sync.WaitGroup
clk clock.Clock
}

// ScheduleParser is an interface for schedule spec parsers that return a Schedule
Expand Down Expand Up @@ -97,17 +100,17 @@ func (s byTime) Less(i, j int) bool {
//
// Available Settings
//
// Time Zone
// Description: The time zone in which schedules are interpreted
// Default: time.Local
// Time Zone
// Description: The time zone in which schedules are interpreted
// Default: time.Local
//
// Parser
// Description: Parser converts cron spec strings into cron.Schedules.
// Default: Accepts this spec: https://en.wikipedia.org/wiki/Cron
// Parser
// Description: Parser converts cron spec strings into cron.Schedules.
// Default: Accepts this spec: https://en.wikipedia.org/wiki/Cron
//
// Chain
// Description: Wrap submitted jobs to customize behavior.
// Default: A chain that recovers panics and logs them to stderr.
// Chain
// Description: Wrap submitted jobs to customize behavior.
// Default: A chain that recovers panics and logs them to stderr.
//
// See "cron.With*" to modify the default behavior.
func New(opts ...Option) *Cron {
Expand All @@ -123,6 +126,7 @@ func New(opts ...Option) *Cron {
logger: DefaultLogger,
location: time.Local,
parser: standardParser,
clk: clock.New(),
}
for _, opt := range opts {
opt(c)
Expand Down Expand Up @@ -250,13 +254,13 @@ func (c *Cron) run() {
// Determine the next entry to run.
sort.Sort(byTime(c.entries))

var timer *time.Timer
var timer *clock.Timer
if len(c.entries) == 0 || c.entries[0].Next.IsZero() {
// If there are no entries yet, just sleep - it still handles new entries
// and stop requests.
timer = time.NewTimer(100000 * time.Hour)
timer = c.clk.Timer(100000 * time.Hour)
} else {
timer = time.NewTimer(c.entries[0].Next.Sub(now))
timer = c.clk.Timer(c.entries[0].Next.Sub(now))
}

for {
Expand Down Expand Up @@ -315,7 +319,7 @@ func (c *Cron) startJob(j Job) {

// now returns current time in c location
func (c *Cron) now() time.Time {
return time.Now().In(c.location)
return c.clk.Now().In(c.location)
}

// Stop stops the cron scheduler if it is running; otherwise it does nothing.
Expand All @@ -337,7 +341,7 @@ func (c *Cron) Stop() context.Context {

// entrySnapshot returns a copy of the current cron entry list.
func (c *Cron) entrySnapshot() []Entry {
var entries = make([]Entry, len(c.entries))
entries := make([]Entry, len(c.entries))
for i, e := range c.entries {
entries[i] = *e
}
Expand Down
24 changes: 22 additions & 2 deletions cron_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
"sync/atomic"
"testing"
"time"

"github.com/benbjohnson/clock"
)

// Many tests schedule a job for every second, and then wait at most a second
Expand Down Expand Up @@ -388,7 +390,7 @@ func TestBlockingRun(t *testing.T) {
cron := newWithSeconds()
cron.AddFunc("* * * * * ?", func() { wg.Done() })

var unblockChan = make(chan struct{})
unblockChan := make(chan struct{})

go func() {
cron.Run()
Expand All @@ -407,7 +409,7 @@ func TestBlockingRun(t *testing.T) {

// Test that double-running is a no-op
func TestStartNoop(t *testing.T) {
var tickChan = make(chan struct{}, 2)
tickChan := make(chan struct{}, 2)

cron := newWithSeconds()
cron.AddFunc("* * * * * ?", func() {
Expand Down Expand Up @@ -667,8 +669,26 @@ func TestStopAndWait(t *testing.T) {
case <-time.After(time.Millisecond):
t.Error("context not done even when cron Stop is completed")
}
})
}

func TestMockClock(t *testing.T) {
clk := clock.NewMock()
clk.Set(time.Now())
cron := New(WithClock(clk))
counter := 0
cron.AddFunc("@every 1s", func() {
counter += 1
})
cron.Start()
defer cron.Stop()
for i := 0; i <= 10; i++ {
clk.Add(1 * time.Second)
}
time.Sleep(100 * time.Millisecond)
if counter != 10 {
t.Errorf("expected 10 calls, got %d", counter)
}
}

func TestMultiThreadedStartAndStop(t *testing.T) {
Expand Down
Loading