From 95790b6e064764b9ea785acc3061f294c0280d0b Mon Sep 17 00:00:00 2001 From: Pavel Kirilin Date: Wed, 23 Apr 2025 19:26:26 +0200 Subject: [PATCH] Added count argument for better distribution. --- taskiq_redis/redis_broker.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/taskiq_redis/redis_broker.py b/taskiq_redis/redis_broker.py index 10baff9..4be8337 100644 --- a/taskiq_redis/redis_broker.py +++ b/taskiq_redis/redis_broker.py @@ -168,6 +168,7 @@ def __init__( maxlen: Optional[int] = None, idle_timeout: int = 600000, # 10 minutes unacknowledged_batch_size: int = 100, + xread_count: Optional[int] = 100, additional_streams: Optional[Dict[str, str]] = None, **connection_kwargs: Any, ) -> None: @@ -189,6 +190,7 @@ def __init__( Better to set it to a bigger value, to avoid unnecessary calls. :param maxlen: sets the maximum length of the stream trims (the old values of) the stream each time a new element is added + :param xread_count: number of messages to fetch from the stream at once. :param additional_streams: additional streams to read from. Each key is a stream name, value is a consumer id. :param redeliver_timeout: time in ms to wait before redelivering a message. @@ -211,6 +213,7 @@ def __init__( self.additional_streams = additional_streams or {} self.idle_timeout = idle_timeout self.unacknowledged_batch_size = unacknowledged_batch_size + self.count = xread_count async def _declare_consumer_group(self) -> None: """ @@ -276,6 +279,7 @@ async def listen(self) -> AsyncGenerator[AckableMessage, None]: }, block=self.block, noack=False, + count=self.count, ) for _, msg_list in fetched: for msg_id, msg in msg_list: