Skip to content

Commit f0fb3b3

Browse files
committed
FEATURE: unique jobs
Unique jobs let you only have one job with a given name/arguments on the queue at once. This includes both the normal queue as well as the scheduled queue.
1 parent 0ee600d commit f0fb3b3

File tree

7 files changed

+295
-7
lines changed

7 files changed

+295
-7
lines changed

README.md

+18
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ gocraft/work lets you enqueue and processes background jobs in Go. Jobs are dura
77
* Middleware on jobs -- good for metrics instrumentation, logging, etc.
88
* If a job fails, it will be retried a specified number of times.
99
* Schedule jobs to happen in the future.
10+
* Enqueue unique jobs so that only one job with a given name/arguments exists in the queue at once.
1011
* Web UI to manage failed jobs and observe the system.
1112
* Periodically enqueue jobs on a cron-like schedule.
1213

@@ -184,6 +185,17 @@ enqueuer.EnqueueIn("send_welcome_email", secondsInTheFuture, work.Q{"address": "
184185

185186
```
186187

188+
### Unique Jobs
189+
190+
You can enqueue unique jobs so that only one job with a given name/arguments exists in the queue at once. For instance, you might have a worker that expires the cache of an object. It doesn't make sense for multiple such jobs to exist at once. Also note that unique jobs are supported for normal enqueues as well as scheduled enqueues.
191+
192+
```go
193+
enqueuer := work.NewEnqueuer("my_app_namespace", redisPool)
194+
ok, err := enqueuer.EnqueueUnique("clear_cache", work.Q{"object_id_": "123"}) // ok=true
195+
ok, err = enqueuer.EnqueueUnique("clear_cache", work.Q{"object_id_": "123"}) // ok=false -- this duplicate job isn't enqueued.
196+
ok, err = enqueuer.EnqueueUniqueIn("clear_cache", 300, work.Q{"object_id_": "789"}) // ok=true (diff id)
197+
```
198+
187199
### Periodic Enqueueing (Cron)
188200

189201
You can periodically enqueue jobs on your gocraft/work cluster using your worker pool. The [scheduling specification](https://godoc.org/github.com/robfig/cron#hdr-CRON_Expression_Format) uses a Cron syntax where the fields represent seconds, minutes, hours, day of the month, month, and week of the day, respectively. Even if you have multiple worker pools on different machines, they'll all coordinate and only enqueue your job once.
@@ -265,6 +277,12 @@ You'll see a view that looks like this:
265277
* If a process crashes hard (eg, the power on the server turns off or the kernel freezes), some jobs may be in progress and we won't want to lose them. They're safe in their in-progress queue.
266278
* The reaper will look for worker pools without a heartbeat. It will scan their in-progress queues and requeue anything it finds.
267279

280+
### Unique jobs
281+
282+
* You can enqueue unique jobs such that only one job with a given name/arguments can be on the queue at once.
283+
* Both normal queues and the scheduled queue are considered.
284+
* When a unique job is enqueued, we'll atomically set a redis key that includes the job name and arguments and enqueue the job.
285+
* When the job is processed, we'll delete that key to permit another job to be enqueued.
268286

269287
### Periodic Jobs
270288

enqueue.go

+87-6
Original file line numberDiff line numberDiff line change
@@ -10,17 +10,21 @@ type Enqueuer struct {
1010
Namespace string // eg, "myapp-work"
1111
Pool *redis.Pool
1212

13-
queuePrefix string // eg, "myapp-work:jobs:"
14-
knownJobs map[string]int64
13+
queuePrefix string // eg, "myapp-work:jobs:"
14+
knownJobs map[string]int64
15+
enqueueUniqueScript *redis.Script
16+
enqueueUniqueInScript *redis.Script
1517
}
1618

1719
// NewEnqueuer creates a new enqueuer with the specified Redis namespace and Redis pool.
1820
func NewEnqueuer(namespace string, pool *redis.Pool) *Enqueuer {
1921
return &Enqueuer{
20-
Namespace: namespace,
21-
Pool: pool,
22-
queuePrefix: redisKeyJobsPrefix(namespace),
23-
knownJobs: make(map[string]int64),
22+
Namespace: namespace,
23+
Pool: pool,
24+
queuePrefix: redisKeyJobsPrefix(namespace),
25+
knownJobs: make(map[string]int64),
26+
enqueueUniqueScript: redis.NewScript(2, redisLuaEnqueueUnique),
27+
enqueueUniqueInScript: redis.NewScript(2, redisLuaEnqueueUniqueIn),
2428
}
2529
}
2630

@@ -82,6 +86,83 @@ func (e *Enqueuer) EnqueueIn(jobName string, secondsFromNow int64, args map[stri
8286
return nil
8387
}
8488

89+
// EnqueueUnique enqueues a job unless a job is already enqueued with the same name and arguments. The already-enqueued job can be in the normal work queue or in the scheduled job queue. Once a worker begins processing a job, another job with the same name and arguments can be enqueued again. Any failed jobs in the retry queue or dead queue don't count against the uniqueness -- so if a job fails and is retried, two unique jobs with the same name and arguments can be enqueued at once.
90+
// In order to add robustness to the system, jobs are only unique for 24 hours after they're enqueued. This is mostly relevant for scheduled jobs.
91+
// EnqueueUnique returns true if the job is enqueued and false if it wasn't enqueued.
92+
func (e *Enqueuer) EnqueueUnique(jobName string, args map[string]interface{}) (bool, error) {
93+
uniqueKey, err := redisKeyUniqueJob(e.Namespace, jobName, args)
94+
if err != nil {
95+
return false, err
96+
}
97+
98+
job := &Job{
99+
Name: jobName,
100+
ID: makeIdentifier(),
101+
EnqueuedAt: nowEpochSeconds(),
102+
Args: args,
103+
Unique: true,
104+
}
105+
106+
rawJSON, err := job.serialize()
107+
if err != nil {
108+
return false, err
109+
}
110+
111+
conn := e.Pool.Get()
112+
defer conn.Close()
113+
114+
if err := e.addToKnownJobs(conn, jobName); err != nil {
115+
return false, err
116+
}
117+
118+
scriptArgs := make([]interface{}, 0, 3)
119+
scriptArgs = append(scriptArgs, e.queuePrefix+jobName) // KEY[1]
120+
scriptArgs = append(scriptArgs, uniqueKey) // KEY[2]
121+
scriptArgs = append(scriptArgs, rawJSON)
122+
123+
res, err := redis.String(e.enqueueUniqueScript.Do(conn, scriptArgs...))
124+
125+
return res == "ok", err
126+
}
127+
128+
// EnqueueUniqueIn enqueues a unique job in the scheduled job queue for execution in secondsFromNow seconds. See EnqueueUnique for the semantics of unique jobs.
129+
func (e *Enqueuer) EnqueueUniqueIn(jobName string, secondsFromNow int64, args map[string]interface{}) (bool, error) {
130+
uniqueKey, err := redisKeyUniqueJob(e.Namespace, jobName, args)
131+
if err != nil {
132+
return false, err
133+
}
134+
135+
job := &Job{
136+
Name: jobName,
137+
ID: makeIdentifier(),
138+
EnqueuedAt: nowEpochSeconds(),
139+
Args: args,
140+
Unique: true,
141+
}
142+
143+
rawJSON, err := job.serialize()
144+
if err != nil {
145+
return false, err
146+
}
147+
148+
conn := e.Pool.Get()
149+
defer conn.Close()
150+
151+
if err := e.addToKnownJobs(conn, jobName); err != nil {
152+
return false, err
153+
}
154+
155+
scriptArgs := make([]interface{}, 0, 4)
156+
scriptArgs = append(scriptArgs, redisKeyScheduled(e.Namespace)) // KEY[1]
157+
scriptArgs = append(scriptArgs, uniqueKey) // KEY[2]
158+
scriptArgs = append(scriptArgs, rawJSON) // ARGV[1]
159+
scriptArgs = append(scriptArgs, nowEpochSeconds()+secondsFromNow) // ARGV[2]
160+
161+
res, err := redis.String(e.enqueueUniqueInScript.Do(conn, scriptArgs...))
162+
163+
return res == "ok", err
164+
}
165+
85166
func (e *Enqueuer) addToKnownJobs(conn redis.Conn, jobName string) error {
86167
needSadd := true
87168
now := time.Now().Unix()

enqueue_test.go

+113
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package work
22

33
import (
4+
"fmt"
45
"github.com/stretchr/testify/assert"
56
"testing"
67
"time"
@@ -79,3 +80,115 @@ func TestEnqueueIn(t *testing.T) {
7980
assert.EqualValues(t, 1, j.ArgInt64("a"))
8081
assert.NoError(t, j.ArgError())
8182
}
83+
84+
func TestEnqueueUnique(t *testing.T) {
85+
pool := newTestPool(":6379")
86+
ns := "work"
87+
cleanKeyspace(ns, pool)
88+
enqueuer := NewEnqueuer(ns, pool)
89+
90+
ok, err := enqueuer.EnqueueUnique("wat", Q{"a": 1, "b": "cool"})
91+
assert.NoError(t, err)
92+
assert.True(t, ok)
93+
94+
ok, err = enqueuer.EnqueueUnique("wat", Q{"a": 1, "b": "cool"})
95+
assert.NoError(t, err)
96+
assert.False(t, ok)
97+
98+
ok, err = enqueuer.EnqueueUnique("wat", Q{"a": 1, "b": "coolio"})
99+
assert.NoError(t, err)
100+
assert.True(t, ok)
101+
102+
ok, err = enqueuer.EnqueueUnique("wat", nil)
103+
assert.NoError(t, err)
104+
assert.True(t, ok)
105+
106+
ok, err = enqueuer.EnqueueUnique("wat", nil)
107+
assert.NoError(t, err)
108+
assert.False(t, ok)
109+
110+
ok, err = enqueuer.EnqueueUnique("taw", nil)
111+
assert.NoError(t, err)
112+
assert.True(t, ok)
113+
114+
// Process the queues. Ensure the right number of jobs was processed
115+
var wats, taws int64
116+
wp := NewWorkerPool(TestContext{}, 3, ns, pool)
117+
wp.JobWithOptions("wat", JobOptions{Priority: 1, MaxFails: 1}, func(job *Job) error {
118+
wats++
119+
return nil
120+
})
121+
wp.JobWithOptions("taw", JobOptions{Priority: 1, MaxFails: 1}, func(job *Job) error {
122+
taws++
123+
return fmt.Errorf("ohno")
124+
})
125+
wp.Start()
126+
wp.Drain()
127+
wp.Stop()
128+
129+
assert.EqualValues(t, 3, wats)
130+
assert.EqualValues(t, 1, taws)
131+
132+
// Enqueue again. Ensure we can.
133+
ok, err = enqueuer.EnqueueUnique("wat", Q{"a": 1, "b": "cool"})
134+
assert.NoError(t, err)
135+
assert.True(t, ok)
136+
137+
ok, err = enqueuer.EnqueueUnique("wat", Q{"a": 1, "b": "coolio"})
138+
assert.NoError(t, err)
139+
assert.True(t, ok)
140+
141+
// Even though taw resulted in an error, we should still be able to re-queue it.
142+
// This could result in multiple taws enqueued at the same time in a production system.
143+
ok, err = enqueuer.EnqueueUnique("taw", nil)
144+
assert.NoError(t, err)
145+
assert.True(t, ok)
146+
}
147+
148+
func TestEnqueueUniqueIn(t *testing.T) {
149+
pool := newTestPool(":6379")
150+
ns := "work"
151+
cleanKeyspace(ns, pool)
152+
enqueuer := NewEnqueuer(ns, pool)
153+
154+
// Enqueue two unique jobs -- ensure one job sticks.
155+
ok, err := enqueuer.EnqueueUniqueIn("wat", 300, Q{"a": 1, "b": "cool"})
156+
assert.NoError(t, err)
157+
assert.True(t, ok)
158+
159+
ok, err = enqueuer.EnqueueUniqueIn("wat", 10, Q{"a": 1, "b": "cool"})
160+
assert.NoError(t, err)
161+
assert.False(t, ok)
162+
163+
// Get the job
164+
score, j := jobOnZset(pool, redisKeyScheduled(ns))
165+
166+
assert.True(t, score > time.Now().Unix()+290) // We don't want to overwrite the time
167+
assert.True(t, score <= time.Now().Unix()+300)
168+
169+
assert.Equal(t, "wat", j.Name)
170+
assert.True(t, len(j.ID) > 10) // Something is in it
171+
assert.True(t, j.EnqueuedAt > (time.Now().Unix()-10)) // Within 10 seconds
172+
assert.True(t, j.EnqueuedAt < (time.Now().Unix()+10)) // Within 10 seconds
173+
assert.Equal(t, "cool", j.ArgString("b"))
174+
assert.EqualValues(t, 1, j.ArgInt64("a"))
175+
assert.NoError(t, j.ArgError())
176+
assert.True(t, j.Unique)
177+
178+
// Now try to enqueue more stuff and ensure the uniqueness checks still behave as expected
179+
ok, err = enqueuer.EnqueueUniqueIn("wat", 300, Q{"a": 1, "b": "coolio"})
180+
assert.NoError(t, err)
181+
assert.True(t, ok)
182+
183+
ok, err = enqueuer.EnqueueUniqueIn("wat", 300, nil)
184+
assert.NoError(t, err)
185+
assert.True(t, ok)
186+
187+
ok, err = enqueuer.EnqueueUniqueIn("wat", 300, nil)
188+
assert.NoError(t, err)
189+
assert.False(t, ok)
190+
191+
ok, err = enqueuer.EnqueueUniqueIn("taw", 300, nil)
192+
assert.NoError(t, err)
193+
assert.True(t, ok)
194+
}

job.go

+1
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ type Job struct {
1414
ID string `json:"id"`
1515
EnqueuedAt int64 `json:"t"`
1616
Args map[string]interface{} `json:"args"`
17+
Unique bool `json:"unique,omitempty"`
1718

1819
// Inputs when retrying
1920
Fails int64 `json:"fails,omitempty"` // number of times this job has failed

redis.go

+46-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,10 @@
11
package work
22

3-
import "fmt"
3+
import (
4+
"bytes"
5+
"encoding/json"
6+
"fmt"
7+
)
48

59
func redisNamespacePrefix(namespace string) string {
610
l := len(namespace)
@@ -52,6 +56,24 @@ func redisKeyHeartbeat(namespace, workerPoolID string) string {
5256
return redisNamespacePrefix(namespace) + "worker_pools:" + workerPoolID
5357
}
5458

59+
func redisKeyUniqueJob(namespace, jobName string, args map[string]interface{}) (string, error) {
60+
var buf bytes.Buffer
61+
62+
buf.WriteString(redisNamespacePrefix(namespace))
63+
buf.WriteString("unique:")
64+
buf.WriteString(jobName)
65+
buf.WriteRune(':')
66+
67+
if args != nil {
68+
err := json.NewEncoder(&buf).Encode(args)
69+
if err != nil {
70+
return "", err
71+
}
72+
}
73+
74+
return buf.String(), nil
75+
}
76+
5577
func redisKeyLastPeriodicEnqueue(namespace string) string {
5678
return redisNamespacePrefix(namespace) + "last_periodic_enqueue"
5779
}
@@ -130,3 +152,26 @@ for i=1,jobCount do
130152
end
131153
return requeuedCount
132154
`
155+
156+
// KEYS[1] = job queue to push onto
157+
// KEYS[2] = Unique job's key. Test for existence and set if we push.
158+
// ARGV[1] = job
159+
var redisLuaEnqueueUnique = `
160+
if redis.call('set', KEYS[2], '1', 'NX', 'EX', '86400') then
161+
redis.call('lpush', KEYS[1], ARGV[1])
162+
return 'ok'
163+
end
164+
return 'dup'
165+
`
166+
167+
// KEYS[1] = scheduled job queue
168+
// KEYS[2] = Unique job's key. Test for existance and set if we push.
169+
// ARGV[1] = job
170+
// ARGV[2] = epoch seconds for job to be run at
171+
var redisLuaEnqueueUniqueIn = `
172+
if redis.call('set', KEYS[2], '1', 'NX', 'EX', '86400') then
173+
redis.call('zadd', KEYS[1], ARGV[2], ARGV[1])
174+
return 'ok'
175+
end
176+
return 'dup'
177+
`

todo.txt

+13
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,16 @@
1+
Enqueue API change
2+
- return job
3+
- update documentation
4+
5+
Client refactor:
6+
- delete/retry dead jobs should take the explicit params instead of job
7+
8+
Client:
9+
- delete scheduled jobs. Take into account unique jobs.
10+
11+
refactor:
12+
- clean up enqueue code -- bit too much duplication.
13+
114
IDEAS/TODO:
215
----
316
- zero context each time -- see if that affects performance

worker.go

+17
Original file line numberDiff line numberDiff line change
@@ -176,6 +176,9 @@ func (w *worker) fetchJob() (*Job, error) {
176176
}
177177

178178
func (w *worker) processJob(job *Job) {
179+
if job.Unique {
180+
w.deleteUniqueJob(job)
181+
}
179182
if jt, ok := w.jobTypes[job.Name]; ok {
180183
w.observeStarted(job.Name, job.ID, job.Args)
181184
job.observer = w.observer // for Checkin
@@ -196,6 +199,20 @@ func (w *worker) processJob(job *Job) {
196199
}
197200
}
198201

202+
func (w *worker) deleteUniqueJob(job *Job) {
203+
uniqueKey, err := redisKeyUniqueJob(w.namespace, job.Name, job.Args)
204+
if err != nil {
205+
logError("worker.delete_unique_job.key", err)
206+
}
207+
conn := w.pool.Get()
208+
defer conn.Close()
209+
210+
_, err = conn.Do("DEL", uniqueKey)
211+
if err != nil {
212+
logError("worker.delete_unique_job.del", err)
213+
}
214+
}
215+
199216
func (w *worker) removeJobFromInProgress(job *Job) {
200217
conn := w.pool.Get()
201218
defer conn.Close()

0 commit comments

Comments
 (0)