refactor concurrent consume
HDT3213 committed Sep 27, 2024
1 parent 0812e5a commit a752279
Showing 4 changed files with 192 additions and 93 deletions.
184 changes: 102 additions & 82 deletions delayqueue.go
@@ -5,7 +5,7 @@ import (
 	"fmt"
 	"log"
 	"strconv"
-	"sync"
+	"sync/atomic"
 	"time"
 
 	"github.com/google/uuid"
@@ -14,27 +14,31 @@ import (
 // DelayQueue is a message queue supporting delayed/scheduled delivery based on redis
 type DelayQueue struct {
 	// name for this Queue. Make sure the name is unique in redis database
-	name          string
-	redisCli      RedisCli
-	cb            func(string) bool
-	pendingKey    string // sorted set: message id -> delivery time
-	readyKey      string // list
-	unAckKey      string // sorted set: message id -> retry time
-	retryKey      string // list
-	retryCountKey string // hash: message id -> remain retry count
-	garbageKey    string // set: message id
-	useHashTag    bool
-	ticker        *time.Ticker
-	logger        *log.Logger
-	close         chan struct{}
-
+	name               string
+	redisCli           RedisCli
+	cb                 func(string) bool
+	pendingKey         string // sorted set: message id -> delivery time
+	readyKey           string // list
+	unAckKey           string // sorted set: message id -> retry time
+	retryKey           string // list
+	retryCountKey      string // hash: message id -> remain retry count
+	garbageKey         string // set: message id
+	useHashTag         bool
+	ticker             *time.Ticker
+	logger             *log.Logger
+	close              chan struct{}
+	running            int32
 	maxConsumeDuration time.Duration // default 5 seconds
 	msgTTL             time.Duration // default 1 hour
 	defaultRetryCount  uint          // default 3
 	fetchInterval      time.Duration // default 1 second
 	fetchLimit         uint          // default no limit
+	fetchCount         int32         // actually running task number
 	concurrent         uint          // default 1, executed serially
 
+	// for batch consume
+	consumeBuffer chan string
+
 	eventListener EventListener
 }
 
@@ -163,8 +167,9 @@ func (q *DelayQueue) WithFetchLimit(limit uint) *DelayQueue {
 // WithConcurrent sets the number of concurrent consumers
 func (q *DelayQueue) WithConcurrent(c uint) *DelayQueue {
 	if c == 0 {
-		return q
+		panic("concurrent cannot be 0")
 	}
+	q.assertNotRunning()
 	q.concurrent = c
 	return q
 }
@@ -339,44 +344,8 @@ func (q *DelayQueue) callback(idStr string) error {
 	return err
 }
 
-// batchCallback calls DelayQueue.callback in batch. callback is executed concurrently according to property DelayQueue.concurrent
-// batchCallback must wait all callback finished, otherwise the actual number of processing messages may beyond DelayQueue.FetchLimit
-func (q *DelayQueue) batchCallback(ids []string) {
-	if len(ids) == 1 || q.concurrent == 1 {
-		for _, id := range ids {
-			err := q.callback(id)
-			if err != nil {
-				q.logger.Printf("consume msg %s failed: %v", id, err)
-			}
-		}
-		return
-	}
-	ch := make(chan string, len(ids))
-	for _, id := range ids {
-		ch <- id
-	}
-	close(ch)
-	wg := sync.WaitGroup{}
-	concurrent := int(q.concurrent)
-	if concurrent > len(ids) { // too many goroutines is no use
-		concurrent = len(ids)
-	}
-	wg.Add(concurrent)
-	for i := 0; i < concurrent; i++ {
-		go func() {
-			defer wg.Done()
-			for id := range ch {
-				err := q.callback(id)
-				if err != nil {
-					q.logger.Printf("consume msg %s failed: %v", id, err)
-				}
-			}
-		}()
-	}
-	wg.Wait()
-}
-
 func (q *DelayQueue) ack(idStr string) error {
+	atomic.AddInt32(&q.fetchCount, -1)
 	err := q.redisCli.ZRem(q.unAckKey, []string{idStr})
 	if err != nil {
 		return fmt.Errorf("remove from unack failed: %v", err)
@@ -389,6 +358,7 @@ func (q *DelayQueue) ack(idStr string) error {
 }
 
 func (q *DelayQueue) nack(idStr string) error {
+	atomic.AddInt32(&q.fetchCount, -1)
 	// update retry time as now, unack2Retry will move it to retry immediately
 	err := q.redisCli.ZAdd(q.unAckKey, map[string]float64{
 		idStr: float64(time.Now().Unix()),
@@ -501,60 +471,91 @@ func (q *DelayQueue) garbageCollect() error {
 	return nil
 }
 
-func (q *DelayQueue) consume() error {
+func (q *DelayQueue) beforeConsume() ([]string, error) {
 	// pending to ready
 	err := q.pending2Ready()
 	if err != nil {
-		return err
+		return nil, err
 	}
-	// consume
+	// ready2Unack
+	// prioritize new message consumption to avoid avalanches
 	ids := make([]string, 0, q.fetchLimit)
+	var fetchCount int32
 	for {
+		fetchCount = atomic.LoadInt32(&q.fetchCount)
+		if q.fetchLimit > 0 && fetchCount >= int32(q.fetchLimit) {
+			break
+		}
 		idStr, err := q.ready2Unack()
 		if err == NilErr { // consumed all
 			break
 		}
 		if err != nil {
-			return err
+			return nil, err
 		}
 		ids = append(ids, idStr)
-		if q.fetchLimit > 0 && len(ids) >= int(q.fetchLimit) {
-			break
-		}
+		atomic.AddInt32(&q.fetchCount, 1)
 	}
-	if len(ids) > 0 {
-		q.batchCallback(ids)
+	// retry2Unack
+	if fetchCount < int32(q.fetchLimit) || q.fetchLimit == 0 {
+		for {
+			fetchCount = atomic.LoadInt32(&q.fetchCount)
+			if q.fetchLimit > 0 && fetchCount >= int32(q.fetchLimit) {
+				break
+			}
+			idStr, err := q.retry2Unack()
+			if err == NilErr { // consumed all
+				break
+			}
+			if err != nil {
+				return nil, err
+			}
+			ids = append(ids, idStr)
+			atomic.AddInt32(&q.fetchCount, 1)
+		}
 	}
+	return ids, nil
+}
+
+func (q *DelayQueue) afterConsume() error {
 	// unack to retry
-	err = q.unack2Retry()
+	err := q.unack2Retry()
 	if err != nil {
 		return err
 	}
 	err = q.garbageCollect()
 	if err != nil {
 		return err
 	}
-	// retry
-	ids = make([]string, 0, q.fetchLimit)
-	for {
-		idStr, err := q.retry2Unack()
-		if err == NilErr { // consumed all
-			break
-		}
-		if err != nil {
-			return err
-		}
-		ids = append(ids, idStr)
-		if q.fetchLimit > 0 && len(ids) >= int(q.fetchLimit) {
-			break
-		}
-	}
-	if len(ids) > 0 {
-		q.batchCallback(ids)
-	}
 	return nil
 }
 
+func (q *DelayQueue) setRunning() {
+	atomic.StoreInt32(&q.running, 1)
+}
+
+func (q *DelayQueue) setNotRunning() {
+	atomic.StoreInt32(&q.running, 0)
+}
+
+func (q *DelayQueue) assertNotRunning() {
+	running := atomic.LoadInt32(&q.running)
+	if running > 0 {
+		panic("operation cannot be performed during running")
+	}
+}
+
+func (q *DelayQueue) goWithRecover(fn func()) {
+	go func() {
+		defer func() {
+			if err := recover(); err != nil {
+				q.logger.Printf("panic: %v\n", err)
+			}
+		}()
+		fn()
+	}()
+}
+
 // StartConsume creates a goroutine to consume message from DelayQueue
 // use `<-done` to wait consumer stopping
 // If there is no callback set, StartConsume will panic
@@ -563,17 +564,34 @@ func (q *DelayQueue) StartConsume() (done <-chan struct{}) {
 		panic("this instance has no callback")
 	}
 	q.close = make(chan struct{}, 1)
+	q.setRunning()
 	q.ticker = time.NewTicker(q.fetchInterval)
+	q.consumeBuffer = make(chan string, q.fetchLimit)
 	done0 := make(chan struct{})
+	// start worker
+	for i := 0; i < int(q.concurrent); i++ {
+		q.goWithRecover(func() {
+			for id := range q.consumeBuffer {
+				q.callback(id)
+				q.afterConsume()
+			}
+		})
+	}
+	// start main loop
 	go func() {
 	tickerLoop:
 		for {
 			select {
 			case <-q.ticker.C:
-				err := q.consume()
+				ids, err := q.beforeConsume()
 				if err != nil {
 					log.Printf("consume error: %v", err)
 				}
+				q.goWithRecover(func() {
+					for _, id := range ids {
+						q.consumeBuffer <- id
+					}
+				})
 			case <-q.close:
 				break tickerLoop
 			}
@@ -586,9 +604,11 @@ func (q *DelayQueue) StopConsume() {
 // StopConsume stops consumer goroutine
 func (q *DelayQueue) StopConsume() {
 	close(q.close)
+	q.setNotRunning()
 	if q.ticker != nil {
 		q.ticker.Stop()
 	}
+	close(q.consumeBuffer)
 }
 
 // GetPendingCount returns the number of pending messages
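The heart of this refactor swaps the per-batch worker pool that batchCallback created on every fetch for long-lived workers that drain a shared consumeBuffer channel, each wrapped in goWithRecover so a panicking callback cannot kill the consumer. Below is a minimal, self-contained sketch of that pattern; the buffer channel, worker count, and print statement stand in for DelayQueue.consumeBuffer, DelayQueue.concurrent, and the user callback, and are illustrative only.

package main

import (
	"fmt"
	"sync"
)

// goWithRecover mirrors the helper added in this commit: run fn in a
// goroutine and log a panic instead of crashing the process.
func goWithRecover(fn func()) {
	go func() {
		defer func() {
			if err := recover(); err != nil {
				fmt.Printf("panic: %v\n", err)
			}
		}()
		fn()
	}()
}

func main() {
	buffer := make(chan string, 8) // stands in for q.consumeBuffer
	var wg sync.WaitGroup

	// Long-lived workers: they live for the queue's whole lifetime and
	// exit when the buffer is closed, unlike the per-batch goroutines
	// that the removed batchCallback spun up and waited on.
	for i := 0; i < 4; i++ {
		wg.Add(1)
		goWithRecover(func() {
			defer wg.Done()
			for id := range buffer {
				fmt.Println("consume", id) // q.callback(id) in the real code
			}
		})
	}

	for i := 0; i < 10; i++ {
		buffer <- fmt.Sprintf("msg-%d", i)
	}
	close(buffer) // StopConsume does the same with close(q.consumeBuffer)
	wg.Wait()
}

One consequence of this design is that the fetch loop no longer blocks on a WaitGroup: beforeConsume only moves message ids into the buffer, so a slow message delays its own worker rather than the whole batch.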
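The new fetchCount field turns fetchLimit from a per-batch cap into a global in-flight cap: beforeConsume stops fetching once the counter reaches fetchLimit, and ack/nack decrement it when a message finishes. The following toy model of that accounting uses hypothetical names; only the atomic load/increment/decrement mirrors the diff.

package main

import (
	"fmt"
	"sync/atomic"
)

// inFlightLimiter mimics the fetchCount bookkeeping. As in the real queue,
// only one goroutine fetches, so the load-then-add below does not race.
type inFlightLimiter struct {
	fetchCount int32
	fetchLimit int32
}

func (l *inFlightLimiter) tryFetch() bool {
	if l.fetchLimit > 0 && atomic.LoadInt32(&l.fetchCount) >= l.fetchLimit {
		return false // beforeConsume breaks out of its fetch loop here
	}
	atomic.AddInt32(&l.fetchCount, 1) // message handed to a worker
	return true
}

// ack mirrors DelayQueue.ack/nack, which decrement fetchCount first.
func (l *inFlightLimiter) ack() { atomic.AddInt32(&l.fetchCount, -1) }

func main() {
	l := &inFlightLimiter{fetchLimit: 2}
	fmt.Println(l.tryFetch(), l.tryFetch(), l.tryFetch()) // true true false
	l.ack()                   // one in-flight slot frees up
	fmt.Println(l.tryFetch()) // true again
}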
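From the caller's side the API shape is unchanged, but option handling is stricter: WithConcurrent(0) now panics instead of being silently ignored, and assertNotRunning means options must be set before StartConsume. Here is a usage sketch against this version; the NewQueue signature and redis client wiring follow the project README as I recall it, so treat those details as assumptions rather than a definitive example.

package main

import (
	"log"
	"time"

	"github.com/hdt3213/delayqueue"
	"github.com/redis/go-redis/v9"
)

func main() {
	redisCli := redis.NewClient(&redis.Options{Addr: "127.0.0.1:6379"})

	// The callback returns true to ack the message, false to nack it.
	queue := delayqueue.NewQueue("example", redisCli, func(payload string) bool {
		log.Printf("consumed: %s", payload)
		return true
	})
	queue.WithConcurrent(4) // 4 workers draining consumeBuffer
	queue.WithFetchLimit(8) // at most 8 unacked messages in flight

	// Configure before StartConsume: once the queue is running,
	// assertNotRunning makes further option calls panic.
	done := queue.StartConsume()

	if err := queue.SendDelayMsg("hello", time.Second); err != nil {
		log.Fatal(err)
	}

	time.Sleep(3 * time.Second)
	queue.StopConsume()
	<-done // wait for the consumer goroutine to stop
}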
