Commit 6e6812a5 authored by yangxiaodong's avatar yangxiaodong

refactor

parent e1ec8eac
...@@ -4,6 +4,7 @@ using System.Data; ...@@ -4,6 +4,7 @@ using System.Data;
using System.Text; using System.Text;
using System.Threading; using System.Threading;
using Dapper; using Dapper;
using DotNetCore.CAP.Models;
using Microsoft.EntityFrameworkCore.Storage; using Microsoft.EntityFrameworkCore.Storage;
namespace DotNetCore.CAP.EntityFrameworkCore namespace DotNetCore.CAP.EntityFrameworkCore
...@@ -17,7 +18,7 @@ namespace DotNetCore.CAP.EntityFrameworkCore ...@@ -17,7 +18,7 @@ namespace DotNetCore.CAP.EntityFrameworkCore
private readonly object _lockObject = new object(); private readonly object _lockObject = new object();
public EFFetchedMessage(string messageId, public EFFetchedMessage(string messageId,
int type, MessageType type,
IDbConnection connection, IDbConnection connection,
IDbContextTransaction transaction) IDbContextTransaction transaction)
{ {
...@@ -30,7 +31,7 @@ namespace DotNetCore.CAP.EntityFrameworkCore ...@@ -30,7 +31,7 @@ namespace DotNetCore.CAP.EntityFrameworkCore
public string MessageId { get; } public string MessageId { get; }
public int Type { get; } public MessageType Type { get; }
public void RemoveFromQueue() public void RemoveFromQueue()
{ {
......
...@@ -46,7 +46,7 @@ namespace DotNetCore.CAP.EntityFrameworkCore ...@@ -46,7 +46,7 @@ namespace DotNetCore.CAP.EntityFrameworkCore
_connection.Context.Add(new CapQueue _connection.Context.Add(new CapQueue
{ {
MessageId = message.Id, MessageId = message.Id,
Type = 1 Type = MessageType.Subscribe
}); });
} }
......
using System; using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.Text; using System.Text;
using DotNetCore.CAP.Models;
namespace DotNetCore.CAP.EntityFrameworkCore namespace DotNetCore.CAP.EntityFrameworkCore
{ {
...@@ -8,6 +9,6 @@ namespace DotNetCore.CAP.EntityFrameworkCore ...@@ -8,6 +9,6 @@ namespace DotNetCore.CAP.EntityFrameworkCore
{ {
public string MessageId { get; set; } public string MessageId { get; set; }
public int Type { get; set; } public MessageType Type { get; set; }
} }
} }
...@@ -53,9 +53,10 @@ namespace Microsoft.Extensions.DependencyInjection ...@@ -53,9 +53,10 @@ namespace Microsoft.Extensions.DependencyInjection
services.AddSingleton<IStateChanger, StateChanger>(); services.AddSingleton<IStateChanger, StateChanger>();
//Processors //Processors
services.AddTransient<JobQueuer>(); services.AddTransient<JobQueuer>();
//services.AddTransient<>
services.AddSingleton<IQueueExecutorFactory, QueueExecutorFactory>();
services.AddSingleton<IQueueExecutor, SubscibeQueueExecutor>();
//services.TryAddSingleton<IJob, CapJob>();
services.TryAddScoped<ICapPublisher, DefaultCapPublisher>(); services.TryAddScoped<ICapPublisher, DefaultCapPublisher>();
......
...@@ -13,6 +13,11 @@ ...@@ -13,6 +13,11 @@
<AllowUnsafeBlocks>False</AllowUnsafeBlocks> <AllowUnsafeBlocks>False</AllowUnsafeBlocks>
</PropertyGroup> </PropertyGroup>
<ItemGroup>
<None Include="IQueueExecutor.Subscibe.cs" />
<None Include="Job\IJobProcessor.MessageJob.Default.cs" />
</ItemGroup>
<ItemGroup> <ItemGroup>
<PackageReference Include="Microsoft.AspNetCore.Hosting.Abstractions" Version="1.1.2" /> <PackageReference Include="Microsoft.AspNetCore.Hosting.Abstractions" Version="1.1.2" />
<PackageReference Include="Microsoft.AspNetCore.Http.Abstractions" Version="1.1.2" /> <PackageReference Include="Microsoft.AspNetCore.Http.Abstractions" Version="1.1.2" />
......
...@@ -125,7 +125,7 @@ namespace DotNetCore.CAP ...@@ -125,7 +125,7 @@ namespace DotNetCore.CAP
public void Pulse() public void Pulse()
{ {
throw new NotImplementedException(); WaitHandleEx.ReceviedPulseEvent.Set();
} }
//private void ProcessMessage(IServiceScope serviceScope, CapReceivedMessage receivedMessage) //private void ProcessMessage(IServiceScope serviceScope, CapReceivedMessage receivedMessage)
......
using System; using System;
using DotNetCore.CAP.Models;
namespace DotNetCore.CAP namespace DotNetCore.CAP
{ {
...@@ -6,7 +7,7 @@ namespace DotNetCore.CAP ...@@ -6,7 +7,7 @@ namespace DotNetCore.CAP
{ {
string MessageId { get; } string MessageId { get; }
int Type { get; } MessageType Type { get; }
void RemoveFromQueue(); void RemoveFromQueue();
......
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Text;
using System.Threading.Tasks;
using DotNetCore.CAP.Job;
using DotNetCore.CAP.Job.States;
using DotNetCore.CAP.Models;
using Microsoft.Extensions.Logging;
namespace DotNetCore.CAP
{
public abstract class BasePublishQueueExecutor : IQueueExecutor
{
private readonly IStateChanger _stateChanger;
private readonly ILogger _logger;
public BasePublishQueueExecutor(IStateChanger stateChanger,
ILogger<BasePublishQueueExecutor> logger)
{
_stateChanger = stateChanger;
_logger = logger;
}
public abstract Task<OperateResult> PublishAsync(string keyName, string content);
public async Task<OperateResult> ExecuteAsync(IStorageConnection connection, IFetchedMessage fetched)
{
using (fetched)
{
var message = await connection.GetSentMessageAsync(fetched.MessageId);
try
{
var sp = Stopwatch.StartNew();
await _stateChanger.ChangeStateAsync(message, new ProcessingState(), connection);
if (message.Retries > 0)
{
_logger.JobRetrying(message.Retries);
}
var result = await PublishAsync(message.KeyName, message.Content);
sp.Stop();
var newState = default(IState);
if (!result.Succeeded)
{
var shouldRetry = await UpdateJobForRetryAsync(message, connection);
if (shouldRetry)
{
newState = new ScheduledState();
_logger.JobFailedWillRetry(result.Exception);
}
else
{
newState = new FailedState();
_logger.JobFailed(result.Exception);
}
}
else
{
newState = new SucceededState();
}
await _stateChanger.ChangeStateAsync(message, newState, connection);
fetched.RemoveFromQueue();
if (result.Succeeded)
{
_logger.JobExecuted(sp.Elapsed.TotalSeconds);
}
return OperateResult.Success;
}
catch (Exception ex)
{
_logger.ExceptionOccuredWhileExecutingJob(message?.KeyName, ex);
return OperateResult.Failed(ex);
}
}
}
private async Task<bool> UpdateJobForRetryAsync(CapSentMessage message, IStorageConnection connection)
{
var retryBehavior = RetryBehavior.DefaultRetry;
var now = DateTime.UtcNow;
var retries = ++message.Retries;
if (retries >= retryBehavior.RetryCount)
{
return false;
}
var due = message.Added.AddSeconds(retryBehavior.RetryIn(retries));
message.LastRun = due;
using (var transaction = connection.CreateTransaction())
{
transaction.UpdateMessage(message);
await transaction.CommitAsync();
}
return true;
}
}
}
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using DotNetCore.CAP.Abstractions;
using DotNetCore.CAP.Infrastructure;
using DotNetCore.CAP.Internal;
using DotNetCore.CAP.Job;
using DotNetCore.CAP.Job.States;
using DotNetCore.CAP.Models;
using Microsoft.Extensions.Logging;
namespace DotNetCore.CAP
{
public class SubscibeQueueExecutor : IQueueExecutor
{
private readonly IConsumerInvokerFactory _consumerInvokerFactory;
private readonly IConsumerClientFactory _consumerClientFactory;
private readonly IStateChanger _stateChanger;
private readonly ILogger _logger;
private readonly MethodMatcherCache _selector;
private readonly CapOptions _options;
public SubscibeQueueExecutor(
IStateChanger stateChanger,
MethodMatcherCache selector,
IConsumerInvokerFactory consumerInvokerFactory,
IConsumerClientFactory consumerClientFactory,
ILogger<BasePublishQueueExecutor> logger)
{
_selector = selector;
_consumerInvokerFactory = consumerInvokerFactory;
_consumerClientFactory = consumerClientFactory;
_stateChanger = stateChanger;
_logger = logger;
}
public async Task<OperateResult> ExecuteAsync(IStorageConnection connection, IFetchedMessage fetched)
{
using (fetched)
{
var message = await connection.GetReceivedMessageAsync(fetched.MessageId);
try
{
var sp = Stopwatch.StartNew();
await _stateChanger.ChangeStateAsync(message, new ProcessingState(), connection);
if (message.Retries > 0)
{
_logger.JobRetrying(message.Retries);
}
var result = await ExecuteSubscribeAsync(message);
sp.Stop();
var newState = default(IState);
if (!result.Succeeded)
{
var shouldRetry = await UpdateJobForRetryAsync(message, connection);
if (shouldRetry)
{
newState = new ScheduledState();
_logger.JobFailedWillRetry(result.Exception);
}
else
{
newState = new FailedState();
_logger.JobFailed(result.Exception);
}
}
else
{
newState = new SucceededState();
}
await _stateChanger.ChangeStateAsync(message, newState, connection);
fetched.RemoveFromQueue();
if (result.Succeeded)
{
_logger.JobExecuted(sp.Elapsed.TotalSeconds);
}
return OperateResult.Success;
}
catch (SubscriberNotFoundException ex)
{
_logger.LogError(ex.Message);
return OperateResult.Failed(ex);
}
catch (Exception ex)
{
_logger.ExceptionOccuredWhileExecutingJob(message?.KeyName, ex);
return OperateResult.Failed(ex);
}
}
}
protected virtual async Task<OperateResult> ExecuteSubscribeAsync(CapReceivedMessage receivedMessage)
{
try
{
var executeDescriptorGroup = _selector.GetTopicExector(receivedMessage.KeyName);
if (!executeDescriptorGroup.ContainsKey(receivedMessage.Group))
{
throw new SubscriberNotFoundException(receivedMessage.KeyName + " has not been found.");
}
// If there are multiple consumers in the same group, we will take the first
var executeDescriptor = executeDescriptorGroup[receivedMessage.Group][0];
var consumerContext = new ConsumerContext(executeDescriptor, receivedMessage.ToMessageContext());
await _consumerInvokerFactory.CreateInvoker(consumerContext).InvokeAsync();
return OperateResult.Success;
}
catch (SubscriberNotFoundException ex)
{
throw ex;
}
catch (Exception ex)
{
_logger.ConsumerMethodExecutingFailed($"Group:{receivedMessage.Group}, Topic:{receivedMessage.KeyName}", ex);
return OperateResult.Failed(ex);
}
}
private async Task<bool> UpdateJobForRetryAsync(CapReceivedMessage message, IStorageConnection connection)
{
var retryBehavior = RetryBehavior.DefaultRetry;
var now = DateTime.UtcNow;
var retries = ++message.Retries;
if (retries >= retryBehavior.RetryCount)
{
return false;
}
var due = message.Added.AddSeconds(retryBehavior.RetryIn(retries));
message.LastRun = due;
using (var transaction = connection.CreateTransaction())
{
transaction.UpdateMessage(message);
await transaction.CommitAsync();
}
return true;
}
}
}
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
namespace DotNetCore.CAP
{
public interface IQueueExecutor
{
Task<OperateResult> ExecuteAsync(IStorageConnection connection, IFetchedMessage message);
}
}
using DotNetCore.CAP.Models;
namespace DotNetCore.CAP
{
public interface IQueueExecutorFactory
{
IQueueExecutor GetInstance(MessageType messageType);
}
}
\ No newline at end of file
using System;
namespace DotNetCore.CAP.Internal
{
public class SubscriberNotFoundException : Exception
{
public SubscriberNotFoundException() { }
public SubscriberNotFoundException(string message) : base(message) { }
public SubscriberNotFoundException(string message, Exception inner) :
base(message, inner) { }
}
}
...@@ -37,7 +37,6 @@ namespace DotNetCore.CAP.Job ...@@ -37,7 +37,6 @@ namespace DotNetCore.CAP.Job
using (var scope = _provider.CreateScope()) using (var scope = _provider.CreateScope())
{ {
CapSentMessage sentMessage; CapSentMessage sentMessage;
// CapReceivedMessage receivedMessage;
var provider = scope.ServiceProvider; var provider = scope.ServiceProvider;
var connection = provider.GetRequiredService<IStorageConnection>(); var connection = provider.GetRequiredService<IStorageConnection>();
...@@ -46,7 +45,6 @@ namespace DotNetCore.CAP.Job ...@@ -46,7 +45,6 @@ namespace DotNetCore.CAP.Job
(sentMessage = await connection.GetNextSentMessageToBeEnqueuedAsync()) != null) (sentMessage = await connection.GetNextSentMessageToBeEnqueuedAsync()) != null)
{ {
System.Diagnostics.Debug.WriteLine("JobQueuer 执行 内部循环: " + DateTime.Now);
var state = new EnqueuedState(); var state = new EnqueuedState();
using (var transaction = connection.CreateTransaction()) using (var transaction = connection.CreateTransaction())
...@@ -54,10 +52,9 @@ namespace DotNetCore.CAP.Job ...@@ -54,10 +52,9 @@ namespace DotNetCore.CAP.Job
_stateChanger.ChangeState(sentMessage, state, transaction); _stateChanger.ChangeState(sentMessage, state, transaction);
await transaction.CommitAsync(); await transaction.CommitAsync();
} }
} }
} }
System.Diagnostics.Debug.WriteLine("JobQueuer 执行: " + DateTime.Now);
context.ThrowIfStopping(); context.ThrowIfStopping();
WaitHandleEx.SentPulseEvent.Set(); WaitHandleEx.SentPulseEvent.Set();
......
using System;
using System.Threading;
using System.Threading.Tasks;
using DotNetCore.CAP.Infrastructure;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
namespace DotNetCore.CAP.Job
{
public class DefaultMessageJobProcessor : IMessageJobProcessor
{
private readonly IQueueExecutorFactory _queueExecutorFactory;
private readonly IServiceProvider _provider;
private readonly ILogger _logger;
private readonly CancellationTokenSource _cts;
private readonly TimeSpan _pollingDelay;
internal static readonly AutoResetEvent PulseEvent = new AutoResetEvent(true);
public DefaultMessageJobProcessor(
IServiceProvider provider,
IQueueExecutorFactory queueExecutorFactory,
IOptions<CapOptions> capOptions,
ILogger<DefaultMessageJobProcessor> logger)
{
_logger = logger;
_queueExecutorFactory = queueExecutorFactory;
_provider = provider;
_cts = new CancellationTokenSource();
_pollingDelay = TimeSpan.FromSeconds(capOptions.Value.PollingDelay);
}
public bool Waiting { get; private set; }
public Task ProcessAsync(ProcessingContext context)
{
if (context == null)
throw new ArgumentNullException(nameof(context));
context.ThrowIfStopping();
return ProcessCoreAsync(context);
}
public async Task ProcessCoreAsync(ProcessingContext context)
{
try
{
_logger.LogInformation("BaseMessageJobProcessor processing ...");
var worked = await Step(context);
context.ThrowIfStopping();
Waiting = true;
if (!worked)
{
var token = GetTokenToWaitOn(context);
await WaitHandleEx.WaitAnyAsync(PulseEvent, token.WaitHandle, _pollingDelay);
}
}
finally
{
Waiting = false;
}
}
protected virtual CancellationToken GetTokenToWaitOn(ProcessingContext context)
{
return context.CancellationToken;
}
private async Task<bool> Step(ProcessingContext context)
{
var fetched = default(IFetchedMessage);
using (var scopedContext = context.CreateScope())
{
var provider = scopedContext.Provider;
var connection = provider.GetRequiredService<IStorageConnection>();
if ((fetched = await connection.FetchNextMessageAsync()) != null)
{
using (fetched)
{
var queueExecutor = _queueExecutorFactory.GetInstance(fetched.Type);
await queueExecutor.ExecuteAsync(connection, fetched);
}
}
}
return fetched != null;
}
}
}
...@@ -18,6 +18,12 @@ namespace DotNetCore.CAP ...@@ -18,6 +18,12 @@ namespace DotNetCore.CAP
private static readonly Action<ILogger, string, Exception> _executingConsumerMethod; private static readonly Action<ILogger, string, Exception> _executingConsumerMethod;
private static readonly Action<ILogger, string, Exception> _receivedMessageRetryExecuting; private static readonly Action<ILogger, string, Exception> _receivedMessageRetryExecuting;
private static Action<ILogger, Exception> _jobFailed;
private static Action<ILogger, Exception> _jobFailedWillRetry;
private static Action<ILogger, double, Exception> _jobExecuted;
private static Action<ILogger, int, Exception> _jobRetrying;
private static Action<ILogger, string, Exception> _exceptionOccuredWhileExecutingJob;
static LoggerExtensions() static LoggerExtensions()
{ {
_serverStarting = LoggerMessage.Define<int, int>( _serverStarting = LoggerMessage.Define<int, int>(
...@@ -59,8 +65,58 @@ namespace DotNetCore.CAP ...@@ -59,8 +65,58 @@ namespace DotNetCore.CAP
LogLevel.Error, LogLevel.Error,
5, 5,
"Received message topic method '{topicName}' failed to execute."); "Received message topic method '{topicName}' failed to execute.");
_jobRetrying = LoggerMessage.Define<int>(
LogLevel.Debug,
3,
"Retrying a job: {Retries}...");
_jobExecuted = LoggerMessage.Define<double>(
LogLevel.Debug,
4,
"Job executed. Took: {Seconds} secs.");
_jobFailed = LoggerMessage.Define(
LogLevel.Warning,
1,
"Job failed to execute.");
_jobFailedWillRetry = LoggerMessage.Define(
LogLevel.Warning,
2,
"Job failed to execute. Will retry.");
_exceptionOccuredWhileExecutingJob = LoggerMessage.Define<string>(
LogLevel.Error,
6,
"An exception occured while trying to execute a job: '{JobId}'. " +
"Requeuing for another retry.");
}
public static void JobFailed(this ILogger logger, Exception ex)
{
_jobFailed(logger, ex);
} }
public static void JobFailedWillRetry(this ILogger logger, Exception ex)
{
_jobFailedWillRetry(logger, ex);
}
public static void JobRetrying(this ILogger logger, int retries)
{
_jobRetrying(logger, retries, null);
}
public static void JobExecuted(this ILogger logger, double seconds)
{
_jobExecuted(logger, seconds, null);
}
public static void ConsumerMethodExecutingFailed(this ILogger logger, string methodName, Exception ex) public static void ConsumerMethodExecutingFailed(this ILogger logger, string methodName, Exception ex)
{ {
_executingConsumerMethod(logger, methodName, ex); _executingConsumerMethod(logger, methodName, ex);
...@@ -100,5 +156,10 @@ namespace DotNetCore.CAP ...@@ -100,5 +156,10 @@ namespace DotNetCore.CAP
{ {
_expectedOperationCanceledException(logger, ex.Message, ex); _expectedOperationCanceledException(logger, ex.Message, ex);
} }
public static void ExceptionOccuredWhileExecutingJob(this ILogger logger, string jobId, Exception ex)
{
_exceptionOccuredWhileExecutingJob(logger, jobId, ex);
}
} }
} }
\ No newline at end of file
...@@ -9,6 +9,6 @@ ...@@ -9,6 +9,6 @@
/// <summary> /// <summary>
/// 0 is CapSentMessage, 1 is CapReceviedMessage /// 0 is CapSentMessage, 1 is CapReceviedMessage
/// </summary> /// </summary>
public int Type { get; set; } public MessageType Type { get; set; }
} }
} }
namespace DotNetCore.CAP.Models
{
public enum MessageType
{
Publish,
Subscribe
}
}
using System;
using System.Linq;
using System.Reflection;
using DotNetCore.CAP.Models;
using Microsoft.Extensions.DependencyInjection;
namespace DotNetCore.CAP
{
public class QueueExecutorFactory : IQueueExecutorFactory
{
private readonly IServiceProvider _serviceProvider;
public QueueExecutorFactory(IServiceProvider serviceProvider)
{
_serviceProvider = serviceProvider;
}
public IQueueExecutor GetInstance(MessageType messageType)
{
var _queueExectors = _serviceProvider.GetServices<IQueueExecutor>();
if (messageType== MessageType.Publish)
{
return _queueExectors.FirstOrDefault(x => typeof(BasePublishQueueExecutor).IsAssignableFrom(x.GetType()));
}
else
{
return _queueExectors.FirstOrDefault(x => !typeof(BasePublishQueueExecutor).IsAssignableFrom(x.GetType()));
}
}
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment