
GH-39808: [C++][Parquet] Evict pre-buffered row-group bytes after decode#49855

Open
justinli500 wants to merge 11 commits into apache:main from justinli500:GH-39808-parquet-dataset-memory-accumulation


justinli500 commented Apr 24, 2026


Rationale for this change

Dataset.to_batches() accumulates memory because ReadRangeCache
has no eviction API. PreBuffer() is called with every row group up
front and entries stay resident until the FileReader is destroyed,
so users see ~10x more peak memory than ParquetFile.iter_batches().
Issue #39808 has been open for over a year; downstream projects have
worked around it by disabling pre_buffer (Ray) or dropping the
dataset API (Marin), both of which give up features or throughput.

What changes are included in this PR?

  • Add ReadRangeCache::EvictEntriesInRange(start, length). Removes
    entries fully contained in the window; leaves coalesced entries
    alone so eviction is safe under range coalescing.
  • Add ParquetFileReader::EvictPreBufferedData(row_groups, column_indices)
    and call it from RowGroupGenerator::ReadOneRowGroup after the row
    group has been decoded into Arrow arrays.
  • Promote the existing LazyImpl mutex into base Impl so that
    concurrent Read and Evict calls across row groups are well-defined
    behaviour on both cache variants.

Performance

Measured on a 458 MB / 10 row group / 10M row parquet file
(6 columns: 3 float64, 2 int64, 1 large_string; Snappy; macOS arm64;
Release build). Fix toggled via a one-line A/B test:

Mode                                    Peak total_allocated_bytes
--------------------------------------  --------------------------
Dataset.to_batches, fix disabled        598 MB
Dataset.to_batches, fix enabled         331 MB (-267 MB, -44.6%)
Dataset.to_batches, pre_buffer=False    151 MB
ParquetFile.iter_batches                 59 MB
[Chart: Peak allocated memory (458 MB / 10 row groups, lower is better); bars in MB for without fix, with fix, no prebuffer, iter_batches]

Per-row-group progression during iteration
(max(total_allocated_bytes) in MB, sampled every 10k of 100k batches):

batches (k)   10   20   30   40   50   60   70   80   90  100
-----------  ---  ---  ---  ---  ---  ---  ---  ---  ---  ---
without fix  159  232  294  323  386  415  477  507  569  598
with fix     129  202  205  234  237  266  270  299  302  331
saved         30   30   89   89  149  149  207  208  267  267
[Chart: max_allocated over iteration (top line: without fix; bottom: with fix); x-axis: batches consumed (thousands), y-axis: MB]

Savings scale linearly with row-group count, so on the multi-GB files
from the issue thread this single fix recovers several GB of peak allocation.

Related work/commits

Downstream projects have shipped workarounds while this issue has
been open, all of them in their own code rather than upstream:

  • ray-project/ray#62745 (merged 2026-04-20): injects
    ParquetFragmentScanOptions(pre_buffer=False, use_buffered_stream=True)
    in Ray Data's parquet reader. This brings peak allocation down to
    ~75 MB but gives up the pre_buffer=True coalesced-read optimization
    that makes S3 fast.
  • marin-community/marin#4344 (merged): replaces dataset-API usage
    with ParquetFile.iter_batches, giving up hive-partition discovery,
    filter pushdown, and dataset-level schema unification.

No open PR against apache/arrow addresses the cache-side
accumulation. This PR is the upstream fix that lets both workarounds
be reverted without losing features or throughput.

Scope of this fix

This PR fixes the ReadRangeCache accumulation that dominates peak
memory on the default pre_buffer=true path.

A second source of growth, visible as the 151 MB vs 59 MB gap in the
pre_buffer=false row of the table above, lives in the dataset async
generator pipeline and is unrelated to the cache. It should be
tracked as a follow-up issue.

Partially closes #39808.

Test plan

New tests in arrow/io/memory_test.cc:

  • RangeReadCache.EvictEntriesInRange - basic eviction semantics
    across lazy and eager caches. Covers no-op windows, partial
    overlaps, wide windows that drop multiple entries, and evict on an
    empty cache.
  • RangeReadCache.EvictEntriesInRangeSpanningEntry - forces coalescing
    via hole_size_limit=100 and verifies a coalesced entry is refused
    for a partial-window evict and dropped for a wide window that fully
    contains it.
  • RangeReadCache.ConcurrentReadAndEvict - 4 reader threads in a tight
    Read() loop against the upper half of the cache, 1 evictor thread
    running 50 cycles of EvictEntriesInRange + re-Cache against the
    lower half. Runs for both lazy=true and lazy=false. Under the
    pre-refactor code, the lazy=false case races on the entries
    vector; both cases now pass cleanly.

New tests in parquet/arrow/arrow_reader_writer_test.cc:

  • TestArrowReadWrite.EvictPreBufferedData - PreBuffers a 4-row-group
    file, calls EvictPreBufferedData({0}, ...), confirms row group 0's
    cache entries are gone while row groups 1-3 remain readable, and
    that evicting twice or evicting on a reader that never PreBuffered
    are both safe no-ops.
  • TestArrowReadWrite.GetRecordBatchGeneratorReleasesPreBufferedRowGroups -
    drives the full async generator pipeline end to end with
    pre_buffer=true and confirms correctness of every emitted batch.

Full-suite regression on Release build, macOS arm64:

  • parquet-arrow-reader-writer-test: 824/826 passing, 2 skipped, 0 failing
    (the 2 skips are pre-existing dictionary-write variants not built
    in this configuration).
  • arrow-io-memory-test: 57/57 passing.

Are there any user-facing changes?

One new public method: parquet::ParquetFileReader::EvictPreBufferedData.
No behaviour change for existing callers beyond strictly lower peak
memory on the default pre_buffer=true path. No API deprecations,
no format changes.

This PR contains a "Critical Fix": No (memory usage improvement,
not correctness).

…er decode

Dataset.to_batches() on parquet files accumulates memory as iteration
proceeds because ReadRangeCache has no eviction API. PreBuffer() is
called once with every row group up front, entries stay resident until
the FileReader is destroyed, and users see roughly 10x more memory than
the equivalent ParquetFile.iter_batches() path. This is one of the
longest-standing open issues on the tracker.

Add a new ReadRangeCache::EvictEntriesInRange(start, length) method
that removes cache entries fully contained in the given window. Entries
that span past the window (for example, because range coalescing merged
them with an adjacent row group's column chunk) are deliberately left
in place, so eviction is safe in the presence of coalescing.
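A minimal sketch of the containment rule, assuming a simplified cache whose entries are kept sorted by offset as in `ReadRangeCache`. The `Range` struct and the free function `EvictEntriesInRange` below are hypothetical stand-ins, not Arrow's actual types or signatures; only the "fully contained" test mirrors the behaviour described above.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <vector>

// Hypothetical stand-in for a cached byte range [offset, offset + length).
struct Range {
  int64_t offset;
  int64_t length;
  int64_t end() const { return offset + length; }
};

// Removes every entry lying entirely inside [start, start + length).
// Entries that only partially overlap the window (for example a coalesced
// entry that also covers a neighbouring row group) are kept.
// Returns the number of entries removed.
int64_t EvictEntriesInRange(std::vector<Range>* entries, int64_t start,
                            int64_t length) {
  assert(std::is_sorted(entries->begin(), entries->end(),
                        [](const Range& a, const Range& b) {
                          return a.offset < b.offset;
                        }));
  const int64_t window_end = start + length;
  const auto old_size = entries->size();
  entries->erase(std::remove_if(entries->begin(), entries->end(),
                                [&](const Range& e) {
                                  return e.offset >= start && e.end() <= window_end;
                                }),
                 entries->end());
  return static_cast<int64_t>(old_size - entries->size());
}
```

For instance, with entries [0, 100) and a coalesced [100, 300), evicting the window [0, 200) drops only the first entry, while a window of [0, 300) also releases the coalesced one.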

Expose the primitive through ParquetFileReader::EvictPreBufferedData
and call it from the Arrow RowGroupGenerator's .Then callback once a
row group has been decoded into Arrow arrays. At that point the raw
column-chunk bytes held by the cache are no longer needed, and releasing
them gives each row group a bounded per-row-group memory footprint.

Thread safety: promote the existing mutex from LazyImpl into base Impl
so that Cache, Read, Wait, WaitFor, and EvictEntriesInRange all acquire
it before touching the entries vector. Concurrent Read from one thread
and Evict from another was previously undefined behaviour in the
non-lazy cache, and the dataset scanner's batch_readahead path is
exactly the concurrent call pattern that would trigger it. Read now
drops the lock before blocking on the I/O future, so the new locking
does not serialize readers more tightly than before.
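The locking discipline described above (find the entry under the mutex, copy its future, release the lock, then block) can be illustrated with standard-library types. This is a hypothetical toy, not Arrow code: `ToyCache`, its `std::shared_future<std::string>` payload, and `RunConcurrentReadAndEvict` are assumptions for illustration, with the driver loosely shaped like the 4-reader / 1-evictor `ConcurrentReadAndEvict` test.

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>
#include <future>
#include <map>
#include <mutex>
#include <optional>
#include <string>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical toy cache keyed by offset; each entry holds a future payload.
class ToyCache {
 public:
  void Cache(int64_t offset, std::string bytes) {
    std::promise<std::string> p;
    p.set_value(std::move(bytes));
    auto fut = p.get_future().share();
    std::lock_guard<std::mutex> lock(mutex_);
    entries_[offset] = fut;
  }

  // Look the entry up under the lock, but wait on the future only after
  // releasing it, so a slow read never blocks other readers or the evictor.
  std::optional<std::string> Read(int64_t offset) {
    std::shared_future<std::string> fut;
    {
      std::lock_guard<std::mutex> lock(mutex_);
      auto it = entries_.find(offset);
      if (it == entries_.end()) return std::nullopt;
      fut = it->second;  // the copy keeps the payload alive after eviction
    }
    return fut.get();
  }

  // Erase all entries whose offset lies in [start, end).
  void Evict(int64_t start, int64_t end) {
    assert(start <= end);
    std::lock_guard<std::mutex> lock(mutex_);
    entries_.erase(entries_.lower_bound(start), entries_.lower_bound(end));
  }

 private:
  std::mutex mutex_;
  std::map<int64_t, std::shared_future<std::string>> entries_;
};

// Readers loop over offsets [50, 100) while one thread repeatedly evicts and
// re-caches [0, 50). Returns true if every read of the upper half succeeded.
bool RunConcurrentReadAndEvict(int num_readers, int cycles) {
  ToyCache cache;
  for (int64_t off = 0; off < 100; ++off) cache.Cache(off, std::to_string(off));
  std::atomic<bool> done{false};
  std::atomic<bool> ok{true};
  std::vector<std::thread> readers;
  for (int r = 0; r < num_readers; ++r) {
    readers.emplace_back([&] {
      while (!done.load()) {
        for (int64_t off = 50; off < 100; ++off) {
          auto v = cache.Read(off);
          if (!v || *v != std::to_string(off)) ok = false;
        }
      }
    });
  }
  for (int c = 0; c < cycles; ++c) {
    cache.Evict(0, 50);
    for (int64_t off = 0; off < 50; ++off) cache.Cache(off, std::to_string(off));
  }
  done = true;
  for (auto& t : readers) t.join();
  return ok.load();
}
```

Copying the future out of the entry before waiting keeps the critical section to a lookup, and the copy stays valid even if the evictor erases the entry in the meantime, much like buffers already returned by `Read()` remain valid through shared ownership.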

Measured on a 458 MB / 10-row-group / 10M-row test file:

  Dataset.to_batches, before fix:              598 MB peak
  Dataset.to_batches, after  fix:              331 MB peak (-267 MB)
  ParquetFile.iter_batches (no pre-buffer):     59 MB peak

Savings scale linearly with row-group count, so on the multi-GB files
from the issue thread this single fix recovers several GB of peak
allocation. The remaining gap between Dataset.to_batches and
iter_batches comes from a second source of accumulation in the Dataset
infrastructure that is unrelated to the ReadRangeCache and should be
tracked as a follow-up issue.

New tests:
* RangeReadCache.EvictEntriesInRange
* RangeReadCache.EvictEntriesInRangeSpanningEntry
* RangeReadCache.ConcurrentReadAndEvict
* TestArrowReadWrite.EvictPreBufferedData
* TestArrowReadWrite.GetRecordBatchGeneratorReleasesPreBufferedRowGroups

Full regression sweep: 824/824 parquet-arrow-reader-writer-test,
57/57 arrow-io-memory-test.
justinli500 requested a review from wgtmac as a code owner April 24, 2026 06:01
@github-actions


⚠️ GitHub issue #39808 has been automatically assigned in GitHub to PR creator.

Reformat the pre-buffer eviction changes with clang-format 18.1.8 to
satisfy the CI lint job. Whitespace and line-wrapping only; no behavior
change.
justinli500 requested a review from pitrou as a code owner June 15, 2026 02:30
@justinli500 (Author)

Just pushed a clang-format fix for the only PR-related CI failure (lint). The two Windows MinGW failures appear to be caused by an unrelated arrow-s3fs-test flake. Could a reviewer approve CI and take a look? Thanks!

@bveeramani


@wgtmac @pitrou This is a huge issue for Ray users, and we're trying to decide whether we need to move away from PyArrow for our Parquet reading implementation.

Would you guys be able to prioritize this PR? I know maintaining a project is a lot of work, so I'd understand if you can't take a look immediately.

wgtmac (Member) left a comment

Thanks for the fix! I have two concerns with this patch.

First, eviction does not seem to account for coalesced cache entries that span row groups. See my inline comment.

Second, many of the comments feel too verbose and restate what the code is already doing. I’d suggest trimming them down to only the invariants or non-obvious reasoning.

Comment thread on cpp/src/parquet/file_reader.cc (outdated):
  if (!cached_source_) {
    return;
  }
  for (int row : row_groups) {

Member

Eviction is done one row group at a time, but cache entries are only removed if they are fully contained in that row group’s byte window. With default coalescing, a single cache entry can span adjacent row groups, so evicting row group 0 leaves the entry because it extends past the window, and evicting row group 1 also leaves it because the entry starts before the window. That entry is then never released, so the memory growth this PR is meant to fix can still occur for small or adjacent row groups.
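The scenario can be reproduced with the containment rule alone. In this hypothetical sketch, row group 0 occupies bytes [0, 100), row group 1 occupies [100, 200), and coalescing has merged their column chunks into a single cache entry [0, 200). `Range`, `FullyContains`, and `EvictedByPerRowGroupWindows` are illustrative names, not Arrow code.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical byte range [offset, offset + length).
struct Range {
  int64_t offset;
  int64_t length;
  int64_t end() const { return offset + length; }
};

// The eviction rule under review: an entry may be dropped only if the
// eviction window fully contains it.
bool FullyContains(const Range& window, const Range& entry) {
  return entry.offset >= window.offset && entry.end() <= window.end();
}

// Returns true if evicting each row group's window separately would ever
// release `entry`. A coalesced entry spanning both windows is never released.
bool EvictedByPerRowGroupWindows(const Range& rg0, const Range& rg1,
                                 const Range& entry) {
  assert(rg0.end() <= rg1.offset);  // row groups are disjoint and ordered
  return FullyContains(rg0, entry) || FullyContains(rg1, entry);
}
```

Under the fully-contained rule, such an entry can only be released by a window spanning both row groups.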

Author

Thanks for the feedback! Both addressed:

  • Cross-RG eviction: tracks runs of evicted row groups and evicts each run's combined window, so that a coalesced entry is freed once all its row groups are evicted (EvictEntriesInRange unchanged). Merges by buffered-set adjacency, so filtered/non-contiguous scans are covered too; safe under out-of-order eviction. Tests added for all three cases.
  • Comments: trimmed to the invariants.
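One possible reading of "tracks runs of evicted row groups and evicts each run's combined window" is sketched below. It assumes the buffered row groups are listed in file order with their byte windows and a per-row-group evicted flag; `BufferedRowGroup` and `CombinedEvictionWindows` are hypothetical names, not the PR's actual code. Each maximal run of evicted neighbours in the buffered list yields one window from the run's first byte to its last, so an entry coalesced across those row groups falls inside it.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <utility>
#include <vector>

// Hypothetical record for a row group that was passed to PreBuffer().
struct BufferedRowGroup {
  int64_t start;  // first byte of its column chunks
  int64_t end;    // one past the last byte
  bool evicted;   // already decoded and released by the caller
};

// Returns [start, end) windows, one per maximal run of consecutive evicted
// entries. `buffered` must be sorted by start. Adjacency is measured in this
// list, so row groups that were never buffered (e.g. filtered out) do not
// break a run.
std::vector<std::pair<int64_t, int64_t>> CombinedEvictionWindows(
    const std::vector<BufferedRowGroup>& buffered) {
  std::vector<std::pair<int64_t, int64_t>> windows;
  std::size_t i = 0;
  while (i < buffered.size()) {
    if (!buffered[i].evicted) {
      ++i;
      continue;
    }
    std::size_t j = i;
    while (j + 1 < buffered.size() && buffered[j + 1].evicted) ++j;
    assert(buffered[i].start <= buffered[j].end);
    windows.emplace_back(buffered[i].start, buffered[j].end);
    i = j + 1;
  }
  return windows;
}
```

With buffered row groups at [0, 100), [100, 200), [200, 300), and [400, 500), where only the third is still pending, this yields the windows [0, 200) and [400, 500).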

github-actions bot added the awaiting committer review label and removed the awaiting review label Jun 18, 2026
wgtmac (Member) commented Jun 26, 2026

Thanks for updating it! I still think this PR is more complex than necessary. A few concerns:

  • The new eviction API requires callers to manually decide when and what to evict, which is easy to misuse.
  • The implementation tracks evicted state by row group runs, but the API also accepts column_indices, so column-subset eviction can be incorrect or ambiguous.
  • The run-merging / bounding-box logic is subtle, especially with filtered row groups and coalesced cache entries.
  • The comments are quite verbose and sometimes explain the implementation too mechanically. It makes the patch feel harder to review than the actual idea.

Could we instead make this automatic for the sequential Arrow reader path? For example, add an opt-in reader property, and after each row group is decoded, advance a simple watermark over the contiguous completed row groups. Then evict cache entries ending before the first byte any remaining row group may need.

That would keep PreBuffer() semantics unchanged, avoid a public row-group/column eviction API, handle readahead safely, and reduce the amount of Parquet-specific bookkeeping needed.
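The watermark idea can be sketched independently of the reader. Assumptions: `min_offsets[i]` is the smallest byte offset the i-th scheduled row group reads (row groups may be scheduled out of file order), decode completions may arrive out of order because of readahead, and `Watermark` is a hypothetical helper, not anything in the Parquet reader. Taking a suffix minimum means the returned bound never exceeds a byte that a still-pending row group may need.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <limits>
#include <mutex>
#include <optional>
#include <vector>

// Hypothetical helper: tracks which scheduled row groups have been decoded
// and reports how far the cache may safely be evicted.
class Watermark {
 public:
  explicit Watermark(const std::vector<int64_t>& min_offsets)
      : evict_before_(min_offsets.size() + 1, std::numeric_limits<int64_t>::max()),
        done_(min_offsets.size(), false) {
    // evict_before_[i] = smallest offset needed by row groups i, i+1, ...
    for (std::size_t i = min_offsets.size(); i-- > 0;) {
      evict_before_[i] = std::min(evict_before_[i + 1], min_offsets[i]);
    }
  }

  // Marks row group `index` (its position in scan order) as decoded. Returns
  // a new eviction bound only when the contiguous completed prefix grows.
  std::optional<int64_t> Decoded(std::size_t index) {
    std::lock_guard<std::mutex> lock(mutex_);
    assert(index < done_.size());
    if (done_[index]) return std::nullopt;
    done_[index] = true;
    const std::size_t old_prefix = prefix_;
    while (prefix_ < done_.size() && done_[prefix_]) ++prefix_;
    if (prefix_ == old_prefix) return std::nullopt;
    return evict_before_[prefix_];
  }

 private:
  std::mutex mutex_;
  std::vector<int64_t> evict_before_;
  std::vector<bool> done_;
  std::size_t prefix_ = 0;
};
```

A caller would pass each returned bound to an "evict entries ending at or before this offset" primitive, such as `EvictEntriesBefore` in the demo patch. Only the prefix counter and the precomputed offsets are shared state, which keeps the bookkeeping independent of column subsets.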

I asked Codex to draft the patch below to demonstrate my idea. I'm not suggesting adopting it as is; it's just demo code in case I didn't make my idea clear.

diff --git a/cpp/src/arrow/io/caching.cc b/cpp/src/arrow/io/caching.cc
index 41fdd9f781..8305869bcf 100644
--- a/cpp/src/arrow/io/caching.cc
+++ b/cpp/src/arrow/io/caching.cc
@@ -151,8 +151,10 @@ struct ReadRangeCache::Impl {
   IOContext ctx;
   CacheOptions options;
 
-  // Ordered by offset (so as to find a matching region by binary search)
+  // Ordered by offset (so as to find a matching region by binary search).
+  // Protects entries and the futures stored in entries.
   std::vector<RangeCacheEntry> entries;
+  std::mutex mutex;
 
   virtual ~Impl() = default;
 
@@ -179,14 +181,17 @@ struct ReadRangeCache::Impl {
         ranges, internal::CoalesceReadRanges(std::move(ranges), options.hole_size_limit,
                                              options.range_size_limit));
     std::vector<RangeCacheEntry> new_entries = MakeCacheEntries(ranges);
-    // Add new entries, themselves ordered by offset
-    if (entries.size() > 0) {
-      std::vector<RangeCacheEntry> merged(entries.size() + new_entries.size());
-      std::merge(entries.begin(), entries.end(), new_entries.begin(), new_entries.end(),
-                 merged.begin());
-      entries = std::move(merged);
-    } else {
-      entries = std::move(new_entries);
+    {
+      std::unique_lock<std::mutex> guard(mutex);
+      // Add new entries, themselves ordered by offset
+      if (entries.size() > 0) {
+        std::vector<RangeCacheEntry> merged(entries.size() + new_entries.size());
+        std::merge(entries.begin(), entries.end(), new_entries.begin(), new_entries.end(),
+                   merged.begin());
+        entries = std::move(merged);
+      } else {
+        entries = std::move(new_entries);
+      }
     }
     // Prefetch immediately, regardless of executor availability, if possible
     auto st = file->WillNeed(ranges);
@@ -205,14 +210,20 @@ struct ReadRangeCache::Impl {
       return std::make_shared<Buffer>(&byte, 0);
     }
 
-    const auto it = std::lower_bound(
-        entries.begin(), entries.end(), range,
-        [](const RangeCacheEntry& entry, const ReadRange& range) {
-          return entry.range.offset + entry.range.length < range.offset + range.length;
-        });
-    if (it != entries.end() && it->range.Contains(range)) {
-      auto fut = MaybeRead(&*it);
-      ARROW_ASSIGN_OR_RAISE(auto buf, fut.result());
+    Future<std::shared_ptr<Buffer>> fut;
+    int64_t slice_offset = 0;
+    {
+      std::unique_lock<std::mutex> guard(mutex);
+      const auto it = std::lower_bound(
+          entries.begin(), entries.end(), range,
+          [](const RangeCacheEntry& entry, const ReadRange& range) {
+            return entry.range.offset + entry.range.length < range.offset + range.length;
+          });
+      if (it == entries.end() || !it->range.Contains(range)) {
+        return Status::Invalid("ReadRangeCache did not find matching cache entry");
+      }
+      fut = MaybeRead(&*it);
+      slice_offset = range.offset - it->range.offset;
       if (options.lazy && options.prefetch_limit > 0) {
         int64_t num_prefetched = 0;
         for (auto next_it = it + 1;
@@ -226,19 +237,41 @@ struct ReadRangeCache::Impl {
           ++num_prefetched;
         }
       }
-      return SliceBuffer(std::move(buf), range.offset - it->range.offset, range.length);
     }
-    return Status::Invalid("ReadRangeCache did not find matching cache entry");
+    ARROW_ASSIGN_OR_RAISE(auto buf, fut.result());
+    return SliceBuffer(std::move(buf), slice_offset, range.length);
   }
 
   virtual Future<> Wait() {
     std::vector<Future<>> futures;
-    for (auto& entry : entries) {
-      futures.emplace_back(MaybeRead(&entry));
+    {
+      std::unique_lock<std::mutex> guard(mutex);
+      futures.reserve(entries.size());
+      for (auto& entry : entries) {
+        futures.emplace_back(MaybeRead(&entry));
+      }
     }
     return AllComplete(futures);
   }
 
+  int64_t EvictEntriesBefore(int64_t end_offset) {
+    int64_t n_evicted = 0;
+    std::unique_lock<std::mutex> guard(mutex);
+    auto it = entries.begin();
+    while (it != entries.end()) {
+      if (it->range.offset >= end_offset) {
+        break;
+      }
+      if (it->range.length <= end_offset - it->range.offset) {
+        it = entries.erase(it);
+        ++n_evicted;
+      } else {
+        ++it;
+      }
+    }
+    return n_evicted;
+  }
+
   // Return a Future that completes when the given ranges have been read.
   virtual Future<> WaitFor(std::vector<ReadRange> ranges) {
     auto end = std::remove_if(ranges.begin(), ranges.end(),
@@ -246,17 +279,21 @@ struct ReadRangeCache::Impl {
     ranges.resize(end - ranges.begin());
     std::vector<Future<>> futures;
     futures.reserve(ranges.size());
-    for (auto& range : ranges) {
-      const auto it = std::lower_bound(
-          entries.begin(), entries.end(), range,
-          [](const RangeCacheEntry& entry, const ReadRange& range) {
-            return entry.range.offset + entry.range.length < range.offset + range.length;
-          });
-      if (it != entries.end() && it->range.Contains(range)) {
-        futures.push_back(Future<>(MaybeRead(&*it)));
-      } else {
-        return Status::Invalid("Range was not requested for caching: offset=",
-                               range.offset, " length=", range.length);
+    {
+      std::unique_lock<std::mutex> guard(mutex);
+      for (auto& range : ranges) {
+        const auto it =
+            std::lower_bound(entries.begin(), entries.end(), range,
+                             [](const RangeCacheEntry& entry, const ReadRange& range) {
+                               return entry.range.offset + entry.range.length <
+                                      range.offset + range.length;
+                             });
+        if (it != entries.end() && it->range.Contains(range)) {
+          futures.push_back(Future<>(MaybeRead(&*it)));
+        } else {
+          return Status::Invalid("Range was not requested for caching: offset=",
+                                 range.offset, " length=", range.length);
+        }
       }
     }
     return AllComplete(futures);
@@ -339,6 +376,10 @@ Future<> ReadRangeCache::WaitFor(std::vector<ReadRange> ranges) {
   return impl_->WaitFor(std::move(ranges));
 }
 
+int64_t ReadRangeCache::EvictEntriesBefore(int64_t end_offset) {
+  return impl_->EvictEntriesBefore(end_offset);
+}
+
 }  // namespace internal
 }  // namespace io
 }  // namespace arrow
diff --git a/cpp/src/arrow/io/caching.h b/cpp/src/arrow/io/caching.h
index e2b911fafd..55b1a03fc1 100644
--- a/cpp/src/arrow/io/caching.h
+++ b/cpp/src/arrow/io/caching.h
@@ -142,6 +142,14 @@ class ARROW_EXPORT ReadRangeCache {
   /// \brief Wait until all given ranges have been cached.
   Future<> WaitFor(std::vector<ReadRange> ranges);
 
+  /// \brief Evict cache entries ending at or before the given offset.
+  ///
+  /// This releases the memory held by those entries. Buffers already returned
+  /// by Read() remain valid through their shared ownership.
+  ///
+  /// \return Number of cache entries that were evicted.
+  int64_t EvictEntriesBefore(int64_t end_offset);
+
  protected:
   struct Impl;
   struct LazyImpl;
diff --git a/cpp/src/parquet/arrow/reader.cc b/cpp/src/parquet/arrow/reader.cc
index cc107c1802..9e1fbbb538 100644
--- a/cpp/src/parquet/arrow/reader.cc
+++ b/cpp/src/parquet/arrow/reader.cc
@@ -19,7 +19,9 @@
 
 #include <algorithm>
 #include <cstring>
+#include <limits>
 #include <memory>
+#include <mutex>
 #include <random>
 #include <unordered_set>
 #include <utility>
@@ -1144,6 +1146,39 @@ Result<std::unique_ptr<RecordBatchReader>> FileReaderImpl::GetRecordBatchReader(
 
 /// Given a file reader and a list of row groups, this is a generator of record
 /// batch generators (where each sub-generator is the contents of a single row group).
+class ReadCacheEvictionState {
+ public:
+  explicit ReadCacheEvictionState(std::vector<int64_t> evict_before_offsets)
+      : evict_before_offsets_(std::move(evict_before_offsets)),
+        completed_(evict_before_offsets_.size() - 1, false) {}
+
+  void RowGroupDecoded(ParquetFileReader* reader, size_t row_group_index) {
+    int64_t evict_before = -1;
+    {
+      std::lock_guard<std::mutex> lock(mutex_);
+      if (row_group_index >= completed_.size() || completed_[row_group_index]) {
+        return;
+      }
+      completed_[row_group_index] = true;
+      const size_t old_completed_prefix = completed_prefix_;
+      while (completed_prefix_ < completed_.size() && completed_[completed_prefix_]) {
+        ++completed_prefix_;
+      }
+      if (completed_prefix_ == old_completed_prefix) {
+        return;
+      }
+      evict_before = evict_before_offsets_[completed_prefix_];
+    }
+    reader->EvictPreBufferedDataBefore(evict_before);
+  }
+
+ private:
+  std::mutex mutex_;
+  std::vector<int64_t> evict_before_offsets_;
+  std::vector<bool> completed_;
+  size_t completed_prefix_ = 0;
+};
+
 class RowGroupGenerator {
  public:
   using RecordBatchGenerator =
@@ -1157,12 +1192,14 @@ class RowGroupGenerator {
   explicit RowGroupGenerator(std::shared_ptr<FileReaderImpl> arrow_reader,
                              ::arrow::internal::Executor* cpu_executor,
                              std::vector<int> row_groups, std::vector<int> column_indices,
-                             int64_t min_rows_in_flight)
+                             int64_t min_rows_in_flight,
+                             std::shared_ptr<ReadCacheEvictionState> eviction_state)
       : arrow_reader_(std::move(arrow_reader)),
         cpu_executor_(cpu_executor),
         row_groups_(std::move(row_groups)),
         column_indices_(std::move(column_indices)),
         min_rows_in_flight_(min_rows_in_flight),
+        eviction_state_(std::move(eviction_state)),
         rows_in_flight_(0),
         index_(0),
         readahead_index_(0) {}
@@ -1207,12 +1244,21 @@ class RowGroupGenerator {
     } else {
       auto ready = reader->parquet_reader()->WhenBuffered({row_group}, column_indices);
       if (cpu_executor_) ready = cpu_executor_->TransferAlways(ready);
-      row_group_read =
-          ready.Then([cpu_executor = cpu_executor_, reader, row_group,
-                      column_indices = std::move(
-                          column_indices)]() -> ::arrow::Future<RecordBatchGenerator> {
-            return ReadOneRowGroup(cpu_executor, reader, row_group, column_indices);
-          });
+      row_group_read = ready.Then([cpu_executor = cpu_executor_, reader, row_group,
+                                   row_group_index, eviction_state = eviction_state_,
+                                   column_indices = std::move(column_indices)]()
+                                      -> ::arrow::Future<RecordBatchGenerator> {
+        return ReadOneRowGroup(cpu_executor, reader, row_group, column_indices)
+            .Then([reader, row_group_index, eviction_state = std::move(eviction_state)](
+                      RecordBatchGenerator generator)
+                      -> ::arrow::Result<RecordBatchGenerator> {
+              if (eviction_state) {
+                eviction_state->RowGroupDecoded(reader->parquet_reader(),
+                                                row_group_index);
+              }
+              return std::move(generator);
+            });
+      });
     }
     in_flight_reads_.push({std::move(row_group_read), num_rows});
   }
@@ -1253,6 +1299,7 @@ class RowGroupGenerator {
   std::vector<int> row_groups_;
   std::vector<int> column_indices_;
   int64_t min_rows_in_flight_;
+  std::shared_ptr<ReadCacheEvictionState> eviction_state_;
   std::queue<ReadRequest> in_flight_reads_;
   int64_t rows_in_flight_;
   size_t index_;
@@ -1275,10 +1322,30 @@ FileReaderImpl::GetRecordBatchGenerator(std::shared_ptr<FileReader> reader,
                        reader_properties_.cache_options());
     END_PARQUET_CATCH_EXCEPTIONS
   }
+  std::shared_ptr<ReadCacheEvictionState> eviction_state;
+  if (reader_properties_.pre_buffer() && reader_properties_.auto_evict_read_cache() &&
+      !column_indices.empty()) {
+    const int64_t no_more_ranges = std::numeric_limits<int64_t>::max();
+    std::vector<int64_t> evict_before_offsets(row_group_indices.size() + 1,
+                                              no_more_ranges);
+    for (int64_t i = static_cast<int64_t>(row_group_indices.size()) - 1; i >= 0; --i) {
+      ARROW_ASSIGN_OR_RAISE(
+          auto ranges, reader_->GetReadRanges({row_group_indices[static_cast<size_t>(i)]},
+                                              column_indices));
+      int64_t row_group_min_offset = no_more_ranges;
+      for (const auto& range : ranges) {
+        row_group_min_offset = std::min(row_group_min_offset, range.offset);
+      }
+      evict_before_offsets[static_cast<size_t>(i)] = std::min(
+          evict_before_offsets[static_cast<size_t>(i + 1)], row_group_min_offset);
+    }
+    eviction_state =
+        std::make_shared<ReadCacheEvictionState>(std::move(evict_before_offsets));
+  }
   ::arrow::AsyncGenerator<RowGroupGenerator::RecordBatchGenerator> row_group_generator =
       RowGroupGenerator(::arrow::internal::checked_pointer_cast<FileReaderImpl>(reader),
                         cpu_executor, row_group_indices, column_indices,
-                        rows_to_readahead);
+                        rows_to_readahead, std::move(eviction_state));
   ::arrow::AsyncGenerator<std::shared_ptr<::arrow::RecordBatch>> concatenated =
       ::arrow::MakeConcatenatedGenerator(std::move(row_group_generator));
   WRAP_ASYNC_GENERATOR(std::move(concatenated));
diff --git a/cpp/src/parquet/file_reader.cc b/cpp/src/parquet/file_reader.cc
index 2f46a5e296..bd6c3f3115 100644
--- a/cpp/src/parquet/file_reader.cc
+++ b/cpp/src/parquet/file_reader.cc
@@ -403,6 +403,13 @@ class SerializedFile : public ParquetFileReader::Contents {
     PARQUET_THROW_NOT_OK(cached_source_->Cache(ranges));
   }
 
+  int64_t EvictPreBufferedDataBefore(int64_t end_offset) {
+    if (!cached_source_) {
+      return 0;
+    }
+    return cached_source_->EvictEntriesBefore(end_offset);
+  }
+
   Result<std::vector<::arrow::io::ReadRange>> GetReadRanges(
       const std::vector<int>& row_groups, const std::vector<int>& column_indices,
       int64_t hole_size_limit, int64_t range_size_limit) {
@@ -909,6 +916,13 @@ void ParquetFileReader::PreBuffer(const std::vector<int>& row_groups,
   file->PreBuffer(row_groups, column_indices, ctx, options);
 }
 
+int64_t ParquetFileReader::EvictPreBufferedDataBefore(int64_t end_offset) {
+  // Access private methods here
+  SerializedFile* file =
+      ::arrow::internal::checked_cast<SerializedFile*>(contents_.get());
+  return file->EvictPreBufferedDataBefore(end_offset);
+}
+
 Result<std::vector<::arrow::io::ReadRange>> ParquetFileReader::GetReadRanges(
     const std::vector<int>& row_groups, const std::vector<int>& column_indices,
     int64_t hole_size_limit, int64_t range_size_limit) {
diff --git a/cpp/src/parquet/file_reader.h b/cpp/src/parquet/file_reader.h
index c42163276c..8662dcc0d9 100644
--- a/cpp/src/parquet/file_reader.h
+++ b/cpp/src/parquet/file_reader.h
@@ -201,6 +201,12 @@ class PARQUET_EXPORT ParquetFileReader {
                  const ::arrow::io::IOContext& ctx,
                  const ::arrow::io::CacheOptions& options);
 
+  /// \brief Release cached entries ending at or before the given offset.
+  ///
+  /// This only affects data cached by PreBuffer(). Buffers already returned to
+  /// readers remain valid through their shared ownership.
+  int64_t EvictPreBufferedDataBefore(int64_t end_offset);
+
   /// Retrieve the list of byte ranges that would need to be read to retrieve
   /// the data for the specified row groups and column indices.
   ///
diff --git a/cpp/src/parquet/properties.h b/cpp/src/parquet/properties.h
index e2244a1176..abd22683fb 100644
--- a/cpp/src/parquet/properties.h
+++ b/cpp/src/parquet/properties.h
@@ -1151,6 +1151,7 @@ class PARQUET_EXPORT ArrowReaderProperties {
         read_dict_indices_(),
         batch_size_(kArrowDefaultBatchSize),
         pre_buffer_(true),
+        auto_evict_read_cache_(false),
         cache_options_(::arrow::io::CacheOptions::LazyDefaults()),
         coerce_int96_timestamp_unit_(::arrow::TimeUnit::NANO),
         binary_type_(kArrowDefaultBinaryType),
@@ -1235,6 +1236,14 @@ class PARQUET_EXPORT ArrowReaderProperties {
   /// Return whether read coalescing is enabled.
   bool pre_buffer() const { return pre_buffer_; }
 
+  /// Set whether to automatically evict pre-buffered read cache entries while
+  /// consuming row groups sequentially.
+  ///
+  /// Default is false.
+  void set_auto_evict_read_cache(bool auto_evict) { auto_evict_read_cache_ = auto_evict; }
+  /// Return whether the Arrow reader automatically evicts read cache entries.
+  bool auto_evict_read_cache() const { return auto_evict_read_cache_; }
+
   /// Set options for read coalescing. This can be used to tune the
   /// implementation for characteristics of different filesystems.
   void set_cache_options(::arrow::io::CacheOptions options) { cache_options_ = options; }
@@ -1300,6 +1309,7 @@ class PARQUET_EXPORT ArrowReaderProperties {
   std::unordered_set<int> read_dict_indices_;
   int64_t batch_size_;
   bool pre_buffer_;
+  bool auto_evict_read_cache_;
   ::arrow::io::IOContext io_context_;
   ::arrow::io::CacheOptions cache_options_;
   ::arrow::TimeUnit::type coerce_int96_timestamp_unit_;
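
The doc comment added to `file_reader.h` promises two things: entries are released only once they end at or before `end_offset`, and buffers already handed to readers stay valid. Both can be seen in a toy model. `ToyRangeCache` is a hypothetical stand-in, not Arrow's `ReadRangeCache`. It also assumes that the `int64_t` return value counts evicted entries, which the diff does not specify.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <map>
#include <memory>
#include <string>
#include <utility>

// Hypothetical toy cache for illustration; this is not Arrow's ReadRangeCache.
// Each entry covers [offset, offset + size) and is held through a shared_ptr,
// so a buffer handed to a reader outlives its removal from the cache.
class ToyRangeCache {
 public:
  void Put(int64_t offset, std::string data) {
    assert(!data.empty());
    entries_[offset] = std::make_shared<std::string>(std::move(data));
  }

  // Shared handle to the entry starting at `offset`, or nullptr if absent.
  std::shared_ptr<const std::string> Get(int64_t offset) const {
    auto it = entries_.find(offset);
    return it == entries_.end() ? nullptr : it->second;
  }

  // Removes every entry whose end (offset + size) is at or before end_offset.
  // Assumption: the return value is the number of entries removed.
  int64_t EvictEntriesBefore(int64_t end_offset) {
    int64_t evicted = 0;
    for (auto it = entries_.begin(); it != entries_.end();) {
      const int64_t entry_end =
          it->first + static_cast<int64_t>(it->second->size());
      if (entry_end <= end_offset) {
        it = entries_.erase(it);  // erase returns the next iterator
        ++evicted;
      } else {
        ++it;
      }
    }
    return evicted;
  }

  std::size_t size() const { return entries_.size(); }

 private:
  std::map<int64_t, std::shared_ptr<const std::string>> entries_;
};
```

Take entries [0, 100), a coalesced [100, 250) spanning two row groups, and [250, 300). `EvictEntriesBefore(175)` removes only the first entry. The coalesced entry is freed once the offset reaches 250. A `shared_ptr` obtained from `Get(0)` before eviction still points at its 100 bytes afterwards.
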

@wgtmac

wgtmac commented Jun 26, 2026

Member

@pitrou Do you have an opinion on this? I don't want to waste the author's effort if my suggestion is not the right direction.

@wgtmac

wgtmac commented Jun 26, 2026

Member

cc @mapleFU

@justinli500

Author

Received, @wgtmac: your approach looks great and I'd be glad to pivot.

Just wondering, though: since it defaults to off, how would Dataset.to_batches in the #39808 path actually get the fix? Would we enable it in the scanner, or flip the default later?

Happy to hold until we get more eyes on this, though.

@wgtmac

wgtmac commented Jun 27, 2026

Member

I made the option default off only to avoid changing the general PreBuffer() contract. Some callers may expect the cache to stay alive until the next PreBuffer() or reader destruction. For the Dataset GetRecordBatchGenerator path, I think we should enable this automatically when pre_buffer=true. That path reads row groups once, so it has a clear safe point to evict after each row group is decoded.

@justinli500

Author

Got it. In that case, I'll pivot to your approach and start working on it now. I'll key eviction off pre_buffer() in the GetRecordBatchGenerator path so the Dataset/#39808 case is covered, and use the offset watermark so that merged cross-row-group entries are freed by construction.

…termark

Replace the run-merging pre-buffer eviction with a simpler offset
watermark. ParquetFileReader now exposes
EvictPreBufferedDataBefore(end_offset), delegating to
ReadRangeCache::EvictEntriesBefore, and the SerializedFile
run-tracking state is removed.

The async record-batch generator builds a ReadCacheEvictionState that
advances a watermark over the contiguous prefix of decoded row groups
and evicts every cache entry ending before the lowest byte any
not-yet-completed row group still needs. A coalesced entry spanning a
row-group boundary is freed once the watermark passes its end. Eviction
is keyed off pre_buffer() and confined to this read-once generator
path, so PreBuffer()'s contract for other callers is unchanged.

Move ReadCacheEvictionState to reader_internal.h and split its prefix-advance logic into a reader-free MarkDecodedAndGetEvictOffset() method that returns the evict-before offset (or nullopt). RowGroupDecoded is now a thin wrapper, so production behavior is unchanged while the out-of-order watermark advance is unit-testable without a reader.

Add a deterministic ReadCacheEvictionStateOutOfOrder test driving that method directly, covering out-of-order completion, prefix jumps, and idempotent re-decode.

Also drop now-dead includes (<mutex> in reader.cc once the class moved out; <limits>/<map>/<mutex>/<set> in file_reader.cc, left over from the removed run-merging members).
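
The watermark described in these commits can be modeled without a reader. `ToyEvictionState` below is a hypothetical simplification, not the PR's `ReadCacheEvictionState`. It assumes that row groups are numbered by their position in the read order and that no row group needs bytes below its own start offset. It tracks the contiguous decoded prefix and returns the smallest start offset among row groups at or after that prefix. This is conservative when later row groups finish early.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <limits>
#include <optional>
#include <vector>

// Hypothetical simplification of the watermark idea; not the PR's
// ReadCacheEvictionState. Row groups are numbered by their position in the
// read order, and start_offsets[i] is the lowest file byte row group i needs.
class ToyEvictionState {
 public:
  explicit ToyEvictionState(const std::vector<int64_t>& start_offsets)
      : decoded_(start_offsets.size(), false),
        suffix_min_(start_offsets.size() + 1,
                    std::numeric_limits<int64_t>::max()) {
    // suffix_min_[i] is the lowest start offset among row groups i..n-1.
    // suffix_min_[n] stays at the maximum, meaning "evict everything".
    for (std::size_t i = start_offsets.size(); i-- > 0;) {
      suffix_min_[i] = std::min(start_offsets[i], suffix_min_[i + 1]);
    }
  }

  // Marks row group `index` as decoded. If the contiguous decoded prefix
  // advanced, returns the offset before which cached entries are no longer
  // needed; otherwise (including a repeated call) returns std::nullopt.
  std::optional<int64_t> MarkDecodedAndGetEvictOffset(std::size_t index) {
    assert(index < decoded_.size());
    if (decoded_[index]) return std::nullopt;
    decoded_[index] = true;
    const std::size_t old_prefix = prefix_;
    while (prefix_ < decoded_.size() && decoded_[prefix_]) ++prefix_;
    if (prefix_ == old_prefix) return std::nullopt;
    // Conservative: row groups past the prefix that finished early still
    // count, so nothing a pending row group might need is released.
    return suffix_min_[prefix_];
  }

 private:
  std::vector<bool> decoded_;
  std::vector<int64_t> suffix_min_;
  std::size_t prefix_ = 0;
};
```

Consider start offsets {0, 100, 250, 400}. Decoding row group 2 first returns `std::nullopt`, and so does repeating that call. Decoding 0 then returns 100. Decoding 1 jumps the prefix past the already-decoded 2 and returns 400. Decoding 3 returns the `int64_t` maximum, so every remaining entry can go.
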

Drop comments that restate the code or duplicate the header doc, keeping invariants and non-obvious rationale: the impl-side restatement above EvictEntriesBefore, the SerializedFile EvictPreBufferedDataBefore comment (already documented on the public method), and the non-empty precondition note on the eviction-state constructor (implied by the index invariant above it).
justinli500 force-pushed the GH-39808-parquet-dataset-memory-accumulation branch from 5e88968 to de90ce7 on June 28, 2026 20:17
@wgtmac

wgtmac commented Jun 30, 2026

Member

Could you please make CI happy and trim those added comments (they are still too long to read)?

The cross-row-group eviction test set range_size_limit == hole_size_limit, which trips the range_size_limit > hole_size_limit check in debug/CI builds (release builds compile it out). Make range_size_limit one byte larger so the limits stay valid while every chunk is still coalesced into one entry. Also trim the remaining added comments to bare invariants per review feedback.
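
The fix above can be illustrated with a simplified, self-contained coalescer. `Range` and `CoalesceSketch` are hypothetical names, and the merge rule approximates read-range coalescing rather than reproducing Arrow's code. The example makes one point. When both limits are large relative to the chunks, every chunk merges into one range. Raising range_size_limit by one byte then satisfies the strict inequality without changing the result.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

struct Range {
  int64_t offset;
  int64_t length;
};

// Simplified coalescing rule for illustration, not Arrow's implementation.
// Assumes non-empty, non-overlapping input ranges. A range joins the current
// merged range while the gap before it is <= hole_size_limit and the merged
// span stays <= range_size_limit.
std::vector<Range> CoalesceSketch(std::vector<Range> ranges,
                                  int64_t hole_size_limit,
                                  int64_t range_size_limit) {
  // Debug-only precondition, like the check the commit message describes;
  // compiled out when NDEBUG is defined.
  assert(range_size_limit > hole_size_limit);
  std::vector<Range> out;
  if (ranges.empty()) return out;
  std::sort(ranges.begin(), ranges.end(),
            [](const Range& a, const Range& b) { return a.offset < b.offset; });
  int64_t start = ranges[0].offset;
  int64_t end = start + ranges[0].length;
  for (std::size_t i = 1; i < ranges.size(); ++i) {
    const int64_t r_start = ranges[i].offset;
    const int64_t r_end = r_start + ranges[i].length;
    if (r_start - end > hole_size_limit || r_end - start > range_size_limit) {
      out.push_back({start, end - start});  // close the current merged range
      start = r_start;
    }
    end = r_end;
  }
  out.push_back({start, end - start});
  return out;
}
```

Take chunks {0, 100}, {100, 100}, {250, 50} and {300, 200}. Limits (1000, 1001) produce a single range {0, 500}. Equal limits (1000, 1000) would trip the assertion in a debug build but yield the same single range in a release build.
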
@justinli500

Author

Just fixed both, should work now!


Development

Successfully merging this pull request may close these issues.

[Python] Dataset.to_batches accumulates memory usage and leaks

3 participants