Commit 1184cd0e authored by Savorboard's avatar Savorboard

[break changes] modify api for fix #141

parent 362b6f52
...@@ -27,6 +27,7 @@ namespace DotNetCore.CAP ...@@ -27,6 +27,7 @@ namespace DotNetCore.CAP
services.AddScoped<ICapPublisher, CapPublisher>(); services.AddScoped<ICapPublisher, CapPublisher>();
services.AddScoped<ICallbackPublisher, CapPublisher>(); services.AddScoped<ICallbackPublisher, CapPublisher>();
services.AddTransient<ICollectProcessor, MySqlCollectProcessor>(); services.AddTransient<ICollectProcessor, MySqlCollectProcessor>();
services.AddTransient<CapTransactionBase, MySqlCapTransaction>();
AddSingletionMySqlOptions(services); AddSingletionMySqlOptions(services);
} }
......
...@@ -3,27 +3,27 @@ ...@@ -3,27 +3,27 @@
using System; using System;
using System.Data; using System.Data;
using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
using Dapper; using Dapper;
using DotNetCore.CAP.Abstractions; using DotNetCore.CAP.Abstractions;
using DotNetCore.CAP.Models; using DotNetCore.CAP.Models;
using Microsoft.EntityFrameworkCore; using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Storage; using Microsoft.EntityFrameworkCore.Storage;
using Microsoft.Extensions.Logging;
using MySql.Data.MySqlClient; using MySql.Data.MySqlClient;
namespace DotNetCore.CAP.MySql namespace DotNetCore.CAP.MySql
{ {
public class CapPublisher : CapPublisherBase, ICallbackPublisher public class CapPublisher : CapPublisherBase, ICallbackPublisher, IDisposable
{ {
private readonly DbContext _dbContext; private readonly DbContext _dbContext;
private readonly MySqlOptions _options; private readonly MySqlOptions _options;
private readonly bool _isUsingEF;
public CapPublisher(ILogger<CapPublisher> logger, IDispatcher dispatcher, IServiceProvider provider, private MySqlConnection _connection;
MySqlOptions options)
: base(logger, dispatcher) public CapPublisher(IServiceProvider provider, MySqlOptions options) : base(provider)
{ {
ServiceProvider = provider;
_options = options; _options = options;
if (_options.DbContextType == null) if (_options.DbContextType == null)
...@@ -31,47 +31,41 @@ namespace DotNetCore.CAP.MySql ...@@ -31,47 +31,41 @@ namespace DotNetCore.CAP.MySql
return; return;
} }
IsUsingEF = true; _isUsingEF = true;
_dbContext = (DbContext) ServiceProvider.GetService(_options.DbContextType); _dbContext = (DbContext)ServiceProvider.GetService(_options.DbContextType);
} }
public async Task PublishCallbackAsync(CapPublishedMessage message) public async Task PublishCallbackAsync(CapPublishedMessage message)
{ {
using (var conn = new MySqlConnection(_options.ConnectionString)) await PublishAsyncInternal(message);
}
protected override Task<int> ExecuteAsync(CapPublishedMessage message, ICapTransaction transaction,
CancellationToken cancel = default(CancellationToken))
{
var dbTrans = transaction.DbTransaction as IDbTransaction;
if (dbTrans == null && transaction.DbTransaction is IDbContextTransaction dbContextTrans)
{ {
var id = await conn.ExecuteScalarAsync<int>(PrepareSql(), message); dbTrans = dbContextTrans.GetDbTransaction();
message.Id = id;
Enqueue(message);
} }
var conn = dbTrans?.Connection;
return conn.ExecuteScalarAsync<int>(PrepareSql(), message, dbTrans);
} }
protected override void PrepareConnectionForEF() protected override object GetDbTransaction()
{
if (_isUsingEF)
{ {
DbConnection = _dbContext.Database.GetDbConnection();
var dbContextTransaction = _dbContext.Database.CurrentTransaction; var dbContextTransaction = _dbContext.Database.CurrentTransaction;
var dbTrans = dbContextTransaction?.GetDbTransaction(); if (dbContextTransaction == null)
//DbTransaction is dispose in original
if (dbTrans?.Connection == null)
{ {
IsCapOpenedTrans = true; return InitDbConnection();
dbContextTransaction?.Dispose();
dbContextTransaction = _dbContext.Database.BeginTransaction(IsolationLevel.ReadCommitted);
dbTrans = dbContextTransaction.GetDbTransaction();
} }
DbTransaction = dbTrans; return dbContextTransaction;
} }
protected override int Execute(IDbConnection dbConnection, IDbTransaction dbTransaction, return InitDbConnection();
CapPublishedMessage message)
{
return dbConnection.ExecuteScalar<int>(PrepareSql(), message, dbTransaction);
}
protected override async Task<int> ExecuteAsync(IDbConnection dbConnection, IDbTransaction dbTransaction,
CapPublishedMessage message)
{
return await dbConnection.ExecuteScalarAsync<int>(PrepareSql(), message, dbTransaction);
} }
#region private methods #region private methods
...@@ -82,6 +76,19 @@ namespace DotNetCore.CAP.MySql ...@@ -82,6 +76,19 @@ namespace DotNetCore.CAP.MySql
$"INSERT INTO `{_options.TableNamePrefix}.published` (`Name`,`Content`,`Retries`,`Added`,`ExpiresAt`,`StatusName`)VALUES(@Name,@Content,@Retries,@Added,@ExpiresAt,@StatusName);SELECT LAST_INSERT_ID()"; $"INSERT INTO `{_options.TableNamePrefix}.published` (`Name`,`Content`,`Retries`,`Added`,`ExpiresAt`,`StatusName`)VALUES(@Name,@Content,@Retries,@Added,@ExpiresAt,@StatusName);SELECT LAST_INSERT_ID()";
} }
private IDbTransaction InitDbConnection()
{
_connection = new MySqlConnection(_options.ConnectionString);
_connection.Open();
return _connection.BeginTransaction(IsolationLevel.ReadCommitted);
}
#endregion private methods #endregion private methods
public void Dispose()
{
_dbContext?.Dispose();
_connection?.Dispose();
}
} }
} }
\ No newline at end of file
...@@ -37,7 +37,6 @@ ...@@ -37,7 +37,6 @@
<PackageReference Include="Microsoft.Extensions.Options" Version="2.1.0" /> <PackageReference Include="Microsoft.Extensions.Options" Version="2.1.0" />
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="2.1.0" /> <PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="2.1.0" />
<PackageReference Include="Newtonsoft.Json" Version="11.0.2" /> <PackageReference Include="Newtonsoft.Json" Version="11.0.2" />
<PackageReference Include="System.Data.Common" Version="4.3.0" />
<PackageReference Include="System.Diagnostics.DiagnosticSource" Version="4.5.0" /> <PackageReference Include="System.Diagnostics.DiagnosticSource" Version="4.5.0" />
<PackageReference Include="System.Threading.ThreadPool" Version="4.3.0" /> <PackageReference Include="System.Threading.ThreadPool" Version="4.3.0" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="2.1.0" /> <PackageReference Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="2.1.0" />
......
// Copyright (c) .NET Core Community. All rights reserved. // Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information. // Licensed under the MIT License. See License.txt in the project root for license information.
using System.Data; using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
using DotNetCore.CAP.Abstractions;
namespace DotNetCore.CAP namespace DotNetCore.CAP
{ {
...@@ -12,66 +11,23 @@ namespace DotNetCore.CAP ...@@ -12,66 +11,23 @@ namespace DotNetCore.CAP
/// </summary> /// </summary>
public interface ICapPublisher public interface ICapPublisher
{ {
/// <summary> ICapTransaction CapTransaction { get; }
/// (EntityFramework) Asynchronous publish an object message.
/// <para>
/// If you are using the EntityFramework, you need to configure the DbContextType first.
/// otherwise you need to use overloaded method with IDbTransaction.
/// </para>
/// </summary>
/// <typeparam name="T">The type of content object.</typeparam>
/// <param name="name">the topic name or exchange router key.</param>
/// <param name="contentObj">message body content, that will be serialized of json.</param>
/// <param name="callbackName">callback subscriber name</param>
Task PublishAsync<T>(string name, T contentObj, string callbackName = null);
/// <summary> /// <summary>
/// (EntityFramework) Publish an object message. /// Asynchronous publish an object message.
/// <para>
/// If you are using the EntityFramework, you need to configure the DbContextType first.
/// otherwise you need to use overloaded method with IDbTransaction.
/// </para>
/// </summary> /// </summary>
/// <typeparam name="T">The type of content object.</typeparam>
/// <param name="name">the topic name or exchange router key.</param> /// <param name="name">the topic name or exchange router key.</param>
/// <param name="contentObj">message body content, that will be serialized of json.</param> /// <param name="contentObj">message body content, that will be serialized of json.</param>
/// <param name="callbackName">callback subscriber name</param> /// <param name="callbackName">callback subscriber name</param>
void Publish<T>(string name, T contentObj, string callbackName = null); /// <param name="cancellationToken"></param>
Task PublishAsync<T>(string name, T contentObj, string callbackName = null, CancellationToken cancellationToken = default(CancellationToken));
/// <summary> /// <summary>
/// (ado.net) Asynchronous publish an object message. /// Publish an object message.
/// </summary> /// </summary>
/// <param name="name">the topic name or exchange router key.</param> /// <param name="name">the topic name or exchange router key.</param>
/// <param name="contentObj">message body content, that will be serialized of json.</param> /// <param name="contentObj">message body content, that will be serialized of json.</param>
/// <param name="dbTransaction">the transaction of <see cref="IDbTransaction" /></param>
/// <param name="callbackName">callback subscriber name</param> /// <param name="callbackName">callback subscriber name</param>
Task PublishAsync<T>(string name, T contentObj, IDbTransaction dbTransaction, string callbackName = null); void Publish<T>(string name, T contentObj, string callbackName = null);
/// <summary>
/// (ado.net) Publish an object message.
/// </summary>
/// <param name="name">the topic name or exchange router key.</param>
/// <param name="contentObj">message body content, that will be serialized of json.</param>
/// <param name="dbTransaction">the transaction of <see cref="IDbTransaction" /></param>
/// <param name="callbackName">callback subscriber name</param>
void Publish<T>(string name, T contentObj, IDbTransaction dbTransaction, string callbackName = null);
/// <summary>
/// Publish an object message with mongo.
/// </summary>
/// <param name="name">the topic name or exchange router key.</param>
/// <param name="contentObj">message body content, that will be serialized of json.</param>
/// <param name="mongoTransaction">if transaction was set null, the message will be published directly.</param>
/// <param name="callbackName">callback subscriber name</param>
void PublishWithMongo<T>(string name, T contentObj, IMongoTransaction mongoTransaction = null, string callbackName = null);
/// <summary>
/// Asynchronous publish an object message with mongo.
/// </summary>
/// <param name="name">the topic name or exchange router key.</param>
/// <param name="contentObj">message body content, that will be serialized of json.</param>
/// <param name="mongoTransaction">if transaction was set null, the message will be published directly.</param>
/// <param name="callbackName">callback subscriber name</param>
Task PublishWithMongoAsync<T>(string name, T contentObj, IMongoTransaction mongoTransaction = null, string callbackName = null);
} }
} }
\ No newline at end of file
...@@ -173,8 +173,6 @@ namespace DotNetCore.CAP ...@@ -173,8 +173,6 @@ namespace DotNetCore.CAP
du); du);
s_diagnosticListener.WritePublishAfter(eventData); s_diagnosticListener.WritePublishAfter(eventData);
_logger.MessageHasBeenSent(du.TotalSeconds);
} }
private void TracingError(Guid operationId, CapPublishedMessage message, OperateResult result, DateTimeOffset startTime, TimeSpan du) private void TracingError(Guid operationId, CapPublishedMessage message, OperateResult result, DateTimeOffset startTime, TimeSpan du)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment