GH-48467: [C++][Parquet] Add configure to limit the row group size #48468
Conversation
Thanks for working on this! Since I'm still new to the Arrow codebase, I reviewed the PR at a high level and it helped me understand how WriterProperties and row group configuration are implemented. I don't have enough experience yet to provide a full technical review, but the approach looks consistent with the design discussed in the issue. Thanks again for sharing this!
HuaHuaY left a comment:
LGTM
cpp/src/parquet/file_writer.cc (Outdated)
  return contents_->total_compressed_bytes_written();
}

int64_t RowGroupWriter::current_buffered_bytes() const {
The function name is a little misleading because readers may think it is the same as contents_->estimated_buffered_value_bytes().
rename to total_buffered_bytes()
cpp/src/parquet/arrow/writer.cc (Outdated)
  chunk_size = this->properties().max_row_group_length();
}
// max_row_group_bytes is applied only after the row group has accumulated data.
if (row_group_writer_ != nullptr && row_group_writer_->num_rows() > 0) {
row_group_writer_->num_rows() > 0 can only happen when the current row group writer is in buffered mode. Users calling WriteTable usually never use buffered mode, so this approach does not seem to work in the majority of cases.
Instead, can we gather this information from all written row groups (if available)?
@wgtmac If the user uses the static WriteTable function, the Arrow FileWriter is always recreated and we cannot gather the previously written row groups.
arrow/cpp/src/parquet/arrow/writer.cc (lines 591 to 601 at 8040f2a):
Status WriteTable(const ::arrow::Table& table, ::arrow::MemoryPool* pool,
                  std::shared_ptr<::arrow::io::OutputStream> sink, int64_t chunk_size,
                  std::shared_ptr<WriterProperties> properties,
                  std::shared_ptr<ArrowWriterProperties> arrow_properties) {
  std::unique_ptr<FileWriter> writer;
  ARROW_ASSIGN_OR_RAISE(
      writer, FileWriter::Open(*table.schema(), pool, std::move(sink),
                               std::move(properties), std::move(arrow_properties)));
  RETURN_NOT_OK(writer->WriteTable(table, chunk_size));
  return writer->Close();
}
If the user uses the internal WriteTable function, we can get avg_row_bytes from the last row_group_writer_ or by gathering all previous row group writers.
arrow/cpp/src/parquet/arrow/writer.cc (line 394 at 8040f2a):
Status WriteTable(const Table& table, int64_t chunk_size) override {
I still think this estimation does not help because in most cases WriteTable will not be used in buffered mode. See my suggestion in the comment below.
cpp/src/parquet/arrow/writer.cc (Outdated)
  int64_t group_rows = row_group_writer_->num_rows();
  int64_t batch_size =
      std::min(max_row_group_length - group_rows, batch.num_rows() - offset);
  if (group_rows > 0) {
Similar to my comment above, should we consider all written row groups as well to estimate the average row size?
If we change to using all written row groups, then the first row group's size can only be determined by max_row_group_length. Is that OK, or should we also use the current row group writer's buffered data?
Can we add something like the code below to estimate the per-row size from written row groups, if there are any?
std::optional<double> ParquetFileWriter::EstimateCompressedBytesPerRow() const {
  auto estimate_size = [](const FileMetaData& metadata) -> std::optional<double> {
    int64_t total_compressed_size = 0;
    int64_t total_rows = 0;
    for (int i = 0; i < metadata.num_row_groups(); i++) {
      total_compressed_size += metadata.RowGroup(i)->total_compressed_size();
      total_rows += metadata.RowGroup(i)->num_rows();
    }
    if (total_compressed_size == 0 || total_rows == 0) {
      return std::nullopt;
    }
    return static_cast<double>(total_compressed_size) / total_rows;
  };
  if (contents_) {
    // Use written row groups to estimate.
    return estimate_size(*contents_->metadata());
  }
  if (file_metadata_) {
    // Use closed file metadata to estimate.
    return estimate_size(*file_metadata_);
  }
  return std::nullopt;
}

Then we can add the following function to FileWriterImpl to adaptively estimate the per-row size:
std::optional<double> FileWriterImpl::EstimateCompressedBytesPerRow() const {
  if (auto value = writer_->EstimateCompressedBytesPerRow()) {
    return value.value();
  }
  if (row_group_writer_ != nullptr && row_group_writer_->num_rows() > 0) {
    return static_cast<double>(row_group_writer_->total_buffered_bytes()) /
           row_group_writer_->num_rows();
  }
  return std::nullopt;
}
The good thing is that EstimateCompressedBytesPerRow() can be called in different write functions consistently.
*contents_->metadata() is not available until the ParquetFileWriter is closed; we can cache the bytes and rows of written row groups instead.
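A minimal sketch of that caching idea (the struct name and the hook are hypothetical, not code from this PR): accumulate the row count and compressed size of each closed row group so the estimate never has to consult FileMetaData before Close().

#include <cstdint>
#include <optional>

// Hypothetical accumulator, updated whenever a row group is closed.
struct WrittenRowGroupStats {
  int64_t num_rows = 0;
  int64_t compressed_bytes = 0;

  void OnRowGroupClosed(int64_t rows, int64_t bytes) {
    num_rows += rows;
    compressed_bytes += bytes;
  }

  // Same semantics as EstimateCompressedBytesPerRow() above, but based on
  // cached counters instead of FileMetaData.
  std::optional<double> EstimateCompressedBytesPerRow() const {
    if (num_rows == 0 || compressed_bytes == 0) return std::nullopt;
    return static_cast<double>(compressed_bytes) / num_rows;
  }
};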
@wgtmac could you please take a look again?
Sorry for the delay! I will review this later this week.
if (auto avg_row_size = EstimateCompressedBytesPerRow()) {
  chunk_size = std::min(
      chunk_size, static_cast<int64_t>(this->properties().max_row_group_bytes() /
                                       avg_row_size.value()));
The chunk_size could be 0 if the configured max_row_group_bytes is less than avg_row_size; do we need an extra check here?
We need to clamp the chunk size between 1 and max_row_group_bytes/avg_row_size.
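A minimal sketch of that clamp as a free-standing helper (illustrative only; the PR presumably applies it inline, and the parameter names are assumptions): assuming avg_row_size > 0, it keeps at least one row per row group so the writer can always make progress while still respecting the caller-provided chunk_size.

#include <algorithm>
#include <cstdint>

// Clamp the row-count cap derived from max_row_group_bytes to [1, chunk_size].
int64_t ClampChunkSize(int64_t chunk_size, int64_t max_row_group_bytes,
                       double avg_row_size) {
  const auto rows_by_bytes =
      static_cast<int64_t>(max_row_group_bytes / avg_row_size);
  return std::max<int64_t>(1, std::min(chunk_size, rows_by_bytes));
}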
int64_t buffered_bytes = row_group_writer_->EstimatedTotalCompressedBytes();
batch_size = std::min(
    batch_size, static_cast<int64_t>((max_row_group_bytes - buffered_bytes) /
                                     avg_row_size.value()));
ditto.
cpp/src/parquet/file_writer.h (Outdated)
  virtual int64_t num_rows() const = 0;
  virtual int64_t compressed_bytes() const = 0;
Suggested change:
  virtual int64_t compressed_bytes() const = 0;
  virtual int64_t num_rows() const = 0;
This order looks more natural :)
cpp/src/parquet/file_writer.h (Outdated)
  void AddKeyValueMetadata(
      const std::shared_ptr<const KeyValueMetadata>& key_value_metadata);

  /// Estimate compressed bytes per row from closed row groups.
Suggested change:
  /// \brief Estimate compressed bytes per row from closed row groups.
  /// \return Estimated bytes or std::nullopt when no written row group.
cpp/src/parquet/file_writer.cc (Outdated)
  const std::shared_ptr<WriterProperties> properties_;
  int num_row_groups_;
  int64_t num_rows_;
  int64_t compressed_bytes_;
Perhaps rename it to written_row_group_compressed_bytes_ to be clearer? Or written_compressed_bytes_ if the former is too long.
cpp/src/parquet/arrow/writer.h (Outdated)
  /// \brief Estimate compressed bytes per row from closed row groups or the active row
  /// group.
Suggested change:
  /// \brief Estimate compressed bytes per row from data written so far.
  /// \note std::nullopt will be returned if there is no row written.
cpp/src/parquet/arrow/writer.cc (Outdated)
  if (chunk_size <= 0 && table.num_rows() > 0) {
    return Status::Invalid("chunk size per row_group must be greater than 0");
  } else if (!table.schema()->Equals(*schema_, false)) {
    return Status::Invalid("rows per row_group must be greater than 0");
Suggested change:
    return Status::Invalid("chunk size per row_group must be greater than 0");
  RETURN_NOT_OK(WriteBatch(offset, batch_size));
  offset += batch_size;
} else if (offset < batch.num_rows()) {
  // Current row group is full, write remaining rows in a new group.
Will it cause an infinite loop at this line if batch_size is always 0?
It would cause an infinite loop only when max_row_group_bytes / avg_row_size is 0. Is it OK to return an Invalid status from WriteXxx() in this case?

if (batch_size == 0 && row_group_writer_->num_rows() == 0) {
  return Status::Invalid(
      "Configured max_row_group_bytes is too small to hold a single row");
}
Do you want to take a look at this PR? It may affect the default behavior of row group size. @pitrou
int64_t RowGroupWriter::EstimatedTotalCompressedBytes() const {
  return contents_->total_compressed_bytes() +
         contents_->total_compressed_bytes_written() +
         contents_->EstimatedBufferedValueBytes();
EstimatedBufferedValueBytes does not account for compression and may therefore wildly overestimate the final compressed size?
Are we sure we want to account for contents not serialized into a page yet?
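For a sense of scale (illustrative numbers, not from this PR): 1 MiB of buffered, not-yet-encoded values that would eventually compress 5:1 adds the full 1 MiB to this estimate but only about 200 KiB to the final row group, so the estimate can overshoot substantially while values are still buffered.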
The buffered value size serves as a reference only before the first page is written, and its impact diminishes as more pages are written.
I'm not sure that makes it useful in any way, though.
static constexpr int64_t DEFAULT_DICTIONARY_PAGE_SIZE_LIMIT = kDefaultDataPageSize;
static constexpr int64_t DEFAULT_WRITE_BATCH_SIZE = 1024;
static constexpr int64_t DEFAULT_MAX_ROW_GROUP_LENGTH = 1024 * 1024;
static constexpr int64_t DEFAULT_MAX_ROW_GROUP_BYTES = 128 * 1024 * 1024;
Is there a particular reason for this value? AFAIK some Parquet implementation (is it Parquet Rust? @alamb ) writes a single row group per file by default.
I also feel like the HDFS-related reasons in the Parquet docs are completely outdated (who cares about HDFS?).
I think smaller row groups are still useful when pruning is essential. https://www.firebolt.io/blog/unlocking-faster-iceberg-queries-the-writer-optimizations-you-are-missing is a good read.
Right, but the value is not easy to devise. For example, if you have 10_000 columns, this will make for some very short columns.
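For illustration (arithmetic not from the thread): splitting the proposed 128 MiB default evenly across 10,000 columns leaves roughly 128 * 1024 * 1024 / 10000 ≈ 13 KiB per column chunk, far below even a single default data page (kDefaultDataPageSize is 1 MiB).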
Rationale for this change
Limit the row group size.
What changes are included in this PR?
Add a new config parquet::WriterProperties::max_row_group_bytes.
Are these changes tested?
Yes, a unit test is added.
Are there any user-facing changes?
Yes, users can use the new config to limit the row group size.
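A hypothetical usage sketch (the builder setter name is assumed to mirror the new max_row_group_bytes property and the existing max_row_group_length setter; it may differ in the final API):

#include <memory>
#include "parquet/properties.h"

std::shared_ptr<parquet::WriterProperties> MakeWriterProperties() {
  parquet::WriterProperties::Builder builder;
  builder.max_row_group_length(1024 * 1024);       // existing cap on rows per row group
  builder.max_row_group_bytes(128 * 1024 * 1024);  // new byte-size cap (assumed setter name)
  return builder.build();
}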