From 6bea5a73fe3e40c8506b4bb5945848b33c96375e Mon Sep 17 00:00:00 2001
From: wiedld
Date: Tue, 23 Apr 2024 17:33:40 -0700
Subject: [PATCH] refactor: restore the ability to add kv metadata into the
 generated file sink

---
 datafusion/common/src/file_options/mod.rs          |  8 +++++---
 .../common/src/file_options/parquet_writer.rs      | 18 ++++++++++++------
 .../core/src/datasource/file_format/parquet.rs     |  9 +++++++--
 .../core/src/datasource/listing/table.rs           |  1 +
 .../core/src/datasource/physical_plan/mod.rs       |  4 ++++
 datafusion/core/src/physical_planner.rs            |  1 +
 .../proto/src/physical_plan/from_proto.rs          |  1 +
 .../tests/cases/roundtrip_physical_plan.rs         |  3 +++
 8 files changed, 34 insertions(+), 11 deletions(-)

diff --git a/datafusion/common/src/file_options/mod.rs b/datafusion/common/src/file_options/mod.rs
index a760619a7ba8..6771b9feb1c3 100644
--- a/datafusion/common/src/file_options/mod.rs
+++ b/datafusion/common/src/file_options/mod.rs
@@ -40,7 +40,7 @@ mod tests {
 
     use parquet::{
         basic::{Compression, Encoding, ZstdLevel},
-        file::properties::{EnabledStatistics, WriterVersion},
+        file::properties::{EnabledStatistics, WriterPropertiesBuilder, WriterVersion},
         schema::types::ColumnPath,
     };
 
@@ -79,7 +79,8 @@ mod tests {
         table_config.set_file_format(FileType::PARQUET);
         table_config.alter_with_string_hash_map(&option_map)?;
 
-        let parquet_options = ParquetWriterOptions::try_from(&table_config.parquet)?;
+        let parquet_options: ParquetWriterOptions =
+            WriterPropertiesBuilder::try_from(&table_config.parquet)?.into();
         let properties = parquet_options.writer_options();
 
         // Verify the expected options propagated down to parquet crate WriterProperties struct
@@ -184,7 +185,8 @@ mod tests {
         table_config.set_file_format(FileType::PARQUET);
         table_config.alter_with_string_hash_map(&option_map)?;
 
-        let parquet_options = ParquetWriterOptions::try_from(&table_config.parquet)?;
+        let parquet_options: ParquetWriterOptions =
+            WriterPropertiesBuilder::try_from(&table_config.parquet)?.into();
         let properties = parquet_options.writer_options();
 
         let col1 = ColumnPath::from(vec!["col1".to_owned()]);
diff --git a/datafusion/common/src/file_options/parquet_writer.rs b/datafusion/common/src/file_options/parquet_writer.rs
index f651ff932a5a..72aa952ff4ef 100644
--- a/datafusion/common/src/file_options/parquet_writer.rs
+++ b/datafusion/common/src/file_options/parquet_writer.rs
@@ -24,7 +24,9 @@ use crate::{
 
 use parquet::{
     basic::{BrotliLevel, GzipLevel, ZstdLevel},
-    file::properties::{EnabledStatistics, WriterProperties, WriterVersion},
+    file::properties::{
+        EnabledStatistics, WriterProperties, WriterPropertiesBuilder, WriterVersion,
+    },
     schema::types::ColumnPath,
 };
 
@@ -46,7 +48,7 @@ impl ParquetWriterOptions {
     }
 }
 
-impl TryFrom<&TableParquetOptions> for ParquetWriterOptions {
+impl TryFrom<&TableParquetOptions> for WriterPropertiesBuilder {
     type Error = DataFusionError;
 
     fn try_from(parquet_options: &TableParquetOptions) -> Result<Self> {
@@ -165,10 +167,14 @@ impl TryFrom<&TableParquetOptions> for ParquetWriterOptions {
             }
         }
 
-        // ParquetWriterOptions will have defaults for the remaining fields (e.g. key_value_metadata & sorting_columns)
-        Ok(ParquetWriterOptions {
-            writer_options: builder.build(),
-        })
+        // builder will have defaults for the remaining fields (e.g. key_value_metadata & sorting_columns)
+        Ok(builder)
+    }
+}
+
+impl From<WriterPropertiesBuilder> for ParquetWriterOptions {
+    fn from(value: WriterPropertiesBuilder) -> Self {
+        ParquetWriterOptions::new(value.build())
     }
 }
diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs
index bcf4e8a2c8e4..54927c566fe0 100644
--- a/datafusion/core/src/datasource/file_format/parquet.rs
+++ b/datafusion/core/src/datasource/file_format/parquet.rs
@@ -66,7 +66,7 @@ use parquet::arrow::{
 };
 use parquet::file::footer::{decode_footer, decode_metadata};
 use parquet::file::metadata::ParquetMetaData;
-use parquet::file::properties::WriterProperties;
+use parquet::file::properties::{WriterProperties, WriterPropertiesBuilder};
 use parquet::file::statistics::Statistics as ParquetStatistics;
 use parquet::file::writer::SerializedFileWriter;
 use parquet::format::FileMetaData;
@@ -645,7 +645,10 @@ impl DataSink for ParquetSink {
         data: SendableRecordBatchStream,
         context: &Arc<TaskContext>,
     ) -> Result<u64> {
-        let parquet_props = ParquetWriterOptions::try_from(&self.parquet_options)?;
+        let parquet_props: ParquetWriterOptions =
+            WriterPropertiesBuilder::try_from(&self.parquet_options)?
+                .set_key_value_metadata(self.config.key_value_metadata.clone())
+                .into();
 
         let object_store = context
             .runtime_env()
@@ -1862,6 +1865,7 @@ mod tests {
             output_schema: schema.clone(),
             table_partition_cols: vec![],
             overwrite: true,
+            key_value_metadata: None,
         };
         let parquet_sink = Arc::new(ParquetSink::new(
             file_sink_config,
@@ -1933,6 +1937,7 @@ mod tests {
             output_schema: schema.clone(),
             table_partition_cols: vec![("a".to_string(), DataType::Utf8)], // add partitioning
             overwrite: true,
+            key_value_metadata: None,
         };
         let parquet_sink = Arc::new(ParquetSink::new(
             file_sink_config,
diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs
index c1e337b5c44a..e54cb60e6e4a 100644
--- a/datafusion/core/src/datasource/listing/table.rs
+++ b/datafusion/core/src/datasource/listing/table.rs
@@ -759,6 +759,7 @@ impl TableProvider for ListingTable {
             output_schema: self.schema(),
             table_partition_cols: self.options.table_partition_cols.clone(),
             overwrite,
+            key_value_metadata: None,
         };
 
         let unsorted: Vec<Vec<PhysicalSortExpr>> = vec![];
diff --git a/datafusion/core/src/datasource/physical_plan/mod.rs b/datafusion/core/src/datasource/physical_plan/mod.rs
index ddb8d032f3d8..8532dd9bef15 100644
--- a/datafusion/core/src/datasource/physical_plan/mod.rs
+++ b/datafusion/core/src/datasource/physical_plan/mod.rs
@@ -32,6 +32,7 @@ pub(crate) use self::json::plan_to_json;
 #[cfg(feature = "parquet")]
 pub use self::parquet::{ParquetExec, ParquetFileMetrics, ParquetFileReaderFactory};
 
+use ::parquet::file::metadata::KeyValue;
 pub use arrow_file::ArrowExec;
 pub use avro::AvroExec;
 pub use csv::{CsvConfig, CsvExec, CsvOpener};
@@ -90,6 +91,9 @@ pub struct FileSinkConfig {
     pub table_partition_cols: Vec<(String, DataType)>,
     /// Controls whether existing data should be overwritten by this sink
     pub overwrite: bool,
+    /// Optional, additional metadata to be inserted into the key_value_metadata
+    /// for the written [`FileMetaData`](https://docs.rs/parquet/latest/parquet/file/metadata/struct.FileMetaData.html).
+    pub key_value_metadata: Option<Vec<KeyValue>>,
 }
 
 impl FileSinkConfig {
diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs
index 0a1730e944d3..62ae590db8be 100644
--- a/datafusion/core/src/physical_planner.rs
+++ b/datafusion/core/src/physical_planner.rs
@@ -595,6 +595,7 @@ impl DefaultPhysicalPlanner {
                     output_schema: Arc::new(schema),
                     table_partition_cols,
                     overwrite: false,
+                    key_value_metadata: None,
                 };
                 let mut table_options = session_state.default_table_options();
                 let sink_format: Arc<dyn FileFormat> = match format_options {
diff --git a/datafusion/proto/src/physical_plan/from_proto.rs b/datafusion/proto/src/physical_plan/from_proto.rs
index aaca4dc48236..9e4a1453e259 100644
--- a/datafusion/proto/src/physical_plan/from_proto.rs
+++ b/datafusion/proto/src/physical_plan/from_proto.rs
@@ -760,6 +760,7 @@ impl TryFrom<&protobuf::FileSinkConfig> for FileSinkConfig {
             output_schema: Arc::new(convert_required!(conf.output_schema)?),
             table_partition_cols,
             overwrite: conf.overwrite,
+            key_value_metadata: None,
         })
     }
 }
diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
index 4924128ae190..e22e0d7f8717 100644
--- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
+++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
@@ -873,6 +873,7 @@ fn roundtrip_json_sink() -> Result<()> {
         output_schema: schema.clone(),
         table_partition_cols: vec![("plan_type".to_string(), DataType::Utf8)],
         overwrite: true,
+        key_value_metadata: None,
     };
     let data_sink = Arc::new(JsonSink::new(
         file_sink_config,
@@ -908,6 +909,7 @@ fn roundtrip_csv_sink() -> Result<()> {
         output_schema: schema.clone(),
         table_partition_cols: vec![("plan_type".to_string(), DataType::Utf8)],
         overwrite: true,
+        key_value_metadata: None,
     };
     let data_sink = Arc::new(CsvSink::new(
         file_sink_config,
@@ -960,6 +962,7 @@ fn roundtrip_parquet_sink() -> Result<()> {
         output_schema: schema.clone(),
         table_partition_cols: vec![("plan_type".to_string(), DataType::Utf8)],
         overwrite: true,
+        key_value_metadata: None,
     };
     let data_sink = Arc::new(ParquetSink::new(
         file_sink_config,
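
Usage sketch (not part of the diff): the restored field is forwarded to the
parquet crate's WriterPropertiesBuilder::set_key_value_metadata, which is what
places the entries into the key_value_metadata of the written file's footer
FileMetaData. The snippet below exercises only that parquet API; the key and
value strings are illustrative, not anything DataFusion itself emits.

    use parquet::file::metadata::KeyValue;
    use parquet::file::properties::WriterProperties;

    fn main() {
        // Build writer properties carrying custom footer metadata; this is
        // the same builder call that ParquetSink::write_all now makes with
        // self.config.key_value_metadata.
        let props = WriterProperties::builder()
            .set_key_value_metadata(Some(vec![KeyValue {
                key: "example.origin".to_string(),            // illustrative key
                value: Some("parquet-sink-demo".to_string()), // illustrative value
            }]))
            .build();

        // The entries are queryable on the built properties and are
        // serialized into the footer when the writer closes the file.
        assert!(props.key_value_metadata().is_some());
    }

Of the sinks touched here, only ParquetSink consumes the new field; the JSON
and CSV call sites simply construct FileSinkConfig with key_value_metadata: None.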