-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathbasic.go
141 lines (129 loc) · 3.28 KB
/
basic.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
package taskqueue
import (
"context"
"encoding/base64"
"fmt"
"github.com/redis/go-redis/v9"
)
// BasicTaskQueue implements a FIFO task queue of string values. Items are kept
// in a Redis list: Add appends with RPUSH and Pop takes from the head with LPOP.
type BasicTaskQueue struct {
// Name is the Redis key of the list that holds the queue's items.
Name string
// Redis is the client through which every queue operation is issued.
Redis redis.UniversalClient
// ctx is passed to every Redis command issued by the queue's methods;
// NewBasic sets it from the resolved options.
ctx context.Context
// noRetry is copied from the resolved options by NewBasic. It is not read
// by any method in this part of the file.
noRetry bool
}
// NewBasic creates a new BasicTaskQueue whose items live in the Redis list
// stored under the key name.
//
// The variadic options are resolved with getOptions and used to build a Redis
// client over TCP; the same timeout is applied to reads and writes. The
// connection is verified with a PING before the queue is returned.
//
// An error is returned if the options cannot be resolved or if the PING fails.
// In the latter case the freshly created client is closed first, so a failed
// construction does not leave an orphaned client behind.
func NewBasic(name string, option ...Option) (*BasicTaskQueue, error) {
	options, err := getOptions(option)
	if err != nil {
		return nil, err
	}

	redisClient := redis.NewClient(&redis.Options{
		Network:      "tcp",
		Addr:         options.Host,
		Username:     options.Username,
		Password:     options.Password,
		TLSConfig:    options.TLSConfig,
		ReadTimeout:  options.Timeout,
		WriteTimeout: options.Timeout,
	})

	if _, err := redisClient.Ping(options.Context).Result(); err != nil {
		// Nobody else holds a reference to the client yet, so release it here.
		// The PING failure is the error worth reporting; a secondary Close
		// error would only obscure it.
		_ = redisClient.Close()
		return nil, err
	}

	return &BasicTaskQueue{
		Name:    name,
		Redis:   redisClient,
		ctx:     options.Context,
		noRetry: options.NoRetry,
	}, nil
}
// Size reports how many items are currently in the queue, as given by LLEN on
// the queue's key. If the command fails, Size returns 0.
func (q *BasicTaskQueue) Size() uint64 {
	if length, err := q.Redis.LLen(q.ctx, q.Name).Uint64(); err == nil {
		return length
	}
	return 0
}
// Pop takes the item at the head of the queue (LPOP) and returns a pointer to
// it. It returns nil when the command reports any error, which includes the
// queue being empty.
func (q *BasicTaskQueue) Pop() *string {
	head, err := q.Redis.LPop(q.ctx, q.Name).Result()
	if err == nil {
		return &head
	}
	return nil
}
// Has reports whether the queue contains at least one item equal to value.
//
// Only existence matters, so LPOS is asked for at most one match (COUNT 1).
// This lets Redis stop at the first hit instead of collecting the position of
// every occurrence in the list. If the command fails, Has returns false.
func (q *BasicTaskQueue) Has(value string) bool {
	matches, err := q.Redis.LPosCount(q.ctx, q.Name, value, 1, redis.LPosArgs{}).Result()
	if err != nil {
		return false
	}
	return len(matches) > 0
}
// Add appends the given tasks to the tail of the queue, in the order supplied,
// using a single RPUSH. Calling Add with no tasks is a no-op.
//
// The values are handed to the Redis client unchanged; this method performs no
// encoding of its own.
// NOTE(review): which Go types the client accepts as RPUSH arguments is
// decided by go-redis, not here — confirm before passing structs or maps.
//
// An error is returned if RPUSH fails or reports a resulting length of zero.
func (q *BasicTaskQueue) Add(tasks ...any) error {
	if len(tasks) > 0 {
		length, err := q.Redis.RPush(q.ctx, q.Name, tasks...).Result()
		switch {
		case err != nil:
			return err
		case length == 0:
			return fmt.Errorf("failed to add tasks to queue")
		}
	}
	return nil
}
// Remove deletes task from the queue by value. It issues LREM with a count of
// 0, which Redis documents as "remove every element equal to the value".
// The number of elements removed is not reported; only a command error is.
func (q *BasicTaskQueue) Remove(task any) error {
	if _, err := q.Redis.LRem(q.ctx, q.Name, 0, task).Result(); err != nil {
		return err
	}
	return nil
}
// Clear empties the queue by deleting its Redis key (DEL). It returns the
// command's error, if any.
func (q *BasicTaskQueue) Clear() error {
	if _, err := q.Redis.Del(q.ctx, q.Name).Result(); err != nil {
		return err
	}
	return nil
}
// Get returns the item stored at index in the queue (LINDEX) without removing
// it.
//
// When Redis reports that nothing exists at that index (redis.Nil), Get returns
// an empty string and a nil error, so callers cannot distinguish a missing
// index from a stored empty string. Any other failure is returned as the error.
func (q *BasicTaskQueue) Get(index int64) (string, error) {
	item, err := q.Redis.LIndex(q.ctx, q.Name, index).Result()
	switch {
	case err == nil:
		return item, nil
	case err == redis.Nil:
		return "", nil
	default:
		return "", err
	}
}
// RemoveIndex removes the single item stored at index from the queue.
//
// Redis lists have no delete-by-index command, so the item is overwritten with
// a tombstone (LSET) and the tombstone is then removed by value (LREM, count
// 1). Two precautions keep that from removing the wrong element:
//
//   - The tombstone is a marker prefix plus the index and the base64 of the
//     current value. It is always longer than the value it replaces, so it can
//     never equal it (a bare base64 tombstone equals the value when the value
//     is empty), and it is very unlikely to equal any other queued item.
//   - LSET and LREM are sent in one MULTI/EXEC transaction, so no other client
//     can pop or read the tombstone between the two steps.
//
// The initial LINDEX is not part of the transaction: if another client changes
// the list between that lookup and the transaction, index may refer to a
// different item by the time it is removed.
//
// An error is returned if index does not exist, if any Redis command fails, or
// if LREM reports that nothing was removed.
func (q *BasicTaskQueue) RemoveIndex(index int64) error {
	value, err := q.Redis.LIndex(q.ctx, q.Name, index).Result()
	if err != nil {
		if err == redis.Nil {
			return fmt.Errorf("index %d does not exist in queue", index)
		}
		return err
	}

	encoded := base64.StdEncoding.EncodeToString([]byte(value))
	tombstone := fmt.Sprintf("\x00taskqueue:removed:%d:%s", index, encoded)

	var removed *redis.IntCmd
	_, err = q.Redis.TxPipelined(q.ctx, func(pipe redis.Pipeliner) error {
		pipe.LSet(q.ctx, q.Name, index, tombstone)
		removed = pipe.LRem(q.ctx, q.Name, 1, tombstone)
		return nil
	})
	if err != nil {
		return err
	}
	if removed.Val() == 0 {
		return fmt.Errorf("failed to remove value item %d from queue", index)
	}
	return nil
}