|
| 1 | +## vertx-spring-boot-starter-kafka |
| 2 | + |
| 3 | +Vert.x Spring Boot Kafka starter implements a Reactor API for the Vert.x Kafka client. |
| 4 | + |
| 5 | +### Usage |
| 6 | + |
| 7 | +Add the starter dependency to your `pom.xml`. |
| 8 | +```xml |
| 9 | +<dependency> |
| 10 | + <groupId>dev.snowdrop</groupId> |
| 11 | + <artifactId>vertx-spring-boot-starter-kafka</artifactId> |
| 12 | +</dependency> |
| 13 | +``` |
| 14 | + |
| 15 | +#### Sending messages |
| 16 | + |
| 17 | +Use a `ProducerRecord` builder to create a producer record. |
| 18 | +```java |
| 19 | +ProducerRecord<String, String> record = ProducerRecord |
| 20 | + .builder("my-topic", "my-record-body", "my-record-key") |
| 21 | + .withHeader(Header.create("my-header", "my-header-value")) |
| 22 | + .build(); |
| 23 | +``` |
| 24 | + |
| 25 | +Inject a `KafkaProducerFactory` to your bean and use it to create a producer. |
| 26 | +```java |
| 27 | +KafkaProducer<String, String> producer = producerFactory.create(); |
| 28 | +``` |
| 29 | + |
| 30 | +Then send your record with the producer. |
| 31 | +```java |
| 32 | +producer.send(record); |
| 33 | +``` |
| 34 | + |
| 35 | +Producer send method returns a `Mono<RecordMetadata>`. This means that a record will not be sent until you subscribe to this |
| 36 | +`Mono`. Once the send operations will end, `Mono` will be completed with a `RecordMetadata` containing your record information |
| 37 | +such as its partition, offset, checksum etc. |
| 38 | + |
| 39 | +#### Receiving messages |
| 40 | + |
| 41 | +Inject a `KafkaConsumerFactory` to your bean and use it to create a consumer. |
| 42 | +```java |
| 43 | +KafkaConsumer<String, String> consumer = consumerFactory.create(); |
| 44 | +``` |
| 45 | + |
| 46 | +There are two steps needed to start receiving the messages. First you need to subscribe to a topic and then to a messages |
| 47 | +`Flux`. |
| 48 | +```java |
| 49 | +Disposable consumerDisposer = consumer |
| 50 | + .subscribe("my-topic") |
| 51 | + .thenMany(consumer.flux()) |
| 52 | + .subscribe(record -> System.out.printon("Received a message: " + record.value())); |
| 53 | +``` |
| 54 | + |
| 55 | +To stop receiving messages - dispose a `Flux` subscription using the `consumerDisposer`. |
| 56 | +If you want to fully unsubscribe - use `consumer.unsubscribe()` method. This method returns `Mono<Void>`, which means that you |
| 57 | +need to subscribe to this `Mono` in order to execute the command. E.g. |
| 58 | +```java |
| 59 | +consumerDisposer.dispose(); |
| 60 | +consumer.unsubscribe().block(); |
| 61 | +``` |
| 62 | + |
| 63 | +### Configuration |
| 64 | + |
| 65 | +To enable/disable Kafka starter, set a `vertx.kafka.enabled` property to `true/false` (`true` is a default value). |
| 66 | + |
| 67 | +`vertx.kafka.enabled` is the only property shared between consumers and producers. Standard Kafka consumer and producer |
| 68 | +properties are mapped via Spring properties using the respective prefixes. E.g. |
| 69 | +```properties |
| 70 | +vertx.kafka.producer.bootstrap.servers=localhost:9092 |
| 71 | +vertx.kafka.producer.key.serializer=org.apache.kafka.common.serialization.StringSerializer |
| 72 | +vertx.kafka.producer.value.serializer=org.apache.kafka.common.serialization.StringSerializer |
| 73 | +vertx.kafka.consumer.bootstrap.servers=localhost:9092 |
| 74 | +vertx.kafka.consumer.group.id=log |
| 75 | +vertx.kafka.consumer.key.deserializer=org.apache.kafka.common.serialization.StringDeserializer |
| 76 | +vertx.kafka.consumer.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer |
| 77 | +``` |
| 78 | + |
| 79 | +These properties will be used to create consumers and producers using their factories. If you want to override any of |
| 80 | +the properties for a particular consumer or producer, you can pass an instance of `Map<String, String>` with overriding |
| 81 | +properties to factory's `create` method. |
0 commit comments