Skip to content

Commit c9eb735

Browse files
OlhaTkachenkoOlha Tkachenko
andauthored
Support Timeout.Zero in ReceiveMessagesAsync (Azure#22934)
* Support Timeout.Zero in ReceiveMessagesAsync * upgrading AMQP in Data.props * updating AMQP version for Track 2 in Package.Data.props * Update Packages.Data.props reverting version change of AMQP for track 1 Co-authored-by: Olha Tkachenko <[email protected]>
1 parent 584a309 commit c9eb735

File tree

4 files changed

+76
-3
lines changed

4 files changed

+76
-3
lines changed

eng/Packages.Data.props

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@
9090
<PackageReference Update="Azure.Storage.Blobs" Version="12.8.0" />
9191

9292
<!-- Other approved packages -->
93-
<PackageReference Update="Microsoft.Azure.Amqp" Version="2.4.13" />
93+
<PackageReference Update="Microsoft.Azure.Amqp" Version="2.5.1" />
9494
<PackageReference Update="Microsoft.Identity.Client" Version="4.30.1" />
9595
<PackageReference Update="Microsoft.Identity.Client.Extensions.Msal" Version="2.18.4" />
9696

sdk/servicebus/Azure.Messaging.ServiceBus/src/Receiver/ServiceBusReceiver.cs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -260,7 +260,15 @@ internal async Task<IReadOnlyList<ServiceBusReceivedMessage>> ReceiveMessagesAsy
260260

261261
if (maxWaitTime.HasValue)
262262
{
263-
Argument.AssertPositive(maxWaitTime.Value, nameof(maxWaitTime));
263+
// maxWaitTime could be zero only when prefetch enabled
264+
if (PrefetchCount > 0)
265+
{
266+
Argument.AssertNotNegative(maxWaitTime.Value, nameof(maxWaitTime));
267+
}
268+
else
269+
{
270+
Argument.AssertPositive(maxWaitTime.Value, nameof(maxWaitTime));
271+
}
264272
}
265273
if (PrefetchCount > 0 && maxMessages > PrefetchCount)
266274
{

sdk/servicebus/Azure.Messaging.ServiceBus/tests/Receiver/ReceiverLiveTests.cs

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33

44
using System;
55
using System.Collections.Generic;
6+
using System.Diagnostics;
67
using System.Linq;
78
using System.Text;
89
using System.Threading;
@@ -46,6 +47,28 @@ public async Task PeekUsingConnectionStringWithSharedKey()
4647
}
4748
}
4849

50+
[Test]
51+
public async Task PeekWithZeroTimeout()
52+
{
53+
await using (var scope =
54+
await ServiceBusScope.CreateWithQueue(enablePartitioning: false, enableSession: false))
55+
{
56+
await using var client = new ServiceBusClient(TestEnvironment.ServiceBusConnectionString);
57+
await using var receiverWithPrefetch = client.CreateReceiver(scope.QueueName,
58+
options: new ServiceBusReceiverOptions() {PrefetchCount = 10});
59+
60+
var stopwatch = new Stopwatch();
61+
stopwatch.Start();
62+
await receiverWithPrefetch.ReceiveMessagesAsync(10, TimeSpan.Zero).ConfigureAwait(false);
63+
stopwatch.Stop();
64+
var durationWithPrefetchModeInSecs = stopwatch.Elapsed.TotalSeconds;
65+
66+
// If prefetch is enabled, timeout 0 secs will not be replaced with default timeout.
67+
// In such case, only prefetched messages will be returned and no call to server will be made and call will be very fast.
68+
Assert.IsTrue(durationWithPrefetchModeInSecs < 1);
69+
}
70+
}
71+
4972
[Test]
5073
public async Task PeekUsingConnectionStringWithSas()
5174
{

sdk/servicebus/Azure.Messaging.ServiceBus/tests/Receiver/ReceiverTests.cs

Lines changed: 43 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,11 @@
22
// Licensed under the MIT License.
33

44
using System;
5+
using System.Collections.Generic;
56
using System.Text;
67
using System.Threading;
78
using System.Threading.Tasks;
89
using Azure.Core;
9-
using Azure.Messaging.ServiceBus.Amqp;
1010
using Azure.Messaging.ServiceBus.Core;
1111
using Moq;
1212
using NUnit.Framework;
@@ -121,6 +121,48 @@ public void ReceiveValidatesMaxWaitTime()
121121
Throws.InstanceOf<ArgumentOutOfRangeException>());
122122
}
123123

124+
[Test]
125+
public async Task ReceiveValidatesMaxWaitTimePrefetchMode()
126+
{
127+
var mockTransportReceiver = new Mock<TransportReceiver>();
128+
var mockConnection = CreateMockConnection();
129+
mockConnection.Setup(
130+
connection => connection.CreateTransportReceiver(
131+
It.IsAny<string>(),
132+
It.IsAny<ServiceBusRetryPolicy>(),
133+
It.IsAny<ServiceBusReceiveMode>(),
134+
It.IsAny<uint>(),
135+
It.IsAny<string>(),
136+
It.IsAny<string>(),
137+
It.IsAny<bool>(),
138+
It.IsAny<bool>(),
139+
It.IsAny<CancellationToken>()))
140+
.Returns(mockTransportReceiver.Object);
141+
IReadOnlyList<ServiceBusReceivedMessage> receivedMessages = new[]
142+
{
143+
ServiceBusModelFactory.ServiceBusReceivedMessage(messageId: Guid.NewGuid().ToString())
144+
};
145+
mockTransportReceiver.Setup(transportReceiver =>
146+
transportReceiver.ReceiveMessagesAsync(It.IsAny<int>(), It.IsAny<TimeSpan?>(),
147+
It.IsAny<CancellationToken>()))
148+
.Returns(Task.FromResult(receivedMessages));
149+
150+
var receiver = new ServiceBusReceiver(mockConnection.Object, "queue", default,
151+
new ServiceBusReceiverOptions {PrefetchCount = 10});
152+
153+
Assert.That(
154+
async () => await receiver.ReceiveMessagesAsync(10, TimeSpan.FromSeconds(-1)),
155+
Throws.InstanceOf<ArgumentOutOfRangeException>());
156+
Assert.That(
157+
async () => await receiver.ReceiveMessageAsync(TimeSpan.FromSeconds(-1)),
158+
Throws.InstanceOf<ArgumentOutOfRangeException>());
159+
160+
var actuallyReceivedMessages =
161+
await receiver.ReceiveMessagesAsync(10, TimeSpan.FromSeconds(0)).ConfigureAwait(false);
162+
Assert.AreEqual(receivedMessages.Count, actuallyReceivedMessages.Count);
163+
Assert.AreEqual(receivedMessages[0].MessageId, actuallyReceivedMessages[0].MessageId);
164+
}
165+
124166
[Test]
125167
public async Task ReceiveMessageValidatesClientIsNotDisposed()
126168
{

0 commit comments

Comments
 (0)