Commit 365fbe6b authored by Savorboard's avatar Savorboard

refactor mq log

parent ac21368a
...@@ -33,6 +33,6 @@ namespace DotNetCore.CAP ...@@ -33,6 +33,6 @@ namespace DotNetCore.CAP
event EventHandler<MessageContext> OnMessageReceived; event EventHandler<MessageContext> OnMessageReceived;
event EventHandler<string> OnError; event EventHandler<LogMessageEventArgs> OnLog;
} }
} }
\ No newline at end of file
...@@ -23,6 +23,7 @@ namespace DotNetCore.CAP ...@@ -23,6 +23,7 @@ namespace DotNetCore.CAP
private readonly IServiceProvider _serviceProvider; private readonly IServiceProvider _serviceProvider;
private Task _compositeTask; private Task _compositeTask;
private bool _disposed; private bool _disposed;
public ConsumerHandler( public ConsumerHandler(
...@@ -44,17 +45,18 @@ namespace DotNetCore.CAP ...@@ -44,17 +45,18 @@ namespace DotNetCore.CAP
foreach (var matchGroup in groupingMatches) foreach (var matchGroup in groupingMatches)
Task.Factory.StartNew(() => Task.Factory.StartNew(() =>
{ {
using (var client = _consumerClientFactory.Create(matchGroup.Key)) using (var client = _consumerClientFactory.Create(matchGroup.Key))
{ {
RegisterMessageProcessor(client); RegisterMessageProcessor(client);
client.Subscribe(matchGroup.Value.Select(x => x.Attribute.Name)); client.Subscribe(matchGroup.Value.Select(x => x.Attribute.Name));
client.Listening(_pollingDelay, _cts.Token); client.Listening(_pollingDelay, _cts.Token);
} }
}, _cts.Token, TaskCreationOptions.LongRunning, TaskScheduler.Default); }, _cts.Token, TaskCreationOptions.LongRunning, TaskScheduler.Default);
_compositeTask = Task.CompletedTask;
_compositeTask = Task.CompletedTask;
} }
public void Dispose() public void Dispose()
...@@ -62,13 +64,10 @@ namespace DotNetCore.CAP ...@@ -62,13 +64,10 @@ namespace DotNetCore.CAP
if (_disposed) if (_disposed)
return; return;
_disposed = true; _disposed = true;
_logger.ServerShuttingDown();
_cts.Cancel(); _cts.Cancel();
try try
{ {
_compositeTask.Wait(TimeSpan.FromSeconds(10)); _compositeTask.Wait(TimeSpan.FromSeconds(2));
} }
catch (AggregateException ex) catch (AggregateException ex)
{ {
...@@ -105,7 +104,34 @@ namespace DotNetCore.CAP ...@@ -105,7 +104,34 @@ namespace DotNetCore.CAP
Pulse(); Pulse();
}; };
client.OnError += (sender, reason) => { _logger.MessageQueueError(reason); }; client.OnLog += WriteLog;
}
private void WriteLog(object sender, LogMessageEventArgs logmsg)
{
switch (logmsg.LogType)
{
case MqLogType.ConsumerCancelled:
_logger.LogWarning("RabbitMQ consumer cancelled. reason: " + logmsg.Reason);
break;
case MqLogType.ConsumerRegistered:
_logger.LogInformation("RabbitMQ consumer registered. " + logmsg.Reason);
break;
case MqLogType.ConsumerUnregistered:
_logger.LogWarning("RabbitMQ consumer unregistered. reason: " + logmsg.Reason);
break;
case MqLogType.ConsumerShutdown:
_logger.LogWarning("RabbitMQ consumer shutdown. reason:" + logmsg.Reason);
break;
case MqLogType.ConsumeError:
_logger.LogError("Kakfa client consume error. reason:" + logmsg.Reason);
break;
case MqLogType.ServerConnError:
_logger.LogCritical("Kafka server connection error. reason:" + logmsg.Reason);
break;
default:
throw new ArgumentOutOfRangeException();
}
} }
private static void StoreMessage(IServiceScope serviceScope, MessageContext messageContext) private static void StoreMessage(IServiceScope serviceScope, MessageContext messageContext)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment