Skip to content

Commit a05fd90

Browse files
authored
[Service Bus Client] Detect Closed Shared Connection (Azure#20541)
The focus of these changes is to guard against a shared connection being closed when the `ServiceBusClient` that spawned a sender/receiver/processor is. Previously, the sender and receiver did not validate the connection status and would trigger exception behavior in the AMQP library, where the type of exception could vary depending on the library's internal state. This scenario will now trigger a consistent `ObjectDisposedException` with an error message stating that the connection was closed. The processor would trigger the AMQP exception and fail to recognize it as fatal; signaling the customer error handler and continuing to retry forever, though it cannot recover. The exception surfaced did not provide customers enough context to differentiate between the fatal scenario and one that was recoverable. The processor will now detect the connection being closed and interpret that as fatal, signaling the customer exception handler with an `ObjectDisposedException` with explicit message indicating an unrecoverable scenario and will then stop processing.
1 parent 9ff47f0 commit a05fd90

22 files changed

+1141
-66
lines changed

sdk/servicebus/Azure.Messaging.ServiceBus/src/Administration/ServiceBusAdministrationClient.cs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33

44
using System;
55
using System.Globalization;
6-
using System.IO;
76
using System.Threading;
87
using System.Threading.Tasks;
98
using Azure.Core;

sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpConnectionScope.cs

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,9 @@ internal class AmqpConnectionScope : TransportConnectionScope
3737
/// <summary>The URI scheme to apply when using web sockets for service communication.</summary>
3838
private const string WebSocketsUriScheme = "wss";
3939

40+
/// <summary>Indicates whether or not this instance has been disposed.</summary>
41+
private volatile bool _disposed;
42+
4043
/// <summary>
4144
/// The version of AMQP to use within the scope.
4245
/// </summary>
@@ -83,7 +86,12 @@ internal class AmqpConnectionScope : TransportConnectionScope
8386
/// </summary>
8487
///
8588
/// <value><c>true</c> if disposed; otherwise, <c>false</c>.</value>
86-
public override bool IsDisposed { get; protected set; }
89+
///
90+
public override bool IsDisposed
91+
{
92+
get => _disposed;
93+
protected set => _disposed = value;
94+
}
8795

8896
/// <summary>
8997
/// The cancellation token to use with operations initiated by the scope.
@@ -254,6 +262,7 @@ public virtual async Task<RequestResponseAmqpLink> OpenManagementLinkAsync(
254262
ServiceBusEventSource.Log.CreateManagementLinkStart(identifier);
255263
try
256264
{
265+
Argument.AssertNotDisposed(_disposed, nameof(AmqpConnectionScope));
257266
cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
258267

259268
var stopWatch = ValueStopwatch.StartNew();
@@ -303,6 +312,7 @@ public virtual async Task<ReceivingAmqpLink> OpenReceiverLinkAsync(
303312
bool isSessionReceiver,
304313
CancellationToken cancellationToken)
305314
{
315+
Argument.AssertNotDisposed(_disposed, nameof(AmqpConnectionScope));
306316
cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
307317

308318
var stopWatch = ValueStopwatch.StartNew();
@@ -347,6 +357,7 @@ public virtual async Task<SendingAmqpLink> OpenSenderLinkAsync(
347357
TimeSpan timeout,
348358
CancellationToken cancellationToken)
349359
{
360+
Argument.AssertNotDisposed(_disposed, nameof(AmqpConnectionScope));
350361
cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
351362
var stopWatch = ValueStopwatch.StartNew();
352363

sdk/servicebus/Azure.Messaging.ServiceBus/src/Diagnostics/ServiceBusEventSource.cs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -190,6 +190,8 @@ internal ServiceBusEventSource() { }
190190
internal const int RequestAuthorizationCompleteEvent = 106;
191191
internal const int RequestAuthorizationExceptionEvent = 107;
192192

193+
internal const int ProcessorClientClosedExceptionEvent = 108;
194+
193195
#endregion
194196
// add new event numbers here incrementing from previous
195197

@@ -778,6 +780,15 @@ public void ProcessorMessageHandlerException(string identifier, long sequenceNum
778780
}
779781
}
780782

783+
[Event(ProcessorClientClosedExceptionEvent, Level = EventLevel.Error, Message = "{0}: The Service Bus client associated with the processor was closed by the host application. The processor cannot continue and is shutting down.")]
784+
public void ProcessorClientClosedException(string identifier)
785+
{
786+
if (IsEnabled())
787+
{
788+
WriteEvent(ProcessorClientClosedExceptionEvent, identifier);
789+
}
790+
}
791+
781792
#endregion region
782793

783794
#region Rule management

sdk/servicebus/Azure.Messaging.ServiceBus/src/Processor/ReceiverManager.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ public virtual async Task ReceiveAndProcessMessagesAsync(CancellationToken cance
7777
try
7878
{
7979
// loop within the context of this thread
80-
while (!cancellationToken.IsCancellationRequested)
80+
while (!cancellationToken.IsCancellationRequested && !Processor.Connection.IsClosed)
8181
{
8282
errorSource = ServiceBusErrorSource.Receive;
8383
ServiceBusReceivedMessage message = await Receiver.ReceiveMessageAsync(

sdk/servicebus/Azure.Messaging.ServiceBus/src/Processor/ServiceBusProcessor.cs

100755100644
Lines changed: 41 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -434,7 +434,7 @@ internal event Func<ProcessSessionEventArgs, Task> SessionClosingAsync
434434

435435
/// <summary>
436436
/// Invokes the process message event handler after a message has been received.
437-
/// This method can be overriden to raise an event manually for testing purposes.
437+
/// This method can be overridden to raise an event manually for testing purposes.
438438
/// </summary>
439439
/// <param name="args">The event args containing information related to the message.</param>
440440
protected internal virtual async Task OnProcessMessageAsync(ProcessMessageEventArgs args)
@@ -443,8 +443,8 @@ protected internal virtual async Task OnProcessMessageAsync(ProcessMessageEventA
443443
}
444444

445445
/// <summary>
446-
/// Invokes the error event handler when an error has occured during processing.
447-
/// This method can be overriden to raise an event manually for testing purposes.
446+
/// Invokes the error event handler when an error has occurred during processing.
447+
/// This method can be overridden to raise an event manually for testing purposes.
448448
/// </summary>
449449
/// <param name="args">The event args containing information related to the error.</param>
450450
protected internal async virtual Task OnProcessErrorAsync(ProcessErrorEventArgs args)
@@ -486,6 +486,7 @@ public virtual async Task StartProcessingAsync(
486486
CancellationToken cancellationToken = default)
487487
{
488488
Argument.AssertNotDisposed(IsClosed, nameof(ServiceBusProcessor));
489+
Connection.ThrowIfClosed();
489490
cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
490491
bool releaseGuard = false;
491492
try
@@ -672,7 +673,7 @@ private async Task RunReceiveTaskAsync(
672673
List<Task> tasks = new List<Task>(_maxConcurrentCalls + _receiverManagers.Count);
673674
try
674675
{
675-
while (!cancellationToken.IsCancellationRequested)
676+
while (!cancellationToken.IsCancellationRequested && !Connection.IsClosed)
676677
{
677678
foreach (ReceiverManager receiverManager in _receiverManagers)
678679
{
@@ -692,6 +693,42 @@ private async Task RunReceiveTaskAsync(
692693
tasks.RemoveAll(t => t.IsCompleted);
693694
}
694695
}
696+
697+
// If the main loop is aborting due to the connection being canceled, then
698+
// force the processor to stop.
699+
if (!cancellationToken.IsCancellationRequested && Connection.IsClosed)
700+
{
701+
Logger.ProcessorClientClosedException(Identifier);
702+
703+
// Because this is a highly unusual situation
704+
// and goes against the goal of resiliency for the processor, surface
705+
// a fatal exception with an explicit message detailing that processing
706+
// will be stopped.
707+
try
708+
{
709+
await OnProcessErrorAsync(
710+
new ProcessErrorEventArgs(
711+
new ObjectDisposedException(nameof(ServiceBusConnection), Resources.DisposedConnectionMessageProcessorMustStop),
712+
ServiceBusErrorSource.Receive,
713+
FullyQualifiedNamespace,
714+
EntityPath,
715+
cancellationToken))
716+
.ConfigureAwait(false);
717+
}
718+
catch (Exception ex)
719+
{
720+
// Don't bubble up exceptions raised from customer exception handler.
721+
Logger.ProcessorErrorHandlerThrewException(ex.ToString());
722+
}
723+
724+
// This call will deadlock if awaited, as StopProcessingAsync awaits
725+
// completion of this task. The processor is already known to be in a bad
726+
// state and exceptions in StopProcessingAsync are likely and will be logged
727+
// in that call. There is little value in surfacing those expected exceptions
728+
// to the customer error handler as well; allow StopProcessingAsync to run
729+
// in a fire-and-forget manner.
730+
_ = StopProcessingAsync();
731+
}
695732
}
696733
finally
697734
{

sdk/servicebus/Azure.Messaging.ServiceBus/src/Receiver/ServiceBusReceiver.cs

100755100644
Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -252,6 +252,8 @@ public virtual async Task<IReadOnlyList<ServiceBusReceivedMessage>> ReceiveMessa
252252
{
253253
Argument.AssertAtLeast(maxMessages, 1, nameof(maxMessages));
254254
Argument.AssertNotDisposed(IsClosed, nameof(ServiceBusReceiver));
255+
_connection.ThrowIfClosed();
256+
255257
if (maxWaitTime.HasValue)
256258
{
257259
Argument.AssertPositive(maxWaitTime.Value, nameof(maxWaitTime));
@@ -438,8 +440,9 @@ private async Task<IReadOnlyList<ServiceBusReceivedMessage>> PeekMessagesInterna
438440
int maxMessages,
439441
CancellationToken cancellationToken)
440442
{
441-
Argument.AssertNotDisposed(IsClosed, nameof(ServiceBusReceiver));
442443
Argument.AssertAtLeast(maxMessages, 1, nameof(maxMessages));
444+
Argument.AssertNotDisposed(IsClosed, nameof(ServiceBusReceiver));
445+
_connection.ThrowIfClosed();
443446
cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
444447

445448
Logger.PeekMessageStart(Identifier, sequenceNumber, maxMessages);
@@ -527,6 +530,7 @@ internal virtual async Task CompleteMessageAsync(
527530
CancellationToken cancellationToken = default)
528531
{
529532
Argument.AssertNotDisposed(IsClosed, nameof(ServiceBusReceiver));
533+
_connection.ThrowIfClosed();
530534
ThrowIfNotPeekLockMode();
531535
ThrowIfLockTokenIsEmpty(lockToken);
532536
cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
@@ -614,6 +618,7 @@ internal virtual async Task AbandonMessageAsync(
614618
CancellationToken cancellationToken = default)
615619
{
616620
Argument.AssertNotDisposed(IsClosed, nameof(ServiceBusReceiver));
621+
_connection.ThrowIfClosed();
617622
ThrowIfNotPeekLockMode();
618623
ThrowIfLockTokenIsEmpty(lockToken);
619624
cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
@@ -798,6 +803,7 @@ private async Task DeadLetterInternalAsync(
798803
CancellationToken cancellationToken = default)
799804
{
800805
Argument.AssertNotDisposed(IsClosed, nameof(ServiceBusReceiver));
806+
_connection.ThrowIfClosed();
801807
ThrowIfNotPeekLockMode();
802808
ThrowIfLockTokenIsEmpty(lockToken);
803809
cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
@@ -891,6 +897,7 @@ internal virtual async Task DeferMessageAsync(
891897
CancellationToken cancellationToken = default)
892898
{
893899
Argument.AssertNotDisposed(IsClosed, nameof(ServiceBusReceiver));
900+
_connection.ThrowIfClosed();
894901
ThrowIfNotPeekLockMode();
895902
ThrowIfLockTokenIsEmpty(lockToken);
896903
cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
@@ -978,8 +985,9 @@ public virtual async Task<IReadOnlyList<ServiceBusReceivedMessage>> ReceiveDefer
978985
IEnumerable<long> sequenceNumbers,
979986
CancellationToken cancellationToken = default)
980987
{
981-
Argument.AssertNotDisposed(IsClosed, nameof(ServiceBusReceiver));
982988
Argument.AssertNotNull(sequenceNumbers, nameof(sequenceNumbers));
989+
Argument.AssertNotDisposed(IsClosed, nameof(ServiceBusReceiver));
990+
_connection.ThrowIfClosed();
983991
cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
984992

985993
// the sequence numbers MUST be in array form for them to be encoded correctly
@@ -1067,6 +1075,7 @@ internal virtual async Task<DateTimeOffset> RenewMessageLockAsync(
10671075
CancellationToken cancellationToken = default)
10681076
{
10691077
Argument.AssertNotDisposed(IsClosed, nameof(ServiceBusReceiver));
1078+
_connection.ThrowIfClosed();
10701079
ThrowIfNotPeekLockMode();
10711080
ThrowIfSessionReceiver();
10721081
ThrowIfLockTokenIsEmpty(lockToken);

sdk/servicebus/Azure.Messaging.ServiceBus/src/Receiver/ServiceBusSessionReceiver.cs

100755100644
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,13 @@ namespace Azure.Messaging.ServiceBus
2020
/// </summary>
2121
public class ServiceBusSessionReceiver : ServiceBusReceiver
2222
{
23+
/// <summary>
24+
/// The active connection to the Azure Service Bus service, enabling client communications for metadata
25+
/// about the associated Service Bus entity and access to transport-aware receivers.
26+
/// </summary>
27+
///
28+
private readonly ServiceBusConnection _connection;
29+
2330
/// <summary>
2431
/// The Session Id associated with the receiver.
2532
/// </summary>
@@ -89,6 +96,7 @@ internal ServiceBusSessionReceiver(
8996
string sessionId = default) :
9097
base(connection, entityPath, true, plugins, options?.ToReceiverOptions(), sessionId, cancellationToken)
9198
{
99+
_connection = connection;
92100
}
93101

94102
/// <summary>
@@ -111,6 +119,7 @@ protected ServiceBusSessionReceiver() : base() { }
111119
public virtual async Task<BinaryData> GetSessionStateAsync(CancellationToken cancellationToken = default)
112120
{
113121
Argument.AssertNotDisposed(IsClosed, nameof(ServiceBusSessionReceiver));
122+
_connection.ThrowIfClosed();
114123
cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
115124
Logger.GetSessionStateStart(Identifier, SessionId);
116125
using DiagnosticScope scope = ScopeFactory.CreateScope(
@@ -154,6 +163,7 @@ public virtual async Task SetSessionStateAsync(
154163
CancellationToken cancellationToken = default)
155164
{
156165
Argument.AssertNotDisposed(IsClosed, nameof(ServiceBusSessionReceiver));
166+
_connection.ThrowIfClosed();
157167
cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
158168
Logger.SetSessionStateStart(Identifier, SessionId);
159169
using DiagnosticScope scope = ScopeFactory.CreateScope(

sdk/servicebus/Azure.Messaging.ServiceBus/src/Resources.Designer.cs

Lines changed: 9 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

sdk/servicebus/Azure.Messaging.ServiceBus/src/Resources.resx

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -309,4 +309,7 @@
309309
<data name="InvalidAmqpMessageValueBody" xml:space="preserve">
310310
<value>{0} is not a supported value body type.</value>
311311
</data>
312-
</root>
312+
<data name="DisposedConnectionMessageProcessorMustStop" xml:space="preserve">
313+
<value>The message processor is unable to continue and will stop processing; the host application has closed the connection to the Service Bus service.</value>
314+
</data>
315+
</root>

sdk/servicebus/Azure.Messaging.ServiceBus/src/Sender/ServiceBusSender.cs

100755100644
Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44
using System;
55
using System.Collections.Generic;
66
using System.ComponentModel;
7-
using System.Diagnostics;
87
using System.Diagnostics.CodeAnalysis;
98
using System.Globalization;
109
using System.Linq;
@@ -189,6 +188,8 @@ public virtual async Task SendMessagesAsync(
189188
{
190189
Argument.AssertNotNull(messages, nameof(messages));
191190
Argument.AssertNotDisposed(IsClosed, nameof(ServiceBusSender));
191+
_connection.ThrowIfClosed();
192+
192193
IReadOnlyList<ServiceBusMessage> messageList = messages switch
193194
{
194195
IReadOnlyList<ServiceBusMessage> alreadyList => alreadyList,
@@ -299,6 +300,8 @@ public virtual async ValueTask<ServiceBusMessageBatch> CreateMessageBatchAsync(
299300
CancellationToken cancellationToken = default)
300301
{
301302
Argument.AssertNotDisposed(IsClosed, nameof(ServiceBusSender));
303+
_connection.ThrowIfClosed();
304+
302305
options = options?.Clone() ?? new CreateMessageBatchOptions();
303306
cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
304307
Logger.CreateMessageBatchStart(Identifier);
@@ -334,6 +337,8 @@ public virtual async Task SendMessagesAsync(
334337
{
335338
Argument.AssertNotNull(messageBatch, nameof(messageBatch));
336339
Argument.AssertNotDisposed(IsClosed, nameof(ServiceBusSender));
340+
_connection.ThrowIfClosed();
341+
337342
if (messageBatch.Count == 0)
338343
{
339344
return;
@@ -419,6 +424,7 @@ public virtual async Task<IReadOnlyList<long>> ScheduleMessagesAsync(
419424
{
420425
Argument.AssertNotNull(messages, nameof(messages));
421426
Argument.AssertNotDisposed(IsClosed, nameof(ServiceBusSender));
427+
_connection.ThrowIfClosed();
422428
cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
423429

424430
IReadOnlyList<ServiceBusMessage> messageList = messages switch
@@ -486,8 +492,9 @@ public virtual async Task CancelScheduledMessagesAsync(
486492
IEnumerable<long> sequenceNumbers,
487493
CancellationToken cancellationToken = default)
488494
{
489-
Argument.AssertNotDisposed(IsClosed, nameof(ServiceBusSender));
490495
Argument.AssertNotNull(sequenceNumbers, nameof(sequenceNumbers));
496+
Argument.AssertNotDisposed(IsClosed, nameof(ServiceBusSender));
497+
_connection.ThrowIfClosed();
491498
cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
492499

493500
// the sequence numbers MUST be in array form for them to be encoded correctly

0 commit comments

Comments
 (0)