Skip to content

Commit 688bcad

Browse files
authored
VER: Release 0.66.0
See release notes.
2 parents 65a6a8f + 0557d33 commit 688bcad

File tree

8 files changed

+327
-100
lines changed

8 files changed

+327
-100
lines changed

CHANGELOG.md

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,17 @@
11
# Changelog
22

3+
## 0.66.0 - 2025-11-18
4+
5+
#### Enhancements
6+
- Added a property `Live.session_id` which returns the streaming session ID when the client is connected
7+
- Streams added with `Live.add_stream()` which do not define an exception handler will now emit a warning if an exception is raised while executing the callback
8+
- Callback functions added with `Live.add_callback()` which do not define an exception handler will now emit a warning if an exception is raised while executing the callback
9+
- Upgraded `databento-dbn` to 0.44.0
10+
- Added logic to set `code` when upgrading version 1 `SystemMsg` to newer versions
11+
12+
#### Bug fixes
13+
- Streams opened by `Live.add_stream()` will now close properly when the streaming session is closed
14+
315
## 0.65.0 - 2025-11-11
416

517
#### Deprecations

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ The library is fully compatible with distributions of Anaconda 2023.x and above.
3232
The minimum dependencies as found in the `pyproject.toml` are also listed below:
3333
- python = "^3.10"
3434
- aiohttp = "^3.8.3"
35-
- databento-dbn = "~0.43.0"
35+
- databento-dbn = "~0.44.0"
3636
- numpy = ">=1.23.5"
3737
- pandas = ">=1.5.3"
3838
- pip-system-certs = ">=4.0" (Windows only)

databento/common/types.py

Lines changed: 201 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,20 @@
11
import datetime as dt
2+
import logging
23
from collections.abc import Callable
4+
from os import PathLike
5+
import pathlib
36
from typing import Generic
7+
from typing import IO
48
from typing import TypedDict
59
from typing import TypeVar
10+
import warnings
611

712
import databento_dbn
813
import pandas as pd
914

15+
from databento.common.error import BentoWarning
16+
17+
logger = logging.getLogger(__name__)
1018

1119
DBNRecord = (
1220
databento_dbn.BBOMsg
@@ -32,10 +40,6 @@
3240
| databento_dbn.ErrorMsgV1
3341
)
3442

35-
RecordCallback = Callable[[DBNRecord], None]
36-
ExceptionCallback = Callable[[Exception], None]
37-
ReconnectCallback = Callable[[pd.Timestamp, pd.Timestamp], None]
38-
3943
_T = TypeVar("_T")
4044

4145

@@ -88,3 +92,196 @@ class MappingIntervalDict(TypedDict):
8892
start_date: dt.date
8993
end_date: dt.date
9094
symbol: str
95+
96+
97+
RecordCallback = Callable[[DBNRecord], None]
98+
ExceptionCallback = Callable[[Exception], None]
99+
ReconnectCallback = Callable[[pd.Timestamp, pd.Timestamp], None]
100+
101+
102+
class ClientStream:
103+
def __init__(
104+
self,
105+
stream: IO[bytes] | PathLike[str] | str,
106+
exc_fn: ExceptionCallback | None = None,
107+
max_warnings: int = 10,
108+
) -> None:
109+
is_managed = False
110+
111+
if isinstance(stream, (str, PathLike)):
112+
stream = pathlib.Path(stream).open("xb")
113+
is_managed = True
114+
115+
if not hasattr(stream, "write"):
116+
raise ValueError(f"{type(stream).__name__} does not support write()")
117+
118+
if not hasattr(stream, "writable") or not stream.writable():
119+
raise ValueError(f"{type(stream).__name__} is not a writable stream")
120+
121+
if exc_fn is not None and not callable(exc_fn):
122+
raise ValueError(f"{exc_fn} is not callable")
123+
124+
self._stream = stream
125+
self._exc_fn = exc_fn
126+
self._max_warnings = max(0, max_warnings)
127+
self._warning_count = 0
128+
self._is_managed = is_managed
129+
130+
@property
131+
def stream_name(self) -> str:
132+
return getattr(self._stream, "__name__", str(self._stream))
133+
134+
@property
135+
def is_closed(self) -> bool:
136+
"""
137+
Return `True` if the underlying stream is closed.
138+
139+
Returns
140+
-------
141+
bool
142+
143+
"""
144+
return self._stream.closed
145+
146+
@property
147+
def is_managed(self) -> bool:
148+
"""
149+
Return `True` if the underlying stream was opened by the
150+
`ClientStream`. This can be used to determine if the stream should be
151+
closed automatically.
152+
153+
Returns
154+
-------
155+
bool
156+
157+
"""
158+
return self._is_managed
159+
160+
@property
161+
def exc_callback_name(self) -> str:
162+
return getattr(self._exc_fn, "__name__", str(self._exc_fn))
163+
164+
def close(self) -> None:
165+
"""
166+
Close the underlying stream.
167+
"""
168+
self._stream.close()
169+
170+
def flush(self) -> None:
171+
"""
172+
Flush the underlying stream.
173+
"""
174+
self._stream.flush()
175+
176+
def write(self, data: bytes) -> None:
177+
"""
178+
Write data to the underlying stream. Any exceptions encountered will be
179+
dispatched to the exception callback, if defined.
180+
181+
Parameters
182+
----------
183+
data : bytes
184+
185+
"""
186+
try:
187+
self._stream.write(data)
188+
except Exception as exc:
189+
if self._exc_fn is None:
190+
self._warn(
191+
f"stream '{self.stream_name}' encountered an exception without an exception handler: {repr(exc)}",
192+
)
193+
else:
194+
try:
195+
self._exc_fn(exc)
196+
except Exception as inner_exc:
197+
self._warn(
198+
f"exception callback '{self.exc_callback_name}' encountered an exception: {repr(inner_exc)}",
199+
)
200+
raise inner_exc from exc
201+
raise exc
202+
203+
def _warn(self, msg: str) -> None:
204+
logger.warning(msg)
205+
if self._warning_count < self._max_warnings:
206+
self._warning_count += 1
207+
warnings.warn(
208+
msg,
209+
BentoWarning,
210+
stacklevel=3,
211+
)
212+
if self._warning_count == self._max_warnings:
213+
warnings.warn(
214+
f"suppressing further warnings for '{self.stream_name}'",
215+
BentoWarning,
216+
stacklevel=3,
217+
)
218+
219+
220+
class ClientRecordCallback:
221+
def __init__(
222+
self,
223+
fn: RecordCallback,
224+
exc_fn: ExceptionCallback | None = None,
225+
max_warnings: int = 10,
226+
) -> None:
227+
if not callable(fn):
228+
raise ValueError(f"{fn} is not callable")
229+
if exc_fn is not None and not callable(exc_fn):
230+
raise ValueError(f"{exc_fn} is not callable")
231+
232+
self._fn = fn
233+
self._exc_fn = exc_fn
234+
self._max_warnings = max(0, max_warnings)
235+
self._warning_count = 0
236+
237+
@property
238+
def callback_name(self) -> str:
239+
return getattr(self._fn, "__name__", str(self._fn))
240+
241+
@property
242+
def exc_callback_name(self) -> str:
243+
return getattr(self._exc_fn, "__name__", str(self._exc_fn))
244+
245+
def call(self, record: DBNRecord) -> None:
246+
"""
247+
Execute the callback function, passing `record` in as the first
248+
argument. Any exceptions encountered will be dispatched to the
249+
exception callback, if defined.
250+
251+
Parameters
252+
----------
253+
record : DBNRecord
254+
255+
"""
256+
try:
257+
self._fn(record)
258+
except Exception as exc:
259+
if self._exc_fn is None:
260+
self._warn(
261+
f"callback '{self.callback_name}' encountered an exception without an exception callback: {repr(exc)}",
262+
)
263+
else:
264+
try:
265+
self._exc_fn(exc)
266+
except Exception as inner_exc:
267+
self._warn(
268+
f"exception callback '{self.exc_callback_name}' encountered an exception: {repr(inner_exc)}",
269+
)
270+
raise inner_exc from exc
271+
raise exc
272+
273+
def _warn(self, msg: str) -> None:
274+
logger.warning(msg)
275+
if self._warning_count < self._max_warnings:
276+
self._warning_count += 1
277+
warnings.warn(
278+
msg,
279+
BentoWarning,
280+
stacklevel=3,
281+
)
282+
if self._warning_count == self._max_warnings:
283+
warnings.warn(
284+
f"suppressing further warnings for '{self.callback_name}'",
285+
BentoWarning,
286+
stacklevel=3,
287+
)

databento/live/client.py

Lines changed: 35 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
import asyncio
44
import logging
55
import os
6-
import pathlib
76
import queue
87
import threading
98
from collections.abc import Iterable
@@ -24,6 +23,8 @@
2423
from databento.common.error import BentoError
2524
from databento.common.parsing import optional_datetime_to_unix_nanoseconds
2625
from databento.common.publishers import Dataset
26+
from databento.common.types import ClientRecordCallback
27+
from databento.common.types import ClientStream
2728
from databento.common.types import DBNRecord
2829
from databento.common.types import ExceptionCallback
2930
from databento.common.types import ReconnectCallback
@@ -111,7 +112,7 @@ def __init__(
111112
reconnect_policy=reconnect_policy,
112113
)
113114

114-
self._session._user_callbacks.append((self._map_symbol, None))
115+
self._session._user_callbacks.append(ClientRecordCallback(self._map_symbol))
115116

116117
with Live._lock:
117118
if not Live._thread.is_alive():
@@ -214,6 +215,19 @@ def port(self) -> int:
214215
"""
215216
return self._port
216217

218+
@property
219+
def session_id(self) -> str | None:
220+
"""
221+
Return the session ID for the current session. If `None`, the client is
222+
not connected.
223+
224+
Returns
225+
-------
226+
str | None
227+
228+
"""
229+
return self._session.session_id
230+
217231
@property
218232
def symbology_map(self) -> dict[int, str | int]:
219233
"""
@@ -257,7 +271,9 @@ def add_callback(
257271
A callback to register for handling live records as they arrive.
258272
exception_callback : Callable[[Exception], None], optional
259273
An error handling callback to process exceptions that are raised
260-
in `record_callback`.
274+
in `record_callback`. If no exception callback is provided,
275+
any exceptions encountered will be logged and raised as warnings
276+
for visibility.
261277
262278
Raises
263279
------
@@ -270,15 +286,13 @@ def add_callback(
270286
Live.add_stream
271287
272288
"""
273-
if not callable(record_callback):
274-
raise ValueError(f"{record_callback} is not callable")
275-
276-
if exception_callback is not None and not callable(exception_callback):
277-
raise ValueError(f"{exception_callback} is not callable")
289+
client_callback = ClientRecordCallback(
290+
fn=record_callback,
291+
exc_fn=exception_callback,
292+
)
278293

279-
callback_name = getattr(record_callback, "__name__", str(record_callback))
280-
logger.info("adding user callback %s", callback_name)
281-
self._session._user_callbacks.append((record_callback, exception_callback))
294+
logger.info("adding user callback %s", client_callback.callback_name)
295+
self._session._user_callbacks.append(client_callback)
282296

283297
def add_stream(
284298
self,
@@ -294,7 +308,9 @@ def add_stream(
294308
The IO stream to write to when handling live records as they arrive.
295309
exception_callback : Callable[[Exception], None], optional
296310
An error handling callback to process exceptions that are raised
297-
when writing to the stream.
311+
when writing to the stream. If no exception callback is provided,
312+
any exceptions encountered will be logged and raised as warnings
313+
for visibility.
298314
299315
Raises
300316
------
@@ -309,23 +325,12 @@ def add_stream(
309325
Live.add_callback
310326
311327
"""
312-
if isinstance(stream, (str, PathLike)):
313-
stream = pathlib.Path(stream).open("xb")
314-
315-
if not hasattr(stream, "write"):
316-
raise ValueError(f"{type(stream).__name__} does not support write()")
317-
318-
if not hasattr(stream, "writable") or not stream.writable():
319-
raise ValueError(f"{type(stream).__name__} is not a writable stream")
328+
client_stream = ClientStream(stream=stream, exc_fn=exception_callback)
320329

321-
if exception_callback is not None and not callable(exception_callback):
322-
raise ValueError(f"{exception_callback} is not callable")
323-
324-
stream_name = getattr(stream, "name", str(stream))
325-
logger.info("adding user stream %s", stream_name)
330+
logger.info("adding user stream %s", client_stream.stream_name)
326331
if self.metadata is not None:
327-
stream.write(bytes(self.metadata))
328-
self._session._user_streams.append((stream, exception_callback))
332+
client_stream.write(self.metadata.encode())
333+
self._session._user_streams.append(client_stream)
329334

330335
def add_reconnect_callback(
331336
self,
@@ -365,7 +370,9 @@ def add_reconnect_callback(
365370

366371
callback_name = getattr(reconnect_callback, "__name__", str(reconnect_callback))
367372
logger.info("adding user reconnect callback %s", callback_name)
368-
self._session._user_reconnect_callbacks.append((reconnect_callback, exception_callback))
373+
self._session._user_reconnect_callbacks.append(
374+
(reconnect_callback, exception_callback),
375+
)
369376

370377
def start(
371378
self,

0 commit comments

Comments
 (0)