diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index e01d264f53962..38db3275c8d67 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -251,9 +251,12 @@ class ReplicaManager(val config: KafkaConfig,
     new DelayedOperationPurgatory[DelayedDeleteRecords](
       "DeleteRecords", config.brokerId,
       config.deleteRecordsPurgatoryPurgeIntervalRequests))
+  // delayedRemoteFetchPurgatory purgeInterval is set to 0 to release the references of completed DelayedRemoteFetch
+  // instances immediately for GC. The DelayedRemoteFetch instance internally holds a RemoteLogReadResult that can be
+  // up to the size of `fetch.max.bytes`, which defaults to 50 MB.
   val delayedRemoteFetchPurgatory = delayedRemoteFetchPurgatoryParam.getOrElse(
     new DelayedOperationPurgatory[DelayedRemoteFetch](
-      "RemoteFetch", config.brokerId))
+      "RemoteFetch", config.brokerId, 0))
   val delayedRemoteListOffsetsPurgatory = delayedRemoteListOffsetsPurgatoryParam.getOrElse(
     new DelayedOperationPurgatory[DelayedRemoteListOffsets](
       "RemoteListOffsets", config.brokerId))
@@ -1637,7 +1640,7 @@ class ReplicaManager(val config: KafkaConfig,
                                  params: FetchParams,
                                  responseCallback: Seq[(TopicIdPartition, FetchPartitionData)] => Unit,
                                  logReadResults: Seq[(TopicIdPartition, LogReadResult)],
-                                 remoteFetchPartitionStatus: Seq[(TopicIdPartition, FetchPartitionStatus)]): Unit = {
+                                 fetchPartitionStatus: Seq[(TopicIdPartition, FetchPartitionStatus)]): Unit = {
     val remoteFetchTasks = new util.HashMap[TopicIdPartition, Future[Void]]
     val remoteFetchResults = new util.HashMap[TopicIdPartition, CompletableFuture[RemoteLogReadResult]]
@@ -1649,10 +1652,10 @@ class ReplicaManager(val config: KafkaConfig,
     val remoteFetchMaxWaitMs = config.remoteLogManagerConfig.remoteFetchMaxWaitMs().toLong
     val remoteFetch = new DelayedRemoteFetch(remoteFetchTasks, remoteFetchResults, remoteFetchInfos, remoteFetchMaxWaitMs,
-      remoteFetchPartitionStatus, params, logReadResults, this, responseCallback)
+      fetchPartitionStatus, params, logReadResults, this, responseCallback)
 
     // create a list of (topic, partition) pairs to use as keys for this delayed fetch operation
-    val delayedFetchKeys = remoteFetchPartitionStatus.map { case (tp, _) => new TopicPartitionOperationKey(tp) }.toList
+    val delayedFetchKeys = remoteFetchTasks.asScala.map { case (tp, _) => new TopicPartitionOperationKey(tp) }.toList
     delayedRemoteFetchPurgatory.tryCompleteElseWatch(remoteFetch, delayedFetchKeys.asJava)
   }
@@ -1737,6 +1740,8 @@ class ReplicaManager(val config: KafkaConfig,
     // try to complete the request immediately, otherwise put it into the purgatory;
     // this is because while the delayed fetch operation is being created, new requests
     // may arrive and hence make this operation completable.
+    // We only guarantee eventual cleanup via the next FETCH request for the same set of partitions or
+    // via the reaper thread.
     delayedFetchPurgatory.tryCompleteElseWatch(delayedFetch, delayedFetchKeys.asJava)
   }
 }
@@ -1926,11 +1931,14 @@ class ReplicaManager(val config: KafkaConfig,
         Optional.empty()
       )
     } else {
-      // For consume fetch requests, create a dummy FetchDataInfo with the remote storage fetch information.
-      // For the topic-partitions that need remote data, we will use this information to read the data in another thread.
-      new FetchDataInfo(new LogOffsetMetadata(offset), MemoryRecords.EMPTY, false, Optional.empty(),
-        Optional.of(new RemoteStorageFetchInfo(adjustedMaxBytes, minOneMessage, tp,
-          fetchInfo, params.isolation)))
+      val remoteStorageFetchInfoOpt = if (adjustedMaxBytes > 0) {
+        // For consume fetch requests, create a dummy FetchDataInfo with the remote storage fetch information.
+        // For the topic-partitions that need remote data, we will use this information to read the data in another thread.
+        Optional.of(new RemoteStorageFetchInfo(adjustedMaxBytes, minOneMessage, tp, fetchInfo, params.isolation))
+      } else {
+        Optional.empty[RemoteStorageFetchInfo]()
+      }
+      new FetchDataInfo(new LogOffsetMetadata(offset), MemoryRecords.EMPTY, false, Optional.empty(), remoteStorageFetchInfoOpt)
     }
 
     new LogReadResult(fetchDataInfo,
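Note: the last hunk above guards the remote-storage fetch info behind `adjustedMaxBytes > 0`. The per-request byte budget (`fetch.max.bytes`) is spent partition by partition, so once it is exhausted, scheduling a remote read would be wasted work. Below is a minimal runnable sketch of that budgeting idea; `PartitionFetch` and `planRemoteReads` are illustrative names, not part of ReplicaManager.

```scala
// Illustrative sketch only -- not ReplicaManager code. Shows how a shared byte budget
// can leave a later partition with adjustedMaxBytes == 0, in which case no remote
// read should be scheduled for it.
object FetchBudgetSketch {
  final case class PartitionFetch(name: String, requestedBytes: Int)

  def planRemoteReads(fetchMaxBytes: Int, partitions: Seq[PartitionFetch]): Seq[String] = {
    var remaining = fetchMaxBytes
    partitions.flatMap { p =>
      val adjustedMaxBytes = math.min(p.requestedBytes, remaining)
      remaining -= adjustedMaxBytes
      // Mirrors the patch: only schedule a remote read when budget remains.
      if (adjustedMaxBytes > 0) Some(p.name) else None
    }
  }

  def main(args: Array[String]): Unit = {
    val plan = planRemoteReads(200000, Seq(
      PartitionFetch("tp0", 100000),
      PartitionFetch("tp1", 100000),
      PartitionFetch("tp2", 100000)))
    println(plan) // List(tp0, tp1) -- tp2 gets no remote read, as in the new test below
  }
}
```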
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index eb5491812c109..a29c8468d8b51 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -72,11 +72,11 @@ import org.apache.kafka.server.storage.log.{FetchIsolation, FetchParams, FetchPa
 import org.apache.kafka.server.transaction.AddPartitionsToTxnManager
 import org.apache.kafka.server.transaction.AddPartitionsToTxnManager.TransactionSupportedOperation
 import org.apache.kafka.server.transaction.AddPartitionsToTxnManager.TransactionSupportedOperation.{ADD_PARTITION, GENERIC_ERROR_SUPPORTED}
-import org.apache.kafka.server.util.timer.MockTimer
+import org.apache.kafka.server.util.timer.{MockTimer, SystemTimer}
 import org.apache.kafka.server.util.{MockScheduler, MockTime, Scheduler}
 import org.apache.kafka.storage.internals.checkpoint.LazyOffsetCheckpoints
 import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache
-import org.apache.kafka.storage.internals.log.{AppendOrigin, CleanerConfig, FetchDataInfo, LocalLog, LogAppendInfo, LogConfig, LogDirFailureChannel, LogLoader, LogOffsetMetadata, LogOffsetsListener, LogOffsetSnapshot, LogSegments, ProducerStateManager, ProducerStateManagerConfig, RemoteLogReadResult, RemoteStorageFetchInfo, UnifiedLog, VerificationGuard}
+import org.apache.kafka.storage.internals.log.{AppendOrigin, CleanerConfig, FetchDataInfo, LocalLog, LogAppendInfo, LogConfig, LogDirFailureChannel, LogLoader, LogOffsetMetadata, LogOffsetSnapshot, LogOffsetsListener, LogSegments, ProducerStateManager, ProducerStateManagerConfig, RemoteLogReadResult, RemoteStorageFetchInfo, UnifiedLog, VerificationGuard}
 import org.apache.kafka.storage.log.metrics.BrokerTopicStats
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.{AfterAll, AfterEach, BeforeEach, Test}
@@ -97,7 +97,7 @@ import java.util.concurrent.{Callable, CompletableFuture, ConcurrentHashMap, Cou
 import java.util.function.{BiConsumer, Consumer}
 import java.util.stream.IntStream
 import java.util.{Collections, Optional, OptionalLong, Properties}
-import scala.collection.{mutable, Map, Seq}
+import scala.collection.{Map, Seq, mutable}
 import scala.jdk.CollectionConverters._
 import scala.jdk.OptionConverters.{RichOption, RichOptional}
@@ -2964,7 +2964,8 @@
     setupLogDirMetaProperties: Boolean = false,
     directoryEventHandler: DirectoryEventHandler = DirectoryEventHandler.NOOP,
     buildRemoteLogAuxState: Boolean = false,
-    remoteFetchQuotaExceeded: Option[Boolean] = None
+    remoteFetchQuotaExceeded: Option[Boolean] = None,
+    remoteFetchReaperEnabled: Boolean = false,
   ): ReplicaManager = {
     val props = TestUtils.createBrokerConfig(brokerId)
     val path1 = TestUtils.tempRelativeDir("data").getAbsolutePath
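Note: the first hunk in ReplicaManager.scala sets the RemoteFetch purgatory's purgeInterval to 0. The sketch below is a toy model (not Kafka's actual DelayedOperationPurgatory) of why that matters for GC: a completed operation stays referenced by watcher lists until a purge pass runs, and a purge interval of 0 makes that pass run after every completion.

```scala
// Toy model of purge-interval semantics -- illustrative only, not Kafka internals.
import scala.collection.mutable

trait Completable { def isCompleted: Boolean }

final class TinyPurgatory[T <: Completable](purgeInterval: Int) {
  private val watchers = mutable.Map.empty[String, mutable.Buffer[T]]
  private var completedEstimate = 0

  def watch(key: String, op: T): Unit =
    watchers.getOrElseUpdate(key, mutable.Buffer.empty) += op

  def onOperationCompleted(): Unit = {
    completedEstimate += 1
    // With purgeInterval = 0 this threshold is crossed immediately, so references to
    // completed operations (and, in the real DelayedRemoteFetch case, their large
    // RemoteLogReadResult payloads) become unreachable right away.
    if (completedEstimate > purgeInterval) purge()
  }

  private def purge(): Unit = {
    watchers.values.foreach(_.filterInPlace(op => !op.isCompleted))
    completedEstimate = 0
  }
}
```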
@@ -3007,8 +3008,9 @@
       "Fetch", timer, 0, false)
     val mockDeleteRecordsPurgatory = new DelayedOperationPurgatory[DelayedDeleteRecords](
       "DeleteRecords", timer, 0, false)
+    // When enabling the reaper, use a new timer instance so that other tests won't be affected.
     val mockDelayedRemoteFetchPurgatory = new DelayedOperationPurgatory[DelayedRemoteFetch](
-      "DelayedRemoteFetch", timer, 0, false)
+      "DelayedRemoteFetch", if (remoteFetchReaperEnabled) new SystemTimer("DelayedRemoteFetch") else timer, 0, 0, remoteFetchReaperEnabled, true)
     val mockDelayedRemoteListOffsetsPurgatory = new DelayedOperationPurgatory[DelayedRemoteListOffsets](
       "RemoteListOffsets", timer, 0, false)
     val mockDelayedShareFetchPurgatory = new DelayedOperationPurgatory[DelayedShareFetch](
@@ -3433,7 +3435,10 @@
       Optional.empty)
     val spyRLM = spy(remoteLogManager)
 
-    val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), aliveBrokerIds = Seq(0, 1, 2), enableRemoteStorage = true, shouldMockLog = true, remoteLogManager = Some(spyRLM))
+    val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), aliveBrokerIds = Seq(0, 1, 2),
+      // Increase `remote.fetch.max.wait.ms` to avoid flaky tests caused by the use of SystemTimer
+      propsModifier = props => props.put(RemoteLogManagerConfig.REMOTE_FETCH_MAX_WAIT_MS_PROP, 120000.toString),
+      enableRemoteStorage = true, shouldMockLog = true, remoteLogManager = Some(spyRLM), remoteFetchReaperEnabled = true)
     try {
       val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints.asJava)
       replicaManager.createPartition(tp0).createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, Some(topicId))
@@ -3599,9 +3604,10 @@
     val tp1 = new TopicPartition(topic, 1)
     val tidp0 = new TopicIdPartition(topicId, tp0)
     val tidp1 = new TopicIdPartition(topicId, tp1)
-
-    val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), aliveBrokerIds = Seq(0, 1, 2), enableRemoteStorage = true, shouldMockLog = true, remoteFetchQuotaExceeded = Some(false))
-
+    val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), aliveBrokerIds = Seq(0, 1, 2),
+      // Increase `remote.fetch.max.wait.ms` to avoid flaky tests caused by the use of SystemTimer
+      propsModifier = props => props.put(RemoteLogManagerConfig.REMOTE_FETCH_MAX_WAIT_MS_PROP, 120000),
+      enableRemoteStorage = true, shouldMockLog = true, remoteFetchQuotaExceeded = Some(false), remoteFetchReaperEnabled = true)
     try {
       val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints.asJava)
       replicaManager.createPartition(tp0).createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, Some(topicId))
@@ -3615,7 +3621,7 @@
       replicaManager.applyDelta(leaderDelta0, leaderMetadataImage0)
       replicaManager.applyDelta(leaderDelta1, leaderMetadataImage1)
 
-      val params = new FetchParams(replicaId, 1, 1000, 10, 100, FetchIsolation.LOG_END, Optional.empty)
+      val params = new FetchParams(replicaId, 1, 1000, 10, 200000, FetchIsolation.LOG_END, Optional.empty)
 
       val fetchOffsetTp0 = 1
       val fetchOffsetTp1 = 2
@@ -3662,6 +3668,7 @@
         assertEquals(leaderEpoch, fetchInfo.fetchInfo.currentLeaderEpoch.get())
         if (fetchInfo.topicIdPartition.topicPartition == tp0) {
           assertEquals(fetchOffsetTp0, fetchInfo.fetchInfo.fetchOffset)
+          assertTrue(fetchInfo.minOneMessage())
         } else {
           assertEquals(fetchOffsetTp1, fetchInfo.fetchInfo.fetchOffset)
         }
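Note: with the delayedFetchKeys change in ReplicaManager.scala (keys built from `remoteFetchTasks` rather than from every requested partition), the purgatory only watches partitions that actually got a remote read scheduled. A small illustrative sketch of the difference, with plain strings standing in for TopicIdPartition / TopicPartitionOperationKey:

```scala
// Illustrative sketch of the watch-key change; not Kafka code.
object WatchKeySketch {
  def main(args: Array[String]): Unit = {
    val requestedPartitions = Seq("tp0", "tp1", "tp2")
    // Only tp0 and tp1 had a remote fetch task created (tp2's byte budget was 0).
    val remoteFetchTasks = Map("tp0" -> (), "tp1" -> ())

    val oldKeys = requestedPartitions                 // before: one key per requested partition
    val newKeys = remoteFetchTasks.keys.toList.sorted // after: one key per scheduled remote fetch

    println(s"old=$oldKeys new=$newKeys") // old has 3 keys, new has 2
  }
}
```

This is what the new test further below asserts via `delayedRemoteFetchPurgatory.watched == 2`.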
@@ -3684,6 +3691,116 @@
       responseData.foreach { case (_, fetchPartitionData) =>
         assertEquals(Errors.NONE, fetchPartitionData.error)
       }
+      TestUtils.waitUntilTrue(() => replicaManager.delayedRemoteFetchPurgatory.watched == 0,
+        "DelayedRemoteFetch purgatory should not have any pending / completed operation")
     } finally {
       replicaManager.shutdown(checkpointHW = false)
     }
   }
 
+  @Test
+  def testMultipleRemoteFetchCountInOneFetchRequestWhenBreachingFetchMaxBytes(): Unit = {
+    val replicaId = -1
+    val tp0 = new TopicPartition(topic, 0)
+    val tp1 = new TopicPartition(topic, 1)
+    val tp2 = new TopicPartition(topic, 2)
+    val tidp0 = new TopicIdPartition(topicId, tp0)
+    val tidp1 = new TopicIdPartition(topicId, tp1)
+    val tidp2 = new TopicIdPartition(topicId, tp2)
+    val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), aliveBrokerIds = Seq(0, 1, 2),
+      // Increase `remote.fetch.max.wait.ms` to avoid flaky tests caused by the use of SystemTimer
+      propsModifier = props => props.put(RemoteLogManagerConfig.REMOTE_FETCH_MAX_WAIT_MS_PROP, 120000),
+      enableRemoteStorage = true, shouldMockLog = true, remoteFetchQuotaExceeded = Some(false), remoteFetchReaperEnabled = true)
+    try {
+      val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints.asJava)
+      val leaderEpoch = 0
+
+      def createPartition(tp: TopicPartition): Unit = {
+        replicaManager.createPartition(tp).createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, Some(topicId))
+        val leaderDelta = createLeaderDelta(topicId, tp, leaderId = 0, leaderEpoch = leaderEpoch)
+        val leaderMetadataImage = imageFromTopics(leaderDelta.apply())
+        replicaManager.applyDelta(leaderDelta, leaderMetadataImage)
+      }
+      createPartition(tp0)
+      createPartition(tp1)
+      createPartition(tp2)
+
+      val params = new FetchParams(replicaId, 1, 1000, 10, 200000, FetchIsolation.LOG_END, Optional.empty)
+      val fetchOffsetTp0 = 1
+      val fetchOffsetTp1 = 2
+      val fetchOffsetTp2 = 3
+
+      val responseSeq = new AtomicReference[Seq[(TopicIdPartition, FetchPartitionData)]]()
+      val responseLatch = new CountDownLatch(1)
+
+      def fetchCallback(responseStatus: Seq[(TopicIdPartition, FetchPartitionData)]): Unit = {
+        responseSeq.set(responseStatus)
+        responseLatch.countDown()
+      }
+
+      val callbacks: util.Set[Consumer[RemoteLogReadResult]] = new util.HashSet[Consumer[RemoteLogReadResult]]()
+      when(mockRemoteLogManager.asyncRead(any(), any())).thenAnswer(ans => {
+        callbacks.add(ans.getArgument(1, classOf[Consumer[RemoteLogReadResult]]))
+        mock(classOf[Future[Void]])
+      })
+
+      // Start the fetch request for 3 partitions - this should trigger remote fetches since the default mocked log
+      // behavior throws OffsetOutOfRangeException
+      replicaManager.fetchMessages(params, Seq(
+        tidp0 -> new PartitionData(topicId, fetchOffsetTp0, startOffset, 100000, Optional.of[Integer](leaderEpoch), Optional.of[Integer](leaderEpoch)),
+        tidp1 -> new PartitionData(topicId, fetchOffsetTp1, startOffset, 100000, Optional.of[Integer](leaderEpoch), Optional.of[Integer](leaderEpoch)),
+        tidp2 -> new PartitionData(topicId, fetchOffsetTp2, startOffset, 100000, Optional.of[Integer](leaderEpoch), Optional.of[Integer](leaderEpoch))
+      ), UNBOUNDED_QUOTA, fetchCallback)
+
+      // Verify that exactly two asyncRead calls were made.
+      // tidp2 should not trigger a remote request since the FETCH max bytes limit is already exceeded.
+      val remoteStorageFetchInfoArg: ArgumentCaptor[RemoteStorageFetchInfo] = ArgumentCaptor.forClass(classOf[RemoteStorageFetchInfo])
+      verify(mockRemoteLogManager, times(2)).asyncRead(remoteStorageFetchInfoArg.capture(), any())
+
+      // Verify that remote fetch operations were properly set up for both remote-fetching partitions
+      assertTrue(replicaManager.delayedRemoteFetchPurgatory.watched == 2, "DelayedRemoteFetch purgatory should have operations")
+
+      // Verify both partitions were captured in the remote fetch requests
+      val capturedFetchInfos = remoteStorageFetchInfoArg.getAllValues.asScala
+      assertEquals(2, capturedFetchInfos.size, "Should have 2 remote storage fetch info calls")
+
+      val capturedTopicPartitions = capturedFetchInfos.map(_.topicIdPartition.topicPartition).toSet
+      assertTrue(capturedTopicPartitions.contains(tp0), "Should contain " + tp0)
+      assertTrue(capturedTopicPartitions.contains(tp1), "Should contain " + tp1)
+
+      // Verify the fetch info details are correct for both partitions
+      capturedFetchInfos.foreach { fetchInfo =>
+        assertEquals(topicId, fetchInfo.fetchInfo.topicId)
+        assertEquals(startOffset, fetchInfo.fetchInfo.logStartOffset)
+        assertEquals(leaderEpoch, fetchInfo.fetchInfo.currentLeaderEpoch.get())
+        if (fetchInfo.topicIdPartition.topicPartition == tp0) {
+          assertEquals(fetchOffsetTp0, fetchInfo.fetchInfo.fetchOffset)
+          assertTrue(fetchInfo.minOneMessage())
+        } else {
+          assertEquals(fetchOffsetTp1, fetchInfo.fetchInfo.fetchOffset)
+        }
+      }
+
+      // Complete the 2 asyncRead tasks
+      callbacks.forEach(callback => callback.accept(buildRemoteReadResult(Errors.NONE)))
+
+      // Wait for the fetch callback to complete and verify responseSeq content
+      assertTrue(responseLatch.await(5, TimeUnit.SECONDS), "Fetch callback should complete")
+
+      val responseData = responseSeq.get()
+      assertNotNull(responseData, "Response sequence should not be null")
+      assertEquals(3, responseData.size, "Response should contain data for all three partitions")
+
+      // Verify that the response contains all partitions (tidp0, tidp1, and tidp2) with no errors
+      val responseTopicIdPartitions = responseData.map(_._1).toSet
+      assertTrue(responseTopicIdPartitions.contains(tidp0), "Response should contain " + tidp0)
+      assertTrue(responseTopicIdPartitions.contains(tidp1), "Response should contain " + tidp1)
+      assertTrue(responseTopicIdPartitions.contains(tidp2), "Response should contain " + tidp2)
+      responseData.foreach { case (_, fetchPartitionData) =>
+        assertEquals(Errors.NONE, fetchPartitionData.error)
+      }
+      TestUtils.waitUntilTrue(() => replicaManager.delayedRemoteFetchPurgatory.watched == 0,
+        "DelayedRemoteFetch purgatory should not have any pending / completed operation")
+    } finally {
+      replicaManager.shutdown(checkpointHW = false)
+    }
+  }
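Note: both remote-fetch tests now finish by waiting for `watched == 0`, which relies on the enabled reaper: with a real `SystemTimer`, a background thread keeps advancing the clock and removing completed or expired operations without another FETCH arriving. A toy sketch of that mechanism follows; Kafka's actual ExpiredOperationReaper differs in detail.

```scala
// Toy reaper sketch -- illustrative only, not Kafka's reaper implementation.
import java.util.concurrent.{ConcurrentLinkedQueue, TimeUnit}

object ReaperSketch {
  final class Op { @volatile var completed = false }

  def main(args: Array[String]): Unit = {
    val watched = new ConcurrentLinkedQueue[Op]()
    val reaper = new Thread(() => {
      while (true) {
        watched.removeIf(_.completed)   // drop completed ops so they become GC-able
        TimeUnit.MILLISECONDS.sleep(10) // stands in for timer.advanceClock(...)
      }
    }, "sketch-reaper")
    reaper.setDaemon(true)
    reaper.start()

    val op = new Op()
    watched.add(op)
    op.completed = true
    // Rough equivalent of TestUtils.waitUntilTrue(() => watched == 0, ...)
    while (!watched.isEmpty) TimeUnit.MILLISECONDS.sleep(5)
    println("purgatory drained")
  }
}
```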
@@ -5721,25 +5838,29 @@
     val aliveBrokerIds = Array(1, 2)
     val rm = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), localId, setupLogDirMetaProperties = true, directoryEventHandler = directoryEventHandler)
-    val directoryIds = rm.logManager.directoryIdsSet.toList
-    assertEquals(directoryIds.size, 2)
-    val leaderTopicsDelta: TopicsDelta = topicsCreateDelta(localId, isStartIdLeader = true, directoryIds = directoryIds)
-    val (partition: Partition, _) = rm.getOrCreatePartition(topicPartition0.topicPartition(), leaderTopicsDelta, FOO_UUID).get
-    partition.makeLeader(partitionRegistration(localId, 1, aliveBrokerIds, partitionEpoch, aliveBrokerIds),
-      isNew = false,
-      new LazyOffsetCheckpoints(rm.highWatermarkCheckpoints.asJava),
-      None)
-
-    def callback(responseStatus: Map[TopicPartition, DeleteRecordsResponseData.DeleteRecordsPartitionResult]): Unit = {
-      assert(responseStatus.values.head.errorCode == Errors.INVALID_TOPIC_EXCEPTION.code)
-    }
+    try {
+      val directoryIds = rm.logManager.directoryIdsSet.toList
+      assertEquals(directoryIds.size, 2)
+      val leaderTopicsDelta: TopicsDelta = topicsCreateDelta(localId, isStartIdLeader = true, directoryIds = directoryIds)
+      val (partition: Partition, _) = rm.getOrCreatePartition(topicPartition0.topicPartition(), leaderTopicsDelta, FOO_UUID).get
+      partition.makeLeader(partitionRegistration(localId, 1, aliveBrokerIds, partitionEpoch, aliveBrokerIds),
+        isNew = false,
+        new LazyOffsetCheckpoints(rm.highWatermarkCheckpoints.asJava),
+        None)
 
-    // default internal topics delete disabled
-    rm.deleteRecords(
-      timeout = 0L,
-      Map[TopicPartition, Long](topicPartition0.topicPartition() -> 10L),
-      responseCallback = callback
-    )
+      def callback(responseStatus: Map[TopicPartition, DeleteRecordsResponseData.DeleteRecordsPartitionResult]): Unit = {
+        assert(responseStatus.values.head.errorCode == Errors.INVALID_TOPIC_EXCEPTION.code)
+      }
+
+      // default internal topics delete disabled
+      rm.deleteRecords(
+        timeout = 0L,
+        Map[TopicPartition, Long](topicPartition0.topicPartition() -> 10L),
+        responseCallback = callback
+      )
+    } finally {
+      rm.shutdown(checkpointHW = false)
+    }
   }
 
   @Test
@@ -5750,26 +5871,30 @@
     val aliveBrokerIds = Array(1, 2)
     val rm = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), localId, setupLogDirMetaProperties = true, directoryEventHandler = directoryEventHandler)
+    try {
     val directoryIds = rm.logManager.directoryIdsSet.toList
     assertEquals(directoryIds.size, 2)
     val leaderTopicsDelta: TopicsDelta = topicsCreateDelta(localId, isStartIdLeader = true, directoryIds = directoryIds)
     val (partition: Partition, _) = rm.getOrCreatePartition(topicPartition0.topicPartition(), leaderTopicsDelta, FOO_UUID).get
-    partition.makeLeader(partitionRegistration(localId, 1, aliveBrokerIds, partitionEpoch, aliveBrokerIds),
-      isNew = false,
-      new LazyOffsetCheckpoints(rm.highWatermarkCheckpoints.asJava),
-      None)
+      partition.makeLeader(partitionRegistration(localId, 1, aliveBrokerIds, partitionEpoch, aliveBrokerIds),
+        isNew = false,
+        new LazyOffsetCheckpoints(rm.highWatermarkCheckpoints.asJava),
+        None)
 
-    def callback(responseStatus: Map[TopicPartition, DeleteRecordsResponseData.DeleteRecordsPartitionResult]): Unit = {
-      assert(responseStatus.values.head.errorCode == Errors.NONE.code)
-    }
+      def callback(responseStatus: Map[TopicPartition, DeleteRecordsResponseData.DeleteRecordsPartitionResult]): Unit = {
+        assert(responseStatus.values.head.errorCode == Errors.NONE.code)
+      }
 
-    // internal topics delete allowed
-    rm.deleteRecords(
-      timeout = 0L,
-      Map[TopicPartition, Long](topicPartition0.topicPartition() -> 0L),
-      responseCallback = callback,
-      allowInternalTopicDeletion = true
-    )
+      // internal topics delete allowed
+      rm.deleteRecords(
+        timeout = 0L,
+        Map[TopicPartition, Long](topicPartition0.topicPartition() -> 0L),
+        responseCallback = callback,
+        allowInternalTopicDeletion = true
+      )
+    } finally {
+      rm.shutdown(checkpointHW = false)
+    }
   }
 
   @Test
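Note: the last two hunks wrap each test body in try/finally so the ReplicaManager is always shut down, even when an assertion fails, preventing broker threads and purgatory entries from leaking into later tests. The same idea, generalized as a hypothetical loan helper (`Shutdownable` and `loan` are illustrative, not Kafka test utilities):

```scala
// Hypothetical loan-pattern helper -- not part of the Kafka test utilities.
trait Shutdownable { def shutdown(checkpointHW: Boolean): Unit }

object Loan {
  // Runs `body` with the resource and guarantees shutdown afterwards,
  // mirroring the try/finally blocks added in the hunks above.
  def loan[R <: Shutdownable, A](resource: R)(body: R => A): A =
    try body(resource)
    finally resource.shutdown(checkpointHW = false)
}
```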