From 2e259ed5f3da006f8f0f6a18ee5b5c1862d23650 Mon Sep 17 00:00:00 2001
From: finley
Date: Fri, 16 Feb 2024 18:55:39 +0800
Subject: [PATCH] add monitor

---
 .vscode/launch.json              |   9 +-
 README.md                        | 118 +++++++++++++++++++++-
 README_CN.md                     | 111 ++++++++++++++++++++-
 delayqueue.go                    | 166 ++++++++++++++++++++++++++++---
 delayqueue_test.go               |   4 +-
 events.go                        |  64 ++++++++++++
 example/{ => getstarted}/main.go |   0
 example/monitor/main.go          |  81 +++++++++++++++
 monitor.go                       |  72 ++++++++++++++
 monitor_test.go                  | 163 ++++++++++++++++++++++++++++++
 publisher.go                     |   6 +-
 wrapper.go                       |  62 ++++++++++++
 12 files changed, 832 insertions(+), 24 deletions(-)
 create mode 100644 events.go
 rename example/{ => getstarted}/main.go (100%)
 create mode 100644 example/monitor/main.go
 create mode 100644 monitor.go
 create mode 100644 monitor_test.go

diff --git a/.vscode/launch.json b/.vscode/launch.json
index 56635ff..a394856 100644
--- a/.vscode/launch.json
+++ b/.vscode/launch.json
@@ -4,12 +4,19 @@
     // 欲了解更多信息,请访问: https://go.microsoft.com/fwlink/?linkid=830387
     "version": "0.2.0",
     "configurations": [
+        {
+            "name": "Launch Package",
+            "type": "go",
+            "request": "launch",
+            "mode": "auto",
+            "program": "${fileDirname}"
+        },
         {
             "name": "Run Example",
             "type": "go",
             "request": "launch",
             "mode": "auto",
-            "program": "${workspaceFolder}/example"
+            "program": "${workspaceFolder}/example/getstarted"
         }
     ]
 }
\ No newline at end of file
diff --git a/README.md b/README.md
index 288e997..be5d694 100644
--- a/README.md
+++ b/README.md
@@ -17,7 +17,8 @@ Core Advantages:
 - Works out of the box, Config Nothing and Deploy Nothing, A Redis is all you need.
 - Natively adapted to the distributed environment, messages processed concurrently on multiple machines . Workers can be added, removed or migrated at any time
-- Support Redis Cluster for high availability
+- Support Redis Cluster or the clusters of most cloud service providers, see the [Cluster](./README.md#Cluster) section
+- Easy-to-use monitoring data exporter, see [Monitoring](./README.md#Monitoring)
 
 ## Install
 
@@ -73,7 +74,9 @@
 ## Producer consumer distributed deployment
 
-By default, delayqueue instances can be both producers and consumers. If your program only need producers and consumers are placed elsewhere, `delayqueue.NewProducer` is a good option for you.
+By default, delayqueue instances can be both producers and consumers.
+
+If your program only needs to produce messages and the consumers are deployed elsewhere, `delayqueue.NewPublisher` is a good option for you.
 
 ```go
 func consumer() {
@@ -143,6 +146,117 @@ WithDefaultRetryCount customizes the max number of retry, it effects of messages
 
 use WithRetryCount during DelayQueue.SendScheduleMsg or DelayQueue.SendDelayMsg to specific retry count of particular message
 
+## Monitoring
+
+We provide a `Monitor` to observe the running status of the queue.
+
+```go
+monitor := delayqueue.NewMonitor("example", redisCli)
+```
+
+`Monitor.ListenEvent` registers a listener that receives all internal events, so you can use it to implement customized data reporting and metrics.
+
+The monitor can receive events from all workers, even if they are running on other servers.
+
+```go
+type EventListener interface {
+	OnEvent(*Event)
+}
+
+// returns: close function, error
+func (m *Monitor) ListenEvent(listener EventListener) (func(), error)
+```
+
+The definition of Event can be found in [events.go](./events.go).
+
+Besides, we provide a demo that uses EventListener to monitor the amount of messages produced and consumed per minute.

The complete demo code can be found in [example/monitor](./example/monitor/main.go).

```go
type MyProfiler struct {
	List  []*Metrics
	Start int64
}

func (p *MyProfiler) OnEvent(event *delayqueue.Event) {
	sinceUptime := event.Timestamp - p.Start
	upMinutes := sinceUptime / 60
	if len(p.List) <= int(upMinutes) {
		p.List = append(p.List, &Metrics{})
	}
	current := p.List[upMinutes]
	switch event.Code {
	case delayqueue.NewMessageEvent:
		current.ProduceCount += event.MsgCount
	case delayqueue.DeliveredEvent:
		current.DeliverCount += event.MsgCount
	case delayqueue.AckEvent:
		current.ConsumeCount += event.MsgCount
	case delayqueue.RetryEvent:
		current.RetryCount += event.MsgCount
	case delayqueue.FinalFailedEvent:
		current.FailCount += event.MsgCount
	}
}

func main() {
	queue := delayqueue.NewQueue("example", redisCli, func(payload string) bool {
		return true
	})
	start := time.Now()
	// IMPORTANT: EnableReport must be called so the monitor can do its work
	queue.EnableReport()

	// setup monitor
	monitor := delayqueue.NewMonitor("example", redisCli)
	listener := &MyProfiler{
		Start: start.Unix(),
	}
	monitor.ListenEvent(listener)

	// print metrics every minute
	tick := time.Tick(time.Minute)
	go func() {
		for range tick {
			minutes := len(listener.List) - 1
			fmt.Printf("%d: %#v", minutes, listener.List[minutes])
		}
	}()
}
```

The Monitor uses Redis pub/sub to collect data, so it is important to call `DelayQueue.EnableReport` on all workers to enable event reporting for the monitor.

If you do not want to use Redis pub/sub, you can use `DelayQueue.ListenEvent` to collect data yourself.

Please be advised, `DelayQueue.ListenEvent` can only receive events from the current instance, while the monitor can receive events from all instances in the queue.

Once `DelayQueue.ListenEvent` is called, the monitor's listener will be overwritten, unless `EnableReport` is called again to re-enable the monitor.

### Get Status

You can get the pending count, ready count and processing count from the monitor:

```go
func (m *Monitor) GetPendingCount() (int64, error)
```

GetPendingCount returns the number of messages whose delivery time has not arrived yet.

```go
func (m *Monitor) GetReadyCount() (int64, error)
```

GetReadyCount returns the number of messages which have reached their delivery time but have not been delivered yet.

```go
func (m *Monitor) GetProcessingCount() (int64, error)
```

GetProcessingCount returns the number of messages which are being processed.
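
For example, you could poll these counters periodically. The following is a minimal sketch (assuming `redisCli` is the same Redis client used by the workers; error handling is elided):

```go
monitor := delayqueue.NewMonitor("example", redisCli)
pending, _ := monitor.GetPendingCount()
ready, _ := monitor.GetReadyCount()
processing, _ := monitor.GetProcessingCount()
fmt.Printf("pending: %d, ready: %d, processing: %d\n", pending, ready, processing)
```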

## Cluster

If you are using Redis Cluster, please use `NewQueueOnCluster`
diff --git a/README_CN.md b/README_CN.md
index 294d061..ab61386 100644
--- a/README_CN.md
+++ b/README_CN.md
@@ -12,7 +12,8 @@ DelayQueue 的主要优势:
 - 自动重试处理失败的消息
 - 开箱即用, 无需部署或安装中间件, 只需要一个 Redis 即可工作
 - 原生适配分布式环境, 可在多台机器上并发的处理消息. 可以随时增加、减少或迁移 Worker
-- 支持各类 Redis 集群
+- 支持各类 Redis 集群, 详见[集群](./README_CN.md#集群)
+- 简单易用的监控数据导出,详见[监控](./README_CN.md#监控)
 
 ## 安装
 
@@ -137,6 +138,114 @@ WithDefaultRetryCount(count uint)
 
 在调用 DelayQueue.SendScheduleMsg or DelayQueue.SendDelayMsg 发送消息时,可以调用 WithRetryCount 为这条消息单独指定重试次数。
 
+## 监控
+
+我们提供了 `Monitor` 来监控运行数据:
+
+```go
+monitor := delayqueue.NewMonitor("example", redisCli)
+```
+
+我们可以使用 `Monitor.ListenEvent` 注册一个可以收到队列中所有事件的监听器, 从而实现自定义的事件上报和指标监控。
+
Monitor 可以收到所有 Worker 的事件, 包括运行在其它服务器上的 Worker。
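
`Monitor.ListenEvent` 会返回一个用于停止监听的关闭函数。下面是一段示意代码 (假设 `monitor` 和 `listener` 已按后文 Demo 中的方式创建, 并省略了错误处理):

```go
closer, err := monitor.ListenEvent(listener)
if err != nil {
	panic(err)
}
// ......
closer() // 停止接收事件
```

EventListener 接口及 ListenEvent 的签名如下: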
+ +```go +type EventListener interface { + OnEvent(*Event) +} + +// returns: close function, error +func (m *Monitor) ListenEvent(listener EventListener) (func(), error) +``` + +Event 的定义在 [events.go](./events.go). + +此外,我们提供了一个 Demo,它会每分钟显示一次队列中产生和处理的消息数量。 + +Demo 完整代码在 [example/monitor](./example/monitor/main.go). + +```go +type MyProfiler struct { + List []*Metrics + Start int64 +} + +func (p *MyProfiler) OnEvent(event *delayqueue.Event) { + sinceUptime := event.Timestamp - p.Start + upMinutes := sinceUptime / 60 + if len(p.List) <= int(upMinutes) { + p.List = append(p.List, &Metrics{}) + } + current := p.List[upMinutes] + switch event.Code { + case delayqueue.NewMessageEvent: + current.ProduceCount += event.MsgCount + case delayqueue.DeliveredEvent: + current.DeliverCount += event.MsgCount + case delayqueue.AckEvent: + current.ConsumeCount += event.MsgCount + case delayqueue.RetryEvent: + current.RetryCount += event.MsgCount + case delayqueue.FinalFailedEvent: + current.FailCount += event.MsgCount + } +} + +func main() { + queue := delayqueue.NewQueue("example", redisCli, func(payload string) bool { + return true + }) + start := time.Now() + // 注意: 使用 Monitor 前必须调用 EnableReport + queue.EnableReport() + + // setup monitor + monitor := delayqueue.NewMonitor("example", redisCli) + listener := &MyProfiler{ + Start: start.Unix(), + } + monitor.ListenEvent(listener) + + // 每分钟打印一次报告 + tick := time.Tick(time.Minute) + go func() { + for range tick { + minutes := len(listener.List)-1 + fmt.Printf("%d: %#v", minutes, listener.List[minutes]) + } + }() +} +``` + +Monitor 使用 redis 的发布订阅功能来收集数据,使用 Monitor 前必须在所有 Worker 处调用 `EnableReport` 来启用上报。 + +如果你不想使用 redis pub/sub, 可以调用 `DelayQueue.ListenEvent` 来直接收集数据。请注意,`DelayQueue.ListenEvent` 只能收到当前 Worker 的事件, 而 Monitor 可以收到所有 Worker 的事件。 + +另外,`DelayQueue.ListenEvent` 会覆盖掉 Monitor 的监听器,再次调用 `EnableReport` 后 Monitor 才能恢复工作。 + +### 获得状态信息 + +Monitor 也可以直接获得一些队列的状态信息。 + +```go +func (m *Monitor) GetPendingCount() (int64, error) +``` + +返回未到投递时间的消息数。 + +```go +func (m *Monitor) GetReadyCount() (int64, error) +``` + +返回已到投递时间但尚未发给 Worker 的消息数。 + +```go +func (m *Monitor) GetProcessingCount() (int64, error) +``` + +返回 Worker 正在处理中的消息数。 + ## 集群 如果需要在 Redis Cluster 上工作, 请使用 `NewQueueOnCluster`: diff --git a/delayqueue.go b/delayqueue.go index 44e157c..94deac2 100644 --- a/delayqueue.go +++ b/delayqueue.go @@ -3,11 +3,12 @@ package delayqueue import ( "errors" "fmt" - "github.com/google/uuid" "log" "strconv" "sync" "time" + + "github.com/google/uuid" ) // DelayQueue is a message queue supporting delayed/scheduled delivery based on redis @@ -27,12 +28,14 @@ type DelayQueue struct { logger *log.Logger close chan struct{} - maxConsumeDuration time.Duration - msgTTL time.Duration - defaultRetryCount uint - fetchInterval time.Duration - fetchLimit uint - concurrent uint + maxConsumeDuration time.Duration // default 5 seconds + msgTTL time.Duration // default 1 hour + defaultRetryCount uint // default 3 + fetchInterval time.Duration // default 1 second + fetchLimit uint // default no limit + concurrent uint // default 1, executed serially + + eventListener EventListener } // NilErr represents redis nil @@ -40,8 +43,13 @@ var NilErr = errors.New("nil") // RedisCli is abstraction for redis client, required commands only not all commands type RedisCli interface { - Eval(script string, keys []string, args []interface{}) (interface{}, error) // args should be string, integer or float + // Eval sends lua script to redis + // args should be string, integer or float + // 
returns string, int64, []interface{} (elements can be string or int64)
+	Eval(script string, keys []string, args []interface{}) (interface{}, error)
 	Set(key string, value string, expiration time.Duration) error
+	// Get represents redis command GET
+	// please return NilErr when no such key in redis
 	Get(key string) (string, error)
 	Del(keys []string) error
 	HSet(key string, field string, value string) error
 	HDel(key string, fields []string) error
 	SMembers(key string) ([]string, error)
 	SRem(key string, members []string) error
 	ZAdd(key string, values map[string]float64) error
 	ZRem(key string, fields []string) error
+	ZCard(key string) (int64, error)
+	LLen(key string) (int64, error)
+
+	// Publish used for monitor only
+	Publish(channel string, payload string) error
+	// Subscribe used for monitor only
+	// returns: payload channel, subscription closer, error; the subscription closer should close payload channel as well
+	Subscribe(channel string) (payloads <-chan string, close func(), err error)
 }
 
 type hashTagKeyOpt int
@@ -102,7 +118,7 @@ func NewQueue0(name string, cli RedisCli, callback func(string) bool, opts ...in
 		retryCountKey:      keyPrefix + ":retry:cnt",
 		garbageKey:         keyPrefix + ":garbage",
 		useHashTag:         useHashTag,
-		close:              make(chan struct{}, 1),
+		close:              nil,
 		maxConsumeDuration: 5 * time.Second,
 		msgTTL:             time.Hour,
 		logger:             log.Default(),
@@ -131,7 +147,7 @@ func (q *DelayQueue) WithMaxConsumeDuration(d time.Duration) *DelayQueue {
 	return q
 }
 
-// WithFetchLimit limits the max number of unack (processing) messages
+// WithFetchLimit limits the max number of processing messages, 0 means no limit
 func (q *DelayQueue) WithFetchLimit(limit uint) *DelayQueue {
 	q.fetchLimit = limit
 	return q
@@ -207,6 +223,7 @@ func (q *DelayQueue) SendScheduleMsg(payload string, t time.Time, opts ...interf
 	if err != nil {
 		return fmt.Errorf("push to pending failed: %v", err)
 	}
+	q.reportEvent(NewMessageEvent, 1)
 	return nil
 }
 
@@ -219,6 +236,7 @@ func (q *DelayQueue) SendDelayMsg(payload string, duration time.Duration, opts .
// pending2ReadyScript atomically moves messages from pending to ready // keys: pendingKey, readyKey // argv: currentTime +// returns: ready message number const pending2ReadyScript = ` local msgs = redis.call('ZRangeByScore', KEYS[1], '0', ARGV[1]) -- get ready msg if (#msgs == 0) then return end @@ -234,15 +252,20 @@ if (#args2 > 2) then redis.call('LPush', KEYS[2], unpack(args2)) end redis.call('ZRemRangeByScore', KEYS[1], '0', ARGV[1]) -- remove msgs from pending +return #msgs ` func (q *DelayQueue) pending2Ready() error { now := time.Now().Unix() keys := []string{q.pendingKey, q.readyKey} - _, err := q.redisCli.Eval(pending2ReadyScript, keys, []interface{}{now}) + raw, err := q.redisCli.Eval(pending2ReadyScript, keys, []interface{}{now}) if err != nil && err != NilErr { return fmt.Errorf("pending2ReadyScript failed: %v", err) } + count, ok := raw.(int64) + if ok { + q.reportEvent(ReadyEvent, int(count)) + } return nil } @@ -270,6 +293,7 @@ func (q *DelayQueue) ready2Unack() (string, error) { if !ok { return "", fmt.Errorf("illegal result: %#v", ret) } + q.reportEvent(DeliveredEvent, 1) return str, nil } @@ -353,6 +377,7 @@ func (q *DelayQueue) ack(idStr string) error { // msg key has ttl, ignore result of delete _ = q.redisCli.Del([]string{q.genMsgKey(idStr)}) _ = q.redisCli.HDel(q.retryCountKey, []string{idStr}) + q.reportEvent(AckEvent, 1) return nil } @@ -364,6 +389,7 @@ func (q *DelayQueue) nack(idStr string) error { if err != nil { return fmt.Errorf("negative ack failed: %v", err) } + q.reportEvent(NackEvent, 1) return nil } @@ -373,48 +399,74 @@ func (q *DelayQueue) nack(idStr string) error { // Therefore unack2RetryScript moves garbage message to garbageKey instead of deleting directly // keys: unackKey, retryCountKey, retryKey, garbageKey // argv: currentTime +// returns: {retryMsgs, failMsgs} const unack2RetryScript = ` local unack2retry = function(msgs) local retryCounts = redis.call('HMGet', KEYS[2], unpack(msgs)) -- get retry count + local retryMsgs = 0 + local failMsgs = 0 for i,v in ipairs(retryCounts) do local k = msgs[i] if v ~= false and v ~= nil and v ~= '' and tonumber(v) > 0 then redis.call("HIncrBy", KEYS[2], k, -1) -- reduce retry count redis.call("LPush", KEYS[3], k) -- add to retry + retryMsgs = retryMsgs + 1 else redis.call("HDel", KEYS[2], k) -- del retry count redis.call("SAdd", KEYS[4], k) -- add to garbage + failMsgs = failMsgs + 1 end end + return retryMsgs, failMsgs end +local retryMsgs = 0 +local failMsgs = 0 local msgs = redis.call('ZRangeByScore', KEYS[1], '0', ARGV[1]) -- get retry msg if (#msgs == 0) then return end if #msgs < 4000 then - unack2retry(msgs) + local d1, d2 = unack2retry(msgs) + retryMsgs = retryMsgs + d1 + failMsgs = failMsgs + d2 else local buf = {} for _,v in ipairs(msgs) do table.insert(buf, v) if #buf == 4000 then - unack2retry(buf) + local d1, d2 = unack2retry(buf) + retryMsgs = retryMsgs + d1 + failMsgs = failMsgs + d2 buf = {} end end if (#buf > 0) then - unack2retry(buf) + local d1, d2 = unack2retry(buf) + retryMsgs = retryMsgs + d1 + failMsgs = failMsgs + d2 end end redis.call('ZRemRangeByScore', KEYS[1], '0', ARGV[1]) -- remove msgs from unack +return {retryMsgs, failMsgs} ` func (q *DelayQueue) unack2Retry() error { keys := []string{q.unAckKey, q.retryCountKey, q.retryKey, q.garbageKey} now := time.Now() - _, err := q.redisCli.Eval(unack2RetryScript, keys, []interface{}{now.Unix()}) + raw, err := q.redisCli.Eval(unack2RetryScript, keys, []interface{}{now.Unix()}) if err != nil && err != NilErr { return fmt.Errorf("unack to 
retry script failed: %v", err)
 	}
+	infos, ok := raw.([]interface{})
+	if ok && len(infos) == 2 {
+		retryCount, ok := infos[0].(int64)
+		if ok {
+			q.reportEvent(RetryEvent, int(retryCount))
+		}
+		failCount, ok := infos[1].(int64)
+		if ok {
+			q.reportEvent(FinalFailedEvent, int(failCount))
+		}
+	}
 	return nil
 }
 
@@ -499,6 +551,10 @@ func (q *DelayQueue) consume() error {
 // StartConsume creates a goroutine to consume message from DelayQueue
 // use `<-done` to wait consumer stopping
 func (q *DelayQueue) StartConsume() (done <-chan struct{}) {
+	if q.cb == nil {
+		panic("this instance has no callback")
+	}
+	q.close = make(chan struct{}, 1)
 	q.ticker = time.NewTicker(q.fetchInterval)
 	done0 := make(chan struct{})
 	go func() {
@@ -526,3 +582,83 @@ func (q *DelayQueue) StopConsume() {
 		q.ticker.Stop()
 	}
 }
+
+// GetPendingCount returns the number of pending messages
+func (q *DelayQueue) GetPendingCount() (int64, error) {
+	return q.redisCli.ZCard(q.pendingKey)
+}
+
+// GetReadyCount returns the number of messages which have reached their delivery time but have not been delivered yet
+func (q *DelayQueue) GetReadyCount() (int64, error) {
+	return q.redisCli.LLen(q.readyKey)
+}
+
+// GetProcessingCount returns the number of messages which are being processed
+func (q *DelayQueue) GetProcessingCount() (int64, error) {
+	return q.redisCli.ZCard(q.unAckKey)
+}
+
+// EventListener is called when an event occurs
+// This listener can be used to monitor the running status
+type EventListener interface {
+	// OnEvent will be called when events occur
+	OnEvent(*Event)
+}
+
+// ListenEvent registers a listener which will be called when events occur,
+// so it can be used to monitor the running status
+//
+// But it can ONLY receive events from the CURRENT INSTANCE;
+// if you want to listen to all events in the queue, use Monitor.ListenEvent instead
+//
+// There can be AT MOST ONE EventListener in a DelayQueue instance.
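+// Registering a new listener replaces the previous one.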
+// If you are using a customized listener, Monitor will stop working
+func (q *DelayQueue) ListenEvent(listener EventListener) {
+	q.eventListener = listener
+}
+
+// DisableListener stops reporting events to the EventListener
+func (q *DelayQueue) DisableListener() {
+	q.eventListener = nil
+}
+
+func (q *DelayQueue) reportEvent(code int, count int) {
+	listener := q.eventListener // eventListener may be changed during running
+	if listener != nil && count > 0 {
+		event := &Event{
+			Code:      code,
+			Timestamp: time.Now().Unix(),
+			MsgCount:  count,
+		}
+		listener.OnEvent(event)
+	}
+}
+
+// pubsubListener receives events and reports them through redis pubsub for monitoring
+type pubsubListener struct {
+	redisCli   RedisCli
+	reportChan string
+}
+
+func genReportChannel(name string) string {
+	return "dq:" + name + ":reportEvents"
+}
+
+// EnableReport enables reporting to monitor
+func (q *DelayQueue) EnableReport() {
+	reportChan := genReportChannel(q.name)
+	q.ListenEvent(&pubsubListener{
+		redisCli:   q.redisCli,
+		reportChan: reportChan,
+	})
+}
+
+// DisableReport stops reporting to monitor
+func (q *DelayQueue) DisableReport() {
+	q.DisableListener()
+}
+
+func (l *pubsubListener) OnEvent(event *Event) {
+	payload := encodeEvent(event)
+	l.redisCli.Publish(l.reportChan, payload)
+}
diff --git a/delayqueue_test.go b/delayqueue_test.go
index 4959e07..cd41acd 100644
--- a/delayqueue_test.go
+++ b/delayqueue_test.go
@@ -36,7 +36,7 @@ func TestDelayQueue_consume(t *testing.T) {
 			t.Error(err)
 		}
 	}
-	for i := 0; i < 10*size; i++ {
+	for i := 0; i < 10; i++ {
 		err := queue.consume()
 		if err != nil {
 			t.Errorf("consume error: %v", err)
@@ -85,7 +85,7 @@ func TestDelayQueueOnCluster(t *testing.T) {
 			t.Error(err)
 		}
 	}
-	for i := 0; i < 10*size; i++ {
+	for i := 0; i < size; i++ {
 		err := queue.consume()
 		if err != nil {
 			t.Errorf("consume error: %v", err)
diff --git a/events.go b/events.go
new file mode 100644
index 0000000..9588a38
--- /dev/null
+++ b/events.go
@@ -0,0 +1,64 @@
+package delayqueue
+
+import (
+	"errors"
+	"strconv"
+	"strings"
+)
+
+const (
+	// NewMessageEvent is emitted when a message is sent
+	NewMessageEvent = iota + 1
+	// ReadyEvent is emitted when messages have reached their delivery time
+	ReadyEvent
+	// DeliveredEvent is emitted when messages have been delivered to a consumer
+	DeliveredEvent
+	// AckEvent is emitted when a successful-consumption callback is received
+	AckEvent
+	// NackEvent is emitted when a consumption-failure callback is received
+	NackEvent
+	// RetryEvent is emitted when a message is re-delivered to a consumer
+	RetryEvent
+	// FinalFailedEvent is emitted when a message reaches its max retry attempts
+	FinalFailedEvent
+)
+
+// Event contains internal event information during the queue operation and can be used to monitor the queue status.
+type Event struct {
+	// Code represents the event type, such as NewMessageEvent, ReadyEvent
+	Code int
+	// Timestamp is the event time
+	Timestamp int64
+	// MsgCount represents the number of messages related to the event
+	MsgCount int
+}
+
+func encodeEvent(e *Event) string {
+	return strconv.Itoa(e.Code) +
+		" " + strconv.FormatInt(e.Timestamp, 10) +
+		" " + strconv.Itoa(e.MsgCount)
+}
+
+func decodeEvent(payload string) (*Event, error) {
+	items := strings.Split(payload, " ")
+	if len(items) != 3 {
+		return nil, errors.New("decode event error! wrong item count, payload: " + payload)
+	}
+	code, err := strconv.Atoi(items[0])
+	if err != nil {
+		return nil, errors.New("decode event error! 
wrong event code, payload: " + payload)
+	}
+	timestamp, err := strconv.ParseInt(items[1], 10, 64)
+	if err != nil {
+		return nil, errors.New("decode event error! wrong timestamp, payload: " + payload)
+	}
+	count, err := strconv.Atoi(items[2])
+	if err != nil {
+		return nil, errors.New("decode event error! wrong msg count, payload: " + payload)
+	}
+	return &Event{
+		Code:      code,
+		Timestamp: timestamp,
+		MsgCount:  count,
+	}, nil
+}
diff --git a/example/main.go b/example/getstarted/main.go
similarity index 100%
rename from example/main.go
rename to example/getstarted/main.go
diff --git a/example/monitor/main.go b/example/monitor/main.go
new file mode 100644
index 0000000..7e65679
--- /dev/null
+++ b/example/monitor/main.go
@@ -0,0 +1,81 @@
+package main
+
+import (
+	"fmt"
+	"strconv"
+	"time"
+
+	"github.com/hdt3213/delayqueue"
+	"github.com/redis/go-redis/v9"
+)
+
+type Metrics struct {
+	ProduceCount int
+	DeliverCount int
+	ConsumeCount int
+	RetryCount   int
+	FailCount    int
+}
+
+type MyProfiler struct {
+	List  []*Metrics
+	Start int64
+}
+
+func (p *MyProfiler) OnEvent(event *delayqueue.Event) {
+	sinceUptime := event.Timestamp - p.Start
+	upMinutes := sinceUptime / 60
+	if len(p.List) <= int(upMinutes) {
+		p.List = append(p.List, &Metrics{})
+	}
+	current := p.List[upMinutes]
+	switch event.Code {
+	case delayqueue.NewMessageEvent:
+		current.ProduceCount += event.MsgCount
+	case delayqueue.DeliveredEvent:
+		current.DeliverCount += event.MsgCount
+	case delayqueue.AckEvent:
+		current.ConsumeCount += event.MsgCount
+	case delayqueue.RetryEvent:
+		current.RetryCount += event.MsgCount
+	case delayqueue.FinalFailedEvent:
+		current.FailCount += event.MsgCount
+	}
+}
+
+func main() {
+	redisCli := redis.NewClient(&redis.Options{
+		Addr: "127.0.0.1:6379",
+	})
+	queue := delayqueue.NewQueue("example", redisCli, func(payload string) bool {
+		return true
+	})
+	start := time.Now()
+	queue.EnableReport()
+
+	// setup monitor
+	monitor := delayqueue.NewMonitor("example", redisCli)
+	listener := &MyProfiler{
+		Start: start.Unix(),
+	}
+	monitor.ListenEvent(listener)
+
+	// print metrics every minute
+	tick := time.Tick(time.Minute)
+	go func() {
+		for range tick {
+			minutes := len(listener.List) - 1
+			fmt.Printf("%d: %#v", minutes, listener.List[minutes])
+		}
+	}()
+
+	// start test
+	for i := 0; i < 10; i++ {
+		err := queue.SendDelayMsg(strconv.Itoa(i), 0, delayqueue.WithRetryCount(3))
+		if err != nil {
+			panic(err)
+		}
+	}
+	done := queue.StartConsume()
+	<-done
+}
diff --git a/monitor.go b/monitor.go
new file mode 100644
index 0000000..01c5d3a
--- /dev/null
+++ b/monitor.go
@@ -0,0 +1,72 @@
+package delayqueue
+
+import (
+	"log"
+
+	"github.com/redis/go-redis/v9"
+)
+
+// Monitor can get running status and events of DelayQueue
+type Monitor struct {
+	inner *DelayQueue
+}
+
+// NewMonitor0 creates a new Monitor by a RedisCli instance
+func NewMonitor0(name string, cli RedisCli, opts ...interface{}) *Monitor {
+	opts = append(opts, noCallbackOpt(1))
+	return &Monitor{
+		inner: NewQueue0(name, cli, nil, opts...),
+	}
+}
+
+// NewMonitor creates a new Monitor by a *redis.Client
+func NewMonitor(name string, cli *redis.Client, opts ...interface{}) *Monitor {
+	rc := &redisV9Wrapper{
+		inner: cli,
+	}
+	return NewMonitor0(name, rc, opts...)
+}
+
+// WithLogger customizes the logger for the monitor
+func (m *Monitor) WithLogger(logger *log.Logger) *Monitor {
+	m.inner.logger = logger
+	return m
+}
+
+// GetPendingCount returns the number of messages whose delivery time has not arrived yet
+func (m *Monitor) GetPendingCount() (int64, error) {
+	return m.inner.GetPendingCount()
+}
+
+// GetReadyCount returns the number of messages which have reached their delivery time but have not been delivered yet
+func (m *Monitor) GetReadyCount() (int64, error) {
+	return m.inner.GetReadyCount()
+}
+
+// GetProcessingCount returns the number of messages which are being processed
+func (m *Monitor) GetProcessingCount() (int64, error) {
+	return m.inner.GetProcessingCount()
+}
+
+// ListenEvent registers a listener which will be called when events occur in this queue,
+// so it can be used to monitor the running status
+// returns: close function, error
+func (m *Monitor) ListenEvent(listener EventListener) (func(), error) {
+	reportChan := genReportChannel(m.inner.name)
+	sub, closer, err := m.inner.redisCli.Subscribe(reportChan)
+	if err != nil {
+		return nil, err
+	}
+	go func() {
+		for payload := range sub {
+			event, err := decodeEvent(payload)
+			if err != nil {
+				m.inner.logger.Printf("[listen event] %v\n", err)
+			} else {
+				listener.OnEvent(event)
+			}
+		}
+	}()
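+	// the goroutine above exits when the close function returned by
+	// Subscribe closes the payload channel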
+} + +// WithLogger customizes logger for queue +func (m *Monitor) WithLogger(logger *log.Logger) *Monitor { + m.inner.logger = logger + return m +} + +// GetPendingCount returns the number of messages which delivery time has not arrived +func (m *Monitor) GetPendingCount() (int64, error) { + return m.inner.GetPendingCount() +} + +// GetReadyCount returns the number of messages which have arrived delivery time but but have not been delivered yet +func (m *Monitor) GetReadyCount() (int64, error) { + return m.inner.GetReadyCount() +} + +// GetProcessingCount returns the number of messages which are being processed +func (m *Monitor) GetProcessingCount() (int64, error) { + return m.inner.GetProcessingCount() +} + +// ListenEvent register a listener which will be called when events occured in this queue +// so it can be used to monitor running status +// returns: close function, error +func (m *Monitor) ListenEvent(listener EventListener) (func(), error) { + reportChan := genReportChannel(m.inner.name) + sub, closer, err := m.inner.redisCli.Subscribe(reportChan) + if err != nil { + return nil, err + } + go func() { + for payload := range sub { + event, err := decodeEvent(payload) + if err != nil { + m.inner.logger.Printf("[listen event] %v\n", event) + } else { + listener.OnEvent(event) + } + } + }() + return closer, nil +} + diff --git a/monitor_test.go b/monitor_test.go new file mode 100644 index 0000000..6766c3d --- /dev/null +++ b/monitor_test.go @@ -0,0 +1,163 @@ +package delayqueue + +import ( + "context" + "log" + "os" + "strconv" + "testing" + + "github.com/redis/go-redis/v9" +) + +func TestMonitor_get_status(t *testing.T) { + redisCli := redis.NewClient(&redis.Options{ + Addr: "127.0.0.1:6379", + }) + redisCli.FlushDB(context.Background()) + size := 1000 + cb := func(s string) bool { + return true + } + logger := log.New(os.Stderr, "[DelayQueue]", log.LstdFlags) + queue := NewQueue("test", redisCli, cb) + monitor := NewMonitor("test", redisCli).WithLogger(logger) + + for i := 0; i < size; i++ { + err := queue.SendDelayMsg(strconv.Itoa(i), 0) + if err != nil { + t.Error(err) + } + } + + // test pengding count + pending, err := monitor.GetPendingCount() + if err != nil { + t.Error(err) + return + } + if int(pending) != size { + t.Errorf("execting %d, got %d", int(pending), size) + return + } + + // test ready count + err = queue.pending2Ready() + if err != nil { + t.Errorf("consume error: %v", err) + return + } + ready, err := monitor.GetReadyCount() + if err != nil { + t.Error(err) + return + } + if int(ready) != size { + t.Errorf("execting %d, got %d", int(pending), size) + return + } + + // test processing count + for i := 0; i < size/2; i++ { + _ , _ = queue.ready2Unack() + } + processing, err := monitor.GetProcessingCount() + if err != nil { + t.Error(err) + return + } + if int(processing) != size/2 { + t.Errorf("execting %d, got %d", int(pending), size/2) + return + } +} + +type MyProfiler struct { + ProduceCount int + DeliverCount int + ConsumeCount int + RetryCount int + FailCount int +} + +func (p *MyProfiler) OnEvent(event *Event) { + switch event.Code { + case NewMessageEvent: + p.ProduceCount += event.MsgCount + case DeliveredEvent: + p.DeliverCount += event.MsgCount + case AckEvent: + p.ConsumeCount += event.MsgCount + case RetryEvent: + p.RetryCount += event.MsgCount + case FinalFailedEvent: + p.FailCount += event.MsgCount + } +} + +func TestMonitor_listener1(t *testing.T) { + redisCli := redis.NewClient(&redis.Options{ + Addr: "127.0.0.1:6379", + }) + 
+	queue.consume()
+
+	if profile.ProduceCount != size {
+		t.Error("wrong produce count")
+	}
+	if profile.DeliverCount != size {
+		t.Error("wrong deliver count")
+	}
+	if profile.ConsumeCount != size {
+		t.Error("wrong consume count")
+	}
+}
+
+func TestMonitor_listener2(t *testing.T) {
+	redisCli := redis.NewClient(&redis.Options{
+		Addr: "127.0.0.1:6379",
+	})
+	redisCli.FlushDB(context.Background())
+	size := 1000
+	cb := func(s string) bool {
+		return false
+	}
+	queue := NewQueue("test", redisCli, cb).WithDefaultRetryCount(1)
+	queue.EnableReport()
+	monitor := NewMonitor("test", redisCli)
+	profile := &MyProfiler{}
+	monitor.ListenEvent(profile)
+
+	for i := 0; i < size; i++ {
+		err := queue.SendDelayMsg(strconv.Itoa(i), 0)
+		if err != nil {
+			t.Error(err)
+		}
+	}
+	for i := 0; i < 3; i++ {
+		queue.consume()
+	}
+
+	if profile.RetryCount != size {
+		t.Error("wrong retry count")
+	}
+	if profile.FailCount != size {
+		t.Error("wrong fail count")
+	}
+}
\ No newline at end of file
diff --git a/publisher.go b/publisher.go
index f9bb8aa..387a7f4 100644
--- a/publisher.go
+++ b/publisher.go
@@ -13,7 +13,7 @@ type Publisher struct {
 }
 
 // NewPublisher0 creates a new Publisher by a RedisCli instance
-func NewPublisher0(name string, cli RedisCli, opts ...interface{}) *Publisher {
+func NewPublisher0(name string, cli RedisCli, opts ...interface{}) *Publisher {
 	opts = append(opts, noCallbackOpt(1))
 	return &Publisher{
 		inner: NewQueue0(name, cli, nil, opts...),
@@ -21,7 +21,7 @@ func NewPublisher0(name string, cli RedisCli, opts ...interface{}) *Publisher {
 }
 
 // NewPublisher creates a new Publisher by a *redis.Client
-func NewPublisher(name string, cli *redis.Client, opts ...interface{}) *Publisher {
+func NewPublisher(name string, cli *redis.Client, opts ...interface{}) *Publisher {
 	rc := &redisV9Wrapper{
 		inner: cli,
 	}
@@ -42,4 +42,4 @@ func (p *Publisher) SendScheduleMsg(payload string, t time.Time, opts ...interfa
 // SendDelayMsg submits a message delivered after given duration
 func (p *Publisher) SendDelayMsg(payload string, duration time.Duration, opts ...interface{}) error {
 	return p.inner.SendDelayMsg(payload, duration, opts...)
-}
\ No newline at end of file
+}
diff --git a/wrapper.go b/wrapper.go
index 6c28f69..d663eb6 100644
--- a/wrapper.go
+++ b/wrapper.go
@@ -92,6 +92,37 @@ func (r *redisV9Wrapper) ZRem(key string, members []string) error {
 	return wrapErr(r.inner.ZRem(ctx, key, members2...).Err())
 }
 
+func (r *redisV9Wrapper) ZCard(key string) (int64, error) {
+	ctx := context.Background()
+	return r.inner.ZCard(ctx, key).Result()
+}
+
+func (r *redisV9Wrapper) LLen(key string) (int64, error) {
+	ctx := context.Background()
+	return r.inner.LLen(ctx, key).Result()
+}
+
+func (r *redisV9Wrapper) Publish(channel string, payload string) error {
+	ctx := context.Background()
+	return r.inner.Publish(ctx, channel, payload).Err()
+}
+
+func (r *redisV9Wrapper) Subscribe(channel string) (<-chan string, func(), error) {
+	ctx := context.Background()
+	sub := r.inner.Subscribe(ctx, channel)
+	closer := func() {
+		_ = sub.Close()
+	}
+	resultChan := make(chan string) // sub.Channel() has its own buffer
+	go func() {
+		// when the subscription is closed, sub.Channel() is closed as well;
+		// close resultChan too, so that readers of the payload channel can exit
+		defer close(resultChan)
+		for msg := range sub.Channel() {
+			resultChan <- msg.Payload
+		}
+	}()
+
+	return resultChan, closer, nil
+}
+
 type redisClusterWrapper struct {
 	inner *redis.ClusterClient
 }
@@ -164,6 +195,37 @@ func (r *redisClusterWrapper) ZRem(key string, members []string) error {
 	return wrapErr(r.inner.ZRem(ctx, key, members2...).Err())
 }
 
+func (r *redisClusterWrapper) ZCard(key string) (int64, error) {
+	ctx := context.Background()
+	return r.inner.ZCard(ctx, key).Result()
+}
+
+func (r *redisClusterWrapper) LLen(key string) (int64, error) {
+	ctx := context.Background()
+	return r.inner.LLen(ctx, key).Result()
+}
+
+func (r *redisClusterWrapper) Publish(channel string, payload string) error {
+	ctx := context.Background()
+	return r.inner.Publish(ctx, channel, payload).Err()
+}
+
+func (r *redisClusterWrapper) Subscribe(channel string) (<-chan string, func(), error) {
+	ctx := context.Background()
+	sub := r.inner.Subscribe(ctx, channel)
+	closer := func() {
+		_ = sub.Close()
+	}
+	resultChan := make(chan string) // sub.Channel() has its own buffer
+	go func() {
+		// when the subscription is closed, sub.Channel() is closed as well;
+		// close resultChan too, so that readers of the payload channel can exit
+		defer close(resultChan)
+		for msg := range sub.Channel() {
+			resultChan <- msg.Payload
+		}
+	}()
+
+	return resultChan, closer, nil
+}
+
 func NewQueueOnCluster(name string, cli *redis.ClusterClient, callback func(string) bool, opts ...interface{}) *DelayQueue {
 	rc := &redisClusterWrapper{
 		inner: cli,