Commit 46cdfa17 authored by Savorboard's avatar Savorboard

Fix Kafka transport check not working bug. #436

parent 72f7094b
...@@ -24,7 +24,7 @@ namespace DotNetCore.CAP.Kafka ...@@ -24,7 +24,7 @@ namespace DotNetCore.CAP.Kafka
_producerPool = new ConcurrentQueue<IProducer<string, byte[]>>(); _producerPool = new ConcurrentQueue<IProducer<string, byte[]>>();
_maxSize = _options.ConnectionPoolSize; _maxSize = _options.ConnectionPoolSize;
logger.LogDebug("Kafka configuration of CAP :\r\n {0}", JsonConvert.SerializeObject(_options.AsKafkaConfig())); logger.LogDebug("CAP Kafka configuration: {0}", JsonConvert.SerializeObject(_options.AsKafkaConfig(), Formatting.Indented));
} }
public string ServersAddress => _options.Servers; public string ServersAddress => _options.Servers;
......
...@@ -27,8 +27,8 @@ namespace DotNetCore.CAP.Internal ...@@ -27,8 +27,8 @@ namespace DotNetCore.CAP.Internal
private readonly TimeSpan _pollingDelay = TimeSpan.FromSeconds(1); private readonly TimeSpan _pollingDelay = TimeSpan.FromSeconds(1);
private readonly CapOptions _options; private readonly CapOptions _options;
private readonly MethodMatcherCache _selector; private readonly MethodMatcherCache _selector;
private readonly CancellationTokenSource _cts;
private CancellationTokenSource _cts;
private string _serverAddress; private string _serverAddress;
private Task _compositeTask; private Task _compositeTask;
private bool _disposed; private bool _disposed;
...@@ -111,6 +111,7 @@ namespace DotNetCore.CAP.Internal ...@@ -111,6 +111,7 @@ namespace DotNetCore.CAP.Internal
{ {
Pulse(); Pulse();
_cts = new CancellationTokenSource();
_isHealthy = true; _isHealthy = true;
Start(); Start();
...@@ -247,6 +248,7 @@ namespace DotNetCore.CAP.Internal ...@@ -247,6 +248,7 @@ namespace DotNetCore.CAP.Internal
_logger.LogError("Kafka client consume error. --> " + logmsg.Reason); _logger.LogError("Kafka client consume error. --> " + logmsg.Reason);
break; break;
case MqLogType.ServerConnError: case MqLogType.ServerConnError:
_isHealthy = false;
_logger.LogCritical("Kafka server connection error. --> " + logmsg.Reason); _logger.LogCritical("Kafka server connection error. --> " + logmsg.Reason);
break; break;
case MqLogType.ExceptionReceived: case MqLogType.ExceptionReceived:
......
...@@ -64,7 +64,11 @@ namespace DotNetCore.CAP.Processor ...@@ -64,7 +64,11 @@ namespace DotNetCore.CAP.Processor
{ {
try try
{ {
await _sender.SendAsync(message); var result = await _sender.SendAsync(message);
if (!result.Succeeded)
{
_logger.LogWarning(result.Exception, "Message send failed! -->" + result);
}
} }
catch (Exception ex) catch (Exception ex)
{ {
......
...@@ -38,7 +38,7 @@ namespace DotNetCore.CAP.Processor ...@@ -38,7 +38,7 @@ namespace DotNetCore.CAP.Processor
foreach (var table in tables) foreach (var table in tables)
{ {
_logger.LogDebug($"Collecting expired data from table [{table}]."); _logger.LogDebug($"Collecting expired data from table: {table}");
int deletedCount; int deletedCount;
var time = DateTime.Now; var time = DateTime.Now;
......
...@@ -4,16 +4,19 @@ ...@@ -4,16 +4,19 @@
using System; using System;
using System.Threading.Tasks; using System.Threading.Tasks;
using DotNetCore.CAP.Internal; using DotNetCore.CAP.Internal;
using Microsoft.Extensions.Logging;
namespace DotNetCore.CAP.Processor namespace DotNetCore.CAP.Processor
{ {
public class TransportCheckProcessor : IProcessor public class TransportCheckProcessor : IProcessor
{ {
private readonly ILogger<TransportCheckProcessor> _logger;
private readonly IConsumerRegister _register; private readonly IConsumerRegister _register;
private readonly TimeSpan _waitingInterval; private readonly TimeSpan _waitingInterval;
public TransportCheckProcessor(IConsumerRegister register) public TransportCheckProcessor(ILogger<TransportCheckProcessor> logger, IConsumerRegister register)
{ {
_logger = logger;
_register = register; _register = register;
_waitingInterval = TimeSpan.FromSeconds(30); _waitingInterval = TimeSpan.FromSeconds(30);
} }
...@@ -25,12 +28,20 @@ namespace DotNetCore.CAP.Processor ...@@ -25,12 +28,20 @@ namespace DotNetCore.CAP.Processor
throw new ArgumentNullException(nameof(context)); throw new ArgumentNullException(nameof(context));
} }
_logger.LogDebug("Transport connection checking...");
if (!_register.IsHealthy()) if (!_register.IsHealthy())
{ {
_logger.LogWarning("Transport connection is unhealthy, reconnection...");
_register.ReStart(); _register.ReStart();
} }
else
{
_logger.LogDebug("Transport connection healthy!");
}
await context.WaitAsync(_waitingInterval); await context.WaitAsync(_waitingInterval);
} }
} }
} }
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment