Commit d6f72d1d authored by yangxiaodong's avatar yangxiaodong

refactor with new connection pool.

parent 9740e535
......@@ -3,7 +3,6 @@ using System.Text;
using System.Threading.Tasks;
using DotNetCore.CAP.Processor.States;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using RabbitMQ.Client;
namespace DotNetCore.CAP.RabbitMQ
......@@ -11,35 +10,25 @@ namespace DotNetCore.CAP.RabbitMQ
internal sealed class PublishQueueExecutor : BasePublishQueueExecutor
{
private readonly ILogger _logger;
private readonly IConnection _connection;
private readonly RabbitMQOptions _rabbitMQOptions;
public PublishQueueExecutor(IStateChanger stateChanger,
RabbitMQOptions options,
IConnection connection,
RabbitMQOptions rabbitMQOptions,
ILogger<PublishQueueExecutor> logger)
: base(stateChanger, logger)
{
_logger = logger;
_rabbitMQOptions = options;
_connection = connection;
_rabbitMQOptions = rabbitMQOptions;
}
public override Task<OperateResult> PublishAsync(string keyName, string content)
{
var factory = new ConnectionFactory()
{
HostName = _rabbitMQOptions.HostName,
UserName = _rabbitMQOptions.UserName,
Port = _rabbitMQOptions.Port,
Password = _rabbitMQOptions.Password,
VirtualHost = _rabbitMQOptions.VirtualHost,
RequestedConnectionTimeout = _rabbitMQOptions.RequestedConnectionTimeout,
SocketReadTimeout = _rabbitMQOptions.SocketReadTimeout,
SocketWriteTimeout = _rabbitMQOptions.SocketWriteTimeout
};
try
{
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
using (var channel = _connection.CreateModel())
{
var body = Encoding.UTF8.GetBytes(content);
......
......@@ -14,7 +14,6 @@ namespace DotNetCore.CAP.RabbitMQ
private readonly string _queueName;
private readonly RabbitMQOptions _rabbitMQOptions;
private IConnectionFactory _connectionFactory;
private IConnection _connection;
private IModel _channel;
private ulong _deliveryTag;
......@@ -23,9 +22,12 @@ namespace DotNetCore.CAP.RabbitMQ
public event EventHandler<string> OnError;
public RabbitMQConsumerClient(string queueName, RabbitMQOptions options)
public RabbitMQConsumerClient(string queueName,
IConnection connection,
RabbitMQOptions options)
{
_queueName = queueName;
_connection = connection;
_rabbitMQOptions = options;
_exchageName = options.TopicExchangeName;
......@@ -34,19 +36,6 @@ namespace DotNetCore.CAP.RabbitMQ
private void InitClient()
{
_connectionFactory = new ConnectionFactory()
{
HostName = _rabbitMQOptions.HostName,
UserName = _rabbitMQOptions.UserName,
Port = _rabbitMQOptions.Port,
Password = _rabbitMQOptions.Password,
VirtualHost = _rabbitMQOptions.VirtualHost,
RequestedConnectionTimeout = _rabbitMQOptions.RequestedConnectionTimeout,
SocketReadTimeout = _rabbitMQOptions.SocketReadTimeout,
SocketWriteTimeout = _rabbitMQOptions.SocketWriteTimeout
};
_connection = _connectionFactory.CreateConnection();
_channel = _connection.CreateModel();
_channel.ExchangeDeclare(
......
using Microsoft.Extensions.Options;
using RabbitMQ.Client;
namespace DotNetCore.CAP.RabbitMQ
{
internal sealed class RabbitMQConsumerClientFactory : IConsumerClientFactory
{
private readonly RabbitMQOptions _rabbitMQOptions;
private readonly IConnection _connection;
public RabbitMQConsumerClientFactory(RabbitMQOptions rabbitMQOptions)
public RabbitMQConsumerClientFactory(RabbitMQOptions rabbitMQOptions, IConnection connection)
{
_rabbitMQOptions = rabbitMQOptions;
_connection = connection;
}
public IConsumerClient Create(string groupId)
{
return new RabbitMQConsumerClient(groupId, _rabbitMQOptions);
return new RabbitMQConsumerClient(groupId, _connection, _rabbitMQOptions);
}
}
}
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment