Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion crates/floe-cli/tests/output.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ fn sample_outcome() -> RunOutcome {
table_root_uri: Some("/tmp/out/accepted".to_string()),
write_mode: Some("overwrite".to_string()),
accepted_rows: 8,
files_written: 1,
files_written: Some(1),
parts_written: 1,
part_files: vec!["part-00000.parquet".to_string()],
table_version: None,
Expand Down
2 changes: 1 addition & 1 deletion crates/floe-core/src/io/format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ pub struct AcceptedMergeMetrics {

#[derive(Debug, Clone)]
pub struct AcceptedWriteOutput {
pub files_written: u64,
pub files_written: Option<u64>,
pub parts_written: u64,
pub part_files: Vec<String>,
pub table_version: Option<i64>,
Expand Down
2 changes: 1 addition & 1 deletion crates/floe-core/src/io/write/delta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ static DELTA_ACCEPTED_ADAPTER: DeltaAcceptedAdapter = DeltaAcceptedAdapter;
#[derive(Debug)]
struct DeltaWriteResult {
version: i64,
files_written: u64,
files_written: Option<u64>,
part_files: Vec<String>,
metrics: AcceptedWriteMetrics,
merge: Option<AcceptedMergeMetrics>,
Expand Down
53 changes: 32 additions & 21 deletions crates/floe-core/src/io/write/delta/commit_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ use deltalake::table::builder::DeltaTableBuilder;
use serde_json::Value;

use crate::errors::RunError;
use crate::io::format::AcceptedWriteMetrics;
use crate::io::storage::{object_store, Target};
use crate::io::write::metrics;
use crate::{config, FloeResult};
Expand All @@ -17,7 +16,11 @@ pub(super) fn delta_commit_metrics_for_target(
entity: &config::EntityConfig,
version: i64,
small_file_threshold_bytes: u64,
) -> FloeResult<(u64, Vec<String>, AcceptedWriteMetrics)> {
) -> FloeResult<(
Option<u64>,
Vec<String>,
crate::io::format::AcceptedWriteMetrics,
)> {
match target {
Target::Local { base_path, .. } => {
let stats = delta_commit_add_stats(Path::new(base_path), version)?;
Expand Down Expand Up @@ -138,7 +141,11 @@ fn parse_delta_commit_add_stats_bytes_with_context(
pub fn delta_commit_metrics_from_log_bytes(
bytes: &[u8],
small_file_threshold_bytes: u64,
) -> FloeResult<(u64, Vec<String>, AcceptedWriteMetrics)> {
) -> FloeResult<(
Option<u64>,
Vec<String>,
crate::io::format::AcceptedWriteMetrics,
)> {
let stats = parse_delta_commit_add_stats_bytes(bytes)?;
Ok(delta_commit_stats_to_output(
stats,
Expand All @@ -150,7 +157,11 @@ pub fn delta_commit_metrics_from_log_bytes(
pub fn delta_commit_metrics_from_log_bytes_best_effort(
bytes: &[u8],
small_file_threshold_bytes: u64,
) -> (u64, Vec<String>, AcceptedWriteMetrics) {
) -> (
Option<u64>,
Vec<String>,
crate::io::format::AcceptedWriteMetrics,
) {
match delta_commit_metrics_from_log_bytes(bytes, small_file_threshold_bytes) {
Ok(output) => output,
Err(_) => delta_commit_metrics_fallback_unknown(),
Expand All @@ -160,23 +171,23 @@ pub fn delta_commit_metrics_from_log_bytes_best_effort(
fn delta_commit_stats_to_output(
stats: DeltaCommitAddStats,
small_file_threshold_bytes: u64,
) -> (u64, Vec<String>, AcceptedWriteMetrics) {
let metrics = if stats.file_sizes.len() == stats.files_written as usize {
metrics::summarize_written_file_sizes(&stats.file_sizes, small_file_threshold_bytes)
} else {
null_accepted_write_metrics()
};
(stats.files_written, stats.part_files, metrics)
}

fn delta_commit_metrics_fallback_unknown() -> (u64, Vec<String>, AcceptedWriteMetrics) {
(0, Vec::new(), null_accepted_write_metrics())
) -> (
Option<u64>,
Vec<String>,
crate::io::format::AcceptedWriteMetrics,
) {
let metrics = metrics::summarize_written_file_sizes(
&stats.file_sizes,
stats.files_written,
small_file_threshold_bytes,
);
(Some(stats.files_written), stats.part_files, metrics)
}

fn null_accepted_write_metrics() -> AcceptedWriteMetrics {
AcceptedWriteMetrics {
total_bytes_written: None,
avg_file_size_mb: None,
small_files_count: None,
}
fn delta_commit_metrics_fallback_unknown() -> (
Option<u64>,
Vec<String>,
crate::io::format::AcceptedWriteMetrics,
) {
(None, Vec::new(), metrics::null_accepted_write_metrics())
}
8 changes: 6 additions & 2 deletions crates/floe-core/src/io/write/iceberg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ fn write_iceberg_table_with_remote_context(
))?;
result.perf.conversion_ms = Some(conversion_ms);
Ok(AcceptedWriteOutput {
files_written: result.files_written,
files_written: Some(result.files_written),
parts_written: result.files_written,
part_files: result.file_paths,
table_version: result.metadata_version,
Expand Down Expand Up @@ -356,7 +356,11 @@ async fn write_iceberg_table_async(
.await?;
}
let metrics_start = Instant::now();
let metrics = metrics::summarize_written_file_sizes(&file_sizes, small_file_threshold_bytes);
let metrics = metrics::summarize_written_file_sizes(
&file_sizes,
files_written,
small_file_threshold_bytes,
);
perf.metrics_read_ms = Some(metrics_start.elapsed().as_millis() as u64);

Ok(IcebergWriteResult {
Expand Down
21 changes: 17 additions & 4 deletions crates/floe-core/src/io/write/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,20 +11,33 @@ pub fn default_small_file_threshold_bytes(target_file_size_bytes: Option<u64>) -
}
}

pub fn null_accepted_write_metrics() -> AcceptedWriteMetrics {
AcceptedWriteMetrics {
total_bytes_written: None,
avg_file_size_mb: None,
small_files_count: None,
}
}

pub fn summarize_written_file_sizes(
file_sizes: &[u64],
files_written: u64,
small_file_threshold_bytes: u64,
) -> AcceptedWriteMetrics {
if file_sizes.is_empty() {
if files_written == 0 {
return AcceptedWriteMetrics {
total_bytes_written: None,
total_bytes_written: Some(0),
avg_file_size_mb: None,
small_files_count: None,
small_files_count: Some(0),
};
}

if file_sizes.len() != files_written as usize {
return null_accepted_write_metrics();
}

let total_bytes_written = file_sizes.iter().copied().sum::<u64>();
let avg_file_size_mb = (total_bytes_written as f64 / file_sizes.len() as f64) / BYTES_PER_MIB;
let avg_file_size_mb = (total_bytes_written as f64 / files_written as f64) / BYTES_PER_MIB;
let small_files_count = file_sizes
.iter()
.filter(|size| **size < small_file_threshold_bytes)
Expand Down
3 changes: 2 additions & 1 deletion crates/floe-core/src/io/write/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,11 +149,12 @@ impl AcceptedSinkAdapter for ParquetAcceptedAdapter {

let metrics = metrics::summarize_written_file_sizes(
&file_sizes,
parts_written,
runtime_options.small_file_threshold_bytes,
);

Ok(AcceptedWriteOutput {
files_written: parts_written,
files_written: Some(parts_written),
parts_written,
part_files,
table_version: None,
Expand Down
2 changes: 1 addition & 1 deletion crates/floe-core/src/report/entity.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ pub(crate) struct RunReportContext<'a> {
pub severity: report::Severity,
pub accepted_write_mode: config::WriteMode,
pub accepted_parts_written: u64,
pub accepted_files_written: u64,
pub accepted_files_written: Option<u64>,
pub accepted_part_files: Vec<String>,
pub accepted_table_version: Option<i64>,
pub accepted_snapshot_id: Option<i64>,
Expand Down
5 changes: 1 addition & 4 deletions crates/floe-core/src/report/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ pub struct AcceptedOutputSummary {
pub write_mode: Option<String>,
pub accepted_rows: u64,
#[serde(default)]
pub files_written: u64,
pub files_written: Option<u64>,
pub parts_written: u64,
#[serde(default)]
#[serde(skip_serializing_if = "Vec::is_empty")]
Expand All @@ -158,13 +158,10 @@ pub struct AcceptedOutputSummary {
#[serde(skip_serializing_if = "Option::is_none")]
pub iceberg_table: Option<String>,
#[serde(default)]
#[serde(skip_serializing_if = "Option::is_none")]
pub total_bytes_written: Option<u64>,
#[serde(default)]
#[serde(skip_serializing_if = "Option::is_none")]
pub avg_file_size_mb: Option<f64>,
#[serde(default)]
#[serde(skip_serializing_if = "Option::is_none")]
pub small_files_count: Option<u64>,
#[serde(default)]
#[serde(skip_serializing_if = "Vec::is_empty")]
Expand Down
3 changes: 2 additions & 1 deletion crates/floe-core/src/run/entity/accepted_write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use io::storage::Target;
#[derive(Debug, Default)]
pub(super) struct AcceptedWriteReportState {
pub(super) parts_written: u64,
pub(super) files_written: u64,
pub(super) files_written: Option<u64>,
pub(super) part_files: Vec<String>,
pub(super) table_version: Option<i64>,
pub(super) snapshot_id: Option<i64>,
Expand Down Expand Up @@ -131,6 +131,7 @@ pub(super) fn run_accepted_write_phase(

let mut accepted_write_report = AcceptedWriteReportState::for_entity(entity, write_mode);
if accepted_accum.is_empty() && write_mode != config::WriteMode::Overwrite {
accepted_write_report.files_written = Some(0);
return Ok(accepted_write_report);
}

Expand Down
Loading
Loading