diff --git a/crates/catalog/glue/src/schema.rs b/crates/catalog/glue/src/schema.rs index 864320dae4..a476aae8df 100644 --- a/crates/catalog/glue/src/schema.rs +++ b/crates/catalog/glue/src/schema.rs @@ -182,6 +182,10 @@ impl SchemaVisitor for GlueSchemaBuilder { Ok(glue_type) } + + fn variant(&mut self, _v: &iceberg::spec::VariantType) -> Result { + Ok("variant".to_string()) + } } #[cfg(test)] diff --git a/crates/catalog/hms/src/schema.rs b/crates/catalog/hms/src/schema.rs index c23b48719d..78ba89a208 100644 --- a/crates/catalog/hms/src/schema.rs +++ b/crates/catalog/hms/src/schema.rs @@ -139,6 +139,10 @@ impl SchemaVisitor for HiveSchemaBuilder { Ok(hive_type) } + + fn variant(&mut self, _v: &iceberg::spec::VariantType) -> Result { + Ok("variant".to_string()) + } } #[cfg(test)] diff --git a/crates/iceberg/src/arrow/caching_delete_file_loader.rs b/crates/iceberg/src/arrow/caching_delete_file_loader.rs index 231971fd54..629c9be427 100644 --- a/crates/iceberg/src/arrow/caching_delete_file_loader.rs +++ b/crates/iceberg/src/arrow/caching_delete_file_loader.rs @@ -34,7 +34,7 @@ use crate::io::FileIO; use crate::scan::{ArrowRecordBatchStream, FileScanTaskDeleteFile}; use crate::spec::{ DataContentType, Datum, ListType, MapType, NestedField, NestedFieldRef, PartnerAccessor, - PrimitiveType, Schema, SchemaRef, SchemaWithPartnerVisitor, StructType, Type, + PrimitiveType, Schema, SchemaRef, SchemaWithPartnerVisitor, StructType, Type, VariantType, visit_schema_with_partner, }; use crate::{Error, ErrorKind, Result}; @@ -537,6 +537,10 @@ impl SchemaWithPartnerVisitor for EqDelColumnProcessor<'_> { fn primitive(&mut self, _primitive: &PrimitiveType, _partner: &ArrayRef) -> Result<()> { Ok(()) } + + fn variant(&mut self, _v: &VariantType, _partner: &ArrayRef) -> Result<()> { + Ok(()) + } } struct EqDelRecordBatchPartnerAccessor; diff --git a/crates/iceberg/src/arrow/nan_val_cnt_visitor.rs b/crates/iceberg/src/arrow/nan_val_cnt_visitor.rs index e514457887..d01f3e9e56 100644 --- 
a/crates/iceberg/src/arrow/nan_val_cnt_visitor.rs +++ b/crates/iceberg/src/arrow/nan_val_cnt_visitor.rs @@ -28,7 +28,7 @@ use crate::Result; use crate::arrow::{ArrowArrayAccessor, FieldMatchMode}; use crate::spec::{ ListType, MapType, NestedFieldRef, PrimitiveType, Schema, SchemaRef, SchemaWithPartnerVisitor, - StructType, visit_struct_with_partner, + StructType, VariantType, visit_struct_with_partner, }; macro_rules! cast_and_update_cnt_map { @@ -122,6 +122,10 @@ impl SchemaWithPartnerVisitor for NanValueCountVisitor { Ok(()) } + fn variant(&mut self, _v: &VariantType, _col: &ArrayRef) -> Result { + Ok(()) + } + fn after_struct_field(&mut self, field: &NestedFieldRef, partner: &ArrayRef) -> Result<()> { let field_id = field.id; count_float_nans!(partner, self, field_id); diff --git a/crates/iceberg/src/arrow/reader/projection.rs b/crates/iceberg/src/arrow/reader/projection.rs index deae027e14..2a0157bbc8 100644 --- a/crates/iceberg/src/arrow/reader/projection.rs +++ b/crates/iceberg/src/arrow/reader/projection.rs @@ -23,7 +23,7 @@ use std::collections::{HashMap, HashSet}; use std::str::FromStr; use std::sync::Arc; -use arrow_schema::{Field, Schema as ArrowSchema, SchemaRef as ArrowSchemaRef}; +use arrow_schema::{DataType, Field, FieldRef, Schema as ArrowSchema, SchemaRef as ArrowSchemaRef}; use parquet::arrow::{PARQUET_FIELD_ID_META_KEY, ProjectionMask}; use parquet::schema::types::{SchemaDescriptor, Type as ParquetType}; @@ -76,6 +76,91 @@ impl ArrowReader { Self::include_leaf_field_id(&map_type.key_field, field_ids); Self::include_leaf_field_id(&map_type.value_field, field_ids); } + // Variant is a leaf type for Parquet projection purposes (like a primitive). + Type::Variant(_) => { + field_ids.push(field.id); + } + } + } + + /// Recursive DFS over an Arrow `Fields` tree whose leaf numbering matches + /// `arrow_schema::Fields::filter_leaves`. For every leaf sitting inside a + /// variant column, stores `leaf_idx → variant_field_id` in `out`. 
A + /// "variant column" is any Arrow field whose embedded Parquet field id + /// resolves to `Type::Variant` in the Iceberg schema — including variants + /// nested inside a struct, list, or map. + fn collect_variant_leaves( + fields: &arrow_schema::Fields, + leaf_idx: &mut usize, + variant_parent: Option, + iceberg_schema: &Schema, + leaf_field_id_set: &HashSet, + out: &mut HashMap, + ) { + for field in fields { + Self::collect_variant_leaves_in_field( + field, + leaf_idx, + variant_parent, + iceberg_schema, + leaf_field_id_set, + out, + ); + } + } + + fn collect_variant_leaves_in_field( + field: &FieldRef, + leaf_idx: &mut usize, + variant_parent: Option, + iceberg_schema: &Schema, + leaf_field_id_set: &HashSet, + out: &mut HashMap, + ) { + // Once we are inside a variant, stay inside; otherwise check + // whether this Arrow field IS a variant column. + let effective_variant = variant_parent.or_else(|| { + let fid = field + .metadata() + .get(PARQUET_FIELD_ID_META_KEY) + .and_then(|s| i32::from_str(s).ok())?; + if !leaf_field_id_set.contains(&fid) { + return None; + } + let iceberg_field = iceberg_schema.field_by_id(fid)?; + matches!(iceberg_field.field_type.as_ref(), Type::Variant(_)).then_some(fid) + }); + + match field.data_type() { + DataType::Struct(sub) => { + Self::collect_variant_leaves( + sub, + leaf_idx, + effective_variant, + iceberg_schema, + leaf_field_id_set, + out, + ); + } + DataType::List(inner) + | DataType::LargeList(inner) + | DataType::FixedSizeList(inner, _) + | DataType::Map(inner, _) => { + Self::collect_variant_leaves_in_field( + inner, + leaf_idx, + effective_variant, + iceberg_schema, + leaf_field_id_set, + out, + ); + } + _ => { + if let Some(vid) = effective_variant { + out.insert(*leaf_idx, vid); + } + *leaf_idx += 1; + } } } @@ -148,8 +233,38 @@ impl ArrowReader { arrow_schema: &ArrowSchemaRef, type_promotion_is_valid: fn(Option<&PrimitiveType>, Option<&PrimitiveType>) -> bool, ) -> Result { - let mut column_map = HashMap::new(); + 
// Maps field_id → leaf column indices. Vec because variant contributes two + // leaves (metadata + value) under a single field ID. + let mut column_map: HashMap<i32, Vec<usize>> = HashMap::new(); let fields = arrow_schema.fields(); + // HashSet for O(1) membership checks instead of O(n) slice scans. + let leaf_field_id_set: HashSet<i32> = leaf_field_ids.iter().copied().collect(); + + // Variant fields are an Iceberg leaf type but a Parquet GROUP. Their + // sub-fields (metadata, value, and any shredded children) carry no + // embedded field IDs — only the parent group has the field ID. + // `filter_leaves` therefore never finds them via the standard field-ID + // scan below. + // + // Java's PruneColumns.variant() returns the variant group unchanged, so + // every leaf beneath it is projected as part of the variant column. We + // replicate that here with a recursive DFS over the Arrow schema whose + // leaf-numbering matches `Fields::filter_leaves`. For each Arrow leaf + // index that sits inside any variant (top-level OR nested inside a + // struct/list/map), we record the enclosing variant's field id. + let variant_leaves: HashMap<usize, i32> = { + let mut out = HashMap::new(); + let mut leaf_idx: usize = 0; + Self::collect_variant_leaves( + fields, + &mut leaf_idx, + None, + iceberg_schema_of_task, + &leaf_field_id_set, + &mut out, + ); + out + }; // Pre-project only the fields that have been selected, possibly avoiding converting // some Arrow types that are not yet supported. @@ -161,7 +276,7 @@ impl ArrowReader { .and_then(|field_id| i32::from_str(field_id).ok()) .is_some_and(|field_id| { projected_fields.insert((*f).clone(), field_id); - leaf_field_ids.contains(&field_id) + leaf_field_id_set.contains(&field_id) }) }), arrow_schema.metadata().clone(), @@ -169,6 +284,14 @@ let iceberg_schema = arrow_schema_to_schema(&projected_arrow_schema)?; fields.filter_leaves(|idx, field| { + // Variant sub-fields: parent group carries the field ID, not the leaf.
+ // Skip type-promotion check — Type::Variant is not a primitive type + // (matches Java's PruneColumns.variant() which returns the group unchanged). + if let Some(&variant_field_id) = variant_leaves.get(&idx) { + column_map.entry(variant_field_id).or_default().push(idx); + return true; + } + let Some(field_id) = projected_fields.get(field).cloned() else { return false; }; @@ -190,7 +313,7 @@ impl ArrowReader { return false; } - column_map.insert(field_id, idx); + column_map.entry(field_id).or_default().push(idx); true }); @@ -198,8 +321,8 @@ impl ArrowReader { // We only project existing columns; RecordBatchTransformer adds default/NULL values. let mut indices = vec![]; for field_id in leaf_field_ids { - if let Some(col_idx) = column_map.get(field_id) { - indices.push(*col_idx); + if let Some(col_indices) = column_map.get(field_id) { + indices.extend_from_slice(col_indices); } } @@ -436,7 +559,9 @@ mod tests { use crate::expr::{Bind, Reference}; use crate::io::FileIO; use crate::scan::{FileScanTask, FileScanTaskStream}; - use crate::spec::{DataFileFormat, Datum, NestedField, PrimitiveType, Schema, Type}; + use crate::spec::{ + DataFileFormat, Datum, NestedField, PrimitiveType, Schema, Type, VariantType, + }; #[test] fn test_arrow_projection_mask() { @@ -530,6 +655,219 @@ message schema { assert_eq!(mask, ProjectionMask::leaves(&parquet_schema, vec![0])); } + /// Variant fields are an Iceberg leaf type but a Parquet GROUP whose sub-fields carry + /// no embedded field IDs. The projection mask must include both metadata and value + /// leaves when the variant field ID is requested, and must not drop the variant when + /// it is projected alongside ordinary primitive columns. 
+ #[test] + fn test_arrow_projection_mask_variant() { + // Iceberg schema: c1 (String, id=1) + v (Variant, id=2) + let schema = Arc::new( + Schema::builder() + .with_schema_id(1) + .with_fields(vec![ + NestedField::required(1, "c1", Type::Primitive(PrimitiveType::String)).into(), + NestedField::required(2, "v", Type::Variant(VariantType)).into(), + ]) + .build() + .unwrap(), + ); + + // Arrow schema: c1 with field ID 1; v as Struct(metadata: Binary, value: Binary) + // with field ID 2 on the struct but NO field IDs on the sub-fields — that is the + // Parquet variant wire format. + let arrow_schema = Arc::new(ArrowSchema::new(vec![ + Field::new("c1", DataType::Utf8, false).with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "1".to_string(), + )])), + Field::new( + "v", + DataType::Struct(arrow_schema::Fields::from(vec![ + Arc::new(Field::new("metadata", DataType::Binary, false)), + Arc::new(Field::new("value", DataType::Binary, false)), + ])), + false, + ) + .with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "2".to_string(), + )])), + ])); + + // Parquet message: c1 is leaf 0; variant sub-fields metadata=leaf 1, value=leaf 2. + // No field IDs on sub-leaves — matching the real Iceberg/Spark-written variant format. + let message_type = " +message schema { + required binary c1 (STRING) = 1; + required group v = 2 { + required binary metadata; + required binary value; + } +} +"; + let parquet_type = parse_message_type(message_type).expect("should parse schema"); + let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_type)); + + // Both fields: all three leaves must be included. + let mask = ArrowReader::get_arrow_projection_mask( + &[1, 2], + &schema, + &parquet_schema, + &arrow_schema, + false, + ) + .expect("projection mask for c1 + v"); + assert_eq!(mask, ProjectionMask::leaves(&parquet_schema, vec![0, 1, 2])); + + // Variant only: leaves 1 (metadata) and 2 (value) must be included. 
+ let mask_variant_only = ArrowReader::get_arrow_projection_mask( + &[2], + &schema, + &parquet_schema, + &arrow_schema, + false, + ) + .expect("projection mask for v only"); + assert_eq!( + mask_variant_only, + ProjectionMask::leaves(&parquet_schema, vec![1, 2]), + ); + + // Primitive only: leaf 0 (c1) must be included; variant NOT included. + let mask_primitive_only = ArrowReader::get_arrow_projection_mask( + &[1], + &schema, + &parquet_schema, + &arrow_schema, + false, + ) + .expect("projection mask for c1 only"); + assert_eq!( + mask_primitive_only, + ProjectionMask::leaves(&parquet_schema, vec![0]), + ); + } + + /// variant nested inside a struct must also have its sub-leaves + /// included in the projection mask. + #[test] + fn test_arrow_projection_mask_variant_nested_in_struct() { + // Iceberg schema: parent struct (id=1) containing c2 String (id=2) and + // v Variant (id=3). + let schema = Arc::new( + Schema::builder() + .with_schema_id(1) + .with_fields(vec![ + NestedField::required( + 1, + "parent", + Type::Struct(crate::spec::StructType::new(vec![ + NestedField::required(2, "c2", Type::Primitive(PrimitiveType::String)) + .into(), + NestedField::required(3, "v", Type::Variant(VariantType)).into(), + ])), + ) + .into(), + ]) + .build() + .unwrap(), + ); + + // Arrow: parent struct (id=1) → [c2 (id=2), v struct(metadata,value) (id=3)]. + // Variant sub-fields intentionally without field IDs. 
+ let arrow_schema = Arc::new(ArrowSchema::new(vec![ + Field::new( + "parent", + DataType::Struct(arrow_schema::Fields::from(vec![ + Arc::new( + Field::new("c2", DataType::Utf8, false).with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "2".to_string(), + )])), + ), + Arc::new( + Field::new( + "v", + DataType::Struct(arrow_schema::Fields::from(vec![ + Arc::new(Field::new("metadata", DataType::Binary, false)), + Arc::new(Field::new("value", DataType::Binary, false)), + ])), + false, + ) + .with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "3".to_string(), + )])), + ), + ])), + false, + ) + .with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "1".to_string(), + )])), + ])); + + // Parquet: parent.c2 = leaf 0; parent.v.metadata = leaf 1; parent.v.value = leaf 2. + let message_type = " +message schema { + required group parent = 1 { + required binary c2 (STRING) = 2; + required group v = 3 { + required binary metadata; + required binary value; + } + } +} +"; + let parquet_type = parse_message_type(message_type).expect("should parse schema"); + let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_type)); + + // Projecting the nested variant must include both of its leaves. + let mask_variant = ArrowReader::get_arrow_projection_mask( + &[3], + &schema, + &parquet_schema, + &arrow_schema, + false, + ) + .expect("projection mask for nested variant"); + assert_eq!( + mask_variant, + ProjectionMask::leaves(&parquet_schema, vec![1, 2]), + "variant nested in a struct was dropped from projection" + ); + + // Projecting the sibling primitive must not include variant leaves. + let mask_primitive = ArrowReader::get_arrow_projection_mask( + &[2], + &schema, + &parquet_schema, + &arrow_schema, + false, + ) + .expect("projection mask for nested primitive"); + assert_eq!( + mask_primitive, + ProjectionMask::leaves(&parquet_schema, vec![0]), + ); + + // Projecting both must include all three leaves. 
+ let mask_both = ArrowReader::get_arrow_projection_mask( + &[2, 3], + &schema, + &parquet_schema, + &arrow_schema, + false, + ) + .expect("projection mask for nested primitive + variant"); + assert_eq!( + mask_both, + ProjectionMask::leaves(&parquet_schema, vec![0, 1, 2]), + ); + } + /// Test schema evolution: reading old Parquet file (with only column 'a') /// using a newer table schema (with columns 'a' and 'b'). /// This tests that: diff --git a/crates/iceberg/src/arrow/schema.rs b/crates/iceberg/src/arrow/schema.rs index 9b504421ae..2910deecf3 100644 --- a/crates/iceberg/src/arrow/schema.rs +++ b/crates/iceberg/src/arrow/schema.rs @@ -35,7 +35,7 @@ use crate::error::Result; use crate::spec::decimal_utils::i128_from_be_bytes; use crate::spec::{ Datum, FIRST_FIELD_ID, ListType, MapType, NestedField, NestedFieldRef, PrimitiveLiteral, - PrimitiveType, Schema, SchemaVisitor, StructType, Type, + PrimitiveType, Schema, SchemaVisitor, StructType, Type, VariantType, }; use crate::{Error, ErrorKind}; @@ -692,6 +692,16 @@ impl SchemaVisitor for ToArrowSchemaConverter { } } } + + fn variant(&mut self, _v: &VariantType) -> crate::Result { + // Variant is stored as a struct with two required binary fields (no field IDs on sub-fields). + // Uses Binary (not LargeBinary) matching the Parquet BINARY primitive directly. + let metadata_field = Field::new("metadata", DataType::Binary, false); + let value_field = Field::new("value", DataType::Binary, false); + Ok(ArrowSchemaOrFieldOrType::Type(DataType::Struct( + vec![metadata_field, value_field].into(), + ))) + } } /// Convert iceberg schema to an arrow schema. 
@@ -1705,6 +1715,15 @@ mod tests { simple_field("map", map, false, "16"), simple_field("struct", r#struct, false, "17"), simple_field("uuid", DataType::FixedSizeBinary(16), false, "30"), + simple_field( + "v", + DataType::Struct(Fields::from(vec![ + Field::new("metadata", DataType::Binary, false), + Field::new("value", DataType::Binary, false), + ])), + true, + "31", + ), ]) } @@ -1888,6 +1907,12 @@ mod tests { "name":"uuid", "required":true, "type":"uuid" + }, + { + "id":31, + "name":"v", + "required":false, + "type":"variant" } ], "identifier-field-ids":[] diff --git a/crates/iceberg/src/arrow/value.rs b/crates/iceberg/src/arrow/value.rs index d07233c420..c76233820f 100644 --- a/crates/iceberg/src/arrow/value.rs +++ b/crates/iceberg/src/arrow/value.rs @@ -30,7 +30,7 @@ use uuid::Uuid; use super::get_field_id_from_metadata; use crate::spec::{ ListType, Literal, Map, MapType, NestedField, PartnerAccessor, PrimitiveLiteral, PrimitiveType, - SchemaWithPartnerVisitor, Struct, StructType, Type, visit_struct_with_partner, + SchemaWithPartnerVisitor, Struct, StructType, Type, VariantType, visit_struct_with_partner, visit_type_with_partner, }; use crate::{Error, ErrorKind, Result}; @@ -426,6 +426,13 @@ impl SchemaWithPartnerVisitor for ArrowArrayToIcebergStructConverter { } } } + + fn variant(&mut self, _v: &VariantType, _partner: &ArrayRef) -> Result>> { + Err(Error::new( + ErrorKind::FeatureUnsupported, + "Converting variant Arrow array to Iceberg literal is not supported yet", + )) + } } /// Defines how Arrow fields are matched with Iceberg fields when converting data. 
diff --git a/crates/iceberg/src/avro/schema.rs b/crates/iceberg/src/avro/schema.rs index fdbc680977..fbb214fd7c 100644 --- a/crates/iceberg/src/avro/schema.rs +++ b/crates/iceberg/src/avro/schema.rs @@ -28,7 +28,7 @@ use serde_json::{Number, Value}; use crate::spec::{ ListType, MapType, NestedField, NestedFieldRef, PrimitiveType, Schema, SchemaVisitor, - StructType, Type, visit_schema, + StructType, Type, VariantType, visit_schema, }; use crate::{Error, ErrorKind, Result, ensure_data_valid}; @@ -37,11 +37,15 @@ const FIELD_ID_PROP: &str = "field-id"; const KEY_ID: &str = "key-id"; const VALUE_ID: &str = "value-id"; const MAP_LOGICAL_TYPE: &str = "map"; +const VARIANT_LOGICAL_TYPE: &str = "variant"; // This const may better to maintain in avro-rs. const LOGICAL_TYPE: &str = "logicalType"; struct SchemaToAvroSchema { schema: String, + // Stack of enclosing field ids, used to derive unique record names for + // structural types (e.g. variant) — mirrors Java TypeToSchema.fieldIds. + field_ids: Vec, } type AvroSchemaOrField = Either; @@ -49,6 +53,39 @@ type AvroSchemaOrField = Either; impl SchemaVisitor for SchemaToAvroSchema { type T = AvroSchemaOrField; + fn before_struct_field(&mut self, field: &NestedFieldRef) -> Result<()> { + self.field_ids.push(field.id); + Ok(()) + } + fn after_struct_field(&mut self, _field: &NestedFieldRef) -> Result<()> { + self.field_ids.pop(); + Ok(()) + } + fn before_list_element(&mut self, field: &NestedFieldRef) -> Result<()> { + self.field_ids.push(field.id); + Ok(()) + } + fn after_list_element(&mut self, _field: &NestedFieldRef) -> Result<()> { + self.field_ids.pop(); + Ok(()) + } + fn before_map_key(&mut self, field: &NestedFieldRef) -> Result<()> { + self.field_ids.push(field.id); + Ok(()) + } + fn after_map_key(&mut self, _field: &NestedFieldRef) -> Result<()> { + self.field_ids.pop(); + Ok(()) + } + fn before_map_value(&mut self, field: &NestedFieldRef) -> Result<()> { + self.field_ids.push(field.id); + Ok(()) + } + fn 
after_map_value(&mut self, _field: &NestedFieldRef) -> Result<()> { + self.field_ids.pop(); + Ok(()) + } + fn schema(&mut self, _schema: &Schema, value: AvroSchemaOrField) -> Result { let mut avro_schema = value.unwrap_left(); @@ -220,6 +257,46 @@ impl SchemaVisitor for SchemaToAvroSchema { } } + fn variant(&mut self, _v: &VariantType) -> Result { + let fields = vec![ + AvroRecordField { + name: "metadata".to_string(), + schema: AvroSchema::Bytes, + order: RecordFieldOrder::Ignore, + position: 0, + doc: None, + aliases: None, + default: None, + custom_attributes: Default::default(), + }, + AvroRecordField { + name: "value".to_string(), + schema: AvroSchema::Bytes, + order: RecordFieldOrder::Ignore, + position: 1, + doc: None, + aliases: None, + default: None, + custom_attributes: Default::default(), + }, + ]; + // Avro record names must be unique within a schema. Derive the name from the + // enclosing field id. + let record_name = match self.field_ids.last() { + Some(id) => format!("r{id}"), + // falling back to "variant" when no enclosing id is set. 
+ None => VARIANT_LOGICAL_TYPE.to_string(), + }; + let mut schema = avro_record_schema(&record_name, fields)?; + if let AvroSchema::Record(record) = &mut schema { + record.attributes.insert( + LOGICAL_TYPE.to_string(), + Value::String(VARIANT_LOGICAL_TYPE.to_string()), + ); + } + Ok(Either::Left(schema)) + } + fn primitive(&mut self, p: &PrimitiveType) -> Result { let avro_schema = match p { PrimitiveType::Boolean => AvroSchema::Boolean, @@ -249,6 +326,7 @@ impl SchemaVisitor for SchemaToAvroSchema { pub(crate) fn schema_to_avro_schema(name: impl ToString, schema: &Schema) -> Result { let mut converter = SchemaToAvroSchema { schema: name.to_string(), + field_ids: Vec::new(), }; visit_schema(schema, &mut converter).map(Either::unwrap_left) @@ -435,6 +513,13 @@ impl AvroSchemaVisitor for AvroSchemaToSchema { record: &RecordSchema, field_types: Vec>, ) -> Result> { + // A variant is encoded as a record with logicalType "variant" — return it directly + // rather than trying to build a struct from its metadata/value byte fields. 
+ if record.attributes.get(LOGICAL_TYPE).and_then(Value::as_str) == Some(VARIANT_LOGICAL_TYPE) + { + return Ok(Some(Type::Variant(VariantType))); + } + let mut fields = Vec::with_capacity(field_types.len()); for (avro_field, field_type) in record.fields.iter().zip_eq(field_types) { let field_id = @@ -614,7 +699,9 @@ mod tests { use super::*; use crate::avro::schema::AvroSchemaToSchema; - use crate::spec::{ListType, MapType, NestedField, PrimitiveType, Schema, StructType, Type}; + use crate::spec::{ + ListType, MapType, NestedField, PrimitiveType, Schema, StructType, Type, VariantType, + }; fn read_test_data_file_to_avro_schema(filename: &str) -> AvroSchema { let input = read_to_string(format!( @@ -1212,4 +1299,163 @@ mod tests { converter.primitive(&avro_schema).unwrap().unwrap() ); } + + /// Adapted from Java TestSchemaConversions.testVariantConversion + #[test] + fn test_variant_schema_conversion() { + let avro_schema = AvroSchema::parse_str( + r#" +{ + "type": "record", + "name": "test_schema", + "fields": [ + { + "name": "variantCol1", + "type": { + "type": "record", + "name": "r1", + "logicalType": "variant", + "fields": [ + {"name": "metadata", "type": "bytes"}, + {"name": "value", "type": "bytes"} + ] + }, + "field-id": 1 + }, + { + "name": "variantCol2", + "type": { + "type": "record", + "name": "r2", + "logicalType": "variant", + "fields": [ + {"name": "metadata", "type": "bytes"}, + {"name": "value", "type": "bytes"} + ] + }, + "field-id": 2 + } + ] +} + "#, + ) + .unwrap(); + + let iceberg_schema = Schema::builder() + .with_fields(vec![ + NestedField::required(1, "variantCol1", Type::Variant(VariantType)).into(), + NestedField::required(2, "variantCol2", Type::Variant(VariantType)).into(), + ]) + .build() + .unwrap(); + + check_schema_conversion(avro_schema, iceberg_schema); + } + + #[test] + fn test_optional_variant_schema_conversion() { + let avro_schema = AvroSchema::parse_str( + r#" +{ + "type": "record", + "name": "test_schema", + "fields": [ + { + 
"name": "v", + "type": [ + "null", + { + "type": "record", + "name": "r1", + "logicalType": "variant", + "fields": [ + {"name": "metadata", "type": "bytes"}, + {"name": "value", "type": "bytes"} + ] + } + ], + "default": null, + "field-id": 1 + } + ] +} + "#, + ) + .unwrap(); + + let iceberg_schema = Schema::builder() + .with_fields(vec![ + NestedField::optional(1, "v", Type::Variant(VariantType)).into(), + ]) + .build() + .unwrap(); + + check_schema_conversion(avro_schema, iceberg_schema); + } + + /// Regression: two variant columns nested inside a struct must produce unique + /// Avro record names (`r{field_id}`). + #[test] + fn test_multiple_variants_in_struct_have_unique_record_names() { + let iceberg_schema = Schema::builder() + .with_fields(vec![ + NestedField::required( + 1, + "nested", + Type::Struct(StructType::new(vec![ + NestedField::required(2, "v1", Type::Variant(VariantType)).into(), + NestedField::required(3, "v2", Type::Variant(VariantType)).into(), + ])), + ) + .into(), + ]) + .build() + .unwrap(); + + let avro_schema = schema_to_avro_schema("test", &iceberg_schema).unwrap(); + + // Collect every variant record's name from the resulting Avro schema + // and assert they are unique and match the enclosing field ids. 
+ let json = serde_json::to_value(&avro_schema).unwrap(); + fn walk(v: &Value, out: &mut Vec) { + match v { + Value::Object(map) => { + if map.get("logicalType").and_then(Value::as_str) == Some("variant") + && let Some(name) = map.get("name").and_then(Value::as_str) + { + out.push(name.to_string()); + } + for (_k, child) in map { + walk(child, out); + } + } + Value::Array(arr) => { + for child in arr { + walk(child, out); + } + } + _ => {} + } + } + let mut variant_record_names: Vec = Vec::new(); + walk(&json, &mut variant_record_names); + + assert_eq!( + variant_record_names.len(), + 2, + "expected two variant records, got {variant_record_names:?}" + ); + assert!( + variant_record_names.contains(&"r2".to_string()), + "expected variant record 'r2' (field id 2), got {variant_record_names:?}" + ); + assert!( + variant_record_names.contains(&"r3".to_string()), + "expected variant record 'r3' (field id 3), got {variant_record_names:?}" + ); + + // Round-trips back to the same Iceberg schema. + let roundtrip = avro_schema_to_schema(&avro_schema).unwrap(); + assert_eq!(iceberg_schema, roundtrip); + } } diff --git a/crates/iceberg/src/spec/datatypes.rs b/crates/iceberg/src/spec/datatypes.rs index 0379465584..01e84a7f66 100644 --- a/crates/iceberg/src/spec/datatypes.rs +++ b/crates/iceberg/src/spec/datatypes.rs @@ -32,8 +32,8 @@ use serde_json::Value as JsonValue; use super::values::Literal; use crate::ensure_data_valid; use crate::error::Result; -use crate::spec::PrimitiveLiteral; use crate::spec::datatypes::_decimal::{MAX_PRECISION, REQUIRED_LENGTH}; +use crate::spec::{FormatVersion, PrimitiveLiteral}; /// Field name for list type. pub const LIST_FIELD_NAME: &str = "element"; @@ -42,6 +42,11 @@ pub const MAP_KEY_FIELD_NAME: &str = "key"; /// Field name for map type's value. pub const MAP_VALUE_FIELD_NAME: &str = "value"; +/// Minimum format version required for nanosecond-precision timestamp types (v3). 
+pub const MIN_FORMAT_VERSION_TIMESTAMP_NS: FormatVersion = FormatVersion::V3; +/// Minimum format version required for the variant type (v3). +pub const MIN_FORMAT_VERSION_VARIANT: FormatVersion = FormatVersion::V3; + pub(crate) const MAX_DECIMAL_BYTES: u32 = 24; pub(crate) const MAX_DECIMAL_PRECISION: u32 = 38; @@ -90,6 +95,8 @@ pub enum Type { List(ListType), /// Map type Map(MapType), + /// Variant Type + Variant(VariantType), } impl fmt::Display for Type { @@ -99,6 +106,7 @@ impl fmt::Display for Type { Type::Struct(s) => write!(f, "{s}"), Type::List(_) => write!(f, "list"), Type::Map(_) => write!(f, "map"), + Type::Variant(_) => write!(f, "variant"), } } } @@ -122,6 +130,12 @@ impl Type { matches!(self, Type::Struct(_) | Type::List(_) | Type::Map(_)) } + /// Whether the type is variant type. + #[inline(always)] + pub fn is_variant(&self) -> bool { + matches!(self, Type::Variant(_)) + } + /// Convert Type to reference of PrimitiveType pub fn as_primitive_type(&self) -> Option<&PrimitiveType> { if let Type::Primitive(primitive_type) = self { @@ -178,6 +192,17 @@ impl Type { Type::Primitive(PrimitiveType::Float) | Type::Primitive(PrimitiveType::Double) ) } + + /// Returns the minimum format version required for the type. 
+ #[inline(always)] + pub fn min_format_version(&self) -> FormatVersion { + match self { + Type::Primitive(PrimitiveType::TimestampNs) + | Type::Primitive(PrimitiveType::TimestamptzNs) => MIN_FORMAT_VERSION_TIMESTAMP_NS, + Type::Variant(_) => MIN_FORMAT_VERSION_VARIANT, + _ => FormatVersion::V1, + } + } } impl From for Type { @@ -710,6 +735,7 @@ pub(super) mod _serde { use crate::spec::datatypes::Type::Map; use crate::spec::datatypes::{ ListType, MapType, NestedField, NestedFieldRef, PrimitiveType, StructType, Type, + VariantType, }; /// List type for serialization and deserialization @@ -737,6 +763,7 @@ pub(super) mod _serde { value: Cow<'a, Type>, }, Primitive(PrimitiveType), + Variant(VariantType), } impl From> for Type { @@ -775,6 +802,7 @@ pub(super) mod _serde { Self::Struct(StructType::new(fields.into_owned())) } SerdeType::Primitive(p) => Self::Primitive(p), + SerdeType::Variant(v) => Self::Variant(v), } } } @@ -801,6 +829,7 @@ pub(super) mod _serde { fields: Cow::Borrowed(&s.fields), }, Type::Primitive(p) => SerdeType::Primitive(p.clone()), + Type::Variant(v) => SerdeType::Variant(*v), } } } @@ -828,6 +857,42 @@ impl MapType { } } +/// Variant type - can hold semi-structured data of any type. +/// This is an Iceberg V3 feature. 
+#[derive(Debug, PartialEq, Eq, Clone, Copy, Hash)] +pub struct VariantType; + +impl fmt::Display for VariantType { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "variant") + } +} + +impl From<VariantType> for Type { + fn from(_: VariantType) -> Self { + Type::Variant(VariantType) + } +} + +impl Serialize for VariantType { + fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error> + where S: Serializer { + serializer.serialize_str("variant") + } +} + +impl<'de> Deserialize<'de> for VariantType { + fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error> + where D: Deserializer<'de> { + let s = String::deserialize(deserializer)?; + if s == "variant" { + Ok(VariantType) + } else { + Err(D::Error::custom(format!("expected 'variant', got '{s}'"))) + } + } +} + #[cfg(test)] mod tests { use pretty_assertions::assert_eq; @@ -1216,6 +1281,17 @@ mod tests { } } + #[test] + fn variant_type_serde() { + let json = r#"{"id": 1, "name": "v", "required": true, "type": "variant"}"#; + let field: NestedField = serde_json::from_str(json).unwrap(); + assert_eq!(*field.field_type, Type::Variant(VariantType)); + + let serialized = serde_json::to_string(&field).unwrap(); + let roundtrip: NestedField = serde_json::from_str(&serialized).unwrap(); + assert_eq!(field, roundtrip); + } + #[test] fn struct_type_with_type_field() { // Test that StructType properly deserializes JSON with "type":"struct" field diff --git a/crates/iceberg/src/spec/schema/id_reassigner.rs b/crates/iceberg/src/spec/schema/id_reassigner.rs index 5dbb370001..72caf30121 100644 --- a/crates/iceberg/src/spec/schema/id_reassigner.rs +++ b/crates/iceberg/src/spec/schema/id_reassigner.rs @@ -102,6 +102,7 @@ impl ReassignFieldIds { value_field: Arc::new(value_field), })) } + Type::Variant(v) => Ok(Type::Variant(v)), } } diff --git a/crates/iceberg/src/spec/schema/index.rs b/crates/iceberg/src/spec/schema/index.rs index d4e77ab2aa..f521b2d6c9 100644 --- a/crates/iceberg/src/spec/schema/index.rs +++ 
b/crates/iceberg/src/spec/schema/index.rs @@ -53,6 +53,10 @@ pub fn index_by_id(r#struct: &StructType) -> Result fn primitive(&mut self, _: &PrimitiveType) -> Result { Ok(()) } + + fn variant(&mut self, _v: &crate::spec::VariantType) -> Result { + Ok(()) + } } let mut index = IndexById(HashMap::new()); @@ -145,6 +149,10 @@ pub fn index_parents(r#struct: &StructType) -> Result> { fn primitive(&mut self, _p: &PrimitiveType) -> Result { Ok(()) } + + fn variant(&mut self, _v: &crate::spec::VariantType) -> Result { + Ok(()) + } } let mut index = IndexByParent { @@ -293,6 +301,10 @@ impl SchemaVisitor for IndexByName { fn primitive(&mut self, _p: &PrimitiveType) -> Result { Ok(()) } + + fn variant(&mut self, _v: &crate::spec::VariantType) -> Result { + Ok(()) + } } #[cfg(test)] diff --git a/crates/iceberg/src/spec/schema/mod.rs b/crates/iceberg/src/spec/schema/mod.rs index 13ad41818b..4c2daaf3f1 100644 --- a/crates/iceberg/src/spec/schema/mod.rs +++ b/crates/iceberg/src/spec/schema/mod.rs @@ -39,6 +39,7 @@ pub use self::prune_columns::prune_columns; use super::NestedField; use crate::error::Result; use crate::expr::accessor::StructAccessor; +use crate::spec::FormatVersion; use crate::spec::datatypes::{ LIST_FIELD_NAME, ListType, MAP_KEY_FIELD_NAME, MAP_VALUE_FIELD_NAME, MapType, NestedFieldRef, PrimitiveType, StructType, Type, @@ -419,6 +420,56 @@ impl Schema { pub fn field_id_to_fields(&self) -> &HashMap { &self.id_to_field } + + /// Returns the minimum [`FormatVersion`] required to represent all types in this schema. + /// + /// Iterates over every field and returns the highest minimum version among them, + /// defaulting to `FormatVersion::V1` if all types are universally supported. + pub fn min_format_version(&self) -> FormatVersion { + self.id_to_field + .values() + .map(|f| f.field_type.min_format_version()) + .max() + .unwrap_or(FormatVersion::V1) + } + + /// Check that all types in this schema are supported by the given format version. 
+ /// + /// Mirrors Java's `Schema.checkCompatibility()`. Returns an error listing every + /// incompatible field if any are found. + /// + /// Types with a minimum format version: + /// - `TimestampNs` / `TimestamptzNs` → v3+ + /// - `Variant` → v3+ + pub fn check_format_compatibility(&self, format_version: FormatVersion) -> Result<()> { + let mut problems: Vec = Vec::new(); + + for field in self.id_to_field.values() { + let min_version = field.field_type.min_format_version(); + + if format_version < min_version { + let name = self + .name_by_field_id(field.id) + .unwrap_or(field.name.as_str()); + problems.push(format!( + "Invalid type for {name}: {} is not supported until {min_version} but format version is {format_version}.", + field.field_type, + )); + } + } + + if problems.is_empty() { + Ok(()) + } else { + Err(Error::new( + ErrorKind::DataInvalid, + format!( + "Invalid schema for v{format_version}:\n- {}", + problems.join("\n- ") + ), + )) + } + } } impl Display for Schema { diff --git a/crates/iceberg/src/spec/schema/prune_columns.rs b/crates/iceberg/src/spec/schema/prune_columns.rs index 14f1bfd25f..06b16df35d 100644 --- a/crates/iceberg/src/spec/schema/prune_columns.rs +++ b/crates/iceberg/src/spec/schema/prune_columns.rs @@ -238,6 +238,10 @@ impl SchemaVisitor for PruneColumn { fn primitive(&mut self, _p: &PrimitiveType) -> Result> { Ok(None) } + + fn variant(&mut self, _v: &crate::spec::VariantType) -> Result { + Ok(None) + } } #[cfg(test)] diff --git a/crates/iceberg/src/spec/schema/visitor.rs b/crates/iceberg/src/spec/schema/visitor.rs index 50f7c04caa..3287ad7ee8 100644 --- a/crates/iceberg/src/spec/schema/visitor.rs +++ b/crates/iceberg/src/spec/schema/visitor.rs @@ -16,6 +16,7 @@ // under the License. use super::*; +use crate::spec::VariantType; /// A post order schema visitor. /// @@ -69,6 +70,9 @@ pub trait SchemaVisitor { fn map(&mut self, map: &MapType, key_value: Self::T, value: Self::T) -> Result; /// Called when see a primitive type. 
fn primitive(&mut self, p: &PrimitiveType) -> Result<Self::T>; + + /// Called when see a variant type. + fn variant(&mut self, _v: &VariantType) -> Result<Self::T>; } /// Visiting a type in post order. @@ -99,6 +103,7 @@ pub(crate) fn visit_type(r#type: &Type, visitor: &mut V) -> Re visitor.map(map, key_result, value_result) } Type::Struct(s) => visit_struct(s, visitor), + Type::Variant(v) => visitor.variant(v), } } @@ -185,6 +190,8 @@ pub trait SchemaWithPartnerVisitor

{ ) -> Result; /// Called when see a primitive type. fn primitive(&mut self, p: &PrimitiveType, partner: &P) -> Result; + /// Called when see a variant type. + fn variant(&mut self, _v: &VariantType, _partner: &P) -> Result; } /// Accessor used to get child partner from parent partner. @@ -242,6 +249,7 @@ pub(crate) fn visit_type_with_partner, A: Part visitor.map(map, partner, key_result, value_result) } Type::Struct(s) => visit_struct_with_partner(s, partner, visitor, accessor), + Type::Variant(v) => visitor.variant(v, partner), } } diff --git a/crates/iceberg/src/spec/table_metadata.rs b/crates/iceberg/src/spec/table_metadata.rs index 607fd98350..a21cd36fd3 100644 --- a/crates/iceberg/src/spec/table_metadata.rs +++ b/crates/iceberg/src/spec/table_metadata.rs @@ -535,6 +535,7 @@ impl TableMetadata { // Normalize location (remove trailing slash) self.location = self.location.trim_end_matches('/').to_string(); self.validate_snapshot_sequence_number()?; + self.validate_schema_format_compatibility()?; self.try_normalize_partition_spec()?; self.try_normalize_sort_order()?; Ok(self) @@ -749,6 +750,15 @@ impl TableMetadata { Ok(()) } + + /// Validates that every type used across all schemas is supported by the + /// table's format version. Delegates to [`Schema::check_format_compatibility`]. 
+ fn validate_schema_format_compatibility(&self) -> Result<()> { + for schema in self.schemas.values() { + schema.check_format_compatibility(self.format_version)?; + } + Ok(()) + } } pub(super) mod _serde { diff --git a/crates/iceberg/src/spec/values/literal.rs b/crates/iceberg/src/spec/values/literal.rs index e82fa197cd..1068c6b785 100644 --- a/crates/iceberg/src/spec/values/literal.rs +++ b/crates/iceberg/src/spec/values/literal.rs @@ -595,6 +595,10 @@ impl Literal { )) } } + Type::Variant(_) => Err(Error::new( + ErrorKind::DataInvalid, + "Variant type is not supported for single-value JSON serialization", + )), } } diff --git a/crates/iceberg/src/writer/file_writer/parquet_writer.rs b/crates/iceberg/src/writer/file_writer/parquet_writer.rs index 840d1a5f16..5cd9ebd138 100644 --- a/crates/iceberg/src/writer/file_writer/parquet_writer.rs +++ b/crates/iceberg/src/writer/file_writer/parquet_writer.rs @@ -40,7 +40,7 @@ use crate::io::{FileIO, FileWrite, OutputFile}; use crate::spec::{ DataContentType, DataFileBuilder, DataFileFormat, Datum, ListType, Literal, MapType, NestedFieldRef, PartitionSpec, PrimitiveType, Schema, SchemaRef, SchemaVisitor, Struct, - StructType, TableMetadata, Type, visit_schema, + StructType, TableMetadata, Type, VariantType, visit_schema, }; use crate::transform::create_transform_function; use crate::writer::{CurrentFileStatus, DataFile}; @@ -113,6 +113,22 @@ impl IndexByParquetPathName { pub fn get(&self, name: &str) -> Option<&i32> { self.name_to_id.get(name) } + + fn insert_current_path(&mut self) -> Result<()> { + let full_name = self.field_names.iter().map(String::as_str).join("."); + let field_id = self.field_id; + if let Some(existing_field_id) = self.name_to_id.get(full_name.as_str()) { + return Err(Error::new( + ErrorKind::DataInvalid, + format!( + "Invalid schema: multiple fields for name {full_name}: {field_id} and {existing_field_id}" + ), + )); + } else { + self.name_to_id.insert(full_name, field_id); + } + Ok(()) + } } impl 
Default for IndexByParquetPathName { @@ -191,20 +207,11 @@ impl SchemaVisitor for IndexByParquetPathName { } fn primitive(&mut self, _p: &PrimitiveType) -> Result { - let full_name = self.field_names.iter().map(String::as_str).join("."); - let field_id = self.field_id; - if let Some(existing_field_id) = self.name_to_id.get(full_name.as_str()) { - return Err(Error::new( - ErrorKind::DataInvalid, - format!( - "Invalid schema: multiple fields for name {full_name}: {field_id} and {existing_field_id}" - ), - )); - } else { - self.name_to_id.insert(full_name, field_id); - } + self.insert_current_path() + } - Ok(()) + fn variant(&mut self, _v: &VariantType) -> Result { + self.insert_current_path() } } diff --git a/crates/integration_tests/tests/read_variant.rs b/crates/integration_tests/tests/read_variant.rs new file mode 100644 index 0000000000..9d8801fb11 --- /dev/null +++ b/crates/integration_tests/tests/read_variant.rs @@ -0,0 +1,403 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Integration tests for variant type support. +//! +//! These tests require a running Docker environment seeded by `dev/spark/provision.py`. +//! The Spark 4.0 provisioner creates `rest.default.test_variant_column` with a +//! 
`VARIANT` column containing three rows of JSON data. + +use std::sync::Arc; + +use arrow_array::StructArray; +use arrow_array::cast::AsArray; +use arrow_schema::DataType; +use futures::TryStreamExt; +use iceberg::spec::Type; +use iceberg::{Catalog, CatalogBuilder, TableIdent}; +use iceberg_catalog_rest::{RestCatalog, RestCatalogBuilder}; +use iceberg_integration_tests::get_test_fixture; +use iceberg_storage_opendal::OpenDalStorageFactory; + +/// Build a `RestCatalog` against the docker-compose test fixture, wired with +/// the S3 storage factory the seeded tables expect. +async fn rest_catalog() -> RestCatalog { + let fixture = get_test_fixture(); + RestCatalogBuilder::default() + .with_storage_factory(Arc::new(OpenDalStorageFactory::S3 { + customized_credential_load: None, + })) + .load("rest", fixture.catalog_config.clone()) + .await + .unwrap() +} + +/// Asserts that `dt` is `Struct(metadata: Binary, value: Binary)` — the Parquet +/// physical layout of a variant column. +fn assert_variant_struct(dt: &DataType) { + let DataType::Struct(fields) = dt else { + panic!("expected variant to be DataType::Struct, got {dt:?}"); + }; + assert_eq!( + fields.len(), + 2, + "variant struct must have exactly 2 sub-fields" + ); + assert!( + fields + .iter() + .any(|f| f.name() == "metadata" && f.data_type() == &DataType::Binary), + "variant struct missing 'metadata: Binary' sub-field: {fields:?}" + ); + assert!( + fields + .iter() + .any(|f| f.name() == "value" && f.data_type() == &DataType::Binary), + "variant struct missing 'value: Binary' sub-field: {fields:?}" + ); +} + +/// Verifies that a table written by Spark with a VARIANT column has its schema +/// parsed into `Type::Variant` by the Rust iceberg implementation. 
+#[tokio::test] +async fn test_variant_schema_is_parsed() { + let rest_catalog = rest_catalog().await; + + let table = rest_catalog + .load_table(&TableIdent::from_strs(["default", "test_variant_column"]).unwrap()) + .await + .unwrap(); + + let schema = table.metadata().current_schema(); + let variant_field = schema + .field_by_name("v") + .expect("field 'v' not found in schema"); + + assert!( + matches!(variant_field.field_type.as_ref(), Type::Variant(_)), + "Expected Type::Variant for field 'v', got {:?}", + variant_field.field_type, + ); +} + +/// Verifies that scanning a table with a VARIANT column produces an Arrow batch +/// where the variant column is represented as `Struct(metadata: Binary, value: Binary)`, +/// matching the Parquet physical layout (§3.3 of the Parquet Variant spec). +#[tokio::test] +async fn test_variant_arrow_schema() { + let rest_catalog = rest_catalog().await; + + let table = rest_catalog + .load_table(&TableIdent::from_strs(["default", "test_variant_column"]).unwrap()) + .await + .unwrap(); + + let scan = table.scan().build().unwrap(); + let batch_stream = scan.to_arrow().await.unwrap(); + let batches: Vec<_> = batch_stream.try_collect().await.unwrap(); + + assert!(!batches.is_empty(), "expected at least one record batch"); + + // Variant column must be a struct with exactly two binary sub-fields + let v_col = batches[0] + .column_by_name("v") + .expect("column 'v' not found in batch"); + + let DataType::Struct(fields) = v_col.data_type() else { + panic!( + "Expected variant column to be DataType::Struct, got {:?}", + v_col.data_type() + ); + }; + + assert_eq!( + fields.len(), + 2, + "variant struct must have exactly 2 sub-fields" + ); + + let metadata_field = fields + .iter() + .find(|f| f.name() == "metadata") + .expect("sub-field 'metadata' not found"); + let value_field = fields + .iter() + .find(|f| f.name() == "value") + .expect("sub-field 'value' not found"); + + assert_eq!( + metadata_field.data_type(), + &DataType::Binary, + 
"'metadata' sub-field must be DataType::Binary" + ); + assert_eq!( + value_field.data_type(), + &DataType::Binary, + "'value' sub-field must be DataType::Binary" + ); +} + +/// Verifies that a variant column is NOT silently dropped when it is projected +/// alongside ordinary primitive columns (regression test for the projection bug +/// where variant sub-fields had no embedded Parquet field IDs and were therefore +/// excluded from `column_map`, causing the whole variant group to be omitted). +#[tokio::test] +async fn test_variant_projected_with_primitive_columns() { + let rest_catalog = rest_catalog().await; + + let table = rest_catalog + .load_table(&TableIdent::from_strs(["default", "test_variant_column"]).unwrap()) + .await + .unwrap(); + + // Explicitly select only the two columns — this exercises the projection path + // that was previously broken for variant types. + let scan = table.scan().select(["id", "v"]).build().unwrap(); + let batch_stream = scan.to_arrow().await.unwrap(); + let batches: Vec<_> = batch_stream.try_collect().await.unwrap(); + + assert!(!batches.is_empty(), "expected at least one record batch"); + + let first_batch = &batches[0]; + + // Both columns must be present — the variant must not be silently dropped. + assert!( + first_batch.column_by_name("id").is_some(), + "column 'id' not found in projected batch" + ); + let v_col = first_batch + .column_by_name("v") + .expect("column 'v' was silently dropped from projected scan — projection bug regression"); + + // The variant column must still be a struct with the expected sub-fields. 
+ let DataType::Struct(fields) = v_col.data_type() else { + panic!( + "Expected variant column to be DataType::Struct after projection, got {:?}", + v_col.data_type() + ); + }; + assert_eq!( + fields.len(), + 2, + "projected variant struct must have exactly 2 sub-fields" + ); + assert!( + fields.iter().any(|f| f.name() == "metadata"), + "projected variant struct must have a 'metadata' sub-field" + ); + assert!( + fields.iter().any(|f| f.name() == "value"), + "projected variant struct must have a 'value' sub-field" + ); + + // All three seeded rows must be readable. + let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum(); + assert_eq!(total_rows, 3, "expected exactly 3 rows"); +} + +/// Verifies that projecting ONLY a variant column (without any sibling +/// primitive) still includes both of its sub-leaves in the projection mask. +/// This exercises a different branch of the mask builder than +/// [`test_variant_projected_with_primitive_columns`], where `column_map` +/// contains only variant-derived entries. 
+#[tokio::test] +async fn test_variant_projected_alone() { + let rest_catalog = rest_catalog().await; + + let table = rest_catalog + .load_table(&TableIdent::from_strs(["default", "test_variant_column"]).unwrap()) + .await + .unwrap(); + + let scan = table.scan().select(["v"]).build().unwrap(); + let batches: Vec<_> = scan.to_arrow().await.unwrap().try_collect().await.unwrap(); + + assert!(!batches.is_empty(), "expected at least one record batch"); + let v_col = batches[0] + .column_by_name("v") + .expect("variant-only projection dropped column 'v'"); + assert_variant_struct(v_col.data_type()); + + let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum(); + assert_eq!(total_rows, 3); +} + +/// Verifies that a table with multiple top-level variant columns is readable +/// end-to-end — guards against the Avro record-name collision +#[tokio::test] +async fn test_multiple_variant_columns() { + let rest_catalog = rest_catalog().await; + + let table = rest_catalog + .load_table(&TableIdent::from_strs(["default", "test_variant_multi"]).unwrap()) + .await + .unwrap(); + + // Both variant fields must parse as Type::Variant in the Iceberg schema. + let schema = table.metadata().current_schema(); + for name in ["v1", "v2"] { + let f = schema + .field_by_name(name) + .unwrap_or_else(|| panic!("field '{name}' missing from schema")); + assert!( + matches!(f.field_type.as_ref(), Type::Variant(_)), + "field '{name}' is not Type::Variant: {:?}", + f.field_type + ); + } + + // Full scan returns both variant columns with the correct physical shape. 
+ let batches: Vec<_> = table + .scan() + .build() + .unwrap() + .to_arrow() + .await + .unwrap() + .try_collect() + .await + .unwrap(); + + assert!(!batches.is_empty()); + assert_variant_struct( + batches[0] + .column_by_name("v1") + .expect("missing v1") + .data_type(), + ); + assert_variant_struct( + batches[0] + .column_by_name("v2") + .expect("missing v2") + .data_type(), + ); + + let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum(); + assert_eq!(total_rows, 3); +} + +/// Full scan of a table where a variant lives inside a struct. Confirms that +/// the nested `payload: VARIANT` sub-field is materialized as +/// `Struct(metadata, value)` in the output. +#[tokio::test] +async fn test_nested_variant_full_scan() { + let rest_catalog = rest_catalog().await; + + let table = rest_catalog + .load_table(&TableIdent::from_strs(["default", "test_variant_nested"]).unwrap()) + .await + .unwrap(); + + let batches: Vec<_> = table + .scan() + .build() + .unwrap() + .to_arrow() + .await + .unwrap() + .try_collect() + .await + .unwrap(); + + assert!(!batches.is_empty()); + + let nested = batches[0] + .column_by_name("nested") + .expect("column 'nested' missing"); + let nested_struct: &StructArray = nested.as_struct(); + let payload = nested_struct + .column_by_name("payload") + .expect("inner 'payload' field missing from nested struct"); + assert_variant_struct(payload.data_type()); + + let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum(); + assert_eq!(total_rows, 3); +} + +/// Regression: projecting a struct column that contains a variant must keep +/// the inner variant's sub-leaves. 
+#[tokio::test] +async fn test_nested_variant_projected() { + let rest_catalog = rest_catalog().await; + + let table = rest_catalog + .load_table(&TableIdent::from_strs(["default", "test_variant_nested"]).unwrap()) + .await + .unwrap(); + + let batches: Vec<_> = table + .scan() + .select(["nested"]) + .build() + .unwrap() + .to_arrow() + .await + .unwrap() + .try_collect() + .await + .unwrap(); + + assert!(!batches.is_empty()); + let nested = batches[0] + .column_by_name("nested") + .expect("projected scan dropped column 'nested'"); + let nested_struct: &StructArray = nested.as_struct(); + let payload = nested_struct.column_by_name("payload").expect( + "nested.payload (variant) was dropped from the projected scan — \ + nested-variant projection bug regression", + ); + assert_variant_struct(payload.data_type()); + + let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum(); + assert_eq!(total_rows, 3); +} + +/// Projecting only a sibling primitive must NOT pull in the variant's leaves +/// from a neighbouring struct field. Guards against over-eager variant +/// inclusion in the projection mask. 
+#[tokio::test] +async fn test_nested_variant_sibling_projection() { + let rest_catalog = rest_catalog().await; + + let table = rest_catalog + .load_table(&TableIdent::from_strs(["default", "test_variant_nested"]).unwrap()) + .await + .unwrap(); + + let batches: Vec<_> = table + .scan() + .select(["id"]) + .build() + .unwrap() + .to_arrow() + .await + .unwrap() + .try_collect() + .await + .unwrap(); + + assert!(!batches.is_empty()); + let first = &batches[0]; + assert!(first.column_by_name("id").is_some(), "column 'id' missing"); + assert!( + first.column_by_name("nested").is_none(), + "projecting only 'id' must not pull in sibling 'nested' (variant leaked)" + ); + + let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum(); + assert_eq!(total_rows, 3); +} diff --git a/crates/integrations/datafusion/src/schema.rs b/crates/integrations/datafusion/src/schema.rs index 508aeb303b..af8932d734 100644 --- a/crates/integrations/datafusion/src/schema.rs +++ b/crates/integrations/datafusion/src/schema.rs @@ -29,6 +29,7 @@ use futures::StreamExt; use futures::future::try_join_all; use iceberg::arrow::arrow_schema_to_schema_auto_assign_ids; use iceberg::inspect::MetadataTableType; +use iceberg::spec::FormatVersion; use iceberg::{Catalog, Error, ErrorKind, NamespaceIdent, Result, TableCreation, TableIdent}; use crate::table::IcebergTableProvider; @@ -163,10 +164,14 @@ impl SchemaProvider for IcebergSchemaProvider { let iceberg_schema = arrow_schema_to_schema_auto_assign_ids(df_schema.as_ref()) .map_err(to_datafusion_error)?; + // Use at least V2, and upgrade to V3 if the schema requires it (e.g. timestamp_ns / variant). 
+ let format_version = iceberg_schema.min_format_version().max(FormatVersion::V2); + // Create the table in the Iceberg catalog let table_creation = TableCreation::builder() .name(name.clone()) .schema(iceberg_schema) + .format_version(format_version) .build(); let catalog = self.catalog.clone(); diff --git a/dev/spark/provision.py b/dev/spark/provision.py index 40f9ba0f38..cf1531b35f 100644 --- a/dev/spark/provision.py +++ b/dev/spark/provision.py @@ -129,6 +129,64 @@ spark.sql("ALTER TABLE rest.default.test_promote_partition_column ALTER COLUMN baz TYPE decimal(6, 2)") spark.sql("INSERT INTO rest.default.test_promote_partition_column VALUES (25, 22.25, 22.25)") +# Create a table with a variant column +spark.sql(""" +CREATE OR REPLACE TABLE rest.default.test_variant_column ( + id INT, + v VARIANT +) +USING iceberg +TBLPROPERTIES ('format-version'='3') +""") + +spark.sql(""" +INSERT INTO rest.default.test_variant_column +VALUES + (1, PARSE_JSON('{"a": 1, "b": "hello"}')), + (2, PARSE_JSON('[1, 2, 3]')), + (3, PARSE_JSON('42')) +""") + +# Table with TWO top-level variant columns — exercises the Avro +# record-name-collision path and multi-variant projection. +spark.sql(""" +CREATE OR REPLACE TABLE rest.default.test_variant_multi ( + id INT, + v1 VARIANT, + v2 VARIANT +) +USING iceberg +TBLPROPERTIES ('format-version'='3') +""") + +spark.sql(""" +INSERT INTO rest.default.test_variant_multi +VALUES + (1, PARSE_JSON('{"a": 1}'), PARSE_JSON('"x"')), + (2, PARSE_JSON('[1, 2, 3]'), PARSE_JSON('true')), + (3, PARSE_JSON('42'), PARSE_JSON('null')) +""") + +# Table with a VARIANT nested inside a struct — exercises projection of a +# variant that is not a top-level Arrow field (regression for the bug where +# nested variant sub-leaves were silently dropped from the projection mask). 
+spark.sql(""" +CREATE OR REPLACE TABLE rest.default.test_variant_nested ( + id INT, + nested STRUCT<name: STRING, payload: VARIANT> +) +USING iceberg +TBLPROPERTIES ('format-version'='3') +""") + +spark.sql(""" +INSERT INTO rest.default.test_variant_nested +VALUES + (1, named_struct('name', 'a', 'payload', PARSE_JSON('{"k": 1}'))), + (2, named_struct('name', 'b', 'payload', PARSE_JSON('[1, 2]'))), + (3, named_struct('name', 'c', 'payload', PARSE_JSON('42'))) +""") + # Create a table with various types spark.sql(""" CREATE OR REPLACE TABLE rest.default.types_test USING ICEBERG AS