Commit 31b0c8c7 authored by yangxiaodong's avatar yangxiaodong

add MQ exceptions message to log.

parent 41b983cf
...@@ -12,7 +12,9 @@ namespace DotNetCore.CAP.Kafka ...@@ -12,7 +12,9 @@ namespace DotNetCore.CAP.Kafka
private readonly KafkaOptions _kafkaOptions; private readonly KafkaOptions _kafkaOptions;
private Consumer<Null, string> _consumerClient; private Consumer<Null, string> _consumerClient;
public event EventHandler<MessageContext> MessageReceieved; public event EventHandler<MessageContext> OnMessageReceieved;
public event EventHandler<string> OnError;
public IDeserializer<string> StringDeserializer { get; set; } public IDeserializer<string> StringDeserializer { get; set; }
...@@ -67,6 +69,7 @@ namespace DotNetCore.CAP.Kafka ...@@ -67,6 +69,7 @@ namespace DotNetCore.CAP.Kafka
_consumerClient = new Consumer<Null, string>(config, null, StringDeserializer); _consumerClient = new Consumer<Null, string>(config, null, StringDeserializer);
_consumerClient.OnMessage += ConsumerClient_OnMessage; _consumerClient.OnMessage += ConsumerClient_OnMessage;
_consumerClient.OnError += ConsumerClient_OnError;
} }
private void ConsumerClient_OnMessage(object sender, Message<Null, string> e) private void ConsumerClient_OnMessage(object sender, Message<Null, string> e)
...@@ -77,7 +80,12 @@ namespace DotNetCore.CAP.Kafka ...@@ -77,7 +80,12 @@ namespace DotNetCore.CAP.Kafka
Name = e.Topic, Name = e.Topic,
Content = e.Value Content = e.Value
}; };
MessageReceieved?.Invoke(sender, message); OnMessageReceieved?.Invoke(sender, message);
}
private void ConsumerClient_OnError(object sender, Error e)
{
OnError?.Invoke(sender, e.Reason);
} }
#endregion private methods #endregion private methods
......
...@@ -18,7 +18,8 @@ namespace DotNetCore.CAP.RabbitMQ ...@@ -18,7 +18,8 @@ namespace DotNetCore.CAP.RabbitMQ
private IModel _channel; private IModel _channel;
private ulong _deliveryTag; private ulong _deliveryTag;
public event EventHandler<MessageContext> MessageReceieved; public event EventHandler<MessageContext> OnMessageReceieved;
public event EventHandler<string> OnError;
public RabbitMQConsumerClient(string queueName, RabbitMQOptions options) public RabbitMQConsumerClient(string queueName, RabbitMQOptions options)
{ {
...@@ -53,6 +54,7 @@ namespace DotNetCore.CAP.RabbitMQ ...@@ -53,6 +54,7 @@ namespace DotNetCore.CAP.RabbitMQ
{ {
var consumer = new EventingBasicConsumer(_channel); var consumer = new EventingBasicConsumer(_channel);
consumer.Received += OnConsumerReceived; consumer.Received += OnConsumerReceived;
consumer.Shutdown += OnConsumerShutdown;
_channel.BasicConsume(_queueName, false, consumer); _channel.BasicConsume(_queueName, false, consumer);
while (true) while (true)
{ {
...@@ -90,7 +92,12 @@ namespace DotNetCore.CAP.RabbitMQ ...@@ -90,7 +92,12 @@ namespace DotNetCore.CAP.RabbitMQ
Name = e.RoutingKey, Name = e.RoutingKey,
Content = Encoding.UTF8.GetString(e.Body) Content = Encoding.UTF8.GetString(e.Body)
}; };
MessageReceieved?.Invoke(sender, message); OnMessageReceieved?.Invoke(sender, message);
}
private void OnConsumerShutdown(object sender, ShutdownEventArgs e)
{
OnError?.Invoke(sender, e.Cause?.ToString());
} }
} }
} }
\ No newline at end of file
using System; using System;
using System.Threading; using System.Threading;
using DotNetCore.CAP.Infrastructure;
namespace DotNetCore.CAP namespace DotNetCore.CAP
{ {
...@@ -17,6 +16,8 @@ namespace DotNetCore.CAP ...@@ -17,6 +16,8 @@ namespace DotNetCore.CAP
void Commit(); void Commit();
event EventHandler<MessageContext> MessageReceieved; event EventHandler<MessageContext> OnMessageReceieved;
event EventHandler<string> OnError;
} }
} }
\ No newline at end of file
...@@ -95,7 +95,7 @@ namespace DotNetCore.CAP ...@@ -95,7 +95,7 @@ namespace DotNetCore.CAP
private void RegisterMessageProcessor(IConsumerClient client) private void RegisterMessageProcessor(IConsumerClient client)
{ {
client.MessageReceieved += (sender, message) => client.OnMessageReceieved += (sender, message) =>
{ {
_logger.EnqueuingReceivedMessage(message.Name, message.Content); _logger.EnqueuingReceivedMessage(message.Name, message.Content);
...@@ -106,6 +106,11 @@ namespace DotNetCore.CAP ...@@ -106,6 +106,11 @@ namespace DotNetCore.CAP
} }
Pulse(); Pulse();
}; };
client.OnError += (sender, reason) =>
{
_logger.LogError(reason);
};
} }
private CapReceivedMessage StoreMessage(IServiceScope serviceScope, MessageContext messageContext) private CapReceivedMessage StoreMessage(IServiceScope serviceScope, MessageContext messageContext)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment