Skip to content

Commit ce99204

Browse files
Add benchmark for send operation using kafka-clients
1 parent 986e264 commit ce99204

File tree

6 files changed

+118
-1
lines changed

6 files changed

+118
-1
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
# performance-storage
1+
# Benchmarks of third-party tools

kafka-benchmarks/pom.xml

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<project xmlns="http://maven.apache.org/POM/4.0.0"
3+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
4+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
5+
<parent>
6+
<artifactId>scalecube-third-party-benchmarks-parent</artifactId>
7+
<groupId>io.scalecube</groupId>
8+
<version>2.0.1-SNAPSHOT</version>
9+
</parent>
10+
<modelVersion>4.0.0</modelVersion>
11+
12+
<artifactId>kafka-benchmarks</artifactId>
13+
14+
<properties>
15+
<kafka.version>2.1.0</kafka.version>
16+
<reactor-kafka.version>1.1.0.RELEASE</reactor-kafka.version>
17+
</properties>
18+
19+
<dependencies>
20+
<dependency>
21+
<groupId>org.apache.kafka</groupId>
22+
<artifactId>kafka-streams</artifactId>
23+
<version>${kafka.version}</version>
24+
</dependency>
25+
<dependency>
26+
<groupId>io.projectreactor.kafka</groupId>
27+
<artifactId>reactor-kafka</artifactId>
28+
<version>${reactor-kafka.version}</version>
29+
</dependency>
30+
</dependencies>
31+
32+
</project>
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
package io.scalecube.benchmarks.kafka.client;
2+
3+
import io.scalecube.benchmarks.BenchmarkSettings;
4+
import io.scalecube.benchmarks.BenchmarkState;
5+
import java.util.Properties;
6+
import org.apache.kafka.clients.producer.KafkaProducer;
7+
import org.apache.kafka.clients.producer.ProducerConfig;
8+
import org.apache.kafka.common.serialization.ByteArraySerializer;
9+
10+
final class KafkaState extends BenchmarkState<KafkaState> {
11+
12+
static final String TOPIC = "benchmark-topic";
13+
14+
private static final String SERVERS = "127.0.0.1:9092";
15+
16+
private final KafkaProducer<byte[], byte[]> producer;
17+
18+
KafkaState(BenchmarkSettings settings) {
19+
super(settings);
20+
21+
producer = new KafkaProducer<>(producerProps());
22+
}
23+
24+
KafkaProducer<byte[], byte[]> producer() {
25+
return producer;
26+
}
27+
28+
private static Properties producerProps() {
29+
final Properties props = new Properties();
30+
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, SERVERS);
31+
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
32+
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
33+
return props;
34+
}
35+
}
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
package io.scalecube.benchmarks.kafka.client;
2+
3+
final class ProducerBenchmark {
4+
5+
public static void main(String[] args) {
6+
ProducerSendScenario.runWith(args, KafkaState::new);
7+
}
8+
}
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
package io.scalecube.benchmarks.kafka.client;
2+
3+
import io.scalecube.benchmarks.BenchmarkSettings;
4+
import io.scalecube.benchmarks.metrics.BenchmarkTimer;
5+
import io.scalecube.benchmarks.metrics.BenchmarkTimer.Context;
6+
import java.time.Duration;
7+
import java.util.function.Function;
8+
import org.apache.kafka.clients.producer.ProducerRecord;
9+
import reactor.core.publisher.Mono;
10+
11+
final class ProducerSendScenario {
12+
13+
static void runWith(String[] args, Function<BenchmarkSettings, KafkaState> stateSupplier) {
14+
BenchmarkSettings settings =
15+
BenchmarkSettings.from(args)
16+
.numberThreads(1)
17+
.warmUpDuration(Duration.ofSeconds(1))
18+
.executionTaskDuration(Duration.ofMinutes(3))
19+
.build();
20+
21+
stateSupplier
22+
.apply(settings)
23+
.runForAsync(
24+
state -> {
25+
BenchmarkTimer timer = state.timer("write");
26+
27+
return iteration -> {
28+
byte[] value = "Hello".getBytes();
29+
30+
ProducerRecord<byte[], byte[]> record =
31+
new ProducerRecord<>(KafkaState.TOPIC, value);
32+
33+
Context time = timer.time();
34+
35+
state.producer().send(record, (metadata, exception) -> time.stop());
36+
37+
return Mono.empty();
38+
};
39+
});
40+
}
41+
}

pom.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535

3636
<modules>
3737
<module>chronicle-map-benchmarks</module>
38+
<module>kafka-benchmarks</module>
3839
<module>lmdb-benchmarks</module>
3940
<module>rocksdb-benchmarks</module>
4041
<module>storage-benchmarks-common</module>

0 commit comments

Comments
 (0)