Commit 0c73f8de authored by Savorboard's avatar Savorboard

refactor MQ log message

parent 365fbe6b
...@@ -2,7 +2,6 @@ ...@@ -2,7 +2,6 @@
using System.Collections.Generic; using System.Collections.Generic;
using System.Text; using System.Text;
using System.Threading; using System.Threading;
using System.Threading.Tasks;
using RabbitMQ.Client; using RabbitMQ.Client;
using RabbitMQ.Client.Events; using RabbitMQ.Client.Events;
...@@ -32,7 +31,7 @@ namespace DotNetCore.CAP.RabbitMQ ...@@ -32,7 +31,7 @@ namespace DotNetCore.CAP.RabbitMQ
public event EventHandler<MessageContext> OnMessageReceived; public event EventHandler<MessageContext> OnMessageReceived;
public event EventHandler<string> OnError; public event EventHandler<LogMessageEventArgs> OnLog;
public void Subscribe(IEnumerable<string> topics) public void Subscribe(IEnumerable<string> topics)
{ {
...@@ -47,9 +46,17 @@ namespace DotNetCore.CAP.RabbitMQ ...@@ -47,9 +46,17 @@ namespace DotNetCore.CAP.RabbitMQ
var consumer = new EventingBasicConsumer(_channel); var consumer = new EventingBasicConsumer(_channel);
consumer.Received += OnConsumerReceived; consumer.Received += OnConsumerReceived;
consumer.Shutdown += OnConsumerShutdown; consumer.Shutdown += OnConsumerShutdown;
consumer.Registered += OnConsumerRegistered;
consumer.Unregistered += OnConsumerUnregistered;
consumer.ConsumerCancelled += OnConsumerConsumerCancelled;
_channel.BasicConsume(_queueName, false, consumer); _channel.BasicConsume(_queueName, false, consumer);
while (true) while (true)
Task.Delay(timeout, cancellationToken).GetAwaiter().GetResult(); {
cancellationToken.ThrowIfCancellationRequested();
cancellationToken.WaitHandle.WaitOne(timeout);
}
} }
public void Commit() public void Commit()
...@@ -69,9 +76,9 @@ namespace DotNetCore.CAP.RabbitMQ ...@@ -69,9 +76,9 @@ namespace DotNetCore.CAP.RabbitMQ
private void InitClient() private void InitClient()
{ {
var connection = _connectionChannelPool.GetConnection(); var _connection = _connectionChannelPool.GetConnection();
_channel = connection.CreateModel(); _channel = _connection.CreateModel();
_channel.ExchangeDeclare( _channel.ExchangeDeclare(
_exchageName, _exchageName,
...@@ -84,6 +91,38 @@ namespace DotNetCore.CAP.RabbitMQ ...@@ -84,6 +91,38 @@ namespace DotNetCore.CAP.RabbitMQ
_channel.QueueDeclare(_queueName, true, false, false, arguments); _channel.QueueDeclare(_queueName, true, false, false, arguments);
} }
#region events
private void OnConsumerConsumerCancelled(object sender, ConsumerEventArgs e)
{
var args = new LogMessageEventArgs
{
LogType = MqLogType.ConsumerCancelled,
Reason = e.ConsumerTag
};
OnLog?.Invoke(sender, args);
}
private void OnConsumerUnregistered(object sender, ConsumerEventArgs e)
{
var args = new LogMessageEventArgs
{
LogType = MqLogType.ConsumerUnregistered,
Reason = e.ConsumerTag
};
OnLog?.Invoke(sender, args);
}
private void OnConsumerRegistered(object sender, ConsumerEventArgs e)
{
var args = new LogMessageEventArgs
{
LogType = MqLogType.ConsumerRegistered,
Reason = e.ConsumerTag
};
OnLog?.Invoke(sender, args);
}
private void OnConsumerReceived(object sender, BasicDeliverEventArgs e) private void OnConsumerReceived(object sender, BasicDeliverEventArgs e)
{ {
_deliveryTag = e.DeliveryTag; _deliveryTag = e.DeliveryTag;
...@@ -98,7 +137,14 @@ namespace DotNetCore.CAP.RabbitMQ ...@@ -98,7 +137,14 @@ namespace DotNetCore.CAP.RabbitMQ
private void OnConsumerShutdown(object sender, ShutdownEventArgs e) private void OnConsumerShutdown(object sender, ShutdownEventArgs e)
{ {
OnError?.Invoke(sender, e.Cause?.ToString()); var args = new LogMessageEventArgs
{
LogType = MqLogType.ConsumerShutdown,
Reason = e.ToString()
};
OnLog?.Invoke(sender, args);
} }
#endregion
} }
} }
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment