Skip to content

Support reactive KafkaEntity #3862

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from

Conversation

Csini
Copy link

@Csini Csini commented Apr 22, 2025

Hi Spring Team,
I'm a big fan of your work. I've recently made some projects with apache kafka and found your spring-kafka library very impressive. I thought a reactive entity approach to reading/writing the kafka topics can be a good new feature. Ealier I made a spring-kafka-extension library, now I thought, I create a PR.

Imagine that you have per Topic one KafkaEntity and you can read (consume) and write (produce) them performant, easily and reactive.

When enabling with @EnableKafkaEntity than it create automatic a Publisher, a Subscriber or a Processor to every KafkaEntity! All you need to do is to use the custom @KafkaEntity, @KafkaEntityKey, @KafkaEntityPublisher, @KafkaEntitySubscriber and @KafkaEntityProcessor annotations!

support-reactive-kafkaentity

how to use:

enable KafkaEntity

@Configuration
@EnableKafkaEntity
public class KafkaEntityEnvironment {
}

define per topic a KafkaEntity

one class or record and mark with @KafkaEntity (mark a field with @KafkaEntityKey)
Topic name will be the name of the entity class with included packagename
but you can use custom Topic name like this @KafkaEntity(customTopicName = "PRODUCT")

for example:

package net.csini.spring.kafka.entity;

import org.springframework.kafka.entity.KafkaEntity;
import org.springframework.kafka.entity.KafkaEntityKey;

@KafkaEntity
public record Student(@KafkaEntityKey String studentid, int age) {

}

Topic name will be net.csini.spring.kafka.entity.Student

package net.csini.spring.topic;

import org.springframework.kafka.entity.KafkaEntity;

@KafkaEntity
public class Product {

	@org.springframework.kafka.entity.KafkaEntityKey
	private String id;

	private String title;

	private String description;
	
}

Topic name will be net.csini.spring.topic.Product

write data to a topic (produce)

in a Spring Bean just inject a KafkaEntitySubscriber (we assume, that City is as KafkaEntity defined)

default is transactional=true

import java.util.List;

import net.csini.spring.kafka.entity.City;
import org.reactivestreams.Subscriber;
import reactor.core.publisher.Flux;

import org.springframework.stereotype.Service;
import org.springframework.kafka.entity.KafkaEntitySubscriber;

@Service
public class ExampleKafkaEntitySubscriberService {
	
	@KafkaEntitySubscriber
	private Subscriber<City> citySubscriber;
	
	private List<City> input = List.of(new City("Budapest"), new City("Wien"));

	public void sendCitiesToKafkaTopic(){
		Flux.fromIterable(input).subscribe(citySubscriber);
	}
}

if you need the RecordMetadata from the Kafka Message than you can use KafkaEntityProcessor

in a Spring Bean just inject a KafkaEntityProcessor (we assume, that User is defined as a KafkaEntity)

default is transactional=true

import java.util.ArrayList;
import java.util.List;

import net.csini.spring.kafka.entity.User;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.reactivestreams.Processor;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Mono;

import org.springframework.stereotype.Service;
import org.springframework.kafka.entity.KafkaProcessor;
import org.springframework.kafka.entity.KafkaEntityProcessor;

@Service
public class OtherKafkaEntityProcessorService {

	@KafkaEntityProcessor(transactional = false)
	private KafkaProcessor<User> userProcessor;

	public RecordMetadata sendUserToKafkaTopic(User u) {
		List<RecordMetadata> ret = new ArrayList<>();
		userProcessor.subscribe(new BaseSubscriber<>() {
			@Override
			protected void hookOnNext(RecordMetadata r) {
				ret.add(r);
			}
		});
		Mono.just(u).subscribe(userProcessor);

                // implement to wait for receive data, than return

		return ret.get(0);
	}

}

read data from a topic (consume)

in a Spring Bean just inject a KafkaEntityPublisher (we assume Place is defined as a KafkaEntity)

import net.csini.spring.kafka.entity.Place;
import reactor.core.publisher.BaseSubscriber;
import org.reactivestreams.Publisher;

import org.springframework.stereotype.Service;
import org.springframework.kafka.entity.KafkaEntityPublisher;


@Service
public class ExampleKafkaEntityPublisherService {

	@KafkaEntityPublisher
	private Publisher<Place> placePublisher;
	
	public void readPlacesFromKafkaTopic() throws Exception {

		List<Place> eventList = new ArrayList<>();
		
	        placePublisher.subscribe(new BaseSubscriber<>() {
			@Override
			protected void hookOnNext(Place r) {
				eventList.add(r);
			}
		});
	}
}

@sobychacko
Copy link
Contributor

@Csini Thanks for this effort. Since this is a reactive style new feature that you are trying to add, my initial instinct is to keep this out of the core Spring Kafka project and keep it as a standalone component that you can manage/maintain separately. Even though I see that you are relying on the optional reactive API's provided by Spring Kafka, it might be too much maintenance for us to add this to the core framework. We can certainly consider how we can support these types of use cases via documentation and examples. We will internally discuss this and get back to you on which direction we want to go before we start reviewing. Either way, this is a reasonable effort that can be beneficial to the community, even if via a standalone project. Stay tuned. Thanks!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

2 participants