Skip to content

Commit cf205ae

Browse files
authored
GH-47027: [C++][Parquet] Fix repeated column pages not being written when reaching page size limit (#47032)
### Rationale for this change Ensures Parquet pages are written when the buffered data reaches the configured page size, while also ensuring pages are only split on record boundaries when required. Without this fix, page sizes can grow unbounded until the row group is closed. ### What changes are included in this PR? Fixes off-by-one error in logic to control when pages can be written. ### Are these changes tested? Yes, added a new unit test. ### Are there any user-facing changes? **This PR contains a "Critical Fix".** This bug could cause a crash when writing a large number of rows of a repeated column and reaching a page size > max int32. * GitHub Issue: #47027 Authored-by: Adam Reeve <adreeve@gmail.com> Signed-off-by: Adam Reeve <adreeve@gmail.com>
1 parent 2bfbfc8 commit cf205ae

2 files changed

Lines changed: 61 additions & 5 deletions

File tree

cpp/src/parquet/column_writer.cc

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1176,7 +1176,7 @@ inline void DoInBatches(const int16_t* def_levels, const int16_t* rep_levels,
11761176
while (offset < num_levels) {
11771177
int64_t end_offset = std::min(offset + batch_size, num_levels);
11781178

1179-
// Find next record boundary (i.e. ref_level = 0)
1179+
// Find next record boundary (i.e. rep_level = 0)
11801180
while (end_offset < num_levels && rep_levels[end_offset] != 0) {
11811181
end_offset++;
11821182
}
@@ -1196,13 +1196,14 @@ inline void DoInBatches(const int16_t* def_levels, const int16_t* rep_levels,
11961196
last_record_begin_offset--;
11971197
}
11981198

1199-
if (offset < last_record_begin_offset) {
1199+
if (offset <= last_record_begin_offset) {
12001200
// We have found the beginning of last record and can check page size.
12011201
action(offset, last_record_begin_offset - offset, /*check_page_size=*/true);
12021202
offset = last_record_begin_offset;
12031203
}
12041204

1205-
// There is no record boundary in this chunk and cannot check page size.
1205+
// Write remaining data after the record boundary,
1206+
// or all data if no boundary was found.
12061207
action(offset, end_offset - offset, /*check_page_size=*/false);
12071208
}
12081209

cpp/src/parquet/column_writer_test.cc

Lines changed: 57 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,7 @@ class TestPrimitiveWriter : public PrimitiveTypedTest<TestType> {
108108
const ColumnProperties& column_properties = ColumnProperties(),
109109
const ParquetVersion::type version = ParquetVersion::PARQUET_1_0,
110110
const ParquetDataPageVersion data_page_version = ParquetDataPageVersion::V1,
111-
bool enable_checksum = false) {
111+
bool enable_checksum = false, int64_t page_size = kDefaultDataPageSize) {
112112
sink_ = CreateOutputStream();
113113
WriterProperties::Builder wp_builder;
114114
wp_builder.version(version)->data_page_version(data_page_version);
@@ -124,6 +124,7 @@ class TestPrimitiveWriter : public PrimitiveTypedTest<TestType> {
124124
wp_builder.enable_page_checksum();
125125
}
126126
wp_builder.max_statistics_size(column_properties.max_statistics_size());
127+
wp_builder.data_pagesize(page_size);
127128
writer_properties_ = wp_builder.build();
128129

129130
metadata_ = ColumnChunkMetaDataBuilder::Make(writer_properties_, this->descr_);
@@ -938,6 +939,60 @@ TEST_F(TestByteArrayValuesWriter, CheckDefaultStats) {
938939
ASSERT_TRUE(this->metadata_is_stats_set());
939940
}
940941

942+
// Test for https://github.com/apache/arrow/issues/47027.
943+
// When writing a repeated column with page indexes enabled
944+
// and batches that are aligned with list boundaries,
945+
// pages should be written after reaching the page limit.
946+
TEST_F(TestValuesWriterInt32Type, PagesSplitWithListAlignedWrites) {
947+
this->SetUpSchema(Repetition::REPEATED);
948+
949+
constexpr int list_length = 10;
950+
constexpr int num_rows = 100;
951+
constexpr int64_t page_size = sizeof(int32_t) * 100;
952+
953+
this->GenerateData(num_rows * list_length);
954+
955+
std::vector<int16_t> repetition_levels(list_length, 1);
956+
repetition_levels[0] = 0;
957+
958+
ColumnProperties column_properties;
959+
column_properties.set_dictionary_enabled(false);
960+
column_properties.set_encoding(Encoding::PLAIN);
961+
column_properties.set_page_index_enabled(true);
962+
963+
auto writer =
964+
this->BuildWriter(list_length, column_properties, ParquetVersion::PARQUET_1_0,
965+
ParquetDataPageVersion::V1, false, page_size);
966+
967+
int64_t pages_written = 0;
968+
int64_t prev_bytes_written = 0;
969+
970+
for (int row_idx = 0; row_idx < num_rows; ++row_idx) {
971+
writer->WriteBatch(list_length, def_levels_.data(), repetition_levels.data(),
972+
values_ptr_ + row_idx * list_length);
973+
974+
int64_t bytes_written = writer->total_bytes_written();
975+
if (bytes_written != prev_bytes_written) {
976+
pages_written++;
977+
prev_bytes_written = bytes_written;
978+
}
979+
// Buffered bytes shouldn't grow larger than the specified page size
980+
ASSERT_LE(writer->estimated_buffered_value_bytes(), page_size);
981+
}
982+
983+
writer->Close();
984+
985+
// pages_written doesn't include the last page written when closing the writer:
986+
ASSERT_EQ(pages_written, 9);
987+
988+
this->SetupValuesOut(num_rows * list_length);
989+
definition_levels_out_.resize(num_rows * list_length);
990+
repetition_levels_out_.resize(num_rows * list_length);
991+
this->ReadColumnFully();
992+
993+
ASSERT_EQ(values_out_, values_);
994+
}
995+
941996
TEST(TestPageWriter, ThrowsOnPagesTooLarge) {
942997
NodePtr item = schema::Int32("item"); // optional item
943998
NodePtr list(GroupNode::Make("b", Repetition::REPEATED, {item}, ConvertedType::LIST));
@@ -1481,7 +1536,7 @@ TEST(TestColumnWriter, WriteDataPagesChangeOnRecordBoundariesWithSmallBatches) {
14811536
auto row_group_reader = file_reader->RowGroup(0);
14821537

14831538
// Check if pages are changed on record boundaries.
1484-
const std::array<int64_t, num_cols> expect_num_pages_by_col = {5, 201, 397, 201};
1539+
const std::array<int64_t, num_cols> expect_num_pages_by_col = {5, 201, 397, 400};
14851540
const std::array<int64_t, num_cols> expect_num_rows_1st_page_by_col = {99, 1, 1, 1};
14861541
const std::array<int64_t, num_cols> expect_num_vals_1st_page_by_col = {99, 50, 99, 150};
14871542
for (int32_t i = 0; i < num_cols; ++i) {

0 commit comments

Comments
 (0)