Skip to content

Commit 01dd7df

Browse files
authored
Fixes OOM when receiving from Event Hubs (Azure#20832)
* Add check for messageQueue.size() * Fixing message format. * Adding more credits if there are none left on the link. * Enable disabled parallel test. * Consolidating add credits logic. * Adding test that we add credits correctly on new requests. * Add documentation to link processor.
1 parent 1559b99 commit 01dd7df

File tree

5 files changed

+264
-111
lines changed

5 files changed

+264
-111
lines changed

sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/implementation/AmqpReceiveLinkProcessor.java

Lines changed: 104 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
3232
import java.util.concurrent.atomic.AtomicReference;
3333
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
34+
import java.util.function.Supplier;
3435

3536
/**
3637
* Processes AMQP receive links into a stream of AMQP messages.
@@ -41,6 +42,7 @@ public class AmqpReceiveLinkProcessor extends FluxProcessor<AmqpReceiveLink, Mes
4142
private final AtomicBoolean isTerminated = new AtomicBoolean();
4243
private final AtomicInteger retryAttempts = new AtomicInteger();
4344
private final Deque<Message> messageQueue = new ConcurrentLinkedDeque<>();
45+
private final AtomicBoolean linkHasNoCredits = new AtomicBoolean();
4446
private final Object creditsAdded = new Object();
4547

4648
private final AtomicReference<CoreSubscriber<? super Message>> downstream = new AtomicReference<>();
@@ -163,28 +165,48 @@ public void onNext(AmqpReceiveLink next) {
163165
currentLink = next;
164166
currentLinkName = next.getLinkName();
165167

166-
// For a new link, add the prefetch as credits.
167-
next.setEmptyCreditListener(this::getCreditsToAdd);
168+
// Empty credit listener is invoked when there are no credits left on the underlying link.
169+
next.setEmptyCreditListener(() -> {
170+
final int credits;
171+
synchronized (creditsAdded) {
172+
credits = getCreditsToAdd();
173+
174+
// This means that considering the downstream request and current size of the message queue, we
175+
// have enough messages to satisfy them.
176+
// Thus, there are no credits on the link AND we are not going to add anymore.
177+
// We'll wait until the next time downstream calls request(long) to get more events.
178+
if (credits < 1) {
179+
linkHasNoCredits.compareAndSet(false, true);
180+
} else {
181+
logger.info("linkName[{}] entityPath[{}] credits[{}] Link is empty. Adding more credits.",
182+
linkName, entityPath, credits);
183+
}
184+
}
185+
186+
return credits;
187+
});
168188

169189
currentLinkSubscriptions = Disposables.composite(
190+
// For a new link, add the prefetch as credits.
170191
next.getEndpointStates().filter(e -> e == AmqpEndpointState.ACTIVE).next()
171192
.flatMap(state -> {
172193
// If there was already a subscriber downstream who made a request, see if that is more than
173194
// the prefetch. If it is, then add the difference. (ie. if they requested 500, but our
174195
// prefetch is 100, we'll add 500 credits rather than 100.
175-
final int creditsToAdd = getCreditsToAdd();
176-
final int total = Math.max(prefetch, creditsToAdd);
177-
178-
logger.verbose("linkName[{}] prefetch[{}] creditsToAdd[{}] Adding initial credits.",
179-
linkName, prefetch, creditsToAdd);
196+
final Mono<Void> operation;
197+
synchronized (creditsAdded) {
198+
final int creditsToAdd = getCreditsToAdd();
199+
final int total = Math.max(prefetch, creditsToAdd);
200+
201+
logger.verbose("linkName[{}] prefetch[{}] creditsToAdd[{}] Adding initial credits.",
202+
linkName, prefetch, creditsToAdd);
203+
operation = next.addCredits(total);
204+
}
180205

181-
return next.addCredits(total);
182-
})
183-
.onErrorResume(IllegalStateException.class, error -> {
184-
logger.info("linkName[{}] was already closed. Could not add credits.", linkName);
185-
return Mono.empty();
206+
return operation;
186207
})
187-
.subscribe(),
208+
.subscribe(noop -> {
209+
}, error -> logger.info("linkName[{}] was already closed. Could not add credits.", linkName)),
188210
next.getEndpointStates().subscribeOn(Schedulers.boundedElastic()).subscribe(
189211
state -> {
190212
// Connection was successfully opened, we can reset the retry interval.
@@ -356,26 +378,7 @@ public void request(long request) {
356378

357379
Operators.addCap(REQUESTED, this, request);
358380

359-
synchronized (creditsAdded) {
360-
final AmqpReceiveLink link = currentLink;
361-
final int credits = getCreditsToAdd();
362-
363-
logger.verbose("linkName[{}] entityPath[{}] request[{}] credits[{}] Backpressure request from downstream.",
364-
currentLinkName, entityPath, request, credits);
365-
366-
if (link != null) {
367-
link.addCredits(credits)
368-
.onErrorResume(IllegalStateException.class, error -> {
369-
logger.info("linkName[{}] was already closed. Could not add credits.", link.getLinkName());
370-
return Mono.empty();
371-
})
372-
.subscribe();
373-
} else {
374-
logger.verbose("entityPath[{}] credits[{}] totalRequest[{}] totalSent[{}] totalCredits[{}] "
375-
+ "There is no link to add credits to, yet.", entityPath, credits);
376-
}
377-
}
378-
381+
addCreditsToLink("Backpressure request from downstream. Request: " + request);
379382
drain();
380383
}
381384

@@ -482,7 +485,8 @@ private void drainQueue() {
482485
try {
483486
subscriber.onNext(message);
484487
} catch (Exception e) {
485-
logger.error("Exception occurred while handling downstream onNext operation.", e);
488+
logger.error("linkName[{}] entityPath[{}] Exception occurred while handling downstream onNext "
489+
+ "operation.", currentLinkName, entityPath, e);
486490
throw logger.logExceptionAsError(Exceptions.propagate(
487491
Operators.onOperatorError(upstream, e, message, subscriber.currentContext())));
488492
}
@@ -496,6 +500,10 @@ private void drainQueue() {
496500
numberRequested = REQUESTED.addAndGet(this, -numberEmitted);
497501
}
498502
}
503+
504+
if (numberRequested > 0L && isEmpty) {
505+
addCreditsToLink("Adding more credits in drain loop.");
506+
}
499507
}
500508

501509
private boolean checkAndSetTerminated() {
@@ -517,22 +525,72 @@ private boolean checkAndSetTerminated() {
517525
return true;
518526
}
519527

520-
private int getCreditsToAdd() {
528+
/**
529+
* Consolidates all credits calculation when checking to see if more should be added. This is invoked in
530+
* {@link #drainQueue()} and {@link #request(long)}.
531+
*
532+
* Calculates if there are enough credits to satisfy the downstream subscriber. If there is not AND the link has no
533+
* more credits, we will add them onto the link.
534+
*
535+
* In the case that the link has some credits, but _not_ enough to satisfy the request, when the link is empty, it
536+
* will call {@link AmqpReceiveLink#setEmptyCreditListener(Supplier)} to get how much is remaining.
537+
*
538+
* @param message Additional message for context.
539+
*/
540+
private void addCreditsToLink(String message) {
521541
synchronized (creditsAdded) {
522-
final CoreSubscriber<? super Message> subscriber = downstream.get();
523-
final long request = REQUESTED.get(this);
524-
525-
final int credits;
526-
if (subscriber == null || request == 0) {
527-
credits = 0;
528-
} else if (request == Long.MAX_VALUE) {
529-
credits = 1;
530-
} else {
531-
credits = Long.valueOf(request).intValue();
542+
final AmqpReceiveLink link = currentLink;
543+
final int credits = getCreditsToAdd();
544+
545+
if (link == null) {
546+
logger.verbose("entityPath[{}] creditsToAdd[{}] There is no link to add credits to.",
547+
entityPath, credits);
548+
return;
549+
}
550+
551+
final String linkName = link.getLinkName();
552+
553+
if (credits < 1) {
554+
logger.verbose("linkName[{}] entityPath[{}] creditsToAdd[{}] There are no additional credits to add.",
555+
linkName, entityPath, credits);
556+
return;
557+
}
558+
559+
if (linkHasNoCredits.compareAndSet(true, false)) {
560+
logger.info("linkName[{}] entityPath[{}] creditsToAdd[{}] There are no more credits on link."
561+
+ " Adding more. {}", linkName, entityPath, credits, message);
562+
563+
link.addCredits(credits).subscribe(noop -> {
564+
}, error -> {
565+
logger.info("linkName[{}] entityPath[{}] was already closed. Could not add credits.",
566+
linkName, entityPath);
567+
linkHasNoCredits.compareAndSet(false, true);
568+
});
532569
}
570+
}
571+
}
533572

534-
return credits;
573+
/**
574+
* Gets the number of credits to add based on {@link #requested} and how many messages are still in queue.
575+
* If {@link #requested} is {@link Long#MAX_VALUE}, then we add credits 1 by 1. Similar to Track 1's behaviour.
576+
*
577+
* @return The number of credits to add.
578+
*/
579+
private int getCreditsToAdd() {
580+
final CoreSubscriber<? super Message> subscriber = downstream.get();
581+
final long request = REQUESTED.get(this);
582+
583+
final int credits;
584+
if (subscriber == null || request == 0) {
585+
credits = 0;
586+
} else if (request == Long.MAX_VALUE) {
587+
credits = 1;
588+
} else {
589+
final int remaining = Long.valueOf(request).intValue() - messageQueue.size();
590+
credits = Math.max(remaining, 0);
535591
}
592+
593+
return credits;
536594
}
537595

538596
private void disposeReceiver(AmqpReceiveLink link) {

sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubAsyncClientIntegrationTest.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77
import com.azure.core.util.logging.ClientLogger;
88
import com.azure.messaging.eventhubs.models.EventPosition;
99
import org.junit.jupiter.api.Assertions;
10-
import org.junit.jupiter.api.Disabled;
1110
import org.junit.jupiter.api.Tag;
1211
import org.junit.jupiter.api.Test;
1312
import org.junit.jupiter.params.ParameterizedTest;
@@ -75,7 +74,6 @@ void receiveMessage(AmqpTransportType transportType) {
7574
*/
7675
@ParameterizedTest
7776
@EnumSource(value = AmqpTransportType.class)
78-
@Disabled("Works part of the time: https://github.com/Azure/azure-sdk-for-java/issues/9659")
7977
void parallelEventHubClients(AmqpTransportType transportType) throws InterruptedException {
8078
// Arrange
8179
final int numberOfClients = 3;

sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubConsumerAsyncClientIntegrationTest.java

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -596,6 +596,50 @@ void canReceiveWithBackpressure() {
596596
}
597597
}
598598

599+
/**
600+
* Verify that when we specify a small prefetch, it continues to fetch items.
601+
*/
602+
@Test
603+
void receivesWithSmallPrefetch() {
604+
// Arrange
605+
final String secondPartitionId = "2";
606+
final AtomicBoolean isActive = new AtomicBoolean(true);
607+
final EventHubProducerAsyncClient producer = builder.buildAsyncProducerClient();
608+
final Disposable producerEvents = getEvents(isActive)
609+
.flatMap(event -> producer.send(event, new SendOptions().setPartitionId(secondPartitionId)))
610+
.subscribe(
611+
sent -> {
612+
},
613+
error -> logger.error("Error sending event", error),
614+
() -> logger.info("Event sent."));
615+
616+
final int prefetch = 5;
617+
final int backpressure = 3;
618+
final int batchSize = 10;
619+
final EventHubConsumerAsyncClient consumer = builder
620+
.prefetchCount(prefetch)
621+
.buildAsyncConsumerClient();
622+
623+
// Act & Assert
624+
try {
625+
StepVerifier.create(consumer.receiveFromPartition(secondPartitionId, EventPosition.latest()), prefetch)
626+
.expectNextCount(prefetch)
627+
.thenRequest(backpressure)
628+
.expectNextCount(backpressure)
629+
.thenRequest(batchSize)
630+
.expectNextCount(batchSize)
631+
.thenRequest(batchSize)
632+
.expectNextCount(batchSize)
633+
.thenAwait(Duration.ofSeconds(1))
634+
.thenCancel()
635+
.verify(TIMEOUT);
636+
} finally {
637+
isActive.set(false);
638+
producerEvents.dispose();
639+
dispose(producer, consumer);
640+
}
641+
}
642+
599643
private static void assertPartitionEvent(PartitionEvent event, String eventHubName, Set<Integer> allPartitions,
600644
Set<Integer> expectedPartitions) {
601645
final PartitionContext context = event.getPartitionContext();

sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/SetPrefetchCountTest.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77
import com.azure.messaging.eventhubs.models.CreateBatchOptions;
88
import com.azure.messaging.eventhubs.models.EventPosition;
99
import org.junit.jupiter.api.Assertions;
10-
import org.junit.jupiter.api.Disabled;
1110
import org.junit.jupiter.api.Tag;
1211
import org.junit.jupiter.api.Test;
1312
import reactor.test.StepVerifier;
@@ -23,8 +22,6 @@
2322
/**
2423
* Verifies we can use various prefetch options with {@link EventHubConsumerAsyncClient}.
2524
*/
26-
@Disabled("Set prefetch tests do not work because they try to send very large number of events at once."
27-
+ "https://github.com/Azure/azure-sdk-for-java/issues/9659")
2825
@Tag(TestUtils.INTEGRATION)
2926
class SetPrefetchCountTest extends IntegrationTestBase {
3027
private static final String PARTITION_ID = "3";

0 commit comments

Comments
 (0)