Skip to content

Commit

Permalink
Adding support for enqueue unique by specified key (gocraft#110)
Browse files Browse the repository at this point in the history
  • Loading branch information
David Long authored and shdunning committed Sep 12, 2018
1 parent 5db65ca commit c85b71e
Show file tree
Hide file tree
Showing 6 changed files with 270 additions and 64 deletions.
8 changes: 8 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,14 @@ job, err = enqueuer.EnqueueUnique("clear_cache", work.Q{"object_id_": "123"}) //
job, err = enqueuer.EnqueueUniqueIn("clear_cache", 300, work.Q{"object_id_": "789"}) // job != nil (diff id)
```
Alternatively, you can provide your own key for making a job unique. When another job is enqueued with the same key as a job already in the queue, it will simply update the arguments.
```go
enqueuer := work.NewEnqueuer("my_app_namespace", redisPool)
job, err := enqueuer.EnqueueUniqueByKey("clear_cache", work.Q{"object_id_": "123"}, map[string]interface{}{"my_key": "586"})
job, err = enqueuer.EnqueueUniqueInByKey("clear_cache", 300, work.Q{"object_id_": "789"}, map[string]interface{}{"my_key": "586"})
```
For information on how this map will be serialized to form a unique key, see (https://golang.org/pkg/encoding/json/#Marshal).
### Periodic Enqueueing (Cron)
You can periodically enqueue jobs on your gocraft/work cluster using your worker pool. The [scheduling specification](https://godoc.org/github.com/robfig/cron#hdr-CRON_Expression_Format) uses a Cron syntax where the fields represent seconds, minutes, hours, day of the month, month, and week of the day, respectively. Even if you have multiple worker pools on different machines, they'll all coordinate and only enqueue your job once.
Expand Down
140 changes: 85 additions & 55 deletions enqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,83 +105,48 @@ func (e *Enqueuer) EnqueueIn(jobName string, secondsFromNow int64, args map[stri
// In order to add robustness to the system, jobs are only unique for 24 hours after they're enqueued. This is mostly relevant for scheduled jobs.
// EnqueueUnique returns the job if it was enqueued and nil if it wasn't
func (e *Enqueuer) EnqueueUnique(jobName string, args map[string]interface{}) (*Job, error) {
uniqueKey, err := redisKeyUniqueJob(e.Namespace, jobName, args)
if err != nil {
return nil, err
}
return e.EnqueueUniqueByKey(jobName, args, nil)
}

job := &Job{
Name: jobName,
ID: makeIdentifier(),
EnqueuedAt: nowEpochSeconds(),
Args: args,
Unique: true,
}
// EnqueueUniqueIn enqueues a unique job in the scheduled job queue for execution in secondsFromNow seconds. See EnqueueUnique for the semantics of unique jobs.
func (e *Enqueuer) EnqueueUniqueIn(jobName string, secondsFromNow int64, args map[string]interface{}) (*ScheduledJob, error) {
return e.EnqueueUniqueInByKey(jobName, secondsFromNow, args, nil)
}

rawJSON, err := job.serialize()
// EnqueueUniqueByKey enqueues a job unless a job is already enqueued with the same name and key, updating arguments.
// The already-enqueued job can be in the normal work queue or in the scheduled job queue.
// Once a worker begins processing a job, another job with the same name and key can be enqueued again.
// Any failed jobs in the retry queue or dead queue don't count against the uniqueness -- so if a job fails and is retried, two unique jobs with the same name and arguments can be enqueued at once.
// In order to add robustness to the system, jobs are only unique for 24 hours after they're enqueued. This is mostly relevant for scheduled jobs.
// EnqueueUniqueByKey returns the job if it was enqueued and nil if it wasn't
func (e *Enqueuer) EnqueueUniqueByKey(jobName string, args map[string]interface{}, keyMap map[string]interface{}) (*Job, error) {
enqueue, job, err := e.uniqueJobHelper(jobName, args, keyMap)
if err != nil {
return nil, err
}

conn := e.Pool.Get()
defer conn.Close()

if err := e.addToKnownJobs(conn, jobName); err != nil {
return nil, err
}

scriptArgs := make([]interface{}, 0, 3)
scriptArgs = append(scriptArgs, e.queuePrefix+jobName) // KEY[1]
scriptArgs = append(scriptArgs, uniqueKey) // KEY[2]
scriptArgs = append(scriptArgs, rawJSON) // ARGV[1]
res, err := enqueue(nil)

res, err := redis.String(e.enqueueUniqueScript.Do(conn, scriptArgs...))
if res == "ok" && err == nil {
return job, nil
}
return nil, err
}

// EnqueueUniqueIn enqueues a unique job in the scheduled job queue for execution in secondsFromNow seconds. See EnqueueUnique for the semantics of unique jobs.
func (e *Enqueuer) EnqueueUniqueIn(jobName string, secondsFromNow int64, args map[string]interface{}) (*ScheduledJob, error) {
uniqueKey, err := redisKeyUniqueJob(e.Namespace, jobName, args)
if err != nil {
return nil, err
}

job := &Job{
Name: jobName,
ID: makeIdentifier(),
EnqueuedAt: nowEpochSeconds(),
Args: args,
Unique: true,
}

rawJSON, err := job.serialize()
// EnqueueUniqueInByKey enqueues a job in the scheduled job queue that is unique on specified key for execution in secondsFromNow seconds. See EnqueueUnique for the semantics of unique jobs.
// Subsequent calls with same key will update arguments
func (e *Enqueuer) EnqueueUniqueInByKey(jobName string, secondsFromNow int64, args map[string]interface{}, keyMap map[string]interface{}) (*ScheduledJob, error) {
enqueue, job, err := e.uniqueJobHelper(jobName, args, keyMap)
if err != nil {
return nil, err
}

conn := e.Pool.Get()
defer conn.Close()

if err := e.addToKnownJobs(conn, jobName); err != nil {
return nil, err
}

scheduledJob := &ScheduledJob{
RunAt: nowEpochSeconds() + secondsFromNow,
Job: job,
}

scriptArgs := make([]interface{}, 0, 4)
scriptArgs = append(scriptArgs, redisKeyScheduled(e.Namespace)) // KEY[1]
scriptArgs = append(scriptArgs, uniqueKey) // KEY[2]
scriptArgs = append(scriptArgs, rawJSON) // ARGV[1]
scriptArgs = append(scriptArgs, scheduledJob.RunAt) // ARGV[2]

res, err := redis.String(e.enqueueUniqueInScript.Do(conn, scriptArgs...))

res, err := enqueue(&scheduledJob.RunAt)
if res == "ok" && err == nil {
return scheduledJob, nil
}
Expand Down Expand Up @@ -213,3 +178,68 @@ func (e *Enqueuer) addToKnownJobs(conn redis.Conn, jobName string) error {

return nil
}

type enqueueFnType func(*int64) (string, error)

func (e *Enqueuer) uniqueJobHelper(jobName string, args map[string]interface{}, keyMap map[string]interface{}) (enqueueFnType, *Job, error) {
useDefaultKeys := false
if keyMap == nil {
useDefaultKeys = true
keyMap = args
}

uniqueKey, err := redisKeyUniqueJob(e.Namespace, jobName, keyMap)
if err != nil {
return nil, nil, err
}

job := &Job{
Name: jobName,
ID: makeIdentifier(),
EnqueuedAt: nowEpochSeconds(),
Args: args,
Unique: true,
UniqueKey: uniqueKey,
}

rawJSON, err := job.serialize()
if err != nil {
return nil, nil, err
}

enqueueFn := func(runAt *int64) (string, error) {
conn := e.Pool.Get()
defer conn.Close()

if err := e.addToKnownJobs(conn, jobName); err != nil {
return "", err
}

scriptArgs := []interface{}{}
script := e.enqueueUniqueScript

scriptArgs = append(scriptArgs, e.queuePrefix+jobName) // KEY[1]
scriptArgs = append(scriptArgs, uniqueKey) // KEY[2]
scriptArgs = append(scriptArgs, rawJSON) // ARGV[1]
if useDefaultKeys {
// keying on arguments so arguments can't be updated
// we'll just get them off the original job so to save space, make this "1"
scriptArgs = append(scriptArgs, "1") // ARGV[2]
} else {
// we'll use this for updated arguments since the job on the queue
// doesn't get updated
scriptArgs = append(scriptArgs, rawJSON) // ARGV[2]
}

if runAt != nil { // Scheduled job so different job queue with additional arg
scriptArgs[0] = redisKeyScheduled(e.Namespace) // KEY[1]
scriptArgs = append(scriptArgs, *runAt) // ARGV[3]

script = e.enqueueUniqueInScript
}

return redis.String(script.Do(conn, scriptArgs...))
}

return enqueueFn, job, nil
}
124 changes: 124 additions & 0 deletions enqueue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,3 +230,127 @@ func TestEnqueueUniqueIn(t *testing.T) {
assert.NoError(t, err)
assert.NotNil(t, job)
}

func TestEnqueueUniqueByKey(t *testing.T) {
var arg3 string
var arg4 string

pool := newTestPool(":6379")
ns := "work"
cleanKeyspace(ns, pool)
enqueuer := NewEnqueuer(ns, pool)
var mutex = &sync.Mutex{}
job, err := enqueuer.EnqueueUniqueByKey("wat", Q{"a": 3, "b": "foo"}, Q{"key": "123"})
assert.NoError(t, err)
if assert.NotNil(t, job) {
assert.Equal(t, "wat", job.Name)
assert.True(t, len(job.ID) > 10) // Something is in it
assert.True(t, job.EnqueuedAt > (time.Now().Unix()-10)) // Within 10 seconds
assert.True(t, job.EnqueuedAt < (time.Now().Unix()+10)) // Within 10 seconds
assert.Equal(t, "foo", job.ArgString("b"))
assert.EqualValues(t, 3, job.ArgInt64("a"))
assert.NoError(t, job.ArgError())
}

job, err = enqueuer.EnqueueUniqueByKey("wat", Q{"a": 3, "b": "bar"}, Q{"key": "123"})
assert.NoError(t, err)
assert.Nil(t, job)

job, err = enqueuer.EnqueueUniqueByKey("wat", Q{"a": 4, "b": "baz"}, Q{"key": "124"})
assert.NoError(t, err)
assert.NotNil(t, job)

job, err = enqueuer.EnqueueUniqueByKey("taw", nil, Q{"key": "125"})
assert.NoError(t, err)
assert.NotNil(t, job)

// Process the queues. Ensure the right number of jobs were processed
var wats, taws int64
wp := NewWorkerPool(TestContext{}, 3, ns, pool)
wp.JobWithOptions("wat", JobOptions{Priority: 1, MaxFails: 1}, func(job *Job) error {
mutex.Lock()
argA := job.Args["a"].(float64)
argB := job.Args["b"].(string)
if argA == 3 {
arg3 = argB
}
if argA == 4 {
arg4 = argB
}

wats++
mutex.Unlock()
return nil
})
wp.JobWithOptions("taw", JobOptions{Priority: 1, MaxFails: 1}, func(job *Job) error {
mutex.Lock()
taws++
mutex.Unlock()
return fmt.Errorf("ohno")
})
wp.Start()
wp.Drain()
wp.Stop()

assert.EqualValues(t, 2, wats)
assert.EqualValues(t, 1, taws)

// Check that arguments got updated to new value
assert.EqualValues(t, "bar", arg3)
assert.EqualValues(t, "baz", arg4)

// Enqueue again. Ensure we can.
job, err = enqueuer.EnqueueUniqueByKey("wat", Q{"a": 1, "b": "cool"}, Q{"key": "123"})
assert.NoError(t, err)
assert.NotNil(t, job)

job, err = enqueuer.EnqueueUniqueByKey("wat", Q{"a": 1, "b": "coolio"}, Q{"key": "124"})
assert.NoError(t, err)
assert.NotNil(t, job)

// Even though taw resulted in an error, we should still be able to re-queue it.
// This could result in multiple taws enqueued at the same time in a production system.
job, err = enqueuer.EnqueueUniqueByKey("taw", nil, Q{"key": "123"})
assert.NoError(t, err)
assert.NotNil(t, job)
}

func EnqueueUniqueInByKey(t *testing.T) {
pool := newTestPool(":6379")
ns := "work"
cleanKeyspace(ns, pool)
enqueuer := NewEnqueuer(ns, pool)

// Enqueue two unique jobs -- ensure one job sticks.
job, err := enqueuer.EnqueueUniqueInByKey("wat", 300, Q{"a": 1, "b": "cool"}, Q{"key": "123"})
assert.NoError(t, err)
if assert.NotNil(t, job) {
assert.Equal(t, "wat", job.Name)
assert.True(t, len(job.ID) > 10) // Something is in it
assert.True(t, job.EnqueuedAt > (time.Now().Unix()-10)) // Within 10 seconds
assert.True(t, job.EnqueuedAt < (time.Now().Unix()+10)) // Within 10 seconds
assert.Equal(t, "cool", job.ArgString("b"))
assert.EqualValues(t, 1, job.ArgInt64("a"))
assert.NoError(t, job.ArgError())
assert.EqualValues(t, job.EnqueuedAt+300, job.RunAt)
}

job, err = enqueuer.EnqueueUniqueInByKey("wat", 10, Q{"a": 1, "b": "cool"}, Q{"key": "123"})
assert.NoError(t, err)
assert.Nil(t, job)

// Get the job
score, j := jobOnZset(pool, redisKeyScheduled(ns))

assert.True(t, score > time.Now().Unix()+290) // We don't want to overwrite the time
assert.True(t, score <= time.Now().Unix()+300)

assert.Equal(t, "wat", j.Name)
assert.True(t, len(j.ID) > 10) // Something is in it
assert.True(t, j.EnqueuedAt > (time.Now().Unix()-10)) // Within 10 seconds
assert.True(t, j.EnqueuedAt < (time.Now().Unix()+10)) // Within 10 seconds
assert.Equal(t, "cool", j.ArgString("b"))
assert.EqualValues(t, 1, j.ArgInt64("a"))
assert.NoError(t, j.ArgError())
assert.True(t, j.Unique)
}
1 change: 1 addition & 0 deletions job.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ type Job struct {
EnqueuedAt int64 `json:"t"`
Args map[string]interface{} `json:"args"`
Unique bool `json:"unique,omitempty"`
UniqueKey string `json:"unique_key,omitempty"`

// Inputs when retrying
Fails int64 `json:"fails,omitempty"` // number of times this job has failed
Expand Down
14 changes: 10 additions & 4 deletions redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -348,22 +348,28 @@ return requeuedCount
// KEYS[1] = job queue to push onto
// KEYS[2] = Unique job's key. Test for existence and set if we push.
// ARGV[1] = job
// ARGV[2] = updated job or just a 1 if arguments don't update
var redisLuaEnqueueUnique = `
if redis.call('set', KEYS[2], '1', 'NX', 'EX', '86400') then
if redis.call('set', KEYS[2], ARGV[2], 'NX', 'EX', '86400') then
redis.call('lpush', KEYS[1], ARGV[1])
return 'ok'
else
redis.call('set', KEYS[2], ARGV[2], 'EX', '86400')
end
return 'dup'
`

// KEYS[1] = scheduled job queue
// KEYS[2] = Unique job's key. Test for existence and set if we push.
// ARGV[1] = job
// ARGV[2] = epoch seconds for job to be run at
// ARGV[2] = updated job or just a 1 if arguments don't update
// ARGV[3] = epoch seconds for job to be run at
var redisLuaEnqueueUniqueIn = `
if redis.call('set', KEYS[2], '1', 'NX', 'EX', '86400') then
redis.call('zadd', KEYS[1], ARGV[2], ARGV[1])
if redis.call('set', KEYS[2], ARGV[2], 'NX', 'EX', '86400') then
redis.call('zadd', KEYS[1], ARGV[3], ARGV[1])
return 'ok'
else
redis.call('set', KEYS[2], ARGV[2], 'EX', '86400')
end
return 'dup'
`
Loading

0 comments on commit c85b71e

Please sign in to comment.