Skip to content

GroupValuesRows::emit untracked decode buffer leaks past MemoryReservation #22739

@avantgardnerio

Description

@avantgardnerio

Background — Coralogix DataFusion upgrade cycle

Coralogix was very behind in it's datafusion upgrades. We've gone from 42->50 in the last few months. When we upgraded to 51, we hit a severe one (this is not the first, we are always fighting OOMs). This is why we created #22626's allocator-vs-pool tracker (the memory-accounting Cargo feature), and it's been very useful for finding tracking issues.

The DF51 regression

DF51 changed the spill story materially — RepartitionExec gained spilling (#18014) and SpillingPool was introduced (#18207). We suspect that introduced the bug represented by this github issue.

Empirically, the same high-cardinality groupby query that emits a clean ResourcesExhausted on DF50 now sometimes kernel-OOM-kills the executor on DF51. Same query path, same data; the failure mode degraded from "structured error to client" → "pod restart with no signal."

Production manifestation

One of our customers ran 5 concurrent variants of a groupby <free-text log message field> over a 24-hour window. The workload took down 79 executor pods in 2 minutes, then another wave a few minutes later. The query shape:

source logs(<team>)
  | filter <priority predicate>
        && \$d.message != null
  | groupby \$d.message agg count(1) as \$d.occurrences
  | orderby \$d.occurrences desc
  | limit 15

i.e. GROUP BY on a wide free-text string column with high distinct cardinality.

Memory math at production scale

Bytes
Pod cgroup limit 31.14 GB
DataFusion memory_fraction (default 0.7) × 0.7
→ declared MemoryPool budget ~21.8 GB tracked
Headroom from pool budget to kernel ceiling ~9.3 GB

From DF50's ResourcesExhausted consumer dump at incident time: 5 GroupedHashAggregateStreams at ~700 MB tracked each = ~3.5 GB explicitly attributed to those streams. Total MemoryPool::reserved() at incident time wasn't captured directly, but the kernel-OOM-kill establishes that resident exceeded 31.14 GB while the voluntary tracker thought the agg was within its share. Conservatively that's ~7–19 GB of untracked allocation outside MemoryReservation's view, growing linearly with cardinality × key-width × concurrency.

Why the tracker is what made the follow-up reproducible

Without an allocator-level ground truth there's no way to distinguish "we shipped DF51 and something OOM-kills" from any of the other reasons pod resident grows. With #22626 enabled, every Layout::size() is debited against the declared MemoryPool budget; the bank goes negative exactly when an allocation skipped try_grow. That converted "production OOM" into "deterministic SLT failure with a stack trace pointing at the offending site."

The companion PR (sibling to #22721 / #22723) is the follow-up: an SLT that exercises this exact operator (GroupedHashAggregateStream::emitGroupValuesRows::emitRowConverter::convert_rowsdecode_columnarrow_row::variable::decode_string_view / decode_binary_view_inner) and produces a clean allocator overdraft: account balance at panic = -1344326 bytes against a 1M pool with HEADROOM_FACTOR = 5.0.

The bug

In GroupValuesRows::emit (datafusion/physical-plan/src/aggregates/group_values/row.rs:198):

fn emit(&mut self, emit_to: EmitTo) -> Result<Vec<ArrayRef>> {
    // ...
    let arrays = self.row_converter.convert_rows(rows)?;  // ← per-column decode buffers
    // ...
}

RowConverter::convert_rows (arrow-rs/arrow-row/src/lib.rs:806) calls decode_column (lib.rs:1675) per column, dispatching to:

  • arrow_row::variable::decode_binary (variable.rs:205) for Utf8/Binary
  • arrow_row::variable::decode_binary_view_inner (variable.rs:249) for Utf8View/BinaryView
  • arrow_row::list::decode for List<_>

Each decoder allocates MutableBuffer::with_capacity sized to the column's bytes. None of these allocations are routed through MemoryReservation::try_grow. The bytes show up in process resident but not in MemoryPool::reserved(), leaking past the declared budget — kernel OOM territory at production scale.

Proposed fixes

(a) Point fix in GroupValuesRows::emit. One reservation grow before convert_rows:

fn emit(&mut self, emit_to: EmitTo) -> Result<Vec<ArrayRef>> {
    let rows = /* slice being converted */;
    let estimated = rows.iter().map(|r| r.as_ref().len()).sum::<usize>();
    self.reservation.try_grow(estimated)?;
    let arrays = self.row_converter.convert_rows(rows)?;
    self.reservation.shrink(estimated);  // decode result is now owned by `arrays`
    // ...
}

GroupValuesRows already carries a MemoryReservation field — this is one extra try_grow against it. Same shape as GroupValuesBytesView, GroupValuesColumn, etc. would each need (sibling impls).

(b) Systemic alternative: OomGuardMemoryPool. Rather than chasing each operator/kernel that bypasses voluntary tracking — a class of bug we've found at least 6 distinct instances of so far (see below), we could add a new OomGuardMemoryTracker that uses something similar to the AccountingAllocator to track real memory used, instead of voluntary accounting.

@andygrove is prototyping the same idea on the Comet side in apache/datafusion-comet#4582 ("prototype allocator-level OOM circuit breaker (OomGuard)"). The framing for both: voluntary tracking is very difficult, hard to maintain, and has real production impacts from hidden sites (random re-alloc in any vec, map, etc) deemed "small amounts" in most DataFusion queries. Allocator-level tracking is structurally complete.

Example math of how small amounts add up, using the "no need to track <= 1 batch size" convention:

  - Post-coalesce in-flight batch size: 128 MiB (executor.coalesce_exec.target_batch_bytes)
  - Plausible query plan: 5 operators × 16 partitions × 128 MiB ≈ 10 GB untracked per query
  - 5 concurrent queries × 10 GB ≈ 50 GB untracked across the executor
  - Plus DF tracker thinks it's at 21.8 GB tracked
  - → 71.8 GB total demand on a 31.14 GB cgroup → kernel OOM

We could just tune the percent of memory tracked by the MemoryPool lower, however we use the extra memory for caches and other things that provide customer benefit - making inaccurate accounting concrete in our bottom line of our operating budget / customer experience.

A practical middle path: ship (a) so this specific operator stops OOM-killing pods today, and let (b)'s discussion play out in parallel.

Distinct untracked-allocation sites we've cataloged so far via #22626

  1. WindowAggExecbatches.push + concat_batches in window_agg_exec.rs
  2. NestedLoopJoinExec — multiple sites: LazyMemoryStream::generate_next_batch, concat_batches in handle_buffering_left_memory_limited, take_native in process_left_range_join (issue NestedLoopJoinExec spill path: untracked allocation overshoots memory pool #22723)
  3. GroupValuesBytesView::intern — hashbrown RawTable::reserve_rehash via HashTableAllocExt::insert_accounted
  4. GroupValuesRows::internRowConverter::append Vec resize via __rust_realloc
  5. GroupValuesRows::emitRowConverter::convert_rowsdecode_column (this issue)
  6. parquet::arrow::array_reader::byte_array::ByteArrayDecoderPlain::readVec::reserve realloc

We expect more as the corpus is exercised. That's the case for (b).

Component

  • Physical Plan

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type
    No fields configured for issues without a type.

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions