Commit 0cc7f5e5 authored by Savorboard's avatar Savorboard

Refactoring to specification

parent 584852b6
...@@ -7,7 +7,7 @@ namespace DotNetCore.CAP.MongoDB ...@@ -7,7 +7,7 @@ namespace DotNetCore.CAP.MongoDB
{ {
public class MongoDBCapOptionsExtension : ICapOptionsExtension public class MongoDBCapOptionsExtension : ICapOptionsExtension
{ {
private Action<MongoDBOptions> _configure; private readonly Action<MongoDBOptions> _configure;
public MongoDBCapOptionsExtension(Action<MongoDBOptions> configure) public MongoDBCapOptionsExtension(Action<MongoDBOptions> configure)
{ {
......
...@@ -2,6 +2,7 @@ using System; ...@@ -2,6 +2,7 @@ using System;
using DotNetCore.CAP; using DotNetCore.CAP;
using DotNetCore.CAP.MongoDB; using DotNetCore.CAP.MongoDB;
// ReSharper disable once CheckNamespace
namespace Microsoft.Extensions.DependencyInjection namespace Microsoft.Extensions.DependencyInjection
{ {
public static class CapOptionsExtensions public static class CapOptionsExtensions
......
...@@ -12,16 +12,18 @@ namespace DotNetCore.CAP.MongoDB ...@@ -12,16 +12,18 @@ namespace DotNetCore.CAP.MongoDB
{ {
public class CapPublisher : CapPublisherBase, ICallbackPublisher public class CapPublisher : CapPublisherBase, ICallbackPublisher
{ {
private readonly IMongoClient _client;
private readonly MongoDBOptions _options; private readonly MongoDBOptions _options;
private readonly IMongoDatabase _database; private readonly IMongoDatabase _database;
private bool _isInTransaction = true; private bool _isInTransaction = true;
public CapPublisher(ILogger<CapPublisherBase> logger, IDispatcher dispatcher, public CapPublisher(
IMongoClient client, MongoDBOptions options, IServiceProvider provider) ILogger<CapPublisherBase> logger,
: base(logger, dispatcher) IDispatcher dispatcher,
IMongoClient client,
MongoDBOptions options,
IServiceProvider provider)
: base(logger, dispatcher)
{ {
_client = client;
_options = options; _options = options;
_database = client.GetDatabase(_options.Database); _database = client.GetDatabase(_options.Database);
ServiceProvider = provider; ServiceProvider = provider;
...@@ -122,7 +124,6 @@ namespace DotNetCore.CAP.MongoDB ...@@ -122,7 +124,6 @@ namespace DotNetCore.CAP.MongoDB
return message.Id; return message.Id;
} }
private async Task PublishWithSessionAsync<T>(string name, T contentObj, IClientSessionHandle session, string callbackName) private async Task PublishWithSessionAsync<T>(string name, T contentObj, IClientSessionHandle session, string callbackName)
{ {
Guid operationId = default(Guid); Guid operationId = default(Guid);
...@@ -159,7 +160,6 @@ namespace DotNetCore.CAP.MongoDB ...@@ -159,7 +160,6 @@ namespace DotNetCore.CAP.MongoDB
throw; throw;
} }
} }
private async Task<int> ExecuteAsync(IClientSessionHandle session, CapPublishedMessage message) private async Task<int> ExecuteAsync(IClientSessionHandle session, CapPublishedMessage message)
{ {
message.Id = await new MongoDBUtil().GetNextSequenceValueAsync(_database, _options.PublishedCollection, session); message.Id = await new MongoDBUtil().GetNextSequenceValueAsync(_database, _options.PublishedCollection, session);
......
<Project Sdk="Microsoft.NET.Sdk"> <Project Sdk="Microsoft.NET.Sdk">
<ItemGroup>
<ProjectReference Include="..\DotNetCore.CAP\DotNetCore.CAP.csproj" />
</ItemGroup>
<PropertyGroup> <PropertyGroup>
<TargetFramework>netstandard2.0</TargetFramework> <TargetFramework>netstandard2.0</TargetFramework>
<AssemblyName>DotNetCore.CAP.MongoDB</AssemblyName>
<PackageTags>$(PackageTags);MongoDB</PackageTags>
</PropertyGroup>
<PropertyGroup>
<DocumentationFile>bin\$(Configuration)\netstandard2.0\DotNetCore.CAP.MongoDB.xml</DocumentationFile>
<NoWarn>1701;1702;1705;CS1591</NoWarn>
</PropertyGroup> </PropertyGroup>
<ItemGroup>
<ProjectReference Include="..\DotNetCore.CAP\DotNetCore.CAP.csproj" />
</ItemGroup>
<ItemGroup> <ItemGroup>
<PackageReference Include="MongoDB.Bson" Version="2.7.0" /> <PackageReference Include="MongoDB.Bson" Version="2.7.0" />
<PackageReference Include="MongoDB.Driver" Version="2.7.0" /> <PackageReference Include="MongoDB.Driver" Version="2.7.0" />
......
...@@ -9,16 +9,15 @@ namespace DotNetCore.CAP.MongoDB ...@@ -9,16 +9,15 @@ namespace DotNetCore.CAP.MongoDB
{ {
public class MongoDBCollectProcessor : ICollectProcessor public class MongoDBCollectProcessor : ICollectProcessor
{ {
private readonly IMongoClient _client;
private readonly MongoDBOptions _options; private readonly MongoDBOptions _options;
private readonly ILogger _logger; private readonly ILogger _logger;
private readonly IMongoDatabase _database; private readonly IMongoDatabase _database;
private readonly TimeSpan _waitingInterval = TimeSpan.FromMinutes(5); private readonly TimeSpan _waitingInterval = TimeSpan.FromMinutes(5);
public MongoDBCollectProcessor(IMongoClient client, MongoDBOptions options, public MongoDBCollectProcessor(ILogger<MongoDBCollectProcessor> logger,
ILogger<MongoDBCollectProcessor> logger) MongoDBOptions options,
IMongoClient client)
{ {
_client = client;
_options = options; _options = options;
_logger = logger; _logger = logger;
_database = client.GetDatabase(_options.Database); _database = client.GetDatabase(_options.Database);
......
...@@ -12,16 +12,15 @@ namespace DotNetCore.CAP.MongoDB ...@@ -12,16 +12,15 @@ namespace DotNetCore.CAP.MongoDB
{ {
public class MongoDBMonitoringApi : IMonitoringApi public class MongoDBMonitoringApi : IMonitoringApi
{ {
private IMongoClient _client; private readonly MongoDBOptions _options;
private MongoDBOptions _options; private readonly IMongoDatabase _database;
private IMongoDatabase _database;
public MongoDBMonitoringApi(IMongoClient client, MongoDBOptions options) public MongoDBMonitoringApi(IMongoClient client, MongoDBOptions options)
{ {
_client = client ?? throw new ArgumentNullException(nameof(client)); var mongoClient = client ?? throw new ArgumentNullException(nameof(client));
_options = options ?? throw new ArgumentNullException(nameof(options)); _options = options ?? throw new ArgumentNullException(nameof(options));
_database = _client.GetDatabase(_options.Database); _database = mongoClient.GetDatabase(_options.Database);
} }
public StatisticsDto GetStatistics() public StatisticsDto GetStatistics()
...@@ -87,12 +86,12 @@ namespace DotNetCore.CAP.MongoDB ...@@ -87,12 +86,12 @@ namespace DotNetCore.CAP.MongoDB
filter = filter & builder.Regex(x => x.Content, ".*" + queryDto.Content + ".*"); filter = filter & builder.Regex(x => x.Content, ".*" + queryDto.Content + ".*");
} }
var result = var result = collection
collection.Find(filter) .Find(filter)
.SortByDescending(x => x.Added) .SortByDescending(x => x.Added)
.Skip(queryDto.PageSize * queryDto.CurrentPage) .Skip(queryDto.PageSize * queryDto.CurrentPage)
.Limit(queryDto.PageSize) .Limit(queryDto.PageSize)
.ToList(); .ToList();
return result; return result;
} }
...@@ -130,30 +129,45 @@ namespace DotNetCore.CAP.MongoDB ...@@ -130,30 +129,45 @@ namespace DotNetCore.CAP.MongoDB
var endDate = DateTime.UtcNow; var endDate = DateTime.UtcNow;
var groupby = new BsonDocument { var groupby = new BsonDocument {
{ "$group", new BsonDocument{ {
{ "_id", new BsonDocument { "$group", new BsonDocument {
{ "Key", new BsonDocument { { "_id", new BsonDocument {
{ "$dateToString", new BsonDocument { { "Key", new BsonDocument{
{ "format", "%Y-%m-%d %H:00:00"}, { "$dateToString", new BsonDocument {
{ "date", "$Added"} { "format", "%Y-%m-%d %H:00:00"},
}} { "date", "$Added"}
}} }
}
}
}
}
},
{ "Count", new BsonDocument{{ "$sum", 1}}}
}
}
};
var match = new BsonDocument {
{ "$match", new BsonDocument {
{ "Added", new BsonDocument
{
{ "$gt", endDate.AddHours(-24) }
}
},
{ "StatusName",
new BsonDocument
{
{ "$eq", statusName}
}
} }
}, }
{ "Count", new BsonDocument{ }
{ "$sum", 1}
}}
}}
}; };
var match = new BsonDocument { { "$match", new BsonDocument { var pipeline = new[] { match, groupby };
{ "Added", new BsonDocument { { "$gt", endDate.AddHours(-24) } } },
{ "StatusName", new BsonDocument { { "$eq", statusName} }
} } } };
var pipeline = new BsonDocument[] { match, groupby };
var collection = _database.GetCollection<BsonDocument>(collectionName); var collection = _database.GetCollection<BsonDocument>(collectionName);
var result = collection.Aggregate<BsonDocument>(pipeline: pipeline).ToList(); var result = collection.Aggregate<BsonDocument>(pipeline).ToList();
var dic = new Dictionary<DateTime, int>(); var dic = new Dictionary<DateTime, int>();
for (var i = 0; i < 24; i++) for (var i = 0; i < 24; i++)
......
...@@ -16,9 +16,9 @@ namespace DotNetCore.CAP.MongoDB ...@@ -16,9 +16,9 @@ namespace DotNetCore.CAP.MongoDB
private readonly ILogger<MongoDBStorage> _logger; private readonly ILogger<MongoDBStorage> _logger;
public MongoDBStorage(CapOptions capOptions, public MongoDBStorage(CapOptions capOptions,
MongoDBOptions options, MongoDBOptions options,
IMongoClient client, IMongoClient client,
ILogger<MongoDBStorage> logger) ILogger<MongoDBStorage> logger)
{ {
_capOptions = capOptions; _capOptions = capOptions;
_options = options; _options = options;
...@@ -44,26 +44,30 @@ namespace DotNetCore.CAP.MongoDB ...@@ -44,26 +44,30 @@ namespace DotNetCore.CAP.MongoDB
} }
var database = _client.GetDatabase(_options.Database); var database = _client.GetDatabase(_options.Database);
var names = (await database.ListCollectionNamesAsync())?.ToList(); var names = (await database.ListCollectionNamesAsync(cancellationToken: cancellationToken))?.ToList();
if (!names.Any(n => n == _options.ReceivedCollection)) if (!names.Any(n => n == _options.ReceivedCollection))
{ {
await database.CreateCollectionAsync(_options.ReceivedCollection); await database.CreateCollectionAsync(_options.ReceivedCollection, cancellationToken: cancellationToken);
} }
if (!names.Any(n => n == _options.PublishedCollection))
if (names.All(n => n != _options.PublishedCollection))
{ {
await database.CreateCollectionAsync(_options.PublishedCollection); await database.CreateCollectionAsync(_options.PublishedCollection, cancellationToken: cancellationToken);
} }
if (!names.Any(n => n == "Counter"))
if (names.All(n => n != "Counter"))
{ {
await database.CreateCollectionAsync("Counter"); await database.CreateCollectionAsync("Counter", cancellationToken: cancellationToken);
var collection = database.GetCollection<BsonDocument>("Counter"); var collection = database.GetCollection<BsonDocument>("Counter");
await collection.InsertManyAsync(new BsonDocument[] await collection.InsertManyAsync(new BsonDocument[]
{ {
new BsonDocument{{"_id", _options.PublishedCollection}, {"sequence_value", 0}}, new BsonDocument{{"_id", _options.PublishedCollection}, {"sequence_value", 0}},
new BsonDocument{{"_id", _options.ReceivedCollection}, {"sequence_value", 0}} new BsonDocument{{"_id", _options.ReceivedCollection}, {"sequence_value", 0}}
}); }, cancellationToken: cancellationToken);
} }
_logger.LogDebug("Ensuring all create database tables script are applied.");
} }
} }
} }
\ No newline at end of file
...@@ -10,8 +10,8 @@ namespace DotNetCore.CAP.MongoDB ...@@ -10,8 +10,8 @@ namespace DotNetCore.CAP.MongoDB
{ {
public class MongoDBStorageConnection : IStorageConnection public class MongoDBStorageConnection : IStorageConnection
{ {
private CapOptions _capOptions; private readonly CapOptions _capOptions;
private MongoDBOptions _options; private readonly MongoDBOptions _options;
private readonly IMongoClient _client; private readonly IMongoClient _client;
private readonly IMongoDatabase _database; private readonly IMongoDatabase _database;
...@@ -58,10 +58,6 @@ namespace DotNetCore.CAP.MongoDB ...@@ -58,10 +58,6 @@ namespace DotNetCore.CAP.MongoDB
return new MongoDBStorageTransaction(_client, _options); return new MongoDBStorageTransaction(_client, _options);
} }
public void Dispose()
{
}
public async Task<CapPublishedMessage> GetPublishedMessageAsync(int id) public async Task<CapPublishedMessage> GetPublishedMessageAsync(int id)
{ {
var collection = _database.GetCollection<CapPublishedMessage>(_options.PublishedCollection); var collection = _database.GetCollection<CapPublishedMessage>(_options.PublishedCollection);
...@@ -72,13 +68,10 @@ namespace DotNetCore.CAP.MongoDB ...@@ -72,13 +68,10 @@ namespace DotNetCore.CAP.MongoDB
{ {
var fourMinsAgo = DateTime.Now.AddMinutes(-4); var fourMinsAgo = DateTime.Now.AddMinutes(-4);
var collection = _database.GetCollection<CapPublishedMessage>(_options.PublishedCollection); var collection = _database.GetCollection<CapPublishedMessage>(_options.PublishedCollection);
return await return await collection
collection.Find(x => .Find(x => x.Retries < _capOptions.FailedRetryCount && x.Added < fourMinsAgo && (x.StatusName == StatusName.Failed || x.StatusName == StatusName.Scheduled))
x.Retries < _capOptions.FailedRetryCount .Limit(200)
&& x.Added < fourMinsAgo .ToListAsync();
&& (x.StatusName == StatusName.Failed || x.StatusName == StatusName.Scheduled))
.Limit(200)
.ToListAsync();
} }
public async Task<CapReceivedMessage> GetReceivedMessageAsync(int id) public async Task<CapReceivedMessage> GetReceivedMessageAsync(int id)
...@@ -92,12 +85,10 @@ namespace DotNetCore.CAP.MongoDB ...@@ -92,12 +85,10 @@ namespace DotNetCore.CAP.MongoDB
var fourMinsAgo = DateTime.Now.AddMinutes(-4); var fourMinsAgo = DateTime.Now.AddMinutes(-4);
var collection = _database.GetCollection<CapReceivedMessage>(_options.ReceivedCollection); var collection = _database.GetCollection<CapReceivedMessage>(_options.ReceivedCollection);
return await return await collection
collection.Find(x => .Find(x => x.Retries < _capOptions.FailedRetryCount && x.Added < fourMinsAgo && (x.StatusName == StatusName.Failed || x.StatusName == StatusName.Scheduled))
x.Retries < _capOptions.FailedRetryCount .Limit(200)
&& x.Added < fourMinsAgo .ToListAsync();
&& (x.StatusName == StatusName.Failed || x.StatusName == StatusName.Scheduled)
).Limit(200).ToListAsync();
} }
public async Task<int> StoreReceivedMessageAsync(CapReceivedMessage message) public async Task<int> StoreReceivedMessageAsync(CapReceivedMessage message)
...@@ -114,5 +105,9 @@ namespace DotNetCore.CAP.MongoDB ...@@ -114,5 +105,9 @@ namespace DotNetCore.CAP.MongoDB
return message.Id; return message.Id;
} }
public void Dispose()
{
}
} }
} }
\ No newline at end of file
...@@ -7,17 +7,15 @@ namespace DotNetCore.CAP.MongoDB ...@@ -7,17 +7,15 @@ namespace DotNetCore.CAP.MongoDB
{ {
internal class MongoDBStorageTransaction : IStorageTransaction internal class MongoDBStorageTransaction : IStorageTransaction
{ {
private IMongoClient _client;
private readonly MongoDBOptions _options; private readonly MongoDBOptions _options;
private readonly IMongoDatabase _database; private readonly IMongoDatabase _database;
private readonly IClientSessionHandle _session; private readonly IClientSessionHandle _session;
public MongoDBStorageTransaction(IMongoClient client, MongoDBOptions options) public MongoDBStorageTransaction(IMongoClient client, MongoDBOptions options)
{ {
_client = client;
_options = options; _options = options;
_database = client.GetDatabase(options.Database); _database = client.GetDatabase(options.Database);
_session = _client.StartSession(); _session = client.StartSession();
_session.StartTransaction(); _session.StartTransaction();
} }
...@@ -41,10 +39,10 @@ namespace DotNetCore.CAP.MongoDB ...@@ -41,10 +39,10 @@ namespace DotNetCore.CAP.MongoDB
var collection = _database.GetCollection<CapPublishedMessage>(_options.PublishedCollection); var collection = _database.GetCollection<CapPublishedMessage>(_options.PublishedCollection);
var updateDef = Builders<CapPublishedMessage>.Update var updateDef = Builders<CapPublishedMessage>.Update
.Set(x => x.Retries, message.Retries) .Set(x => x.Retries, message.Retries)
.Set(x => x.Content, message.Content) .Set(x => x.Content, message.Content)
.Set(x => x.ExpiresAt, message.ExpiresAt) .Set(x => x.ExpiresAt, message.ExpiresAt)
.Set(x => x.StatusName, message.StatusName); .Set(x => x.StatusName, message.StatusName);
collection.FindOneAndUpdate(_session, x => x.Id == message.Id, updateDef); collection.FindOneAndUpdate(_session, x => x.Id == message.Id, updateDef);
} }
...@@ -59,10 +57,10 @@ namespace DotNetCore.CAP.MongoDB ...@@ -59,10 +57,10 @@ namespace DotNetCore.CAP.MongoDB
var collection = _database.GetCollection<CapReceivedMessage>(_options.ReceivedCollection); var collection = _database.GetCollection<CapReceivedMessage>(_options.ReceivedCollection);
var updateDef = Builders<CapReceivedMessage>.Update var updateDef = Builders<CapReceivedMessage>.Update
.Set(x => x.Retries, message.Retries) .Set(x => x.Retries, message.Retries)
.Set(x => x.Content, message.Content) .Set(x => x.Content, message.Content)
.Set(x => x.ExpiresAt, message.ExpiresAt) .Set(x => x.ExpiresAt, message.ExpiresAt)
.Set(x => x.StatusName, message.StatusName); .Set(x => x.StatusName, message.StatusName);
collection.FindOneAndUpdate(_session, x => x.Id == message.Id, updateDef); collection.FindOneAndUpdate(_session, x => x.Id == message.Id, updateDef);
} }
......
...@@ -5,9 +5,9 @@ using MongoDB.Driver; ...@@ -5,9 +5,9 @@ using MongoDB.Driver;
namespace DotNetCore.CAP.MongoDB namespace DotNetCore.CAP.MongoDB
{ {
public class MongoDBUtil internal class MongoDBUtil
{ {
FindOneAndUpdateOptions<BsonDocument> _options = new FindOneAndUpdateOptions<BsonDocument>() readonly FindOneAndUpdateOptions<BsonDocument> _options = new FindOneAndUpdateOptions<BsonDocument>()
{ {
ReturnDocument = ReturnDocument.After ReturnDocument = ReturnDocument.After
}; };
...@@ -43,15 +43,9 @@ namespace DotNetCore.CAP.MongoDB ...@@ -43,15 +43,9 @@ namespace DotNetCore.CAP.MongoDB
var filter = new BsonDocument { { "_id", collectionName } }; var filter = new BsonDocument { { "_id", collectionName } };
var updateDef = Builders<BsonDocument>.Update.Inc("sequence_value", 1); var updateDef = Builders<BsonDocument>.Update.Inc("sequence_value", 1);
BsonDocument result; var result = session == null
if (session == null) ? collection.FindOneAndUpdate(filter, updateDef, _options)
{ : collection.FindOneAndUpdate(session, filter, updateDef, _options);
result = collection.FindOneAndUpdate(filter, updateDef, _options);
}
else
{
result = collection.FindOneAndUpdate(session, filter, updateDef, _options);
}
if (result.TryGetValue("sequence_value", out var value)) if (result.TryGetValue("sequence_value", out var value))
{ {
......
...@@ -12,9 +12,9 @@ namespace DotNetCore.CAP.MongoDB.Test ...@@ -12,9 +12,9 @@ namespace DotNetCore.CAP.MongoDB.Test
{ {
public class MongoDBMonitoringApiTest public class MongoDBMonitoringApiTest
{ {
private MongoClient _client; private readonly MongoClient _client;
private MongoDBOptions _options; private readonly MongoDBOptions _options;
private MongoDBMonitoringApi _api; private readonly MongoDBMonitoringApi _api;
public MongoDBMonitoringApiTest() public MongoDBMonitoringApiTest()
{ {
......
...@@ -12,10 +12,10 @@ namespace DotNetCore.CAP.MongoDB.Test ...@@ -12,10 +12,10 @@ namespace DotNetCore.CAP.MongoDB.Test
[TestCaseOrderer(PriorityOrderer.Name, PriorityOrderer.Assembly)] [TestCaseOrderer(PriorityOrderer.Name, PriorityOrderer.Assembly)]
public class MongoDBStorageConnectionTest public class MongoDBStorageConnectionTest
{ {
private MongoClient _client; private readonly MongoClient _client;
private MongoDBOptions _options; private readonly MongoDBOptions _options;
private MongoDBStorage _storage; private readonly MongoDBStorage _storage;
private IStorageConnection _connection; private readonly IStorageConnection _connection;
public MongoDBStorageConnectionTest() public MongoDBStorageConnectionTest()
{ {
......
...@@ -9,7 +9,7 @@ namespace DotNetCore.CAP.MongoDB.Test ...@@ -9,7 +9,7 @@ namespace DotNetCore.CAP.MongoDB.Test
{ {
public class MongoDBStorageTest public class MongoDBStorageTest
{ {
private MongoClient _client; private readonly MongoClient _client;
public MongoDBStorageTest() public MongoDBStorageTest()
{ {
......
...@@ -8,7 +8,7 @@ namespace DotNetCore.CAP.MongoDB.Test ...@@ -8,7 +8,7 @@ namespace DotNetCore.CAP.MongoDB.Test
{ {
public class MongoDBTest public class MongoDBTest
{ {
private MongoClient _client; private readonly MongoClient _client;
public MongoDBTest() public MongoDBTest()
{ {
......
...@@ -11,17 +11,16 @@ namespace DotNetCore.CAP.MongoDB.Test ...@@ -11,17 +11,16 @@ namespace DotNetCore.CAP.MongoDB.Test
{ {
public class MongoDBUtilTest public class MongoDBUtilTest
{ {
private readonly MongoClient _client;
private readonly IMongoDatabase _database; private readonly IMongoDatabase _database;
string _recieved = "ReceivedTest"; string _recieved = "ReceivedTest";
public MongoDBUtilTest() public MongoDBUtilTest()
{ {
_client = new MongoClient(ConnectionUtil.ConnectionString); var client = new MongoClient(ConnectionUtil.ConnectionString);
_database = _client.GetDatabase("CAP_Test"); _database = client.GetDatabase("CAP_Test");
//Initialize MongoDB //Initialize MongoDB
if (!_database.ListCollectionNames().ToList().Any(x => x == "Counter")) if (_database.ListCollectionNames().ToList().All(x => x != "Counter"))
{ {
var collection = _database.GetCollection<BsonDocument>("Counter"); var collection = _database.GetCollection<BsonDocument>("Counter");
collection.InsertOne(new BsonDocument { { "_id", _recieved }, { "sequence_value", 0 } }); collection.InsertOne(new BsonDocument { { "_id", _recieved }, { "sequence_value", 0 } });
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment