Unverified Commit b2a0420e authored by Savorboard's avatar Savorboard Committed by GitHub

Features: Add Azure Service Bus Support (#287)

Add Azure Service Bus supported
parent 096cb876
...@@ -62,6 +62,10 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Sample.RabbitMQ.MongoDB", " ...@@ -62,6 +62,10 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Sample.RabbitMQ.MongoDB", "
EndProject EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Sample.Kafka.MySql", "samples\Sample.Kafka.MySql\Sample.Kafka.MySql.csproj", "{11563D1A-27CC-45CF-8C04-C16BCC21250A}" Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Sample.Kafka.MySql", "samples\Sample.Kafka.MySql\Sample.Kafka.MySql.csproj", "{11563D1A-27CC-45CF-8C04-C16BCC21250A}"
EndProject EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "DotNetCore.CAP.AzureServiceBus", "src\DotNetCore.CAP.AzureServiceBus\DotNetCore.CAP.AzureServiceBus.csproj", "{63B2A464-FBEA-42FB-8EFA-98AFA39FC920}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Sample.AzureServiceBus.MySql", "samples\Sample.AzureServiceBus.MySql\Sample.AzureServiceBus.MySql.csproj", "{364A72B0-3AD2-4BC4-8D22-5A0484E2A08B}"
EndProject
Global Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU Debug|Any CPU = Debug|Any CPU
...@@ -124,6 +128,14 @@ Global ...@@ -124,6 +128,14 @@ Global
{11563D1A-27CC-45CF-8C04-C16BCC21250A}.Debug|Any CPU.ActiveCfg = Debug|Any CPU {11563D1A-27CC-45CF-8C04-C16BCC21250A}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{11563D1A-27CC-45CF-8C04-C16BCC21250A}.Release|Any CPU.ActiveCfg = Release|Any CPU {11563D1A-27CC-45CF-8C04-C16BCC21250A}.Release|Any CPU.ActiveCfg = Release|Any CPU
{11563D1A-27CC-45CF-8C04-C16BCC21250A}.Release|Any CPU.Build.0 = Release|Any CPU {11563D1A-27CC-45CF-8C04-C16BCC21250A}.Release|Any CPU.Build.0 = Release|Any CPU
{63B2A464-FBEA-42FB-8EFA-98AFA39FC920}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{63B2A464-FBEA-42FB-8EFA-98AFA39FC920}.Debug|Any CPU.Build.0 = Debug|Any CPU
{63B2A464-FBEA-42FB-8EFA-98AFA39FC920}.Release|Any CPU.ActiveCfg = Release|Any CPU
{63B2A464-FBEA-42FB-8EFA-98AFA39FC920}.Release|Any CPU.Build.0 = Release|Any CPU
{364A72B0-3AD2-4BC4-8D22-5A0484E2A08B}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{364A72B0-3AD2-4BC4-8D22-5A0484E2A08B}.Debug|Any CPU.Build.0 = Debug|Any CPU
{364A72B0-3AD2-4BC4-8D22-5A0484E2A08B}.Release|Any CPU.ActiveCfg = Release|Any CPU
{364A72B0-3AD2-4BC4-8D22-5A0484E2A08B}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection EndGlobalSection
GlobalSection(SolutionProperties) = preSolution GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE HideSolutionNode = FALSE
...@@ -144,6 +156,8 @@ Global ...@@ -144,6 +156,8 @@ Global
{77C0AC02-C44B-49D5-B969-7D5305FC20A5} = {9B2AE124-6636-4DE9-83A3-70360DABD0C4} {77C0AC02-C44B-49D5-B969-7D5305FC20A5} = {9B2AE124-6636-4DE9-83A3-70360DABD0C4}
{4473DE19-E8D2-4B57-80A8-C8AAA2BFA20F} = {3A6B6931-A123-477A-9469-8B468B5385AF} {4473DE19-E8D2-4B57-80A8-C8AAA2BFA20F} = {3A6B6931-A123-477A-9469-8B468B5385AF}
{11563D1A-27CC-45CF-8C04-C16BCC21250A} = {3A6B6931-A123-477A-9469-8B468B5385AF} {11563D1A-27CC-45CF-8C04-C16BCC21250A} = {3A6B6931-A123-477A-9469-8B468B5385AF}
{63B2A464-FBEA-42FB-8EFA-98AFA39FC920} = {9B2AE124-6636-4DE9-83A3-70360DABD0C4}
{364A72B0-3AD2-4BC4-8D22-5A0484E2A08B} = {3A6B6931-A123-477A-9469-8B468B5385AF}
EndGlobalSection EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {2E70565D-94CF-40B4-BFE1-AC18D5F736AB} SolutionGuid = {2E70565D-94CF-40B4-BFE1-AC18D5F736AB}
......
using System;
using System.Data;
using System.Threading.Tasks;
using Dapper;
using DotNetCore.CAP;
using Microsoft.AspNetCore.Mvc;
using MySql.Data.MySqlClient;
namespace Sample.AzureServiceBus.MySql.Controllers
{
[Route("api/[controller]")]
public class ValuesController : Controller, ICapSubscribe
{
private readonly ICapPublisher _capBus;
public ValuesController(ICapPublisher producer)
{
_capBus = producer;
}
[Route("~/without/transaction")]
public async Task<IActionResult> WithoutTransaction()
{
await _capBus.PublishAsync("sample.azure.mysql", DateTime.Now);
return Ok();
}
[Route("~/adonet/transaction")]
public IActionResult AdonetWithTransaction()
{
using (var connection = new MySqlConnection(""))
{
using (var transaction = connection.BeginTransaction(_capBus, autoCommit: false))
{
//your business code
connection.Execute("insert into test(name) values('test')", transaction: (IDbTransaction)transaction.DbTransaction);
for (int i = 0; i < 5; i++)
{
_capBus.Publish("sample.rabbitmq.mysql", DateTime.Now);
}
transaction.Commit();
}
}
return Ok();
}
[CapSubscribe("sample.azure.mysql")]
public void Test2(DateTime value)
{
Console.WriteLine("Subscriber output message: " + value);
}
[CapSubscribe("sample.azure.mysql")]
public void Test2T2(DateTime value)
{
Console.WriteLine("Test2T2-->Subscriber output message: " + value);
}
[CapSubscribe("sample.azure.mysql",Group = "groupd")]
public void Test2Group(DateTime value)
{
Console.WriteLine("Group--> Subscriber output message: " + value);
}
}
}
\ No newline at end of file
using Microsoft.AspNetCore;
using Microsoft.AspNetCore.Hosting;
namespace Sample.AzureServiceBus.MySql
{
public class Program
{
public static void Main(string[] args)
{
BuildWebHost(args).Run();
}
public static IWebHost BuildWebHost(string[] args) =>
WebHost.CreateDefaultBuilder(args)
.UseStartup<Startup>()
.Build();
}
}
\ No newline at end of file
<Project Sdk="Microsoft.NET.Sdk.Web">
<PropertyGroup>
<TargetFramework>netcoreapp2.2</TargetFramework>
<AssemblyName>Sample.Kafka.MySql</AssemblyName>
<WarningsAsErrors>NU1701</WarningsAsErrors>
<NoWarn>NU1701</NoWarn>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Microsoft.AspNetCore.App" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\..\src\DotNetCore.CAP.AzureServiceBus\DotNetCore.CAP.AzureServiceBus.csproj" />
<ProjectReference Include="..\..\src\DotNetCore.CAP.MySql\DotNetCore.CAP.MySql.csproj" />
<ProjectReference Include="..\..\src\DotNetCore.CAP\DotNetCore.CAP.csproj" />
</ItemGroup>
</Project>
using Microsoft.AspNetCore.Builder;
using Microsoft.Extensions.DependencyInjection;
namespace Sample.AzureServiceBus.MySql
{
public class Startup
{
public void ConfigureServices(IServiceCollection services)
{
services.AddCap(x =>
{
x.UseMySql("Server=localhost;Database=testcap;UserId=root;Password=123123;");
x.UseAzureServiceBus("Endpoint=sb://testcap.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=<yourkey>");
x.UseDashboard();
});
services.AddMvc();
}
public void Configure(IApplicationBuilder app)
{
app.UseMvc();
}
}
}
\ No newline at end of file
{
"Logging": {
"IncludeScopes": false,
"LogLevel": {
"Default": "Debug"
}
}
}
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.ServiceBus;
using Microsoft.Azure.ServiceBus.Management;
using Microsoft.Extensions.Logging;
namespace DotNetCore.CAP.AzureServiceBus
{
internal sealed class AzureServiceBusConsumerClient : IConsumerClient
{
private readonly ILogger _logger;
private readonly string _subscriptionName;
private readonly AzureServiceBusOptions _asbOptions;
private SubscriptionClient _consumerClient;
private string _lockToken;
public AzureServiceBusConsumerClient(
ILogger logger,
string subscriptionName,
AzureServiceBusOptions options)
{
_logger = logger;
_subscriptionName = subscriptionName;
_asbOptions = options ?? throw new ArgumentNullException(nameof(options));
InitAzureServiceBusClient().GetAwaiter().GetResult();
}
public event EventHandler<MessageContext> OnMessageReceived;
public event EventHandler<LogMessageEventArgs> OnLog;
public string ServersAddress => _asbOptions.ConnectionString;
public void Subscribe(IEnumerable<string> topics)
{
if (topics == null)
{
throw new ArgumentNullException(nameof(topics));
}
var allRuleNames = _consumerClient.GetRulesAsync().GetAwaiter().GetResult().Select(x => x.Name);
foreach (var newRule in topics.Except(allRuleNames))
{
_consumerClient.AddRuleAsync(new RuleDescription
{
Filter = new CorrelationFilter { Label = newRule },
Name = newRule
}).GetAwaiter().GetResult();
_logger.LogInformation($"Azure Service Bus add rule: {newRule}");
}
foreach (var oldRule in allRuleNames.Except(topics))
{
_consumerClient.RemoveRuleAsync(oldRule).GetAwaiter().GetResult();
_logger.LogInformation($"Azure Service Bus remove rule: {oldRule}");
}
}
public void Listening(TimeSpan timeout, CancellationToken cancellationToken)
{
_consumerClient.RegisterMessageHandler(OnConsumerReceived,
new MessageHandlerOptions(OnExceptionReceived)
{
AutoComplete = false,
MaxConcurrentCalls = 10,
MaxAutoRenewDuration = TimeSpan.FromSeconds(30)
});
while (true)
{
cancellationToken.ThrowIfCancellationRequested();
cancellationToken.WaitHandle.WaitOne(timeout);
}
// ReSharper disable once FunctionNeverReturns
}
public void Commit()
{
_consumerClient.CompleteAsync(_lockToken);
}
public void Reject()
{
// ignore
}
public void Dispose()
{
_consumerClient.CloseAsync().Wait();
}
#region private methods
private async Task InitAzureServiceBusClient()
{
ManagementClient mClient;
if (_asbOptions.ManagementTokenProvider != null)
{
mClient = new ManagementClient(new ServiceBusConnectionStringBuilder(
_asbOptions.ConnectionString), _asbOptions.ManagementTokenProvider);
}
else
{
mClient = new ManagementClient(_asbOptions.ConnectionString);
}
if (!await mClient.TopicExistsAsync(_asbOptions.TopicPath))
{
await mClient.CreateTopicAsync(_asbOptions.TopicPath);
_logger.LogInformation($"Azure Service Bus created topic: {_asbOptions.TopicPath}");
}
if (!await mClient.SubscriptionExistsAsync(_asbOptions.TopicPath, _subscriptionName))
{
await mClient.CreateSubscriptionAsync(_asbOptions.TopicPath, _subscriptionName);
_logger.LogInformation($"Azure Service Bus topic {_asbOptions.TopicPath} created subscription: {_subscriptionName}");
}
_consumerClient = new SubscriptionClient(_asbOptions.ConnectionString, _asbOptions.TopicPath, _subscriptionName,
ReceiveMode.PeekLock, RetryPolicy.Default);
}
private Task OnConsumerReceived(Message message, CancellationToken token)
{
_lockToken = message.SystemProperties.LockToken;
var context = new MessageContext
{
Group = _subscriptionName,
Name = message.Label,
Content = Encoding.UTF8.GetString(message.Body)
};
OnMessageReceived?.Invoke(null, context);
return Task.CompletedTask;
}
private Task OnExceptionReceived(ExceptionReceivedEventArgs args)
{
var context = args.ExceptionReceivedContext;
var exceptionMessage =
$"- Endpoint: {context.Endpoint}\r\n" +
$"- Entity Path: {context.EntityPath}\r\n" +
$"- Executing Action: {context.Action}\r\n" +
$"- Exception: {args.Exception}";
var logArgs = new LogMessageEventArgs
{
LogType = MqLogType.ExceptionReceived,
Reason = exceptionMessage
};
OnLog?.Invoke(null, logArgs);
return Task.CompletedTask;
}
#endregion private methods
}
}
\ No newline at end of file
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using Microsoft.Extensions.Logging;
namespace DotNetCore.CAP.AzureServiceBus
{
internal sealed class AzureServiceBusConsumerClientFactory : IConsumerClientFactory
{
private readonly ILoggerFactory _loggerFactory;
private readonly AzureServiceBusOptions _asbOptions;
public AzureServiceBusConsumerClientFactory(
ILoggerFactory loggerFactory,
AzureServiceBusOptions asbOptions)
{
_loggerFactory = loggerFactory;
_asbOptions = asbOptions;
}
public IConsumerClient Create(string groupId)
{
var logger = _loggerFactory.CreateLogger(typeof(AzureServiceBusConsumerClient));
return new AzureServiceBusConsumerClient(logger, groupId, _asbOptions);
}
}
}
\ No newline at end of file
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using Microsoft.Azure.ServiceBus;
using Microsoft.Azure.ServiceBus.Primitives;
// ReSharper disable once CheckNamespace
namespace DotNetCore.CAP
{
/// <summary>
/// Provides programmatic configuration for the CAP Azure Service Bus project.
/// </summary>
public class AzureServiceBusOptions
{
/// <summary>
/// TopicPath default value for CAP.
/// </summary>
public const string DefaultTopicPath = "cap";
/// <summary>
/// Azure Service Bus Namespace connection string. Must not contain topic information.
/// </summary>
public string ConnectionString { get; set; }
/// <summary>
/// The name of the topic relative to the service namespace base address.
/// </summary>
public string TopicPath { get; set; } = DefaultTopicPath;
/// <summary>
/// Represents the Azure Active Directory token provider for Azure Managed Service Identity integration.
/// </summary>
public ITokenProvider ManagementTokenProvider { get; set; }
/// <summary>
/// Used to generate Service Bus connection strings
/// </summary>
public ServiceBusConnectionStringBuilder ConnectionStringBuilder { get; set; }
}
}
\ No newline at end of file
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using DotNetCore.CAP.AzureServiceBus;
using Microsoft.Extensions.DependencyInjection;
// ReSharper disable once CheckNamespace
namespace DotNetCore.CAP
{
internal sealed class AzureServiceBusOptionsExtension : ICapOptionsExtension
{
private readonly Action<AzureServiceBusOptions> _configure;
public AzureServiceBusOptionsExtension(Action<AzureServiceBusOptions> configure)
{
_configure = configure;
}
public void AddServices(IServiceCollection services)
{
services.AddSingleton<CapMessageQueueMakerService>();
var azureServiceBusOptions = new AzureServiceBusOptions();
_configure?.Invoke(azureServiceBusOptions);
services.AddSingleton(azureServiceBusOptions);
services.AddSingleton<IConsumerClientFactory, AzureServiceBusConsumerClientFactory>();
services.AddSingleton<IPublishExecutor, AzureServiceBusPublishMessageSender>();
services.AddSingleton<IPublishMessageSender, AzureServiceBusPublishMessageSender>();
}
}
}
\ No newline at end of file
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using DotNetCore.CAP;
// ReSharper disable once CheckNamespace
namespace Microsoft.Extensions.DependencyInjection
{
public static class CapOptionsExtensions
{
/// <summary>
/// Configuration to use Azure Service Bus in CAP.
/// </summary>
/// <param name="options">CAP configuration options</param>
/// <param name="connectionString">Connection string for namespace or the entity.</param>
public static CapOptions UseAzureServiceBus(this CapOptions options, string connectionString)
{
if (connectionString == null)
{
throw new ArgumentNullException(nameof(connectionString));
}
return options.UseAzureServiceBus(opt => { opt.ConnectionString = connectionString; });
}
/// <summary>
/// Configuration to use Azure Service Bus in CAP.
/// </summary>
/// <param name="options">CAP configuration options</param>
/// <param name="configure">Provides programmatic configuration for the Azure Service Bus.</param>
public static CapOptions UseAzureServiceBus(this CapOptions options, Action<AzureServiceBusOptions> configure)
{
if (configure == null)
{
throw new ArgumentNullException(nameof(configure));
}
options.RegisterExtension(new AzureServiceBusOptionsExtension(configure));
return options;
}
}
}
\ No newline at end of file
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using DotNetCore.CAP.Abstractions;
// ReSharper disable once CheckNamespace
namespace DotNetCore.CAP
{
/// <summary>
/// An attribute for subscribe Kafka messages.
/// </summary>
public class CapSubscribeAttribute : TopicAttribute
{
public CapSubscribeAttribute(string name)
: base(name)
{
}
public override string ToString()
{
return Name;
}
}
}
\ No newline at end of file
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFramework>netstandard2.0</TargetFramework>
<AssemblyName>DotNetCore.CAP.AzureServiceBus</AssemblyName>
<PackageTags>$(PackageTags);AzureServiceBus</PackageTags>
</PropertyGroup>
<PropertyGroup>
<WarningsAsErrors>NU1605;NU1701</WarningsAsErrors>
<NoWarn>NU1701;CS1591</NoWarn>
<DocumentationFile>bin\$(Configuration)\netstandard2.0\DotNetCore.CAP.AzureServiceBus.xml</DocumentationFile>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Microsoft.Azure.ServiceBus" Version="3.3.0" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\DotNetCore.CAP\DotNetCore.CAP.csproj" />
</ItemGroup>
</Project>
\ No newline at end of file
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using System.Text;
using System.Threading.Tasks;
using DotNetCore.CAP.Internal;
using DotNetCore.CAP.Processor.States;
using Microsoft.Azure.ServiceBus;
using Microsoft.Extensions.Logging;
namespace DotNetCore.CAP.AzureServiceBus
{
internal class AzureServiceBusPublishMessageSender : BasePublishMessageSender
{
private readonly ILogger _logger;
private readonly ITopicClient _topicClient;
public AzureServiceBusPublishMessageSender(
ILogger<AzureServiceBusPublishMessageSender> logger,
CapOptions options,
AzureServiceBusOptions asbOptions,
IStateChanger stateChanger,
IStorageConnection connection)
: base(logger, options, connection, stateChanger)
{
_logger = logger;
ServersAddress = asbOptions.ConnectionString;
_topicClient = new TopicClient(
ServersAddress,
asbOptions.TopicPath,
RetryPolicy.NoRetry);
}
public override async Task<OperateResult> PublishAsync(string keyName, string content)
{
try
{
var contentBytes = Encoding.UTF8.GetBytes(content);
var message = new Message
{
MessageId = Guid.NewGuid().ToString(),
Body = contentBytes,
Label = keyName,
};
await _topicClient.SendAsync(message);
_logger.LogDebug($"Azure Service Bus message [{keyName}] has been published.");
return OperateResult.Success;
}
catch (Exception ex)
{
var wrapperEx = new PublisherSentFailedException(ex.Message, ex);
return OperateResult.Failed(wrapperEx);
}
}
}
}
\ No newline at end of file
...@@ -159,11 +159,14 @@ namespace DotNetCore.CAP ...@@ -159,11 +159,14 @@ namespace DotNetCore.CAP
_logger.LogWarning("RabbitMQ consumer shutdown. --> " + logmsg.Reason); _logger.LogWarning("RabbitMQ consumer shutdown. --> " + logmsg.Reason);
break; break;
case MqLogType.ConsumeError: case MqLogType.ConsumeError:
_logger.LogError("Kakfa client consume error. --> " + logmsg.Reason); _logger.LogError("Kafka client consume error. --> " + logmsg.Reason);
break; break;
case MqLogType.ServerConnError: case MqLogType.ServerConnError:
_logger.LogCritical("Kafka server connection error. --> " + logmsg.Reason); _logger.LogCritical("Kafka server connection error. --> " + logmsg.Reason);
break; break;
case MqLogType.ExceptionReceived:
_logger.LogError("AzureServiceBus subscriber received an error. --> " + logmsg.Reason);
break;
default: default:
throw new ArgumentOutOfRangeException(); throw new ArgumentOutOfRangeException();
} }
......
// Copyright (c) .NET Core Community. All rights reserved. // Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information. // Licensed under the MIT License. See License.txt in the project root for license information.
using System.Collections.Generic;
namespace DotNetCore.CAP namespace DotNetCore.CAP
{ {
/// <summary> /// <summary>
...@@ -21,7 +23,7 @@ namespace DotNetCore.CAP ...@@ -21,7 +23,7 @@ namespace DotNetCore.CAP
/// <summary> /// <summary>
/// Message content /// Message content
/// </summary> /// </summary>
public string Content { get; set; } public string Content { get; set; }
public override string ToString() public override string ToString()
{ {
......
...@@ -15,7 +15,10 @@ namespace DotNetCore.CAP ...@@ -15,7 +15,10 @@ namespace DotNetCore.CAP
//Kafka //Kafka
ConsumeError, ConsumeError,
ServerConnError ServerConnError,
//AzureServiceBus
ExceptionReceived
} }
public class LogMessageEventArgs : EventArgs public class LogMessageEventArgs : EventArgs
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment