Commit 7971d672 authored by Savorboard's avatar Savorboard

Fix thread safe issue of ICapPublisher bug. (#371)

parent 075c8275
...@@ -28,8 +28,9 @@ namespace DotNetCore.CAP.MongoDB ...@@ -28,8 +28,9 @@ namespace DotNetCore.CAP.MongoDB
await PublishAsyncInternal(message); await PublishAsyncInternal(message);
} }
protected override Task ExecuteAsync(CapPublishedMessage message, ICapTransaction transaction, protected override Task ExecuteAsync(CapPublishedMessage message,
CancellationToken cancel = default(CancellationToken)) ICapTransaction transaction = null,
CancellationToken cancel = default)
{ {
var insertOptions = new InsertOneOptions { BypassDocumentValidation = false }; var insertOptions = new InsertOneOptions { BypassDocumentValidation = false };
...@@ -49,7 +50,7 @@ namespace DotNetCore.CAP.MongoDB ...@@ -49,7 +50,7 @@ namespace DotNetCore.CAP.MongoDB
Version = _options.Version, Version = _options.Version,
}; };
if (NotUseTransaction) if (transaction == null)
{ {
return collection.InsertOneAsync(store, insertOptions, cancel); return collection.InsertOneAsync(store, insertOptions, cancel);
} }
......
...@@ -2,6 +2,7 @@ ...@@ -2,6 +2,7 @@
// Licensed under the MIT License. See License.txt in the project root for license information. // Licensed under the MIT License. See License.txt in the project root for license information.
using System.Diagnostics; using System.Diagnostics;
using Microsoft.Extensions.DependencyInjection;
using MongoDB.Driver; using MongoDB.Driver;
// ReSharper disable once CheckNamespace // ReSharper disable once CheckNamespace
...@@ -70,7 +71,8 @@ namespace DotNetCore.CAP ...@@ -70,7 +71,8 @@ namespace DotNetCore.CAP
ICapPublisher publisher, bool autoCommit = false) ICapPublisher publisher, bool autoCommit = false)
{ {
var clientSessionHandle = client.StartSession(); var clientSessionHandle = client.StartSession();
var capTrans = publisher.Transaction.Begin(clientSessionHandle, autoCommit); publisher.Transaction.Value = publisher.ServiceProvider.GetService<CapTransactionBase>();
var capTrans = publisher.Transaction.Value.Begin(clientSessionHandle, autoCommit);
return new CapMongoDbClientSessionHandle(capTrans); return new CapMongoDbClientSessionHandle(capTrans);
} }
} }
......
...@@ -29,10 +29,11 @@ namespace DotNetCore.CAP.MySql ...@@ -29,10 +29,11 @@ namespace DotNetCore.CAP.MySql
await PublishAsyncInternal(message); await PublishAsyncInternal(message);
} }
protected override async Task ExecuteAsync(CapPublishedMessage message, ICapTransaction transaction, protected override async Task ExecuteAsync(CapPublishedMessage message,
ICapTransaction transaction = null,
CancellationToken cancel = default(CancellationToken)) CancellationToken cancel = default(CancellationToken))
{ {
if (NotUseTransaction) if (transaction == null)
{ {
using (var connection = new MySqlConnection(_options.ConnectionString)) using (var connection = new MySqlConnection(_options.ConnectionString))
{ {
......
...@@ -3,15 +3,18 @@ ...@@ -3,15 +3,18 @@
using System.Data; using System.Data;
using System.Diagnostics; using System.Diagnostics;
using System.Threading;
using Microsoft.EntityFrameworkCore.Infrastructure; using Microsoft.EntityFrameworkCore.Infrastructure;
using Microsoft.EntityFrameworkCore.Storage; using Microsoft.EntityFrameworkCore.Storage;
using Microsoft.Extensions.DependencyInjection;
// ReSharper disable once CheckNamespace // ReSharper disable once CheckNamespace
namespace DotNetCore.CAP namespace DotNetCore.CAP
{ {
public class MySqlCapTransaction : CapTransactionBase public class MySqlCapTransaction : CapTransactionBase
{ {
public MySqlCapTransaction(IDispatcher dispatcher) : base(dispatcher) public MySqlCapTransaction(
IDispatcher dispatcher) : base(dispatcher)
{ {
} }
...@@ -28,7 +31,6 @@ namespace DotNetCore.CAP ...@@ -28,7 +31,6 @@ namespace DotNetCore.CAP
dbContextTransaction.Commit(); dbContextTransaction.Commit();
break; break;
} }
Flush(); Flush();
} }
...@@ -85,7 +87,8 @@ namespace DotNetCore.CAP ...@@ -85,7 +87,8 @@ namespace DotNetCore.CAP
ICapPublisher publisher, bool autoCommit = false) ICapPublisher publisher, bool autoCommit = false)
{ {
var trans = database.BeginTransaction(); var trans = database.BeginTransaction();
var capTrans = publisher.Transaction.Begin(trans, autoCommit); publisher.Transaction.Value = publisher.ServiceProvider.GetService<CapTransactionBase>();
var capTrans = publisher.Transaction.Value.Begin(trans, autoCommit);
return new CapEFDbTransaction(capTrans); return new CapEFDbTransaction(capTrans);
} }
...@@ -105,7 +108,8 @@ namespace DotNetCore.CAP ...@@ -105,7 +108,8 @@ namespace DotNetCore.CAP
} }
var dbTransaction = dbConnection.BeginTransaction(); var dbTransaction = dbConnection.BeginTransaction();
return publisher.Transaction.Begin(dbTransaction, autoCommit); publisher.Transaction.Value = publisher.ServiceProvider.GetService<CapTransactionBase>();
return publisher.Transaction.Value.Begin(dbTransaction, autoCommit);
} }
} }
} }
\ No newline at end of file
...@@ -29,10 +29,10 @@ namespace DotNetCore.CAP.PostgreSql ...@@ -29,10 +29,10 @@ namespace DotNetCore.CAP.PostgreSql
await PublishAsyncInternal(message); await PublishAsyncInternal(message);
} }
protected override async Task ExecuteAsync(CapPublishedMessage message, ICapTransaction transaction, protected override async Task ExecuteAsync(CapPublishedMessage message, ICapTransaction transaction = null,
CancellationToken cancel = default(CancellationToken)) CancellationToken cancel = default(CancellationToken))
{ {
if (NotUseTransaction) if (transaction == null)
{ {
using (var connection = InitDbConnection()) using (var connection = InitDbConnection())
{ {
......
...@@ -5,6 +5,7 @@ using System.Data; ...@@ -5,6 +5,7 @@ using System.Data;
using System.Diagnostics; using System.Diagnostics;
using Microsoft.EntityFrameworkCore.Infrastructure; using Microsoft.EntityFrameworkCore.Infrastructure;
using Microsoft.EntityFrameworkCore.Storage; using Microsoft.EntityFrameworkCore.Storage;
using Microsoft.Extensions.DependencyInjection;
// ReSharper disable once CheckNamespace // ReSharper disable once CheckNamespace
namespace DotNetCore.CAP namespace DotNetCore.CAP
...@@ -90,7 +91,8 @@ namespace DotNetCore.CAP ...@@ -90,7 +91,8 @@ namespace DotNetCore.CAP
} }
var dbTransaction = dbConnection.BeginTransaction(); var dbTransaction = dbConnection.BeginTransaction();
return publisher.Transaction.Begin(dbTransaction, autoCommit); publisher.Transaction.Value = publisher.ServiceProvider.GetService<CapTransactionBase>();
return publisher.Transaction.Value.Begin(dbTransaction, autoCommit);
} }
/// <summary> /// <summary>
...@@ -99,12 +101,13 @@ namespace DotNetCore.CAP ...@@ -99,12 +101,13 @@ namespace DotNetCore.CAP
/// <param name="database">The <see cref="DatabaseFacade" />.</param> /// <param name="database">The <see cref="DatabaseFacade" />.</param>
/// <param name="publisher">The <see cref="ICapPublisher" />.</param> /// <param name="publisher">The <see cref="ICapPublisher" />.</param>
/// <param name="autoCommit">Whether the transaction is automatically committed when the message is published</param> /// <param name="autoCommit">Whether the transaction is automatically committed when the message is published</param>
/// <returns>The <see cref="IDbContextTransaction" /> of EF dbcontext transaction object.</returns> /// <returns>The <see cref="IDbContextTransaction" /> of EF DbContext transaction object.</returns>
public static IDbContextTransaction BeginTransaction(this DatabaseFacade database, public static IDbContextTransaction BeginTransaction(this DatabaseFacade database,
ICapPublisher publisher, bool autoCommit = false) ICapPublisher publisher, bool autoCommit = false)
{ {
var trans = database.BeginTransaction(); var trans = database.BeginTransaction();
var capTrans = publisher.Transaction.Begin(trans, autoCommit); publisher.Transaction.Value = publisher.ServiceProvider.GetService<CapTransactionBase>();
var capTrans = publisher.Transaction.Value.Begin(trans, autoCommit);
return new CapEFDbTransaction(capTrans); return new CapEFDbTransaction(capTrans);
} }
} }
......
...@@ -29,10 +29,10 @@ namespace DotNetCore.CAP.SqlServer ...@@ -29,10 +29,10 @@ namespace DotNetCore.CAP.SqlServer
await PublishAsyncInternal(message); await PublishAsyncInternal(message);
} }
protected override async Task ExecuteAsync(CapPublishedMessage message, ICapTransaction transaction, protected override async Task ExecuteAsync(CapPublishedMessage message, ICapTransaction transaction = null,
CancellationToken cancel = default(CancellationToken)) CancellationToken cancel = default(CancellationToken))
{ {
if (NotUseTransaction) if (transaction == null)
{ {
using (var connection = new SqlConnection(_options.ConnectionString)) using (var connection = new SqlConnection(_options.ConnectionString))
{ {
......
...@@ -150,7 +150,8 @@ namespace DotNetCore.CAP ...@@ -150,7 +150,8 @@ namespace DotNetCore.CAP
} }
var dbTransaction = dbConnection.BeginTransaction(); var dbTransaction = dbConnection.BeginTransaction();
var capTransaction = publisher.Transaction.Begin(dbTransaction, autoCommit); publisher.Transaction.Value = publisher.ServiceProvider.GetService<CapTransactionBase>();
var capTransaction = publisher.Transaction.Value.Begin(dbTransaction, autoCommit);
return (IDbTransaction)capTransaction.DbTransaction; return (IDbTransaction)capTransaction.DbTransaction;
} }
...@@ -165,7 +166,8 @@ namespace DotNetCore.CAP ...@@ -165,7 +166,8 @@ namespace DotNetCore.CAP
ICapPublisher publisher, bool autoCommit = false) ICapPublisher publisher, bool autoCommit = false)
{ {
var trans = database.BeginTransaction(); var trans = database.BeginTransaction();
var capTrans = publisher.Transaction.Begin(trans, autoCommit); publisher.Transaction.Value = publisher.ServiceProvider.GetService<CapTransactionBase>();
var capTrans = publisher.Transaction.Value.Begin(trans, autoCommit);
return new CapEFDbTransaction(capTrans); return new CapEFDbTransaction(capTrans);
} }
} }
......
...@@ -7,7 +7,6 @@ using System.Threading; ...@@ -7,7 +7,6 @@ using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
using DotNetCore.CAP.Diagnostics; using DotNetCore.CAP.Diagnostics;
using DotNetCore.CAP.Infrastructure; using DotNetCore.CAP.Infrastructure;
using DotNetCore.CAP.Internal;
using DotNetCore.CAP.Models; using DotNetCore.CAP.Models;
using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.DependencyInjection;
...@@ -17,9 +16,7 @@ namespace DotNetCore.CAP.Abstractions ...@@ -17,9 +16,7 @@ namespace DotNetCore.CAP.Abstractions
{ {
private readonly IMessagePacker _msgPacker; private readonly IMessagePacker _msgPacker;
private readonly IContentSerializer _serializer; private readonly IContentSerializer _serializer;
private CapTransactionBase _transaction; private readonly IDispatcher _dispatcher;
protected bool NotUseTransaction;
// ReSharper disable once InconsistentNaming // ReSharper disable once InconsistentNaming
protected static readonly DiagnosticListener s_diagnosticListener = protected static readonly DiagnosticListener s_diagnosticListener =
...@@ -28,13 +25,15 @@ namespace DotNetCore.CAP.Abstractions ...@@ -28,13 +25,15 @@ namespace DotNetCore.CAP.Abstractions
protected CapPublisherBase(IServiceProvider service) protected CapPublisherBase(IServiceProvider service)
{ {
ServiceProvider = service; ServiceProvider = service;
_dispatcher = service.GetRequiredService<IDispatcher>();
_msgPacker = service.GetRequiredService<IMessagePacker>(); _msgPacker = service.GetRequiredService<IMessagePacker>();
_serializer = service.GetRequiredService<IContentSerializer>(); _serializer = service.GetRequiredService<IContentSerializer>();
Transaction = new AsyncLocal<ICapTransaction>();
} }
protected IServiceProvider ServiceProvider { get; } public IServiceProvider ServiceProvider { get; }
public ICapTransaction Transaction => _transaction ?? (_transaction = ServiceProvider.GetRequiredService<CapTransactionBase>()); public AsyncLocal<ICapTransaction> Transaction { get; }
public void Publish<T>(string name, T contentObj, string callbackName = null) public void Publish<T>(string name, T contentObj, string callbackName = null)
{ {
...@@ -65,27 +64,33 @@ namespace DotNetCore.CAP.Abstractions ...@@ -65,27 +64,33 @@ namespace DotNetCore.CAP.Abstractions
protected async Task PublishAsyncInternal(CapPublishedMessage message) protected async Task PublishAsyncInternal(CapPublishedMessage message)
{ {
if (Transaction.DbTransaction == null) var operationId = default(Guid);
{
NotUseTransaction = true;
Transaction.DbTransaction = new NoopTransaction();
}
Guid operationId = default(Guid);
try try
{ {
operationId = s_diagnosticListener.WritePublishMessageStoreBefore(message); operationId = s_diagnosticListener.WritePublishMessageStoreBefore(message);
await ExecuteAsync(message, Transaction); if (Transaction.Value?.DbTransaction == null)
{
await ExecuteAsync(message);
s_diagnosticListener.WritePublishMessageStoreAfter(operationId, message);
_dispatcher.EnqueueToPublish(message);
}
else
{
var transaction = (CapTransactionBase)Transaction.Value;
_transaction.AddToSent(message); await ExecuteAsync(message, transaction);
s_diagnosticListener.WritePublishMessageStoreAfter(operationId, message); s_diagnosticListener.WritePublishMessageStoreAfter(operationId, message);
if (NotUseTransaction || Transaction.AutoCommit) transaction.AddToSent(message);
if (transaction.AutoCommit)
{ {
_transaction.Commit(); transaction.Commit();
}
} }
} }
catch (Exception e) catch (Exception e)
...@@ -94,17 +99,10 @@ namespace DotNetCore.CAP.Abstractions ...@@ -94,17 +99,10 @@ namespace DotNetCore.CAP.Abstractions
throw; throw;
} }
finally
{
if (NotUseTransaction || Transaction.AutoCommit)
{
_transaction?.Dispose();
}
}
} }
protected abstract Task ExecuteAsync(CapPublishedMessage message, protected abstract Task ExecuteAsync(CapPublishedMessage message,
ICapTransaction transaction, ICapTransaction transaction = null,
CancellationToken cancel = default(CancellationToken)); CancellationToken cancel = default(CancellationToken));
protected virtual string Serialize<T>(T obj, string callbackName = null) protected virtual string Serialize<T>(T obj, string callbackName = null)
......
// Copyright (c) .NET Core Community. All rights reserved. // Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information. // Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using System.Threading; using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
...@@ -11,10 +12,12 @@ namespace DotNetCore.CAP ...@@ -11,10 +12,12 @@ namespace DotNetCore.CAP
/// </summary> /// </summary>
public interface ICapPublisher public interface ICapPublisher
{ {
IServiceProvider ServiceProvider { get; }
/// <summary> /// <summary>
/// CAP transaction context object /// CAP transaction context object
/// </summary> /// </summary>
ICapTransaction Transaction { get; } AsyncLocal<ICapTransaction> Transaction { get; }
/// <summary> /// <summary>
/// Asynchronous publish an object message. /// Asynchronous publish an object message.
......
...@@ -120,7 +120,9 @@ namespace DotNetCore.CAP.Test ...@@ -120,7 +120,9 @@ namespace DotNetCore.CAP.Test
private class MyProducerService : ICapPublisher private class MyProducerService : ICapPublisher
{ {
public ICapTransaction Transaction { get; } public IServiceProvider ServiceProvider { get; }
public AsyncLocal<ICapTransaction> Transaction { get; }
public Task PublishAsync<T>(string name, T contentObj, string callbackName = null, public Task PublishAsync<T>(string name, T contentObj, string callbackName = null,
CancellationToken cancellationToken = default(CancellationToken)) CancellationToken cancellationToken = default(CancellationToken))
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment