@@ -23,7 +23,8 @@ public class QueueHandler : IQueueHandler
2323 private CancellationTokenRegistration _cancellationTokenRegistration ;
2424 private IQueue _queue ;
2525 private Action < object , CancellationToken > _messageHandler ;
26- private Action < object , Exception > _errorHandler ;
26+ private Action < object , Exception > _errorHandlerAction ;
27+ private Func < object , Exception , bool > _errorHandlerFunc ;
2728 private Action _heartbeatHandler ;
2829 private ITask _processTask ;
2930 private int _heartRateMilliseconds ;
@@ -49,6 +50,23 @@ public QueueHandler(IQueueFactory queueFactory, ITaskFactory taskFactory)
4950
5051 /// <inheritdoc />
5152 public void Start ( string queueName , bool privateQueue , LocaleQueueMode localeQueueMode , bool transactional , Action < object , CancellationToken > messageHandler , Action < object , Exception > errorHandler , Action heartbeatHandler , int heartRateMilliseconds , bool multiThreadedHandler , bool syncMode , CancellationToken cancellationToken )
53+ {
54+ _errorHandlerFunc = null ;
55+ _errorHandlerAction = errorHandler ;
56+
57+ Start ( queueName , privateQueue , localeQueueMode , transactional , messageHandler , heartbeatHandler , heartRateMilliseconds , multiThreadedHandler , syncMode , cancellationToken ) ;
58+ }
59+
60+ /// <inheritdoc />
61+ public void Start ( string queueName , bool privateQueue , LocaleQueueMode localeQueueMode , bool transactional , Action < object , CancellationToken > messageHandler , Func < object , Exception , bool > errorHandler , Action heartbeatHandler , int heartRateMilliseconds , bool multiThreadedHandler , bool syncMode , CancellationToken cancellationToken )
62+ {
63+ _errorHandlerFunc = errorHandler ;
64+ _errorHandlerAction = null ;
65+
66+ Start ( queueName , privateQueue , localeQueueMode , transactional , messageHandler , heartbeatHandler , heartRateMilliseconds , multiThreadedHandler , syncMode , cancellationToken ) ;
67+ }
68+
69+ private void Start ( string queueName , bool privateQueue , LocaleQueueMode localeQueueMode , bool transactional , Action < object , CancellationToken > messageHandler , Action heartbeatHandler , int heartRateMilliseconds , bool multiThreadedHandler , bool syncMode , CancellationToken cancellationToken )
5270 {
5371 if ( _cancellationTokenSource != null )
5472 throw new ArgumentException ( "Handler not stopped" ) ;
@@ -60,7 +78,6 @@ public void Start(string queueName, bool privateQueue, LocaleQueueMode localeQue
6078 _syncMode = syncMode ;
6179 _queue = _queueFactory . CreateLocale ( queueName , privateQueue , localeQueueMode , transactional ) ;
6280 _messageHandler = messageHandler ?? throw new ArgumentNullException ( nameof ( messageHandler ) ) ;
63- _errorHandler = errorHandler ;
6481 _heartbeatHandler = heartbeatHandler ;
6582 _heartRateMilliseconds = heartRateMilliseconds ;
6683 _multiThreadedHandler = multiThreadedHandler ;
@@ -198,19 +215,30 @@ private void ErrorHandler(ITransactionalMessage message, Exception exception)
198215 {
199216 try
200217 {
201- _errorHandler ? . Invoke ( message . Message , exception ) ;
202-
203- if ( _errorHandler == null )
204- message . NAck ( ) ;
205- else
218+ if ( CallErrorHandler ( message . Message , exception ) )
206219 message . Ack ( ) ;
220+ else
221+ message . NAck ( ) ;
207222 }
208223 catch
209224 {
210225 message . NAck ( ) ;
211226 }
212227 }
213228
229+ private bool CallErrorHandler ( object message , Exception exception )
230+ {
231+ if ( _errorHandlerFunc != null )
232+ return _errorHandlerFunc ( message , exception ) ;
233+
234+ if ( _errorHandlerAction == null )
235+ return false ;
236+
237+ _errorHandlerAction ( message , exception ) ;
238+
239+ return true ;
240+ }
241+
214242 private void Heartbeat ( )
215243 {
216244 try
@@ -257,7 +285,7 @@ private void CleanUpTasks()
257285 foreach ( var task in _workTasks . Where ( t => t . IsCompleted || t . IsFaulted ) )
258286 {
259287 if ( task . IsFaulted )
260- _errorHandler ? . Invoke ( task . AsyncState , task . Exception ) ;
288+ CallErrorHandler ( task . AsyncState , task . Exception ) ;
261289
262290 task . Dispose ( ) ;
263291 }
0 commit comments