diff --git a/crates/floe-cli/src/logging.rs b/crates/floe-cli/src/logging.rs index 945c47b..6364396 100644 --- a/crates/floe-cli/src/logging.rs +++ b/crates/floe-cli/src/logging.rs @@ -129,7 +129,8 @@ fn level_for_event(event: &RunEvent) -> Level { }, RunEvent::RunStarted { .. } | RunEvent::EntityStarted { .. } - | RunEvent::FileStarted { .. } => Level::Info, + | RunEvent::FileStarted { .. } + | RunEvent::SchemaEvolutionApplied { .. } => Level::Info, RunEvent::FileFinished { status, .. } => match status.as_str() { "success" => Level::Info, "rejected" => Level::Warn, @@ -233,6 +234,17 @@ pub fn format_event_text(event: &RunEvent) -> String { "entity_finished name={} status={} files={} rows={} accepted={} rejected={} warnings={} errors={}", name, status, files, rows, accepted, rejected, warnings, errors ), + RunEvent::SchemaEvolutionApplied { + entity, + mode, + added_columns, + .. + } => format!( + "schema_evolution_applied entity={} mode={} added_columns={}", + entity, + mode, + added_columns.join(",") + ), RunEvent::RunFinished { status, exit_code, diff --git a/crates/floe-cli/tests/output.rs b/crates/floe-cli/tests/output.rs index f6ec51f..08e0861 100644 --- a/crates/floe-cli/tests/output.rs +++ b/crates/floe-cli/tests/output.rs @@ -70,6 +70,13 @@ fn sample_outcome() -> RunOutcome { target_rows_after: None, merge_elapsed_ms: None, }, + schema_evolution: report::SchemaEvolutionSummary { + enabled: false, + mode: "strict".to_string(), + applied: false, + added_columns: Vec::new(), + incompatible_changes_detected: false, + }, unique_constraints: Vec::new(), results: report::ResultsTotals { files_total: 1, diff --git a/crates/floe-core/src/io/format.rs b/crates/floe-core/src/io/format.rs index 74c6cf7..e6b734a 100644 --- a/crates/floe-core/src/io/format.rs +++ b/crates/floe-core/src/io/format.rs @@ -40,6 +40,15 @@ pub struct AcceptedWriteMetrics { pub small_files_count: Option, } +#[derive(Debug, Clone, Default)] +pub struct AcceptedSchemaEvolution { + pub enabled: bool, + pub mode: String, + pub applied: bool, + pub added_columns: Vec, + pub incompatible_changes_detected: bool, +} + #[derive(Debug, Clone, Default)] pub struct AcceptedWritePerfBreakdown { pub conversion_ms: Option, @@ -76,6 +85,7 @@ pub struct AcceptedWriteOutput { pub iceberg_table: Option, pub metrics: AcceptedWriteMetrics, pub merge: Option, + pub schema_evolution: AcceptedSchemaEvolution, pub perf: Option, } diff --git a/crates/floe-core/src/io/write/delta.rs b/crates/floe-core/src/io/write/delta.rs index 8514311..33bba39 100644 --- a/crates/floe-core/src/io/write/delta.rs +++ b/crates/floe-core/src/io/write/delta.rs @@ -33,6 +33,7 @@ struct DeltaWriteResult { part_files: Vec, metrics: AcceptedWriteMetrics, merge: Option, + schema_evolution: crate::io::format::AcceptedSchemaEvolution, perf: AcceptedWritePerfBreakdown, } @@ -68,7 +69,7 @@ fn write_delta_table_with_metrics( .enable_all() .build() .map_err(|err| Box::new(RunError(format!("delta runtime init failed: {err}"))))?; - let (version, merge, mut perf_breakdown) = match mode { + let (version, merge, schema_evolution, mut perf_breakdown) = match mode { config::WriteMode::Overwrite | config::WriteMode::Append => { let outcome = shared::write_standard_delta_version_with_perf( &runtime, @@ -83,6 +84,7 @@ fn write_delta_table_with_metrics( ( outcome.version, None, + outcome.schema_evolution, AcceptedWritePerfBreakdown { conversion_ms: Some(outcome.perf.conversion_ms), commit_ms: Some(outcome.perf.commit_ms), @@ -103,6 +105,18 @@ fn write_delta_table_with_metrics( ( version, Some(merge), + crate::io::format::AcceptedSchemaEvolution { + enabled: false, + mode: entity + .schema + .resolved_schema_evolution() + .mode + .as_str() + .to_string(), + applied: false, + added_columns: Vec::new(), + incompatible_changes_detected: false, + }, AcceptedWritePerfBreakdown { conversion_ms: Some(perf.conversion_ms), source_df_build_ms: Some(perf.source_df_build_ms), @@ -125,6 +139,18 @@ fn write_delta_table_with_metrics( ( version, Some(merge), + crate::io::format::AcceptedSchemaEvolution { + enabled: false, + mode: entity + .schema + .resolved_schema_evolution() + .mode + .as_str() + .to_string(), + applied: false, + added_columns: Vec::new(), + incompatible_changes_detected: false, + }, AcceptedWritePerfBreakdown { conversion_ms: Some(perf.conversion_ms), source_df_build_ms: Some(perf.source_df_build_ms), @@ -153,6 +179,7 @@ fn write_delta_table_with_metrics( part_files, metrics, merge, + schema_evolution, perf: perf_breakdown, }) } @@ -184,6 +211,7 @@ impl AcceptedSinkAdapter for DeltaAcceptedAdapter { iceberg_table: None, metrics: result.metrics, merge: result.merge, + schema_evolution: result.schema_evolution, perf: Some(result.perf), }) } diff --git a/crates/floe-core/src/io/write/iceberg.rs b/crates/floe-core/src/io/write/iceberg.rs index 38dab80..2f4a5a8 100644 --- a/crates/floe-core/src/io/write/iceberg.rs +++ b/crates/floe-core/src/io/write/iceberg.rs @@ -165,6 +165,18 @@ fn write_iceberg_table_with_remote_context( iceberg_table: result.iceberg_table, metrics: result.metrics, merge: None, + schema_evolution: crate::io::format::AcceptedSchemaEvolution { + enabled: false, + mode: entity + .schema + .resolved_schema_evolution() + .mode + .as_str() + .to_string(), + applied: false, + added_columns: Vec::new(), + incompatible_changes_detected: false, + }, perf: Some(result.perf), }) } diff --git a/crates/floe-core/src/io/write/parquet.rs b/crates/floe-core/src/io/write/parquet.rs index dcb9077..2434ff9 100644 --- a/crates/floe-core/src/io/write/parquet.rs +++ b/crates/floe-core/src/io/write/parquet.rs @@ -165,6 +165,18 @@ impl AcceptedSinkAdapter for ParquetAcceptedAdapter { iceberg_table: None, metrics, merge: None, + schema_evolution: io::format::AcceptedSchemaEvolution { + enabled: false, + mode: entity + .schema + .resolved_schema_evolution() + .mode + .as_str() + .to_string(), + applied: false, + added_columns: Vec::new(), + incompatible_changes_detected: false, + }, perf: None, }) } diff --git a/crates/floe-core/src/io/write/strategy/merge/scd2.rs b/crates/floe-core/src/io/write/strategy/merge/scd2.rs index 3687f25..95314db 100644 --- a/crates/floe-core/src/io/write/strategy/merge/scd2.rs +++ b/crates/floe-core/src/io/write/strategy/merge/scd2.rs @@ -107,6 +107,7 @@ impl MergeBackend for DeltaMergeBackend { SaveMode::Append, ctx.partition_by.clone(), ctx.target_file_size_bytes, + None, )?; perf.commit_ms = commit_start.elapsed().as_millis() as u64; return Ok(( diff --git a/crates/floe-core/src/io/write/strategy/merge/shared.rs b/crates/floe-core/src/io/write/strategy/merge/shared.rs index 76361be..bb5fd93 100644 --- a/crates/floe-core/src/io/write/strategy/merge/shared.rs +++ b/crates/floe-core/src/io/write/strategy/merge/shared.rs @@ -1,6 +1,9 @@ use std::time::Instant; +use arrow::datatypes::FieldRef; use arrow::record_batch::RecordBatch; +use deltalake::datafusion::datasource::TableProvider; +use deltalake::operations::write::SchemaMode; use deltalake::protocol::SaveMode; use deltalake::table::builder::DeltaTableBuilder; use deltalake::{datafusion::prelude::SessionContext, DeltaTable}; @@ -18,10 +21,11 @@ pub(crate) struct DeltaStandardWritePerf { pub(crate) commit_ms: u64, } -#[derive(Debug, Clone, Copy)] +#[derive(Debug, Clone)] pub(crate) struct DeltaVersionWriteOutcome { pub(crate) version: i64, pub(crate) perf: DeltaStandardWritePerf, + pub(crate) schema_evolution: crate::io::format::AcceptedSchemaEvolution, } #[derive(Debug, Default, Clone, Copy)] @@ -51,6 +55,8 @@ pub(crate) fn write_standard_delta_version_with_perf( ) -> FloeResult { let conversion_start = Instant::now(); let batch = crate::io::write::delta::record_batch::dataframe_to_record_batch(df, entity)?; + let schema_evolution = + plan_standard_delta_schema_evolution(runtime, &batch, target, resolver, entity, mode)?; let conversion_ms = conversion_start.elapsed().as_millis() as u64; let commit_start = Instant::now(); let version = write_delta_batch_version( @@ -62,10 +68,12 @@ pub(crate) fn write_standard_delta_version_with_perf( save_mode_for_write_mode(mode), partition_by, target_file_size_bytes, + schema_evolution.write_schema_mode, )?; let commit_ms = commit_start.elapsed().as_millis() as u64; Ok(DeltaVersionWriteOutcome { version, + schema_evolution: schema_evolution.summary, perf: DeltaStandardWritePerf { conversion_ms, commit_ms, @@ -73,6 +81,151 @@ pub(crate) fn write_standard_delta_version_with_perf( }) } +struct PlannedSchemaEvolution { + summary: crate::io::format::AcceptedSchemaEvolution, + write_schema_mode: Option, +} + +fn plan_standard_delta_schema_evolution( + runtime: &tokio::runtime::Runtime, + batch: &RecordBatch, + target: &Target, + resolver: &config::StorageResolver, + entity: &config::EntityConfig, + mode: config::WriteMode, +) -> FloeResult { + let schema_evolution = entity.schema.resolved_schema_evolution(); + let enabled = schema_evolution.mode == config::SchemaEvolutionMode::AddColumns + && matches!( + mode, + config::WriteMode::Append | config::WriteMode::Overwrite + ); + let mut summary = crate::io::format::AcceptedSchemaEvolution { + enabled, + mode: schema_evolution.mode.as_str().to_string(), + applied: false, + added_columns: Vec::new(), + incompatible_changes_detected: false, + }; + if !enabled { + return Ok(PlannedSchemaEvolution { + summary, + write_schema_mode: None, + }); + } + + let store = object_store::delta_store_config(target, resolver, entity)?; + let table_url = store.table_url; + let storage_options = store.storage_options; + let builder = DeltaTableBuilder::from_url(table_url.clone()) + .map_err(|err| Box::new(RunError(format!("delta builder failed: {err}"))))? + .with_storage_options(storage_options.clone()); + + let maybe_table = runtime + .block_on(async move { + match builder.load().await { + Ok(table) => Ok(Some(table)), + Err(deltalake::DeltaTableError::NotATable(_)) => Ok(None), + Err(err) => Err(err), + } + }) + .map_err(|err| Box::new(RunError(format!("delta schema load failed: {err}"))))?; + + let Some(table) = maybe_table else { + return Ok(PlannedSchemaEvolution { + summary, + write_schema_mode: None, + }); + }; + + let snapshot = table + .snapshot() + .map_err(|err| Box::new(RunError(format!("delta schema load failed: {err}"))))?; + let target_schema = table.schema(); + let target_fields = target_schema.fields(); + let source_schema = batch.schema(); + let source_fields = source_schema.fields(); + let added_columns = additive_columns(target_fields, source_fields); + let incompatible_changes = incompatible_schema_changes(target_fields, source_fields); + + summary.added_columns = added_columns; + summary.incompatible_changes_detected = !incompatible_changes.is_empty(); + + if !incompatible_changes.is_empty() { + return Err(Box::new(RunError(format!( + "entity.name={} delta schema evolution failed: incompatible changes detected: {}", + entity.name, + incompatible_changes.join("; ") + )))); + } + + if summary.added_columns.is_empty() { + return Ok(PlannedSchemaEvolution { + summary, + write_schema_mode: None, + }); + } + + let partition_columns = snapshot.metadata().partition_columns(); + if !partition_columns.is_empty() { + return Err(Box::new(RunError(format!( + "entity.name={} delta schema evolution failed: adding columns is unsupported for partitioned delta tables", + entity.name + )))); + } + + summary.applied = true; + Ok(PlannedSchemaEvolution { + summary, + write_schema_mode: Some(SchemaMode::Merge), + }) +} + +fn additive_columns(target_fields: &[FieldRef], source_fields: &[FieldRef]) -> Vec { + let target_names = target_fields + .iter() + .map(|field| field.name().to_string()) + .collect::>(); + source_fields + .iter() + .filter(|field| !target_names.contains(field.name())) + .map(|field| field.name().to_string()) + .collect() +} + +fn incompatible_schema_changes( + target_fields: &[FieldRef], + source_fields: &[FieldRef], +) -> Vec { + let source_by_name = source_fields + .iter() + .map(|field| (field.name(), field)) + .collect::>(); + let mut incompatible = Vec::new(); + for target_field in target_fields { + let Some(source_field) = source_by_name.get(target_field.name()) else { + incompatible.push(format!("missing existing column {}", target_field.name())); + continue; + }; + if target_field.data_type() != source_field.data_type() { + incompatible.push(format!( + "column {} type changed from {:?} to {:?}", + target_field.name(), + target_field.data_type(), + source_field.data_type() + )); + } + if !target_field.is_nullable() && source_field.is_nullable() { + incompatible.push(format!( + "column {} nullability changed from non-nullable to nullable", + target_field.name() + )); + } + } + incompatible +} + +#[allow(clippy::too_many_arguments)] pub(crate) fn write_delta_batch_version( runtime: &tokio::runtime::Runtime, batch: deltalake::arrow::record_batch::RecordBatch, @@ -82,6 +235,7 @@ pub(crate) fn write_delta_batch_version( save_mode: SaveMode, partition_by: Option>, target_file_size_bytes: Option, + schema_mode: Option, ) -> FloeResult { let store = object_store::delta_store_config(target, resolver, entity)?; let table_url = store.table_url; @@ -103,6 +257,9 @@ pub(crate) fn write_delta_batch_version( }, }; let mut write = table.write(vec![batch]).with_save_mode(save_mode); + if let Some(schema_mode) = schema_mode { + write = write.with_schema_mode(schema_mode); + } if let Some(partition_by) = partition_by { write = write.with_partition_columns(partition_by); } diff --git a/crates/floe-core/src/report/entity.rs b/crates/floe-core/src/report/entity.rs index 47201f9..acc773f 100644 --- a/crates/floe-core/src/report/entity.rs +++ b/crates/floe-core/src/report/entity.rs @@ -37,6 +37,7 @@ pub(crate) struct RunReportContext<'a> { pub accepted_target_rows_before: Option, pub accepted_target_rows_after: Option, pub accepted_merge_elapsed_ms: Option, + pub accepted_schema_evolution: report::SchemaEvolutionSummary, pub unique_constraints: Vec, } @@ -119,6 +120,7 @@ pub(crate) fn build_run_report(ctx: RunReportContext<'_>) -> report::RunReport { target_rows_after: ctx.accepted_target_rows_after, merge_elapsed_ms: ctx.accepted_merge_elapsed_ms, }, + schema_evolution: ctx.accepted_schema_evolution, unique_constraints: build_unique_constraint_reports(ctx.severity, &ctx.unique_constraints), results: ctx.totals, files: ctx.file_reports, diff --git a/crates/floe-core/src/report/mod.rs b/crates/floe-core/src/report/mod.rs index 46e61d2..d275ef0 100644 --- a/crates/floe-core/src/report/mod.rs +++ b/crates/floe-core/src/report/mod.rs @@ -19,6 +19,7 @@ pub struct RunReport { pub sink: SinkEcho, pub policy: PolicyEcho, pub accepted_output: AcceptedOutputSummary, + pub schema_evolution: SchemaEvolutionSummary, #[serde(default)] #[serde(skip_serializing_if = "Vec::is_empty")] pub unique_constraints: Vec, @@ -191,6 +192,18 @@ pub struct AcceptedOutputSummary { pub merge_elapsed_ms: Option, } +#[derive(Debug, Clone, Serialize, Deserialize, Default)] +#[serde(rename_all = "snake_case")] +pub struct SchemaEvolutionSummary { + pub enabled: bool, + pub mode: String, + pub applied: bool, + #[serde(default)] + #[serde(skip_serializing_if = "Vec::is_empty")] + pub added_columns: Vec, + pub incompatible_changes_detected: bool, +} + #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(rename_all = "snake_case")] pub struct UniqueConstraintReport { diff --git a/crates/floe-core/src/run/entity/accepted_write.rs b/crates/floe-core/src/run/entity/accepted_write.rs index 803169b..900672d 100644 --- a/crates/floe-core/src/run/entity/accepted_write.rs +++ b/crates/floe-core/src/run/entity/accepted_write.rs @@ -8,6 +8,7 @@ use crate::{config, io, report, FloeResult}; use super::super::output::{write_accepted_output, AcceptedOutputContext}; use super::EntityPhaseTimings; +use crate::run::events::{event_time_ms, RunEvent, RunObserver}; use crate::run::RunContext; use io::storage::Target; @@ -34,10 +35,30 @@ pub(super) struct AcceptedWriteReportState { pub(super) target_rows_before: Option, pub(super) target_rows_after: Option, pub(super) merge_elapsed_ms: Option, + pub(super) schema_evolution: io::format::AcceptedSchemaEvolution, pub(super) write_perf: Option, } impl AcceptedWriteReportState { + pub(super) fn for_entity(entity: &config::EntityConfig, write_mode: config::WriteMode) -> Self { + let schema_evolution = entity.schema.resolved_schema_evolution(); + Self { + schema_evolution: io::format::AcceptedSchemaEvolution { + enabled: entity.sink.accepted.format == "delta" + && schema_evolution.mode == config::SchemaEvolutionMode::AddColumns + && matches!( + write_mode, + config::WriteMode::Append | config::WriteMode::Overwrite + ), + mode: schema_evolution.mode.as_str().to_string(), + applied: false, + added_columns: Vec::new(), + incompatible_changes_detected: false, + }, + ..Self::default() + } + } + pub(super) fn from_write_output(output: io::format::AcceptedWriteOutput) -> Self { Self { parts_written: output.parts_written, @@ -68,6 +89,7 @@ impl AcceptedWriteReportState { target_rows_before: output.merge.as_ref().map(|merge| merge.target_rows_before), target_rows_after: output.merge.as_ref().map(|merge| merge.target_rows_after), merge_elapsed_ms: output.merge.as_ref().map(|merge| merge.merge_elapsed_ms), + schema_evolution: output.schema_evolution, write_perf: output.perf, } } @@ -89,6 +111,7 @@ impl AcceptedWriteReportState { pub(super) struct AcceptedWritePhaseContext<'a> { pub(super) run_context: &'a RunContext, + pub(super) observer: &'a dyn RunObserver, pub(super) runtime: &'a mut dyn crate::runtime::Runtime, pub(super) entity: &'a config::EntityConfig, pub(super) accepted_target: &'a Target, @@ -104,6 +127,7 @@ pub(super) fn run_accepted_write_phase( ) -> FloeResult { let AcceptedWritePhaseContext { run_context, + observer, runtime, entity, accepted_target, @@ -114,7 +138,7 @@ pub(super) fn run_accepted_write_phase( accepted_accum, } = context; - let mut accepted_write_report = AcceptedWriteReportState::default(); + let mut accepted_write_report = AcceptedWriteReportState::for_entity(entity, write_mode); if accepted_accum.is_empty() { return Ok(accepted_write_report); } @@ -151,6 +175,15 @@ pub(super) fn run_accepted_write_phase( } accepted_write_report = AcceptedWriteReportState::from_write_output(accepted_output); + if accepted_write_report.schema_evolution.applied { + observer.on_event(RunEvent::SchemaEvolutionApplied { + run_id: run_context.run_id.clone(), + entity: entity.name.clone(), + mode: accepted_write_report.schema_evolution.mode.clone(), + added_columns: accepted_write_report.schema_evolution.added_columns.clone(), + ts_ms: event_time_ms(), + }); + } Ok(accepted_write_report) } diff --git a/crates/floe-core/src/run/entity/mod.rs b/crates/floe-core/src/run/entity/mod.rs index cdfe1ac..00bd5b5 100644 --- a/crates/floe-core/src/run/entity/mod.rs +++ b/crates/floe-core/src/run/entity/mod.rs @@ -241,6 +241,7 @@ pub(super) fn run_entity( let accepted_target_uri = accepted_target.target_uri().to_string(); let accepted_write_report = run_accepted_write_phase(AcceptedWritePhaseContext { run_context: context, + observer, runtime, entity, accepted_target: &accepted_target, @@ -289,6 +290,15 @@ pub(super) fn run_entity( accepted_target_rows_before: accepted_write_report.target_rows_before, accepted_target_rows_after: accepted_write_report.target_rows_after, accepted_merge_elapsed_ms: accepted_write_report.merge_elapsed_ms, + accepted_schema_evolution: report::SchemaEvolutionSummary { + enabled: accepted_write_report.schema_evolution.enabled, + mode: accepted_write_report.schema_evolution.mode, + applied: accepted_write_report.schema_evolution.applied, + added_columns: accepted_write_report.schema_evolution.added_columns, + incompatible_changes_detected: accepted_write_report + .schema_evolution + .incompatible_changes_detected, + }, unique_constraints, }); diff --git a/crates/floe-core/src/run/events.rs b/crates/floe-core/src/run/events.rs index 8233294..8570ae9 100644 --- a/crates/floe-core/src/run/events.rs +++ b/crates/floe-core/src/run/events.rs @@ -43,6 +43,13 @@ pub enum RunEvent { elapsed_ms: u64, ts_ms: u128, }, + SchemaEvolutionApplied { + run_id: String, + entity: String, + mode: String, + added_columns: Vec, + ts_ms: u128, + }, EntityFinished { run_id: String, name: String, diff --git a/crates/floe-core/tests/integration/delta_run.rs b/crates/floe-core/tests/integration/delta_run.rs index cb00817..eb24438 100644 --- a/crates/floe-core/tests/integration/delta_run.rs +++ b/crates/floe-core/tests/integration/delta_run.rs @@ -1,11 +1,82 @@ use std::fs; use std::path::{Path, PathBuf}; +use std::sync::{Arc, Mutex, OnceLock}; use deltalake::table::builder::DeltaTableBuilder; -use floe_core::{run, validate, RunOptions, ValidateOptions}; +use floe_core::{run, set_observer, validate, RunEvent, RunObserver, RunOptions, ValidateOptions}; use polars::prelude::DataFrame; use url::Url; +#[derive(Default)] +struct TestObserver { + events: Mutex>, +} + +impl TestObserver { + fn reset(&self) { + self.events.lock().expect("observer lock").clear(); + } + + fn events_for_run(&self, run_id: &str) -> Vec { + self.events + .lock() + .expect("observer lock") + .iter() + .filter(|event| match event { + RunEvent::Log { + run_id: event_run_id, + .. + } + | RunEvent::RunStarted { + run_id: event_run_id, + .. + } + | RunEvent::EntityStarted { + run_id: event_run_id, + .. + } + | RunEvent::FileStarted { + run_id: event_run_id, + .. + } + | RunEvent::FileFinished { + run_id: event_run_id, + .. + } + | RunEvent::SchemaEvolutionApplied { + run_id: event_run_id, + .. + } + | RunEvent::EntityFinished { + run_id: event_run_id, + .. + } + | RunEvent::RunFinished { + run_id: event_run_id, + .. + } => event_run_id == run_id, + }) + .cloned() + .collect() + } +} + +impl RunObserver for TestObserver { + fn on_event(&self, event: RunEvent) { + self.events.lock().expect("observer lock").push(event); + } +} + +fn test_observer() -> &'static TestObserver { + static OBSERVER: OnceLock> = OnceLock::new(); + let observer = OBSERVER.get_or_init(|| { + let observer = Arc::new(TestObserver::default()); + let _ = set_observer(observer.clone()); + observer + }); + observer.as_ref() +} + fn write_csv(dir: &Path, name: &str, contents: &str) -> PathBuf { let path = dir.join(name); fs::write(&path, contents).expect("write csv"); @@ -133,7 +204,7 @@ fn local_delta_run_without_partitioning_preserves_existing_behavior() { fs::create_dir_all(&input_dir).expect("create input dir"); write_csv(&input_dir, "orders.csv", "id;country\n1;us\n2;ca\n"); - let yaml = format!( + let initial_yaml = format!( r#"version: "0.1" report: path: "{report_dir}" @@ -159,7 +230,7 @@ entities: input_dir = input_dir.display(), accepted_dir = accepted_dir.display(), ); - let config_path = write_config(root, &yaml); + let config_path = write_config(root, &initial_yaml); let outcome = run( &config_path, @@ -184,6 +255,315 @@ entities: assert!(report.accepted_output.files_written > 0); } +#[test] +fn local_delta_append_add_columns_mode_reports_schema_evolution_and_event() { + let observer = test_observer(); + observer.reset(); + + let temp_dir = tempfile::TempDir::new().expect("temp dir"); + let root = temp_dir.path(); + let input_dir = root.join("in"); + let accepted_dir = root.join("out/accepted/orders_delta"); + let report_dir = root.join("report"); + + fs::create_dir_all(&input_dir).expect("create input dir"); + write_csv(&input_dir, "batch1.csv", "id\n1\n2\n"); + + let initial_yaml = format!( + r#"version: "0.2" +report: + path: "{report_dir}" +entities: + - name: "orders" + source: + format: "csv" + path: "{input_dir}" + sink: + write_mode: "append" + accepted: + format: "delta" + path: "{accepted_dir}" + policy: + severity: "warn" + schema: + schema_evolution: + mode: "add_columns" + columns: + - name: "id" + type: "int64" +"#, + report_dir = report_dir.display(), + input_dir = input_dir.display(), + accepted_dir = accepted_dir.display(), + ); + let config_path = write_config(root, &initial_yaml); + + run( + &config_path, + RunOptions { + run_id: Some("it-delta-schema-evolution-init".to_string()), + entities: Vec::new(), + dry_run: false, + }, + ) + .expect("initial run"); + + fs::remove_file(input_dir.join("batch1.csv")).expect("remove batch1"); + write_csv(&input_dir, "batch2.csv", "id;email\n3;a@example.com\n4;\n"); + let evolved_yaml = format!( + r#"version: "0.2" +report: + path: "{report_dir}" +entities: + - name: "orders" + source: + format: "csv" + path: "{input_dir}" + sink: + write_mode: "append" + accepted: + format: "delta" + path: "{accepted_dir}" + policy: + severity: "warn" + schema: + schema_evolution: + mode: "add_columns" + columns: + - name: "id" + type: "int64" + - name: "email" + type: "string" +"#, + report_dir = report_dir.display(), + input_dir = input_dir.display(), + accepted_dir = accepted_dir.display(), + ); + write_config(root, &evolved_yaml); + + let outcome = run( + &config_path, + RunOptions { + run_id: Some("it-delta-schema-evolution-append".to_string()), + entities: Vec::new(), + dry_run: false, + }, + ) + .expect("schema evolution append run"); + + let report = &outcome.entity_outcomes[0].report; + assert!(report.schema_evolution.enabled); + assert_eq!(report.schema_evolution.mode, "add_columns"); + assert!(report.schema_evolution.applied); + assert_eq!( + report.schema_evolution.added_columns, + vec!["email".to_string()] + ); + assert!(!report.schema_evolution.incompatible_changes_detected); + + let events = observer.events_for_run("it-delta-schema-evolution-append"); + let event = events + .iter() + .find_map(|event| match event { + RunEvent::SchemaEvolutionApplied { + entity, + mode, + added_columns, + .. + } => Some((entity, mode, added_columns)), + _ => None, + }) + .expect("schema evolution event"); + assert_eq!(event.0, "orders"); + assert_eq!(event.1, "add_columns"); + assert_eq!(event.2, &vec!["email".to_string()]); + + let url = Url::from_directory_path(&accepted_dir).expect("delta table path url"); + let builder = DeltaTableBuilder::from_url(url) + .expect("delta table builder") + .with_storage_options(std::collections::HashMap::new()); + let runtime = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .expect("delta runtime"); + let table = runtime + .block_on(async move { builder.load().await }) + .expect("load delta table"); + let field_names = table + .snapshot() + .expect("table snapshot") + .schema() + .fields() + .map(|field| field.name.clone()) + .collect::>(); + assert!(field_names.contains(&"email".to_string())); +} + +#[test] +fn local_delta_overwrite_add_columns_mode_reports_noop_when_unchanged() { + let temp_dir = tempfile::TempDir::new().expect("temp dir"); + let root = temp_dir.path(); + let input_dir = root.join("in"); + let accepted_dir = root.join("out/accepted/orders_delta"); + let report_dir = root.join("report"); + + fs::create_dir_all(&input_dir).expect("create input dir"); + write_csv( + &input_dir, + "orders.csv", + "id;email\n1;a@example.com\n2;b@example.com\n", + ); + + let initial_yaml = format!( + r#"version: "0.2" +report: + path: "{report_dir}" +entities: + - name: "orders" + source: + format: "csv" + path: "{input_dir}" + sink: + write_mode: "overwrite" + accepted: + format: "delta" + path: "{accepted_dir}" + policy: + severity: "warn" + schema: + schema_evolution: + mode: "add_columns" + columns: + - name: "id" + type: "int64" + - name: "email" + type: "string" +"#, + report_dir = report_dir.display(), + input_dir = input_dir.display(), + accepted_dir = accepted_dir.display(), + ); + let config_path = write_config(root, &initial_yaml); + + run( + &config_path, + RunOptions { + run_id: Some("it-delta-schema-evolution-overwrite-init".to_string()), + entities: Vec::new(), + dry_run: false, + }, + ) + .expect("initial overwrite run"); + + let outcome = run( + &config_path, + RunOptions { + run_id: Some("it-delta-schema-evolution-overwrite-noop".to_string()), + entities: Vec::new(), + dry_run: false, + }, + ) + .expect("overwrite no-op run"); + + let report = &outcome.entity_outcomes[0].report; + assert!(report.schema_evolution.enabled); + assert_eq!(report.schema_evolution.mode, "add_columns"); + assert!(!report.schema_evolution.applied); + assert!(report.schema_evolution.added_columns.is_empty()); + assert!(!report.schema_evolution.incompatible_changes_detected); +} + +#[test] +fn local_delta_strict_mode_rejects_added_columns() { + let temp_dir = tempfile::TempDir::new().expect("temp dir"); + let root = temp_dir.path(); + let input_dir = root.join("in"); + let accepted_dir = root.join("out/accepted/orders_delta"); + let report_dir = root.join("report"); + + fs::create_dir_all(&input_dir).expect("create input dir"); + write_csv(&input_dir, "batch1.csv", "id\n1\n2\n"); + + let initial_yaml = format!( + r#"version: "0.2" +report: + path: "{report_dir}" +entities: + - name: "orders" + source: + format: "csv" + path: "{input_dir}" + sink: + write_mode: "append" + accepted: + format: "delta" + path: "{accepted_dir}" + policy: + severity: "warn" + schema: + columns: + - name: "id" + type: "int64" +"#, + report_dir = report_dir.display(), + input_dir = input_dir.display(), + accepted_dir = accepted_dir.display(), + ); + let config_path = write_config(root, &initial_yaml); + + run( + &config_path, + RunOptions { + run_id: Some("it-delta-strict-init".to_string()), + entities: Vec::new(), + dry_run: false, + }, + ) + .expect("initial strict run"); + + fs::remove_file(input_dir.join("batch1.csv")).expect("remove batch1"); + write_csv(&input_dir, "batch2.csv", "id;email\n3;a@example.com\n"); + let evolved_yaml = format!( + r#"version: "0.2" +report: + path: "{report_dir}" +entities: + - name: "orders" + source: + format: "csv" + path: "{input_dir}" + sink: + write_mode: "append" + accepted: + format: "delta" + path: "{accepted_dir}" + policy: + severity: "warn" + schema: + columns: + - name: "id" + type: "int64" + - name: "email" + type: "string" +"#, + report_dir = report_dir.display(), + input_dir = input_dir.display(), + accepted_dir = accepted_dir.display(), + ); + write_config(root, &evolved_yaml); + + let err = run( + &config_path, + RunOptions { + run_id: Some("it-delta-strict-extra-column".to_string()), + entities: Vec::new(), + dry_run: false, + }, + ) + .expect_err("strict mode should reject added columns"); + assert!(err.to_string().contains("delta write failed")); +} + #[test] fn local_delta_merge_scd1_upserts_and_reports_metrics() { let temp_dir = tempfile::TempDir::new().expect("temp dir"); @@ -530,6 +910,91 @@ entities: assert!(rows.contains(&("3".to_string(), "us".to_string(), "carol".to_string(), true))); } +#[test] +fn local_delta_merge_add_columns_mode_reports_schema_evolution_disabled_when_all_rows_rejected() { + let temp_dir = tempfile::TempDir::new().expect("temp dir"); + let root = temp_dir.path(); + let input_dir = root.join("in"); + let accepted_dir = root.join("out/accepted/customer_delta"); + let rejected_dir = root.join("out/rejected/customer_csv"); + let report_dir = root.join("report"); + + fs::create_dir_all(&input_dir).expect("create input dir"); + write_csv(&input_dir, "batch1.csv", "id;name\n1;alice\n2;bob\n"); + + let yaml = format!( + r#"version: "0.2" +report: + path: "{report_dir}" +entities: + - name: "customer" + source: + format: "csv" + path: "{input_dir}" + sink: + write_mode: "merge_scd1" + accepted: + format: "delta" + path: "{accepted_dir}" + rejected: + format: "csv" + path: "{rejected_dir}" + policy: + severity: "reject" + schema: + primary_key: ["id"] + schema_evolution: + mode: "add_columns" + columns: + - name: "id" + type: "int64" + - name: "name" + type: "string" +"#, + report_dir = report_dir.display(), + input_dir = input_dir.display(), + accepted_dir = accepted_dir.display(), + rejected_dir = rejected_dir.display(), + ); + let config_path = write_config(root, &yaml); + + run( + &config_path, + RunOptions { + run_id: Some("it-delta-merge-schema-evolution-init".to_string()), + entities: Vec::new(), + dry_run: false, + }, + ) + .expect("initial merge run"); + + fs::remove_file(input_dir.join("batch1.csv")).expect("remove batch1"); + write_csv(&input_dir, "batch2.csv", "id;name\nx;carol\ny;dave\n"); + + let outcome = run( + &config_path, + RunOptions { + run_id: Some("it-delta-merge-schema-evolution-empty".to_string()), + entities: Vec::new(), + dry_run: false, + }, + ) + .expect("merge run with zero accepted rows"); + + let report = &outcome.entity_outcomes[0].report; + assert_eq!(report.results.rows_total, 2); + assert_eq!(report.results.accepted_total, 0); + assert_eq!(report.results.rejected_total, 2); + assert!(!report.schema_evolution.enabled); + assert_eq!(report.schema_evolution.mode, "add_columns"); + assert!(!report.schema_evolution.applied); + assert!(report.schema_evolution.added_columns.is_empty()); + assert!(!report.schema_evolution.incompatible_changes_detected); + + let df = read_local_delta_table(&accepted_dir); + assert_eq!(df.height(), 2); +} + #[test] fn local_delta_merge_scd2_supports_custom_system_column_names() { let temp_dir = tempfile::TempDir::new().expect("temp dir"); diff --git a/crates/floe-core/tests/unit/io/write/delta_write.rs b/crates/floe-core/tests/unit/io/write/delta_write.rs index 876d9b8..2a6bb85 100644 --- a/crates/floe-core/tests/unit/io/write/delta_write.rs +++ b/crates/floe-core/tests/unit/io/write/delta_write.rs @@ -217,6 +217,155 @@ fn delta_append_rejects_nulls_for_non_nullable_columns() -> FloeResult<()> { Ok(()) } +#[test] +fn delta_append_add_columns_mode_evolves_schema() -> FloeResult<()> { + let temp_dir = tempfile::TempDir::new()?; + let table_path = temp_dir.path().join("delta_table"); + let config = empty_root_config(); + let resolver = config::StorageResolver::from_path(&config, temp_dir.path())?; + let target = resolve_local_target(&resolver, &table_path)?; + + let mut entity = build_entity( + &table_path, + config::WriteMode::Append, + vec![column("id", "int64", Some(false))], + None, + ); + let mut initial = df!("id" => &[1i64, 2])?; + write_delta_table( + &mut initial, + &target, + &resolver, + &entity, + config::WriteMode::Append, + )?; + + entity + .schema + .columns + .push(column("email", "string", Some(true))); + entity.schema.schema_evolution = Some(add_columns_schema_evolution()); + let mut evolved = df!( + "id" => &[3i64, 4], + "email" => &[Some("a@example.com"), None] + )?; + write_delta_table( + &mut evolved, + &target, + &resolver, + &entity, + config::WriteMode::Append, + )?; + + let runtime = runtime()?; + let table = open_table(&runtime, &table_path)?; + let field_names = table + .snapshot()? + .schema() + .fields() + .map(|field| field.name.clone()) + .collect::>(); + assert!(field_names.contains(&"email".to_string())); + assert_eq!(row_count(&table)?, 4); + + Ok(()) +} + +#[test] +fn delta_overwrite_add_columns_mode_evolves_schema() -> FloeResult<()> { + let temp_dir = tempfile::TempDir::new()?; + let table_path = temp_dir.path().join("delta_table"); + let config = empty_root_config(); + let resolver = config::StorageResolver::from_path(&config, temp_dir.path())?; + let target = resolve_local_target(&resolver, &table_path)?; + + let mut entity = build_entity( + &table_path, + config::WriteMode::Overwrite, + vec![column("id", "int64", Some(false))], + None, + ); + let mut initial = df!("id" => &[1i64, 2])?; + write_delta_table( + &mut initial, + &target, + &resolver, + &entity, + config::WriteMode::Overwrite, + )?; + + entity + .schema + .columns + .push(column("email", "string", Some(true))); + entity.schema.schema_evolution = Some(add_columns_schema_evolution()); + let mut evolved = df!( + "id" => &[3i64, 4], + "email" => &[Some("a@example.com"), Some("b@example.com")] + )?; + write_delta_table( + &mut evolved, + &target, + &resolver, + &entity, + config::WriteMode::Overwrite, + )?; + + let runtime = runtime()?; + let table = open_table(&runtime, &table_path)?; + let field_names = table + .snapshot()? + .schema() + .fields() + .map(|field| field.name.clone()) + .collect::>(); + assert!(field_names.contains(&"email".to_string())); + assert_eq!(row_count(&table)?, 2); + + Ok(()) +} + +#[test] +fn delta_add_columns_mode_rejects_incompatible_type_changes() -> FloeResult<()> { + let temp_dir = tempfile::TempDir::new()?; + let table_path = temp_dir.path().join("delta_table"); + let config = empty_root_config(); + let resolver = config::StorageResolver::from_path(&config, temp_dir.path())?; + let target = resolve_local_target(&resolver, &table_path)?; + + let mut entity = build_entity( + &table_path, + config::WriteMode::Append, + vec![column("id", "int64", Some(false))], + None, + ); + let mut initial = df!("id" => &[1i64, 2])?; + write_delta_table( + &mut initial, + &target, + &resolver, + &entity, + config::WriteMode::Append, + )?; + + entity.schema.columns = vec![column("id", "string", Some(false))]; + entity.schema.schema_evolution = Some(add_columns_schema_evolution()); + let mut incompatible = df!("id" => &["3", "4"])?; + let err = write_delta_table( + &mut incompatible, + &target, + &resolver, + &entity, + config::WriteMode::Append, + ) + .expect_err("type change should fail"); + assert!(err + .to_string() + .contains("delta schema evolution failed: incompatible changes detected")); + + Ok(()) +} + #[test] fn delta_write_uses_normalized_schema_names() -> FloeResult<()> { let temp_dir = tempfile::TempDir::new()?; @@ -472,6 +621,13 @@ fn normalize_config(strategy: &str) -> config::NormalizeColumnsConfig { } } +fn add_columns_schema_evolution() -> config::SchemaEvolutionConfig { + config::SchemaEvolutionConfig { + mode: config::SchemaEvolutionMode::AddColumns, + on_incompatible: config::SchemaEvolutionIncompatibleAction::Fail, + } +} + fn runtime() -> FloeResult { let runtime = tokio::runtime::Builder::new_current_thread() .enable_all() diff --git a/crates/floe-core/tests/unit/run/report.rs b/crates/floe-core/tests/unit/run/report.rs index c2a320c..c335981 100644 --- a/crates/floe-core/tests/unit/run/report.rs +++ b/crates/floe-core/tests/unit/run/report.rs @@ -71,6 +71,13 @@ fn sample_report() -> RunReport { target_rows_after: None, merge_elapsed_ms: None, }, + schema_evolution: SchemaEvolutionSummary { + enabled: false, + mode: "strict".to_string(), + applied: false, + added_columns: Vec::new(), + incompatible_changes_detected: false, + }, unique_constraints: Vec::new(), results: ResultsTotals { files_total: 1, @@ -170,6 +177,7 @@ fn report_serializes_expected_keys() { assert!(object.contains_key("sink")); assert!(object.contains_key("policy")); assert!(object.contains_key("accepted_output")); + assert!(object.contains_key("schema_evolution")); assert!(object.contains_key("results")); assert!(object.contains_key("files")); } diff --git a/docs/report.md b/docs/report.md index 1c7a8d4..1ecf0f9 100644 --- a/docs/report.md +++ b/docs/report.md @@ -49,6 +49,7 @@ Golden example files live under `example/report/run_2026-01-19T10-23-45Z/`. - `sink`: Accepted/rejected/archive paths and formats. - `policy`: Severity. - `accepted_output`: Entity-level accepted output summary (path, row counts, write metadata, and file metrics). +- `schema_evolution`: Entity-level schema-evolution summary for the accepted write path. - `results`: Totals for files, rows, accepted/rejected rows, warnings, errors. - `files`: Per-file outcomes and validation summary. @@ -89,6 +90,19 @@ Merge-specific metrics (optional, Delta `merge_scd1` / `merge_scd2`): - `target_rows_after` - `merge_elapsed_ms` +### `schema_evolution` (entity report) + +- `enabled`: `true` when schema evolution is active for the entity write path +- `mode`: resolved config mode (`strict` or `add_columns`) +- `applied`: `true` when Floe actually added Delta columns during the write +- `added_columns`: ordered list of Delta columns added in that write +- `incompatible_changes_detected`: `true` when Floe detected non-additive changes and failed the write + +Notes: +- Floe writes an explicit no-op shape (`applied=false`, empty `added_columns`) when + `mode=add_columns` is enabled but the target schema already matches the write schema. +- This block is populated for entity reports even when the write path remains strict. + Notes: - Metrics are populated when the writer can collect them cheaply and reliably. - Delta metrics are derived from committed Delta log `add` actions for the committed version. diff --git a/docs/sinks/delta.md b/docs/sinks/delta.md index 783daac..8777627 100644 --- a/docs/sinks/delta.md +++ b/docs/sinks/delta.md @@ -28,10 +28,20 @@ Semantics: `__floe_is_current`, `__floe_valid_from`, `__floe_valid_to` - Local, S3, ADLS, and GCS storage are supported for delta output. +Schema evolution for standard Delta writes: +- Default behavior remains strict. +- `schema.schema_evolution.mode: add_columns` enables additive-only schema evolution for + accepted Delta `append` and `overwrite` writes. +- Existing columns must remain compatible; incompatible non-additive changes fail before commit. +- When evolution is applied, Floe emits a structured `schema_evolution_applied` event and + populates the entity report `schema_evolution` block with `applied=true` and `added_columns`. +- When evolution is enabled but no Delta schema change is needed, the entity report still + includes `schema_evolution` with `applied=false`. + Merge mode notes (`merge_scd1`, `merge_scd2`): - Requires `schema.primary_key` in config (non-empty, non-nullable columns). - Only supported on Delta accepted sinks. -- Current implementation validates strict schema compatibility; no schema evolution. +- Merge modes remain strict; this task does not change merge schema behavior. - Source duplicates on merge key are handled during row checks before write. - Single-writer assumption applies; Delta commit conflicts surface as write errors. diff --git a/docs/write_modes.md b/docs/write_modes.md index 9ae8a21..9602fd6 100644 --- a/docs/write_modes.md +++ b/docs/write_modes.md @@ -8,6 +8,20 @@ Supported values: - `merge_scd1` (Delta accepted sink only): SCD1 upsert keyed by `schema.primary_key`. - `merge_scd2` (Delta accepted sink only): SCD2 merge keyed by `schema.primary_key`. +## Standard Delta writes (`append`, `overwrite`) + +For accepted Delta sinks, standard write modes stay strict by default. + +- `schema.schema_evolution.mode: strict` preserves the previous behavior: + source and target Delta schemas must already be compatible. +- `schema.schema_evolution.mode: add_columns` enables additive-only Delta schema evolution + for `append` and `overwrite`. + - Only new columns present in the source schema may be added to the target Delta table. + - Existing columns must remain compatible; drop/rename/type-change flows are rejected. + - When enabled and no new columns are present, Floe records a no-op schema-evolution report block. + - When columns are added, Floe emits a structured `schema_evolution_applied` event and + records the added column names in the entity run report. + ## `merge_scd1` semantics - Accepted sink format must be `delta`. @@ -17,7 +31,7 @@ Supported values: - On key match: update non-key columns from source. - `sink.accepted.merge.ignore_columns` can exclude additional business columns from SCD1 update sets. - On missing key: insert source row. -- Current v1 behavior validates strict schema compatibility and does not perform schema evolution. +- Merge modes still validate strict schema compatibility and do not perform schema evolution. - Merge execution uses Delta native merge (`MERGE INTO`) through delta-rs/DataFusion. - Single-writer assumption: Delta commit conflicts are returned as clear write errors. @@ -44,7 +58,7 @@ Supported values: - Change detection columns are resolved as: - `sink.accepted.merge.compare_columns` when configured - otherwise all non-key business columns minus `sink.accepted.merge.ignore_columns` -- Current v1 behavior validates strict schema compatibility and does not perform schema evolution. +- Merge modes still validate strict schema compatibility and do not perform schema evolution. - Single-writer assumption: Delta commit conflicts are returned as clear write errors. ## Rejected output behavior