@@ -27,125 +27,55 @@ go get github.com/golang-queue/queue@master
27
27
28
28
## Usage
29
29
30
- The first step to create a new job as ` QueueMessage ` interface:
30
+ ### Basic usage of Pool (use Task function)
31
31
32
- ``` go
33
- type job struct {
34
- Message string
35
- }
36
-
37
- func (j *job ) Bytes () []byte {
38
- b , err := json.Marshal (j)
39
- if err != nil {
40
- panic (err)
41
- }
42
- return b
43
- }
44
- ```
45
-
46
- The second step to create the new worker, use the buffered channel as an example.
32
+ By calling ` QueueTask() ` method, it schedules the task executed by worker (goroutines) in the Pool.
47
33
48
34
``` go
49
- // define the worker
50
- w := simple.NewWorker (
51
- simple.WithQueueNum (taskN),
52
- simple.WithRunFunc (func (ctx context.Context , m queue.QueuedMessage ) error {
53
- v , ok := m.(*job)
54
- if !ok {
55
- if err := json.Unmarshal (m.Bytes (), &v); err != nil {
56
- return err
57
- }
58
- }
59
-
60
- rets <- v.Message
61
- return nil
62
- }),
63
- )
64
- ```
65
-
66
- or use the [ NSQ] ( https://nsq.io/ ) as backend, see the worker example:
67
-
68
- ``` go
69
- // define the worker
70
- w := nsq.NewWorker (
71
- nsq.WithAddr (" 127.0.0.1:4150" ),
72
- nsq.WithTopic (" example" ),
73
- nsq.WithChannel (" foobar" ),
74
- // concurrent job number
75
- nsq.WithMaxInFlight (10 ),
76
- nsq.WithRunFunc (func (ctx context.Context , m queue.QueuedMessage ) error {
77
- v , ok := m.(*job)
78
- if !ok {
79
- if err := json.Unmarshal (m.Bytes (), &v); err != nil {
80
- return err
81
- }
82
- }
83
-
84
- rets <- v.Message
85
- return nil
86
- }),
87
- )
88
- ```
89
-
90
- or use the [ NATS] ( https://nats.io/ ) as backend, see the worker example:
35
+ package main
91
36
92
- ``` go
93
- w := nats.NewWorker (
94
- nats.WithAddr (" 127.0.0.1:4222" ),
95
- nats.WithSubj (" example" ),
96
- nats.WithQueue (" foobar" ),
97
- nats.WithRunFunc (func (ctx context.Context , m queue.QueuedMessage ) error {
98
- v , ok := m.(*job)
99
- if !ok {
100
- if err := json.Unmarshal (m.Bytes (), &v); err != nil {
101
- return err
102
- }
103
- }
37
+ import (
38
+ " context"
39
+ " fmt"
40
+ " time"
104
41
105
- rets <- v.Message
106
- return nil
107
- }),
42
+ " github.com/golang-queue/queue"
108
43
)
109
- ```
110
44
111
- The third step to create a queue and initialize multiple workers, receive all job messages:
45
+ func main () {
46
+ taskN := 100
47
+ rets := make (chan string , taskN)
112
48
113
- ``` go
114
- // define the queue
115
- q , err := queue.NewQueue (
116
- queue.WithWorkerCount (5 ),
117
- queue.WithWorker (w),
118
- )
119
- if err != nil {
120
- log.Fatal (err)
121
- }
49
+ // initial queue pool
50
+ q := queue.NewPool (5 )
51
+ // shutdown the service and notify all the worker
52
+ // wait all jobs are complete.
53
+ defer q.Release ()
122
54
123
- // start the five worker
124
- q.Start ()
125
-
126
- // assign tasks in queue
127
- for i := 0 ; i < taskN; i++ {
128
- go func (i int ) {
129
- q.Queue (&job{
130
- Name: " foobar" ,
131
- Message: fmt.Sprintf (" handle the job: %d " , i+1 ),
132
- })
133
- }(i)
134
- }
55
+ // assign tasks in queue
56
+ for i := 0 ; i < taskN; i++ {
57
+ go func (i int ) {
58
+ if err := q.QueueTask (func (ctx context.Context ) error {
59
+ rets <- fmt.Sprintf (" Hi Gopher, handle the job: %02d " , +i)
60
+ return nil
61
+ }); err != nil {
62
+ panic (err)
63
+ }
64
+ }(i)
65
+ }
135
66
136
- // wait until all tasks done
137
- for i := 0 ; i < taskN; i++ {
138
- fmt.Println (" message:" , <- rets)
139
- time.Sleep (50 * time.Millisecond )
67
+ // wait until all tasks done
68
+ for i := 0 ; i < taskN; i++ {
69
+ fmt.Println (" message:" , <- rets)
70
+ time.Sleep (20 * time.Millisecond )
71
+ }
140
72
}
141
-
142
- // shutdown the service and notify all the worker
143
- q.Shutdown ()
144
- // wait all jobs are complete.
145
- q.Wait ()
146
73
```
147
74
148
- Full example code as below or [ try it in playground] ( https://play.golang.org/p/77PtkZRaPE- ) .
75
+ ### Basic usage of Pool (use message queue)
76
+
77
+ Define the new message struct and implement the ` Bytes() ` func to encode message. Give the ` WithFn ` func
78
+ to handle the message from Queue.
149
79
150
80
``` go
151
81
package main
@@ -158,7 +88,6 @@ import (
158
88
" time"
159
89
160
90
" github.com/golang-queue/queue"
161
- " github.com/golang-queue/queue/simple"
162
91
)
163
92
164
93
type job struct {
@@ -178,41 +107,31 @@ func main() {
178
107
taskN := 100
179
108
rets := make (chan string , taskN)
180
109
181
- // define the worker
182
- w := simple.NewWorker (
183
- simple.WithQueueNum (taskN),
184
- simple.WithRunFunc (func (ctx context.Context , m queue.QueuedMessage ) error {
185
- v , ok := m.(*job)
186
- if !ok {
187
- if err := json.Unmarshal (m.Bytes (), &v); err != nil {
188
- return err
189
- }
110
+ // initial queue pool
111
+ q := queue.NewPool (5 , queue.WithFn (func (ctx context.Context , m queue.QueuedMessage ) error {
112
+ v , ok := m.(*job)
113
+ if !ok {
114
+ if err := json.Unmarshal (m.Bytes (), &v); err != nil {
115
+ return err
190
116
}
117
+ }
191
118
192
- rets <- " Hi, " + v.Name + " , " + v.Message
193
- return nil
194
- }),
195
- )
196
-
197
- // define the queue
198
- q , err := queue.NewQueue (
199
- queue.WithWorkerCount (5 ),
200
- queue.WithWorker (w),
201
- )
202
- if err != nil {
203
- log.Fatal (err)
204
- }
205
-
206
- // start the five worker
207
- q.Start ()
119
+ rets <- " Hi, " + v.Name + " , " + v.Message
120
+ return nil
121
+ }))
122
+ // shutdown the service and notify all the worker
123
+ // wait all jobs are complete.
124
+ defer q.Release ()
208
125
209
126
// assign tasks in queue
210
127
for i := 0 ; i < taskN; i++ {
211
128
go func (i int ) {
212
- q.Queue (&job{
213
- Name: " foobar " ,
129
+ if err := q.Queue (&job{
130
+ Name: " Gopher " ,
214
131
Message: fmt.Sprintf (" handle the job: %d " , i+1 ),
215
- })
132
+ }); err != nil {
133
+ log.Println (err)
134
+ }
216
135
}(i)
217
136
}
218
137
@@ -221,10 +140,5 @@ func main() {
221
140
fmt.Println (" message:" , <- rets)
222
141
time.Sleep (50 * time.Millisecond )
223
142
}
224
-
225
- // shutdown the service and notify all the worker
226
- q.Shutdown ()
227
- // wait all jobs are complete.
228
- q.Wait ()
229
143
}
230
144
```
0 commit comments