diff --git a/crates/floe-core/src/config/parse.rs b/crates/floe-core/src/config/parse.rs index 4b4336d..0b7a8e9 100644 --- a/crates/floe-core/src/config/parse.rs +++ b/crates/floe-core/src/config/parse.rs @@ -11,9 +11,10 @@ use crate::config::yaml_decode::{ use crate::config::{ ArchiveTarget, CatalogDefinition, CatalogsConfig, ColumnConfig, DomainConfig, EntityConfig, EntityMetadata, EnvConfig, IcebergPartitionFieldConfig, IcebergSinkTargetConfig, - NormalizeColumnsConfig, PolicyConfig, ProjectMetadata, ReportConfig, RootConfig, SchemaConfig, - SchemaMismatchConfig, SinkConfig, SinkOptions, SinkTarget, SourceConfig, SourceOptions, - StorageDefinition, StoragesConfig, WriteMode, + MergeOptionsConfig, MergeScd2OptionsConfig, NormalizeColumnsConfig, PolicyConfig, + ProjectMetadata, ReportConfig, RootConfig, SchemaConfig, SchemaMismatchConfig, SinkConfig, + SinkOptions, SinkTarget, SourceConfig, SourceOptions, StorageDefinition, StoragesConfig, + WriteMode, }; use crate::{ConfigError, FloeResult}; @@ -349,7 +350,13 @@ fn parse_sink_target( let hash = yaml_hash(value, ctx)?; let mut allowed = vec!["format", "path", "storage", "filesystem"]; if allow_options { - allowed.extend(["options", "iceberg", "partition_by", "partition_spec"]); + allowed.extend([ + "options", + "merge", + "iceberg", + "partition_by", + "partition_spec", + ]); } validate_known_keys(hash, ctx, &allowed)?; let storage = opt_string(hash, "storage", ctx)?; @@ -367,6 +374,14 @@ fn parse_sink_target( } else { None }; + let merge = if allow_options { + match hash_get(hash, "merge") { + Some(value) => Some(parse_merge_options(value, &format!("{ctx}.merge"))?), + None => None, + } + } else { + None + }; let partition_by = if allow_options { opt_vec_string(hash, "partition_by", ctx)? } else { @@ -399,6 +414,7 @@ fn parse_sink_target( path: get_string(hash, "path", ctx)?, storage: storage.or(filesystem), options, + merge, iceberg, partition_by, partition_spec, @@ -432,6 +448,38 @@ fn parse_sink_options(value: &Yaml, ctx: &str) -> FloeResult { }) } +fn parse_merge_options(value: &Yaml, ctx: &str) -> FloeResult { + let hash = yaml_hash(value, ctx)?; + validate_known_keys(hash, ctx, &["ignore_columns", "compare_columns", "scd2"])?; + let scd2 = match hash_get(hash, "scd2") { + Some(value) => Some(parse_merge_scd2_options(value, &format!("{ctx}.scd2"))?), + None => None, + }; + Ok(MergeOptionsConfig { + ignore_columns: opt_vec_string(hash, "ignore_columns", ctx)?, + compare_columns: opt_vec_string(hash, "compare_columns", ctx)?, + scd2, + }) +} + +fn parse_merge_scd2_options(value: &Yaml, ctx: &str) -> FloeResult { + let hash = yaml_hash(value, ctx)?; + validate_known_keys( + hash, + ctx, + &[ + "current_flag_column", + "valid_from_column", + "valid_to_column", + ], + )?; + Ok(MergeScd2OptionsConfig { + current_flag_column: opt_string(hash, "current_flag_column", ctx)?, + valid_from_column: opt_string(hash, "valid_from_column", ctx)?, + valid_to_column: opt_string(hash, "valid_to_column", ctx)?, + }) +} + fn parse_iceberg_partition_spec( value: &Yaml, ctx: &str, diff --git a/crates/floe-core/src/config/types.rs b/crates/floe-core/src/config/types.rs index f89b929..9f689a3 100644 --- a/crates/floe-core/src/config/types.rs +++ b/crates/floe-core/src/config/types.rs @@ -218,12 +218,31 @@ pub struct SinkTarget { pub path: String, pub storage: Option, pub options: Option, + pub merge: Option, pub iceberg: Option, pub partition_by: Option>, pub partition_spec: Option>, pub write_mode: WriteMode, } +pub const DEFAULT_SCD2_CURRENT_FLAG_COLUMN: &str = "__floe_is_current"; +pub const DEFAULT_SCD2_VALID_FROM_COLUMN: &str = "__floe_valid_from"; +pub const DEFAULT_SCD2_VALID_TO_COLUMN: &str = "__floe_valid_to"; + +#[derive(Debug, Clone)] +pub struct MergeOptionsConfig { + pub ignore_columns: Option>, + pub compare_columns: Option>, + pub scd2: Option, +} + +#[derive(Debug, Clone)] +pub struct MergeScd2OptionsConfig { + pub current_flag_column: Option, + pub valid_from_column: Option, + pub valid_to_column: Option, +} + #[derive(Debug)] pub struct SinkOptions { pub compression: Option, diff --git a/crates/floe-core/src/config/validate.rs b/crates/floe-core/src/config/validate.rs index c4f8a70..66078de 100644 --- a/crates/floe-core/src/config/validate.rs +++ b/crates/floe-core/src/config/validate.rs @@ -252,34 +252,222 @@ fn validate_sink( fn validate_sink_write_mode(entity: &EntityConfig) -> FloeResult<()> { let write_mode = entity.sink.resolved_write_mode(); - if !matches!( + let is_merge_mode = matches!( write_mode, crate::config::WriteMode::MergeScd1 | crate::config::WriteMode::MergeScd2 - ) { - return Ok(()); + ); + if is_merge_mode { + let mode_name = write_mode.as_str(); + if entity.sink.accepted.format != "delta" { + return Err(Box::new(ConfigError(format!( + "entity.name={} sink.write_mode={} requires sink.accepted.format=delta", + entity.name, mode_name + )))); + } + + let primary_key = entity.schema.primary_key.as_ref().ok_or_else(|| { + Box::new(ConfigError(format!( + "entity.name={} sink.write_mode={} requires schema.primary_key", + entity.name, mode_name + ))) as Box + })?; + if primary_key.is_empty() { + return Err(Box::new(ConfigError(format!( + "entity.name={} sink.write_mode={} requires non-empty schema.primary_key", + entity.name, mode_name + )))); + } } - let mode_name = write_mode.as_str(); + + validate_merge_options(entity, write_mode) +} + +fn validate_merge_options( + entity: &EntityConfig, + write_mode: crate::config::WriteMode, +) -> FloeResult<()> { + let Some(merge) = entity.sink.accepted.merge.as_ref() else { + return Ok(()); + }; if entity.sink.accepted.format != "delta" { return Err(Box::new(ConfigError(format!( - "entity.name={} sink.write_mode={} requires sink.accepted.format=delta", - entity.name, mode_name + "entity.name={} sink.accepted.merge is only supported when sink.accepted.format=delta", + entity.name )))); } - - let primary_key = entity.schema.primary_key.as_ref().ok_or_else(|| { - Box::new(ConfigError(format!( - "entity.name={} sink.write_mode={} requires schema.primary_key", - entity.name, mode_name - ))) as Box - })?; - if primary_key.is_empty() { + if !matches!( + write_mode, + crate::config::WriteMode::MergeScd1 | crate::config::WriteMode::MergeScd2 + ) { return Err(Box::new(ConfigError(format!( - "entity.name={} sink.write_mode={} requires non-empty schema.primary_key", - entity.name, mode_name + "entity.name={} sink.accepted.merge is only supported with sink.write_mode=merge_scd1 or merge_scd2", + entity.name )))); } + let schema_columns = entity + .schema + .columns + .iter() + .map(|column| column.name.as_str()) + .collect::>(); + let normalize_strategy = if entity + .schema + .normalize_columns + .as_ref() + .and_then(|normalize| normalize.enabled) + .unwrap_or(false) + { + entity + .schema + .normalize_columns + .as_ref() + .and_then(|normalize| normalize.strategy.as_deref()) + .or(Some("snake_case")) + } else { + None + }; + let resolved_output_columns = crate::checks::normalize::resolve_output_columns( + &entity.schema.columns, + normalize_strategy, + ); + let resolved_output_column_names = resolved_output_columns + .iter() + .map(|column| column.name.as_str()) + .collect::>(); + let primary_key_columns = entity + .schema + .primary_key + .as_ref() + .map(|columns| { + columns + .iter() + .map(|column| column.trim().to_string()) + .collect::>() + }) + .unwrap_or_default(); + + if let Some(ignore_columns) = merge.ignore_columns.as_ref() { + validate_merge_column_list( + entity, + "sink.accepted.merge.ignore_columns", + ignore_columns, + &schema_columns, + )?; + for (index, column_name) in ignore_columns.iter().enumerate() { + let value = column_name.trim(); + if primary_key_columns.contains(value) { + return Err(Box::new(ConfigError(format!( + "entity.name={} sink.accepted.merge.ignore_columns[{}]={} cannot reference schema.primary_key column", + entity.name, index, value + )))); + } + } + } + + if let Some(compare_columns) = merge.compare_columns.as_ref() { + validate_merge_column_list( + entity, + "sink.accepted.merge.compare_columns", + compare_columns, + &schema_columns, + )?; + for (index, column_name) in compare_columns.iter().enumerate() { + let value = column_name.trim(); + if primary_key_columns.contains(value) { + return Err(Box::new(ConfigError(format!( + "entity.name={} sink.accepted.merge.compare_columns[{}]={} cannot reference schema.primary_key column", + entity.name, index, value + )))); + } + } + } + + if let Some(scd2) = merge.scd2.as_ref() { + if write_mode != crate::config::WriteMode::MergeScd2 { + return Err(Box::new(ConfigError(format!( + "entity.name={} sink.accepted.merge.scd2 is only supported with sink.write_mode=merge_scd2", + entity.name + )))); + } + let current_flag_column = scd2 + .current_flag_column + .as_deref() + .unwrap_or(crate::config::DEFAULT_SCD2_CURRENT_FLAG_COLUMN) + .trim(); + let valid_from_column = scd2 + .valid_from_column + .as_deref() + .unwrap_or(crate::config::DEFAULT_SCD2_VALID_FROM_COLUMN) + .trim(); + let valid_to_column = scd2 + .valid_to_column + .as_deref() + .unwrap_or(crate::config::DEFAULT_SCD2_VALID_TO_COLUMN) + .trim(); + let resolved_columns = [ + ("current_flag_column", current_flag_column), + ("valid_from_column", valid_from_column), + ("valid_to_column", valid_to_column), + ]; + for (field, value) in resolved_columns { + if value.is_empty() { + return Err(Box::new(ConfigError(format!( + "entity.name={} sink.accepted.merge.scd2.{} must not be empty", + entity.name, field + )))); + } + if resolved_output_column_names.contains(value) { + return Err(Box::new(ConfigError(format!( + "entity.name={} sink.accepted.merge.scd2.{}={} collides with schema column name", + entity.name, field, value + )))); + } + } + let unique_columns = resolved_columns + .iter() + .map(|(_, value)| *value) + .collect::>(); + if unique_columns.len() != resolved_columns.len() { + return Err(Box::new(ConfigError(format!( + "entity.name={} sink.accepted.merge.scd2 column names must be unique", + entity.name + )))); + } + } + + Ok(()) +} + +fn validate_merge_column_list( + entity: &EntityConfig, + field: &str, + values: &[String], + schema_columns: &HashSet<&str>, +) -> FloeResult<()> { + let mut seen = HashSet::new(); + for (index, value) in values.iter().enumerate() { + let trimmed = value.trim(); + if trimmed.is_empty() { + return Err(Box::new(ConfigError(format!( + "entity.name={} {}[{}] must not be empty", + entity.name, field, index + )))); + } + if !schema_columns.contains(trimmed) { + return Err(Box::new(ConfigError(format!( + "entity.name={} {}[{}]={} references unknown schema column", + entity.name, field, index, trimmed + )))); + } + if !seen.insert(trimmed.to_string()) { + return Err(Box::new(ConfigError(format!( + "entity.name={} {} has duplicate column {}", + entity.name, field, trimmed + )))); + } + } Ok(()) } diff --git a/crates/floe-core/src/io/write/strategy/merge/scd1.rs b/crates/floe-core/src/io/write/strategy/merge/scd1.rs index 05aecdb..a9888b4 100644 --- a/crates/floe-core/src/io/write/strategy/merge/scd1.rs +++ b/crates/floe-core/src/io/write/strategy/merge/scd1.rs @@ -105,9 +105,12 @@ impl MergeBackend for DeltaMergeBackend { .map(|name| name.to_string()) .collect::>(); let merge_key_set = merge_key.iter().map(String::as_str).collect::>(); + let ignore_columns = shared::resolve_merge_ignore_columns(ctx.entity)?; let update_columns = source_columns .iter() - .filter(|name| !merge_key_set.contains(name.as_str())) + .filter(|name| { + !merge_key_set.contains(name.as_str()) && !ignore_columns.contains(name.as_str()) + }) .cloned() .collect::>(); let predicate = shared::merge_predicate_sql(&merge_key); diff --git a/crates/floe-core/src/io/write/strategy/merge/scd2.rs b/crates/floe-core/src/io/write/strategy/merge/scd2.rs index 1e9872c..3687f25 100644 --- a/crates/floe-core/src/io/write/strategy/merge/scd2.rs +++ b/crates/floe-core/src/io/write/strategy/merge/scd2.rs @@ -12,10 +12,6 @@ use crate::{config, FloeResult}; use super::{shared, MergeBackend, MergeExecutionContext}; -const SCD2_IS_CURRENT_COLUMN: &str = "__floe_is_current"; -const SCD2_VALID_FROM_COLUMN: &str = "__floe_valid_from"; -const SCD2_VALID_TO_COLUMN: &str = "__floe_valid_to"; - struct DeltaMergeBackend; pub(crate) fn execute_merge_scd2_with_runtime( @@ -58,12 +54,20 @@ impl MergeBackend for DeltaMergeBackend { let mut perf = shared::DeltaMergePerfBreakdown::default(); let merge_key = shared::resolve_merge_key(ctx.entity)?; let merge_key_set = merge_key.iter().map(String::as_str).collect::>(); - let compare_columns = source_df - .get_column_names() - .iter() - .map(|name| name.to_string()) - .filter(|name| !merge_key_set.contains(name.as_str())) - .collect::>(); + let ignore_columns = shared::resolve_merge_ignore_columns(ctx.entity)?; + let compare_columns = + shared::resolve_merge_compare_columns(ctx.entity)?.unwrap_or_else(|| { + source_df + .get_column_names() + .iter() + .map(|name| name.to_string()) + .filter(|name| { + !merge_key_set.contains(name.as_str()) + && !ignore_columns.contains(name.as_str()) + }) + .collect::>() + }); + let system_columns = shared::resolve_scd2_system_columns(ctx.entity); let merge_key_predicate = shared::merge_predicate_sql(&merge_key); let store = object_store::delta_store_config(ctx.target, ctx.resolver, ctx.entity)?; @@ -83,8 +87,9 @@ impl MergeBackend for DeltaMergeBackend { if loaded_table.is_none() { let mut bootstrap_df = source_df.clone(); - append_scd2_system_columns(&mut bootstrap_df)?; - let bootstrap_schema_columns = build_scd2_bootstrap_schema_columns(ctx.entity)?; + append_scd2_system_columns(&mut bootstrap_df, &system_columns)?; + let bootstrap_schema_columns = + build_scd2_bootstrap_schema_columns(ctx.entity, &system_columns)?; let conversion_start = Instant::now(); let batch = crate::io::write::delta::record_batch::dataframe_to_record_batch_with_schema( @@ -126,9 +131,9 @@ impl MergeBackend for DeltaMergeBackend { &target_schema_columns, source_df, &[ - SCD2_IS_CURRENT_COLUMN, - SCD2_VALID_FROM_COLUMN, - SCD2_VALID_TO_COLUMN, + system_columns.is_current.as_str(), + system_columns.valid_from.as_str(), + system_columns.valid_to.as_str(), ], &ctx.entity.name, )?; @@ -145,6 +150,11 @@ impl MergeBackend for DeltaMergeBackend { let update_predicate = scd2_changed_predicate(&compare_columns); let merge_key_predicate_for_close = merge_key_predicate.clone(); let merge_exec_start = Instant::now(); + let is_current_column = system_columns.is_current.clone(); + let valid_from_column = system_columns.valid_from.clone(); + let valid_to_column = system_columns.valid_to.clone(); + let close_is_current_column = is_current_column.clone(); + let close_valid_to_column = valid_to_column.clone(); let close_result = ctx.runtime.block_on(async move { let mut merge = table .merge(source_for_close, merge_key_predicate_for_close) @@ -154,15 +164,15 @@ impl MergeBackend for DeltaMergeBackend { update .predicate(format!( "{} = true AND ({})", - shared::qualified_column("target", SCD2_IS_CURRENT_COLUMN), + shared::qualified_column("target", close_is_current_column.as_str()), update_predicate )) .update( - shared::qualified_column("target", SCD2_IS_CURRENT_COLUMN), + shared::qualified_column("target", close_is_current_column.as_str()), "false", ) .update( - shared::qualified_column("target", SCD2_VALID_TO_COLUMN), + shared::qualified_column("target", close_valid_to_column.as_str()), "current_timestamp()", ) })?; @@ -174,13 +184,16 @@ impl MergeBackend for DeltaMergeBackend { let active_match_predicate = format!( "{} AND {} = true", merge_key_predicate, - shared::qualified_column("target", SCD2_IS_CURRENT_COLUMN) + shared::qualified_column("target", is_current_column.as_str()) ); let source_columns = source_df .get_column_names() .iter() .map(|name| name.to_string()) .collect::>(); + let insert_is_current_column = is_current_column.clone(); + let insert_valid_from_column = valid_from_column.clone(); + let insert_valid_to_column = valid_to_column.clone(); let insert_result = ctx.runtime.block_on(async move { let mut merge = table_after_close .merge(source_for_insert, active_match_predicate) @@ -195,15 +208,15 @@ impl MergeBackend for DeltaMergeBackend { }); insert .set( - shared::qualified_column("target", SCD2_IS_CURRENT_COLUMN), + shared::qualified_column("target", insert_is_current_column.as_str()), "true", ) .set( - shared::qualified_column("target", SCD2_VALID_FROM_COLUMN), + shared::qualified_column("target", insert_valid_from_column.as_str()), "current_timestamp()", ) .set( - shared::qualified_column("target", SCD2_VALID_TO_COLUMN), + shared::qualified_column("target", insert_valid_to_column.as_str()), "NULL", ) })?; @@ -243,48 +256,54 @@ impl MergeBackend for DeltaMergeBackend { } } -fn append_scd2_system_columns(df: &mut DataFrame) -> FloeResult<()> { +fn append_scd2_system_columns( + df: &mut DataFrame, + system_columns: &shared::Scd2SystemColumns, +) -> FloeResult<()> { let row_count = df.height(); let now_micros = now_timestamp_micros(); let valid_from = Series::new( - SCD2_VALID_FROM_COLUMN.into(), + system_columns.valid_from.as_str().into(), vec![Some(now_micros); row_count], ) .cast(&DataType::Datetime(TimeUnit::Microseconds, None)) .map_err(|err| { Box::new(RunError(format!( "delta merge_scd2 failed to build {} column: {err}", - SCD2_VALID_FROM_COLUMN + system_columns.valid_from.as_str() ))) })?; let valid_to = Series::new( - SCD2_VALID_TO_COLUMN.into(), + system_columns.valid_to.as_str().into(), vec![Option::::None; row_count], ) .cast(&DataType::Datetime(TimeUnit::Microseconds, None)) .map_err(|err| { Box::new(RunError(format!( "delta merge_scd2 failed to build {} column: {err}", - SCD2_VALID_TO_COLUMN + system_columns.valid_to.as_str() ))) })?; - let is_current = Series::new(SCD2_IS_CURRENT_COLUMN.into(), vec![Some(true); row_count]); + let is_current = Series::new( + system_columns.is_current.as_str().into(), + vec![Some(true); row_count], + ); df.with_column(valid_from).map_err(|err| { Box::new(RunError(format!( "delta merge_scd2 failed to append {} column: {err}", - SCD2_VALID_FROM_COLUMN + system_columns.valid_from.as_str() ))) })?; df.with_column(valid_to).map_err(|err| { Box::new(RunError(format!( "delta merge_scd2 failed to append {} column: {err}", - SCD2_VALID_TO_COLUMN + system_columns.valid_to.as_str() ))) })?; df.with_column(is_current).map_err(|err| { Box::new(RunError(format!( "delta merge_scd2 failed to append {} column: {err}", - SCD2_IS_CURRENT_COLUMN + system_columns.is_current.as_str() ))) })?; Ok(()) @@ -292,13 +311,14 @@ fn append_scd2_system_columns(df: &mut DataFrame) -> FloeResult<()> { fn build_scd2_bootstrap_schema_columns( entity: &config::EntityConfig, + system_columns: &shared::Scd2SystemColumns, ) -> FloeResult> { let mut columns = normalize::resolve_output_columns( &entity.schema.columns, normalize::resolve_normalize_strategy(entity)?.as_deref(), ); columns.push(config::ColumnConfig { - name: SCD2_IS_CURRENT_COLUMN.to_string(), + name: system_columns.is_current.clone(), source: None, column_type: "boolean".to_string(), nullable: Some(false), @@ -307,7 +327,7 @@ fn build_scd2_bootstrap_schema_columns( trim: None, }); columns.push(config::ColumnConfig { - name: SCD2_VALID_FROM_COLUMN.to_string(), + name: system_columns.valid_from.clone(), source: None, column_type: "datetime".to_string(), nullable: Some(false), @@ -316,7 +336,7 @@ fn build_scd2_bootstrap_schema_columns( trim: None, }); columns.push(config::ColumnConfig { - name: SCD2_VALID_TO_COLUMN.to_string(), + name: system_columns.valid_to.clone(), source: None, column_type: "datetime".to_string(), nullable: Some(true), diff --git a/crates/floe-core/src/io/write/strategy/merge/shared.rs b/crates/floe-core/src/io/write/strategy/merge/shared.rs index 53211e4..76361be 100644 --- a/crates/floe-core/src/io/write/strategy/merge/shared.rs +++ b/crates/floe-core/src/io/write/strategy/merge/shared.rs @@ -5,7 +5,7 @@ use deltalake::protocol::SaveMode; use deltalake::table::builder::DeltaTableBuilder; use deltalake::{datafusion::prelude::SessionContext, DeltaTable}; use polars::prelude::DataFrame; -use std::collections::HashSet; +use std::collections::{HashMap, HashSet}; use crate::errors::RunError; use crate::io::format::AcceptedMergeMetrics; @@ -32,6 +32,13 @@ pub(crate) struct DeltaMergePerfBreakdown { pub(crate) commit_ms: u64, } +#[derive(Debug, Clone)] +pub(crate) struct Scd2SystemColumns { + pub(crate) is_current: String, + pub(crate) valid_from: String, + pub(crate) valid_to: String, +} + pub(crate) fn write_standard_delta_version_with_perf( runtime: &tokio::runtime::Runtime, df: &mut DataFrame, @@ -133,6 +140,111 @@ pub(crate) fn resolve_merge_key(entity: &config::EntityConfig) -> FloeResult FloeResult> { + let Some(columns) = entity + .sink + .accepted + .merge + .as_ref() + .and_then(|merge| merge.ignore_columns.as_ref()) + else { + return Ok(HashSet::new()); + }; + + let schema_to_output = schema_to_output_column_name_map(entity)?; + let resolved = columns + .iter() + .map(|column| column.trim()) + .filter(|column| !column.is_empty()) + .map(|column| { + schema_to_output + .get(column) + .cloned() + .unwrap_or_else(|| column.to_string()) + }) + .collect::>(); + Ok(resolved) +} + +pub(crate) fn resolve_merge_compare_columns( + entity: &config::EntityConfig, +) -> FloeResult>> { + let Some(columns) = entity + .sink + .accepted + .merge + .as_ref() + .and_then(|merge| merge.compare_columns.as_ref()) + else { + return Ok(None); + }; + + let schema_to_output = schema_to_output_column_name_map(entity)?; + let mut seen = HashSet::new(); + let resolved = columns + .iter() + .map(|column| column.trim()) + .filter(|column| !column.is_empty()) + .map(|column| { + schema_to_output + .get(column) + .cloned() + .unwrap_or_else(|| column.to_string()) + }) + .filter(|column| seen.insert(column.clone())) + .collect::>(); + Ok(Some(resolved)) +} + +fn schema_to_output_column_name_map( + entity: &config::EntityConfig, +) -> FloeResult> { + let normalize_strategy = crate::checks::normalize::resolve_normalize_strategy(entity)?; + let output_columns = crate::checks::normalize::resolve_output_columns( + &entity.schema.columns, + normalize_strategy.as_deref(), + ); + let mut mapping = HashMap::with_capacity(entity.schema.columns.len()); + for (schema_column, output_column) in entity.schema.columns.iter().zip(output_columns.iter()) { + mapping.insert( + schema_column.name.trim().to_string(), + output_column.name.clone(), + ); + } + Ok(mapping) +} + +pub(crate) fn resolve_scd2_system_columns(entity: &config::EntityConfig) -> Scd2SystemColumns { + let scd2 = entity + .sink + .accepted + .merge + .as_ref() + .and_then(|merge| merge.scd2.as_ref()); + let is_current = scd2 + .and_then(|value| value.current_flag_column.as_deref()) + .unwrap_or(config::DEFAULT_SCD2_CURRENT_FLAG_COLUMN) + .trim() + .to_string(); + let valid_from = scd2 + .and_then(|value| value.valid_from_column.as_deref()) + .unwrap_or(config::DEFAULT_SCD2_VALID_FROM_COLUMN) + .trim() + .to_string(); + let valid_to = scd2 + .and_then(|value| value.valid_to_column.as_deref()) + .unwrap_or(config::DEFAULT_SCD2_VALID_TO_COLUMN) + .trim() + .to_string(); + Scd2SystemColumns { + is_current, + valid_from, + valid_to, + } +} + pub(crate) fn delta_schema_columns(table: &DeltaTable) -> FloeResult> { let columns = table .snapshot() diff --git a/crates/floe-core/tests/integration/delta_run.rs b/crates/floe-core/tests/integration/delta_run.rs index 08fccda..cb00817 100644 --- a/crates/floe-core/tests/integration/delta_run.rs +++ b/crates/floe-core/tests/integration/delta_run.rs @@ -530,6 +530,280 @@ entities: assert!(rows.contains(&("3".to_string(), "us".to_string(), "carol".to_string(), true))); } +#[test] +fn local_delta_merge_scd2_supports_custom_system_column_names() { + let temp_dir = tempfile::TempDir::new().expect("temp dir"); + let root = temp_dir.path(); + let input_dir = root.join("in"); + let accepted_dir = root.join("out/accepted/customer_delta"); + let report_dir = root.join("report"); + + fs::create_dir_all(&input_dir).expect("create input dir"); + write_csv( + &input_dir, + "batch1.csv", + "id;country;name\n1;fr;alice\n2;ca;bob\n", + ); + + let yaml = format!( + r#"version: "0.1" +report: + path: "{report_dir}" +entities: + - name: "customer" + source: + format: "csv" + path: "{input_dir}" + sink: + write_mode: "merge_scd2" + accepted: + format: "delta" + path: "{accepted_dir}" + merge: + scd2: + current_flag_column: "__is_current" + valid_from_column: "__valid_from" + valid_to_column: "__valid_to" + policy: + severity: "warn" + schema: + primary_key: ["id", "country"] + columns: + - name: "id" + type: "string" + - name: "country" + type: "string" + - name: "name" + type: "string" +"#, + report_dir = report_dir.display(), + input_dir = input_dir.display(), + accepted_dir = accepted_dir.display(), + ); + let config_path = write_config(root, &yaml); + + run( + &config_path, + RunOptions { + run_id: Some("it-delta-merge-scd2-custom-cols-init".to_string()), + entities: Vec::new(), + dry_run: false, + }, + ) + .expect("initial merge_scd2 run with custom system columns"); + + fs::remove_file(input_dir.join("batch1.csv")).expect("remove first batch"); + write_csv( + &input_dir, + "batch2.csv", + "id;country;name\n1;fr;alice-v2\n2;ca;bob\n", + ); + + run( + &config_path, + RunOptions { + run_id: Some("it-delta-merge-scd2-custom-cols-upsert".to_string()), + entities: Vec::new(), + dry_run: false, + }, + ) + .expect("merge_scd2 run with custom system columns"); + + let df = read_local_delta_table(&accepted_dir); + assert!(df.column("__is_current").is_ok()); + assert!(df.column("__valid_from").is_ok()); + assert!(df.column("__valid_to").is_ok()); + assert!(df.column("__floe_is_current").is_err()); + assert!(df.column("__floe_valid_from").is_err()); + assert!(df.column("__floe_valid_to").is_err()); +} + +#[test] +fn local_delta_merge_scd2_compare_columns_and_ignore_columns_control_change_detection() { + let temp_dir = tempfile::TempDir::new().expect("temp dir"); + let root = temp_dir.path(); + let input_dir = root.join("in"); + let accepted_dir = root.join("out/accepted/customer_delta"); + let report_dir = root.join("report"); + + fs::create_dir_all(&input_dir).expect("create input dir"); + write_csv( + &input_dir, + "batch1.csv", + "id;country;name;status;ingested_at\n1;fr;alice;active;2024-01-01T00:00:00Z\n", + ); + + let yaml = format!( + r#"version: "0.1" +report: + path: "{report_dir}" +entities: + - name: "customer" + source: + format: "csv" + path: "{input_dir}" + sink: + write_mode: "merge_scd2" + accepted: + format: "delta" + path: "{accepted_dir}" + merge: + ignore_columns: ["ingested_at"] + compare_columns: ["name"] + policy: + severity: "warn" + schema: + primary_key: ["id", "country"] + columns: + - name: "id" + type: "string" + - name: "country" + type: "string" + - name: "name" + type: "string" + - name: "status" + type: "string" + - name: "ingested_at" + type: "string" +"#, + report_dir = report_dir.display(), + input_dir = input_dir.display(), + accepted_dir = accepted_dir.display(), + ); + let config_path = write_config(root, &yaml); + + run( + &config_path, + RunOptions { + run_id: Some("it-delta-merge-scd2-compare-init".to_string()), + entities: Vec::new(), + dry_run: false, + }, + ) + .expect("initial merge_scd2 run"); + + fs::remove_file(input_dir.join("batch1.csv")).expect("remove first batch"); + write_csv( + &input_dir, + "batch2.csv", + "id;country;name;status;ingested_at\n1;fr;alice;inactive;2024-02-01T00:00:00Z\n", + ); + + let outcome = run( + &config_path, + RunOptions { + run_id: Some("it-delta-merge-scd2-compare-second".to_string()), + entities: Vec::new(), + dry_run: false, + }, + ) + .expect("second merge_scd2 run"); + + let report = &outcome.entity_outcomes[0].report; + assert_eq!(report.accepted_output.updated_count, Some(0)); + assert_eq!(report.accepted_output.inserted_count, Some(0)); + + let df = read_local_delta_table(&accepted_dir); + assert_eq!(df.height(), 1); + let is_current = df + .column("__floe_is_current") + .expect("is current") + .as_materialized_series() + .bool() + .expect("is_current bool"); + assert_eq!(is_current.get(0), Some(true)); + let status = df + .column("status") + .expect("status") + .as_materialized_series() + .str() + .expect("status string"); + assert_eq!(status.get(0), Some("active")); +} + +#[test] +fn local_delta_merge_scd2_compare_columns_map_to_normalized_output_names() { + let temp_dir = tempfile::TempDir::new().expect("temp dir"); + let root = temp_dir.path(); + let input_dir = root.join("in"); + let accepted_dir = root.join("out/accepted/customer_delta"); + let report_dir = root.join("report"); + + fs::create_dir_all(&input_dir).expect("create input dir"); + write_csv(&input_dir, "batch1.csv", "id;name;status\n1;Alice;active\n"); + + let yaml = format!( + r#"version: "0.1" +report: + path: "{report_dir}" +entities: + - name: "customer" + source: + format: "csv" + path: "{input_dir}" + sink: + write_mode: "merge_scd2" + accepted: + format: "delta" + path: "{accepted_dir}" + merge: + compare_columns: ["Name"] + policy: + severity: "warn" + schema: + primary_key: ["id"] + normalize_columns: + enabled: true + strategy: "lower" + columns: + - name: "id" + type: "string" + - name: "Name" + type: "string" + - name: "status" + type: "string" +"#, + report_dir = report_dir.display(), + input_dir = input_dir.display(), + accepted_dir = accepted_dir.display(), + ); + let config_path = write_config(root, &yaml); + + run( + &config_path, + RunOptions { + run_id: Some("it-delta-merge-scd2-compare-normalized-init".to_string()), + entities: Vec::new(), + dry_run: false, + }, + ) + .expect("initial merge_scd2 run"); + + fs::remove_file(input_dir.join("batch1.csv")).expect("remove first batch"); + write_csv( + &input_dir, + "batch2.csv", + "id;name;status\n1;Alice Updated;active\n", + ); + + let outcome = run( + &config_path, + RunOptions { + run_id: Some("it-delta-merge-scd2-compare-normalized-second".to_string()), + entities: Vec::new(), + dry_run: false, + }, + ) + .expect("second merge_scd2 run"); + + let report = &outcome.entity_outcomes[0].report; + assert_eq!(report.accepted_output.updated_count, Some(1)); + assert_eq!(report.accepted_output.inserted_count, Some(1)); + + let df = read_local_delta_table(&accepted_dir); + assert!(df.column("name").is_ok()); +} + #[test] fn local_delta_merge_scd2_bootstrap_preserves_configured_nullable_columns() { let temp_dir = tempfile::TempDir::new().expect("temp dir"); diff --git a/crates/floe-core/tests/unit/config/adls_validation.rs b/crates/floe-core/tests/unit/config/adls_validation.rs index 5f3a1e0..152cd4f 100644 --- a/crates/floe-core/tests/unit/config/adls_validation.rs +++ b/crates/floe-core/tests/unit/config/adls_validation.rs @@ -19,6 +19,7 @@ fn base_entity() -> config::EntityConfig { path: "out.parquet".to_string(), storage: None, options: None, + merge: None, iceberg: None, write_mode: config::WriteMode::Overwrite, partition_by: None, diff --git a/crates/floe-core/tests/unit/config/catalogs.rs b/crates/floe-core/tests/unit/config/catalogs.rs index a0eb7fa..aa2691f 100644 --- a/crates/floe-core/tests/unit/config/catalogs.rs +++ b/crates/floe-core/tests/unit/config/catalogs.rs @@ -64,6 +64,7 @@ fn entity() -> config::EntityConfig { path: "customer/orders".to_string(), storage: Some("s3_out".to_string()), options: None, + merge: None, iceberg: Some(config::IcebergSinkTargetConfig { catalog: Some("glue_main".to_string()), namespace: None, diff --git a/crates/floe-core/tests/unit/config/config_validation.rs b/crates/floe-core/tests/unit/config/config_validation.rs index 8b33304..fb70a5a 100644 --- a/crates/floe-core/tests/unit/config/config_validation.rs +++ b/crates/floe-core/tests/unit/config/config_validation.rs @@ -1314,6 +1314,365 @@ entities: ); } +#[test] +fn sink_level_merge_options_are_valid_for_delta_merge_scd2() { + let yaml = r#"version: "0.1" +entities: + - name: "customer" + source: + format: "csv" + path: "/tmp/input" + sink: + write_mode: "merge_scd2" + accepted: + format: "delta" + path: "/tmp/out_delta" + merge: + ignore_columns: ["ingested_at", "load_ts"] + compare_columns: ["name", "status"] + scd2: + current_flag_column: "__is_current" + valid_from_column: "__valid_from" + valid_to_column: "__valid_to" + policy: + severity: "warn" + schema: + primary_key: ["customer_id"] + columns: + - name: "customer_id" + type: "string" + - name: "name" + type: "string" + - name: "status" + type: "string" + - name: "ingested_at" + type: "datetime" + - name: "load_ts" + type: "datetime" +"#; + assert_validation_ok(yaml); +} + +#[test] +fn sink_level_merge_options_require_merge_write_mode() { + let yaml = r#"version: "0.1" +entities: + - name: "customer" + source: + format: "csv" + path: "/tmp/input" + sink: + write_mode: "append" + accepted: + format: "delta" + path: "/tmp/out_delta" + merge: + ignore_columns: ["name"] + policy: + severity: "warn" + schema: + columns: + - name: "customer_id" + type: "string" + - name: "name" + type: "string" +"#; + assert_validation_error( + yaml, + &[ + "entity.name=customer", + "sink.accepted.merge", + "sink.write_mode=merge_scd1 or merge_scd2", + ], + ); +} + +#[test] +fn sink_level_merge_options_require_delta_sink() { + let yaml = r#"version: "0.1" +entities: + - name: "customer" + source: + format: "csv" + path: "/tmp/input" + sink: + write_mode: "merge_scd1" + accepted: + format: "parquet" + path: "/tmp/out" + merge: + ignore_columns: ["name"] + policy: + severity: "warn" + schema: + primary_key: ["customer_id"] + columns: + - name: "customer_id" + type: "string" + - name: "name" + type: "string" +"#; + assert_validation_error( + yaml, + &[ + "entity.name=customer", + "sink.write_mode=merge_scd1", + "sink.accepted.format=delta", + ], + ); +} + +#[test] +fn sink_level_merge_ignore_columns_cannot_reference_primary_key() { + let yaml = r#"version: "0.1" +entities: + - name: "customer" + source: + format: "csv" + path: "/tmp/input" + sink: + write_mode: "merge_scd1" + accepted: + format: "delta" + path: "/tmp/out_delta" + merge: + ignore_columns: ["customer_id"] + policy: + severity: "warn" + schema: + primary_key: ["customer_id"] + columns: + - name: "customer_id" + type: "string" + - name: "name" + type: "string" +"#; + assert_validation_error( + yaml, + &[ + "entity.name=customer", + "sink.accepted.merge.ignore_columns[0]=customer_id", + "schema.primary_key", + ], + ); +} + +#[test] +fn sink_level_merge_compare_columns_cannot_reference_primary_key() { + let yaml = r#"version: "0.1" +entities: + - name: "customer" + source: + format: "csv" + path: "/tmp/input" + sink: + write_mode: "merge_scd2" + accepted: + format: "delta" + path: "/tmp/out_delta" + merge: + compare_columns: ["customer_id", "name"] + policy: + severity: "warn" + schema: + primary_key: ["customer_id"] + columns: + - name: "customer_id" + type: "string" + - name: "name" + type: "string" +"#; + assert_validation_error( + yaml, + &[ + "entity.name=customer", + "sink.accepted.merge.compare_columns[0]=customer_id", + "schema.primary_key", + ], + ); +} + +#[test] +fn sink_level_merge_compare_columns_must_reference_schema_columns() { + let yaml = r#"version: "0.1" +entities: + - name: "customer" + source: + format: "csv" + path: "/tmp/input" + sink: + write_mode: "merge_scd2" + accepted: + format: "delta" + path: "/tmp/out_delta" + merge: + compare_columns: ["missing_col"] + policy: + severity: "warn" + schema: + primary_key: ["customer_id"] + columns: + - name: "customer_id" + type: "string" + - name: "name" + type: "string" +"#; + assert_validation_error( + yaml, + &[ + "entity.name=customer", + "sink.accepted.merge.compare_columns[0]=missing_col", + "unknown schema column", + ], + ); +} + +#[test] +fn sink_level_merge_scd2_system_column_names_must_be_unique_and_not_business_columns() { + let yaml = r#"version: "0.1" +entities: + - name: "customer" + source: + format: "csv" + path: "/tmp/input" + sink: + write_mode: "merge_scd2" + accepted: + format: "delta" + path: "/tmp/out_delta" + merge: + scd2: + current_flag_column: "status" + valid_from_column: "__shared" + valid_to_column: "__shared" + policy: + severity: "warn" + schema: + primary_key: ["customer_id"] + columns: + - name: "customer_id" + type: "string" + - name: "status" + type: "string" +"#; + assert_validation_error( + yaml, + &[ + "entity.name=customer", + "sink.accepted.merge.scd2.current_flag_column=status", + "collides with schema column name", + ], + ); +} + +#[test] +fn sink_level_merge_scd2_system_column_names_must_not_collide_with_normalized_business_columns() { + let yaml = r#"version: "0.1" +entities: + - name: "customer" + source: + format: "csv" + path: "/tmp/input" + sink: + write_mode: "merge_scd2" + accepted: + format: "delta" + path: "/tmp/out_delta" + merge: + scd2: + current_flag_column: "order_id" + policy: + severity: "warn" + schema: + primary_key: ["customer_id"] + normalize_columns: + enabled: true + strategy: "snake_case" + columns: + - name: "customer_id" + type: "string" + - name: "Order ID" + type: "string" +"#; + assert_validation_error( + yaml, + &[ + "entity.name=customer", + "sink.accepted.merge.scd2.current_flag_column=order_id", + "collides with schema column name", + ], + ); +} + +#[test] +fn sink_level_merge_scd2_system_column_names_must_be_unique() { + let yaml = r#"version: "0.1" +entities: + - name: "customer" + source: + format: "csv" + path: "/tmp/input" + sink: + write_mode: "merge_scd2" + accepted: + format: "delta" + path: "/tmp/out_delta" + merge: + scd2: + current_flag_column: "__shared" + valid_from_column: "__shared" + valid_to_column: "__valid_to" + policy: + severity: "warn" + schema: + primary_key: ["customer_id"] + columns: + - name: "customer_id" + type: "string" +"#; + assert_validation_error( + yaml, + &[ + "entity.name=customer", + "sink.accepted.merge.scd2 column names must be unique", + ], + ); +} + +#[test] +fn sink_level_merge_scd2_options_require_merge_scd2_mode() { + let yaml = r#"version: "0.1" +entities: + - name: "customer" + source: + format: "csv" + path: "/tmp/input" + sink: + write_mode: "merge_scd1" + accepted: + format: "delta" + path: "/tmp/out_delta" + merge: + scd2: + current_flag_column: "__is_current" + policy: + severity: "warn" + schema: + primary_key: ["customer_id"] + columns: + - name: "customer_id" + type: "string" +"#; + assert_validation_error( + yaml, + &[ + "entity.name=customer", + "sink.accepted.merge.scd2", + "sink.write_mode=merge_scd2", + ], + ); +} + #[test] fn iceberg_accepted_sink_is_valid_on_local_storage() { let entity = r#" - name: "customer" diff --git a/crates/floe-core/tests/unit/config/parse.rs b/crates/floe-core/tests/unit/config/parse.rs index afeb6ad..904378e 100644 --- a/crates/floe-core/tests/unit/config/parse.rs +++ b/crates/floe-core/tests/unit/config/parse.rs @@ -177,6 +177,76 @@ entities: assert_eq!(entity.sink.resolved_write_mode(), WriteMode::MergeScd2); } +#[test] +fn parse_config_supports_delta_merge_options_block() { + let yaml = r#" +version: "0.1" +entities: + - name: "customer" + source: + format: "csv" + path: "/tmp/input" + sink: + write_mode: "merge_scd2" + accepted: + format: "delta" + path: "/tmp/out" + merge: + ignore_columns: ["ingested_at", "load_ts"] + compare_columns: ["name", "address", "status"] + scd2: + current_flag_column: "__is_current" + valid_from_column: "__valid_from" + valid_to_column: "__valid_to" + rejected: + format: "csv" + path: "/tmp/rejected" + policy: + severity: "reject" + schema: + primary_key: ["customer_id"] + columns: + - name: "customer_id" + type: "string" + - name: "name" + type: "string" + - name: "address" + type: "string" + - name: "status" + type: "string" + - name: "ingested_at" + type: "datetime" + - name: "load_ts" + type: "datetime" +"#; + let path = write_temp_config(yaml); + let config = load_config(&path).expect("parse config"); + let entity = config.entities.first().expect("entity"); + let merge = entity + .sink + .accepted + .merge + .as_ref() + .expect("sink.accepted.merge"); + + assert_eq!( + merge.ignore_columns.as_ref().expect("ignore columns"), + &vec!["ingested_at".to_string(), "load_ts".to_string()] + ); + assert_eq!( + merge.compare_columns.as_ref().expect("compare columns"), + &vec![ + "name".to_string(), + "address".to_string(), + "status".to_string() + ] + ); + let scd2 = merge.scd2.as_ref().expect("scd2 options"); + assert_eq!(scd2.current_flag_column.as_deref(), Some("__is_current")); + assert_eq!(scd2.valid_from_column.as_deref(), Some("__valid_from")); + assert_eq!(scd2.valid_to_column.as_deref(), Some("__valid_to")); +} + #[test] fn parse_config_defaults_column_source_to_name() { let yaml = r#" diff --git a/crates/floe-core/tests/unit/io/storage/inputs.rs b/crates/floe-core/tests/unit/io/storage/inputs.rs index 6ff6344..688d1f5 100644 --- a/crates/floe-core/tests/unit/io/storage/inputs.rs +++ b/crates/floe-core/tests/unit/io/storage/inputs.rs @@ -219,6 +219,7 @@ fn mock_entity(name: &str) -> config::EntityConfig { path: "out".to_string(), storage: None, options: None, + merge: None, iceberg: None, write_mode: config::WriteMode::Overwrite, partition_by: None, diff --git a/crates/floe-core/tests/unit/io/write/delta_write.rs b/crates/floe-core/tests/unit/io/write/delta_write.rs index 2430e34..5af2936 100644 --- a/crates/floe-core/tests/unit/io/write/delta_write.rs +++ b/crates/floe-core/tests/unit/io/write/delta_write.rs @@ -430,6 +430,7 @@ fn build_entity( path: table_path.display().to_string(), storage: None, options: None, + merge: None, iceberg: None, write_mode, partition_by: None, diff --git a/crates/floe-core/tests/unit/io/write/iceberg_write.rs b/crates/floe-core/tests/unit/io/write/iceberg_write.rs index e08401a..9f4123c 100644 --- a/crates/floe-core/tests/unit/io/write/iceberg_write.rs +++ b/crates/floe-core/tests/unit/io/write/iceberg_write.rs @@ -454,6 +454,7 @@ fn build_entity( path: table_path.display().to_string(), storage: None, options: None, + merge: None, iceberg: None, write_mode, partition_by: None, diff --git a/crates/floe-core/tests/unit/io/write/object_store.rs b/crates/floe-core/tests/unit/io/write/object_store.rs index b194c62..d2ecf3c 100644 --- a/crates/floe-core/tests/unit/io/write/object_store.rs +++ b/crates/floe-core/tests/unit/io/write/object_store.rs @@ -55,6 +55,7 @@ fn delta_store_config_builds_s3_url_and_options() -> FloeResult<()> { path: "delta/orders".to_string(), storage: None, options: None, + merge: None, iceberg: None, partition_by: None, partition_spec: None, @@ -113,6 +114,7 @@ fn iceberg_store_config_builds_s3_warehouse_and_region_props() -> FloeResult<()> path: "iceberg/orders".to_string(), storage: None, options: None, + merge: None, iceberg: None, partition_by: None, partition_spec: None, @@ -183,6 +185,7 @@ fn delta_store_config_builds_local_url() -> FloeResult<()> { path: "delta/orders".to_string(), storage: None, options: None, + merge: None, iceberg: None, partition_by: None, partition_spec: None, @@ -242,6 +245,7 @@ fn iceberg_store_config_builds_local_warehouse_without_props() -> FloeResult<()> path: "iceberg/orders".to_string(), storage: None, options: None, + merge: None, iceberg: None, partition_by: None, partition_spec: None, @@ -313,6 +317,7 @@ fn delta_store_config_builds_adls_url_and_options() -> FloeResult<()> { path: "delta/orders".to_string(), storage: None, options: None, + merge: None, iceberg: None, partition_by: None, partition_spec: None, @@ -400,6 +405,7 @@ fn iceberg_store_config_builds_gcs_warehouse_without_props() -> FloeResult<()> { path: "iceberg/orders".to_string(), storage: None, options: None, + merge: None, iceberg: None, partition_by: None, partition_spec: None, @@ -474,6 +480,7 @@ fn iceberg_store_config_rejects_adls_target() -> FloeResult<()> { path: "iceberg/orders".to_string(), storage: None, options: None, + merge: None, iceberg: None, partition_by: None, partition_spec: None, @@ -544,6 +551,7 @@ fn delta_store_config_builds_gcs_url() -> FloeResult<()> { path: "delta/orders".to_string(), storage: None, options: None, + merge: None, iceberg: None, partition_by: None, partition_spec: None, diff --git a/crates/floe-core/tests/unit/io/write/parquet_write.rs b/crates/floe-core/tests/unit/io/write/parquet_write.rs index e449c5f..514fb7a 100644 --- a/crates/floe-core/tests/unit/io/write/parquet_write.rs +++ b/crates/floe-core/tests/unit/io/write/parquet_write.rs @@ -11,6 +11,7 @@ fn sink_target(max_size_per_file: Option) -> config::SinkTarget { row_group_size: None, max_size_per_file, }), + merge: None, iceberg: None, partition_by: None, partition_spec: None, diff --git a/crates/floe-core/tests/unit/io/write/rejected_csv.rs b/crates/floe-core/tests/unit/io/write/rejected_csv.rs index b9d14c7..b9d529f 100644 --- a/crates/floe-core/tests/unit/io/write/rejected_csv.rs +++ b/crates/floe-core/tests/unit/io/write/rejected_csv.rs @@ -25,6 +25,7 @@ fn sample_entity() -> config::EntityConfig { path: "out/accepted".to_string(), storage: None, options: None, + merge: None, iceberg: None, write_mode: config::WriteMode::Overwrite, partition_by: None, @@ -35,6 +36,7 @@ fn sample_entity() -> config::EntityConfig { path: "out/rejected".to_string(), storage: None, options: None, + merge: None, iceberg: None, write_mode: config::WriteMode::Overwrite, partition_by: None, diff --git a/docs/config.md b/docs/config.md index 7fbd1b2..6eb0a20 100644 --- a/docs/config.md +++ b/docs/config.md @@ -188,11 +188,13 @@ is available for templating within that entity. - updates matching keys (SCD1) and inserts new keys - requires `sink.accepted.format: delta` - requires non-empty `schema.primary_key` + - optional merge behavior can be configured in `sink.accepted.merge` - source rows must be unique on `schema.primary_key` (duplicates abort the entity merge) - `merge_scd2`: Delta-only history mode keyed by `schema.primary_key`. - closes changed current rows and inserts new current versions - requires `sink.accepted.format: delta` - requires non-empty `schema.primary_key` + - optional merge behavior can be configured in `sink.accepted.merge` - source rows must be unique on `schema.primary_key` (duplicates abort the entity merge) - Applies to both accepted and rejected outputs. - `accepted` (required) @@ -221,6 +223,24 @@ is available for templating within that entity. - `transform` (optional, default `identity`): `identity`, `year`, `month`, `day`, `hour` - `floe validate` checks column existence and supported transforms. - Runtime wiring is implemented for Iceberg accepted writes (table partition spec + partitioned file layout). + - `merge` (optional, Delta merge modes only) + - Supported only when: + - `sink.accepted.format: delta` + - `sink.write_mode: merge_scd1` or `merge_scd2` + - `ignore_columns` (optional) + - List of schema business columns excluded from merge update/compare behavior. + - Columns must exist in `schema.columns`. + - `schema.primary_key` columns are not allowed. + - `compare_columns` (optional; most relevant for `merge_scd2`) + - Explicit list of schema business columns used for change detection. + - Columns must exist in `schema.columns`. + - `schema.primary_key` columns are not allowed. + - If omitted in `merge_scd2`, Floe compares all non-key business columns minus `ignore_columns`. + - `scd2` (optional, `merge_scd2` only) + - `current_flag_column` (default `__floe_is_current`) + - `valid_from_column` (default `__floe_valid_from`) + - `valid_to_column` (default `__floe_valid_to`) + - Custom names must be non-empty, unique, and must not collide with business schema columns. - `iceberg` (optional, `sink.accepted.format: iceberg`) - Enables Iceberg catalog-specific options. - `catalog` (optional) diff --git a/docs/design/merge_strategy_architecture.md b/docs/design/merge_strategy_architecture.md new file mode 100644 index 0000000..ef5a009 --- /dev/null +++ b/docs/design/merge_strategy_architecture.md @@ -0,0 +1,32 @@ +# Merge Strategy Architecture + +This note summarizes how merge write modes are organized in Floe. + +## Runtime layering + +- `io/write/delta.rs` routes accepted Delta writes by `sink.write_mode`. +- `io/write/strategy/merge/mod.rs` defines merge strategy interfaces. +- `io/write/strategy/merge/scd1.rs` and `scd2.rs` implement Delta-backed merge execution. +- `io/write/strategy/merge/shared.rs` centralizes shared merge utilities + (merge key resolution, schema compatibility checks, SQL predicate helpers, and common metrics mapping). + +## Merge options (`sink.accepted.merge`) + +Merge options are connector configuration only; report/output contracts are unchanged. + +- `ignore_columns` + - optional list of business schema columns ignored by merge update/compare logic +- `compare_columns` + - optional explicit SCD2 change-detection columns + - if unset, SCD2 compares all non-key business columns minus `ignore_columns` +- `scd2` system column overrides + - `current_flag_column` (default `__floe_is_current`) + - `valid_from_column` (default `__floe_valid_from`) + - `valid_to_column` (default `__floe_valid_to`) + +Validation constraints: + +- allowed only when `sink.accepted.format=delta` and `sink.write_mode` is `merge_scd1` or `merge_scd2` +- `ignore_columns` / `compare_columns` must reference schema columns +- merge keys (`schema.primary_key`) cannot appear in `ignore_columns` or `compare_columns` +- SCD2 custom system column names must be non-empty, unique, and non-colliding with business schema columns diff --git a/docs/write_modes.md b/docs/write_modes.md index a156a4e..9ae8a21 100644 --- a/docs/write_modes.md +++ b/docs/write_modes.md @@ -15,6 +15,7 @@ Supported values: - Source rows are validated for merge-key uniqueness during row checks. In `policy.severity=warn`, duplicate merge-key rows are rejected before merge so only unambiguous rows are merged. - On key match: update non-key columns from source. +- `sink.accepted.merge.ignore_columns` can exclude additional business columns from SCD1 update sets. - On missing key: insert source row. - Current v1 behavior validates strict schema compatibility and does not perform schema evolution. - Merge execution uses Delta native merge (`MERGE INTO`) through delta-rs/DataFusion. @@ -30,6 +31,7 @@ Supported values: - `__floe_is_current` - `__floe_valid_from` - `__floe_valid_to` +- `sink.accepted.merge.scd2` can override these system column names. - Changed current rows are closed (`is_current=false`, `valid_to=current_timestamp()`), then new current versions are inserted. - Missing keys are inserted as new current rows. @@ -39,6 +41,9 @@ Supported values: - `inserted_count` = rows inserted as new current versions (changed + brand new keys). - `unchanged_count` = source rows that matched active target rows with no change. - `updated_count` is preserved for backward compatibility and matches `closed_count`. +- Change detection columns are resolved as: + - `sink.accepted.merge.compare_columns` when configured + - otherwise all non-key business columns minus `sink.accepted.merge.ignore_columns` - Current v1 behavior validates strict schema compatibility and does not perform schema evolution. - Single-writer assumption: Delta commit conflicts are returned as clear write errors.