Commit b69e355e authored by yangxiaodong's avatar yangxiaodong

Fix Message Received Store Bugs.

parent 03ca87c0
......@@ -41,12 +41,31 @@ namespace DotNetCore.CAP.EntityFrameworkCore
return OperateResult.Success;
}
public async Task<OperateResult> ChangeSentMessageStateAsync(CapSentMessage message, string status, bool autoSaveChanges = true)
{
Context.Attach(message);
message.LastRun = DateTime.Now;
message.StatusName = status;
try
{
if (autoSaveChanges)
{
await Context.SaveChangesAsync();
}
}
catch (DbUpdateConcurrencyException ex)
{
return OperateResult.Failed(new OperateError() { Code = "DbUpdateConcurrencyException", Description = ex.Message });
}
return OperateResult.Success;
}
/// <summary>
/// First Enqueued Message.
/// </summary>
public async Task<CapSentMessage> GetNextSentMessageToBeEnqueuedAsync()
{
return await SentMessages.FirstOrDefaultAsync(x => x.StateName == StateName.Enqueued);
return await SentMessages.FirstOrDefaultAsync(x => x.StatusName == StatusName.Enqueued);
}
/// <summary>
......@@ -105,6 +124,30 @@ namespace DotNetCore.CAP.EntityFrameworkCore
return OperateResult.Success;
}
public async Task<OperateResult> ChangeReceivedMessageStateAsync(CapReceivedMessage message, string status, bool autoSaveChanges = true)
{
Context.Attach(message);
message.LastRun = DateTime.Now;
message.StatusName = status;
try
{
if (autoSaveChanges)
{
await Context.SaveChangesAsync();
}
}
catch (DbUpdateConcurrencyException ex)
{
return OperateResult.Failed(new OperateError() { Code = "DbUpdateConcurrencyException", Description = ex.Message });
}
return OperateResult.Success;
}
public async Task<CapReceivedMessage> GetNextReceivedMessageToBeExcuted()
{
return await ReceivedMessages.FirstOrDefaultAsync(x => x.StatusName == StatusName.Enqueued);
}
/// <summary>
/// Updates the specified <paramref name="message"/> in the message store.
/// </summary>
......@@ -127,5 +170,6 @@ namespace DotNetCore.CAP.EntityFrameworkCore
return OperateResult.Failed(new OperateError() { Code = "DbUpdateConcurrencyException", Description = ex.Message });
}
}
}
}
\ No newline at end of file
......@@ -39,12 +39,12 @@ namespace DotNetCore.CAP.EntityFrameworkCore
modelBuilder.Entity<CapSentMessage>(b =>
{
b.HasKey(m => m.Id);
b.Property(p => p.StateName).HasMaxLength(50);
b.Property(p => p.StatusName).HasMaxLength(50);
});
modelBuilder.Entity<CapReceivedMessage>(b =>
{
b.Property(p => p.StateName).HasMaxLength(50);
b.Property(p => p.StatusName).HasMaxLength(50);
});
}
}
......
......@@ -90,7 +90,7 @@ namespace DotNetCore.CAP.Kafka
if (message != null)
{
var sp = Stopwatch.StartNew();
message.StateName = StateName.Processing;
message.StatusName = StatusName.Processing;
await messageStore.UpdateSentMessageAsync(message);
var jobResult = ExecuteJob(message.KeyName, message.Content);
......@@ -104,7 +104,7 @@ namespace DotNetCore.CAP.Kafka
else
{
//TODO : the state will be deleted when release.
message.StateName = StateName.Succeeded;
message.StatusName = StatusName.Succeeded;
await messageStore.UpdateSentMessageAsync(message);
_logger.JobExecuted(sp.Elapsed.TotalSeconds);
......
......@@ -26,7 +26,6 @@ namespace DotNetCore.CAP.RabbitMQ
public RabbitJobProcessor(
IOptions<CapOptions> capOptions,
IOptions<RabbitMQOptions> rabbitMQOptions,
IOptions<RabbitMQOptions> options,
ILogger<RabbitJobProcessor> logger,
IServiceProvider provider)
{
......@@ -88,7 +87,7 @@ namespace DotNetCore.CAP.RabbitMQ
if (message != null)
{
var sp = Stopwatch.StartNew();
message.StateName = StateName.Processing;
message.StatusName = StatusName.Processing;
await messageStore.UpdateSentMessageAsync(message);
var jobResult = ExecuteJob(message.KeyName, message.Content);
......@@ -102,7 +101,7 @@ namespace DotNetCore.CAP.RabbitMQ
else
{
//TODO : the state will be deleted when release.
message.StateName = StateName.Succeeded;
message.StatusName = StatusName.Succeeded;
await messageStore.UpdateSentMessageAsync(message);
_logger.JobExecuted(sp.Elapsed.TotalSeconds);
......
using System.Collections.Generic;
using System;
using System.Collections.Generic;
namespace DotNetCore.CAP.Abstractions
{
......@@ -9,11 +10,11 @@ namespace DotNetCore.CAP.Abstractions
{
/// <summary>
/// Selects a set of <see cref="ConsumerExecutorDescriptor"/> candidates for the current message associated with
/// <paramref name="context"/>.
/// <paramref name="provider"/>.
/// </summary>
/// <param name="context">The <see cref="CapStartContext"/> associated with the current message.</param>
/// <param name="provider"> <see cref="IServiceProvider"/>.</param>
/// <returns>A set of <see cref="ConsumerExecutorDescriptor"/> candidates or <c>null</c>.</returns>
IReadOnlyList<ConsumerExecutorDescriptor> SelectCandidates(CapStartContext context);
IReadOnlyList<ConsumerExecutorDescriptor> SelectCandidates(IServiceProvider provider);
/// <summary>
/// Selects the best <see cref="ConsumerExecutorDescriptor"/> candidate from <paramref name="candidates"/> for the
......
......@@ -5,6 +5,7 @@ using System.Threading.Tasks;
using DotNetCore.CAP.Infrastructure;
using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
namespace DotNetCore.CAP
......@@ -14,20 +15,23 @@ namespace DotNetCore.CAP
/// </summary>
public class DefaultBootstrapper : IBootstrapper
{
private IApplicationLifetime _appLifetime;
private CancellationTokenSource _cts;
private CancellationTokenRegistration _ctsRegistration;
private readonly ILogger<DefaultBootstrapper> _logger;
private readonly IApplicationLifetime _appLifetime;
private readonly CancellationTokenSource _cts;
private readonly CancellationTokenRegistration _ctsRegistration;
private Task _bootstrappingTask;
public DefaultBootstrapper(
ILogger<DefaultBootstrapper> logger,
IOptions<CapOptions> options,
ICapMessageStore storage,
IApplicationLifetime appLifetime,
IServiceProvider provider)
{
_logger = logger;
_appLifetime = appLifetime;
Options = options.Value;
Storage = storage;
_appLifetime = appLifetime;
Provider = provider;
Servers = Provider.GetServices<IProcessingServer>();
......@@ -39,8 +43,9 @@ namespace DotNetCore.CAP
{
_bootstrappingTask?.Wait();
}
catch (OperationCanceledException)
catch (OperationCanceledException ex)
{
_logger.ExpectedOperationCanceledException(ex);
}
});
}
......@@ -74,8 +79,9 @@ namespace DotNetCore.CAP
{
item.Start();
}
catch (Exception)
catch (Exception ex)
{
_logger.ServerStartedError(ex);
}
}
......
......@@ -15,6 +15,14 @@ namespace DotNetCore.CAP
/// <param name="message">The message to create in the store.</param>
Task<OperateResult> StoreSentMessageAsync(CapSentMessage message);
/// <summary>
/// Change <see cref="CapSentMessage"/> model status name.
/// </summary>
/// <param name="message">The type of <see cref="CapSentMessage"/>.</param>
/// <param name="statusName">The status name.</param>
/// <returns></returns>
Task<OperateResult> ChangeSentMessageStateAsync(CapSentMessage message, string statusName, bool autoSaveChanges = true);
/// <summary>
/// Fetches the next message to be executed.
/// </summary>
......@@ -33,6 +41,9 @@ namespace DotNetCore.CAP
/// <param name="message">The message to delete in the store.</param>
Task<OperateResult> RemoveSentMessageAsync(CapSentMessage message);
/// <summary>
/// Creates a new message in a store as an asynchronous operation.
/// </summary>
......@@ -40,6 +51,19 @@ namespace DotNetCore.CAP
/// <returns></returns>
Task<OperateResult> StoreReceivedMessageAsync(CapReceivedMessage message);
/// <summary>
/// Change <see cref="CapReceivedMessage"/> model status name.
/// </summary>
/// <param name="message">The type of <see cref="CapReceivedMessage"/>.</param>
/// <param name="statusName">The status name.</param>
/// <returns></returns>
Task<OperateResult> ChangeReceivedMessageStateAsync(CapReceivedMessage message, string statusName, bool autoSaveChanges = true);
/// <summary>
/// Fetches the next message to be executed.
/// </summary>
Task<CapReceivedMessage> GetNextReceivedMessageToBeExcuted();
/// <summary>
/// Updates a message in a store as an asynchronous operation.
/// </summary>
......
......@@ -48,7 +48,7 @@ namespace DotNetCore.CAP
Content = content
};
message.StateName = StateName.Enqueued;
message.StatusName = StatusName.Enqueued;
await _store.StoreSentMessageAsync(message);
WaitHandleEx.PulseEvent.Set();
......
......@@ -5,6 +5,7 @@ using System.Threading.Tasks;
using DotNetCore.CAP.Abstractions;
using DotNetCore.CAP.Infrastructure;
using DotNetCore.CAP.Internal;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
......@@ -20,12 +21,10 @@ namespace DotNetCore.CAP
private readonly MethodMatcherCache _selector;
private readonly CapOptions _options;
private readonly ICapMessageStore _messageStore;
private readonly CancellationTokenSource _cts;
public event EventHandler<CapMessage> MessageReceieved;
private CapStartContext _context;
private Task _compositeTask;
private bool _disposed;
......@@ -34,7 +33,6 @@ namespace DotNetCore.CAP
IConsumerInvokerFactory consumerInvokerFactory,
IConsumerClientFactory consumerClientFactory,
ILoggerFactory loggerFactory,
ICapMessageStore messageStore,
MethodMatcherCache selector,
IOptions<CapOptions> options)
{
......@@ -45,7 +43,6 @@ namespace DotNetCore.CAP
_consumerInvokerFactory = consumerInvokerFactory;
_consumerClientFactory = consumerClientFactory;
_options = options.Value;
_messageStore = messageStore;
_cts = new CancellationTokenSource();
}
......@@ -56,9 +53,7 @@ namespace DotNetCore.CAP
public void Start()
{
_context = new CapStartContext(_serviceProvider, _cts.Token);
var matchs = _selector.GetCandidatesMethods(_context);
var matchs = _selector.GetCandidatesMethods(_serviceProvider);
var groupingMatchs = matchs.GroupBy(x => x.Value.Attribute.GroupOrExchange);
......@@ -86,12 +81,17 @@ namespace DotNetCore.CAP
{
_logger.EnqueuingReceivedMessage(message.KeyName, message.Content);
using (var scope = _serviceProvider.CreateScope())
{
var provider = scope.ServiceProvider;
var messageStore = provider.GetRequiredService<ICapMessageStore>();
var capMessage = new CapReceivedMessage(message)
{
StateName = StateName.Enqueued,
StatusName = StatusName.Enqueued,
Added = DateTime.Now
};
_messageStore.StoreReceivedMessageAsync(capMessage).Wait();
messageStore.StoreReceivedMessageAsync(capMessage).Wait();
ConsumerExecutorDescriptor executeDescriptor = null;
......@@ -105,13 +105,14 @@ namespace DotNetCore.CAP
invoker.InvokeAsync();
_messageStore.UpdateReceivedMessageAsync(capMessage).Wait();
messageStore.ChangeReceivedMessageStateAsync(capMessage,StatusName.Succeeded).Wait();
}
catch (Exception ex)
{
_logger.ConsumerMethodExecutingFailed(executeDescriptor.MethodInfo.Name, ex);
}
}
}
public void Dispose()
{
......
......@@ -33,13 +33,13 @@ namespace DotNetCore.CAP.Infrastructure
public int Retries { get; set; }
public string StateName { get; set; }
public string StatusName { get; set; }
}
/// <summary>
/// The message state name.
/// The message status name.
/// </summary>
public struct StateName
public struct StatusName
{
public const string Enqueued = nameof(Enqueued);
public const string Processing = nameof(Processing);
......
......@@ -36,9 +36,9 @@ namespace DotNetCore.CAP.Internal
return executeDescriptor.FirstOrDefault(x => x.Attribute.Name == key);
}
public IReadOnlyList<ConsumerExecutorDescriptor> SelectCandidates(CapStartContext context)
public IReadOnlyList<ConsumerExecutorDescriptor> SelectCandidates(IServiceProvider provider)
{
var consumerServices = context.ServiceProvider.GetServices<IConsumerService>();
var consumerServices = provider.GetServices<IConsumerService>();
var executorDescriptorList = new List<ConsumerExecutorDescriptor>();
foreach (var service in consumerServices)
......
......@@ -13,11 +13,11 @@ namespace DotNetCore.CAP.Internal
_selector = selector;
}
public ConcurrentDictionary<string, ConsumerExecutorDescriptor> GetCandidatesMethods(CapStartContext routeContext)
public ConcurrentDictionary<string, ConsumerExecutorDescriptor> GetCandidatesMethods(IServiceProvider provider)
{
if (Entries.Count == 0)
{
var executorCollection = _selector.SelectCandidates(routeContext);
var executorCollection = _selector.SelectCandidates(provider);
foreach (var item in executorCollection)
{
......
using System;
using System.Threading.Tasks;
using DotNetCore.CAP.Internal;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.DependencyInjection;
using DotNetCore.CAP.Abstractions;
using DotNetCore.CAP.Infrastructure;
namespace DotNetCore.CAP.Job
{
public class CapJob : IJob
{
public Task ExecuteAsync()
private readonly MethodMatcherCache _selector;
private readonly IConsumerInvokerFactory _consumerInvokerFactory;
private readonly IServiceProvider _serviceProvider;
private readonly ILogger<CapJob> _logger;
private readonly ICapMessageStore _messageStore;
public CapJob(
ILogger<CapJob> logger,
IServiceProvider serviceProvider,
IConsumerInvokerFactory consumerInvokerFactory,
ICapMessageStore messageStore,
MethodMatcherCache selector)
{
_logger = logger;
_serviceProvider = serviceProvider;
_consumerInvokerFactory = consumerInvokerFactory;
_messageStore = messageStore;
_selector = selector;
}
public async Task ExecuteAsync()
{
var matchs = _selector.GetCandidatesMethods(_serviceProvider);
using (var scope = _serviceProvider.CreateScope())
{
Console.WriteLine("当前时间:" + DateTime.Now.ToString());
var provider = scope.ServiceProvider;
var messageStore = provider.GetService<ICapMessageStore>();
return Task.CompletedTask;
var nextReceivedMessage = await messageStore.GetNextReceivedMessageToBeExcuted();
if (nextReceivedMessage != null)
{
var executeDescriptor = matchs[nextReceivedMessage.KeyName];
var consumerContext = new ConsumerContext(executeDescriptor, nextReceivedMessage);
var invoker = _consumerInvokerFactory.CreateInvoker(consumerContext);
await messageStore.ChangeReceivedMessageStateAsync(nextReceivedMessage, StatusName.Processing);
await invoker.InvokeAsync();
await messageStore.ChangeReceivedMessageStateAsync(nextReceivedMessage,StatusName.Succeeded);
}
}
}
}
}
\ No newline at end of file
......@@ -9,6 +9,7 @@ namespace DotNetCore.CAP
internal static class LoggerExtensions
{
private static Action<ILogger, int, int, Exception> _serverStarting;
private static Action<ILogger, Exception> _serverStartingError;
private static Action<ILogger, Exception> _serverShuttingDown;
private static Action<ILogger, string, Exception> _expectedOperationCanceledException;
......@@ -28,6 +29,11 @@ namespace DotNetCore.CAP
1,
"Starting the processing server. Detected {MachineProcessorCount} machine processor(s). Initiating {ProcessorCount} job processor(s).");
_serverStartingError = LoggerMessage.Define(
LogLevel.Error,
5,
"Starting the processing server throw an exception.");
_serverShuttingDown = LoggerMessage.Define(
LogLevel.Debug,
2,
......@@ -94,6 +100,11 @@ namespace DotNetCore.CAP
_serverStarting(logger, machineProcessorCount, processorCount, null);
}
public static void ServerStartedError(this ILogger logger, Exception ex)
{
_serverStartingError(logger, ex);
}
public static void ServerShuttingDown(this ILogger logger)
{
_serverShuttingDown(logger, null);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment