
Conversation

@kamalcph kamalcph commented Oct 7, 2025

The broker heap fills up and an OutOfMemoryError is thrown when remote reads
are triggered for multiple partitions within a single FETCH request.

Steps to reproduce:

  1. Start a one-node broker and configure LocalTieredStorage as the remote storage.
  2. Create a topic with 5 partitions.
  3. Produce messages and ensure that a few segments are uploaded to remote storage.
  4. Start a consumer to read from those 5 partitions. Seek the offset to the beginning for 4 partitions
     and to the end for 1 partition, so that the FETCH request reads from both the remote log and the
     local log (see the consumer sketch after this list).
  5. The broker crashes with an OOM error.
  6. The DelayedRemoteFetch / RemoteLogReadResult references are held by the purgatory, which is what
     causes the crash.
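
For reference, here is a minimal consumer sketch for step 4, assuming a 5-partition tiered topic; the topic name (tiered-topic), group id, and bootstrap address are illustrative, not taken from the PR:

    import java.time.Duration
    import java.util.Properties
    import scala.jdk.CollectionConverters._
    import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
    import org.apache.kafka.common.TopicPartition
    import org.apache.kafka.common.serialization.ByteArrayDeserializer

    object RemoteFetchOomRepro {
      def main(args: Array[String]): Unit = {
        val props = new Properties()
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "remote-fetch-oom-repro")
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[ByteArrayDeserializer].getName)
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[ByteArrayDeserializer].getName)

        val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](props)
        val partitions = (0 until 5).map(p => new TopicPartition("tiered-topic", p))
        consumer.assign(partitions.asJava)

        // Seek 4 partitions to the beginning (remote reads) and 1 to the end (local read),
        // so a single FETCH request mixes remote-log and local-log reads.
        consumer.seekToBeginning(partitions.take(4).asJava)
        consumer.seekToEnd(partitions.takeRight(1).asJava)

        while (true) {
          val records = consumer.poll(Duration.ofMillis(500))
          println(s"fetched ${records.count()} records")
        }
      }
    }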

Reviewers: Luke Chen [email protected], Satish Duggana
[email protected]

@github-actions github-actions bot added the core (Kafka Broker), triage (PRs from the community), and small (Small PRs) labels Oct 7, 2025
@kamalcph kamalcph requested a review from showuon October 7, 2025 15:14

@kamalcph kamalcph commented Oct 7, 2025

@satishd @showuon

PTAL, I will cover the patch with a unit test. Thanks!

Shall we also reduce the purgeInterval for the DelayedRemoteFetch purgatory from 1000 to 100? This is to clean up any left-over completed delayed operations from the watchers.

@kamalcph kamalcph requested review from chia7712 and satishd October 7, 2025 15:21
@github-actions github-actions bot removed the small (Small PRs) label Oct 8, 2025

  // create a list of (topic, partition) pairs to use as keys for this delayed fetch operation
- val delayedFetchKeys = remoteFetchPartitionStatus.map { case (tp, _) => new TopicPartitionOperationKey(tp) }.toList
+ val delayedFetchKeys = remoteFetchTasks.asScala.map { case (tp, _) => new TopicPartitionOperationKey(tp) }.toList

Member

Sorry, I don't get the fix. I'm really surprised this fixes the memory leak. Could you explain why it leaks memory here? I thought we only use the tp in remoteFetchPartitionStatus to create the delayedFetchKeys list, where is the leak? We don't hold the FetchPartitionStatus at all, right?

Contributor Author

Thanks for the review!

  1. While writing the unit test, I realized that this fix does not solve the problem fully.
  2. The remoteFetchPartitionStatus might contain partitions that are local reads, so those keys never complete at all.
  3. The fix reduces the severity of the issue by maintaining keys only for the remote requests actually issued.
  4. The DelayedRemoteFetch operation completes only when all of its keys complete. So, if we are watching 4 keys
     (tp0, tp1, tp2, and tp3), the reference is removed only from the watcher of the key whose completion triggers
     the operation to complete; the watchers for the other keys retain it.
  5. The next FETCH request for the same set of partitions clears the DelayedRemoteFetch references from the
     previous FETCH request.
  6. There would be a leak of one DelayedRemoteFetch object per partition when it transitions from the remote log
     to the local log. We can reduce the purgeInterval (entries) from 1000 to 10 so that the reaper thread can clear
     those completed delayed-operation references (see the simplified sketch after this list).
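
To make points 4–6 concrete, the following is a deliberately simplified toy model of the retention behavior, not the actual Kafka DelayedOperationPurgatory code; the class names and the purge rule are illustrative only:

    // Toy model of the watcher retention described above; NOT the actual Kafka purgatory code.
    import scala.collection.mutable

    final class ToyDelayedOp(val payload: Array[Byte]) {
      @volatile var completed = false
    }

    final class ToyPurgatory(purgeIntervalEntries: Int) {
      // One watcher list per key; a completed operation is dropped only from the
      // watcher list of the key that observed its completion.
      private val watchers = mutable.Map.empty[String, mutable.ListBuffer[ToyDelayedOp]]

      def watch(op: ToyDelayedOp, keys: Seq[String]): Unit =
        keys.foreach(k => watchers.getOrElseUpdate(k, mutable.ListBuffer.empty[ToyDelayedOp]) += op)

      def checkAndComplete(key: String): Unit = {
        watchers.get(key).foreach { ops =>
          ops.foreach(_.completed = true)
          ops.clear() // only this key's watcher list lets go of the operation
        }
        maybePurge()
      }

      private def totalWatched: Int = watchers.values.map(_.size).sum

      // Completed operations lingering in the other watcher lists are dropped only
      // once the number of watched entries exceeds the purge interval.
      private def maybePurge(): Unit =
        if (totalWatched > purgeIntervalEntries)
          watchers.values.foreach(_.filterInPlace(op => !op.completed))
    }

    object LeakIllustration {
      def main(args: Array[String]): Unit = {
        val purgatory = new ToyPurgatory(purgeIntervalEntries = 1000)
        val op = new ToyDelayedOp(new Array[Byte](1024 * 1024)) // ~1 MiB of fetched data
        purgatory.watch(op, Seq("tp0", "tp1", "tp2", "tp3"))
        purgatory.checkAndComplete("tp3") // op is done, but tp0..tp2 still reference it
      }
    }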

Member

Thanks for the explanation! Yes, you're right. If we have purgeInterval = 1000, and each operation has one partition that transitions from remote to local log, each holding a ~1 MiB buffer reference, that means we could leak ~1 GB of memory. I agree that reducing the purgeInterval is a way to resolve the issue. But since each operation could contain more than one key (partition), I want to see if we can have a better solution for that. Let me think about it.

@showuon showuon (Member) Oct 9, 2025

If we go with the solution of reducing the purgeInterval from 1000 to 10, then in the worst case we will keep up to 10 remote fetch results in memory, and each fetch's total record size is bounded by fetch.max.bytes (default 50 MiB), so 500 MiB in total. If we want to reduce the purgeInterval, I'd suggest we set it to 0, so that we make sure all completed operations are removed. The trade-off is that if the same key is added again soon (e.g. the same remote fetch partition comes in), an entry needs to be re-created in the concurrent map under a lock.

Another solution I came up with is to add a lastModifiedMs to the WatcherList (we currently have 512 WatcherLists). When we advanceClock, we would not only check the purgeInterval but also purge when a WatcherList has had no update after a timeout (e.g. 1 second?). This keeps the active watchers in the cache and avoids the need to recreate an entry in the concurrent map.

WDYT?
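
For illustration only, a hypothetical sketch of the lastModifiedMs idea; this is not the actual DelayedOperationPurgatory code, and the names, signatures, and thresholds are assumptions:

    // Hypothetical sketch only; NOT the actual DelayedOperationPurgatory code.
    object LastModifiedPurgeSketch {
      final class WatcherListSketch {
        @volatile var lastModifiedMs: Long = System.currentTimeMillis()
        def touch(now: Long): Unit = lastModifiedMs = now
      }

      // An advanceClock-style check would purge either when enough completed operations
      // have piled up (the existing purgeInterval check) or when a watcher list has been
      // idle longer than some timeout.
      def shouldPurge(list: WatcherListSketch,
                      estimatedPurgeableOps: Int,
                      purgeInterval: Int,
                      idleTimeoutMs: Long,
                      now: Long): Boolean =
        estimatedPurgeableOps > purgeInterval || (now - list.lastModifiedMs) > idleTimeoutMs
    }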

@kamalcph kamalcph (Contributor, Author) Oct 9, 2025

> is that we added a lastModifiedMs in WatcherList

We can add lastModifiedMs to the WatcherList. But multiple keys can map to the same WatcherList, and the lastModifiedMs gets updated on each invocation, so it may not fully address the problem. If the timeout is kept low, like 1 second, then the chance of a leak goes down drastically.

The new lastModifiedMs attribute can be introduced in a separate PR if required, since DelayedOperationPurgatory is used by multiple components (to make a revert easier if something goes wrong).

> it needs to re-create an entry in a concurrentMap lock.

The purgeInterval can be reduced to 0. watchForOperation already takes a lock, and the trade-off of re-creating an entry looks fine to me since remote operations are not expected to be too frequent. If this is fine, then I'll reduce the purgeInterval to 0.

Member

If we agree to reduce the purgeInterval to 0, then the lastModifiedMs is not needed anymore. @satishd @chia7712, do you think reducing the purgeInterval to 0 will cause any unexpected results?

Member

The reaper thread is invoked every 200 ms, so with a purge interval of 0 the check for purgeable entries runs every 200 ms: it goes through all the registered entries and removes the completed operations. We can go ahead with this change for now and handle any required follow-ups in a follow-up PR if needed.

@github-actions github-actions bot removed the triage (PRs from the community) label Oct 9, 2025

@danish-ali danish-ali left a comment

The switch from remoteFetchPartitionStatus → remoteFetchTasks.asScala.map { tp … } makes sense since it limits keys to actual remote requests.
Could we (1) add a targeted test that forces a partition to transition remote → local mid-fetch and asserts that the purgatory entry is removed (or reaped) after completion, and (2) document in code that we only guarantee eventual cleanup (via next FETCH or reaper)?
A short comment where delayedFetchKeys is built would help future readers.

@kamalcph

Addressed the review comments. PTAL.

@showuon Tried setting remoteFetchReaperEnabled to true in testRemoteLogReaderMetrics; the test fails to assert that RemoteLogReaderFetchRateAndTimeMs is 5. Seems to be a metrics issue. Could you take a look? Thanks!

Tried changing remote.fetch.max.wait.ms to 5s in testRemoteLogReaderMetrics; it didn't help:

  props.put(RemoteLogManagerConfig.REMOTE_FETCH_MAX_WAIT_MS_PROP, 5000.toString)
  ...
  val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), aliveBrokerIds = Seq(0, 1, 2),
    propsModifier = props => props.put(RemoteLogManagerConfig.REMOTE_FETCH_MAX_WAIT_MS_PROP, 5000.toString),
    enableRemoteStorage = true, shouldMockLog = true, remoteLogManager = Some(spyRLM), remoteFetchReaperEnabled = true)

@kamalcph

Fixed the testRemoteLogReaderMetrics. PTAL. Thanks!

@showuon showuon (Member) left a comment

Thanks for adding tests.

@kamalcph

@satishd @chia7712

Call for review. PTAL. Thanks!

@showuon showuon (Member) left a comment

LGTM! Thanks for finding and fixing the bug!

@satishd satishd (Member) left a comment

Thanks @kamalcph for identifying the issue, and for our offline discussion on fixing it. Overall LGTM.

      readResult.info.delayedRemoteStorageFetch.get.fetchMaxBytes else recordBatchSize
    // Once we read from a non-empty partition, we stop ignoring request and partition level size limits
    if (estimatedRecordBatchSize > 0)
      minOneMessage = false

Member

Let us keep this PR only for the memory leak issue in remote reads. Please open another PR for the minOneMessage scenario.

Contributor Author

Opened #20706 to fix the minOneMessage case.

@showuon
One additional change in #20706 is that remoteFetchInfos is changed from a HashMap to a LinkedHashMap to preserve the order of partitions when sending the remote fetches. PTAL.

Thanks for the review!
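
As a side note on the HashMap → LinkedHashMap change mentioned above, here is a small standalone illustration (not the PR code) of the ordering difference:

    // Illustrative only; not the PR code. HashMap iteration order is unspecified,
    // while LinkedHashMap preserves insertion order, so remote fetches can be issued
    // in the order the partitions were added to remoteFetchInfos.
    import java.util.{HashMap => JHashMap, LinkedHashMap => JLinkedHashMap}

    object FetchOrderIllustration {
      def main(args: Array[String]): Unit = {
        val unordered = new JHashMap[String, Int]()
        val ordered = new JLinkedHashMap[String, Int]()
        for ((tp, i) <- Seq("topic-3", "topic-0", "topic-2", "topic-1").zipWithIndex) {
          unordered.put(tp, i)
          ordered.put(tp, i)
        }
        println(s"HashMap order:       ${unordered.keySet()}")  // unspecified order
        println(s"LinkedHashMap order: ${ordered.keySet()}")    // insertion order preserved
      }
    }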

@satishd satishd (Member) left a comment

Thanks @kamalcph for addressing the review comments. LGTM.

@kamalcph kamalcph merged commit c58cf1d into apache:trunk Oct 15, 2025
25 checks passed
@kamalcph kamalcph deleted the KAFKA-19763 branch October 15, 2025 16:52