diff --git a/datafusion-examples/examples/custom_file_format.rs b/datafusion-examples/examples/custom_file_format.rs
index 95168597ebaa..2172c1401630 100644
--- a/datafusion-examples/examples/custom_file_format.rs
+++ b/datafusion-examples/examples/custom_file_format.rs
@@ -105,6 +105,14 @@ impl FileFormat for TSVFileFormat {
         .await
     }
 
+    async fn infer_file_ordering(
+        &self,
+        _store: &Arc<dyn ObjectStore>,
+        _object: &ObjectMeta,
+    ) -> Option<String> {
+        None
+    }
+
     async fn create_physical_plan(
         &self,
         state: &SessionState,
diff --git a/datafusion/core/src/dataframe/mod.rs b/datafusion/core/src/dataframe/mod.rs
index 60a09301ae0f..2790ed1aefab 100644
--- a/datafusion/core/src/dataframe/mod.rs
+++ b/datafusion/core/src/dataframe/mod.rs
@@ -1977,7 +1977,13 @@ mod tests {
     use crate::prelude::{CsvReadOptions, NdJsonReadOptions, ParquetReadOptions};
 
     use arrow::array::Int32Array;
-    use datafusion_common::{assert_batches_eq, Constraint, Constraints, ScalarValue};
+    use arrow::util::pretty::pretty_format_batches;
+    use arrow_array::TimestampNanosecondArray;
+    use arrow_schema::TimeUnit;
+    use datafusion_common::{
+        assert_batches_eq, assert_contains, assert_not_contains, Constraint, Constraints,
+        ScalarValue,
+    };
     use datafusion_common_runtime::SpawnedTask;
     use datafusion_expr::expr::WindowFunction;
     use datafusion_expr::{
@@ -1989,6 +1995,7 @@ mod tests {
     use datafusion_functions_window::nth_value::first_value_udwf;
     use datafusion_physical_expr::expressions::Column;
     use datafusion_physical_plan::{get_plan_string, ExecutionPlanProperties};
+    use rand::Rng;
     use sqlparser::ast::NullTreatment;
     use tempfile::TempDir;
 
@@ -4136,11 +4143,6 @@ mod tests {
         let df = ctx.sql("SELECT * FROM data").await?;
         let results = df.collect().await?;
 
-        let df_explain = ctx.sql("explain SELECT a FROM data").await?;
-        let explain_result = df_explain.collect().await?;
-
-        println!("explain_result {:?}", explain_result);
-
         assert_batches_eq!(
             &[
                 "+---+---+",
@@ -4327,4 +4329,174 @@ mod tests {
         );
         Ok(())
     }
+
+    #[tokio::test]
+    async fn write_parquet_with_order_metadata() -> Result<()> {
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("id", DataType::Int64, false),
+            Field::new(
+                "timestamp",
+                DataType::Timestamp(TimeUnit::Nanosecond, None),
+                false,
+            ),
+        ]));
+
+        let tmp_dir = TempDir::new()?;
+
+        // Ordering inference only works when collect_statistics is enabled
+        let ctx = SessionContext::new_with_config(
+            SessionConfig::default()
+                .set_bool("datafusion.execution.collect_statistics", true),
+        );
+
+        // Write random data to parquet
+        let num_rows = 1000;
+        let mut rng = rand::thread_rng();
+        let ids: Vec<i64> = (0..num_rows).collect();
+        let timestamps: Vec<i64> = (0..num_rows)
+            .map(|_| rng.gen_range(1_700_000_000_000..1_800_000_000_000))
+            .collect();
+
+        let id_array = Arc::new(Int64Array::from(ids));
+        let timestamp_array = Arc::new(TimestampNanosecondArray::from(timestamps));
+
+        let batch =
+            RecordBatch::try_new(schema.clone(), vec![id_array, timestamp_array])?;
+
+        let file = tmp_dir.path().join("testSorted.parquet");
+        let write_df = ctx.read_batch(batch)?;
+
+        write_df
+            .clone()
+            .write_parquet(
+                file.to_str().unwrap(),
+                DataFrameWriteOptions::new()
+                    .with_sort_by(vec![col("timestamp").sort(true, false)]),
+                None,
+            )
+            .await?;
+
+        // Create the table without a WITH ORDER clause
+        let sql_str =
+            "create external table sortData(id INT, timestamp TIMESTAMP) stored as parquet location '"
+                .to_owned()
+                + file.to_str().unwrap()
+                + "'";
+
+        ctx.sql(sql_str.as_str()).await?.collect().await?;
+
+        let sql_result = ctx
+            .sql("SELECT * FROM sortData order by timestamp")
+            .await?
+            .explain(false, false)?
+            .collect()
+            .await?;
+
+        let formatted = pretty_format_batches(&sql_result).unwrap().to_string();
+        // Assert that the explain plan contains the output_ordering
+        assert_contains!(
+            formatted.as_str(),
+            "output_ordering=[timestamp@1 ASC NULLS LAST]"
+        );
+
+        // Assert that the plan contains no SortExec: the optimizer can remove the sort
+        assert_not_contains!(formatted.as_str(), "SortExec");
+
+        // Test the multi-column sort case
+        write_df
+            .clone()
+            .write_parquet(
+                file.to_str().unwrap(),
+                DataFrameWriteOptions::new().with_sort_by(vec![
+                    col("timestamp").sort(true, false),
+                    col("id").sort(true, false),
+                ]),
+                None,
+            )
+            .await?;
+
+        let sql_result = ctx
+            .sql("SELECT * FROM sortData")
+            .await?
+            .explain(false, false)?
+            .collect()
+            .await?;
+
+        let formatted = pretty_format_batches(&sql_result).unwrap().to_string();
+        // Assert that the explain plan contains the output_ordering
+        assert_contains!(
+            formatted.as_str(),
+            "output_ordering=[timestamp@1 ASC NULLS LAST, id@0 ASC NULLS LAST]"
+        );
+
+        // Assert that the plan contains no SortExec: the optimizer can remove the sort
+        assert_not_contains!(formatted.as_str(), "SortExec");
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn write_parquet_without_order_metadata() -> Result<()> {
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("id", DataType::Int64, false),
+            Field::new(
+                "timestamp",
+                DataType::Timestamp(TimeUnit::Nanosecond, None),
+                false,
+            ),
+        ]));
+
+        let tmp_dir = TempDir::new()?;
+
+        let ctx = SessionContext::new();
+
+        // Write random data to parquet
+        let num_rows = 1000;
+        let mut rng = rand::thread_rng();
+        let ids: Vec<i64> = (0..num_rows).collect();
+        let timestamps: Vec<i64> = (0..num_rows)
+            .map(|_| rng.gen_range(1_700_000_000_000..1_800_000_000_000))
+            .collect();
+
+        let id_array = Arc::new(Int64Array::from(ids));
+        let timestamp_array = Arc::new(TimestampNanosecondArray::from(timestamps));
+
+        let batch =
+            RecordBatch::try_new(schema.clone(), vec![id_array, timestamp_array])?;
+
+        let file = tmp_dir.path().join("testSorted.parquet");
+        let write_df = ctx.read_batch(batch)?;
+
+        write_df
+            .clone()
+            .write_parquet(file.to_str().unwrap(), DataFrameWriteOptions::new(), None)
+            .await?;
+
+        // Create the table without a WITH ORDER clause
+        let sql_str =
+            "create external table sortData(id INT, timestamp TIMESTAMP) stored as parquet location '"
+                .to_owned()
+                + file.to_str().unwrap()
+                + "'";
+
+        ctx.sql(sql_str.as_str()).await?.collect().await?;
+
+        let sql_result = ctx
+            .sql("SELECT * FROM sortData order by timestamp")
+            .await?
+            .explain(false, false)?
+            .collect()
+            .await?;
+
+        let formatted = pretty_format_batches(&sql_result).unwrap().to_string();
+        // Assert that the explain plan has no output_ordering,
+        // because collect_statistics is not enabled
+        assert_not_contains!(
+            formatted.as_str(),
+            "output_ordering=[timestamp@1 ASC NULLS LAST]"
+        );
+
+        // Assert that the plan contains a SortExec: without the sort metadata
+        // the optimizer cannot remove it
+        assert_contains!(formatted.as_str(), "SortExec");
+        Ok(())
+    }
 }
diff --git a/datafusion/core/src/datasource/file_format/arrow.rs b/datafusion/core/src/datasource/file_format/arrow.rs
index 2697e5621af3..859ee6378f0c 100644
--- a/datafusion/core/src/datasource/file_format/arrow.rs
+++ b/datafusion/core/src/datasource/file_format/arrow.rs
@@ -165,6 +165,15 @@ impl FileFormat for ArrowFormat {
         Ok(Statistics::new_unknown(&table_schema))
     }
 
+    async fn infer_file_ordering(
+        &self,
+        _store: &Arc<dyn ObjectStore>,
+        _object: &ObjectMeta,
+    ) -> Option<String> {
+        // TODO: inferring the ordering of Arrow files is not supported yet
+        None
+    }
+
     async fn create_physical_plan(
         &self,
         _state: &SessionState,
diff --git a/datafusion/core/src/datasource/file_format/avro.rs b/datafusion/core/src/datasource/file_format/avro.rs
index f854b9506a64..0428a58f2b38 100644
--- a/datafusion/core/src/datasource/file_format/avro.rs
+++ b/datafusion/core/src/datasource/file_format/avro.rs
@@ -134,6 +134,15 @@ impl FileFormat for AvroFormat {
         Ok(Arc::new(merged_schema))
     }
 
+    async fn infer_file_ordering(
+        &self,
+        _store: &Arc<dyn ObjectStore>,
+        _object: &ObjectMeta,
+    ) -> Option<String> {
+        // TODO: inferring the ordering of Avro files is not supported yet
+        None
+    }
+
     async fn infer_stats(
         &self,
         _state: &SessionState,
diff --git a/datafusion/core/src/datasource/file_format/csv.rs b/datafusion/core/src/datasource/file_format/csv.rs
index e8fb3690efbf..b82dedbf506a 100644
--- a/datafusion/core/src/datasource/file_format/csv.rs
+++ b/datafusion/core/src/datasource/file_format/csv.rs
@@ -406,6 +406,15 @@ impl FileFormat for CsvFormat {
         Ok(Statistics::new_unknown(&table_schema))
     }
 
+    async fn infer_file_ordering(
+        &self,
+        _store: &Arc<dyn ObjectStore>,
+        _object: &ObjectMeta,
+    ) -> Option<String> {
+        // Inferring the ordering of CSV files is not supported
+        None
+    }
+
     async fn create_physical_plan(
         &self,
         state: &SessionState,
diff --git a/datafusion/core/src/datasource/file_format/json.rs b/datafusion/core/src/datasource/file_format/json.rs
index 4bdf336881c9..53cc9a3f4091 100644
--- a/datafusion/core/src/datasource/file_format/json.rs
+++ b/datafusion/core/src/datasource/file_format/json.rs
@@ -242,6 +242,15 @@ impl FileFormat for JsonFormat {
         Ok(Statistics::new_unknown(&table_schema))
     }
 
+    async fn infer_file_ordering(
+        &self,
+        _store: &Arc<dyn ObjectStore>,
+        _object: &ObjectMeta,
+    ) -> Option<String> {
+        // Inferring the ordering of JSON files is not supported
+        None
+    }
+
     async fn create_physical_plan(
         &self,
         _state: &SessionState,
diff --git a/datafusion/core/src/datasource/file_format/mod.rs b/datafusion/core/src/datasource/file_format/mod.rs
index f47e2107ade6..a614e8ffe7c3 100644
--- a/datafusion/core/src/datasource/file_format/mod.rs
+++ b/datafusion/core/src/datasource/file_format/mod.rs
@@ -31,17 +31,16 @@ pub mod options;
 pub mod parquet;
 pub mod write;
 
-use std::any::Any;
-use std::collections::{HashMap, VecDeque};
-use std::fmt::{self, Debug, Display};
-use std::sync::Arc;
-use std::task::Poll;
-
 use crate::arrow::datatypes::SchemaRef;
 use crate::datasource::physical_plan::{FileScanConfig, FileSinkConfig};
 use crate::error::Result;
 use crate::execution::context::SessionState;
 use crate::physical_plan::{ExecutionPlan, Statistics};
+use std::any::Any;
+use std::collections::{HashMap, VecDeque};
+use std::fmt::{self, Debug, Display};
+use std::sync::Arc;
+use std::task::Poll;
 
 use arrow_array::RecordBatch;
 use arrow_schema::{ArrowError, DataType, Field, FieldRef, Schema};
@@ -123,6 +122,22 @@ pub trait FileFormat: Send + Sync + Debug {
         object: &ObjectMeta,
     ) -> Result<Statistics>;
 
+    /// Infers the file ordering for a given object store and object meta.
+    ///
+    /// # Arguments
+    ///
+    /// * `store` - A reference to the object store.
+    /// * `object` - A reference to the object meta.
+    ///
+    /// # Returns
+    ///
+    /// An optional string representing the file ordering, or `None` if no
+    /// ordering can be inferred.
+    async fn infer_file_ordering(
+        &self,
+        store: &Arc<dyn ObjectStore>,
+        object: &ObjectMeta,
+    ) -> Option<String>;
+
     /// Take a list of files and convert it to the appropriate executor
     /// according to this file format.
     async fn create_physical_plan(
@@ -398,6 +413,11 @@ pub fn file_type_to_format(
     }
 }
 
+/// Check if the file format is parquet
+pub fn is_file_parquet_format(file_format: &Arc<dyn FileFormat>) -> bool {
+    file_format.get_ext() == "parquet"
+}
+
 /// Create a new field with the specified data type, copying the other
 /// properties from the input field
 fn field_with_new_type(field: &FieldRef, new_type: DataType) -> FieldRef {
diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs
index 383fd6575234..c392160d5431 100644
--- a/datafusion/core/src/datasource/file_format/parquet.rs
+++ b/datafusion/core/src/datasource/file_format/parquet.rs
@@ -397,6 +397,37 @@ impl FileFormat for ParquetFormat {
         Ok(stats)
     }
 
+    async fn infer_file_ordering(
+        &self,
+        store: &Arc<dyn ObjectStore>,
+        object: &ObjectMeta,
+    ) -> Option<String> {
+        // Fetch metadata
+        let metadata =
+            fetch_parquet_metadata(store.as_ref(), object, self.metadata_size_hint())
+                .await
+                .map_err(|e| ParquetError::General(format!("Get_metadata error: {e}")))
+                .ok()?; // Return None on error instead of failing the scan
+
+        let file_metadata = metadata.file_metadata();
+
+        // Convert Parquet schema to Arrow schema
+        let file_schema = parquet_to_arrow_schema(
+            file_metadata.schema_descr(),
+            file_metadata.key_value_metadata(),
+        )
+        .map_err(|e| {
+            ParquetError::General(format!(
+                "Failed to convert Parquet schema to Arrow schema: {e}"
+            ))
+        })
+        .ok()?; // Return None on error instead of failing the scan
+
+        // Get "DATAFUSION_ORDER_BY" from the schema metadata
+        file_schema.metadata().get("DATAFUSION_ORDER_BY").cloned()
+    }
+
     async fn create_physical_plan(
         &self,
         _state: &SessionState,
diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs
index 791b15704d09..4bb1074444cb 100644
--- a/datafusion/core/src/datasource/listing/table.rs
+++ b/datafusion/core/src/datasource/listing/table.rs
@@ -17,11 +17,10 @@
 
 //! The table implementation.
 
-use std::collections::HashMap;
-use std::{any::Any, str::FromStr, sync::Arc};
-
 use super::helpers::{expr_applicable_for_cols, pruned_partition_list, split_files};
 use super::{ListingTableUrl, PartitionedFile};
+use std::collections::HashMap;
+use std::{any::Any, str::FromStr, sync::Arc};
 
 use crate::datasource::{
     create_ordering,
@@ -35,7 +34,7 @@ use crate::execution::context::SessionState;
 use datafusion_catalog::TableProvider;
 use datafusion_common::{config_err, DataFusionError, Result};
 use datafusion_expr::dml::InsertOp;
-use datafusion_expr::{utils::conjunction, Expr, TableProviderFilterPushDown};
+use datafusion_expr::{col, utils::conjunction, Expr, TableProviderFilterPushDown};
 use datafusion_expr::{SortExpr, TableType};
 use datafusion_physical_plan::{empty::EmptyExec, ExecutionPlan, Statistics};
 
@@ -54,6 +53,7 @@ use datafusion_physical_expr::{
 
 use async_trait::async_trait;
 use datafusion_catalog::Session;
+use datafusion_expr::expr::Sort;
 use datafusion_physical_expr_common::sort_expr::LexRequirement;
 use futures::{future, stream, StreamExt, TryStreamExt};
 use itertools::Itertools;
@@ -860,7 +860,60 @@ impl TableProvider for ListingTable {
             return Ok(Arc::new(EmptyExec::new(projected_schema)));
         }
 
-        let output_ordering = self.try_create_output_ordering()?;
+        let mut output_ordering = self.try_create_output_ordering()?;
+
+        let store = if let Some(url) = self.table_paths.first() {
+            Some(session_state.runtime_env().object_store(url)?)
+        } else {
+            None
+        };
+
+        // We only need the first file to infer the ordering
+        // TODO: this is a bit of a hack
+        let object_meta = match partitioned_file_lists.first() {
+            Some(file_list) => file_list.first().map(|file| &file.object_meta),
+            None => None,
+        };
+
+        // If the output ordering is empty, try to infer it from the file
+        if output_ordering.is_empty() && store.is_some() && object_meta.is_some() {
+            let sort_by_value = self
+                .options()
+                .format
+                .infer_file_ordering(&store.unwrap(), object_meta.unwrap())
+                .await;
+
+            if let Some(sort_by_value) = sort_by_value {
+                // Split the input into individual sort expressions separated by commas
+                let sort_expressions: Vec<&str> =
+                    sort_by_value.split(',').map(str::trim).collect();
+
+                let mut sort_order = vec![];
+
+                for sort_expr in sort_expressions {
+                    // Split each expression into components (e.g., "timestamp ASC NULLS LAST")
+                    let tokens: Vec<&str> = sort_expr.split_whitespace().collect();
+                    if tokens.is_empty() {
+                        continue; // Skip empty tokens
+                    }
+                    // Parse the expression, direction, and nulls ordering
+                    let expr = tokens[0].to_string();
+                    let asc = tokens
+                        .get(1)
+                        .map_or(true, |&t| t.eq_ignore_ascii_case("ASC")); // Default to ASC
+                    // "NULLS FIRST" is split into two tokens by split_whitespace,
+                    // so check both; default to NULLS LAST
+                    let nulls_first =
+                        tokens.get(2).zip(tokens.get(3)).map_or(false, |(&n, &f)| {
+                            n.eq_ignore_ascii_case("NULLS")
+                                && f.eq_ignore_ascii_case("FIRST")
+                        });
+
+                    // Create a Sort object
+                    let sort = Sort::new(col(expr), asc, nulls_first);
+                    sort_order.push(sort);
+                }
+
+                output_ordering = create_ordering(&self.table_schema, &[sort_order])?;
+            }
+        }
+
         match state
             .config_options()
             .execution
diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs
index 47b31d2f4e2d..fa2d652b36c4 100644
--- a/datafusion/core/src/physical_planner.rs
+++ b/datafusion/core/src/physical_planner.rs
@@ -21,7 +21,7 @@ use std::borrow::Cow;
 use std::collections::HashMap;
 use std::sync::Arc;
 
-use crate::datasource::file_format::file_type_to_format;
+use crate::datasource::file_format::{file_type_to_format, is_file_parquet_format};
 use crate::datasource::listing::ListingTableUrl;
 use crate::datasource::physical_plan::FileSinkConfig;
 use crate::datasource::source_as_provider;
@@ -65,6 +65,7 @@ use arrow::compute::SortOptions;
 use arrow::datatypes::{Schema, SchemaRef};
 use arrow_array::builder::StringBuilder;
 use arrow_array::RecordBatch;
+use async_trait::async_trait;
 use datafusion_common::display::ToStringifiedPlan;
 use datafusion_common::{
     exec_err, internal_datafusion_err, internal_err, not_impl_err, plan_err, DFSchema,
@@ -83,15 +84,14 @@ use datafusion_expr::{
 use datafusion_physical_expr::aggregate::{AggregateExprBuilder, AggregateFunctionExpr};
 use datafusion_physical_expr::expressions::Literal;
 use datafusion_physical_expr::LexOrdering;
+use datafusion_physical_optimizer::PhysicalOptimizerRule;
 use datafusion_physical_plan::placeholder_row::PlaceholderRowExec;
 use datafusion_physical_plan::unnest::ListUnnest;
 use datafusion_sql::utils::window_expr_common_partition_keys;
-
-use async_trait::async_trait;
-use datafusion_physical_optimizer::PhysicalOptimizerRule;
 use futures::{StreamExt, TryStreamExt};
 use itertools::{multiunzip, Itertools};
 use log::{debug, trace};
+use regex::Regex;
 use sqlparser::ast::NullTreatment;
 use tokio::sync::Mutex;
 
@@ -532,8 +532,28 @@ impl DefaultPhysicalPlanner {
                     keep_partition_by_columns,
                 };
 
+                let mut source_option_tuples = source_option_tuples.clone();
+
+                if is_file_parquet_format(file_type) {
+                    if let LogicalPlan::Sort(Sort { expr, .. }) = input.as_ref() {
+                        // Strip the table qualifier (e.g. `t.col` -> `col`)
+                        let re = Regex::new(r"^[^.]+\.(.*)$").unwrap();
+                        let sort_value = expr
+                            .iter()
+                            .map(|e| re.replace(e.to_string().as_str(), "$1").to_string())
+                            .collect::<Vec<String>>()
+                            .join(", ");
+
+                        source_option_tuples.insert(
+                            "format.metadata::DATAFUSION_ORDER_BY".to_string(),
+                            sort_value,
+                        );
+                    }
+                }
+
                 let sink_format = file_type_to_format(file_type)?
-                    .create(session_state, source_option_tuples)?;
+                    .create(session_state, &source_option_tuples)?;
 
                 sink_format
                     .create_writer_physical_plan(input_exec, session_state, config, None)
diff --git a/datafusion/sqllogictest/test_files/copy.slt b/datafusion/sqllogictest/test_files/copy.slt
index caa708483a11..de02a322e65d 100644
--- a/datafusion/sqllogictest/test_files/copy.slt
+++ b/datafusion/sqllogictest/test_files/copy.slt
@@ -632,3 +632,46 @@ COPY source_table to '/tmp/table.parquet' (row_group_size 55 + 102);
 # Copy using execution.keep_partition_by_columns with an invalid value
 query error DataFusion error: Invalid or Unsupported Configuration: provided value for 'execution.keep_partition_by_columns' was not recognized: "invalid_value"
 COPY source_table to '/tmp/table.parquet' OPTIONS (execution.keep_partition_by_columns invalid_value);
+
+# Copy from a table with an ORDER BY clause
+
+statement ok
+create table unsorted_source_table(col1 integer, col2 varchar) as values (8, 'Done'), (1, 'Foo'), (8, 'Cat'), (2, 'Bar');
+
+query TT
+EXPLAIN COPY (select * from unsorted_source_table order by col1, col2) to 'test_files/scratch/copy/sortTable/sorted.parquet' STORED AS PARQUET
+----
+logical_plan
+01)CopyTo: format=parquet output_url=test_files/scratch/copy/sortTable/sorted.parquet options: ()
+02)--Sort: unsorted_source_table.col1 ASC NULLS LAST, unsorted_source_table.col2 ASC NULLS LAST
+03)----TableScan: unsorted_source_table projection=[col1, col2]
+physical_plan
+01)DataSinkExec: sink=ParquetSink(file_groups=[])
+02)--SortExec: expr=[col1@0 ASC NULLS LAST, col2@1 ASC NULLS LAST], preserve_partitioning=[false]
+03)----MemoryExec: partitions=1, partition_sizes=[1]
+
+query I
+COPY (select * from unsorted_source_table order by col1, col2) to 'test_files/scratch/copy/sortTable/sorted.parquet' STORED AS PARQUET
+----
+4
+
+statement ok
+CREATE EXTERNAL TABLE sortedParquetTable STORED AS PARQUET
+LOCATION 'test_files/scratch/copy/sortTable/'
+
+query TT
+explain select * from sortedParquetTable;
+----
+logical_plan TableScan: sortedparquettable projection=[col1, col2]
+physical_plan ParquetExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/copy/sortTable/sorted.parquet]]}, projection=[col1, col2], output_ordering=[col1@0 ASC NULLS LAST, col2@1 ASC NULLS LAST]
+
+query IT
+select * from sortedParquetTable;
+----
+1 Foo
+2 Bar
+8 Cat
+8 Done