Commit 20b90339 authored by yangxiaodong's avatar yangxiaodong

Refactor message storage.

parent 732878f7
......@@ -21,7 +21,7 @@ namespace Microsoft.Extensions.DependencyInjection
builder.Services.AddScoped<ICapMessageStore, CapMessageStore<TContext>>();
builder.Services.AddScoped<IStorage, EFStorage>();
builder.Services.AddScoped<IStorageConnection, EFStorageConnection<TContext>>();
builder.Services.AddScoped<IStorageConnection, EFStorageConnection>();
return builder;
}
......@@ -30,10 +30,11 @@ namespace Microsoft.Extensions.DependencyInjection
public static CapBuilder AddEntityFrameworkStores<TContext>(this CapBuilder builder, Action<EFOptions> options)
where TContext : DbContext
{
builder.Services.AddScoped<ICapMessageStore, CapMessageStore<TContext>>();
builder.Services.AddScoped<IStorage, EFStorage>();
builder.Services.AddScoped<IStorageConnection, EFStorageConnection<TContext>>();
builder.Services.AddScoped<IStorageConnection, EFStorageConnection>();
builder.Services.Configure(options);
return builder;
......
using DotNetCore.CAP.Infrastructure;
using System.Data.Common;
using DotNetCore.CAP.Infrastructure;
using DotNetCore.CAP.Models;
using Microsoft.EntityFrameworkCore;
namespace DotNetCore.CAP.EntityFrameworkCore
......@@ -8,6 +10,8 @@ namespace DotNetCore.CAP.EntityFrameworkCore
/// </summary>
public class CapDbContext : DbContext
{
private readonly EFOptions _efOptions;
/// <summary>
/// Initializes a new instance of the <see cref="CapDbContext"/>.
/// </summary>
......@@ -17,18 +21,26 @@ namespace DotNetCore.CAP.EntityFrameworkCore
/// Initializes a new instance of the <see cref="CapDbContext"/>.
/// </summary>
/// <param name="options">The options to be used by a <see cref="DbContext"/>.</param>
public CapDbContext(DbContextOptions options) : base(options) { }
public CapDbContext(DbContextOptions<CapDbContext> options, EFOptions efOptions)
: base(options) {
_efOptions = efOptions;
}
/// <summary>
/// Gets or sets the <see cref="CapSentMessage"/> of Messages.
/// </summary>
public DbSet<CapSentMessage> CapSentMessages { get; set; }
public DbSet<CapQueue> CapQueue { get; set; }
/// <summary>
/// Gets or sets the <see cref="CapReceivedMessages"/> of Messages.
/// </summary>
public DbSet<CapReceivedMessage> CapReceivedMessages { get; set; }
public DbConnection GetDbConnection() => Database.GetDbConnection();
/// <summary>
/// Configures the schema for the identity framework.
/// </summary>
......@@ -37,15 +49,20 @@ namespace DotNetCore.CAP.EntityFrameworkCore
/// </param>
protected override void OnModelCreating(ModelBuilder modelBuilder)
{
modelBuilder.HasDefaultSchema(_efOptions.Schema);
modelBuilder.Entity<CapSentMessage>(b =>
{
b.HasKey(m => m.Id);
b.Property(p => p.StatusName).HasMaxLength(50);
b.HasIndex(x => x.StatusName);
b.Property(p => p.StatusName).IsRequired().HasMaxLength(50);
});
modelBuilder.Entity<CapReceivedMessage>(b =>
{
b.Property(p => p.StatusName).HasMaxLength(50);
b.HasKey(m => m.Id);
b.HasIndex(x => x.StatusName);
b.Property(p => p.StatusName).IsRequired().HasMaxLength(50);
});
}
}
......
using System;
using System.Threading.Tasks;
using DotNetCore.CAP.Infrastructure;
using DotNetCore.CAP.Models;
using Microsoft.EntityFrameworkCore;
namespace DotNetCore.CAP.EntityFrameworkCore
......
using System;
using System.Collections.Generic;
using System.Data;
using System.Text;
using System.Threading;
using Microsoft.EntityFrameworkCore.Storage;
namespace DotNetCore.CAP.EntityFrameworkCore
{
public class EFFetchedMessage : IFetchedMessage
{
private readonly IDbConnection _connection;
private readonly IDbContextTransaction _transaction;
private readonly Timer _timer;
private static readonly TimeSpan KeepAliveInterval = TimeSpan.FromMinutes(1);
private readonly object _lockObject = new object();
public EFFetchedMessage(string messageId,
IDbConnection connection,
IDbContextTransaction transaction)
{
MessageId = messageId;
_connection = connection;
_transaction = transaction;
_timer = new Timer(ExecuteKeepAliveQuery, null, KeepAliveInterval, KeepAliveInterval);
}
public string MessageId { get; }
public void RemoveFromQueue()
{
lock (_lockObject)
{
_transaction.Commit();
}
}
public void Requeue()
{
lock (_lockObject)
{
_transaction.Rollback();
}
}
public void Dispose()
{
lock (_lockObject)
{
_timer?.Dispose();
_transaction.Dispose();
_connection.Dispose();
}
}
private void ExecuteKeepAliveQuery(object obj)
{
lock (_lockObject)
{
try
{
_connection?.Execute("SELECT 1", _transaction.GetDbTransaction());
}
catch
{
}
}
}
}
}
using System;
using System.Data;
using System.Data.SqlClient;
using System.Linq;
using System.Threading.Tasks;
using Dapper;
using DotNetCore.CAP.Infrastructure;
using DotNetCore.CAP.Models;
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Storage;
using Microsoft.Extensions.Options;
using MR.AspNetCore.Jobs.Models;
using MR.AspNetCore.Jobs.Server;
using MR.AspNetCore.Jobs.Server.States;
namespace DotNetCore.CAP.EntityFrameworkCore
{
public class EFStorageConnection<TContext> : IStorageConnection where TContext : DbContext
public class EFStorageConnection : IStorageConnection
{
private readonly CapDbContext _context;
private readonly EFOptions _options;
......@@ -31,67 +26,11 @@ namespace DotNetCore.CAP.EntityFrameworkCore
public EFOptions Options => _options;
public Task StoreCronJobAsync(CronJob job)
{
if (job == null) throw new ArgumentNullException(nameof(job));
_context.Add(job);
return _context.SaveChangesAsync();
}
public Task AttachCronJobAsync(CronJob job)
{
if (job == null) throw new ArgumentNullException(nameof(job));
_context.Attach(job);
return Task.FromResult(true);
}
public Task UpdateCronJobAsync(CronJob job)
{
if (job == null) throw new ArgumentNullException(nameof(job));
return _context.SaveChangesAsync();
}
public Task<CronJob[]> GetCronJobsAsync()
{
return _context.CronJobs.ToArrayAsync();
}
public async Task RemoveCronJobAsync(string name)
{
var cronJob = await _context.CronJobs.FirstOrDefaultAsync(j => j.Name == name);
if (cronJob != null)
{
_context.Remove(cronJob);
await _context.SaveChangesAsync();
}
}
public IStorageTransaction CreateTransaction()
{
return new EFStorageTransaction(this);
}
public void Dispose()
{
}
private DateTime? NormalizeDateTime(DateTime? dateTime)
{
if (!dateTime.HasValue) return dateTime;
if (dateTime == DateTime.MinValue)
{
return new DateTime(1754, 1, 1, 0, 0, 0, DateTimeKind.Utc);
}
return dateTime;
}
public Task StoreSentMessageAsync(CapSentMessage message)
{
if (message == null) throw new ArgumentNullException(nameof(message));
......@@ -107,96 +46,83 @@ namespace DotNetCore.CAP.EntityFrameworkCore
return _context.CapSentMessages.FirstOrDefaultAsync(x => x.Id == id);
}
public Task<IFetchedJob> FetchNextJobAsync()
public async Task<IFetchedMessage> FetchNextSentMessageAsync()
{
// var sql = $@"
//DELETE TOP (1)
//FROM [{_options.Schema}].[{nameof(CapDbContext.CapSentMessages)}] WITH (readpast, updlock, rowlock)
//OUTPUT DELETED.Id";
var queueFirst = await _context.CapQueue.FirstOrDefaultAsync();
if (queueFirst == null)
return null;
_context.CapQueue.Remove(queueFirst);
var connection = _context.Database.GetDbConnection();
var transaction = _context.Database.CurrentTransaction;
transaction = transaction ?? await _context.Database.BeginTransactionAsync(IsolationLevel.ReadCommitted);
return new EFFetchedMessage(queueFirst.MessageId, connection, transaction);
}
public async Task<Job> GetNextJobToBeEnqueuedAsync()
public Task<CapSentMessage> GetNextSentMessageToBeEnqueuedAsync()
{
var sql = $@"
SELECT TOP (1) *
FROM [{_options.Schema}].[{nameof(JobsDbContext.Jobs)}] WITH (readpast)
WHERE (Due IS NULL OR Due < GETUTCDATE()) AND StateName = '{ScheduledState.StateName}'";
// var sql = $@"
//SELECT TOP (1) *
//FROM [{_options.Schema}].[{nameof(CapDbContext.CapSentMessages)}] WITH (readpast)
//WHERE (Due IS NULL OR Due < GETUTCDATE()) AND StateName = '{StatusName.Enqueued}'";
var connection = _context.GetDbConnection();
// var connection = _context.GetDbConnection();
var job = (await connection.QueryAsync<Job>(sql)).FirstOrDefault();
// var message = _context.CapSentMessages.FromSql(sql).FirstOrDefaultAsync();
if (job != null)
var message = _context.CapSentMessages.Where(x => x.StatusName == StatusName.Enqueued).FirstOrDefaultAsync();
if (message != null)
{
_context.Attach(job);
_context.Attach(message);
}
return job;
return message;
}
public Task<IFetchedMessage> FetchNextSentMessageAsync()
public Task StoreReceivedMessageAsync(CapReceivedMessage message)
{
var sql = $@"
DELETE TOP (1)
FROM [{_options.Schema}].[{nameof(CapDbContext.CapSentMessages)}] WITH (readpast, updlock, rowlock)
OUTPUT DELETED.Id";
if (message == null) throw new ArgumentNullException(nameof(message));
//return FetchNextDelayedMessageCoreAsync(sql);
throw new NotImplementedException();
}
message.LastRun = NormalizeDateTime(message.LastRun);
//private async Task<IFetchedMessage> FetchNextDelayedMessageCoreAsync(string sql, object args = null)
//{
// FetchedMessage fetchedJob = null;
// var connection = _context.Database.GetDbConnection();
// var transaction = _context.Database.CurrentTransaction;
// transaction = transaction ?? await _context.Database.BeginTransactionAsync(IsolationLevel.ReadCommitted);
// try
// {
// fetchedJob =
// (await _context...QueryAsync<FetchedMessage>(sql, args, transaction.GetDbTransaction()))
// .FirstOrDefault();
// }
// catch (SqlException)
// {
// transaction.Dispose();
// throw;
// }
// if (fetchedJob == null)
// {
// transaction.Rollback();
// transaction.Dispose();
// return null;
// }
// return new SqlServerFetchedJob(
// fetchedJob.JobId,
// connection,
// transaction);
//}
_context.Add(message);
return _context.SaveChangesAsync();
}
public Task<CapSentMessage> GetNextSentMessageToBeEnqueuedAsync()
public Task<CapReceivedMessage> GetReceivedMessageAsync(string id)
{
throw new NotImplementedException();
return _context.CapReceivedMessages.FirstOrDefaultAsync(x => x.Id == id);
}
public Task StoreReceivedMessageAsync(CapReceivedMessage message)
public Task<IFetchedMessage> FetchNextReceivedMessageAsync()
{
throw new NotImplementedException();
}
public Task<CapReceivedMessage> GetReceivedMessageAsync(string id)
public Task<CapSentMessage> GetNextReceviedMessageToBeEnqueuedAsync()
{
throw new NotImplementedException();
}
public Task<IFetchedMessage> FetchNextReceivedMessageAsync()
private DateTime? NormalizeDateTime(DateTime? dateTime)
{
throw new NotImplementedException();
if (!dateTime.HasValue) return dateTime;
if (dateTime == DateTime.MinValue)
{
return new DateTime(1754, 1, 1, 0, 0, 0, DateTimeKind.Utc);
}
return dateTime;
}
public Task<CapSentMessage> GetNextReceviedMessageToBeEnqueuedAsync()
public void Dispose()
{
throw new NotImplementedException();
}
}
}
......@@ -4,64 +4,60 @@ using DotNetCore.CAP.Models;
namespace DotNetCore.CAP.EntityFrameworkCore
{
public class EFStorageTransaction : IStorageTransaction, IDisposable
{
private EFStorageConnection _connection;
public class EFStorageTransaction
: IStorageTransaction, IDisposable
{
private EFStorageConnection _connection;
public EFStorageTransaction(EFStorageConnection connection)
{
_connection = connection;
}
public void UpdateJob(Job job)
{
if (job == null) throw new ArgumentNullException(nameof(job));
// NOOP. EF will detect changes.
}
public void EnqueueJob(Job job)
{
}
public Task CommitAsync()
{
return _connection.Context.SaveChangesAsync();
}
public void Dispose()
{
}
public EFStorageTransaction(EFStorageConnection connection)
{
_connection = connection;
}
public void UpdateMessage(CapSentMessage message)
{
throw new NotImplementedException();
if (message == null) throw new ArgumentNullException(nameof(message));
// NOOP. EF will detect changes.
}
public void UpdateMessage(CapReceivedMessage message)
{
throw new NotImplementedException();
if (message == null) throw new ArgumentNullException(nameof(message));
// NOOP. EF will detect changes.
}
public void EnqueueMessage(CapSentMessage message)
{
if (job == null) throw new ArgumentNullException(nameof(job));
if (message == null) throw new ArgumentNullException(nameof(message));
_connection.Context.Add(new JobQueue
_connection.Context.Add(new CapQueue
{
JobId = job.Id
MessageId = message.Id,
Type = 0
});
}
public void EnqueueMessage(CapReceivedMessage message)
{
if (job == null) throw new ArgumentNullException(nameof(job));
if (message == null) throw new ArgumentNullException(nameof(message));
_connection.Context.Add(new JobQueue
_connection.Context.Add(new CapQueue
{
JobId = job.Id
MessageId = message.Id,
Type = 1
});
}
public Task CommitAsync()
{
return _connection.Context.SaveChangesAsync();
}
public void Dispose()
{
}
}
}
using System;
using System.Collections.Generic;
using System.Data;
namespace DotNetCore.CAP.EntityFrameworkCore
{
static class HelperExtensions
{
public static void Execute(this IDbConnection connection, string sql, IDbTransaction transcation = null)
{
try
{
connection.Open();
using (var command = connection.CreateCommand())
{
command.CommandText = "SELELCT 1";
if (transcation != null)
command.Transaction = transcation;
command.ExecuteNonQuery();
}
}
finally
{
connection.Close();
}
}
}
}
\ No newline at end of file
using System.Threading.Tasks;
using DotNetCore.CAP.Infrastructure;
using DotNetCore.CAP.Models;
namespace DotNetCore.CAP
{
......
using System;
using System.Threading.Tasks;
using DotNetCore.CAP.Infrastructure;
using DotNetCore.CAP.Models;
using Microsoft.Extensions.Logging;
namespace DotNetCore.CAP
......
......@@ -5,6 +5,7 @@ using System.Threading.Tasks;
using DotNetCore.CAP.Abstractions;
using DotNetCore.CAP.Infrastructure;
using DotNetCore.CAP.Internal;
using DotNetCore.CAP.Models;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
......
......@@ -4,7 +4,7 @@ namespace DotNetCore.CAP
{
public interface IFetchedMessage : IDisposable
{
int MessageId { get; }
string MessageId { get; }
void RemoveFromQueue();
......
namespace DotNetCore.CAP.Models
{
public class CapQueue
{
public int Id { get; set; }
public string MessageId { get; set; }
/// <summary>
/// 0 is CapSentMessage, 1 is CapReceviedMessage
/// </summary>
public int Type { get; set; }
}
}
using System;
using DotNetCore.CAP.Infrastructure;
namespace DotNetCore.CAP.Models
{
......@@ -33,7 +34,7 @@ namespace DotNetCore.CAP.Models
public DateTime Added { get; set; }
public DateTime LastRun { get; set; }
public DateTime? LastRun { get; set; }
public int Retries { get; set; }
......
using System;
using DotNetCore.CAP.Infrastructure;
namespace DotNetCore.CAP.Models
{
......
using System;
using System.Linq;
using DotNetCore.CAP.Infrastructure;
using DotNetCore.CAP.Models;
using Microsoft.Extensions.DependencyInjection;
using Xunit;
......
using System;
using System.Threading;
using System.Threading.Tasks;
using DotNetCore.CAP.Infrastructure;
using DotNetCore.CAP.Job;
using DotNetCore.CAP.Models;
using Microsoft.Extensions.DependencyInjection;
using Xunit;
......
......@@ -2,6 +2,7 @@
using System.Threading;
using System.Threading.Tasks;
using DotNetCore.CAP.Infrastructure;
using DotNetCore.CAP.Models;
namespace DotNetCore.CAP.Test
{
......
using System;
using System.Threading.Tasks;
using DotNetCore.CAP.Infrastructure;
using DotNetCore.CAP.Models;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment