Commit 83595345 authored by Savorboard's avatar Savorboard

rafactor publisher excutor.

parent 07a0ec18
using System.Threading.Tasks;
namespace DotNetCore.CAP
{
public interface IPublishExecutor
{
Task<OperateResult> PublishAsync(string keyName, string content);
}
}
using System; using System;
using System.Diagnostics; using System.Diagnostics;
using System.Threading.Tasks; using System.Threading.Tasks;
using DotNetCore.CAP.Infrastructure;
using DotNetCore.CAP.Models; using DotNetCore.CAP.Models;
using DotNetCore.CAP.Processor; using DotNetCore.CAP.Processor;
using DotNetCore.CAP.Processor.States; using DotNetCore.CAP.Processor.States;
...@@ -8,7 +9,7 @@ using Microsoft.Extensions.Logging; ...@@ -8,7 +9,7 @@ using Microsoft.Extensions.Logging;
namespace DotNetCore.CAP namespace DotNetCore.CAP
{ {
public abstract class BasePublishQueueExecutor : IQueueExecutor public abstract class BasePublishQueueExecutor : IQueueExecutor, IPublishExecutor
{ {
private readonly ILogger _logger; private readonly ILogger _logger;
private readonly CapOptions _options; private readonly CapOptions _options;
...@@ -40,7 +41,7 @@ namespace DotNetCore.CAP ...@@ -40,7 +41,7 @@ namespace DotNetCore.CAP
IState newState; IState newState;
if (!result.Succeeded) if (!result.Succeeded)
{ {
var shouldRetry = await UpdateMessageForRetryAsync(message, connection); var shouldRetry = UpdateMessageForRetryAsync(message);
if (shouldRetry) if (shouldRetry)
{ {
newState = new ScheduledState(); newState = new ScheduledState();
...@@ -51,6 +52,7 @@ namespace DotNetCore.CAP ...@@ -51,6 +52,7 @@ namespace DotNetCore.CAP
newState = new FailedState(); newState = new FailedState();
_logger.JobFailed(result.Exception); _logger.JobFailed(result.Exception);
} }
message.Content = Helper.AddExceptionProperty(message.Content, result.Exception);
} }
else else
{ {
...@@ -67,6 +69,7 @@ namespace DotNetCore.CAP ...@@ -67,6 +69,7 @@ namespace DotNetCore.CAP
} }
catch (Exception ex) catch (Exception ex)
{ {
fetched.Requeue();
_logger.ExceptionOccuredWhileExecutingJob(message?.Name, ex); _logger.ExceptionOccuredWhileExecutingJob(message?.Name, ex);
return OperateResult.Failed(ex); return OperateResult.Failed(ex);
} }
...@@ -74,8 +77,7 @@ namespace DotNetCore.CAP ...@@ -74,8 +77,7 @@ namespace DotNetCore.CAP
public abstract Task<OperateResult> PublishAsync(string keyName, string content); public abstract Task<OperateResult> PublishAsync(string keyName, string content);
private static async Task<bool> UpdateMessageForRetryAsync(CapPublishedMessage message, private static bool UpdateMessageForRetryAsync(CapPublishedMessage message)
IStorageConnection connection)
{ {
var retryBehavior = RetryBehavior.DefaultRetry; var retryBehavior = RetryBehavior.DefaultRetry;
...@@ -85,11 +87,7 @@ namespace DotNetCore.CAP ...@@ -85,11 +87,7 @@ namespace DotNetCore.CAP
var due = message.Added.AddSeconds(retryBehavior.RetryIn(retries)); var due = message.Added.AddSeconds(retryBehavior.RetryIn(retries));
message.ExpiresAt = due; message.ExpiresAt = due;
using (var transaction = connection.CreateTransaction())
{
transaction.UpdateMessage(message);
await transaction.CommitAsync();
}
return true; return true;
} }
} }
......
using System; using System;
using System.Collections.Generic;
using System.Diagnostics; using System.Diagnostics;
using System.Threading.Tasks; using System.Threading.Tasks;
using DotNetCore.CAP.Abstractions;
using DotNetCore.CAP.Infrastructure; using DotNetCore.CAP.Infrastructure;
using DotNetCore.CAP.Internal; using DotNetCore.CAP.Internal;
using DotNetCore.CAP.Models; using DotNetCore.CAP.Models;
...@@ -14,29 +12,25 @@ namespace DotNetCore.CAP ...@@ -14,29 +12,25 @@ namespace DotNetCore.CAP
{ {
public class SubscribeQueueExecutor : IQueueExecutor public class SubscribeQueueExecutor : IQueueExecutor
{ {
private readonly IConsumerInvokerFactory _consumerInvokerFactory;
private readonly ILogger _logger; private readonly ILogger _logger;
private readonly CapOptions _options; private readonly CapOptions _options;
private readonly MethodMatcherCache _selector;
private readonly IStateChanger _stateChanger; private readonly IStateChanger _stateChanger;
private readonly ISubscriberExecutor _subscriberExecutor;
public SubscribeQueueExecutor( public SubscribeQueueExecutor(
IStateChanger stateChanger,
MethodMatcherCache selector,
CapOptions options, CapOptions options,
IConsumerInvokerFactory consumerInvokerFactory, IStateChanger stateChanger,
ILogger<BasePublishQueueExecutor> logger) ISubscriberExecutor subscriberExecutor,
ILogger<SubscribeQueueExecutor> logger)
{ {
_selector = selector;
_options = options; _options = options;
_consumerInvokerFactory = consumerInvokerFactory; _subscriberExecutor = subscriberExecutor;
_stateChanger = stateChanger; _stateChanger = stateChanger;
_logger = logger; _logger = logger;
} }
public async Task<OperateResult> ExecuteAsync(IStorageConnection connection, IFetchedMessage fetched) public async Task<OperateResult> ExecuteAsync(IStorageConnection connection, IFetchedMessage fetched)
{ {
//return await Task.FromResult(OperateResult.Success);
var message = await connection.GetReceivedMessageAsync(fetched.MessageId); var message = await connection.GetReceivedMessageAsync(fetched.MessageId);
try try
{ {
...@@ -45,29 +39,13 @@ namespace DotNetCore.CAP ...@@ -45,29 +39,13 @@ namespace DotNetCore.CAP
if (message.Retries > 0) if (message.Retries > 0)
_logger.JobRetrying(message.Retries); _logger.JobRetrying(message.Retries);
var result = await ExecuteSubscribeAsync(message);
var result = await _subscriberExecutor.ExecuteAsync(message);
sp.Stop(); sp.Stop();
IState newState; var state = GetNewState(result, message);
if (!result.Succeeded)
{ await _stateChanger.ChangeStateAsync(message, state, connection);
var shouldRetry = await UpdateMessageForRetryAsync(message, connection, result.Exception?.Message);
if (shouldRetry)
{
newState = new ScheduledState();
_logger.JobFailedWillRetry(result.Exception);
}
else
{
newState = new FailedState();
_logger.JobFailed(result.Exception);
}
}
else
{
newState = new SucceededState(_options.SucceedMessageExpiredAfter);
}
await _stateChanger.ChangeStateAsync(message, newState, connection);
fetched.RemoveFromQueue(); fetched.RemoveFromQueue();
...@@ -80,7 +58,7 @@ namespace DotNetCore.CAP ...@@ -80,7 +58,7 @@ namespace DotNetCore.CAP
{ {
_logger.LogError(ex.Message); _logger.LogError(ex.Message);
await AddErrorReasonToContent(message, ex.Message, connection); AddErrorReasonToContent(message, ex);
await _stateChanger.ChangeStateAsync(message, new FailedState(), connection); await _stateChanger.ChangeStateAsync(message, new FailedState(), connection);
...@@ -98,36 +76,32 @@ namespace DotNetCore.CAP ...@@ -98,36 +76,32 @@ namespace DotNetCore.CAP
} }
} }
protected virtual async Task<OperateResult> ExecuteSubscribeAsync(CapReceivedMessage receivedMessage) private IState GetNewState(OperateResult result, CapReceivedMessage message)
{ {
try IState newState;
if (!result.Succeeded)
{ {
var executeDescriptorGroup = _selector.GetTopicExector(receivedMessage.Name); var shouldRetry = UpdateMessageForRetryAsync(message);
if (shouldRetry)
if (!executeDescriptorGroup.ContainsKey(receivedMessage.Group))
{ {
var error = $"Topic:{receivedMessage.Name}, can not be found subscriber method."; newState = new ScheduledState();
throw new SubscriberNotFoundException(error); _logger.JobFailedWillRetry(result.Exception);
} }
else
// If there are multiple consumers in the same group, we will take the first {
var executeDescriptor = executeDescriptorGroup[receivedMessage.Group][0]; newState = new FailedState();
var consumerContext = new ConsumerContext(executeDescriptor, receivedMessage.ToMessageContext()); _logger.JobFailed(result.Exception);
await _consumerInvokerFactory.CreateInvoker(consumerContext).InvokeAsync();
return OperateResult.Success;
} }
catch (Exception ex) AddErrorReasonToContent(message, result.Exception);
}
else
{ {
_logger.ConsumerMethodExecutingFailed($"Group:{receivedMessage.Group}, Topic:{receivedMessage.Name}", newState = new SucceededState(_options.SucceedMessageExpiredAfter);
ex);
return OperateResult.Failed(ex);
} }
return newState;
} }
private static async Task<bool> UpdateMessageForRetryAsync(CapReceivedMessage message, IStorageConnection connection, string exceptionMessage) private static bool UpdateMessageForRetryAsync(CapReceivedMessage message)
{ {
var retryBehavior = RetryBehavior.DefaultRetry; var retryBehavior = RetryBehavior.DefaultRetry;
...@@ -138,25 +112,12 @@ namespace DotNetCore.CAP ...@@ -138,25 +112,12 @@ namespace DotNetCore.CAP
var due = message.Added.AddSeconds(retryBehavior.RetryIn(retries)); var due = message.Added.AddSeconds(retryBehavior.RetryIn(retries));
message.ExpiresAt = due; message.ExpiresAt = due;
await AddErrorReasonToContent(message, exceptionMessage, connection);
return true; return true;
} }
public static Task AddErrorReasonToContent(CapReceivedMessage message, string description, IStorageConnection connection) private static void AddErrorReasonToContent(CapReceivedMessage message, Exception exception)
{
var exceptions = new List<KeyValuePair<string, string>>
{
new KeyValuePair<string, string>("ExceptionMessage", description)
};
message.Content = Helper.AddJsonProperty(message.Content, exceptions);
using (var transaction = connection.CreateTransaction())
{ {
transaction.UpdateMessage(message); message.Content = Helper.AddExceptionProperty(message.Content, exception);
transaction.CommitAsync();
}
return Task.CompletedTask;
} }
} }
} }
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment