From 8238c01307544572c7ee5205247bad6d134ce481 Mon Sep 17 00:00:00 2001 From: "Agent (OpenClaw)" Date: Tue, 10 Mar 2026 21:46:23 +0100 Subject: [PATCH 1/4] feat: implement requested changes --- crates/floe-core/src/config/parse.rs | 55 ++++++++++- crates/floe-core/src/config/types.rs | 41 ++++++++ crates/floe-core/src/config/validate.rs | 52 +++++++++- .../tests/unit/config/adls_validation.rs | 1 + .../floe-core/tests/unit/config/catalogs.rs | 1 + .../tests/unit/config/config_validation.rs | 99 ++++++++++++++++++- crates/floe-core/tests/unit/config/parse.rs | 46 ++++++++- .../floe-core/tests/unit/io/storage/inputs.rs | 1 + .../tests/unit/io/write/delta_write.rs | 1 + .../tests/unit/io/write/iceberg_write.rs | 1 + .../tests/unit/io/write/object_store.rs | 8 ++ .../tests/unit/io/write/rejected_csv.rs | 1 + docs/config.md | 19 +++- 13 files changed, 317 insertions(+), 9 deletions(-) diff --git a/crates/floe-core/src/config/parse.rs b/crates/floe-core/src/config/parse.rs index 0b7a8e9..2600d2c 100644 --- a/crates/floe-core/src/config/parse.rs +++ b/crates/floe-core/src/config/parse.rs @@ -12,7 +12,8 @@ use crate::config::{ ArchiveTarget, CatalogDefinition, CatalogsConfig, ColumnConfig, DomainConfig, EntityConfig, EntityMetadata, EnvConfig, IcebergPartitionFieldConfig, IcebergSinkTargetConfig, MergeOptionsConfig, MergeScd2OptionsConfig, NormalizeColumnsConfig, PolicyConfig, - ProjectMetadata, ReportConfig, RootConfig, SchemaConfig, SchemaMismatchConfig, SinkConfig, + ProjectMetadata, ReportConfig, RootConfig, SchemaConfig, SchemaEvolutionConfig, + SchemaEvolutionIncompatibleAction, SchemaEvolutionMode, SchemaMismatchConfig, SinkConfig, SinkOptions, SinkTarget, SourceConfig, SourceOptions, StorageDefinition, StoragesConfig, WriteMode, }; @@ -644,6 +645,7 @@ fn parse_schema(value: &Yaml) -> FloeResult { &[ "normalize_columns", "mismatch", + "schema_evolution", "primary_key", "unique_keys", "columns", @@ -657,6 +659,10 @@ fn parse_schema(value: &Yaml) -> FloeResult { 
Some(value) => Some(parse_mismatch(value)?), None => None, }; + let schema_evolution = match hash_get(hash, "schema_evolution") { + Some(value) => Some(parse_schema_evolution(value)?), + None => None, + }; let primary_key = opt_vec_string(hash, "primary_key", "schema")?; let unique_keys = opt_vec_vec_string(hash, "unique_keys", "schema")?; let columns_yaml = get_array(hash, "columns", "schema")?; @@ -671,6 +677,7 @@ fn parse_schema(value: &Yaml) -> FloeResult { Ok(SchemaConfig { normalize_columns, mismatch, + schema_evolution, primary_key, unique_keys, columns, @@ -699,6 +706,52 @@ fn parse_mismatch(value: &Yaml) -> FloeResult { }) } +fn parse_schema_evolution(value: &Yaml) -> FloeResult { + let hash = yaml_hash(value, "schema.schema_evolution")?; + validate_known_keys( + hash, + "schema.schema_evolution", + &["mode", "on_incompatible"], + )?; + let mode = match opt_string(hash, "mode", "schema.schema_evolution")? { + Some(value) => parse_schema_evolution_mode(&value, "schema.schema_evolution.mode")?, + None => SchemaEvolutionMode::Strict, + }; + let on_incompatible = match opt_string(hash, "on_incompatible", "schema.schema_evolution")? 
{ + Some(value) => parse_schema_evolution_incompatible_action( + &value, + "schema.schema_evolution.on_incompatible", + )?, + None => SchemaEvolutionIncompatibleAction::Fail, + }; + Ok(SchemaEvolutionConfig { + mode, + on_incompatible, + }) +} + +fn parse_schema_evolution_mode(value: &str, ctx: &str) -> FloeResult { + match value.trim().to_ascii_lowercase().as_str() { + "strict" => Ok(SchemaEvolutionMode::Strict), + "add_columns" => Ok(SchemaEvolutionMode::AddColumns), + _ => Err(Box::new(ConfigError(format!( + "unsupported value at {ctx}: {value} (allowed: strict, add_columns)" + )))), + } +} + +fn parse_schema_evolution_incompatible_action( + value: &str, + ctx: &str, +) -> FloeResult { + match value.trim().to_ascii_lowercase().as_str() { + "fail" => Ok(SchemaEvolutionIncompatibleAction::Fail), + _ => Err(Box::new(ConfigError(format!( + "unsupported value at {ctx}: {value} (allowed: fail)" + )))), + } +} + fn parse_column(value: &Yaml) -> FloeResult { let hash = yaml_hash(value, "schema.columns")?; validate_known_keys( diff --git a/crates/floe-core/src/config/types.rs b/crates/floe-core/src/config/types.rs index 9f689a3..54ba417 100644 --- a/crates/floe-core/src/config/types.rs +++ b/crates/floe-core/src/config/types.rs @@ -319,12 +319,17 @@ pub struct PolicyConfig { pub struct SchemaConfig { pub normalize_columns: Option, pub mismatch: Option, + pub schema_evolution: Option, pub primary_key: Option>, pub unique_keys: Option>>, pub columns: Vec, } impl SchemaConfig { + pub fn resolved_schema_evolution(&self) -> SchemaEvolutionConfig { + self.schema_evolution.unwrap_or_default() + } + pub fn to_polars_schema(&self) -> FloeResult { let mut schema = Schema::with_capacity(self.columns.len()); for column in &self.columns { @@ -362,6 +367,42 @@ pub struct NormalizeColumnsConfig { pub strategy: Option, } +#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)] +pub struct SchemaEvolutionConfig { + pub mode: SchemaEvolutionMode, + pub on_incompatible: 
SchemaEvolutionIncompatibleAction, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)] +pub enum SchemaEvolutionMode { + #[default] + Strict, + AddColumns, +} + +impl SchemaEvolutionMode { + pub fn as_str(self) -> &'static str { + match self { + SchemaEvolutionMode::Strict => "strict", + SchemaEvolutionMode::AddColumns => "add_columns", + } + } +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)] +pub enum SchemaEvolutionIncompatibleAction { + #[default] + Fail, +} + +impl SchemaEvolutionIncompatibleAction { + pub fn as_str(self) -> &'static str { + match self { + SchemaEvolutionIncompatibleAction::Fail => "fail", + } + } +} + #[derive(Debug)] pub struct SchemaMismatchConfig { pub missing_columns: Option, diff --git a/crates/floe-core/src/config/validate.rs b/crates/floe-core/src/config/validate.rs index 66078de..5a7500d 100644 --- a/crates/floe-core/src/config/validate.rs +++ b/crates/floe-core/src/config/validate.rs @@ -17,9 +17,12 @@ const ALLOWED_EXTRA_POLICIES: &[&str] = &["reject_file", "ignore"]; const ALLOWED_STORAGE_TYPES: &[&str] = &["local", "s3", "adls", "gcs"]; const ALLOWED_CATALOG_TYPES: &[&str] = &["glue"]; const ALLOWED_ICEBERG_PARTITION_TRANSFORMS: &[&str] = &["identity", "year", "month", "day", "hour"]; +const ALLOWED_CONFIG_VERSIONS: &[&str] = &["0.1", "0.2"]; const MAX_JSON_COLUMNS: usize = 1024; pub(crate) fn validate_config(config: &RootConfig) -> FloeResult<()> { + validate_version(config)?; + if config.entities.is_empty() { return Err(Box::new(ConfigError( "entities list is empty (at least one entity is required)".to_string(), @@ -34,7 +37,12 @@ pub(crate) fn validate_config(config: &RootConfig) -> FloeResult<()> { let mut names = HashSet::new(); for entity in &config.entities { - validate_entity(entity, &storage_registry, &catalog_registry)?; + validate_entity( + entity, + &config.version, + &storage_registry, + &catalog_registry, + )?; if !names.insert(entity.name.as_str()) { return Err(Box::new(ConfigError(format!( 
"entity.name={} is duplicated in config", @@ -46,6 +54,17 @@ pub(crate) fn validate_config(config: &RootConfig) -> FloeResult<()> { Ok(()) } +fn validate_version(config: &RootConfig) -> FloeResult<()> { + if !ALLOWED_CONFIG_VERSIONS.contains(&config.version.as_str()) { + return Err(Box::new(ConfigError(format!( + "root.version={} is unsupported (allowed: {})", + config.version, + ALLOWED_CONFIG_VERSIONS.join(", ") + )))); + } + Ok(()) +} + fn validate_report( report: &crate::config::ReportConfig, storages: &StorageRegistry, @@ -57,13 +76,14 @@ fn validate_report( fn validate_entity( entity: &EntityConfig, + config_version: &str, storages: &StorageRegistry, catalogs: &CatalogRegistry, ) -> FloeResult<()> { validate_source(entity, storages)?; validate_policy(entity)?; validate_sink(entity, storages, catalogs)?; - validate_schema(entity)?; + validate_schema(entity, config_version)?; Ok(()) } @@ -658,7 +678,7 @@ fn validate_policy(entity: &EntityConfig) -> FloeResult<()> { Ok(()) } -fn validate_schema(entity: &EntityConfig) -> FloeResult<()> { +fn validate_schema(entity: &EntityConfig, config_version: &str) -> FloeResult<()> { if entity.source.format == "json" && entity.schema.columns.len() > MAX_JSON_COLUMNS { return Err(Box::new(ConfigError(format!( "entity.name={} schema.columns has {} entries which exceeds the JSON selector limit of {}", @@ -752,6 +772,32 @@ fn validate_schema(entity: &EntityConfig) -> FloeResult<()> { validate_schema_primary_key(entity)?; validate_schema_unique_keys(entity)?; + validate_schema_evolution(entity, config_version)?; + + Ok(()) +} + +fn validate_schema_evolution(entity: &EntityConfig, config_version: &str) -> FloeResult<()> { + let Some(schema_evolution) = entity.schema.schema_evolution else { + return Ok(()); + }; + + if config_version != "0.2" { + return Err(Box::new(ConfigError(format!( + "entity.name={} schema.schema_evolution requires root.version=\"0.2\"", + entity.name + )))); + } + + if entity.sink.accepted.format != "delta" + 
&& schema_evolution.mode == crate::config::SchemaEvolutionMode::AddColumns + { + return Err(Box::new(ConfigError(format!( + "entity.name={} schema.schema_evolution.mode={} requires sink.accepted.format=delta", + entity.name, + schema_evolution.mode.as_str() + )))); + } Ok(()) } diff --git a/crates/floe-core/tests/unit/config/adls_validation.rs b/crates/floe-core/tests/unit/config/adls_validation.rs index 152cd4f..e2ac1d8 100644 --- a/crates/floe-core/tests/unit/config/adls_validation.rs +++ b/crates/floe-core/tests/unit/config/adls_validation.rs @@ -34,6 +34,7 @@ fn base_entity() -> config::EntityConfig { schema: config::SchemaConfig { normalize_columns: None, mismatch: None, + schema_evolution: None, primary_key: None, unique_keys: None, columns: Vec::new(), diff --git a/crates/floe-core/tests/unit/config/catalogs.rs b/crates/floe-core/tests/unit/config/catalogs.rs index aa2691f..83203f3 100644 --- a/crates/floe-core/tests/unit/config/catalogs.rs +++ b/crates/floe-core/tests/unit/config/catalogs.rs @@ -84,6 +84,7 @@ fn entity() -> config::EntityConfig { schema: config::SchemaConfig { normalize_columns: None, mismatch: None, + schema_evolution: None, primary_key: None, unique_keys: None, columns: Vec::new(), diff --git a/crates/floe-core/tests/unit/config/config_validation.rs b/crates/floe-core/tests/unit/config/config_validation.rs index fb70a5a..84c8020 100644 --- a/crates/floe-core/tests/unit/config/config_validation.rs +++ b/crates/floe-core/tests/unit/config/config_validation.rs @@ -54,8 +54,12 @@ fn base_entity(name: &str) -> String { } fn base_config(entities_yaml: &str) -> String { + base_config_with_version("0.1", entities_yaml) +} + +fn base_config_with_version(version: &str, entities_yaml: &str) -> String { format!( - r#"version: "0.1" + r#"version: "{version}" report: path: "/tmp/reports" entities: @@ -116,6 +120,19 @@ entities: assert_validation_ok(&yaml); } +#[test] +fn config_version_0_2_is_valid() { + 
assert_validation_ok(&base_config_with_version("0.2", &base_entity("customer"))); +} + +#[test] +fn unsupported_config_version_errors() { + assert_validation_error( + &base_config_with_version("0.3", &base_entity("customer")), + &["root.version=0.3", "unsupported", "0.1", "0.2"], + ); +} + #[test] fn empty_entities_errors() { let yaml = r#"version: "0.1" @@ -288,6 +305,86 @@ fn delta_partition_by_unknown_column_errors() { ); } +#[test] +fn schema_evolution_add_columns_requires_delta_sink() { + let entity = r#" - name: "orders" + source: + format: "csv" + path: "/tmp/input" + sink: + accepted: + format: "parquet" + path: "/tmp/out" + policy: + severity: "warn" + schema: + schema_evolution: + mode: "add_columns" + columns: + - name: "order_id" + type: "string" +"#; + assert_validation_error( + &base_config_with_version("0.2", entity), + &[ + "entity.name=orders", + "schema.schema_evolution.mode=add_columns", + "sink.accepted.format=delta", + ], + ); +} + +#[test] +fn schema_evolution_is_rejected_for_version_0_1() { + let entity = r#" - name: "orders" + source: + format: "csv" + path: "/tmp/input" + sink: + accepted: + format: "delta" + path: "/tmp/out" + policy: + severity: "warn" + schema: + schema_evolution: + mode: "strict" + columns: + - name: "order_id" + type: "string" +"#; + assert_validation_error( + &base_config(entity), + &[ + "entity.name=orders", + "schema.schema_evolution", + "root.version=\"0.2\"", + ], + ); +} + +#[test] +fn schema_evolution_add_columns_is_valid_for_delta_on_version_0_2() { + let entity = r#" - name: "orders" + source: + format: "csv" + path: "/tmp/input" + sink: + accepted: + format: "delta" + path: "/tmp/out" + policy: + severity: "warn" + schema: + schema_evolution: + mode: "add_columns" + columns: + - name: "order_id" + type: "string" +"#; + assert_validation_ok(&base_config_with_version("0.2", entity)); +} + #[test] fn iceberg_partition_spec_validates_supported_transform_subset() { let entity = r#" - name: "orders" diff --git 
a/crates/floe-core/tests/unit/config/parse.rs b/crates/floe-core/tests/unit/config/parse.rs index 904378e..a534535 100644 --- a/crates/floe-core/tests/unit/config/parse.rs +++ b/crates/floe-core/tests/unit/config/parse.rs @@ -3,7 +3,7 @@ use std::path::PathBuf; use std::sync::atomic::{AtomicU64, Ordering}; use std::time::{SystemTime, UNIX_EPOCH}; -use floe_core::config::WriteMode; +use floe_core::config::{SchemaEvolutionIncompatibleAction, SchemaEvolutionMode, WriteMode}; use floe_core::load_config; static TEMP_CONFIG_SEQ: AtomicU64 = AtomicU64::new(0); @@ -65,6 +65,50 @@ entities: WriteMode::Overwrite ); assert_eq!(entity.sink.resolved_write_mode(), WriteMode::Overwrite); + let schema_evolution = entity.schema.resolved_schema_evolution(); + assert_eq!(schema_evolution.mode, SchemaEvolutionMode::Strict); + assert_eq!( + schema_evolution.on_incompatible, + SchemaEvolutionIncompatibleAction::Fail + ); +} + +#[test] +fn parse_config_supports_version_0_2_and_schema_evolution() { + let yaml = r#" +version: "0.2" +entities: + - name: "customer" + source: + format: "csv" + path: "/tmp/input" + sink: + accepted: + format: "delta" + path: "/tmp/out" + policy: + severity: "warn" + schema: + schema_evolution: + mode: "add_columns" + on_incompatible: "fail" + columns: + - name: "customer_id" + type: "string" +"#; + let path = write_temp_config(yaml); + let config = load_config(&path).expect("parse config"); + + assert_eq!(config.version, "0.2"); + let schema_evolution = config.entities[0] + .schema + .schema_evolution + .expect("schema evolution"); + assert_eq!(schema_evolution.mode, SchemaEvolutionMode::AddColumns); + assert_eq!( + schema_evolution.on_incompatible, + SchemaEvolutionIncompatibleAction::Fail + ); } #[test] diff --git a/crates/floe-core/tests/unit/io/storage/inputs.rs b/crates/floe-core/tests/unit/io/storage/inputs.rs index 688d1f5..4a89e9a 100644 --- a/crates/floe-core/tests/unit/io/storage/inputs.rs +++ b/crates/floe-core/tests/unit/io/storage/inputs.rs @@ 
-234,6 +234,7 @@ fn mock_entity(name: &str) -> config::EntityConfig { schema: config::SchemaConfig { normalize_columns: None, mismatch: None, + schema_evolution: None, primary_key: None, unique_keys: None, columns: Vec::new(), diff --git a/crates/floe-core/tests/unit/io/write/delta_write.rs b/crates/floe-core/tests/unit/io/write/delta_write.rs index 5af2936..876d9b8 100644 --- a/crates/floe-core/tests/unit/io/write/delta_write.rs +++ b/crates/floe-core/tests/unit/io/write/delta_write.rs @@ -445,6 +445,7 @@ fn build_entity( schema: config::SchemaConfig { normalize_columns, mismatch: None, + schema_evolution: None, primary_key: None, unique_keys: None, columns, diff --git a/crates/floe-core/tests/unit/io/write/iceberg_write.rs b/crates/floe-core/tests/unit/io/write/iceberg_write.rs index 9f4123c..ecd69a6 100644 --- a/crates/floe-core/tests/unit/io/write/iceberg_write.rs +++ b/crates/floe-core/tests/unit/io/write/iceberg_write.rs @@ -469,6 +469,7 @@ fn build_entity( schema: config::SchemaConfig { normalize_columns, mismatch: None, + schema_evolution: None, primary_key: None, unique_keys: None, columns, diff --git a/crates/floe-core/tests/unit/io/write/object_store.rs b/crates/floe-core/tests/unit/io/write/object_store.rs index d2ecf3c..0ea648d 100644 --- a/crates/floe-core/tests/unit/io/write/object_store.rs +++ b/crates/floe-core/tests/unit/io/write/object_store.rs @@ -70,6 +70,7 @@ fn delta_store_config_builds_s3_url_and_options() -> FloeResult<()> { schema: config::SchemaConfig { normalize_columns: None, mismatch: None, + schema_evolution: None, primary_key: None, unique_keys: None, columns: Vec::new(), @@ -129,6 +130,7 @@ fn iceberg_store_config_builds_s3_warehouse_and_region_props() -> FloeResult<()> schema: config::SchemaConfig { normalize_columns: None, mismatch: None, + schema_evolution: None, primary_key: None, unique_keys: None, columns: Vec::new(), @@ -200,6 +202,7 @@ fn delta_store_config_builds_local_url() -> FloeResult<()> { schema: config::SchemaConfig 
{ normalize_columns: None, mismatch: None, + schema_evolution: None, primary_key: None, unique_keys: None, columns: Vec::new(), @@ -260,6 +263,7 @@ fn iceberg_store_config_builds_local_warehouse_without_props() -> FloeResult<()> schema: config::SchemaConfig { normalize_columns: None, mismatch: None, + schema_evolution: None, primary_key: None, unique_keys: None, columns: Vec::new(), @@ -332,6 +336,7 @@ fn delta_store_config_builds_adls_url_and_options() -> FloeResult<()> { schema: config::SchemaConfig { normalize_columns: None, mismatch: None, + schema_evolution: None, primary_key: None, unique_keys: None, columns: Vec::new(), @@ -420,6 +425,7 @@ fn iceberg_store_config_builds_gcs_warehouse_without_props() -> FloeResult<()> { schema: config::SchemaConfig { normalize_columns: None, mismatch: None, + schema_evolution: None, primary_key: None, unique_keys: None, columns: Vec::new(), @@ -495,6 +501,7 @@ fn iceberg_store_config_rejects_adls_target() -> FloeResult<()> { schema: config::SchemaConfig { normalize_columns: None, mismatch: None, + schema_evolution: None, primary_key: None, unique_keys: None, columns: Vec::new(), @@ -566,6 +573,7 @@ fn delta_store_config_builds_gcs_url() -> FloeResult<()> { schema: config::SchemaConfig { normalize_columns: None, mismatch: None, + schema_evolution: None, primary_key: None, unique_keys: None, columns: Vec::new(), diff --git a/crates/floe-core/tests/unit/io/write/rejected_csv.rs b/crates/floe-core/tests/unit/io/write/rejected_csv.rs index b9d529f..2284d52 100644 --- a/crates/floe-core/tests/unit/io/write/rejected_csv.rs +++ b/crates/floe-core/tests/unit/io/write/rejected_csv.rs @@ -50,6 +50,7 @@ fn sample_entity() -> config::EntityConfig { schema: config::SchemaConfig { normalize_columns: None, mismatch: None, + schema_evolution: None, primary_key: None, unique_keys: None, columns: Vec::new(), diff --git a/docs/config.md b/docs/config.md index 6eb0a20..91c9b44 100644 --- a/docs/config.md +++ b/docs/config.md @@ -6,7 +6,7 @@ all 
supported options. See `example/config.yml` for a full working example. ## Structure at a glance ```yaml -version: "0.1" +version: "0.2" metadata: { ... } report: path: "/abs/or/relative/report/dir" @@ -48,6 +48,9 @@ entities: normalize_columns: enabled: true strategy: "snake_case" + schema_evolution: + mode: "strict" + on_incompatible: "fail" columns: - name: "customer_id" type: "string" @@ -58,7 +61,9 @@ entities: ## Top-level fields - `version` (required) - - String used to validate config compatibility (example: `"0.1"`). + - String used to validate config compatibility. + - Supported values: `"0.1"` and `"0.2"`. + - `schema.schema_evolution` requires `version: "0.2"`. - `metadata` (optional) - Free-form project metadata. Keys supported in schema: `project`, `description`, `owner`, `tags`. @@ -265,7 +270,9 @@ is available for templating within that entity. - `table_version` / `snapshot_id` in reports are sink-format specific (for example Delta table version, Iceberg metadata version + snapshot ID). - Compaction/optimization/maintenance remains external to Floe (for Parquet/Delta/Iceberg datasets). - Examples: Delta optimize/vacuum, Iceberg compaction/maintenance jobs. - - Schema evolution is currently out of scope for accepted Delta/Iceberg sink workflows in Floe. + - `schema.schema_evolution` is scaffolding only in this release. + - Validation is version-aware and `mode: add_columns` is Delta-only. + - Accepted sink runtime behavior does not apply schema evolution yet. - `rejected` (required when `policy.severity: reject`) - `format`: `csv` (v0.1). - `path`: output directory for rejected rows. @@ -290,6 +297,12 @@ is available for templating within that entity. - `strategy`: `snake_case`, `lower`, `camel_case`, `none`. - When enabled, both schema column names and input column names are normalized before checks. If normalization causes a name collision, the run fails. 
+- `schema_evolution` (optional, v0.2) + - Omitted by default, Floe behaves as `mode: strict` and `on_incompatible: fail`. + - `mode`: `strict` or `add_columns`. + - `on_incompatible`: `fail`. + - `mode: add_columns` requires `sink.accepted.format: delta`. + - Current scope is config parsing and validation only; accepted-write runtime behavior remains strict. - `primary_key` (optional) - Array of schema column names. - Primary key columns are always treated as required (`not_null`) at runtime. From a7e4c5786f621df32def4b51bdebac92c6afb997 Mon Sep 17 00:00:00 2001 From: "Agent (OpenClaw)" Date: Tue, 10 Mar 2026 22:17:22 +0100 Subject: [PATCH 2/4] feat: implement requested changes --- crates/floe-core/src/config/validate.rs | 78 ++++++++++++++----- .../tests/unit/config/config_validation.rs | 48 +++++++++++- crates/floe-core/tests/unit/config/parse.rs | 36 +++++++++ docs/config.md | 5 +- 4 files changed, 141 insertions(+), 26 deletions(-) diff --git a/crates/floe-core/src/config/validate.rs b/crates/floe-core/src/config/validate.rs index 5a7500d..59eacfa 100644 --- a/crates/floe-core/src/config/validate.rs +++ b/crates/floe-core/src/config/validate.rs @@ -17,11 +17,51 @@ const ALLOWED_EXTRA_POLICIES: &[&str] = &["reject_file", "ignore"]; const ALLOWED_STORAGE_TYPES: &[&str] = &["local", "s3", "adls", "gcs"]; const ALLOWED_CATALOG_TYPES: &[&str] = &["glue"]; const ALLOWED_ICEBERG_PARTITION_TRANSFORMS: &[&str] = &["identity", "year", "month", "day", "hour"]; -const ALLOWED_CONFIG_VERSIONS: &[&str] = &["0.1", "0.2"]; +const MIN_SUPPORTED_CONFIG_VERSION: ConfigVersion = ConfigVersion::new(0, 1); +const MIN_SCHEMA_EVOLUTION_CONFIG_VERSION: ConfigVersion = ConfigVersion::new(0, 2); const MAX_JSON_COLUMNS: usize = 1024; +#[derive(Clone, Copy, Debug, Eq, Ord, PartialEq, PartialOrd)] +struct ConfigVersion { + major: u64, + minor: u64, +} + +impl ConfigVersion { + const fn new(major: u64, minor: u64) -> Self { + Self { major, minor } + } + + fn parse(raw: &str) -> FloeResult { 
+ let (major, minor) = raw.split_once('.').ok_or_else(|| { + Box::new(ConfigError(format!( + "root.version={raw} is invalid; expected numeric major.minor format like \"0.1\"" + ))) as Box + })?; + + if minor.contains('.') { + return Err(Box::new(ConfigError(format!( + "root.version={raw} is invalid; expected numeric major.minor format like \"0.1\"" + )))); + } + + let major = major.parse::().map_err(|_| { + Box::new(ConfigError(format!( + "root.version={raw} is invalid; expected numeric major.minor format like \"0.1\"" + ))) as Box + })?; + let minor = minor.parse::().map_err(|_| { + Box::new(ConfigError(format!( + "root.version={raw} is invalid; expected numeric major.minor format like \"0.1\"" + ))) as Box + })?; + + Ok(Self { major, minor }) + } +} + pub(crate) fn validate_config(config: &RootConfig) -> FloeResult<()> { - validate_version(config)?; + let config_version = validate_version(config)?; if config.entities.is_empty() { return Err(Box::new(ConfigError( @@ -37,12 +77,7 @@ pub(crate) fn validate_config(config: &RootConfig) -> FloeResult<()> { let mut names = HashSet::new(); for entity in &config.entities { - validate_entity( - entity, - &config.version, - &storage_registry, - &catalog_registry, - )?; + validate_entity(entity, config_version, &storage_registry, &catalog_registry)?; if !names.insert(entity.name.as_str()) { return Err(Box::new(ConfigError(format!( "entity.name={} is duplicated in config", @@ -54,15 +89,15 @@ pub(crate) fn validate_config(config: &RootConfig) -> FloeResult<()> { Ok(()) } -fn validate_version(config: &RootConfig) -> FloeResult<()> { - if !ALLOWED_CONFIG_VERSIONS.contains(&config.version.as_str()) { +fn validate_version(config: &RootConfig) -> FloeResult { + let version = ConfigVersion::parse(&config.version)?; + if version < MIN_SUPPORTED_CONFIG_VERSION { return Err(Box::new(ConfigError(format!( - "root.version={} is unsupported (allowed: {})", - config.version, - ALLOWED_CONFIG_VERSIONS.join(", ") + "root.version={} is 
unsupported (minimum supported version: {}.{})", + config.version, MIN_SUPPORTED_CONFIG_VERSION.major, MIN_SUPPORTED_CONFIG_VERSION.minor )))); } - Ok(()) + Ok(version) } fn validate_report( @@ -76,7 +111,7 @@ fn validate_report( fn validate_entity( entity: &EntityConfig, - config_version: &str, + config_version: ConfigVersion, storages: &StorageRegistry, catalogs: &CatalogRegistry, ) -> FloeResult<()> { @@ -678,7 +713,7 @@ fn validate_policy(entity: &EntityConfig) -> FloeResult<()> { Ok(()) } -fn validate_schema(entity: &EntityConfig, config_version: &str) -> FloeResult<()> { +fn validate_schema(entity: &EntityConfig, config_version: ConfigVersion) -> FloeResult<()> { if entity.source.format == "json" && entity.schema.columns.len() > MAX_JSON_COLUMNS { return Err(Box::new(ConfigError(format!( "entity.name={} schema.columns has {} entries which exceeds the JSON selector limit of {}", @@ -777,15 +812,18 @@ fn validate_schema(entity: &EntityConfig, config_version: &str) -> FloeResult<() Ok(()) } -fn validate_schema_evolution(entity: &EntityConfig, config_version: &str) -> FloeResult<()> { +fn validate_schema_evolution( + entity: &EntityConfig, + config_version: ConfigVersion, +) -> FloeResult<()> { let Some(schema_evolution) = entity.schema.schema_evolution else { return Ok(()); }; - if config_version != "0.2" { + if config_version < MIN_SCHEMA_EVOLUTION_CONFIG_VERSION { return Err(Box::new(ConfigError(format!( - "entity.name={} schema.schema_evolution requires root.version=\"0.2\"", - entity.name + "entity.name={} schema.schema_evolution requires root.version >= \"0.2\"", + entity.name, )))); } diff --git a/crates/floe-core/tests/unit/config/config_validation.rs b/crates/floe-core/tests/unit/config/config_validation.rs index 84c8020..47f7762 100644 --- a/crates/floe-core/tests/unit/config/config_validation.rs +++ b/crates/floe-core/tests/unit/config/config_validation.rs @@ -126,10 +126,28 @@ fn config_version_0_2_is_valid() { } #[test] -fn 
unsupported_config_version_errors() { +fn config_version_0_3_is_valid() { + assert_validation_ok(&base_config_with_version("0.3", &base_entity("customer"))); +} + +#[test] +fn malformed_config_version_errors() { + assert_validation_error( + &base_config_with_version("abc", &base_entity("customer")), + &["root.version=abc", "invalid", "major.minor"], + ); +} + +#[test] +fn config_version_below_minimum_errors() { assert_validation_error( - &base_config_with_version("0.3", &base_entity("customer")), - &["root.version=0.3", "unsupported", "0.1", "0.2"], + &base_config_with_version("0.0", &base_entity("customer")), + &[ + "root.version=0.0", + "unsupported", + "minimum supported version", + "0.1", + ], ); } @@ -358,7 +376,7 @@ fn schema_evolution_is_rejected_for_version_0_1() { &[ "entity.name=orders", "schema.schema_evolution", - "root.version=\"0.2\"", + "root.version >= \"0.2\"", ], ); } @@ -385,6 +403,28 @@ fn schema_evolution_add_columns_is_valid_for_delta_on_version_0_2() { assert_validation_ok(&base_config_with_version("0.2", entity)); } +#[test] +fn schema_evolution_add_columns_is_valid_for_delta_on_higher_version() { + let entity = r#" - name: "orders" + source: + format: "csv" + path: "/tmp/input" + sink: + accepted: + format: "delta" + path: "/tmp/out" + policy: + severity: "warn" + schema: + schema_evolution: + mode: "add_columns" + columns: + - name: "order_id" + type: "string" +"#; + assert_validation_ok(&base_config_with_version("0.3", entity)); +} + #[test] fn iceberg_partition_spec_validates_supported_transform_subset() { let entity = r#" - name: "orders" diff --git a/crates/floe-core/tests/unit/config/parse.rs b/crates/floe-core/tests/unit/config/parse.rs index a534535..8136706 100644 --- a/crates/floe-core/tests/unit/config/parse.rs +++ b/crates/floe-core/tests/unit/config/parse.rs @@ -111,6 +111,42 @@ entities: ); } +#[test] +fn parse_config_supports_higher_version_with_schema_evolution() { + let yaml = r#" +version: "0.3" +entities: + - name: 
"customer" + source: + format: "csv" + path: "/tmp/input" + sink: + accepted: + format: "delta" + path: "/tmp/out" + policy: + severity: "warn" + schema: + schema_evolution: + mode: "add_columns" + columns: + - name: "customer_id" + type: "string" +"#; + let path = write_temp_config(yaml); + let config = load_config(&path).expect("parse config"); + + assert_eq!(config.version, "0.3"); + assert_eq!( + config.entities[0] + .schema + .schema_evolution + .expect("schema evolution") + .mode, + SchemaEvolutionMode::AddColumns + ); +} + #[test] fn parse_config_supports_sink_level_append_write_mode() { let yaml = r#" diff --git a/docs/config.md b/docs/config.md index 91c9b44..fad8845 100644 --- a/docs/config.md +++ b/docs/config.md @@ -62,8 +62,9 @@ entities: - `version` (required) - String used to validate config compatibility. - - Supported values: `"0.1"` and `"0.2"`. - - `schema.schema_evolution` requires `version: "0.2"`. + - Must be a numeric `major.minor` string such as `"0.1"`, `"0.2"`, or `"0.3"`. + - Minimum supported version is `"0.1"`. + - `schema.schema_evolution` requires `version >= "0.2"`. - `metadata` (optional) - Free-form project metadata. Keys supported in schema: `project`, `description`, `owner`, `tags`. 
From 708aae930ace6503cc74e70456cf49145b213080 Mon Sep 17 00:00:00 2001 From: "Agent (OpenClaw)" Date: Wed, 11 Mar 2026 11:06:58 +0100 Subject: [PATCH 3/4] feat: implement requested changes --- crates/floe-cli/src/logging.rs | 14 +- crates/floe-cli/tests/output.rs | 7 + crates/floe-core/src/io/format.rs | 10 + crates/floe-core/src/io/write/delta.rs | 30 +- crates/floe-core/src/io/write/iceberg.rs | 12 + crates/floe-core/src/io/write/parquet.rs | 12 + .../src/io/write/strategy/merge/scd2.rs | 1 + .../src/io/write/strategy/merge/shared.rs | 159 +++++++- crates/floe-core/src/report/entity.rs | 2 + crates/floe-core/src/report/mod.rs | 13 + .../src/run/entity/accepted_write.rs | 35 +- crates/floe-core/src/run/entity/mod.rs | 10 + crates/floe-core/src/run/events.rs | 7 + .../floe-core/tests/integration/delta_run.rs | 386 +++++++++++++++++- .../tests/unit/io/write/delta_write.rs | 156 +++++++ crates/floe-core/tests/unit/run/report.rs | 8 + docs/report.md | 14 + docs/sinks/delta.md | 12 +- docs/write_modes.md | 18 +- 19 files changed, 896 insertions(+), 10 deletions(-) diff --git a/crates/floe-cli/src/logging.rs b/crates/floe-cli/src/logging.rs index 945c47b..6364396 100644 --- a/crates/floe-cli/src/logging.rs +++ b/crates/floe-cli/src/logging.rs @@ -129,7 +129,8 @@ fn level_for_event(event: &RunEvent) -> Level { }, RunEvent::RunStarted { .. } | RunEvent::EntityStarted { .. } - | RunEvent::FileStarted { .. } => Level::Info, + | RunEvent::FileStarted { .. } + | RunEvent::SchemaEvolutionApplied { .. } => Level::Info, RunEvent::FileFinished { status, .. } => match status.as_str() { "success" => Level::Info, "rejected" => Level::Warn, @@ -233,6 +234,17 @@ pub fn format_event_text(event: &RunEvent) -> String { "entity_finished name={} status={} files={} rows={} accepted={} rejected={} warnings={} errors={}", name, status, files, rows, accepted, rejected, warnings, errors ), + RunEvent::SchemaEvolutionApplied { + entity, + mode, + added_columns, + .. 
+ } => format!( + "schema_evolution_applied entity={} mode={} added_columns={}", + entity, + mode, + added_columns.join(",") + ), RunEvent::RunFinished { status, exit_code, diff --git a/crates/floe-cli/tests/output.rs b/crates/floe-cli/tests/output.rs index f6ec51f..08e0861 100644 --- a/crates/floe-cli/tests/output.rs +++ b/crates/floe-cli/tests/output.rs @@ -70,6 +70,13 @@ fn sample_outcome() -> RunOutcome { target_rows_after: None, merge_elapsed_ms: None, }, + schema_evolution: report::SchemaEvolutionSummary { + enabled: false, + mode: "strict".to_string(), + applied: false, + added_columns: Vec::new(), + incompatible_changes_detected: false, + }, unique_constraints: Vec::new(), results: report::ResultsTotals { files_total: 1, diff --git a/crates/floe-core/src/io/format.rs b/crates/floe-core/src/io/format.rs index 74c6cf7..e6b734a 100644 --- a/crates/floe-core/src/io/format.rs +++ b/crates/floe-core/src/io/format.rs @@ -40,6 +40,15 @@ pub struct AcceptedWriteMetrics { pub small_files_count: Option, } +#[derive(Debug, Clone, Default)] +pub struct AcceptedSchemaEvolution { + pub enabled: bool, + pub mode: String, + pub applied: bool, + pub added_columns: Vec, + pub incompatible_changes_detected: bool, +} + #[derive(Debug, Clone, Default)] pub struct AcceptedWritePerfBreakdown { pub conversion_ms: Option, @@ -76,6 +85,7 @@ pub struct AcceptedWriteOutput { pub iceberg_table: Option, pub metrics: AcceptedWriteMetrics, pub merge: Option, + pub schema_evolution: AcceptedSchemaEvolution, pub perf: Option, } diff --git a/crates/floe-core/src/io/write/delta.rs b/crates/floe-core/src/io/write/delta.rs index 8514311..33bba39 100644 --- a/crates/floe-core/src/io/write/delta.rs +++ b/crates/floe-core/src/io/write/delta.rs @@ -33,6 +33,7 @@ struct DeltaWriteResult { part_files: Vec, metrics: AcceptedWriteMetrics, merge: Option, + schema_evolution: crate::io::format::AcceptedSchemaEvolution, perf: AcceptedWritePerfBreakdown, } @@ -68,7 +69,7 @@ fn write_delta_table_with_metrics( 
.enable_all() .build() .map_err(|err| Box::new(RunError(format!("delta runtime init failed: {err}"))))?; - let (version, merge, mut perf_breakdown) = match mode { + let (version, merge, schema_evolution, mut perf_breakdown) = match mode { config::WriteMode::Overwrite | config::WriteMode::Append => { let outcome = shared::write_standard_delta_version_with_perf( &runtime, @@ -83,6 +84,7 @@ fn write_delta_table_with_metrics( ( outcome.version, None, + outcome.schema_evolution, AcceptedWritePerfBreakdown { conversion_ms: Some(outcome.perf.conversion_ms), commit_ms: Some(outcome.perf.commit_ms), @@ -103,6 +105,18 @@ fn write_delta_table_with_metrics( ( version, Some(merge), + crate::io::format::AcceptedSchemaEvolution { + enabled: false, + mode: entity + .schema + .resolved_schema_evolution() + .mode + .as_str() + .to_string(), + applied: false, + added_columns: Vec::new(), + incompatible_changes_detected: false, + }, AcceptedWritePerfBreakdown { conversion_ms: Some(perf.conversion_ms), source_df_build_ms: Some(perf.source_df_build_ms), @@ -125,6 +139,18 @@ fn write_delta_table_with_metrics( ( version, Some(merge), + crate::io::format::AcceptedSchemaEvolution { + enabled: false, + mode: entity + .schema + .resolved_schema_evolution() + .mode + .as_str() + .to_string(), + applied: false, + added_columns: Vec::new(), + incompatible_changes_detected: false, + }, AcceptedWritePerfBreakdown { conversion_ms: Some(perf.conversion_ms), source_df_build_ms: Some(perf.source_df_build_ms), @@ -153,6 +179,7 @@ fn write_delta_table_with_metrics( part_files, metrics, merge, + schema_evolution, perf: perf_breakdown, }) } @@ -184,6 +211,7 @@ impl AcceptedSinkAdapter for DeltaAcceptedAdapter { iceberg_table: None, metrics: result.metrics, merge: result.merge, + schema_evolution: result.schema_evolution, perf: Some(result.perf), }) } diff --git a/crates/floe-core/src/io/write/iceberg.rs b/crates/floe-core/src/io/write/iceberg.rs index 38dab80..2f4a5a8 100644 --- 
a/crates/floe-core/src/io/write/iceberg.rs +++ b/crates/floe-core/src/io/write/iceberg.rs @@ -165,6 +165,18 @@ fn write_iceberg_table_with_remote_context( iceberg_table: result.iceberg_table, metrics: result.metrics, merge: None, + schema_evolution: crate::io::format::AcceptedSchemaEvolution { + enabled: false, + mode: entity + .schema + .resolved_schema_evolution() + .mode + .as_str() + .to_string(), + applied: false, + added_columns: Vec::new(), + incompatible_changes_detected: false, + }, perf: Some(result.perf), }) } diff --git a/crates/floe-core/src/io/write/parquet.rs b/crates/floe-core/src/io/write/parquet.rs index dcb9077..2434ff9 100644 --- a/crates/floe-core/src/io/write/parquet.rs +++ b/crates/floe-core/src/io/write/parquet.rs @@ -165,6 +165,18 @@ impl AcceptedSinkAdapter for ParquetAcceptedAdapter { iceberg_table: None, metrics, merge: None, + schema_evolution: io::format::AcceptedSchemaEvolution { + enabled: false, + mode: entity + .schema + .resolved_schema_evolution() + .mode + .as_str() + .to_string(), + applied: false, + added_columns: Vec::new(), + incompatible_changes_detected: false, + }, perf: None, }) } diff --git a/crates/floe-core/src/io/write/strategy/merge/scd2.rs b/crates/floe-core/src/io/write/strategy/merge/scd2.rs index 3687f25..95314db 100644 --- a/crates/floe-core/src/io/write/strategy/merge/scd2.rs +++ b/crates/floe-core/src/io/write/strategy/merge/scd2.rs @@ -107,6 +107,7 @@ impl MergeBackend for DeltaMergeBackend { SaveMode::Append, ctx.partition_by.clone(), ctx.target_file_size_bytes, + None, )?; perf.commit_ms = commit_start.elapsed().as_millis() as u64; return Ok(( diff --git a/crates/floe-core/src/io/write/strategy/merge/shared.rs b/crates/floe-core/src/io/write/strategy/merge/shared.rs index 76361be..bb5fd93 100644 --- a/crates/floe-core/src/io/write/strategy/merge/shared.rs +++ b/crates/floe-core/src/io/write/strategy/merge/shared.rs @@ -1,6 +1,9 @@ use std::time::Instant; +use arrow::datatypes::FieldRef; use 
arrow::record_batch::RecordBatch; +use deltalake::datafusion::datasource::TableProvider; +use deltalake::operations::write::SchemaMode; use deltalake::protocol::SaveMode; use deltalake::table::builder::DeltaTableBuilder; use deltalake::{datafusion::prelude::SessionContext, DeltaTable}; @@ -18,10 +21,11 @@ pub(crate) struct DeltaStandardWritePerf { pub(crate) commit_ms: u64, } -#[derive(Debug, Clone, Copy)] +#[derive(Debug, Clone)] pub(crate) struct DeltaVersionWriteOutcome { pub(crate) version: i64, pub(crate) perf: DeltaStandardWritePerf, + pub(crate) schema_evolution: crate::io::format::AcceptedSchemaEvolution, } #[derive(Debug, Default, Clone, Copy)] @@ -51,6 +55,8 @@ pub(crate) fn write_standard_delta_version_with_perf( ) -> FloeResult { let conversion_start = Instant::now(); let batch = crate::io::write::delta::record_batch::dataframe_to_record_batch(df, entity)?; + let schema_evolution = + plan_standard_delta_schema_evolution(runtime, &batch, target, resolver, entity, mode)?; let conversion_ms = conversion_start.elapsed().as_millis() as u64; let commit_start = Instant::now(); let version = write_delta_batch_version( @@ -62,10 +68,12 @@ pub(crate) fn write_standard_delta_version_with_perf( save_mode_for_write_mode(mode), partition_by, target_file_size_bytes, + schema_evolution.write_schema_mode, )?; let commit_ms = commit_start.elapsed().as_millis() as u64; Ok(DeltaVersionWriteOutcome { version, + schema_evolution: schema_evolution.summary, perf: DeltaStandardWritePerf { conversion_ms, commit_ms, @@ -73,6 +81,151 @@ pub(crate) fn write_standard_delta_version_with_perf( }) } +struct PlannedSchemaEvolution { + summary: crate::io::format::AcceptedSchemaEvolution, + write_schema_mode: Option, +} + +fn plan_standard_delta_schema_evolution( + runtime: &tokio::runtime::Runtime, + batch: &RecordBatch, + target: &Target, + resolver: &config::StorageResolver, + entity: &config::EntityConfig, + mode: config::WriteMode, +) -> FloeResult { + let schema_evolution = 
entity.schema.resolved_schema_evolution(); + let enabled = schema_evolution.mode == config::SchemaEvolutionMode::AddColumns + && matches!( + mode, + config::WriteMode::Append | config::WriteMode::Overwrite + ); + let mut summary = crate::io::format::AcceptedSchemaEvolution { + enabled, + mode: schema_evolution.mode.as_str().to_string(), + applied: false, + added_columns: Vec::new(), + incompatible_changes_detected: false, + }; + if !enabled { + return Ok(PlannedSchemaEvolution { + summary, + write_schema_mode: None, + }); + } + + let store = object_store::delta_store_config(target, resolver, entity)?; + let table_url = store.table_url; + let storage_options = store.storage_options; + let builder = DeltaTableBuilder::from_url(table_url.clone()) + .map_err(|err| Box::new(RunError(format!("delta builder failed: {err}"))))? + .with_storage_options(storage_options.clone()); + + let maybe_table = runtime + .block_on(async move { + match builder.load().await { + Ok(table) => Ok(Some(table)), + Err(deltalake::DeltaTableError::NotATable(_)) => Ok(None), + Err(err) => Err(err), + } + }) + .map_err(|err| Box::new(RunError(format!("delta schema load failed: {err}"))))?; + + let Some(table) = maybe_table else { + return Ok(PlannedSchemaEvolution { + summary, + write_schema_mode: None, + }); + }; + + let snapshot = table + .snapshot() + .map_err(|err| Box::new(RunError(format!("delta schema load failed: {err}"))))?; + let target_schema = table.schema(); + let target_fields = target_schema.fields(); + let source_schema = batch.schema(); + let source_fields = source_schema.fields(); + let added_columns = additive_columns(target_fields, source_fields); + let incompatible_changes = incompatible_schema_changes(target_fields, source_fields); + + summary.added_columns = added_columns; + summary.incompatible_changes_detected = !incompatible_changes.is_empty(); + + if !incompatible_changes.is_empty() { + return Err(Box::new(RunError(format!( + "entity.name={} delta schema evolution 
failed: incompatible changes detected: {}", + entity.name, + incompatible_changes.join("; ") + )))); + } + + if summary.added_columns.is_empty() { + return Ok(PlannedSchemaEvolution { + summary, + write_schema_mode: None, + }); + } + + let partition_columns = snapshot.metadata().partition_columns(); + if !partition_columns.is_empty() { + return Err(Box::new(RunError(format!( + "entity.name={} delta schema evolution failed: adding columns is unsupported for partitioned delta tables", + entity.name + )))); + } + + summary.applied = true; + Ok(PlannedSchemaEvolution { + summary, + write_schema_mode: Some(SchemaMode::Merge), + }) +} + +fn additive_columns(target_fields: &[FieldRef], source_fields: &[FieldRef]) -> Vec { + let target_names = target_fields + .iter() + .map(|field| field.name().to_string()) + .collect::>(); + source_fields + .iter() + .filter(|field| !target_names.contains(field.name())) + .map(|field| field.name().to_string()) + .collect() +} + +fn incompatible_schema_changes( + target_fields: &[FieldRef], + source_fields: &[FieldRef], +) -> Vec { + let source_by_name = source_fields + .iter() + .map(|field| (field.name(), field)) + .collect::>(); + let mut incompatible = Vec::new(); + for target_field in target_fields { + let Some(source_field) = source_by_name.get(target_field.name()) else { + incompatible.push(format!("missing existing column {}", target_field.name())); + continue; + }; + if target_field.data_type() != source_field.data_type() { + incompatible.push(format!( + "column {} type changed from {:?} to {:?}", + target_field.name(), + target_field.data_type(), + source_field.data_type() + )); + } + if !target_field.is_nullable() && source_field.is_nullable() { + incompatible.push(format!( + "column {} nullability changed from non-nullable to nullable", + target_field.name() + )); + } + } + incompatible +} + +#[allow(clippy::too_many_arguments)] pub(crate) fn write_delta_batch_version( runtime: &tokio::runtime::Runtime, batch: 
deltalake::arrow::record_batch::RecordBatch, @@ -82,6 +235,7 @@ pub(crate) fn write_delta_batch_version( save_mode: SaveMode, partition_by: Option>, target_file_size_bytes: Option, + schema_mode: Option, ) -> FloeResult { let store = object_store::delta_store_config(target, resolver, entity)?; let table_url = store.table_url; @@ -103,6 +257,9 @@ pub(crate) fn write_delta_batch_version( }, }; let mut write = table.write(vec![batch]).with_save_mode(save_mode); + if let Some(schema_mode) = schema_mode { + write = write.with_schema_mode(schema_mode); + } if let Some(partition_by) = partition_by { write = write.with_partition_columns(partition_by); } diff --git a/crates/floe-core/src/report/entity.rs b/crates/floe-core/src/report/entity.rs index 47201f9..acc773f 100644 --- a/crates/floe-core/src/report/entity.rs +++ b/crates/floe-core/src/report/entity.rs @@ -37,6 +37,7 @@ pub(crate) struct RunReportContext<'a> { pub accepted_target_rows_before: Option, pub accepted_target_rows_after: Option, pub accepted_merge_elapsed_ms: Option, + pub accepted_schema_evolution: report::SchemaEvolutionSummary, pub unique_constraints: Vec, } @@ -119,6 +120,7 @@ pub(crate) fn build_run_report(ctx: RunReportContext<'_>) -> report::RunReport { target_rows_after: ctx.accepted_target_rows_after, merge_elapsed_ms: ctx.accepted_merge_elapsed_ms, }, + schema_evolution: ctx.accepted_schema_evolution, unique_constraints: build_unique_constraint_reports(ctx.severity, &ctx.unique_constraints), results: ctx.totals, files: ctx.file_reports, diff --git a/crates/floe-core/src/report/mod.rs b/crates/floe-core/src/report/mod.rs index 46e61d2..d275ef0 100644 --- a/crates/floe-core/src/report/mod.rs +++ b/crates/floe-core/src/report/mod.rs @@ -19,6 +19,7 @@ pub struct RunReport { pub sink: SinkEcho, pub policy: PolicyEcho, pub accepted_output: AcceptedOutputSummary, + pub schema_evolution: SchemaEvolutionSummary, #[serde(default)] #[serde(skip_serializing_if = "Vec::is_empty")] pub unique_constraints: Vec, 
@@ -191,6 +192,18 @@ pub struct AcceptedOutputSummary { pub merge_elapsed_ms: Option, } +#[derive(Debug, Clone, Serialize, Deserialize, Default)] +#[serde(rename_all = "snake_case")] +pub struct SchemaEvolutionSummary { + pub enabled: bool, + pub mode: String, + pub applied: bool, + #[serde(default)] + #[serde(skip_serializing_if = "Vec::is_empty")] + pub added_columns: Vec, + pub incompatible_changes_detected: bool, +} + #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(rename_all = "snake_case")] pub struct UniqueConstraintReport { diff --git a/crates/floe-core/src/run/entity/accepted_write.rs b/crates/floe-core/src/run/entity/accepted_write.rs index 803169b..1081ce8 100644 --- a/crates/floe-core/src/run/entity/accepted_write.rs +++ b/crates/floe-core/src/run/entity/accepted_write.rs @@ -8,6 +8,7 @@ use crate::{config, io, report, FloeResult}; use super::super::output::{write_accepted_output, AcceptedOutputContext}; use super::EntityPhaseTimings; +use crate::run::events::{event_time_ms, RunEvent, RunObserver}; use crate::run::RunContext; use io::storage::Target; @@ -34,10 +35,30 @@ pub(super) struct AcceptedWriteReportState { pub(super) target_rows_before: Option, pub(super) target_rows_after: Option, pub(super) merge_elapsed_ms: Option, + pub(super) schema_evolution: io::format::AcceptedSchemaEvolution, pub(super) write_perf: Option, } impl AcceptedWriteReportState { + pub(super) fn for_entity(entity: &config::EntityConfig) -> Self { + Self { + schema_evolution: io::format::AcceptedSchemaEvolution { + enabled: entity.schema.resolved_schema_evolution().mode + == config::SchemaEvolutionMode::AddColumns, + mode: entity + .schema + .resolved_schema_evolution() + .mode + .as_str() + .to_string(), + applied: false, + added_columns: Vec::new(), + incompatible_changes_detected: false, + }, + ..Self::default() + } + } + pub(super) fn from_write_output(output: io::format::AcceptedWriteOutput) -> Self { Self { parts_written: output.parts_written, @@ -68,6 +89,7 @@ 
impl AcceptedWriteReportState { target_rows_before: output.merge.as_ref().map(|merge| merge.target_rows_before), target_rows_after: output.merge.as_ref().map(|merge| merge.target_rows_after), merge_elapsed_ms: output.merge.as_ref().map(|merge| merge.merge_elapsed_ms), + schema_evolution: output.schema_evolution, write_perf: output.perf, } } @@ -89,6 +111,7 @@ impl AcceptedWriteReportState { pub(super) struct AcceptedWritePhaseContext<'a> { pub(super) run_context: &'a RunContext, + pub(super) observer: &'a dyn RunObserver, pub(super) runtime: &'a mut dyn crate::runtime::Runtime, pub(super) entity: &'a config::EntityConfig, pub(super) accepted_target: &'a Target, @@ -104,6 +127,7 @@ pub(super) fn run_accepted_write_phase( ) -> FloeResult { let AcceptedWritePhaseContext { run_context, + observer, runtime, entity, accepted_target, @@ -114,7 +138,7 @@ pub(super) fn run_accepted_write_phase( accepted_accum, } = context; - let mut accepted_write_report = AcceptedWriteReportState::default(); + let mut accepted_write_report = AcceptedWriteReportState::for_entity(entity); if accepted_accum.is_empty() { return Ok(accepted_write_report); } @@ -151,6 +175,15 @@ pub(super) fn run_accepted_write_phase( } accepted_write_report = AcceptedWriteReportState::from_write_output(accepted_output); + if accepted_write_report.schema_evolution.applied { + observer.on_event(RunEvent::SchemaEvolutionApplied { + run_id: run_context.run_id.clone(), + entity: entity.name.clone(), + mode: accepted_write_report.schema_evolution.mode.clone(), + added_columns: accepted_write_report.schema_evolution.added_columns.clone(), + ts_ms: event_time_ms(), + }); + } Ok(accepted_write_report) } diff --git a/crates/floe-core/src/run/entity/mod.rs b/crates/floe-core/src/run/entity/mod.rs index cdfe1ac..00bd5b5 100644 --- a/crates/floe-core/src/run/entity/mod.rs +++ b/crates/floe-core/src/run/entity/mod.rs @@ -241,6 +241,7 @@ pub(super) fn run_entity( let accepted_target_uri = 
accepted_target.target_uri().to_string(); let accepted_write_report = run_accepted_write_phase(AcceptedWritePhaseContext { run_context: context, + observer, runtime, entity, accepted_target: &accepted_target, @@ -289,6 +290,15 @@ pub(super) fn run_entity( accepted_target_rows_before: accepted_write_report.target_rows_before, accepted_target_rows_after: accepted_write_report.target_rows_after, accepted_merge_elapsed_ms: accepted_write_report.merge_elapsed_ms, + accepted_schema_evolution: report::SchemaEvolutionSummary { + enabled: accepted_write_report.schema_evolution.enabled, + mode: accepted_write_report.schema_evolution.mode, + applied: accepted_write_report.schema_evolution.applied, + added_columns: accepted_write_report.schema_evolution.added_columns, + incompatible_changes_detected: accepted_write_report + .schema_evolution + .incompatible_changes_detected, + }, unique_constraints, }); diff --git a/crates/floe-core/src/run/events.rs b/crates/floe-core/src/run/events.rs index 8233294..8570ae9 100644 --- a/crates/floe-core/src/run/events.rs +++ b/crates/floe-core/src/run/events.rs @@ -43,6 +43,13 @@ pub enum RunEvent { elapsed_ms: u64, ts_ms: u128, }, + SchemaEvolutionApplied { + run_id: String, + entity: String, + mode: String, + added_columns: Vec, + ts_ms: u128, + }, EntityFinished { run_id: String, name: String, diff --git a/crates/floe-core/tests/integration/delta_run.rs b/crates/floe-core/tests/integration/delta_run.rs index cb00817..107c5be 100644 --- a/crates/floe-core/tests/integration/delta_run.rs +++ b/crates/floe-core/tests/integration/delta_run.rs @@ -1,11 +1,82 @@ use std::fs; use std::path::{Path, PathBuf}; +use std::sync::{Arc, Mutex, OnceLock}; use deltalake::table::builder::DeltaTableBuilder; -use floe_core::{run, validate, RunOptions, ValidateOptions}; +use floe_core::{run, set_observer, validate, RunEvent, RunObserver, RunOptions, ValidateOptions}; use polars::prelude::DataFrame; use url::Url; +#[derive(Default)] +struct TestObserver { + 
events: Mutex>, +} + +impl TestObserver { + fn reset(&self) { + self.events.lock().expect("observer lock").clear(); + } + + fn events_for_run(&self, run_id: &str) -> Vec { + self.events + .lock() + .expect("observer lock") + .iter() + .filter(|event| match event { + RunEvent::Log { + run_id: event_run_id, + .. + } + | RunEvent::RunStarted { + run_id: event_run_id, + .. + } + | RunEvent::EntityStarted { + run_id: event_run_id, + .. + } + | RunEvent::FileStarted { + run_id: event_run_id, + .. + } + | RunEvent::FileFinished { + run_id: event_run_id, + .. + } + | RunEvent::SchemaEvolutionApplied { + run_id: event_run_id, + .. + } + | RunEvent::EntityFinished { + run_id: event_run_id, + .. + } + | RunEvent::RunFinished { + run_id: event_run_id, + .. + } => event_run_id == run_id, + }) + .cloned() + .collect() + } +} + +impl RunObserver for TestObserver { + fn on_event(&self, event: RunEvent) { + self.events.lock().expect("observer lock").push(event); + } +} + +fn test_observer() -> &'static TestObserver { + static OBSERVER: OnceLock> = OnceLock::new(); + let observer = OBSERVER.get_or_init(|| { + let observer = Arc::new(TestObserver::default()); + let _ = set_observer(observer.clone()); + observer + }); + observer.as_ref() +} + fn write_csv(dir: &Path, name: &str, contents: &str) -> PathBuf { let path = dir.join(name); fs::write(&path, contents).expect("write csv"); @@ -133,7 +204,7 @@ fn local_delta_run_without_partitioning_preserves_existing_behavior() { fs::create_dir_all(&input_dir).expect("create input dir"); write_csv(&input_dir, "orders.csv", "id;country\n1;us\n2;ca\n"); - let yaml = format!( + let initial_yaml = format!( r#"version: "0.1" report: path: "{report_dir}" @@ -159,7 +230,7 @@ entities: input_dir = input_dir.display(), accepted_dir = accepted_dir.display(), ); - let config_path = write_config(root, &yaml); + let config_path = write_config(root, &initial_yaml); let outcome = run( &config_path, @@ -184,6 +255,315 @@ entities: 
assert!(report.accepted_output.files_written > 0); } +#[test] +fn local_delta_append_add_columns_mode_reports_schema_evolution_and_event() { + let observer = test_observer(); + observer.reset(); + + let temp_dir = tempfile::TempDir::new().expect("temp dir"); + let root = temp_dir.path(); + let input_dir = root.join("in"); + let accepted_dir = root.join("out/accepted/orders_delta"); + let report_dir = root.join("report"); + + fs::create_dir_all(&input_dir).expect("create input dir"); + write_csv(&input_dir, "batch1.csv", "id\n1\n2\n"); + + let initial_yaml = format!( + r#"version: "0.2" +report: + path: "{report_dir}" +entities: + - name: "orders" + source: + format: "csv" + path: "{input_dir}" + sink: + write_mode: "append" + accepted: + format: "delta" + path: "{accepted_dir}" + policy: + severity: "warn" + schema: + schema_evolution: + mode: "add_columns" + columns: + - name: "id" + type: "int64" +"#, + report_dir = report_dir.display(), + input_dir = input_dir.display(), + accepted_dir = accepted_dir.display(), + ); + let config_path = write_config(root, &initial_yaml); + + run( + &config_path, + RunOptions { + run_id: Some("it-delta-schema-evolution-init".to_string()), + entities: Vec::new(), + dry_run: false, + }, + ) + .expect("initial run"); + + fs::remove_file(input_dir.join("batch1.csv")).expect("remove batch1"); + write_csv(&input_dir, "batch2.csv", "id;email\n3;a@example.com\n4;\n"); + let evolved_yaml = format!( + r#"version: "0.2" +report: + path: "{report_dir}" +entities: + - name: "orders" + source: + format: "csv" + path: "{input_dir}" + sink: + write_mode: "append" + accepted: + format: "delta" + path: "{accepted_dir}" + policy: + severity: "warn" + schema: + schema_evolution: + mode: "add_columns" + columns: + - name: "id" + type: "int64" + - name: "email" + type: "string" +"#, + report_dir = report_dir.display(), + input_dir = input_dir.display(), + accepted_dir = accepted_dir.display(), + ); + write_config(root, &evolved_yaml); + + let outcome = 
run( + &config_path, + RunOptions { + run_id: Some("it-delta-schema-evolution-append".to_string()), + entities: Vec::new(), + dry_run: false, + }, + ) + .expect("schema evolution append run"); + + let report = &outcome.entity_outcomes[0].report; + assert!(report.schema_evolution.enabled); + assert_eq!(report.schema_evolution.mode, "add_columns"); + assert!(report.schema_evolution.applied); + assert_eq!( + report.schema_evolution.added_columns, + vec!["email".to_string()] + ); + assert!(!report.schema_evolution.incompatible_changes_detected); + + let events = observer.events_for_run("it-delta-schema-evolution-append"); + let event = events + .iter() + .find_map(|event| match event { + RunEvent::SchemaEvolutionApplied { + entity, + mode, + added_columns, + .. + } => Some((entity, mode, added_columns)), + _ => None, + }) + .expect("schema evolution event"); + assert_eq!(event.0, "orders"); + assert_eq!(event.1, "add_columns"); + assert_eq!(event.2, &vec!["email".to_string()]); + + let url = Url::from_directory_path(&accepted_dir).expect("delta table path url"); + let builder = DeltaTableBuilder::from_url(url) + .expect("delta table builder") + .with_storage_options(std::collections::HashMap::new()); + let runtime = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .expect("delta runtime"); + let table = runtime + .block_on(async move { builder.load().await }) + .expect("load delta table"); + let field_names = table + .snapshot() + .expect("table snapshot") + .schema() + .fields() + .map(|field| field.name.clone()) + .collect::>(); + assert!(field_names.contains(&"email".to_string())); +} + +#[test] +fn local_delta_overwrite_add_columns_mode_reports_noop_when_unchanged() { + let temp_dir = tempfile::TempDir::new().expect("temp dir"); + let root = temp_dir.path(); + let input_dir = root.join("in"); + let accepted_dir = root.join("out/accepted/orders_delta"); + let report_dir = root.join("report"); + + 
fs::create_dir_all(&input_dir).expect("create input dir"); + write_csv( + &input_dir, + "orders.csv", + "id;email\n1;a@example.com\n2;b@example.com\n", + ); + + let initial_yaml = format!( + r#"version: "0.2" +report: + path: "{report_dir}" +entities: + - name: "orders" + source: + format: "csv" + path: "{input_dir}" + sink: + write_mode: "overwrite" + accepted: + format: "delta" + path: "{accepted_dir}" + policy: + severity: "warn" + schema: + schema_evolution: + mode: "add_columns" + columns: + - name: "id" + type: "int64" + - name: "email" + type: "string" +"#, + report_dir = report_dir.display(), + input_dir = input_dir.display(), + accepted_dir = accepted_dir.display(), + ); + let config_path = write_config(root, &initial_yaml); + + run( + &config_path, + RunOptions { + run_id: Some("it-delta-schema-evolution-overwrite-init".to_string()), + entities: Vec::new(), + dry_run: false, + }, + ) + .expect("initial overwrite run"); + + let outcome = run( + &config_path, + RunOptions { + run_id: Some("it-delta-schema-evolution-overwrite-noop".to_string()), + entities: Vec::new(), + dry_run: false, + }, + ) + .expect("overwrite no-op run"); + + let report = &outcome.entity_outcomes[0].report; + assert!(report.schema_evolution.enabled); + assert_eq!(report.schema_evolution.mode, "add_columns"); + assert!(!report.schema_evolution.applied); + assert!(report.schema_evolution.added_columns.is_empty()); + assert!(!report.schema_evolution.incompatible_changes_detected); +} + +#[test] +fn local_delta_strict_mode_rejects_added_columns() { + let temp_dir = tempfile::TempDir::new().expect("temp dir"); + let root = temp_dir.path(); + let input_dir = root.join("in"); + let accepted_dir = root.join("out/accepted/orders_delta"); + let report_dir = root.join("report"); + + fs::create_dir_all(&input_dir).expect("create input dir"); + write_csv(&input_dir, "batch1.csv", "id\n1\n2\n"); + + let initial_yaml = format!( + r#"version: "0.2" +report: + path: "{report_dir}" +entities: + - name: 
"orders" + source: + format: "csv" + path: "{input_dir}" + sink: + write_mode: "append" + accepted: + format: "delta" + path: "{accepted_dir}" + policy: + severity: "warn" + schema: + columns: + - name: "id" + type: "int64" +"#, + report_dir = report_dir.display(), + input_dir = input_dir.display(), + accepted_dir = accepted_dir.display(), + ); + let config_path = write_config(root, &initial_yaml); + + run( + &config_path, + RunOptions { + run_id: Some("it-delta-strict-init".to_string()), + entities: Vec::new(), + dry_run: false, + }, + ) + .expect("initial strict run"); + + fs::remove_file(input_dir.join("batch1.csv")).expect("remove batch1"); + write_csv(&input_dir, "batch2.csv", "id;email\n3;a@example.com\n"); + let evolved_yaml = format!( + r#"version: "0.2" +report: + path: "{report_dir}" +entities: + - name: "orders" + source: + format: "csv" + path: "{input_dir}" + sink: + write_mode: "append" + accepted: + format: "delta" + path: "{accepted_dir}" + policy: + severity: "warn" + schema: + columns: + - name: "id" + type: "int64" + - name: "email" + type: "string" +"#, + report_dir = report_dir.display(), + input_dir = input_dir.display(), + accepted_dir = accepted_dir.display(), + ); + write_config(root, &evolved_yaml); + + let err = run( + &config_path, + RunOptions { + run_id: Some("it-delta-strict-extra-column".to_string()), + entities: Vec::new(), + dry_run: false, + }, + ) + .expect_err("strict mode should reject added columns"); + assert!(err.to_string().contains("delta write failed")); +} + #[test] fn local_delta_merge_scd1_upserts_and_reports_metrics() { let temp_dir = tempfile::TempDir::new().expect("temp dir"); diff --git a/crates/floe-core/tests/unit/io/write/delta_write.rs b/crates/floe-core/tests/unit/io/write/delta_write.rs index 876d9b8..2a6bb85 100644 --- a/crates/floe-core/tests/unit/io/write/delta_write.rs +++ b/crates/floe-core/tests/unit/io/write/delta_write.rs @@ -217,6 +217,155 @@ fn delta_append_rejects_nulls_for_non_nullable_columns() 
-> FloeResult<()> { Ok(()) } +#[test] +fn delta_append_add_columns_mode_evolves_schema() -> FloeResult<()> { + let temp_dir = tempfile::TempDir::new()?; + let table_path = temp_dir.path().join("delta_table"); + let config = empty_root_config(); + let resolver = config::StorageResolver::from_path(&config, temp_dir.path())?; + let target = resolve_local_target(&resolver, &table_path)?; + + let mut entity = build_entity( + &table_path, + config::WriteMode::Append, + vec![column("id", "int64", Some(false))], + None, + ); + let mut initial = df!("id" => &[1i64, 2])?; + write_delta_table( + &mut initial, + &target, + &resolver, + &entity, + config::WriteMode::Append, + )?; + + entity + .schema + .columns + .push(column("email", "string", Some(true))); + entity.schema.schema_evolution = Some(add_columns_schema_evolution()); + let mut evolved = df!( + "id" => &[3i64, 4], + "email" => &[Some("a@example.com"), None] + )?; + write_delta_table( + &mut evolved, + &target, + &resolver, + &entity, + config::WriteMode::Append, + )?; + + let runtime = runtime()?; + let table = open_table(&runtime, &table_path)?; + let field_names = table + .snapshot()? 
+ .schema() + .fields() + .map(|field| field.name.clone()) + .collect::>(); + assert!(field_names.contains(&"email".to_string())); + assert_eq!(row_count(&table)?, 4); + + Ok(()) +} + +#[test] +fn delta_overwrite_add_columns_mode_evolves_schema() -> FloeResult<()> { + let temp_dir = tempfile::TempDir::new()?; + let table_path = temp_dir.path().join("delta_table"); + let config = empty_root_config(); + let resolver = config::StorageResolver::from_path(&config, temp_dir.path())?; + let target = resolve_local_target(&resolver, &table_path)?; + + let mut entity = build_entity( + &table_path, + config::WriteMode::Overwrite, + vec![column("id", "int64", Some(false))], + None, + ); + let mut initial = df!("id" => &[1i64, 2])?; + write_delta_table( + &mut initial, + &target, + &resolver, + &entity, + config::WriteMode::Overwrite, + )?; + + entity + .schema + .columns + .push(column("email", "string", Some(true))); + entity.schema.schema_evolution = Some(add_columns_schema_evolution()); + let mut evolved = df!( + "id" => &[3i64, 4], + "email" => &[Some("a@example.com"), Some("b@example.com")] + )?; + write_delta_table( + &mut evolved, + &target, + &resolver, + &entity, + config::WriteMode::Overwrite, + )?; + + let runtime = runtime()?; + let table = open_table(&runtime, &table_path)?; + let field_names = table + .snapshot()? 
+ .schema() + .fields() + .map(|field| field.name.clone()) + .collect::>(); + assert!(field_names.contains(&"email".to_string())); + assert_eq!(row_count(&table)?, 2); + + Ok(()) +} + +#[test] +fn delta_add_columns_mode_rejects_incompatible_type_changes() -> FloeResult<()> { + let temp_dir = tempfile::TempDir::new()?; + let table_path = temp_dir.path().join("delta_table"); + let config = empty_root_config(); + let resolver = config::StorageResolver::from_path(&config, temp_dir.path())?; + let target = resolve_local_target(&resolver, &table_path)?; + + let mut entity = build_entity( + &table_path, + config::WriteMode::Append, + vec![column("id", "int64", Some(false))], + None, + ); + let mut initial = df!("id" => &[1i64, 2])?; + write_delta_table( + &mut initial, + &target, + &resolver, + &entity, + config::WriteMode::Append, + )?; + + entity.schema.columns = vec![column("id", "string", Some(false))]; + entity.schema.schema_evolution = Some(add_columns_schema_evolution()); + let mut incompatible = df!("id" => &["3", "4"])?; + let err = write_delta_table( + &mut incompatible, + &target, + &resolver, + &entity, + config::WriteMode::Append, + ) + .expect_err("type change should fail"); + assert!(err + .to_string() + .contains("delta schema evolution failed: incompatible changes detected")); + + Ok(()) +} + #[test] fn delta_write_uses_normalized_schema_names() -> FloeResult<()> { let temp_dir = tempfile::TempDir::new()?; @@ -472,6 +621,13 @@ fn normalize_config(strategy: &str) -> config::NormalizeColumnsConfig { } } +fn add_columns_schema_evolution() -> config::SchemaEvolutionConfig { + config::SchemaEvolutionConfig { + mode: config::SchemaEvolutionMode::AddColumns, + on_incompatible: config::SchemaEvolutionIncompatibleAction::Fail, + } +} + fn runtime() -> FloeResult { let runtime = tokio::runtime::Builder::new_current_thread() .enable_all() diff --git a/crates/floe-core/tests/unit/run/report.rs b/crates/floe-core/tests/unit/run/report.rs index c2a320c..c335981 100644 
--- a/crates/floe-core/tests/unit/run/report.rs +++ b/crates/floe-core/tests/unit/run/report.rs @@ -71,6 +71,13 @@ fn sample_report() -> RunReport { target_rows_after: None, merge_elapsed_ms: None, }, + schema_evolution: SchemaEvolutionSummary { + enabled: false, + mode: "strict".to_string(), + applied: false, + added_columns: Vec::new(), + incompatible_changes_detected: false, + }, unique_constraints: Vec::new(), results: ResultsTotals { files_total: 1, @@ -170,6 +177,7 @@ fn report_serializes_expected_keys() { assert!(object.contains_key("sink")); assert!(object.contains_key("policy")); assert!(object.contains_key("accepted_output")); + assert!(object.contains_key("schema_evolution")); assert!(object.contains_key("results")); assert!(object.contains_key("files")); } diff --git a/docs/report.md b/docs/report.md index 1c7a8d4..1ecf0f9 100644 --- a/docs/report.md +++ b/docs/report.md @@ -49,6 +49,7 @@ Golden example files live under `example/report/run_2026-01-19T10-23-45Z/`. - `sink`: Accepted/rejected/archive paths and formats. - `policy`: Severity. - `accepted_output`: Entity-level accepted output summary (path, row counts, write metadata, and file metrics). +- `schema_evolution`: Entity-level schema-evolution summary for the accepted write path. - `results`: Totals for files, rows, accepted/rejected rows, warnings, errors. - `files`: Per-file outcomes and validation summary. 
@@ -89,6 +90,19 @@ Merge-specific metrics (optional, Delta `merge_scd1` / `merge_scd2`): - `target_rows_after` - `merge_elapsed_ms` +### `schema_evolution` (entity report) + +- `enabled`: `true` when schema evolution is active for the entity write path +- `mode`: resolved config mode (`strict` or `add_columns`) +- `applied`: `true` when Floe actually added Delta columns during the write +- `added_columns`: ordered list of Delta columns added in that write +- `incompatible_changes_detected`: `true` when Floe detected non-additive changes and failed the write + +Notes: +- Floe writes an explicit no-op shape (`applied=false`, empty `added_columns`) when + `mode=add_columns` is enabled but the target schema already matches the write schema. +- This block is populated for entity reports even when the write path remains strict. + Notes: - Metrics are populated when the writer can collect them cheaply and reliably. - Delta metrics are derived from committed Delta log `add` actions for the committed version. diff --git a/docs/sinks/delta.md b/docs/sinks/delta.md index 783daac..8777627 100644 --- a/docs/sinks/delta.md +++ b/docs/sinks/delta.md @@ -28,10 +28,20 @@ Semantics: `__floe_is_current`, `__floe_valid_from`, `__floe_valid_to` - Local, S3, ADLS, and GCS storage are supported for delta output. +Schema evolution for standard Delta writes: +- Default behavior remains strict. +- `schema.schema_evolution.mode: add_columns` enables additive-only schema evolution for + accepted Delta `append` and `overwrite` writes. +- Existing columns must remain compatible; incompatible non-additive changes fail before commit. +- When evolution is applied, Floe emits a structured `schema_evolution_applied` event and + populates the entity report `schema_evolution` block with `applied=true` and `added_columns`. +- When evolution is enabled but no Delta schema change is needed, the entity report still + includes `schema_evolution` with `applied=false`. 
+ Merge mode notes (`merge_scd1`, `merge_scd2`): - Requires `schema.primary_key` in config (non-empty, non-nullable columns). - Only supported on Delta accepted sinks. -- Current implementation validates strict schema compatibility; no schema evolution. +- Merge modes still validate strict schema compatibility and do not perform schema evolution. - Source duplicates on merge key are handled during row checks before write. - Single-writer assumption applies; Delta commit conflicts surface as write errors. diff --git a/docs/write_modes.md b/docs/write_modes.md index 9ae8a21..9602fd6 100644 --- a/docs/write_modes.md +++ b/docs/write_modes.md @@ -8,6 +8,20 @@ Supported values: - `merge_scd1` (Delta accepted sink only): SCD1 upsert keyed by `schema.primary_key`. - `merge_scd2` (Delta accepted sink only): SCD2 merge keyed by `schema.primary_key`. +## Standard Delta writes (`append`, `overwrite`) + +For accepted Delta sinks, standard write modes stay strict by default. + +- `schema.schema_evolution.mode: strict` preserves the previous behavior: + source and target Delta schemas must already be compatible. +- `schema.schema_evolution.mode: add_columns` enables additive-only Delta schema evolution + for `append` and `overwrite`. + - Only new columns present in the source schema may be added to the target Delta table. + - Existing columns must remain compatible; drop/rename/type-change flows are rejected. + - When enabled and no new columns are present, Floe records a no-op schema-evolution report block. + - When columns are added, Floe emits a structured `schema_evolution_applied` event and + records the added column names in the entity run report. + ## `merge_scd1` semantics - Accepted sink format must be `delta`. @@ -17,7 +31,7 @@ Supported values: - On key match: update non-key columns from source. - `sink.accepted.merge.ignore_columns` can exclude additional business columns from SCD1 update sets. - On missing key: insert source row. 
-- Current v1 behavior validates strict schema compatibility and does not perform schema evolution. +- Merge modes still validate strict schema compatibility and do not perform schema evolution. - Merge execution uses Delta native merge (`MERGE INTO`) through delta-rs/DataFusion. - Single-writer assumption: Delta commit conflicts are returned as clear write errors. @@ -44,7 +58,7 @@ Supported values: - Change detection columns are resolved as: - `sink.accepted.merge.compare_columns` when configured - otherwise all non-key business columns minus `sink.accepted.merge.ignore_columns` -- Current v1 behavior validates strict schema compatibility and does not perform schema evolution. +- Merge modes still validate strict schema compatibility and do not perform schema evolution. - Single-writer assumption: Delta commit conflicts are returned as clear write errors. ## Rejected output behavior From d86f8746a66fa25f46ce051889276a9c7748e4ff Mon Sep 17 00:00:00 2001 From: "Agent (OpenClaw)" Date: Wed, 11 Mar 2026 12:36:24 +0100 Subject: [PATCH 4/4] feat: implement requested changes --- .../src/run/entity/accepted_write.rs | 20 ++--- .../floe-core/tests/integration/delta_run.rs | 85 +++++++++++++++++++ 2 files changed, 95 insertions(+), 10 deletions(-) diff --git a/crates/floe-core/src/run/entity/accepted_write.rs b/crates/floe-core/src/run/entity/accepted_write.rs index 1081ce8..900672d 100644 --- a/crates/floe-core/src/run/entity/accepted_write.rs +++ b/crates/floe-core/src/run/entity/accepted_write.rs @@ -40,17 +40,17 @@ pub(super) struct AcceptedWriteReportState { } impl AcceptedWriteReportState { - pub(super) fn for_entity(entity: &config::EntityConfig) -> Self { + pub(super) fn for_entity(entity: &config::EntityConfig, write_mode: config::WriteMode) -> Self { + let schema_evolution = entity.schema.resolved_schema_evolution(); Self { schema_evolution: io::format::AcceptedSchemaEvolution { - enabled: entity.schema.resolved_schema_evolution().mode - == 
config::SchemaEvolutionMode::AddColumns, - mode: entity - .schema - .resolved_schema_evolution() - .mode - .as_str() - .to_string(), + enabled: entity.sink.accepted.format == "delta" + && schema_evolution.mode == config::SchemaEvolutionMode::AddColumns + && matches!( + write_mode, + config::WriteMode::Append | config::WriteMode::Overwrite + ), + mode: schema_evolution.mode.as_str().to_string(), applied: false, added_columns: Vec::new(), incompatible_changes_detected: false, @@ -138,7 +138,7 @@ pub(super) fn run_accepted_write_phase( accepted_accum, } = context; - let mut accepted_write_report = AcceptedWriteReportState::for_entity(entity); + let mut accepted_write_report = AcceptedWriteReportState::for_entity(entity, write_mode); if accepted_accum.is_empty() { return Ok(accepted_write_report); } diff --git a/crates/floe-core/tests/integration/delta_run.rs b/crates/floe-core/tests/integration/delta_run.rs index 107c5be..eb24438 100644 --- a/crates/floe-core/tests/integration/delta_run.rs +++ b/crates/floe-core/tests/integration/delta_run.rs @@ -910,6 +910,91 @@ entities: assert!(rows.contains(&("3".to_string(), "us".to_string(), "carol".to_string(), true))); } +#[test] +fn local_delta_merge_add_columns_mode_reports_schema_evolution_disabled_when_all_rows_rejected() { + let temp_dir = tempfile::TempDir::new().expect("temp dir"); + let root = temp_dir.path(); + let input_dir = root.join("in"); + let accepted_dir = root.join("out/accepted/customer_delta"); + let rejected_dir = root.join("out/rejected/customer_csv"); + let report_dir = root.join("report"); + + fs::create_dir_all(&input_dir).expect("create input dir"); + write_csv(&input_dir, "batch1.csv", "id;name\n1;alice\n2;bob\n"); + + let yaml = format!( + r#"version: "0.2" +report: + path: "{report_dir}" +entities: + - name: "customer" + source: + format: "csv" + path: "{input_dir}" + sink: + write_mode: "merge_scd1" + accepted: + format: "delta" + path: "{accepted_dir}" + rejected: + format: "csv" + path: 
"{rejected_dir}" + policy: + severity: "reject" + schema: + primary_key: ["id"] + schema_evolution: + mode: "add_columns" + columns: + - name: "id" + type: "int64" + - name: "name" + type: "string" +"#, + report_dir = report_dir.display(), + input_dir = input_dir.display(), + accepted_dir = accepted_dir.display(), + rejected_dir = rejected_dir.display(), + ); + let config_path = write_config(root, &yaml); + + run( + &config_path, + RunOptions { + run_id: Some("it-delta-merge-schema-evolution-init".to_string()), + entities: Vec::new(), + dry_run: false, + }, + ) + .expect("initial merge run"); + + fs::remove_file(input_dir.join("batch1.csv")).expect("remove batch1"); + write_csv(&input_dir, "batch2.csv", "id;name\nx;carol\ny;dave\n"); + + let outcome = run( + &config_path, + RunOptions { + run_id: Some("it-delta-merge-schema-evolution-empty".to_string()), + entities: Vec::new(), + dry_run: false, + }, + ) + .expect("merge run with zero accepted rows"); + + let report = &outcome.entity_outcomes[0].report; + assert_eq!(report.results.rows_total, 2); + assert_eq!(report.results.accepted_total, 0); + assert_eq!(report.results.rejected_total, 2); + assert!(!report.schema_evolution.enabled); + assert_eq!(report.schema_evolution.mode, "add_columns"); + assert!(!report.schema_evolution.applied); + assert!(report.schema_evolution.added_columns.is_empty()); + assert!(!report.schema_evolution.incompatible_changes_detected); + + let df = read_local_delta_table(&accepted_dir); + assert_eq!(df.height(), 2); +} + #[test] fn local_delta_merge_scd2_supports_custom_system_column_names() { let temp_dir = tempfile::TempDir::new().expect("temp dir");