diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 38db3275c8d67..959ae2cde384b 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -1636,7 +1636,7 @@ class ReplicaManager(val config: KafkaConfig,
   /**
    * Process all remote fetches by creating async read tasks and handling them in DelayedRemoteFetch collectively.
    */
-  private def processRemoteFetches(remoteFetchInfos: util.HashMap[TopicIdPartition, RemoteStorageFetchInfo],
+  private def processRemoteFetches(remoteFetchInfos: util.LinkedHashMap[TopicIdPartition, RemoteStorageFetchInfo],
                                    params: FetchParams,
                                    responseCallback: Seq[(TopicIdPartition, FetchPartitionData)] => Unit,
                                    logReadResults: Seq[(TopicIdPartition, LogReadResult)],
@@ -1675,7 +1675,7 @@ class ReplicaManager(val config: KafkaConfig,
     var errorReadingData = false
 
     // topic-partitions that have to be read from remote storage
-    val remoteFetchInfos = new util.HashMap[TopicIdPartition, RemoteStorageFetchInfo]()
+    val remoteFetchInfos = new util.LinkedHashMap[TopicIdPartition, RemoteStorageFetchInfo]()
 
     var hasDivergingEpoch = false
     var hasPreferredReadReplica = false
@@ -1879,13 +1879,13 @@ class ReplicaManager(val config: KafkaConfig,
     readPartitionInfo.foreach { case (tp, fetchInfo) =>
       val readResult = read(tp, fetchInfo, limitBytes, minOneMessage)
       val recordBatchSize = readResult.info.records.sizeInBytes
-      // Once we read from a non-empty partition, we stop ignoring request and partition level size limits
-      if (recordBatchSize > 0)
-        minOneMessage = false
       // Because we don't know how much data will be retrieved in remote fetch yet, and we don't want to block the API call
       // to query remoteLogMetadata, assume it will fetch the max bytes size of data to avoid to exceed the "fetch.max.bytes" setting.
       val estimatedRecordBatchSize = if (recordBatchSize == 0 && readResult.info.delayedRemoteStorageFetch.isPresent)
         readResult.info.delayedRemoteStorageFetch.get.fetchMaxBytes else recordBatchSize
+      // Once we read from a non-empty partition, we stop ignoring request and partition level size limits
+      if (estimatedRecordBatchSize > 0)
+        minOneMessage = false
       limitBytes = math.max(0, limitBytes - estimatedRecordBatchSize)
       result += (tp -> readResult)
     }
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index a29c8468d8b51..577e4f6b1282b 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -3657,8 +3657,8 @@ class ReplicaManagerTest {
     val capturedFetchInfos = remoteStorageFetchInfoArg.getAllValues.asScala
     assertEquals(2, capturedFetchInfos.size, "Should have 2 remote storage fetch info calls")
 
-    val capturedTopicPartitions = capturedFetchInfos.map(_.topicIdPartition.topicPartition).toSet
-    assertTrue(capturedTopicPartitions.contains(tp0), "Should contain " + tp0)
+    val capturedTopicPartitions = capturedFetchInfos.map(_.topicIdPartition.topicPartition)
+    assertEquals(tp0, capturedTopicPartitions.head, "Should contain " + tp0 + " as first item")
     assertTrue(capturedTopicPartitions.contains(tp1), "Should contain " + tp1)
 
     // Verify the fetch info details are correct for both partitions
@@ -3671,6 +3671,7 @@ class ReplicaManagerTest {
         assertTrue(fetchInfo.minOneMessage())
       } else {
         assertEquals(fetchOffsetTp1, fetchInfo.fetchInfo.fetchOffset)
+        assertFalse(fetchInfo.minOneMessage())
       }
     }
 
@@ -3764,8 +3765,8 @@ class ReplicaManagerTest {
     val capturedFetchInfos = remoteStorageFetchInfoArg.getAllValues.asScala
     assertEquals(2, capturedFetchInfos.size, "Should have 2 remote storage fetch info calls")
 
-    val capturedTopicPartitions = capturedFetchInfos.map(_.topicIdPartition.topicPartition).toSet
-    assertTrue(capturedTopicPartitions.contains(tp0), "Should contain " + tp0)
+    val capturedTopicPartitions = capturedFetchInfos.map(_.topicIdPartition.topicPartition)
+    assertEquals(tp0, capturedTopicPartitions.head, "Should contain " + tp0 + " as first item")
     assertTrue(capturedTopicPartitions.contains(tp1), "Should contain " + tp1)
 
     // Verify the fetch info details are correct for both partitions
@@ -3778,6 +3779,7 @@ class ReplicaManagerTest {
         assertTrue(fetchInfo.minOneMessage())
      } else {
         assertEquals(fetchOffsetTp1, fetchInfo.fetchInfo.fetchOffset)
+        assertFalse(fetchInfo.minOneMessage())
       }
     }
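For context only (not part of the PR): the change swaps util.HashMap for util.LinkedHashMap so that partitions needing remote fetches are processed in the order they were discovered while reading the local log, which is why the updated test can assert on capturedTopicPartitions.head. The standalone sketch below illustrates the iteration-order difference under an assumed setup; InsertionOrderDemo and the demo values are hypothetical, and it assumes kafka-clients is on the classpath for TopicIdPartition.

import java.util
import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid}

object InsertionOrderDemo extends App {
  val tp0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("topic", 0))
  val tp1 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("topic", 1))

  // util.HashMap iterates in hash order, so tp1 may be visited before tp0
  // even though tp0 was inserted first.
  val unordered = new util.HashMap[TopicIdPartition, String]()
  unordered.put(tp0, "first remote fetch discovered")
  unordered.put(tp1, "second remote fetch discovered")

  // util.LinkedHashMap iterates in insertion order, so the partition that first
  // triggered a remote fetch is also the first one handed to the delayed fetch.
  val ordered = new util.LinkedHashMap[TopicIdPartition, String]()
  ordered.put(tp0, "first remote fetch discovered")
  ordered.put(tp1, "second remote fetch discovered")

  println(s"HashMap keys:       ${unordered.keySet()}") // order not guaranteed
  println(s"LinkedHashMap keys: ${ordered.keySet()}")   // always [tp0, tp1]
}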