Commit 732878f7 authored by Savorboard's avatar Savorboard

重写存储模块。

parent 2f31e5b0
......@@ -59,6 +59,10 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "DotNetCore.CAP.EntityFramew
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "DotNetCore.CAP.Test", "test\DotNetCore.CAP.Test\DotNetCore.CAP.Test.csproj", "{F608B509-A99B-4AC7-8227-42051DD4A578}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "WebApp1", "..\..\CAPTests\WebApp1\WebApp1.csproj", "{880F8E24-5B18-43E6-A75B-8AD7B24FCBEC}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "WebApp2", "..\..\CAPTests\WebApp2\WebApp2.csproj", "{C190100E-EF0D-4C63-9189-F29D0E64D66E}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
......@@ -93,6 +97,14 @@ Global
{F608B509-A99B-4AC7-8227-42051DD4A578}.Debug|Any CPU.Build.0 = Debug|Any CPU
{F608B509-A99B-4AC7-8227-42051DD4A578}.Release|Any CPU.ActiveCfg = Release|Any CPU
{F608B509-A99B-4AC7-8227-42051DD4A578}.Release|Any CPU.Build.0 = Release|Any CPU
{880F8E24-5B18-43E6-A75B-8AD7B24FCBEC}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{880F8E24-5B18-43E6-A75B-8AD7B24FCBEC}.Debug|Any CPU.Build.0 = Debug|Any CPU
{880F8E24-5B18-43E6-A75B-8AD7B24FCBEC}.Release|Any CPU.ActiveCfg = Release|Any CPU
{880F8E24-5B18-43E6-A75B-8AD7B24FCBEC}.Release|Any CPU.Build.0 = Release|Any CPU
{C190100E-EF0D-4C63-9189-F29D0E64D66E}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{C190100E-EF0D-4C63-9189-F29D0E64D66E}.Debug|Any CPU.Build.0 = Debug|Any CPU
{C190100E-EF0D-4C63-9189-F29D0E64D66E}.Release|Any CPU.ActiveCfg = Release|Any CPU
{C190100E-EF0D-4C63-9189-F29D0E64D66E}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
......@@ -106,5 +118,7 @@ Global
{9961B80E-0718-4280-B2A0-271B003DE26B} = {9B2AE124-6636-4DE9-83A3-70360DABD0C4}
{69370370-9873-4D6A-965D-D1E16694047D} = {C09CDAB0-6DD4-46E9-B7F3-3EF2A4741EA0}
{F608B509-A99B-4AC7-8227-42051DD4A578} = {C09CDAB0-6DD4-46E9-B7F3-3EF2A4741EA0}
{880F8E24-5B18-43E6-A75B-8AD7B24FCBEC} = {3A6B6931-A123-477A-9469-8B468B5385AF}
{C190100E-EF0D-4C63-9189-F29D0E64D66E} = {3A6B6931-A123-477A-9469-8B468B5385AF}
EndGlobalSection
EndGlobal
......@@ -2,30 +2,18 @@
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using DotNetCore.CAP.EntityFrameworkCore;
using DotNetCore.CAP.Infrastructure;
using JetBrains.Annotations;
using Microsoft.EntityFrameworkCore;
namespace Sample.Kafka
{
public class AppDbContext : DbContext
public class AppDbContext : CapDbContext
{
public DbSet<CapSentMessage> SentMessages { get; set; }
public DbSet<CapReceivedMessage> ReceivedMessages { get; set; }
protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder)
{
optionsBuilder.UseSqlServer("Server=192.168.2.206;Initial Catalog=Test;User Id=cmswuliu;Password=h7xY81agBn*Veiu3;MultipleActiveResultSets=True");
}
protected override void OnModelCreating(ModelBuilder modelBuilder)
{
modelBuilder.Entity<CapSentMessage>().Property(x => x.StatusName).HasMaxLength(50);
modelBuilder.Entity<CapReceivedMessage>().Property(x => x.StatusName).HasMaxLength(50);
base.OnModelCreating(modelBuilder);
optionsBuilder.UseSqlServer("Server=DESKTOP-M9R8T31;Initial Catalog=WebApp1;User Id=sa;Password=P@ssw0rd;MultipleActiveResultSets=True");
}
}
}
......@@ -33,9 +33,15 @@ namespace Sample.Kafka.Controllers
}
[Route("~/send")]
public async Task<IActionResult> SendTopic()
public async Task<IActionResult> SendTopic([FromServices] AppDbContext dbContext)
{
await _producer.PublishAsync("zzwl.topic.finace.callBack", new Person { Name = "Test", Age = 11 });
using (var trans = dbContext.Database.BeginTransaction())
{
await _producer.PublishAsync("zzwl.topic.finace.callBack", new Person { Name = "Test", Age = 11 });
trans.Commit();
}
return Ok();
}
......
using System;
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Infrastructure;
using Microsoft.EntityFrameworkCore.Metadata;
using Microsoft.EntityFrameworkCore.Migrations;
using Sample.Kafka;
namespace Sample.Kafka.Migrations
{
[DbContext(typeof(AppDbContext))]
[Migration("20170629074148_InitCreate")]
partial class InitCreate
{
protected override void BuildTargetModel(ModelBuilder modelBuilder)
{
modelBuilder
.HasAnnotation("ProductVersion", "1.1.2")
.HasAnnotation("SqlServer:ValueGenerationStrategy", SqlServerValueGenerationStrategy.IdentityColumn);
modelBuilder.Entity("DotNetCore.CAP.Infrastructure.CapReceivedMessage", b =>
{
b.Property<string>("Id")
.ValueGeneratedOnAdd();
b.Property<DateTime>("Added");
b.Property<string>("Content");
b.Property<string>("KeyName");
b.Property<DateTime>("LastRun");
b.Property<int>("Retries");
b.Property<string>("StatusName")
.HasMaxLength(50);
b.HasKey("Id");
b.ToTable("ReceivedMessages");
});
modelBuilder.Entity("DotNetCore.CAP.Infrastructure.CapSentMessage", b =>
{
b.Property<string>("Id")
.ValueGeneratedOnAdd();
b.Property<DateTime>("Added");
b.Property<string>("Content");
b.Property<string>("KeyName");
b.Property<DateTime>("LastRun");
b.Property<int>("Retries");
b.Property<string>("StatusName")
.HasMaxLength(50);
b.HasKey("Id");
b.ToTable("SentMessages");
});
}
}
}
using System;
using System.Collections.Generic;
using Microsoft.EntityFrameworkCore.Migrations;
namespace Sample.Kafka.Migrations
{
public partial class InitCreate : Migration
{
protected override void Up(MigrationBuilder migrationBuilder)
{
migrationBuilder.CreateTable(
name: "ReceivedMessages",
columns: table => new
{
Id = table.Column<string>(nullable: false),
Added = table.Column<DateTime>(nullable: false),
Content = table.Column<string>(nullable: true),
KeyName = table.Column<string>(nullable: true),
LastRun = table.Column<DateTime>(nullable: false),
Retries = table.Column<int>(nullable: false),
StatusName = table.Column<string>(maxLength: 50, nullable: true)
},
constraints: table =>
{
table.PrimaryKey("PK_ReceivedMessages", x => x.Id);
});
migrationBuilder.CreateTable(
name: "SentMessages",
columns: table => new
{
Id = table.Column<string>(nullable: false),
Added = table.Column<DateTime>(nullable: false),
Content = table.Column<string>(nullable: true),
KeyName = table.Column<string>(nullable: true),
LastRun = table.Column<DateTime>(nullable: false),
Retries = table.Column<int>(nullable: false),
StatusName = table.Column<string>(maxLength: 50, nullable: true)
},
constraints: table =>
{
table.PrimaryKey("PK_SentMessages", x => x.Id);
});
}
protected override void Down(MigrationBuilder migrationBuilder)
{
migrationBuilder.DropTable(
name: "ReceivedMessages");
migrationBuilder.DropTable(
name: "SentMessages");
}
}
}
using System;
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Infrastructure;
using Microsoft.EntityFrameworkCore.Metadata;
using Microsoft.EntityFrameworkCore.Migrations;
using Sample.Kafka;
namespace Sample.Kafka.Migrations
{
[DbContext(typeof(AppDbContext))]
partial class AppDbContextModelSnapshot : ModelSnapshot
{
protected override void BuildModel(ModelBuilder modelBuilder)
{
modelBuilder
.HasAnnotation("ProductVersion", "1.1.2")
.HasAnnotation("SqlServer:ValueGenerationStrategy", SqlServerValueGenerationStrategy.IdentityColumn);
modelBuilder.Entity("DotNetCore.CAP.Infrastructure.CapReceivedMessage", b =>
{
b.Property<string>("Id")
.ValueGeneratedOnAdd();
b.Property<DateTime>("Added");
b.Property<string>("Content");
b.Property<string>("KeyName");
b.Property<DateTime>("LastRun");
b.Property<int>("Retries");
b.Property<string>("StatusName")
.HasMaxLength(50);
b.HasKey("Id");
b.ToTable("ReceivedMessages");
});
modelBuilder.Entity("DotNetCore.CAP.Infrastructure.CapSentMessage", b =>
{
b.Property<string>("Id")
.ValueGeneratedOnAdd();
b.Property<DateTime>("Added");
b.Property<string>("Content");
b.Property<string>("KeyName");
b.Property<DateTime>("LastRun");
b.Property<int>("Retries");
b.Property<string>("StatusName")
.HasMaxLength(50);
b.HasKey("Id");
b.ToTable("SentMessages");
});
}
}
}
......@@ -29,9 +29,7 @@ namespace Sample.Kafka
.AddEntityFrameworkStores<AppDbContext>()
.AddRabbitMQ(x =>
{
x.HostName = "192.168.2.206";
x.UserName = "admin";
x.Password = "123123";
x.HostName = "localhost";
});
//.AddKafka(x => x.Servers = "");
......
using DotNetCore.CAP;
using System;
using DotNetCore.CAP;
using DotNetCore.CAP.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore;
......@@ -19,7 +20,25 @@ namespace Microsoft.Extensions.DependencyInjection
{
builder.Services.AddScoped<ICapMessageStore, CapMessageStore<TContext>>();
builder.Services.AddScoped<IStorage, EFStorage>();
builder.Services.AddScoped<IStorageConnection, EFStorageConnection<TContext>>();
return builder;
}
public static CapBuilder AddEntityFrameworkStores<TContext>(this CapBuilder builder, Action<EFOptions> options)
where TContext : DbContext
{
builder.Services.AddScoped<ICapMessageStore, CapMessageStore<TContext>>();
builder.Services.AddScoped<IStorage, EFStorage>();
builder.Services.AddScoped<IStorageConnection, EFStorageConnection<TContext>>();
builder.Services.Configure(options);
return builder;
}
}
}
\ No newline at end of file
using System;
using System.Collections.Generic;
using System.Text;
namespace DotNetCore.CAP.EntityFrameworkCore
{
public class EFOptions
{
public const string DefaultSchema = "cap";
/// <summary>
/// Gets or sets the schema to use when creating database objects.
/// Default is <see cref="DefaultSchema"/>.
/// </summary>
public string Schema { get; set; } = DefaultSchema;
}
}
using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
namespace DotNetCore.CAP.EntityFrameworkCore
{
public class EFStorage : IStorage
{
private IServiceProvider _provider;
private ILogger _logger;
public EFStorage(
IServiceProvider provider,
ILogger<EFStorage> logger)
{
_provider = provider;
_logger = logger;
}
public async Task InitializeAsync(CancellationToken cancellationToken)
{
using (var scope = _provider.CreateScope())
{
if (cancellationToken.IsCancellationRequested) return;
var provider = scope.ServiceProvider;
var context = provider.GetRequiredService<CapDbContext>();
_logger.LogDebug("Ensuring all migrations are applied to Jobs database.");
await context.Database.MigrateAsync(cancellationToken);
}
}
}
}
using System;
using System.Data;
using System.Data.SqlClient;
using System.Linq;
using System.Threading.Tasks;
using Dapper;
using DotNetCore.CAP.Models;
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Storage;
using Microsoft.Extensions.Options;
using MR.AspNetCore.Jobs.Models;
using MR.AspNetCore.Jobs.Server;
using MR.AspNetCore.Jobs.Server.States;
namespace DotNetCore.CAP.EntityFrameworkCore
{
public class EFStorageConnection<TContext> : IStorageConnection where TContext : DbContext
{
private readonly CapDbContext _context;
private readonly EFOptions _options;
public EFStorageConnection(
CapDbContext context,
IOptions<EFOptions> options)
{
_context = context;
_options = options.Value;
}
public CapDbContext Context => _context;
public EFOptions Options => _options;
public Task StoreCronJobAsync(CronJob job)
{
if (job == null) throw new ArgumentNullException(nameof(job));
_context.Add(job);
return _context.SaveChangesAsync();
}
public Task AttachCronJobAsync(CronJob job)
{
if (job == null) throw new ArgumentNullException(nameof(job));
_context.Attach(job);
return Task.FromResult(true);
}
public Task UpdateCronJobAsync(CronJob job)
{
if (job == null) throw new ArgumentNullException(nameof(job));
return _context.SaveChangesAsync();
}
public Task<CronJob[]> GetCronJobsAsync()
{
return _context.CronJobs.ToArrayAsync();
}
public async Task RemoveCronJobAsync(string name)
{
var cronJob = await _context.CronJobs.FirstOrDefaultAsync(j => j.Name == name);
if (cronJob != null)
{
_context.Remove(cronJob);
await _context.SaveChangesAsync();
}
}
public IStorageTransaction CreateTransaction()
{
return new EFStorageTransaction(this);
}
public void Dispose()
{
}
private DateTime? NormalizeDateTime(DateTime? dateTime)
{
if (!dateTime.HasValue) return dateTime;
if (dateTime == DateTime.MinValue)
{
return new DateTime(1754, 1, 1, 0, 0, 0, DateTimeKind.Utc);
}
return dateTime;
}
public Task StoreSentMessageAsync(CapSentMessage message)
{
if (message == null) throw new ArgumentNullException(nameof(message));
message.LastRun = NormalizeDateTime(message.LastRun);
_context.Add(message);
return _context.SaveChangesAsync();
}
public Task<CapSentMessage> GetSentMessageAsync(string id)
{
return _context.CapSentMessages.FirstOrDefaultAsync(x => x.Id == id);
}
public Task<IFetchedJob> FetchNextJobAsync()
{
}
public async Task<Job> GetNextJobToBeEnqueuedAsync()
{
var sql = $@"
SELECT TOP (1) *
FROM [{_options.Schema}].[{nameof(JobsDbContext.Jobs)}] WITH (readpast)
WHERE (Due IS NULL OR Due < GETUTCDATE()) AND StateName = '{ScheduledState.StateName}'";
var connection = _context.GetDbConnection();
var job = (await connection.QueryAsync<Job>(sql)).FirstOrDefault();
if (job != null)
{
_context.Attach(job);
}
return job;
}
public Task<IFetchedMessage> FetchNextSentMessageAsync()
{
var sql = $@"
DELETE TOP (1)
FROM [{_options.Schema}].[{nameof(CapDbContext.CapSentMessages)}] WITH (readpast, updlock, rowlock)
OUTPUT DELETED.Id";
//return FetchNextDelayedMessageCoreAsync(sql);
throw new NotImplementedException();
}
//private async Task<IFetchedMessage> FetchNextDelayedMessageCoreAsync(string sql, object args = null)
//{
// FetchedMessage fetchedJob = null;
// var connection = _context.Database.GetDbConnection();
// var transaction = _context.Database.CurrentTransaction;
// transaction = transaction ?? await _context.Database.BeginTransactionAsync(IsolationLevel.ReadCommitted);
// try
// {
// fetchedJob =
// (await _context...QueryAsync<FetchedMessage>(sql, args, transaction.GetDbTransaction()))
// .FirstOrDefault();
// }
// catch (SqlException)
// {
// transaction.Dispose();
// throw;
// }
// if (fetchedJob == null)
// {
// transaction.Rollback();
// transaction.Dispose();
// return null;
// }
// return new SqlServerFetchedJob(
// fetchedJob.JobId,
// connection,
// transaction);
//}
public Task<CapSentMessage> GetNextSentMessageToBeEnqueuedAsync()
{
throw new NotImplementedException();
}
public Task StoreReceivedMessageAsync(CapReceivedMessage message)
{
throw new NotImplementedException();
}
public Task<CapReceivedMessage> GetReceivedMessageAsync(string id)
{
throw new NotImplementedException();
}
public Task<IFetchedMessage> FetchNextReceivedMessageAsync()
{
throw new NotImplementedException();
}
public Task<CapSentMessage> GetNextReceviedMessageToBeEnqueuedAsync()
{
throw new NotImplementedException();
}
}
}
using System;
using System.Threading.Tasks;
using DotNetCore.CAP.Models;
namespace DotNetCore.CAP.EntityFrameworkCore
{
public class EFStorageTransaction : IStorageTransaction, IDisposable
{
private EFStorageConnection _connection;
public EFStorageTransaction(EFStorageConnection connection)
{
_connection = connection;
}
public void UpdateJob(Job job)
{
if (job == null) throw new ArgumentNullException(nameof(job));
// NOOP. EF will detect changes.
}
public void EnqueueJob(Job job)
{
}
public Task CommitAsync()
{
return _connection.Context.SaveChangesAsync();
}
public void Dispose()
{
}
public void UpdateMessage(CapSentMessage message)
{
throw new NotImplementedException();
}
public void UpdateMessage(CapReceivedMessage message)
{
throw new NotImplementedException();
}
public void EnqueueMessage(CapSentMessage message)
{
if (job == null) throw new ArgumentNullException(nameof(job));
_connection.Context.Add(new JobQueue
{
JobId = job.Id
});
}
public void EnqueueMessage(CapReceivedMessage message)
{
if (job == null) throw new ArgumentNullException(nameof(job));
_connection.Context.Add(new JobQueue
{
JobId = job.Id
});
}
}
}
using System;
using System.Collections.Generic;
using System.Text;
namespace DotNetCore.CAP.EntityFrameworkCore
{
public class FetchedMessage
{
public string MessageId { get; set; }
}
}
using System;
namespace DotNetCore.CAP
{
public interface IFetchedMessage : IDisposable
{
int MessageId { get; }
void RemoveFromQueue();
void Requeue();
}
}
using System.Threading;
using System.Threading.Tasks;
namespace DotNetCore.CAP
{
/// <summary>
/// Represents a persisted storage.
/// </summary>
public interface IStorage
{
/// <summary>
/// Initializes the storage. For example, making sure a database is created and migrations are applied.
/// </summary>
Task InitializeAsync(CancellationToken cancellationToken);
}
}
using System;
using System.Threading.Tasks;
using DotNetCore.CAP.Models;
namespace DotNetCore.CAP
{
/// <summary>
/// Represents a connection to the storage.
/// </summary>
public interface IStorageConnection : IDisposable
{
//Sent messages
/// <summary>
/// Stores the message.
/// </summary>
/// <param name="message">The message to store.</param>
Task StoreSentMessageAsync(CapSentMessage message);
/// <summary>
/// Returns the message with the given id.
/// </summary>
/// <param name="id">The message's id.</param>
Task<CapSentMessage> GetSentMessageAsync(string id);
/// <summary>
/// Fetches the next message to be executed.
/// </summary>
Task<IFetchedMessage> FetchNextSentMessageAsync();
/// <summary>
/// Returns the next message to be enqueued.
/// </summary>
Task<CapSentMessage> GetNextSentMessageToBeEnqueuedAsync();
// Received messages
/// <summary>
/// Stores the message.
/// </summary>
/// <param name="message">The message to store.</param>
Task StoreReceivedMessageAsync(CapReceivedMessage message);
/// <summary>
/// Returns the message with the given id.
/// </summary>
/// <param name="id">The message's id.</param>
Task<CapReceivedMessage> GetReceivedMessageAsync(string id);
/// <summary>
/// Fetches the next message to be executed.
/// </summary>
Task<IFetchedMessage> FetchNextReceivedMessageAsync();
/// <summary>
/// Returns the next message to be enqueued.
/// </summary>
Task<CapSentMessage> GetNextReceviedMessageToBeEnqueuedAsync();
//-----------------------------------------
/// <summary>
/// Creates and returns an <see cref="IStorageTransaction"/>.
/// </summary>
IStorageTransaction CreateTransaction();
}
}
using System;
using System.Threading.Tasks;
using DotNetCore.CAP.Models;
namespace DotNetCore.CAP
{
public interface IStorageTransaction : IDisposable
{
void UpdateMessage(CapSentMessage message);
void UpdateMessage(CapReceivedMessage message);
void EnqueueMessage(CapSentMessage message);
void EnqueueMessage(CapReceivedMessage message);
Task CommitAsync();
}
}
using System;
namespace DotNetCore.CAP.Infrastructure
namespace DotNetCore.CAP.Models
{
public class CapReceivedMessage
{
......
using System;
namespace DotNetCore.CAP.Infrastructure
namespace DotNetCore.CAP.Models
{
public class CapSentMessage
{
......@@ -30,7 +30,7 @@ namespace DotNetCore.CAP.Infrastructure
public DateTime Added { get; set; }
public DateTime LastRun { get; set; }
public DateTime? LastRun { get; set; }
public int Retries { get; set; }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment