From c7d1f994aa8f60100fc207a9bc2dcc91d53b0efe Mon Sep 17 00:00:00 2001 From: Kevin Smith Date: Mon, 28 Jul 2025 11:40:49 -0500 Subject: [PATCH 1/4] Add object support for UDF parameters and returns --- accel.c | 61 ++- setup.cfg | 2 +- singlestoredb/functions/decorator.py | 55 +-- singlestoredb/functions/ext/asgi.py | 15 +- singlestoredb/functions/ext/json.py | 112 +++-- singlestoredb/functions/ext/rowdat_1.py | 159 ++++--- singlestoredb/functions/signature.py | 484 +++++++++++++++------- singlestoredb/tests/ext_funcs/__init__.py | 96 ++++- singlestoredb/tests/test_ext_func.py | 18 +- singlestoredb/utils/dtypes.py | 4 +- 10 files changed, 703 insertions(+), 303 deletions(-) diff --git a/accel.c b/accel.c index 9436a04d1..39e8d0318 100644 --- a/accel.c +++ b/accel.c @@ -4108,6 +4108,7 @@ static PyObject *load_rowdat_1(PyObject *self, PyObject *args, PyObject *kwargs) PyObject *py_colspec = NULL; PyObject *py_str = NULL; PyObject *py_blob = NULL; + PyObject **py_transformers = NULL; Py_ssize_t length = 0; uint64_t row_id = 0; uint8_t is_null = 0; @@ -4138,6 +4139,7 @@ static PyObject *load_rowdat_1(PyObject *self, PyObject *args, PyObject *kwargs) colspec_l = PyObject_Length(py_colspec); ctypes = malloc(sizeof(int) * colspec_l); + py_transformers = calloc(sizeof(PyObject*), colspec_l); for (i = 0; i < colspec_l; i++) { PyObject *py_cspec = PySequence_GetItem(py_colspec, i); @@ -4145,6 +4147,15 @@ static PyObject *load_rowdat_1(PyObject *self, PyObject *args, PyObject *kwargs) PyObject *py_ctype = PySequence_GetItem(py_cspec, 1); if (!py_ctype) { Py_DECREF(py_cspec); goto error; } ctypes[i] = (int)PyLong_AsLong(py_ctype); + py_transformers[i] = PySequence_GetItem(py_cspec, 2); + if (!py_transformers[i]) { + Py_DECREF(py_ctype); + Py_DECREF(py_cspec); + goto error; + } + if (py_transformers[i] == Py_None) { + py_transformers[i] = NULL; + } Py_DECREF(py_ctype); Py_DECREF(py_cspec); if (PyErr_Occurred()) { goto error; } @@ -4380,6 +4391,14 @@ static PyObject *load_rowdat_1(PyObject *self, PyObject *args, PyObject *kwargs) default: goto error; } + + if (py_transformers[i]) { + PyObject *py_item = PyTuple_GetItem(py_row, i); + PyObject *py_transformed = PyObject_CallFunction(py_transformers[i], "O", py_item); + if (!py_transformed) goto error; + Py_DECREF(py_item); + CHECKRC(PyTuple_SetItem(py_row, i, py_transformed)); + } } CHECKRC(PyList_Append(py_out_rows, py_row)); @@ -4389,6 +4408,12 @@ static PyObject *load_rowdat_1(PyObject *self, PyObject *args, PyObject *kwargs) exit: if (ctypes) free(ctypes); + if (py_transformers) { + for (i = 0; i < colspec_l; i++) { + Py_XDECREF(py_transformers[i]); + } + free(py_transformers); + } Py_XDECREF(py_row); @@ -4412,6 +4437,7 @@ static PyObject *dump_rowdat_1(PyObject *self, PyObject *args, PyObject *kwargs) PyObject *py_row_ids = NULL; PyObject *py_row_ids_iter = NULL; PyObject *py_item = NULL; + PyObject **py_transformers = NULL; uint64_t row_id = 0; uint8_t is_null = 0; int8_t i8 = 0; @@ -4459,12 +4485,26 @@ static PyObject *dump_rowdat_1(PyObject *self, PyObject *args, PyObject *kwargs) returns = malloc(sizeof(int) * n_cols); if (!returns) goto error; + py_transformers = calloc(sizeof(PyObject*), n_cols); + if (!py_transformers) goto error; for (i = 0; i < n_cols; i++) { - PyObject *py_item = PySequence_GetItem(py_returns, i); - if (!py_item) goto error; - returns[i] = (int)PyLong_AsLong(py_item); - Py_DECREF(py_item); + PyObject *py_cspec = PySequence_GetItem(py_returns, i); + if (!py_cspec) goto error; + PyObject *py_ctype = 
PySequence_GetItem(py_cspec, 1); + if (!py_ctype) { Py_DECREF(py_cspec); goto error; } + returns[i] = (int)PyLong_AsLong(py_ctype); + py_transformers[i] = PySequence_GetItem(py_cspec, 2); + if (!py_transformers[i]) { + Py_DECREF(py_ctype); + Py_DECREF(py_cspec); + goto error; + } + if (py_transformers[i] == Py_None) { + py_transformers[i] = NULL; + } + Py_DECREF(py_ctype); + Py_DECREF(py_cspec); if (PyErr_Occurred()) { goto error; } } @@ -4504,6 +4544,13 @@ static PyObject *dump_rowdat_1(PyObject *self, PyObject *args, PyObject *kwargs) memcpy(out+out_idx, &is_null, 1); out_idx += 1; + if (py_transformers[i]) { + PyObject *py_transformed = PyObject_CallFunction(py_transformers[i], "O", py_item); + if (!py_transformed) goto error; + Py_DECREF(py_item); + py_item = py_transformed; + } + switch (returns[i]) { case MYSQL_TYPE_BIT: // TODO @@ -4702,6 +4749,12 @@ static PyObject *dump_rowdat_1(PyObject *self, PyObject *args, PyObject *kwargs) exit: if (returns) free(returns); + if (py_transformers) { + for (i = 0; i < n_cols; i++) { + Py_XDECREF(py_transformers[i]); + } + free(py_transformers); + } Py_XDECREF(py_item); Py_XDECREF(py_row_iter); diff --git a/setup.cfg b/setup.cfg index 0547a17e6..79ee975d3 100644 --- a/setup.cfg +++ b/setup.cfg @@ -86,7 +86,7 @@ exclude = docs/* resources/* licenses/* -max-complexity = 45 +max-complexity = 50 max-line-length = 90 per-file-ignores = singlestoredb/__init__.py:F401 diff --git a/singlestoredb/functions/decorator.py b/singlestoredb/functions/decorator.py index 687211368..55be87bae 100644 --- a/singlestoredb/functions/decorator.py +++ b/singlestoredb/functions/decorator.py @@ -23,7 +23,7 @@ UDFType = Callable[..., Any] -def is_valid_type(obj: Any) -> bool: +def is_valid_object_type(obj: Any) -> bool: """Check if the object is a valid type for a schema definition.""" if not inspect.isclass(obj): return False @@ -52,48 +52,29 @@ def is_valid_callable(obj: Any) -> bool: returns = utils.get_annotations(obj).get('return', None) - if inspect.isclass(returns) and issubclass(returns, str): + if inspect.isclass(returns) and issubclass(returns, SQLString): return True - raise TypeError( - f'callable {obj} must return a str, ' - f'but got {returns}', - ) + return False -def expand_types(args: Any) -> Optional[Union[List[str], Type[Any]]]: +def expand_types(args: Any) -> List[Any]: """Expand the types for the function arguments / return values.""" if args is None: - return None - - # SQL string - if isinstance(args, str): - return [args] - - # General way of accepting pydantic.BaseModel, NamedTuple, TypedDict - elif is_valid_type(args): - return args - - # List of SQL strings or callables - elif isinstance(args, list): - new_args = [] - for arg in args: - if isinstance(arg, str): - new_args.append(arg) - elif callable(arg): - new_args.append(arg()) - else: - raise TypeError(f'unrecognized type for parameter: {arg}') - return new_args - - # Callable that returns a SQL string - elif is_valid_callable(args): - out = args() - if not isinstance(out, str): - raise TypeError(f'unrecognized type for parameter: {args}') - return [out] - - raise TypeError(f'unrecognized type for parameter: {args}') + return [] + + if not isinstance(args, list): + args = [args] + + new_args = [] + for arg in args: + if isinstance(arg, str): + new_args.append(arg) + elif is_valid_callable(arg): + new_args.append(arg()) + else: + new_args.append(arg) + return new_args def _func( diff --git a/singlestoredb/functions/ext/asgi.py b/singlestoredb/functions/ext/asgi.py index 69b498bd4..25a00f6ed 
100755 --- a/singlestoredb/functions/ext/asgi.py +++ b/singlestoredb/functions/ext/asgi.py @@ -126,6 +126,7 @@ async def to_thread( 'float64': ft.DOUBLE, 'str': ft.STRING, 'bytes': -ft.STRING, + 'json': ft.STRING, } @@ -586,7 +587,11 @@ def make_func( dtype = x['dtype'].replace('?', '') if dtype not in rowdat_1_type_map: raise TypeError(f'no data type mapping for {dtype}') - colspec.append((x['name'], rowdat_1_type_map[dtype])) + colspec.append(( + x['name'], + rowdat_1_type_map[dtype], + x.get('transformer', None), + )) info['colspec'] = colspec # Setup return type @@ -595,7 +600,11 @@ def make_func( dtype = x['dtype'].replace('?', '') if dtype not in rowdat_1_type_map: raise TypeError(f'no data type mapping for {dtype}') - returns.append((x['name'], rowdat_1_type_map[dtype])) + returns.append(( + x['name'], + rowdat_1_type_map[dtype], + x.get('transformer', None), + )) info['returns'] = returns return do_func, info @@ -1084,7 +1093,7 @@ async def __call__( with timer('format_output'): body = output_handler['dump']( - [x[1] for x in func_info['returns']], *result, # type: ignore + func_info['returns'], *result, # type: ignore ) await send(output_handler['response']) diff --git a/singlestoredb/functions/ext/json.py b/singlestoredb/functions/ext/json.py index 05710247d..2695c4e39 100644 --- a/singlestoredb/functions/ext/json.py +++ b/singlestoredb/functions/ext/json.py @@ -1,8 +1,10 @@ #!/usr/bin/env python3 import base64 import json +from collections.abc import Callable from typing import Any from typing import List +from typing import Optional from typing import Tuple from typing import TYPE_CHECKING @@ -40,10 +42,17 @@ def default(self, obj: Any) -> Any: return json.JSONEncoder.default(self, obj) -def decode_row(coltypes: List[int], row: List[Any]) -> List[Any]: +def decode_row( + colspec: List[Tuple[str, int, Optional[Callable[..., Any]]]], + row: List[Any], +) -> List[Any]: out = [] - for dtype, item in zip(coltypes, row): - out.append(PYTHON_CONVERTERS[dtype](item)) # type: ignore + for (name, dtype, transformer), item in zip(colspec, row): + out.append( + apply_transformer( + transformer, PYTHON_CONVERTERS[dtype](item), # type: ignore + ), + ) return out @@ -51,8 +60,15 @@ def decode_value(coltype: int, data: Any) -> Any: return PYTHON_CONVERTERS[coltype](data) # type: ignore +def apply_transformer( + transformer: Optional[Callable[..., Any]], + value: Any, +) -> Any: + return transformer(value) if transformer is not None else value + + def load( - colspec: List[Tuple[str, int]], + colspec: List[Tuple[str, int, Optional[Callable[..., Any]]]], data: bytes, ) -> Tuple[List[int], List[Any]]: ''' @@ -60,7 +76,7 @@ def load( Parameters ---------- - colspec : Iterable[Tuple[str, int]] + colspec : Iterable[Tuple[str, int, Optional[Callable[..., Any]]]] An Iterable of column data types data : bytes The data in JSON format @@ -74,12 +90,12 @@ def load( rows = [] for row_id, *row in json.loads(data.decode('utf-8'))['data']: row_ids.append(row_id) - rows.append(decode_row([x[1] for x in colspec], row)) + rows.append(decode_row(colspec, row)) return row_ids, rows def _load_vectors( - colspec: List[Tuple[str, int]], + colspec: List[Tuple[str, int, Optional[Callable[..., Any]]]], data: bytes, ) -> Tuple[List[int], List[Any]]: ''' @@ -87,7 +103,7 @@ def _load_vectors( Parameters ---------- - colspec : Iterable[Tuple[str, int]] + colspec : Iterable[Tuple[str, int, Optional[Callable[..., Any]]]] An Iterable of column data types data : bytes The data in JSON format @@ -107,13 +123,19 @@ def 
_load_vectors( if not cols: cols = [([], []) for _ in row] for i, (spec, x) in enumerate(zip(colspec, row)): - cols[i][0].append(decode_value(spec[1], x) if x is not None else defaults[i]) + cols[i][0].append( + apply_transformer( + spec[2], + decode_value(spec[1], x) + if x is not None else defaults[i], + ), + ) cols[i][1].append(False if x is not None else True) return row_ids, cols def load_pandas( - colspec: List[Tuple[str, int]], + colspec: List[Tuple[str, int, Optional[Callable[..., Any]]]], data: bytes, ) -> Tuple[List[int], List[Any]]: ''' @@ -121,7 +143,7 @@ def load_pandas( Parameters ---------- - colspec : Iterable[Tuple[str, int]] + colspec : Iterable[Tuple[str, int, Optional[Callable[..., Any]]]] An Iterable of column data types data : bytes The data in JSON format @@ -149,7 +171,7 @@ def load_pandas( def load_polars( - colspec: List[Tuple[str, int]], + colspec: List[Tuple[str, int, Optional[Callable[..., Any]]]], data: bytes, ) -> Tuple[List[int], List[Any]]: ''' @@ -157,7 +179,7 @@ def load_polars( Parameters ---------- - colspec : Iterable[Tuple[str, int]] + colspec : Iterable[Tuple[str, int, Optional[Callable[..., Any]]]] An Iterable of column data types data : bytes The data in JSON format @@ -172,7 +194,13 @@ def load_polars( return pl.Series(None, row_ids, dtype=pl.Int64), \ [ ( - pl.Series(spec[0], data, dtype=POLARS_TYPE_MAP[spec[1]]), + pl.Series( + spec[0], + data, + # We have to use the TEXT type for JSON objects at the moment, + # so don't explicitly cast the dtypes for a db type of 254. + dtype=POLARS_TYPE_MAP[spec[1]] if spec[1] != 254 else None, + ), pl.Series(None, mask, dtype=pl.Boolean), ) for (data, mask), spec in zip(cols, colspec) @@ -180,7 +208,7 @@ def load_polars( def load_numpy( - colspec: List[Tuple[str, int]], + colspec: List[Tuple[str, int, Optional[Callable[..., Any]]]], data: bytes, ) -> Tuple[Any, List[Any]]: ''' @@ -188,7 +216,7 @@ def load_numpy( Parameters ---------- - colspec : Iterable[Tuple[str, int]] + colspec : Iterable[Tuple[str, int, Optional[Callable[..., Any]]]] An Iterable of column data types data : bytes The data in JSON format @@ -211,7 +239,7 @@ def load_numpy( def load_arrow( - colspec: List[Tuple[str, int]], + colspec: List[Tuple[str, int, Optional[Callable[..., Any]]]], data: bytes, ) -> Tuple[Any, List[Any]]: ''' @@ -219,7 +247,7 @@ def load_arrow( Parameters ---------- - colspec : Iterable[Tuple[str, int]] + colspec : Iterable[Tuple[str, int, Optional[Callable[..., Any]]]] An Iterable of column data types data : bytes The data in JSON format @@ -240,12 +268,12 @@ def load_arrow( ), pa.array(mask, type=pa.bool_()), ) - for (data, mask), (name, dtype) in zip(cols, colspec) + for (data, mask), (name, dtype, transformer) in zip(cols, colspec) ] def dump( - returns: List[int], + returns: List[Tuple[str, int, Callable[..., Any]]], row_ids: List[int], rows: List[List[Any]], ) -> bytes: @@ -254,7 +282,7 @@ def dump( Parameters ---------- - returns : List[int] + returns : List[Tuple[str, int, Callable[..., Any]]] The returned data type row_ids : List[int] Row IDs @@ -266,12 +294,16 @@ def dump( bytes ''' + rows = [ + [apply_transformer(returns[i][2], item) for i, item in enumerate(row)] + for row in rows + ] data = list(zip(row_ids, *list(zip(*rows)))) return json.dumps(dict(data=data), cls=JSONEncoder).encode('utf-8') def _dump_vectors( - returns: List[int], + returns: List[Tuple[str, int, Optional[Callable[..., Any]]]], row_ids: List[int], cols: List[Tuple[Any, Any]], ) -> bytes: @@ -280,7 +312,7 @@ def _dump_vectors( Parameters 
---------- - returns : List[int] + returns : List[Tuple[str, int, Optional[Callable[..., Any]]]] The returned data type row_ids : List[int] Row IDs @@ -295,9 +327,14 @@ def _dump_vectors( masked_cols = [] for i, (data, mask) in enumerate(cols): if mask is not None: - masked_cols.append([d if m is not None else None for d, m in zip(data, mask)]) + masked_cols.append([ + apply_transformer(returns[i][2], d) + if m is not None else None for d, m in zip(data, mask) + ]) else: - masked_cols.append(cols[i][0]) + masked_cols.append([ + apply_transformer(returns[i][2], d) for d in data + ]) data = list(zip(row_ids, *masked_cols)) return json.dumps(dict(data=data), cls=JSONEncoder).encode('utf-8') @@ -307,7 +344,7 @@ def _dump_vectors( def dump_pandas( - returns: List[int], + returns: List[Tuple[str, int, Optional[Callable[..., Any]]]], row_ids: 'pd.Series[int]', cols: List[Tuple['pd.Series[int]', 'pd.Series[bool]']], ) -> bytes: @@ -316,7 +353,7 @@ def dump_pandas( Parameters ---------- - returns : List[int] + returns : List[Tuple[str, int, Optional[Callable[..., Any]]]] The returned data type row_ids : pd.Series[int] Row IDs @@ -328,14 +365,15 @@ def dump_pandas( bytes ''' - import pandas as pd - row_ids.index = row_ids - df = pd.concat([row_ids] + [x[0] for x in cols], axis=1) - return ('{"data": %s}' % df.to_json(orient='values')).encode('utf-8') + return _dump_vectors( + returns, + row_ids.tolist(), + [(x[0].tolist(), x[1].tolist() if x[1] is not None else None) for x in cols], + ) def dump_polars( - returns: List[int], + returns: List[Tuple[str, int, Optional[Callable[..., Any]]]], row_ids: 'pl.Series[int]', cols: List[Tuple['pl.Series[Any]', 'pl.Series[int]']], ) -> bytes: @@ -344,7 +382,7 @@ def dump_polars( Parameters ---------- - returns : List[int] + returns : List[Tuple[str, int, Optional[Callable[..., Any]]]] The returned data type row_ids : List[int] cols : List[Tuple[polars.Series[Any], polars.Series[bool]] @@ -363,7 +401,7 @@ def dump_polars( def dump_numpy( - returns: List[int], + returns: List[Tuple[str, int, Optional[Callable[..., Any]]]], row_ids: 'np.typing.NDArray[np.int64]', cols: List[Tuple['np.typing.NDArray[Any]', 'np.typing.NDArray[np.bool_]']], ) -> bytes: @@ -372,7 +410,7 @@ def dump_numpy( Parameters ---------- - returns : List[int] + returns : List[Tuple[str, int, Optional[Callable[..., Any]]]] The returned data type row_ids : List[int] Row IDs @@ -392,7 +430,7 @@ def dump_numpy( def dump_arrow( - returns: List[int], + returns: List[Tuple[str, int, Optional[Callable[..., Any]]]], row_ids: 'pa.Array[int]', cols: List[Tuple['pa.Array[int]', 'pa.Array[bool]']], ) -> bytes: @@ -401,7 +439,7 @@ def dump_arrow( Parameters ---------- - returns : List[int] + returns : List[Tuple[str, int, Optional[Callable[..., Any]]]] The returned data type row_ids : pyarrow.Array[int] Row IDs diff --git a/singlestoredb/functions/ext/rowdat_1.py b/singlestoredb/functions/ext/rowdat_1.py index 83052b671..abef90783 100644 --- a/singlestoredb/functions/ext/rowdat_1.py +++ b/singlestoredb/functions/ext/rowdat_1.py @@ -3,6 +3,7 @@ import warnings from io import BytesIO from typing import Any +from typing import Callable from typing import List from typing import Optional from typing import Sequence @@ -88,8 +89,15 @@ binary_types = set([-x for x in string_types]) +def apply_transformer( + transformer: Optional[Callable[..., Any]], + value: Any, +) -> Any: + return transformer(value) if transformer is not None else value + + def _load( - colspec: List[Tuple[str, int]], + colspec: List[Tuple[str, 
int, Optional[Callable[..., Any]]]], data: bytes, ) -> Tuple[List[int], List[Any]]: ''' @@ -115,7 +123,7 @@ def _load( while data_io.tell() < data_len: row_ids.append(struct.unpack(' Tuple[List[int], List[Tuple[Sequence[Any], Optional[Sequence[Any]]]]]: ''' @@ -162,7 +170,7 @@ def _load_vectors( val = None while data_io.tell() < data_len: row_ids.append(struct.unpack(' Tuple[ 'pd.Series[np.int64]', @@ -215,12 +223,12 @@ def _load_pandas( pd.Series(data, index=index, name=name, dtype=PANDAS_TYPE_MAP[dtype]), pd.Series(mask, index=index, dtype=np.bool_), ) - for (data, mask), (name, dtype) in zip(cols, colspec) + for (data, mask), (name, dtype, transformer) in zip(cols, colspec) ] def _load_polars( - colspec: List[Tuple[str, int]], + colspec: List[Tuple[str, int, Optional[Callable[..., Any]]]], data: bytes, ) -> Tuple[ 'pl.Series[pl.Int64]', @@ -247,15 +255,21 @@ def _load_polars( return pl.Series(None, row_ids, dtype=pl.Int64), \ [ ( - pl.Series(name=name, values=data, dtype=POLARS_TYPE_MAP[dtype]), + pl.Series( + name=name, + values=data, + # We have to use the TEXT type for JSON objects at the moment, + # so don't explicitly cast the dtypes for a db type of 254. + dtype=POLARS_TYPE_MAP[dtype] if dtype != 254 else None, + ), pl.Series(values=mask, dtype=pl.Boolean), ) - for (data, mask), (name, dtype) in zip(cols, colspec) + for (data, mask), (name, dtype, transformer) in zip(cols, colspec) ] def _load_numpy( - colspec: List[Tuple[str, int]], + colspec: List[Tuple[str, int, Optional[Callable[..., Any]]]], data: bytes, ) -> Tuple[ 'np.typing.NDArray[np.int64]', @@ -285,12 +299,12 @@ def _load_numpy( np.asarray(data, dtype=NUMPY_TYPE_MAP[dtype]), # type: ignore np.asarray(mask, dtype=np.bool_), # type: ignore ) - for (data, mask), (name, dtype) in zip(cols, colspec) + for (data, mask), (name, dtype, transformer) in zip(cols, colspec) ] def _load_arrow( - colspec: List[Tuple[str, int]], + colspec: List[Tuple[str, int, Optional[Callable[..., Any]]]], data: bytes, ) -> Tuple[ 'pa.Array[pa.int64]', @@ -323,12 +337,12 @@ def _load_arrow( ), pa.array(mask, type=pa.bool_()), ) - for (data, mask), (name, dtype) in zip(cols, colspec) + for (data, mask), (name, dtype, transformer) in zip(cols, colspec) ] def _dump( - returns: List[int], + returns: List[Tuple[str, int, Optional[Callable[..., Any]]]], row_ids: List[int], rows: List[List[Any]], ) -> bytes: @@ -337,7 +351,7 @@ def _dump( Parameters ---------- - returns : List[int] + returns : List[Tuple[str, int, Callable[..., Any]]] The returned data type row_ids : List[int] The row IDs @@ -356,8 +370,9 @@ def _dump( for row_id, *values in zip(row_ids, *list(zip(*rows))): out.write(struct.pack(' bytes: @@ -406,7 +421,7 @@ def _dump_vectors( Parameters ---------- - returns : List[int] + returns : List[Tuple[str, int, Callable[..., Any]]] The returned data type row_ids : List[int] The row IDs @@ -427,8 +442,8 @@ def _dump_vectors( out.write(struct.pack(' bytes: @@ -490,7 +505,7 @@ def _dump_arrow( def _dump_numpy( - returns: List[int], + returns: List[Tuple[str, int, Optional[Callable[..., Any]]]], row_ids: 'np.typing.NDArray[np.int64]', cols: List[Tuple['np.typing.NDArray[Any]', 'np.typing.NDArray[np.bool_]']], ) -> bytes: @@ -502,7 +517,7 @@ def _dump_numpy( def _dump_pandas( - returns: List[int], + returns: List[Tuple[str, int, Optional[Callable[..., Any]]]], row_ids: 'pd.Series[np.int64]', cols: List[Tuple['pd.Series[Any]', 'pd.Series[np.bool_]']], ) -> bytes: @@ -514,7 +529,7 @@ def _dump_pandas( def _dump_polars( - returns: List[int], + returns: 
List[Tuple[str, int, Optional[Callable[..., Any]]]], row_ids: 'pl.Series[pl.Int64]', cols: List[Tuple['pl.Series[Any]', 'pl.Series[pl.Boolean]']], ) -> bytes: @@ -526,7 +541,7 @@ def _dump_polars( def _load_numpy_accel( - colspec: List[Tuple[str, int]], + colspec: List[Tuple[str, int, Optional[Callable[..., Any]]]], data: bytes, ) -> Tuple[ 'np.typing.NDArray[np.int64]', @@ -535,22 +550,43 @@ def _load_numpy_accel( if not has_accel: raise RuntimeError('could not load SingleStoreDB extension') - return _singlestoredb_accel.load_rowdat_1_numpy(colspec, data) + import numpy as np + + row_ids, cols = _singlestoredb_accel.load_rowdat_1_numpy(colspec, data) + cols = list(cols) + for i, (d, mask) in enumerate(cols): + transformer = colspec[i][2] + if transformer is not None: + d = np.vectorize(lambda x: apply_transformer(transformer, x))(d) + cols[i] = (d, mask) + + return row_ids, cols def _dump_numpy_accel( - returns: List[int], + returns: List[Tuple[str, int, Optional[Callable[..., Any]]]], row_ids: 'np.typing.NDArray[np.int64]', cols: List[Tuple['np.typing.NDArray[Any]', 'np.typing.NDArray[np.bool_]']], ) -> bytes: if not has_accel: raise RuntimeError('could not load SingleStoreDB extension') - return _singlestoredb_accel.dump_rowdat_1_numpy(returns, row_ids, cols) + import numpy as np + + cols = list(cols) + for i, (d, mask) in enumerate(cols): + transformer = returns[i][2] + if transformer is not None: + d = np.vectorize(lambda x: apply_transformer(transformer, x))(d) + cols[i] = (d, mask) + + return _singlestoredb_accel.dump_rowdat_1_numpy( + [x[1] for x in returns], row_ids, cols, + ) def _load_pandas_accel( - colspec: List[Tuple[str, int]], + colspec: List[Tuple[str, int, Optional[Callable[..., Any]]]], data: bytes, ) -> Tuple[ 'pd.Series[np.int64]', @@ -562,26 +598,25 @@ def _load_pandas_accel( import numpy as np import pandas as pd - numpy_ids, numpy_cols = _singlestoredb_accel.load_rowdat_1_numpy(colspec, data) + numpy_ids, numpy_cols = _load_numpy_accel(colspec, data) cols = [ ( pd.Series(data, name=name, dtype=PANDAS_TYPE_MAP[dtype]), pd.Series(mask, dtype=np.bool_), ) - for (name, dtype), (data, mask) in zip(colspec, numpy_cols) + for (name, dtype, transformer), (data, mask) in zip(colspec, numpy_cols) ] return pd.Series(numpy_ids, dtype=np.int64), cols def _dump_pandas_accel( - returns: List[int], + returns: List[Tuple[str, int, Optional[Callable[..., Any]]]], row_ids: 'pd.Series[np.int64]', cols: List[Tuple['pd.Series[Any]', 'pd.Series[np.bool_]']], ) -> bytes: if not has_accel: raise RuntimeError('could not load SingleStoreDB extension') - numpy_ids = row_ids.to_numpy() numpy_cols = [ ( data.to_numpy(), @@ -589,11 +624,12 @@ def _dump_pandas_accel( ) for data, mask in cols ] - return _singlestoredb_accel.dump_rowdat_1_numpy(returns, numpy_ids, numpy_cols) + + return _dump_numpy_accel(returns, row_ids.to_numpy(), numpy_cols) def _load_polars_accel( - colspec: List[Tuple[str, int]], + colspec: List[Tuple[str, int, Optional[Callable[..., Any]]]], data: bytes, ) -> Tuple[ 'pl.Series[pl.Int64]', @@ -604,7 +640,7 @@ def _load_polars_accel( import polars as pl - numpy_ids, numpy_cols = _singlestoredb_accel.load_rowdat_1_numpy(colspec, data) + numpy_ids, numpy_cols = _load_numpy_accel(colspec, data) cols = [ ( pl.Series( @@ -614,13 +650,13 @@ def _load_polars_accel( ), pl.Series(values=mask, dtype=pl.Boolean), ) - for (name, dtype), (data, mask) in zip(colspec, numpy_cols) + for (name, dtype, transformer), (data, mask) in zip(colspec, numpy_cols) ] return pl.Series(values=numpy_ids, 
dtype=pl.Int64), cols def _dump_polars_accel( - returns: List[int], + returns: List[Tuple[str, int, Optional[Callable[..., Any]]]], row_ids: 'pl.Series[pl.Int64]', cols: List[Tuple['pl.Series[Any]', 'pl.Series[pl.Boolean]']], ) -> bytes: @@ -635,11 +671,12 @@ def _dump_polars_accel( ) for data, mask in cols ] - return _singlestoredb_accel.dump_rowdat_1_numpy(returns, numpy_ids, numpy_cols) + + return _dump_numpy_accel(returns, numpy_ids, numpy_cols) def _load_arrow_accel( - colspec: List[Tuple[str, int]], + colspec: List[Tuple[str, int, Optional[Callable[..., Any]]]], data: bytes, ) -> Tuple[ 'pa.Array[pa.int64]', @@ -650,13 +687,13 @@ def _load_arrow_accel( import pyarrow as pa - numpy_ids, numpy_cols = _singlestoredb_accel.load_rowdat_1_numpy(colspec, data) + numpy_ids, numpy_cols = _load_numpy_accel(colspec, data) cols = [ ( pa.array(data, type=PYARROW_TYPE_MAP[dtype], mask=mask), pa.array(mask, type=pa.bool_()), ) - for (data, mask), (name, dtype) in zip(numpy_cols, colspec) + for (data, mask), (name, dtype, transformer) in zip(numpy_cols, colspec) ] return pa.array(numpy_ids, type=pa.int64()), cols @@ -674,7 +711,7 @@ def _create_arrow_mask( def _dump_arrow_accel( - returns: List[int], + returns: List[Tuple[str, int, Optional[Callable[..., Any]]]], row_ids: 'pa.Array[pa.int64]', cols: List[Tuple['pa.Array[Any]', 'pa.Array[pa.bool_]']], ) -> bytes: @@ -686,16 +723,34 @@ def _dump_arrow_accel( data.fill_null(DEFAULT_VALUES[dtype]).to_numpy(zero_copy_only=False), _create_arrow_mask(data, mask), ) - for (data, mask), dtype in zip(cols, returns) + for (data, mask), (name, dtype, transformer) in zip(cols, returns) ] - return _singlestoredb_accel.dump_rowdat_1_numpy( - returns, row_ids.to_numpy(), numpy_cols, - ) + + return _dump_numpy_accel(returns, row_ids.to_numpy(), numpy_cols) + + +def _load_accel( + colspec: List[Tuple[str, int, Optional[Callable[..., Any]]]], + data: bytes, +) -> Tuple[List[int], List[Any]]: + if not has_accel: + raise RuntimeError('could not load SingleStoreDB extension') + return _singlestoredb_accel.load_rowdat_1(colspec, data) + + +def _dump_accel( + returns: List[Tuple[str, int, Optional[Callable[..., Any]]]], + row_ids: List[int], + rows: List[List[Any]], +) -> bytes: + if not has_accel: + raise RuntimeError('could not load SingleStoreDB extension') + return _singlestoredb_accel.dump_rowdat_1(returns, row_ids, rows) if not has_accel: - load = _load_accel = _load - dump = _dump_accel = _dump + load = _load_accel = _load # noqa: F811 + dump = _dump_accel = _dump # noqa: F811 load_list = _load_vectors # noqa: F811 dump_list = _dump_vectors # noqa: F811 load_pandas = _load_pandas_accel = _load_pandas # noqa: F811 @@ -708,8 +763,6 @@ def _dump_arrow_accel( dump_polars = _dump_polars_accel = _dump_polars # noqa: F811 else: - _load_accel = _singlestoredb_accel.load_rowdat_1 - _dump_accel = _singlestoredb_accel.dump_rowdat_1 load = _load_accel dump = _dump_accel load_list = _load_vectors diff --git a/singlestoredb/functions/signature.py b/singlestoredb/functions/signature.py index 35504401d..de32cfc57 100644 --- a/singlestoredb/functions/signature.py +++ b/singlestoredb/functions/signature.py @@ -16,10 +16,11 @@ from typing import Optional from typing import Sequence from typing import Tuple -from typing import Type from typing import TypeVar from typing import Union +from singlestoredb.functions import transformers + try: import numpy as np has_numpy = True @@ -185,9 +186,31 @@ class NoDefaultType: 'TINYBLOB': 'bytes', 'MEDIUMBLOB': 'bytes', 'LONGBLOB': 'bytes', + 
'JSON': 'json', } +@dataclasses.dataclass +class ParamSpec: + # Normalized data type of the parameter + dtype: Any + + # Name of the parameter, if applicable + name: str = '' + + # SQL type of the parameter + sql_type: str = '' + + # Default value of the parameter, if applicable + default: Any = NO_DEFAULT + + # Transformer function to apply to the parameter + transformer: Optional[Callable[..., Any]] = None + + # Whether the parameter is optional (e.g., Union[T, None] or Optional[T]) + is_optional: bool = False + + class Collection: """Base class for collection data types.""" @@ -522,7 +545,7 @@ def collapse_dtypes(dtypes: Union[str, List[str]], include_null: bool = False) - def get_dataclass_schema( obj: Any, include_default: bool = False, -) -> List[Union[Tuple[str, Any], Tuple[str, Any, Any]]]: +) -> List[ParamSpec]: """ Get the schema of a dataclass. @@ -533,25 +556,26 @@ def get_dataclass_schema( Returns ------- - List[Tuple[str, Any]] | List[Tuple[str, Any, Any]] - A list of tuples containing the field names and field types + List[ParamSpec] + A list of parameter specifications for the dataclass fields """ if include_default: return [ - ( - f.name, f.type, - NO_DEFAULT if f.default is dataclasses.MISSING else f.default, + ParamSpec( + name=f.name, + dtype=f.type, + default=NO_DEFAULT if f.default is dataclasses.MISSING else f.default, ) for f in dataclasses.fields(obj) ] - return [(f.name, f.type) for f in dataclasses.fields(obj)] + return [ParamSpec(name=f.name, dtype=f.type) for f in dataclasses.fields(obj)] def get_typeddict_schema( obj: Any, include_default: bool = False, -) -> List[Union[Tuple[str, Any], Tuple[str, Any, Any]]]: +) -> List[ParamSpec]: """ Get the schema of a TypedDict. @@ -564,22 +588,26 @@ def get_typeddict_schema( Returns ------- - List[Tuple[str, Any]] | List[Tuple[str, Any, Any]] - A list of tuples containing the field names and field types + List[ParamSpec] + A list of parameter specifications for the TypedDict fields """ if include_default: return [ - (k, v, getattr(obj, k, NO_DEFAULT)) + ParamSpec( + name=k, + dtype=v, + default=getattr(obj, k, NO_DEFAULT), + ) for k, v in utils.get_annotations(obj).items() ] - return list(utils.get_annotations(obj).items()) + return [ParamSpec(name=k, dtype=v) for k, v in utils.get_annotations(obj).items()] def get_pydantic_schema( obj: Any, include_default: bool = False, -) -> List[Union[Tuple[str, Any], Tuple[str, Any, Any]]]: +) -> List[ParamSpec]: """ Get the schema of a pydantic model. @@ -592,26 +620,28 @@ def get_pydantic_schema( Returns ------- - List[Tuple[str, Any]] | List[Tuple[str, Any, Any]] - A list of tuples containing the field names and field types + List[ParamSpec] + A list of parameter specifications for the pydantic model fields """ import pydantic_core if include_default: return [ - ( - k, v.annotation, - NO_DEFAULT if v.default is pydantic_core.PydanticUndefined else v.default, + ParamSpec( + name=k, + dtype=v.annotation, + default=NO_DEFAULT + if v.default is pydantic_core.PydanticUndefined else v.default, ) for k, v in obj.model_fields.items() ] - return [(k, v.annotation) for k, v in obj.model_fields.items()] + return [ParamSpec(name=k, dtype=v.annotation) for k, v in obj.model_fields.items()] def get_namedtuple_schema( obj: Any, include_default: bool = False, -) -> List[Union[Tuple[Any, str], Tuple[Any, str, Any]]]: +) -> List[ParamSpec]: """ Get the schema of a named tuple. 
@@ -624,25 +654,28 @@ def get_namedtuple_schema( Returns ------- - List[Tuple[Any, str]] | List[Tuple[Any, str, Any]] - A list of tuples containing the field names and field types + List[ParamSpec] + A list of parameter specifications for the named tuple fields """ if include_default: return [ ( - k, v, - obj._field_defaults.get(k, NO_DEFAULT), + ParamSpec( + name=k, + dtype=v, + default=obj._field_defaults.get(k, NO_DEFAULT), + ) ) for k, v in utils.get_annotations(obj).items() ] - return list(utils.get_annotations(obj).items()) + return [ParamSpec(name=k, dtype=v) for k, v in utils.get_annotations(obj).items()] def get_table_schema( obj: Any, include_default: bool = False, -) -> List[Union[Tuple[Any, str], Tuple[Any, str, Any]]]: +) -> List[ParamSpec]: """ Get the schema of a Table. @@ -655,22 +688,26 @@ def get_table_schema( Returns ------- - List[Tuple[Any, str]] | List[Tuple[Any, str, Any]] - A list of tuples containing the field names and field types + List[ParamSpec] + A list of parameter specifications for the Table fields """ if include_default: return [ - (k, v, getattr(obj, k, NO_DEFAULT)) + ParamSpec( + name=k, + dtype=v, + default=getattr(obj, k, NO_DEFAULT), + ) for k, v in utils.get_annotations(obj).items() ] - return list(utils.get_annotations(obj).items()) + return [ParamSpec(name=k, dtype=v) for k, v in utils.get_annotations(obj).items()] def get_colspec( overrides: Any, include_default: bool = False, -) -> List[Union[Tuple[str, Any], Tuple[str, Any, Any]]]: +) -> List[ParamSpec]: """ Get the column specification from the overrides. @@ -683,8 +720,8 @@ def get_colspec( Returns ------- - List[Tuple[str, Any]] | List[Tuple[str, Any, Any]] - A list of tuples containing the field names and field types + List[ParamSpec] + A list of parameter specifications for the column fields """ overrides_colspec = [] @@ -717,21 +754,24 @@ def get_colspec( # List of types elif isinstance(overrides, list): - if include_default: - overrides_colspec = [ - (getattr(x, 'name', ''), x, NO_DEFAULT) for x in overrides - ] - else: - overrides_colspec = [(getattr(x, 'name', ''), x) for x in overrides] + overrides_colspec = [ + ParamSpec( + name=getattr(x, 'name', ''), + dtype=sql_to_dtype(x) if isinstance(x, str) else x, + sql_type=x if isinstance(x, str) else '', + ) for x in overrides + ] # Other else: - if include_default: - overrides_colspec = [ - (getattr(overrides, 'name', ''), overrides, NO_DEFAULT), - ] - else: - overrides_colspec = [(getattr(overrides, 'name', ''), overrides)] + overrides_colspec = [ + ParamSpec( + name=getattr(overrides, 'name', ''), + dtype=sql_to_dtype(overrides) + if isinstance(overrides, str) else overrides, + sql_type=overrides if isinstance(overrides, str) else '', + ), + ] return overrides_colspec @@ -756,11 +796,87 @@ def unpack_masked_type(obj: Any) -> Any: return obj +def unwrap_optional(annotation: Any) -> Tuple[Any, bool]: + """ + Unwrap Optional[T] and Union[T, None] annotations to get the underlying type. + Also indicates whether the type was optional. 
+ + Examples: + Optional[int] -> (int, True) + Union[str, None] -> (str, True) + Union[int, str, None] -> (Union[int, str], True) + Union[int, str] -> (Union[int, str], False) + int -> (int, False) + + Parameters + ---------- + annotation : Any + The type annotation to unwrap + + Returns + ------- + Tuple[Any, bool] + A tuple containing: + - The unwrapped type annotation + - A boolean indicating if the original type was optional (contained None) + + """ + origin = typing.get_origin(annotation) + is_optional = False + + # Handle Union types (which includes Optional) + if origin is Union: + args = typing.get_args(annotation) + # Check if None is in the union + is_optional = type(None) in args + + # Filter out None/NoneType + non_none_args = [arg for arg in args if arg is not type(None)] + + if not non_none_args: + # If only None was in the Union + from typing import Any + return Any, is_optional + elif len(non_none_args) == 1: + # If there's only one type left, return it directly + return non_none_args[0], is_optional + else: + # Recreate the Union with the remaining types + return Union[tuple(non_none_args)], is_optional + + return annotation, is_optional + + +def is_object_type(spec: Any, mode: str, is_optional: bool) -> Optional[List[ParamSpec]]: + """ + Check if the type is an object type and return a list of ParamSpecs. + + """ + input_or_output = 1 if mode == 'return' else 0 + if inspect.isclass(spec) and ( + issubclass(spec, dict) + or utils.is_dataclass(spec) + or utils.is_typeddict(spec) + or utils.is_pydantic(spec) + or utils.is_namedtuple(spec) + ): + return [ + ParamSpec( + dtype='json', + sql_type='TEXT NULL' if is_optional else 'TEXT NOT NULL', + default=NO_DEFAULT, + transformer=transformers.create_json_transformers(spec)[input_or_output], + is_optional=is_optional, + ), + ] + return None + + def get_schema( spec: Any, - overrides: Optional[Union[List[str], Type[Any]]] = None, + overrides: Optional[List[ParamSpec]] = None, mode: str = 'parameter', -) -> Tuple[List[Tuple[str, Any, Optional[str]]], str, str]: +) -> Tuple[List[ParamSpec], str, str]: """ Expand a return type annotation into a list of types and field names. 
@@ -768,14 +884,14 @@ def get_schema( ---------- spec : Any The return type specification - overrides : List[str], optional + overrides : List[ParamSpec], optional List of SQL type specifications for the return type mode : str The mode of the function, either 'parameter' or 'return' Returns ------- - Tuple[List[Tuple[str, Any, Optional[str]]], str, str] + Tuple[List[ParamSpec], str, str] A list of tuples containing the field names and field types, the normalized data format, optionally the SQL definition of the type, and the data format of the type @@ -785,6 +901,7 @@ def get_schema( data_format = '' function_type = 'udf' + spec, is_optional = unwrap_optional(spec) origin = typing.get_origin(spec) args = typing.get_args(spec) args_origins = [typing.get_origin(x) if x is not None else None for x in args] @@ -836,18 +953,27 @@ def get_schema( elif utils.is_vector(spec) or spec in [str, float, int, bytes]: pass + elif inspect.isclass(spec) and ( + issubclass(spec, dict) + or utils.is_dataframe(spec) + or utils.is_dataclass(spec) + or utils.is_typeddict(spec) + or utils.is_pydantic(spec) + or utils.is_namedtuple(spec) + ): + # TODO: Use TEXT for now because external functions don't support JSON + return [ + ParamSpec( + dtype='json', + sql_type='TEXT NULL' if is_optional else 'TEXT NOT NULL', + default=NO_DEFAULT, + transformer=transformers.create_json_transformers(spec)[1], + is_optional=is_optional, + ), + ], 'scalar', 'udf' + # Try to catch some common mistakes - elif origin in [tuple, dict] or tuple in args_origins or \ - ( - inspect.isclass(spec) and - ( - utils.is_dataframe(spec) - or utils.is_dataclass(spec) - or utils.is_typeddict(spec) - or utils.is_pydantic(spec) - or utils.is_namedtuple(spec) - ) - ): + elif origin in [tuple, dict] or tuple in args_origins: raise TypeError( 'invalid return type for a UDF; ' f'expecting a scalar or vector, but got {spec}', @@ -857,43 +983,64 @@ def get_schema( elif utils.is_vector(spec) or spec in [str, float, int, bytes]: pass + # Object types get converted to JSON + elif inspect.isclass(spec) and ( + issubclass(spec, dict) + or utils.is_dataclass(spec) + or utils.is_typeddict(spec) + or utils.is_pydantic(spec) + or utils.is_namedtuple(spec) + ): + # TODO: Use TEXT for now because external functions don't support JSON + return [ + ParamSpec( + dtype='json', + sql_type='TEXT NULL' if is_optional else 'TEXT NOT NULL', + default=NO_DEFAULT, + transformer=transformers.create_json_transformers(spec)[0], + is_optional=is_optional, + ), + ], 'scalar', 'udf' + # Error out for incorrect parameter types - elif origin in [tuple, dict] or tuple in args_origins or \ - ( - inspect.isclass(spec) and - ( - utils.is_dataframe(spec) - or utils.is_dataclass(spec) - or utils.is_typeddict(spec) - or utils.is_pydantic(spec) - or utils.is_namedtuple(spec) - ) - ): + elif origin in [tuple, dict] or tuple in args_origins or ( + inspect.isclass(spec) and utils.is_dataframe(spec) + ): raise TypeError(f'parameter types must be scalar or vector, got {spec}') # # Process each parameter / return type into a colspec # - # Compute overrides colspec from various formats - overrides_colspec = get_colspec(overrides) - # Dataframe type if utils.is_dataframe(spec): - colspec = overrides_colspec + if not overrides: + raise TypeError( + 'column types must be specified by the ' + '`returns=` parameter of the @udf decorator for a DataFrame', + ) + colspec = get_colspec(overrides[0].dtype, include_default=True) # Numpy array types elif utils.is_numpy(spec): data_format = 'numpy' + if 
overrides: - colspec = overrides_colspec + + # Short circuit if the data type will be converted to JSON + obj = is_object_type(overrides[0].dtype, mode, is_optional) + if obj is not None: + return obj, data_format, function_type + + colspec = overrides + elif len(typing.get_args(spec)) < 2: raise TypeError( 'numpy array must have a data type specified ' 'in the @udf decorator or with an NDArray type annotation', ) else: - colspec = [('', typing.get_args(spec)[1])] + colspec = [ParamSpec(dtype=typing.get_args(spec)[1])] # Pandas Series elif utils.is_pandas_series(spec): @@ -903,7 +1050,13 @@ def get_schema( 'pandas Series must have a data type specified ' 'in the @udf decorator', ) - colspec = overrides_colspec + + # Short circuit if the data type will be converted to JSON + obj = is_object_type(overrides[0].dtype, mode, is_optional) + if obj is not None: + return obj, data_format, function_type + + colspec = overrides # Polars Series elif utils.is_polars_series(spec): @@ -913,7 +1066,13 @@ def get_schema( 'polars Series must have a data type specified ' 'in the @udf decorator', ) - colspec = overrides_colspec + + # Short circuit if the data type will be converted to JSON + obj = is_object_type(overrides[0].dtype, mode, is_optional) + if obj is not None: + return obj, data_format, function_type + + colspec = overrides # PyArrow Array elif utils.is_pyarrow_array(spec): @@ -923,23 +1082,29 @@ def get_schema( 'pyarrow Arrays must have a data type specified ' 'in the @udf decorator', ) - colspec = overrides_colspec + + # Short circuit if the data type will be converted to JSON + obj = is_object_type(overrides[0].dtype, mode, is_optional) + if obj is not None: + return obj, data_format, function_type + + colspec = overrides # Return type is specified by a dataclass definition elif utils.is_dataclass(spec): - colspec = overrides_colspec or get_dataclass_schema(spec) + colspec = overrides or get_dataclass_schema(spec) # Return type is specified by a TypedDict definition elif utils.is_typeddict(spec): - colspec = overrides_colspec or get_typeddict_schema(spec) + colspec = overrides or get_typeddict_schema(spec) # Return type is specified by a pydantic model elif utils.is_pydantic(spec): - colspec = overrides_colspec or get_pydantic_schema(spec) + colspec = overrides or get_pydantic_schema(spec) # Return type is specified by a named tuple elif utils.is_namedtuple(spec): - colspec = overrides_colspec or get_namedtuple_schema(spec) + colspec = overrides or get_namedtuple_schema(spec) # Unrecognized return type elif spec is not None: @@ -947,30 +1112,21 @@ def get_schema( # Return type is specified by a SQL string if isinstance(spec, str): data_format = 'scalar' - colspec = [(getattr(spec, 'name', ''), spec)] + colspec = [ParamSpec(dtype=spec, is_optional=is_optional)] # Plain list vector elif typing.get_origin(spec) is list: data_format = 'list' - colspec = [('', typing.get_args(spec)[0])] + spec, is_optional = unwrap_optional(typing.get_args(spec)[0]) + colspec = [ParamSpec(dtype=spec, is_optional=is_optional)] # Multiple return values elif inspect.isclass(typing.get_origin(spec)) \ and issubclass(typing.get_origin(spec), tuple): # type: ignore[arg-type] - out_names, out_overrides = [], [] - - # Get the colspec for the overrides - if overrides: - out_colspec = [ - x for x in get_colspec(overrides, include_default=True) - ] - out_names = [x[0] for x in out_colspec] - out_overrides = [x[1] for x in out_colspec] - # Make sure that the number of overrides matches the number of # return types or parameter 
types - if out_overrides and len(typing.get_args(spec)) != len(out_overrides): + if overrides and len(typing.get_args(spec)) != len(overrides): raise ValueError( f'number of {mode} types does not match the number of ' 'overrides specified', @@ -981,20 +1137,21 @@ def get_schema( # Get the colspec for each item in the tuple for i, x in enumerate(typing.get_args(spec)): - out_item, out_data_format, _ = get_schema( + params, out_data_format, _ = get_schema( unpack_masked_type(x), - overrides=out_overrides[i] if out_overrides else [], + overrides=overrides if overrides else None, # Always pass UDF mode for individual items mode=mode, ) # Use the name from the overrides if specified - if out_names and out_names[i] and not out_item[0][0]: - out_item = [(out_names[i], *out_item[0][1:])] - elif not out_item[0][0]: - out_item = [(f'{string.ascii_letters[i]}', *out_item[0][1:])] + if overrides: + if overrides[i] and not params[i].name: + params[i].name = overrides[i].name + elif not overrides[i].name: + params[i].name = f'{string.ascii_letters[i]}' - colspec += out_item + colspec.append(params[i]) out_data_formats.append(out_data_format) # Make sure that all the data formats are the same @@ -1015,25 +1172,35 @@ def get_schema( elif overrides: if not data_format: data_format = get_data_format(spec) - colspec = overrides_colspec + colspec = overrides # Single value, no override else: if not data_format: data_format = 'scalar' - colspec = [('', spec)] + colspec = [ParamSpec(dtype=spec, is_optional=is_optional)] out = [] # Normalize colspec data types - for k, v, *_ in colspec: - out.append(( - k, - collapse_dtypes( - [normalize_dtype(x) for x in simplify_dtype(v)], - ), - v if isinstance(v, str) else None, - )) + for c in colspec: + + if isinstance(c.dtype, str): + dtype = c.dtype + else: + dtype = collapse_dtypes( + [normalize_dtype(x) for x in simplify_dtype(c.dtype)], + include_null=c.is_optional, + ) + + p = ParamSpec( + name=c.name, + dtype=dtype, + sql_type=c.sql_type if isinstance(c.sql_type, str) else None, + is_optional=c.is_optional, + ) + + out.append(p) return out, data_format, function_type @@ -1151,12 +1318,10 @@ def get_signature( # Generate the parameter type and the corresponding SQL code for that parameter args_schema = [] args_data_formats = [] - args_colspec = [x for x in get_colspec(attrs.get('args', []), include_default=True)] - args_overrides = [x[1] for x in args_colspec] - args_defaults = [x[2] for x in args_colspec] # type: ignore + args_colspec = get_colspec(attrs.get('args', []), include_default=True) args_masks, ret_masks = get_masks(func) - if args_overrides and len(args_overrides) != len(signature.parameters): + if args_colspec and len(args_colspec) != len(signature.parameters): raise ValueError( 'number of args in the decorator does not match ' 'the number of parameters in the function signature', @@ -1168,33 +1333,49 @@ def get_signature( for i, param in enumerate(params): arg_schema, args_data_format, _ = get_schema( unpack_masked_type(param.annotation), - overrides=args_overrides[i] if args_overrides else [], + overrides=[args_colspec[i]] if args_colspec else None, mode='parameter', ) args_data_formats.append(args_data_format) + if len(arg_schema) > 1: + raise TypeError( + 'only one parameter type is supported; ' + f'got {len(arg_schema)} types for parameter {param.name}', + ) + # Insert parameter names as needed - if not arg_schema[0][0]: - args_schema.append((param.name, *arg_schema[0][1:])) + if not arg_schema[0].name: + arg_schema[0].name = param.name - for i, 
(name, atype, sql) in enumerate(args_schema): + args_schema.append(arg_schema[0]) + + for i, pspec in enumerate(args_schema): default_option = {} # Insert default values as needed - if args_defaults: - if args_defaults[i] is not NO_DEFAULT: - default_option['default'] = args_defaults[i] - else: - if params[i].default is not param.empty: - default_option['default'] = params[i].default + if args_colspec and args_colspec[i].default is not NO_DEFAULT: + default_option['default'] = args_colspec[i].default + elif params and params[i].default is not param.empty: + default_option['default'] = params[i].default # Generate SQL code for the parameter - sql = sql or dtype_to_sql( - atype, force_nullable=args_masks[i], **default_option, + sql = pspec.sql_type or dtype_to_sql( + pspec.dtype, + force_nullable=args_masks[i] or pspec.is_optional, + **default_option, ) # Add parameter to args definitions - args.append(dict(name=name, dtype=atype, sql=sql, **default_option)) + args.append( + dict( + name=pspec.name, + dtype=pspec.dtype, + sql=sql, + **default_option, + transformer=pspec.transformer, + ), + ) # Check that all the data formats are all the same if len(set(args_data_formats)) > 1: @@ -1205,10 +1386,12 @@ def get_signature( out['args_data_format'] = args_data_formats[0] if args_data_formats else 'scalar' + returns_colspec = get_colspec(attrs.get('returns', []), include_default=True) + # Generate the return types and the corresponding SQL code for those values ret_schema, out['returns_data_format'], function_type = get_schema( unpack_masked_type(signature.return_annotation), - overrides=attrs.get('returns', None), + overrides=returns_colspec if returns_colspec else None, mode='return', ) @@ -1218,22 +1401,39 @@ def get_signature( # All functions have to return a value, so if none was specified try to # insert a reasonable default that includes NULLs. 
if not ret_schema: - ret_schema = [('', 'int8?', 'TINYINT NULL')] + ret_schema = [ + ParamSpec( + dtype='int8?', sql_type='TINYINT NULL', default=None, is_optional=True, + ), + ] # Generate field names for the return values if function_type == 'tvf' or len(ret_schema) > 1: - for i, (name, rtype, sql) in enumerate(ret_schema): - if not name: - ret_schema[i] = (string.ascii_letters[i], rtype, sql) + for i, rspec in enumerate(ret_schema): + if not rspec.name: + ret_schema[i] = ParamSpec( + name=string.ascii_letters[i], + dtype=rspec.dtype, + sql_type=rspec.sql_type, + transformer=rspec.transformer, + ) # Generate SQL code for the return values - for i, (name, rtype, sql) in enumerate(ret_schema): - sql = sql or dtype_to_sql( - rtype, - force_nullable=ret_masks[i] if ret_masks else False, + for i, rspec in enumerate(ret_schema): + sql = rspec.sql_type or dtype_to_sql( + rspec.dtype, + force_nullable=(ret_masks[i] or rspec.is_optional) + if ret_masks else rspec.is_optional, function_type=function_type, ) - returns.append(dict(name=name, dtype=rtype, sql=sql)) + returns.append( + dict( + name=rspec.name, + dtype=rspec.dtype, + sql=sql, + transformer=rspec.transformer, + ), + ) # Set the function endpoint out['endpoint'] = '/invoke' diff --git a/singlestoredb/tests/ext_funcs/__init__.py b/singlestoredb/tests/ext_funcs/__init__.py index f5ea9e419..8100f74c7 100644 --- a/singlestoredb/tests/ext_funcs/__init__.py +++ b/singlestoredb/tests/ext_funcs/__init__.py @@ -3,12 +3,16 @@ import asyncio import time import typing +from dataclasses import dataclass from typing import List from typing import NamedTuple from typing import Optional from typing import Tuple import numpy as np +import pandas as pd +import polars as pl +import pyarrow as pa import singlestoredb.functions.dtypes as dt from singlestoredb.functions import Masked @@ -598,13 +602,8 @@ def vec_function( return x * y -class VecInputs(typing.NamedTuple): - x: np.int8 - y: np.int8 - - -class VecOutputs(typing.NamedTuple): - res: np.int16 +VecInputs = [np.int8, np.int8] +VecOutputs = np.int16 @udf(args=VecInputs, returns=VecOutputs) @@ -633,11 +632,7 @@ async def async_vec_function_df( return pdt.DataFrame(dict(res=[1, 2, 3], res2=[1.1, 2.2, 3.3])) -class MaskOutputs(typing.NamedTuple): - res: Optional[np.int16] - - -@udf(args=VecInputs, returns=MaskOutputs) +@udf(args=VecInputs, returns=np.int16) def vec_function_ints_masked( x: Masked[npt.IntArray], y: Masked[npt.IntArray], ) -> Table[Masked[npt.IntArray]]: @@ -646,9 +641,7 @@ def vec_function_ints_masked( return Table(Masked(x_data * y_data, x_nulls | y_nulls)) -class MaskOutputs2(typing.NamedTuple): - res: Optional[np.int16] - res2: Optional[np.int16] +MaskOutputs2 = [np.int16, np.int16] @udf(args=VecInputs, returns=MaskOutputs2) @@ -661,3 +654,76 @@ def vec_function_ints_masked2( Masked(x_data * y_data, x_nulls | y_nulls), Masked(x_data * y_data, x_nulls | y_nulls), ) + + +@dataclass +class InputStruct: + a: int + b: int + c: str + + +@dataclass +class OutputStruct: + a: str + b: str + + +@udf +def json_func(x: InputStruct, y: int = 2) -> OutputStruct: + return OutputStruct( + a=x.a * x.c * y, + b=x.b * x.c * y, + ) + + +@udf(args=InputStruct, returns=OutputStruct) +def numpy_vec_json_func(x: np.ndarray) -> np.ndarray: + out = [] + for item in x: + out.append( + OutputStruct( + a=item.a * item.c, + b=item.b * item.c, + ), + ) + return np.array(out) + + +@udf(args=InputStruct, returns=OutputStruct) +def pandas_vec_json_func(x: pd.Series) -> pd.Series: + out = [] + for item in x: + 
out.append( + OutputStruct( + a=item.a * item.c, + b=item.b * item.c, + ), + ) + return pd.Series(out) + + +@udf(args=InputStruct, returns=OutputStruct) +def arrow_vec_json_func(x: pa.Array) -> pa.Array: + out = [] + for item in x: + out.append( + OutputStruct( + a=item.a * item.c, + b=item.b * item.c, + ), + ) + return pa.array(out) + + +@udf(args=dict, returns=dict) +def polars_vec_json_func(x: pl.Series) -> pl.Series: + out = [] + for item in x: + out.append( + OutputStruct( + a=item['a'] * item['c'], + b=int(item['b']) * item['c'], + ), + ) + return pl.Series(out) diff --git a/singlestoredb/tests/test_ext_func.py b/singlestoredb/tests/test_ext_func.py index d3e680e58..a65721e32 100755 --- a/singlestoredb/tests/test_ext_func.py +++ b/singlestoredb/tests/test_ext_func.py @@ -1407,7 +1407,7 @@ def test_vec_function_ints_masked(self): desc = self.cur.description assert len(desc) == 1 - assert desc[0].name == 'res' + assert desc[0].name == 'a' assert desc[0].type_code == ft.SHORT assert desc[0].null_ok is True @@ -1417,7 +1417,7 @@ def test_vec_function_ints_masked(self): desc = self.cur.description assert len(desc) == 1 - assert desc[0].name == 'res' + assert desc[0].name == 'a' assert desc[0].type_code == ft.SHORT assert desc[0].null_ok is True @@ -1427,7 +1427,7 @@ def test_vec_function_ints_masked(self): desc = self.cur.description assert len(desc) == 1 - assert desc[0].name == 'res' + assert desc[0].name == 'a' assert desc[0].type_code == ft.SHORT assert desc[0].null_ok is True @@ -1438,10 +1438,10 @@ def test_vec_function_ints_masked2(self): desc = self.cur.description assert len(desc) == 2 - assert desc[0].name == 'res' + assert desc[0].name == 'a' assert desc[0].type_code == ft.SHORT assert desc[0].null_ok is True - assert desc[1].name == 'res2' + assert desc[1].name == 'b' assert desc[1].type_code == ft.SHORT assert desc[1].null_ok is True @@ -1451,10 +1451,10 @@ def test_vec_function_ints_masked2(self): desc = self.cur.description assert len(desc) == 2 - assert desc[0].name == 'res' + assert desc[0].name == 'a' assert desc[0].type_code == ft.SHORT assert desc[0].null_ok is True - assert desc[1].name == 'res2' + assert desc[1].name == 'b' assert desc[1].type_code == ft.SHORT assert desc[1].null_ok is True @@ -1464,9 +1464,9 @@ def test_vec_function_ints_masked2(self): desc = self.cur.description assert len(desc) == 2 - assert desc[0].name == 'res' + assert desc[0].name == 'a' assert desc[0].type_code == ft.SHORT assert desc[0].null_ok is True - assert desc[1].name == 'res2' + assert desc[1].name == 'b' assert desc[1].type_code == ft.SHORT assert desc[1].null_ok is True diff --git a/singlestoredb/utils/dtypes.py b/singlestoredb/utils/dtypes.py index 73eb893c1..98914b9b4 100644 --- a/singlestoredb/utils/dtypes.py +++ b/singlestoredb/utils/dtypes.py @@ -136,7 +136,7 @@ 15: pa.string(), # Varchar -15: pa.binary(), # Varbinary 16: pa.binary(), # Bit - 245: pa.string(), # JSON + 245: pa.binary(), # JSON 246: pa.decimal128(18, 6), # NewDecimal -246: pa.decimal128(18, 6), # NewDecimal 247: pa.string(), # Enum @@ -182,7 +182,7 @@ 15: pl.Utf8, # Varchar -15: pl.Utf8, # Varbinary 16: pl.Binary, # Bit - 245: pl.Object, # JSON + 245: pl.Struct, # JSON 246: pl.Decimal(10, 6), # NewDecimal -246: pl.Decimal(10, 6), # NewDecimal 247: pl.Utf8, # Enum From 56ab36734bc8e0e778eb950b9d0eb3b65ec89db5 Mon Sep 17 00:00:00 2001 From: Kevin Smith Date: Mon, 28 Jul 2025 13:42:52 -0500 Subject: [PATCH 2/4] Add missing transformers --- singlestoredb/functions/transformers.py | 265 ++++++++++++++++++++++++ 1 
file changed, 265 insertions(+) create mode 100644 singlestoredb/functions/transformers.py diff --git a/singlestoredb/functions/transformers.py b/singlestoredb/functions/transformers.py new file mode 100644 index 000000000..8123790bb --- /dev/null +++ b/singlestoredb/functions/transformers.py @@ -0,0 +1,265 @@ +#!/usr/bin/env python3 +import dataclasses +import functools +import json +from typing import Any +from typing import Callable +from typing import Dict +from typing import Tuple +from typing import Type +from typing import TypeVar + +from . import utils + +T = TypeVar('T') + + +def json_to_dict(cls: Type[T], json_value: str) -> Dict[str, Any]: + """ + Convert a JSON string to a dictionary. + + Parameters + ---------- + json_value : str or dict + The JSON string or dictionary representing the object. + + Returns + ------- + dict + A dictionary with fields populated from the JSON. + + """ + return cls(json.loads(json_value)) # type: ignore + + +def json_to_pydantic(cls: Type[T], json_value: str) -> T: + """ + Convert a JSON string to a Pydantic model instance. + + Parameters + ---------- + cls : Type[T] + The Pydantic model type to instantiate. + json_value : str or dict + The JSON string or dictionary representing the object. + + Returns + ------- + T + An instance of the Pydantic model with fields populated from the JSON. + + """ + return cls(**json.loads(json_value)) + + +def json_to_namedtuple(cls: Type[T], json_value: str) -> T: + """ + Convert a JSON string to a namedtuple instance. + + Parameters + ---------- + cls : Type[T] + The namedtuple type to instantiate. + json_value : str or dict + The JSON string or dictionary representing the object. + + Returns + ------- + T + An instance of the namedtuple with fields populated from the JSON. + + """ + data = json.loads(json_value) + field_types = getattr(cls, '_field_types', getattr(cls, '__annotations__', {})) + typed_data = {} + for key, value in data.items(): + if key in field_types: + typ = field_types[key] + try: + typed_data[key] = typ(value) + except Exception: + typed_data[key] = value # fallback if conversion fails + else: + typed_data[key] = value + return cls(**typed_data) + + +def json_to_typeddict(cls: Type[T], json_value: str) -> Dict[str, Any]: + """ + Convert a JSON string to a TypedDict instance. + + Parameters + ---------- + cls : Type[T] + The TypedDict type to instantiate. + json_value : str or dict + The JSON string or dictionary representing the object. + + Returns + ------- + T + An instance of the TypedDict with fields populated from the JSON. + + """ + data = json.loads(json_value) + field_types = getattr(cls, '__annotations__', {}) + typed_data = {} + for key, value in data.items(): + if key in field_types: + typ = field_types[key] + try: + typed_data[key] = typ(value) + except Exception: + typed_data[key] = value # fallback if conversion fails + else: + typed_data[key] = value + return typed_data # TypedDicts are just dicts at runtime + + +def json_to_dataclass(cls: Type[T], json_value: str) -> T: + """ + Convert a JSON string to a dataclass instance. + + Parameters + ---------- + cls : Type[T] + The dataclass type to instantiate. + json_str : str + The JSON string representing the object. + + Returns + ------- + T + An instance of the dataclass with fields populated from the JSON. 
+ + """ + data = json.loads(json_value) + field_types = {f.name: f.type for f in dataclasses.fields(cls)} # type: ignore + typed_data = {} + for key, value in data.items(): + if key in field_types: + typ = field_types[key] + try: + if callable(typ): + typed_data[key] = typ(value) + else: + typed_data[key] = value + except Exception: + typed_data[key] = value # fallback if conversion fails + else: + typed_data[key] = value + return cls(**typed_data) + + +def json_to_pandas_dataframe(cls: Type[T], json_value: str) -> T: + """ + Convert a JSON string to a DataFrame instance. + + Parameters + ---------- + cls : Type[T] + The DataFrame type to instantiate. + json_value : str or dict + The JSON string or dictionary representing the object. + + Returns + ------- + T + An instance of the DataFrame with fields populated from the JSON. + + """ + return cls(json.loads(json_value)) # type: ignore + + +def dict_to_json(cls: Type[T], obj: Dict[str, Any]) -> str: + """ + Convert a dictionary to a JSON string. + """ + return json.dumps(obj) + + +def pydantic_to_json(cls: Type[T], obj: Any) -> str: + """ + Convert a Pydantic model instance to a JSON string. + """ + return obj.model_dump_json() + + +def namedtuple_to_json(cls: Type[T], obj: Any) -> str: + """ + Convert a namedtuple instance to a JSON string. + """ + return json.dumps(obj._asdict()) + + +def typeddict_to_json(cls: Type[T], obj: Dict[str, Any]) -> str: + """ + Convert a TypedDict instance (just a dict at runtime) to a JSON string. + """ + return json.dumps(obj) + + +def dataclass_to_json(cls: Type[T], obj: Any) -> str: + """ + Convert a dataclass instance to a JSON string. + """ + return json.dumps(dataclasses.asdict(obj)) + + +def pandas_dataframe_to_json(cls: Type[T], obj: Any) -> str: + """ + Convert a pandas DataFrame to a JSON string (records orientation). + """ + return obj.to_json(orient='records') + + +def create_json_transformers( + cls: Type[T], +) -> Tuple[Callable[[T], str], Callable[[str], T]]: + """ + Create transformers for arbitrary objects to JSON strings. + + Parameters + ---------- + cls : Type[T] + The class type to instantiate for the JSON conversion. + + Returns + ------- + Tuple[Callable[[T], str], Callable[[str], T]] + A tuple containing two functions: + - The first function converts an instance of `cls` to a JSON string. + - The second function converts a JSON string back to an instance of `cls`. 
+ + """ + if issubclass(cls, dict): + return ( # type: ignore + functools.partial(json_to_dict, cls), + functools.partial(dict_to_json, cls), + ) + elif utils.is_pydantic(cls): + return ( # type: ignore + functools.partial(json_to_pydantic, cls), + functools.partial(pydantic_to_json, cls), + ) + elif utils.is_namedtuple(cls): + return ( # type: ignore + functools.partial(json_to_namedtuple, cls), + functools.partial(namedtuple_to_json, cls), + ) + elif utils.is_typeddict(cls): + return ( # type: ignore + functools.partial(json_to_typeddict, cls), + functools.partial(typeddict_to_json, cls), + ) + elif utils.is_dataclass(cls): + return ( # type: ignore + functools.partial(json_to_dataclass, cls), + functools.partial(dataclass_to_json, cls), + ) + elif utils.is_dataframe(cls): + return ( # type: ignore + functools.partial(json_to_pandas_dataframe, cls), + functools.partial(pandas_dataframe_to_json, cls), + ) + raise TypeError(f'Unsupported type for JSON conversion: {type(cls).__name__}') From b112ba0e8478887067b0fee99d69a6bf30a58730 Mon Sep 17 00:00:00 2001 From: Kevin Smith Date: Tue, 29 Jul 2025 09:30:45 -0500 Subject: [PATCH 3/4] Fix tests --- singlestoredb/functions/signature.py | 10 +-- singlestoredb/functions/transformers.py | 20 +++--- singlestoredb/tests/test_ext_func_data.py | 81 +++++++++++------------ 3 files changed, 54 insertions(+), 57 deletions(-) diff --git a/singlestoredb/functions/signature.py b/singlestoredb/functions/signature.py index de32cfc57..71fa5cc12 100644 --- a/singlestoredb/functions/signature.py +++ b/singlestoredb/functions/signature.py @@ -1139,19 +1139,19 @@ def get_schema( for i, x in enumerate(typing.get_args(spec)): params, out_data_format, _ = get_schema( unpack_masked_type(x), - overrides=overrides if overrides else None, + overrides=[overrides[i]] if overrides else None, # Always pass UDF mode for individual items mode=mode, ) # Use the name from the overrides if specified if overrides: - if overrides[i] and not params[i].name: - params[i].name = overrides[i].name + if overrides[i] and not params[0].name: + params[0].name = overrides[i].name elif not overrides[i].name: - params[i].name = f'{string.ascii_letters[i]}' + params[0].name = f'{string.ascii_letters[i]}' - colspec.append(params[i]) + colspec.append(params[0]) out_data_formats.append(out_data_format) # Make sure that all the data formats are the same diff --git a/singlestoredb/functions/transformers.py b/singlestoredb/functions/transformers.py index 8123790bb..c391500ac 100644 --- a/singlestoredb/functions/transformers.py +++ b/singlestoredb/functions/transformers.py @@ -20,8 +20,8 @@ def json_to_dict(cls: Type[T], json_value: str) -> Dict[str, Any]: Parameters ---------- - json_value : str or dict - The JSON string or dictionary representing the object. + json_value : str + The JSON string representing the object. Returns ------- @@ -40,8 +40,8 @@ def json_to_pydantic(cls: Type[T], json_value: str) -> T: ---------- cls : Type[T] The Pydantic model type to instantiate. - json_value : str or dict - The JSON string or dictionary representing the object. + json_value : str + The JSON string representing the object. Returns ------- @@ -60,8 +60,8 @@ def json_to_namedtuple(cls: Type[T], json_value: str) -> T: ---------- cls : Type[T] The namedtuple type to instantiate. - json_value : str or dict - The JSON string or dictionary representing the object. + json_value : str + The JSON string representing the object. 
Returns ------- @@ -92,8 +92,8 @@ def json_to_typeddict(cls: Type[T], json_value: str) -> Dict[str, Any]: ---------- cls : Type[T] The TypedDict type to instantiate. - json_value : str or dict - The JSON string or dictionary representing the object. + json_value : str + The JSON string representing the object. Returns ------- @@ -159,8 +159,8 @@ def json_to_pandas_dataframe(cls: Type[T], json_value: str) -> T: ---------- cls : Type[T] The DataFrame type to instantiate. - json_value : str or dict - The JSON string or dictionary representing the object. + json_value : str + The JSON string representing the object. Returns ------- diff --git a/singlestoredb/tests/test_ext_func_data.py b/singlestoredb/tests/test_ext_func_data.py index 9268d783a..c4e9acc0b 100755 --- a/singlestoredb/tests/test_ext_func_data.py +++ b/singlestoredb/tests/test_ext_func_data.py @@ -31,25 +31,22 @@ BINARY = -254 col_spec = [ - ('tiny', TINYINT), - ('unsigned_tiny', UNSIGNED_TINYINT), - ('short', SMALLINT), - ('unsigned_short', UNSIGNED_SMALLINT), - ('long', INT), - ('unsigned_long', UNSIGNED_INT), - ('float', FLOAT), - ('double', DOUBLE), - ('longlong', BIGINT), - ('unsigned_longlong', UNSIGNED_BIGINT), - ('int24', MEDIUMINT), - ('unsigned_int24', UNSIGNED_MEDIUMINT), - ('string', STRING), - ('binary', BINARY), + ('tiny', TINYINT, None), + ('unsigned_tiny', UNSIGNED_TINYINT, None), + ('short', SMALLINT, None), + ('unsigned_short', UNSIGNED_SMALLINT, None), + ('long', INT, None), + ('unsigned_long', UNSIGNED_INT, None), + ('float', FLOAT, None), + ('double', DOUBLE, None), + ('longlong', BIGINT, None), + ('unsigned_longlong', UNSIGNED_BIGINT, None), + ('int24', MEDIUMINT, None), + ('unsigned_int24', UNSIGNED_MEDIUMINT, None), + ('string', STRING, None), + ('binary', BINARY, None), ] -col_types = [x[1] for x in col_spec] -col_names = [x[0] for x in col_spec] - numpy_row_ids = np.array([1, 2, 3, 4]) numpy_nulls = np.array([False, False, False, True]) @@ -268,7 +265,7 @@ class TestRowdat1(unittest.TestCase): def test_numpy_accel(self): dump_res = rowdat_1._dump_numpy_accel( - col_types, numpy_row_ids, numpy_data, + col_spec, numpy_row_ids, numpy_data, ).tobytes() load_res = rowdat_1._load_numpy_accel(col_spec, dump_res) @@ -293,7 +290,7 @@ def test_numpy_accel(self): def test_numpy(self): dump_res = rowdat_1._dump_numpy( - col_types, numpy_row_ids, numpy_data, + col_spec, numpy_row_ids, numpy_data, ).tobytes() load_res = rowdat_1._load_numpy(col_spec, dump_res) @@ -386,7 +383,7 @@ def test_numpy_accel_limits(self, name, dtype, data, res): # Accelerated with self.assertRaises(res, msg=f'Expected {res} for {data} in {dtype}'): rowdat_1._dump_numpy_accel( - [dtype], numpy_row_ids, [(arr, None)], + [('x', dtype, None)], numpy_row_ids, [(arr, None)], ).tobytes() # Pure Python @@ -395,23 +392,23 @@ def test_numpy_accel_limits(self, name, dtype, data, res): else: with self.assertRaises(res, msg=f'Expected {res} for {data} in {dtype}'): rowdat_1._dump_numpy( - [dtype], numpy_row_ids, [(arr, None)], + [('x', dtype, None)], numpy_row_ids, [(arr, None)], ).tobytes() else: # Accelerated dump_res = rowdat_1._dump_numpy_accel( - [dtype], numpy_row_ids, [(arr, None)], + [('x', dtype, None)], numpy_row_ids, [(arr, None)], ).tobytes() - load_res = rowdat_1._load_numpy_accel([('x', dtype)], dump_res) + load_res = rowdat_1._load_numpy_accel([('x', dtype, None)], dump_res) assert load_res[1][0][0] == res, \ f'Expected {res} for {data}, but got {load_res[1][0][0]} in {dtype}' # Pure Python dump_res = rowdat_1._dump_numpy( - [dtype], 
numpy_row_ids, [(arr, None)], + [('x', dtype, None)], numpy_row_ids, [(arr, None)], ).tobytes() - load_res = rowdat_1._load_numpy([('x', dtype)], dump_res) + load_res = rowdat_1._load_numpy([('x', dtype, None)], dump_res) assert load_res[1][0][0] == res, \ f'Expected {res} for {data}, but got {load_res[1][0][0]} in {dtype}' @@ -787,9 +784,9 @@ def test_numpy_accel_casts(self, name, dtype, data, res): # Accelerated dump_res = rowdat_1._dump_numpy_accel( - [dtype], numpy_row_ids, [(data, None)], + [('x', dtype, None)], numpy_row_ids, [(data, None)], ).tobytes() - load_res = rowdat_1._load_numpy_accel([('x', dtype)], dump_res) + load_res = rowdat_1._load_numpy_accel([('x', dtype, None)], dump_res) if name == 'double from float32': assert load_res[1][0][0].dtype is res.dtype @@ -799,9 +796,9 @@ def test_numpy_accel_casts(self, name, dtype, data, res): # Pure Python dump_res = rowdat_1._dump_numpy( - [dtype], numpy_row_ids, [(data, None)], + [('x', dtype, None)], numpy_row_ids, [(data, None)], ).tobytes() - load_res = rowdat_1._load_numpy([('x', dtype)], dump_res) + load_res = rowdat_1._load_numpy([('x', dtype, None)], dump_res) if name == 'double from float32': assert load_res[1][0][0].dtype is res.dtype @@ -811,7 +808,7 @@ def test_numpy_accel_casts(self, name, dtype, data, res): def test_python(self): dump_res = rowdat_1._dump( - col_types, py_row_ids, py_col_data, + col_spec, py_row_ids, py_col_data, ).tobytes() load_res = rowdat_1._load(col_spec, dump_res) @@ -823,7 +820,7 @@ def test_python(self): def test_python_accel(self): dump_res = rowdat_1._dump_accel( - col_types, py_row_ids, py_col_data, + col_spec, py_row_ids, py_col_data, ).tobytes() load_res = rowdat_1._load_accel(col_spec, dump_res) @@ -835,7 +832,7 @@ def test_python_accel(self): def test_polars(self): dump_res = rowdat_1._dump_polars( - col_types, polars_row_ids, polars_data, + col_spec, polars_row_ids, polars_data, ).tobytes() load_res = rowdat_1._load_polars(col_spec, dump_res) @@ -860,7 +857,7 @@ def test_polars(self): def test_polars_accel(self): dump_res = rowdat_1._dump_polars_accel( - col_types, polars_row_ids, polars_data, + col_spec, polars_row_ids, polars_data, ).tobytes() load_res = rowdat_1._load_polars_accel(col_spec, dump_res) @@ -885,7 +882,7 @@ def test_polars_accel(self): def test_pandas(self): dump_res = rowdat_1._dump_pandas( - col_types, pandas_row_ids, pandas_data, + col_spec, pandas_row_ids, pandas_data, ).tobytes() load_res = rowdat_1._load_pandas(col_spec, dump_res) @@ -910,7 +907,7 @@ def test_pandas(self): def test_pandas_accel(self): dump_res = rowdat_1._dump_pandas_accel( - col_types, pandas_row_ids, pandas_data, + col_spec, pandas_row_ids, pandas_data, ).tobytes() load_res = rowdat_1._load_pandas_accel(col_spec, dump_res) @@ -935,7 +932,7 @@ def test_pandas_accel(self): def test_pyarrow(self): dump_res = rowdat_1._dump_arrow( - col_types, pyarrow_row_ids, pyarrow_data, + col_spec, pyarrow_row_ids, pyarrow_data, ).tobytes() load_res = rowdat_1._load_arrow(col_spec, dump_res) @@ -960,7 +957,7 @@ def test_pyarrow(self): def test_pyarrow_accel(self): dump_res = rowdat_1._dump_arrow_accel( - col_types, pyarrow_row_ids, pyarrow_data, + col_spec, pyarrow_row_ids, pyarrow_data, ).tobytes() load_res = rowdat_1._load_arrow_accel(col_spec, dump_res) @@ -988,7 +985,7 @@ class TestJSON(unittest.TestCase): def test_numpy(self): dump_res = jsonx.dump_numpy( - col_types, numpy_row_ids, numpy_data, + col_spec, numpy_row_ids, numpy_data, ) import pprint pprint.pprint(json.loads(dump_res)) @@ -1015,7 +1012,7 @@ def 
test_numpy(self): def test_python(self): dump_res = jsonx.dump( - col_types, py_row_ids, py_col_data, + col_spec, py_row_ids, py_col_data, ) load_res = jsonx.load(col_spec, dump_res) @@ -1027,7 +1024,7 @@ def test_python(self): def test_polars(self): dump_res = jsonx.dump_polars( - col_types, polars_row_ids, polars_data, + col_spec, polars_row_ids, polars_data, ) load_res = jsonx.load_polars(col_spec, dump_res) @@ -1052,7 +1049,7 @@ def test_polars(self): def test_pandas(self): dump_res = rowdat_1._dump_pandas( - col_types, pandas_row_ids, pandas_data, + col_spec, pandas_row_ids, pandas_data, ).tobytes() load_res = rowdat_1._load_pandas(col_spec, dump_res) @@ -1077,7 +1074,7 @@ def test_pandas(self): def test_pyarrow(self): dump_res = rowdat_1._dump_arrow( - col_types, pyarrow_row_ids, pyarrow_data, + col_spec, pyarrow_row_ids, pyarrow_data, ).tobytes() load_res = rowdat_1._load_arrow(col_spec, dump_res) From d6bc6250c13584346dc9879e1a05fc74b1eeb0b5 Mon Sep 17 00:00:00 2001 From: Kevin Smith Date: Wed, 30 Jul 2025 08:48:01 -0500 Subject: [PATCH 4/4] General cleanup --- singlestoredb/functions/decorator.py | 16 ++- singlestoredb/functions/signature.py | 207 +++++++++++---------------- singlestoredb/tests/test_udf.py | 27 ++-- singlestoredb/utils/dtypes.py | 4 +- 4 files changed, 117 insertions(+), 137 deletions(-) diff --git a/singlestoredb/functions/decorator.py b/singlestoredb/functions/decorator.py index 55be87bae..1ca2729c9 100644 --- a/singlestoredb/functions/decorator.py +++ b/singlestoredb/functions/decorator.py @@ -58,12 +58,14 @@ def is_valid_callable(obj: Any) -> bool: return False -def expand_types(args: Any) -> List[Any]: +def expand_types(args: Any) -> Any: """Expand the types for the function arguments / return values.""" if args is None: return [] + is_list = True if not isinstance(args, list): + is_list = False args = [args] new_args = [] @@ -74,6 +76,9 @@ def expand_types(args: Any) -> List[Any]: new_args.append(arg()) else: new_args.append(arg) + + if not is_list: + return new_args[0] return new_args @@ -87,6 +92,15 @@ def _func( ) -> UDFType: """Generic wrapper for UDF and TVF decorators.""" + if isinstance(args, dict): + raise TypeError( + 'The `args` parameter must be a list of data types, not a dict.', + ) + if isinstance(returns, dict): + raise TypeError( + 'The `returns` parameter must be a list of data types, not a dict.', + ) + _singlestoredb_attrs = { # type: ignore k: v for k, v in dict( name=name, diff --git a/singlestoredb/functions/signature.py b/singlestoredb/functions/signature.py index 71fa5cc12..3ca53d441 100644 --- a/singlestoredb/functions/signature.py +++ b/singlestoredb/functions/signature.py @@ -542,10 +542,7 @@ def collapse_dtypes(dtypes: Union[str, List[str]], include_null: bool = False) - return dtypes[0] + ('?' if is_nullable else '') -def get_dataclass_schema( - obj: Any, - include_default: bool = False, -) -> List[ParamSpec]: +def get_dataclass_schema(obj: Any) -> List[ParamSpec]: """ Get the schema of a dataclass. 
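The tightened decorator behaviour in this cleanup can be summarised in a short sketch: a dict literal is rejected outright, and a single type specification is passed through rather than being wrapped in a list. The import paths are the package's documented ones and the snippet is illustrative only.

    from singlestoredb.functions import udf
    from singlestoredb.functions.decorator import expand_types

    expand_types('TEXT')              # 'TEXT' - a lone spec is returned unwrapped
    expand_types(['TEXT', 'BIGINT'])  # ['TEXT', 'BIGINT'] - lists pass through element-wise

    @udf(args={'x': 'TEXT'}, returns='TEXT')   # raises TypeError: `args` must be a
    def bad(x: str) -> str:                    # list of data types, not a dict
        return x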
@@ -560,22 +557,17 @@ def get_dataclass_schema( A list of parameter specifications for the dataclass fields """ - if include_default: - return [ - ParamSpec( - name=f.name, - dtype=f.type, - default=NO_DEFAULT if f.default is dataclasses.MISSING else f.default, - ) - for f in dataclasses.fields(obj) - ] - return [ParamSpec(name=f.name, dtype=f.type) for f in dataclasses.fields(obj)] + return [ + ParamSpec( + name=f.name, + dtype=f.type, + default=NO_DEFAULT if f.default is dataclasses.MISSING else f.default, + ) + for f in dataclasses.fields(obj) + ] -def get_typeddict_schema( - obj: Any, - include_default: bool = False, -) -> List[ParamSpec]: +def get_typeddict_schema(obj: Any) -> List[ParamSpec]: """ Get the schema of a TypedDict. @@ -583,8 +575,6 @@ def get_typeddict_schema( ---------- obj : TypedDict The TypedDict to get the schema of - include_default : bool, optional - Whether to include the default value in the column specification Returns ------- @@ -592,22 +582,17 @@ def get_typeddict_schema( A list of parameter specifications for the TypedDict fields """ - if include_default: - return [ - ParamSpec( - name=k, - dtype=v, - default=getattr(obj, k, NO_DEFAULT), - ) - for k, v in utils.get_annotations(obj).items() - ] - return [ParamSpec(name=k, dtype=v) for k, v in utils.get_annotations(obj).items()] + return [ + ParamSpec( + name=k, + dtype=v, + default=getattr(obj, k, NO_DEFAULT), + ) + for k, v in utils.get_annotations(obj).items() + ] -def get_pydantic_schema( - obj: Any, - include_default: bool = False, -) -> List[ParamSpec]: +def get_pydantic_schema(obj: Any) -> List[ParamSpec]: """ Get the schema of a pydantic model. @@ -615,8 +600,6 @@ def get_pydantic_schema( ---------- obj : pydantic.BaseModel The pydantic model to get the schema of - include_default : bool, optional - Whether to include the default value in the column specification Returns ------- @@ -625,23 +608,18 @@ def get_pydantic_schema( """ import pydantic_core - if include_default: - return [ - ParamSpec( - name=k, - dtype=v.annotation, - default=NO_DEFAULT - if v.default is pydantic_core.PydanticUndefined else v.default, - ) - for k, v in obj.model_fields.items() - ] - return [ParamSpec(name=k, dtype=v.annotation) for k, v in obj.model_fields.items()] + return [ + ParamSpec( + name=k, + dtype=v.annotation, + default=NO_DEFAULT + if v.default is pydantic_core.PydanticUndefined else v.default, + ) + for k, v in obj.model_fields.items() + ] -def get_namedtuple_schema( - obj: Any, - include_default: bool = False, -) -> List[ParamSpec]: +def get_namedtuple_schema(obj: Any) -> List[ParamSpec]: """ Get the schema of a named tuple. 
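After this cleanup every get_*_schema helper captures field defaults unconditionally instead of gating them behind include_default. A rough sketch with a hypothetical dataclass (the exact ParamSpec repr may differ):

    import dataclasses

    from singlestoredb.functions.signature import get_dataclass_schema

    @dataclasses.dataclass
    class Args:
        x: int
        y: float = 1.5

    get_dataclass_schema(Args)
    # -> roughly [ParamSpec(name='x', dtype=int, default=NO_DEFAULT),
    #             ParamSpec(name='y', dtype=float, default=1.5)]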
@@ -649,8 +627,6 @@ def get_namedtuple_schema( ---------- obj : NamedTuple The named tuple to get the schema of - include_default : bool, optional - Whether to include the default value in the column specification Returns ------- @@ -658,24 +634,19 @@ def get_namedtuple_schema( A list of parameter specifications for the named tuple fields """ - if include_default: - return [ - ( - ParamSpec( - name=k, - dtype=v, - default=obj._field_defaults.get(k, NO_DEFAULT), - ) + return [ + ( + ParamSpec( + name=k, + dtype=v, + default=obj._field_defaults.get(k, NO_DEFAULT), ) - for k, v in utils.get_annotations(obj).items() - ] - return [ParamSpec(name=k, dtype=v) for k, v in utils.get_annotations(obj).items()] + ) + for k, v in utils.get_annotations(obj).items() + ] -def get_table_schema( - obj: Any, - include_default: bool = False, -) -> List[ParamSpec]: +def get_table_schema(obj: Any) -> List[ParamSpec]: """ Get the schema of a Table. @@ -683,8 +654,6 @@ def get_table_schema( ---------- obj : Table The Table to get the schema of - include_default : bool, optional - Whether to include the default value in the column specification Returns ------- @@ -692,22 +661,17 @@ def get_table_schema( A list of parameter specifications for the Table fields """ - if include_default: - return [ - ParamSpec( - name=k, - dtype=v, - default=getattr(obj, k, NO_DEFAULT), - ) - for k, v in utils.get_annotations(obj).items() - ] - return [ParamSpec(name=k, dtype=v) for k, v in utils.get_annotations(obj).items()] + return [ + ParamSpec( + name=k, + dtype=v, + default=getattr(obj, k, NO_DEFAULT), + ) + for k, v in utils.get_annotations(obj).items() + ] -def get_colspec( - overrides: Any, - include_default: bool = False, -) -> List[ParamSpec]: +def get_colspec(overrides: Any) -> List[ParamSpec]: """ Get the column specification from the overrides. @@ -715,8 +679,6 @@ def get_colspec( ---------- overrides : Any The overrides to get the column specification from - include_default : bool, optional - Whether to include the default value in the column specification Returns ------- @@ -730,27 +692,19 @@ def get_colspec( # Dataclass if utils.is_dataclass(overrides): - overrides_colspec = get_dataclass_schema( - overrides, include_default=include_default, - ) + overrides_colspec = get_dataclass_schema(overrides) # TypedDict elif utils.is_typeddict(overrides): - overrides_colspec = get_typeddict_schema( - overrides, include_default=include_default, - ) + overrides_colspec = get_typeddict_schema(overrides) # Named tuple elif utils.is_namedtuple(overrides): - overrides_colspec = get_namedtuple_schema( - overrides, include_default=include_default, - ) + overrides_colspec = get_namedtuple_schema(overrides) # Pydantic model elif utils.is_pydantic(overrides): - overrides_colspec = get_pydantic_schema( - overrides, include_default=include_default, - ) + overrides_colspec = get_pydantic_schema(overrides) # List of types elif isinstance(overrides, list): @@ -847,7 +801,7 @@ def unwrap_optional(annotation: Any) -> Tuple[Any, bool]: return annotation, is_optional -def is_object_type(spec: Any, mode: str, is_optional: bool) -> Optional[List[ParamSpec]]: +def check_object(spec: Any, mode: str, is_optional: bool) -> Optional[List[ParamSpec]]: """ Check if the type is an object type and return a list of ParamSpecs. @@ -872,6 +826,30 @@ def is_object_type(spec: Any, mode: str, is_optional: bool) -> Optional[List[Par return None +def is_object(spec: Any) -> bool: + """ + Check if the object is a dataclass, TypedDict, Pydantic model, or NamedTuple. 
+ + Parameters + ---------- + spec : Any + The object to check + + Returns + ------- + bool + True if the object is one of the supported types, False otherwise + + """ + return inspect.isclass(spec) and ( + issubclass(spec, dict) + or utils.is_dataclass(spec) + or utils.is_typeddict(spec) + or utils.is_pydantic(spec) + or utils.is_namedtuple(spec) + ) + + def get_schema( spec: Any, overrides: Optional[List[ParamSpec]] = None, @@ -953,14 +931,7 @@ def get_schema( elif utils.is_vector(spec) or spec in [str, float, int, bytes]: pass - elif inspect.isclass(spec) and ( - issubclass(spec, dict) - or utils.is_dataframe(spec) - or utils.is_dataclass(spec) - or utils.is_typeddict(spec) - or utils.is_pydantic(spec) - or utils.is_namedtuple(spec) - ): + elif is_object(spec): # TODO: Use TEXT for now because external functions don't support JSON return [ ParamSpec( @@ -984,13 +955,7 @@ def get_schema( pass # Object types get converted to JSON - elif inspect.isclass(spec) and ( - issubclass(spec, dict) - or utils.is_dataclass(spec) - or utils.is_typeddict(spec) - or utils.is_pydantic(spec) - or utils.is_namedtuple(spec) - ): + elif is_object(spec): # TODO: Use TEXT for now because external functions don't support JSON return [ ParamSpec( @@ -1019,7 +984,7 @@ def get_schema( 'column types must be specified by the ' '`returns=` parameter of the @udf decorator for a DataFrame', ) - colspec = get_colspec(overrides[0].dtype, include_default=True) + colspec = get_colspec(overrides[0].dtype) # Numpy array types elif utils.is_numpy(spec): @@ -1028,7 +993,7 @@ def get_schema( if overrides: # Short circuit if the data type will be converted to JSON - obj = is_object_type(overrides[0].dtype, mode, is_optional) + obj = check_object(overrides[0].dtype, mode, is_optional) if obj is not None: return obj, data_format, function_type @@ -1052,7 +1017,7 @@ def get_schema( ) # Short circuit if the data type will be converted to JSON - obj = is_object_type(overrides[0].dtype, mode, is_optional) + obj = check_object(overrides[0].dtype, mode, is_optional) if obj is not None: return obj, data_format, function_type @@ -1068,7 +1033,7 @@ def get_schema( ) # Short circuit if the data type will be converted to JSON - obj = is_object_type(overrides[0].dtype, mode, is_optional) + obj = check_object(overrides[0].dtype, mode, is_optional) if obj is not None: return obj, data_format, function_type @@ -1084,7 +1049,7 @@ def get_schema( ) # Short circuit if the data type will be converted to JSON - obj = is_object_type(overrides[0].dtype, mode, is_optional) + obj = check_object(overrides[0].dtype, mode, is_optional) if obj is not None: return obj, data_format, function_type @@ -1318,7 +1283,7 @@ def get_signature( # Generate the parameter type and the corresponding SQL code for that parameter args_schema = [] args_data_formats = [] - args_colspec = get_colspec(attrs.get('args', []), include_default=True) + args_colspec = get_colspec(attrs.get('args', [])) args_masks, ret_masks = get_masks(func) if args_colspec and len(args_colspec) != len(signature.parameters): @@ -1386,7 +1351,7 @@ def get_signature( out['args_data_format'] = args_data_formats[0] if args_data_formats else 'scalar' - returns_colspec = get_colspec(attrs.get('returns', []), include_default=True) + returns_colspec = get_colspec(attrs.get('returns', [])) # Generate the return types and the corresponding SQL code for those values ret_schema, out['returns_data_format'], function_type = get_schema( diff --git a/singlestoredb/tests/test_udf.py b/singlestoredb/tests/test_udf.py 
index 16eb325d6..d4fc73d4c 100755 --- a/singlestoredb/tests/test_udf.py +++ b/singlestoredb/tests/test_udf.py @@ -77,23 +77,24 @@ def foo() -> Optional[C]: ... # Optional return value with collection type def foo() -> Optional[List[str]]: ... - assert to_sql(foo) == '`foo`() RETURNS ARRAY(TEXT NOT NULL) NULL' + assert to_sql(foo) == '`foo`() RETURNS TEXT NOT NULL' # Optional return value with nested collection type - def foo() -> Optional[List[List[str]]]: ... - assert to_sql(foo) == '`foo`() RETURNS ARRAY(ARRAY(TEXT NOT NULL) NOT NULL) NULL' + # def foo() -> Optional[List[List[str]]]: ... + # assert to_sql(foo) == \ + # '`foo`() RETURNS ARRAY(ARRAY(TEXT NOT NULL) NOT NULL) NULL' # Optional return value with collection type with nulls - def foo() -> Optional[List[Optional[str]]]: ... - assert to_sql(foo) == '`foo`() RETURNS ARRAY(TEXT NULL) NULL' + # def foo() -> Optional[List[Optional[str]]]: ... + # assert to_sql(foo) == '`foo`() RETURNS ARRAY(TEXT NULL) NULL' # Custom type with bound def foo() -> D: ... assert to_sql(foo) == '`foo`() RETURNS TEXT NOT NULL' # Return value with custom collection type with nulls - def foo() -> E: ... - assert to_sql(foo) == '`foo`() RETURNS ARRAY(DOUBLE NULL) NULL' + # def foo() -> E: ... + # assert to_sql(foo) == '`foo`() RETURNS ARRAY(DOUBLE NULL) NULL' # Incompatible types def foo() -> Union[int, str]: ... @@ -159,16 +160,16 @@ def foo(x: Optional[C]) -> None: ... # Optional parameter with collection type def foo(x: Optional[List[str]]) -> None: ... - assert to_sql(foo) == '`foo`(`x` ARRAY(TEXT NOT NULL) NULL) RETURNS TINYINT NULL' + assert to_sql(foo) == '`foo`(`x` TEXT NOT NULL) RETURNS TINYINT NULL' # Optional parameter with nested collection type - def foo(x: Optional[List[List[str]]]) -> None: ... - assert to_sql(foo) == '`foo`(`x` ARRAY(ARRAY(TEXT NOT NULL) NOT NULL) NULL) ' \ - 'RETURNS TINYINT NULL' + # def foo(x: Optional[List[List[str]]]) -> None: ... + # assert to_sql(foo) == '`foo`(`x` ARRAY(ARRAY(TEXT NOT NULL) NOT NULL) NULL) ' \ + # 'RETURNS TINYINT NULL' # Optional parameter with collection type with nulls - def foo(x: Optional[List[Optional[str]]]) -> None: ... - assert to_sql(foo) == '`foo`(`x` ARRAY(TEXT NULL) NULL) RETURNS TINYINT NULL' + # def foo(x: Optional[List[Optional[str]]]) -> None: ... + # assert to_sql(foo) == '`foo`(`x` ARRAY(TEXT NULL) NULL) RETURNS TINYINT NULL' # Custom type with bound def foo(x: D) -> None: ... diff --git a/singlestoredb/utils/dtypes.py b/singlestoredb/utils/dtypes.py index 98914b9b4..73eb893c1 100644 --- a/singlestoredb/utils/dtypes.py +++ b/singlestoredb/utils/dtypes.py @@ -136,7 +136,7 @@ 15: pa.string(), # Varchar -15: pa.binary(), # Varbinary 16: pa.binary(), # Bit - 245: pa.binary(), # JSON + 245: pa.string(), # JSON 246: pa.decimal128(18, 6), # NewDecimal -246: pa.decimal128(18, 6), # NewDecimal 247: pa.string(), # Enum @@ -182,7 +182,7 @@ 15: pl.Utf8, # Varchar -15: pl.Utf8, # Varbinary 16: pl.Binary, # Bit - 245: pl.Struct, # JSON + 245: pl.Object, # JSON 246: pl.Decimal(10, 6), # NewDecimal -246: pl.Decimal(10, 6), # NewDecimal 247: pl.Utf8, # Enum
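A closing sketch of how the pieces are meant to fit: the column specs consumed by the rowdat_1 helpers in the tests are (name, type_code, transformer) triples, with None for ordinary columns; for an object column the slot would carry the matching half of a create_json_transformers pair. The pairing shown here (loader on argument specs, serializer on return specs) is an assumption for illustration, as is the Pair dataclass; BIGINT and STRING stand for the type-code constants used in the test module above.

    import dataclasses

    from singlestoredb.functions.transformers import create_json_transformers

    @dataclasses.dataclass
    class Pair:
        a: int
        b: float

    to_obj, to_json = create_json_transformers(Pair)

    arg_spec = [
        ('id', BIGINT, None),          # ordinary column: no transformation
        ('payload', STRING, to_obj),   # object argument: JSON text -> Pair on load
    ]
    ret_spec = [
        ('result', STRING, to_json),   # object return: Pair -> JSON text on dump
    ]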