Commit 975fcb7d authored by Savorboard's avatar Savorboard

fixed not return connection to pool bug.

parent d646c518
...@@ -8,7 +8,7 @@ namespace DotNetCore.CAP.RabbitMQ ...@@ -8,7 +8,7 @@ namespace DotNetCore.CAP.RabbitMQ
{ {
public class ConnectionPool : IConnectionPool, IDisposable public class ConnectionPool : IConnectionPool, IDisposable
{ {
private const int DefaultPoolSize = 32; private const int DefaultPoolSize = 15;
private readonly ConcurrentQueue<IConnection> _pool = new ConcurrentQueue<IConnection>(); private readonly ConcurrentQueue<IConnection> _pool = new ConcurrentQueue<IConnection>();
......
...@@ -28,9 +28,10 @@ namespace DotNetCore.CAP.RabbitMQ ...@@ -28,9 +28,10 @@ namespace DotNetCore.CAP.RabbitMQ
public override Task<OperateResult> PublishAsync(string keyName, string content) public override Task<OperateResult> PublishAsync(string keyName, string content)
{ {
var connection = _connectionPool.Rent();
try try
{ {
var connection = _connectionPool.Rent();
using (var channel = connection.CreateModel()) using (var channel = connection.CreateModel())
{ {
var body = Encoding.UTF8.GetBytes(content); var body = Encoding.UTF8.GetBytes(content);
...@@ -56,6 +57,10 @@ namespace DotNetCore.CAP.RabbitMQ ...@@ -56,6 +57,10 @@ namespace DotNetCore.CAP.RabbitMQ
Description = ex.Message Description = ex.Message
})); }));
} }
finally
{
_connectionPool.Return(connection);
}
} }
} }
} }
\ No newline at end of file
...@@ -14,7 +14,7 @@ namespace DotNetCore.CAP.RabbitMQ ...@@ -14,7 +14,7 @@ namespace DotNetCore.CAP.RabbitMQ
private readonly string _queueName; private readonly string _queueName;
private readonly RabbitMQOptions _rabbitMQOptions; private readonly RabbitMQOptions _rabbitMQOptions;
private IConnection _connection; private ConnectionPool _connectionPool;
private IModel _channel; private IModel _channel;
private ulong _deliveryTag; private ulong _deliveryTag;
...@@ -23,11 +23,11 @@ namespace DotNetCore.CAP.RabbitMQ ...@@ -23,11 +23,11 @@ namespace DotNetCore.CAP.RabbitMQ
public event EventHandler<string> OnError; public event EventHandler<string> OnError;
public RabbitMQConsumerClient(string queueName, public RabbitMQConsumerClient(string queueName,
IConnection connection, ConnectionPool connectionPool,
RabbitMQOptions options) RabbitMQOptions options)
{ {
_queueName = queueName; _queueName = queueName;
_connection = connection; _connectionPool = connectionPool;
_rabbitMQOptions = options; _rabbitMQOptions = options;
_exchageName = options.TopicExchangeName; _exchageName = options.TopicExchangeName;
...@@ -36,7 +36,9 @@ namespace DotNetCore.CAP.RabbitMQ ...@@ -36,7 +36,9 @@ namespace DotNetCore.CAP.RabbitMQ
private void InitClient() private void InitClient()
{ {
_channel = _connection.CreateModel(); var connection = _connectionPool.Rent();
_channel = connection.CreateModel();
_channel.ExchangeDeclare( _channel.ExchangeDeclare(
exchange: _exchageName, exchange: _exchageName,
...@@ -49,6 +51,8 @@ namespace DotNetCore.CAP.RabbitMQ ...@@ -49,6 +51,8 @@ namespace DotNetCore.CAP.RabbitMQ
exclusive: false, exclusive: false,
autoDelete: false, autoDelete: false,
arguments: arguments); arguments: arguments);
_connectionPool.Return(connection);
} }
public void Subscribe(IEnumerable<string> topics) public void Subscribe(IEnumerable<string> topics)
...@@ -81,7 +85,6 @@ namespace DotNetCore.CAP.RabbitMQ ...@@ -81,7 +85,6 @@ namespace DotNetCore.CAP.RabbitMQ
public void Dispose() public void Dispose()
{ {
_channel.Dispose(); _channel.Dispose();
_connection.Dispose();
} }
private void OnConsumerReceived(object sender, BasicDeliverEventArgs e) private void OnConsumerReceived(object sender, BasicDeliverEventArgs e)
......
...@@ -6,18 +6,18 @@ namespace DotNetCore.CAP.RabbitMQ ...@@ -6,18 +6,18 @@ namespace DotNetCore.CAP.RabbitMQ
internal sealed class RabbitMQConsumerClientFactory : IConsumerClientFactory internal sealed class RabbitMQConsumerClientFactory : IConsumerClientFactory
{ {
private readonly RabbitMQOptions _rabbitMQOptions; private readonly RabbitMQOptions _rabbitMQOptions;
private readonly IConnection _connection; private readonly ConnectionPool _connectionPool;
public RabbitMQConsumerClientFactory(RabbitMQOptions rabbitMQOptions, ConnectionPool pool) public RabbitMQConsumerClientFactory(RabbitMQOptions rabbitMQOptions, ConnectionPool pool)
{ {
_rabbitMQOptions = rabbitMQOptions; _rabbitMQOptions = rabbitMQOptions;
_connection = pool.Rent(); _connectionPool = pool;
} }
public IConsumerClient Create(string groupId) public IConsumerClient Create(string groupId)
{ {
return new RabbitMQConsumerClient(groupId, _connection, _rabbitMQOptions); return new RabbitMQConsumerClient(groupId, _connectionPool, _rabbitMQOptions);
} }
} }
} }
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment