diff --git a/c-questdb-client b/c-questdb-client index 05d9ada6..5b177158 160000 --- a/c-questdb-client +++ b/c-questdb-client @@ -1 +1 @@ -Subproject commit 05d9ada6d3c9ad48e16d66a4ba1cf37ed4a80f72 +Subproject commit 5b177158a96a69144dccd168aadc55a4878f813d diff --git a/src/questdb/dataframe.md b/src/questdb/dataframe.md index 5d42d389..7b41e9b4 100644 --- a/src/questdb/dataframe.md +++ b/src/questdb/dataframe.md @@ -93,6 +93,7 @@ We need to extract: * 64-bit floats * UTF-8 string buffers * Nanosecond-precision UTC unix epoch 64-bit signed int timestamps +* Decimals ```python import pandas as pd @@ -100,6 +101,60 @@ import pyarrow as pa import datetime as dt ``` +### Decimals + +Decimals aren't natively supported by pandas or numpy; both store them as Python `decimal.Decimal` objects. + +#### Pandas + +Pandas stores decimals as Python objects in an 'object' dtype column. + +```python +>>> df = pd.DataFrame({ 'decimals': [Decimal('123.456')] }) +>>> df.dtypes +decimals object +dtype: object +``` + +#### Numpy + +Similarly, numpy stores decimals as Python objects. + +```python +>>> arr = numpy.array([Decimal('123.456')]) +>>> arr +array([Decimal('123.456')], dtype=object) +``` + +#### PyArrow + +PyArrow provides native decimal support with configurable precision and scale. +The data is stored in a fixed-width binary format. + +```python +import pyarrow as pa +from decimal import Decimal + +# Create decimal array: decimal128(precision, scale) +# precision = total digits, scale = digits after decimal point +decimal_array = pa.array( + [Decimal('123.456'), Decimal('789.012'), Decimal('-456.789'), None], + type=pa.decimal128(10, 3) # 10 total digits, 3 after decimal +) + +# Use in DataFrame with ArrowDtype +df = pd.DataFrame({ + 'prices': pd.array( + [Decimal('123.45'), Decimal('678.90'), None], + dtype=pd.ArrowDtype(pa.decimal128(10, 2)) + ) +}) +``` + +Notes: +- PyArrow provides four decimal types: `decimal32`, `decimal64`, `decimal128` and `decimal256` +- Nulls are supported via Arrow's validity bitmap + ### Booleans ```python diff --git a/src/questdb/dataframe.pxi b/src/questdb/dataframe.pxi index 1dffc643..c3fad59e 100644 --- a/src/questdb/dataframe.pxi +++ b/src/questdb/dataframe.pxi @@ -1,5 +1,10 @@ # See: dataframe.md for technical overview. +from decimal import Decimal + +from cpython.bytes cimport PyBytes_AsString, PyBytes_GET_SIZE +from .mpdecimal_compat cimport decimal_pyobj_to_binary + # Auto-flush settings. # The individual `interval`, `row_count` and `byte_count` # settings are set to `-1` when disabled.
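Reviewer note on the `decimal_pyobj_to_binary` helper cimported in the hunk above: the snippet below is a pure-Python sketch (not part of this change; the `decimal_to_ilp_parts` name is hypothetical) of the `(scale, big-endian two's-complement mantissa)` pair the Cython helper is expected to produce for finite values, with NaN/Infinity mapping to a null decimal as the implementation's comments describe.

```python
from decimal import Decimal

def decimal_to_ilp_parts(value: Decimal):
    # Illustrative only: mirrors the expected output of decimal_pyobj_to_binary
    # using Decimal.as_tuple() instead of reading the embedded mpd_t struct.
    sign, digits, exp = value.as_tuple()
    if not isinstance(exp, int):      # 'n', 'N' or 'F': NaN, sNaN or Infinity
        return 0, None                # serialized as a null decimal value
    unscaled = int(''.join(map(str, digits)))
    if sign:
        unscaled = -unscaled
    if exp >= 0:                      # ILP decimals only carry non-negative scales
        unscaled *= 10 ** exp
        scale = 0
    else:
        scale = -exp
    n_bytes = (unscaled.bit_length() + 8) // 8   # headroom for the sign bit
    return scale, unscaled.to_bytes(n_bytes, byteorder='big', signed=True)

assert decimal_to_ilp_parts(Decimal('123.45')) == (2, b'\x30\x39')   # 12345 * 10**-2
```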
@@ -51,6 +56,21 @@ cdef bint should_auto_flush( return False +cdef inline uint32_t bswap32(uint32_t value): + return (((value & 0xFF000000u) >> 24u) | + ((value & 0x00FF0000u) >> 8u) | + ((value & 0x0000FF00u) << 8u) | + ((value & 0x000000FFu) << 24u)) + +cdef inline uint64_t bswap64(uint64_t value): + return (((value & 0xFF00000000000000u) >> 56u) | + ((value & 0x00FF000000000000u) >> 40u) | + ((value & 0x0000FF0000000000u) >> 24u) | + ((value & 0x000000FF00000000u) >> 8u) | + ((value & 0x00000000FF000000u) << 8u) | + ((value & 0x0000000000FF0000u) << 24u) | + ((value & 0x000000000000FF00u) << 40u) | + ((value & 0x00000000000000FFu) << 56u)) cdef struct col_chunks_t: size_t n_chunks @@ -73,7 +93,8 @@ cdef enum col_target_t: col_target_column_str = 6 col_target_column_ts = 7 col_target_column_arr_f64 = 8 - col_target_at = 9 + col_target_column_decimal = 9 + col_target_at = 10 cdef dict _TARGET_NAMES = { @@ -86,6 +107,7 @@ cdef dict _TARGET_NAMES = { col_target_t.col_target_column_str: "string", col_target_t.col_target_column_ts: "timestamp", col_target_t.col_target_column_arr_f64: "array", + col_target_t.col_target_column_decimal: "decimal", col_target_t.col_target_at: "designated timestamp", } @@ -127,6 +149,11 @@ cdef enum col_source_t: col_source_dt64ns_numpy = 501000 col_source_dt64ns_tz_arrow = 502000 col_source_arr_f64_numpyobj = 601100 + col_source_decimal_pyobj = 701000 + col_source_decimal32_arrow = 702100 + col_source_decimal64_arrow = 703100 + col_source_decimal128_arrow = 704100 + col_source_decimal256_arrow = 705100 cdef bint col_source_needs_gil(col_source_t source) noexcept nogil: @@ -149,6 +176,7 @@ cdef dict _PYOBJ_SOURCE_DESCR = { col_source_t.col_source_int_pyobj: "int", col_source_t.col_source_float_pyobj: "float", col_source_t.col_source_str_pyobj: "str", + col_source_t.col_source_decimal_pyobj: "Decimal", } @@ -218,6 +246,13 @@ cdef dict _TARGET_TO_SOURCES = { col_target_t.col_target_column_arr_f64: { col_source_t.col_source_arr_f64_numpyobj, }, + col_target_t.col_target_column_decimal: { + col_source_t.col_source_decimal_pyobj, + col_source_t.col_source_decimal32_arrow, + col_source_t.col_source_decimal64_arrow, + col_source_t.col_source_decimal128_arrow, + col_source_t.col_source_decimal256_arrow, + }, col_target_t.col_target_at: { col_source_t.col_source_dt64ns_numpy, col_source_t.col_source_dt64ns_tz_arrow, @@ -233,7 +268,8 @@ cdef tuple _FIELD_TARGETS = ( col_target_t.col_target_column_f64, col_target_t.col_target_column_str, col_target_t.col_target_column_ts, - col_target_t.col_target_column_arr_f64) + col_target_t.col_target_column_arr_f64, + col_target_t.col_target_column_decimal) # Targets that map directly from a meta target. 
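The `bswap32`/`bswap64` helpers introduced above are used by the Arrow decimal serializers later in this file to turn Arrow's native little-endian decimal limbs into the big-endian mantissa bytes passed to `line_sender_buffer_column_dec`. A small self-contained check of that limb reshuffling (illustrative only; it assumes a little-endian host, as the serializer code does):

```python
import struct

# An arbitrary unscaled decimal128 value (i.e. the mantissa as a 128-bit integer).
value = -98765432101234567890
le_bytes = value.to_bytes(16, byteorder='little', signed=True)  # Arrow buffer layout
lo, hi = struct.unpack('<2Q', le_bytes)                         # two little-endian 64-bit limbs

# Byte-swap each limb and emit the high limb first, as the decimal128 serializer does
# with bswap64(cell[1]) followed by bswap64(cell[0]).
be_bytes = struct.pack('>2Q', hi, lo)
assert be_bytes == value.to_bytes(16, byteorder='big', signed=True)
```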
@@ -358,6 +394,17 @@ cdef enum col_dispatch_code_t: col_dispatch_code_column_arr_f64__arr_f64_numpyobj = \ col_target_t.col_target_column_arr_f64 + col_source_t.col_source_arr_f64_numpyobj + col_dispatch_code_column_decimal__decimal_pyobj = \ + col_target_t.col_target_column_decimal + col_source_t.col_source_decimal_pyobj + col_dispatch_code_column_decimal__decimal32_arrow = \ + col_target_t.col_target_column_decimal + col_source_t.col_source_decimal32_arrow + col_dispatch_code_column_decimal__decimal64_arrow = \ + col_target_t.col_target_column_decimal + col_source_t.col_source_decimal64_arrow + col_dispatch_code_column_decimal__decimal128_arrow = \ + col_target_t.col_target_column_decimal + col_source_t.col_source_decimal128_arrow + col_dispatch_code_column_decimal__decimal256_arrow = \ + col_target_t.col_target_column_decimal + col_source_t.col_source_decimal256_arrow + # Int values in order for sorting (as needed for API's sequential coupling). cdef enum meta_target_t: @@ -382,6 +429,7 @@ cdef struct col_t: line_sender_column_name name col_cursor_t cursor col_setup_t* setup # Grouping to reduce size of struct. + uint8_t scale # For arrow decimal types only, else 0. cdef void col_t_release(col_t* col) noexcept: @@ -905,6 +953,30 @@ cdef void_int _dataframe_category_series_as_arrow( 'Expected a category of strings, ' + f'got a category of {pandas_col.series.dtype.categories.dtype}.') +cdef void_int _dataframe_series_resolve_arrow(PandasCol pandas_col, object arrowtype, col_t *col) except -1: + _dataframe_series_as_arrow(pandas_col, col) + if arrowtype.id == _PYARROW.lib.Type_DECIMAL32: + col.setup.source = col_source_t.col_source_decimal32_arrow + elif arrowtype.id == _PYARROW.lib.Type_DECIMAL64: + col.setup.source = col_source_t.col_source_decimal64_arrow + elif arrowtype.id == _PYARROW.lib.Type_DECIMAL128: + col.setup.source = col_source_t.col_source_decimal128_arrow + elif arrowtype.id == _PYARROW.lib.Type_DECIMAL256: + col.setup.source = col_source_t.col_source_decimal256_arrow + else: + raise IngressError( + IngressErrorCode.BadDataFrame, + f'Unsupported arrow type {arrowtype} for column {pandas_col.name!r}. ' + + 'Raise an issue if you think it should be supported: ' + + 'https://github.com/questdb/py-questdb-client/issues.') + if arrowtype.scale < 0 or arrowtype.scale > 76: + raise IngressError( + IngressErrorCode.BadDataFrame, + f'Bad column {pandas_col.name!r}: ' + + f'Unsupported decimal scale {arrowtype.scale}: ' + + 'Must be in the range 0 to 76 inclusive.') + col.scale = arrowtype.scale + return 0 cdef inline bint _dataframe_is_float_nan(PyObject* obj) noexcept: return PyFloat_CheckExact(obj) and isnan(PyFloat_AS_DOUBLE(obj)) @@ -971,6 +1043,8 @@ cdef void_int _dataframe_series_sniff_pyobj( 'Unsupported object column containing bytes.' + 'If this is a string column, decode it first. 
' + 'See: https://stackoverflow.com/questions/40389764/') + elif isinstance(obj, Decimal): + col.setup.source = col_source_t.col_source_decimal_pyobj else: raise IngressError( IngressErrorCode.BadDataFrame, @@ -1086,6 +1160,8 @@ cdef void_int _dataframe_resolve_source_and_buffers( _dataframe_series_as_arrow(pandas_col, col) elif isinstance(dtype, _NUMPY_OBJECT): _dataframe_series_sniff_pyobj(pandas_col, col) + elif isinstance(dtype, _PANDAS.ArrowDtype): + _dataframe_series_resolve_arrow(pandas_col, dtype.pyarrow_dtype, col) else: raise IngressError( IngressErrorCode.BadDataFrame, @@ -1093,7 +1169,6 @@ cdef void_int _dataframe_resolve_source_and_buffers( 'Raise an issue if you think it should be supported: ' + 'https://github.com/questdb/py-questdb-client/issues.') - cdef void_int _dataframe_resolve_target( PandasCol pandas_col, col_t* col) except -1: cdef col_target_t target @@ -1225,7 +1300,8 @@ cdef void_int _dataframe_resolve_args( cdef inline bint _dataframe_arrow_get_bool(col_cursor_t* cursor) noexcept nogil: return ( (cursor.chunk.buffers[1])[cursor.offset // 8] & - (1 << (cursor.offset % 8))) + (1 << (cursor.offset % 8)) + ) cdef inline bint _dataframe_arrow_is_valid(col_cursor_t* cursor) noexcept nogil: @@ -1234,7 +1310,9 @@ cdef inline bint _dataframe_arrow_is_valid(col_cursor_t* cursor) noexcept nogil: cursor.chunk.null_count == 0 or ( (cursor.chunk.buffers[0])[cursor.offset // 8] & - (1 << (cursor.offset % 8)))) + (1 << (cursor.offset % 8)) + ) + ) cdef inline void _dataframe_arrow_get_cat_value( @@ -2089,6 +2167,134 @@ cdef void_int _dataframe_serialize_cell_column_arr_f64__arr_f64_numpyobj( &err): raise c_err_to_py(err) + +cdef void_int _dataframe_serialize_cell_column_decimal__decimal_pyobj( + line_sender_buffer* ls_buf, + qdb_pystr_buf* b, + col_t* col) except -1: + cdef line_sender_error* err = NULL + cdef PyObject** access = col.cursor.chunk.buffers[1] + cdef PyObject* cell = access[col.cursor.offset] + cdef unsigned int scale = 0 + cdef object mantissa + cdef const uint8_t* mantissa_ptr + cdef Py_ssize_t mantissa_len + + if _dataframe_is_null_pyobj(cell): + return 0 + + # Convert the Python Decimal into (scale, mantissa) bytes; returns None for special values. 
+ mantissa = decimal_pyobj_to_binary( + cell, + &scale, + IngressError, + IngressErrorCode.BadDataFrame) + if mantissa is None: + if not line_sender_buffer_column_dec(ls_buf, col.name, 0, NULL, 0, &err): + raise c_err_to_py(err) + return 0 + + if len(mantissa) > 127: + raise IngressError( + IngressErrorCode.BadDataFrame, + 'Decimal mantissa too large; maximum supported size is 127 bytes.') + + mantissa_ptr = PyBytes_AsString(mantissa) + if mantissa_ptr is NULL: + raise MemoryError() + mantissa_len = PyBytes_GET_SIZE(mantissa) + + if not line_sender_buffer_column_dec(ls_buf, col.name, scale, mantissa_ptr, mantissa_len, &err): + raise c_err_to_py(err) + + return 0 + + +cdef void_int _dataframe_serialize_cell_column_decimal__decimal32_arrow( + line_sender_buffer* ls_buf, + qdb_pystr_buf* b, + col_t* col, + PyThreadState** gs) except -1: + cdef line_sender_error* err = NULL + cdef bint valid = _dataframe_arrow_is_valid(&col.cursor) + cdef uint32_t value + if not valid: + if not line_sender_buffer_column_dec(ls_buf, col.name, 0, NULL, 0, &err): + _ensure_has_gil(gs) + raise c_err_to_py(err) + else: + value = bswap32((col.cursor.chunk.buffers[1])[col.cursor.offset]) + if not line_sender_buffer_column_dec(ls_buf, col.name, col.scale, &value, sizeof(value), &err): + _ensure_has_gil(gs) + raise c_err_to_py(err) + return 0 + +cdef void_int _dataframe_serialize_cell_column_decimal__decimal64_arrow( + line_sender_buffer* ls_buf, + qdb_pystr_buf* b, + col_t* col, + PyThreadState** gs) except -1: + cdef line_sender_error* err = NULL + cdef bint valid = _dataframe_arrow_is_valid(&col.cursor) + cdef uint64_t value + if not valid: + if not line_sender_buffer_column_dec(ls_buf, col.name, 0, NULL, 0, &err): + _ensure_has_gil(gs) + raise c_err_to_py(err) + else: + value = bswap64((col.cursor.chunk.buffers[1])[col.cursor.offset]) + if not line_sender_buffer_column_dec(ls_buf, col.name, col.scale, &value, sizeof(value), &err): + _ensure_has_gil(gs) + raise c_err_to_py(err) + return 0 + +cdef void_int _dataframe_serialize_cell_column_decimal__decimal128_arrow( + line_sender_buffer* ls_buf, + qdb_pystr_buf* b, + col_t* col, + PyThreadState** gs) except -1: + cdef line_sender_error* err = NULL + cdef bint valid = _dataframe_arrow_is_valid(&col.cursor) + cdef uint64_t *cell + cdef uint64_t[2] value + if not valid: + if not line_sender_buffer_column_dec(ls_buf, col.name, 0, NULL, 0, &err): + _ensure_has_gil(gs) + raise c_err_to_py(err) + else: + cell = &(col.cursor.chunk.buffers[1])[col.cursor.offset << 1] + value[0] = bswap64(cell[1]) + value[1] = bswap64(cell[0]) + if not line_sender_buffer_column_dec(ls_buf, col.name, col.scale, value, 16, &err): + _ensure_has_gil(gs) + raise c_err_to_py(err) + return 0 + +cdef void_int _dataframe_serialize_cell_column_decimal__decimal256_arrow( + line_sender_buffer* ls_buf, + qdb_pystr_buf* b, + col_t* col, + PyThreadState** gs) except -1: + cdef line_sender_error* err = NULL + cdef bint valid = _dataframe_arrow_is_valid(&col.cursor) + cdef uint64_t *cell + cdef uint64_t[4] value + if not valid: + if not line_sender_buffer_column_dec(ls_buf, col.name, 0, NULL, 0, &err): + _ensure_has_gil(gs) + raise c_err_to_py(err) + else: + cell = &(col.cursor.chunk.buffers[1])[col.cursor.offset << 2] + value[0] = bswap64(cell[3]) + value[1] = bswap64(cell[2]) + value[2] = bswap64(cell[1]) + value[3] = bswap64(cell[0]) + if not line_sender_buffer_column_dec(ls_buf, col.name, col.scale, value, 32, &err): + _ensure_has_gil(gs) + raise c_err_to_py(err) + return 0 + + cdef void_int 
_dataframe_serialize_cell_column_ts__dt64ns_tz_arrow( line_sender_buffer* ls_buf, qdb_pystr_buf* b, @@ -2247,6 +2453,16 @@ cdef void_int _dataframe_serialize_cell( _dataframe_serialize_cell_column_ts__dt64ns_numpy(ls_buf, b, col, gs) elif dc == col_dispatch_code_t.col_dispatch_code_column_arr_f64__arr_f64_numpyobj: _dataframe_serialize_cell_column_arr_f64__arr_f64_numpyobj(ls_buf, b, col) + elif dc == col_dispatch_code_t.col_dispatch_code_column_decimal__decimal_pyobj: + _dataframe_serialize_cell_column_decimal__decimal_pyobj(ls_buf, b, col) + elif dc == col_dispatch_code_t.col_dispatch_code_column_decimal__decimal32_arrow: + _dataframe_serialize_cell_column_decimal__decimal32_arrow(ls_buf, b, col, gs) + elif dc == col_dispatch_code_t.col_dispatch_code_column_decimal__decimal64_arrow: + _dataframe_serialize_cell_column_decimal__decimal64_arrow(ls_buf, b, col, gs) + elif dc == col_dispatch_code_t.col_dispatch_code_column_decimal__decimal128_arrow: + _dataframe_serialize_cell_column_decimal__decimal128_arrow(ls_buf, b, col, gs) + elif dc == col_dispatch_code_t.col_dispatch_code_column_decimal__decimal256_arrow: + _dataframe_serialize_cell_column_decimal__decimal256_arrow(ls_buf, b, col, gs) elif dc == col_dispatch_code_t.col_dispatch_code_column_ts__dt64ns_tz_arrow: _dataframe_serialize_cell_column_ts__dt64ns_tz_arrow(ls_buf, b, col, gs) elif dc == col_dispatch_code_t.col_dispatch_code_at__dt64ns_numpy: diff --git a/src/questdb/ingress.pyi b/src/questdb/ingress.pyi index 855e5f36..7989a79a 100644 --- a/src/questdb/ingress.pyi +++ b/src/questdb/ingress.pyi @@ -40,6 +40,7 @@ from typing import Any, Dict, List, Optional, Union import numpy as np import pandas as pd +from decimal import Decimal class IngressErrorCode(Enum): """Category of Error.""" @@ -57,6 +58,7 @@ class IngressErrorCode(Enum): ConfigError = ... ArrayError = ... ProtocolVersionError = ... + DecimalError = ... BadDataFrame = ... @@ -202,7 +204,7 @@ class SenderTransaction: *, symbols: Optional[Dict[str, Optional[str]]] = None, columns: Optional[ - Dict[str, Union[None, bool, int, float, str, TimestampMicros, datetime, np.ndarray]] + Dict[str, Union[None, bool, int, float, str, TimestampMicros, datetime, np.ndarray, Decimal]] ] = None, at: Union[ServerTimestampType, TimestampNanos, datetime], ) -> SenderTransaction: @@ -381,7 +383,7 @@ class Buffer: *, symbols: Optional[Dict[str, Optional[str]]] = None, columns: Optional[ - Dict[str, Union[None, bool, int, float, str, TimestampMicros, datetime, np.ndarray]] + Dict[str, Union[None, bool, int, float, str, TimestampMicros, datetime, np.ndarray, Decimal]] ] = None, at: Union[ServerTimestampType, TimestampNanos, datetime], ) -> Buffer: @@ -402,7 +404,8 @@ class Buffer: 'col5': TimestampMicros(123456789), 'col6': datetime(2019, 1, 1, 12, 0, 0), 'col7': np.array([[1.0, 2.0, 3.0], [4.0, 5.0, 6.0]]), - 'col8': None}, + 'col8': None, + 'col9': Decimal('123.456')}, at=TimestampNanos(123456789)) # Only symbols specified. Designated timestamp assigned by the db. @@ -449,6 +452,8 @@ class Buffer: - `ARRAY `_ * - ``datetime.datetime`` and ``TimestampMicros`` - `TIMESTAMP `_ + * - ``Decimal`` + - `DECIMAL `_ * - ``None`` - *Column is skipped and not serialized.* @@ -701,6 +706,9 @@ class Buffer: * - ``'datetime64[ns, tz]'`` - Y - ``TIMESTAMP`` **ζ** + * - ``'object'`` (``Decimal`` objects) + - Y (``NaN``) + - ``DECIMAL`` .. 
note:: diff --git a/src/questdb/ingress.pyx b/src/questdb/ingress.pyx index d9139de8..0b45f627 100644 --- a/src/questdb/ingress.pyx +++ b/src/questdb/ingress.pyx @@ -142,7 +142,8 @@ class IngressErrorCode(Enum): ConfigError = line_sender_error_config_error ArrayError = line_sender_error_array_error ProtocolVersionError = line_sender_error_protocol_version_error - BadDataFrame = line_sender_error_protocol_version_error + 1 + DecimalError = line_sender_error_invalid_decimal + BadDataFrame = line_sender_error_invalid_decimal + 1 def __str__(self) -> str: """Return the name of the enum.""" @@ -188,6 +189,8 @@ cdef inline object c_err_code_to_py(line_sender_error_code code): return IngressErrorCode.ArrayError elif code == line_sender_error_protocol_version_error: return IngressErrorCode.ProtocolVersionError + elif code == line_sender_error_invalid_decimal: + return IngressErrorCode.DecimalError else: raise ValueError('Internal error converting error code.') @@ -824,10 +827,10 @@ cdef class Buffer: :param int init_buf_size: Initial capacity of the buffer in bytes. :param int max_name_len: Maximum length of a table or column name. """ - if protocol_version not in (1, 2): + if protocol_version not in range(1, 4): raise IngressError( IngressErrorCode.ProtocolVersionError, - 'Invalid protocol version. Supported versions are 1 and 2.') + 'Invalid protocol version. Supported versions are 1-3.') self._cinit_impl(protocol_version, init_buf_size, max_name_len) cdef inline _cinit_impl(self, line_sender_protocol_version version, size_t init_buf_size, size_t max_name_len): @@ -1459,6 +1462,9 @@ cdef class Buffer: * - ``'datetime64[ns, tz]'`` - Y - ``TIMESTAMP`` **ζ** + * - ``'object'`` (``Decimal`` objects) + - Y (``NaN``) + - ``DECIMAL`` .. note:: @@ -1953,10 +1959,14 @@ cdef class Sender: if not line_sender_opts_protocol_version( self._opts, line_sender_protocol_version_2, &err): raise c_err_to_py(err) + elif (protocol_version == 3) or (protocol_version == '3'): + if not line_sender_opts_protocol_version( + self._opts, line_sender_protocol_version_3, &err): + raise c_err_to_py(err) else: raise IngressError( IngressErrorCode.ConfigError, - '"protocol_version" must be None, "auto", 1 or 2' + + '"protocol_version" must be None, "auto", 1-3' + f' not {protocol_version!r}') if auth_timeout is not None: diff --git a/src/questdb/line_sender.pxd b/src/questdb/line_sender.pxd index 2b00404e..059c5687 100644 --- a/src/questdb/line_sender.pxd +++ b/src/questdb/line_sender.pxd @@ -40,8 +40,9 @@ cdef extern from "questdb/ingress/line_sender.h": line_sender_error_http_not_supported, line_sender_error_server_flush_error, line_sender_error_config_error, - line_sender_error_array_error - line_sender_error_protocol_version_error + line_sender_error_array_error, + line_sender_error_protocol_version_error, + line_sender_error_invalid_decimal cdef enum line_sender_protocol: line_sender_protocol_tcp, @@ -52,6 +53,7 @@ cdef extern from "questdb/ingress/line_sender.h": cdef enum line_sender_protocol_version: line_sender_protocol_version_1 = 1, line_sender_protocol_version_2 = 2, + line_sender_protocol_version_3 = 3, cdef enum line_sender_ca: line_sender_ca_webpki_roots, @@ -263,6 +265,22 @@ cdef extern from "questdb/ingress/line_sender.h": line_sender_error** err_out ) noexcept nogil + bint line_sender_buffer_column_dec_str( + line_sender_buffer* buffer, + line_sender_column_name name, + line_sender_utf8 value, + line_sender_error** err_out + ) noexcept nogil + + bint line_sender_buffer_column_dec( + line_sender_buffer* buffer, 
+ line_sender_column_name name, + const unsigned int scale, + const uint8_t* data, + size_t data_len, + line_sender_error** err_out + ) noexcept nogil + bint line_sender_buffer_at_nanos( line_sender_buffer* buffer, int64_t epoch_nanos, diff --git a/src/questdb/mpdecimal_compat.h b/src/questdb/mpdecimal_compat.h new file mode 100644 index 00000000..9abdaafb --- /dev/null +++ b/src/questdb/mpdecimal_compat.h @@ -0,0 +1,56 @@ +#ifndef MPDECIMAL_COMPAT_H +#define MPDECIMAL_COMPAT_H + +#include <Python.h> +#include <stddef.h> +#include <stdint.h> + +/* Determine the limb type used by CPython's libmpdec build. */ +#if SIZE_MAX == UINT64_MAX +typedef uint64_t mpd_uint_t; +typedef int64_t mpd_ssize_t; +#define MPD_RADIX_CONST UINT64_C(10000000000000000000) /* 10**19 */ +#elif SIZE_MAX == UINT32_MAX +typedef uint32_t mpd_uint_t; +typedef int32_t mpd_ssize_t; +#define MPD_RADIX_CONST UINT32_C(1000000000) /* 10**9 */ +#else +#error "Unsupported platform: mpdecimal compatibility requires 32-bit or 64-bit size_t." +#endif + +typedef struct { + uint8_t flags; + mpd_ssize_t exp; + mpd_ssize_t digits; + mpd_ssize_t len; + mpd_ssize_t alloc; + mpd_uint_t* data; +} mpd_t; + +typedef struct { + PyObject_HEAD + Py_hash_t hash; + mpd_t dec; + mpd_uint_t data[4]; +} PyDecObject; + +static inline mpd_t* decimal_mpd(PyObject* obj) { + return &((PyDecObject*)obj)->dec; +} + +static inline mpd_uint_t* decimal_digits(PyObject* obj) { + PyDecObject* dec = (PyDecObject*)obj; + return dec->dec.data != NULL ? dec->dec.data : dec->data; +} + +enum { + MPD_FLAG_SIGN = 0x01, + MPD_FLAG_INF = 0x02, + MPD_FLAG_NAN = 0x04, + MPD_FLAG_SNAN = 0x08, + MPD_FLAG_SPECIAL_MASK = MPD_FLAG_INF | MPD_FLAG_NAN | MPD_FLAG_SNAN +}; + +static const mpd_uint_t MPD_RADIX = MPD_RADIX_CONST; + +#endif /* MPDECIMAL_COMPAT_H */ diff --git a/src/questdb/mpdecimal_compat.pxd b/src/questdb/mpdecimal_compat.pxd new file mode 100644 index 00000000..8a8b412e --- /dev/null +++ b/src/questdb/mpdecimal_compat.pxd @@ -0,0 +1,71 @@ +from libc.stdint cimport uint8_t +from libc.stddef cimport size_t +from cpython.object cimport PyObject + +# Mirror the subset of libmpdec types that CPython embeds in Decimal objects. +ctypedef size_t mpd_uint_t +ctypedef Py_ssize_t mpd_ssize_t + +cdef extern from "mpdecimal_compat.h": + ctypedef struct mpd_t: + uint8_t flags + mpd_ssize_t exp + mpd_ssize_t digits + mpd_ssize_t len + mpd_ssize_t alloc + mpd_uint_t* data + + mpd_t* decimal_mpd(PyObject* obj) + mpd_uint_t* decimal_digits(PyObject* obj) + const mpd_uint_t MPD_RADIX + const uint8_t MPD_FLAG_SIGN + const uint8_t MPD_FLAG_SPECIAL_MASK + +cdef inline object decimal_pyobj_to_binary( + PyObject* cell, + unsigned int* encoded_scale, + object ingress_error_cls, + object bad_dataframe_code) except *: + """Convert a Python ``Decimal`` to ILP binary components.""" + cdef mpd_t* mpd + cdef mpd_uint_t* digits_ptr + cdef unsigned long long flag_low + cdef Py_ssize_t idx + cdef Py_ssize_t scale_value + cdef object unscaled_obj + + mpd = decimal_mpd(cell) + + flag_low = mpd.flags & 0xFF + if (flag_low & MPD_FLAG_SPECIAL_MASK) != 0: + # NaN/Inf values propagate as ILP nulls (caller will emit empty payload). + encoded_scale[0] = 0 + return None + + digits_ptr = decimal_digits(cell) + + if mpd.len <= 0: + unscaled_obj = 0 + else: + unscaled_obj = digits_ptr[mpd.len - 1] + for idx in range(mpd.len - 2, -1, -1): + # Limbs are base MPD_RADIX (10^9 or 10^19) and stored least-significant first.
+ unscaled_obj = unscaled_obj * MPD_RADIX + digits_ptr[idx] + + if mpd.exp >= 0: + # Decimal ILP does not support negative scales; adjust the unscaled value instead. + if mpd.exp != 0: + unscaled_obj = unscaled_obj * (10 ** mpd.exp) + scale_value = 0 + else: + scale_value = -mpd.exp + if scale_value > 76: + raise ingress_error_cls( + bad_dataframe_code, + f'Decimal scale {scale_value} exceeds the maximum supported scale of 76') + + if (flag_low & MPD_FLAG_SIGN) != 0: + unscaled_obj = -unscaled_obj + + encoded_scale[0] = scale_value + return unscaled_obj.to_bytes((unscaled_obj.bit_length() + 8) // 8, byteorder='big', signed=True) diff --git a/test/test.py b/test/test.py index 1ef5c05a..9b8d7262 100755 --- a/test/test.py +++ b/test/test.py @@ -45,6 +45,7 @@ if pd is not None: from test_dataframe import TestPandasProtocolVersionV1 from test_dataframe import TestPandasProtocolVersionV2 + from test_dataframe import TestPandasProtocolVersionV3 else: class TestNoPandas(unittest.TestCase): def test_no_pandas(self): @@ -416,8 +417,8 @@ def test_bad_protocol_versions(self): '0', 'automatic', 0, - 3, - '3', + 4, + '4', 1.5, '1.5', '2.0', @@ -426,8 +427,8 @@ def test_bad_protocol_versions(self): for version in bad_versions: with self.assertRaisesRegex( qi.IngressError, - '"protocol_version" must be None, "auto", 1 or 2'): - self.builder('tcp', '127.0.0.1', 12345, protocol_version='3') + '"protocol_version" must be None, "auto", 1-3'): + self.builder('tcp', '127.0.0.1', 12345, protocol_version=version) self.fail('Should not have reached here - constructing sender') bad_versions.append(None) @@ -1474,7 +1475,7 @@ class TestBufferProtocolVersionV1(TestBases.TestBuffer): class TestBufferProtocolVersionV2(TestBases.TestBuffer): - name = 'protocol version 1' + name = 'protocol version 2' version = 2 diff --git a/test/test_dataframe.py b/test/test_dataframe.py index 66bbd71d..2d85cd43 100644 --- a/test/test_dataframe.py +++ b/test/test_dataframe.py @@ -8,6 +8,7 @@ import functools import tempfile import pathlib +from decimal import Decimal from test_tools import _float_binary_bytes, _array_binary_bytes, TimestampEncodingMixin BROKEN_TIMEZONES = True @@ -80,6 +81,44 @@ def _dataframe(protocol_version: int, *args, **kwargs): pd.Timestamp('20180312')]} ) +DECIMAL_BINARY_FORMAT_TYPE = 23 + + +def _decode_decimal_payload(line: bytes, prefix: bytes = b'tbl dec=') -> tuple[int, bytes]: + """Extract (scale, mantissa-bytes) from a serialized decimal line.""" + if not line.startswith(prefix): + raise AssertionError(f'Unexpected decimal prefix in line: {line!r}') + payload = line[len(prefix):] + if len(payload) < 4: + raise AssertionError(f'Invalid decimal payload length: {len(payload)}') + if payload[0] != ord('='): + raise AssertionError(f'Unexpected decimal type marker: {payload[0]}') + if payload[1] != DECIMAL_BINARY_FORMAT_TYPE: + raise AssertionError(f'Unexpected decimal format type: {payload[1]}') + scale = payload[2] + byte_width = payload[3] + mantissa = payload[4:] + if len(mantissa) != byte_width: + raise AssertionError( + f'Expected {byte_width} mantissa bytes, got {len(mantissa)}') + return scale, mantissa + + +def _decimal_from_unscaled(unscaled, scale: int): + if unscaled is None: + return None + return Decimal(unscaled).scaleb(-scale) + + +def _decimal_binary_payload(unscaled, scale: int, byte_width: int) -> bytes: + if unscaled is None: + return b'=' + bytes([DECIMAL_BINARY_FORMAT_TYPE, 0, 0]) + return ( + b'=' + + bytes([DECIMAL_BINARY_FORMAT_TYPE, scale, byte_width]) +
int(unscaled).to_bytes(byte_width, byteorder='big', signed=True) + ) + def with_tmp_dir(func): @functools.wraps(func) @@ -528,6 +567,83 @@ def test_f64_numpy_col(self): b'tbl1 a' + _float_binary_bytes(float('NAN'), self.version == 1) + b'\n' + b'tbl1 a' + _float_binary_bytes(1.7976931348623157e308, self.version == 1) + b'\n') + def test_decimal_pyobj_column(self): + df = pd.DataFrame({'dec': [Decimal('123.45'), Decimal('-0.5')]}) + if self.version < 3: + with self.assertRaisesRegex( + qi.IngressError, + 'does not support the decimal datatype'): + _dataframe(self.version, df, table_name='tbl', at=qi.ServerTimestamp) + return + buf = _dataframe(self.version, df, table_name='tbl', at=qi.ServerTimestamp) + decoded = [_decode_decimal_payload(line) for line in buf.splitlines()] + expected = [Decimal('123.45'), Decimal('-0.5')] + self.assertEqual(len(decoded), len(expected)) + for (scale, mantissa), expected_value in zip(decoded, expected): + unscaled = int.from_bytes(mantissa, byteorder='big', signed=True) + self.assertEqual(Decimal(unscaled).scaleb(-scale), expected_value) + + def test_decimal_pyobj_trailing_zeros_and_integer(self): + if self.version < 3: + self.skipTest('decimal datatype requires ILP version 3 or later') + df = pd.DataFrame({'dec': [Decimal('1.2300'), Decimal('1000')]}) + buf = _dataframe(self.version, df, table_name='tbl', at=qi.ServerTimestamp) + decoded = [_decode_decimal_payload(line) for line in buf.splitlines()] + expected = [Decimal('1.23'), Decimal('1000')] + self.assertEqual(len(decoded), len(expected)) + for (scale, mantissa), expected_value in zip(decoded, expected): + unscaled = int.from_bytes(mantissa, byteorder='big', signed=True) + self.assertEqual(Decimal(unscaled).scaleb(-scale), expected_value) + + def test_decimal_pyobj_special_values(self): + if self.version < 3: + self.skipTest('decimal datatype requires ILP version 3 or later') + df = pd.DataFrame({'dec': [Decimal('NaN'), Decimal('Infinity'), Decimal('-Infinity')]}) + buf = _dataframe(self.version, df, table_name='tbl', at=qi.ServerTimestamp) + decoded = [_decode_decimal_payload(line) for line in buf.splitlines()] + self.assertEqual(len(decoded), 3) + for scale, mantissa in decoded: + self.assertEqual(scale, 0) + self.assertEqual(len(mantissa), 0) + + def test_decimal_arrow_columns(self): + if self.version < 3: + arr = pd.array( + [Decimal('1.23')], + dtype=pd.ArrowDtype(pa.decimal128(10, 2))) + df = pd.DataFrame({'dec': arr, 'count': [0]}) + with self.assertRaisesRegex( + qi.IngressError, + 'does not support the decimal datatype'): + _dataframe(self.version, df, table_name='tbl', at=qi.ServerTimestamp) + return + + arrow_cases = [ + (pa.decimal32(7, 2), [12345, -6789]), + (pa.decimal64(14, 4), [123456789, -987654321]), + (pa.decimal128(38, 6), [123456789012345, -987654321012345, None]), + (pa.decimal256(76, 10), [1234567890123456789012345, -987654321098765432109876, None]), + ] + + for arrow_type, unscaled_values in arrow_cases: + values = [_decimal_from_unscaled(unscaled, arrow_type.scale) for unscaled in unscaled_values] + arr = pd.array(values, dtype=pd.ArrowDtype(arrow_type)) + counts = list(range(len(values))) + df = pd.DataFrame({'dec': arr, 'count': counts}) + buf = _dataframe(self.version, df, table_name='tbl', at=qi.ServerTimestamp) + offset = 0 + prefix = b'tbl dec=' + for unscaled, count in zip(unscaled_values, counts): + suffix = f',count={count}i\n'.encode('ascii') + end = buf.index(suffix, offset) + line = buf[offset:end + len(suffix)] + self.assertTrue(line.startswith(prefix), line) + 
payload = line[len(prefix):len(line) - len(suffix)] if len(suffix) else line[len(prefix):] + expected_payload = _decimal_binary_payload(unscaled, arrow_type.scale, arrow_type.byte_width) + self.assertEqual(payload, expected_payload) + offset = end + len(suffix) + self.assertEqual(offset, len(buf)) + def test_u8_arrow_col(self): df = pd.DataFrame({ 'a': pd.Series([ @@ -1588,7 +1704,7 @@ def test_arrow_chunked_array(self): # need to, so - as for now - we just test that we raise a nice error. with self.assertRaisesRegex( qi.IngressError, - r"Unsupported dtype int16\[pyarrow\] for column 'a'.*github"): + r"Unsupported arrow type int16 for column 'a'.*github"): _dataframe(self.version, df, table_name='tbl1', at = qi.ServerTimestamp) @unittest.skipIf(not fastparquet, 'fastparquet not installed') @@ -1687,6 +1803,11 @@ class TestPandasProtocolVersionV2(TestPandasBase.TestPandas): version = 2 +class TestPandasProtocolVersionV3(TestPandasBase.TestPandas): + name = 'protocol version 3' + version = 3 + + if __name__ == '__main__': if os.environ.get('TEST_QUESTDB_PROFILE') == '1': import cProfile
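For context, a hypothetical end-to-end sketch of what the new tests exercise: sending both an 'object' column of `decimal.Decimal` values and an Arrow-backed decimal column through the sender. Table and column names are illustrative, and it assumes a reachable QuestDB instance whose ILP endpoint negotiates protocol version 3 (servers limited to versions 1-2 reject the decimal datatype, as the `version < 3` branches above assert).

```python
import pandas as pd
import pyarrow as pa
from decimal import Decimal
from questdb.ingress import Sender, ServerTimestamp

df = pd.DataFrame({
    # Plain 'object' column holding decimal.Decimal values.
    'price_obj': [Decimal('123.45'), Decimal('-0.5')],
    # Arrow-backed decimal128 column with explicit precision/scale; None becomes a null.
    'price_arrow': pd.array(
        [Decimal('123.45'), None],
        dtype=pd.ArrowDtype(pa.decimal128(10, 2))),
})

with Sender.from_conf('http::addr=localhost:9000;') as sender:
    sender.dataframe(df, table_name='trades', at=ServerTimestamp)
```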