Commit c69d1f6f authored by Savorboard's avatar Savorboard

Refactor postgresql module for new transaction mode

parent 5fe7e5b2
......@@ -24,9 +24,12 @@ namespace DotNetCore.CAP
services.AddSingleton<CapDatabaseStorageMarkerService>();
services.AddSingleton<IStorage, PostgreSqlStorage>();
services.AddSingleton<IStorageConnection, PostgreSqlStorageConnection>();
services.AddScoped<ICapPublisher, CapPublisher>();
services.AddScoped<ICallbackPublisher, CapPublisher>();
services.AddScoped<ICapPublisher, PostgreSqlPublisher>();
services.AddScoped<ICallbackPublisher, PostgreSqlPublisher>();
services.AddTransient<ICollectProcessor, PostgreSqlCollectProcessor>();
services.AddTransient<CapTransactionBase, PostgreSqlCapTransaction>();
AddSingletonPostgreSqlOptions(services);
}
......
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
// ReSharper disable once CheckNamespace
namespace DotNetCore.CAP
{
public class PostgreSqlOptions : EFOptions
......
......@@ -3,73 +3,69 @@
using System;
using System.Data;
using System.Threading;
using System.Threading.Tasks;
using Dapper;
using DotNetCore.CAP.Abstractions;
using DotNetCore.CAP.Models;
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Storage;
using Microsoft.Extensions.Logging;
using Npgsql;
namespace DotNetCore.CAP.PostgreSql
{
public class CapPublisher : CapPublisherBase, ICallbackPublisher
public class PostgreSqlPublisher : CapPublisherBase, ICallbackPublisher
{
private readonly DbContext _dbContext;
private readonly PostgreSqlOptions _options;
private readonly bool _isUsingEF;
public CapPublisher(ILogger<CapPublisher> logger, IDispatcher dispatcher,
IServiceProvider provider, PostgreSqlOptions options)
: base(logger, dispatcher)
private NpgsqlConnection _connection;
public PostgreSqlPublisher(IServiceProvider provider, PostgreSqlOptions options): base(provider)
{
ServiceProvider = provider;
_options = options;
if (_options.DbContextType != null)
if (_options.DbContextType == null)
{
IsUsingEF = true;
_dbContext = (DbContext) ServiceProvider.GetService(_options.DbContextType);
return;
}
_isUsingEF = true;
_dbContext = (DbContext)ServiceProvider.GetService(_options.DbContextType);
}
public async Task PublishCallbackAsync(CapPublishedMessage message)
{
using (var conn = new NpgsqlConnection(_options.ConnectionString))
{
var id = await conn.ExecuteScalarAsync<int>(PrepareSql(), message);
message.Id = id;
Enqueue(message);
}
await PublishAsyncInternal(message);
}
protected override void PrepareConnectionForEF()
protected override Task ExecuteAsync(CapPublishedMessage message, ICapTransaction transaction,
CancellationToken cancel = default(CancellationToken))
{
DbConnection = _dbContext.Database.GetDbConnection();
var dbContextTransaction = _dbContext.Database.CurrentTransaction;
var dbTrans = dbContextTransaction?.GetDbTransaction();
//DbTransaction is dispose in original
if (dbTrans?.Connection == null)
var dbTrans = transaction.DbTransaction as IDbTransaction;
if (dbTrans == null && transaction.DbTransaction is IDbContextTransaction dbContextTrans)
{
IsCapOpenedTrans = true;
dbContextTransaction?.Dispose();
dbContextTransaction = _dbContext.Database.BeginTransaction(IsolationLevel.ReadCommitted);
dbTrans = dbContextTransaction.GetDbTransaction();
dbTrans = dbContextTrans.GetDbTransaction();
}
DbTransaction = dbTrans;
var conn = dbTrans?.Connection;
return conn.ExecuteAsync(PrepareSql(), message, dbTrans);
}
protected override int Execute(IDbConnection dbConnection, IDbTransaction dbTransaction,
CapPublishedMessage message)
protected override object GetDbTransaction()
{
return dbConnection.ExecuteScalar<int>(PrepareSql(), message, dbTransaction);
}
if (_isUsingEF)
{
var dbContextTransaction = _dbContext.Database.CurrentTransaction;
if (dbContextTransaction == null)
{
return InitDbConnection();
}
protected override Task<int> ExecuteAsync(IDbConnection dbConnection, IDbTransaction dbTransaction,
CapPublishedMessage message)
{
return dbConnection.ExecuteScalarAsync<int>(PrepareSql(), message, dbTransaction);
return dbContextTransaction;
}
return InitDbConnection();
}
#region private methods
......@@ -77,9 +73,21 @@ namespace DotNetCore.CAP.PostgreSql
private string PrepareSql()
{
return
$"INSERT INTO \"{_options.Schema}\".\"published\" (\"Name\",\"Content\",\"Retries\",\"Added\",\"ExpiresAt\",\"StatusName\")VALUES(@Name,@Content,@Retries,@Added,@ExpiresAt,@StatusName) RETURNING \"Id\";";
$"INSERT INTO \"{_options.Schema}\".\"published\" (\"Id\",\"Name\",\"Content\",\"Retries\",\"Added\",\"ExpiresAt\",\"StatusName\")VALUES(@Id,@Name,@Content,@Retries,@Added,@ExpiresAt,@StatusName);";
}
private IDbTransaction InitDbConnection()
{
_connection = new NpgsqlConnection(_options.ConnectionString);
_connection.Open();
return _connection.BeginTransaction(IsolationLevel.ReadCommitted);
}
#endregion private methods
public void Dispose()
{
_dbContext?.Dispose();
_connection?.Dispose();
}
}
}
\ No newline at end of file
using System.Data;
using System.Diagnostics;
using Microsoft.EntityFrameworkCore.Storage;
// ReSharper disable once CheckNamespace
namespace DotNetCore.CAP
{
public class PostgreSqlCapTransaction : CapTransactionBase
{
public PostgreSqlCapTransaction(IDispatcher dispatcher) : base(dispatcher) { }
public override void Commit()
{
Debug.Assert(DbTransaction != null);
switch (DbTransaction)
{
case IDbTransaction dbTransaction:
dbTransaction.Commit();
break;
case IDbContextTransaction dbContextTransaction:
dbContextTransaction.Commit();
break;
}
Flush();
}
public override void Rollback()
{
Debug.Assert(DbTransaction != null);
switch (DbTransaction)
{
case IDbTransaction dbTransaction:
dbTransaction.Rollback();
break;
case IDbContextTransaction dbContextTransaction:
dbContextTransaction.Rollback();
break;
}
}
public override void Dispose()
{
(DbTransaction as IDbTransaction)?.Dispose();
}
}
public static class CapTransactionExtensions
{
public static ICapTransaction Begin(this ICapTransaction transaction,
IDbTransaction dbTransaction, bool autoCommit = false)
{
transaction.DbTransaction = dbTransaction;
transaction.AutoCommit = autoCommit;
return transaction;
}
public static ICapTransaction Begin(this ICapTransaction transaction,
IDbContextTransaction dbTransaction, bool autoCommit = false)
{
transaction.DbTransaction = dbTransaction;
transaction.AutoCommit = autoCommit;
return transaction;
}
}
}
......@@ -100,10 +100,8 @@ namespace DotNetCore.CAP.PostgreSql
var batchSql = $@"
CREATE SCHEMA IF NOT EXISTS ""{schema}"";
DROP TABLE IF EXISTS ""{schema}"".""queue"";
CREATE TABLE IF NOT EXISTS ""{schema}"".""received""(
""Id"" SERIAL PRIMARY KEY NOT NULL,
""Id"" BIGINT PRIMARY KEY NOT NULL,
""Name"" VARCHAR(200) NOT NULL,
""Group"" VARCHAR(200) NULL,
""Content"" TEXT NULL,
......@@ -114,7 +112,7 @@ CREATE TABLE IF NOT EXISTS ""{schema}"".""received""(
);
CREATE TABLE IF NOT EXISTS ""{schema}"".""published""(
""Id"" SERIAL PRIMARY KEY NOT NULL,
""Id"" BIGINT PRIMARY KEY NOT NULL,
""Name"" VARCHAR(200) NOT NULL,
""Content"" TEXT NULL,
""Retries"" INT NOT NULL,
......
......@@ -28,7 +28,7 @@ namespace DotNetCore.CAP.PostgreSql
return new PostgreSqlStorageTransaction(this);
}
public async Task<CapPublishedMessage> GetPublishedMessageAsync(int id)
public async Task<CapPublishedMessage> GetPublishedMessageAsync(long id)
{
var sql = $"SELECT * FROM \"{Options.Schema}\".\"published\" WHERE \"Id\"={id} FOR UPDATE SKIP LOCKED";
......@@ -50,7 +50,7 @@ namespace DotNetCore.CAP.PostgreSql
}
}
public async Task<int> StoreReceivedMessageAsync(CapReceivedMessage message)
public void StoreReceivedMessage(CapReceivedMessage message)
{
if (message == null)
{
......@@ -58,15 +58,15 @@ namespace DotNetCore.CAP.PostgreSql
}
var sql =
$"INSERT INTO \"{Options.Schema}\".\"received\"(\"Name\",\"Group\",\"Content\",\"Retries\",\"Added\",\"ExpiresAt\",\"StatusName\")VALUES(@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName) RETURNING \"Id\";";
$"INSERT INTO \"{Options.Schema}\".\"received\"(\"Id\",\"Name\",\"Group\",\"Content\",\"Retries\",\"Added\",\"ExpiresAt\",\"StatusName\")VALUES(@Id,@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName) RETURNING \"Id\";";
using (var connection = new NpgsqlConnection(Options.ConnectionString))
{
return await connection.ExecuteScalarAsync<int>(sql, message);
connection.Execute(sql, message);
}
}
public async Task<CapReceivedMessage> GetReceivedMessageAsync(int id)
public async Task<CapReceivedMessage> GetReceivedMessageAsync(long id)
{
var sql = $"SELECT * FROM \"{Options.Schema}\".\"received\" WHERE \"Id\"={id} FOR UPDATE SKIP LOCKED";
using (var connection = new NpgsqlConnection(Options.ConnectionString))
......@@ -90,7 +90,7 @@ namespace DotNetCore.CAP.PostgreSql
{
}
public bool ChangePublishedState(int messageId, string state)
public bool ChangePublishedState(long messageId, string state)
{
var sql =
$"UPDATE \"{Options.Schema}\".\"published\" SET \"Retries\"=\"Retries\"+1,\"ExpiresAt\"=NULL,\"StatusName\" = '{state}' WHERE \"Id\"={messageId}";
......@@ -101,7 +101,7 @@ namespace DotNetCore.CAP.PostgreSql
}
}
public bool ChangeReceivedState(int messageId, string state)
public bool ChangeReceivedState(long messageId, string state)
{
var sql =
$"UPDATE \"{Options.Schema}\".\"received\" SET \"Retries\"=\"Retries\"+1,\"ExpiresAt\"=NULL,\"StatusName\" = '{state}' WHERE \"Id\"={messageId}";
......
......@@ -35,9 +35,7 @@ namespace DotNetCore.CAP.PostgreSql
}
var sql =
$@"UPDATE ""{
_schema
}"".""published"" SET ""Retries""=@Retries,""Content""= @Content,""ExpiresAt""=@ExpiresAt,""StatusName""=@StatusName WHERE ""Id""=@Id;";
$@"UPDATE ""{_schema}"".""published"" SET ""Retries""=@Retries,""Content""= @Content,""ExpiresAt""=@ExpiresAt,""StatusName""=@StatusName WHERE ""Id""=@Id;";
_dbConnection.Execute(sql, message, _dbTransaction);
}
......@@ -66,29 +64,5 @@ namespace DotNetCore.CAP.PostgreSql
_dbTransaction.Dispose();
_dbConnection.Dispose();
}
public void EnqueueMessage(CapPublishedMessage message)
{
if (message == null)
{
throw new ArgumentNullException(nameof(message));
}
var sql = $@"INSERT INTO ""{_schema}"".""queue"" values(@MessageId,@MessageType);";
_dbConnection.Execute(sql, new CapQueue {MessageId = message.Id, MessageType = MessageType.Publish},
_dbTransaction);
}
public void EnqueueMessage(CapReceivedMessage message)
{
if (message == null)
{
throw new ArgumentNullException(nameof(message));
}
var sql = $@"INSERT INTO ""{_schema}"".""queue"" values(@MessageId,@MessageType);";
_dbConnection.Execute(sql, new CapQueue {MessageId = message.Id, MessageType = MessageType.Subscribe},
_dbTransaction);
}
}
}
\ No newline at end of file
......@@ -22,17 +22,18 @@ namespace DotNetCore.CAP.PostgreSql.Test
[Fact]
public async Task GetPublishedMessageAsync_Test()
{
var sql = @"INSERT INTO ""cap"".""published""(""Name"",""Content"",""Retries"",""Added"",""ExpiresAt"",""StatusName"") VALUES(@Name,@Content,@Retries,@Added,@ExpiresAt,@StatusName) RETURNING ""Id"";";
var sql = @"INSERT INTO ""cap"".""published""(""Id"",""Name"",""Content"",""Retries"",""Added"",""ExpiresAt"",""StatusName"") VALUES(@Id,@Name,@Content,@Retries,@Added,@ExpiresAt,@StatusName);";
var insertedId = SnowflakeId.Default().NextId();
var publishMessage = new CapPublishedMessage
{
Id = insertedId,
Name = "PostgreSqlStorageConnectionTest",
Content = "",
StatusName = StatusName.Scheduled
};
var insertedId = default(int);
using (var connection = ConnectionUtil.CreateConnection())
{
insertedId = connection.QueryFirst<int>(sql, publishMessage);
await connection.ExecuteAsync(sql, publishMessage);
}
var message = await _storage.GetPublishedMessageAsync(insertedId);
Assert.NotNull(message);
......@@ -41,10 +42,11 @@ namespace DotNetCore.CAP.PostgreSql.Test
}
[Fact]
public async Task StoreReceivedMessageAsync_Test()
public void StoreReceivedMessageAsync_Test()
{
var receivedMessage = new CapReceivedMessage
{
Id = SnowflakeId.Default().NextId(),
Name = "PostgreSqlStorageConnectionTest",
Content = "",
Group = "mygroup",
......@@ -54,7 +56,7 @@ namespace DotNetCore.CAP.PostgreSql.Test
Exception exception = null;
try
{
await _storage.StoreReceivedMessageAsync(receivedMessage);
_storage.StoreReceivedMessage(receivedMessage);
}
catch (Exception ex)
{
......@@ -67,19 +69,21 @@ namespace DotNetCore.CAP.PostgreSql.Test
public async Task GetReceivedMessageAsync_Test()
{
var sql = $@"
INSERT INTO ""cap"".""received""(""Name"",""Group"",""Content"",""Retries"",""Added"",""ExpiresAt"",""StatusName"")
VALUES(@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName) RETURNING ""Id"";";
INSERT INTO ""cap"".""received""(""Id"",""Name"",""Group"",""Content"",""Retries"",""Added"",""ExpiresAt"",""StatusName"")
VALUES(@Id,@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName);";
var insertedId = SnowflakeId.Default().NextId();
var receivedMessage = new CapReceivedMessage
{
Id= insertedId,
Name = "PostgreSqlStorageConnectionTest",
Content = "",
Group = "mygroup",
StatusName = StatusName.Scheduled
};
var insertedId = default(int);
using (var connection = ConnectionUtil.CreateConnection())
{
insertedId = connection.QueryFirst<int>(sql, receivedMessage);
await connection.ExecuteAsync(sql, receivedMessage);
}
var message = await _storage.GetReceivedMessageAsync(insertedId);
......
......@@ -6,13 +6,11 @@ namespace DotNetCore.CAP.PostgreSql.Test
[Collection("postgresql")]
public class SqlServerStorageTest : DatabaseTestHost
{
private readonly string _dbName;
private readonly string _masterDbConnectionString;
private readonly string _dbConnectionString;
public SqlServerStorageTest()
{
_dbName = ConnectionUtil.GetDatabaseName();
_masterDbConnectionString = ConnectionUtil.GetMasterConnectionString();
_dbConnectionString = ConnectionUtil.GetConnectionString();
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment