Skip to content

Commit ec75232

Browse files
authored
Initial stream support. (#77)
* Initial stream support. * Added stream broker, updated redis. * Changed supported python version. * Readme updated.
1 parent 090a00a commit ec75232

13 files changed

+1133
-737
lines changed

.github/workflows/test.yml

+1-1
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ jobs:
3232
pytest:
3333
strategy:
3434
matrix:
35-
py_version: ["3.8", "3.9", "3.10", "3.11", "3.12"]
35+
py_version: ["3.9", "3.10", "3.11", "3.12", "3.13"]
3636
runs-on: "ubuntu-latest"
3737
steps:
3838
- uses: actions/checkout@v4

README.md

+57-32
Original file line numberDiff line numberDiff line change
@@ -55,25 +55,48 @@ Launch the workers:
5555
Then run the main code:
5656
`python3 broker.py`
5757

58-
## PubSubBroker and ListQueueBroker configuration
59-
60-
We have two brokers with similar interfaces, but with different logic.
61-
The PubSubBroker uses redis' pubsub mechanism and is very powerful,
62-
but it executes every task on all workers, because PUBSUB broadcasts message
63-
to all subscribers.
64-
65-
If you want your messages to be processed only once, please use ListQueueBroker.
66-
It uses redis' [LPUSH](https://redis.io/commands/lpush/) and [BRPOP](https://redis.io/commands/brpop/) commands to deal with messages.
67-
68-
Brokers parameters:
69-
* `url` - url to redis.
70-
* `task_id_generator` - custom task_id genertaor.
71-
* `result_backend` - custom result backend.
72-
* `queue_name` - name of the pub/sub channel in redis.
73-
* `max_connection_pool_size` - maximum number of connections in pool.
74-
* Any other keyword arguments are passed to `redis.asyncio.BlockingConnectionPool`.
75-
Notably, you can use `timeout` to set custom timeout in seconds for reconnects
76-
(or set it to `None` to try reconnects indefinitely).
58+
59+
## Brokers
60+
61+
This package contains 6 broker implementations.
62+
3 broker types:
63+
* PubSub broker
64+
* ListQueue broker
65+
* Stream broker
66+
67+
Each of type is implemented for each redis architecture:
68+
* Single node
69+
* Cluster
70+
* Sentinel
71+
72+
Here's a small breakdown of how they differ from eachother.
73+
74+
75+
### PubSub
76+
77+
By default on old redis versions PUBSUB was the way of making redis into a queue.
78+
But using PUBSUB means that all messages delivered to all subscribed consumers.
79+
80+
> [!WARNING]
81+
> This broker doesn't support acknowledgements. If during message processing
82+
> Worker was suddenly killed the message is going to be lost.
83+
84+
### ListQueue
85+
86+
This broker creates a list of messages at some key. Adding new tasks will be done
87+
by appending them from the left side using `lpush`, and taking them from the right side using `brpop`.
88+
89+
> [!WARNING]
90+
> This broker doesn't support acknowledgements. If during message processing
91+
> Worker was suddenly killed the message is going to be lost.
92+
93+
### Stream
94+
95+
Stream brokers use redis [stream type](https://redis.io/docs/latest/develop/data-types/streams/) to store and fetch messages.
96+
97+
> [!TIP]
98+
> This broker **supports** acknowledgements and therefore is fine to use in cases when data durability is
99+
> required.
77100
78101
## RedisAsyncResultBackend configuration
79102

@@ -85,18 +108,20 @@ RedisAsyncResultBackend parameters:
85108
* Any other keyword arguments are passed to `redis.asyncio.BlockingConnectionPool`.
86109
Notably, you can use `timeout` to set custom timeout in seconds for reconnects
87110
(or set it to `None` to try reconnects indefinitely).
88-
> IMPORTANT: **It is highly recommended to use expire time ​​in RedisAsyncResultBackend**
111+
112+
> [!WARNING]
113+
> **It is highly recommended to use expire time in RedisAsyncResultBackend**
89114
> If you want to add expiration, either `result_ex_time` or `result_px_time` must be set.
90-
>```python
91-
># First variant
92-
>redis_async_result = RedisAsyncResultBackend(
93-
> redis_url="redis://localhost:6379",
94-
> result_ex_time=1000,
95-
>)
115+
> ```python
116+
> # First variant
117+
> redis_async_result = RedisAsyncResultBackend(
118+
> redis_url="redis://localhost:6379",
119+
> result_ex_time=1000,
120+
> )
96121
>
97-
># Second variant
98-
>redis_async_result = RedisAsyncResultBackend(
99-
> redis_url="redis://localhost:6379",
100-
> result_px_time=1000000,
101-
>)
102-
>```
122+
> # Second variant
123+
> redis_async_result = RedisAsyncResultBackend(
124+
> redis_url="redis://localhost:6379",
125+
> result_px_time=1000000,
126+
> )
127+
> ```

docker-compose.yml

+5-7
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,6 @@
1-
version: '3.2'
2-
31
services:
42
redis:
5-
image: bitnami/redis:6.2.5
3+
image: bitnami/redis:7.4.2
64
environment:
75
ALLOW_EMPTY_PASSWORD: "yes"
86
healthcheck:
@@ -14,7 +12,7 @@ services:
1412
ports:
1513
- 7000:6379
1614
redis-node-0: &redis-node
17-
image: docker.io/bitnami/redis-cluster:7.2
15+
image: docker.io/bitnami/redis-cluster:7.4.2
1816
environment:
1917
ALLOW_EMPTY_PASSWORD: "yes"
2018
REDIS_NODES: "redis-node-0 redis-node-1 redis-node-2 redis-node-3 redis-node-4 redis-node-5"
@@ -38,7 +36,7 @@ services:
3836
<<: *redis-node
3937

4038
redis-node-5:
41-
image: docker.io/bitnami/redis-cluster:7.2
39+
image: docker.io/bitnami/redis-cluster:7.4.2
4240
depends_on:
4341
- redis-node-0
4442
- redis-node-1
@@ -60,7 +58,7 @@ services:
6058
- 7001:6379
6159

6260
redis-master:
63-
image: bitnami/redis:6.2.5
61+
image: bitnami/redis:7.4.2
6462
environment:
6563
ALLOW_EMPTY_PASSWORD: "yes"
6664
healthcheck:
@@ -71,7 +69,7 @@ services:
7169
start_period: 10s
7270

7371
redis-sentinel:
74-
image: bitnami/redis-sentinel:latest
72+
image: bitnami/redis-sentinel:7.4.2
7573
depends_on:
7674
- redis-master
7775
environment:

0 commit comments

Comments
 (0)