forked from dotnetcore/CAP
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathIQueueExecutor.Publish.Base.cs
More file actions
98 lines (86 loc) · 3.25 KB
/
Copy pathIQueueExecutor.Publish.Base.cs
File metadata and controls
98 lines (86 loc) · 3.25 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
using System;
using System.Diagnostics;
using System.Threading.Tasks;
using DotNetCore.CAP.Models;
using DotNetCore.CAP.Processor;
using DotNetCore.CAP.Processor.States;
using Microsoft.Extensions.Logging;
namespace DotNetCore.CAP
{
public abstract class BasePublishQueueExecutor : IQueueExecutor
{
private readonly IStateChanger _stateChanger;
private readonly ILogger _logger;
protected BasePublishQueueExecutor(IStateChanger stateChanger,
ILogger<BasePublishQueueExecutor> logger)
{
_stateChanger = stateChanger;
_logger = logger;
}
public abstract Task<OperateResult> PublishAsync(string keyName, string content);
public async Task<OperateResult> ExecuteAsync(IStorageConnection connection, IFetchedMessage fetched)
{
var message = await connection.GetPublishedMessageAsync(fetched.MessageId);
try
{
var sp = Stopwatch.StartNew();
await _stateChanger.ChangeStateAsync(message, new ProcessingState(), connection);
if (message.Retries > 0)
{
_logger.JobRetrying(message.Retries);
}
var result = await PublishAsync(message.Name, message.Content);
sp.Stop();
var newState = default(IState);
if (!result.Succeeded)
{
var shouldRetry = await UpdateMessageForRetryAsync(message, connection);
if (shouldRetry)
{
newState = new ScheduledState();
_logger.JobFailedWillRetry(result.Exception);
}
else
{
newState = new FailedState();
_logger.JobFailed(result.Exception);
}
}
else
{
newState = new SucceededState();
}
await _stateChanger.ChangeStateAsync(message, newState, connection);
fetched.RemoveFromQueue();
if (result.Succeeded)
{
_logger.JobExecuted(sp.Elapsed.TotalSeconds);
}
return OperateResult.Success;
}
catch (Exception ex)
{
_logger.ExceptionOccuredWhileExecutingJob(message?.Name, ex);
return OperateResult.Failed(ex);
}
}
private async Task<bool> UpdateMessageForRetryAsync(CapPublishedMessage message, IStorageConnection connection)
{
var retryBehavior = RetryBehavior.DefaultRetry;
var now = DateTime.Now;
var retries = ++message.Retries;
if (retries >= retryBehavior.RetryCount)
{
return false;
}
var due = message.Added.AddSeconds(retryBehavior.RetryIn(retries));
message.ExpiresAt = due;
using (var transaction = connection.CreateTransaction())
{
transaction.UpdateMessage(message);
await transaction.CommitAsync();
}
return true;
}
}
}