Commit d28c2082 authored by Savorboard's avatar Savorboard

support event data for Diagnostics.

parent e6bad00b
...@@ -3,8 +3,10 @@ ...@@ -3,8 +3,10 @@
using System; using System;
using System.Data; using System.Data;
using System.Diagnostics;
using System.Threading.Tasks; using System.Threading.Tasks;
using DotNetCore.CAP.Infrastructure; using DotNetCore.CAP.Infrastructure;
using DotNetCore.CAP.Internal;
using DotNetCore.CAP.Models; using DotNetCore.CAP.Models;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
...@@ -15,6 +17,11 @@ namespace DotNetCore.CAP.Abstractions ...@@ -15,6 +17,11 @@ namespace DotNetCore.CAP.Abstractions
private readonly IDispatcher _dispatcher; private readonly IDispatcher _dispatcher;
private readonly ILogger _logger; private readonly ILogger _logger;
// diagnostics listener
// ReSharper disable once InconsistentNaming
private static readonly DiagnosticListener s_diagnosticListener =
new DiagnosticListener(CapDiagnosticListenerExtensions.DiagnosticListenerName);
protected CapPublisherBase(ILogger<CapPublisherBase> logger, IDispatcher dispatcher) protected CapPublisherBase(ILogger<CapPublisherBase> logger, IDispatcher dispatcher)
{ {
_logger = logger; _logger = logger;
...@@ -75,13 +82,13 @@ namespace DotNetCore.CAP.Abstractions ...@@ -75,13 +82,13 @@ namespace DotNetCore.CAP.Abstractions
protected virtual string Serialize<T>(T obj, string callbackName = null) protected virtual string Serialize<T>(T obj, string callbackName = null)
{ {
var packer = (IMessagePacker) ServiceProvider.GetService(typeof(IMessagePacker)); var packer = (IMessagePacker)ServiceProvider.GetService(typeof(IMessagePacker));
string content; string content;
if (obj != null) if (obj != null)
{ {
if (Helper.IsComplexType(obj.GetType())) if (Helper.IsComplexType(obj.GetType()))
{ {
var serializer = (IContentSerializer) ServiceProvider.GetService(typeof(IContentSerializer)); var serializer = (IContentSerializer)ServiceProvider.GetService(typeof(IContentSerializer));
content = serializer.Serialize(obj); content = serializer.Serialize(obj);
} }
else else
...@@ -179,16 +186,20 @@ namespace DotNetCore.CAP.Abstractions ...@@ -179,16 +186,20 @@ namespace DotNetCore.CAP.Abstractions
private void PublishWithTrans<T>(string name, T contentObj, string callbackName = null) private void PublishWithTrans<T>(string name, T contentObj, string callbackName = null)
{ {
try Guid operationId = default(Guid);
var content = Serialize(contentObj, callbackName);
var message = new CapPublishedMessage
{ {
var content = Serialize(contentObj, callbackName); Name = name,
Content = content,
StatusName = StatusName.Scheduled
};
var message = new CapPublishedMessage try
{ {
Name = name, operationId = s_diagnosticListener.WritePublishMessageStoreBefore(message);
Content = content,
StatusName = StatusName.Scheduled
};
var id = Execute(DbConnection, DbTransaction, message); var id = Execute(DbConnection, DbTransaction, message);
...@@ -197,15 +208,15 @@ namespace DotNetCore.CAP.Abstractions ...@@ -197,15 +208,15 @@ namespace DotNetCore.CAP.Abstractions
if (id > 0) if (id > 0)
{ {
_logger.LogInformation($"message [{message}] has been persisted in the database."); _logger.LogInformation($"message [{message}] has been persisted in the database.");
s_diagnosticListener.WritePublishMessageStoreAfter(operationId, message);
message.Id = id; message.Id = id;
Enqueue(message); Enqueue(message);
} }
} }
catch (Exception e) catch (Exception e)
{ {
_logger.LogError("An exception was occurred when publish message. exception message:" + e.Message, e); _logger.LogError("An exception was occurred when publish message. exception message:" + e.Message, e);
s_diagnosticListener.WritePublishMessageStoreError(operationId, message, e);
Console.WriteLine(e); Console.WriteLine(e);
throw; throw;
} }
......
...@@ -55,6 +55,7 @@ ...@@ -55,6 +55,7 @@
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="2.0.1" /> <PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="2.0.1" />
<PackageReference Include="Newtonsoft.Json" Version="11.0.2" /> <PackageReference Include="Newtonsoft.Json" Version="11.0.2" />
<PackageReference Include="System.Data.Common" Version="4.3.0" /> <PackageReference Include="System.Data.Common" Version="4.3.0" />
<PackageReference Include="System.Diagnostics.DiagnosticSource" Version="4.4.1" />
<PackageReference Include="System.Threading.ThreadPool" Version="4.3.0" /> <PackageReference Include="System.Threading.ThreadPool" Version="4.3.0" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="2.0.0" /> <PackageReference Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="2.0.0" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="2.0.0" /> <PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="2.0.0" />
......
...@@ -2,6 +2,7 @@ ...@@ -2,6 +2,7 @@
// Licensed under the MIT License. See License.txt in the project root for license information. // Licensed under the MIT License. See License.txt in the project root for license information.
using System; using System;
using System.Diagnostics;
using System.Linq; using System.Linq;
using System.Threading; using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
...@@ -25,6 +26,11 @@ namespace DotNetCore.CAP ...@@ -25,6 +26,11 @@ namespace DotNetCore.CAP
private Task _compositeTask; private Task _compositeTask;
private bool _disposed; private bool _disposed;
// diagnostics listener
// ReSharper disable once InconsistentNaming
private static readonly DiagnosticListener s_diagnosticListener =
new DiagnosticListener(CapDiagnosticListenerExtensions.DiagnosticListenerName);
public ConsumerHandler(IConsumerClientFactory consumerClientFactory, public ConsumerHandler(IConsumerClientFactory consumerClientFactory,
IDispatcher dispatcher, IDispatcher dispatcher,
IStorageConnection connection, IStorageConnection connection,
...@@ -91,21 +97,34 @@ namespace DotNetCore.CAP ...@@ -91,21 +97,34 @@ namespace DotNetCore.CAP
private void RegisterMessageProcessor(IConsumerClient client) private void RegisterMessageProcessor(IConsumerClient client)
{ {
client.OnMessageReceived += (sender, message) => client.OnMessageReceived += (sender, messageContext) =>
{ {
Guid operationId = default(Guid);
var receivedMessage = new CapReceivedMessage(messageContext)
{
StatusName = StatusName.Scheduled
};
try try
{ {
var storedMessage = StoreMessage(message); operationId = s_diagnosticListener.WriteReceiveMessageStoreBefore(receivedMessage);
StoreMessage(receivedMessage);
client.Commit(); client.Commit();
_dispatcher.EnqueueToExecute(storedMessage); s_diagnosticListener.WriteReceiveMessageStoreAfter(operationId, receivedMessage);
_dispatcher.EnqueueToExecute(receivedMessage);
} }
catch (Exception e) catch (Exception e)
{ {
_logger.LogError(e, "An exception occurred when storage received message. Message:'{0}'.", _logger.LogError(e, "An exception occurred when storage received message. Message:'{0}'.", messageContext);
message);
client.Reject(); client.Reject();
s_diagnosticListener.WriteReceiveMessageStoreError(operationId, receivedMessage, e);
} }
}; };
...@@ -139,15 +158,12 @@ namespace DotNetCore.CAP ...@@ -139,15 +158,12 @@ namespace DotNetCore.CAP
} }
} }
private CapReceivedMessage StoreMessage(MessageContext messageContext) private void StoreMessage(CapReceivedMessage receivedMessage)
{ {
var receivedMessage = new CapReceivedMessage(messageContext) var id = _connection.StoreReceivedMessageAsync(receivedMessage)
{ .GetAwaiter().GetResult();
StatusName = StatusName.Scheduled
};
var id = _connection.StoreReceivedMessageAsync(receivedMessage).GetAwaiter().GetResult();
receivedMessage.Id = id; receivedMessage.Id = id;
return receivedMessage;
} }
} }
} }
\ No newline at end of file
...@@ -20,6 +20,11 @@ namespace DotNetCore.CAP ...@@ -20,6 +20,11 @@ namespace DotNetCore.CAP
private readonly CapOptions _options; private readonly CapOptions _options;
private readonly IStateChanger _stateChanger; private readonly IStateChanger _stateChanger;
// diagnostics listener
// ReSharper disable once InconsistentNaming
private static readonly DiagnosticListener s_diagnosticListener =
new DiagnosticListener(CapDiagnosticListenerExtensions.DiagnosticListenerName);
protected BasePublishMessageSender( protected BasePublishMessageSender(
ILogger logger, ILogger logger,
CapOptions options, CapOptions options,
...@@ -37,6 +42,7 @@ namespace DotNetCore.CAP ...@@ -37,6 +42,7 @@ namespace DotNetCore.CAP
public async Task<OperateResult> SendAsync(CapPublishedMessage message) public async Task<OperateResult> SendAsync(CapPublishedMessage message)
{ {
var sp = Stopwatch.StartNew(); var sp = Stopwatch.StartNew();
var operationId = s_diagnosticListener.WritePublishBefore(message);
var result = await PublishAsync(message.Name, message.Content); var result = await PublishAsync(message.Name, message.Content);
...@@ -45,22 +51,26 @@ namespace DotNetCore.CAP ...@@ -45,22 +51,26 @@ namespace DotNetCore.CAP
if (result.Succeeded) if (result.Succeeded)
{ {
await SetSuccessfulState(message); await SetSuccessfulState(message);
s_diagnosticListener.WritePublishAfter(operationId, message);
_logger.MessageHasBeenSent(sp.Elapsed.TotalSeconds); _logger.MessageHasBeenSent(sp.Elapsed.TotalSeconds);
return OperateResult.Success; return OperateResult.Success;
} }
else
_logger.MessagePublishException(message.Id, result.Exception);
await SetFailedState(message, result.Exception, out bool stillRetry);
if (stillRetry)
{ {
_logger.SenderRetrying(3); s_diagnosticListener.WritePublishError(operationId, message, result.Exception);
_logger.MessagePublishException(message.Id, result.Exception);
await SendAsync(message); await SetFailedState(message, result.Exception, out bool stillRetry);
if (stillRetry)
{
_logger.SenderRetrying(3);
await SendAsync(message);
}
return OperateResult.Failed(result.Exception);
} }
return OperateResult.Failed(result.Exception);
} }
private static bool UpdateMessageForRetryAsync(CapPublishedMessage message) private static bool UpdateMessageForRetryAsync(CapPublishedMessage message)
......
...@@ -18,10 +18,14 @@ namespace DotNetCore.CAP ...@@ -18,10 +18,14 @@ namespace DotNetCore.CAP
private readonly ICallbackMessageSender _callbackMessageSender; private readonly ICallbackMessageSender _callbackMessageSender;
private readonly IStorageConnection _connection; private readonly IStorageConnection _connection;
private readonly ILogger _logger; private readonly ILogger _logger;
private readonly IStateChanger _stateChanger;
private readonly CapOptions _options; private readonly CapOptions _options;
private readonly MethodMatcherCache _selector; private readonly MethodMatcherCache _selector;
private readonly IStateChanger _stateChanger;
// diagnostics listener
// ReSharper disable once InconsistentNaming
private static readonly DiagnosticListener s_diagnosticListener =
new DiagnosticListener(CapDiagnosticListenerExtensions.DiagnosticListenerName);
public DefaultSubscriberExecutor( public DefaultSubscriberExecutor(
ILogger<DefaultSubscriberExecutor> logger, ILogger<DefaultSubscriberExecutor> logger,
...@@ -139,12 +143,18 @@ namespace DotNetCore.CAP ...@@ -139,12 +143,18 @@ namespace DotNetCore.CAP
var error = $"message can not be found subscriber, Message:{receivedMessage},\r\n see: https://github.com/dotnetcore/CAP/issues/63"; var error = $"message can not be found subscriber, Message:{receivedMessage},\r\n see: https://github.com/dotnetcore/CAP/issues/63";
throw new SubscriberNotFoundException(error); throw new SubscriberNotFoundException(error);
} }
Guid operationId = default(Guid);
var consumerContext = new ConsumerContext(executor, receivedMessage.ToMessageContext());
try try
{ {
var consumerContext = new ConsumerContext(executor, receivedMessage.ToMessageContext()); operationId = s_diagnosticListener.WriteConsumerInvokeBefore(consumerContext);
var ret = await Invoker.InvokeAsync(consumerContext); var ret = await Invoker.InvokeAsync(consumerContext);
s_diagnosticListener.WriteConsumerInvokeAfter(operationId,consumerContext);
if (!string.IsNullOrEmpty(ret.CallbackName)) if (!string.IsNullOrEmpty(ret.CallbackName))
{ {
await _callbackMessageSender.SendAsync(ret.MessageId, ret.CallbackName, ret.Result); await _callbackMessageSender.SendAsync(ret.MessageId, ret.CallbackName, ret.Result);
...@@ -152,6 +162,8 @@ namespace DotNetCore.CAP ...@@ -152,6 +162,8 @@ namespace DotNetCore.CAP
} }
catch (Exception ex) catch (Exception ex)
{ {
s_diagnosticListener.WriteConsumerInvokeError(operationId, consumerContext, ex);
throw new SubscriberExecutionFailedException(ex.Message, ex); throw new SubscriberExecutionFailedException(ex.Message, ex);
} }
} }
......
This diff is collapsed.
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment