Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions datafusion/physical-plan/src/analyze.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use super::{
SendableRecordBatchStream,
};
use crate::display::DisplayableExecutionPlan;
use crate::execution_plan::EvaluationType;
use crate::metrics::{MetricCategory, MetricType};
use crate::{DisplayFormatType, ExecutionPlan, Partitioning};

Expand Down Expand Up @@ -172,6 +173,7 @@ impl AnalyzeExec {
input.pipeline_behavior(),
input.boundedness(),
)
.with_evaluation_type(EvaluationType::Eager)
}
}

Expand Down
5 changes: 3 additions & 2 deletions datafusion/physical-plan/src/buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
//! [`BufferExec`] decouples production and consumption on messages by buffering the input in the
//! background up to a certain capacity.

use crate::execution_plan::{CardinalityEffect, SchedulingType};
use crate::execution_plan::{CardinalityEffect, EvaluationType, SchedulingType};
use crate::filter_pushdown::{
ChildPushdownResult, FilterDescription, FilterPushdownPhase,
FilterPushdownPropagation,
Expand Down Expand Up @@ -101,7 +101,8 @@ impl BufferExec {
/// Builds a new [BufferExec] with the provided capacity in bytes.
pub fn new(input: Arc<dyn ExecutionPlan>, capacity: usize) -> Self {
let properties = PlanProperties::clone(input.properties())
.with_scheduling_type(SchedulingType::Cooperative);
.with_scheduling_type(SchedulingType::Cooperative)
.with_evaluation_type(EvaluationType::Eager);

Self {
input,
Expand Down
80 changes: 60 additions & 20 deletions datafusion/physical-plan/src/execution_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ use crate::coalesce_partitions::CoalescePartitionsExec;
use crate::display::DisplayableExecutionPlan;
use crate::metrics::MetricsSet;
use crate::projection::ProjectionExec;
use crate::repartition::RepartitionExec;
use crate::sorts::sort_preserving_merge::SortPreservingMergeExec;
use crate::stream::RecordBatchStreamAdapter;

use arrow::array::{Array, RecordBatch};
Expand Down Expand Up @@ -962,25 +964,36 @@ pub enum SchedulingType {
Cooperative,
}

/// Represents how an operator's `Stream` implementation generates `RecordBatch`es.
/// Represents how an operator's stream drives [`RecordBatch`] production
/// relative to downstream demand.
///
/// Most operators in DataFusion generate `RecordBatch`es when asked to do so by a call to
/// `Stream::poll_next`. This is known as demand-driven or lazy evaluation.
///
/// Some operators like `Repartition` need to drive `RecordBatch` generation themselves though. This
/// is known as data-driven or eager evaluation.
/// This is execution-topology metadata for optimizers. It distinguishes streams
/// whose batch production is driven directly by downstream calls to
/// `Stream::poll_next` from streams that may also drive input or output
/// production independently, such as by spawning tasks or buffering batches
/// ahead of demand.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum EvaluationType {
/// The stream generated by [`execute`](ExecutionPlan::execute) only generates `RecordBatch`
/// instances when it is demanded by invoking `Stream::poll_next`.
/// Filter, projection, and join are examples of such lazy operators.
/// The stream generated by [`execute`](ExecutionPlan::execute) is
/// demand-driven: it produces [`RecordBatch`]es in response to downstream
/// calls to `Stream::poll_next`.
///
/// Filter, projection, and join operators are examples of lazy operators.
///
/// Lazy operators are also known as demand-driven operators.
Lazy,
/// The stream generated by [`execute`](ExecutionPlan::execute) eagerly generates `RecordBatch`
/// in one or more spawned Tokio tasks. Eager evaluation is only started the first time
/// `Stream::poll_next` is called.
/// Examples of eager operators are repartition, coalesce partitions, and sort preserving merge.
/// The stream generated by [`execute`](ExecutionPlan::execute) may drive
/// input or output [`RecordBatch`] production ahead of, or independently
/// from, downstream calls to `Stream::poll_next`.
///
/// Eager operators commonly poll input streams from spawned Tokio tasks,
/// buffer batches ahead of demand, or otherwise create an independent
/// child-polling pipeline. Eager work may start when `execute` creates the
/// stream or when the returned stream is first polled; that timing is an
/// implementation detail.
///
/// Repartition, coalesce partitions, sort-preserving merge, buffer, and
/// analyze operators are examples of eager operators.
///
/// Eager operators are also known as a data-driven operators.
Eager,
Expand Down Expand Up @@ -1209,15 +1222,31 @@ pub fn check_default_invariants<P: ExecutionPlan + ?Sized>(
Ok(())
}

/// Indicate whether a data exchange is needed for the input of `plan`, which will be very helpful
/// especially for the distributed engine to judge whether need to deal with shuffling.
/// Currently, there are 3 kinds of execution plan which needs data exchange
/// 1. RepartitionExec for changing the partition number between two `ExecutionPlan`s
/// 2. CoalescePartitionsExec for collapsing all of the partitions into one without ordering guarantee
/// 3. SortPreservingMergeExec for collapsing all of the sorted partitions into one with ordering guarantee
/// Indicate whether a data exchange is needed for the input of `plan`.
///
/// This identifies physical operators that redistribute child partitions or
/// gather multiple child partitions into one output partition:
///
/// 1. RepartitionExec for non-round-robin repartitioning
/// 2. CoalescePartitionsExec for collapsing multiple partitions into one without ordering guarantee
/// 3. SortPreservingMergeExec for collapsing multiple sorted partitions into one with ordering guarantee
#[expect(clippy::needless_pass_by_value)]
pub fn need_data_exchange(plan: Arc<dyn ExecutionPlan>) -> bool {
plan.properties().evaluation_type == EvaluationType::Eager
if let Some(repartition) = plan.downcast_ref::<RepartitionExec>() {
!matches!(repartition.partitioning(), Partitioning::RoundRobinBatch(_))
} else if let Some(coalesce) = plan.downcast_ref::<CoalescePartitionsExec>() {
coalesce.input().output_partitioning().partition_count() > 1
} else if let Some(sort_preserving_merge) =
plan.downcast_ref::<SortPreservingMergeExec>()
{
sort_preserving_merge
.input()
.output_partitioning()
.partition_count()
> 1
} else {
false
}
Comment on lines -1220 to +1249
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some operators that are correctly marked as Eager, but that this function was wrongly treating them as need_data_exchange() -> true:

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

need_data_exchange was pretty broken then... As discussed privately, we should look into deprecating then deleting this method, as probably no-one is using it (at least correctly.)

}

/// Returns a copy of this plan if we change any child according to the pointer comparison.
Expand Down Expand Up @@ -1556,6 +1585,8 @@ pub(crate) fn stub_properties() -> Arc<PlanProperties> {
mod tests {

use super::*;
use crate::buffer::BufferExec;
use crate::test::exec::MockExec;
use crate::{DisplayAs, DisplayFormatType, ExecutionPlan};

use arrow::array::{DictionaryArray, Int32Array, NullArray, RunArray};
Expand Down Expand Up @@ -1768,6 +1799,15 @@ mod tests {
let _ = plan.name();
}

#[test]
fn buffer_exec_does_not_need_data_exchange() {
let schema = Arc::new(Schema::empty());
let input: Arc<dyn ExecutionPlan> = Arc::new(MockExec::new(vec![], schema));
let buffer: Arc<dyn ExecutionPlan> = Arc::new(BufferExec::new(input, 1024));

assert!(!need_data_exchange(buffer));
}

#[test]
fn test_check_not_null_constraints_accept_non_null() -> Result<()> {
check_not_null_constraints(
Expand Down
Loading