Commit 475ace0e authored by Savorboard's avatar Savorboard

rename keyName to name.

parent 14a3d4a5
......@@ -25,12 +25,13 @@ namespace Sample.Kafka
{
services.AddDbContext<AppDbContext>();
services.AddCap(x=> {
services.AddCap(x =>
{
x.UseEntityFramework<AppDbContext>();
x.UseSqlServer("Server=DESKTOP-M9R8T31;Initial Catalog=Test;User Id=sa;Password=P@ssw0rd;MultipleActiveResultSets=True");
x.UseRabbitMQ("localhost");
});
// Add framework services.
services.AddMvc();
}
......
......@@ -44,4 +44,4 @@ namespace Microsoft.Extensions.DependencyInjection
return options;
}
}
}
}
\ No newline at end of file
using System;
using Microsoft.EntityFrameworkCore;
using DotNetCore.CAP.EntityFrameworkCore;
using DotNetCore.CAP.Processor;
using Microsoft.Extensions.DependencyInjection;
......@@ -17,8 +16,8 @@ namespace DotNetCore.CAP
public void AddServices(IServiceCollection services)
{
services.AddSingleton<IStorage, EFStorage>();
services.AddScoped<IStorageConnection, EFStorageConnection>();
services.AddSingleton<IStorage, SqlServerStorage>();
services.AddScoped<IStorageConnection, SqlServerStorageConnection>();
services.AddScoped<ICapPublisher, CapPublisher>();
services.AddTransient<IAdditionalProcessor, DefaultAdditionalProcessor>();
......@@ -27,16 +26,6 @@ namespace DotNetCore.CAP
var sqlServerOptions = new SqlServerOptions();
_configure(sqlServerOptions);
services.AddSingleton(sqlServerOptions);
services.AddDbContext<CapDbContext>(options =>
{
options.UseSqlServer(sqlServerOptions.ConnectionString, sqlOpts =>
{
sqlOpts.MigrationsHistoryTable(
sqlServerOptions.MigrationsHistoryTableName,
sqlServerOptions.MigrationsHistoryTableSchema ?? sqlServerOptions.Schema);
});
});
}
}
}
using System.Data.Common;
using DotNetCore.CAP.Infrastructure;
using DotNetCore.CAP.Models;
using Microsoft.EntityFrameworkCore;
namespace DotNetCore.CAP.EntityFrameworkCore
{
/// <summary>
/// Base class for the Entity Framework database context used for CAP.
/// </summary>
public class CapDbContext : DbContext
{
private SqlServerOptions _sqlServerOptions;
/// <summary>
/// Initializes a new instance of the <see cref="CapDbContext"/>.
/// </summary>
public CapDbContext() { }
/// <summary>
/// Initializes a new instance of the <see cref="CapDbContext"/>.
/// </summary>
/// <param name="options">The options to be used by a <see cref="DbContext"/>.</param>
public CapDbContext(DbContextOptions<CapDbContext> options, SqlServerOptions sqlServerOptions)
: base(options) {
_sqlServerOptions = sqlServerOptions;
}
/// <summary>
/// Gets or sets the <see cref="CapSentMessage"/> of Messages.
/// </summary>
public DbSet<CapSentMessage> CapSentMessages { get; set; }
public DbSet<CapQueue> CapQueue { get; set; }
/// <summary>
/// Gets or sets the <see cref="CapReceivedMessages"/> of Messages.
/// </summary>
public DbSet<CapReceivedMessage> CapReceivedMessages { get; set; }
public DbConnection GetDbConnection() => Database.GetDbConnection();
/// <summary>
/// Configures the schema for the identity framework.
/// </summary>
/// <param name="modelBuilder">
/// The builder being used to construct the model for this context.
/// </param>
protected override void OnModelCreating(ModelBuilder modelBuilder)
{
_sqlServerOptions = new SqlServerOptions();
modelBuilder.HasDefaultSchema(_sqlServerOptions.Schema);
modelBuilder.Entity<CapSentMessage>(b =>
{
b.HasKey(m => m.Id);
b.HasIndex(x => x.StatusName);
b.Property(p => p.StatusName).IsRequired().HasMaxLength(50);
});
modelBuilder.Entity<CapReceivedMessage>(b =>
{
b.HasKey(m => m.Id);
b.HasIndex(x => x.StatusName);
b.Property(p => p.StatusName).IsRequired().HasMaxLength(50);
});
}
protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder)
{
// optionsBuilder.UseSqlServer("Server=192.168.2.206;Initial Catalog=Test;User Id=cmswuliu;Password=h7xY81agBn*Veiu3;MultipleActiveResultSets=True");
optionsBuilder.UseSqlServer("Server=DESKTOP-M9R8T31;Initial Catalog=Test;User Id=sa;Password=P@ssw0rd;MultipleActiveResultSets=True");
}
}
}
\ No newline at end of file
......@@ -80,14 +80,14 @@ namespace DotNetCore.CAP.EntityFrameworkCore
private async Task PublishWithTrans(string topic, string content, IDbConnection dbConnection, IDbTransaction dbTransaction)
{
var message = new CapSentMessage
var message = new CapPublishedMessage
{
KeyName = topic,
Name = topic,
Content = content,
StatusName = StatusName.Scheduled
};
var sql = $"INSERT INTO {_options.Schema}.[{nameof(CapDbContext.CapSentMessages)}] ([Id],[Added],[Content],[KeyName],[ExpiresAt],[Retries],[StatusName])VALUES(@Id,@Added,@Content,@KeyName,@ExpiresAt,@Retries,@StatusName)";
var sql = $"INSERT INTO {_options.Schema}.[Published] ([Id],[Added],[Content],[KeyName],[ExpiresAt],[Retries],[StatusName])VALUES(@Id,@Added,@Content,@KeyName,@ExpiresAt,@Retries,@StatusName)";
await dbConnection.ExecuteAsync(sql, message, transaction: dbTransaction);
PublishQueuer.PulseEvent.Set();
......
using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
namespace DotNetCore.CAP.EntityFrameworkCore
{
public class EFStorage : IStorage
{
private IServiceProvider _provider;
private ILogger _logger;
public EFStorage(
IServiceProvider provider,
ILogger<EFStorage> logger)
{
_provider = provider;
_logger = logger;
}
public async Task InitializeAsync(CancellationToken cancellationToken)
{
using (var scope = _provider.CreateScope())
{
if (cancellationToken.IsCancellationRequested) return;
var provider = scope.ServiceProvider;
var context = provider.GetRequiredService<CapDbContext>();
_logger.LogDebug("Ensuring all migrations are applied to Jobs database.");
try
{
await context.Database.MigrateAsync(cancellationToken);
}
catch (Exception ex)
{
throw ex;
}
}
}
}
}
......@@ -7,8 +7,8 @@ namespace DotNetCore.CAP.EntityFrameworkCore
{
public class FetchedMessage
{
public string MessageId { get; set; }
public int MessageId { get; set; }
public MessageType Type { get; set; }
public MessageType MessageType { get; set; }
}
}
using System;
using System.Collections.Generic;
using System.Data.SqlClient;
using System.Text;
using System.Threading.Tasks;
using Dapper;
......@@ -21,8 +22,7 @@ namespace DotNetCore.CAP.EntityFrameworkCore
private static readonly string[] Tables =
{
nameof(CapDbContext.CapSentMessages),
nameof(CapDbContext.CapReceivedMessages),
"Published","Received"
};
public DefaultAdditionalProcessor(
......@@ -44,18 +44,14 @@ namespace DotNetCore.CAP.EntityFrameworkCore
var removedCount = 0;
do
{
using (var scope = _provider.CreateScope())
using(var connection = new SqlConnection(_options.ConnectionString))
{
var provider = scope.ServiceProvider;
var jobsDbContext = provider.GetService<CapDbContext>();
var connection = jobsDbContext.GetDbConnection();
removedCount = await connection.ExecuteAsync($@"
DELETE TOP (@count)
FROM [{_options.Schema}].[{table}] WITH (readpast)
WHERE ExpiresAt < @now;", new { now = DateTime.Now, count = MaxBatch });
}
if (removedCount != 0)
{
await context.WaitAsync(_delay);
......
using System;
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Infrastructure;
using Microsoft.EntityFrameworkCore.Metadata;
using Microsoft.EntityFrameworkCore.Migrations;
using DotNetCore.CAP.EntityFrameworkCore;
using DotNetCore.CAP.Models;
namespace DotNetCore.CAP.EntityFrameworkCore.Migrations
{
[DbContext(typeof(CapDbContext))]
[Migration("20170714102709_InitializeDB")]
partial class InitializeDB
{
protected override void BuildTargetModel(ModelBuilder modelBuilder)
{
modelBuilder
.HasDefaultSchema("cap")
.HasAnnotation("ProductVersion", "1.1.2")
.HasAnnotation("SqlServer:ValueGenerationStrategy", SqlServerValueGenerationStrategy.IdentityColumn);
modelBuilder.Entity("DotNetCore.CAP.Models.CapQueue", b =>
{
b.Property<int>("Id")
.ValueGeneratedOnAdd();
b.Property<string>("MessageId");
b.Property<int>("Type");
b.HasKey("Id");
b.ToTable("CapQueue");
});
modelBuilder.Entity("DotNetCore.CAP.Models.CapReceivedMessage", b =>
{
b.Property<string>("Id")
.ValueGeneratedOnAdd();
b.Property<DateTime>("Added");
b.Property<string>("Content");
b.Property<DateTime?>("ExpiresAt");
b.Property<string>("Group");
b.Property<string>("KeyName");
b.Property<int>("Retries");
b.Property<string>("StatusName")
.IsRequired()
.HasMaxLength(50);
b.HasKey("Id");
b.HasIndex("StatusName");
b.ToTable("CapReceivedMessages");
});
modelBuilder.Entity("DotNetCore.CAP.Models.CapSentMessage", b =>
{
b.Property<string>("Id")
.ValueGeneratedOnAdd();
b.Property<DateTime>("Added");
b.Property<string>("Content");
b.Property<DateTime?>("ExpiresAt");
b.Property<string>("KeyName");
b.Property<int>("Retries");
b.Property<string>("StatusName")
.IsRequired()
.HasMaxLength(50);
b.HasKey("Id");
b.HasIndex("StatusName");
b.ToTable("CapSentMessages");
});
}
}
}
using System;
using System.Collections.Generic;
using Microsoft.EntityFrameworkCore.Migrations;
using Microsoft.EntityFrameworkCore.Metadata;
namespace DotNetCore.CAP.EntityFrameworkCore.Migrations
{
public partial class InitializeDB : Migration
{
protected override void Up(MigrationBuilder migrationBuilder)
{
migrationBuilder.EnsureSchema(
name: "cap");
migrationBuilder.CreateTable(
name: "CapQueue",
schema: "cap",
columns: table => new
{
Id = table.Column<int>(nullable: false)
.Annotation("SqlServer:ValueGenerationStrategy", SqlServerValueGenerationStrategy.IdentityColumn),
MessageId = table.Column<string>(nullable: true),
Type = table.Column<int>(nullable: false)
},
constraints: table =>
{
table.PrimaryKey("PK_CapQueue", x => x.Id);
});
migrationBuilder.CreateTable(
name: "CapReceivedMessages",
schema: "cap",
columns: table => new
{
Id = table.Column<string>(nullable: false),
Added = table.Column<DateTime>(nullable: false),
Content = table.Column<string>(nullable: true),
ExpiresAt = table.Column<DateTime>(nullable: true),
Group = table.Column<string>(nullable: true),
KeyName = table.Column<string>(nullable: true),
Retries = table.Column<int>(nullable: false),
StatusName = table.Column<string>(maxLength: 50, nullable: false)
},
constraints: table =>
{
table.PrimaryKey("PK_CapReceivedMessages", x => x.Id);
});
migrationBuilder.CreateTable(
name: "CapSentMessages",
schema: "cap",
columns: table => new
{
Id = table.Column<string>(nullable: false),
Added = table.Column<DateTime>(nullable: false),
Content = table.Column<string>(nullable: true),
ExpiresAt = table.Column<DateTime>(nullable: true),
KeyName = table.Column<string>(nullable: true),
Retries = table.Column<int>(nullable: false),
StatusName = table.Column<string>(maxLength: 50, nullable: false)
},
constraints: table =>
{
table.PrimaryKey("PK_CapSentMessages", x => x.Id);
});
migrationBuilder.CreateIndex(
name: "IX_CapReceivedMessages_StatusName",
schema: "cap",
table: "CapReceivedMessages",
column: "StatusName");
migrationBuilder.CreateIndex(
name: "IX_CapSentMessages_StatusName",
schema: "cap",
table: "CapSentMessages",
column: "StatusName");
}
protected override void Down(MigrationBuilder migrationBuilder)
{
migrationBuilder.DropTable(
name: "CapQueue",
schema: "cap");
migrationBuilder.DropTable(
name: "CapReceivedMessages",
schema: "cap");
migrationBuilder.DropTable(
name: "CapSentMessages",
schema: "cap");
}
}
}
using System;
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Infrastructure;
using Microsoft.EntityFrameworkCore.Metadata;
using Microsoft.EntityFrameworkCore.Migrations;
using DotNetCore.CAP.EntityFrameworkCore;
using DotNetCore.CAP.Models;
namespace DotNetCore.CAP.EntityFrameworkCore.Migrations
{
[DbContext(typeof(CapDbContext))]
partial class CapDbContextModelSnapshot : ModelSnapshot
{
protected override void BuildModel(ModelBuilder modelBuilder)
{
modelBuilder
.HasDefaultSchema("cap")
.HasAnnotation("ProductVersion", "1.1.2")
.HasAnnotation("SqlServer:ValueGenerationStrategy", SqlServerValueGenerationStrategy.IdentityColumn);
modelBuilder.Entity("DotNetCore.CAP.Models.CapQueue", b =>
{
b.Property<int>("Id")
.ValueGeneratedOnAdd();
b.Property<string>("MessageId");
b.Property<int>("Type");
b.HasKey("Id");
b.ToTable("CapQueue");
});
modelBuilder.Entity("DotNetCore.CAP.Models.CapReceivedMessage", b =>
{
b.Property<string>("Id")
.ValueGeneratedOnAdd();
b.Property<DateTime>("Added");
b.Property<string>("Content");
b.Property<DateTime?>("ExpiresAt");
b.Property<string>("Group");
b.Property<string>("KeyName");
b.Property<int>("Retries");
b.Property<string>("StatusName")
.IsRequired()
.HasMaxLength(50);
b.HasKey("Id");
b.HasIndex("StatusName");
b.ToTable("CapReceivedMessages");
});
modelBuilder.Entity("DotNetCore.CAP.Models.CapSentMessage", b =>
{
b.Property<string>("Id")
.ValueGeneratedOnAdd();
b.Property<DateTime>("Added");
b.Property<string>("Content");
b.Property<DateTime?>("ExpiresAt");
b.Property<string>("KeyName");
b.Property<int>("Retries");
b.Property<string>("StatusName")
.IsRequired()
.HasMaxLength(50);
b.HasKey("Id");
b.HasIndex("StatusName");
b.ToTable("CapSentMessages");
});
}
}
}
......@@ -9,29 +9,29 @@ using Microsoft.EntityFrameworkCore.Storage;
namespace DotNetCore.CAP.EntityFrameworkCore
{
public class EFFetchedMessage : IFetchedMessage
public class SqlServerFetchedMessage : IFetchedMessage
{
private readonly IDbConnection _connection;
private readonly IDbContextTransaction _transaction;
private readonly IDbTransaction _transaction;
private readonly Timer _timer;
private static readonly TimeSpan KeepAliveInterval = TimeSpan.FromMinutes(1);
private readonly object _lockObject = new object();
public EFFetchedMessage(string messageId,
public SqlServerFetchedMessage(int messageId,
MessageType type,
IDbConnection connection,
IDbContextTransaction transaction)
IDbTransaction transaction)
{
MessageId = messageId;
Type = type;
MessageType = type;
_connection = connection;
_transaction = transaction;
_timer = new Timer(ExecuteKeepAliveQuery, null, KeepAliveInterval, KeepAliveInterval);
}
public string MessageId { get; }
public int MessageId { get; }
public MessageType Type { get; }
public MessageType MessageType { get; }
public void RemoveFromQueue()
{
......@@ -65,7 +65,7 @@ namespace DotNetCore.CAP.EntityFrameworkCore
{
try
{
_connection?.Execute("SELECT 1", _transaction.GetDbTransaction());
_connection?.Execute("SELECT 1", _transaction);
}
catch
{
......
using System;
using System.Data.SqlClient;
using System.Threading;
using System.Threading.Tasks;
using Dapper;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
namespace DotNetCore.CAP.EntityFrameworkCore
{
public class SqlServerStorage : IStorage
{
private IServiceProvider _provider;
private ILogger _logger;
public SqlServerStorage(
IServiceProvider provider,
ILogger<SqlServerStorage> logger)
{
_provider = provider;
_logger = logger;
}
public async Task InitializeAsync(CancellationToken cancellationToken)
{
using (var scope = _provider.CreateScope())
{
if (cancellationToken.IsCancellationRequested) return;
var provider = scope.ServiceProvider;
var options = provider.GetRequiredService<SqlServerOptions>();
var sql = CreateDbTablesScript(options.Schema);
using (var connection = new SqlConnection(options.ConnectionString))
{
await connection.ExecuteAsync(sql);
}
_logger.LogDebug("Ensuring all create database tables script are applied.");
}
}
protected virtual string CreateDbTablesScript(string schema)
{
var batchSQL =
$@"
IF NOT EXISTS (SELECT * FROM sys.schemas WHERE name = '{schema}')
BEGIN
EXEC('CREATE SCHEMA {schema}')
END
GO
IF OBJECT_ID(N'[{schema}].[Queue]',N'U') IS NULL
BEGIN
CREATE TABLE [{schema}].[Queue](
[MessageId] [int] NOT NULL,
[MessageType] [tinyint] NOT NULL
) ON [PRIMARY]
END
GO
IF OBJECT_ID(N'[{schema}].[Received]',N'U') IS NULL
BEGIN
CREATE TABLE [{schema}].[Received](
[Id] [int] IDENTITY(1,1) NOT NULL,
[Name] [nvarchar](200) NOT NULL,
[Group] [nvarchar](200) NULL,
[Content] [nvarchar](max) NULL,
[Retries] [int] NOT NULL,
[Added] [datetime2](7) NOT NULL,
[ExpiresAt] [datetime2](7) NULL,
[StatusName] [nvarchar](50) NOT NULL,
CONSTRAINT [PK_{schema}.Received] PRIMARY KEY CLUSTERED
(
[Id] ASC
)WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON) ON [PRIMARY]
) ON [PRIMARY] TEXTIMAGE_ON [PRIMARY]
END
GO
IF OBJECT_ID(N'[{schema}].[Published]',N'U') IS NULL
BEGIN
CREATE TABLE [{schema}].[Published](
[Id] [int] IDENTITY(1,1) NOT NULL,
[Name] [nvarchar](200) NOT NULL,
[Content] [nvarchar](max) NULL,
[Retries] [int] NOT NULL,
[Added] [datetime2](7) NOT NULL,
[ExpiresAt] [datetime2](7) NULL,
[StatusName] [nvarchar](50) NOT NULL,
CONSTRAINT [PK_{schema}.Published] PRIMARY KEY CLUSTERED
(
[Id] ASC
)WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON) ON [PRIMARY]
) ON [PRIMARY] TEXTIMAGE_ON [PRIMARY]
END
GO";
return batchSQL;
}
}
}
using System;
using System.Data;
using System.Data.SqlClient;
using System.Linq;
using System.Threading.Tasks;
using Dapper;
using DotNetCore.CAP.Infrastructure;
using DotNetCore.CAP.Models;
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Storage;
using Microsoft.Extensions.Options;
namespace DotNetCore.CAP.EntityFrameworkCore
{
public class EFStorageConnection : IStorageConnection
public class SqlServerStorageConnection : IStorageConnection
{
private readonly CapDbContext _context;
private readonly SqlServerOptions _options;
public EFStorageConnection(
CapDbContext context,
IOptions<SqlServerOptions> options)
public SqlServerStorageConnection(IOptions<SqlServerOptions> options)
{
_context = context;
_options = options.Value;
}
public CapDbContext Context => _context;
public SqlServerOptions Options => _options;
public IStorageTransaction CreateTransaction()
{
return new EFStorageTransaction(this);
return new SqlServerStorageTransaction(this);
}
public Task<CapSentMessage> GetSentMessageAsync(string id)
public Task<CapPublishedMessage> GetPublishedMessageAsync(int id)
{
return _context.CapSentMessages.FirstOrDefaultAsync(x => x.Id == id);
var sql = $@"SELECT * FROM [{_options.Schema}].[Published] WITH (readpast) WHERE Id={id}";
using (var connection = new SqlConnection(_options.ConnectionString))
{
return connection.QueryFirstOrDefaultAsync<CapPublishedMessage>(sql);
}
}
public Task<IFetchedMessage> FetchNextMessageAsync()
{
var sql = $@"
DELETE TOP (1)
FROM [{_options.Schema}].[{nameof(CapDbContext.CapQueue)}] WITH (readpast, updlock, rowlock)
OUTPUT DELETED.MessageId,DELETED.[Type];";
FROM [{_options.Schema}].[Queue] WITH (readpast, updlock, rowlock)
OUTPUT DELETED.MessageId,DELETED.[MessageType];";
return FetchNextMessageCoreAsync(sql);
}
public async Task<CapSentMessage> GetNextSentMessageToBeEnqueuedAsync()
public async Task<CapPublishedMessage> GetNextPublishedMessageToBeEnqueuedAsync()
{
var sql = $@"
SELECT TOP (1) *
FROM [{_options.Schema}].[{nameof(CapDbContext.CapSentMessages)}] WITH (readpast)
WHERE StatusName = '{StatusName.Scheduled}'";
var sql = $"SELECT TOP (1) * FROM [{_options.Schema}].[Published] WITH (readpast) WHERE StatusName = '{StatusName.Scheduled}'";
var connection = _context.GetDbConnection();
var message = (await connection.QueryAsync<CapSentMessage>(sql)).FirstOrDefault();
if (message != null)
using (var connection = new SqlConnection(_options.ConnectionString))
{
_context.Attach(message);
return await connection.QueryFirstOrDefaultAsync<CapPublishedMessage>(sql);
}
return message;
}
// CapReceviedMessage
......@@ -75,31 +61,32 @@ WHERE StatusName = '{StatusName.Scheduled}'";
{
if (message == null) throw new ArgumentNullException(nameof(message));
_context.Add(message);
return _context.SaveChangesAsync();
var sql = $@"
INSERT INTO [{_options.Schema}].[Received]([Name],[Group],[Content],[Retries],[Added],[ExpiresAt],[StatusName])
VALUES(@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName);";
using (var connection = new SqlConnection(_options.ConnectionString))
{
return connection.ExecuteAsync(sql, message);
}
}
public Task<CapReceivedMessage> GetReceivedMessageAsync(string id)
public Task<CapReceivedMessage> GetReceivedMessageAsync(int id)
{
return _context.CapReceivedMessages.FirstOrDefaultAsync(x => x.Id == id);
var sql = $@"SELECT * FROM [{_options.Schema}].[Received] WITH (readpast) WHERE Id={id}";
using (var connection = new SqlConnection(_options.ConnectionString))
{
return connection.QueryFirstOrDefaultAsync<CapReceivedMessage>(sql);
}
}
public async Task<CapReceivedMessage> GetNextReceviedMessageToBeEnqueuedAsync()
{
var sql = $@"
SELECT TOP (1) *
FROM [{_options.Schema}].[{nameof(CapDbContext.CapReceivedMessages)}] WITH (readpast)
WHERE StatusName = '{StatusName.Enqueued}'";
var connection = _context.GetDbConnection();
var message = (await connection.QueryAsync<CapReceivedMessage>(sql)).FirstOrDefault();
if (message != null)
var sql = $"SELECT TOP (1) * FROM [{_options.Schema}].[Received] WITH (readpast) WHERE StatusName = '{StatusName.Scheduled}'";
using (var connection = new SqlConnection(_options.ConnectionString))
{
_context.Attach(message);
return await connection.QueryFirstOrDefaultAsync<CapReceivedMessage>(sql);
}
return message;
}
public void Dispose()
......@@ -108,35 +95,28 @@ WHERE StatusName = '{StatusName.Enqueued}'";
private async Task<IFetchedMessage> FetchNextMessageCoreAsync(string sql, object args = null)
{
FetchedMessage fetchedMessage = null;
var connection = _context.GetDbConnection();
var transaction = _context.Database.CurrentTransaction;
transaction = transaction ?? await _context.Database.BeginTransactionAsync(IsolationLevel.ReadCommitted);
FetchedMessage fetched = null;
try
using (var connection = new SqlConnection(_options.ConnectionString))
{
fetchedMessage =
(await connection.QueryAsync<FetchedMessage>(sql, args, transaction.GetDbTransaction()))
.FirstOrDefault();
using (var transaction = connection.BeginTransaction(IsolationLevel.ReadCommitted))
{
try
{
fetched = await connection.QueryFirstOrDefaultAsync<FetchedMessage>(sql, args, transaction);
if (fetched == null)
return null;
return new SqlServerFetchedMessage(fetched.MessageId, fetched.MessageType, connection, transaction);
}
catch (Exception)
{
transaction.Rollback();
return null;
}
}
}
catch (SqlException)
{
transaction.Dispose();
throw;
}
if (fetchedMessage == null)
{
transaction.Rollback();
transaction.Dispose();
return null;
}
return new EFFetchedMessage(
fetchedMessage.MessageId,
fetchedMessage.Type,
connection,
transaction);
}
}
}
}
\ No newline at end of file
using System;
using System.Data;
using System.Data.SqlClient;
using System.Threading.Tasks;
using Dapper;
using DotNetCore.CAP.Models;
namespace DotNetCore.CAP.EntityFrameworkCore
{
public class EFStorageTransaction
: IStorageTransaction, IDisposable
public class SqlServerStorageTransaction : IStorageTransaction, IDisposable
{
private EFStorageConnection _connection;
private readonly SqlServerStorageConnection _connection;
private readonly SqlServerOptions _options;
private readonly string _schema;
public EFStorageTransaction(EFStorageConnection connection)
private IDbTransaction _dbTransaction;
private IDbConnection _dbConnection;
public SqlServerStorageTransaction(SqlServerStorageConnection connection)
{
_connection = connection;
_options = _connection.Options;
_schema = _options.Schema;
_dbConnection = new SqlConnection(_options.ConnectionString);
_dbTransaction = _dbConnection.BeginTransaction(IsolationLevel.ReadCommitted);
}
public void UpdateMessage(CapSentMessage message)
public void UpdateMessage(CapPublishedMessage message)
{
if (message == null) throw new ArgumentNullException(nameof(message));
// NOOP. EF will detect changes.
var sql = $"UPDATE [{_schema}].[Published] SET [ExpiresAt] = @ExpiresAt,[StatusName]=@StatusName WHERE Id=@Id;";
_dbConnection.Execute(sql, message);
}
public void UpdateMessage(CapReceivedMessage message)
{
if (message == null) throw new ArgumentNullException(nameof(message));
// NOOP. EF will detect changes.
var sql = $"UPDATE [{_schema}].[Received] SET [ExpiresAt] = @ExpiresAt,[StatusName]=@StatusName WHERE Id=@Id;";
_dbConnection.Execute(sql, message);
}
public void EnqueueMessage(CapSentMessage message)
public void EnqueueMessage(CapPublishedMessage message)
{
if (message == null) throw new ArgumentNullException(nameof(message));
_connection.Context.Add(new CapQueue
{
MessageId = message.Id,
Type = 0
});
var sql = $"INSERT INTO [{_schema}].[Queue] values(@MessageId,@MessageType);";
_dbConnection.Execute(sql, new CapQueue { MessageId = message.Id, MessageType = MessageType.Publish });
}
public void EnqueueMessage(CapReceivedMessage message)
{
if (message == null) throw new ArgumentNullException(nameof(message));
_connection.Context.Add(new CapQueue
{
MessageId = message.Id,
Type = MessageType.Subscribe
});
var sql = $"INSERT INTO [{_schema}].[Queue] values(@MessageId,@MessageType);";
_dbConnection.Execute(sql, new CapQueue { MessageId = message.Id, MessageType = MessageType.Subscribe });
}
public Task CommitAsync()
{
return _connection.Context.SaveChangesAsync();
_dbTransaction.Commit();
return Task.CompletedTask;
}
public void Dispose()
{
_dbTransaction.Dispose();
_dbConnection.Dispose();
}
}
}
}
\ No newline at end of file
......@@ -16,9 +16,9 @@ namespace DotNetCore.CAP
/// otherwise you need to use overloaded method with IDbConnection and IDbTransaction.
/// </para>
/// </summary>
/// <param name="topic">the topic name or exchange router key.</param>
/// <param name="name">the topic name or exchange router key.</param>
/// <param name="content">message body content.</param>
Task PublishAsync(string topic, string content);
Task PublishAsync(string name, string content);
/// <summary>
/// Publis a object message to specified topic.
......@@ -28,25 +28,25 @@ namespace DotNetCore.CAP
/// </para>
/// </summary>
/// <typeparam name="T">The type of conetent object.</typeparam>
/// <param name="topic">the topic name or exchange router key.</param>
/// <param name="name">the topic name or exchange router key.</param>
/// <param name="contentObj">object instance that will be serialized of json.</param>
Task PublishAsync<T>(string topic, T contentObj);
Task PublishAsync<T>(string name, T contentObj);
/// <summary>
/// Publish a string message to specified topic with transacton.
/// </summary>
/// <param name="topic">the topic name or exchange router key.</param>
/// <param name="name">the topic name or exchange router key.</param>
/// <param name="content">message body content.</param>
/// <param name="dbConnection">the dbConnection of <see cref="IDbConnection"/></param>
Task PublishAsync(string topic, string content, IDbConnection dbConnection);
Task PublishAsync(string name, string content, IDbConnection dbConnection);
/// <summary>
/// Publish a string message to specified topic with transacton.
/// </summary>
/// <param name="topic">the topic name or exchange router key.</param>
/// <param name="name">the topic name or exchange router key.</param>
/// <param name="content">message body content.</param>
/// <param name="dbConnection">the connection of <see cref="IDbConnection"/></param>
/// <param name="dbTransaction">the transaction of <see cref="IDbTransaction"/></param>
Task PublishAsync(string topic, string content, IDbConnection dbConnection, IDbTransaction dbTransaction);
Task PublishAsync(string name, string content, IDbConnection dbConnection, IDbTransaction dbTransaction);
}
}
\ No newline at end of file
......@@ -101,7 +101,7 @@ namespace DotNetCore.CAP
{
client.MessageReceieved += (sender, message) =>
{
_logger.EnqueuingReceivedMessage(message.KeyName, message.Content);
_logger.EnqueuingReceivedMessage(message.Name, message.Content);
using (var scope = _serviceProvider.CreateScope())
{
......
......@@ -3,14 +3,14 @@ using DotNetCore.CAP.Models;
namespace DotNetCore.CAP
{
public interface IFetchedMessage : IDisposable
{
string MessageId { get; }
public interface IFetchedMessage : IDisposable
{
int MessageId { get; }
MessageType Type { get; }
MessageType MessageType { get; }
void RemoveFromQueue();
void RemoveFromQueue();
void Requeue();
}
}
void Requeue();
}
}
\ No newline at end of file
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using DotNetCore.CAP.Abstractions;
using DotNetCore.CAP.Infrastructure;
using DotNetCore.CAP.Internal;
using DotNetCore.CAP.Models;
using DotNetCore.CAP.Processor;
using DotNetCore.CAP.Processor.States;
using DotNetCore.CAP.Models;
using Microsoft.Extensions.Logging;
namespace DotNetCore.CAP
......@@ -92,7 +89,7 @@ namespace DotNetCore.CAP
}
catch (Exception ex)
{
_logger.ExceptionOccuredWhileExecutingJob(message?.KeyName, ex);
_logger.ExceptionOccuredWhileExecutingJob(message?.Name, ex);
return OperateResult.Failed(ex);
}
}
......@@ -102,11 +99,11 @@ namespace DotNetCore.CAP
{
try
{
var executeDescriptorGroup = _selector.GetTopicExector(receivedMessage.KeyName);
var executeDescriptorGroup = _selector.GetTopicExector(receivedMessage.Name);
if (!executeDescriptorGroup.ContainsKey(receivedMessage.Group))
{
throw new SubscriberNotFoundException(receivedMessage.KeyName + " has not been found.");
throw new SubscriberNotFoundException(receivedMessage.Name + " has not been found.");
}
// If there are multiple consumers in the same group, we will take the first
......@@ -123,7 +120,7 @@ namespace DotNetCore.CAP
}
catch (Exception ex)
{
_logger.ConsumerMethodExecutingFailed($"Group:{receivedMessage.Group}, Topic:{receivedMessage.KeyName}", ex);
_logger.ConsumerMethodExecutingFailed($"Group:{receivedMessage.Group}, Topic:{receivedMessage.Name}", ex);
return OperateResult.Failed(ex);
}
}
......@@ -148,6 +145,5 @@ namespace DotNetCore.CAP
}
return true;
}
}
}
}
\ No newline at end of file
......@@ -4,7 +4,7 @@
{
public string Group { get; set; }
public string KeyName { get; set; }
public string Name { get; set; }
public string Content { get; set; }
}
......
......@@ -2,13 +2,11 @@
{
public class CapQueue
{
public int Id { get; set; }
public string MessageId { get; set; }
public int MessageId { get; set; }
/// <summary>
/// 0 is CapSentMessage, 1 is CapReceviedMessage
/// </summary>
public MessageType Type { get; set; }
public MessageType MessageType { get; set; }
}
}
......@@ -13,22 +13,21 @@ namespace DotNetCore.CAP.Models
/// </remarks>
public CapReceivedMessage()
{
Id = Guid.NewGuid().ToString();
Added = DateTime.Now;
}
public CapReceivedMessage(MessageContext message) : this()
{
Group = message.Group;
KeyName = message.KeyName;
Name = message.Name;
Content = message.Content;
}
public string Id { get; set; }
public int Id { get; set; }
public string Group { get; set; }
public string KeyName { get; set; }
public string Name { get; set; }
public string Content { get; set; }
......@@ -45,7 +44,7 @@ namespace DotNetCore.CAP.Models
return new MessageContext
{
Group = Group,
KeyName = KeyName,
Name = Name,
Content = Content
};
}
......
......@@ -85,7 +85,7 @@ namespace DotNetCore.CAP.Processor
{
using (fetched)
{
var queueExecutor = _queueExecutorFactory.GetInstance(fetched.Type);
var queueExecutor = _queueExecutorFactory.GetInstance(fetched.MessageType);
await queueExecutor.ExecuteAsync(connection, fetched);
}
}
......
......@@ -38,13 +38,13 @@ namespace DotNetCore.CAP.Processor
{
using (var scope = _provider.CreateScope())
{
CapSentMessage sentMessage;
CapPublishedMessage sentMessage;
var provider = scope.ServiceProvider;
var connection = provider.GetRequiredService<IStorageConnection>();
while (
!context.IsStopping &&
(sentMessage = await connection.GetNextSentMessageToBeEnqueuedAsync()) != null)
(sentMessage = await connection.GetNextPublishedMessageToBeEnqueuedAsync()) != null)
{
var state = new EnqueuedState();
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment