Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 13 additions & 1 deletion crates/floe-cli/src/logging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,8 @@ fn level_for_event(event: &RunEvent) -> Level {
},
RunEvent::RunStarted { .. }
| RunEvent::EntityStarted { .. }
| RunEvent::FileStarted { .. } => Level::Info,
| RunEvent::FileStarted { .. }
| RunEvent::SchemaEvolutionApplied { .. } => Level::Info,
RunEvent::FileFinished { status, .. } => match status.as_str() {
"success" => Level::Info,
"rejected" => Level::Warn,
Expand Down Expand Up @@ -233,6 +234,17 @@ pub fn format_event_text(event: &RunEvent) -> String {
"entity_finished name={} status={} files={} rows={} accepted={} rejected={} warnings={} errors={}",
name, status, files, rows, accepted, rejected, warnings, errors
),
RunEvent::SchemaEvolutionApplied {
entity,
mode,
added_columns,
..
} => format!(
"schema_evolution_applied entity={} mode={} added_columns={}",
entity,
mode,
added_columns.join(",")
),
RunEvent::RunFinished {
status,
exit_code,
Expand Down
7 changes: 7 additions & 0 deletions crates/floe-cli/tests/output.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,13 @@ fn sample_outcome() -> RunOutcome {
target_rows_after: None,
merge_elapsed_ms: None,
},
schema_evolution: report::SchemaEvolutionSummary {
enabled: false,
mode: "strict".to_string(),
applied: false,
added_columns: Vec::new(),
incompatible_changes_detected: false,
},
unique_constraints: Vec::new(),
results: report::ResultsTotals {
files_total: 1,
Expand Down
10 changes: 10 additions & 0 deletions crates/floe-core/src/io/format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,15 @@ pub struct AcceptedWriteMetrics {
pub small_files_count: Option<u64>,
}

#[derive(Debug, Clone, Default)]
pub struct AcceptedSchemaEvolution {
pub enabled: bool,
pub mode: String,
pub applied: bool,
pub added_columns: Vec<String>,
pub incompatible_changes_detected: bool,
}

#[derive(Debug, Clone, Default)]
pub struct AcceptedWritePerfBreakdown {
pub conversion_ms: Option<u64>,
Expand Down Expand Up @@ -76,6 +85,7 @@ pub struct AcceptedWriteOutput {
pub iceberg_table: Option<String>,
pub metrics: AcceptedWriteMetrics,
pub merge: Option<AcceptedMergeMetrics>,
pub schema_evolution: AcceptedSchemaEvolution,
pub perf: Option<AcceptedWritePerfBreakdown>,
}

Expand Down
30 changes: 29 additions & 1 deletion crates/floe-core/src/io/write/delta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ struct DeltaWriteResult {
part_files: Vec<String>,
metrics: AcceptedWriteMetrics,
merge: Option<AcceptedMergeMetrics>,
schema_evolution: crate::io::format::AcceptedSchemaEvolution,
perf: AcceptedWritePerfBreakdown,
}

Expand Down Expand Up @@ -68,7 +69,7 @@ fn write_delta_table_with_metrics(
.enable_all()
.build()
.map_err(|err| Box::new(RunError(format!("delta runtime init failed: {err}"))))?;
let (version, merge, mut perf_breakdown) = match mode {
let (version, merge, schema_evolution, mut perf_breakdown) = match mode {
config::WriteMode::Overwrite | config::WriteMode::Append => {
let outcome = shared::write_standard_delta_version_with_perf(
&runtime,
Expand All @@ -83,6 +84,7 @@ fn write_delta_table_with_metrics(
(
outcome.version,
None,
outcome.schema_evolution,
AcceptedWritePerfBreakdown {
conversion_ms: Some(outcome.perf.conversion_ms),
commit_ms: Some(outcome.perf.commit_ms),
Expand All @@ -103,6 +105,18 @@ fn write_delta_table_with_metrics(
(
version,
Some(merge),
crate::io::format::AcceptedSchemaEvolution {
enabled: false,
mode: entity
.schema
.resolved_schema_evolution()
.mode
.as_str()
.to_string(),
applied: false,
added_columns: Vec::new(),
incompatible_changes_detected: false,
},
AcceptedWritePerfBreakdown {
conversion_ms: Some(perf.conversion_ms),
source_df_build_ms: Some(perf.source_df_build_ms),
Expand All @@ -125,6 +139,18 @@ fn write_delta_table_with_metrics(
(
version,
Some(merge),
crate::io::format::AcceptedSchemaEvolution {
enabled: false,
mode: entity
.schema
.resolved_schema_evolution()
.mode
.as_str()
.to_string(),
applied: false,
added_columns: Vec::new(),
incompatible_changes_detected: false,
},
AcceptedWritePerfBreakdown {
conversion_ms: Some(perf.conversion_ms),
source_df_build_ms: Some(perf.source_df_build_ms),
Expand Down Expand Up @@ -153,6 +179,7 @@ fn write_delta_table_with_metrics(
part_files,
metrics,
merge,
schema_evolution,
perf: perf_breakdown,
})
}
Expand Down Expand Up @@ -184,6 +211,7 @@ impl AcceptedSinkAdapter for DeltaAcceptedAdapter {
iceberg_table: None,
metrics: result.metrics,
merge: result.merge,
schema_evolution: result.schema_evolution,
perf: Some(result.perf),
})
}
Expand Down
12 changes: 12 additions & 0 deletions crates/floe-core/src/io/write/iceberg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,18 @@ fn write_iceberg_table_with_remote_context(
iceberg_table: result.iceberg_table,
metrics: result.metrics,
merge: None,
schema_evolution: crate::io::format::AcceptedSchemaEvolution {
enabled: false,
mode: entity
.schema
.resolved_schema_evolution()
.mode
.as_str()
.to_string(),
applied: false,
added_columns: Vec::new(),
incompatible_changes_detected: false,
},
perf: Some(result.perf),
})
}
Expand Down
12 changes: 12 additions & 0 deletions crates/floe-core/src/io/write/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,18 @@ impl AcceptedSinkAdapter for ParquetAcceptedAdapter {
iceberg_table: None,
metrics,
merge: None,
schema_evolution: io::format::AcceptedSchemaEvolution {
enabled: false,
mode: entity
.schema
.resolved_schema_evolution()
.mode
.as_str()
.to_string(),
applied: false,
added_columns: Vec::new(),
incompatible_changes_detected: false,
},
perf: None,
})
}
Expand Down
1 change: 1 addition & 0 deletions crates/floe-core/src/io/write/strategy/merge/scd2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ impl MergeBackend for DeltaMergeBackend {
SaveMode::Append,
ctx.partition_by.clone(),
ctx.target_file_size_bytes,
None,
)?;
perf.commit_ms = commit_start.elapsed().as_millis() as u64;
return Ok((
Expand Down
Loading
Loading