Commit 00bbf6cc authored by Savorboard's avatar Savorboard

Fixed configuration options of FailedThresholdCallback could not be invoke...

Fixed  configuration options of FailedThresholdCallback could not be invoke when the value less then three.  (#161)
parent a007dfef
...@@ -48,18 +48,19 @@ namespace DotNetCore.CAP ...@@ -48,18 +48,19 @@ namespace DotNetCore.CAP
OperateResult result; OperateResult result;
do do
{ {
result = await SendWithoutRetryAsync(message); var executedResult = await SendWithoutRetryAsync(message);
result = executedResult.Item2;
if (result == OperateResult.Success) if (result == OperateResult.Success)
{ {
return result; return result;
} }
retry = UpdateMessageForRetry(message); retry = executedResult.Item1;
} while (retry); } while (retry);
return result; return result;
} }
private async Task<OperateResult> SendWithoutRetryAsync(CapPublishedMessage message) private async Task<(bool, OperateResult)> SendWithoutRetryAsync(CapPublishedMessage message)
{ {
var startTime = DateTimeOffset.UtcNow; var startTime = DateTimeOffset.UtcNow;
var stopwatch = Stopwatch.StartNew(); var stopwatch = Stopwatch.StartNew();
...@@ -80,34 +81,17 @@ namespace DotNetCore.CAP ...@@ -80,34 +81,17 @@ namespace DotNetCore.CAP
TracingAfter(operationId, message.Name, sendValues, startTime, stopwatch.Elapsed); TracingAfter(operationId, message.Name, sendValues, startTime, stopwatch.Elapsed);
return OperateResult.Success; return (false, OperateResult.Success);
} }
else else
{ {
TracingError(operationId, message, result, startTime, stopwatch.Elapsed); TracingError(operationId, message, result, startTime, stopwatch.Elapsed);
await SetFailedState(message, result.Exception); var needRetry = await SetFailedState(message, result.Exception);
return (needRetry, OperateResult.Failed(result.Exception));
return OperateResult.Failed(result.Exception);
} }
} }
private bool UpdateMessageForRetry(CapPublishedMessage message)
{
var retryBehavior = RetryBehavior.DefaultRetry;
var retries = ++message.Retries;
if (retries >= retryBehavior.RetryCount)
{
return false;
}
_logger.SenderRetrying(message.Id, retries);
var due = message.Added.AddSeconds(retryBehavior.RetryIn(retries));
message.ExpiresAt = due;
return true;
}
private Task SetSuccessfulState(CapPublishedMessage message) private Task SetSuccessfulState(CapPublishedMessage message)
{ {
...@@ -115,10 +99,15 @@ namespace DotNetCore.CAP ...@@ -115,10 +99,15 @@ namespace DotNetCore.CAP
return _stateChanger.ChangeStateAsync(message, succeededState, _connection); return _stateChanger.ChangeStateAsync(message, succeededState, _connection);
} }
private Task SetFailedState(CapPublishedMessage message, Exception ex) private async Task<bool> SetFailedState(CapPublishedMessage message, Exception ex)
{ {
AddErrorReasonToContent(message, ex); AddErrorReasonToContent(message, ex);
return _stateChanger.ChangeStateAsync(message, new FailedState(), _connection);
var needRetry = UpdateMessageForRetry(message);
await _stateChanger.ChangeStateAsync(message, new FailedState(), _connection);
return needRetry;
} }
private static void AddErrorReasonToContent(CapPublishedMessage message, Exception exception) private static void AddErrorReasonToContent(CapPublishedMessage message, Exception exception)
...@@ -126,6 +115,37 @@ namespace DotNetCore.CAP ...@@ -126,6 +115,37 @@ namespace DotNetCore.CAP
message.Content = Helper.AddExceptionProperty(message.Content, exception); message.Content = Helper.AddExceptionProperty(message.Content, exception);
} }
private bool UpdateMessageForRetry(CapPublishedMessage message)
{
var retryBehavior = RetryBehavior.DefaultRetry;
var retries = ++message.Retries;
message.ExpiresAt = message.Added.AddSeconds(retryBehavior.RetryIn(retries));
var retryCount = Math.Min(_options.FailedRetryCount, retryBehavior.RetryCount);
if (retries >= retryCount)
{
if (retries == _options.FailedRetryCount)
{
try
{
_options.FailedThresholdCallback?.Invoke(MessageType.Subscribe, message.Name, message.Content);
_logger.SenderAfterThreshold(message.Id, _options.FailedRetryCount);
}
catch (Exception ex)
{
_logger.ExecutedThresholdCallbackFailed(ex);
}
}
return false;
}
_logger.SenderRetrying(message.Id, retries);
return true;
}
private (Guid, TracingHeaders) TracingBefore(string topic, string values) private (Guid, TracingHeaders) TracingBefore(string topic, string values)
{ {
Guid operationId = Guid.NewGuid(); Guid operationId = Guid.NewGuid();
......
...@@ -55,18 +55,24 @@ namespace DotNetCore.CAP ...@@ -55,18 +55,24 @@ namespace DotNetCore.CAP
OperateResult result; OperateResult result;
do do
{ {
result = await ExecuteWithoutRetryAsync(message); var executedResult = await ExecuteWithoutRetryAsync(message);
result = executedResult.Item2;
if (result == OperateResult.Success) if (result == OperateResult.Success)
{ {
return result; return result;
} }
retry = UpdateMessageForRetry(message); retry = executedResult.Item1;
} while (retry); } while (retry);
return result; return result;
} }
private async Task<OperateResult> ExecuteWithoutRetryAsync(CapReceivedMessage message) /// <summary>
/// Execute message consumption once.
/// </summary>
/// <param name="message">the message rececived of <see cref="CapReceivedMessage"/></param>
/// <returns>Item1 is need still restry, Item2 is executed result.</returns>
private async Task<(bool, OperateResult)> ExecuteWithoutRetryAsync(CapReceivedMessage message)
{ {
if (message == null) if (message == null)
{ {
...@@ -85,26 +91,23 @@ namespace DotNetCore.CAP ...@@ -85,26 +91,23 @@ namespace DotNetCore.CAP
_logger.ConsumerExecuted(sp.Elapsed.TotalSeconds); _logger.ConsumerExecuted(sp.Elapsed.TotalSeconds);
return OperateResult.Success; return (false, OperateResult.Success);
} }
catch (Exception ex) catch (Exception ex)
{ {
_logger.LogError(ex, $"An exception occurred while executing the subscription method. Topic:{message.Name}, Id:{message.Id}"); _logger.LogError(ex, $"An exception occurred while executing the subscription method. Topic:{message.Name}, Id:{message.Id}");
await SetFailedState(message, ex); return (await SetFailedState(message, ex), OperateResult.Failed(ex));
return OperateResult.Failed(ex);
} }
} }
private Task SetSuccessfulState(CapReceivedMessage message) private Task SetSuccessfulState(CapReceivedMessage message)
{ {
var succeededState = new SucceededState(_options.SucceedMessageExpiredAfter); var succeededState = new SucceededState(_options.SucceedMessageExpiredAfter);
return _stateChanger.ChangeStateAsync(message, succeededState, _connection); return _stateChanger.ChangeStateAsync(message, succeededState, _connection);
} }
private Task SetFailedState(CapReceivedMessage message, Exception ex) private async Task<bool> SetFailedState(CapReceivedMessage message, Exception ex)
{ {
if (ex is SubscriberNotFoundException) if (ex is SubscriberNotFoundException)
{ {
...@@ -113,7 +116,11 @@ namespace DotNetCore.CAP ...@@ -113,7 +116,11 @@ namespace DotNetCore.CAP
AddErrorReasonToContent(message, ex); AddErrorReasonToContent(message, ex);
return _stateChanger.ChangeStateAsync(message, new FailedState(), _connection); var needRetry = UpdateMessageForRetry(message);
await _stateChanger.ChangeStateAsync(message, new FailedState(), _connection);
return needRetry;
} }
private bool UpdateMessageForRetry(CapReceivedMessage message) private bool UpdateMessageForRetry(CapReceivedMessage message)
...@@ -121,14 +128,29 @@ namespace DotNetCore.CAP ...@@ -121,14 +128,29 @@ namespace DotNetCore.CAP
var retryBehavior = RetryBehavior.DefaultRetry; var retryBehavior = RetryBehavior.DefaultRetry;
var retries = ++message.Retries; var retries = ++message.Retries;
message.ExpiresAt = message.Added.AddSeconds(retryBehavior.RetryIn(retries));
var retryCount = Math.Min(_options.FailedRetryCount, retryBehavior.RetryCount); var retryCount = Math.Min(_options.FailedRetryCount, retryBehavior.RetryCount);
if (retries >= retryCount) if (retries >= retryCount)
{ {
if (retries == _options.FailedRetryCount)
{
try
{
_options.FailedThresholdCallback?.Invoke(MessageType.Subscribe, message.Name, message.Content);
_logger.ConsumerExecutedAfterThreshold(message.Id, _options.FailedRetryCount);
}
catch (Exception ex)
{
_logger.ExecutedThresholdCallbackFailed(ex);
}
}
return false; return false;
} }
_logger.ConsumerExecutionRetrying(message.Id, retries); _logger.ConsumerExecutionRetrying(message.Id, retries);
var due = message.Added.AddSeconds(retryBehavior.RetryIn(retries));
message.ExpiresAt = due;
return true; return true;
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment