Find a way to communicate the ordering of a file back with the existi… #13933

Closed · wants to merge 11 commits

8 changes: 8 additions & 0 deletions datafusion-examples/examples/custom_file_format.rs
@@ -105,6 +105,14 @@ impl FileFormat for TSVFileFormat {
.await
}

async fn infer_file_ordering(
&self,
_store: &Arc<dyn ObjectStore>,
_object: &ObjectMeta,
) -> Option<String> {
None
}

async fn create_physical_plan(
&self,
state: &SessionState,
184 changes: 178 additions & 6 deletions datafusion/core/src/dataframe/mod.rs
@@ -1977,7 +1977,13 @@ mod tests {

use crate::prelude::{CsvReadOptions, NdJsonReadOptions, ParquetReadOptions};
use arrow::array::Int32Array;
use datafusion_common::{assert_batches_eq, Constraint, Constraints, ScalarValue};
use arrow::util::pretty::pretty_format_batches;
use arrow_array::TimestampNanosecondArray;
use arrow_schema::TimeUnit;
use datafusion_common::{
assert_batches_eq, assert_contains, assert_not_contains, Constraint, Constraints,
ScalarValue,
};
use datafusion_common_runtime::SpawnedTask;
use datafusion_expr::expr::WindowFunction;
use datafusion_expr::{
@@ -1989,6 +1995,7 @@ mod tests {
use datafusion_functions_window::nth_value::first_value_udwf;
use datafusion_physical_expr::expressions::Column;
use datafusion_physical_plan::{get_plan_string, ExecutionPlanProperties};
use rand::Rng;
use sqlparser::ast::NullTreatment;
use tempfile::TempDir;

@@ -4136,11 +4143,6 @@ mod tests {
let df = ctx.sql("SELECT * FROM data").await?;
let results = df.collect().await?;

let df_explain = ctx.sql("explain SELECT a FROM data").await?;
let explain_result = df_explain.collect().await?;

println!("explain_result {:?}", explain_result);

assert_batches_eq!(
&[
"+---+---+",
@@ -4327,4 +4329,174 @@ mod tests {
);
Ok(())
}

#[tokio::test]
async fn write_parquet_with_order_metadata() -> Result<()> {
let schema = Arc::new(Schema::new(vec![
Field::new("id", DataType::Int64, false),
Field::new(
"timestamp",
DataType::Timestamp(TimeUnit::Nanosecond, None),
false,
),
]));

let tmp_dir = TempDir::new()?;

// Ordering inference only works when collect_statistics is enabled
let ctx = SessionContext::new_with_config(
SessionConfig::default()
.set_bool("datafusion.execution.collect_statistics", true),
);

// Write randomly generated data to Parquet
let num_rows = 1000;
let mut rng = rand::thread_rng();
let ids: Vec<i64> = (0..num_rows).collect();
let timestamps: Vec<i64> = (0..num_rows)
.map(|_| rng.gen_range(1_700_000_000_000..1_800_000_000_000))
.collect();

let id_array = Arc::new(Int64Array::from(ids));
let timestamp_array = Arc::new(TimestampNanosecondArray::from(timestamps));

let batch =
RecordBatch::try_new(schema.clone(), vec![id_array, timestamp_array])?;

let file = tmp_dir.path().join("testSorted.parquet");
let write_df = ctx.read_batch(batch)?;

write_df
.clone()
.write_parquet(
file.to_str().unwrap(),
DataFrameWriteOptions::new()
.with_sort_by(vec![col("timestamp").sort(true, false)]),
None,
)
.await?;

// Create the external table without a WITH ORDER clause
let sql_str =
"create external table sortData(id INT, timestamp TIMESTAMP) stored as parquet location'"
.to_owned()
+ file.to_str().unwrap()
+ "'";

ctx.sql(sql_str.as_str()).await?.collect().await?;

let sql_result = ctx
.sql("SELECT * FROM sortData order by timestamp")
.await?
.explain(false, false)?
.collect()
.await?;

let formatted = pretty_format_batches(&sql_result).unwrap().to_string();
// Assert we have the output_ordering in the explain plan
assert_contains!(
formatted.as_str(),
"output_ordering=[timestamp@1 ASC NULLS LAST]"
);

// Assert the plan contains no SortExec; the optimizer removes the sort using the inferred ordering
assert_not_contains!(formatted.as_str(), "SortExec");

// Test the multi-column sort case
write_df
.clone()
.write_parquet(
file.to_str().unwrap(),
DataFrameWriteOptions::new().with_sort_by(vec![
col("timestamp").sort(true, false),
col("id").sort(true, false),
]),
None,
)
.await?;

let sql_result = ctx
.sql("SELECT * FROM sortData")
.await?
.explain(false, false)?
.collect()
.await?;

let formatted = pretty_format_batches(&sql_result).unwrap().to_string();
// Assert we have the output_ordering in the explain plan
assert_contains!(
formatted.as_str(),
"output_ordering=[timestamp@1 ASC NULLS LAST, id@0 ASC NULLS LAST]"
);

// Assert the plan contains no SortExec; the optimizer removes the sort using the inferred ordering
assert_not_contains!(formatted.as_str(), "SortExec");
Ok(())
}

#[tokio::test]
async fn write_parquet_without_order_metadata() -> Result<()> {
let schema = Arc::new(Schema::new(vec![
Field::new("id", DataType::Int64, false),
Field::new(
"timestamp",
DataType::Timestamp(TimeUnit::Nanosecond, None),
false,
),
]));

let tmp_dir = TempDir::new()?;

let ctx = SessionContext::new();

// Write randomly generated data to Parquet
let num_rows = 1000;
let mut rng = rand::thread_rng();
let ids: Vec<i64> = (0..num_rows).collect();
let timestamps: Vec<i64> = (0..num_rows)
.map(|_| rng.gen_range(1_700_000_000_000..1_800_000_000_000))
.collect();

let id_array = Arc::new(Int64Array::from(ids));
let timestamp_array = Arc::new(TimestampNanosecondArray::from(timestamps));

let batch =
RecordBatch::try_new(schema.clone(), vec![id_array, timestamp_array])?;

let file = tmp_dir.path().join("testSorted.parquet");
let write_df = ctx.read_batch(batch)?;

write_df
.clone()
.write_parquet(file.to_str().unwrap(), DataFrameWriteOptions::new(), None)
.await?;

// Create the external table without a WITH ORDER clause
let sql_str =
"create external table sortData(id INT, timestamp TIMESTAMP) stored as parquet location'"
.to_owned()
+ file.to_str().unwrap()
+ "'";

ctx.sql(sql_str.as_str()).await?.collect().await?;

let sql_result = ctx
.sql("SELECT * FROM sortData order by timestamp")
.await?
.explain(false, false)?
.collect()
.await?;

let formatted = pretty_format_batches(&sql_result).unwrap().to_string();
// Assert the explain plan has no output_ordering because collect_statistics is not enabled
assert_not_contains!(
formatted.as_str(),
"output_ordering=[timestamp@1 ASC NULLS LAST]"
);

// Assert the plan contains a SortExec; without ordering metadata the optimizer cannot remove it
assert_contains!(formatted.as_str(), "SortExec");
Ok(())
}
}
9 changes: 9 additions & 0 deletions datafusion/core/src/datasource/file_format/arrow.rs
@@ -165,6 +165,15 @@ impl FileFormat for ArrowFormat {
Ok(Statistics::new_unknown(&table_schema))
}

async fn infer_file_ordering(
&self,
_store: &Arc<dyn ObjectStore>,
_object: &ObjectMeta,
) -> Option<String> {
// TODO: inferring file ordering for Arrow files is not yet supported
None
}

async fn create_physical_plan(
&self,
_state: &SessionState,
9 changes: 9 additions & 0 deletions datafusion/core/src/datasource/file_format/avro.rs
@@ -134,6 +134,15 @@ impl FileFormat for AvroFormat {
Ok(Arc::new(merged_schema))
}

async fn infer_file_ordering(
&self,
_store: &Arc<dyn ObjectStore>,
_object: &ObjectMeta,
) -> Option<String> {
// TODO: inferring file ordering for Avro files is not supported
None
}

async fn infer_stats(
&self,
_state: &SessionState,
9 changes: 9 additions & 0 deletions datafusion/core/src/datasource/file_format/csv.rs
@@ -406,6 +406,15 @@ impl FileFormat for CsvFormat {
Ok(Statistics::new_unknown(&table_schema))
}

async fn infer_file_ordering(
&self,
_store: &Arc<dyn ObjectStore>,
_object: &ObjectMeta,
) -> Option<String> {
// Inferring file ordering for CSV files is not supported
None
}

async fn create_physical_plan(
&self,
state: &SessionState,
9 changes: 9 additions & 0 deletions datafusion/core/src/datasource/file_format/json.rs
@@ -242,6 +242,15 @@ impl FileFormat for JsonFormat {
Ok(Statistics::new_unknown(&table_schema))
}

async fn infer_file_ordering(
&self,
_store: &Arc<dyn ObjectStore>,
_object: &ObjectMeta,
) -> Option<String> {
// Inferring file ordering for JSON files is not supported
None
}

async fn create_physical_plan(
&self,
_state: &SessionState,
32 changes: 26 additions & 6 deletions datafusion/core/src/datasource/file_format/mod.rs
@@ -31,17 +31,16 @@ pub mod options;
pub mod parquet;
pub mod write;

use std::any::Any;
use std::collections::{HashMap, VecDeque};
use std::fmt::{self, Debug, Display};
use std::sync::Arc;
use std::task::Poll;

use crate::arrow::datatypes::SchemaRef;
use crate::datasource::physical_plan::{FileScanConfig, FileSinkConfig};
use crate::error::Result;
use crate::execution::context::SessionState;
use crate::physical_plan::{ExecutionPlan, Statistics};
use std::any::Any;
use std::collections::{HashMap, VecDeque};
use std::fmt::{self, Debug, Display};
use std::sync::Arc;
use std::task::Poll;

use arrow_array::RecordBatch;
use arrow_schema::{ArrowError, DataType, Field, FieldRef, Schema};
@@ -123,6 +122,22 @@ pub trait FileFormat: Send + Sync + Debug {
object: &ObjectMeta,
) -> Result<Statistics>;

/// Infers the file ordering for a given object store and object meta.
///
/// # Arguments
///
/// * `store` - A reference to the object store.
/// * `object` - A reference to the object meta.
///
/// # Returns
///
/// An optional string describing the file ordering, e.g.
/// `"timestamp ASC NULLS LAST, id ASC NULLS LAST"`, or `None` if no
/// ordering could be inferred.
async fn infer_file_ordering(
&self,
store: &Arc<dyn ObjectStore>,
object: &ObjectMeta,
) -> Option<String>;

/// Take a list of files and convert it to the appropriate executor
/// according to this file format.
async fn create_physical_plan(
@@ -398,6 +413,11 @@ pub fn file_type_to_format(
}
}

/// Check if the file format is parquet
pub fn is_file_parquet_format(file_format: &Arc<dyn FileType>) -> bool {
file_format.get_ext() == "parquet"
}

/// Create a new field with the specified data type, copying the other
/// properties from the input field
fn field_with_new_type(field: &FieldRef, new_type: DataType) -> FieldRef {
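
A custom format can opt in by implementing the new trait method. A minimal sketch (method body only; the format and the hard-coded ordering string are illustrative, while the built-in non-Parquet formats above simply return None):

async fn infer_file_ordering(
    &self,
    _store: &Arc<dyn ObjectStore>,
    _object: &ObjectMeta,
) -> Option<String> {
    // This hypothetical format knows its ordering ahead of time
    Some("timestamp ASC NULLS LAST".to_string())
}
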
31 changes: 31 additions & 0 deletions datafusion/core/src/datasource/file_format/parquet.rs
@@ -397,6 +397,37 @@ impl FileFormat for ParquetFormat {
Ok(stats)
}

async fn infer_file_ordering(
&self,
store: &Arc<dyn ObjectStore>,
object: &ObjectMeta,
) -> Option<String> {
// Fetch metadata
let metadata =
fetch_parquet_metadata(store.as_ref(), object, self.metadata_size_hint())
.await
.map_err(|e| ParquetError::General(format!("Failed to fetch Parquet metadata: {e}")))
.ok()?; // Fall back to no inferred ordering instead of failing the scan

let file_metadata = metadata.file_metadata();

// Convert Parquet schema to Arrow schema
let file_schema = parquet_to_arrow_schema(
file_metadata.schema_descr(),
file_metadata.key_value_metadata(),
)
.map_err(|e| {
ParquetError::General(format!(
"Failed to convert Parquet schema to Arrow schema: {e}"
))
})
.ok()?; // Fall back to no inferred ordering on conversion errors

// Look up the "DATAFUSION_ORDER_BY" key in the Arrow schema metadata
file_schema.metadata().get("DATAFUSION_ORDER_BY").cloned()
}

async fn create_physical_plan(
&self,
_state: &SessionState,
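
To sanity-check the round trip outside of DataFusion, a written file can be inspected with the parquet crate. This is a sketch that assumes the writer stored DATAFUSION_ORDER_BY as key-value metadata which the Arrow schema conversion surfaces as schema metadata, and that it runs in a Result-returning context; the path is illustrative:

use std::fs::File;
use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;

// Open the file produced by write_parquet and read the ordering key, if any
let file = File::open("/tmp/testSorted.parquet")?;
let builder = ParquetRecordBatchReaderBuilder::try_new(file)?;
let order_by = builder.schema().metadata().get("DATAFUSION_ORDER_BY").cloned();
println!("DATAFUSION_ORDER_BY = {order_by:?}");
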
63 changes: 58 additions & 5 deletions datafusion/core/src/datasource/listing/table.rs
@@ -17,11 +17,10 @@

//! The table implementation.
use std::collections::HashMap;
use std::{any::Any, str::FromStr, sync::Arc};

use super::helpers::{expr_applicable_for_cols, pruned_partition_list, split_files};
use super::{ListingTableUrl, PartitionedFile};
use std::collections::HashMap;
use std::{any::Any, str::FromStr, sync::Arc};

use crate::datasource::{
create_ordering,
@@ -35,7 +34,7 @@ use crate::execution::context::SessionState;
use datafusion_catalog::TableProvider;
use datafusion_common::{config_err, DataFusionError, Result};
use datafusion_expr::dml::InsertOp;
use datafusion_expr::{utils::conjunction, Expr, TableProviderFilterPushDown};
use datafusion_expr::{col, utils::conjunction, Expr, TableProviderFilterPushDown};
use datafusion_expr::{SortExpr, TableType};
use datafusion_physical_plan::{empty::EmptyExec, ExecutionPlan, Statistics};

@@ -54,6 +53,7 @@ use datafusion_physical_expr::{

use async_trait::async_trait;
use datafusion_catalog::Session;
use datafusion_expr::expr::Sort;
use datafusion_physical_expr_common::sort_expr::LexRequirement;
use futures::{future, stream, StreamExt, TryStreamExt};
use itertools::Itertools;
@@ -860,7 +860,60 @@ impl TableProvider for ListingTable {
return Ok(Arc::new(EmptyExec::new(projected_schema)));
}

let output_ordering = self.try_create_output_ordering()?;
let mut output_ordering = self.try_create_output_ordering()?;

let store = if let Some(url) = self.table_paths.first() {
Some(session_state.runtime_env().object_store(url)?)
} else {
None
};

// Only the first file is inspected to infer the ordering
// TODO: this is a bit of a hack
let object_meta = match partitioned_file_lists.first() {
Some(file_list) => file_list.first().map(|file| &file.object_meta),
None => None,
};

// If the output ordering is empty, try to infer it from the file
if output_ordering.is_empty() && store.is_some() && object_meta.is_some() {
let sort_by_value = self
.options()
.format
.infer_file_ordering(&store.unwrap(), object_meta.unwrap())
.await;

if let Some(sort_by_value) = sort_by_value {
// Split the input into individual sort expressions separated by commas
let sort_expressions: Vec<&str> =
sort_by_value.split(',').map(str::trim).collect();

let mut sort_order = vec![];

for sort_expr in sort_expressions {
// Split each expression into components (e.g., "timestamp ASC NULLS LAST")
let tokens: Vec<&str> = sort_expr.split_whitespace().collect();
if tokens.is_empty() {
continue; // Skip empty tokens
}
// Parse the expression, direction, and nulls ordering
let expr = tokens[0].to_string();
let asc = tokens
.get(1)
.map_or(true, |&t| t.eq_ignore_ascii_case("ASC")); // Default to ASC
// "NULLS FIRST" spans two whitespace-separated tokens; default to NULLS LAST
let nulls_first = tokens.get(2).zip(tokens.get(3)).map_or(false, |(&n, &f)| {
n.eq_ignore_ascii_case("NULLS") && f.eq_ignore_ascii_case("FIRST")
});

// Create a Sort object
let sort = Sort::new(col(expr), asc, nulls_first);
sort_order.push(sort);
}

output_ordering = create_ordering(&self.table_schema, &[sort_order])?
}
}

match state
.config_options()
.execution
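
For example, a stored value of "timestamp ASC NULLS LAST, id ASC NULLS LAST" is split on commas and whitespace into two sort expressions, equivalent to this sketch:

use datafusion_expr::{col, expr::Sort};

// What the parsing above produces for "timestamp ASC NULLS LAST, id ASC NULLS LAST"
let sort_order = vec![
    Sort::new(col("timestamp"), /* asc */ true, /* nulls_first */ false),
    Sort::new(col("id"), true, false),
];
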
30 changes: 25 additions & 5 deletions datafusion/core/src/physical_planner.rs
@@ -21,7 +21,7 @@ use std::borrow::Cow;
use std::collections::HashMap;
use std::sync::Arc;

use crate::datasource::file_format::file_type_to_format;
use crate::datasource::file_format::{file_type_to_format, is_file_parquet_format};
use crate::datasource::listing::ListingTableUrl;
use crate::datasource::physical_plan::FileSinkConfig;
use crate::datasource::source_as_provider;
@@ -65,6 +65,7 @@ use arrow::compute::SortOptions;
use arrow::datatypes::{Schema, SchemaRef};
use arrow_array::builder::StringBuilder;
use arrow_array::RecordBatch;
use async_trait::async_trait;
use datafusion_common::display::ToStringifiedPlan;
use datafusion_common::{
exec_err, internal_datafusion_err, internal_err, not_impl_err, plan_err, DFSchema,
@@ -83,15 +84,14 @@ use datafusion_expr::{
use datafusion_physical_expr::aggregate::{AggregateExprBuilder, AggregateFunctionExpr};
use datafusion_physical_expr::expressions::Literal;
use datafusion_physical_expr::LexOrdering;
use datafusion_physical_optimizer::PhysicalOptimizerRule;
use datafusion_physical_plan::placeholder_row::PlaceholderRowExec;
use datafusion_physical_plan::unnest::ListUnnest;
use datafusion_sql::utils::window_expr_common_partition_keys;

use async_trait::async_trait;
use datafusion_physical_optimizer::PhysicalOptimizerRule;
use futures::{StreamExt, TryStreamExt};
use itertools::{multiunzip, Itertools};
use log::{debug, trace};
use regex::Regex;
use sqlparser::ast::NullTreatment;
use tokio::sync::Mutex;

@@ -532,8 +532,28 @@ impl DefaultPhysicalPlanner {
keep_partition_by_columns,
};

let mut source_option_tuples = source_option_tuples.clone();

if is_file_parquet_format(file_type) {
if let LogicalPlan::Sort(Sort { expr, .. }) = input.as_ref() {
// Strip the table qualifier from each sort expression, e.g.
// "unsorted_source_table.col1 ASC NULLS LAST" -> "col1 ASC NULLS LAST"
let re = Regex::new(r"^[^.]+\.(.*)$").unwrap();
let sort_value = expr
.iter()
.map(|e| re.replace(e.to_string().as_str(), "$1").to_string())
.collect::<Vec<String>>()
.join(", ");

source_option_tuples.insert(
"format.metadata::DATAFUSION_ORDER_BY".to_string(),
sort_value,
);
}
}

let sink_format = file_type_to_format(file_type)?
.create(session_state, source_option_tuples)?;
.create(session_state, &source_option_tuples)?;

sink_format
.create_writer_physical_plan(input_exec, session_state, config, None)
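
A quick check of what the qualifier-stripping regex does (sketch, assuming sort expressions display with a single table qualifier):

use regex::Regex;

let re = Regex::new(r"^[^.]+\.(.*)$").unwrap();
assert_eq!(
    re.replace("unsorted_source_table.col1 ASC NULLS LAST", "$1"),
    "col1 ASC NULLS LAST"
);
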
43 changes: 43 additions & 0 deletions datafusion/sqllogictest/test_files/copy.slt
@@ -632,3 +632,46 @@ COPY source_table to '/tmp/table.parquet' (row_group_size 55 + 102);
# Copy using execution.keep_partition_by_columns with an invalid value
query error DataFusion error: Invalid or Unsupported Configuration: provided value for 'execution.keep_partition_by_columns' was not recognized: "invalid_value"
COPY source_table to '/tmp/table.parquet' OPTIONS (execution.keep_partition_by_columns invalid_value);



# Copy from a table with an ORDER BY clause

statement ok
create table unsorted_source_table(col1 integer, col2 varchar) as values (8, 'Done'), (1, 'Foo'), (8, 'Cat'), (2, 'Bar');

query TT
EXPLAIN COPY (select * from unsorted_source_table order by col1, col2) to 'test_files/scratch/copy/sortTable/sorted.parquet' STORED AS PARQUET
----
logical_plan
01)CopyTo: format=parquet output_url=test_files/scratch/copy/sortTable/sorted.parquet options: ()
02)--Sort: unsorted_source_table.col1 ASC NULLS LAST, unsorted_source_table.col2 ASC NULLS LAST
03)----TableScan: unsorted_source_table projection=[col1, col2]
physical_plan
01)DataSinkExec: sink=ParquetSink(file_groups=[])
02)--SortExec: expr=[col1@0 ASC NULLS LAST, col2@1 ASC NULLS LAST], preserve_partitioning=[false]
03)----MemoryExec: partitions=1, partition_sizes=[1]

query I
COPY (select * from unsorted_source_table order by col1, col2) to 'test_files/scratch/copy/sortTable/sorted.parquet' STORED AS PARQUET
----
4

statement ok
CREATE EXTERNAL TABLE sortedParquetTable STORED AS PARQUET
LOCATION 'test_files/scratch/copy/sortTable/'

query TT
explain select * from sortedParquetTable;
----
logical_plan TableScan: sortedparquettable projection=[col1, col2]
physical_plan ParquetExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/copy/sortTable/sorted.parquet]]}, projection=[col1, col2], output_ordering=[col1@0 ASC NULLS LAST, col2@1 ASC NULLS LAST]


query IT
select * from sortedParquetTable;
----
1 Foo
2 Bar
8 Cat
8 Done