Skip to content

[Experiment] Check if ArrowScan.to_read_batches() materializes entire files into memory#1

Draft
ShreyeshArangath wants to merge 9 commits into
mainfrom
test/pyarrow-materialization
Draft

[Experiment] Check if ArrowScan.to_read_batches() materializes entire files into memory#1
ShreyeshArangath wants to merge 9 commits into
mainfrom
test/pyarrow-materialization

Conversation

@ShreyeshArangath
Copy link
Copy Markdown
Owner

@ShreyeshArangath ShreyeshArangath commented Feb 7, 2026

Summary

TL;DR

ArrowScan materializes entire Iceberg data files into memory, it does not stream batches lazily. Our
tests confirm this across 500K–10M rows for both Parquet and ORC, with constant ~40 bytes/row and streaming
peak identical to full materialization (ratio 1.00x). This means each Ray worker in the OHDataLoader will hold
an entire file's worth of Arrow buffers in memory when processing a FileScanTask. Workers must be sized with
enough memory to hold the largest file they'll process, plus headroom for the Python runtime, UDF execution,
and DataFusion plan processing.

Read the markdown file in the PR for more info

ShreyeshArangath and others added 9 commits February 7, 2026 14:43
Empirically tests whether PyIceberg's ArrowScan.to_record_batches()
materializes entire files into memory or streams batches lazily.
Bumps pyiceberg to 0.11.0rc2 for ArrowScan import support.

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
…tion

Adds test_arrowscan_memory_scaling that generates Parquet and ORC files
at 500K, 1M, 2M, and 10M rows to show that ArrowScan memory usage
scales linearly with file size (constant ~40 bytes/row, 0.0% spread).
This conclusively proves full materialization and informs file-sizing
decisions for the distributed data loader.

Also refactors _make_table() to use numpy for fast data generation at
large sizes, and parameterizes helpers with row count defaults.

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
Add comprehensive documentation showing PyIceberg's ArrowScan does not
expose batch_size control and materializes full files into memory.

Key findings:
- PyArrow's Scanner.from_fragment() accepts batch_size (default 131072)
  but PyIceberg never forwards it (pyarrow.py:1615)
- to_record_batches() calls list() on batches (pyarrow.py:1782-1786),
  defeating any streaming even if batch_size were forwarded
- Both issues must be fixed for true streaming

Experimental proof includes:
1. Memory behavior test (200K rows): 1.00x ratio = full materialization
2. Memory scaling test (500K-10M rows): constant 40 bytes/row = linear scaling
3. Ray OOM test (150M rows, 1GB worker): RSS climbs 142MB→1052MB, crashes
   before yielding any batch despite 1.5M tiny 100-row stripes

All experiments include live test output logs and GitHub source references
to PyIceberg 0.11.0rc2 and PyArrow 23.0.0 documentation.

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
Document the two required changes to enable streaming in PyIceberg:
1. Add batch_size parameter and forward to Scanner.from_fragment()
2. Remove list() materialization in to_record_batches()

Includes:
- Specific line numbers and GitHub references
- Current vs needed code snippets
- Two implementation options for handling executor pattern
  (sequential streaming vs parallel with bounded queue)
- Optional Table.scan() API changes
- Testing approach to verify streaming behavior

Context document for implementing streaming support in PyIceberg.

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant