Skip to content

Commit 3041e90

Browse files
committed
feat: Add evaluate_to_arrays function
1 parent 607325a commit 3041e90

File tree

12 files changed

+41
-78
lines changed

12 files changed

+41
-78
lines changed

datafusion/physical-expr-common/src/utils.rs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ use crate::tree_node::ExprContext;
2222

2323
use arrow::array::{make_array, Array, ArrayRef, BooleanArray, MutableArrayData};
2424
use arrow::compute::{and_kleene, is_not_null, SlicesIterator};
25+
use arrow::record_batch::RecordBatch;
2526
use datafusion_common::Result;
2627
use datafusion_expr_common::sort_properties::ExprProperties;
2728

@@ -91,6 +92,19 @@ pub fn scatter(mask: &BooleanArray, truthy: &dyn Array) -> Result<ArrayRef> {
9192
Ok(make_array(data))
9293
}
9394

95+
/// Evaluates expressions against a record batch.
96+
#[inline]
97+
pub fn evaluate_expressions_to_arrays(
98+
exprs: &[Arc<dyn PhysicalExpr>],
99+
batch: &RecordBatch,
100+
) -> Result<Vec<ArrayRef>> {
101+
let num_rows = batch.num_rows();
102+
exprs
103+
.iter()
104+
.map(|e| e.evaluate(batch).and_then(|col| col.into_array(num_rows)))
105+
.collect::<Result<Vec<ArrayRef>>>()
106+
}
107+
94108
#[cfg(test)]
95109
mod tests {
96110
use std::sync::Arc;

datafusion/physical-expr/src/window/standard_window_function_expr.rs

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ use arrow::record_batch::RecordBatch;
2323
use datafusion_common::Result;
2424
use datafusion_expr::{LimitEffect, PartitionEvaluator};
2525

26+
use datafusion_physical_expr_common::utils::evaluate_expressions_to_arrays;
2627
use std::any::Any;
2728
use std::sync::Arc;
2829

@@ -57,13 +58,7 @@ pub trait StandardWindowFunctionExpr: Send + Sync + std::fmt::Debug {
5758
///
5859
/// Typically, the resulting vector is a single element vector.
5960
fn evaluate_args(&self, batch: &RecordBatch) -> Result<Vec<ArrayRef>> {
60-
self.expressions()
61-
.iter()
62-
.map(|e| {
63-
e.evaluate(batch)
64-
.and_then(|v| v.into_array(batch.num_rows()))
65-
})
66-
.collect()
61+
evaluate_expressions_to_arrays(&self.expressions(), batch)
6762
}
6863

6964
/// Create a [`PartitionEvaluator`] for evaluating the function on

datafusion/physical-expr/src/window/window_expr.rs

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ use datafusion_expr::window_state::{
4141
use datafusion_expr::{Accumulator, PartitionEvaluator, WindowFrame, WindowFrameBound};
4242
use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr;
4343

44+
use datafusion_physical_expr_common::utils::evaluate_expressions_to_arrays;
4445
use indexmap::IndexMap;
4546

4647
/// Common trait for [window function] implementations
@@ -90,13 +91,7 @@ pub trait WindowExpr: Send + Sync + Debug {
9091
/// Evaluate the window function arguments against the batch and return
9192
/// array ref, normally the resulting `Vec` is a single element one.
9293
fn evaluate_args(&self, batch: &RecordBatch) -> Result<Vec<ArrayRef>> {
93-
self.expressions()
94-
.iter()
95-
.map(|e| {
96-
e.evaluate(batch)
97-
.and_then(|v| v.into_array(batch.num_rows()))
98-
})
99-
.collect()
94+
evaluate_expressions_to_arrays(&self.expressions(), batch)
10095
}
10196

10297
/// Evaluate the window function values against the batch

datafusion/physical-plan/src/aggregates/mod.rs

Lines changed: 5 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ use datafusion_physical_expr_common::sort_expr::{
5353
};
5454

5555
use datafusion_expr::utils::AggregateOrderSensitivity;
56+
use datafusion_physical_expr_common::utils::evaluate_expressions_to_arrays;
5657
use itertools::Itertools;
5758

5859
pub mod group_values;
@@ -1346,25 +1347,14 @@ pub fn finalize_aggregation(
13461347
}
13471348
}
13481349

1349-
/// Evaluates expressions against a record batch.
1350-
fn evaluate(
1351-
expr: &[Arc<dyn PhysicalExpr>],
1352-
batch: &RecordBatch,
1353-
) -> Result<Vec<ArrayRef>> {
1354-
expr.iter()
1355-
.map(|expr| {
1356-
expr.evaluate(batch)
1357-
.and_then(|v| v.into_array(batch.num_rows()))
1358-
})
1359-
.collect()
1360-
}
1361-
1362-
/// Evaluates expressions against a record batch.
1350+
/// Evaluates groups of expressions against a record batch.
13631351
pub fn evaluate_many(
13641352
expr: &[Vec<Arc<dyn PhysicalExpr>>],
13651353
batch: &RecordBatch,
13661354
) -> Result<Vec<Vec<ArrayRef>>> {
1367-
expr.iter().map(|expr| evaluate(expr, batch)).collect()
1355+
expr.iter()
1356+
.map(|expr| evaluate_expressions_to_arrays(expr, batch))
1357+
.collect()
13681358
}
13691359

13701360
fn evaluate_optional(

datafusion/physical-plan/src/aggregates/no_grouping.rs

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -33,12 +33,12 @@ use std::borrow::Cow;
3333
use std::sync::Arc;
3434
use std::task::{Context, Poll};
3535

36+
use super::AggregateExec;
3637
use crate::filter::batch_filter;
3738
use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation};
39+
use datafusion_physical_expr_common::utils::evaluate_expressions_to_arrays;
3840
use futures::stream::{Stream, StreamExt};
3941

40-
use super::AggregateExec;
41-
4242
/// stream struct for aggregation without grouping columns
4343
pub(crate) struct AggregateStream {
4444
stream: BoxStream<'static, Result<RecordBatch>>,
@@ -219,13 +219,8 @@ fn aggregate_batch(
219219
None => Cow::Borrowed(&batch),
220220
};
221221

222-
let n_rows = batch.num_rows();
223-
224222
// 1.3
225-
let values = expr
226-
.iter()
227-
.map(|e| e.evaluate(&batch).and_then(|v| v.into_array(n_rows)))
228-
.collect::<Result<Vec<_>>>()?;
223+
let values = evaluate_expressions_to_arrays(expr, &batch)?;
229224

230225
// 1.4
231226
let size_pre = accum.size();

datafusion/physical-plan/src/joins/hash_join/exec.rs

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@ use datafusion_physical_expr::{PhysicalExpr, PhysicalExprRef};
7777

7878
use ahash::RandomState;
7979
use datafusion_physical_expr_common::physical_expr::fmt_sql;
80+
use datafusion_physical_expr_common::utils::evaluate_expressions_to_arrays;
8081
use futures::TryStreamExt;
8182
use parking_lot::Mutex;
8283

@@ -1467,13 +1468,7 @@ async fn collect_left_input(
14671468
BooleanBufferBuilder::new(0)
14681469
};
14691470

1470-
let left_values = on_left
1471-
.iter()
1472-
.map(|c| {
1473-
c.evaluate(&single_batch)?
1474-
.into_array(single_batch.num_rows())
1475-
})
1476-
.collect::<Result<Vec<_>>>()?;
1471+
let left_values = evaluate_expressions_to_arrays(&on_left, &single_batch)?;
14771472

14781473
// Compute bounds for dynamic filter if enabled
14791474
let bounds = match bounds_accumulators {

datafusion/physical-plan/src/joins/hash_join/stream.rs

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ use datafusion_common::{
5151
use datafusion_physical_expr::PhysicalExprRef;
5252

5353
use ahash::RandomState;
54+
use datafusion_physical_expr_common::utils::evaluate_expressions_to_arrays;
5455
use futures::{ready, Stream, StreamExt};
5556

5657
/// Represents build-side of hash join.
@@ -448,11 +449,7 @@ impl HashJoinStream {
448449
}
449450
Some(Ok(batch)) => {
450451
// Precalculate hash values for fetched batch
451-
let keys_values = self
452-
.on_right
453-
.iter()
454-
.map(|c| c.evaluate(&batch)?.into_array(batch.num_rows()))
455-
.collect::<Result<Vec<_>>>()?;
452+
let keys_values = evaluate_expressions_to_arrays(&self.on_right, &batch)?;
456453

457454
self.hashes_buffer.clear();
458455
self.hashes_buffer.resize(batch.num_rows(), 0);

datafusion/physical-plan/src/joins/symmetric_hash_join.rs

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@ use datafusion_physical_expr_common::physical_expr::{fmt_sql, PhysicalExprRef};
7878
use datafusion_physical_expr_common::sort_expr::{LexOrdering, OrderingRequirements};
7979

8080
use ahash::RandomState;
81+
use datafusion_physical_expr_common::utils::evaluate_expressions_to_arrays;
8182
use futures::{ready, Stream, StreamExt};
8283
use parking_lot::Mutex;
8384

@@ -1066,14 +1067,8 @@ fn lookup_join_hashmap(
10661067
hashes_buffer: &mut Vec<u64>,
10671068
deleted_offset: Option<usize>,
10681069
) -> Result<(UInt64Array, UInt32Array)> {
1069-
let keys_values = probe_on
1070-
.iter()
1071-
.map(|c| c.evaluate(probe_batch)?.into_array(probe_batch.num_rows()))
1072-
.collect::<Result<Vec<_>>>()?;
1073-
let build_join_values = build_on
1074-
.iter()
1075-
.map(|c| c.evaluate(build_batch)?.into_array(build_batch.num_rows()))
1076-
.collect::<Result<Vec<_>>>()?;
1070+
let keys_values = evaluate_expressions_to_arrays(probe_on, probe_batch)?;
1071+
let build_join_values = evaluate_expressions_to_arrays(build_on, build_batch)?;
10771072

10781073
hashes_buffer.clear();
10791074
hashes_buffer.resize(probe_batch.num_rows(), 0);

datafusion/physical-plan/src/joins/utils.rs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@ use datafusion_physical_expr::{
7575
};
7676

7777
use datafusion_physical_expr_common::datum::compare_op_for_nested;
78+
use datafusion_physical_expr_common::utils::evaluate_expressions_to_arrays;
7879
use futures::future::{BoxFuture, Shared};
7980
use futures::{ready, FutureExt};
8081
use parking_lot::Mutex;
@@ -1675,10 +1676,7 @@ pub fn update_hash(
16751676
fifo_hashmap: bool,
16761677
) -> Result<()> {
16771678
// evaluate the keys
1678-
let keys_values = on
1679-
.iter()
1680-
.map(|c| c.evaluate(batch)?.into_array(batch.num_rows()))
1681-
.collect::<Result<Vec<_>>>()?;
1679+
let keys_values = evaluate_expressions_to_arrays(on, batch)?;
16821680

16831681
// calculate the hash values
16841682
let hash_values = create_hashes(&keys_values, random_state, hashes_buffer)?;

datafusion/physical-plan/src/projection.rs

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ pub use datafusion_physical_expr::projection::{
5757
update_expr, ProjectionExpr, ProjectionExprs,
5858
};
5959

60+
use datafusion_physical_expr_common::utils::evaluate_expressions_to_arrays;
6061
use futures::stream::{Stream, StreamExt};
6162
use log::trace;
6263

@@ -353,14 +354,7 @@ impl ProjectionStream {
353354
fn batch_project(&self, batch: &RecordBatch) -> Result<RecordBatch> {
354355
// Records time on drop
355356
let _timer = self.baseline_metrics.elapsed_compute().timer();
356-
let arrays = self
357-
.expr
358-
.iter()
359-
.map(|expr| {
360-
expr.evaluate(batch)
361-
.and_then(|v| v.into_array(batch.num_rows()))
362-
})
363-
.collect::<Result<Vec<_>>>()?;
357+
let arrays = evaluate_expressions_to_arrays(&self.expr, batch)?;
364358

365359
if arrays.is_empty() {
366360
let options =

0 commit comments

Comments
 (0)