Patched DataFusion version 45.0.0 #54

Draft. Wants to merge 6 commits into base: base-df-upgrade-ver45.
1 change: 1 addition & 0 deletions datafusion-cli/Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion datafusion/common/src/config.rs
@@ -305,7 +305,7 @@ config_namespace! {
///
/// This is used to workaround bugs in the planner that are now caught by
/// the new schema verification step.
pub skip_physical_aggregate_schema_check: bool, default = false
pub skip_physical_aggregate_schema_check: bool, default = true

/// Specifies the reserved memory for each spillable sort operation to
/// facilitate an in-memory merge.
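The effect of flipping this default can be sketched with a minimal gate. This is a hypothetical illustration, not DataFusion's actual implementation: with the flag defaulting to `true`, a physical/logical schema mismatch no longer fails planning.

```rust
// Hypothetical sketch of how `skip_physical_aggregate_schema_check` gates the
// aggregate schema verification step.
fn verify_aggregate_schema(skip_check: bool, schemas_match: bool) -> Result<(), String> {
    if skip_check || schemas_match {
        Ok(())
    } else {
        Err("physical/logical aggregate input schemas differ".to_string())
    }
}

fn main() {
    // With the patched default (true), a mismatch is tolerated.
    assert!(verify_aggregate_schema(true, false).is_ok());
    // With the upstream default (false), the same mismatch is a planning error.
    assert!(verify_aggregate_schema(false, false).is_err());
    assert!(verify_aggregate_schema(false, true).is_ok());
}
```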
2 changes: 1 addition & 1 deletion datafusion/core/Cargo.toml
@@ -131,7 +131,7 @@ tempfile = { workspace = true }
tokio = { workspace = true }
tokio-util = { version = "0.7.4", features = ["io"], optional = true }
url = { workspace = true }
uuid = { version = "1.7", features = ["v4"] }
uuid = { version = "1.7", features = ["v4", "js"] }
xz2 = { version = "0.1", optional = true, features = ["static"] }
zstd = { version = "0.13", optional = true, default-features = false }

3 changes: 3 additions & 0 deletions datafusion/core/src/physical_planner.rs
@@ -693,6 +693,9 @@ impl DefaultPhysicalPlanner {
differences.push(format!("field nullability at index {} [{}]: (physical) {} vs (logical) {}", i, physical_field.name(), physical_field.is_nullable(), logical_field.is_nullable()));
}
}

log::warn!("Physical input schema should be the same as the one converted from logical input schema, but did not match for logical plan:\n{}", input.display_indent());

return internal_err!("Physical input schema should be the same as the one converted from logical input schema. Differences: {}", differences
.iter()
.map(|s| format!("\n\t- {}", s))
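The `internal_err!` call above is truncated by the diff view; it joins the collected differences into the error message. A standalone sketch of that formatting step, assuming the `differences` strings have already been built:

```rust
// Joins human-readable schema differences as indented bullet lines, matching
// the `.map(|s| format!("\n\t- {}", s))` pattern in the hunk above.
fn format_differences(differences: &[String]) -> String {
    differences
        .iter()
        .map(|s| format!("\n\t- {s}"))
        .collect::<String>()
}

fn main() {
    let diffs = vec![
        "field name at index 0".to_string(),
        "field nullability at index 2".to_string(),
    ];
    assert_eq!(
        format_differences(&diffs),
        "\n\t- field name at index 0\n\t- field nullability at index 2"
    );
}
```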
1 change: 1 addition & 0 deletions datafusion/optimizer/src/unwrap_cast_in_comparison.rs
@@ -25,6 +25,7 @@ use crate::optimizer::ApplyOrder;
use crate::{OptimizerConfig, OptimizerRule};

use crate::utils::NamePreserver;
#[allow(deprecated)]
use arrow::datatypes::{
DataType, TimeUnit, MAX_DECIMAL128_FOR_EACH_PRECISION,
MIN_DECIMAL128_FOR_EACH_PRECISION,
194 changes: 30 additions & 164 deletions datafusion/physical-expr/src/equivalence/properties.rs
@@ -2148,17 +2148,38 @@ fn calculate_union_binary(
})
.collect::<Vec<_>>();

// TEMP HACK WORKAROUND
// Revert code from https://github.com/apache/datafusion/pull/12562
// Context: https://github.com/apache/datafusion/issues/13748
// Context: https://github.com/influxdata/influxdb_iox/issues/13038

// Next, calculate valid orderings for the union by searching for prefixes
// in both sides.
let mut orderings = UnionEquivalentOrderingBuilder::new();
orderings.add_satisfied_orderings(lhs.normalized_oeq_class(), lhs.constants(), &rhs);
orderings.add_satisfied_orderings(rhs.normalized_oeq_class(), rhs.constants(), &lhs);
let orderings = orderings.build();

let mut eq_properties =
EquivalenceProperties::new(lhs.schema).with_constants(constants);

let mut orderings = vec![];
for mut ordering in lhs.normalized_oeq_class().into_iter() {
// Progressively shorten the ordering to search for a satisfied prefix:
while !rhs.ordering_satisfy(&ordering) {
ordering.pop();
}
// There is a non-trivial satisfied prefix, add it as a valid ordering:
if !ordering.is_empty() {
orderings.push(ordering);
}
}
for mut ordering in rhs.normalized_oeq_class().into_iter() {
// Progressively shorten the ordering to search for a satisfied prefix:
while !lhs.ordering_satisfy(&ordering) {
ordering.pop();
}
// There is a non-trivial satisfied prefix, add it as a valid ordering:
if !ordering.is_empty() {
orderings.push(ordering);
}
}
let mut eq_properties = EquivalenceProperties::new(lhs.schema);
eq_properties.constants = constants;
eq_properties.add_new_orderings(orderings);

Ok(eq_properties)
}

@@ -2204,6 +2225,7 @@ struct UnionEquivalentOrderingBuilder {
orderings: Vec<LexOrdering>,
}

#[expect(unused)]
impl UnionEquivalentOrderingBuilder {
fn new() -> Self {
Self { orderings: vec![] }
@@ -3552,134 +3574,6 @@ mod tests {
.run()
}

#[test]
fn test_union_equivalence_properties_constants_fill_gaps() {
let schema = create_test_schema().unwrap();
UnionEquivalenceTest::new(&schema)
.with_child_sort_and_const_exprs(
// First child orderings: [a ASC, c ASC], const [b]
vec![vec!["a", "c"]],
vec!["b"],
&schema,
)
.with_child_sort_and_const_exprs(
// Second child orderings: [b ASC, c ASC], const [a]
vec![vec!["b", "c"]],
vec!["a"],
&schema,
)
.with_expected_sort_and_const_exprs(
// Union orderings: [
// [a ASC, b ASC, c ASC],
// [b ASC, a ASC, c ASC]
// ], const []
vec![vec!["a", "b", "c"], vec!["b", "a", "c"]],
vec![],
)
.run()
}

#[test]
fn test_union_equivalence_properties_constants_no_fill_gaps() {
let schema = create_test_schema().unwrap();
UnionEquivalenceTest::new(&schema)
.with_child_sort_and_const_exprs(
// First child orderings: [a ASC, c ASC], const [d] // some other constant
vec![vec!["a", "c"]],
vec!["d"],
&schema,
)
.with_child_sort_and_const_exprs(
// Second child orderings: [b ASC, c ASC], const [a]
vec![vec!["b", "c"]],
vec!["a"],
&schema,
)
.with_expected_sort_and_const_exprs(
// Union orderings: [[a]] (only a is constant)
vec![vec!["a"]],
vec![],
)
.run()
}

#[test]
fn test_union_equivalence_properties_constants_fill_some_gaps() {
let schema = create_test_schema().unwrap();
UnionEquivalenceTest::new(&schema)
.with_child_sort_and_const_exprs(
// First child orderings: [c ASC], const [a, b] // some other constant
vec![vec!["c"]],
vec!["a", "b"],
&schema,
)
.with_child_sort_and_const_exprs(
// Second child orderings: [a DESC, b], const []
vec![vec!["a DESC", "b"]],
vec![],
&schema,
)
.with_expected_sort_and_const_exprs(
// Union orderings: [[a, b]] (can fill in the a/b with constants)
vec![vec!["a DESC", "b"]],
vec![],
)
.run()
}

#[test]
fn test_union_equivalence_properties_constants_fill_gaps_non_symmetric() {
let schema = create_test_schema().unwrap();
UnionEquivalenceTest::new(&schema)
.with_child_sort_and_const_exprs(
// First child orderings: [a ASC, c ASC], const [b]
vec![vec!["a", "c"]],
vec!["b"],
&schema,
)
.with_child_sort_and_const_exprs(
// Second child orderings: [b ASC, c ASC], const [a]
vec![vec!["b DESC", "c"]],
vec!["a"],
&schema,
)
.with_expected_sort_and_const_exprs(
// Union orderings: [
// [a ASC, b ASC, c ASC],
// [b ASC, a ASC, c ASC]
// ], const []
vec![vec!["a", "b DESC", "c"], vec!["b DESC", "a", "c"]],
vec![],
)
.run()
}

#[test]
fn test_union_equivalence_properties_constants_gap_fill_symmetric() {
let schema = create_test_schema().unwrap();
UnionEquivalenceTest::new(&schema)
.with_child_sort_and_const_exprs(
// First child: [a ASC, b ASC, d ASC], const [c]
vec![vec!["a", "b", "d"]],
vec!["c"],
&schema,
)
.with_child_sort_and_const_exprs(
// Second child: [a ASC, c ASC, d ASC], const [b]
vec![vec!["a", "c", "d"]],
vec!["b"],
&schema,
)
.with_expected_sort_and_const_exprs(
// Union orderings:
// [a, b, c, d]
// [a, c, b, d]
vec![vec!["a", "c", "b", "d"], vec!["a", "b", "c", "d"]],
vec![],
)
.run()
}

#[test]
fn test_union_equivalence_properties_constants_gap_fill_and_common() {
let schema = create_test_schema().unwrap();
@@ -3705,34 +3599,6 @@ mod tests {
.run()
}

#[test]
fn test_union_equivalence_properties_constants_middle_desc() {
let schema = create_test_schema().unwrap();
UnionEquivalenceTest::new(&schema)
.with_child_sort_and_const_exprs(
// NB `b DESC` in the first child
//
// First child: [a ASC, b DESC, d ASC], const [c]
vec![vec!["a", "b DESC", "d"]],
vec!["c"],
&schema,
)
.with_child_sort_and_const_exprs(
// Second child: [a ASC, c ASC, d ASC], const [b]
vec![vec!["a", "c", "d"]],
vec!["b"],
&schema,
)
.with_expected_sort_and_const_exprs(
// Union orderings:
// [a, b, d] (c constant)
// [a, c, d] (b constant)
vec![vec!["a", "c", "b DESC", "d"], vec!["a", "b DESC", "c", "d"]],
vec![],
)
.run()
}

// TODO tests with multiple constants

#[derive(Debug)]
10 changes: 10 additions & 0 deletions datafusion/physical-optimizer/src/sanity_checker.rs
@@ -32,6 +32,8 @@ use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
use datafusion_physical_expr::intervals::utils::{check_support, is_datatype_supported};
use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType};
use datafusion_physical_plan::joins::SymmetricHashJoinExec;
use datafusion_physical_plan::sorts::sort::SortExec;
use datafusion_physical_plan::union::UnionExec;
use datafusion_physical_plan::{get_plan_string, ExecutionPlanProperties};

use crate::PhysicalOptimizerRule;
@@ -135,6 +137,14 @@ pub fn check_plan_sanity(
plan.required_input_ordering(),
plan.required_input_distribution(),
) {
// TEMP HACK WORKAROUND https://github.com/apache/datafusion/issues/11492
if child.as_any().downcast_ref::<UnionExec>().is_some() {
continue;
}
if child.as_any().downcast_ref::<SortExec>().is_some() {
continue;
}

let child_eq_props = child.equivalence_properties();
if let Some(sort_req) = sort_req {
if !child_eq_props.ordering_satisfy_requirement(&sort_req) {
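The workaround above uses the standard `Any` downcast pattern to recognize specific node types. A self-contained sketch with stand-in node types (the trait and struct names here are illustrative, not the real `ExecutionPlan` API):

```rust
use std::any::Any;

// Stand-ins for physical plan nodes; in DataFusion the trait is `ExecutionPlan`.
trait PlanNode {
    fn as_any(&self) -> &dyn Any;
}

struct UnionExec;
struct SortExec;
struct FilterExec;

impl PlanNode for UnionExec {
    fn as_any(&self) -> &dyn Any { self }
}
impl PlanNode for SortExec {
    fn as_any(&self) -> &dyn Any { self }
}
impl PlanNode for FilterExec {
    fn as_any(&self) -> &dyn Any { self }
}

// Mirrors the hack: skip the ordering/distribution sanity check for
// UnionExec and SortExec children.
fn skip_sanity_check(child: &dyn PlanNode) -> bool {
    child.as_any().downcast_ref::<UnionExec>().is_some()
        || child.as_any().downcast_ref::<SortExec>().is_some()
}

fn main() {
    assert!(skip_sanity_check(&UnionExec));
    assert!(skip_sanity_check(&SortExec));
    assert!(!skip_sanity_check(&FilterExec));
}
```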
39 changes: 25 additions & 14 deletions datafusion/physical-plan/src/union.rs
@@ -499,31 +499,42 @@ pub fn can_interleave<T: Borrow<Arc<dyn ExecutionPlan>>>(
}

fn union_schema(inputs: &[Arc<dyn ExecutionPlan>]) -> SchemaRef {
let first_schema = inputs[0].schema();
// needs to handle n children, including children that have an empty projection or a different number of fields
let num_fields = inputs.iter().fold(0, |acc, input| {
std::cmp::max(acc, input.schema().fields().len())
});

let fields = (0..first_schema.fields().len())
let fields: Vec<Field> = (0..num_fields)
.map(|i| {
inputs
.iter()
.enumerate()
.map(|(input_idx, input)| {
let field = input.schema().field(i).clone();
let mut metadata = field.metadata().clone();
// collect fields for i
let field_options_for_i =
inputs.iter().enumerate().filter_map(|(input_idx, input)| {
let field = if input.schema().fields().len() <= i {
return None;
} else {
input.schema().field(i).clone()
};

// merge field metadata
let mut metadata = field.metadata().clone();
let other_metadatas = inputs
.iter()
.enumerate()
.filter(|(other_idx, _)| *other_idx != input_idx)
.filter(|(other_idx, other_input)| {
*other_idx != input_idx
&& other_input.schema().fields().len() > i
})
.flat_map(|(_, other_input)| {
other_input.schema().field(i).metadata().clone().into_iter()
});

metadata.extend(other_metadatas);
field.with_metadata(metadata)
})
Some(field.with_metadata(metadata))
});

// pick the first nullable field (if one exists)
field_options_for_i
.find_or_first(Field::is_nullable)
// We can unwrap here: `num_fields` is the maximum field count across all
// inputs, so for every i in (0..num_fields) at least one input contributes
// a field, making the iterator non-empty.
.unwrap()
})
.collect::<Vec<_>>();
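The per-field merge strategy above can be sketched with a simplified stand-in for arrow's `Field`. This is an illustration under stated assumptions, not the real arrow API: metadata from every input's field i is unioned (later entries overwrite earlier keys, as with `extend`), and a nullable variant is preferred, emulating itertools' `find_or_first(Field::is_nullable)` with plain `find`/`unwrap_or` from std.

```rust
use std::collections::HashMap;

// Simplified stand-in for arrow's `Field`: a name, nullability, and
// string-keyed metadata.
#[derive(Clone, Debug, PartialEq)]
struct Field {
    name: String,
    nullable: bool,
    metadata: HashMap<String, String>,
}

// Merge the candidates for one output field: union all metadata, then pick
// the first nullable candidate, falling back to the first candidate overall.
fn merge_fields(candidates: &[Field]) -> Field {
    let merged: HashMap<String, String> = candidates
        .iter()
        .flat_map(|f| f.metadata.clone())
        .collect();
    let mut field = candidates
        .iter()
        .find(|f| f.nullable)
        .unwrap_or(&candidates[0])
        .clone();
    field.metadata = merged;
    field
}

fn main() {
    let a = Field {
        name: "c0".into(),
        nullable: false,
        metadata: HashMap::from([("k1".to_string(), "v1".to_string())]),
    };
    let b = Field {
        name: "c0".into(),
        nullable: true,
        metadata: HashMap::from([("k2".to_string(), "v2".to_string())]),
    };
    let merged = merge_fields(&[a, b]);
    // The nullable variant wins, and metadata from both inputs is kept.
    assert!(merged.nullable);
    assert_eq!(merged.metadata.len(), 2);
}
```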
4 changes: 2 additions & 2 deletions datafusion/sqllogictest/test_files/information_schema.slt
@@ -223,7 +223,7 @@ datafusion.execution.parquet.writer_version 1.0
datafusion.execution.planning_concurrency 13
datafusion.execution.skip_partial_aggregation_probe_ratio_threshold 0.8
datafusion.execution.skip_partial_aggregation_probe_rows_threshold 100000
datafusion.execution.skip_physical_aggregate_schema_check false
datafusion.execution.skip_physical_aggregate_schema_check true
datafusion.execution.soft_max_rows_per_output_file 50000000
datafusion.execution.sort_in_place_threshold_bytes 1048576
datafusion.execution.sort_spill_reservation_bytes 10485760
@@ -317,7 +317,7 @@ datafusion.execution.parquet.writer_version 1.0 (writing) Sets parquet writer ve
datafusion.execution.planning_concurrency 13 Fan-out during initial physical planning. This is mostly use to plan `UNION` children in parallel. Defaults to the number of CPU cores on the system
datafusion.execution.skip_partial_aggregation_probe_ratio_threshold 0.8 Aggregation ratio (number of distinct groups / number of input rows) threshold for skipping partial aggregation. If the value is greater then partial aggregation will skip aggregation for further input
datafusion.execution.skip_partial_aggregation_probe_rows_threshold 100000 Number of input rows partial aggregation partition should process, before aggregation ratio check and trying to switch to skipping aggregation mode
datafusion.execution.skip_physical_aggregate_schema_check false When set to true, skips verifying that the schema produced by planning the input of `LogicalPlan::Aggregate` exactly matches the schema of the input plan. When set to false, if the schema does not match exactly (including nullability and metadata), a planning error will be raised. This is used to workaround bugs in the planner that are now caught by the new schema verification step.
datafusion.execution.skip_physical_aggregate_schema_check true When set to true, skips verifying that the schema produced by planning the input of `LogicalPlan::Aggregate` exactly matches the schema of the input plan. When set to false, if the schema does not match exactly (including nullability and metadata), a planning error will be raised. This is used to workaround bugs in the planner that are now caught by the new schema verification step.
datafusion.execution.soft_max_rows_per_output_file 50000000 Target number of rows in output files when writing multiple. This is a soft max, so it can be exceeded slightly. There also will be one file smaller than the limit if the total number of rows written is not roughly divisible by the soft max
datafusion.execution.sort_in_place_threshold_bytes 1048576 When sorting, below what size should data be concatenated and sorted in a single RecordBatch rather than sorted in batches and merged.
datafusion.execution.sort_spill_reservation_bytes 10485760 Specifies the reserved memory for each spillable sort operation to facilitate an in-memory merge. When a sort operation spills to disk, the in-memory data must be sorted and merged before being written to a file. This setting reserves a specific amount of memory for that in-memory sort/merge process. Note: This setting is irrelevant if the sort operation cannot spill (i.e., if there's no `DiskManager` configured).