Commit cd25f372 authored by yangxiaodong's avatar yangxiaodong

support cancellation token of consumer handler.

parent 475ace0e
using System; using System;
using System.Text; using System.Text;
using System.Threading;
using Confluent.Kafka; using Confluent.Kafka;
using Confluent.Kafka.Serialization; using Confluent.Kafka.Serialization;
using DotNetCore.CAP.Infrastructure; using DotNetCore.CAP.Infrastructure;
...@@ -38,10 +39,11 @@ namespace DotNetCore.CAP.Kafka ...@@ -38,10 +39,11 @@ namespace DotNetCore.CAP.Kafka
_consumerClient.Subscribe(topicName); _consumerClient.Subscribe(topicName);
} }
public void Listening(TimeSpan timeout) public void Listening(TimeSpan timeout, CancellationToken cancellationToken)
{ {
while (true) while (true)
{ {
cancellationToken.ThrowIfCancellationRequested();
_consumerClient.Poll(timeout); _consumerClient.Poll(timeout);
} }
} }
......
using System; using System;
using System.Text; using System.Text;
using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
using DotNetCore.CAP.Infrastructure; using DotNetCore.CAP.Infrastructure;
using RabbitMQ.Client; using RabbitMQ.Client;
...@@ -49,14 +50,14 @@ namespace DotNetCore.CAP.RabbitMQ ...@@ -49,14 +50,14 @@ namespace DotNetCore.CAP.RabbitMQ
_channel.QueueDeclare(_queueName, exclusive: false); _channel.QueueDeclare(_queueName, exclusive: false);
} }
public void Listening(TimeSpan timeout) public void Listening(TimeSpan timeout, CancellationToken cancellationToken)
{ {
var consumer = new EventingBasicConsumer(_channel); var consumer = new EventingBasicConsumer(_channel);
consumer.Received += OnConsumerReceived; consumer.Received += OnConsumerReceived;
_channel.BasicConsume(_queueName, false, consumer); _channel.BasicConsume(_queueName, false, consumer);
while (true) while (true)
{ {
Task.Delay(timeout).Wait(); Task.Delay(timeout, cancellationToken).Wait();
} }
} }
......
...@@ -68,8 +68,6 @@ namespace DotNetCore.CAP ...@@ -68,8 +68,6 @@ namespace DotNetCore.CAP
if (_cts.IsCancellationRequested) return; if (_cts.IsCancellationRequested) return;
if (_cts.IsCancellationRequested) return;
await BootstrapCoreAsync(); await BootstrapCoreAsync();
if (_cts.IsCancellationRequested) return; if (_cts.IsCancellationRequested) return;
......
using System; using System;
using System.Threading;
using DotNetCore.CAP.Infrastructure; using DotNetCore.CAP.Infrastructure;
namespace DotNetCore.CAP namespace DotNetCore.CAP
...@@ -12,7 +13,7 @@ namespace DotNetCore.CAP ...@@ -12,7 +13,7 @@ namespace DotNetCore.CAP
void Subscribe(string topic, int partition); void Subscribe(string topic, int partition);
void Listening(TimeSpan timeout); void Listening(TimeSpan timeout, CancellationToken cancellationToken);
void Commit(); void Commit();
......
...@@ -2,7 +2,6 @@ ...@@ -2,7 +2,6 @@
using System.Linq; using System.Linq;
using System.Threading; using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
using DotNetCore.CAP.Abstractions;
using DotNetCore.CAP.Infrastructure; using DotNetCore.CAP.Infrastructure;
using DotNetCore.CAP.Internal; using DotNetCore.CAP.Internal;
using DotNetCore.CAP.Models; using DotNetCore.CAP.Models;
...@@ -18,12 +17,11 @@ namespace DotNetCore.CAP ...@@ -18,12 +17,11 @@ namespace DotNetCore.CAP
private readonly IServiceProvider _serviceProvider; private readonly IServiceProvider _serviceProvider;
private readonly IConsumerInvokerFactory _consumerInvokerFactory; private readonly IConsumerInvokerFactory _consumerInvokerFactory;
private readonly IConsumerClientFactory _consumerClientFactory; private readonly IConsumerClientFactory _consumerClientFactory;
private readonly ILoggerFactory _loggerFactory;
private readonly ILogger _logger; private readonly ILogger _logger;
private readonly CancellationTokenSource _cts;
private readonly MethodMatcherCache _selector; private readonly MethodMatcherCache _selector;
private readonly CapOptions _options; private readonly CapOptions _options;
private readonly CancellationTokenSource _cts;
private readonly TimeSpan _pollingDelay = TimeSpan.FromSeconds(1); private readonly TimeSpan _pollingDelay = TimeSpan.FromSeconds(1);
...@@ -34,13 +32,12 @@ namespace DotNetCore.CAP ...@@ -34,13 +32,12 @@ namespace DotNetCore.CAP
IServiceProvider serviceProvider, IServiceProvider serviceProvider,
IConsumerInvokerFactory consumerInvokerFactory, IConsumerInvokerFactory consumerInvokerFactory,
IConsumerClientFactory consumerClientFactory, IConsumerClientFactory consumerClientFactory,
ILoggerFactory loggerFactory, ILogger<ConsumerHandler> logger,
MethodMatcherCache selector, MethodMatcherCache selector,
IOptions<CapOptions> options) IOptions<CapOptions> options)
{ {
_selector = selector; _selector = selector;
_logger = loggerFactory.CreateLogger<ConsumerHandler>(); _logger = logger;
_loggerFactory = loggerFactory;
_serviceProvider = serviceProvider; _serviceProvider = serviceProvider;
_consumerInvokerFactory = consumerInvokerFactory; _consumerInvokerFactory = consumerInvokerFactory;
_consumerClientFactory = consumerClientFactory; _consumerClientFactory = consumerClientFactory;
...@@ -65,7 +62,7 @@ namespace DotNetCore.CAP ...@@ -65,7 +62,7 @@ namespace DotNetCore.CAP
client.Subscribe(item.Attribute.Name); client.Subscribe(item.Attribute.Name);
} }
client.Listening(_pollingDelay); client.Listening(_pollingDelay, _cts.Token);
} }
}, _cts.Token, TaskCreationOptions.LongRunning, TaskScheduler.Default); }, _cts.Token, TaskCreationOptions.LongRunning, TaskScheduler.Default);
} }
...@@ -85,7 +82,7 @@ namespace DotNetCore.CAP ...@@ -85,7 +82,7 @@ namespace DotNetCore.CAP
try try
{ {
_compositeTask.Wait((int)TimeSpan.FromSeconds(60).TotalMilliseconds); _compositeTask.Wait(TimeSpan.FromSeconds(60));
} }
catch (AggregateException ex) catch (AggregateException ex)
{ {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment