Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 32 additions & 25 deletions bindings/python/src/scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use arrow::error::{ArrowError, Result as ArrowResult};
use arrow::pyarrow::IntoPyArrow;
use arrow::record_batch::{RecordBatch, RecordBatchReader};
use futures::{StreamExt, stream};
use iceberg::arrow::{ArrowReaderBuilder, schema_to_arrow_schema};
use iceberg::arrow::{ArrowReaderBuilder, arrow_schema_for_file_scan_task, schema_to_arrow_schema};
use iceberg::expr::Bind;
use iceberg::metadata_columns::is_metadata_field;
use iceberg::scan::{
Expand Down Expand Up @@ -293,39 +293,49 @@ fn schema_top_level_field_ids(schema: &PySchema) -> Vec<i32> {

fn validate_reader_projection(output_schema: &PySchema, tasks: &[FileScanTask]) -> PyResult<()> {
let output_field_ids = schema_top_level_field_ids(output_schema);
if let Some(field_id) = output_field_ids
.iter()
.copied()
.find(|id| is_metadata_field(*id))
{
return Err(PyValueError::new_err(format!(
"ArrowReader does not yet support metadata field projections; field id {field_id} requires the reader-produced Arrow schema"
)));
}
for task in tasks {
if task.partition.is_some() {
return Err(PyValueError::new_err(
"ArrowReader does not yet support partition data projections; partition constants require the reader-produced Arrow schema",
));
if task.project_field_ids != output_field_ids {
return Err(PyValueError::new_err(format!(
"output_schema field ids {:?} must match task project_field_ids {:?} for {}; pass the projected schema used to build the tasks",
output_field_ids, task.project_field_ids, task.data_file_path
)));
}
if let Some(field_id) = task
.project_field_ids
}
Ok(())
}

fn arrow_schema_for_reader(
output_schema: &PySchema,
tasks: &[FileScanTask],
) -> PyResult<ArrowSchemaRef> {
let Some((first_task, rest)) = tasks.split_first() else {
let output_field_ids = schema_top_level_field_ids(output_schema);
if let Some(field_id) = output_field_ids
.iter()
.copied()
.find(|id| is_metadata_field(*id))
{
return Err(PyValueError::new_err(format!(
"ArrowReader does not yet support metadata field projections; field id {field_id} requires the reader-produced Arrow schema"
"ArrowReader cannot infer the exact Arrow schema for metadata field id {field_id} without scan tasks"
)));
}
if task.project_field_ids != output_field_ids {
return Ok(Arc::new(
schema_to_arrow_schema(output_schema.inner.as_ref())
.map_err(crate::error::to_py_err)?,
));
};

let schema = arrow_schema_for_file_scan_task(first_task).map_err(crate::error::to_py_err)?;
for task in rest {
let task_schema = arrow_schema_for_file_scan_task(task).map_err(crate::error::to_py_err)?;
if task_schema.as_ref() != schema.as_ref() {
return Err(PyValueError::new_err(format!(
"output_schema field ids {:?} must match task project_field_ids {:?} for {}; pass the projected schema used to build the tasks",
output_field_ids, task.project_field_ids, task.data_file_path
"all scan tasks must produce the same Arrow schema for pyarrow export; {} differs from {}",
task.data_file_path, first_task.data_file_path
)));
}
}
Ok(())
Ok(schema)
}

#[pyclass(
Expand Down Expand Up @@ -621,6 +631,7 @@ impl PyArrowReader {
) -> PyResult<Bound<'py, PyAny>> {
let rust_tasks = py_tasks_to_rust(tasks)?;
validate_reader_projection(output_schema, &rust_tasks)?;
let schema = arrow_schema_for_reader(output_schema, &rust_tasks)?;
let task_stream =
Box::pin(stream::iter(rust_tasks.into_iter().map(Ok))) as FileScanTaskStream;

Expand All @@ -639,10 +650,6 @@ impl PyArrowReader {
.read(task_stream)
.map_err(crate::error::to_py_err)?
.stream();
let schema = Arc::new(
schema_to_arrow_schema(output_schema.inner.as_ref())
.map_err(crate::error::to_py_err)?,
);
let reader: Box<dyn RecordBatchReader + Send> =
Box::new(BlockingArrowRecordBatchReader { schema, stream });

Expand Down
61 changes: 55 additions & 6 deletions bindings/python/tests/test_scan.py
Original file line number Diff line number Diff line change
Expand Up @@ -335,7 +335,7 @@ def test_arrow_reader_rejects_output_schema_that_does_not_match_task_projection(
assert isinstance(projected_reader, pa.RecordBatchReader)


def test_arrow_reader_rejects_metadata_projection_until_exact_reader_schema_is_exported():
def test_arrow_reader_metadata_projection_no_longer_fails():
reader = ArrowReader(FileIO.from_props({}))
task = FileScanTask(
schema(),
Expand All @@ -344,11 +344,22 @@ def test_arrow_reader_rejects_metadata_projection_until_exact_reader_schema_is_e
[1, 2147483646],
)

with pytest.raises(ValueError, match="metadata field projections"):
reader.read(id_file_schema(), [task])
projected_reader = reader.read(id_file_schema(), [task])
assert isinstance(projected_reader, pa.RecordBatchReader)
assert projected_reader.schema.names == ["id", "_file"]

file_field = projected_reader.schema.field("_file")
assert "run_end_encoded" in str(file_field.type) or pa.types.is_run_end_encoded(file_field.type)


def test_arrow_reader_rejects_empty_metadata_projection_without_task_schema():
    """Reading ``id_file_schema()`` with an empty task list must raise ``ValueError``.

    The error message is expected to contain "cannot infer the exact Arrow schema".
    """
    no_tasks = []
    arrow_reader = ArrowReader(FileIO.from_props({}))

    with pytest.raises(ValueError, match="cannot infer the exact Arrow schema"):
        arrow_reader.read(id_file_schema(), no_tasks)


def test_arrow_reader_rejects_partition_data_until_exact_reader_schema_is_exported():
def test_arrow_reader_partition_projection_no_longer_fails():
partition_spec = json.dumps(
{
"spec-id": 1,
Expand All @@ -372,5 +383,43 @@ def test_arrow_reader_rejects_partition_data_until_exact_reader_schema_is_export
partition_spec=partition_spec,
)

with pytest.raises(ValueError, match="partition data projections"):
reader.read(id_schema(), [task])
projected_reader = reader.read(id_schema(), [task])
assert isinstance(projected_reader, pa.RecordBatchReader)
assert projected_reader.schema.names == ["id"]

id_field = projected_reader.schema.field("id")
assert "run_end_encoded" in str(id_field.type) or pa.types.is_run_end_encoded(id_field.type)


def test_arrow_reader_rejects_tasks_with_different_physical_schemas():
    """Reading one task built with partition data plus one built without must raise ``ValueError``.

    Both tasks project field id 1; only the first carries ``partition_data`` and a
    ``partition_spec``. The error message is expected to contain "same Arrow schema".
    """
    identity_partition_field = {
        "source-id": 1,
        "field-id": 1000,
        "name": "id",
        "transform": "identity",
    }
    spec_json = json.dumps({"spec-id": 1, "fields": [identity_partition_field]})
    arrow_reader = ArrowReader(FileIO.from_props({}))

    task_with_partition_constant = FileScanTask(
        schema(),
        "s3://bucket/partitioned.parquet",
        1024,
        [1],
        partition_data=[7],
        partition_spec=spec_json,
    )
    task_without_partition = FileScanTask(schema(), "s3://bucket/plain.parquet", 1024, [1])
    mixed_tasks = [task_with_partition_constant, task_without_partition]

    with pytest.raises(ValueError, match="same Arrow schema"):
        arrow_reader.read(id_schema(), mixed_tasks)
7 changes: 7 additions & 0 deletions crates/iceberg/src/arrow/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,3 +44,10 @@ pub use partition_value_calculator::*;
/// Record batch partition splitter for partitioned tables
pub mod record_batch_partition_splitter;
pub use record_batch_partition_splitter::*;

/// Build the Arrow schema emitted by a file scan task after projection and constants.
pub fn arrow_schema_for_file_scan_task(
task: &crate::scan::FileScanTask,
) -> crate::Result<arrow_schema::SchemaRef> {
record_batch_transformer::RecordBatchTransformer::arrow_schema_for_task(task)
}
Loading