Commit 69fdcf87 authored by Savorboard's avatar Savorboard

fixed retry processor bugs.

parent f539fd1a
...@@ -80,18 +80,29 @@ namespace DotNetCore.CAP.Processor ...@@ -80,18 +80,29 @@ namespace DotNetCore.CAP.Processor
using (var transaction = connection.CreateTransaction()) using (var transaction = connection.CreateTransaction())
{ {
try var result = await _publishExecutor.PublishAsync(message.Name, message.Content);
if (result.Succeeded)
{ {
await _publishExecutor.PublishAsync(message.Name, message.Content);
_stateChanger.ChangeState(message, new SucceededState(), transaction); _stateChanger.ChangeState(message, new SucceededState(), transaction);
_logger.LogInformation("The message was sent successfully during the retry. MessageId:" + message.Id);
} }
catch (Exception e) else
{ {
message.Content = Helper.AddExceptionProperty(message.Content, e); message.Content = Helper.AddExceptionProperty(message.Content, result.Exception);
message.Retries++;
if (message.StatusName == StatusName.Scheduled)
{
message.ExpiresAt = GetDueTime(message.Added, message.Retries);
message.StatusName = StatusName.Failed;
}
transaction.UpdateMessage(message); transaction.UpdateMessage(message);
}
if (message.Retries >= _options.FailedRetryCount)
{
_logger.LogError($"The message still sent failed after {_options.FailedRetryCount} retries. We will stop retrying the message. " +
"MessageId:" + message.Id);
}
}
await transaction.CommitAsync(); await transaction.CommitAsync();
} }
...@@ -126,12 +137,44 @@ namespace DotNetCore.CAP.Processor ...@@ -126,12 +137,44 @@ namespace DotNetCore.CAP.Processor
} }
} }
await _subscriberExecutor.ExecuteAsync(message); using (var transaction = connection.CreateTransaction())
{
var result = await _subscriberExecutor.ExecuteAsync(message);
if (result.Succeeded)
{
_stateChanger.ChangeState(message, new SucceededState(), transaction);
_logger.LogInformation("The message was execute successfully during the retry. MessageId:" + message.Id);
}
else
{
message.Content = Helper.AddExceptionProperty(message.Content, result.Exception);
message.Retries++;
if (message.StatusName == StatusName.Scheduled)
{
message.ExpiresAt = GetDueTime(message.Added, message.Retries);
message.StatusName = StatusName.Failed;
}
transaction.UpdateMessage(message);
if (message.Retries >= _options.FailedRetryCount)
{
_logger.LogError($"[Subscriber]The message still executed failed after {_options.FailedRetryCount} retries. " +
"We will stop retrying to execute the message. message id:" + message.Id);
}
}
await transaction.CommitAsync();
}
context.ThrowIfStopping(); context.ThrowIfStopping();
await context.WaitAsync(_delay); await context.WaitAsync(_delay);
} }
} }
public DateTime GetDueTime(DateTime addedTime, int retries)
{
var retryBehavior = RetryBehavior.DefaultRetry;
return addedTime.AddSeconds(retryBehavior.RetryIn(retries));
}
} }
} }
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment