Commit 7d1fe75b authored by Savorboard's avatar Savorboard

rename mongdb publisher

parent b136e21d
...@@ -23,8 +23,8 @@ namespace DotNetCore.CAP.MongoDB ...@@ -23,8 +23,8 @@ namespace DotNetCore.CAP.MongoDB
services.AddSingleton<IStorage, MongoDBStorage>(); services.AddSingleton<IStorage, MongoDBStorage>();
services.AddSingleton<IStorageConnection, MongoDBStorageConnection>(); services.AddSingleton<IStorageConnection, MongoDBStorageConnection>();
services.AddScoped<ICapPublisher, CapPublisher>(); services.AddScoped<ICapPublisher, MongoDBPublisher>();
services.AddScoped<ICallbackPublisher, CapPublisher>(); services.AddScoped<ICallbackPublisher, MongoDBPublisher>();
services.AddTransient<ICollectProcessor, MongoDBCollectProcessor>(); services.AddTransient<ICollectProcessor, MongoDBCollectProcessor>();
services.AddTransient<CapTransactionBase, MongoDBCapTransaction>(); services.AddTransient<CapTransactionBase, MongoDBCapTransaction>();
......
...@@ -11,14 +11,16 @@ using MongoDB.Driver; ...@@ -11,14 +11,16 @@ using MongoDB.Driver;
namespace DotNetCore.CAP.MongoDB namespace DotNetCore.CAP.MongoDB
{ {
public class CapPublisher : CapPublisherBase, ICallbackPublisher public class MongoDBPublisher : CapPublisherBase, ICallbackPublisher
{ {
private readonly MongoDBOptions _options; private readonly MongoDBOptions _options;
private readonly IMongoClient _client;
public CapPublisher(IServiceProvider provider, MongoDBOptions options) public MongoDBPublisher(IServiceProvider provider, MongoDBOptions options)
: base(provider) : base(provider)
{ {
_options = options; _options = options;
_client = ServiceProvider.GetRequiredService<IMongoClient>();
} }
public async Task PublishCallbackAsync(CapPublishedMessage message) public async Task PublishCallbackAsync(CapPublishedMessage message)
...@@ -29,22 +31,26 @@ namespace DotNetCore.CAP.MongoDB ...@@ -29,22 +31,26 @@ namespace DotNetCore.CAP.MongoDB
protected override Task ExecuteAsync(CapPublishedMessage message, ICapTransaction transaction, protected override Task ExecuteAsync(CapPublishedMessage message, ICapTransaction transaction,
CancellationToken cancel = default(CancellationToken)) CancellationToken cancel = default(CancellationToken))
{ {
var dbTrans = (IClientSessionHandle)transaction.DbTransaction; var insertOptions = new InsertOneOptions { BypassDocumentValidation = false };
var collection = dbTrans.Client var collection = _client
.GetDatabase(_options.DatabaseName) .GetDatabase(_options.DatabaseName)
.GetCollection<CapPublishedMessage>(_options.PublishedCollection); .GetCollection<CapPublishedMessage>(_options.PublishedCollection);
var insertOptions = new InsertOneOptions { BypassDocumentValidation = false }; if (NotUseTransaction)
{
return collection.InsertOneAsync(message, insertOptions, cancel);
}
var dbTrans = (IClientSessionHandle)transaction.DbTransaction;
return collection.InsertOneAsync(dbTrans, message, insertOptions, cancel); return collection.InsertOneAsync(dbTrans, message, insertOptions, cancel);
} }
protected override object GetDbTransaction() //protected override object GetDbTransaction()
{ //{
var client = ServiceProvider.GetRequiredService<IMongoClient>(); // var client = ServiceProvider.GetRequiredService<IMongoClient>();
var session = client.StartSession(new ClientSessionOptions()); // var session = client.StartSession(new ClientSessionOptions());
session.StartTransaction(); // session.StartTransaction();
return session; // return session;
} //}
} }
} }
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment