Commit 3e59bf17 authored by Savorboard's avatar Savorboard

refactor Diagnostics module.

parent 139b69cf
...@@ -5,6 +5,7 @@ using System; ...@@ -5,6 +5,7 @@ using System;
using System.Diagnostics; using System.Diagnostics;
using System.Text; using System.Text;
using System.Threading.Tasks; using System.Threading.Tasks;
using DotNetCore.CAP.Diagnostics;
using DotNetCore.CAP.Internal; using DotNetCore.CAP.Internal;
using DotNetCore.CAP.Processor.States; using DotNetCore.CAP.Processor.States;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
...@@ -13,24 +14,32 @@ namespace DotNetCore.CAP.Kafka ...@@ -13,24 +14,32 @@ namespace DotNetCore.CAP.Kafka
{ {
internal class KafkaPublishMessageSender : BasePublishMessageSender internal class KafkaPublishMessageSender : BasePublishMessageSender
{ {
private readonly ConnectionPool _connectionPool; private readonly IConnectionPool _connectionPool;
private readonly ILogger _logger; private readonly ILogger _logger;
private readonly string _serversAddress;
public KafkaPublishMessageSender( public KafkaPublishMessageSender(
CapOptions options, IStateChanger stateChanger, IStorageConnection connection, CapOptions options, IStateChanger stateChanger, IStorageConnection connection,
ConnectionPool connectionPool, ILogger<KafkaPublishMessageSender> logger) IConnectionPool connectionPool, ILogger<KafkaPublishMessageSender> logger)
: base(logger, options, connection, stateChanger) : base(logger, options, connection, stateChanger)
{ {
_logger = logger; _logger = logger;
_connectionPool = connectionPool; _connectionPool = connectionPool;
_serversAddress = _connectionPool.ServersAddress;
} }
public override async Task<OperateResult> PublishAsync(string keyName, string content) public override async Task<OperateResult> PublishAsync(string keyName, string content)
{ {
var startTime = DateTimeOffset.UtcNow;
var stopwatch = Stopwatch.StartNew();
Guid operationId = Guid.Empty;
var producer = _connectionPool.Rent(); var producer = _connectionPool.Rent();
try try
{ {
operationId = s_diagnosticListener.WritePublishBefore(keyName, content, _serversAddress);
var contentBytes = Encoding.UTF8.GetBytes(content); var contentBytes = Encoding.UTF8.GetBytes(content);
var message = await producer.ProduceAsync(keyName, null, contentBytes); var message = await producer.ProduceAsync(keyName, null, contentBytes);
...@@ -43,13 +52,18 @@ namespace DotNetCore.CAP.Kafka ...@@ -43,13 +52,18 @@ namespace DotNetCore.CAP.Kafka
}); });
} }
s_diagnosticListener.WritePublishAfter(operationId, message.Topic, content, _serversAddress, startTime, stopwatch.Elapsed);
_logger.LogDebug($"kafka topic message [{keyName}] has been published."); _logger.LogDebug($"kafka topic message [{keyName}] has been published.");
return OperateResult.Success; return OperateResult.Success;
} }
catch (Exception ex) catch (Exception ex)
{ {
s_diagnosticListener.WritePublishError(operationId, keyName, content, _serversAddress, ex, startTime, stopwatch.Elapsed);
var wapperEx = new PublisherSentFailedException(ex.Message, ex); var wapperEx = new PublisherSentFailedException(ex.Message, ex);
return OperateResult.Failed(wapperEx); return OperateResult.Failed(wapperEx);
} }
finally finally
...@@ -61,5 +75,6 @@ namespace DotNetCore.CAP.Kafka ...@@ -61,5 +75,6 @@ namespace DotNetCore.CAP.Kafka
} }
} }
} }
} }
} }
\ No newline at end of file
...@@ -5,8 +5,8 @@ using System; ...@@ -5,8 +5,8 @@ using System;
using System.Data; using System.Data;
using System.Diagnostics; using System.Diagnostics;
using System.Threading.Tasks; using System.Threading.Tasks;
using DotNetCore.CAP.Diagnostics;
using DotNetCore.CAP.Infrastructure; using DotNetCore.CAP.Infrastructure;
using DotNetCore.CAP.Internal;
using DotNetCore.CAP.Models; using DotNetCore.CAP.Models;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
...@@ -152,8 +152,7 @@ namespace DotNetCore.CAP.Abstractions ...@@ -152,8 +152,7 @@ namespace DotNetCore.CAP.Abstractions
private async Task PublishWithTransAsync<T>(string name, T contentObj, string callbackName = null) private async Task PublishWithTransAsync<T>(string name, T contentObj, string callbackName = null)
{ {
try Guid operationId = default(Guid);
{
var content = Serialize(contentObj, callbackName); var content = Serialize(contentObj, callbackName);
var message = new CapPublishedMessage var message = new CapPublishedMessage
...@@ -163,6 +162,10 @@ namespace DotNetCore.CAP.Abstractions ...@@ -163,6 +162,10 @@ namespace DotNetCore.CAP.Abstractions
StatusName = StatusName.Scheduled StatusName = StatusName.Scheduled
}; };
try
{
operationId = s_diagnosticListener.WritePublishMessageStoreBefore(message);
var id = await ExecuteAsync(DbConnection, DbTransaction, message); var id = await ExecuteAsync(DbConnection, DbTransaction, message);
ClosedCap(); ClosedCap();
...@@ -170,6 +173,7 @@ namespace DotNetCore.CAP.Abstractions ...@@ -170,6 +173,7 @@ namespace DotNetCore.CAP.Abstractions
if (id > 0) if (id > 0)
{ {
_logger.LogInformation($"message [{message}] has been persisted in the database."); _logger.LogInformation($"message [{message}] has been persisted in the database.");
s_diagnosticListener.WritePublishMessageStoreAfter(operationId, message);
message.Id = id; message.Id = id;
...@@ -179,6 +183,7 @@ namespace DotNetCore.CAP.Abstractions ...@@ -179,6 +183,7 @@ namespace DotNetCore.CAP.Abstractions
catch (Exception e) catch (Exception e)
{ {
_logger.LogError("An exception was occurred when publish message. exception message:" + e.Message, e); _logger.LogError("An exception was occurred when publish message. exception message:" + e.Message, e);
s_diagnosticListener.WritePublishMessageStoreError(operationId, message, e);
Console.WriteLine(e); Console.WriteLine(e);
throw; throw;
} }
......
...@@ -6,6 +6,7 @@ using System.Diagnostics; ...@@ -6,6 +6,7 @@ using System.Diagnostics;
using System.Linq; using System.Linq;
using System.Threading; using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
using DotNetCore.CAP.Diagnostics;
using DotNetCore.CAP.Infrastructure; using DotNetCore.CAP.Infrastructure;
using DotNetCore.CAP.Internal; using DotNetCore.CAP.Internal;
using DotNetCore.CAP.Models; using DotNetCore.CAP.Models;
...@@ -99,7 +100,9 @@ namespace DotNetCore.CAP ...@@ -99,7 +100,9 @@ namespace DotNetCore.CAP
{ {
client.OnMessageReceived += (sender, messageContext) => client.OnMessageReceived += (sender, messageContext) =>
{ {
Guid operationId = default(Guid); var startTime = DateTimeOffset.UtcNow;
var stopwatch = Stopwatch.StartNew();
var operationId = Guid.Empty;
var receivedMessage = new CapReceivedMessage(messageContext) var receivedMessage = new CapReceivedMessage(messageContext)
{ {
...@@ -108,13 +111,22 @@ namespace DotNetCore.CAP ...@@ -108,13 +111,22 @@ namespace DotNetCore.CAP
try try
{ {
operationId = s_diagnosticListener.WriteReceiveMessageStoreBefore(receivedMessage); operationId = s_diagnosticListener.WriteReceiveMessageStoreBefore(
messageContext.Name,
messageContext.Content,
messageContext.Group);
StoreMessage(receivedMessage); StoreMessage(receivedMessage);
client.Commit(); client.Commit();
s_diagnosticListener.WriteReceiveMessageStoreAfter(operationId, receivedMessage); s_diagnosticListener.WriteReceiveMessageStoreAfter(
operationId,
messageContext.Name,
messageContext.Content,
messageContext.Group,
startTime,
stopwatch.Elapsed);
_dispatcher.EnqueueToExecute(receivedMessage); _dispatcher.EnqueueToExecute(receivedMessage);
} }
...@@ -124,7 +136,13 @@ namespace DotNetCore.CAP ...@@ -124,7 +136,13 @@ namespace DotNetCore.CAP
client.Reject(); client.Reject();
s_diagnosticListener.WriteReceiveMessageStoreError(operationId, receivedMessage, e); s_diagnosticListener.WriteReceiveMessageStoreError(operationId,
messageContext.Name,
messageContext.Content,
messageContext.Group,
e,
startTime,
stopwatch.Elapsed);
} }
}; };
......
...@@ -4,6 +4,7 @@ ...@@ -4,6 +4,7 @@
using System; using System;
using System.Diagnostics; using System.Diagnostics;
using System.Threading.Tasks; using System.Threading.Tasks;
using DotNetCore.CAP.Diagnostics;
using DotNetCore.CAP.Infrastructure; using DotNetCore.CAP.Infrastructure;
using DotNetCore.CAP.Internal; using DotNetCore.CAP.Internal;
using DotNetCore.CAP.Models; using DotNetCore.CAP.Models;
...@@ -22,7 +23,7 @@ namespace DotNetCore.CAP ...@@ -22,7 +23,7 @@ namespace DotNetCore.CAP
// diagnostics listener // diagnostics listener
// ReSharper disable once InconsistentNaming // ReSharper disable once InconsistentNaming
private static readonly DiagnosticListener s_diagnosticListener = protected static readonly DiagnosticListener s_diagnosticListener =
new DiagnosticListener(CapDiagnosticListenerExtensions.DiagnosticListenerName); new DiagnosticListener(CapDiagnosticListenerExtensions.DiagnosticListenerName);
protected BasePublishMessageSender( protected BasePublishMessageSender(
...@@ -42,7 +43,6 @@ namespace DotNetCore.CAP ...@@ -42,7 +43,6 @@ namespace DotNetCore.CAP
public async Task<OperateResult> SendAsync(CapPublishedMessage message) public async Task<OperateResult> SendAsync(CapPublishedMessage message)
{ {
var sp = Stopwatch.StartNew(); var sp = Stopwatch.StartNew();
var operationId = s_diagnosticListener.WritePublishBefore(message);
var result = await PublishAsync(message.Name, message.Content); var result = await PublishAsync(message.Name, message.Content);
...@@ -52,14 +52,12 @@ namespace DotNetCore.CAP ...@@ -52,14 +52,12 @@ namespace DotNetCore.CAP
{ {
await SetSuccessfulState(message); await SetSuccessfulState(message);
s_diagnosticListener.WritePublishAfter(operationId, message);
_logger.MessageHasBeenSent(sp.Elapsed.TotalSeconds); _logger.MessageHasBeenSent(sp.Elapsed.TotalSeconds);
return OperateResult.Success; return OperateResult.Success;
} }
else else
{ {
s_diagnosticListener.WritePublishError(operationId, message, result.Exception);
_logger.MessagePublishException(message.Id, result.Exception); _logger.MessagePublishException(message.Id, result.Exception);
await SetFailedState(message, result.Exception, out bool stillRetry); await SetFailedState(message, result.Exception, out bool stillRetry);
......
...@@ -4,6 +4,7 @@ ...@@ -4,6 +4,7 @@
using System; using System;
using System.Diagnostics; using System.Diagnostics;
using System.Threading.Tasks; using System.Threading.Tasks;
using DotNetCore.CAP.Diagnostics;
using DotNetCore.CAP.Infrastructure; using DotNetCore.CAP.Infrastructure;
using DotNetCore.CAP.Internal; using DotNetCore.CAP.Internal;
using DotNetCore.CAP.Models; using DotNetCore.CAP.Models;
...@@ -144,7 +145,10 @@ namespace DotNetCore.CAP ...@@ -144,7 +145,10 @@ namespace DotNetCore.CAP
throw new SubscriberNotFoundException(error); throw new SubscriberNotFoundException(error);
} }
Guid operationId = default(Guid); var startTime = DateTimeOffset.UtcNow;
var stopwatch = Stopwatch.StartNew();
var operationId = Guid.Empty;
var consumerContext = new ConsumerContext(executor, receivedMessage.ToMessageContext()); var consumerContext = new ConsumerContext(executor, receivedMessage.ToMessageContext());
try try
...@@ -153,7 +157,7 @@ namespace DotNetCore.CAP ...@@ -153,7 +157,7 @@ namespace DotNetCore.CAP
var ret = await Invoker.InvokeAsync(consumerContext); var ret = await Invoker.InvokeAsync(consumerContext);
s_diagnosticListener.WriteConsumerInvokeAfter(operationId,consumerContext); s_diagnosticListener.WriteConsumerInvokeAfter(operationId, consumerContext, startTime, stopwatch.Elapsed);
if (!string.IsNullOrEmpty(ret.CallbackName)) if (!string.IsNullOrEmpty(ret.CallbackName))
{ {
...@@ -162,7 +166,7 @@ namespace DotNetCore.CAP ...@@ -162,7 +166,7 @@ namespace DotNetCore.CAP
} }
catch (Exception ex) catch (Exception ex)
{ {
s_diagnosticListener.WriteConsumerInvokeError(operationId, consumerContext, ex); s_diagnosticListener.WriteConsumerInvokeError(operationId, consumerContext, ex, startTime, stopwatch.Elapsed);
throw new SubscriberExecutionFailedException(ex.Message, ex); throw new SubscriberExecutionFailedException(ex.Message, ex);
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment