Skip to content

Commit c0a421d

Browse files
Add message batch factory (Azure#18146)
* Add message batch factory * Export API
1 parent 0f409d2 commit c0a421d

File tree

4 files changed

+182
-12
lines changed

4 files changed

+182
-12
lines changed

sdk/servicebus/Azure.Messaging.ServiceBus/api/Azure.Messaging.ServiceBus.netstandard2.0.cs

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -180,18 +180,13 @@ internal ServiceBusMessageBatch() { }
180180
public void Dispose() { }
181181
public bool TryAddMessage(Azure.Messaging.ServiceBus.ServiceBusMessage message) { throw null; }
182182
}
183-
[System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)]
184183
public static partial class ServiceBusModelFactory
185184
{
186-
[System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)]
187185
public static Azure.Messaging.ServiceBus.Administration.QueueProperties QueueProperties(string name, System.TimeSpan lockDuration = default(System.TimeSpan), long maxSizeInMegabytes = (long)0, bool requiresDuplicateDetection = false, bool requiresSession = false, System.TimeSpan defaultMessageTimeToLive = default(System.TimeSpan), System.TimeSpan autoDeleteOnIdle = default(System.TimeSpan), bool deadLetteringOnMessageExpiration = false, System.TimeSpan duplicateDetectionHistoryTimeWindow = default(System.TimeSpan), int maxDeliveryCount = 0, bool enableBatchedOperations = false, Azure.Messaging.ServiceBus.Administration.EntityStatus status = default(Azure.Messaging.ServiceBus.Administration.EntityStatus), string forwardTo = null, string forwardDeadLetteredMessagesTo = null, string userMetadata = null, bool enablePartitioning = false) { throw null; }
188-
[System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)]
189186
public static Azure.Messaging.ServiceBus.Administration.RuleProperties RuleProperties(string name, Azure.Messaging.ServiceBus.Administration.RuleFilter filter = null, Azure.Messaging.ServiceBus.Administration.RuleAction action = null) { throw null; }
190-
[System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)]
187+
public static Azure.Messaging.ServiceBus.ServiceBusMessageBatch ServiceBusMessageBatch(long batchSizeBytes, System.Collections.Generic.IList<Azure.Messaging.ServiceBus.ServiceBusMessage> batchMessageStore, Azure.Messaging.ServiceBus.CreateMessageBatchOptions batchOptions = null, System.Func<Azure.Messaging.ServiceBus.ServiceBusMessage, bool> tryAddCallback = null) { throw null; }
191188
public static Azure.Messaging.ServiceBus.ServiceBusReceivedMessage ServiceBusReceivedMessage(System.BinaryData body = null, string messageId = null, string partitionKey = null, string viaPartitionKey = null, string sessionId = null, string replyToSessionId = null, System.TimeSpan timeToLive = default(System.TimeSpan), string correlationId = null, string subject = null, string to = null, string contentType = null, string replyTo = null, System.DateTimeOffset scheduledEnqueueTime = default(System.DateTimeOffset), System.Collections.Generic.IDictionary<string, object> properties = null, System.Guid lockTokenGuid = default(System.Guid), int deliveryCount = 0, System.DateTimeOffset lockedUntil = default(System.DateTimeOffset), long sequenceNumber = (long)-1, string deadLetterSource = null, long enqueuedSequenceNumber = (long)0, System.DateTimeOffset enqueuedTime = default(System.DateTimeOffset)) { throw null; }
192-
[System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)]
193189
public static Azure.Messaging.ServiceBus.Administration.SubscriptionProperties SubscriptionProperties(string topicName, string subscriptionName, System.TimeSpan lockDuration = default(System.TimeSpan), bool requiresSession = false, System.TimeSpan defaultMessageTimeToLive = default(System.TimeSpan), System.TimeSpan autoDeleteOnIdle = default(System.TimeSpan), bool deadLetteringOnMessageExpiration = false, int maxDeliveryCount = 0, bool enableBatchedOperations = false, Azure.Messaging.ServiceBus.Administration.EntityStatus status = default(Azure.Messaging.ServiceBus.Administration.EntityStatus), string forwardTo = null, string forwardDeadLetteredMessagesTo = null, string userMetadata = null) { throw null; }
194-
[System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)]
195190
public static Azure.Messaging.ServiceBus.Administration.TopicProperties TopicProperties(string name, long maxSizeInMegabytes = (long)0, bool requiresDuplicateDetection = false, System.TimeSpan defaultMessageTimeToLive = default(System.TimeSpan), System.TimeSpan autoDeleteOnIdle = default(System.TimeSpan), System.TimeSpan duplicateDetectionHistoryTimeWindow = default(System.TimeSpan), bool enableBatchedOperations = false, Azure.Messaging.ServiceBus.Administration.EntityStatus status = default(Azure.Messaging.ServiceBus.Administration.EntityStatus), bool enablePartitioning = false) { throw null; }
196191
}
197192
public partial class ServiceBusProcessor : System.IAsyncDisposable
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
# Baselining these until the next GA as they are not actual compat issues.
2+
CannotRemoveAttribute : Attribute 'System.ComponentModel.EditorBrowsableAttribute' exists on 'Azure.Messaging.ServiceBus.ServiceBusModelFactory.TopicProperties(System.String, System.Int64, System.Boolean, System.TimeSpan, System.TimeSpan, System.TimeSpan, System.Boolean, Azure.Messaging.ServiceBus.Administration.EntityStatus, System.Boolean)' in the contract but not the implementation.
3+
CannotRemoveAttribute : Attribute 'System.ComponentModel.EditorBrowsableAttribute' exists on 'Azure.Messaging.ServiceBus.ServiceBusModelFactory.ServiceBusReceivedMessage(System.BinaryData, System.String, System.String, System.String, System.String, System.String, System.TimeSpan, System.String, System.String, System.String, System.String, System.String, System.DateTimeOffset, System.Collections.Generic.IDictionary<System.String, System.Object>, System.Guid, System.Int32, System.DateTimeOffset, System.Int64, System.String, System.Int64, System.DateTimeOffset)' in the contract but not the implementation.
4+
CannotRemoveAttribute : Attribute 'System.ComponentModel.EditorBrowsableAttribute' exists on 'Azure.Messaging.ServiceBus.ServiceBusModelFactory.RuleProperties(System.String, Azure.Messaging.ServiceBus.Administration.RuleFilter, Azure.Messaging.ServiceBus.Administration.RuleAction)' in the contract but not the implementation.
5+
CannotRemoveAttribute : Attribute 'System.ComponentModel.EditorBrowsableAttribute' exists on 'Azure.Messaging.ServiceBus.ServiceBusModelFactory.QueueProperties(System.String, System.TimeSpan, System.Int64, System.Boolean, System.Boolean, System.TimeSpan, System.TimeSpan, System.Boolean, System.TimeSpan, System.Int32, System.Boolean, Azure.Messaging.ServiceBus.Administration.EntityStatus, System.String, System.String, System.String, System.Boolean)' in the contract but not the implementation.
6+
CannotRemoveAttribute : Attribute 'System.ComponentModel.EditorBrowsableAttribute' exists on 'Azure.Messaging.ServiceBus.ServiceBusModelFactory' in the contract but not the implementation.
7+
CannotRemoveAttribute : Attribute 'System.ComponentModel.EditorBrowsableAttribute' exists on 'Azure.Messaging.ServiceBus.ServiceBusModelFactory.SubscriptionProperties(System.String, System.String, System.TimeSpan, System.Boolean, System.TimeSpan, System.TimeSpan, System.Boolean, System.Int32, System.Boolean, Azure.Messaging.ServiceBus.Administration.EntityStatus, System.String, System.String, System.String)' in the contract but not the implementation.

sdk/servicebus/Azure.Messaging.ServiceBus/src/Primitives/ServiceBusModelFactory.cs

Lines changed: 121 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,19 +7,18 @@
77
using Azure.Core.Amqp;
88
using Azure.Messaging.ServiceBus.Amqp;
99
using Azure.Messaging.ServiceBus.Administration;
10+
using Azure.Messaging.ServiceBus.Core;
1011

1112
namespace Azure.Messaging.ServiceBus
1213
{
1314
/// <summary>
1415
/// This class contains methods to create certain ServiceBus models.
1516
/// </summary>
16-
[EditorBrowsable(EditorBrowsableState.Never)]
1717
public static class ServiceBusModelFactory
1818
{
1919
/// <summary>
2020
/// Creates a new ServiceBusReceivedMessage instance for mocking.
2121
/// </summary>
22-
[EditorBrowsable(EditorBrowsableState.Never)]
2322
public static ServiceBusReceivedMessage ServiceBusReceivedMessage(
2423
BinaryData body = default,
2524
string messageId = default,
@@ -108,7 +107,6 @@ public static ServiceBusReceivedMessage ServiceBusReceivedMessage(
108107
/// <summary>
109108
/// Creates a new <see cref="QueueProperties"/> instance for mocking.
110109
/// </summary>
111-
[EditorBrowsable(EditorBrowsableState.Never)]
112110
public static QueueProperties QueueProperties(
113111
string name,
114112
TimeSpan lockDuration = default,
@@ -149,7 +147,6 @@ public static QueueProperties QueueProperties(
149147
/// <summary>
150148
/// Creates a new <see cref="TopicProperties"/> instance for mocking.
151149
/// </summary>
152-
[EditorBrowsable(EditorBrowsableState.Never)]
153150
public static TopicProperties TopicProperties(
154151
string name,
155152
long maxSizeInMegabytes = default,
@@ -176,7 +173,6 @@ public static TopicProperties TopicProperties(
176173
/// <summary>
177174
/// Creates a new <see cref="SubscriptionProperties"/> instance for mocking.
178175
/// </summary>
179-
[EditorBrowsable(EditorBrowsableState.Never)]
180176
public static SubscriptionProperties SubscriptionProperties(
181177
string topicName,
182178
string subscriptionName,
@@ -209,7 +205,6 @@ public static SubscriptionProperties SubscriptionProperties(
209205
/// <summary>
210206
/// Creates a new <see cref="RuleProperties"/> instance for mocking.
211207
/// </summary>
212-
[EditorBrowsable(EditorBrowsableState.Never)]
213208
public static RuleProperties RuleProperties(
214209
string name,
215210
RuleFilter filter = default,
@@ -218,5 +213,125 @@ public static RuleProperties RuleProperties(
218213
{
219214
Action = action
220215
};
216+
217+
/// <summary>
218+
/// Initializes a new instance of the <see cref="ServiceBusMessageBatch" /> class.
219+
/// </summary>
220+
///
221+
/// <param name="batchSizeBytes">The size, in bytes, that the batch should report; this is a static value and will not mutate as messages are added.</param>
222+
/// <param name="batchMessageStore">A list to which messages will be added when <see cref="ServiceBusMessageBatch.TryAddMessage" /> calls are successful.</param>
223+
/// <param name="batchOptions">The set of options to consider when creating this batch.</param>
224+
/// <param name="tryAddCallback"> A function that will be invoked when <see cref="ServiceBusMessageBatch.TryAddMessage" /> is called;
225+
/// the return of this callback represents the result of <see cref="ServiceBusMessageBatch.TryAddMessage" />.
226+
/// If not provided, all events will be accepted into the batch.</param>
227+
///
228+
/// <returns>The <see cref="ServiceBusMessageBatch" /> instance that was created.</returns>
229+
///
230+
public static ServiceBusMessageBatch ServiceBusMessageBatch(long batchSizeBytes,
231+
IList<ServiceBusMessage> batchMessageStore,
232+
CreateMessageBatchOptions batchOptions = default,
233+
Func<ServiceBusMessage, bool> tryAddCallback = default)
234+
{
235+
tryAddCallback ??= _ => true;
236+
batchOptions ??= new CreateMessageBatchOptions();
237+
batchOptions.MaxSizeInBytes ??= long.MaxValue;
238+
239+
var transportBatch = new ListTransportBatch(batchOptions.MaxSizeInBytes.Value, batchSizeBytes, batchMessageStore, tryAddCallback);
240+
return new ServiceBusMessageBatch(transportBatch);
241+
}
242+
243+
/// <summary>
244+
/// Allows for the transport event batch created by the factory to be injected for testing purposes.
245+
/// </summary>
246+
///
247+
private sealed class ListTransportBatch : TransportMessageBatch
248+
{
249+
/// <summary>The backing store for storing events in the batch.</summary>
250+
private readonly IList<ServiceBusMessage> _backingStore;
251+
252+
/// <summary>A callback to be invoked when an adding an event via <see cref="TryAddMessage"/></summary>
253+
private readonly Func<ServiceBusMessage, bool> _tryAddCallback;
254+
255+
/// <summary>
256+
/// The maximum size allowed for the batch, in bytes. This includes the events in the batch as
257+
/// well as any overhead for the batch itself when sent to the Event Hubs service.
258+
/// </summary>
259+
///
260+
public override long MaxSizeInBytes { get; }
261+
262+
/// <summary>
263+
/// The size of the batch, in bytes, as it will be sent to the Event Hubs
264+
/// service.
265+
/// </summary>
266+
///
267+
public override long SizeInBytes { get; }
268+
269+
/// <summary>
270+
/// The count of events contained in the batch.
271+
/// </summary>
272+
///
273+
public override int Count => _backingStore.Count;
274+
275+
/// <summary>
276+
/// Initializes a new instance of the <see cref="ListTransportBatch"/> class.
277+
/// </summary>
278+
///
279+
/// <param name="maxSizeInBytes"> The maximum size allowed for the batch, in bytes.</param>
280+
/// <param name="sizeInBytes">The size of the batch, in bytes; this will be treated as a static value for the property.</param>
281+
/// <param name="backingStore">The backing store for holding events in the batch.</param>
282+
/// <param name="tryAddCallback">A callback for deciding if a TryAdd attempt is successful.</param>
283+
///
284+
internal ListTransportBatch(long maxSizeInBytes,
285+
long sizeInBytes,
286+
IList<ServiceBusMessage> backingStore,
287+
Func<ServiceBusMessage, bool> tryAddCallback) =>
288+
(MaxSizeInBytes, SizeInBytes, _backingStore, _tryAddCallback) = (maxSizeInBytes, sizeInBytes, backingStore, tryAddCallback);
289+
290+
/// <summary>
291+
/// Attempts to add an event to the batch, ensuring that the size
292+
/// of the batch does not exceed its maximum.
293+
/// </summary>
294+
///
295+
/// <param name="message">The event to attempt to add to the batch.</param>
296+
///
297+
/// <returns><c>true</c> if the event was added; otherwise, <c>false</c>.</returns>
298+
///
299+
public override bool TryAddMessage(ServiceBusMessage message)
300+
{
301+
if (_tryAddCallback(message))
302+
{
303+
_backingStore.Add(message);
304+
return true;
305+
}
306+
307+
return false;
308+
}
309+
310+
/// <summary>
311+
/// Clears the batch, removing all events and resetting the
312+
/// available size.
313+
/// </summary>
314+
///
315+
public override void Clear() => _backingStore.Clear();
316+
317+
/// <summary>
318+
/// Represents the batch as an enumerable set of transport-specific
319+
/// representations of an event.
320+
/// </summary>
321+
///
322+
/// <typeparam name="T">The transport-specific event representation being requested.</typeparam>
323+
///
324+
/// <returns>The set of events as an enumerable of the requested type.</returns>
325+
///
326+
public override IEnumerable<T> AsEnumerable<T>() => (IEnumerable<T>)_backingStore;
327+
328+
/// <summary>
329+
/// Performs the task needed to clean up resources used by the <see cref="TransportMessageBatch" />.
330+
/// </summary>
331+
///
332+
public override void Dispose()
333+
{
334+
}
335+
}
221336
}
222337
}
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
// Copyright (c) Microsoft Corporation. All rights reserved.
2+
// Licensed under the MIT License.
3+
4+
using System;
5+
using System.Collections.Generic;
6+
using NUnit.Framework;
7+
8+
namespace Azure.Messaging.ServiceBus.Tests.Message
9+
{
10+
public class MessageBatchTests
11+
{
12+
/// <summary>
13+
/// Verifies functionality of the <see cref="ServiceBusModelFactory.ServiceBusMessageBatch" />
14+
/// method.
15+
/// </summary>
16+
///
17+
[Test]
18+
public void EventDataBatchRespectsTheTryAddCallback()
19+
{
20+
var eventLimit = 3;
21+
var store = new List<ServiceBusMessage>();
22+
var batch = ServiceBusModelFactory.ServiceBusMessageBatch(5, store, tryAddCallback: _ => store.Count < eventLimit);
23+
24+
while (store.Count < eventLimit)
25+
{
26+
Assert.That(() => batch.TryAddMessage(new ServiceBusMessage(new BinaryData("Test"))), Is.True, $"The batch contains { store.Count } events; adding another should be permitted.");
27+
}
28+
29+
Assert.That(store.Count, Is.EqualTo(eventLimit), "The batch should be at its limit.");
30+
Assert.That(() => batch.TryAddMessage(new ServiceBusMessage(new BinaryData("Too many"))), Is.False, "The batch is full; it should not be possible to add a new event.");
31+
Assert.That(() => batch.TryAddMessage(new ServiceBusMessage(new BinaryData("Too many"))), Is.False, "The batch is full; a second attempt to add a new event should not succeed.");
32+
33+
Assert.That(store.Count, Is.EqualTo(eventLimit), "The batch should be at its limit after the failed TryAdd attempts.");
34+
Assert.That(batch.AsEnumerable<ServiceBusMessage>(), Is.EquivalentTo(store), "The batch enumerable should reflect the events in the backing store.");
35+
}
36+
37+
/// <summary>
38+
/// Verifies functionality of the <see cref="EventHubsModelFactory.EventDataBatch" />
39+
/// method.
40+
/// </summary>
41+
///
42+
[Test]
43+
public void EventDataBatchIsSafeToDispose()
44+
{
45+
var size = 1024;
46+
var store = new List<ServiceBusMessage> { new ServiceBusMessage(Array.Empty<byte>()), new ServiceBusMessage(Array.Empty<byte>()) };
47+
var options = new CreateMessageBatchOptions { MaxSizeInBytes = 2048 };
48+
var batch = ServiceBusModelFactory.ServiceBusMessageBatch(size, store, options, _ => false);
49+
50+
Assert.That(() => batch.Dispose(), Throws.Nothing);
51+
}
52+
}
53+
}

0 commit comments

Comments
 (0)