Skip to content

Commit 297d5dc

Browse files
Feature - provide specific Azure Service Bus message operations during message handling (#119)
* Feature - provide specific Azure Service Bus message operations during message handling * pr-sug: add single not-matching message handler to the message pump * pr-sug: add sending of the order message * pr-sug: don't autocomplete in dead letter scenario * pr-sug: extra logging + more strict build pipelines * pr-sug: use public service type * pr-fix: use correct dead letter queue * pr-add: integration test with abandoning the message * pr-fix: use correct logger property * pr-add: integration test for abandoning mesasges * pr-docs: update docs with influence features * pr-test: update with finishing touches on the comments and clarity * pr-fix: not autocomplete on abandon * pr-fix: update with static delivery count * Update docs/preview/features/message-pumps/service-bus.md Co-authored-by: Tom Kerkhove <[email protected]> * Update docs/preview/features/message-pumps/service-bus.md Co-authored-by: Tom Kerkhove <[email protected]> * Update src/Arcus.Messaging.Pumps.ServiceBus/AzureServiceBusMessagePump.cs Co-authored-by: Tom Kerkhove <[email protected]> * Update src/Arcus.Messaging.Pumps.ServiceBus/AzureServiceBusMessagePump.cs Co-authored-by: Tom Kerkhove <[email protected]> * Update src/Arcus.Messaging.Pumps.ServiceBus/MessageHandling/AzureServiceBusFallbackMessageHandler.cs Co-authored-by: Tom Kerkhove <[email protected]> * Update src/Arcus.Messaging.Pumps.ServiceBus/MessageHandling/AzureServiceBusFallbackMessageHandler.cs Co-authored-by: Tom Kerkhove <[email protected]> * Update src/Arcus.Messaging.Pumps.ServiceBus/MessageHandling/AzureServiceBusFallbackMessageHandler.cs Co-authored-by: Tom Kerkhove <[email protected]> * Update src/Arcus.Messaging.Pumps.ServiceBus/MessageHandling/AzureServiceBusFallbackMessageHandler.cs Co-authored-by: Tom Kerkhove <[email protected]> * Update src/Arcus.Messaging.Pumps.ServiceBus/MessageHandling/AzureServiceBusFallbackMessageHandler.cs Co-authored-by: Tom Kerkhove <[email protected]> * pr-sug: add xml docs for 'Service' prop * pr-sug: update logs with extra information and lower cases Co-authored-by: Tom Kerkhove <[email protected]>
1 parent 97d3b49 commit 297d5dc

23 files changed

+1045
-54
lines changed

build/ci-build.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,7 @@ stages:
135135

136136
- stage: SelfContainingIntegrationTests
137137
displayName: Self-Containing Integration Tests
138-
dependsOn: Build
138+
dependsOn: DockerIntegrationTests
139139
condition: succeeded()
140140
variables:
141141
- name: 'Arcus.Health.Port.Queue'

build/nuget-release.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,7 @@ stages:
120120

121121
- stage: SelfContainingIntegrationTests
122122
displayName: Self-Containing Integration Tests
123-
dependsOn: Build
123+
dependsOn: DockerIntegrationTests
124124
condition: succeeded()
125125
variables:
126126
- name: 'Arcus.Health.Port.Queue'

docs/preview/features/message-pumps/customization.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,10 @@ layout: default
88
While the message processing is handled by the `IMessageHandler<>` implementations, the message pump controls in what format the message is received.
99
We allow several customizations while implementing your own message pump.
1010

11+
- [Control custom deserialization](#control-custom-deserialization)
12+
- [Filter messages based on message context](#filter-messages-based-on-message-context)
13+
- [Fallback message handling](#fallback-message-handling)
14+
1115
## Control custom deserialization
1216

1317
When inheriting from an `...MessagePump` type, there's a way to control how the incoming raw message is being deserialized.

docs/preview/features/message-pumps/service-bus.md

Lines changed: 91 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@ Azure Service Bus Message Pump will perform all the plumbing that is required fo
1414
- Provide telemetry
1515

1616
As a user, the only thing you have to do is **focus on processing messages, not how to get them**.
17-
1817
You can do this by creating a message handler which implements from `IAzureServiceBusMessageHandler<TMessage>` (or `IMessageHandler<TMessage, MessageContext>`).
1918

2019
Here is an example of a message handler that expects messages of type `Order`:
@@ -57,6 +56,13 @@ public class OrdersMessageHandler : IMessageHandler<Order>
5756
}
5857
```
5958

59+
Other topics:
60+
- [Configuration](#configuration)
61+
- [Customized configuration](#customized-configuration)
62+
- [Fallback message handling](#fallback-message-handling)
63+
- [Influence handling of Service Bus message in a message handler](#influence-handling-of-Service-Bus-message-in-message-handler)
64+
- [Correlation](#correlation)
65+
6066
## Configuration
6167

6268
Once the message handler is created, you can very easily configure it:
@@ -194,6 +200,89 @@ public void ConfigureServices(IServiceCollection services)
194200
}
195201
```
196202

203+
## Influence handling of Service Bus message in message handler
204+
205+
When an Azure Service Bus message is received (either via regular message handlers or fallback message handlers), we allow specific Azure Service Bus operations during the message handling.
206+
Currently we support [**Dead letter**](https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-dead-letter-queues) and [*Abandon**](https://docs.microsoft.com/en-us/dotnet/api/microsoft.servicebus.messaging.messagereceiver.abandon?view=azure-dotnet).
207+
208+
### During (regular) message handling
209+
210+
To have access to the Azure Service Bus operations, you have to implement the `abstract` `AzureServiceBusMessageHandler<T>` class.
211+
Behind the screens it implements the `IMessageHandler<>` interface, so you can register this the same way as your other regular message handlers.
212+
213+
This base class provides several protected methods to call the Azure Service Bus operations:
214+
- `.DeadLetterMessageAsync`
215+
- `.AbandonMessageAsync`
216+
217+
Example:
218+
219+
```csharp
220+
public class AbandonsUnknownOrderMessageHandler : AzureServiceBusMessageHandler<Order>
221+
{
222+
public AbandonsUnknownOrderMessageHandler(ILogger<AbandonsUnknownOrderMessageHandler> logger)
223+
: base(logger)
224+
{
225+
}
226+
227+
public override async Task ProcessMessageAsync(Order order, AzureServiceBusMessageContext context, ...)
228+
{
229+
if (order.Id < 1)
230+
{
231+
await AbandonMessageAsync();
232+
}
233+
else
234+
{
235+
Logger.LogInformation("Received valid order");
236+
}
237+
}
238+
}
239+
```
240+
241+
The registration happens the same as any other regular message handler:
242+
243+
```csharp
244+
public void ConfigureServices(IServiceCollection services)
245+
{
246+
services.WithServiceBusMessageHandler<AbandonUnknownOrderMessageHandler, Order>();
247+
}
248+
```
249+
250+
### During fallback message handling
251+
252+
To have access to the Azure Service Bus operations, you have to implement the abstract `AzureServiceBusFallbackMessageHandler` class.
253+
Behind the scenes it implements the `IServiceBusFallbackMessageHandler`, so you can register this the same way as any other fallback message handler.
254+
255+
This base class provides several protected methods to call the Azure Service Bus operations:
256+
- `.DeadLetterAsync`
257+
- `.AbandonAsync`
258+
259+
Example:
260+
261+
```csharp
262+
public class DeadLetterFallbackMessageHandler : AzureServiceBusFallbackMessageHandler
263+
{
264+
public DeadLetterFallbackMessageHandler(ILogger<DeadLetterFallbackMessageHandler> logger)
265+
: base(logger)
266+
{
267+
}
268+
269+
public override async Task ProcessMessageAsync(Message message, AzureServiceBusMessageContext context, ...)
270+
{
271+
Logger.LogInformation("Message is not handled by any message handler, will dead letter");
272+
await DeadLetterAsync(message);
273+
}
274+
}
275+
```
276+
277+
The registration happens the same way as any other fallback message handler:
278+
279+
```csharp
280+
public void ConfigureServices(IServiceCollection services)
281+
{
282+
services.WithServiceBusFallbackMessageHandler<DeadLetterFallbackMessageHandler>();
283+
}
284+
```
285+
197286
## Correlation
198287

199288
To retrieve the correlation information of Azure Service Bus messages, we provide an extension that wraps all correlation information.
@@ -220,4 +309,4 @@ We provide templates to get started easily:
220309
- Azure Service Bus Queue Worker Template ([docs](https://templates.arcus-azure.net/features/servicebus-queue-worker-template))
221310
- Azure Service Bus Topic Worker Template ([docs](https://templates.arcus-azure.net/features/servicebus-topic-worker-template))
222311

223-
[&larr; back](/)
312+
[&larr; back](/)

src/Arcus.Messaging.Pumps.Abstractions/MessageHandling/IServiceCollectionExtensions.cs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ public static IServiceCollection WithMessageHandler<TMessageHandler, TMessage>(t
2626
{
2727
Guard.NotNull(services, nameof(services));
2828

29-
services.AddSingleton<IMessageHandler<TMessage, MessageContext>, TMessageHandler>();
29+
services.AddTransient<IMessageHandler<TMessage, MessageContext>, TMessageHandler>();
3030

3131
return services;
3232
}
@@ -48,7 +48,7 @@ public static IServiceCollection WithMessageHandler<TMessageHandler, TMessage>(
4848
Guard.NotNull(services, nameof(services));
4949
Guard.NotNull(implementationFactory, nameof(implementationFactory));
5050

51-
services.AddSingleton<IMessageHandler<TMessage, MessageContext>, TMessageHandler>(implementationFactory);
51+
services.AddTransient<IMessageHandler<TMessage, MessageContext>, TMessageHandler>(implementationFactory);
5252

5353
return services;
5454
}
@@ -68,7 +68,7 @@ public static IServiceCollection WithMessageHandler<TMessageHandler, TMessage, T
6868
{
6969
Guard.NotNull(services, nameof(services));
7070

71-
services.AddSingleton<IMessageHandler<TMessage, TMessageContext>, TMessageHandler>();
71+
services.AddTransient<IMessageHandler<TMessage, TMessageContext>, TMessageHandler>();
7272

7373
return services;
7474
}
@@ -92,7 +92,7 @@ public static IServiceCollection WithMessageHandler<TMessageHandler, TMessage, T
9292
Guard.NotNull(services, nameof(services));
9393
Guard.NotNull(implementationFactory, nameof(implementationFactory));
9494

95-
services.AddSingleton<IMessageHandler<TMessage, TMessageContext>, TMessageHandler>(implementationFactory);
95+
services.AddTransient<IMessageHandler<TMessage, TMessageContext>, TMessageHandler>(implementationFactory);
9696

9797
return services;
9898
}
@@ -186,7 +186,7 @@ public static IServiceCollection WithMessageHandler<TMessageHandler, TMessage, T
186186
Guard.NotNull(messageContextFilter, nameof(messageContextFilter));
187187
Guard.NotNull(implementationFactory, nameof(implementationFactory));
188188

189-
return services.AddSingleton<IMessageHandler<TMessage, TMessageContext>, MessageHandlerRegistration<TMessage, TMessageContext>>(
189+
return services.AddTransient<IMessageHandler<TMessage, TMessageContext>, MessageHandlerRegistration<TMessage, TMessageContext>>(
190190
serviceProvider => new MessageHandlerRegistration<TMessage, TMessageContext>(
191191
messageContextFilter, implementationFactory(serviceProvider)));
192192
}

src/Arcus.Messaging.Pumps.Abstractions/MessageHandling/MessageHandler.cs

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@ namespace Arcus.Messaging.Pumps.Abstractions.MessageHandling
1818
/// </summary>
1919
public class MessageHandler
2020
{
21-
private readonly object _service;
2221
private readonly ILogger _logger;
2322

2423
private MessageHandler(Type serviceType, object service, ILogger logger)
@@ -29,19 +28,24 @@ private MessageHandler(Type serviceType, object service, ILogger logger)
2928
Guard.For<ArgumentException>(
3029
() => serviceType.GenericTypeArguments.Length != 2,
3130
$"Message handler type '{serviceType.Name}' has not the expected 2 generic type arguments");
32-
33-
_service = service;
31+
3432
_logger = logger;
3533

34+
Service = service;
3635
ServiceType = serviceType;
3736
MessageType = ServiceType.GenericTypeArguments[0];
3837
MessageContextType = ServiceType.GenericTypeArguments[1];
3938
}
4039

40+
/// <summary>
41+
/// Gets the instance of the message handler that this abstracted message handler represents.
42+
/// </summary>
43+
public object Service { get; }
44+
4145
/// <summary>
4246
/// Gets the type of the message handler that this abstracted message handler represents.
4347
/// </summary>
44-
internal Type ServiceType { get; }
48+
public Type ServiceType { get; }
4549

4650
/// <summary>
4751
/// Gets the type of the message that this abstracted message handler can process.
@@ -104,13 +108,13 @@ public bool CanProcessMessage<TMessageContext>(TMessageContext messageContext) w
104108
"Message context type '{ActualMessageContextType}' matches registered message handler's {MessageHandlerType} context type {ExpectedMessageContextType}",
105109
actualMessageContextType.Name, ServiceType.Name, expectedMessageContextType.Name);
106110

107-
if (_service.GetType().Name == typeof(MessageHandlerRegistration<,>).Name)
111+
if (Service.GetType().Name == typeof(MessageHandlerRegistration<,>).Name)
108112
{
109113
_logger.LogTrace(
110114
"Determining whether the message context predicate registered with the message handler {MessageHandlerType} holds...",
111115
ServiceType.Name);
112116

113-
var canProcessMessage = (bool) _service.InvokeMethod(
117+
var canProcessMessage = (bool) Service.InvokeMethod(
114118
"CanProcessMessage",
115119
BindingFlags.Instance | BindingFlags.NonPublic,
116120
messageContext);
@@ -169,13 +173,13 @@ public async Task ProcessMessageAsync<TMessageContext>(
169173
try
170174
{
171175
var processMessageAsync =
172-
(Task)_service.InvokeMethod(
176+
(Task)Service.InvokeMethod(
173177
methodName, BindingFlags.Instance | BindingFlags.Public, message, messageContext, correlationInfo, cancellationToken);
174178

175179
if (processMessageAsync is null)
176180
{
177181
throw new InvalidOperationException(
178-
$"The '{typeof(IMessageHandler<,>).Name}' implementation '{_service.GetType().Name}' returned 'null' while calling the '{methodName}' method");
182+
$"The '{typeof(IMessageHandler<,>).Name}' implementation '{Service.GetType().Name}' returned 'null' while calling the '{methodName}' method");
179183
}
180184

181185
await processMessageAsync;
@@ -190,7 +194,7 @@ public async Task ProcessMessageAsync<TMessageContext>(
190194
methodName, ServiceType.Name, methodName, ServiceType.Name);
191195

192196
throw new AmbiguousMatchException(
193-
$"Ambiguous match found of '{methodName}' methods in the '{_service.GetType().Name}'. ", exception);
197+
$"Ambiguous match found of '{methodName}' methods in the '{Service.GetType().Name}'. ", exception);
194198
}
195199
}
196200
}

src/Arcus.Messaging.Pumps.Abstractions/MessagePump.cs

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,17 @@ protected virtual Task HandleReceiveExceptionAsync(Exception receiveException)
9090
return Task.CompletedTask;
9191
}
9292

93+
/// <summary>
94+
/// Pre-process the message by setting the necessary values the <see cref="IMessageHandler{TMessage,TMessageContext}"/> implementation.
95+
/// </summary>
96+
/// <param name="messageHandler">The message handler to be used to process the message.</param>
97+
/// <param name="messageContext">The message context of the message that will be handled.</param>
98+
protected virtual Task PreProcessMessageAsync<TMessageContext>(MessageHandler messageHandler, TMessageContext messageContext)
99+
where TMessageContext : MessageContext
100+
{
101+
return Task.CompletedTask;
102+
}
103+
93104
/// <summary>
94105
/// Handle a new message that was received
95106
/// </summary>
@@ -173,7 +184,7 @@ private async Task<bool> TryProcessMessageAsync<TMessageContext>(
173184
CancellationToken cancellationToken)
174185
where TMessageContext : MessageContext
175186
{
176-
IEnumerable<MessageHandler> handlers = _messageHandlers.Value;
187+
IEnumerable<MessageHandler> handlers = MessageHandler.SubtractFrom(ServiceProvider, Logger);
177188
if (!handlers.Any() && _fallbackMessageHandler is null)
178189
{
179190
throw new InvalidOperationException(
@@ -215,6 +226,7 @@ private async Task<bool> ProcessMessageAsync<TMessageContext>(
215226
"Successful parsing from abstracted message to concrete message handler type did unexpectedly result in a 'null' parsing result");
216227
}
217228

229+
await PreProcessMessageAsync(handler, messageContext);
218230
await handler.ProcessMessageAsync(result, messageContext, correlationInfo, cancellationToken);
219231
return true;
220232
}

0 commit comments

Comments
 (0)