Commit 1e250edd authored by Savorboard's avatar Savorboard

Cleanup Code

parent b55693b6
...@@ -13,8 +13,8 @@ namespace DotNetCore.CAP.MongoDB ...@@ -13,8 +13,8 @@ namespace DotNetCore.CAP.MongoDB
{ {
public class MongoDBPublisher : CapPublisherBase, ICallbackPublisher public class MongoDBPublisher : CapPublisherBase, ICallbackPublisher
{ {
private readonly MongoDBOptions _options;
private readonly IMongoClient _client; private readonly IMongoClient _client;
private readonly MongoDBOptions _options;
public MongoDBPublisher(IServiceProvider provider, MongoDBOptions options) public MongoDBPublisher(IServiceProvider provider, MongoDBOptions options)
: base(provider) : base(provider)
...@@ -31,7 +31,7 @@ namespace DotNetCore.CAP.MongoDB ...@@ -31,7 +31,7 @@ namespace DotNetCore.CAP.MongoDB
protected override Task ExecuteAsync(CapPublishedMessage message, ICapTransaction transaction, protected override Task ExecuteAsync(CapPublishedMessage message, ICapTransaction transaction,
CancellationToken cancel = default(CancellationToken)) CancellationToken cancel = default(CancellationToken))
{ {
var insertOptions = new InsertOneOptions { BypassDocumentValidation = false }; var insertOptions = new InsertOneOptions {BypassDocumentValidation = false};
var collection = _client var collection = _client
.GetDatabase(_options.DatabaseName) .GetDatabase(_options.DatabaseName)
...@@ -41,7 +41,8 @@ namespace DotNetCore.CAP.MongoDB ...@@ -41,7 +41,8 @@ namespace DotNetCore.CAP.MongoDB
{ {
return collection.InsertOneAsync(message, insertOptions, cancel); return collection.InsertOneAsync(message, insertOptions, cancel);
} }
var dbTrans = (IClientSessionHandle)transaction.DbTransaction;
var dbTrans = (IClientSessionHandle) transaction.DbTransaction;
return collection.InsertOneAsync(dbTrans, message, insertOptions, cancel); return collection.InsertOneAsync(dbTrans, message, insertOptions, cancel);
} }
} }
......
...@@ -61,10 +61,10 @@ namespace DotNetCore.CAP ...@@ -61,10 +61,10 @@ namespace DotNetCore.CAP
/// <summary> /// <summary>
/// Start the CAP transaction /// Start the CAP transaction
/// </summary> /// </summary>
/// <param name="client">The <see cref="IMongoClient"/>.</param> /// <param name="client">The <see cref="IMongoClient" />.</param>
/// <param name="publisher">The <see cref="ICapPublisher"/>.</param> /// <param name="publisher">The <see cref="ICapPublisher" />.</param>
/// <param name="autoCommit">Whether the transaction is automatically committed when the message is published</param> /// <param name="autoCommit">Whether the transaction is automatically committed when the message is published</param>
/// <returns>The <see cref="IClientSessionHandle"/> of MongoDB transaction session object.</returns> /// <returns>The <see cref="IClientSessionHandle" /> of MongoDB transaction session object.</returns>
public static IClientSessionHandle StartTransaction(this IMongoClient client, public static IClientSessionHandle StartTransaction(this IMongoClient client,
ICapPublisher publisher, bool autoCommit = false) ICapPublisher publisher, bool autoCommit = false)
{ {
......
...@@ -55,7 +55,8 @@ namespace DotNetCore.CAP.MongoDB ...@@ -55,7 +55,8 @@ namespace DotNetCore.CAP.MongoDB
if (names.All(n => n != _options.PublishedCollection)) if (names.All(n => n != _options.PublishedCollection))
{ {
await database.CreateCollectionAsync(_options.PublishedCollection, cancellationToken: cancellationToken); await database.CreateCollectionAsync(_options.PublishedCollection,
cancellationToken: cancellationToken);
} }
_logger.LogDebug("Ensuring all create database tables script are applied."); _logger.LogDebug("Ensuring all create database tables script are applied.");
......
...@@ -105,6 +105,6 @@ namespace DotNetCore.CAP.MongoDB ...@@ -105,6 +105,6 @@ namespace DotNetCore.CAP.MongoDB
var collection = _database.GetCollection<CapReceivedMessage>(_options.ReceivedCollection); var collection = _database.GetCollection<CapReceivedMessage>(_options.ReceivedCollection);
collection.InsertOne(message); collection.InsertOne(message);
} }
} }
} }
\ No newline at end of file
...@@ -47,7 +47,7 @@ namespace DotNetCore.CAP ...@@ -47,7 +47,7 @@ namespace DotNetCore.CAP
using (var scope = x.CreateScope()) using (var scope = x.CreateScope())
{ {
var provider = scope.ServiceProvider; var provider = scope.ServiceProvider;
var dbContext = (DbContext)provider.GetService(mysqlOptions.DbContextType); var dbContext = (DbContext) provider.GetService(mysqlOptions.DbContextType);
mysqlOptions.ConnectionString = dbContext.Database.GetDbConnection().ConnectionString; mysqlOptions.ConnectionString = dbContext.Database.GetDbConnection().ConnectionString;
return mysqlOptions; return mysqlOptions;
} }
......
// Copyright (c) .NET Core Community. All rights reserved. // Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information. // Licensed under the MIT License. See License.txt in the project root for license information.
// ReSharper disable once CheckNamespace
namespace DotNetCore.CAP namespace DotNetCore.CAP
{ {
public class MySqlOptions : EFOptions public class MySqlOptions : EFOptions
......
...@@ -20,7 +20,7 @@ namespace DotNetCore.CAP.MySql ...@@ -20,7 +20,7 @@ namespace DotNetCore.CAP.MySql
public MySqlPublisher(IServiceProvider provider) : base(provider) public MySqlPublisher(IServiceProvider provider) : base(provider)
{ {
_options = provider.GetService<MySqlOptions>(); _options = provider.GetService<MySqlOptions>();
} }
public async Task PublishCallbackAsync(CapPublishedMessage message) public async Task PublishCallbackAsync(CapPublishedMessage message)
...@@ -45,6 +45,7 @@ namespace DotNetCore.CAP.MySql ...@@ -45,6 +45,7 @@ namespace DotNetCore.CAP.MySql
{ {
dbTrans = dbContextTrans.GetDbTransaction(); dbTrans = dbContextTrans.GetDbTransaction();
} }
var conn = dbTrans?.Connection; var conn = dbTrans?.Connection;
await conn.ExecuteAsync(PrepareSql(), message, dbTrans); await conn.ExecuteAsync(PrepareSql(), message, dbTrans);
} }
...@@ -55,7 +56,7 @@ namespace DotNetCore.CAP.MySql ...@@ -55,7 +56,7 @@ namespace DotNetCore.CAP.MySql
{ {
return return
$"INSERT INTO `{_options.TableNamePrefix}.published` (`Id`,`Name`,`Content`,`Retries`,`Added`,`ExpiresAt`,`StatusName`)VALUES(@Id,@Name,@Content,@Retries,@Added,@ExpiresAt,@StatusName);"; $"INSERT INTO `{_options.TableNamePrefix}.published` (`Id`,`Name`,`Content`,`Retries`,`Added`,`ExpiresAt`,`StatusName`)VALUES(@Id,@Name,@Content,@Retries,@Added,@ExpiresAt,@StatusName);";
} }
#endregion private methods #endregion private methods
} }
......
...@@ -67,7 +67,6 @@ namespace DotNetCore.CAP ...@@ -67,7 +67,6 @@ namespace DotNetCore.CAP
public static ICapTransaction Begin(this ICapTransaction transaction, public static ICapTransaction Begin(this ICapTransaction transaction,
IDbTransaction dbTransaction, bool autoCommit = false) IDbTransaction dbTransaction, bool autoCommit = false)
{ {
transaction.DbTransaction = dbTransaction; transaction.DbTransaction = dbTransaction;
transaction.AutoCommit = autoCommit; transaction.AutoCommit = autoCommit;
...@@ -77,10 +76,10 @@ namespace DotNetCore.CAP ...@@ -77,10 +76,10 @@ namespace DotNetCore.CAP
/// <summary> /// <summary>
/// Start the CAP transaction /// Start the CAP transaction
/// </summary> /// </summary>
/// <param name="database">The <see cref="DatabaseFacade"/>.</param> /// <param name="database">The <see cref="DatabaseFacade" />.</param>
/// <param name="publisher">The <see cref="ICapPublisher"/>.</param> /// <param name="publisher">The <see cref="ICapPublisher" />.</param>
/// <param name="autoCommit">Whether the transaction is automatically committed when the message is published</param> /// <param name="autoCommit">Whether the transaction is automatically committed when the message is published</param>
/// <returns>The <see cref="IDbContextTransaction"/> of EF dbcontext transaction object.</returns> /// <returns>The <see cref="IDbContextTransaction" /> of EF dbcontext transaction object.</returns>
public static IDbContextTransaction BeginTransaction(this DatabaseFacade database, public static IDbContextTransaction BeginTransaction(this DatabaseFacade database,
ICapPublisher publisher, bool autoCommit = false) ICapPublisher publisher, bool autoCommit = false)
{ {
...@@ -92,10 +91,10 @@ namespace DotNetCore.CAP ...@@ -92,10 +91,10 @@ namespace DotNetCore.CAP
/// <summary> /// <summary>
/// Start the CAP transaction /// Start the CAP transaction
/// </summary> /// </summary>
/// <param name="dbConnection">The <see cref="IDbConnection"/>.</param> /// <param name="dbConnection">The <see cref="IDbConnection" />.</param>
/// <param name="publisher">The <see cref="ICapPublisher"/>.</param> /// <param name="publisher">The <see cref="ICapPublisher" />.</param>
/// <param name="autoCommit">Whether the transaction is automatically committed when the message is published</param> /// <param name="autoCommit">Whether the transaction is automatically committed when the message is published</param>
/// <returns>The <see cref="ICapTransaction"/> object.</returns> /// <returns>The <see cref="ICapTransaction" /> object.</returns>
public static ICapTransaction BeginTransaction(this IDbConnection dbConnection, public static ICapTransaction BeginTransaction(this IDbConnection dbConnection,
ICapPublisher publisher, bool autoCommit = false) ICapPublisher publisher, bool autoCommit = false)
{ {
......
...@@ -36,7 +36,7 @@ namespace DotNetCore.CAP.MySql ...@@ -36,7 +36,7 @@ namespace DotNetCore.CAP.MySql
foreach (var table in tables) foreach (var table in tables)
{ {
_logger.LogDebug($"Collecting expired data from table [{table}]."); _logger.LogDebug($"Collecting expired data from table [{table}].");
int removedCount; int removedCount;
do do
{ {
......
...@@ -126,7 +126,7 @@ select count(Id) from `{0}.received` where StatusName = N'Failed';", _prefix); ...@@ -126,7 +126,7 @@ select count(Id) from `{0}.received` where StatusName = N'Failed';", _prefix);
{ {
var sqlQuery = $"select count(Id) from `{_prefix}.{tableName}` where StatusName = @state"; var sqlQuery = $"select count(Id) from `{_prefix}.{tableName}` where StatusName = @state";
var count = connection.ExecuteScalar<int>(sqlQuery, new { state = statusName }); var count = connection.ExecuteScalar<int>(sqlQuery, new {state = statusName});
return count; return count;
} }
...@@ -169,7 +169,7 @@ select aggr.* from ( ...@@ -169,7 +169,7 @@ select aggr.* from (
var valuesMap = connection.Query<TimelineCounter>( var valuesMap = connection.Query<TimelineCounter>(
sqlQuery, sqlQuery,
new { keys = keyMaps.Keys, statusName }) new {keys = keyMaps.Keys, statusName})
.ToDictionary(x => x.Key, x => x.Count); .ToDictionary(x => x.Key, x => x.Count);
foreach (var key in keyMaps.Keys) foreach (var key in keyMaps.Keys)
......
...@@ -109,6 +109,6 @@ VALUES(@Id,@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName);"; ...@@ -109,6 +109,6 @@ VALUES(@Id,@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName);";
{ {
return connection.Execute(sql) > 0; return connection.Execute(sql) > 0;
} }
} }
} }
} }
\ No newline at end of file
// Copyright (c) .NET Core Community. All rights reserved. // Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information. // Licensed under the MIT License. See License.txt in the project root for license information.
// ReSharper disable once CheckNamespace
namespace DotNetCore.CAP namespace DotNetCore.CAP
{ {
public class PostgreSqlOptions : EFOptions public class PostgreSqlOptions : EFOptions
......
...@@ -17,6 +17,7 @@ namespace DotNetCore.CAP.PostgreSql ...@@ -17,6 +17,7 @@ namespace DotNetCore.CAP.PostgreSql
public class PostgreSqlPublisher : CapPublisherBase, ICallbackPublisher public class PostgreSqlPublisher : CapPublisherBase, ICallbackPublisher
{ {
private readonly PostgreSqlOptions _options; private readonly PostgreSqlOptions _options;
public PostgreSqlPublisher(IServiceProvider provider) : base(provider) public PostgreSqlPublisher(IServiceProvider provider) : base(provider)
{ {
_options = provider.GetService<PostgreSqlOptions>(); _options = provider.GetService<PostgreSqlOptions>();
...@@ -44,6 +45,7 @@ namespace DotNetCore.CAP.PostgreSql ...@@ -44,6 +45,7 @@ namespace DotNetCore.CAP.PostgreSql
{ {
dbTrans = dbContextTrans.GetDbTransaction(); dbTrans = dbContextTrans.GetDbTransaction();
} }
var conn = dbTrans?.Connection; var conn = dbTrans?.Connection;
await conn.ExecuteAsync(PrepareSql(), message, dbTrans); await conn.ExecuteAsync(PrepareSql(), message, dbTrans);
} }
...@@ -62,6 +64,7 @@ namespace DotNetCore.CAP.PostgreSql ...@@ -62,6 +64,7 @@ namespace DotNetCore.CAP.PostgreSql
conn.Open(); conn.Open();
return conn; return conn;
} }
#endregion private methods #endregion private methods
} }
} }
\ No newline at end of file
...@@ -76,10 +76,10 @@ namespace DotNetCore.CAP ...@@ -76,10 +76,10 @@ namespace DotNetCore.CAP
/// <summary> /// <summary>
/// Start the CAP transaction /// Start the CAP transaction
/// </summary> /// </summary>
/// <param name="dbConnection">The <see cref="IDbConnection"/>.</param> /// <param name="dbConnection">The <see cref="IDbConnection" />.</param>
/// <param name="publisher">The <see cref="ICapPublisher"/>.</param> /// <param name="publisher">The <see cref="ICapPublisher" />.</param>
/// <param name="autoCommit">Whether the transaction is automatically committed when the message is published</param> /// <param name="autoCommit">Whether the transaction is automatically committed when the message is published</param>
/// <returns>The <see cref="ICapTransaction"/> object.</returns> /// <returns>The <see cref="ICapTransaction" /> object.</returns>
public static ICapTransaction BeginTransaction(this IDbConnection dbConnection, public static ICapTransaction BeginTransaction(this IDbConnection dbConnection,
ICapPublisher publisher, bool autoCommit = false) ICapPublisher publisher, bool autoCommit = false)
{ {
...@@ -95,10 +95,10 @@ namespace DotNetCore.CAP ...@@ -95,10 +95,10 @@ namespace DotNetCore.CAP
/// <summary> /// <summary>
/// Start the CAP transaction /// Start the CAP transaction
/// </summary> /// </summary>
/// <param name="database">The <see cref="DatabaseFacade"/>.</param> /// <param name="database">The <see cref="DatabaseFacade" />.</param>
/// <param name="publisher">The <see cref="ICapPublisher"/>.</param> /// <param name="publisher">The <see cref="ICapPublisher" />.</param>
/// <param name="autoCommit">Whether the transaction is automatically committed when the message is published</param> /// <param name="autoCommit">Whether the transaction is automatically committed when the message is published</param>
/// <returns>The <see cref="IDbContextTransaction"/> of EF dbcontext transaction object.</returns> /// <returns>The <see cref="IDbContextTransaction" /> of EF dbcontext transaction object.</returns>
public static IDbContextTransaction BeginTransaction(this DatabaseFacade database, public static IDbContextTransaction BeginTransaction(this DatabaseFacade database,
ICapPublisher publisher, bool autoCommit = false) ICapPublisher publisher, bool autoCommit = false)
{ {
......
...@@ -128,7 +128,7 @@ select count(""Id"") from ""{0}"".""received"" where ""StatusName"" = N'Failed' ...@@ -128,7 +128,7 @@ select count(""Id"") from ""{0}"".""received"" where ""StatusName"" = N'Failed'
var sqlQuery = var sqlQuery =
$"select count(\"Id\") from \"{_options.Schema}\".\"{tableName}\" where Lower(\"StatusName\") = Lower(@state)"; $"select count(\"Id\") from \"{_options.Schema}\".\"{tableName}\" where Lower(\"StatusName\") = Lower(@state)";
var count = connection.ExecuteScalar<int>(sqlQuery, new { state = statusName }); var count = connection.ExecuteScalar<int>(sqlQuery, new {state = statusName});
return count; return count;
} }
...@@ -170,7 +170,7 @@ with aggr as ( ...@@ -170,7 +170,7 @@ with aggr as (
) )
select ""Key"",""Count"" from aggr where ""Key""= Any(@keys);"; select ""Key"",""Count"" from aggr where ""Key""= Any(@keys);";
var valuesMap = connection.Query<TimelineCounter>(sqlQuery, new { keys = keyMaps.Keys.ToList(), statusName }) var valuesMap = connection.Query<TimelineCounter>(sqlQuery, new {keys = keyMaps.Keys.ToList(), statusName})
.ToList() .ToList()
.ToDictionary(x => x.Key, x => x.Count); .ToDictionary(x => x.Key, x => x.Count);
......
...@@ -77,7 +77,7 @@ namespace DotNetCore.CAP.PostgreSql ...@@ -77,7 +77,7 @@ namespace DotNetCore.CAP.PostgreSql
public async Task<IEnumerable<CapReceivedMessage>> GetReceivedMessagesOfNeedRetry() public async Task<IEnumerable<CapReceivedMessage>> GetReceivedMessagesOfNeedRetry()
{ {
var fourMinsAgo = DateTime.Now.AddMinutes(-4).ToString("O"); var fourMinsAgo = DateTime.Now.AddMinutes(-4).ToString("O");
var sql = var sql =
$"SELECT * FROM \"{Options.Schema}\".\"received\" WHERE \"Retries\"<{_capOptions.FailedRetryCount} AND \"Added\"<'{fourMinsAgo}' AND (\"StatusName\"='{StatusName.Failed}' OR \"StatusName\"='{StatusName.Scheduled}') LIMIT 200;"; $"SELECT * FROM \"{Options.Schema}\".\"received\" WHERE \"Retries\"<{_capOptions.FailedRetryCount} AND \"Added\"<'{fourMinsAgo}' AND (\"StatusName\"='{StatusName.Failed}' OR \"StatusName\"='{StatusName.Scheduled}') LIMIT 200;";
using (var connection = new NpgsqlConnection(Options.ConnectionString)) using (var connection = new NpgsqlConnection(Options.ConnectionString))
...@@ -86,10 +86,6 @@ namespace DotNetCore.CAP.PostgreSql ...@@ -86,10 +86,6 @@ namespace DotNetCore.CAP.PostgreSql
} }
} }
public void Dispose()
{
}
public bool ChangePublishedState(long messageId, string state) public bool ChangePublishedState(long messageId, string state)
{ {
var sql = var sql =
...@@ -111,5 +107,9 @@ namespace DotNetCore.CAP.PostgreSql ...@@ -111,5 +107,9 @@ namespace DotNetCore.CAP.PostgreSql
return connection.Execute(sql) > 0; return connection.Execute(sql) > 0;
} }
} }
public void Dispose()
{
}
} }
} }
\ No newline at end of file
...@@ -49,7 +49,7 @@ namespace DotNetCore.CAP ...@@ -49,7 +49,7 @@ namespace DotNetCore.CAP
using (var scope = x.CreateScope()) using (var scope = x.CreateScope())
{ {
var provider = scope.ServiceProvider; var provider = scope.ServiceProvider;
var dbContext = (DbContext)provider.GetService(sqlServerOptions.DbContextType); var dbContext = (DbContext) provider.GetService(sqlServerOptions.DbContextType);
sqlServerOptions.ConnectionString = dbContext.Database.GetDbConnection().ConnectionString; sqlServerOptions.ConnectionString = dbContext.Database.GetDbConnection().ConnectionString;
return sqlServerOptions; return sqlServerOptions;
} }
......
...@@ -12,8 +12,12 @@ namespace DotNetCore.CAP.SqlServer.Diagnostics ...@@ -12,8 +12,12 @@ namespace DotNetCore.CAP.SqlServer.Diagnostics
{ {
internal class DiagnosticObserver : IObserver<KeyValuePair<string, object>> internal class DiagnosticObserver : IObserver<KeyValuePair<string, object>>
{ {
private readonly IDispatcher _dispatcher; private const string SqlClientPrefix = "System.Data.SqlClient.";
public const string SqlAfterCommitTransaction = SqlClientPrefix + "WriteTransactionCommitAfter";
public const string SqlErrorCommitTransaction = SqlClientPrefix + "WriteTransactionCommitError";
private readonly ConcurrentDictionary<Guid, List<CapPublishedMessage>> _bufferList; private readonly ConcurrentDictionary<Guid, List<CapPublishedMessage>> _bufferList;
private readonly IDispatcher _dispatcher;
public DiagnosticObserver(IDispatcher dispatcher, public DiagnosticObserver(IDispatcher dispatcher,
ConcurrentDictionary<Guid, List<CapPublishedMessage>> bufferList) ConcurrentDictionary<Guid, List<CapPublishedMessage>> bufferList)
...@@ -22,19 +26,12 @@ namespace DotNetCore.CAP.SqlServer.Diagnostics ...@@ -22,19 +26,12 @@ namespace DotNetCore.CAP.SqlServer.Diagnostics
_bufferList = bufferList; _bufferList = bufferList;
} }
private const string SqlClientPrefix = "System.Data.SqlClient.";
public const string SqlAfterCommitTransaction = SqlClientPrefix + "WriteTransactionCommitAfter";
public const string SqlErrorCommitTransaction = SqlClientPrefix + "WriteTransactionCommitError";
public void OnCompleted() public void OnCompleted()
{ {
} }
public void OnError(Exception error) public void OnError(Exception error)
{ {
} }
public void OnNext(KeyValuePair<string, object> evt) public void OnNext(KeyValuePair<string, object> evt)
...@@ -60,7 +57,7 @@ namespace DotNetCore.CAP.SqlServer.Diagnostics ...@@ -60,7 +57,7 @@ namespace DotNetCore.CAP.SqlServer.Diagnostics
} }
} }
static object GetProperty(object _this, string propertyName) private static object GetProperty(object _this, string propertyName)
{ {
return _this.GetType().GetTypeInfo().GetDeclaredProperty(propertyName)?.GetValue(_this); return _this.GetType().GetTypeInfo().GetDeclaredProperty(propertyName)?.GetValue(_this);
} }
......
...@@ -11,10 +11,8 @@ namespace DotNetCore.CAP.SqlServer.Diagnostics ...@@ -11,10 +11,8 @@ namespace DotNetCore.CAP.SqlServer.Diagnostics
{ {
public class DiagnosticProcessorObserver : IObserver<DiagnosticListener> public class DiagnosticProcessorObserver : IObserver<DiagnosticListener>
{ {
private readonly IDispatcher _dispatcher;
public const string DiagnosticListenerName = "SqlClientDiagnosticListener"; public const string DiagnosticListenerName = "SqlClientDiagnosticListener";
private readonly IDispatcher _dispatcher;
public ConcurrentDictionary<Guid, List<CapPublishedMessage>> BufferList { get; }
public DiagnosticProcessorObserver(IDispatcher dispatcher) public DiagnosticProcessorObserver(IDispatcher dispatcher)
{ {
...@@ -22,6 +20,8 @@ namespace DotNetCore.CAP.SqlServer.Diagnostics ...@@ -22,6 +20,8 @@ namespace DotNetCore.CAP.SqlServer.Diagnostics
BufferList = new ConcurrentDictionary<Guid, List<CapPublishedMessage>>(); BufferList = new ConcurrentDictionary<Guid, List<CapPublishedMessage>>();
} }
public ConcurrentDictionary<Guid, List<CapPublishedMessage>> BufferList { get; }
public void OnCompleted() public void OnCompleted()
{ {
} }
......
...@@ -56,9 +56,8 @@ namespace DotNetCore.CAP.SqlServer ...@@ -56,9 +56,8 @@ namespace DotNetCore.CAP.SqlServer
{ {
return return
$"INSERT INTO {_options.Schema}.[Published] ([Id],[Name],[Content],[Retries],[Added],[ExpiresAt],[StatusName])VALUES(@Id,@Name,@Content,@Retries,@Added,@ExpiresAt,@StatusName);"; $"INSERT INTO {_options.Schema}.[Published] ([Id],[Name],[Content],[Retries],[Added],[ExpiresAt],[StatusName])VALUES(@Id,@Name,@Content,@Retries,@Added,@ExpiresAt,@StatusName);";
} }
#endregion private methods #endregion private methods
} }
} }
\ No newline at end of file
...@@ -22,7 +22,7 @@ namespace DotNetCore.CAP ...@@ -22,7 +22,7 @@ namespace DotNetCore.CAP
private readonly DiagnosticProcessorObserver _diagnosticProcessor; private readonly DiagnosticProcessorObserver _diagnosticProcessor;
public SqlServerCapTransaction( public SqlServerCapTransaction(
IDispatcher dispatcher, IDispatcher dispatcher,
SqlServerOptions sqlServerOptions, SqlServerOptions sqlServerOptions,
IServiceProvider serviceProvider) : base(dispatcher) IServiceProvider serviceProvider) : base(dispatcher)
{ {
...@@ -30,6 +30,7 @@ namespace DotNetCore.CAP ...@@ -30,6 +30,7 @@ namespace DotNetCore.CAP
{ {
_dbContext = serviceProvider.GetService(sqlServerOptions.DbContextType) as DbContext; _dbContext = serviceProvider.GetService(sqlServerOptions.DbContextType) as DbContext;
} }
_diagnosticProcessor = serviceProvider.GetRequiredService<DiagnosticProcessorObserver>(); _diagnosticProcessor = serviceProvider.GetRequiredService<DiagnosticProcessorObserver>();
} }
...@@ -55,14 +56,14 @@ namespace DotNetCore.CAP ...@@ -55,14 +56,14 @@ namespace DotNetCore.CAP
} }
} }
var transactionKey = ((SqlConnection)dbTransaction.Connection).ClientConnectionId; var transactionKey = ((SqlConnection) dbTransaction.Connection).ClientConnectionId;
if (_diagnosticProcessor.BufferList.TryGetValue(transactionKey, out var list)) if (_diagnosticProcessor.BufferList.TryGetValue(transactionKey, out var list))
{ {
list.Add(msg); list.Add(msg);
} }
else else
{ {
var msgList = new List<CapPublishedMessage>(1) { msg }; var msgList = new List<CapPublishedMessage>(1) {msg};
_diagnosticProcessor.BufferList.TryAdd(transactionKey, msgList); _diagnosticProcessor.BufferList.TryAdd(transactionKey, msgList);
} }
} }
...@@ -134,10 +135,10 @@ namespace DotNetCore.CAP ...@@ -134,10 +135,10 @@ namespace DotNetCore.CAP
/// <summary> /// <summary>
/// Start the CAP transaction /// Start the CAP transaction
/// </summary> /// </summary>
/// <param name="dbConnection">The <see cref="IDbConnection"/>.</param> /// <param name="dbConnection">The <see cref="IDbConnection" />.</param>
/// <param name="publisher">The <see cref="ICapPublisher"/>.</param> /// <param name="publisher">The <see cref="ICapPublisher" />.</param>
/// <param name="autoCommit">Whether the transaction is automatically committed when the message is published</param> /// <param name="autoCommit">Whether the transaction is automatically committed when the message is published</param>
/// <returns>The <see cref="ICapTransaction"/> object.</returns> /// <returns>The <see cref="ICapTransaction" /> object.</returns>
public static IDbTransaction BeginTransaction(this IDbConnection dbConnection, public static IDbTransaction BeginTransaction(this IDbConnection dbConnection,
ICapPublisher publisher, bool autoCommit = false) ICapPublisher publisher, bool autoCommit = false)
{ {
...@@ -148,16 +149,16 @@ namespace DotNetCore.CAP ...@@ -148,16 +149,16 @@ namespace DotNetCore.CAP
var dbTransaction = dbConnection.BeginTransaction(); var dbTransaction = dbConnection.BeginTransaction();
var capTransaction = publisher.Transaction.Begin(dbTransaction, autoCommit); var capTransaction = publisher.Transaction.Begin(dbTransaction, autoCommit);
return (IDbTransaction)capTransaction.DbTransaction; return (IDbTransaction) capTransaction.DbTransaction;
} }
/// <summary> /// <summary>
/// Start the CAP transaction /// Start the CAP transaction
/// </summary> /// </summary>
/// <param name="database">The <see cref="DatabaseFacade"/>.</param> /// <param name="database">The <see cref="DatabaseFacade" />.</param>
/// <param name="publisher">The <see cref="ICapPublisher"/>.</param> /// <param name="publisher">The <see cref="ICapPublisher" />.</param>
/// <param name="autoCommit">Whether the transaction is automatically committed when the message is published</param> /// <param name="autoCommit">Whether the transaction is automatically committed when the message is published</param>
/// <returns>The <see cref="IDbContextTransaction"/> of EF dbcontext transaction object.</returns> /// <returns>The <see cref="IDbContextTransaction" /> of EF dbcontext transaction object.</returns>
public static IDbContextTransaction BeginTransaction(this DatabaseFacade database, public static IDbContextTransaction BeginTransaction(this DatabaseFacade database,
ICapPublisher publisher, bool autoCommit = false) ICapPublisher publisher, bool autoCommit = false)
{ {
......
...@@ -135,7 +135,7 @@ select count(Id) from [{0}].Received with (nolock) where StatusName = N'Failed'; ...@@ -135,7 +135,7 @@ select count(Id) from [{0}].Received with (nolock) where StatusName = N'Failed';
var sqlQuery = var sqlQuery =
$"select count(Id) from [{_options.Schema}].{tableName} with (nolock) where StatusName = @state"; $"select count(Id) from [{_options.Schema}].{tableName} with (nolock) where StatusName = @state";
var count = connection.ExecuteScalar<int>(sqlQuery, new { state = statusName }); var count = connection.ExecuteScalar<int>(sqlQuery, new {state = statusName});
return count; return count;
} }
...@@ -189,7 +189,7 @@ select [Key], [Count] from aggr with (nolock) where [Key] in @keys;"; ...@@ -189,7 +189,7 @@ select [Key], [Count] from aggr with (nolock) where [Key] in @keys;";
var valuesMap = connection.Query<TimelineCounter>( var valuesMap = connection.Query<TimelineCounter>(
_options.IsSqlServer2008 ? sqlQuery2008 : sqlQuery, _options.IsSqlServer2008 ? sqlQuery2008 : sqlQuery,
new { keys = keyMaps.Keys, statusName }) new {keys = keyMaps.Keys, statusName})
.ToDictionary(x => x.Key, x => x.Count); .ToDictionary(x => x.Key, x => x.Count);
foreach (var key in keyMaps.Keys) foreach (var key in keyMaps.Keys)
......
...@@ -17,10 +17,10 @@ namespace DotNetCore.CAP.SqlServer ...@@ -17,10 +17,10 @@ namespace DotNetCore.CAP.SqlServer
public class SqlServerStorage : IStorage public class SqlServerStorage : IStorage
{ {
private readonly CapOptions _capOptions; private readonly CapOptions _capOptions;
private readonly DiagnosticProcessorObserver _diagnosticProcessorObserver;
private readonly IDbConnection _existingConnection = null; private readonly IDbConnection _existingConnection = null;
private readonly ILogger _logger; private readonly ILogger _logger;
private readonly SqlServerOptions _options; private readonly SqlServerOptions _options;
private readonly DiagnosticProcessorObserver _diagnosticProcessorObserver;
public SqlServerStorage(ILogger<SqlServerStorage> logger, public SqlServerStorage(ILogger<SqlServerStorage> logger,
CapOptions capOptions, CapOptions capOptions,
......
...@@ -63,7 +63,7 @@ VALUES(@Id,@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName);"; ...@@ -63,7 +63,7 @@ VALUES(@Id,@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName);";
using (var connection = new SqlConnection(Options.ConnectionString)) using (var connection = new SqlConnection(Options.ConnectionString))
{ {
connection.Execute(sql, message); connection.Execute(sql, message);
} }
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment