Commit 7b3e99a9 authored by Savorboard's avatar Savorboard

Add broker name in address for AMP tracing

parent 3fed6247
......@@ -40,7 +40,7 @@ namespace DotNetCore.CAP.AzureServiceBus
public event EventHandler<LogMessageEventArgs> OnLog;
public string ServersAddress => _asbOptions.ConnectionString;
public BrokerAddress BrokerAddress => new BrokerAddress("AzureServiceBus", _asbOptions.ConnectionString);
public void Subscribe(IEnumerable<string> topics)
{
......
......@@ -30,7 +30,7 @@ namespace DotNetCore.CAP.AzureServiceBus
_asbOptions = asbOptions;
}
public string Address => _asbOptions.Value.ConnectionString;
public BrokerAddress BrokerAddress => new BrokerAddress("AzureServiceBus", _asbOptions.Value.ConnectionString);
public async Task<OperateResult> SendAsync(TransportMessage transportMessage)
{
......@@ -77,7 +77,7 @@ namespace DotNetCore.CAP.AzureServiceBus
{
if (_topicClient == null)
{
_topicClient = new TopicClient(Address, _asbOptions.Value.TopicPath, RetryPolicy.NoRetry);
_topicClient = new TopicClient(BrokerAddress.Endpoint, _asbOptions.Value.TopicPath, RetryPolicy.NoRetry);
}
}
finally
......
......@@ -23,7 +23,7 @@ namespace DotNetCore.CAP.Kafka
_connectionPool = connectionPool;
}
public string Address => _connectionPool.ServersAddress;
public BrokerAddress BrokerAddress => new BrokerAddress("Kafka", _connectionPool.ServersAddress);
public async Task<OperateResult> SendAsync(TransportMessage message)
{
......
......@@ -30,7 +30,7 @@ namespace DotNetCore.CAP.Kafka
public event EventHandler<LogMessageEventArgs> OnLog;
public string ServersAddress => _kafkaOptions.Servers;
public BrokerAddress BrokerAddress => new BrokerAddress("Kafka", _kafkaOptions.Servers);
public void Subscribe(IEnumerable<string> topics)
{
......
......@@ -28,7 +28,7 @@ namespace DotNetCore.CAP.RabbitMQ
_exchange = _connectionChannelPool.Exchange;
}
public string Address => _connectionChannelPool.HostAddress;
public BrokerAddress BrokerAddress => new BrokerAddress("RabbitMQ", _connectionChannelPool.HostAddress);
public Task<OperateResult> SendAsync(TransportMessage message)
{
......
......@@ -40,7 +40,7 @@ namespace DotNetCore.CAP.RabbitMQ
public event EventHandler<LogMessageEventArgs> OnLog;
public string ServersAddress => _rabbitMQOptions.HostName;
public BrokerAddress BrokerAddress => new BrokerAddress("RabbitMQ", _rabbitMQOptions.HostName);
public void Subscribe(IEnumerable<string> topics)
{
......
using System;
using DotNetCore.CAP.Messages;
using DotNetCore.CAP.Transport;
namespace DotNetCore.CAP.Diagnostics
{
......@@ -24,7 +25,7 @@ namespace DotNetCore.CAP.Diagnostics
public TransportMessage TransportMessage { get; set; }
public string BrokerAddress { get; set; }
public BrokerAddress BrokerAddress { get; set; }
public long? ElapsedTimeMs { get; set; }
......
......@@ -4,6 +4,7 @@
using System;
using System.Reflection;
using DotNetCore.CAP.Messages;
using DotNetCore.CAP.Transport;
using JetBrains.Annotations;
namespace DotNetCore.CAP.Diagnostics
......@@ -16,7 +17,7 @@ namespace DotNetCore.CAP.Diagnostics
public TransportMessage TransportMessage { get; set; }
public string BrokerAddress { get; set; }
public BrokerAddress BrokerAddress { get; set; }
public long? ElapsedTimeMs { get; set; }
......
......@@ -29,7 +29,7 @@ namespace DotNetCore.CAP.Internal
private readonly MethodMatcherCache _selector;
private CancellationTokenSource _cts;
private string _serverAddress;
private BrokerAddress _serverAddress;
private Task _compositeTask;
private bool _disposed;
private static bool _isHealthy = true;
......@@ -76,7 +76,7 @@ namespace DotNetCore.CAP.Internal
{
using (var client = _consumerClientFactory.Create(matchGroup.Key))
{
_serverAddress = client.ServersAddress;
_serverAddress = client.BrokerAddress;
RegisterMessageProcessor(client);
......@@ -171,7 +171,7 @@ namespace DotNetCore.CAP.Internal
var error = $"Message can not be found subscriber. Name:{name}, Group:{group}. {Environment.NewLine} see: https://github.com/dotnetcore/CAP/issues/63";
var ex = new SubscriberNotFoundException(error);
TracingError(tracingTimestamp, transportMessage, client.ServersAddress, ex);
TracingError(tracingTimestamp, transportMessage, client.BrokerAddress, ex);
throw ex;
}
......@@ -214,7 +214,7 @@ namespace DotNetCore.CAP.Internal
client.Reject(sender);
TracingError(tracingTimestamp, transportMessage, client.ServersAddress, e);
TracingError(tracingTimestamp, transportMessage, client.BrokerAddress, e);
}
};
......@@ -254,7 +254,7 @@ namespace DotNetCore.CAP.Internal
#region tracing
private long? TracingBefore(TransportMessage message, string broker)
private long? TracingBefore(TransportMessage message, BrokerAddress broker)
{
if (s_diagnosticListener.IsEnabled(CapDiagnosticListenerNames.BeforeConsume))
{
......@@ -274,7 +274,7 @@ namespace DotNetCore.CAP.Internal
return null;
}
private void TracingAfter(long? tracingTimestamp, TransportMessage message, string broker)
private void TracingAfter(long? tracingTimestamp, TransportMessage message, BrokerAddress broker)
{
if (tracingTimestamp != null && s_diagnosticListener.IsEnabled(CapDiagnosticListenerNames.AfterConsume))
{
......@@ -292,7 +292,7 @@ namespace DotNetCore.CAP.Internal
}
}
private void TracingError(long? tracingTimestamp, TransportMessage message, string broker, Exception ex)
private void TracingError(long? tracingTimestamp, TransportMessage message, BrokerAddress broker, Exception ex)
{
if (tracingTimestamp != null && s_diagnosticListener.IsEnabled(CapDiagnosticListenerNames.ErrorConsume))
{
......
......@@ -63,7 +63,7 @@ namespace DotNetCore.CAP.Internal
{
var transportMsg = await _serializer.SerializeAsync(message.Origin);
var tracingTimestamp = TracingBefore(transportMsg, _transport.Address);
var tracingTimestamp = TracingBefore(transportMsg, _transport.BrokerAddress);
var result = await _transport.SendAsync(transportMsg);
......@@ -71,13 +71,13 @@ namespace DotNetCore.CAP.Internal
{
await SetSuccessfulState(message);
TracingAfter(tracingTimestamp, transportMsg, _transport.Address);
TracingAfter(tracingTimestamp, transportMsg, _transport.BrokerAddress);
return (false, OperateResult.Success);
}
else
{
TracingError(tracingTimestamp, transportMsg, _transport.Address, result);
TracingError(tracingTimestamp, transportMsg, _transport.BrokerAddress, result);
var needRetry = await SetFailedState(message, result.Exception);
......@@ -131,7 +131,7 @@ namespace DotNetCore.CAP.Internal
#region tracing
private long? TracingBefore(TransportMessage message, string broker)
private long? TracingBefore(TransportMessage message, BrokerAddress broker)
{
if (s_diagnosticListener.IsEnabled(CapDiagnosticListenerNames.BeforePublish))
{
......@@ -151,7 +151,7 @@ namespace DotNetCore.CAP.Internal
return null;
}
private void TracingAfter(long? tracingTimestamp, TransportMessage message, string broker)
private void TracingAfter(long? tracingTimestamp, TransportMessage message, BrokerAddress broker)
{
if (tracingTimestamp != null && s_diagnosticListener.IsEnabled(CapDiagnosticListenerNames.AfterPublish))
{
......@@ -169,7 +169,7 @@ namespace DotNetCore.CAP.Internal
}
}
private void TracingError(long? tracingTimestamp, TransportMessage message, string broker, OperateResult result)
private void TracingError(long? tracingTimestamp, TransportMessage message, BrokerAddress broker, OperateResult result)
{
if (tracingTimestamp != null && s_diagnosticListener.IsEnabled(CapDiagnosticListenerNames.ErrorPublish))
{
......
using System.Linq;
using JetBrains.Annotations;
namespace DotNetCore.CAP.Transport
{
public struct BrokerAddress
{
public BrokerAddress([NotNull]string address)
{
if (address.Contains("$"))
{
var parts = address.Split('$');
Name = parts[0];
Endpoint = string.Join(string.Empty, parts.Skip(1));
}
else
{
Name = string.Empty;
Endpoint = address;
}
}
public BrokerAddress([NotNull]string name, [CanBeNull]string endpoint)
{
Name = name;
Endpoint = endpoint;
}
public string Name { get; set; }
public string Endpoint { get; set; }
public override string ToString()
{
return Name + "$" + Endpoint;
}
}
}
......@@ -15,7 +15,7 @@ namespace DotNetCore.CAP.Transport
/// </summary>
public interface IConsumerClient : IDisposable
{
string ServersAddress { get; }
BrokerAddress BrokerAddress { get; }
/// <summary>
/// Subscribe to a set of topics to the message queue
......
......@@ -8,7 +8,7 @@ namespace DotNetCore.CAP.Transport
{
public interface ITransport
{
string Address { get; }
BrokerAddress BrokerAddress { get; }
Task<OperateResult> SendAsync(TransportMessage message);
}
......
......@@ -18,7 +18,7 @@ namespace DotNetCore.CAP.Test.FakeInMemoryQueue
_logger = logger;
}
public string Address { get; } = string.Empty;
public BrokerAddress BrokerAddress { get; } = new BrokerAddress("InMemory", string.Empty);
public Task<OperateResult> SendAsync(TransportMessage message)
{
......
......@@ -27,7 +27,7 @@ namespace DotNetCore.CAP.Test.FakeInMemoryQueue
public event EventHandler<LogMessageEventArgs> OnLog;
public string ServersAddress => string.Empty;
public BrokerAddress BrokerAddress => new BrokerAddress("InMemory", string.Empty);
public void Subscribe(IEnumerable<string> topics)
{
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment