Skip to content

Commit aadbe7c

Browse files
Make ServiceBusMessageActions thread-safe (Azure#25556)
* Make ServiceBusMessageActions thread-safe * PR fb
1 parent ac60b87 commit aadbe7c

File tree

3 files changed

+45
-8
lines changed

3 files changed

+45
-8
lines changed

sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Listeners/ServiceBusListener.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -343,7 +343,7 @@ await receiver.ReceiveMessagesAsync(
343343
foreach (ServiceBusReceivedMessage message in messagesArray)
344344
{
345345
// skip messages that were settled in the user's function
346-
if (input.MessageActions.SettledMessages.Contains(message))
346+
if (input.MessageActions.SettledMessages.ContainsKey(message))
347347
{
348348
continue;
349349
}
@@ -360,7 +360,7 @@ await receiver.ReceiveMessagesAsync(
360360
foreach (ServiceBusReceivedMessage message in messagesArray)
361361
{
362362
// skip messages that were settled in the user's function
363-
if (input.MessageActions.SettledMessages.Contains(message))
363+
if (input.MessageActions.SettledMessages.ContainsKey(message))
364364
{
365365
continue;
366366
}

sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Primitives/ServiceBusMessageActions.cs

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
// Copyright (c) Microsoft Corporation. All rights reserved.
22
// Licensed under the MIT License.
33

4+
using System.Collections.Concurrent;
45
using Azure.Messaging.ServiceBus;
56
using System.Collections.Generic;
67
using System.Threading;
@@ -17,7 +18,7 @@ public class ServiceBusMessageActions
1718
private readonly ProcessMessageEventArgs _eventArgs;
1819
private readonly ProcessSessionMessageEventArgs _sessionEventArgs;
1920

20-
internal HashSet<ServiceBusReceivedMessage> SettledMessages { get; } = new();
21+
internal ConcurrentDictionary<ServiceBusReceivedMessage, byte> SettledMessages { get; } = new();
2122

2223
internal ServiceBusMessageActions(ProcessSessionMessageEventArgs sessionEventArgs)
2324
{
@@ -64,7 +65,7 @@ public virtual async Task AbandonMessageAsync(
6465
await _sessionEventArgs.AbandonMessageAsync(message, propertiesToModify, cancellationToken).ConfigureAwait(false);
6566
}
6667

67-
SettledMessages.Add(message);
68+
TrackMessageAsSettled(message);
6869
}
6970

7071
///<inheritdoc cref="ServiceBusReceiver.CompleteMessageAsync(ServiceBusReceivedMessage, CancellationToken)"/>
@@ -85,7 +86,7 @@ public virtual async Task CompleteMessageAsync(
8586
await _sessionEventArgs.CompleteMessageAsync(message, cancellationToken).ConfigureAwait(false);
8687
}
8788

88-
SettledMessages.Add(message);
89+
TrackMessageAsSettled(message);
8990
}
9091

9192
///<inheritdoc cref="ServiceBusReceiver.DeadLetterMessageAsync(ServiceBusReceivedMessage, string, string, CancellationToken)"/>
@@ -123,7 +124,7 @@ await _sessionEventArgs.DeadLetterMessageAsync(
123124
.ConfigureAwait(false);
124125
}
125126

126-
SettledMessages.Add(message);
127+
TrackMessageAsSettled(message);
127128
}
128129

129130
///<inheritdoc cref="ServiceBusReceiver.DeadLetterMessageAsync(ServiceBusReceivedMessage, IDictionary{string, object}, CancellationToken)"/>
@@ -157,7 +158,7 @@ await _sessionEventArgs.DeadLetterMessageAsync(
157158
.ConfigureAwait(false);
158159
}
159160

160-
SettledMessages.Add(message);
161+
TrackMessageAsSettled(message);
161162
}
162163

163164
///<inheritdoc cref="ServiceBusReceiver.DeferMessageAsync(ServiceBusReceivedMessage, IDictionary{string, object}, CancellationToken)"/>
@@ -191,7 +192,10 @@ await _sessionEventArgs.DeferMessageAsync(
191192
.ConfigureAwait(false);
192193
}
193194

194-
SettledMessages.Add(message);
195+
TrackMessageAsSettled(message);
195196
}
197+
198+
private void TrackMessageAsSettled(ServiceBusReceivedMessage message)
199+
=> SettledMessages[message] = 0;
196200
}
197201
}
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
// Copyright (c) Microsoft Corporation. All rights reserved.
2+
// Licensed under the MIT License.
3+
4+
using System.Collections.Generic;
5+
using System.Threading;
6+
using System.Threading.Tasks;
7+
using Azure.Messaging.ServiceBus;
8+
using Microsoft.Azure.WebJobs.ServiceBus;
9+
using Moq;
10+
using NUnit.Framework;
11+
12+
namespace Microsoft.Azure.WebJobs.Extensions.ServiceBus.Tests
13+
{
14+
public class MessageActionsTests
15+
{
16+
[Test]
17+
public async Task CanCompleteMessagesConcurrently()
18+
{
19+
var mockReceiver = new Mock<ServiceBusReceiver>();
20+
mockReceiver.Setup(r => r.CompleteMessageAsync(It.IsAny<ServiceBusReceivedMessage>(), It.IsAny<CancellationToken>()))
21+
// simulate completing the message
22+
.Returns(async() => await Task.Delay(20));
23+
var actions = new ServiceBusMessageActions(mockReceiver.Object);
24+
var tasks = new List<Task>();
25+
for (int i = 0; i < 1000; i++)
26+
{
27+
tasks.Add(actions.CompleteMessageAsync(ServiceBusModelFactory.ServiceBusReceivedMessage()));
28+
}
29+
30+
await Task.WhenAll(tasks);
31+
}
32+
}
33+
}

0 commit comments

Comments
 (0)