Skip to content

Commit 764fccb

Browse files
committed
Finish 0.2.0
2 parents aa11cb7 + 7f365b5 commit 764fccb

File tree

6 files changed

+174
-49
lines changed

6 files changed

+174
-49
lines changed

.github/workflows/test.yml

+15-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,11 @@
11
name: Testing taskiq-redis
22

3-
on: push
3+
on:
4+
push:
5+
branches:
6+
- main
7+
- develop
8+
pull_request:
49

510
jobs:
611
black:
@@ -78,3 +83,12 @@ jobs:
7883
POETRY_VIRTUALENVS_CREATE: false
7984
- name: Run pytest check
8085
run: poetry run pytest -vv -n auto --cov="taskiq_redis" .
86+
- name: Generate report
87+
run: poetry run coverage xml
88+
- name: Upload coverage reports to Codecov with GitHub Action
89+
uses: codecov/codecov-action@v3
90+
if: matrix.py_version == '3.9'
91+
with:
92+
token: ${{ secrets.CODECOV_TOKEN }}
93+
fail_ci_if_error: true
94+
verbose: true

README.md

+14-5
Original file line numberDiff line numberDiff line change
@@ -16,18 +16,19 @@ pip install taskiq-redis
1616
# Usage
1717

1818
Let's see the example with the redis broker and redis async result:
19+
1920
```python
2021
import asyncio
2122

22-
from taskiq_redis.redis_broker import RedisBroker
23+
from taskiq_redis.redis_broker import ListQueueBroker
2324
from taskiq_redis.redis_backend import RedisAsyncResultBackend
2425

25-
2626
redis_async_result = RedisAsyncResultBackend(
2727
redis_url="redis://localhost:6379",
2828
)
2929

30-
broker = RedisBroker(
30+
# Or you can use PubSubBroker if you need broadcasting
31+
broker = ListQueueBroker(
3132
url="redis://localhost:6379",
3233
result_backend=redis_async_result,
3334
)
@@ -48,9 +49,17 @@ async def main():
4849
asyncio.run(main())
4950
```
5051

51-
## RedisBroker configuration
52+
## PubSubBroker and ListQueueBroker configuration
53+
54+
We have two brokers with similar interfaces, but with different logic.
55+
The PubSubBroker uses redis' pubsub mechanism and is very powerful,
56+
but it executes every task on all workers, because PUBSUB broadcasts message
57+
to all subscribers.
58+
59+
If you want your messages to be processed only once, please use ListQueueBroker.
60+
It uses redis' [LPUSH](https://redis.io/commands/lpush/) and [BRPOP](https://redis.io/commands/brpop/) commands to deal with messages.
5261

53-
RedisBroker parameters:
62+
Brokers parameters:
5463
* `url` - url to redis.
5564
* `task_id_generator` - custom task_id genertaor.
5665
* `result_backend` - custom result backend.

pyproject.toml

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[tool.poetry]
22
name = "taskiq-redis"
3-
version = "0.1.0"
3+
version = "0.2.0"
44
description = "Redis integration for taskiq"
55
authors = ["taskiq-team <[email protected]>"]
66
readme = "README.md"

taskiq_redis/__init__.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
"""Package for redis integration."""
22
from taskiq_redis.redis_backend import RedisAsyncResultBackend
3-
from taskiq_redis.redis_broker import RedisBroker
3+
from taskiq_redis.redis_broker import ListQueueBroker
44

5-
__all__ = ["RedisAsyncResultBackend", "RedisBroker"]
5+
__all__ = ["RedisAsyncResultBackend", "ListQueueBroker"]

taskiq_redis/redis_broker.py

+58-40
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import pickle
2+
from abc import abstractmethod
23
from logging import getLogger
34
from typing import Any, AsyncGenerator, Callable, Optional, TypeVar
45

@@ -12,8 +13,8 @@
1213
logger = getLogger("taskiq.redis_broker")
1314

1415

15-
class RedisBroker(AsyncBroker):
16-
"""Broker that works with Redis."""
16+
class BaseRedisBroker(AsyncBroker):
17+
"""Base broker that works with Redis."""
1718

1819
def __init__(
1920
self,
@@ -44,31 +45,12 @@ def __init__(
4445
max_connections=max_connection_pool_size,
4546
**connection_kwargs,
4647
)
47-
48-
self.redis_pubsub_channel = queue_name
48+
self.queue_name = queue_name
4949

5050
async def shutdown(self) -> None:
5151
"""Closes redis connection pool."""
5252
await self.connection_pool.disconnect()
5353

54-
async def kick(self, message: BrokerMessage) -> None:
55-
"""
56-
Sends a message to the redis broker list.
57-
58-
This function constructs message for redis
59-
and sends it.
60-
61-
The message is pickled dict object with message,
62-
task_id, task_name and labels.
63-
64-
:param message: message to send.
65-
"""
66-
async with Redis(connection_pool=self.connection_pool) as redis_conn:
67-
await redis_conn.publish(
68-
self.redis_pubsub_channel,
69-
pickle.dumps(message),
70-
)
71-
7254
async def listen(self) -> AsyncGenerator[BrokerMessage, None]:
7355
"""
7456
Listen redis queue for new messages.
@@ -78,24 +60,60 @@ async def listen(self) -> AsyncGenerator[BrokerMessage, None]:
7860
7961
:yields: broker messages.
8062
"""
63+
async for message in self._listen_to_raw_messages():
64+
try:
65+
redis_message = pickle.loads(message)
66+
if isinstance(redis_message, BrokerMessage):
67+
yield redis_message
68+
except (
69+
TypeError,
70+
AttributeError,
71+
pickle.UnpicklingError,
72+
) as exc:
73+
logger.debug(
74+
"Cannot read broker message %s",
75+
exc,
76+
exc_info=True,
77+
)
78+
79+
@abstractmethod
80+
async def _listen_to_raw_messages(self) -> AsyncGenerator[bytes, None]:
81+
"""
82+
Generator for reading raw data from Redis.
83+
84+
:yields: raw data.
85+
"""
86+
yield # type: ignore
87+
88+
89+
class PubSubBroker(BaseRedisBroker):
90+
"""Broker that works with Redis and broadcasts tasks to all workers."""
91+
92+
async def kick(self, message: BrokerMessage) -> None: # noqa: D102
93+
async with Redis(connection_pool=self.connection_pool) as redis_conn:
94+
await redis_conn.publish(self.queue_name, pickle.dumps(message))
95+
96+
async def _listen_to_raw_messages(self) -> AsyncGenerator[bytes, None]:
8197
async with Redis(connection_pool=self.connection_pool) as redis_conn:
8298
redis_pubsub_channel = redis_conn.pubsub()
83-
await redis_pubsub_channel.subscribe(self.redis_pubsub_channel)
99+
await redis_pubsub_channel.subscribe(self.queue_name)
84100
async for message in redis_pubsub_channel.listen():
85-
if message:
86-
try:
87-
redis_message = pickle.loads(
88-
message["data"],
89-
)
90-
if isinstance(redis_message, BrokerMessage):
91-
yield redis_message
92-
except (
93-
TypeError,
94-
AttributeError,
95-
pickle.UnpicklingError,
96-
) as exc:
97-
logger.debug(
98-
"Cannot read broker message %s",
99-
exc,
100-
exc_info=True,
101-
)
101+
if not message:
102+
continue
103+
yield message["data"]
104+
105+
106+
class ListQueueBroker(BaseRedisBroker):
107+
"""Broker that works with Redis and distributes tasks between workers."""
108+
109+
async def kick(self, message: BrokerMessage) -> None: # noqa: D102
110+
async with Redis(connection_pool=self.connection_pool) as redis_conn:
111+
await redis_conn.lpush(self.queue_name, pickle.dumps(message))
112+
113+
async def _listen_to_raw_messages(self) -> AsyncGenerator[bytes, None]:
114+
redis_brpop_data_position = 1
115+
async with Redis(connection_pool=self.connection_pool) as redis_conn:
116+
while True: # noqa: WPS457
117+
yield (await redis_conn.brpop(self.queue_name))[
118+
redis_brpop_data_position
119+
]

tests/test_broker.py

+84
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
import asyncio
2+
import uuid
3+
4+
import pytest
5+
from taskiq import AsyncBroker, BrokerMessage
6+
7+
from taskiq_redis.redis_broker import ListQueueBroker, PubSubBroker
8+
9+
10+
async def get_message(broker: AsyncBroker) -> BrokerMessage: # type: ignore
11+
"""
12+
Get a message from the broker.
13+
14+
:param broker: async message broker.
15+
:return: first message from listen method.
16+
"""
17+
async for message in broker.listen(): # noqa: WPS328
18+
return message
19+
20+
21+
@pytest.fixture
22+
def valid_broker_message() -> BrokerMessage:
23+
"""
24+
Generate valid broker message for tests.
25+
26+
:returns: broker message.
27+
"""
28+
return BrokerMessage(
29+
task_id=uuid.uuid4().hex,
30+
task_name=uuid.uuid4().hex,
31+
message="my_msg",
32+
labels={
33+
"label1": "val1",
34+
},
35+
)
36+
37+
38+
@pytest.mark.anyio
39+
async def test_pub_sub_broker(
40+
valid_broker_message: BrokerMessage,
41+
redis_url: str,
42+
) -> None:
43+
"""
44+
Test that messages are published and read correctly by PubSubBroker.
45+
46+
We create two workers that listen and send a message to them.
47+
Expect both workers to receive the same message we sent.
48+
"""
49+
broker = PubSubBroker(url=redis_url, queue_name=uuid.uuid4().hex)
50+
worker1_task = asyncio.create_task(get_message(broker))
51+
worker2_task = asyncio.create_task(get_message(broker))
52+
await asyncio.sleep(0.3)
53+
54+
await broker.kick(valid_broker_message)
55+
await asyncio.sleep(0.3)
56+
57+
message1 = worker1_task.result()
58+
message2 = worker2_task.result()
59+
assert message1 == valid_broker_message
60+
assert message1 == message2
61+
62+
63+
@pytest.mark.anyio
64+
async def test_list_queue_broker(
65+
valid_broker_message: BrokerMessage,
66+
redis_url: str,
67+
) -> None:
68+
"""
69+
Test that messages are published and read correctly by ListQueueBroker.
70+
71+
We create two workers that listen and send a message to them.
72+
Expect only one worker to receive the same message we sent.
73+
"""
74+
broker = ListQueueBroker(url=redis_url, queue_name=uuid.uuid4().hex)
75+
worker1_task = asyncio.create_task(get_message(broker))
76+
worker2_task = asyncio.create_task(get_message(broker))
77+
await asyncio.sleep(0.3)
78+
79+
await broker.kick(valid_broker_message)
80+
await asyncio.sleep(0.3)
81+
82+
assert worker1_task.done() != worker2_task.done()
83+
message = worker1_task.result() if worker1_task.done() else worker2_task.result()
84+
assert message == valid_broker_message

0 commit comments

Comments
 (0)