Skip to content

Commit

Permalink
Merge pull request kroxylicious#1468 from SamBarker/timeoutHandling
Browse files Browse the repository at this point in the history
Using assertThat(future) provides cleaner error handling and better us…
  • Loading branch information
SamBarker authored Aug 26, 2024
2 parents 93089cf + 4624b7c commit 52c4c90
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,7 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.CreateTopicsResult;
Expand All @@ -35,56 +33,45 @@
public abstract class BaseIT {

protected CreateTopicsResult createTopics(Admin admin, NewTopic... topics) {
try {
List<NewTopic> topicsList = List.of(topics);
var created = admin.createTopics(topicsList);
assertThat(created.values()).hasSizeGreaterThanOrEqualTo(topicsList.size());
created.all().get(10, TimeUnit.SECONDS);
return created;
}
catch (ExecutionException e) {
throw new RuntimeException(e.getCause());
}
catch (InterruptedException | TimeoutException e) {
throw new RuntimeException(e);
}
List<NewTopic> topicsList = List.of(topics);
var created = admin.createTopics(topicsList);
assertThat(created.values()).hasSizeGreaterThanOrEqualTo(topicsList.size());
assertThat(created.all()).as("The future(s) creating topic(s) did not complete within the timeout.").succeedsWithin(10, TimeUnit.SECONDS);
return created;
}

protected CreateTopicsResult createTopic(Admin admin, String topic, int numPartitions) {
return createTopics(admin, new NewTopic(topic, numPartitions, (short) 1));
}

protected DeleteTopicsResult deleteTopics(Admin admin, TopicCollection topics) {
try {
var deleted = admin.deleteTopics(topics);
deleted.all().get(10, TimeUnit.SECONDS);
return deleted;
}
catch (ExecutionException e) {
throw new RuntimeException(e.getCause());
}
catch (InterruptedException | TimeoutException e) {
throw new RuntimeException(e);
}
var deleted = admin.deleteTopics(topics);
assertThat(deleted.all()).as("The future(s) deleting topic(s) did not complete within the timeout.").succeedsWithin(10, TimeUnit.SECONDS);
return deleted;
}

protected Map<String, Object> buildClientConfig(Map<String, Object>... configs) {
@SafeVarargs
protected final Map<String, Object> buildClientConfig(Map<String, Object>... configs) {
Map<String, Object> clientConfig = new HashMap<>();
for (var config : configs) {
clientConfig.putAll(config);
}
return clientConfig;
}

protected Consumer<String, String> getConsumerWithConfig(KroxyliciousTester tester, Optional<String> virtualCluster, Map<String, Object>... configs) {
@SuppressWarnings("OptionalUsedAsFieldOrParameterType")
@SafeVarargs
protected final Consumer<String, String> getConsumerWithConfig(KroxyliciousTester tester, Optional<String> virtualCluster, Map<String, Object>... configs) {
var consumerConfig = buildClientConfig(configs);
if (virtualCluster.isPresent()) {
return tester.consumer(virtualCluster.get(), consumerConfig);
}
return tester.consumer(consumerConfig);
}

protected Producer<String, String> getProducerWithConfig(KroxyliciousTester tester, Optional<String> virtualCluster, Map<String, Object>... configs) {
@SuppressWarnings("OptionalUsedAsFieldOrParameterType")
@SafeVarargs
protected final Producer<String, String> getProducerWithConfig(KroxyliciousTester tester, Optional<String> virtualCluster, Map<String, Object>... configs) {
var producerConfig = buildClientConfig(configs);
if (virtualCluster.isPresent()) {
return tester.producer(virtualCluster.get(), producerConfig);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -234,12 +234,12 @@ void upstreamRequiresTlsClientAuth_TrustStore() throws Exception {
try (var tester = kroxyliciousTester(builder); var admin = tester.admin("demo")) {
// do some work to ensure connection is opened
final var result = admin.describeCluster().clusterId();
assertThat(result).succeedsWithin(Duration.ofSeconds(10));
assertThat(result).as("Unable to get the clusterId from the Kafka cluster").succeedsWithin(Duration.ofSeconds(10));
}
}
}

private void assertSuccessfulDirectClientAuthConnectionWithClientCert(KafkaCluster cluster) throws Exception {
private void assertSuccessfulDirectClientAuthConnectionWithClientCert(KafkaCluster cluster) {
try (var admin = CloseableAdmin.create(cluster.getKafkaClientConfiguration())) {
// Any operation to test successful connection to cluster
var result = admin.describeCluster().clusterId();
Expand Down

0 comments on commit 52c4c90

Please sign in to comment.