-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathfunnel.go
More file actions
110 lines (94 loc) · 2.72 KB
/
funnel.go
File metadata and controls
110 lines (94 loc) · 2.72 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
package disco
import(
"log"
"time"
)
// A funnel is a high-level API for Disque usage: it acts as a bridge between Disque and
// Go native channels, allowing for idiomatic interaction with the datastore.
type Funnel struct {
Queues []string
Incoming chan Job
Outgoing chan Job
Connections *Pool
FetchTimeout time.Duration
FetchCount int
Closed bool
}
// Creates a new funnel with a specific queue configuration and starts the
// appropriate goroutines to keep it's go channels synchronized with Disque.
func NewFunnel(pool *Pool, fetchCount int, fetchTimeout time.Duration, queues ...string) Funnel {
incoming := make(chan Job)
outgoing := make(chan Job)
funnel := Funnel{
Queues: queues,
Incoming: incoming,
Outgoing: outgoing,
Connections: pool,
FetchCount: fetchCount,
FetchTimeout: fetchTimeout,
}
go funnel.Dispatch()
go funnel.Listen()
return funnel
}
// Takes a connection from the funnel's Disque connection pool and uses it to fetch
// jobs from the funnel's configured queues.
//
// This is a blocking call which is launched on a goroutine when #NewFunnel is called,
// you won't reguarly call it directly, but it's left as a public method to allow
// more flexibility of use cases.
func (f *Funnel) Listen() {
for {
if f.Closed {
close(f.Incoming)
return
}
connection := f.Connections.Get()
for {
job, err := connection.GetJob(f.FetchCount, f.FetchTimeout, f.Queues...); if err != nil {
log.Printf("Error fetching jobs in background: %v\n", err.Error())
break
}
if f.Closed {
connection.NAck(job.ID)
connection.Close()
close(f.Incoming)
return
}
f.Incoming <- job
connection.Ack(job.ID)
}
connection.Close()
}
}
// Listens to the `Outgoing` channel in the funnel, and dispatches any messages received
// to it's appropriate queue taking a connection from the funnel's pool.
//
// This is a blocking call which is launched on a goroutine when #NewFunnel is called,
// you won't reguarly call it directly, but it's left as a public method to allow
// more flexibility of use cases.
//
func (f *Funnel) Dispatch() {
for {
select {
case job := <- f.Outgoing:
connection := f.Connections.Get()
connection.AddJob(job.Queue, string(job.Payload), time.Second * 10) // TODO: Push timeout should be configurable.
connection.Close()
if f.Closed {
close(f.Outgoing)
return
}
case <- time.Tick(time.Second):
if f.Closed {
close(f.Outgoing)
return
}
}
}
}
// Marks the funnel as closed, which in turn closes its internal go channels
// gracefully.
func (f *Funnel) Close() {
f.Closed = true
}