diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs
index b10761a5fe816..5f96d70e83a54 100644
--- a/datafusion/common/src/config.rs
+++ b/datafusion/common/src/config.rs
@@ -729,6 +729,17 @@ config_namespace! {
/// tables with a highly-selective join filter, but is also slightly slower.
pub enforce_batch_size_in_joins: bool, default = false
+ /// (experimental) When enabled, `FilterExec` adaptively reorders the
+ /// conjuncts of a conjunctive predicate at runtime. It measures each
+ /// conjunct's selectivity and evaluation cost on the rows that reach it
+ /// and runs the conjuncts that discard the most rows per unit of CPU
+ /// time first, so cheap-and-selective predicates gate expensive ones.
+ /// Reordering never changes query results (only the evaluation order of
+ /// a conjunction) but can change observable side effects of fallible
+ /// predicates, so it is off by default. Predicates containing volatile
+ /// expressions are never reordered.
+ pub adaptive_filter_reordering: bool, default = false
+
/// Size (bytes) of data buffer DataFusion uses when writing output files.
/// This affects the size of the data chunks that are uploaded to remote
/// object stores (e.g. AWS S3). If very large (>= 100 GiB) output files are being
diff --git a/datafusion/physical-expr-common/src/adaptive/arbiter.rs b/datafusion/physical-expr-common/src/adaptive/arbiter.rs
new file mode 100644
index 0000000000000..e5e702d65702f
--- /dev/null
+++ b/datafusion/physical-expr-common/src/adaptive/arbiter.rs
@@ -0,0 +1,299 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Shared champion/challenger coordination for adaptive filter execution.
+//!
+//! An *arrangement* is whatever a consumer can execute a batch of filters
+//! with: the evaluation **order** of a conjunction (an adaptive
+//! `FilterExec`), the **placement** of each filter as a row filter or a
+//! post-scan filter (an adaptive parquet scan deciding late
+//! materialization), or any other executable configuration. The arbiter is
+//! deliberately ignorant of what the arrangement *means* — it only
+//! coordinates the experiment that decides between two of them:
+//!
+//! - the **champion** is the arrangement validated by the most recent
+//! adopted trial, broadcast through a lock-free epoch so every stream
+//! notices an adoption with one atomic load per batch;
+//! - a **trial** pits one candidate against the incumbent. Streams run the
+//! two arms on consecutive batches, time them end-to-end, and submit each
+//! pair's `ln(candidate / incumbent)` cost ratio (pairing cancels cold
+//! caches and concurrent-query interference, which otherwise dwarf the
+//! arms' true difference). The trial is shared: all streams feed the same
+//! ledger, so streams shorter than a whole trial still produce a verdict;
+//! - the candidate is **adopted** only if its mean log-ratio is below zero
+//! with statistical confidence; ties favour the incumbent. A **rejected**
+//! candidate is remembered so the consumer can avoid re-running a lost
+//! experiment.
+//!
+//! What to propose, when to measure, and how to schedule re-checks is the
+//! consumer's policy; the arbiter only guarantees that arrangements change
+//! solely on a measured, shared, statistically separated win.
+
+use std::sync::atomic::{AtomicU64, Ordering};
+
+use parking_lot::Mutex;
+
+use super::stats::SelectivityStats;
+
+/// Paired samples before a trial verdict. The challenger must beat the
+/// incumbent with confidence within this many pairs or it is rejected (ties
+/// favour the incumbent).
+pub const TRIAL_PAIRS: u64 = 8;
+/// Confidence multiplier for the interval on the mean paired log-ratio
+/// (~95% two-sided at 2.0).
+const CONFIDENCE_Z: f64 = 2.0;
+
+/// The arbiter's answer to a submitted trial pair.
+#[derive(Debug, Clone, PartialEq)]
+pub enum TrialUpdate {
+ /// The trial needs more pairs.
+ Running,
+ /// This pair concluded the trial: the candidate won and is now the
+ /// champion (the epoch has been bumped for the other streams).
+ Adopted(A),
+ /// This pair concluded the trial: the incumbent stands, and the
+ /// candidate is remembered as rejected.
+ Rejected(A),
+ /// The trial this pair belonged to no longer exists (it concluded on
+ /// another stream, or a different candidate's trial replaced it).
+ Superseded,
+}
+
+/// A challenger arrangement under end-to-end A/B test against the incumbent.
+#[derive(Debug)]
+struct Trial {
+ candidate: A,
+ /// Welford accumulator over per-pair `ln(candidate / incumbent)` cost
+ /// ratios. Negative mean means the candidate is faster.
+ pairs: SelectivityStats,
+}
+
+#[derive(Debug)]
+struct State {
+ champion: Option,
+ rejected: Option,
+ trial: Option>,
+}
+
+impl Default for State {
+ fn default() -> Self {
+ Self {
+ champion: None,
+ rejected: None,
+ trial: None,
+ }
+ }
+}
+
+/// Champion/challenger coordination shared by every stream of one consumer
+/// (e.g. all partition streams of a `FilterExec`, or all row-group streams of
+/// a parquet scan). See the [module docs](self).
+#[derive(Debug)]
+pub struct AdaptiveArbiter {
+ /// Champion epoch; bumped on every adoption. Kept outside the lock so the
+ /// per-batch staleness check is a single relaxed atomic load.
+ epoch: AtomicU64,
+ state: Mutex>,
+}
+
+impl Default for AdaptiveArbiter {
+ fn default() -> Self {
+ Self {
+ epoch: AtomicU64::new(0),
+ state: Mutex::new(State::default()),
+ }
+ }
+}
+
+impl AdaptiveArbiter {
+ pub fn new() -> Self {
+ Self::default()
+ }
+
+ /// The current champion epoch (0 until a first adoption). A stream that
+ /// cached an older value should call [`champion`](Self::champion).
+ pub fn epoch(&self) -> u64 {
+ self.epoch.load(Ordering::Relaxed)
+ }
+
+ /// The current champion, if any stream's trial has adopted one, together
+ /// with the epoch it was read at.
+ pub fn champion(&self) -> (Option, u64) {
+ let state = self.state.lock();
+ // Read the epoch under the lock so a concurrent adoption is not
+ // missed between the load and the clone.
+ (state.champion.clone(), self.epoch.load(Ordering::Relaxed))
+ }
+
+ /// The most recently rejected candidate, for the consumer's
+ /// "is this the same lost experiment re-proposed?" gate.
+ pub fn rejected(&self) -> Option {
+ self.state.lock().rejected.clone()
+ }
+
+ /// Whether a trial is currently in progress.
+ pub fn trial_in_progress(&self) -> bool {
+ self.state.lock().trial.is_some()
+ }
+
+ /// Start a trial for `candidate`, or join the trial already in progress
+ /// (its candidate stands in for the proposal: a verdict on any change
+ /// beats racing experiments against each other). Returns the candidate
+ /// actually under trial.
+ pub fn begin_trial(&self, candidate: A) -> A {
+ let mut state = self.state.lock();
+ match &state.trial {
+ Some(trial) => trial.candidate.clone(),
+ None => {
+ state.trial = Some(Trial {
+ candidate: candidate.clone(),
+ pairs: SelectivityStats::default(),
+ });
+ candidate
+ }
+ }
+ }
+
+ /// Submit one paired observation for `candidate`: `ln_ratio` is
+ /// `ln(candidate cost / incumbent cost)` measured on consecutive batches
+ /// of the same stream (`rows`/`nanos` describe the candidate leg). Returns
+ /// what this pair did to the trial; on [`TrialUpdate::Adopted`] the
+ /// submitting stream has already seen the new champion (its epoch is
+ /// current), while other streams notice via [`epoch`](Self::epoch).
+ pub fn submit_pair(
+ &self,
+ candidate: &A,
+ rows: u64,
+ nanos: u64,
+ ln_ratio: f64,
+ ) -> TrialUpdate {
+ let mut state = self.state.lock();
+ let Some(trial) = &mut state.trial else {
+ return TrialUpdate::Superseded;
+ };
+ if trial.candidate != *candidate {
+ return TrialUpdate::Superseded;
+ }
+ trial.pairs.record(0, rows, nanos, ln_ratio);
+ if trial.pairs.sample_count() < TRIAL_PAIRS {
+ return TrialUpdate::Running;
+ }
+ // Verdict: adopt only if the candidate is faster with confidence —
+ // the whole CI of the mean log-ratio below zero.
+ let adopted = matches!(
+ trial.pairs.confidence_upper_bound(CONFIDENCE_Z),
+ Some(up) if up < 0.0
+ );
+ let trial = state.trial.take().expect("checked above");
+ if adopted {
+ state.champion = Some(trial.candidate.clone());
+ state.rejected = None;
+ self.epoch.fetch_add(1, Ordering::Relaxed);
+ TrialUpdate::Adopted(trial.candidate)
+ } else {
+ state.rejected = Some(trial.candidate.clone());
+ TrialUpdate::Rejected(trial.candidate)
+ }
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ fn run_trial(arbiter: &AdaptiveArbiter, candidate: u32, ln_ratio: f64) {
+ assert_eq!(arbiter.begin_trial(candidate), candidate);
+ for i in 0..TRIAL_PAIRS {
+ let update =
+ arbiter.submit_pair(&candidate, 1000, 1000, ln_ratio + i as f64 * 0.01);
+ if i + 1 < TRIAL_PAIRS {
+ assert_eq!(update, TrialUpdate::Running);
+ } else {
+ match update {
+ TrialUpdate::Adopted(a) | TrialUpdate::Rejected(a) => {
+ assert_eq!(a, candidate)
+ }
+ other => panic!("trial must conclude, got {other:?}"),
+ }
+ }
+ }
+ }
+
+ #[test]
+ fn adoption_publishes_champion_and_bumps_epoch() {
+ let arbiter = AdaptiveArbiter::::new();
+ assert_eq!(arbiter.epoch(), 0);
+ run_trial(&arbiter, 7, -10.0); // decisively faster
+ assert_eq!(arbiter.champion(), (Some(7), 1));
+ assert_eq!(arbiter.rejected(), None);
+ }
+
+ #[test]
+ fn rejection_keeps_incumbent_and_remembers_candidate() {
+ let arbiter = AdaptiveArbiter::::new();
+ run_trial(&arbiter, 7, 10.0); // decisively slower
+ assert_eq!(arbiter.champion(), (None, 0));
+ assert_eq!(arbiter.rejected(), Some(7));
+ }
+
+ #[test]
+ fn ties_favour_the_incumbent() {
+ let arbiter = AdaptiveArbiter::::new();
+ // Samples straddle zero with overlapping CI: not a confident win.
+ let candidate = 7;
+ arbiter.begin_trial(candidate);
+ for i in 0..TRIAL_PAIRS {
+ let sample = if i % 2 == 0 { 0.5 } else { -0.5 };
+ let update = arbiter.submit_pair(&candidate, 1000, 1000, sample);
+ if i + 1 == TRIAL_PAIRS {
+ assert_eq!(update, TrialUpdate::Rejected(candidate));
+ }
+ }
+ }
+
+ #[test]
+ fn adoption_clears_the_rejected_memory() {
+ let arbiter = AdaptiveArbiter::::new();
+ run_trial(&arbiter, 7, 10.0);
+ assert_eq!(arbiter.rejected(), Some(7));
+ run_trial(&arbiter, 8, -10.0);
+ assert_eq!(arbiter.rejected(), None);
+ assert_eq!(arbiter.champion(), (Some(8), 1));
+ }
+
+ #[test]
+ fn concurrent_proposals_join_the_active_trial() {
+ let arbiter = AdaptiveArbiter::::new();
+ assert_eq!(arbiter.begin_trial(7), 7);
+ // A second proposal joins the running trial instead of replacing it.
+ assert_eq!(arbiter.begin_trial(9), 7);
+ }
+
+ #[test]
+ fn pairs_for_a_stale_candidate_are_superseded() {
+ let arbiter = AdaptiveArbiter::::new();
+ assert_eq!(
+ arbiter.submit_pair(&7, 1000, 1000, 0.0),
+ TrialUpdate::Superseded
+ );
+ arbiter.begin_trial(7);
+ assert_eq!(
+ arbiter.submit_pair(&9, 1000, 1000, 0.0),
+ TrialUpdate::Superseded
+ );
+ }
+}
diff --git a/datafusion/physical-expr-common/src/adaptive/mod.rs b/datafusion/physical-expr-common/src/adaptive/mod.rs
new file mode 100644
index 0000000000000..a353a7a1ec809
--- /dev/null
+++ b/datafusion/physical-expr-common/src/adaptive/mod.rs
@@ -0,0 +1,47 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Shared substrate for adaptive (measurement-driven) filtering.
+//!
+//! Adaptive filter policies observe how predicates behave at runtime and
+//! re-decide accordingly — the parquet scan adapts filter *placement*
+//! (row-level vs. post-scan vs. dropped), and an adaptive `FilterExec` could
+//! adapt conjunct evaluation *order*. Both need the same ingredients:
+//!
+//! - per-predicate online **selectivity + cost** measurement with confidence
+//! intervals — [`SelectivityStats`];
+//! - a concurrent **registry** keyed by a caller-local [`FilterId`], with
+//! per-predicate skip flags so an optional predicate can be made a no-op
+//! mid-stream — [`AdaptiveStatsRegistry`];
+//! - shared **champion/challenger coordination** over an opaque arrangement
+//! type, so a proposed change (a new conjunct order, a new filter
+//! placement) is adopted only on a measured, statistically separated A/B
+//! win shared across all of a consumer's streams —
+//! [`AdaptiveArbiter`](arbiter::AdaptiveArbiter).
+//!
+//! What stays with each consumer is *policy*: the per-batch effectiveness
+//! metric it feeds in, the proposal it computes over the snapshots, what an
+//! arrangement *is* and how to execute one. This module intentionally
+//! contains no placement or ordering logic.
+
+pub mod arbiter;
+pub mod registry;
+pub mod stats;
+
+pub use arbiter::{AdaptiveArbiter, TRIAL_PAIRS, TrialUpdate};
+pub use registry::AdaptiveStatsRegistry;
+pub use stats::{FilterId, SelectivityStats};
diff --git a/datafusion/physical-expr-common/src/adaptive/registry.rs b/datafusion/physical-expr-common/src/adaptive/registry.rs
new file mode 100644
index 0000000000000..cc30e85eed90e
--- /dev/null
+++ b/datafusion/physical-expr-common/src/adaptive/registry.rs
@@ -0,0 +1,319 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Concurrent registry of per-predicate [`SelectivityStats`] plus the
+//! "skip" flags that let an *optional* predicate be turned into a no-op
+//! mid-stream.
+//!
+//! This is shared plumbing, free of any placement/ordering policy. A consumer
+//! [`register`](AdaptiveStatsRegistry::register)s the predicates it tracks,
+//! calls [`record`](AdaptiveStatsRegistry::record) on the per-batch hot path,
+//! and reads back [`snapshot`](AdaptiveStatsRegistry::snapshot)s when it
+//! periodically re-decides (placement, ordering, drop, …).
+
+use std::collections::HashMap;
+use std::sync::Arc;
+use std::sync::atomic::{AtomicBool, Ordering};
+
+use parking_lot::{Mutex, RwLock};
+
+use super::stats::{FilterId, SelectivityStats};
+
+/// Thread-safe map of [`FilterId`] → online [`SelectivityStats`], with
+/// per-predicate skip flags.
+///
+/// # Locking
+///
+/// The outer [`RwLock`] over the stats map is almost always *read*-locked: both
+/// [`record`](Self::record) (hot, per-batch) and the snapshot readers only need
+/// shared access to look up an existing entry. The write lock is taken only by
+/// [`register`](Self::register) when a new [`FilterId`] is first seen — a brief,
+/// infrequent operation.
+///
+/// Each entry is an independent [`Mutex`], so concurrent
+/// `record` calls on *different* predicates proceed in parallel with zero
+/// contention.
+#[derive(Debug, Default)]
+pub struct AdaptiveStatsRegistry {
+ /// Per-predicate selectivity statistics, each individually `Mutex`-guarded.
+ stats: RwLock>>,
+ /// Per-predicate "skip" flags. When set, the consumer treats the predicate
+ /// as a no-op for subsequent batches. Only ever set for predicates whose
+ /// [`SelectivityStats::is_optional`] is `true` — mandatory predicates must
+ /// always execute or queries return wrong rows.
+ skip_flags: RwLock>>,
+}
+
+impl AdaptiveStatsRegistry {
+ /// Create an empty registry.
+ pub fn new() -> Self {
+ Self::default()
+ }
+
+ /// Register a predicate so future [`record`](Self::record) calls can find
+ /// it. Idempotent: an already-registered id keeps its accumulated stats and
+ /// its existing optional flag.
+ ///
+ /// `is_optional` records whether the predicate may be dropped without
+ /// affecting correctness (see [`SelectivityStats::is_optional`]).
+ pub fn register(&self, id: FilterId, is_optional: bool) {
+ if self.stats.read().contains_key(&id) {
+ return;
+ }
+ let mut stats = self.stats.write();
+ stats
+ .entry(id)
+ .or_insert_with(|| Mutex::new(SelectivityStats::new(is_optional)));
+ self.skip_flags
+ .write()
+ .entry(id)
+ .or_insert_with(|| Arc::new(AtomicBool::new(false)));
+ }
+
+ /// Register many predicates at once (see [`register`](Self::register)).
+ pub fn register_all(&self, entries: impl IntoIterator- ) {
+ let mut stats = self.stats.write();
+ let mut flags = self.skip_flags.write();
+ for (id, is_optional) in entries {
+ stats
+ .entry(id)
+ .or_insert_with(|| Mutex::new(SelectivityStats::new(is_optional)));
+ flags
+ .entry(id)
+ .or_insert_with(|| Arc::new(AtomicBool::new(false)));
+ }
+ }
+
+ /// Record one batch of observations for `id` (per-batch hot path).
+ ///
+ /// Takes only a shared lock on the map plus the per-predicate mutex, so it
+ /// never contends with `record` calls on other predicates. A no-op if `id`
+ /// was never [`register`](Self::register)ed.
+ pub fn record(
+ &self,
+ id: FilterId,
+ matched: u64,
+ total: u64,
+ eval_nanos: u64,
+ effectiveness_sample: f64,
+ ) {
+ let map = self.stats.read();
+ if let Some(entry) = map.get(&id) {
+ entry
+ .lock()
+ .record(matched, total, eval_nanos, effectiveness_sample);
+ }
+ }
+
+ /// Merge a locally-accumulated delta into `id`'s shared stats (see
+ /// [`SelectivityStats::merge`]). A no-op if `id` was never
+ /// [`register`](Self::register)ed.
+ ///
+ /// Lets a consumer record per-batch observations into a private
+ /// accumulator with no locking and fold them in here periodically.
+ pub fn merge(&self, id: FilterId, delta: &SelectivityStats) {
+ let map = self.stats.read();
+ if let Some(entry) = map.get(&id) {
+ entry.lock().merge(delta);
+ }
+ }
+
+ /// Copy out the current stats for `id`, or `None` if unregistered.
+ ///
+ /// [`SelectivityStats`] is `Copy`, so consumers read every derived metric
+ /// (pass rate, cost-per-row, effectiveness, confidence bounds) off the
+ /// returned value without holding any lock.
+ pub fn snapshot(&self, id: FilterId) -> Option {
+ self.stats.read().get(&id).map(|entry| *entry.lock())
+ }
+
+ /// Clear the accumulated stats for `id`, preserving its optional flag and
+ /// skip flag. Used when a dynamic predicate re-arms under a stable id (see
+ /// [`SelectivityStats::reset`]). A no-op if `id` is unregistered.
+ pub fn reset(&self, id: FilterId) {
+ if let Some(entry) = self.stats.read().get(&id) {
+ entry.lock().reset();
+ }
+ }
+
+ /// The shared skip flag for `id`, registering `id` as optional if it was
+ /// not already present. The returned `Arc` can be cached by an evaluator so
+ /// it can cheaply check the flag without touching the registry.
+ pub fn skip_flag(&self, id: FilterId) -> Arc {
+ if let Some(flag) = self.skip_flags.read().get(&id) {
+ return Arc::clone(flag);
+ }
+ // First sighting: register as optional and create the flag.
+ self.register(id, true);
+ Arc::clone(
+ self.skip_flags
+ .read()
+ .get(&id)
+ .expect("skip flag inserted by register"),
+ )
+ }
+
+ /// Whether `id`'s skip flag is currently set. `false` if unregistered.
+ pub fn is_skipped(&self, id: FilterId) -> bool {
+ self.skip_flags
+ .read()
+ .get(&id)
+ .is_some_and(|flag| flag.load(Ordering::Relaxed))
+ }
+
+ /// Set or clear `id`'s skip flag. A no-op if `id` is unregistered.
+ pub fn set_skipped(&self, id: FilterId, skipped: bool) {
+ if let Some(flag) = self.skip_flags.read().get(&id) {
+ flag.store(skipped, Ordering::Relaxed);
+ }
+ }
+
+ /// Whether `id` has been registered.
+ pub fn contains(&self, id: FilterId) -> bool {
+ self.stats.read().contains_key(&id)
+ }
+
+ /// Number of registered predicates.
+ pub fn len(&self) -> usize {
+ self.stats.read().len()
+ }
+
+ /// Whether no predicates are registered.
+ pub fn is_empty(&self) -> bool {
+ self.stats.read().is_empty()
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ #[test]
+ fn record_requires_registration() {
+ let reg = AdaptiveStatsRegistry::new();
+ // recording an unregistered id is a silent no-op
+ reg.record(0, 1, 10, 100, 5.0);
+ assert!(reg.snapshot(0).is_none());
+ assert_eq!(reg.len(), 0);
+
+ reg.register(0, false);
+ reg.record(0, 1, 10, 100, 5.0);
+ let s = reg.snapshot(0).unwrap();
+ assert_eq!(s.pass_rate(), Some(0.1));
+ assert_eq!(s.sample_count(), 1);
+ }
+
+ #[test]
+ fn register_is_idempotent_and_keeps_stats() {
+ let reg = AdaptiveStatsRegistry::new();
+ reg.register(7, false);
+ reg.record(7, 2, 10, 100, 9.0);
+ // re-register must not wipe accumulated stats
+ reg.register(7, true);
+ let s = reg.snapshot(7).unwrap();
+ assert_eq!(s.sample_count(), 1);
+ // optional flag is not flipped by a redundant register
+ assert!(!s.is_optional());
+ }
+
+ #[test]
+ fn register_all_bulk() {
+ let reg = AdaptiveStatsRegistry::new();
+ reg.register_all([(0, false), (1, true), (2, false)]);
+ assert_eq!(reg.len(), 3);
+ assert!(reg.snapshot(1).unwrap().is_optional());
+ assert!(!reg.snapshot(0).unwrap().is_optional());
+ }
+
+ #[test]
+ fn merge_folds_local_delta_into_shared() {
+ let reg = AdaptiveStatsRegistry::new();
+ reg.register(0, false);
+ reg.record(0, 1, 10, 100, 4.0);
+
+ let mut local = SelectivityStats::default();
+ local.record(9, 10, 300, 8.0);
+ reg.merge(0, &local);
+
+ let s = reg.snapshot(0).unwrap();
+ assert_eq!(s.pass_rate(), Some(0.5)); // 10/20
+ assert_eq!(s.sample_count(), 2);
+ assert!((s.effectiveness().unwrap() - 6.0).abs() < 1e-9);
+
+ // merging an unregistered id is a silent no-op
+ reg.merge(42, &local);
+ assert!(reg.snapshot(42).is_none());
+ }
+
+ #[test]
+ fn skip_flag_round_trips_and_is_shared() {
+ let reg = AdaptiveStatsRegistry::new();
+ reg.register(3, true);
+ assert!(!reg.is_skipped(3));
+ let flag = reg.skip_flag(3);
+ reg.set_skipped(3, true);
+ // the cached Arc observes the change made through the registry
+ assert!(flag.load(Ordering::Relaxed));
+ assert!(reg.is_skipped(3));
+ }
+
+ #[test]
+ fn skip_flag_autoregisters_as_optional() {
+ let reg = AdaptiveStatsRegistry::new();
+ let _ = reg.skip_flag(42);
+ assert!(reg.contains(42));
+ assert!(reg.snapshot(42).unwrap().is_optional());
+ }
+
+ #[test]
+ fn reset_clears_stats_keeps_flag() {
+ let reg = AdaptiveStatsRegistry::new();
+ reg.register(1, true);
+ reg.record(1, 5, 10, 100, 3.0);
+ reg.set_skipped(1, true);
+ reg.reset(1);
+ let s = reg.snapshot(1).unwrap();
+ assert_eq!(s.sample_count(), 0);
+ assert_eq!(s.pass_rate(), None);
+ assert!(s.is_optional());
+ // skip flag is independent of stats reset
+ assert!(reg.is_skipped(1));
+ }
+
+ #[test]
+ fn concurrent_records_on_distinct_ids() {
+ use std::thread;
+ let reg = Arc::new(AdaptiveStatsRegistry::new());
+ reg.register_all((0..8).map(|i| (i, false)));
+ thread::scope(|scope| {
+ for id in 0..8usize {
+ let reg = Arc::clone(®);
+ scope.spawn(move || {
+ for _ in 0..1000 {
+ reg.record(id, 1, 2, 10, id as f64);
+ }
+ });
+ }
+ });
+ for id in 0..8usize {
+ let s = reg.snapshot(id).unwrap();
+ assert_eq!(s.sample_count(), 1000);
+ assert_eq!(s.pass_rate(), Some(0.5));
+ assert!((s.effectiveness().unwrap() - id as f64).abs() < 1e-9);
+ }
+ }
+}
diff --git a/datafusion/physical-expr-common/src/adaptive/stats.rs b/datafusion/physical-expr-common/src/adaptive/stats.rs
new file mode 100644
index 0000000000000..877392f7a8b1d
--- /dev/null
+++ b/datafusion/physical-expr-common/src/adaptive/stats.rs
@@ -0,0 +1,389 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Online per-predicate selectivity and cost accumulator.
+//!
+//! This is the domain-agnostic measurement substrate shared by the adaptive
+//! filter machinery. It records, for a single predicate (conjunct), the
+//! quantities every adaptive policy needs:
+//!
+//! - **selectivity** — `rows_matched / rows_total` (the *pass rate*),
+//! - **cost** — cumulative `eval_nanos`, from which a per-row cost is derived,
+//! - **effectiveness** — a caller-supplied per-batch scalar, accumulated with
+//! [Welford's online algorithm] so callers can put a confidence interval on
+//! its mean.
+//!
+//! The accumulator deliberately does *not* define what "effectiveness" means or
+//! what to do with these numbers — that is policy, and lives with the consumer:
+//!
+//! - the parquet scan ranks filters by *bytes-saved-per-second* to decide
+//! row-level vs. post-scan **placement**;
+//! - an adaptive `FilterExec` ranks conjuncts by `cost_per_row / (1 - pass_rate)`
+//! to decide evaluation **order**.
+//!
+//! Both feed the same accumulator; only the per-batch sample and the ranking
+//! function differ.
+//!
+//! [Welford's online algorithm]: https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Welford's_online_algorithm
+
+/// Identifier for a tracked predicate, **local to a single
+/// [`AdaptiveStatsRegistry`](super::registry::AdaptiveStatsRegistry)**.
+///
+/// There is no global expression id. Each consumer mints its own ids — in
+/// practice the index of a conjunct in *that consumer's* predicate `Vec` — and
+/// the same numeric value in two different registries refers to two unrelated
+/// predicates. Ids are opaque here so the accumulator works regardless of how a
+/// consumer enumerates its predicates.
+pub type FilterId = usize;
+
+/// Online selectivity + cost statistics for a single predicate expression.
+///
+/// Cheap to copy; a consumer typically owns one behind a per-predicate lock
+/// (see [`AdaptiveStatsRegistry`](super::registry::AdaptiveStatsRegistry)) so
+/// concurrent updates on *different* predicates never contend.
+#[derive(Debug, Clone, Copy, PartialEq)]
+pub struct SelectivityStats {
+ /// Number of rows that passed (matched) the predicate.
+ rows_matched: u64,
+ /// Total number of rows the predicate was evaluated on.
+ rows_total: u64,
+ /// Cumulative evaluation time in nanoseconds.
+ eval_nanos: u64,
+ /// Welford's online algorithm: number of per-batch effectiveness samples.
+ sample_count: u64,
+ /// Welford's online algorithm: running mean of the per-batch effectiveness
+ /// sample.
+ eff_mean: f64,
+ /// Welford's online algorithm: running sum of squared deviations (M2).
+ eff_m2: f64,
+ /// Whether the underlying predicate is *optional* — i.e. may be skipped
+ /// entirely without affecting query results (e.g. a dynamic join filter).
+ ///
+ /// Cached here so the per-batch hot path can decide whether the
+ /// skip/drop logic applies with a single field load, without re-inspecting
+ /// the expression. Mandatory predicates must always execute or queries
+ /// return wrong rows.
+ is_optional: bool,
+}
+
+impl Default for SelectivityStats {
+ fn default() -> Self {
+ Self::new(false)
+ }
+}
+
+impl SelectivityStats {
+ /// Create an empty accumulator. `is_optional` records whether the predicate
+ /// may be dropped without affecting correctness.
+ pub fn new(is_optional: bool) -> Self {
+ Self {
+ rows_matched: 0,
+ rows_total: 0,
+ eval_nanos: 0,
+ sample_count: 0,
+ eff_mean: 0.0,
+ eff_m2: 0.0,
+ is_optional,
+ }
+ }
+
+ /// Whether the predicate may be dropped without affecting correctness.
+ pub fn is_optional(&self) -> bool {
+ self.is_optional
+ }
+
+ /// Record one batch of observations.
+ ///
+ /// - `matched` / `total` — rows that passed / were evaluated this batch.
+ /// - `eval_nanos` — wall time spent evaluating the predicate this batch.
+ /// - `effectiveness_sample` — the caller's per-batch effectiveness metric
+ /// (e.g. bytes-saved-per-second for placement, or any scalar the
+ /// consumer wants confidence intervals on). Its unit is opaque here.
+ ///
+ /// The raw counters are always updated. The Welford accumulator only
+ /// ingests the sample when `total > 0 && eval_nanos > 0` and the sample is
+ /// finite — an empty or zero-time batch is not a meaningful sample.
+ pub fn record(
+ &mut self,
+ matched: u64,
+ total: u64,
+ eval_nanos: u64,
+ effectiveness_sample: f64,
+ ) {
+ self.rows_matched += matched;
+ self.rows_total += total;
+ self.eval_nanos += eval_nanos;
+
+ if total > 0 && eval_nanos > 0 && effectiveness_sample.is_finite() {
+ self.sample_count += 1;
+ let delta = effectiveness_sample - self.eff_mean;
+ self.eff_mean += delta / self.sample_count as f64;
+ let delta2 = effectiveness_sample - self.eff_mean;
+ self.eff_m2 += delta * delta2;
+ }
+ }
+
+ /// Cumulative pass rate `rows_matched / rows_total` in `[0, 1]`.
+ ///
+ /// `None` until at least one row has been evaluated. Lower means more
+ /// selective — `1 - pass_rate()` is the fraction of rows discarded.
+ pub fn pass_rate(&self) -> Option {
+ if self.rows_total == 0 {
+ return None;
+ }
+ Some(self.rows_matched as f64 / self.rows_total as f64)
+ }
+
+ /// Average evaluation cost per row, in nanoseconds.
+ ///
+ /// Unlike [`pass_rate`](Self::pass_rate) this is roughly independent of the
+ /// predicate's position in a conjunction (it is a property of the predicate
+ /// and the data it sees), which makes it the stable term to rank on.
+ /// `None` until at least one row has been evaluated.
+ pub fn cost_per_row_nanos(&self) -> Option {
+ if self.rows_total == 0 {
+ return None;
+ }
+ Some(self.eval_nanos as f64 / self.rows_total as f64)
+ }
+
+ /// Mean of the per-batch effectiveness samples (Welford `eff_mean`).
+ ///
+ /// `None` until at least one sample has been recorded. The unit is whatever
+ /// the consumer fed to [`record`](Self::record); callers should not assume
+ /// it.
+ pub fn effectiveness(&self) -> Option {
+ if self.sample_count == 0 {
+ return None;
+ }
+ Some(self.eff_mean)
+ }
+
+ /// Number of per-batch effectiveness samples recorded so far.
+ pub fn sample_count(&self) -> u64 {
+ self.sample_count
+ }
+
+ /// Sample variance of the per-batch effectiveness samples.
+ ///
+ /// Uses the unbiased (`n - 1`) estimator; `None` with fewer than 2 samples.
+ fn variance(&self) -> Option {
+ if self.sample_count < 2 {
+ return None;
+ }
+ Some(self.eff_m2 / (self.sample_count - 1) as f64)
+ }
+
+ /// Lower bound of a one-sided confidence interval on mean effectiveness:
+ /// `mean - z * stderr`.
+ ///
+ /// `z` is the caller's confidence multiplier (e.g. `2.0` ≈ 97.5% one-sided).
+ /// `None` with fewer than 2 samples.
+ pub fn confidence_lower_bound(&self, z: f64) -> Option {
+ let stderr = (self.variance()? / self.sample_count as f64).sqrt();
+ Some(self.eff_mean - z * stderr)
+ }
+
+ /// Upper bound of a one-sided confidence interval on mean effectiveness:
+ /// `mean + z * stderr`.
+ ///
+ /// `None` with fewer than 2 samples.
+ pub fn confidence_upper_bound(&self, z: f64) -> Option {
+ let stderr = (self.variance()? / self.sample_count as f64).sqrt();
+ Some(self.eff_mean + z * stderr)
+ }
+
+ /// Merge another accumulator's observations into this one, as if every
+ /// batch recorded on `other` had been recorded here.
+ ///
+ /// The Welford state is combined with the standard parallel-variance
+ /// formula (Chan et al.), so mean, variance and the confidence bounds match
+ /// what a single accumulator over the union of samples would report. Lets
+ /// concurrent consumers accumulate locally (lock-free) and periodically
+ /// fold their pending observations into a shared accumulator.
+ ///
+ /// `self.is_optional` is preserved; it is a property of the predicate, not
+ /// of the observations.
+ pub fn merge(&mut self, other: &Self) {
+ self.rows_matched += other.rows_matched;
+ self.rows_total += other.rows_total;
+ self.eval_nanos += other.eval_nanos;
+
+ if other.sample_count == 0 {
+ return;
+ }
+ if self.sample_count == 0 {
+ self.sample_count = other.sample_count;
+ self.eff_mean = other.eff_mean;
+ self.eff_m2 = other.eff_m2;
+ return;
+ }
+ let n1 = self.sample_count as f64;
+ let n2 = other.sample_count as f64;
+ let delta = other.eff_mean - self.eff_mean;
+ self.eff_mean += delta * n2 / (n1 + n2);
+ self.eff_m2 += other.eff_m2 + delta * delta * n1 * n2 / (n1 + n2);
+ self.sample_count += other.sample_count;
+ }
+
+ /// Clear all accumulated observations, preserving [`is_optional`].
+ ///
+ /// Used when a predicate's identity changes underneath a stable
+ /// [`FilterId`] — e.g. a dynamic filter that re-arms with new values — so
+ /// stale measurements don't bias the new predicate.
+ ///
+ /// [`is_optional`]: Self::is_optional
+ pub fn reset(&mut self) {
+ *self = Self::new(self.is_optional);
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ #[test]
+ fn empty_stats_report_none() {
+ let s = SelectivityStats::default();
+ assert_eq!(s.pass_rate(), None);
+ assert_eq!(s.cost_per_row_nanos(), None);
+ assert_eq!(s.effectiveness(), None);
+ assert_eq!(s.confidence_lower_bound(2.0), None);
+ assert_eq!(s.sample_count(), 0);
+ assert!(!s.is_optional());
+ }
+
+ #[test]
+ fn pass_rate_and_cost_per_row() {
+ let mut s = SelectivityStats::default();
+ // 2 of 10 rows pass, 1000ns spent.
+ s.record(2, 10, 1000, 0.0);
+ assert_eq!(s.pass_rate(), Some(0.2));
+ assert_eq!(s.cost_per_row_nanos(), Some(100.0));
+ // accumulates across batches
+ s.record(8, 10, 1000, 0.0);
+ assert_eq!(s.pass_rate(), Some(0.5)); // 10/20
+ assert_eq!(s.cost_per_row_nanos(), Some(100.0)); // 2000/20
+ }
+
+ #[test]
+ fn welford_mean_matches_naive_average() {
+ let mut s = SelectivityStats::default();
+ let samples = [10.0, 20.0, 30.0, 40.0];
+ for &x in &samples {
+ s.record(1, 1, 1, x);
+ }
+ assert_eq!(s.sample_count(), 4);
+ let mean = s.effectiveness().unwrap();
+ assert!((mean - 25.0).abs() < 1e-9, "mean was {mean}");
+ }
+
+ #[test]
+ fn welford_variance_matches_naive() {
+ let mut s = SelectivityStats::default();
+ let samples = [2.0, 4.0, 4.0, 4.0, 5.0, 5.0, 7.0, 9.0];
+ for &x in &samples {
+ s.record(1, 1, 1, x);
+ }
+ // Known sample variance (n-1) of this set is 4.571428...
+ let var = s.variance().unwrap();
+ assert!((var - 32.0 / 7.0).abs() < 1e-9, "variance was {var}");
+ }
+
+ #[test]
+ fn confidence_interval_brackets_mean() {
+ let mut s = SelectivityStats::default();
+ for &x in &[10.0, 12.0, 8.0, 11.0, 9.0] {
+ s.record(1, 1, 1, x);
+ }
+ let mean = s.effectiveness().unwrap();
+ let lo = s.confidence_lower_bound(2.0).unwrap();
+ let hi = s.confidence_upper_bound(2.0).unwrap();
+ assert!(lo < mean && mean < hi, "lo={lo} mean={mean} hi={hi}");
+ // symmetric around the mean
+ assert!(((hi - mean) - (mean - lo)).abs() < 1e-9);
+ }
+
+ #[test]
+ fn ci_needs_two_samples() {
+ let mut s = SelectivityStats::default();
+ s.record(1, 1, 1, 5.0);
+ assert_eq!(s.confidence_lower_bound(2.0), None);
+ assert_eq!(s.confidence_upper_bound(2.0), None);
+ }
+
+ #[test]
+ fn empty_or_zero_time_batches_are_not_samples() {
+ let mut s = SelectivityStats::default();
+ s.record(0, 0, 1000, 5.0); // no rows
+ s.record(5, 10, 0, 5.0); // no time
+ s.record(5, 10, 100, f64::NAN); // non-finite sample
+ assert_eq!(s.sample_count(), 0);
+ // but raw counters still moved where rows/time were present
+ assert_eq!(s.pass_rate(), Some(0.5)); // from the latter two row-bearing batches
+ }
+
+ #[test]
+ fn merge_matches_single_accumulator() {
+ let samples = [2.0, 4.0, 4.0, 4.0, 5.0, 5.0, 7.0, 9.0];
+ let mut whole = SelectivityStats::default();
+ let mut left = SelectivityStats::default();
+ let mut right = SelectivityStats::default();
+ for (i, &x) in samples.iter().enumerate() {
+ whole.record(1, 2, 10, x);
+ if i < 3 {
+ left.record(1, 2, 10, x);
+ } else {
+ right.record(1, 2, 10, x);
+ }
+ }
+ left.merge(&right);
+ assert_eq!(left.sample_count(), whole.sample_count());
+ assert_eq!(left.pass_rate(), whole.pass_rate());
+ assert_eq!(left.cost_per_row_nanos(), whole.cost_per_row_nanos());
+ let (m1, m2) = (
+ left.effectiveness().unwrap(),
+ whole.effectiveness().unwrap(),
+ );
+ assert!((m1 - m2).abs() < 1e-9, "means {m1} vs {m2}");
+ let (v1, v2) = (left.variance().unwrap(), whole.variance().unwrap());
+ assert!((v1 - v2).abs() < 1e-9, "variances {v1} vs {v2}");
+ }
+
+ #[test]
+ fn merge_with_empty_is_identity() {
+ let mut s = SelectivityStats::default();
+ s.record(2, 10, 100, 7.0);
+ let snapshot = s;
+ s.merge(&SelectivityStats::default());
+ assert_eq!(s, snapshot);
+
+ let mut empty = SelectivityStats::default();
+ empty.merge(&snapshot);
+ assert_eq!(empty, snapshot);
+ }
+
+ #[test]
+ fn reset_preserves_optional_flag() {
+ let mut s = SelectivityStats::new(true);
+ s.record(2, 10, 1000, 7.0);
+ s.reset();
+ assert_eq!(s.sample_count(), 0);
+ assert_eq!(s.pass_rate(), None);
+ assert!(s.is_optional());
+ }
+}
diff --git a/datafusion/physical-expr-common/src/lib.rs b/datafusion/physical-expr-common/src/lib.rs
index b6eaacdca2505..bc53abc993a0c 100644
--- a/datafusion/physical-expr-common/src/lib.rs
+++ b/datafusion/physical-expr-common/src/lib.rs
@@ -30,6 +30,7 @@
//!
//! [DataFusion]:
+pub mod adaptive;
pub mod binary_map;
pub mod binary_view_map;
pub mod datum;
diff --git a/datafusion/physical-plan/src/adaptive_filter.rs b/datafusion/physical-plan/src/adaptive_filter.rs
new file mode 100644
index 0000000000000..be5aaaa432b86
--- /dev/null
+++ b/datafusion/physical-plan/src/adaptive_filter.rs
@@ -0,0 +1,1841 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Runtime-adaptive evaluation of a conjunctive (`AND`) predicate for
+//! [`FilterExec`](crate::filter::FilterExec).
+//!
+//! This is the *ordering* policy that sits on top of the shared measurement
+//! substrate in
+//! [`datafusion_physical_expr_common::adaptive`]. The substrate
+//! ([`AdaptiveStatsRegistry`]) measures per-conjunct selectivity and cost; this
+//! module decides, from those measurements, what order to evaluate the
+//! conjuncts in.
+//!
+//! ## How it evaluates
+//!
+//! The conjuncts are evaluated sequentially, combining their boolean results
+//! with `AND`. The working batch is physically compacted to the surviving rows
+//! only once the accumulated mask becomes selective enough (the same
+//! pre-selection gate [`BinaryExpr`]'s `AND` short-circuit uses) — so a run of
+//! non-selective conjuncts costs only cheap bitwise `AND`s, while a selective
+//! conjunct shrinks the batch the expensive conjuncts after it must decode.
+//! Gating is what makes ordering matter: a conjunct that compacts early saves
+//! every later conjunct work, so the cheap-and-selective ones should run first.
+//! Each conjunct is timed and counted on exactly the rows it evaluated, giving
+//! the *marginal* selectivity and cost on the current working population.
+//!
+//! ## How it reorders: the model proposes, a trial disposes
+//!
+//! Every conjunct accrues a per-batch *effectiveness* sample of **rows
+//! discarded per second** (`(total - matched) * 1e9 / eval_nanos`). Maximising
+//! discards-per-second is exactly minimising `cost_per_row / (1 - pass_rate)`,
+//! the classic optimal ordering key for independent conjuncts — so an expensive
+//! but very selective predicate (e.g. a `LIKE` that keeps one row) correctly
+//! sorts ahead of a cheap but unselective one.
+//!
+//! Crucially, this ranking is only ever a *proposal*. The classic key is
+//! built on assumptions real execution violates: it prices a conjunct's work
+//! as `rows × cost_per_row` even though vectorised kernels do not scale down
+//! without paying for compaction; and the marginal cost-per-row measured on a
+//! handful of survivor rows says little about cost at the front of the order.
+//! An order that looks optimal on paper can run slower than the one the query
+//! author wrote. So a proposed order is never adopted on the model's word: it
+//! is put through an **A/B trial** — batches alternate between the incumbent
+//! and the candidate arrangement, each timed end-to-end — and the candidate
+//! wins only if its measured ns/row beats the incumbent's with statistical
+//! confidence. Ties favour the incumbent; a rejected candidate is remembered
+//! and not re-tried. Adopted orders run via the compact-once loop (the
+//! arrangement the trial actually measured), not re-fused into a chain with
+//! different cost behaviour.
+//!
+//! Proposals themselves are debounced: the learner only settles once every
+//! adjacent pair of the ranking is either statistically certain or a tie
+//! whose order provably cannot matter, and a proposal that does not promise a
+//! material improvement over the incumbent freezes the incumbent unchanged —
+//! interchangeable conjuncts never trigger an experiment at all.
+//!
+//! To stay correct under distribution drift, a frozen evaluator periodically
+//! *re-thaws*: it re-measures a short window and re-proposes. Each re-thaw
+//! that keeps the incumbent backs the next one off exponentially, so a stable
+//! filter is re-checked geometrically less often and steady-state overhead
+//! decays toward zero; a candidate that wins its trial resets the interval so
+//! real drift is caught quickly.
+//!
+//! All partition streams of a `FilterExec` share their measurements and trial
+//! verdicts, so the learning cost is paid once per query, not once per
+//! stream, and a stream that starts late adopts the validated champion on its
+//! first batch.
+//!
+//! ## Known limitation
+//!
+//! The measured selectivity of a conjunct is *conditional* on the conjuncts
+//! ordered before it (it only sees their survivors), so with strongly
+//! correlated predicates the *proposal* can be a local optimum — the trial
+//! protects against adopting a bad one, but a better unexplored order may be
+//! missed. A proper exploration phase (measuring each conjunct's marginal
+//! selectivity on a common population) is future work.
+//!
+//! [`BinaryExpr`]: datafusion_physical_expr::expressions::BinaryExpr
+
+use std::sync::Arc;
+
+use arrow::array::{Array, ArrayRef, BooleanArray, BooleanBufferBuilder, UInt32Array};
+use arrow::buffer::BooleanBuffer;
+use arrow::compute::kernels::boolean::and;
+use arrow::compute::{filter, filter_record_batch, prep_null_mask_filter};
+use arrow::record_batch::RecordBatch;
+use datafusion_common::Result;
+use datafusion_common::cast::as_boolean_array;
+use datafusion_common::instant::Instant;
+use datafusion_expr::Operator;
+use datafusion_physical_expr::PhysicalExpr;
+use datafusion_physical_expr::expressions::BinaryExpr;
+use datafusion_physical_expr::utils::split_conjunction;
+use datafusion_physical_expr_common::adaptive::{
+ AdaptiveArbiter, AdaptiveStatsRegistry, SelectivityStats, TrialUpdate,
+};
+use datafusion_physical_expr_common::physical_expr::is_volatile;
+use log::debug;
+
+/// Confidence multiplier for the one-sided interval on effectiveness
+/// (~97.5% one-sided at 2.0).
+const CONFIDENCE_Z: f64 = 2.0;
+/// Minimum per-conjunct samples before the confidence intervals are trusted
+/// enough to make a freeze decision.
+const MIN_SAMPLES_FOR_CI: u64 = 4;
+/// Freeze after this many learning samples even if the order's confidence
+/// intervals still overlap: if the conjuncts cannot be told apart by now, their
+/// relative order does not matter, so stop paying to measure it.
+const MAX_LEARNING_SAMPLES: u64 = 64;
+/// Batches the fast frozen path runs before the first re-thaw check.
+const INITIAL_THAW_INTERVAL: u64 = 64;
+/// Each re-thaw that confirms the order is unchanged multiplies the next
+/// interval by this factor (exponential backoff), so a stable filter is
+/// re-checked geometrically less often and steady-state overhead decays to ~0.
+const THAW_BACKOFF: u64 = 4;
+/// Upper bound on the re-thaw interval.
+const MAX_THAW_INTERVAL: u64 = 16_384;
+/// Batches measured during a re-thaw before re-deciding the order.
+const REMEASURE_WINDOW: u64 = 16;
+/// Measured batches at the start of each measuring phase (learning and
+/// re-measure windows) that run back-to-back, so freeze decisions are
+/// reachable quickly even on short streams.
+const MEASURE_WARMUP: u64 = 8;
+/// After the warmup, measure one batch in this many; the rest evaluate the
+/// fused predicate in the current order with no instrumentation. Bounds the
+/// steady measurement overhead even when the order never resolves.
+const MEASURE_STRIDE: u64 = 8;
+/// Fraction of the conjunction's expected total cost below which a difference
+/// is immaterial. Adjacent conjuncts whose swap could not change the expected
+/// cost by more than this are a *tie*: their relative order does not matter,
+/// so an unresolved tie neither delays freezing nor counts as drift when a
+/// re-thaw re-ranks them.
+const TIE_COST_FRACTION: f64 = 0.05;
+/// Physically compact the working batch to the surviving rows only when the
+/// accumulated mask keeps at most this fraction of them. Above this, the cost
+/// of materializing a barely-smaller batch is not repaid, so we keep evaluating
+/// against the full working batch and just AND the boolean masks — mirroring
+/// the pre-selection gate in `BinaryExpr`'s `AND` short-circuit.
+const COMPACTION_SELECTIVITY_THRESHOLD: f64 = 0.2;
+/// Paired samples before a trial verdict. The challenger must beat the
+/// incumbent with confidence within this many pairs or it is rejected (ties
+/// favour the incumbent).
+/// How an arrangement evaluates its conjuncts.
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+enum Strategy {
+ /// A single left-deep `AND` of [`BinaryExpr`]s — byte-for-byte what
+ /// `FilterExec` runs when the feature is off. The incumbent starts here so
+ /// that doing nothing costs nothing.
+ Fused,
+ /// The compact-once loop ([`eval_conjuncts`]): masks are `AND`-combined
+ /// and the working batch is physically compacted at most when the
+ /// survivors drop below the pre-selection threshold, instead of
+ /// re-filtering and re-scattering at every `AND` level like the fused
+ /// chain does. Orders adopted by a trial run this way, since this is the
+ /// strategy the trial measured.
+ CompactOnce,
+}
+
+/// The executable arrangement an adaptive `FilterExec` decides between: an
+/// evaluation order plus the strategy that runs it. This is the type the
+/// shared [`AdaptiveArbiter`] coordinates trials over; the arbiter itself is
+/// arrangement-agnostic (a parquet scan would put filter *placement* here
+/// instead).
+#[derive(Debug, Clone, PartialEq)]
+struct Arrangement {
+ /// Evaluation order: indices into the conjunct list.
+ order: Vec,
+ /// How the order is executed.
+ strategy: Strategy,
+}
+
+/// Lifecycle of the adaptive evaluator.
+#[derive(Debug)]
+enum Phase {
+ /// Measuring batches (warmup, then strided), building confidence in the
+ /// per-conjunct statistics that *propose* a candidate order.
+ Learning,
+ /// Participating in the shared A/B trial (see [`AdaptiveArbiter`]):
+ /// alternating arms on consecutive batches and submitting paired
+ /// end-to-end timings. Only a measured win changes the order — the
+ /// per-conjunct cost model proposes, but never decides.
+ Trial {
+ /// The candidate arrangement, copied out of the shared trial so the
+ /// hot path does not lock to learn what to evaluate.
+ candidate: Arrangement,
+ /// ns/row of the incumbent leg of the current pair, once run.
+ pending_incumbent: Option,
+ /// Freeze interval to use if the candidate is rejected (carries this
+ /// stream's re-thaw backoff); an adopted candidate resets it.
+ interval_if_rejected: u64,
+ },
+ /// Order settled; the incumbent arrangement is evaluated with no
+ /// measurement until `thaw_at` batches have been processed.
+ Frozen {
+ /// Batch count at which to re-measure.
+ thaw_at: u64,
+ /// Interval that produced `thaw_at`; grows on each confirmation.
+ interval: u64,
+ },
+ /// Briefly measuring again after a thaw, to detect distribution drift.
+ Remeasuring {
+ /// Measured batches left in the re-measurement window.
+ window_left: u64,
+ /// The frozen interval before this thaw (for backoff bookkeeping).
+ interval: u64,
+ },
+}
+
+/// State shared by every partition stream of one `FilterExec`: the
+/// per-conjunct measurement registry plus the arrangement arbiter, so the
+/// streams learn as one and one stream's completed trial serves all of them.
+#[derive(Debug, Default)]
+pub(crate) struct AdaptiveFilterShared {
+ /// Per-conjunct selectivity/cost stats, keyed by conjunct index.
+ stats: AdaptiveStatsRegistry,
+ /// Champion/challenger coordination over [`Arrangement`]s.
+ arbiter: AdaptiveArbiter,
+}
+
+impl AdaptiveFilterShared {
+ pub(crate) fn new() -> Self {
+ Self::default()
+ }
+}
+
+/// Adaptive evaluator for a single conjunctive predicate.
+///
+/// Owned per partition stream (single-threaded), so all state — order, stats,
+/// and lifecycle [`Phase`] — is held directly and mutated through `&mut self`
+/// with no locking. The stats are a plain `Vec` indexed by conjunct id (ids are
+/// dense `0..n`), so the per-batch hot path is a direct index, not a locked map
+/// lookup.
+///
+/// What *is* shared is the measurements and the trial verdicts: per-batch
+/// observations accumulate in the stream-local `stats` (lock-free) and are
+/// folded into the [`AdaptiveFilterShared`] registry whenever this stream
+/// makes a decision, so N streams converge ~N× faster; and the first stream
+/// whose trial validates (or rejects) a candidate publishes the verdict so the
+/// others adopt it without re-running the experiment. Drift decisions
+/// (re-thaw windows) deliberately use only the local fresh window — the shared
+/// accumulator is a long-run prior and would dilute a distribution change.
+#[derive(Debug)]
+pub(crate) struct AdaptiveConjunction {
+ /// The conjuncts. Stats/order indices refer to positions in this `Vec`.
+ conjuncts: Vec>,
+ /// Per-conjunct observations accumulated locally since the last flush to
+ /// [`shared`](Self::shared), indexed by conjunct id.
+ stats: Vec,
+ /// Measurements and verdicts shared by every partition stream of the same
+ /// `FilterExec`.
+ shared: Arc,
+ /// Champion epoch this stream has caught up to (see
+ /// [`AdaptiveFilterShared::epoch`]).
+ epoch_seen: u64,
+ /// Incumbent evaluation order: indices into [`conjuncts`](Self::conjuncts).
+ order: Vec,
+ /// Incumbent evaluation strategy. Starts as [`Strategy::Fused`] over the
+ /// written order (identical to the feature being off); becomes
+ /// [`Strategy::CompactOnce`] when a trial adopts a new arrangement.
+ strategy: Strategy,
+ /// The conjuncts fused into a left-deep `AND` in [`order`](Self::order),
+ /// used by [`Strategy::Fused`] and by unmeasured learning batches.
+ /// Rebuilt when the order changes.
+ fused: Arc,
+ /// Total batches processed; drives the re-thaw schedule.
+ batches: u64,
+ /// Measured batches in the current measuring phase; drives the
+ /// warmup-then-stride measurement schedule.
+ measured: u64,
+ /// Current lifecycle phase.
+ phase: Phase,
+}
+
+impl AdaptiveConjunction {
+ /// Build an adaptive evaluator for `predicate`, or `None` if adaptive
+ /// reordering does not apply:
+ ///
+ /// - `enabled` is false (the config flag is off);
+ /// - the predicate has fewer than two `AND` conjuncts (nothing to reorder);
+ /// - any conjunct is volatile (reordering could change results).
+ ///
+ /// `shared` is the state common to all partition streams of the owning
+ /// `FilterExec`; the conjuncts are registered in it under their indices.
+ pub(crate) fn try_new(
+ predicate: &Arc,
+ enabled: bool,
+ shared: Arc,
+ ) -> Option {
+ if !enabled {
+ return None;
+ }
+ let conjuncts: Vec> = split_conjunction(predicate)
+ .into_iter()
+ .map(Arc::clone)
+ .collect();
+ if conjuncts.len() < 2 {
+ return None;
+ }
+ if conjuncts.iter().any(is_volatile) {
+ return None;
+ }
+
+ shared
+ .stats
+ .register_all((0..conjuncts.len()).map(|id| (id, false)));
+ let stats = vec![SelectivityStats::new(false); conjuncts.len()];
+ let order: Vec = (0..conjuncts.len()).collect();
+ let fused = fuse(&conjuncts, &order);
+
+ Some(Self {
+ conjuncts,
+ stats,
+ shared,
+ epoch_seen: 0,
+ order,
+ strategy: Strategy::Fused,
+ fused,
+ batches: 0,
+ measured: 0,
+ phase: Phase::Learning,
+ })
+ }
+
+ /// Evaluate the conjunction against `batch`, returning the boolean mask
+ /// (over the batch's original rows) of rows that passed every conjunct.
+ ///
+ /// While [`Learning`](Phase::Learning) or [`Remeasuring`](Phase::Remeasuring),
+ /// a warmup-then-strided subset of batches is evaluated and measured
+ /// per-conjunct (see [`evaluate_measured`](Self::evaluate_measured));
+ /// [`Trial`](Phase::Trial) batches alternate between the incumbent and the
+ /// candidate arrangement, timed end-to-end; all other batches — including
+ /// the whole [`Frozen`](Phase::Frozen) phase — evaluate the incumbent with
+ /// no instrumentation.
+ pub(crate) fn evaluate(&mut self, batch: &RecordBatch) -> Result {
+ self.batches += 1;
+
+ // Adopt a champion another stream's trial has validated since we last
+ // looked: a relaxed atomic load per batch, a lock only on change.
+ if self.shared.arbiter.epoch() != self.epoch_seen {
+ self.adopt_shared_champion();
+ }
+
+ // Frozen fast path; when the interval elapses, drop into a fresh
+ // measurement window to check whether the data has drifted.
+ if let Phase::Frozen { thaw_at, interval } = self.phase {
+ if self.batches < thaw_at {
+ return self.evaluate_incumbent(batch);
+ }
+ self.stats.iter_mut().for_each(SelectivityStats::reset);
+ self.measured = 0;
+ self.phase = Phase::Remeasuring {
+ window_left: REMEASURE_WINDOW,
+ interval,
+ };
+ }
+
+ if matches!(self.phase, Phase::Trial { .. }) {
+ return self.evaluate_trial(batch);
+ }
+
+ // Learning or re-measuring: measure a warmup of consecutive batches,
+ // then only one batch per stride, so an order that stays unresolved
+ // costs a bounded fraction of the stream rather than all of it.
+ if self.measured >= MEASURE_WARMUP && !self.batches.is_multiple_of(MEASURE_STRIDE)
+ {
+ return self.evaluate_incumbent(batch);
+ }
+
+ self.measured += 1;
+ let result = self.evaluate_measured(batch)?;
+ self.update_phase();
+ Ok(result)
+ }
+
+ /// Evaluate the incumbent arrangement with no instrumentation.
+ fn evaluate_incumbent(&self, batch: &RecordBatch) -> Result {
+ match self.strategy {
+ Strategy::Fused => self.fused.evaluate(batch)?.into_array(batch.num_rows()),
+ Strategy::CompactOnce => {
+ eval_conjuncts(&self.conjuncts, &self.order, batch, None)
+ }
+ }
+ }
+
+ /// Install the trial-validated champion published by another stream and
+ /// freeze on it: the experiment has already been run, re-running it here
+ /// would only repeat the cost.
+ fn adopt_shared_champion(&mut self) {
+ let (champion, epoch) = self.shared.arbiter.champion();
+ self.epoch_seen = epoch;
+ if let Some(champion) = champion {
+ self.set_order(champion.order);
+ self.strategy = champion.strategy;
+ self.freeze(INITIAL_THAW_INTERVAL);
+ }
+ }
+
+ /// Contribute one batch to the shared A/B trial. Each stream runs the
+ /// incumbent and the candidate on consecutive batches, timing both, and
+ /// submits the pair's `ln(candidate / incumbent)` ns/row ratio; once
+ /// enough pairs accumulate (across all streams) the trial concludes. The
+ /// candidate is adopted only if the mean log-ratio is below zero with
+ /// confidence; ties favour the incumbent.
+ fn evaluate_trial(&mut self, batch: &RecordBatch) -> Result {
+ let Phase::Trial {
+ candidate,
+ pending_incumbent,
+ interval_if_rejected,
+ } = &mut self.phase
+ else {
+ unreachable!("caller checked the phase");
+ };
+ let interval_if_rejected = *interval_if_rejected;
+ let rows = batch.num_rows();
+
+ // Incumbent leg first; no locking on either leg.
+ let Some(incumbent_ns) = *pending_incumbent else {
+ let timer = Instant::now();
+ let result = match self.strategy {
+ Strategy::Fused => {
+ self.fused.evaluate(batch)?.into_array(batch.num_rows())
+ }
+ Strategy::CompactOnce => {
+ eval_conjuncts(&self.conjuncts, &self.order, batch, None)
+ }
+ }?;
+ let nanos = timer.elapsed().as_nanos() as u64;
+ if rows > 0
+ && nanos > 0
+ && let Phase::Trial {
+ pending_incumbent, ..
+ } = &mut self.phase
+ {
+ *pending_incumbent = Some(nanos as f64 / rows as f64);
+ }
+ return Ok(result);
+ };
+
+ // Candidate leg; completes the pair.
+ let candidate = std::mem::replace(
+ candidate,
+ Arrangement {
+ order: Vec::new(),
+ strategy: Strategy::CompactOnce,
+ },
+ );
+ let timer = Instant::now();
+ let result = eval_conjuncts(&self.conjuncts, &candidate.order, batch, None)?;
+ let nanos = timer.elapsed().as_nanos() as u64;
+ let sample = (rows > 0 && nanos > 0)
+ .then(|| (nanos as f64 / rows as f64 / incumbent_ns).ln())
+ .filter(|s| s.is_finite());
+
+ let Some(sample) = sample else {
+ // Not a usable pair; start the next one.
+ self.phase = Phase::Trial {
+ candidate,
+ pending_incumbent: None,
+ interval_if_rejected,
+ };
+ return Ok(result);
+ };
+ match self
+ .shared
+ .arbiter
+ .submit_pair(&candidate, rows as u64, nanos, sample)
+ {
+ TrialUpdate::Running => {
+ self.phase = Phase::Trial {
+ candidate,
+ pending_incumbent: None,
+ interval_if_rejected,
+ };
+ }
+ TrialUpdate::Adopted(champion) => {
+ debug!("adaptive filter trial adopted {champion:?}");
+ self.epoch_seen = self.shared.arbiter.epoch();
+ self.set_order(champion.order);
+ self.strategy = champion.strategy;
+ self.freeze(INITIAL_THAW_INTERVAL);
+ }
+ // Rejected here, or concluded on another stream (an adoption
+ // there is handled by the epoch check): the incumbent stands.
+ TrialUpdate::Rejected(rejected) => {
+ debug!("adaptive filter trial rejected {rejected:?}");
+ self.freeze(interval_if_rejected);
+ }
+ TrialUpdate::Superseded => self.freeze(interval_if_rejected),
+ }
+ Ok(result)
+ }
+
+ /// Evaluate the conjuncts in the incumbent order via the compact-once
+ /// loop, recording each conjunct's marginal selectivity and cost into the
+ /// stream-local stats.
+ fn evaluate_measured(&mut self, batch: &RecordBatch) -> Result {
+ eval_conjuncts(&self.conjuncts, &self.order, batch, Some(&mut self.stats))
+ }
+
+ /// Advance the lifecycle phase after measuring a batch:
+ /// - [`Learning`](Phase::Learning) folds the local observations into the
+ /// shared registry and proposes a candidate once the *shared* stats
+ /// settle the ranking (see [`settled_order`]), so all partition streams
+ /// learn as one;
+ /// - [`Remeasuring`](Phase::Remeasuring) proposes from the *local* fresh
+ /// window only when its window ends (the shared accumulator is a
+ /// long-run prior that would dilute a distribution change), then folds
+ /// the window into the shared registry.
+ ///
+ /// Either way the proposal goes through [`decide`](Self::decide): a
+ /// proposal that does not promise a material improvement freezes the
+ /// incumbent unchanged, and one that does is handed to an A/B
+ /// [`Trial`](Phase::Trial) rather than adopted on the model's word.
+ fn update_phase(&mut self) {
+ match std::mem::replace(&mut self.phase, Phase::Learning) {
+ Phase::Learning => {
+ self.flush_to_shared();
+ let stats = self.shared_snapshot();
+ if let Some(order) = settled_order(&stats) {
+ self.decide(order, &stats, INITIAL_THAW_INTERVAL);
+ }
+ // else: stay in `Learning` (already restored by the replace).
+ }
+ Phase::Remeasuring {
+ window_left,
+ interval,
+ } => {
+ let window_left = window_left - 1;
+ if window_left == 0 {
+ let candidate = rank_by_effectiveness(&self.stats);
+ let stats = std::mem::take(&mut self.stats);
+ let backoff =
+ interval.saturating_mul(THAW_BACKOFF).min(MAX_THAW_INTERVAL);
+ self.decide(candidate, &stats, backoff);
+ self.stats = stats;
+ self.flush_to_shared();
+ } else {
+ self.phase = Phase::Remeasuring {
+ window_left,
+ interval,
+ };
+ }
+ }
+ // Not reachable: frozen and trial batches never take the measured
+ // path. Restore the phase.
+ other => self.phase = other,
+ }
+ }
+
+ /// Act on a proposed `candidate` order, judged against `stats`:
+ ///
+ /// - the incumbent's order (or an immaterial reshuffle of it, per
+ /// [`TIE_COST_FRACTION`]) freezes the incumbent for
+ /// `interval_if_unchanged` batches — ties never trigger an experiment;
+ /// - a candidate the shared verdicts already rejected does the same — the
+ /// experiment has been run and lost;
+ /// - otherwise the candidate enters an A/B [`Trial`](Phase::Trial). The
+ /// per-conjunct stats only ever *propose*; adoption requires the trial's
+ /// measured end-to-end win.
+ fn decide(
+ &mut self,
+ candidate: Vec,
+ stats: &[SelectivityStats],
+ interval_if_unchanged: u64,
+ ) {
+ let materially_better = expected_cost_per_row(stats, &candidate)
+ < (1.0 - TIE_COST_FRACTION) * expected_cost_per_row(stats, &self.order);
+ if candidate == self.order || !materially_better {
+ self.freeze(interval_if_unchanged);
+ return;
+ }
+ // A candidate whose modelled cost is not materially better than the
+ // last rejected candidate's is the same experiment re-proposed —
+ // with tied conjuncts the ranking reshuffles them freely, and an
+ // exact-match memory would re-run the lost trial forever.
+ if let Some(rejected) = self.shared.arbiter.rejected()
+ && expected_cost_per_row(stats, &candidate)
+ >= (1.0 - TIE_COST_FRACTION)
+ * expected_cost_per_row(stats, &rejected.order)
+ {
+ self.freeze(interval_if_unchanged);
+ return;
+ }
+ // Start the shared trial, or join the one already in progress (its
+ // candidate stands in for ours: proposals are made from the same
+ // shared statistics, and a verdict on any reordering beats racing
+ // experiments against each other).
+ let candidate = self.shared.arbiter.begin_trial(Arrangement {
+ order: candidate,
+ strategy: Strategy::CompactOnce,
+ });
+ self.phase = Phase::Trial {
+ candidate,
+ pending_incumbent: None,
+ interval_if_rejected: interval_if_unchanged,
+ };
+ }
+
+ /// Fold the locally-accumulated observations into the shared registry and
+ /// clear them.
+ fn flush_to_shared(&mut self) {
+ for (id, stats) in self.stats.iter_mut().enumerate() {
+ if stats.sample_count() > 0 || stats.pass_rate().is_some() {
+ self.shared.stats.merge(id, stats);
+ stats.reset();
+ }
+ }
+ }
+
+ /// Copy out the shared stats for every conjunct.
+ fn shared_snapshot(&self) -> Vec {
+ (0..self.conjuncts.len())
+ .map(|id| self.shared.stats.snapshot(id).unwrap_or_default())
+ .collect()
+ }
+
+ /// Install `order` as the incumbent order, rebuilding the fused predicate
+ /// if it changed.
+ fn set_order(&mut self, order: Vec) {
+ if order != self.order {
+ self.order = order;
+ self.fused = fuse(&self.conjuncts, &self.order);
+ }
+ }
+
+ /// Freeze the incumbent, due to re-measure after `interval` batches.
+ fn freeze(&mut self, interval: u64) {
+ self.phase = Phase::Frozen {
+ thaw_at: self.batches + interval,
+ interval,
+ };
+ }
+}
+
+/// Evaluate `conjuncts` in `order` against `batch` via the compact-once loop,
+/// returning the boolean mask (over the batch's original rows) of rows that
+/// passed every conjunct. With `stats`, each conjunct is additionally timed
+/// and counted on exactly the rows it evaluated (its *marginal* selectivity
+/// and cost on the current working population).
+///
+/// The working batch is physically compacted to the surviving rows only once
+/// the accumulated mask becomes selective enough (see
+/// [`COMPACTION_SELECTIVITY_THRESHOLD`]); until then masks are combined with a
+/// cheap bitwise `AND`, so a run of non-selective conjuncts pays no
+/// materialization cost. Unlike the fused `BinaryExpr` chain, survivors stay
+/// compacted across the remaining conjuncts instead of being re-filtered and
+/// re-scattered at every `AND` level.
+///
+/// The bookkeeping is all deferred so the common shapes cost what the fused
+/// chain would: an all-true mask is dropped without an `AND` merge, the
+/// row-index array mapping survivors back to original rows is only
+/// materialized at the first compaction, and the final scatter only happens
+/// if a compaction occurred (otherwise the accumulated mask already covers
+/// the original rows).
+fn eval_conjuncts(
+ conjuncts: &[Arc],
+ order: &[usize],
+ batch: &RecordBatch,
+ mut stats: Option<&mut [SelectivityStats]>,
+) -> Result {
+ let num_rows = batch.num_rows();
+ if num_rows == 0 {
+ return Ok(Arc::new(BooleanArray::from(Vec::::new())));
+ }
+
+ // `working` is the batch conjuncts are evaluated against. `acc` is the
+ // accumulated (`AND`-combined, null-free) result over `working`'s rows
+ // since the last compaction; `None` means all of them are still live.
+ // `live` maps `working`'s rows back to original row indices; `None`
+ // until a compaction first drops rows.
+ let mut working = batch.clone();
+ let mut acc: Option = None;
+ let mut live: Option = None;
+
+ for &id in order {
+ let rows_in = working.num_rows();
+
+ let timer = stats.is_some().then(Instant::now);
+ let array = conjuncts[id].evaluate(&working)?.into_array(rows_in)?;
+ let mask = as_boolean_array(&array)?;
+ // `matched` counts non-null trues (SQL filter semantics).
+ let matched = mask.true_count() as u64;
+
+ if let (Some(stats), Some(timer)) = (stats.as_deref_mut(), timer) {
+ let eval_nanos = timer.elapsed().as_nanos() as u64;
+ let discarded = rows_in as u64 - matched;
+ let sample = if eval_nanos > 0 {
+ discarded as f64 * 1e9 / eval_nanos as f64
+ } else {
+ 0.0
+ };
+ stats[id].record(matched, rows_in as u64, eval_nanos, sample);
+ }
+
+ // An all-true mask leaves the accumulated result untouched.
+ if matched == rows_in as u64 && mask.null_count() == 0 {
+ continue;
+ }
+
+ // Fold this conjunct into the accumulated mask (null -> false).
+ let mask = if mask.null_count() > 0 {
+ prep_null_mask_filter(mask)
+ } else {
+ mask.clone()
+ };
+ let folded = match &acc {
+ None => mask,
+ Some(prev) => and(prev, &mask)?,
+ };
+
+ let alive = folded.true_count();
+ if alive == 0 {
+ // Nothing survives; the result is all-false over the original
+ // rows no matter what the remaining conjuncts say.
+ return Ok(Arc::new(BooleanArray::new(
+ BooleanBuffer::new_unset(num_rows),
+ None,
+ )));
+ }
+ // Compact only when the survivors are a small fraction of the
+ // working batch — otherwise the copy is not worth it.
+ if (alive as f64) <= COMPACTION_SELECTIVITY_THRESHOLD * rows_in as f64 {
+ working = filter_record_batch(&working, &folded)?;
+ let indices = live.take().unwrap_or_else(|| {
+ Arc::new(UInt32Array::from_iter_values(0..num_rows as u32))
+ });
+ live = Some(filter(&indices, &folded)?);
+ acc = None;
+ } else {
+ acc = Some(folded);
+ }
+ }
+
+ match live {
+ // Never compacted: `acc` (or all-true) already covers the
+ // original rows.
+ None => Ok(match acc {
+ Some(acc) => Arc::new(acc),
+ None => Arc::new(BooleanArray::new(BooleanBuffer::new_set(num_rows), None)),
+ }),
+ // Compacted at least once: scatter the surviving original indices
+ // (`live`, narrowed by any residual `acc`) into a full-length mask.
+ Some(indices) => {
+ let indices = match acc {
+ Some(acc) => filter(&indices, &acc)?,
+ None => indices,
+ };
+ let indices = indices
+ .as_any()
+ .downcast_ref::()
+ .expect("u32 live");
+ let mut builder = BooleanBufferBuilder::new(num_rows);
+ builder.append_n(num_rows, false);
+ for &idx in indices.values() {
+ builder.set_bit(idx as usize, true);
+ }
+ Ok(Arc::new(BooleanArray::new(builder.finish(), None)))
+ }
+ }
+}
+
+/// Decide whether `stats` settle the conjunct order enough to freeze.
+/// Returns the order to freeze, or `None` to keep learning.
+///
+/// We freeze once every adjacent pair of the ranking is *resolved* — the
+/// pair is either statistically certain or a tie whose order cannot matter
+/// (see [`order_is_resolved`]) — or once enough samples have accrued that
+/// more measurement is not worth its cost.
+///
+/// Conjuncts that have never received a row (everything upstream of them
+/// was discarded) cannot be measured, but their position cannot matter
+/// either, so they are excluded from the sample-count gates rather than
+/// holding up freezing forever.
+fn settled_order(stats: &[SelectivityStats]) -> Option> {
+ let samples_of_measured = || {
+ stats
+ .iter()
+ .filter(|s| s.pass_rate().is_some())
+ .map(SelectivityStats::sample_count)
+ };
+ if samples_of_measured().min().unwrap_or(0) < MIN_SAMPLES_FOR_CI {
+ return None;
+ }
+ let order = rank_by_effectiveness(stats);
+ if order_is_resolved(stats, &order)
+ || samples_of_measured().max().unwrap_or(0) >= MAX_LEARNING_SAMPLES
+ {
+ Some(order)
+ } else {
+ None
+ }
+}
+
+/// Rank conjunct ids by mean effectiveness (discards-per-second) descending;
+/// ids without samples sort last. Stable, so equal ids keep ascending order.
+fn rank_by_effectiveness(stats: &[SelectivityStats]) -> Vec {
+ let mut ids: Vec = (0..stats.len()).collect();
+ ids.sort_by(
+ |&a, &b| match (stats[a].effectiveness(), stats[b].effectiveness()) {
+ (Some(x), Some(y)) => y.partial_cmp(&x).unwrap_or(std::cmp::Ordering::Equal),
+ (Some(_), None) => std::cmp::Ordering::Less,
+ (None, Some(_)) => std::cmp::Ordering::Greater,
+ (None, None) => std::cmp::Ordering::Equal,
+ },
+ );
+ ids
+}
+
+/// Whether every adjacent pair in `order` is *resolved*: the ranking is
+/// either statistically certain ([`pair_is_certain`]) or provably immaterial
+/// ([`pair_is_tied`]). Only unresolved pairs — distinguishable conjuncts whose
+/// order matters but whose measurements have not separated yet — justify more
+/// measurement.
+fn order_is_resolved(stats: &[SelectivityStats], order: &[usize]) -> bool {
+ let total = expected_cost_per_row(stats, order);
+ let mut weight = 1.0_f64;
+ for pair in order.windows(2) {
+ let (a, b) = (pair[0], pair[1]);
+ if !pair_is_certain(stats, a, b) && !pair_is_tied(stats, a, b, weight, total) {
+ return false;
+ }
+ weight *= stats[a].pass_rate().unwrap_or(1.0);
+ }
+ true
+}
+
+/// Whether `a` ranks above `b` with statistical certainty: their one-sided
+/// effectiveness confidence intervals do not overlap.
+fn pair_is_certain(stats: &[SelectivityStats], a: usize, b: usize) -> bool {
+ match (
+ stats[a].confidence_lower_bound(CONFIDENCE_Z),
+ stats[b].confidence_upper_bound(CONFIDENCE_Z),
+ ) {
+ (Some(lo), Some(up)) => lo >= up,
+ _ => false,
+ }
+}
+
+/// Whether the order of adjacent conjuncts `a` and `b` is immaterial:
+/// swapping them could not change the conjunction's expected cost by more
+/// than [`TIE_COST_FRACTION`] of `total`. `weight` is the fraction of input
+/// rows expected to reach the pair, `total` the conjunction's expected cost
+/// per input row (see [`expected_cost_per_row`]).
+///
+/// Treats the conjuncts' pass rates as independent, like the optimal-order
+/// ranking itself does.
+fn pair_is_tied(
+ stats: &[SelectivityStats],
+ a: usize,
+ b: usize,
+ weight: f64,
+ total: f64,
+) -> bool {
+ // A conjunct that has never received a row is unmeasurable, but rows
+ // only stop reaching it when an upstream conjunct discards everything,
+ // in which case its position cannot matter.
+ let (Some(cost_a), Some(pass_a)) =
+ (stats[a].cost_per_row_nanos(), stats[a].pass_rate())
+ else {
+ return true;
+ };
+ let (Some(cost_b), Some(pass_b)) =
+ (stats[b].cost_per_row_nanos(), stats[b].pass_rate())
+ else {
+ return true;
+ };
+ // Expected cost of `a` before `b` vs `b` before `a`, on the rows that
+ // reach the pair.
+ let gain = weight * ((cost_a + pass_a * cost_b) - (cost_b + pass_b * cost_a)).abs();
+ gain <= TIE_COST_FRACTION * total
+}
+
+/// Expected cost of evaluating the conjuncts in `order`, in nanoseconds per
+/// input row: each conjunct's measured per-row cost, weighted by the
+/// fraction of rows expected to reach it (the product of the pass rates of
+/// the conjuncts ordered before it, treated as independent). Conjuncts with
+/// no measurements contribute nothing — rows never reached them.
+fn expected_cost_per_row(stats: &[SelectivityStats], order: &[usize]) -> f64 {
+ let mut weight = 1.0_f64;
+ let mut total = 0.0_f64;
+ for &id in order {
+ let (Some(cost), Some(pass)) =
+ (stats[id].cost_per_row_nanos(), stats[id].pass_rate())
+ else {
+ continue;
+ };
+ total += weight * cost;
+ weight *= pass;
+ }
+ total
+}
+
+/// Fuse `conjuncts` into a single left-deep `AND` in `order`, so unmeasured
+/// batches evaluate as a normal predicate (inheriting `BinaryExpr`'s
+/// short-circuit and pre-selection).
+fn fuse(conjuncts: &[Arc], order: &[usize]) -> Arc {
+ let mut it = order.iter().map(|&i| Arc::clone(&conjuncts[i]));
+ let first = it.next().expect("at least two conjuncts");
+ it.fold(first, |acc, e| {
+ Arc::new(BinaryExpr::new(acc, Operator::And, e)) as Arc
+ })
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ use arrow::array::Int32Array;
+ use arrow::datatypes::{DataType, Field, Schema};
+ use datafusion_expr::Operator;
+ use datafusion_physical_expr::expressions::{binary, col, lit};
+ use datafusion_physical_expr_common::adaptive::TRIAL_PAIRS;
+
+ /// The conjunction arrangement for `order`, as trials propose it.
+ fn arrangement(order: Vec) -> Arrangement {
+ Arrangement {
+ order,
+ strategy: Strategy::CompactOnce,
+ }
+ }
+
+ /// Drive a full trial for `order` through the arbiter's public protocol
+ /// with decisive fake pairs (negative log-ratios adopt, positive reject).
+ fn run_fake_trial(shared: &AdaptiveFilterShared, order: Vec, wins: bool) {
+ let candidate = arrangement(order);
+ assert_eq!(shared.arbiter.begin_trial(candidate.clone()), candidate);
+ for i in 0..TRIAL_PAIRS {
+ let jitter = i as f64 * 0.01;
+ shared.arbiter.submit_pair(
+ &candidate,
+ 1000,
+ 1000,
+ if wins { -10.0 } else { 10.0 } + jitter,
+ );
+ }
+ }
+
+ fn test_batch(schema: &Arc, a: Vec, b: Vec) -> RecordBatch {
+ RecordBatch::try_new(
+ Arc::clone(schema),
+ vec![Arc::new(Int32Array::from(a)), Arc::new(Int32Array::from(b))],
+ )
+ .unwrap()
+ }
+
+ fn schema() -> Arc {
+ Arc::new(Schema::new(vec![
+ Field::new("a", DataType::Int32, false),
+ Field::new("b", DataType::Int32, false),
+ ]))
+ }
+
+ /// `AdaptiveConjunction::try_new` with a fresh, unshared registry.
+ fn try_new_unshared(
+ predicate: &Arc,
+ enabled: bool,
+ ) -> Option {
+ AdaptiveConjunction::try_new(
+ predicate,
+ enabled,
+ Arc::new(AdaptiveFilterShared::new()),
+ )
+ }
+
+ /// `a > 2 AND b < 5`
+ fn predicate(schema: &Arc) -> Arc {
+ let left =
+ binary(col("a", schema).unwrap(), Operator::Gt, lit(2i32), schema).unwrap();
+ let right =
+ binary(col("b", schema).unwrap(), Operator::Lt, lit(5i32), schema).unwrap();
+ binary(left, Operator::And, right, schema).unwrap()
+ }
+
+ fn passing_rows(mask: &ArrayRef) -> Vec {
+ let mask = as_boolean_array(mask).unwrap();
+ (0..mask.len()).filter(|&i| mask.value(i)).collect()
+ }
+
+ #[test]
+ fn single_conjunct_is_not_adaptive() {
+ let schema = schema();
+ let p =
+ binary(col("a", &schema).unwrap(), Operator::Gt, lit(2i32), &schema).unwrap();
+ assert!(try_new_unshared(&p, true).is_none());
+ }
+
+ #[test]
+ fn disabled_is_none() {
+ let schema = schema();
+ assert!(try_new_unshared(&predicate(&schema), false).is_none());
+ }
+
+ #[test]
+ fn matches_plain_conjunction_evaluation() {
+ let schema = schema();
+ let p = predicate(&schema);
+ let mut adaptive = try_new_unshared(&p, true).unwrap();
+
+ let batch = test_batch(
+ &schema,
+ vec![1, 3, 5, 2, 4], // a
+ vec![9, 4, 6, 1, 0], // b
+ );
+ // a > 2 AND b < 5: rows where a in {3,5,2,4} AND b in {<5}
+ // idx0 a=1 -> false
+ // idx1 a=3,b=4 -> true
+ // idx2 a=5,b=6 -> false (b)
+ // idx3 a=2 -> false (a)
+ // idx4 a=4,b=0 -> true
+ let mask = adaptive.evaluate(&batch).unwrap();
+ assert_eq!(passing_rows(&mask), vec![1, 4]);
+
+ // Result is independent of the internal order: force a reordering and
+ // re-check on the same data.
+ adaptive.order = vec![1, 0];
+ let mask = adaptive.evaluate(&batch).unwrap();
+ assert_eq!(passing_rows(&mask), vec![1, 4]);
+ }
+
+ fn frozen_interval(adaptive: &AdaptiveConjunction) -> Option {
+ match &adaptive.phase {
+ Phase::Frozen { interval, .. } => Some(*interval),
+ _ => None,
+ }
+ }
+
+ /// Upper bound on the batches a measuring phase can span: a warmup, then
+ /// one measured batch per stride until the sample cap.
+ const MEASURE_SPAN: u64 =
+ MEASURE_WARMUP + (MAX_LEARNING_SAMPLES - MEASURE_WARMUP + 1) * MEASURE_STRIDE;
+
+ /// Run batches until frozen (panics if it never freezes within a bound).
+ fn run_until_frozen(adaptive: &mut AdaptiveConjunction, batch: &RecordBatch) {
+ for _ in 0..MEASURE_SPAN + 5 {
+ adaptive.evaluate(batch).unwrap();
+ if matches!(adaptive.phase, Phase::Frozen { .. }) {
+ return;
+ }
+ }
+ panic!("did not freeze");
+ }
+
+ /// From a frozen state, run batches through exactly one thaw → re-measure
+ /// → freeze cycle and return the newly frozen interval.
+ fn run_one_rethaw_cycle(
+ adaptive: &mut AdaptiveConjunction,
+ batch: &RecordBatch,
+ ) -> u64 {
+ let interval = frozen_interval(adaptive).expect("must start frozen");
+ let mut thawed = false;
+ for _ in 0..interval + MEASURE_SPAN + MEASURE_STRIDE + 2 {
+ adaptive.evaluate(batch).unwrap();
+ match (&adaptive.phase, thawed) {
+ (Phase::Frozen { .. }, true) => {
+ return frozen_interval(adaptive).unwrap();
+ }
+ (Phase::Frozen { .. }, false) => {}
+ _ => thawed = true,
+ }
+ }
+ panic!("did not complete a re-thaw cycle");
+ }
+
+ /// b<5 discards almost everything, a>2 discards nothing, so the
+ /// effectiveness CIs separate fast and we freeze on certainty.
+ fn selective_b_batch(schema: &Arc) -> RecordBatch {
+ let a: Vec = (0..1000).map(|_| 100).collect();
+ let b: Vec = (0..1000).map(|i| if i == 0 { 1 } else { 100 }).collect();
+ test_batch(schema, a, b)
+ }
+
+ #[test]
+ fn freezes_on_certainty_and_stops_measuring() {
+ let schema = schema();
+ let mut adaptive = try_new_unshared(&predicate(&schema), true).unwrap();
+ let batch = selective_b_batch(&schema);
+
+ // The learner proposes promoting b; whether the trial adopts it is a
+ // timing question (not asserted here) — either verdict must freeze.
+ run_until_frozen(&mut adaptive, &batch);
+
+ // Once frozen, further batches (within the thaw interval) do not record.
+ let before: u64 = adaptive.stats.iter().map(|s| s.sample_count()).sum();
+ for _ in 0..10 {
+ let mask = adaptive.evaluate(&batch).unwrap();
+ assert_eq!(passing_rows(&mask), vec![0]); // ...and stay correct
+ }
+ let after: u64 = adaptive.stats.iter().map(|s| s.sample_count()).sum();
+ assert_eq!(before, after, "frozen evaluator must not keep measuring");
+ }
+
+ #[test]
+ fn rethaw_backs_off_when_order_is_stable() {
+ let schema = schema();
+ let mut adaptive = try_new_unshared(&predicate(&schema), true).unwrap();
+ let batch = selective_b_batch(&schema);
+
+ run_until_frozen(&mut adaptive, &batch);
+ let interval1 = frozen_interval(&adaptive).unwrap();
+ assert_eq!(interval1, INITIAL_THAW_INTERVAL);
+
+ // Cross the thaw point and the re-measurement window; same data, so the
+ // order is reconfirmed and the next interval backs off.
+ let interval2 = run_one_rethaw_cycle(&mut adaptive, &batch);
+ assert_eq!(interval2, interval1 * THAW_BACKOFF);
+ }
+
+ #[test]
+ fn rethaw_proposes_trial_on_drift() {
+ let schema = schema();
+ let mut adaptive = try_new_unshared(&predicate(&schema), true).unwrap();
+
+ // Settle on the initial distribution (b<5 selective). Whether a trial
+ // adopts the promotion is timing-dependent; force the incumbent to the
+ // b-first order so the drifted proposal below must differ from it.
+ run_until_frozen(&mut adaptive, &selective_b_batch(&schema));
+ adaptive.set_order(vec![1, 0]);
+
+ // Drift: now a>2 is the selective one (only row 0), b<5 is always true.
+ let a: Vec = (0..1000).map(|i| if i == 0 { 100 } else { 0 }).collect();
+ let b: Vec = (0..1000).map(|_| 0).collect();
+ let drift = test_batch(&schema, a, b);
+
+ // Crossing the thaw point and the re-measure window must propose an
+ // A/B trial for the now-better order, never adopt it outright.
+ let mut proposed = None;
+ for _ in 0..INITIAL_THAW_INTERVAL + MEASURE_SPAN + MEASURE_STRIDE + 2 {
+ let mask = adaptive.evaluate(&drift).unwrap();
+ assert_eq!(passing_rows(&mask), vec![0]); // a>2 AND b<5 -> row 0 only
+ if let Phase::Trial { candidate, .. } = &adaptive.phase {
+ proposed = Some(candidate.order.clone());
+ break;
+ }
+ }
+ assert_eq!(
+ proposed,
+ Some(vec![0, 1]),
+ "drift must send the re-ranked order to a trial"
+ );
+ assert_eq!(
+ adaptive.order,
+ vec![1, 0],
+ "incumbent unchanged until verdict"
+ );
+ }
+
+ #[test]
+ fn non_selective_conjuncts_never_compact_but_are_correct() {
+ // Both conjuncts keep well over the compaction threshold, so the
+ // working batch is never compacted and the result is produced purely
+ // by AND-combining masks. Result must still be exact.
+ let schema = schema();
+ let mut adaptive = try_new_unshared(&predicate(&schema), true).unwrap();
+ // a > 2 keeps 8/10; b < 5 keeps 7/10 — neither is <= 20%.
+ let a = vec![5, 6, 7, 8, 9, 10, 11, 12, 1, 2]; // last two fail a>2
+ let b = vec![0, 1, 2, 3, 4, 9, 9, 9, 0, 0]; // idx5..7 fail b<5
+ let batch = test_batch(&schema, a, b);
+ // a>2 AND b<5: idx0..4 pass both; idx5..7 fail b; idx8..9 fail a.
+ let mask = adaptive.evaluate(&batch).unwrap();
+ assert_eq!(passing_rows(&mask), vec![0, 1, 2, 3, 4]);
+ }
+
+ /// `a > 2 AND b < 5 AND b >= -5` — a selective leader plus two followers.
+ fn three_conjunct_predicate(schema: &Arc) -> Arc {
+ let leader =
+ binary(col("a", schema).unwrap(), Operator::Gt, lit(2i32), schema).unwrap();
+ let f1 =
+ binary(col("b", schema).unwrap(), Operator::Lt, lit(5i32), schema).unwrap();
+ let f2 = binary(
+ col("b", schema).unwrap(),
+ Operator::GtEq,
+ lit(-5i32),
+ schema,
+ )
+ .unwrap();
+ let and = binary(leader, Operator::And, f1, schema).unwrap();
+ binary(and, Operator::And, f2, schema).unwrap()
+ }
+
+ /// Followers that are statistically indistinguishable (each discards one
+ /// of the leader's survivors, so their effectiveness samples are small,
+ /// positive, and overlapping) must freeze as a tie within a few batches,
+ /// not run to the learning sample cap.
+ #[test]
+ fn freezes_fast_on_tied_followers() {
+ let schema = schema();
+ let mut adaptive =
+ try_new_unshared(&three_conjunct_predicate(&schema), true).unwrap();
+
+ // Leader keeps 10 of 1000 rows; each follower discards exactly one of
+ // those survivors (b = 10 fails b < 5, b = -10 fails b >= -5).
+ let a: Vec = (0..1000)
+ .map(|i| if i % 100 == 0 { 100 } else { 0 })
+ .collect();
+ let b: Vec = (0..1000)
+ .map(|i| match i {
+ 100 => 10,
+ 200 => -10,
+ _ => 0,
+ })
+ .collect();
+ let batch = test_batch(&schema, a, b);
+
+ let mut frozen_at = None;
+ for n in 1..=MAX_LEARNING_SAMPLES {
+ adaptive.evaluate(&batch).unwrap();
+ if matches!(adaptive.phase, Phase::Frozen { .. }) {
+ frozen_at = Some(n);
+ break;
+ }
+ }
+ let frozen_at = frozen_at.expect("must freeze");
+ assert!(
+ frozen_at <= MIN_SAMPLES_FOR_CI + 2,
+ "tied followers should freeze right after the CI gate, froze at {frozen_at}"
+ );
+ // The selective leader still leads.
+ assert_eq!(adaptive.order.first().copied(), Some(0));
+ }
+
+ /// A leader that discards everything starves the followers of rows; they
+ /// can never be measured, but their order can never matter either, so the
+ /// evaluator must still freeze (previously it measured forever).
+ #[test]
+ fn starved_conjuncts_do_not_block_freezing() {
+ let schema = schema();
+ let mut adaptive =
+ try_new_unshared(&three_conjunct_predicate(&schema), true).unwrap();
+
+ // a > 2 fails every row.
+ let batch = test_batch(&schema, vec![0; 1000], vec![0; 1000]);
+ for _ in 0..MIN_SAMPLES_FOR_CI + 2 {
+ let mask = adaptive.evaluate(&batch).unwrap();
+ assert!(passing_rows(&mask).is_empty());
+ }
+ assert!(
+ matches!(adaptive.phase, Phase::Frozen { .. }),
+ "starved followers must not hold up freezing"
+ );
+ }
+
+ /// Tied followers re-rank on noise at every re-thaw; that must not be
+ /// treated as drift, or the interval never backs off and the evaluator
+ /// re-measures forever.
+ #[test]
+ fn rethaw_backs_off_despite_tied_followers() {
+ let schema = schema();
+ let mut adaptive =
+ try_new_unshared(&three_conjunct_predicate(&schema), true).unwrap();
+ let a: Vec = (0..1000)
+ .map(|i| if i % 100 == 0 { 100 } else { 0 })
+ .collect();
+ let b: Vec = (0..1000)
+ .map(|i| match i {
+ 100 => 10,
+ 200 => -10,
+ _ => 0,
+ })
+ .collect();
+ let batch = test_batch(&schema, a, b);
+
+ run_until_frozen(&mut adaptive, &batch);
+ let interval1 = frozen_interval(&adaptive).unwrap();
+ // Cross the thaw point and the re-measurement window; the tied
+ // followers may re-rank, but the interval must still back off.
+ let interval2 = run_one_rethaw_cycle(&mut adaptive, &batch);
+ assert_eq!(interval2, interval1 * THAW_BACKOFF);
+ }
+
+ /// A stream sharing state with one whose trial already validated a
+ /// champion adopts it on its very first batch instead of re-running the
+ /// experiment.
+ #[test]
+ fn streams_adopt_shared_champion_without_retrial() {
+ let schema = schema();
+ let shared = Arc::new(AdaptiveFilterShared::new());
+ let p = predicate(&schema);
+ let batch = selective_b_batch(&schema);
+
+ // Another stream's completed, adopted trial.
+ run_fake_trial(&shared, vec![1, 0], true);
+ assert_eq!(shared.arbiter.epoch(), 1);
+
+ let mut stream = AdaptiveConjunction::try_new(&p, true, shared).unwrap();
+ let mask = stream.evaluate(&batch).unwrap();
+ assert_eq!(passing_rows(&mask), vec![0]);
+ assert!(
+ matches!(stream.phase, Phase::Frozen { .. }),
+ "the published champion must be adopted, not re-learned"
+ );
+ assert_eq!(stream.order, vec![1, 0]);
+ assert_eq!(stream.strategy, Strategy::CompactOnce);
+ assert_eq!(stream.epoch_seen, 1);
+ }
+
+ /// Prime the shared trial with decisive fake paired log-ratio samples,
+ /// so the verdict after the remaining real pair is deterministic, and put
+ /// the stream into the trial.
+ fn prime_trial(
+ adaptive: &mut AdaptiveConjunction,
+ candidate: Vec,
+ candidate_wins: bool,
+ ) {
+ let candidate = arrangement(candidate);
+ assert_eq!(
+ adaptive.shared.arbiter.begin_trial(candidate.clone()),
+ candidate
+ );
+ for i in 0..TRIAL_PAIRS - 1 {
+ // Decisive log-ratios with a little variance: candidate ~e^10
+ // times faster (or slower) than the incumbent in every pair.
+ let jitter = i as f64 * 0.01;
+ adaptive.shared.arbiter.submit_pair(
+ &candidate,
+ 1000,
+ 1000,
+ if candidate_wins { -10.0 } else { 10.0 } + jitter,
+ );
+ }
+ adaptive.phase = Phase::Trial {
+ candidate,
+ pending_incumbent: None,
+ interval_if_rejected: 512,
+ };
+ }
+
+ /// A trial whose samples show the candidate decisively faster must adopt
+ /// it: switch the order and strategy, publish the champion, bump the
+ /// epoch.
+ #[test]
+ fn trial_adopts_decisively_faster_candidate() {
+ let schema = schema();
+ let mut adaptive = try_new_unshared(&predicate(&schema), true).unwrap();
+ let batch = selective_b_batch(&schema);
+
+ prime_trial(&mut adaptive, vec![1, 0], true);
+ // One batch per arm completes both sample counts and delivers the
+ // verdict; results stay correct throughout.
+ for _ in 0..2 {
+ let mask = adaptive.evaluate(&batch).unwrap();
+ assert_eq!(passing_rows(&mask), vec![0]);
+ }
+ assert!(matches!(adaptive.phase, Phase::Frozen { .. }));
+ assert_eq!(adaptive.order, vec![1, 0]);
+ assert_eq!(adaptive.strategy, Strategy::CompactOnce);
+ let (champion, epoch) = adaptive.shared.arbiter.champion();
+ assert_eq!(
+ champion,
+ Some(arrangement(vec![1, 0])),
+ "the win must be published for the other streams"
+ );
+ assert_eq!(epoch, 1);
+ }
+
+ /// A trial whose samples show the candidate decisively slower must keep
+ /// the incumbent untouched, freeze with the carried (backed-off) interval,
+ /// and record the rejection so no stream re-trials the same order.
+ #[test]
+ fn trial_rejects_decisively_slower_candidate() {
+ let schema = schema();
+ let mut adaptive = try_new_unshared(&predicate(&schema), true).unwrap();
+ let batch = selective_b_batch(&schema);
+
+ prime_trial(&mut adaptive, vec![1, 0], false);
+ for _ in 0..2 {
+ let mask = adaptive.evaluate(&batch).unwrap();
+ assert_eq!(passing_rows(&mask), vec![0]);
+ }
+ assert_eq!(adaptive.order, vec![0, 1], "incumbent must be kept");
+ assert_eq!(adaptive.strategy, Strategy::Fused);
+ assert_eq!(frozen_interval(&adaptive), Some(512));
+ let (champion, epoch) = adaptive.shared.arbiter.champion();
+ assert_eq!(champion, None);
+ assert_eq!(epoch, 0);
+ assert_eq!(
+ adaptive.shared.arbiter.rejected(),
+ Some(arrangement(vec![1, 0]))
+ );
+ }
+
+ /// Streams contribute samples to the same shared trial, so it concludes
+ /// across them: a stream shorter than a whole trial still gets a verdict.
+ #[test]
+ fn trial_is_completed_across_streams() {
+ let schema = schema();
+ let shared = Arc::new(AdaptiveFilterShared::new());
+ let p = predicate(&schema);
+ let batch = selective_b_batch(&schema);
+
+ let mut stream_a =
+ AdaptiveConjunction::try_new(&p, true, Arc::clone(&shared)).unwrap();
+ let mut stream_b =
+ AdaptiveConjunction::try_new(&p, true, Arc::clone(&shared)).unwrap();
+ prime_trial(&mut stream_a, vec![1, 0], true);
+ stream_b.phase = Phase::Trial {
+ candidate: arrangement(vec![1, 0]),
+ pending_incumbent: None,
+ interval_if_rejected: 512,
+ };
+
+ // One pair is missing; stream B (which did not start the trial)
+ // contributes it over two batches and concludes the experiment.
+ stream_b.evaluate(&batch).unwrap();
+ stream_b.evaluate(&batch).unwrap();
+ assert!(!shared.arbiter.trial_in_progress(), "trial must conclude");
+ assert_eq!(shared.arbiter.champion().0, Some(arrangement(vec![1, 0])));
+ assert_eq!(stream_b.order, vec![1, 0]);
+ assert!(matches!(stream_b.phase, Phase::Frozen { .. }));
+ // Stream A adopts the published champion via the epoch check.
+ stream_a.evaluate(&batch).unwrap();
+ assert_eq!(stream_a.order, vec![1, 0]);
+ assert!(matches!(stream_a.phase, Phase::Frozen { .. }));
+ }
+
+ /// A proposal that matches an already-rejected order freezes the incumbent
+ /// instead of starting another trial: the experiment was run and lost.
+ #[test]
+ fn decide_skips_rejected_candidate() {
+ let schema = schema();
+ let mut adaptive = try_new_unshared(&predicate(&schema), true).unwrap();
+
+ // Fabricated stats under which [1, 0] is materially better: conjunct 1
+ // discards almost everything at equal per-row cost.
+ let mut stats = vec![SelectivityStats::default(); 2];
+ stats[0].record(1000, 1000, 1000, 0.0);
+ stats[1].record(10, 1000, 1000, 990.0 * 1e9 / 1000.0);
+
+ // Record a lost trial for [1, 0] through the arbiter.
+ run_fake_trial(&adaptive.shared, vec![1, 0], false);
+ adaptive.decide(vec![1, 0], &stats, 256);
+ assert_eq!(frozen_interval(&adaptive), Some(256));
+ assert_eq!(adaptive.order, vec![0, 1]);
+
+ // Without the recorded rejection the same proposal goes to trial.
+ let mut fresh = try_new_unshared(&predicate(&schema), true).unwrap();
+ fresh.decide(vec![1, 0], &stats, 256);
+ assert!(matches!(fresh.phase, Phase::Trial { .. }));
+ }
+
+ /// While the order stays unresolved, only the warmup is measured
+ /// back-to-back; after that measurement drops to one batch per stride, so
+ /// unresolved learning costs a bounded fraction of the stream.
+ #[test]
+ fn learning_measures_warmup_then_strided() {
+ let schema = schema();
+ let mut adaptive = try_new_unshared(&predicate(&schema), true).unwrap();
+ // Empty batches never record a sample, so the evaluator stays in
+ // `Learning` and the measurement schedule is observable in isolation.
+ let batch = test_batch(&schema, vec![], vec![]);
+ let n = 10 * MEASURE_STRIDE;
+ for _ in 0..n {
+ adaptive.evaluate(&batch).unwrap();
+ }
+ assert!(matches!(adaptive.phase, Phase::Learning));
+ assert!(
+ adaptive.measured >= MEASURE_WARMUP,
+ "warmup batches must all be measured"
+ );
+ assert!(
+ adaptive.measured <= MEASURE_WARMUP + n / MEASURE_STRIDE,
+ "post-warmup measurement must be strided, measured {} of {n}",
+ adaptive.measured
+ );
+ }
+
+ #[test]
+ fn all_true_conjuncts_yield_all_true_mask() {
+ let schema = schema();
+ let mut adaptive = try_new_unshared(&predicate(&schema), true).unwrap();
+ // Every row passes both conjuncts.
+ let batch = test_batch(&schema, vec![10; 5], vec![0; 5]);
+ let mask = adaptive.evaluate(&batch).unwrap();
+ assert_eq!(passing_rows(&mask), vec![0, 1, 2, 3, 4]);
+ }
+
+ #[test]
+ fn fully_discarding_conjunct_yields_all_false_mask() {
+ let schema = schema();
+ let mut adaptive = try_new_unshared(&predicate(&schema), true).unwrap();
+ // No row passes a > 2; the remaining conjunct must not resurrect rows.
+ let batch = test_batch(&schema, vec![0; 5], vec![0; 5]);
+ let mask = adaptive.evaluate(&batch).unwrap();
+ assert!(passing_rows(&mask).is_empty());
+ assert_eq!(as_boolean_array(&mask).unwrap().len(), 5);
+ }
+
+ #[test]
+ fn empty_batch() {
+ let schema = schema();
+ let mut adaptive = try_new_unshared(&predicate(&schema), true).unwrap();
+ let batch = test_batch(&schema, vec![], vec![]);
+ let mask = adaptive.evaluate(&batch).unwrap();
+ assert_eq!(as_boolean_array(&mask).unwrap().len(), 0);
+ }
+
+ /// Manual perf harness comparing the adaptive evaluator against the plain
+ /// fused predicate (what `FilterExec` runs when the flag is off), on a
+ /// ClickBench-Q41-shaped filter: several trivially cheap conjuncts, the
+ /// most selective one already written first. This is the adversarial case
+ /// for the adaptive path (nothing to learn, overhead only).
+ ///
+ /// Run with:
+ /// ```sh
+ /// cargo test --release -p datafusion-physical-plan --lib \
+ /// adaptive_filter::tests::perf_overhead_vs_fused -- --ignored --nocapture
+ /// ```
+ /// For a profile: find the test binary via `cargo test --no-run` and run it
+ /// under `samply record` with `ADAPTIVE_PERF_ITERS=200000`.
+ /// Manual experiment replicating predicate_eval `cardinality_q30`:
+ /// `c0<90 AND c1<5` over uniform ints, replayed in fresh short streams the
+ /// way a criterion iteration would, to expose per-query (re-learning and
+ /// re-trial) costs.
+ #[test]
+ #[ignore = "manual perf experiment, run with --ignored --nocapture"]
+ fn perf_q30_per_query_cost() {
+ use datafusion_physical_expr::expressions::{binary, col, lit};
+
+ const ROWS: usize = 8192;
+ const BATCHES_PER_QUERY: u64 = 122;
+ // K-1 ~90% conjuncts followed by one ~5% conjunct (the qXX sweep).
+ let k: usize = std::env::var("ADAPTIVE_PERF_K")
+ .ok()
+ .and_then(|v| v.parse().ok())
+ .unwrap_or(2);
+ let multipliers = [
+ 1i64, 3, 7, 9, 11, 13, 17, 19, 21, 23, 27, 29, 31, 33, 37, 39,
+ ];
+ let schema = Arc::new(Schema::new(
+ (0..k)
+ .map(|i| Field::new(format!("c{i}"), DataType::Int32, false))
+ .collect::>(),
+ ));
+ let cols: Vec = multipliers[..k]
+ .iter()
+ .map(|m| {
+ Arc::new(Int32Array::from_iter_values(
+ (0..ROWS as i64).map(move |v| ((v * m) % 100) as i32),
+ )) as ArrayRef
+ })
+ .collect();
+ let batch = RecordBatch::try_new(Arc::clone(&schema), cols).unwrap();
+ let c = |name: &str| col(name, &schema).unwrap();
+ let conjuncts: Vec> = (0..k)
+ .map(|i| {
+ let threshold = if i == k - 1 { 5i32 } else { 90i32 };
+ binary(c(&format!("c{i}")), Operator::Lt, lit(threshold), &schema)
+ .unwrap()
+ })
+ .collect();
+ let written = fuse(&conjuncts, &(0..k).collect::>());
+
+ let queries: u64 = 200;
+ // Flag-off equivalent: the fused written order.
+ let start = Instant::now();
+ for _ in 0..queries {
+ for _ in 0..BATCHES_PER_QUERY {
+ let v = written.evaluate(&batch).unwrap();
+ std::hint::black_box(v.into_array(ROWS).unwrap());
+ }
+ }
+ let off = start.elapsed().as_nanos() as u64 / (queries * BATCHES_PER_QUERY);
+
+ // Flag-on: a fresh evaluator per query (criterion-iteration shape).
+ let start = Instant::now();
+ for _ in 0..queries {
+ let mut adaptive = try_new_unshared(&written, true).unwrap();
+ for _ in 0..BATCHES_PER_QUERY {
+ std::hint::black_box(adaptive.evaluate(&batch).unwrap());
+ }
+ }
+ let on = start.elapsed().as_nanos() as u64 / (queries * BATCHES_PER_QUERY);
+ println!(
+ "fused written: {off} ns/batch; adaptive fresh-per-query: {on} ns/batch ({:.2}x)",
+ on as f64 / off as f64
+ );
+
+ // Phase census for one query.
+ let mut adaptive = try_new_unshared(&written, true).unwrap();
+ let mut census = std::collections::BTreeMap::new();
+ for _ in 0..BATCHES_PER_QUERY {
+ adaptive.evaluate(&batch).unwrap();
+ let phase = match adaptive.phase {
+ Phase::Learning => "learning",
+ Phase::Trial { .. } => "trial",
+ Phase::Frozen { .. } => "frozen",
+ Phase::Remeasuring { .. } => "remeasuring",
+ };
+ *census.entry(phase).or_insert(0u64) += 1;
+ }
+ println!("phase census over one {BATCHES_PER_QUERY}-batch query: {census:?}");
+ println!(
+ "final order {:?} strategy {:?}",
+ adaptive.order, adaptive.strategy
+ );
+ }
+
+ /// Manual experiment replicating predicate_eval `cardinality_q31`:
+ /// `c0<90 AND c1<90 AND c2<90 AND c3<5` over uniform ints. Compares the
+ /// written order, the optimal order, and the adaptive evaluator under
+ /// each evaluation strategy, to attribute regressions to overhead vs a
+ /// bad order choice vs the fused chain's evaluation strategy.
+ #[test]
+ #[ignore = "manual perf experiment, run with --ignored --nocapture"]
+ fn perf_q31_order_strategies() {
+ use datafusion_physical_expr::expressions::{binary, col, lit};
+
+ const ROWS: usize = 8192;
+ let schema = Arc::new(Schema::new(
+ (0..4)
+ .map(|i| Field::new(format!("c{i}"), DataType::Int32, false))
+ .collect::>(),
+ ));
+ // Mirror the bench's decorrelated uniform columns.
+ let cols: Vec = [1i64, 3, 7, 9]
+ .iter()
+ .map(|m| {
+ Arc::new(Int32Array::from_iter_values(
+ (0..ROWS as i64).map(move |v| ((v * m) % 100) as i32),
+ )) as ArrayRef
+ })
+ .collect();
+ let batch = RecordBatch::try_new(Arc::clone(&schema), cols).unwrap();
+
+ let c = |name: &str| col(name, &schema).unwrap();
+ let conjuncts: Vec> = vec![
+ binary(c("c0"), Operator::Lt, lit(90i32), &schema).unwrap(),
+ binary(c("c1"), Operator::Lt, lit(90i32), &schema).unwrap(),
+ binary(c("c2"), Operator::Lt, lit(90i32), &schema).unwrap(),
+ binary(c("c3"), Operator::Lt, lit(5i32), &schema).unwrap(),
+ ];
+ let written = fuse(&conjuncts, &[0, 1, 2, 3]);
+ let optimal = fuse(&conjuncts, &[3, 0, 1, 2]);
+
+ let iters: u64 = 20_000;
+ let time = |expr: &Arc| {
+ let start = Instant::now();
+ for _ in 0..iters {
+ let v = expr.evaluate(&batch).unwrap();
+ std::hint::black_box(v.into_array(ROWS).unwrap());
+ }
+ start.elapsed().as_nanos() as u64 / iters
+ };
+
+ // Warm up.
+ time(&written);
+ let w = time(&written);
+ let o = time(&optimal);
+ println!("fused written [c0,c1,c2,c3]: {w:>8} ns/batch");
+ println!(
+ "fused optimal [c3,c0,c1,c2]: {o:>8} ns/batch ({:+.1}%)",
+ (o as f64 / w as f64 - 1.0) * 100.0
+ );
+
+ // Adaptive evaluator: a long stream (mostly frozen in learned order)
+ // and a short stream (mostly measured path).
+ for n in [16u64, 122, 20_000] {
+ let mut adaptive = try_new_unshared(&written, true).unwrap();
+ let start = Instant::now();
+ for _ in 0..n {
+ std::hint::black_box(adaptive.evaluate(&batch).unwrap());
+ }
+ let ns = start.elapsed().as_nanos() as u64 / n;
+ let phase = match adaptive.phase {
+ Phase::Learning => "learning",
+ Phase::Trial { .. } => "trial",
+ Phase::Frozen { .. } => "frozen",
+ Phase::Remeasuring { .. } => "remeasuring",
+ };
+ println!(
+ "adaptive {n:>6}-batch stream: {ns:>8} ns/batch ({:+.1}% vs written, order {:?}, ends {phase})",
+ (ns as f64 / w as f64 - 1.0) * 100.0,
+ adaptive.order,
+ );
+ }
+
+ // The measured path itself, pinned to the optimal order (no learning
+ // decisions): what a compact-once strategy costs at steady state.
+ let mut pinned = try_new_unshared(&written, true).unwrap();
+ pinned.set_order(vec![3, 0, 1, 2]);
+ let start = Instant::now();
+ for _ in 0..iters {
+ std::hint::black_box(pinned.evaluate_measured(&batch).unwrap());
+ }
+ let ns = start.elapsed().as_nanos() as u64 / iters;
+ println!(
+ "measured path, optimal order: {ns:>8} ns/batch ({:+.1}% vs written)",
+ (ns as f64 / w as f64 - 1.0) * 100.0
+ );
+ }
+
+ #[test]
+ #[ignore = "manual perf harness, run with --ignored --nocapture"]
+ fn perf_overhead_vs_fused() {
+ use datafusion_physical_expr::expressions::{binary, col, lit};
+
+ const ROWS: usize = 8192;
+ let schema = Arc::new(Schema::new(vec![
+ Field::new("counter_id", DataType::Int32, false),
+ Field::new("event_date", DataType::Int32, false),
+ Field::new("is_refresh", DataType::Int32, false),
+ Field::new("dont_count", DataType::Int32, false),
+ Field::new("url_hash", DataType::Int32, false),
+ ]));
+ // ~0.1% of rows match counter_id = 62; the follower conjuncts each
+ // discard exactly one of those survivors, mirroring Q41's shape: their
+ // effectiveness is small, positive, and statistically
+ // indistinguishable from each other (overlapping CIs), so certainty
+ // freezing cannot resolve their order.
+ let counter: Vec = (0..ROWS)
+ .map(|i| if i % 1000 == 0 { 62 } else { 1 })
+ .collect();
+ let is_refresh: Vec =
+ (0..ROWS).map(|i| if i == 1000 { 1 } else { 0 }).collect();
+ let dont_count: Vec =
+ (0..ROWS).map(|i| if i == 2000 { 1 } else { 0 }).collect();
+ let url_hash: Vec =
+ (0..ROWS).map(|i| if i == 3000 { 999 } else { 7 }).collect();
+ let batch = RecordBatch::try_new(
+ Arc::clone(&schema),
+ vec![
+ Arc::new(Int32Array::from(counter)),
+ Arc::new(Int32Array::from(vec![100; ROWS])),
+ Arc::new(Int32Array::from(is_refresh)),
+ Arc::new(Int32Array::from(dont_count)),
+ Arc::new(Int32Array::from(url_hash)),
+ ],
+ )
+ .unwrap();
+
+ let c = |name: &str| col(name, &schema).unwrap();
+ let conjuncts: Vec> = vec![
+ binary(c("counter_id"), Operator::Eq, lit(62i32), &schema).unwrap(),
+ binary(c("event_date"), Operator::GtEq, lit(50i32), &schema).unwrap(),
+ binary(c("event_date"), Operator::LtEq, lit(150i32), &schema).unwrap(),
+ binary(c("is_refresh"), Operator::Eq, lit(0i32), &schema).unwrap(),
+ binary(c("dont_count"), Operator::Eq, lit(0i32), &schema).unwrap(),
+ binary(c("url_hash"), Operator::Eq, lit(7i32), &schema).unwrap(),
+ ];
+ let fused = conjuncts
+ .clone()
+ .into_iter()
+ .reduce(|acc, e| {
+ Arc::new(BinaryExpr::new(acc, Operator::And, e)) as Arc
+ })
+ .unwrap();
+
+ let iters: u64 = std::env::var("ADAPTIVE_PERF_ITERS")
+ .ok()
+ .and_then(|v| v.parse().ok())
+ .unwrap_or(20_000);
+
+ let time_fused = |n: u64| {
+ let start = Instant::now();
+ for _ in 0..n {
+ let v = fused.evaluate(&batch).unwrap();
+ std::hint::black_box(v.into_array(ROWS).unwrap());
+ }
+ start.elapsed().as_nanos() as u64 / n
+ };
+ let time_adaptive = |n: u64| {
+ let mut adaptive = try_new_unshared(&fused, true).unwrap();
+ let start = Instant::now();
+ for _ in 0..n {
+ std::hint::black_box(adaptive.evaluate(&batch).unwrap());
+ }
+ (start.elapsed().as_nanos() as u64 / n, adaptive)
+ };
+
+ // Profiling mode: spend all the time in the measured (learning) path by
+ // replaying short streams on fresh evaluators, then exit.
+ if std::env::var("ADAPTIVE_PERF_PROFILE").is_ok() {
+ for _ in 0..iters {
+ std::hint::black_box(time_adaptive(16));
+ }
+ return;
+ }
+
+ // Warm up.
+ time_fused(100);
+ time_adaptive(100);
+
+ let fused_ns = time_fused(iters);
+ println!("fused (flag off equivalent): {fused_ns:>8} ns/batch");
+ // Short streams never amortize learning; long streams should.
+ for n in [16u64, 64, 256, iters] {
+ let (ns, adaptive) = time_adaptive(n);
+ let phase = match adaptive.phase {
+ Phase::Learning => "learning",
+ Phase::Trial { .. } => "trial",
+ Phase::Frozen { .. } => "frozen",
+ Phase::Remeasuring { .. } => "remeasuring",
+ };
+ println!(
+ "adaptive, {n:>6}-batch stream: {ns:>8} ns/batch ({:.2}x fused, ends {phase})",
+ ns as f64 / fused_ns as f64,
+ );
+ }
+ }
+
+ #[test]
+ fn reorders_selective_conjunct_first() {
+ let schema = schema();
+ let p = predicate(&schema); // [a>2, b<5]
+ let mut adaptive = try_new_unshared(&p, true).unwrap();
+
+ // Conjunct 1 (b < 5) is far more selective than conjunct 0 (a > 2):
+ // a is always > 2 (never discards), b is almost always >= 5 (discards
+ // ~everything). Discards-per-second is highest for conjunct 1, so the
+ // learner must propose promoting it — into a trial, never directly.
+ let batch = selective_b_batch(&schema);
+ let mut proposed = None;
+ for _ in 0..MEASURE_SPAN + 5 {
+ let mask = adaptive.evaluate(&batch).unwrap();
+ assert_eq!(passing_rows(&mask), vec![0]); // correct in every phase
+ if let Phase::Trial { candidate, .. } = &adaptive.phase {
+ proposed = Some(candidate.order.clone());
+ break;
+ }
+ }
+ assert_eq!(proposed, Some(vec![1, 0]));
+ assert_eq!(
+ adaptive.order,
+ vec![0, 1],
+ "incumbent unchanged until verdict"
+ );
+ }
+}
diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs
index 11d36192f3aae..c319027b3eb09 100644
--- a/datafusion/physical-plan/src/filter.rs
+++ b/datafusion/physical-plan/src/filter.rs
@@ -28,6 +28,7 @@ use super::{
ColumnStatistics, DisplayAs, ExecutionPlanProperties, PlanProperties,
RecordBatchStream, SendableRecordBatchStream, Statistics,
};
+use crate::adaptive_filter::{AdaptiveConjunction, AdaptiveFilterShared};
use crate::check_if_same_properties;
use crate::coalesce::{LimitedBatchCoalescer, PushBatchStatus};
use crate::common::can_project;
@@ -97,6 +98,10 @@ pub struct FilterExec {
batch_size: usize,
/// Number of rows to fetch
fetch: Option,
+ /// Measurements and trial verdicts shared by all partition streams,
+ /// used by adaptive conjunct reordering (see [`AdaptiveConjunction`]) so
+ /// the streams learn as one. Fresh per plan node; never affects the plan.
+ adaptive_stats: Arc,
}
/// Builder for [`FilterExec`] to set optional parameters
@@ -216,6 +221,7 @@ impl FilterExecBuilder {
projection: self.projection,
batch_size: self.batch_size,
fetch: self.fetch,
+ adaptive_stats: Arc::new(AdaptiveFilterShared::new()),
})
}
}
@@ -289,6 +295,7 @@ impl FilterExec {
projection: self.projection.clone(),
batch_size,
fetch: self.fetch,
+ adaptive_stats: Arc::clone(&self.adaptive_stats),
})
}
@@ -570,9 +577,19 @@ impl ExecutionPlan for FilterExec {
context.task_id()
);
let metrics = FilterExecMetrics::new(&self.metrics, partition);
+ let adaptive = AdaptiveConjunction::try_new(
+ &self.predicate,
+ context
+ .session_config()
+ .options()
+ .execution
+ .adaptive_filter_reordering,
+ Arc::clone(&self.adaptive_stats),
+ );
Ok(Box::pin(FilterExecStream {
schema: self.schema(),
predicate: Arc::clone(&self.predicate),
+ adaptive,
input: self.input.execute(partition, context)?,
metrics,
projection: self.projection.clone(),
@@ -762,6 +779,9 @@ impl ExecutionPlan for FilterExec {
projection: self.projection.clone(),
batch_size: self.batch_size,
fetch: self.fetch,
+ // The predicate changed; accumulated per-conjunct stats no
+ // longer describe it.
+ adaptive_stats: Arc::new(AdaptiveFilterShared::new()),
};
Some(Arc::new(new) as _)
};
@@ -786,6 +806,7 @@ impl ExecutionPlan for FilterExec {
projection: self.projection.clone(),
batch_size: self.batch_size,
fetch,
+ adaptive_stats: Arc::clone(&self.adaptive_stats),
}))
}
@@ -1049,6 +1070,9 @@ struct FilterExecStream {
schema: SchemaRef,
/// The expression to filter on. This expression must evaluate to a boolean value.
predicate: Arc,
+ /// When set, the predicate is a reorderable conjunction and is evaluated
+ /// adaptively (per-conjunct, in measured order) instead of via `predicate`.
+ adaptive: Option,
/// The input partition to filter.
input: SendableRecordBatchStream,
/// Runtime metrics recording
@@ -1144,9 +1168,15 @@ impl Stream for FilterExecStream {
}
Some(Ok(batch)) => {
let timer = elapsed_compute.timer();
- let status = self.predicate.as_ref()
- .evaluate(&batch)
- .and_then(|v| v.into_array(batch.num_rows()))
+ let array = match self.adaptive.as_mut() {
+ Some(adaptive) => adaptive.evaluate(&batch),
+ None => self
+ .predicate
+ .as_ref()
+ .evaluate(&batch)
+ .and_then(|v| v.into_array(batch.num_rows())),
+ };
+ let status = array
.and_then(|array| {
Ok(match self.projection.as_ref() {
Some(projection) => {
diff --git a/datafusion/physical-plan/src/lib.rs b/datafusion/physical-plan/src/lib.rs
index c7b1d4729e21d..27dcdc040bcbb 100644
--- a/datafusion/physical-plan/src/lib.rs
+++ b/datafusion/physical-plan/src/lib.rs
@@ -60,6 +60,7 @@ mod render_tree;
mod topk;
mod visitor;
+mod adaptive_filter;
pub mod aggregates;
pub mod analyze;
pub mod async_func;
diff --git a/datafusion/sqllogictest/test_files/adaptive_filter.slt b/datafusion/sqllogictest/test_files/adaptive_filter.slt
new file mode 100644
index 0000000000000..b0e2d958a3be9
--- /dev/null
+++ b/datafusion/sqllogictest/test_files/adaptive_filter.slt
@@ -0,0 +1,80 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Tests for execution.adaptive_filter_reordering. Runtime reordering of a
+# conjunction must never change query results, only evaluation order.
+
+statement ok
+CREATE TABLE t AS
+SELECT
+ i AS a,
+ i % 7 AS b,
+ arrow_cast(i, 'Utf8') AS s
+FROM generate_series(1, 1000) AS tbl(i);
+
+# Baseline (flag off): multi-conjunct filter mixing a cheap comparison with an
+# expensive LIKE.
+query I
+SELECT count(*) FROM t WHERE b = 3 AND s LIKE '1%';
+----
+17
+
+statement ok
+SET datafusion.execution.adaptive_filter_reordering = true;
+
+# Same query with adaptive reordering on must return the same result.
+query I
+SELECT count(*) FROM t WHERE b = 3 AND s LIKE '1%';
+----
+17
+
+# A three-conjunct predicate, including an expensive-but-selective LIKE.
+query I
+SELECT count(*) FROM t WHERE a > 100 AND s LIKE '5%' AND b <> 0;
+----
+86
+
+# Full row materialization (not just count) is unchanged by reordering.
+query IIT
+SELECT a, b, s FROM t WHERE b = 1 AND a > 990 AND s LIKE '99%' ORDER BY a;
+----
+995 1 995
+
+# EXPLAIN: runtime reordering is invisible to the plan — the FilterExec
+# predicate is unchanged whether the flag is on or off.
+query TT
+EXPLAIN SELECT count(*) FROM t WHERE b = 3 AND s LIKE '1%';
+----
+logical_plan
+01)Projection: count(Int64(1)) AS count(*)
+02)--Aggregate: groupBy=[[]], aggr=[[count(Int64(1))]]
+03)----Projection:
+04)------Filter: t.b = Int64(3) AND t.s LIKE Utf8("1%")
+05)--------TableScan: t projection=[b, s]
+physical_plan
+01)ProjectionExec: expr=[count(Int64(1))@0 as count(*)]
+02)--AggregateExec: mode=Final, gby=[], aggr=[count(Int64(1))]
+03)----CoalescePartitionsExec
+04)------AggregateExec: mode=Partial, gby=[], aggr=[count(Int64(1))]
+05)--------FilterExec: b@0 = 3 AND s@1 LIKE 1%, projection=[]
+06)----------DataSourceExec: partitions=4, partition_sizes=[1, 0, 0, 0]
+
+statement ok
+SET datafusion.execution.adaptive_filter_reordering = false;
+
+statement ok
+DROP TABLE t;
diff --git a/datafusion/sqllogictest/test_files/information_schema.slt b/datafusion/sqllogictest/test_files/information_schema.slt
index 8d334d8433284..1fd3e49df7fba 100644
--- a/datafusion/sqllogictest/test_files/information_schema.slt
+++ b/datafusion/sqllogictest/test_files/information_schema.slt
@@ -214,6 +214,7 @@ datafusion.catalog.has_header true
datafusion.catalog.information_schema true
datafusion.catalog.location NULL
datafusion.catalog.newlines_in_values false
+datafusion.execution.adaptive_filter_reordering false
datafusion.execution.batch_size 8192
datafusion.execution.coalesce_batches true
datafusion.execution.collect_statistics true
@@ -371,6 +372,7 @@ datafusion.catalog.has_header true Default value for `format.has_header` for `CR
datafusion.catalog.information_schema true Should DataFusion provide access to `information_schema` virtual tables for displaying schema information
datafusion.catalog.location NULL Location scanned to load tables for `default` schema
datafusion.catalog.newlines_in_values false Specifies whether newlines in (quoted) CSV values are supported. This is the default value for `format.newlines_in_values` for `CREATE EXTERNAL TABLE` if not specified explicitly in the statement. Parsing newlines in quoted values may be affected by execution behaviour such as parallel file scanning. Setting this to `true` ensures that newlines in values are parsed successfully, which may reduce performance.
+datafusion.execution.adaptive_filter_reordering false (experimental) When enabled, `FilterExec` adaptively reorders the conjuncts of a conjunctive predicate at runtime. It measures each conjunct's selectivity and evaluation cost on the rows that reach it and runs the conjuncts that discard the most rows per unit of CPU time first, so cheap-and-selective predicates gate expensive ones. Reordering never changes query results (only the evaluation order of a conjunction) but can change observable side effects of fallible predicates, so it is off by default. Predicates containing volatile expressions are never reordered.
datafusion.execution.batch_size 8192 Default batch size while creating new batches, it's especially useful for buffer-in-memory batches since creating tiny batches would result in too much metadata memory consumption
datafusion.execution.coalesce_batches true When set to true, record batches will be examined between each operator and small batches will be coalesced into larger batches. This is helpful when there are highly selective filters or joins that could produce tiny output batches. The target batch size is determined by the configuration setting
datafusion.execution.collect_statistics true Should DataFusion collect statistics when first creating a table. Has no effect after the table is created. Applies to the default `ListingTableProvider` in DataFusion. Defaults to true.
diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md
index 7c6756a096309..78b3f373bae31 100644
--- a/docs/source/user-guide/configs.md
+++ b/docs/source/user-guide/configs.md
@@ -139,6 +139,7 @@ The following configuration settings are available:
| datafusion.execution.skip_partial_aggregation_probe_rows_threshold | 100000 | Number of input rows partial aggregation partition should process, before aggregation ratio check and trying to switch to skipping aggregation mode |
| datafusion.execution.use_row_number_estimates_to_optimize_partitioning | false | Should DataFusion use row number estimates at the input to decide whether increasing parallelism is beneficial or not. By default, only exact row numbers (not estimates) are used for this decision. Setting this flag to `true` will likely produce better plans. if the source of statistics is accurate. We plan to make this the default in the future. |
| datafusion.execution.enforce_batch_size_in_joins | false | Should DataFusion enforce batch size in joins or not. By default, DataFusion will not enforce batch size in joins. Enforcing batch size in joins can reduce memory usage when joining large tables with a highly-selective join filter, but is also slightly slower. |
+| datafusion.execution.adaptive_filter_reordering | false | (experimental) When enabled, `FilterExec` adaptively reorders the conjuncts of a conjunctive predicate at runtime. It measures each conjunct's selectivity and evaluation cost on the rows that reach it and runs the conjuncts that discard the most rows per unit of CPU time first, so cheap-and-selective predicates gate expensive ones. Reordering never changes query results (only the evaluation order of a conjunction) but can change observable side effects of fallible predicates, so it is off by default. Predicates containing volatile expressions are never reordered. |
| datafusion.execution.objectstore_writer_buffer_size | 10485760 | Size (bytes) of data buffer DataFusion uses when writing output files. This affects the size of the data chunks that are uploaded to remote object stores (e.g. AWS S3). If very large (>= 100 GiB) output files are being written, it may be necessary to increase this size to avoid errors from the remote end point. |
| datafusion.execution.enable_ansi_mode | false | Whether to enable ANSI SQL mode. The flag is experimental and relevant only for DataFusion Spark built-in functions When `enable_ansi_mode` is set to `true`, the query engine follows ANSI SQL semantics for expressions, casting, and error handling. This means: - **Strict type coercion rules:** implicit casts between incompatible types are disallowed. - **Standard SQL arithmetic behavior:** operations such as division by zero, numeric overflow, or invalid casts raise runtime errors rather than returning `NULL` or adjusted values. - **Consistent ANSI behavior** for string concatenation, comparisons, and `NULL` handling. When `enable_ansi_mode` is `false` (the default), the engine uses a more permissive, non-ANSI mode designed for user convenience and backward compatibility. In this mode: - Implicit casts between types are allowed (e.g., string to integer when possible). - Arithmetic operations are more lenient — for example, `abs()` on the minimum representable integer value returns the input value instead of raising overflow. - Division by zero or invalid casts may return `NULL` instead of failing. # Default `false` — ANSI SQL mode is disabled by default. |
| datafusion.execution.hash_join_buffering_capacity | 0 | How many bytes to buffer in the probe side of hash joins while the build side is concurrently being built. Without this, hash joins will wait until the full materialization of the build side before polling the probe side. This is useful in scenarios where the query is not completely CPU bounded, allowing to do some early work concurrently and reducing the latency of the query. Note that when hash join buffering is enabled, the probe side will start eagerly polling data, not giving time for the producer side of dynamic filters to produce any meaningful predicate. Queries with dynamic filters might see performance degradation. Disabled by default, set to a number greater than 0 for enabling it. |