From a50f2446e7c4ca1dec35109edfb3db93feea34df Mon Sep 17 00:00:00 2001 From: csini Date: Wed, 23 Apr 2025 00:54:17 +0200 Subject: [PATCH] Support reactive KafkaEntity --- .../kafka/annotation/EnableKafkaEntity.java | 46 ++ .../kafka/entity/KafkaEntity.java | 47 ++ .../kafka/entity/KafkaEntityException.java | 72 +++ .../kafka/entity/KafkaEntityKey.java | 34 ++ .../kafka/entity/KafkaEntityUtil.java | 56 ++ .../KafkaEntityDefaultConfiguration.java | 307 +++++++++++ .../reactive/KafkaEntityPollingRunnable.java | 183 +++++++ .../entity/reactive/KafkaEntityProcessor.java | 44 ++ .../entity/reactive/KafkaEntityPublisher.java | 34 ++ .../reactive/KafkaEntitySubscriber.java | 43 ++ .../kafka/entity/reactive/KafkaProcessor.java | 24 + .../reactive/SimpleKafkaEntityProcessor.java | 518 ++++++++++++++++++ .../reactive/SimpleKafkaEntityPublisher.java | 356 ++++++++++++ .../reactive/SimpleKafkaEntitySubscriber.java | 213 +++++++ .../DefaultJackson2JavaKeyTypeMapper.java | 39 ++ .../serializer/JsonKeyDeserializer.java | 67 +++ .../support/serializer/JsonKeySerializer.java | 44 ++ .../springframework/kafka/entity/City.java | 29 + .../kafka/entity/CityGroup.java | 82 +++ .../kafka/entity/ComplexKey.java | 81 +++ .../kafka/entity/ComplexKeyRecord.java | 27 + .../springframework/kafka/entity/Place.java | 28 + .../kafka/entity/PlaceVersion.java | 84 +++ .../springframework/kafka/entity/Product.java | 82 +++ .../springframework/kafka/entity/Student.java | 25 + .../springframework/kafka/entity/User.java | 25 + .../EnableKafkaEntityProcessorTest.java | 194 +++++++ .../EnableKafkaEntityPublisherTest.java | 361 ++++++++++++ .../EnableKafkaEntitySubscriberTest.java | 216 ++++++++ .../SimpleKafkaEntityProcessorTest.java | 204 +++++++ .../SimpleKafkaEntityPublisherTest.java | 170 ++++++ .../SimpleKafkaEntitySubscriberTest.java | 177 ++++++ .../entity/reactive/TestConsumerRunnable.java | 134 +++++ 33 files changed, 4046 insertions(+) create mode 100644 
spring-kafka/src/main/java/org/springframework/kafka/annotation/EnableKafkaEntity.java create mode 100644 spring-kafka/src/main/java/org/springframework/kafka/entity/KafkaEntity.java create mode 100644 spring-kafka/src/main/java/org/springframework/kafka/entity/KafkaEntityException.java create mode 100644 spring-kafka/src/main/java/org/springframework/kafka/entity/KafkaEntityKey.java create mode 100644 spring-kafka/src/main/java/org/springframework/kafka/entity/KafkaEntityUtil.java create mode 100644 spring-kafka/src/main/java/org/springframework/kafka/entity/reactive/KafkaEntityDefaultConfiguration.java create mode 100644 spring-kafka/src/main/java/org/springframework/kafka/entity/reactive/KafkaEntityPollingRunnable.java create mode 100644 spring-kafka/src/main/java/org/springframework/kafka/entity/reactive/KafkaEntityProcessor.java create mode 100644 spring-kafka/src/main/java/org/springframework/kafka/entity/reactive/KafkaEntityPublisher.java create mode 100644 spring-kafka/src/main/java/org/springframework/kafka/entity/reactive/KafkaEntitySubscriber.java create mode 100644 spring-kafka/src/main/java/org/springframework/kafka/entity/reactive/KafkaProcessor.java create mode 100644 spring-kafka/src/main/java/org/springframework/kafka/entity/reactive/SimpleKafkaEntityProcessor.java create mode 100644 spring-kafka/src/main/java/org/springframework/kafka/entity/reactive/SimpleKafkaEntityPublisher.java create mode 100644 spring-kafka/src/main/java/org/springframework/kafka/entity/reactive/SimpleKafkaEntitySubscriber.java create mode 100644 spring-kafka/src/main/java/org/springframework/kafka/support/mapping/DefaultJackson2JavaKeyTypeMapper.java create mode 100644 spring-kafka/src/main/java/org/springframework/kafka/support/serializer/JsonKeyDeserializer.java create mode 100644 spring-kafka/src/main/java/org/springframework/kafka/support/serializer/JsonKeySerializer.java create mode 100644 spring-kafka/src/test/java/org/springframework/kafka/entity/City.java create 
mode 100644 spring-kafka/src/test/java/org/springframework/kafka/entity/CityGroup.java create mode 100644 spring-kafka/src/test/java/org/springframework/kafka/entity/ComplexKey.java create mode 100644 spring-kafka/src/test/java/org/springframework/kafka/entity/ComplexKeyRecord.java create mode 100644 spring-kafka/src/test/java/org/springframework/kafka/entity/Place.java create mode 100644 spring-kafka/src/test/java/org/springframework/kafka/entity/PlaceVersion.java create mode 100644 spring-kafka/src/test/java/org/springframework/kafka/entity/Product.java create mode 100644 spring-kafka/src/test/java/org/springframework/kafka/entity/Student.java create mode 100644 spring-kafka/src/test/java/org/springframework/kafka/entity/User.java create mode 100644 spring-kafka/src/test/java/org/springframework/kafka/entity/reactive/EnableKafkaEntityProcessorTest.java create mode 100644 spring-kafka/src/test/java/org/springframework/kafka/entity/reactive/EnableKafkaEntityPublisherTest.java create mode 100644 spring-kafka/src/test/java/org/springframework/kafka/entity/reactive/EnableKafkaEntitySubscriberTest.java create mode 100644 spring-kafka/src/test/java/org/springframework/kafka/entity/reactive/SimpleKafkaEntityProcessorTest.java create mode 100644 spring-kafka/src/test/java/org/springframework/kafka/entity/reactive/SimpleKafkaEntityPublisherTest.java create mode 100644 spring-kafka/src/test/java/org/springframework/kafka/entity/reactive/SimpleKafkaEntitySubscriberTest.java create mode 100644 spring-kafka/src/test/java/org/springframework/kafka/entity/reactive/TestConsumerRunnable.java diff --git a/spring-kafka/src/main/java/org/springframework/kafka/annotation/EnableKafkaEntity.java b/spring-kafka/src/main/java/org/springframework/kafka/annotation/EnableKafkaEntity.java new file mode 100644 index 0000000000..ef4ff8ad0f --- /dev/null +++ b/spring-kafka/src/main/java/org/springframework/kafka/annotation/EnableKafkaEntity.java @@ -0,0 +1,46 @@ +/* + * Copyright 2016-2024 the 
original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.kafka.annotation; + +import java.lang.annotation.Documented; +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +import org.springframework.context.annotation.Import; +import org.springframework.kafka.entity.reactive.KafkaEntityDefaultConfiguration; +import org.springframework.kafka.entity.reactive.KafkaEntityProcessor; +import org.springframework.kafka.entity.reactive.KafkaEntityPublisher; +import org.springframework.kafka.entity.reactive.KafkaEntitySubscriber; + +/** + * Enable Kafka Entities annotated fields. To be used on + * {@link org.springframework.context.annotation.Configuration Configuration} classes. + * + * Note that all fields in beans annotated with fields in Beans annotated + * with @{@link KafkaEntityProcessor} @{@link KafkaEntitySubscriber} @{@link KafkaEntityPublisher} will be detected. 
+ * + * @author Popovics Boglarka + * + */ +@Target(ElementType.TYPE) +@Retention(RetentionPolicy.RUNTIME) +@Documented +@Import(KafkaEntityDefaultConfiguration.class) +public @interface EnableKafkaEntity { +} diff --git a/spring-kafka/src/main/java/org/springframework/kafka/entity/KafkaEntity.java b/spring-kafka/src/main/java/org/springframework/kafka/entity/KafkaEntity.java new file mode 100644 index 0000000000..7c8c9f0144 --- /dev/null +++ b/spring-kafka/src/main/java/org/springframework/kafka/entity/KafkaEntity.java @@ -0,0 +1,47 @@ +/* + * Copyright 2016-2025 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.kafka.entity; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +/** + * define per topic one class or record and mark with @KafkaEntity (mark a field + * with @KafkaEntityKey) + * Topic must exists. + * Topic name will be the name of the entity class with included packagename but + * you can use custom Topic name like + * this @KafkaEntity(customTopicName="PRODUCT") + * + * @author Popovics Boglarka + */ +@Target({ ElementType.TYPE }) +@Retention(RetentionPolicy.RUNTIME) +public @interface KafkaEntity { + +// readonly true/false + + /** + * Topic name will be the name of the entity class with included packagename + * unless you use this property. 
+ * + * @return custom topic name + */ + String customTopicName() default ""; +} diff --git a/spring-kafka/src/main/java/org/springframework/kafka/entity/KafkaEntityException.java b/spring-kafka/src/main/java/org/springframework/kafka/entity/KafkaEntityException.java new file mode 100644 index 0000000000..094c403e2f --- /dev/null +++ b/spring-kafka/src/main/java/org/springframework/kafka/entity/KafkaEntityException.java @@ -0,0 +1,72 @@ +/* + * Copyright 2016-2025 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.kafka.entity; + +/** + * Thrown by spring-kafka-extensions library if there is a problem creating a + * Kafka Entity Bean. + * + * @author Popovics Boglarka + */ +public class KafkaEntityException extends Exception { + + private static final long serialVersionUID = 8485773596430780144L; + + /** + * The name of the Kafka Entity Bean. + */ + private final String beanName; + + /** + * Constructs a new exception with the specified message and a name of the Kafka + * Entity Bean, which had the problem. + * + * @param beanName name of the Kafka Entity Bean + * @param message problem bei creating the Kafka Entity Bean + */ + public KafkaEntityException(String beanName, String message) { + super(message); + this.beanName = beanName; + } + + /** + * Constructs a new exception with the specified cause and a name of the Kafka + * Entity Bean, which had the problem. 
+ * + * @param beanName name of the Kafka Entity Bean + * @param e problem bei creating the Kafka Entity Bean + */ + public KafkaEntityException(String beanName, Exception e) { + super(e); + this.beanName = beanName; + } + + /** + * Getter to the name of the Kafka Entity Bean, which had the problem. + * + * @return name of the Kafka Entity Bean + */ + public String getBeanName() { + return this.beanName; + } + + @Override + public String getMessage() { + return this.beanName + ": " + super.getMessage(); + } + +} diff --git a/spring-kafka/src/main/java/org/springframework/kafka/entity/KafkaEntityKey.java b/spring-kafka/src/main/java/org/springframework/kafka/entity/KafkaEntityKey.java new file mode 100644 index 0000000000..16291d2f10 --- /dev/null +++ b/spring-kafka/src/main/java/org/springframework/kafka/entity/KafkaEntityKey.java @@ -0,0 +1,34 @@ +/* + * Copyright 2016-2025 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.kafka.entity; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +/** + * Define per topic one class or record and mark with @KafkaEntity (mark a field + * with @KafkaEntityKey) . 
+ * + * @author Popovics Boglarka + */ +@Target({ ElementType.FIELD }) +@Retention(RetentionPolicy.RUNTIME) +public @interface KafkaEntityKey { + +} diff --git a/spring-kafka/src/main/java/org/springframework/kafka/entity/KafkaEntityUtil.java b/spring-kafka/src/main/java/org/springframework/kafka/entity/KafkaEntityUtil.java new file mode 100644 index 0000000000..39abc4afc6 --- /dev/null +++ b/spring-kafka/src/main/java/org/springframework/kafka/entity/KafkaEntityUtil.java @@ -0,0 +1,56 @@ +/* + * Copyright 2016-2025 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.kafka.entity; + +import io.micrometer.common.util.StringUtils; + +/** + * Kafka Entity Utility Class. + * + * @author Popovics Boglarka + */ +public final class KafkaEntityUtil { + + private KafkaEntityUtil() { + super(); + } + + /** + * Gets the name of the topic to a @KafkaEntity. 
+ * + * @param entity class which is marked with @KafkaEntity + * + * @return the name of the entity class with included packagename but you can + * use custom Topic name like + * this @KafkaEntity(customTopicName="PRODUCT") + */ + public static String getTopicName(Class entity) { + KafkaEntity topic = extractKafkaEntity(entity); + return getTopicName(entity, topic); + } + + private static String getTopicName(Class entity, KafkaEntity topic) { + if (!StringUtils.isEmpty(topic.customTopicName())) { + return topic.customTopicName(); + } + return entity.getName(); + } + + private static KafkaEntity extractKafkaEntity(Class entity) { + return (KafkaEntity) entity.getAnnotation(KafkaEntity.class); + } +} diff --git a/spring-kafka/src/main/java/org/springframework/kafka/entity/reactive/KafkaEntityDefaultConfiguration.java b/spring-kafka/src/main/java/org/springframework/kafka/entity/reactive/KafkaEntityDefaultConfiguration.java new file mode 100644 index 0000000000..b0984a7356 --- /dev/null +++ b/spring-kafka/src/main/java/org/springframework/kafka/entity/reactive/KafkaEntityDefaultConfiguration.java @@ -0,0 +1,307 @@ +/* + * Copyright 2016-2025 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.springframework.kafka.entity.reactive; + +import java.lang.reflect.Field; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.lang.reflect.ParameterizedType; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ExecutionException; + +import org.apache.commons.logging.LogFactory; +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.clients.admin.ListTopicsResult; + +import org.springframework.beans.factory.DisposableBean; +import org.springframework.beans.factory.InitializingBean; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.beans.factory.support.DefaultSingletonBeanRegistry; +import org.springframework.context.ApplicationContext; +import org.springframework.context.annotation.Configuration; +import org.springframework.core.log.LogAccessor; +import org.springframework.kafka.entity.KafkaEntity; +import org.springframework.kafka.entity.KafkaEntityException; +import org.springframework.kafka.entity.KafkaEntityKey; +import org.springframework.kafka.entity.KafkaEntityUtil; + +/** + * This Bean creates and registers all KafkaEntity Bean at starting a + * spring-boot app. 
+ * + * @author Popovics Boglarka + */ +@Configuration +public class KafkaEntityDefaultConfiguration implements InitializingBean, DisposableBean { + + private final LogAccessor logger = new LogAccessor(LogFactory.getLog(getClass())); + + private List bootstrapServers = new ArrayList<>(Collections.singletonList("localhost:9092")); + + private ApplicationContext applicationContext; + + private Set beanNames = new HashSet<>(); + + /** + * Use this constructor if you want to create an instance manually. + * + * @param applicationContext spring applicationcontext + * @param bootstrapServers kafka bootstrap URL, default is looking at spirng + * application property + * spring.kafka.bootstrap-servers, if it is empty than + * "localhost:9092" + */ + public KafkaEntityDefaultConfiguration(@Autowired ApplicationContext applicationContext, + @Value("${spring.kafka.bootstrap-servers:localhost:9092}") List bootstrapServers) { + super(); + this.applicationContext = applicationContext; + if (bootstrapServers != null && !bootstrapServers.isEmpty()) { + this.bootstrapServers = Collections.unmodifiableList(bootstrapServers); + } + this.logger.warn("bootstrapServers: " + bootstrapServers); + } + + private List errors = new ArrayList<>(); + + @Override + public void afterPropertiesSet() { + + StringBuilder result = new StringBuilder(); + String[] allBeans = this.applicationContext.getBeanDefinitionNames(); + DefaultSingletonBeanRegistry registry = (DefaultSingletonBeanRegistry) this.applicationContext + .getAutowireCapableBeanFactory(); + for (String beanName : allBeans) { + result.append(beanName).append("\n"); + + if ("kafkaEntityDefaultConfiguration".equals(beanName)) { + continue; + } + + Object bean = this.applicationContext.getBean(beanName); + if (this.logger.isTraceEnabled()) { + this.logger.trace(" bean -> " + bean.getClass()); + } + for (Field field : bean.getClass().getDeclaredFields()) { + if (this.logger.isTraceEnabled()) { + this.logger.trace(" field -> " + 
field.getName()); + } + try { + + if (field.isAnnotationPresent(KafkaEntityPublisher.class) + || field.isAnnotationPresent(KafkaEntitySubscriber.class) + || field.isAnnotationPresent(KafkaEntityProcessor.class)) { + String newBeanName = bean.getClass().getName() + "#" + field.getName(); + Class entity = (Class) ((ParameterizedType) field.getGenericType()).getActualTypeArguments()[0]; + if (!registry.containsSingleton(newBeanName)) { + DisposableBean newInstance = null; + if (field.isAnnotationPresent(KafkaEntityPublisher.class)) { + + KafkaEntityPublisher kafkaEntityPublisher = field + .getAnnotation(KafkaEntityPublisher.class); + + newInstance = registerKafkaEntityPublisherBean(bean, entity, newBeanName, + kafkaEntityPublisher); + + } + else if (field.isAnnotationPresent(KafkaEntitySubscriber.class)) { + KafkaEntitySubscriber kafkaEntitySubscriber = field + .getAnnotation(KafkaEntitySubscriber.class); + + newInstance = registerKafkaEntitySubscriberBean(bean, entity, newBeanName, kafkaEntitySubscriber); + + } + else if (field.isAnnotationPresent(KafkaEntityProcessor.class)) { + KafkaEntityProcessor kafkaEntityProcessor = field.getAnnotation(KafkaEntityProcessor.class); + + newInstance = registerKafkaEntityProcessorBean(bean, entity, newBeanName, kafkaEntityProcessor); + } + registerBean(registry, bean, field, newBeanName, newInstance); + } + } + } + catch (KafkaEntityException e) { + this.logger.error(e, "Error by registering Kafka Entity Bean"); + this.errors.add(e); + } + catch (Exception e) { + KafkaEntityException kafkaEx = new KafkaEntityException(beanName, e); + this.logger.error(kafkaEx, "Error by registering Kafka Entity Bean"); + this.errors.add(kafkaEx); + } + } + } + String string = result.toString(); + if (this.logger.isTraceEnabled()) { + this.logger.trace("postConstruct-getAllBeans(): " + string); + } + + } + + private DisposableBean registerKafkaEntityPublisherBean(Object bean, Class entity, String newBeanName, + KafkaEntityPublisher 
kafkaEntityPublisher) + throws KafkaEntityException, NoSuchMethodException, IllegalAccessException, InvocationTargetException { + + if (this.logger.isDebugEnabled()) { + this.logger.debug("registering " + newBeanName + " as Publisher"); + } + handleKafkaEntity(newBeanName, entity); + + Class clazz = SimpleKafkaEntityPublisher.class; + Method method = clazz.getMethod("create", List.class, KafkaEntityPublisher.class, Class.class, String.class); + + Object obj = method.invoke(null, this.bootstrapServers, kafkaEntityPublisher, entity, newBeanName); + SimpleKafkaEntityPublisher newInstance = (SimpleKafkaEntityPublisher) obj; + + return newInstance; + } + + private DisposableBean registerKafkaEntitySubscriberBean(Object bean, Class entity, String newBeanName, + KafkaEntitySubscriber kafkaEntitySubscriber) + throws KafkaEntityException, NoSuchMethodException, IllegalAccessException, InvocationTargetException { + + if (this.logger.isDebugEnabled()) { + this.logger.debug("registering " + newBeanName + " as Subscriber"); + } + handleKafkaEntity(newBeanName, entity); + + Class clazz = SimpleKafkaEntitySubscriber.class; + Method method = clazz.getMethod("create", List.class, KafkaEntitySubscriber.class, Class.class, String.class); + + Object obj = method.invoke(null, this.bootstrapServers, kafkaEntitySubscriber, entity, newBeanName); + SimpleKafkaEntitySubscriber newInstance = (SimpleKafkaEntitySubscriber) obj; + + return newInstance; + } + + private DisposableBean registerKafkaEntityProcessorBean(Object bean, Class entity, String newBeanName, + KafkaEntityProcessor kafkaEntityProcessor) + throws KafkaEntityException, NoSuchMethodException, IllegalAccessException, InvocationTargetException { + + if (this.logger.isDebugEnabled()) { + this.logger.debug("registering " + newBeanName + " as Processor"); + } + handleKafkaEntity(newBeanName, entity); + + Class clazz = SimpleKafkaEntityProcessor.class; + Method method = clazz.getMethod("create", List.class, KafkaEntityProcessor.class, 
Class.class, String.class); + + Object obj = method.invoke(null, this.bootstrapServers, kafkaEntityProcessor, entity, newBeanName); + SimpleKafkaEntityProcessor newInstance = (SimpleKafkaEntityProcessor) obj; + + return newInstance; + } + + private void registerBean(DefaultSingletonBeanRegistry registry, Object bean, Field field, String newBeanName, + DisposableBean newInstance) throws IllegalAccessException { + registry.registerSingleton(newBeanName, newInstance); + registry.registerDisposableBean(newBeanName, newInstance); + field.setAccessible(true); + field.set(bean, newInstance); + this.beanNames.add(newBeanName); + } + + private void handleKafkaEntity(String beanName, Class entity) throws KafkaEntityException { + if (!entity.isAnnotationPresent(KafkaEntity.class)) { + throw new KafkaEntityException(beanName, entity.getName() + " must be a @KafkaEntity"); + } + + boolean foundKeyAnnotation = false; + for (Field field : entity.getDeclaredFields()) { + if (this.logger.isTraceEnabled()) { + this.logger.trace(" field -> " + field.getName()); + } + if (field.isAnnotationPresent(KafkaEntityKey.class)) { + foundKeyAnnotation = true; + break; + } + } + + if (!foundKeyAnnotation) { + throw new KafkaEntityException(beanName, entity.getName() + " @" + KafkaEntityKey.class.getSimpleName() + + " is mandatory in @" + KafkaEntity.class.getSimpleName()); + } + + try { + checkTopic(entity); + } + catch (InterruptedException | ExecutionException e) { + throw new KafkaEntityException(beanName, e); + } + } + + private void checkTopic(Class entity) throws InterruptedException, ExecutionException, KafkaEntityException { + Map conf = new HashMap<>(); + conf.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapServers); + conf.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "5000"); + try (AdminClient admin = AdminClient.create(conf);) { + + ListTopicsResult listTopics = admin.listTopics(); + Set names = listTopics.names().get(); + if (this.logger.isTraceEnabled()) { + 
this.logger.trace("names: " + names); + } + String topicName = KafkaEntityUtil.getTopicName(entity); + boolean contains = names.contains(topicName); + if (!contains) { + throw new KafkaEntityException(topicName, + "Topic " + topicName + " does not exist in " + this.bootstrapServers); + } + } + } + + /** + * To get the exceptions, which were created while afterPropertiesSet() . + * @return list of KafkaEntityException + */ + public List getErrors() { + return Collections.unmodifiableList(this.errors); + } + + /** + * It throws the first error, which happenened at afterPropertiesSet() if any + * happpened. + * @throws KafkaEntityException the first exception + */ + public void throwFirstError() throws KafkaEntityException { + if (!this.errors.isEmpty()) { + throw this.errors.get(0); + } + } + + @Override + public void destroy() throws Exception { + + DefaultSingletonBeanRegistry registry = (DefaultSingletonBeanRegistry) this.applicationContext + .getAutowireCapableBeanFactory(); + + this.beanNames.stream().forEach(beanName -> { + if (this.logger.isDebugEnabled()) { + this.logger.debug("destroying " + beanName); + } + registry.destroySingleton(beanName); + }); + } + +} diff --git a/spring-kafka/src/main/java/org/springframework/kafka/entity/reactive/KafkaEntityPollingRunnable.java b/spring-kafka/src/main/java/org/springframework/kafka/entity/reactive/KafkaEntityPollingRunnable.java new file mode 100644 index 0000000000..df4d5af84c --- /dev/null +++ b/spring-kafka/src/main/java/org/springframework/kafka/entity/reactive/KafkaEntityPollingRunnable.java @@ -0,0 +1,183 @@ +/* + * Copyright 2016-2025 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.kafka.entity.reactive; + +import java.time.Duration; +import java.time.LocalDateTime; +import java.time.temporal.ChronoUnit; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +import org.apache.commons.logging.LogFactory; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; + +import org.springframework.core.log.LogAccessor; +import org.springframework.kafka.entity.KafkaEntityUtil; +import org.springframework.kafka.entity.reactive.SimpleKafkaEntityPublisher.PublisherWrapper; +import org.springframework.kafka.support.serializer.JsonDeserializer; +import org.springframework.kafka.support.serializer.JsonKeyDeserializer; + +/** + * This runnable is constantly polling a kafka-consumer for 10 sec after started + * until it is stopped if there is minimum 1 subscriber. 
+ * + * @param class of the entity, representing messages + * @param the key from the entity + * + * @author Popovics Boglarka + */ +public class KafkaEntityPollingRunnable implements Runnable { + + private final LogAccessor logger = new LogAccessor(LogFactory.getLog(getClass())); + + private final String groupid; + + private final Class clazz; + + private final Class clazzKey; + + private final AtomicBoolean stopped = new AtomicBoolean(false); + + private final AtomicBoolean started = new AtomicBoolean(false); + + /** The array of currently subscribed subscribers. */ + private final AtomicReference[]> subscribers; + + private final List bootstrapServers; + + private final String beanName; + + KafkaEntityPollingRunnable(String groupid, Class clazz, Class clazzKey, + AtomicReference[]> subscribers, List bootstrapServers, + String beanName) { + super(); + this.groupid = groupid; + this.clazz = clazz; + this.clazzKey = clazzKey; + this.subscribers = subscribers; + this.bootstrapServers = bootstrapServers; + this.beanName = beanName; + } + + @Override + public void run() { + + Map properties = new HashMap<>(); + properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapServers); + properties.put(ConsumerConfig.GROUP_ID_CONFIG, this.groupid); + + properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); + JsonDeserializer valueDeserializer = new JsonDeserializer<>(getClazz()); + JsonKeyDeserializer keyDeserializer = new JsonKeyDeserializer<>(getClazzKey()); + + valueDeserializer.addTrustedPackages(getClazz().getPackageName()); + keyDeserializer.addTrustedPackages(getClazzKey().getPackageName()); + + try (KafkaConsumer kafkaConsumer = new KafkaConsumer(properties, keyDeserializer, + valueDeserializer);) { + + kafkaConsumer.subscribe(List.of(KafkaEntityUtil.getTopicName(getClazz()))); + + kafkaConsumer.poll(Duration.ofSeconds(10L)); + + kafkaConsumer.seekToEnd(Collections.emptyList()); + kafkaConsumer.commitSync(); + + // wait until kafkaConsumer is 
ready and offset setted + + LocalDateTime then = LocalDateTime.now(); + while (kafkaConsumer.committed(kafkaConsumer.assignment()).isEmpty()) { + if (ChronoUnit.SECONDS.between(then, LocalDateTime.now()) >= 60) { + throw new RuntimeException("KafkaConsumer is not ready in 60sec."); + } + } + + if (this.logger.isDebugEnabled()) { + this.logger.debug("started " + this.beanName + "..."); + } + while (!this.stopped.get()) { + this.started.set(true); + + if (this.subscribers.get().length > 0) { + this.logger.info("POLL-" + this.groupid + " to " + this.subscribers.get().length + " subscribers"); + ConsumerRecords poll = kafkaConsumer.poll(Duration.ofSeconds(10L)); + + if (this.logger.isDebugEnabled()) { + this.logger.debug("poll.count: " + poll.count()); + } + + poll.forEach(r -> { + + if (this.logger.isTraceEnabled()) { + this.logger.trace("polled:" + r); + } + + for (PublisherWrapper pd : this.subscribers.get()) { + pd.onNext(r.value()); + } + }); + kafkaConsumer.commitSync(); + } + } + kafkaConsumer.unsubscribe(); + this.started.set(false); + } + } + + /** + * Getter to message class. + * + * @return message class + */ + public Class getClazz() { + return this.clazz; + } + + /** + * Getter to message key class. + * + * @return message key class + */ + public Class getClazzKey() { + return this.clazzKey; + } + + /** + * Getter to is the polling stopped. + * + * @return stopped + */ + public AtomicBoolean getStopped() { + return this.stopped; + } + + /** + * Getter to is the polling started. 
+ * + * @return started + */ + public AtomicBoolean getStarted() { + return this.started; + } + +} diff --git a/spring-kafka/src/main/java/org/springframework/kafka/entity/reactive/KafkaEntityProcessor.java b/spring-kafka/src/main/java/org/springframework/kafka/entity/reactive/KafkaEntityProcessor.java new file mode 100644 index 0000000000..39d19894de --- /dev/null +++ b/spring-kafka/src/main/java/org/springframework/kafka/entity/reactive/KafkaEntityProcessor.java @@ -0,0 +1,44 @@ +/* + * Copyright 2016-2025 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.kafka.entity.reactive; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +/** + * To write data to a topic (produce) + * if you need the RecordMetadata from the Kafka Message than you can use + * KafkaEntityProcessor . + * In a Spring Bean just inject a KafkaEntityProcessor. + * + * default is transactional=true + * + * @author Popovics Boglarka + */ +@Target(ElementType.FIELD) +@Retention(RetentionPolicy.RUNTIME) +public @interface KafkaEntityProcessor { + + /** + * If the Kafka Producer should create a Kafka Transaction. 
/**
 * Marks an injection point for writing data to a topic (produce): in a Spring
 * bean, simply inject a {@code KafkaEntitySubscriber}.
 *
 * <p>Sends are transactional by default.
 *
 * @author Popovics Boglarka
 */
@Target(ElementType.FIELD)
@Retention(RetentionPolicy.RUNTIME)
public @interface KafkaEntitySubscriber {

	/**
	 * Whether the Kafka producer should wrap its sends in a Kafka transaction;
	 * {@code true} by default.
	 *
	 * @return transactional
	 */
	boolean transactional() default true;

}
+ * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.kafka.entity.reactive; + +import java.lang.reflect.Field; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; + +import org.apache.commons.logging.LogFactory; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.jspecify.annotations.NonNull; +import org.reactivestreams.Processor; +import org.reactivestreams.Subscription; +import reactor.core.CoreSubscriber; +import reactor.core.Disposable; +import reactor.core.Exceptions; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Operators; +import reactor.core.publisher.SignalType; + +import org.springframework.beans.factory.DisposableBean; +import org.springframework.beans.factory.InitializingBean; +import org.springframework.core.log.LogAccessor; +import org.springframework.kafka.entity.KafkaEntityException; +import org.springframework.kafka.entity.KafkaEntityKey; +import org.springframework.kafka.entity.KafkaEntityUtil; +import org.springframework.kafka.support.serializer.JsonKeySerializer; +import 
org.springframework.kafka.support.serializer.JsonSerializer; + +/** + * Implementation class for @KafkaEntityProcessor . + * + * @param class of the Kafka Entity, representing messages + * @param class of the Kafka Entity Key of the Kafka Entity + * + * @author Popovics Boglarka + */ +public final class SimpleKafkaEntityProcessor extends Flux + implements KafkaProcessor, Processor, DisposableBean, InitializingBean { + + private final LogAccessor logger = new LogAccessor(LogFactory.getLog(getClass())); + + /** The terminated indicator for the subscribers array. */ + @SuppressWarnings("rawtypes") + static final ProcessorWrapper[] TERMINATED = new ProcessorWrapper[0]; + + /** An empty subscribers array to avoid allocating it all the time. */ + @SuppressWarnings("rawtypes") + static final ProcessorWrapper[] EMPTY = new ProcessorWrapper[0]; + + /** The array of currently subscribed subscribers. */ + final AtomicReference[]> subscribers; + + private String clientid; + + private KafkaProducer kafkaProducer; + + private Class clazz; + + private String topic; + + /** The error, write before terminating and read after checking subscribers. */ + Throwable error; + + private Field keyField; + + private boolean transactional; + + private List bootstrapServers; + + /** + * Constructs a SimpleKafkaEntityProcessor. 
+ * + * @param the value type + * @param the key type + * @param bootstrapServers List of bootstrapServers + * @param kafkaEntityProcessor annotation + * @param entity class of the Kafka Entity, representing messages + * @param beanName name of the Kafka Entity Bean + * + * @return the new SimpleKafkaEntityProcessor + * @throws KafkaEntityException problem at creation + */ + // @CheckReturnValue + @NonNull + public static SimpleKafkaEntityProcessor create(List bootstrapServers, + KafkaEntityProcessor kafkaEntityProcessor, Class entity, String beanName) throws KafkaEntityException { + return new SimpleKafkaEntityProcessor<>(bootstrapServers, kafkaEntityProcessor, entity, beanName); + } + + SimpleKafkaEntityProcessor(List bootstrapServers, KafkaEntityProcessor kafkaEntityProcessor, + Class entity, String beanName) { + + this.bootstrapServers = bootstrapServers; + this.subscribers = new AtomicReference<>(EMPTY); + + this.clazz = entity; + this.clientid = beanName; + this.topic = KafkaEntityUtil.getTopicName(this.clazz); + + this.transactional = kafkaEntityProcessor.transactional(); + + // presents of @KafkaEntityKey is checked in KafkaEntityConfig + for (Field field : this.clazz.getDeclaredFields()) { + if (this.logger.isDebugEnabled()) { + this.logger.debug(" field -> " + field.getName()); + } + if (field.isAnnotationPresent(KafkaEntityKey.class)) { + field.setAccessible(true); + this.keyField = field; + break; + } + } + + Map configProps = new HashMap<>(); + configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, JsonKeySerializer.class); + configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); + configProps.put(ProducerConfig.CLIENT_ID_CONFIG, this.clientid); + + if (this.transactional) { + configProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, beanName + "-transactional-id"); + configProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true"); + } + + 
JsonSerializer valueSerializer = new JsonSerializer<>(); + JsonKeySerializer keySerializer = new JsonKeySerializer<>(); + + this.kafkaProducer = new KafkaProducer(configProps, keySerializer, valueSerializer); + if (this.transactional) { + this.logger.info("initTransactions-begin " + beanName); + this.kafkaProducer.initTransactions(); + this.logger.info("initTransactions-end " + beanName); + } + } + + private K extractKey(T event) throws IllegalArgumentException, IllegalAccessException { + return (K) this.keyField.get(event); + } + + @Override + public void onSubscribe(Subscription d) { + + if (this.subscribers.get() == TERMINATED) { + d.cancel(); + } + + if (this.transactional) { + this.kafkaProducer.beginTransaction(); + } + d.request(Long.MAX_VALUE); + } + + @Override + public void onNext(T t) { + this.logger.info("onNext: " + t); + CompletableFuture completableFuture = new CompletableFuture<>(); + K key; + try { + key = extractKey(t); + + ProducerRecord rec = new ProducerRecord(this.topic, key, t); + Future send = this.kafkaProducer.send(rec); + completableFuture.complete(send.get()); + } + catch (InterruptedException | ExecutionException e) { + completableFuture.completeExceptionally(e); + } + catch (IllegalAccessException e) { + completableFuture.completeExceptionally(e); + } + + try { + RecordMetadata r = completableFuture.get(); + for (ProcessorWrapper pd : this.subscribers.get()) { + pd.onNext(r); + } + } + catch (InterruptedException | ExecutionException e) { + onError(e); + } + } + + @SuppressWarnings("unchecked") + @Override + public void onError(Throwable t) { + if (this.subscribers.get() == TERMINATED) { + onError(t); + return; + } + this.error = t; + + for (ProcessorWrapper pd : this.subscribers.getAndSet(TERMINATED)) { + pd.onError(t); + } + if (this.transactional) { + this.kafkaProducer.abortTransaction(); + } + return; + } + + @SuppressWarnings("unchecked") + @Override + public void onComplete() { + if (this.subscribers.get() == TERMINATED) { + 
return; + } + for (ProcessorWrapper pd : this.subscribers.getAndSet(TERMINATED)) { + pd.onComplete(); + } + if (this.transactional) { + this.kafkaProducer.commitTransaction(); + } + } + + @Override + public void afterPropertiesSet() throws Exception { + + } + + @Override + public void destroy() throws Exception { + this.kafkaProducer.close(); + } + + @Override + public void subscribe(CoreSubscriber actual) { + ProcessorWrapper ps = new ProcessorWrapper<>(actual, this); + actual.onSubscribe(ps); + if (add(ps)) { + // if cancellation happened while a successful add, the remove() didn't work + // so we need to do it again + if (ps.isDisposed()) { + remove(ps); + } + } + else { + Throwable ex = this.error; + if (ex != null) { + actual.onError(ex); + } + else { + actual.onComplete(); + } + } + } + + /** + * Tries to add the given subscriber to the subscribers array atomically or + * returns false if the Processor has terminated. + * + * @param ps the subscriber to add + * + * @return true if successful, false if the Processor has terminated + */ + boolean add(ProcessorWrapper ps) { + for (;;) { + ProcessorWrapper[] a = this.subscribers.get(); + if (a == TERMINATED) { + return false; + } + + int n = a.length; + @SuppressWarnings("unchecked") + ProcessorWrapper[] b = new ProcessorWrapper[n + 1]; + System.arraycopy(a, 0, b, 0, n); + b[n] = ps; + + if (this.subscribers.compareAndSet(a, b)) { + return true; + } + } + } + + /** + * Atomically removes the given subscriber if it is subscribed to the Processor. 
+ * + * @param ps the Processor to remove + */ + @SuppressWarnings("unchecked") + void remove(ProcessorWrapper ps) { + for (;;) { + ProcessorWrapper[] a = this.subscribers.get(); + if (a == TERMINATED || a == EMPTY) { + return; + } + + int n = a.length; + int j = -1; + for (int i = 0; i < n; i++) { + if (a[i] == ps) { + j = i; + break; + } + } + + if (j < 0) { + return; + } + + ProcessorWrapper[] b; + + if (n == 1) { + b = EMPTY; + } + else { + b = new ProcessorWrapper[n - 1]; + System.arraycopy(a, 0, b, 0, j); + System.arraycopy(a, j + 1, b, j, n - j - 1); + } + if (this.subscribers.compareAndSet(a, b)) { + return; + } + } + } + + /** + * Wraps the actual subscriber, tracks its requests and makes cancellation to + * remove itself from the current subscribers array. + * + * @param the value type + * @param the key type + */ + static final class ProcessorWrapper extends AtomicBoolean + implements CoreSubscriber, Subscription, Disposable { + + private static final long serialVersionUID = 1L; + + private final LogAccessor logger = new LogAccessor(LogFactory.getLog(getClass())); + + /** The actual subscriber. */ + final CoreSubscriber downstream; + + /** The Processor state. */ + final SimpleKafkaEntityProcessor parent; + + /** + * Constructs a ProcessorWrapper, wraps the actual subscriber and the state. + * + * @param actual the actual subscriber + * @param parent the parent PublishProcessor + */ + ProcessorWrapper(CoreSubscriber actual, SimpleKafkaEntityProcessor parent) { + this.downstream = actual; + this.parent = parent; + } + + @Override + public void dispose() { + if (compareAndSet(false, true)) { + this.parent.remove(this); + } + cancel(); + } + + volatile Subscription subscription; + + static AtomicReferenceFieldUpdater S = + AtomicReferenceFieldUpdater.newUpdater(ProcessorWrapper.class, Subscription.class, "subscription"); + + /** + * Return current {@link Subscription} . 
+ * + * @return current {@link Subscription} + */ + protected Subscription upstream() { + return this.subscription; + } + + @Override + public boolean isDisposed() { + return this.subscription == Operators.cancelledSubscription(); + } + + @Override + public void onSubscribe(Subscription s) { + if (Operators.setOnce(S, this, s)) { + try { + this.downstream.onSubscribe(s); + this.subscription.request(Long.MAX_VALUE); + } + catch (Throwable throwable) { + onError(Operators.onOperatorError(s, throwable, currentContext())); + } + } + } + + @Override + public void onNext(RecordMetadata value) { + Objects.requireNonNull(value, "onNext"); + try { + if (!get()) { + this.downstream.onNext(value); + } + } + catch (Throwable throwable) { + onError(Operators.onOperatorError(this.subscription, throwable, value, currentContext())); + } + } + + @Override + public void onError(Throwable t) { + Objects.requireNonNull(t, "onError"); + + if (S.getAndSet(this, Operators.cancelledSubscription()) == Operators + .cancelledSubscription()) { + //already cancelled concurrently + Operators.onErrorDropped(t, currentContext()); + return; + } + + try { + if (!get()) { + this.downstream.onError(t); + } + throw Exceptions.errorCallbackNotImplemented(t); + } + catch (Throwable e) { + e = Exceptions.addSuppressed(e, t); + Operators.onErrorDropped(e, currentContext()); + } + finally { + safeHookFinally(SignalType.ON_ERROR); + } + } + + @Override + public void onComplete() { + if (S.getAndSet(this, Operators.cancelledSubscription()) != Operators + .cancelledSubscription()) { + //we're sure it has not been concurrently cancelled + try { + if (!get()) { + this.downstream.onComplete(); + } + } + catch (Throwable throwable) { + //onError itself will short-circuit due to the CancelledSubscription being set above + onError(Operators.onOperatorError(throwable, currentContext())); + } + finally { + safeHookFinally(SignalType.ON_COMPLETE); + } + } + } + + @Override + public void request(long n) { + if 
(Operators.validate(n)) { + Subscription s = this.subscription; + if (s != null) { + s.request(n); + } + } + } + + /** + * {@link #request(long) Request} an unbounded amount. + */ + public void requestUnbounded() { + request(Long.MAX_VALUE); + } + + @Override + public void cancel() { + if (Operators.terminate(S, this)) { + try { + // do nothing + } + catch (Throwable throwable) { + onError(Operators.onOperatorError(this.subscription, throwable, currentContext())); + } + finally { + safeHookFinally(SignalType.CANCEL); + } + } + } + + void safeHookFinally(SignalType type) { + try { + // do nothing + } + catch (Throwable finallyFailure) { + Operators.onErrorDropped(finallyFailure, currentContext()); + } + } + + @Override + public String toString() { + return getClass().getSimpleName(); + } + } +} diff --git a/spring-kafka/src/main/java/org/springframework/kafka/entity/reactive/SimpleKafkaEntityPublisher.java b/spring-kafka/src/main/java/org/springframework/kafka/entity/reactive/SimpleKafkaEntityPublisher.java new file mode 100644 index 0000000000..3270997352 --- /dev/null +++ b/spring-kafka/src/main/java/org/springframework/kafka/entity/reactive/SimpleKafkaEntityPublisher.java @@ -0,0 +1,356 @@ +/* + * Copyright 2016-2025 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.springframework.kafka.entity.reactive; + +import java.lang.reflect.Field; +import java.time.LocalDateTime; +import java.time.temporal.ChronoUnit; +import java.util.Arrays; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +import org.apache.commons.logging.LogFactory; +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.clients.admin.DeleteConsumerGroupsResult; +import org.apache.kafka.common.KafkaFuture; +import org.jspecify.annotations.NonNull; +import org.reactivestreams.Subscription; +import reactor.core.CoreSubscriber; +import reactor.core.Disposable; +import reactor.core.publisher.Flux; + +import org.springframework.beans.factory.DisposableBean; +import org.springframework.beans.factory.InitializingBean; +import org.springframework.core.log.LogAccessor; +import org.springframework.kafka.entity.KafkaEntityException; +import org.springframework.kafka.entity.KafkaEntityKey; + +/** + * Implementation class for @KafkaEntityPublisher . 
+ * + * @param class of the entity, representing messages + * @param the key from the entity + * + * @author Popovics Boglarka + */ +public class SimpleKafkaEntityPublisher extends Flux implements DisposableBean, InitializingBean { + + private final LogAccessor logger = new LogAccessor(LogFactory.getLog(getClass())); + + private List bootstrapServers; + + private String groupid; + + private Class clazz; + + private Class clazzKey; + + private final KafkaEntityPollingRunnable pollingRunnable; + + private String beanName; + + SimpleKafkaEntityPublisher(List bootstrapServers, KafkaEntityPublisher kafkaEntityPublisher, Class entity, String beanName) + throws KafkaEntityException { + this.bootstrapServers = bootstrapServers; + this.clazz = entity; + this.groupid = /* getTopicName() + "-Subscriber-" + */ beanName; + + // presents of @KafkaEntityKey is checked in KafkaEntityConfig + for (Field field : this.clazz.getDeclaredFields()) { + if (this.logger.isDebugEnabled()) { + this.logger.debug(" field -> " + field.getName()); + } + if (field.isAnnotationPresent(KafkaEntityKey.class)) { + field.setAccessible(true); + this.clazzKey = (Class) field.getType(); + break; + } + } + + this.subscribers = new AtomicReference<>(EMPTY); + + this.beanName = beanName; + + this.pollingRunnable = new KafkaEntityPollingRunnable(this.groupid, this.clazz, this.clazzKey, this.subscribers, + bootstrapServers, beanName); + + start(); + } + + private void start() throws KafkaEntityException { + + if (this.pollingRunnable.getStarted().get()) { + this.logger.warn("already started..."); + return; + } + + if (this.logger.isDebugEnabled()) { + this.logger.debug("starting " + this.beanName + "..."); + } + + Thread pollingThread = new Thread(this.pollingRunnable); + pollingThread.setName(this.beanName + "Thread"); + pollingThread.start(); + + this.logger.info("waiting the consumer to start in " + this.beanName + "Thread..."); + LocalDateTime then = LocalDateTime.now(); + while 
(!this.pollingRunnable.getStarted().get()) { + if (ChronoUnit.SECONDS.between(then, LocalDateTime.now()) >= 300) { + throw new KafkaEntityException(this.beanName, "KafkaConsumer could not start in 300 sec."); + } + } + } + + @Override + public void afterPropertiesSet() throws Exception { + + } + + @Override + public void destroy() throws Exception { + + this.logger.warn("deleting a Consumer Group for " + this.beanName); + + this.pollingRunnable.getStopped().set(true); + + this.logger.info("waiting polling to stop"); + LocalDateTime then = LocalDateTime.now(); + while (this.pollingRunnable.getStarted().get()) { + if (ChronoUnit.SECONDS.between(then, LocalDateTime.now()) >= 20) { + break; + } + } + + final Properties properties = new Properties(); + properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapServers); + properties.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "1000"); + properties.put(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, "5000"); + + try (AdminClient adminClient = AdminClient.create(properties);) { + String consumerGroupToBeDeleted = this.groupid; + DeleteConsumerGroupsResult deleteConsumerGroupsResult = adminClient + .deleteConsumerGroups(Arrays.asList(consumerGroupToBeDeleted)); + + KafkaFuture resultFuture = deleteConsumerGroupsResult.all(); + resultFuture.get(); + } + catch (Exception e) { + // we cannot do anything at this point + this.logger.error(e.getMessage()); + } + + } + + /** The terminated indicator for the subscribers array. */ + @SuppressWarnings("rawtypes") + static final PublisherWrapper[] TERMINATED = new PublisherWrapper[0]; + + /** An empty subscribers array to avoid allocating it all the time. */ + @SuppressWarnings("rawtypes") + static final PublisherWrapper[] EMPTY = new PublisherWrapper[0]; + + /** The array of currently subscribed subscribers. */ + final AtomicReference[]> subscribers; + + /** The error, write before terminating and read after checking subscribers. 
*/ + Throwable error; + + /** + * Constructs a SimpleKafkaPublisherHandler. + * + * @param the value type + * @param the key type + * @param bootstrapServers List of bootstrapServers + * @param kafkaEntityPublisher annotation + * @param entity class of the Kafka Entity, representing messages + * @param beanName name of the Kafka Entity Bean + * + * @return the new SimpleKafkaPublisherHandler + * @throws KafkaEntityException problem at creation + */ + // @CheckReturnValue + @NonNull + public static SimpleKafkaEntityPublisher create(List bootstrapServers, KafkaEntityPublisher kafkaEntityPublisher, + Class entity, String beanName) throws KafkaEntityException { + return new SimpleKafkaEntityPublisher(bootstrapServers, kafkaEntityPublisher, entity, beanName); + } + + @Override + public void subscribe(CoreSubscriber actual) { + PublisherWrapper ps = new PublisherWrapper<>(actual, this); + actual.onSubscribe(ps); + if (add(ps)) { + // if cancellation happened while a successful add, the remove() didn't work + // so we need to do it again + if (ps.isDisposed()) { + remove(ps); + } + } + else { + Throwable ex = this.error; + if (ex != null) { + actual.onError(ex); + } + else { + actual.onComplete(); + } + } + + } + + /** + * Tries to add the given subscriber to the subscribers array atomically or + * returns false if the Processor has terminated. + * + * @param ps the subscriber to add + * + * @return true if successful, false if the Processor has terminated + */ + boolean add(PublisherWrapper ps) { + for (;;) { + PublisherWrapper[] a = this.subscribers.get(); + if (a == TERMINATED) { + return false; + } + + int n = a.length; + @SuppressWarnings("unchecked") + PublisherWrapper[] b = new PublisherWrapper[n + 1]; + System.arraycopy(a, 0, b, 0, n); + b[n] = ps; + + if (this.subscribers.compareAndSet(a, b)) { + return true; + } + } + } + + /** + * Atomically removes the given subscriber if it is subscribed to the Processor. 
+ * + * @param ps the Processor to remove + */ + @SuppressWarnings("unchecked") + void remove(PublisherWrapper ps) { + for (;;) { + PublisherWrapper[] a = this.subscribers.get(); + if (a == TERMINATED || a == EMPTY) { + return; + } + + int n = a.length; + int j = -1; + for (int i = 0; i < n; i++) { + if (a[i] == ps) { + j = i; + break; + } + } + + if (j < 0) { + return; + } + + PublisherWrapper[] b; + + if (n == 1) { + b = EMPTY; + } + else { + b = new PublisherWrapper[n - 1]; + System.arraycopy(a, 0, b, 0, j); + System.arraycopy(a, j + 1, b, j, n - j - 1); + } + if (this.subscribers.compareAndSet(a, b)) { + return; + } + } + } + + /** + * Wraps the actual subscriber, tracks its requests and makes cancellation to + * remove itself from the current subscribers array. + * + * @param the value type + * @param the key type + */ + static final class PublisherWrapper extends AtomicBoolean implements CoreSubscriber, Subscription, Disposable { + + /** The actual subscriber. */ + final CoreSubscriber downstream; + + /** The Processor state. */ + final SimpleKafkaEntityPublisher parent; + + /** + * Constructs a PublishSubscriber, wraps the actual subscriber and the state. 
+ * + * @param actual the actual subscriber + * @param parent the parent PublishProcessor + */ + PublisherWrapper(CoreSubscriber actual, SimpleKafkaEntityPublisher parent) { + this.downstream = actual; + this.parent = parent; + } + + @Override + public void dispose() { + if (compareAndSet(false, true)) { + this.parent.remove(this); + } + } + + @Override + public void onSubscribe(Subscription s) { + this.downstream.onSubscribe(s); + } + + @Override + public void onNext(T t) { + if (!get()) { + this.downstream.onNext(t); + } + } + + @Override + public void onError(Throwable t) { + this.downstream.onError(t); + } + + @Override + public void onComplete() { + if (!get()) { + this.downstream.onComplete(); + } + } + + @Override + public void request(long n) { + compareAndSet(true, false); + } + + @Override + public void cancel() { + dispose(); + } + + } + +} diff --git a/spring-kafka/src/main/java/org/springframework/kafka/entity/reactive/SimpleKafkaEntitySubscriber.java b/spring-kafka/src/main/java/org/springframework/kafka/entity/reactive/SimpleKafkaEntitySubscriber.java new file mode 100644 index 0000000000..780f694e57 --- /dev/null +++ b/spring-kafka/src/main/java/org/springframework/kafka/entity/reactive/SimpleKafkaEntitySubscriber.java @@ -0,0 +1,213 @@ +/* + * Copyright 2016-2025 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
package org.springframework.kafka.entity.reactive;

import java.lang.reflect.Field;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;

import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.jspecify.annotations.NonNull;
import org.reactivestreams.Subscription;
import reactor.core.publisher.BaseSubscriber;

import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.core.log.LogAccessor;
import org.springframework.kafka.entity.KafkaEntityException;
import org.springframework.kafka.entity.KafkaEntityKey;
import org.springframework.kafka.entity.KafkaEntityUtil;
import org.springframework.kafka.support.serializer.JsonKeySerializer;
import org.springframework.kafka.support.serializer.JsonSerializer;

/**
 * Implementation class for {@code @KafkaEntitySubscriber}: a reactive
 * {@link BaseSubscriber} that writes every received entity to the entity's
 * Kafka topic. When {@code transactional()} is set, the whole subscription is
 * one producer transaction: begun on subscribe, committed on complete, aborted
 * on error.
 *
 * @param <T> class of the entity, representing messages
 * @param <K> the key from the entity
 *
 * @author Popovics Boglarka
 */
public final class SimpleKafkaEntitySubscriber<T, K> extends BaseSubscriber<T>
		implements DisposableBean, InitializingBean {

	private final LogAccessor logger = new LogAccessor(LogFactory.getLog(getClass()));

	private final String clientid;

	private final KafkaProducer<K, T> kafkaProducer;

	private final Class<T> clazz;

	private final String topic;

	/** The error, write before terminating and read after checking subscribers. */
	Throwable error;

	/** The field of {@code clazz} annotated with {@link KafkaEntityKey}; set in the constructor. */
	private Field keyField;

	private final boolean transactional;

	private final List<String> bootstrapServers;

	/**
	 * Constructs a SimpleKafkaEntitySubscriber.
	 *
	 * @param <T> the value type
	 * @param <K> the key type
	 * @param bootstrapServers List of bootstrapServers
	 * @param kafkaEntitySubscriber annotation
	 * @param entity class of the Kafka Entity, representing messages
	 * @param beanName name of the Kafka Entity Bean
	 *
	 * @return the new SimpleKafkaEntitySubscriber
	 * @throws KafkaEntityException problem at creation
	 */
	@NonNull
	public static <T, K> SimpleKafkaEntitySubscriber<T, K> create(List<String> bootstrapServers,
			KafkaEntitySubscriber kafkaEntitySubscriber, Class<T> entity, String beanName)
			throws KafkaEntityException {
		return new SimpleKafkaEntitySubscriber<>(bootstrapServers, kafkaEntitySubscriber, entity, beanName);
	}

	SimpleKafkaEntitySubscriber(List<String> bootstrapServers, KafkaEntitySubscriber kafkaEntitySubscriber,
			Class<T> entity, String beanName) throws KafkaEntityException {
		this.bootstrapServers = bootstrapServers;
		this.clazz = entity;
		this.clientid = beanName;
		this.topic = KafkaEntityUtil.getTopicName(this.clazz);
		this.transactional = kafkaEntitySubscriber.transactional();

		// presence of @KafkaEntityKey is checked in KafkaEntityConfig
		for (Field field : this.clazz.getDeclaredFields()) {
			this.logger.debug(() -> " field -> " + field.getName());
			if (field.isAnnotationPresent(KafkaEntityKey.class)) {
				field.setAccessible(true);
				this.keyField = field;
				break;
			}
		}

		Map<String, Object> configProps = new HashMap<>();
		configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
		// NOTE: the serializer class configs are shadowed by the serializer instances
		// passed to the KafkaProducer constructor below; kept for config visibility.
		configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, JsonKeySerializer.class);
		configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
		configProps.put(ProducerConfig.CLIENT_ID_CONFIG, this.clientid);

		if (this.transactional) {
			configProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, beanName + "-transactional-id");
			configProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
		}

		JsonSerializer<T> valueSerializer = new JsonSerializer<>();
		JsonKeySerializer<K> keySerializer = new JsonKeySerializer<>();

		this.kafkaProducer = new KafkaProducer<>(configProps, keySerializer, valueSerializer);

		if (this.transactional) {
			this.logger.info("initTransactions-begin " + beanName);
			this.kafkaProducer.initTransactions();
			this.logger.info("initTransactions-end " + beanName);
		}
	}

	/**
	 * Reads the {@link KafkaEntityKey}-annotated field from the entity.
	 * @param event the entity instance
	 * @return the key value
	 * @throws IllegalAccessException if the key field is not accessible
	 */
	@SuppressWarnings("unchecked") // keyField is the entity's key field; its value is the key type K
	private K extractKey(T event) throws IllegalAccessException {
		return (K) this.keyField.get(event);
	}

	@Override
	public void hookOnSubscribe(Subscription subscription) {
		// One transaction per subscription: opened here, closed in hookOnComplete/hookOnError.
		if (this.transactional) {
			this.kafkaProducer.beginTransaction();
		}
		super.hookOnSubscribe(subscription);
	}

	@Override
	public void hookOnNext(T t) {
		try {
			K key = extractKey(t);
			ProducerRecord<K, T> rec = new ProducerRecord<>(this.topic, key, t);
			// Block until the broker acknowledges the record so failures surface here
			// (the former CompletableFuture indirection added nothing: send().get()
			// is already synchronous).
			RecordMetadata metadata = this.kafkaProducer.send(rec).get();
			this.logger.trace(() -> "sent " + metadata);
		}
		catch (InterruptedException e) {
			Thread.currentThread().interrupt(); // restore the interrupt flag before signalling
			onError(e);
		}
		catch (ExecutionException | IllegalAccessException e) {
			onError(e);
		}
		super.hookOnNext(t);
	}

	@Override
	public void hookOnError(Throwable t) {
		this.logger.error(t, "onError");
		if (this.transactional) {
			this.kafkaProducer.abortTransaction();
		}
		super.hookOnError(t);
	}

	@Override
	public void hookOnComplete() {
		if (this.transactional) {
			this.kafkaProducer.commitTransaction();
		}
		super.hookOnComplete();
	}

	@Override
	public void afterPropertiesSet() throws Exception {
		// Nothing to initialize: the producer is fully built in the constructor.
	}

	@Override
	public void destroy() throws Exception {
		// Release the producer's network resources on bean shutdown.
		this.kafkaProducer.close();
	}

}
+ */ + public DefaultJackson2JavaKeyTypeMapper() { + super(); + } + + @Override + public String getClassIdFieldName() { + // DEFAULT_KEY_CLASSID_FIELD_NAME + return "__Key_TypeId__"; + } + +} diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/serializer/JsonKeyDeserializer.java b/spring-kafka/src/main/java/org/springframework/kafka/support/serializer/JsonKeyDeserializer.java new file mode 100644 index 0000000000..28f766d28b --- /dev/null +++ b/spring-kafka/src/main/java/org/springframework/kafka/support/serializer/JsonKeyDeserializer.java @@ -0,0 +1,67 @@ +/* + * Copyright 2015-2025 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.kafka.support.serializer; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.jspecify.annotations.Nullable; + +import org.springframework.kafka.support.mapping.DefaultJackson2JavaKeyTypeMapper; +import org.springframework.kafka.support.mapping.Jackson2JavaTypeMapper; + +/** + * Same as JsonDeserializer only the classIdFieldName is other because using + * DefaultJackson2JavaKeyTypeMapper. 
+ * + * @param class of the entity, representing messages + * + * @author Popovics Boglarka + */ +public class JsonKeyDeserializer extends JsonDeserializer { + + @Override + public Jackson2JavaTypeMapper getTypeMapper() { + return new DefaultJackson2JavaKeyTypeMapper(); + } + + /** + * Construct an instance with a default {@link ObjectMapper}. + */ + public JsonKeyDeserializer() { + super((Class) null, true); + } + + /** + * Construct an instance with the provided {@link ObjectMapper}. + * + * @param objectMapper a custom object mapper. + */ + public JsonKeyDeserializer(ObjectMapper objectMapper) { + super((Class) null, objectMapper, true); + setTypeMapper(new DefaultJackson2JavaKeyTypeMapper()); + } + + /** + * Construct an instance with the provided target type. + * + * @param targetType the target type to use if no type info headers are present. + */ + public JsonKeyDeserializer(@Nullable Class targetType) { + super(targetType); + setTypeMapper(new DefaultJackson2JavaKeyTypeMapper()); + } + +} diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/serializer/JsonKeySerializer.java b/spring-kafka/src/main/java/org/springframework/kafka/support/serializer/JsonKeySerializer.java new file mode 100644 index 0000000000..1a5b55e793 --- /dev/null +++ b/spring-kafka/src/main/java/org/springframework/kafka/support/serializer/JsonKeySerializer.java @@ -0,0 +1,44 @@ +/* + * Copyright 2016-2025 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.kafka.support.serializer; + +import org.springframework.kafka.support.mapping.DefaultJackson2JavaKeyTypeMapper; +import org.springframework.kafka.support.mapping.Jackson2JavaTypeMapper; + +/** + * Same as JsonSerializer only using DefaultJackson2JavaKeyTypeMapper. + * + * @param class of the entity, representing messages + * + * @author Popovics Boglarka + */ +public class JsonKeySerializer extends JsonSerializer { + + @Override + public Jackson2JavaTypeMapper getTypeMapper() { + return new DefaultJackson2JavaKeyTypeMapper(); + } + + /** + * Default constructor, use this. + */ + public JsonKeySerializer() { + super(); + setTypeMapper(new DefaultJackson2JavaKeyTypeMapper()); + } + +} diff --git a/spring-kafka/src/test/java/org/springframework/kafka/entity/City.java b/spring-kafka/src/test/java/org/springframework/kafka/entity/City.java new file mode 100644 index 0000000000..bf107e4cc5 --- /dev/null +++ b/spring-kafka/src/test/java/org/springframework/kafka/entity/City.java @@ -0,0 +1,29 @@ +/* + * Copyright 2016-2025 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.springframework.kafka.entity; + +/** + * @author Popovics Boglarka + */ +@KafkaEntity +public record City(@KafkaEntityKey String name) { + + public String name() { + return name; + } + +} diff --git a/spring-kafka/src/test/java/org/springframework/kafka/entity/CityGroup.java b/spring-kafka/src/test/java/org/springframework/kafka/entity/CityGroup.java new file mode 100644 index 0000000000..c73c0359ee --- /dev/null +++ b/spring-kafka/src/test/java/org/springframework/kafka/entity/CityGroup.java @@ -0,0 +1,82 @@ +/* + * Copyright 2016-2025 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
package org.springframework.kafka.entity;

import java.util.Objects;

/**
 * Test entity: a group of cities keyed by a {@link ComplexKey}; equality and
 * hash code are based on both fields.
 *
 * @author Popovics Boglarka
 */
@KafkaEntity
public class CityGroup {

	@KafkaEntityKey
	private ComplexKey complexKey;

	private String country;

	public CityGroup() {
	}

	public CityGroup(ComplexKey complexKey, String country) {
		this.complexKey = complexKey;
		this.country = country;
	}

	public ComplexKey getComplexKey() {
		return this.complexKey;
	}

	public void setComplexKey(ComplexKey complexKey) {
		this.complexKey = complexKey;
	}

	public String getCountry() {
		return this.country;
	}

	public void setCountry(String country) {
		this.country = country;
	}

	@Override
	public int hashCode() {
		return Objects.hash(this.complexKey, this.country);
	}

	@Override
	public boolean equals(Object other) {
		if (other == this) {
			return true;
		}
		if (other == null || other.getClass() != getClass()) {
			return false;
		}
		CityGroup that = (CityGroup) other;
		return Objects.equals(this.complexKey, that.complexKey)
				&& Objects.equals(this.country, that.country);
	}

	@Override
	public String toString() {
		return "CityGroup [complexKey=" + this.complexKey + ", country=" + this.country + "]";
	}

}
+ * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.kafka.entity; + +import java.util.Objects; + +/** + * @author Popovics Boglarka + */ +public class ComplexKey { + + private long id; + + private String name; + + public ComplexKey() { + super(); + } + + public ComplexKey(long id, String name) { + super(); + this.id = id; + this.name = name; + } + + public long getId() { + return id; + } + + public void setId(long id) { + this.id = id; + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + @Override + public int hashCode() { + return Objects.hash(id, name); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null) { + return false; + } + if (getClass() != obj.getClass()) { + return false; + } + ComplexKey other = (ComplexKey) obj; + return id == other.id && Objects.equals(name, other.name); + } + + @Override + public String toString() { + return "ComplexKey [id=" + id + ", name=" + name + "]"; + } + +} diff --git a/spring-kafka/src/test/java/org/springframework/kafka/entity/ComplexKeyRecord.java b/spring-kafka/src/test/java/org/springframework/kafka/entity/ComplexKeyRecord.java new file mode 100644 index 0000000000..3913d87a56 --- /dev/null +++ b/spring-kafka/src/test/java/org/springframework/kafka/entity/ComplexKeyRecord.java @@ -0,0 +1,27 @@ +/* + * Copyright 2016-2025 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
package org.springframework.kafka.entity;

import java.time.LocalDateTime;
import java.util.UUID;

/**
 * Composite test key as a record: a UUID plus a creation timestamp. Used as the
 * {@code @KafkaEntityKey} of {@code PlaceVersion}; record-generated equality
 * covers both components.
 *
 * @param uuid unique identifier component of the key
 * @param created creation timestamp component of the key
 *
 * @author Popovics Boglarka
 */
public record ComplexKeyRecord(UUID uuid, LocalDateTime created) {

}
+ */ + +package org.springframework.kafka.entity; + +/** + * @author Popovics Boglarka + */ +@KafkaEntity +public record Place(@KafkaEntityKey String id) { + + public String id() { + return id; + } +} diff --git a/spring-kafka/src/test/java/org/springframework/kafka/entity/PlaceVersion.java b/spring-kafka/src/test/java/org/springframework/kafka/entity/PlaceVersion.java new file mode 100644 index 0000000000..0d4a5c494f --- /dev/null +++ b/spring-kafka/src/test/java/org/springframework/kafka/entity/PlaceVersion.java @@ -0,0 +1,84 @@ +/* + * Copyright 2016-2025 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
package org.springframework.kafka.entity;

import java.util.Objects;

/**
 * Test entity keyed by a record-based composite key ({@link ComplexKeyRecord});
 * equality and hash code are based on both fields.
 *
 * @author Popovics Boglarka
 */
@KafkaEntity
public class PlaceVersion {

	@KafkaEntityKey
	private ComplexKeyRecord complexKeyRecord;

	private String placeName;

	public PlaceVersion() {
	}

	public PlaceVersion(ComplexKeyRecord complexKeyRecord, String placeName) {
		this.complexKeyRecord = complexKeyRecord;
		this.placeName = placeName;
	}

	public ComplexKeyRecord getComplexKeyRecord() {
		return this.complexKeyRecord;
	}

	public void setComplexKeyRecord(ComplexKeyRecord complexKeyRecord) {
		this.complexKeyRecord = complexKeyRecord;
	}

	public String getPlaceName() {
		return this.placeName;
	}

	public void setPlaceName(String placeName) {
		this.placeName = placeName;
	}

	@Override
	public int hashCode() {
		return Objects.hash(this.complexKeyRecord, this.placeName);
	}

	@Override
	public boolean equals(Object other) {
		if (other == this) {
			return true;
		}
		if (other == null || other.getClass() != getClass()) {
			return false;
		}
		PlaceVersion that = (PlaceVersion) other;
		return Objects.equals(this.complexKeyRecord, that.complexKeyRecord)
				&& Objects.equals(this.placeName, that.placeName);
	}

	@Override
	public String toString() {
		return "PlaceVersion [complexKeyRecord=" + this.complexKeyRecord + ", placeName=" + this.placeName + "]";
	}

}
package org.springframework.kafka.entity;

import java.util.Objects;

/**
 * Test entity mapped to the custom topic {@code PRODUCT}; {@code id} is the
 * Kafka key. Equality and hash code are key-based: only {@code id} is compared,
 * {@code title} and {@code description} are ignored.
 *
 * @author Popovics Boglarka
 */
@KafkaEntity(customTopicName = "PRODUCT")
public class Product {

	@KafkaEntityKey
	private String id;

	private String title;

	private String description;

	public String getId() {
		return this.id;
	}

	public void setId(String id) {
		this.id = id;
	}

	public String getTitle() {
		return this.title;
	}

	public void setTitle(String title) {
		this.title = title;
	}

	public String getDescription() {
		return this.description;
	}

	public void setDescription(String description) {
		this.description = description;
	}

	@Override
	public boolean equals(Object other) {
		if (other == this) {
			return true;
		}
		if (other == null || other.getClass() != getClass()) {
			return false;
		}
		return Objects.equals(this.id, ((Product) other).id);
	}

	@Override
	public int hashCode() {
		return Objects.hash(this.id);
	}

	@Override
	public String toString() {
		return "Product [id=" + this.id + ", title=" + this.title + ", description=" + this.description + "]";
	}

}
package org.springframework.kafka.entity;

/**
 * Test entity: a student record keyed by {@code studentid}; {@code age} is the
 * message payload.
 *
 * @param studentid the student identifier, used as the Kafka key
 * @param age the student's age
 *
 * @author Popovics Boglarka
 */
@KafkaEntity
public record Student(@KafkaEntityKey String studentid, int age) {

}
package org.springframework.kafka.entity;

/**
 * Test entity deliberately WITHOUT a {@code @KafkaEntityKey} component: the
 * tests use it to verify that configuration rejects a {@code @KafkaEntity}
 * whose key is missing ("@KafkaEntityKey is mandatory in @KafkaEntity" — see
 * {@code EnableKafkaEntityProcessorTest#test_sendUser}).
 *
 * @param userid the user identifier (intentionally not annotated as key)
 *
 * @author Popovics Boglarka
 */
@KafkaEntity
public record User(String userid) {

}
package org.springframework.kafka.entity.reactive;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.reactivestreams.Processor;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.log.LogAccessor;
import org.springframework.kafka.annotation.EnableKafkaEntity;
import org.springframework.kafka.entity.KafkaEntityException;
import org.springframework.kafka.entity.Product;
import org.springframework.kafka.entity.Student;
import org.springframework.kafka.entity.User;
import org.springframework.kafka.entity.reactive.EnableKafkaEntityProcessorTest.SpringKafkaEntityProcessorTestConfiguration;
import org.springframework.kafka.test.condition.LogLevels;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.stereotype.Service;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;

/**
 * Integration tests for {@code @KafkaEntityProcessor}-injected processor fields
 * against an embedded Kafka broker: a valid keyed entity (Product), an entity
 * missing its {@code @KafkaEntityKey} (User, expected to fail), and a
 * non-transactional processor (Student).
 *
 * @author Popovics Boglarka
 */
@SpringJUnitConfig(SpringKafkaEntityProcessorTestConfiguration.class)
@DirtiesContext
@EmbeddedKafka(topics = {
		EnableKafkaEntityProcessorTest.TOPIC_CITY, EnableKafkaEntityProcessorTest.TOPIC_PLACE,
		EnableKafkaEntityProcessorTest.TOPIC_PRODUCT, EnableKafkaEntityProcessorTest.TOPIC_USER,
		EnableKafkaEntityProcessorTest.TOPIC_STUDENT }, partitions = 1, brokerProperties = {
				"offsets.topic.replication.factor=1", "offset.storage.replication.factor=1",
				"transaction.state.log.replication.factor=1", "transaction.state.log.min.isr=1" })
public class EnableKafkaEntityProcessorTest {

	private final LogAccessor logger = new LogAccessor(LogFactory.getLog(getClass()));

	@Autowired
	private OtherKafkaEntityProcessorService processorService;

	@Autowired
	private KafkaEntityDefaultConfiguration kafkaEntityDefaultConfiguration;

	// Topic names: Product uses a custom topic name; the others default to the FQCN.
	public static final String TOPIC_PRODUCT = "PRODUCT";

	public static final String TOPIC_USER = "org.springframework.kafka.entity.User";

	public static final String TOPIC_STUDENT = "org.springframework.kafka.entity.Student";

	public static final String TOPIC_CITY = "org.springframework.kafka.entity.City";

	public static final String TOPIC_PLACE = "org.springframework.kafka.entity.Place";

	/** Happy path: a keyed Product round-trips through the processor and yields RecordMetadata. */
	@LogLevels(categories = { "org.springframework.kafka.entity", "reactor.core.publisher" }, level = "TRACE")
	@Test
	public void test_sendEvent() throws KafkaEntityException, InterruptedException, ExecutionException {

		Product event = new Product();
		event.setId("123456");
		RecordMetadata sendEventMetadata = processorService.sendEvent(event);
		Assertions.assertNotNull(sendEventMetadata);
		logger.info("sendEventMetadata: " + sendEventMetadata.offset());

	}

	/**
	 * User has no @KafkaEntityKey, so its processor is never created (field stays
	 * null -> NPE on use) and the configuration records a KafkaEntityException.
	 */
	@LogLevels(categories = { "org.springframework.kafka.entity", "reactor.core.publisher" }, level = "TRACE")
	@Test
	public void test_sendUser() throws KafkaEntityException, InterruptedException, ExecutionException {

		User event = new User("abcdef");
		Assertions.assertThrows(NullPointerException.class, () -> processorService.sendUser(event));

		KafkaEntityException ex = Assertions.assertThrows(KafkaEntityException.class,
				() -> kafkaEntityDefaultConfiguration.throwFirstError());
		Assertions.assertEquals(
				"org.springframework.kafka.entity.reactive.EnableKafkaEntityProcessorTest$OtherKafkaEntityProcessorService#userProcessor: org.springframework.kafka.entity.User @KafkaEntityKey is mandatory in @KafkaEntity",
				ex.getMessage());
	}

	/** Non-transactional processor path (transactional = false on the annotation). */
	@LogLevels(categories = { "org.springframework.kafka.entity", "reactor.core.publisher" }, level = "TRACE")
	@Test
	public void test_sendStudent() throws KafkaEntityException, InterruptedException, ExecutionException {

		Student event = new Student("hrs123", 23);
		RecordMetadata sendStudentMetadata = processorService.sendStudent(event);
		Assertions.assertNotNull(sendStudentMetadata);
		logger.info("sendStudentMetadata: " + sendStudentMetadata.offset());

	}

	/**
	 * Service whose processor fields are populated by the @EnableKafkaEntity
	 * infrastructure. Each send method publishes one entity through its processor
	 * and blocks (up to 30s) for the resulting RecordMetadata.
	 */
	@Service
	public static class OtherKafkaEntityProcessorService {

		private final LogAccessor logger = new LogAccessor(LogFactory.getLog(getClass()));

		// NOTE(review): the type arguments below were reconstructed from usage
		// (hookOnNext receives RecordMetadata) — confirm against the original source.
		@KafkaEntityProcessor
		private Processor<Product, RecordMetadata> productProcessor;

		@KafkaEntityProcessor
		private Processor<User, RecordMetadata> userProcessor;

		@KafkaEntityProcessor(transactional = false)
		private KafkaProcessor<Student, RecordMetadata> studentProcessor;

		public RecordMetadata sendEvent(Product p) throws InterruptedException {
			CountDownLatch countDownLatch = new CountDownLatch(1);
			List<RecordMetadata> ret = new ArrayList<>();
			productProcessor.subscribe(new BaseSubscriber<>() {
				@Override
				protected void hookOnNext(RecordMetadata r) {
					logger.trace("received: " + r);
					ret.add(r);
					countDownLatch.countDown();
				}
			});
			Mono.just(p).subscribe(productProcessor);
			logger.warn("waiting 30_000");
			// NOTE(review): the await() result is ignored — on timeout ret is empty
			// and ret.get(0) throws IndexOutOfBoundsException rather than failing clearly.
			countDownLatch.await(30, TimeUnit.SECONDS);
			return ret.get(0);
		}

		public RecordMetadata sendUser(User u) throws InterruptedException {
			CountDownLatch countDownLatch = new CountDownLatch(1);
			List<RecordMetadata> ret = new ArrayList<>();
			// userProcessor is expected to be null here (User has no key), so this
			// throws NullPointerException — asserted by test_sendUser.
			userProcessor.subscribe(new BaseSubscriber<>() {
				@Override
				protected void hookOnNext(RecordMetadata r) {
					logger.trace("received: " + r);
					ret.add(r);
					countDownLatch.countDown();
				}
			});
			Mono.just(u).subscribe(userProcessor);
			logger.warn("waiting 30_000");
			countDownLatch.await(30, TimeUnit.SECONDS);
			return ret.get(0);
		}

		public RecordMetadata sendStudent(Student s) throws InterruptedException {
			CountDownLatch countDownLatch = new CountDownLatch(1);
			List<RecordMetadata> ret = new ArrayList<>();
			studentProcessor.subscribe(new BaseSubscriber<>() {
				@Override
				protected void hookOnNext(RecordMetadata r) {
					logger.trace("received: " + r);
					ret.add(r);
					countDownLatch.countDown();
				}
			});
			Flux.fromIterable(Arrays.asList(s)).subscribe(studentProcessor);
			logger.warn("waiting 30_000");
			// NOTE(review): same ignored-timeout pattern as sendEvent above.
			countDownLatch.await(30, TimeUnit.SECONDS);
			return ret.get(0);
		}

	}

	/** Spring context: scans this test's packages and enables KafkaEntity processing. */
	@Configuration
	@ComponentScan(basePackageClasses = { SpringKafkaEntityProcessorTestConfiguration.class,
			OtherKafkaEntityProcessorService.class })
	@EnableKafkaEntity
	public static class SpringKafkaEntityProcessorTestConfiguration {

	}
}
+ */ + +package org.springframework.kafka.entity.reactive; + +import java.time.LocalDateTime; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; + +import org.apache.commons.logging.LogFactory; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.jspecify.annotations.NonNull; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.reactivestreams.Publisher; +import reactor.core.Disposable; +import reactor.core.publisher.BaseSubscriber; +import reactor.core.publisher.Flux; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.ComponentScan; +import org.springframework.context.annotation.Configuration; +import org.springframework.core.log.LogAccessor; +import org.springframework.kafka.annotation.EnableKafkaEntity; +import org.springframework.kafka.core.DefaultKafkaProducerFactory; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.core.ProducerFactory; +import org.springframework.kafka.entity.ComplexKeyRecord; +import org.springframework.kafka.entity.Place; +import org.springframework.kafka.entity.PlaceVersion; +import org.springframework.kafka.entity.reactive.EnableKafkaEntityPublisherTest.KafkaProducerConfig; +import org.springframework.kafka.support.SendResult; +import org.springframework.kafka.support.serializer.JsonKeySerializer; +import org.springframework.kafka.support.serializer.JsonSerializer; +import org.springframework.kafka.test.EmbeddedKafkaBroker; +import org.springframework.kafka.test.condition.LogLevels; +import org.springframework.kafka.test.context.EmbeddedKafka; +import org.springframework.kafka.test.utils.KafkaTestUtils; 
+import org.springframework.stereotype.Service; +import org.springframework.test.annotation.DirtiesContext; +import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; + +/** + * @author Popovics Boglarka + */ +@SpringJUnitConfig(KafkaProducerConfig.class) +@DirtiesContext +@EmbeddedKafka(topics = { EnableKafkaEntityPublisherTest.TOPIC_PLACE, + EnableKafkaEntityPublisherTest.TOPIC_PLACEVERSION }, partitions = 1, brokerProperties = { + "offsets.topic.replication.factor=1", "offset.storage.replication.factor=1", + "transaction.state.log.replication.factor=1", "transaction.state.log.min.isr=1" }) +public class EnableKafkaEntityPublisherTest { + + private final LogAccessor logger = new LogAccessor(LogFactory.getLog(getClass())); + + @Autowired + private ExampleKafkaEntityPublisherService placePublisherService; + + @Autowired + private ExampleKafkaEntityPlaceVersionPublisherService placeVersionPublisherService; + + @Autowired + private KafkaTemplate kafkaProducer; + + @Autowired + private KafkaTemplate kafkaPlaceVersionProducer; + + public static final String TOPIC_PLACE = "org.springframework.kafka.entity.Place"; + + public static final String TOPIC_PLACEVERSION = "org.springframework.kafka.entity.PlaceVersion"; + + @LogLevels(categories = { "org.springframework.kafka.entity", "reactor.core.publisher" }, level = "TRACE") + @Test + public void test_sendEvent_PlaceVersion() throws Exception { + + List eventList = new ArrayList<>(); + + List sending = List.of( + new PlaceVersion(new ComplexKeyRecord(UUID.randomUUID(), LocalDateTime.now()), "absolute now"), + new PlaceVersion(new ComplexKeyRecord(UUID.fromString("ee3968cc-3d74-44c0-8645-31e3546c963f"), + LocalDateTime.of(2025, 04, 21, 12, 24)), "fix date and uuid")); + + CountDownLatch placeVersionReceivedCounter = new CountDownLatch(sending.size()); + Publisher placePublisher = placeVersionPublisherService.getPlaceVersionPublisher(); + Assertions.assertNotNull(placePublisher); + @NonNull + BaseSubscriber 
connect = new BaseSubscriber<>() { + @Override + protected void hookOnNext(PlaceVersion r) { + logger.info("received: " + r); + eventList.add(r); + + placeVersionReceivedCounter.countDown(); + } + }; + placePublisher.subscribe(connect); + + CountDownLatch placeVersionSentCounter = new CountDownLatch(sending.size()); + publishPlaceVersionMessages(placeVersionSentCounter, 0, sending); + + logger.warn("waiting maximum 60_000"); + placeVersionSentCounter.await(60, TimeUnit.SECONDS); + placeVersionReceivedCounter.await(60, TimeUnit.SECONDS); + + logger.warn("asserting"); + Assertions.assertEquals(sending, eventList); + + } + + @LogLevels(categories = { "org.springframework.kafka.entity", "reactor.core.publisher" }, level = "TRACE") + @Test + public void test_sendEvent_Place() throws Exception { + + List eventList = new ArrayList<>(); + List eventListOther = new ArrayList<>(); + List eventListBefore = new ArrayList<>(); + List eventListThird = new ArrayList<>(); + + int sendingFirstCount = 2; + int sendingSecondCount = 3; + + CountDownLatch beforeCounter = new CountDownLatch(sendingFirstCount + sendingSecondCount); + Publisher placePublisherBefore = placePublisherService.getPlacePublisherBefore(); + Assertions.assertNotNull(placePublisherBefore); + @NonNull + BaseSubscriber connectBefore = new BaseSubscriber<>() { + @Override + protected void hookOnNext(Place r) { + logger.info("received-Before: " + r); + eventListBefore.add(r); + + beforeCounter.countDown(); + } + }; + placePublisherBefore.subscribe(connectBefore); + + CountDownLatch receivedCounter = new CountDownLatch(sendingFirstCount); + Flux placePublisher = placePublisherService.getPlacePublisher(); + + Disposable connect1 = placePublisher.subscribe(r -> { + logger.info("received: " + r); + eventList.add(r); + receivedCounter.countDown(); + }); + + logger.warn("waiting 30_0000"); + Thread.sleep(30_000); + + CountDownLatch sentCounter = new CountDownLatch(sendingFirstCount); + publishMessages(sentCounter, 100, 
sendingFirstCount); + logger.warn("sentCounter.await() maximum 40_000"); + sentCounter.await(40, TimeUnit.SECONDS); + + logger.warn("waiting maximum 40_000"); + receivedCounter.await(40, TimeUnit.SECONDS); + connect1.dispose(); + + Publisher placePublisherOther = placePublisherService.getPlacePublisherOther(); + @NonNull + BaseSubscriber connect2 = new BaseSubscriber<>() { + @Override + protected void hookOnNext(Place r) { + logger.info("received-other: " + r); + eventListOther.add(r); + } + }; + placePublisherOther.subscribe(connect2); + + logger.warn("waiting 30_000"); + Thread.sleep(30_000); + connect2.dispose(); + + CountDownLatch thirdCounter = new CountDownLatch(sendingSecondCount); + Publisher placePublisherThird = placePublisherService.getPlacePublisherThird(); + @NonNull + BaseSubscriber connectThird = new BaseSubscriber<>() { + @Override + protected void hookOnNext(Place r) { + logger.info("received-Third: " + r); + eventListThird.add(r); + thirdCounter.countDown(); + } + }; + placePublisherThird.subscribe(connectThird); + + CountDownLatch sentCounterSecond = new CountDownLatch(sendingSecondCount); + // send events + publishMessages(sentCounterSecond, 200, sendingSecondCount); + logger.warn("sentCounterSecond.await() max 30_000"); + sentCounterSecond.await(30, TimeUnit.SECONDS); + + logger.warn("waiting maximum 90_000"); + thirdCounter.await(90, TimeUnit.SECONDS); + + logger.warn("waiting maximum 120_000"); + beforeCounter.await(120, TimeUnit.SECONDS); + + logger.info("eventList : " + eventList); + logger.info("eventListOther : " + eventListOther); + logger.info("eventListBefore: " + eventListBefore); + logger.info("eventListThird : " + eventListThird); + Assertions.assertEquals(sendingFirstCount, eventList.size(), "eventList"); + Assertions.assertEquals(0, eventListOther.size(), "eventListOther"); + Assertions.assertEquals(sendingFirstCount + sendingSecondCount, eventListBefore.size(), "eventListBefore"); + Assertions.assertEquals(sendingSecondCount, 
eventListThird.size(), "eventListThird"); + + connectThird.dispose(); + connectBefore.dispose(); + } + + void publishMessages(CountDownLatch sentCounter, int i, int count) throws Exception { + + logger.warn("publishMessages " + i + " " + count); + + for (int placeInt = 0; placeInt < count; placeInt++) { + + Place p2 = new Place("p" + (i++) + "id"); + ProducerRecord p2Record = new ProducerRecord<>(TOPIC_PLACE, p2.id(), p2); + sendPlace(sentCounter, p2Record); + } + } + + void publishPlaceVersionMessages(CountDownLatch sentCounter, int i, List placeVersionList) + throws Exception { + + logger.warn("publishPlaceVersionMessages " + i + " " + placeVersionList.size()); + + for (PlaceVersion pV : placeVersionList) { + + ProducerRecord p2Record = new ProducerRecord<>(TOPIC_PLACEVERSION, + pV.getComplexKeyRecord(), pV); + sendPlaceVersion(sentCounter, p2Record); + } + } + + private void sendPlace(CountDownLatch sentCounter, ProducerRecord record) + throws InterruptedException, ExecutionException { + CompletableFuture> sendingIntoTheFuture = kafkaProducer.send(record); + + sendingIntoTheFuture.get(); + while (!sendingIntoTheFuture.isDone()) { + logger.info("waiting"); + } + sentCounter.countDown(); + } + + private void sendPlaceVersion(CountDownLatch sentCounter, ProducerRecord record) + throws InterruptedException, ExecutionException { + CompletableFuture> sendingIntoTheFuture = kafkaPlaceVersionProducer + .send(record); + + sendingIntoTheFuture.get(); + while (!sendingIntoTheFuture.isDone()) { + logger.info("waiting"); + } + sentCounter.countDown(); + } + + @Service + public static class ExampleKafkaEntityPublisherService { + + @KafkaEntityPublisher + private Flux placePublisher; + + @KafkaEntityPublisher + private Publisher placePublisherOther; + + @KafkaEntityPublisher + private Publisher placePublisherThird; + + @KafkaEntityPublisher + private Publisher placePublisherBefore; + + public Flux getPlacePublisher() { + return placePublisher.log(); + } + + public Publisher 
getPlacePublisherOther() { + return placePublisherOther; + } + + public Publisher getPlacePublisherThird() { + return placePublisherThird; + } + + public Publisher getPlacePublisherBefore() { + return placePublisherBefore; + } + + } + + @Service + public static class ExampleKafkaEntityPlaceVersionPublisherService { + + @KafkaEntityPublisher + private Publisher placeVersionPublisher; + + public Publisher getPlaceVersionPublisher() { + return placeVersionPublisher; + } + + } + + @Configuration + @ComponentScan(basePackageClasses = { ExampleKafkaEntityPublisherService.class, + ExampleKafkaEntityPlaceVersionPublisherService.class }) + @EnableKafkaEntity + public static class KafkaProducerConfig { + + @Autowired + private EmbeddedKafkaBroker embeddedKafka; + + @Bean + public ProducerFactory producerFactory() { + + Map config = KafkaTestUtils.producerProps(embeddedKafka); + + return new DefaultKafkaProducerFactory<>(config, new JsonKeySerializer(), + new JsonSerializer()); + } + + @Bean + public KafkaTemplate kafkaProducer() { + return new KafkaTemplate<>(producerFactory()); + } + + @Bean + public ProducerFactory placeVersionProducerFactory() { + Map config = KafkaTestUtils.producerProps(embeddedKafka); + + return new DefaultKafkaProducerFactory<>(config, new JsonKeySerializer(), + new JsonSerializer()); + } + + @Bean + public KafkaTemplate kafkaPlaceVerionProducer() { + return new KafkaTemplate<>(placeVersionProducerFactory()); + } + } + +} diff --git a/spring-kafka/src/test/java/org/springframework/kafka/entity/reactive/EnableKafkaEntitySubscriberTest.java b/spring-kafka/src/test/java/org/springframework/kafka/entity/reactive/EnableKafkaEntitySubscriberTest.java new file mode 100644 index 0000000000..97e84ec3e5 --- /dev/null +++ b/spring-kafka/src/test/java/org/springframework/kafka/entity/reactive/EnableKafkaEntitySubscriberTest.java @@ -0,0 +1,216 @@ +/* + * Copyright 2016-2025 the original author or authors. 
+ * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.kafka.entity.reactive; + +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.commons.logging.LogFactory; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.reactivestreams.Subscriber; +import reactor.core.publisher.Flux; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.ComponentScan; +import org.springframework.context.annotation.Configuration; +import org.springframework.core.log.LogAccessor; +import org.springframework.kafka.annotation.EnableKafkaEntity; +import org.springframework.kafka.entity.City; +import org.springframework.kafka.entity.CityGroup; +import org.springframework.kafka.entity.ComplexKey; +import org.springframework.kafka.entity.reactive.EnableKafkaEntitySubscriberTest.SpringKafkaEntitySubscriberTestConfiguration; +import org.springframework.kafka.test.EmbeddedKafkaBroker; +import org.springframework.kafka.test.condition.LogLevels; +import org.springframework.kafka.test.context.EmbeddedKafka; +import org.springframework.stereotype.Service; +import org.springframework.test.annotation.DirtiesContext; +import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; + +/** + * @author Popovics Boglarka + */ 
+@SpringJUnitConfig(SpringKafkaEntitySubscriberTestConfiguration.class) +@DirtiesContext +@EmbeddedKafka(topics = {EnableKafkaEntitySubscriberTest.TOPIC_CITY, EnableKafkaEntitySubscriberTest.TOPIC_CITYGROUP}, partitions = 1, brokerProperties = { + "offsets.topic.replication.factor=1", + "offset.storage.replication.factor=1", "transaction.state.log.replication.factor=1", + "transaction.state.log.min.isr=1" }) +public class EnableKafkaEntitySubscriberTest { + + private final LogAccessor logger = new LogAccessor(LogFactory.getLog(getClass())); + + @Autowired + private ExampleKafkaEntitySubscriberService subscriber; + + @Autowired + private ExampleKafkaEntitySubscriberNoTransactionService subscriberNoTr; + + @Autowired + private ExampleKafkaEntitySubscriberCityGroupService subscriberCityGroup; + + @Autowired + private EmbeddedKafkaBroker embeddedKafka; + + public static final String TOPIC_CITY = "org.springframework.kafka.entity.City"; + + public static final String TOPIC_CITYGROUP = "org.springframework.kafka.entity.CityGroup"; + + @LogLevels(categories = { "org.springframework.kafka.entity", "reactor.core.publisher" }, level = "TRACE") + @Test + public void test_sendCity() throws Exception { + + List eventList = subscriber.getInput(); + Subscriber productSubscriber = subscriber.getCitySubscriber(); + + Assertions.assertNotNull(productSubscriber); + + CountDownLatch consumerInitialized = new CountDownLatch(1); + CountDownLatch consumerFinished = new CountDownLatch(1); + + AtomicInteger sum = new AtomicInteger(); + new Thread(new TestConsumerRunnable("test_sendCity()", eventList.size(), TOPIC_CITY, embeddedKafka, + consumerInitialized, consumerFinished, sum, City.class, String.class)).start(); + + logger.warn("waiting maximum 200_000"); + consumerInitialized.await(200, TimeUnit.SECONDS); + + Flux.fromIterable(eventList).log().subscribe(productSubscriber); + + logger.warn("waiting maximum 200_000"); + consumerFinished.await(200, TimeUnit.SECONDS); + 
Assertions.assertEquals(eventList.size(), sum.get()); + + } + + @LogLevels(categories = { "org.springframework.kafka.entity", "reactor.core.publisher" }, level = "TRACE") + @Test + public void test_sendCity_no_transaction() throws Exception { + + List eventList = subscriberNoTr.getInput(); + + Subscriber productSubscriber = subscriberNoTr.getCitySubscriber(); + + Assertions.assertNotNull(productSubscriber); + + CountDownLatch consumerInitialized = new CountDownLatch(1); + CountDownLatch consumerFinished = new CountDownLatch(1); + + AtomicInteger sum = new AtomicInteger(); + new Thread(new TestConsumerRunnable("test_sendCity_no_transaction()", eventList.size(), TOPIC_CITY, embeddedKafka, + consumerInitialized, consumerFinished, sum, City.class, String.class)).start(); + + logger.warn("waiting maximum 200_000"); + consumerInitialized.await(200, TimeUnit.SECONDS); + + Flux.fromIterable(eventList).log().subscribe(productSubscriber); + + logger.warn("waiting maximum 200_000"); + consumerFinished.await(200, TimeUnit.SECONDS); + Assertions.assertEquals(eventList.size(), sum.get()); + } + + @LogLevels(categories = { "org.springframework.kafka.entity", "reactor.core.publisher" }, level = "TRACE") + @Test + public void test_sendCityGroup_with_ComplexKey() throws Exception { + + List eventList = subscriberCityGroup.getInput(); + Subscriber productSubscriber = subscriberCityGroup.getCityGroupSubscriber(); + + Assertions.assertNotNull(productSubscriber); + + CountDownLatch consumerInitialized = new CountDownLatch(1); + CountDownLatch consumerFinished = new CountDownLatch(1); + + AtomicInteger sum = new AtomicInteger(); + new Thread(new TestConsumerRunnable("test_sendCityGroup_with_ComplexKey()", eventList.size(), TOPIC_CITYGROUP, embeddedKafka, + consumerInitialized, consumerFinished, sum, CityGroup.class, ComplexKey.class)).start(); + + logger.warn("waiting maximum 200_000"); + consumerInitialized.await(200, TimeUnit.SECONDS); + + 
Flux.fromIterable(eventList).log().subscribe(productSubscriber); + + logger.warn("waiting maximum 200_000"); + consumerFinished.await(200, TimeUnit.SECONDS); + Assertions.assertEquals(eventList.size(), sum.get()); + } + + @Service + public static class ExampleKafkaEntitySubscriberNoTransactionService { + + @KafkaEntitySubscriber(transactional = false) + private Subscriber citySubscriber; + + private List input = List.of(new City("Debrecen"), new City("Linz"), new City("Szeged")); + + public Subscriber getCitySubscriber() { + return citySubscriber; + } + + public List getInput() { + return input; + } + + } + + @Service + public static class ExampleKafkaEntitySubscriberService { + + @KafkaEntitySubscriber + private Subscriber citySubscriber; + + private List input = List.of(new City("Budapest"), new City("Wien")); + + public Subscriber getCitySubscriber() { + return citySubscriber; + } + + public List getInput() { + return input; + } + + } + + @Service + public static class ExampleKafkaEntitySubscriberCityGroupService { + @KafkaEntitySubscriber + private Subscriber cityGroupSubscriber; + + private List input = List.of(new CityGroup(new ComplexKey(111, "group1"), "Hungary"), new CityGroup(new ComplexKey(6, "aus"), "Austria"), new CityGroup(new ComplexKey(40, "40"), "Germany")); + + public Subscriber getCityGroupSubscriber() { + return cityGroupSubscriber; + } + + public List getInput() { + return input; + } + + } + + @Configuration + @ComponentScan(basePackageClasses = { ExampleKafkaEntitySubscriberNoTransactionService.class, + ExampleKafkaEntitySubscriberService.class, ExampleKafkaEntitySubscriberCityGroupService.class }) + @EnableKafkaEntity + public static class SpringKafkaEntitySubscriberTestConfiguration { + + } + +} diff --git a/spring-kafka/src/test/java/org/springframework/kafka/entity/reactive/SimpleKafkaEntityProcessorTest.java b/spring-kafka/src/test/java/org/springframework/kafka/entity/reactive/SimpleKafkaEntityProcessorTest.java new file mode 100644 
index 0000000000..8921ee24ec --- /dev/null +++ b/spring-kafka/src/test/java/org/springframework/kafka/entity/reactive/SimpleKafkaEntityProcessorTest.java @@ -0,0 +1,204 @@ +/* + * Copyright 2016-2025 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.kafka.entity.reactive; + +import java.lang.annotation.Annotation; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; + +import org.apache.commons.logging.LogFactory; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.reactivestreams.Processor; +import reactor.core.publisher.BaseSubscriber; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Configuration; +import org.springframework.core.log.LogAccessor; +import org.springframework.kafka.annotation.EnableKafkaEntity; +import org.springframework.kafka.entity.KafkaEntityException; +import org.springframework.kafka.entity.Product; +import org.springframework.kafka.entity.Student; +import org.springframework.kafka.entity.User; +import 
org.springframework.kafka.entity.reactive.SimpleKafkaEntityProcessorTest.SpringKafkaEntityProcessorTestConfiguration; +import org.springframework.kafka.test.EmbeddedKafkaBroker; +import org.springframework.kafka.test.condition.LogLevels; +import org.springframework.kafka.test.context.EmbeddedKafka; +import org.springframework.test.annotation.DirtiesContext; +import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; + +/** + * @author Popovics Boglarka + */ +@SpringJUnitConfig(SpringKafkaEntityProcessorTestConfiguration.class) +@DirtiesContext +@EmbeddedKafka(topics = { SimpleKafkaEntityProcessorTest.TOPIC_PRODUCT, SimpleKafkaEntityProcessorTest.TOPIC_USER, + SimpleKafkaEntityProcessorTest.TOPIC_STUDENT }, partitions = 1, brokerProperties = { + "offsets.topic.replication.factor=1", "offset.storage.replication.factor=1", + "transaction.state.log.replication.factor=1", "transaction.state.log.min.isr=1" }) +public class SimpleKafkaEntityProcessorTest { + + private final LogAccessor logger = new LogAccessor(LogFactory.getLog(getClass())); + + @Autowired + private EmbeddedKafkaBroker embeddedKafka; + + public static final String TOPIC_PRODUCT = "PRODUCT"; + + public static final String TOPIC_USER = "org.springframework.kafka.entity.User"; + + public static final String TOPIC_STUDENT = "org.springframework.kafka.entity.Student"; + + @LogLevels(categories = { "org.springframework.kafka.entity", "reactor.core.publisher" }, level = "TRACE") + @Test + public void test_sendEvent() throws KafkaEntityException, InterruptedException, ExecutionException { + + Product event = new Product(); + event.setId("123456"); + + Processor productProcessor = new SimpleKafkaEntityProcessor( + List.of(embeddedKafka.getBrokersAsString()), new KafkaEntityProcessor() { + + @Override + public Class annotationType() { + return KafkaEntityProcessor.class; + } + + @Override + public boolean transactional() { + return false; + } + + }, Product.class, "test_sendEvent"); + + CountDownLatch 
countDownLatch = new CountDownLatch(1); + List ret = new ArrayList<>(); + productProcessor.subscribe(new BaseSubscriber<>() { + @Override + protected void hookOnNext(RecordMetadata r) { + logger.trace("received: " + r); + ret.add(r); + countDownLatch.countDown(); + } + }); + Mono.just(event).subscribe(productProcessor); + logger.warn("waiting 30_000"); + countDownLatch.await(30, TimeUnit.SECONDS); + RecordMetadata sendEventMetadata = ret.get(0); + + Assertions.assertNotNull(sendEventMetadata); + logger.info("sendEventMetadata: " + sendEventMetadata.offset()); + + Assertions.assertEquals(TOPIC_PRODUCT, sendEventMetadata.topic()); + + } + + @LogLevels(categories = { "org.springframework.kafka.entity", "reactor.core.publisher" }, level = "TRACE") + @Test + public void test_sendUser() throws KafkaEntityException, InterruptedException, ExecutionException { + + User event = new User("abcdef"); + + Processor userProcessor = new SimpleKafkaEntityProcessor( + List.of(embeddedKafka.getBrokersAsString()), new KafkaEntityProcessor() { + + @Override + public Class annotationType() { + return KafkaEntityProcessor.class; + } + + @Override + public boolean transactional() { + return false; + } + + }, User.class, "test_sendUser"); + + CountDownLatch countDownLatch = new CountDownLatch(1); + List ret = new ArrayList<>(); + userProcessor.subscribe(new BaseSubscriber<>() { + @Override + protected void hookOnNext(RecordMetadata r) { + logger.trace("received: " + r); + ret.add(r); + countDownLatch.countDown(); + } + }); + Mono.just(event).subscribe(userProcessor); + logger.warn("waiting 5_000"); + countDownLatch.await(5, TimeUnit.SECONDS); + + // it has no KafkaEntityKey + Assertions.assertEquals(0, ret.size()); + } + + @LogLevels(categories = { "org.springframework.kafka.entity", "reactor.core.publisher" }, level = "TRACE") + @Test + public void test_sendStudent() throws KafkaEntityException, InterruptedException, ExecutionException { + + Student event = new Student("hrs123", 23); + + 
Processor studentProcessor = new SimpleKafkaEntityProcessor( + List.of(embeddedKafka.getBrokersAsString()), new KafkaEntityProcessor() { + + @Override + public Class annotationType() { + return KafkaEntityProcessor.class; + } + + @Override + public boolean transactional() { + return false; + } + + }, Student.class, "test_sendStudent"); + + CountDownLatch countDownLatch = new CountDownLatch(1); + List ret = new ArrayList<>(); + studentProcessor.subscribe(new BaseSubscriber<>() { + @Override + protected void hookOnNext(RecordMetadata r) { + logger.trace("received: " + r); + ret.add(r); + countDownLatch.countDown(); + } + }); + Flux.fromIterable(Arrays.asList(event)).subscribe(studentProcessor); + logger.warn("waiting 30_000"); + countDownLatch.await(30, TimeUnit.SECONDS); + RecordMetadata sendStudentMetadata = ret.get(0); + + Assertions.assertNotNull(sendStudentMetadata); + logger.info("sendStudentMetadata: " + sendStudentMetadata.offset()); + + Assertions.assertEquals(TOPIC_STUDENT, sendStudentMetadata.topic()); + + } + + @Configuration + @EnableKafkaEntity + public static class SpringKafkaEntityProcessorTestConfiguration { + + } +} diff --git a/spring-kafka/src/test/java/org/springframework/kafka/entity/reactive/SimpleKafkaEntityPublisherTest.java b/spring-kafka/src/test/java/org/springframework/kafka/entity/reactive/SimpleKafkaEntityPublisherTest.java new file mode 100644 index 0000000000..e0315268aa --- /dev/null +++ b/spring-kafka/src/test/java/org/springframework/kafka/entity/reactive/SimpleKafkaEntityPublisherTest.java @@ -0,0 +1,170 @@ +/* + * Copyright 2016-2025 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.kafka.entity.reactive; + +import java.lang.annotation.Annotation; +import java.time.LocalDateTime; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; + +import org.apache.commons.logging.LogFactory; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.jspecify.annotations.NonNull; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.reactivestreams.Publisher; +import reactor.core.publisher.BaseSubscriber; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.core.log.LogAccessor; +import org.springframework.kafka.core.DefaultKafkaProducerFactory; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.core.ProducerFactory; +import org.springframework.kafka.entity.ComplexKeyRecord; +import org.springframework.kafka.entity.PlaceVersion; +import org.springframework.kafka.entity.reactive.SimpleKafkaEntityPublisherTest.KafkaProducerConfig; +import org.springframework.kafka.support.SendResult; +import org.springframework.kafka.support.serializer.JsonKeySerializer; +import org.springframework.kafka.support.serializer.JsonSerializer; +import 
org.springframework.kafka.test.EmbeddedKafkaBroker; +import org.springframework.kafka.test.condition.LogLevels; +import org.springframework.kafka.test.context.EmbeddedKafka; +import org.springframework.kafka.test.utils.KafkaTestUtils; +import org.springframework.test.annotation.DirtiesContext; +import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; + +/** + * @author Popovics Boglarka + */ +@SpringJUnitConfig(KafkaProducerConfig.class) +@DirtiesContext +@EmbeddedKafka(topics = { SimpleKafkaEntityPublisherTest.TOPIC_PLACEVERSION }, partitions = 1, brokerProperties = { + "offsets.topic.replication.factor=1", "offset.storage.replication.factor=1", + "transaction.state.log.replication.factor=1", "transaction.state.log.min.isr=1" }) +public class SimpleKafkaEntityPublisherTest { + + private final LogAccessor logger = new LogAccessor(LogFactory.getLog(getClass())); + + @Autowired + private EmbeddedKafkaBroker embeddedKafka; + + @Autowired + private KafkaTemplate kafkaPlaceVersionProducer; + + public static final String TOPIC_PLACEVERSION = "org.springframework.kafka.entity.PlaceVersion"; + + void publishPlaceVersionMessages(CountDownLatch sentCounter, int i, List placeVersionList) + throws Exception { + + logger.warn("publishPlaceVersionMessages " + i + " " + placeVersionList.size()); + + for (PlaceVersion pV : placeVersionList) { + + ProducerRecord p2Record = new ProducerRecord<>(TOPIC_PLACEVERSION, + pV.getComplexKeyRecord(), pV); + sendPlaceVersion(sentCounter, p2Record); + } + } + + @LogLevels(categories = { "org.springframework.kafka.entity", "reactor.core.publisher" }, level = "TRACE") + @Test + public void test_sendEvent_PlaceVersion() throws Exception { + + List eventList = new ArrayList<>(); + + List sending = List.of( + new PlaceVersion(new ComplexKeyRecord(UUID.randomUUID(), LocalDateTime.now()), "absolute now"), + new PlaceVersion(new ComplexKeyRecord(UUID.fromString("ee3968cc-3d74-44c0-8645-31e3546c963f"), + LocalDateTime.of(2025, 04, 21, 
12, 24)), "fix date and uuid")); + + CountDownLatch placeVersionReceivedCounter = new CountDownLatch(sending.size()); + Publisher placeVersionPublisher = new SimpleKafkaEntityPublisher( + List.of(embeddedKafka.getBrokersAsString()), new KafkaEntityPublisher() { + + @Override + public Class annotationType() { + return KafkaEntityPublisher.class; + } + + }, PlaceVersion.class, "test_sendEvent_PlaceVersion"); + Assertions.assertNotNull(placeVersionPublisher); + @NonNull + BaseSubscriber connect = new BaseSubscriber<>() { + @Override + protected void hookOnNext(PlaceVersion r) { + logger.info("received: " + r); + eventList.add(r); + + placeVersionReceivedCounter.countDown(); + } + }; + placeVersionPublisher.subscribe(connect); + + CountDownLatch placeVersionSentCounter = new CountDownLatch(sending.size()); + + publishPlaceVersionMessages(placeVersionSentCounter, 0, sending); + + logger.warn("waiting maximum 90_000"); + placeVersionSentCounter.await(90, TimeUnit.SECONDS); + placeVersionReceivedCounter.await(90, TimeUnit.SECONDS); + + logger.warn("asserting"); + Assertions.assertEquals(sending, eventList); + + } + + private void sendPlaceVersion(CountDownLatch sentCounter, ProducerRecord record) + throws InterruptedException, ExecutionException { + CompletableFuture> sendingIntoTheFuture = kafkaPlaceVersionProducer + .send(record); + + sendingIntoTheFuture.get(); + while (!sendingIntoTheFuture.isDone()) { + logger.info("waiting"); + } + sentCounter.countDown(); + } + + @Configuration + public static class KafkaProducerConfig { + + @Autowired + private EmbeddedKafkaBroker embeddedKafka; + + @Bean + public ProducerFactory placeVersionProducerFactory() { + Map config = KafkaTestUtils.producerProps(embeddedKafka); + + return new DefaultKafkaProducerFactory<>(config, new JsonKeySerializer(), + new JsonSerializer()); + } + + @Bean + public KafkaTemplate kafkaPlaceVerionProducer() { + return new KafkaTemplate<>(placeVersionProducerFactory()); + } + } + +} diff --git 
a/spring-kafka/src/test/java/org/springframework/kafka/entity/reactive/SimpleKafkaEntitySubscriberTest.java b/spring-kafka/src/test/java/org/springframework/kafka/entity/reactive/SimpleKafkaEntitySubscriberTest.java new file mode 100644 index 0000000000..b0c8fa05dd --- /dev/null +++ b/spring-kafka/src/test/java/org/springframework/kafka/entity/reactive/SimpleKafkaEntitySubscriberTest.java @@ -0,0 +1,177 @@ +/* + * Copyright 2016-2025 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.springframework.kafka.entity.reactive; + +import java.lang.annotation.Annotation; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.commons.logging.LogFactory; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.reactivestreams.Subscriber; +import reactor.core.publisher.Flux; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.core.log.LogAccessor; +import org.springframework.kafka.entity.City; +import org.springframework.kafka.entity.CityGroup; +import org.springframework.kafka.entity.ComplexKey; +import org.springframework.kafka.test.EmbeddedKafkaBroker; +import org.springframework.kafka.test.condition.LogLevels; +import org.springframework.kafka.test.context.EmbeddedKafka; +import org.springframework.test.annotation.DirtiesContext; +import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; + +/** + * @author Popovics Boglarka + */ +@SpringJUnitConfig +@DirtiesContext +@EmbeddedKafka(topics = { SimpleKafkaEntitySubscriberTest.TOPIC_CITY, + SimpleKafkaEntitySubscriberTest.TOPIC_CITYGROUP }, partitions = 1, brokerProperties = { + "offsets.topic.replication.factor=1", "offset.storage.replication.factor=1", + "transaction.state.log.replication.factor=1", "transaction.state.log.min.isr=1" }) +public class SimpleKafkaEntitySubscriberTest { + + private final LogAccessor logger = new LogAccessor(LogFactory.getLog(getClass())); + + @Autowired + private EmbeddedKafkaBroker embeddedKafka; + + public static final String TOPIC_CITY = "org.springframework.kafka.entity.City"; + + public static final String TOPIC_CITYGROUP = "org.springframework.kafka.entity.CityGroup"; + + @LogLevels(categories = { "org.springframework.kafka.entity", "reactor.core.publisher" }, level = "TRACE") + @Test + public void test_sendCity() throws Exception { + + 
List eventList = List.of(new City("Budapest"), new City("Wien")); + + CountDownLatch consumerInitialized = new CountDownLatch(1); + CountDownLatch consumerFinished = new CountDownLatch(1); + + AtomicInteger sum = new AtomicInteger(); + new Thread(new TestConsumerRunnable("test_sendCity()", eventList.size(), TOPIC_CITY, + embeddedKafka, consumerInitialized, consumerFinished, sum, City.class, String.class)).start(); + + logger.warn("waiting maximum 200_000"); + consumerInitialized.await(200, TimeUnit.SECONDS); + + Subscriber productSubscriber = new SimpleKafkaEntitySubscriber( + List.of(embeddedKafka.getBrokersAsString()), new KafkaEntitySubscriber() { + + @Override + public Class annotationType() { + return KafkaEntitySubscriber.class; + } + + @Override + public boolean transactional() { + return false; + } + }, City.class, "test_sendCity"); + + Flux.fromIterable(eventList).subscribe(productSubscriber); + + logger.warn("waiting maximum 200_000"); + consumerFinished.await(200, TimeUnit.SECONDS); + Assertions.assertEquals(eventList.size(), sum.get()); + + } + + @LogLevels(categories = { "org.springframework.kafka.entity", "reactor.core.publisher" }, level = "TRACE") + @Test + public void test_sendCity_no_transaction() throws Exception { + + List eventList = List.of(new City("Budapest"), new City("Hamburg")); + + CountDownLatch consumerInitialized = new CountDownLatch(1); + CountDownLatch consumerFinished = new CountDownLatch(1); + + AtomicInteger sum = new AtomicInteger(); + new Thread(new TestConsumerRunnable("test_sendCity()", eventList.size(), TOPIC_CITY, + embeddedKafka, consumerInitialized, consumerFinished, sum, City.class, String.class)).start(); + + logger.warn("waiting maximum 200_000"); + consumerInitialized.await(200, TimeUnit.SECONDS); + + Subscriber productSubscriber = new SimpleKafkaEntitySubscriber( + List.of(embeddedKafka.getBrokersAsString()), new KafkaEntitySubscriber() { + + @Override + public Class annotationType() { + return 
KafkaEntitySubscriber.class; + } + + @Override + public boolean transactional() { + return false; + } + }, City.class, "test_sendCity_no_transaction"); + + Flux.fromIterable(eventList).subscribe(productSubscriber); + + logger.warn("waiting maximum 200_000"); + consumerFinished.await(200, TimeUnit.SECONDS); + Assertions.assertEquals(eventList.size(), sum.get()); + } + + @LogLevels(categories = { "org.springframework.kafka.entity", "reactor.core.publisher" }, level = "TRACE") + @Test + public void test_sendCityGroup_with_ComplexKey() throws Exception { + + List eventList = List.of(new CityGroup(new ComplexKey(111, "group1"), "Hungary"), + new CityGroup(new ComplexKey(6, "aus"), "Austria"), new CityGroup(new ComplexKey(40, "40"), "Germany")); + + CountDownLatch consumerInitialized = new CountDownLatch(1); + CountDownLatch consumerFinished = new CountDownLatch(1); + + AtomicInteger sum = new AtomicInteger(); + new Thread(new TestConsumerRunnable("test_sendCityGroup_with_ComplexKey()", + eventList.size(), TOPIC_CITYGROUP, embeddedKafka, consumerInitialized, consumerFinished, sum, + CityGroup.class, ComplexKey.class)).start(); + + logger.warn("waiting maximum 200_000"); + consumerInitialized.await(200, TimeUnit.SECONDS); + + Subscriber productSubscriber = new SimpleKafkaEntitySubscriber( + List.of(embeddedKafka.getBrokersAsString()), new KafkaEntitySubscriber() { + + @Override + public Class annotationType() { + return KafkaEntitySubscriber.class; + } + + @Override + public boolean transactional() { + return false; + } + }, CityGroup.class, "test_sendCityGroup_with_ComplexKey"); + + Flux.fromIterable(eventList).subscribe(productSubscriber); + + logger.warn("waiting maximum 200_000"); + consumerFinished.await(200, TimeUnit.SECONDS); + Assertions.assertEquals(eventList.size(), sum.get()); + + } + +} diff --git a/spring-kafka/src/test/java/org/springframework/kafka/entity/reactive/TestConsumerRunnable.java 
/*
 * Copyright 2016-2025 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.springframework.kafka.entity.reactive;

import java.time.Duration;
import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import org.springframework.core.log.LogAccessor;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import org.springframework.kafka.support.serializer.JsonKeyDeserializer;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.utils.KafkaTestUtils;

/**
 * Test helper that consumes records from {@code topic} on a background thread,
 * accumulating the number of records seen into {@code sum} until
 * {@code expectedSum} is reached, then counts {@code consumerFinished} down.
 *
 * @param <T> class of the entity, representing messages
 * @param <K> the key from the entity
 *
 * @author Popovics Boglarka
 */
public class TestConsumerRunnable<T, K> implements Runnable {

	private final LogAccessor logger = new LogAccessor(LogFactory.getLog(getClass()));

	private final String testName;

	private final int expectedSum;

	private final String topic;

	private final EmbeddedKafkaBroker embeddedKafka;

	private final CountDownLatch consumerInitialized;

	private final CountDownLatch consumerFinished;

	private final AtomicInteger sum;

	private final Class<T> valueClazz;

	private final Class<K> keyClazz;

	public TestConsumerRunnable(String testName, int expectedSum, String topic, EmbeddedKafkaBroker embeddedKafka,
			CountDownLatch consumerInitialized, CountDownLatch consumerFinished, AtomicInteger sum,
			Class<T> valueClazz, Class<K> keyClazz) {

		this.testName = testName;
		this.expectedSum = expectedSum;
		this.topic = topic;
		this.embeddedKafka = embeddedKafka;
		this.consumerInitialized = consumerInitialized;
		this.consumerFinished = consumerFinished;
		this.sum = sum;
		this.valueClazz = valueClazz;
		this.keyClazz = keyClazz;
	}

	@Override
	public void run() {

		Map<String, Object> consumerProps = KafkaTestUtils.consumerProps(this.getClass().getName(), "false",
				this.embeddedKafka);
		consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

		JsonDeserializer<T> valueDeserializer = new JsonDeserializer<>(this.valueClazz);
		JsonKeyDeserializer<K> keyDeserializer = new JsonKeyDeserializer<>(this.keyClazz);
		valueDeserializer.addTrustedPackages(this.valueClazz.getPackageName());
		keyDeserializer.addTrustedPackages(this.keyClazz.getPackageName());

		try (KafkaConsumer<K, T> kafkaConsumer = new KafkaConsumer<>(consumerProps, keyDeserializer,
				valueDeserializer)) {

			kafkaConsumer.subscribe(List.of(this.topic));

			// first poll joins the group and triggers partition assignment
			kafkaConsumer.poll(Duration.ofSeconds(10L));

			// skip records already on the topic; only records produced by the test count
			kafkaConsumer.seekToEnd(Collections.emptyList());
			kafkaConsumer.commitSync();

			// wait until kafkaConsumer is ready and the offset is committed;
			// sleep between checks instead of hammering the broker in a hot loop
			LocalDateTime then = LocalDateTime.now();
			while (kafkaConsumer.committed(kafkaConsumer.assignment()).isEmpty()) {
				if (ChronoUnit.SECONDS.between(then, LocalDateTime.now()) >= 100) {
					throw new IllegalStateException("KafkaConsumer is not ready in 100sec.");
				}
				try {
					Thread.sleep(100);
				}
				catch (InterruptedException ex) {
					Thread.currentThread().interrupt();
					throw new IllegalStateException("interrupted while waiting for the committed offset", ex);
				}
			}
			if (logger.isDebugEnabled()) {
				logger.debug("started " + this.testName + "...");
			}

			do {
				// NOTE: the 180s budget is measured from 'then' above, so it
				// also includes the time spent waiting for the committed offset
				if (ChronoUnit.SECONDS.between(then, LocalDateTime.now()) >= 180) {
					throw new IllegalStateException("KafkaConsumer polling is timed out.");
				}
				ConsumerRecords<K, T> records = kafkaConsumer.poll(Duration.ofSeconds(10));
				if (logger.isTraceEnabled()) {
					records.forEach(cr -> logger.trace("polled: " + cr.value()));
				}
				this.sum.addAndGet(records.count());
				if (logger.isTraceEnabled()) {
					logger.trace("polling... sum=" + this.sum);
				}
				// signal readiness after the first poll iteration; repeated
				// countDown() calls on an exhausted latch are harmless no-ops
				this.consumerInitialized.countDown();
			} while (this.sum.get() < this.expectedSum);

			kafkaConsumer.unsubscribe();
		}
		finally {
			this.consumerFinished.countDown();
		}
	}

}