Commit 9371e559 authored by yangxiaodong's avatar yangxiaodong

rewrite storage moudle.

parent dfcaf447
...@@ -92,7 +92,6 @@ Global ...@@ -92,7 +92,6 @@ Global
{F608B509-A99B-4AC7-8227-42051DD4A578}.Debug|Any CPU.ActiveCfg = Debug|Any CPU {F608B509-A99B-4AC7-8227-42051DD4A578}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{F608B509-A99B-4AC7-8227-42051DD4A578}.Debug|Any CPU.Build.0 = Debug|Any CPU {F608B509-A99B-4AC7-8227-42051DD4A578}.Debug|Any CPU.Build.0 = Debug|Any CPU
{F608B509-A99B-4AC7-8227-42051DD4A578}.Release|Any CPU.ActiveCfg = Release|Any CPU {F608B509-A99B-4AC7-8227-42051DD4A578}.Release|Any CPU.ActiveCfg = Release|Any CPU
{F608B509-A99B-4AC7-8227-42051DD4A578}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection EndGlobalSection
GlobalSection(SolutionProperties) = preSolution GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE HideSolutionNode = FALSE
...@@ -105,6 +104,6 @@ Global ...@@ -105,6 +104,6 @@ Global
{C42CDE33-0878-4BA0-96F2-4CB7C8FDEAAD} = {9B2AE124-6636-4DE9-83A3-70360DABD0C4} {C42CDE33-0878-4BA0-96F2-4CB7C8FDEAAD} = {9B2AE124-6636-4DE9-83A3-70360DABD0C4}
{9961B80E-0718-4280-B2A0-271B003DE26B} = {9B2AE124-6636-4DE9-83A3-70360DABD0C4} {9961B80E-0718-4280-B2A0-271B003DE26B} = {9B2AE124-6636-4DE9-83A3-70360DABD0C4}
{69370370-9873-4D6A-965D-D1E16694047D} = {C09CDAB0-6DD4-46E9-B7F3-3EF2A4741EA0} {69370370-9873-4D6A-965D-D1E16694047D} = {C09CDAB0-6DD4-46E9-B7F3-3EF2A4741EA0}
{F608B509-A99B-4AC7-8227-42051DD4A578} = {C09CDAB0-6DD4-46E9-B7F3-3EF2A4741EA0} {F608B509-A99B-4AC7-8227-42051DD4A578} = {C09CDAB0-6DD4-46E9-B7F3-3EF2A4741EA0}
EndGlobalSection EndGlobalSection
EndGlobal EndGlobal
...@@ -9,11 +9,11 @@ using Microsoft.EntityFrameworkCore; ...@@ -9,11 +9,11 @@ using Microsoft.EntityFrameworkCore;
namespace Sample.Kafka namespace Sample.Kafka
{ {
public class AppDbContext : CapDbContext public class AppDbContext :DbContext
{ {
protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder) protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder)
{ {
optionsBuilder.UseSqlServer("Server=DESKTOP-M9R8T31;Initial Catalog=WebApp1;User Id=sa;Password=P@ssw0rd;MultipleActiveResultSets=True"); optionsBuilder.UseSqlServer("Server=192.168.2.206;Initial Catalog=Test;User Id=cmswuliu;Password=h7xY81agBn*Veiu3;MultipleActiveResultSets=True");
} }
} }
} }
...@@ -29,7 +29,7 @@ namespace Sample.Kafka.Controllers ...@@ -29,7 +29,7 @@ namespace Sample.Kafka.Controllers
{ {
Console.WriteLine(person.Name); Console.WriteLine(person.Name);
Console.WriteLine(person.Age); Console.WriteLine(person.Age);
} }
[Route("~/send")] [Route("~/send")]
......
...@@ -24,13 +24,17 @@ namespace Sample.Kafka ...@@ -24,13 +24,17 @@ namespace Sample.Kafka
// This method gets called by the runtime. Use this method to add services to the container. // This method gets called by the runtime. Use this method to add services to the container.
public void ConfigureServices(IServiceCollection services) public void ConfigureServices(IServiceCollection services)
{ {
services.AddDbContext<CapDbContext>(); services.AddDbContext<AppDbContext>();
services.AddCap() services.AddCap()
.AddEntityFrameworkStores<CapDbContext>() .AddEntityFrameworkStores<AppDbContext>(x=> {
x.ConnectionString = "Server=192.168.2.206;Initial Catalog=Test;User Id=cmswuliu;Password=h7xY81agBn*Veiu3;MultipleActiveResultSets=True";
})
.AddRabbitMQ(x => .AddRabbitMQ(x =>
{ {
x.HostName = "localhost"; x.HostName = "192.168.2.206";
x.UserName = "admin";
x.Password = "123123";
}); });
//.AddKafka(x => x.Servers = ""); //.AddKafka(x => x.Servers = "");
......
...@@ -18,7 +18,7 @@ namespace Microsoft.Extensions.DependencyInjection ...@@ -18,7 +18,7 @@ namespace Microsoft.Extensions.DependencyInjection
public static CapBuilder AddEntityFrameworkStores<TContext>(this CapBuilder builder) public static CapBuilder AddEntityFrameworkStores<TContext>(this CapBuilder builder)
where TContext : DbContext where TContext : DbContext
{ {
builder.Services.AddScoped<ICapMessageStore, CapMessageStore<TContext>>(); //builder.Services.AddScoped<ICapMessageStore, CapMessageStore<TContext>>();
builder.Services.AddScoped<IStorage, EFStorage>(); builder.Services.AddScoped<IStorage, EFStorage>();
builder.Services.AddScoped<IStorageConnection, EFStorageConnection>(); builder.Services.AddScoped<IStorageConnection, EFStorageConnection>();
...@@ -26,25 +26,26 @@ namespace Microsoft.Extensions.DependencyInjection ...@@ -26,25 +26,26 @@ namespace Microsoft.Extensions.DependencyInjection
} }
public static CapBuilder AddEntityFrameworkStores<TContext>(this CapBuilder builder, Action<EFOptions> actionOptions) public static CapBuilder AddEntityFrameworkStores<TContext>(this CapBuilder builder, Action<SqlServerOptions> actionOptions)
where TContext : DbContext where TContext : DbContext
{ {
builder.Services.AddScoped<ICapMessageStore, CapMessageStore<TContext>>(); //builder.Services.AddScoped<ICapMessageStore, CapMessageStore<TContext>>();
builder.Services.AddSingleton<IStorage, EFStorage>(); builder.Services.AddSingleton<IStorage, EFStorage>();
builder.Services.AddScoped<IStorageConnection, EFStorageConnection>(); builder.Services.AddScoped<IStorageConnection, EFStorageConnection>();
builder.Services.Configure(actionOptions); builder.Services.Configure(actionOptions);
var efOptions = new EFOptions(); var sqlServerOptions = new SqlServerOptions();
actionOptions(efOptions); actionOptions(sqlServerOptions);
builder.Services.AddSingleton(sqlServerOptions);
builder.Services.AddDbContext<CapDbContext>(options => builder.Services.AddDbContext<CapDbContext>(options =>
{ {
options.UseSqlServer(efOptions.ConnectionString, sqlOpts => options.UseSqlServer(sqlServerOptions.ConnectionString, sqlOpts =>
{ {
sqlOpts.MigrationsHistoryTable( sqlOpts.MigrationsHistoryTable(
efOptions.MigrationsHistoryTableName, sqlServerOptions.MigrationsHistoryTableName,
efOptions.MigrationsHistoryTableSchema ?? efOptions.Schema); sqlServerOptions.MigrationsHistoryTableSchema ?? sqlServerOptions.Schema);
}); });
}); });
......
...@@ -38,4 +38,24 @@ namespace DotNetCore.CAP.EntityFrameworkCore ...@@ -38,4 +38,24 @@ namespace DotNetCore.CAP.EntityFrameworkCore
/// </summary> /// </summary>
public string MigrationsHistoryTableName { get; set; } = DefaultMigrationsHistoryTableName; public string MigrationsHistoryTableName { get; set; } = DefaultMigrationsHistoryTableName;
} }
//public static class CapOptionsExtensions
//{
// public static EFOptions UseSqlServer(this CapOptions options, string connectionString)
// {
// return options.UseSqlServer(opts =>
// {
// opts.ConnectionString = connectionString;
// });
// }
// public static EFOptions UseSqlServer(this CapOptions options, Action<SqlServerOptions> configure)
// {
// if (configure == null) throw new ArgumentNullException(nameof(configure));
// (new EFOptions(configure));
// return options;
// }
//}
} }
...@@ -10,22 +10,19 @@ namespace DotNetCore.CAP.EntityFrameworkCore ...@@ -10,22 +10,19 @@ namespace DotNetCore.CAP.EntityFrameworkCore
/// </summary> /// </summary>
public class CapDbContext : DbContext public class CapDbContext : DbContext
{ {
private readonly EFOptions _efOptions; private SqlServerOptions _sqlServerOptions;
/// <summary> /// <summary>
/// Initializes a new instance of the <see cref="CapDbContext"/>. /// Initializes a new instance of the <see cref="CapDbContext"/>.
/// </summary> /// </summary>
public CapDbContext() { public CapDbContext() { }
_efOptions = new EFOptions();
}
/// <summary> /// <summary>
/// Initializes a new instance of the <see cref="CapDbContext"/>. /// Initializes a new instance of the <see cref="CapDbContext"/>.
/// </summary> /// </summary>
/// <param name="options">The options to be used by a <see cref="DbContext"/>.</param> /// <param name="options">The options to be used by a <see cref="DbContext"/>.</param>
public CapDbContext(DbContextOptions<CapDbContext> options, EFOptions efOptions) public CapDbContext(DbContextOptions<CapDbContext> options, SqlServerOptions sqlServerOptions)
: base(options) { : base(options) {
_efOptions = efOptions; _sqlServerOptions = sqlServerOptions;
} }
/// <summary> /// <summary>
...@@ -51,7 +48,7 @@ namespace DotNetCore.CAP.EntityFrameworkCore ...@@ -51,7 +48,7 @@ namespace DotNetCore.CAP.EntityFrameworkCore
/// </param> /// </param>
protected override void OnModelCreating(ModelBuilder modelBuilder) protected override void OnModelCreating(ModelBuilder modelBuilder)
{ {
modelBuilder.HasDefaultSchema(_efOptions.Schema); modelBuilder.HasDefaultSchema(_sqlServerOptions.Schema);
modelBuilder.Entity<CapSentMessage>(b => modelBuilder.Entity<CapSentMessage>(b =>
{ {
...@@ -68,9 +65,8 @@ namespace DotNetCore.CAP.EntityFrameworkCore ...@@ -68,9 +65,8 @@ namespace DotNetCore.CAP.EntityFrameworkCore
}); });
} }
//protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder) protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder)
//{ {
// optionsBuilder.UseSqlServer("Server=DESKTOP-M9R8T31;Initial Catalog=WebApp1;User Id=sa;Password=P@ssw0rd;MultipleActiveResultSets=True"); }
//}
} }
} }
\ No newline at end of file
...@@ -27,12 +27,11 @@ namespace DotNetCore.CAP ...@@ -27,12 +27,11 @@ namespace DotNetCore.CAP
var sql = "INSERT INTO [cap].[CapSentMessages] ([Id],[Added],[Content],[KeyName],[LastRun],[Retries],[StatusName])VALUES(@Id,@Added,@Content,@KeyName,@LastRun,@Retries,@StatusName)"; var sql = "INSERT INTO [cap].[CapSentMessages] ([Id],[Added],[Content],[KeyName],[LastRun],[Retries],[StatusName])VALUES(@Id,@Added,@Content,@KeyName,@LastRun,@Retries,@StatusName)";
await connection.ExecuteAsync(sql, transaction); await connection.ExecuteAsync(sql, transaction);
WaitHandleEx.QueuePulseEvent.Set();
JobQueuer.PulseEvent.Set();
} }
public static async Task<int> Publish(this ICapPublisher publisher, string topic, string content, IDbConnection connection,IDbTransaction transaction) public static async Task Publish(this ICapPublisher publisher, string topic, string content, IDbConnection connection,IDbTransaction transaction)
{ {
var message = new CapSentMessage var message = new CapSentMessage
{ {
...@@ -42,10 +41,8 @@ namespace DotNetCore.CAP ...@@ -42,10 +41,8 @@ namespace DotNetCore.CAP
}; };
var sql = "INSERT INTO [cap].[CapSentMessages] ([Id],[Added],[Content],[KeyName],[LastRun],[Retries],[StatusName])VALUES(@Id,@Added,@Content,@KeyName,@LastRun,@Retries,@StatusName)"; var sql = "INSERT INTO [cap].[CapSentMessages] ([Id],[Added],[Content],[KeyName],[LastRun],[Retries],[StatusName])VALUES(@Id,@Added,@Content,@KeyName,@LastRun,@Retries,@StatusName)";
return await connection.ExecuteAsync(sql, transaction); await connection.ExecuteAsync(sql, transaction);
WaitHandleEx.QueuePulseEvent.Set();
JobQueuer.PulseEvent.Set();
} }
} }
} }
\ No newline at end of file
...@@ -3,6 +3,7 @@ using System.Collections.Generic; ...@@ -3,6 +3,7 @@ using System.Collections.Generic;
using System.Data; using System.Data;
using System.Text; using System.Text;
using System.Threading; using System.Threading;
using Dapper;
using Microsoft.EntityFrameworkCore.Storage; using Microsoft.EntityFrameworkCore.Storage;
namespace DotNetCore.CAP.EntityFrameworkCore namespace DotNetCore.CAP.EntityFrameworkCore
...@@ -16,10 +17,12 @@ namespace DotNetCore.CAP.EntityFrameworkCore ...@@ -16,10 +17,12 @@ namespace DotNetCore.CAP.EntityFrameworkCore
private readonly object _lockObject = new object(); private readonly object _lockObject = new object();
public EFFetchedMessage(string messageId, public EFFetchedMessage(string messageId,
int type,
IDbConnection connection, IDbConnection connection,
IDbContextTransaction transaction) IDbContextTransaction transaction)
{ {
MessageId = messageId; MessageId = messageId;
Type = type;
_connection = connection; _connection = connection;
_transaction = transaction; _transaction = transaction;
_timer = new Timer(ExecuteKeepAliveQuery, null, KeepAliveInterval, KeepAliveInterval); _timer = new Timer(ExecuteKeepAliveQuery, null, KeepAliveInterval, KeepAliveInterval);
...@@ -27,6 +30,8 @@ namespace DotNetCore.CAP.EntityFrameworkCore ...@@ -27,6 +30,8 @@ namespace DotNetCore.CAP.EntityFrameworkCore
public string MessageId { get; } public string MessageId { get; }
public int Type { get; }
public void RemoveFromQueue() public void RemoveFromQueue()
{ {
lock (_lockObject) lock (_lockObject)
......
...@@ -30,7 +30,14 @@ namespace DotNetCore.CAP.EntityFrameworkCore ...@@ -30,7 +30,14 @@ namespace DotNetCore.CAP.EntityFrameworkCore
var context = provider.GetRequiredService<CapDbContext>(); var context = provider.GetRequiredService<CapDbContext>();
_logger.LogDebug("Ensuring all migrations are applied to Jobs database."); _logger.LogDebug("Ensuring all migrations are applied to Jobs database.");
await context.Database.MigrateAsync(cancellationToken); try
{
await context.Database.MigrateAsync(cancellationToken);
}
catch (Exception ex)
{
throw ex;
}
} }
} }
} }
......
using System; using System;
using System.Data; using System.Data;
using System.Data.SqlClient;
using System.Linq; using System.Linq;
using System.Threading.Tasks; using System.Threading.Tasks;
using Dapper;
using DotNetCore.CAP.Infrastructure; using DotNetCore.CAP.Infrastructure;
using DotNetCore.CAP.Models; using DotNetCore.CAP.Models;
using Microsoft.EntityFrameworkCore; using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Storage;
using Microsoft.Extensions.Options; using Microsoft.Extensions.Options;
namespace DotNetCore.CAP.EntityFrameworkCore namespace DotNetCore.CAP.EntityFrameworkCore
...@@ -12,11 +15,11 @@ namespace DotNetCore.CAP.EntityFrameworkCore ...@@ -12,11 +15,11 @@ namespace DotNetCore.CAP.EntityFrameworkCore
public class EFStorageConnection : IStorageConnection public class EFStorageConnection : IStorageConnection
{ {
private readonly CapDbContext _context; private readonly CapDbContext _context;
private readonly EFOptions _options; private readonly SqlServerOptions _options;
public EFStorageConnection( public EFStorageConnection(
CapDbContext context, CapDbContext context,
IOptions<EFOptions> options) IOptions<SqlServerOptions> options)
{ {
_context = context; _context = context;
_options = options.Value; _options = options.Value;
...@@ -24,49 +27,39 @@ namespace DotNetCore.CAP.EntityFrameworkCore ...@@ -24,49 +27,39 @@ namespace DotNetCore.CAP.EntityFrameworkCore
public CapDbContext Context => _context; public CapDbContext Context => _context;
public EFOptions Options => _options; public SqlServerOptions Options => _options;
public IStorageTransaction CreateTransaction() public IStorageTransaction CreateTransaction()
{ {
return new EFStorageTransaction(this); return new EFStorageTransaction(this);
} }
public Task<CapSentMessage> GetSentMessageAsync(string id) public Task<CapSentMessage> GetSentMessageAsync(string id)
{ {
return _context.CapSentMessages.FirstOrDefaultAsync(x => x.Id == id); return _context.CapSentMessages.FirstOrDefaultAsync(x => x.Id == id);
} }
public async Task<IFetchedMessage> FetchNextSentMessageAsync()
{
// var sql = $@"
//DELETE TOP (1)
//FROM [{_options.Schema}].[{nameof(CapDbContext.CapSentMessages)}] WITH (readpast, updlock, rowlock)
//OUTPUT DELETED.Id";
var queueFirst = await _context.CapQueue.FirstOrDefaultAsync(); public Task<IFetchedMessage> FetchNextMessageAsync()
if (queueFirst == null) {
return null; var sql = $@"
DELETE TOP (1)
_context.CapQueue.Remove(queueFirst); FROM [{_options.Schema}].[{nameof(CapDbContext.CapQueue)}] WITH (readpast, updlock, rowlock)
OUTPUT DELETED.MessageId,DELETED.[Type];";
var connection = _context.Database.GetDbConnection(); return FetchNextMessageCoreAsync(sql);
var transaction = _context.Database.CurrentTransaction;
transaction = transaction ?? await _context.Database.BeginTransactionAsync(IsolationLevel.ReadCommitted);
return new EFFetchedMessage(queueFirst.MessageId, connection, transaction);
} }
public Task<CapSentMessage> GetNextSentMessageToBeEnqueuedAsync()
{
// var sql = $@"
//SELECT TOP (1) *
//FROM [{_options.Schema}].[{nameof(CapDbContext.CapSentMessages)}] WITH (readpast)
//WHERE (Due IS NULL OR Due < GETUTCDATE()) AND StateName = '{StatusName.Enqueued}'";
// var connection = _context.GetDbConnection();
// var message = _context.CapSentMessages.FromSql(sql).FirstOrDefaultAsync(); public async Task<CapSentMessage> GetNextSentMessageToBeEnqueuedAsync()
{
var sql = $@"
SELECT TOP (1) *
FROM [{_options.Schema}].[{nameof(CapDbContext.CapSentMessages)}] WITH (readpast)
WHERE StateName = '{StatusName.Enqueued}'";
var message = _context.CapSentMessages.Where(x => x.StatusName == StatusName.Enqueued).FirstOrDefaultAsync(); var connection = _context.GetDbConnection();
var message = (await connection.QueryAsync<CapSentMessage>(sql)).FirstOrDefault();
if (message != null) if (message != null)
{ {
...@@ -76,12 +69,12 @@ namespace DotNetCore.CAP.EntityFrameworkCore ...@@ -76,12 +69,12 @@ namespace DotNetCore.CAP.EntityFrameworkCore
return message; return message;
} }
// CapReceviedMessage
public Task StoreReceivedMessageAsync(CapReceivedMessage message) public Task StoreReceivedMessageAsync(CapReceivedMessage message)
{ {
if (message == null) throw new ArgumentNullException(nameof(message)); if (message == null) throw new ArgumentNullException(nameof(message));
message.LastRun = NormalizeDateTime(message.LastRun);
_context.Add(message); _context.Add(message);
return _context.SaveChangesAsync(); return _context.SaveChangesAsync();
} }
...@@ -91,28 +84,59 @@ namespace DotNetCore.CAP.EntityFrameworkCore ...@@ -91,28 +84,59 @@ namespace DotNetCore.CAP.EntityFrameworkCore
return _context.CapReceivedMessages.FirstOrDefaultAsync(x => x.Id == id); return _context.CapReceivedMessages.FirstOrDefaultAsync(x => x.Id == id);
} }
public Task<IFetchedMessage> FetchNextReceivedMessageAsync() public async Task<CapReceivedMessage> GetNextReceviedMessageToBeEnqueuedAsync()
{ {
throw new NotImplementedException(); var sql = $@"
SELECT TOP (1) *
FROM [{_options.Schema}].[{nameof(CapDbContext.CapReceivedMessages)}] WITH (readpast)
WHERE StateName = '{StatusName.Enqueued}'";
var connection = _context.GetDbConnection();
var message = (await connection.QueryAsync<CapReceivedMessage>(sql)).FirstOrDefault();
if (message != null)
{
_context.Attach(message);
}
return message;
} }
public Task<CapReceivedMessage> GetNextReceviedMessageToBeEnqueuedAsync() public void Dispose()
{ {
throw new NotImplementedException();
} }
private DateTime? NormalizeDateTime(DateTime? dateTime) private async Task<IFetchedMessage> FetchNextMessageCoreAsync(string sql, object args = null)
{ {
if (!dateTime.HasValue) return dateTime; FetchedMessage fetchedJob = null;
if (dateTime == DateTime.MinValue) var connection = _context.GetDbConnection();
var transaction = _context.Database.CurrentTransaction;
transaction = transaction ?? await _context.Database.BeginTransactionAsync(IsolationLevel.ReadCommitted);
try
{ {
return new DateTime(1754, 1, 1, 0, 0, 0, DateTimeKind.Utc); fetchedJob =
(await connection.QueryAsync<FetchedMessage>(sql, args, transaction.GetDbTransaction()))
.FirstOrDefault();
}
catch (SqlException)
{
transaction.Dispose();
throw;
} }
return dateTime;
}
public void Dispose() if (fetchedJob == null)
{ {
} transaction.Rollback();
transaction.Dispose();
return null;
}
return new EFFetchedMessage(
fetchedJob.MessageId,
fetchedJob.Type,
connection,
transaction);
}
} }
} }
...@@ -7,5 +7,7 @@ namespace DotNetCore.CAP.EntityFrameworkCore ...@@ -7,5 +7,7 @@ namespace DotNetCore.CAP.EntityFrameworkCore
public class FetchedMessage public class FetchedMessage
{ {
public string MessageId { get; set; } public string MessageId { get; set; }
public int Type { get; set; }
} }
} }
using System;
using System.Collections.Generic;
using System.Text;
namespace DotNetCore.CAP.EntityFrameworkCore
{
public class SqlServerOptions
{
public const string DefaultSchema = "cap";
public const string DefaultMigrationsHistoryTableName = "__EFMigrationsHistory";
/// <summary>
/// Gets or sets the database's connection string that will be used to store database entities.
/// </summary>
public string ConnectionString { get; set; }
/// <summary>
/// Gets or sets the schema to use when creating database objects.
/// Default is <see cref="DefaultSchema"/>.
/// </summary>
public string Schema { get; set; } = DefaultSchema;
/// <summary>
/// Gets or sets the migrations history table's schema.
/// If this is null, <see cref="Schema"/> will be used.
/// </summary>
public string MigrationsHistoryTableSchema { get; set; }
/// <summary>
/// Gets or sets the migrations history table's name.
/// Default is <see cref="DefaultMigrationsHistoryTableName"/>.
/// </summary>
public string MigrationsHistoryTableName { get; set; } = DefaultMigrationsHistoryTableName;
}
}
...@@ -15,7 +15,7 @@ namespace Microsoft.Extensions.DependencyInjection ...@@ -15,7 +15,7 @@ namespace Microsoft.Extensions.DependencyInjection
builder.Services.AddSingleton<IConsumerClientFactory, RabbitMQConsumerClientFactory>(); builder.Services.AddSingleton<IConsumerClientFactory, RabbitMQConsumerClientFactory>();
builder.Services.AddTransient<IJobProcessor, RabbitJobProcessor>(); builder.Services.AddTransient<IMessageJobProcessor, RabbitJobProcessor>();
return builder; return builder;
} }
......
...@@ -14,7 +14,7 @@ using RabbitMQ.Client; ...@@ -14,7 +14,7 @@ using RabbitMQ.Client;
namespace DotNetCore.CAP.RabbitMQ namespace DotNetCore.CAP.RabbitMQ
{ {
public class RabbitJobProcessor : IJobProcessor public class RabbitJobProcessor : IMessageJobProcessor
{ {
private readonly RabbitMQOptions _rabbitMqOptions; private readonly RabbitMQOptions _rabbitMqOptions;
private readonly CancellationTokenSource _cts; private readonly CancellationTokenSource _cts;
...@@ -86,10 +86,9 @@ namespace DotNetCore.CAP.RabbitMQ ...@@ -86,10 +86,9 @@ namespace DotNetCore.CAP.RabbitMQ
using (var scopedContext = context.CreateScope()) using (var scopedContext = context.CreateScope())
{ {
var provider = scopedContext.Provider; var provider = scopedContext.Provider;
//var messageStore = provider.GetRequiredService<ICapMessageStore>();
var connection = provider.GetRequiredService<IStorageConnection>(); var connection = provider.GetRequiredService<IStorageConnection>();
if ((fetched = await connection.FetchNextSentMessageAsync()) != null) if ((fetched = await connection.FetchNextMessageAsync()) != null)
{ {
using (fetched) using (fetched)
{ {
......
...@@ -7,6 +7,7 @@ using DotNetCore.CAP.Abstractions.ModelBinding; ...@@ -7,6 +7,7 @@ using DotNetCore.CAP.Abstractions.ModelBinding;
using DotNetCore.CAP.Infrastructure; using DotNetCore.CAP.Infrastructure;
using DotNetCore.CAP.Internal; using DotNetCore.CAP.Internal;
using DotNetCore.CAP.Job; using DotNetCore.CAP.Job;
using DotNetCore.CAP.Job.States;
using Microsoft.Extensions.DependencyInjection.Extensions; using Microsoft.Extensions.DependencyInjection.Extensions;
namespace Microsoft.Extensions.DependencyInjection namespace Microsoft.Extensions.DependencyInjection
...@@ -49,10 +50,12 @@ namespace Microsoft.Extensions.DependencyInjection ...@@ -49,10 +50,12 @@ namespace Microsoft.Extensions.DependencyInjection
services.AddSingleton<IProcessingServer, ConsumerHandler>(); services.AddSingleton<IProcessingServer, ConsumerHandler>();
services.AddSingleton<IProcessingServer, JobProcessingServer>(); services.AddSingleton<IProcessingServer, JobProcessingServer>();
services.AddSingleton<IBootstrapper, DefaultBootstrapper>(); services.AddSingleton<IBootstrapper, DefaultBootstrapper>();
services.AddSingleton<IStateChanger, StateChanger>();
//Processors
services.AddTransient<JobQueuer>();
//services.AddTransient<>
services.TryAddTransient<IJobProcessor, CronJobProcessor>(); //services.TryAddSingleton<IJob, CapJob>();
services.TryAddSingleton<IJob, CapJob>();
services.TryAddTransient<DefaultCronJobRegistry>();
services.TryAddScoped<ICapPublisher, DefaultCapPublisher>(); services.TryAddScoped<ICapPublisher, DefaultCapPublisher>();
......
...@@ -24,7 +24,7 @@ namespace DotNetCore.CAP ...@@ -24,7 +24,7 @@ namespace DotNetCore.CAP
public DefaultBootstrapper( public DefaultBootstrapper(
ILogger<DefaultBootstrapper> logger, ILogger<DefaultBootstrapper> logger,
IOptions<CapOptions> options, IOptions<CapOptions> options,
ICapMessageStore storage, IStorage storage,
IApplicationLifetime appLifetime, IApplicationLifetime appLifetime,
IServiceProvider provider) IServiceProvider provider)
{ {
...@@ -52,7 +52,7 @@ namespace DotNetCore.CAP ...@@ -52,7 +52,7 @@ namespace DotNetCore.CAP
protected CapOptions Options { get; } protected CapOptions Options { get; }
protected ICapMessageStore Storage { get; } protected IStorage Storage { get; }
protected IEnumerable<IProcessingServer> Servers { get; } protected IEnumerable<IProcessingServer> Servers { get; }
...@@ -65,6 +65,8 @@ namespace DotNetCore.CAP ...@@ -65,6 +65,8 @@ namespace DotNetCore.CAP
private async Task BootstrapTaskAsync() private async Task BootstrapTaskAsync()
{ {
await Storage.InitializeAsync(_cts.Token);
if (_cts.IsCancellationRequested) return; if (_cts.IsCancellationRequested) return;
if (_cts.IsCancellationRequested) return; if (_cts.IsCancellationRequested) return;
...@@ -98,7 +100,7 @@ namespace DotNetCore.CAP ...@@ -98,7 +100,7 @@ namespace DotNetCore.CAP
item.Dispose(); item.Dispose();
} }
}); });
return Task.FromResult(0); return Task.CompletedTask;
} }
} }
} }
\ No newline at end of file
...@@ -123,6 +123,11 @@ namespace DotNetCore.CAP ...@@ -123,6 +123,11 @@ namespace DotNetCore.CAP
return receivedMessage; return receivedMessage;
} }
public void Pulse()
{
throw new NotImplementedException();
}
//private void ProcessMessage(IServiceScope serviceScope, CapReceivedMessage receivedMessage) //private void ProcessMessage(IServiceScope serviceScope, CapReceivedMessage receivedMessage)
//{ //{
// var provider = serviceScope.ServiceProvider; // var provider = serviceScope.ServiceProvider;
......
...@@ -6,6 +6,8 @@ namespace DotNetCore.CAP ...@@ -6,6 +6,8 @@ namespace DotNetCore.CAP
{ {
string MessageId { get; } string MessageId { get; }
int Type { get; }
void RemoveFromQueue(); void RemoveFromQueue();
void Requeue(); void Requeue();
......
...@@ -21,7 +21,7 @@ namespace DotNetCore.CAP ...@@ -21,7 +21,7 @@ namespace DotNetCore.CAP
/// <summary> /// <summary>
/// Fetches the next message to be executed. /// Fetches the next message to be executed.
/// </summary> /// </summary>
Task<IFetchedMessage> FetchNextSentMessageAsync(); Task<IFetchedMessage> FetchNextMessageAsync();
/// <summary> /// <summary>
/// Returns the next message to be enqueued. /// Returns the next message to be enqueued.
...@@ -42,10 +42,6 @@ namespace DotNetCore.CAP ...@@ -42,10 +42,6 @@ namespace DotNetCore.CAP
/// <param name="id">The message's id.</param> /// <param name="id">The message's id.</param>
Task<CapReceivedMessage> GetReceivedMessageAsync(string id); Task<CapReceivedMessage> GetReceivedMessageAsync(string id);
/// <summary>
/// Fetches the next message to be executed.
/// </summary>
Task<IFetchedMessage> FetchNextReceivedMessageAsync();
/// <summary> /// <summary>
/// Returns the next message to be enqueued. /// Returns the next message to be enqueued.
......
using System;
using System.Collections.Generic;
using System.Text;
namespace DotNetCore.CAP.Job
{
public interface IAdditionalProcessor : IJobProcessor
{
}
}
...@@ -19,6 +19,7 @@ namespace DotNetCore.CAP.Job ...@@ -19,6 +19,7 @@ namespace DotNetCore.CAP.Job
private readonly CapOptions _options; private readonly CapOptions _options;
private IJobProcessor[] _processors; private IJobProcessor[] _processors;
private IMessageJobProcessor[] _messageProcessors;
private ProcessingContext _context; private ProcessingContext _context;
private Task _compositeTask; private Task _compositeTask;
private bool _disposed; private bool _disposed;
...@@ -39,14 +40,14 @@ namespace DotNetCore.CAP.Job ...@@ -39,14 +40,14 @@ namespace DotNetCore.CAP.Job
public void Start() public void Start()
{ {
var processorCount = Environment.ProcessorCount; var processorCount = Environment.ProcessorCount;
//processorCount = 1; processorCount = 1;
_processors = GetProcessors(processorCount); _processors = GetProcessors(processorCount);
_logger.ServerStarting(processorCount, processorCount); _logger.ServerStarting(processorCount, processorCount);
_context = new ProcessingContext(_provider, _cts.Token); _context = new ProcessingContext(_provider, _cts.Token);
var processorTasks = _processors var processorTasks = _processors
.Select(InfiniteRetry) .Select(p => InfiniteRetry(p))
.Select(p => p.ProcessAsync(_context)); .Select(p => p.ProcessAsync(_context));
_compositeTask = Task.WhenAll(processorTasks); _compositeTask = Task.WhenAll(processorTasks);
} }
...@@ -66,7 +67,7 @@ namespace DotNetCore.CAP.Job ...@@ -66,7 +67,7 @@ namespace DotNetCore.CAP.Job
private bool AllProcessorsWaiting() private bool AllProcessorsWaiting()
{ {
foreach (var processor in _processors) foreach (var processor in _messageProcessors)
{ {
if (!processor.Waiting) if (!processor.Waiting)
{ {
...@@ -110,21 +111,14 @@ namespace DotNetCore.CAP.Job ...@@ -110,21 +111,14 @@ namespace DotNetCore.CAP.Job
var returnedProcessors = new List<IJobProcessor>(); var returnedProcessors = new List<IJobProcessor>();
for (int i = 0; i < processorCount; i++) for (int i = 0; i < processorCount; i++)
{ {
var processors = _provider.GetServices<IJobProcessor>(); var messageProcessors = _provider.GetServices<IMessageJobProcessor>();
foreach (var processor in processors) _messageProcessors = messageProcessors.ToArray();
{ returnedProcessors.AddRange(messageProcessors);
if (processor is CronJobProcessor)
{
if (i == 0) // only add first cronJob
returnedProcessors.Add(processor);
}
else
{
returnedProcessors.Add(processor);
}
}
} }
returnedProcessors.Add(_provider.GetService<JobQueuer>());
returnedProcessors.Add(_provider.GetService<IAdditionalProcessor>());
return returnedProcessors.ToArray(); return returnedProcessors.ToArray();
} }
} }
......
...@@ -13,11 +13,6 @@ namespace DotNetCore.CAP ...@@ -13,11 +13,6 @@ namespace DotNetCore.CAP
private static readonly Action<ILogger, Exception> _serverShuttingDown; private static readonly Action<ILogger, Exception> _serverShuttingDown;
private static readonly Action<ILogger, string, Exception> _expectedOperationCanceledException; private static readonly Action<ILogger, string, Exception> _expectedOperationCanceledException;
private static readonly Action<ILogger, Exception> _cronJobsNotFound;
private static readonly Action<ILogger, int, Exception> _cronJobsScheduling;
private static readonly Action<ILogger, string, double, Exception> _cronJobExecuted;
private static readonly Action<ILogger, string, Exception> _cronJobFailed;
private static readonly Action<ILogger, string, string, Exception> _enqueuingSentMessage; private static readonly Action<ILogger, string, string, Exception> _enqueuingSentMessage;
private static readonly Action<ILogger, string, string, Exception> _enqueuingReceivdeMessage; private static readonly Action<ILogger, string, string, Exception> _enqueuingReceivdeMessage;
private static readonly Action<ILogger, string, Exception> _executingConsumerMethod; private static readonly Action<ILogger, string, Exception> _executingConsumerMethod;
...@@ -45,26 +40,6 @@ namespace DotNetCore.CAP ...@@ -45,26 +40,6 @@ namespace DotNetCore.CAP
3, 3,
"Expected an OperationCanceledException, but found '{ExceptionMessage}'."); "Expected an OperationCanceledException, but found '{ExceptionMessage}'.");
_cronJobsNotFound = LoggerMessage.Define(
LogLevel.Debug,
1,
"No cron jobs found to schedule, cancelling processing of cron jobs.");
_cronJobsScheduling = LoggerMessage.Define<int>(
LogLevel.Debug,
2,
"Found {JobCount} cron job(s) to schedule.");
_cronJobExecuted = LoggerMessage.Define<string, double>(
LogLevel.Debug,
3,
"Cron job '{JobName}' executed succesfully. Took: {Seconds} secs.");
_cronJobFailed = LoggerMessage.Define<string>(
LogLevel.Warning,
4,
"Cron job '{jobName}' failed to execute.");
_enqueuingSentMessage = LoggerMessage.Define<string, string>( _enqueuingSentMessage = LoggerMessage.Define<string, string>(
LogLevel.Debug, LogLevel.Debug,
2, 2,
...@@ -125,25 +100,5 @@ namespace DotNetCore.CAP ...@@ -125,25 +100,5 @@ namespace DotNetCore.CAP
{ {
_expectedOperationCanceledException(logger, ex.Message, ex); _expectedOperationCanceledException(logger, ex.Message, ex);
} }
public static void CronJobsNotFound(this ILogger logger)
{
_cronJobsNotFound(logger, null);
}
public static void CronJobsScheduling(this ILogger logger, IEnumerable<CronJob> jobs)
{
_cronJobsScheduling(logger, jobs.Count(), null);
}
public static void CronJobExecuted(this ILogger logger, string name, double seconds)
{
_cronJobExecuted(logger, name, seconds, null);
}
public static void CronJobFailed(this ILogger logger, string name, Exception ex)
{
_cronJobFailed(logger, name, ex);
}
} }
} }
\ No newline at end of file
using System; //using System;
using System.Collections.Generic; //using System.Collections.Generic;
using System.Text; //using System.Text;
using DotNetCore.CAP.Job; //using DotNetCore.CAP.Job;
using Xunit; //using Xunit;
namespace DotNetCore.CAP.Test.Job //namespace DotNetCore.CAP.Test.Job
{ //{
public class ComputedJobTest // public class ComputedJobTest
{ // {
[Fact] // [Fact]
public void UpdateNext_LastRunNever_SchedulesNow() // public void UpdateNext_LastRunNever_SchedulesNow()
{ // {
// Arrange // // Arrange
var now = new DateTime(2000, 1, 1, 8, 0, 0); // var now = new DateTime(2000, 1, 1, 8, 0, 0);
var cronJob = new CronJob(Cron.Daily()); // var cronJob = new CronJob(Cron.Daily());
var computed = new ComputedCronJob(cronJob); // var computed = new ComputedCronJob(cronJob);
// Act // // Act
computed.UpdateNext(now); // computed.UpdateNext(now);
// Assert // // Assert
Assert.Equal(computed.Next, now); // Assert.Equal(computed.Next, now);
} // }
[Fact] // [Fact]
public void UpdateNext_LastRun_BeforePrev_SchedulesNow() // public void UpdateNext_LastRun_BeforePrev_SchedulesNow()
{ // {
// Arrange // // Arrange
var now = new DateTime(2000, 1, 1, 8, 0, 0); // var now = new DateTime(2000, 1, 1, 8, 0, 0);
var cronJob = new CronJob(Cron.Daily(), now.Subtract(TimeSpan.FromDays(2))); // var cronJob = new CronJob(Cron.Daily(), now.Subtract(TimeSpan.FromDays(2)));
var computed = new ComputedCronJob(cronJob); // var computed = new ComputedCronJob(cronJob);
// Act // // Act
computed.UpdateNext(now); // computed.UpdateNext(now);
// Assert // // Assert
Assert.Equal(computed.Next, now); // Assert.Equal(computed.Next, now);
} // }
[Fact] // [Fact]
public void UpdateNext_LastRun_AfterPrev_SchedulesNormal() // public void UpdateNext_LastRun_AfterPrev_SchedulesNormal()
{ // {
// Arrange // // Arrange
var now = new DateTime(2000, 1, 1, 8, 0, 0); // var now = new DateTime(2000, 1, 1, 8, 0, 0);
var cronJob = new CronJob(Cron.Daily(), now.Subtract(TimeSpan.FromSeconds(5))); // var cronJob = new CronJob(Cron.Daily(), now.Subtract(TimeSpan.FromSeconds(5)));
var computed = new ComputedCronJob(cronJob); // var computed = new ComputedCronJob(cronJob);
// Act // // Act
computed.UpdateNext(now); // computed.UpdateNext(now);
// Assert // // Assert
Assert.True(computed.Next > now); // Assert.True(computed.Next > now);
} // }
} // }
} //}
\ No newline at end of file \ No newline at end of file
...@@ -30,13 +30,13 @@ namespace DotNetCore.CAP.Test.Job ...@@ -30,13 +30,13 @@ namespace DotNetCore.CAP.Test.Job
var services = new ServiceCollection(); var services = new ServiceCollection();
services.AddTransient<JobProcessingServer>(); services.AddTransient<JobProcessingServer>();
services.AddTransient<DefaultCronJobRegistry>(); // services.AddTransient<DefaultCronJobRegistry>();
services.AddLogging(); services.AddLogging();
services.AddSingleton(_options); services.AddSingleton(_options);
services.AddSingleton(_mockStorage.Object); services.AddSingleton(_mockStorage.Object);
_provider = services.BuildServiceProvider(); _provider = services.BuildServiceProvider();
_context = new ProcessingContext(_provider, null, _cancellationTokenSource.Token); _context = new ProcessingContext(_provider, _cancellationTokenSource.Token);
} }
//[Fact] //[Fact]
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment