Commit d2444936 authored by Savorboard's avatar Savorboard

Refactoring mongo implementation for version 3.0

parent 65a5ea83
...@@ -2,7 +2,7 @@ ...@@ -2,7 +2,7 @@
// Licensed under the MIT License. See License.txt in the project root for license information. // Licensed under the MIT License. See License.txt in the project root for license information.
using System; using System;
using DotNetCore.CAP.Processor; using DotNetCore.CAP.Persistence;
using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection.Extensions; using Microsoft.Extensions.DependencyInjection.Extensions;
using Microsoft.Extensions.Options; using Microsoft.Extensions.Options;
...@@ -23,12 +23,9 @@ namespace DotNetCore.CAP.MongoDB ...@@ -23,12 +23,9 @@ namespace DotNetCore.CAP.MongoDB
public void AddServices(IServiceCollection services) public void AddServices(IServiceCollection services)
{ {
services.AddSingleton<CapStorageMarkerService>(); services.AddSingleton<CapStorageMarkerService>();
services.AddSingleton<IStorage, MongoDBStorage>();
services.AddSingleton<IStorageConnection, MongoDBStorageConnection>();
services.AddSingleton<ICapPublisher, MongoDBPublisher>(); services.AddSingleton<IDataStorage, MongoDBDataStorage>();
services.AddSingleton<ICallbackPublisher>(x => (MongoDBPublisher)x.GetService<ICapPublisher>()); services.AddSingleton<IStorageInitializer, MongoDBStorageInitializer>();
services.AddSingleton<ICollectProcessor, MongoDBCollectProcessor>();
services.AddTransient<CapTransactionBase, MongoDBCapTransaction>(); services.AddTransient<CapTransactionBase, MongoDBCapTransaction>();
......
...@@ -22,10 +22,7 @@ namespace Microsoft.Extensions.DependencyInjection ...@@ -22,10 +22,7 @@ namespace Microsoft.Extensions.DependencyInjection
public static CapOptions UseMongoDB(this CapOptions options, Action<MongoDBOptions> configure) public static CapOptions UseMongoDB(this CapOptions options, Action<MongoDBOptions> configure)
{ {
if (configure == null) if (configure == null) throw new ArgumentNullException(nameof(configure));
{
throw new ArgumentNullException(nameof(configure));
}
configure += x => x.Version = options.Version; configure += x => x.Version = options.Version;
......
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using System.Threading;
using System.Threading.Tasks;
using DotNetCore.CAP.Abstractions;
using DotNetCore.CAP.Messages;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Options;
using MongoDB.Driver;
namespace DotNetCore.CAP.MongoDB
{
public class MongoDBPublisher : CapPublisherBase, ICallbackPublisher
{
private readonly IMongoClient _client;
private readonly MongoDBOptions _options;
public MongoDBPublisher(IServiceProvider provider) : base(provider)
{
_options = provider.GetService<IOptions<MongoDBOptions>>().Value;
_client = ServiceProvider.GetRequiredService<IMongoClient>();
}
public async Task PublishCallbackAsync(CapPublishedMessage message)
{
await PublishAsyncInternal(message);
}
protected override Task ExecuteAsync(CapPublishedMessage message,
ICapTransaction transaction = null,
CancellationToken cancel = default)
{
var insertOptions = new InsertOneOptions { BypassDocumentValidation = false };
var collection = _client
.GetDatabase(_options.DatabaseName)
.GetCollection<PublishedMessage>(_options.PublishedCollection);
var store = new PublishedMessage()
{
Id = message.Id,
Name = message.Name,
Content = message.Content,
Added = message.Added,
StatusName = message.StatusName,
ExpiresAt = message.ExpiresAt,
Retries = message.Retries,
Version = _options.Version,
};
if (transaction == null)
{
return collection.InsertOneAsync(store, insertOptions, cancel);
}
var dbTrans = (IClientSessionHandle)transaction.DbTransaction;
return collection.InsertOneAsync(dbTrans, store, insertOptions, cancel);
}
}
}
\ No newline at end of file
...@@ -2,6 +2,8 @@ ...@@ -2,6 +2,8 @@
// Licensed under the MIT License. See License.txt in the project root for license information. // Licensed under the MIT License. See License.txt in the project root for license information.
using System.Diagnostics; using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.DependencyInjection;
using MongoDB.Driver; using MongoDB.Driver;
...@@ -19,10 +21,16 @@ namespace DotNetCore.CAP ...@@ -19,10 +21,16 @@ namespace DotNetCore.CAP
{ {
Debug.Assert(DbTransaction != null); Debug.Assert(DbTransaction != null);
if (DbTransaction is IClientSessionHandle session) if (DbTransaction is IClientSessionHandle session) session.CommitTransaction();
{
session.CommitTransaction(); Flush();
} }
public override async Task CommitAsync(CancellationToken cancellationToken = default)
{
Debug.Assert(DbTransaction != null);
if (DbTransaction is IClientSessionHandle session) await session.CommitTransactionAsync(cancellationToken);
Flush(); Flush();
} }
...@@ -31,10 +39,14 @@ namespace DotNetCore.CAP ...@@ -31,10 +39,14 @@ namespace DotNetCore.CAP
{ {
Debug.Assert(DbTransaction != null); Debug.Assert(DbTransaction != null);
if (DbTransaction is IClientSessionHandle session) if (DbTransaction is IClientSessionHandle session) session.AbortTransaction();
{ }
session.AbortTransaction();
} public override async Task RollbackAsync(CancellationToken cancellationToken = default)
{
Debug.Assert(DbTransaction != null);
if (DbTransaction is IClientSessionHandle session) await session.AbortTransactionAsync(cancellationToken);
} }
public override void Dispose() public override void Dispose()
...@@ -49,10 +61,7 @@ namespace DotNetCore.CAP ...@@ -49,10 +61,7 @@ namespace DotNetCore.CAP
public static ICapTransaction Begin(this ICapTransaction transaction, public static ICapTransaction Begin(this ICapTransaction transaction,
IClientSessionHandle dbTransaction, bool autoCommit = false) IClientSessionHandle dbTransaction, bool autoCommit = false)
{ {
if (!dbTransaction.IsInTransaction) if (!dbTransaction.IsInTransaction) dbTransaction.StartTransaction();
{
dbTransaction.StartTransaction();
}
transaction.DbTransaction = dbTransaction; transaction.DbTransaction = dbTransaction;
transaction.AutoCommit = autoCommit; transaction.AutoCommit = autoCommit;
......
...@@ -26,12 +26,12 @@ namespace MongoDB.Driver ...@@ -26,12 +26,12 @@ namespace MongoDB.Driver
_transaction.Dispose(); _transaction.Dispose();
} }
public void AbortTransaction(CancellationToken cancellationToken = default(CancellationToken)) public void AbortTransaction(CancellationToken cancellationToken = default)
{ {
_transaction.Rollback(); _transaction.Rollback();
} }
public Task AbortTransactionAsync(CancellationToken cancellationToken = default(CancellationToken)) public Task AbortTransactionAsync(CancellationToken cancellationToken = default)
{ {
_transaction.Rollback(); _transaction.Rollback();
return Task.CompletedTask; return Task.CompletedTask;
...@@ -47,12 +47,12 @@ namespace MongoDB.Driver ...@@ -47,12 +47,12 @@ namespace MongoDB.Driver
_sessionHandle.AdvanceOperationTime(newOperationTime); _sessionHandle.AdvanceOperationTime(newOperationTime);
} }
public void CommitTransaction(CancellationToken cancellationToken = default(CancellationToken)) public void CommitTransaction(CancellationToken cancellationToken = default)
{ {
_transaction.Commit(); _transaction.Commit();
} }
public Task CommitTransactionAsync(CancellationToken cancellationToken = default(CancellationToken)) public Task CommitTransactionAsync(CancellationToken cancellationToken = default)
{ {
_transaction.Commit(); _transaction.Commit();
return Task.CompletedTask; return Task.CompletedTask;
......
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using System.Threading.Tasks;
using DotNetCore.CAP.Processor;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using MongoDB.Driver;
namespace DotNetCore.CAP.MongoDB
{
public class MongoDBCollectProcessor : ICollectProcessor
{
private readonly IMongoDatabase _database;
private readonly ILogger _logger;
private readonly MongoDBOptions _options;
private readonly TimeSpan _waitingInterval = TimeSpan.FromMinutes(5);
public MongoDBCollectProcessor(
ILogger<MongoDBCollectProcessor> logger,
IOptions<MongoDBOptions> options,
IMongoClient client)
{
_options = options.Value;
_logger = logger;
_database = client.GetDatabase(_options.DatabaseName);
}
public async Task ProcessAsync(ProcessingContext context)
{
_logger.LogDebug($"Collecting expired data from collection [{_options.PublishedCollection}].");
var publishedCollection = _database.GetCollection<PublishedMessage>(_options.PublishedCollection);
var receivedCollection = _database.GetCollection<ReceivedMessage>(_options.ReceivedCollection);
await publishedCollection.BulkWriteAsync(new[]
{
new DeleteManyModel<PublishedMessage>(
Builders<PublishedMessage>.Filter.Lt(x => x.ExpiresAt, DateTime.Now))
});
await receivedCollection.BulkWriteAsync(new[]
{
new DeleteManyModel<ReceivedMessage>(
Builders<ReceivedMessage>.Filter.Lt(x => x.ExpiresAt, DateTime.Now))
});
await context.WaitAsync(_waitingInterval);
}
}
}
\ No newline at end of file
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using DotNetCore.CAP.Internal;
using DotNetCore.CAP.Messages;
using DotNetCore.CAP.Monitoring;
using DotNetCore.CAP.Persistence;
using DotNetCore.CAP.Serialization;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using MongoDB.Driver;
namespace DotNetCore.CAP.MongoDB
{
public class MongoDBDataStorage : IDataStorage
{
private readonly IOptions<CapOptions> _capOptions;
private readonly IMongoClient _client;
private readonly IMongoDatabase _database;
private readonly ILogger<MongoDBDataStorage> _logger;
private readonly IOptions<MongoDBOptions> _options;
public MongoDBDataStorage(
IOptions<CapOptions> capOptions,
IOptions<MongoDBOptions> options,
IMongoClient client,
ILogger<MongoDBDataStorage> logger)
{
_capOptions = capOptions;
_options = options;
_client = client;
_logger = logger;
_database = _client.GetDatabase(_options.Value.DatabaseName);
}
public async Task ChangePublishStateAsync(MediumMessage message, StatusName state)
{
var collection = _database.GetCollection<PublishedMessage>(_options.Value.PublishedCollection);
var updateDef = Builders<PublishedMessage>.Update
.Set(x => x.Retries, message.Retries)
.Set(x => x.ExpiresAt, message.ExpiresAt)
.Set(x => x.StatusName, state.ToString("G"));
await collection.UpdateOneAsync(x => x.Id == long.Parse(message.DbId), updateDef);
}
public async Task ChangeReceiveStateAsync(MediumMessage message, StatusName state)
{
var collection = _database.GetCollection<ReceivedMessage>(_options.Value.PublishedCollection);
var updateDef = Builders<ReceivedMessage>.Update
.Set(x => x.Retries, message.Retries)
.Set(x => x.ExpiresAt, message.ExpiresAt)
.Set(x => x.StatusName, state.ToString("G"));
await collection.UpdateOneAsync(x => x.Id == long.Parse(message.DbId), updateDef);
}
public async Task<MediumMessage> StoreMessageAsync(string name, Message content, object dbTransaction = null,
CancellationToken cancellationToken = default)
{
var insertOptions = new InsertOneOptions {BypassDocumentValidation = false};
var message = new MediumMessage
{
DbId = content.GetId(),
Origin = content,
Content = StringSerializer.Serialize(content),
Added = DateTime.Now,
ExpiresAt = null,
Retries = 0
};
var collection = _database.GetCollection<PublishedMessage>(_options.Value.PublishedCollection);
var store = new PublishedMessage
{
Id = long.Parse(message.DbId),
Name = name,
Content = message.Content,
Added = message.Added,
StatusName = nameof(StatusName.Scheduled),
ExpiresAt = message.ExpiresAt,
Retries = message.Retries,
Version = _options.Value.Version
};
if (dbTransaction == null)
{
await collection.InsertOneAsync(store, insertOptions, cancellationToken);
}
else
{
var dbTrans = dbTransaction as IClientSessionHandle;
await collection.InsertOneAsync(dbTrans, store, insertOptions, cancellationToken);
}
return message;
}
public async Task StoreReceivedExceptionMessageAsync(string name, string group, string content)
{
var collection = _database.GetCollection<ReceivedMessage>(_options.Value.ReceivedCollection);
var store = new ReceivedMessage
{
Id = SnowflakeId.Default().NextId(),
Group = group,
Name = name,
Content = content,
Added = DateTime.Now,
ExpiresAt = DateTime.Now.AddDays(15),
Retries = _capOptions.Value.FailedRetryCount,
Version = _capOptions.Value.Version,
StatusName = nameof(StatusName.Failed)
};
await collection.InsertOneAsync(store);
}
public async Task<MediumMessage> StoreReceivedMessageAsync(string name, string group, Message message)
{
var mdMessage = new MediumMessage
{
DbId = SnowflakeId.Default().NextId().ToString(),
Origin = message,
Added = DateTime.Now,
ExpiresAt = null,
Retries = 0
};
var content = StringSerializer.Serialize(mdMessage.Origin);
var collection = _database.GetCollection<ReceivedMessage>(_options.Value.ReceivedCollection);
var store = new ReceivedMessage
{
Id = long.Parse(mdMessage.DbId),
Group = group,
Name = name,
Content = content,
Added = mdMessage.Added,
ExpiresAt = mdMessage.ExpiresAt,
Retries = mdMessage.Retries,
Version = _capOptions.Value.Version,
StatusName = nameof(StatusName.Scheduled)
};
await collection.InsertOneAsync(store);
return mdMessage;
}
public async Task<int> DeleteExpiresAsync(string collection, DateTime timeout, int batchCount = 1000,
CancellationToken cancellationToken = default)
{
if (collection == _options.Value.PublishedCollection)
{
Builders<PublishedMessage>.Filter.Lt(x => x.ExpiresAt, timeout);
var publishedCollection = _database.GetCollection<PublishedMessage>(_options.Value.PublishedCollection);
var ret = await publishedCollection.DeleteManyAsync(x => x.ExpiresAt < timeout, cancellationToken);
return (int) ret.DeletedCount;
}
else
{
var receivedCollection = _database.GetCollection<ReceivedMessage>(_options.Value.ReceivedCollection);
var ret = await receivedCollection.DeleteManyAsync(x => x.ExpiresAt < timeout, cancellationToken);
;
return (int) ret.DeletedCount;
}
}
public async Task<IEnumerable<MediumMessage>> GetPublishedMessagesOfNeedRetry()
{
var fourMinAgo = DateTime.Now.AddMinutes(-4);
var collection = _database.GetCollection<PublishedMessage>(_options.Value.PublishedCollection);
var queryResult = await collection
.Find(x => x.Retries < _capOptions.Value.FailedRetryCount
&& x.Added < fourMinAgo
&& x.Version == _capOptions.Value.Version
&& (x.StatusName == nameof(StatusName.Failed) ||
x.StatusName == nameof(StatusName.Scheduled)))
.Limit(200)
.ToListAsync();
return queryResult.Select(x => new MediumMessage
{
DbId = x.Id.ToString(),
Origin = StringSerializer.DeSerialize(x.Content),
Retries = x.Retries,
Added = x.Added
}).ToList();
}
public async Task<IEnumerable<MediumMessage>> GetReceivedMessagesOfNeedRetry()
{
var fourMinAgo = DateTime.Now.AddMinutes(-4);
var collection = _database.GetCollection<ReceivedMessage>(_options.Value.PublishedCollection);
var queryResult = await collection
.Find(x => x.Retries < _capOptions.Value.FailedRetryCount
&& x.Added < fourMinAgo
&& x.Version == _capOptions.Value.Version
&& (x.StatusName == nameof(StatusName.Failed) ||
x.StatusName == nameof(StatusName.Scheduled)))
.Limit(200)
.ToListAsync();
return queryResult.Select(x => new MediumMessage
{
DbId = x.Id.ToString(),
Origin = StringSerializer.DeSerialize(x.Content),
Retries = x.Retries,
Added = x.Added
}).ToList();
}
public IMonitoringApi GetMonitoringApi()
{
return new MongoDBMonitoringApi(_client, _options);
}
}
}
\ No newline at end of file
...@@ -3,10 +3,11 @@ ...@@ -3,10 +3,11 @@
using System; using System;
using System.Collections.Generic; using System.Collections.Generic;
using DotNetCore.CAP.Dashboard; using System.Threading.Tasks;
using DotNetCore.CAP.Dashboard.Monitoring; using DotNetCore.CAP.Internal;
using DotNetCore.CAP.Infrastructure;
using DotNetCore.CAP.Messages; using DotNetCore.CAP.Messages;
using DotNetCore.CAP.Monitoring;
using DotNetCore.CAP.Persistence;
using Microsoft.Extensions.Options; using Microsoft.Extensions.Options;
using MongoDB.Bson; using MongoDB.Bson;
using MongoDB.Driver; using MongoDB.Driver;
...@@ -26,61 +27,64 @@ namespace DotNetCore.CAP.MongoDB ...@@ -26,61 +27,64 @@ namespace DotNetCore.CAP.MongoDB
_database = mongoClient.GetDatabase(_options.DatabaseName); _database = mongoClient.GetDatabase(_options.DatabaseName);
} }
public StatisticsDto GetStatistics() public async Task<MediumMessage> GetPublishedMessageAsync(long id)
{ {
var publishedCollection = _database.GetCollection<CapPublishedMessage>(_options.PublishedCollection); var collection = _database.GetCollection<PublishedMessage>(_options.PublishedCollection);
var receivedCollection = _database.GetCollection<CapReceivedMessage>(_options.ReceivedCollection); var message = await collection.Find(x => x.Id == id).FirstOrDefaultAsync();
return new MediumMessage
var statistics = new StatisticsDto();
{
if (int.TryParse(
publishedCollection.CountDocuments(x => x.StatusName == StatusName.Succeeded).ToString(),
out var count))
{
statistics.PublishedSucceeded = count;
}
}
{
if (int.TryParse(publishedCollection.CountDocuments(x => x.StatusName == StatusName.Failed).ToString(),
out var count))
{
statistics.PublishedFailed = count;
}
}
{ {
if (int.TryParse( Added = message.Added,
receivedCollection.CountDocuments(x => x.StatusName == StatusName.Succeeded).ToString(), Content = message.Content,
out var count)) DbId = message.Id.ToString(),
{ ExpiresAt = message.ExpiresAt,
statistics.ReceivedSucceeded = count; Retries = message.Retries
} };
} }
public async Task<MediumMessage> GetReceivedMessageAsync(long id)
{
var collection = _database.GetCollection<ReceivedMessage>(_options.ReceivedCollection);
var message = await collection.Find(x => x.Id == id).FirstOrDefaultAsync();
return new MediumMessage
{ {
if (int.TryParse(receivedCollection.CountDocuments(x => x.StatusName == StatusName.Failed).ToString(), Added = message.Added,
out var count)) Content = message.Content,
{ DbId = message.Id.ToString(),
statistics.ReceivedFailed = count; ExpiresAt = message.ExpiresAt,
} Retries = message.Retries
} };
}
public StatisticsDto GetStatistics()
{
var publishedCollection = _database.GetCollection<PublishedMessage>(_options.PublishedCollection);
var receivedCollection = _database.GetCollection<ReceivedMessage>(_options.ReceivedCollection);
var statistics = new StatisticsDto
{
PublishedSucceeded =
(int) publishedCollection.CountDocuments(x => x.StatusName == nameof(StatusName.Succeeded)),
PublishedFailed =
(int) publishedCollection.CountDocuments(x => x.StatusName == nameof(StatusName.Failed)),
ReceivedSucceeded =
(int) receivedCollection.CountDocuments(x => x.StatusName == nameof(StatusName.Succeeded)),
ReceivedFailed = (int) receivedCollection.CountDocuments(x => x.StatusName == nameof(StatusName.Failed))
};
return statistics; return statistics;
} }
public IDictionary<DateTime, int> HourlyFailedJobs(MessageType type) public IDictionary<DateTime, int> HourlyFailedJobs(MessageType type)
{ {
return GetHourlyTimelineStats(type, StatusName.Failed); return GetHourlyTimelineStats(type, nameof(StatusName.Failed));
} }
public IDictionary<DateTime, int> HourlySucceededJobs(MessageType type) public IDictionary<DateTime, int> HourlySucceededJobs(MessageType type)
{ {
return GetHourlyTimelineStats(type, StatusName.Succeeded); return GetHourlyTimelineStats(type, nameof(StatusName.Succeeded));
} }
public IList<MessageDto> Messages(MessageQueryDto queryDto) public IList<MessageDto> Messages(MessageQueryDto queryDto)
{ {
queryDto.StatusName = StatusName.Standardized(queryDto.StatusName);
var name = queryDto.MessageType == MessageType.Publish var name = queryDto.MessageType == MessageType.Publish
? _options.PublishedCollection ? _options.PublishedCollection
: _options.ReceivedCollection; : _options.ReceivedCollection;
...@@ -89,24 +93,14 @@ namespace DotNetCore.CAP.MongoDB ...@@ -89,24 +93,14 @@ namespace DotNetCore.CAP.MongoDB
var builder = Builders<MessageDto>.Filter; var builder = Builders<MessageDto>.Filter;
var filter = builder.Empty; var filter = builder.Empty;
if (!string.IsNullOrEmpty(queryDto.StatusName)) if (!string.IsNullOrEmpty(queryDto.StatusName))
{ filter &= builder.Eq(x => x.StatusName, queryDto.StatusName);
filter = filter & builder.Eq(x => x.StatusName, queryDto.StatusName);
}
if (!string.IsNullOrEmpty(queryDto.Name)) if (!string.IsNullOrEmpty(queryDto.Name)) filter &= builder.Eq(x => x.Name, queryDto.Name);
{
filter = filter & builder.Eq(x => x.Name, queryDto.Name);
}
if (!string.IsNullOrEmpty(queryDto.Group)) if (!string.IsNullOrEmpty(queryDto.Group)) filter &= builder.Eq(x => x.Group, queryDto.Group);
{
filter = filter & builder.Eq(x => x.Group, queryDto.Group);
}
if (!string.IsNullOrEmpty(queryDto.Content)) if (!string.IsNullOrEmpty(queryDto.Content))
{ filter &= builder.Regex(x => x.Content, ".*" + queryDto.Content + ".*");
filter = filter & builder.Regex(x => x.Content, ".*" + queryDto.Content + ".*");
}
var result = collection var result = collection
.Find(filter) .Find(filter)
...@@ -120,28 +114,28 @@ namespace DotNetCore.CAP.MongoDB ...@@ -120,28 +114,28 @@ namespace DotNetCore.CAP.MongoDB
public int PublishedFailedCount() public int PublishedFailedCount()
{ {
return GetNumberOfMessage(_options.PublishedCollection, StatusName.Failed); return GetNumberOfMessage(_options.PublishedCollection, nameof(StatusName.Failed));
} }
public int PublishedSucceededCount() public int PublishedSucceededCount()
{ {
return GetNumberOfMessage(_options.PublishedCollection, StatusName.Succeeded); return GetNumberOfMessage(_options.PublishedCollection, nameof(StatusName.Succeeded));
} }
public int ReceivedFailedCount() public int ReceivedFailedCount()
{ {
return GetNumberOfMessage(_options.ReceivedCollection, StatusName.Failed); return GetNumberOfMessage(_options.ReceivedCollection, nameof(StatusName.Failed));
} }
public int ReceivedSucceededCount() public int ReceivedSucceededCount()
{ {
return GetNumberOfMessage(_options.ReceivedCollection, StatusName.Succeeded); return GetNumberOfMessage(_options.ReceivedCollection, nameof(StatusName.Succeeded));
} }
private int GetNumberOfMessage(string collectionName, string statusName) private int GetNumberOfMessage(string collectionName, string statusName)
{ {
var collection = _database.GetCollection<BsonDocument>(collectionName); var collection = _database.GetCollection<BsonDocument>(collectionName);
var count = collection.CountDocuments(new BsonDocument { { "StatusName", statusName } }); var count = collection.CountDocuments(new BsonDocument {{"StatusName", statusName}});
return int.Parse(count.ToString()); return int.Parse(count.ToString());
} }
...@@ -200,7 +194,7 @@ namespace DotNetCore.CAP.MongoDB ...@@ -200,7 +194,7 @@ namespace DotNetCore.CAP.MongoDB
} }
}; };
var pipeline = new[] { match, groupby }; var pipeline = new[] {match, groupby};
var collection = _database.GetCollection<BsonDocument>(collectionName); var collection = _database.GetCollection<BsonDocument>(collectionName);
var result = collection.Aggregate<BsonDocument>(pipeline).ToList(); var result = collection.Aggregate<BsonDocument>(pipeline).ToList();
...@@ -215,10 +209,7 @@ namespace DotNetCore.CAP.MongoDB ...@@ -215,10 +209,7 @@ namespace DotNetCore.CAP.MongoDB
result.ForEach(d => result.ForEach(d =>
{ {
var key = d["_id"].AsBsonDocument["Key"].AsString; var key = d["_id"].AsBsonDocument["Key"].AsString;
if (DateTime.TryParse(key, out var dateTime)) if (DateTime.TryParse(key, out var dateTime)) dic[dateTime.ToLocalTime()] = d["Count"].AsInt32;
{
dic[dateTime.ToLocalTime()] = d["Count"].AsInt32;
}
}); });
return dic; return dic;
......
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using DotNetCore.CAP.Infrastructure;
using DotNetCore.CAP.Messages;
using Microsoft.Extensions.Options;
using MongoDB.Driver;
namespace DotNetCore.CAP.MongoDB
{
public class MongoDBStorageConnection : IStorageConnection
{
private readonly CapOptions _capOptions;
private readonly IMongoClient _client;
private readonly IMongoDatabase _database;
private readonly MongoDBOptions _options;
public MongoDBStorageConnection(
IOptions<CapOptions> capOptions,
IOptions<MongoDBOptions> options,
IMongoClient client)
{
_capOptions = capOptions.Value;
_options = options.Value;
_client = client;
_database = _client.GetDatabase(_options.DatabaseName);
}
public bool ChangePublishedState(long messageId, string state)
{
var collection = _database.GetCollection<PublishedMessage>(_options.PublishedCollection);
var updateDef = Builders<PublishedMessage>
.Update.Inc(x => x.Retries, 1)
.Set(x => x.ExpiresAt, null)
.Set(x => x.StatusName, state);
var result =
collection.UpdateOne(x => x.Id == messageId, updateDef);
return result.ModifiedCount > 0;
}
public bool ChangeReceivedState(long messageId, string state)
{
var collection = _database.GetCollection<ReceivedMessage>(_options.ReceivedCollection);
var updateDef = Builders<ReceivedMessage>
.Update.Inc(x => x.Retries, 1)
.Set(x => x.ExpiresAt, null)
.Set(x => x.StatusName, state);
var result =
collection.UpdateOne(x => x.Id == messageId, updateDef);
return result.ModifiedCount > 0;
}
public IStorageTransaction CreateTransaction()
{
return new MongoDBStorageTransaction(_client, _options);
}
public async Task<CapPublishedMessage> GetPublishedMessageAsync(long id)
{
var collection = _database.GetCollection<PublishedMessage>(_options.PublishedCollection);
return await collection.Find(x => x.Id == id).FirstOrDefaultAsync();
}
public async Task<IEnumerable<CapPublishedMessage>> GetPublishedMessagesOfNeedRetry()
{
var fourMinsAgo = DateTime.Now.AddMinutes(-4);
var collection = _database.GetCollection<PublishedMessage>(_options.PublishedCollection);
return await collection
.Find(x => x.Retries < _capOptions.FailedRetryCount
&& x.Added < fourMinsAgo
&& x.Version == _capOptions.Version
&& (x.StatusName == StatusName.Failed || x.StatusName == StatusName.Scheduled))
.Limit(200)
.ToListAsync();
}
public async Task<CapReceivedMessage> GetReceivedMessageAsync(long id)
{
var collection = _database.GetCollection<ReceivedMessage>(_options.ReceivedCollection);
return await collection.Find(x => x.Id == id).FirstOrDefaultAsync();
}
public async Task<IEnumerable<CapReceivedMessage>> GetReceivedMessagesOfNeedRetry()
{
var fourMinsAgo = DateTime.Now.AddMinutes(-4);
var collection = _database.GetCollection<ReceivedMessage>(_options.ReceivedCollection);
return await collection
.Find(x => x.Retries < _capOptions.FailedRetryCount
&& x.Added < fourMinsAgo
&& x.Version == _capOptions.Version
&& (x.StatusName == StatusName.Failed || x.StatusName == StatusName.Scheduled))
.Limit(200)
.ToListAsync();
}
public void StoreReceivedMessage(CapReceivedMessage message)
{
if (message == null)
{
throw new ArgumentNullException(nameof(message));
}
var collection = _database.GetCollection<ReceivedMessage>(_options.ReceivedCollection);
var store = new ReceivedMessage()
{
Id = message.Id,
Group = message.Group,
Name = message.Name,
Content = message.Content,
Added = message.Added,
StatusName = message.StatusName,
ExpiresAt = message.ExpiresAt,
Retries = message.Retries,
Version = _capOptions.Version
};
collection.InsertOne(store);
}
}
}
\ No newline at end of file
// Copyright (c) .NET Core Community. All rights reserved. // Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information. // Licensed under the MIT License. See License.txt in the project root for license information.
using System.Linq; using System.Linq;
using System.Threading; using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
using DotNetCore.CAP.Dashboard; using DotNetCore.CAP.Persistence;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options; using Microsoft.Extensions.Options;
using MongoDB.Driver; using MongoDB.Driver;
namespace DotNetCore.CAP.MongoDB namespace DotNetCore.CAP.MongoDB
{ {
public class MongoDBStorage : IStorage public class MongoDBStorageInitializer : IStorageInitializer
{ {
private readonly IOptions<CapOptions> _capOptions; private readonly IMongoClient _client;
private readonly IMongoClient _client; private readonly ILogger _logger;
private readonly ILogger<MongoDBStorage> _logger; private readonly IOptions<MongoDBOptions> _options;
private readonly IOptions<MongoDBOptions> _options;
public MongoDBStorageInitializer(
public MongoDBStorage( ILogger<MongoDBStorageInitializer> logger,
IOptions<CapOptions> capOptions, IMongoClient client,
IOptions<MongoDBOptions> options, IOptions<MongoDBOptions> options)
IMongoClient client, {
ILogger<MongoDBStorage> logger) _options = options;
{ _logger = logger;
_capOptions = capOptions; _client = client;
_options = options; }
_client = client;
_logger = logger; public string GetPublishedTableName()
} {
return _options.Value.PublishedCollection;
public IStorageConnection GetConnection() }
{
return new MongoDBStorageConnection(_capOptions, _options, _client); public string GetReceivedTableName()
} {
return _options.Value.ReceivedCollection;
public IMonitoringApi GetMonitoringApi() }
{
return new MongoDBMonitoringApi(_client, _options); public async Task InitializeAsync(CancellationToken cancellationToken)
} {
if (cancellationToken.IsCancellationRequested) return;
public async Task InitializeAsync(CancellationToken cancellationToken)
{ var options = _options.Value;
if (cancellationToken.IsCancellationRequested) var database = _client.GetDatabase(options.DatabaseName);
{ var names = (await database.ListCollectionNamesAsync(cancellationToken: cancellationToken)).ToList();
return;
} if (names.All(n => n != options.ReceivedCollection))
await database.CreateCollectionAsync(options.ReceivedCollection, cancellationToken: cancellationToken);
var options = _options.Value;
var database = _client.GetDatabase(options.DatabaseName); if (names.All(n => n != options.PublishedCollection))
var names = (await database.ListCollectionNamesAsync(cancellationToken: cancellationToken)).ToList(); await database.CreateCollectionAsync(options.PublishedCollection, cancellationToken: cancellationToken);
if (names.All(n => n != options.ReceivedCollection)) await Task.WhenAll(
{ TryCreateIndexesAsync<ReceivedMessage>(options.ReceivedCollection),
await database.CreateCollectionAsync(options.ReceivedCollection, cancellationToken: cancellationToken); TryCreateIndexesAsync<PublishedMessage>(options.PublishedCollection));
}
_logger.LogDebug("Ensuring all create database tables script are applied.");
if (names.All(n => n != options.PublishedCollection))
{
await database.CreateCollectionAsync(options.PublishedCollection, async Task TryCreateIndexesAsync<T>(string collectionName)
cancellationToken: cancellationToken); {
} var indexNames = new[] {"Name", "Added", "ExpiresAt", "StatusName", "Retries", "Version"};
var col = database.GetCollection<T>(collectionName);
var receivedMessageIndexNames = new[] { using (var cursor = await col.Indexes.ListAsync(cancellationToken))
nameof(ReceivedMessage.Name), nameof(ReceivedMessage.Added), nameof(ReceivedMessage.ExpiresAt), {
nameof(ReceivedMessage.StatusName), nameof(ReceivedMessage.Retries), nameof(ReceivedMessage.Version) }; var existingIndexes = await cursor.ToListAsync(cancellationToken);
var existingIndexNames = existingIndexes.Select(o => o["name"].AsString).ToArray();
var publishedMessageIndexNames = new[] { indexNames = indexNames.Except(existingIndexNames).ToArray();
nameof(PublishedMessage.Name), nameof(PublishedMessage.Added), nameof(PublishedMessage.ExpiresAt), }
nameof(PublishedMessage.StatusName), nameof(PublishedMessage.Retries), nameof(PublishedMessage.Version) };
if (indexNames.Any() == false)
await Task.WhenAll( return;
TryCreateIndexesAsync<ReceivedMessage>(options.ReceivedCollection, receivedMessageIndexNames),
TryCreateIndexesAsync<PublishedMessage>(options.PublishedCollection, publishedMessageIndexNames) var indexes = indexNames.Select(indexName =>
); {
var indexOptions = new CreateIndexOptions
_logger.LogDebug("Ensuring all create database tables script are applied."); {
Name = indexName,
async Task TryCreateIndexesAsync<T>(string collectionName, string[] indexNames) Background = true
{ };
var col = database.GetCollection<T>(collectionName); var indexBuilder = Builders<T>.IndexKeys;
using (var cursor = await col.Indexes.ListAsync(cancellationToken)) return new CreateIndexModel<T>(indexBuilder.Descending(indexName), indexOptions);
{ }).ToArray();
var existingIndexes = await cursor.ToListAsync(cancellationToken);
var existingIndexNames = existingIndexes.Select(o => o["name"].AsString).ToArray(); await col.Indexes.CreateManyAsync(indexes, cancellationToken);
indexNames = indexNames.Except(existingIndexNames).ToArray(); }
} }
}
if (indexNames.Any() == false)
return;
var indexes = indexNames.Select(indexName =>
{
var indexOptions = new CreateIndexOptions
{
Name = indexName,
Background = true,
};
var indexBuilder = Builders<T>.IndexKeys;
return new CreateIndexModel<T>(indexBuilder.Descending(indexName), indexOptions);
}).ToArray();
await col.Indexes.CreateManyAsync(indexes, cancellationToken);
}
}
}
} }
\ No newline at end of file
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using System.Threading.Tasks;
using DotNetCore.CAP.Messages;
using MongoDB.Driver;
namespace DotNetCore.CAP.MongoDB
{
internal class MongoDBStorageTransaction : IStorageTransaction
{
private readonly IMongoDatabase _database;
private readonly MongoDBOptions _options;
private readonly IClientSessionHandle _session;
public MongoDBStorageTransaction(IMongoClient client, MongoDBOptions options)
{
_options = options;
_database = client.GetDatabase(_options.DatabaseName);
_session = client.StartSession();
_session.StartTransaction();
}
public async Task CommitAsync()
{
await _session.CommitTransactionAsync();
}
public void Dispose()
{
_session.Dispose();
}
public void UpdateMessage(CapPublishedMessage message)
{
if (message == null)
{
throw new ArgumentNullException(nameof(message));
}
var collection = _database.GetCollection<PublishedMessage>(_options.PublishedCollection);
var updateDef = Builders<PublishedMessage>.Update
.Set(x => x.Retries, message.Retries)
.Set(x => x.Content, message.Content)
.Set(x => x.ExpiresAt, message.ExpiresAt)
.Set(x => x.StatusName, message.StatusName);
collection.FindOneAndUpdate(_session, x => x.Id == message.Id, updateDef);
}
public void UpdateMessage(CapReceivedMessage message)
{
if (message == null)
{
throw new ArgumentNullException(nameof(message));
}
var collection = _database.GetCollection<ReceivedMessage>(_options.ReceivedCollection);
var updateDef = Builders<ReceivedMessage>.Update
.Set(x => x.Retries, message.Retries)
.Set(x => x.Content, message.Content)
.Set(x => x.ExpiresAt, message.ExpiresAt)
.Set(x => x.StatusName, message.StatusName);
collection.FindOneAndUpdate(_session, x => x.Id == message.Id, updateDef);
}
}
}
\ No newline at end of file
using DotNetCore.CAP.Messages; // Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
namespace DotNetCore.CAP.MongoDB namespace DotNetCore.CAP.MongoDB
{ {
internal class ReceivedMessage : CapReceivedMessage internal class ReceivedMessage
{ {
public long Id { get; set; }
public string Version { get; set; } public string Version { get; set; }
public string Group { get; set; }
public string Name { get; set; }
public string Content { get; set; }
public DateTime Added { get; set; }
public DateTime? ExpiresAt { get; set; }
public int Retries { get; set; }
public string StatusName { get; set; }
} }
internal class PublishedMessage : CapPublishedMessage internal class PublishedMessage
{ {
public long Id { get; set; }
public string Version { get; set; } public string Version { get; set; }
public string Name { get; set; }
public string Content { get; set; }
public DateTime Added { get; set; }
public DateTime? ExpiresAt { get; set; }
public int Retries { get; set; }
public string StatusName { get; set; }
} }
} }
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment