-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathQueueHandlerAsyncTests.cs
134 lines (117 loc) · 4.6 KB
/
QueueHandlerAsyncTests.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
using System;
using System.Threading;
using FluentAssertions;
using Grumpy.Common.Interfaces;
using Grumpy.Common.Threading;
using Grumpy.Json;
using Grumpy.MessageQueue.Enum;
using Grumpy.MessageQueue.Interfaces;
using Microsoft.Extensions.Logging.Abstractions;
using Newtonsoft.Json;
using NSubstitute;
using Xunit;
namespace Grumpy.MessageQueue.UnitTests
{
    /// <summary>
    /// Unit tests for <see cref="QueueHandler"/> driven through the <see cref="IQueueHandler"/>
    /// interface, using a substituted queue factory and queue.
    /// </summary>
    // ReSharper disable once ClassWithVirtualMembersNeverInherited.Global
    public class QueueHandlerAsyncTests : IDisposable
    {
        /// <summary>
        /// Upper bound on how long <see cref="HeartbeatAreCalled"/> waits for the first heartbeat.
        /// </summary>
        private static readonly TimeSpan HeartbeatTimeout = TimeSpan.FromSeconds(1);

        private readonly IQueueFactory _queueFactory;
        private readonly ITaskFactory _taskFactory;
        private readonly CancellationTokenSource _cancellationTokenSource;
        private readonly CancellationToken _cancellationToken;
        private readonly ILocaleQueue _queue;
        private bool _disposed;

        /// <summary>
        /// Builds the per-test fixture: a substituted queue that hands out three messages and then
        /// <c>null</c>, a substituted factory that always returns that queue, a task factory and a
        /// cancellation token shared by every <c>Start</c> call in this class.
        /// </summary>
        public QueueHandlerAsyncTests()
        {
            _queue = Substitute.For<ILocaleQueue>();
            _queue.Receive(Arg.Any<int>(), Arg.Any<CancellationToken>()).Returns(e => CreateMessage("Message1"), e => CreateMessage("Message2"), e => CreateMessage("Message3"), e => null);

            _queueFactory = Substitute.For<IQueueFactory>();
            _queueFactory.CreateLocale(Arg.Any<string>(), Arg.Any<bool>(), Arg.Any<LocaleQueueMode>(), Arg.Any<bool>(), Arg.Any<AccessMode>()).Returns(_queue);

            _taskFactory = new TaskFactory();
            _cancellationTokenSource = new CancellationTokenSource();
            _cancellationToken = _cancellationTokenSource.Token;
        }

        /// <summary>
        /// Smoke test: starting and then stopping the handler must not throw.
        /// </summary>
        [Fact]
        public void CanStopQueue()
        {
            using (var cut = CreateQueueHandler())
            {
                cut.Start("MyQueue", true, LocaleQueueMode.TemporaryMaster, true, (m,c) => { }, null, null, 100, false, false, _cancellationToken);

                cut.Stop();
            }
        }

        /// <summary>
        /// Starting with a heartbeat handler and a heartbeat rate of -1 must throw
        /// <see cref="ArgumentException"/>.
        /// </summary>
        [Fact]
        public void InvalidHeartbeatRateShouldThrow()
        {
            using (var cut = CreateQueueHandler())
            {
                Assert.Throws<ArgumentException>(() => cut.Start("MyQueue", true, LocaleQueueMode.TemporaryMaster, true, (m,c) => { }, null, () => { }, -1, false, false, _cancellationToken));
            }
        }

        /// <summary>
        /// Calling <c>Start</c> a second time on the same handler must throw
        /// <see cref="ArgumentException"/>.
        /// </summary>
        [Fact]
        public void MultiStartShouldThrowException()
        {
            using (var cut = CreateQueueHandler())
            {
                cut.Start("MyQueue", true, LocaleQueueMode.TemporaryMaster, true, (m,c) => { }, null, null, 100, false, false, _cancellationToken);

                Assert.Throws<ArgumentException>(() => cut.Start("MyQueue", true, LocaleQueueMode.TemporaryMaster, true, (m,c) => { }, null, null, 100, false, false, _cancellationToken));
            }
        }

        /// <summary>
        /// A freshly created handler that has not been started reports <c>Idle</c> as true.
        /// </summary>
        [Fact]
        public void QueueBeforeShouldBeIdle()
        {
            using (var cut = CreateQueueHandler())
            {
                cut.Idle.Should().BeTrue();
            }
        }

        /// <summary>
        /// Once started, the handler reports <c>Idle</c> as false.
        /// </summary>
        [Fact]
        public void QueueAfterShouldNotBeIdle()
        {
            using (var cut = CreateQueueHandler())
            {
                cut.Start("MyQueue", true, LocaleQueueMode.TemporaryMaster, true, (m,c) => { }, null, null, 100, false, false, _cancellationToken);

                cut.Idle.Should().BeFalse();
            }
        }

        /// <summary>
        /// The heartbeat callback passed to <c>Start</c> must be invoked at least once within
        /// <see cref="HeartbeatTimeout"/>.
        /// </summary>
        [Fact]
        public void HeartbeatAreCalled()
        {
            using (var cut = CreateQueueHandler())
            {
                var heartbeats = 0;

                // The callback may run on a thread other than the test thread, so the counter is
                // updated and read with Interlocked/Volatile instead of a plain ++ and read.
                cut.Start("MyQueue", true, LocaleQueueMode.TemporaryMaster, true, (m,c) => { }, null, () => { Interlocked.Increment(ref heartbeats); }, 1, false, false, _cancellationToken);

                // Poll until the first heartbeat arrives rather than always sleeping the full
                // second; the time budget is the same, but the test finishes as soon as it can.
                var heartbeatSeen = SpinWait.SpinUntil(() => Volatile.Read(ref heartbeats) >= 1, HeartbeatTimeout);

                heartbeatSeen.Should().BeTrue("the heartbeat callback should be invoked at least once within {0}", HeartbeatTimeout);
            }
        }

        /// <summary>
        /// Releases the fixture's disposable members after each test.
        /// </summary>
        public void Dispose()
        {
            Dispose(true);

            // Standard dispose pattern for an unsealed type: a derived class that adds a finalizer
            // does not need to re-implement IDisposable to suppress it.
            GC.SuppressFinalize(this);
        }

        /// <summary>
        /// Disposes the cancellation token source and the substituted queue exactly once.
        /// </summary>
        /// <param name="disposing">
        /// <c>true</c> when called from <see cref="Dispose()"/>; managed members are only released
        /// in that case.
        /// </param>
        protected virtual void Dispose(bool disposing)
        {
            if (_disposed)
            {
                return;
            }

            if (disposing)
            {
                _cancellationTokenSource.Dispose();
                _queue?.Dispose();
            }

            _disposed = true;
        }

        /// <summary>
        /// Creates the class under test, wired to the substituted queue factory, the task factory
        /// and a no-op logger.
        /// </summary>
        /// <returns>A new, not yet started queue handler.</returns>
        private IQueueHandler CreateQueueHandler()
        {
            return new QueueHandler(NullLogger.Instance, _queueFactory, _taskFactory);
        }

        /// <summary>
        /// Builds a substituted transactional message whose <c>Message</c>, <c>Body</c> and
        /// <c>Type</c> members reflect <paramref name="body"/>.
        /// </summary>
        /// <param name="body">Payload object; serialized to JSON with full type-name handling for <c>Body</c>.</param>
        /// <returns>The configured message substitute.</returns>
        private static ITransactionalMessage CreateMessage(object body)
        {
            var message = Substitute.For<ITransactionalMessage>();

            message.Message.Returns(body);
            message.Body.Returns(body.SerializeToJson(new JsonSerializerSettings { TypeNameHandling = TypeNameHandling.All }));
            message.Type.Returns(body.GetType());

            return message;
        }
    }
}