Skip to content

Commit 1c8816a

Browse files
Add implementation of infinite stream benchmark for reactor-kafka
1 parent 9f86c90 commit 1c8816a

File tree

7 files changed

+261
-2
lines changed

7 files changed

+261
-2
lines changed
Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
package io.scalecube.benchmarks.kafka.reactor;
2+
3+
import static io.scalecube.benchmarks.kafka.ConfigConstants.SERVERS;
4+
5+
import io.scalecube.benchmarks.kafka.Message;
6+
import io.scalecube.benchmarks.kafka.MessageDeserializer;
7+
import io.scalecube.benchmarks.kafka.MessageSerializer;
8+
import java.util.Collections;
9+
import java.util.HashMap;
10+
import java.util.Map;
11+
import org.apache.kafka.clients.consumer.ConsumerConfig;
12+
import org.apache.kafka.clients.producer.ProducerConfig;
13+
import org.apache.kafka.clients.producer.ProducerRecord;
14+
import org.apache.kafka.common.serialization.IntegerDeserializer;
15+
import org.apache.kafka.common.serialization.IntegerSerializer;
16+
import org.slf4j.Logger;
17+
import org.slf4j.LoggerFactory;
18+
import reactor.core.publisher.Flux;
19+
import reactor.core.publisher.Mono;
20+
import reactor.kafka.receiver.KafkaReceiver;
21+
import reactor.kafka.receiver.ReceiverOptions;
22+
import reactor.kafka.sender.KafkaSender;
23+
import reactor.kafka.sender.SenderOptions;
24+
import reactor.kafka.sender.SenderRecord;
25+
26+
public class ExampleService {
27+
28+
private static final Logger LOGGER = LoggerFactory.getLogger(ExampleService.class);
29+
30+
private static final String TOPIC_RECEIVE = "benchmark-topic-a";
31+
private static final String TOPIC_SEND = "benchmark-topic-b";
32+
private static final int MAX_IN_FLIGHT = 256;
33+
34+
public static void main(String[] args) throws InterruptedException {
35+
KafkaSender<Integer, Message> kafkaSender = kafkaSender();
36+
37+
kafkaReceiver()
38+
.receive()
39+
.log(">>>")
40+
.subscribe(
41+
r -> {
42+
kafkaSender
43+
.send(records())
44+
.then()
45+
.doOnError(e -> LOGGER.error("Sending failed", e))
46+
.doOnSuccess(s -> LOGGER.info("Sending completed"))
47+
.subscribe();
48+
r.receiverOffset().acknowledge();
49+
},
50+
th -> LOGGER.error("Exception occurred", th));
51+
52+
Thread.currentThread().join();
53+
}
54+
55+
private static KafkaReceiver<Integer, Message> kafkaReceiver() {
56+
Map<String, Object> consumerProps = new HashMap<>();
57+
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, SERVERS);
58+
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "service-consumer");
59+
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
60+
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, MessageDeserializer.class);
61+
62+
ReceiverOptions<Integer, Message> receiverOptions =
63+
ReceiverOptions.<Integer, Message>create(consumerProps)
64+
.subscription(Collections.singleton(TOPIC_RECEIVE));
65+
66+
return KafkaReceiver.create(receiverOptions);
67+
}
68+
69+
private static KafkaSender<Integer, Message> kafkaSender() {
70+
Map<String, Object> producerProps = new HashMap<>();
71+
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, SERVERS);
72+
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
73+
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, MessageSerializer.class);
74+
75+
SenderOptions<Integer, Message> senderOptions =
76+
SenderOptions.<Integer, Message>create(producerProps).maxInFlight(MAX_IN_FLIGHT);
77+
78+
return KafkaSender.create(senderOptions);
79+
}
80+
81+
private static Flux<SenderRecord<Integer, Message, Integer>> records() {
82+
return Mono.fromCallable(
83+
() -> {
84+
Message message = new Message(System.currentTimeMillis());
85+
int correlationMetadata = message.hashCode();
86+
ProducerRecord<Integer, Message> record =
87+
new ProducerRecord<>(TOPIC_SEND, correlationMetadata, message);
88+
return SenderRecord.create(record, correlationMetadata);
89+
})
90+
.repeat();
91+
}
92+
}
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
package io.scalecube.benchmarks.kafka.reactor;
2+
3+
final class InfiniteStreamBenchmark {
4+
5+
public static void main(String[] args) {
6+
InfiniteStreamScenario.runWith(args, ReactorKafkaState::new);
7+
}
8+
}
Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
package io.scalecube.benchmarks.kafka.reactor;
2+
3+
import io.scalecube.benchmarks.BenchmarkSettings;
4+
import io.scalecube.benchmarks.kafka.Message;
5+
import io.scalecube.benchmarks.metrics.BenchmarkTimer;
6+
import java.time.Duration;
7+
import java.util.Optional;
8+
import java.util.concurrent.TimeUnit;
9+
import java.util.function.Function;
10+
import org.apache.kafka.clients.producer.ProducerRecord;
11+
import org.slf4j.Logger;
12+
import org.slf4j.LoggerFactory;
13+
import reactor.core.publisher.Flux;
14+
import reactor.core.publisher.Mono;
15+
import reactor.kafka.sender.SenderRecord;
16+
import reactor.util.concurrent.Queues;
17+
18+
final class InfiniteStreamScenario {
19+
20+
private static final Logger LOGGER = LoggerFactory.getLogger(InfiniteStreamScenario.class);
21+
22+
private static final int DEFAULT_RATE_LIMIT = Queues.SMALL_BUFFER_SIZE;
23+
private static final int MAX_IN_FLIGHT = 2048;
24+
private static final String RATE_LIMIT = "rateLimit";
25+
private static final String TOPIC_SEND = "benchmark-topic-a";
26+
27+
public static void runWith(
28+
String[] args, Function<BenchmarkSettings, ReactorKafkaState> stateSupplier) {
29+
30+
int numOfThreads = Runtime.getRuntime().availableProcessors();
31+
Duration rampUpDuration = Duration.ofSeconds(numOfThreads);
32+
33+
BenchmarkSettings settings =
34+
BenchmarkSettings.from(args)
35+
.injectors(1)
36+
.messageRate(1) // workaround
37+
.rampUpDuration(rampUpDuration)
38+
.durationUnit(TimeUnit.MILLISECONDS)
39+
.build();
40+
41+
int rateLimit = rateLimit(settings);
42+
43+
stateSupplier
44+
.apply(settings)
45+
.runWithRampUp(
46+
(rampUpTick, state) -> Mono.just(state.kafkaSender(MAX_IN_FLIGHT)),
47+
state -> {
48+
BenchmarkTimer timer = state.timer("reactor-kafka-timer");
49+
50+
return kafkaSender ->
51+
(executionTick, task) ->
52+
Flux.defer(
53+
() -> {
54+
SenderRecord<Integer, Message, Integer> record =
55+
SenderRecord.create(
56+
new ProducerRecord<>(TOPIC_SEND, new Message()),
57+
new Message().hashCode());
58+
return kafkaSender
59+
.send(Mono.just(record))
60+
.thenMany(
61+
state
62+
.kafkaReceiver()
63+
.receive()
64+
// .limitRate(rateLimit)
65+
.doOnNext(
66+
r ->
67+
timer.update(
68+
System.currentTimeMillis()
69+
- r.value().sendTime(),
70+
TimeUnit.MILLISECONDS))
71+
.doOnError(th -> LOGGER.error("Error occurred", th)));
72+
});
73+
},
74+
(state, kafkaSender) -> Mono.empty());
75+
}
76+
77+
private static Integer rateLimit(BenchmarkSettings settings) {
78+
return Optional.ofNullable(settings.find(RATE_LIMIT, null))
79+
.map(Integer::parseInt)
80+
.orElse(DEFAULT_RATE_LIMIT);
81+
}
82+
}
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
package io.scalecube.benchmarks.kafka.reactor;
2+
3+
import static io.scalecube.benchmarks.kafka.ConfigConstants.SERVERS;
4+
5+
import io.scalecube.benchmarks.BenchmarkSettings;
6+
import io.scalecube.benchmarks.BenchmarkState;
7+
import io.scalecube.benchmarks.kafka.Message;
8+
import io.scalecube.benchmarks.kafka.MessageDeserializer;
9+
import io.scalecube.benchmarks.kafka.MessageSerializer;
10+
import java.util.Collections;
11+
import java.util.HashMap;
12+
import java.util.Map;
13+
import org.apache.kafka.clients.consumer.ConsumerConfig;
14+
import org.apache.kafka.clients.producer.ProducerConfig;
15+
import org.apache.kafka.common.serialization.IntegerDeserializer;
16+
import org.apache.kafka.common.serialization.IntegerSerializer;
17+
import reactor.kafka.receiver.KafkaReceiver;
18+
import reactor.kafka.receiver.ReceiverOptions;
19+
import reactor.kafka.sender.KafkaSender;
20+
import reactor.kafka.sender.SenderOptions;
21+
22+
final class ReactorKafkaState extends BenchmarkState<ReactorKafkaState> {
23+
24+
private static final String TOPIC_RECEIVE = "benchmark-topic-b";
25+
26+
ReactorKafkaState(BenchmarkSettings settings) {
27+
super(settings);
28+
}
29+
30+
KafkaSender<Integer, Message> kafkaSender(int maxInFlight) {
31+
return KafkaSender.create(
32+
SenderOptions.<Integer, Message>create(producerProps()).maxInFlight(maxInFlight));
33+
}
34+
35+
KafkaReceiver<Integer, Message> kafkaReceiver() {
36+
return KafkaReceiver.create(
37+
ReceiverOptions.<Integer, Message>create(consumerProps())
38+
.subscription(Collections.singleton(TOPIC_RECEIVE)));
39+
}
40+
41+
private Map<String, Object> producerProps() {
42+
Map<String, Object> producerProps = new HashMap<>();
43+
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, SERVERS);
44+
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
45+
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, MessageSerializer.class);
46+
return producerProps;
47+
}
48+
49+
private Map<String, Object> consumerProps() {
50+
Map<String, Object> consumerProps = new HashMap<>();
51+
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, SERVERS);
52+
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "benchmark-consumer");
53+
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
54+
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, MessageDeserializer.class);
55+
return consumerProps;
56+
}
57+
}

kafka-benchmarks/src/main/java/io/scalecube/benchmarks/kafka/reactor/ReactorKafkaReceiver.java renamed to kafka-benchmarks/src/main/java/io/scalecube/benchmarks/kafka/reactor/example/ReactorKafkaReceiver.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package io.scalecube.benchmarks.kafka.reactor;
1+
package io.scalecube.benchmarks.kafka.reactor.example;
22

33
import static io.scalecube.benchmarks.kafka.ConfigConstants.SERVERS;
44
import static io.scalecube.benchmarks.kafka.ConfigConstants.TOPIC;

kafka-benchmarks/src/main/java/io/scalecube/benchmarks/kafka/reactor/ReactorKafkaSender.java renamed to kafka-benchmarks/src/main/java/io/scalecube/benchmarks/kafka/reactor/example/ReactorKafkaSender.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package io.scalecube.benchmarks.kafka.reactor;
1+
package io.scalecube.benchmarks.kafka.reactor.example;
22

33
import static io.scalecube.benchmarks.kafka.ConfigConstants.SERVERS;
44
import static io.scalecube.benchmarks.kafka.ConfigConstants.TOPIC;

results/Reactor-Kafka.md

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
### Infinite Stream
2+
#### Environment: Intel Core i5-7200, 16 GB RAM
3+
```
4+
i.s.b.k.r.InfiniteStreamBenchmark-reactor-kafka-timer
5+
count = 28160250
6+
mean rate = 78156.13 calls/second
7+
1-minute rate = 89756.70 calls/second
8+
5-minute rate = 58289.34 calls/second
9+
15-minute rate = 26413.87 calls/second
10+
min = 4.00 milliseconds
11+
max = 32.00 milliseconds
12+
mean = 9.17 milliseconds
13+
stddev = 2.52 milliseconds
14+
median = 9.00 milliseconds
15+
75% <= 10.00 milliseconds
16+
95% <= 13.00 milliseconds
17+
98% <= 15.00 milliseconds
18+
99% <= 16.00 milliseconds
19+
99.9% <= 32.00 milliseconds
20+
```

0 commit comments

Comments
 (0)