diff --git a/cpp/src/generated/parquet_types.cpp b/cpp/src/generated/parquet_types.cpp index 0ee973f2a2d6..1bb36ed4a423 100644 --- a/cpp/src/generated/parquet_types.cpp +++ b/cpp/src/generated/parquet_types.cpp @@ -335,7 +335,12 @@ int _kFieldRepetitionTypeValues[] = { /** * The field is repeated and can contain 0 or more values */ - FieldRepetitionType::REPEATED + FieldRepetitionType::REPEATED, + /** + * This field repeats a fixed number of times per parent value without increasing + * the maximum definition or repetition level of its descendants. + */ + FieldRepetitionType::VECTOR }; const char* _kFieldRepetitionTypeNames[] = { /** @@ -349,9 +354,14 @@ const char* _kFieldRepetitionTypeNames[] = { /** * The field is repeated and can contain 0 or more values */ - "REPEATED" + "REPEATED", + /** + * This field repeats a fixed number of times per parent value without increasing + * the maximum definition or repetition level of its descendants. + */ + "VECTOR" }; -const std::map _FieldRepetitionType_VALUES_TO_NAMES(::apache::thrift::TEnumIterator(3, _kFieldRepetitionTypeValues, _kFieldRepetitionTypeNames), ::apache::thrift::TEnumIterator(-1, nullptr, nullptr)); +const std::map _FieldRepetitionType_VALUES_TO_NAMES(::apache::thrift::TEnumIterator(4, _kFieldRepetitionTypeValues, _kFieldRepetitionTypeNames), ::apache::thrift::TEnumIterator(-1, nullptr, nullptr)); std::ostream& operator<<(std::ostream& out, const FieldRepetitionType::type& val) { std::map::const_iterator it = _FieldRepetitionType_VALUES_TO_NAMES.find(val); @@ -2565,7 +2575,8 @@ SchemaElement::SchemaElement() noexcept converted_type(static_cast(0)), scale(0), precision(0), - field_id(0) { + field_id(0), + vector_length(0) { } void SchemaElement::__set_type(const Type::type val) { @@ -2612,6 +2623,11 @@ void SchemaElement::__set_field_id(const int32_t val) { __isset.field_id = true; } +void SchemaElement::__set_vector_length(const int32_t val) { + this->vector_length = val; +__isset.vector_length = true; +} + void SchemaElement::__set_logicalType(const LogicalType& val) { this->logicalType = val; __isset.logicalType = true; @@ -2634,6 +2650,7 @@ void swap(SchemaElement &a, SchemaElement &b) { swap(a.scale, b.scale); swap(a.precision, b.precision); swap(a.field_id, b.field_id); + swap(a.vector_length, b.vector_length); swap(a.logicalType, b.logicalType); swap(a.__isset, b.__isset); } @@ -2674,6 +2691,10 @@ bool SchemaElement::operator==(const SchemaElement & rhs) const return false; else if (__isset.field_id && !(field_id == rhs.field_id)) return false; + if (__isset.vector_length != rhs.__isset.vector_length) + return false; + else if (__isset.vector_length && !(vector_length == rhs.vector_length)) + return false; if (__isset.logicalType != rhs.__isset.logicalType) return false; else if (__isset.logicalType && !(logicalType == rhs.logicalType)) @@ -2691,6 +2712,7 @@ SchemaElement::SchemaElement(const SchemaElement& other126) { scale = other126.scale; precision = other126.precision; field_id = other126.field_id; + vector_length = other126.vector_length; logicalType = other126.logicalType; __isset = other126.__isset; } @@ -2704,6 +2726,7 @@ SchemaElement::SchemaElement(SchemaElement&& other127) noexcept { scale = other127.scale; precision = other127.precision; field_id = other127.field_id; + vector_length = other127.vector_length; logicalType = std::move(other127.logicalType); __isset = other127.__isset; } @@ -2717,6 +2740,7 @@ SchemaElement& SchemaElement::operator=(const SchemaElement& other128) { scale = other128.scale; precision = other128.precision; field_id = other128.field_id; + vector_length = other128.vector_length; logicalType = other128.logicalType; __isset = other128.__isset; return *this; @@ -2731,6 +2755,7 @@ SchemaElement& SchemaElement::operator=(SchemaElement&& other129) noexcept { scale = other129.scale; precision = other129.precision; field_id = other129.field_id; + vector_length = other129.vector_length; logicalType = std::move(other129.logicalType); __isset = other129.__isset; return *this; @@ -2747,6 +2772,7 @@ void SchemaElement::printTo(std::ostream& out) const { out << ", " << "scale="; (__isset.scale ? (out << to_string(scale)) : (out << "")); out << ", " << "precision="; (__isset.precision ? (out << to_string(precision)) : (out << "")); out << ", " << "field_id="; (__isset.field_id ? (out << to_string(field_id)) : (out << "")); + out << ", " << "vector_length="; (__isset.vector_length ? (out << to_string(vector_length)) : (out << "")); out << ", " << "logicalType="; (__isset.logicalType ? (out << to_string(logicalType)) : (out << "")); out << ")"; } diff --git a/cpp/src/generated/parquet_types.h b/cpp/src/generated/parquet_types.h index 1f1e254f5cf2..a9dca55e5416 100644 --- a/cpp/src/generated/parquet_types.h +++ b/cpp/src/generated/parquet_types.h @@ -202,7 +202,12 @@ struct FieldRepetitionType { /** * The field is repeated and can contain 0 or more values */ - REPEATED = 2 + REPEATED = 2, + /** + * This field repeats a fixed number of times per parent value without increasing + * the maximum definition or repetition level of its descendants. + */ + VECTOR = 3 }; }; @@ -1732,7 +1737,7 @@ void swap(LogicalType &a, LogicalType &b); std::ostream& operator<<(std::ostream& out, const LogicalType& obj); typedef struct _SchemaElement__isset { - _SchemaElement__isset() : type(false), type_length(false), repetition_type(false), num_children(false), converted_type(false), scale(false), precision(false), field_id(false), logicalType(false) {} + _SchemaElement__isset() : type(false), type_length(false), repetition_type(false), num_children(false), converted_type(false), scale(false), precision(false), field_id(false), vector_length(false), logicalType(false) {} bool type :1; bool type_length :1; bool repetition_type :1; @@ -1741,6 +1746,7 @@ typedef struct _SchemaElement__isset { bool scale :1; bool precision :1; bool field_id :1; + bool vector_length :1; bool logicalType :1; } _SchemaElement__isset; @@ -1813,6 +1819,10 @@ class SchemaElement { * original field id in the parquet schema */ int32_t field_id; + /** + * Required when repetition_type is VECTOR. May be 0 for zero-length vectors. + */ + int32_t vector_length; /** * The logical type of this SchemaElement * @@ -1841,6 +1851,8 @@ class SchemaElement { void __set_field_id(const int32_t val); + void __set_vector_length(const int32_t val); + void __set_logicalType(const LogicalType& val); bool operator == (const SchemaElement & rhs) const; diff --git a/cpp/src/generated/parquet_types.tcc b/cpp/src/generated/parquet_types.tcc index 78e3e2549394..10579b3331dc 100644 --- a/cpp/src/generated/parquet_types.tcc +++ b/cpp/src/generated/parquet_types.tcc @@ -1993,6 +1993,14 @@ uint32_t SchemaElement::read(Protocol_* iprot) { } break; case 10: + if (ftype == ::apache::thrift::protocol::T_I32) { + xfer += iprot->readI32(this->vector_length); + this->__isset.vector_length = true; + } else { + xfer += iprot->skip(ftype); + } + break; + case 11: if (ftype == ::apache::thrift::protocol::T_STRUCT) { xfer += this->logicalType.read(iprot); this->__isset.logicalType = true; @@ -2064,8 +2072,13 @@ uint32_t SchemaElement::write(Protocol_* oprot) const { xfer += oprot->writeI32(this->field_id); xfer += oprot->writeFieldEnd(); } + if (this->__isset.vector_length) { + xfer += oprot->writeFieldBegin("vector_length", ::apache::thrift::protocol::T_I32, 10); + xfer += oprot->writeI32(this->vector_length); + xfer += oprot->writeFieldEnd(); + } if (this->__isset.logicalType) { - xfer += oprot->writeFieldBegin("logicalType", ::apache::thrift::protocol::T_STRUCT, 10); + xfer += oprot->writeFieldBegin("logicalType", ::apache::thrift::protocol::T_STRUCT, 11); xfer += this->logicalType.write(oprot); xfer += oprot->writeFieldEnd(); } diff --git a/cpp/src/parquet/arrow/arrow_reader_writer_test.cc b/cpp/src/parquet/arrow/arrow_reader_writer_test.cc index d29458bf226b..e92eca1a291f 100644 --- a/cpp/src/parquet/arrow/arrow_reader_writer_test.cc +++ b/cpp/src/parquet/arrow/arrow_reader_writer_test.cc @@ -24,10 +24,13 @@ #include "gmock/gmock.h" #include "gtest/gtest.h" +#include #include #include #include #include +#include +#include #include #include @@ -421,6 +424,15 @@ void WriteTableToBuffer(const std::shared_ptr& table, int64_t row_group_s *out, WriteTableToBuffer(table, row_group_size, write_props, arrow_properties)); } +std::shared_ptr VectorWriterProperties() { + auto builder = WriterProperties::Builder(); + return builder.disable_dictionary() + ->disable_statistics() + ->disable_write_page_index() + ->encoding(Encoding::PLAIN) + ->build(); +} + void DoRoundtrip(const std::shared_ptr
& table, int64_t row_group_size, std::shared_ptr
* out, const std::shared_ptr<::parquet::WriterProperties>& writer_properties = @@ -442,6 +454,44 @@ void DoRoundtrip(const std::shared_ptr
& table, int64_t row_group_size, ASSERT_OK_AND_ASSIGN(*out, reader->ReadTable()); } +std::shared_ptr<::arrow::DataType> VectorFixedSizeListType( + bool element_nullable = false) { + return ::arrow::fixed_size_list( + ::arrow::field("item", ::arrow::int16(), element_nullable), + /*size=*/3); +} + +std::shared_ptr
MakeVectorFixedSizeListTable( + const std::vector>& chunks, bool nullable = true, + bool element_nullable = false) { + auto type = VectorFixedSizeListType(element_nullable); + auto field = ::arrow::field("root", type, nullable); + auto column = std::make_shared(chunks, type); + return ::arrow::Table::Make(::arrow::schema({field}), {column}); +} + +std::shared_ptr
MakeVectorFixedSizeListTable(std::string_view json, + bool nullable = true, + bool element_nullable = false) { + auto type = VectorFixedSizeListType(element_nullable); + return MakeVectorFixedSizeListTable({::arrow::ArrayFromJSON(type, std::string(json))}, + nullable, element_nullable); +} + +void CheckVectorFixedSizeListRoundtrip( + const std::shared_ptr
& table, int64_t row_group_size, + const ArrowReaderProperties& arrow_reader_properties = + default_arrow_reader_properties()) { + ArrowWriterProperties::Builder builder; + builder.enable_experimental_vector_encoding(); + std::shared_ptr
result; + ASSERT_NO_FATAL_FAILURE(DoRoundtrip(table, row_group_size, &result, + VectorWriterProperties(), builder.build(), + arrow_reader_properties)); + ::arrow::AssertSchemaEqual(*table->schema(), *result->schema(), false); + ::arrow::AssertTablesEqual(*table, *result, false); +} + void CheckConfiguredRoundtrip( const std::shared_ptr
& input_table, const std::shared_ptr
& expected_table = nullptr, @@ -3341,6 +3391,364 @@ TEST(ArrowReadWrite, FixedSizeList) { CheckSimpleRoundtrip(table, 2, props_store_schema); } +TEST(ArrowWriteOnly, FixedSizeListVectorSchemaRequired) { + using ::arrow::field; + using ::arrow::fixed_size_list; + + auto type = fixed_size_list(field("item", ::arrow::int16(), false), /*size=*/3); + auto array = ::arrow::ArrayFromJSON(type, R"([ + [1, 2, 3], + [4, 5, 6], + [7, 8, 9]])"); + auto table = + ::arrow::Table::Make(::arrow::schema({field("root", type, false)}), {array}); + + ArrowWriterProperties::Builder builder; + builder.enable_experimental_vector_encoding(); + ASSERT_OK_AND_ASSIGN(auto buffer, + WriteTableToBuffer(table, /*row_group_size=*/2, + VectorWriterProperties(), builder.build())); + + auto reader = ParquetFileReader::Open(std::make_shared(buffer)); + const auto* schema = reader->metadata()->schema(); + const auto* root = schema->group_node()->field(0).get(); + ASSERT_TRUE(root->is_group()); + const auto* root_group = static_cast(root); + ASSERT_EQ(root_group->repetition(), Repetition::REQUIRED); + ASSERT_EQ(root_group->field_count(), 1); + ASSERT_TRUE(root_group->field(0)->is_vector()); + ASSERT_EQ(root_group->field(0)->vector_length(), 3); + ASSERT_EQ(reader->metadata()->RowGroup(0)->ColumnChunk(0)->num_values(), 6); + ASSERT_EQ(reader->metadata()->RowGroup(1)->ColumnChunk(0)->num_values(), 3); +} + +TEST(ArrowWriteOnly, FixedSizeListVectorSchemaNullable) { + using ::arrow::field; + using ::arrow::fixed_size_list; + + auto type = fixed_size_list(field("item", ::arrow::int16(), false), /*size=*/3); + auto array = ::arrow::ArrayFromJSON(type, R"([ + [1, 2, 3], + null, + [7, 8, 9]])"); + auto table = + ::arrow::Table::Make(::arrow::schema({field("root", type, true)}), {array}); + + ArrowWriterProperties::Builder builder; + builder.enable_experimental_vector_encoding(); + ASSERT_OK_AND_ASSIGN(auto buffer, + WriteTableToBuffer(table, /*row_group_size=*/3, + VectorWriterProperties(), builder.build())); + + auto reader = ParquetFileReader::Open(std::make_shared(buffer)); + const auto* schema = reader->metadata()->schema(); + const auto* root = schema->group_node()->field(0).get(); + ASSERT_TRUE(root->is_group()); + const auto* root_group = static_cast(root); + ASSERT_EQ(root_group->repetition(), Repetition::OPTIONAL); + ASSERT_EQ(root_group->field_count(), 1); + ASSERT_TRUE(root_group->field(0)->is_vector()); + ASSERT_EQ(root_group->field(0)->vector_length(), 3); + ASSERT_EQ(reader->metadata()->RowGroup(0)->ColumnChunk(0)->num_values(), 9); +} + +TEST(ArrowReadWrite, FixedSizeListVectorRequiredRoundTrip) { + auto table = MakeVectorFixedSizeListTable(R"([ + [1, 2, 3], + [4, 5, 6], + [7, 8, 9]])", + /*nullable=*/false); + ASSERT_NO_FATAL_FAILURE(CheckVectorFixedSizeListRoundtrip(table, /*row_group_size=*/2)); +} + +TEST(ArrowReadWrite, FixedSizeListVectorNullableRoundTrip) { + auto table = MakeVectorFixedSizeListTable(R"([ + [1, 2, 3], + null, + [7, 8, 9]])"); + ASSERT_NO_FATAL_FAILURE(CheckVectorFixedSizeListRoundtrip(table, /*row_group_size=*/3)); +} + +TEST(ArrowReadWrite, FixedSizeListVectorNullableRoundTripNullPatterns) { + const std::vector cases = { + R"([[1, 2, 3], [4, 5, 6], [7, 8, 9]])", + R"([[1, 2, 3], null, [7, 8, 9]])", + R"([null, [4, 5, 6], [7, 8, 9]])", + R"([[1, 2, 3], [4, 5, 6], null])", + R"([[1, 2, 3], null, null, [10, 11, 12], null])", + R"([null, null, null])", + R"([])"}; + + for (const auto& json : cases) { + SCOPED_TRACE(json); + auto table = MakeVectorFixedSizeListTable(json); + ASSERT_NO_FATAL_FAILURE(CheckVectorFixedSizeListRoundtrip( + table, std::max(1, table->num_rows()))); + } +} + +TEST(ArrowReadWrite, FixedSizeListVectorNullableRoundTripRowGroups) { + auto table = MakeVectorFixedSizeListTable(R"([ + [1, 2, 3], + null, + [7, 8, 9], + null, + [13, 14, 15], + [16, 17, 18]])"); + + ArrowReaderProperties reader_properties; + reader_properties.set_batch_size(1); + ASSERT_NO_FATAL_FAILURE( + CheckVectorFixedSizeListRoundtrip(table, /*row_group_size=*/2, reader_properties)); +} + +TEST(ArrowReadWrite, FixedSizeListVectorNullableChunkedReadAcrossRowGroups) { + auto table = MakeVectorFixedSizeListTable(R"([ + [1, 2, 3], + null, + [7, 8, 9], + null, + [13, 14, 15]])"); + + ArrowWriterProperties::Builder writer_builder; + writer_builder.enable_experimental_vector_encoding(); + ASSERT_OK_AND_ASSIGN( + auto buffer, WriteTableToBuffer(table, /*row_group_size=*/2, + VectorWriterProperties(), writer_builder.build())); + + ArrowReaderProperties reader_properties; + reader_properties.set_batch_size(2); + FileReaderBuilder reader_builder; + ASSERT_OK(reader_builder.Open(std::make_shared(buffer))); + reader_builder.properties(reader_properties); + std::unique_ptr reader; + ASSERT_OK(reader_builder.Build(&reader)); + ASSERT_OK_AND_ASSIGN(auto rb_reader, reader->GetRecordBatchReader()); + ASSERT_OK_AND_ASSIGN(auto out, Table::FromRecordBatchReader(rb_reader.get())); + + ASSERT_EQ(out->column(0)->num_chunks(), 3); + ASSERT_EQ(out->column(0)->chunk(0)->length(), 2); + ASSERT_EQ(out->column(0)->chunk(1)->length(), 2); + ASSERT_EQ(out->column(0)->chunk(2)->length(), 1); + ::arrow::AssertTablesEqual(*table, *out, false); +} + +TEST(ArrowReadWrite, FixedSizeListVectorNullableRoundTripSlicedInput) { + auto base = ::arrow::ArrayFromJSON(VectorFixedSizeListType(), R"([ + [100, 101, 102], + [1, 2, 3], + null, + [7, 8, 9], + null, + [200, 201, 202]])"); + auto table = MakeVectorFixedSizeListTable({base->Slice(/*offset=*/1, /*length=*/4)}); + + ASSERT_NO_FATAL_FAILURE(CheckVectorFixedSizeListRoundtrip(table, /*row_group_size=*/2)); +} + +TEST(ArrowReadWrite, FixedSizeListVectorRequiredRowsNullableElementsRoundTrip) { + auto table = MakeVectorFixedSizeListTable(R"([ + [1, null, 3], + [null, 5, 6], + [7, 8, null]])", + /*nullable=*/false, + /*element_nullable=*/true); + + ASSERT_NO_FATAL_FAILURE(CheckVectorFixedSizeListRoundtrip(table, /*row_group_size=*/2)); +} + +TEST(ArrowReadWrite, FixedSizeListVectorNullableRowsAndElementsRoundTrip) { + auto table = MakeVectorFixedSizeListTable(R"([ + [1, null, 3], + null, + [null, null, null], + [7, 8, null]])", + /*nullable=*/true, + /*element_nullable=*/true); + + ASSERT_NO_FATAL_FAILURE(CheckVectorFixedSizeListRoundtrip(table, /*row_group_size=*/4)); +} + +TEST(ArrowReadWrite, FixedSizeListVectorMixedColumnsRoundTripAcrossRowGroups) { + auto vector_type = + ::arrow::fixed_size_list(::arrow::field("item", ::arrow::int16(), true), + /*list_size=*/3); + auto list_type = ::arrow::list(::arrow::field("item", ::arrow::int32(), true)); + + auto vector_chunks = std::vector>{ + ::arrow::ArrayFromJSON(vector_type, + R"([[1, null, 3], null, [7, 8, null], [10, 11, 12]])"), + ::arrow::ArrayFromJSON(vector_type, + R"([null, [16, null, 18], [19, 20, 21], [22, null, 24]])")}; + auto table = ::arrow::Table::Make( + ::arrow::schema({ + ::arrow::field("id", ::arrow::int32(), false), + ::arrow::field("embedding", vector_type, true), + ::arrow::field("label", ::arrow::utf8(), true), + ::arrow::field("tags", list_type, true), + }), + { + std::make_shared( + ::arrow::ArrayFromJSON(::arrow::int32(), "[0, 1, 2, 3, 4, 5, 6, 7]")), + std::make_shared(std::move(vector_chunks), vector_type), + std::make_shared(::arrow::ArrayFromJSON( + ::arrow::utf8(), R"(["a", null, "c", "d", null, "f", "g", "h"])")), + std::make_shared(::arrow::ArrayFromJSON( + list_type, R"([[1, 2], [], null, [3, null], [4], [5, 6], null, []])")), + }); + + ArrowWriterProperties::Builder writer_builder; + writer_builder.enable_experimental_vector_encoding(); + ASSERT_OK_AND_ASSIGN( + auto buffer, WriteTableToBuffer(table, /*row_group_size=*/3, + VectorWriterProperties(), writer_builder.build())); + + auto parquet_reader = ParquetFileReader::Open(std::make_shared(buffer)); + ASSERT_EQ(parquet_reader->metadata()->num_row_groups(), 3); + for (int i = 0; i < 3; ++i) { + const int64_t expected_rows = i == 2 ? 2 : 3; + EXPECT_EQ(parquet_reader->metadata()->RowGroup(i)->num_rows(), expected_rows); + EXPECT_EQ(parquet_reader->metadata()->RowGroup(i)->ColumnChunk(1)->num_values(), + expected_rows * 3); + } + + ArrowReaderProperties reader_properties; + reader_properties.set_batch_size(2); + FileReaderBuilder reader_builder; + ASSERT_OK(reader_builder.Open(std::make_shared(buffer))); + reader_builder.properties(reader_properties); + std::unique_ptr reader; + ASSERT_OK(reader_builder.Build(&reader)); + ASSERT_OK_AND_ASSIGN(auto rb_reader, reader->GetRecordBatchReader()); + ASSERT_OK_AND_ASSIGN(auto out, Table::FromRecordBatchReader(rb_reader.get())); + + ::arrow::AssertSchemaEqual(*table->schema(), *out->schema(), false); + ::arrow::AssertTablesEqual(*table, *out, false); +} + +TEST(ArrowReadWrite, FixedSizeListVectorMixedColumnsRoundtripEqualsOriginal) { + constexpr int32_t kVectorSize = 4; + auto vector_type = ::arrow::fixed_size_list( + ::arrow::field("item", ::arrow::float32(), false), kVectorSize); + auto list_type = ::arrow::list(::arrow::field("item", ::arrow::int32(), false)); + + auto table = ::arrow::Table::Make( + ::arrow::schema({ + ::arrow::field("id", ::arrow::int32(), false), + ::arrow::field("embedding", vector_type, false), + ::arrow::field("label", ::arrow::utf8(), false), + ::arrow::field("tags", list_type, false), + }), + { + ::arrow::ArrayFromJSON(::arrow::int32(), "[0, 1, 2, 3, 4]"), + ::arrow::ArrayFromJSON(vector_type, R"([ + [0.0, 0.1, 0.2, 0.3], + [1.0, 1.1, 1.2, 1.3], + [2.0, 2.1, 2.2, 2.3], + [3.0, 3.1, 3.2, 3.3], + [4.0, 4.1, 4.2, 4.3] + ])"), + ::arrow::ArrayFromJSON(::arrow::utf8(), + R"(["alpha", "bravo", "charlie", "delta", "echo"])"), + ::arrow::ArrayFromJSON(list_type, + R"([[10, 11], [], [12], [13, 14, 15], [16, 17]])"), + }); + + ArrowWriterProperties::Builder writer_builder; + writer_builder.enable_experimental_vector_encoding(); + ASSERT_OK_AND_ASSIGN( + auto buffer, WriteTableToBuffer(table, /*row_group_size=*/table->num_rows(), + VectorWriterProperties(), writer_builder.build())); + + std::unique_ptr reader; + FileReaderBuilder reader_builder; + ASSERT_OK(reader_builder.Open(std::make_shared(buffer))); + ASSERT_OK(reader_builder.Build(&reader)); + std::shared_ptr
out; + ASSERT_OK(reader->ReadTable(&out)); + + ::arrow::AssertSchemaEqual(*table->schema(), *out->schema(), false); + ::arrow::AssertTablesEqual(*table, *out, false); +} + +std::shared_ptr<::arrow::DataType> VectorFixedSizeListStructType() { + return ::arrow::fixed_size_list( + ::arrow::field("item", + ::arrow::struct_({::arrow::field("x", ::arrow::float32(), false), + ::arrow::field("y", ::arrow::int32(), false)}), + false), + /*size=*/2); +} + +std::shared_ptr
MakeVectorFixedSizeListStructTable(std::string_view json, + bool nullable = true) { + auto type = VectorFixedSizeListStructType(); + auto field = ::arrow::field("root", type, nullable); + auto array = ::arrow::ArrayFromJSON(type, std::string(json)); + return ::arrow::Table::Make(::arrow::schema({field}), {array}); +} + +TEST(ArrowReadWrite, FixedSizeListVectorStructRoundTrip) { + auto table = MakeVectorFixedSizeListStructTable(R"([ + [{"x": 1.0, "y": 1}, {"x": 2.0, "y": 2}], + [{"x": 3.0, "y": 3}, {"x": 4.0, "y": 4}], + [{"x": 5.0, "y": 5}, {"x": 6.0, "y": 6}]])", + /*nullable=*/false); + + ASSERT_NO_FATAL_FAILURE(CheckVectorFixedSizeListRoundtrip(table, /*row_group_size=*/2)); +} + +TEST(ArrowReadWrite, FixedSizeListVectorStructNullableRoundTrip) { + auto table = MakeVectorFixedSizeListStructTable(R"([ + [{"x": 1.0, "y": 1}, {"x": 2.0, "y": 2}], + null, + [{"x": 5.0, "y": 5}, {"x": 6.0, "y": 6}], + null])"); + + ASSERT_NO_FATAL_FAILURE(CheckVectorFixedSizeListRoundtrip(table, /*row_group_size=*/2)); +} + +TEST(ArrowWriteOnly, FixedSizeListVectorRejectsDefaultWriterProperties) { + using ::arrow::field; + using ::arrow::fixed_size_list; + + auto type = fixed_size_list(field("item", ::arrow::int16(), false), /*size=*/3); + auto array = ::arrow::ArrayFromJSON(type, R"([ + [1, 2, 3], + [4, 5, 6]])"); + auto table = + ::arrow::Table::Make(::arrow::schema({field("root", type, false)}), {array}); + + ArrowWriterProperties::Builder builder; + builder.enable_experimental_vector_encoding(); + ASSERT_RAISES(Invalid, + WriteTableToBuffer(table, /*row_group_size=*/2, + default_writer_properties(), builder.build())); +} + +TEST(ArrowWriteOnly, FixedSizeListVectorRejectsNonPlainEncoding) { + using ::arrow::field; + using ::arrow::fixed_size_list; + + auto type = fixed_size_list(field("item", ::arrow::int16(), false), /*size=*/3); + auto array = ::arrow::ArrayFromJSON(type, R"([ + [1, 2, 3], + [4, 5, 6]])"); + auto table = + ::arrow::Table::Make(::arrow::schema({field("root", type, false)}), {array}); + + ArrowWriterProperties::Builder builder; + builder.enable_experimental_vector_encoding(); + auto invalid_props_builder = WriterProperties::Builder(); + auto invalid_props = invalid_props_builder.disable_dictionary() + ->disable_statistics() + ->disable_write_page_index() + ->encoding(Encoding::DELTA_BINARY_PACKED) + ->build(); + ASSERT_RAISES(Invalid, WriteTableToBuffer(table, /*row_group_size=*/2, invalid_props, + builder.build())); +} + TEST(ArrowReadWrite, ListOfStructOfList2) { using ::arrow::field; using ::arrow::list; diff --git a/cpp/src/parquet/arrow/arrow_schema_test.cc b/cpp/src/parquet/arrow/arrow_schema_test.cc index 7d9ecb5e6449..43c599eb2cf1 100644 --- a/cpp/src/parquet/arrow/arrow_schema_test.cc +++ b/cpp/src/parquet/arrow/arrow_schema_test.cc @@ -289,6 +289,22 @@ TEST_F(TestConvertParquetSchema, ParquetAnnotatedFields) { ASSERT_NO_FATAL_FAILURE(CheckFlatSchema(arrow_schema)); } +TEST_F(TestConvertParquetSchema, VectorFixedSizeList) { + std::vector parquet_fields; + std::vector> arrow_fields; + + auto element = PrimitiveNode::Make("element", Repetition::VECTOR, ParquetType::FLOAT, + ConvertedType::NONE, -1, -1, -1, -1, 3); + parquet_fields.push_back(GroupNode::Make("embedding", Repetition::OPTIONAL, {element})); + arrow_fields.push_back(::arrow::field( + "embedding", + ::arrow::fixed_size_list(::arrow::field("element", ::arrow::float32(), false), 3), + true)); + + ASSERT_OK(ConvertSchema(parquet_fields)); + ASSERT_NO_FATAL_FAILURE(CheckFlatSchema(::arrow::schema(arrow_fields))); +} + TEST_F(TestConvertParquetSchema, DuplicateFieldNames) { std::vector parquet_fields; std::vector> arrow_fields; @@ -1746,12 +1762,8 @@ TEST_F(TestConvertArrowSchema, ParquetOtherLists) { auto arrow_list = ::arrow::large_list(arrow_element); arrow_fields.push_back(::arrow::field("my_list", arrow_list, false)); } - // // FixedSizeList[10] (list-like non-null, elements nullable) - // required group my_list (LIST) { - // repeated group list { - // optional binary element (UTF8); - // } - // } + // FixedSizeList defaults to the legacy LIST encoding unless experimental VECTOR + // encoding is explicitly enabled. { auto element = PrimitiveNode::Make("element", Repetition::OPTIONAL, ParquetType::BYTE_ARRAY, ConvertedType::UTF8); @@ -1768,6 +1780,55 @@ TEST_F(TestConvertArrowSchema, ParquetOtherLists) { ASSERT_NO_FATAL_FAILURE(CheckFlatSchema(parquet_fields)); } +TEST_F(TestConvertArrowSchema, ParquetFixedSizeListVectorUnsupportedElement) { + auto arrow_element = ::arrow::field("string", UTF8, true); + auto arrow_list = ::arrow::fixed_size_list(arrow_element, 3); + + ArrowWriterProperties::Builder builder; + builder.enable_experimental_vector_encoding(); + ASSERT_RAISES( + NotImplemented, + ConvertSchema({::arrow::field("embedding", arrow_list, true)}, builder.build())); +} + +TEST_F(TestConvertArrowSchema, ParquetFixedSizeListVectorNullableElement) { + std::vector parquet_fields; + std::vector> arrow_fields; + + auto item = PrimitiveNode::Make("element", Repetition::OPTIONAL, ParquetType::FLOAT, + ConvertedType::NONE, -1, -1, -1, -1, -1); + auto element = GroupNode::Make("element", Repetition::VECTOR, {item}, + /*logical_type=*/nullptr, -1, 3); + parquet_fields.push_back(GroupNode::Make("embedding", Repetition::OPTIONAL, {element})); + + auto arrow_element = ::arrow::field("element", FLOAT, true); + auto arrow_list = ::arrow::fixed_size_list(arrow_element, 3); + arrow_fields.push_back(::arrow::field("embedding", arrow_list, true)); + + ArrowWriterProperties::Builder builder; + builder.enable_experimental_vector_encoding(); + ASSERT_OK(ConvertSchema(arrow_fields, builder.build())); + ASSERT_NO_FATAL_FAILURE(CheckFlatSchema(parquet_fields)); +} + +TEST_F(TestConvertArrowSchema, ParquetFixedSizeListVector) { + std::vector parquet_fields; + std::vector> arrow_fields; + + auto element = PrimitiveNode::Make("element", Repetition::VECTOR, ParquetType::FLOAT, + ConvertedType::NONE, -1, -1, -1, -1, 3); + parquet_fields.push_back(GroupNode::Make("embedding", Repetition::OPTIONAL, {element})); + + auto arrow_element = ::arrow::field("element", FLOAT, false); + auto arrow_list = ::arrow::fixed_size_list(arrow_element, 3); + arrow_fields.push_back(::arrow::field("embedding", arrow_list, true)); + + ArrowWriterProperties::Builder builder; + builder.enable_experimental_vector_encoding(); + ASSERT_OK(ConvertSchema(arrow_fields, builder.build())); + ASSERT_NO_FATAL_FAILURE(CheckFlatSchema(parquet_fields)); +} + TEST_F(TestConvertArrowSchema, ParquetNestedComplianceEnabledNullable) { std::vector parquet_fields; std::vector> arrow_fields; diff --git a/cpp/src/parquet/arrow/path_internal.cc b/cpp/src/parquet/arrow/path_internal.cc index 002859a5e7d5..ca3eab9a6f14 100644 --- a/cpp/src/parquet/arrow/path_internal.cc +++ b/cpp/src/parquet/arrow/path_internal.cc @@ -450,6 +450,85 @@ struct FixedSizedRangeSelector { int list_size; }; +struct NoLevelTerminalNode { + IterationResult Run(const ElementRange&, PathWriteContext*) { return kDone; } +}; + +class VectorNullableNode { + public: + VectorNullableNode(const uint8_t* null_bitmap, int64_t entry_offset, + int32_t vector_length, int16_t def_level_if_present, + bool child_emits_present_def_levels) + : null_bitmap_(null_bitmap), + entry_offset_(entry_offset), + vector_length_(vector_length), + valid_bits_reader_(MakeReader(ElementRange{0, 0})), + def_level_if_present_(def_level_if_present), + def_level_if_null_(def_level_if_present - 1), + child_emits_present_def_levels_(child_emits_present_def_levels), + new_range_(true) {} + + ::arrow::internal::BitRunReader MakeReader(const ElementRange& range) { + return ::arrow::internal::BitRunReader(null_bitmap_, entry_offset_ + range.start, + range.Size()); + } + + IterationResult Run(ElementRange* range, ElementRange* child_range, + PathWriteContext* context) { + if (range->Empty()) { + new_range_ = true; + return kDone; + } + if (null_bitmap_ == nullptr) { + child_range->start = range->start * vector_length_; + child_range->end = child_range->start + range->Size() * vector_length_; + context->RecordPostListVisit(*child_range); + if (!child_emits_present_def_levels_) { + RETURN_IF_ERROR( + context->AppendDefLevels(child_range->Size(), def_level_if_present_)); + } + range->start = range->end; + new_range_ = false; + return kNext; + } + if (new_range_) { + valid_bits_reader_ = MakeReader(*range); + } + ::arrow::internal::BitRun run = valid_bits_reader_.NextRun(); + while (!range->Empty() && !run.set) { + range->start += run.length; + RETURN_IF_ERROR( + context->AppendDefLevels(run.length * vector_length_, def_level_if_null_)); + run = valid_bits_reader_.NextRun(); + } + if (range->Empty()) { + new_range_ = true; + return kDone; + } + + child_range->start = range->start * vector_length_; + child_range->end = child_range->start + run.length * vector_length_; + context->RecordPostListVisit(*child_range); + if (!child_emits_present_def_levels_) { + RETURN_IF_ERROR( + context->AppendDefLevels(run.length * vector_length_, def_level_if_present_)); + } + range->start += run.length; + new_range_ = false; + return kNext; + } + + private: + const uint8_t* null_bitmap_; + int64_t entry_offset_; + int32_t vector_length_; + ::arrow::internal::BitRunReader valid_bits_reader_; + int16_t def_level_if_present_; + int16_t def_level_if_null_; + bool child_emits_present_def_levels_; + bool new_range_ = true; +}; + // An intermediate node that handles null values. class NullableNode { public: @@ -519,7 +598,8 @@ struct PathInfo { // Note index order matters here. using Node = std::variant; + NullableNode, VectorNullableNode, AllPresentTerminalNode, + AllNullsTerminalNode, NoLevelTerminalNode>; std::vector path; std::shared_ptr primitive_array; @@ -527,6 +607,7 @@ struct PathInfo { int16_t max_rep_level = 0; bool has_dictionary = false; bool leaf_is_nullable = false; + bool leaf_is_vector = false; }; /// Contains logic for writing a single leaf node to parquet. @@ -543,6 +624,7 @@ Status WritePath(ElementRange root_range, PathInfo* path_info, MultipathLevelBuilderResult builder_result; builder_result.leaf_array = path_info->primitive_array; builder_result.leaf_is_nullable = path_info->leaf_is_nullable; + builder_result.leaf_is_vector = path_info->leaf_is_vector; if (path_info->max_def_level == 0) { // This case only occurs when there are no nullable or repeated @@ -597,6 +679,12 @@ Status WritePath(ElementRange root_range, PathInfo* path_info, IterationResult operator()(LargeListNode& node) { return node.Run(stack_position, stack_position + 1, context); } + IterationResult operator()(VectorNullableNode& node) { + return node.Run(stack_position, stack_position + 1, context); + } + IterationResult operator()(NoLevelTerminalNode& node) { + return node.Run(*stack_position, context); + } ElementRange* stack_position; PathWriteContext* context; } visitor = {stack_position, &context}; @@ -624,8 +712,12 @@ Status WritePath(ElementRange root_range, PathInfo* path_info, builder_result.post_list_visited_elements.push_back({0, 0}); } } else { - builder_result.post_list_visited_elements.push_back( - {0, builder_result.leaf_array->length()}); + if (!context.visited_elements.empty()) { + std::swap(builder_result.post_list_visited_elements, context.visited_elements); + } else { + builder_result.post_list_visited_elements.push_back( + {0, builder_result.leaf_array->length()}); + } builder_result.rep_levels = nullptr; } @@ -661,6 +753,7 @@ struct FixupVisitor { } void operator()(NullableNode& arg) { HandleIntermediateNode(arg); } + void operator()(VectorNullableNode&) {} void operator()(AllNullsTerminalNode& arg) { // Even though no processing happens past this point we @@ -671,6 +764,7 @@ struct FixupVisitor { void operator()(NullableTerminalNode&) {} void operator()(AllPresentTerminalNode&) {} + void operator()(NoLevelTerminalNode&) {} }; PathInfo Fixup(PathInfo info) { @@ -692,7 +786,9 @@ PathInfo Fixup(PathInfo info) { class PathBuilder { public: - explicit PathBuilder(bool start_nullable) : nullable_in_parent_(start_nullable) {} + PathBuilder(bool start_nullable, bool write_fixed_size_list_as_vector) + : nullable_in_parent_(start_nullable), + write_fixed_size_list_as_vector_(write_fixed_size_list_as_vector) {} template void AddTerminalInfo(const T& array) { info_.leaf_is_nullable = nullable_in_parent_; @@ -801,6 +897,44 @@ class PathBuilder { } Status Visit(const ::arrow::FixedSizeListArray& array) { + if (write_fixed_size_list_as_vector_) { + if (array.value_type()->id() != ::arrow::Type::STRUCT && + (::arrow::is_nested(*array.value_type()) || + !::arrow::is_fixed_width(*array.value_type()) || + array.value_type()->id() == ::arrow::Type::DICTIONARY || + array.value_type()->id() == ::arrow::Type::EXTENSION)) { + return Status::NotImplemented( + "Experimental VECTOR writing only supports fixed-width primitive or struct " + "elements"); + } + int32_t list_size = array.list_type()->list_size(); + const bool element_nullable = array.list_type()->value_field()->nullable(); + info_.leaf_is_vector = true; + const bool parent_nullable = nullable_in_parent_; + if (parent_nullable) { + info_.max_def_level++; + } + const bool child_emits_present_def_levels = + element_nullable || array.value_type()->id() == ::arrow::Type::STRUCT; + if (parent_nullable || element_nullable || + array.value_type()->id() == ::arrow::Type::STRUCT) { + info_.path.emplace_back(VectorNullableNode( + parent_nullable ? array.null_bitmap_data() : nullptr, array.offset(), + list_size, info_.max_def_level, child_emits_present_def_levels)); + } + auto values = + array.values()->Slice(array.value_offset(0), array.length() * list_size); + if (!element_nullable && array.value_type()->id() != ::arrow::Type::STRUCT) { + info_.leaf_is_nullable = false; + info_.primitive_array = values; + info_.path.emplace_back(NoLevelTerminalNode{}); + paths_.push_back(Fixup(info_)); + return Status::OK(); + } + nullable_in_parent_ = element_nullable; + return VisitInline(*values); + } + MaybeAddNullable(array); int32_t list_size = array.list_type()->list_size(); // Technically we could encode fixed size lists with two level encodings @@ -840,6 +974,7 @@ class PathBuilder { PathInfo info_; std::vector paths_; bool nullable_in_parent_; + bool write_fixed_size_list_as_vector_; }; Status PathBuilder::VisitInline(const Array& array) { @@ -883,8 +1018,10 @@ class MultipathLevelBuilderImpl : public MultipathLevelBuilder { // static ::arrow::Result> MultipathLevelBuilder::Make( - const ::arrow::Array& array, bool array_field_nullable) { - auto constructor = std::make_unique(array_field_nullable); + const ::arrow::Array& array, bool array_field_nullable, + bool write_fixed_size_list_as_vector) { + auto constructor = std::make_unique(array_field_nullable, + write_fixed_size_list_as_vector); RETURN_NOT_OK(VisitArrayInline(array, constructor.get())); return std::make_unique(array.data(), std::move(constructor)); @@ -894,8 +1031,12 @@ ::arrow::Result> MultipathLevelBuilder::M Status MultipathLevelBuilder::Write(const Array& array, bool array_field_nullable, ArrowWriteContext* context, MultipathLevelBuilder::CallbackFunction callback) { + const bool write_fixed_size_list_as_vector = + context->properties != nullptr && + context->properties->write_fixed_size_list_as_vector(); ARROW_ASSIGN_OR_RAISE(std::unique_ptr builder, - MultipathLevelBuilder::Make(array, array_field_nullable)); + MultipathLevelBuilder::Make(array, array_field_nullable, + write_fixed_size_list_as_vector)); for (int leaf_idx = 0; leaf_idx < builder->GetLeafCount(); leaf_idx++) { RETURN_NOT_OK(builder->Write(leaf_idx, context, callback)); } diff --git a/cpp/src/parquet/arrow/path_internal.h b/cpp/src/parquet/arrow/path_internal.h index 50d2bf24291a..52e563636c41 100644 --- a/cpp/src/parquet/arrow/path_internal.h +++ b/cpp/src/parquet/arrow/path_internal.h @@ -94,6 +94,12 @@ struct MultipathLevelBuilderResult { /// Whether the leaf array is nullable. bool leaf_is_nullable; + + /// Whether this leaf is produced from an Arrow FixedSizeList being written as + /// Parquet VECTOR. For nullable VECTOR elements the physical Parquet leaf is + /// below the VECTOR node, so checking only the primitive schema node is not + /// sufficient. + bool leaf_is_vector = false; }; /// \brief Logic for being able to write out nesting (rep/def level) data that is @@ -132,7 +138,8 @@ class PARQUET_EXPORT MultipathLevelBuilder { /// the array column as nullable (as determined by its type's parent /// field). static ::arrow::Result> Make( - const ::arrow::Array& array, bool array_field_nullable); + const ::arrow::Array& array, bool array_field_nullable, + bool write_fixed_size_list_as_vector = false); virtual ~MultipathLevelBuilder() = default; diff --git a/cpp/src/parquet/arrow/path_internal_test.cc b/cpp/src/parquet/arrow/path_internal_test.cc index 0145e889ddaf..09c6acd3732b 100644 --- a/cpp/src/parquet/arrow/path_internal_test.cc +++ b/cpp/src/parquet/arrow/path_internal_test.cc @@ -551,6 +551,88 @@ TEST_F(MultipathLevelBuilderTest, TestFixedSizeList) { EXPECT_THAT(results_[0].post_list_elements[0].end, Eq(6)); } +TEST_F(MultipathLevelBuilderTest, TestFixedSizeListExperimentalVector) { + ArrowWriterProperties::Builder builder; + builder.enable_experimental_vector_encoding(); + arrow_properties_ = builder.build(); + context_ = ArrowWriteContext(default_memory_pool(), arrow_properties_.get()); + + auto entries = field("Entries", ::arrow::int64(), /*nullable=*/false); + auto list_type = fixed_size_list(entries, 2); + auto array = ArrayFromJSON(list_type, "[[1, 2], [3, 4], [5, 6]]"); + + ASSERT_OK( + MultipathLevelBuilder::Write(*array, /*nullable=*/false, &context_, callback_)); + ASSERT_THAT(results_, SizeIs(1)); + EXPECT_TRUE(results_[0].null_rep_levels); + EXPECT_TRUE(results_[0].null_def_levels); + ASSERT_THAT(results_[0].post_list_elements, SizeIs(1)); + EXPECT_EQ(results_[0].post_list_elements[0].start, 0); + EXPECT_EQ(results_[0].post_list_elements[0].end, 6); +} + +TEST_F(MultipathLevelBuilderTest, TestFixedSizeListExperimentalVectorNullableElements) { + ArrowWriterProperties::Builder builder; + builder.enable_experimental_vector_encoding(); + arrow_properties_ = builder.build(); + context_ = ArrowWriteContext(default_memory_pool(), arrow_properties_.get()); + + auto entries = field("Entries", ::arrow::int64(), /*nullable=*/true); + auto list_type = fixed_size_list(entries, 2); + auto array = ArrayFromJSON(list_type, "[[1, null], [3, 4], [null, 6]]"); + + ASSERT_OK( + MultipathLevelBuilder::Write(*array, /*nullable=*/false, &context_, callback_)); + ASSERT_THAT(results_, SizeIs(1)); + results_[0].CheckLevelsWithNullRepLevels(std::vector{1, 0, 1, 1, 0, 1}); + ASSERT_THAT(results_[0].post_list_elements, SizeIs(1)); + EXPECT_EQ(results_[0].post_list_elements[0].start, 0); + EXPECT_EQ(results_[0].post_list_elements[0].end, 6); +} + +TEST_F(MultipathLevelBuilderTest, + TestFixedSizeListExperimentalVectorNullableRowsAndElements) { + ArrowWriterProperties::Builder builder; + builder.enable_experimental_vector_encoding(); + arrow_properties_ = builder.build(); + context_ = ArrowWriteContext(default_memory_pool(), arrow_properties_.get()); + + auto entries = field("Entries", ::arrow::int64(), /*nullable=*/true); + auto list_type = fixed_size_list(entries, 2); + auto array = ArrayFromJSON(list_type, "[[1, null], null, [null, 6]]"); + + ASSERT_OK( + MultipathLevelBuilder::Write(*array, /*nullable=*/true, &context_, callback_)); + ASSERT_THAT(results_, SizeIs(1)); + results_[0].CheckLevelsWithNullRepLevels(std::vector{2, 1, 0, 0, 1, 2}); + ASSERT_THAT(results_[0].post_list_elements, SizeIs(2)); + EXPECT_EQ(results_[0].post_list_elements[0].start, 0); + EXPECT_EQ(results_[0].post_list_elements[0].end, 2); + EXPECT_EQ(results_[0].post_list_elements[1].start, 4); + EXPECT_EQ(results_[0].post_list_elements[1].end, 6); +} + +TEST_F(MultipathLevelBuilderTest, TestFixedSizeListExperimentalVectorNullable) { + ArrowWriterProperties::Builder builder; + builder.enable_experimental_vector_encoding(); + arrow_properties_ = builder.build(); + context_ = ArrowWriteContext(default_memory_pool(), arrow_properties_.get()); + + auto entries = field("Entries", ::arrow::int64(), /*nullable=*/false); + auto list_type = fixed_size_list(entries, 2); + auto array = ArrayFromJSON(list_type, "[[1, 2], null, [5, 6]]"); + + ASSERT_OK( + MultipathLevelBuilder::Write(*array, /*nullable=*/true, &context_, callback_)); + ASSERT_THAT(results_, SizeIs(1)); + results_[0].CheckLevelsWithNullRepLevels(std::vector{1, 1, 0, 0, 1, 1}); + ASSERT_THAT(results_[0].post_list_elements, SizeIs(2)); + EXPECT_EQ(results_[0].post_list_elements[0].start, 0); + EXPECT_EQ(results_[0].post_list_elements[0].end, 2); + EXPECT_EQ(results_[0].post_list_elements[1].start, 4); + EXPECT_EQ(results_[0].post_list_elements[1].end, 6); +} + TEST_F(MultipathLevelBuilderTest, TestFixedSizeListMissingMiddleHasTwoVisitedRanges) { auto entries = field("Entries", ::arrow::int64(), /*nullable=*/false); auto list_type = fixed_size_list(entries, 2); diff --git a/cpp/src/parquet/arrow/reader.cc b/cpp/src/parquet/arrow/reader.cc index a60af69aec9f..ac3248fa0d65 100644 --- a/cpp/src/parquet/arrow/reader.cc +++ b/cpp/src/parquet/arrow/reader.cc @@ -26,6 +26,8 @@ #include #include "arrow/array.h" +#include "arrow/array/concatenate.h" +#include "arrow/array/util.h" #include "arrow/buffer.h" #include "arrow/extension_type.h" #include "arrow/io/memory.h" @@ -273,10 +275,19 @@ class FileReaderImpl : public FileReader { // TODO(wesm): This calculation doesn't make much sense when we have repeated // schema nodes int64_t records_to_read = 0; + auto* impl = dynamic_cast(reader); + const bool is_vector = + impl != nullptr && + impl->field()->type()->id() == ::arrow::Type::FIXED_SIZE_LIST && + !impl->IsOrHasRepeatedChild(); for (auto row_group : row_groups) { // Can throw exception - records_to_read += - reader_->metadata()->RowGroup(row_group)->ColumnChunk(i)->num_values(); + if (is_vector) { + records_to_read += reader_->metadata()->RowGroup(row_group)->num_rows(); + } else { + records_to_read += + reader_->metadata()->RowGroup(row_group)->ColumnChunk(i)->num_values(); + } } #ifdef ARROW_WITH_OPENTELEMETRY std::string column_name = reader_->metadata()->schema()->Column(i)->name(); @@ -454,7 +465,8 @@ class LeafReader : public ColumnReaderImpl { public: LeafReader(std::shared_ptr ctx, std::shared_ptr field, std::unique_ptr input, - ::parquet::internal::LevelInfo leaf_info) + ::parquet::internal::LevelInfo leaf_info, + bool read_dense_for_nullable = false) : ctx_(std::move(ctx)), field_(std::move(field)), input_(std::move(input)), @@ -469,7 +481,8 @@ class LeafReader : public ColumnReaderImpl { record_reader_ = RecordReader::Make( descr_, leaf_info, ctx_->pool, /*read_dictionary=*/field_->type()->id() == ::arrow::Type::DICTIONARY, - /*read_dense_for_nullable=*/false, /*arrow_type=*/type_for_reading); + /*read_dense_for_nullable=*/read_dense_for_nullable, + /*arrow_type=*/type_for_reading); NextRowGroup(); } @@ -706,6 +719,143 @@ class PARQUET_NO_EXPORT FixedSizeListReader : public ListReader { } }; +// Reads Parquet VECTOR columns into Arrow FixedSizeList arrays. +// +// VECTOR stores one definition level per element even though the public Arrow result is a +// single FixedSizeList slot per row. For nullable VECTOR rows, the child reader therefore +// materializes spaced child slots and this reader collapses each vector's per-element def +// levels back into a parent validity bitmap. +class PARQUET_NO_EXPORT VectorFixedSizeListReader : public ColumnReaderImpl { + public: + VectorFixedSizeListReader(std::shared_ptr ctx, + std::shared_ptr field, + ::parquet::internal::LevelInfo level_info, + std::unique_ptr child_reader) + : ctx_(std::move(ctx)), + field_(std::move(field)), + level_info_(level_info), + item_reader_(std::move(child_reader)), + list_size_(checked_cast(*field_->type()) + .list_size()) {} + + Status GetDefLevels(const int16_t** data, int64_t* length) override { + if (collapsed_def_levels_.empty()) { + *data = nullptr; + *length = rows_loaded_; + } else { + *data = collapsed_def_levels_.data(); + *length = rows_loaded_; + } + return Status::OK(); + } + + Status GetRepLevels(const int16_t** data, int64_t* length) override { + *data = nullptr; + *length = rows_loaded_; + return Status::OK(); + } + + bool IsOrHasRepeatedChild() const final { return false; } + + Status LoadBatch(int64_t number_of_records) final { + rows_loaded_ = 0; + collapsed_def_levels_.clear(); + RETURN_NOT_OK(item_reader_->LoadBatch(number_of_records * list_size_)); + + const int16_t* def_levels = nullptr; + int64_t num_levels = 0; + RETURN_NOT_OK(item_reader_->GetDefLevels(&def_levels, &num_levels)); + if (num_levels % list_size_ != 0) { + return Status::Invalid("VECTOR column produced a non-multiple of vector length ", + num_levels, " for list_size=", list_size_); + } + rows_loaded_ = num_levels / list_size_; + + if (field_->nullable() && def_levels != nullptr) { + collapsed_def_levels_.reserve(static_cast(rows_loaded_)); + for (int64_t row = 0; row < rows_loaded_; ++row) { + const bool first_is_present = + def_levels[row * list_size_] >= level_info_.def_level; + for (int32_t i = 1; i < list_size_; ++i) { + const bool is_present = + def_levels[row * list_size_ + i] >= level_info_.def_level; + if (is_present != first_is_present) { + return Status::Invalid( + "VECTOR column mixes parent-null and parent-present definition levels " + "within a single vector"); + } + } + collapsed_def_levels_.push_back(first_is_present ? level_info_.def_level + : level_info_.def_level - 1); + } + } + return Status::OK(); + } + + Status BuildArray(int64_t length_upper_bound, + std::shared_ptr* out) override { + std::shared_ptr child_out; + RETURN_NOT_OK(item_reader_->BuildArray(length_upper_bound * list_size_, &child_out)); + ARROW_ASSIGN_OR_RAISE(std::shared_ptr child_data, + ChunksToSingle(*child_out)); + std::shared_ptr child_array = ::arrow::MakeArray(child_data); + if (rows_loaded_ == 0) { + if (child_array->length() % list_size_ != 0) { + return Status::Invalid("VECTOR FixedSizeList child length ", + child_array->length(), + " was not divisible by list_size=", list_size_); + } + rows_loaded_ = child_array->length() / list_size_; + } + + if (child_array->length() != rows_loaded_ * list_size_) { + return Status::Invalid("VECTOR FixedSizeList child length ", child_array->length(), + " did not match expected ", rows_loaded_ * list_size_); + } + + std::shared_ptr validity_buffer; + int64_t null_count = 0; + if (field_->nullable()) { + ARROW_ASSIGN_OR_RAISE( + validity_buffer, + AllocateResizableBuffer(bit_util::BytesForBits(rows_loaded_), ctx_->pool)); + memset(validity_buffer->mutable_data(), 0, + static_cast(bit_util::BytesForBits(rows_loaded_))); + if (collapsed_def_levels_.empty()) { + bit_util::SetBitsTo(validity_buffer->mutable_data(), 0, rows_loaded_, true); + } else { + for (int64_t row = 0; row < rows_loaded_; ++row) { + if (collapsed_def_levels_[row] == level_info_.def_level) { + bit_util::SetBit(validity_buffer->mutable_data(), row); + } else { + ++null_count; + } + } + } + validity_buffer->ZeroPadding(); + } + + auto data = std::make_shared( + field_->type(), rows_loaded_, + std::vector>{null_count > 0 ? validity_buffer : nullptr}, + null_count); + data->child_data.push_back(child_data); + *out = std::make_shared(::arrow::MakeArray(std::move(data))); + return Status::OK(); + } + + const std::shared_ptr field() override { return field_; } + + private: + std::shared_ptr ctx_; + std::shared_ptr field_; + ::parquet::internal::LevelInfo level_info_; + std::unique_ptr item_reader_; + int32_t list_size_; + int64_t rows_loaded_ = 0; + std::vector collapsed_def_levels_; +}; + class PARQUET_NO_EXPORT StructReader : public ColumnReaderImpl { public: explicit StructReader(std::shared_ptr ctx, @@ -884,8 +1034,12 @@ Status GetReader(const SchemaField& field, const std::shared_ptr& arrow_f } std::unique_ptr input( ctx->iterator_factory(field.column_index, ctx->reader)); - *out = std::make_unique(ctx, arrow_field, std::move(input), - field.level_info); + auto leaf_field = arrow_field; + if (field.is_vector && field.level_info.def_level > 0) { + leaf_field = leaf_field->WithNullable(true); + } + *out = + std::make_unique(ctx, leaf_field, std::move(input), field.level_info); } else if (type_id == ::arrow::Type::LIST || type_id == ::arrow::Type::MAP || type_id == ::arrow::Type::FIXED_SIZE_LIST || type_id == ::arrow::Type::LARGE_LIST) { @@ -950,8 +1104,24 @@ Status GetReader(const SchemaField& field, const std::shared_ptr& arrow_f list_field->WithType(::arrow::fixed_size_list(reader_child_type, list_size)); } - *out = std::make_unique(ctx, list_field, field.level_info, - std::move(child_reader)); + if (field.is_vector) { + if (child->is_leaf() && child->column_index >= 0 && + (list_field->nullable() || child_reader->field()->nullable())) { + // Nullable VECTOR rows and/or elements are represented internally as spaced + // child slots so that the primitive leaf reader preserves one child position + // per vector element. + std::unique_ptr input( + ctx->iterator_factory(child->column_index, ctx->reader)); + child_reader = + std::make_unique(ctx, child_reader->field()->WithNullable(true), + std::move(input), child->level_info); + } + *out = std::make_unique( + ctx, list_field, field.level_info, std::move(child_reader)); + } else { + *out = std::make_unique(ctx, list_field, field.level_info, + std::move(child_reader)); + } } else { return Status::UnknownError("Unknown list type: ", field.field->ToString()); } diff --git a/cpp/src/parquet/arrow/reader_writer_benchmark.cc b/cpp/src/parquet/arrow/reader_writer_benchmark.cc index 2f288fd2eb0f..988f9ed07bdc 100644 --- a/cpp/src/parquet/arrow/reader_writer_benchmark.cc +++ b/cpp/src/parquet/arrow/reader_writer_benchmark.cc @@ -32,8 +32,10 @@ #include "parquet/properties.h" #include "arrow/array.h" +#include "arrow/array/array_nested.h" #include "arrow/array/builder_primitive.h" #include "arrow/array/data.h" +#include "arrow/buffer.h" #include "arrow/compute/cast.h" #include "arrow/io/memory.h" #include "arrow/table.h" @@ -283,15 +285,76 @@ struct Examples { static constexpr std::array values() { return {false, true}; } }; +::arrow::Result> WriteReadBenchmarkBuffer( + const Table& table, const std::shared_ptr& properties, + const std::shared_ptr& arrow_properties) { + auto output = CreateOutputStream(); + RETURN_NOT_OK(WriteTable(table, ::arrow::default_memory_pool(), output, + /*chunk_size=*/table.num_rows(), properties, + arrow_properties)); + return output->Finish(); +} + +static void BenchmarkWriteTable(::benchmark::State& state, const Table& table, + std::shared_ptr properties, + std::shared_ptr arrow_properties, + int64_t num_values = -1, int64_t total_bytes = -1) { + for (auto _ : state) { + auto output = CreateOutputStream(); + EXIT_NOT_OK(WriteTable(table, ::arrow::default_memory_pool(), output, + /*chunk_size=*/table.num_rows(), properties, + arrow_properties)); + auto finished = output->Finish(); + EXIT_NOT_OK(finished.status()); + } + + if (num_values == -1) { + num_values = table.num_rows(); + } + state.SetItemsProcessed(num_values * state.iterations()); + if (total_bytes != -1) { + state.SetBytesProcessed(total_bytes * state.iterations()); + } +} + +static void BenchmarkRoundtripTable( + ::benchmark::State& state, const Table& table, + std::shared_ptr properties, + std::shared_ptr arrow_properties, int64_t num_values = -1, + int64_t total_bytes = -1) { + for (auto _ : state) { + auto output = CreateOutputStream(); + EXIT_NOT_OK(WriteTable(table, ::arrow::default_memory_pool(), output, + /*chunk_size=*/table.num_rows(), properties, + arrow_properties)); + PARQUET_ASSIGN_OR_THROW(auto buffer, output->Finish()); + + auto reader = + ParquetFileReader::Open(std::make_shared<::arrow::io::BufferReader>(buffer)); + auto arrow_reader_result = + FileReader::Make(::arrow::default_memory_pool(), std::move(reader)); + EXIT_NOT_OK(arrow_reader_result.status()); + auto arrow_reader = std::move(*arrow_reader_result); + + auto table_result = arrow_reader->ReadTable(); + EXIT_NOT_OK(table_result.status()); + } + + if (num_values == -1) { + num_values = table.num_rows(); + } + state.SetItemsProcessed(num_values * state.iterations()); + if (total_bytes != -1) { + state.SetBytesProcessed(total_bytes * state.iterations()); + } +} + static void BenchmarkReadTable(::benchmark::State& state, const Table& table, std::shared_ptr properties, + std::shared_ptr arrow_properties, int64_t num_values = -1, int64_t total_bytes = -1) { - // Make sure we roundtrip Arrow types by storing the schema - auto arrow_properties = ArrowWriterProperties::Builder().store_schema()->build(); - auto output = CreateOutputStream(); - EXIT_NOT_OK(WriteTable(table, ::arrow::default_memory_pool(), output, - /*chunk_size=*/table.num_rows(), properties, arrow_properties)); - PARQUET_ASSIGN_OR_THROW(auto buffer, output->Finish()); + PARQUET_ASSIGN_OR_THROW(auto buffer, + WriteReadBenchmarkBuffer(table, properties, arrow_properties)); for (auto _ : state) { auto reader = @@ -314,6 +377,14 @@ static void BenchmarkReadTable(::benchmark::State& state, const Table& table, } } +static void BenchmarkReadTable(::benchmark::State& state, const Table& table, + std::shared_ptr properties, + int64_t num_values = -1, int64_t total_bytes = -1) { + auto arrow_properties = ArrowWriterProperties::Builder().store_schema()->build(); + BenchmarkReadTable(state, table, std::move(properties), std::move(arrow_properties), + num_values, total_bytes); +} + static void BenchmarkReadTable(::benchmark::State& state, const Table& table, int64_t num_values = -1, int64_t total_bytes = -1) { BenchmarkReadTable(state, table, default_writer_properties(), num_values, total_bytes); @@ -719,6 +790,680 @@ static void BM_ReadListOfListColumn(::benchmark::State& state) { BENCHMARK(BM_ReadListOfListColumn)->Apply(NestedReadArguments); +std::shared_ptr MakeFloatValues(int64_t num_values, bool nullable = false) { + ::arrow::FloatBuilder value_builder; + EXIT_NOT_OK(value_builder.Reserve(num_values)); + for (int64_t i = 0; i < num_values; ++i) { + if (nullable && (i % 2) == 1) { + EXIT_NOT_OK(value_builder.AppendNull()); + } else { + EXIT_NOT_OK(value_builder.Append(static_cast(i % 1024) / 1024.0f)); + } + } + std::shared_ptr values; + EXIT_NOT_OK(value_builder.Finish(&values)); + return values; +} + +int64_t CountAlternatingValidRows(int64_t num_rows) { return (num_rows + 1) / 2; } + +std::shared_ptr MakeAlternatingValidityBitmap(int64_t num_rows, + int64_t* null_count_out) { + const int64_t valid_count = CountAlternatingValidRows(num_rows); + *null_count_out = num_rows - valid_count; + auto bitmap = ::arrow::AllocateEmptyBitmap(num_rows, ::arrow::default_memory_pool()) + .MoveValueUnsafe(); + ::arrow::bit_util::SetBitsTo(bitmap->mutable_data(), 0, num_rows, false); + for (int64_t i = 0; i < num_rows; ++i) { + if ((i % 2) == 0) { + ::arrow::bit_util::SetBit(bitmap->mutable_data(), i); + } + } + return bitmap; +} + +std::shared_ptr
MakeListFloatTable(int64_t num_rows, int32_t list_size, + bool nullable = false, + bool element_nullable = false) { + const int64_t present_rows = nullable ? CountAlternatingValidRows(num_rows) : num_rows; + const int64_t num_values = present_rows * list_size; + auto values = MakeFloatValues(num_values, element_nullable); + + ::arrow::Int32Builder offsets_builder; + EXIT_NOT_OK(offsets_builder.Reserve(num_rows + 1)); + int32_t current_offset = 0; + for (int64_t i = 0; i <= num_rows; ++i) { + if (i > 0 && (!nullable || ((i - 1) % 2) == 0)) { + current_offset += list_size; + } + EXIT_NOT_OK(offsets_builder.Append(current_offset)); + } + std::shared_ptr offsets; + EXIT_NOT_OK(offsets_builder.Finish(&offsets)); + + auto value_field = ::arrow::field("item", ::arrow::float32(), element_nullable); + auto type = ::arrow::list(value_field); + std::shared_ptr<::arrow::ListArray> list_array; + if (nullable) { + int64_t null_count = 0; + auto validity = MakeAlternatingValidityBitmap(num_rows, &null_count); + PARQUET_ASSIGN_OR_THROW(list_array, + ::arrow::ListArray::FromArrays(type, *offsets, *values, + ::arrow::default_memory_pool(), + validity, null_count)); + } else { + PARQUET_ASSIGN_OR_THROW(list_array, + ::arrow::ListArray::FromArrays(type, *offsets, *values)); + } + auto field = ::arrow::field("column", type, nullable); + return Table::Make(::arrow::schema({field}), {list_array}, num_rows); +} + +std::shared_ptr
MakeFixedSizeListFloatTable(int64_t num_rows, int32_t list_size, + bool nullable = false, + bool element_nullable = false) { + const int64_t num_values = num_rows * list_size; + auto values = MakeFloatValues(num_values, element_nullable); + + auto value_field = ::arrow::field("item", ::arrow::float32(), element_nullable); + auto type = ::arrow::fixed_size_list(value_field, list_size); + std::shared_ptr list_array; + if (nullable) { + int64_t null_count = 0; + auto validity = MakeAlternatingValidityBitmap(num_rows, &null_count); + PARQUET_ASSIGN_OR_THROW(list_array, ::arrow::FixedSizeListArray::FromArrays( + values, type, validity, null_count)); + } else { + PARQUET_ASSIGN_OR_THROW(list_array, + ::arrow::FixedSizeListArray::FromArrays(values, type)); + } + auto field = ::arrow::field("column", type, nullable); + return Table::Make(::arrow::schema({field}), {list_array}, num_rows); +} + +std::shared_ptr MakeInt32Values(int64_t num_values) { + ::arrow::Int32Builder builder; + EXIT_NOT_OK(builder.Reserve(num_values)); + for (int64_t i = 0; i < num_values; ++i) { + EXIT_NOT_OK(builder.Append(static_cast(i % 1024))); + } + std::shared_ptr values; + EXIT_NOT_OK(builder.Finish(&values)); + return values; +} + +std::shared_ptr
MakeFixedSizeListStructTable(int64_t num_rows, int32_t list_size) { + const int64_t num_values = num_rows * list_size; + auto x = MakeFloatValues(num_values); + auto y = MakeInt32Values(num_values); + auto struct_type = ::arrow::struct_({::arrow::field("x", ::arrow::float32(), false), + ::arrow::field("y", ::arrow::int32(), false)}); + auto values = std::make_shared<::arrow::StructArray>(struct_type, num_values, + ::arrow::ArrayVector{x, y}); + auto type = + ::arrow::fixed_size_list(::arrow::field("item", struct_type, false), list_size); + PARQUET_ASSIGN_OR_THROW(auto list_array, + ::arrow::FixedSizeListArray::FromArrays(values, type)); + auto field = ::arrow::field("column", type, false); + return Table::Make(::arrow::schema({field}), {list_array}, num_rows); +} + +std::shared_ptr
MakeMixedColumnsTableWithVector(int64_t num_rows, + int32_t vector_size) { + auto id_array = MakeInt32Values(num_rows); + + auto vector_values = MakeFloatValues(num_rows * vector_size); + auto vector_type = ::arrow::fixed_size_list( + ::arrow::field("item", ::arrow::float32(), false), vector_size); + std::shared_ptr vector_array; + PARQUET_ASSIGN_OR_THROW( + vector_array, ::arrow::FixedSizeListArray::FromArrays(vector_values, vector_type)); + + ::arrow::random::RandomArrayGenerator generator(/*seed=*/424242); + auto label_array = generator.String(num_rows, /*min_length=*/3, /*max_length=*/16, + /*null_probability=*/0.0); + + constexpr int32_t kTagsPerRow = 4; + auto tag_values = MakeInt32Values(num_rows * kTagsPerRow); + ::arrow::Int32Builder offsets_builder; + EXIT_NOT_OK(offsets_builder.Reserve(num_rows + 1)); + for (int64_t i = 0; i <= num_rows; ++i) { + EXIT_NOT_OK(offsets_builder.Append(static_cast(i * kTagsPerRow))); + } + std::shared_ptr offsets; + EXIT_NOT_OK(offsets_builder.Finish(&offsets)); + auto tag_type = ::arrow::list(::arrow::field("item", ::arrow::int32(), false)); + std::shared_ptr<::arrow::ListArray> tags_array; + PARQUET_ASSIGN_OR_THROW( + tags_array, ::arrow::ListArray::FromArrays(tag_type, *offsets, *tag_values)); + + auto schema = ::arrow::schema({ + ::arrow::field("id", ::arrow::int32(), false), + ::arrow::field("embedding", vector_type, false), + ::arrow::field("label", ::arrow::utf8(), false), + ::arrow::field("tags", tag_type, false), + }); + return Table::Make(schema, {id_array, vector_array, label_array, tags_array}, num_rows); +} + +std::shared_ptr FixedSizeListVectorWriterProperties() { + auto builder = WriterProperties::Builder(); + return builder.disable_dictionary() + ->disable_statistics() + ->disable_write_page_index() + ->encoding(Encoding::PLAIN) + ->build(); +} + +std::shared_ptr FixedSizeListVectorArrowWriterProperties() { + ArrowWriterProperties::Builder builder; + builder.store_schema(); + builder.enable_experimental_vector_encoding(); + return builder.build(); +} + +static void BM_WriteListFloat(::benchmark::State& state) { + const int32_t list_size = static_cast(state.range(0)); + const int64_t num_rows = BENCHMARK_SIZE / list_size; + auto table = MakeListFloatTable(num_rows, list_size, /*nullable=*/false); + const int64_t total_bytes = num_rows * list_size * static_cast(sizeof(float)); + + auto arrow_properties = ArrowWriterProperties::Builder().store_schema()->build(); + BenchmarkWriteTable(state, *table, default_writer_properties(), arrow_properties, + num_rows, total_bytes); +} + +static void BM_ReadListFloat(::benchmark::State& state) { + const int32_t list_size = static_cast(state.range(0)); + const int64_t num_rows = BENCHMARK_SIZE / list_size; + auto table = MakeListFloatTable(num_rows, list_size, /*nullable=*/false); + const int64_t total_bytes = num_rows * list_size * static_cast(sizeof(float)); + + BenchmarkReadTable(state, *table, default_writer_properties(), num_rows, total_bytes); +} + +static void BM_RoundtripListFloat(::benchmark::State& state) { + const int32_t list_size = static_cast(state.range(0)); + const int64_t num_rows = BENCHMARK_SIZE / list_size; + auto table = MakeListFloatTable(num_rows, list_size, /*nullable=*/false); + const int64_t total_bytes = num_rows * list_size * static_cast(sizeof(float)); + + auto arrow_properties = ArrowWriterProperties::Builder().store_schema()->build(); + BenchmarkRoundtripTable(state, *table, default_writer_properties(), arrow_properties, + num_rows, total_bytes); +} + +static void BM_WriteListFloatNullable(::benchmark::State& state) { + const int32_t list_size = static_cast(state.range(0)); + const int64_t num_rows = BENCHMARK_SIZE / list_size; + const int64_t present_rows = CountAlternatingValidRows(num_rows); + auto table = MakeListFloatTable(num_rows, list_size, /*nullable=*/true); + const int64_t total_bytes = + present_rows * list_size * static_cast(sizeof(float)); + + auto arrow_properties = ArrowWriterProperties::Builder().store_schema()->build(); + BenchmarkWriteTable(state, *table, default_writer_properties(), arrow_properties, + num_rows, total_bytes); +} + +static void BM_ReadListFloatNullable(::benchmark::State& state) { + const int32_t list_size = static_cast(state.range(0)); + const int64_t num_rows = BENCHMARK_SIZE / list_size; + const int64_t present_rows = CountAlternatingValidRows(num_rows); + auto table = MakeListFloatTable(num_rows, list_size, /*nullable=*/true); + const int64_t total_bytes = + present_rows * list_size * static_cast(sizeof(float)); + + BenchmarkReadTable(state, *table, default_writer_properties(), num_rows, total_bytes); +} + +static void BM_RoundtripListFloatNullable(::benchmark::State& state) { + const int32_t list_size = static_cast(state.range(0)); + const int64_t num_rows = BENCHMARK_SIZE / list_size; + const int64_t present_rows = CountAlternatingValidRows(num_rows); + auto table = MakeListFloatTable(num_rows, list_size, /*nullable=*/true); + const int64_t total_bytes = + present_rows * list_size * static_cast(sizeof(float)); + + auto arrow_properties = ArrowWriterProperties::Builder().store_schema()->build(); + BenchmarkRoundtripTable(state, *table, default_writer_properties(), arrow_properties, + num_rows, total_bytes); +} + +static void BM_WriteFixedSizeListFloatLegacy(::benchmark::State& state) { + const int32_t list_size = static_cast(state.range(0)); + const int64_t num_rows = BENCHMARK_SIZE / list_size; + auto table = MakeFixedSizeListFloatTable(num_rows, list_size, /*nullable=*/false); + const int64_t total_bytes = num_rows * list_size * static_cast(sizeof(float)); + + auto arrow_properties = ArrowWriterProperties::Builder().store_schema()->build(); + BenchmarkWriteTable(state, *table, default_writer_properties(), arrow_properties, + num_rows, total_bytes); +} + +static void BM_ReadFixedSizeListFloatLegacy(::benchmark::State& state) { + const int32_t list_size = static_cast(state.range(0)); + const int64_t num_rows = BENCHMARK_SIZE / list_size; + auto table = MakeFixedSizeListFloatTable(num_rows, list_size, /*nullable=*/false); + const int64_t total_bytes = num_rows * list_size * static_cast(sizeof(float)); + + BenchmarkReadTable(state, *table, default_writer_properties(), num_rows, total_bytes); +} + +static void BM_RoundtripFixedSizeListFloatLegacy(::benchmark::State& state) { + const int32_t list_size = static_cast(state.range(0)); + const int64_t num_rows = BENCHMARK_SIZE / list_size; + auto table = MakeFixedSizeListFloatTable(num_rows, list_size, /*nullable=*/false); + const int64_t total_bytes = num_rows * list_size * static_cast(sizeof(float)); + + auto arrow_properties = ArrowWriterProperties::Builder().store_schema()->build(); + BenchmarkRoundtripTable(state, *table, default_writer_properties(), arrow_properties, + num_rows, total_bytes); +} + +static void BM_WriteFixedSizeListFloatLegacyNullable(::benchmark::State& state) { + const int32_t list_size = static_cast(state.range(0)); + const int64_t num_rows = BENCHMARK_SIZE / list_size; + const int64_t present_rows = CountAlternatingValidRows(num_rows); + auto table = MakeFixedSizeListFloatTable(num_rows, list_size, /*nullable=*/true); + const int64_t total_bytes = + present_rows * list_size * static_cast(sizeof(float)); + + auto arrow_properties = ArrowWriterProperties::Builder().store_schema()->build(); + auto maybe_buffer = + WriteReadBenchmarkBuffer(*table, default_writer_properties(), arrow_properties); + if (!maybe_buffer.ok()) { + state.SkipWithError(maybe_buffer.status().ToString().c_str()); + return; + } + + BenchmarkWriteTable(state, *table, default_writer_properties(), arrow_properties, + num_rows, total_bytes); +} + +static void BM_ReadFixedSizeListFloatLegacyNullable(::benchmark::State& state) { + const int32_t list_size = static_cast(state.range(0)); + const int64_t num_rows = BENCHMARK_SIZE / list_size; + const int64_t present_rows = CountAlternatingValidRows(num_rows); + auto table = MakeFixedSizeListFloatTable(num_rows, list_size, /*nullable=*/true); + const int64_t total_bytes = + present_rows * list_size * static_cast(sizeof(float)); + + auto arrow_properties = ArrowWriterProperties::Builder().store_schema()->build(); + auto maybe_buffer = + WriteReadBenchmarkBuffer(*table, default_writer_properties(), arrow_properties); + if (!maybe_buffer.ok()) { + state.SkipWithError(maybe_buffer.status().ToString().c_str()); + return; + } + auto buffer = std::move(maybe_buffer).MoveValueUnsafe(); + + for (auto _ : state) { + auto reader = + ParquetFileReader::Open(std::make_shared<::arrow::io::BufferReader>(buffer)); + auto arrow_reader_result = + FileReader::Make(::arrow::default_memory_pool(), std::move(reader)); + EXIT_NOT_OK(arrow_reader_result.status()); + auto arrow_reader = std::move(*arrow_reader_result); + + auto table_result = arrow_reader->ReadTable(); + EXIT_NOT_OK(table_result.status()); + } + + state.SetItemsProcessed(num_rows * state.iterations()); + state.SetBytesProcessed(total_bytes * state.iterations()); +} + +static void BM_RoundtripFixedSizeListFloatLegacyNullable(::benchmark::State& state) { + const int32_t list_size = static_cast(state.range(0)); + const int64_t num_rows = BENCHMARK_SIZE / list_size; + const int64_t present_rows = CountAlternatingValidRows(num_rows); + auto table = MakeFixedSizeListFloatTable(num_rows, list_size, /*nullable=*/true); + const int64_t total_bytes = + present_rows * list_size * static_cast(sizeof(float)); + + auto arrow_properties = ArrowWriterProperties::Builder().store_schema()->build(); + auto maybe_buffer = + WriteReadBenchmarkBuffer(*table, default_writer_properties(), arrow_properties); + if (!maybe_buffer.ok()) { + state.SkipWithError(maybe_buffer.status().ToString().c_str()); + return; + } + + BenchmarkRoundtripTable(state, *table, default_writer_properties(), arrow_properties, + num_rows, total_bytes); +} + +static void BM_WriteFixedSizeListFloatVector(::benchmark::State& state) { + const int32_t list_size = static_cast(state.range(0)); + const int64_t num_rows = BENCHMARK_SIZE / list_size; + auto table = MakeFixedSizeListFloatTable(num_rows, list_size, /*nullable=*/false); + const int64_t total_bytes = num_rows * list_size * static_cast(sizeof(float)); + + BenchmarkWriteTable(state, *table, FixedSizeListVectorWriterProperties(), + FixedSizeListVectorArrowWriterProperties(), num_rows, total_bytes); +} + +static void BM_RoundtripMixedColumnsLegacy(::benchmark::State& state) { + const int32_t vector_size = static_cast(state.range(0)); + const int64_t num_rows = BENCHMARK_SIZE / vector_size; + auto table = MakeMixedColumnsTableWithVector(num_rows, vector_size); + const int64_t total_bytes = + num_rows * vector_size * static_cast(sizeof(float)); + + auto arrow_properties = ArrowWriterProperties::Builder().store_schema()->build(); + BenchmarkRoundtripTable(state, *table, default_writer_properties(), arrow_properties, + num_rows, total_bytes); +} + +static void BM_RoundtripMixedColumnsVector(::benchmark::State& state) { + const int32_t vector_size = static_cast(state.range(0)); + const int64_t num_rows = BENCHMARK_SIZE / vector_size; + auto table = MakeMixedColumnsTableWithVector(num_rows, vector_size); + const int64_t total_bytes = + num_rows * vector_size * static_cast(sizeof(float)); + + BenchmarkRoundtripTable(state, *table, FixedSizeListVectorWriterProperties(), + FixedSizeListVectorArrowWriterProperties(), num_rows, + total_bytes); +} + +static void BM_ReadFixedSizeListFloatVector(::benchmark::State& state) { + const int32_t list_size = static_cast(state.range(0)); + const int64_t num_rows = BENCHMARK_SIZE / list_size; + auto table = MakeFixedSizeListFloatTable(num_rows, list_size, /*nullable=*/false); + const int64_t total_bytes = num_rows * list_size * static_cast(sizeof(float)); + + BenchmarkReadTable(state, *table, FixedSizeListVectorWriterProperties(), + FixedSizeListVectorArrowWriterProperties(), num_rows, total_bytes); +} + +static void BM_RoundtripFixedSizeListFloatVector(::benchmark::State& state) { + const int32_t list_size = static_cast(state.range(0)); + const int64_t num_rows = BENCHMARK_SIZE / list_size; + auto table = MakeFixedSizeListFloatTable(num_rows, list_size, /*nullable=*/false); + const int64_t total_bytes = num_rows * list_size * static_cast(sizeof(float)); + + BenchmarkRoundtripTable(state, *table, FixedSizeListVectorWriterProperties(), + FixedSizeListVectorArrowWriterProperties(), num_rows, + total_bytes); +} + +static void BM_WriteFixedSizeListFloatVectorNullable(::benchmark::State& state) { + const int32_t list_size = static_cast(state.range(0)); + const int64_t num_rows = BENCHMARK_SIZE / list_size; + const int64_t present_rows = CountAlternatingValidRows(num_rows); + auto table = MakeFixedSizeListFloatTable(num_rows, list_size, /*nullable=*/true); + const int64_t total_bytes = + present_rows * list_size * static_cast(sizeof(float)); + + BenchmarkWriteTable(state, *table, FixedSizeListVectorWriterProperties(), + FixedSizeListVectorArrowWriterProperties(), num_rows, total_bytes); +} + +static void BM_ReadFixedSizeListFloatVectorNullable(::benchmark::State& state) { + const int32_t list_size = static_cast(state.range(0)); + const int64_t num_rows = BENCHMARK_SIZE / list_size; + const int64_t present_rows = CountAlternatingValidRows(num_rows); + auto table = MakeFixedSizeListFloatTable(num_rows, list_size, /*nullable=*/true); + const int64_t total_bytes = + present_rows * list_size * static_cast(sizeof(float)); + + BenchmarkReadTable(state, *table, FixedSizeListVectorWriterProperties(), + FixedSizeListVectorArrowWriterProperties(), num_rows, total_bytes); +} + +static void BM_RoundtripFixedSizeListFloatVectorNullable(::benchmark::State& state) { + const int32_t list_size = static_cast(state.range(0)); + const int64_t num_rows = BENCHMARK_SIZE / list_size; + const int64_t present_rows = CountAlternatingValidRows(num_rows); + auto table = MakeFixedSizeListFloatTable(num_rows, list_size, /*nullable=*/true); + const int64_t total_bytes = + present_rows * list_size * static_cast(sizeof(float)); + + BenchmarkRoundtripTable(state, *table, FixedSizeListVectorWriterProperties(), + FixedSizeListVectorArrowWriterProperties(), num_rows, + total_bytes); +} + +static void BM_WriteFixedSizeListStructLegacy(::benchmark::State& state) { + const int32_t list_size = static_cast(state.range(0)); + const int64_t num_rows = BENCHMARK_SIZE / list_size; + auto table = MakeFixedSizeListStructTable(num_rows, list_size); + const int64_t total_bytes = + num_rows * list_size * static_cast(sizeof(float) + sizeof(int32_t)); + + auto arrow_properties = ArrowWriterProperties::Builder().store_schema()->build(); + BenchmarkWriteTable(state, *table, default_writer_properties(), arrow_properties, + num_rows, total_bytes); +} + +static void BM_ReadFixedSizeListStructLegacy(::benchmark::State& state) { + const int32_t list_size = static_cast(state.range(0)); + const int64_t num_rows = BENCHMARK_SIZE / list_size; + auto table = MakeFixedSizeListStructTable(num_rows, list_size); + const int64_t total_bytes = + num_rows * list_size * static_cast(sizeof(float) + sizeof(int32_t)); + + BenchmarkReadTable(state, *table, default_writer_properties(), num_rows, total_bytes); +} + +static void BM_RoundtripFixedSizeListStructLegacy(::benchmark::State& state) { + const int32_t list_size = static_cast(state.range(0)); + const int64_t num_rows = BENCHMARK_SIZE / list_size; + auto table = MakeFixedSizeListStructTable(num_rows, list_size); + const int64_t total_bytes = + num_rows * list_size * static_cast(sizeof(float) + sizeof(int32_t)); + + auto arrow_properties = ArrowWriterProperties::Builder().store_schema()->build(); + BenchmarkRoundtripTable(state, *table, default_writer_properties(), arrow_properties, + num_rows, total_bytes); +} + +static void BM_WriteFixedSizeListStructVector(::benchmark::State& state) { + const int32_t list_size = static_cast(state.range(0)); + const int64_t num_rows = BENCHMARK_SIZE / list_size; + auto table = MakeFixedSizeListStructTable(num_rows, list_size); + const int64_t total_bytes = + num_rows * list_size * static_cast(sizeof(float) + sizeof(int32_t)); + + BenchmarkWriteTable(state, *table, FixedSizeListVectorWriterProperties(), + FixedSizeListVectorArrowWriterProperties(), num_rows, total_bytes); +} + +static void BM_ReadFixedSizeListStructVector(::benchmark::State& state) { + const int32_t list_size = static_cast(state.range(0)); + const int64_t num_rows = BENCHMARK_SIZE / list_size; + auto table = MakeFixedSizeListStructTable(num_rows, list_size); + const int64_t total_bytes = + num_rows * list_size * static_cast(sizeof(float) + sizeof(int32_t)); + + BenchmarkReadTable(state, *table, FixedSizeListVectorWriterProperties(), + FixedSizeListVectorArrowWriterProperties(), num_rows, total_bytes); +} + +static void BM_RoundtripFixedSizeListStructVector(::benchmark::State& state) { + const int32_t list_size = static_cast(state.range(0)); + const int64_t num_rows = BENCHMARK_SIZE / list_size; + auto table = MakeFixedSizeListStructTable(num_rows, list_size); + const int64_t total_bytes = + num_rows * list_size * static_cast(sizeof(float) + sizeof(int32_t)); + + BenchmarkRoundtripTable(state, *table, FixedSizeListVectorWriterProperties(), + FixedSizeListVectorArrowWriterProperties(), num_rows, + total_bytes); +} + +static void BM_WriteListFloatNullableElements(::benchmark::State& state) { + const int32_t list_size = static_cast(state.range(0)); + const int64_t num_rows = BENCHMARK_SIZE / list_size; + auto table = MakeListFloatTable(num_rows, list_size, /*nullable=*/false, + /*element_nullable=*/true); + const int64_t valid_values = CountAlternatingValidRows(num_rows * list_size); + const int64_t total_bytes = valid_values * static_cast(sizeof(float)); + + auto arrow_properties = ArrowWriterProperties::Builder().store_schema()->build(); + BenchmarkWriteTable(state, *table, default_writer_properties(), arrow_properties, + num_rows, total_bytes); +} + +static void BM_ReadListFloatNullableElements(::benchmark::State& state) { + const int32_t list_size = static_cast(state.range(0)); + const int64_t num_rows = BENCHMARK_SIZE / list_size; + auto table = MakeListFloatTable(num_rows, list_size, /*nullable=*/false, + /*element_nullable=*/true); + const int64_t valid_values = CountAlternatingValidRows(num_rows * list_size); + const int64_t total_bytes = valid_values * static_cast(sizeof(float)); + + BenchmarkReadTable(state, *table, default_writer_properties(), num_rows, total_bytes); +} + +static void BM_RoundtripListFloatNullableElements(::benchmark::State& state) { + const int32_t list_size = static_cast(state.range(0)); + const int64_t num_rows = BENCHMARK_SIZE / list_size; + auto table = MakeListFloatTable(num_rows, list_size, /*nullable=*/false, + /*element_nullable=*/true); + const int64_t valid_values = CountAlternatingValidRows(num_rows * list_size); + const int64_t total_bytes = valid_values * static_cast(sizeof(float)); + + auto arrow_properties = ArrowWriterProperties::Builder().store_schema()->build(); + BenchmarkRoundtripTable(state, *table, default_writer_properties(), arrow_properties, + num_rows, total_bytes); +} + +static void BM_WriteFixedSizeListFloatVectorNullableElements(::benchmark::State& state) { + const int32_t list_size = static_cast(state.range(0)); + const int64_t num_rows = BENCHMARK_SIZE / list_size; + auto table = MakeFixedSizeListFloatTable(num_rows, list_size, /*nullable=*/false, + /*element_nullable=*/true); + const int64_t valid_values = CountAlternatingValidRows(num_rows * list_size); + const int64_t total_bytes = valid_values * static_cast(sizeof(float)); + + BenchmarkWriteTable(state, *table, FixedSizeListVectorWriterProperties(), + FixedSizeListVectorArrowWriterProperties(), num_rows, total_bytes); +} + +static void BM_ReadFixedSizeListFloatVectorNullableElements(::benchmark::State& state) { + const int32_t list_size = static_cast(state.range(0)); + const int64_t num_rows = BENCHMARK_SIZE / list_size; + auto table = MakeFixedSizeListFloatTable(num_rows, list_size, /*nullable=*/false, + /*element_nullable=*/true); + const int64_t valid_values = CountAlternatingValidRows(num_rows * list_size); + const int64_t total_bytes = valid_values * static_cast(sizeof(float)); + + BenchmarkReadTable(state, *table, FixedSizeListVectorWriterProperties(), + FixedSizeListVectorArrowWriterProperties(), num_rows, total_bytes); +} + +static void BM_RoundtripFixedSizeListFloatVectorNullableElements( + ::benchmark::State& state) { + const int32_t list_size = static_cast(state.range(0)); + const int64_t num_rows = BENCHMARK_SIZE / list_size; + auto table = MakeFixedSizeListFloatTable(num_rows, list_size, /*nullable=*/false, + /*element_nullable=*/true); + const int64_t valid_values = CountAlternatingValidRows(num_rows * list_size); + const int64_t total_bytes = valid_values * static_cast(sizeof(float)); + + BenchmarkRoundtripTable(state, *table, FixedSizeListVectorWriterProperties(), + FixedSizeListVectorArrowWriterProperties(), num_rows, + total_bytes); +} + +BENCHMARK(BM_WriteListFloat)->Arg(80)->Arg(768)->Arg(10000)->Arg(100000); +BENCHMARK(BM_ReadListFloat)->Arg(80)->Arg(768)->Arg(10000)->Arg(100000); +BENCHMARK(BM_RoundtripListFloat)->Arg(80)->Arg(768)->Arg(10000)->Arg(100000); +BENCHMARK(BM_WriteListFloatNullable)->Arg(80)->Arg(768)->Arg(10000)->Arg(100000); +BENCHMARK(BM_ReadListFloatNullable)->Arg(80)->Arg(768)->Arg(10000)->Arg(100000); +BENCHMARK(BM_RoundtripListFloatNullable)->Arg(80)->Arg(768)->Arg(10000)->Arg(100000); +BENCHMARK(BM_WriteFixedSizeListFloatLegacy)->Arg(80)->Arg(768)->Arg(10000)->Arg(100000); +BENCHMARK(BM_ReadFixedSizeListFloatLegacy)->Arg(80)->Arg(768)->Arg(10000)->Arg(100000); +BENCHMARK(BM_RoundtripFixedSizeListFloatLegacy) + ->Arg(80) + ->Arg(768) + ->Arg(10000) + ->Arg(100000); +BENCHMARK(BM_WriteFixedSizeListFloatLegacyNullable) + ->Arg(80) + ->Arg(768) + ->Arg(10000) + ->Arg(100000); +BENCHMARK(BM_ReadFixedSizeListFloatLegacyNullable) + ->Arg(80) + ->Arg(768) + ->Arg(10000) + ->Arg(100000); +BENCHMARK(BM_RoundtripFixedSizeListFloatLegacyNullable) + ->Arg(80) + ->Arg(768) + ->Arg(10000) + ->Arg(100000); +BENCHMARK(BM_WriteFixedSizeListFloatVector)->Arg(80)->Arg(768)->Arg(10000)->Arg(100000); +BENCHMARK(BM_RoundtripMixedColumnsLegacy) + ->ArgName("vector_size") + ->RangeMultiplier(2) + ->Range(1 << 2, 1 << 20); +BENCHMARK(BM_RoundtripMixedColumnsVector) + ->ArgName("vector_size") + ->RangeMultiplier(2) + ->Range(1 << 2, 1 << 20); +BENCHMARK(BM_ReadFixedSizeListFloatVector)->Arg(80)->Arg(768)->Arg(10000)->Arg(100000); +BENCHMARK(BM_RoundtripFixedSizeListFloatVector) + ->Arg(80) + ->Arg(768) + ->Arg(10000) + ->Arg(100000); +BENCHMARK(BM_WriteFixedSizeListFloatVectorNullable) + ->Arg(80) + ->Arg(768) + ->Arg(10000) + ->Arg(100000); +BENCHMARK(BM_ReadFixedSizeListFloatVectorNullable) + ->Arg(80) + ->Arg(768) + ->Arg(10000) + ->Arg(100000); +BENCHMARK(BM_RoundtripFixedSizeListFloatVectorNullable) + ->Arg(80) + ->Arg(768) + ->Arg(10000) + ->Arg(100000); +BENCHMARK(BM_WriteFixedSizeListStructLegacy)->Arg(80)->Arg(768)->Arg(10000)->Arg(100000); +BENCHMARK(BM_ReadFixedSizeListStructLegacy)->Arg(80)->Arg(768)->Arg(10000)->Arg(100000); +BENCHMARK(BM_RoundtripFixedSizeListStructLegacy) + ->Arg(80) + ->Arg(768) + ->Arg(10000) + ->Arg(100000); +BENCHMARK(BM_WriteFixedSizeListStructVector)->Arg(80)->Arg(768)->Arg(10000)->Arg(100000); +BENCHMARK(BM_ReadFixedSizeListStructVector)->Arg(80)->Arg(768)->Arg(10000)->Arg(100000); +BENCHMARK(BM_RoundtripFixedSizeListStructVector) + ->Arg(80) + ->Arg(768) + ->Arg(10000) + ->Arg(100000); +BENCHMARK(BM_WriteListFloatNullableElements)->Arg(80)->Arg(768)->Arg(10000)->Arg(100000); +BENCHMARK(BM_ReadListFloatNullableElements)->Arg(80)->Arg(768)->Arg(10000)->Arg(100000); +BENCHMARK(BM_RoundtripListFloatNullableElements) + ->Arg(80) + ->Arg(768) + ->Arg(10000) + ->Arg(100000); +BENCHMARK(BM_WriteFixedSizeListFloatVectorNullableElements) + ->Arg(80) + ->Arg(768) + ->Arg(10000) + ->Arg(100000); +BENCHMARK(BM_ReadFixedSizeListFloatVectorNullableElements) + ->Arg(80) + ->Arg(768) + ->Arg(10000) + ->Arg(100000); +BENCHMARK(BM_RoundtripFixedSizeListFloatVectorNullableElements) + ->Arg(80) + ->Arg(768) + ->Arg(10000) + ->Arg(100000); + // // Benchmark different ways of reading select row groups // diff --git a/cpp/src/parquet/arrow/schema.cc b/cpp/src/parquet/arrow/schema.cc index 9c4c462c6b8c..9688f504fbc3 100644 --- a/cpp/src/parquet/arrow/schema.cc +++ b/cpp/src/parquet/arrow/schema.cc @@ -30,6 +30,7 @@ #include "arrow/result.h" #include "arrow/status.h" #include "arrow/type.h" +#include "arrow/type_traits.h" #include "arrow/util/base64.h" #include "arrow/util/checked_cast.h" #include "arrow/util/key_value_metadata.h" @@ -111,6 +112,67 @@ Status ListToNode(const std::shared_ptr<::arrow::BaseListType>& type, return Status::OK(); } +Status FixedSizeListToNode(const std::shared_ptr<::arrow::FixedSizeListType>& type, + const std::string& name, bool nullable, int field_id, + const WriterProperties& properties, + const ArrowWriterProperties& arrow_properties, NodePtr* out) { + const auto& value_field = type->value_field(); + if (value_field->type()->id() != ::arrow::Type::STRUCT && + (::arrow::is_nested(*value_field->type()) || + !::arrow::is_fixed_width(*value_field->type()) || + value_field->type()->id() == ::arrow::Type::DICTIONARY || + value_field->type()->id() == ::arrow::Type::EXTENSION)) { + return Status::NotImplemented( + "VECTOR repetition only supports fixed-width primitive or struct FixedSizeList " + "elements"); + } + + NodePtr element; + std::string value_name = + arrow_properties.compliant_nested_types() ? "element" : value_field->name(); + RETURN_NOT_OK( + FieldToNode(value_name, value_field, properties, arrow_properties, &element)); + NodePtr vector_element; + if (element->is_primitive()) { + const auto& primitive = checked_cast(*element); + if (value_field->nullable()) { + // VECTOR itself carries the fixed multiplicity. A nullable child leaf below the + // VECTOR node carries element-level nullability, distinct from parent vector-row + // nullability. + vector_element = GroupNode::Make(value_name, Repetition::VECTOR, {element}, + /*logical_type=*/nullptr, primitive.field_id(), + type->list_size()); + } else { + vector_element = + PrimitiveNode::Make(value_name, Repetition::VECTOR, primitive.logical_type(), + primitive.physical_type(), primitive.type_length(), + primitive.field_id(), type->list_size()); + } + } else if (value_field->type()->id() == ::arrow::Type::STRUCT && element->is_group() && + !value_field->nullable()) { + const auto& group = checked_cast(*element); + schema::NodeVector fields; + fields.reserve(group.field_count()); + for (int i = 0; i < group.field_count(); ++i) { + if (!group.field(i)->is_primitive()) { + return Status::NotImplemented( + "VECTOR struct elements only support primitive fields in MVP"); + } + fields.push_back(group.field(i)); + } + vector_element = + GroupNode::Make(value_name, Repetition::VECTOR, fields, + /*logical_type=*/nullptr, group.field_id(), type->list_size()); + } else { + return Status::NotImplemented( + "VECTOR repetition only supports primitive or non-null struct FixedSizeList " + "elements"); + } + *out = GroupNode::Make(name, RepetitionFromNullable(nullable), {vector_element}, + /*logical_type=*/nullptr, field_id); + return Status::OK(); +} + Status MapToNode(const std::shared_ptr<::arrow::MapType>& type, const std::string& name, bool nullable, int field_id, const WriterProperties& properties, const ArrowWriterProperties& arrow_properties, NodePtr* out) { @@ -453,7 +515,18 @@ Status FieldToNode(const std::string& name, const std::shared_ptr& field, return StructToNode(struct_type, name, field->nullable(), field_id, properties, arrow_properties, out); } - case ArrowTypeId::FIXED_SIZE_LIST: + case ArrowTypeId::FIXED_SIZE_LIST: { + auto list_type = + std::static_pointer_cast<::arrow::FixedSizeListType>(field->type()); + if (arrow_properties.write_fixed_size_list_as_vector()) { + return FixedSizeListToNode(list_type, name, field->nullable(), field_id, + properties, arrow_properties, out); + } + auto base_list_type = + std::static_pointer_cast<::arrow::BaseListType>(field->type()); + return ListToNode(base_list_type, name, field->nullable(), field_id, properties, + arrow_properties, out); + } case ArrowTypeId::LARGE_LIST: case ArrowTypeId::LIST: { auto list_type = std::static_pointer_cast<::arrow::BaseListType>(field->type()); @@ -574,6 +647,13 @@ Status PopulateLeaf(int column_index, const std::shared_ptr& field, return Status::OK(); } +void MarkVectorSubtree(SchemaField* field) { + field->is_vector = true; + for (auto& child : field->children) { + MarkVectorSubtree(&child); + } +} + // Special case mentioned in the format spec: // If the name is array or uses the parent's name with `_tuple` appended, // this should be: @@ -618,6 +698,79 @@ Status ListToSchemaField(const GroupNode& group, LevelInfo current_levels, SchemaTreeContext* ctx, const SchemaField* parent, SchemaField* out); +bool IsVectorGroup(const GroupNode& group) { + return group.field_count() == 1 && group.field(0)->is_vector(); +} + +Status VectorToSchemaField(const GroupNode& group, LevelInfo current_levels, + SchemaTreeContext* ctx, const SchemaField* parent, + SchemaField* out) { + if (group.is_repeated()) { + return Status::NotImplemented("VECTOR groups must not be repeated in MVP"); + } + if (group.field_count() != 1) { + return Status::Invalid("VECTOR groups must have a single child"); + } + const Node& child = *group.field(0); + if (!child.is_vector()) { + return Status::Invalid("VECTOR groups must contain a VECTOR child"); + } + if (child.is_optional() || child.is_repeated()) { + return Status::Invalid("VECTOR children must not also be OPTIONAL or REPEATED"); + } + + if (group.is_optional()) { + current_levels.IncrementOptional(); + } + const LevelInfo vector_level = current_levels; + + out->children.resize(1); + SchemaField* child_field = &out->children[0]; + ctx->LinkParent(out, parent); + ctx->LinkParent(child_field, out); + + if (child.is_primitive()) { + const auto& primitive_node = static_cast(child); + int column_index = ctx->schema->GetColumnIndex(primitive_node); + ARROW_ASSIGN_OR_RAISE(std::shared_ptr type, + GetTypeForNode(column_index, primitive_node, ctx)); + auto item_field = ::arrow::field(child.name(), type, /*nullable=*/false, + FieldIdMetadata(child.field_id())); + RETURN_NOT_OK( + PopulateLeaf(column_index, item_field, current_levels, ctx, out, child_field)); + } else { + const auto& vector_group = static_cast(child); + if (vector_group.field_count() == 1 && vector_group.field(0)->is_primitive()) { + const Node& element = *vector_group.field(0); + if (element.is_repeated() || element.is_vector()) { + return Status::Invalid("VECTOR element children must not be REPEATED or VECTOR"); + } + bool element_nullable = element.is_optional(); + if (element_nullable) { + current_levels.IncrementOptional(); + } + const auto& primitive_node = static_cast(element); + int column_index = ctx->schema->GetColumnIndex(primitive_node); + ARROW_ASSIGN_OR_RAISE(std::shared_ptr type, + GetTypeForNode(column_index, primitive_node, ctx)); + auto item_field = ::arrow::field(element.name(), type, element_nullable, + FieldIdMetadata(element.field_id())); + RETURN_NOT_OK( + PopulateLeaf(column_index, item_field, current_levels, ctx, out, child_field)); + } else { + RETURN_NOT_OK(GroupToStruct(vector_group, current_levels, ctx, out, child_field)); + } + } + + MarkVectorSubtree(child_field); + out->field = ::arrow::field( + group.name(), ::arrow::fixed_size_list(child_field->field, child.vector_length()), + group.is_optional(), FieldIdMetadata(group.field_id())); + out->level_info = vector_level; + out->is_vector = true; + return Status::OK(); +} + Status MapToSchemaField(const GroupNode& group, LevelInfo current_levels, SchemaTreeContext* ctx, const SchemaField* parent, SchemaField* out) { @@ -842,6 +995,9 @@ Status ListToSchemaField(const GroupNode& group, LevelInfo current_levels, Status GroupToSchemaField(const GroupNode& node, LevelInfo current_levels, SchemaTreeContext* ctx, const SchemaField* parent, SchemaField* out) { + if (IsVectorGroup(node)) { + return VectorToSchemaField(node, current_levels, ctx, parent, out); + } if (node.logical_type()->is_list()) { return ListToSchemaField(node, current_levels, ctx, parent, out); } else if (node.logical_type()->is_map()) { @@ -1126,6 +1282,16 @@ Result ApplyOriginalStorageMetadata(const Field& origin_field, } } + if (origin_type->id() == ::arrow::Type::HALF_FLOAT && + inferred_type->id() == ::arrow::Type::FIXED_SIZE_BINARY && + checked_cast(*inferred_type).byte_width() == + static_cast(sizeof(uint16_t))) { + // Older float16 files may only carry the original Arrow schema metadata and not + // the Parquet FLOAT16 logical type. Restore the intended Arrow type. + inferred->field = inferred->field->WithType(origin_type); + modified = true; + } + // Restore field metadata std::shared_ptr field_metadata = origin_field.metadata(); if (field_metadata != nullptr) { diff --git a/cpp/src/parquet/arrow/schema.h b/cpp/src/parquet/arrow/schema.h index dd60fde43422..7ab0cc31dd05 100644 --- a/cpp/src/parquet/arrow/schema.h +++ b/cpp/src/parquet/arrow/schema.h @@ -96,6 +96,11 @@ struct PARQUET_EXPORT SchemaField { parquet::internal::LevelInfo level_info; + // True when this Arrow field is backed by a Parquet VECTOR node. VECTOR may + // be represented either directly as a primitive leaf (non-nullable elements) + // or as an intermediate VECTOR group containing a nullable element leaf. + bool is_vector = false; + bool is_leaf() const { return column_index != -1; } }; diff --git a/cpp/src/parquet/arrow/writer.cc b/cpp/src/parquet/arrow/writer.cc index 4b2b06e5e097..c0116cb4c248 100644 --- a/cpp/src/parquet/arrow/writer.cc +++ b/cpp/src/parquet/arrow/writer.cc @@ -20,12 +20,14 @@ #include #include #include +#include #include #include #include #include #include "arrow/array.h" +#include "arrow/array/concatenate.h" #include "arrow/extension_type.h" #include "arrow/ipc/writer.h" #include "arrow/record_batch.h" @@ -107,6 +109,58 @@ bool HasNullableRoot(const SchemaManifest& schema_manifest, return nullable; } +int64_t CountVisitedValues(const std::vector& visited_elements) { + return std::accumulate( + visited_elements.begin(), visited_elements.end(), int64_t{0}, + [](int64_t total, const ElementRange& range) { return total + range.Size(); }); +} + +Result> MaterializeVectorLeafArray( + const MultipathLevelBuilderResult& result, ArrowWriteContext* ctx, + bool* leaf_is_nullable) { + const auto& visited_elements = result.post_list_visited_elements; + DCHECK_GT(visited_elements.size(), 0); + + // Nullable VECTOR rows still need one child slot per vector element so that the + // generic spaced leaf writer can align leaf slots with the VECTOR def levels. Arrow + // FixedSizeList already stores child slots for null parent rows, so preserve a + // zero-copy slice over the complete child range and let WriteArrow's def-level-derived + // validity bitmap suppress values belonging to null vector rows. + if (CountVisitedValues(visited_elements) == result.def_rep_level_count && + visited_elements.size() == 1) { + const ElementRange& range = visited_elements[0]; + auto values = result.leaf_array->Slice(range.start, range.Size()); + if (values->null_count() != 0) { + *leaf_is_nullable = true; + } + return values; + } + + ::arrow::ArrayVector parts; + parts.reserve(visited_elements.size() * 2 + 1); + int64_t position = 0; + for (const ElementRange& range : visited_elements) { + if (range.start > position) { + ARROW_ASSIGN_OR_RAISE( + auto null_values, + ::arrow::MakeArrayOfNull(result.leaf_array->type(), range.start - position, + ctx->memory_pool)); + parts.push_back(std::move(null_values)); + } + parts.push_back(result.leaf_array->Slice(range.start, range.Size())); + position = range.end; + } + if (position < result.def_rep_level_count) { + ARROW_ASSIGN_OR_RAISE(auto null_values, + ::arrow::MakeArrayOfNull(result.leaf_array->type(), + result.def_rep_level_count - position, + ctx->memory_pool)); + parts.push_back(std::move(null_values)); + } + *leaf_is_nullable = true; + return ::arrow::Concatenate(parts, ctx->memory_pool); +} + Status GetSchemaMetadata(const ::arrow::Schema& schema, ::arrow::MemoryPool* pool, const ArrowWriterProperties& properties, std::shared_ptr* out) { @@ -134,6 +188,41 @@ Status GetSchemaMetadata(const ::arrow::Schema& schema, ::arrow::MemoryPool* poo return Status::OK(); } +Status ValidateVectorColumnProperties(const SchemaDescriptor* schema, + const WriterProperties& properties) { + for (int i = 0; i < schema->num_columns(); ++i) { + const ColumnDescriptor* column = schema->Column(i); + if (!column->schema_node()->is_vector()) { + continue; + } + const auto& path = column->path(); + if (properties.dictionary_enabled(path)) { + return Status::Invalid( + "Experimental VECTOR encoding does not support dictionary " + "encoding for column '", + path->ToDotString(), "'"); + } + if (properties.encoding(path) != Encoding::PLAIN) { + return Status::Invalid( + "Experimental VECTOR encoding currently requires PLAIN encoding for column '", + path->ToDotString(), "'"); + } + if (properties.statistics_enabled(path)) { + return Status::Invalid( + "Experimental VECTOR encoding does not support statistics " + "for column '", + path->ToDotString(), "'"); + } + if (properties.page_index_enabled(path)) { + return Status::Invalid( + "Experimental VECTOR encoding does not support page index " + "for column '", + path->ToDotString(), "'"); + } + } + return Status::OK(); +} + // Manages writing nested parquet columns with support for all nested types // supported by parquet. class ArrowColumnWriterV2 { @@ -169,17 +258,22 @@ class ArrowColumnWriterV2 { leaf_idx, ctx, [&](const MultipathLevelBuilderResult& result) { size_t visited_component_size = result.post_list_visited_elements.size(); DCHECK_GT(visited_component_size, 0); - if (visited_component_size != 1) { + std::shared_ptr values_array; + bool leaf_is_nullable = result.leaf_is_nullable; + if (result.leaf_is_vector) { + ARROW_ASSIGN_OR_RAISE(values_array, MaterializeVectorLeafArray( + result, ctx, &leaf_is_nullable)); + } else if (visited_component_size == 1) { + const ElementRange& range = result.post_list_visited_elements[0]; + values_array = result.leaf_array->Slice(range.start, range.Size()); + } else { return Status::NotImplemented( "Lists with non-zero length null components are not supported"); } - const ElementRange& range = result.post_list_visited_elements[0]; - std::shared_ptr values_array = - result.leaf_array->Slice(range.start, range.Size()); return column_writer->WriteArrow(result.def_levels, result.rep_levels, result.def_rep_level_count, *values_array, - ctx, result.leaf_is_nullable); + ctx, leaf_is_nullable); })); } @@ -201,7 +295,7 @@ class ArrowColumnWriterV2 { static ::arrow::Result> Make( const ChunkedArray& data, int64_t offset, const int64_t size, const SchemaManifest& schema_manifest, RowGroupWriter* row_group_writer, - int start_leaf_column_index = -1) { + bool write_fixed_size_list_as_vector, int start_leaf_column_index = -1) { int64_t absolute_position = 0; int chunk_index = 0; int64_t chunk_offset = 0; @@ -271,8 +365,10 @@ class ArrowColumnWriterV2 { std::shared_ptr array_to_write = chunk.Slice(chunk_offset, chunk_write_size); if (array_to_write->length() > 0) { - ARROW_ASSIGN_OR_RAISE(std::unique_ptr builder, - MultipathLevelBuilder::Make(*array_to_write, is_nullable)); + ARROW_ASSIGN_OR_RAISE( + std::unique_ptr builder, + MultipathLevelBuilder::Make(*array_to_write, is_nullable, + write_fixed_size_list_as_vector)); if (leaf_count != builder->GetLeafCount()) { return Status::UnknownError("data type leaf_count != builder_leaf_count", leaf_count, " ", builder->GetLeafCount()); @@ -328,6 +424,8 @@ class FileWriterImpl : public FileWriter { } Status Init() { + RETURN_NOT_OK( + ValidateVectorColumnProperties(writer_->schema(), *writer_->properties())); return SchemaManifest::Make(writer_->schema(), /*schema_metadata=*/nullptr, default_arrow_reader_properties(), &schema_manifest_); } @@ -375,10 +473,10 @@ class FileWriterImpl : public FileWriter { if (row_group_writer_->buffered()) { return Status::Invalid("Cannot write column chunk into the buffered row group."); } - ARROW_ASSIGN_OR_RAISE( - std::unique_ptr writer, - ArrowColumnWriterV2::Make(*data, offset, size, schema_manifest_, - row_group_writer_)); + ARROW_ASSIGN_OR_RAISE(std::unique_ptr writer, + ArrowColumnWriterV2::Make( + *data, offset, size, schema_manifest_, row_group_writer_, + arrow_properties_->write_fixed_size_list_as_vector())); return writer->Write(&column_write_context_); } @@ -459,8 +557,10 @@ class FileWriterImpl : public FileWriter { ChunkedArray chunked_array{batch.column(i)}; ARROW_ASSIGN_OR_RAISE( std::unique_ptr writer, - ArrowColumnWriterV2::Make(chunked_array, offset, size, schema_manifest_, - row_group_writer_, column_index_start)); + ArrowColumnWriterV2::Make( + chunked_array, offset, size, schema_manifest_, row_group_writer_, + arrow_properties_->write_fixed_size_list_as_vector(), + column_index_start)); column_index_start += writer->leaf_count(); if (arrow_properties_->use_threads()) { writers.emplace_back(std::move(writer)); diff --git a/cpp/src/parquet/column_writer.cc b/cpp/src/parquet/column_writer.cc index b3ed46ee2d28..e3e250e731b1 100644 --- a/cpp/src/parquet/column_writer.cc +++ b/cpp/src/parquet/column_writer.cc @@ -1185,6 +1185,31 @@ inline void DoInBatchesNonRepeated(int64_t num_levels, int64_t batch_size, } } +template +inline void DoInBatchesVectorNonRepeated(int64_t num_levels, int64_t batch_size, + int64_t max_rows_per_page, int32_t vector_length, + Action&& action, + GetBufferedRows&& curr_page_buffered_rows) { + ARROW_DCHECK_GT(vector_length, 0); + ARROW_DCHECK_EQ(num_levels % vector_length, 0); + + const int64_t total_rows = num_levels / vector_length; + int64_t row_offset = 0; + while (row_offset < total_rows) { + int64_t page_buffered_rows = curr_page_buffered_rows(); + ARROW_DCHECK_LE(page_buffered_rows, max_rows_per_page); + + int64_t max_batch_rows = std::max(1, batch_size / vector_length); + max_batch_rows = std::min(max_batch_rows, total_rows - row_offset); + max_batch_rows = std::min(max_batch_rows, max_rows_per_page - page_buffered_rows); + int64_t level_offset = row_offset * vector_length; + int64_t level_count = max_batch_rows * vector_length; + + action(level_offset, level_count, /*check_page_limit=*/true); + row_offset += max_batch_rows; + } +} + // DoInBatches for repeated columns template inline void DoInBatchesRepeated(const int16_t* def_levels, const int16_t* rep_levels, @@ -1240,12 +1265,20 @@ inline void DoInBatchesRepeated(const int16_t* def_levels, const int16_t* rep_le template inline void DoInBatches(const int16_t* def_levels, const int16_t* rep_levels, int64_t num_levels, int64_t batch_size, int64_t max_rows_per_page, - bool pages_change_on_record_boundaries, Action&& action, + bool pages_change_on_record_boundaries, bool is_vector, + int32_t vector_length, Action&& action, GetBufferedRows&& curr_page_buffered_rows) { if (!rep_levels) { - DoInBatchesNonRepeated(num_levels, batch_size, max_rows_per_page, - std::forward(action), - std::forward(curr_page_buffered_rows)); + if (is_vector) { + DoInBatchesVectorNonRepeated( + num_levels, batch_size, max_rows_per_page, vector_length, + std::forward(action), + std::forward(curr_page_buffered_rows)); + } else { + DoInBatchesNonRepeated(num_levels, batch_size, max_rows_per_page, + std::forward(action), + std::forward(curr_page_buffered_rows)); + } } else { DoInBatchesRepeated(def_levels, rep_levels, num_levels, batch_size, max_rows_per_page, pages_change_on_record_boundaries, std::forward(action), @@ -1366,7 +1399,9 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, }; DoInBatches(def_levels, rep_levels, num_values, properties_->write_batch_size(), properties_->max_rows_per_page(), pages_change_on_record_boundaries(), - WriteChunk, [this]() { return num_buffered_rows_; }); + descr_->schema_node()->is_vector(), + descr_->schema_node()->vector_length(), WriteChunk, + [this]() { return num_buffered_rows_; }); return value_offset; } @@ -1417,16 +1452,21 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, }; DoInBatches(def_levels, rep_levels, num_values, properties_->write_batch_size(), properties_->max_rows_per_page(), pages_change_on_record_boundaries(), - WriteChunk, [this]() { return num_buffered_rows_; }); + descr_->schema_node()->is_vector(), + descr_->schema_node()->vector_length(), WriteChunk, + [this]() { return num_buffered_rows_; }); } Status WriteArrow(const int16_t* def_levels, const int16_t* rep_levels, int64_t num_levels, const ::arrow::Array& leaf_array, ArrowWriteContext* ctx, bool leaf_field_nullable) override { BEGIN_PARQUET_CATCH_EXCEPTIONS + const bool is_vector = descr_->schema_node()->is_vector(); // Leaf nulls are canonical when there is only a single null element after a list - // and it is at the leaf. + // and it is at the leaf. VECTOR parent nulls may also be materialized as spaced + // null slots in the leaf array, so do not treat those as single nullable elements. bool single_nullable_element = + !is_vector && (level_info_.def_level == level_info_.repeated_ancestor_def_level + 1) && leaf_field_nullable; if (!leaf_field_nullable && leaf_array.null_count() != 0) { @@ -1700,11 +1740,21 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, } WriteRepetitionLevels(num_levels, rep_levels); + } else if (descr_->schema_node()->is_vector()) { + const int32_t vector_length = descr_->schema_node()->vector_length(); + if (vector_length <= 0 || num_levels % vector_length != 0) { + throw ParquetException("VECTOR columns must be written in whole-vector batches"); + } + rows_written_ += num_levels / vector_length; + num_buffered_rows_ += num_levels / vector_length; } else { // Each value is exactly one row rows_written_ += num_levels; num_buffered_rows_ += num_levels; } + if (descr_->schema_node()->is_vector() && rep_levels != nullptr) { + throw ParquetException("VECTOR columns must not write repetition levels"); + } return values_to_write; } @@ -1794,11 +1844,21 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, } } WriteRepetitionLevels(num_levels, rep_levels); + } else if (descr_->schema_node()->is_vector()) { + const int32_t vector_length = descr_->schema_node()->vector_length(); + if (vector_length <= 0 || num_levels % vector_length != 0) { + throw ParquetException("VECTOR columns must be written in whole-vector batches"); + } + rows_written_ += num_levels / vector_length; + num_buffered_rows_ += num_levels / vector_length; } else { // Each value is exactly one row rows_written_ += num_levels; num_buffered_rows_ += num_levels; } + if (descr_->schema_node()->is_vector() && rep_levels != nullptr) { + throw ParquetException("VECTOR columns must not write repetition levels"); + } } void UpdateLevelHistogram(int64_t num_levels, const int16_t* def_levels, @@ -2076,10 +2136,11 @@ Status TypedColumnWriterImpl::WriteArrowDictionary( return WriteDense(); } - PARQUET_CATCH_NOT_OK( - DoInBatches(def_levels, rep_levels, num_levels, properties_->write_batch_size(), - properties_->max_rows_per_page(), pages_change_on_record_boundaries(), - WriteIndicesChunk, [this]() { return num_buffered_rows_; })); + PARQUET_CATCH_NOT_OK(DoInBatches( + def_levels, rep_levels, num_levels, properties_->write_batch_size(), + properties_->max_rows_per_page(), pages_change_on_record_boundaries(), + descr_->schema_node()->is_vector(), descr_->schema_node()->vector_length(), + WriteIndicesChunk, [this]() { return num_buffered_rows_; })); return Status::OK(); } @@ -2538,10 +2599,11 @@ Status TypedColumnWriterImpl::WriteArrowDense( value_offset += batch_num_spaced_values; }; - PARQUET_CATCH_NOT_OK( - DoInBatches(def_levels, rep_levels, num_levels, properties_->write_batch_size(), - properties_->max_rows_per_page(), pages_change_on_record_boundaries(), - WriteChunk, [this]() { return num_buffered_rows_; })); + PARQUET_CATCH_NOT_OK(DoInBatches( + def_levels, rep_levels, num_levels, properties_->write_batch_size(), + properties_->max_rows_per_page(), pages_change_on_record_boundaries(), + descr_->schema_node()->is_vector(), descr_->schema_node()->vector_length(), + WriteChunk, [this]() { return num_buffered_rows_; })); return Status::OK(); } diff --git a/cpp/src/parquet/parquet.thrift b/cpp/src/parquet/parquet.thrift index e3cc5adb9648..58b64469ee3b 100644 --- a/cpp/src/parquet/parquet.thrift +++ b/cpp/src/parquet/parquet.thrift @@ -190,6 +190,12 @@ enum FieldRepetitionType { /** The field is repeated and can contain 0 or more values */ REPEATED = 2; + + /** + * This field repeats a fixed number of times per parent value without increasing + * the maximum definition or repetition level of its descendants. + */ + VECTOR = 3; } /** @@ -550,13 +556,16 @@ struct SchemaElement { */ 9: optional i32 field_id; + /** Required when repetition_type is VECTOR. May be 0 for zero-length vectors. */ + 10: optional i32 vector_length; + /** * The logical type of this SchemaElement * * LogicalType replaces ConvertedType, but ConvertedType is still required * for some logical types to ensure forward-compatibility in format v1. */ - 10: optional LogicalType logicalType + 11: optional LogicalType logicalType } /** diff --git a/cpp/src/parquet/properties.h b/cpp/src/parquet/properties.h index 6634bac4f684..20d097ad6c37 100644 --- a/cpp/src/parquet/properties.h +++ b/cpp/src/parquet/properties.h @@ -1298,6 +1298,7 @@ class PARQUET_EXPORT ArrowWriterProperties { truncated_timestamps_allowed_(false), store_schema_(false), compliant_nested_types_(true), + write_fixed_size_list_as_vector_(false), engine_version_(V2), use_threads_(kArrowDefaultUseThreads), executor_(NULLPTR), @@ -1367,6 +1368,17 @@ class PARQUET_EXPORT ArrowWriterProperties { return this; } + /// \brief EXPERIMENTAL: encode Arrow FixedSizeList as Parquet VECTOR. + Builder* enable_experimental_vector_encoding() { + write_fixed_size_list_as_vector_ = true; + return this; + } + + Builder* disable_experimental_vector_encoding() { + write_fixed_size_list_as_vector_ = false; + return this; + } + /// Set the version of the Parquet writer engine. Builder* set_engine_version(EngineVersion version) { engine_version_ = version; @@ -1409,7 +1421,8 @@ class PARQUET_EXPORT ArrowWriterProperties { return std::shared_ptr(new ArrowWriterProperties( write_timestamps_as_int96_, coerce_timestamps_enabled_, coerce_timestamps_unit_, truncated_timestamps_allowed_, store_schema_, compliant_nested_types_, - engine_version_, use_threads_, executor_, write_time_adjusted_to_utc_)); + write_fixed_size_list_as_vector_, engine_version_, use_threads_, executor_, + write_time_adjusted_to_utc_)); } private: @@ -1421,6 +1434,7 @@ class PARQUET_EXPORT ArrowWriterProperties { bool store_schema_; bool compliant_nested_types_; + bool write_fixed_size_list_as_vector_; EngineVersion engine_version_; bool use_threads_; @@ -1447,6 +1461,10 @@ class PARQUET_EXPORT ArrowWriterProperties { /// "element". bool compliant_nested_types() const { return compliant_nested_types_; } + bool write_fixed_size_list_as_vector() const { + return write_fixed_size_list_as_vector_; + } + /// \brief The underlying engine version to use when writing Arrow data. /// /// V2 is currently the latest V1 is considered deprecated but left in @@ -1471,6 +1489,7 @@ class PARQUET_EXPORT ArrowWriterProperties { ::arrow::TimeUnit::type coerce_timestamps_unit, bool truncated_timestamps_allowed, bool store_schema, bool compliant_nested_types, + bool write_fixed_size_list_as_vector, EngineVersion engine_version, bool use_threads, ::arrow::internal::Executor* executor, bool write_time_adjusted_to_utc) @@ -1480,6 +1499,7 @@ class PARQUET_EXPORT ArrowWriterProperties { truncated_timestamps_allowed_(truncated_timestamps_allowed), store_schema_(store_schema), compliant_nested_types_(compliant_nested_types), + write_fixed_size_list_as_vector_(write_fixed_size_list_as_vector), engine_version_(engine_version), use_threads_(use_threads), executor_(executor), @@ -1491,6 +1511,7 @@ class PARQUET_EXPORT ArrowWriterProperties { const bool truncated_timestamps_allowed_; const bool store_schema_; const bool compliant_nested_types_; + const bool write_fixed_size_list_as_vector_; const EngineVersion engine_version_; const bool use_threads_; ::arrow::internal::Executor* executor_; diff --git a/cpp/src/parquet/schema.cc b/cpp/src/parquet/schema.cc index 0cfa49c21c16..f069c5b74153 100644 --- a/cpp/src/parquet/schema.cc +++ b/cpp/src/parquet/schema.cc @@ -50,6 +50,16 @@ void CheckColumnBounds(int column_index, size_t max_columns) { } } +void ValidateVectorProperties(Repetition::type repetition, int32_t vector_length) { + if (repetition == Repetition::VECTOR) { + if (vector_length < 0) { + throw ParquetException("VECTOR nodes must specify a non-negative vector_length"); + } + } else if (vector_length != -1) { + throw ParquetException("Only VECTOR nodes may specify vector_length"); + } +} + } // namespace namespace schema { @@ -117,7 +127,7 @@ const std::shared_ptr Node::path() const { bool Node::EqualsInternal(const Node* other) const { return type_ == other->type_ && name_ == other->name_ && repetition_ == other->repetition_ && converted_type_ == other->converted_type_ && - field_id_ == other->field_id() && + field_id_ == other->field_id() && vector_length_ == other->vector_length() && logical_type_->Equals(*(other->logical_type())); } @@ -128,10 +138,12 @@ void Node::SetParent(const Node* parent) { parent_ = parent; } PrimitiveNode::PrimitiveNode(const std::string& name, Repetition::type repetition, Type::type type, ConvertedType::type converted_type, - int length, int precision, int scale, int id) - : Node(Node::PRIMITIVE, name, repetition, converted_type, id), + int length, int precision, int scale, int id, + int32_t vector_length) + : Node(Node::PRIMITIVE, name, repetition, converted_type, id, vector_length), physical_type_(type), type_length_(length) { + ValidateVectorProperties(repetition, vector_length); std::stringstream ss; // PARQUET-842: In an earlier revision, decimal_metadata_.isset was being @@ -241,10 +253,12 @@ PrimitiveNode::PrimitiveNode(const std::string& name, Repetition::type repetitio PrimitiveNode::PrimitiveNode(const std::string& name, Repetition::type repetition, std::shared_ptr logical_type, - Type::type physical_type, int physical_length, int id) - : Node(Node::PRIMITIVE, name, repetition, std::move(logical_type), id), + Type::type physical_type, int physical_length, int id, + int32_t vector_length) + : Node(Node::PRIMITIVE, name, repetition, std::move(logical_type), id, vector_length), physical_type_(physical_type), type_length_(physical_length) { + ValidateVectorProperties(repetition, vector_length); std::stringstream error; if (logical_type_) { // Check for logical type <=> node type consistency @@ -315,8 +329,11 @@ void PrimitiveNode::VisitConst(Node::ConstVisitor* visitor) const { // Group node GroupNode::GroupNode(const std::string& name, Repetition::type repetition, - const NodeVector& fields, ConvertedType::type converted_type, int id) - : Node(Node::GROUP, name, repetition, converted_type, id), fields_(fields) { + const NodeVector& fields, ConvertedType::type converted_type, int id, + int32_t vector_length) + : Node(Node::GROUP, name, repetition, converted_type, id, vector_length), + fields_(fields) { + ValidateVectorProperties(repetition, vector_length); // For forward compatibility, create an equivalent logical type logical_type_ = LogicalType::FromConvertedType(converted_type_); if (!(logical_type_ && (logical_type_->is_nested() || logical_type_->is_none()) && @@ -334,8 +351,11 @@ GroupNode::GroupNode(const std::string& name, Repetition::type repetition, GroupNode::GroupNode(const std::string& name, Repetition::type repetition, const NodeVector& fields, - std::shared_ptr logical_type, int id) - : Node(Node::GROUP, name, repetition, std::move(logical_type), id), fields_(fields) { + std::shared_ptr logical_type, int id, + int32_t vector_length) + : Node(Node::GROUP, name, repetition, std::move(logical_type), id, vector_length), + fields_(fields) { + ValidateVectorProperties(repetition, vector_length); if (logical_type_) { // Check for logical type <=> node type consistency if (logical_type_->is_nested()) { @@ -423,19 +443,23 @@ std::unique_ptr GroupNode::FromParquet(const void* opaque_element, if (element->__isset.field_id) { field_id = element->field_id; } + int32_t vector_length = -1; + if (element->__isset.vector_length) { + vector_length = element->vector_length; + } std::unique_ptr group_node; if (element->__isset.logicalType) { // updated writer with logical type present - group_node = std::unique_ptr( - new GroupNode(element->name, LoadEnumSafe(&element->repetition_type), fields, - LogicalType::FromThrift(element->logicalType), field_id)); + group_node = std::unique_ptr(new GroupNode( + element->name, LoadEnumSafe(&element->repetition_type), fields, + LogicalType::FromThrift(element->logicalType), field_id, vector_length)); } else { group_node = std::unique_ptr(new GroupNode( element->name, LoadEnumSafe(&element->repetition_type), fields, (element->__isset.converted_type ? LoadEnumSafe(&element->converted_type) : ConvertedType::NONE), - field_id)); + field_id, vector_length)); } return std::unique_ptr(group_node.release()); @@ -449,25 +473,30 @@ std::unique_ptr PrimitiveNode::FromParquet(const void* opaque_element) { if (element->__isset.field_id) { field_id = element->field_id; } + int32_t vector_length = -1; + if (element->__isset.vector_length) { + vector_length = element->vector_length; + } std::unique_ptr primitive_node; if (element->__isset.logicalType) { // updated writer with logical type present - primitive_node = std::unique_ptr( - new PrimitiveNode(element->name, LoadEnumSafe(&element->repetition_type), - LogicalType::FromThrift(element->logicalType), - LoadEnumSafe(&element->type), element->type_length, field_id)); - } else if (element->__isset.converted_type) { - // legacy writer with converted type present primitive_node = std::unique_ptr(new PrimitiveNode( element->name, LoadEnumSafe(&element->repetition_type), - LoadEnumSafe(&element->type), LoadEnumSafe(&element->converted_type), - element->type_length, element->precision, element->scale, field_id)); + LogicalType::FromThrift(element->logicalType), LoadEnumSafe(&element->type), + element->type_length, field_id, vector_length)); + } else if (element->__isset.converted_type) { + // legacy writer with converted type present + primitive_node = std::unique_ptr( + new PrimitiveNode(element->name, LoadEnumSafe(&element->repetition_type), + LoadEnumSafe(&element->type), + LoadEnumSafe(&element->converted_type), element->type_length, + element->precision, element->scale, field_id, vector_length)); } else { // logical type not present primitive_node = std::unique_ptr(new PrimitiveNode( element->name, LoadEnumSafe(&element->repetition_type), NoLogicalType::Make(), - LoadEnumSafe(&element->type), element->type_length, field_id)); + LoadEnumSafe(&element->type), element->type_length, field_id, vector_length)); } // Return as unique_ptr to the base type @@ -499,6 +528,9 @@ void GroupNode::ToParquet(void* opaque_element) const { if (field_id_ >= 0) { element->__set_field_id(field_id_); } + if (is_vector()) { + element->__set_vector_length(vector_length_); + } if (logical_type_ && logical_type_->is_serialized()) { element->__set_logicalType(logical_type_->ToThrift()); } @@ -524,6 +556,9 @@ void PrimitiveNode::ToParquet(void* opaque_element) const { if (field_id_ >= 0) { element->__set_field_id(field_id_); } + if (is_vector()) { + element->__set_vector_length(vector_length_); + } if (logical_type_ && logical_type_->is_serialized() && // TODO(tpboudreau): remove the following conjunct to enable serialization // of IntervalTypes after parquet.thrift recognizes them @@ -634,6 +669,9 @@ static void PrintRepLevel(Repetition::type repetition, std::ostream& stream) { case Repetition::REPEATED: stream << "repeated"; break; + case Repetition::VECTOR: + stream << "vector"; + break; default: break; } @@ -710,6 +748,9 @@ struct SchemaPrinter : public Node::ConstVisitor { stream_ << " "; PrintType(node, stream_); stream_ << " field_id=" << node->field_id() << " " << node->name(); + if (node->is_vector()) { + stream_ << " [" << node->vector_length() << "]"; + } PrintConvertedType(node, stream_); stream_ << ";" << std::endl; } @@ -718,6 +759,9 @@ struct SchemaPrinter : public Node::ConstVisitor { PrintRepLevel(node->repetition(), stream_); stream_ << " group " << "field_id=" << node->field_id() << " " << node->name(); + if (node->is_vector()) { + stream_ << " [" << node->vector_length() << "]"; + } auto lt = node->converted_type(); const auto& la = node->logical_type(); if (la && la->is_valid() && !la->is_none()) { @@ -836,6 +880,9 @@ void SchemaDescriptor::BuildTree(const NodePtr& node, int16_t max_def_level, // between an empty list and a list with an item in it. ++max_rep_level; ++max_def_level; + } else if (node->is_vector()) { + // VECTOR fields repeat a fixed number of times per parent value without + // increasing the maximum definition or repetition level. } // Now, walk the schema and create a ColumnDescriptor for each leaf node @@ -930,8 +977,13 @@ std::string ColumnDescriptor::ToString() const { << " physical_type: " << TypeToString(physical_type()) << "," << std::endl << " converted_type: " << ConvertedTypeToString(converted_type()) << "," << std::endl - << " logical_type: " << logical_type()->ToString() << "," << std::endl - << " max_definition_level: " << max_definition_level() << "," << std::endl + << " logical_type: " << logical_type()->ToString() << "," << std::endl; + + if (schema_node()->is_vector()) { + ss << " vector_length: " << schema_node()->vector_length() << "," << std::endl; + } + + ss << " max_definition_level: " << max_definition_level() << "," << std::endl << " max_repetition_level: " << max_repetition_level() << "," << std::endl; if (physical_type() == ::parquet::Type::FIXED_LEN_BYTE_ARRAY) { diff --git a/cpp/src/parquet/schema.h b/cpp/src/parquet/schema.h index 1addc73bd367..833b593c1be6 100644 --- a/cpp/src/parquet/schema.h +++ b/cpp/src/parquet/schema.h @@ -109,6 +109,8 @@ class PARQUET_EXPORT Node { bool is_repeated() const { return repetition_ == Repetition::REPEATED; } + bool is_vector() const { return repetition_ == Repetition::VECTOR; } + bool is_required() const { return repetition_ == Repetition::REQUIRED; } virtual bool Equals(const Node* other) const = 0; @@ -128,6 +130,10 @@ class PARQUET_EXPORT Node { /// Thrift. int field_id() const { return field_id_; } + /// \brief The fixed number of values per parent when repetition == VECTOR. + /// Returns -1 when this node is not VECTOR-repeated. + int32_t vector_length() const { return vector_length_; } + const Node* parent() const { return parent_; } const std::shared_ptr path() const; @@ -155,21 +161,25 @@ class PARQUET_EXPORT Node { friend class GroupNode; Node(Node::type type, const std::string& name, Repetition::type repetition, - ConvertedType::type converted_type = ConvertedType::NONE, int field_id = -1) + ConvertedType::type converted_type = ConvertedType::NONE, int field_id = -1, + int32_t vector_length = -1) : type_(type), name_(name), repetition_(repetition), converted_type_(converted_type), field_id_(field_id), + vector_length_(vector_length), parent_(NULLPTR) {} Node(Node::type type, const std::string& name, Repetition::type repetition, - std::shared_ptr logical_type, int field_id = -1) + std::shared_ptr logical_type, int field_id = -1, + int32_t vector_length = -1) : type_(type), name_(name), repetition_(repetition), logical_type_(std::move(logical_type)), field_id_(field_id), + vector_length_(vector_length), parent_(NULLPTR) {} Node::type type_; @@ -178,6 +188,7 @@ class PARQUET_EXPORT Node { ConvertedType::type converted_type_{ConvertedType::NONE}; std::shared_ptr logical_type_; int field_id_; + int32_t vector_length_; // Nodes should not be shared, they have a single parent. const Node* parent_; @@ -205,9 +216,9 @@ class PARQUET_EXPORT PrimitiveNode : public Node { Type::type type, ConvertedType::type converted_type = ConvertedType::NONE, int length = -1, int precision = -1, int scale = -1, - int field_id = -1) { + int field_id = -1, int32_t vector_length = -1) { return NodePtr(new PrimitiveNode(name, repetition, type, converted_type, length, - precision, scale, field_id)); + precision, scale, field_id, vector_length)); } // If no logical type, pass LogicalType::None() or nullptr @@ -215,9 +226,10 @@ class PARQUET_EXPORT PrimitiveNode : public Node { static inline NodePtr Make(const std::string& name, Repetition::type repetition, std::shared_ptr logical_type, Type::type primitive_type, int primitive_length = -1, - int field_id = -1) { + int field_id = -1, int32_t vector_length = -1) { return NodePtr(new PrimitiveNode(name, repetition, std::move(logical_type), - primitive_type, primitive_length, field_id)); + primitive_type, primitive_length, field_id, + vector_length)); } bool Equals(const Node* other) const override; @@ -239,11 +251,13 @@ class PARQUET_EXPORT PrimitiveNode : public Node { private: PrimitiveNode(const std::string& name, Repetition::type repetition, Type::type type, ConvertedType::type converted_type = ConvertedType::NONE, int length = -1, - int precision = -1, int scale = -1, int field_id = -1); + int precision = -1, int scale = -1, int field_id = -1, + int32_t vector_length = -1); PrimitiveNode(const std::string& name, Repetition::type repetition, std::shared_ptr logical_type, - Type::type primitive_type, int primitive_length = -1, int field_id = -1); + Type::type primitive_type, int primitive_length = -1, int field_id = -1, + int32_t vector_length = -1); Type::type physical_type_; int32_t type_length_; @@ -270,8 +284,9 @@ class PARQUET_EXPORT GroupNode : public Node { static inline NodePtr Make(const std::string& name, Repetition::type repetition, const NodeVector& fields, ConvertedType::type converted_type = ConvertedType::NONE, - int field_id = -1) { - return NodePtr(new GroupNode(name, repetition, fields, converted_type, field_id)); + int field_id = -1, int32_t vector_length = -1) { + return NodePtr( + new GroupNode(name, repetition, fields, converted_type, field_id, vector_length)); } // If no logical type, pass nullptr @@ -279,9 +294,9 @@ class PARQUET_EXPORT GroupNode : public Node { static inline NodePtr Make(const std::string& name, Repetition::type repetition, const NodeVector& fields, std::shared_ptr logical_type, - int field_id = -1) { - return NodePtr( - new GroupNode(name, repetition, fields, std::move(logical_type), field_id)); + int field_id = -1, int32_t vector_length = -1) { + return NodePtr(new GroupNode(name, repetition, fields, std::move(logical_type), + field_id, vector_length)); } bool Equals(const Node* other) const override; @@ -307,11 +322,12 @@ class PARQUET_EXPORT GroupNode : public Node { private: GroupNode(const std::string& name, Repetition::type repetition, const NodeVector& fields, - ConvertedType::type converted_type = ConvertedType::NONE, int field_id = -1); + ConvertedType::type converted_type = ConvertedType::NONE, int field_id = -1, + int32_t vector_length = -1); GroupNode(const std::string& name, Repetition::type repetition, const NodeVector& fields, std::shared_ptr logical_type, - int field_id = -1); + int field_id = -1, int32_t vector_length = -1); NodeVector fields_; bool EqualsInternal(const GroupNode* other) const; diff --git a/cpp/src/parquet/schema_test.cc b/cpp/src/parquet/schema_test.cc index 2950a7df70f8..9dc9d09463b9 100644 --- a/cpp/src/parquet/schema_test.cc +++ b/cpp/src/parquet/schema_test.cc @@ -43,7 +43,8 @@ namespace schema { static inline SchemaElement NewPrimitive(const std::string& name, FieldRepetitionType::type repetition, - Type::type type, int field_id = -1) { + Type::type type, int field_id = -1, + int32_t vector_length = -1) { SchemaElement result; result.__set_name(name); result.__set_repetition_type(repetition); @@ -51,12 +52,16 @@ static inline SchemaElement NewPrimitive(const std::string& name, if (field_id >= 0) { result.__set_field_id(field_id); } + if (vector_length >= 0) { + result.__set_vector_length(vector_length); + } return result; } static inline SchemaElement NewGroup(const std::string& name, FieldRepetitionType::type repetition, - int num_children, int field_id = -1) { + int num_children, int field_id = -1, + int32_t vector_length = -1) { SchemaElement result; result.__set_name(name); result.__set_repetition_type(repetition); @@ -65,6 +70,9 @@ static inline SchemaElement NewGroup(const std::string& name, if (field_id >= 0) { result.__set_field_id(field_id); } + if (vector_length >= 0) { + result.__set_vector_length(vector_length); + } return result; } @@ -217,6 +225,29 @@ TEST_F(TestPrimitiveNode, FromParquet) { ASSERT_EQ(12, prim_node_->decimal_metadata().precision); } +TEST_F(TestPrimitiveNode, VectorFromParquet) { + SchemaElement elt = + NewPrimitive(name_, FieldRepetitionType::VECTOR, Type::FLOAT, field_id_, 8); + + ASSERT_NO_FATAL_FAILURE(Convert(&elt)); + ASSERT_EQ(name_, prim_node_->name()); + ASSERT_EQ(field_id_, prim_node_->field_id()); + ASSERT_EQ(Repetition::VECTOR, prim_node_->repetition()); + ASSERT_TRUE(prim_node_->is_vector()); + ASSERT_EQ(8, prim_node_->vector_length()); + ASSERT_EQ(Type::FLOAT, prim_node_->physical_type()); +} + +TEST_F(TestPrimitiveNode, VectorValidation) { + ASSERT_THROW(PrimitiveNode::Make("vec", Repetition::VECTOR, Type::FLOAT), + ParquetException); + ASSERT_THROW(PrimitiveNode::Make("scalar", Repetition::REQUIRED, Type::FLOAT, + ConvertedType::NONE, -1, -1, -1, -1, 4), + ParquetException); + ASSERT_NO_THROW(PrimitiveNode::Make("empty", Repetition::VECTOR, Type::FLOAT, + ConvertedType::NONE, -1, -1, -1, -1, 0)); +} + TEST_F(TestPrimitiveNode, Equals) { PrimitiveNode node1("foo", Repetition::REQUIRED, Type::INT32); PrimitiveNode node2("foo", Repetition::REQUIRED, Type::INT64); @@ -365,6 +396,16 @@ TEST_F(TestGroupNode, Attrs) { ASSERT_EQ(ConvertedType::LIST, node2.converted_type()); } +TEST_F(TestGroupNode, VectorAttrs) { + auto node = GroupNode::Make("vec", Repetition::VECTOR, Fields1(), ConvertedType::NONE, + /*field_id=*/-1, + /*vector_length=*/4); + + ASSERT_TRUE(node->is_vector()); + ASSERT_EQ(Repetition::VECTOR, node->repetition()); + ASSERT_EQ(4, node->vector_length()); +} + TEST_F(TestGroupNode, Equals) { NodeVector f1 = Fields1(); NodeVector f2 = Fields1(); @@ -799,6 +840,21 @@ TEST_F(TestSchemaDescriptor, BuildTree) { ASSERT_EQ(nleaves, descr_.num_columns()); } +TEST_F(TestSchemaDescriptor, BuildTreeVector) { + NodePtr element = PrimitiveNode::Make("element", Repetition::VECTOR, Type::FLOAT, + ConvertedType::NONE, -1, -1, -1, -1, 3); + NodePtr embedding = GroupNode::Make("embedding", Repetition::OPTIONAL, {element}); + + descr_.Init(GroupNode::Make("schema", Repetition::REPEATED, {embedding})); + + ASSERT_EQ(1, descr_.num_columns()); + const ColumnDescriptor* col = descr_.Column(0); + EXPECT_EQ(1, col->max_definition_level()); + EXPECT_EQ(0, col->max_repetition_level()); + EXPECT_TRUE(col->schema_node()->is_vector()); + EXPECT_EQ(3, col->schema_node()->vector_length()); +} + TEST_F(TestSchemaDescriptor, HasRepeatedFields) { NodeVector fields; NodePtr schema; @@ -886,6 +942,22 @@ TEST(TestSchemaPrinter, Examples) { ASSERT_EQ(expected, result); } +TEST(TestSchemaPrinter, Vector) { + NodePtr schema = GroupNode::Make( + "schema", Repetition::REPEATED, + {GroupNode::Make("embedding", Repetition::OPTIONAL, + {PrimitiveNode::Make("element", Repetition::VECTOR, Type::FLOAT, + ConvertedType::NONE, -1, -1, -1, -1, 3)})}); + + std::string expected = R"(repeated group field_id=-1 schema { + optional group field_id=-1 embedding { + vector float field_id=-1 element [3]; + } +} +)"; + ASSERT_EQ(expected, Print(schema)); +} + static void ConfirmFactoryEquivalence( ConvertedType::type converted_type, const std::shared_ptr& from_make, diff --git a/cpp/src/parquet/types.h b/cpp/src/parquet/types.h index ad4df5119e75..74a9600b5aee 100644 --- a/cpp/src/parquet/types.h +++ b/cpp/src/parquet/types.h @@ -112,7 +112,13 @@ class LogicalType; // Mirrors parquet::FieldRepetitionType struct Repetition { - enum type { REQUIRED = 0, OPTIONAL = 1, REPEATED = 2, /*Always last*/ UNDEFINED = 3 }; + enum type { + REQUIRED = 0, + OPTIONAL = 1, + REPEATED = 2, + VECTOR = 3, + /*Always last*/ UNDEFINED = 4 + }; }; // Reference: