Commit 83aaab26 authored by Savorboard's avatar Savorboard

optimize the RabbitMQ connection pool

parent 52dd95ff
...@@ -23,7 +23,7 @@ namespace DotNetCore.CAP ...@@ -23,7 +23,7 @@ namespace DotNetCore.CAP
services.AddSingleton(options); services.AddSingleton(options);
services.AddSingleton<IConsumerClientFactory, RabbitMQConsumerClientFactory>(); services.AddSingleton<IConsumerClientFactory, RabbitMQConsumerClientFactory>();
services.AddSingleton<ConnectionPool>(); services.AddSingleton<IConnectionChannelPool, ConnectionChannelPool>();
services.AddSingleton<IQueueExecutor, PublishQueueExecutor>(); services.AddSingleton<IQueueExecutor, PublishQueueExecutor>();
services.AddSingleton<IPublishExecutor, PublishQueueExecutor>(); services.AddSingleton<IPublishExecutor, PublishQueueExecutor>();
} }
......
...@@ -2,38 +2,50 @@ ...@@ -2,38 +2,50 @@
using System.Collections.Concurrent; using System.Collections.Concurrent;
using System.Diagnostics; using System.Diagnostics;
using System.Threading; using System.Threading;
using Microsoft.Extensions.Logging;
using RabbitMQ.Client; using RabbitMQ.Client;
namespace DotNetCore.CAP.RabbitMQ namespace DotNetCore.CAP.RabbitMQ
{ {
public class ConnectionPool : IConnectionPool, IDisposable public class ConnectionChannelPool : IConnectionChannelPool, IDisposable
{ {
private const int DefaultPoolSize = 15; private const int DefaultPoolSize = 15;
private readonly Func<IConnection> _connectionActivator;
private readonly ILogger<ConnectionChannelPool> _logger;
private readonly ConcurrentQueue<IModel> _pool = new ConcurrentQueue<IModel>();
private IConnection _connection;
private readonly Func<IConnection> _activator;
private readonly ConcurrentQueue<IConnection> _pool = new ConcurrentQueue<IConnection>();
private int _count; private int _count;
private int _maxSize; private int _maxSize;
public ConnectionPool(RabbitMQOptions options) public ConnectionChannelPool(ILogger<ConnectionChannelPool> logger,
RabbitMQOptions options)
{ {
_logger = logger;
_maxSize = DefaultPoolSize; _maxSize = DefaultPoolSize;
_activator = CreateActivator(options); _connectionActivator = CreateConnection(options);
} }
IConnection IConnectionPool.Rent() IModel IConnectionChannelPool.Rent()
{ {
return Rent(); return Rent();
} }
bool IConnectionPool.Return(IConnection connection) bool IConnectionChannelPool.Return(IModel connection)
{ {
return Return(connection); return Return(connection);
} }
public IConnection GetConnection()
{
if (_connection != null && _connection.IsOpen)
return _connection;
_connection = _connectionActivator();
_connection.ConnectionShutdown += RabbitMQ_ConnectionShutdown;
return _connection;
}
public void Dispose() public void Dispose()
{ {
_maxSize = 0; _maxSize = 0;
...@@ -42,7 +54,7 @@ namespace DotNetCore.CAP.RabbitMQ ...@@ -42,7 +54,7 @@ namespace DotNetCore.CAP.RabbitMQ
context.Dispose(); context.Dispose();
} }
private static Func<IConnection> CreateActivator(RabbitMQOptions options) private static Func<IConnection> CreateConnection(RabbitMQOptions options)
{ {
var factory = new ConnectionFactory var factory = new ConnectionFactory
{ {
...@@ -59,23 +71,28 @@ namespace DotNetCore.CAP.RabbitMQ ...@@ -59,23 +71,28 @@ namespace DotNetCore.CAP.RabbitMQ
return () => factory.CreateConnection(); return () => factory.CreateConnection();
} }
public virtual IConnection Rent() private void RabbitMQ_ConnectionShutdown(object sender, ShutdownEventArgs e)
{
_logger.LogWarning($"RabbitMQ client connection closed! {e}");
}
public virtual IModel Rent()
{ {
if (_pool.TryDequeue(out var connection)) if (_pool.TryDequeue(out var model))
{ {
Interlocked.Decrement(ref _count); Interlocked.Decrement(ref _count);
Debug.Assert(_count >= 0); Debug.Assert(_count >= 0);
return connection; return model;
} }
connection = _activator(); model = GetConnection().CreateModel();
return connection; return model;
} }
public virtual bool Return(IConnection connection) public virtual bool Return(IModel connection)
{ {
if (Interlocked.Increment(ref _count) <= _maxSize) if (Interlocked.Increment(ref _count) <= _maxSize)
{ {
......
...@@ -2,10 +2,12 @@ ...@@ -2,10 +2,12 @@
namespace DotNetCore.CAP.RabbitMQ namespace DotNetCore.CAP.RabbitMQ
{ {
public interface IConnectionPool public interface IConnectionChannelPool
{ {
IConnection Rent(); IConnection GetConnection();
bool Return(IConnection context); IModel Rent();
bool Return(IModel context);
} }
} }
\ No newline at end of file
...@@ -9,41 +9,34 @@ namespace DotNetCore.CAP.RabbitMQ ...@@ -9,41 +9,34 @@ namespace DotNetCore.CAP.RabbitMQ
{ {
internal sealed class PublishQueueExecutor : BasePublishQueueExecutor internal sealed class PublishQueueExecutor : BasePublishQueueExecutor
{ {
private readonly ConnectionPool _connectionPool; private readonly IConnectionChannelPool _connectionChannelPool;
private readonly ILogger _logger; private readonly ILogger _logger;
private readonly RabbitMQOptions _rabbitMQOptions; private readonly RabbitMQOptions _rabbitMQOptions;
public PublishQueueExecutor( public PublishQueueExecutor(ILogger<PublishQueueExecutor> logger, CapOptions options,
CapOptions options, RabbitMQOptions rabbitMQOptions, IConnectionChannelPool connectionChannelPool, IStateChanger stateChanger)
IStateChanger stateChanger,
ConnectionPool connectionPool,
RabbitMQOptions rabbitMQOptions,
ILogger<PublishQueueExecutor> logger)
: base(options, stateChanger, logger) : base(options, stateChanger, logger)
{ {
_logger = logger; _logger = logger;
_connectionPool = connectionPool; _connectionChannelPool = connectionChannelPool;
_rabbitMQOptions = rabbitMQOptions; _rabbitMQOptions = rabbitMQOptions;
} }
public override Task<OperateResult> PublishAsync(string keyName, string content) public override Task<OperateResult> PublishAsync(string keyName, string content)
{ {
var connection = _connectionPool.Rent(); var channel = _connectionChannelPool.Rent();
try try
{ {
using (var channel = connection.CreateModel()) var body = Encoding.UTF8.GetBytes(content);
{
var body = Encoding.UTF8.GetBytes(content); channel.ExchangeDeclare(_rabbitMQOptions.TopicExchangeName, RabbitMQOptions.ExchangeType, true);
channel.BasicPublish(_rabbitMQOptions.TopicExchangeName,
keyName,
null,
body);
channel.ExchangeDeclare(_rabbitMQOptions.TopicExchangeName, RabbitMQOptions.ExchangeType, true); _logger.LogDebug($"RabbitMQ topic message [{keyName}] has been published.");
channel.BasicPublish(_rabbitMQOptions.TopicExchangeName,
keyName,
null,
body);
_logger.LogDebug($"RabbitMQ topic message [{keyName}] has been published.");
}
return Task.FromResult(OperateResult.Success); return Task.FromResult(OperateResult.Success);
} }
catch (Exception ex) catch (Exception ex)
...@@ -60,7 +53,7 @@ namespace DotNetCore.CAP.RabbitMQ ...@@ -60,7 +53,7 @@ namespace DotNetCore.CAP.RabbitMQ
} }
finally finally
{ {
_connectionPool.Return(connection); _connectionChannelPool.Return(channel);
} }
} }
} }
......
...@@ -10,7 +10,7 @@ namespace DotNetCore.CAP.RabbitMQ ...@@ -10,7 +10,7 @@ namespace DotNetCore.CAP.RabbitMQ
{ {
internal sealed class RabbitMQConsumerClient : IConsumerClient internal sealed class RabbitMQConsumerClient : IConsumerClient
{ {
private readonly ConnectionPool _connectionPool; private readonly IConnectionChannelPool _connectionChannelPool;
private readonly string _exchageName; private readonly string _exchageName;
private readonly string _queueName; private readonly string _queueName;
private readonly RabbitMQOptions _rabbitMQOptions; private readonly RabbitMQOptions _rabbitMQOptions;
...@@ -19,11 +19,11 @@ namespace DotNetCore.CAP.RabbitMQ ...@@ -19,11 +19,11 @@ namespace DotNetCore.CAP.RabbitMQ
private ulong _deliveryTag; private ulong _deliveryTag;
public RabbitMQConsumerClient(string queueName, public RabbitMQConsumerClient(string queueName,
ConnectionPool connectionPool, IConnectionChannelPool connectionChannelPool,
RabbitMQOptions options) RabbitMQOptions options)
{ {
_queueName = queueName; _queueName = queueName;
_connectionPool = connectionPool; _connectionChannelPool = connectionChannelPool;
_rabbitMQOptions = options; _rabbitMQOptions = options;
_exchageName = options.TopicExchangeName; _exchageName = options.TopicExchangeName;
...@@ -69,7 +69,7 @@ namespace DotNetCore.CAP.RabbitMQ ...@@ -69,7 +69,7 @@ namespace DotNetCore.CAP.RabbitMQ
private void InitClient() private void InitClient()
{ {
var connection = _connectionPool.Rent(); var connection = _connectionChannelPool.GetConnection();
_channel = connection.CreateModel(); _channel = connection.CreateModel();
...@@ -82,8 +82,6 @@ namespace DotNetCore.CAP.RabbitMQ ...@@ -82,8 +82,6 @@ namespace DotNetCore.CAP.RabbitMQ
{ "x-message-ttl", _rabbitMQOptions.QueueMessageExpires } { "x-message-ttl", _rabbitMQOptions.QueueMessageExpires }
}; };
_channel.QueueDeclare(_queueName, true, false, false, arguments); _channel.QueueDeclare(_queueName, true, false, false, arguments);
_connectionPool.Return(connection);
} }
private void OnConsumerReceived(object sender, BasicDeliverEventArgs e) private void OnConsumerReceived(object sender, BasicDeliverEventArgs e)
......
...@@ -2,19 +2,19 @@ ...@@ -2,19 +2,19 @@
{ {
internal sealed class RabbitMQConsumerClientFactory : IConsumerClientFactory internal sealed class RabbitMQConsumerClientFactory : IConsumerClientFactory
{ {
private readonly ConnectionPool _connectionPool; private readonly IConnectionChannelPool _connectionChannelPool;
private readonly RabbitMQOptions _rabbitMQOptions; private readonly RabbitMQOptions _rabbitMQOptions;
public RabbitMQConsumerClientFactory(RabbitMQOptions rabbitMQOptions, ConnectionPool pool) public RabbitMQConsumerClientFactory(RabbitMQOptions rabbitMQOptions, IConnectionChannelPool channelPool)
{ {
_rabbitMQOptions = rabbitMQOptions; _rabbitMQOptions = rabbitMQOptions;
_connectionPool = pool; _connectionChannelPool = channelPool;
} }
public IConsumerClient Create(string groupId) public IConsumerClient Create(string groupId)
{ {
return new RabbitMQConsumerClient(groupId, _connectionPool, _rabbitMQOptions); return new RabbitMQConsumerClient(groupId, _connectionChannelPool, _rabbitMQOptions);
} }
} }
} }
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment