Commit 205e3b3e authored by Savorboard's avatar Savorboard

Refactor mongodb module for new transaction mode

parent c69d1f6f
...@@ -22,10 +22,13 @@ namespace Sample.RabbitMQ.MongoDB.Controllers ...@@ -22,10 +22,13 @@ namespace Sample.RabbitMQ.MongoDB.Controllers
[Route("~/publish")] [Route("~/publish")]
public IActionResult PublishWithTrans() public IActionResult PublishWithTrans()
{ {
//var mycollection = _client.GetDatabase("test").GetCollection<BsonDocument>("test.collection");
//mycollection.InsertOne(new BsonDocument { { "test", "test" } });
using (var session = _client.StartSession()) using (var session = _client.StartSession())
using (var trans = _capPublisher.CapTransaction.Begin(session)) using (var trans = _capPublisher.CapTransaction.Begin(session))
{ {
var collection = _client.GetDatabase("TEST").GetCollection<BsonDocument>("test"); var collection = _client.GetDatabase("test").GetCollection<BsonDocument>("test.collection");
collection.InsertOne(session, new BsonDocument { { "hello", "world" } }); collection.InsertOne(session, new BsonDocument { { "hello", "world" } });
_capPublisher.Publish("sample.rabbitmq.mongodb", DateTime.Now); _capPublisher.Publish("sample.rabbitmq.mongodb", DateTime.Now);
...@@ -39,10 +42,10 @@ namespace Sample.RabbitMQ.MongoDB.Controllers ...@@ -39,10 +42,10 @@ namespace Sample.RabbitMQ.MongoDB.Controllers
public IActionResult PublishNotAutoCommit() public IActionResult PublishNotAutoCommit()
{ {
using (var session = _client.StartSession()) using (var session = _client.StartSession())
using (_capPublisher.CapTransaction.Begin(session,true)) using (_capPublisher.CapTransaction.Begin(session, true))
{ {
var collection = _client.GetDatabase("TEST").GetCollection<BsonDocument>("test"); var collection = _client.GetDatabase("test").GetCollection<BsonDocument>("test.collection");
collection.InsertOne(session, new BsonDocument { { "hello", "world" } }); collection.InsertOne(session, new BsonDocument { { "hello2", "world2" } });
_capPublisher.Publish("sample.rabbitmq.mongodb", DateTime.Now); _capPublisher.Publish("sample.rabbitmq.mongodb", DateTime.Now);
} }
......
using DotNetCore.CAP; using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting; using Microsoft.AspNetCore.Hosting;
using Microsoft.AspNetCore.Mvc; using Microsoft.AspNetCore.Mvc;
using Microsoft.Extensions.Configuration; using Microsoft.Extensions.Configuration;
...@@ -19,21 +18,11 @@ namespace Sample.RabbitMQ.MongoDB ...@@ -19,21 +18,11 @@ namespace Sample.RabbitMQ.MongoDB
public void ConfigureServices(IServiceCollection services) public void ConfigureServices(IServiceCollection services)
{ {
services.AddSingleton<IMongoClient>(new MongoClient(Configuration.GetConnectionString("MongoDB"))); services.AddSingleton<IMongoClient>(new MongoClient("mongodb://192.168.10.110:27017,192.168.10.110:27018,192.168.10.110:27019/?replicaSet=rs0"));
services.AddCap(x => services.AddCap(x =>
{ {
x.UseMongoDB(); x.UseMongoDB("mongodb://192.168.10.110:27017,192.168.10.110:27018,192.168.10.110:27019/?replicaSet=rs0");
x.UseRabbitMQ("localhost");
var mq = new RabbitMQOptions();
Configuration.GetSection("RabbitMQ").Bind(mq);
x.UseRabbitMQ(cfg =>
{
cfg.HostName = mq.HostName;
cfg.Port = mq.Port;
cfg.UserName = mq.UserName;
cfg.Password = mq.Password;
});
x.UseDashboard(); x.UseDashboard();
}); });
services.AddMvc().SetCompatibilityVersion(CompatibilityVersion.Version_2_1); services.AddMvc().SetCompatibilityVersion(CompatibilityVersion.Version_2_1);
...@@ -47,8 +36,6 @@ namespace Sample.RabbitMQ.MongoDB ...@@ -47,8 +36,6 @@ namespace Sample.RabbitMQ.MongoDB
} }
app.UseMvc(); app.UseMvc();
app.UseCap();
} }
} }
} }
...@@ -17,20 +17,19 @@ namespace DotNetCore.CAP.MongoDB.Test ...@@ -17,20 +17,19 @@ namespace DotNetCore.CAP.MongoDB.Test
{ {
_api = new MongoDBMonitoringApi(MongoClient, MongoDBOptions); _api = new MongoDBMonitoringApi(MongoClient, MongoDBOptions);
var helper = new MongoDBUtil();
var collection = Database.GetCollection<CapPublishedMessage>(MongoDBOptions.PublishedCollection); var collection = Database.GetCollection<CapPublishedMessage>(MongoDBOptions.PublishedCollection);
collection.InsertMany(new[] collection.InsertMany(new[]
{ {
new CapPublishedMessage new CapPublishedMessage
{ {
Id = helper.GetNextSequenceValue(Database,MongoDBOptions.PublishedCollection), Id = SnowflakeId.Default().NextId(),
Added = DateTime.Now.AddHours(-1), Added = DateTime.Now.AddHours(-1),
StatusName = "Failed", StatusName = "Failed",
Content = "abc" Content = "abc"
}, },
new CapPublishedMessage new CapPublishedMessage
{ {
Id = helper.GetNextSequenceValue(Database,MongoDBOptions.PublishedCollection), Id = SnowflakeId.Default().NextId(),
Added = DateTime.Now, Added = DateTime.Now,
StatusName = "Failed", StatusName = "Failed",
Content = "bbc" Content = "bbc"
......
...@@ -15,16 +15,19 @@ namespace DotNetCore.CAP.MongoDB.Test ...@@ -15,16 +15,19 @@ namespace DotNetCore.CAP.MongoDB.Test
Provider.GetService<MongoDBStorage>().GetConnection(); Provider.GetService<MongoDBStorage>().GetConnection();
[Fact] [Fact]
public async void StoreReceivedMessageAsync_TestAsync() public void StoreReceivedMessageAsync_TestAsync()
{ {
var id = await _connection.StoreReceivedMessageAsync(new CapReceivedMessage(new MessageContext var messageContext = new MessageContext
{ {
Group = "test", Group = "test",
Name = "test", Name = "test",
Content = "test-content" Content = "test-content"
})); };
id.Should().BeGreaterThan(0); _connection.StoreReceivedMessage(new CapReceivedMessage(messageContext)
{
Id = SnowflakeId.Default().NextId()
});
} }
[Fact] [Fact]
...@@ -45,14 +48,17 @@ namespace DotNetCore.CAP.MongoDB.Test ...@@ -45,14 +48,17 @@ namespace DotNetCore.CAP.MongoDB.Test
msgs.Should().BeEmpty(); msgs.Should().BeEmpty();
var id = SnowflakeId.Default().NextId();
var msg = new CapReceivedMessage var msg = new CapReceivedMessage
{ {
Id = id,
Group = "test", Group = "test",
Name = "test", Name = "test",
Content = "test-content", Content = "test-content",
StatusName = StatusName.Failed StatusName = StatusName.Failed
}; };
var id = await _connection.StoreReceivedMessageAsync(msg); _connection.StoreReceivedMessage(msg);
var collection = Database.GetCollection<CapReceivedMessage>(MongoDBOptions.ReceivedCollection); var collection = Database.GetCollection<CapReceivedMessage>(MongoDBOptions.ReceivedCollection);
......
using FluentAssertions; using FluentAssertions;
using Microsoft.Extensions.DependencyInjection;
using MongoDB.Bson;
using MongoDB.Driver; using MongoDB.Driver;
using Xunit; using Xunit;
...@@ -12,18 +10,12 @@ namespace DotNetCore.CAP.MongoDB.Test ...@@ -12,18 +10,12 @@ namespace DotNetCore.CAP.MongoDB.Test
[Fact] [Fact]
public void InitializeAsync_Test() public void InitializeAsync_Test()
{ {
var storage = Provider.GetService<MongoDBStorage>();
var names = MongoClient.ListDatabaseNames()?.ToList(); var names = MongoClient.ListDatabaseNames()?.ToList();
names.Should().Contain(MongoDBOptions.DatabaseName); names.Should().Contain(MongoDBOptions.DatabaseName);
var collections = Database.ListCollectionNames()?.ToList(); var collections = Database.ListCollectionNames()?.ToList();
collections.Should().Contain(MongoDBOptions.PublishedCollection); collections.Should().Contain(MongoDBOptions.PublishedCollection);
collections.Should().Contain(MongoDBOptions.ReceivedCollection); collections.Should().Contain(MongoDBOptions.ReceivedCollection);
collections.Should().Contain(MongoDBOptions.CounterCollection);
var collection = Database.GetCollection<BsonDocument>(MongoDBOptions.CounterCollection);
collection.CountDocuments(new BsonDocument { { "_id", MongoDBOptions.PublishedCollection } }).Should().Be(1);
collection.CountDocuments(new BsonDocument { { "_id", MongoDBOptions.ReceivedCollection } }).Should().Be(1);
} }
} }
} }
\ No newline at end of file
using System.Collections.Concurrent;
using System.Threading.Tasks;
using FluentAssertions;
using Xunit;
namespace DotNetCore.CAP.MongoDB.Test
{
[Collection("MongoDB")]
public class MongoDBUtilTest : DatabaseTestHost
{
[Fact]
public async void GetNextSequenceValueAsync_Test()
{
var id = await new MongoDBUtil().GetNextSequenceValueAsync(Database, MongoDBOptions.ReceivedCollection);
id.Should().BeGreaterThan(0);
}
[Fact]
public void GetNextSequenceValue_Concurrency_Test()
{
var dic = new ConcurrentDictionary<int, int>();
Parallel.For(0, 30, (x) =>
{
var id = new MongoDBUtil().GetNextSequenceValue(Database, MongoDBOptions.ReceivedCollection);
id.Should().BeGreaterThan(0);
dic.TryAdd(id, x).Should().BeTrue(); //The id shouldn't be same.
});
}
}
}
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment