Skip to content

Commit a347f34

Browse files
InitialOffsetOptions updates (Azure#19700)
* InitialOffsetOptions updates * Update tests * fix * Add docs
1 parent 168990e commit a347f34

File tree

8 files changed

+116
-57
lines changed

8 files changed

+116
-57
lines changed

sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/api/Microsoft.Azure.WebJobs.Extensions.EventHubs.netstandard2.0.cs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,8 +38,14 @@ public EventHubOptions() { }
3838
public partial class InitialOffsetOptions
3939
{
4040
public InitialOffsetOptions() { }
41-
public string EnqueuedTimeUTC { get { throw null; } set { } }
42-
public string Type { get { throw null; } set { } }
41+
public System.DateTimeOffset? EnqueuedTimeUtc { get { throw null; } set { } }
42+
public Microsoft.Azure.WebJobs.EventHubs.OffsetType? Type { get { throw null; } set { } }
43+
}
44+
public enum OffsetType
45+
{
46+
FromStart = 0,
47+
FromEnd = 1,
48+
FromEnqueuedTime = 2,
4349
}
4450
}
4551
namespace Microsoft.Extensions.Hosting

sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Config/EventHubOptions.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -181,8 +181,8 @@ private JObject ConstructRetryOptions() =>
181181
private JObject ConstructInitialOffsetOptions() =>
182182
new JObject
183183
{
184-
{ nameof(InitialOffsetOptions.Type), InitialOffsetOptions.Type },
185-
{ nameof(InitialOffsetOptions.EnqueuedTimeUTC), InitialOffsetOptions.EnqueuedTimeUTC },
184+
{ nameof(InitialOffsetOptions.Type), InitialOffsetOptions.Type.ToString() },
185+
{ nameof(InitialOffsetOptions.EnqueuedTimeUtc), InitialOffsetOptions.EnqueuedTimeUtc },
186186
};
187187
}
188188
}

sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Config/EventHubWebJobsBuilderExtensions.cs

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -84,26 +84,25 @@ public static IWebJobsBuilder AddEventHubs(this IWebJobsBuilder builder, Action<
8484

8585
internal static void ConfigureInitialOffsetOptions(EventHubOptions options)
8686
{
87-
string offsetType = options?.InitialOffsetOptions?.Type?.ToLower(CultureInfo.InvariantCulture) ?? string.Empty;
88-
if (!string.IsNullOrEmpty(offsetType))
87+
OffsetType? type = options?.InitialOffsetOptions?.Type;
88+
if (type.HasValue)
8989
{
90-
switch (offsetType)
90+
switch (type)
9191
{
92-
case "fromstart":
92+
case OffsetType.FromStart:
9393
options.EventProcessorOptions.DefaultStartingPosition = EventPosition.Earliest;
9494
break;
95-
case "fromend":
95+
case OffsetType.FromEnd:
9696
options.EventProcessorOptions.DefaultStartingPosition = EventPosition.Latest;
9797
break;
98-
case "fromenqueuedtime":
98+
case OffsetType.FromEnqueuedTime:
9999
try
100100
{
101-
DateTime enqueuedTimeUTC = DateTime.Parse(options.InitialOffsetOptions.EnqueuedTimeUTC, CultureInfo.InvariantCulture).ToUniversalTime();
102-
options.EventProcessorOptions.DefaultStartingPosition = EventPosition.FromEnqueuedTime(enqueuedTimeUTC);
101+
options.EventProcessorOptions.DefaultStartingPosition = EventPosition.FromEnqueuedTime(options.InitialOffsetOptions.EnqueuedTimeUtc.Value);
103102
}
104103
catch (FormatException fe)
105104
{
106-
string message = $"{nameof(EventHubOptions)}:{nameof(InitialOffsetOptions)}:{nameof(InitialOffsetOptions.EnqueuedTimeUTC)} is configured with an invalid format. " +
105+
string message = $"{nameof(EventHubOptions)}:{nameof(InitialOffsetOptions)}:{nameof(InitialOffsetOptions.EnqueuedTimeUtc)} is configured with an invalid format. " +
107106
"Please use a format supported by DateTime.Parse(). e.g. 'yyyy-MM-ddTHH:mm:ssZ'";
108107
throw new InvalidOperationException(message, fe);
109108
}
Lines changed: 5 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
// Copyright (c) .NET Foundation. All rights reserved.
22
// Licensed under the MIT License. See License.txt in the project root for license information.
33

4+
using System;
5+
46
namespace Microsoft.Azure.WebJobs.EventHubs
57
{
68
/// <summary>
@@ -11,26 +13,13 @@ public class InitialOffsetOptions
1113
{
1214
/// <summary>
1315
/// Gets or sets the type of the initial offset.
14-
/// <list type="bullet">
15-
/// <item>
16-
/// <description>fromStart: The default option if not specified. Will start processing from the start of the stream.</description>
17-
/// </item>
18-
/// <item>
19-
/// <description>fromEnd: Will start processing events from the end of the stream.Use this option if you only want to process events
20-
/// that are added after the function starts.</description>
21-
/// </item>
22-
/// <item>
23-
/// <description>fromEnqueuedTime: Will process events that were enqueued by Event Hubs on or after the specified time.Note that this applies
24-
/// to all Event Hubs partitions and there is no support for specifying a per-partition value.</description>
25-
/// </item>
26-
/// </list>
2716
/// </summary>
28-
public string Type { get; set; } = "";
17+
public OffsetType? Type { get; set; }
2918

3019
/// <summary>
3120
/// Gets or sets the time that events should be processed after. Any parsable format is accepted.
32-
/// Only applies when the <see cref="Type"/> is "fromEnqueuedTime".
21+
/// Only applies when the <see cref="Type"/> is <see cref="OffsetType.FromEnqueuedTime"/>.
3322
/// </summary>
34-
public string EnqueuedTimeUTC { get; set; } = "";
23+
public DateTimeOffset? EnqueuedTimeUtc { get; set; }
3524
}
3625
}
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
// Copyright (c) .NET Foundation. All rights reserved.
2+
// Licensed under the MIT License. See License.txt in the project root for license information.
3+
4+
namespace Microsoft.Azure.WebJobs.EventHubs
5+
{
6+
/// <summary>
7+
/// The types of offsets that can be configured in the <see cref="InitialOffsetOptions"/>.
8+
/// </summary>
9+
public enum OffsetType
10+
{
11+
/// <summary>
12+
/// The default option if not specified. Will start processing from the start of the stream.
13+
/// </summary>
14+
FromStart = 0,
15+
16+
/// <summary>
17+
/// Will start processing events from the end of the stream. Use this option if you only want to process events
18+
/// that are added after the function starts.
19+
/// </summary>
20+
FromEnd = 1,
21+
22+
/// <summary>
23+
/// Will process events that were enqueued by Event Hubs on or after the specified time.Note that this applies
24+
/// to all Event Hubs partitions and there is no support for specifying a per-partition value.
25+
/// </summary>
26+
FromEnqueuedTime = 2
27+
}
28+
}

sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubConfigurationTests.cs

Lines changed: 49 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,8 @@ public void ConfigureOptions_AppliesValuesCorrectly()
4242
Assert.AreEqual(5, options.BatchCheckpointFrequency);
4343
Assert.AreEqual(31, options.EventProcessorOptions.PartitionOwnershipExpirationInterval.TotalSeconds);
4444
Assert.AreEqual(21, options.EventProcessorOptions.LoadBalancingUpdateInterval.TotalSeconds);
45-
Assert.AreEqual("FromEnqueuedTime", options.InitialOffsetOptions.Type);
46-
Assert.AreEqual("2020-09-13T12:00Z", options.InitialOffsetOptions.EnqueuedTimeUTC);
45+
Assert.AreEqual("FromEnqueuedTime", options.InitialOffsetOptions.Type.ToString());
46+
Assert.AreEqual("2020-09-13 12:00:00Z", options.InitialOffsetOptions.EnqueuedTimeUtc.Value.ToString("u"));
4747
Assert.AreEqual(5, options.ClientRetryOptions.MaximumRetries);
4848
Assert.AreEqual(TimeSpan.FromSeconds(1), options.ClientRetryOptions.Delay);
4949
Assert.AreEqual(TimeSpan.FromMinutes(1), options.ClientRetryOptions.MaximumDelay);
@@ -57,7 +57,7 @@ public void ConfigureOptions_Format_Returns_Expected()
5757
{
5858
EventHubOptions options = CreateOptionsFromConfig();
5959

60-
string format = ((IOptionsFormatter) options).Format();
60+
string format = ((IOptionsFormatter)options).Format();
6161
JObject iObj = JObject.Parse(format);
6262
EventHubOptions result = iObj.ToObject<EventHubOptions>();
6363

@@ -67,8 +67,8 @@ public void ConfigureOptions_Format_Returns_Expected()
6767
Assert.AreEqual(123, result.PrefetchCount);
6868
Assert.AreEqual(TimeSpan.FromSeconds(31), result.PartitionOwnershipExpirationInterval);
6969
Assert.AreEqual(TimeSpan.FromSeconds(21), result.LoadBalancingUpdateInterval);
70-
Assert.AreEqual("FromEnqueuedTime", result.InitialOffsetOptions.Type);
71-
Assert.AreEqual("2020-09-13T12:00Z", result.InitialOffsetOptions.EnqueuedTimeUTC);
70+
Assert.AreEqual("FromEnqueuedTime", result.InitialOffsetOptions.Type.ToString());
71+
Assert.AreEqual("2020-09-13 12:00:00Z", result.InitialOffsetOptions.EnqueuedTimeUtc.Value.ToString("u"));
7272
Assert.AreEqual(5, result.ClientRetryOptions.MaximumRetries);
7373
Assert.AreEqual(TimeSpan.FromSeconds(1), result.ClientRetryOptions.Delay);
7474
Assert.AreEqual(TimeSpan.FromMinutes(1), result.ClientRetryOptions.MaximumDelay);
@@ -88,16 +88,16 @@ public void ConfigureOptions_AppliesValuesCorrectly_BackCompat()
8888
Assert.AreEqual(5, options.BatchCheckpointFrequency);
8989
Assert.AreEqual(31, options.EventProcessorOptions.PartitionOwnershipExpirationInterval.TotalSeconds);
9090
Assert.AreEqual(21, options.EventProcessorOptions.LoadBalancingUpdateInterval.TotalSeconds);
91-
Assert.AreEqual("FromEnqueuedTime", options.InitialOffsetOptions.Type);
92-
Assert.AreEqual("2020-09-13T12:00Z", options.InitialOffsetOptions.EnqueuedTimeUTC);
91+
Assert.AreEqual("FromEnqueuedTime", options.InitialOffsetOptions.Type.ToString());
92+
Assert.AreEqual("2020-09-13 12:00:00Z", options.InitialOffsetOptions.EnqueuedTimeUtc.Value.ToString("u"));
9393
}
9494

9595
[Test]
9696
public void ConfigureOptions_Format_Returns_Expected_BackCompat()
9797
{
9898
EventHubOptions options = CreateOptionsFromConfigBackCompat();
9999

100-
string format = ((IOptionsFormatter) options).Format();
100+
string format = ((IOptionsFormatter)options).Format();
101101
JObject iObj = JObject.Parse(format);
102102
EventHubOptions result = iObj.ToObject<EventHubOptions>();
103103

@@ -107,8 +107,45 @@ public void ConfigureOptions_Format_Returns_Expected_BackCompat()
107107
Assert.AreEqual(123, result.PrefetchCount);
108108
Assert.AreEqual(TimeSpan.FromSeconds(31), result.PartitionOwnershipExpirationInterval);
109109
Assert.AreEqual(TimeSpan.FromSeconds(21), result.LoadBalancingUpdateInterval);
110-
Assert.AreEqual("FromEnqueuedTime", result.InitialOffsetOptions.Type);
111-
Assert.AreEqual("2020-09-13T12:00Z", result.InitialOffsetOptions.EnqueuedTimeUTC);
110+
Assert.AreEqual("FromEnqueuedTime", result.InitialOffsetOptions.Type.ToString());
111+
Assert.AreEqual("2020-09-13 12:00:00Z", result.InitialOffsetOptions.EnqueuedTimeUtc.Value.ToString("u"));
112+
}
113+
114+
[Test]
115+
[TestCase("fromstart", OffsetType.FromStart)]
116+
[TestCase("FromStart", OffsetType.FromStart)]
117+
[TestCase("fromend", OffsetType.FromEnd)]
118+
[TestCase("FromEnd", OffsetType.FromEnd)]
119+
public void CanParseInitialOffsetFromConfig(string offsetType, OffsetType typeEnum)
120+
{
121+
string extensionPath = "AzureWebJobs:Extensions:EventHubs";
122+
var options = TestHelpers.GetConfiguredOptions<EventHubOptions>(
123+
b =>
124+
{
125+
b.AddEventHubs();
126+
},
127+
new Dictionary<string, string> { { $"{extensionPath}:InitialOffsetOptions:Type", offsetType } });
128+
Assert.AreEqual(typeEnum, options.InitialOffsetOptions.Type);
129+
}
130+
131+
[Test]
132+
[TestCase("fromEnqueuedTime", "2020-09-13T12:00Z")]
133+
[TestCase("fromenqueuedtime", "2020-09-13 12:00:00Z")]
134+
public void CanParseInitialOffsetFromConfig_EnqueuedTime(string offsetType, string enqueuedTime)
135+
{
136+
string extensionPath = "AzureWebJobs:Extensions:EventHubs";
137+
var options = TestHelpers.GetConfiguredOptions<EventHubOptions>(
138+
b =>
139+
{
140+
b.AddEventHubs();
141+
},
142+
new Dictionary<string, string>
143+
{
144+
{ $"{extensionPath}:InitialOffsetOptions:Type", offsetType },
145+
{ $"{extensionPath}:InitialOffsetOptions:EnqueuedTimeUtc", enqueuedTime },
146+
});
147+
Assert.AreEqual(OffsetType.FromEnqueuedTime, options.InitialOffsetOptions.Type);
148+
Assert.AreEqual(DateTimeOffset.Parse(enqueuedTime), options.InitialOffsetOptions.EnqueuedTimeUtc);
112149
}
113150

114151
private EventHubOptions CreateOptionsFromConfig()
@@ -124,7 +161,7 @@ private EventHubOptions CreateOptionsFromConfig()
124161
{ $"{extensionPath}:LoadBalancingUpdateInterval", "00:00:21" },
125162
{ $"{extensionPath}:LoadBalancingStrategy", "0" },
126163
{ $"{extensionPath}:InitialOffsetOptions:Type", "FromEnqueuedTime" },
127-
{ $"{extensionPath}:InitialOffsetOptions:EnqueuedTimeUTC", "2020-09-13T12:00Z" },
164+
{ $"{extensionPath}:InitialOffsetOptions:EnqueuedTimeUTC", "2020-09-13 12:00:00Z" },
128165
{ $"{extensionPath}:ClientRetryOptions:MaximumRetries", "5" },
129166
{ $"{extensionPath}:ClientRetryOptions:Delay", "00:00:01" },
130167
{ $"{extensionPath}:ClientRetryOptions:MaxDelay", "00:01:00" },
@@ -153,7 +190,7 @@ private EventHubOptions CreateOptionsFromConfigBackCompat()
153190
{ $"{extensionPath}:PartitionManagerOptions:LeaseDuration", "00:00:31" },
154191
{ $"{extensionPath}:PartitionManagerOptions:RenewInterval", "00:00:21" },
155192
{ $"{extensionPath}:InitialOffsetOptions:Type", "FromEnqueuedTime" },
156-
{ $"{extensionPath}:InitialOffsetOptions:EnqueuedTimeUTC", "2020-09-13T12:00Z" },
193+
{ $"{extensionPath}:InitialOffsetOptions:EnqueuedTimeUTC", "2020-09-13 12:00:00Z" },
157194
};
158195

159196
return TestHelpers.GetConfiguredOptions<EventHubOptions>(b =>

sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubEndToEndTests.cs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -299,7 +299,7 @@ public async Task EventHub_InitialOffsetFromStart()
299299
{
300300
services.Configure<EventHubOptions>(options =>
301301
{
302-
options.InitialOffsetOptions.Type = "FromStart";
302+
options.InitialOffsetOptions.Type = OffsetType.FromStart;
303303
});
304304
});
305305
ConfigureTestEventHub(builder);
@@ -325,7 +325,7 @@ public async Task EventHub_InitialOffsetFromEnd()
325325
{
326326
services.Configure<EventHubOptions>(options =>
327327
{
328-
options.InitialOffsetOptions.Type = "FromEnd";
328+
options.InitialOffsetOptions.Type = OffsetType.FromEnd;
329329
});
330330
});
331331
ConfigureTestEventHub(builder);
@@ -346,8 +346,6 @@ public async Task EventHub_InitialOffsetFromEnd()
346346
[Test]
347347
public async Task EventHub_InitialOffsetFromEnqueuedTime()
348348
{
349-
// Mark the time now and send a message which should be the only one that is picked up when we run the actual test host
350-
351349
var producer = new EventHubProducerClient(EventHubsTestEnvironment.Instance.EventHubsConnectionString, _eventHubScope.EventHubName);
352350
for (int i = 0; i < 3; i++)
353351
{
@@ -377,9 +375,10 @@ public async Task EventHub_InitialOffsetFromEnqueuedTime()
377375
{
378376
services.Configure<EventHubOptions>(options =>
379377
{
380-
options.InitialOffsetOptions.Type = "FromEnqueuedTime";
381-
// for some reason, this doesn't seem to work if including milliseconds in the format
382-
options.InitialOffsetOptions.EnqueuedTimeUTC = _initialOffsetEnqueuedTimeUTC.ToString("yyyy-MM-ddTHH:mm:ssZ");
378+
options.InitialOffsetOptions.Type = OffsetType.FromEnqueuedTime;
379+
// for some reason, this doesn't seem to work reliably if including milliseconds in the format
380+
var dto = DateTimeOffset.Parse(_initialOffsetEnqueuedTimeUTC.ToString("yyyy-MM-ddTHH:mm:ssZ"));
381+
options.InitialOffsetOptions.EnqueuedTimeUtc = dto;
383382
});
384383
});
385384
ConfigureTestEventHub(builder);
@@ -658,6 +657,7 @@ public class EventHubTestInitialOffsetFromEnqueuedTimeJobs
658657

659658
public static void ProcessMultipleEvents([EventHubTrigger(TestHubName, Connection = TestHubName)] EventData[] events)
660659
{
660+
Assert.LessOrEqual(events.Length, ExpectedEventsCount);
661661
foreach (EventData eventData in events)
662662
{
663663
string message = Encoding.UTF8.GetString(eventData.Body.ToArray());

0 commit comments

Comments
 (0)