Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion crates/floe-core/src/config/validate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -831,7 +831,7 @@ fn validate_schema_evolution(
&& schema_evolution.mode == crate::config::SchemaEvolutionMode::AddColumns
{
return Err(Box::new(ConfigError(format!(
"entity.name={} schema.schema_evolution.mode={} requires sink.accepted.format=delta",
"entity.name={} schema.schema_evolution.mode={} requires sink.accepted.format=delta (Delta-only additive schema evolution in this phase)",
entity.name,
schema_evolution.mode.as_str()
))));
Expand Down
19 changes: 15 additions & 4 deletions crates/floe-core/src/io/write/strategy/merge/shared.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::time::Instant;

use arrow::datatypes::FieldRef;
use arrow::datatypes::{DataType, FieldRef};
use arrow::record_batch::RecordBatch;
use deltalake::datafusion::datasource::TableProvider;
use deltalake::operations::write::SchemaMode;
Expand Down Expand Up @@ -188,7 +188,7 @@ fn plan_delta_schema_evolution(

if !incompatible_changes.is_empty() {
return Err(Box::new(RunError(format!(
"entity.name={} delta schema evolution failed: incompatible changes detected: {}",
"entity.name={} delta schema evolution failed: add_columns supports additive changes only; incompatible changes detected: {}",
entity.name,
incompatible_changes.join("; ")
))));
Expand Down Expand Up @@ -225,7 +225,7 @@ fn plan_delta_schema_evolution(
let partition_columns = snapshot.metadata().partition_columns();
if !partition_columns.is_empty() {
return Err(Box::new(RunError(format!(
"entity.name={} delta schema evolution failed: adding columns is unsupported for partitioned delta tables",
"entity.name={} delta schema evolution failed: add_columns is unsupported for partitioned delta tables in this phase",
entity.name
))));
}
Expand Down Expand Up @@ -275,7 +275,7 @@ fn incompatible_schema_changes(
incompatible.push(format!("missing existing column {}", target_field.name()));
continue;
};
if target_field.data_type() != source_field.data_type() {
if !data_types_compatible(target_field.data_type(), source_field.data_type()) {
incompatible.push(format!(
"column {} type changed from {:?} to {:?}",
target_field.name(),
Expand All @@ -293,6 +293,17 @@ fn incompatible_schema_changes(
incompatible
}

/// Reports whether `target` and `source` are the same Arrow type once each has
/// been passed through `normalize_data_type`.
///
/// Returns `true` only when the two normalized types compare equal.
fn data_types_compatible(target: &DataType, source: &DataType) -> bool {
    let normalized_target = normalize_data_type(target);
    let normalized_source = normalize_data_type(source);
    normalized_target == normalized_source
}

/// Strips dictionary encoding from a data type.
///
/// While the type is `DataType::Dictionary`, it is replaced by the dictionary's
/// value type, so nested dictionary wrappers are peeled one after another. The
/// first non-dictionary type reached is returned as an owned clone. Only the
/// outermost wrapper chain is unwrapped; any other variant is returned as-is.
fn normalize_data_type(data_type: &DataType) -> DataType {
    let mut current = data_type;
    while let DataType::Dictionary(_, value_type) = current {
        current = value_type.as_ref();
    }
    current.clone()
}

pub(crate) fn load_delta_table(
runtime: &tokio::runtime::Runtime,
target: &Target,
Expand Down
61 changes: 60 additions & 1 deletion crates/floe-core/tests/unit/io/write/delta_write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -361,7 +361,66 @@ fn delta_add_columns_mode_rejects_incompatible_type_changes() -> FloeResult<()>
.expect_err("type change should fail");
assert!(err
.to_string()
.contains("delta schema evolution failed: incompatible changes detected"));
.contains("delta schema evolution failed: add_columns supports additive changes only; incompatible changes detected"));

Ok(())
}

/// Writes an initial frame with `partition_by = ["country"]`, then attempts a
/// second append that carries an extra `email` column with `add_columns`
/// schema evolution enabled, and expects that second write to fail with the
/// partitioned-table error message.
#[test]
fn delta_add_columns_mode_rejects_partitioned_tables() -> FloeResult<()> {
    // Scratch directory that holds the Delta table for this test.
    let workspace = tempfile::TempDir::new()?;
    let delta_path = workspace.path().join("delta_table");
    let root_config = empty_root_config();
    let storage = config::StorageResolver::from_path(&root_config, workspace.path())?;
    let delta_target = resolve_local_target(&storage, &delta_path)?;

    // Entity starts with two columns and is partitioned on `country`.
    let seed_columns = vec![
        column("id", "int64", Some(false)),
        column("country", "string", Some(true)),
    ];
    let mut entity = build_entity(
        &delta_path,
        config::WriteMode::Append,
        seed_columns,
        None,
    );
    entity.sink.accepted.partition_by = Some(vec!["country".to_string()]);

    // First write: must succeed.
    let mut seed_frame = df!(
        "id" => &[1i64, 2],
        "country" => &[Some("fr"), Some("us")]
    )?;
    write_delta_table(
        &mut seed_frame,
        &delta_target,
        &storage,
        &entity,
        config::WriteMode::Append,
    )?;

    // Second write: declare an extra column and opt into add_columns evolution.
    let email_column = column("email", "string", Some(true));
    entity.schema.columns.push(email_column);
    entity.schema.schema_evolution = Some(add_columns_schema_evolution());
    let mut widened_frame = df!(
        "id" => &[3i64, 4],
        "country" => &[Some("fr"), Some("us")],
        "email" => &[Some("a@example.com"), Some("b@example.com")]
    )?;
    let second_write = write_delta_table(
        &mut widened_frame,
        &delta_target,
        &storage,
        &entity,
        config::WriteMode::Append,
    );
    let err = second_write.expect_err("partitioned delta add_columns should fail");

    // The failure must be the dedicated partitioned-table rejection.
    let message = err.to_string();
    assert!(
        message.contains("add_columns is unsupported for partitioned delta tables"),
        "unexpected error: {err}"
    );

    Ok(())
}
Expand Down
14 changes: 10 additions & 4 deletions docs/config.md
Original file line number Diff line number Diff line change
Expand Up @@ -271,9 +271,11 @@ is available for templating within that entity.
- `table_version` / `snapshot_id` in reports are sink-format specific (for example Delta table version, Iceberg metadata version + snapshot ID).
- Compaction/optimization/maintenance remains external to Floe (for Parquet/Delta/Iceberg datasets).
- Examples: Delta optimize/vacuum, Iceberg compaction/maintenance jobs.
- `schema.schema_evolution` is scaffolding only in this release.
- Validation is version-aware and `mode: add_columns` is Delta-only.
- Accepted sink runtime behavior does not apply schema evolution yet.
- `schema.schema_evolution` is strict by default.
- Runtime support in this phase is Delta-only and additive-only.
- `mode: add_columns` is implemented for Delta `append`, `overwrite`, `merge_scd1`, and `merge_scd2`.
- Unsupported changes such as drops, renames, type changes, and non-nullable-to-nullable drift still fail the write.
- When enabled, Floe audits the outcome in the entity report `schema_evolution` block and emits a `schema_evolution_applied` lifecycle event when columns are added.
- `rejected` (required when `policy.severity: reject`)
- `format`: `csv` (v0.1).
- `path`: output directory for rejected rows.
Expand Down Expand Up @@ -303,7 +305,11 @@ is available for templating within that entity.
- `mode`: `strict` or `add_columns`.
- `on_incompatible`: `fail`.
- `mode: add_columns` requires `sink.accepted.format: delta`.
- Current scope is config parsing and validation only; accepted-write runtime behavior remains strict.
- Current implementation is additive-only:
- new columns may be appended to existing Delta schemas
- existing columns must remain compatible
- merge-key columns cannot be introduced by evolution during Delta merge modes
- adding columns to already partitioned Delta tables is rejected in this phase
- `primary_key` (optional)
- Array of schema column names.
- Primary key columns are always treated as required (`not_null`) at runtime.
Expand Down
18 changes: 18 additions & 0 deletions docs/logging.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,19 @@ Each JSON line includes:

The final line on stdout is always the `run_finished` event.

## Event types

- `run_started`: run metadata (`run_id`, `config`, `report_base`, `ts_ms`)
- `entity_started`: entity name (`name`, `ts_ms`)
- `file_started`: entity/input pair (`entity`, `input`, `ts_ms`)
- `file_finished`: file outcome and counts (`status`, `rows`, `accepted`, `rejected`, `elapsed_ms`, `ts_ms`)
- `schema_evolution_applied`: Delta additive schema-evolution audit event (`entity`, `mode`, `added_columns`, `ts_ms`)
- `entity_finished`: entity totals (`status`, `files`, `rows`, `accepted`, `rejected`, `warnings`, `errors`, `ts_ms`)
- `run_finished`: final run totals and exit code (`status`, `exit_code`, `files`, `rows`, `accepted`, `rejected`, `warnings`, `errors`, `summary_uri`, `ts_ms`)
- `log`: structured warning/error line (`log_level`, `code`, `message`, optional `entity`, optional `input`, `ts_ms`)

`schema_evolution_applied` is emitted only when Floe actually adds columns to a Delta table. No event is emitted for strict mode, non-Delta sinks, or no-op `add_columns` runs.

### `--log-format text`

- Prints the same lifecycle events as plain text lines.
Expand All @@ -45,3 +58,8 @@ floe run -c config.yml --log-format json > events.ndjson
floe run -c config.yml --log-format json 2> run.log
```

### Schema evolution event

```json
{"schema":"floe.log.v1","level":"info","event":"schema_evolution_applied","run_id":"2026-03-11T16-31-00Z","entity":"customer","mode":"add_columns","added_columns":["email"],"ts_ms":1741710660000}
```
40 changes: 40 additions & 0 deletions docs/report.md
Original file line number Diff line number Diff line change
Expand Up @@ -99,10 +99,50 @@ Merge-specific metrics (optional, Delta `merge_scd1` / `merge_scd2`):
- `incompatible_changes_detected`: `true` when Floe detected non-additive changes and failed the write

Notes:
- Current runtime support is Delta-only and additive-only.
- Floe writes an explicit no-op shape (`applied=false`, empty `added_columns`) when
`mode=add_columns` is enabled but the target schema already matches the write schema.
- This block is populated for entity reports even when the write path remains strict.

Example:

```json
{
"schema_evolution": {
"enabled": true,
"mode": "add_columns",
"applied": true,
"added_columns": ["email", "country_code"],
"incompatible_changes_detected": false
}
}
```

## Lifecycle event for schema evolution

When Floe adds Delta columns, it emits a structured `schema_evolution_applied` lifecycle event.

JSON envelope example (`--log-format json`):

```json
{
"schema": "floe.log.v1",
"level": "info",
"event": "schema_evolution_applied",
"run_id": "2026-03-11T16-31-00Z",
"entity": "customer",
"mode": "add_columns",
"added_columns": ["email"],
"ts_ms": 1741710660000
}
```

Text example (`--log-format text`):

```text
schema_evolution_applied entity=customer mode=add_columns added_columns=email
```

Notes:
- Metrics are populated when the writer can collect them cheaply and reliably.
- Delta metrics are derived from committed Delta log `add` actions for the committed version.
Expand Down
10 changes: 8 additions & 2 deletions docs/sinks/delta.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,10 @@ Semantics:
Schema evolution for standard Delta writes:
- Default behavior remains strict.
- `schema.schema_evolution.mode: add_columns` enables additive-only schema evolution for
accepted Delta `append` and `overwrite` writes.
accepted Delta `append`, `overwrite`, `merge_scd1`, and `merge_scd2` writes.
- Existing columns must remain compatible; incompatible non-additive changes fail before commit.
- Unsupported changes in this phase include drops, renames, type changes, nullable widening of existing non-nullable columns, and additive merge-key changes.
- Adding columns to already partitioned Delta tables is rejected in this phase.
- When evolution is applied, Floe emits a structured `schema_evolution_applied` event and
populates the entity report `schema_evolution` block with `applied=true` and `added_columns`.
- When evolution is enabled but no Delta schema change is needed, the entity report still
Expand All @@ -41,7 +43,11 @@ Schema evolution for standard Delta writes:
Merge mode notes (`merge_scd1`, `merge_scd2`):
- Requires `schema.primary_key` in config (non-empty, non-nullable columns).
- Only supported on Delta accepted sinks.
- Merge modes remain strict; this task does not change merge schema behavior.
- Strict remains the default behavior.
- `schema.schema_evolution.mode: add_columns` enables additive-only evolution for merge modes too.
- New non-key business columns may be added to the target Delta table before merge.
- Merge-key columns cannot be introduced by evolution.
- SCD2 system columns stay managed by Floe and are not treated as user-additive columns.
- Source duplicates on merge key are handled during row checks before write.
- Single-writer assumption applies; Delta commit conflicts surface as write errors.

Expand Down
5 changes: 4 additions & 1 deletion docs/support-matrix.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ Notes:
| Output | Local | S3 | ADLS | GCS | Notes |
|---|---|---|---|---|---|
| Accepted: Parquet | ✅ | ✅ (temp) | ✅ (temp) | ✅ (temp) | Writes `part-*.parquet` (overwrite: sequential parts, append: UUID parts) |
| Accepted: Delta | ✅ | ✅ (object_store) | ✅ (object_store) | ✅ (object_store) | Transactional `_delta_log` |
| Accepted: Delta | ✅ | ✅ (object_store) | ✅ (object_store) | ✅ (object_store) | Transactional `_delta_log`; additive schema evolution for Delta only |
| Accepted: Iceberg | ✅ | ✅ (filesystem catalog or Glue catalog over object_store) | ❌ | ✅ (filesystem catalog over object_store) | `metadata/` + `data/`; append/overwrite; partition spec runtime supported; no schema evolution/GC |
| Rejected: CSV | ✅ | ✅ (temp) | ✅ (temp) | ✅ (temp) | Dataset parts `part-*.csv` |
| Reports: JSON | ✅ | ✅ (temp) | ✅ (temp) | ✅ (temp) | Uploaded via temp file |
Expand All @@ -44,6 +44,9 @@ Notes:
- Parquet outputs to cloud are written locally then uploaded.
- Delta outputs to cloud are **direct** via object_store (no temp upload).
- Delta `partition_by` is runtime-supported on local/S3/ADLS/GCS accepted sinks.
- Schema evolution is currently Delta-only and additive-only (`schema.schema_evolution.mode=add_columns`).
- Delta additive evolution is implemented for `append`, `overwrite`, `merge_scd1`, and `merge_scd2`.
- Non-additive schema changes and additive changes against partitioned Delta tables fail fast.
- Delta accepted-output report metrics use committed Delta log `add` actions; remote metrics are best-effort and remain nullable on post-write collection failure.
- Iceberg on S3 supports filesystem-catalog semantics and AWS Glue catalog registration (S3 data location).
- Iceberg `partition_spec` is runtime-supported (validated subset: `identity`, `year`, `month`, `day`, `hour`).
Expand Down
13 changes: 11 additions & 2 deletions docs/write_modes.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ For accepted Delta sinks, standard write modes stay strict by default.
- When enabled and no new columns are present, Floe records a no-op schema-evolution report block.
- When columns are added, Floe emits a structured `schema_evolution_applied` event and
records the added column names in the entity run report.
- Adding columns to already partitioned Delta tables is rejected in this phase.

## `merge_scd1` semantics

Expand All @@ -31,7 +32,11 @@ For accepted Delta sinks, standard write modes stay strict by default.
- On key match: update non-key columns from source.
- `sink.accepted.merge.ignore_columns` can exclude additional business columns from SCD1 update sets.
- On missing key: insert source row.
- Merge modes still validate strict schema compatibility and do not perform schema evolution.
- Strict remains the default behavior.
- `schema.schema_evolution.mode: add_columns` enables additive-only Delta schema evolution for merge writes.
- New non-key business columns may be added before the merge commit.
- Existing columns must remain compatible; drop/rename/type-change flows are rejected.
- Merge-key columns cannot be introduced by schema evolution.
- Merge execution uses Delta native merge (`MERGE INTO`) through delta-rs/DataFusion.
- Single-writer assumption: Delta commit conflicts are returned as clear write errors.

Expand All @@ -58,7 +63,11 @@ For accepted Delta sinks, standard write modes stay strict by default.
- Change detection columns are resolved as:
- `sink.accepted.merge.compare_columns` when configured
- otherwise all non-key business columns minus `sink.accepted.merge.ignore_columns`
- Merge modes still validate strict schema compatibility and do not perform schema evolution.
- Strict remains the default behavior.
- `schema.schema_evolution.mode: add_columns` enables additive-only Delta schema evolution for merge writes.
- New business columns may be added while Floe preserves SCD2 system columns.
- Existing columns must remain compatible; drop/rename/type-change flows are rejected.
- Merge-key columns cannot be introduced by schema evolution.
- Single-writer assumption: Delta commit conflicts are returned as clear write errors.

## Rejected output behavior
Expand Down
Loading