Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@
package org.apache.kafka.clients.consumer;

import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.clients.consumer.internals.ConsumerMetadata;
import org.apache.kafka.clients.consumer.internals.ShareConsumerDelegate;
import org.apache.kafka.clients.consumer.internals.ShareConsumerDelegateCreator;
import org.apache.kafka.clients.consumer.internals.ShareConsumerMetadata;
import org.apache.kafka.clients.consumer.internals.SubscriptionState;
import org.apache.kafka.clients.consumer.internals.metrics.KafkaShareConsumerMetrics;
import org.apache.kafka.common.KafkaException;
Expand Down Expand Up @@ -392,7 +392,7 @@ public KafkaShareConsumer(Map<String, Object> configs,
final Time time,
final KafkaClient client,
final SubscriptionState subscriptions,
final ConsumerMetadata metadata) {
final ShareConsumerMetadata metadata) {
delegate = CREATOR.create(
logContext, clientId, groupId, config, keyDeserializer, valueDeserializer,
time, client, subscriptions, metadata);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.kafka.clients.consumer.internals;

import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.consumer.CloseOptions;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
Expand Down Expand Up @@ -108,7 +109,7 @@ public abstract class AbstractMembershipManager<R extends AbstractResponse> impl
/**
* Metadata that allows us to create the partitions needed for {@link ConsumerRebalanceListener}.
*/
private final ConsumerMetadata metadata;
private final Metadata metadata;

/**
* Logger.
Expand Down Expand Up @@ -204,7 +205,7 @@ public abstract class AbstractMembershipManager<R extends AbstractResponse> impl

AbstractMembershipManager(String groupId,
SubscriptionState subscriptions,
ConsumerMetadata metadata,
Metadata metadata,
Logger log,
Time time,
RebalanceMetricsManager metricsManager,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.kafka.clients.consumer.internals;

import org.apache.kafka.clients.Metadata;
import org.apache.kafka.common.TopicPartition;

/**
Expand Down Expand Up @@ -44,7 +45,7 @@ public class FetchUtils {
* @param subscriptions {@link SubscriptionState} to clear any internal read replica node
* @param topicPartition {@link TopicPartition} for which this state change is related
*/
static void requestMetadataUpdate(final ConsumerMetadata metadata,
static void requestMetadataUpdate(final Metadata metadata,
final SubscriptionState subscriptions,
final TopicPartition topicPartition) {
metadata.requestUpdate(false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -445,7 +445,7 @@ public CompletableFuture<ClientResponse> future() {
*/
public static Supplier<NetworkClientDelegate> supplier(final Time time,
final LogContext logContext,
final ConsumerMetadata metadata,
final Metadata metadata,
final ConsumerConfig config,
final ApiVersions apiVersions,
final Metrics metrics,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,7 @@ protected RequestManagers create() {
public static Supplier<RequestManagers> supplier(final Time time,
final LogContext logContext,
final BackgroundEventHandler backgroundEventHandler,
final ConsumerMetadata metadata,
final ShareConsumerMetadata metadata,
final SubscriptionState subscriptions,
final ShareFetchBuffer fetchBuffer,
final ConsumerConfig config,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi
private final Logger log;
private final LogContext logContext;
private final String groupId;
private final ConsumerMetadata metadata;
private final ShareConsumerMetadata metadata;
private final SubscriptionState subscriptions;
private final FetchConfig fetchConfig;
protected final ShareFetchBuffer shareFetchBuffer;
Expand All @@ -103,7 +103,7 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi
ShareConsumeRequestManager(final Time time,
final LogContext logContext,
final String groupId,
final ConsumerMetadata metadata,
final ShareConsumerMetadata metadata,
final SubscriptionState subscriptions,
final FetchConfig fetchConfig,
final ShareFetchBuffer shareFetchBuffer,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public <K, V> ShareConsumerDelegate<K, V> create(final LogContext logContext,
final Time time,
final KafkaClient client,
final SubscriptionState subscriptions,
final ConsumerMetadata metadata) {
final ShareConsumerMetadata metadata) {
try {
Logger log = logContext.logger(getClass());
log.warn("Share groups and KafkaShareConsumer are part of a preview feature introduced by KIP-932, and are not recommended for use in production.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ private void process(final ShareAcknowledgementCommitCallbackEvent event) {
private final ShareFetchCollector<K, V> fetchCollector;

private final SubscriptionState subscriptions;
private final ConsumerMetadata metadata;
private final ShareConsumerMetadata metadata;
private final Metrics metrics;
private final int requestTimeoutMs;
private final int defaultApiTimeoutMs;
Expand Down Expand Up @@ -257,7 +257,7 @@ private void process(final ShareAcknowledgementCommitCallbackEvent event) {
ClusterResourceListeners clusterResourceListeners = ClientUtils.configureClusterResourceListeners(
metrics.reporters(),
Arrays.asList(deserializers.keyDeserializer(), deserializers.valueDeserializer()));
this.metadata = new ConsumerMetadata(config, subscriptions, logContext, clusterResourceListeners);
this.metadata = new ShareConsumerMetadata(config, subscriptions, logContext, clusterResourceListeners);
final List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config);
metadata.bootstrap(addresses);

Expand Down Expand Up @@ -350,7 +350,7 @@ private void process(final ShareAcknowledgementCommitCallbackEvent event) {
final Time time,
final KafkaClient client,
final SubscriptionState subscriptions,
final ConsumerMetadata metadata) {
final ShareConsumerMetadata metadata) {
this.clientId = clientId;
this.groupId = groupId;
this.log = logContext.logger(getClass());
Expand Down Expand Up @@ -441,7 +441,7 @@ private void process(final ShareAcknowledgementCommitCallbackEvent event) {
final CompletableEventReaper backgroundEventReaper,
final Metrics metrics,
final SubscriptionState subscriptions,
final ConsumerMetadata metadata,
final ShareConsumerMetadata metadata,
final int requestTimeoutMs,
final int defaultApiTimeoutMs,
final String groupId,
Expand Down Expand Up @@ -492,7 +492,7 @@ interface ShareFetchCollectorFactory<K, V> {

ShareFetchCollector<K, V> build(
final LogContext logContext,
final ConsumerMetadata metadata,
final ShareConsumerMetadata metadata,
final SubscriptionState subscriptions,
final FetchConfig fetchConfig,
final Deserializers<K, V> deserializers
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.consumer.internals;

import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.internals.ClusterResourceListeners;
import org.apache.kafka.common.requests.MetadataRequest;
import org.apache.kafka.common.utils.LogContext;

import java.util.ArrayList;
import java.util.List;

public class ShareConsumerMetadata extends Metadata {
private final boolean allowAutoTopicCreation;
private final SubscriptionState subscription;

public ShareConsumerMetadata(long refreshBackoffMs,
long refreshBackoffMaxMs,
long metadataExpireMs,
boolean allowAutoTopicCreation,
SubscriptionState subscription,
LogContext logContext,
ClusterResourceListeners clusterResourceListeners) {
super(refreshBackoffMs, refreshBackoffMaxMs, metadataExpireMs, logContext, clusterResourceListeners);
this.allowAutoTopicCreation = allowAutoTopicCreation;
this.subscription = subscription;
}

public ShareConsumerMetadata(ConsumerConfig config,
SubscriptionState subscriptions,
LogContext logContext,
ClusterResourceListeners clusterResourceListeners) {
this(config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG),
config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG),
config.getLong(ConsumerConfig.METADATA_MAX_AGE_CONFIG),
config.getBoolean(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG),
subscriptions,
logContext,
clusterResourceListeners);
}

public boolean allowAutoTopicCreation() {
return allowAutoTopicCreation;
}

/**
* Constructs a metadata request builder for fetching cluster metadata for the topics the share consumer needs.
*/
@Override
public synchronized MetadataRequest.Builder newMetadataRequestBuilder() {
List<String> topics = new ArrayList<>();
topics.addAll(subscription.metadataTopics());
return MetadataRequest.Builder.forTopicNames(topics, allowAutoTopicCreation);
}

/**
* Check if the metadata for the topic should be retained, based on the topic name.
*/
@Override
public synchronized boolean retainTopic(String topic, boolean isInternal, long nowMs) {
return subscription.needsMetadata(topic);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,13 @@
public class ShareFetchCollector<K, V> {

private final Logger log;
private final ConsumerMetadata metadata;
private final ShareConsumerMetadata metadata;
private final SubscriptionState subscriptions;
private final FetchConfig fetchConfig;
private final Deserializers<K, V> deserializers;

public ShareFetchCollector(final LogContext logContext,
final ConsumerMetadata metadata,
final ShareConsumerMetadata metadata,
final SubscriptionState subscriptions,
final FetchConfig fetchConfig,
final Deserializers<K, V> deserializers) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.kafka.clients.consumer.internals;

import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.internals.metrics.ShareRebalanceMetricsManager;
import org.apache.kafka.common.Uuid;
Expand Down Expand Up @@ -81,7 +82,7 @@ public ShareMembershipManager(LogContext logContext,
String groupId,
String rackId,
SubscriptionState subscriptions,
ConsumerMetadata metadata,
Metadata metadata,
Time time,
Metrics metrics) {
this(logContext,
Expand All @@ -98,7 +99,7 @@ public ShareMembershipManager(LogContext logContext,
String groupId,
String rackId,
SubscriptionState subscriptions,
ConsumerMetadata metadata,
Metadata metadata,
Time time,
ShareRebalanceMetricsManager metricsManager) {
super(groupId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@
*/
package org.apache.kafka.clients.consumer.internals.events;

import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.internals.Acknowledgements;
import org.apache.kafka.clients.consumer.internals.CachedSupplier;
import org.apache.kafka.clients.consumer.internals.CommitRequestManager;
import org.apache.kafka.clients.consumer.internals.ConsumerMetadata;
import org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread;
import org.apache.kafka.clients.consumer.internals.OffsetAndTimestampInternal;
import org.apache.kafka.clients.consumer.internals.RequestManagers;
Expand Down Expand Up @@ -56,14 +56,14 @@
public class ApplicationEventProcessor implements EventProcessor<ApplicationEvent> {

private final Logger log;
private final ConsumerMetadata metadata;
private final Metadata metadata;
private final SubscriptionState subscriptions;
private final RequestManagers requestManagers;
private int metadataVersionSnapshot;

public ApplicationEventProcessor(final LogContext logContext,
final RequestManagers requestManagers,
final ConsumerMetadata metadata,
final Metadata metadata,
final SubscriptionState subscriptions) {
this.log = logContext.logger(ApplicationEventProcessor.class);
this.requestManagers = requestManagers;
Expand Down Expand Up @@ -740,7 +740,7 @@ private void process(final StreamsOnAllTasksLostCallbackCompletedEvent event) {
* {@link ConsumerNetworkThread}.
*/
public static Supplier<ApplicationEventProcessor> supplier(final LogContext logContext,
final ConsumerMetadata metadata,
final Metadata metadata,
final SubscriptionState subscriptions,
final Supplier<RequestManagers> requestManagersSupplier) {
return new CachedSupplier<>() {
Expand Down
Loading