Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 4 additions & 26 deletions crates/floe-core/src/io/write/delta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ fn write_delta_table_with_metrics(
)
}
config::WriteMode::MergeScd1 => {
let (version, merge, perf) = scd1::execute_merge_scd1_with_runtime(
let (version, merge, schema_evolution, perf) = scd1::execute_merge_scd1_with_runtime(
&runtime,
df,
target,
Expand All @@ -105,18 +105,7 @@ fn write_delta_table_with_metrics(
(
version,
Some(merge),
crate::io::format::AcceptedSchemaEvolution {
enabled: false,
mode: entity
.schema
.resolved_schema_evolution()
.mode
.as_str()
.to_string(),
applied: false,
added_columns: Vec::new(),
incompatible_changes_detected: false,
},
schema_evolution,
AcceptedWritePerfBreakdown {
conversion_ms: Some(perf.conversion_ms),
source_df_build_ms: Some(perf.source_df_build_ms),
Expand All @@ -127,7 +116,7 @@ fn write_delta_table_with_metrics(
)
}
config::WriteMode::MergeScd2 => {
let (version, merge, perf) = scd2::execute_merge_scd2_with_runtime(
let (version, merge, schema_evolution, perf) = scd2::execute_merge_scd2_with_runtime(
&runtime,
df,
target,
Expand All @@ -139,18 +128,7 @@ fn write_delta_table_with_metrics(
(
version,
Some(merge),
crate::io::format::AcceptedSchemaEvolution {
enabled: false,
mode: entity
.schema
.resolved_schema_evolution()
.mode
.as_str()
.to_string(),
applied: false,
added_columns: Vec::new(),
incompatible_changes_detected: false,
},
schema_evolution,
AcceptedWritePerfBreakdown {
conversion_ms: Some(perf.conversion_ms),
source_df_build_ms: Some(perf.source_df_build_ms),
Expand Down
14 changes: 12 additions & 2 deletions crates/floe-core/src/io/write/strategy/merge/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,23 @@ pub(crate) trait MergeBackend {
&self,
source_df: &mut DataFrame,
ctx: &MergeExecutionContext<'_>,
) -> FloeResult<(i64, AcceptedMergeMetrics, shared::DeltaMergePerfBreakdown)>;
) -> FloeResult<(
i64,
AcceptedMergeMetrics,
crate::io::format::AcceptedSchemaEvolution,
shared::DeltaMergePerfBreakdown,
)>;

fn execute_scd2(
&self,
_source_df: &mut DataFrame,
_ctx: &MergeExecutionContext<'_>,
) -> FloeResult<(i64, AcceptedMergeMetrics, shared::DeltaMergePerfBreakdown)> {
) -> FloeResult<(
i64,
AcceptedMergeMetrics,
crate::io::format::AcceptedSchemaEvolution,
shared::DeltaMergePerfBreakdown,
)> {
Err(Box::new(RunError(
"write_mode=merge_scd2 is not implemented for this backend".to_string(),
)))
Expand Down
60 changes: 37 additions & 23 deletions crates/floe-core/src/io/write/strategy/merge/scd1.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
use deltalake::table::builder::DeltaTableBuilder;
use polars::prelude::DataFrame;
use std::collections::HashSet;
use std::time::Instant;

use crate::errors::RunError;
use crate::io::format::AcceptedMergeMetrics;
use crate::io::storage::{object_store, Target};
use crate::io::storage::Target;
use crate::{config, FloeResult};

use super::{shared, MergeBackend, MergeExecutionContext};
Expand All @@ -20,7 +19,12 @@ pub(crate) fn execute_merge_scd1_with_runtime(
entity: &config::EntityConfig,
partition_by: Option<Vec<String>>,
target_file_size_bytes: Option<usize>,
) -> FloeResult<(i64, AcceptedMergeMetrics, shared::DeltaMergePerfBreakdown)> {
) -> FloeResult<(
i64,
AcceptedMergeMetrics,
crate::io::format::AcceptedSchemaEvolution,
shared::DeltaMergePerfBreakdown,
)> {
let ctx = MergeExecutionContext {
runtime,
target,
Expand All @@ -37,26 +41,19 @@ impl MergeBackend for DeltaMergeBackend {
&self,
source_df: &mut DataFrame,
ctx: &MergeExecutionContext<'_>,
) -> FloeResult<(i64, AcceptedMergeMetrics, shared::DeltaMergePerfBreakdown)> {
) -> FloeResult<(
i64,
AcceptedMergeMetrics,
crate::io::format::AcceptedSchemaEvolution,
shared::DeltaMergePerfBreakdown,
)> {
let merge_start = Instant::now();
let mut perf = shared::DeltaMergePerfBreakdown::default();
let merge_key = shared::resolve_merge_key(ctx.entity)?;
// Merge-key uniqueness is enforced upstream by check::UniqueTracker for merge_scd1,
// so the writer can stay focused on sink-level merge execution.
let store = object_store::delta_store_config(ctx.target, ctx.resolver, ctx.entity)?;
let table_url = store.table_url;
let storage_options = store.storage_options;
let builder = DeltaTableBuilder::from_url(table_url.clone())
.map_err(|err| Box::new(RunError(format!("delta builder failed: {err}"))))?
.with_storage_options(storage_options.clone());
let loaded_table = ctx
.runtime
.block_on(async move { builder.load().await })
.map(Some)
.or_else(|err| match err {
deltalake::DeltaTableError::NotATable(_) => Ok(None),
other => Err(Box::new(RunError(format!("delta load failed: {other}")))),
})?;
let loaded_table =
shared::load_delta_table(ctx.runtime, ctx.target, ctx.resolver, ctx.entity)?;
if loaded_table.is_none() {
let outcome = shared::write_standard_delta_version_with_perf(
ctx.runtime,
Expand All @@ -82,20 +79,31 @@ impl MergeBackend for DeltaMergeBackend {
target_rows_after: source_df.height() as u64,
merge_elapsed_ms: merge_start.elapsed().as_millis() as u64,
},
outcome.schema_evolution,
perf,
));
}

let table = loaded_table.expect("checked is_some");
let conversion_start = Instant::now();
let source_batch = shared::source_record_batch(source_df, ctx.entity)?;
perf.conversion_ms = conversion_start.elapsed().as_millis() as u64;
let schema_evolution = shared::plan_merge_delta_schema_evolution(
ctx.runtime,
&source_batch,
ctx.target,
ctx.resolver,
ctx.entity,
config::WriteMode::MergeScd1,
&[],
)?;
let target_schema_columns = shared::delta_schema_columns(&table)?;
shared::validate_merge_schema_compatibility(
&target_schema_columns,
source_df,
&ctx.entity.name,
schema_evolution.merge_schema,
)?;
let conversion_start = Instant::now();
let source_batch = shared::source_record_batch(source_df, ctx.entity)?;
perf.conversion_ms = conversion_start.elapsed().as_millis() as u64;
let source_df_build_start = Instant::now();
let source = shared::source_as_datafusion_df_from_batch(source_batch, &ctx.entity.name)?;
perf.source_df_build_ms = source_df_build_start.elapsed().as_millis() as u64;
Expand All @@ -119,7 +127,8 @@ impl MergeBackend for DeltaMergeBackend {
let mut merge = table
.merge(source, predicate)
.with_source_alias("source")
.with_target_alias("target");
.with_target_alias("target")
.with_merge_schema(schema_evolution.merge_schema);
if !update_columns.is_empty() {
let update_cols = update_columns.clone();
merge = merge.when_matched_update(|update| {
Expand Down Expand Up @@ -155,6 +164,11 @@ impl MergeBackend for DeltaMergeBackend {
&merge_metrics,
merge_start.elapsed().as_millis() as u64,
);
Ok((version, accepted_merge_metrics, perf))
Ok((
version,
accepted_merge_metrics,
schema_evolution.summary,
perf,
))
}
}
Loading
Loading