Skip to content

Commit

Permalink
Merge pull request #264 from chdb-io/fixArrowTypes
Browse files Browse the repository at this point in the history
Fix arrow types handling
  • Loading branch information
auxten authored Sep 5, 2024
2 parents c550182 + b524be0 commit e65cc31
Show file tree
Hide file tree
Showing 4 changed files with 97 additions and 5 deletions.
4 changes: 3 additions & 1 deletion src/Processors/Sources/PythonSource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ void PythonSource::convert_string_array_to_block(
void PythonSource::insert_obj_to_string_column(PyObject * obj, ColumnString * string_column)
{
// check if the object is NaN
if (PyFloat_Check(obj) && Py_IS_NAN(PyFloat_AS_DOUBLE(obj)))
if (obj == Py_None || (PyFloat_Check(obj) && Py_IS_NAN(PyFloat_AS_DOUBLE(obj))))
{
// insert default value for string column, which is empty string
string_column->insertDefault();
Expand Down Expand Up @@ -494,6 +494,8 @@ Chunk PythonSource::scanDataToChunk()
columns[i] = convert_and_insert_array<UInt16>(col, cursor, count);
else if (which.isString())
columns[i] = convert_and_insert_array<String>(col, cursor, count);
else if (which.isNullable())
columns[i] = convert_and_insert_array<String>(col, cursor, count);
else
throw Exception(ErrorCodes::BAD_TYPE_OF_FIELD, "Unsupported type {} for column {}", type->getName(), col.name);

Expand Down
23 changes: 19 additions & 4 deletions src/Storages/StoragePython.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -156,13 +156,15 @@ ColumnsDescription StoragePython::getTableStructureFromData(py::object data_sour
RE2 pattern_decimal128(R"(decimal128\((\d+),\s*(\d+)\))");
RE2 pattern_decimal256(R"(decimal256\((\d+),\s*(\d+)\))");
RE2 pattern_date32(R"(\bdate32\b)");
RE2 pattern_date64(R"(\bdate64\b)");
RE2 pattern_datatime64s(R"(\bdatetime64\[s\]|timestamp\[s\])");
RE2 pattern_date64(R"(\bdate64\b|datetime64\[ms\]|timestamp\[ms\])");
RE2 pattern_time32(R"(\btime32\b)");
RE2 pattern_time64_us(R"(\btime64\[us\]\b)");
RE2 pattern_time64_ns(R"(\btime64\[ns\]\b|<M8\[ns\])");
RE2 pattern_time64_us(R"(\btime64\[us\]\b|datetime64\[us\]|<M8\[us\])");
RE2 pattern_time64_ns(R"(\btime64\[ns\]\b|datetime64\[ns\]|<M8\[ns\])");
RE2 pattern_string_binary(
R"(\bstring\b|<class 'str'>|str|DataType\(string\)|DataType\(binary\)|binary\[pyarrow\]|dtype\[object_\]|
dtype\('S|dtype\('O|<class 'bytes'>|<class 'bytearray'>|<class 'memoryview'>|<class 'numpy.bytes_'>|<class 'numpy.str_'>|<class 'numpy.void)");
RE2 pattern_null(R"(\bnull\b)");

// Iterate through each pair of name and type string in the schema
for (const auto & [name, typeStr] : schema)
Expand Down Expand Up @@ -231,6 +233,10 @@ dtype\('S|dtype\('O|<class 'bytes'>|<class 'bytearray'>|<class 'memoryview'>|<cl
{
data_type = std::make_shared<DataTypeDate32>();
}
else if (RE2::PartialMatch(typeStr, pattern_datatime64s))
{
data_type = std::make_shared<DataTypeDateTime64>(0); // datetime64[s] corresponds to DateTime64(0)
}
else if (RE2::PartialMatch(typeStr, pattern_date64))
{
data_type = std::make_shared<DataTypeDateTime64>(3); // date64 corresponds to DateTime64(3)
Expand All @@ -251,9 +257,18 @@ dtype\('S|dtype\('O|<class 'bytes'>|<class 'bytearray'>|<class 'memoryview'>|<cl
{
data_type = std::make_shared<DataTypeString>();
}
else if (RE2::PartialMatch(typeStr, pattern_null))
{
// ClickHouse uses a separate file with NULL masks in addition to normal file with values.
// Entries in masks file allow ClickHouse to distinguish between NULL and a default value of
// corresponding data type for each table row. Because of an additional file we can't make it
// in Python, so we have to use String type for NULLs.
// https://clickhouse.com/docs/en/sql-reference/data-types/nullable#storage-features
data_type = std::make_shared<DataTypeString>();
}
else
{
throw Exception(ErrorCodes::TYPE_MISMATCH, "Unrecognized data type: {}", typeStr);
throw Exception(ErrorCodes::TYPE_MISMATCH, "Unrecognized data type: {} on column {}", typeStr, name);
}

names_and_types.push_back({name, data_type});
Expand Down
Binary file not shown.
75 changes: 75 additions & 0 deletions tests/test_query_py.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
#!python3

import io
import json
import random
import unittest
import numpy as np
import pandas as pd
import pyarrow as pa
from pyarrow import csv
import pyarrow.json
import pyarrow.parquet
import chdb


Expand Down Expand Up @@ -42,6 +45,11 @@
639860,win,1989-06-30
"""

# Two JSON Lines (JSONL) records of football-match metadata. Consumed by
# test_query_arrow4 via pa.json.read_json to build an Arrow table; the
# records deliberately contain nested objects, lists of structs, explicit
# nulls, and non-ASCII characters (\u escapes) to exercise Arrow type
# handling end to end.
ARROW_DATA_JSONL = """{"match_id": 3943077, "match_date": "2024-07-15", "kick_off": "04:15:00.000", "competition": {"competition_id": 223, "country_name": "South America", "competition_name": "Copa America"}, "season": {"season_id": 282, "season_name": "2024"}, "home_team": {"home_team_id": 779, "home_team_name": "Argentina", "home_team_gender": "male", "home_team_group": null, "country": {"id": 11, "name": "Argentina"}, "managers": [{"id": 5677, "name": "Lionel Sebasti\u00e1n Scaloni", "nickname": null, "dob": "1978-05-16", "country": {"id": 11, "name": "Argentina"}}]}, "away_team": {"away_team_id": 769, "away_team_name": "Colombia", "away_team_gender": "male", "away_team_group": null, "country": {"id": 49, "name": "Colombia"}, "managers": [{"id": 5905, "name": "N\u00e9stor Gabriel Lorenzo", "nickname": null, "dob": "1966-02-28", "country": {"id": 11, "name": "Argentina"}}]}, "home_score": 1, "away_score": 0, "match_status": "available", "match_status_360": "unscheduled", "last_updated": "2024-07-15T15:50:08.671355", "last_updated_360": null, "metadata": {"data_version": "1.1.0", "shot_fidelity_version": "2", "xy_fidelity_version": "2"}, "match_week": 6, "competition_stage": {"id": 26, "name": "Final"}, "stadium": {"id": 5337, "name": "Hard Rock Stadium", "country": {"id": 241, "name": "United States of America"}}, "referee": {"id": 2638, "name": "Raphael Claus", "country": {"id": 31, "name": "Brazil"}}}
{"match_id": 3943076, "match_date": "2024-07-14", "kick_off": "03:00:00.000", "competition": {"competition_id": 223, "country_name": "South America", "competition_name": "Copa America"}, "season": {"season_id": 282, "season_name": "2024"}, "home_team": {"home_team_id": 1833, "home_team_name": "Canada", "home_team_gender": "male", "home_team_group": null, "country": {"id": 40, "name": "Canada"}, "managers": [{"id": 165, "name": "Jesse Marsch", "nickname": null, "dob": "1973-11-08", "country": {"id": 241, "name": "United States of America"}}]}, "away_team": {"away_team_id": 783, "away_team_name": "Uruguay", "away_team_gender": "male", "away_team_group": null, "country": {"id": 242, "name": "Uruguay"}, "managers": [{"id": 269, "name": "Marcelo Alberto Bielsa Caldera", "nickname": "Marcelo Bielsa", "dob": "1955-07-21", "country": {"id": 11, "name": "Argentina"}}]}, "home_score": 2, "away_score": 2, "match_status": "available", "match_status_360": "unscheduled", "last_updated": "2024-07-15T07:57:02.660641", "last_updated_360": null, "metadata": {"data_version": "1.1.0", "shot_fidelity_version": "2", "xy_fidelity_version": "2"}, "match_week": 6, "competition_stage": {"id": 25, "name": "3rd Place Final"}, "stadium": {"id": 52985, "name": "Bank of America Stadium", "country": {"id": 241, "name": "United States of America"}}, "referee": {"id": 1849, "name": "Alexis Herrera", "country": {"id": 246, "name": "Venezuela\u00a0(Bolivarian Republic)"}}}
"""


class myReader(chdb.PyReader):
def __init__(self, data):
self.data = data
Expand All @@ -58,6 +66,7 @@ def read(self, col_names, count):


class TestQueryPy(unittest.TestCase):

# def test_query_np(self):
# t3 = {
# "a": np.array([1, 2, 3, 4, 5, 6]),
Expand Down Expand Up @@ -135,6 +144,72 @@ def test_query_arrow3(self):
"5872873,587287.3,553446.5,470878.25,3,0,7,10\n",
)

def test_query_arrow4(self):
    """Query an Arrow table built from JSONL with nested structs and nulls."""
    # NOTE(review): the SQL text references Python(arrow_table), so the
    # local must keep the name `arrow_table` — chdb presumably resolves
    # that identifier from the caller's scope; verify before renaming.
    raw_jsonl = io.BytesIO(ARROW_DATA_JSONL.encode())
    arrow_table = pa.json.read_json(raw_jsonl)
    result = chdb.query("SELECT * FROM Python(arrow_table) LIMIT 10", "JSONEachRow")
    # An empty error message means every column type was handled.
    self.assertEqual("", result.error_message())

def test_query_arrow5(self):
    """Check inferred column types for a parquet-backed Arrow table, then
    aggregate its numeric columns and check the result types too."""
    # NOTE(review): the SQL text references Python(arrow_table), so the
    # local must keep the name `arrow_table` — chdb presumably resolves
    # that identifier from the caller's scope; verify before renaming.
    arrow_table = pa.parquet.read_table(
        "data/sample_2021-04-01_performance_mobile_tiles.parquet"
    )
    result = chdb.query("SELECT * FROM Python(arrow_table) LIMIT 1", "JSONCompact")
    # "meta" is a list of entries like {"name": "quadkey", "type": "String"}.
    meta = json.loads(str(result)).get("meta")
    inferred_types = {entry["name"]: entry["type"] for entry in meta}
    self.assertDictEqual(
        inferred_types,
        {
            "quadkey": "String",
            "tile": "String",
            "tile_x": "Float64",
            "tile_y": "Float64",
            "avg_d_kbps": "Int64",
            "avg_u_kbps": "Int64",
            "avg_lat_ms": "Int64",
            "avg_lat_down_ms": "Float64",
            "avg_lat_up_ms": "Float64",
            "tests": "Int64",
            "devices": "Int64",
        },
    )
    # Aggregate every numeric column (max and rounded median) and verify
    # the types of the aggregated results.
    result = chdb.query(
        """
        WITH numericColumns AS (
            SELECT * EXCEPT ('tile.*') EXCEPT(quadkey)
            FROM Python(arrow_table)
        )
        SELECT * APPLY(max), * APPLY(median) APPLY(x -> round(x, 2))
        FROM numericColumns
        """,
        "JSONCompact",
    )
    aggregated_meta = json.loads(str(result)).get("meta")
    aggregated_types = {entry["name"]: entry["type"] for entry in aggregated_meta}
    self.assertDictEqual(
        aggregated_types,
        {
            "max(avg_d_kbps)": "Int64",
            "max(avg_lat_down_ms)": "Float64",
            "max(avg_lat_ms)": "Int64",
            "max(avg_lat_up_ms)": "Float64",
            "max(avg_u_kbps)": "Int64",
            "max(devices)": "Int64",
            "max(tests)": "Int64",
            "round(median(avg_d_kbps), 2)": "Float64",
            "round(median(avg_lat_down_ms), 2)": "Float64",
            "round(median(avg_lat_ms), 2)": "Float64",
            "round(median(avg_lat_up_ms), 2)": "Float64",
            "round(median(avg_u_kbps), 2)": "Float64",
            "round(median(devices), 2)": "Float64",
            "round(median(tests), 2)": "Float64",
        },
    )

def test_random_float(self):
x = {"col1": [random.uniform(0, 1) for _ in range(0, 100000)]}
ret = chdb.sql(
Expand Down

0 comments on commit e65cc31

Please sign in to comment.