Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KafkaReadinessCheckTest #1784

Merged
merged 2 commits into from
Nov 21, 2023
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
KafkaReadinessCheckTest
piotrrzysko committed Nov 20, 2023
commit ba9c6b45f749357021bca22da88ed44628b1e6ff

This file was deleted.

Original file line number Diff line number Diff line change
@@ -4,28 +4,57 @@
import org.springframework.test.web.reactive.server.WebTestClient;
import reactor.core.publisher.Mono;

class FrontendTestClient {
import java.time.Duration;
import java.util.concurrent.atomic.AtomicReference;

import static org.awaitility.Awaitility.waitAtMost;

public class FrontendTestClient {

private static final String TOPIC_PATH = "/topics/{topicName}";
private static final String STATUS_HEALTH_PATH = "/status/health";
private static final String STATUS_READY_PATH = "/status/ready";

private final WebTestClient webTestClient;
private final String frontendContainerUrl;

public FrontendTestClient(String frontendContainerUrl) {
this.frontendContainerUrl = frontendContainerUrl;
public FrontendTestClient(int frontendPort) {
this.frontendContainerUrl = "http://localhost:" + frontendPort;
this.webTestClient = WebTestClient
.bindToServer()
.baseUrl(frontendContainerUrl)
.build();
}

public WebTestClient.ResponseSpec publishUntilSuccess(String topicQualifiedName, String body) {
AtomicReference<WebTestClient.ResponseSpec> response = new AtomicReference<>();
waitAtMost(Duration.ofSeconds(10))
.untilAsserted(() -> response.set(publish(topicQualifiedName, body).expectStatus().isCreated()));
return response.get();
}

public WebTestClient.ResponseSpec publish(String topicQualifiedName, String body) {
private WebTestClient.ResponseSpec publish(String topicQualifiedName, String body) {
return webTestClient.post().uri(UriBuilder
.fromUri(frontendContainerUrl)
.path(TOPIC_PATH)
.build(topicQualifiedName))
.body(Mono.just(body), String.class)
.exchange();
}

public WebTestClient.ResponseSpec getStatusHealth() {
return webTestClient.get().uri(UriBuilder
.fromUri(frontendContainerUrl)
.path(STATUS_HEALTH_PATH)
.build())
.exchange();
}

public WebTestClient.ResponseSpec getStatusReady() {
return webTestClient.get().uri(UriBuilder
.fromUri(frontendContainerUrl)
.path(STATUS_READY_PATH)
.build())
.exchange();
}
}
Original file line number Diff line number Diff line change
@@ -18,9 +18,9 @@ public class HermesTestClient {
private final ManagementTestClient managementTestClient;
private final FrontendTestClient frontendTestClient;

public HermesTestClient(String managementUrl, String frontendUrl) {
managementTestClient = new ManagementTestClient(managementUrl);
frontendTestClient = new FrontendTestClient(frontendUrl);
public HermesTestClient(int managementPort, int frontendPort) {
managementTestClient = new ManagementTestClient(managementPort);
frontendTestClient = new FrontendTestClient(frontendPort);
}

// GROUP
@@ -60,7 +60,7 @@ public WebTestClient.ResponseSpec getSubscriptionResponse(String topicQualifiedN

// PUBLISH
public WebTestClient.ResponseSpec publishUntilSuccess(String topicQualifiedName, String body) {
return waitUntilPublished(topicQualifiedName, body);
return frontendTestClient.publishUntilSuccess(topicQualifiedName, body);
}

public void updateSubscription(Topic topic, String subscription, PatchData patch) {
@@ -80,11 +80,4 @@ private void waitUntilSubscriptionCreated(String topicQualifiedName, String subs
.expectStatus()
.is2xxSuccessful());
}

private WebTestClient.ResponseSpec waitUntilPublished(String topicQualifiedName, String body) {
PublisherCallable publisherCallable = new PublisherCallable(frontendTestClient, topicQualifiedName, body);
waitAtMost(Duration.TEN_SECONDS)
.until(() -> publisherCallable.call().expectStatus().isCreated());
return publisherCallable.getResponse();
}
}
Original file line number Diff line number Diff line change
@@ -31,8 +31,8 @@ public class ManagementTestClient {

private final ObjectMapper objectMapper;

public ManagementTestClient(String managementContainerUrl) {
this.managementContainerUrl = managementContainerUrl;
public ManagementTestClient(int managementPort) {
this.managementContainerUrl = "http://localhost:" + managementPort;
this.webTestClient = WebTestClient
.bindToServer()
.baseUrl(managementContainerUrl)

This file was deleted.

Original file line number Diff line number Diff line change
@@ -8,31 +8,37 @@

import java.time.Duration;

class HermesConsumersTestApp implements HermesTestApp {
public class HermesConsumersTestApp implements HermesTestApp {

private final ZookeeperContainer hermesZookeeper;
private final KafkaContainerCluster kafka;
private final SpringApplicationBuilder app = new SpringApplicationBuilder(HermesConsumers.class)
.web(WebApplicationType.NONE);

HermesConsumersTestApp(ZookeeperContainer hermesZookeeper, KafkaContainerCluster kafka) {
public HermesConsumersTestApp(ZookeeperContainer hermesZookeeper, KafkaContainerCluster kafka) {
this.hermesZookeeper = hermesZookeeper;
this.kafka = kafka;
}

@Override
public void start() {
public HermesTestApp start() {
app.run(
"--consumer.healthCheckPort=0",
"--consumer.kafka.clusters.[0].brokerList=" + kafka.getBootstrapServersForExternalClients(),
"--consumer.zookeeper.clusters.[0].connectionString=" + hermesZookeeper.getConnectionString(),
"--consumer.backgroundSupervisor.interval=" + Duration.ofMillis(100),
"--consumer.workload.rebalanceInterval=" + Duration.ofSeconds(1)
);
return this;
}

@Override
public void stop() {
app.context().close();
}

@Override
public int getPort() {
throw new UnsupportedOperationException("Not implemented yet");
}
}
Original file line number Diff line number Diff line change
@@ -29,8 +29,8 @@ public void beforeAll(ExtensionContext context) {
Stream.of(consumers, frontend).parallel().forEach(HermesTestApp::start);
started = true;
}
hermesTestClient = new HermesTestClient(getManagementUrl(), getFrontendUrl());
hermesInitHelper = new HermesInitHelper(getManagementUrl());
hermesTestClient = new HermesTestClient(management.getPort(), frontend.getPort());
hermesInitHelper = new HermesInitHelper(management.getPort());
}

@Override
@@ -44,14 +44,6 @@ public HermesTestClient api() {
return hermesTestClient;
}

private String getManagementUrl() {
return "http://localhost:" + management.getPort();
}

private String getFrontendUrl() {
return "http://localhost:" + frontend.getPort();
}

public HermesInitHelper initHelper() {
return hermesInitHelper;
}
Original file line number Diff line number Diff line change
@@ -7,38 +7,71 @@
import pl.allegro.tech.hermes.test.helper.containers.KafkaContainerCluster;
import pl.allegro.tech.hermes.test.helper.containers.ZookeeperContainer;

class HermesFrontendTestApp implements HermesTestApp {
import java.time.Duration;

public class HermesFrontendTestApp implements HermesTestApp {

private final ZookeeperContainer hermesZookeeper;
private final KafkaContainerCluster kafka;
private int port = -1;
private final SpringApplicationBuilder app = new SpringApplicationBuilder(HermesFrontend.class)
.web(WebApplicationType.NONE);

HermesFrontendTestApp(ZookeeperContainer hermesZookeeper, KafkaContainerCluster kafka) {
private int port = -1;
private boolean kafkaCheckEnabled = false;
private Duration metadataMaxAge = Duration.ofMinutes(5);
private Duration readinessCheckInterval = Duration.ofSeconds(1);

public HermesFrontendTestApp(ZookeeperContainer hermesZookeeper, KafkaContainerCluster kafka) {
this.hermesZookeeper = hermesZookeeper;
this.kafka = kafka;
}

@Override
public void start() {
public HermesTestApp start() {
app.run(
"--frontend.server.port=0",
"--frontend.kafka.namespace=itTest",
"--frontend.kafka.clusters.[0].brokerList=" + kafka.getBootstrapServersForExternalClients(),
"--frontend.zookeeper.clusters.[0].connectionString=" + hermesZookeeper.getConnectionString()
"--frontend.zookeeper.clusters.[0].connectionString=" + hermesZookeeper.getConnectionString(),
"--frontend.readiness.check.kafkaCheckEnabled=" + kafkaCheckEnabled,
"--frontend.readiness.check.enabled=true",
"--frontend.kafka.producer.metadataMaxAge=" + metadataMaxAge,
"--frontend.readiness.check.interval=" + readinessCheckInterval
);
port = app.context().getBean(HermesServer.class).getPort();
return this;
}

@Override
public void stop() {
app.context().close();
}

int getPort() {
@Override
public int getPort() {
if (port == -1) {
throw new IllegalStateException("hermes-frontend port hasn't been initialized");
}
return port;
}

public HermesFrontendTestApp metadataMaxAgeInSeconds(int value) {
metadataMaxAge = Duration.ofSeconds(value);
return this;
}

public HermesFrontendTestApp readinessCheckIntervalInSeconds(int value) {
readinessCheckInterval = Duration.ofSeconds(value);
return this;
}

public HermesFrontendTestApp kafkaCheckEnabled() {
kafkaCheckEnabled = true;
return this;
}

public HermesFrontendTestApp kafkaCheckDisabled() {
kafkaCheckEnabled = false;
return this;
}
}
Original file line number Diff line number Diff line change
@@ -14,8 +14,8 @@ public class HermesInitHelper {

private final ManagementTestClient managementTestClient;

public HermesInitHelper(String managementUrl) {
managementTestClient = new ManagementTestClient(managementUrl);
public HermesInitHelper(int managementPort) {
managementTestClient = new ManagementTestClient(managementPort);
}

public Topic createTopic(Topic topic) {
Original file line number Diff line number Diff line change
@@ -17,20 +17,20 @@
import static com.jayway.awaitility.Awaitility.waitAtMost;
import static pl.allegro.tech.hermes.test.helper.endpoint.TimeoutAdjuster.adjust;

class HermesManagementTestApp implements HermesTestApp {
public class HermesManagementTestApp implements HermesTestApp {

private int port = -1;
private final ZookeeperContainer hermesZookeeper;
private final KafkaContainerCluster kafka;
private final SpringApplicationBuilder app = new SpringApplicationBuilder(HermesManagement.class);

HermesManagementTestApp(ZookeeperContainer hermesZookeeper, KafkaContainerCluster kafka) {
public HermesManagementTestApp(ZookeeperContainer hermesZookeeper, KafkaContainerCluster kafka) {
this.hermesZookeeper = hermesZookeeper;
this.kafka = kafka;
}

@Override
public void start() {
public HermesTestApp start() {
app.run(
"--server.port=0",
"--storage.clusters[0].datacenter=dc",
@@ -41,7 +41,7 @@ public void start() {
"--kafka.clusters[0].clusterName=primary",
"--kafka.clusters[0].bootstrapKafkaServer=" + kafka.getBootstrapServersForExternalClients(),
"--kafka.clusters[0].namespace=itTest",
"--topic.replicationFactor=1",
"--topic.replicationFactor=" + kafka.getAllBrokers().size(),
"--topic.uncleanLeaderElectionEnabled=false"
);
String localServerPort = app.context().getBean(Environment.class).getProperty("local.server.port");
@@ -50,6 +50,7 @@ public void start() {
}
port = Integer.parseInt(localServerPort);
waitUntilReady();
return this;
}

private void waitUntilReady() {
@@ -75,7 +76,8 @@ private void waitUntilReady() {
}
}

int getPort() {
@Override
public int getPort() {
if (port == -1) {
throw new IllegalStateException("hermes-management port hasn't been initialized");
}
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package pl.allegro.tech.hermes.integrationtests.setup;

interface HermesTestApp {
public interface HermesTestApp {

void start();
HermesTestApp start();

void stop();

int getPort();
}
Loading