Unverified Commit 7021dc65 authored by Savorboard's avatar Savorboard Committed by GitHub

v2.6.0 (#362)

* Code refactor

* update code format

* Add options package reference

* Rename need retry processor

* update unit tests

* Improve ICapPublisher to singleton.

* Support options pattern for MySqlOptions

* Support options pattern for PostgreSqlOptions

* Remove reference

* update namespace of SubscriberNotFoundException.cs

* update ut

* Support options pattern for CapOptions

* update version to 2.6.0

* Support options pattern for AzureServiceBusOptions

* Fix client listening bug

* update inmemory project to using  option pattern.

* Support options pattern for KafkaOptions.

* Support options pattern for MongoDbOptions.

* Fix options.

* Support options pattern for SqlServerOptions

* upgrade confluent kafka to 1.1.0

* Remove CastleCoreTest project, it is not need again

* Fix transaction dispose.

* Fix mock

* Fix mock
parent d3c88667

Microsoft Visual Studio Solution File, Format Version 12.00
# Visual Studio 15
VisualStudioVersion = 15.0.26730.15
# Visual Studio Version 16
VisualStudioVersion = 16.0.29025.244
MinimumVisualStudioVersion = 10.0.40219.1
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "src", "src", "{9B2AE124-6636-4DE9-83A3-70360DABD0C4}"
EndProject
......@@ -68,8 +68,6 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "DotNetCore.CAP.InMemoryStor
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Sample.AzureServiceBus.InMemory", "samples\Sample.AzureServiceBus.InMemory\Sample.AzureServiceBus.InMemory.csproj", "{1E1E959C-3D0E-45C3-ABCA-DAAACE68AAB8}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "DotNetCore.CAP.CastleCoreTest", "test\DotNetCore.CAP.CastleCoreTest\DotNetCore.CAP.CastleCoreTest.csproj", "{BA499B87-77A9-43A2-98A3-89ECF5034E26}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
......@@ -144,10 +142,6 @@ Global
{1E1E959C-3D0E-45C3-ABCA-DAAACE68AAB8}.Debug|Any CPU.Build.0 = Debug|Any CPU
{1E1E959C-3D0E-45C3-ABCA-DAAACE68AAB8}.Release|Any CPU.ActiveCfg = Release|Any CPU
{1E1E959C-3D0E-45C3-ABCA-DAAACE68AAB8}.Release|Any CPU.Build.0 = Release|Any CPU
{BA499B87-77A9-43A2-98A3-89ECF5034E26}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{BA499B87-77A9-43A2-98A3-89ECF5034E26}.Debug|Any CPU.Build.0 = Debug|Any CPU
{BA499B87-77A9-43A2-98A3-89ECF5034E26}.Release|Any CPU.ActiveCfg = Release|Any CPU
{BA499B87-77A9-43A2-98A3-89ECF5034E26}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
......@@ -171,7 +165,6 @@ Global
{63B2A464-FBEA-42FB-8EFA-98AFA39FC920} = {9B2AE124-6636-4DE9-83A3-70360DABD0C4}
{58B6E829-C6C8-457C-9DD0-C600650254DF} = {9B2AE124-6636-4DE9-83A3-70360DABD0C4}
{1E1E959C-3D0E-45C3-ABCA-DAAACE68AAB8} = {3A6B6931-A123-477A-9469-8B468B5385AF}
{BA499B87-77A9-43A2-98A3-89ECF5034E26} = {C09CDAB0-6DD4-46E9-B7F3-3EF2A4741EA0}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {2E70565D-94CF-40B4-BFE1-AC18D5F736AB}
......
<wpf:ResourceDictionary xml:space="preserve" xmlns:x="http://schemas.microsoft.com/winfx/2006/xaml" xmlns:s="clr-namespace:System;assembly=mscorlib" xmlns:ss="urn:shemas-jetbrains-com:settings-storage-xaml" xmlns:wpf="http://schemas.microsoft.com/winfx/2006/xaml/presentation">
<s:String x:Key="/Default/CodeStyle/Naming/CSharpNaming/Abbreviations/=DB/@EntryIndexedValue">DB</s:String>
<s:Boolean x:Key="/Default/UserDictionary/Words/=Mongo/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/UserDictionary/Words/=Postgre/@EntryIndexedValue">True</s:Boolean></wpf:ResourceDictionary>
\ No newline at end of file
<Project>
<PropertyGroup>
<VersionMajor>2</VersionMajor>
<VersionMinor>5</VersionMinor>
<VersionPatch>2</VersionPatch>
<VersionMinor>6</VersionMinor>
<VersionPatch>0</VersionPatch>
<VersionQuality></VersionQuality>
<VersionPrefix>$(VersionMajor).$(VersionMinor).$(VersionPatch)</VersionPrefix>
</PropertyGroup>
......
......@@ -10,11 +10,14 @@ using System.Threading.Tasks;
using Microsoft.Azure.ServiceBus;
using Microsoft.Azure.ServiceBus.Management;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
namespace DotNetCore.CAP.AzureServiceBus
{
internal sealed class AzureServiceBusConsumerClient : IConsumerClient
{
private readonly SemaphoreSlim _connectionLock = new SemaphoreSlim(initialCount: 1, maxCount: 1);
private readonly ILogger _logger;
private readonly string _subscriptionName;
private readonly AzureServiceBusOptions _asbOptions;
......@@ -26,13 +29,11 @@ namespace DotNetCore.CAP.AzureServiceBus
public AzureServiceBusConsumerClient(
ILogger logger,
string subscriptionName,
AzureServiceBusOptions options)
IOptions<AzureServiceBusOptions> options)
{
_logger = logger;
_subscriptionName = subscriptionName;
_asbOptions = options ?? throw new ArgumentNullException(nameof(options));
InitAzureServiceBusClient().GetAwaiter().GetResult();
_asbOptions = options.Value ?? throw new ArgumentNullException(nameof(options));
}
public event EventHandler<MessageContext> OnMessageReceived;
......@@ -48,6 +49,8 @@ namespace DotNetCore.CAP.AzureServiceBus
throw new ArgumentNullException(nameof(topics));
}
ConnectAsync().GetAwaiter().GetResult();
var allRuleNames = _consumerClient.GetRulesAsync().GetAwaiter().GetResult().Select(x => x.Name);
foreach (var newRule in topics.Except(allRuleNames))
......@@ -73,6 +76,8 @@ namespace DotNetCore.CAP.AzureServiceBus
public void Listening(TimeSpan timeout, CancellationToken cancellationToken)
{
ConnectAsync().GetAwaiter().GetResult();
_consumerClient.RegisterMessageHandler(OnConsumerReceived,
new MessageHandlerOptions(OnExceptionReceived)
{
......@@ -106,33 +111,50 @@ namespace DotNetCore.CAP.AzureServiceBus
#region private methods
private async Task InitAzureServiceBusClient()
private async Task ConnectAsync()
{
ManagementClient mClient;
if (_asbOptions.ManagementTokenProvider != null)
{
mClient = new ManagementClient(new ServiceBusConnectionStringBuilder(
_asbOptions.ConnectionString), _asbOptions.ManagementTokenProvider);
}
else
if (_consumerClient != null)
{
mClient = new ManagementClient(_asbOptions.ConnectionString);
return;
}
if (!await mClient.TopicExistsAsync(_asbOptions.TopicPath))
_connectionLock.Wait();
try
{
await mClient.CreateTopicAsync(_asbOptions.TopicPath);
_logger.LogInformation($"Azure Service Bus created topic: {_asbOptions.TopicPath}");
if (_consumerClient == null)
{
ManagementClient mClient;
if (_asbOptions.ManagementTokenProvider != null)
{
mClient = new ManagementClient(new ServiceBusConnectionStringBuilder(
_asbOptions.ConnectionString), _asbOptions.ManagementTokenProvider);
}
else
{
mClient = new ManagementClient(_asbOptions.ConnectionString);
}
if (!await mClient.TopicExistsAsync(_asbOptions.TopicPath))
{
await mClient.CreateTopicAsync(_asbOptions.TopicPath);
_logger.LogInformation($"Azure Service Bus created topic: {_asbOptions.TopicPath}");
}
if (!await mClient.SubscriptionExistsAsync(_asbOptions.TopicPath, _subscriptionName))
{
await mClient.CreateSubscriptionAsync(_asbOptions.TopicPath, _subscriptionName);
_logger.LogInformation($"Azure Service Bus topic {_asbOptions.TopicPath} created subscription: {_subscriptionName}");
}
_consumerClient = new SubscriptionClient(_asbOptions.ConnectionString, _asbOptions.TopicPath, _subscriptionName,
ReceiveMode.PeekLock, RetryPolicy.Default);
}
}
if (!await mClient.SubscriptionExistsAsync(_asbOptions.TopicPath, _subscriptionName))
finally
{
await mClient.CreateSubscriptionAsync(_asbOptions.TopicPath, _subscriptionName);
_logger.LogInformation($"Azure Service Bus topic {_asbOptions.TopicPath} created subscription: {_subscriptionName}");
_connectionLock.Release();
}
_consumerClient = new SubscriptionClient(_asbOptions.ConnectionString, _asbOptions.TopicPath, _subscriptionName,
ReceiveMode.PeekLock, RetryPolicy.Default);
}
private Task OnConsumerReceived(Message message, CancellationToken token)
......
......@@ -2,17 +2,18 @@
// Licensed under the MIT License. See License.txt in the project root for license information.
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
namespace DotNetCore.CAP.AzureServiceBus
{
internal sealed class AzureServiceBusConsumerClientFactory : IConsumerClientFactory
{
private readonly ILoggerFactory _loggerFactory;
private readonly AzureServiceBusOptions _asbOptions;
private readonly IOptions<AzureServiceBusOptions> _asbOptions;
public AzureServiceBusConsumerClientFactory(
ILoggerFactory loggerFactory,
AzureServiceBusOptions asbOptions)
IOptions<AzureServiceBusOptions> asbOptions)
{
_loggerFactory = loggerFactory;
_asbOptions = asbOptions;
......
......@@ -21,9 +21,7 @@ namespace DotNetCore.CAP
{
services.AddSingleton<CapMessageQueueMakerService>();
var azureServiceBusOptions = new AzureServiceBusOptions();
_configure?.Invoke(azureServiceBusOptions);
services.AddSingleton(azureServiceBusOptions);
services.Configure(_configure);
services.AddSingleton<IConsumerClientFactory, AzureServiceBusConsumerClientFactory>();
services.AddSingleton<IPublishExecutor, AzureServiceBusPublishMessageSender>();
......
......@@ -3,40 +3,45 @@
using System;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using DotNetCore.CAP.Internal;
using DotNetCore.CAP.Processor.States;
using Microsoft.Azure.ServiceBus;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
namespace DotNetCore.CAP.AzureServiceBus
{
internal class AzureServiceBusPublishMessageSender : BasePublishMessageSender
{
private readonly SemaphoreSlim _connectionLock = new SemaphoreSlim(initialCount: 1, maxCount: 1);
private readonly ILogger _logger;
private readonly ITopicClient _topicClient;
private readonly IOptions<AzureServiceBusOptions> _asbOptions;
private ITopicClient _topicClient;
public AzureServiceBusPublishMessageSender(
ILogger<AzureServiceBusPublishMessageSender> logger,
CapOptions options,
AzureServiceBusOptions asbOptions,
IOptions<CapOptions> options,
IOptions<AzureServiceBusOptions> asbOptions,
IStateChanger stateChanger,
IStorageConnection connection)
: base(logger, options, connection, stateChanger)
{
_logger = logger;
ServersAddress = asbOptions.ConnectionString;
_topicClient = new TopicClient(
ServersAddress,
asbOptions.TopicPath,
RetryPolicy.NoRetry);
_asbOptions = asbOptions;
}
protected override string ServersAddress => _asbOptions.Value.ConnectionString;
public override async Task<OperateResult> PublishAsync(string keyName, string content)
{
try
{
Connect();
var contentBytes = Encoding.UTF8.GetBytes(content);
var message = new Message
......@@ -59,5 +64,27 @@ namespace DotNetCore.CAP.AzureServiceBus
return OperateResult.Failed(wrapperEx);
}
}
private void Connect()
{
if (_topicClient != null)
{
return;
}
_connectionLock.Wait();
try
{
if (_topicClient == null)
{
_topicClient = new TopicClient(ServersAddress, _asbOptions.Value.TopicPath, RetryPolicy.NoRetry);
}
}
finally
{
_connectionLock.Release();
}
}
}
}
\ No newline at end of file
......@@ -17,9 +17,9 @@ namespace DotNetCore.CAP
services.AddSingleton<IStorageConnection, InMemoryStorageConnection>();
services.AddSingleton<ICapPublisher, InMemoryPublisher>();
services.AddSingleton<ICallbackPublisher, InMemoryPublisher>();
services.AddSingleton<ICallbackPublisher>(x => (InMemoryPublisher)x.GetService<ICapPublisher>());
services.AddSingleton<ICollectProcessor, InMemoryCollectProcessor>();
services.AddTransient<ICollectProcessor, InMemoryCollectProcessor>();
services.AddTransient<CapTransactionBase, InMemoryCapTransaction>();
}
}
......
......@@ -8,6 +8,7 @@ using System.Linq;
using System.Threading.Tasks;
using DotNetCore.CAP.Infrastructure;
using DotNetCore.CAP.Models;
using Microsoft.Extensions.Options;
namespace DotNetCore.CAP.InMemoryStorage
{
......@@ -15,9 +16,9 @@ namespace DotNetCore.CAP.InMemoryStorage
{
private readonly CapOptions _capOptions;
public InMemoryStorageConnection(CapOptions capOptions)
public InMemoryStorageConnection(IOptions<CapOptions> capOptions)
{
_capOptions = capOptions;
_capOptions = capOptions.Value;
PublishedMessages = new List<CapPublishedMessage>();
ReceivedMessages = new List<CapReceivedMessage>();
......
......@@ -21,9 +21,7 @@ namespace DotNetCore.CAP
{
services.AddSingleton<CapMessageQueueMakerService>();
var kafkaOptions = new KafkaOptions();
_configure?.Invoke(kafkaOptions);
services.AddSingleton(kafkaOptions);
services.Configure(_configure);
services.AddSingleton<IConsumerClientFactory, KafkaConsumerClientFactory>();
services.AddSingleton<IPublishExecutor, KafkaPublishMessageSender>();
......
......@@ -13,7 +13,7 @@
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Confluent.Kafka" Version="1.0.0" />
<PackageReference Include="Confluent.Kafka" Version="1.1.0" />
</ItemGroup>
<ItemGroup>
......
......@@ -6,6 +6,7 @@ using System.Collections.Concurrent;
using System.Threading;
using Confluent.Kafka;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Newtonsoft.Json;
namespace DotNetCore.CAP.Kafka
......@@ -17,18 +18,16 @@ namespace DotNetCore.CAP.Kafka
private int _pCount;
private int _maxSize;
public ConnectionPool(ILogger<ConnectionPool> logger, KafkaOptions options)
public ConnectionPool(ILogger<ConnectionPool> logger, IOptions<KafkaOptions> options)
{
ServersAddress = options.Servers;
_options = options;
_options = options.Value;
_producerPool = new ConcurrentQueue<IProducer<Null, string>>();
_maxSize = options.ConnectionPoolSize;
logger.LogDebug("Kafka configuration of CAP :\r\n {0}", JsonConvert.SerializeObject(options.AsKafkaConfig(), Formatting.Indented));
_maxSize = _options.ConnectionPoolSize;
logger.LogDebug("Kafka configuration of CAP :\r\n {0}", JsonConvert.SerializeObject(_options.AsKafkaConfig(), Formatting.Indented));
}
public string ServersAddress { get; }
public string ServersAddress => _options.Servers;
public IProducer<Null, string> RentProducer()
{
......
......@@ -7,6 +7,7 @@ using Confluent.Kafka;
using DotNetCore.CAP.Internal;
using DotNetCore.CAP.Processor.States;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
namespace DotNetCore.CAP.Kafka
{
......@@ -16,15 +17,19 @@ namespace DotNetCore.CAP.Kafka
private readonly ILogger _logger;
public KafkaPublishMessageSender(
CapOptions options, IStateChanger stateChanger, IStorageConnection connection,
IConnectionPool connectionPool, ILogger<KafkaPublishMessageSender> logger)
ILogger<KafkaPublishMessageSender> logger,
IOptions<CapOptions> options,
IStorageConnection connection,
IConnectionPool connectionPool,
IStateChanger stateChanger)
: base(logger, options, connection, stateChanger)
{
_logger = logger;
_connectionPool = connectionPool;
ServersAddress = _connectionPool.ServersAddress;
}
protected override string ServersAddress => _connectionPool.ServersAddress;
public override async Task<OperateResult> PublishAsync(string keyName, string content)
{
var producer = _connectionPool.RentProducer();
......
......@@ -5,25 +5,24 @@ using System;
using System.Collections.Generic;
using System.Threading;
using Confluent.Kafka;
using Microsoft.Extensions.Options;
namespace DotNetCore.CAP.Kafka
{
internal sealed class KafkaConsumerClient : IConsumerClient
{
private readonly SemaphoreSlim _connectionLock = new SemaphoreSlim(initialCount: 1, maxCount: 1);
private readonly string _groupId;
private readonly KafkaOptions _kafkaOptions;
private IConsumer<Null, string> _consumerClient;
public KafkaConsumerClient(string groupId, KafkaOptions options)
public KafkaConsumerClient(string groupId, IOptions<KafkaOptions> options)
{
_groupId = groupId;
_kafkaOptions = options ?? throw new ArgumentNullException(nameof(options));
InitKafkaClient();
_kafkaOptions = options.Value ?? throw new ArgumentNullException(nameof(options));
}
public IDeserializer<string> StringDeserializer { get; set; }
public event EventHandler<MessageContext> OnMessageReceived;
public event EventHandler<LogMessageEventArgs> OnLog;
......@@ -37,11 +36,15 @@ namespace DotNetCore.CAP.Kafka
throw new ArgumentNullException(nameof(topics));
}
Connect();
_consumerClient.Subscribe(topics);
}
public void Listening(TimeSpan timeout, CancellationToken cancellationToken)
{
Connect();
while (true)
{
var consumerResult = _consumerClient.Consume(cancellationToken);
......@@ -77,17 +80,32 @@ namespace DotNetCore.CAP.Kafka
#region private methods
private void InitKafkaClient()
private void Connect()
{
lock (_kafkaOptions)
if (_consumerClient != null)
{
_kafkaOptions.MainConfig["group.id"] = _groupId;
_kafkaOptions.MainConfig["auto.offset.reset"] = "earliest";
var config = _kafkaOptions.AsKafkaConfig();
_consumerClient = new ConsumerBuilder<Null, string>(config)
.SetErrorHandler(ConsumerClient_OnConsumeError)
.Build();
return;
}
_connectionLock.Wait();
try
{
if (_consumerClient == null)
{
_kafkaOptions.MainConfig["group.id"] = _groupId;
_kafkaOptions.MainConfig["auto.offset.reset"] = "earliest";
var config = _kafkaOptions.AsKafkaConfig();
_consumerClient = new ConsumerBuilder<Null, string>(config)
.SetErrorHandler(ConsumerClient_OnConsumeError)
.Build();
}
}
finally
{
_connectionLock.Release();
}
}
private void ConsumerClient_OnConsumeError(IConsumer<Null, string> consumer, Error e)
......
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using Microsoft.Extensions.Options;
namespace DotNetCore.CAP.Kafka
{
internal sealed class KafkaConsumerClientFactory : IConsumerClientFactory
{
private readonly KafkaOptions _kafkaOptions;
private readonly IOptions<KafkaOptions> _kafkaOptions;
public KafkaConsumerClientFactory(KafkaOptions kafkaOptions)
public KafkaConsumerClientFactory(IOptions<KafkaOptions> kafkaOptions)
{
_kafkaOptions = kafkaOptions;
}
......
......@@ -5,6 +5,7 @@ using System;
using DotNetCore.CAP.Processor;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection.Extensions;
using Microsoft.Extensions.Options;
using MongoDB.Driver;
namespace DotNetCore.CAP.MongoDB
......@@ -25,18 +26,20 @@ namespace DotNetCore.CAP.MongoDB
services.AddSingleton<IStorage, MongoDBStorage>();
services.AddSingleton<IStorageConnection, MongoDBStorageConnection>();
services.AddScoped<ICapPublisher, MongoDBPublisher>();
services.AddScoped<ICallbackPublisher, MongoDBPublisher>();
services.AddSingleton<ICapPublisher, MongoDBPublisher>();
services.AddSingleton<ICallbackPublisher>(x => (MongoDBPublisher)x.GetService<ICapPublisher>());
services.AddSingleton<ICollectProcessor, MongoDBCollectProcessor>();
services.AddTransient<ICollectProcessor, MongoDBCollectProcessor>();
services.AddTransient<CapTransactionBase, MongoDBCapTransaction>();
var options = new MongoDBOptions();
_configure?.Invoke(options);
services.AddSingleton(options);
services.Configure(_configure);
//Try to add IMongoClient if does not exists
services.TryAddSingleton<IMongoClient>(new MongoClient(options.DatabaseConnection));
services.TryAddSingleton<IMongoClient>(x =>
{
var options = x.GetService<IOptions<MongoDBOptions>>().Value;
return new MongoClient(options.DatabaseConnection);
});
}
}
}
\ No newline at end of file
......@@ -7,6 +7,7 @@ using System.Threading.Tasks;
using DotNetCore.CAP.Abstractions;
using DotNetCore.CAP.Models;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Options;
using MongoDB.Driver;
namespace DotNetCore.CAP.MongoDB
......@@ -16,10 +17,9 @@ namespace DotNetCore.CAP.MongoDB
private readonly IMongoClient _client;
private readonly MongoDBOptions _options;
public MongoDBPublisher(IServiceProvider provider, MongoDBOptions options)
: base(provider)
public MongoDBPublisher(IServiceProvider provider) : base(provider)
{
_options = options;
_options = provider.GetService<IOptions<MongoDBOptions>>().Value;
_client = ServiceProvider.GetRequiredService<IMongoClient>();
}
......@@ -31,7 +31,7 @@ namespace DotNetCore.CAP.MongoDB
protected override Task ExecuteAsync(CapPublishedMessage message, ICapTransaction transaction,
CancellationToken cancel = default(CancellationToken))
{
var insertOptions = new InsertOneOptions {BypassDocumentValidation = false};
var insertOptions = new InsertOneOptions { BypassDocumentValidation = false };
var collection = _client
.GetDatabase(_options.DatabaseName)
......@@ -51,11 +51,10 @@ namespace DotNetCore.CAP.MongoDB
if (NotUseTransaction)
{
return collection.InsertOneAsync(store, insertOptions, cancel);
}
var dbTrans = (IClientSessionHandle) transaction.DbTransaction;
var dbTrans = (IClientSessionHandle)transaction.DbTransaction;
return collection.InsertOneAsync(dbTrans, store, insertOptions, cancel);
}
}
......
......@@ -39,6 +39,7 @@ namespace DotNetCore.CAP
public override void Dispose()
{
(DbTransaction as IClientSessionHandle)?.Dispose();
DbTransaction = null;
}
}
......
......@@ -3,9 +3,9 @@
using System;
using System.Threading.Tasks;
using DotNetCore.CAP.Models;
using DotNetCore.CAP.Processor;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using MongoDB.Driver;
namespace DotNetCore.CAP.MongoDB
......@@ -17,19 +17,19 @@ namespace DotNetCore.CAP.MongoDB
private readonly MongoDBOptions _options;
private readonly TimeSpan _waitingInterval = TimeSpan.FromMinutes(5);
public MongoDBCollectProcessor(ILogger<MongoDBCollectProcessor> logger,
MongoDBOptions options,
public MongoDBCollectProcessor(
ILogger<MongoDBCollectProcessor> logger,
IOptions<MongoDBOptions> options,
IMongoClient client)
{
_options = options;
_options = options.Value;
_logger = logger;
_database = client.GetDatabase(_options.DatabaseName);
}
public async Task ProcessAsync(ProcessingContext context)
{
_logger.LogDebug(
$"Collecting expired data from collection [{_options.PublishedCollection}].");
_logger.LogDebug($"Collecting expired data from collection [{_options.PublishedCollection}].");
var publishedCollection = _database.GetCollection<PublishedMessage>(_options.PublishedCollection);
var receivedCollection = _database.GetCollection<ReceivedMessage>(_options.ReceivedCollection);
......@@ -39,6 +39,7 @@ namespace DotNetCore.CAP.MongoDB
new DeleteManyModel<PublishedMessage>(
Builders<PublishedMessage>.Filter.Lt(x => x.ExpiresAt, DateTime.Now))
});
await receivedCollection.BulkWriteAsync(new[]
{
new DeleteManyModel<ReceivedMessage>(
......
......@@ -7,6 +7,7 @@ using DotNetCore.CAP.Dashboard;
using DotNetCore.CAP.Dashboard.Monitoring;
using DotNetCore.CAP.Infrastructure;
using DotNetCore.CAP.Models;
using Microsoft.Extensions.Options;
using MongoDB.Bson;
using MongoDB.Driver;
......@@ -17,10 +18,10 @@ namespace DotNetCore.CAP.MongoDB
private readonly IMongoDatabase _database;
private readonly MongoDBOptions _options;
public MongoDBMonitoringApi(IMongoClient client, MongoDBOptions options)
public MongoDBMonitoringApi(IMongoClient client, IOptions<MongoDBOptions> options)
{
var mongoClient = client ?? throw new ArgumentNullException(nameof(client));
_options = options ?? throw new ArgumentNullException(nameof(options));
_options = options.Value ?? throw new ArgumentNullException(nameof(options));
_database = mongoClient.GetDatabase(_options.DatabaseName);
}
......@@ -140,7 +141,7 @@ namespace DotNetCore.CAP.MongoDB
private int GetNumberOfMessage(string collectionName, string statusName)
{
var collection = _database.GetCollection<BsonDocument>(collectionName);
var count = collection.CountDocuments(new BsonDocument {{"StatusName", statusName}});
var count = collection.CountDocuments(new BsonDocument { { "StatusName", statusName } });
return int.Parse(count.ToString());
}
......@@ -199,7 +200,7 @@ namespace DotNetCore.CAP.MongoDB
}
};
var pipeline = new[] {match, groupby};
var pipeline = new[] { match, groupby };
var collection = _database.GetCollection<BsonDocument>(collectionName);
var result = collection.Aggregate<BsonDocument>(pipeline).ToList();
......
......@@ -6,19 +6,21 @@ using System.Threading;
using System.Threading.Tasks;
using DotNetCore.CAP.Dashboard;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using MongoDB.Driver;
namespace DotNetCore.CAP.MongoDB
{
public class MongoDBStorage : IStorage
{
private readonly CapOptions _capOptions;
private readonly IOptions<CapOptions> _capOptions;
private readonly IMongoClient _client;
private readonly ILogger<MongoDBStorage> _logger;
private readonly MongoDBOptions _options;
private readonly IOptions<MongoDBOptions> _options;
public MongoDBStorage(CapOptions capOptions,
MongoDBOptions options,
public MongoDBStorage(
IOptions<CapOptions> capOptions,
IOptions<MongoDBOptions> options,
IMongoClient client,
ILogger<MongoDBStorage> logger)
{
......@@ -45,31 +47,32 @@ namespace DotNetCore.CAP.MongoDB
return;
}
var database = _client.GetDatabase(_options.DatabaseName);
var names = (await database.ListCollectionNamesAsync(cancellationToken: cancellationToken))?.ToList();
var options = _options.Value;
var database = _client.GetDatabase(options.DatabaseName);
var names = (await database.ListCollectionNamesAsync(cancellationToken: cancellationToken)).ToList();
if (names.All(n => n != _options.ReceivedCollection))
if (names.All(n => n != options.ReceivedCollection))
{
await database.CreateCollectionAsync(_options.ReceivedCollection, cancellationToken: cancellationToken);
await database.CreateCollectionAsync(options.ReceivedCollection, cancellationToken: cancellationToken);
}
if (names.All(n => n != _options.PublishedCollection))
if (names.All(n => n != options.PublishedCollection))
{
await database.CreateCollectionAsync(_options.PublishedCollection,
await database.CreateCollectionAsync(options.PublishedCollection,
cancellationToken: cancellationToken);
}
var receivedMessageIndexNames = new string[] {
var receivedMessageIndexNames = new[] {
nameof(ReceivedMessage.Name), nameof(ReceivedMessage.Added), nameof(ReceivedMessage.ExpiresAt),
nameof(ReceivedMessage.StatusName), nameof(ReceivedMessage.Retries), nameof(ReceivedMessage.Version) };
var publishedMessageIndexNames = new string[] {
var publishedMessageIndexNames = new[] {
nameof(PublishedMessage.Name), nameof(PublishedMessage.Added), nameof(PublishedMessage.ExpiresAt),
nameof(PublishedMessage.StatusName), nameof(PublishedMessage.Retries), nameof(PublishedMessage.Version) };
await Task.WhenAll(
TryCreateIndexesAsync<ReceivedMessage>(_options.ReceivedCollection, receivedMessageIndexNames),
TryCreateIndexesAsync<PublishedMessage>(_options.PublishedCollection, publishedMessageIndexNames)
TryCreateIndexesAsync<ReceivedMessage>(options.ReceivedCollection, receivedMessageIndexNames),
TryCreateIndexesAsync<PublishedMessage>(options.PublishedCollection, publishedMessageIndexNames)
);
_logger.LogDebug("Ensuring all create database tables script are applied.");
......@@ -87,15 +90,15 @@ namespace DotNetCore.CAP.MongoDB
if (indexNames.Any() == false)
return;
var indexes = indexNames.Select(index_name =>
var indexes = indexNames.Select(indexName =>
{
var indexOptions = new CreateIndexOptions
{
Name = index_name,
Name = indexName,
Background = true,
};
var indexBuilder = Builders<T>.IndexKeys;
return new CreateIndexModel<T>(indexBuilder.Descending(index_name), indexOptions);
return new CreateIndexModel<T>(indexBuilder.Descending(indexName), indexOptions);
}).ToArray();
await col.Indexes.CreateManyAsync(indexes, cancellationToken);
......
......@@ -6,6 +6,7 @@ using System.Collections.Generic;
using System.Threading.Tasks;
using DotNetCore.CAP.Infrastructure;
using DotNetCore.CAP.Models;
using Microsoft.Extensions.Options;
using MongoDB.Driver;
namespace DotNetCore.CAP.MongoDB
......@@ -17,10 +18,13 @@ namespace DotNetCore.CAP.MongoDB
private readonly IMongoDatabase _database;
private readonly MongoDBOptions _options;
public MongoDBStorageConnection(CapOptions capOptions, MongoDBOptions options, IMongoClient client)
public MongoDBStorageConnection(
IOptions<CapOptions> capOptions,
IOptions<MongoDBOptions> options,
IMongoClient client)
{
_capOptions = capOptions;
_options = options;
_capOptions = capOptions.Value;
_options = options.Value;
_client = client;
_database = _client.GetDatabase(_options.DatabaseName);
}
......
......@@ -17,7 +17,7 @@ namespace DotNetCore.CAP.MongoDB
public MongoDBStorageTransaction(IMongoClient client, MongoDBOptions options)
{
_options = options;
_database = client.GetDatabase(options.DatabaseName);
_database = client.GetDatabase(_options.DatabaseName);
_session = client.StartSession();
_session.StartTransaction();
}
......
......@@ -4,8 +4,8 @@
using System;
using DotNetCore.CAP.MySql;
using DotNetCore.CAP.Processor;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Options;
// ReSharper disable once CheckNamespace
namespace DotNetCore.CAP
......@@ -24,39 +24,15 @@ namespace DotNetCore.CAP
services.AddSingleton<CapStorageMarkerService>();
services.AddSingleton<IStorage, MySqlStorage>();
services.AddSingleton<IStorageConnection, MySqlStorageConnection>();
services.AddSingleton<ICapPublisher, MySqlPublisher>();
services.AddSingleton<ICallbackPublisher>(provider => (MySqlPublisher)provider.GetService<ICapPublisher>());
services.AddSingleton<ICollectProcessor, MySqlCollectProcessor>();
services.AddScoped<ICapPublisher, MySqlPublisher>();
services.AddScoped<ICallbackPublisher, MySqlPublisher>();
services.AddTransient<ICollectProcessor, MySqlCollectProcessor>();
services.AddTransient<CapTransactionBase, MySqlCapTransaction>();
AddSingletionMySqlOptions(services);
}
private void AddSingletionMySqlOptions(IServiceCollection services)
{
var mysqlOptions = new MySqlOptions();
_configure(mysqlOptions);
if (mysqlOptions.DbContextType != null)
{
services.AddSingleton(x =>
{
using (var scope = x.CreateScope())
{
var provider = scope.ServiceProvider;
var dbContext = (DbContext) provider.GetService(mysqlOptions.DbContextType);
mysqlOptions.ConnectionString = dbContext.Database.GetDbConnection().ConnectionString;
return mysqlOptions;
}
});
}
else
{
services.AddSingleton(mysqlOptions);
}
}
//Add MySqlOptions
services.Configure(_configure);
services.AddSingleton<IConfigureOptions<MySqlOptions>, ConfigureMySqlOptions>();
}
}
}
\ No newline at end of file
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Options;
namespace DotNetCore.CAP
{
public class MySqlOptions : EFOptions
......@@ -10,4 +14,29 @@ namespace DotNetCore.CAP
/// </summary>
public string ConnectionString { get; set; }
}
internal class ConfigureMySqlOptions : IConfigureOptions<MySqlOptions>
{
private readonly IServiceScopeFactory _serviceScopeFactory;
public ConfigureMySqlOptions(IServiceScopeFactory serviceScopeFactory)
{
_serviceScopeFactory = serviceScopeFactory;
}
public void Configure(MySqlOptions options)
{
if (options.DbContextType != null)
{
using (var scope = _serviceScopeFactory.CreateScope())
{
var provider = scope.ServiceProvider;
using (var dbContext = (DbContext)provider.GetRequiredService(options.DbContextType))
{
options.ConnectionString = dbContext.Database.GetDbConnection().ConnectionString;
}
}
}
}
}
}
\ No newline at end of file
......@@ -10,6 +10,7 @@ using DotNetCore.CAP.Abstractions;
using DotNetCore.CAP.Models;
using Microsoft.EntityFrameworkCore.Storage;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Options;
using MySql.Data.MySqlClient;
namespace DotNetCore.CAP.MySql
......@@ -20,7 +21,7 @@ namespace DotNetCore.CAP.MySql
public MySqlPublisher(IServiceProvider provider) : base(provider)
{
_options = provider.GetService<MySqlOptions>();
_options = provider.GetService<IOptions<MySqlOptions>>().Value;
}
public async Task PublishCallbackAsync(CapPublishedMessage message)
......
......@@ -50,6 +50,7 @@ namespace DotNetCore.CAP
public override void Dispose()
{
(DbTransaction as IDbTransaction)?.Dispose();
DbTransaction = null;
}
}
......
......@@ -6,6 +6,7 @@ using System.Threading.Tasks;
using Dapper;
using DotNetCore.CAP.Processor;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using MySql.Data.MySqlClient;
namespace DotNetCore.CAP.MySql
......@@ -18,11 +19,10 @@ namespace DotNetCore.CAP.MySql
private readonly MySqlOptions _options;
private readonly TimeSpan _waitingInterval = TimeSpan.FromMinutes(5);
public MySqlCollectProcessor(ILogger<MySqlCollectProcessor> logger,
MySqlOptions mysqlOptions)
public MySqlCollectProcessor(ILogger<MySqlCollectProcessor> logger, IOptions<MySqlOptions> mysqlOptions)
{
_logger = logger;
_options = mysqlOptions;
_options = mysqlOptions.Value;
}
public async Task ProcessAsync(ProcessingContext context)
......@@ -44,7 +44,7 @@ namespace DotNetCore.CAP.MySql
{
removedCount = await connection.ExecuteAsync(
$@"DELETE FROM `{table}` WHERE ExpiresAt < @now limit @count;",
new {now = DateTime.Now, count = MaxBatch});
new { now = DateTime.Now, count = MaxBatch });
}
if (removedCount != 0)
......
......@@ -10,6 +10,7 @@ using DotNetCore.CAP.Dashboard;
using DotNetCore.CAP.Dashboard.Monitoring;
using DotNetCore.CAP.Infrastructure;
using DotNetCore.CAP.Models;
using Microsoft.Extensions.Options;
namespace DotNetCore.CAP.MySql
{
......@@ -18,10 +19,10 @@ namespace DotNetCore.CAP.MySql
private readonly string _prefix;
private readonly MySqlStorage _storage;
public MySqlMonitoringApi(IStorage storage, MySqlOptions options)
public MySqlMonitoringApi(IStorage storage, IOptions<MySqlOptions> options)
{
_storage = storage as MySqlStorage ?? throw new ArgumentNullException(nameof(storage));
_prefix = options?.TableNamePrefix ?? throw new ArgumentNullException(nameof(options));
_prefix = options.Value.TableNamePrefix ?? throw new ArgumentNullException(nameof(options));
}
public StatisticsDto GetStatistics()
......@@ -126,7 +127,7 @@ select count(Id) from `{0}.received` where StatusName = N'Failed';", _prefix);
{
var sqlQuery = $"select count(Id) from `{_prefix}.{tableName}` where StatusName = @state";
var count = connection.ExecuteScalar<int>(sqlQuery, new {state = statusName});
var count = connection.ExecuteScalar<int>(sqlQuery, new { state = statusName });
return count;
}
......@@ -169,7 +170,7 @@ select aggr.* from (
var valuesMap = connection.Query<TimelineCounter>(
sqlQuery,
new {keys = keyMaps.Keys, statusName})
new { keys = keyMaps.Keys, statusName })
.ToDictionary(x => x.Key, x => x.Count);
foreach (var key in keyMaps.Keys)
......
......@@ -8,20 +8,22 @@ using System.Threading.Tasks;
using Dapper;
using DotNetCore.CAP.Dashboard;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using MySql.Data.MySqlClient;
namespace DotNetCore.CAP.MySql
{
public class MySqlStorage : IStorage
{
private readonly CapOptions _capOptions;
private readonly IOptions<CapOptions> _capOptions;
private readonly IOptions<MySqlOptions> _options;
private readonly IDbConnection _existingConnection = null;
private readonly ILogger _logger;
private readonly MySqlOptions _options;
public MySqlStorage(ILogger<MySqlStorage> logger,
MySqlOptions options,
CapOptions capOptions)
public MySqlStorage(
ILogger<MySqlStorage> logger,
IOptions<MySqlOptions> options,
IOptions<CapOptions> capOptions)
{
_options = options;
_capOptions = capOptions;
......@@ -45,10 +47,10 @@ namespace DotNetCore.CAP.MySql
return;
}
var sql = CreateDbTablesScript(_options.TableNamePrefix);
using (var connection = new MySqlConnection(_options.ConnectionString))
var sql = CreateDbTablesScript(_options.Value.TableNamePrefix);
using (var connection = new MySqlConnection(_options.Value.ConnectionString))
{
await connection.ExecuteAsync(sql);
await connection.ExecuteAsync(sql);
}
_logger.LogDebug("Ensuring all create database tables script are applied.");
......@@ -103,7 +105,7 @@ CREATE TABLE IF NOT EXISTS `{prefix}.published` (
internal IDbConnection CreateAndOpenConnection()
{
var connection = _existingConnection ?? new MySqlConnection(_options.ConnectionString);
var connection = _existingConnection ?? new MySqlConnection(_options.Value.ConnectionString);
if (connection.State == ConnectionState.Closed)
{
......
......@@ -7,6 +7,7 @@ using System.Threading.Tasks;
using Dapper;
using DotNetCore.CAP.Infrastructure;
using DotNetCore.CAP.Models;
using Microsoft.Extensions.Options;
using MySql.Data.MySqlClient;
namespace DotNetCore.CAP.MySql
......@@ -14,16 +15,17 @@ namespace DotNetCore.CAP.MySql
public class MySqlStorageConnection : IStorageConnection
{
private readonly CapOptions _capOptions;
private readonly IOptions<MySqlOptions> _options;
private readonly string _prefix;
public MySqlStorageConnection(MySqlOptions options, CapOptions capOptions)
public MySqlStorageConnection(IOptions<MySqlOptions> options, IOptions<CapOptions> capOptions)
{
_capOptions = capOptions;
Options = options;
_prefix = Options.TableNamePrefix;
_options = options;
_capOptions = capOptions.Value;
_prefix = options.Value.TableNamePrefix;
}
public MySqlOptions Options { get; }
public MySqlOptions Options => _options.Value;
public IStorageTransaction CreateTransaction()
{
......
......@@ -4,8 +4,8 @@
using System;
using DotNetCore.CAP.PostgreSql;
using DotNetCore.CAP.Processor;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Options;
// ReSharper disable once CheckNamespace
namespace DotNetCore.CAP
......@@ -24,38 +24,14 @@ namespace DotNetCore.CAP
services.AddSingleton<CapStorageMarkerService>();
services.AddSingleton<IStorage, PostgreSqlStorage>();
services.AddSingleton<IStorageConnection, PostgreSqlStorageConnection>();
services.AddSingleton<ICapPublisher, PostgreSqlPublisher>();
services.AddSingleton<ICallbackPublisher>(provider => (PostgreSqlPublisher)provider.GetService<ICapPublisher>());
services.AddSingleton<ICollectProcessor, PostgreSqlCollectProcessor>();
services.AddScoped<ICapPublisher, PostgreSqlPublisher>();
services.AddScoped<ICallbackPublisher, PostgreSqlPublisher>();
services.AddTransient<ICollectProcessor, PostgreSqlCollectProcessor>();
services.AddTransient<CapTransactionBase, PostgreSqlCapTransaction>();
AddSingletonPostgreSqlOptions(services);
}
private void AddSingletonPostgreSqlOptions(IServiceCollection services)
{
var postgreSqlOptions = new PostgreSqlOptions();
_configure(postgreSqlOptions);
if (postgreSqlOptions.DbContextType != null)
{
services.AddSingleton(x =>
{
using (var scope = x.CreateScope())
{
var provider = scope.ServiceProvider;
var dbContext = (DbContext) provider.GetService(postgreSqlOptions.DbContextType);
postgreSqlOptions.ConnectionString = dbContext.Database.GetDbConnection().ConnectionString;
return postgreSqlOptions;
}
});
}
else
{
services.AddSingleton(postgreSqlOptions);
}
services.Configure(_configure);
services.AddSingleton<IConfigureOptions<PostgreSqlOptions>, ConfigurePostgreSqlOptions>();
}
}
}
\ No newline at end of file
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Options;
// ReSharper disable once CheckNamespace
namespace DotNetCore.CAP
{
public class PostgreSqlOptions : EFOptions
......@@ -10,4 +15,29 @@ namespace DotNetCore.CAP
/// </summary>
public string ConnectionString { get; set; }
}
internal class ConfigurePostgreSqlOptions : IConfigureOptions<PostgreSqlOptions>
{
private readonly IServiceScopeFactory _serviceScopeFactory;
public ConfigurePostgreSqlOptions(IServiceScopeFactory serviceScopeFactory)
{
_serviceScopeFactory = serviceScopeFactory;
}
public void Configure(PostgreSqlOptions options)
{
if (options.DbContextType != null)
{
using (var scope = _serviceScopeFactory.CreateScope())
{
var provider = scope.ServiceProvider;
using (var dbContext = (DbContext)provider.GetRequiredService(options.DbContextType))
{
options.ConnectionString = dbContext.Database.GetDbConnection().ConnectionString;
}
}
}
}
}
}
\ No newline at end of file
......@@ -10,6 +10,7 @@ using DotNetCore.CAP.Abstractions;
using DotNetCore.CAP.Models;
using Microsoft.EntityFrameworkCore.Storage;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Options;
using Npgsql;
namespace DotNetCore.CAP.PostgreSql
......@@ -20,7 +21,7 @@ namespace DotNetCore.CAP.PostgreSql
public PostgreSqlPublisher(IServiceProvider provider) : base(provider)
{
_options = provider.GetService<PostgreSqlOptions>();
_options = provider.GetService<IOptions<PostgreSqlOptions>>().Value;
}
public async Task PublishCallbackAsync(CapPublishedMessage message)
......
......@@ -50,6 +50,7 @@ namespace DotNetCore.CAP
public override void Dispose()
{
(DbTransaction as IDbTransaction)?.Dispose();
DbTransaction = null;
}
}
......
......@@ -6,6 +6,7 @@ using System.Threading.Tasks;
using Dapper;
using DotNetCore.CAP.Processor;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Npgsql;
namespace DotNetCore.CAP.PostgreSql
......@@ -25,10 +26,10 @@ namespace DotNetCore.CAP.PostgreSql
private readonly TimeSpan _waitingInterval = TimeSpan.FromMinutes(5);
public PostgreSqlCollectProcessor(ILogger<PostgreSqlCollectProcessor> logger,
PostgreSqlOptions sqlServerOptions)
IOptions<PostgreSqlOptions> sqlServerOptions)
{
_logger = logger;
_options = sqlServerOptions;
_options = sqlServerOptions.Value;
}
public async Task ProcessAsync(ProcessingContext context)
......@@ -44,7 +45,7 @@ namespace DotNetCore.CAP.PostgreSql
{
removedCount = await connection.ExecuteAsync(
$"DELETE FROM \"{_options.Schema}\".\"{table}\" WHERE \"ExpiresAt\" < @now AND \"Id\" IN (SELECT \"Id\" FROM \"{_options.Schema}\".\"{table}\" LIMIT @count);",
new {now = DateTime.Now, count = MaxBatch});
new { now = DateTime.Now, count = MaxBatch });
}
if (removedCount != 0)
......
......@@ -10,6 +10,7 @@ using DotNetCore.CAP.Dashboard;
using DotNetCore.CAP.Dashboard.Monitoring;
using DotNetCore.CAP.Infrastructure;
using DotNetCore.CAP.Models;
using Microsoft.Extensions.Options;
namespace DotNetCore.CAP.PostgreSql
{
......@@ -18,9 +19,9 @@ namespace DotNetCore.CAP.PostgreSql
private readonly PostgreSqlOptions _options;
private readonly PostgreSqlStorage _storage;
public PostgreSqlMonitoringApi(IStorage storage, PostgreSqlOptions options)
public PostgreSqlMonitoringApi(IStorage storage, IOptions<PostgreSqlOptions> options)
{
_options = options ?? throw new ArgumentNullException(nameof(options));
_options = options.Value ?? throw new ArgumentNullException(nameof(options));
_storage = storage as PostgreSqlStorage ?? throw new ArgumentNullException(nameof(storage));
}
......
......@@ -8,20 +8,21 @@ using System.Threading.Tasks;
using Dapper;
using DotNetCore.CAP.Dashboard;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Npgsql;
namespace DotNetCore.CAP.PostgreSql
{
public class PostgreSqlStorage : IStorage
{
private readonly CapOptions _capOptions;
private readonly IOptions<CapOptions> _capOptions;
private readonly IDbConnection _existingConnection = null;
private readonly ILogger _logger;
private readonly PostgreSqlOptions _options;
private readonly IOptions<PostgreSqlOptions> _options;
public PostgreSqlStorage(ILogger<PostgreSqlStorage> logger,
CapOptions capOptions,
PostgreSqlOptions options)
IOptions<CapOptions> capOptions,
IOptions<PostgreSqlOptions> options)
{
_options = options;
_logger = logger;
......@@ -45,9 +46,9 @@ namespace DotNetCore.CAP.PostgreSql
return;
}
var sql = CreateDbTablesScript(_options.Schema);
var sql = CreateDbTablesScript(_options.Value.Schema);
using (var connection = new NpgsqlConnection(_options.ConnectionString))
using (var connection = new NpgsqlConnection(_options.Value.ConnectionString))
{
await connection.ExecuteAsync(sql);
}
......@@ -72,7 +73,7 @@ namespace DotNetCore.CAP.PostgreSql
internal IDbConnection CreateAndOpenConnection()
{
var connection = _existingConnection ?? new NpgsqlConnection(_options.ConnectionString);
var connection = _existingConnection ?? new NpgsqlConnection(_options.Value.ConnectionString);
if (connection.State == ConnectionState.Closed)
{
......
......@@ -7,6 +7,7 @@ using System.Threading.Tasks;
using Dapper;
using DotNetCore.CAP.Infrastructure;
using DotNetCore.CAP.Models;
using Microsoft.Extensions.Options;
using Npgsql;
namespace DotNetCore.CAP.PostgreSql
......@@ -15,10 +16,12 @@ namespace DotNetCore.CAP.PostgreSql
{
private readonly CapOptions _capOptions;
public PostgreSqlStorageConnection(PostgreSqlOptions options, CapOptions capOptions)
public PostgreSqlStorageConnection(
IOptions<PostgreSqlOptions> options,
IOptions<CapOptions> capOptions)
{
_capOptions = capOptions;
Options = options;
_capOptions = capOptions.Value;
Options = options.Value;
}
public PostgreSqlOptions Options { get; }
......
......@@ -6,6 +6,7 @@
using System;
using RabbitMQ.Client;
// ReSharper disable once CheckNamespace
namespace DotNetCore.CAP
{
public class RabbitMQOptions
......
......@@ -20,11 +20,8 @@ namespace DotNetCore.CAP
public void AddServices(IServiceCollection services)
{
services.AddSingleton<CapMessageQueueMakerService>();
var options = new RabbitMQOptions();
_configure?.Invoke(options);
services.AddSingleton(options);
services.Configure(_configure);
services.AddSingleton<IConsumerClientFactory, RabbitMQConsumerClientFactory>();
services.AddSingleton<IConnectionChannelPool, ConnectionChannelPool>();
services.AddSingleton<IPublishExecutor, RabbitMQPublishMessageSender>();
......
......@@ -7,6 +7,7 @@ using System.Diagnostics;
using System.Reflection;
using System.Threading;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using RabbitMQ.Client;
namespace DotNetCore.CAP.RabbitMQ
......@@ -18,37 +19,34 @@ namespace DotNetCore.CAP.RabbitMQ
private readonly ILogger<ConnectionChannelPool> _logger;
private readonly ConcurrentQueue<IModel> _pool;
private IConnection _connection;
private static readonly object s_lock = new object();
private static readonly object SLock = new object();
private int _count;
private int _maxSize;
public ConnectionChannelPool(ILogger<ConnectionChannelPool> logger,
CapOptions capOptions,
RabbitMQOptions options)
public ConnectionChannelPool(
ILogger<ConnectionChannelPool> logger,
IOptions<CapOptions> capOptionsAccessor,
IOptions<RabbitMQOptions> optionsAccessor)
{
_logger = logger;
_maxSize = DefaultPoolSize;
_pool = new ConcurrentQueue<IModel>();
_connectionActivator = CreateConnection(options);
HostAddress = options.HostName + ":" + options.Port;
var capOptions = capOptionsAccessor.Value;
var options = optionsAccessor.Value;
if (CapOptions.DefaultVersion == capOptions.Version)
{
Exchange = options.ExchangeName;
}
else
{
Exchange = options.ExchangeName + "." + capOptions.Version;
}
_connectionActivator = CreateConnection(options);
HostAddress = $"{options.HostName}:{options.Port}";
Exchange = CapOptions.DefaultVersion == capOptions.Version ? options.ExchangeName : $"{options.ExchangeName}.{capOptions.Version}";
_logger.LogDebug($"RabbitMQ configuration:'HostName:{options.HostName}, Port:{options.Port}, UserName:{options.UserName}, Password:{options.Password}, ExchangeName:{options.ExchangeName}'");
}
IModel IConnectionChannelPool.Rent()
{
lock (s_lock)
lock (SLock)
{
while (_count > _maxSize)
{
......@@ -100,14 +98,14 @@ namespace DotNetCore.CAP.RabbitMQ
Password = options.Password,
VirtualHost = options.VirtualHost
};
if (options.HostName.Contains(","))
{
options.ConnectionFactoryOptions?.Invoke(factory);
return () => factory.CreateConnection(
options.HostName.Split(new[] { "," }, StringSplitOptions.RemoveEmptyEntries), serviceName);
}
factory.HostName = options.HostName;
options.ConnectionFactoryOptions?.Invoke(factory);
return () => factory.CreateConnection(serviceName);
......@@ -135,7 +133,7 @@ namespace DotNetCore.CAP.RabbitMQ
}
catch (Exception e)
{
_logger.LogError(e,"RabbitMQ channel model create failed!");
_logger.LogError(e, "RabbitMQ channel model create failed!");
Console.WriteLine(e);
throw;
}
......
......@@ -7,6 +7,7 @@ using System.Threading.Tasks;
using DotNetCore.CAP.Internal;
using DotNetCore.CAP.Processor.States;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using RabbitMQ.Client;
using RabbitMQ.Client.Framing;
......@@ -18,16 +19,21 @@ namespace DotNetCore.CAP.RabbitMQ
private readonly ILogger _logger;
private readonly string _exchange;
public RabbitMQPublishMessageSender(ILogger<RabbitMQPublishMessageSender> logger, CapOptions options,
IStorageConnection connection, IConnectionChannelPool connectionChannelPool, IStateChanger stateChanger)
public RabbitMQPublishMessageSender(
ILogger<RabbitMQPublishMessageSender> logger,
IOptions<CapOptions> options,
IStorageConnection connection,
IConnectionChannelPool connectionChannelPool,
IStateChanger stateChanger)
: base(logger, options, connection, stateChanger)
{
_logger = logger;
_connectionChannelPool = connectionChannelPool;
_exchange = _connectionChannelPool.Exchange;
ServersAddress = _connectionChannelPool.HostAddress;
}
protected override string ServersAddress => _connectionChannelPool.HostAddress;
public override Task<OperateResult> PublishAsync(string keyName, string content)
{
var channel = _connectionChannelPool.Rent();
......@@ -48,14 +54,14 @@ namespace DotNetCore.CAP.RabbitMQ
}
catch (Exception ex)
{
var wapperEx = new PublisherSentFailedException(ex.Message, ex);
var wrapperEx = new PublisherSentFailedException(ex.Message, ex);
var errors = new OperateError
{
Code = ex.HResult.ToString(),
Description = ex.Message
};
return Task.FromResult(OperateResult.Failed(wapperEx, errors));
return Task.FromResult(OperateResult.Failed(wrapperEx, errors));
}
finally
{
......
......@@ -5,6 +5,7 @@ using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using Microsoft.Extensions.Options;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
......@@ -12,6 +13,8 @@ namespace DotNetCore.CAP.RabbitMQ
{
internal sealed class RabbitMQConsumerClient : IConsumerClient
{
private readonly SemaphoreSlim _connectionLock = new SemaphoreSlim(initialCount: 1, maxCount: 1);
private readonly IConnectionChannelPool _connectionChannelPool;
private readonly string _exchangeName;
private readonly string _queueName;
......@@ -23,14 +26,12 @@ namespace DotNetCore.CAP.RabbitMQ
public RabbitMQConsumerClient(string queueName,
IConnectionChannelPool connectionChannelPool,
RabbitMQOptions options)
IOptions<RabbitMQOptions> options)
{
_queueName = queueName;
_connectionChannelPool = connectionChannelPool;
_rabbitMQOptions = options;
_rabbitMQOptions = options.Value;
_exchangeName = connectionChannelPool.Exchange;
InitClient();
}
public event EventHandler<MessageContext> OnMessageReceived;
......@@ -46,6 +47,8 @@ namespace DotNetCore.CAP.RabbitMQ
throw new ArgumentNullException(nameof(topics));
}
Connect();
foreach (var topic in topics)
{
_channel.QueueBind(_queueName, _exchangeName, topic);
......@@ -54,6 +57,8 @@ namespace DotNetCore.CAP.RabbitMQ
public void Listening(TimeSpan timeout, CancellationToken cancellationToken)
{
Connect();
var consumer = new EventingBasicConsumer(_channel);
consumer.Received += OnConsumerReceived;
consumer.Shutdown += OnConsumerShutdown;
......@@ -88,25 +93,39 @@ namespace DotNetCore.CAP.RabbitMQ
_connection.Dispose();
}
private void InitClient()
{
_connection = _connectionChannelPool.GetConnection();
#region events
_channel = _connection.CreateModel();
private void Connect()
{
if (_connection != null)
{
return;
}
_channel.ExchangeDeclare(
_exchangeName,
RabbitMQOptions.ExchangeType,
true);
_connectionLock.Wait();
var arguments = new Dictionary<string, object>
try
{
{"x-message-ttl", _rabbitMQOptions.QueueMessageExpires}
};
_channel.QueueDeclare(_queueName, durable: true, exclusive: false, autoDelete: false, arguments: arguments);
}
if (_connection == null)
{
_connection = _connectionChannelPool.GetConnection();
#region events
_channel = _connection.CreateModel();
_channel.ExchangeDeclare(_exchangeName, RabbitMQOptions.ExchangeType, true);
var arguments = new Dictionary<string, object>
{
{"x-message-ttl", _rabbitMQOptions.QueueMessageExpires}
};
_channel.QueueDeclare(_queueName, durable: true, exclusive: false, autoDelete: false, arguments: arguments);
}
}
finally
{
_connectionLock.Release();
}
}
private void OnConsumerConsumerCancelled(object sender, ConsumerEventArgs e)
{
......
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using Microsoft.Extensions.Options;
namespace DotNetCore.CAP.RabbitMQ
{
internal sealed class RabbitMQConsumerClientFactory : IConsumerClientFactory
{
private readonly IConnectionChannelPool _connectionChannelPool;
private readonly RabbitMQOptions _rabbitMQOptions;
private readonly IOptions<RabbitMQOptions> _rabbitMQOptions;
public RabbitMQConsumerClientFactory(RabbitMQOptions rabbitMQOptions, IConnectionChannelPool channelPool)
public RabbitMQConsumerClientFactory(IOptions<RabbitMQOptions> rabbitMQOptions, IConnectionChannelPool channelPool)
{
_rabbitMQOptions = rabbitMQOptions;
_connectionChannelPool = channelPool;
......
......@@ -5,8 +5,8 @@ using System;
using DotNetCore.CAP.Processor;
using DotNetCore.CAP.SqlServer;
using DotNetCore.CAP.SqlServer.Diagnostics;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Options;
// ReSharper disable once CheckNamespace
namespace DotNetCore.CAP
......@@ -23,42 +23,18 @@ namespace DotNetCore.CAP
public void AddServices(IServiceCollection services)
{
services.AddSingleton<CapStorageMarkerService>();
services.AddSingleton<DiagnosticProcessorObserver>();
services.AddSingleton<IStorage, SqlServerStorage>();
services.AddSingleton<IStorageConnection, SqlServerStorageConnection>();
services.AddSingleton<ICapPublisher, SqlServerPublisher>();
services.AddSingleton<ICallbackPublisher>(x => (SqlServerPublisher)x.GetService<ICapPublisher>());
services.AddSingleton<ICollectProcessor, SqlServerCollectProcessor>();
services.AddScoped<ICapPublisher, SqlServerPublisher>();
services.AddScoped<ICallbackPublisher, SqlServerPublisher>();
services.AddTransient<ICollectProcessor, SqlServerCollectProcessor>();
services.AddTransient<CapTransactionBase, SqlServerCapTransaction>();
AddSqlServerOptions(services);
}
private void AddSqlServerOptions(IServiceCollection services)
{
var sqlServerOptions = new SqlServerOptions();
_configure(sqlServerOptions);
if (sqlServerOptions.DbContextType != null)
{
services.AddSingleton(x =>
{
using (var scope = x.CreateScope())
{
var provider = scope.ServiceProvider;
var dbContext = (DbContext) provider.GetService(sqlServerOptions.DbContextType);
sqlServerOptions.ConnectionString = dbContext.Database.GetDbConnection().ConnectionString;
return sqlServerOptions;
}
});
}
else
{
services.AddSingleton(sqlServerOptions);
}
services.Configure(_configure);
services.AddSingleton<IConfigureOptions<SqlServerOptions>, ConfigureSqlServerOptions>();
}
}
}
\ No newline at end of file
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Options;
// ReSharper disable once CheckNamespace
namespace DotNetCore.CAP
{
public class SqlServerOptions : EFOptions
......@@ -10,4 +15,30 @@ namespace DotNetCore.CAP
/// </summary>
public string ConnectionString { get; set; }
}
internal class ConfigureSqlServerOptions : IConfigureOptions<SqlServerOptions>
{
private readonly IServiceScopeFactory _serviceScopeFactory;
public ConfigureSqlServerOptions(IServiceScopeFactory serviceScopeFactory)
{
_serviceScopeFactory = serviceScopeFactory;
}
public void Configure(SqlServerOptions options)
{
if (options.DbContextType != null)
{
using (var scope = _serviceScopeFactory.CreateScope())
{
var provider = scope.ServiceProvider;
using (var dbContext = (DbContext)provider.GetRequiredService(options.DbContextType))
{
options.ConnectionString = dbContext.Database.GetDbConnection().ConnectionString;
}
}
}
}
}
}
\ No newline at end of file
......@@ -11,6 +11,7 @@ using DotNetCore.CAP.Abstractions;
using DotNetCore.CAP.Models;
using Microsoft.EntityFrameworkCore.Storage;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Options;
namespace DotNetCore.CAP.SqlServer
{
......@@ -20,7 +21,7 @@ namespace DotNetCore.CAP.SqlServer
public SqlServerPublisher(IServiceProvider provider) : base(provider)
{
_options = ServiceProvider.GetService<SqlServerOptions>();
_options = ServiceProvider.GetService<IOptions<SqlServerOptions>>().Value;
}
public async Task PublishCallbackAsync(CapPublishedMessage message)
......
......@@ -12,6 +12,7 @@ using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Infrastructure;
using Microsoft.EntityFrameworkCore.Storage;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Options;
// ReSharper disable once CheckNamespace
namespace DotNetCore.CAP
......@@ -23,9 +24,9 @@ namespace DotNetCore.CAP
public SqlServerCapTransaction(
IDispatcher dispatcher,
SqlServerOptions sqlServerOptions,
IServiceProvider serviceProvider) : base(dispatcher)
{
var sqlServerOptions = serviceProvider.GetService<IOptions<SqlServerOptions>>().Value;
if (sqlServerOptions.DbContextType != null)
{
_dbContext = serviceProvider.GetService(sqlServerOptions.DbContextType) as DbContext;
......@@ -56,14 +57,14 @@ namespace DotNetCore.CAP
}
}
var transactionKey = ((SqlConnection) dbTransaction.Connection).ClientConnectionId;
var transactionKey = ((SqlConnection)dbTransaction.Connection).ClientConnectionId;
if (_diagnosticProcessor.BufferList.TryGetValue(transactionKey, out var list))
{
list.Add(msg);
}
else
{
var msgList = new List<CapPublishedMessage>(1) {msg};
var msgList = new List<CapPublishedMessage>(1) { msg };
_diagnosticProcessor.BufferList.TryAdd(transactionKey, msgList);
}
}
......@@ -109,6 +110,7 @@ namespace DotNetCore.CAP
dbContextTransaction.Dispose();
break;
}
DbTransaction = null;
}
}
......@@ -149,7 +151,7 @@ namespace DotNetCore.CAP
var dbTransaction = dbConnection.BeginTransaction();
var capTransaction = publisher.Transaction.Begin(dbTransaction, autoCommit);
return (IDbTransaction) capTransaction.DbTransaction;
return (IDbTransaction)capTransaction.DbTransaction;
}
/// <summary>
......
......@@ -7,6 +7,7 @@ using System.Threading.Tasks;
using Dapper;
using DotNetCore.CAP.Processor;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
namespace DotNetCore.CAP.SqlServer
{
......@@ -25,10 +26,10 @@ namespace DotNetCore.CAP.SqlServer
private readonly TimeSpan _waitingInterval = TimeSpan.FromMinutes(5);
public SqlServerCollectProcessor(ILogger<SqlServerCollectProcessor> logger,
SqlServerOptions sqlServerOptions)
IOptions<SqlServerOptions> sqlServerOptions)
{
_logger = logger;
_options = sqlServerOptions;
_options = sqlServerOptions.Value;
}
public async Task ProcessAsync(ProcessingContext context)
......
......@@ -11,6 +11,7 @@ using Dapper;
using DotNetCore.CAP.Dashboard;
using DotNetCore.CAP.SqlServer.Diagnostics;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
namespace DotNetCore.CAP.SqlServer
{
......@@ -23,14 +24,14 @@ namespace DotNetCore.CAP.SqlServer
private readonly SqlServerOptions _options;
public SqlServerStorage(ILogger<SqlServerStorage> logger,
CapOptions capOptions,
SqlServerOptions options,
IOptions<CapOptions> capOptions,
IOptions<SqlServerOptions> options,
DiagnosticProcessorObserver diagnosticProcessorObserver)
{
_options = options;
_options = options.Value;
_diagnosticProcessorObserver = diagnosticProcessorObserver;
_logger = logger;
_capOptions = capOptions;
_capOptions = capOptions.Value;
}
public IStorageConnection GetConnection()
......
......@@ -15,9 +15,9 @@ namespace DotNetCore.CAP.Abstractions
{
public abstract class CapPublisherBase : ICapPublisher
{
private readonly CapTransactionBase _transaction;
private readonly IMessagePacker _msgPacker;
private readonly IContentSerializer _serializer;
private CapTransactionBase _transaction;
protected bool NotUseTransaction;
......@@ -28,14 +28,27 @@ namespace DotNetCore.CAP.Abstractions
protected CapPublisherBase(IServiceProvider service)
{
ServiceProvider = service;
_transaction = service.GetRequiredService<CapTransactionBase>();
_msgPacker = service.GetRequiredService<IMessagePacker>();
_serializer = service.GetRequiredService<IContentSerializer>();
}
protected IServiceProvider ServiceProvider { get; }
public ICapTransaction Transaction => _transaction;
public ICapTransaction Transaction
{
get
{
if (_transaction == null)
{
using (var scope = ServiceProvider.CreateScope())
{
_transaction = scope.ServiceProvider.GetRequiredService<CapTransactionBase>();
}
}
return _transaction;
}
}
public void Publish<T>(string name, T contentObj, string callbackName = null)
{
......@@ -99,7 +112,7 @@ namespace DotNetCore.CAP.Abstractions
{
if (NotUseTransaction || Transaction.AutoCommit)
{
_transaction.Dispose();
_transaction?.Dispose();
}
}
}
......
......@@ -2,7 +2,6 @@
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using System.Collections.Generic;
using DotNetCore.CAP;
using DotNetCore.CAP.Abstractions;
using DotNetCore.CAP.Internal;
......@@ -11,7 +10,6 @@ using DotNetCore.CAP.Processor.States;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.DependencyInjection.Extensions;
using Microsoft.Extensions.Hosting;
// ReSharper disable once CheckNamespace
namespace Microsoft.Extensions.DependencyInjection
......@@ -58,7 +56,7 @@ namespace Microsoft.Extensions.DependencyInjection
services.TryAddSingleton<IStateChanger, StateChanger>();
//Queue's message processor
services.TryAddSingleton<NeedRetryMessageProcessor>();
services.TryAddSingleton<MessageNeedToRetryProcessor>();
services.TryAddSingleton<TransportCheckProcessor>();
//Sender and Executors
......@@ -73,11 +71,11 @@ namespace Microsoft.Extensions.DependencyInjection
{
serviceExtension.AddServices(services);
}
services.AddSingleton(options);
services.Configure(setupAction);
//Startup and Middleware
services.AddTransient<IHostedService, DefaultBootstrapper>();
//Startup and Hosted
services.AddTransient<IStartupFilter, CapStartupFilter>();
services.AddHostedService<DefaultBootstrapper>();
return new CapBuilder(services);
}
......
......@@ -33,11 +33,9 @@
<ItemGroup>
<PackageReference Include="Consul" Version="0.7.2.6" />
<PackageReference Include="Microsoft.AspNetCore.Hosting.Abstractions" Version="2.2.0" />
<PackageReference Include="Microsoft.AspNetCore.Http.Abstractions" Version="2.2.0" />
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="2.2.0" />
<PackageReference Include="Microsoft.Extensions.Options" Version="2.2.0" />
<PackageReference Include="Newtonsoft.Json" Version="12.0.2" />
<PackageReference Include="System.Diagnostics.DiagnosticSource" Version="4.5.1" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="2.2.0" />
</ItemGroup>
<ItemGroup>
......
......@@ -11,6 +11,7 @@ using DotNetCore.CAP.Models;
using DotNetCore.CAP.Processor;
using DotNetCore.CAP.Processor.States;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
namespace DotNetCore.CAP
{
......@@ -21,7 +22,7 @@ namespace DotNetCore.CAP
private readonly CapOptions _options;
private readonly IStateChanger _stateChanger;
protected string ServersAddress { get; set; }
protected abstract string ServersAddress { get; }
// diagnostics listener
// ReSharper disable once InconsistentNaming
......@@ -30,11 +31,11 @@ namespace DotNetCore.CAP
protected BasePublishMessageSender(
ILogger logger,
CapOptions options,
IOptions<CapOptions> options,
IStorageConnection connection,
IStateChanger stateChanger)
{
_options = options;
_options = options.Value;
_connection = connection;
_stateChanger = stateChanger;
_logger = logger;
......
......@@ -11,6 +11,7 @@ using DotNetCore.CAP.Models;
using DotNetCore.CAP.Processor;
using DotNetCore.CAP.Processor.States;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
namespace DotNetCore.CAP
{
......@@ -30,7 +31,7 @@ namespace DotNetCore.CAP
public DefaultSubscriberExecutor(
ILogger<DefaultSubscriberExecutor> logger,
CapOptions options,
IOptions<CapOptions> options,
IConsumerInvokerFactory consumerInvokerFactory,
ICallbackMessageSender callbackMessageSender,
IStateChanger stateChanger,
......@@ -39,7 +40,7 @@ namespace DotNetCore.CAP
{
_selector = selector;
_callbackMessageSender = callbackMessageSender;
_options = options;
_options = options.Value;
_stateChanger = stateChanger;
_connection = connection;
_logger = logger;
......
......@@ -94,7 +94,7 @@ namespace DotNetCore.CAP.Processor
var returnedProcessors = new List<IProcessor>
{
_provider.GetRequiredService<TransportCheckProcessor>(),
_provider.GetRequiredService<NeedRetryMessageProcessor>(),
_provider.GetRequiredService<MessageNeedToRetryProcessor>(),
_provider.GetRequiredService<ICollectProcessor>()
};
......
......@@ -7,27 +7,28 @@ using System.Linq;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
namespace DotNetCore.CAP.Processor
{
public class NeedRetryMessageProcessor : IProcessor
public class MessageNeedToRetryProcessor : IProcessor
{
private readonly TimeSpan _delay = TimeSpan.FromSeconds(1);
private readonly ILogger<NeedRetryMessageProcessor> _logger;
private readonly ILogger<MessageNeedToRetryProcessor> _logger;
private readonly IPublishMessageSender _publishMessageSender;
private readonly ISubscriberExecutor _subscriberExecutor;
private readonly TimeSpan _waitingInterval;
public NeedRetryMessageProcessor(
CapOptions options,
ILogger<NeedRetryMessageProcessor> logger,
public MessageNeedToRetryProcessor(
IOptions<CapOptions> options,
ILogger<MessageNeedToRetryProcessor> logger,
ISubscriberExecutor subscriberExecutor,
IPublishMessageSender publishMessageSender)
{
_logger = logger;
_subscriberExecutor = subscriberExecutor;
_publishMessageSender = publishMessageSender;
_waitingInterval = TimeSpan.FromSeconds(options.FailedRetryInterval);
_waitingInterval = TimeSpan.FromSeconds(options.Value.FailedRetryInterval);
}
public async Task ProcessAsync(ProcessingContext context)
......
......@@ -3,7 +3,7 @@
using System;
namespace DotNetCore.CAP.Internal
namespace DotNetCore.CAP
{
public class SubscriberNotFoundException : Exception
{
......
using Castle.DynamicProxy;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection.Extensions;
using System;
using Xunit;
namespace DotNetCore.CAP.CastleDynamicProxyTest
{
public class ConsumerServiceSelectorTest
{
private IServiceProvider _provider;
public ConsumerServiceSelectorTest()
{
var services = new ServiceCollection();
services.AddSingleton(typeof(ICapSubscribe), f =>
{
var generator = new ProxyGenerator();
return generator.CreateClassProxy(typeof(TestSubscribeClass));
});
services.AddSingleton<ITestSubscribeClass, TestSubscribeClass>();
services.AddLogging();
services.TryAddSingleton<IConsumerServiceSelector, CastleCoreConsumerServiceSelector>();
services.AddCap(x => { });
_provider = services.BuildServiceProvider();
}
[Theory]
[InlineData("cap.castle.sub")]
public void CanFindCapSubscribeTopic(string topic)
{
var selector = _provider.GetRequiredService<IConsumerServiceSelector>();
var candidates = selector.SelectCandidates();
Assert.Equal(1, candidates.Count);
}
}
public interface ITestSubscribeClass
{
}
public class TestSubscribeClass : ITestSubscribeClass, ICapSubscribe
{
[CapSubscribe("cap.castle.sub")]
public void TestSubscribe(DateTime dateTime)
{
Console.WriteLine(dateTime);
}
}
}
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFramework>netcoreapp2.2</TargetFramework>
<IsPackable>false</IsPackable>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Castle.Core" Version="4.4.0" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="2.2.0" />
<PackageReference Include="Microsoft.Extensions.Logging" Version="2.2.0" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="16.0.1" />
<PackageReference Include="xunit" Version="2.4.1" />
<PackageReference Include="xunit.runner.visualstudio" Version="2.4.1">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageReference>
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\..\src\DotNetCore.CAP\DotNetCore.CAP.csproj" />
</ItemGroup>
</Project>
using Castle.Core;
using Castle.DynamicProxy;
using DotNetCore.CAP.Infrastructure;
using Microsoft.Extensions.DependencyInjection;
using System;
using System.Collections.Generic;
using System.Reflection;
namespace DotNetCore.CAP.CastleDynamicProxyTest
{
public class CastleCoreConsumerServiceSelector : DefaultConsumerServiceSelector
{
public CastleCoreConsumerServiceSelector(IServiceProvider serviceProvider, CapOptions capOptions)
: base(serviceProvider, capOptions)
{
}
protected override IEnumerable<ConsumerExecutorDescriptor> FindConsumersFromInterfaceTypes(IServiceProvider provider)
{
var executorDescriptorList = new List<ConsumerExecutorDescriptor>();
using (var scoped = provider.CreateScope())
{
var scopedProvider = scoped.ServiceProvider;
var consumerServices = scopedProvider.GetServices<ICapSubscribe>();
foreach (var service in consumerServices)
{
var serviceType = service.GetType();
// Castle dynamic proxy...
TypeInfo typeInfo = ProxyServices.IsDynamicProxy(serviceType) ? ProxyUtil.GetUnproxiedType(service).GetTypeInfo()
: serviceType.GetTypeInfo();
if (!typeof(ICapSubscribe).GetTypeInfo().IsAssignableFrom(typeInfo))
{
continue;
}
executorDescriptorList.AddRange(GetTopicAttributesDescription(typeInfo));
}
return executorDescriptorList;
}
}
}
}
using System;
using System.Threading;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Options;
using MongoDB.Driver;
namespace DotNetCore.CAP.MongoDB.Test
......@@ -11,9 +12,9 @@ namespace DotNetCore.CAP.MongoDB.Test
protected IServiceProvider Provider { get; private set; }
protected IMongoClient MongoClient => Provider.GetService<IMongoClient>();
protected IMongoDatabase Database => MongoClient.GetDatabase(MongoDBOptions.DatabaseName);
protected CapOptions CapOptions => Provider.GetService<CapOptions>();
protected MongoDBOptions MongoDBOptions => Provider.GetService<MongoDBOptions>();
protected IMongoDatabase Database => MongoClient.GetDatabase(MongoDBOptions.Value.DatabaseName);
protected CapOptions CapOptions => Provider.GetService<IOptions<CapOptions>>().Value;
protected IOptions<MongoDBOptions> MongoDBOptions => Provider.GetService<IOptions<MongoDBOptions>>();
protected DatabaseTestHost()
{
......@@ -37,9 +38,9 @@ namespace DotNetCore.CAP.MongoDB.Test
services.AddOptions();
services.AddLogging();
_connectionString = ConnectionUtil.ConnectionString;
services.AddSingleton(new MongoDBOptions() { DatabaseConnection = _connectionString });
services.AddSingleton(new CapOptions());
services.AddOptions<CapOptions>();
services.Configure<MongoDBOptions>(x => x.DatabaseConnection = _connectionString);
services.AddSingleton<IMongoClient>(x => new MongoClient(_connectionString));
services.AddSingleton<MongoDBStorage>();
......@@ -49,7 +50,7 @@ namespace DotNetCore.CAP.MongoDB.Test
public void Dispose()
{
MongoClient.DropDatabase(MongoDBOptions.DatabaseName);
MongoClient.DropDatabase(MongoDBOptions.Value.DatabaseName);
}
}
}
\ No newline at end of file
......@@ -17,7 +17,7 @@ namespace DotNetCore.CAP.MongoDB.Test
{
_api = new MongoDBMonitoringApi(MongoClient, MongoDBOptions);
var collection = Database.GetCollection<PublishedMessage>(MongoDBOptions.PublishedCollection);
var collection = Database.GetCollection<PublishedMessage>(MongoDBOptions.Value.PublishedCollection);
collection.InsertMany(new[]
{
new PublishedMessage
......
......@@ -38,7 +38,7 @@ namespace DotNetCore.CAP.MongoDB.Test
public void ChangeReceivedState_Test()
{
StoreReceivedMessageAsync_TestAsync();
var collection = Database.GetCollection<ReceivedMessage>(MongoDBOptions.ReceivedCollection);
var collection = Database.GetCollection<ReceivedMessage>(MongoDBOptions.Value.ReceivedCollection);
var msg = collection.Find(x => true).FirstOrDefault();
_connection.ChangeReceivedState(msg.Id, StatusName.Scheduled).Should().BeTrue();
......@@ -64,7 +64,7 @@ namespace DotNetCore.CAP.MongoDB.Test
};
_connection.StoreReceivedMessage(msg);
var collection = Database.GetCollection<ReceivedMessage>(MongoDBOptions.ReceivedCollection);
var collection = Database.GetCollection<ReceivedMessage>(MongoDBOptions.Value.ReceivedCollection);
var updateDef = Builders<ReceivedMessage>
.Update.Set(x => x.Added, DateTime.Now.AddMinutes(-5));
......
......@@ -11,11 +11,11 @@ namespace DotNetCore.CAP.MongoDB.Test
public void InitializeAsync_Test()
{
var names = MongoClient.ListDatabaseNames()?.ToList();
names.Should().Contain(MongoDBOptions.DatabaseName);
names.Should().Contain(MongoDBOptions.Value.DatabaseName);
var collections = Database.ListCollectionNames()?.ToList();
collections.Should().Contain(MongoDBOptions.PublishedCollection);
collections.Should().Contain(MongoDBOptions.ReceivedCollection);
collections.Should().Contain(MongoDBOptions.Value.PublishedCollection);
collections.Should().Contain(MongoDBOptions.Value.ReceivedCollection);
}
}
}
\ No newline at end of file
......@@ -3,6 +3,7 @@ using System.Threading.Tasks;
using Dapper;
using DotNetCore.CAP.Infrastructure;
using DotNetCore.CAP.Models;
using Microsoft.Extensions.Options;
using Xunit;
namespace DotNetCore.CAP.MySql.Test
......@@ -14,8 +15,8 @@ namespace DotNetCore.CAP.MySql.Test
public MySqlStorageConnectionTest()
{
var options = GetService<MySqlOptions>();
var capOptions = GetService<CapOptions>();
var options = GetService<IOptions<MySqlOptions>>();
var capOptions = GetService<IOptions<CapOptions>>();
_storage = new MySqlStorageConnection(options, capOptions);
}
......
......@@ -28,8 +28,9 @@ namespace DotNetCore.CAP.MySql.Test
services.AddLogging();
_connectionString = ConnectionUtil.GetConnectionString();
services.AddSingleton(new MySqlOptions { ConnectionString = _connectionString });
services.AddSingleton(new CapOptions());
services.AddOptions<CapOptions>();
services.Configure<MySqlOptions>(x => x.ConnectionString = _connectionString);
services.AddSingleton<MySqlStorage>();
_services = services;
......
......@@ -3,6 +3,7 @@ using System.Threading.Tasks;
using Dapper;
using DotNetCore.CAP.Infrastructure;
using DotNetCore.CAP.Models;
using Microsoft.Extensions.Options;
using Xunit;
namespace DotNetCore.CAP.PostgreSql.Test
......@@ -14,9 +15,9 @@ namespace DotNetCore.CAP.PostgreSql.Test
public PostgreSqlStorageConnectionTest()
{
var options = GetService<PostgreSqlOptions>();
var capOptions = GetService<CapOptions>();
_storage = new PostgreSqlStorageConnection(options,capOptions);
var options = GetService<IOptions<PostgreSqlOptions>>();
var capOptions = GetService<IOptions<CapOptions>>();
_storage = new PostgreSqlStorageConnection(options, capOptions);
}
[Fact]
......@@ -74,13 +75,13 @@ namespace DotNetCore.CAP.PostgreSql.Test
var insertedId = SnowflakeId.Default().NextId();
var receivedMessage = new CapReceivedMessage
{
Id= insertedId,
Id = insertedId,
Name = "PostgreSqlStorageConnectionTest",
Content = "",
Group = "mygroup",
StatusName = StatusName.Scheduled
};
using (var connection = ConnectionUtil.CreateConnection())
{
await connection.ExecuteAsync(sql, receivedMessage);
......
......@@ -28,8 +28,9 @@ namespace DotNetCore.CAP.PostgreSql.Test
services.AddLogging();
_connectionString = ConnectionUtil.GetConnectionString();
services.AddSingleton(new PostgreSqlOptions { ConnectionString = _connectionString });
services.AddSingleton(new CapOptions());
services.AddOptions<CapOptions>();
services.Configure<PostgreSqlOptions>(x => x.ConnectionString = _connectionString);
services.AddSingleton<PostgreSqlStorage>();
_services = services;
......
......@@ -4,6 +4,7 @@ using System.Data.SqlClient;
using Dapper;
using DotNetCore.CAP.SqlServer.Diagnostics;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Moq;
namespace DotNetCore.CAP.SqlServer.Test
......@@ -11,8 +12,8 @@ namespace DotNetCore.CAP.SqlServer.Test
public abstract class DatabaseTestHost : IDisposable
{
protected ILogger<SqlServerStorage> Logger;
protected CapOptions CapOptions;
protected SqlServerOptions SqlSeverOptions;
protected IOptions<CapOptions> CapOptions;
protected IOptions<SqlServerOptions> SqlSeverOptions;
protected DiagnosticProcessorObserver DiagnosticProcessorObserver;
public bool SqlObjectInstalled;
......@@ -20,11 +21,14 @@ namespace DotNetCore.CAP.SqlServer.Test
protected DatabaseTestHost()
{
Logger = new Mock<ILogger<SqlServerStorage>>().Object;
CapOptions = new Mock<CapOptions>().Object;
SqlSeverOptions = new SqlServerOptions()
{
ConnectionString = ConnectionUtil.GetConnectionString()
};
var capOptions = new Mock<IOptions<CapOptions>>();
capOptions.Setup(x => x.Value).Returns(new CapOptions());
CapOptions = capOptions.Object;
var options = new Mock<IOptions<SqlServerOptions>>();
options.Setup(x => x.Value).Returns(new SqlServerOptions { ConnectionString = ConnectionUtil.GetConnectionString() });
SqlSeverOptions = options.Object;
DiagnosticProcessorObserver = new DiagnosticProcessorObserver(new Mock<IDispatcher>().Object);
......
......@@ -14,7 +14,7 @@ namespace DotNetCore.CAP.SqlServer.Test
public SqlServerStorageConnectionTest()
{
_storage = new SqlServerStorageConnection(SqlSeverOptions, CapOptions);
_storage = new SqlServerStorageConnection(SqlSeverOptions.Value, CapOptions.Value);
}
[Fact]
......
......@@ -4,6 +4,7 @@ using System.Threading.Tasks;
using DotNetCore.CAP.Abstractions;
using DotNetCore.CAP.Models;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Options;
using Xunit;
namespace DotNetCore.CAP.Test
......@@ -81,7 +82,7 @@ namespace DotNetCore.CAP.Test
var services = new ServiceCollection();
services.AddCap(x => { });
var builder = services.BuildServiceProvider();
var capOptions = builder.GetService<CapOptions>();
var capOptions = builder.GetService<IOptions<CapOptions>>().Value;
Assert.NotNull(capOptions);
}
......
using System;
using System.Threading.Tasks;
using DotNetCore.CAP.Abstractions;
using DotNetCore.CAP.Internal;
using Microsoft.Extensions.DependencyInjection;
using Xunit;
......
......@@ -4,6 +4,7 @@ using System.Linq;
using System.Reflection;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Options;
using Xunit;
namespace DotNetCore.CAP.Test
......@@ -48,10 +49,10 @@ namespace DotNetCore.CAP.Test
{
private readonly CapOptions _capOptions;
public MyConsumerServiceSelector(IServiceProvider serviceProvider, CapOptions capOptions)
: base(serviceProvider, capOptions)
public MyConsumerServiceSelector(IServiceProvider serviceProvider)
: base(serviceProvider)
{
_capOptions = capOptions;
_capOptions = serviceProvider.GetService<IOptions<CapOptions>>().Value;
}
protected override IEnumerable<ConsumerExecutorDescriptor> FindConsumersFromInterfaceTypes(IServiceProvider provider)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment