Skip to content

Commit d4830f4

Browse files
committed
update POC to generalize materialization physical and logical nodes
1 parent 986fa53 commit d4830f4

9 files changed

Lines changed: 611 additions & 135 deletions

File tree

datafusion/core/src/materialized_cte_planner.rs

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,9 @@ use async_trait::async_trait;
2828
use datafusion_common::Result;
2929
use datafusion_expr::logical_plan::{MaterializedCteProducer, MaterializedCteReader};
3030
use datafusion_expr::{LogicalPlan, UserDefinedLogicalNode};
31-
use datafusion_physical_plan::materialized_cte::{
32-
MaterializedCteCache, MaterializedCteExec, MaterializedCteReaderExec,
33-
materialized_cte_statistics, replace_materialized_cte_readers,
31+
use datafusion_physical_plan::materialize::{
32+
MaterializeExec, MaterializedCache, MaterializedScanExec, materialized_statistics,
33+
replace_materialized_scans,
3434
};
3535
use datafusion_physical_plan::{ExecutionPlan, ExecutionPlanProperties};
3636

@@ -44,7 +44,7 @@ use crate::physical_planner::{ExtensionPlanner, PhysicalPlanner};
4444
#[derive(Debug)]
4545
pub struct MaterializedCtePlanner {
4646
/// Map of CTE name to shared cache
47-
caches: Mutex<HashMap<String, Arc<MaterializedCteCache>>>,
47+
caches: Mutex<HashMap<String, Arc<MaterializedCache>>>,
4848
/// Map of CTE name to the number of partitions readers should expose
4949
partition_counts: Mutex<HashMap<String, usize>>,
5050
}
@@ -59,17 +59,17 @@ impl MaterializedCtePlanner {
5959
}
6060

6161
/// Get or create a cache for the given CTE name.
62-
fn get_or_create_cache(&self, name: &str) -> Arc<MaterializedCteCache> {
62+
fn get_or_create_cache(&self, name: &str) -> Arc<MaterializedCache> {
6363
let mut caches = self.caches.lock().unwrap();
6464
Arc::clone(
6565
caches
6666
.entry(name.to_string())
67-
.or_insert_with(|| Arc::new(MaterializedCteCache::new(name.to_string()))),
67+
.or_insert_with(|| Arc::new(MaterializedCache::new(name.to_string()))),
6868
)
6969
}
7070

71-
fn create_cache(&self, name: &str) -> Arc<MaterializedCteCache> {
72-
let cache = Arc::new(MaterializedCteCache::new(name.to_string()));
71+
fn create_cache(&self, name: &str) -> Arc<MaterializedCache> {
72+
let cache = Arc::new(MaterializedCache::new(name.to_string()));
7373
self.caches
7474
.lock()
7575
.unwrap()
@@ -115,16 +115,16 @@ impl ExtensionPlanner for MaterializedCtePlanner {
115115
let cache = self.create_cache(&producer.name);
116116
let cte_plan = Arc::clone(&physical_inputs[0]);
117117
let partition_count = cte_plan.output_partitioning().partition_count();
118-
let statistics = materialized_cte_statistics(cte_plan.as_ref())?;
118+
let statistics = materialized_statistics(cte_plan.as_ref())?;
119119
self.set_partition_count(&producer.name, partition_count);
120-
let continuation = replace_materialized_cte_readers(
120+
let continuation = replace_materialized_scans(
121121
Arc::clone(&physical_inputs[1]),
122122
&producer.name,
123123
&cache,
124124
partition_count,
125125
&statistics,
126126
)?;
127-
let exec = MaterializedCteExec::new(
127+
let exec = MaterializeExec::new(
128128
producer.name.clone(),
129129
cte_plan,
130130
continuation,
@@ -139,7 +139,7 @@ impl ExtensionPlanner for MaterializedCtePlanner {
139139
let schema = Arc::clone(reader.schema.inner());
140140
let statistics =
141141
Arc::new(datafusion_physical_plan::Statistics::new_unknown(&schema));
142-
let exec = MaterializedCteReaderExec::new(
142+
let exec = MaterializedScanExec::new(
143143
reader.name.clone(),
144144
schema,
145145
cache,

datafusion/core/src/optimizer_rule_reference.md

Lines changed: 19 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -46,24 +46,25 @@ Rule order matters. The default pipeline may change between releases.
4646
| 7 | `decorrelate_predicate_subquery` | Converts eligible `IN` and `EXISTS` predicate subqueries into semi or anti joins. |
4747
| 8 | `scalar_subquery_to_join` | Rewrites eligible scalar subqueries into joins and adds schema-preserving projections. |
4848
| 9 | `decorrelate_lateral_join` | Rewrites eligible lateral joins into regular joins. |
49-
| 10 | `extract_equijoin_predicate` | Splits join filters into equijoin keys and residual predicates. |
50-
| 11 | `eliminate_duplicated_expr` | Removes duplicate expressions from projections, aggregates, and similar operators. |
51-
| 12 | `eliminate_filter` | Drops always-true filters and replaces always-false or NULL filters with empty relations. |
52-
| 13 | `eliminate_cross_join` | Uses filter predicates to replace cross joins with inner joins when join keys can be found. |
53-
| 14 | `eliminate_limit` | Removes no-op limits and simplifies trivial limit shapes. |
54-
| 15 | `propagate_empty_relation` | Pushes empty-relation knowledge upward so operators fed by no rows collapse early. |
55-
| 16 | `filter_null_join_keys` | Adds `IS NOT NULL` filters to nullable equijoin keys that can never match. |
56-
| 17 | `eliminate_outer_join` | Rewrites outer joins to inner joins when later filters reject the NULL-extended rows. |
57-
| 18 | `push_down_limit` | Moves literal limits closer to scans and unions and merges adjacent limits. |
58-
| 19 | `push_down_filter` | Moves filters as early as possible through filter-commutative operators. |
59-
| 20 | `inline_cte` | Inlines materialized CTEs where materialization is not beneficial (cheap, limited, or disjoint-filtered). |
60-
| 21 | `cte_filter_pusher` | Pushes OR-combined filters from CTE readers into the materialized CTE body to reduce materialization volume. |
61-
| 22 | `single_distinct_aggregation_to_group_by` | Rewrites single-column `DISTINCT` aggregations into two-stage `GROUP BY` plans. |
62-
| 23 | `eliminate_group_by_constant` | Removes constant or functionally redundant expressions from `GROUP BY`. |
63-
| 24 | `common_sub_expression_eliminate` | Computes repeated subexpressions once and reuses the result. |
64-
| 25 | `extract_leaf_expressions` | Pulls cheap leaf expressions closer to data sources so later pruning and filter rules can act earlier. |
65-
| 26 | `push_down_leaf_projections` | Pushes the helper projections created by leaf extraction toward leaf inputs. |
66-
| 27 | `optimize_projections` | Prunes unused columns and removes unnecessary logical projections. |
49+
| 10 | `common_subplan_eliminate` | Detects duplicate subplans and materializes them so they are computed once and read multiple times. |
50+
| 11 | `extract_equijoin_predicate` | Splits join filters into equijoin keys and residual predicates. |
51+
| 12 | `eliminate_duplicated_expr` | Removes duplicate expressions from projections, aggregates, and similar operators. |
52+
| 13 | `eliminate_filter` | Drops always-true filters and replaces always-false or NULL filters with empty relations. |
53+
| 14 | `eliminate_cross_join` | Uses filter predicates to replace cross joins with inner joins when join keys can be found. |
54+
| 15 | `eliminate_limit` | Removes no-op limits and simplifies trivial limit shapes. |
55+
| 16 | `propagate_empty_relation` | Pushes empty-relation knowledge upward so operators fed by no rows collapse early. |
56+
| 17 | `filter_null_join_keys` | Adds `IS NOT NULL` filters to nullable equijoin keys that can never match. |
57+
| 18 | `eliminate_outer_join` | Rewrites outer joins to inner joins when later filters reject the NULL-extended rows. |
58+
| 19 | `push_down_limit` | Moves literal limits closer to scans and unions and merges adjacent limits. |
59+
| 20 | `push_down_filter` | Moves filters as early as possible through filter-commutative operators. |
60+
| 21 | `inline_cte` | Inlines materialized CTEs where materialization is not beneficial (cheap, limited, or disjoint-filtered). |
61+
| 22 | `cte_filter_pusher` | Pushes OR-combined filters from CTE readers into the materialized CTE body to reduce materialization volume. |
62+
| 23 | `single_distinct_aggregation_to_group_by` | Rewrites single-column `DISTINCT` aggregations into two-stage `GROUP BY` plans. |
63+
| 24 | `eliminate_group_by_constant` | Removes constant or functionally redundant expressions from `GROUP BY`. |
64+
| 25 | `common_sub_expression_eliminate` | Computes repeated subexpressions once and reuses the result. |
65+
| 26 | `extract_leaf_expressions` | Pulls cheap leaf expressions closer to data sources so later pruning and filter rules can act earlier. |
66+
| 27 | `push_down_leaf_projections` | Pushes the helper projections created by leaf extraction toward leaf inputs. |
67+
| 28 | `optimize_projections` | Prunes unused columns and removes unnecessary logical projections. |
6768

6869
### Physical Optimizer Rules
6970

datafusion/core/tests/sql/cte.rs

Lines changed: 13 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,7 @@ use super::*;
1919
use arrow::array::StringArray;
2020
use datafusion::catalog::MemTable;
2121
use datafusion::physical_plan::ExecutionPlanProperties;
22-
use datafusion::physical_plan::materialized_cte::{
23-
MaterializedCteExec, MaterializedCteReaderExec,
24-
};
22+
use datafusion::physical_plan::materialize::{MaterializeExec, MaterializedScanExec};
2523
use datafusion::physical_plan::{collect_partitioned, visit_execution_plan};
2624
use datafusion_common::assert_batches_eq;
2725
use datafusion_common::stats::Precision;
@@ -42,8 +40,8 @@ async fn multi_reference_cte_materialization_heuristic() -> Result<()> {
4240
.await?;
4341
let physical_plan = reused_scan.create_physical_plan().await?;
4442
let plan = displayable(physical_plan.as_ref()).indent(true).to_string();
45-
assert_contains!(&plan, "MaterializedCteExec");
46-
assert_contains!(&plan, "MaterializedCteReaderExec");
43+
assert_contains!(&plan, "MaterializeExec");
44+
assert_contains!(&plan, "MaterializedScanExec");
4745

4846
let cheap_literal = ctx
4947
.sql(
@@ -53,8 +51,8 @@ async fn multi_reference_cte_materialization_heuristic() -> Result<()> {
5351
.await?;
5452
let physical_plan = cheap_literal.create_physical_plan().await?;
5553
let plan = displayable(physical_plan.as_ref()).indent(true).to_string();
56-
assert_not_contains!(&plan, "MaterializedCteExec");
57-
assert_not_contains!(&plan, "MaterializedCteReaderExec");
54+
assert_not_contains!(&plan, "MaterializeExec");
55+
assert_not_contains!(&plan, "MaterializedScanExec");
5856

5957
let limited_reuse = ctx
6058
.sql(
@@ -64,8 +62,8 @@ async fn multi_reference_cte_materialization_heuristic() -> Result<()> {
6462
.await?;
6563
let physical_plan = limited_reuse.create_physical_plan().await?;
6664
let plan = displayable(physical_plan.as_ref()).indent(true).to_string();
67-
assert_not_contains!(&plan, "MaterializedCteExec");
68-
assert_not_contains!(&plan, "MaterializedCteReaderExec");
65+
assert_not_contains!(&plan, "MaterializeExec");
66+
assert_not_contains!(&plan, "MaterializedScanExec");
6967

7068
Ok(())
7169
}
@@ -104,11 +102,11 @@ async fn materialized_cte_reader_preserves_input_partitions() -> Result<()> {
104102
type Error = std::convert::Infallible;
105103

106104
fn pre_visit(&mut self, plan: &dyn ExecutionPlan) -> Result<bool, Self::Error> {
107-
if plan.is::<MaterializedCteExec>() {
105+
if plan.is::<MaterializeExec>() {
108106
self.producer_partitions
109107
.push(plan.output_partitioning().partition_count());
110108
}
111-
if plan.is::<MaterializedCteReaderExec>() {
109+
if plan.is::<MaterializedScanExec>() {
112110
self.reader_partitions
113111
.push(plan.output_partitioning().partition_count());
114112
}
@@ -194,7 +192,7 @@ async fn materialized_cte_cache_is_per_physical_plan() -> Result<()> {
194192
.await?;
195193
let physical_plan = first.create_physical_plan().await?;
196194
let plan = displayable(physical_plan.as_ref()).indent(true).to_string();
197-
assert_contains!(&plan, "MaterializedCteExec");
195+
assert_contains!(&plan, "MaterializeExec");
198196
let results = first.collect().await?;
199197
let expected = ["+---+", "| a |", "+---+", "| 1 |", "+---+"];
200198
assert_batches_eq!(expected, &results);
@@ -207,7 +205,7 @@ async fn materialized_cte_cache_is_per_physical_plan() -> Result<()> {
207205
.await?;
208206
let physical_plan = second.create_physical_plan().await?;
209207
let plan = displayable(physical_plan.as_ref()).indent(true).to_string();
210-
assert_contains!(&plan, "MaterializedCteExec");
208+
assert_contains!(&plan, "MaterializeExec");
211209
let results = second.collect().await?;
212210
let expected = ["+---+", "| a |", "+---+", "| 2 |", "+---+"];
213211
assert_batches_eq!(expected, &results);
@@ -242,7 +240,7 @@ async fn materialized_cte_reader_preserves_producer_statistics() -> Result<()> {
242240
type Error = datafusion::error::DataFusionError;
243241

244242
fn pre_visit(&mut self, plan: &dyn ExecutionPlan) -> Result<bool, Self::Error> {
245-
if plan.is::<MaterializedCteReaderExec>() {
243+
if plan.is::<MaterializedScanExec>() {
246244
self.reader_rows
247245
.push(plan.partition_statistics(None)?.num_rows);
248246
}
@@ -361,7 +359,7 @@ async fn volatile_cte_is_materialized() -> Result<()> {
361359
.await?;
362360
let physical_plan = df.create_physical_plan().await?;
363361
let plan = displayable(physical_plan.as_ref()).indent(true).to_string();
364-
assert_contains!(&plan, "MaterializedCteExec");
362+
assert_contains!(&plan, "MaterializeExec");
365363

366364
// Verify the values are actually the same (materialized = one evaluation)
367365
let results = ctx

0 commit comments

Comments
 (0)