Commit 2e491a6a authored by Savorboard's avatar Savorboard

refactor retry task processor.

parent 9b23617f
...@@ -3,11 +3,7 @@ ...@@ -3,11 +3,7 @@
using System; using System;
using System.Threading.Tasks; using System.Threading.Tasks;
using DotNetCore.CAP.Infrastructure;
using DotNetCore.CAP.Models;
using DotNetCore.CAP.Processor.States;
using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options; using Microsoft.Extensions.Options;
namespace DotNetCore.CAP.Processor namespace DotNetCore.CAP.Processor
...@@ -15,26 +11,18 @@ namespace DotNetCore.CAP.Processor ...@@ -15,26 +11,18 @@ namespace DotNetCore.CAP.Processor
public class NeedRetryMessageProcessor : IProcessor public class NeedRetryMessageProcessor : IProcessor
{ {
private readonly TimeSpan _delay = TimeSpan.FromSeconds(1); private readonly TimeSpan _delay = TimeSpan.FromSeconds(1);
private readonly ILogger _logger; private readonly IPublishMessageSender _publishMessageSender;
private readonly CapOptions _options;
private readonly IPublishExecutor _publishExecutor;
private readonly IStateChanger _stateChanger;
private readonly ISubscriberExecutor _subscriberExecutor; private readonly ISubscriberExecutor _subscriberExecutor;
private readonly TimeSpan _waitingInterval; private readonly TimeSpan _waitingInterval;
public NeedRetryMessageProcessor( public NeedRetryMessageProcessor(
IOptions<CapOptions> options, IOptions<CapOptions> options,
ILogger<NeedRetryMessageProcessor> logger,
IStateChanger stateChanger,
ISubscriberExecutor subscriberExecutor, ISubscriberExecutor subscriberExecutor,
IPublishExecutor publishExecutor) IPublishMessageSender publishMessageSender)
{ {
_options = options.Value;
_logger = logger;
_stateChanger = stateChanger;
_subscriberExecutor = subscriberExecutor; _subscriberExecutor = subscriberExecutor;
_publishExecutor = publishExecutor; _publishMessageSender = publishMessageSender;
_waitingInterval = TimeSpan.FromSeconds(_options.FailedRetryInterval); _waitingInterval = TimeSpan.FromSeconds(options.Value.FailedRetryInterval);
} }
public async Task ProcessAsync(ProcessingContext context) public async Task ProcessAsync(ProcessingContext context)
...@@ -56,57 +44,10 @@ namespace DotNetCore.CAP.Processor ...@@ -56,57 +44,10 @@ namespace DotNetCore.CAP.Processor
private async Task ProcessPublishedAsync(IStorageConnection connection, ProcessingContext context) private async Task ProcessPublishedAsync(IStorageConnection connection, ProcessingContext context)
{ {
var messages = await connection.GetPublishedMessagesOfNeedRetry(); var messages = await connection.GetPublishedMessagesOfNeedRetry();
var hasException = false;
foreach (var message in messages) foreach (var message in messages)
{ {
if (message.Retries > _options.FailedRetryCount) await _publishMessageSender.SendAsync(message);
{
continue;
}
using (var transaction = connection.CreateTransaction())
{
var result = await _publishExecutor.PublishAsync(message.Name, message.Content);
if (result.Succeeded)
{
_stateChanger.ChangeState(message, new SucceededState(), transaction);
_logger.LogInformation("The message was sent successfully during the retry. MessageId:" + message.Id);
}
else
{
message.Content = Helper.AddExceptionProperty(message.Content, result.Exception);
message.Retries++;
if (message.StatusName == StatusName.Scheduled)
{
message.ExpiresAt = GetDueTime(message.Added, message.Retries);
message.StatusName = StatusName.Failed;
}
transaction.UpdateMessage(message);
if (message.Retries >= _options.FailedRetryCount)
{
_logger.LogError($"The message still sent failed after {_options.FailedRetryCount} retries. We will stop retrying the message. " +
"MessageId:" + message.Id);
if (message.Retries == _options.FailedRetryCount)
{
if (!hasException)
{
try
{
_options.FailedThresholdCallback?.Invoke(MessageType.Publish, message.Name, message.Content);
}
catch (Exception ex)
{
hasException = true;
_logger.LogWarning("Failed call-back method raised an exception:" + ex.Message);
}
}
}
}
}
await transaction.CommitAsync();
}
context.ThrowIfStopping(); context.ThrowIfStopping();
...@@ -117,69 +58,15 @@ namespace DotNetCore.CAP.Processor ...@@ -117,69 +58,15 @@ namespace DotNetCore.CAP.Processor
private async Task ProcessReceivedAsync(IStorageConnection connection, ProcessingContext context) private async Task ProcessReceivedAsync(IStorageConnection connection, ProcessingContext context)
{ {
var messages = await connection.GetReceivedMessagesOfNeedRetry(); var messages = await connection.GetReceivedMessagesOfNeedRetry();
var hasException = false;
foreach (var message in messages) foreach (var message in messages)
{ {
if (message.Retries > _options.FailedRetryCount) await _subscriberExecutor.ExecuteAsync(message);
{
continue;
}
using (var transaction = connection.CreateTransaction())
{
var result = await _subscriberExecutor.ExecuteAsync(message);
if (result.Succeeded)
{
_stateChanger.ChangeState(message, new SucceededState(), transaction);
_logger.LogInformation("The message was execute successfully during the retry. MessageId:" + message.Id);
}
else
{
message.Content = Helper.AddExceptionProperty(message.Content, result.Exception);
message.Retries++;
if (message.StatusName == StatusName.Scheduled)
{
message.ExpiresAt = GetDueTime(message.Added, message.Retries);
message.StatusName = StatusName.Failed;
}
transaction.UpdateMessage(message);
if (message.Retries >= _options.FailedRetryCount)
{
_logger.LogError($"[Subscriber]The message still executed failed after {_options.FailedRetryCount} retries. " +
"We will stop retrying to execute the message. message id:" + message.Id);
if (message.Retries == _options.FailedRetryCount)
{
if (!hasException)
{
try
{
_options.FailedThresholdCallback?.Invoke(MessageType.Subscribe, message.Name, message.Content);
}
catch (Exception ex)
{
hasException = true;
_logger.LogWarning("Failed call-back method raised an exception:" + ex.Message);
}
}
}
}
}
await transaction.CommitAsync();
}
context.ThrowIfStopping(); context.ThrowIfStopping();
await context.WaitAsync(_delay); await context.WaitAsync(_delay);
} }
} }
public DateTime GetDueTime(DateTime addedTime, int retries)
{
var retryBehavior = RetryBehavior.DefaultRetry;
return addedTime.AddSeconds(retryBehavior.RetryIn(retries));
}
} }
} }
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment