Skip to content

Commit c18a6a6

Browse files
committed
ADD: Actively monitor for hung Live sessions
1 parent 60fe12c commit c18a6a6

File tree

4 files changed

+36
-12
lines changed

4 files changed

+36
-12
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@
55
#### Enhancements
66
- Added a property `Live.subscription_requests` which returns a list of tuples containing every `SubscriptionRequest` for the live session
77
- Changed the return value of `Live.subscribe()` to `int`, the value of the subscription ID, which can be used to index into the `Live.subscription_requests` property
8+
- Added feature to automatically monitor for hung connections in the `Live` client
9+
- Hung connections will be disconnected client-side and reported with a `BentoError`
810

911
#### Breaking changes
1012
- Several log messages have been reformatted to improve clarity and reduce redundancy, especially at debug levels

databento/live/session.py

Lines changed: 26 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import dataclasses
55
import itertools
66
import logging
7+
import math
78
import queue
89
import struct
910
import threading
@@ -35,6 +36,7 @@
3536
CONNECT_TIMEOUT_SECONDS: Final = 10.0
3637
DBN_QUEUE_CAPACITY: Final = 2**20
3738
DEFAULT_REMOTE_PORT: Final = 13000
39+
CLIENT_TIMEOUT_MARGIN_SECONDS: Final = 10
3840

3941

4042
class DBNQueue(queue.SimpleQueue): # type: ignore [type-arg]
@@ -212,6 +214,7 @@ def __init__(
212214
self._user_callbacks = user_callbacks
213215
self._user_streams = user_streams
214216
self._last_ts_event: int | None = None
217+
self._last_msg_loop_time: float = math.inf
215218

216219
def received_metadata(self, metadata: databento_dbn.Metadata) -> None:
217220
if self._metadata:
@@ -235,6 +238,7 @@ def received_record(self, record: DBNRecord) -> None:
235238
if self._dbn_queue.is_enabled():
236239
self._queue_for_iteration(record)
237240
self._last_ts_event = record.ts_event
241+
self._last_msg_loop_time = self._loop.time()
238242

239243
return super().received_record(record)
240244

@@ -324,7 +328,7 @@ def __init__(
324328

325329
self._api_key = api_key
326330
self._ts_out = ts_out
327-
self._heartbeat_interval_s = heartbeat_interval_s
331+
self._heartbeat_interval_s = heartbeat_interval_s or 30
328332

329333
self._protocol: _SessionProtocol | None = None
330334
self._transport: asyncio.Transport | None = None
@@ -333,6 +337,7 @@ def __init__(
333337
self._subscriptions: list[tuple[SubscriptionRequest, ...]] = []
334338
self._reconnect_policy = ReconnectPolicy(reconnect_policy)
335339
self._reconnect_task: asyncio.Task[None] | None = None
340+
self._heartbeat_monitor_task: asyncio.Task[None] | None = None
336341

337342
self._dataset = ""
338343

@@ -436,8 +441,6 @@ def stop(self) -> None:
436441
with self._lock:
437442
if self._transport is None:
438443
return
439-
if self._protocol is not None:
440-
self._protocol.disconnected.add_done_callback(lambda _: self._cleanup())
441444
self._loop.call_soon_threadsafe(self._transport.close)
442445

443446
def start(self) -> None:
@@ -454,6 +457,9 @@ def start(self) -> None:
454457
if self._protocol is None:
455458
raise ValueError("session is not connected")
456459
self._protocol.start()
460+
self._heartbeat_monitor_task = self._loop.create_task(
461+
self._heartbeat_monitor(),
462+
)
457463

458464
def subscribe(
459465
self,
@@ -553,6 +559,8 @@ def _cleanup(self) -> None:
553559
if stream.is_managed:
554560
stream.close()
555561

562+
if self._heartbeat_monitor_task is not None:
563+
self._heartbeat_monitor_task.cancel()
556564
self._user_callbacks.clear()
557565
self._user_streams.clear()
558566
self._user_reconnect_callbacks.clear()
@@ -647,6 +655,21 @@ async def _connect_task(
647655

648656
return transport, protocol
649657

658+
async def _heartbeat_monitor(self) -> None:
659+
while not self._protocol.disconnected.done():
660+
await asyncio.sleep(1)
661+
gap = self._loop.time() - self._protocol._last_msg_loop_time
662+
if gap > (self._heartbeat_interval_s + CLIENT_TIMEOUT_MARGIN_SECONDS):
663+
logger.error(
664+
"disconnecting client due to timeout, no data received for %d second(s)",
665+
int(gap),
666+
)
667+
self._protocol.disconnected.set_exception(
668+
BentoError(
669+
f"Gateway timeout: {gap:.0f} second(s) since last message",
670+
),
671+
)
672+
650673
async def _reconnect(self) -> None:
651674
while True:
652675
try:

tests/test_live_client.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44

55
from __future__ import annotations
66

7-
import asyncio
87
import pathlib
98
import platform
109
import random
@@ -279,7 +278,7 @@ async def test_live_client_reuse(
279278
live_client.stop()
280279
assert live_client.session_id == first_session_id
281280

282-
await asyncio.sleep(1)
281+
await live_client.wait_for_close()
283282

284283
live_client.subscribe(
285284
dataset=Dataset.GLBX_MDP3,

tests/test_live_client_reconnect.py

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -65,9 +65,9 @@ async def test_reconnect_before_start(
6565
reconnect_policy: ReconnectPolicy = ReconnectPolicy.RECONNECT,
6666
) -> None:
6767
"""
68-
Test that a reconnect policy of "reconnect_do_not_replay" reconnects a
69-
client but does not send the session start command if the session was not
70-
streaming previously.
68+
Test that a reconnect policy of "reconnect" reconnects a client but does
69+
not send the session start command if the session was not streaming
70+
previously.
7171
"""
7272
# Arrange
7373
live_client = client.Live(
@@ -138,8 +138,8 @@ async def test_reconnect_subscriptions(
138138
reconnect_policy: ReconnectPolicy = ReconnectPolicy.RECONNECT,
139139
) -> None:
140140
"""
141-
Test that a reconnect policy of "reconnect_do_not_replay" re-sends the
142-
subscription requests with a start of `None`.
141+
Test that a reconnect policy of "reconnect" re-sends the subscription
142+
requests with a start of `None`.
143143
"""
144144
# Arrange
145145
live_client = client.Live(
@@ -192,8 +192,8 @@ async def test_reconnect_callback(
192192
reconnect_policy: ReconnectPolicy = ReconnectPolicy.RECONNECT,
193193
) -> None:
194194
"""
195-
Test that a reconnect policy of "reconnect_do_not_replay" will cause a user
196-
supplied reconnection callback to be executed when a reconnection occurs.
195+
Test that a reconnect policy of "reconnect" will cause a user supplied
196+
reconnection callback to be executed when a reconnection occurs.
197197
"""
198198
# Arrange
199199
live_client = client.Live(

0 commit comments

Comments
 (0)