Skip to content

Commit aa11cb7

Browse files
committedMar 13, 2023
Merge branch 'release/0.1.0'
2 parents dd9b66e + 459b814 commit aa11cb7

File tree

8 files changed

+861
-821
lines changed

8 files changed

+861
-821
lines changed
 

‎README.md

+4-3
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ from taskiq_redis.redis_backend import RedisAsyncResultBackend
2424

2525

2626
redis_async_result = RedisAsyncResultBackend(
27-
url="redis://localhost:6379",
27+
redis_url="redis://localhost:6379",
2828
)
2929

3030
broker = RedisBroker(
@@ -41,7 +41,7 @@ async def best_task_ever() -> None:
4141

4242

4343
async def main():
44-
task = await my_async_task.kiq()
44+
task = await best_task_ever.kiq()
4545
print(await task.get_result())
4646

4747

@@ -60,4 +60,5 @@ RedisBroker parameters:
6060
## RedisAsyncResultBackend configuration
6161

6262
RedisAsyncResultBackend parameters:
63-
* `url` - url to redis.
63+
* `redis_url` - url to redis.
64+
* `keep_results` - flag to not remove results from Redis after reading.

‎poetry.lock

+781-803
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

‎pyproject.toml

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[tool.poetry]
22
name = "taskiq-redis"
3-
version = "0.0.4"
3+
version = "0.1.0"
44
description = "Redis integration for taskiq"
55
authors = ["taskiq-team <taskiq@norely.com>"]
66
readme = "README.md"
@@ -19,7 +19,7 @@ keywords = ["taskiq", "tasks", "distributed", "async", "redis", "result_backend"
1919

2020
[tool.poetry.dependencies]
2121
python = "^3.7"
22-
taskiq = "^0"
22+
taskiq = "^0.2.0"
2323
redis = "^4.2.0"
2424

2525
[tool.poetry.dev-dependencies]

‎taskiq_redis/redis_backend.py

+11-1
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,15 @@
1111
class RedisAsyncResultBackend(AsyncResultBackend[_ReturnType]):
1212
"""Async result based on redis."""
1313

14-
def __init__(self, redis_url: str):
14+
def __init__(self, redis_url: str, keep_results: bool = True):
15+
"""
16+
Constructs a new result backend.
17+
18+
:param redis_url: url to redis.
19+
:param keep_results: flag to not remove results from Redis after reading.
20+
"""
1521
self.redis_pool = ConnectionPool.from_url(redis_url)
22+
self.keep_results = keep_results
1623

1724
async def shutdown(self) -> None:
1825
"""Closes redis connection."""
@@ -80,6 +87,9 @@ async def get_result( # noqa: WPS210
8087
keys=fields,
8188
)
8289

90+
if not self.keep_results:
91+
await redis.delete(task_id)
92+
8393
result = {
8494
result_key: pickle.loads(result_value)
8595
for result_value, result_key in zip(result_values, fields)

‎taskiq_redis/redis_broker.py

+7-12
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
1-
import asyncio
21
import pickle
32
from logging import getLogger
4-
from typing import Any, Callable, Coroutine, Optional, TypeVar
3+
from typing import Any, AsyncGenerator, Callable, Optional, TypeVar
54

65
from redis.asyncio import ConnectionPool, Redis
76
from taskiq.abc.broker import AsyncBroker
@@ -70,19 +69,15 @@ async def kick(self, message: BrokerMessage) -> None:
7069
pickle.dumps(message),
7170
)
7271

73-
async def listen(
74-
self,
75-
callback: Callable[[BrokerMessage], Coroutine[Any, Any, None]],
76-
) -> None:
72+
async def listen(self) -> AsyncGenerator[BrokerMessage, None]:
7773
"""
78-
Listen redis list for new messages.
74+
Listen redis queue for new messages.
7975
80-
This function listens to list calls callback on
81-
new messages.
76+
This function listens to the queue
77+
and yields new messages if they have BrokerMessage type.
8278
83-
:param callback: function to call on new message.
79+
:yields: broker messages.
8480
"""
85-
loop = asyncio.get_event_loop()
8681
async with Redis(connection_pool=self.connection_pool) as redis_conn:
8782
redis_pubsub_channel = redis_conn.pubsub()
8883
await redis_pubsub_channel.subscribe(self.redis_pubsub_channel)
@@ -93,7 +88,7 @@ async def listen(
9388
message["data"],
9489
)
9590
if isinstance(redis_message, BrokerMessage):
96-
loop.create_task(callback(redis_message))
91+
yield redis_message
9792
except (
9893
TypeError,
9994
AttributeError,
File renamed without changes.
File renamed without changes.

‎taskiq_redis/tests/test_result_backend.py renamed to ‎tests/test_result_backend.py

+56
Original file line numberDiff line numberDiff line change
@@ -68,3 +68,59 @@ async def test_fetch_without_logs(redis_url: str) -> None:
6868
assert fetched_result.return_value == 11
6969
assert fetched_result.execution_time == 112.2 # noqa: WPS459
7070
assert fetched_result.is_err
71+
72+
73+
@pytest.mark.anyio
74+
async def test_remove_results_after_reading(redis_url: str) -> None:
75+
"""
76+
Check if removing results after reading works fine.
77+
78+
:param redis_url: redis URL.
79+
"""
80+
result_backend = RedisAsyncResultBackend( # type: ignore
81+
redis_url=redis_url,
82+
keep_results=False,
83+
)
84+
task_id = uuid.uuid4().hex
85+
result: "TaskiqResult[int]" = TaskiqResult(
86+
is_err=True,
87+
log="My Log",
88+
return_value=11,
89+
execution_time=112.2,
90+
)
91+
await result_backend.set_result(
92+
task_id=task_id,
93+
result=result,
94+
)
95+
96+
await result_backend.get_result(task_id=task_id)
97+
with pytest.raises(Exception):
98+
await result_backend.get_result(task_id=task_id)
99+
100+
101+
@pytest.mark.anyio
102+
async def test_keep_results_after_reading(redis_url: str) -> None:
103+
"""
104+
Check if keeping results after reading works fine.
105+
106+
:param redis_url: redis URL.
107+
"""
108+
result_backend = RedisAsyncResultBackend( # type: ignore
109+
redis_url=redis_url,
110+
keep_results=True,
111+
)
112+
task_id = uuid.uuid4().hex
113+
result: "TaskiqResult[int]" = TaskiqResult(
114+
is_err=True,
115+
log="My Log",
116+
return_value=11,
117+
execution_time=112.2,
118+
)
119+
await result_backend.set_result(
120+
task_id=task_id,
121+
result=result,
122+
)
123+
124+
res1 = await result_backend.get_result(task_id=task_id)
125+
res2 = await result_backend.get_result(task_id=task_id)
126+
assert res1 == res2

0 commit comments

Comments
 (0)