Commit 557c3142 authored by Savorboard's avatar Savorboard

Try to fix transport healthy check bug. #503

parent c1c4c44e
......@@ -109,9 +109,7 @@ namespace DotNetCore.CAP.AzureServiceBus
_consumerClient?.CloseAsync().Wait(1500);
}
#region private methods
private async Task ConnectAsync()
public async Task ConnectAsync()
{
if (_consumerClient != null)
{
......@@ -157,6 +155,8 @@ namespace DotNetCore.CAP.AzureServiceBus
}
}
#region private methods
private Task OnConsumerReceived(Message message, CancellationToken token)
{
var header = message.UserProperties.ToDictionary(x => x.Key, y => y.Value?.ToString());
......
......@@ -25,7 +25,9 @@ namespace DotNetCore.CAP.AzureServiceBus
try
{
var logger = _loggerFactory.CreateLogger(typeof(AzureServiceBusConsumerClient));
return new AzureServiceBusConsumerClient(logger, groupId, _asbOptions);
var client = new AzureServiceBusConsumerClient(logger, groupId, _asbOptions);
client.ConnectAsync().GetAwaiter().GetResult();
return client;
}
catch (System.Exception e)
{
......
......@@ -95,9 +95,7 @@ namespace DotNetCore.CAP.Kafka
_consumerClient?.Dispose();
}
#region private methods
private void Connect()
public void Connect()
{
if (_consumerClient != null)
{
......@@ -134,7 +132,5 @@ namespace DotNetCore.CAP.Kafka
};
OnLog?.Invoke(null, logArgs);
}
#endregion private methods
}
}
\ No newline at end of file
......@@ -19,7 +19,9 @@ namespace DotNetCore.CAP.Kafka
{
try
{
return new KafkaConsumerClient(groupId, _kafkaOptions);
var client = new KafkaConsumerClient(groupId, _kafkaOptions);
client.Connect();
return client;
}
catch (System.Exception e)
{
......
......@@ -95,9 +95,7 @@ namespace DotNetCore.CAP.RabbitMQ
_connection?.Dispose();
}
#region events
private void Connect()
public void Connect()
{
if (_connection != null)
{
......@@ -129,6 +127,8 @@ namespace DotNetCore.CAP.RabbitMQ
}
}
#region events
private void OnConsumerConsumerCancelled(object sender, ConsumerEventArgs e)
{
var args = new LogMessageEventArgs
......
......@@ -21,7 +21,9 @@ namespace DotNetCore.CAP.RabbitMQ
{
try
{
return new RabbitMQConsumerClient(groupId, _connectionChannelPool, _rabbitMQOptions);
var client = new RabbitMQConsumerClient(groupId, _connectionChannelPool, _rabbitMQOptions);
client.Connect();
return client;
}
catch (System.Exception e)
{
......
......@@ -235,6 +235,7 @@ namespace DotNetCore.CAP.Internal
_logger.LogWarning("RabbitMQ consumer unregistered. --> " + logmsg.Reason);
break;
case MqLogType.ConsumerShutdown:
_isHealthy = false;
_logger.LogWarning("RabbitMQ consumer shutdown. --> " + logmsg.Reason);
break;
case MqLogType.ConsumeError:
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment