Skip to content

Commit 618d08f

Browse files
committed
Better Connect and let incorrect logging
1 parent 393d395 commit 618d08f

File tree

7 files changed

+90
-19
lines changed

7 files changed

+90
-19
lines changed

Grumpy.MessageQueue.Msmq.IntegrationTests/LocaleQueueTests.cs

+2-1
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,7 @@ public void CancelReceiveAsyncShouldStopBeforeTimeout()
146146
}
147147

148148
stopwatch.Stop();
149-
stopwatch.ElapsedMilliseconds.Should().BeInRange(400, 700);
149+
stopwatch.ElapsedMilliseconds.Should().BeInRange(400, 711);
150150
}
151151
}
152152
finally
@@ -270,6 +270,7 @@ public void SendAndReceiveLargeMessageShouldWork()
270270

271271
queue.Count().Should().Be(2);
272272
queue.Transactional.Should().BeTrue();
273+
queue.AccessMode.Should().Be(AccessMode.SendAndReceive);
273274
}
274275

275276
using (var queue = CreateLocalQueue(name, true, LocaleQueueMode.Durable))

Grumpy.MessageQueue.Msmq.UnitTests/QueueTests.cs

+13-2
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,17 @@ public void SendToExistingQueueShouldNotCallCreate()
5050
_messageQueueManager.Received(2).Get(".", Arg.Any<string>(), Arg.Any<bool>(), Arg.Any<QueueAccessMode>());
5151
}
5252

53+
[Fact]
54+
public void SendLargeMessageToNonTransactionalQueueShouldThrow()
55+
{
56+
SetQueue(Substitute.For<System.Messaging.MessageQueue>(), true);
57+
58+
using (var cut = CreateLocaleQueue("MyQueue", true, LocaleQueueMode.Durable, AccessMode.Send, false))
59+
{
60+
Assert.Throws<MessageSizeException>(() => cut.Send(new StringBuilder().Insert(0, "A", 5000000)));
61+
}
62+
}
63+
5364
[Fact]
5465
public void SendToNoneExistingQueueShouldCallCreate()
5566
{
@@ -273,9 +284,9 @@ public void ReceiveOnSendQueueShouldThrowException()
273284
}
274285
}
275286

276-
private IQueue CreateLocaleQueue(string queue = "MyQueue", bool privateQueue = true, LocaleQueueMode localeQueueMode = LocaleQueueMode.TemporaryMaster, AccessMode accessMode = AccessMode.SendAndReceive)
287+
private IQueue CreateLocaleQueue(string queue = "MyQueue", bool privateQueue = true, LocaleQueueMode localeQueueMode = LocaleQueueMode.TemporaryMaster, AccessMode accessMode = AccessMode.SendAndReceive, bool transactional = true)
277288
{
278-
return new LocaleQueue(_logger, _messageQueueManager, _messageQueueTransactionFactory, queue, privateQueue, localeQueueMode, true, accessMode);
289+
return new LocaleQueue(_logger, _messageQueueManager, _messageQueueTransactionFactory, queue, privateQueue, localeQueueMode, transactional, accessMode);
279290
}
280291

281292
private void SetQueue(System.Messaging.MessageQueue queue, bool exists)

Grumpy.MessageQueue.Msmq/Queue.cs

+14-16
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
using Grumpy.MessageQueue.Msmq.Interfaces;
1717
using Microsoft.Extensions.Logging;
1818
using Newtonsoft.Json;
19-
using ObjectExtensions = Grumpy.Common.Extensions.ObjectExtensions;
2019

2120
namespace Grumpy.MessageQueue.Msmq
2221
{
@@ -25,7 +24,6 @@ public abstract class Queue : IQueue
2524
{
2625
private const int MaxMsmqMessageSize = 4096000;
2726
private System.Messaging.MessageQueue _messageQueue;
28-
private readonly AccessMode _accessMode;
2927
private readonly object _messageQueueLock;
3028
private readonly Timer _disconnectTimer;
3129

@@ -49,7 +47,7 @@ protected Queue(ILogger logger, IMessageQueueManager messageQueueManager, IMessa
4947
Durable = durable;
5048
_messageQueueLock = new object();
5149
_disconnectTimer = new Timer(Disconnect, null, 3600000, 3600000);
52-
_accessMode = accessMode;
50+
AccessMode = accessMode;
5351
}
5452

5553
/// <summary>
@@ -80,11 +78,14 @@ protected Queue(ILogger logger, IMessageQueueManager messageQueueManager, IMessa
8078
/// <inheritdoc />
8179
public bool Transactional { get; }
8280

81+
/// <inheritdoc />
82+
public AccessMode AccessMode { get; }
83+
8384
/// <inheritdoc />
8485
public int Count()
8586
{
86-
if (!_accessMode.In(AccessMode.Receive, AccessMode.SendAndReceive))
87-
throw new AccessModeException(nameof(Count), _accessMode);
87+
if (!AccessMode.In(AccessMode.Receive, AccessMode.SendAndReceive))
88+
throw new AccessModeException(nameof(Count), AccessMode);
8889

8990
try
9091
{
@@ -123,16 +124,13 @@ public virtual void Connect()
123124
{
124125
lock (_messageQueueLock)
125126
{
126-
if (_messageQueue != null)
127-
Disconnect();
128-
129127
if (_messageQueue == null)
130128
{
131-
_messageQueue = GetQueue(_accessMode);
129+
_messageQueue = GetQueue(AccessMode);
132130

133131
if (_messageQueue != null)
134132
{
135-
switch (_accessMode)
133+
switch (AccessMode)
136134
{
137135
case AccessMode.Receive:
138136
_messageQueue.MessageReadPropertyFilter = new MessagePropertyFilter { AppSpecific = true, Id = true, Body = true };
@@ -147,11 +145,11 @@ public virtual void Connect()
147145
_messageQueue.DefaultPropertiesToSend = new DefaultPropertiesToSend { Recoverable = Durable };
148146
break;
149147
default:
150-
throw new ArgumentOutOfRangeException(nameof(_accessMode), _accessMode, "Unknown Access Mode");
148+
throw new ArgumentOutOfRangeException(nameof(AccessMode), AccessMode, "Unknown Access Mode");
151149
}
152150
}
153151

154-
Logger.Information("Connected to Message Queue {@Queue} {@MessageQueue}", this, _messageQueue);
152+
Logger.Information("Connected to Message Queue {@Queue}", this);
155153
}
156154

157155
if (_messageQueue == null)
@@ -187,8 +185,8 @@ public void Disconnect()
187185
/// <inheritdoc />
188186
public void Send<T>(T message)
189187
{
190-
if (!_accessMode.In(AccessMode.Send, AccessMode.SendAndReceive))
191-
throw new AccessModeException(nameof(Send), _accessMode);
188+
if (!AccessMode.In(AccessMode.Send, AccessMode.SendAndReceive))
189+
throw new AccessModeException(nameof(Send), AccessMode);
192190

193191
try
194192
{
@@ -248,8 +246,8 @@ public async Task<ITransactionalMessage> ReceiveAsync(int millisecondsTimeout, C
248246

249247
private async Task<ITransactionalMessage> ReceiveAsyncInternal(int millisecondsTimeout, CancellationToken cancellationToken)
250248
{
251-
if (!_accessMode.In(AccessMode.Receive, AccessMode.SendAndReceive))
252-
throw new AccessModeException(nameof(Send), _accessMode);
249+
if (!AccessMode.In(AccessMode.Receive, AccessMode.SendAndReceive))
250+
throw new AccessModeException(nameof(Send), AccessMode);
253251

254252
var timeout = TimeSpan.FromMilliseconds(millisecondsTimeout);
255253

Grumpy.MessageQueue.TestTools.UnitTests/TestQueueHandlerTests.cs

+12
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,18 @@ public void TestQueueHandlerShouldWork()
3030
numberOfHeartbeats.Should().BeGreaterOrEqualTo(1);
3131
}
3232

33+
[Fact]
34+
public void TestQueueHandlerShouldWorkWithCancelHandler()
35+
{
36+
var q = new TestQueueHandlerFactory();
37+
using (var w = q.Create())
38+
{
39+
w.Start("MyQueue", true, LocaleQueueMode.DurableCreate, true, Handler, (o, e) => true, null, 1, true, true, new CancellationToken());
40+
w.Stop();
41+
w.Idle.Should().BeFalse();
42+
}
43+
}
44+
3345
private void Handler(object message, CancellationToken cancellationToken)
3446
{
3547
if ((string)message == "Exception")

Grumpy.MessageQueue.UnitTests/QueueHandlerAsyncTests.cs

+31
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,25 @@ public void CanStopQueue()
4646
}
4747
}
4848

49+
[Fact]
50+
public void InvalidHeartbeatRateShouldThrow()
51+
{
52+
using (var cut = CreateQueueHandler())
53+
{
54+
Assert.Throws<ArgumentException>(() => cut.Start("MyQueue", true, LocaleQueueMode.TemporaryMaster, true, (m,c) => { }, null, () => { }, -1, false, false, _cancellationToken));
55+
}
56+
}
57+
58+
[Fact]
59+
public void MultiStartShouldThrowException()
60+
{
61+
using (var cut = CreateQueueHandler())
62+
{
63+
cut.Start("MyQueue", true, LocaleQueueMode.TemporaryMaster, true, (m,c) => { }, null, null, 100, false, false, _cancellationToken);
64+
Assert.Throws<ArgumentException>(() => cut.Start("MyQueue", true, LocaleQueueMode.TemporaryMaster, true, (m,c) => { }, null, null, 100, false, false, _cancellationToken));
65+
}
66+
}
67+
4968
[Fact]
5069
public void QueueBeforeShouldBeIdle()
5170
{
@@ -81,6 +100,18 @@ private static ITransactionalMessage CreateMessage(object body)
81100
return message;
82101
}
83102

103+
[Fact]
104+
public void HeartbeatAreCalled()
105+
{
106+
using (var cut = CreateQueueHandler())
107+
{
108+
var i = 0;
109+
cut.Start("MyQueue", true, LocaleQueueMode.TemporaryMaster, true, (m,c) => { }, null, () => { ++i; }, 1, false, false, _cancellationToken);
110+
Thread.Sleep(1000);
111+
i.Should().BeGreaterOrEqualTo(1);
112+
}
113+
}
114+
84115
public void Dispose()
85116
{
86117
Dispose(true);

Grumpy.MessageQueue.UnitTests/QueueHandlerSyncTests.cs

+13
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,19 @@ public void HandlerThrowExceptionShouldResultInErrors()
5656
numberOfErrors.Should().Be(1);
5757
}
5858

59+
[Fact]
60+
public void HandlerAndErrorHandlerThrowExceptionShouldNAckMessage()
61+
{
62+
var numberOfErrors = 0;
63+
var message = CreateMessage("MyMessage");
64+
65+
_queue.Receive(Arg.Any<int>(), Arg.Any<CancellationToken>()).Returns(e => message, e => null);
66+
67+
ExecuteHandler((m, c) => throw new Exception((string)m), (m, e) => throw new Exception());
68+
69+
message.Received(1).NAck();
70+
}
71+
5972
[Fact]
6073
public void HeartbeatHandlerShouldBeCalled()
6174
{

Grumpy.MessageQueue/Interfaces/IQueue.cs

+5
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,11 @@ public interface IQueue : IDisposable
3131
/// </summary>
3232
bool Transactional { get; }
3333

34+
/// <summary>
35+
/// Queue Access Mode
36+
/// </summary>
37+
AccessMode AccessMode { get; }
38+
3439
/// <summary>
3540
/// Number of Messages in Queue
3641
/// </summary>

0 commit comments

Comments
 (0)