Commit 13a0cdac authored by Savorboard's avatar Savorboard Committed by GitHub

Release 2.2.5 (#162)

* update version to 2.2.4

* Fixed Incorrect local IP address judgment of IPv6. (#140)

* Fixed DateTime localization format conversion error to sql.(#139)

* update version to 2.2.5

* remove unused constructor.

* Fixed DateTime localization format conversion error to sql.(#139)

* Improved logging

* support RabbitMQ cluster configuration.

* Fixed dashboard message page re-requeue and re-executed  operate bug. (#158)

* refactor code

* refactor log extensions.

* refactor retry task processor.

* Fixed  configuration options of FailedThresholdCallback could not be invoke when the value less then three.  (#161)

* update samples.

* Fixed SendAsync or ExecuteAsync recursion retries bug. (#160)

* Fixed SendAsync or ExecuteAsync recursion retries bug. (#160)
parent 5a4675d4
......@@ -2,7 +2,7 @@
<PropertyGroup>
<VersionMajor>2</VersionMajor>
<VersionMinor>2</VersionMinor>
<VersionPatch>4</VersionPatch>
<VersionPatch>5</VersionPatch>
<VersionQuality></VersionQuality>
<VersionPrefix>$(VersionMajor).$(VersionMinor).$(VersionPatch)</VersionPrefix>
</PropertyGroup>
......
using Microsoft.AspNetCore.Builder;
using System;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
......@@ -16,6 +17,11 @@ namespace Sample.RabbitMQ.MySql
x.UseEntityFramework<AppDbContext>();
x.UseRabbitMQ("localhost");
x.UseDashboard();
x.FailedRetryCount = 5;
x.FailedThresholdCallback = (type, name, content) =>
{
Console.WriteLine($@"A message of type {type} failed after executing {x.FailedRetryCount} several times, requiring manual troubleshooting. Message name: {name}, message body: {content}");
};
});
services.AddMvc();
......
......@@ -42,7 +42,7 @@ namespace DotNetCore.CAP.MySql
public async Task<IEnumerable<CapPublishedMessage>> GetPublishedMessagesOfNeedRetry()
{
var fourMinsAgo = DateTime.Now.AddMinutes(-4);
var fourMinsAgo = DateTime.Now.AddMinutes(-4).ToString("O");
var sql =
$"SELECT * FROM `{_prefix}.published` WHERE `Retries`<{_capOptions.FailedRetryCount} AND `Added`<'{fourMinsAgo}' AND (`StatusName` = '{StatusName.Failed}' OR `StatusName` = '{StatusName.Scheduled}') LIMIT 200;";
......@@ -80,7 +80,7 @@ VALUES(@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName);SELECT LAST
public async Task<IEnumerable<CapReceivedMessage>> GetReceivedMessagesOfNeedRetry()
{
var fourMinsAgo = DateTime.Now.AddMinutes(-4);
var fourMinsAgo = DateTime.Now.AddMinutes(-4).ToString("O");
var sql =
$"SELECT * FROM `{_prefix}.received` WHERE `Retries`<{_capOptions.FailedRetryCount} AND `Added`<'{fourMinsAgo}' AND (`StatusName` = '{StatusName.Failed}' OR `StatusName` = '{StatusName.Scheduled}') LIMIT 200;";
using (var connection = new MySqlConnection(Options.ConnectionString))
......
......@@ -40,7 +40,7 @@ namespace DotNetCore.CAP.PostgreSql
public async Task<IEnumerable<CapPublishedMessage>> GetPublishedMessagesOfNeedRetry()
{
var fourMinsAgo = DateTime.Now.AddMinutes(-4);
var fourMinsAgo = DateTime.Now.AddMinutes(-4).ToString("O");
var sql =
$"SELECT * FROM \"{Options.Schema}\".\"published\" WHERE \"Retries\"<{_capOptions.FailedRetryCount} AND \"Added\"<'{fourMinsAgo}' AND (\"StatusName\"='{StatusName.Failed}' OR \"StatusName\"='{StatusName.Scheduled}') LIMIT 200;";
......@@ -77,7 +77,7 @@ namespace DotNetCore.CAP.PostgreSql
public async Task<IEnumerable<CapReceivedMessage>> GetReceivedMessagesOfNeedRetry()
{
var fourMinsAgo = DateTime.Now.AddMinutes(-4);
var fourMinsAgo = DateTime.Now.AddMinutes(-4).ToString("O");
var sql =
$"SELECT * FROM \"{Options.Schema}\".\"received\" WHERE \"Retries\"<{_capOptions.FailedRetryCount} AND \"Added\"<'{fourMinsAgo}' AND (\"StatusName\"='{StatusName.Failed}' OR \"StatusName\"='{StatusName.Scheduled}') LIMIT 200;";
using (var connection = new NpgsqlConnection(Options.ConnectionString))
......
......@@ -37,7 +37,10 @@ namespace DotNetCore.CAP
/// <summary> The topic exchange type. </summary>
public const string ExchangeType = "topic";
/// <summary>The host to connect to.</summary>
/// <summary>
/// The host to connect to.
/// If you want connect to the cluster, you can assign like “192.168.1.111,192.168.1.112”
/// </summary>
public string HostName { get; set; } = "localhost";
/// <summary>
......
......@@ -76,7 +76,6 @@ namespace DotNetCore.CAP.RabbitMQ
{
var factory = new ConnectionFactory
{
HostName = options.HostName,
UserName = options.UserName,
Port = options.Port,
Password = options.Password,
......@@ -86,6 +85,13 @@ namespace DotNetCore.CAP.RabbitMQ
SocketWriteTimeout = options.SocketWriteTimeout
};
if (options.HostName.Contains(","))
{
return () => factory.CreateConnection(
options.HostName.Split(new[] { "," }, StringSplitOptions.RemoveEmptyEntries));
}
factory.HostName = options.HostName;
return () => factory.CreateConnection();
}
......
......@@ -40,7 +40,7 @@ namespace DotNetCore.CAP.SqlServer
public async Task<IEnumerable<CapPublishedMessage>> GetPublishedMessagesOfNeedRetry()
{
var fourMinsAgo = DateTime.Now.AddMinutes(-4);
var fourMinsAgo = DateTime.Now.AddMinutes(-4).ToString("O");
var sql =
$"SELECT TOP (200) * FROM [{Options.Schema}].[Published] WITH (readpast) WHERE Retries<{_capOptions.FailedRetryCount} AND Added<'{fourMinsAgo}' AND (StatusName = '{StatusName.Failed}' OR StatusName = '{StatusName.Scheduled}')";
......@@ -78,7 +78,7 @@ VALUES(@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName);SELECT SCOP
public async Task<IEnumerable<CapReceivedMessage>> GetReceivedMessagesOfNeedRetry()
{
var fourMinsAgo = DateTime.Now.AddMinutes(-4);
var fourMinsAgo = DateTime.Now.AddMinutes(-4).ToString("O");
var sql =
$"SELECT TOP (200) * FROM [{Options.Schema}].[Received] WITH (readpast) WHERE Retries<{_capOptions.FailedRetryCount} AND Added<'{fourMinsAgo}' AND (StatusName = '{StatusName.Failed}' OR StatusName = '{StatusName.Scheduled}')";
using (var connection = new SqlConnection(Options.ConnectionString))
......
......@@ -182,7 +182,7 @@ namespace DotNetCore.CAP.Abstractions
}
catch (Exception e)
{
_logger.LogError("An exception was occurred when publish message. exception message:" + e.Message, e);
_logger.LogError(e, "An exception was occurred when publish message async. exception message:" + name);
s_diagnosticListener.WritePublishMessageStoreError(operationId, message, e);
Console.WriteLine(e);
throw;
......@@ -204,10 +204,11 @@ namespace DotNetCore.CAP.Abstractions
try
{
operationId = s_diagnosticListener.WritePublishMessageStoreBefore(message);
Console.WriteLine("================22222222222222=====================");
operationId = s_diagnosticListener.WritePublishMessageStoreBefore(message);
var id = Execute(DbConnection, DbTransaction, message);
Console.WriteLine("================777777777777777777777=====================");
ClosedCap();
if (id > 0)
......@@ -220,7 +221,7 @@ namespace DotNetCore.CAP.Abstractions
}
catch (Exception e)
{
_logger.LogError("An exception was occurred when publish message. exception message:" + e.Message, e);
_logger.LogError(e, "An exception was occurred when publish message. message:" + name);
s_diagnosticListener.WritePublishMessageStoreError(operationId, message, e);
Console.WriteLine(e);
throw;
......
......@@ -28,19 +28,14 @@ namespace DotNetCore.CAP.Dashboard
public CapDashboardRequest(HttpContext context)
{
if (context == null)
{
throw new ArgumentNullException(nameof(context));
}
_context = context;
_context = context ?? throw new ArgumentNullException(nameof(context));
}
public override string Method => _context.Request.Method;
public override string Path => _context.Request.Path.Value;
public override string PathBase => _context.Request.PathBase.Value;
public override string LocalIpAddress => _context.Connection.LocalIpAddress.ToString();
public override string RemoteIpAddress => _context.Connection.RemoteIpAddress.ToString();
public override string LocalIpAddress => _context.Connection.LocalIpAddress.MapToIPv4().ToString();
public override string RemoteIpAddress => _context.Connection.RemoteIpAddress.MapToIPv4().ToString();
public override string GetQuery(string key)
{
......
......@@ -3,7 +3,7 @@
using System.Reflection;
using DotNetCore.CAP.Dashboard.Pages;
using DotNetCore.CAP.Infrastructure;
using Microsoft.Extensions.DependencyInjection;
namespace DotNetCore.CAP.Dashboard
{
......@@ -83,24 +83,34 @@ namespace DotNetCore.CAP.Dashboard
Routes.AddJsonResult("/published/message/(?<Id>.+)", x =>
{
var id = int.Parse(x.UriMatch.Groups["Id"].Value);
var message = x.Storage.GetConnection().GetPublishedMessageAsync(id).GetAwaiter().GetResult();
var message = x.Storage.GetConnection().GetPublishedMessageAsync(id)
.GetAwaiter().GetResult();
return message.Content;
});
Routes.AddJsonResult("/received/message/(?<Id>.+)", x =>
{
var id = int.Parse(x.UriMatch.Groups["Id"].Value);
var message = x.Storage.GetConnection().GetReceivedMessageAsync(id).GetAwaiter().GetResult();
var message = x.Storage.GetConnection().GetReceivedMessageAsync(id)
.GetAwaiter().GetResult();
return message.Content;
});
Routes.AddPublishBatchCommand(
"/published/requeue",
(client, messageId) =>
client.Storage.GetConnection().ChangePublishedState(messageId, StatusName.Scheduled));
{
var msg = client.Storage.GetConnection().GetPublishedMessageAsync(messageId)
.GetAwaiter().GetResult();
client.RequestServices.GetService<IDispatcher>().EnqueueToPublish(msg);
});
Routes.AddPublishBatchCommand(
"/received/requeue",
(client, messageId) =>
client.Storage.GetConnection().ChangeReceivedState(messageId, StatusName.Scheduled));
{
var msg = client.Storage.GetConnection().GetReceivedMessageAsync(messageId)
.GetAwaiter().GetResult();
client.RequestServices.GetService<IDispatcher>().EnqueueToExecute(msg);
});
Routes.AddRazorPage(
"/published/(?<StatusName>.+)",
......
......@@ -9,26 +9,27 @@ namespace DotNetCore.CAP.Dashboard
{
public bool Authorize(DashboardContext context)
{
var ipAddress = context.Request.RemoteIpAddress;
// if unknown, assume not local
if (string.IsNullOrEmpty(context.Request.RemoteIpAddress))
if (string.IsNullOrEmpty(ipAddress))
{
return false;
}
// check if localhost
if (context.Request.RemoteIpAddress == "127.0.0.1" || context.Request.RemoteIpAddress == "::1")
if (ipAddress == "127.0.0.1" || ipAddress == "0.0.0.1")
{
return true;
}
// compare with local address
if (context.Request.RemoteIpAddress == context.Request.LocalIpAddress)
if (ipAddress == context.Request.LocalIpAddress)
{
return true;
}
// check if private ip
if (Helper.IsInnerIP(context.Request.RemoteIpAddress))
if (Helper.IsInnerIP(ipAddress))
{
return true;
}
......
......@@ -43,6 +43,24 @@ namespace DotNetCore.CAP
public abstract Task<OperateResult> PublishAsync(string keyName, string content);
public async Task<OperateResult> SendAsync(CapPublishedMessage message)
{
bool retry;
OperateResult result;
do
{
var executedResult = await SendWithoutRetryAsync(message);
result = executedResult.Item2;
if (result == OperateResult.Success)
{
return result;
}
retry = executedResult.Item1;
} while (retry);
return result;
}
private async Task<(bool, OperateResult)> SendWithoutRetryAsync(CapPublishedMessage message)
{
var startTime = DateTimeOffset.UtcNow;
var stopwatch = Stopwatch.StartNew();
......@@ -63,60 +81,33 @@ namespace DotNetCore.CAP
TracingAfter(operationId, message.Name, sendValues, startTime, stopwatch.Elapsed);
return OperateResult.Success;
return (false, OperateResult.Success);
}
else
{
TracingError(operationId, message, result, startTime, stopwatch.Elapsed);
await SetFailedState(message, result.Exception, out bool stillRetry);
if (stillRetry)
{
_logger.SenderRetrying(message.Id, message.Retries);
await SendAsync(message);
}
return OperateResult.Failed(result.Exception);
var needRetry = await SetFailedState(message, result.Exception);
return (needRetry, OperateResult.Failed(result.Exception));
}
}
private static bool UpdateMessageForRetryAsync(CapPublishedMessage message)
{
var retryBehavior = RetryBehavior.DefaultRetry;
var retries = ++message.Retries;
if (retries >= retryBehavior.RetryCount)
{
return false;
}
var due = message.Added.AddSeconds(retryBehavior.RetryIn(retries));
message.ExpiresAt = due;
return true;
}
private Task SetSuccessfulState(CapPublishedMessage message)
{
var succeededState = new SucceededState(_options.SucceedMessageExpiredAfter);
return _stateChanger.ChangeStateAsync(message, succeededState, _connection);
}
private Task SetFailedState(CapPublishedMessage message, Exception ex, out bool stillRetry)
private async Task<bool> SetFailedState(CapPublishedMessage message, Exception ex)
{
IState newState = new FailedState();
stillRetry = UpdateMessageForRetryAsync(message);
if (stillRetry)
{
_logger.ConsumerExecutionFailedWillRetry(ex);
return Task.CompletedTask;
}
AddErrorReasonToContent(message, ex);
return _stateChanger.ChangeStateAsync(message, newState, _connection);
var needRetry = UpdateMessageForRetry(message);
await _stateChanger.ChangeStateAsync(message, new FailedState(), _connection);
return needRetry;
}
private static void AddErrorReasonToContent(CapPublishedMessage message, Exception exception)
......@@ -124,6 +115,37 @@ namespace DotNetCore.CAP
message.Content = Helper.AddExceptionProperty(message.Content, exception);
}
private bool UpdateMessageForRetry(CapPublishedMessage message)
{
var retryBehavior = RetryBehavior.DefaultRetry;
var retries = ++message.Retries;
message.ExpiresAt = message.Added.AddSeconds(retryBehavior.RetryIn(retries));
var retryCount = Math.Min(_options.FailedRetryCount, retryBehavior.RetryCount);
if (retries >= retryCount)
{
if (retries == _options.FailedRetryCount)
{
try
{
_options.FailedThresholdCallback?.Invoke(MessageType.Subscribe, message.Name, message.Content);
_logger.SenderAfterThreshold(message.Id, _options.FailedRetryCount);
}
catch (Exception ex)
{
_logger.ExecutedThresholdCallbackFailed(ex);
}
}
return false;
}
_logger.SenderRetrying(message.Id, retries);
return true;
}
private (Guid, TracingHeaders) TracingBefore(string topic, string values)
{
Guid operationId = Guid.NewGuid();
......
......@@ -50,6 +50,29 @@ namespace DotNetCore.CAP
private IConsumerInvoker Invoker { get; }
public async Task<OperateResult> ExecuteAsync(CapReceivedMessage message)
{
bool retry;
OperateResult result;
do
{
var executedResult = await ExecuteWithoutRetryAsync(message);
result = executedResult.Item2;
if (result == OperateResult.Success)
{
return result;
}
retry = executedResult.Item1;
} while (retry);
return result;
}
/// <summary>
/// Execute message consumption once.
/// </summary>
/// <param name="message">the message rececived of <see cref="CapReceivedMessage"/></param>
/// <returns>Item1 is need still restry, Item2 is executed result.</returns>
private async Task<(bool, OperateResult)> ExecuteWithoutRetryAsync(CapReceivedMessage message)
{
if (message == null)
{
......@@ -68,65 +91,65 @@ namespace DotNetCore.CAP
_logger.ConsumerExecuted(sp.Elapsed.TotalSeconds);
return OperateResult.Success;
return (false, OperateResult.Success);
}
catch (Exception ex)
{
_logger.LogError(ex, $"An exception occurred while executing the subscription method. Topic:{message.Name}, Id:{message.Id}");
await SetFailedState(message, ex, out bool stillRetry);
if (stillRetry)
{
await ExecuteAsync(message);
}
return OperateResult.Failed(ex);
return (await SetFailedState(message, ex), OperateResult.Failed(ex));
}
}
private Task SetSuccessfulState(CapReceivedMessage message)
{
var succeededState = new SucceededState(_options.SucceedMessageExpiredAfter);
return _stateChanger.ChangeStateAsync(message, succeededState, _connection);
}
private Task SetFailedState(CapReceivedMessage message, Exception ex, out bool stillRetry)
private async Task<bool> SetFailedState(CapReceivedMessage message, Exception ex)
{
IState newState = new FailedState();
if (ex is SubscriberNotFoundException)
{
stillRetry = false;
message.Retries = _options.FailedRetryCount; // not retry if SubscriberNotFoundException
}
else
{
stillRetry = UpdateMessageForRetry(message);
if (stillRetry)
{
_logger.ConsumerExecutionFailedWillRetry(ex);
return Task.CompletedTask;
}
}
AddErrorReasonToContent(message, ex);
return _stateChanger.ChangeStateAsync(message, newState, _connection);
var needRetry = UpdateMessageForRetry(message);
await _stateChanger.ChangeStateAsync(message, new FailedState(), _connection);
return needRetry;
}
private static bool UpdateMessageForRetry(CapReceivedMessage message)
private bool UpdateMessageForRetry(CapReceivedMessage message)
{
var retryBehavior = RetryBehavior.DefaultRetry;
var retries = ++message.Retries;
if (retries >= retryBehavior.RetryCount)
message.ExpiresAt = message.Added.AddSeconds(retryBehavior.RetryIn(retries));
var retryCount = Math.Min(_options.FailedRetryCount, retryBehavior.RetryCount);
if (retries >= retryCount)
{
if (retries == _options.FailedRetryCount)
{
try
{
_options.FailedThresholdCallback?.Invoke(MessageType.Subscribe, message.Name, message.Content);
_logger.ConsumerExecutedAfterThreshold(message.Id, _options.FailedRetryCount);
}
catch (Exception ex)
{
_logger.ExecutedThresholdCallbackFailed(ex);
}
}
return false;
}
var due = message.Added.AddSeconds(retryBehavior.RetryIn(retries));
message.ExpiresAt = due;
_logger.ConsumerExecutionRetrying(message.Id, retries);
return true;
}
......
......@@ -10,141 +10,70 @@ namespace DotNetCore.CAP
[SuppressMessage("ReSharper", "InconsistentNaming")]
internal static class LoggerExtensions
{
private static readonly Action<ILogger, Exception> _serverStarting;
private static readonly Action<ILogger, Exception> _processorsStartingError;
private static readonly Action<ILogger, Exception> _serverShuttingDown;
private static readonly Action<ILogger, string, Exception> _expectedOperationCanceledException;
private static readonly Action<ILogger, string, string, string, Exception> _modelBinderFormattingException;
private static readonly Action<ILogger, Exception> _consumerFailedWillRetry;
private static readonly Action<ILogger, double, Exception> _consumerExecuted;
private static readonly Action<ILogger, int, int, Exception> _senderRetrying;
private static readonly Action<ILogger, string, Exception> _exceptionOccuredWhileExecuting;
private static readonly Action<ILogger, double, Exception> _messageHasBeenSent;
private static readonly Action<ILogger, int, string, Exception> _messagePublishException;
static LoggerExtensions()
public static void ConsumerExecutedAfterThreshold(this ILogger logger, int messageId, int retries)
{
_serverStarting = LoggerMessage.Define(
LogLevel.Debug,
1,
"Starting the processing server.");
_processorsStartingError = LoggerMessage.Define(
LogLevel.Error,
5,
"Starting the processors throw an exception.");
_serverShuttingDown = LoggerMessage.Define(
LogLevel.Information,
2,
"Shutting down the processing server...");
_expectedOperationCanceledException = LoggerMessage.Define<string>(
LogLevel.Warning,
3,
"Expected an OperationCanceledException, but found '{ExceptionMessage}'.");
LoggerMessage.Define<string>(
LogLevel.Error,
5,
"Consumer method '{methodName}' failed to execute.");
LoggerMessage.Define<string>(
LogLevel.Error,
5,
"Received message topic method '{topicName}' failed to execute.");
_modelBinderFormattingException = LoggerMessage.Define<string, string, string>(
LogLevel.Error,
5,
"When call subscribe method, a parameter format conversion exception occurs. MethodName:'{MethodName}' ParameterName:'{ParameterName}' Content:'{Content}'."
);
_senderRetrying = LoggerMessage.Define<int, int>(
LogLevel.Debug,
3,
"The {Retries}th retrying send a message failed. message id: {MessageId} ");
_consumerExecuted = LoggerMessage.Define<double>(
LogLevel.Debug,
4,
"Consumer executed. Took: {Seconds} secs.");
_consumerFailedWillRetry = LoggerMessage.Define(
LogLevel.Warning,
2,
"Consumer failed to execute. Will retry.");
_exceptionOccuredWhileExecuting = LoggerMessage.Define<string>(
LogLevel.Error,
6,
"An exception occured while trying to store a message. message id: {MessageId}");
logger.LogWarning($"The Subscriber of the message({messageId}) still fails after {retries}th executions and we will stop retrying.");
}
_messageHasBeenSent = LoggerMessage.Define<double>(
LogLevel.Debug,
4,
"Message published. Took: {Seconds} secs.");
public static void SenderAfterThreshold(this ILogger logger, int messageId, int retries)
{
logger.LogWarning($"The Publisher of the message({messageId}) still fails after {retries}th sends and we will stop retrying.");
}
_messagePublishException = LoggerMessage.Define<int, string>(
LogLevel.Error,
6,
"An exception occured while publishing a message, reason:{Reason}. message id:{MessageId}");
public static void ExecutedThresholdCallbackFailed(this ILogger logger, Exception ex)
{
logger.LogWarning(ex, "FailedThresholdCallback action raised an exception:" + ex.Message);
}
public static void ConsumerExecutionFailedWillRetry(this ILogger logger, Exception ex)
public static void ConsumerExecutionRetrying(this ILogger logger, int messageId, int retries)
{
_consumerFailedWillRetry(logger, ex);
logger.LogWarning($"The {retries}th retrying consume a message failed. message id: {messageId}");
}
public static void SenderRetrying(this ILogger logger, int messageId, int retries)
{
_senderRetrying(logger, messageId, retries, null);
logger.LogWarning($"The {retries}th retrying send a message failed. message id: {messageId} ");
}
public static void MessageHasBeenSent(this ILogger logger, double seconds)
{
_messageHasBeenSent(logger, seconds, null);
logger.LogDebug($"Message published. Took: {seconds} secs.");
}
public static void MessagePublishException(this ILogger logger, int messageId, string reason, Exception ex)
{
_messagePublishException(logger, messageId, reason, ex);
logger.LogError(ex, $"An exception occured while publishing a message, reason:{reason}. message id:{messageId}");
}
public static void ConsumerExecuted(this ILogger logger, double seconds)
{
_consumerExecuted(logger, seconds, null);
logger.LogDebug($"Consumer executed. Took: {seconds} secs.");
}
public static void ServerStarting(this ILogger logger)
{
_serverStarting(logger, null);
logger.LogInformation("Starting the processing server.");
}
public static void ProcessorsStartedError(this ILogger logger, Exception ex)
{
_processorsStartingError(logger, ex);
logger.LogError(ex, "Starting the processors throw an exception.");
}
public static void ServerShuttingDown(this ILogger logger)
{
_serverShuttingDown(logger, null);
logger.LogInformation("Shutting down the processing server...");
}
public static void ExpectedOperationCanceledException(this ILogger logger, Exception ex)
{
_expectedOperationCanceledException(logger, ex.Message, ex);
}
public static void ExceptionOccuredWhileExecuting(this ILogger logger, string messageId, Exception ex)
{
_exceptionOccuredWhileExecuting(logger, messageId, ex);
logger.LogWarning(ex, $"Expected an OperationCanceledException, but found '{ex.Message}'.");
}
public static void ModelBinderFormattingException(this ILogger logger, string methodName, string parameterName,
string content, Exception ex)
{
_modelBinderFormattingException(logger, methodName, parameterName, content, ex);
logger.LogError(ex, $"When call subscribe method, a parameter format conversion exception occurs. MethodName:'{methodName}' ParameterName:'{parameterName}' Content:'{content}'.");
}
}
}
\ No newline at end of file
......@@ -15,12 +15,6 @@ namespace DotNetCore.CAP.Models
Added = DateTime.Now;
}
public CapPublishedMessage(MessageContext message)
{
Name = message.Name;
Content = message.Content;
}
public int Id { get; set; }
public string Name { get; set; }
......
......@@ -3,11 +3,7 @@
using System;
using System.Threading.Tasks;
using DotNetCore.CAP.Infrastructure;
using DotNetCore.CAP.Models;
using DotNetCore.CAP.Processor.States;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
namespace DotNetCore.CAP.Processor
......@@ -15,26 +11,18 @@ namespace DotNetCore.CAP.Processor
public class NeedRetryMessageProcessor : IProcessor
{
private readonly TimeSpan _delay = TimeSpan.FromSeconds(1);
private readonly ILogger _logger;
private readonly CapOptions _options;
private readonly IPublishExecutor _publishExecutor;
private readonly IStateChanger _stateChanger;
private readonly IPublishMessageSender _publishMessageSender;
private readonly ISubscriberExecutor _subscriberExecutor;
private readonly TimeSpan _waitingInterval;
public NeedRetryMessageProcessor(
IOptions<CapOptions> options,
ILogger<NeedRetryMessageProcessor> logger,
IStateChanger stateChanger,
ISubscriberExecutor subscriberExecutor,
IPublishExecutor publishExecutor)
IPublishMessageSender publishMessageSender)
{
_options = options.Value;
_logger = logger;
_stateChanger = stateChanger;
_subscriberExecutor = subscriberExecutor;
_publishExecutor = publishExecutor;
_waitingInterval = TimeSpan.FromSeconds(_options.FailedRetryInterval);
_publishMessageSender = publishMessageSender;
_waitingInterval = TimeSpan.FromSeconds(options.Value.FailedRetryInterval);
}
public async Task ProcessAsync(ProcessingContext context)
......@@ -56,57 +44,10 @@ namespace DotNetCore.CAP.Processor
private async Task ProcessPublishedAsync(IStorageConnection connection, ProcessingContext context)
{
var messages = await connection.GetPublishedMessagesOfNeedRetry();
var hasException = false;
foreach (var message in messages)
{
if (message.Retries > _options.FailedRetryCount)
{
continue;
}
using (var transaction = connection.CreateTransaction())
{
var result = await _publishExecutor.PublishAsync(message.Name, message.Content);
if (result.Succeeded)
{
_stateChanger.ChangeState(message, new SucceededState(), transaction);
_logger.LogInformation("The message was sent successfully during the retry. MessageId:" + message.Id);
}
else
{
message.Content = Helper.AddExceptionProperty(message.Content, result.Exception);
message.Retries++;
if (message.StatusName == StatusName.Scheduled)
{
message.ExpiresAt = GetDueTime(message.Added, message.Retries);
message.StatusName = StatusName.Failed;
}
transaction.UpdateMessage(message);
if (message.Retries >= _options.FailedRetryCount)
{
_logger.LogError($"The message still sent failed after {_options.FailedRetryCount} retries. We will stop retrying the message. " +
"MessageId:" + message.Id);
if (message.Retries == _options.FailedRetryCount)
{
if (!hasException)
{
try
{
_options.FailedThresholdCallback?.Invoke(MessageType.Publish, message.Name, message.Content);
}
catch (Exception ex)
{
hasException = true;
_logger.LogWarning("Failed call-back method raised an exception:" + ex.Message);
}
}
}
}
}
await transaction.CommitAsync();
}
await _publishMessageSender.SendAsync(message);
context.ThrowIfStopping();
......@@ -117,69 +58,15 @@ namespace DotNetCore.CAP.Processor
private async Task ProcessReceivedAsync(IStorageConnection connection, ProcessingContext context)
{
var messages = await connection.GetReceivedMessagesOfNeedRetry();
var hasException = false;
foreach (var message in messages)
{
if (message.Retries > _options.FailedRetryCount)
{
continue;
}
using (var transaction = connection.CreateTransaction())
{
var result = await _subscriberExecutor.ExecuteAsync(message);
if (result.Succeeded)
{
_stateChanger.ChangeState(message, new SucceededState(), transaction);
_logger.LogInformation("The message was execute successfully during the retry. MessageId:" + message.Id);
}
else
{
message.Content = Helper.AddExceptionProperty(message.Content, result.Exception);
message.Retries++;
if (message.StatusName == StatusName.Scheduled)
{
message.ExpiresAt = GetDueTime(message.Added, message.Retries);
message.StatusName = StatusName.Failed;
}
transaction.UpdateMessage(message);
if (message.Retries >= _options.FailedRetryCount)
{
_logger.LogError($"[Subscriber]The message still executed failed after {_options.FailedRetryCount} retries. " +
"We will stop retrying to execute the message. message id:" + message.Id);
if (message.Retries == _options.FailedRetryCount)
{
if (!hasException)
{
try
{
_options.FailedThresholdCallback?.Invoke(MessageType.Subscribe, message.Name, message.Content);
}
catch (Exception ex)
{
hasException = true;
_logger.LogWarning("Failed call-back method raised an exception:" + ex.Message);
}
}
}
}
}
await transaction.CommitAsync();
}
await _subscriberExecutor.ExecuteAsync(message);
context.ThrowIfStopping();
await context.WaitAsync(_delay);
}
}
public DateTime GetDueTime(DateTime addedTime, int retries)
{
var retryBehavior = RetryBehavior.DefaultRetry;
return addedTime.AddSeconds(retryBehavior.RetryIn(retries));
}
}
}
}
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment