Skip to content

Commit ee215fc

Browse files
authored
Add timeout to cleaning up sender/receiver links. (Azure#23381)
* Add logging for Handler and do not fail fast when complete cannot be emitted on close. Updating tests.
* Add documentation to LinkHandler. Add null check. Explicitly call super.close().
* Do not call super.close() when close is called. We set the endpoint state to close, but leave the base LinkHandler to complete getEndpointStates(). Adding documentation.
* Adding tests for LinkHandler.
* Being explicit about calling handler.super(). Doing NPE checks.
* Updating log messages to say how many pending sends we are disposing of.
* Add timeout with close when disposing of RequestResponseChannel.
* Adding verbose logs to ReactorConnection.
* Adding SendLinkHandler test and ReceiveLinkHandlerTest to take care of localOpen and closes.
* Fixing break in ReactorReceiverTest.
* Adding to ReactorConnectionTest.
* Adding subscribeOn.
* Emitting complete endpoints signal.
* Adding tests for close.
* Add CHANGELOG entry.
1 parent c6e9104 commit ee215fc

File tree

15 files changed

+903
-76
lines changed

15 files changed

+903
-76
lines changed

sdk/core/azure-core-amqp/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66

77
- Fixed a bug where the SendTimeout-timer thread was not being disposed of, resulting in lingering
88
threads when a send link was remotely closed.
9+
- Fixed a bug where ReactorConnection waited indefinitely for CBS node to complete closing. The underlying problem is that the RequestResponseChannel's sender and receiver links were not active, so they would wait forever for a remote close signal.
910

1011
## 2.3.0 (2021-07-01)
1112

sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorConnection.java

Lines changed: 22 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -431,23 +431,33 @@ Mono<Void> closeAsync(AmqpShutdownSignal shutdownSignal) {
431431
Flux.fromStream(managementNodes.values().stream()).flatMap(node -> node.closeAsync()));
432432

433433
final Mono<Void> closeReactor = Mono.fromRunnable(() -> {
434+
logger.verbose("connectionId[{}] Scheduling closeConnection work.", connectionId);
434435
final ReactorDispatcher dispatcher = reactorProvider.getReactorDispatcher();
435436

436-
try {
437-
if (dispatcher != null) {
438-
dispatcher.invoke(this::closeConnectionWork);
439-
} else {
437+
if (dispatcher != null) {
438+
try {
439+
dispatcher.invoke(() -> closeConnectionWork());
440+
} catch (IOException | RejectedExecutionException e) {
441+
logger.warning("connectionId[{}] Error while scheduling closeConnection work. Manually disposing.",
442+
connectionId, e);
440443
closeConnectionWork();
441444
}
442-
} catch (IOException | RejectedExecutionException e) {
443-
logger.warning("connectionId[{}] Error while scheduling closeConnection work. Manually disposing.",
444-
connectionId, e);
445+
} else {
445446
closeConnectionWork();
446447
}
447448
});
448449

449-
return Mono.whenDelayError(cbsCloseOperation, managementNodeCloseOperations)
450-
.then(closeReactor)
450+
return Mono.whenDelayError(
451+
cbsCloseOperation.doFinally(signalType ->
452+
logger.verbose("connectionId[{}] signalType[{}] Closed CBS node.", connectionId, signalType)),
453+
managementNodeCloseOperations.doFinally(signalType ->
454+
logger.verbose("connectionId[{}] signalType[{}] Closed management nodes.", connectionId,
455+
signalType)))
456+
457+
458+
.then(closeReactor.doFinally(signalType ->
459+
logger.verbose("connectionId[{}] signalType[{}] Closed reactor dispatcher.", connectionId,
460+
signalType)))
451461
.then(isClosedMono.asMono());
452462
}
453463

@@ -469,8 +479,9 @@ private synchronized void closeConnectionWork() {
469479
final ArrayList<Mono<Void>> closingSessions = new ArrayList<>();
470480
sessionMap.values().forEach(link -> closingSessions.add(link.isClosed()));
471481

472-
final Mono<Void> closedExecutor = executor != null ? Mono.defer(() -> {
482+
final Mono<Void> closedExecutor = executor != null ? Mono.defer(() -> {
473483
synchronized (this) {
484+
logger.info("connectionId[{}] Closing executor.", connectionId);
474485
return executor.closeAsync();
475486
}
476487
}) : Mono.empty();
@@ -535,7 +546,7 @@ private synchronized Connection getOrCreateConnection() throws IOException {
535546

536547
// To avoid inconsistent synchronization of executor, we set this field with the closeAsync method.
537548
// It will not be kicked off until subscribed to.
538-
final Mono<Void> executorCloseMono = Mono.defer(() -> {
549+
final Mono<Void> executorCloseMono = Mono.defer(() -> {
539550
synchronized (this) {
540551
return executor.closeAsync();
541552
}

sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorSender.java

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -651,13 +651,13 @@ private void completeClose() {
651651
* @param error Error to pass to pending sends.
652652
*/
653653
private void handleError(Throwable error) {
654-
final String logMessage = isDisposed.getAndSet(true)
655-
? "This was already disposed. Dropping error."
656-
: "Disposing pending sends with error.";
657-
logger.verbose("connectionId[{}] entityPath[{}] linkName[{}] {}", handler.getConnectionId(), entityPath,
658-
getLinkName(), logMessage, error);
659-
660654
synchronized (pendingSendLock) {
655+
final String logMessage = isDisposed.getAndSet(true)
656+
? "This was already disposed. Dropping error."
657+
: String.format("Disposing of %s pending sends with error.", pendingSendsMap.size());
658+
logger.verbose("connectionId[{}] entityPath[{}] linkName[{}] {}", handler.getConnectionId(), entityPath,
659+
getLinkName(), logMessage, error);
660+
661661
pendingSendsMap.forEach((key, value) -> value.error(error));
662662
pendingSendsMap.clear();
663663
pendingSendsQueue.clear();
@@ -667,17 +667,18 @@ private void handleError(Throwable error) {
667667
}
668668

669669
private void handleClose() {
670-
final String logMessage = isDisposed.getAndSet(true)
671-
? "This was already disposed."
672-
: "Disposing pending sends.";
673-
logger.verbose("connectionId[{}] entityPath[{}] linkName[{}] {}", handler.getConnectionId(), entityPath,
674-
getLinkName(), logMessage);
675-
676670
final String message = String.format("Could not complete sends because link '%s' for '%s' is closed.",
677671
getLinkName(), entityPath);
678672
final AmqpErrorContext context = handler.getErrorContext(sender);
679673

680674
synchronized (pendingSendLock) {
675+
final String logMessage = isDisposed.getAndSet(true)
676+
? "This was already disposed."
677+
: String.format("Disposing of '%s' pending sends.", pendingSendsMap.size());
678+
679+
logger.verbose("connectionId[{}] entityPath[{}] linkName[{}] {}", handler.getConnectionId(), entityPath,
680+
getLinkName(), logMessage);
681+
681682
pendingSendsMap.forEach((key, value) -> value.error(new AmqpException(true, message, context)));
682683
pendingSendsMap.clear();
683684
pendingSendsQueue.clear();

sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/RequestResponseChannel.java

Lines changed: 32 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import java.util.UUID;
4141
import java.util.concurrent.ConcurrentSkipListMap;
4242
import java.util.concurrent.RejectedExecutionException;
43+
import java.util.concurrent.TimeoutException;
4344
import java.util.concurrent.atomic.AtomicBoolean;
4445
import java.util.concurrent.atomic.AtomicInteger;
4546
import java.util.concurrent.atomic.AtomicLong;
@@ -170,6 +171,8 @@ protected RequestResponseChannel(AmqpConnection amqpConnection, String connectio
170171
onTerminalState("SendLinkHandler");
171172
}),
172173

174+
//TODO (conniey): Do we need this if we already close the request response nodes when the
175+
// connection.closeWork is executed? It would be preferred to get rid of this circular dependency.
173176
amqpConnection.getShutdownSignals().next().flatMap(signal -> {
174177
logger.verbose("connectionId[{}] linkName[{}]: Shutdown signal received.", connectionId, linkName);
175178
return closeAsync();
@@ -201,15 +204,33 @@ public Flux<AmqpEndpointState> getEndpointStates() {
201204

202205
@Override
203206
public Mono<Void> closeAsync() {
207+
final Mono<Void> closeOperationWithTimeout = closeMono.asMono()
208+
.timeout(retryOptions.getTryTimeout())
209+
.onErrorResume(TimeoutException.class, error -> {
210+
return Mono.fromRunnable(() -> {
211+
logger.info("connectionId[{}] linkName[{}] Timed out waiting for RequestResponseChannel to complete"
212+
+ "closing. Manually closing.",
213+
connectionId, linkName, error);
214+
215+
onTerminalState("SendLinkHandler");
216+
onTerminalState("ReceiveLinkHandler");
217+
});
218+
})
219+
.subscribeOn(Schedulers.boundedElastic());
220+
204221
if (isDisposed.getAndSet(true)) {
205-
return closeMono.asMono().subscribeOn(Schedulers.boundedElastic());
222+
logger.verbose("connectionId[{}] linkName[{}] Channel already closed.", connectionId, linkName);
223+
return closeOperationWithTimeout;
206224
}
207225

208226
logger.verbose("connectionId[{}] linkName[{}] Closing request/response channel.", connectionId, linkName);
209227

210228
return Mono.fromRunnable(() -> {
211229
try {
212230
provider.getReactorDispatcher().invoke(() -> {
231+
logger.verbose("connectionId[{}] linkName[{}] Closing send link and receive link.",
232+
connectionId, linkName);
233+
213234
sendLink.close();
214235
receiveLink.close();
215236
});
@@ -219,7 +240,7 @@ public Mono<Void> closeAsync() {
219240
sendLink.close();
220241
receiveLink.close();
221242
}
222-
}).publishOn(Schedulers.boundedElastic()).then(closeMono.asMono());
243+
}).subscribeOn(Schedulers.boundedElastic()).then(closeOperationWithTimeout);
223244
}
224245

225246
public boolean isDisposed() {
@@ -365,12 +386,21 @@ private void handleError(Throwable error, String message) {
365386
}
366387

367388
private void onTerminalState(String handlerName) {
389+
if (pendingDisposes.get() == 0) {
390+
logger.verbose("connectionId[{}] linkName[{}]: Already disposed send/receive links.");
391+
return;
392+
}
393+
368394
final int remaining = pendingDisposes.decrementAndGet();
369395
logger.verbose("connectionId[{}] linkName[{}]: {} disposed. Remaining: {}",
370396
connectionId, linkName, handlerName, remaining);
371397

372398
if (remaining == 0) {
373399
subscriptions.dispose();
400+
401+
endpointStates.emitComplete(((signalType, emitResult) -> onEmitSinkFailure(signalType, emitResult,
402+
"Could not emit complete signal.")));
403+
374404
closeMono.emitEmpty((signalType, emitResult) -> onEmitSinkFailure(signalType, emitResult,
375405
handlerName + ". Error closing mono."));
376406
}

sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/handler/Handler.java

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -34,12 +34,12 @@ public abstract class Handler extends BaseHandler implements Closeable {
3434
* messages are brokered through an intermediary.
3535
* @param logger Logger to use for messages.
3636
*
37-
* @throws NullPointerException if {@code connectionId} or {@code hostname} is null.
37+
* @throws NullPointerException if {@code connectionId}, {@code hostname}, or {@code logger} is null.
3838
*/
3939
Handler(final String connectionId, final String hostname, ClientLogger logger) {
4040
this.connectionId = Objects.requireNonNull(connectionId, "'connectionId' cannot be null.");
4141
this.hostname = Objects.requireNonNull(hostname, "'hostname' cannot be null.");
42-
this.logger = logger;
42+
this.logger = Objects.requireNonNull(logger, "'logger' cannot be null.");
4343
}
4444

4545
/**
@@ -68,11 +68,14 @@ public String getHostname() {
6868
* @return The endpoint states of the handler.
6969
*/
7070
public Flux<EndpointState> getEndpointStates() {
71+
// In previous incarnations, we used .distinct(). It hashed all the previous values and would only push values
72+
// that were not seen yet. What we want is to push only endpoint states that differ from the previous one.
7173
return endpointStates.asFlux().distinctUntilChanged();
7274
}
7375

7476
/**
75-
* Emits the next endpoint. If the previous endpoint was emitted, it is skipped.
77+
* Emits the next endpoint state. A state identical to the previously emitted one is skipped. If the handler is closed, the
78+
* endpoint state is not emitted.
7679
*
7780
* @param state The next endpoint state to emit.
7881
*/
@@ -117,7 +120,14 @@ public void close() {
117120
return;
118121
}
119122

120-
endpointStates.emitNext(EndpointState.CLOSED, Sinks.EmitFailureHandler.FAIL_FAST);
123+
// This is fine in the case that someone called onNext(EndpointState.CLOSED) and then called handler.close().
124+
// We want to ensure that the next endpoint subscriber does not believe the handler is still alive.
125+
endpointStates.emitNext(EndpointState.CLOSED, (signalType, emitResult) -> {
126+
logger.info("connectionId[{}] signal[{}] result[{}] Could not emit closed endpoint state.", connectionId,
127+
signalType, emitResult);
128+
129+
return false;
130+
});
121131

122132
endpointStates.emitComplete((signalType, emitResult) -> {
123133
logger.verbose("connectionId[{}] result[{}] Could not emit complete.", connectionId, emitResult);

sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/handler/LinkHandler.java

Lines changed: 27 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,15 +12,36 @@
1212
import org.apache.qpid.proton.engine.Event;
1313
import org.apache.qpid.proton.engine.Link;
1414

15+
import java.util.Objects;
16+
1517
import static com.azure.core.amqp.implementation.AmqpErrorCode.TRACKING_ID_PROPERTY;
1618
import static com.azure.core.amqp.implementation.ClientConstants.NOT_APPLICABLE;
1719

20+
/**
21+
* Base class for AMQP link handlers.
22+
*
23+
* @see SendLinkHandler
24+
* @see ReceiveLinkHandler
25+
*/
1826
abstract class LinkHandler extends Handler {
1927
private final String entityPath;
2028

29+
/**
30+
* Creates an instance with the parameters.
31+
*
32+
* @param connectionId Identifier for the connection.
33+
* @param hostname Hostname of the connection. This could be the DNS hostname or the IP address of the
34+
* connection. Usually of the form {@literal "<your-namespace>.service.windows.net"} but can change if the
35+
* messages are brokered through an intermediary.
36+
* @param entityPath The address within the message broker for this link.
37+
* @param logger Logger to use for messages.
38+
*
39+
* @throws NullPointerException if {@code connectionId}, {@code hostname}, {@code entityPath}, or {@code logger} is
40+
* null.
41+
*/
2142
LinkHandler(String connectionId, String hostname, String entityPath, ClientLogger logger) {
2243
super(connectionId, hostname, logger);
23-
this.entityPath = entityPath;
44+
this.entityPath = Objects.requireNonNull(entityPath, "'entityPath' cannot be null.");
2445
}
2546

2647
@Override
@@ -51,7 +72,10 @@ public void onLinkFinal(Event event) {
5172
? event.getLink().getName()
5273
: NOT_APPLICABLE;
5374
logger.info("onLinkFinal connectionId[{}], linkName[{}]", getConnectionId(), linkName);
54-
close();
75+
76+
// Be explicit about wanting to call Handler.close(). When we receive onLinkFinal, the service and proton-j are
77+
// releasing this link. So we want to complete the endpoint states.
78+
super.close();
5579
}
5680

5781
public AmqpErrorContext getErrorContext(Link link) {
@@ -88,7 +112,7 @@ eventName, getConnectionId(), link.getName(),
88112

89113
onError(exception);
90114
} else {
91-
close();
115+
super.close();
92116
}
93117
}
94118
}

0 commit comments

Comments
 (0)