10 changes: 5 additions & 5 deletions core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -1636,7 +1636,7 @@ class ReplicaManager(val config: KafkaConfig,
/**
* Process all remote fetches by creating async read tasks and handling them in DelayedRemoteFetch collectively.
*/
- private def processRemoteFetches(remoteFetchInfos: util.HashMap[TopicIdPartition, RemoteStorageFetchInfo],
+ private def processRemoteFetches(remoteFetchInfos: util.LinkedHashMap[TopicIdPartition, RemoteStorageFetchInfo],
params: FetchParams,
responseCallback: Seq[(TopicIdPartition, FetchPartitionData)] => Unit,
logReadResults: Seq[(TopicIdPartition, LogReadResult)],
@@ -1675,7 +1675,7 @@ class ReplicaManager(val config: KafkaConfig,
var errorReadingData = false

// topic-partitions that have to be read from remote storage
- val remoteFetchInfos = new util.HashMap[TopicIdPartition, RemoteStorageFetchInfo]()
+ val remoteFetchInfos = new util.LinkedHashMap[TopicIdPartition, RemoteStorageFetchInfo]()
Reviewer comment (Member): Good to have this as a LinkedHashMap so that we try to process the topic partitions in the order sent by the client.


var hasDivergingEpoch = false
var hasPreferredReadReplica = false
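
To make the reviewer's ordering point concrete, here is a minimal standalone Scala sketch (not Kafka code; the partition keys are made up) showing that `java.util.LinkedHashMap` iterates in insertion order, whereas `java.util.HashMap` gives no ordering guarantee:

```scala
import java.util

// Minimal illustration of the reviewer's point: LinkedHashMap preserves the
// order in which entries were inserted, so iterating a remoteFetchInfos-style
// map visits partitions in the order the client listed them. HashMap makes
// no such guarantee. The keys below are hypothetical partition names.
object InsertionOrderSketch {
  def main(args: Array[String]): Unit = {
    val linked = new util.LinkedHashMap[String, Int]()
    val plain  = new util.HashMap[String, Int]()
    Seq("topicA-0", "topicB-2", "topicA-1").zipWithIndex.foreach { case (tp, i) =>
      linked.put(tp, i)
      plain.put(tp, i)
    }
    // Prints topicA-0, topicB-2, topicA-1 -- exactly the insertion order.
    linked.keySet().forEach(tp => println(tp))
    // Iteration order here depends on hash buckets, not insertion order.
    plain.keySet().forEach(tp => println(tp))
  }
}
```
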
@@ -1879,13 +1879,13 @@ class ReplicaManager(val config: KafkaConfig,
readPartitionInfo.foreach { case (tp, fetchInfo) =>
val readResult = read(tp, fetchInfo, limitBytes, minOneMessage)
val recordBatchSize = readResult.info.records.sizeInBytes
- // Once we read from a non-empty partition, we stop ignoring request and partition level size limits
- if (recordBatchSize > 0)
-   minOneMessage = false
// Because we don't know how much data will be retrieved in remote fetch yet, and we don't want to block the API call
// to query remoteLogMetadata, assume it will fetch the max bytes size of data to avoid to exceed the "fetch.max.bytes" setting.
val estimatedRecordBatchSize = if (recordBatchSize == 0 && readResult.info.delayedRemoteStorageFetch.isPresent)
readResult.info.delayedRemoteStorageFetch.get.fetchMaxBytes else recordBatchSize
+ // Once we read from a non-empty partition, we stop ignoring request and partition level size limits
+ if (estimatedRecordBatchSize > 0)
+   minOneMessage = false
limitBytes = math.max(0, limitBytes - estimatedRecordBatchSize)
result += (tp -> readResult)
}
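
A note on the hunk above: because the size of a pending remote fetch is unknown when the local read returns empty, the loop budgets that fetch's fetchMaxBytes against the fetch.max.bytes limit, and after this change it releases minOneMessage based on that estimated size rather than the zero-byte local read. Below is a small self-contained Scala sketch of that accounting idea (not the ReplicaManager code; ReadResult and its fields are invented for illustration):

```scala
// Hypothetical, simplified model of the size-accounting loop shown above.
// ReadResult and its fields are invented for this sketch.
final case class ReadResult(localBytes: Int, remoteFetchMaxBytes: Option[Int])

object FetchBudgetSketch {
  // Returns the remaining byte budget and the final minOneMessage flag.
  def account(results: Seq[ReadResult], fetchMaxBytes: Int): (Int, Boolean) = {
    var limitBytes = fetchMaxBytes
    var minOneMessage = true
    results.foreach { r =>
      // The remote fetch size is unknown up front, so budget for the worst
      // case (its fetchMaxBytes) to stay within fetch.max.bytes overall.
      val estimated = r.remoteFetchMaxBytes match {
        case Some(maxBytes) if r.localBytes == 0 => maxBytes
        case _                                   => r.localBytes
      }
      // Once any partition contributes data (real or estimated), stop
      // ignoring request and partition level size limits.
      if (estimated > 0)
        minOneMessage = false
      limitBytes = math.max(0, limitBytes - estimated)
    }
    (limitBytes, minOneMessage)
  }

  def main(args: Array[String]): Unit = {
    val results = Seq(ReadResult(0, Some(1024 * 1024)), ReadResult(4096, None))
    println(account(results, fetchMaxBytes = 2 * 1024 * 1024)) // (1044480,false)
  }
}
```
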
10 changes: 6 additions & 4 deletions core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -3657,8 +3657,8 @@ class ReplicaManagerTest {
val capturedFetchInfos = remoteStorageFetchInfoArg.getAllValues.asScala
assertEquals(2, capturedFetchInfos.size, "Should have 2 remote storage fetch info calls")

- val capturedTopicPartitions = capturedFetchInfos.map(_.topicIdPartition.topicPartition).toSet
- assertTrue(capturedTopicPartitions.contains(tp0), "Should contain " + tp0)
+ val capturedTopicPartitions = capturedFetchInfos.map(_.topicIdPartition.topicPartition)
+ assertEquals(tp0, capturedTopicPartitions.head, "Should contain " + tp0 + " as first item")
assertTrue(capturedTopicPartitions.contains(tp1), "Should contain " + tp1)

// Verify the fetch info details are correct for both partitions
@@ -3671,6 +3671,7 @@
assertTrue(fetchInfo.minOneMessage())
} else {
assertEquals(fetchOffsetTp1, fetchInfo.fetchInfo.fetchOffset)
+ assertFalse(fetchInfo.minOneMessage())
}
}

@@ -3764,8 +3765,8 @@ class ReplicaManagerTest {
val capturedFetchInfos = remoteStorageFetchInfoArg.getAllValues.asScala
assertEquals(2, capturedFetchInfos.size, "Should have 2 remote storage fetch info calls")

- val capturedTopicPartitions = capturedFetchInfos.map(_.topicIdPartition.topicPartition).toSet
- assertTrue(capturedTopicPartitions.contains(tp0), "Should contain " + tp0)
+ val capturedTopicPartitions = capturedFetchInfos.map(_.topicIdPartition.topicPartition)
+ assertEquals(tp0, capturedTopicPartitions.head, "Should contain " + tp0 + " as first item")
assertTrue(capturedTopicPartitions.contains(tp1), "Should contain " + tp1)

// Verify the fetch info details are correct for both partitions
@@ -3778,6 +3779,7 @@
assertTrue(fetchInfo.minOneMessage())
} else {
assertEquals(fetchOffsetTp1, fetchInfo.fetchInfo.fetchOffset)
+ assertFalse(fetchInfo.minOneMessage())
}
}
