Commit ddc99e4e authored by Savorboard's avatar Savorboard

Modify the consumption side, use an ack mechanism.

parent 22c4ad55
......@@ -43,6 +43,7 @@ namespace DotNetCore.CAP.Kafka
{
MainConfig.Add("bootstrap.servers", Servers);
}
MainConfig["enable.auto.commit"] = "false";
return MainConfig.AsEnumerable();
}
}
......
......@@ -46,6 +46,11 @@ namespace DotNetCore.CAP.Kafka
}
}
public void Commit()
{
_consumerClient.CommitAsync();
}
public void Dispose()
{
_consumerClient.Dispose();
......@@ -74,6 +79,7 @@ namespace DotNetCore.CAP.Kafka
MessageReceieved?.Invoke(sender, message);
}
#endregion private methods
}
}
\ No newline at end of file
......@@ -16,6 +16,7 @@ namespace DotNetCore.CAP.RabbitMQ
private IConnectionFactory _connectionFactory;
private IConnection _connection;
private IModel _channel;
private ulong _deliveryTag;
public event EventHandler<MessageContext> MessageReceieved;
......@@ -52,7 +53,7 @@ namespace DotNetCore.CAP.RabbitMQ
{
var consumer = new EventingBasicConsumer(_channel);
consumer.Received += OnConsumerReceived;
_channel.BasicConsume(_queueName, true, consumer);
_channel.BasicConsume(_queueName, false, consumer);
while (true)
{
Task.Delay(timeout);
......@@ -69,6 +70,11 @@ namespace DotNetCore.CAP.RabbitMQ
_channel.QueueBind(_queueName, _exchageName, topic);
}
public void Commit()
{
_channel.BasicAck(_deliveryTag, false);
}
public void Dispose()
{
_channel.Dispose();
......@@ -77,6 +83,7 @@ namespace DotNetCore.CAP.RabbitMQ
private void OnConsumerReceived(object sender, BasicDeliverEventArgs e)
{
_deliveryTag = e.DeliveryTag;
var message = new MessageContext
{
Group = _queueName,
......
......@@ -14,6 +14,8 @@ namespace DotNetCore.CAP
void Listening(TimeSpan timeout);
void Commit();
event EventHandler<MessageContext> MessageReceieved;
}
}
\ No newline at end of file
......@@ -23,7 +23,7 @@ namespace DotNetCore.CAP
private readonly CapOptions _options;
private readonly CancellationTokenSource _cts;
public event EventHandler<MessageContext> MessageReceieved;
private readonly TimeSpan _pollingDelay = TimeSpan.FromSeconds(1);
private Task _compositeTask;
private bool _disposed;
......@@ -46,11 +46,6 @@ namespace DotNetCore.CAP
_cts = new CancellationTokenSource();
}
protected virtual void OnMessageReceieved(MessageContext message)
{
MessageReceieved?.Invoke(this, message);
}
public void Start()
{
var groupingMatchs = _selector.GetCandidatesMethodsOfGroupNameGrouped(_serviceProvider);
......@@ -61,54 +56,20 @@ namespace DotNetCore.CAP
{
using (var client = _consumerClientFactory.Create(matchGroup.Key))
{
client.MessageReceieved += OnMessageReceieved;
RegisterMessageProcessor(client);
foreach (var item in matchGroup.Value)
{
client.Subscribe(item.Attribute.Name);
}
client.Listening(TimeSpan.FromSeconds(1));
client.Listening(_pollingDelay);
}
}, _cts.Token, TaskCreationOptions.LongRunning, TaskScheduler.Current);
}
_compositeTask = Task.CompletedTask;
}
public virtual void OnMessageReceieved(object sender, MessageContext message)
{
_logger.EnqueuingReceivedMessage(message.KeyName, message.Content);
using (var scope = _serviceProvider.CreateScope())
{
var provider = scope.ServiceProvider;
var messageStore = provider.GetRequiredService<ICapMessageStore>();
var capMessage = new CapReceivedMessage(message)
{
StatusName = StatusName.Enqueued,
};
messageStore.StoreReceivedMessageAsync(capMessage).Wait();
try
{
var executeDescriptorGroup = _selector.GetTopicExector(message.KeyName);
if (executeDescriptorGroup.ContainsKey(message.Group))
{
messageStore.ChangeReceivedMessageStateAsync(capMessage, StatusName.Processing).Wait();
// If there are multiple consumers in the same group, we will take the first
var executeDescriptor = executeDescriptorGroup[message.Group][0];
var consumerContext = new ConsumerContext(executeDescriptor, message);
_consumerInvokerFactory.CreateInvoker(consumerContext).InvokeAsync();
messageStore.ChangeReceivedMessageStateAsync(capMessage, StatusName.Succeeded).Wait();
}
}
catch (Exception ex)
{
_logger.ConsumerMethodExecutingFailed($"Group:{message.Group}, Topic:{message.KeyName}", ex);
}
}
}
public void Dispose()
{
if (_disposed)
......@@ -133,5 +94,61 @@ namespace DotNetCore.CAP
}
}
}
private void RegisterMessageProcessor(IConsumerClient client)
{
client.MessageReceieved += (sender, message) =>
{
_logger.EnqueuingReceivedMessage(message.KeyName, message.Content);
using (var scope = _serviceProvider.CreateScope())
{
var receviedMessage = StoreMessage(scope, message);
client.Commit();
ProcessMessage(scope, receviedMessage);
}
};
}
private CapReceivedMessage StoreMessage(IServiceScope serviceScope, MessageContext messageContext)
{
var provider = serviceScope.ServiceProvider;
var messageStore = provider.GetRequiredService<ICapMessageStore>();
var receivedMessage = new CapReceivedMessage(messageContext)
{
StatusName = StatusName.Enqueued,
};
messageStore.StoreReceivedMessageAsync(receivedMessage).Wait();
return receivedMessage;
}
private void ProcessMessage(IServiceScope serviceScope, CapReceivedMessage receivedMessage)
{
var provider = serviceScope.ServiceProvider;
var messageStore = provider.GetRequiredService<ICapMessageStore>();
try
{
var executeDescriptorGroup = _selector.GetTopicExector(receivedMessage.KeyName);
if (executeDescriptorGroup.ContainsKey(receivedMessage.Group))
{
messageStore.ChangeReceivedMessageStateAsync(receivedMessage, StatusName.Processing).Wait();
// If there are multiple consumers in the same group, we will take the first
var executeDescriptor = executeDescriptorGroup[receivedMessage.Group][0];
var consumerContext = new ConsumerContext(executeDescriptor, receivedMessage.ToMessageContext());
_consumerInvokerFactory.CreateInvoker(consumerContext).InvokeAsync();
messageStore.ChangeReceivedMessageStateAsync(receivedMessage, StatusName.Succeeded).Wait();
}
}
catch (Exception ex)
{
_logger.ConsumerMethodExecutingFailed($"Group:{receivedMessage.Group}, Topic:{receivedMessage.KeyName}", ex);
}
}
}
}
\ No newline at end of file
......@@ -42,8 +42,9 @@ namespace DotNetCore.CAP.Job
public void Start()
{
var processorCount = Environment.ProcessorCount;
//processorCount = 1;
_processors = GetProcessors(processorCount);
_logger.ServerStarting(processorCount, 1);
_logger.ServerStarting(processorCount, processorCount);
_context = new ProcessingContext(
_provider,
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment