Commit 0025e3cb authored by Savorboard's avatar Savorboard

refactor

parent bcca8458
...@@ -12,131 +12,151 @@ using Microsoft.Extensions.Logging; ...@@ -12,131 +12,151 @@ using Microsoft.Extensions.Logging;
namespace DotNetCore.CAP namespace DotNetCore.CAP
{ {
public class SubscribeQueueExecutor : IQueueExecutor public class SubscribeQueueExecutor : IQueueExecutor
{ {
private readonly IConsumerInvokerFactory _consumerInvokerFactory; private readonly IConsumerInvokerFactory _consumerInvokerFactory;
private readonly ILogger _logger; private readonly ILogger _logger;
private readonly CapOptions _options; private readonly CapOptions _options;
private readonly MethodMatcherCache _selector; private readonly MethodMatcherCache _selector;
private readonly IStateChanger _stateChanger; private readonly IStateChanger _stateChanger;
public SubscribeQueueExecutor( public SubscribeQueueExecutor(
IStateChanger stateChanger, IStateChanger stateChanger,
MethodMatcherCache selector, MethodMatcherCache selector,
CapOptions options, CapOptions options,
IConsumerInvokerFactory consumerInvokerFactory, IConsumerInvokerFactory consumerInvokerFactory,
ILogger<BasePublishQueueExecutor> logger) ILogger<BasePublishQueueExecutor> logger)
{ {
_selector = selector; _selector = selector;
_options = options; _options = options;
_consumerInvokerFactory = consumerInvokerFactory; _consumerInvokerFactory = consumerInvokerFactory;
_stateChanger = stateChanger; _stateChanger = stateChanger;
_logger = logger; _logger = logger;
} }
public async Task<OperateResult> ExecuteAsync(IStorageConnection connection, IFetchedMessage fetched) public async Task<OperateResult> ExecuteAsync(IStorageConnection connection, IFetchedMessage fetched)
{ {
var message = await connection.GetReceivedMessageAsync(fetched.MessageId); //return await Task.FromResult(OperateResult.Success);
try var message = await connection.GetReceivedMessageAsync(fetched.MessageId);
{ try
var sp = Stopwatch.StartNew(); {
await _stateChanger.ChangeStateAsync(message, new ProcessingState(), connection); var sp = Stopwatch.StartNew();
await _stateChanger.ChangeStateAsync(message, new ProcessingState(), connection);
if (message.Retries > 0)
_logger.JobRetrying(message.Retries); if (message.Retries > 0)
var result = await ExecuteSubscribeAsync(message); _logger.JobRetrying(message.Retries);
sp.Stop(); var result = await ExecuteSubscribeAsync(message);
sp.Stop();
IState newState;
if (!result.Succeeded) IState newState;
{ if (!result.Succeeded)
var shouldRetry = await UpdateMessageForRetryAsync(message, connection, result.Exception?.Message); {
if (shouldRetry) var shouldRetry = await UpdateMessageForRetryAsync(message, connection, result.Exception?.Message);
{ if (shouldRetry)
newState = new ScheduledState(); {
_logger.JobFailedWillRetry(result.Exception); newState = new ScheduledState();
} _logger.JobFailedWillRetry(result.Exception);
else }
{ else
newState = new FailedState(); {
_logger.JobFailed(result.Exception); newState = new FailedState();
} _logger.JobFailed(result.Exception);
} }
else }
{ else
newState = new SucceededState(_options.SucceedMessageExpiredAfter); {
} newState = new SucceededState(_options.SucceedMessageExpiredAfter);
await _stateChanger.ChangeStateAsync(message, newState, connection); }
await _stateChanger.ChangeStateAsync(message, newState, connection);
fetched.RemoveFromQueue();
fetched.RemoveFromQueue();
if (result.Succeeded)
_logger.JobExecuted(sp.Elapsed.TotalSeconds); if (result.Succeeded)
_logger.JobExecuted(sp.Elapsed.TotalSeconds);
return OperateResult.Success;
} return OperateResult.Success;
catch (Exception ex) }
{ catch (SubscriberNotFoundException ex)
_logger.ExceptionOccuredWhileExecutingJob(message?.Name, ex); {
return OperateResult.Failed(ex); _logger.LogError(ex.Message);
}
} await AddErrorReasonToContent(message, ex.Message, connection);
protected virtual async Task<OperateResult> ExecuteSubscribeAsync(CapReceivedMessage receivedMessage) await _stateChanger.ChangeStateAsync(message, new FailedState(), connection);
{
try fetched.RemoveFromQueue();
{
var executeDescriptorGroup = _selector.GetTopicExector(receivedMessage.Name); return OperateResult.Failed(ex);
}
if (!executeDescriptorGroup.ContainsKey(receivedMessage.Group)) catch (Exception ex)
throw new SubscriberNotFoundException(receivedMessage.Name + " has not been found."); {
_logger.ExceptionOccuredWhileExecutingJob(message?.Name, ex);
// If there are multiple consumers in the same group, we will take the first
var executeDescriptor = executeDescriptorGroup[receivedMessage.Group][0]; fetched.Requeue();
var consumerContext = new ConsumerContext(executeDescriptor, receivedMessage.ToMessageContext());
return OperateResult.Failed(ex);
await _consumerInvokerFactory.CreateInvoker(consumerContext).InvokeAsync(); }
}
return OperateResult.Success;
} protected virtual async Task<OperateResult> ExecuteSubscribeAsync(CapReceivedMessage receivedMessage)
catch (SubscriberNotFoundException ex) {
{ try
_logger.LogError("Can not be found subscribe method of name: " + receivedMessage.Name); {
return OperateResult.Failed(ex); var executeDescriptorGroup = _selector.GetTopicExector(receivedMessage.Name);
}
catch (Exception ex) if (!executeDescriptorGroup.ContainsKey(receivedMessage.Group))
{ {
_logger.ConsumerMethodExecutingFailed($"Group:{receivedMessage.Group}, Topic:{receivedMessage.Name}", var error = $"Topic:{receivedMessage.Name}, can not be found subscriber method.";
ex); throw new SubscriberNotFoundException(error);
}
return OperateResult.Failed(ex);
} // If there are multiple consumers in the same group, we will take the first
} var executeDescriptor = executeDescriptorGroup[receivedMessage.Group][0];
var consumerContext = new ConsumerContext(executeDescriptor, receivedMessage.ToMessageContext());
private static async Task<bool> UpdateMessageForRetryAsync(CapReceivedMessage message,
IStorageConnection connection, string exceptionMessage) await _consumerInvokerFactory.CreateInvoker(consumerContext).InvokeAsync();
{
var retryBehavior = RetryBehavior.DefaultRetry; return OperateResult.Success;
}
var retries = ++message.Retries; catch (Exception ex)
if (retries >= retryBehavior.RetryCount) {
return false; _logger.ConsumerMethodExecutingFailed($"Group:{receivedMessage.Group}, Topic:{receivedMessage.Name}",
ex);
var due = message.Added.AddSeconds(retryBehavior.RetryIn(retries));
message.ExpiresAt = due; return OperateResult.Failed(ex);
}
var exceptions = new List<KeyValuePair<string, string>> }
{
new KeyValuePair<string, string>("ExceptionMessage", exceptionMessage) private static async Task<bool> UpdateMessageForRetryAsync(CapReceivedMessage message, IStorageConnection connection, string exceptionMessage)
}; {
var retryBehavior = RetryBehavior.DefaultRetry;
message.Content = Helper.AddJsonProperty(message.Content, exceptions);
using (var transaction = connection.CreateTransaction()) var retries = ++message.Retries;
{ if (retries >= retryBehavior.RetryCount)
transaction.UpdateMessage(message); return false;
await transaction.CommitAsync();
} var due = message.Added.AddSeconds(retryBehavior.RetryIn(retries));
return true; message.ExpiresAt = due;
}
} await AddErrorReasonToContent(message, exceptionMessage, connection);
return true;
}
public static Task AddErrorReasonToContent(CapReceivedMessage message, string description, IStorageConnection connection)
{
var exceptions = new List<KeyValuePair<string, string>>
{
new KeyValuePair<string, string>("ExceptionMessage", description)
};
message.Content = Helper.AddJsonProperty(message.Content, exceptions);
using (var transaction = connection.CreateTransaction())
{
transaction.UpdateMessage(message);
transaction.CommitAsync();
}
return Task.CompletedTask;
}
}
} }
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment