diff --git a/datafusion/physical-plan/src/analyze.rs b/datafusion/physical-plan/src/analyze.rs index 580bf31231210..27e0f5e923d85 100644 --- a/datafusion/physical-plan/src/analyze.rs +++ b/datafusion/physical-plan/src/analyze.rs @@ -25,6 +25,7 @@ use super::{ SendableRecordBatchStream, }; use crate::display::DisplayableExecutionPlan; +use crate::execution_plan::EvaluationType; use crate::metrics::{MetricCategory, MetricType}; use crate::{DisplayFormatType, ExecutionPlan, Partitioning}; @@ -172,6 +173,7 @@ impl AnalyzeExec { input.pipeline_behavior(), input.boundedness(), ) + .with_evaluation_type(EvaluationType::Eager) } } diff --git a/datafusion/physical-plan/src/buffer.rs b/datafusion/physical-plan/src/buffer.rs index 19a4ebba83eae..2985dc57661b0 100644 --- a/datafusion/physical-plan/src/buffer.rs +++ b/datafusion/physical-plan/src/buffer.rs @@ -18,7 +18,7 @@ //! [`BufferExec`] decouples production and consumption on messages by buffering the input in the //! background up to a certain capacity. -use crate::execution_plan::{CardinalityEffect, SchedulingType}; +use crate::execution_plan::{CardinalityEffect, EvaluationType, SchedulingType}; use crate::filter_pushdown::{ ChildPushdownResult, FilterDescription, FilterPushdownPhase, FilterPushdownPropagation, @@ -101,7 +101,8 @@ impl BufferExec { /// Builds a new [BufferExec] with the provided capacity in bytes. pub fn new(input: Arc, capacity: usize) -> Self { let properties = PlanProperties::clone(input.properties()) - .with_scheduling_type(SchedulingType::Cooperative); + .with_scheduling_type(SchedulingType::Cooperative) + .with_evaluation_type(EvaluationType::Eager); Self { input, diff --git a/datafusion/physical-plan/src/execution_plan.rs b/datafusion/physical-plan/src/execution_plan.rs index 50eac566d90ef..8577e86f00514 100644 --- a/datafusion/physical-plan/src/execution_plan.rs +++ b/datafusion/physical-plan/src/execution_plan.rs @@ -45,6 +45,8 @@ use crate::coalesce_partitions::CoalescePartitionsExec; use crate::display::DisplayableExecutionPlan; use crate::metrics::MetricsSet; use crate::projection::ProjectionExec; +use crate::repartition::RepartitionExec; +use crate::sorts::sort_preserving_merge::SortPreservingMergeExec; use crate::stream::RecordBatchStreamAdapter; use arrow::array::{Array, RecordBatch}; @@ -962,25 +964,36 @@ pub enum SchedulingType { Cooperative, } -/// Represents how an operator's `Stream` implementation generates `RecordBatch`es. +/// Represents how an operator's stream drives [`RecordBatch`] production +/// relative to downstream demand. /// -/// Most operators in DataFusion generate `RecordBatch`es when asked to do so by a call to -/// `Stream::poll_next`. This is known as demand-driven or lazy evaluation. -/// -/// Some operators like `Repartition` need to drive `RecordBatch` generation themselves though. This -/// is known as data-driven or eager evaluation. +/// This is execution-topology metadata for optimizers. It distinguishes streams +/// whose batch production is driven directly by downstream calls to +/// `Stream::poll_next` from streams that may also drive input or output +/// production independently, such as by spawning tasks or buffering batches +/// ahead of demand. #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum EvaluationType { - /// The stream generated by [`execute`](ExecutionPlan::execute) only generates `RecordBatch` - /// instances when it is demanded by invoking `Stream::poll_next`. - /// Filter, projection, and join are examples of such lazy operators. + /// The stream generated by [`execute`](ExecutionPlan::execute) is + /// demand-driven: it produces [`RecordBatch`]es in response to downstream + /// calls to `Stream::poll_next`. + /// + /// Filter, projection, and join operators are examples of lazy operators. /// /// Lazy operators are also known as demand-driven operators. Lazy, - /// The stream generated by [`execute`](ExecutionPlan::execute) eagerly generates `RecordBatch` - /// in one or more spawned Tokio tasks. Eager evaluation is only started the first time - /// `Stream::poll_next` is called. - /// Examples of eager operators are repartition, coalesce partitions, and sort preserving merge. + /// The stream generated by [`execute`](ExecutionPlan::execute) may drive + /// input or output [`RecordBatch`] production ahead of, or independently + /// from, downstream calls to `Stream::poll_next`. + /// + /// Eager operators commonly poll input streams from spawned Tokio tasks, + /// buffer batches ahead of demand, or otherwise create an independent + /// child-polling pipeline. Eager work may start when `execute` creates the + /// stream or when the returned stream is first polled; that timing is an + /// implementation detail. + /// + /// Repartition, coalesce partitions, sort-preserving merge, buffer, and + /// analyze operators are examples of eager operators. /// /// Eager operators are also known as a data-driven operators. Eager, @@ -1209,15 +1222,31 @@ pub fn check_default_invariants( Ok(()) } -/// Indicate whether a data exchange is needed for the input of `plan`, which will be very helpful -/// especially for the distributed engine to judge whether need to deal with shuffling. -/// Currently, there are 3 kinds of execution plan which needs data exchange -/// 1. RepartitionExec for changing the partition number between two `ExecutionPlan`s -/// 2. CoalescePartitionsExec for collapsing all of the partitions into one without ordering guarantee -/// 3. SortPreservingMergeExec for collapsing all of the sorted partitions into one with ordering guarantee +/// Indicate whether a data exchange is needed for the input of `plan`. +/// +/// This identifies physical operators that redistribute child partitions or +/// gather multiple child partitions into one output partition: +/// +/// 1. RepartitionExec for non-round-robin repartitioning +/// 2. CoalescePartitionsExec for collapsing multiple partitions into one without ordering guarantee +/// 3. SortPreservingMergeExec for collapsing multiple sorted partitions into one with ordering guarantee #[expect(clippy::needless_pass_by_value)] pub fn need_data_exchange(plan: Arc) -> bool { - plan.properties().evaluation_type == EvaluationType::Eager + if let Some(repartition) = plan.downcast_ref::() { + !matches!(repartition.partitioning(), Partitioning::RoundRobinBatch(_)) + } else if let Some(coalesce) = plan.downcast_ref::() { + coalesce.input().output_partitioning().partition_count() > 1 + } else if let Some(sort_preserving_merge) = + plan.downcast_ref::() + { + sort_preserving_merge + .input() + .output_partitioning() + .partition_count() + > 1 + } else { + false + } } /// Returns a copy of this plan if we change any child according to the pointer comparison. @@ -1556,6 +1585,8 @@ pub(crate) fn stub_properties() -> Arc { mod tests { use super::*; + use crate::buffer::BufferExec; + use crate::test::exec::MockExec; use crate::{DisplayAs, DisplayFormatType, ExecutionPlan}; use arrow::array::{DictionaryArray, Int32Array, NullArray, RunArray}; @@ -1768,6 +1799,15 @@ mod tests { let _ = plan.name(); } + #[test] + fn buffer_exec_does_not_need_data_exchange() { + let schema = Arc::new(Schema::empty()); + let input: Arc = Arc::new(MockExec::new(vec![], schema)); + let buffer: Arc = Arc::new(BufferExec::new(input, 1024)); + + assert!(!need_data_exchange(buffer)); + } + #[test] fn test_check_not_null_constraints_accept_non_null() -> Result<()> { check_not_null_constraints(