diff --git a/crates/floe-core/src/config/validate.rs b/crates/floe-core/src/config/validate.rs index 59eacfa..c2e7fc8 100644 --- a/crates/floe-core/src/config/validate.rs +++ b/crates/floe-core/src/config/validate.rs @@ -831,7 +831,7 @@ fn validate_schema_evolution( && schema_evolution.mode == crate::config::SchemaEvolutionMode::AddColumns { return Err(Box::new(ConfigError(format!( - "entity.name={} schema.schema_evolution.mode={} requires sink.accepted.format=delta", + "entity.name={} schema.schema_evolution.mode={} requires sink.accepted.format=delta (Delta-only additive schema evolution in this phase)", entity.name, schema_evolution.mode.as_str() )))); diff --git a/crates/floe-core/src/io/write/strategy/merge/shared.rs b/crates/floe-core/src/io/write/strategy/merge/shared.rs index 14d964d..97928fd 100644 --- a/crates/floe-core/src/io/write/strategy/merge/shared.rs +++ b/crates/floe-core/src/io/write/strategy/merge/shared.rs @@ -1,6 +1,6 @@ use std::time::Instant; -use arrow::datatypes::FieldRef; +use arrow::datatypes::{DataType, FieldRef}; use arrow::record_batch::RecordBatch; use deltalake::datafusion::datasource::TableProvider; use deltalake::operations::write::SchemaMode; @@ -188,7 +188,7 @@ fn plan_delta_schema_evolution( if !incompatible_changes.is_empty() { return Err(Box::new(RunError(format!( - "entity.name={} delta schema evolution failed: incompatible changes detected: {}", + "entity.name={} delta schema evolution failed: add_columns supports additive changes only; incompatible changes detected: {}", entity.name, incompatible_changes.join("; ") )))); @@ -225,7 +225,7 @@ fn plan_delta_schema_evolution( let partition_columns = snapshot.metadata().partition_columns(); if !partition_columns.is_empty() { return Err(Box::new(RunError(format!( - "entity.name={} delta schema evolution failed: adding columns is unsupported for partitioned delta tables", + "entity.name={} delta schema evolution failed: add_columns is unsupported for partitioned delta tables in this phase", entity.name )))); } @@ -275,7 +275,7 @@ fn incompatible_schema_changes( incompatible.push(format!("missing existing column {}", target_field.name())); continue; }; - if target_field.data_type() != source_field.data_type() { + if !data_types_compatible(target_field.data_type(), source_field.data_type()) { incompatible.push(format!( "column {} type changed from {:?} to {:?}", target_field.name(), @@ -293,6 +293,17 @@ fn incompatible_schema_changes( incompatible } +fn data_types_compatible(target: &DataType, source: &DataType) -> bool { + normalize_data_type(target) == normalize_data_type(source) +} + +fn normalize_data_type(data_type: &DataType) -> DataType { + match data_type { + DataType::Dictionary(_, value_type) => normalize_data_type(value_type.as_ref()), + other => other.clone(), + } +} + pub(crate) fn load_delta_table( runtime: &tokio::runtime::Runtime, target: &Target, diff --git a/crates/floe-core/tests/unit/io/write/delta_write.rs b/crates/floe-core/tests/unit/io/write/delta_write.rs index 2a6bb85..3d3c4da 100644 --- a/crates/floe-core/tests/unit/io/write/delta_write.rs +++ b/crates/floe-core/tests/unit/io/write/delta_write.rs @@ -361,7 +361,66 @@ fn delta_add_columns_mode_rejects_incompatible_type_changes() -> FloeResult<()> .expect_err("type change should fail"); assert!(err .to_string() - .contains("delta schema evolution failed: incompatible changes detected")); + .contains("delta schema evolution failed: add_columns supports additive changes only; incompatible changes detected")); + + Ok(()) +} + +#[test] +fn delta_add_columns_mode_rejects_partitioned_tables() -> FloeResult<()> { + let temp_dir = tempfile::TempDir::new()?; + let table_path = temp_dir.path().join("delta_table"); + let config = empty_root_config(); + let resolver = config::StorageResolver::from_path(&config, temp_dir.path())?; + let target = resolve_local_target(&resolver, &table_path)?; + + let mut entity = build_entity( + &table_path, + config::WriteMode::Append, + vec![ + column("id", "int64", Some(false)), + column("country", "string", Some(true)), + ], + None, + ); + entity.sink.accepted.partition_by = Some(vec!["country".to_string()]); + + let mut initial = df!( + "id" => &[1i64, 2], + "country" => &[Some("fr"), Some("us")] + )?; + write_delta_table( + &mut initial, + &target, + &resolver, + &entity, + config::WriteMode::Append, + )?; + + entity + .schema + .columns + .push(column("email", "string", Some(true))); + entity.schema.schema_evolution = Some(add_columns_schema_evolution()); + let mut evolved = df!( + "id" => &[3i64, 4], + "country" => &[Some("fr"), Some("us")], + "email" => &[Some("a@example.com"), Some("b@example.com")] + )?; + let err = write_delta_table( + &mut evolved, + &target, + &resolver, + &entity, + config::WriteMode::Append, + ) + .expect_err("partitioned delta add_columns should fail"); + + assert!( + err.to_string() + .contains("add_columns is unsupported for partitioned delta tables"), + "unexpected error: {err}" + ); Ok(()) } diff --git a/docs/config.md b/docs/config.md index fad8845..b42d8af 100644 --- a/docs/config.md +++ b/docs/config.md @@ -271,9 +271,11 @@ is available for templating within that entity. - `table_version` / `snapshot_id` in reports are sink-format specific (for example Delta table version, Iceberg metadata version + snapshot ID). - Compaction/optimization/maintenance remains external to Floe (for Parquet/Delta/Iceberg datasets). - Examples: Delta optimize/vacuum, Iceberg compaction/maintenance jobs. - - `schema.schema_evolution` is scaffolding only in this release. - - Validation is version-aware and `mode: add_columns` is Delta-only. - - Accepted sink runtime behavior does not apply schema evolution yet. + - `schema.schema_evolution` is strict by default. + - Runtime support in this phase is Delta-only and additive-only. + - `mode: add_columns` is implemented for Delta `append`, `overwrite`, `merge_scd1`, and `merge_scd2`. + - Unsupported changes such as drops, renames, type changes, and non-nullable-to-nullable drift still fail the write. + - When enabled, Floe audits the outcome in the entity report `schema_evolution` block and emits a `schema_evolution_applied` lifecycle event when columns are added. - `rejected` (required when `policy.severity: reject`) - `format`: `csv` (v0.1). - `path`: output directory for rejected rows. @@ -303,7 +305,11 @@ is available for templating within that entity. - `mode`: `strict` or `add_columns`. - `on_incompatible`: `fail`. - `mode: add_columns` requires `sink.accepted.format: delta`. - - Current scope is config parsing and validation only; accepted-write runtime behavior remains strict. + - Current implementation is additive-only: + - new columns may be appended to existing Delta schemas + - existing columns must remain compatible + - merge-key columns cannot be introduced by evolution during Delta merge modes + - adding columns to already partitioned Delta tables is rejected in this phase - `primary_key` (optional) - Array of schema column names. - Primary key columns are always treated as required (`not_null`) at runtime. diff --git a/docs/logging.md b/docs/logging.md index ce80802..ce0078e 100644 --- a/docs/logging.md +++ b/docs/logging.md @@ -22,6 +22,19 @@ Each JSON line includes: The final line on stdout is always the `run_finished` event. +## Event types + +- `run_started`: run metadata (`run_id`, `config`, `report_base`, `ts_ms`) +- `entity_started`: entity name (`name`, `ts_ms`) +- `file_started`: entity/input pair (`entity`, `input`, `ts_ms`) +- `file_finished`: file outcome and counts (`status`, `rows`, `accepted`, `rejected`, `elapsed_ms`, `ts_ms`) +- `schema_evolution_applied`: Delta additive schema-evolution audit event (`entity`, `mode`, `added_columns`, `ts_ms`) +- `entity_finished`: entity totals (`status`, `files`, `rows`, `accepted`, `rejected`, `warnings`, `errors`, `ts_ms`) +- `run_finished`: final run totals and exit code (`status`, `exit_code`, `files`, `rows`, `accepted`, `rejected`, `warnings`, `errors`, `summary_uri`, `ts_ms`) +- `log`: structured warning/error line (`log_level`, `code`, `message`, optional `entity`, optional `input`, `ts_ms`) + +`schema_evolution_applied` is emitted only when Floe actually adds columns to a Delta table. No event is emitted for strict mode, non-Delta sinks, or no-op `add_columns` runs. + ### `--log-format text` - Prints the same lifecycle events as plain text lines. @@ -45,3 +58,8 @@ floe run -c config.yml --log-format json > events.ndjson floe run -c config.yml --log-format json 2> run.log ``` +### Schema evolution event + +```json +{"schema":"floe.log.v1","level":"info","event":"schema_evolution_applied","run_id":"2026-03-11T16-31-00Z","entity":"customer","mode":"add_columns","added_columns":["email"],"ts_ms":1741710660000} +``` diff --git a/docs/report.md b/docs/report.md index 1ecf0f9..7a15064 100644 --- a/docs/report.md +++ b/docs/report.md @@ -99,10 +99,50 @@ Merge-specific metrics (optional, Delta `merge_scd1` / `merge_scd2`): - `incompatible_changes_detected`: `true` when Floe detected non-additive changes and failed the write Notes: +- Current runtime support is Delta-only and additive-only. - Floe writes an explicit no-op shape (`applied=false`, empty `added_columns`) when `mode=add_columns` is enabled but the target schema already matches the write schema. - This block is populated for entity reports even when the write path remains strict. +Example: + +```json +{ + "schema_evolution": { + "enabled": true, + "mode": "add_columns", + "applied": true, + "added_columns": ["email", "country_code"], + "incompatible_changes_detected": false + } +} +``` + +## Lifecycle event for schema evolution + +When Floe adds Delta columns, it emits a structured `schema_evolution_applied` lifecycle event. + +JSON envelope example (`--log-format json`): + +```json +{ + "schema": "floe.log.v1", + "level": "info", + "event": "schema_evolution_applied", + "run_id": "2026-03-11T16-31-00Z", + "entity": "customer", + "mode": "add_columns", + "added_columns": ["email"], + "ts_ms": 1741710660000 +} +``` + +Text example (`--log-format text`): + +```text +schema_evolution_applied entity=customer mode=add_columns added_columns=email +``` + Notes: - Metrics are populated when the writer can collect them cheaply and reliably. - Delta metrics are derived from committed Delta log `add` actions for the committed version. diff --git a/docs/sinks/delta.md b/docs/sinks/delta.md index 8777627..6b16718 100644 --- a/docs/sinks/delta.md +++ b/docs/sinks/delta.md @@ -31,8 +31,10 @@ Semantics: Schema evolution for standard Delta writes: - Default behavior remains strict. - `schema.schema_evolution.mode: add_columns` enables additive-only schema evolution for - accepted Delta `append` and `overwrite` writes. + accepted Delta `append`, `overwrite`, `merge_scd1`, and `merge_scd2` writes. - Existing columns must remain compatible; incompatible non-additive changes fail before commit. +- Unsupported changes in this phase include drops, renames, type changes, nullable widening of existing non-nullable columns, and additive merge-key changes. +- Adding columns to already partitioned Delta tables is rejected in this phase. - When evolution is applied, Floe emits a structured `schema_evolution_applied` event and populates the entity report `schema_evolution` block with `applied=true` and `added_columns`. - When evolution is enabled but no Delta schema change is needed, the entity report still @@ -41,7 +43,11 @@ Schema evolution for standard Delta writes: Merge mode notes (`merge_scd1`, `merge_scd2`): - Requires `schema.primary_key` in config (non-empty, non-nullable columns). - Only supported on Delta accepted sinks. -- Merge modes remain strict; this task does not change merge schema behavior. +- Strict remains the default behavior. +- `schema.schema_evolution.mode: add_columns` enables additive-only evolution for merge modes too. + - New non-key business columns may be added to the target Delta table before merge. + - Merge-key columns cannot be introduced by evolution. + - SCD2 system columns stay managed by Floe and are not treated as user-additive columns. - Source duplicates on merge key are handled during row checks before write. - Single-writer assumption applies; Delta commit conflicts surface as write errors. diff --git a/docs/support-matrix.md b/docs/support-matrix.md index 9f80f84..13fade6 100644 --- a/docs/support-matrix.md +++ b/docs/support-matrix.md @@ -35,7 +35,7 @@ Notes: | Output | Local | S3 | ADLS | GCS | Notes | |---|---|---|---|---|---| | Accepted: Parquet | ✅ | ✅ (temp) | ✅ (temp) | ✅ (temp) | Writes `part-*.parquet` (overwrite: sequential parts, append: UUID parts) | -| Accepted: Delta | ✅ | ✅ (object_store) | ✅ (object_store) | ✅ (object_store) | Transactional `_delta_log` | +| Accepted: Delta | ✅ | ✅ (object_store) | ✅ (object_store) | ✅ (object_store) | Transactional `_delta_log`; additive schema evolution for Delta only | | Accepted: Iceberg | ✅ | ✅ (filesystem catalog or Glue catalog over object_store) | ❌ | ✅ (filesystem catalog over object_store) | `metadata/` + `data/`; append/overwrite; partition spec runtime supported; no schema evolution/GC | | Rejected: CSV | ✅ | ✅ (temp) | ✅ (temp) | ✅ (temp) | Dataset parts `part-*.csv` | | Reports: JSON | ✅ | ✅ (temp) | ✅ (temp) | ✅ (temp) | Uploaded via temp file | @@ -44,6 +44,9 @@ Notes: - Parquet outputs to cloud are written locally then uploaded. - Delta outputs to cloud are **direct** via object_store (no temp upload). - Delta `partition_by` is runtime-supported on local/S3/ADLS/GCS accepted sinks. +- Delta schema evolution is currently Delta-only and additive-only (`schema.schema_evolution.mode=add_columns`). +- Delta additive evolution is implemented for `append`, `overwrite`, `merge_scd1`, and `merge_scd2`. +- Non-additive schema changes and additive changes against partitioned Delta tables fail fast. - Delta accepted-output report metrics use committed Delta log `add` actions; remote metrics are best-effort and remain nullable on post-write collection failure. - Iceberg on S3 supports filesystem-catalog semantics and AWS Glue catalog registration (S3 data location). - Iceberg `partition_spec` is runtime-supported (validated subset: `identity`, `year`, `month`, `day`, `hour`). diff --git a/docs/write_modes.md b/docs/write_modes.md index 9602fd6..bb2248c 100644 --- a/docs/write_modes.md +++ b/docs/write_modes.md @@ -21,6 +21,7 @@ For accepted Delta sinks, standard write modes stay strict by default. - When enabled and no new columns are present, Floe records a no-op schema-evolution report block. - When columns are added, Floe emits a structured `schema_evolution_applied` event and records the added column names in the entity run report. + - Adding columns to already partitioned Delta tables is rejected in this phase. ## `merge_scd1` semantics @@ -31,7 +32,11 @@ For accepted Delta sinks, standard write modes stay strict by default. - On key match: update non-key columns from source. - `sink.accepted.merge.ignore_columns` can exclude additional business columns from SCD1 update sets. - On missing key: insert source row. -- Merge modes still validate strict schema compatibility and do not perform schema evolution. +- Strict remains the default behavior. +- `schema.schema_evolution.mode: add_columns` enables additive-only Delta schema evolution for merge writes. + - New non-key business columns may be added before the merge commit. + - Existing columns must remain compatible; drop/rename/type-change flows are rejected. + - Merge-key columns cannot be introduced by schema evolution. - Merge execution uses Delta native merge (`MERGE INTO`) through delta-rs/DataFusion. - Single-writer assumption: Delta commit conflicts are returned as clear write errors. @@ -58,7 +63,11 @@ For accepted Delta sinks, standard write modes stay strict by default. - Change detection columns are resolved as: - `sink.accepted.merge.compare_columns` when configured - otherwise all non-key business columns minus `sink.accepted.merge.ignore_columns` -- Merge modes still validate strict schema compatibility and do not perform schema evolution. +- Strict remains the default behavior. +- `schema.schema_evolution.mode: add_columns` enables additive-only Delta schema evolution for merge writes. + - New business columns may be added while Floe preserves SCD2 system columns. + - Existing columns must remain compatible; drop/rename/type-change flows are rejected. + - Merge-key columns cannot be introduced by schema evolution. - Single-writer assumption: Delta commit conflicts are returned as clear write errors. ## Rejected output behavior