Commit e1ec8eac authored by yangxiaodong's avatar yangxiaodong

refactor storage module.

parent 9371e559
using System;
using System.Threading.Tasks;
using DotNetCore.CAP.Models;
using Microsoft.EntityFrameworkCore;
namespace DotNetCore.CAP.EntityFrameworkCore
{
/// <summary>
/// Represents a new instance of a persistence store for the specified message types.
/// </summary>
/// <typeparam name="TContext">The type of the data context class used to access the store.</typeparam>
public class CapMessageStore<TContext> : ICapMessageStore where TContext : DbContext
{
/// <summary>
/// Constructs a new instance of <see cref="TContext"/>.
/// </summary>
/// <param name="context">The <see cref="DbContext"/>.</param>
public CapMessageStore(TContext context)
{
Context = context ?? throw new ArgumentNullException(nameof(context));
}
public TContext Context { get; private set; }
private DbSet<CapSentMessage> SentMessages => Context.Set<CapSentMessage>();
/// <summary>
/// Creates the specified <paramref name="message"/> in the cap message store.
/// </summary>
/// <param name="message">The message to create.</param>
public async Task<OperateResult> StoreSentMessageAsync(CapSentMessage message)
{
if (message == null) throw new ArgumentNullException(nameof(message));
Context.Add(message);
await Context.SaveChangesAsync();
return OperateResult.Success;
}
}
}
\ No newline at end of file
...@@ -56,17 +56,24 @@ OUTPUT DELETED.MessageId,DELETED.[Type];"; ...@@ -56,17 +56,24 @@ OUTPUT DELETED.MessageId,DELETED.[Type];";
var sql = $@" var sql = $@"
SELECT TOP (1) * SELECT TOP (1) *
FROM [{_options.Schema}].[{nameof(CapDbContext.CapSentMessages)}] WITH (readpast) FROM [{_options.Schema}].[{nameof(CapDbContext.CapSentMessages)}] WITH (readpast)
WHERE StateName = '{StatusName.Enqueued}'"; WHERE StatusName = '{StatusName.Scheduled}'";
var connection = _context.GetDbConnection(); try
var message = (await connection.QueryAsync<CapSentMessage>(sql)).FirstOrDefault(); {
var connection = _context.GetDbConnection();
var message = (await connection.QueryAsync<CapSentMessage>(sql)).FirstOrDefault();
if (message != null) if (message != null)
{
_context.Attach(message);
}
return message;
}
catch (Exception ex)
{ {
_context.Attach(message); throw;
} }
return message;
} }
// CapReceviedMessage // CapReceviedMessage
......
...@@ -46,7 +46,7 @@ namespace DotNetCore.CAP.RabbitMQ ...@@ -46,7 +46,7 @@ namespace DotNetCore.CAP.RabbitMQ
public Task ProcessAsync(ProcessingContext context) public Task ProcessAsync(ProcessingContext context)
{ {
if (context == null) throw new ArgumentNullException(nameof(context)); if (context == null) throw new ArgumentNullException(nameof(context));
System.Diagnostics.Debug.WriteLine("RabbitMQ Processor 执行: " + DateTime.Now);
context.ThrowIfStopping(); context.ThrowIfStopping();
return ProcessCoreAsync(context); return ProcessCoreAsync(context);
} }
...@@ -64,10 +64,9 @@ namespace DotNetCore.CAP.RabbitMQ ...@@ -64,10 +64,9 @@ namespace DotNetCore.CAP.RabbitMQ
if (!worked) if (!worked)
{ {
var token = GetTokenToWaitOn(context); var token = GetTokenToWaitOn(context);
}
await WaitHandleEx.WaitAnyAsync(WaitHandleEx.SentPulseEvent, await WaitHandleEx.WaitAnyAsync(WaitHandleEx.SentPulseEvent, token.WaitHandle, _pollingDelay);
context.CancellationToken.WaitHandle, _pollingDelay); }
} }
finally finally
{ {
...@@ -92,6 +91,7 @@ namespace DotNetCore.CAP.RabbitMQ ...@@ -92,6 +91,7 @@ namespace DotNetCore.CAP.RabbitMQ
{ {
using (fetched) using (fetched)
{ {
var message = await connection.GetSentMessageAsync(fetched.MessageId); var message = await connection.GetSentMessageAsync(fetched.MessageId);
try try
{ {
......
...@@ -56,7 +56,7 @@ namespace DotNetCore.CAP.RabbitMQ ...@@ -56,7 +56,7 @@ namespace DotNetCore.CAP.RabbitMQ
_channel.BasicConsume(_queueName, false, consumer); _channel.BasicConsume(_queueName, false, consumer);
while (true) while (true)
{ {
Task.Delay(timeout); Task.Delay(timeout).Wait();
} }
} }
......
...@@ -46,27 +46,6 @@ namespace DotNetCore.CAP ...@@ -46,27 +46,6 @@ namespace DotNetCore.CAP
return this; return this;
} }
/// <summary>
/// Add an <see cref="ICapMessageStore"/> .
/// </summary>
/// <typeparam name="T">The type for the <see cref="ICapMessageStore"/> to add. </typeparam>
/// <returns>The current <see cref="CapBuilder"/> instance.</returns>
public virtual CapBuilder AddMessageStore<T>()
where T : class, ICapMessageStore
{
return AddScoped(typeof(ICapMessageStore), typeof(T));
}
/// <summary>
/// Add an <see cref="IJob"/> for process <see cref="CapJob"/>.
/// </summary>
/// <typeparam name="T">The type of the job.</typeparam>
public virtual CapBuilder AddJobs<T>()
where T : class, IJob
{
return AddSingleton<IJob, T>();
}
/// <summary> /// <summary>
/// Add an <see cref="ICapPublisher"/>. /// Add an <see cref="ICapPublisher"/>.
/// </summary> /// </summary>
......
using System.Threading.Tasks;
using DotNetCore.CAP.Infrastructure;
using DotNetCore.CAP.Models;
namespace DotNetCore.CAP
{
/// <summary>
/// Provides an abstraction for a store which manages CAP message.
/// </summary>
public interface ICapMessageStore
{
/// <summary>
/// Creates a new message in a store as an asynchronous operation.
/// </summary>
/// <param name="message">The message to create in the store.</param>
Task<OperateResult> StoreSentMessageAsync(CapSentMessage message);
}
}
\ No newline at end of file
...@@ -11,14 +11,10 @@ namespace DotNetCore.CAP ...@@ -11,14 +11,10 @@ namespace DotNetCore.CAP
/// </summary> /// </summary>
public class DefaultCapPublisher : ICapPublisher public class DefaultCapPublisher : ICapPublisher
{ {
private readonly ICapMessageStore _store; private readonly ILogger _logger;
private readonly ILogger _logger;
public DefaultCapPublisher( public DefaultCapPublisher( ILogger<DefaultCapPublisher> logger)
ICapMessageStore store, {
ILogger<DefaultCapPublisher> logger)
{
_store = store;
_logger = logger; _logger = logger;
} }
...@@ -50,11 +46,11 @@ namespace DotNetCore.CAP ...@@ -50,11 +46,11 @@ namespace DotNetCore.CAP
StatusName = StatusName.Enqueued StatusName = StatusName.Enqueued
}; };
await _store.StoreSentMessageAsync(message); //await _store.StoreSentMessageAsync(message);
WaitHandleEx.PulseEvent.Set(); // WaitHandleEx.PulseEvent.Set();
_logger.EnqueuingSentMessage(topic, content); // _logger.EnqueuingSentMessage(topic, content);
} }
} }
} }
\ No newline at end of file
...@@ -66,7 +66,7 @@ namespace DotNetCore.CAP ...@@ -66,7 +66,7 @@ namespace DotNetCore.CAP
client.Listening(_pollingDelay); client.Listening(_pollingDelay);
} }
}, _cts.Token, TaskCreationOptions.LongRunning, TaskScheduler.Current); }, _cts.Token, TaskCreationOptions.LongRunning, TaskScheduler.Default);
} }
_compositeTask = Task.CompletedTask; _compositeTask = Task.CompletedTask;
} }
...@@ -106,7 +106,7 @@ namespace DotNetCore.CAP ...@@ -106,7 +106,7 @@ namespace DotNetCore.CAP
{ {
var receviedMessage = StoreMessage(scope, message); var receviedMessage = StoreMessage(scope, message);
client.Commit(); client.Commit();
// ProcessMessage(scope, receviedMessage); // ProcessMessage(scope, receviedMessage);
} }
}; };
} }
......
...@@ -9,6 +9,7 @@ namespace DotNetCore.CAP.Infrastructure ...@@ -9,6 +9,7 @@ namespace DotNetCore.CAP.Infrastructure
/// </summary> /// </summary>
public struct StatusName public struct StatusName
{ {
public const string Scheduled = nameof(Scheduled);
public const string Enqueued = nameof(Enqueued); public const string Enqueued = nameof(Enqueued);
public const string Processing = nameof(Processing); public const string Processing = nameof(Processing);
public const string Succeeded = nameof(Succeeded); public const string Succeeded = nameof(Succeeded);
......
using System;
using System.Threading.Tasks;
using DotNetCore.CAP.Internal;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.DependencyInjection;
using DotNetCore.CAP.Abstractions;
using DotNetCore.CAP.Infrastructure;
namespace DotNetCore.CAP.Job
{
public class CapJob : IJob
{
private readonly MethodMatcherCache _selector;
private readonly IConsumerInvokerFactory _consumerInvokerFactory;
private readonly IServiceProvider _serviceProvider;
private readonly ILogger<CapJob> _logger;
private readonly ICapMessageStore _messageStore;
public CapJob(
ILogger<CapJob> logger,
IServiceProvider serviceProvider,
IConsumerInvokerFactory consumerInvokerFactory,
ICapMessageStore messageStore,
MethodMatcherCache selector)
{
_logger = logger;
_serviceProvider = serviceProvider;
_consumerInvokerFactory = consumerInvokerFactory;
_messageStore = messageStore;
_selector = selector;
}
public async Task ExecuteAsync()
{
//var groupedCandidates = _selector.GetCandidatesMethodsOfGroupNameGrouped(_serviceProvider);
//using (var scope = _serviceProvider.CreateScope())
//{
// var provider = scope.ServiceProvider;
// var messageStore = provider.GetService<ICapMessageStore>();
// var nextReceivedMessage = await messageStore.GetNextReceivedMessageToBeExcuted();
// if (nextReceivedMessage != null && groupedCandidates.ContainsKey(nextReceivedMessage.Group))
// {
// try
// {
// await messageStore.ChangeReceivedMessageStateAsync(nextReceivedMessage, StatusName.Processing);
// // If there are multiple consumers in the same group, we will take the first
// var executeDescriptor = groupedCandidates[nextReceivedMessage.Group][0];
// var consumerContext = new ConsumerContext(executeDescriptor, nextReceivedMessage.ToMessageContext());
// var invoker = _consumerInvokerFactory.CreateInvoker(consumerContext);
// await invoker.InvokeAsync();
// await messageStore.ChangeReceivedMessageStateAsync(nextReceivedMessage, StatusName.Succeeded);
// }
// catch (Exception ex)
// {
// _logger.ReceivedMessageRetryExecutingFailed(nextReceivedMessage.KeyName, ex);
// }
// }
//}
}
}
}
\ No newline at end of file
using System.Threading.Tasks;
namespace DotNetCore.CAP.Job
{
public interface IJob
{
/// <summary>
/// Executes the job.
/// </summary>
Task ExecuteAsync();
}
}
\ No newline at end of file
using System; using System;
using System.Diagnostics;
using System.Threading.Tasks; using System.Threading.Tasks;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
...@@ -23,6 +24,7 @@ namespace DotNetCore.CAP.Job ...@@ -23,6 +24,7 @@ namespace DotNetCore.CAP.Job
{ {
while (!context.IsStopping) while (!context.IsStopping)
{ {
Debug.WriteLine("InfiniteRetryProcessor在开线程:" + _inner.ToString() + " : " + DateTime.Now);
try try
{ {
await _inner.ProcessAsync(context); await _inner.ProcessAsync(context);
......
...@@ -46,7 +46,8 @@ namespace DotNetCore.CAP.Job ...@@ -46,7 +46,8 @@ namespace DotNetCore.CAP.Job
(sentMessage = await connection.GetNextSentMessageToBeEnqueuedAsync()) != null) (sentMessage = await connection.GetNextSentMessageToBeEnqueuedAsync()) != null)
{ {
var state = new EnqueuedState(); System.Diagnostics.Debug.WriteLine("JobQueuer 执行 内部循环: " + DateTime.Now);
var state = new EnqueuedState();
using (var transaction = connection.CreateTransaction()) using (var transaction = connection.CreateTransaction())
{ {
...@@ -56,7 +57,8 @@ namespace DotNetCore.CAP.Job ...@@ -56,7 +57,8 @@ namespace DotNetCore.CAP.Job
} }
} }
context.ThrowIfStopping(); System.Diagnostics.Debug.WriteLine("JobQueuer 执行: " + DateTime.Now);
context.ThrowIfStopping();
WaitHandleEx.SentPulseEvent.Set(); WaitHandleEx.SentPulseEvent.Set();
await WaitHandleEx.WaitAnyAsync(WaitHandleEx.QueuePulseEvent, await WaitHandleEx.WaitAnyAsync(WaitHandleEx.QueuePulseEvent,
......
...@@ -117,7 +117,7 @@ namespace DotNetCore.CAP.Job ...@@ -117,7 +117,7 @@ namespace DotNetCore.CAP.Job
} }
returnedProcessors.Add(_provider.GetService<JobQueuer>()); returnedProcessors.Add(_provider.GetService<JobQueuer>());
returnedProcessors.Add(_provider.GetService<IAdditionalProcessor>()); //returnedProcessors.Add(_provider.GetService<IAdditionalProcessor>());
return returnedProcessors.ToArray(); return returnedProcessors.ToArray();
} }
......
...@@ -10,29 +10,29 @@ namespace DotNetCore.CAP.Test ...@@ -10,29 +10,29 @@ namespace DotNetCore.CAP.Test
{ {
public class CapBuilderTest public class CapBuilderTest
{ {
[Fact] //[Fact]
public void CanOverrideMessageStore() //public void CanOverrideMessageStore()
{ //{
var services = new ServiceCollection(); // var services = new ServiceCollection();
services.AddCap().AddMessageStore<MyMessageStore>(); // services.AddCap().AddMessageStore<MyMessageStore>();
var thingy = services.BuildServiceProvider() // var thingy = services.BuildServiceProvider()
.GetRequiredService<ICapMessageStore>() as MyMessageStore; // .GetRequiredService<ICapMessageStore>() as MyMessageStore;
Assert.NotNull(thingy); // Assert.NotNull(thingy);
} ////}
[Fact] //[Fact]
public void CanOverrideJobs() //public void CanOverrideJobs()
{ //{
var services = new ServiceCollection(); // var services = new ServiceCollection();
services.AddCap().AddJobs<MyJobTest>(); // services.AddCap().AddJobs<MyJobTest>();
var thingy = services.BuildServiceProvider() // var thingy = services.BuildServiceProvider()
.GetRequiredService<IJob>() as MyJobTest; // .GetRequiredService<IJob>() as MyJobTest;
Assert.NotNull(thingy); // Assert.NotNull(thingy);
} //}
[Fact] [Fact]
public void CanOverrideProducerService() public void CanOverrideProducerService()
...@@ -58,65 +58,6 @@ namespace DotNetCore.CAP.Test ...@@ -58,65 +58,6 @@ namespace DotNetCore.CAP.Test
{ {
throw new NotImplementedException(); throw new NotImplementedException();
} }
} }
private class MyJobTest : IJob
{
public Task ExecuteAsync()
{
throw new NotImplementedException();
}
}
private class MyMessageStore : ICapMessageStore
{
public Task<OperateResult> ChangeReceivedMessageStateAsync(CapReceivedMessage message, string statusName,
bool autoSaveChanges = true)
{
throw new NotImplementedException();
}
public Task<OperateResult> ChangeSentMessageStateAsync(CapSentMessage message, string statusName,
bool autoSaveChanges = true)
{
throw new NotImplementedException();
}
public Task<CapReceivedMessage> GetNextReceivedMessageToBeExcuted()
{
throw new NotImplementedException();
}
public Task<CapSentMessage> GetNextSentMessageToBeEnqueuedAsync()
{
throw new NotImplementedException();
}
public Task<OperateResult> RemoveSentMessageAsync(CapSentMessage message)
{
throw new NotImplementedException();
}
public Task<OperateResult> StoreReceivedMessageAsync(CapReceivedMessage message)
{
throw new NotImplementedException();
}
public Task<OperateResult> StoreSentMessageAsync(CapSentMessage message)
{
throw new NotImplementedException();
}
public Task<OperateResult> UpdateReceivedMessageAsync(CapReceivedMessage message)
{
throw new NotImplementedException();
}
public Task<OperateResult> UpdateSentMessageAsync(CapSentMessage message)
{
throw new NotImplementedException();
}
}
} }
} }
\ No newline at end of file
...@@ -5,17 +5,17 @@ using DotNetCore.CAP.Models; ...@@ -5,17 +5,17 @@ using DotNetCore.CAP.Models;
namespace DotNetCore.CAP.Test namespace DotNetCore.CAP.Test
{ {
public class NoopMessageStore : ICapMessageStore //public class NoopMessageStore : ICapMessageStore
{ //{
public Task<OperateResult> ChangeReceivedMessageStateAsync(CapReceivedMessage message, string statusName, // public Task<OperateResult> ChangeReceivedMessageStateAsync(CapReceivedMessage message, string statusName,
bool autoSaveChanges = true) // bool autoSaveChanges = true)
{ // {
throw new NotImplementedException(); // throw new NotImplementedException();
} // }
public Task<OperateResult> StoreSentMessageAsync(CapSentMessage message) // public Task<OperateResult> StoreSentMessageAsync(CapSentMessage message)
{ // {
throw new NotImplementedException(); // throw new NotImplementedException();
} // }
} //}
} }
\ No newline at end of file
using System; //using System;
using System.Threading.Tasks; //using System.Threading.Tasks;
using DotNetCore.CAP.Infrastructure; //using DotNetCore.CAP.Infrastructure;
using DotNetCore.CAP.Models; //using DotNetCore.CAP.Models;
using Microsoft.AspNetCore.Http; //using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.DependencyInjection; //using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging; //using Microsoft.Extensions.Logging;
using Xunit; //using Xunit;
namespace DotNetCore.CAP.Test //namespace DotNetCore.CAP.Test
{ //{
public abstract class MessageManagerTestBase // public abstract class MessageManagerTestBase
{ // {
private const string NullValue = "(null)"; // private const string NullValue = "(null)";
protected virtual bool ShouldSkipDbTests() // protected virtual bool ShouldSkipDbTests()
{ // {
return false; // return false;
} // }
protected virtual void SetupMessageServices(IServiceCollection services, object context = null) // protected virtual void SetupMessageServices(IServiceCollection services, object context = null)
{ // {
services.AddSingleton<IHttpContextAccessor, HttpContextAccessor>(); // services.AddSingleton<IHttpContextAccessor, HttpContextAccessor>();
services.AddCap(); // services.AddCap();
AddMessageStore(services, context); // AddMessageStore(services, context);
services.AddSingleton<ILogger<ICapMessageStore>>(new TestLogger<ICapMessageStore>()); // services.AddSingleton<ILogger<ICapMessageStore>>(new TestLogger<ICapMessageStore>());
} // }
protected virtual ICapMessageStore CreateManager(object context = null, IServiceCollection services = null, // protected virtual ICapMessageStore CreateManager(object context = null, IServiceCollection services = null,
Action<IServiceCollection> configureServices = null) // Action<IServiceCollection> configureServices = null)
{ // {
if (services == null) // if (services == null)
{ // {
services = new ServiceCollection(); // services = new ServiceCollection();
} // }
if (context == null) // if (context == null)
{ // {
context = CreateTestContext(); // context = CreateTestContext();
} // }
SetupMessageServices(services, context); // SetupMessageServices(services, context);
configureServices?.Invoke(services); // configureServices?.Invoke(services);
return services.BuildServiceProvider().GetService<ICapMessageStore>(); // return services.BuildServiceProvider().GetService<ICapMessageStore>();
} // }
protected abstract object CreateTestContext(); // protected abstract object CreateTestContext();
protected abstract CapSentMessage CreateTestSentMessage(string content = ""); // protected abstract CapSentMessage CreateTestSentMessage(string content = "");
protected abstract CapReceivedMessage CreateTestReceivedMessage(string content = ""); // protected abstract CapReceivedMessage CreateTestReceivedMessage(string content = "");
protected abstract void AddMessageStore(IServiceCollection services, object context = null); // protected abstract void AddMessageStore(IServiceCollection services, object context = null);
[Fact] // [Fact]
public async Task CanDeleteSentMessage() // public async Task CanDeleteSentMessage()
{ // {
if (ShouldSkipDbTests()) // if (ShouldSkipDbTests())
{ // {
return; // return;
} // }
var manager = CreateManager(); // var manager = CreateManager();
var message = CreateTestSentMessage(); // var message = CreateTestSentMessage();
var operateResult = await manager.StoreSentMessageAsync(message); // var operateResult = await manager.StoreSentMessageAsync(message);
Assert.NotNull(operateResult); // Assert.NotNull(operateResult);
Assert.True(operateResult.Succeeded); // Assert.True(operateResult.Succeeded);
// operateResult = await manager.RemoveSentMessageAsync(message); // // operateResult = await manager.RemoveSentMessageAsync(message);
// Assert.NotNull(operateResult); // // Assert.NotNull(operateResult);
// Assert.True(operateResult.Succeeded); // // Assert.True(operateResult.Succeeded);
} // }
//[Fact] // //[Fact]
//public async Task CanUpdateReceivedMessage() // //public async Task CanUpdateReceivedMessage()
//{ // //{
// if (ShouldSkipDbTests()) // // if (ShouldSkipDbTests())
// { // // {
// return; // // return;
// } // // }
// var manager = CreateManager(); // // var manager = CreateManager();
// var message = CreateTestReceivedMessage(); // // var message = CreateTestReceivedMessage();
// // var operateResult = await manager.StoreReceivedMessageAsync(message); // // // var operateResult = await manager.StoreReceivedMessageAsync(message);
// // Assert.NotNull(operateResult); // // // Assert.NotNull(operateResult);
// // Assert.True(operateResult.Succeeded); // // // Assert.True(operateResult.Succeeded);
// // message.StatusName = StatusName.Processing; // // // message.StatusName = StatusName.Processing;
// // operateResult = await manager.UpdateReceivedMessageAsync(message); // // // operateResult = await manager.UpdateReceivedMessageAsync(message);
// // Assert.NotNull(operateResult); // // // Assert.NotNull(operateResult);
// // Assert.True(operateResult.Succeeded); // // // Assert.True(operateResult.Succeeded);
//} // //}
[Fact] // [Fact]
public async Task CanGetNextSendMessage() // public async Task CanGetNextSendMessage()
{ // {
if (ShouldSkipDbTests()) // if (ShouldSkipDbTests())
{ // {
return; // return;
} // }
var manager = CreateManager(); // var manager = CreateManager();
var message = CreateTestSentMessage(); // var message = CreateTestSentMessage();
var operateResult = await manager.StoreSentMessageAsync(message); // var operateResult = await manager.StoreSentMessageAsync(message);
Assert.NotNull(operateResult); // Assert.NotNull(operateResult);
Assert.True(operateResult.Succeeded); // Assert.True(operateResult.Succeeded);
// var storeMessage = await manager.GetNextSentMessageToBeEnqueuedAsync(); // // var storeMessage = await manager.GetNextSentMessageToBeEnqueuedAsync();
// Assert.Equal(message, storeMessage); // // Assert.Equal(message, storeMessage);
} // }
} // }
} //}
\ No newline at end of file \ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment