Skip to content

Commit 4e6566f

Browse files
viirya and claude
committed
feat(datafusion): Add schema validation for partition projection
Implement schema validation in project_with_partition to ensure the input schema matches the Iceberg table schema before calculating partition values. This prevents subtle bugs from schema mismatches and provides clear error messages when schemas don't match. Changes: - Add helper functions to recursively strip metadata from Arrow schemas - Implement schema validation that compares input schema with expected Iceberg table schema, ignoring metadata differences - Add comprehensive tests for metadata stripping and schema validation - Closes #1752 The implementation follows the approach suggested in issue #1752: - Recursively visits schema and removes metadata from all fields - Compares cleaned schemas using Arrow's built-in equality operator - Returns helpful error messages showing both schemas on mismatch Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
1 parent 76cdf28 commit 4e6566f

1 file changed

Lines changed: 348 additions & 5 deletions

File tree

  • crates/integrations/datafusion/src/physical_plan

crates/integrations/datafusion/src/physical_plan/project.rs

Lines changed: 348 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,18 +20,80 @@
2020
use std::sync::Arc;
2121

2222
use datafusion::arrow::array::RecordBatch;
23-
use datafusion::arrow::datatypes::{DataType, Schema as ArrowSchema};
24-
use datafusion::common::Result as DFResult;
23+
use datafusion::arrow::datatypes::{DataType, Field, Fields, Schema as ArrowSchema};
24+
use datafusion::common::{DataFusionError, Result as DFResult};
2525
use datafusion::physical_expr::PhysicalExpr;
2626
use datafusion::physical_expr::expressions::Column;
2727
use datafusion::physical_plan::projection::ProjectionExec;
2828
use datafusion::physical_plan::{ColumnarValue, ExecutionPlan};
29-
use iceberg::arrow::{PROJECTED_PARTITION_VALUE_COLUMN, PartitionValueCalculator};
29+
use iceberg::arrow::{PROJECTED_PARTITION_VALUE_COLUMN, PartitionValueCalculator, schema_to_arrow_schema};
3030
use iceberg::spec::PartitionSpec;
3131
use iceberg::table::Table;
3232

3333
use crate::to_datafusion_error;
3434

35+
/// Recursively strips metadata from an Arrow schema and all its nested fields.
36+
///
37+
/// This function creates a new schema with all metadata removed from fields at every level,
38+
/// including nested struct fields. This is useful for schema comparison where metadata
39+
/// differences should be ignored.
40+
///
41+
/// # Arguments
42+
/// * `schema` - The Arrow schema to strip metadata from
43+
///
44+
/// # Returns
45+
/// A new Arrow schema with all metadata removed
46+
fn strip_metadata_from_schema(schema: &ArrowSchema) -> ArrowSchema {
47+
let fields: Fields = schema
48+
.fields()
49+
.iter()
50+
.map(|field| strip_metadata_from_field(field))
51+
.collect();
52+
ArrowSchema::new(fields)
53+
}
54+
55+
/// Recursively strips metadata from an Arrow field and its nested fields.
56+
///
57+
/// # Arguments
58+
/// * `field` - The Arrow field to strip metadata from
59+
///
60+
/// # Returns
61+
/// A new Arrow field with all metadata removed
62+
fn strip_metadata_from_field(field: &Field) -> Field {
63+
let data_type = strip_metadata_from_datatype(field.data_type());
64+
Field::new(field.name(), data_type, field.is_nullable())
65+
}
66+
67+
/// Recursively strips metadata from an Arrow data type.
68+
///
69+
/// For struct types, this function recursively processes all nested fields.
70+
/// For other types, it returns a clone of the type.
71+
///
72+
/// # Arguments
73+
/// * `data_type` - The Arrow data type to strip metadata from
74+
///
75+
/// # Returns
76+
/// A new Arrow data type with all metadata removed from nested structures
77+
fn strip_metadata_from_datatype(data_type: &DataType) -> DataType {
78+
match data_type {
79+
DataType::Struct(fields) => {
80+
let stripped_fields: Fields = fields
81+
.iter()
82+
.map(|field| strip_metadata_from_field(field))
83+
.collect();
84+
DataType::Struct(stripped_fields)
85+
}
86+
DataType::List(field) => DataType::List(Arc::new(strip_metadata_from_field(field))),
87+
DataType::LargeList(field) => {
88+
DataType::LargeList(Arc::new(strip_metadata_from_field(field)))
89+
}
90+
DataType::Map(field, sorted) => {
91+
DataType::Map(Arc::new(strip_metadata_from_field(field)), *sorted)
92+
}
93+
_ => data_type.clone(),
94+
}
95+
}
96+
3597
/// Extends an ExecutionPlan with partition value calculations for Iceberg tables.
3698
///
3799
/// This function takes an input ExecutionPlan and extends it with an additional column
@@ -58,8 +120,23 @@ pub fn project_with_partition(
58120
}
59121

60122
let input_schema = input.schema();
61-
// TODO: Validate that input_schema matches the Iceberg table schema.
62-
// See: https://github.com/apache/iceberg-rust/issues/1752
123+
124+
// Validate that input_schema matches the Iceberg table schema
125+
// Strip metadata from both schemas before comparison to ignore metadata differences
126+
let expected_arrow_schema =
127+
schema_to_arrow_schema(table_schema.as_ref()).map_err(to_datafusion_error)?;
128+
let input_schema_cleaned = strip_metadata_from_schema(&input_schema);
129+
let expected_schema_cleaned = strip_metadata_from_schema(&expected_arrow_schema);
130+
131+
if input_schema_cleaned != expected_schema_cleaned {
132+
return Err(DataFusionError::Plan(format!(
133+
"Input schema does not match Iceberg table schema.\n\
134+
Expected schema: {}\n\
135+
Input schema: {}",
136+
expected_schema_cleaned, input_schema_cleaned
137+
)));
138+
}
139+
63140
let calculator =
64141
PartitionValueCalculator::try_new(partition_spec.as_ref(), table_schema.as_ref())
65142
.map_err(to_datafusion_error)?;
@@ -377,4 +454,270 @@ mod tests {
377454
assert_eq!(city_partition.value(0), "New York");
378455
assert_eq!(city_partition.value(1), "Los Angeles");
379456
}
457+
458+
#[test]
459+
fn test_strip_metadata_from_simple_schema() {
460+
use std::collections::HashMap;
461+
462+
let mut metadata = HashMap::new();
463+
metadata.insert("key1".to_string(), "value1".to_string());
464+
465+
let field_with_metadata = Field::new("id", DataType::Int32, false).with_metadata(metadata);
466+
let schema = ArrowSchema::new(vec![field_with_metadata]);
467+
468+
let stripped = strip_metadata_from_schema(&schema);
469+
470+
assert_eq!(stripped.fields().len(), 1);
471+
assert_eq!(stripped.field(0).name(), "id");
472+
assert_eq!(*stripped.field(0).data_type(), DataType::Int32);
473+
assert!(stripped.field(0).metadata().is_empty());
474+
}
475+
476+
#[test]
477+
fn test_strip_metadata_from_nested_schema() {
478+
use std::collections::HashMap;
479+
480+
let mut metadata = HashMap::new();
481+
metadata.insert("key1".to_string(), "value1".to_string());
482+
483+
let nested_field_with_metadata =
484+
Field::new("city", DataType::Utf8, false).with_metadata(metadata.clone());
485+
let struct_fields = Fields::from(vec![
486+
Field::new("street", DataType::Utf8, false),
487+
nested_field_with_metadata,
488+
]);
489+
let struct_field =
490+
Field::new("address", DataType::Struct(struct_fields), false).with_metadata(metadata);
491+
492+
let schema = ArrowSchema::new(vec![
493+
Field::new("id", DataType::Int32, false),
494+
struct_field,
495+
]);
496+
497+
let stripped = strip_metadata_from_schema(&schema);
498+
499+
assert_eq!(stripped.fields().len(), 2);
500+
assert!(stripped.field(0).metadata().is_empty());
501+
assert!(stripped.field(1).metadata().is_empty());
502+
503+
if let DataType::Struct(fields) = stripped.field(1).data_type() {
504+
assert_eq!(fields.len(), 2);
505+
assert!(fields[0].metadata().is_empty());
506+
assert!(fields[1].metadata().is_empty());
507+
} else {
508+
panic!("Expected Struct data type");
509+
}
510+
}
511+
512+
#[test]
513+
fn test_strip_metadata_from_list_schema() {
514+
use std::collections::HashMap;
515+
516+
let mut metadata = HashMap::new();
517+
metadata.insert("key1".to_string(), "value1".to_string());
518+
519+
let list_field = Field::new("item", DataType::Int32, false).with_metadata(metadata.clone());
520+
let list_type = DataType::List(Arc::new(list_field));
521+
let field = Field::new("numbers", list_type, false).with_metadata(metadata);
522+
523+
let schema = ArrowSchema::new(vec![field]);
524+
let stripped = strip_metadata_from_schema(&schema);
525+
526+
assert_eq!(stripped.fields().len(), 1);
527+
assert!(stripped.field(0).metadata().is_empty());
528+
529+
if let DataType::List(inner_field) = stripped.field(0).data_type() {
530+
assert!(inner_field.metadata().is_empty());
531+
} else {
532+
panic!("Expected List data type");
533+
}
534+
}
535+
536+
#[test]
537+
fn test_schema_validation_matching_schemas() {
538+
use iceberg::TableIdent;
539+
use iceberg::io::FileIO;
540+
use iceberg::spec::{FormatVersion, NestedField, PrimitiveType, Schema, Type};
541+
542+
let table_schema = Arc::new(
543+
Schema::builder()
544+
.with_fields(vec![
545+
NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
546+
NestedField::required(2, "name", Type::Primitive(PrimitiveType::String))
547+
.into(),
548+
])
549+
.build()
550+
.unwrap(),
551+
);
552+
553+
let partition_spec = iceberg::spec::PartitionSpec::builder(table_schema.clone())
554+
.add_partition_field("id", "id_partition", Transform::Identity)
555+
.unwrap()
556+
.build()
557+
.unwrap();
558+
559+
let sort_order = iceberg::spec::SortOrder::builder()
560+
.build(&table_schema)
561+
.unwrap();
562+
563+
let table_metadata_builder = iceberg::spec::TableMetadataBuilder::new(
564+
(*table_schema).clone(),
565+
partition_spec,
566+
sort_order,
567+
"/test/table".to_string(),
568+
FormatVersion::V2,
569+
std::collections::HashMap::new(),
570+
)
571+
.unwrap();
572+
573+
let table_metadata = table_metadata_builder.build().unwrap();
574+
575+
// Create Arrow schema matching the table schema
576+
let arrow_schema = Arc::new(ArrowSchema::new(vec![
577+
Field::new("id", DataType::Int32, false),
578+
Field::new("name", DataType::Utf8, false),
579+
]));
580+
581+
let input = Arc::new(EmptyExec::new(arrow_schema));
582+
583+
let table = iceberg::table::Table::builder()
584+
.metadata(table_metadata.metadata)
585+
.identifier(TableIdent::from_strs(["test", "table"]).unwrap())
586+
.file_io(FileIO::from_path("/tmp").unwrap().build().unwrap())
587+
.metadata_location("/test/metadata.json".to_string())
588+
.build()
589+
.unwrap();
590+
591+
let result = project_with_partition(input, &table);
592+
assert!(result.is_ok(), "Schema validation should pass");
593+
}
594+
595+
#[test]
596+
fn test_schema_validation_mismatched_schemas() {
597+
use iceberg::TableIdent;
598+
use iceberg::io::FileIO;
599+
use iceberg::spec::{FormatVersion, NestedField, PrimitiveType, Schema, Type};
600+
601+
let table_schema = Arc::new(
602+
Schema::builder()
603+
.with_fields(vec![
604+
NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
605+
NestedField::required(2, "name", Type::Primitive(PrimitiveType::String))
606+
.into(),
607+
])
608+
.build()
609+
.unwrap(),
610+
);
611+
612+
let partition_spec = iceberg::spec::PartitionSpec::builder(table_schema.clone())
613+
.add_partition_field("id", "id_partition", Transform::Identity)
614+
.unwrap()
615+
.build()
616+
.unwrap();
617+
618+
let sort_order = iceberg::spec::SortOrder::builder()
619+
.build(&table_schema)
620+
.unwrap();
621+
622+
let table_metadata_builder = iceberg::spec::TableMetadataBuilder::new(
623+
(*table_schema).clone(),
624+
partition_spec,
625+
sort_order,
626+
"/test/table".to_string(),
627+
FormatVersion::V2,
628+
std::collections::HashMap::new(),
629+
)
630+
.unwrap();
631+
632+
let table_metadata = table_metadata_builder.build().unwrap();
633+
634+
// Create Arrow schema with different field name (mismatched)
635+
let arrow_schema = Arc::new(ArrowSchema::new(vec![
636+
Field::new("id", DataType::Int32, false),
637+
Field::new("different_name", DataType::Utf8, false), // Wrong field name
638+
]));
639+
640+
let input = Arc::new(EmptyExec::new(arrow_schema));
641+
642+
let table = iceberg::table::Table::builder()
643+
.metadata(table_metadata.metadata)
644+
.identifier(TableIdent::from_strs(["test", "table"]).unwrap())
645+
.file_io(FileIO::from_path("/tmp").unwrap().build().unwrap())
646+
.metadata_location("/test/metadata.json".to_string())
647+
.build()
648+
.unwrap();
649+
650+
let result = project_with_partition(input, &table);
651+
assert!(result.is_err(), "Schema validation should fail for mismatched schemas");
652+
assert!(result
653+
.unwrap_err()
654+
.to_string()
655+
.contains("Input schema does not match Iceberg table schema"));
656+
}
657+
658+
#[test]
659+
fn test_schema_validation_with_metadata_differences() {
660+
use iceberg::TableIdent;
661+
use iceberg::io::FileIO;
662+
use iceberg::spec::{FormatVersion, NestedField, PrimitiveType, Schema, Type};
663+
use std::collections::HashMap;
664+
665+
let table_schema = Arc::new(
666+
Schema::builder()
667+
.with_fields(vec![
668+
NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
669+
NestedField::required(2, "name", Type::Primitive(PrimitiveType::String))
670+
.into(),
671+
])
672+
.build()
673+
.unwrap(),
674+
);
675+
676+
let partition_spec = iceberg::spec::PartitionSpec::builder(table_schema.clone())
677+
.add_partition_field("id", "id_partition", Transform::Identity)
678+
.unwrap()
679+
.build()
680+
.unwrap();
681+
682+
let sort_order = iceberg::spec::SortOrder::builder()
683+
.build(&table_schema)
684+
.unwrap();
685+
686+
let table_metadata_builder = iceberg::spec::TableMetadataBuilder::new(
687+
(*table_schema).clone(),
688+
partition_spec,
689+
sort_order,
690+
"/test/table".to_string(),
691+
FormatVersion::V2,
692+
std::collections::HashMap::new(),
693+
)
694+
.unwrap();
695+
696+
let table_metadata = table_metadata_builder.build().unwrap();
697+
698+
// Create Arrow schema with metadata (should be ignored in comparison)
699+
let mut metadata = HashMap::new();
700+
metadata.insert("extra".to_string(), "metadata".to_string());
701+
702+
let arrow_schema = Arc::new(ArrowSchema::new(vec![
703+
Field::new("id", DataType::Int32, false).with_metadata(metadata.clone()),
704+
Field::new("name", DataType::Utf8, false).with_metadata(metadata),
705+
]));
706+
707+
let input = Arc::new(EmptyExec::new(arrow_schema));
708+
709+
let table = iceberg::table::Table::builder()
710+
.metadata(table_metadata.metadata)
711+
.identifier(TableIdent::from_strs(["test", "table"]).unwrap())
712+
.file_io(FileIO::from_path("/tmp").unwrap().build().unwrap())
713+
.metadata_location("/test/metadata.json".to_string())
714+
.build()
715+
.unwrap();
716+
717+
let result = project_with_partition(input, &table);
718+
assert!(
719+
result.is_ok(),
720+
"Schema validation should pass even with metadata differences"
721+
);
722+
}
380723
}

0 commit comments

Comments
 (0)