Skip to content

Commit bc8a215

Browse files
authored
Merge pull request #15 from scalecube/add-kafka-benchmarks
Add kafka-benchmarks
2 parents 986e264 + 1c8816a commit bc8a215

17 files changed

+570
-1
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
# performance-storage
1+
# Benchmarks of third-party tools

kafka-benchmarks/pom.xml

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<project xmlns="http://maven.apache.org/POM/4.0.0"
3+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
4+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
5+
<parent>
6+
<artifactId>scalecube-third-party-benchmarks-parent</artifactId>
7+
<groupId>io.scalecube</groupId>
8+
<version>2.0.1-SNAPSHOT</version>
9+
</parent>
10+
<modelVersion>4.0.0</modelVersion>
11+
12+
<artifactId>kafka-benchmarks</artifactId>
13+
14+
<properties>
15+
<kafka.version>2.1.0</kafka.version>
16+
<reactor-kafka.version>1.1.0.RELEASE</reactor-kafka.version>
17+
</properties>
18+
19+
<dependencies>
20+
<dependency>
21+
<groupId>org.apache.kafka</groupId>
22+
<artifactId>kafka-streams</artifactId>
23+
<version>${kafka.version}</version>
24+
</dependency>
25+
<dependency>
26+
<groupId>io.projectreactor.kafka</groupId>
27+
<artifactId>reactor-kafka</artifactId>
28+
<version>${reactor-kafka.version}</version>
29+
</dependency>
30+
</dependencies>
31+
32+
</project>
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
package io.scalecube.benchmarks.kafka;
2+
3+
public final class ConfigConstants {
4+
5+
public static final String SERVERS = "127.0.0.1:9092";
6+
public static final String TOPIC = "benchmark-topic";
7+
}
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
package io.scalecube.benchmarks.kafka;
2+
3+
public final class Message {
4+
5+
private long sendTime;
6+
7+
public Message() {}
8+
9+
public Message(long sendTime) {
10+
this.sendTime = sendTime;
11+
}
12+
13+
public long sendTime() {
14+
return sendTime;
15+
}
16+
17+
public Message sendTime(long sendTime) {
18+
this.sendTime = sendTime;
19+
return this;
20+
}
21+
}
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
package io.scalecube.benchmarks.kafka;
2+
3+
import java.util.Map;
4+
import org.apache.kafka.common.serialization.Deserializer;
5+
6+
public class MessageDeserializer implements Deserializer<Message> {
7+
8+
@Override
9+
public void configure(Map<String, ?> configs, boolean isKey) {}
10+
11+
@Override
12+
public Message deserialize(String topic, byte[] data) {
13+
long sendTime =
14+
((long) data[7] << 56)
15+
| ((long) data[6] & 0xff) << 48
16+
| ((long) data[5] & 0xff) << 40
17+
| ((long) data[4] & 0xff) << 32
18+
| ((long) data[3] & 0xff) << 24
19+
| ((long) data[2] & 0xff) << 16
20+
| ((long) data[1] & 0xff) << 8
21+
| ((long) data[0] & 0xff);
22+
return new Message(sendTime);
23+
}
24+
25+
@Override
26+
public void close() {}
27+
}
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
package io.scalecube.benchmarks.kafka;
2+
3+
import java.util.Map;
4+
import org.apache.kafka.common.serialization.Serializer;
5+
6+
public class MessageSerializer implements Serializer<Message> {
7+
8+
@Override
9+
public void configure(Map<String, ?> configs, boolean isKey) {}
10+
11+
@Override
12+
public byte[] serialize(String topic, Message data) {
13+
long sendTime = data.sendTime();
14+
return new byte[] {
15+
(byte) sendTime,
16+
(byte) (sendTime >> 8),
17+
(byte) (sendTime >> 16),
18+
(byte) (sendTime >> 24),
19+
(byte) (sendTime >> 32),
20+
(byte) (sendTime >> 40),
21+
(byte) (sendTime >> 48),
22+
(byte) (sendTime >> 56)
23+
};
24+
}
25+
26+
@Override
27+
public void close() {}
28+
}
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
package io.scalecube.benchmarks.kafka.client;
2+
3+
import io.scalecube.benchmarks.BenchmarkSettings;
4+
import io.scalecube.benchmarks.BenchmarkState;
5+
import java.util.Properties;
6+
import org.apache.kafka.clients.producer.KafkaProducer;
7+
import org.apache.kafka.clients.producer.ProducerConfig;
8+
import org.apache.kafka.common.serialization.ByteArraySerializer;
9+
10+
final class KafkaState extends BenchmarkState<KafkaState> {
11+
12+
static final String TOPIC = "benchmark-topic";
13+
14+
private static final String SERVERS = "127.0.0.1:9092";
15+
16+
private final KafkaProducer<byte[], byte[]> producer;
17+
18+
KafkaState(BenchmarkSettings settings) {
19+
super(settings);
20+
21+
producer = new KafkaProducer<>(producerProps());
22+
}
23+
24+
KafkaProducer<byte[], byte[]> producer() {
25+
return producer;
26+
}
27+
28+
private static Properties producerProps() {
29+
final Properties props = new Properties();
30+
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, SERVERS);
31+
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
32+
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
33+
return props;
34+
}
35+
}
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
package io.scalecube.benchmarks.kafka.client;
2+
3+
final class ProducerBenchmark {
4+
5+
public static void main(String[] args) {
6+
ProducerSendScenario.runWith(args, KafkaState::new);
7+
}
8+
}
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
package io.scalecube.benchmarks.kafka.client;
2+
3+
import io.scalecube.benchmarks.BenchmarkSettings;
4+
import io.scalecube.benchmarks.metrics.BenchmarkTimer;
5+
import io.scalecube.benchmarks.metrics.BenchmarkTimer.Context;
6+
import java.time.Duration;
7+
import java.util.function.Function;
8+
import org.apache.kafka.clients.producer.ProducerRecord;
9+
import reactor.core.publisher.Mono;
10+
11+
final class ProducerSendScenario {
12+
13+
static void runWith(String[] args, Function<BenchmarkSettings, KafkaState> stateSupplier) {
14+
BenchmarkSettings settings =
15+
BenchmarkSettings.from(args)
16+
.numberThreads(1)
17+
.warmUpDuration(Duration.ofSeconds(1))
18+
.executionTaskDuration(Duration.ofMinutes(3))
19+
.build();
20+
21+
stateSupplier
22+
.apply(settings)
23+
.runForAsync(
24+
state -> {
25+
BenchmarkTimer timer = state.timer("write");
26+
27+
return iteration -> {
28+
byte[] value = "Hello".getBytes();
29+
30+
ProducerRecord<byte[], byte[]> record =
31+
new ProducerRecord<>(KafkaState.TOPIC, value);
32+
33+
Context time = timer.time();
34+
35+
state.producer().send(record, (metadata, exception) -> time.stop());
36+
37+
return Mono.empty();
38+
};
39+
});
40+
}
41+
}
Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
package io.scalecube.benchmarks.kafka.reactor;
2+
3+
import static io.scalecube.benchmarks.kafka.ConfigConstants.SERVERS;
4+
5+
import io.scalecube.benchmarks.kafka.Message;
6+
import io.scalecube.benchmarks.kafka.MessageDeserializer;
7+
import io.scalecube.benchmarks.kafka.MessageSerializer;
8+
import java.util.Collections;
9+
import java.util.HashMap;
10+
import java.util.Map;
11+
import org.apache.kafka.clients.consumer.ConsumerConfig;
12+
import org.apache.kafka.clients.producer.ProducerConfig;
13+
import org.apache.kafka.clients.producer.ProducerRecord;
14+
import org.apache.kafka.common.serialization.IntegerDeserializer;
15+
import org.apache.kafka.common.serialization.IntegerSerializer;
16+
import org.slf4j.Logger;
17+
import org.slf4j.LoggerFactory;
18+
import reactor.core.publisher.Flux;
19+
import reactor.core.publisher.Mono;
20+
import reactor.kafka.receiver.KafkaReceiver;
21+
import reactor.kafka.receiver.ReceiverOptions;
22+
import reactor.kafka.sender.KafkaSender;
23+
import reactor.kafka.sender.SenderOptions;
24+
import reactor.kafka.sender.SenderRecord;
25+
26+
public class ExampleService {
27+
28+
private static final Logger LOGGER = LoggerFactory.getLogger(ExampleService.class);
29+
30+
private static final String TOPIC_RECEIVE = "benchmark-topic-a";
31+
private static final String TOPIC_SEND = "benchmark-topic-b";
32+
private static final int MAX_IN_FLIGHT = 256;
33+
34+
public static void main(String[] args) throws InterruptedException {
35+
KafkaSender<Integer, Message> kafkaSender = kafkaSender();
36+
37+
kafkaReceiver()
38+
.receive()
39+
.log(">>>")
40+
.subscribe(
41+
r -> {
42+
kafkaSender
43+
.send(records())
44+
.then()
45+
.doOnError(e -> LOGGER.error("Sending failed", e))
46+
.doOnSuccess(s -> LOGGER.info("Sending completed"))
47+
.subscribe();
48+
r.receiverOffset().acknowledge();
49+
},
50+
th -> LOGGER.error("Exception occurred", th));
51+
52+
Thread.currentThread().join();
53+
}
54+
55+
private static KafkaReceiver<Integer, Message> kafkaReceiver() {
56+
Map<String, Object> consumerProps = new HashMap<>();
57+
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, SERVERS);
58+
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "service-consumer");
59+
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
60+
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, MessageDeserializer.class);
61+
62+
ReceiverOptions<Integer, Message> receiverOptions =
63+
ReceiverOptions.<Integer, Message>create(consumerProps)
64+
.subscription(Collections.singleton(TOPIC_RECEIVE));
65+
66+
return KafkaReceiver.create(receiverOptions);
67+
}
68+
69+
private static KafkaSender<Integer, Message> kafkaSender() {
70+
Map<String, Object> producerProps = new HashMap<>();
71+
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, SERVERS);
72+
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
73+
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, MessageSerializer.class);
74+
75+
SenderOptions<Integer, Message> senderOptions =
76+
SenderOptions.<Integer, Message>create(producerProps).maxInFlight(MAX_IN_FLIGHT);
77+
78+
return KafkaSender.create(senderOptions);
79+
}
80+
81+
private static Flux<SenderRecord<Integer, Message, Integer>> records() {
82+
return Mono.fromCallable(
83+
() -> {
84+
Message message = new Message(System.currentTimeMillis());
85+
int correlationMetadata = message.hashCode();
86+
ProducerRecord<Integer, Message> record =
87+
new ProducerRecord<>(TOPIC_SEND, correlationMetadata, message);
88+
return SenderRecord.create(record, correlationMetadata);
89+
})
90+
.repeat();
91+
}
92+
}
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
package io.scalecube.benchmarks.kafka.reactor;
2+
3+
final class InfiniteStreamBenchmark {
4+
5+
public static void main(String[] args) {
6+
InfiniteStreamScenario.runWith(args, ReactorKafkaState::new);
7+
}
8+
}
Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
package io.scalecube.benchmarks.kafka.reactor;
2+
3+
import io.scalecube.benchmarks.BenchmarkSettings;
4+
import io.scalecube.benchmarks.kafka.Message;
5+
import io.scalecube.benchmarks.metrics.BenchmarkTimer;
6+
import java.time.Duration;
7+
import java.util.Optional;
8+
import java.util.concurrent.TimeUnit;
9+
import java.util.function.Function;
10+
import org.apache.kafka.clients.producer.ProducerRecord;
11+
import org.slf4j.Logger;
12+
import org.slf4j.LoggerFactory;
13+
import reactor.core.publisher.Flux;
14+
import reactor.core.publisher.Mono;
15+
import reactor.kafka.sender.SenderRecord;
16+
import reactor.util.concurrent.Queues;
17+
18+
final class InfiniteStreamScenario {
19+
20+
private static final Logger LOGGER = LoggerFactory.getLogger(InfiniteStreamScenario.class);
21+
22+
private static final int DEFAULT_RATE_LIMIT = Queues.SMALL_BUFFER_SIZE;
23+
private static final int MAX_IN_FLIGHT = 2048;
24+
private static final String RATE_LIMIT = "rateLimit";
25+
private static final String TOPIC_SEND = "benchmark-topic-a";
26+
27+
public static void runWith(
28+
String[] args, Function<BenchmarkSettings, ReactorKafkaState> stateSupplier) {
29+
30+
int numOfThreads = Runtime.getRuntime().availableProcessors();
31+
Duration rampUpDuration = Duration.ofSeconds(numOfThreads);
32+
33+
BenchmarkSettings settings =
34+
BenchmarkSettings.from(args)
35+
.injectors(1)
36+
.messageRate(1) // workaround
37+
.rampUpDuration(rampUpDuration)
38+
.durationUnit(TimeUnit.MILLISECONDS)
39+
.build();
40+
41+
int rateLimit = rateLimit(settings);
42+
43+
stateSupplier
44+
.apply(settings)
45+
.runWithRampUp(
46+
(rampUpTick, state) -> Mono.just(state.kafkaSender(MAX_IN_FLIGHT)),
47+
state -> {
48+
BenchmarkTimer timer = state.timer("reactor-kafka-timer");
49+
50+
return kafkaSender ->
51+
(executionTick, task) ->
52+
Flux.defer(
53+
() -> {
54+
SenderRecord<Integer, Message, Integer> record =
55+
SenderRecord.create(
56+
new ProducerRecord<>(TOPIC_SEND, new Message()),
57+
new Message().hashCode());
58+
return kafkaSender
59+
.send(Mono.just(record))
60+
.thenMany(
61+
state
62+
.kafkaReceiver()
63+
.receive()
64+
// .limitRate(rateLimit)
65+
.doOnNext(
66+
r ->
67+
timer.update(
68+
System.currentTimeMillis()
69+
- r.value().sendTime(),
70+
TimeUnit.MILLISECONDS))
71+
.doOnError(th -> LOGGER.error("Error occurred", th)));
72+
});
73+
},
74+
(state, kafkaSender) -> Mono.empty());
75+
}
76+
77+
private static Integer rateLimit(BenchmarkSettings settings) {
78+
return Optional.ofNullable(settings.find(RATE_LIMIT, null))
79+
.map(Integer::parseInt)
80+
.orElse(DEFAULT_RATE_LIMIT);
81+
}
82+
}

0 commit comments

Comments
 (0)