diff --git a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerCommitTest.java b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerCommitTest.java index c00d1ddab90a0..1eee4ec6441ee 100644 --- a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerCommitTest.java +++ b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerCommitTest.java @@ -17,6 +17,8 @@ package org.apache.kafka.clients.consumer; import org.apache.kafka.clients.ClientsTestUtils; +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.OffsetSpec; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; @@ -35,15 +37,19 @@ import java.time.Duration; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Locale; import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.concurrent.Semaphore; +import java.util.concurrent.atomic.AtomicInteger; import static org.apache.kafka.clients.ClientsTestUtils.awaitAssignment; import static org.apache.kafka.clients.ClientsTestUtils.consumeAndVerifyRecords; import static org.apache.kafka.clients.ClientsTestUtils.sendRecords; +import static org.apache.kafka.clients.consumer.ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG; import static org.apache.kafka.clients.consumer.ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG; import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG; import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_PROTOCOL_CONFIG; @@ -77,6 +83,9 @@ public class PlaintextConsumerCommitTest { private final String topic = "topic"; private final TopicPartition tp = new TopicPartition(topic, 0); private final TopicPartition tp1 = new TopicPartition(topic, 1); + private final String cacheTopic = "cache-topic-test"; + private final String cacheGroup = "cache-group-test"; + private final TopicPartition cacheTopicPartition = new TopicPartition(cacheTopic, 0); public PlaintextConsumerCommitTest(ClusterInstance clusterInstance) { this.cluster = clusterInstance; @@ -555,11 +564,15 @@ public void testCommitAsyncCompletedBeforeCommitSyncReturns() { } private Consumer createConsumer(GroupProtocol protocol, boolean enableAutoCommit) { - return cluster.consumer(Map.of( - GROUP_ID_CONFIG, "test-group", - GROUP_PROTOCOL_CONFIG, protocol.name().toLowerCase(Locale.ROOT), - ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit - )); + return createConsumer(protocol, enableAutoCommit, Map.of(GROUP_ID_CONFIG, "test-group")); + } + + private Consumer createConsumer(GroupProtocol protocol, boolean enableAutoCommit, Map properties) { + Map consumerProperties = new HashMap<>(); + consumerProperties.put(GROUP_PROTOCOL_CONFIG, protocol.name().toLowerCase(Locale.ROOT)); + consumerProperties.put(ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit); + consumerProperties.putAll(properties); + return cluster.consumer(consumerProperties); } private void sendAndAwaitAsyncCommit( @@ -633,4 +646,276 @@ private void changeConsumerSubscriptionAndValidateAssignment( consumer.subscribe(topicsToSubscribe, rebalanceListener); awaitAssignment(consumer, expectedAssignment); } -} + + @ClusterTest + public void testAutoCommitHitOffsetCacheForClassic() throws InterruptedException { + testAutoCommitHitOffsetCache(GroupProtocol.CLASSIC.name); + } + + @ClusterTest + public void testAutoCommitHitOffsetCacheForConsumer() throws InterruptedException { + testAutoCommitHitOffsetCache(GroupProtocol.CONSUMER.name); + } + + private void testAutoCommitHitOffsetCache(String groupProtocol) throws InterruptedException { + var consumerProperties = Map.of(GROUP_ID_CONFIG, cacheGroup, AUTO_COMMIT_INTERVAL_MS_CONFIG, "100"); + try (Producer producer = cluster.producer(); + var consumer = createConsumer(GroupProtocol.of(groupProtocol), true, consumerProperties); + var admin = cluster.admin()) { + + int sendRecordNum = 5; + cluster.createTopic(cacheTopic, 1, (short) BROKER_COUNT); + sendRecords(producer, cacheTopicPartition, sendRecordNum, System.currentTimeMillis()); + + consumer.subscribe(Collections.singleton(cacheTopic)); + consumeTargetRecordsAndCommitOffset(consumer, sendRecordNum); + + final long consumerOffsetLEO = queryLatestOffsetOfGroupInConsumerOffsetsPartition(admin); + assertTrue(consumerOffsetLEO > 0L); + + // case 1: cache NOT hit. Because the records are continuously growing, so the cache will not be hit + for (int i = 0; i < 5; i++) { + sendRecords(producer, cacheTopicPartition, 1, System.currentTimeMillis()); + final long expectOffset = consumerOffsetLEO + i + 1; + TestUtils.waitForCondition(() -> { + consumer.poll(Duration.ofMillis(500)); + long consumerOffsetTmpLEO = queryLatestOffsetOfGroupInConsumerOffsetsPartition(admin); + return consumerOffsetTmpLEO == expectOffset; + }, "The offset in `__consumer_offsets` does not match"); + } + + // case 2: cache hit. The records no longer grow, so all of them hit the cache, and the LEO of __consumer_offsets did not increase + consumer.commitSync(); + long consumerOffsetLEO2 = queryLatestOffsetOfGroupInConsumerOffsetsPartition(admin); + for (int i = 0; i < 5; i++) { + consumer.poll(Duration.ofMillis(500)); + } + long consumerOffsetLatestLEO = queryLatestOffsetOfGroupInConsumerOffsetsPartition(admin); + assertEquals(consumerOffsetLEO2, consumerOffsetLatestLEO); + } + } + + private void consumeTargetRecordsAndCommitOffset(Consumer consumer, int recordNum) { + // consume to the target record number + int consumedRecords = 0; + while (consumedRecords < recordNum) { + ConsumerRecords consumerRecords = consumer.poll(Duration.ofMillis(100)); + consumedRecords += consumerRecords.count(); + } + + // ensure all the partition offsets have committed to broker + consumer.poll(Duration.ofMillis(100)); + consumer.commitSync(); + } + + private long queryLatestOffsetOfGroupInConsumerOffsetsPartition(Admin admin) { + try { + TopicPartition topicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0); + return admin.listOffsets(Map.of(topicPartition, OffsetSpec.latest())).all().get().get(topicPartition).offset(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + @ClusterTest + public void testManualCommitAndCacheDoesNotTakeEffectForClassic() throws InterruptedException { + testManualCommitAndCacheDoesNotTakeEffect(GroupProtocol.CLASSIC); + } + + @ClusterTest + public void testManualCommitAndCacheDoesNotTakeEffectForConsumer() throws InterruptedException { + testManualCommitAndCacheDoesNotTakeEffect(GroupProtocol.CONSUMER); + } + + private void testManualCommitAndCacheDoesNotTakeEffect(GroupProtocol groupProtocol) throws InterruptedException { + var consumerProperties = Map.of( + GROUP_ID_CONFIG, cacheGroup, + AUTO_COMMIT_INTERVAL_MS_CONFIG, "100", + INTERCEPTOR_CLASSES_CONFIG, OffsetCommitConsumerInterceptor.class.getName()); + OffsetCommitConsumerInterceptor.clear(); + + try (Producer producer = cluster.producer(); + var consumer = createConsumer(groupProtocol, false, consumerProperties); + var admin = cluster.admin()) { + + int sendRecordNum = 20; + cluster.createTopic(cacheTopic, 1, (short) BROKER_COUNT); + sendRecords(producer, cacheTopicPartition, sendRecordNum, System.currentTimeMillis()); + + consumer.subscribe(Collections.singleton(cacheTopic)); + consumeTargetRecordsAndCommitOffset(consumer, sendRecordNum); + + // test the sync and async mode + testCacheWillNotTakeEffectWhenManualCommit(consumer, admin); + } + } + + public static class OffsetCommitConsumerInterceptor implements ConsumerInterceptor { + private static final AtomicInteger COMMIT_COUNTER = new AtomicInteger(0); + + @Override + public ConsumerRecords onConsume(ConsumerRecords records) { + return records; + } + + @Override + public void onCommit(Map offsets) { + COMMIT_COUNTER.incrementAndGet(); + } + + @Override + public void close() { + + } + + @Override + public void configure(Map configs) { + } + + public static void clear() { + COMMIT_COUNTER.set(0); + } + + public static int commitCount() { + return COMMIT_COUNTER.get(); + } + } + + @ClusterTest + public void testAssignModeAndCacheDoesNotTakeEffectForClassic() throws InterruptedException { + testAssignModeAndCacheDoesNotTakeEffect(GroupProtocol.CLASSIC); + } + + @ClusterTest + public void testAssignModeAndCacheDoesNotTakeEffectForConsumer() throws InterruptedException { + testAssignModeAndCacheDoesNotTakeEffect(GroupProtocol.CONSUMER); + } + + private void testAssignModeAndCacheDoesNotTakeEffect(GroupProtocol protocol) throws InterruptedException { + var consumerProperties = Map.of( + GROUP_ID_CONFIG, cacheGroup, + AUTO_COMMIT_INTERVAL_MS_CONFIG, "100", + INTERCEPTOR_CLASSES_CONFIG, OffsetCommitConsumerInterceptor.class.getName()); + OffsetCommitConsumerInterceptor.clear(); + + try (Producer producer = cluster.producer(); + var consumer = createConsumer(protocol, false, consumerProperties); + var autoCommitConsumer = createConsumer(protocol, true, consumerProperties); + var admin = cluster.admin()) { + + // send some records + int sendRecordNum = 20; + cluster.createTopic(cacheTopic, 1, (short) BROKER_COUNT); + sendRecords(producer, cacheTopicPartition, sendRecordNum, System.currentTimeMillis()); + + // take assign mode + consumer.assign(Collections.singleton(cacheTopicPartition)); + + // test the sync and async mode + testCacheWillNotTakeEffectWhenManualCommit(consumer, admin); + + // test the AUTO commit mode, the cache will not take effect + autoCommitConsumer.assign(Collections.singleton(cacheTopicPartition)); + for (int i = 0; i < 10; i++) { + long consumerOffsetLEO = queryLatestOffsetOfGroupInConsumerOffsetsPartition(admin); + int commitCount = OffsetCommitConsumerInterceptor.commitCount(); + while (OffsetCommitConsumerInterceptor.commitCount() != commitCount + 1) { + autoCommitConsumer.poll(Duration.ofMillis(10)); + } + long newConsumerOffsetLEO = queryLatestOffsetOfGroupInConsumerOffsetsPartition(admin); + assertEquals(consumerOffsetLEO + 1, newConsumerOffsetLEO); + } + } + } + + private void testCacheWillNotTakeEffectWhenManualCommit(Consumer consumer, Admin admin) { + // first commit the offset + consumer.commitSync(Map.of(cacheTopicPartition, new OffsetAndMetadata(1L))); + assertTrue(queryLatestOffsetOfGroupInConsumerOffsetsPartition(admin) > 0L); + + // test the SYNC commit mode, the cache will not take effect + for (int i = 0; i < 10; i++) { + long consumerOffsetLEO = queryLatestOffsetOfGroupInConsumerOffsetsPartition(admin); + consumer.commitSync(Map.of(cacheTopicPartition, new OffsetAndMetadata(1L))); + long newConsumerOffsetLEO = queryLatestOffsetOfGroupInConsumerOffsetsPartition(admin); + assertEquals(consumerOffsetLEO + 1, newConsumerOffsetLEO); + } + + // test the ASYNC commit mode, the cache will not take effect + for (int i = 0; i < 10; i++) { + long consumerOffsetLEO = queryLatestOffsetOfGroupInConsumerOffsetsPartition(admin); + int commitCount = OffsetCommitConsumerInterceptor.commitCount(); + consumer.commitAsync(Map.of(cacheTopicPartition, new OffsetAndMetadata(1L)), null); + while (OffsetCommitConsumerInterceptor.commitCount() != commitCount + 1) { + consumer.poll(Duration.ofMillis(100)); + } + long newConsumerOffsetLEO = queryLatestOffsetOfGroupInConsumerOffsetsPartition(admin); + assertEquals(consumerOffsetLEO + 1, newConsumerOffsetLEO); + } + } + + @ClusterTest + public void testCacheWillClearWhenRebalanceForClassic() throws InterruptedException { + testCacheWillClearWhenRebalance(GroupProtocol.CLASSIC); + } + + @ClusterTest + public void testCacheWillClearWhenRebalanceForConsumer() throws InterruptedException { + testCacheWillClearWhenRebalance(GroupProtocol.CONSUMER); + } + + private void testCacheWillClearWhenRebalance(GroupProtocol groupProtocol) throws InterruptedException { + AtomicInteger rebalanceTimes = new AtomicInteger(); + Semaphore revokedSemaphore = new Semaphore(0); + var rebalanceListener = new ConsumerRebalanceListener() { + @Override + public void onPartitionsAssigned(Collection partitions) { + rebalanceTimes.incrementAndGet(); + } + @Override + public void onPartitionsRevoked(Collection partitions) { + revokedSemaphore.release(); + } + }; + + var consumerProperties = Map.of(GROUP_ID_CONFIG, cacheGroup, AUTO_COMMIT_INTERVAL_MS_CONFIG, "5000"); + try (Producer producer = cluster.producer(); + var consumer = createConsumer(GroupProtocol.of(groupProtocol.name), true, consumerProperties); + var admin = cluster.admin()) { + + // Send some records, then start a consumer, followed by consuming all the records and committing the offsets + int sendRecordNum = 5; + sendRecords(producer, tp, sendRecordNum, System.currentTimeMillis()); + sendRecords(producer, tp1, sendRecordNum, System.currentTimeMillis()); + consumer.subscribe(Collections.singleton(topic), rebalanceListener); + consumeTargetRecordsAndCommitOffset(consumer, sendRecordNum * 2); + + // Start another consumer, then ensure that after a rebalance occurs, shut down this consumer + var otherConsumer = createConsumer(GroupProtocol.of(groupProtocol.name), false, consumerProperties); + otherConsumer.subscribe(Collections.singleton(topic)); + while (!revokedSemaphore.tryAcquire()) { + consumer.poll(Duration.ofMillis(100)); + otherConsumer.poll(Duration.ofMillis(100)); + } + otherConsumer.unsubscribe(); + otherConsumer.close(); + + // Ensure that a total of 3 rebalances occur: + // 1. The first consumer starts + // 2. The second consumer starts + // 3. The second consumer shuts down + while (rebalanceTimes.get() != 3) { + consumer.poll(Duration.ofMillis(100)); + } + + // The cache has already become invalid; therefore, the subsequent consumption logic will cause the + // offset in `__consumer_offsets` to increase by 2 (for 2 partitions). + long consumerOffsetLEO = queryLatestOffsetOfGroupInConsumerOffsetsPartition(admin); + TestUtils.waitForCondition(() -> { + consumer.poll(Duration.ofMillis(500)); + long consumerOffsetTmpLEO = queryLatestOffsetOfGroupInConsumerOffsetsPartition(admin); + return consumerOffsetTmpLEO == consumerOffsetLEO + 2; + }, "The offset in `__consumer_offsets` does not match"); + } + } +} \ No newline at end of file diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java index 938ae909027d0..2e2da8c76d16c 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java @@ -351,6 +351,7 @@ public void onGroupAssignmentUpdated(Set partitions) { setGroupAssignmentSnapshot(partitions); } }; + private final CommittedOffsetCache committedOffsetCache; public AsyncKafkaConsumer(final ConsumerConfig config, final Deserializer keyDeserializer, @@ -444,6 +445,7 @@ public AsyncKafkaConsumer(final ConsumerConfig config, ); this.offsetCommitCallbackInvoker = new OffsetCommitCallbackInvoker(interceptors); this.groupMetadata.set(initializeGroupMetadata(config, groupRebalanceConfig)); + this.committedOffsetCache = new CommittedOffsetCache(subscriptions); final Supplier requestManagersSupplier = RequestManagers.supplier(time, logContext, backgroundEventHandler, @@ -459,7 +461,8 @@ public AsyncKafkaConsumer(final ConsumerConfig config, metrics, offsetCommitCallbackInvoker, memberStateListener, - streamsRebalanceData + streamsRebalanceData, + committedOffsetCache ); final Supplier applicationEventProcessorSupplier = ApplicationEventProcessor.supplier(logContext, metadata, @@ -479,7 +482,8 @@ public AsyncKafkaConsumer(final ConsumerConfig config, logContext, subscriptions, time, - new RebalanceCallbackMetricsManager(metrics) + new RebalanceCallbackMetricsManager(metrics), + committedOffsetCache ); this.streamsRebalanceListenerInvoker = streamsRebalanceData.map(s -> new StreamsRebalanceListenerInvoker(logContext, s)); @@ -564,6 +568,7 @@ public AsyncKafkaConsumer(final ConsumerConfig config, time, asyncConsumerMetrics ); + this.committedOffsetCache = new CommittedOffsetCache(subscriptions); } AsyncKafkaConsumer(LogContext logContext, @@ -616,11 +621,13 @@ public AsyncKafkaConsumer(final ConsumerConfig config, time, asyncConsumerMetrics ); + this.committedOffsetCache = new CommittedOffsetCache(subscriptions); this.rebalanceListenerInvoker = new ConsumerRebalanceListenerInvoker( logContext, subscriptions, time, - new RebalanceCallbackMetricsManager(metrics) + new RebalanceCallbackMetricsManager(metrics), + committedOffsetCache ); ApiVersions apiVersions = new ApiVersions(); Supplier networkClientDelegateSupplier = () -> new NetworkClientDelegate( @@ -650,7 +657,8 @@ public AsyncKafkaConsumer(final ConsumerConfig config, metrics, offsetCommitCallbackInvoker, memberStateListener, - Optional.empty() + Optional.empty(), + committedOffsetCache ); Supplier applicationEventProcessorSupplier = ApplicationEventProcessor.supplier( logContext, diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java index c227a9511b7b6..d43b5d4a7d1c5 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java @@ -152,6 +152,8 @@ public class ClassicKafkaConsumer implements ConsumerDelegate { // to keep from repeatedly scanning subscriptions in poll(), cache the result during metadata updates private boolean cachedSubscriptionHasAllFetchPositions; + // cache of committed offset + private final CommittedOffsetCache committedOffsetCache; ClassicKafkaConsumer(ConsumerConfig config, Deserializer keyDeserializer, Deserializer valueDeserializer) { try { @@ -183,6 +185,7 @@ public class ClassicKafkaConsumer implements ConsumerDelegate { this.interceptors = new ConsumerInterceptors<>(interceptorList, metrics); this.deserializers = new Deserializers<>(config, keyDeserializer, valueDeserializer, metrics); this.subscriptions = createSubscriptionState(config, logContext); + this.committedOffsetCache = new CommittedOffsetCache(subscriptions); ClusterResourceListeners clusterResourceListeners = ClientUtils.configureClusterResourceListeners( metrics.reporters(), interceptorList, @@ -230,7 +233,8 @@ public class ClassicKafkaConsumer implements ConsumerDelegate { config.getInt(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG), this.interceptors, config.getBoolean(THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED), - clientTelemetryReporter); + clientTelemetryReporter, + committedOffsetCache); } this.fetcher = new Fetcher<>( logContext, @@ -302,6 +306,7 @@ public class ClassicKafkaConsumer implements ConsumerDelegate { this.retryBackoffMaxMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG); this.requestTimeoutMs = config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG); this.clientTelemetryReporter = Optional.empty(); + this.committedOffsetCache = new CommittedOffsetCache(subscriptions); int sessionTimeoutMs = config.getInt(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG); int rebalanceTimeoutMs = config.getInt(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG); @@ -347,7 +352,8 @@ public class ClassicKafkaConsumer implements ConsumerDelegate { autoCommitIntervalMs, interceptors, throwOnStableOffsetNotSupported, - clientTelemetryReporter + clientTelemetryReporter, + committedOffsetCache ); } else { this.coordinator = null; diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java index 6aae084fd47e2..baf35ff7041ff 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java @@ -104,6 +104,7 @@ public class CommitRequestManager implements RequestManager, MemberStateListener * the latest memberEpoch received from the broker. */ private final MemberInfo memberInfo; + private final CommittedOffsetCache committedOffsetCache; public CommitRequestManager( final Time time, @@ -115,7 +116,8 @@ public CommitRequestManager( final String groupId, final Optional groupInstanceId, final Metrics metrics, - final ConsumerMetadata metadata) { + final ConsumerMetadata metadata, + final CommittedOffsetCache committedOffsetCache) { this(time, logContext, subscriptions, @@ -128,10 +130,12 @@ public CommitRequestManager( config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG), OptionalDouble.empty(), metrics, - metadata); + metadata, + committedOffsetCache); } // Visible for testing + @SuppressWarnings({"checkstyle:ParameterNumber"}) CommitRequestManager( final Time time, final LogContext logContext, @@ -145,7 +149,9 @@ public CommitRequestManager( final long retryBackoffMaxMs, final OptionalDouble jitter, final Metrics metrics, - final ConsumerMetadata metadata) { + final ConsumerMetadata metadata, + final CommittedOffsetCache committedOffsetCache) { + Objects.requireNonNull(coordinatorRequestManager, "Coordinator is needed upon committing offsets"); this.time = time; this.logContext = logContext; @@ -171,6 +177,7 @@ public CommitRequestManager( this.metricsManager = new OffsetCommitMetricsManager(metrics); this.offsetCommitCallbackInvoker = offsetCommitCallbackInvoker; this.lastEpochSentOnCommit = Optional.empty(); + this.committedOffsetCache = committedOffsetCache; } /** @@ -251,6 +258,8 @@ private CompletableFuture> requestAutoCom CompletableFuture> result; if (requestState.offsets.isEmpty()) { result = CompletableFuture.completedFuture(Collections.emptyMap()); + } else if (committedOffsetCache.isHitCache(requestState.offsets)) { + result = CompletableFuture.completedFuture(requestState.offsets); } else { autocommit.setInflightCommitStatus(true); OffsetCommitRequestState request = pendingRequests.addOffsetCommitRequest(requestState); @@ -377,6 +386,7 @@ private void autoCommitSyncBeforeRebalanceWithRetries(OffsetCommitRequestState r autoCommitState.ifPresent(autoCommitState -> autoCommitState.setInflightCommitStatus(false)); if (throwable == null) { offsetCommitCallbackInvoker.enqueueInterceptorInvocation(allConsumedOffsets); + committedOffsetCache.tryAddToCache(allConsumedOffsets); log.debug("Completed auto-commit of offsets {}", allConsumedOffsets); } else if (throwable instanceof RetriableCommitFailedException) { log.debug("Auto-commit of offsets {} failed due to retriable error: {}", @@ -411,6 +421,7 @@ public CompletableFuture> commitAsync(fin if (error != null) { asyncCommitResult.completeExceptionally(commitAsyncExceptionForError(error)); } else { + committedOffsetCache.tryAddToCache(committedOffsets); asyncCommitResult.complete(offsets); } }); @@ -467,6 +478,7 @@ private void commitSyncWithRetries(OffsetCommitRequestState requestAttempt, // timeout hasn't expired. requestAttempt.future.whenComplete((res, error) -> { if (error == null) { + committedOffsetCache.tryAddToCache(res); result.complete(requestAttempt.offsets); } else { if (error instanceof RetriableException) { diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommittedOffsetCache.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommittedOffsetCache.java new file mode 100644 index 0000000000000..e597d29276950 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommittedOffsetCache.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.TopicPartition; + +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +public class CommittedOffsetCache { + private final SubscriptionState subscriptions; + private final Map latestCommittedOffsets; + + CommittedOffsetCache(SubscriptionState subscriptions) { + this.subscriptions = Objects.requireNonNull(subscriptions); + this.latestCommittedOffsets = new ConcurrentHashMap<>(); + } + + public void tryAddToCache(Map offsets) { + if (subscriptions.hasAutoAssignedPartitions() && offsets != null && !offsets.isEmpty()) { + latestCommittedOffsets.putAll(offsets); + } + } + + public void tryAddToCache(TopicPartition tp, OffsetAndMetadata metadata) { + if (subscriptions.hasAutoAssignedPartitions() && tp != null && metadata != null) { + latestCommittedOffsets.put(tp, metadata); + } + } + + public boolean isHitCache(Map offsets) { + // If the current consumer mode is not subscribe, or there are in-flight async commits, simply return false + if (!subscriptions.hasAutoAssignedPartitions() || offsets == null || offsets.isEmpty()) { + return false; + } + + for (Map.Entry entry : offsets.entrySet()) { + OffsetAndMetadata cachedOffset = latestCommittedOffsets.get(entry.getKey()); + if (cachedOffset == null || !cachedOffset.equals(entry.getValue())) { + return false; + } + } + return true; + } + + public void clear(Set topicPartitions) { + if (topicPartitions != null && !topicPartitions.isEmpty()) { + for (TopicPartition topicPartition : topicPartitions) { + latestCommittedOffsets.remove(topicPartition); + } + } + } +} diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java index 4956d64228dbb..01842dc5920d8 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java @@ -161,6 +161,7 @@ private boolean sameRequest(final Set currentRequest, final Gene // it'll set to rebalance timeout so that the member can join the group successfully // even though offset commit failed. private Timer joinPrepareTimer = null; + private final CommittedOffsetCache committedOffsetCache; /** * Initialize the coordination manager. @@ -178,7 +179,8 @@ public ConsumerCoordinator(GroupRebalanceConfig rebalanceConfig, int autoCommitIntervalMs, ConsumerInterceptors interceptors, boolean throwOnFetchStableOffsetsUnsupported, - Optional clientTelemetryReporter) { + Optional clientTelemetryReporter, + CommittedOffsetCache committedOffsetCache) { this(rebalanceConfig, logContext, client, @@ -193,7 +195,8 @@ public ConsumerCoordinator(GroupRebalanceConfig rebalanceConfig, interceptors, throwOnFetchStableOffsetsUnsupported, clientTelemetryReporter, - Optional.empty()); + Optional.empty(), + committedOffsetCache); } /** @@ -213,7 +216,8 @@ public ConsumerCoordinator(GroupRebalanceConfig rebalanceConfig, ConsumerInterceptors interceptors, boolean throwOnFetchStableOffsetsUnsupported, Optional clientTelemetryReporter, - Optional> heartbeatThreadSupplier) { + Optional> heartbeatThreadSupplier, + CommittedOffsetCache committedOffsetCache) { super(rebalanceConfig, logContext, client, @@ -275,9 +279,11 @@ public ConsumerCoordinator(GroupRebalanceConfig rebalanceConfig, logContext, subscriptions, time, - new RebalanceCallbackMetricsManager(metrics, metricGrpPrefix) + new RebalanceCallbackMetricsManager(metrics, metricGrpPrefix), + committedOffsetCache ); this.metadata.requestUpdate(true); + this.committedOffsetCache = committedOffsetCache; } // package private for testing @@ -1044,12 +1050,16 @@ void invokeCompletedOffsetCommitCallbacks() { } public RequestFuture commitOffsetsAsync(final Map offsets, final OffsetCommitCallback callback) { + return commitOffsetsAsync(offsets, callback, false); + } + + private RequestFuture commitOffsetsAsync(final Map offsets, final OffsetCommitCallback callback, boolean autoCommit) { invokeCompletedOffsetCommitCallbacks(); RequestFuture future = null; if (offsets.isEmpty()) { // No need to check coordinator if offsets is empty since commit of empty offsets is completed locally. - future = doCommitOffsetsAsync(offsets, callback); + future = doCommitOffsetsAsync(offsets, callback, autoCommit); } else if (!coordinatorUnknownAndUnreadyAsync()) { // we need to make sure coordinator is ready before committing, since // this is for async committing we do not try to block, but just try once to @@ -1061,7 +1071,7 @@ public RequestFuture commitOffsetsAsync(final Map commitOffsetsAsync(final Map doCommitOffsetsAsync(final Map offsets, final OffsetCommitCallback callback) { - RequestFuture future = sendOffsetCommitRequest(offsets); + private RequestFuture doCommitOffsetsAsync(final Map offsets, + final OffsetCommitCallback callback, final boolean autoCommit) { + RequestFuture future; + // the async commit of classic consumer maybe parallel sending, so we should ensure that + // all the commits have finished, otherwise, the cache becomes invalid + if (autoCommit && inFlightAsyncCommits.get() == 0 && committedOffsetCache.isHitCache(offsets)) { + future = RequestFuture.voidSuccess(); + } else { + future = sendOffsetCommitRequest(offsets); + } inFlightAsyncCommits.incrementAndGet(); final OffsetCommitCallback cb = callback == null ? defaultOffsetCommitCallback : callback; future.addListener(new RequestFutureListener<>() { @@ -1240,7 +1258,7 @@ private RequestFuture autoCommitOffsetsAsync() { } else { log.debug("Completed asynchronous auto-commit of offsets {}", offsets); } - }); + }, true); } private RequestFuture maybeAutoCommitOffsetsAsync() { @@ -1365,6 +1383,7 @@ public void handle(OffsetCommitResponse commitResponse, RequestFuture futu Errors error = Errors.forCode(partition.errorCode()); if (error == Errors.NONE) { + committedOffsetCache.tryAddToCache(tp, offsetAndMetadata); log.debug("Committed offset {} for partition {}", offset, tp); } else { if (error.exception() instanceof RetriableException) { diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerRebalanceListenerInvoker.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerRebalanceListenerInvoker.java index 3f66b6ce3c383..554c4de6f11f3 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerRebalanceListenerInvoker.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerRebalanceListenerInvoker.java @@ -42,15 +42,18 @@ public class ConsumerRebalanceListenerInvoker { private final SubscriptionState subscriptions; private final Time time; private final RebalanceCallbackMetricsManager metricsManager; + private final CommittedOffsetCache committedOffsetCache; ConsumerRebalanceListenerInvoker(LogContext logContext, SubscriptionState subscriptions, Time time, - RebalanceCallbackMetricsManager metricsManager) { + RebalanceCallbackMetricsManager metricsManager, + CommittedOffsetCache committedOffsetCache) { this.log = logContext.logger(getClass()); this.subscriptions = subscriptions; this.time = time; this.metricsManager = metricsManager; + this.committedOffsetCache = committedOffsetCache; } public Exception invokePartitionsAssigned(final SortedSet assignedPartitions) { @@ -86,6 +89,9 @@ public Exception invokePartitionsRevoked(final SortedSet revoked if (!revokePausedPartitions.isEmpty()) log.info("The pause flag in partitions {} will be removed due to revocation.", revokePausedPartitions); + // clear the offset cache + committedOffsetCache.clear(revokedPartitions); + Optional listener = subscriptions.rebalanceListener(); if (listener.isPresent()) { @@ -116,6 +122,9 @@ public Exception invokePartitionsLost(final SortedSet lostPartit if (!lostPausedPartitions.isEmpty()) log.info("The pause flag in partitions {} will be removed due to partition lost.", lostPartitions); + // clear the offset cache + committedOffsetCache.clear(lostPartitions); + Optional listener = subscriptions.rebalanceListener(); if (listener.isPresent()) { diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java index ae39753f3d8e8..f2e1754650080 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java @@ -167,7 +167,8 @@ public static Supplier supplier(final Time time, final Metrics metrics, final OffsetCommitCallbackInvoker offsetCommitCallbackInvoker, final MemberStateListener applicationThreadMemberStateListener, - final Optional streamsRebalanceData + final Optional streamsRebalanceData, + final CommittedOffsetCache committedOffsetCache ) { return new CachedSupplier<>() { @Override @@ -216,7 +217,8 @@ protected RequestManagers create() { groupRebalanceConfig.groupId, groupRebalanceConfig.groupInstanceId, metrics, - metadata); + metadata, + committedOffsetCache); if (streamsRebalanceData.isPresent()) { streamsMembershipManager = new StreamsMembershipManager( groupRebalanceConfig.groupId, diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java index 8e44b3fcc25d5..f8f7b38cb3f40 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java @@ -1338,6 +1338,7 @@ private MemberStateListener captureGroupMetadataUpdateListener(final MockedStati any(), any(), applicationThreadMemberStateListener.capture(), + any(), any() )); return applicationThreadMemberStateListener.getValue(); @@ -1409,7 +1410,8 @@ private Optional captureStreamRebalanceData(final MockedSt any(), any(), any(), - streamRebalanceData.capture() + streamRebalanceData.capture(), + any() )); return streamRebalanceData.getValue(); } diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java index afbb81eb53fce..df7b54998c0db 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java @@ -152,7 +152,8 @@ public void testOffsetFetchRequestStateToStringBase() { retryBackoffMaxMs, OptionalDouble.of(0), metrics, - metadata); + metadata, + new CommittedOffsetCache(subscriptionState)); commitRequestManager.onMemberEpochUpdated(Optional.of(1), Uuid.randomUuid().toString()); Set requestedPartitions = Collections.singleton(new TopicPartition("topic-1", 1)); @@ -1623,7 +1624,8 @@ private CommitRequestManager create(final boolean autoCommitEnabled, final long retryBackoffMaxMs, OptionalDouble.of(0), metrics, - metadata)); + metadata, + new CommittedOffsetCache(subscriptionState))); } private ClientResponse buildOffsetFetchClientResponse( diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommittedOffsetCacheTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommittedOffsetCacheTest.java new file mode 100644 index 0000000000000..9954a77fc2995 --- /dev/null +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommittedOffsetCacheTest.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.TopicPartition; + +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; + +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class CommittedOffsetCacheTest { + + @Test + public void testSubscriptionStateShouldNotBeNull() { + assertThrows(NullPointerException.class, () -> new CommittedOffsetCache(null)); + } + + @Test + public void testAddToCache() { + var committedOffsetCache = new CommittedOffsetCache(getMockSubscriptionState()); + TopicPartition topicPartition = new TopicPartition("topic", 0); + OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(12345, Optional.of(15), "abc"); + + committedOffsetCache.tryAddToCache(topicPartition, offsetAndMetadata); + assertTrue(committedOffsetCache.isHitCache(Map.of(topicPartition, offsetAndMetadata))); + + committedOffsetCache.clear(Set.of(topicPartition)); + assertFalse(committedOffsetCache.isHitCache(Map.of(topicPartition, offsetAndMetadata))); + + TopicPartition topicPartition2 = new TopicPartition("topic2", 0); + OffsetAndMetadata offsetAndMetadata2 = new OffsetAndMetadata(7890, Optional.of(11), "efg"); + committedOffsetCache.tryAddToCache(Map.of(topicPartition, offsetAndMetadata, topicPartition2, offsetAndMetadata2)); + assertTrue(committedOffsetCache.isHitCache(Map.of(topicPartition, offsetAndMetadata))); + assertTrue(committedOffsetCache.isHitCache(Map.of(topicPartition2, offsetAndMetadata2))); + assertTrue(committedOffsetCache.isHitCache(Map.of(topicPartition, offsetAndMetadata, topicPartition2, offsetAndMetadata2))); + + committedOffsetCache.clear(Set.of(topicPartition, topicPartition2)); + assertFalse(committedOffsetCache.isHitCache(Map.of(topicPartition, offsetAndMetadata))); + assertFalse(committedOffsetCache.isHitCache(Map.of(topicPartition2, offsetAndMetadata2))); + assertFalse(committedOffsetCache.isHitCache(Map.of(topicPartition, offsetAndMetadata, topicPartition2, offsetAndMetadata2))); + + committedOffsetCache.tryAddToCache(Map.of(topicPartition, offsetAndMetadata)); + assertFalse(committedOffsetCache.isHitCache(Map.of(topicPartition, offsetAndMetadata, topicPartition2, offsetAndMetadata2))); + } + + private SubscriptionState getMockSubscriptionState() { + var mockSubscriptionState = Mockito.mock(SubscriptionState.class); + Mockito.when(mockSubscriptionState.hasAutoAssignedPartitions()).thenReturn(true); + return mockSubscriptionState; + } +} diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java index a839618cf7c3e..1edca9e7a4efe 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java @@ -3696,7 +3696,8 @@ private void supportStableFlag(final short upperVersion, final boolean expectThr autoCommitIntervalMs, null, true, - Optional.empty()); + Optional.empty(), + new CommittedOffsetCache(subscriptions)); client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); client.setNodeApiVersions(NodeApiVersions.create(ApiKeys.OFFSET_FETCH.id, (short) 0, upperVersion)); @@ -3864,7 +3865,8 @@ private ConsumerCoordinator buildCoordinator(final GroupRebalanceConfig rebalanc autoCommitIntervalMs, null, false, - Optional.empty()); + Optional.empty(), + new CommittedOffsetCache(subscriptions)); } private Collection getRevoked(final List owned, @@ -4110,7 +4112,8 @@ private void createRackAwareCoordinator(String rackId, MockPartitionAssignor ass rebalanceConfig = buildRebalanceConfig(rebalanceConfig.groupInstanceId, rackId); coordinator = new ConsumerCoordinator(rebalanceConfig, new LogContext(), consumerClient, Collections.singletonList(assignor), metadata, subscriptions, - metrics, consumerId + groupId, time, false, autoCommitIntervalMs, null, false, Optional.empty()); + metrics, consumerId + groupId, time, false, autoCommitIntervalMs, + null, false, Optional.empty(), new CommittedOffsetCache(subscriptions)); } private static MetadataResponse rackAwareMetadata(int numNodes, @@ -4190,6 +4193,7 @@ private void createMockHeartbeatThreadCoordinator() { null, false, Optional.empty(), - Optional.of(() -> Mockito.mock(BaseHeartbeatThread.class))); + Optional.of(() -> Mockito.mock(BaseHeartbeatThread.class)), + new CommittedOffsetCache(subscriptions)); } } diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManagerTest.java index 9edf178182831..5ae7cb2e1de89 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManagerTest.java @@ -2224,7 +2224,8 @@ private ConsumerRebalanceListenerInvoker consumerRebalanceListenerInvoker() { new LogContext(), subscriptionState, time, - new RebalanceCallbackMetricsManager(new Metrics(time)) + new RebalanceCallbackMetricsManager(new Metrics(time)), + new CommittedOffsetCache(subscriptionState) ); } diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/RequestManagersTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/RequestManagersTest.java index 67628c513406a..8415a1c1cf32e 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/RequestManagersTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/RequestManagersTest.java @@ -66,7 +66,8 @@ public void testMemberStateListenerRegistered() { new Metrics(), mock(OffsetCommitCallbackInvoker.class), listener, - Optional.empty() + Optional.empty(), + mock(CommittedOffsetCache.class) ).get(); assertTrue(requestManagers.consumerMembershipManager.isPresent()); assertTrue(requestManagers.streamsMembershipManager.isEmpty()); @@ -106,7 +107,8 @@ public void testStreamMemberStateListenerRegistered() { new Metrics(), mock(OffsetCommitCallbackInvoker.class), listener, - Optional.of(new StreamsRebalanceData(UUID.randomUUID(), Optional.empty(), Map.of(), Map.of())) + Optional.of(new StreamsRebalanceData(UUID.randomUUID(), Optional.empty(), Map.of(), Map.of())), + mock(CommittedOffsetCache.class) ).get(); assertTrue(requestManagers.streamsMembershipManager.isPresent()); assertTrue(requestManagers.streamsGroupHeartbeatRequestManager.isPresent());