Commit 81bf77c6 authored by Savorboard's avatar Savorboard

Fixed consumer re-register transport bug. (#329)

parent de04d99c
<Project>
<Import Project="build\version.props" />
<Import Project="..\build\version.props" />
<PropertyGroup Label="Package">
<Product>CAP</Product>
......
......@@ -71,7 +71,6 @@ namespace DotNetCore.CAP.AzureServiceBus
public void Listening(TimeSpan timeout, CancellationToken cancellationToken)
{
_consumerClient.RegisterMessageHandler(OnConsumerReceived,
new MessageHandlerOptions(OnExceptionReceived)
{
......
......@@ -19,9 +19,16 @@ namespace DotNetCore.CAP.AzureServiceBus
}
public IConsumerClient Create(string groupId)
{
try
{
var logger = _loggerFactory.CreateLogger(typeof(AzureServiceBusConsumerClient));
return new AzureServiceBusConsumerClient(logger, groupId, _asbOptions);
}
catch (System.Exception e)
{
throw new BrokerConnectionException(e);
}
}
}
}
\ No newline at end of file
......@@ -21,6 +21,8 @@ namespace DotNetCore.CAP.Kafka
_groupId = groupId;
_kafkaOptions = options ?? throw new ArgumentNullException(nameof(options));
StringDeserializer = new StringDeserializer(Encoding.UTF8);
InitKafkaClient();
}
public IDeserializer<string> StringDeserializer { get; set; }
......@@ -38,11 +40,6 @@ namespace DotNetCore.CAP.Kafka
throw new ArgumentNullException(nameof(topics));
}
if (_consumerClient == null)
{
InitKafkaClient();
}
_consumerClient.Subscribe(topics);
}
......
......@@ -13,8 +13,15 @@ namespace DotNetCore.CAP.Kafka
}
public IConsumerClient Create(string groupId)
{
try
{
return new KafkaConsumerClient(groupId, _kafkaOptions);
}
catch (System.Exception e)
{
throw new BrokerConnectionException(e);
}
}
}
}
\ No newline at end of file
......@@ -8,7 +8,6 @@ namespace DotNetCore.CAP.RabbitMQ
private readonly IConnectionChannelPool _connectionChannelPool;
private readonly RabbitMQOptions _rabbitMQOptions;
public RabbitMQConsumerClientFactory(RabbitMQOptions rabbitMQOptions, IConnectionChannelPool channelPool)
{
_rabbitMQOptions = rabbitMQOptions;
......@@ -16,8 +15,15 @@ namespace DotNetCore.CAP.RabbitMQ
}
public IConsumerClient Create(string groupId)
{
try
{
return new RabbitMQConsumerClient(groupId, _connectionChannelPool, _rabbitMQOptions);
}
catch (System.Exception e)
{
throw new BrokerConnectionException(e);
}
}
}
}
\ No newline at end of file
using System;
namespace DotNetCore.CAP
{
public class BrokerConnectionException : Exception
{
public BrokerConnectionException(Exception innerException)
: base("Broker Unreachable", innerException)
{
}
}
}
......@@ -50,12 +50,15 @@ namespace Microsoft.Extensions.DependencyInjection
services.TryAddSingleton<MethodMatcherCache>();
//Processors
services.TryAddEnumerable(ServiceDescriptor.Singleton<IProcessingServer, ConsumerHandler>());
services.TryAddSingleton<IConsumerRegister, ConsumerRegister>();
services.TryAddEnumerable(ServiceDescriptor.Singleton<IProcessingServer, CapProcessingServer>());
services.TryAddEnumerable(ServiceDescriptor.Singleton<IProcessingServer, ConsumerRegister>());
services.TryAddSingleton<IStateChanger, StateChanger>();
//Queue's message processor
services.TryAddSingleton<NeedRetryMessageProcessor>();
services.TryAddSingleton<TransportCheckProcessor>();
//Sender and Executors
services.TryAddSingleton<IDispatcher, Dispatcher>();
......
......@@ -14,29 +14,30 @@ using Microsoft.Extensions.Logging;
namespace DotNetCore.CAP
{
internal class ConsumerHandler : IConsumerHandler
internal class ConsumerRegister : IConsumerRegister
{
private readonly IStorageConnection _connection;
private readonly IConsumerClientFactory _consumerClientFactory;
private readonly CancellationTokenSource _cts;
private readonly IDispatcher _dispatcher;
private readonly ILogger _logger;
private readonly TimeSpan _pollingDelay = TimeSpan.FromSeconds(1);
private readonly MethodMatcherCache _selector;
private CancellationTokenSource _cts;
private string _serverAddress;
private Task _compositeTask;
private bool _disposed;
private static bool _isHealthy = true;
// diagnostics listener
// ReSharper disable once InconsistentNaming
private static readonly DiagnosticListener s_diagnosticListener =
new DiagnosticListener(CapDiagnosticListenerExtensions.DiagnosticListenerName);
public ConsumerHandler(IConsumerClientFactory consumerClientFactory,
public ConsumerRegister(IConsumerClientFactory consumerClientFactory,
IDispatcher dispatcher,
IStorageConnection connection,
ILogger<ConsumerHandler> logger,
ILogger<ConsumerRegister> logger,
MethodMatcherCache selector)
{
_selector = selector;
......@@ -44,16 +45,24 @@ namespace DotNetCore.CAP
_consumerClientFactory = consumerClientFactory;
_dispatcher = dispatcher;
_connection = connection;
_cts = new CancellationTokenSource();
}
public bool IsHealthy()
{
return _isHealthy;
}
public void Start()
{
_cts = new CancellationTokenSource();
var groupingMatches = _selector.GetCandidatesMethodsOfGroupNameGrouped();
foreach (var matchGroup in groupingMatches)
{
Task.Factory.StartNew(() =>
{
try
{
using (var client = _consumerClientFactory.Create(matchGroup.Key))
{
......@@ -65,12 +74,34 @@ namespace DotNetCore.CAP
client.Listening(_pollingDelay, _cts.Token);
}
}
catch (OperationCanceledException)
{
//ignore
}
catch (BrokerConnectionException e)
{
_isHealthy = false;
_logger.LogError(e, e.Message);
}
}, _cts.Token, TaskCreationOptions.LongRunning, TaskScheduler.Default);
}
_compositeTask = Task.CompletedTask;
}
public void ReStart(bool force = false)
{
if (!IsHealthy() || force)
{
Pulse();
_isHealthy = true;
Start();
}
}
public void Dispose()
{
if (_disposed)
......@@ -79,9 +110,11 @@ namespace DotNetCore.CAP
}
_disposed = true;
_cts.Cancel();
try
{
Pulse();
_compositeTask?.Wait(TimeSpan.FromSeconds(2));
}
catch (AggregateException ex)
......@@ -96,7 +129,7 @@ namespace DotNetCore.CAP
public void Pulse()
{
//ignore
_cts?.Cancel();
}
private void RegisterMessageProcessor(IConsumerClient client)
......
......@@ -6,7 +6,10 @@ namespace DotNetCore.CAP
/// <summary>
/// Handler received message of subscribed.
/// </summary>
public interface IConsumerHandler : IProcessingServer
public interface IConsumerRegister : IProcessingServer
{
bool IsHealthy();
void ReStart(bool force = false);
}
}
\ No newline at end of file
......@@ -93,6 +93,7 @@ namespace DotNetCore.CAP.Processor
{
var returnedProcessors = new List<IProcessor>
{
_provider.GetRequiredService<TransportCheckProcessor>(),
_provider.GetRequiredService<NeedRetryMessageProcessor>(),
_provider.GetRequiredService<ICollectProcessor>()
};
......
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using System.Threading.Tasks;
namespace DotNetCore.CAP.Processor
{
public class TransportCheckProcessor : IProcessor
{
private readonly IConsumerRegister _register;
private readonly TimeSpan _waitingInterval;
public TransportCheckProcessor(IConsumerRegister register)
{
_register = register;
_waitingInterval = TimeSpan.FromSeconds(30);
}
public async Task ProcessAsync(ProcessingContext context)
{
if (context == null)
{
throw new ArgumentNullException(nameof(context));
}
if (!_register.IsHealthy())
{
_register.ReStart();
}
await context.WaitAsync(_waitingInterval);
}
}
}
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment