refactor: restore the ability to add kv metadata into the generated file sink
wiedld committed Apr 24, 2024
1 parent 391e074 commit 6bea5a7
Showing 8 changed files with 34 additions and 11 deletions.
8 changes: 5 additions & 3 deletions datafusion/common/src/file_options/mod.rs
@@ -40,7 +40,7 @@ mod tests {

     use parquet::{
         basic::{Compression, Encoding, ZstdLevel},
-        file::properties::{EnabledStatistics, WriterVersion},
+        file::properties::{EnabledStatistics, WriterPropertiesBuilder, WriterVersion},
         schema::types::ColumnPath,
     };

@@ -79,7 +79,8 @@ mod tests {
         table_config.set_file_format(FileType::PARQUET);
         table_config.alter_with_string_hash_map(&option_map)?;

-        let parquet_options = ParquetWriterOptions::try_from(&table_config.parquet)?;
+        let parquet_options: ParquetWriterOptions =
+            WriterPropertiesBuilder::try_from(&table_config.parquet)?.into();
         let properties = parquet_options.writer_options();

         // Verify the expected options propagated down to parquet crate WriterProperties struct
@@ -184,7 +185,8 @@ mod tests {
         table_config.set_file_format(FileType::PARQUET);
         table_config.alter_with_string_hash_map(&option_map)?;

-        let parquet_options = ParquetWriterOptions::try_from(&table_config.parquet)?;
+        let parquet_options: ParquetWriterOptions =
+            WriterPropertiesBuilder::try_from(&table_config.parquet)?.into();
         let properties = parquet_options.writer_options();

         let col1 = ColumnPath::from(vec!["col1".to_owned()]);
18 changes: 12 additions & 6 deletions datafusion/common/src/file_options/parquet_writer.rs
@@ -24,7 +24,9 @@ use crate::{

 use parquet::{
     basic::{BrotliLevel, GzipLevel, ZstdLevel},
-    file::properties::{EnabledStatistics, WriterProperties, WriterVersion},
+    file::properties::{
+        EnabledStatistics, WriterProperties, WriterPropertiesBuilder, WriterVersion,
+    },
     schema::types::ColumnPath,
 };

@@ -46,7 +48,7 @@ impl ParquetWriterOptions {
     }
 }

-impl TryFrom<&TableParquetOptions> for ParquetWriterOptions {
+impl TryFrom<&TableParquetOptions> for WriterPropertiesBuilder {
     type Error = DataFusionError;

     fn try_from(parquet_options: &TableParquetOptions) -> Result<Self> {
@@ -165,10 +167,14 @@ impl TryFrom<&TableParquetOptions> for ParquetWriterOptions {
             }
         }

-        // ParquetWriterOptions will have defaults for the remaining fields (e.g. key_value_metadata & sorting_columns)
-        Ok(ParquetWriterOptions {
-            writer_options: builder.build(),
-        })
+        // builder will have defaults for the remaining fields (e.g. key_value_metadata & sorting_columns)
+        Ok(builder)
     }
 }
+
+impl From<WriterPropertiesBuilder> for ParquetWriterOptions {
+    fn from(value: WriterPropertiesBuilder) -> Self {
+        ParquetWriterOptions::new(value.build())
+    }
+}

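Together, the changes in this file split the conversion into two stages: TableParquetOptions becomes a WriterPropertiesBuilder, and the builder is only then frozen into ParquetWriterOptions. That builder stage is the seam that lets a sink inject key/value metadata before the properties are built. A minimal sketch of the call pattern this enables (the helper is hypothetical and the datafusion_common import paths are assumptions, not shown in this diff):

use datafusion_common::config::TableParquetOptions;
use datafusion_common::file_options::parquet_writer::ParquetWriterOptions;
use datafusion_common::Result;
use parquet::file::metadata::KeyValue;
use parquet::file::properties::WriterPropertiesBuilder;

// Hypothetical helper: table options become a builder, extra metadata is
// layered on, and the builder is then frozen into ParquetWriterOptions.
fn props_with_metadata(opts: &TableParquetOptions) -> Result<ParquetWriterOptions> {
    let builder = WriterPropertiesBuilder::try_from(opts)?
        // set_key_value_metadata replaces any metadata already on the builder.
        .set_key_value_metadata(Some(vec![KeyValue::new(
            "origin".to_owned(),
            "datafusion".to_owned(),
        )]));
    Ok(builder.into())
}
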
9 changes: 7 additions & 2 deletions datafusion/core/src/datasource/file_format/parquet.rs
@@ -66,7 +66,7 @@ use parquet::arrow::{
 };
 use parquet::file::footer::{decode_footer, decode_metadata};
 use parquet::file::metadata::ParquetMetaData;
-use parquet::file::properties::WriterProperties;
+use parquet::file::properties::{WriterProperties, WriterPropertiesBuilder};
 use parquet::file::statistics::Statistics as ParquetStatistics;
 use parquet::file::writer::SerializedFileWriter;
 use parquet::format::FileMetaData;
@@ -645,7 +645,10 @@ impl DataSink for ParquetSink {
         data: SendableRecordBatchStream,
         context: &Arc<TaskContext>,
     ) -> Result<u64> {
-        let parquet_props = ParquetWriterOptions::try_from(&self.parquet_options)?;
+        let parquet_props: ParquetWriterOptions =
+            WriterPropertiesBuilder::try_from(&self.parquet_options)?
+                .set_key_value_metadata(self.config.key_value_metadata.clone())
+                .into();

         let object_store = context
             .runtime_env()
@@ -1862,6 +1865,7 @@ mod tests {
             output_schema: schema.clone(),
             table_partition_cols: vec![],
             overwrite: true,
+            key_value_metadata: None,
         };
         let parquet_sink = Arc::new(ParquetSink::new(
             file_sink_config,
@@ -1933,6 +1937,7 @@ mod tests {
             output_schema: schema.clone(),
             table_partition_cols: vec![("a".to_string(), DataType::Utf8)], // add partitioning
             overwrite: true,
+            key_value_metadata: None,
         };
         let parquet_sink = Arc::new(ParquetSink::new(
             file_sink_config,
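The write_all change above is where the new FileSinkConfig::key_value_metadata is layered onto the table-level writer options. One way to confirm the metadata actually reaches a written file is to read its footer back with the parquet crate; a small sketch (the path argument and printing are illustrative only):

use std::fs::File;
use parquet::file::reader::{FileReader, SerializedFileReader};

// Open a finished parquet file and print the footer's key/value metadata,
// i.e. the pairs attached through the writer properties above.
fn print_kv_metadata(path: &str) -> Result<(), Box<dyn std::error::Error>> {
    let reader = SerializedFileReader::new(File::open(path)?)?;
    if let Some(kvs) = reader.metadata().file_metadata().key_value_metadata() {
        for kv in kvs {
            println!("{} = {:?}", kv.key, kv.value);
        }
    }
    Ok(())
}
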
1 change: 1 addition & 0 deletions datafusion/core/src/datasource/listing/table.rs
@@ -759,6 +759,7 @@ impl TableProvider for ListingTable {
             output_schema: self.schema(),
             table_partition_cols: self.options.table_partition_cols.clone(),
             overwrite,
+            key_value_metadata: None,
         };

         let unsorted: Vec<Vec<Expr>> = vec![];
4 changes: 4 additions & 0 deletions datafusion/core/src/datasource/physical_plan/mod.rs
@@ -32,6 +32,7 @@ pub(crate) use self::json::plan_to_json;
 #[cfg(feature = "parquet")]
 pub use self::parquet::{ParquetExec, ParquetFileMetrics, ParquetFileReaderFactory};

+use ::parquet::file::metadata::KeyValue;
 pub use arrow_file::ArrowExec;
 pub use avro::AvroExec;
 pub use csv::{CsvConfig, CsvExec, CsvOpener};
@@ -90,6 +91,9 @@ pub struct FileSinkConfig {
     pub table_partition_cols: Vec<(String, DataType)>,
     /// Controls whether existing data should be overwritten by this sink
     pub overwrite: bool,
+    /// Optional, additional metadata to be inserted into the key_value_metadata
+    /// for the written [`FileMetaData`](https://docs.rs/parquet/latest/parquet/file/metadata/struct.FileMetaData.html).
+    pub key_value_metadata: Option<Vec<KeyValue>>,
 }

 impl FileSinkConfig {
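Every construction site in the files below fills the new field with key_value_metadata: None, so default behavior is unchanged. A caller opting in would build the field along these lines (keys and values are invented for illustration):

use parquet::file::metadata::KeyValue;

// Build the optional metadata the new field carries; each entry ends up as
// a key/value pair in the written file's footer FileMetaData.
fn example_metadata(job_id: &str) -> Option<Vec<KeyValue>> {
    Some(vec![
        KeyValue::new("job_id".to_owned(), job_id.to_owned()),
        // A None value is allowed: the key is written without a value.
        KeyValue::new("reviewed".to_owned(), None::<String>),
    ])
}
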
1 change: 1 addition & 0 deletions datafusion/core/src/physical_planner.rs
@@ -595,6 +595,7 @@ impl DefaultPhysicalPlanner {
             output_schema: Arc::new(schema),
             table_partition_cols,
             overwrite: false,
+            key_value_metadata: None,
         };
         let mut table_options = session_state.default_table_options();
         let sink_format: Arc<dyn FileFormat> = match format_options {
1 change: 1 addition & 0 deletions datafusion/proto/src/physical_plan/from_proto.rs
@@ -760,6 +760,7 @@ impl TryFrom<&protobuf::FileSinkConfig> for FileSinkConfig {
             output_schema: Arc::new(convert_required!(conf.output_schema)?),
             table_partition_cols,
             overwrite: conf.overwrite,
+            key_value_metadata: None,
         })
     }
 }
3 changes: 3 additions & 0 deletions datafusion/proto/tests/cases/roundtrip_physical_plan.rs
@@ -873,6 +873,7 @@ fn roundtrip_json_sink() -> Result<()> {
         output_schema: schema.clone(),
         table_partition_cols: vec![("plan_type".to_string(), DataType::Utf8)],
         overwrite: true,
+        key_value_metadata: None,
     };
     let data_sink = Arc::new(JsonSink::new(
         file_sink_config,
@@ -908,6 +909,7 @@ fn roundtrip_csv_sink() -> Result<()> {
         output_schema: schema.clone(),
         table_partition_cols: vec![("plan_type".to_string(), DataType::Utf8)],
         overwrite: true,
+        key_value_metadata: None,
     };
     let data_sink = Arc::new(CsvSink::new(
         file_sink_config,
@@ -960,6 +962,7 @@ fn roundtrip_parquet_sink() -> Result<()> {
         output_schema: schema.clone(),
         table_partition_cols: vec![("plan_type".to_string(), DataType::Utf8)],
         overwrite: true,
+        key_value_metadata: None,
     };
     let data_sink = Arc::new(ParquetSink::new(
         file_sink_config,
