
[branch-0.9] Cherry pick feat(reader): Add read_with_metrics() for scan I/O metrics (#2349) #15

Open
toutane wants to merge 1 commit into branch-0.9 from branch-0.9-cherry-pick-6

Conversation


toutane commented May 5, 2026

Cherry pick: apache#2349

CI is disabled for this fork, so testing was performed with `make check` and `make test` in the provided testing environment.

## Which issue does this PR close?

- Closes #.

## What changes are included in this PR?

Add always-on per-scan I/O metrics to `ArrowReader`.

**Motivation:** Downstream engines need per-scan byte counts for their
UIs. For example, DataFusion Comet uses this to populate `bytes_scanned`
on its Iceberg scan operator, which flows through to Spark UI via
`TaskMetrics.inputMetrics.setBytesRead()`. This must be per-scan, not
global. Concurrent scans against the same `FileIO` need independent
counters. The approach matches DataFusion's pattern of wrapping
`AsyncFileReader` with a counting layer and is storage-backend agnostic.

**`ArrowReader::read()` now returns `ScanResult`**
- `ScanResult` wraps the record batch stream and `ScanMetrics`.
Accessors: `stream()`, `metrics()`, `into_parts()`.
- Metrics are always collected; the cost is one `fetch_add(Relaxed)` per
  I/O request, which is negligible.
- Counter is created fresh per `read()` call, so cloned readers get
independent metrics.
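
A minimal usage sketch of the shape described above; the exact signatures (and whether `read()` still consumes the reader) are assumptions, not the merged code:

```rust
use futures::TryStreamExt;
use iceberg::Result;
use iceberg::arrow::ArrowReader;
use iceberg::scan::FileScanTaskStream;

// Hypothetical driver: consume a scan and report its I/O via the
// accessors listed above.
async fn count_scan_bytes(reader: ArrowReader, tasks: FileScanTaskStream) -> Result<u64> {
    let result = reader.read(tasks)?; // now returns ScanResult, not a bare stream
    let (stream, metrics) = result.into_parts();
    assert_eq!(metrics.bytes_read(), 0); // the stream is lazy: no I/O yet
    stream.try_collect::<Vec<_>>().await?; // drive the scan to completion
    Ok(metrics.bytes_read()) // total bytes fetched for this scan
}
```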

**New file: `crates/iceberg/src/arrow/scan_metrics.rs`**
- `CountingFileRead<F: FileRead>`: generic wrapper that increments a
shared `AtomicU64` on each `read()`.
- `ScanMetrics`: public handle exposing `bytes_read()`.
- `ScanResult`: public struct returned by `ArrowReader::read()`.
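
A plausible shape for the wrapper, assuming the `async_trait`-style `FileRead` trait that `iceberg::io` exposes (`read(&self, range) -> Result<Bytes>`); this is a sketch, not the file's contents:

```rust
use std::ops::Range;
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};

use bytes::Bytes;
use iceberg::Result;
use iceberg::io::FileRead;

// Generic counting wrapper: delegates the read, then bumps a shared counter.
struct CountingFileRead<F: FileRead> {
    inner: F,
    bytes_read: Arc<AtomicU64>,
}

#[async_trait::async_trait]
impl<F: FileRead> FileRead for CountingFileRead<F> {
    async fn read(&self, range: Range<u64>) -> Result<Bytes> {
        let bytes = self.inner.read(range).await?;
        // One relaxed atomic add per I/O request, as noted above.
        self.bytes_read.fetch_add(bytes.len() as u64, Ordering::Relaxed);
        Ok(bytes)
    }
}
```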

**`FileRead` blanket impl for `Box<dyn FileRead>`**
- Enables generic `CountingFileRead<F>` to wrap the boxed reader
returned by `FileIO::reader()`.
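
The delegation itself is a one-liner; a sketch under the same trait assumptions as above:

```rust
// Forward to the boxed reader so CountingFileRead<Box<dyn FileRead>>
// can wrap whatever FileIO::reader() hands back.
#[async_trait::async_trait]
impl FileRead for Box<dyn FileRead> {
    async fn read(&self, range: Range<u64>) -> Result<Bytes> {
        self.as_ref().read(range).await
    }
}
```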

**Single `open_parquet_file` with counting**
- All Parquet opens (data files and delete files) go through the same
`open_parquet_file` wrapped with `CountingFileRead`, so `bytes_read`
reflects total scan I/O.
- `build_parquet_reader()`: shared internals for reader construction and
metadata loading.
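
An illustrative open path under the assumptions above; the helper name comes from the PR, but the body and the exact `FileIO` calls are mine:

```rust
// Hypothetical unified open: both data and delete files take this path,
// so the shared counter accumulates total scan I/O.
async fn open_parquet_file(
    file_io: &FileIO,
    path: &str,
    bytes_read: Arc<AtomicU64>,
) -> Result<CountingFileRead<Box<dyn FileRead>>> {
    let input = file_io.new_input(path)?;
    let reader: Box<dyn FileRead> = Box::new(input.reader().await?);
    Ok(CountingFileRead { inner: reader, bytes_read })
}
```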

**`FileScanTaskReader` struct (refactor)**
- Extracted `process_file_scan_task`'s parameters into a `Clone` struct
with a `process(self, task)` method, resolving a
`clippy::too_many_arguments` violation. Struct and impl are co-located.
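
The refactor pattern, sketched with assumed field names (the PR's actual fields will track `ArrowReader`'s configuration):

```rust
use std::sync::Arc;
use std::sync::atomic::AtomicU64;

use iceberg::Result;
use iceberg::io::FileIO;
use iceberg::scan::{ArrowRecordBatchStream, FileScanTask};

// Assumed field set: one struct carries what used to be a long
// parameter list, so call sites clone it once per task.
#[derive(Clone)]
struct FileScanTaskReader {
    file_io: FileIO,
    batch_size: Option<usize>,
    row_group_filtering_enabled: bool,
    bytes_read: Arc<AtomicU64>,
}

impl FileScanTaskReader {
    // Taking `self` by value keeps the signature at two inputs,
    // which is what resolves clippy::too_many_arguments.
    async fn process(self, task: FileScanTask) -> Result<ArrowRecordBatchStream> {
        todo!("open a counting Parquet reader via open_parquet_file and stream batches")
    }
}
```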

**Re-exports**
- `ScanMetrics` and `ScanResult` re-exported from `iceberg::arrow` and
`iceberg::scan`.
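
After the re-export, either path should resolve to the same types:

```rust
use iceberg::arrow::{ScanMetrics, ScanResult};
// equivalently: use iceberg::scan::{ScanMetrics, ScanResult};
```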

## Are these changes tested?

`test_scan_metrics_bytes_read` in `reader.rs`: asserts `bytes_read() ==
0` before stream consumption (the stream is lazy) and `bytes_read() > 0`
after. `test_scan_metrics_includes_delete_file_bytes`: reads the same
data file with and without a positional delete file and asserts
`bytes_read` is strictly greater when deletes are present. All existing
reader and scan tests pass (updated to use `ScanResult::stream()`).
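
A condensed sketch of the first test's laziness assertion; the fixture (`build_test_reader_and_tasks` here) is a hypothetical stand-in for the real test harness:

```rust
use futures::TryStreamExt;

#[tokio::test]
async fn scan_metrics_bytes_read_sketch() {
    // Hypothetical fixture: yields an ArrowReader plus a FileScanTaskStream
    // over a small Parquet data file.
    let (reader, tasks) = build_test_reader_and_tasks();
    let (stream, metrics) = reader.read(tasks).unwrap().into_parts();
    assert_eq!(metrics.bytes_read(), 0); // lazy: nothing fetched yet
    let _batches: Vec<_> = stream.try_collect().await.unwrap();
    assert!(metrics.bytes_read() > 0); // I/O counted once consumed
}
```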

---------

Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-authored-by: blackmwk <liurenjie1024@outlook.com>
(cherry picked from commit 1ad4bfd)
