redis schedulePending target messages continue to increase #144

Open
sun617 opened this issue Oct 13, 2021 · 1 comment

sun617 commented Oct 13, 2021

Example

  • Create the stream (taskq:{test}:stream) and the consumer group (taskq)
127.0.0.1:6379> XGROUP CREATE taskq:{test}:stream taskq $ MKSTREAM
OK
  • Add 3 messages
127.0.0.1:6379> XADD taskq:{test}:stream * body apple
"1634099225543-0"
127.0.0.1:6379> XADD taskq:{test}:stream * body orange
"1634099233517-0"
127.0.0.1:6379> XADD taskq:{test}:stream * body banana
"1634099237506-0"
127.0.0.1:6379> XLEN taskq:{test}:stream
(integer) 3
  • At this point the pending list is empty
127.0.0.1:6379> XPENDING taskq:{test}:stream taskq - + 5
(empty array)
  • Consumer foo reads the 3 messages, which puts them in the pending list
127.0.0.1:6379> XREADGROUP GROUP taskq foo STREAMS taskq:{test}:stream >
1) 1) "taskq:{test}:stream"
   2) 1) 1) "1634099225543-0"
         2) 1) "body"
            2) "apple"
      2) 1) "1634099233517-0"
         2) 1) "body"
            2) "orange"
      3) 1) "1634099237506-0"
         2) 1) "body"
            2) "banana"
127.0.0.1:6379> XPENDING taskq:{test}:stream taskq - + 5
1) 1) "1634099225543-0"
   2) "foo"
   3) (integer) 71364
   4) (integer) 1
2) 1) "1634099233517-0"
   2) "foo"
   3) (integer) 71364
   4) (integer) 1
3) 1) "1634099237506-0"
   2) "foo"
   3) (integer) 71364
   4) (integer) 1
  • When a message is processed successfully it is deleted with XDEL, but the pending list still holds all 3 messages
127.0.0.1:6379> XDEL taskq:{test}:stream 1634099225543-0
(integer) 1
127.0.0.1:6379> XPENDING taskq:{test}:stream taskq - + 5
1) 1) "1634099225543-0"
   2) "foo"
   3) (integer) 122716
   4) (integer) 1
2) 1) "1634099233517-0"
   2) "foo"
   3) (integer) 122716
   4) (integer) 1
3) 1) "1634099237506-0"
   2) "foo"
   3) (integer) 122716
   4) (integer) 1

All messages are left in the pending list, so the schedulePending method gets an error when it tries to release a message that has already been deleted.
Related issue: #143
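
To make the behaviour above easier to reason about, here is a minimal in-memory sketch of one stream with one consumer group. It is only a toy model, not Redis internals and not taskq code: the `group` type and its `add`, `readAll`, `del` and `ack` methods are hypothetical names that stand in for XADD, XREADGROUP, XDEL and XACK. The one property it relies on is that XDEL removes an entry from the stream while XACK removes it from the pending list.

```go
package main

import "fmt"

// group is a toy model of one stream plus one consumer group.
// It tracks only the stream entries and the pending list.
type group struct {
	entries map[string]string // message ID -> body
	pending map[string]string // message ID -> consumer holding it
}

func newGroup() *group {
	return &group{entries: map[string]string{}, pending: map[string]string{}}
}

// add stands in for XADD with an explicit ID.
func (g *group) add(id, body string) { g.entries[id] = body }

// readAll stands in for a single XREADGROUP ... > call on a fresh group:
// every entry in the stream is recorded as pending for the consumer.
func (g *group) readAll(consumer string) {
	for id := range g.entries {
		g.pending[id] = consumer
	}
}

// del stands in for XDEL: it touches the stream only, never the pending list.
func (g *group) del(id string) int {
	if _, ok := g.entries[id]; !ok {
		return 0
	}
	delete(g.entries, id)
	return 1
}

// ack stands in for XACK: it touches the pending list only.
func (g *group) ack(id string) int {
	if _, ok := g.pending[id]; !ok {
		return 0
	}
	delete(g.pending, id)
	return 1
}

func main() {
	g := newGroup()
	g.add("1634099225543-0", "apple")
	g.add("1634099233517-0", "orange")
	g.add("1634099237506-0", "banana")
	g.readAll("foo")

	// Delete without acknowledging: the pending list keeps all 3 IDs.
	g.del("1634099225543-0")
	fmt.Println(len(g.entries), len(g.pending)) // 2 3

	// Acknowledge, then delete: the ID leaves both places.
	g.ack("1634099233517-0")
	g.del("1634099233517-0")
	fmt.Println(len(g.entries), len(g.pending)) // 1 2
}
```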

In Delete and Release, we should execute XACK first and then XDEL:
https://github.com/vmihailenco/taskq/blob/v3/redisq/queue.go#L225
https://github.com/vmihailenco/taskq/blob/v3/redisq/queue.go#L245

127.0.0.1:6379> XACK taskq:{test}:stream taskq 1634099233517-0
(integer) 1
127.0.0.1:6379> XDEL taskq:{test}:stream 1634099233517-0
(integer) 1
127.0.0.1:6379> XPENDING taskq:{test}:stream taskq - + 5
1) 1) "1634099225543-0"
   2) "foo"
   3) (integer) 199531
   4) (integer) 1
2) 1) "1634099237506-0"
   2) "foo"
   3) (integer) 199531
   4) (integer) 1
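
The proposed order could look roughly like the sketch below in Go. This is not the current taskq code and not the go-redis API: `streamClient`, `ackAndDelete` and the `recorder` fake are hypothetical names introduced only for illustration, with simplified signatures (no context, plain return values). It only shows the call order suggested in this issue, with XDEL skipped when XACK fails.

```go
package main

import (
	"errors"
	"fmt"
)

// streamClient is a hypothetical, minimal interface for this sketch.
// A real client would take a context and return its own command types.
type streamClient interface {
	XAck(stream, group string, ids ...string) (int64, error)
	XDel(stream string, ids ...string) (int64, error)
}

// ackAndDelete acknowledges the IDs first, so they leave the pending list,
// and only then deletes the entries from the stream.
func ackAndDelete(c streamClient, stream, group string, ids ...string) error {
	if _, err := c.XAck(stream, group, ids...); err != nil {
		return fmt.Errorf("xack: %w", err)
	}
	if _, err := c.XDel(stream, ids...); err != nil {
		return fmt.Errorf("xdel: %w", err)
	}
	return nil
}

// recorder is a fake client that records the order of the calls it receives.
type recorder struct {
	calls   []string
	failAck bool
}

func (r *recorder) XAck(stream, group string, ids ...string) (int64, error) {
	r.calls = append(r.calls, "XACK")
	if r.failAck {
		return 0, errors.New("ack failed")
	}
	return int64(len(ids)), nil
}

func (r *recorder) XDel(stream string, ids ...string) (int64, error) {
	r.calls = append(r.calls, "XDEL")
	return int64(len(ids)), nil
}

func main() {
	r := &recorder{}
	err := ackAndDelete(r, "taskq:{test}:stream", "taskq", "1634099233517-0")
	fmt.Println(r.calls, err) // [XACK XDEL] <nil>
}
```

Stopping after a failed XACK leaves the message in both the stream and the pending list, so a later release or retry still finds it.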

lzap commented Jan 2, 2023

Hello, is this still an issue?
