Skip to content

Commit fe2bfdf

Browse files
committed
feat: support custom queue_name per message for all kick methods
1 parent e1eca8a commit fe2bfdf

File tree

3 files changed

+18
-2
lines changed

3 files changed

+18
-2
lines changed

README.md

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -189,3 +189,17 @@ scheduler = TaskiqScheduler(broker, [array_source])
189189
```
190190
191191
During startup the scheduler will try to migrate schedules from an old source to a new one. Please be sure to specify different prefixe just to avoid any kind of collision between these two.
192+
193+
194+
## Dynamic queue names
195+
196+
197+
Brokers supports dynamic queue names, allowing you to specify different queues when kicking tasks. This is useful for routing tasks to specific queues based on runtime conditions, such as priority levels, tenant isolation, or environment-specific processing.
198+
199+
Simply pass the desired queue name as message's label when kicking a task to override the broker's default queue configuration.
200+
201+
```python
202+
@broker.task(queue_name="low_priority")
203+
async def low_priority_task() -> None:
204+
print("I don't mind waiting a little longer")
205+
```

taskiq_redis/redis_broker.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -251,9 +251,10 @@ async def kick(self, message: BrokerMessage) -> None:
251251
252252
:param message: message to append.
253253
"""
254+
queue_name = message.labels.get("queue_name") or self.queue_name
254255
async with Redis(connection_pool=self.connection_pool) as redis_conn:
255256
await redis_conn.xadd(
256-
self.queue_name,
257+
queue_name,
257258
{b"data": message.message},
258259
maxlen=self.maxlen,
259260
approximate=self.approximate,

taskiq_redis/redis_sentinel_broker.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -230,9 +230,10 @@ async def kick(self, message: BrokerMessage) -> None:
230230
231231
:param message: message to append.
232232
"""
233+
queue_name = message.labels.get("queue_name") or self.queue_name
233234
async with self._acquire_master_conn() as redis_conn:
234235
await redis_conn.xadd(
235-
self.queue_name,
236+
queue_name,
236237
{b"data": message.message},
237238
maxlen=self.maxlen,
238239
approximate=self.approximate,

0 commit comments

Comments
 (0)