File tree 5 files changed +22
-5
lines changed
5 files changed +22
-5
lines changed Original file line number Diff line number Diff line change @@ -18,6 +18,7 @@ pip install taskiq-redis
18
18
Let's see the example with the redis broker and redis async result:
19
19
20
20
``` python
21
+ # broker.py
21
22
import asyncio
22
23
23
24
from taskiq_redis import ListQueueBroker, RedisAsyncResultBackend
@@ -42,12 +43,18 @@ async def best_task_ever() -> None:
42
43
43
44
async def main ():
44
45
task = await best_task_ever.kiq()
45
- print (await task.get_result ())
46
+ print (await task.wait_result ())
46
47
47
48
48
- asyncio.run(main())
49
+ if __name__ == " __main__" :
50
+ asyncio.run(main())
49
51
```
50
52
53
+ Launch the workers:
54
+ ` taskiq worker broker:broker `
55
+ Then run the main code:
56
+ ` python3 broker.py `
57
+
51
58
## PubSubBroker and ListQueueBroker configuration
52
59
53
60
We have two brokers with similar interfaces, but with different logic.
Original file line number Diff line number Diff line change 1
1
[tool .poetry ]
2
2
name = " taskiq-redis"
3
- version = " 0.3.0 "
3
+ version = " 0.3.1 "
4
4
description = " Redis integration for taskiq"
5
5
authors = [
" taskiq-team <[email protected] >" ]
6
6
readme = " README.md"
Original file line number Diff line number Diff line change @@ -8,3 +8,7 @@ class DuplicateExpireTimeSelectedError(TaskIQRedisError):
8
8
9
9
class ExpireTimeMustBeMoreThanZeroError (TaskIQRedisError ):
10
10
"""Error if two lifetimes are less or equal zero."""
11
+
12
+
13
+ class ResultIsMissingError (TaskIQRedisError ):
14
+ """Error if there is no result when trying to get it."""
Original file line number Diff line number Diff line change 8
8
from taskiq_redis .exceptions import (
9
9
DuplicateExpireTimeSelectedError ,
10
10
ExpireTimeMustBeMoreThanZeroError ,
11
+ ResultIsMissingError ,
11
12
)
12
13
13
14
_ReturnType = TypeVar ("_ReturnType" )
@@ -109,6 +110,7 @@ async def get_result( # noqa: WPS210
109
110
110
111
:param task_id: task's id.
111
112
:param with_logs: if True it will download task's logs.
113
+ :raises ResultIsMissingError: if there is no result when trying to get it.
112
114
:return: task's return value.
113
115
"""
114
116
async with Redis (connection_pool = self .redis_pool ) as redis :
@@ -121,6 +123,9 @@ async def get_result( # noqa: WPS210
121
123
name = task_id ,
122
124
)
123
125
126
+ if result_value is None :
127
+ raise ResultIsMissingError ()
128
+
124
129
taskiq_result : TaskiqResult [_ReturnType ] = pickle .loads (result_value )
125
130
126
131
if not with_logs :
Original file line number Diff line number Diff line change 9
9
from taskiq_redis .exceptions import (
10
10
DuplicateExpireTimeSelectedError ,
11
11
ExpireTimeMustBeMoreThanZeroError ,
12
+ ResultIsMissingError ,
12
13
)
13
14
14
15
_ReturnType = TypeVar ("_ReturnType" )
@@ -210,7 +211,7 @@ async def test_unsuccess_backend_expire_ex_param(
210
211
)
211
212
await asyncio .sleep (1.1 )
212
213
213
- with pytest .raises (TypeError ):
214
+ with pytest .raises (ResultIsMissingError ):
214
215
await backend .get_result (task_id = task_id )
215
216
216
217
@@ -270,5 +271,5 @@ async def test_unsuccess_backend_expire_px_param(
270
271
)
271
272
await asyncio .sleep (1.1 )
272
273
273
- with pytest .raises (TypeError ):
274
+ with pytest .raises (ResultIsMissingError ):
274
275
await backend .get_result (task_id = task_id )
You can’t perform that action at this time.
0 commit comments