Commit 0300a5d0 authored by yangxiaodong's avatar yangxiaodong

refactor with using CapPublisherBase

parent e3cb23da
...@@ -2,34 +2,27 @@ ...@@ -2,34 +2,27 @@
using System.Data; using System.Data;
using System.Threading.Tasks; using System.Threading.Tasks;
using Dapper; using Dapper;
using DotNetCore.CAP.Infrastructure; using DotNetCore.CAP.Abstractions;
using DotNetCore.CAP.Models; using DotNetCore.CAP.Models;
using DotNetCore.CAP.Processor;
using Microsoft.EntityFrameworkCore; using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Storage; using Microsoft.EntityFrameworkCore.Storage;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
namespace DotNetCore.CAP.MySql namespace DotNetCore.CAP.MySql
{ {
public class CapPublisher : ICapPublisher public class CapPublisher : CapPublisherBase
{ {
private readonly ILogger _logger; private readonly ILogger _logger;
private readonly MySqlOptions _options; private readonly MySqlOptions _options;
private readonly DbContext _dbContext; private readonly DbContext _dbContext;
protected bool IsCapOpenedTrans { get; set; }
protected bool IsUsingEF { get; }
protected IServiceProvider ServiceProvider { get; }
public CapPublisher(IServiceProvider provider, public CapPublisher(IServiceProvider provider,
ILogger<CapPublisher> logger, ILogger<CapPublisher> logger,
MySqlOptions options) MySqlOptions options)
{ {
ServiceProvider = provider; ServiceProvider = provider;
_logger = logger;
_options = options; _options = options;
_logger = logger;
if (_options.DbContextType != null) if (_options.DbContextType != null)
{ {
...@@ -38,166 +31,41 @@ namespace DotNetCore.CAP.MySql ...@@ -38,166 +31,41 @@ namespace DotNetCore.CAP.MySql
} }
} }
public void Publish<T>(string name, T contentObj) protected override void PrepareConnectionForEF()
{
CheckIsUsingEF(name);
var content = Serialize(contentObj);
PublishCore(name, content);
}
public Task PublishAsync<T>(string name, T contentObj)
{
CheckIsUsingEF(name);
var content = Serialize(contentObj);
return PublishCoreAsync(name, content);
}
public void Publish<T>(string name, T contentObj, IDbConnection dbConnection, IDbTransaction dbTransaction = null)
{
CheckIsAdoNet(name);
PrepareConnection(dbConnection, ref dbTransaction);
var content = Serialize(contentObj);
PublishWithTrans(name, content, dbConnection, dbTransaction);
}
public Task PublishAsync<T>(string name, T contentObj, IDbConnection dbConnection, IDbTransaction dbTransaction = null)
{
CheckIsAdoNet(name);
PrepareConnection(dbConnection, ref dbTransaction);
var content = Serialize(contentObj);
return PublishWithTransAsync(name, content, dbConnection, dbTransaction);
}
#region private methods
private string Serialize<T>(T obj)
{
string content = string.Empty;
if (Helper.IsComplexType(typeof(T)))
{
content = Helper.ToJson(obj);
}
else
{
content = obj?.ToString();
}
return content;
}
private void PrepareConnection(IDbConnection dbConnection, ref IDbTransaction dbTransaction)
{ {
if (dbConnection == null) DbConnection = _dbContext.Database.GetDbConnection();
throw new ArgumentNullException(nameof(dbConnection));
if (dbConnection.State != ConnectionState.Open)
dbConnection.Open();
if (dbTransaction == null)
{
IsCapOpenedTrans = true;
dbTransaction = dbConnection.BeginTransaction(IsolationLevel.ReadCommitted);
}
}
private void CheckIsUsingEF(string name)
{
if (name == null) throw new ArgumentNullException(nameof(name));
if (!IsUsingEF)
throw new InvalidOperationException("If you are using the EntityFramework, you need to configure the DbContextType first." +
" otherwise you need to use overloaded method with IDbConnection and IDbTransaction.");
}
private void CheckIsAdoNet(string name)
{
if (name == null) throw new ArgumentNullException(nameof(name));
if (IsUsingEF)
throw new InvalidOperationException("If you are using the EntityFramework, you do not need to use this overloaded.");
}
private async Task PublishCoreAsync(string name, string content)
{
var connection = _dbContext.Database.GetDbConnection();
var transaction = _dbContext.Database.CurrentTransaction;
if (transaction == null)
{
IsCapOpenedTrans = true;
transaction = await _dbContext.Database.BeginTransactionAsync(IsolationLevel.ReadCommitted);
}
var dbTransaction = transaction.GetDbTransaction();
await PublishWithTransAsync(name, content, connection, dbTransaction);
}
private void PublishCore(string name, string content)
{
var connection = _dbContext.Database.GetDbConnection();
var transaction = _dbContext.Database.CurrentTransaction; var transaction = _dbContext.Database.CurrentTransaction;
if (transaction == null) if (transaction == null)
{ {
IsCapOpenedTrans = true; IsCapOpenedTrans = true;
transaction = _dbContext.Database.BeginTransaction(IsolationLevel.ReadCommitted); transaction = _dbContext.Database.BeginTransaction(IsolationLevel.ReadCommitted);
} }
var dbTransaction = transaction.GetDbTransaction(); DbTranasaction = transaction.GetDbTransaction();
PublishWithTrans(name, content, connection, dbTransaction);
} }
private async Task PublishWithTransAsync(string name, string content, IDbConnection dbConnection, IDbTransaction dbTransaction) protected override void Execute(IDbConnection dbConnection, IDbTransaction dbTransaction, CapPublishedMessage message)
{ {
var message = new CapPublishedMessage dbConnection.Execute(PrepareSql(), message, dbTransaction);
{
Name = name,
Content = content,
StatusName = StatusName.Scheduled
};
await dbConnection.ExecuteAsync(PrepareSql(), message, transaction: dbTransaction);
_logger.LogInformation("Message has been persisted in the database. name:" + name);
if (IsCapOpenedTrans)
{
dbTransaction.Commit();
dbTransaction.Dispose();
dbConnection.Dispose();
}
PublishQueuer.PulseEvent.Set(); _logger.LogInformation("Published Message has been persisted in the database. name:" + message.ToString());
} }
private void PublishWithTrans(string name, string content, IDbConnection dbConnection, IDbTransaction dbTransaction) protected override async Task ExecuteAsync(IDbConnection dbConnection, IDbTransaction dbTransaction, CapPublishedMessage message)
{ {
var message = new CapPublishedMessage await dbConnection.ExecuteAsync(PrepareSql(), message, dbTransaction);
{
Name = name, _logger.LogInformation("Published Message has been persisted in the database. name:" + message.ToString());
Content = content,
StatusName = StatusName.Scheduled
};
var count = dbConnection.Execute(PrepareSql(), message, transaction: dbTransaction);
_logger.LogInformation("Message has been persisted in the database. name:" + name);
if (IsCapOpenedTrans)
{
dbTransaction.Commit();
dbTransaction.Dispose();
dbConnection.Dispose();
}
PublishQueuer.PulseEvent.Set();
} }
#region private methods
private string PrepareSql() private string PrepareSql()
{ {
return $"INSERT INTO `{_options.TableNamePrefix}.published` (`Name`,`Content`,`Retries`,`Added`,`ExpiresAt`,`StatusName`)VALUES(@Name,@Content,@Retries,@Added,@ExpiresAt,@StatusName)"; return $"INSERT INTO `{_options.TableNamePrefix}.published` (`Name`,`Content`,`Retries`,`Added`,`ExpiresAt`,`StatusName`)VALUES(@Name,@Content,@Retries,@Added,@ExpiresAt,@StatusName)";
} }
#endregion private methods #endregion private methods
} }
} }
\ No newline at end of file
...@@ -33,42 +33,28 @@ namespace DotNetCore.CAP.PostgreSql ...@@ -33,42 +33,28 @@ namespace DotNetCore.CAP.PostgreSql
protected override void PrepareConnectionForEF() protected override void PrepareConnectionForEF()
{ {
_dbConnection = _dbContext.Database.GetDbConnection(); DbConnection = _dbContext.Database.GetDbConnection();
var transaction = _dbContext.Database.CurrentTransaction; var transaction = _dbContext.Database.CurrentTransaction;
if (transaction == null) if (transaction == null)
{ {
IsCapOpenedTrans = true; IsCapOpenedTrans = true;
transaction = _dbContext.Database.BeginTransaction(IsolationLevel.ReadCommitted); transaction = _dbContext.Database.BeginTransaction(IsolationLevel.ReadCommitted);
} }
_dbTranasaction = transaction.GetDbTransaction(); DbTranasaction = transaction.GetDbTransaction();
} }
protected override void Execute(IDbConnection dbConnection, IDbTransaction dbTransaction, CapPublishedMessage message) protected override void Execute(IDbConnection dbConnection, IDbTransaction dbTransaction, CapPublishedMessage message)
{ {
dbConnection.Execute(PrepareSql(), message, dbTransaction); dbConnection.Execute(PrepareSql(), message, dbTransaction);
_logger.LogDebug("Message has been persisted in the database. name:" + message.ToString()); _logger.LogInformation("Published Message has been persisted in the database. name:" + message.ToString());
if (IsCapOpenedTrans)
{
dbTransaction.Commit();
dbTransaction.Dispose();
dbConnection.Dispose();
}
} }
protected override async Task ExecuteAsync(IDbConnection dbConnection, IDbTransaction dbTransaction, CapPublishedMessage message) protected override async Task ExecuteAsync(IDbConnection dbConnection, IDbTransaction dbTransaction, CapPublishedMessage message)
{ {
await dbConnection.ExecuteAsync(PrepareSql(), message, dbTransaction); await dbConnection.ExecuteAsync(PrepareSql(), message, dbTransaction);
_logger.LogDebug("Message has been persisted in the database. name:" + message.ToString()); _logger.LogInformation("Published Message has been persisted in the database. name:" + message.ToString());
if (IsCapOpenedTrans)
{
dbTransaction.Commit();
dbTransaction.Dispose();
dbConnection.Dispose();
}
} }
private string PrepareSql() private string PrepareSql()
......
...@@ -2,27 +2,20 @@ ...@@ -2,27 +2,20 @@
using System.Data; using System.Data;
using System.Threading.Tasks; using System.Threading.Tasks;
using Dapper; using Dapper;
using DotNetCore.CAP.Infrastructure; using DotNetCore.CAP.Abstractions;
using DotNetCore.CAP.Models; using DotNetCore.CAP.Models;
using DotNetCore.CAP.Processor;
using Microsoft.EntityFrameworkCore; using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Storage; using Microsoft.EntityFrameworkCore.Storage;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
namespace DotNetCore.CAP.SqlServer namespace DotNetCore.CAP.SqlServer
{ {
public class CapPublisher : ICapPublisher public class CapPublisher : CapPublisherBase
{ {
private readonly ILogger _logger; private readonly ILogger _logger;
private readonly SqlServerOptions _options; private readonly SqlServerOptions _options;
private readonly DbContext _dbContext; private readonly DbContext _dbContext;
protected bool IsCapOpenedTrans { get; set; }
protected bool IsUsingEF { get; }
protected IServiceProvider ServiceProvider { get; }
public CapPublisher(IServiceProvider provider, public CapPublisher(IServiceProvider provider,
ILogger<CapPublisher> logger, ILogger<CapPublisher> logger,
SqlServerOptions options) SqlServerOptions options)
...@@ -38,164 +31,40 @@ namespace DotNetCore.CAP.SqlServer ...@@ -38,164 +31,40 @@ namespace DotNetCore.CAP.SqlServer
} }
} }
public void Publish<T>(string name, T contentObj) protected override void PrepareConnectionForEF()
{
CheckIsUsingEF(name);
var content = Serialize(contentObj);
PublishCore(name, content);
}
public Task PublishAsync<T>(string name, T contentObj)
{
CheckIsUsingEF(name);
var content = Serialize(contentObj);
return PublishCoreAsync(name, content);
}
public void Publish<T>(string name, T contentObj, IDbConnection dbConnection, IDbTransaction dbTransaction = null)
{
CheckIsAdoNet(name);
PrepareConnection(dbConnection, ref dbTransaction);
var content = Serialize(contentObj);
PublishWithTrans(name, content, dbConnection, dbTransaction);
}
public Task PublishAsync<T>(string name, T contentObj, IDbConnection dbConnection, IDbTransaction dbTransaction = null)
{
CheckIsAdoNet(name);
PrepareConnection(dbConnection, ref dbTransaction);
var content = Serialize(contentObj);
return PublishWithTransAsync(name, content, dbConnection, dbTransaction);
}
#region private methods
private string Serialize<T>(T obj)
{
string content = string.Empty;
if (Helper.IsComplexType(typeof(T)))
{
content = Helper.ToJson(obj);
}
else
{
content = obj.ToString();
}
return content;
}
private void PrepareConnection(IDbConnection dbConnection, ref IDbTransaction dbTransaction)
{
if (dbConnection == null)
throw new ArgumentNullException(nameof(dbConnection));
if (dbConnection.State != ConnectionState.Open)
dbConnection.Open();
if (dbTransaction == null)
{
IsCapOpenedTrans = true;
dbTransaction = dbConnection.BeginTransaction(IsolationLevel.ReadCommitted);
}
}
private void CheckIsUsingEF(string name)
{ {
if (name == null) throw new ArgumentNullException(nameof(name)); DbConnection = _dbContext.Database.GetDbConnection();
if (!IsUsingEF)
throw new InvalidOperationException("If you are using the EntityFramework, you need to configure the DbContextType first." +
" otherwise you need to use overloaded method with IDbConnection and IDbTransaction.");
}
private void CheckIsAdoNet(string name)
{
if (name == null) throw new ArgumentNullException(nameof(name));
if (IsUsingEF)
throw new InvalidOperationException("If you are using the EntityFramework, you do not need to use this overloaded.");
}
private async Task PublishCoreAsync(string name, string content)
{
var connection = _dbContext.Database.GetDbConnection();
var transaction = _dbContext.Database.CurrentTransaction;
if (transaction == null)
{
IsCapOpenedTrans = true;
transaction = await _dbContext.Database.BeginTransactionAsync(IsolationLevel.ReadCommitted);
}
var dbTransaction = transaction.GetDbTransaction();
await PublishWithTransAsync(name, content, connection, dbTransaction);
}
private void PublishCore(string name, string content)
{
var connection = _dbContext.Database.GetDbConnection();
var transaction = _dbContext.Database.CurrentTransaction; var transaction = _dbContext.Database.CurrentTransaction;
if (transaction == null) if (transaction == null)
{ {
IsCapOpenedTrans = true; IsCapOpenedTrans = true;
transaction = _dbContext.Database.BeginTransaction(IsolationLevel.ReadCommitted); transaction = _dbContext.Database.BeginTransaction(IsolationLevel.ReadCommitted);
} }
var dbTransaction = transaction.GetDbTransaction(); DbTranasaction = transaction.GetDbTransaction();
PublishWithTrans(name, content, connection, dbTransaction);
} }
private async Task PublishWithTransAsync(string name, string content, IDbConnection dbConnection, IDbTransaction dbTransaction) protected override void Execute(IDbConnection dbConnection, IDbTransaction dbTransaction, CapPublishedMessage message)
{ {
var message = new CapPublishedMessage dbConnection.Execute(PrepareSql(), message, dbTransaction);
{
Name = name,
Content = content,
StatusName = StatusName.Scheduled
};
await dbConnection.ExecuteAsync(PrepareSql(), message, transaction: dbTransaction);
_logger.LogInformation("Message has been persisted in the database. name:" + name);
if (IsCapOpenedTrans)
{
dbTransaction.Commit();
dbTransaction.Dispose();
dbConnection.Dispose();
}
PublishQueuer.PulseEvent.Set(); _logger.LogInformation("Published Message has been persisted in the database. name:" + message.ToString());
} }
private void PublishWithTrans(string name, string content, IDbConnection dbConnection, IDbTransaction dbTransaction) protected override async Task ExecuteAsync(IDbConnection dbConnection, IDbTransaction dbTransaction, CapPublishedMessage message)
{ {
var message = new CapPublishedMessage await dbConnection.ExecuteAsync(PrepareSql(), message, dbTransaction);
{
Name = name,
Content = content,
StatusName = StatusName.Scheduled
};
var count = dbConnection.Execute(PrepareSql(), message, transaction: dbTransaction);
_logger.LogInformation("Message has been persisted in the database. name:" + name); _logger.LogInformation("Published Message has been persisted in the database. name:" + message.ToString());
if (IsCapOpenedTrans)
{
dbTransaction.Commit();
dbTransaction.Dispose();
dbConnection.Dispose();
}
PublishQueuer.PulseEvent.Set();
} }
#region private methods
private string PrepareSql() private string PrepareSql()
{ {
return $"INSERT INTO {_options.Schema}.[Published] ([Name],[Content],[Retries],[Added],[ExpiresAt],[StatusName])VALUES(@Name,@Content,@Retries,@Added,@ExpiresAt,@StatusName)"; return $"INSERT INTO {_options.Schema}.[Published] ([Name],[Content],[Retries],[Added],[ExpiresAt],[StatusName])VALUES(@Name,@Content,@Retries,@Added,@ExpiresAt,@StatusName)";
} }
#endregion private methods #endregion private methods
} }
} }
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment