diff --git a/perf/pom.xml b/perf/pom.xml
index 3ecb99394..fc593e542 100644
--- a/perf/pom.xml
+++ b/perf/pom.xml
@@ -22,7 +22,7 @@
shawkins
https://raw.githubusercontent.com/shawkins/repo/master
- false
+ true
true
@@ -34,7 +34,7 @@
io.openmessaging.benchmark
benchmark-framework
- 0.0.1
+ 0.0.2-SNAPSHOT
ch.qos.logback
@@ -42,8 +42,8 @@
- io.openmessaging.benchmark
- driver-pulsar
+ io.openmessaging.benchmark
+ driver-pulsar
diff --git a/perf/src/main/java/org/bf2/performance/OMB.java b/perf/src/main/java/org/bf2/performance/OMB.java
index acc5efb48..6f498f95b 100644
--- a/perf/src/main/java/org/bf2/performance/OMB.java
+++ b/perf/src/main/java/org/bf2/performance/OMB.java
@@ -58,6 +58,7 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
+import java.util.stream.Stream;
/**
* Handles installation and running of OpenMessagingBenchmark
@@ -114,6 +115,19 @@ public void install(TlsConfig tlsConfig) throws IOException {
}
public void install(String base64EncodedTrustStore) throws IOException {
+ install(() ->
+ ombCluster.kubeClient().client().secrets().inNamespace(Constants.OMB_NAMESPACE).create(new SecretBuilder()
+ .editOrNewMetadata()
+ .withName("ext-listener-crt")
+ .withNamespace(Constants.OMB_NAMESPACE)
+ .endMetadata()
+ .addToData("listener.jks", base64EncodedTrustStore)
+ .build())
+ );
+
+ }
+
+ public void install(Runnable... runnables) {
LOGGER.info("Installing OMB in namespace {}", Constants.OMB_NAMESPACE);
pullAndHoldWorkerImageToAllNodesUsingDaemonSet();
@@ -124,13 +138,8 @@ public void install(String base64EncodedTrustStore) throws IOException {
nsAnnotations.put(Constants.ORG_BF2_KAFKA_PERFORMANCE_COLLECTPODLOG, "true");
}
ombCluster.createNamespace(Constants.OMB_NAMESPACE, nsAnnotations, Map.of());
- ombCluster.kubeClient().client().secrets().inNamespace(Constants.OMB_NAMESPACE).create(new SecretBuilder()
- .editOrNewMetadata()
- .withName("ext-listener-crt")
- .withNamespace(Constants.OMB_NAMESPACE)
- .endMetadata()
- .addToData("listener.jks", base64EncodedTrustStore)
- .build());
+
+ Stream.of(runnables).forEachOrdered(Runnable::run);
LOGGER.info("Done installing OMB in namespace {}", Constants.OMB_NAMESPACE);
}
@@ -266,6 +275,7 @@ private void createWorker(String jvmOpts, String name, Node node) {
.withName("ca")
.editOrNewSecret()
.withSecretName("ext-listener-crt")
+ .withOptional(true)
.endSecret()
.endVolume()
.endSpec()
diff --git a/perf/src/test/java/org/bf2/performance/ConnectionCreationRateTest.java b/perf/src/test/java/org/bf2/performance/ConnectionCreationRateTest.java
new file mode 100644
index 000000000..288e7acd3
--- /dev/null
+++ b/perf/src/test/java/org/bf2/performance/ConnectionCreationRateTest.java
@@ -0,0 +1,135 @@
+package org.bf2.performance;
+
+import io.fabric8.kubernetes.api.model.Quantity;
+import io.openmessaging.benchmark.TestResult;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.bf2.performance.framework.KubeClusterResource;
+import org.bf2.performance.framework.TestTags;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvFileSource;
+
+import java.io.File;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Testcase 1: Producer throughput with a single small Kafka cluster (K2)
+ */
+@Tag(TestTags.PERF)
+public class ConnectionCreationRateTest extends TestBase {
+ private static final Logger LOGGER = LogManager.getLogger(ConnectionCreationRateTest.class);
+
+ static ManagedKafkaProvisioner kafkaProvisioner;
+ static KubeClusterResource kafkaCluster;
+ static OMB omb;
+
+ List workers;
+
+ @BeforeAll
+ void beforeAll() throws Exception {
+ omb = new OMB(KubeClusterResource.connectToKubeCluster(PerformanceEnvironment.OMB_KUBECONFIG));
+ }
+
+ @AfterAll
+ void afterAll() throws Exception {
+ omb.uninstall();
+ }
+
+ @AfterEach
+ void afterEach() throws Exception {
+ omb.deleteWorkers();
+ }
+
+ @ParameterizedTest(name = "testConnectionCreationRate: [{index}] {0}, {1}, {2}")
+ @CsvFileSource(resources = "/test-inputs/ConnectionCreationRateTestInput.csv", useHeadersInDisplayName = true)
+ void testConnectionCreationRate(int numProducers, int numConsumers, int replicationFactor, int numWorkers, String ombWorkerMem, String ombWorkerCpu, String bootstrapURL, String clientId, String clientSecret, String tokenEndpointURL, String trustStoreFileName, String trustStorePassword, TestInfo info) throws Exception {
+ int messageSize = 1024;
+ int targetRate = 100;
+ String instanceName = bootstrapURL.split("\\.", 2)[0];
+
+ omb.install(Base64.getEncoder().encodeToString(Files.readAllBytes(Paths.get(trustStoreFileName))));
+ omb.setWorkerContainerMemory(Quantity.parse(ombWorkerMem));
+ omb.setWorkerCpu(Quantity.parse(ombWorkerCpu));
+ workers = omb.deployWorkers(numWorkers);
+
+ ExecutorService executorService = Executors.newFixedThreadPool(1);
+ AtomicInteger timeout = new AtomicInteger();
+ List testResults = new ArrayList<>();
+
+ try {
+ File ombDir = new File(instanceDir, instanceName);
+ Files.createDirectories(ombDir.toPath());
+
+ // OAuthBearer connection details
+ String driverCommonConfig = new StringBuilder("sasl.mechanism=OAUTHBEARER")
+ .append("\n")
+ .append("security.protocol=SASL_SSL")
+ .append("\n")
+ .append("sasl.oauthbearer.token.endpoint.url=")
+ .append(tokenEndpointURL)
+ .append("\n")
+ .append("ssl.truststore.location=/cert/listener.jks\n")
+ .append("ssl.truststore.password=password\n")
+ .append("sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required scope=\"openid\" clientId=\"")
+ .append(clientId)
+ .append("\" ")
+ .append("clientSecret=\"")
+ .append(clientSecret)
+ .append("\" ;")
+ .append("\n")
+ .append("sasl.login.callback.handler.class=org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler")
+ .append("\n")
+ .append("bootstrap.servers=")
+ .append(bootstrapURL)
+ .append("\n")
+ .toString();
+
+ OMBDriver driver = new OMBDriver()
+ .setReplicationFactor(replicationFactor)
+ .setTopicConfig("")
+ .setCommonConfig(driverCommonConfig)
+ .setProducerConfig("acks=all\n")
+ .setConsumerConfig("auto.offset.reset=earliest\nenable.auto.commit=false\n");
+
+ OMBWorkload workload = new OMBWorkload()
+ .setName(String.format("Kafka Cluster: %s", instanceName))
+ .setTopics(1)
+ .setPartitionsPerTopic(1)
+ .setWarmupDurationMinutes(0)
+ .setTestDurationMinutes(1)
+ .setMessageSize(messageSize)
+ .setPayloadFile("src/test/resources/payload/payload-1Kb.data")
+ .setSubscriptionsPerTopic(numConsumers)
+ .setConsumerPerSubscription(1)
+ .setProducersPerTopic(numProducers)
+ .setProducerRate(targetRate)
+ .setConsumerBacklogSizeGB(0);
+ timeout.set(Math.max(workload.getTestDurationMinutes() + workload.getWarmupDurationMinutes(), timeout.get()));
+
+ Future resultFuture = executorService.submit(() -> {
+ OMBWorkloadResult result = omb.runWorkload(ombDir, driver, workers, workload);
+ LOGGER.info("Result stored in {}", result.getResultFile().getAbsolutePath());
+ return result;
+ });
+ testResults.add(resultFuture.get(timeout.get() * 5L, TimeUnit.MINUTES).getTestResult());
+ } finally {
+ executorService.shutdown();
+ executorService.awaitTermination(1, TimeUnit.MINUTES);
+ }
+
+ }
+}