26 changes: 17 additions & 9 deletions core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -251,9 +251,12 @@ class ReplicaManager(val config: KafkaConfig,
new DelayedOperationPurgatory[DelayedDeleteRecords](
"DeleteRecords", config.brokerId,
config.deleteRecordsPurgatoryPurgeIntervalRequests))
// delayedRemoteFetchPurgatory purgeInterval is set to 0 to release the references of completed DelayedRemoteFetch
// instances immediately for GC. The DelayedRemoteFetch instance internally holds the RemoteLogReadResult that can be
// up to the size of `fetch.max.bytes` which defaults to 50 MB.
val delayedRemoteFetchPurgatory = delayedRemoteFetchPurgatoryParam.getOrElse(
new DelayedOperationPurgatory[DelayedRemoteFetch](
"RemoteFetch", config.brokerId))
"RemoteFetch", config.brokerId, 0))
val delayedRemoteListOffsetsPurgatory = delayedRemoteListOffsetsPurgatoryParam.getOrElse(
new DelayedOperationPurgatory[DelayedRemoteListOffsets](
"RemoteListOffsets", config.brokerId))
@@ -1637,7 +1640,7 @@ class ReplicaManager(val config: KafkaConfig,
params: FetchParams,
responseCallback: Seq[(TopicIdPartition, FetchPartitionData)] => Unit,
logReadResults: Seq[(TopicIdPartition, LogReadResult)],
remoteFetchPartitionStatus: Seq[(TopicIdPartition, FetchPartitionStatus)]): Unit = {
fetchPartitionStatus: Seq[(TopicIdPartition, FetchPartitionStatus)]): Unit = {
val remoteFetchTasks = new util.HashMap[TopicIdPartition, Future[Void]]
val remoteFetchResults = new util.HashMap[TopicIdPartition, CompletableFuture[RemoteLogReadResult]]

@@ -1649,10 +1652,10 @@ class ReplicaManager(val config: KafkaConfig,

val remoteFetchMaxWaitMs = config.remoteLogManagerConfig.remoteFetchMaxWaitMs().toLong
val remoteFetch = new DelayedRemoteFetch(remoteFetchTasks, remoteFetchResults, remoteFetchInfos, remoteFetchMaxWaitMs,
remoteFetchPartitionStatus, params, logReadResults, this, responseCallback)
fetchPartitionStatus, params, logReadResults, this, responseCallback)

// create a list of (topic, partition) pairs to use as keys for this delayed fetch operation
val delayedFetchKeys = remoteFetchPartitionStatus.map { case (tp, _) => new TopicPartitionOperationKey(tp) }.toList
val delayedFetchKeys = remoteFetchTasks.asScala.map { case (tp, _) => new TopicPartitionOperationKey(tp) }.toList
Member

Sorry, I don't get the fix. I'm really surprised this fixes the memory leak. Could you explain why it leaks memory here? I thought we only use the tp in remoteFetchPartitionStatus to create the delayedFetchKeys list, so where is the leak? We don't hold the FetchPartitionStatus at all, right?

Contributor Author

Thanks for the review!

  1. While writing the unit test, I realized that this fix does not fully solve the problem.
  2. The remoteFetchPartitionStatus might contain partitions that are served by local reads, so those keys will never be completed.
  3. The fix reduces the severity of the issue by maintaining keys only for the remote requests that were actually issued.
  4. The DelayedRemoteFetch operation completes only when all of its keys are complete. So, if we are watching 4 keys (tp0, tp1, tp2, and tp3), the reference is removed only for the last key; the other watcher lists retain it (see the sketch after this list).
  5. The next FETCH request for the same set of partitions clears the DelayedRemoteFetch references left over from the previous FETCH request.
  6. There would still be a leak of one DelayedRemoteFetch object per partition when it transitions from the remote to the local log. We can reduce the purgeInterval (entries) from 1000 to 10 so that the reaper thread can clear those completed delayed-operation references.
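
A simplified, standalone sketch of the retention behaviour described in points 4 and 5 (this is not the actual purgatory code; the class and variable names are illustrative):

```scala
import scala.collection.mutable

object WatcherRetentionSketch extends App {
  // One delayed operation watched under several partition keys.
  final class DelayedOp { var completed: Boolean = false }

  val watchersByKey = mutable.Map.empty[String, mutable.ListBuffer[DelayedOp]]
  val op = new DelayedOp

  // Watch the same operation under four keys (tp0..tp3), as tryCompleteElseWatch does.
  Seq("tp0", "tp1", "tp2", "tp3").foreach { key =>
    watchersByKey.getOrElseUpdate(key, mutable.ListBuffer.empty) += op
  }

  // The operation completes while being checked under key tp3:
  // only that watcher list drops the reference right away.
  op.completed = true
  watchersByKey("tp3") -= op

  // The other keys still reference the completed (potentially large) operation
  // until a purge pass runs.
  val retained = watchersByKey.count { case (_, ops) => ops.contains(op) }
  println(s"Watcher lists still holding the completed operation: $retained") // prints 3

  // What a purge pass does: drop every completed operation from every watcher list.
  watchersByKey.values.foreach(_.filterInPlace(!_.completed))
  println(s"References remaining after purge: ${watchersByKey.values.map(_.size).sum}") // prints 0
}
```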

Member

Thanks for the explanation! Yes, you're right. If we have purgeInterval = 1000, each operation has one remote -> local log transition, and each holds a 1 MiB buffer reference, that means we could leak ~1 GB of memory. I agree that reducing the purgeInterval is a way to resolve the issue. But since each operation could contain more than one key (partition), I want to see if we can find a better solution. Let me think about it.

@showuon (Member) Oct 9, 2025

If we go with the solution of reducing the purgeInterval from 1000 to 10, then in the worst case we will keep up to 10 remote fetch results in memory, and the total record size of each fetch is bounded by fetch.max.bytes (default 50 MiB), so 500 MiB in total. If we want to reduce the purgeInterval, I'd suggest we set it to 0, so that we make sure all completed operations are removed. The trade-off is that if the same key is added again soon (e.g. the same remote fetch partition comes in), we need to re-create an entry in the concurrent map under a lock.

Another solution I came up with is to add a lastModifiedMs to each WatcherList (we now have 512 WatcherLists). Then, when we advanceClock, we not only check the purgeInterval, we also purge when a WatcherList has had no update for some timeout (e.g. 1 second?). This makes sure the active watchers stay in the cache, avoiding the need to recreate an entry in the concurrent map.
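
Roughly, the purge decision in advanceClock would become something like this (a rough sketch only; the names are hypothetical, not actual Kafka code):

```scala
// Purge either when enough completed operations have accumulated,
// or when this watcher list has been idle for longer than idleTimeoutMs.
def shouldPurge(completedOperations: Int,
                purgeInterval: Int,
                lastModifiedMs: Long,
                nowMs: Long,
                idleTimeoutMs: Long): Boolean =
  completedOperations > purgeInterval || (nowMs - lastModifiedMs) > idleTimeoutMs
```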

WDYT?

@kamalcph (Contributor Author) Oct 9, 2025

> is that we added a lastModifiedMs in WatcherList

We can add lastModifiedMs to the WatcherList. But there can be multiple keys that map to the same WatcherList, and the lastModifiedTime gets updated on each invocation, so it may not fully address the problem. If the value is kept low, like 1 second, then the chance of a leak might go down drastically.

The new lastModifiedMs attribute can be introduced in a separate PR if required, since DelayedOperationPurgatory is used by multiple components (to make a revert easier if something goes wrong).

> it needs to re-create an entry in a concurrentMap lock.

The purgeInterval can be reduced to 0. The watchForOperation method already takes a lock, and the trade-off of re-creating an entry looks fine to me since remote operations are not expected to be too frequent. If this is fine, then I'll reduce the purgeInterval to 0.

Member

If we agree to reduce purgeInterval to 0, then lastModifiedMs is not needed anymore. @satishd @chia7712, do you think reducing purgeInterval to 0 will cause any unexpected results?

Member

The reaper thread is invoked every 200 ms, so with a purge interval of 0 the check for purgeable entries runs every 200 ms: it goes through all the registered entries and removes the completed operations. We can go ahead with this change for now and handle any follow-ups in a follow-up PR if needed.
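
For reference, the purge trigger described here is roughly of this shape (a sketch under assumed names, not verbatim Kafka code):

```scala
// The reaper advances the purgatory clock about every 200 ms; a purge of
// completed-but-still-watched operations runs whenever the estimated number of
// completed operations exceeds purgeInterval. With purgeInterval = 0, any
// completed operation triggers a purge on the next reaper pass.
def purgeNeeded(estimatedTotalOperations: Int, numDelayed: Int, purgeInterval: Int): Boolean =
  estimatedTotalOperations - numDelayed > purgeInterval
```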

delayedRemoteFetchPurgatory.tryCompleteElseWatch(remoteFetch, delayedFetchKeys.asJava)
}

@@ -1737,6 +1740,8 @@ class ReplicaManager(val config: KafkaConfig,
// try to complete the request immediately, otherwise put it into the purgatory;
// this is because while the delayed fetch operation is being created, new requests
// may arrive and hence make this operation completable.
// We only guarantee eventual cleanup via the next FETCH request for the same set of partitions or
// via the reaper thread.
delayedFetchPurgatory.tryCompleteElseWatch(delayedFetch, delayedFetchKeys.asJava)
}
}
@@ -1926,11 +1931,14 @@ class ReplicaManager(val config: KafkaConfig,
Optional.empty()
)
} else {
// For consume fetch requests, create a dummy FetchDataInfo with the remote storage fetch information.
// For the topic-partitions that need remote data, we will use this information to read the data in another thread.
new FetchDataInfo(new LogOffsetMetadata(offset), MemoryRecords.EMPTY, false, Optional.empty(),
Optional.of(new RemoteStorageFetchInfo(adjustedMaxBytes, minOneMessage, tp,
fetchInfo, params.isolation)))
val remoteStorageFetchInfoOpt = if (adjustedMaxBytes > 0) {
// For consume fetch requests, create a dummy FetchDataInfo with the remote storage fetch information.
// For the topic-partitions that need remote data, we will use this information to read the data in another thread.
Optional.of(new RemoteStorageFetchInfo(adjustedMaxBytes, minOneMessage, tp, fetchInfo, params.isolation))
} else {
Optional.empty[RemoteStorageFetchInfo]()
}
new FetchDataInfo(new LogOffsetMetadata(offset), MemoryRecords.EMPTY, false, Optional.empty(), remoteStorageFetchInfoOpt)
}

new LogReadResult(fetchDataInfo,