From bf5d8ba1be299397847b19943a9b87b1a7b27789 Mon Sep 17 00:00:00 2001 From: Andrew Schofield Date: Tue, 14 Oct 2025 17:05:56 +0100 Subject: [PATCH 1/3] MINOR: Separate share consumer metadata class --- .../clients/consumer/KafkaShareConsumer.java | 4 +- .../internals/AbstractMembershipManager.java | 5 +- .../consumer/internals/FetchUtils.java | 3 +- .../internals/NetworkClientDelegate.java | 2 +- .../consumer/internals/RequestManagers.java | 3 +- .../internals/ShareConsumeRequestManager.java | 4 +- .../ShareConsumerDelegateCreator.java | 2 +- .../consumer/internals/ShareConsumerImpl.java | 10 +-- .../internals/ShareConsumerMetadata.java | 70 +++++++++++++++++++ .../internals/ShareFetchCollector.java | 4 +- .../internals/ShareMembershipManager.java | 5 +- .../events/ApplicationEventProcessor.java | 8 +-- .../KafkaShareConsumerMetricsTest.java | 28 ++++---- .../consumer/KafkaShareConsumerTest.java | 10 +-- .../ShareConsumeRequestManagerTest.java | 6 +- .../internals/ShareConsumerImplTest.java | 2 +- .../internals/ShareFetchCollectorTest.java | 5 +- 17 files changed, 122 insertions(+), 49 deletions(-) create mode 100644 clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerMetadata.java diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaShareConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaShareConsumer.java index 7f3bad2e318f2..b18ec93ec7b49 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaShareConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaShareConsumer.java @@ -17,9 +17,9 @@ package org.apache.kafka.clients.consumer; import org.apache.kafka.clients.KafkaClient; -import org.apache.kafka.clients.consumer.internals.ConsumerMetadata; import org.apache.kafka.clients.consumer.internals.ShareConsumerDelegate; import org.apache.kafka.clients.consumer.internals.ShareConsumerDelegateCreator; +import org.apache.kafka.clients.consumer.internals.ShareConsumerMetadata; import org.apache.kafka.clients.consumer.internals.SubscriptionState; import org.apache.kafka.clients.consumer.internals.metrics.KafkaShareConsumerMetrics; import org.apache.kafka.common.KafkaException; @@ -392,7 +392,7 @@ public KafkaShareConsumer(Map configs, final Time time, final KafkaClient client, final SubscriptionState subscriptions, - final ConsumerMetadata metadata) { + final ShareConsumerMetadata metadata) { delegate = CREATOR.create( logContext, clientId, groupId, config, keyDeserializer, valueDeserializer, time, client, subscriptions, metadata); diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java index ffe01c089e7bf..c93a844ff9be5 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.clients.consumer.internals; +import org.apache.kafka.clients.Metadata; import org.apache.kafka.clients.consumer.CloseOptions; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; @@ -108,7 +109,7 @@ public abstract class AbstractMembershipManager impl /** * Metadata that allows us to create the partitions needed for {@link ConsumerRebalanceListener}. */ - private final ConsumerMetadata metadata; + private final Metadata metadata; /** * Logger. @@ -204,7 +205,7 @@ public abstract class AbstractMembershipManager impl AbstractMembershipManager(String groupId, SubscriptionState subscriptions, - ConsumerMetadata metadata, + Metadata metadata, Logger log, Time time, RebalanceMetricsManager metricsManager, diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchUtils.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchUtils.java index 0b2faa58e7dbe..55083da20b4e0 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchUtils.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchUtils.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.clients.consumer.internals; +import org.apache.kafka.clients.Metadata; import org.apache.kafka.common.TopicPartition; /** @@ -44,7 +45,7 @@ public class FetchUtils { * @param subscriptions {@link SubscriptionState} to clear any internal read replica node * @param topicPartition {@link TopicPartition} for which this state change is related */ - static void requestMetadataUpdate(final ConsumerMetadata metadata, + static void requestMetadataUpdate(final Metadata metadata, final SubscriptionState subscriptions, final TopicPartition topicPartition) { metadata.requestUpdate(false); diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java index 3c280e39d0279..e85f244c80472 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java @@ -445,7 +445,7 @@ public CompletableFuture future() { */ public static Supplier supplier(final Time time, final LogContext logContext, - final ConsumerMetadata metadata, + final Metadata metadata, final ConsumerConfig config, final ApiVersions apiVersions, final Metrics metrics, diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java index ae39753f3d8e8..f0b06d2416b15 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java @@ -18,6 +18,7 @@ import org.apache.kafka.clients.ApiVersions; import org.apache.kafka.clients.GroupRebalanceConfig; +import org.apache.kafka.clients.Metadata; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler; import org.apache.kafka.common.internals.IdempotentCloser; @@ -318,7 +319,7 @@ protected RequestManagers create() { public static Supplier supplier(final Time time, final LogContext logContext, final BackgroundEventHandler backgroundEventHandler, - final ConsumerMetadata metadata, + final Metadata metadata, final SubscriptionState subscriptions, final ShareFetchBuffer fetchBuffer, final ConsumerConfig config, diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java index 51e3fb39dfb0e..08eed2ad12cf5 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java @@ -78,7 +78,7 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi private final Logger log; private final LogContext logContext; private final String groupId; - private final ConsumerMetadata metadata; + private final Metadata metadata; private final SubscriptionState subscriptions; private final FetchConfig fetchConfig; protected final ShareFetchBuffer shareFetchBuffer; @@ -103,7 +103,7 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi ShareConsumeRequestManager(final Time time, final LogContext logContext, final String groupId, - final ConsumerMetadata metadata, + final Metadata metadata, final SubscriptionState subscriptions, final FetchConfig fetchConfig, final ShareFetchBuffer shareFetchBuffer, diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerDelegateCreator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerDelegateCreator.java index 9eb5fd13699b0..e840a7da3dc0f 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerDelegateCreator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerDelegateCreator.java @@ -63,7 +63,7 @@ public ShareConsumerDelegate create(final LogContext logContext, final Time time, final KafkaClient client, final SubscriptionState subscriptions, - final ConsumerMetadata metadata) { + final ShareConsumerMetadata metadata) { try { Logger log = logContext.logger(getClass()); log.warn("Share groups and KafkaShareConsumer are part of a preview feature introduced by KIP-932, and are not recommended for use in production."); diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java index 12b01b5482e32..7969750325662 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java @@ -189,7 +189,7 @@ private void process(final ShareAcknowledgementCommitCallbackEvent event) { private final ShareFetchCollector fetchCollector; private final SubscriptionState subscriptions; - private final ConsumerMetadata metadata; + private final ShareConsumerMetadata metadata; private final Metrics metrics; private final int requestTimeoutMs; private final int defaultApiTimeoutMs; @@ -257,7 +257,7 @@ private void process(final ShareAcknowledgementCommitCallbackEvent event) { ClusterResourceListeners clusterResourceListeners = ClientUtils.configureClusterResourceListeners( metrics.reporters(), Arrays.asList(deserializers.keyDeserializer(), deserializers.valueDeserializer())); - this.metadata = new ConsumerMetadata(config, subscriptions, logContext, clusterResourceListeners); + this.metadata = new ShareConsumerMetadata(config, subscriptions, logContext, clusterResourceListeners); final List addresses = ClientUtils.parseAndValidateAddresses(config); metadata.bootstrap(addresses); @@ -350,7 +350,7 @@ private void process(final ShareAcknowledgementCommitCallbackEvent event) { final Time time, final KafkaClient client, final SubscriptionState subscriptions, - final ConsumerMetadata metadata) { + final ShareConsumerMetadata metadata) { this.clientId = clientId; this.groupId = groupId; this.log = logContext.logger(getClass()); @@ -441,7 +441,7 @@ private void process(final ShareAcknowledgementCommitCallbackEvent event) { final CompletableEventReaper backgroundEventReaper, final Metrics metrics, final SubscriptionState subscriptions, - final ConsumerMetadata metadata, + final ShareConsumerMetadata metadata, final int requestTimeoutMs, final int defaultApiTimeoutMs, final String groupId, @@ -492,7 +492,7 @@ interface ShareFetchCollectorFactory { ShareFetchCollector build( final LogContext logContext, - final ConsumerMetadata metadata, + final ShareConsumerMetadata metadata, final SubscriptionState subscriptions, final FetchConfig fetchConfig, final Deserializers deserializers diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerMetadata.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerMetadata.java new file mode 100644 index 0000000000000..aea7f745e2eab --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerMetadata.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.Metadata; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.internals.ClusterResourceListeners; +import org.apache.kafka.common.requests.MetadataRequest; +import org.apache.kafka.common.utils.LogContext; + +import java.util.ArrayList; +import java.util.List; + +public class ShareConsumerMetadata extends Metadata { + private final boolean allowAutoTopicCreation; + private final SubscriptionState subscription; + + public ShareConsumerMetadata(long refreshBackoffMs, + long refreshBackoffMaxMs, + long metadataExpireMs, + boolean allowAutoTopicCreation, + SubscriptionState subscription, + LogContext logContext, + ClusterResourceListeners clusterResourceListeners) { + super(refreshBackoffMs, refreshBackoffMaxMs, metadataExpireMs, logContext, clusterResourceListeners); + this.allowAutoTopicCreation = allowAutoTopicCreation; + this.subscription = subscription; + } + + public ShareConsumerMetadata(ConsumerConfig config, + SubscriptionState subscriptions, + LogContext logContext, + ClusterResourceListeners clusterResourceListeners) { + this(config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG), + config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG), + config.getLong(ConsumerConfig.METADATA_MAX_AGE_CONFIG), + config.getBoolean(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG), + subscriptions, + logContext, + clusterResourceListeners); + } + + public boolean allowAutoTopicCreation() { + return allowAutoTopicCreation; + } + + /** + * Constructs a metadata request builder for fetching cluster metadata for the topics the share consumer needs. + */ + @Override + public synchronized MetadataRequest.Builder newMetadataRequestBuilder() { + List topics = new ArrayList<>(); + topics.addAll(subscription.metadataTopics()); + return MetadataRequest.Builder.forTopicNames(topics, allowAutoTopicCreation); + } +} diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareFetchCollector.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareFetchCollector.java index c2a17d051b17e..8861824600edd 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareFetchCollector.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareFetchCollector.java @@ -42,13 +42,13 @@ public class ShareFetchCollector { private final Logger log; - private final ConsumerMetadata metadata; + private final ShareConsumerMetadata metadata; private final SubscriptionState subscriptions; private final FetchConfig fetchConfig; private final Deserializers deserializers; public ShareFetchCollector(final LogContext logContext, - final ConsumerMetadata metadata, + final ShareConsumerMetadata metadata, final SubscriptionState subscriptions, final FetchConfig fetchConfig, final Deserializers deserializers) { diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareMembershipManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareMembershipManager.java index 47ab87edb358d..130656f5b58d1 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareMembershipManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareMembershipManager.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.clients.consumer.internals; +import org.apache.kafka.clients.Metadata; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.internals.metrics.ShareRebalanceMetricsManager; import org.apache.kafka.common.Uuid; @@ -81,7 +82,7 @@ public ShareMembershipManager(LogContext logContext, String groupId, String rackId, SubscriptionState subscriptions, - ConsumerMetadata metadata, + Metadata metadata, Time time, Metrics metrics) { this(logContext, @@ -98,7 +99,7 @@ public ShareMembershipManager(LogContext logContext, String groupId, String rackId, SubscriptionState subscriptions, - ConsumerMetadata metadata, + Metadata metadata, Time time, ShareRebalanceMetricsManager metricsManager) { super(groupId, diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java index 853c5484df5be..31eef662ce834 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java @@ -16,11 +16,11 @@ */ package org.apache.kafka.clients.consumer.internals.events; +import org.apache.kafka.clients.Metadata; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.clients.consumer.internals.Acknowledgements; import org.apache.kafka.clients.consumer.internals.CachedSupplier; import org.apache.kafka.clients.consumer.internals.CommitRequestManager; -import org.apache.kafka.clients.consumer.internals.ConsumerMetadata; import org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread; import org.apache.kafka.clients.consumer.internals.OffsetAndTimestampInternal; import org.apache.kafka.clients.consumer.internals.RequestManagers; @@ -56,14 +56,14 @@ public class ApplicationEventProcessor implements EventProcessor { private final Logger log; - private final ConsumerMetadata metadata; + private final Metadata metadata; private final SubscriptionState subscriptions; private final RequestManagers requestManagers; private int metadataVersionSnapshot; public ApplicationEventProcessor(final LogContext logContext, final RequestManagers requestManagers, - final ConsumerMetadata metadata, + final Metadata metadata, final SubscriptionState subscriptions) { this.log = logContext.logger(ApplicationEventProcessor.class); this.requestManagers = requestManagers; @@ -740,7 +740,7 @@ private void process(final StreamsOnAllTasksLostCallbackCompletedEvent event) { * {@link ConsumerNetworkThread}. */ public static Supplier supplier(final LogContext logContext, - final ConsumerMetadata metadata, + final Metadata metadata, final SubscriptionState subscriptions, final Supplier requestManagersSupplier) { return new CachedSupplier<>() { diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaShareConsumerMetricsTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaShareConsumerMetricsTest.java index a5417c3e00fd1..3dbe0a01cbbb3 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaShareConsumerMetricsTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaShareConsumerMetricsTest.java @@ -20,8 +20,8 @@ import org.apache.kafka.clients.KafkaClient; import org.apache.kafka.clients.MockClient; import org.apache.kafka.clients.consumer.internals.AutoOffsetResetStrategy; -import org.apache.kafka.clients.consumer.internals.ConsumerMetadata; import org.apache.kafka.clients.consumer.internals.ShareConsumerImpl; +import org.apache.kafka.clients.consumer.internals.ShareConsumerMetadata; import org.apache.kafka.clients.consumer.internals.SubscriptionState; import org.apache.kafka.common.Metric; import org.apache.kafka.common.MetricName; @@ -80,7 +80,7 @@ public class KafkaShareConsumerMetricsTest { @Test public void testPollTimeMetrics() { - ConsumerMetadata metadata = createMetadata(subscription); + ShareConsumerMetadata metadata = createMetadata(subscription); MockClient client = new MockClient(time, metadata); initMetadata(client, Collections.singletonMap(topic, 1)); @@ -125,7 +125,7 @@ public void testPollTimeMetrics() { @Test public void testPollIdleRatio() { - ConsumerMetadata metadata = createMetadata(subscription); + ShareConsumerMetadata metadata = createMetadata(subscription); MockClient client = new MockClient(time, metadata); initMetadata(client, Collections.singletonMap(topic, 1)); @@ -172,7 +172,7 @@ private static boolean consumerMetricPresent(KafkaShareConsumer @Test public void testClosingConsumerUnregistersConsumerMetrics() { Time time = new MockTime(1L); - ConsumerMetadata metadata = createMetadata(subscription); + ShareConsumerMetadata metadata = createMetadata(subscription); MockClient client = new MockClient(time, metadata); initMetadata(client, Collections.singletonMap(topic, 1)); @@ -190,7 +190,7 @@ public void testClosingConsumerUnregistersConsumerMetrics() { @Test public void testRegisteringCustomMetricsDoesntAffectConsumerMetrics() { Time time = new MockTime(1L); - ConsumerMetadata metadata = createMetadata(subscription); + ShareConsumerMetadata metadata = createMetadata(subscription); MockClient client = new MockClient(time, metadata); initMetadata(client, Collections.singletonMap(topic, 1)); @@ -207,7 +207,7 @@ public void testRegisteringCustomMetricsWithSameNameDoesntAffectConsumerMetrics( try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister()) { appender.setClassLogger(ShareConsumerImpl.class, Level.DEBUG); Time time = new MockTime(1L); - ConsumerMetadata metadata = createMetadata(subscription); + ShareConsumerMetadata metadata = createMetadata(subscription); MockClient client = new MockClient(time, metadata); initMetadata(client, Collections.singletonMap(topic, 1)); @@ -224,7 +224,7 @@ public void testUnregisteringCustomMetricsWithSameNameDoesntAffectConsumerMetric try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister()) { appender.setClassLogger(ShareConsumerImpl.class, Level.DEBUG); Time time = new MockTime(1L); - ConsumerMetadata metadata = createMetadata(subscription); + ShareConsumerMetadata metadata = createMetadata(subscription); MockClient client = new MockClient(time, metadata); initMetadata(client, Collections.singletonMap(topic, 1)); @@ -244,7 +244,7 @@ public void testShouldOnlyCallMetricReporterMetricChangeOnceWithExistingConsumer mockedCommonClientConfigs.when(() -> CommonClientConfigs.telemetryReporter(anyString(), any())).thenReturn(Optional.of(clientTelemetryReporter)); Time time = new MockTime(1L); - ConsumerMetadata metadata = createMetadata(subscription); + ShareConsumerMetadata metadata = createMetadata(subscription); MockClient client = new MockClient(time, metadata); initMetadata(client, Collections.singletonMap(topic, 1)); @@ -265,7 +265,7 @@ public void testShouldNotCallMetricReporterMetricRemovalWithExistingConsumerMetr mockedCommonClientConfigs.when(() -> CommonClientConfigs.telemetryReporter(anyString(), any())).thenReturn(Optional.of(clientTelemetryReporter)); Time time = new MockTime(1L); - ConsumerMetadata metadata = createMetadata(subscription); + ShareConsumerMetadata metadata = createMetadata(subscription); MockClient client = new MockClient(time, metadata); initMetadata(client, Collections.singletonMap(topic, 1)); @@ -280,7 +280,7 @@ public void testShouldNotCallMetricReporterMetricRemovalWithExistingConsumerMetr @Test public void testUnregisteringNonexistingMetricsDoesntCauseError() { Time time = new MockTime(1L); - ConsumerMetadata metadata = createMetadata(subscription); + ShareConsumerMetadata metadata = createMetadata(subscription); MockClient client = new MockClient(time, metadata); initMetadata(client, Collections.singletonMap(topic, 1)); @@ -291,15 +291,15 @@ public void testUnregisteringNonexistingMetricsDoesntCauseError() { customMetrics.forEach((name, metric) -> assertDoesNotThrow(() -> consumer.unregisterMetricFromSubscription(metric))); } - private ConsumerMetadata createMetadata(SubscriptionState subscription) { - return new ConsumerMetadata(0, 0, Long.MAX_VALUE, false, false, + private ShareConsumerMetadata createMetadata(SubscriptionState subscription) { + return new ShareConsumerMetadata(0, 0, Long.MAX_VALUE, false, subscription, new LogContext(), new ClusterResourceListeners()); } private KafkaShareConsumer newShareConsumer(Time time, KafkaClient client, SubscriptionState subscription, - ConsumerMetadata metadata) { + ShareConsumerMetadata metadata) { return newShareConsumer( time, client, @@ -313,7 +313,7 @@ private KafkaShareConsumer newShareConsumer(Time time, private KafkaShareConsumer newShareConsumer(Time time, KafkaClient client, SubscriptionState subscriptions, - ConsumerMetadata metadata, + ShareConsumerMetadata metadata, String groupId, Optional> valueDeserializerOpt) { String clientId = "mock-consumer"; diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaShareConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaShareConsumerTest.java index 1a6d76dbabfc3..1346ea7fd09f1 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaShareConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaShareConsumerTest.java @@ -19,7 +19,7 @@ import org.apache.kafka.clients.KafkaClient; import org.apache.kafka.clients.MockClient; import org.apache.kafka.clients.consumer.internals.AutoOffsetResetStrategy; -import org.apache.kafka.clients.consumer.internals.ConsumerMetadata; +import org.apache.kafka.clients.consumer.internals.ShareConsumerMetadata; import org.apache.kafka.clients.consumer.internals.SubscriptionState; import org.apache.kafka.common.Node; import org.apache.kafka.common.TopicIdPartition; @@ -91,7 +91,7 @@ public class KafkaShareConsumerTest { @Test public void testVerifyHeartbeats() throws InterruptedException { - ConsumerMetadata metadata = new ConsumerMetadata(0, 0, Long.MAX_VALUE, false, false, + ShareConsumerMetadata metadata = new ShareConsumerMetadata(0, 0, Long.MAX_VALUE, false, subscription, new LogContext(), new ClusterResourceListeners()); MockClient client = new MockClient(time, metadata); @@ -146,7 +146,7 @@ public void testVerifyHeartbeats() throws InterruptedException { // @Flaky("KAFKA-18488") @Test public void testVerifyFetchAndCommitSyncImplicit() { - ConsumerMetadata metadata = new ConsumerMetadata(0, 0, Long.MAX_VALUE, false, false, + ShareConsumerMetadata metadata = new ShareConsumerMetadata(0, 0, Long.MAX_VALUE, false, subscription, new LogContext(), new ClusterResourceListeners()); MockClient client = new MockClient(time, metadata); @@ -223,7 +223,7 @@ public void testVerifyFetchAndCommitSyncImplicit() { //@Flaky("KAFKA-18794") @Test public void testVerifyFetchAndCloseImplicit() { - ConsumerMetadata metadata = new ConsumerMetadata(0, 0, Long.MAX_VALUE, false, false, + ShareConsumerMetadata metadata = new ShareConsumerMetadata(0, 0, Long.MAX_VALUE, false, subscription, new LogContext(), new ClusterResourceListeners()); MockClient client = new MockClient(time, metadata); @@ -279,7 +279,7 @@ public void testVerifyFetchAndCloseImplicit() { } private KafkaShareConsumer newShareConsumer(String clientId, - ConsumerMetadata metadata, + ShareConsumerMetadata metadata, KafkaClient client) { LogContext logContext = new LogContext(); Deserializer keyDeserializer = new StringDeserializer(); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java index a4268b7eca0a7..6c47aea417184 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java @@ -161,7 +161,7 @@ public class ShareConsumeRequestManagerTest { private final long defaultApiTimeoutMs = 60000; private MockTime time = new MockTime(1); private SubscriptionState subscriptions; - private ConsumerMetadata metadata; + private ShareConsumerMetadata metadata; private ShareFetchMetricsManager metricsManager; private MockClient client; private Metrics metrics; @@ -2671,7 +2671,7 @@ private void buildDependencies(MetricConfig metricConfig, LogContext logContext) { time = new MockTime(1, 0, 0); subscriptions = subscriptionState; - metadata = new ConsumerMetadata(0, 0, Long.MAX_VALUE, false, false, + metadata = new ShareConsumerMetadata(0, 0, Long.MAX_VALUE, false, subscriptions, logContext, new ClusterResourceListeners()); client = new MockClient(time, metadata); metrics = new Metrics(metricConfig, time); @@ -2696,7 +2696,7 @@ private class TestableShareConsumeRequestManager extends ShareConsumeReque public TestableShareConsumeRequestManager(LogContext logContext, String groupId, - ConsumerMetadata metadata, + ShareConsumerMetadata metadata, SubscriptionState subscriptions, FetchConfig fetchConfig, ShareFetchBuffer shareFetchBuffer, diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImplTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImplTest.java index 5dddd0772df2f..edb244b06aa26 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImplTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImplTest.java @@ -102,7 +102,7 @@ public class ShareConsumerImplTest { private final Time time = new MockTime(1); private final ShareFetchCollector fetchCollector = mock(ShareFetchCollector.class); - private final ConsumerMetadata metadata = mock(ConsumerMetadata.class); + private final ShareConsumerMetadata metadata = mock(ShareConsumerMetadata.class); private final ApplicationEventHandler applicationEventHandler = mock(ApplicationEventHandler.class); private final LinkedBlockingQueue backgroundEventQueue = new LinkedBlockingQueue<>(); private final CompletableEventReaper backgroundEventReaper = mock(CompletableEventReaper.class); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareFetchCollectorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareFetchCollectorTest.java index 194d9b2a2c459..afac3a76d9538 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareFetchCollectorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareFetchCollectorTest.java @@ -76,7 +76,7 @@ public class ShareFetchCollectorTest { private SubscriptionState subscriptions; private FetchConfig fetchConfig; - private ConsumerMetadata metadata; + private ShareConsumerMetadata metadata; private ShareFetchBuffer fetchBuffer; private Deserializers deserializers; private ShareFetchCollector fetchCollector; @@ -246,12 +246,11 @@ private void buildDependencies() { subscriptions = createSubscriptionState(config, logContext); fetchConfig = new FetchConfig(config); - metadata = new ConsumerMetadata( + metadata = new ShareConsumerMetadata( 0, 1000, 10000, false, - false, subscriptions, logContext, new ClusterResourceListeners()); From ba1bf272afa029f001f782d23670b6ede39a17c9 Mon Sep 17 00:00:00 2001 From: Andrew Schofield Date: Wed, 15 Oct 2025 08:48:27 +0100 Subject: [PATCH 2/3] Add simplified topic retention back in --- .../clients/consumer/internals/ShareConsumerMetadata.java | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerMetadata.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerMetadata.java index aea7f745e2eab..20b99540e1958 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerMetadata.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerMetadata.java @@ -67,4 +67,12 @@ public synchronized MetadataRequest.Builder newMetadataRequestBuilder() { topics.addAll(subscription.metadataTopics()); return MetadataRequest.Builder.forTopicNames(topics, allowAutoTopicCreation); } + + /** + * Check if the metadata for the topic should be retained, based on the topic name. + */ + @Override + public synchronized boolean retainTopic(String topic, boolean isInternal, long nowMs) { + return subscription.needsMetadata(topic); + } } From 7f27e943c64eeef12b6de310de5999fed2fa5a03 Mon Sep 17 00:00:00 2001 From: Andrew Schofield Date: Wed, 15 Oct 2025 16:47:39 +0100 Subject: [PATCH 3/3] Review comments --- .../kafka/clients/consumer/internals/RequestManagers.java | 3 +-- .../consumer/internals/ShareConsumeRequestManager.java | 4 ++-- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java index f0b06d2416b15..4c6d8b17b5a45 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java @@ -18,7 +18,6 @@ import org.apache.kafka.clients.ApiVersions; import org.apache.kafka.clients.GroupRebalanceConfig; -import org.apache.kafka.clients.Metadata; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler; import org.apache.kafka.common.internals.IdempotentCloser; @@ -319,7 +318,7 @@ protected RequestManagers create() { public static Supplier supplier(final Time time, final LogContext logContext, final BackgroundEventHandler backgroundEventHandler, - final Metadata metadata, + final ShareConsumerMetadata metadata, final SubscriptionState subscriptions, final ShareFetchBuffer fetchBuffer, final ConsumerConfig config, diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java index 08eed2ad12cf5..647b5b7e4b41b 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java @@ -78,7 +78,7 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi private final Logger log; private final LogContext logContext; private final String groupId; - private final Metadata metadata; + private final ShareConsumerMetadata metadata; private final SubscriptionState subscriptions; private final FetchConfig fetchConfig; protected final ShareFetchBuffer shareFetchBuffer; @@ -103,7 +103,7 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi ShareConsumeRequestManager(final Time time, final LogContext logContext, final String groupId, - final Metadata metadata, + final ShareConsumerMetadata metadata, final SubscriptionState subscriptions, final FetchConfig fetchConfig, final ShareFetchBuffer shareFetchBuffer,