Skip to content

Commit

Permalink
Broadcast rate limiting (#1613)
Browse files Browse the repository at this point in the history
* rate limit per sent message
  • Loading branch information
moscicky authored Dec 5, 2022
1 parent 578c29e commit 4f94d9e
Show file tree
Hide file tree
Showing 25 changed files with 700 additions and 258 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ public MessageConverterResolver defaultMessageConverterResolver(AvroToJsonMessag
public ConsumerMessageSenderFactory consumerMessageSenderFactory(KafkaClustersProperties kafkaClustersProperties,
MessageSenderFactory messageSenderFactory,
Trackers trackers,
FutureAsyncTimeout<MessageSendingResult> futureAsyncTimeout,
FutureAsyncTimeout futureAsyncTimeout,
UndeliveredMessageLog undeliveredMessageLog, Clock clock,
InstrumentedExecutorServiceFactory instrumentedExecutorServiceFactory,
ConsumerAuthorizationHandler consumerAuthorizationHandler,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -261,13 +261,13 @@ public EndpointAddressResolver interpolatingEndpointAddressResolver(UriInterpola
}

@Bean
public FutureAsyncTimeout<MessageSendingResult> futureAsyncTimeoutFactory(InstrumentedExecutorServiceFactory executorFactory,
public FutureAsyncTimeout futureAsyncTimeoutFactory(InstrumentedExecutorServiceFactory executorFactory,
SenderAsyncTimeoutProperties senderAsyncTimeoutProperties) {
ScheduledExecutorService timeoutExecutorService = executorFactory.getScheduledExecutorService(
"async-timeout",
senderAsyncTimeoutProperties.getThreadPoolSize(),
senderAsyncTimeoutProperties.isThreadPoolMonitoringEnabled()
);
return new FutureAsyncTimeout<>(MessageSendingResult::failedResult, timeoutExecutorService);
return new FutureAsyncTimeout(timeoutExecutorService);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

import java.net.URI;
import java.time.Clock;
import java.time.Duration;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
Expand All @@ -38,22 +37,22 @@ public class ConsumerMessageSender {
private final ExecutorService deliveryReportingExecutor;
private final List<SuccessHandler> successHandlers;
private final List<ErrorHandler> errorHandlers;
private final SerialConsumerRateLimiter rateLimiter;
private final MessageSenderFactory messageSenderFactory;
private final Clock clock;
private final InflightsPool inflight;
private final FutureAsyncTimeout<MessageSendingResult> async;
private final int asyncTimeoutMs;
private final SubscriptionLoadRecorder loadRecorder;

private int requestTimeoutMs;
private final Timer consumerLatencyTimer;
private final SerialConsumerRateLimiter rateLimiter;
private final FutureAsyncTimeout async;
private final int asyncTimeoutMs;

private MessageSender messageSender;
private Subscription subscription;

private ScheduledExecutorService retrySingleThreadExecutor;
private volatile boolean running = true;


public ConsumerMessageSender(Subscription subscription,
MessageSenderFactory messageSenderFactory,
List<SuccessHandler> successHandlers,
Expand All @@ -63,22 +62,21 @@ public ConsumerMessageSender(Subscription subscription,
InflightsPool inflight,
SubscriptionMetrics metrics,
int asyncTimeoutMs,
FutureAsyncTimeout<MessageSendingResult> futureAsyncTimeout,
FutureAsyncTimeout futureAsyncTimeout,
Clock clock,
SubscriptionLoadRecorder loadRecorder) {
this.deliveryReportingExecutor = deliveryReportingExecutor;
this.successHandlers = successHandlers;
this.errorHandlers = errorHandlers;
this.rateLimiter = rateLimiter;
this.messageSenderFactory = messageSenderFactory;
this.clock = clock;
this.loadRecorder = loadRecorder;
this.messageSender = messageSenderFactory.create(subscription);
this.subscription = subscription;
this.inflight = inflight;
this.async = futureAsyncTimeout;
this.requestTimeoutMs = subscription.getSerialSubscriptionPolicy().getRequestTimeout();
this.rateLimiter = rateLimiter;
this.asyncTimeoutMs = asyncTimeoutMs;
this.messageSender = messageSender(subscription);
this.subscription = subscription;
this.inflight = inflight;
this.consumerLatencyTimer = metrics.subscriptionLatencyTimer();
}

Expand Down Expand Up @@ -121,18 +119,16 @@ private int calculateMessageDelay(long publishingMessageTimestamp) {
return Math.max(delay, INTEGER_ZERO);
}


/**
* Method is calling MessageSender and is registering listeners to handle response.
* Main responsibility of this method is that no message will be fully processed or rejected without release on semaphore.
*/
private void sendMessage(final Message message) {
rateLimiter.acquire();
loadRecorder.recordSingleOperation();
Timer.Context timer = consumerLatencyTimer.time();
CompletableFuture<MessageSendingResult> response = async.within(
messageSender.send(message),
Duration.ofMillis(asyncTimeoutMs + requestTimeoutMs)
);
CompletableFuture<MessageSendingResult> response = messageSender.send(message);

response.thenAcceptAsync(new ResponseHandlingListener(message, timer), deliveryReportingExecutor)
.exceptionally(e -> {
logger.error(
Expand All @@ -142,6 +138,21 @@ private void sendMessage(final Message message) {
});
}

private MessageSender messageSender(Subscription subscription) {
Integer requestTimeoutMs = subscription.getSerialSubscriptionPolicy().getRequestTimeout();
ResilientMessageSender resilientMessageSender = new ResilientMessageSender(
this.rateLimiter,
subscription,
this.async,
requestTimeoutMs,
this.asyncTimeoutMs
);

return this.messageSenderFactory.create(
subscription, resilientMessageSender
);
}

public void updateSubscription(Subscription newSubscription) {
boolean endpointUpdated = !this.subscription.getEndpoint().equals(newSubscription.getEndpoint());
boolean subscriptionPolicyUpdated = !Objects.equals(
Expand All @@ -157,14 +168,13 @@ public void updateSubscription(Subscription newSubscription) {
);

this.subscription = newSubscription;
this.requestTimeoutMs = newSubscription.getSerialSubscriptionPolicy().getRequestTimeout();

boolean httpClientChanged = this.subscription.isHttp2Enabled() != newSubscription.isHttp2Enabled();

if (endpointUpdated || subscriptionPolicyUpdated || endpointAddressResolverMetadataChanged
|| oAuthPolicyChanged || httpClientChanged) {
this.messageSender.stop();
this.messageSender = messageSenderFactory.create(newSubscription);
this.messageSender = messageSender(newSubscription);
}
}

Expand All @@ -175,20 +185,10 @@ private boolean willExceedTtl(Message message, long delay) {
}

private void handleFailedSending(Message message, MessageSendingResult result) {
registerResultInRateLimiter(result);
retrySending(message, result);
errorHandlers.forEach(h -> h.handleFailed(message, subscription, result));
}

private void registerResultInRateLimiter(MessageSendingResult result) {
if (result.ignoreInRateCalculation(subscription.getSerialSubscriptionPolicy().isRetryClientErrors(),
subscription.hasOAuthPolicy())) {
rateLimiter.registerSuccessfulSending();
} else {
rateLimiter.registerFailedSending();
}
}

private void retrySending(Message message, MessageSendingResult result) {
List<URI> succeededUris = result.getSucceededUris(ConsumerMessageSender.this::messageSentSucceeded);
message.incrementRetryCounter(succeededUris);
Expand Down Expand Up @@ -232,7 +232,6 @@ private void handleMessageDiscarding(Message message, MessageSendingResult resul
}

private void handleMessageSendingSuccess(Message message, MessageSendingResult result) {
rateLimiter.registerSuccessfulSending();
inflight.release();
successHandlers.forEach(h -> h.handleSuccess(message, subscription, result));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
import pl.allegro.tech.hermes.consumers.consumer.result.ErrorHandler;
import pl.allegro.tech.hermes.consumers.consumer.result.SuccessHandler;
import pl.allegro.tech.hermes.consumers.consumer.sender.MessageSenderFactory;
import pl.allegro.tech.hermes.consumers.consumer.sender.MessageSendingResult;
import pl.allegro.tech.hermes.consumers.consumer.sender.timeout.FutureAsyncTimeout;
import pl.allegro.tech.hermes.tracker.consumers.Trackers;

Expand All @@ -26,15 +25,15 @@ public class ConsumerMessageSenderFactory {
private final String kafkaClusterName;
private final MessageSenderFactory messageSenderFactory;
private final Trackers trackers;
private final FutureAsyncTimeout<MessageSendingResult> futureAsyncTimeout;
private final FutureAsyncTimeout futureAsyncTimeout;
private final UndeliveredMessageLog undeliveredMessageLog;
private final Clock clock;
private final ConsumerAuthorizationHandler consumerAuthorizationHandler;
private final ExecutorService rateLimiterReportingExecutor;
private final int senderAsyncTimeoutMs;

public ConsumerMessageSenderFactory(String kafkaClusterName, MessageSenderFactory messageSenderFactory,
Trackers trackers, FutureAsyncTimeout<MessageSendingResult> futureAsyncTimeout,
Trackers trackers, FutureAsyncTimeout futureAsyncTimeout,
UndeliveredMessageLog undeliveredMessageLog, Clock clock,
InstrumentedExecutorServiceFactory instrumentedExecutorServiceFactory,
ConsumerAuthorizationHandler consumerAuthorizationHandler,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package pl.allegro.tech.hermes.consumers.consumer;

import pl.allegro.tech.hermes.api.Subscription;
import pl.allegro.tech.hermes.consumers.consumer.rate.ConsumerRateLimiter;
import pl.allegro.tech.hermes.consumers.consumer.sender.MessageSendingResult;
import pl.allegro.tech.hermes.consumers.consumer.sender.timeout.FutureAsyncTimeout;

import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;

public class ResilientMessageSender {
private final ConsumerRateLimiter rateLimiter;
private final List<Predicate<MessageSendingResult>> ignore;
private final FutureAsyncTimeout async;
private final int requestTimeoutMs;
private final int asyncTimeoutMs;

public ResilientMessageSender(ConsumerRateLimiter rateLimiter,
Subscription subscription,
FutureAsyncTimeout async,
int requestTimeoutMs,
int asyncTimeoutMs) {
this.rateLimiter = rateLimiter;
this.ignore = ignorableErrors(subscription);
this.async = async;
this.requestTimeoutMs = requestTimeoutMs;
this.asyncTimeoutMs = asyncTimeoutMs;
}

private static List<Predicate<MessageSendingResult>> ignorableErrors(Subscription subscription) {
Predicate<MessageSendingResult> ignore =
result -> result.ignoreInRateCalculation(
subscription.getSerialSubscriptionPolicy().isRetryClientErrors(),
subscription.hasOAuthPolicy()
);
return Collections.singletonList(ignore);
}

public <T extends MessageSendingResult> CompletableFuture<T> send(
Consumer<CompletableFuture<T>> resultFutureConsumer,
Function<Throwable, T> exceptionMapper
) {
try {
rateLimiter.acquire();
CompletableFuture<T> resultFuture = new CompletableFuture<>();
resultFutureConsumer.accept(resultFuture);
CompletableFuture<T> timeoutGuardedResultFuture = async.within(
resultFuture,
Duration.ofMillis(asyncTimeoutMs + requestTimeoutMs),
exceptionMapper);
return withCompletionHandle(timeoutGuardedResultFuture, exceptionMapper);
} catch (Exception e) {
rateLimiter.registerFailedSending();
return CompletableFuture.completedFuture(exceptionMapper.apply(e));
}
}

private <T extends MessageSendingResult> CompletableFuture<T> withCompletionHandle(
CompletableFuture<T> future,
Function<Throwable, T> exceptionMapper
) {
return future.handle((result, throwable) -> {
if (throwable != null) {
rateLimiter.registerFailedSending();
return exceptionMapper.apply(throwable);
} else {
if (result.succeeded()) {
rateLimiter.registerSuccessfulSending();
} else {
registerResultInRateLimiter(result);
}
return result;
}
});
}

private void registerResultInRateLimiter(MessageSendingResult result) {
if (ignore.stream().anyMatch(p -> p.test(result))) {
rateLimiter.registerSuccessfulSending();
} else {
rateLimiter.registerFailedSending();
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,9 @@

import java.util.concurrent.CompletableFuture;

public abstract class CompletableFutureAwareMessageSender implements MessageSender {
public interface CompletableFutureAwareMessageSender {

@Override
public CompletableFuture<MessageSendingResult> send(Message message) {
try {
CompletableFuture<MessageSendingResult> resultFuture = new CompletableFuture<>();
sendMessage(message, resultFuture);
return resultFuture;
} catch (Exception e) {
return CompletableFuture.completedFuture(MessageSendingResult.failedResult(e));
}
}

protected abstract void sendMessage(Message message, CompletableFuture<MessageSendingResult> resultFuture);
void send(Message message, CompletableFuture<MessageSendingResult> resultFuture);

void stop();
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import pl.allegro.tech.hermes.api.Subscription;
import pl.allegro.tech.hermes.common.exception.EndpointProtocolNotSupportedException;
import pl.allegro.tech.hermes.common.exception.InternalProcessingException;
import pl.allegro.tech.hermes.consumers.consumer.ResilientMessageSender;

import java.util.HashMap;
import java.util.List;
Expand All @@ -21,14 +22,14 @@ public MessageSenderFactory(List<ProtocolMessageSenderProvider> providers) {
}
}

public MessageSender create(Subscription subscription) {
public MessageSender create(Subscription subscription, ResilientMessageSender resilientMessageSender) {
EndpointAddress endpoint = subscription.getEndpoint();

ProtocolMessageSenderProvider provider = protocolProviders.get(endpoint.getProtocol());
if (provider == null) {
throw new EndpointProtocolNotSupportedException(endpoint);
}
return provider.create(subscription);
return provider.create(subscription, resilientMessageSender);
}

private void addSupportedProtocol(String protocol, ProtocolMessageSenderProvider provider) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
package pl.allegro.tech.hermes.consumers.consumer.sender;

import pl.allegro.tech.hermes.api.Subscription;
import pl.allegro.tech.hermes.consumers.consumer.ResilientMessageSender;

import java.util.Set;

public interface ProtocolMessageSenderProvider {

MessageSender create(Subscription subscription);
MessageSender create(Subscription subscription, ResilientMessageSender resilientMessageSender);

Set<String> getSupportedProtocols();

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package pl.allegro.tech.hermes.consumers.consumer.sender;

import pl.allegro.tech.hermes.consumers.consumer.Message;
import pl.allegro.tech.hermes.consumers.consumer.ResilientMessageSender;

import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

public class SingleRecipientMessageSenderAdapter implements MessageSender {
private final CompletableFutureAwareMessageSender adaptee;
private final Function<Throwable, MessageSendingResult> exceptionMapper = MessageSendingResult::failedResult;

private final ResilientMessageSender resilientMessageSender;

public SingleRecipientMessageSenderAdapter(CompletableFutureAwareMessageSender adaptee, ResilientMessageSender resilientMessageSender) {
this.resilientMessageSender = resilientMessageSender;
this.adaptee = adaptee;
}

@Override
public CompletableFuture<MessageSendingResult> send(Message message) {
return resilientMessageSender.send(
resultFuture -> adaptee.send(message, resultFuture),
exceptionMapper
);
}

@Override
public void stop() {
adaptee.stop();
}
}
Loading

0 comments on commit 4f94d9e

Please sign in to comment.