Commit 45c8745e authored by Savorboard's avatar Savorboard

Refactoring azure serverbus transport implementation for version 3.0

parent 3f50de25
......@@ -7,10 +7,12 @@ using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using DotNetCore.CAP.Messages;
using Microsoft.Azure.ServiceBus;
using Microsoft.Azure.ServiceBus.Management;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Message = Microsoft.Azure.ServiceBus.Message;
namespace DotNetCore.CAP.AzureServiceBus
{
......@@ -36,7 +38,7 @@ namespace DotNetCore.CAP.AzureServiceBus
_asbOptions = options.Value ?? throw new ArgumentNullException(nameof(options));
}
public event EventHandler<MessageContext> OnMessageReceived;
public event EventHandler<TransportMessage> OnMessageReceived;
public event EventHandler<LogMessageEventArgs> OnLog;
......@@ -160,12 +162,10 @@ namespace DotNetCore.CAP.AzureServiceBus
private Task OnConsumerReceived(Message message, CancellationToken token)
{
_lockToken = message.SystemProperties.LockToken;
var context = new MessageContext
{
Group = _subscriptionName,
Name = message.Label,
Content = Encoding.UTF8.GetString(message.Body)
};
var header = message.UserProperties.ToDictionary(x => x.Key, y => y.Value.ToString());
var context = new TransportMessage(header, message.Body);
OnMessageReceived?.Invoke(null, context);
......
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using Microsoft.Azure.ServiceBus;
using Microsoft.Azure.ServiceBus.Primitives;
// ReSharper disable once CheckNamespace
......
......@@ -3,6 +3,7 @@
using System;
using DotNetCore.CAP.AzureServiceBus;
using DotNetCore.CAP.Transport;
using Microsoft.Extensions.DependencyInjection;
// ReSharper disable once CheckNamespace
......@@ -24,8 +25,7 @@ namespace DotNetCore.CAP
services.Configure(_configure);
services.AddSingleton<IConsumerClientFactory, AzureServiceBusConsumerClientFactory>();
services.AddSingleton<ITransportPublisher, AzureServiceBusPublishMessageSender>();
services.AddSingleton<IPublishMessageSender, AzureServiceBusPublishMessageSender>();
services.AddSingleton<ITransport,AzureServiceBusTransport>();
}
}
}
\ No newline at end of file
......@@ -2,18 +2,18 @@
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using DotNetCore.CAP.Internal;
using DotNetCore.CAP.Processor.States;
using DotNetCore.CAP.Messages;
using DotNetCore.CAP.Transport;
using Microsoft.Azure.ServiceBus;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
namespace DotNetCore.CAP.AzureServiceBus
{
internal class AzureServiceBusPublishMessageSender : BasePublishMessageSender
internal class AzureServiceBusTransport : ITransport
{
private readonly SemaphoreSlim _connectionLock = new SemaphoreSlim(initialCount: 1, maxCount: 1);
......@@ -22,38 +22,38 @@ namespace DotNetCore.CAP.AzureServiceBus
private ITopicClient _topicClient;
public AzureServiceBusPublishMessageSender(
ILogger<AzureServiceBusPublishMessageSender> logger,
public AzureServiceBusTransport(
ILogger<AzureServiceBusTransport> logger,
IOptions<CapOptions> options,
IOptions<AzureServiceBusOptions> asbOptions,
IStateChanger stateChanger,
IStorageConnection connection)
: base(logger, options, connection, stateChanger)
IOptions<AzureServiceBusOptions> asbOptions)
{
_logger = logger;
_asbOptions = asbOptions;
}
protected override string ServersAddress => _asbOptions.Value.ConnectionString;
public string Address => _asbOptions.Value.ConnectionString;
public override async Task<OperateResult> PublishAsync(string keyName, string content)
public async Task<OperateResult> SendAsync(TransportMessage transportMessage)
{
try
{
Connect();
var contentBytes = Encoding.UTF8.GetBytes(content);
var message = new Message
var message = new Microsoft.Azure.ServiceBus.Message
{
MessageId = Guid.NewGuid().ToString(),
Body = contentBytes,
Label = keyName,
MessageId = transportMessage.GetId(),
Body = transportMessage.Body,
Label = transportMessage.GetName()
};
foreach (var header in transportMessage.Headers)
{
message.UserProperties.Add(header.Key, header.Value);
}
await _topicClient.SendAsync(message);
_logger.LogDebug($"Azure Service Bus message [{keyName}] has been published.");
_logger.LogDebug($"Azure Service Bus message [{transportMessage.GetName()}] has been published.");
return OperateResult.Success;
}
......@@ -78,7 +78,7 @@ namespace DotNetCore.CAP.AzureServiceBus
{
if (_topicClient == null)
{
_topicClient = new TopicClient(ServersAddress, _asbOptions.Value.TopicPath, RetryPolicy.NoRetry);
_topicClient = new TopicClient(Address, _asbOptions.Value.TopicPath, RetryPolicy.NoRetry);
}
}
finally
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment