22.8 Backport of #54370 - fix parquet array inconsistent offsets #304

Open. Wants to merge 6 commits into base branch customizations/22.8.15.
28 changes: 16 additions & 12 deletions in src/Processors/Formats/Impl/ArrowColumnToCHColumn.cpp

@@ -289,30 +289,34 @@ static ColumnPtr readOffsetsFromArrowListColumn(std::shared_ptr<arrow::ChunkedArray> & arrow_column)
     ColumnArray::Offsets & offsets_data = assert_cast<ColumnVector<UInt64> &>(*offsets_column).getData();
     offsets_data.reserve(arrow_column->length());
 
-    uint64_t start_offset = 0u;
-
     for (int chunk_i = 0, num_chunks = arrow_column->num_chunks(); chunk_i < num_chunks; ++chunk_i)
     {
         arrow::ListArray & list_chunk = dynamic_cast<arrow::ListArray &>(*(arrow_column->chunk(chunk_i)));
         auto arrow_offsets_array = list_chunk.offsets();
         auto & arrow_offsets = dynamic_cast<arrow::Int32Array &>(*arrow_offsets_array);
 
         /*
-         * It seems like arrow::ListArray::values() (nested column data) might or might not be shared across chunks.
-         * When it is shared, the offsets will be monotonically increasing. Otherwise, the offsets will be zero based.
-         * In order to account for both cases, the starting offset is updated whenever a zero-based offset is found.
-         * More info can be found in: https://lists.apache.org/thread/rrwfb9zo2dc58dhd9rblf20xd7wmy7jm and
-         * https://github.com/ClickHouse/ClickHouse/pull/43297
+         * CH uses element size as "offsets", while arrow uses actual offsets as offsets.
+         * That's why CH usually starts reading offsets with i=1 and i=0 is ignored.
+         * In case multiple batches are used to read a column, there is a chance the offsets are
+         * monotonically increasing, which will cause inconsistencies with the batch data length on `DB::ColumnArray`.
+         *
+         * If the offsets are monotonically increasing, `arrow_offsets.Value(0)` will be non-zero for the nth batch, where n > 0.
+         * If they are not monotonically increasing, it'll always be 0.
+         * Therefore, we subtract the previous offset from the current offset to get the corresponding CH "offset".
+         *
+         * The same might happen for multiple chunks. In this case, we need to add the last offset of the previous chunk, hence
+         * `offsets.back()`. More info can be found in https://lists.apache.org/thread/rrwfb9zo2dc58dhd9rblf20xd7wmy7jm,
+         * https://github.com/ClickHouse/ClickHouse/pull/43297 and https://github.com/ClickHouse/ClickHouse/pull/54370
         * */
-        if (list_chunk.offset() == 0)
-        {
-            start_offset = offsets_data.back();
-        }
+        uint64_t previous_offset = arrow_offsets.Value(0);
 
         for (int64_t i = 1; i < arrow_offsets.length(); ++i)
         {
             auto offset = arrow_offsets.Value(i);
-            offsets_data.emplace_back(start_offset + offset);
+            uint64_t elements = offset - previous_offset;
+            previous_offset = offset;
+            offsets_data.emplace_back(offsets_data.back() + elements);
         }
     }
     return offsets_column;
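For illustration, the following is a minimal standalone sketch (not part of the patch, using made-up offset values) of the conversion the new comment describes. Arrow stores absolute element offsets per list, while ClickHouse's ColumnArray stores cumulative end positions over the whole column, so each list's size is recovered by subtracting the previous Arrow offset and adding it onto the running ClickHouse offset.

#include <cstddef>
#include <cstdint>
#include <iostream>
#include <vector>

int main()
{
    // Two hypothetical batches of Arrow list offsets. The second batch continues
    // monotonically from 5 because the values buffer is shared across batches;
    // had it restarted at {0, 3, 5}, the result below would be the same.
    std::vector<std::vector<int32_t>> batches = {{0, 3, 5}, {5, 8, 10}};

    std::vector<uint64_t> ch_offsets; // ClickHouse-style cumulative offsets

    for (const auto & arrow_offsets : batches)
    {
        uint64_t previous_offset = arrow_offsets[0];
        for (std::size_t i = 1; i < arrow_offsets.size(); ++i)
        {
            uint64_t elements = arrow_offsets[i] - previous_offset; // size of the i-th list
            previous_offset = arrow_offsets[i];
            uint64_t last = ch_offsets.empty() ? 0 : ch_offsets.back();
            ch_offsets.push_back(last + elements);
        }
    }

    // Prints "3 5 8 10": four arrays of sizes 3, 2, 3 and 2.
    for (auto offset : ch_offsets)
        std::cout << offset << ' ';
    std::cout << '\n';
}

In the real function, offsets_data.back() plays the role of `last` here, and `arrow_offsets.Value(i)` supplies the per-chunk Arrow offsets.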
New file (3 additions), the expected output of the test script below:

@@ -0,0 +1,3 @@
Parquet
e76a749f346078a6a43e0cbd25f0d18a -
400
New file (129 additions), the test script:

@@ -0,0 +1,129 @@
#!/usr/bin/env bash
# Tags: no-ubsan, no-fasttest

CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CUR_DIR"/../shell_config.sh

echo "Parquet"

# More info on: https://github.com/ClickHouse/ClickHouse/pull/54370

# The data file was generated with the code below

#std::string random_string(size_t length) {
#    static const std::string characters = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz";
#
#    std::random_device random_device;
#    std::mt19937 generator(random_device());
#    std::uniform_int_distribution<> distribution(0, characters.size() - 1);
#
#    std::string random_string;
#    random_string.reserve(length);
#
#    std::generate_n(std::back_inserter(random_string), length, [&]() {
#        return characters[distribution(generator)];
#    });
#
#    return random_string;
#}
#
#static const std::string the_string = random_string(9247124u);
#
#std::shared_ptr<arrow::Array> CreateIntArray(std::size_t length) {
#    arrow::MemoryPool* pool = arrow::default_memory_pool();
#
#    auto int_builder_ptr = std::make_shared<arrow::Int64Builder>(pool);
#    auto & int_builder = *int_builder_ptr;
#    arrow::ListBuilder list_builder(pool, int_builder_ptr);
#
#    for (auto i = 0u; i < length; i++)
#    {
#        if (i % 10 == 0)
#        {
#            ARROW_CHECK_OK(list_builder.Append());
#        }
#        else
#        {
#            ARROW_CHECK_OK(int_builder.Append(i));
#        }
#    }
#
#    std::shared_ptr<arrow::Array> int_list_array;
#    ARROW_CHECK_OK(list_builder.Finish(&int_list_array));
#    return int_list_array;
#}
#
#std::shared_ptr<arrow::Array> CreateStringArray(std::size_t length) {
#    arrow::MemoryPool* pool = arrow::default_memory_pool();
#
#    auto str_builder = std::make_shared<arrow::LargeStringBuilder>(arrow::large_utf8(), pool);
#
#    for (auto i = 0u; i < length; i++)
#    {
#        if (i % 10 == 0)
#        {
#            ARROW_CHECK_OK(str_builder->AppendNull());
#        }
#        else
#        {
#            ARROW_CHECK_OK(str_builder->Append(the_string));
#        }
#    }
#
#    std::shared_ptr<arrow::Array> str_array;
#    ARROW_CHECK_OK(str_builder->Finish(&str_array));
#    return str_array;
#}
#
#void run()
#{
#    auto schema = arrow::schema({
#        arrow::field("ints", arrow::list(arrow::int64())),
#        arrow::field("strings", arrow::utf8())
#    });
#
#    auto l1_length = 2000;
#    auto l2_length = 2000;
#
#    std::vector<std::shared_ptr<arrow::RecordBatch>> batches;
#
#    auto int_array1 = CreateIntArray(l1_length);
#
#    auto int_array2 = CreateIntArray(l1_length);
#
#    auto str_array1 = CreateStringArray(l2_length);
#
#    auto str_array2 = CreateStringArray(l2_length);
#
#    batches.push_back(arrow::RecordBatch::Make(schema, int_array1->length(), {int_array1, str_array1}));
#
#    batches.push_back(arrow::RecordBatch::Make(schema, int_array2->length(), {int_array2, str_array2}));
#
#    std::shared_ptr<arrow::io::FileOutputStream> outfile;
#    PARQUET_ASSIGN_OR_THROW(outfile, arrow::io::FileOutputStream::Open("generated.parquet"));
#
#    parquet::WriterProperties::Builder builder;
#    builder.compression(arrow::Compression::GZIP);
#    builder.dictionary_pagesize_limit(10*1024*1024);
#    builder.data_pagesize(20*1024*1024);
#
#    std::shared_ptr<parquet::WriterProperties> props = builder.build();
#
#    std::unique_ptr<parquet::arrow::FileWriter> file_writer;
#    PARQUET_ASSIGN_OR_THROW(file_writer, parquet::arrow::FileWriter::Open(*schema, ::arrow::default_memory_pool(), outfile, props));
#
#    for (const auto& batch : batches) {
#        PARQUET_THROW_NOT_OK(file_writer->WriteRecordBatch(*batch));
#    }
#
#    PARQUET_THROW_NOT_OK(file_writer->Close());
#}

DATA_FILE=$CUR_DIR/data_parquet/string_int_list_inconsistent_offset_multiple_batches.parquet
${CLICKHOUSE_CLIENT} --query="DROP TABLE IF EXISTS parquet_load"
${CLICKHOUSE_CLIENT} --query="CREATE TABLE parquet_load (ints Array(Int64), strings Nullable(String)) ENGINE = Memory"
cat "$DATA_FILE" | ${CLICKHOUSE_CLIENT} -q "INSERT INTO parquet_load FORMAT Parquet"
${CLICKHOUSE_CLIENT} --query="SELECT * FROM parquet_load" | md5sum
${CLICKHOUSE_CLIENT} --query="SELECT count() FROM parquet_load"
${CLICKHOUSE_CLIENT} --query="drop table parquet_load"
New binary file not shown: data_parquet/string_int_list_inconsistent_offset_multiple_batches.parquet (the generated Parquet test data).