Skip to content

Commit f38dc64

Browse files
authored
feat(circuit-breaker): add job ID + old/new state change to circuit-breaker transition (#469)
* feat: add job ID + old/new state change to circuit-breaker transition * pr-sug: rename state change to '...EventArgs' * pr-sug: rename event-args to `MessagePumpCircuitStateChangedEventArgs`
1 parent 9bb1f24 commit f38dc64

File tree

5 files changed

+97
-33
lines changed

5 files changed

+97
-33
lines changed

docs/preview/02-Features/06-general-messaging.md

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -96,9 +96,16 @@ The instances should implement the `ICircuitBreakerEventHandler`, which allows y
9696
```csharp
9797
public class MyFirstCircuitBreakerEventHandler : ICircuitBreakerEventHandler
9898
{
99-
public Task OnTransitionAsync(MessagePumpCircuitState newState)
99+
public Task OnTransitionAsync(MessagePumpCircuitStateChang change)
100100
{
101-
// ...
101+
// The job ID of the message pump that was transitioned.
102+
string jobId = change.JobId;
103+
104+
// The circuit breaker state transitions.
105+
MessagePumpCircuitState oldState = change.OldState;
106+
MessagePumpCircuitState newState = change.NewState;
107+
108+
// Process the state change event...
102109
}
103110
}
104111
```

src/Arcus.Messaging.Pumps.Abstractions/MessagePump.cs

Lines changed: 22 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -145,9 +145,12 @@ protected async Task WaitMessageRecoveryPeriodAsync(CancellationToken cancellati
145145

146146
if (!CircuitState.IsHalfOpen)
147147
{
148-
CircuitState = CircuitState.TransitionTo(CircuitBreakerState.HalfOpen);
148+
MessagePumpCircuitState
149+
oldState = CircuitState,
150+
newState = CircuitState.TransitionTo(CircuitBreakerState.HalfOpen);
149151

150-
NotifyCircuitBreakerStateChangedSubscribers();
152+
CircuitState = newState;
153+
NotifyCircuitBreakerStateChangedSubscribers(oldState, newState);
151154
}
152155
}
153156

@@ -162,9 +165,12 @@ protected async Task WaitMessageIntervalDuringRecoveryAsync(CancellationToken ca
162165

163166
if (!CircuitState.IsHalfOpen)
164167
{
165-
CircuitState = CircuitState.TransitionTo(CircuitBreakerState.HalfOpen);
168+
MessagePumpCircuitState
169+
oldState = CircuitState,
170+
newState = CircuitState.TransitionTo(CircuitBreakerState.HalfOpen);
166171

167-
NotifyCircuitBreakerStateChangedSubscribers();
172+
CircuitState = newState;
173+
NotifyCircuitBreakerStateChangedSubscribers(oldState, newState);
168174
}
169175
}
170176

@@ -176,9 +182,12 @@ internal void NotifyPauseReceiveMessages(MessagePumpCircuitBreakerOptions option
176182
{
177183
Logger.LogDebug("Circuit breaker caused message pump '{JobId}' to transition from a '{CurrentState}' an 'Open' state", JobId, CircuitState);
178184

179-
CircuitState = CircuitState.TransitionTo(CircuitBreakerState.Open, options);
185+
MessagePumpCircuitState
186+
oldState = CircuitState,
187+
newState = CircuitState.TransitionTo(CircuitBreakerState.Open, options);
180188

181-
NotifyCircuitBreakerStateChangedSubscribers();
189+
CircuitState = newState;
190+
NotifyCircuitBreakerStateChangedSubscribers(oldState, newState);
182191
}
183192

184193
/// <summary>
@@ -188,18 +197,21 @@ protected void NotifyResumeRetrievingMessages()
188197
{
189198
Logger.LogDebug("Circuit breaker caused message pump '{JobId}' to transition back from '{CurrentState}' to a 'Closed' state, retrieving messages is resumed", JobId, CircuitState);
190199

191-
CircuitState = MessagePumpCircuitState.Closed;
200+
MessagePumpCircuitState
201+
oldState = CircuitState,
202+
newState = MessagePumpCircuitState.Closed;
192203

193-
NotifyCircuitBreakerStateChangedSubscribers();
204+
CircuitState = newState;
205+
NotifyCircuitBreakerStateChangedSubscribers(oldState, newState);
194206
}
195207

196-
private void NotifyCircuitBreakerStateChangedSubscribers()
208+
private void NotifyCircuitBreakerStateChangedSubscribers(MessagePumpCircuitState oldState, MessagePumpCircuitState newState)
197209
{
198210
ICircuitBreakerEventHandler[] eventHandlers = GetEventHandlersForPump();
199211

200212
foreach (var handler in eventHandlers)
201213
{
202-
Task.Run(() => handler.OnTransition(CircuitState));
214+
Task.Run(() => handler.OnTransition(new MessagePumpCircuitStateChangedEventArgs(JobId, oldState, newState)));
203215
}
204216
}
205217

src/Arcus.Messaging.Pumps.Abstractions/Resiliency/IMessagePumpCircuitBreaker.cs

Lines changed: 41 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,47 @@ public interface ICircuitBreakerEventHandler
3333
/// <summary>
3434
/// Notifies the application on a change in the message pump's circuit breaker state.
3535
/// </summary>
36-
/// <param name="newState">The new circuit breaker state in which the message pump is currently running on.</param>
37-
void OnTransition(MessagePumpCircuitState newState);
36+
/// <param name="args">The change in the circuit breaker state for a message pump.</param>
37+
void OnTransition(MessagePumpCircuitStateChangedEventArgs args);
38+
}
39+
40+
/// <summary>
41+
/// Represents a change event of the <see cref="MessagePumpCircuitState"/> in a <see cref="MessagePump"/>.
42+
/// </summary>
43+
public class MessagePumpCircuitStateChangedEventArgs
44+
{
45+
/// <summary>
46+
/// Initializes a new instance of the <see cref="MessagePumpCircuitStateChangedEventArgs" /> class.
47+
/// </summary>
48+
internal MessagePumpCircuitStateChangedEventArgs(
49+
string jobId,
50+
MessagePumpCircuitState oldState,
51+
MessagePumpCircuitState newState)
52+
{
53+
if (string.IsNullOrWhiteSpace(jobId))
54+
{
55+
throw new ArgumentException("Requires a non-blank job ID for the circuit breaker event state change registration", nameof(jobId));
56+
}
57+
58+
JobId = jobId;
59+
OldState = oldState;
60+
NewState = newState;
61+
}
62+
63+
/// <summary>
64+
/// Gets the unique ID to distinguish the linked message pump that had it circuit breaker state changed.
65+
/// </summary>
66+
public string JobId { get; }
67+
68+
/// <summary>
69+
/// Gets the original circuit breaker state the linked message pump was in.
70+
/// </summary>
71+
public MessagePumpCircuitState OldState { get; }
72+
73+
/// <summary>
74+
/// Gets the current circuit breaker state the linked message pump is in.
75+
/// </summary>
76+
public MessagePumpCircuitState NewState { get; }
3877
}
3978

4079
/// <summary>

src/Arcus.Messaging.Tests.Integration/MessagePump/Fixture/MockCircuitBreakerEventHandler.cs

Lines changed: 21 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
1-
using System.Collections.Generic;
1+
using System;
22
using System.Collections.ObjectModel;
3-
using System.Linq;
3+
using System.Threading.Tasks;
44
using Arcus.Messaging.Pumps.Abstractions.Resiliency;
5+
using Arcus.Testing;
56
using Xunit;
7+
using Xunit.Sdk;
68

79
namespace Arcus.Messaging.Tests.Integration.MessagePump.Fixture
810
{
@@ -11,28 +13,32 @@ namespace Arcus.Messaging.Tests.Integration.MessagePump.Fixture
1113
/// </summary>
1214
internal class MockCircuitBreakerEventHandler : ICircuitBreakerEventHandler
1315
{
14-
private readonly Collection<MessagePumpCircuitState> _states = new();
16+
private readonly Collection<MessagePumpCircuitStateChangedEventArgs> _states = new();
1517

1618
/// <summary>
1719
/// Notifies the application on a change in the message pump's circuit breaker state.
1820
/// </summary>
19-
/// <param name="newState">The new circuit breaker state in which the message pump is currently running on.</param>
20-
public void OnTransition(MessagePumpCircuitState newState)
21+
/// <param name="args">The change in the circuit breaker state for a message pump.</param>
22+
public void OnTransition(MessagePumpCircuitStateChangedEventArgs args)
2123
{
22-
_states.Add(newState);
24+
_states.Add(args);
2325
}
2426

25-
public void ShouldTransitionCorrectly()
27+
/// <summary>
28+
/// Verifies that all fired circuit breaker state transitions are happening correctly.
29+
/// </summary>
30+
public async Task ShouldTransitionedCorrectlyAsync()
2631
{
27-
Assert.NotEmpty(_states);
28-
29-
MessagePumpCircuitState firstTransition = _states[0];
30-
Assert.True(firstTransition.IsOpen, $"when the message pump starts up, the first transition should always be from a closed to open state, but got: {firstTransition}");
32+
await Poll.Target<XunitException>(() =>
33+
{
34+
Assert.NotEmpty(_states);
35+
Assert.Equal(3, _states.Count);
3136

32-
IEnumerable<(MessagePumpCircuitState oldState, MessagePumpCircuitState newState)> transitions =
33-
_states.SkipLast(1).Zip(_states.Skip(1));
37+
}).Every(TimeSpan.FromSeconds(1))
38+
.Timeout(TimeSpan.FromSeconds(10))
39+
.FailWith("could not in time find all the fired circuit breaker change events, possibly the message pump did not fired them");
3440

35-
Assert.All(transitions, t => VerifyCorrectTransition(t.oldState, t.newState));
41+
Assert.All(_states, t => VerifyCorrectTransition(t.OldState, t.NewState));
3642
}
3743

3844
private static void VerifyCorrectTransition(
@@ -41,7 +47,7 @@ private static void VerifyCorrectTransition(
4147
{
4248
if (oldState.IsClosed)
4349
{
44-
Assert.True(newState.IsHalfOpen, $"when the message pump comes from a closed state, the next state should always be half-open, but got: {newState}");
50+
Assert.True(newState.IsOpen, $"when the message pump comes from a closed state, the next state should always be open, but got: {newState}");
4551
}
4652
else if (oldState.IsOpen)
4753
{

src/Arcus.Messaging.Tests.Integration/MessagePump/ServiceBusMessagePump.ResiliencyTests.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -62,8 +62,8 @@ public async Task ServiceBusMessageQueuePump_WithUnavailableDependencySystem_Cir
6262
await producer.ProduceAsync(messageAfterBreak);
6363
await messageSink.ShouldReceiveOrdersAfterBreakAsync(messageAfterBreak.MessageId);
6464

65-
mockEventHandler1.ShouldTransitionCorrectly();
66-
mockEventHandler2.ShouldTransitionCorrectly();
65+
await mockEventHandler1.ShouldTransitionedCorrectlyAsync();
66+
await mockEventHandler2.ShouldTransitionedCorrectlyAsync();
6767
}
6868

6969
[Fact]
@@ -99,8 +99,8 @@ public async Task ServiceBusMessageTopicPump_WithUnavailableDependencySystem_Cir
9999
await producer.ProduceAsync(messageAfterBreak);
100100
await messageSink.ShouldReceiveOrdersAfterBreakAsync(messageAfterBreak.MessageId);
101101

102-
mockEventHandler1.ShouldTransitionCorrectly();
103-
mockEventHandler2.ShouldTransitionCorrectly();
102+
await mockEventHandler1.ShouldTransitionedCorrectlyAsync();
103+
await mockEventHandler2.ShouldTransitionedCorrectlyAsync();
104104
}
105105

106106
private async Task<TemporaryTopicSubscription> CreateTopicSubscriptionForMessageAsync(params ServiceBusMessage[] messages)

0 commit comments

Comments
 (0)