-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathkrispbroadcaster.py
84 lines (70 loc) · 2.58 KB
/
krispbroadcaster.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
from __future__ import annotations

import asyncio
import logging
from contextlib import asynccontextmanager
from typing import Any, AsyncIterator, List, Union

from broadcaster import Broadcast, Event
class Unsubscribed(Exception):
    """Raised by ``Subscriber.get`` when the queue delivers the ``None`` sentinel."""
class Subscriber:
    """Async-iterable reader over a single subscription queue.

    Items are taken from the queue in order; a ``None`` item marks the end
    of the subscription.
    """

    def __init__(self, queue):
        # Queue that ``get`` awaits on for the next item.
        self._queue = queue

    async def __aiter__(self):
        """Yield items from ``get`` until it raises ``Unsubscribed``."""
        try:
            while True:
                event = await self.get()
                yield event
        except Unsubscribed:
            # End of subscription: finish the iteration quietly.
            return

    async def get(self) -> Event:
        """Wait for the next queued item and return it.

        Raises:
            Unsubscribed: if the item taken from the queue is ``None``.
        """
        event = await self._queue.get()
        if event is not None:
            return event
        raise Unsubscribed()
"""
KrispBroadcast inherits broadcast and add features:
1. add topics
2. subscribe list of channels
"""
class KrispBroadcast(Broadcast):
    """``Broadcast`` variant with topic prefixes and multi-channel subscribe.

    Every channel is addressed on the backend as ``f"{topic}:{channel}"``.
    ``subscribe`` accepts either one channel name or a collection of names;
    all of them feed the same subscriber queue.
    """

    async def publish(
        self, channel: str, message: Any, topic: str = ""
    ) -> None:
        """Publish ``message`` to ``channel`` within ``topic``.

        Args:
            channel: Channel name, without the topic prefix.
            message: Payload handed unchanged to the backend.
            topic: Prefix joined to the channel as ``f"{topic}:{channel}"``.
        """
        # Debug-level logging replaces the former unconditional prints so
        # payloads are not written to stdout on every publish.
        logging.getLogger(__name__).debug(
            "Publishing to channel %r (topic %r): %r", channel, topic, message
        )
        await self._backend.publish(f"{topic}:{channel}", message)

    @asynccontextmanager
    async def subscribe(
        self, channel: Union[List[str], str], topic: str = ""
    ) -> AsyncIterator[Subscriber]:
        """Subscribe to one or several channels for the life of the context.

        Args:
            channel: A single channel name, or a collection of channel names.
            topic: Prefix joined to each channel as ``f"{topic}:{channel}"``.

        Yields:
            A ``Subscriber`` reading from one queue shared by all the
            requested channels.

        On exit, whether normal or through an exception, the queue is removed
        from every channel it was registered on, the backend is unsubscribed
        from channels left without any queue, and ``None`` is put on the
        queue so a pending ``Subscriber`` iteration terminates.
        """
        names = [channel] if isinstance(channel, str) else channel
        # dict.fromkeys drops duplicate names while keeping the caller's
        # order, so each backend channel is joined and left exactly once.
        keys = list(dict.fromkeys(f"{topic}:{name}" for name in names))

        queue: asyncio.Queue = asyncio.Queue()
        joined: List[str] = []
        try:
            for key in keys:
                await self._attach_queue(key, queue)
                # Recorded only after success, so a failure part-way through
                # still releases the channels that were already joined.
                joined.append(key)
            yield Subscriber(queue)
        finally:
            try:
                for key in joined:
                    await self._detach_queue(key, queue)
            finally:
                # Always deliver the end-of-subscription sentinel, even if
                # unsubscribing from the backend failed.
                await queue.put(None)

    async def _attach_queue(self, key: str, queue: asyncio.Queue) -> None:
        """Register ``queue`` on ``key``, subscribing the backend if needed."""
        if not self._subscribers.get(key):
            await self._backend.subscribe(key)
        # setdefault (rather than assignment) keeps any queue that was
        # registered for this key while the backend call was awaited.
        self._subscribers.setdefault(key, set()).add(queue)

    async def _detach_queue(self, key: str, queue: asyncio.Queue) -> None:
        """Remove ``queue`` from ``key``; unsubscribe the backend when empty."""
        listeners = self._subscribers.get(key)
        if listeners is None:
            return
        listeners.discard(queue)
        if not listeners:
            del self._subscribers[key]
            await self._backend.unsubscribe(key)