Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,7 @@ public void onGroupAssignmentUpdated(Set<TopicPartition> partitions) {
setGroupAssignmentSnapshot(partitions);
}
};
private final CommittedOffsetCache committedOffsetCache;

public AsyncKafkaConsumer(final ConsumerConfig config,
final Deserializer<K> keyDeserializer,
Expand Down Expand Up @@ -444,6 +445,7 @@ public AsyncKafkaConsumer(final ConsumerConfig config,
);
this.offsetCommitCallbackInvoker = new OffsetCommitCallbackInvoker(interceptors);
this.groupMetadata.set(initializeGroupMetadata(config, groupRebalanceConfig));
this.committedOffsetCache = new CommittedOffsetCache(subscriptions);
final Supplier<RequestManagers> requestManagersSupplier = RequestManagers.supplier(time,
logContext,
backgroundEventHandler,
Expand All @@ -459,7 +461,8 @@ public AsyncKafkaConsumer(final ConsumerConfig config,
metrics,
offsetCommitCallbackInvoker,
memberStateListener,
streamsRebalanceData
streamsRebalanceData,
committedOffsetCache
);
final Supplier<ApplicationEventProcessor> applicationEventProcessorSupplier = ApplicationEventProcessor.supplier(logContext,
metadata,
Expand All @@ -479,7 +482,8 @@ public AsyncKafkaConsumer(final ConsumerConfig config,
logContext,
subscriptions,
time,
new RebalanceCallbackMetricsManager(metrics)
new RebalanceCallbackMetricsManager(metrics),
committedOffsetCache
);
this.streamsRebalanceListenerInvoker = streamsRebalanceData.map(s ->
new StreamsRebalanceListenerInvoker(logContext, s));
Expand Down Expand Up @@ -564,6 +568,7 @@ public AsyncKafkaConsumer(final ConsumerConfig config,
time,
asyncConsumerMetrics
);
this.committedOffsetCache = new CommittedOffsetCache(subscriptions);
}

AsyncKafkaConsumer(LogContext logContext,
Expand Down Expand Up @@ -616,11 +621,13 @@ public AsyncKafkaConsumer(final ConsumerConfig config,
time,
asyncConsumerMetrics
);
this.committedOffsetCache = new CommittedOffsetCache(subscriptions);
this.rebalanceListenerInvoker = new ConsumerRebalanceListenerInvoker(
logContext,
subscriptions,
time,
new RebalanceCallbackMetricsManager(metrics)
new RebalanceCallbackMetricsManager(metrics),
committedOffsetCache
);
ApiVersions apiVersions = new ApiVersions();
Supplier<NetworkClientDelegate> networkClientDelegateSupplier = () -> new NetworkClientDelegate(
Expand Down Expand Up @@ -650,7 +657,8 @@ public AsyncKafkaConsumer(final ConsumerConfig config,
metrics,
offsetCommitCallbackInvoker,
memberStateListener,
Optional.empty()
Optional.empty(),
committedOffsetCache
);
Supplier<ApplicationEventProcessor> applicationEventProcessorSupplier = ApplicationEventProcessor.supplier(
logContext,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,8 @@ public class ClassicKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {

// to keep from repeatedly scanning subscriptions in poll(), cache the result during metadata updates
private boolean cachedSubscriptionHasAllFetchPositions;
// cache of committed offset
private final CommittedOffsetCache committedOffsetCache;

ClassicKafkaConsumer(ConsumerConfig config, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer) {
try {
Expand Down Expand Up @@ -183,6 +185,7 @@ public class ClassicKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
this.interceptors = new ConsumerInterceptors<>(interceptorList, metrics);
this.deserializers = new Deserializers<>(config, keyDeserializer, valueDeserializer, metrics);
this.subscriptions = createSubscriptionState(config, logContext);
this.committedOffsetCache = new CommittedOffsetCache(subscriptions);
ClusterResourceListeners clusterResourceListeners = ClientUtils.configureClusterResourceListeners(
metrics.reporters(),
interceptorList,
Expand Down Expand Up @@ -230,7 +233,8 @@ public class ClassicKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
config.getInt(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG),
this.interceptors,
config.getBoolean(THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED),
clientTelemetryReporter);
clientTelemetryReporter,
committedOffsetCache);
}
this.fetcher = new Fetcher<>(
logContext,
Expand Down Expand Up @@ -302,6 +306,7 @@ public class ClassicKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
this.retryBackoffMaxMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG);
this.requestTimeoutMs = config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG);
this.clientTelemetryReporter = Optional.empty();
this.committedOffsetCache = new CommittedOffsetCache(subscriptions);

int sessionTimeoutMs = config.getInt(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG);
int rebalanceTimeoutMs = config.getInt(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG);
Expand Down Expand Up @@ -347,7 +352,8 @@ public class ClassicKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
autoCommitIntervalMs,
interceptors,
throwOnStableOffsetNotSupported,
clientTelemetryReporter
clientTelemetryReporter,
committedOffsetCache
);
} else {
this.coordinator = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ public class CommitRequestManager implements RequestManager, MemberStateListener
* the latest memberEpoch received from the broker.
*/
private final MemberInfo memberInfo;
private final CommittedOffsetCache committedOffsetCache;

public CommitRequestManager(
final Time time,
Expand All @@ -115,7 +116,8 @@ public CommitRequestManager(
final String groupId,
final Optional<String> groupInstanceId,
final Metrics metrics,
final ConsumerMetadata metadata) {
final ConsumerMetadata metadata,
final CommittedOffsetCache committedOffsetCache) {
this(time,
logContext,
subscriptions,
Expand All @@ -128,10 +130,12 @@ public CommitRequestManager(
config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG),
OptionalDouble.empty(),
metrics,
metadata);
metadata,
committedOffsetCache);
}

// Visible for testing
@SuppressWarnings({"checkstyle:ParameterNumber"})
CommitRequestManager(
final Time time,
final LogContext logContext,
Expand All @@ -145,7 +149,9 @@ public CommitRequestManager(
final long retryBackoffMaxMs,
final OptionalDouble jitter,
final Metrics metrics,
final ConsumerMetadata metadata) {
final ConsumerMetadata metadata,
final CommittedOffsetCache committedOffsetCache) {

Objects.requireNonNull(coordinatorRequestManager, "Coordinator is needed upon committing offsets");
this.time = time;
this.logContext = logContext;
Expand All @@ -171,6 +177,7 @@ public CommitRequestManager(
this.metricsManager = new OffsetCommitMetricsManager(metrics);
this.offsetCommitCallbackInvoker = offsetCommitCallbackInvoker;
this.lastEpochSentOnCommit = Optional.empty();
this.committedOffsetCache = committedOffsetCache;
}

/**
Expand Down Expand Up @@ -251,6 +258,8 @@ private CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> requestAutoCom
CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> result;
if (requestState.offsets.isEmpty()) {
result = CompletableFuture.completedFuture(Collections.emptyMap());
} else if (committedOffsetCache.isHitCache(requestState.offsets)) {
result = CompletableFuture.completedFuture(requestState.offsets);
} else {
autocommit.setInflightCommitStatus(true);
OffsetCommitRequestState request = pendingRequests.addOffsetCommitRequest(requestState);
Expand Down Expand Up @@ -377,6 +386,7 @@ private void autoCommitSyncBeforeRebalanceWithRetries(OffsetCommitRequestState r
autoCommitState.ifPresent(autoCommitState -> autoCommitState.setInflightCommitStatus(false));
if (throwable == null) {
offsetCommitCallbackInvoker.enqueueInterceptorInvocation(allConsumedOffsets);
committedOffsetCache.tryAddToCache(allConsumedOffsets);
log.debug("Completed auto-commit of offsets {}", allConsumedOffsets);
} else if (throwable instanceof RetriableCommitFailedException) {
log.debug("Auto-commit of offsets {} failed due to retriable error: {}",
Expand Down Expand Up @@ -411,6 +421,7 @@ public CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> commitAsync(fin
if (error != null) {
asyncCommitResult.completeExceptionally(commitAsyncExceptionForError(error));
} else {
committedOffsetCache.tryAddToCache(committedOffsets);
asyncCommitResult.complete(offsets);
}
});
Expand Down Expand Up @@ -467,6 +478,7 @@ private void commitSyncWithRetries(OffsetCommitRequestState requestAttempt,
// timeout hasn't expired.
requestAttempt.future.whenComplete((res, error) -> {
if (error == null) {
committedOffsetCache.tryAddToCache(res);
result.complete(requestAttempt.offsets);
} else {
if (error instanceof RetriableException) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.consumer.internals;

import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class CommittedOffsetCache {
private final SubscriptionState subscriptions;
private final Map<TopicPartition, OffsetAndMetadata> latestCommittedOffsets;

CommittedOffsetCache(SubscriptionState subscriptions) {
this.subscriptions = Objects.requireNonNull(subscriptions);
this.latestCommittedOffsets = new ConcurrentHashMap<>();
}

public void tryAddToCache(Map<TopicPartition, OffsetAndMetadata> offsets) {
if (subscriptions.hasAutoAssignedPartitions() && offsets != null && !offsets.isEmpty()) {
latestCommittedOffsets.putAll(offsets);
}
}

public void tryAddToCache(TopicPartition tp, OffsetAndMetadata metadata) {
if (subscriptions.hasAutoAssignedPartitions() && tp != null && metadata != null) {
latestCommittedOffsets.put(tp, metadata);
}
}

public boolean isHitCache(Map<TopicPartition, OffsetAndMetadata> offsets) {
// If the current consumer mode is not subscribe, or there are in-flight async commits, simply return false
if (!subscriptions.hasAutoAssignedPartitions() || offsets == null || offsets.isEmpty()) {
return false;
}

for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : offsets.entrySet()) {
OffsetAndMetadata cachedOffset = latestCommittedOffsets.get(entry.getKey());
if (cachedOffset == null || !cachedOffset.equals(entry.getValue())) {
return false;
}
}
return true;
}

public void clear(Set<TopicPartition> topicPartitions) {
if (topicPartitions != null && !topicPartitions.isEmpty()) {
for (TopicPartition topicPartition : topicPartitions) {
latestCommittedOffsets.remove(topicPartition);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ private boolean sameRequest(final Set<TopicPartition> currentRequest, final Gene
// it'll set to rebalance timeout so that the member can join the group successfully
// even though offset commit failed.
private Timer joinPrepareTimer = null;
private final CommittedOffsetCache committedOffsetCache;

/**
* Initialize the coordination manager.
Expand All @@ -178,7 +179,8 @@ public ConsumerCoordinator(GroupRebalanceConfig rebalanceConfig,
int autoCommitIntervalMs,
ConsumerInterceptors<?, ?> interceptors,
boolean throwOnFetchStableOffsetsUnsupported,
Optional<ClientTelemetryReporter> clientTelemetryReporter) {
Optional<ClientTelemetryReporter> clientTelemetryReporter,
CommittedOffsetCache committedOffsetCache) {
this(rebalanceConfig,
logContext,
client,
Expand All @@ -193,7 +195,8 @@ public ConsumerCoordinator(GroupRebalanceConfig rebalanceConfig,
interceptors,
throwOnFetchStableOffsetsUnsupported,
clientTelemetryReporter,
Optional.empty());
Optional.empty(),
committedOffsetCache);
}

/**
Expand All @@ -213,7 +216,8 @@ public ConsumerCoordinator(GroupRebalanceConfig rebalanceConfig,
ConsumerInterceptors<?, ?> interceptors,
boolean throwOnFetchStableOffsetsUnsupported,
Optional<ClientTelemetryReporter> clientTelemetryReporter,
Optional<Supplier<BaseHeartbeatThread>> heartbeatThreadSupplier) {
Optional<Supplier<BaseHeartbeatThread>> heartbeatThreadSupplier,
CommittedOffsetCache committedOffsetCache) {
super(rebalanceConfig,
logContext,
client,
Expand Down Expand Up @@ -275,9 +279,11 @@ public ConsumerCoordinator(GroupRebalanceConfig rebalanceConfig,
logContext,
subscriptions,
time,
new RebalanceCallbackMetricsManager(metrics, metricGrpPrefix)
new RebalanceCallbackMetricsManager(metrics, metricGrpPrefix),
committedOffsetCache
);
this.metadata.requestUpdate(true);
this.committedOffsetCache = committedOffsetCache;
}

// package private for testing
Expand Down Expand Up @@ -1044,12 +1050,16 @@ void invokeCompletedOffsetCommitCallbacks() {
}

public RequestFuture<Void> commitOffsetsAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, final OffsetCommitCallback callback) {
return commitOffsetsAsync(offsets, callback, false);
}

private RequestFuture<Void> commitOffsetsAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, final OffsetCommitCallback callback, boolean autoCommit) {
invokeCompletedOffsetCommitCallbacks();

RequestFuture<Void> future = null;
if (offsets.isEmpty()) {
// No need to check coordinator if offsets is empty since commit of empty offsets is completed locally.
future = doCommitOffsetsAsync(offsets, callback);
future = doCommitOffsetsAsync(offsets, callback, autoCommit);
} else if (!coordinatorUnknownAndUnreadyAsync()) {
// we need to make sure coordinator is ready before committing, since
// this is for async committing we do not try to block, but just try once to
Expand All @@ -1061,7 +1071,7 @@ public RequestFuture<Void> commitOffsetsAsync(final Map<TopicPartition, OffsetAn
// it's not known or ready, since this is the only place we can send such request
// under manual assignment (there we would not have heartbeat thread trying to auto-rediscover
// the coordinator).
future = doCommitOffsetsAsync(offsets, callback);
future = doCommitOffsetsAsync(offsets, callback, autoCommit);
} else {
// we don't know the current coordinator, so try to find it and then send the commit
// or fail (we don't want recursive retries which can cause offset commits to arrive
Expand All @@ -1074,7 +1084,7 @@ public RequestFuture<Void> commitOffsetsAsync(final Map<TopicPartition, OffsetAn
@Override
public void onSuccess(Void value) {
pendingAsyncCommits.decrementAndGet();
doCommitOffsetsAsync(offsets, callback);
doCommitOffsetsAsync(offsets, callback, autoCommit);
client.pollNoWakeup();
}

Expand All @@ -1094,8 +1104,16 @@ public void onFailure(RuntimeException e) {
return future;
}

private RequestFuture<Void> doCommitOffsetsAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, final OffsetCommitCallback callback) {
RequestFuture<Void> future = sendOffsetCommitRequest(offsets);
private RequestFuture<Void> doCommitOffsetsAsync(final Map<TopicPartition, OffsetAndMetadata> offsets,
final OffsetCommitCallback callback, final boolean autoCommit) {
RequestFuture<Void> future;
// the async commit of classic consumer maybe parallel sending, so we should ensure that
// all the commits have finished, otherwise, the cache becomes invalid
if (autoCommit && inFlightAsyncCommits.get() == 0 && committedOffsetCache.isHitCache(offsets)) {
future = RequestFuture.voidSuccess();
} else {
future = sendOffsetCommitRequest(offsets);
}
inFlightAsyncCommits.incrementAndGet();
final OffsetCommitCallback cb = callback == null ? defaultOffsetCommitCallback : callback;
future.addListener(new RequestFutureListener<>() {
Expand Down Expand Up @@ -1240,7 +1258,7 @@ private RequestFuture<Void> autoCommitOffsetsAsync() {
} else {
log.debug("Completed asynchronous auto-commit of offsets {}", offsets);
}
});
}, true);
}

private RequestFuture<Void> maybeAutoCommitOffsetsAsync() {
Expand Down Expand Up @@ -1365,6 +1383,7 @@ public void handle(OffsetCommitResponse commitResponse, RequestFuture<Void> futu

Errors error = Errors.forCode(partition.errorCode());
if (error == Errors.NONE) {
committedOffsetCache.tryAddToCache(tp, offsetAndMetadata);
log.debug("Committed offset {} for partition {}", offset, tp);
} else {
if (error.exception() instanceof RetriableException) {
Expand Down
Loading
Loading