From 951181802e65cda6e539f1a01f2ee0c033951780 Mon Sep 17 00:00:00 2001 From: chickenchickenlove Date: Thu, 24 Apr 2025 20:42:58 +0900 Subject: [PATCH 1/6] spring-projectsGH-3807: Necessity of KafkaHandler on single method class Signed-off-by: chickenchickenlove --- .../kafka/annotation/KafkaHandlerParser.java | 81 ++++++ ...kaListenerAnnotationBeanPostProcessor.java | 23 +- .../annotation/AliasPropertiesTests.java | 20 ++ .../kafka/annotation/KafkaListenerTests.java | 239 ++++++++++++++++++ 4 files changed, 358 insertions(+), 5 deletions(-) create mode 100644 spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaHandlerParser.java create mode 100644 spring-kafka/src/test/java/org/springframework/kafka/annotation/KafkaListenerTests.java diff --git a/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaHandlerParser.java b/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaHandlerParser.java new file mode 100644 index 0000000000..81344271ef --- /dev/null +++ b/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaHandlerParser.java @@ -0,0 +1,81 @@ +/* + * Copyright 2016-2025 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.kafka.annotation; + +import java.lang.reflect.Method; +import java.lang.reflect.Modifier; +import java.util.Optional; +import java.util.Set; + +import org.springframework.core.MethodIntrospector; +import org.springframework.core.annotation.AnnotatedElementUtils; +import org.springframework.util.ReflectionUtils; +import org.springframework.util.ReflectionUtils.MethodFilter; + +/** + * Find the appropriate handler method when the target class has a class-level {@link KafkaListener} + * annotation and contains a single public method without a {@link KafkaHandler} annotation. + * + * @author Sanghyeok An + * + * @since 4.0 + * + * @see KafkaListenerAnnotationBeanPostProcessor + */ +public class KafkaHandlerParser { + + /** + * Finds the appropriate handler method when the target class has a class-level {@link KafkaListener} + * annotation and contains a single public method without a {@link KafkaHandler} annotation. + * This method is used to determine which method should be invoked for Kafka message handling + * when no explicit {@link KafkaHandler} annotations are present but the class itself is annotated with {@link KafkaListener}. + * + * @param targetClass the class to inspect for handler methods + * @return the method to be used for kafka message handling, or {@code null} if no suitable method is found. + */ + public Optional parseSingleHandlerMethod(Class targetClass) { + + Set methodsWithAnnotations = MethodIntrospector.selectMethods( + targetClass, (MethodFilter) method -> + AnnotatedElementUtils.findMergedAnnotation(method, KafkaHandler.class) != null || + AnnotatedElementUtils.findMergedAnnotation(method, KafkaListener.class) != null + ); + + if (!methodsWithAnnotations.isEmpty()) { + return Optional.empty(); + } + + Set publicMethodsWithoutHandlerAnnotation = MethodIntrospector.selectMethods( + targetClass, (ReflectionUtils.MethodFilter) method -> + Modifier.isPublic(method.getModifiers()) && + AnnotatedElementUtils.findMergedAnnotation(method, KafkaHandler.class) == null && + AnnotatedElementUtils.findMergedAnnotation(method, KafkaListener.class) == null + ); + + if (!hasSinglePublicMethod(publicMethodsWithoutHandlerAnnotation)) { + return Optional.empty(); + } + + Method publicMethod = publicMethodsWithoutHandlerAnnotation.iterator().next(); + return Optional.of(publicMethod); + } + + private boolean hasSinglePublicMethod(Set methods) { + return methods.size() == 1; + } + +} diff --git a/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java b/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java index 1edcc0104a..57b14c5a49 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java @@ -33,6 +33,7 @@ import java.util.Locale; import java.util.Map; import java.util.Objects; +import java.util.Optional; import java.util.Properties; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; @@ -148,6 +149,7 @@ * @author Sanghyeok An * @author Soby Chacko * @author Omer Celik + * @author Sanghyeok An * * @see KafkaListener * @see KafkaListenerErrorHandler @@ -157,6 +159,7 @@ * @see KafkaListenerEndpointRegistry * @see org.springframework.kafka.config.KafkaListenerEndpoint * @see MethodKafkaListenerEndpoint + * @see KafkaHandlerParser */ public class KafkaListenerAnnotationBeanPostProcessor implements BeanPostProcessor, Ordered, ApplicationContextAware, SmartInitializingSingleton { @@ -213,6 +216,8 @@ public class KafkaListenerAnnotationBeanPostProcessor private final Lock globalLock = new ReentrantLock(); + private final KafkaHandlerParser kafkaHandlerParser = new KafkaHandlerParser(); + @Override public int getOrder() { return LOWEST_PRECEDENCE; @@ -408,11 +413,19 @@ public Object postProcessAfterInitialization(final Object bean, final String bea + beanName + "': " + annotatedMethods); } if (hasClassLevelListeners) { - Set methodsWithHandler = MethodIntrospector.selectMethods(targetClass, - (ReflectionUtils.MethodFilter) method -> - AnnotationUtils.findAnnotation(method, KafkaHandler.class) != null); - List multiMethods = new ArrayList<>(methodsWithHandler); - processMultiMethodListeners(classLevelListeners, multiMethods, targetClass, bean, beanName); + Optional methodWithoutAnnotation = this.kafkaHandlerParser.parseSingleHandlerMethod(targetClass); + if (methodWithoutAnnotation.isPresent() && classLevelListeners.size() == 1) { + // Case when target class has class-level @KafkaListener annotation and + // has only single public method without @KafkaHandler. + for (KafkaListener kafkaListener : classLevelListeners) { + processKafkaListener(kafkaListener, methodWithoutAnnotation.get(), bean, beanName); + } + } + else { + Set methodsWithHandler = MethodIntrospector.selectMethods(targetClass, (ReflectionUtils.MethodFilter) method -> AnnotationUtils.findAnnotation(method, KafkaHandler.class) != null); + List multiMethods = new ArrayList<>(methodsWithHandler); + processMultiMethodListeners(classLevelListeners, multiMethods, targetClass, bean, beanName); + } } } } diff --git a/spring-kafka/src/test/java/org/springframework/kafka/annotation/AliasPropertiesTests.java b/spring-kafka/src/test/java/org/springframework/kafka/annotation/AliasPropertiesTests.java index dd043851cd..f24fd9821c 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/annotation/AliasPropertiesTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/annotation/AliasPropertiesTests.java @@ -36,6 +36,7 @@ import org.springframework.core.Ordered; import org.springframework.core.annotation.AliasFor; import org.springframework.kafka.annotation.AliasPropertiesTests.Config.ClassAndMethodLevelListener; +import org.springframework.kafka.annotation.AliasPropertiesTests.Config.ClassLevelListenerWithPublicMethodWithoutHandlerAnnotation; import org.springframework.kafka.annotation.KafkaListenerAnnotationBeanPostProcessor.AnnotationEnhancer; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.config.KafkaListenerConfigUtils; @@ -58,6 +59,7 @@ * @author Gary Russell * @author Artem Bilan * @author Soby Chacko + * @author Sanghyeok An * * @since 2.2 * @@ -81,12 +83,16 @@ public class AliasPropertiesTests { @Autowired private ClassAndMethodLevelListener repeatable; + @Autowired + private ClassLevelListenerWithPublicMethodWithoutHandlerAnnotation withPublicMethod; + @Test public void testAliasFor() throws Exception { this.template.send("alias.tests", "foo"); assertThat(this.config.latch.await(10, TimeUnit.SECONDS)).isTrue(); assertThat(this.classLevel.latch.await(10, TimeUnit.SECONDS)).isTrue(); assertThat(this.repeatable.latch.await(10, TimeUnit.SECONDS)).isTrue(); + assertThat(this.withPublicMethod.latch.await(10, TimeUnit.SECONDS)).isTrue(); assertThat(this.config.kafkaListenerEndpointRegistry()).isSameAs(this.kafkaListenerEndpointRegistry); assertThat(this.kafkaListenerEndpointRegistry.getListenerContainer("onMethodInConfigClass").getGroupId()) .isEqualTo("onMethodInConfigClass.Config.listen1"); @@ -102,6 +108,8 @@ public void testAliasFor() throws Exception { .isEqualTo("onMethodRepeatable2.RepeatableClassAndMethodLevelListener.listen1"); assertThat(this.kafkaListenerEndpointRegistry.getListenerContainer("onClassRepeatable2").getGroupId()) .isEqualTo("onClassRepeatable2.RepeatableClassAndMethodLevelListener"); + assertThat(this.kafkaListenerEndpointRegistry.getListenerContainer("onSinglePublicMethod").getGroupId()) + .isEqualTo("onSinglePublicMethod.ClassLevelListenerWithPublicMethodWithoutHandlerAnnotation"); assertThat(Config.orderedCalledFirst.get()).isTrue(); } @@ -220,6 +228,18 @@ public void listen1(String in) { } + @Component + @MyListener(id = "onSinglePublicMethod", value = "alias.tests") + public static class ClassLevelListenerWithPublicMethodWithoutHandlerAnnotation { + + private final CountDownLatch latch = new CountDownLatch(1); + + public void listen(String in) { + this.latch.countDown(); + } + + } + public static class OrderedEnhancer implements AnnotationEnhancer, Ordered { private final AtomicBoolean orderedCalledFirst; diff --git a/spring-kafka/src/test/java/org/springframework/kafka/annotation/KafkaListenerTests.java b/spring-kafka/src/test/java/org/springframework/kafka/annotation/KafkaListenerTests.java new file mode 100644 index 0000000000..2aa5bdd3b7 --- /dev/null +++ b/spring-kafka/src/test/java/org/springframework/kafka/annotation/KafkaListenerTests.java @@ -0,0 +1,239 @@ +/* + * Copyright 2016-2025 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.kafka.annotation; + +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.junit.jupiter.api.Test; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.annotation.KafkaListenerTests.Config.ClassAndMethodLevelListener; +import org.springframework.kafka.annotation.KafkaListenerTests.Config.ClassLevelListenerWithDoublePublicMethod; +import org.springframework.kafka.annotation.KafkaListenerTests.Config.ClassLevelListenerWithSinglePublicMethod; +import org.springframework.kafka.annotation.KafkaListenerTests.Config.ClassLevelListenerWithSinglePublicMethodAndPrivateMethod; +import org.springframework.kafka.annotation.KafkaListenerTests.Config.OtherClassLevelListenerWithSinglePublicMethod; +import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; +import org.springframework.kafka.config.KafkaListenerConfigUtils; +import org.springframework.kafka.config.KafkaListenerContainerFactory; +import org.springframework.kafka.config.KafkaListenerEndpointRegistry; +import org.springframework.kafka.core.DefaultKafkaConsumerFactory; +import org.springframework.kafka.core.DefaultKafkaProducerFactory; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.core.ProducerFactory; +import org.springframework.kafka.test.EmbeddedKafkaBroker; +import org.springframework.kafka.test.EmbeddedKafkaKraftBroker; +import org.springframework.kafka.test.utils.KafkaTestUtils; +import org.springframework.stereotype.Component; +import org.springframework.test.annotation.DirtiesContext; +import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * @author Sanghyeok An + * + * @since 4.0 + * + */ + +@SpringJUnitConfig +@DirtiesContext +public class KafkaListenerTests { + + private static final String TEST_TOPIC = "default-listener.tests"; + + @Autowired + private KafkaTemplate template; + + @Autowired + private Config config; + + @Autowired + private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry; + + @Autowired + private ClassAndMethodLevelListener classLevel; + + @Autowired + private ClassLevelListenerWithSinglePublicMethod classLevelListenerWithSinglePublicMethod; + + @Autowired + private OtherClassLevelListenerWithSinglePublicMethod otherClassLevelListenerWithSinglePublicMethod; + + @Autowired + private ClassLevelListenerWithDoublePublicMethod classLevelListenerWithDoublePublicMethod; + + @Autowired + private ClassLevelListenerWithSinglePublicMethodAndPrivateMethod classLevelListenerWithSinglePublicMethodAndPrivateMethod; + + @Test + public void testImplicitKafkaHandlerAnnotation() throws Exception { + // GIVEN + // See, bean configuration. + + // WHEN + this.template.send(TEST_TOPIC, "foo"); + + // THEN + assertThat(this.classLevel.latch.await(10, TimeUnit.SECONDS)).isTrue(); + assertThat(this.classLevelListenerWithSinglePublicMethod.latch.await(10, TimeUnit.SECONDS)).isTrue(); + assertThat(this.otherClassLevelListenerWithSinglePublicMethod.latch.await(10, TimeUnit.SECONDS)).isTrue(); + assertThat(this.classLevelListenerWithSinglePublicMethodAndPrivateMethod.latch.await(10, TimeUnit.SECONDS)).isTrue(); + + assertThat(this.classLevelListenerWithDoublePublicMethod.latch.await(10, TimeUnit.SECONDS)).isFalse(); + assertThat(this.classLevelListenerWithDoublePublicMethod.latch.getCount()).isEqualTo(ClassLevelListenerWithDoublePublicMethod.INIT_LATCH_COUNT); + + assertThat(this.kafkaListenerEndpointRegistry.getListenerContainer("classAndMethodLevelListener").getGroupId()) + .isEqualTo("classAndMethodLevelListener"); + assertThat(this.kafkaListenerEndpointRegistry.getListenerContainer("classLevelListenerWithSinglePublicMethod").getGroupId()) + .isEqualTo("classLevelListenerWithSinglePublicMethod"); + assertThat(this.kafkaListenerEndpointRegistry.getListenerContainer("otherClassLevelListenerWithSinglePublicMethod").getGroupId()) + .isEqualTo("otherClassLevelListenerWithSinglePublicMethod"); + assertThat(this.kafkaListenerEndpointRegistry.getListenerContainer("classLevelListenerWithDoublePublicMethod").getGroupId()) + .isEqualTo("classLevelListenerWithDoublePublicMethod"); + assertThat(this.kafkaListenerEndpointRegistry.getListenerContainer("classLevelListenerWithSinglePublicMethodAndPrivateMethod").getGroupId()) + .isEqualTo("classLevelListenerWithSinglePublicMethodAndPrivateMethod"); + + } + + @Configuration + @EnableKafka + public static class Config { + + @Bean(KafkaListenerConfigUtils.KAFKA_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME) + public KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry() { + return new KafkaListenerEndpointRegistry(); + } + + @Bean + public EmbeddedKafkaBroker embeddedKafka() { + return new EmbeddedKafkaKraftBroker(1, 1, TEST_TOPIC); + } + + @Bean + public KafkaListenerContainerFactory kafkaListenerContainerFactory() { + ConcurrentKafkaListenerContainerFactory factory = + new ConcurrentKafkaListenerContainerFactory<>(); + factory.setConsumerFactory(consumerFactory()); + return factory; + } + + @Bean + public DefaultKafkaConsumerFactory consumerFactory() { + return new DefaultKafkaConsumerFactory<>(consumerConfigs()); + } + + @Bean + public Map consumerConfigs() { + Map consumerProps = + KafkaTestUtils.consumerProps("myDefaultListenerGroup", "false", embeddedKafka()); + return consumerProps; + } + + @Bean + public KafkaTemplate template() { + return new KafkaTemplate<>(producerFactory()); + } + + @Bean + public ProducerFactory producerFactory() { + return new DefaultKafkaProducerFactory<>(producerConfigs()); + } + + @Bean + public Map producerConfigs() { + return KafkaTestUtils.producerProps(embeddedKafka()); + } + + @Component + @KafkaListener(id = "classAndMethodLevelListener", topics = "default-listener.tests") + public static class ClassAndMethodLevelListener { + + final CountDownLatch latch = new CountDownLatch(1); + + @KafkaHandler + public void listen(String in) { + this.latch.countDown(); + } + } + + @Component + @KafkaListener(id = "classLevelListenerWithSinglePublicMethod", topics = TEST_TOPIC) + public static class ClassLevelListenerWithSinglePublicMethod { + + final CountDownLatch latch = new CountDownLatch(1); + + public void listen(String in) { + this.latch.countDown(); + } + } + + @Component + @KafkaListener(id = "otherClassLevelListenerWithSinglePublicMethod", topics = TEST_TOPIC) + public static class OtherClassLevelListenerWithSinglePublicMethod { + + final CountDownLatch latch = new CountDownLatch(1); + + public void listen(String in) { + this.latch.countDown(); + } + } + + @Component + @KafkaListener(id = "classLevelListenerWithDoublePublicMethod", topics = TEST_TOPIC) + public static class ClassLevelListenerWithDoublePublicMethod { + + public final static int INIT_LATCH_COUNT = 2; + + final CountDownLatch latch = new CountDownLatch(INIT_LATCH_COUNT); + + public void listen1(String in) { + this.latch.countDown(); + } + + public void listen2(String in) { + this.latch.countDown(); + } + } + + @Component + @KafkaListener(id = "classLevelListenerWithSinglePublicMethodAndPrivateMethod", topics = TEST_TOPIC) + public static class ClassLevelListenerWithSinglePublicMethodAndPrivateMethod { + + final CountDownLatch latch = new CountDownLatch(1); + + public void listen1(String in) { + this.latch.countDown(); + } + + private void listen2(String in) { + this.latch.countDown(); + } + + private void listen3(String in) { + this.latch.countDown(); + } + + } + + } + +} From 13ee0d1e640193cd2dc9b854de3664a600be6b51 Mon Sep 17 00:00:00 2001 From: chickenchickenlove Date: Fri, 25 Apr 2025 19:37:26 +0900 Subject: [PATCH 2/6] Addressing PR review Signed-off-by: chickenchickenlove --- .../kafka/annotation/KafkaHandlerParser.java | 81 ------------------- ...kaListenerAnnotationBeanPostProcessor.java | 23 +++--- .../kafka/annotation/KafkaListenerTests.java | 23 +++--- 3 files changed, 22 insertions(+), 105 deletions(-) delete mode 100644 spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaHandlerParser.java diff --git a/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaHandlerParser.java b/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaHandlerParser.java deleted file mode 100644 index 81344271ef..0000000000 --- a/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaHandlerParser.java +++ /dev/null @@ -1,81 +0,0 @@ -/* - * Copyright 2016-2025 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.kafka.annotation; - -import java.lang.reflect.Method; -import java.lang.reflect.Modifier; -import java.util.Optional; -import java.util.Set; - -import org.springframework.core.MethodIntrospector; -import org.springframework.core.annotation.AnnotatedElementUtils; -import org.springframework.util.ReflectionUtils; -import org.springframework.util.ReflectionUtils.MethodFilter; - -/** - * Find the appropriate handler method when the target class has a class-level {@link KafkaListener} - * annotation and contains a single public method without a {@link KafkaHandler} annotation. - * - * @author Sanghyeok An - * - * @since 4.0 - * - * @see KafkaListenerAnnotationBeanPostProcessor - */ -public class KafkaHandlerParser { - - /** - * Finds the appropriate handler method when the target class has a class-level {@link KafkaListener} - * annotation and contains a single public method without a {@link KafkaHandler} annotation. - * This method is used to determine which method should be invoked for Kafka message handling - * when no explicit {@link KafkaHandler} annotations are present but the class itself is annotated with {@link KafkaListener}. - * - * @param targetClass the class to inspect for handler methods - * @return the method to be used for kafka message handling, or {@code null} if no suitable method is found. - */ - public Optional parseSingleHandlerMethod(Class targetClass) { - - Set methodsWithAnnotations = MethodIntrospector.selectMethods( - targetClass, (MethodFilter) method -> - AnnotatedElementUtils.findMergedAnnotation(method, KafkaHandler.class) != null || - AnnotatedElementUtils.findMergedAnnotation(method, KafkaListener.class) != null - ); - - if (!methodsWithAnnotations.isEmpty()) { - return Optional.empty(); - } - - Set publicMethodsWithoutHandlerAnnotation = MethodIntrospector.selectMethods( - targetClass, (ReflectionUtils.MethodFilter) method -> - Modifier.isPublic(method.getModifiers()) && - AnnotatedElementUtils.findMergedAnnotation(method, KafkaHandler.class) == null && - AnnotatedElementUtils.findMergedAnnotation(method, KafkaListener.class) == null - ); - - if (!hasSinglePublicMethod(publicMethodsWithoutHandlerAnnotation)) { - return Optional.empty(); - } - - Method publicMethod = publicMethodsWithoutHandlerAnnotation.iterator().next(); - return Optional.of(publicMethod); - } - - private boolean hasSinglePublicMethod(Set methods) { - return methods.size() == 1; - } - -} diff --git a/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java b/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java index 57b14c5a49..baa8a6730a 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java @@ -20,6 +20,7 @@ import java.io.StringReader; import java.lang.reflect.AnnotatedElement; import java.lang.reflect.Method; +import java.lang.reflect.Modifier; import java.nio.ByteBuffer; import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; @@ -33,7 +34,6 @@ import java.util.Locale; import java.util.Map; import java.util.Objects; -import java.util.Optional; import java.util.Properties; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; @@ -149,7 +149,6 @@ * @author Sanghyeok An * @author Soby Chacko * @author Omer Celik - * @author Sanghyeok An * * @see KafkaListener * @see KafkaListenerErrorHandler @@ -159,7 +158,6 @@ * @see KafkaListenerEndpointRegistry * @see org.springframework.kafka.config.KafkaListenerEndpoint * @see MethodKafkaListenerEndpoint - * @see KafkaHandlerParser */ public class KafkaListenerAnnotationBeanPostProcessor implements BeanPostProcessor, Ordered, ApplicationContextAware, SmartInitializingSingleton { @@ -216,8 +214,6 @@ public class KafkaListenerAnnotationBeanPostProcessor private final Lock globalLock = new ReentrantLock(); - private final KafkaHandlerParser kafkaHandlerParser = new KafkaHandlerParser(); - @Override public int getOrder() { return LOWEST_PRECEDENCE; @@ -413,16 +409,23 @@ public Object postProcessAfterInitialization(final Object bean, final String bea + beanName + "': " + annotatedMethods); } if (hasClassLevelListeners) { - Optional methodWithoutAnnotation = this.kafkaHandlerParser.parseSingleHandlerMethod(targetClass); - if (methodWithoutAnnotation.isPresent() && classLevelListeners.size() == 1) { + Set methodsWithHandler = MethodIntrospector.selectMethods(targetClass, + (ReflectionUtils.MethodFilter) method -> + AnnotationUtils.findAnnotation(method, KafkaHandler.class) != null); + Set publicMethods = MethodIntrospector.selectMethods(targetClass, + (ReflectionUtils.MethodFilter) method -> + Modifier.isPublic(method.getModifiers())); + + if (methodsWithHandler.isEmpty() && publicMethods.size() == 1 && !hasMethodLevelListeners) { // Case when target class has class-level @KafkaListener annotation and // has only single public method without @KafkaHandler. - for (KafkaListener kafkaListener : classLevelListeners) { - processKafkaListener(kafkaListener, methodWithoutAnnotation.get(), bean, beanName); + // See, GH-3807 for more details. + Method method = publicMethods.iterator().next(); + for (KafkaListener classLevelListener : classLevelListeners) { + processKafkaListener(classLevelListener, method, bean, beanName); } } else { - Set methodsWithHandler = MethodIntrospector.selectMethods(targetClass, (ReflectionUtils.MethodFilter) method -> AnnotationUtils.findAnnotation(method, KafkaHandler.class) != null); List multiMethods = new ArrayList<>(methodsWithHandler); processMultiMethodListeners(classLevelListeners, multiMethods, targetClass, bean, beanName); } diff --git a/spring-kafka/src/test/java/org/springframework/kafka/annotation/KafkaListenerTests.java b/spring-kafka/src/test/java/org/springframework/kafka/annotation/KafkaListenerTests.java index 2aa5bdd3b7..5caa68389d 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/annotation/KafkaListenerTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/annotation/KafkaListenerTests.java @@ -31,7 +31,6 @@ import org.springframework.kafka.annotation.KafkaListenerTests.Config.ClassLevelListenerWithSinglePublicMethodAndPrivateMethod; import org.springframework.kafka.annotation.KafkaListenerTests.Config.OtherClassLevelListenerWithSinglePublicMethod; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; -import org.springframework.kafka.config.KafkaListenerConfigUtils; import org.springframework.kafka.config.KafkaListenerContainerFactory; import org.springframework.kafka.config.KafkaListenerEndpointRegistry; import org.springframework.kafka.core.DefaultKafkaConsumerFactory; @@ -39,7 +38,7 @@ import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.core.ProducerFactory; import org.springframework.kafka.test.EmbeddedKafkaBroker; -import org.springframework.kafka.test.EmbeddedKafkaKraftBroker; +import org.springframework.kafka.test.context.EmbeddedKafka; import org.springframework.kafka.test.utils.KafkaTestUtils; import org.springframework.stereotype.Component; import org.springframework.test.annotation.DirtiesContext; @@ -56,6 +55,7 @@ @SpringJUnitConfig @DirtiesContext +@EmbeddedKafka(partitions = 1, topics = "default-listener.tests") public class KafkaListenerTests { private static final String TEST_TOPIC = "default-listener.tests"; @@ -118,15 +118,8 @@ public void testImplicitKafkaHandlerAnnotation() throws Exception { @EnableKafka public static class Config { - @Bean(KafkaListenerConfigUtils.KAFKA_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME) - public KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry() { - return new KafkaListenerEndpointRegistry(); - } - - @Bean - public EmbeddedKafkaBroker embeddedKafka() { - return new EmbeddedKafkaKraftBroker(1, 1, TEST_TOPIC); - } + @Autowired + EmbeddedKafkaBroker broker; @Bean public KafkaListenerContainerFactory kafkaListenerContainerFactory() { @@ -143,8 +136,10 @@ public DefaultKafkaConsumerFactory consumerFactory() { @Bean public Map consumerConfigs() { - Map consumerProps = - KafkaTestUtils.consumerProps("myDefaultListenerGroup", "false", embeddedKafka()); + Map consumerProps = KafkaTestUtils.consumerProps( + this.broker.getBrokersAsString(), + "myDefaultListenerGroup", + "false"); return consumerProps; } @@ -160,7 +155,7 @@ public ProducerFactory producerFactory() { @Bean public Map producerConfigs() { - return KafkaTestUtils.producerProps(embeddedKafka()); + return KafkaTestUtils.producerProps(this.broker.getBrokersAsString()); } @Component From 19f9371086657da7ad67fe9ad0642ffd6c5d3f40 Mon Sep 17 00:00:00 2001 From: chickenchickenlove Date: Sat, 26 Apr 2025 07:42:18 +0900 Subject: [PATCH 3/6] Addressing PR review Signed-off-by: chickenchickenlove --- .../class-level-kafkalistener.adoc | 4 +- .../antora/modules/ROOT/pages/whats-new.adoc | 7 ++ ...kaListenerAnnotationBeanPostProcessor.java | 3 - .../kafka/annotation/KafkaListenerTests.java | 91 ------------------- 4 files changed, 9 insertions(+), 96 deletions(-) diff --git a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/receiving-messages/class-level-kafkalistener.adoc b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/receiving-messages/class-level-kafkalistener.adoc index 757ef8eaef..c99f72830c 100644 --- a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/receiving-messages/class-level-kafkalistener.adoc +++ b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/receiving-messages/class-level-kafkalistener.adoc @@ -1,7 +1,8 @@ [[class-level-kafkalistener]] = `@KafkaListener` on a Class -When you use `@KafkaListener` at the class-level, you must specify `@KafkaHandler` at the method level. +When you use `@KafkaListener` at the class-level, if there is only one public method, you do not need to specify `@KafkaHandler`. +However, if there are multiple public methods, you must specify `@KafkaHandler` at the method level. When messages are delivered, the converted message payload type is used to determine which method to call. The following example shows how to do so: @@ -58,4 +59,3 @@ void listen(Object in, @Header(KafkaHeaders.RECORD_METADATA) ConsumerRecordMetad ... } ---- - diff --git a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc index 189fe5200f..96859c1fbf 100644 --- a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc +++ b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc @@ -70,3 +70,10 @@ Several deprecated items have been removed: Spring for Apache Kafka 4.0 supports Kafka 4.0’s new consumer rebalance protocol - https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol[KIP-848]. For details, see xref:kafka/receiving-messages/rebalance-listeners.adoc#new-rebalalcne-protocol[New Consumer Rebalace Protocol docs]. + +[[x40-single-public-method-listener]] +=== Single Public Method Listener Simplification + +Spring for Apache Kafka 4.0 simplifies the use of class-level `@KafkaListener` annotations. +Previously, when `@KafkaListener` was declared at the class level, each public method intended to handle messages had to be annotated explicitly with `@KafkaHandler`. +Now, if there is exactly one public method in the class, it will be automatically recognized as the listener method even without the `@KafkaHandler` annotation. For details, see xref:kafka/receiving-messages/class-level-kafkalistener.adoc#class-level-kafkalistener[Class Level Kafka Listener]. diff --git a/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java b/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java index baa8a6730a..4c82d53354 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java @@ -417,9 +417,6 @@ public Object postProcessAfterInitialization(final Object bean, final String bea Modifier.isPublic(method.getModifiers())); if (methodsWithHandler.isEmpty() && publicMethods.size() == 1 && !hasMethodLevelListeners) { - // Case when target class has class-level @KafkaListener annotation and - // has only single public method without @KafkaHandler. - // See, GH-3807 for more details. Method method = publicMethods.iterator().next(); for (KafkaListener classLevelListener : classLevelListeners) { processKafkaListener(classLevelListener, method, bean, beanName); diff --git a/spring-kafka/src/test/java/org/springframework/kafka/annotation/KafkaListenerTests.java b/spring-kafka/src/test/java/org/springframework/kafka/annotation/KafkaListenerTests.java index 5caa68389d..efde883b03 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/annotation/KafkaListenerTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/annotation/KafkaListenerTests.java @@ -25,11 +25,7 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; -import org.springframework.kafka.annotation.KafkaListenerTests.Config.ClassAndMethodLevelListener; -import org.springframework.kafka.annotation.KafkaListenerTests.Config.ClassLevelListenerWithDoublePublicMethod; import org.springframework.kafka.annotation.KafkaListenerTests.Config.ClassLevelListenerWithSinglePublicMethod; -import org.springframework.kafka.annotation.KafkaListenerTests.Config.ClassLevelListenerWithSinglePublicMethodAndPrivateMethod; -import org.springframework.kafka.annotation.KafkaListenerTests.Config.OtherClassLevelListenerWithSinglePublicMethod; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.config.KafkaListenerContainerFactory; import org.springframework.kafka.config.KafkaListenerEndpointRegistry; @@ -69,21 +65,9 @@ public class KafkaListenerTests { @Autowired private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry; - @Autowired - private ClassAndMethodLevelListener classLevel; - @Autowired private ClassLevelListenerWithSinglePublicMethod classLevelListenerWithSinglePublicMethod; - @Autowired - private OtherClassLevelListenerWithSinglePublicMethod otherClassLevelListenerWithSinglePublicMethod; - - @Autowired - private ClassLevelListenerWithDoublePublicMethod classLevelListenerWithDoublePublicMethod; - - @Autowired - private ClassLevelListenerWithSinglePublicMethodAndPrivateMethod classLevelListenerWithSinglePublicMethodAndPrivateMethod; - @Test public void testImplicitKafkaHandlerAnnotation() throws Exception { // GIVEN @@ -93,25 +77,10 @@ public void testImplicitKafkaHandlerAnnotation() throws Exception { this.template.send(TEST_TOPIC, "foo"); // THEN - assertThat(this.classLevel.latch.await(10, TimeUnit.SECONDS)).isTrue(); assertThat(this.classLevelListenerWithSinglePublicMethod.latch.await(10, TimeUnit.SECONDS)).isTrue(); - assertThat(this.otherClassLevelListenerWithSinglePublicMethod.latch.await(10, TimeUnit.SECONDS)).isTrue(); - assertThat(this.classLevelListenerWithSinglePublicMethodAndPrivateMethod.latch.await(10, TimeUnit.SECONDS)).isTrue(); - - assertThat(this.classLevelListenerWithDoublePublicMethod.latch.await(10, TimeUnit.SECONDS)).isFalse(); - assertThat(this.classLevelListenerWithDoublePublicMethod.latch.getCount()).isEqualTo(ClassLevelListenerWithDoublePublicMethod.INIT_LATCH_COUNT); - assertThat(this.kafkaListenerEndpointRegistry.getListenerContainer("classAndMethodLevelListener").getGroupId()) - .isEqualTo("classAndMethodLevelListener"); assertThat(this.kafkaListenerEndpointRegistry.getListenerContainer("classLevelListenerWithSinglePublicMethod").getGroupId()) .isEqualTo("classLevelListenerWithSinglePublicMethod"); - assertThat(this.kafkaListenerEndpointRegistry.getListenerContainer("otherClassLevelListenerWithSinglePublicMethod").getGroupId()) - .isEqualTo("otherClassLevelListenerWithSinglePublicMethod"); - assertThat(this.kafkaListenerEndpointRegistry.getListenerContainer("classLevelListenerWithDoublePublicMethod").getGroupId()) - .isEqualTo("classLevelListenerWithDoublePublicMethod"); - assertThat(this.kafkaListenerEndpointRegistry.getListenerContainer("classLevelListenerWithSinglePublicMethodAndPrivateMethod").getGroupId()) - .isEqualTo("classLevelListenerWithSinglePublicMethodAndPrivateMethod"); - } @Configuration @@ -158,18 +127,6 @@ public Map producerConfigs() { return KafkaTestUtils.producerProps(this.broker.getBrokersAsString()); } - @Component - @KafkaListener(id = "classAndMethodLevelListener", topics = "default-listener.tests") - public static class ClassAndMethodLevelListener { - - final CountDownLatch latch = new CountDownLatch(1); - - @KafkaHandler - public void listen(String in) { - this.latch.countDown(); - } - } - @Component @KafkaListener(id = "classLevelListenerWithSinglePublicMethod", topics = TEST_TOPIC) public static class ClassLevelListenerWithSinglePublicMethod { @@ -181,54 +138,6 @@ public void listen(String in) { } } - @Component - @KafkaListener(id = "otherClassLevelListenerWithSinglePublicMethod", topics = TEST_TOPIC) - public static class OtherClassLevelListenerWithSinglePublicMethod { - - final CountDownLatch latch = new CountDownLatch(1); - - public void listen(String in) { - this.latch.countDown(); - } - } - - @Component - @KafkaListener(id = "classLevelListenerWithDoublePublicMethod", topics = TEST_TOPIC) - public static class ClassLevelListenerWithDoublePublicMethod { - - public final static int INIT_LATCH_COUNT = 2; - - final CountDownLatch latch = new CountDownLatch(INIT_LATCH_COUNT); - - public void listen1(String in) { - this.latch.countDown(); - } - - public void listen2(String in) { - this.latch.countDown(); - } - } - - @Component - @KafkaListener(id = "classLevelListenerWithSinglePublicMethodAndPrivateMethod", topics = TEST_TOPIC) - public static class ClassLevelListenerWithSinglePublicMethodAndPrivateMethod { - - final CountDownLatch latch = new CountDownLatch(1); - - public void listen1(String in) { - this.latch.countDown(); - } - - private void listen2(String in) { - this.latch.countDown(); - } - - private void listen3(String in) { - this.latch.countDown(); - } - - } - } } From 62f82f7f0dfdc850ede9bc6bfd99718c9920a6eb Mon Sep 17 00:00:00 2001 From: chickenchickenlove Date: Sun, 27 Apr 2025 15:37:32 +0900 Subject: [PATCH 4/6] Revert to a52faccd Signed-off-by: chickenchickenlove --- .../class-level-kafkalistener.adoc | 4 +- .../antora/modules/ROOT/pages/whats-new.adoc | 7 - ...kaListenerAnnotationBeanPostProcessor.java | 17 +-- .../annotation/AliasPropertiesTests.java | 20 --- .../kafka/annotation/KafkaListenerTests.java | 143 ------------------ 5 files changed, 4 insertions(+), 187 deletions(-) delete mode 100644 spring-kafka/src/test/java/org/springframework/kafka/annotation/KafkaListenerTests.java diff --git a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/receiving-messages/class-level-kafkalistener.adoc b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/receiving-messages/class-level-kafkalistener.adoc index c99f72830c..757ef8eaef 100644 --- a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/receiving-messages/class-level-kafkalistener.adoc +++ b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/receiving-messages/class-level-kafkalistener.adoc @@ -1,8 +1,7 @@ [[class-level-kafkalistener]] = `@KafkaListener` on a Class -When you use `@KafkaListener` at the class-level, if there is only one public method, you do not need to specify `@KafkaHandler`. -However, if there are multiple public methods, you must specify `@KafkaHandler` at the method level. +When you use `@KafkaListener` at the class-level, you must specify `@KafkaHandler` at the method level. When messages are delivered, the converted message payload type is used to determine which method to call. The following example shows how to do so: @@ -59,3 +58,4 @@ void listen(Object in, @Header(KafkaHeaders.RECORD_METADATA) ConsumerRecordMetad ... } ---- + diff --git a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc index 96859c1fbf..189fe5200f 100644 --- a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc +++ b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc @@ -70,10 +70,3 @@ Several deprecated items have been removed: Spring for Apache Kafka 4.0 supports Kafka 4.0’s new consumer rebalance protocol - https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol[KIP-848]. For details, see xref:kafka/receiving-messages/rebalance-listeners.adoc#new-rebalalcne-protocol[New Consumer Rebalace Protocol docs]. - -[[x40-single-public-method-listener]] -=== Single Public Method Listener Simplification - -Spring for Apache Kafka 4.0 simplifies the use of class-level `@KafkaListener` annotations. -Previously, when `@KafkaListener` was declared at the class level, each public method intended to handle messages had to be annotated explicitly with `@KafkaHandler`. -Now, if there is exactly one public method in the class, it will be automatically recognized as the listener method even without the `@KafkaHandler` annotation. For details, see xref:kafka/receiving-messages/class-level-kafkalistener.adoc#class-level-kafkalistener[Class Level Kafka Listener]. diff --git a/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java b/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java index 4c82d53354..1edcc0104a 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java @@ -20,7 +20,6 @@ import java.io.StringReader; import java.lang.reflect.AnnotatedElement; import java.lang.reflect.Method; -import java.lang.reflect.Modifier; import java.nio.ByteBuffer; import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; @@ -412,20 +411,8 @@ public Object postProcessAfterInitialization(final Object bean, final String bea Set methodsWithHandler = MethodIntrospector.selectMethods(targetClass, (ReflectionUtils.MethodFilter) method -> AnnotationUtils.findAnnotation(method, KafkaHandler.class) != null); - Set publicMethods = MethodIntrospector.selectMethods(targetClass, - (ReflectionUtils.MethodFilter) method -> - Modifier.isPublic(method.getModifiers())); - - if (methodsWithHandler.isEmpty() && publicMethods.size() == 1 && !hasMethodLevelListeners) { - Method method = publicMethods.iterator().next(); - for (KafkaListener classLevelListener : classLevelListeners) { - processKafkaListener(classLevelListener, method, bean, beanName); - } - } - else { - List multiMethods = new ArrayList<>(methodsWithHandler); - processMultiMethodListeners(classLevelListeners, multiMethods, targetClass, bean, beanName); - } + List multiMethods = new ArrayList<>(methodsWithHandler); + processMultiMethodListeners(classLevelListeners, multiMethods, targetClass, bean, beanName); } } } diff --git a/spring-kafka/src/test/java/org/springframework/kafka/annotation/AliasPropertiesTests.java b/spring-kafka/src/test/java/org/springframework/kafka/annotation/AliasPropertiesTests.java index f24fd9821c..dd043851cd 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/annotation/AliasPropertiesTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/annotation/AliasPropertiesTests.java @@ -36,7 +36,6 @@ import org.springframework.core.Ordered; import org.springframework.core.annotation.AliasFor; import org.springframework.kafka.annotation.AliasPropertiesTests.Config.ClassAndMethodLevelListener; -import org.springframework.kafka.annotation.AliasPropertiesTests.Config.ClassLevelListenerWithPublicMethodWithoutHandlerAnnotation; import org.springframework.kafka.annotation.KafkaListenerAnnotationBeanPostProcessor.AnnotationEnhancer; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.config.KafkaListenerConfigUtils; @@ -59,7 +58,6 @@ * @author Gary Russell * @author Artem Bilan * @author Soby Chacko - * @author Sanghyeok An * * @since 2.2 * @@ -83,16 +81,12 @@ public class AliasPropertiesTests { @Autowired private ClassAndMethodLevelListener repeatable; - @Autowired - private ClassLevelListenerWithPublicMethodWithoutHandlerAnnotation withPublicMethod; - @Test public void testAliasFor() throws Exception { this.template.send("alias.tests", "foo"); assertThat(this.config.latch.await(10, TimeUnit.SECONDS)).isTrue(); assertThat(this.classLevel.latch.await(10, TimeUnit.SECONDS)).isTrue(); assertThat(this.repeatable.latch.await(10, TimeUnit.SECONDS)).isTrue(); - assertThat(this.withPublicMethod.latch.await(10, TimeUnit.SECONDS)).isTrue(); assertThat(this.config.kafkaListenerEndpointRegistry()).isSameAs(this.kafkaListenerEndpointRegistry); assertThat(this.kafkaListenerEndpointRegistry.getListenerContainer("onMethodInConfigClass").getGroupId()) .isEqualTo("onMethodInConfigClass.Config.listen1"); @@ -108,8 +102,6 @@ public void testAliasFor() throws Exception { .isEqualTo("onMethodRepeatable2.RepeatableClassAndMethodLevelListener.listen1"); assertThat(this.kafkaListenerEndpointRegistry.getListenerContainer("onClassRepeatable2").getGroupId()) .isEqualTo("onClassRepeatable2.RepeatableClassAndMethodLevelListener"); - assertThat(this.kafkaListenerEndpointRegistry.getListenerContainer("onSinglePublicMethod").getGroupId()) - .isEqualTo("onSinglePublicMethod.ClassLevelListenerWithPublicMethodWithoutHandlerAnnotation"); assertThat(Config.orderedCalledFirst.get()).isTrue(); } @@ -228,18 +220,6 @@ public void listen1(String in) { } - @Component - @MyListener(id = "onSinglePublicMethod", value = "alias.tests") - public static class ClassLevelListenerWithPublicMethodWithoutHandlerAnnotation { - - private final CountDownLatch latch = new CountDownLatch(1); - - public void listen(String in) { - this.latch.countDown(); - } - - } - public static class OrderedEnhancer implements AnnotationEnhancer, Ordered { private final AtomicBoolean orderedCalledFirst; diff --git a/spring-kafka/src/test/java/org/springframework/kafka/annotation/KafkaListenerTests.java b/spring-kafka/src/test/java/org/springframework/kafka/annotation/KafkaListenerTests.java deleted file mode 100644 index efde883b03..0000000000 --- a/spring-kafka/src/test/java/org/springframework/kafka/annotation/KafkaListenerTests.java +++ /dev/null @@ -1,143 +0,0 @@ -/* - * Copyright 2016-2025 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.kafka.annotation; - -import java.util.Map; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; - -import org.junit.jupiter.api.Test; - -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; -import org.springframework.kafka.annotation.KafkaListenerTests.Config.ClassLevelListenerWithSinglePublicMethod; -import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; -import org.springframework.kafka.config.KafkaListenerContainerFactory; -import org.springframework.kafka.config.KafkaListenerEndpointRegistry; -import org.springframework.kafka.core.DefaultKafkaConsumerFactory; -import org.springframework.kafka.core.DefaultKafkaProducerFactory; -import org.springframework.kafka.core.KafkaTemplate; -import org.springframework.kafka.core.ProducerFactory; -import org.springframework.kafka.test.EmbeddedKafkaBroker; -import org.springframework.kafka.test.context.EmbeddedKafka; -import org.springframework.kafka.test.utils.KafkaTestUtils; -import org.springframework.stereotype.Component; -import org.springframework.test.annotation.DirtiesContext; -import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; - -import static org.assertj.core.api.Assertions.assertThat; - -/** - * @author Sanghyeok An - * - * @since 4.0 - * - */ - -@SpringJUnitConfig -@DirtiesContext -@EmbeddedKafka(partitions = 1, topics = "default-listener.tests") -public class KafkaListenerTests { - - private static final String TEST_TOPIC = "default-listener.tests"; - - @Autowired - private KafkaTemplate template; - - @Autowired - private Config config; - - @Autowired - private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry; - - @Autowired - private ClassLevelListenerWithSinglePublicMethod classLevelListenerWithSinglePublicMethod; - - @Test - public void testImplicitKafkaHandlerAnnotation() throws Exception { - // GIVEN - // See, bean configuration. - - // WHEN - this.template.send(TEST_TOPIC, "foo"); - - // THEN - assertThat(this.classLevelListenerWithSinglePublicMethod.latch.await(10, TimeUnit.SECONDS)).isTrue(); - - assertThat(this.kafkaListenerEndpointRegistry.getListenerContainer("classLevelListenerWithSinglePublicMethod").getGroupId()) - .isEqualTo("classLevelListenerWithSinglePublicMethod"); - } - - @Configuration - @EnableKafka - public static class Config { - - @Autowired - EmbeddedKafkaBroker broker; - - @Bean - public KafkaListenerContainerFactory kafkaListenerContainerFactory() { - ConcurrentKafkaListenerContainerFactory factory = - new ConcurrentKafkaListenerContainerFactory<>(); - factory.setConsumerFactory(consumerFactory()); - return factory; - } - - @Bean - public DefaultKafkaConsumerFactory consumerFactory() { - return new DefaultKafkaConsumerFactory<>(consumerConfigs()); - } - - @Bean - public Map consumerConfigs() { - Map consumerProps = KafkaTestUtils.consumerProps( - this.broker.getBrokersAsString(), - "myDefaultListenerGroup", - "false"); - return consumerProps; - } - - @Bean - public KafkaTemplate template() { - return new KafkaTemplate<>(producerFactory()); - } - - @Bean - public ProducerFactory producerFactory() { - return new DefaultKafkaProducerFactory<>(producerConfigs()); - } - - @Bean - public Map producerConfigs() { - return KafkaTestUtils.producerProps(this.broker.getBrokersAsString()); - } - - @Component - @KafkaListener(id = "classLevelListenerWithSinglePublicMethod", topics = TEST_TOPIC) - public static class ClassLevelListenerWithSinglePublicMethod { - - final CountDownLatch latch = new CountDownLatch(1); - - public void listen(String in) { - this.latch.countDown(); - } - } - - } - -} From c322d681b75269d04bc56044a01bad857f2bc589 Mon Sep 17 00:00:00 2001 From: chickenchickenlove Date: Sun, 27 Apr 2025 15:38:01 +0900 Subject: [PATCH 5/6] Addressing PR review Signed-off-by: chickenchickenlove --- .../kafka/receiving-messages/class-level-kafkalistener.adoc | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/receiving-messages/class-level-kafkalistener.adoc b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/receiving-messages/class-level-kafkalistener.adoc index 757ef8eaef..750603ae7a 100644 --- a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/receiving-messages/class-level-kafkalistener.adoc +++ b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/receiving-messages/class-level-kafkalistener.adoc @@ -2,6 +2,10 @@ = `@KafkaListener` on a Class When you use `@KafkaListener` at the class-level, you must specify `@KafkaHandler` at the method level. +If you do not use `@KafkaHandler`, no method will be used as a listener. + +IMPORTANT: This is because a class annotated at the class-level may be extended or inherited in various ways. + When messages are delivered, the converted message payload type is used to determine which method to call. The following example shows how to do so: @@ -58,4 +62,3 @@ void listen(Object in, @Header(KafkaHeaders.RECORD_METADATA) ConsumerRecordMetad ... } ---- - From f395ccef921c3fc21a7dcc96fee30d76a6bf3c64 Mon Sep 17 00:00:00 2001 From: chickenchickenlove Date: Tue, 29 Apr 2025 07:32:10 +0900 Subject: [PATCH 6/6] Addressing PR review Signed-off-by: chickenchickenlove --- .../kafka/receiving-messages/class-level-kafkalistener.adoc | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/receiving-messages/class-level-kafkalistener.adoc b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/receiving-messages/class-level-kafkalistener.adoc index 750603ae7a..3fdf026192 100644 --- a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/receiving-messages/class-level-kafkalistener.adoc +++ b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/receiving-messages/class-level-kafkalistener.adoc @@ -2,9 +2,9 @@ = `@KafkaListener` on a Class When you use `@KafkaListener` at the class-level, you must specify `@KafkaHandler` at the method level. -If you do not use `@KafkaHandler`, no method will be used as a listener. - -IMPORTANT: This is because a class annotated at the class-level may be extended or inherited in various ways. +If no `@KafkaHandler` on any methods of this class or its sub-classes, the framework will reject such a configuration. +The `@KafkaHandler` annotation is required for explicit and concise purpose of the method. +Otherwise it is hard to make a decision about this or other method without extra restrictions. When messages are delivered, the converted message payload type is used to determine which method to call. The following example shows how to do so: