Commit cbc145b7 authored by Savorboard's avatar Savorboard

Adjustments do not use transactions internally when a transaction is not...

Adjustments do not use transactions internally when a transaction is not available to publish a message
parent 7d1fe75b
...@@ -8,31 +8,19 @@ using System.Threading.Tasks; ...@@ -8,31 +8,19 @@ using System.Threading.Tasks;
using Dapper; using Dapper;
using DotNetCore.CAP.Abstractions; using DotNetCore.CAP.Abstractions;
using DotNetCore.CAP.Models; using DotNetCore.CAP.Models;
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Storage; using Microsoft.EntityFrameworkCore.Storage;
using Microsoft.Extensions.DependencyInjection;
using MySql.Data.MySqlClient; using MySql.Data.MySqlClient;
namespace DotNetCore.CAP.MySql namespace DotNetCore.CAP.MySql
{ {
public class MySqlPublisher : CapPublisherBase, ICallbackPublisher, IDisposable public class MySqlPublisher : CapPublisherBase, ICallbackPublisher
{ {
private readonly DbContext _dbContext;
private readonly MySqlOptions _options; private readonly MySqlOptions _options;
private readonly bool _isUsingEF;
private MySqlConnection _connection; public MySqlPublisher(IServiceProvider provider) : base(provider)
public MySqlPublisher(IServiceProvider provider, MySqlOptions options) : base(provider)
{ {
_options = options; _options = provider.GetService<MySqlOptions>();
if (_options.DbContextType == null)
{
return;
}
_isUsingEF = true;
_dbContext = (DbContext)ServiceProvider.GetService(_options.DbContextType);
} }
public async Task PublishCallbackAsync(CapPublishedMessage message) public async Task PublishCallbackAsync(CapPublishedMessage message)
...@@ -40,32 +28,26 @@ namespace DotNetCore.CAP.MySql ...@@ -40,32 +28,26 @@ namespace DotNetCore.CAP.MySql
await PublishAsyncInternal(message); await PublishAsyncInternal(message);
} }
protected override Task ExecuteAsync(CapPublishedMessage message, ICapTransaction transaction, protected override async Task ExecuteAsync(CapPublishedMessage message, ICapTransaction transaction,
CancellationToken cancel = default(CancellationToken)) CancellationToken cancel = default(CancellationToken))
{ {
if (NotUseTransaction)
{
using (var connection = new MySqlConnection(_options.ConnectionString))
{
await connection.OpenAsync(cancel);
await connection.ExecuteAsync(PrepareSql(), message);
return;
}
}
var dbTrans = transaction.DbTransaction as IDbTransaction; var dbTrans = transaction.DbTransaction as IDbTransaction;
if (dbTrans == null && transaction.DbTransaction is IDbContextTransaction dbContextTrans) if (dbTrans == null && transaction.DbTransaction is IDbContextTransaction dbContextTrans)
{ {
dbTrans = dbContextTrans.GetDbTransaction(); dbTrans = dbContextTrans.GetDbTransaction();
} }
var conn = dbTrans?.Connection; var conn = dbTrans?.Connection;
return conn.ExecuteAsync(PrepareSql(), message, dbTrans); await conn.ExecuteAsync(PrepareSql(), message, dbTrans);
}
protected override object GetDbTransaction()
{
if (_isUsingEF)
{
var dbContextTransaction = _dbContext.Database.CurrentTransaction;
if (dbContextTransaction == null)
{
return InitDbConnection();
}
return dbContextTransaction;
}
return InitDbConnection();
} }
#region private methods #region private methods
...@@ -74,21 +56,8 @@ namespace DotNetCore.CAP.MySql ...@@ -74,21 +56,8 @@ namespace DotNetCore.CAP.MySql
{ {
return return
$"INSERT INTO `{_options.TableNamePrefix}.published` (`Id`,`Name`,`Content`,`Retries`,`Added`,`ExpiresAt`,`StatusName`)VALUES(@Id,@Name,@Content,@Retries,@Added,@ExpiresAt,@StatusName);"; $"INSERT INTO `{_options.TableNamePrefix}.published` (`Id`,`Name`,`Content`,`Retries`,`Added`,`ExpiresAt`,`StatusName`)VALUES(@Id,@Name,@Content,@Retries,@Added,@ExpiresAt,@StatusName);";
} }
private IDbTransaction InitDbConnection()
{
_connection = new MySqlConnection(_options.ConnectionString);
_connection.Open();
return _connection.BeginTransaction(IsolationLevel.ReadCommitted);
}
#endregion private methods #endregion private methods
public void Dispose()
{
_dbContext?.Dispose();
_connection?.Dispose();
}
} }
} }
\ No newline at end of file
...@@ -8,31 +8,18 @@ using System.Threading.Tasks; ...@@ -8,31 +8,18 @@ using System.Threading.Tasks;
using Dapper; using Dapper;
using DotNetCore.CAP.Abstractions; using DotNetCore.CAP.Abstractions;
using DotNetCore.CAP.Models; using DotNetCore.CAP.Models;
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Storage; using Microsoft.EntityFrameworkCore.Storage;
using Microsoft.Extensions.DependencyInjection;
using Npgsql; using Npgsql;
namespace DotNetCore.CAP.PostgreSql namespace DotNetCore.CAP.PostgreSql
{ {
public class PostgreSqlPublisher : CapPublisherBase, ICallbackPublisher public class PostgreSqlPublisher : CapPublisherBase, ICallbackPublisher
{ {
private readonly DbContext _dbContext;
private readonly PostgreSqlOptions _options; private readonly PostgreSqlOptions _options;
private readonly bool _isUsingEF; public PostgreSqlPublisher(IServiceProvider provider) : base(provider)
private NpgsqlConnection _connection;
public PostgreSqlPublisher(IServiceProvider provider, PostgreSqlOptions options): base(provider)
{ {
_options = options; _options = provider.GetService< PostgreSqlOptions>();
if (_options.DbContextType == null)
{
return;
}
_isUsingEF = true;
_dbContext = (DbContext)ServiceProvider.GetService(_options.DbContextType);
} }
public async Task PublishCallbackAsync(CapPublishedMessage message) public async Task PublishCallbackAsync(CapPublishedMessage message)
...@@ -43,6 +30,14 @@ namespace DotNetCore.CAP.PostgreSql ...@@ -43,6 +30,14 @@ namespace DotNetCore.CAP.PostgreSql
protected override Task ExecuteAsync(CapPublishedMessage message, ICapTransaction transaction, protected override Task ExecuteAsync(CapPublishedMessage message, ICapTransaction transaction,
CancellationToken cancel = default(CancellationToken)) CancellationToken cancel = default(CancellationToken))
{ {
if (NotUseTransaction)
{
using (var connection = InitDbConnection())
{
return connection.ExecuteAsync(PrepareSql(), message);
}
}
var dbTrans = transaction.DbTransaction as IDbTransaction; var dbTrans = transaction.DbTransaction as IDbTransaction;
if (dbTrans == null && transaction.DbTransaction is IDbContextTransaction dbContextTrans) if (dbTrans == null && transaction.DbTransaction is IDbContextTransaction dbContextTrans)
{ {
...@@ -52,22 +47,6 @@ namespace DotNetCore.CAP.PostgreSql ...@@ -52,22 +47,6 @@ namespace DotNetCore.CAP.PostgreSql
return conn.ExecuteAsync(PrepareSql(), message, dbTrans); return conn.ExecuteAsync(PrepareSql(), message, dbTrans);
} }
protected override object GetDbTransaction()
{
if (_isUsingEF)
{
var dbContextTransaction = _dbContext.Database.CurrentTransaction;
if (dbContextTransaction == null)
{
return InitDbConnection();
}
return dbContextTransaction;
}
return InitDbConnection();
}
#region private methods #region private methods
private string PrepareSql() private string PrepareSql()
...@@ -76,18 +55,12 @@ namespace DotNetCore.CAP.PostgreSql ...@@ -76,18 +55,12 @@ namespace DotNetCore.CAP.PostgreSql
$"INSERT INTO \"{_options.Schema}\".\"published\" (\"Id\",\"Name\",\"Content\",\"Retries\",\"Added\",\"ExpiresAt\",\"StatusName\")VALUES(@Id,@Name,@Content,@Retries,@Added,@ExpiresAt,@StatusName);"; $"INSERT INTO \"{_options.Schema}\".\"published\" (\"Id\",\"Name\",\"Content\",\"Retries\",\"Added\",\"ExpiresAt\",\"StatusName\")VALUES(@Id,@Name,@Content,@Retries,@Added,@ExpiresAt,@StatusName);";
} }
private IDbTransaction InitDbConnection() private IDbConnection InitDbConnection()
{ {
_connection = new NpgsqlConnection(_options.ConnectionString); var conn = new NpgsqlConnection(_options.ConnectionString);
_connection.Open(); conn.Open();
return _connection.BeginTransaction(IsolationLevel.ReadCommitted); return conn;
} }
#endregion private methods #endregion private methods
public void Dispose()
{
_dbContext?.Dispose();
_connection?.Dispose();
}
} }
} }
\ No newline at end of file
...@@ -8,31 +8,20 @@ using System.Threading; ...@@ -8,31 +8,20 @@ using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
using Dapper; using Dapper;
using DotNetCore.CAP.Abstractions; using DotNetCore.CAP.Abstractions;
using DotNetCore.CAP.Internal;
using DotNetCore.CAP.Models; using DotNetCore.CAP.Models;
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Storage; using Microsoft.EntityFrameworkCore.Storage;
using Microsoft.Extensions.DependencyInjection;
namespace DotNetCore.CAP.SqlServer namespace DotNetCore.CAP.SqlServer
{ {
public class SqlServerPublisher : CapPublisherBase, ICallbackPublisher, IDisposable public class SqlServerPublisher : CapPublisherBase, ICallbackPublisher
{ {
private readonly DbContext _dbContext;
private readonly SqlServerOptions _options; private readonly SqlServerOptions _options;
private readonly bool _isUsingEF;
private SqlConnection _connection; public SqlServerPublisher(IServiceProvider provider) : base(provider)
public SqlServerPublisher(IServiceProvider provider, SqlServerOptions options) : base(provider)
{ {
_options = options; _options = ServiceProvider.GetService<SqlServerOptions>();
if (_options.DbContextType == null)
{
return;
}
_isUsingEF = true;
_dbContext = (DbContext)ServiceProvider.GetService(_options.DbContextType);
} }
public async Task PublishCallbackAsync(CapPublishedMessage message) public async Task PublishCallbackAsync(CapPublishedMessage message)
...@@ -40,54 +29,45 @@ namespace DotNetCore.CAP.SqlServer ...@@ -40,54 +29,45 @@ namespace DotNetCore.CAP.SqlServer
await PublishAsyncInternal(message); await PublishAsyncInternal(message);
} }
protected override Task ExecuteAsync(CapPublishedMessage message, ICapTransaction transaction, protected override async Task ExecuteAsync(CapPublishedMessage message, ICapTransaction transaction,
CancellationToken cancel = default(CancellationToken)) CancellationToken cancel = default(CancellationToken))
{ {
var dbTrans = transaction.DbTransaction as IDbTransaction; if (NotUseTransaction)
if (dbTrans == null && transaction.DbTransaction is IDbContextTransaction dbContextTrans)
{ {
dbTrans = dbContextTrans.GetDbTransaction(); using (var connection = new SqlConnection(_options.ConnectionString))
}
var conn = dbTrans?.Connection;
return conn.ExecuteAsync(PrepareSql(), message, dbTrans);
}
protected override object GetDbTransaction()
{
if (_isUsingEF)
{
var dbContextTransaction = _dbContext.Database.CurrentTransaction;
if (dbContextTransaction == null)
{ {
return InitDbConnection(); await connection.OpenAsync(cancel);
await connection.ExecuteAsync(PrepareSql(), message);
return;
} }
}
return dbContextTransaction; var dbTrans = transaction.DbTransaction as IDbTransaction;
if (dbTrans == null && transaction.DbTransaction is IDbContextTransaction dbContextTrans)
{
dbTrans = dbContextTrans.GetDbTransaction();
} }
return InitDbConnection(); var conn = dbTrans?.Connection;
await conn.ExecuteAsync(PrepareSql(), message, dbTrans);
} }
#region private methods #region private methods
private string PrepareSql() private string PrepareSql()
{ {
return return
$"INSERT INTO {_options.Schema}.[Published] ([Id],[Name],[Content],[Retries],[Added],[ExpiresAt],[StatusName])VALUES(@Id,@Name,@Content,@Retries,@Added,@ExpiresAt,@StatusName);"; $"INSERT INTO {_options.Schema}.[Published] ([Id],[Name],[Content],[Retries],[Added],[ExpiresAt],[StatusName])VALUES(@Id,@Name,@Content,@Retries,@Added,@ExpiresAt,@StatusName);";
} }
private IDbTransaction InitDbConnection() //private IDbConnection InitDbConnection()
{ //{
_connection = new SqlConnection(_options.ConnectionString); // var conn = ;
_connection.Open(); // conn.OpenAsync();
return _connection.BeginTransaction(IsolationLevel.ReadCommitted); // return conn;
} //}
#endregion private methods #endregion private methods
public void Dispose()
{
_dbContext?.Dispose();
_connection?.Dispose();
}
} }
} }
\ No newline at end of file
...@@ -5,6 +5,7 @@ using System; ...@@ -5,6 +5,7 @@ using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.Data; using System.Data;
using System.Data.SqlClient; using System.Data.SqlClient;
using DotNetCore.CAP.Internal;
using DotNetCore.CAP.Models; using DotNetCore.CAP.Models;
using DotNetCore.CAP.SqlServer.Diagnostics; using DotNetCore.CAP.SqlServer.Diagnostics;
using Microsoft.EntityFrameworkCore; using Microsoft.EntityFrameworkCore;
...@@ -34,6 +35,12 @@ namespace DotNetCore.CAP ...@@ -34,6 +35,12 @@ namespace DotNetCore.CAP
protected override void AddToSent(CapPublishedMessage msg) protected override void AddToSent(CapPublishedMessage msg)
{ {
if (DbTransaction is NoopTransaction)
{
base.AddToSent(msg);
return;
}
var dbTransaction = DbTransaction as IDbTransaction; var dbTransaction = DbTransaction as IDbTransaction;
if (dbTransaction == null) if (dbTransaction == null)
{ {
...@@ -64,6 +71,9 @@ namespace DotNetCore.CAP ...@@ -64,6 +71,9 @@ namespace DotNetCore.CAP
{ {
switch (DbTransaction) switch (DbTransaction)
{ {
case NoopTransaction _:
Flush();
break;
case IDbTransaction dbTransaction: case IDbTransaction dbTransaction:
dbTransaction.Commit(); dbTransaction.Commit();
break; break;
......
...@@ -7,6 +7,7 @@ using System.Threading; ...@@ -7,6 +7,7 @@ using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
using DotNetCore.CAP.Diagnostics; using DotNetCore.CAP.Diagnostics;
using DotNetCore.CAP.Infrastructure; using DotNetCore.CAP.Infrastructure;
using DotNetCore.CAP.Internal;
using DotNetCore.CAP.Models; using DotNetCore.CAP.Models;
using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.DependencyInjection;
...@@ -68,7 +69,7 @@ namespace DotNetCore.CAP.Abstractions ...@@ -68,7 +69,7 @@ namespace DotNetCore.CAP.Abstractions
if (Transaction.DbTransaction == null) if (Transaction.DbTransaction == null)
{ {
NotUseTransaction = true; NotUseTransaction = true;
Transaction.DbTransaction = GetDbTransaction(); Transaction.DbTransaction = new NoopTransaction();
} }
Guid operationId = default(Guid); Guid operationId = default(Guid);
...@@ -103,8 +104,6 @@ namespace DotNetCore.CAP.Abstractions ...@@ -103,8 +104,6 @@ namespace DotNetCore.CAP.Abstractions
} }
} }
protected abstract object GetDbTransaction();
protected abstract Task ExecuteAsync(CapPublishedMessage message, protected abstract Task ExecuteAsync(CapPublishedMessage message,
ICapTransaction transaction, ICapTransaction transaction,
CancellationToken cancel = default(CancellationToken)); CancellationToken cancel = default(CancellationToken));
......
...@@ -23,6 +23,7 @@ namespace DotNetCore.CAP.Infrastructure ...@@ -23,6 +23,7 @@ namespace DotNetCore.CAP.Infrastructure
private static SnowflakeId _snowflakeId; private static SnowflakeId _snowflakeId;
private readonly object _lock = new object(); private readonly object _lock = new object();
private static readonly object s_lock = new object();
private long _lastTimestamp = -1L; private long _lastTimestamp = -1L;
private SnowflakeId(long workerId, long datacenterId, long sequence = 0L) private SnowflakeId(long workerId, long datacenterId, long sequence = 0L)
...@@ -46,7 +47,10 @@ namespace DotNetCore.CAP.Infrastructure ...@@ -46,7 +47,10 @@ namespace DotNetCore.CAP.Infrastructure
public static SnowflakeId Default(long datacenterId = 0) public static SnowflakeId Default(long datacenterId = 0)
{ {
return _snowflakeId ?? (_snowflakeId = new SnowflakeId(AppDomain.CurrentDomain.Id, datacenterId)); lock (s_lock)
{
return _snowflakeId ?? (_snowflakeId = new SnowflakeId(AppDomain.CurrentDomain.Id, datacenterId));
}
} }
public virtual long NextId() public virtual long NextId()
......
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
namespace DotNetCore.CAP.Internal
{
public class NoopTransaction
{
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment