Skip to content

Commit 47ea075

Browse files
authored
Cancelling function execution after partition ownership lost (Azure#27340)
1 parent 4b96b18 commit 47ea075

File tree

4 files changed

+85
-49
lines changed

4 files changed

+85
-49
lines changed

sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Listeners/EventHubListener.cs

Lines changed: 48 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -147,63 +147,67 @@ public Task ProcessErrorAsync(EventProcessorHostPartition context, Exception err
147147
return Task.CompletedTask;
148148
}
149149

150-
public async Task ProcessEventsAsync(EventProcessorHostPartition context, IEnumerable<EventData> messages)
150+
public async Task ProcessEventsAsync(EventProcessorHostPartition context, IEnumerable<EventData> messages, CancellationToken processingCancellationToken)
151151
{
152-
var events = messages.ToArray();
153-
EventData eventToCheckpoint = null;
154-
155-
var triggerInput = new EventHubTriggerInput
152+
using (CancellationTokenSource linkedCts =
153+
CancellationTokenSource.CreateLinkedTokenSource(_cts.Token, processingCancellationToken))
156154
{
157-
Events = events,
158-
ProcessorPartition = context
159-
};
155+
var events = messages.ToArray();
156+
EventData eventToCheckpoint = null;
160157

161-
if (_singleDispatch)
162-
{
163-
// Single dispatch
164-
int eventCount = triggerInput.Events.Length;
158+
var triggerInput = new EventHubTriggerInput
159+
{
160+
Events = events,
161+
ProcessorPartition = context
162+
};
165163

166-
for (int i = 0; i < eventCount; i++)
164+
if (_singleDispatch)
167165
{
168-
if (_cts.IsCancellationRequested)
166+
// Single dispatch
167+
int eventCount = triggerInput.Events.Length;
168+
169+
for (int i = 0; i < eventCount; i++)
169170
{
170-
break;
171+
if (linkedCts.Token.IsCancellationRequested)
172+
{
173+
break;
174+
}
175+
176+
EventHubTriggerInput eventHubTriggerInput = triggerInput.GetSingleEventTriggerInput(i);
177+
TriggeredFunctionData input = new TriggeredFunctionData
178+
{
179+
TriggerValue = eventHubTriggerInput,
180+
TriggerDetails = eventHubTriggerInput.GetTriggerDetails(context)
181+
};
182+
183+
await _executor.TryExecuteAsync(input, linkedCts.Token).ConfigureAwait(false);
184+
eventToCheckpoint = events[i];
171185
}
172-
173-
EventHubTriggerInput eventHubTriggerInput = triggerInput.GetSingleEventTriggerInput(i);
186+
}
187+
else
188+
{
189+
// Batch dispatch
174190
TriggeredFunctionData input = new TriggeredFunctionData
175191
{
176-
TriggerValue = eventHubTriggerInput,
177-
TriggerDetails = eventHubTriggerInput.GetTriggerDetails(context)
192+
TriggerValue = triggerInput,
193+
TriggerDetails = triggerInput.GetTriggerDetails(context)
178194
};
179195

180-
await _executor.TryExecuteAsync(input, _cts.Token).ConfigureAwait(false);
181-
eventToCheckpoint = events[i];
196+
await _executor.TryExecuteAsync(input, linkedCts.Token).ConfigureAwait(false);
197+
eventToCheckpoint = events.LastOrDefault();
182198
}
183-
}
184-
else
185-
{
186-
// Batch dispatch
187-
TriggeredFunctionData input = new TriggeredFunctionData
188-
{
189-
TriggerValue = triggerInput,
190-
TriggerDetails = triggerInput.GetTriggerDetails(context)
191-
};
192199

193-
await _executor.TryExecuteAsync(input, _cts.Token).ConfigureAwait(false);
194-
eventToCheckpoint = events.LastOrDefault();
195-
}
196-
197-
// Checkpoint if we processed any events.
198-
// Don't checkpoint if no events. This can reset the sequence counter to 0.
199-
// Note: we intentionally checkpoint the batch regardless of function
200-
// success/failure. EventHub doesn't support any sort "poison event" model,
201-
// so that is the responsibility of the user's function currently. E.g.
202-
// the function should have try/catch handling around all event processing
203-
// code, and capture/log/persist failed events, since they won't be retried.
204-
if (eventToCheckpoint != null)
205-
{
206-
await CheckpointAsync(eventToCheckpoint, context).ConfigureAwait(false);
200+
// Checkpoint if we processed any events.
201+
// Don't checkpoint if no events. This can reset the sequence counter to 0.
202+
// Note: we intentionally checkpoint the batch regardless of function
203+
// success/failure. EventHub doesn't support any sort "poison event" model,
204+
// so that is the responsibility of the user's function currently. E.g.
205+
// the function should have try/catch handling around all event processing
206+
// code, and capture/log/persist failed events, since they won't be retried.
207+
if (eventToCheckpoint != null)
208+
{
209+
await CheckpointAsync(eventToCheckpoint, context).ConfigureAwait(false);
210+
}
207211
}
208212
}
209213

sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Processor/EventProcessorHost.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ protected override Task OnProcessingEventBatchAsync(IEnumerable<EventData> event
104104
return Task.CompletedTask;
105105
}
106106

107-
return partition.EventProcessor.ProcessEventsAsync(partition, events);
107+
return partition.EventProcessor.ProcessEventsAsync(partition, events, cancellationToken);
108108
}
109109

110110
protected override async Task OnInitializingPartitionAsync(EventProcessorHostPartition partition, CancellationToken cancellationToken)

sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Processor/IEventProcessor.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33

44
using System;
55
using System.Collections.Generic;
6+
using System.Threading;
67
using System.Threading.Tasks;
78
using Azure.Messaging.EventHubs;
89
using Azure.Messaging.EventHubs.Processor;
@@ -14,6 +15,6 @@ internal interface IEventProcessor
1415
Task CloseAsync(EventProcessorHostPartition context, ProcessingStoppedReason reason);
1516
Task OpenAsync(EventProcessorHostPartition context);
1617
Task ProcessErrorAsync(EventProcessorHostPartition context, Exception error);
17-
Task ProcessEventsAsync(EventProcessorHostPartition context, IEnumerable<EventData> messages);
18+
Task ProcessEventsAsync(EventProcessorHostPartition context, IEnumerable<EventData> messages, CancellationToken cancellationToken);
1819
}
1920
}

sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubListenerTests.cs

Lines changed: 34 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ public async Task ProcessEvents_SingleDispatch_CheckpointsCorrectly(int batchChe
5151
for (int i = 0; i < 100; i++)
5252
{
5353
List<EventData> events = new List<EventData>() { new EventData(new byte[0]) };
54-
await eventProcessor.ProcessEventsAsync(partitionContext, events);
54+
await eventProcessor.ProcessEventsAsync(partitionContext, events, CancellationToken.None);
5555
}
5656

5757
Assert.AreEqual(expected, checkpoints);
@@ -82,7 +82,7 @@ public async Task ProcessEvents_MultipleDispatch_CheckpointsCorrectly(int batchC
8282
for (int i = 0; i < 100; i++)
8383
{
8484
List<EventData> events = new List<EventData>() { new EventData(new byte[0]), new EventData(new byte[0]), new EventData(new byte[0]) };
85-
await eventProcessor.ProcessEventsAsync(partitionContext, events);
85+
await eventProcessor.ProcessEventsAsync(partitionContext, events, CancellationToken.None);
8686
}
8787

8888
processor.Verify(
@@ -126,7 +126,7 @@ public async Task ProcessEvents_Failure_Checkpoints()
126126

127127
var eventProcessor = new EventHubListener.EventProcessor(options, executor.Object, loggerMock.Object, true);
128128

129-
await eventProcessor.ProcessEventsAsync(partitionContext, events);
129+
await eventProcessor.ProcessEventsAsync(partitionContext, events, CancellationToken.None);
130130

131131
processor.Verify(
132132
p => p.CheckpointAsync(partitionContext.PartitionId, It.IsAny<EventData>(), It.IsAny<CancellationToken>()),
@@ -269,5 +269,36 @@ public void Dispose_StopsTheProcessor()
269269
(listener as IListener).Cancel();
270270
host.Verify(h => h.StopProcessingAsync(CancellationToken.None), Times.Exactly(2));
271271
}
272+
273+
[Test]
274+
public async Task ProcessEvents_CancellationToken_CancelsExecution()
275+
{
276+
var partitionContext = EventHubTests.GetPartitionContext();
277+
var options = new EventHubOptions();
278+
var processor = new Mock<EventProcessorHost>(MockBehavior.Strict);
279+
processor.Setup(p => p.CheckpointAsync(partitionContext.PartitionId, It.IsAny<EventData>(), It.IsAny<CancellationToken>())).Returns(Task.CompletedTask);
280+
partitionContext.ProcessorHost = processor.Object;
281+
282+
var loggerMock = new Mock<ILogger>();
283+
var executor = new Mock<ITriggeredFunctionExecutor>(MockBehavior.Strict);
284+
executor.Setup(p => p.TryExecuteAsync(It.IsAny<TriggeredFunctionData>(), It.IsAny<CancellationToken>()))
285+
.Callback<TriggeredFunctionData, CancellationToken>(async (TriggeredFunctionData triggeredFunctionData, CancellationToken cancellationToken) =>
286+
{
287+
while (!cancellationToken.IsCancellationRequested)
288+
{
289+
await Task.Delay(100);
290+
}
291+
})
292+
.ReturnsAsync(new FunctionResult(true));
293+
var eventProcessor = new EventHubListener.EventProcessor(options, executor.Object, loggerMock.Object, true);
294+
List<EventData> events = new List<EventData>() { new EventData(new byte[0]) };
295+
CancellationTokenSource source = new CancellationTokenSource();
296+
// Start another thread to cancel execution
297+
_ = Task.Run(async () =>
298+
{
299+
await Task.Delay(500);
300+
});
301+
await eventProcessor.ProcessEventsAsync(partitionContext, events, source.Token);
302+
}
272303
}
273304
}

0 commit comments

Comments
 (0)