diff --git a/crates/floe-cli/tests/output.rs b/crates/floe-cli/tests/output.rs index 08e0861..5e83035 100644 --- a/crates/floe-cli/tests/output.rs +++ b/crates/floe-cli/tests/output.rs @@ -49,7 +49,7 @@ fn sample_outcome() -> RunOutcome { table_root_uri: Some("/tmp/out/accepted".to_string()), write_mode: Some("overwrite".to_string()), accepted_rows: 8, - files_written: 1, + files_written: Some(1), parts_written: 1, part_files: vec!["part-00000.parquet".to_string()], table_version: None, diff --git a/crates/floe-core/src/io/format.rs b/crates/floe-core/src/io/format.rs index e6b734a..b80fabe 100644 --- a/crates/floe-core/src/io/format.rs +++ b/crates/floe-core/src/io/format.rs @@ -73,7 +73,7 @@ pub struct AcceptedMergeMetrics { #[derive(Debug, Clone)] pub struct AcceptedWriteOutput { - pub files_written: u64, + pub files_written: Option, pub parts_written: u64, pub part_files: Vec, pub table_version: Option, diff --git a/crates/floe-core/src/io/write/delta.rs b/crates/floe-core/src/io/write/delta.rs index e29a0e9..61378e7 100644 --- a/crates/floe-core/src/io/write/delta.rs +++ b/crates/floe-core/src/io/write/delta.rs @@ -29,7 +29,7 @@ static DELTA_ACCEPTED_ADAPTER: DeltaAcceptedAdapter = DeltaAcceptedAdapter; #[derive(Debug)] struct DeltaWriteResult { version: i64, - files_written: u64, + files_written: Option, part_files: Vec, metrics: AcceptedWriteMetrics, merge: Option, diff --git a/crates/floe-core/src/io/write/delta/commit_metrics.rs b/crates/floe-core/src/io/write/delta/commit_metrics.rs index 89c85e2..d7c2349 100644 --- a/crates/floe-core/src/io/write/delta/commit_metrics.rs +++ b/crates/floe-core/src/io/write/delta/commit_metrics.rs @@ -5,7 +5,6 @@ use deltalake::table::builder::DeltaTableBuilder; use serde_json::Value; use crate::errors::RunError; -use crate::io::format::AcceptedWriteMetrics; use crate::io::storage::{object_store, Target}; use crate::io::write::metrics; use crate::{config, FloeResult}; @@ -17,7 +16,11 @@ pub(super) fn 
delta_commit_metrics_for_target( entity: &config::EntityConfig, version: i64, small_file_threshold_bytes: u64, -) -> FloeResult<(u64, Vec, AcceptedWriteMetrics)> { +) -> FloeResult<( + Option, + Vec, + crate::io::format::AcceptedWriteMetrics, +)> { match target { Target::Local { base_path, .. } => { let stats = delta_commit_add_stats(Path::new(base_path), version)?; @@ -138,7 +141,11 @@ fn parse_delta_commit_add_stats_bytes_with_context( pub fn delta_commit_metrics_from_log_bytes( bytes: &[u8], small_file_threshold_bytes: u64, -) -> FloeResult<(u64, Vec, AcceptedWriteMetrics)> { +) -> FloeResult<( + Option, + Vec, + crate::io::format::AcceptedWriteMetrics, +)> { let stats = parse_delta_commit_add_stats_bytes(bytes)?; Ok(delta_commit_stats_to_output( stats, @@ -150,7 +157,11 @@ pub fn delta_commit_metrics_from_log_bytes( pub fn delta_commit_metrics_from_log_bytes_best_effort( bytes: &[u8], small_file_threshold_bytes: u64, -) -> (u64, Vec, AcceptedWriteMetrics) { +) -> ( + Option, + Vec, + crate::io::format::AcceptedWriteMetrics, +) { match delta_commit_metrics_from_log_bytes(bytes, small_file_threshold_bytes) { Ok(output) => output, Err(_) => delta_commit_metrics_fallback_unknown(), @@ -160,23 +171,23 @@ pub fn delta_commit_metrics_from_log_bytes_best_effort( fn delta_commit_stats_to_output( stats: DeltaCommitAddStats, small_file_threshold_bytes: u64, -) -> (u64, Vec, AcceptedWriteMetrics) { - let metrics = if stats.file_sizes.len() == stats.files_written as usize { - metrics::summarize_written_file_sizes(&stats.file_sizes, small_file_threshold_bytes) - } else { - null_accepted_write_metrics() - }; - (stats.files_written, stats.part_files, metrics) -} - -fn delta_commit_metrics_fallback_unknown() -> (u64, Vec, AcceptedWriteMetrics) { - (0, Vec::new(), null_accepted_write_metrics()) +) -> ( + Option, + Vec, + crate::io::format::AcceptedWriteMetrics, +) { + let metrics = metrics::summarize_written_file_sizes( + &stats.file_sizes, + stats.files_written, + 
small_file_threshold_bytes, + ); + (Some(stats.files_written), stats.part_files, metrics) } -fn null_accepted_write_metrics() -> AcceptedWriteMetrics { - AcceptedWriteMetrics { - total_bytes_written: None, - avg_file_size_mb: None, - small_files_count: None, - } +fn delta_commit_metrics_fallback_unknown() -> ( + Option, + Vec, + crate::io::format::AcceptedWriteMetrics, +) { + (None, Vec::new(), metrics::null_accepted_write_metrics()) } diff --git a/crates/floe-core/src/io/write/iceberg.rs b/crates/floe-core/src/io/write/iceberg.rs index 2f4a5a8..4b01e7e 100644 --- a/crates/floe-core/src/io/write/iceberg.rs +++ b/crates/floe-core/src/io/write/iceberg.rs @@ -150,7 +150,7 @@ fn write_iceberg_table_with_remote_context( ))?; result.perf.conversion_ms = Some(conversion_ms); Ok(AcceptedWriteOutput { - files_written: result.files_written, + files_written: Some(result.files_written), parts_written: result.files_written, part_files: result.file_paths, table_version: result.metadata_version, @@ -356,7 +356,11 @@ async fn write_iceberg_table_async( .await?; } let metrics_start = Instant::now(); - let metrics = metrics::summarize_written_file_sizes(&file_sizes, small_file_threshold_bytes); + let metrics = metrics::summarize_written_file_sizes( + &file_sizes, + files_written, + small_file_threshold_bytes, + ); perf.metrics_read_ms = Some(metrics_start.elapsed().as_millis() as u64); Ok(IcebergWriteResult { diff --git a/crates/floe-core/src/io/write/metrics.rs b/crates/floe-core/src/io/write/metrics.rs index efb8848..7f95729 100644 --- a/crates/floe-core/src/io/write/metrics.rs +++ b/crates/floe-core/src/io/write/metrics.rs @@ -11,20 +11,33 @@ pub fn default_small_file_threshold_bytes(target_file_size_bytes: Option) - } } +pub fn null_accepted_write_metrics() -> AcceptedWriteMetrics { + AcceptedWriteMetrics { + total_bytes_written: None, + avg_file_size_mb: None, + small_files_count: None, + } +} + pub fn summarize_written_file_sizes( file_sizes: &[u64], + files_written: u64, 
small_file_threshold_bytes: u64, ) -> AcceptedWriteMetrics { - if file_sizes.is_empty() { + if files_written == 0 { return AcceptedWriteMetrics { - total_bytes_written: None, + total_bytes_written: Some(0), avg_file_size_mb: None, - small_files_count: None, + small_files_count: Some(0), }; } + if file_sizes.len() != files_written as usize { + return null_accepted_write_metrics(); + } + let total_bytes_written = file_sizes.iter().copied().sum::<u64>(); - let avg_file_size_mb = (total_bytes_written as f64 / file_sizes.len() as f64) / BYTES_PER_MIB; + let avg_file_size_mb = (total_bytes_written as f64 / files_written as f64) / BYTES_PER_MIB; let small_files_count = file_sizes .iter() .filter(|size| **size < small_file_threshold_bytes) diff --git a/crates/floe-core/src/io/write/parquet.rs b/crates/floe-core/src/io/write/parquet.rs index 2434ff9..d17d7d0 100644 --- a/crates/floe-core/src/io/write/parquet.rs +++ b/crates/floe-core/src/io/write/parquet.rs @@ -149,11 +149,12 @@ impl AcceptedSinkAdapter for ParquetAcceptedAdapter { let metrics = metrics::summarize_written_file_sizes( &file_sizes, + parts_written, runtime_options.small_file_threshold_bytes, ); Ok(AcceptedWriteOutput { - files_written: parts_written, + files_written: Some(parts_written), parts_written, part_files, table_version: None, diff --git a/crates/floe-core/src/report/entity.rs b/crates/floe-core/src/report/entity.rs index acc773f..66336fc 100644 --- a/crates/floe-core/src/report/entity.rs +++ b/crates/floe-core/src/report/entity.rs @@ -17,7 +17,7 @@ pub(crate) struct RunReportContext<'a> { pub severity: report::Severity, pub accepted_write_mode: config::WriteMode, pub accepted_parts_written: u64, - pub accepted_files_written: u64, + pub accepted_files_written: Option<u64>, pub accepted_part_files: Vec<String>, pub accepted_table_version: Option<i64>, pub accepted_snapshot_id: Option, diff --git a/crates/floe-core/src/report/mod.rs b/crates/floe-core/src/report/mod.rs index d275ef0..3a872bc 100644 ---
a/crates/floe-core/src/report/mod.rs +++ b/crates/floe-core/src/report/mod.rs @@ -134,7 +134,7 @@ pub struct AcceptedOutputSummary { pub write_mode: Option, pub accepted_rows: u64, #[serde(default)] - pub files_written: u64, + pub files_written: Option, pub parts_written: u64, #[serde(default)] #[serde(skip_serializing_if = "Vec::is_empty")] @@ -158,13 +158,10 @@ pub struct AcceptedOutputSummary { #[serde(skip_serializing_if = "Option::is_none")] pub iceberg_table: Option, #[serde(default)] - #[serde(skip_serializing_if = "Option::is_none")] pub total_bytes_written: Option, #[serde(default)] - #[serde(skip_serializing_if = "Option::is_none")] pub avg_file_size_mb: Option, #[serde(default)] - #[serde(skip_serializing_if = "Option::is_none")] pub small_files_count: Option, #[serde(default)] #[serde(skip_serializing_if = "Vec::is_empty")] diff --git a/crates/floe-core/src/run/entity/accepted_write.rs b/crates/floe-core/src/run/entity/accepted_write.rs index c6bb8ff..8fe46d3 100644 --- a/crates/floe-core/src/run/entity/accepted_write.rs +++ b/crates/floe-core/src/run/entity/accepted_write.rs @@ -15,7 +15,7 @@ use io::storage::Target; #[derive(Debug, Default)] pub(super) struct AcceptedWriteReportState { pub(super) parts_written: u64, - pub(super) files_written: u64, + pub(super) files_written: Option, pub(super) part_files: Vec, pub(super) table_version: Option, pub(super) snapshot_id: Option, @@ -131,6 +131,7 @@ pub(super) fn run_accepted_write_phase( let mut accepted_write_report = AcceptedWriteReportState::for_entity(entity, write_mode); if accepted_accum.is_empty() && write_mode != config::WriteMode::Overwrite { + accepted_write_report.files_written = Some(0); return Ok(accepted_write_report); } diff --git a/crates/floe-core/tests/integration/delta_run.rs b/crates/floe-core/tests/integration/delta_run.rs index ff21cac..1ce8432 100644 --- a/crates/floe-core/tests/integration/delta_run.rs +++ b/crates/floe-core/tests/integration/delta_run.rs @@ -219,7 +219,10 @@ 
entities: let report = &outcome.entity_outcomes[0].report; assert_eq!(report.sink.accepted.format, "delta"); - assert!(report.accepted_output.files_written > 0); + assert!(report + .accepted_output + .files_written + .is_some_and(|value| value > 0)); assert_eq!(report.accepted_output.parts_written, 1); assert!(report.accepted_output.total_bytes_written.is_some()); assert!(report.accepted_output.avg_file_size_mb.is_some()); @@ -285,7 +288,10 @@ entities: let report = &outcome.entity_outcomes[0].report; assert_eq!(report.sink.accepted.format, "delta"); - assert!(report.accepted_output.files_written > 0); + assert!(report + .accepted_output + .files_written + .is_some_and(|value| value > 0)); } #[test] @@ -659,6 +665,88 @@ entities: assert_eq!(report.results.accepted_total, 0); assert_eq!(report.results.rejected_total, 2); assert_eq!(report.accepted_output.write_mode.as_deref(), Some("append")); + assert_eq!(report.accepted_output.files_written, Some(0)); + + let df = read_local_delta_table(&accepted_dir); + assert_eq!(df.height(), 2); +} + +#[test] +fn local_delta_append_noop_run_reports_zero_files_written() { + let temp_dir = tempfile::TempDir::new().expect("temp dir"); + let root = temp_dir.path(); + let input_dir = root.join("in"); + let accepted_dir = root.join("out/accepted/customer_delta"); + let rejected_dir = root.join("out/rejected/customer_csv"); + let report_dir = root.join("report"); + + fs::create_dir_all(&input_dir).expect("create input dir"); + write_csv(&input_dir, "batch1.csv", "id;name\n1;alice\n2;bob\n"); + + let yaml = format!( + r#"version: "0.2" +report: + path: "{report_dir}" +entities: + - name: "customer" + source: + format: "csv" + path: "{input_dir}" + sink: + write_mode: "append" + accepted: + format: "delta" + path: "{accepted_dir}" + rejected: + format: "csv" + path: "{rejected_dir}" + policy: + severity: "reject" + schema: + mismatch: + missing_columns: "reject_file" + columns: + - name: "id" + type: "int64" + - name: "name" + type: 
"string" +"#, + report_dir = report_dir.display(), + input_dir = input_dir.display(), + accepted_dir = accepted_dir.display(), + rejected_dir = rejected_dir.display(), + ); + let config_path = write_config(root, &yaml); + + run( + &config_path, + RunOptions { + run_id: Some("it-delta-append-noop-init".to_string()), + entities: Vec::new(), + dry_run: false, + }, + ) + .expect("initial append run"); + + fs::remove_file(input_dir.join("batch1.csv")).expect("remove batch1"); + write_csv(&input_dir, "batch2.csv", "id\n3\n4\n"); + + let outcome = run( + &config_path, + RunOptions { + run_id: Some("it-delta-append-noop-second".to_string()), + entities: Vec::new(), + dry_run: false, + }, + ) + .expect("append run with precheck-rejected file"); + + let report = &outcome.entity_outcomes[0].report; + assert_eq!(report.results.rows_total, 0); + assert_eq!(report.results.accepted_total, 0); + assert_eq!(report.results.rejected_total, 0); + assert_eq!(report.accepted_output.write_mode.as_deref(), Some("append")); + assert_eq!(report.accepted_output.files_written, Some(0)); let df = read_local_delta_table(&accepted_dir); assert_eq!(df.height(), 2); @@ -1560,6 +1648,91 @@ entities: assert_eq!(df.height(), 2); } +#[test] +fn local_delta_merge_noop_run_reports_zero_files_written() { + let temp_dir = tempfile::TempDir::new().expect("temp dir"); + let root = temp_dir.path(); + let input_dir = root.join("in"); + let accepted_dir = root.join("out/accepted/customer_delta"); + let rejected_dir = root.join("out/rejected/customer_csv"); + let report_dir = root.join("report"); + + fs::create_dir_all(&input_dir).expect("create input dir"); + write_csv(&input_dir, "batch1.csv", "id;name\n1;alice\n2;bob\n"); + + let yaml = format!( + r#"version: "0.2" +report: + path: "{report_dir}" +entities: + - name: "customer" + source: + format: "csv" + path: "{input_dir}" + sink: + write_mode: "merge_scd1" + accepted: + format: "delta" + path: "{accepted_dir}" + rejected: + format: "csv" + path: 
"{rejected_dir}" + policy: + severity: "reject" + schema: + mismatch: + missing_columns: "reject_file" + primary_key: ["id"] + columns: + - name: "id" + type: "int64" + - name: "name" + type: "string" +"#, + report_dir = report_dir.display(), + input_dir = input_dir.display(), + accepted_dir = accepted_dir.display(), + rejected_dir = rejected_dir.display(), + ); + let config_path = write_config(root, &yaml); + + run( + &config_path, + RunOptions { + run_id: Some("it-delta-merge-noop-init".to_string()), + entities: Vec::new(), + dry_run: false, + }, + ) + .expect("initial merge run"); + + fs::remove_file(input_dir.join("batch1.csv")).expect("remove batch1"); + write_csv(&input_dir, "batch2.csv", "id\n3\n4\n"); + + let outcome = run( + &config_path, + RunOptions { + run_id: Some("it-delta-merge-noop-second".to_string()), + entities: Vec::new(), + dry_run: false, + }, + ) + .expect("merge run with precheck-rejected file"); + + let report = &outcome.entity_outcomes[0].report; + assert_eq!(report.results.rows_total, 0); + assert_eq!(report.results.accepted_total, 0); + assert_eq!(report.results.rejected_total, 0); + assert_eq!( + report.accepted_output.write_mode.as_deref(), + Some("merge_scd1") + ); + assert_eq!(report.accepted_output.files_written, Some(0)); + + let df = read_local_delta_table(&accepted_dir); + assert_eq!(df.height(), 2); +} + #[test] fn local_delta_merge_scd2_supports_custom_system_column_names() { let temp_dir = tempfile::TempDir::new().expect("temp dir"); diff --git a/crates/floe-core/tests/integration/iceberg_run.rs b/crates/floe-core/tests/integration/iceberg_run.rs index 9e31a28..7e2cb53 100644 --- a/crates/floe-core/tests/integration/iceberg_run.rs +++ b/crates/floe-core/tests/integration/iceberg_run.rs @@ -80,7 +80,7 @@ entities: Some(report.sink.accepted.path.as_str()) ); assert_eq!(report.accepted_output.write_mode.as_deref(), Some("append")); - assert_eq!(report.accepted_output.files_written, 1); + 
assert_eq!(report.accepted_output.files_written, Some(1)); assert_eq!(report.accepted_output.parts_written, 1); assert!(report.accepted_output.snapshot_id.is_some()); assert!(report.accepted_output.total_bytes_written.is_some()); @@ -89,7 +89,7 @@ entities: let (data_file_count, data_total_bytes) = collect_file_stats(&accepted_dir.join("data")).expect("collect iceberg data stats"); - assert_eq!(report.accepted_output.files_written, data_file_count); + assert_eq!(report.accepted_output.files_written, Some(data_file_count)); assert_eq!( report.accepted_output.total_bytes_written, Some(data_total_bytes) diff --git a/crates/floe-core/tests/integration/local_run.rs b/crates/floe-core/tests/integration/local_run.rs index 7bb32db..7097f44 100644 --- a/crates/floe-core/tests/integration/local_run.rs +++ b/crates/floe-core/tests/integration/local_run.rs @@ -96,7 +96,7 @@ entities: let report = &outcome.entity_outcomes[0].report; assert_eq!(report.sink.accepted.format, "parquet"); assert_eq!( - report.accepted_output.files_written as usize, + report.accepted_output.files_written.expect("files_written") as usize, parquet_files.len() ); assert_eq!( @@ -175,7 +175,10 @@ entities: assert!(parquet_files > 1, "expected chunked parquet output"); let report = &outcome.entity_outcomes[0].report; - assert_eq!(report.accepted_output.files_written as usize, parquet_files); + assert_eq!( + report.accepted_output.files_written.expect("files_written") as usize, + parquet_files + ); assert_eq!(report.accepted_output.parts_written as usize, parquet_files); let total_bytes = report .accepted_output diff --git a/crates/floe-core/tests/unit/io/write/delta_write.rs b/crates/floe-core/tests/unit/io/write/delta_write.rs index 3d3c4da..2d12229 100644 --- a/crates/floe-core/tests/unit/io/write/delta_write.rs +++ b/crates/floe-core/tests/unit/io/write/delta_write.rs @@ -546,7 +546,7 @@ fn delta_commit_metrics_from_log_bytes_counts_add_actions_and_caps_part_files() let (files_written, part_files, 
metrics) = delta_commit_metrics_from_log_bytes(content.as_bytes(), 32)?; - assert_eq!(files_written, 55); + assert_eq!(files_written, Some(55)); assert_eq!(part_files.len(), 50); assert_eq!(part_files[0], "part-00000.parquet"); assert_eq!(part_files[49], "part-00049.parquet"); @@ -566,7 +566,7 @@ fn delta_commit_metrics_from_log_bytes_missing_size_keeps_file_count_but_nulls_m let (files_written, part_files, metrics) = delta_commit_metrics_from_log_bytes(content.as_bytes(), 16)?; - assert_eq!(files_written, 2); + assert_eq!(files_written, Some(2)); assert_eq!(part_files, vec!["part-00000.parquet", "part-00001.parquet"]); assert_eq!(metrics.total_bytes_written, None); assert_eq!(metrics.avg_file_size_mb, None); @@ -581,13 +581,27 @@ fn delta_commit_metrics_from_log_bytes_best_effort_falls_back_on_malformed_json( let (files_written, part_files, metrics) = delta_commit_metrics_from_log_bytes_best_effort(malformed, 16); - assert_eq!(files_written, 0); + assert_eq!(files_written, None); assert!(part_files.is_empty()); assert_eq!(metrics.total_bytes_written, None); assert_eq!(metrics.avg_file_size_mb, None); assert_eq!(metrics.small_files_count, None); } +#[test] +fn delta_commit_metrics_from_log_bytes_zero_adds_preserve_exact_zero_metrics() -> FloeResult<()> { + let content = b"{\"commitInfo\":{\"operation\":\"WRITE\"}}\n"; + + let (files_written, part_files, metrics) = delta_commit_metrics_from_log_bytes(content, 16)?; + + assert_eq!(files_written, Some(0)); + assert!(part_files.is_empty()); + assert_eq!(metrics.total_bytes_written, Some(0)); + assert_eq!(metrics.avg_file_size_mb, None); + assert_eq!(metrics.small_files_count, Some(0)); + Ok(()) +} + fn empty_root_config() -> config::RootConfig { config::RootConfig { version: "0.1".to_string(), diff --git a/crates/floe-core/tests/unit/io/write/iceberg_write.rs b/crates/floe-core/tests/unit/io/write/iceberg_write.rs index ecd69a6..1f8725c 100644 --- a/crates/floe-core/tests/unit/io/write/iceberg_write.rs +++ 
b/crates/floe-core/tests/unit/io/write/iceberg_write.rs @@ -125,9 +125,10 @@ fn write_iceberg_table_empty_dataframe_creates_table_without_snapshot() -> FloeR let out = write_iceberg_table(&mut df, &target, &entity, config::WriteMode::Overwrite)?; assert_eq!(out.parts_written, 0); assert!(out.snapshot_id.is_none()); - assert_eq!(out.metrics.total_bytes_written, None); + assert_eq!(out.files_written, Some(0)); + assert_eq!(out.metrics.total_bytes_written, Some(0)); assert_eq!(out.metrics.avg_file_size_mb, None); - assert_eq!(out.metrics.small_files_count, None); + assert_eq!(out.metrics.small_files_count, Some(0)); assert!(table_path.join("metadata").exists()); assert!(!table_path.join("data").exists()); assert_eq!(metadata_json_count(&table_path)?, 1); @@ -154,7 +155,7 @@ fn write_iceberg_table_local_metrics_count_data_files_not_metadata() -> FloeResu let (_metadata_file_count, metadata_total_bytes) = collect_file_stats(&table_path.join("metadata"))?; - assert_eq!(out.files_written, data_file_count); + assert_eq!(out.files_written, Some(data_file_count)); assert_eq!(out.parts_written, data_file_count); assert_eq!(out.metrics.total_bytes_written, Some(data_total_bytes)); assert!(out.metrics.avg_file_size_mb.is_some()); diff --git a/crates/floe-core/tests/unit/io/write/metrics.rs b/crates/floe-core/tests/unit/io/write/metrics.rs index b9d3417..cf8e05c 100644 --- a/crates/floe-core/tests/unit/io/write/metrics.rs +++ b/crates/floe-core/tests/unit/io/write/metrics.rs @@ -5,7 +5,7 @@ use floe_core::io::write::metrics::{ #[test] fn summarize_written_file_sizes_computes_totals_average_and_small_count() { - let metrics = summarize_written_file_sizes(&[4, 10, 20], 10); + let metrics = summarize_written_file_sizes(&[4, 10, 20], 3, 10); assert_eq!(metrics.total_bytes_written, Some(34)); assert_eq!(metrics.small_files_count, Some(1)); let avg = metrics.avg_file_size_mb.expect("avg_file_size_mb"); @@ -14,8 +14,16 @@ fn 
summarize_written_file_sizes_computes_totals_average_and_small_count() { } #[test] -fn summarize_written_file_sizes_empty_is_unset() { - let metrics = summarize_written_file_sizes(&[], 1024); +fn summarize_written_file_sizes_empty_preserves_exact_zero_counts() { + let metrics = summarize_written_file_sizes(&[], 0, 1024); + assert_eq!(metrics.total_bytes_written, Some(0)); + assert_eq!(metrics.avg_file_size_mb, None); + assert_eq!(metrics.small_files_count, Some(0)); +} + +#[test] +fn summarize_written_file_sizes_incomplete_sizes_are_unknown() { + let metrics = summarize_written_file_sizes(&[4], 2, 10); assert_eq!(metrics.total_bytes_written, None); assert_eq!(metrics.avg_file_size_mb, None); assert_eq!(metrics.small_files_count, None); diff --git a/crates/floe-core/tests/unit/report/accepted_output.rs b/crates/floe-core/tests/unit/report/accepted_output.rs index d51811d..66ab3df 100644 --- a/crates/floe-core/tests/unit/report/accepted_output.rs +++ b/crates/floe-core/tests/unit/report/accepted_output.rs @@ -14,7 +14,7 @@ fn accepted_output_summary_deserializes_legacy_payload_without_new_metrics() { assert_eq!(summary.path, "/tmp/out"); assert_eq!(summary.accepted_rows, 10); - assert_eq!(summary.files_written, 1); + assert_eq!(summary.files_written, Some(1)); assert_eq!(summary.parts_written, 1); assert_eq!(summary.total_bytes_written, None); assert_eq!(summary.avg_file_size_mb, None); @@ -30,7 +30,7 @@ fn accepted_output_summary_serializes_new_metrics_when_present() { table_root_uri: Some("/tmp/out".to_string()), write_mode: Some("append".to_string()), accepted_rows: 10, - files_written: 2, + files_written: Some(2), parts_written: 2, part_files: vec![ "part-00000.parquet".to_string(), @@ -76,13 +76,57 @@ fn accepted_output_summary_serializes_new_metrics_when_present() { } #[test] -fn accepted_output_summary_omits_new_metrics_when_absent() { +fn accepted_output_summary_serializes_exact_zero_metrics_as_zero() { + let summary = AcceptedOutputSummary { + path: 
"/tmp/out".to_string(), + table_root_uri: None, + write_mode: Some("overwrite".to_string()), + accepted_rows: 0, + files_written: Some(0), + parts_written: 0, + part_files: Vec::new(), + table_version: None, + snapshot_id: None, + iceberg_catalog_name: None, + iceberg_database: None, + iceberg_namespace: None, + iceberg_table: None, + total_bytes_written: Some(0), + avg_file_size_mb: None, + small_files_count: Some(0), + merge_key: Vec::new(), + inserted_count: None, + updated_count: None, + closed_count: None, + unchanged_count: None, + target_rows_before: None, + target_rows_after: None, + merge_elapsed_ms: None, + }; + + let value = serde_json::to_value(summary).expect("serialize"); + let obj = value.as_object().expect("object"); + + assert_eq!(obj.get("files_written").and_then(|v| v.as_u64()), Some(0)); + assert_eq!( + obj.get("total_bytes_written").and_then(|v| v.as_u64()), + Some(0) + ); + assert_eq!( + obj.get("small_files_count").and_then(|v| v.as_u64()), + Some(0) + ); + assert!(obj.get("avg_file_size_mb").is_some_and(|v| v.is_null())); +} + +#[test] +fn accepted_output_summary_serializes_unknown_metrics_as_null() { let summary = AcceptedOutputSummary { path: "/tmp/out".to_string(), table_root_uri: None, write_mode: None, accepted_rows: 10, - files_written: 0, + files_written: None, parts_written: 0, part_files: Vec::new(), table_version: None, @@ -107,9 +151,10 @@ fn accepted_output_summary_omits_new_metrics_when_absent() { let value = serde_json::to_value(summary).expect("serialize"); let obj = value.as_object().expect("object"); - assert!(!obj.contains_key("total_bytes_written")); - assert!(!obj.contains_key("avg_file_size_mb")); - assert!(!obj.contains_key("small_files_count")); + assert!(obj.get("files_written").is_some_and(|v| v.is_null())); + assert!(obj.get("total_bytes_written").is_some_and(|v| v.is_null())); + assert!(obj.get("avg_file_size_mb").is_some_and(|v| v.is_null())); + assert!(obj.get("small_files_count").is_some_and(|v| v.is_null())); 
assert!(!obj.contains_key("merge_key")); assert!(!obj.contains_key("inserted_count")); assert!(!obj.contains_key("updated_count")); diff --git a/crates/floe-core/tests/unit/run/report.rs b/crates/floe-core/tests/unit/run/report.rs index c335981..9b81480 100644 --- a/crates/floe-core/tests/unit/run/report.rs +++ b/crates/floe-core/tests/unit/run/report.rs @@ -50,7 +50,7 @@ fn sample_report() -> RunReport { table_root_uri: Some("/tmp/out/accepted".to_string()), write_mode: Some("overwrite".to_string()), accepted_rows: 10, - files_written: 1, + files_written: Some(1), parts_written: 1, part_files: vec!["part-00000.parquet".to_string()], table_version: None, diff --git a/docs/report.md b/docs/report.md index 7a15064..f405c49 100644 --- a/docs/report.md +++ b/docs/report.md @@ -63,7 +63,7 @@ Common fields: - `write_mode`: resolved write mode (`overwrite` or `append`) - possible values: `overwrite`, `append`, `merge_scd1`, `merge_scd2` - `accepted_rows`: total accepted rows written for the entity -- `files_written`: number of accepted data files written (format-specific semantics) +- `files_written`: number of accepted data files written (format-specific semantics; `null` when unknown) - `parts_written`: writer part count (may differ from `files_written`, especially for table formats) - `part_files`: capped list of output data-file basenames (when collected) @@ -80,6 +80,17 @@ Write-time file sizing metrics (optional): - `avg_file_size_mb` - `small_files_count` +Accepted-output metrics contract: + +| Report value | Meaning | +| --- | --- | +| `null` | Unknown or not computable | +| `0` | Computed exactly and the exact value is zero | +| `> 0` | Computed exactly and the exact value is non-zero | + +This applies to `files_written`, `total_bytes_written`, `avg_file_size_mb`, and +`small_files_count`. Floe does not use sentinel values such as `-1`. 
+ Merge-specific metrics (optional, Delta `merge_scd1` / `merge_scd2`): - `merge_key`: merge key columns (`schema.primary_key`) - `inserted_count` @@ -147,8 +158,7 @@ Notes: - Metrics are populated when the writer can collect them cheaply and reliably. - Delta metrics are derived from committed Delta log `add` actions for the committed version. - Remote Delta metrics are collected best-effort after the write; if collection fails, - the run still succeeds; `part_files` may be empty and size metrics remain `null` - (Delta `files_written` falls back to `0` when exact post-write commit metrics are unavailable). + the run still succeeds; `part_files` may be empty and accepted-output metrics remain `null`. - Iceberg file sizing metrics count Iceberg data files only (not metadata/manifests). diff --git a/docs/sinks/delta.md b/docs/sinks/delta.md index 6b16718..cca67ae 100644 --- a/docs/sinks/delta.md +++ b/docs/sinks/delta.md @@ -74,7 +74,7 @@ Current status: - Remote metrics collection is best-effort after a successful write: - if commit-log read/parse succeeds, metrics are exact for the committed version - if commit-log read/parse fails, the write still succeeds and the report falls back to - `files_written=0`, `part_files=[]`, and nullable size metrics (no fake size values) + `files_written=null`, `part_files=[]`, and nullable size metrics - `files_written` counts `add` actions in the committed version file (not Delta log files). - `part_files` in the report is a capped list of data-file basenames from `add.path`. - Compaction/optimization remains external to Floe.