Skip to content

Conversation

pepijnve
Copy link
Contributor

@pepijnve pepijnve commented Oct 14, 2025

Which issue does this PR close?

Rationale for this change

When CaseExpr calls PhysicalExpr::evaluate_selection that function will filter the entire input RecordBatch before calling PhysicalExpr::evaluate. This filtering filters all columns of the RecordBatch, even ones that will not be accessed by the PhysicalExpr. For wide record batches and narrow expressions it can be beneficial to project the record batch first to reduce the amount of wasted filtering work.

What changes are included in this PR?

  • Adds a ProjectedExpr type which projects incoming record batches and then evaluates a project version of the original PhysicalExpr

Are these changes tested?

  • Covered by existing tests
  • Micro benchmark for case has been extended

Are there any user-facing changes?

No

@github-actions github-actions bot added the physical-expr Changes to the physical-expr crates label Oct 14, 2025
@pepijnve
Copy link
Contributor Author

pepijnve commented Oct 14, 2025

I hadn't considered physical expression serialisation in my implementation. Perhaps it would be better to hide the projection logic inside CaseExpr entirely?

@pepijnve
Copy link
Contributor Author

apache/arrow-rs#8591 should help mitigate the overhead of the RecordBatch#project call.

@pepijnve
Copy link
Contributor Author

@alamb this one is the next episode in my quest to squeeze more performance out of case. The TL;DR is that we project away unneeded columns from the record batch to avoid work during filtering. I thought this would be most elegant as a decorator expression, but that's immediately visible externally and as a consequence would need serialisation support. The alternative is that I pull this into CaseExpr, perhaps as a distinct (or wrapping) evaluation mode? Any opinion on the direction in which to proceed?

@alamb
Copy link
Contributor

alamb commented Oct 15, 2025

@alamb this one is the next episode in my quest to squeeze more performance out of case. The TL;DR is that we project away unneeded columns from the record batch to avoid work during filtering. I thought this would be most elegant as a decorator expression, but that's immediately visible externally and as a consequence would need serialisation support. The alternative is that I pull this into CaseExpr, perhaps as a distinct (or wrapping) evaluation mode? Any opinion on the direction in which to proceed?

LOVE IT! I will continue to help support you / review these PRs. I love a good set of performance optimizations

I am also going to start tracking the collection here as a larger theme

@alamb
Copy link
Contributor

alamb commented Oct 15, 2025

There is similar code for filtering here (namely that evaluates the filter expression first, and then only calles filter with columns that are needed)

predicate
.evaluate(batch)
.and_then(|v| v.into_array(batch.num_rows()))
.and_then(|array| {
Ok(match (as_boolean_array(&array), projection) {
// Apply filter array to record batch
(Ok(filter_array), None) => filter_record_batch(batch, filter_array)?,
(Ok(filter_array), Some(projection)) => {
let projected_columns = projection
.iter()
.map(|i| Arc::clone(batch.column(*i)))
.collect();
let projected_batch = RecordBatch::try_new(
Arc::clone(output_schema),
projected_columns,
)?;
filter_record_batch(&projected_batch, filter_array)?
}
(Err(_), _) => {
return internal_err!(
"Cannot create filter_array from non-boolean predicates"
);
}
})
})

@pepijnve
Copy link
Contributor Author

pepijnve commented Oct 15, 2025

There is similar code for filtering here (namely that evaluates the filter expression first, and then only calles filter with columns that are needed)

This touches on one of the things I was struggling with a bit working in the PhysicalExpr domain rather than the ExecutionPlan domain where the code you pointed to lives. While each ExecutionPlan is aware of its own input and output schema, PhysicalExpr is not. Instead the Schema is passed in as argument to functions like nullable. And for evaluate specifically, you get it via the RecordBatch. The consequence is that I have to RecordBatch::project which ends up deriving the same schema on every invocation. I wasn't sure how we could fix this.

I already need the schema anyway in order to decide if it makes sense to project or not. One simple solution is to just keep a reference to that one. But things get a bit weird when a PhysicalExpr has a reference to a schema but also receives one externally when nullable and friends are called.

@alamb
Copy link
Contributor

alamb commented Oct 15, 2025

I already need the schema anyway in order to decide if it makes sense to project or not. One simple solution is to just keep a reference to that one. But things get a bit weird when a PhysicalExpr has a reference to a schema but also receives one externally when nullable and friends are called.

At somepoint in the past we discussed some sort of "precompile" step for PhysicalExprs -- which was invoked right before the plan started executing. The usecase as I recall was to compile regexp once (rather than per batch). Maybe it is time to do that now 🤔

There is more background here if you are interested

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

physical-expr Changes to the physical-expr crates

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants