Skip to content

Commit add4b29

Browse files
Use IAsyncEnumerable for GetRules (Azure#29130)
* Use IAsyncEnumerable for GetRules * docs * Fix logs * Samples * Change log
1 parent 2597c22 commit add4b29

File tree

9 files changed

+188
-110
lines changed

9 files changed

+188
-110
lines changed

sdk/servicebus/Azure.Messaging.ServiceBus/CHANGELOG.md

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,19 @@
11
# Release History
22

3-
## 7.9.0-beta.1 (Unreleased)
3+
## 7.9.0-beta.1 (2022-06-06)
44

55
### Features Added
66

7-
### Breaking Changes
7+
- Added `ServiceBusRuleManager` for managing rules.
88

99
### Bugs Fixed
1010

11+
- Updated behavior of `ServiceBusSessionReceiver.IsClosed` to return `true` if the underlying link was closed.
12+
1113
### Other Changes
1214

15+
- Include lock token in additional event source logs.
16+
1317
## 7.8.1 (2022-05-16)
1418

1519
### Bugs Fixed

sdk/servicebus/Azure.Messaging.ServiceBus/api/Azure.Messaging.ServiceBus.netstandard2.0.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -388,7 +388,7 @@ protected ServiceBusRuleManager() { }
388388
public override bool Equals(object obj) { throw null; }
389389
[System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)]
390390
public override int GetHashCode() { throw null; }
391-
public virtual System.Threading.Tasks.Task<System.Collections.Generic.IReadOnlyList<Azure.Messaging.ServiceBus.Administration.RuleProperties>> GetRulesAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
391+
public virtual System.Collections.Generic.IAsyncEnumerable<Azure.Messaging.ServiceBus.Administration.RuleProperties> GetRulesAsync([System.Runtime.CompilerServices.EnumeratorCancellationAttribute] System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
392392
[System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)]
393393
public override string ToString() { throw null; }
394394
}

sdk/servicebus/Azure.Messaging.ServiceBus/samples/Sample12_ManagingRules.md

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -48,9 +48,8 @@ while (true)
4848
await ruleManager.CreateRuleAsync("price-filter", new SqlRuleFilter("Price < 30000"));
4949
await ruleManager.DeleteRuleAsync("brand-filter");
5050

51-
// we can also use the rule manager to list the rules on the subscription.
52-
IReadOnlyList<RuleProperties> rules = await ruleManager.GetRulesAsync();
53-
foreach (RuleProperties rule in rules)
51+
// we can also use the rule manager to iterate over the rules on the subscription.
52+
await foreach (RuleProperties rule in ruleManager.GetRulesAsync())
5453
{
5554
// we should only have 1 rule at this point - "price-filter"
5655
Console.WriteLine(rule.Name);

sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpRuleManager.cs

Lines changed: 9 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -225,34 +225,32 @@ private async Task RemoveRuleInternalAsync(
225225
}
226226

227227
/// <summary>
228-
/// Get all rules associated with the subscription.
228+
/// Get rules associated with the subscription.
229229
/// </summary>
230-
///
230+
/// <param name="skip">The number of rules to skip when retrieving the next set of rules.</param>
231231
/// <param name="cancellationToken">An optional <see cref="CancellationToken"/> instance to signal the request to cancel the operation.</param>
232-
///
233232
/// <returns>Returns a list of rules description</returns>
234-
public override async Task<List<RuleProperties>> GetRulesAsync(CancellationToken cancellationToken) =>
233+
public override async Task<List<RuleProperties>> GetRulesAsync(int skip, CancellationToken cancellationToken) =>
235234
await _retryPolicy.RunOperation(
236-
static async (manager, timeout, token) => await manager.GetRulesInternalAsync(timeout).ConfigureAwait(false),
235+
async (manager, timeout, token) => await manager.GetRulesInternalAsync(timeout, skip).ConfigureAwait(false),
237236
this,
238237
_connectionScope,
239238
cancellationToken).ConfigureAwait(false);
240239

241240
/// <summary>
242-
/// Get all rules associated with the subscription.
241+
/// Get rules associated with the subscription.
243242
/// </summary>
244-
///
245243
/// <param name="timeout">The per-try timeout specified in the RetryOptions.</param>
246-
///
244+
/// <param name="skip">The number of rules to skip when retrieving the next set of rules.</param>
247245
/// <returns>Returns a list of rules description</returns>
248-
private async Task<List<RuleProperties>> GetRulesInternalAsync(TimeSpan timeout)
246+
private async Task<List<RuleProperties>> GetRulesInternalAsync(TimeSpan timeout, int skip)
249247
{
250248
var amqpRequestMessage = AmqpRequestMessage.CreateRequest(
251249
ManagementConstants.Operations.EnumerateRulesOperation,
252250
timeout,
253251
null);
254-
amqpRequestMessage.Map[ManagementConstants.Properties.Top] = int.MaxValue;
255-
amqpRequestMessage.Map[ManagementConstants.Properties.Skip] = 0;
252+
amqpRequestMessage.Map[ManagementConstants.Properties.Top] = 100;
253+
amqpRequestMessage.Map[ManagementConstants.Properties.Skip] = skip;
256254

257255
var response = await ManagementUtilities.ExecuteRequestResponseAsync(
258256
_connectionScope,

sdk/servicebus/Azure.Messaging.ServiceBus/src/Core/TransportRuleManager.cs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -47,11 +47,10 @@ public abstract Task RemoveRuleAsync(
4747
/// <summary>
4848
/// Get all rules associated with the subscription.
4949
/// </summary>
50-
///
50+
/// <param name="skip">The number of rules to skip when retrieving the next set of rules.</param>
5151
/// <param name="cancellationToken">An optional <see cref="CancellationToken"/> instance to signal the request to cancel the operation.</param>
52-
///
5352
/// <returns>Returns a list of rules description</returns>
54-
public abstract Task<List<RuleProperties>> GetRulesAsync(CancellationToken cancellationToken);
53+
public abstract Task<List<RuleProperties>> GetRulesAsync(int skip, CancellationToken cancellationToken);
5554

5655
/// <summary>
5756
/// Closes the connection to the transport rule manager instance.

sdk/servicebus/Azure.Messaging.ServiceBus/src/Diagnostics/ServiceBusEventSource.cs

Lines changed: 36 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -136,17 +136,17 @@ protected ServiceBusEventSource() : base(EventSourceName)
136136
internal const int SetSessionStateCompleteEvent = 66;
137137
internal const int SetSessionStateExceptionEvent = 67;
138138

139-
internal const int AddRuleStartEvent = 68;
140-
internal const int AddRuleCompleteEvent = 69;
141-
internal const int AddRuleExceptionEvent = 70;
139+
internal const int CreateRuleStartEvent = 68;
140+
internal const int CreateRuleCompleteEvent = 69;
141+
internal const int CreateRuleExceptionEvent = 70;
142142

143-
internal const int RemoveRuleStartEvent = 71;
144-
internal const int RemoveRuleCompleteEvent = 72;
145-
internal const int RemoveRuleExceptionEvent = 73;
143+
internal const int DeleteRuleStartEvent = 71;
144+
internal const int DeleteRuleCompleteEvent = 72;
145+
internal const int DeleteRuleExceptionEvent = 73;
146146

147-
internal const int GetRuleStartEvent = 74;
148-
internal const int GetRuleCompleteEvent = 75;
149-
internal const int GetRuleExceptionEvent = 76;
147+
internal const int GetRulesStartEvent = 74;
148+
internal const int GetRulesCompleteEvent = 75;
149+
internal const int GetRulesExceptionEvent = 76;
150150

151151
internal const int ClientCreateStartEvent = 77;
152152
internal const int ClientCreateCompleteEvent = 78;
@@ -1025,84 +1025,84 @@ public virtual void ProcessorAcceptSessionTimeout(
10251025
#endregion region
10261026

10271027
#region Rule management
1028-
[Event(AddRuleStartEvent, Level = EventLevel.Informational, Message = "{0}: Add rule start. RuleName = {1}")]
1029-
public virtual void AddRuleStart(string identifiers, string ruleName)
1028+
[Event(CreateRuleStartEvent, Level = EventLevel.Informational, Message = "{0}: CreateRule start. RuleName = {1}")]
1029+
public virtual void CreateRuleStart(string identifiers, string ruleName)
10301030
{
10311031
if (IsEnabled())
10321032
{
1033-
WriteEvent(AddRuleStartEvent, identifiers, ruleName);
1033+
WriteEvent(CreateRuleStartEvent, identifiers, ruleName);
10341034
}
10351035
}
10361036

1037-
[Event(AddRuleCompleteEvent, Level = EventLevel.Informational, Message = "{0}: Add rule done.")]
1038-
public virtual void AddRuleComplete(string identifier)
1037+
[Event(CreateRuleCompleteEvent, Level = EventLevel.Informational, Message = "{0}: CreateRule done. RuleName = {1}")]
1038+
public virtual void CreateRuleComplete(string identifier, string ruleName)
10391039
{
10401040
if (IsEnabled())
10411041
{
1042-
WriteEvent(AddRuleCompleteEvent, identifier);
1042+
WriteEvent(CreateRuleCompleteEvent, identifier, ruleName);
10431043
}
10441044
}
10451045

1046-
[Event(AddRuleExceptionEvent, Level = EventLevel.Error, Message = "{0}: Add rule Exception: {1}.")]
1047-
public virtual void AddRuleException(string identifier, string exception)
1046+
[Event(CreateRuleExceptionEvent, Level = EventLevel.Error, Message = "{0}: CreateRule Exception: {1}. RuleName = {2}")]
1047+
public virtual void CreateRuleException(string identifier, string exception, string ruleName)
10481048
{
10491049
if (IsEnabled())
10501050
{
1051-
WriteEvent(AddRuleExceptionEvent, identifier, exception);
1051+
WriteEvent(CreateRuleExceptionEvent, identifier, exception, ruleName);
10521052
}
10531053
}
10541054

1055-
[Event(RemoveRuleStartEvent, Level = EventLevel.Informational, Message = "{0}: Remove rule start. RuleName = {1}")]
1056-
public virtual void RemoveRuleStart(string identifiers, string ruleName)
1055+
[Event(DeleteRuleStartEvent, Level = EventLevel.Informational, Message = "{0}: Delete rule start. RuleName = {1}")]
1056+
public virtual void DeleteRuleStart(string identifiers, string ruleName)
10571057
{
10581058
if (IsEnabled())
10591059
{
1060-
WriteEvent(RemoveRuleStartEvent, identifiers, ruleName);
1060+
WriteEvent(DeleteRuleStartEvent, identifiers, ruleName);
10611061
}
10621062
}
10631063

1064-
[Event(RemoveRuleCompleteEvent, Level = EventLevel.Informational, Message = "{0}: Remove rule done.")]
1065-
public virtual void RemoveRuleComplete(string identifier)
1064+
[Event(DeleteRuleCompleteEvent, Level = EventLevel.Informational, Message = "{0}: Delete rule done. RuleName = {1}")]
1065+
public virtual void DeleteRuleComplete(string identifier, string ruleName)
10661066
{
10671067
if (IsEnabled())
10681068
{
1069-
WriteEvent(RemoveRuleCompleteEvent, identifier);
1069+
WriteEvent(DeleteRuleCompleteEvent, identifier, ruleName);
10701070
}
10711071
}
10721072

1073-
[Event(RemoveRuleExceptionEvent, Level = EventLevel.Error, Message = "{0}: Remove rule Exception: {1}.")]
1074-
public virtual void RemoveRuleException(string identifier, string exception)
1073+
[Event(DeleteRuleExceptionEvent, Level = EventLevel.Error, Message = "{0}: Delete rule Exception: {1}. RuleName = {2}")]
1074+
public virtual void DeleteRuleException(string identifier, string exception, string ruleName)
10751075
{
10761076
if (IsEnabled())
10771077
{
1078-
WriteEvent(RemoveRuleExceptionEvent, identifier, exception);
1078+
WriteEvent(DeleteRuleExceptionEvent, identifier, exception, ruleName);
10791079
}
10801080
}
10811081

1082-
[Event(GetRuleStartEvent, Level = EventLevel.Informational, Message = "{0}: Get rule start.")]
1083-
public virtual void GetRuleStart(string identifiers)
1082+
[Event(GetRulesStartEvent, Level = EventLevel.Informational, Message = "{0}: GetRules start.")]
1083+
public virtual void GetRulesStart(string identifiers)
10841084
{
10851085
if (IsEnabled())
10861086
{
1087-
WriteEvent(GetRuleStartEvent, identifiers);
1087+
WriteEvent(GetRulesStartEvent, identifiers);
10881088
}
10891089
}
10901090

1091-
[Event(GetRuleCompleteEvent, Level = EventLevel.Informational, Message = "{0}: Get rule done.")]
1092-
public virtual void GetRuleComplete(string identifier)
1091+
[Event(GetRulesCompleteEvent, Level = EventLevel.Informational, Message = "{0}: GetRules done.")]
1092+
public virtual void GetRulesComplete(string identifier)
10931093
{
10941094
if (IsEnabled())
10951095
{
1096-
WriteEvent(GetRuleCompleteEvent, identifier);
1096+
WriteEvent(GetRulesCompleteEvent, identifier);
10971097
}
10981098
}
10991099

1100-
[Event(GetRuleExceptionEvent, Level = EventLevel.Error, Message = "{0}: Get rule Exception: {1}.")]
1101-
public virtual void GetRuleException(string identifier, string exception)
1100+
[Event(GetRulesExceptionEvent, Level = EventLevel.Error, Message = "{0}: GetRules Exception: {1}.")]
1101+
public virtual void GetRulesException(string identifier, string exception)
11021102
{
11031103
if (IsEnabled())
11041104
{
1105-
WriteEvent(GetRuleExceptionEvent, identifier, exception);
1105+
WriteEvent(GetRulesExceptionEvent, identifier, exception);
11061106
}
11071107
}
11081108
#endregion

sdk/servicebus/Azure.Messaging.ServiceBus/src/RuleManager/ServiceBusRuleManager.cs

Lines changed: 34 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
using System.Collections.Generic;
66
using System.ComponentModel;
77
using System.Diagnostics.CodeAnalysis;
8+
using System.Runtime.CompilerServices;
89
using System.Threading;
910
using System.Threading.Tasks;
1011
using Azure.Core;
@@ -139,7 +140,7 @@ public virtual async Task CreateRuleAsync(
139140
Argument.AssertNotNull(options, nameof(options));
140141
cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
141142
EntityNameFormatter.CheckValidRuleName(options.Name);
142-
ServiceBusEventSource.Log.AddRuleStart(Identifier, options.Name);
143+
ServiceBusEventSource.Log.CreateRuleStart(Identifier, options.Name);
143144

144145
try
145146
{
@@ -149,12 +150,12 @@ await InnerRuleManager.AddRuleAsync(
149150
}
150151
catch (Exception exception)
151152
{
152-
ServiceBusEventSource.Log.AddRuleException(Identifier, exception.ToString());
153+
ServiceBusEventSource.Log.CreateRuleException(Identifier, exception.ToString(), options.Name);
153154
throw;
154155
}
155156

156157
cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
157-
ServiceBusEventSource.Log.AddRuleComplete(Identifier);
158+
ServiceBusEventSource.Log.CreateRuleComplete(Identifier, options.Name);
158159
}
159160

160161
/// <summary>
@@ -172,7 +173,7 @@ public virtual async Task DeleteRuleAsync(
172173
Argument.AssertNotDisposed(IsClosed, nameof(ServiceBusRuleManager));
173174
Argument.AssertNotNullOrEmpty(ruleName, nameof(ruleName));
174175
cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
175-
ServiceBusEventSource.Log.RemoveRuleStart(Identifier, ruleName);
176+
ServiceBusEventSource.Log.DeleteRuleStart(Identifier, ruleName);
176177

177178
try
178179
{
@@ -182,40 +183,51 @@ await InnerRuleManager.RemoveRuleAsync(
182183
}
183184
catch (Exception exception)
184185
{
185-
ServiceBusEventSource.Log.RemoveRuleException(Identifier, exception.ToString());
186+
ServiceBusEventSource.Log.DeleteRuleException(Identifier, exception.ToString(), ruleName);
186187
throw;
187188
}
188189

189-
ServiceBusEventSource.Log.RemoveRuleComplete(Identifier);
190+
ServiceBusEventSource.Log.DeleteRuleComplete(Identifier, ruleName);
190191
}
191192

192193
/// <summary>
193-
/// Get all rules associated with the subscription.
194+
/// Iterates over the rules associated with the subscription.
194195
/// </summary>
195196
///
196197
/// <param name="cancellationToken">An optional <see cref="CancellationToken"/> instance to signal the request to cancel the operation.</param>
197-
///
198-
/// <returns>Returns a list of <see cref="RuleProperties"/></returns>
199-
public virtual async Task<IReadOnlyList<RuleProperties>> GetRulesAsync(CancellationToken cancellationToken = default)
198+
/// <returns>Returns each rule on the associated subscription.</returns>
199+
public virtual async IAsyncEnumerable<RuleProperties> GetRulesAsync([EnumeratorCancellation] CancellationToken cancellationToken = default)
200200
{
201201
Argument.AssertNotDisposed(IsClosed, nameof(ServiceBusRuleManager));
202202
cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
203-
ServiceBusEventSource.Log.GetRuleStart(Identifier);
204-
List<RuleProperties> rulePropertiesList;
203+
ServiceBusEventSource.Log.GetRulesStart(Identifier);
204+
int skip = 0;
205205

206-
try
207-
{
208-
rulePropertiesList = await InnerRuleManager.GetRulesAsync(cancellationToken).ConfigureAwait(false);
209-
}
210-
catch (Exception exception)
206+
while (!cancellationToken.IsCancellationRequested)
211207
{
212-
ServiceBusEventSource.Log.GetRuleException(Identifier, exception.ToString());
213-
throw;
208+
List<RuleProperties> ruleProperties;
209+
try
210+
{
211+
ruleProperties = await InnerRuleManager.GetRulesAsync(skip, cancellationToken).ConfigureAwait(false);
212+
}
213+
catch (Exception exception)
214+
{
215+
ServiceBusEventSource.Log.GetRulesException(Identifier, exception.ToString());
216+
throw;
217+
}
218+
skip += ruleProperties.Count;
219+
if (ruleProperties.Count == 0)
220+
{
221+
break;
222+
}
223+
224+
foreach (var rule in ruleProperties)
225+
{
226+
yield return rule;
227+
}
214228
}
215229

216-
cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
217-
ServiceBusEventSource.Log.GetRuleComplete(Identifier);
218-
return rulePropertiesList;
230+
ServiceBusEventSource.Log.GetRulesComplete(Identifier);
219231
}
220232

221233
/// <summary>

0 commit comments

Comments
 (0)