-
-
Notifications
You must be signed in to change notification settings - Fork 56
/
Copy pathpush.py
124 lines (98 loc) · 3.51 KB
/
push.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
import asyncio
import logging
from typing import TYPE_CHECKING, Any, Dict, List, Optional
from ..logging_util import TokenMaskingFilter
from ..types import DEFAULT_TIMEOUT, Callback, _Hook
if TYPE_CHECKING:
from .channel import AsyncRealtimeChannel
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
# Route every record emitted through this logger via TokenMaskingFilter
# (presumably redacts auth tokens from log output -- confirm in logging_util).
logger.addFilter(TokenMaskingFilter())
class AsyncPush:
    """One outbound channel message together with its reply bookkeeping.

    A push sends ``event`` with ``payload`` over the channel's socket, then
    listens on a per-message reply event. Callbacks registered with
    :meth:`receive` are invoked when a reply with the matching status
    arrives; if no reply arrives within ``timeout`` seconds, a synthetic
    ``"timeout"`` reply is triggered instead.
    """

    def __init__(
        self,
        channel: "AsyncRealtimeChannel",
        event: str,
        payload: Optional[Dict[str, Any]] = None,
        timeout: int = DEFAULT_TIMEOUT,
    ):
        """Create a push that has not been sent yet.

        Args:
            channel: Channel whose socket and event registry this push uses.
            event: Event name placed in the outgoing message.
            payload: Message payload. Stored by reference when given; when
                omitted, each push gets its own new empty dict.
            timeout: Seconds to wait for a reply before triggering the
                ``"timeout"`` status (passed to ``asyncio.sleep``).
        """
        self.channel = channel
        self.event = event
        # A literal ``{}`` default would be a single dict shared by every
        # push created without a payload, so build a fresh one here instead.
        self.payload: Dict[str, Any] = {} if payload is None else payload
        self.timeout = timeout
        # Registered (status, callback) hooks, in registration order.
        self.rec_hooks: List[_Hook] = []
        # Message ref and the reply-event name derived from it; both are
        # assigned in start_timeout().
        self.ref: Optional[str] = None
        self.ref_event: Optional[str] = None
        # Last reply payload delivered to on_reply, if any.
        self.received_resp: Optional[Dict[str, Any]] = None
        self.sent = False
        self.timeout_task: Optional[asyncio.Task] = None

    async def resend(self) -> None:
        """Clear the reply state and send the message again.

        NOTE(review): a pending timeout task is not cancelled here, so if one
        is still active ``start_timeout()`` returns early and the message is
        sent with ref ``""`` and no reply handler registered -- confirm this
        is intended.
        """
        self._cancel_ref_event()
        self.ref = ""
        self.ref_event = None
        self.received_resp = None
        self.sent = False
        await self.send()

    async def send(self) -> None:
        """Send the message over the channel's socket.

        Does nothing if a ``"timeout"`` reply has already been recorded.
        Otherwise arms reply/timeout tracking, marks the push as sent, and
        sends the message. Any exception raised while sending is logged and
        swallowed; ``sent`` stays ``True`` in that case.
        """
        if self._has_received("timeout"):
            return

        self.start_timeout()
        self.sent = True

        try:
            await self.channel.socket.send(
                {
                    "topic": self.channel.topic,
                    "event": self.event,
                    "payload": self.payload,
                    "ref": self.ref,
                    "join_ref": self.channel.join_push.ref,
                }
            )
        except Exception as e:
            logger.error(f"send push failed: {e}")

    def update_payload(self, payload: Dict[str, Any]) -> None:
        """Merge ``payload`` into the current payload.

        Builds a new dict rather than mutating the existing one; keys in
        ``payload`` override existing keys.
        """
        self.payload = {**self.payload, **payload}

    def receive(self, status: str, callback: Callback) -> "AsyncPush":
        """Register ``callback`` for replies whose status equals ``status``.

        If a reply with that status has already been recorded, ``callback``
        is invoked immediately with its ``"response"`` value (``{}`` when the
        key is absent). The hook is appended in either case.

        Returns:
            This push, so calls can be chained.
        """
        if self._has_received(status):
            callback(self.received_resp.get("response", {}))

        self.rec_hooks.append(_Hook(status, callback))
        return self

    def start_timeout(self) -> None:
        """Allocate a ref, listen for the reply, and schedule the timeout.

        Returns immediately if a timeout task is already tracked. Must be
        called while an event loop is running, since it creates a task.
        """
        if self.timeout_task:
            return

        self.ref = self.channel.socket._make_ref()
        self.ref_event = self.channel._reply_event_name(self.ref)

        def on_reply(payload, *args):
            # Stop listening and stop the timer before dispatching, then
            # record the reply so later receive() calls can see it.
            self._cancel_ref_event()
            self._cancel_timeout()
            self.received_resp = payload
            # Read the fields with .get (as receive() and _has_received()
            # do) so an extra or missing key does not raise TypeError.
            self._match_receive(payload.get("status"), payload.get("response", {}))

        self.channel._on(self.ref_event, on_reply)

        async def expire() -> None:
            # No reply within the allotted time: trigger a "timeout" reply.
            await asyncio.sleep(self.timeout)
            self.trigger("timeout", {})

        self.timeout_task = asyncio.create_task(expire())

    def trigger(self, status: str, response: Any) -> None:
        """Trigger the reply event with the given status and response.

        Does nothing when no reply event name is currently set.
        """
        if self.ref_event:
            payload = {
                "status": status,
                "response": response,
            }
            self.channel._trigger(self.ref_event, payload)

    def destroy(self) -> None:
        """Unregister the reply handler and cancel the pending timeout."""
        self._cancel_ref_event()
        self._cancel_timeout()

    def _cancel_ref_event(self) -> None:
        """Unregister this push's reply event from the channel, if one is set."""
        if not self.ref_event:
            return

        self.channel._off(self.ref_event, {})

    def _cancel_timeout(self) -> None:
        """Cancel and forget the timeout task, if one is tracked."""
        if not self.timeout_task:
            return

        self.timeout_task.cancel()
        self.timeout_task = None

    def _match_receive(self, status: Optional[str], response: Any) -> None:
        """Invoke every registered hook whose status equals ``status``."""
        for hook in self.rec_hooks:
            if hook.status == status:
                hook.callback(response)

    def _has_received(self, status: str) -> bool:
        """Return True if a reply with the given status has been recorded."""
        return bool(self.received_resp) and self.received_resp.get("status") == status