Skip to content

Commit 08b1fc9

Browse files
committed
Added feature for indicate from error handler function if the message should be Ack or Nacked
1 parent c2eadd9 commit 08b1fc9

File tree

10 files changed

+101
-28
lines changed

10 files changed

+101
-28
lines changed

Grumpy.MessageQueue.Msmq/Grumpy.MessageQueue.Msmq.csproj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@
6363
</ItemGroup>
6464
<ItemGroup>
6565
<PackageReference Include="Grumpy.Common">
66-
<Version>1.2.3</Version>
66+
<Version>1.2.7</Version>
6767
</PackageReference>
6868
<PackageReference Include="Grumpy.Json">
6969
<Version>1.2.3</Version>

Grumpy.MessageQueue.Msmq/QueueFactory.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ namespace Grumpy.MessageQueue.Msmq
88
public class QueueFactory : IQueueFactory
99
{
1010
private readonly IMessageQueueManager _messageQueueManager;
11-
private IMessageQueueTransactionFactory _messageQueueTransactionFactory;
11+
private readonly IMessageQueueTransactionFactory _messageQueueTransactionFactory;
1212

1313
/// <inheritdoc />
1414
public QueueFactory()

Grumpy.MessageQueue.NuGet/Grumpy.MessageQueue.nuspec

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
<language>en-US</language>
1616
<copyright>Copyright © Busted-Janum 2018</copyright>
1717
<dependencies>
18-
<dependency id="Grumpy.Common" version="1.2.3" />
18+
<dependency id="Grumpy.Common" version="1.2.7" />
1919
<dependency id="Grumpy.Json" version="1.2.3" />
2020
</dependencies>
2121
</metadata>

Grumpy.MessageQueue.TestTools/Grumpy.MessageQueue.TestTools.csproj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@
4848
</ItemGroup>
4949
<ItemGroup>
5050
<PackageReference Include="Grumpy.Common">
51-
<Version>1.2.3</Version>
51+
<Version>1.2.7</Version>
5252
</PackageReference>
5353
</ItemGroup>
5454
<Import Project="$(MSBuildToolsPath)\Microsoft.CSharp.targets" />

Grumpy.MessageQueue.TestTools/TestQueueHandler.cs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,18 @@ public TestQueueHandler(ICollection<object> messages)
2828

2929
/// <inheritdoc />
3030
public void Start(string queueName, bool privateQueue, LocaleQueueMode localeQueueMode, bool transactional, Action<object, CancellationToken> messageHandler, Action<object, Exception> errorHandler, Action heartbeatHandler, int heartRateMilliseconds, bool multiThreadedHandler, bool syncMode, CancellationToken cancellationToken)
31+
{
32+
Start(messageHandler, errorHandler, heartbeatHandler, heartRateMilliseconds, cancellationToken);
33+
}
34+
35+
/// <inheritdoc />
36+
public void Start(string queueName, bool privateQueue, LocaleQueueMode localeQueueMode, bool transactional, Action<object, CancellationToken> messageHandler, Func<object, Exception, bool> errorHandler, Action heartbeatHandler, int heartRateMilliseconds, bool multiThreadedHandler, bool syncMode, CancellationToken cancellationToken)
37+
{
38+
Start(messageHandler, (message, exception) => { errorHandler(message, exception); }, heartbeatHandler, heartRateMilliseconds, cancellationToken);
39+
}
40+
41+
private void Start(Action<object, CancellationToken> messageHandler, Action<object, Exception> errorHandler, Action heartbeatHandler, int heartRateMilliseconds,
42+
CancellationToken cancellationToken)
3143
{
3244
if (heartbeatHandler != null)
3345
{

Grumpy.MessageQueue.UnitTests/QueueHandlerAsyncTests.cs

Lines changed: 0 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
using System;
2-
using System.Diagnostics;
32
using System.Threading;
43
using FluentAssertions;
54
using Grumpy.Common.Interfaces;
@@ -22,7 +21,6 @@ public class QueueHandlerAsyncTests : IDisposable
2221
private readonly CancellationToken _cancellationToken;
2322
private readonly ILocaleQueue _queue;
2423
private bool _disposed;
25-
private readonly Stopwatch _stopwatch;
2624

2725
public QueueHandlerAsyncTests()
2826
{
@@ -35,8 +33,6 @@ public QueueHandlerAsyncTests()
3533

3634
_cancellationTokenSource = new CancellationTokenSource();
3735
_cancellationToken = _cancellationTokenSource.Token;
38-
39-
_stopwatch = new Stopwatch();
4036
}
4137

4238
[Fact]
@@ -73,17 +69,6 @@ private IQueueHandler CreateQueueHandler()
7369
return new QueueHandler(_queueFactory, _taskFactory);
7470
}
7571

76-
private void ExecuteHandler(Action<object, CancellationToken> messageHandler, bool multiThreadedHandler)
77-
{
78-
using (var cut = new QueueHandler(_queueFactory, _taskFactory))
79-
{
80-
cut.Start("MyQueue", true, LocaleQueueMode.TemporaryMaster, true, messageHandler, null, null, 100, multiThreadedHandler, false, _cancellationToken);
81-
82-
// ReSharper disable once AccessToDisposedClosure
83-
TimerUtility.WaitForIt(() => cut.Idle, 6000);
84-
}
85-
}
86-
8772
private static ITransactionalMessage CreateMessage(object body)
8873
{
8974
var message = Substitute.For<ITransactionalMessage>();

Grumpy.MessageQueue.UnitTests/QueueHandlerSyncTests.cs

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,30 @@ public void NonFunctionalHandlerShouldNAckTransactionalMessage()
125125
transactionalMessage.Received(1).NAck();
126126
}
127127

128+
[Fact]
129+
public void NonFunctionalHandlerAndTrueFromErrorHandlerShouldAckTransactionalMessage()
130+
{
131+
var transactionalMessage = CreateMessage("Message1");
132+
_queue.Receive(Arg.Any<int>(), Arg.Any<CancellationToken>()).Returns(e => transactionalMessage, e => null);
133+
134+
ExecuteHandler((m, c) => throw new Exception(), (o, exception) => true);
135+
136+
transactionalMessage.Received(1).Ack();
137+
transactionalMessage.Received(0).NAck();
138+
}
139+
140+
[Fact]
141+
public void NonFunctionalHandlerAndFalseFromErrorHandlerShouldAckTransactionalMessage()
142+
{
143+
var transactionalMessage = CreateMessage("Message1");
144+
_queue.Receive(Arg.Any<int>(), Arg.Any<CancellationToken>()).Returns(e => transactionalMessage, e => null);
145+
146+
ExecuteHandler((m, c) => throw new Exception(), (o, exception) => false);
147+
148+
transactionalMessage.Received(0).Ack();
149+
transactionalMessage.Received(1).NAck();
150+
}
151+
128152
private void ExecuteHandler(Action<object, CancellationToken> messageHandler, Action<object, Exception> errorHandler = null, Action heartbeatHandler = null)
129153
{
130154
using (var cut = CreateQueueHandler())
@@ -133,6 +157,14 @@ private void ExecuteHandler(Action<object, CancellationToken> messageHandler, Ac
133157
}
134158
}
135159

160+
private void ExecuteHandler(Action<object, CancellationToken> messageHandler, Func<object, Exception, bool> errorHandler)
161+
{
162+
using (var cut = CreateQueueHandler())
163+
{
164+
cut.Start("MyQueue", true, LocaleQueueMode.TemporaryMaster, true, messageHandler, errorHandler, null, 100, false, true, _cancellationToken);
165+
}
166+
}
167+
136168
private IQueueHandler CreateQueueHandler()
137169
{
138170
return new QueueHandler(_queueFactory, _taskFactory);

Grumpy.MessageQueue/Grumpy.MessageQueue.csproj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@
5454
</ItemGroup>
5555
<ItemGroup>
5656
<PackageReference Include="Grumpy.Common">
57-
<Version>1.2.3</Version>
57+
<Version>1.2.7</Version>
5858
</PackageReference>
5959
</ItemGroup>
6060
<Import Project="$(MSBuildToolsPath)\Microsoft.CSharp.targets" />

Grumpy.MessageQueue/Interfaces/IQueueHandler.cs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,22 @@ public interface IQueueHandler : IDisposable
2626
/// <param name="cancellationToken">Cancellation Token</param>
2727
void Start(string queueName, bool privateQueue, LocaleQueueMode localeQueueMode, bool transactional, Action<object, CancellationToken> messageHandler, Action<object, Exception> errorHandler, Action heartbeatHandler, int heartRateMilliseconds, bool multiThreadedHandler, bool syncMode, CancellationToken cancellationToken);
2828

29+
/// <summary>
30+
/// Start the Queue handler
31+
/// </summary>
32+
/// <param name="queueName">Message Queue Name</param>
33+
/// <param name="privateQueue">Is Message Queue Private</param>
34+
/// <param name="localeQueueMode">Message Queue Create Mode</param>
35+
/// <param name="transactional">Is Message Queue transactional</param>
36+
/// <param name="messageHandler">Message Handler</param>
37+
/// <param name="errorHandler">Error Handler</param>
38+
/// <param name="heartbeatHandler">Heartbeat Handler</param>
39+
/// <param name="heartRateMilliseconds">Heart rate in milliseconds</param>
40+
/// <param name="multiThreadedHandler">Handler executed multi threaded</param>
41+
/// <param name="syncMode">Should the handler be running in synchronous mode</param>
42+
/// <param name="cancellationToken">Cancellation Token</param>
43+
void Start(string queueName, bool privateQueue, LocaleQueueMode localeQueueMode, bool transactional, Action<object, CancellationToken> messageHandler, Func<object, Exception, bool> errorHandler, Action heartbeatHandler, int heartRateMilliseconds, bool multiThreadedHandler, bool syncMode, CancellationToken cancellationToken);
44+
2945
/// <summary>
3046
/// Indicate if the Queue handler is idle or managing active tasks
3147
/// </summary>

Grumpy.MessageQueue/QueueHandler.cs

Lines changed: 36 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,8 @@ public class QueueHandler : IQueueHandler
2323
private CancellationTokenRegistration _cancellationTokenRegistration;
2424
private IQueue _queue;
2525
private Action<object, CancellationToken> _messageHandler;
26-
private Action<object, Exception> _errorHandler;
26+
private Action<object, Exception> _errorHandlerAction;
27+
private Func<object, Exception, bool> _errorHandlerFunc;
2728
private Action _heartbeatHandler;
2829
private ITask _processTask;
2930
private int _heartRateMilliseconds;
@@ -49,6 +50,23 @@ public QueueHandler(IQueueFactory queueFactory, ITaskFactory taskFactory)
4950

5051
/// <inheritdoc />
5152
public void Start(string queueName, bool privateQueue, LocaleQueueMode localeQueueMode, bool transactional, Action<object, CancellationToken> messageHandler, Action<object, Exception> errorHandler, Action heartbeatHandler, int heartRateMilliseconds, bool multiThreadedHandler, bool syncMode, CancellationToken cancellationToken)
53+
{
54+
_errorHandlerFunc = null;
55+
_errorHandlerAction = errorHandler;
56+
57+
Start(queueName, privateQueue, localeQueueMode, transactional, messageHandler, heartbeatHandler, heartRateMilliseconds, multiThreadedHandler, syncMode, cancellationToken);
58+
}
59+
60+
/// <inheritdoc />
61+
public void Start(string queueName, bool privateQueue, LocaleQueueMode localeQueueMode, bool transactional, Action<object, CancellationToken> messageHandler, Func<object, Exception, bool> errorHandler, Action heartbeatHandler, int heartRateMilliseconds, bool multiThreadedHandler, bool syncMode, CancellationToken cancellationToken)
62+
{
63+
_errorHandlerFunc = errorHandler;
64+
_errorHandlerAction = null;
65+
66+
Start(queueName, privateQueue, localeQueueMode, transactional, messageHandler, heartbeatHandler, heartRateMilliseconds, multiThreadedHandler, syncMode, cancellationToken);
67+
}
68+
69+
private void Start(string queueName, bool privateQueue, LocaleQueueMode localeQueueMode, bool transactional, Action<object, CancellationToken> messageHandler, Action heartbeatHandler, int heartRateMilliseconds, bool multiThreadedHandler, bool syncMode, CancellationToken cancellationToken)
5270
{
5371
if (_cancellationTokenSource != null)
5472
throw new ArgumentException("Handler not stopped");
@@ -60,7 +78,6 @@ public void Start(string queueName, bool privateQueue, LocaleQueueMode localeQue
6078
_syncMode = syncMode;
6179
_queue = _queueFactory.CreateLocale(queueName, privateQueue, localeQueueMode, transactional);
6280
_messageHandler = messageHandler ?? throw new ArgumentNullException(nameof(messageHandler));
63-
_errorHandler = errorHandler;
6481
_heartbeatHandler = heartbeatHandler;
6582
_heartRateMilliseconds = heartRateMilliseconds;
6683
_multiThreadedHandler = multiThreadedHandler;
@@ -198,19 +215,30 @@ private void ErrorHandler(ITransactionalMessage message, Exception exception)
198215
{
199216
try
200217
{
201-
_errorHandler?.Invoke(message.Message, exception);
202-
203-
if (_errorHandler == null)
204-
message.NAck();
205-
else
218+
if (CallErrorHandler(message.Message, exception))
206219
message.Ack();
220+
else
221+
message.NAck();
207222
}
208223
catch
209224
{
210225
message.NAck();
211226
}
212227
}
213228

229+
private bool CallErrorHandler(object message, Exception exception)
230+
{
231+
if (_errorHandlerFunc != null)
232+
return _errorHandlerFunc(message, exception);
233+
234+
if (_errorHandlerAction == null)
235+
return false;
236+
237+
_errorHandlerAction(message, exception);
238+
239+
return true;
240+
}
241+
214242
private void Heartbeat()
215243
{
216244
try
@@ -257,7 +285,7 @@ private void CleanUpTasks()
257285
foreach (var task in _workTasks.Where(t => t.IsCompleted || t.IsFaulted))
258286
{
259287
if (task.IsFaulted)
260-
_errorHandler?.Invoke(task.AsyncState, task.Exception);
288+
CallErrorHandler(task.AsyncState, task.Exception);
261289

262290
task.Dispose();
263291
}

0 commit comments

Comments
 (0)