Commit a3807b6a authored by Savorboard's avatar Savorboard

when storage a received message raising an eception, we will reject the message to queue.

parent eb5971bb
......@@ -45,6 +45,7 @@ namespace DotNetCore.CAP.Kafka
cancellationToken.ThrowIfCancellationRequested();
_consumerClient.Poll(timeout);
}
// ReSharper disable once FunctionNeverReturns
}
public void Commit()
......@@ -52,6 +53,11 @@ namespace DotNetCore.CAP.Kafka
_consumerClient.CommitAsync();
}
public void Reject()
{
// Ignore, Kafka will not commit offset when not commit.
}
public void Dispose()
{
_consumerClient.Dispose();
......@@ -65,11 +71,16 @@ namespace DotNetCore.CAP.Kafka
var config = _kafkaOptions.AsKafkaConfig();
_consumerClient = new Consumer<Null, string>(config, null, StringDeserializer);
_consumerClient.OnConsumeError += ConsumerClient_OnConsumeError;
_consumerClient.OnMessage += ConsumerClient_OnMessage;
_consumerClient.OnError += ConsumerClient_OnError;
}
private void ConsumerClient_OnConsumeError(object sender, Message e)
{
OnError?.Invoke(sender, $"Consumer client raised an error. Topic:{e.Topic}, Reason:{e.Error}");
}
private void ConsumerClient_OnMessage(object sender, Message<Null, string> e)
{
var message = new MessageContext
......@@ -84,7 +95,7 @@ namespace DotNetCore.CAP.Kafka
private void ConsumerClient_OnError(object sender, Error e)
{
OnError?.Invoke(sender, e.Reason);
OnError?.Invoke(sender, e.ToString());
}
#endregion private methods
......
......@@ -57,6 +57,11 @@ namespace DotNetCore.CAP.RabbitMQ
_channel.BasicAck(_deliveryTag, false);
}
public void Reject()
{
_channel.BasicReject(_deliveryTag, true);
}
public void Dispose()
{
_channel.Dispose();
......@@ -73,7 +78,7 @@ namespace DotNetCore.CAP.RabbitMQ
RabbitMQOptions.ExchangeType,
true);
var arguments = new Dictionary<string, object> {{"x-message-ttl", _rabbitMQOptions.QueueMessageExpires}};
var arguments = new Dictionary<string, object> { { "x-message-ttl", _rabbitMQOptions.QueueMessageExpires } };
_channel.QueueDeclare(_queueName,
true,
false,
......
......@@ -15,6 +15,8 @@ namespace DotNetCore.CAP
void Commit();
void Reject();
event EventHandler<MessageContext> OnMessageReceived;
event EventHandler<string> OnError;
......
......@@ -90,11 +90,18 @@ namespace DotNetCore.CAP
_logger.EnqueuingReceivedMessage(message.Name, message.Content);
using (var scope = _serviceProvider.CreateScope())
{
try
{
StoreMessage(scope, message);
client.Commit();
}
catch (Exception e)
{
_logger.LogError(e, "Raised an exception when storage received message。 Message:{0}", message);
client.Reject();
}
}
Pulse();
};
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment