Commit 4a799569 authored by Savorboard's avatar Savorboard

Fixed thread safety issues of publisher. #331

parent 385dcf56
using System.Collections.Generic; using System.Collections.Concurrent;
using DotNetCore.CAP.Models; using DotNetCore.CAP.Models;
namespace DotNetCore.CAP namespace DotNetCore.CAP
...@@ -7,12 +7,12 @@ namespace DotNetCore.CAP ...@@ -7,12 +7,12 @@ namespace DotNetCore.CAP
{ {
private readonly IDispatcher _dispatcher; private readonly IDispatcher _dispatcher;
private readonly IList<CapPublishedMessage> _bufferList; private readonly ConcurrentQueue<CapPublishedMessage> _bufferList;
protected CapTransactionBase(IDispatcher dispatcher) protected CapTransactionBase(IDispatcher dispatcher)
{ {
_dispatcher = dispatcher; _dispatcher = dispatcher;
_bufferList = new List<CapPublishedMessage>(1); _bufferList = new ConcurrentQueue<CapPublishedMessage>();
} }
public bool AutoCommit { get; set; } public bool AutoCommit { get; set; }
...@@ -21,17 +21,16 @@ namespace DotNetCore.CAP ...@@ -21,17 +21,16 @@ namespace DotNetCore.CAP
protected internal virtual void AddToSent(CapPublishedMessage msg) protected internal virtual void AddToSent(CapPublishedMessage msg)
{ {
_bufferList.Add(msg); _bufferList.Enqueue(msg);
} }
protected virtual void Flush() protected virtual void Flush()
{ {
foreach (var message in _bufferList) while (!_bufferList.IsEmpty)
{ {
_bufferList.TryDequeue(out var message);
_dispatcher.EnqueueToPublish(message); _dispatcher.EnqueueToPublish(message);
} }
_bufferList.Clear();
} }
public abstract void Commit(); public abstract void Commit();
......
...@@ -44,23 +44,20 @@ namespace DotNetCore.CAP ...@@ -44,23 +44,20 @@ namespace DotNetCore.CAP
public async Task<OperateResult> SendAsync(CapPublishedMessage message) public async Task<OperateResult> SendAsync(CapPublishedMessage message)
{ {
return await Task.Run(async () => bool retry;
OperateResult result;
do
{ {
bool retry; var executedResult = await SendWithoutRetryAsync(message);
OperateResult result; result = executedResult.Item2;
do if (result == OperateResult.Success)
{ {
var executedResult = await SendWithoutRetryAsync(message); return result;
result = executedResult.Item2; }
if (result == OperateResult.Success) retry = executedResult.Item1;
{ } while (retry);
return result;
}
retry = executedResult.Item1;
} while (retry);
return result; return result;
});
} }
private async Task<(bool, OperateResult)> SendWithoutRetryAsync(CapPublishedMessage message) private async Task<(bool, OperateResult)> SendWithoutRetryAsync(CapPublishedMessage message)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment