diff --git a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/config/ConsumerConfiguration.java b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/config/ConsumerConfiguration.java index 5cc542cccd..27c6735bb9 100644 --- a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/config/ConsumerConfiguration.java +++ b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/config/ConsumerConfiguration.java @@ -160,7 +160,7 @@ public MessageConverterResolver defaultMessageConverterResolver(AvroToJsonMessag public ConsumerMessageSenderFactory consumerMessageSenderFactory(KafkaClustersProperties kafkaClustersProperties, MessageSenderFactory messageSenderFactory, Trackers trackers, - FutureAsyncTimeout futureAsyncTimeout, + FutureAsyncTimeout futureAsyncTimeout, UndeliveredMessageLog undeliveredMessageLog, Clock clock, InstrumentedExecutorServiceFactory instrumentedExecutorServiceFactory, ConsumerAuthorizationHandler consumerAuthorizationHandler, diff --git a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/config/ConsumerSenderConfiguration.java b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/config/ConsumerSenderConfiguration.java index 0edd6d56b3..49f6148544 100644 --- a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/config/ConsumerSenderConfiguration.java +++ b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/config/ConsumerSenderConfiguration.java @@ -261,13 +261,13 @@ public EndpointAddressResolver interpolatingEndpointAddressResolver(UriInterpola } @Bean - public FutureAsyncTimeout futureAsyncTimeoutFactory(InstrumentedExecutorServiceFactory executorFactory, + public FutureAsyncTimeout futureAsyncTimeoutFactory(InstrumentedExecutorServiceFactory executorFactory, SenderAsyncTimeoutProperties senderAsyncTimeoutProperties) { ScheduledExecutorService timeoutExecutorService = executorFactory.getScheduledExecutorService( "async-timeout", senderAsyncTimeoutProperties.getThreadPoolSize(), senderAsyncTimeoutProperties.isThreadPoolMonitoringEnabled() ); - return new FutureAsyncTimeout<>(MessageSendingResult::failedResult, timeoutExecutorService); + return new FutureAsyncTimeout(timeoutExecutorService); } } diff --git a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/ConsumerMessageSender.java b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/ConsumerMessageSender.java index eb5fb745f8..f2b64dce7d 100644 --- a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/ConsumerMessageSender.java +++ b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/ConsumerMessageSender.java @@ -19,7 +19,6 @@ import java.net.URI; import java.time.Clock; -import java.time.Duration; import java.util.List; import java.util.Objects; import java.util.concurrent.CompletableFuture; @@ -38,22 +37,22 @@ public class ConsumerMessageSender { private final ExecutorService deliveryReportingExecutor; private final List successHandlers; private final List errorHandlers; - private final SerialConsumerRateLimiter rateLimiter; private final MessageSenderFactory messageSenderFactory; private final Clock clock; private final InflightsPool inflight; - private final FutureAsyncTimeout async; - private final int asyncTimeoutMs; private final SubscriptionLoadRecorder loadRecorder; - - private int requestTimeoutMs; private final Timer consumerLatencyTimer; + private final SerialConsumerRateLimiter rateLimiter; + private final FutureAsyncTimeout async; + private final int asyncTimeoutMs; + private MessageSender messageSender; private Subscription subscription; private ScheduledExecutorService retrySingleThreadExecutor; private volatile boolean running = true; + public ConsumerMessageSender(Subscription subscription, MessageSenderFactory messageSenderFactory, List successHandlers, @@ -63,22 +62,21 @@ public ConsumerMessageSender(Subscription subscription, InflightsPool inflight, SubscriptionMetrics metrics, int asyncTimeoutMs, - FutureAsyncTimeout futureAsyncTimeout, + FutureAsyncTimeout futureAsyncTimeout, Clock clock, SubscriptionLoadRecorder loadRecorder) { this.deliveryReportingExecutor = deliveryReportingExecutor; this.successHandlers = successHandlers; this.errorHandlers = errorHandlers; - this.rateLimiter = rateLimiter; this.messageSenderFactory = messageSenderFactory; this.clock = clock; this.loadRecorder = loadRecorder; - this.messageSender = messageSenderFactory.create(subscription); - this.subscription = subscription; - this.inflight = inflight; this.async = futureAsyncTimeout; - this.requestTimeoutMs = subscription.getSerialSubscriptionPolicy().getRequestTimeout(); + this.rateLimiter = rateLimiter; this.asyncTimeoutMs = asyncTimeoutMs; + this.messageSender = messageSender(subscription); + this.subscription = subscription; + this.inflight = inflight; this.consumerLatencyTimer = metrics.subscriptionLatencyTimer(); } @@ -121,18 +119,16 @@ private int calculateMessageDelay(long publishingMessageTimestamp) { return Math.max(delay, INTEGER_ZERO); } + /** * Method is calling MessageSender and is registering listeners to handle response. * Main responsibility of this method is that no message will be fully processed or rejected without release on semaphore. */ private void sendMessage(final Message message) { - rateLimiter.acquire(); loadRecorder.recordSingleOperation(); Timer.Context timer = consumerLatencyTimer.time(); - CompletableFuture response = async.within( - messageSender.send(message), - Duration.ofMillis(asyncTimeoutMs + requestTimeoutMs) - ); + CompletableFuture response = messageSender.send(message); + response.thenAcceptAsync(new ResponseHandlingListener(message, timer), deliveryReportingExecutor) .exceptionally(e -> { logger.error( @@ -142,6 +138,21 @@ private void sendMessage(final Message message) { }); } + private MessageSender messageSender(Subscription subscription) { + Integer requestTimeoutMs = subscription.getSerialSubscriptionPolicy().getRequestTimeout(); + ResilientMessageSender resilientMessageSender = new ResilientMessageSender( + this.rateLimiter, + subscription, + this.async, + requestTimeoutMs, + this.asyncTimeoutMs + ); + + return this.messageSenderFactory.create( + subscription, resilientMessageSender + ); + } + public void updateSubscription(Subscription newSubscription) { boolean endpointUpdated = !this.subscription.getEndpoint().equals(newSubscription.getEndpoint()); boolean subscriptionPolicyUpdated = !Objects.equals( @@ -157,14 +168,13 @@ public void updateSubscription(Subscription newSubscription) { ); this.subscription = newSubscription; - this.requestTimeoutMs = newSubscription.getSerialSubscriptionPolicy().getRequestTimeout(); boolean httpClientChanged = this.subscription.isHttp2Enabled() != newSubscription.isHttp2Enabled(); if (endpointUpdated || subscriptionPolicyUpdated || endpointAddressResolverMetadataChanged || oAuthPolicyChanged || httpClientChanged) { this.messageSender.stop(); - this.messageSender = messageSenderFactory.create(newSubscription); + this.messageSender = messageSender(newSubscription); } } @@ -175,20 +185,10 @@ private boolean willExceedTtl(Message message, long delay) { } private void handleFailedSending(Message message, MessageSendingResult result) { - registerResultInRateLimiter(result); retrySending(message, result); errorHandlers.forEach(h -> h.handleFailed(message, subscription, result)); } - private void registerResultInRateLimiter(MessageSendingResult result) { - if (result.ignoreInRateCalculation(subscription.getSerialSubscriptionPolicy().isRetryClientErrors(), - subscription.hasOAuthPolicy())) { - rateLimiter.registerSuccessfulSending(); - } else { - rateLimiter.registerFailedSending(); - } - } - private void retrySending(Message message, MessageSendingResult result) { List succeededUris = result.getSucceededUris(ConsumerMessageSender.this::messageSentSucceeded); message.incrementRetryCounter(succeededUris); @@ -232,7 +232,6 @@ private void handleMessageDiscarding(Message message, MessageSendingResult resul } private void handleMessageSendingSuccess(Message message, MessageSendingResult result) { - rateLimiter.registerSuccessfulSending(); inflight.release(); successHandlers.forEach(h -> h.handleSuccess(message, subscription, result)); } diff --git a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/ConsumerMessageSenderFactory.java b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/ConsumerMessageSenderFactory.java index 5908750874..ce97ec9813 100644 --- a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/ConsumerMessageSenderFactory.java +++ b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/ConsumerMessageSenderFactory.java @@ -12,7 +12,6 @@ import pl.allegro.tech.hermes.consumers.consumer.result.ErrorHandler; import pl.allegro.tech.hermes.consumers.consumer.result.SuccessHandler; import pl.allegro.tech.hermes.consumers.consumer.sender.MessageSenderFactory; -import pl.allegro.tech.hermes.consumers.consumer.sender.MessageSendingResult; import pl.allegro.tech.hermes.consumers.consumer.sender.timeout.FutureAsyncTimeout; import pl.allegro.tech.hermes.tracker.consumers.Trackers; @@ -26,7 +25,7 @@ public class ConsumerMessageSenderFactory { private final String kafkaClusterName; private final MessageSenderFactory messageSenderFactory; private final Trackers trackers; - private final FutureAsyncTimeout futureAsyncTimeout; + private final FutureAsyncTimeout futureAsyncTimeout; private final UndeliveredMessageLog undeliveredMessageLog; private final Clock clock; private final ConsumerAuthorizationHandler consumerAuthorizationHandler; @@ -34,7 +33,7 @@ public class ConsumerMessageSenderFactory { private final int senderAsyncTimeoutMs; public ConsumerMessageSenderFactory(String kafkaClusterName, MessageSenderFactory messageSenderFactory, - Trackers trackers, FutureAsyncTimeout futureAsyncTimeout, + Trackers trackers, FutureAsyncTimeout futureAsyncTimeout, UndeliveredMessageLog undeliveredMessageLog, Clock clock, InstrumentedExecutorServiceFactory instrumentedExecutorServiceFactory, ConsumerAuthorizationHandler consumerAuthorizationHandler, diff --git a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/ResilientMessageSender.java b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/ResilientMessageSender.java new file mode 100644 index 0000000000..672a141eb7 --- /dev/null +++ b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/ResilientMessageSender.java @@ -0,0 +1,90 @@ +package pl.allegro.tech.hermes.consumers.consumer; + +import pl.allegro.tech.hermes.api.Subscription; +import pl.allegro.tech.hermes.consumers.consumer.rate.ConsumerRateLimiter; +import pl.allegro.tech.hermes.consumers.consumer.sender.MessageSendingResult; +import pl.allegro.tech.hermes.consumers.consumer.sender.timeout.FutureAsyncTimeout; + +import java.time.Duration; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.function.Predicate; + +public class ResilientMessageSender { + private final ConsumerRateLimiter rateLimiter; + private final List> ignore; + private final FutureAsyncTimeout async; + private final int requestTimeoutMs; + private final int asyncTimeoutMs; + + public ResilientMessageSender(ConsumerRateLimiter rateLimiter, + Subscription subscription, + FutureAsyncTimeout async, + int requestTimeoutMs, + int asyncTimeoutMs) { + this.rateLimiter = rateLimiter; + this.ignore = ignorableErrors(subscription); + this.async = async; + this.requestTimeoutMs = requestTimeoutMs; + this.asyncTimeoutMs = asyncTimeoutMs; + } + + private static List> ignorableErrors(Subscription subscription) { + Predicate ignore = + result -> result.ignoreInRateCalculation( + subscription.getSerialSubscriptionPolicy().isRetryClientErrors(), + subscription.hasOAuthPolicy() + ); + return Collections.singletonList(ignore); + } + + public CompletableFuture send( + Consumer> resultFutureConsumer, + Function exceptionMapper + ) { + try { + rateLimiter.acquire(); + CompletableFuture resultFuture = new CompletableFuture<>(); + resultFutureConsumer.accept(resultFuture); + CompletableFuture timeoutGuardedResultFuture = async.within( + resultFuture, + Duration.ofMillis(asyncTimeoutMs + requestTimeoutMs), + exceptionMapper); + return withCompletionHandle(timeoutGuardedResultFuture, exceptionMapper); + } catch (Exception e) { + rateLimiter.registerFailedSending(); + return CompletableFuture.completedFuture(exceptionMapper.apply(e)); + } + } + + private CompletableFuture withCompletionHandle( + CompletableFuture future, + Function exceptionMapper + ) { + return future.handle((result, throwable) -> { + if (throwable != null) { + rateLimiter.registerFailedSending(); + return exceptionMapper.apply(throwable); + } else { + if (result.succeeded()) { + rateLimiter.registerSuccessfulSending(); + } else { + registerResultInRateLimiter(result); + } + return result; + } + }); + } + + private void registerResultInRateLimiter(MessageSendingResult result) { + if (ignore.stream().anyMatch(p -> p.test(result))) { + rateLimiter.registerSuccessfulSending(); + } else { + rateLimiter.registerFailedSending(); + } + } + +} diff --git a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/sender/CompletableFutureAwareMessageSender.java b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/sender/CompletableFutureAwareMessageSender.java index bf0648437c..0e7824b81c 100644 --- a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/sender/CompletableFutureAwareMessageSender.java +++ b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/sender/CompletableFutureAwareMessageSender.java @@ -4,19 +4,9 @@ import java.util.concurrent.CompletableFuture; -public abstract class CompletableFutureAwareMessageSender implements MessageSender { +public interface CompletableFutureAwareMessageSender { - @Override - public CompletableFuture send(Message message) { - try { - CompletableFuture resultFuture = new CompletableFuture<>(); - sendMessage(message, resultFuture); - return resultFuture; - } catch (Exception e) { - return CompletableFuture.completedFuture(MessageSendingResult.failedResult(e)); - } - } - - protected abstract void sendMessage(Message message, CompletableFuture resultFuture); + void send(Message message, CompletableFuture resultFuture); + void stop(); } diff --git a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/sender/MessageSenderFactory.java b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/sender/MessageSenderFactory.java index bc21e84291..5fdc0acc9f 100644 --- a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/sender/MessageSenderFactory.java +++ b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/sender/MessageSenderFactory.java @@ -4,6 +4,7 @@ import pl.allegro.tech.hermes.api.Subscription; import pl.allegro.tech.hermes.common.exception.EndpointProtocolNotSupportedException; import pl.allegro.tech.hermes.common.exception.InternalProcessingException; +import pl.allegro.tech.hermes.consumers.consumer.ResilientMessageSender; import java.util.HashMap; import java.util.List; @@ -21,14 +22,14 @@ public MessageSenderFactory(List providers) { } } - public MessageSender create(Subscription subscription) { + public MessageSender create(Subscription subscription, ResilientMessageSender resilientMessageSender) { EndpointAddress endpoint = subscription.getEndpoint(); ProtocolMessageSenderProvider provider = protocolProviders.get(endpoint.getProtocol()); if (provider == null) { throw new EndpointProtocolNotSupportedException(endpoint); } - return provider.create(subscription); + return provider.create(subscription, resilientMessageSender); } private void addSupportedProtocol(String protocol, ProtocolMessageSenderProvider provider) { diff --git a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/sender/ProtocolMessageSenderProvider.java b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/sender/ProtocolMessageSenderProvider.java index 7d10aa5d61..6743bd8272 100644 --- a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/sender/ProtocolMessageSenderProvider.java +++ b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/sender/ProtocolMessageSenderProvider.java @@ -1,12 +1,13 @@ package pl.allegro.tech.hermes.consumers.consumer.sender; import pl.allegro.tech.hermes.api.Subscription; +import pl.allegro.tech.hermes.consumers.consumer.ResilientMessageSender; import java.util.Set; public interface ProtocolMessageSenderProvider { - MessageSender create(Subscription subscription); + MessageSender create(Subscription subscription, ResilientMessageSender resilientMessageSender); Set getSupportedProtocols(); diff --git a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/sender/SingleRecipientMessageSenderAdapter.java b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/sender/SingleRecipientMessageSenderAdapter.java new file mode 100644 index 0000000000..47b8dd0c77 --- /dev/null +++ b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/sender/SingleRecipientMessageSenderAdapter.java @@ -0,0 +1,32 @@ +package pl.allegro.tech.hermes.consumers.consumer.sender; + +import pl.allegro.tech.hermes.consumers.consumer.Message; +import pl.allegro.tech.hermes.consumers.consumer.ResilientMessageSender; + +import java.util.concurrent.CompletableFuture; +import java.util.function.Function; + +public class SingleRecipientMessageSenderAdapter implements MessageSender { + private final CompletableFutureAwareMessageSender adaptee; + private final Function exceptionMapper = MessageSendingResult::failedResult; + + private final ResilientMessageSender resilientMessageSender; + + public SingleRecipientMessageSenderAdapter(CompletableFutureAwareMessageSender adaptee, ResilientMessageSender resilientMessageSender) { + this.resilientMessageSender = resilientMessageSender; + this.adaptee = adaptee; + } + + @Override + public CompletableFuture send(Message message) { + return resilientMessageSender.send( + resultFuture -> adaptee.send(message, resultFuture), + exceptionMapper + ); + } + + @Override + public void stop() { + adaptee.stop(); + } +} diff --git a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/sender/googlepubsub/GooglePubSubMessageSender.java b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/sender/googlepubsub/GooglePubSubMessageSender.java index f21f932bf0..96054b92dc 100644 --- a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/sender/googlepubsub/GooglePubSubMessageSender.java +++ b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/sender/googlepubsub/GooglePubSubMessageSender.java @@ -10,21 +10,21 @@ import static pl.allegro.tech.hermes.consumers.consumer.sender.MessageSendingResult.failedResult; -class GooglePubSubMessageSender extends CompletableFutureAwareMessageSender { +class GooglePubSubMessageSender implements CompletableFutureAwareMessageSender { private final GooglePubSubClient googlePubSubClient; private final GooglePubSubSenderTarget resolvedTarget; private final GooglePubSubClientsPool clientsPool; GooglePubSubMessageSender(GooglePubSubSenderTarget resolvedTarget, - GooglePubSubClientsPool clientsPool) throws IOException { + GooglePubSubClientsPool clientsPool) throws IOException { this.googlePubSubClient = clientsPool.acquire(resolvedTarget); this.resolvedTarget = resolvedTarget; this.clientsPool = clientsPool; } @Override - protected void sendMessage(Message message, CompletableFuture resultFuture) { + public void send(Message message, CompletableFuture resultFuture) { try { googlePubSubClient.publish(message, resultFuture); } catch (IOException | ExecutionException | InterruptedException exception) { @@ -36,4 +36,4 @@ protected void sendMessage(Message message, CompletableFuture exceptionMapper = MessageSendingResult::failedResult; + private final ResilientMessageSender resilientMessageSender; public JettyBroadCastMessageSender(HttpRequestFactory requestFactory, ResolvableEndpointAddress endpoint, - HttpHeadersProvider requestHeadersProvider, SendingResultHandlers sendingResultHandlers) { + HttpHeadersProvider requestHeadersProvider, + SendingResultHandlers sendingResultHandlers, + ResilientMessageSender resilientMessageSender + ) { this.requestFactory = requestFactory; this.endpoint = endpoint; this.requestHeadersProvider = requestHeadersProvider; this.sendingResultHandlers = sendingResultHandlers; + this.resilientMessageSender = resilientMessageSender; } @Override @@ -40,7 +47,7 @@ public CompletableFuture send(Message message) { try { return sendMessage(message).thenApply(MultiMessageSendingResult::new); } catch (Exception exception) { - return CompletableFuture.completedFuture(MessageSendingResult.failedResult(exception)); + return CompletableFuture.completedFuture(exceptionMapper.apply(exception)); } } @@ -49,11 +56,13 @@ private CompletableFuture> sendMessage(Message List> results = collectResults(message); return mergeResults(results); } catch (EndpointAddressResolutionException exception) { - return CompletableFuture.completedFuture(Collections.singletonList(MessageSendingResult.failedResult(exception))); + return CompletableFuture.completedFuture(Collections.singletonList(exceptionMapper.apply(exception))); } } - private List> collectResults(Message message) throws EndpointAddressResolutionException { + private List> collectResults( + Message message + ) throws EndpointAddressResolutionException { final HttpRequestData requestData = new HttpRequestDataBuilder() .withRawAddress(endpoint.getRawAddress()) @@ -80,9 +89,10 @@ private CompletableFuture> mergeResults(List processResponse(Request request) { - CompletableFuture resultFuture = new CompletableFuture<>(); - request.send(sendingResultHandlers.handleSendingResultForBroadcast(resultFuture)); - return resultFuture; + return resilientMessageSender.send( + resultFuture -> request.send(sendingResultHandlers.handleSendingResultForBroadcast(resultFuture)), + exceptionMapper); + } @Override diff --git a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/sender/http/JettyHttpMessageSenderProvider.java b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/sender/http/JettyHttpMessageSenderProvider.java index 33127d30df..ab0eaf966b 100644 --- a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/sender/http/JettyHttpMessageSenderProvider.java +++ b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/sender/http/JettyHttpMessageSenderProvider.java @@ -9,8 +9,10 @@ import pl.allegro.tech.hermes.api.EndpointAddressResolverMetadata; import pl.allegro.tech.hermes.api.Subscription; import pl.allegro.tech.hermes.api.SubscriptionMode; +import pl.allegro.tech.hermes.consumers.consumer.ResilientMessageSender; import pl.allegro.tech.hermes.consumers.consumer.sender.MessageSender; import pl.allegro.tech.hermes.consumers.consumer.sender.ProtocolMessageSenderProvider; +import pl.allegro.tech.hermes.consumers.consumer.sender.SingleRecipientMessageSenderAdapter; import pl.allegro.tech.hermes.consumers.consumer.sender.http.auth.HttpAuthorizationProvider; import pl.allegro.tech.hermes.consumers.consumer.sender.http.auth.HttpAuthorizationProviderFactory; import pl.allegro.tech.hermes.consumers.consumer.sender.http.headers.AuthHeadersProvider; @@ -65,7 +67,7 @@ public JettyHttpMessageSenderProvider( } @Override - public MessageSender create(Subscription subscription) { + public MessageSender create(Subscription subscription, ResilientMessageSender resilientMessageSender) { EndpointAddress endpoint = subscription.getEndpoint(); EndpointAddressResolverMetadata endpointAddressResolverMetadata = subscription.getEndpointAddressResolverMetadata(); ResolvableEndpointAddress resolvableEndpoint = @@ -78,13 +80,15 @@ public MessageSender create(Subscription subscription) { requestFactory, resolvableEndpoint, getHttpRequestHeadersProvider(subscription), - sendingResultHandlers); + sendingResultHandlers, + resilientMessageSender); } else { - return new JettyMessageSender( + JettyMessageSender jettyMessageSender = new JettyMessageSender( requestFactory, resolvableEndpoint, getHttpRequestHeadersProvider(subscription), sendingResultHandlers); + return new SingleRecipientMessageSenderAdapter(jettyMessageSender, resilientMessageSender); } } diff --git a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/sender/http/JettyMessageSender.java b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/sender/http/JettyMessageSender.java index 79fd6b14cb..03b7a36264 100644 --- a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/sender/http/JettyMessageSender.java +++ b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/sender/http/JettyMessageSender.java @@ -13,7 +13,7 @@ import java.net.URI; import java.util.concurrent.CompletableFuture; -public class JettyMessageSender extends CompletableFutureAwareMessageSender { +public class JettyMessageSender implements CompletableFutureAwareMessageSender { private final HttpRequestFactory requestFactory; private final ResolvableEndpointAddress addressResolver; @@ -31,7 +31,7 @@ public JettyMessageSender(HttpRequestFactory requestFactory, } @Override - protected void sendMessage(Message message, final CompletableFuture resultFuture) { + public void send(Message message, final CompletableFuture resultFuture) { try { final HttpRequestData requestData = new HttpRequestDataBuilder() .withRawAddress(addressResolver.getRawAddress()) diff --git a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/sender/jms/AbstractJmsMessageSenderProvider.java b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/sender/jms/AbstractJmsMessageSenderProvider.java index d9b924f8d4..70a0cc45fd 100644 --- a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/sender/jms/AbstractJmsMessageSenderProvider.java +++ b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/sender/jms/AbstractJmsMessageSenderProvider.java @@ -6,7 +6,9 @@ import pl.allegro.tech.hermes.api.EndpointAddress; import pl.allegro.tech.hermes.api.Subscription; import pl.allegro.tech.hermes.common.exception.InternalProcessingException; +import pl.allegro.tech.hermes.consumers.consumer.ResilientMessageSender; import pl.allegro.tech.hermes.consumers.consumer.sender.MessageSender; +import pl.allegro.tech.hermes.consumers.consumer.sender.SingleRecipientMessageSenderAdapter; import pl.allegro.tech.hermes.consumers.consumer.trace.MetadataAppender; import pl.allegro.tech.hermes.consumers.uri.UriUtils; @@ -27,7 +29,7 @@ public AbstractJmsMessageSenderProvider(MetadataAppender metadataAppend } @Override - public MessageSender create(Subscription subscription) { + public MessageSender create(Subscription subscription, ResilientMessageSender resilientMessageSender) { EndpointAddress endpoint = subscription.getEndpoint(); URI uri = endpoint.getUri(); ConnectionFactory connectionFactory = getConnectionFactory(uri); @@ -36,7 +38,8 @@ public MessageSender create(Subscription subscription) { endpoint.getPassword() ); - return new JmsMessageSender(jmsContext, extractTopicName(uri), metadataAppender); + JmsMessageSender jmsMessageSender = new JmsMessageSender(jmsContext, extractTopicName(uri), metadataAppender); + return new SingleRecipientMessageSenderAdapter(jmsMessageSender, resilientMessageSender); } @Override diff --git a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/sender/jms/JmsMessageSender.java b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/sender/jms/JmsMessageSender.java index aa5bfabcd5..995fe555bf 100644 --- a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/sender/jms/JmsMessageSender.java +++ b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/sender/jms/JmsMessageSender.java @@ -19,7 +19,7 @@ import static pl.allegro.tech.hermes.consumers.consumer.sender.MessageSendingResult.failedResult; import static pl.allegro.tech.hermes.consumers.consumer.sender.MessageSendingResult.succeededResult; -public class JmsMessageSender extends CompletableFutureAwareMessageSender { +public class JmsMessageSender implements CompletableFutureAwareMessageSender { private static final Logger logger = LoggerFactory.getLogger(JmsMessageSender.class); @@ -39,7 +39,7 @@ public void stop() { } @Override - protected void sendMessage(Message msg, final CompletableFuture resultFuture) { + public void send(Message msg, final CompletableFuture resultFuture) { try { BytesMessage message = jmsContext.createBytesMessage(); message.writeBytes(msg.getData()); diff --git a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/sender/timeout/FutureAsyncTimeout.java b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/sender/timeout/FutureAsyncTimeout.java index 523be32773..3cc296cb42 100644 --- a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/sender/timeout/FutureAsyncTimeout.java +++ b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/sender/timeout/FutureAsyncTimeout.java @@ -10,25 +10,24 @@ /** * see http://www.nurkiewicz.com/2014/12/asynchronous-timeouts-with.html */ -public class FutureAsyncTimeout { +public class FutureAsyncTimeout { private final ScheduledExecutorService executor; - private final Function failure; - public FutureAsyncTimeout(Function failure, ScheduledExecutorService scheduledExecutorService) { + + public FutureAsyncTimeout(ScheduledExecutorService scheduledExecutorService) { this.executor = scheduledExecutorService; - this.failure = failure; } - public CompletableFuture within(CompletableFuture future, Duration duration) { - return future.applyToEither(failAfter(duration), Function.identity()); + public CompletableFuture within(CompletableFuture future, Duration duration, Function exceptionMapper) { + return future.applyToEither(failAfter(duration, exceptionMapper), Function.identity()); } - private CompletableFuture failAfter(Duration duration) { + private CompletableFuture failAfter(Duration duration, Function exceptionMapper) { final CompletableFuture promise = new CompletableFuture<>(); executor.schedule(() -> { TimeoutException ex = new TimeoutException("Timeout after " + duration); - return promise.complete(failure.apply(ex)); + return promise.complete(exceptionMapper.apply(ex)); }, duration.toMillis(), TimeUnit.MILLISECONDS); return promise; } diff --git a/hermes-consumers/src/test/groovy/pl/allegro/tech/hermes/consumers/consumer/ResilientMessageSenderTest.groovy b/hermes-consumers/src/test/groovy/pl/allegro/tech/hermes/consumers/consumer/ResilientMessageSenderTest.groovy new file mode 100644 index 0000000000..07eada26f4 --- /dev/null +++ b/hermes-consumers/src/test/groovy/pl/allegro/tech/hermes/consumers/consumer/ResilientMessageSenderTest.groovy @@ -0,0 +1,290 @@ +package pl.allegro.tech.hermes.consumers.consumer + +import com.google.common.base.Throwables +import pl.allegro.tech.hermes.api.Subscription +import pl.allegro.tech.hermes.api.SubscriptionName +import pl.allegro.tech.hermes.consumers.consumer.rate.SerialConsumerRateLimiter +import pl.allegro.tech.hermes.consumers.consumer.sender.MessageSendingResult +import pl.allegro.tech.hermes.consumers.consumer.sender.SingleMessageSendingResult +import pl.allegro.tech.hermes.consumers.consumer.sender.timeout.FutureAsyncTimeout +import spock.lang.Specification + +import java.util.concurrent.CompletableFuture +import java.util.concurrent.Executors +import java.util.function.Consumer +import java.util.function.Function + +import static io.netty.handler.codec.http.HttpResponseStatus.SERVICE_UNAVAILABLE; +import static io.netty.handler.codec.http.HttpResponseStatus.TOO_MANY_REQUESTS; +import static pl.allegro.tech.hermes.api.SubscriptionPolicy.Builder.subscriptionPolicy +import static pl.allegro.tech.hermes.test.helper.builder.SubscriptionBuilder.subscription + + +class ResilientMessageSenderTest extends Specification { + + Consumer> successfulConsumer() { + return { cf -> return cf.complete(MessageSendingResult.succeededResult()) + } + } + + Consumer> slowConsumer(long completeAfterMs) { + return { cf -> + return cf.completeAsync( + { + sleep(completeAfterMs) + MessageSendingResult.succeededResult() + } + ) + } + } + + Consumer> ordinarilyFailingConsumer(int statusCode) { + return { cf -> cf.complete(MessageSendingResult.failedResult(statusCode)) } + } + + Consumer> abnormallyFailingConsumer(Exception e) { + return { cf -> cf.completeExceptionally(e) } + } + + Consumer> abruptlyFailingConsumer(Exception e) { + return { cf -> throw e } + } + + Consumer> consumer(MessageSendingResult messageSendingResult) { + return {cf -> cf.complete(messageSendingResult)} + } + + FutureAsyncTimeout futureAsyncTimeout = new FutureAsyncTimeout(Executors.newSingleThreadScheduledExecutor()) + Function exceptionMapper = { e -> MessageSendingResult.failedResult(e) } + + Subscription subscription = subscription(SubscriptionName.fromString("group.topic\$subscription")).build() + + + def "should report successful sending"() { + given: + SerialConsumerRateLimiter serialConsumerRateLimiter = Mock(SerialConsumerRateLimiter) { + 1 * acquire() + 1 * registerSuccessfulSending() + } + ResilientMessageSender rateLimitingMessageSender = new ResilientMessageSender( + serialConsumerRateLimiter, + subscription, + futureAsyncTimeout, + 1000, + 1000 + ) + + when: + CompletableFuture future = rateLimitingMessageSender.send(successfulConsumer(), exceptionMapper) + + then: + future.get().succeeded() + } + + def "should asynchronously time out send future and report failed sending"() { + given: + SerialConsumerRateLimiter serialConsumerRateLimiter = Mock(SerialConsumerRateLimiter) { + 1 * acquire() + 1 * registerFailedSending() + } + ResilientMessageSender rateLimitingMessageSender = new ResilientMessageSender( + serialConsumerRateLimiter, + subscription, + futureAsyncTimeout, + 100, + 100 + ) + + when: + CompletableFuture future = rateLimitingMessageSender.send(slowConsumer(5_000), exceptionMapper) + + then: + future.get().isTimeout() + } + + def "should treat 4xx response for subscription with no 4xx retry as success"() { + given: + SerialConsumerRateLimiter serialConsumerRateLimiter = Mock(SerialConsumerRateLimiter) { + 1 * acquire() + 1 * registerSuccessfulSending() + } + + ResilientMessageSender rateLimitingMessageSender = new ResilientMessageSender( + serialConsumerRateLimiter, + subscription, + futureAsyncTimeout, + 100, + 100 + ) + + when: + CompletableFuture future = rateLimitingMessageSender.send(ordinarilyFailingConsumer(404), exceptionMapper) + + then: + !future.get().succeeded() + } + + def "should report failed sending on error response other than 4xx for subscription with no 4xx retry"() { + given: + SerialConsumerRateLimiter serialConsumerRateLimiter = Mock(SerialConsumerRateLimiter) { + 1 * acquire() + 1 * registerFailedSending() + } + + ResilientMessageSender rateLimitingMessageSender = new ResilientMessageSender( + serialConsumerRateLimiter, + subscription, + futureAsyncTimeout, + 100, + 100 + ) + + when: + CompletableFuture future = rateLimitingMessageSender.send(ordinarilyFailingConsumer(500), exceptionMapper) + + then: + !future.get().succeeded() + } + + def "should report failed sending on 4xx response for subscription with 4xx retry"() { + given: + SerialConsumerRateLimiter serialConsumerRateLimiter = Mock(SerialConsumerRateLimiter) { + 1 * acquire() + 1 * registerFailedSending() + } + def subscription = subscription(SubscriptionName.fromString("group.topic\$subscription")) + .withSubscriptionPolicy(subscriptionPolicy().applyDefaults() + .withClientErrorRetry() + .build()).build() + + ResilientMessageSender rateLimitingMessageSender = new ResilientMessageSender( + serialConsumerRateLimiter, + subscription, + futureAsyncTimeout, + 100, + 100 + ) + + when: + CompletableFuture future = rateLimitingMessageSender.send(ordinarilyFailingConsumer(500), exceptionMapper) + + then: + !future.get().succeeded() + } + + def "should report successful sending on retry after"() { + given: + SerialConsumerRateLimiter serialConsumerRateLimiter = Mock(SerialConsumerRateLimiter) { + 1 * acquire() + 1 * registerSuccessfulSending() + } + + ResilientMessageSender rateLimitingMessageSender = new ResilientMessageSender( + serialConsumerRateLimiter, + subscription, + futureAsyncTimeout, + 100, + 100 + ) + + when: + CompletableFuture future = rateLimitingMessageSender.send(consumer(MessageSendingResult.retryAfter(100)), exceptionMapper) + + then: + !future.get().succeeded() + } + + def "should report failed sending on service unavailable without retry after"() { + given: + SerialConsumerRateLimiter serialConsumerRateLimiter = Mock(SerialConsumerRateLimiter) { + 1 * acquire() + 1 * registerFailedSending() + } + + ResilientMessageSender rateLimitingMessageSender = new ResilientMessageSender( + serialConsumerRateLimiter, + subscription, + futureAsyncTimeout, + 100, + 100 + ) + + when: + CompletableFuture future = rateLimitingMessageSender.send(ordinarilyFailingConsumer(SERVICE_UNAVAILABLE.code()), exceptionMapper) + + then: + !future.get().succeeded() + } + + def "should not report failed sending on too many requests without retry after"() { + given: + SerialConsumerRateLimiter serialConsumerRateLimiter = Mock(SerialConsumerRateLimiter) { + 1 * acquire() + 1 * registerSuccessfulSending() + } + + ResilientMessageSender rateLimitingMessageSender = new ResilientMessageSender( + serialConsumerRateLimiter, + subscription, + futureAsyncTimeout, + 100, + 100 + ) + + when: + CompletableFuture future = rateLimitingMessageSender.send(ordinarilyFailingConsumer(TOO_MANY_REQUESTS.code()), exceptionMapper) + + then: + !future.get().succeeded() + } + + def "should report failed sending when future completes exceptionally"() { + given: + SerialConsumerRateLimiter serialConsumerRateLimiter = Mock(SerialConsumerRateLimiter) { + 1 * acquire() + 1 * registerFailedSending() + } + ResilientMessageSender rateLimitingMessageSender = new ResilientMessageSender( + serialConsumerRateLimiter, + subscription, + futureAsyncTimeout, + 100, + 100 + ) + + when: + def failWith = new IOException() + def future = rateLimitingMessageSender.send(abnormallyFailingConsumer(failWith), exceptionMapper) + + then: + with(future.get()) { + !succeeded() + Throwables.getRootCause(getFailure()) == failWith + } + } + + def "should report failed sending when consumer throws exception"() { + given: + SerialConsumerRateLimiter serialConsumerRateLimiter = Mock(SerialConsumerRateLimiter) { + 1 * acquire() + 1 * registerFailedSending() + } + ResilientMessageSender rateLimitingMessageSender = new ResilientMessageSender( + serialConsumerRateLimiter, + subscription, + futureAsyncTimeout, + 100, + 100 + ) + + when: + def failWith = new IllegalStateException() + def future = rateLimitingMessageSender.send(abruptlyFailingConsumer(failWith), exceptionMapper) + + then: + with(future.get()) { + !succeeded() + Throwables.getRootCause(getFailure()) == failWith + } + } +} diff --git a/hermes-consumers/src/test/groovy/pl/allegro/tech/hermes/consumers/consumer/sender/SingleRecipientMessageSenderAdapterTest.groovy b/hermes-consumers/src/test/groovy/pl/allegro/tech/hermes/consumers/consumer/sender/SingleRecipientMessageSenderAdapterTest.groovy new file mode 100644 index 0000000000..9a444d1be5 --- /dev/null +++ b/hermes-consumers/src/test/groovy/pl/allegro/tech/hermes/consumers/consumer/sender/SingleRecipientMessageSenderAdapterTest.groovy @@ -0,0 +1,83 @@ +package pl.allegro.tech.hermes.consumers.consumer.sender + +import pl.allegro.tech.hermes.api.Subscription +import pl.allegro.tech.hermes.api.SubscriptionName +import pl.allegro.tech.hermes.consumers.consumer.Message +import pl.allegro.tech.hermes.consumers.consumer.ResilientMessageSender +import pl.allegro.tech.hermes.consumers.consumer.rate.ConsumerRateLimiter +import pl.allegro.tech.hermes.consumers.consumer.sender.timeout.FutureAsyncTimeout +import spock.lang.Specification +import static pl.allegro.tech.hermes.test.helper.builder.SubscriptionBuilder.subscription + +import java.util.concurrent.CompletableFuture +import java.util.concurrent.Executors + +import static pl.allegro.tech.hermes.consumers.test.MessageBuilder.testMessage + +class SingleRecipientMessageSenderAdapterTest extends Specification { + + FutureAsyncTimeout futureAsyncTimeout = new FutureAsyncTimeout(Executors.newSingleThreadScheduledExecutor()) + + CompletableFutureAwareMessageSender successfulMessageSender = new CompletableFutureAwareMessageSender() { + void send(Message message, CompletableFuture resultFuture) { + resultFuture.complete(MessageSendingResult.succeededResult()) + } + + void stop() {} + } + + CompletableFutureAwareMessageSender failingMessageSender = new CompletableFutureAwareMessageSender() { + void send(Message message, CompletableFuture resultFuture) { + resultFuture.completeExceptionally(new IllegalStateException()) + } + + void stop() {} + } + + ResilientMessageSender rateLimitingMessageSender(ConsumerRateLimiter consumerRateLimiter) { + Subscription subscription = subscription(SubscriptionName.fromString("group.topic\$subscription")).build() + + return new ResilientMessageSender( + consumerRateLimiter, + subscription, + futureAsyncTimeout, + 1000, + 1000, + ) + } + + def "should register successful send in rate limiter"() { + given: + ConsumerRateLimiter consumerRateLimiter = Mock(ConsumerRateLimiter) { + 1 * acquire() + 1 * registerSuccessfulSending() + } + ResilientMessageSender rateLimitingMessageSender = rateLimitingMessageSender(consumerRateLimiter) + SingleRecipientMessageSenderAdapter adapter = new SingleRecipientMessageSenderAdapter(successfulMessageSender, rateLimitingMessageSender) + + when: + CompletableFuture future = adapter.send(testMessage()) + + then: + future.get().succeeded() + } + + def "should register unsuccessful send in rate limiter"() { + given: + ConsumerRateLimiter consumerRateLimiter = Mock(ConsumerRateLimiter) { + 1 * acquire() + 1 * registerFailedSending() + } + ResilientMessageSender rateLimitingMessageSender = rateLimitingMessageSender(consumerRateLimiter) + + SingleRecipientMessageSenderAdapter adapter = new SingleRecipientMessageSenderAdapter(failingMessageSender, rateLimitingMessageSender) + + when: + CompletableFuture future = adapter.send(testMessage()) + + then: + !future.get().succeeded() + + } + +} diff --git a/hermes-consumers/src/test/groovy/pl/allegro/tech/hermes/consumers/consumer/sender/googlepubsub/GooglePubSubMessageSenderTest.groovy b/hermes-consumers/src/test/groovy/pl/allegro/tech/hermes/consumers/consumer/sender/googlepubsub/GooglePubSubMessageSenderTest.groovy index f5d6446313..d2a9248079 100644 --- a/hermes-consumers/src/test/groovy/pl/allegro/tech/hermes/consumers/consumer/sender/googlepubsub/GooglePubSubMessageSenderTest.groovy +++ b/hermes-consumers/src/test/groovy/pl/allegro/tech/hermes/consumers/consumer/sender/googlepubsub/GooglePubSubMessageSenderTest.groovy @@ -12,6 +12,7 @@ import pl.allegro.tech.hermes.consumers.test.MessageBuilder import spock.lang.Specification import spock.lang.Subject +import java.util.concurrent.CompletableFuture import java.util.concurrent.TimeUnit class GooglePubSubMessageSenderTest extends Specification { @@ -19,8 +20,8 @@ class GooglePubSubMessageSenderTest extends Specification { Publisher publisher = Mock(Publisher) GooglePubSubSenderTarget senderTarget = GooglePubSubSenderTarget.builder() - .withTopicName(TopicName.of("test-project", "topic-name")) - .build() + .withTopicName(TopicName.of("test-project", "topic-name")) + .build() GooglePubSubClientsPool clientsPool = Mock(GooglePubSubClientsPool) @@ -40,8 +41,9 @@ class GooglePubSubMessageSenderTest extends Specification { publisher.publish(_ as PubsubMessage) >> apiFuture("test") when: - MessageSendingResult result = sender.send(MessageBuilder.testMessage()) - .get(1, TimeUnit.SECONDS) + CompletableFuture future = new CompletableFuture(); + sender.send(MessageBuilder.testMessage(), future) + MessageSendingResult result = future.get(1, TimeUnit.SECONDS) then: result.succeeded() @@ -53,8 +55,9 @@ class GooglePubSubMessageSenderTest extends Specification { publisher.publish(_ as PubsubMessage) >> apiFuture(exception) when: - MessageSendingResult result = sender.send(MessageBuilder.testMessage()) - .get(1, TimeUnit.SECONDS) + CompletableFuture future = new CompletableFuture(); + sender.send(MessageBuilder.testMessage(), future) + MessageSendingResult result = future.get(1, TimeUnit.SECONDS) then: !result.succeeded() @@ -72,4 +75,4 @@ class GooglePubSubMessageSenderTest extends Specification { future.setException(ex) future } -} +} \ No newline at end of file diff --git a/hermes-consumers/src/test/groovy/pl/allegro/tech/hermes/consumers/consumer/sender/http/JettyBroadCastMessageSenderTest.groovy b/hermes-consumers/src/test/groovy/pl/allegro/tech/hermes/consumers/consumer/sender/http/JettyBroadCastMessageSenderTest.groovy index 17505ac0b5..b565f11801 100644 --- a/hermes-consumers/src/test/groovy/pl/allegro/tech/hermes/consumers/consumer/sender/http/JettyBroadCastMessageSenderTest.groovy +++ b/hermes-consumers/src/test/groovy/pl/allegro/tech/hermes/consumers/consumer/sender/http/JettyBroadCastMessageSenderTest.groovy @@ -5,26 +5,33 @@ import org.eclipse.jetty.client.HttpClient import org.eclipse.jetty.util.HttpCookieStore import pl.allegro.tech.hermes.api.EndpointAddress import pl.allegro.tech.hermes.api.EndpointAddressResolverMetadata +import pl.allegro.tech.hermes.api.Subscription +import pl.allegro.tech.hermes.api.SubscriptionName import pl.allegro.tech.hermes.consumers.consumer.Message +import pl.allegro.tech.hermes.consumers.consumer.rate.ConsumerRateLimiter +import pl.allegro.tech.hermes.consumers.consumer.sender.MessageSender import pl.allegro.tech.hermes.consumers.consumer.sender.MessageSendingResult import pl.allegro.tech.hermes.consumers.consumer.sender.MultiMessageSendingResult +import pl.allegro.tech.hermes.consumers.consumer.ResilientMessageSender import pl.allegro.tech.hermes.consumers.consumer.sender.http.headers.AuthHeadersProvider import pl.allegro.tech.hermes.consumers.consumer.sender.http.headers.HermesHeadersProvider import pl.allegro.tech.hermes.consumers.consumer.sender.http.headers.Http1HeadersProvider import pl.allegro.tech.hermes.consumers.consumer.sender.http.headers.HttpHeadersProvider import pl.allegro.tech.hermes.consumers.consumer.sender.resolver.ResolvableEndpointAddress +import pl.allegro.tech.hermes.consumers.consumer.sender.timeout.FutureAsyncTimeout import pl.allegro.tech.hermes.test.helper.endpoint.MultiUrlEndpointAddressResolver import pl.allegro.tech.hermes.test.helper.endpoint.RemoteServiceEndpoint import pl.allegro.tech.hermes.test.helper.util.Ports import spock.lang.Shared import spock.lang.Specification -import spock.lang.Subject +import java.util.concurrent.Executors import java.util.concurrent.TimeUnit import static java.util.Collections.singleton import static pl.allegro.tech.hermes.consumers.test.MessageBuilder.TEST_MESSAGE_CONTENT import static pl.allegro.tech.hermes.consumers.test.MessageBuilder.testMessage +import static pl.allegro.tech.hermes.test.helper.builder.SubscriptionBuilder.subscription class JettyBroadCastMessageSenderTest extends Specification { @@ -32,7 +39,7 @@ class JettyBroadCastMessageSenderTest extends Specification { List ports = (1..4).collect { Ports.nextAvailable() } @Shared - EndpointAddress endpoint = EndpointAddress.of(ports.collect {"http://localhost:${it}/"}.join(";")) + EndpointAddress endpoint = EndpointAddress.of(ports.collect { "http://localhost:${it}/" }.join(";")) @Shared HttpClient client @@ -43,14 +50,13 @@ class JettyBroadCastMessageSenderTest extends Specification { @Shared List serviceEndpoints - @Subject - JettyBroadCastMessageSender messageSender - HttpHeadersProvider requestHeadersProvider = new HermesHeadersProvider( singleton(new AuthHeadersProvider(new Http1HeadersProvider(), { Optional.empty() }))) SendingResultHandlers resultHandlersProvider = new DefaultSendingResultHandlers() + FutureAsyncTimeout futureAsyncTimeout = new FutureAsyncTimeout(Executors.newSingleThreadScheduledExecutor()) + def setupSpec() throws Exception { wireMockServers.forEach { it.start() } @@ -63,23 +69,36 @@ class JettyBroadCastMessageSenderTest extends Specification { serviceEndpoints = wireMockServers.collect { new RemoteServiceEndpoint(it) } } - def setup() { + MessageSender getSender(ConsumerRateLimiter rateLimiter) { def address = new ResolvableEndpointAddress(endpoint, new MultiUrlEndpointAddressResolver(), EndpointAddressResolverMetadata.empty()); def httpRequestFactory = new DefaultHttpRequestFactory(client, 1000, 1000, new DefaultHttpMetadataAppender()) - messageSender = new JettyBroadCastMessageSender(httpRequestFactory, address, - requestHeadersProvider, resultHandlersProvider); + + Subscription subscription = subscription(SubscriptionName.fromString("group.topic\$subscription")).build() + + ResilientMessageSender sendFutureProvider = new ResilientMessageSender( + rateLimiter, subscription, futureAsyncTimeout, 10000, 1000 + ) + + return new JettyBroadCastMessageSender(httpRequestFactory, address, + requestHeadersProvider, resultHandlersProvider, sendFutureProvider) } def "should send message successfully in parallel to all urls"() { given: - serviceEndpoints.forEach { endpoint -> endpoint.setDelay(300).expectMessages(TEST_MESSAGE_CONTENT)} + ConsumerRateLimiter rateLimiter = Mock(ConsumerRateLimiter) { + 4 * acquire() + 4 * registerSuccessfulSending() + } + + serviceEndpoints.forEach { endpoint -> endpoint.setDelay(300).expectMessages(TEST_MESSAGE_CONTENT) } when: - def future = messageSender.send(testMessage()); + def future = getSender(rateLimiter).send(testMessage()); then: - future.get(1, TimeUnit.SECONDS).succeeded() + future.get(10, TimeUnit.SECONDS).succeeded() + and: serviceEndpoints.forEach { it.waitUntilReceived() } @@ -87,12 +106,18 @@ class JettyBroadCastMessageSenderTest extends Specification { def "should return not succeeded when sending to one of urls fails"() { given: + ConsumerRateLimiter rateLimiter = Mock(ConsumerRateLimiter) { + 4 * acquire() + 3 * registerSuccessfulSending() + 1 * registerFailedSending() + } + def failedServiceEndpoint = serviceEndpoints[0] failedServiceEndpoint.setReturnedStatusCode(500) - serviceEndpoints.forEach { endpoint -> endpoint.expectMessages(TEST_MESSAGE_CONTENT)} + serviceEndpoints.forEach { endpoint -> endpoint.expectMessages(TEST_MESSAGE_CONTENT) } when: - def future = messageSender.send(testMessage()) + def future = getSender(rateLimiter).send(testMessage()) then: serviceEndpoints.forEach { it.waitUntilReceived() } @@ -107,6 +132,11 @@ class JettyBroadCastMessageSenderTest extends Specification { def "should not send to already sent url on retry"() { given: + ConsumerRateLimiter rateLimiter = Mock(ConsumerRateLimiter) { + 3 * acquire() + 3 * registerSuccessfulSending() + } + serviceEndpoints.forEach { endpoint -> endpoint.expectMessages(TEST_MESSAGE_CONTENT) } def alreadySentServiceEndpoint = serviceEndpoints[0] @@ -114,7 +144,7 @@ class JettyBroadCastMessageSenderTest extends Specification { message.incrementRetryCounter([alreadySentServiceEndpoint.url]); when: - def future = messageSender.send(message) + def future = getSender(rateLimiter).send(message) then: alreadySentServiceEndpoint.makeSureNoneReceived() @@ -132,11 +162,11 @@ class JettyBroadCastMessageSenderTest extends Specification { } def httpRequestFactory = new DefaultHttpRequestFactory(client, 1000, 1000, new DefaultHttpMetadataAppender()) - messageSender = new JettyBroadCastMessageSender(httpRequestFactory, address, - requestHeadersProvider, resultHandlersProvider) + MessageSender messageSender = new JettyBroadCastMessageSender(httpRequestFactory, address, + requestHeadersProvider, resultHandlersProvider, Mock(ResilientMessageSender)) when: - def future = messageSender.send(testMessage()) + def future = messageSender.send(testMessage()) then: MessageSendingResult messageSendingResult = future.get(1, TimeUnit.SECONDS) @@ -144,11 +174,10 @@ class JettyBroadCastMessageSenderTest extends Specification { !messageSendingResult.succeeded() !messageSendingResult.isClientError() messageSendingResult.isRetryLater() - } def cleanupSpec() { - wireMockServers.forEach{ it.stop() } + wireMockServers.forEach { it.stop() } client.stop() } diff --git a/hermes-consumers/src/test/java/pl/allegro/tech/hermes/consumers/consumer/ConsumerMessageSenderTest.java b/hermes-consumers/src/test/java/pl/allegro/tech/hermes/consumers/consumer/ConsumerMessageSenderTest.java index 16da80a7a2..ca15d4b942 100644 --- a/hermes-consumers/src/test/java/pl/allegro/tech/hermes/consumers/consumer/ConsumerMessageSenderTest.java +++ b/hermes-consumers/src/test/java/pl/allegro/tech/hermes/consumers/consumer/ConsumerMessageSenderTest.java @@ -29,8 +29,6 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executors; -import static io.netty.handler.codec.http.HttpResponseStatus.SERVICE_UNAVAILABLE; -import static io.netty.handler.codec.http.HttpResponseStatus.TOO_MANY_REQUESTS; import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.Matchers.any; import static org.mockito.Matchers.eq; @@ -116,7 +114,6 @@ public void shouldHandleSuccessfulSending() { // then verifySemaphoreReleased(); - verifyRateLimiterSuccessfulSendingCountedTimes(1); verifyLatencyTimersCountedTimes(subscription, 1, 1); verifyZeroInteractions(errorHandler); verifyZeroInteractions(failedMeter); @@ -135,8 +132,6 @@ public void shouldKeepTryingToSendMessageFailedSending() throws InterruptedExcep // then verifySemaphoreReleased(); verifyLatencyTimersCountedTimes(subscription, 3, 3); - verifyRateLimiterFailedSendingCountedTimes(2); - verifyRateLimiterSuccessfulSendingCountedTimes(1); verifyErrorHandlerHandleFailed(message, subscription, 2); } @@ -154,7 +149,6 @@ public void shouldDiscardMessageWhenTTLIsExceeded() { verifySemaphoreReleased(); verifyZeroInteractions(successHandler); verifyLatencyTimersCountedTimes(subscription, 1, 1); - verifyRateLimiterFailedSendingCountedTimes(1); } @Test @@ -193,50 +187,6 @@ public void shouldKeepTryingToSendMessageFailedWithStatusCode4xxForSubscriptionW ); } - @Test - public void shouldTreat4xxResponseForSubscriptionWithNo4xxRetryAsSuccess() { - // given - Message message = message(); - doReturn(failure(403)).doReturn(success()).when(messageSender).send(message); - - // when - sender.sendAsync(message); - - // then - verifyRateLimiterFailedSendingCountedTimes(0); - verifyRateLimiterSuccessfulSendingCountedTimes(1); - verifyErrorHandlerHandleFailed(message, subscription, 1); - } - - @Test - public void shouldReduceSendingRateLimitOnErrorResponseOtherThan4xxForSubscriptionWithNo4xxRetry() { - // given - Message message = message(); - doReturn(failure(500)).doReturn(success()).when(messageSender).send(message); - - // when - sender.sendAsync(message); - - // then - verifyRateLimiterFailedSendingCountedTimes(1); - verifyErrorHandlerHandleFailed(message, subscription, 1); - } - - @Test - public void shouldReduceSendingRateLimitOn4xxResponseForSubscriptionWith4xxRetry() { - // given - ConsumerMessageSender sender = consumerMessageSender(subscriptionWith4xxRetry); - Message message = message(); - doReturn(failure(403)).doReturn(failure(403)).doReturn(success()).when(messageSender).send(message); - - // when - sender.sendAsync(message); - - // then - verifyRateLimiterFailedSendingCountedTimes(2); - verifyRateLimiterSuccessfulSendingCountedTimes(1); - } - @Test public void shouldRetryOn401UnauthorizedForOAuthSecuredSubscription() { // given @@ -250,21 +200,8 @@ public void shouldRetryOn401UnauthorizedForOAuthSecuredSubscription() { sender.sendAsync(message); // then - verifyRateLimiterFailedSendingCountedTimes(2); - verifyRateLimiterSuccessfulSendingCountedTimes(1); - } - - @Test - public void shouldRaiseTimeoutWhenSenderNotCompletesResult() { - // given - Message message = message(); - when(messageSender.send(message)).thenReturn(new CompletableFuture<>()); - - // when - sender.sendAsync(message); - - // then - verifyErrorHandlerHandleFailed(message, subscription, 1, 3000); + verifyErrorHandlerHandleFailed(message, subscription, 2); + verify(successHandler, timeout(1000)).handleSuccess(eq(message), eq(subscription), any(MessageSendingResult.class)); } @Test @@ -287,51 +224,6 @@ public void shouldBackoffRetriesWhenEndpointFails() throws InterruptedException verifyErrorHandlerHandleFailed(message, subscriptionWithBackoff, 1 + executionTime / senderBackoffTime); } - @Test - public void shouldNotBackoffRetriesOnRetryAfter() throws InterruptedException { - // given - int retrySeconds = 1; - Message message = message(); - doReturn(backoff(retrySeconds)).doReturn(success()).when(messageSender).send(message); - - // when - sender.sendAsync(message); - - // then - verifyRateLimiterFailedSendingCountedTimes(0); - verifyRateLimiterSuccessfulSendingCountedTimes(2); - verifySemaphoreReleased(); - } - - @Test - public void shouldBackoffRetriesOnServiceUnavailableWithoutRetryAfter() throws InterruptedException { - // given - Message message = message(); - doReturn(failure(SERVICE_UNAVAILABLE.code())).doReturn(success()).when(messageSender).send(message); - - // when - sender.sendAsync(message); - - // then - verifyRateLimiterFailedSendingCountedTimes(1); - verifyRateLimiterSuccessfulSendingCountedTimes(1); - verifySemaphoreReleased(); - } - - @Test - public void shouldBackoffRetriesOnTooManyRequestsWithoutRetryAfter() throws InterruptedException { - // given - Message message = message(); - doReturn(failure(TOO_MANY_REQUESTS.code())).doReturn(success()).when(messageSender).send(message); - - // when - sender.sendAsync(message); - - // then - verifyRateLimiterSuccessfulSendingCountedTimes(1); - verifySemaphoreReleased(); - } - @Test public void shouldNotRetryOnRetryAfterAboveTtl() throws InterruptedException { // given @@ -343,7 +235,6 @@ public void shouldNotRetryOnRetryAfterAboveTtl() throws InterruptedException { sender.sendAsync(message); // then - verifyRateLimiterSuccessfulSendingCountedTimes(1); verifyErrorHandlerHandleDiscarded(message, subscription); verifySemaphoreReleased(); } @@ -355,7 +246,8 @@ public void shouldDeliverToModifiedEndpoint() { Subscription subscriptionWithModfiedEndpoint = subscriptionWithEndpoint("http://somewhere:9876"); MessageSender otherMessageSender = mock(MessageSender.class); - when(messageSenderFactory.create(subscriptionWithModfiedEndpoint)).thenReturn(otherMessageSender); + when(messageSenderFactory.create(eq(subscriptionWithModfiedEndpoint), any(ResilientMessageSender.class))) + .thenReturn(otherMessageSender); when(otherMessageSender.send(message)).thenReturn(success()); // when @@ -373,7 +265,8 @@ public void shouldDeliverToNewSenderAfterModifiedTimeout() { Subscription subscriptionWithModifiedTimeout = subscriptionWithRequestTimeout(2000); MessageSender otherMessageSender = mock(MessageSender.class); - when(messageSenderFactory.create(subscriptionWithModifiedTimeout)).thenReturn(otherMessageSender); + when(messageSenderFactory.create(eq(subscriptionWithModifiedTimeout), any(ResilientMessageSender.class))) + .thenReturn(otherMessageSender); when(otherMessageSender.send(message)).thenReturn(success()); // when @@ -449,7 +342,6 @@ public void shouldIncreaseRetryBackoffExponentially() throws InterruptedExceptio // then verifyZeroInteractions(successHandler); - verifyRateLimiterFailedSendingCountedTimes(2); } @Test @@ -492,7 +384,7 @@ public void shouldIgnoreExponentialRetryBackoffAfterExceededTtl() throws Interru } private ConsumerMessageSender consumerMessageSender(Subscription subscription) { - when(messageSenderFactory.create(subscription)).thenReturn(messageSender); + when(messageSenderFactory.create(eq(subscription), any(ResilientMessageSender.class))).thenReturn(messageSender); ConsumerMessageSender sender = new ConsumerMessageSender( subscription, messageSenderFactory, @@ -503,7 +395,7 @@ private ConsumerMessageSender consumerMessageSender(Subscription subscription) { () -> inflightSemaphore.release(), new SubscriptionMetrics(hermesMetrics, subscription.getQualifiedName()), ASYNC_TIMEOUT_MS, - new FutureAsyncTimeout<>(MessageSendingResult::failedResult, Executors.newSingleThreadScheduledExecutor()), + new FutureAsyncTimeout(Executors.newSingleThreadScheduledExecutor()), Clock.systemUTC(), new NoOpConsumerNodeLoadRegistry().register(subscription.getQualifiedName()) ); @@ -512,14 +404,6 @@ private ConsumerMessageSender consumerMessageSender(Subscription subscription) { return sender; } - private void verifyRateLimiterSuccessfulSendingCountedTimes(int count) { - verify(rateLimiter, timeout(1000).times(count)).registerSuccessfulSending(); - } - - private void verifyRateLimiterFailedSendingCountedTimes(int count) { - verify(rateLimiter, timeout(1000).times(count)).registerFailedSending(); - } - private void verifyErrorHandlerHandleFailed(Message message, Subscription subscription, int times) { verifyErrorHandlerHandleFailed(message, subscription, times, 1000); } @@ -622,6 +506,7 @@ private void verifySemaphoreReleased() { assertThat(inflightSemaphore.availablePermits()).isEqualTo(1); } + private Message message() { return messageWithTimestamp(System.currentTimeMillis()); } diff --git a/hermes-consumers/src/test/java/pl/allegro/tech/hermes/consumers/consumer/sender/MessageSenderFactoryTest.java b/hermes-consumers/src/test/java/pl/allegro/tech/hermes/consumers/consumer/sender/MessageSenderFactoryTest.java index ac8d94aee5..b439fff7a8 100644 --- a/hermes-consumers/src/test/java/pl/allegro/tech/hermes/consumers/consumer/sender/MessageSenderFactoryTest.java +++ b/hermes-consumers/src/test/java/pl/allegro/tech/hermes/consumers/consumer/sender/MessageSenderFactoryTest.java @@ -8,6 +8,7 @@ import pl.allegro.tech.hermes.api.EndpointAddress; import pl.allegro.tech.hermes.api.Subscription; import pl.allegro.tech.hermes.common.exception.EndpointProtocolNotSupportedException; +import pl.allegro.tech.hermes.consumers.consumer.ResilientMessageSender; import java.util.Set; @@ -17,7 +18,8 @@ public class MessageSenderFactoryTest { - private MessageSender referenceMessageSender = Mockito.mock(MessageSender.class); + private final MessageSender referenceMessageSender = Mockito.mock(MessageSender.class); + private final ResilientMessageSender resilientMessageSender = Mockito.mock(ResilientMessageSender.class); @Test public void shouldCreateCustomProtocolMessageSender() { @@ -29,7 +31,7 @@ public void shouldCreateCustomProtocolMessageSender() { ); // when - MessageSender sender = factory.create(subscription); + MessageSender sender = factory.create(subscription, resilientMessageSender); // then assertThat(sender).isEqualTo(referenceMessageSender); @@ -42,7 +44,7 @@ public void shouldGetProtocolNotSupportedExceptionWhenPassingUnknownUri() { Subscription subscription = subscription("group.topic", "subscription", "unknown://localhost:8080/test").build(); // when - catchException(factory).create(subscription); + catchException(factory).create(subscription, resilientMessageSender); // then assertThat(CatchException.caughtException()) @@ -52,7 +54,7 @@ public void shouldGetProtocolNotSupportedExceptionWhenPassingUnknownUri() { private ProtocolMessageSenderProvider protocolMessageSenderProviderReturning(Object createdMessageSender, String protocol) { return new ProtocolMessageSenderProvider() { @Override - public MessageSender create(Subscription endpoint) { + public MessageSender create(Subscription endpoint, ResilientMessageSender resilientMessageSender) { return (MessageSender) createdMessageSender; } diff --git a/hermes-consumers/src/test/java/pl/allegro/tech/hermes/consumers/consumer/sender/http/JettyMessageSenderTest.java b/hermes-consumers/src/test/java/pl/allegro/tech/hermes/consumers/consumer/sender/http/JettyMessageSenderTest.java index dc73da207e..a8244c1d42 100644 --- a/hermes-consumers/src/test/java/pl/allegro/tech/hermes/consumers/consumer/sender/http/JettyMessageSenderTest.java +++ b/hermes-consumers/src/test/java/pl/allegro/tech/hermes/consumers/consumer/sender/http/JettyMessageSenderTest.java @@ -92,7 +92,8 @@ public void shouldSendMessageSuccessfully() throws Exception { remoteServiceEndpoint.expectMessages(TEST_MESSAGE_CONTENT); // when - CompletableFuture future = messageSender.send(testMessage()); + CompletableFuture future = new CompletableFuture<>(); + messageSender.send(testMessage(), future); // then remoteServiceEndpoint.waitUntilReceived(); @@ -106,7 +107,8 @@ public void shouldReturnTrueWhenOtherSuccessfulCodeReturned() throws Exception { remoteServiceEndpoint.expectMessages(TEST_MESSAGE_CONTENT); // when - CompletableFuture future = messageSender.send(testMessage()); + CompletableFuture future = new CompletableFuture<>(); + messageSender.send(testMessage(), future); // then assertThat(future.get(1, TimeUnit.SECONDS).succeeded()).isTrue(); @@ -115,12 +117,13 @@ public void shouldReturnTrueWhenOtherSuccessfulCodeReturned() throws Exception { @Test public void shouldNotRedirectMessage() throws InterruptedException, ExecutionException, TimeoutException { // given - int numberOfExpectedMessages = 1; - int maximumWaitTimeInSeconds = 1; + final int numberOfExpectedMessages = 1; + final int maximumWaitTimeInSeconds = 1; remoteServiceEndpoint.redirectMessage(TEST_MESSAGE_CONTENT); // when - CompletableFuture future = messageSender.send(testMessage()); + CompletableFuture future = new CompletableFuture<>(); + messageSender.send(testMessage(), future); // then assertThat(future.get(maximumWaitTimeInSeconds, TimeUnit.SECONDS).succeeded()).isFalse(); @@ -134,7 +137,8 @@ public void shouldReturnFalseWhenSendingFails() throws Exception { remoteServiceEndpoint.expectMessages(TEST_MESSAGE_CONTENT); // when - CompletableFuture future = messageSender.send(testMessage()); + CompletableFuture future = new CompletableFuture<>(); + messageSender.send(testMessage(), future); // then remoteServiceEndpoint.waitUntilReceived(); @@ -147,7 +151,8 @@ public void shouldSendMessageIdHeader() { remoteServiceEndpoint.expectMessages(TEST_MESSAGE_CONTENT); // when - messageSender.send(testMessage()); + CompletableFuture future = new CompletableFuture<>(); + messageSender.send(testMessage(), future); // then remoteServiceEndpoint.waitUntilReceived(); @@ -160,7 +165,8 @@ public void shouldSendTraceIdHeader() { remoteServiceEndpoint.expectMessages(TEST_MESSAGE_CONTENT); // when - messageSender.send(testMessage()); + CompletableFuture future = new CompletableFuture<>(); + messageSender.send(testMessage(), future); // then remoteServiceEndpoint.waitUntilReceived(); @@ -173,10 +179,11 @@ public void shouldSendRetryCounterInHeader() throws InterruptedException, Execut remoteServiceEndpoint.expectMessages(TEST_MESSAGE_CONTENT); // when - CompletableFuture result = messageSender.send(testMessage()); + CompletableFuture future = new CompletableFuture<>(); + messageSender.send(testMessage(), future); // then - assertThat(result.get(1000, TimeUnit.MILLISECONDS).getStatusCode()).isEqualTo(200); + assertThat(future.get(1000, TimeUnit.MILLISECONDS).getStatusCode()).isEqualTo(200); remoteServiceEndpoint.waitUntilReceived(); assertThat(remoteServiceEndpoint.getLastReceivedRequest().getHeader("Hermes-Retry-Count")).isEqualTo("0"); } @@ -197,7 +204,8 @@ public void shouldSendAuthorizationHeaderIfAuthorizationProviderAttached() { remoteServiceEndpoint.expectMessages(TEST_MESSAGE_CONTENT); // when - messageSender.send(message); + CompletableFuture future = new CompletableFuture<>(); + messageSender.send(message, future); // then remoteServiceEndpoint.waitUntilReceived(); @@ -218,7 +226,9 @@ public void shouldUseSuppliedRequestTimeout() throws ExecutionException, Interru remoteServiceEndpoint.expectMessages(TEST_MESSAGE_CONTENT); // when - MessageSendingResult messageSendingResult = messageSender.send(message).get(1000, TimeUnit.MILLISECONDS); + CompletableFuture future = new CompletableFuture<>(); + messageSender.send(message, future); + MessageSendingResult messageSendingResult = future.get(1000, TimeUnit.MILLISECONDS); // then assertThat(messageSendingResult.isTimeout()).isTrue(); @@ -238,9 +248,11 @@ public void shouldUseSuppliedSocketTimeout() throws ExecutionException, Interrup remoteServiceEndpoint.expectMessages(TEST_MESSAGE_CONTENT); // when - MessageSendingResult messageSendingResult = messageSender.send(message).get(1000, TimeUnit.MILLISECONDS); + CompletableFuture future = new CompletableFuture<>(); + messageSender.send(message, future); + MessageSendingResult messageSendingResult = future.get(1000, TimeUnit.MILLISECONDS); // then assertThat(messageSendingResult.isTimeout()).isTrue(); } -} +} \ No newline at end of file diff --git a/hermes-consumers/src/test/java/pl/allegro/tech/hermes/consumers/consumer/sender/jms/JmsMessageSenderTest.java b/hermes-consumers/src/test/java/pl/allegro/tech/hermes/consumers/consumer/sender/jms/JmsMessageSenderTest.java index 1f5962e350..3c4f6032e5 100644 --- a/hermes-consumers/src/test/java/pl/allegro/tech/hermes/consumers/consumer/sender/jms/JmsMessageSenderTest.java +++ b/hermes-consumers/src/test/java/pl/allegro/tech/hermes/consumers/consumer/sender/jms/JmsMessageSenderTest.java @@ -55,10 +55,12 @@ public void setUp() throws Exception { when(jmsContextMock.createProducer()).thenReturn(jmsProducerMock); } + @Test public void shouldReturnTrueWhenMessageSuccessfullyPublished() throws Exception { // when - CompletableFuture future = messageSender.send(SOME_MESSAGE); + CompletableFuture future = new CompletableFuture<>(); + messageSender.send(SOME_MESSAGE, future); // then ArgumentCaptor listenerCaptor = ArgumentCaptor.forClass(CompletionListener.class); @@ -70,7 +72,8 @@ public void shouldReturnTrueWhenMessageSuccessfullyPublished() throws Exception @Test public void shouldReturnFalseWhenOnExceptionCalledOnListener() throws Exception { // when - CompletableFuture future = messageSender.send(SOME_MESSAGE); + CompletableFuture future = new CompletableFuture<>(); + messageSender.send(SOME_MESSAGE, future); // then ArgumentCaptor listenerCaptor = ArgumentCaptor.forClass(CompletionListener.class); @@ -85,7 +88,8 @@ public void shouldReturnFalseWhenJMSThrowsCheckedException() throws Exception { doThrow(new JMSException("test")).when(messageMock).writeBytes(SOME_MESSAGE.getData()); // when - CompletableFuture future = messageSender.send(SOME_MESSAGE); + CompletableFuture future = new CompletableFuture<>(); + messageSender.send(SOME_MESSAGE, future); // then assertFalse(future.get(1, TimeUnit.SECONDS).succeeded()); @@ -96,7 +100,8 @@ public void shouldReturnFalseWhenJMSThrowsRuntimeException() throws Exception { doThrow(new JMSRuntimeException("test")).when(jmsContextMock).createProducer(); // when - CompletableFuture future = messageSender.send(SOME_MESSAGE); + CompletableFuture future = new CompletableFuture<>(); + messageSender.send(SOME_MESSAGE, future); // then assertFalse(future.get(1, TimeUnit.SECONDS).succeeded()); @@ -105,7 +110,8 @@ public void shouldReturnFalseWhenJMSThrowsRuntimeException() throws Exception { @Test public void shouldSetMessageIdInProperty() throws JMSException { // when - messageSender.send(SOME_MESSAGE); + CompletableFuture future = new CompletableFuture<>(); + messageSender.send(SOME_MESSAGE, future); // then verify(messageMock).setStringProperty(MESSAGE_ID.getCamelCaseName(), "id"); @@ -114,9 +120,10 @@ public void shouldSetMessageIdInProperty() throws JMSException { @Test public void shouldSetMessageTraceIdInProperty() throws JMSException { // when - messageSender.send(SOME_MESSAGE); + CompletableFuture future = new CompletableFuture<>(); + messageSender.send(SOME_MESSAGE, future); // then verify(messageMock).setStringProperty("TraceId", "traceId"); } -} +} \ No newline at end of file