From f7ce4714e7e428289826b9b4ba0b38df6289484c Mon Sep 17 00:00:00 2001 From: Muhammad Haseeb <14217455+mhaseeb123@users.noreply.github.com> Date: Tue, 25 Nov 2025 01:36:03 +0000 Subject: [PATCH 1/3] Use multithreaded `setup_page_index` in hybrid scan reader --- .../experimental/hybrid_scan_helpers.cpp | 34 +--- cpp/src/io/parquet/reader_impl_helpers.cpp | 166 ++++++++++-------- cpp/src/io/parquet/reader_impl_helpers.hpp | 2 + 3 files changed, 96 insertions(+), 106 deletions(-) diff --git a/cpp/src/io/parquet/experimental/hybrid_scan_helpers.cpp b/cpp/src/io/parquet/experimental/hybrid_scan_helpers.cpp index 3818e5ed36f..d6e55c842d1 100644 --- a/cpp/src/io/parquet/experimental/hybrid_scan_helpers.cpp +++ b/cpp/src/io/parquet/experimental/hybrid_scan_helpers.cpp @@ -148,36 +148,12 @@ void aggregate_reader_metadata::setup_page_index(cudf::host_span return; } - auto& row_groups = per_file_metadata.front().row_groups; - - CUDF_EXPECTS(not row_groups.empty() and not row_groups.front().columns.empty(), - "No column chunks in Parquet schema to read page index for"); - - CompactProtocolReader cp(page_index_bytes.data(), page_index_bytes.size()); - + // Get the file metadata and setup the page index + auto& file_metadata = per_file_metadata.front(); // Set the first ColumnChunk's offset of ColumnIndex as the adjusted zero offset - int64_t const min_offset = row_groups.front().columns.front().column_index_offset; - // now loop over row groups - for (auto& rg : row_groups) { - for (auto& col : rg.columns) { - // Read the ColumnIndex for this ColumnChunk - if (col.column_index_length > 0 && col.column_index_offset > 0) { - int64_t const offset = col.column_index_offset - min_offset; - cp.init(page_index_bytes.data() + offset, col.column_index_length); - ColumnIndex ci; - cp.read(&ci); - col.column_index = std::move(ci); - } - // Read the OffsetIndex for this ColumnChunk - if (col.offset_index_length > 0 && col.offset_index_offset > 0) { - int64_t const offset = 
col.offset_index_offset - min_offset; - cp.init(page_index_bytes.data() + offset, col.offset_index_length); - OffsetIndex oi; - cp.read(&oi); - col.offset_index = std::move(oi); - } - } - } + int64_t const min_offset = file_metadata.row_groups.front().columns.front().column_index_offset; + + file_metadata.setup_page_index(page_index_bytes, min_offset); } size_type aggregate_reader_metadata::total_rows_in_row_groups( diff --git a/cpp/src/io/parquet/reader_impl_helpers.cpp b/cpp/src/io/parquet/reader_impl_helpers.cpp index fe9d33678c3..f85caee96fd 100644 --- a/cpp/src/io/parquet/reader_impl_helpers.cpp +++ b/cpp/src/io/parquet/reader_impl_helpers.cpp @@ -339,85 +339,93 @@ metadata::metadata(datasource* source, bool read_page_indexes) auto const& last_col = row_groups.back().columns.back(); int64_t const max_offset = last_col.offset_index_offset + last_col.offset_index_length; - if (max_offset > 0) { - int64_t const length = max_offset - min_offset; - auto const idx_buf = source->host_read(min_offset, length); - // Flatten all columns into a single vector for easier task distribution - std::vector> all_column_chunks; - all_column_chunks.reserve(row_groups.size() * row_groups.front().columns.size()); - for (auto& rg : row_groups) { - for (auto& col : rg.columns) { - all_column_chunks.emplace_back(std::ref(col)); - } - } + if (max_offset > min_offset) { + size_t const length = max_offset - min_offset; + auto const page_idx_buf = source->host_read(min_offset, length); + setup_page_index({page_idx_buf->data(), length}, min_offset); + } + } - auto read_column_indexes = [&idx_buf, min_offset](CompactProtocolReader& reader, - ColumnChunk& col) { - if (col.column_index_length > 0 && col.column_index_offset > 0) { - int64_t const offset = col.column_index_offset - min_offset; - reader.init(idx_buf->data() + offset, col.column_index_length); - col.column_index = ColumnIndex(); - reader.read(&col.column_index.value()); - } - if (col.offset_index_length > 0 && 
col.offset_index_offset > 0) { - int64_t const offset = col.offset_index_offset - min_offset; - reader.init(idx_buf->data() + offset, col.offset_index_length); - col.offset_index = OffsetIndex(); - reader.read(&col.offset_index.value()); - } - }; - - // Use parallel processing only if we have enough columns to justify the overhead - constexpr std::size_t parallel_threshold = 512; - auto const total_column_chunks = all_column_chunks.size(); - if (total_column_chunks >= parallel_threshold) { - // Dynamically calculate number of tasks based the number or row groups and columns - constexpr std::size_t min_tasks = 4; - constexpr std::size_t max_tasks = 32; - auto const ratio = static_cast(total_column_chunks) / parallel_threshold; - // Scale the number of tasks and task size evenly (e.g. quadrupling the number of elements - // doubles both the number of tasks and the task size) - auto const multiplier = std::size_t(1) << (static_cast(std::log2(ratio)) / 2); - - auto const num_tasks = std::clamp(min_tasks * multiplier, min_tasks, max_tasks); - auto const column_chunks_per_task = total_column_chunks / num_tasks; - auto const remainder = total_column_chunks % num_tasks; - - std::vector> tasks; - tasks.reserve(num_tasks); - - std::size_t start_idx = 0; - for (std::size_t task_id = 0; task_id < num_tasks; ++task_id) { - auto const task_size = column_chunks_per_task + (task_id < remainder ? 
1 : 0); - auto const end_idx = start_idx + task_size; - - if (start_idx >= total_column_chunks) break; - - tasks.emplace_back(cudf::detail::host_worker_pool().submit_task( - [&all_column_chunks, &read_column_indexes, start_idx, end_idx]() { - CompactProtocolReader local_cp; - - for (size_t i = start_idx; i < end_idx && i < all_column_chunks.size(); ++i) { - read_column_indexes(local_cp, all_column_chunks[i].get()); - } - })); - - start_idx = end_idx; - } + sanitize_schema(); +} - for (auto& task : tasks) { - task.get(); - } - } else { - // For small numbers of columns, use sequential processing to avoid overhead - for (auto& col_ref : all_column_chunks) { - read_column_indexes(cp, col_ref.get()); - } - } +void metadata::setup_page_index(cudf::host_span page_index_bytes, int64_t min_offset) +{ + CUDF_FUNC_RANGE(); + + // Flatten all columns into a single vector for easier task distribution + std::vector> all_column_chunks; + all_column_chunks.reserve(row_groups.size() * row_groups.front().columns.size()); + for (auto& rg : row_groups) { + for (auto& col : rg.columns) { + all_column_chunks.emplace_back(std::ref(col)); } } - sanitize_schema(); + auto read_column_indexes = [page_index_bytes, min_offset](CompactProtocolReader& reader, + ColumnChunk& col) { + if (col.column_index_length > 0 && col.column_index_offset > 0) { + int64_t const offset = col.column_index_offset - min_offset; + reader.init(page_index_bytes.data() + offset, col.column_index_length); + col.column_index = ColumnIndex(); + reader.read(&col.column_index.value()); + } + if (col.offset_index_length > 0 && col.offset_index_offset > 0) { + int64_t const offset = col.offset_index_offset - min_offset; + reader.init(page_index_bytes.data() + offset, col.offset_index_length); + col.offset_index = OffsetIndex(); + reader.read(&col.offset_index.value()); + } + }; + + // Use parallel processing only if we have enough columns to justify the overhead + constexpr std::size_t parallel_threshold = 256; + auto const 
total_column_chunks = all_column_chunks.size(); + if (total_column_chunks >= parallel_threshold) { + // Dynamically calculate number of tasks based on the number of row groups and columns + constexpr std::size_t min_tasks = 4; + constexpr std::size_t max_tasks = 32; + auto const ratio = static_cast(total_column_chunks) / parallel_threshold; + // Scale the number of tasks and task size evenly (e.g. quadrupling the number of elements + // doubles both the number of tasks and the task size) + auto const multiplier = std::size_t(1) << (static_cast(std::log2(ratio)) / 2); + + auto const num_tasks = std::clamp(min_tasks * multiplier, min_tasks, max_tasks); + auto const column_chunks_per_task = total_column_chunks / num_tasks; + auto const remainder = total_column_chunks % num_tasks; + + std::vector> tasks; + tasks.reserve(num_tasks); + + std::size_t start_idx = 0; + for (std::size_t task_id = 0; task_id < num_tasks; ++task_id) { + auto const task_size = column_chunks_per_task + (task_id < remainder ? 
1 : 0); + auto const end_idx = start_idx + task_size; + + if (start_idx >= total_column_chunks) break; + + tasks.emplace_back(cudf::detail::host_worker_pool().submit_task( + [&all_column_chunks, &read_column_indexes, start_idx, end_idx]() { + CompactProtocolReader local_cp; + + for (size_t i = start_idx; i < end_idx && i < all_column_chunks.size(); ++i) { + read_column_indexes(local_cp, all_column_chunks[i].get()); + } + })); + + start_idx = end_idx; + } + + for (auto& task : tasks) { + task.get(); + } + } else { + CompactProtocolReader cp(page_index_bytes.data(), page_index_bytes.size()); + // For small numbers of columns, use sequential processing to avoid overhead + for (auto& col_ref : all_column_chunks) { + read_column_indexes(cp, col_ref.get()); + } + } } metadata::~metadata() @@ -448,8 +456,10 @@ std::vector aggregate_reader_metadata::metadatas_from_sources( std::vector> metadata_ctor_tasks; metadata_ctor_tasks.reserve(sources.size()); for (auto const& source : sources) { - metadata_ctor_tasks.emplace_back(cudf::detail::host_worker_pool().submit_task( - [source = source.get(), read_page_indexes] { return metadata{source, read_page_indexes}; })); + metadata_ctor_tasks.emplace_back( + cudf::detail::host_worker_pool().submit_task([source = source.get(), read_page_indexes] { + return metadata{source, read_page_indexes}; + })); } std::vector metadatas; metadatas.reserve(sources.size()); @@ -472,7 +482,9 @@ aggregate_reader_metadata::collect_keyval_metadata() const std::transform(pfm.key_value_metadata.cbegin(), pfm.key_value_metadata.cend(), std::inserter(kv_map, kv_map.end()), - [](auto const& kv) { return std::pair{kv.key, kv.value}; }); + [](auto const& kv) { + return std::pair{kv.key, kv.value}; + }); return kv_map; }); diff --git a/cpp/src/io/parquet/reader_impl_helpers.hpp b/cpp/src/io/parquet/reader_impl_helpers.hpp index 4b431ce45bd..b49191b31bd 100644 --- a/cpp/src/io/parquet/reader_impl_helpers.hpp +++ b/cpp/src/io/parquet/reader_impl_helpers.hpp @@ 
-110,6 +110,8 @@ struct metadata : public FileMetaData { metadata& operator=(metadata&& other) = default; ~metadata(); + void setup_page_index(cudf::host_span page_index_bytes, int64_t min_offset); + void sanitize_schema(); }; From 8b85949bd8351f9f9abd923239e82df5885e3ca6 Mon Sep 17 00:00:00 2001 From: Muhammad Haseeb <14217455+mhaseeb123@users.noreply.github.com> Date: Tue, 25 Nov 2025 01:46:30 +0000 Subject: [PATCH 2/3] style fix --- cpp/src/io/parquet/reader_impl_helpers.cpp | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/cpp/src/io/parquet/reader_impl_helpers.cpp b/cpp/src/io/parquet/reader_impl_helpers.cpp index f85caee96fd..3ef002c931e 100644 --- a/cpp/src/io/parquet/reader_impl_helpers.cpp +++ b/cpp/src/io/parquet/reader_impl_helpers.cpp @@ -456,10 +456,8 @@ std::vector aggregate_reader_metadata::metadatas_from_sources( std::vector> metadata_ctor_tasks; metadata_ctor_tasks.reserve(sources.size()); for (auto const& source : sources) { - metadata_ctor_tasks.emplace_back( - cudf::detail::host_worker_pool().submit_task([source = source.get(), read_page_indexes] { - return metadata{source, read_page_indexes}; - })); + metadata_ctor_tasks.emplace_back(cudf::detail::host_worker_pool().submit_task( + [source = source.get(), read_page_indexes] { return metadata{source, read_page_indexes}; })); } std::vector metadatas; metadatas.reserve(sources.size()); @@ -482,9 +480,7 @@ aggregate_reader_metadata::collect_keyval_metadata() const std::transform(pfm.key_value_metadata.cbegin(), pfm.key_value_metadata.cend(), std::inserter(kv_map, kv_map.end()), - [](auto const& kv) { - return std::pair{kv.key, kv.value}; - }); + [](auto const& kv) { return std::pair{kv.key, kv.value}; }); return kv_map; }); From e9e89767165f42e1d5bcac6ae4f8b81d03979255 Mon Sep 17 00:00:00 2001 From: Muhammad Haseeb Date: Tue, 25 Nov 2025 20:12:42 +0000 Subject: [PATCH 3/3] Minor improvements --- cpp/src/io/parquet/experimental/hybrid_scan_helpers.cpp | 4 ++-- 
cpp/src/io/parquet/experimental/hybrid_scan_helpers.hpp | 9 +++++++-- cpp/src/io/parquet/reader_impl_helpers.hpp | 1 + 3 files changed, 10 insertions(+), 4 deletions(-) diff --git a/cpp/src/io/parquet/experimental/hybrid_scan_helpers.cpp b/cpp/src/io/parquet/experimental/hybrid_scan_helpers.cpp index d6e55c842d1..f8408ca2fe5 100644 --- a/cpp/src/io/parquet/experimental/hybrid_scan_helpers.cpp +++ b/cpp/src/io/parquet/experimental/hybrid_scan_helpers.cpp @@ -76,7 +76,7 @@ aggregate_reader_metadata::aggregate_reader_metadata(FileMetaData const& parquet : aggregate_reader_metadata_base({}, false, false) { // Just copy over the FileMetaData struct to the internal metadata struct - per_file_metadata.emplace_back(metadata{parquet_metadata}.get_file_metadata()); + per_file_metadata.emplace_back(metadata{parquet_metadata}); initialize_internals(use_arrow_schema, has_cols_from_mismatched_srcs); } @@ -86,7 +86,7 @@ aggregate_reader_metadata::aggregate_reader_metadata(cudf::host_span footer_bytes); explicit metadata(FileMetaData const& other) { static_cast(*this) = other; } - metadata_base get_file_metadata() && { return std::move(*this); } + metadata(metadata const& other) = delete; + metadata(metadata&& other) = default; + metadata& operator=(metadata const& other) = delete; + metadata& operator=(metadata&& other) = default; + + ~metadata() = default; }; class aggregate_reader_metadata : public aggregate_reader_metadata_base { diff --git a/cpp/src/io/parquet/reader_impl_helpers.hpp b/cpp/src/io/parquet/reader_impl_helpers.hpp index b49191b31bd..515688aa215 100644 --- a/cpp/src/io/parquet/reader_impl_helpers.hpp +++ b/cpp/src/io/parquet/reader_impl_helpers.hpp @@ -112,6 +112,7 @@ struct metadata : public FileMetaData { void setup_page_index(cudf::host_span page_index_bytes, int64_t min_offset); + protected: void sanitize_schema(); };