diff --git a/crates/floe-core/src/io/write/delta.rs b/crates/floe-core/src/io/write/delta.rs index 33bba39..e29a0e9 100644 --- a/crates/floe-core/src/io/write/delta.rs +++ b/crates/floe-core/src/io/write/delta.rs @@ -93,7 +93,7 @@ fn write_delta_table_with_metrics( ) } config::WriteMode::MergeScd1 => { - let (version, merge, perf) = scd1::execute_merge_scd1_with_runtime( + let (version, merge, schema_evolution, perf) = scd1::execute_merge_scd1_with_runtime( &runtime, df, target, @@ -105,18 +105,7 @@ fn write_delta_table_with_metrics( ( version, Some(merge), - crate::io::format::AcceptedSchemaEvolution { - enabled: false, - mode: entity - .schema - .resolved_schema_evolution() - .mode - .as_str() - .to_string(), - applied: false, - added_columns: Vec::new(), - incompatible_changes_detected: false, - }, + schema_evolution, AcceptedWritePerfBreakdown { conversion_ms: Some(perf.conversion_ms), source_df_build_ms: Some(perf.source_df_build_ms), @@ -127,7 +116,7 @@ fn write_delta_table_with_metrics( ) } config::WriteMode::MergeScd2 => { - let (version, merge, perf) = scd2::execute_merge_scd2_with_runtime( + let (version, merge, schema_evolution, perf) = scd2::execute_merge_scd2_with_runtime( &runtime, df, target, @@ -139,18 +128,7 @@ fn write_delta_table_with_metrics( ( version, Some(merge), - crate::io::format::AcceptedSchemaEvolution { - enabled: false, - mode: entity - .schema - .resolved_schema_evolution() - .mode - .as_str() - .to_string(), - applied: false, - added_columns: Vec::new(), - incompatible_changes_detected: false, - }, + schema_evolution, AcceptedWritePerfBreakdown { conversion_ms: Some(perf.conversion_ms), source_df_build_ms: Some(perf.source_df_build_ms), diff --git a/crates/floe-core/src/io/write/strategy/merge/mod.rs b/crates/floe-core/src/io/write/strategy/merge/mod.rs index 7bd656e..4e5d53c 100644 --- a/crates/floe-core/src/io/write/strategy/merge/mod.rs +++ b/crates/floe-core/src/io/write/strategy/merge/mod.rs @@ -26,13 +26,23 @@ pub(crate) trait 
MergeBackend { &self, source_df: &mut DataFrame, ctx: &MergeExecutionContext<'_>, - ) -> FloeResult<(i64, AcceptedMergeMetrics, shared::DeltaMergePerfBreakdown)>; + ) -> FloeResult<( + i64, + AcceptedMergeMetrics, + crate::io::format::AcceptedSchemaEvolution, + shared::DeltaMergePerfBreakdown, + )>; fn execute_scd2( &self, _source_df: &mut DataFrame, _ctx: &MergeExecutionContext<'_>, - ) -> FloeResult<(i64, AcceptedMergeMetrics, shared::DeltaMergePerfBreakdown)> { + ) -> FloeResult<( + i64, + AcceptedMergeMetrics, + crate::io::format::AcceptedSchemaEvolution, + shared::DeltaMergePerfBreakdown, + )> { Err(Box::new(RunError( "write_mode=merge_scd2 is not implemented for this backend".to_string(), ))) diff --git a/crates/floe-core/src/io/write/strategy/merge/scd1.rs b/crates/floe-core/src/io/write/strategy/merge/scd1.rs index a9888b4..cec1e00 100644 --- a/crates/floe-core/src/io/write/strategy/merge/scd1.rs +++ b/crates/floe-core/src/io/write/strategy/merge/scd1.rs @@ -1,11 +1,10 @@ -use deltalake::table::builder::DeltaTableBuilder; use polars::prelude::DataFrame; use std::collections::HashSet; use std::time::Instant; use crate::errors::RunError; use crate::io::format::AcceptedMergeMetrics; -use crate::io::storage::{object_store, Target}; +use crate::io::storage::Target; use crate::{config, FloeResult}; use super::{shared, MergeBackend, MergeExecutionContext}; @@ -20,7 +19,12 @@ pub(crate) fn execute_merge_scd1_with_runtime( entity: &config::EntityConfig, partition_by: Option>, target_file_size_bytes: Option, -) -> FloeResult<(i64, AcceptedMergeMetrics, shared::DeltaMergePerfBreakdown)> { +) -> FloeResult<( + i64, + AcceptedMergeMetrics, + crate::io::format::AcceptedSchemaEvolution, + shared::DeltaMergePerfBreakdown, +)> { let ctx = MergeExecutionContext { runtime, target, @@ -37,26 +41,19 @@ impl MergeBackend for DeltaMergeBackend { &self, source_df: &mut DataFrame, ctx: &MergeExecutionContext<'_>, - ) -> FloeResult<(i64, AcceptedMergeMetrics, 
shared::DeltaMergePerfBreakdown)> { + ) -> FloeResult<( + i64, + AcceptedMergeMetrics, + crate::io::format::AcceptedSchemaEvolution, + shared::DeltaMergePerfBreakdown, + )> { let merge_start = Instant::now(); let mut perf = shared::DeltaMergePerfBreakdown::default(); let merge_key = shared::resolve_merge_key(ctx.entity)?; // Merge-key uniqueness is enforced upstream by check::UniqueTracker for merge_scd1, // so the writer can stay focused on sink-level merge execution. - let store = object_store::delta_store_config(ctx.target, ctx.resolver, ctx.entity)?; - let table_url = store.table_url; - let storage_options = store.storage_options; - let builder = DeltaTableBuilder::from_url(table_url.clone()) - .map_err(|err| Box::new(RunError(format!("delta builder failed: {err}"))))? - .with_storage_options(storage_options.clone()); - let loaded_table = ctx - .runtime - .block_on(async move { builder.load().await }) - .map(Some) - .or_else(|err| match err { - deltalake::DeltaTableError::NotATable(_) => Ok(None), - other => Err(Box::new(RunError(format!("delta load failed: {other}")))), - })?; + let loaded_table = + shared::load_delta_table(ctx.runtime, ctx.target, ctx.resolver, ctx.entity)?; if loaded_table.is_none() { let outcome = shared::write_standard_delta_version_with_perf( ctx.runtime, @@ -82,20 +79,31 @@ impl MergeBackend for DeltaMergeBackend { target_rows_after: source_df.height() as u64, merge_elapsed_ms: merge_start.elapsed().as_millis() as u64, }, + outcome.schema_evolution, perf, )); } let table = loaded_table.expect("checked is_some"); + let conversion_start = Instant::now(); + let source_batch = shared::source_record_batch(source_df, ctx.entity)?; + perf.conversion_ms = conversion_start.elapsed().as_millis() as u64; + let schema_evolution = shared::plan_merge_delta_schema_evolution( + ctx.runtime, + &source_batch, + ctx.target, + ctx.resolver, + ctx.entity, + config::WriteMode::MergeScd1, + &[], + )?; let target_schema_columns = 
shared::delta_schema_columns(&table)?; shared::validate_merge_schema_compatibility( &target_schema_columns, source_df, &ctx.entity.name, + schema_evolution.merge_schema, )?; - let conversion_start = Instant::now(); - let source_batch = shared::source_record_batch(source_df, ctx.entity)?; - perf.conversion_ms = conversion_start.elapsed().as_millis() as u64; let source_df_build_start = Instant::now(); let source = shared::source_as_datafusion_df_from_batch(source_batch, &ctx.entity.name)?; perf.source_df_build_ms = source_df_build_start.elapsed().as_millis() as u64; @@ -119,7 +127,8 @@ impl MergeBackend for DeltaMergeBackend { let mut merge = table .merge(source, predicate) .with_source_alias("source") - .with_target_alias("target"); + .with_target_alias("target") + .with_merge_schema(schema_evolution.merge_schema); if !update_columns.is_empty() { let update_cols = update_columns.clone(); merge = merge.when_matched_update(|update| { @@ -155,6 +164,11 @@ impl MergeBackend for DeltaMergeBackend { &merge_metrics, merge_start.elapsed().as_millis() as u64, ); - Ok((version, accepted_merge_metrics, perf)) + Ok(( + version, + accepted_merge_metrics, + schema_evolution.summary, + perf, + )) } } diff --git a/crates/floe-core/src/io/write/strategy/merge/scd2.rs b/crates/floe-core/src/io/write/strategy/merge/scd2.rs index 95314db..79873e0 100644 --- a/crates/floe-core/src/io/write/strategy/merge/scd2.rs +++ b/crates/floe-core/src/io/write/strategy/merge/scd2.rs @@ -1,5 +1,4 @@ use deltalake::protocol::SaveMode; -use deltalake::table::builder::DeltaTableBuilder; use polars::prelude::{DataFrame, DataType, NamedFrom, Series, TimeUnit}; use std::collections::HashSet; use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; @@ -7,7 +6,7 @@ use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; use crate::checks::normalize; use crate::errors::RunError; use crate::io::format::AcceptedMergeMetrics; -use crate::io::storage::{object_store, Target}; +use 
crate::io::storage::Target; use crate::{config, FloeResult}; use super::{shared, MergeBackend, MergeExecutionContext}; @@ -22,7 +21,12 @@ pub(crate) fn execute_merge_scd2_with_runtime( entity: &config::EntityConfig, partition_by: Option>, target_file_size_bytes: Option, -) -> FloeResult<(i64, AcceptedMergeMetrics, shared::DeltaMergePerfBreakdown)> { +) -> FloeResult<( + i64, + AcceptedMergeMetrics, + crate::io::format::AcceptedSchemaEvolution, + shared::DeltaMergePerfBreakdown, +)> { let ctx = MergeExecutionContext { runtime, target, @@ -39,7 +43,12 @@ impl MergeBackend for DeltaMergeBackend { &self, _source_df: &mut DataFrame, _ctx: &MergeExecutionContext<'_>, - ) -> FloeResult<(i64, AcceptedMergeMetrics, shared::DeltaMergePerfBreakdown)> { + ) -> FloeResult<( + i64, + AcceptedMergeMetrics, + crate::io::format::AcceptedSchemaEvolution, + shared::DeltaMergePerfBreakdown, + )> { Err(Box::new(RunError( "write_mode=merge_scd1 is not implemented for scd2 backend".to_string(), ))) @@ -49,7 +58,12 @@ impl MergeBackend for DeltaMergeBackend { &self, source_df: &mut DataFrame, ctx: &MergeExecutionContext<'_>, - ) -> FloeResult<(i64, AcceptedMergeMetrics, shared::DeltaMergePerfBreakdown)> { + ) -> FloeResult<( + i64, + AcceptedMergeMetrics, + crate::io::format::AcceptedSchemaEvolution, + shared::DeltaMergePerfBreakdown, + )> { let merge_start = Instant::now(); let mut perf = shared::DeltaMergePerfBreakdown::default(); let merge_key = shared::resolve_merge_key(ctx.entity)?; @@ -70,20 +84,8 @@ impl MergeBackend for DeltaMergeBackend { let system_columns = shared::resolve_scd2_system_columns(ctx.entity); let merge_key_predicate = shared::merge_predicate_sql(&merge_key); - let store = object_store::delta_store_config(ctx.target, ctx.resolver, ctx.entity)?; - let table_url = store.table_url; - let storage_options = store.storage_options; - let builder = DeltaTableBuilder::from_url(table_url.clone()) - .map_err(|err| Box::new(RunError(format!("delta builder failed: {err}"))))? 
- .with_storage_options(storage_options.clone()); - let loaded_table = ctx - .runtime - .block_on(async move { builder.load().await }) - .map(Some) - .or_else(|err| match err { - deltalake::DeltaTableError::NotATable(_) => Ok(None), - other => Err(Box::new(RunError(format!("delta load failed: {other}")))), - })?; + let loaded_table = + shared::load_delta_table(ctx.runtime, ctx.target, ctx.resolver, ctx.entity)?; if loaded_table.is_none() { let mut bootstrap_df = source_df.clone(); @@ -122,11 +124,28 @@ impl MergeBackend for DeltaMergeBackend { target_rows_after: source_df.height() as u64, merge_elapsed_ms: merge_start.elapsed().as_millis() as u64, }, + shared::default_schema_evolution_summary(ctx.entity, config::WriteMode::MergeScd2), perf, )); } let table = loaded_table.expect("checked is_some"); + let conversion_start = Instant::now(); + let source_batch = shared::source_record_batch(source_df, ctx.entity)?; + perf.conversion_ms = conversion_start.elapsed().as_millis() as u64; + let schema_evolution = shared::plan_merge_delta_schema_evolution( + ctx.runtime, + &source_batch, + ctx.target, + ctx.resolver, + ctx.entity, + config::WriteMode::MergeScd2, + &[ + system_columns.is_current.as_str(), + system_columns.valid_from.as_str(), + system_columns.valid_to.as_str(), + ], + )?; let target_schema_columns = shared::delta_schema_columns(&table)?; shared::validate_scd2_schema_compatibility( &target_schema_columns, @@ -137,18 +156,35 @@ impl MergeBackend for DeltaMergeBackend { system_columns.valid_to.as_str(), ], &ctx.entity.name, + schema_evolution.merge_schema, )?; - - let conversion_start = Instant::now(); - let source_batch = shared::source_record_batch(source_df, ctx.entity)?; - perf.conversion_ms = conversion_start.elapsed().as_millis() as u64; + let added_compare_columns = compare_columns + .iter() + .filter(|column| schema_evolution.summary.added_columns.contains(column)) + .cloned() + .collect::>(); + let existing_compare_columns = compare_columns + .iter() + 
.filter(|column| !schema_evolution.summary.added_columns.contains(column)) + .cloned() + .collect::>(); + let mut source_with_system_columns = source_df.clone(); + append_scd2_system_columns(&mut source_with_system_columns, &system_columns)?; + let source_with_system_columns_schema = + build_scd2_bootstrap_schema_columns(ctx.entity, &system_columns)?; let source_df_build_start = Instant::now(); let source_for_close = shared::source_as_datafusion_df_from_batch(source_batch.clone(), &ctx.entity.name)?; - let source_for_insert = - shared::source_as_datafusion_df_from_batch(source_batch, &ctx.entity.name)?; + let source_for_insert = shared::source_as_datafusion_df_from_batch( + crate::io::write::delta::record_batch::dataframe_to_record_batch_with_schema( + &source_with_system_columns, + &source_with_system_columns_schema, + )?, + &ctx.entity.name, + )?; perf.source_df_build_ms = source_df_build_start.elapsed().as_millis() as u64; - let update_predicate = scd2_changed_predicate(&compare_columns); + let update_predicate = + scd2_changed_predicate(&existing_compare_columns, &added_compare_columns); let merge_key_predicate_for_close = merge_key_predicate.clone(); let merge_exec_start = Instant::now(); let is_current_column = system_columns.is_current.clone(); @@ -199,7 +235,8 @@ impl MergeBackend for DeltaMergeBackend { let mut merge = table_after_close .merge(source_for_insert, active_match_predicate) .with_source_alias("source") - .with_target_alias("target"); + .with_target_alias("target") + .with_merge_schema(schema_evolution.merge_schema); merge = merge.when_not_matched_insert(|insert| { let insert = source_columns.iter().fold(insert, |builder, column| { builder.set( @@ -252,6 +289,7 @@ impl MergeBackend for DeltaMergeBackend { target_rows_after, merge_elapsed_ms: merge_start.elapsed().as_millis() as u64, }, + schema_evolution.summary, perf, )) } @@ -357,11 +395,14 @@ fn now_timestamp_micros() -> i64 { .saturating_add(i64::from(duration.subsec_micros())) } -fn 
scd2_changed_predicate(compare_columns: &[String]) -> String { - if compare_columns.is_empty() { +fn scd2_changed_predicate( + existing_compare_columns: &[String], + added_compare_columns: &[String], +) -> String { + if existing_compare_columns.is_empty() && added_compare_columns.is_empty() { return "false".to_string(); } - compare_columns + existing_compare_columns .iter() .map(|column| { let target_col = shared::qualified_column("target", column); @@ -370,6 +411,10 @@ fn scd2_changed_predicate(compare_columns: &[String]) -> String { "(({target_col} <> {source_col}) OR ({target_col} IS NULL AND {source_col} IS NOT NULL) OR ({target_col} IS NOT NULL AND {source_col} IS NULL))" ) }) + .chain(added_compare_columns.iter().map(|column| { + let source_col = shared::qualified_column("source", column); + format!("{source_col} IS NOT NULL") + })) .collect::>() .join(" OR ") } diff --git a/crates/floe-core/src/io/write/strategy/merge/shared.rs b/crates/floe-core/src/io/write/strategy/merge/shared.rs index bb5fd93..14d964d 100644 --- a/crates/floe-core/src/io/write/strategy/merge/shared.rs +++ b/crates/floe-core/src/io/write/strategy/merge/shared.rs @@ -28,6 +28,13 @@ pub(crate) struct DeltaVersionWriteOutcome { pub(crate) schema_evolution: crate::io::format::AcceptedSchemaEvolution, } +#[derive(Clone)] +pub(crate) struct PlannedDeltaSchemaEvolution { + pub(crate) summary: crate::io::format::AcceptedSchemaEvolution, + pub(crate) write_schema_mode: Option, + pub(crate) merge_schema: bool, +} + #[derive(Debug, Default, Clone, Copy)] pub(crate) struct DeltaMergePerfBreakdown { pub(crate) conversion_ms: u64, @@ -43,6 +50,28 @@ pub(crate) struct Scd2SystemColumns { pub(crate) valid_to: String, } +pub(crate) fn default_schema_evolution_summary( + entity: &config::EntityConfig, + mode: config::WriteMode, +) -> crate::io::format::AcceptedSchemaEvolution { + let schema_evolution = entity.schema.resolved_schema_evolution(); + crate::io::format::AcceptedSchemaEvolution { + enabled: 
entity.sink.accepted.format == "delta" + && schema_evolution.mode == config::SchemaEvolutionMode::AddColumns + && matches!( + mode, + config::WriteMode::Append + | config::WriteMode::Overwrite + | config::WriteMode::MergeScd1 + | config::WriteMode::MergeScd2 + ), + mode: schema_evolution.mode.as_str().to_string(), + applied: false, + added_columns: Vec::new(), + incompatible_changes_detected: false, + } +} + pub(crate) fn write_standard_delta_version_with_perf( runtime: &tokio::runtime::Runtime, df: &mut DataFrame, @@ -81,60 +110,61 @@ pub(crate) fn write_standard_delta_version_with_perf( }) } -struct PlannedSchemaEvolution { - summary: crate::io::format::AcceptedSchemaEvolution, - write_schema_mode: Option, +fn plan_standard_delta_schema_evolution( + runtime: &tokio::runtime::Runtime, + batch: &RecordBatch, + target: &Target, + resolver: &config::StorageResolver, + entity: &config::EntityConfig, + mode: config::WriteMode, +) -> FloeResult { + plan_delta_schema_evolution(runtime, batch, target, resolver, entity, mode, &[]) } -fn plan_standard_delta_schema_evolution( +pub(crate) fn plan_merge_delta_schema_evolution( runtime: &tokio::runtime::Runtime, batch: &RecordBatch, target: &Target, resolver: &config::StorageResolver, entity: &config::EntityConfig, mode: config::WriteMode, -) -> FloeResult { - let schema_evolution = entity.schema.resolved_schema_evolution(); - let enabled = schema_evolution.mode == config::SchemaEvolutionMode::AddColumns - && matches!( - mode, - config::WriteMode::Append | config::WriteMode::Overwrite - ); - let mut summary = crate::io::format::AcceptedSchemaEvolution { - enabled, - mode: schema_evolution.mode.as_str().to_string(), - applied: false, - added_columns: Vec::new(), - incompatible_changes_detected: false, - }; - if !enabled { - return Ok(PlannedSchemaEvolution { + ignored_target_columns: &[&str], +) -> FloeResult { + plan_delta_schema_evolution( + runtime, + batch, + target, + resolver, + entity, + mode, + ignored_target_columns, + 
) +} + +fn plan_delta_schema_evolution( + runtime: &tokio::runtime::Runtime, + batch: &RecordBatch, + target: &Target, + resolver: &config::StorageResolver, + entity: &config::EntityConfig, + mode: config::WriteMode, + ignored_target_columns: &[&str], +) -> FloeResult { + let mut summary = default_schema_evolution_summary(entity, mode); + if !summary.enabled { + return Ok(PlannedDeltaSchemaEvolution { summary, write_schema_mode: None, + merge_schema: false, }); } - let store = object_store::delta_store_config(target, resolver, entity)?; - let table_url = store.table_url; - let storage_options = store.storage_options; - let builder = DeltaTableBuilder::from_url(table_url.clone()) - .map_err(|err| Box::new(RunError(format!("delta builder failed: {err}"))))? - .with_storage_options(storage_options.clone()); - - let maybe_table = runtime - .block_on(async move { - match builder.load().await { - Ok(table) => Ok(Some(table)), - Err(deltalake::DeltaTableError::NotATable(_)) => Ok(None), - Err(err) => Err(err), - } - }) - .map_err(|err| Box::new(RunError(format!("delta schema load failed: {err}"))))?; - + let maybe_table = load_delta_table(runtime, target, resolver, entity)?; let Some(table) = maybe_table else { - return Ok(PlannedSchemaEvolution { + return Ok(PlannedDeltaSchemaEvolution { summary, write_schema_mode: None, + merge_schema: false, }); }; @@ -145,8 +175,13 @@ fn plan_standard_delta_schema_evolution( let target_fields = target_schema.fields(); let source_schema = batch.schema(); let source_fields = source_schema.fields(); + let ignored_target_columns = ignored_target_columns + .iter() + .copied() + .collect::>(); let added_columns = additive_columns(target_fields, source_fields); - let incompatible_changes = incompatible_schema_changes(target_fields, source_fields); + let incompatible_changes = + incompatible_schema_changes(target_fields, source_fields, &ignored_target_columns); summary.added_columns = added_columns; summary.incompatible_changes_detected = 
!incompatible_changes.is_empty(); @@ -159,10 +194,31 @@ fn plan_standard_delta_schema_evolution( )))); } + if matches!( + mode, + config::WriteMode::MergeScd1 | config::WriteMode::MergeScd2 + ) { + let merge_key = resolve_merge_key(entity)?; + let added_merge_key_columns = merge_key + .iter() + .filter(|column| summary.added_columns.iter().any(|added| added == *column)) + .cloned() + .collect::>(); + if !added_merge_key_columns.is_empty() { + summary.incompatible_changes_detected = true; + return Err(Box::new(RunError(format!( + "entity.name={} delta schema evolution failed: merge key columns cannot be added: {}", + entity.name, + added_merge_key_columns.join(", ") + )))); + } + } + if summary.added_columns.is_empty() { - return Ok(PlannedSchemaEvolution { + return Ok(PlannedDeltaSchemaEvolution { summary, write_schema_mode: None, + merge_schema: false, }); } @@ -175,9 +231,17 @@ fn plan_standard_delta_schema_evolution( } summary.applied = true; - Ok(PlannedSchemaEvolution { + Ok(PlannedDeltaSchemaEvolution { + write_schema_mode: matches!( + mode, + config::WriteMode::Append | config::WriteMode::Overwrite + ) + .then_some(SchemaMode::Merge), + merge_schema: matches!( + mode, + config::WriteMode::MergeScd1 | config::WriteMode::MergeScd2 + ), summary, - write_schema_mode: Some(SchemaMode::Merge), }) } @@ -196,6 +260,7 @@ fn additive_columns(target_fields: &[FieldRef], source_fields: &[FieldRef]) -> V fn incompatible_schema_changes( target_fields: &[FieldRef], source_fields: &[FieldRef], + ignored_target_columns: &HashSet<&str>, ) -> Vec { let source_by_name = source_fields .iter() @@ -203,6 +268,9 @@ fn incompatible_schema_changes( .collect::>(); let mut incompatible = Vec::new(); for target_field in target_fields { + if ignored_target_columns.contains(target_field.name().as_str()) { + continue; + } let Some(source_field) = source_by_name.get(target_field.name()) else { incompatible.push(format!("missing existing column {}", target_field.name())); continue; @@ -225,6 
+293,30 @@ fn incompatible_schema_changes( incompatible } +pub(crate) fn load_delta_table( + runtime: &tokio::runtime::Runtime, + target: &Target, + resolver: &config::StorageResolver, + entity: &config::EntityConfig, +) -> FloeResult> { + let store = object_store::delta_store_config(target, resolver, entity)?; + let table_url = store.table_url; + let storage_options = store.storage_options; + let builder = DeltaTableBuilder::from_url(table_url.clone()) + .map_err(|err| Box::new(RunError(format!("delta builder failed: {err}"))))? + .with_storage_options(storage_options.clone()); + + runtime + .block_on(async move { + match builder.load().await { + Ok(table) => Ok(Some(table)), + Err(deltalake::DeltaTableError::NotATable(_)) => Ok(None), + Err(err) => Err(err), + } + }) + .map_err(|err| Box::new(RunError(format!("delta schema load failed: {err}"))).into()) +} + #[allow(clippy::too_many_arguments)] pub(crate) fn write_delta_batch_version( runtime: &tokio::runtime::Runtime, @@ -417,6 +509,7 @@ pub(crate) fn validate_merge_schema_compatibility( target_schema_columns: &[String], source_df: &DataFrame, entity_name: &str, + allow_target_additive_evolution: bool, ) -> FloeResult<()> { let source_columns = source_df .get_column_names() @@ -439,6 +532,9 @@ pub(crate) fn validate_merge_schema_compatibility( .collect::>(); for source_column in source_columns { if !target_columns.contains(source_column) { + if allow_target_additive_evolution { + continue; + } return Err(Box::new(RunError(format!( "entity.name={} delta merge failed: target schema missing source column {}", entity_name, source_column @@ -453,6 +549,7 @@ pub(crate) fn validate_scd2_schema_compatibility( source_df: &DataFrame, system_columns: &[&str], entity_name: &str, + allow_target_additive_evolution: bool, ) -> FloeResult<()> { let source_columns = source_df .get_column_names() @@ -466,6 +563,9 @@ pub(crate) fn validate_scd2_schema_compatibility( .collect::>(); for source_column in &source_columns { if 
!target_columns.contains(source_column) { + if allow_target_additive_evolution { + continue; + } return Err(Box::new(RunError(format!( "entity.name={} delta merge_scd2 failed: target schema missing source column {}", entity_name, source_column diff --git a/crates/floe-core/src/run/entity/accepted_write.rs b/crates/floe-core/src/run/entity/accepted_write.rs index 900672d..c6bb8ff 100644 --- a/crates/floe-core/src/run/entity/accepted_write.rs +++ b/crates/floe-core/src/run/entity/accepted_write.rs @@ -1,7 +1,7 @@ use std::path::Path; use std::time::Instant; -use polars::prelude::DataFrame; +use polars::prelude::{DataFrame, Series}; use crate::errors::RunError; use crate::{config, io, report, FloeResult}; @@ -41,20 +41,11 @@ pub(super) struct AcceptedWriteReportState { impl AcceptedWriteReportState { pub(super) fn for_entity(entity: &config::EntityConfig, write_mode: config::WriteMode) -> Self { - let schema_evolution = entity.schema.resolved_schema_evolution(); Self { - schema_evolution: io::format::AcceptedSchemaEvolution { - enabled: entity.sink.accepted.format == "delta" - && schema_evolution.mode == config::SchemaEvolutionMode::AddColumns - && matches!( - write_mode, - config::WriteMode::Append | config::WriteMode::Overwrite - ), - mode: schema_evolution.mode.as_str().to_string(), - applied: false, - added_columns: Vec::new(), - incompatible_changes_detected: false, - }, + schema_evolution: + crate::io::write::strategy::merge::shared::default_schema_evolution_summary( + entity, write_mode, + ), ..Self::default() } } @@ -139,15 +130,20 @@ pub(super) fn run_accepted_write_phase( } = context; let mut accepted_write_report = AcceptedWriteReportState::for_entity(entity, write_mode); - if accepted_accum.is_empty() { + if accepted_accum.is_empty() && write_mode != config::WriteMode::Overwrite { return Ok(accepted_write_report); } - let concat_start = perf_enabled.then(Instant::now); - let mut accepted_df = concat_accepted_frames(accepted_accum)?; - if let Some(start) = 
concat_start { - phase_timings.concat_accepted_ms += start.elapsed().as_millis() as u64; - } + let mut accepted_df = if accepted_accum.is_empty() { + empty_accepted_frame(entity)? + } else { + let concat_start = perf_enabled.then(Instant::now); + let accepted_df = concat_accepted_frames(accepted_accum)?; + if let Some(start) = concat_start { + phase_timings.concat_accepted_ms += start.elapsed().as_millis() as u64; + } + accepted_df + }; let output_stem = io::storage::paths::build_part_stem(0); let accepted_adapter = runtime.accepted_sink_adapter(entity.sink.accepted.format.as_str())?; @@ -210,3 +206,24 @@ fn concat_accepted_frames(mut frames: Vec) -> FloeResult { .pop() .ok_or_else(|| Box::new(RunError("missing accepted dataframe".to_string())).into()) } + +fn empty_accepted_frame(entity: &config::EntityConfig) -> FloeResult { + let normalize_strategy = crate::checks::normalize::resolve_normalize_strategy(entity)?; + let columns = crate::checks::normalize::resolve_output_columns( + &entity.schema.columns, + normalize_strategy.as_deref(), + ); + let series = columns + .into_iter() + .map(|column| { + let dtype = config::parse_data_type(&column.column_type)?; + Ok(Series::full_null(column.name.into(), 0, &dtype).into()) + }) + .collect::>>()?; + DataFrame::new(series).map_err(|err| { + Box::new(RunError(format!( + "failed to build empty accepted dataframe: {err}" + ))) + .into() + }) +} diff --git a/crates/floe-core/tests/integration/delta_run.rs b/crates/floe-core/tests/integration/delta_run.rs index eb24438..ff21cac 100644 --- a/crates/floe-core/tests/integration/delta_run.rs +++ b/crates/floe-core/tests/integration/delta_run.rs @@ -2,9 +2,10 @@ use std::fs; use std::path::{Path, PathBuf}; use std::sync::{Arc, Mutex, OnceLock}; +use deltalake::datafusion::datasource::TableProvider; use deltalake::table::builder::DeltaTableBuilder; use floe_core::{run, set_observer, validate, RunEvent, RunObserver, RunOptions, ValidateOptions}; -use polars::prelude::DataFrame; +use 
polars::prelude::{DataFrame, DataType, Series}; use url::Url; #[derive(Default)] @@ -101,23 +102,55 @@ fn read_local_delta_table(path: &Path) -> DataFrame { let table = runtime .block_on(async move { builder.load().await }) .expect("load delta table"); - let mut merged = DataFrame::default(); - for file_uri in table + let ordered_columns = table + .schema() + .fields() + .into_iter() + .map(|field| field.name().clone()) + .collect::>(); + let mut frames = table .get_file_uris() .expect("list table uris") - .collect::>() - { - let path = Url::parse(&file_uri) - .ok() - .and_then(|url| url.to_file_path().ok()) - .unwrap_or_else(|| PathBuf::from(&file_uri)); - let frame = floe_core::io::read::parquet::read_parquet_lazy(&path, None) - .expect("read delta parquet file"); + .map(|file_uri| { + let path = Url::parse(&file_uri) + .ok() + .and_then(|url| url.to_file_path().ok()) + .unwrap_or_else(|| PathBuf::from(&file_uri)); + floe_core::io::read::parquet::read_parquet_lazy(&path, None) + .expect("read delta parquet file") + }) + .collect::>(); + let mut column_dtypes = std::collections::HashMap::::new(); + for frame in &frames { + for series in frame.materialized_column_iter() { + column_dtypes + .entry(series.name().as_str().to_string()) + .or_insert_with(|| series.dtype().clone()); + } + } + let mut merged = DataFrame::default(); + for frame in &mut frames { + for column in &ordered_columns { + if frame.column(column).is_ok() { + continue; + } + let dtype = column_dtypes.get(column).cloned().unwrap_or(DataType::Null); + frame + .with_column(Series::full_null( + column.clone().into(), + frame.height(), + &dtype, + )) + .expect("append missing delta column"); + } + let aligned = frame + .select(ordered_columns.iter().map(String::as_str)) + .expect("align delta columns"); if merged.height() == 0 && merged.width() == 0 { - merged = frame; + merged = aligned; } else { merged - .vstack_mut(&frame) + .vstack_mut(&aligned) .expect("append delta parquet rows"); } } @@ -473,6 
+506,164 @@ entities: assert!(!report.schema_evolution.incompatible_changes_detected); } +#[test] +fn local_delta_overwrite_clears_existing_rows_when_all_rows_are_rejected() { + let temp_dir = tempfile::TempDir::new().expect("temp dir"); + let root = temp_dir.path(); + let input_dir = root.join("in"); + let accepted_dir = root.join("out/accepted/customer_delta"); + let rejected_dir = root.join("out/rejected/customer_csv"); + let report_dir = root.join("report"); + + fs::create_dir_all(&input_dir).expect("create input dir"); + write_csv(&input_dir, "batch1.csv", "id;name\n1;alice\n2;bob\n"); + + let yaml = format!( + r#"version: "0.2" +report: + path: "{report_dir}" +entities: + - name: "customer" + source: + format: "csv" + path: "{input_dir}" + sink: + write_mode: "overwrite" + accepted: + format: "delta" + path: "{accepted_dir}" + rejected: + format: "csv" + path: "{rejected_dir}" + policy: + severity: "reject" + schema: + columns: + - name: "id" + type: "int64" + - name: "name" + type: "string" +"#, + report_dir = report_dir.display(), + input_dir = input_dir.display(), + accepted_dir = accepted_dir.display(), + rejected_dir = rejected_dir.display(), + ); + let config_path = write_config(root, &yaml); + + run( + &config_path, + RunOptions { + run_id: Some("it-delta-overwrite-empty-init".to_string()), + entities: Vec::new(), + dry_run: false, + }, + ) + .expect("initial overwrite run"); + + fs::remove_file(input_dir.join("batch1.csv")).expect("remove batch1"); + write_csv(&input_dir, "batch2.csv", "id;name\nx;carol\ny;dave\n"); + + let outcome = run( + &config_path, + RunOptions { + run_id: Some("it-delta-overwrite-empty-second".to_string()), + entities: Vec::new(), + dry_run: false, + }, + ) + .expect("overwrite run with zero accepted rows"); + + let report = &outcome.entity_outcomes[0].report; + assert_eq!(report.results.accepted_total, 0); + assert_eq!(report.results.rejected_total, 2); + assert_eq!( + report.accepted_output.write_mode.as_deref(), + 
Some("overwrite") + ); + assert!(report.accepted_output.table_version.is_some()); + + let df = read_local_delta_table(&accepted_dir); + assert_eq!(df.height(), 0); +} + +#[test] +fn local_delta_append_does_not_clear_existing_rows_when_all_rows_are_rejected() { + let temp_dir = tempfile::TempDir::new().expect("temp dir"); + let root = temp_dir.path(); + let input_dir = root.join("in"); + let accepted_dir = root.join("out/accepted/customer_delta"); + let rejected_dir = root.join("out/rejected/customer_csv"); + let report_dir = root.join("report"); + + fs::create_dir_all(&input_dir).expect("create input dir"); + write_csv(&input_dir, "batch1.csv", "id;name\n1;alice\n2;bob\n"); + + let yaml = format!( + r#"version: "0.2" +report: + path: "{report_dir}" +entities: + - name: "customer" + source: + format: "csv" + path: "{input_dir}" + sink: + write_mode: "append" + accepted: + format: "delta" + path: "{accepted_dir}" + rejected: + format: "csv" + path: "{rejected_dir}" + policy: + severity: "reject" + schema: + columns: + - name: "id" + type: "int64" + - name: "name" + type: "string" +"#, + report_dir = report_dir.display(), + input_dir = input_dir.display(), + accepted_dir = accepted_dir.display(), + rejected_dir = rejected_dir.display(), + ); + let config_path = write_config(root, &yaml); + + run( + &config_path, + RunOptions { + run_id: Some("it-delta-append-empty-init".to_string()), + entities: Vec::new(), + dry_run: false, + }, + ) + .expect("initial append run"); + + fs::remove_file(input_dir.join("batch1.csv")).expect("remove batch1"); + write_csv(&input_dir, "batch2.csv", "id;name\nx;carol\ny;dave\n"); + + let outcome = run( + &config_path, + RunOptions { + run_id: Some("it-delta-append-empty-second".to_string()), + entities: Vec::new(), + dry_run: false, + }, + ) + .expect("append run with zero accepted rows"); + + let report = &outcome.entity_outcomes[0].report; + assert_eq!(report.results.accepted_total, 0); + assert_eq!(report.results.rejected_total, 2); + 
assert_eq!(report.accepted_output.write_mode.as_deref(), Some("append")); + + let df = read_local_delta_table(&accepted_dir); + assert_eq!(df.height(), 2); +} + #[test] fn local_delta_strict_mode_rejects_added_columns() { let temp_dir = tempfile::TempDir::new().expect("temp dir"); @@ -696,7 +887,9 @@ entities: } #[test] -fn local_delta_merge_scd1_warn_drops_duplicate_keys_before_merge() { +fn local_delta_merge_scd1_add_columns_evolves_target_and_reports_event() { + let observer = test_observer(); + observer.reset(); let temp_dir = tempfile::TempDir::new().expect("temp dir"); let root = temp_dir.path(); let input_dir = root.join("in"); @@ -706,12 +899,12 @@ fn local_delta_merge_scd1_warn_drops_duplicate_keys_before_merge() { fs::create_dir_all(&input_dir).expect("create input dir"); write_csv( &input_dir, - "batch.csv", - "id;country;name\n1;fr;alice\n1;fr;alice-dup\n", + "batch1.csv", + "id;country;name\n1;fr;alice\n2;ca;bob\n", ); - let yaml = format!( - r#"version: "0.1" + let initial_yaml = format!( + r#"version: "0.2" report: path: "{report_dir}" entities: @@ -728,6 +921,8 @@ entities: severity: "warn" schema: primary_key: ["id", "country"] + schema_evolution: + mode: "add_columns" columns: - name: "id" type: "string" @@ -740,51 +935,27 @@ entities: input_dir = input_dir.display(), accepted_dir = accepted_dir.display(), ); - let config_path = write_config(root, &yaml); + let config_path = write_config(root, &initial_yaml); - let outcome = run( + run( &config_path, RunOptions { - run_id: Some("it-delta-merge-dup-source".to_string()), + run_id: Some("it-delta-merge-scd1-evolution-init".to_string()), entities: Vec::new(), dry_run: false, }, ) - .expect("merge_scd1 should reject duplicate merge-key rows before merge in warn mode"); - - let report = &outcome.entity_outcomes[0].report; - assert_eq!(report.results.rows_total, 2); - assert_eq!(report.results.accepted_total, 1); - assert_eq!(report.results.rejected_total, 1); - - let df = 
read_local_delta_table(&accepted_dir); - assert_eq!(df.height(), 1); - let name = df - .column("name") - .expect("name") - .as_materialized_series() - .str() - .expect("name string"); - assert_eq!(name.get(0), Some("alice")); -} - -#[test] -fn local_delta_merge_scd2_closes_changed_rows_and_inserts_new_versions() { - let temp_dir = tempfile::TempDir::new().expect("temp dir"); - let root = temp_dir.path(); - let input_dir = root.join("in"); - let accepted_dir = root.join("out/accepted/customer_delta"); - let report_dir = root.join("report"); + .expect("initial merge_scd1 run"); - fs::create_dir_all(&input_dir).expect("create input dir"); + fs::remove_file(input_dir.join("batch1.csv")).expect("remove batch1"); write_csv( &input_dir, - "batch1.csv", - "id;country;name\n1;fr;alice\n2;ca;bob\n", + "batch2.csv", + "id;country;name;city\n1;fr;alice-updated;paris\n3;us;carol;new-york\n", ); - let yaml = format!( - r#"version: "0.1" + let evolved_yaml = format!( + r#"version: "0.2" report: path: "{report_dir}" entities: @@ -793,7 +964,7 @@ entities: format: "csv" path: "{input_dir}" sink: - write_mode: "merge_scd2" + write_mode: "merge_scd1" accepted: format: "delta" path: "{accepted_dir}" @@ -801,6 +972,8 @@ entities: severity: "warn" schema: primary_key: ["id", "country"] + schema_evolution: + mode: "add_columns" columns: - name: "id" type: "string" @@ -808,61 +981,438 @@ entities: type: "string" - name: "name" type: "string" + - name: "city" + type: "string" "#, report_dir = report_dir.display(), input_dir = input_dir.display(), accepted_dir = accepted_dir.display(), ); - let config_path = write_config(root, &yaml); - - run( - &config_path, - RunOptions { - run_id: Some("it-delta-merge-scd2-init".to_string()), - entities: Vec::new(), - dry_run: false, - }, - ) - .expect("initial merge_scd2 run"); - - fs::remove_file(input_dir.join("batch1.csv")).expect("remove first batch"); - write_csv( - &input_dir, - "batch2.csv", - 
"id;country;name\n1;fr;alice-v2\n2;ca;bob\n3;us;carol\n", - ); + write_config(root, &evolved_yaml); let outcome = run( &config_path, RunOptions { - run_id: Some("it-delta-merge-scd2-upsert".to_string()), + run_id: Some("it-delta-merge-scd1-evolution-upsert".to_string()), entities: Vec::new(), dry_run: false, }, ) - .expect("merge_scd2 upsert run"); + .expect("merge_scd1 run with additive column"); let report = &outcome.entity_outcomes[0].report; + assert!(report.schema_evolution.enabled); + assert_eq!(report.schema_evolution.mode, "add_columns"); + assert!(report.schema_evolution.applied); assert_eq!( - report.accepted_output.write_mode.as_deref(), - Some("merge_scd2") + report.schema_evolution.added_columns, + vec!["city".to_string()] ); - assert_eq!(report.accepted_output.inserted_count, Some(2)); - assert_eq!(report.accepted_output.updated_count, Some(1)); - assert_eq!(report.accepted_output.closed_count, Some(1)); - assert_eq!(report.accepted_output.unchanged_count, Some(1)); - assert_eq!(report.accepted_output.target_rows_before, Some(2)); - assert_eq!(report.accepted_output.target_rows_after, Some(4)); - - let df = read_local_delta_table(&accepted_dir); - assert_eq!(df.height(), 4); - assert!(df.column("__floe_is_current").is_ok()); - assert!(df.column("__floe_valid_from").is_ok()); - assert!(df.column("__floe_valid_to").is_ok()); + assert!(!report.schema_evolution.incompatible_changes_detected); - let id = df - .column("id") - .expect("id") + let events = observer.events_for_run("it-delta-merge-scd1-evolution-upsert"); + let event = events + .iter() + .find_map(|event| match event { + RunEvent::SchemaEvolutionApplied { + entity, + mode, + added_columns, + .. 
+ } => Some((entity, mode, added_columns)),
+ _ => None,
+ })
+ .expect("schema evolution event");
+ assert_eq!(event.0, "customer");
+ assert_eq!(event.1, "add_columns");
+ assert_eq!(event.2, &vec!["city".to_string()]);
+
+ let df = read_local_delta_table(&accepted_dir);
+ assert!(df.column("city").is_ok());
+ let id = df
+ .column("id")
+ .expect("id")
+ .as_materialized_series()
+ .str()
+ .expect("id string");
+ let country = df
+ .column("country")
+ .expect("country")
+ .as_materialized_series()
+ .str()
+ .expect("country string");
+ let city = df
+ .column("city")
+ .expect("city")
+ .as_materialized_series()
+ .str()
+ .expect("city string");
+ let rows = (0..df.height())
+ .map(|idx| {
+ (
+ id.get(idx).unwrap_or_default().to_string(),
+ country.get(idx).unwrap_or_default().to_string(),
+ city.get(idx).map(str::to_string),
+ )
+ })
+ .collect::<Vec<_>>();
+ assert!(rows.contains(&("1".to_string(), "fr".to_string(), Some("paris".to_string()))));
+ assert!(rows.contains(&("2".to_string(), "ca".to_string(), None)));
+ assert!(rows.contains(&(
+ "3".to_string(),
+ "us".to_string(),
+ Some("new-york".to_string())
+ )));
+}
+
+#[test]
+fn local_delta_merge_scd1_warn_drops_duplicate_keys_before_merge() {
+ let temp_dir = tempfile::TempDir::new().expect("temp dir");
+ let root = temp_dir.path();
+ let input_dir = root.join("in");
+ let accepted_dir = root.join("out/accepted/customer_delta");
+ let report_dir = root.join("report");
+
+ fs::create_dir_all(&input_dir).expect("create input dir");
+ write_csv(
+ &input_dir,
+ "batch.csv",
+ "id;country;name\n1;fr;alice\n1;fr;alice-dup\n",
+ );
+
+ let yaml = format!(
+ r#"version: "0.1"
+report:
+ path: "{report_dir}"
+entities:
+ - name: "customer"
+ source:
+ format: "csv"
+ path: "{input_dir}"
+ sink:
+ write_mode: "merge_scd1"
+ accepted:
+ format: "delta"
+ path: "{accepted_dir}"
+ policy:
+ severity: "warn"
+ schema:
+ primary_key: ["id", "country"]
+ columns:
+ - name: "id"
+ type: "string"
+ - name: "country"
+ 
type: "string" + - name: "name" + type: "string" +"#, + report_dir = report_dir.display(), + input_dir = input_dir.display(), + accepted_dir = accepted_dir.display(), + ); + let config_path = write_config(root, &yaml); + + let outcome = run( + &config_path, + RunOptions { + run_id: Some("it-delta-merge-dup-source".to_string()), + entities: Vec::new(), + dry_run: false, + }, + ) + .expect("merge_scd1 should reject duplicate merge-key rows before merge in warn mode"); + + let report = &outcome.entity_outcomes[0].report; + assert_eq!(report.results.rows_total, 2); + assert_eq!(report.results.accepted_total, 1); + assert_eq!(report.results.rejected_total, 1); + + let df = read_local_delta_table(&accepted_dir); + assert_eq!(df.height(), 1); + let name = df + .column("name") + .expect("name") + .as_materialized_series() + .str() + .expect("name string"); + assert_eq!(name.get(0), Some("alice")); +} + +#[test] +fn local_delta_merge_scd2_closes_changed_rows_and_inserts_new_versions() { + let temp_dir = tempfile::TempDir::new().expect("temp dir"); + let root = temp_dir.path(); + let input_dir = root.join("in"); + let accepted_dir = root.join("out/accepted/customer_delta"); + let report_dir = root.join("report"); + + fs::create_dir_all(&input_dir).expect("create input dir"); + write_csv( + &input_dir, + "batch1.csv", + "id;country;name\n1;fr;alice\n2;ca;bob\n", + ); + + let yaml = format!( + r#"version: "0.1" +report: + path: "{report_dir}" +entities: + - name: "customer" + source: + format: "csv" + path: "{input_dir}" + sink: + write_mode: "merge_scd2" + accepted: + format: "delta" + path: "{accepted_dir}" + policy: + severity: "warn" + schema: + primary_key: ["id", "country"] + columns: + - name: "id" + type: "string" + - name: "country" + type: "string" + - name: "name" + type: "string" +"#, + report_dir = report_dir.display(), + input_dir = input_dir.display(), + accepted_dir = accepted_dir.display(), + ); + let config_path = write_config(root, &yaml); + + run( + 
&config_path, + RunOptions { + run_id: Some("it-delta-merge-scd2-init".to_string()), + entities: Vec::new(), + dry_run: false, + }, + ) + .expect("initial merge_scd2 run"); + + fs::remove_file(input_dir.join("batch1.csv")).expect("remove first batch"); + write_csv( + &input_dir, + "batch2.csv", + "id;country;name\n1;fr;alice-v2\n2;ca;bob\n3;us;carol\n", + ); + + let outcome = run( + &config_path, + RunOptions { + run_id: Some("it-delta-merge-scd2-upsert".to_string()), + entities: Vec::new(), + dry_run: false, + }, + ) + .expect("merge_scd2 upsert run"); + + let report = &outcome.entity_outcomes[0].report; + assert_eq!( + report.accepted_output.write_mode.as_deref(), + Some("merge_scd2") + ); + assert_eq!(report.accepted_output.inserted_count, Some(2)); + assert_eq!(report.accepted_output.updated_count, Some(1)); + assert_eq!(report.accepted_output.closed_count, Some(1)); + assert_eq!(report.accepted_output.unchanged_count, Some(1)); + assert_eq!(report.accepted_output.target_rows_before, Some(2)); + assert_eq!(report.accepted_output.target_rows_after, Some(4)); + + let df = read_local_delta_table(&accepted_dir); + assert_eq!(df.height(), 4); + assert!(df.column("__floe_is_current").is_ok()); + assert!(df.column("__floe_valid_from").is_ok()); + assert!(df.column("__floe_valid_to").is_ok()); + + let id = df + .column("id") + .expect("id") + .as_materialized_series() + .str() + .expect("id string"); + let country = df + .column("country") + .expect("country") + .as_materialized_series() + .str() + .expect("country string"); + let name = df + .column("name") + .expect("name") + .as_materialized_series() + .str() + .expect("name string"); + let is_current = df + .column("__floe_is_current") + .expect("is current") + .as_materialized_series() + .bool() + .expect("is_current bool"); + let rows = (0..df.height()) + .map(|idx| { + ( + id.get(idx).unwrap_or_default().to_string(), + country.get(idx).unwrap_or_default().to_string(), + 
name.get(idx).unwrap_or_default().to_string(),
+ is_current.get(idx).unwrap_or(false),
+ )
+ })
+ .collect::<Vec<_>>();
+ assert!(rows.contains(&(
+ "1".to_string(),
+ "fr".to_string(),
+ "alice".to_string(),
+ false
+ )));
+ assert!(rows.contains(&(
+ "1".to_string(),
+ "fr".to_string(),
+ "alice-v2".to_string(),
+ true
+ )));
+ assert!(rows.contains(&("2".to_string(), "ca".to_string(), "bob".to_string(), true)));
+ assert!(rows.contains(&("3".to_string(), "us".to_string(), "carol".to_string(), true)));
+}
+
+#[test]
+fn local_delta_merge_scd2_add_columns_preserves_system_columns() {
+ let temp_dir = tempfile::TempDir::new().expect("temp dir");
+ let root = temp_dir.path();
+ let input_dir = root.join("in");
+ let accepted_dir = root.join("out/accepted/customer_delta");
+ let report_dir = root.join("report");
+
+ fs::create_dir_all(&input_dir).expect("create input dir");
+ write_csv(
+ &input_dir,
+ "batch1.csv",
+ "id;country;name\n1;fr;alice\n2;ca;bob\n",
+ );
+
+ let initial_yaml = format!(
+ r#"version: "0.2"
+report:
+ path: "{report_dir}"
+entities:
+ - name: "customer"
+ source:
+ format: "csv"
+ path: "{input_dir}"
+ sink:
+ write_mode: "merge_scd2"
+ accepted:
+ format: "delta"
+ path: "{accepted_dir}"
+ policy:
+ severity: "warn"
+ schema:
+ primary_key: ["id", "country"]
+ schema_evolution:
+ mode: "add_columns"
+ columns:
+ - name: "id"
+ type: "string"
+ - name: "country"
+ type: "string"
+ - name: "name"
+ type: "string"
+"#,
+ report_dir = report_dir.display(),
+ input_dir = input_dir.display(),
+ accepted_dir = accepted_dir.display(),
+ );
+ let config_path = write_config(root, &initial_yaml);
+
+ run(
+ &config_path,
+ RunOptions {
+ run_id: Some("it-delta-merge-scd2-evolution-init".to_string()),
+ entities: Vec::new(),
+ dry_run: false,
+ },
+ )
+ .expect("initial merge_scd2 run");
+
+ fs::remove_file(input_dir.join("batch1.csv")).expect("remove first batch");
+ write_csv(
+ &input_dir,
+ "batch2.csv",
+ 
"id;country;name;city\n1;fr;alice-v2;paris\n2;ca;bob;toronto\n3;us;carol;austin\n", + ); + + let evolved_yaml = format!( + r#"version: "0.2" +report: + path: "{report_dir}" +entities: + - name: "customer" + source: + format: "csv" + path: "{input_dir}" + sink: + write_mode: "merge_scd2" + accepted: + format: "delta" + path: "{accepted_dir}" + policy: + severity: "warn" + schema: + primary_key: ["id", "country"] + schema_evolution: + mode: "add_columns" + columns: + - name: "id" + type: "string" + - name: "country" + type: "string" + - name: "name" + type: "string" + - name: "city" + type: "string" +"#, + report_dir = report_dir.display(), + input_dir = input_dir.display(), + accepted_dir = accepted_dir.display(), + ); + write_config(root, &evolved_yaml); + + let outcome = run( + &config_path, + RunOptions { + run_id: Some("it-delta-merge-scd2-evolution-upsert".to_string()), + entities: Vec::new(), + dry_run: false, + }, + ) + .expect("merge_scd2 run with additive column"); + + let report = &outcome.entity_outcomes[0].report; + assert!(report.schema_evolution.enabled); + assert_eq!(report.schema_evolution.mode, "add_columns"); + assert!(report.schema_evolution.applied); + assert_eq!( + report.schema_evolution.added_columns, + vec!["city".to_string()] + ); + assert_eq!(report.accepted_output.closed_count, Some(2)); + assert_eq!(report.accepted_output.inserted_count, Some(3)); + + let df = read_local_delta_table(&accepted_dir); + assert_eq!(df.height(), 5); + assert!(df.column("city").is_ok()); + assert!(df.column("__floe_is_current").is_ok()); + assert!(df.column("__floe_valid_from").is_ok()); + assert!(df.column("__floe_valid_to").is_ok()); + + let id = df + .column("id") + .expect("id") .as_materialized_series() .str() .expect("id string"); @@ -872,42 +1422,57 @@ entities: .as_materialized_series() .str() .expect("country string"); - let name = df - .column("name") - .expect("name") + let city = df + .column("city") + .expect("city") .as_materialized_series() 
.str()
- .expect("name string");
+ .expect("city string");
let is_current = df
.column("__floe_is_current")
.expect("is current")
.as_materialized_series()
.bool()
.expect("is_current bool");
- let rows = (0..df.height())
+
+ let current_rows = (0..df.height())
+ .filter(|idx| is_current.get(*idx) == Some(true))
.map(|idx| {
(
id.get(idx).unwrap_or_default().to_string(),
country.get(idx).unwrap_or_default().to_string(),
- name.get(idx).unwrap_or_default().to_string(),
- is_current.get(idx).unwrap_or(false),
+ city.get(idx).map(str::to_string),
)
})
.collect::<Vec<_>>();
- assert!(rows.contains(&(
- "1".to_string(),
- "fr".to_string(),
- "alice".to_string(),
- false
+ assert_eq!(current_rows.len(), 3);
+ assert!(current_rows.contains(&("1".to_string(), "fr".to_string(), Some("paris".to_string()))));
+ assert!(current_rows.contains(&(
+ "2".to_string(),
+ "ca".to_string(),
+ Some("toronto".to_string())
)));
- assert!(rows.contains(&(
- "1".to_string(),
- "fr".to_string(),
- "alice-v2".to_string(),
- true
+ assert!(current_rows.contains(&(
+ "3".to_string(),
+ "us".to_string(),
+ Some("austin".to_string())
)));
- assert!(rows.contains(&("2".to_string(), "ca".to_string(), "bob".to_string(), true)));
- assert!(rows.contains(&("3".to_string(), "us".to_string(), "carol".to_string(), true)));
+
+ let closed_historical_rows = (0..df.height())
+ .filter(|idx| {
+ matches!(
+ (
+ id.get(*idx),
+ country.get(*idx),
+ is_current.get(*idx),
+ city.get(*idx)
+ ),
+ (Some("1"), Some("fr"), Some(false), None)
+ | (Some("2"), Some("ca"), Some(false), None)
+ )
+ })
+ .count();
+ assert_eq!(closed_historical_rows, 2);
}

#[test]
@@ -985,7 +1550,7 @@ entities:
assert_eq!(report.results.rows_total, 2);
assert_eq!(report.results.accepted_total, 0);
assert_eq!(report.results.rejected_total, 2);
- assert!(!report.schema_evolution.enabled);
+ assert!(report.schema_evolution.enabled);
assert_eq!(report.schema_evolution.mode, "add_columns");
assert!(!report.schema_evolution.applied);
assert!(report.schema_evolution.added_columns.is_empty()); @@ -1472,6 +2037,201 @@ entities: .contains("source schema missing target column city")); } +#[test] +fn local_delta_merge_scd1_add_columns_rejects_non_additive_schema_change() { + let temp_dir = tempfile::TempDir::new().expect("temp dir"); + let root = temp_dir.path(); + let input_dir = root.join("in"); + let accepted_dir = root.join("out/accepted/customer_delta"); + + fs::create_dir_all(&input_dir).expect("create input dir"); + write_csv(&input_dir, "batch1.csv", "id;country;score\n1;fr;10\n"); + + let initial_yaml = format!( + r#"version: "0.2" +entities: + - name: "customer" + source: + format: "csv" + path: "{input_dir}" + sink: + write_mode: "merge_scd1" + accepted: + format: "delta" + path: "{accepted_dir}" + policy: + severity: "warn" + schema: + primary_key: ["id", "country"] + schema_evolution: + mode: "add_columns" + columns: + - name: "id" + type: "string" + - name: "country" + type: "string" + - name: "score" + type: "int64" +"#, + input_dir = input_dir.display(), + accepted_dir = accepted_dir.display(), + ); + let config_path = write_config(root, &initial_yaml); + + run( + &config_path, + RunOptions { + run_id: Some("it-delta-merge-scd1-nonadditive-init".to_string()), + entities: Vec::new(), + dry_run: false, + }, + ) + .expect("initial merge_scd1 run"); + + fs::remove_file(input_dir.join("batch1.csv")).expect("remove batch1"); + write_csv(&input_dir, "batch2.csv", "id;country;score\n1;fr;ten\n"); + + let evolved_yaml = format!( + r#"version: "0.2" +entities: + - name: "customer" + source: + format: "csv" + path: "{input_dir}" + sink: + write_mode: "merge_scd1" + accepted: + format: "delta" + path: "{accepted_dir}" + policy: + severity: "warn" + schema: + primary_key: ["id", "country"] + schema_evolution: + mode: "add_columns" + columns: + - name: "id" + type: "string" + - name: "country" + type: "string" + - name: "score" + type: "string" +"#, + input_dir = input_dir.display(), + accepted_dir 
= accepted_dir.display(), + ); + write_config(root, &evolved_yaml); + + let err = run( + &config_path, + RunOptions { + run_id: Some("it-delta-merge-scd1-nonadditive-upsert".to_string()), + entities: Vec::new(), + dry_run: false, + }, + ) + .expect_err("merge_scd1 with type change should fail"); + assert!(err.to_string().contains("incompatible changes detected")); + assert!(err.to_string().contains("column score type changed")); +} + +#[test] +fn local_delta_merge_scd1_add_columns_rejects_merge_key_evolution() { + let temp_dir = tempfile::TempDir::new().expect("temp dir"); + let root = temp_dir.path(); + let input_dir = root.join("in"); + let accepted_dir = root.join("out/accepted/customer_delta"); + + fs::create_dir_all(&input_dir).expect("create input dir"); + write_csv(&input_dir, "batch1.csv", "id;name\n1;alice\n"); + + let initial_yaml = format!( + r#"version: "0.2" +entities: + - name: "customer" + source: + format: "csv" + path: "{input_dir}" + sink: + write_mode: "merge_scd1" + accepted: + format: "delta" + path: "{accepted_dir}" + policy: + severity: "warn" + schema: + primary_key: ["id"] + schema_evolution: + mode: "add_columns" + columns: + - name: "id" + type: "string" + - name: "name" + type: "string" +"#, + input_dir = input_dir.display(), + accepted_dir = accepted_dir.display(), + ); + let config_path = write_config(root, &initial_yaml); + + run( + &config_path, + RunOptions { + run_id: Some("it-delta-merge-scd1-merge-key-init".to_string()), + entities: Vec::new(), + dry_run: false, + }, + ) + .expect("initial merge_scd1 run"); + + fs::remove_file(input_dir.join("batch1.csv")).expect("remove batch1"); + write_csv(&input_dir, "batch2.csv", "id;country;name\n1;fr;alice\n"); + + let evolved_yaml = format!( + r#"version: "0.2" +entities: + - name: "customer" + source: + format: "csv" + path: "{input_dir}" + sink: + write_mode: "merge_scd1" + accepted: + format: "delta" + path: "{accepted_dir}" + policy: + severity: "warn" + schema: + primary_key: ["id", 
"country"] + schema_evolution: + mode: "add_columns" + columns: + - name: "id" + type: "string" + - name: "country" + type: "string" + - name: "name" + type: "string" +"#, + input_dir = input_dir.display(), + accepted_dir = accepted_dir.display(), + ); + write_config(root, &evolved_yaml); + + let err = run( + &config_path, + RunOptions { + run_id: Some("it-delta-merge-scd1-merge-key-upsert".to_string()), + entities: Vec::new(), + dry_run: false, + }, + ) + .expect_err("merge_scd1 with additive merge key should fail"); + assert!(err + .to_string() + .contains("merge key columns cannot be added: country")); +} + #[test] fn validate_merge_scd1_requires_primary_key() { let temp_dir = tempfile::TempDir::new().expect("temp dir"); diff --git a/crates/floe-core/tests/unit/io/write/delta_merge.rs b/crates/floe-core/tests/unit/io/write/delta_merge.rs index 09edf6f..648d596 100644 --- a/crates/floe-core/tests/unit/io/write/delta_merge.rs +++ b/crates/floe-core/tests/unit/io/write/delta_merge.rs @@ -158,3 +158,126 @@ entities: assert_eq!(accepted.target_rows_before, Some(2)); assert_eq!(accepted.target_rows_after, Some(3)); } + +#[test] +fn merge_scd1_add_columns_reports_schema_evolution() { + let temp_dir = tempfile::TempDir::new().expect("temp dir"); + let root = temp_dir.path(); + let input_dir = root.join("in"); + let accepted_dir = root.join("out/accepted/customer_delta"); + let report_dir = root.join("report"); + + fs::create_dir_all(&input_dir).expect("create input dir"); + write_csv( + &input_dir, + "batch1.csv", + "id;country;name\n1;fr;alice\n2;ca;bob\n", + ); + + let initial_yaml = format!( + r#"version: "0.2" +report: + path: "{report_dir}" +entities: + - name: "customer" + source: + format: "csv" + path: "{input_dir}" + sink: + write_mode: "merge_scd1" + accepted: + format: "delta" + path: "{accepted_dir}" + policy: + severity: "warn" + schema: + primary_key: ["id", "country"] + schema_evolution: + mode: "add_columns" + columns: + - name: "id" + type: "string" + - 
name: "country" + type: "string" + - name: "name" + type: "string" +"#, + report_dir = report_dir.display(), + input_dir = input_dir.display(), + accepted_dir = accepted_dir.display(), + ); + let config_path = write_config(root, &initial_yaml); + + run( + &config_path, + RunOptions { + run_id: Some("unit-delta-merge-schema-evolution-init".to_string()), + entities: Vec::new(), + dry_run: false, + }, + ) + .expect("initial merge run"); + + fs::remove_file(input_dir.join("batch1.csv")).expect("remove first file"); + write_csv( + &input_dir, + "batch2.csv", + "id;country;name;city\n1;fr;alice-updated;paris\n3;us;carol;new-york\n", + ); + + let evolved_yaml = format!( + r#"version: "0.2" +report: + path: "{report_dir}" +entities: + - name: "customer" + source: + format: "csv" + path: "{input_dir}" + sink: + write_mode: "merge_scd1" + accepted: + format: "delta" + path: "{accepted_dir}" + policy: + severity: "warn" + schema: + primary_key: ["id", "country"] + schema_evolution: + mode: "add_columns" + columns: + - name: "id" + type: "string" + - name: "country" + type: "string" + - name: "name" + type: "string" + - name: "city" + type: "string" +"#, + report_dir = report_dir.display(), + input_dir = input_dir.display(), + accepted_dir = accepted_dir.display(), + ); + write_config(root, &evolved_yaml); + + let outcome = run( + &config_path, + RunOptions { + run_id: Some("unit-delta-merge-schema-evolution-upsert".to_string()), + entities: Vec::new(), + dry_run: false, + }, + ) + .expect("merge upsert with new column"); + + let report = &outcome.entity_outcomes[0].report; + assert!(report.schema_evolution.enabled); + assert_eq!(report.schema_evolution.mode, "add_columns"); + assert!(report.schema_evolution.applied); + assert_eq!( + report.schema_evolution.added_columns, + vec!["city".to_string()] + ); + assert!(!report.schema_evolution.incompatible_changes_detected); +}