Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 52 additions & 4 deletions crates/floe-core/src/config/parse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,10 @@ use crate::config::yaml_decode::{
use crate::config::{
ArchiveTarget, CatalogDefinition, CatalogsConfig, ColumnConfig, DomainConfig, EntityConfig,
EntityMetadata, EnvConfig, IcebergPartitionFieldConfig, IcebergSinkTargetConfig,
NormalizeColumnsConfig, PolicyConfig, ProjectMetadata, ReportConfig, RootConfig, SchemaConfig,
SchemaMismatchConfig, SinkConfig, SinkOptions, SinkTarget, SourceConfig, SourceOptions,
StorageDefinition, StoragesConfig, WriteMode,
MergeOptionsConfig, MergeScd2OptionsConfig, NormalizeColumnsConfig, PolicyConfig,
ProjectMetadata, ReportConfig, RootConfig, SchemaConfig, SchemaMismatchConfig, SinkConfig,
SinkOptions, SinkTarget, SourceConfig, SourceOptions, StorageDefinition, StoragesConfig,
WriteMode,
};
use crate::{ConfigError, FloeResult};

Expand Down Expand Up @@ -349,7 +350,13 @@ fn parse_sink_target(
let hash = yaml_hash(value, ctx)?;
let mut allowed = vec!["format", "path", "storage", "filesystem"];
if allow_options {
allowed.extend(["options", "iceberg", "partition_by", "partition_spec"]);
allowed.extend([
"options",
"merge",
"iceberg",
"partition_by",
"partition_spec",
]);
}
validate_known_keys(hash, ctx, &allowed)?;
let storage = opt_string(hash, "storage", ctx)?;
Expand All @@ -367,6 +374,14 @@ fn parse_sink_target(
} else {
None
};
let merge = if allow_options {
match hash_get(hash, "merge") {
Some(value) => Some(parse_merge_options(value, &format!("{ctx}.merge"))?),
None => None,
}
} else {
None
};
let partition_by = if allow_options {
opt_vec_string(hash, "partition_by", ctx)?
} else {
Expand Down Expand Up @@ -399,6 +414,7 @@ fn parse_sink_target(
path: get_string(hash, "path", ctx)?,
storage: storage.or(filesystem),
options,
merge,
iceberg,
partition_by,
partition_spec,
Expand Down Expand Up @@ -432,6 +448,38 @@ fn parse_sink_options(value: &Yaml, ctx: &str) -> FloeResult<SinkOptions> {
})
}

/// Parses a sink target's `merge` section into a [`MergeOptionsConfig`].
///
/// Accepted keys are `ignore_columns`, `compare_columns`, and the optional
/// nested `scd2` block; any other key is rejected by `validate_known_keys`.
/// `ctx` is the dotted config path used to build error contexts.
fn parse_merge_options(value: &Yaml, ctx: &str) -> FloeResult<MergeOptionsConfig> {
    let hash = yaml_hash(value, ctx)?;
    validate_known_keys(hash, ctx, &["ignore_columns", "compare_columns", "scd2"])?;
    // Parse the nested scd2 block first so its errors surface before the
    // column-list fields, matching the key-validation order above.
    let scd2 = hash_get(hash, "scd2")
        .map(|node| parse_merge_scd2_options(node, &format!("{ctx}.scd2")))
        .transpose()?;
    Ok(MergeOptionsConfig {
        ignore_columns: opt_vec_string(hash, "ignore_columns", ctx)?,
        compare_columns: opt_vec_string(hash, "compare_columns", ctx)?,
        scd2,
    })
}

/// Parses the `merge.scd2` block: optional per-entity overrides for the
/// SCD2 bookkeeping column names. Unknown keys are rejected; absent keys
/// yield `None` so defaults can be applied later.
fn parse_merge_scd2_options(value: &Yaml, ctx: &str) -> FloeResult<MergeScd2OptionsConfig> {
    const KNOWN_KEYS: [&str; 3] = [
        "current_flag_column",
        "valid_from_column",
        "valid_to_column",
    ];
    let hash = yaml_hash(value, ctx)?;
    validate_known_keys(hash, ctx, &KNOWN_KEYS)?;
    Ok(MergeScd2OptionsConfig {
        current_flag_column: opt_string(hash, "current_flag_column", ctx)?,
        valid_from_column: opt_string(hash, "valid_from_column", ctx)?,
        valid_to_column: opt_string(hash, "valid_to_column", ctx)?,
    })
}

fn parse_iceberg_partition_spec(
value: &Yaml,
ctx: &str,
Expand Down
19 changes: 19 additions & 0 deletions crates/floe-core/src/config/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -218,12 +218,31 @@ pub struct SinkTarget {
pub path: String,
pub storage: Option<String>,
pub options: Option<SinkOptions>,
pub merge: Option<MergeOptionsConfig>,
pub iceberg: Option<IcebergSinkTargetConfig>,
pub partition_by: Option<Vec<String>>,
pub partition_spec: Option<Vec<IcebergPartitionFieldConfig>>,
pub write_mode: WriteMode,
}

/// Default column name used when `merge.scd2.current_flag_column` is not configured.
pub const DEFAULT_SCD2_CURRENT_FLAG_COLUMN: &str = "__floe_is_current";
/// Default column name used when `merge.scd2.valid_from_column` is not configured.
pub const DEFAULT_SCD2_VALID_FROM_COLUMN: &str = "__floe_valid_from";
/// Default column name used when `merge.scd2.valid_to_column` is not configured.
pub const DEFAULT_SCD2_VALID_TO_COLUMN: &str = "__floe_valid_to";

/// Parsed `sink.accepted.merge` configuration block.
#[derive(Debug, Clone)]
pub struct MergeOptionsConfig {
    /// Schema columns to exclude from the merge update set; validation
    /// rejects entries that name primary-key columns or unknown columns.
    pub ignore_columns: Option<Vec<String>>,
    /// Schema columns to compare when deciding whether a row changed;
    /// validated the same way as `ignore_columns`.
    pub compare_columns: Option<Vec<String>>,
    /// SCD2 column-name overrides; only valid with `write_mode=merge_scd2`.
    pub scd2: Option<MergeScd2OptionsConfig>,
}

/// Optional overrides for the SCD2 bookkeeping column names.
///
/// A `None` field falls back to the corresponding `DEFAULT_SCD2_*` constant.
#[derive(Debug, Clone)]
pub struct MergeScd2OptionsConfig {
    /// Overrides `DEFAULT_SCD2_CURRENT_FLAG_COLUMN` when set.
    pub current_flag_column: Option<String>,
    /// Overrides `DEFAULT_SCD2_VALID_FROM_COLUMN` when set.
    pub valid_from_column: Option<String>,
    /// Overrides `DEFAULT_SCD2_VALID_TO_COLUMN` when set.
    pub valid_to_column: Option<String>,
}

#[derive(Debug)]
pub struct SinkOptions {
pub compression: Option<String>,
Expand Down
220 changes: 204 additions & 16 deletions crates/floe-core/src/config/validate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -252,34 +252,222 @@ fn validate_sink(

fn validate_sink_write_mode(entity: &EntityConfig) -> FloeResult<()> {
let write_mode = entity.sink.resolved_write_mode();
if !matches!(
let is_merge_mode = matches!(
write_mode,
crate::config::WriteMode::MergeScd1 | crate::config::WriteMode::MergeScd2
) {
return Ok(());
);
if is_merge_mode {
let mode_name = write_mode.as_str();
if entity.sink.accepted.format != "delta" {
return Err(Box::new(ConfigError(format!(
"entity.name={} sink.write_mode={} requires sink.accepted.format=delta",
entity.name, mode_name
))));
}

let primary_key = entity.schema.primary_key.as_ref().ok_or_else(|| {
Box::new(ConfigError(format!(
"entity.name={} sink.write_mode={} requires schema.primary_key",
entity.name, mode_name
))) as Box<dyn std::error::Error + Send + Sync>
})?;
if primary_key.is_empty() {
return Err(Box::new(ConfigError(format!(
"entity.name={} sink.write_mode={} requires non-empty schema.primary_key",
entity.name, mode_name
))));
}
}
let mode_name = write_mode.as_str();

validate_merge_options(entity, write_mode)
}

fn validate_merge_options(
entity: &EntityConfig,
write_mode: crate::config::WriteMode,
) -> FloeResult<()> {
let Some(merge) = entity.sink.accepted.merge.as_ref() else {
return Ok(());
};

if entity.sink.accepted.format != "delta" {
return Err(Box::new(ConfigError(format!(
"entity.name={} sink.write_mode={} requires sink.accepted.format=delta",
entity.name, mode_name
"entity.name={} sink.accepted.merge is only supported when sink.accepted.format=delta",
entity.name
))));
}

let primary_key = entity.schema.primary_key.as_ref().ok_or_else(|| {
Box::new(ConfigError(format!(
"entity.name={} sink.write_mode={} requires schema.primary_key",
entity.name, mode_name
))) as Box<dyn std::error::Error + Send + Sync>
})?;
if primary_key.is_empty() {
if !matches!(
write_mode,
crate::config::WriteMode::MergeScd1 | crate::config::WriteMode::MergeScd2
) {
return Err(Box::new(ConfigError(format!(
"entity.name={} sink.write_mode={} requires non-empty schema.primary_key",
entity.name, mode_name
"entity.name={} sink.accepted.merge is only supported with sink.write_mode=merge_scd1 or merge_scd2",
entity.name
))));
}

let schema_columns = entity
.schema
.columns
.iter()
.map(|column| column.name.as_str())
.collect::<HashSet<_>>();
let normalize_strategy = if entity
.schema
.normalize_columns
.as_ref()
.and_then(|normalize| normalize.enabled)
.unwrap_or(false)
{
entity
.schema
.normalize_columns
.as_ref()
.and_then(|normalize| normalize.strategy.as_deref())
.or(Some("snake_case"))
} else {
None
};
let resolved_output_columns = crate::checks::normalize::resolve_output_columns(
&entity.schema.columns,
normalize_strategy,
);
let resolved_output_column_names = resolved_output_columns
.iter()
.map(|column| column.name.as_str())
.collect::<HashSet<_>>();
let primary_key_columns = entity
.schema
.primary_key
.as_ref()
.map(|columns| {
columns
.iter()
.map(|column| column.trim().to_string())
.collect::<HashSet<_>>()
})
.unwrap_or_default();

if let Some(ignore_columns) = merge.ignore_columns.as_ref() {
validate_merge_column_list(
entity,
"sink.accepted.merge.ignore_columns",
ignore_columns,
&schema_columns,
)?;
for (index, column_name) in ignore_columns.iter().enumerate() {
let value = column_name.trim();
if primary_key_columns.contains(value) {
return Err(Box::new(ConfigError(format!(
"entity.name={} sink.accepted.merge.ignore_columns[{}]={} cannot reference schema.primary_key column",
entity.name, index, value
))));
}
}
}

if let Some(compare_columns) = merge.compare_columns.as_ref() {
validate_merge_column_list(
entity,
"sink.accepted.merge.compare_columns",
compare_columns,
&schema_columns,
)?;
for (index, column_name) in compare_columns.iter().enumerate() {
let value = column_name.trim();
if primary_key_columns.contains(value) {
return Err(Box::new(ConfigError(format!(
"entity.name={} sink.accepted.merge.compare_columns[{}]={} cannot reference schema.primary_key column",
entity.name, index, value
))));
}
}
}

if let Some(scd2) = merge.scd2.as_ref() {
if write_mode != crate::config::WriteMode::MergeScd2 {
return Err(Box::new(ConfigError(format!(
"entity.name={} sink.accepted.merge.scd2 is only supported with sink.write_mode=merge_scd2",
entity.name
))));
}
let current_flag_column = scd2
.current_flag_column
.as_deref()
.unwrap_or(crate::config::DEFAULT_SCD2_CURRENT_FLAG_COLUMN)
.trim();
let valid_from_column = scd2
.valid_from_column
.as_deref()
.unwrap_or(crate::config::DEFAULT_SCD2_VALID_FROM_COLUMN)
.trim();
let valid_to_column = scd2
.valid_to_column
.as_deref()
.unwrap_or(crate::config::DEFAULT_SCD2_VALID_TO_COLUMN)
.trim();
let resolved_columns = [
("current_flag_column", current_flag_column),
("valid_from_column", valid_from_column),
("valid_to_column", valid_to_column),
];
for (field, value) in resolved_columns {
if value.is_empty() {
return Err(Box::new(ConfigError(format!(
"entity.name={} sink.accepted.merge.scd2.{} must not be empty",
entity.name, field
))));
}
if resolved_output_column_names.contains(value) {
return Err(Box::new(ConfigError(format!(
"entity.name={} sink.accepted.merge.scd2.{}={} collides with schema column name",
entity.name, field, value
))));
}
}
let unique_columns = resolved_columns
.iter()
.map(|(_, value)| *value)
.collect::<HashSet<_>>();
if unique_columns.len() != resolved_columns.len() {
return Err(Box::new(ConfigError(format!(
"entity.name={} sink.accepted.merge.scd2 column names must be unique",
entity.name
))));
}
}

Ok(())
}

/// Validates one merge column list (`ignore_columns` or `compare_columns`).
///
/// Each entry, after trimming, must be non-empty, must name a column present
/// in `schema_columns`, and must not repeat an earlier entry. `field` is the
/// dotted config path used in error messages; errors carry the entity name
/// and the offending index/value.
fn validate_merge_column_list(
    entity: &EntityConfig,
    field: &str,
    values: &[String],
    schema_columns: &HashSet<&str>,
) -> FloeResult<()> {
    // Track duplicates with borrowed &str slices — avoids allocating an
    // owned String per entry just for membership testing.
    let mut seen: HashSet<&str> = HashSet::with_capacity(values.len());
    for (index, value) in values.iter().enumerate() {
        let trimmed = value.trim();
        if trimmed.is_empty() {
            return Err(Box::new(ConfigError(format!(
                "entity.name={} {}[{}] must not be empty",
                entity.name, field, index
            ))));
        }
        if !schema_columns.contains(trimmed) {
            return Err(Box::new(ConfigError(format!(
                "entity.name={} {}[{}]={} references unknown schema column",
                entity.name, field, index, trimmed
            ))));
        }
        if !seen.insert(trimmed) {
            return Err(Box::new(ConfigError(format!(
                "entity.name={} {} has duplicate column {}",
                entity.name, field, trimmed
            ))));
        }
    }
    Ok(())
}

Expand Down
5 changes: 4 additions & 1 deletion crates/floe-core/src/io/write/strategy/merge/scd1.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,9 +105,12 @@ impl MergeBackend for DeltaMergeBackend {
.map(|name| name.to_string())
.collect::<Vec<_>>();
let merge_key_set = merge_key.iter().map(String::as_str).collect::<HashSet<_>>();
let ignore_columns = shared::resolve_merge_ignore_columns(ctx.entity)?;
let update_columns = source_columns
.iter()
.filter(|name| !merge_key_set.contains(name.as_str()))
.filter(|name| {
!merge_key_set.contains(name.as_str()) && !ignore_columns.contains(name.as_str())
})
.cloned()
.collect::<Vec<_>>();
let predicate = shared::merge_predicate_sql(&merge_key);
Expand Down
Loading