Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 7 additions & 31 deletions cpp/src/io/parquet/experimental/hybrid_scan_helpers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ aggregate_reader_metadata::aggregate_reader_metadata(FileMetaData const& parquet
: aggregate_reader_metadata_base({}, false, false)
{
// Just copy over the FileMetaData struct to the internal metadata struct
per_file_metadata.emplace_back(metadata{parquet_metadata}.get_file_metadata());
per_file_metadata.emplace_back(metadata{parquet_metadata});
initialize_internals(use_arrow_schema, has_cols_from_mismatched_srcs);
}

Expand All @@ -86,7 +86,7 @@ aggregate_reader_metadata::aggregate_reader_metadata(cudf::host_span<uint8_t con
: aggregate_reader_metadata_base({}, false, false)
{
// Re-initialize internal variables here as base class was initialized without a source
per_file_metadata.emplace_back(metadata{footer_bytes}.get_file_metadata());
per_file_metadata.emplace_back(metadata{footer_bytes});
initialize_internals(use_arrow_schema, has_cols_from_mismatched_srcs);
}

Expand Down Expand Up @@ -148,36 +148,12 @@ void aggregate_reader_metadata::setup_page_index(cudf::host_span<uint8_t const>
return;
}

auto& row_groups = per_file_metadata.front().row_groups;
Copy link
Member Author

@mhaseeb123 mhaseeb123 Nov 25, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need for all of this anymore. Just call the metadata::setup_page_index(...) method instead.


CUDF_EXPECTS(not row_groups.empty() and not row_groups.front().columns.empty(),
"No column chunks in Parquet schema to read page index for");

CompactProtocolReader cp(page_index_bytes.data(), page_index_bytes.size());

// Get the file metadata and setup the page index
auto& file_metadata = per_file_metadata.front();
// Set the first ColumnChunk's offset of ColumnIndex as the adjusted zero offset
int64_t const min_offset = row_groups.front().columns.front().column_index_offset;
// now loop over row groups
for (auto& rg : row_groups) {
for (auto& col : rg.columns) {
// Read the ColumnIndex for this ColumnChunk
if (col.column_index_length > 0 && col.column_index_offset > 0) {
int64_t const offset = col.column_index_offset - min_offset;
cp.init(page_index_bytes.data() + offset, col.column_index_length);
ColumnIndex ci;
cp.read(&ci);
col.column_index = std::move(ci);
}
// Read the OffsetIndex for this ColumnChunk
if (col.offset_index_length > 0 && col.offset_index_offset > 0) {
int64_t const offset = col.offset_index_offset - min_offset;
cp.init(page_index_bytes.data() + offset, col.offset_index_length);
OffsetIndex oi;
cp.read(&oi);
col.offset_index = std::move(oi);
}
}
}
int64_t const min_offset = file_metadata.row_groups.front().columns.front().column_index_offset;

file_metadata.setup_page_index(page_index_bytes, min_offset);
}

size_type aggregate_reader_metadata::total_rows_in_row_groups(
Expand Down
9 changes: 7 additions & 2 deletions cpp/src/io/parquet/experimental/hybrid_scan_helpers.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,15 @@ using parquet::detail::row_group_info;
/**
* @brief Class for parsing dataset metadata
*/
struct metadata : private metadata_base {
struct metadata : public metadata_base {
explicit metadata(cudf::host_span<uint8_t const> footer_bytes);
explicit metadata(FileMetaData const& other) { static_cast<FileMetaData&>(*this) = other; }
metadata_base get_file_metadata() && { return std::move(*this); }
metadata(metadata const& other) = delete;
metadata(metadata&& other) = default;
metadata& operator=(metadata const& other) = delete;
metadata& operator=(metadata&& other) = default;

~metadata() = default;
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use base reader's destructor that uses multiple threads

};

class aggregate_reader_metadata : public aggregate_reader_metadata_base {
Expand Down
156 changes: 82 additions & 74 deletions cpp/src/io/parquet/reader_impl_helpers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -339,85 +339,93 @@ metadata::metadata(datasource* source, bool read_page_indexes)
auto const& last_col = row_groups.back().columns.back();
int64_t const max_offset = last_col.offset_index_offset + last_col.offset_index_length;

if (max_offset > 0) {
int64_t const length = max_offset - min_offset;
auto const idx_buf = source->host_read(min_offset, length);
// Flatten all columns into a single vector for easier task distribution
std::vector<std::reference_wrapper<ColumnChunk>> all_column_chunks;
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moved everything beyond this point as-is into the setup_page_index function.

all_column_chunks.reserve(row_groups.size() * row_groups.front().columns.size());
for (auto& rg : row_groups) {
for (auto& col : rg.columns) {
all_column_chunks.emplace_back(std::ref(col));
}
}
if (max_offset > min_offset) {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure why we were checking max_offset > 0; max_offset > min_offset seems like a better check, unless I am missing something here.

size_t const length = max_offset - min_offset;
auto const page_idx_buf = source->host_read(min_offset, length);
setup_page_index({page_idx_buf->data(), length}, min_offset);
}
}

auto read_column_indexes = [&idx_buf, min_offset](CompactProtocolReader& reader,
ColumnChunk& col) {
if (col.column_index_length > 0 && col.column_index_offset > 0) {
int64_t const offset = col.column_index_offset - min_offset;
reader.init(idx_buf->data() + offset, col.column_index_length);
col.column_index = ColumnIndex();
reader.read(&col.column_index.value());
}
if (col.offset_index_length > 0 && col.offset_index_offset > 0) {
int64_t const offset = col.offset_index_offset - min_offset;
reader.init(idx_buf->data() + offset, col.offset_index_length);
col.offset_index = OffsetIndex();
reader.read(&col.offset_index.value());
}
};

// Use parallel processing only if we have enough columns to justify the overhead
constexpr std::size_t parallel_threshold = 512;
auto const total_column_chunks = all_column_chunks.size();
if (total_column_chunks >= parallel_threshold) {
// Dynamically calculate number of tasks based the number or row groups and columns
constexpr std::size_t min_tasks = 4;
constexpr std::size_t max_tasks = 32;
auto const ratio = static_cast<double>(total_column_chunks) / parallel_threshold;
// Scale the number of tasks and task size evenly (e.g. quadrupling the number of elements
// doubles both the number of tasks and the task size)
auto const multiplier = std::size_t(1) << (static_cast<size_t>(std::log2(ratio)) / 2);

auto const num_tasks = std::clamp(min_tasks * multiplier, min_tasks, max_tasks);
auto const column_chunks_per_task = total_column_chunks / num_tasks;
auto const remainder = total_column_chunks % num_tasks;

std::vector<std::future<void>> tasks;
tasks.reserve(num_tasks);

std::size_t start_idx = 0;
for (std::size_t task_id = 0; task_id < num_tasks; ++task_id) {
auto const task_size = column_chunks_per_task + (task_id < remainder ? 1 : 0);
auto const end_idx = start_idx + task_size;

if (start_idx >= total_column_chunks) break;

tasks.emplace_back(cudf::detail::host_worker_pool().submit_task(
[&all_column_chunks, &read_column_indexes, start_idx, end_idx]() {
CompactProtocolReader local_cp;

for (size_t i = start_idx; i < end_idx && i < all_column_chunks.size(); ++i) {
read_column_indexes(local_cp, all_column_chunks[i].get());
}
}));

start_idx = end_idx;
}
sanitize_schema();
}

for (auto& task : tasks) {
task.get();
}
} else {
// For small numbers of columns, use sequential processing to avoid overhead
for (auto& col_ref : all_column_chunks) {
read_column_indexes(cp, col_ref.get());
}
}
void metadata::setup_page_index(cudf::host_span<uint8_t const> page_index_bytes, int64_t min_offset)
{
  CUDF_FUNC_RANGE();

  // Nothing to decode without row groups or column chunks; also guards the front() calls below
  if (row_groups.empty() or row_groups.front().columns.empty()) { return; }

  // Flatten all column chunks into a single vector for easier task distribution
  std::vector<std::reference_wrapper<ColumnChunk>> all_column_chunks;
  all_column_chunks.reserve(row_groups.size() * row_groups.front().columns.size());
  for (auto& rg : row_groups) {
    for (auto& col : rg.columns) {
      all_column_chunks.emplace_back(std::ref(col));
    }
  }

  // Decode the ColumnIndex and OffsetIndex (when present) of one column chunk. Offsets stored
  // in the file metadata are absolute file offsets, so subtract `min_offset` to rebase them
  // into `page_index_bytes`
  auto read_column_indexes = [page_index_bytes, min_offset](CompactProtocolReader& reader,
                                                            ColumnChunk& col) {
    if (col.column_index_length > 0 && col.column_index_offset > 0) {
      int64_t const offset = col.column_index_offset - min_offset;
      reader.init(page_index_bytes.data() + offset, col.column_index_length);
      col.column_index = ColumnIndex();
      reader.read(&col.column_index.value());
    }
    if (col.offset_index_length > 0 && col.offset_index_offset > 0) {
      int64_t const offset = col.offset_index_offset - min_offset;
      reader.init(page_index_bytes.data() + offset, col.offset_index_length);
      col.offset_index = OffsetIndex();
      reader.read(&col.offset_index.value());
    }
  };

  // Use parallel processing only if we have enough column chunks to justify the overhead
  constexpr std::size_t parallel_threshold = 256;
  auto const total_column_chunks = all_column_chunks.size();
  if (total_column_chunks >= parallel_threshold) {
    // Dynamically calculate the number of tasks based on the number of row groups and columns
    constexpr std::size_t min_tasks = 4;
    constexpr std::size_t max_tasks = 32;
    auto const ratio = static_cast<double>(total_column_chunks) / parallel_threshold;
    // Scale the number of tasks and task size evenly (e.g. quadrupling the number of elements
    // doubles both the number of tasks and the task size)
    auto const multiplier = std::size_t(1) << (static_cast<size_t>(std::log2(ratio)) / 2);

    auto const num_tasks              = std::clamp(min_tasks * multiplier, min_tasks, max_tasks);
    auto const column_chunks_per_task = total_column_chunks / num_tasks;
    auto const remainder              = total_column_chunks % num_tasks;

    std::vector<std::future<void>> tasks;
    tasks.reserve(num_tasks);

    std::size_t start_idx = 0;
    for (std::size_t task_id = 0; task_id < num_tasks; ++task_id) {
      // Distribute the remainder one extra chunk per leading task
      auto const task_size = column_chunks_per_task + (task_id < remainder ? 1 : 0);
      auto const end_idx   = start_idx + task_size;

      if (start_idx >= total_column_chunks) break;

      // Each task decodes its slice of column chunks with its own thread-local protocol reader
      tasks.emplace_back(cudf::detail::host_worker_pool().submit_task(
        [&all_column_chunks, &read_column_indexes, start_idx, end_idx]() {
          CompactProtocolReader local_cp;

          for (size_t i = start_idx; i < end_idx && i < all_column_chunks.size(); ++i) {
            read_column_indexes(local_cp, all_column_chunks[i].get());
          }
        }));

      start_idx = end_idx;
    }

    // Wait for all tasks; get() also rethrows any exception raised on a worker thread
    for (auto& task : tasks) {
      task.get();
    }
  } else {
    CompactProtocolReader cp(page_index_bytes.data(), page_index_bytes.size());
    // For small numbers of column chunks, use sequential processing to avoid overhead
    for (auto& col_ref : all_column_chunks) {
      read_column_indexes(cp, col_ref.get());
    }
  }
}

metadata::~metadata()
Expand Down
3 changes: 3 additions & 0 deletions cpp/src/io/parquet/reader_impl_helpers.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,9 @@ struct metadata : public FileMetaData {
metadata& operator=(metadata&& other) = default;
~metadata();

void setup_page_index(cudf::host_span<uint8_t const> page_index_bytes, int64_t min_offset);

protected:
void sanitize_schema();
};

Expand Down