Commit d8fceb69 authored by Savorboard's avatar Savorboard

Replace BlockingCollection with Channel to improve performance. #492

parent 96819388
...@@ -13,7 +13,8 @@ ...@@ -13,7 +13,8 @@
<PackageReference Include="Microsoft.Extensions.Hosting.Abstractions" Version="3.1.0" /> <PackageReference Include="Microsoft.Extensions.Hosting.Abstractions" Version="3.1.0" />
<PackageReference Include="Microsoft.Extensions.Options" Version="3.1.0" /> <PackageReference Include="Microsoft.Extensions.Options" Version="3.1.0" />
<PackageReference Include="Newtonsoft.Json" Version="12.0.3" /> <PackageReference Include="Newtonsoft.Json" Version="12.0.3" />
<PackageReference Include="System.Diagnostics.DiagnosticSource" Version="4.6.0" /> <PackageReference Include="System.Diagnostics.DiagnosticSource" Version="4.7.0" />
<PackageReference Include="System.Threading.Channels" Version="4.7.0" />
</ItemGroup> </ItemGroup>
</Project> </Project>
\ No newline at end of file
...@@ -2,14 +2,16 @@ ...@@ -2,14 +2,16 @@
// Licensed under the MIT License. See License.txt in the project root for license information. // Licensed under the MIT License. See License.txt in the project root for license information.
using System; using System;
using System.Collections.Concurrent; using System.Linq;
using System.Threading; using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks; using System.Threading.Tasks;
using DotNetCore.CAP.Internal; using DotNetCore.CAP.Internal;
using DotNetCore.CAP.Messages; using DotNetCore.CAP.Messages;
using DotNetCore.CAP.Persistence; using DotNetCore.CAP.Persistence;
using DotNetCore.CAP.Transport; using DotNetCore.CAP.Transport;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
namespace DotNetCore.CAP.Processor namespace DotNetCore.CAP.Processor
{ {
...@@ -20,32 +22,35 @@ namespace DotNetCore.CAP.Processor ...@@ -20,32 +22,35 @@ namespace DotNetCore.CAP.Processor
private readonly ISubscribeDispatcher _executor; private readonly ISubscribeDispatcher _executor;
private readonly ILogger<Dispatcher> _logger; private readonly ILogger<Dispatcher> _logger;
private readonly BlockingCollection<MediumMessage> _publishedMessageQueue = private readonly Channel<MediumMessage> _publishedChannel;
new BlockingCollection<MediumMessage>(new ConcurrentQueue<MediumMessage>()); private readonly Channel<(MediumMessage, ConsumerExecutorDescriptor)> _receivedChannel;
private readonly BlockingCollection<(MediumMessage, ConsumerExecutorDescriptor)> _receivedMessageQueue =
new BlockingCollection<(MediumMessage, ConsumerExecutorDescriptor)>(new ConcurrentQueue<(MediumMessage, ConsumerExecutorDescriptor)>());
public Dispatcher(ILogger<Dispatcher> logger, public Dispatcher(ILogger<Dispatcher> logger,
IMessageSender sender, IMessageSender sender,
IOptions<CapOptions> options,
ISubscribeDispatcher executor) ISubscribeDispatcher executor)
{ {
_logger = logger; _logger = logger;
_sender = sender; _sender = sender;
_executor = executor; _executor = executor;
_publishedChannel = Channel.CreateUnbounded<MediumMessage>(new UnboundedChannelOptions() { SingleReader = true, SingleWriter = true });
_receivedChannel = Channel.CreateUnbounded<(MediumMessage, ConsumerExecutorDescriptor)>();
Task.Factory.StartNew(Sending, _cts.Token, TaskCreationOptions.LongRunning, TaskScheduler.Default); Task.Factory.StartNew(Sending, _cts.Token, TaskCreationOptions.LongRunning, TaskScheduler.Default);
Task.Factory.StartNew(Processing, _cts.Token, TaskCreationOptions.LongRunning, TaskScheduler.Default);
Task.WhenAll(Enumerable.Range(0, options.Value.ConsumerThreadCount)
.Select(_ => Task.Factory.StartNew(Processing, _cts.Token, TaskCreationOptions.LongRunning, TaskScheduler.Default)).ToArray());
} }
public void EnqueueToPublish(MediumMessage message) public void EnqueueToPublish(MediumMessage message)
{ {
_publishedMessageQueue.Add(message); _publishedChannel.Writer.TryWrite(message);
} }
public void EnqueueToExecute(MediumMessage message, ConsumerExecutorDescriptor descriptor) public void EnqueueToExecute(MediumMessage message, ConsumerExecutorDescriptor descriptor)
{ {
_receivedMessageQueue.Add((message, descriptor)); _receivedChannel.Writer.TryWrite((message, descriptor));
} }
public void Dispose() public void Dispose()
...@@ -57,21 +62,23 @@ namespace DotNetCore.CAP.Processor ...@@ -57,21 +62,23 @@ namespace DotNetCore.CAP.Processor
{ {
try try
{ {
while (!_publishedMessageQueue.IsCompleted) while (await _publishedChannel.Reader.WaitToReadAsync(_cts.Token))
{ {
if (_publishedMessageQueue.TryTake(out var message, 3000, _cts.Token)) while (_publishedChannel.Reader.TryRead(out var message))
{ {
try try
{ {
var result = await _sender.SendAsync(message); var result = await _sender.SendAsync(message);
if (!result.Succeeded) if (!result.Succeeded)
{ {
_logger.MessagePublishException(message.Origin.GetId(), result.ToString(), result.Exception); _logger.MessagePublishException(message.Origin.GetId(), result.ToString(),
result.Exception);
} }
} }
catch (Exception ex) catch (Exception ex)
{ {
_logger.LogError(ex, $"An exception occurred when sending a message to the MQ. Id:{message.DbId}"); _logger.LogError(ex,
$"An exception occurred when sending a message to the MQ. Id:{message.DbId}");
} }
} }
} }
...@@ -86,9 +93,12 @@ namespace DotNetCore.CAP.Processor ...@@ -86,9 +93,12 @@ namespace DotNetCore.CAP.Processor
{ {
try try
{ {
foreach (var message in _receivedMessageQueue.GetConsumingEnumerable(_cts.Token)) while (await _receivedChannel.Reader.WaitToReadAsync(_cts.Token))
{ {
await _executor.DispatchAsync(message.Item1, message.Item2, _cts.Token); while (_receivedChannel.Reader.TryRead(out var message))
{
await _executor.DispatchAsync(message.Item1, message.Item2, _cts.Token);
}
} }
} }
catch (OperationCanceledException) catch (OperationCanceledException)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment