From 78787c0717ee4fcb3dc1c17a4a843dc74e77104e Mon Sep 17 00:00:00 2001 From: Abanoub Doss Date: Sun, 24 May 2026 20:35:00 -0500 Subject: [PATCH] feat(python): export exact arrow reader schema --- bindings/python/src/scan.rs | 57 +++--- bindings/python/tests/test_scan.py | 61 +++++- crates/iceberg/src/arrow/mod.rs | 7 + .../src/arrow/record_batch_transformer.rs | 185 +++++++++++++++--- crates/iceberg/src/scan/context.rs | 29 ++- crates/iceberg/src/scan/mod.rs | 9 + 6 files changed, 281 insertions(+), 67 deletions(-) diff --git a/bindings/python/src/scan.rs b/bindings/python/src/scan.rs index e2b08d5128..20eeed8597 100644 --- a/bindings/python/src/scan.rs +++ b/bindings/python/src/scan.rs @@ -22,7 +22,7 @@ use arrow::error::{ArrowError, Result as ArrowResult}; use arrow::pyarrow::IntoPyArrow; use arrow::record_batch::{RecordBatch, RecordBatchReader}; use futures::{StreamExt, stream}; -use iceberg::arrow::{ArrowReaderBuilder, schema_to_arrow_schema}; +use iceberg::arrow::{ArrowReaderBuilder, arrow_schema_for_file_scan_task, schema_to_arrow_schema}; use iceberg::expr::Bind; use iceberg::metadata_columns::is_metadata_field; use iceberg::scan::{ @@ -293,39 +293,49 @@ fn schema_top_level_field_ids(schema: &PySchema) -> Vec { fn validate_reader_projection(output_schema: &PySchema, tasks: &[FileScanTask]) -> PyResult<()> { let output_field_ids = schema_top_level_field_ids(output_schema); - if let Some(field_id) = output_field_ids - .iter() - .copied() - .find(|id| is_metadata_field(*id)) - { - return Err(PyValueError::new_err(format!( - "ArrowReader does not yet support metadata field projections; field id {field_id} requires the reader-produced Arrow schema" - ))); - } for task in tasks { - if task.partition.is_some() { - return Err(PyValueError::new_err( - "ArrowReader does not yet support partition data projections; partition constants require the reader-produced Arrow schema", - )); + if task.project_field_ids != output_field_ids { + return Err(PyValueError::new_err(format!( + "output_schema field ids {:?} must match task project_field_ids {:?} for {}; pass the projected schema used to build the tasks", + output_field_ids, task.project_field_ids, task.data_file_path + ))); } - if let Some(field_id) = task - .project_field_ids + } + Ok(()) +} + +fn arrow_schema_for_reader( + output_schema: &PySchema, + tasks: &[FileScanTask], +) -> PyResult { + let Some((first_task, rest)) = tasks.split_first() else { + let output_field_ids = schema_top_level_field_ids(output_schema); + if let Some(field_id) = output_field_ids .iter() .copied() .find(|id| is_metadata_field(*id)) { return Err(PyValueError::new_err(format!( - "ArrowReader does not yet support metadata field projections; field id {field_id} requires the reader-produced Arrow schema" + "ArrowReader cannot infer the exact Arrow schema for metadata field id {field_id} without scan tasks" ))); } - if task.project_field_ids != output_field_ids { + return Ok(Arc::new( + schema_to_arrow_schema(output_schema.inner.as_ref()) + .map_err(crate::error::to_py_err)?, + )); + }; + + let schema = arrow_schema_for_file_scan_task(first_task).map_err(crate::error::to_py_err)?; + for task in rest { + let task_schema = arrow_schema_for_file_scan_task(task).map_err(crate::error::to_py_err)?; + if task_schema.as_ref() != schema.as_ref() { return Err(PyValueError::new_err(format!( - "output_schema field ids {:?} must match task project_field_ids {:?} for {}; pass the projected schema used to build the tasks", - output_field_ids, task.project_field_ids, task.data_file_path + "all scan tasks must produce the same Arrow schema for pyarrow export; {} differs from {}", + task.data_file_path, first_task.data_file_path ))); } } - Ok(()) + Ok(schema) } #[pyclass( @@ -621,6 +631,7 @@ impl PyArrowReader { ) -> PyResult> { let rust_tasks = py_tasks_to_rust(tasks)?; validate_reader_projection(output_schema, &rust_tasks)?; + let schema = arrow_schema_for_reader(output_schema, &rust_tasks)?; let task_stream = Box::pin(stream::iter(rust_tasks.into_iter().map(Ok))) as FileScanTaskStream; @@ -639,10 +650,6 @@ impl PyArrowReader { .read(task_stream) .map_err(crate::error::to_py_err)? .stream(); - let schema = Arc::new( - schema_to_arrow_schema(output_schema.inner.as_ref()) - .map_err(crate::error::to_py_err)?, - ); let reader: Box = Box::new(BlockingArrowRecordBatchReader { schema, stream }); diff --git a/bindings/python/tests/test_scan.py b/bindings/python/tests/test_scan.py index 54b978fb30..9ef58c7e2b 100644 --- a/bindings/python/tests/test_scan.py +++ b/bindings/python/tests/test_scan.py @@ -335,7 +335,7 @@ def test_arrow_reader_rejects_output_schema_that_does_not_match_task_projection( assert isinstance(projected_reader, pa.RecordBatchReader) -def test_arrow_reader_rejects_metadata_projection_until_exact_reader_schema_is_exported(): +def test_arrow_reader_metadata_projection_no_longer_fails(): reader = ArrowReader(FileIO.from_props({})) task = FileScanTask( schema(), @@ -344,11 +344,22 @@ def test_arrow_reader_rejects_metadata_projection_until_exact_reader_schema_is_e [1, 2147483646], ) - with pytest.raises(ValueError, match="metadata field projections"): - reader.read(id_file_schema(), [task]) + projected_reader = reader.read(id_file_schema(), [task]) + assert isinstance(projected_reader, pa.RecordBatchReader) + assert projected_reader.schema.names == ["id", "_file"] + + file_field = projected_reader.schema.field("_file") + assert "run_end_encoded" in str(file_field.type) or pa.types.is_run_end_encoded(file_field.type) + + +def test_arrow_reader_rejects_empty_metadata_projection_without_task_schema(): + reader = ArrowReader(FileIO.from_props({})) + + with pytest.raises(ValueError, match="cannot infer the exact Arrow schema"): + reader.read(id_file_schema(), []) -def test_arrow_reader_rejects_partition_data_until_exact_reader_schema_is_exported(): +def test_arrow_reader_partition_projection_no_longer_fails(): partition_spec = json.dumps( { "spec-id": 1, @@ -372,5 +383,43 @@ def test_arrow_reader_rejects_partition_data_until_exact_reader_schema_is_export partition_spec=partition_spec, ) - with pytest.raises(ValueError, match="partition data projections"): - reader.read(id_schema(), [task]) + projected_reader = reader.read(id_schema(), [task]) + assert isinstance(projected_reader, pa.RecordBatchReader) + assert projected_reader.schema.names == ["id"] + + id_field = projected_reader.schema.field("id") + assert "run_end_encoded" in str(id_field.type) or pa.types.is_run_end_encoded(id_field.type) + + +def test_arrow_reader_rejects_tasks_with_different_physical_schemas(): + partition_spec = json.dumps( + { + "spec-id": 1, + "fields": [ + { + "source-id": 1, + "field-id": 1000, + "name": "id", + "transform": "identity", + } + ], + } + ) + reader = ArrowReader(FileIO.from_props({})) + constant_task = FileScanTask( + schema(), + "s3://bucket/partitioned.parquet", + 1024, + [1], + partition_data=[7], + partition_spec=partition_spec, + ) + plain_task = FileScanTask( + schema(), + "s3://bucket/plain.parquet", + 1024, + [1], + ) + + with pytest.raises(ValueError, match="same Arrow schema"): + reader.read(id_schema(), [constant_task, plain_task]) diff --git a/crates/iceberg/src/arrow/mod.rs b/crates/iceberg/src/arrow/mod.rs index bf53633cfc..e7b41fb390 100644 --- a/crates/iceberg/src/arrow/mod.rs +++ b/crates/iceberg/src/arrow/mod.rs @@ -44,3 +44,10 @@ pub use partition_value_calculator::*; /// Record batch partition splitter for partitioned tables pub mod record_batch_partition_splitter; pub use record_batch_partition_splitter::*; + +/// Build the Arrow schema emitted by a file scan task after projection and constants. +pub fn arrow_schema_for_file_scan_task( + task: &crate::scan::FileScanTask, +) -> crate::Result { + record_batch_transformer::RecordBatchTransformer::arrow_schema_for_task(task) +} diff --git a/crates/iceberg/src/arrow/record_batch_transformer.rs b/crates/iceberg/src/arrow/record_batch_transformer.rs index 439358435c..51389103f5 100644 --- a/crates/iceberg/src/arrow/record_batch_transformer.rs +++ b/crates/iceberg/src/arrow/record_batch_transformer.rs @@ -29,7 +29,8 @@ use parquet::arrow::PARQUET_FIELD_ID_META_KEY; use crate::arrow::value::{create_primitive_array_repeated, create_primitive_array_single_element}; use crate::arrow::{datum_to_arrow_type_with_ree, schema_to_arrow_schema}; -use crate::metadata_columns::get_metadata_field; +use crate::metadata_columns::{RESERVED_FIELD_ID_FILE, get_metadata_field}; +use crate::scan::FileScanTask; use crate::spec::{ Datum, Literal, PartitionSpec, PrimitiveLiteral, Schema as IcebergSchema, Struct, Transform, }; @@ -53,7 +54,7 @@ fn constants_map( partition_spec: &PartitionSpec, partition_data: &Struct, schema: &IcebergSchema, -) -> Result> { +) -> Result>> { let mut constants = HashMap::new(); for (pos, field) in partition_spec.fields().iter().enumerate() { @@ -83,15 +84,12 @@ fn constants_map( // Handle both None (null) and Some(Literal::Primitive) cases match &partition_data[pos] { None => { - // Skip null partition values - they will be resolved as null per Iceberg spec rule #4. - // When a partition value is null, we don't add it to the constants map, - // allowing downstream column resolution to handle it correctly. - continue; + constants.insert(field.source_id, None); } Some(Literal::Primitive(value)) => { // Create a Datum from the primitive type and value let datum = Datum::new(prim_type.clone(), value.clone()); - constants.insert(field.source_id, datum); + constants.insert(field.source_id, Some(datum)); } Some(literal) => { return Err(Error::new( @@ -192,7 +190,7 @@ enum SchemaComparison { pub(crate) struct RecordBatchTransformerBuilder { snapshot_schema: Arc, projected_iceberg_field_ids: Vec, - constant_fields: HashMap, + constant_fields: HashMap>, } impl RecordBatchTransformerBuilder { @@ -214,7 +212,7 @@ impl RecordBatchTransformerBuilder { /// * `field_id` - The field ID to associate with the constant /// * `datum` - The constant value (with type) for this field pub(crate) fn with_constant(mut self, field_id: i32, datum: Datum) -> Self { - self.constant_fields.insert(field_id, datum); + self.constant_fields.insert(field_id, Some(datum)); self } @@ -240,6 +238,71 @@ impl RecordBatchTransformerBuilder { Ok(self) } + pub(crate) fn target_schema(&self) -> Result { + let mapped_unprojected_arrow_schema = + Arc::new(schema_to_arrow_schema(&self.snapshot_schema)?); + let field_id_to_mapped_schema_map = + RecordBatchTransformer::build_field_id_to_arrow_schema_map( + &mapped_unprojected_arrow_schema, + )?; + + let fields: Result> = self + .projected_iceberg_field_ids + .iter() + .map(|field_id| { + if self.constant_fields.contains_key(field_id) { + if let Ok(iceberg_field) = get_metadata_field(*field_id) { + let constant_value = + self.constant_fields.get(field_id).ok_or_else(|| { + Error::new(ErrorKind::Unexpected, "constant field not found") + })?; + let arrow_type = if let Some(datum) = constant_value { + datum_to_arrow_type_with_ree(datum) + } else { + return Err(Error::new( + ErrorKind::Unexpected, + "metadata constant field cannot be null", + )); + }; + let arrow_field = + Field::new(&iceberg_field.name, arrow_type, !iceberg_field.required) + .with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + iceberg_field.id.to_string(), + )])); + Ok(Arc::new(arrow_field)) + } else { + let field = &field_id_to_mapped_schema_map + .get(field_id) + .ok_or_else(|| Error::new(ErrorKind::Unexpected, "field not found"))? + .0; + let constant_value = + self.constant_fields.get(field_id).ok_or_else(|| { + Error::new(ErrorKind::Unexpected, "constant field not found") + })?; + let arrow_type = RecordBatchTransformer::constant_arrow_type( + *field_id, + constant_value, + &field_id_to_mapped_schema_map, + )?; + let constant_field = + Field::new(field.name(), arrow_type, field.is_nullable()) + .with_metadata(field.metadata().clone()); + Ok(Arc::new(constant_field)) + } + } else { + Ok(field_id_to_mapped_schema_map + .get(field_id) + .ok_or_else(|| Error::new(ErrorKind::Unexpected, "field not found"))? + .0 + .clone()) + } + }) + .collect(); + + Ok(Arc::new(ArrowSchema::new(fields?))) + } + pub(crate) fn build(self) -> RecordBatchTransformer { RecordBatchTransformer { snapshot_schema: self.snapshot_schema, @@ -284,10 +347,10 @@ impl RecordBatchTransformerBuilder { pub(crate) struct RecordBatchTransformer { snapshot_schema: Arc, projected_iceberg_field_ids: Vec, - // Pre-computed constant field information: field_id -> Datum + // Pre-computed constant field information: field_id -> optional Datum. // Includes both virtual/metadata fields (like _file) and identity-partitioned fields - // Datum holds both the Iceberg type and the value - constant_fields: HashMap, + // Datum holds both the Iceberg type and the value; None means a constant null. + constant_fields: HashMap>, // BatchTransform gets lazily constructed based on the schema of // the first RecordBatch we receive from the file @@ -295,6 +358,47 @@ pub(crate) struct RecordBatchTransformer { } impl RecordBatchTransformer { + fn run_end_encoded_type(values_type: DataType) -> DataType { + let run_ends_field = Arc::new(Field::new("run_ends", DataType::Int32, false)); + let values_field = Arc::new(Field::new("values", values_type, true)); + DataType::RunEndEncoded(run_ends_field, values_field) + } + + fn constant_arrow_type( + field_id: i32, + constant_value: &Option, + field_id_to_mapped_schema_map: &HashMap, + ) -> Result { + if let Some(datum) = constant_value { + return Ok(datum_to_arrow_type_with_ree(datum)); + } + + let field = field_id_to_mapped_schema_map + .get(&field_id) + .ok_or_else(|| Error::new(ErrorKind::Unexpected, "field not found"))? + .0 + .clone(); + Ok(Self::run_end_encoded_type(field.data_type().clone())) + } + + pub(crate) fn arrow_schema_for_task(task: &FileScanTask) -> Result { + let mut builder = + RecordBatchTransformerBuilder::new(task.schema_ref(), task.project_field_ids()); + + if task.project_field_ids().contains(&RESERVED_FIELD_ID_FILE) { + let file_datum = Datum::string(task.data_file_path.clone()); + builder = builder.with_constant(RESERVED_FIELD_ID_FILE, file_datum); + } + + if let (Some(partition_spec), Some(partition_data)) = + (task.partition_spec.clone(), task.partition.clone()) + { + builder = builder.with_partition(partition_spec, partition_data)?; + } + + builder.target_schema() + } + pub(crate) fn process_record_batch( &mut self, record_batch: RecordBatch, @@ -310,7 +414,11 @@ impl RecordBatchTransformer { .with_row_count(Some(record_batch.num_rows())); RecordBatch::try_new_with_options( Arc::clone(target_schema), - self.transform_columns(record_batch.columns(), operations)?, + self.transform_columns( + record_batch.num_rows(), + record_batch.columns(), + operations, + )?, &options, )? } @@ -348,7 +456,7 @@ impl RecordBatchTransformer { source_schema: &ArrowSchemaRef, snapshot_schema: &IcebergSchema, projected_iceberg_field_ids: &[i32], - constant_fields: &HashMap, + constant_fields: &HashMap>, ) -> Result { let mapped_unprojected_arrow_schema = Arc::new(schema_to_arrow_schema(snapshot_schema)?); let field_id_to_mapped_schema_map = @@ -365,11 +473,18 @@ impl RecordBatchTransformer { // For partition fields, get name from schema (they exist in schema) if let Ok(iceberg_field) = get_metadata_field(*field_id) { // This is a metadata/virtual field - convert Iceberg field to Arrow - let datum = constant_fields.get(field_id).ok_or(Error::new( + let constant_value = constant_fields.get(field_id).ok_or(Error::new( ErrorKind::Unexpected, "constant field not found", ))?; - let arrow_type = datum_to_arrow_type_with_ree(datum); + let arrow_type = if let Some(datum) = constant_value { + datum_to_arrow_type_with_ree(datum) + } else { + return Err(Error::new( + ErrorKind::Unexpected, + "metadata constant field cannot be null", + )); + }; let arrow_field = Field::new(&iceberg_field.name, arrow_type, !iceberg_field.required) .with_metadata(HashMap::from([( @@ -383,11 +498,15 @@ impl RecordBatchTransformer { .get(field_id) .ok_or(Error::new(ErrorKind::Unexpected, "field not found"))? .0; - let datum = constant_fields.get(field_id).ok_or(Error::new( + let constant_value = constant_fields.get(field_id).ok_or(Error::new( ErrorKind::Unexpected, "constant field not found", ))?; - let arrow_type = datum_to_arrow_type_with_ree(datum); + let arrow_type = Self::constant_arrow_type( + *field_id, + constant_value, + &field_id_to_mapped_schema_map, + )?; // Use the type from constant_fields (REE for constants) let constant_field = Field::new(field.name(), arrow_type, field.is_nullable()) @@ -473,7 +592,7 @@ impl RecordBatchTransformer { snapshot_schema: &IcebergSchema, projected_iceberg_field_ids: &[i32], field_id_to_mapped_schema_map: HashMap, - constant_fields: &HashMap, + constant_fields: &HashMap>, ) -> Result> { let field_id_to_source_schema_map = Self::build_field_id_to_arrow_schema_map(source_schema)?; @@ -485,10 +604,14 @@ impl RecordBatchTransformer { // Constant fields always use their pre-computed constant values, regardless of whether // they exist in the Parquet file. This is per Iceberg spec rule #1: partition metadata // is authoritative and should be preferred over file data. - if let Some(datum) = constant_fields.get(field_id) { - let arrow_type = datum_to_arrow_type_with_ree(datum); + if let Some(constant_value) = constant_fields.get(field_id) { + let arrow_type = Self::constant_arrow_type( + *field_id, + constant_value, + &field_id_to_mapped_schema_map, + )?; return Ok(ColumnSource::Add { - value: Some(datum.literal().clone()), + value: constant_value.as_ref().map(|datum| datum.literal().clone()), target_type: arrow_type, }); } @@ -593,14 +716,10 @@ impl RecordBatchTransformer { fn transform_columns( &self, + num_rows: usize, columns: &[Arc], operations: &[ColumnSource], ) -> Result>> { - if columns.is_empty() { - return Ok(columns.to_vec()); - } - let num_rows = columns[0].len(); - operations .iter() .map(|op| { @@ -1669,10 +1788,14 @@ mod test { .unwrap(); assert_eq!(id_col.values(), &[1, 2, 3]); - // Partition column with null value should produce nulls + // Partition column with null value should not pass through file data. let data_col = result.column(1); - assert!(data_col.is_null(0)); - assert!(data_col.is_null(1)); - assert!(data_col.is_null(2)); + let data_col = data_col + .as_any() + .downcast_ref::>() + .unwrap(); + assert_eq!(data_col.len(), 3); + assert_eq!(data_col.values().len(), 1); + assert!(data_col.values().is_null(0)); } } diff --git a/crates/iceberg/src/scan/context.rs b/crates/iceberg/src/scan/context.rs index aa28ffd5a2..2d3aa49f31 100644 --- a/crates/iceberg/src/scan/context.rs +++ b/crates/iceberg/src/scan/context.rs @@ -28,8 +28,8 @@ use crate::scan::{ PartitionFilterCache, }; use crate::spec::{ - ManifestContentType, ManifestEntryRef, ManifestFile, ManifestList, SchemaRef, SnapshotRef, - TableMetadataRef, + ManifestContentType, ManifestEntryRef, ManifestFile, ManifestList, PartitionSpec, SchemaRef, + SnapshotRef, TableMetadataRef, }; use crate::{Error, ErrorKind, Result}; @@ -44,6 +44,7 @@ pub(crate) struct ManifestFileContext { bound_predicates: Option>, object_cache: Arc, snapshot_schema: SchemaRef, + partition_spec: Arc, expression_evaluator_cache: Arc, delete_file_index: DeleteFileIndex, case_sensitive: bool, @@ -58,6 +59,7 @@ pub(crate) struct ManifestEntryContext { pub field_ids: Arc>, pub bound_predicates: Option>, pub partition_spec_id: i32, + pub partition_spec: Arc, pub snapshot_schema: SchemaRef, pub delete_file_index: DeleteFileIndex, pub case_sensitive: bool, @@ -72,6 +74,7 @@ impl ManifestFileContext { manifest_file, bound_predicates, snapshot_schema, + partition_spec, field_ids, mut sender, expression_evaluator_cache, @@ -90,6 +93,7 @@ impl ManifestFileContext { partition_spec_id: manifest_file.partition_spec_id, bound_predicates: bound_predicates.clone(), snapshot_schema: snapshot_schema.clone(), + partition_spec: partition_spec.clone(), delete_file_index: delete_file_index.clone(), case_sensitive: self.case_sensitive, }; @@ -133,10 +137,8 @@ impl ManifestEntryContext { deletes, - // Include partition data and spec from manifest entry partition: Some(self.manifest_entry.data_file.partition.clone()), - // TODO: Pass actual PartitionSpec through context chain for native flow - partition_spec: None, + partition_spec: Some(self.partition_spec), // TODO: Extract name_mapping from table metadata property "schema.name-mapping.default" name_mapping: None, case_sensitive: self.case_sensitive, @@ -241,8 +243,23 @@ impl PlanContext { None }; + let partition_spec = self + .table_metadata + .partition_spec_by_id(manifest_file.partition_spec_id) + .ok_or_else(|| { + Error::new( + ErrorKind::Unexpected, + format!( + "Could not find partition spec for id {}", + manifest_file.partition_spec_id + ), + ) + })? + .clone(); + let mfc = self.create_manifest_file_context( manifest_file, + partition_spec, partition_bound_predicate, tx, delete_file_idx.clone(), @@ -257,6 +274,7 @@ impl PlanContext { fn create_manifest_file_context( &self, manifest_file: &ManifestFile, + partition_spec: Arc, partition_filter: Option>, sender: Sender, delete_file_index: DeleteFileIndex, @@ -279,6 +297,7 @@ impl PlanContext { sender, object_cache: self.object_cache.clone(), snapshot_schema: self.snapshot_schema.clone(), + partition_spec, field_ids: self.field_ids.clone(), expression_evaluator_cache: self.expression_evaluator_cache.clone(), delete_file_index, diff --git a/crates/iceberg/src/scan/mod.rs b/crates/iceberg/src/scan/mod.rs index 21822d9f00..bd706614c1 100644 --- a/crates/iceberg/src/scan/mod.rs +++ b/crates/iceberg/src/scan/mod.rs @@ -1352,6 +1352,15 @@ pub mod tests { tasks[1].data_file_path, format!("{}/3.parquet", &fixture.table_location) ); + + for task in tasks { + assert_eq!( + task.partition_spec + .as_ref() + .map(|partition_spec| partition_spec.spec_id()), + Some(0) + ); + } } #[tokio::test]