Skip to content

Commit 6dc17bb

Browse files
author
a.metwalli
committed
fix(core): normalize merge option columns for scd2 runtime
1 parent f0d2984 commit 6dc17bb

6 files changed

Lines changed: 211 additions & 31 deletions

File tree

crates/floe-core/src/config/validate.rs

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -312,6 +312,30 @@ fn validate_merge_options(
312312
.iter()
313313
.map(|column| column.name.as_str())
314314
.collect::<HashSet<_>>();
315+
let normalize_strategy = if entity
316+
.schema
317+
.normalize_columns
318+
.as_ref()
319+
.and_then(|normalize| normalize.enabled)
320+
.unwrap_or(false)
321+
{
322+
entity
323+
.schema
324+
.normalize_columns
325+
.as_ref()
326+
.and_then(|normalize| normalize.strategy.as_deref())
327+
.or(Some("snake_case"))
328+
} else {
329+
None
330+
};
331+
let resolved_output_columns = crate::checks::normalize::resolve_output_columns(
332+
&entity.schema.columns,
333+
normalize_strategy,
334+
);
335+
let resolved_output_column_names = resolved_output_columns
336+
.iter()
337+
.map(|column| column.name.as_str())
338+
.collect::<HashSet<_>>();
315339
let primary_key_columns = entity
316340
.schema
317341
.primary_key
@@ -394,7 +418,7 @@ fn validate_merge_options(
394418
entity.name, field
395419
))));
396420
}
397-
if schema_columns.contains(value) {
421+
if resolved_output_column_names.contains(value) {
398422
return Err(Box::new(ConfigError(format!(
399423
"entity.name={} sink.accepted.merge.scd2.{}={} collides with schema column name",
400424
entity.name, field, value

crates/floe-core/src/io/write/strategy/merge/scd1.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ impl MergeBackend for DeltaMergeBackend {
105105
.map(|name| name.to_string())
106106
.collect::<Vec<_>>();
107107
let merge_key_set = merge_key.iter().map(String::as_str).collect::<HashSet<_>>();
108-
let ignore_columns = shared::resolve_merge_ignore_columns(ctx.entity);
108+
let ignore_columns = shared::resolve_merge_ignore_columns(ctx.entity)?;
109109
let update_columns = source_columns
110110
.iter()
111111
.filter(|name| {

crates/floe-core/src/io/write/strategy/merge/scd2.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,9 +54,9 @@ impl MergeBackend for DeltaMergeBackend {
5454
let mut perf = shared::DeltaMergePerfBreakdown::default();
5555
let merge_key = shared::resolve_merge_key(ctx.entity)?;
5656
let merge_key_set = merge_key.iter().map(String::as_str).collect::<HashSet<_>>();
57-
let ignore_columns = shared::resolve_merge_ignore_columns(ctx.entity);
57+
let ignore_columns = shared::resolve_merge_ignore_columns(ctx.entity)?;
5858
let compare_columns =
59-
shared::resolve_merge_compare_columns(ctx.entity).unwrap_or_else(|| {
59+
shared::resolve_merge_compare_columns(ctx.entity)?.unwrap_or_else(|| {
6060
source_df
6161
.get_column_names()
6262
.iter()

crates/floe-core/src/io/write/strategy/merge/shared.rs

Lines changed: 61 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use deltalake::protocol::SaveMode;
55
use deltalake::table::builder::DeltaTableBuilder;
66
use deltalake::{datafusion::prelude::SessionContext, DeltaTable};
77
use polars::prelude::DataFrame;
8-
use std::collections::HashSet;
8+
use std::collections::{HashMap, HashSet};
99

1010
use crate::errors::RunError;
1111
use crate::io::format::AcceptedMergeMetrics;
@@ -140,46 +140,80 @@ pub(crate) fn resolve_merge_key(entity: &config::EntityConfig) -> FloeResult<Vec
140140
Ok(primary_key.clone())
141141
}
142142

143-
pub(crate) fn resolve_merge_ignore_columns(entity: &config::EntityConfig) -> HashSet<String> {
144-
entity
143+
pub(crate) fn resolve_merge_ignore_columns(
144+
entity: &config::EntityConfig,
145+
) -> FloeResult<HashSet<String>> {
146+
let Some(columns) = entity
145147
.sink
146148
.accepted
147149
.merge
148150
.as_ref()
149151
.and_then(|merge| merge.ignore_columns.as_ref())
150-
.map(|columns| {
151-
columns
152-
.iter()
153-
.map(|column| column.trim())
154-
.filter(|column| !column.is_empty())
155-
.map(ToOwned::to_owned)
156-
.collect::<HashSet<_>>()
152+
else {
153+
return Ok(HashSet::new());
154+
};
155+
156+
let schema_to_output = schema_to_output_column_name_map(entity)?;
157+
let resolved = columns
158+
.iter()
159+
.map(|column| column.trim())
160+
.filter(|column| !column.is_empty())
161+
.map(|column| {
162+
schema_to_output
163+
.get(column)
164+
.cloned()
165+
.unwrap_or_else(|| column.to_string())
157166
})
158-
.unwrap_or_default()
167+
.collect::<HashSet<_>>();
168+
Ok(resolved)
159169
}
160170

161-
pub(crate) fn resolve_merge_compare_columns(entity: &config::EntityConfig) -> Option<Vec<String>> {
162-
entity
171+
pub(crate) fn resolve_merge_compare_columns(
172+
entity: &config::EntityConfig,
173+
) -> FloeResult<Option<Vec<String>>> {
174+
let Some(columns) = entity
163175
.sink
164176
.accepted
165177
.merge
166178
.as_ref()
167179
.and_then(|merge| merge.compare_columns.as_ref())
168-
.map(|columns| {
169-
let mut seen = HashSet::new();
170-
columns
171-
.iter()
172-
.map(|column| column.trim())
173-
.filter(|column| !column.is_empty())
174-
.filter_map(|column| {
175-
if seen.insert(column.to_string()) {
176-
Some(column.to_string())
177-
} else {
178-
None
179-
}
180-
})
181-
.collect::<Vec<_>>()
180+
else {
181+
return Ok(None);
182+
};
183+
184+
let schema_to_output = schema_to_output_column_name_map(entity)?;
185+
let mut seen = HashSet::new();
186+
let resolved = columns
187+
.iter()
188+
.map(|column| column.trim())
189+
.filter(|column| !column.is_empty())
190+
.map(|column| {
191+
schema_to_output
192+
.get(column)
193+
.cloned()
194+
.unwrap_or_else(|| column.to_string())
182195
})
196+
.filter(|column| seen.insert(column.clone()))
197+
.collect::<Vec<_>>();
198+
Ok(Some(resolved))
199+
}
200+
201+
fn schema_to_output_column_name_map(
202+
entity: &config::EntityConfig,
203+
) -> FloeResult<HashMap<String, String>> {
204+
let normalize_strategy = crate::checks::normalize::resolve_normalize_strategy(entity)?;
205+
let output_columns = crate::checks::normalize::resolve_output_columns(
206+
&entity.schema.columns,
207+
normalize_strategy.as_deref(),
208+
);
209+
let mut mapping = HashMap::with_capacity(entity.schema.columns.len());
210+
for (schema_column, output_column) in entity.schema.columns.iter().zip(output_columns.iter()) {
211+
mapping.insert(
212+
schema_column.name.trim().to_string(),
213+
output_column.name.clone(),
214+
);
215+
}
216+
Ok(mapping)
183217
}
184218

185219
pub(crate) fn resolve_scd2_system_columns(entity: &config::EntityConfig) -> Scd2SystemColumns {

crates/floe-core/tests/integration/delta_run.rs

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -721,6 +721,89 @@ entities:
721721
assert_eq!(status.get(0), Some("active"));
722722
}
723723

724+
#[test]
725+
fn local_delta_merge_scd2_compare_columns_map_to_normalized_output_names() {
726+
let temp_dir = tempfile::TempDir::new().expect("temp dir");
727+
let root = temp_dir.path();
728+
let input_dir = root.join("in");
729+
let accepted_dir = root.join("out/accepted/customer_delta");
730+
let report_dir = root.join("report");
731+
732+
fs::create_dir_all(&input_dir).expect("create input dir");
733+
write_csv(&input_dir, "batch1.csv", "id;name;status\n1;Alice;active\n");
734+
735+
let yaml = format!(
736+
r#"version: "0.1"
737+
report:
738+
path: "{report_dir}"
739+
entities:
740+
- name: "customer"
741+
source:
742+
format: "csv"
743+
path: "{input_dir}"
744+
sink:
745+
write_mode: "merge_scd2"
746+
accepted:
747+
format: "delta"
748+
path: "{accepted_dir}"
749+
merge:
750+
compare_columns: ["Name"]
751+
policy:
752+
severity: "warn"
753+
schema:
754+
primary_key: ["id"]
755+
normalize_columns:
756+
enabled: true
757+
strategy: "lower"
758+
columns:
759+
- name: "id"
760+
type: "string"
761+
- name: "Name"
762+
type: "string"
763+
- name: "status"
764+
type: "string"
765+
"#,
766+
report_dir = report_dir.display(),
767+
input_dir = input_dir.display(),
768+
accepted_dir = accepted_dir.display(),
769+
);
770+
let config_path = write_config(root, &yaml);
771+
772+
run(
773+
&config_path,
774+
RunOptions {
775+
run_id: Some("it-delta-merge-scd2-compare-normalized-init".to_string()),
776+
entities: Vec::new(),
777+
dry_run: false,
778+
},
779+
)
780+
.expect("initial merge_scd2 run");
781+
782+
fs::remove_file(input_dir.join("batch1.csv")).expect("remove first batch");
783+
write_csv(
784+
&input_dir,
785+
"batch2.csv",
786+
"id;name;status\n1;Alice Updated;active\n",
787+
);
788+
789+
let outcome = run(
790+
&config_path,
791+
RunOptions {
792+
run_id: Some("it-delta-merge-scd2-compare-normalized-second".to_string()),
793+
entities: Vec::new(),
794+
dry_run: false,
795+
},
796+
)
797+
.expect("second merge_scd2 run");
798+
799+
let report = &outcome.entity_outcomes[0].report;
800+
assert_eq!(report.accepted_output.updated_count, Some(1));
801+
assert_eq!(report.accepted_output.inserted_count, Some(1));
802+
803+
let df = read_local_delta_table(&accepted_dir);
804+
assert!(df.column("name").is_ok());
805+
}
806+
724807
#[test]
725808
fn local_delta_merge_scd2_bootstrap_preserves_configured_nullable_columns() {
726809
let temp_dir = tempfile::TempDir::new().expect("temp dir");

crates/floe-core/tests/unit/config/config_validation.rs

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1565,6 +1565,45 @@ entities:
15651565
);
15661566
}
15671567

1568+
#[test]
1569+
fn sink_level_merge_scd2_system_column_names_must_not_collide_with_normalized_business_columns() {
1570+
let yaml = r#"version: "0.1"
1571+
entities:
1572+
- name: "customer"
1573+
source:
1574+
format: "csv"
1575+
path: "/tmp/input"
1576+
sink:
1577+
write_mode: "merge_scd2"
1578+
accepted:
1579+
format: "delta"
1580+
path: "/tmp/out_delta"
1581+
merge:
1582+
scd2:
1583+
current_flag_column: "order_id"
1584+
policy:
1585+
severity: "warn"
1586+
schema:
1587+
primary_key: ["customer_id"]
1588+
normalize_columns:
1589+
enabled: true
1590+
strategy: "snake_case"
1591+
columns:
1592+
- name: "customer_id"
1593+
type: "string"
1594+
- name: "Order ID"
1595+
type: "string"
1596+
"#;
1597+
assert_validation_error(
1598+
yaml,
1599+
&[
1600+
"entity.name=customer",
1601+
"sink.accepted.merge.scd2.current_flag_column=order_id",
1602+
"collides with schema column name",
1603+
],
1604+
);
1605+
}
1606+
15681607
#[test]
15691608
fn sink_level_merge_scd2_system_column_names_must_be_unique() {
15701609
let yaml = r#"version: "0.1"

0 commit comments

Comments
 (0)