Commit 3ecc92f2 authored by keke's avatar keke Committed by Savorboard

Abstract MongoTransaction to make the usage more comfortable (#167)

* Abstract MongoTransaction to make the usage more comfortable

* Tweak the method name
parent 04a5554c
......@@ -3,6 +3,8 @@ using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using DotNetCore.CAP;
using DotNetCore.CAP.Abstractions;
using DotNetCore.CAP.MongoDB;
using Microsoft.AspNetCore.Mvc;
using MongoDB.Bson;
using MongoDB.Driver;
......@@ -15,39 +17,58 @@ namespace Sample.RabbitMQ.MongoDB.Controllers
{
private readonly IMongoClient _client;
private readonly ICapPublisher _capPublisher;
private readonly IMongoTransaction _mongoTransaction;
public ValuesController(IMongoClient client, ICapPublisher capPublisher)
public ValuesController(IMongoClient client, ICapPublisher capPublisher, IMongoTransaction mongoTransaction)
{
_client = client;
_capPublisher = capPublisher;
_mongoTransaction = mongoTransaction;
}
[Route("~/publish")]
public IActionResult PublishWithSession()
public async Task<IActionResult> PublishWithTrans()
{
using (var session = _client.StartSession())
using (var trans = await _mongoTransaction.BegeinAsync())
{
session.StartTransaction();
var collection = _client.GetDatabase("TEST").GetCollection<BsonDocument>("test");
collection.InsertOne(session, new BsonDocument { { "hello", "world" } });
collection.InsertOne(trans.GetSession(), new BsonDocument { { "hello", "world" } });
_capPublisher.PublishWithMongo("sample.rabbitmq.mongodb", DateTime.Now, session);
await _capPublisher.PublishWithMongoAsync("sample.rabbitmq.mongodb", DateTime.Now, trans);
}
return Ok();
}
[Route("~/publish/not/autocommit")]
public IActionResult PublishNotAutoCommit()
{
using (var trans = _mongoTransaction.Begein(autoCommit: false))
{
var session = trans.GetSession();
var collection = _client.GetDatabase("TEST").GetCollection<BsonDocument>("test");
collection.InsertOne(session, new BsonDocument { { "Hello", "World" } });
_capPublisher.PublishWithMongo("sample.rabbitmq.mongodb", DateTime.Now, trans);
//Do something, and commit by yourself.
session.CommitTransaction();
}
return Ok();
}
[Route("~/publish_rollback")]
[Route("~/publish/rollback")]
public IActionResult PublishRollback()
{
using (var session = _client.StartSession())
using (var trans = _mongoTransaction.Begein(autoCommit: false))
{
var session = trans.GetSession();
try
{
session.StartTransaction();
_capPublisher.PublishWithMongo("sample.rabbitmq.mongodb", DateTime.Now, session);
_capPublisher.PublishWithMongo("sample.rabbitmq.mongodb", DateTime.Now, trans);
//Do something, but
throw new Exception("Foo");
session.CommitTransaction();
}
catch (System.Exception ex)
{
......@@ -57,8 +78,8 @@ namespace Sample.RabbitMQ.MongoDB.Controllers
}
}
[Route("~/publish_without_session")]
public IActionResult PublishWithoutSession()
[Route("~/publish/without/trans")]
public IActionResult PublishWithoutTrans()
{
_capPublisher.PublishWithMongo("sample.rabbitmq.mongodb", DateTime.Now);
return Ok();
......
......@@ -2,6 +2,8 @@
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using DotNetCore.CAP;
using DotNetCore.CAP.Abstractions;
using DotNetCore.CAP.Processor;
using Microsoft.Extensions.DependencyInjection;
......@@ -25,6 +27,8 @@ namespace DotNetCore.CAP.MongoDB
services.AddScoped<ICallbackPublisher, CapPublisher>();
services.AddTransient<ICollectProcessor, MongoDBCollectProcessor>();
services.AddTransient<IMongoTransaction, MongoTransaction>();
var options = new MongoDBOptions();
_configure?.Invoke(options);
services.AddSingleton(options);
......
......@@ -15,9 +15,9 @@ namespace DotNetCore.CAP.MongoDB
{
public class CapPublisher : CapPublisherBase, ICallbackPublisher
{
private readonly IMongoDatabase _database;
private readonly MongoDBOptions _options;
private bool _isInTransaction = true;
private readonly IMongoDatabase _database;
private bool _usingTransaction = true;
public CapPublisher(
ILogger<CapPublisherBase> logger,
......@@ -57,31 +57,29 @@ namespace DotNetCore.CAP.MongoDB
throw new NotImplementedException("Not work for MongoDB");
}
public override void PublishWithMongo<T>(string name, T contentObj, object mongoSession = null,
string callbackName = null)
public override void PublishWithMongo<T>(string name, T contentObj, IMongoTransaction mongoTransaction = null, string callbackName = null)
{
var session = mongoSession as IClientSessionHandle;
if (session == null)
if (mongoTransaction == null)
{
_isInTransaction = false;
_usingTransaction = false;
mongoTransaction = new NullMongoTransaction();
}
PublishWithSession(name, contentObj, session, callbackName);
PublishWithTransaction<T>(name, contentObj, mongoTransaction, callbackName);
}
public override async Task PublishWithMongoAsync<T>(string name, T contentObj, object mongoSession = null,
string callbackName = null)
public override async Task PublishWithMongoAsync<T>(string name, T contentObj, IMongoTransaction mongoTransaction = null, string callbackName = null)
{
var session = mongoSession as IClientSessionHandle;
if (session == null)
if (mongoTransaction == null)
{
_isInTransaction = false;
_usingTransaction = false;
mongoTransaction = new NullMongoTransaction();
}
await PublishWithSessionAsync(name, contentObj, session, callbackName);
await PublishWithTransactionAsync<T>(name, contentObj, mongoTransaction, callbackName);
}
private void PublishWithSession<T>(string name, T contentObj, IClientSessionHandle session, string callbackName)
private void PublishWithTransaction<T>(string name, T contentObj, IMongoTransaction transaction, string callbackName)
{
var operationId = default(Guid);
......@@ -94,12 +92,19 @@ namespace DotNetCore.CAP.MongoDB
StatusName = StatusName.Scheduled
};
var session = transaction.GetSession();
try
{
operationId = s_diagnosticListener.WritePublishMessageStoreBefore(message);
var id = Execute(session, message);
if (!_isInTransaction && id > 0)
if (transaction.AutoCommit)
{
session.CommitTransaction();
}
if (!_usingTransaction || (transaction.AutoCommit && id > 0))
{
_logger.LogInformation($"message [{message}] has been persisted in the database.");
s_diagnosticListener.WritePublishMessageStoreAfter(operationId, message);
......@@ -120,7 +125,7 @@ namespace DotNetCore.CAP.MongoDB
message.Id = new MongoDBUtil().GetNextSequenceValue(_database, _options.PublishedCollection, session);
var collection = _database.GetCollection<CapPublishedMessage>(_options.PublishedCollection);
if (_isInTransaction)
if (_usingTransaction)
{
collection.InsertOne(session, message);
}
......@@ -132,8 +137,8 @@ namespace DotNetCore.CAP.MongoDB
return message.Id;
}
private async Task PublishWithSessionAsync<T>(string name, T contentObj, IClientSessionHandle session,
string callbackName)
private async Task PublishWithTransactionAsync<T>(string name, T contentObj, IMongoTransaction transaction, string callbackName)
{
var operationId = default(Guid);
var content = Serialize(contentObj, callbackName);
......@@ -145,13 +150,20 @@ namespace DotNetCore.CAP.MongoDB
StatusName = StatusName.Scheduled
};
var session = transaction.GetSession();
try
{
operationId = s_diagnosticListener.WritePublishMessageStoreBefore(message);
var id = await ExecuteAsync(session, message);
if (!_isInTransaction && id > 0)
if (transaction.AutoCommit)
{
await session.CommitTransactionAsync();
}
if (!_usingTransaction || (transaction.AutoCommit && id > 0))
{
_logger.LogInformation($"message [{message}] has been persisted in the database.");
s_diagnosticListener.WritePublishMessageStoreAfter(operationId, message);
......@@ -175,7 +187,7 @@ namespace DotNetCore.CAP.MongoDB
message.Id =
await new MongoDBUtil().GetNextSequenceValueAsync(_database, _options.PublishedCollection, session);
var collection = _database.GetCollection<CapPublishedMessage>(_options.PublishedCollection);
if (_isInTransaction)
if (_usingTransaction)
{
await collection.InsertOneAsync(session, message);
}
......
using System;
using System.Threading.Tasks;
using DotNetCore.CAP.Abstractions;
using MongoDB.Driver;
namespace DotNetCore.CAP.MongoDB
{
public class MongoTransaction : IMongoTransaction
{
private readonly IMongoClient _client;
public MongoTransaction(IMongoClient client)
{
_client = client;
}
public IClientSessionHandle Session { get; set; }
public bool AutoCommit { get; set; }
public async Task<IMongoTransaction> BegeinAsync(bool autoCommit = true)
{
AutoCommit = autoCommit;
Session = await _client.StartSessionAsync();
Session.StartTransaction();
return this;
}
public IMongoTransaction Begein(bool autoCommit = true)
{
AutoCommit = autoCommit;
Session = _client.StartSession();
Session.StartTransaction();
return this;
}
public void Dispose()
{
Session?.Dispose();
}
}
public class NullMongoTransaction : MongoTransaction
{
public NullMongoTransaction(IMongoClient client = null) : base(client)
{
AutoCommit = false;
}
}
public static class MongoTransactionExtensions
{
public static IClientSessionHandle GetSession(this IMongoTransaction mongoTransaction)
{
var trans = mongoTransaction as MongoTransaction;
return trans?.Session;
}
}
}
\ No newline at end of file
......@@ -67,12 +67,12 @@ namespace DotNetCore.CAP.Abstractions
return PublishWithTransAsync(name, contentObj, callbackName);
}
public virtual void PublishWithMongo<T>(string name, T contentObj, object mongoSession = null, string callbackName = null)
public virtual void PublishWithMongo<T>(string name, T contentObj, IMongoTransaction mongoTransaction = null, string callbackName = null)
{
throw new NotImplementedException("Work for MongoDB only.");
}
public virtual Task PublishWithMongoAsync<T>(string name, T contentObj, object mongoSession = null, string callbackName = null)
public virtual Task PublishWithMongoAsync<T>(string name, T contentObj, IMongoTransaction mongoTransaction = null, string callbackName = null)
{
throw new NotImplementedException("Work for MongoDB only.");
}
......
using System;
using System.Threading.Tasks;
namespace DotNetCore.CAP.Abstractions
{
public interface IMongoTransaction : IDisposable
{
/// <summary>
/// If set true, the session.CommitTransaction() will be called automatically.
/// </summary>
/// <value></value>
bool AutoCommit { get; set; }
Task<IMongoTransaction> BegeinAsync(bool autoCommit = true);
IMongoTransaction Begein(bool autoCommit = true);
}
}
\ No newline at end of file
......@@ -3,6 +3,7 @@
using System.Data;
using System.Threading.Tasks;
using DotNetCore.CAP.Abstractions;
namespace DotNetCore.CAP
{
......@@ -60,17 +61,17 @@ namespace DotNetCore.CAP
/// </summary>
/// <param name="name">the topic name or exchange router key.</param>
/// <param name="contentObj">message body content, that will be serialized of json.</param>
/// <param name="mongoSession">if seesion was set null, the message will be published directly.</param>
/// <param name="mongoTransaction">if transaction was set null, the message will be published directly.</param>
/// <param name="callbackName">callback subscriber name</param>
void PublishWithMongo<T>(string name, T contentObj, object mongoSession = null, string callbackName = null);
void PublishWithMongo<T>(string name, T contentObj, IMongoTransaction mongoTransaction = null, string callbackName = null);
/// <summary>
/// Asynchronous publish an object message with mongo.
/// </summary>
/// <param name="name">the topic name or exchange router key.</param>
/// <param name="contentObj">message body content, that will be serialized of json.</param>
/// <param name="mongoSession">if seesion was set null, the message will be published directly.</param>
/// <param name="mongoTransaction">if transaction was set null, the message will be published directly.</param>
/// <param name="callbackName">callback subscriber name</param>
Task PublishWithMongoAsync<T>(string name, T contentObj, object mongoSession = null, string callbackName = null);
Task PublishWithMongoAsync<T>(string name, T contentObj, IMongoTransaction mongoTransaction = null, string callbackName = null);
}
}
\ No newline at end of file
......@@ -169,12 +169,12 @@ namespace DotNetCore.CAP.Test
throw new NotImplementedException();
}
public void PublishWithMongo<T>(string name, T contentObj, object mongoSession = null, string callbackName = null)
public void PublishWithMongo<T>(string name, T contentObj, IMongoTransaction mongoTransaction = null, string callbackName = null)
{
throw new NotImplementedException();
}
public Task PublishWithMongoAsync<T>(string name, T contentObj, object mongoSession = null, string callbackName = null)
public Task PublishWithMongoAsync<T>(string name, T contentObj, IMongoTransaction mongoTransaction = null, string callbackName = null)
{
throw new NotImplementedException();
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment