Commit 327ef56f authored by Savorboard's avatar Savorboard

use Diagnostics event feature to send the message after transaction commited for Sql Server.

parent 3f37a453
...@@ -4,6 +4,7 @@ ...@@ -4,6 +4,7 @@
using System; using System;
using DotNetCore.CAP.Processor; using DotNetCore.CAP.Processor;
using DotNetCore.CAP.SqlServer; using DotNetCore.CAP.SqlServer;
using DotNetCore.CAP.SqlServer.Diagnostics;
using Microsoft.EntityFrameworkCore; using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.DependencyInjection;
...@@ -22,6 +23,7 @@ namespace DotNetCore.CAP ...@@ -22,6 +23,7 @@ namespace DotNetCore.CAP
public void AddServices(IServiceCollection services) public void AddServices(IServiceCollection services)
{ {
services.AddSingleton<CapDatabaseStorageMarkerService>(); services.AddSingleton<CapDatabaseStorageMarkerService>();
services.AddSingleton<DiagnosticProcessorObserver>();
services.AddSingleton<IStorage, SqlServerStorage>(); services.AddSingleton<IStorage, SqlServerStorage>();
services.AddSingleton<IStorageConnection, SqlServerStorageConnection>(); services.AddSingleton<IStorageConnection, SqlServerStorageConnection>();
......
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Data.SqlClient;
using System.Reflection;
using DotNetCore.CAP.Models;
namespace DotNetCore.CAP.SqlServer.Diagnostics
{
internal class DiagnosticObserver : IObserver<KeyValuePair<string, object>>
{
private readonly IDispatcher _dispatcher;
private readonly ConcurrentDictionary<Guid, List<CapPublishedMessage>> _bufferList;
public DiagnosticObserver(IDispatcher dispatcher,
ConcurrentDictionary<Guid, List<CapPublishedMessage>> bufferList)
{
_dispatcher = dispatcher;
_bufferList = bufferList;
}
private const string SqlClientPrefix = "System.Data.SqlClient.";
public const string SqlAfterCommitTransaction = SqlClientPrefix + "WriteTransactionCommitAfter";
public const string SqlErrorCommitTransaction = SqlClientPrefix + "WriteTransactionCommitError";
public void OnCompleted()
{
}
public void OnError(Exception error)
{
}
public void OnNext(KeyValuePair<string, object> evt)
{
if (evt.Key == SqlAfterCommitTransaction)
{
var sqlConnection = (SqlConnection) GetProperty(evt.Value, "Connection");
var transactionKey = sqlConnection.ClientConnectionId;
if (_bufferList.TryRemove(transactionKey, out var msgList))
{
foreach (var message in msgList)
{
_dispatcher.EnqueueToPublish(message);
}
}
}
else if (evt.Key == SqlErrorCommitTransaction)
{
var sqlConnection = (SqlConnection) GetProperty(evt.Value, "Connection");
var transactionKey = sqlConnection.ClientConnectionId;
_bufferList.TryRemove(transactionKey, out _);
}
}
static object GetProperty(object _this, string propertyName)
{
return _this.GetType().GetTypeInfo().GetDeclaredProperty(propertyName)?.GetValue(_this);
}
}
}
\ No newline at end of file
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using DotNetCore.CAP.Models;
namespace DotNetCore.CAP.SqlServer.Diagnostics
{
public class DiagnosticProcessorObserver : IObserver<DiagnosticListener>
{
private readonly IDispatcher _dispatcher;
public const string DiagnosticListenerName = "SqlClientDiagnosticListener";
public ConcurrentDictionary<Guid, List<CapPublishedMessage>> BufferList { get; }
public DiagnosticProcessorObserver(IDispatcher dispatcher)
{
_dispatcher = dispatcher;
BufferList = new ConcurrentDictionary<Guid, List<CapPublishedMessage>>();
}
public void OnCompleted()
{
}
public void OnError(Exception error)
{
}
public void OnNext(DiagnosticListener listener)
{
if (listener.Name == DiagnosticListenerName)
{
listener.Subscribe(new DiagnosticObserver(_dispatcher, BufferList));
}
}
}
}
\ No newline at end of file
// Copyright (c) .NET Core Community. All rights reserved. // Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information. // Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using System.Collections.Generic;
using System.Data; using System.Data;
using System.Diagnostics; using System.Data.SqlClient;
using DotNetCore.CAP.Models;
using DotNetCore.CAP.SqlServer.Diagnostics;
using Microsoft.EntityFrameworkCore.Infrastructure; using Microsoft.EntityFrameworkCore.Infrastructure;
using Microsoft.EntityFrameworkCore.Storage; using Microsoft.EntityFrameworkCore.Storage;
...@@ -11,50 +15,68 @@ namespace DotNetCore.CAP ...@@ -11,50 +15,68 @@ namespace DotNetCore.CAP
{ {
public class SqlServerCapTransaction : CapTransactionBase public class SqlServerCapTransaction : CapTransactionBase
{ {
public SqlServerCapTransaction(IDispatcher dispatcher) : base(dispatcher) private readonly DiagnosticProcessorObserver _diagnosticProcessor;
public SqlServerCapTransaction(IDispatcher dispatcher,
DiagnosticProcessorObserver diagnosticProcessor) : base(dispatcher)
{ {
_diagnosticProcessor = diagnosticProcessor;
} }
public override void Commit() protected override void AddToSent(CapPublishedMessage msg)
{ {
Debug.Assert(DbTransaction != null); var transactionKey = ((SqlConnection)((IDbTransaction)DbTransaction).Connection).ClientConnectionId;
if (_diagnosticProcessor.BufferList.TryGetValue(transactionKey, out var list))
switch (DbTransaction)
{ {
case IDbTransaction dbTransaction: list.Add(msg);
dbTransaction.Commit(); }
break; else
case IDbContextTransaction dbContextTransaction: {
dbContextTransaction.Commit(); var msgList = new List<CapPublishedMessage>(1) { msg };
break; _diagnosticProcessor.BufferList.TryAdd(transactionKey, msgList);
} }
Flush();
} }
public override void Rollback() public override void Commit()
{ {
Debug.Assert(DbTransaction != null); throw new NotImplementedException();
}
switch (DbTransaction) public override void Rollback()
{ {
case IDbTransaction dbTransaction: throw new NotImplementedException();
dbTransaction.Rollback();
break;
case IDbContextTransaction dbContextTransaction:
dbContextTransaction.Rollback();
break;
}
} }
public override void Dispose() public override void Dispose()
{ {
(DbTransaction as IDbTransaction)?.Dispose();
} }
} }
public static class CapTransactionExtensions public static class CapTransactionExtensions
{ {
public static ICapTransaction Begin(this ICapTransaction transaction,
IDbTransaction dbTransaction, bool autoCommit = false)
{
transaction.DbTransaction = dbTransaction;
transaction.AutoCommit = autoCommit;
return transaction;
}
public static IDbTransaction BeginTransaction(this IDbConnection dbConnection,
ICapPublisher publisher, bool autoCommit = false)
{
if (dbConnection.State == ConnectionState.Closed)
{
dbConnection.Open();
}
var dbTransaction = dbConnection.BeginTransaction();
var capTransaction = publisher.Transaction.Begin(dbTransaction, autoCommit);
return (IDbTransaction)capTransaction.DbTransaction;
}
public static ICapTransaction Begin(this ICapTransaction transaction, public static ICapTransaction Begin(this ICapTransaction transaction,
IDbContextTransaction dbTransaction, bool autoCommit = false) IDbContextTransaction dbTransaction, bool autoCommit = false)
{ {
...@@ -64,7 +86,7 @@ namespace DotNetCore.CAP ...@@ -64,7 +86,7 @@ namespace DotNetCore.CAP
return transaction; return transaction;
} }
public static IDbContextTransaction BeginAndJoinToTransaction(this DatabaseFacade database, public static IDbContextTransaction BeginTransaction(this DatabaseFacade database,
ICapPublisher publisher, bool autoCommit = false) ICapPublisher publisher, bool autoCommit = false)
{ {
var trans = database.BeginTransaction(); var trans = database.BeginTransaction();
......
...@@ -4,10 +4,12 @@ ...@@ -4,10 +4,12 @@
using System; using System;
using System.Data; using System.Data;
using System.Data.SqlClient; using System.Data.SqlClient;
using System.Diagnostics;
using System.Threading; using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
using Dapper; using Dapper;
using DotNetCore.CAP.Dashboard; using DotNetCore.CAP.Dashboard;
using DotNetCore.CAP.SqlServer.Diagnostics;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
namespace DotNetCore.CAP.SqlServer namespace DotNetCore.CAP.SqlServer
...@@ -18,12 +20,15 @@ namespace DotNetCore.CAP.SqlServer ...@@ -18,12 +20,15 @@ namespace DotNetCore.CAP.SqlServer
private readonly IDbConnection _existingConnection = null; private readonly IDbConnection _existingConnection = null;
private readonly ILogger _logger; private readonly ILogger _logger;
private readonly SqlServerOptions _options; private readonly SqlServerOptions _options;
private readonly DiagnosticProcessorObserver _diagnosticProcessorObserver;
public SqlServerStorage(ILogger<SqlServerStorage> logger, public SqlServerStorage(ILogger<SqlServerStorage> logger,
CapOptions capOptions, CapOptions capOptions,
SqlServerOptions options) SqlServerOptions options,
DiagnosticProcessorObserver diagnosticProcessorObserver)
{ {
_options = options; _options = options;
_diagnosticProcessorObserver = diagnosticProcessorObserver;
_logger = logger; _logger = logger;
_capOptions = capOptions; _capOptions = capOptions;
} }
...@@ -53,6 +58,8 @@ namespace DotNetCore.CAP.SqlServer ...@@ -53,6 +58,8 @@ namespace DotNetCore.CAP.SqlServer
} }
_logger.LogDebug("Ensuring all create database tables script are applied."); _logger.LogDebug("Ensuring all create database tables script are applied.");
DiagnosticListener.AllListeners.Subscribe(_diagnosticProcessorObserver);
} }
protected virtual string CreateDbTablesScript(string schema) protected virtual string CreateDbTablesScript(string schema)
......
...@@ -14,7 +14,6 @@ namespace DotNetCore.CAP.SqlServer ...@@ -14,7 +14,6 @@ namespace DotNetCore.CAP.SqlServer
{ {
private readonly IDbConnection _dbConnection; private readonly IDbConnection _dbConnection;
private readonly IDbTransaction _dbTransaction;
private readonly string _schema; private readonly string _schema;
public SqlServerStorageTransaction(SqlServerStorageConnection connection) public SqlServerStorageTransaction(SqlServerStorageConnection connection)
...@@ -24,7 +23,6 @@ namespace DotNetCore.CAP.SqlServer ...@@ -24,7 +23,6 @@ namespace DotNetCore.CAP.SqlServer
_dbConnection = new SqlConnection(options.ConnectionString); _dbConnection = new SqlConnection(options.ConnectionString);
_dbConnection.Open(); _dbConnection.Open();
_dbTransaction = _dbConnection.BeginTransaction(IsolationLevel.ReadCommitted);
} }
public void UpdateMessage(CapPublishedMessage message) public void UpdateMessage(CapPublishedMessage message)
...@@ -36,7 +34,7 @@ namespace DotNetCore.CAP.SqlServer ...@@ -36,7 +34,7 @@ namespace DotNetCore.CAP.SqlServer
var sql = var sql =
$"UPDATE [{_schema}].[Published] SET [Retries] = @Retries,[Content] = @Content,[ExpiresAt] = @ExpiresAt,[StatusName]=@StatusName WHERE Id=@Id;"; $"UPDATE [{_schema}].[Published] SET [Retries] = @Retries,[Content] = @Content,[ExpiresAt] = @ExpiresAt,[StatusName]=@StatusName WHERE Id=@Id;";
_dbConnection.Execute(sql, message, _dbTransaction); _dbConnection.Execute(sql, message);
} }
public void UpdateMessage(CapReceivedMessage message) public void UpdateMessage(CapReceivedMessage message)
...@@ -48,18 +46,16 @@ namespace DotNetCore.CAP.SqlServer ...@@ -48,18 +46,16 @@ namespace DotNetCore.CAP.SqlServer
var sql = var sql =
$"UPDATE [{_schema}].[Received] SET [Retries] = @Retries,[Content] = @Content,[ExpiresAt] = @ExpiresAt,[StatusName]=@StatusName WHERE Id=@Id;"; $"UPDATE [{_schema}].[Received] SET [Retries] = @Retries,[Content] = @Content,[ExpiresAt] = @ExpiresAt,[StatusName]=@StatusName WHERE Id=@Id;";
_dbConnection.Execute(sql, message, _dbTransaction); _dbConnection.Execute(sql, message);
} }
public Task CommitAsync() public Task CommitAsync()
{ {
_dbTransaction.Commit();
return Task.CompletedTask; return Task.CompletedTask;
} }
public void Dispose() public void Dispose()
{ {
_dbTransaction.Dispose();
_dbConnection.Dispose(); _dbConnection.Dispose();
} }
} }
......
...@@ -2,16 +2,18 @@ using System; ...@@ -2,16 +2,18 @@ using System;
using System.Data; using System.Data;
using System.Data.SqlClient; using System.Data.SqlClient;
using Dapper; using Dapper;
using DotNetCore.CAP.SqlServer.Diagnostics;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
using Moq; using Moq;
namespace DotNetCore.CAP.SqlServer.Test namespace DotNetCore.CAP.SqlServer.Test
{ {
public abstract class DatabaseTestHost:IDisposable public abstract class DatabaseTestHost : IDisposable
{ {
protected ILogger<SqlServerStorage> Logger; protected ILogger<SqlServerStorage> Logger;
protected CapOptions CapOptions; protected CapOptions CapOptions;
protected SqlServerOptions SqlSeverOptions; protected SqlServerOptions SqlSeverOptions;
protected DiagnosticProcessorObserver DiagnosticProcessorObserver;
public bool SqlObjectInstalled; public bool SqlObjectInstalled;
...@@ -23,6 +25,8 @@ namespace DotNetCore.CAP.SqlServer.Test ...@@ -23,6 +25,8 @@ namespace DotNetCore.CAP.SqlServer.Test
.SetupProperty(x => x.ConnectionString, ConnectionUtil.GetConnectionString()) .SetupProperty(x => x.ConnectionString, ConnectionUtil.GetConnectionString())
.Object; .Object;
DiagnosticProcessorObserver = new Mock<DiagnosticProcessorObserver>().Object;
InitializeDatabase(); InitializeDatabase();
} }
...@@ -42,7 +46,7 @@ IF NOT EXISTS (SELECT * FROM sysdatabases WHERE name = N'{databaseName}') ...@@ -42,7 +46,7 @@ IF NOT EXISTS (SELECT * FROM sysdatabases WHERE name = N'{databaseName}')
CREATE DATABASE [{databaseName}];"); CREATE DATABASE [{databaseName}];");
} }
new SqlServerStorage(Logger, CapOptions, SqlSeverOptions).InitializeAsync().GetAwaiter().GetResult(); new SqlServerStorage(Logger, CapOptions, SqlSeverOptions, DiagnosticProcessorObserver).InitializeAsync().GetAwaiter().GetResult();
SqlObjectInstalled = true; SqlObjectInstalled = true;
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment