Commit 0308c030 authored by Savorboard's avatar Savorboard

Refactor mongodb module for new transaction mode

parent 53c5c74c
......@@ -24,9 +24,12 @@ namespace DotNetCore.CAP
services.AddSingleton<CapDatabaseStorageMarkerService>();
services.AddSingleton<IStorage, SqlServerStorage>();
services.AddSingleton<IStorageConnection, SqlServerStorageConnection>();
services.AddScoped<ICapPublisher, CapPublisher>();
services.AddScoped<ICallbackPublisher, CapPublisher>();
services.AddScoped<ICapPublisher, SqlServerPublisher>();
services.AddScoped<ICallbackPublisher, SqlServerPublisher>();
services.AddTransient<ICollectProcessor, SqlServerCollectProcessor>();
services.AddTransient<CapTransactionBase, SqlServerCapTransaction>();
AddSqlServerOptions(services);
}
......
......@@ -4,26 +4,26 @@
using System;
using System.Data;
using System.Data.SqlClient;
using System.Threading;
using System.Threading.Tasks;
using Dapper;
using DotNetCore.CAP.Abstractions;
using DotNetCore.CAP.Models;
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Storage;
using Microsoft.Extensions.Logging;
namespace DotNetCore.CAP.SqlServer
{
public class CapPublisher : CapPublisherBase, ICallbackPublisher
public class SqlServerPublisher : CapPublisherBase, ICallbackPublisher
{
private readonly DbContext _dbContext;
private readonly SqlServerOptions _options;
private readonly bool _isUsingEF;
public CapPublisher(ILogger<CapPublisher> logger, IDispatcher dispatcher,
IServiceProvider provider, SqlServerOptions options)
: base(logger, dispatcher)
private SqlConnection _connection;
public SqlServerPublisher(IServiceProvider provider, SqlServerOptions options) : base(provider)
{
ServiceProvider = provider;
_options = options;
if (_options.DbContextType == null)
......@@ -31,57 +31,63 @@ namespace DotNetCore.CAP.SqlServer
return;
}
IsUsingEF = true;
_dbContext = (DbContext) ServiceProvider.GetService(_options.DbContextType);
_isUsingEF = true;
_dbContext = (DbContext)ServiceProvider.GetService(_options.DbContextType);
}
public async Task PublishCallbackAsync(CapPublishedMessage message)
{
using (var conn = new SqlConnection(_options.ConnectionString))
await PublishAsyncInternal(message);
}
protected override Task ExecuteAsync(CapPublishedMessage message, ICapTransaction transaction,
CancellationToken cancel = default(CancellationToken))
{
var dbTrans = transaction.DbTransaction as IDbTransaction;
if (dbTrans == null && transaction.DbTransaction is IDbContextTransaction dbContextTrans)
{
var id = await conn.ExecuteScalarAsync<int>(PrepareSql(), message);
message.Id = id;
Enqueue(message);
dbTrans = dbContextTrans.GetDbTransaction();
}
var conn = dbTrans?.Connection;
return conn.ExecuteAsync(PrepareSql(), message, dbTrans);
}
protected override void PrepareConnectionForEF()
protected override object GetDbTransaction()
{
if (_isUsingEF)
{
DbConnection = _dbContext.Database.GetDbConnection();
var dbContextTransaction = _dbContext.Database.CurrentTransaction;
var dbTrans = dbContextTransaction?.GetDbTransaction();
//DbTransaction is dispose in original
if (dbTrans?.Connection == null)
if (dbContextTransaction == null)
{
IsCapOpenedTrans = true;
dbContextTransaction?.Dispose();
dbContextTransaction = _dbContext.Database.BeginTransaction(IsolationLevel.ReadCommitted);
dbTrans = dbContextTransaction.GetDbTransaction();
}
DbTransaction = dbTrans;
return InitDbConnection();
}
protected override int Execute(IDbConnection dbConnection, IDbTransaction dbTransaction,
CapPublishedMessage message)
{
return dbConnection.ExecuteScalar<int>(PrepareSql(), message, dbTransaction);
return dbContextTransaction;
}
protected override Task<int> ExecuteAsync(IDbConnection dbConnection, IDbTransaction dbTransaction,
CapPublishedMessage message)
{
return dbConnection.ExecuteScalarAsync<int>(PrepareSql(), message, dbTransaction);
return InitDbConnection();
}
#region private methods
private string PrepareSql()
{
return
$"INSERT INTO {_options.Schema}.[Published] ([Name],[Content],[Retries],[Added],[ExpiresAt],[StatusName])VALUES(@Name,@Content,@Retries,@Added,@ExpiresAt,@StatusName);SELECT SCOPE_IDENTITY();";
$"INSERT INTO {_options.Schema}.[Published] ([Id],[Name],[Content],[Retries],[Added],[ExpiresAt],[StatusName])VALUES(@Id,@Name,@Content,@Retries,@Added,@ExpiresAt,@StatusName);";
}
private IDbTransaction InitDbConnection()
{
_connection = new SqlConnection(_options.ConnectionString);
_connection.Open();
return _connection.BeginTransaction(IsolationLevel.ReadCommitted);
}
#endregion private methods
public void Dispose()
{
_dbContext?.Dispose();
_connection?.Dispose();
}
}
}
\ No newline at end of file
using System.Data;
using System.Diagnostics;
using Microsoft.EntityFrameworkCore.Storage;
// ReSharper disable once CheckNamespace
namespace DotNetCore.CAP
{
public class SqlServerCapTransaction : CapTransactionBase
{
public SqlServerCapTransaction(IDispatcher dispatcher) : base(dispatcher) { }
public override void Commit()
{
Debug.Assert(DbTransaction != null);
switch (DbTransaction)
{
case IDbTransaction dbTransaction:
dbTransaction.Commit();
break;
case IDbContextTransaction dbContextTransaction:
dbContextTransaction.Commit();
break;
}
Flush();
}
public override void Rollback()
{
Debug.Assert(DbTransaction != null);
switch (DbTransaction)
{
case IDbTransaction dbTransaction:
dbTransaction.Rollback();
break;
case IDbContextTransaction dbContextTransaction:
dbContextTransaction.Rollback();
break;
}
}
public override void Dispose()
{
(DbTransaction as IDbTransaction)?.Dispose();
}
}
public static class CapTransactionExtensions
{
public static ICapTransaction Begin(this ICapTransaction transaction,
IDbTransaction dbTransaction, bool autoCommit = false)
{
transaction.DbTransaction = dbTransaction;
transaction.AutoCommit = autoCommit;
return transaction;
}
public static ICapTransaction Begin(this ICapTransaction transaction,
IDbContextTransaction dbTransaction, bool autoCommit = false)
{
transaction.DbTransaction = dbTransaction;
transaction.AutoCommit = autoCommit;
return transaction;
}
}
}
......@@ -38,7 +38,7 @@ namespace DotNetCore.CAP.SqlServer
return new SqlServerMonitoringApi(this, _options);
}
public async Task InitializeAsync(CancellationToken cancellationToken)
public async Task InitializeAsync(CancellationToken cancellationToken = default(CancellationToken))
{
if (cancellationToken.IsCancellationRequested)
{
......@@ -64,15 +64,10 @@ BEGIN
EXEC('CREATE SCHEMA [{schema}]')
END;
IF OBJECT_ID(N'[{schema}].[Queue]',N'U') IS NOT NULL
BEGIN
DROP TABLE [{schema}].[Queue];
END;
IF OBJECT_ID(N'[{schema}].[Received]',N'U') IS NULL
BEGIN
CREATE TABLE [{schema}].[Received](
[Id] [int] IDENTITY(1,1) NOT NULL,
[Id] [bigint] NOT NULL,
[Name] [nvarchar](200) NOT NULL,
[Group] [nvarchar](200) NULL,
[Content] [nvarchar](max) NULL,
......@@ -90,7 +85,7 @@ END;
IF OBJECT_ID(N'[{schema}].[Published]',N'U') IS NULL
BEGIN
CREATE TABLE [{schema}].[Published](
[Id] [int] IDENTITY(1,1) NOT NULL,
[Id] [bigint] NOT NULL,
[Name] [nvarchar](200) NOT NULL,
[Content] [nvarchar](max) NULL,
[Retries] [int] NOT NULL,
......
......@@ -28,7 +28,7 @@ namespace DotNetCore.CAP.SqlServer
return new SqlServerStorageTransaction(this);
}
public async Task<CapPublishedMessage> GetPublishedMessageAsync(int id)
public async Task<CapPublishedMessage> GetPublishedMessageAsync(long id)
{
var sql = $@"SELECT * FROM [{Options.Schema}].[Published] WITH (readpast) WHERE Id={id}";
......@@ -50,7 +50,7 @@ namespace DotNetCore.CAP.SqlServer
}
}
public async Task<int> StoreReceivedMessageAsync(CapReceivedMessage message)
public void StoreReceivedMessage(CapReceivedMessage message)
{
if (message == null)
{
......@@ -58,16 +58,16 @@ namespace DotNetCore.CAP.SqlServer
}
var sql = $@"
INSERT INTO [{Options.Schema}].[Received]([Name],[Group],[Content],[Retries],[Added],[ExpiresAt],[StatusName])
VALUES(@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName);SELECT SCOPE_IDENTITY();";
INSERT INTO [{Options.Schema}].[Received]([Id],[Name],[Group],[Content],[Retries],[Added],[ExpiresAt],[StatusName])
VALUES(@Id,@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName);";
using (var connection = new SqlConnection(Options.ConnectionString))
{
return await connection.ExecuteScalarAsync<int>(sql, message);
connection.Execute(sql, message);
}
}
public async Task<CapReceivedMessage> GetReceivedMessageAsync(int id)
public async Task<CapReceivedMessage> GetReceivedMessageAsync(long id)
{
var sql = $@"SELECT * FROM [{Options.Schema}].[Received] WITH (readpast) WHERE Id={id}";
using (var connection = new SqlConnection(Options.ConnectionString))
......@@ -87,7 +87,7 @@ VALUES(@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName);SELECT SCOP
}
}
public bool ChangePublishedState(int messageId, string state)
public bool ChangePublishedState(long messageId, string state)
{
var sql =
$"UPDATE [{Options.Schema}].[Published] SET Retries=Retries+1,ExpiresAt=NULL,StatusName = '{state}' WHERE Id={messageId}";
......@@ -98,7 +98,7 @@ VALUES(@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName);SELECT SCOP
}
}
public bool ChangeReceivedState(int messageId, string state)
public bool ChangeReceivedState(long messageId, string state)
{
var sql =
$"UPDATE [{Options.Schema}].[Received] SET Retries=Retries+1,ExpiresAt=NULL,StatusName = '{state}' WHERE Id={messageId}";
......@@ -108,9 +108,5 @@ VALUES(@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName);SELECT SCOP
return connection.Execute(sql) > 0;
}
}
public void Dispose()
{
}
}
}
\ No newline at end of file
......@@ -62,29 +62,5 @@ namespace DotNetCore.CAP.SqlServer
_dbTransaction.Dispose();
_dbConnection.Dispose();
}
public void EnqueueMessage(CapPublishedMessage message)
{
if (message == null)
{
throw new ArgumentNullException(nameof(message));
}
var sql = $"INSERT INTO [{_schema}].[Queue] values(@MessageId,@MessageType);";
_dbConnection.Execute(sql, new CapQueue {MessageId = message.Id, MessageType = MessageType.Publish},
_dbTransaction);
}
public void EnqueueMessage(CapReceivedMessage message)
{
if (message == null)
{
throw new ArgumentNullException(nameof(message));
}
var sql = $"INSERT INTO [{_schema}].[Queue] values(@MessageId,@MessageType);";
_dbConnection.Execute(sql, new CapQueue {MessageId = message.Id, MessageType = MessageType.Subscribe},
_dbTransaction);
}
}
}
\ No newline at end of file
using System;
using System.Data;
using System.Data.SqlClient;
using System.Threading;
using Dapper;
using Microsoft.Extensions.Logging;
using Moq;
namespace DotNetCore.CAP.SqlServer.Test
{
public abstract class DatabaseTestHost : TestHost
public abstract class DatabaseTestHost:IDisposable
{
private static bool _sqlObjectInstalled;
public static object _lock = new object();
protected ILogger<SqlServerStorage> Logger;
protected CapOptions CapOptions;
protected SqlServerOptions SqlSeverOptions;
protected override void PostBuildServices()
{
base.PostBuildServices();
lock (_lock)
{
if (!_sqlObjectInstalled)
public bool SqlObjectInstalled;
protected DatabaseTestHost()
{
Logger = new Mock<ILogger<SqlServerStorage>>().Object;
CapOptions = new Mock<CapOptions>().Object;
SqlSeverOptions = new Mock<SqlServerOptions>()
.SetupProperty(x => x.ConnectionString, ConnectionUtil.GetConnectionString())
.Object;
InitializeDatabase();
}
}
}
public override void Dispose()
public void Dispose()
{
DeleteAllData();
base.Dispose();
}
private void InitializeDatabase()
{
using (CreateScope())
{
var storage = GetService<SqlServerStorage>();
var token = new CancellationTokenSource().Token;
CreateDatabase();
storage.InitializeAsync(token).GetAwaiter().GetResult();
_sqlObjectInstalled = true;
}
}
private void CreateDatabase()
{
var masterConn = ConnectionUtil.GetMasterConnectionString();
var databaseName = ConnectionUtil.GetDatabaseName();
......@@ -50,8 +41,12 @@ namespace DotNetCore.CAP.SqlServer.Test
IF NOT EXISTS (SELECT * FROM sysdatabases WHERE name = N'{databaseName}')
CREATE DATABASE [{databaseName}];");
}
new SqlServerStorage(Logger, CapOptions, SqlSeverOptions).InitializeAsync().GetAwaiter().GetResult();
SqlObjectInstalled = true;
}
private void DeleteAllData()
{
var conn = ConnectionUtil.GetConnectionString();
......
......@@ -10,30 +10,31 @@ namespace DotNetCore.CAP.SqlServer.Test
[Collection("sqlserver")]
public class SqlServerStorageConnectionTest : DatabaseTestHost
{
private SqlServerStorageConnection _storage;
private readonly SqlServerStorageConnection _storage;
public SqlServerStorageConnectionTest()
{
var options = GetService<SqlServerOptions>();
var capOptions = GetService<CapOptions>();
_storage = new SqlServerStorageConnection(options, capOptions);
_storage = new SqlServerStorageConnection(SqlSeverOptions, CapOptions);
}
[Fact]
public async Task GetPublishedMessageAsync_Test()
{
var sql = "INSERT INTO [Cap].[Published]([Name],[Content],[Retries],[Added],[ExpiresAt],[StatusName]) OUTPUT INSERTED.Id VALUES(@Name,@Content,@Retries,@Added,@ExpiresAt,@StatusName);";
var sql = "INSERT INTO [Cap].[Published]([Id],[Name],[Content],[Retries],[Added],[ExpiresAt],[StatusName]) VALUES(@Id,@Name,@Content,@Retries,@Added,@ExpiresAt,@StatusName);";
var insertedId = SnowflakeId.Default().NextId();
var publishMessage = new CapPublishedMessage
{
Id= insertedId,
Name = "SqlServerStorageConnectionTest",
Content = "",
StatusName = StatusName.Scheduled
};
var insertedId = default(int);
using (var connection = ConnectionUtil.CreateConnection())
{
insertedId = connection.QueryFirst<int>(sql, publishMessage);
await connection.ExecuteAsync(sql, publishMessage);
}
var message = await _storage.GetPublishedMessageAsync(insertedId);
Assert.NotNull(message);
Assert.Equal("SqlServerStorageConnectionTest", message.Name);
......@@ -41,7 +42,7 @@ namespace DotNetCore.CAP.SqlServer.Test
}
[Fact]
public async Task StoreReceivedMessageAsync_Test()
public void StoreReceivedMessageAsync_Test()
{
var receivedMessage = new CapReceivedMessage
{
......@@ -54,7 +55,7 @@ namespace DotNetCore.CAP.SqlServer.Test
Exception exception = null;
try
{
await _storage.StoreReceivedMessageAsync(receivedMessage);
_storage.StoreReceivedMessage(receivedMessage);
}
catch (Exception ex)
{
......@@ -66,20 +67,20 @@ namespace DotNetCore.CAP.SqlServer.Test
[Fact]
public async Task GetReceivedMessageAsync_Test()
{
var sql = $@"
INSERT INTO [Cap].[Received]([Name],[Group],[Content],[Retries],[Added],[ExpiresAt],[StatusName]) OUTPUT INSERTED.Id
VALUES(@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName);";
var sql = @"INSERT INTO [Cap].[Received]([Id],[Name],[Group],[Content],[Retries],[Added],[ExpiresAt],[StatusName]) VALUES(@Id,@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName);";
var insertedId = SnowflakeId.Default().NextId();
var receivedMessage = new CapReceivedMessage
{
Id= insertedId,
Name = "SqlServerStorageConnectionTest",
Content = "",
Group = "mygroup",
StatusName = StatusName.Scheduled
};
var insertedId = default(int);
using (var connection = ConnectionUtil.CreateConnection())
{
insertedId = connection.QueryFirst<int>(sql, receivedMessage);
await connection.ExecuteAsync(sql, receivedMessage);
}
var message = await _storage.GetReceivedMessageAsync(insertedId);
......
using System;
using Microsoft.Extensions.DependencyInjection;
namespace DotNetCore.CAP.SqlServer.Test
{
public abstract class TestHost : IDisposable
{
protected IServiceCollection _services;
protected string _connectionString;
private IServiceProvider _provider;
private IServiceProvider _scopedProvider;
public TestHost()
{
CreateServiceCollection();
PreBuildServices();
BuildServices();
PostBuildServices();
}
protected IServiceProvider Provider => _scopedProvider ?? _provider;
private void CreateServiceCollection()
{
var services = new ServiceCollection();
services.AddOptions();
services.AddLogging();
_connectionString = ConnectionUtil.GetConnectionString();
services.AddSingleton(new SqlServerOptions { ConnectionString = _connectionString });
services.AddSingleton(new CapOptions());
services.AddSingleton<SqlServerStorage>();
_services = services;
}
protected virtual void PreBuildServices()
{
}
private void BuildServices()
{
_provider = _services.BuildServiceProvider();
}
protected virtual void PostBuildServices()
{
}
public IDisposable CreateScope()
{
var scope = CreateScope(_provider);
var loc = scope.ServiceProvider;
_scopedProvider = loc;
return new DelegateDisposable(() =>
{
if (_scopedProvider == loc)
{
_scopedProvider = null;
}
scope.Dispose();
});
}
public IServiceScope CreateScope(IServiceProvider provider)
{
var scope = provider.GetService<IServiceScopeFactory>().CreateScope();
return scope;
}
public T GetService<T>() => Provider.GetService<T>();
public T Ensure<T>(ref T service)
where T : class
=> service ?? (service = GetService<T>());
public virtual void Dispose()
{
(_provider as IDisposable)?.Dispose();
}
private class DelegateDisposable : IDisposable
{
private Action _dispose;
public DelegateDisposable(Action dispose)
{
_dispose = dispose;
}
public void Dispose()
{
_dispose();
}
}
}
}
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment