Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Jitter #525

Open
wants to merge 12 commits into
base: v3
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,4 @@ _cgo_export.*
_testmain.go

*.exe
.vscode
2 changes: 1 addition & 1 deletion constantdelay_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func TestConstantDelayNext(t *testing.T) {
{"Mon Jul 9 23:35:51 2012", 25*time.Hour + 44*time.Minute + 24*time.Second, "Thu Jul 11 01:20:15 2012"},

// Wrap around months
{"Mon Jul 9 23:35 2012", 91*24*time.Hour + 25*time.Minute, "Thu Oct 9 00:00 2012"},
{"Mon Jun 9 23:35 2012", 91*24*time.Hour + 25*time.Minute, "Thu Sep 9 00:00 2012"}, // Don't do JUL->OCT otherwise daylight savings breaks the test in some time zones...

// Wrap around minute, hour, day, month, and year
{"Mon Dec 31 23:59:45 2012", 15 * time.Second, "Tue Jan 1 00:00:00 2013"},
Expand Down
133 changes: 101 additions & 32 deletions cron.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package cron

import (
"context"
"math"
"math/rand"
"sort"
"sync"
"time"
Expand All @@ -11,19 +13,21 @@ import (
// specified by the schedule. It may be started, stopped, and the entries may
// be inspected while running.
type Cron struct {
entries []*Entry
chain Chain
stop chan struct{}
add chan *Entry
remove chan EntryID
snapshot chan chan []Entry
running bool
logger Logger
runningMu sync.Mutex
location *time.Location
parser Parser
nextID EntryID
jobWaiter sync.WaitGroup
entries []*Entry
chain Chain
stop chan struct{}
add chan *Entry
remove chan EntryID
snapshot chan chan []Entry
running bool
logger Logger
runningMu sync.Mutex
location *time.Location
parser Parser
nextID EntryID
jobWaiter sync.WaitGroup
timeCallback func() time.Time
recalculateTimer chan struct{}
}

// Job is an interface for submitted cron jobs.
Expand Down Expand Up @@ -64,11 +68,32 @@ type Entry struct {
// It is kept around so that user code that needs to get at the job later,
// e.g. via Entries() can do so.
Job Job

// Add some jitter to the Entry's scheduled time. The actual jitter used
// will be randomly chosen within the range [0, Jitter]. The jitter will
// always be converted to a positive duration.
Jitter time.Duration
}

// Valid returns true if this is not the zero entry.
func (e Entry) Valid() bool { return e.ID != 0 }

// Add a random amount of jitter to the input time using the maximum jitter
// amount. The calculated jitter is on the closed interval [0, jitterMaximum]
// where the only exception to this rule is if the jitter duration is the
// maximum value for an int64 which is on the closed interval [0, Int64Max].
func calculateJitteredTime(now time.Time, jitterMaximum time.Duration) time.Time {
result := now
if jitterMaximum > 0 {
val := jitterMaximum.Nanoseconds()
if val < math.MinInt64 {
val = jitterMaximum.Nanoseconds() + 1
}
result = result.Add(time.Duration(rand.Int63n(val)))
}
return result
}

// byTime is a wrapper for sorting the entry array by time
// (with zero time at the end).
type byTime []*Entry
Expand Down Expand Up @@ -107,17 +132,19 @@ func (s byTime) Less(i, j int) bool {
// See "cron.With*" to modify the default behavior.
func New(opts ...Option) *Cron {
c := &Cron{
entries: nil,
chain: NewChain(),
add: make(chan *Entry),
stop: make(chan struct{}),
snapshot: make(chan chan []Entry),
remove: make(chan EntryID),
running: false,
runningMu: sync.Mutex{},
logger: DefaultLogger,
location: time.Local,
parser: standardParser,
entries: nil,
chain: NewChain(),
add: make(chan *Entry),
stop: make(chan struct{}),
snapshot: make(chan chan []Entry),
remove: make(chan EntryID),
running: false,
runningMu: sync.Mutex{},
logger: DefaultLogger,
location: time.Local,
parser: standardParser,
timeCallback: nil,
recalculateTimer: make(chan struct{}),
}
for _, opt := range opts {
opt(c)
Expand All @@ -134,23 +161,49 @@ func (f FuncJob) Run() { f() }
// The spec is parsed using the time zone of this Cron instance as the default.
// An opaque ID is returned that can be used to later remove it.
func (c *Cron) AddFunc(spec string, cmd func()) (EntryID, error) {
return c.AddJob(spec, FuncJob(cmd))
return c.AddJobWithJitter(spec, FuncJob(cmd), 0)
}

// AddFuncWithJitter adds a func to the Cron to be run on the given schedule.
// The spec is parsed using the time zone of this Cron instance as the default.
// An opaque ID is returned that can be used to later remove it.
func (c *Cron) AddFuncWithJitter(spec string, cmd func(), jitter time.Duration) (EntryID, error) {
return c.AddJobWithJitter(spec, FuncJob(cmd), jitter)
}

// AddJob adds a Job to the Cron to be run on the given schedule.
// The spec is parsed using the time zone of this Cron instance as the default.
// An opaque ID is returned that can be used to later remove it.
func (c *Cron) AddJob(spec string, cmd Job) (EntryID, error) {
return c.AddJobWithJitter(spec, cmd, 0)
}

// AddJobWithJitter adds a Job to the Cron to be run on the given schedule with
// a specified amount of jitter for each invocation.
// The spec is parsed using the time zone of this Cron instance as the default.
// An opaque ID is returned that can be used to later remove it.
func (c *Cron) AddJobWithJitter(spec string, cmd Job, jitter time.Duration) (EntryID, error) {
schedule, err := c.parser.Parse(spec)
if err != nil {
return 0, err
}
return c.Schedule(schedule, cmd), nil
return c.ScheduleWithJitter(schedule, cmd, jitter), nil
}

// Schedule adds a Job to the Cron to be run on the given schedule.
// The job is wrapped with the configured Chain.
func (c *Cron) Schedule(schedule Schedule, cmd Job) EntryID {
return c.ScheduleWithJitter(schedule, cmd, 0)
}

// ScheduleWithJitter adds a Job to the Cron to be run on the given schedule
// with a specified amount of jitter for each invocation.
// The job is wrapped with the configured Chain.
func (c *Cron) ScheduleWithJitter(schedule Schedule, cmd Job, jitter time.Duration) EntryID {
if jitter < 0 {
jitter = -jitter
}

c.runningMu.Lock()
defer c.runningMu.Unlock()
c.nextID++
Expand All @@ -159,6 +212,7 @@ func (c *Cron) Schedule(schedule Schedule, cmd Job) EntryID {
Schedule: schedule,
WrappedJob: c.chain.Then(cmd),
Job: cmd,
Jitter: jitter,
}
if !c.running {
c.entries = append(c.entries, entry)
Expand Down Expand Up @@ -229,6 +283,10 @@ func (c *Cron) Run() {
c.run()
}

func (c *Cron) RecalculateNextEvent() {
c.recalculateTimer <- struct{}{}
}

// run the scheduler.. this is private just due to the need to synchronize
// access to the 'running' state variable.
func (c *Cron) run() {
Expand All @@ -237,7 +295,7 @@ func (c *Cron) run() {
// Figure out the next activation times for each entry.
now := c.now()
for _, entry := range c.entries {
entry.Next = entry.Schedule.Next(now)
entry.Next = calculateJitteredTime(entry.Schedule.Next(now), entry.Jitter)
c.logger.Info("schedule", "now", now, "entry", entry.ID, "next", entry.Next)
}

Expand All @@ -256,8 +314,9 @@ func (c *Cron) run() {

for {
select {
case now = <-timer.C:
now = now.In(c.location)
case <-timer.C:
// Note: we can't just use the value which comes back from timer.C, because that won't respect the timeCallback, if one is set.
now = c.now()
c.logger.Info("wake", "now", now)

// Run every entry whose next time was less than now
Expand All @@ -267,14 +326,14 @@ func (c *Cron) run() {
}
c.startJob(e.WrappedJob)
e.Prev = e.Next
e.Next = e.Schedule.Next(now)
e.Next = calculateJitteredTime(e.Schedule.Next(now), e.Jitter)
c.logger.Info("run", "now", now, "entry", e.ID, "next", e.Next)
}

case newEntry := <-c.add:
timer.Stop()
now = c.now()
newEntry.Next = newEntry.Schedule.Next(now)
newEntry.Next = calculateJitteredTime(newEntry.Schedule.Next(now), newEntry.Jitter)
c.entries = append(c.entries, newEntry)
c.logger.Info("added", "now", now, "entry", newEntry.ID, "next", newEntry.Next)

Expand All @@ -292,6 +351,9 @@ func (c *Cron) run() {
now = c.now()
c.removeEntry(id)
c.logger.Info("removed", "entry", id)
case <-c.recalculateTimer:
now = c.now()
break
}

break
Expand All @@ -310,7 +372,14 @@ func (c *Cron) startJob(j Job) {

// now returns current time in c location
func (c *Cron) now() time.Time {
return time.Now().In(c.location)
var result time.Time
if c.timeCallback == nil {
result = time.Now()
} else {
result = c.timeCallback()
}
result = result.In(c.location)
return result
}

// Stop stops the cron scheduler if it is running; otherwise it does nothing.
Expand Down
7 changes: 7 additions & 0 deletions option.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,3 +43,10 @@ func WithLogger(logger Logger) Option {
c.logger = logger
}
}

// WithTimeCallback uses the provided time function callback for getting the current time.
func WithTimeCallback(timeCallback func() time.Time) Option {
return func(c *Cron) {
c.timeCallback = timeCallback
}
}