Commit 56138f52 authored by Savorboard's avatar Savorboard

merge transaction branch

parents bc4a840f 9c3f3a27
......@@ -58,13 +58,13 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "DotNetCore.CAP.PostgreSql",
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "DotNetCore.CAP.PostgreSql.Test", "test\DotNetCore.CAP.PostgreSql.Test\DotNetCore.CAP.PostgreSql.Test.csproj", "{7CA3625D-1817-4695-881D-7E79A1E1DED2}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Sample.Kafka.MySql", "samples\Sample.Kafka.MySql\Sample.Kafka.MySql.csproj", "{9CB51105-A85B-42A4-AFDE-A4FC34D9EA91}"
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "DotNetCore.CAP.MongoDB.Test", "test\DotNetCore.CAP.MongoDB.Test\DotNetCore.CAP.MongoDB.Test.csproj", "{C143FCDF-E7F3-46F8-987E-A1BA38C1639D}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "DotNetCore.CAP.MongoDB.Test", "test\DotNetCore.CAP.MongoDB.Test\DotNetCore.CAP.MongoDB.Test.csproj", "{C143FCDF-E7F3-46F8-987E-A1BA38C1639D}"
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "DotNetCore.CAP.MongoDB", "src\DotNetCore.CAP.MongoDB\DotNetCore.CAP.MongoDB.csproj", "{77C0AC02-C44B-49D5-B969-7D5305FC20A5}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "DotNetCore.CAP.MongoDB", "src\DotNetCore.CAP.MongoDB\DotNetCore.CAP.MongoDB.csproj", "{77C0AC02-C44B-49D5-B969-7D5305FC20A5}"
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Sample.RabbitMQ.MongoDB", "samples\Sample.RabbitMQ.MongoDB\Sample.RabbitMQ.MongoDB.csproj", "{4473DE19-E8D2-4B57-80A8-C8AAA2BFA20F}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Sample.RabbitMQ.MongoDB", "samples\Sample.RabbitMQ.MongoDB\Sample.RabbitMQ.MongoDB.csproj", "{4473DE19-E8D2-4B57-80A8-C8AAA2BFA20F}"
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Sample.Kafka.SqlServer", "samples\Sample.Kafka.SqlServer\Sample.Kafka.SqlServer.csproj", "{CD276810-09A2-4105-8798-D65A8AA7C509}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
......@@ -115,10 +115,6 @@ Global
{7CA3625D-1817-4695-881D-7E79A1E1DED2}.Debug|Any CPU.Build.0 = Debug|Any CPU
{7CA3625D-1817-4695-881D-7E79A1E1DED2}.Release|Any CPU.ActiveCfg = Release|Any CPU
{7CA3625D-1817-4695-881D-7E79A1E1DED2}.Release|Any CPU.Build.0 = Release|Any CPU
{9CB51105-A85B-42A4-AFDE-A4FC34D9EA91}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{9CB51105-A85B-42A4-AFDE-A4FC34D9EA91}.Debug|Any CPU.Build.0 = Debug|Any CPU
{9CB51105-A85B-42A4-AFDE-A4FC34D9EA91}.Release|Any CPU.ActiveCfg = Release|Any CPU
{9CB51105-A85B-42A4-AFDE-A4FC34D9EA91}.Release|Any CPU.Build.0 = Release|Any CPU
{C143FCDF-E7F3-46F8-987E-A1BA38C1639D}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{C143FCDF-E7F3-46F8-987E-A1BA38C1639D}.Debug|Any CPU.Build.0 = Debug|Any CPU
{C143FCDF-E7F3-46F8-987E-A1BA38C1639D}.Release|Any CPU.ActiveCfg = Release|Any CPU
......@@ -131,6 +127,10 @@ Global
{4473DE19-E8D2-4B57-80A8-C8AAA2BFA20F}.Debug|Any CPU.Build.0 = Debug|Any CPU
{4473DE19-E8D2-4B57-80A8-C8AAA2BFA20F}.Release|Any CPU.ActiveCfg = Release|Any CPU
{4473DE19-E8D2-4B57-80A8-C8AAA2BFA20F}.Release|Any CPU.Build.0 = Release|Any CPU
{CD276810-09A2-4105-8798-D65A8AA7C509}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{CD276810-09A2-4105-8798-D65A8AA7C509}.Debug|Any CPU.Build.0 = Debug|Any CPU
{CD276810-09A2-4105-8798-D65A8AA7C509}.Release|Any CPU.ActiveCfg = Release|Any CPU
{CD276810-09A2-4105-8798-D65A8AA7C509}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
......@@ -147,10 +147,10 @@ Global
{9F3F9BFE-7B6A-4A7A-A6E6-8B517D611873} = {3A6B6931-A123-477A-9469-8B468B5385AF}
{82C403AB-ED68-4084-9A1D-11334F9F08F9} = {9B2AE124-6636-4DE9-83A3-70360DABD0C4}
{7CA3625D-1817-4695-881D-7E79A1E1DED2} = {C09CDAB0-6DD4-46E9-B7F3-3EF2A4741EA0}
{9CB51105-A85B-42A4-AFDE-A4FC34D9EA91} = {3A6B6931-A123-477A-9469-8B468B5385AF}
{C143FCDF-E7F3-46F8-987E-A1BA38C1639D} = {C09CDAB0-6DD4-46E9-B7F3-3EF2A4741EA0}
{77C0AC02-C44B-49D5-B969-7D5305FC20A5} = {9B2AE124-6636-4DE9-83A3-70360DABD0C4}
{4473DE19-E8D2-4B57-80A8-C8AAA2BFA20F} = {3A6B6931-A123-477A-9469-8B468B5385AF}
{CD276810-09A2-4105-8798-D65A8AA7C509} = {3A6B6931-A123-477A-9469-8B468B5385AF}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {2E70565D-94CF-40B4-BFE1-AC18D5F736AB}
......
<Project>
<PropertyGroup>
<VersionMajor>2</VersionMajor>
<VersionMinor>2</VersionMinor>
<VersionPatch>6</VersionPatch>
<VersionMinor>3</VersionMinor>
<VersionPatch>0</VersionPatch>
<VersionQuality></VersionQuality>
<VersionPrefix>$(VersionMajor).$(VersionMinor).$(VersionPatch)</VersionPrefix>
</PropertyGroup>
......
// <auto-generated />
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Infrastructure;
using Microsoft.EntityFrameworkCore.Metadata;
using Microsoft.EntityFrameworkCore.Migrations;
using Microsoft.EntityFrameworkCore.Storage.ValueConversion;
using Sample.Kafka.SqlServer;
namespace Sample.Kafka.SqlServer.Migrations
{
[DbContext(typeof(AppDbContext))]
[Migration("20180825123925_init")]
partial class init
{
protected override void BuildTargetModel(ModelBuilder modelBuilder)
{
#pragma warning disable 612, 618
modelBuilder
.HasAnnotation("ProductVersion", "2.1.1-rtm-30846")
.HasAnnotation("Relational:MaxIdentifierLength", 128)
.HasAnnotation("SqlServer:ValueGenerationStrategy", SqlServerValueGenerationStrategy.IdentityColumn);
modelBuilder.Entity("Sample.Kafka.SqlServer.Person", b =>
{
b.Property<int>("Id")
.ValueGeneratedOnAdd()
.HasAnnotation("SqlServer:ValueGenerationStrategy", SqlServerValueGenerationStrategy.IdentityColumn);
b.Property<string>("Name");
b.HasKey("Id");
b.ToTable("Persons");
});
#pragma warning restore 612, 618
}
}
}
using Microsoft.EntityFrameworkCore.Metadata;
using Microsoft.EntityFrameworkCore.Migrations;
namespace Sample.Kafka.SqlServer.Migrations
{
public partial class init : Migration
{
protected override void Up(MigrationBuilder migrationBuilder)
{
migrationBuilder.CreateTable(
name: "Persons",
columns: table => new
{
Id = table.Column<int>(nullable: false)
.Annotation("SqlServer:ValueGenerationStrategy", SqlServerValueGenerationStrategy.IdentityColumn),
Name = table.Column<string>(nullable: true)
},
constraints: table =>
{
table.PrimaryKey("PK_Persons", x => x.Id);
});
}
protected override void Down(MigrationBuilder migrationBuilder)
{
migrationBuilder.DropTable(
name: "Persons");
}
}
}
// <auto-generated />
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Infrastructure;
using Microsoft.EntityFrameworkCore.Metadata;
using Microsoft.EntityFrameworkCore.Storage.ValueConversion;
using Sample.Kafka.SqlServer;
namespace Sample.Kafka.SqlServer.Migrations
{
[DbContext(typeof(AppDbContext))]
partial class AppDbContextModelSnapshot : ModelSnapshot
{
protected override void BuildModel(ModelBuilder modelBuilder)
{
#pragma warning disable 612, 618
modelBuilder
.HasAnnotation("ProductVersion", "2.1.1-rtm-30846")
.HasAnnotation("Relational:MaxIdentifierLength", 128)
.HasAnnotation("SqlServer:ValueGenerationStrategy", SqlServerValueGenerationStrategy.IdentityColumn);
modelBuilder.Entity("Sample.Kafka.SqlServer.Person", b =>
{
b.Property<int>("Id")
.ValueGeneratedOnAdd()
.HasAnnotation("SqlServer:ValueGenerationStrategy", SqlServerValueGenerationStrategy.IdentityColumn);
b.Property<string>("Name");
b.HasKey("Id");
b.ToTable("Persons");
});
#pragma warning restore 612, 618
}
}
}
{
"Logging": {
"IncludeScopes": false,
"LogLevel": {
"Default": "Debug"
}
}
}
using System;
using System.Threading.Tasks;
using DotNetCore.CAP;
using DotNetCore.CAP.Abstractions;
using DotNetCore.CAP.MongoDB;
using Microsoft.AspNetCore.Mvc;
using MongoDB.Bson;
using MongoDB.Driver;
......@@ -14,72 +11,58 @@ namespace Sample.RabbitMQ.MongoDB.Controllers
public class ValuesController : ControllerBase
{
private readonly IMongoClient _client;
private readonly ICapPublisher _capPublisher;
private readonly IMongoTransaction _mongoTransaction;
private readonly ICapPublisher _capBus;
public ValuesController(IMongoClient client, ICapPublisher capPublisher, IMongoTransaction mongoTransaction)
public ValuesController(IMongoClient client, ICapPublisher capBus)
{
_client = client;
_capPublisher = capPublisher;
_mongoTransaction = mongoTransaction;
_capBus = capBus;
}
[Route("~/publish")]
public async Task<IActionResult> PublishWithTrans()
[Route("~/without/transaction")]
public IActionResult WithoutTransaction()
{
using (var trans = await _mongoTransaction.BegeinAsync())
{
var collection = _client.GetDatabase("TEST").GetCollection<BsonDocument>("test");
collection.InsertOne(trans.GetSession(), new BsonDocument { { "hello", "world" } });
_capBus.Publish("sample.rabbitmq.mongodb", DateTime.Now);
await _capPublisher.PublishWithMongoAsync("sample.rabbitmq.mongodb", DateTime.Now, trans);
}
return Ok();
}
[Route("~/publish/not/autocommit")]
[Route("~/transaction/not/autocommit")]
public IActionResult PublishNotAutoCommit()
{
using (var trans = _mongoTransaction.Begein(autoCommit: false))
{
var session = trans.GetSession();
//NOTE: before your test, your need to create database and collection at first
//注意:MongoDB 不能在事务中创建数据库和集合,所以你需要单独创建它们,模拟一条记录插入则会自动创建
//var mycollection = _client.GetDatabase("test").GetCollection<BsonDocument>("test.collection");
//mycollection.InsertOne(new BsonDocument { { "test", "test" } });
var collection = _client.GetDatabase("TEST").GetCollection<BsonDocument>("test");
collection.InsertOne(session, new BsonDocument { { "Hello", "World" } });
using (var session = _client.StartTransaction(_capBus, autoCommit: false))
{
var collection = _client.GetDatabase("test").GetCollection<BsonDocument>("test.collection");
collection.InsertOne(session, new BsonDocument { { "hello", "world" } });
_capPublisher.PublishWithMongo("sample.rabbitmq.mongodb", DateTime.Now, trans);
_capBus.Publish("sample.rabbitmq.mongodb", DateTime.Now);
//Do something, and commit by yourself.
session.CommitTransaction();
}
return Ok();
}
[Route("~/publish/rollback")]
public IActionResult PublishRollback()
[Route("~/transaction/autocommit")]
public IActionResult PublishWithoutTrans()
{
using (var trans = _mongoTransaction.Begein(autoCommit: false))
//NOTE: before your test, your need to create database and collection at first
//注意:MongoDB 不能在事务中创建数据库和集合,所以你需要单独创建它们,模拟一条记录插入则会自动创建
//var mycollection = _client.GetDatabase("test").GetCollection<BsonDocument>("test.collection");
//mycollection.InsertOne(new BsonDocument { { "test", "test" } });
using (var session = _client.StartTransaction(_capBus, autoCommit: true))
{
var session = trans.GetSession();
try
{
_capPublisher.PublishWithMongo("sample.rabbitmq.mongodb", DateTime.Now, trans);
//Do something, but
throw new Exception("Foo");
session.CommitTransaction();
}
catch (System.Exception ex)
{
session.AbortTransaction();
return StatusCode(500, ex.Message);
}
var collection = _client.GetDatabase("test").GetCollection<BsonDocument>("test.collection");
collection.InsertOne(session, new BsonDocument { { "hello", "world" } });
_capBus.Publish("sample.rabbitmq.mongodb", DateTime.Now);
}
}
[Route("~/publish/without/trans")]
public IActionResult PublishWithoutTrans()
{
_capPublisher.PublishWithMongo("sample.rabbitmq.mongodb", DateTime.Now);
return Ok();
}
......@@ -87,7 +70,7 @@ namespace Sample.RabbitMQ.MongoDB.Controllers
[CapSubscribe("sample.rabbitmq.mongodb")]
public void ReceiveMessage(DateTime time)
{
Console.WriteLine("[sample.rabbitmq.mongodb] message received: " + DateTime.Now + ",sent time: " + time);
Console.WriteLine($@"{DateTime.Now}, Subscriber invoked, Sent time:{time}");
}
}
}
using DotNetCore.CAP;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Extensions.Configuration;
......@@ -19,21 +18,11 @@ namespace Sample.RabbitMQ.MongoDB
public void ConfigureServices(IServiceCollection services)
{
services.AddSingleton<IMongoClient>(new MongoClient(Configuration.GetConnectionString("MongoDB")));
services.AddSingleton<IMongoClient>(new MongoClient("mongodb://192.168.10.110:27017,192.168.10.110:27018,192.168.10.110:27019/?replicaSet=rs0"));
services.AddCap(x =>
{
x.UseMongoDB();
var mq = new RabbitMQOptions();
Configuration.GetSection("RabbitMQ").Bind(mq);
x.UseRabbitMQ(cfg =>
{
cfg.HostName = mq.HostName;
cfg.Port = mq.Port;
cfg.UserName = mq.UserName;
cfg.Password = mq.Password;
});
x.UseMongoDB("mongodb://192.168.10.110:27017,192.168.10.110:27018,192.168.10.110:27019/?replicaSet=rs0");
x.UseRabbitMQ("localhost");
x.UseDashboard();
});
services.AddMvc().SetCompatibilityVersion(CompatibilityVersion.Version_2_1);
......@@ -46,9 +35,7 @@ namespace Sample.RabbitMQ.MongoDB
app.UseDeveloperExceptionPage();
}
app.UseMvc();
app.UseCap();
app.UseMvc();
}
}
}
......@@ -2,11 +2,22 @@
namespace Sample.RabbitMQ.MySql
{
public class Person
{
public int Id { get; set; }
public string Name { get; set; }
}
public class AppDbContext : DbContext
{
public const string ConnectionString = "Server=localhost;Database=testcap;UserId=root;Password=123123;";
public DbSet<Person> Persons { get; set; }
protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder)
{
optionsBuilder.UseMySql("Server=192.168.10.110;Database=testcap;UserId=root;Password=123123;");
optionsBuilder.UseMySql(ConnectionString);
}
}
}
using System;
using System.Data;
using System.Threading.Tasks;
using Dapper;
using DotNetCore.CAP;
using Microsoft.AspNetCore.Mvc;
using MySql.Data.MySqlClient;
namespace Sample.RabbitMQ.MySql.Controllers
{
[Route("api/[controller]")]
public class ValuesController : Controller
{
private readonly AppDbContext _dbContext;
private readonly ICapPublisher _capBus;
public ValuesController(AppDbContext dbContext, ICapPublisher capPublisher)
public ValuesController(ICapPublisher capPublisher)
{
_dbContext = dbContext;
_capBus = capPublisher;
}
[Route("~/publish")]
public IActionResult PublishMessage()
[Route("~/without/transaction")]
public async Task<IActionResult> WithoutTransaction()
{
_capBus.Publish("sample.rabbitmq.mysql", DateTime.Now);
await _capBus.PublishAsync("sample.rabbitmq.mysql", DateTime.Now);
return Ok();
}
[Route("~/publish2")]
public IActionResult PublishMessage2()
[Route("~/adonet/transaction")]
public IActionResult AdonetWithTransaction()
{
_capBus.Publish("sample.kafka.sqlserver4", DateTime.Now);
using (var connection = new MySqlConnection(AppDbContext.ConnectionString))
{
using (var transaction = connection.BeginTransaction(_capBus, autoCommit: false))
{
//your business code
connection.Execute("insert into test(name) values('test')", transaction: (IDbTransaction)transaction.DbTransaction);
for (int i = 0; i < 5; i++)
{
_capBus.Publish("sample.rabbitmq.mysql", DateTime.Now);
}
transaction.Commit();
}
}
return Ok();
}
[Route("~/publishWithTrans")]
public async Task<IActionResult> PublishMessageWithTransaction()
[Route("~/ef/transaction")]
public IActionResult EntityFrameworkWithTransaction([FromServices]AppDbContext dbContext)
{
using (var trans = await _dbContext.Database.BeginTransactionAsync())
using (var trans = dbContext.Database.BeginTransaction(_capBus, autoCommit: false))
{
await _capBus.PublishAsync("sample.kafka.sqlserver", "");
dbContext.Persons.Add(new Person() { Name = "ef.transaction" });
for (int i = 0; i < 5; i++)
{
_capBus.Publish("sample.rabbitmq.mysql", DateTime.Now);
}
dbContext.SaveChanges();
trans.Commit();
}
......@@ -47,9 +69,9 @@ namespace Sample.RabbitMQ.MySql.Controllers
[NonAction]
[CapSubscribe("#.rabbitmq.mysql")]
public void ReceiveMessage(DateTime time)
public void Subscriber(DateTime time)
{
Console.WriteLine("[sample.rabbitmq.mysql] message received: " + DateTime.Now + ",sent time: " + time);
Console.WriteLine($@"{DateTime.Now}, Subscriber invoked, Sent time:{time}");
}
}
}
// <auto-generated />
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Infrastructure;
using Microsoft.EntityFrameworkCore.Migrations;
using Microsoft.EntityFrameworkCore.Storage.ValueConversion;
using Sample.RabbitMQ.MySql;
namespace Sample.RabbitMQ.MySql.Migrations
{
[DbContext(typeof(AppDbContext))]
[Migration("20180821021736_init")]
partial class init
{
protected override void BuildTargetModel(ModelBuilder modelBuilder)
{
#pragma warning disable 612, 618
modelBuilder
.HasAnnotation("ProductVersion", "2.1.1-rtm-30846")
.HasAnnotation("Relational:MaxIdentifierLength", 64);
modelBuilder.Entity("Sample.RabbitMQ.MySql.Person", b =>
{
b.Property<int>("Id")
.ValueGeneratedOnAdd();
b.Property<string>("Name");
b.HasKey("Id");
b.ToTable("Persons");
});
#pragma warning restore 612, 618
}
}
}
using Microsoft.EntityFrameworkCore.Metadata;
using Microsoft.EntityFrameworkCore.Migrations;
namespace Sample.RabbitMQ.MySql.Migrations
{
public partial class init : Migration
{
protected override void Up(MigrationBuilder migrationBuilder)
{
migrationBuilder.CreateTable(
name: "Persons",
columns: table => new
{
Id = table.Column<int>(nullable: false)
.Annotation("MySql:ValueGenerationStrategy", MySqlValueGenerationStrategy.IdentityColumn),
Name = table.Column<string>(nullable: true)
},
constraints: table =>
{
table.PrimaryKey("PK_Persons", x => x.Id);
});
}
protected override void Down(MigrationBuilder migrationBuilder)
{
migrationBuilder.DropTable(
name: "Persons");
}
}
}
// <auto-generated />
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Infrastructure;
using Microsoft.EntityFrameworkCore.Storage.ValueConversion;
using Sample.RabbitMQ.MySql;
namespace Sample.RabbitMQ.MySql.Migrations
{
[DbContext(typeof(AppDbContext))]
partial class AppDbContextModelSnapshot : ModelSnapshot
{
protected override void BuildModel(ModelBuilder modelBuilder)
{
#pragma warning disable 612, 618
modelBuilder
.HasAnnotation("ProductVersion", "2.1.1-rtm-30846")
.HasAnnotation("Relational:MaxIdentifierLength", 64);
modelBuilder.Entity("Sample.RabbitMQ.MySql.Person", b =>
{
b.Property<int>("Id")
.ValueGeneratedOnAdd();
b.Property<string>("Name");
b.HasKey("Id");
b.ToTable("Persons");
});
#pragma warning restore 612, 618
}
}
}
......@@ -30,8 +30,6 @@ namespace Sample.RabbitMQ.MySql
public void Configure(IApplicationBuilder app, IHostingEnvironment env, ILoggerFactory loggerFactory)
{
app.UseMvc();
app.UseCap();
}
}
}
......@@ -2,13 +2,12 @@
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using DotNetCore.CAP;
using DotNetCore.CAP.Abstractions;
using DotNetCore.CAP.Processor;
using Microsoft.Extensions.DependencyInjection;
namespace DotNetCore.CAP.MongoDB
{
// ReSharper disable once InconsistentNaming
public class MongoDBCapOptionsExtension : ICapOptionsExtension
{
private readonly Action<MongoDBOptions> _configure;
......@@ -23,11 +22,12 @@ namespace DotNetCore.CAP.MongoDB
services.AddSingleton<CapDatabaseStorageMarkerService>();
services.AddSingleton<IStorage, MongoDBStorage>();
services.AddSingleton<IStorageConnection, MongoDBStorageConnection>();
services.AddScoped<ICapPublisher, CapPublisher>();
services.AddScoped<ICallbackPublisher, CapPublisher>();
services.AddTransient<ICollectProcessor, MongoDBCollectProcessor>();
services.AddTransient<IMongoTransaction, MongoTransaction>();
services.AddScoped<ICapPublisher, MongoDBPublisher>();
services.AddScoped<ICallbackPublisher, MongoDBPublisher>();
services.AddTransient<ICollectProcessor, MongoDBCollectProcessor>();
services.AddTransient<CapTransactionBase, MongoDBCapTransaction>();
var options = new MongoDBOptions();
_configure?.Invoke(options);
......
......@@ -3,6 +3,7 @@
namespace DotNetCore.CAP.MongoDB
{
// ReSharper disable once InconsistentNaming
public class MongoDBOptions
{
/// <summary>
......@@ -28,7 +29,5 @@ namespace DotNetCore.CAP.MongoDB
/// Default value: "published"
/// </summary>
public string PublishedCollection { get; set; } = "cap.published";
internal const string CounterCollection = "cap.counter";
}
}
\ No newline at end of file
......@@ -15,6 +15,11 @@ namespace Microsoft.Extensions.DependencyInjection
return options.UseMongoDB(x => { });
}
public static CapOptions UseMongoDB(this CapOptions options, string connectionString)
{
return options.UseMongoDB(x => { x.DatabaseConnection = connectionString; });
}
public static CapOptions UseMongoDB(this CapOptions options, Action<MongoDBOptions> configure)
{
if (configure == null)
......
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using System.Threading;
using System.Threading.Tasks;
using DotNetCore.CAP.Abstractions;
using DotNetCore.CAP.Models;
using Microsoft.Extensions.DependencyInjection;
using MongoDB.Driver;
namespace DotNetCore.CAP.MongoDB
{
public class MongoDBPublisher : CapPublisherBase, ICallbackPublisher
{
private readonly MongoDBOptions _options;
private readonly IMongoClient _client;
public MongoDBPublisher(IServiceProvider provider, MongoDBOptions options)
: base(provider)
{
_options = options;
_client = ServiceProvider.GetRequiredService<IMongoClient>();
}
public async Task PublishCallbackAsync(CapPublishedMessage message)
{
await PublishAsyncInternal(message);
}
protected override Task ExecuteAsync(CapPublishedMessage message, ICapTransaction transaction,
CancellationToken cancel = default(CancellationToken))
{
var insertOptions = new InsertOneOptions { BypassDocumentValidation = false };
var collection = _client
.GetDatabase(_options.DatabaseName)
.GetCollection<CapPublishedMessage>(_options.PublishedCollection);
if (NotUseTransaction)
{
return collection.InsertOneAsync(message, insertOptions, cancel);
}
var dbTrans = (IClientSessionHandle)transaction.DbTransaction;
return collection.InsertOneAsync(dbTrans, message, insertOptions, cancel);
}
}
}
\ No newline at end of file
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System.Diagnostics;
using MongoDB.Driver;
// ReSharper disable once CheckNamespace
namespace DotNetCore.CAP
{
public class MongoDBCapTransaction : CapTransactionBase
{
public MongoDBCapTransaction(IDispatcher dispatcher)
: base(dispatcher)
{
}
public override void Commit()
{
Debug.Assert(DbTransaction != null);
if (DbTransaction is IClientSessionHandle session)
{
session.CommitTransaction();
}
Flush();
}
public override void Rollback()
{
Debug.Assert(DbTransaction != null);
if (DbTransaction is IClientSessionHandle session)
{
session.AbortTransaction();
}
}
public override void Dispose()
{
(DbTransaction as IClientSessionHandle)?.Dispose();
}
}
public static class CapTransactionExtensions
{
public static ICapTransaction Begin(this ICapTransaction transaction,
IClientSessionHandle dbTransaction, bool autoCommit = false)
{
if (!dbTransaction.IsInTransaction)
{
dbTransaction.StartTransaction();
}
transaction.DbTransaction = dbTransaction;
transaction.AutoCommit = autoCommit;
return transaction;
}
public static IClientSessionHandle StartTransaction(this IMongoClient client,
ICapPublisher publisher, bool autoCommit = false)
{
var clientSessionHandle = client.StartSession();
var capTrans = publisher.Transaction.Begin(clientSessionHandle, autoCommit);
return new CapMongoDbClientSessionHandle(capTrans);
}
}
}
\ No newline at end of file
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System.Threading;
using System.Threading.Tasks;
using DotNetCore.CAP;
using MongoDB.Bson;
using MongoDB.Driver.Core.Bindings;
// ReSharper disable once CheckNamespace
namespace MongoDB.Driver
{
internal class CapMongoDbClientSessionHandle : IClientSessionHandle
{
private readonly IClientSessionHandle _sessionHandle;
private readonly ICapTransaction _transaction;
public CapMongoDbClientSessionHandle(ICapTransaction transaction)
{
_transaction = transaction;
_sessionHandle = (IClientSessionHandle) _transaction.DbTransaction;
}
public void Dispose()
{
_transaction.Dispose();
}
public void AbortTransaction(CancellationToken cancellationToken = default(CancellationToken))
{
_transaction.Rollback();
}
public Task AbortTransactionAsync(CancellationToken cancellationToken = default(CancellationToken))
{
_transaction.Rollback();
return Task.CompletedTask;
}
public void AdvanceClusterTime(BsonDocument newClusterTime)
{
_sessionHandle.AdvanceClusterTime(newClusterTime);
}
public void AdvanceOperationTime(BsonTimestamp newOperationTime)
{
_sessionHandle.AdvanceOperationTime(newOperationTime);
}
public void CommitTransaction(CancellationToken cancellationToken = default(CancellationToken))
{
_transaction.Commit();
}
public Task CommitTransactionAsync(CancellationToken cancellationToken = default(CancellationToken))
{
_transaction.Commit();
return Task.CompletedTask;
}
public void StartTransaction(TransactionOptions transactionOptions = null)
{
_sessionHandle.StartTransaction(transactionOptions);
}
public IMongoClient Client => _sessionHandle.Client;
public BsonDocument ClusterTime => _sessionHandle.ClusterTime;
public bool IsImplicit => _sessionHandle.IsImplicit;
public bool IsInTransaction => _sessionHandle.IsInTransaction;
public BsonTimestamp OperationTime => _sessionHandle.OperationTime;
public ClientSessionOptions Options => _sessionHandle.Options;
public IServerSession ServerSession => _sessionHandle.ServerSession;
public ICoreSessionHandle WrappedCoreSession => _sessionHandle.WrappedCoreSession;
public IClientSessionHandle Fork()
{
return _sessionHandle.Fork();
}
}
}
\ No newline at end of file
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using System.Threading.Tasks;
using DotNetCore.CAP.Models;
using DotNetCore.CAP.Processor;
using Microsoft.Extensions.Logging;
using MongoDB.Driver;
namespace DotNetCore.CAP.MongoDB
{
public class MongoDBCollectProcessor : ICollectProcessor
{
private readonly IMongoDatabase _database;
private readonly ILogger _logger;
private readonly MongoDBOptions _options;
private readonly TimeSpan _waitingInterval = TimeSpan.FromMinutes(5);
public MongoDBCollectProcessor(ILogger<MongoDBCollectProcessor> logger,
MongoDBOptions options,
IMongoClient client)
{
_options = options;
_logger = logger;
_database = client.GetDatabase(_options.DatabaseName);
}
public async Task ProcessAsync(ProcessingContext context)
{
_logger.LogDebug(
$"Collecting expired data from collection [{_options.PublishedCollection}].");
var publishedCollection = _database.GetCollection<CapPublishedMessage>(_options.PublishedCollection);
var receivedCollection = _database.GetCollection<CapReceivedMessage>(_options.ReceivedCollection);
await publishedCollection.BulkWriteAsync(new[]
{
new DeleteManyModel<CapPublishedMessage>(
Builders<CapPublishedMessage>.Filter.Lt(x => x.ExpiresAt, DateTime.Now))
});
await receivedCollection.BulkWriteAsync(new[]
{
new DeleteManyModel<CapReceivedMessage>(
Builders<CapReceivedMessage>.Filter.Lt(x => x.ExpiresAt, DateTime.Now))
});
await context.WaitAsync(_waitingInterval);
}
}
}
\ No newline at end of file
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using System.Collections.Generic;
using DotNetCore.CAP.Dashboard;
using DotNetCore.CAP.Dashboard.Monitoring;
using DotNetCore.CAP.Infrastructure;
using DotNetCore.CAP.Models;
using MongoDB.Bson;
using MongoDB.Driver;
namespace DotNetCore.CAP.MongoDB
{
public class MongoDBMonitoringApi : IMonitoringApi
{
private readonly IMongoDatabase _database;
private readonly MongoDBOptions _options;
public MongoDBMonitoringApi(IMongoClient client, MongoDBOptions options)
{
var mongoClient = client ?? throw new ArgumentNullException(nameof(client));
_options = options ?? throw new ArgumentNullException(nameof(options));
_database = mongoClient.GetDatabase(_options.DatabaseName);
}
public StatisticsDto GetStatistics()
{
var publishedCollection = _database.GetCollection<CapPublishedMessage>(_options.PublishedCollection);
var receivedCollection = _database.GetCollection<CapReceivedMessage>(_options.ReceivedCollection);
var statistics = new StatisticsDto();
{
if (int.TryParse(
publishedCollection.CountDocuments(x => x.StatusName == StatusName.Succeeded).ToString(),
out var count))
{
statistics.PublishedSucceeded = count;
}
}
{
if (int.TryParse(publishedCollection.CountDocuments(x => x.StatusName == StatusName.Failed).ToString(),
out var count))
{
statistics.PublishedFailed = count;
}
}
{
if (int.TryParse(
receivedCollection.CountDocuments(x => x.StatusName == StatusName.Succeeded).ToString(),
out var count))
{
statistics.ReceivedSucceeded = count;
}
}
{
if (int.TryParse(receivedCollection.CountDocuments(x => x.StatusName == StatusName.Failed).ToString(),
out var count))
{
statistics.ReceivedFailed = count;
}
}
return statistics;
}
public IDictionary<DateTime, int> HourlyFailedJobs(MessageType type)
{
return GetHourlyTimelineStats(type, StatusName.Failed);
}
public IDictionary<DateTime, int> HourlySucceededJobs(MessageType type)
{
return GetHourlyTimelineStats(type, StatusName.Succeeded);
}
public IList<MessageDto> Messages(MessageQueryDto queryDto)
{
queryDto.StatusName = StatusName.Standardized(queryDto.StatusName);
var name = queryDto.MessageType == MessageType.Publish
? _options.PublishedCollection
: _options.ReceivedCollection;
var collection = _database.GetCollection<MessageDto>(name);
var builder = Builders<MessageDto>.Filter;
var filter = builder.Empty;
if (!string.IsNullOrEmpty(queryDto.StatusName))
{
filter = filter & builder.Eq(x => x.StatusName, queryDto.StatusName);
}
if (!string.IsNullOrEmpty(queryDto.Name))
{
filter = filter & builder.Eq(x => x.Name, queryDto.Name);
}
if (!string.IsNullOrEmpty(queryDto.Group))
{
filter = filter & builder.Eq(x => x.Group, queryDto.Group);
}
if (!string.IsNullOrEmpty(queryDto.Content))
{
filter = filter & builder.Regex(x => x.Content, ".*" + queryDto.Content + ".*");
}
var result = collection
.Find(filter)
.SortByDescending(x => x.Added)
.Skip(queryDto.PageSize * queryDto.CurrentPage)
.Limit(queryDto.PageSize)
.ToList();
return result;
}
public int PublishedFailedCount()
{
return GetNumberOfMessage(_options.PublishedCollection, StatusName.Failed);
}
public int PublishedSucceededCount()
{
return GetNumberOfMessage(_options.PublishedCollection, StatusName.Succeeded);
}
public int ReceivedFailedCount()
{
return GetNumberOfMessage(_options.ReceivedCollection, StatusName.Failed);
}
public int ReceivedSucceededCount()
{
return GetNumberOfMessage(_options.ReceivedCollection, StatusName.Succeeded);
}
private int GetNumberOfMessage(string collectionName, string statusName)
{
var collection = _database.GetCollection<BsonDocument>(collectionName);
var count = collection.CountDocuments(new BsonDocument {{"StatusName", statusName}});
return int.Parse(count.ToString());
}
private IDictionary<DateTime, int> GetHourlyTimelineStats(MessageType type, string statusName)
{
var collectionName =
type == MessageType.Publish ? _options.PublishedCollection : _options.ReceivedCollection;
var endDate = DateTime.UtcNow;
var groupby = new BsonDocument
{
{
"$group", new BsonDocument
{
{
"_id", new BsonDocument
{
{
"Key", new BsonDocument
{
{
"$dateToString", new BsonDocument
{
{"format", "%Y-%m-%d %H:00:00"},
{"date", "$Added"}
}
}
}
}
}
},
{"Count", new BsonDocument {{"$sum", 1}}}
}
}
};
var match = new BsonDocument
{
{
"$match", new BsonDocument
{
{
"Added", new BsonDocument
{
{"$gt", endDate.AddHours(-24)}
}
},
{
"StatusName",
new BsonDocument
{
{"$eq", statusName}
}
}
}
}
};
var pipeline = new[] {match, groupby};
var collection = _database.GetCollection<BsonDocument>(collectionName);
var result = collection.Aggregate<BsonDocument>(pipeline).ToList();
var dic = new Dictionary<DateTime, int>();
for (var i = 0; i < 24; i++)
{
dic.Add(DateTime.Parse(endDate.ToLocalTime().ToString("yyyy-MM-dd HH:00:00")), 0);
endDate = endDate.AddHours(-1);
}
result.ForEach(d =>
{
var key = d["_id"].AsBsonDocument["Key"].AsString;
if (DateTime.TryParse(key, out var dateTime))
{
dic[dateTime.ToLocalTime()] = d["Count"].AsInt32;
}
});
return dic;
}
}
}
\ No newline at end of file
......@@ -6,7 +6,6 @@ using System.Threading;
using System.Threading.Tasks;
using DotNetCore.CAP.Dashboard;
using Microsoft.Extensions.Logging;
using MongoDB.Bson;
using MongoDB.Driver;
namespace DotNetCore.CAP.MongoDB
......@@ -49,26 +48,14 @@ namespace DotNetCore.CAP.MongoDB
var database = _client.GetDatabase(_options.DatabaseName);
var names = (await database.ListCollectionNamesAsync(cancellationToken: cancellationToken))?.ToList();
if (!names.Any(n => n == _options.ReceivedCollection))
if (names.All(n => n != _options.ReceivedCollection))
{
await database.CreateCollectionAsync(_options.ReceivedCollection, cancellationToken: cancellationToken);
}
if (names.All(n => n != _options.PublishedCollection))
{
await database.CreateCollectionAsync(_options.PublishedCollection,
cancellationToken: cancellationToken);
}
if (names.All(n => n != MongoDBOptions.CounterCollection))
{
await database.CreateCollectionAsync(MongoDBOptions.CounterCollection, cancellationToken: cancellationToken);
var collection = database.GetCollection<BsonDocument>(MongoDBOptions.CounterCollection);
await collection.InsertManyAsync(new[]
{
new BsonDocument {{"_id", _options.PublishedCollection}, {"sequence_value", 0}},
new BsonDocument {{"_id", _options.ReceivedCollection}, {"sequence_value", 0}}
}, cancellationToken: cancellationToken);
await database.CreateCollectionAsync(_options.PublishedCollection, cancellationToken: cancellationToken);
}
_logger.LogDebug("Ensuring all create database tables script are applied.");
......
......@@ -25,7 +25,7 @@ namespace DotNetCore.CAP.MongoDB
_database = _client.GetDatabase(_options.DatabaseName);
}
public bool ChangePublishedState(int messageId, string state)
public bool ChangePublishedState(long messageId, string state)
{
var collection = _database.GetCollection<CapPublishedMessage>(_options.PublishedCollection);
......@@ -40,7 +40,7 @@ namespace DotNetCore.CAP.MongoDB
return result.ModifiedCount > 0;
}
public bool ChangeReceivedState(int messageId, string state)
public bool ChangeReceivedState(long messageId, string state)
{
var collection = _database.GetCollection<CapReceivedMessage>(_options.ReceivedCollection);
......@@ -60,7 +60,7 @@ namespace DotNetCore.CAP.MongoDB
return new MongoDBStorageTransaction(_client, _options);
}
public async Task<CapPublishedMessage> GetPublishedMessageAsync(int id)
public async Task<CapPublishedMessage> GetPublishedMessageAsync(long id)
{
var collection = _database.GetCollection<CapPublishedMessage>(_options.PublishedCollection);
return await collection.Find(x => x.Id == id).FirstOrDefaultAsync();
......@@ -77,7 +77,7 @@ namespace DotNetCore.CAP.MongoDB
.ToListAsync();
}
public async Task<CapReceivedMessage> GetReceivedMessageAsync(int id)
public async Task<CapReceivedMessage> GetReceivedMessageAsync(long id)
{
var collection = _database.GetCollection<CapReceivedMessage>(_options.ReceivedCollection);
return await collection.Find(x => x.Id == id).FirstOrDefaultAsync();
......@@ -95,7 +95,7 @@ namespace DotNetCore.CAP.MongoDB
.ToListAsync();
}
public async Task<int> StoreReceivedMessageAsync(CapReceivedMessage message)
public void StoreReceivedMessage(CapReceivedMessage message)
{
if (message == null)
{
......@@ -104,15 +104,7 @@ namespace DotNetCore.CAP.MongoDB
var collection = _database.GetCollection<CapReceivedMessage>(_options.ReceivedCollection);
message.Id = await new MongoDBUtil().GetNextSequenceValueAsync(_database, _options.ReceivedCollection);
collection.InsertOne(message);
return message.Id;
}
public void Dispose()
{
}
}
}
}
\ No newline at end of file
......@@ -24,9 +24,12 @@ namespace DotNetCore.CAP
services.AddSingleton<CapDatabaseStorageMarkerService>();
services.AddSingleton<IStorage, MySqlStorage>();
services.AddSingleton<IStorageConnection, MySqlStorageConnection>();
services.AddScoped<ICapPublisher, CapPublisher>();
services.AddScoped<ICallbackPublisher, CapPublisher>();
services.AddScoped<ICapPublisher, MySqlPublisher>();
services.AddScoped<ICallbackPublisher, MySqlPublisher>();
services.AddTransient<ICollectProcessor, MySqlCollectProcessor>();
services.AddTransient<CapTransactionBase, MySqlCapTransaction>();
AddSingletionMySqlOptions(services);
}
......
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
// ReSharper disable once CheckNamespace
namespace DotNetCore.CAP
{
public class MySqlOptions : EFOptions
......
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using System.Data;
using System.Threading.Tasks;
using Dapper;
using DotNetCore.CAP.Abstractions;
using DotNetCore.CAP.Models;
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Storage;
using Microsoft.Extensions.Logging;
using MySql.Data.MySqlClient;
namespace DotNetCore.CAP.MySql
{
public class CapPublisher : CapPublisherBase, ICallbackPublisher
{
private readonly DbContext _dbContext;
private readonly MySqlOptions _options;
public CapPublisher(ILogger<CapPublisher> logger, IDispatcher dispatcher, IServiceProvider provider,
MySqlOptions options)
: base(logger, dispatcher)
{
ServiceProvider = provider;
_options = options;
if (_options.DbContextType == null)
{
return;
}
IsUsingEF = true;
_dbContext = (DbContext) ServiceProvider.GetService(_options.DbContextType);
}
public async Task PublishCallbackAsync(CapPublishedMessage message)
{
using (var conn = new MySqlConnection(_options.ConnectionString))
{
var id = await conn.ExecuteScalarAsync<int>(PrepareSql(), message);
message.Id = id;
Enqueue(message);
}
}
protected override void PrepareConnectionForEF()
{
DbConnection = _dbContext.Database.GetDbConnection();
var dbContextTransaction = _dbContext.Database.CurrentTransaction;
var dbTrans = dbContextTransaction?.GetDbTransaction();
//DbTransaction is dispose in original
if (dbTrans?.Connection == null)
{
IsCapOpenedTrans = true;
dbContextTransaction?.Dispose();
dbContextTransaction = _dbContext.Database.BeginTransaction(IsolationLevel.ReadCommitted);
dbTrans = dbContextTransaction.GetDbTransaction();
}
DbTransaction = dbTrans;
}
protected override int Execute(IDbConnection dbConnection, IDbTransaction dbTransaction,
CapPublishedMessage message)
{
return dbConnection.ExecuteScalar<int>(PrepareSql(), message, dbTransaction);
}
protected override async Task<int> ExecuteAsync(IDbConnection dbConnection, IDbTransaction dbTransaction,
CapPublishedMessage message)
{
return await dbConnection.ExecuteScalarAsync<int>(PrepareSql(), message, dbTransaction);
}
#region private methods
private string PrepareSql()
{
return
$"INSERT INTO `{_options.TableNamePrefix}.published` (`Name`,`Content`,`Retries`,`Added`,`ExpiresAt`,`StatusName`)VALUES(@Name,@Content,@Retries,@Added,@ExpiresAt,@StatusName);SELECT LAST_INSERT_ID()";
}
#endregion private methods
}
}
\ No newline at end of file
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using System.Data;
using System.Threading;
using System.Threading.Tasks;
using Dapper;
using DotNetCore.CAP.Abstractions;
using DotNetCore.CAP.Models;
using Microsoft.EntityFrameworkCore.Storage;
using Microsoft.Extensions.DependencyInjection;
using MySql.Data.MySqlClient;
namespace DotNetCore.CAP.MySql
{
public class MySqlPublisher : CapPublisherBase, ICallbackPublisher
{
private readonly MySqlOptions _options;
public MySqlPublisher(IServiceProvider provider) : base(provider)
{
_options = provider.GetService<MySqlOptions>();
}
public async Task PublishCallbackAsync(CapPublishedMessage message)
{
await PublishAsyncInternal(message);
}
protected override async Task ExecuteAsync(CapPublishedMessage message, ICapTransaction transaction,
CancellationToken cancel = default(CancellationToken))
{
if (NotUseTransaction)
{
using (var connection = new MySqlConnection(_options.ConnectionString))
{
await connection.ExecuteAsync(PrepareSql(), message);
return;
}
}
var dbTrans = transaction.DbTransaction as IDbTransaction;
if (dbTrans == null && transaction.DbTransaction is IDbContextTransaction dbContextTrans)
{
dbTrans = dbContextTrans.GetDbTransaction();
}
var conn = dbTrans?.Connection;
await conn.ExecuteAsync(PrepareSql(), message, dbTrans);
}
#region private methods
private string PrepareSql()
{
return
$"INSERT INTO `{_options.TableNamePrefix}.published` (`Id`,`Name`,`Content`,`Retries`,`Added`,`ExpiresAt`,`StatusName`)VALUES(@Id,@Name,@Content,@Retries,@Added,@ExpiresAt,@StatusName);";
}
#endregion private methods
}
}
\ No newline at end of file
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System.Data;
using System.Diagnostics;
using Microsoft.EntityFrameworkCore.Infrastructure;
using Microsoft.EntityFrameworkCore.Storage;
// ReSharper disable once CheckNamespace
namespace DotNetCore.CAP
{
public class MySqlCapTransaction : CapTransactionBase
{
public MySqlCapTransaction(IDispatcher dispatcher) : base(dispatcher)
{
}
public override void Commit()
{
Debug.Assert(DbTransaction != null);
switch (DbTransaction)
{
case IDbTransaction dbTransaction:
dbTransaction.Commit();
break;
case IDbContextTransaction dbContextTransaction:
dbContextTransaction.Commit();
break;
}
Flush();
}
public override void Rollback()
{
Debug.Assert(DbTransaction != null);
switch (DbTransaction)
{
case IDbTransaction dbTransaction:
dbTransaction.Rollback();
break;
case IDbContextTransaction dbContextTransaction:
dbContextTransaction.Rollback();
break;
}
}
public override void Dispose()
{
(DbTransaction as IDbTransaction)?.Dispose();
}
}
public static class CapTransactionExtensions
{
public static ICapTransaction Begin(this ICapTransaction transaction,
IDbContextTransaction dbTransaction, bool autoCommit = false)
{
transaction.DbTransaction = dbTransaction;
transaction.AutoCommit = autoCommit;
return transaction;
}
public static ICapTransaction Begin(this ICapTransaction transaction,
IDbTransaction dbTransaction, bool autoCommit = false)
{
transaction.DbTransaction = dbTransaction;
transaction.AutoCommit = autoCommit;
return transaction;
}
public static IDbContextTransaction BeginTransaction(this DatabaseFacade database,
ICapPublisher publisher, bool autoCommit = false)
{
var trans = database.BeginTransaction();
var capTrans = publisher.Transaction.Begin(trans, autoCommit);
return new CapEFDbTransaction(capTrans);
}
public static ICapTransaction BeginTransaction(this IDbConnection dbConnection,
ICapPublisher publisher, bool autoCommit = false)
{
if (dbConnection.State == ConnectionState.Closed)
{
dbConnection.Open();
}
var dbTransaction = dbConnection.BeginTransaction();
return publisher.Transaction.Begin(dbTransaction, autoCommit);
}
}
}
\ No newline at end of file
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using DotNetCore.CAP;
// ReSharper disable once CheckNamespace
namespace Microsoft.EntityFrameworkCore.Storage
{
// ReSharper disable once InconsistentNaming
internal class CapEFDbTransaction : IDbContextTransaction
{
private readonly ICapTransaction _transaction;
public CapEFDbTransaction(ICapTransaction transaction)
{
_transaction = transaction;
var dbContextTransaction = (IDbContextTransaction) _transaction.DbTransaction;
TransactionId = dbContextTransaction.TransactionId;
}
public void Dispose()
{
_transaction.Dispose();
}
public void Commit()
{
_transaction.Commit();
}
public void Rollback()
{
_transaction.Rollback();
}
public Guid TransactionId { get; }
}
}
\ No newline at end of file
......@@ -81,7 +81,11 @@ CREATE TABLE IF NOT EXISTS `{prefix}.published` (
`ExpiresAt` datetime DEFAULT NULL,
`StatusName` varchar(40) NOT NULL,
PRIMARY KEY (`Id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;";
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
ALTER TABLE `{prefix}.published` MODIFY Id BIGINT NOT NULL;
ALTER TABLE `{prefix}.received` MODIFY Id BIGINT NOT NULL;
";
return batchSql;
}
......
......@@ -30,7 +30,7 @@ namespace DotNetCore.CAP.MySql
return new MySqlStorageTransaction(this);
}
public async Task<CapPublishedMessage> GetPublishedMessageAsync(int id)
public async Task<CapPublishedMessage> GetPublishedMessageAsync(long id)
{
var sql = $@"SELECT * FROM `{_prefix}.published` WHERE `Id`={id};";
......@@ -52,7 +52,7 @@ namespace DotNetCore.CAP.MySql
}
}
public async Task<int> StoreReceivedMessageAsync(CapReceivedMessage message)
public void StoreReceivedMessage(CapReceivedMessage message)
{
if (message == null)
{
......@@ -60,16 +60,16 @@ namespace DotNetCore.CAP.MySql
}
var sql = $@"
INSERT INTO `{_prefix}.received`(`Name`,`Group`,`Content`,`Retries`,`Added`,`ExpiresAt`,`StatusName`)
VALUES(@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName);SELECT LAST_INSERT_ID();";
INSERT INTO `{_prefix}.received`(`Id`,`Name`,`Group`,`Content`,`Retries`,`Added`,`ExpiresAt`,`StatusName`)
VALUES(@Id,@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName);";
using (var connection = new MySqlConnection(Options.ConnectionString))
{
return await connection.ExecuteScalarAsync<int>(sql, message);
connection.Execute(sql, message);
}
}
public async Task<CapReceivedMessage> GetReceivedMessageAsync(int id)
public async Task<CapReceivedMessage> GetReceivedMessageAsync(long id)
{
var sql = $@"SELECT * FROM `{_prefix}.received` WHERE Id={id};";
using (var connection = new MySqlConnection(Options.ConnectionString))
......@@ -89,7 +89,7 @@ VALUES(@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName);SELECT LAST
}
}
public bool ChangePublishedState(int messageId, string state)
public bool ChangePublishedState(long messageId, string state)
{
var sql =
$"UPDATE `{_prefix}.published` SET `Retries`=`Retries`+1,`ExpiresAt`=NULL,`StatusName` = '{state}' WHERE `Id`={messageId}";
......@@ -100,7 +100,7 @@ VALUES(@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName);SELECT LAST
}
}
public bool ChangeReceivedState(int messageId, string state)
public bool ChangeReceivedState(long messageId, string state)
{
var sql =
$"UPDATE `{_prefix}.received` SET `Retries`=`Retries`+1,`ExpiresAt`=NULL,`StatusName` = '{state}' WHERE `Id`={messageId}";
......@@ -109,10 +109,6 @@ VALUES(@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName);SELECT LAST
{
return connection.Execute(sql) > 0;
}
}
public void Dispose()
{
}
}
}
}
\ No newline at end of file
......@@ -14,7 +14,6 @@ namespace DotNetCore.CAP.MySql
{
private readonly IDbConnection _dbConnection;
//private readonly IDbTransaction _dbTransaction;
private readonly string _prefix;
public MySqlStorageTransaction(MySqlStorageConnection connection)
......@@ -23,8 +22,6 @@ namespace DotNetCore.CAP.MySql
_prefix = options.TableNamePrefix;
_dbConnection = new MySqlConnection(options.ConnectionString);
// _dbConnection.Open(); for performance
// _dbTransaction = _dbConnection.BeginTransaction(IsolationLevel.ReadCommitted);
}
public void UpdateMessage(CapPublishedMessage message)
......@@ -55,13 +52,11 @@ namespace DotNetCore.CAP.MySql
{
_dbConnection.Close();
_dbConnection.Dispose();
//_dbTransaction.Commit();
return Task.CompletedTask;
}
public void Dispose()
{
//_dbTransaction.Dispose();
_dbConnection.Dispose();
}
}
......
......@@ -24,9 +24,12 @@ namespace DotNetCore.CAP
services.AddSingleton<CapDatabaseStorageMarkerService>();
services.AddSingleton<IStorage, PostgreSqlStorage>();
services.AddSingleton<IStorageConnection, PostgreSqlStorageConnection>();
services.AddScoped<ICapPublisher, CapPublisher>();
services.AddScoped<ICallbackPublisher, CapPublisher>();
services.AddScoped<ICapPublisher, PostgreSqlPublisher>();
services.AddScoped<ICallbackPublisher, PostgreSqlPublisher>();
services.AddTransient<ICollectProcessor, PostgreSqlCollectProcessor>();
services.AddTransient<CapTransactionBase, PostgreSqlCapTransaction>();
AddSingletonPostgreSqlOptions(services);
}
......
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
// ReSharper disable once CheckNamespace
namespace DotNetCore.CAP
{
public class PostgreSqlOptions : EFOptions
......
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using System.Data;
using System.Threading.Tasks;
using Dapper;
using DotNetCore.CAP.Abstractions;
using DotNetCore.CAP.Models;
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Storage;
using Microsoft.Extensions.Logging;
using Npgsql;
namespace DotNetCore.CAP.PostgreSql
{
public class CapPublisher : CapPublisherBase, ICallbackPublisher
{
private readonly DbContext _dbContext;
private readonly PostgreSqlOptions _options;
public CapPublisher(ILogger<CapPublisher> logger, IDispatcher dispatcher,
IServiceProvider provider, PostgreSqlOptions options)
: base(logger, dispatcher)
{
ServiceProvider = provider;
_options = options;
if (_options.DbContextType != null)
{
IsUsingEF = true;
_dbContext = (DbContext) ServiceProvider.GetService(_options.DbContextType);
}
}
public async Task PublishCallbackAsync(CapPublishedMessage message)
{
using (var conn = new NpgsqlConnection(_options.ConnectionString))
{
var id = await conn.ExecuteScalarAsync<int>(PrepareSql(), message);
message.Id = id;
Enqueue(message);
}
}
protected override void PrepareConnectionForEF()
{
DbConnection = _dbContext.Database.GetDbConnection();
var dbContextTransaction = _dbContext.Database.CurrentTransaction;
var dbTrans = dbContextTransaction?.GetDbTransaction();
//DbTransaction is dispose in original
if (dbTrans?.Connection == null)
{
IsCapOpenedTrans = true;
dbContextTransaction?.Dispose();
dbContextTransaction = _dbContext.Database.BeginTransaction(IsolationLevel.ReadCommitted);
dbTrans = dbContextTransaction.GetDbTransaction();
}
DbTransaction = dbTrans;
}
protected override int Execute(IDbConnection dbConnection, IDbTransaction dbTransaction,
CapPublishedMessage message)
{
return dbConnection.ExecuteScalar<int>(PrepareSql(), message, dbTransaction);
}
protected override Task<int> ExecuteAsync(IDbConnection dbConnection, IDbTransaction dbTransaction,
CapPublishedMessage message)
{
return dbConnection.ExecuteScalarAsync<int>(PrepareSql(), message, dbTransaction);
}
#region private methods
private string PrepareSql()
{
return
$"INSERT INTO \"{_options.Schema}\".\"published\" (\"Name\",\"Content\",\"Retries\",\"Added\",\"ExpiresAt\",\"StatusName\")VALUES(@Name,@Content,@Retries,@Added,@ExpiresAt,@StatusName) RETURNING \"Id\";";
}
#endregion private methods
}
}
\ No newline at end of file
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using System.Data;
using System.Threading;
using System.Threading.Tasks;
using Dapper;
using DotNetCore.CAP.Abstractions;
using DotNetCore.CAP.Models;
using Microsoft.EntityFrameworkCore.Storage;
using Microsoft.Extensions.DependencyInjection;
using Npgsql;
namespace DotNetCore.CAP.PostgreSql
{
public class PostgreSqlPublisher : CapPublisherBase, ICallbackPublisher
{
private readonly PostgreSqlOptions _options;
public PostgreSqlPublisher(IServiceProvider provider) : base(provider)
{
_options = provider.GetService<PostgreSqlOptions>();
}
public async Task PublishCallbackAsync(CapPublishedMessage message)
{
await PublishAsyncInternal(message);
}
protected override async Task ExecuteAsync(CapPublishedMessage message, ICapTransaction transaction,
CancellationToken cancel = default(CancellationToken))
{
if (NotUseTransaction)
{
using (var connection = InitDbConnection())
{
await connection.ExecuteAsync(PrepareSql(), message);
return;
}
}
var dbTrans = transaction.DbTransaction as IDbTransaction;
if (dbTrans == null && transaction.DbTransaction is IDbContextTransaction dbContextTrans)
{
dbTrans = dbContextTrans.GetDbTransaction();
}
var conn = dbTrans?.Connection;
await conn.ExecuteAsync(PrepareSql(), message, dbTrans);
}
#region private methods
private string PrepareSql()
{
return
$"INSERT INTO \"{_options.Schema}\".\"published\" (\"Id\",\"Name\",\"Content\",\"Retries\",\"Added\",\"ExpiresAt\",\"StatusName\")VALUES(@Id,@Name,@Content,@Retries,@Added,@ExpiresAt,@StatusName);";
}
private IDbConnection InitDbConnection()
{
var conn = new NpgsqlConnection(_options.ConnectionString);
conn.Open();
return conn;
}
#endregion private methods
}
}
\ No newline at end of file
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System.Data;
using System.Diagnostics;
using Microsoft.EntityFrameworkCore.Infrastructure;
using Microsoft.EntityFrameworkCore.Storage;
// ReSharper disable once CheckNamespace
namespace DotNetCore.CAP
{
public class PostgreSqlCapTransaction : CapTransactionBase
{
public PostgreSqlCapTransaction(IDispatcher dispatcher) : base(dispatcher)
{
}
public override void Commit()
{
Debug.Assert(DbTransaction != null);
switch (DbTransaction)
{
case IDbTransaction dbTransaction:
dbTransaction.Commit();
break;
case IDbContextTransaction dbContextTransaction:
dbContextTransaction.Commit();
break;
}
Flush();
}
public override void Rollback()
{
Debug.Assert(DbTransaction != null);
switch (DbTransaction)
{
case IDbTransaction dbTransaction:
dbTransaction.Rollback();
break;
case IDbContextTransaction dbContextTransaction:
dbContextTransaction.Rollback();
break;
}
}
public override void Dispose()
{
(DbTransaction as IDbTransaction)?.Dispose();
}
}
public static class CapTransactionExtensions
{
public static ICapTransaction Begin(this ICapTransaction transaction,
IDbTransaction dbTransaction, bool autoCommit = false)
{
transaction.DbTransaction = dbTransaction;
transaction.AutoCommit = autoCommit;
return transaction;
}
public static ICapTransaction Begin(this ICapTransaction transaction,
IDbContextTransaction dbTransaction, bool autoCommit = false)
{
transaction.DbTransaction = dbTransaction;
transaction.AutoCommit = autoCommit;
return transaction;
}
public static ICapTransaction BeginTransaction(this IDbConnection dbConnection,
ICapPublisher publisher, bool autoCommit = false)
{
if (dbConnection.State == ConnectionState.Closed)
{
dbConnection.Open();
}
var dbTransaction = dbConnection.BeginTransaction();
return publisher.Transaction.Begin(dbTransaction, autoCommit);
}
public static IDbContextTransaction BeginTransaction(this DatabaseFacade database,
ICapPublisher publisher, bool autoCommit = false)
{
var trans = database.BeginTransaction();
var capTrans = publisher.Transaction.Begin(trans, autoCommit);
return new CapEFDbTransaction(capTrans);
}
}
}
\ No newline at end of file
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using DotNetCore.CAP;
// ReSharper disable once CheckNamespace
namespace Microsoft.EntityFrameworkCore.Storage
{
internal class CapEFDbTransaction : IDbContextTransaction
{
private readonly ICapTransaction _transaction;
public CapEFDbTransaction(ICapTransaction transaction)
{
_transaction = transaction;
var dbContextTransaction = (IDbContextTransaction) _transaction.DbTransaction;
TransactionId = dbContextTransaction.TransactionId;
}
public void Dispose()
{
_transaction.Dispose();
}
public void Commit()
{
_transaction.Commit();
}
public void Rollback()
{
_transaction.Rollback();
}
public Guid TransactionId { get; }
}
}
\ No newline at end of file
......@@ -100,10 +100,8 @@ namespace DotNetCore.CAP.PostgreSql
var batchSql = $@"
CREATE SCHEMA IF NOT EXISTS ""{schema}"";
DROP TABLE IF EXISTS ""{schema}"".""queue"";
CREATE TABLE IF NOT EXISTS ""{schema}"".""received""(
""Id"" SERIAL PRIMARY KEY NOT NULL,
""Id"" BIGINT PRIMARY KEY NOT NULL,
""Name"" VARCHAR(200) NOT NULL,
""Group"" VARCHAR(200) NULL,
""Content"" TEXT NULL,
......@@ -114,7 +112,7 @@ CREATE TABLE IF NOT EXISTS ""{schema}"".""received""(
);
CREATE TABLE IF NOT EXISTS ""{schema}"".""published""(
""Id"" SERIAL PRIMARY KEY NOT NULL,
""Id"" BIGINT PRIMARY KEY NOT NULL,
""Name"" VARCHAR(200) NOT NULL,
""Content"" TEXT NULL,
""Retries"" INT NOT NULL,
......
......@@ -28,7 +28,7 @@ namespace DotNetCore.CAP.PostgreSql
return new PostgreSqlStorageTransaction(this);
}
public async Task<CapPublishedMessage> GetPublishedMessageAsync(int id)
public async Task<CapPublishedMessage> GetPublishedMessageAsync(long id)
{
var sql = $"SELECT * FROM \"{Options.Schema}\".\"published\" WHERE \"Id\"={id} FOR UPDATE SKIP LOCKED";
......@@ -50,7 +50,7 @@ namespace DotNetCore.CAP.PostgreSql
}
}
public async Task<int> StoreReceivedMessageAsync(CapReceivedMessage message)
public void StoreReceivedMessage(CapReceivedMessage message)
{
if (message == null)
{
......@@ -58,15 +58,15 @@ namespace DotNetCore.CAP.PostgreSql
}
var sql =
$"INSERT INTO \"{Options.Schema}\".\"received\"(\"Name\",\"Group\",\"Content\",\"Retries\",\"Added\",\"ExpiresAt\",\"StatusName\")VALUES(@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName) RETURNING \"Id\";";
$"INSERT INTO \"{Options.Schema}\".\"received\"(\"Id\",\"Name\",\"Group\",\"Content\",\"Retries\",\"Added\",\"ExpiresAt\",\"StatusName\")VALUES(@Id,@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName) RETURNING \"Id\";";
using (var connection = new NpgsqlConnection(Options.ConnectionString))
{
return await connection.ExecuteScalarAsync<int>(sql, message);
connection.Execute(sql, message);
}
}
public async Task<CapReceivedMessage> GetReceivedMessageAsync(int id)
public async Task<CapReceivedMessage> GetReceivedMessageAsync(long id)
{
var sql = $"SELECT * FROM \"{Options.Schema}\".\"received\" WHERE \"Id\"={id} FOR UPDATE SKIP LOCKED";
using (var connection = new NpgsqlConnection(Options.ConnectionString))
......@@ -90,7 +90,7 @@ namespace DotNetCore.CAP.PostgreSql
{
}
public bool ChangePublishedState(int messageId, string state)
public bool ChangePublishedState(long messageId, string state)
{
var sql =
$"UPDATE \"{Options.Schema}\".\"published\" SET \"Retries\"=\"Retries\"+1,\"ExpiresAt\"=NULL,\"StatusName\" = '{state}' WHERE \"Id\"={messageId}";
......@@ -101,7 +101,7 @@ namespace DotNetCore.CAP.PostgreSql
}
}
public bool ChangeReceivedState(int messageId, string state)
public bool ChangeReceivedState(long messageId, string state)
{
var sql =
$"UPDATE \"{Options.Schema}\".\"received\" SET \"Retries\"=\"Retries\"+1,\"ExpiresAt\"=NULL,\"StatusName\" = '{state}' WHERE \"Id\"={messageId}";
......
......@@ -35,9 +35,7 @@ namespace DotNetCore.CAP.PostgreSql
}
var sql =
$@"UPDATE ""{
_schema
}"".""published"" SET ""Retries""=@Retries,""Content""= @Content,""ExpiresAt""=@ExpiresAt,""StatusName""=@StatusName WHERE ""Id""=@Id;";
$@"UPDATE ""{_schema}"".""published"" SET ""Retries""=@Retries,""Content""= @Content,""ExpiresAt""=@ExpiresAt,""StatusName""=@StatusName WHERE ""Id""=@Id;";
_dbConnection.Execute(sql, message, _dbTransaction);
}
......@@ -49,9 +47,7 @@ namespace DotNetCore.CAP.PostgreSql
}
var sql =
$@"UPDATE ""{
_schema
}"".""received"" SET ""Retries""=@Retries,""Content""= @Content,""ExpiresAt""=@ExpiresAt,""StatusName""=@StatusName WHERE ""Id""=@Id;";
$@"UPDATE ""{_schema}"".""received"" SET ""Retries""=@Retries,""Content""= @Content,""ExpiresAt""=@ExpiresAt,""StatusName""=@StatusName WHERE ""Id""=@Id;";
_dbConnection.Execute(sql, message, _dbTransaction);
}
......@@ -66,29 +62,5 @@ namespace DotNetCore.CAP.PostgreSql
_dbTransaction.Dispose();
_dbConnection.Dispose();
}
public void EnqueueMessage(CapPublishedMessage message)
{
if (message == null)
{
throw new ArgumentNullException(nameof(message));
}
var sql = $@"INSERT INTO ""{_schema}"".""queue"" values(@MessageId,@MessageType);";
_dbConnection.Execute(sql, new CapQueue {MessageId = message.Id, MessageType = MessageType.Publish},
_dbTransaction);
}
public void EnqueueMessage(CapReceivedMessage message)
{
if (message == null)
{
throw new ArgumentNullException(nameof(message));
}
var sql = $@"INSERT INTO ""{_schema}"".""queue"" values(@MessageId,@MessageType);";
_dbConnection.Execute(sql, new CapQueue {MessageId = message.Id, MessageType = MessageType.Subscribe},
_dbTransaction);
}
}
}
\ No newline at end of file
......@@ -4,6 +4,7 @@
using System;
using DotNetCore.CAP.Processor;
using DotNetCore.CAP.SqlServer;
using DotNetCore.CAP.SqlServer.Diagnostics;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.DependencyInjection;
......@@ -22,11 +23,15 @@ namespace DotNetCore.CAP
public void AddServices(IServiceCollection services)
{
services.AddSingleton<CapDatabaseStorageMarkerService>();
services.AddSingleton<DiagnosticProcessorObserver>();
services.AddSingleton<IStorage, SqlServerStorage>();
services.AddSingleton<IStorageConnection, SqlServerStorageConnection>();
services.AddScoped<ICapPublisher, CapPublisher>();
services.AddScoped<ICallbackPublisher, CapPublisher>();
services.AddScoped<ICapPublisher, SqlServerPublisher>();
services.AddScoped<ICallbackPublisher, SqlServerPublisher>();
services.AddTransient<ICollectProcessor, SqlServerCollectProcessor>();
services.AddTransient<CapTransactionBase, SqlServerCapTransaction>();
AddSqlServerOptions(services);
}
......
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using System.Data;
using System.Data.SqlClient;
using System.Threading.Tasks;
using Dapper;
using DotNetCore.CAP.Abstractions;
using DotNetCore.CAP.Models;
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Storage;
using Microsoft.Extensions.Logging;
namespace DotNetCore.CAP.SqlServer
{
public class CapPublisher : CapPublisherBase, ICallbackPublisher
{
private readonly DbContext _dbContext;
private readonly SqlServerOptions _options;
public CapPublisher(ILogger<CapPublisher> logger, IDispatcher dispatcher,
IServiceProvider provider, SqlServerOptions options)
: base(logger, dispatcher)
{
ServiceProvider = provider;
_options = options;
if (_options.DbContextType == null)
{
return;
}
IsUsingEF = true;
_dbContext = (DbContext) ServiceProvider.GetService(_options.DbContextType);
}
public async Task PublishCallbackAsync(CapPublishedMessage message)
{
using (var conn = new SqlConnection(_options.ConnectionString))
{
var id = await conn.ExecuteScalarAsync<int>(PrepareSql(), message);
message.Id = id;
Enqueue(message);
}
}
protected override void PrepareConnectionForEF()
{
DbConnection = _dbContext.Database.GetDbConnection();
var dbContextTransaction = _dbContext.Database.CurrentTransaction;
var dbTrans = dbContextTransaction?.GetDbTransaction();
//DbTransaction is dispose in original
if (dbTrans?.Connection == null)
{
IsCapOpenedTrans = true;
dbContextTransaction?.Dispose();
dbContextTransaction = _dbContext.Database.BeginTransaction(IsolationLevel.ReadCommitted);
dbTrans = dbContextTransaction.GetDbTransaction();
}
DbTransaction = dbTrans;
}
protected override int Execute(IDbConnection dbConnection, IDbTransaction dbTransaction,
CapPublishedMessage message)
{
return dbConnection.ExecuteScalar<int>(PrepareSql(), message, dbTransaction);
}
protected override Task<int> ExecuteAsync(IDbConnection dbConnection, IDbTransaction dbTransaction,
CapPublishedMessage message)
{
return dbConnection.ExecuteScalarAsync<int>(PrepareSql(), message, dbTransaction);
}
#region private methods
private string PrepareSql()
{
return
$"INSERT INTO {_options.Schema}.[Published] ([Name],[Content],[Retries],[Added],[ExpiresAt],[StatusName])VALUES(@Name,@Content,@Retries,@Added,@ExpiresAt,@StatusName);SELECT SCOPE_IDENTITY();";
}
#endregion private methods
}
}
\ No newline at end of file
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Data.SqlClient;
using System.Reflection;
using DotNetCore.CAP.Models;
namespace DotNetCore.CAP.SqlServer.Diagnostics
{
internal class DiagnosticObserver : IObserver<KeyValuePair<string, object>>
{
private readonly IDispatcher _dispatcher;
private readonly ConcurrentDictionary<Guid, List<CapPublishedMessage>> _bufferList;
public DiagnosticObserver(IDispatcher dispatcher,
ConcurrentDictionary<Guid, List<CapPublishedMessage>> bufferList)
{
_dispatcher = dispatcher;
_bufferList = bufferList;
}
private const string SqlClientPrefix = "System.Data.SqlClient.";
public const string SqlAfterCommitTransaction = SqlClientPrefix + "WriteTransactionCommitAfter";
public const string SqlErrorCommitTransaction = SqlClientPrefix + "WriteTransactionCommitError";
public void OnCompleted()
{
}
public void OnError(Exception error)
{
}
public void OnNext(KeyValuePair<string, object> evt)
{
if (evt.Key == SqlAfterCommitTransaction)
{
var sqlConnection = (SqlConnection) GetProperty(evt.Value, "Connection");
var transactionKey = sqlConnection.ClientConnectionId;
if (_bufferList.TryRemove(transactionKey, out var msgList))
{
foreach (var message in msgList)
{
_dispatcher.EnqueueToPublish(message);
}
}
}
else if (evt.Key == SqlErrorCommitTransaction)
{
var sqlConnection = (SqlConnection) GetProperty(evt.Value, "Connection");
var transactionKey = sqlConnection.ClientConnectionId;
_bufferList.TryRemove(transactionKey, out _);
}
}
static object GetProperty(object _this, string propertyName)
{
return _this.GetType().GetTypeInfo().GetDeclaredProperty(propertyName)?.GetValue(_this);
}
}
}
\ No newline at end of file
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using DotNetCore.CAP.Models;
namespace DotNetCore.CAP.SqlServer.Diagnostics
{
public class DiagnosticProcessorObserver : IObserver<DiagnosticListener>
{
private readonly IDispatcher _dispatcher;
public const string DiagnosticListenerName = "SqlClientDiagnosticListener";
public ConcurrentDictionary<Guid, List<CapPublishedMessage>> BufferList { get; }
public DiagnosticProcessorObserver(IDispatcher dispatcher)
{
_dispatcher = dispatcher;
BufferList = new ConcurrentDictionary<Guid, List<CapPublishedMessage>>();
}
public void OnCompleted()
{
}
public void OnError(Exception error)
{
}
public void OnNext(DiagnosticListener listener)
{
if (listener.Name == DiagnosticListenerName)
{
listener.Subscribe(new DiagnosticObserver(_dispatcher, BufferList));
}
}
}
}
\ No newline at end of file
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using System.Data;
using System.Data.SqlClient;
using System.Threading;
using System.Threading.Tasks;
using Dapper;
using DotNetCore.CAP.Abstractions;
using DotNetCore.CAP.Models;
using Microsoft.EntityFrameworkCore.Storage;
using Microsoft.Extensions.DependencyInjection;
namespace DotNetCore.CAP.SqlServer
{
public class SqlServerPublisher : CapPublisherBase, ICallbackPublisher
{
private readonly SqlServerOptions _options;
public SqlServerPublisher(IServiceProvider provider) : base(provider)
{
_options = ServiceProvider.GetService<SqlServerOptions>();
}
public async Task PublishCallbackAsync(CapPublishedMessage message)
{
await PublishAsyncInternal(message);
}
protected override async Task ExecuteAsync(CapPublishedMessage message, ICapTransaction transaction,
CancellationToken cancel = default(CancellationToken))
{
if (NotUseTransaction)
{
using (var connection = new SqlConnection(_options.ConnectionString))
{
await connection.ExecuteAsync(PrepareSql(), message);
return;
}
}
var dbTrans = transaction.DbTransaction as IDbTransaction;
if (dbTrans == null && transaction.DbTransaction is IDbContextTransaction dbContextTrans)
{
dbTrans = dbContextTrans.GetDbTransaction();
}
var conn = dbTrans?.Connection;
await conn.ExecuteAsync(PrepareSql(), message, dbTrans);
}
#region private methods
private string PrepareSql()
{
return
$"INSERT INTO {_options.Schema}.[Published] ([Id],[Name],[Content],[Retries],[Added],[ExpiresAt],[StatusName])VALUES(@Id,@Name,@Content,@Retries,@Added,@ExpiresAt,@StatusName);";
}
#endregion private methods
}
}
\ No newline at end of file
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using System.Collections.Generic;
using System.Data;
using System.Data.SqlClient;
using DotNetCore.CAP.Internal;
using DotNetCore.CAP.Models;
using DotNetCore.CAP.SqlServer.Diagnostics;
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Infrastructure;
using Microsoft.EntityFrameworkCore.Storage;
using Microsoft.Extensions.DependencyInjection;
// ReSharper disable once CheckNamespace
namespace DotNetCore.CAP
{
public class SqlServerCapTransaction : CapTransactionBase
{
private readonly DbContext _dbContext;
private readonly DiagnosticProcessorObserver _diagnosticProcessor;
public SqlServerCapTransaction(
IDispatcher dispatcher,
SqlServerOptions sqlServerOptions,
IServiceProvider serviceProvider) : base(dispatcher)
{
if (sqlServerOptions.DbContextType != null)
{
_dbContext = serviceProvider.GetService(sqlServerOptions.DbContextType) as DbContext;
}
_diagnosticProcessor = serviceProvider.GetRequiredService<DiagnosticProcessorObserver>();
}
protected override void AddToSent(CapPublishedMessage msg)
{
if (DbTransaction is NoopTransaction)
{
base.AddToSent(msg);
return;
}
var dbTransaction = DbTransaction as IDbTransaction;
if (dbTransaction == null)
{
if (DbTransaction is IDbContextTransaction dbContextTransaction)
{
dbTransaction = dbContextTransaction.GetDbTransaction();
}
if (dbTransaction == null)
{
throw new ArgumentNullException(nameof(DbTransaction));
}
}
var transactionKey = ((SqlConnection)dbTransaction.Connection).ClientConnectionId;
if (_diagnosticProcessor.BufferList.TryGetValue(transactionKey, out var list))
{
list.Add(msg);
}
else
{
var msgList = new List<CapPublishedMessage>(1) { msg };
_diagnosticProcessor.BufferList.TryAdd(transactionKey, msgList);
}
}
public override void Commit()
{
switch (DbTransaction)
{
case NoopTransaction _:
Flush();
break;
case IDbTransaction dbTransaction:
dbTransaction.Commit();
break;
case IDbContextTransaction dbContextTransaction:
_dbContext?.SaveChanges();
dbContextTransaction.Commit();
break;
}
}
public override void Rollback()
{
switch (DbTransaction)
{
case IDbTransaction dbTransaction:
dbTransaction.Rollback();
break;
case IDbContextTransaction dbContextTransaction:
dbContextTransaction.Rollback();
break;
}
}
public override void Dispose()
{
switch (DbTransaction)
{
case IDbTransaction dbTransaction:
dbTransaction.Dispose();
break;
case IDbContextTransaction dbContextTransaction:
dbContextTransaction.Dispose();
break;
}
}
}
public static class CapTransactionExtensions
{
public static ICapTransaction Begin(this ICapTransaction transaction,
IDbTransaction dbTransaction, bool autoCommit = false)
{
transaction.DbTransaction = dbTransaction;
transaction.AutoCommit = autoCommit;
return transaction;
}
public static IDbTransaction BeginTransaction(this IDbConnection dbConnection,
ICapPublisher publisher, bool autoCommit = false)
{
if (dbConnection.State == ConnectionState.Closed)
{
dbConnection.Open();
}
var dbTransaction = dbConnection.BeginTransaction();
var capTransaction = publisher.Transaction.Begin(dbTransaction, autoCommit);
return (IDbTransaction)capTransaction.DbTransaction;
}
public static ICapTransaction Begin(this ICapTransaction transaction,
IDbContextTransaction dbTransaction, bool autoCommit = false)
{
transaction.DbTransaction = dbTransaction;
transaction.AutoCommit = autoCommit;
return transaction;
}
public static IDbContextTransaction BeginTransaction(this DatabaseFacade database,
ICapPublisher publisher, bool autoCommit = false)
{
var trans = database.BeginTransaction();
var capTrans = publisher.Transaction.Begin(trans, autoCommit);
return new CapEFDbTransaction(capTrans);
}
}
}
\ No newline at end of file
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using DotNetCore.CAP;
// ReSharper disable once CheckNamespace
namespace Microsoft.EntityFrameworkCore.Storage
{
internal class CapEFDbTransaction : IDbContextTransaction
{
private readonly ICapTransaction _transaction;
public CapEFDbTransaction(ICapTransaction transaction)
{
_transaction = transaction;
var dbContextTransaction = (IDbContextTransaction) _transaction.DbTransaction;
TransactionId = dbContextTransaction.TransactionId;
}
public void Dispose()
{
_transaction.Dispose();
}
public void Commit()
{
_transaction.Commit();
}
public void Rollback()
{
_transaction.Rollback();
}
public Guid TransactionId { get; }
}
}
\ No newline at end of file
......@@ -4,10 +4,12 @@
using System;
using System.Data;
using System.Data.SqlClient;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using Dapper;
using DotNetCore.CAP.Dashboard;
using DotNetCore.CAP.SqlServer.Diagnostics;
using Microsoft.Extensions.Logging;
namespace DotNetCore.CAP.SqlServer
......@@ -18,12 +20,15 @@ namespace DotNetCore.CAP.SqlServer
private readonly IDbConnection _existingConnection = null;
private readonly ILogger _logger;
private readonly SqlServerOptions _options;
private readonly DiagnosticProcessorObserver _diagnosticProcessorObserver;
public SqlServerStorage(ILogger<SqlServerStorage> logger,
CapOptions capOptions,
SqlServerOptions options)
SqlServerOptions options,
DiagnosticProcessorObserver diagnosticProcessorObserver)
{
_options = options;
_diagnosticProcessorObserver = diagnosticProcessorObserver;
_logger = logger;
_capOptions = capOptions;
}
......@@ -38,7 +43,7 @@ namespace DotNetCore.CAP.SqlServer
return new SqlServerMonitoringApi(this, _options);
}
public async Task InitializeAsync(CancellationToken cancellationToken)
public async Task InitializeAsync(CancellationToken cancellationToken = default(CancellationToken))
{
if (cancellationToken.IsCancellationRequested)
{
......@@ -53,6 +58,8 @@ namespace DotNetCore.CAP.SqlServer
}
_logger.LogDebug("Ensuring all create database tables script are applied.");
DiagnosticListener.AllListeners.Subscribe(_diagnosticProcessorObserver);
}
protected virtual string CreateDbTablesScript(string schema)
......@@ -64,15 +71,10 @@ BEGIN
EXEC('CREATE SCHEMA [{schema}]')
END;
IF OBJECT_ID(N'[{schema}].[Queue]',N'U') IS NOT NULL
BEGIN
DROP TABLE [{schema}].[Queue];
END;
IF OBJECT_ID(N'[{schema}].[Received]',N'U') IS NULL
BEGIN
CREATE TABLE [{schema}].[Received](
[Id] [int] IDENTITY(1,1) NOT NULL,
[Id] [bigint] NOT NULL,
[Name] [nvarchar](200) NOT NULL,
[Group] [nvarchar](200) NULL,
[Content] [nvarchar](max) NULL,
......@@ -90,7 +92,7 @@ END;
IF OBJECT_ID(N'[{schema}].[Published]',N'U') IS NULL
BEGIN
CREATE TABLE [{schema}].[Published](
[Id] [int] IDENTITY(1,1) NOT NULL,
[Id] [bigint] NOT NULL,
[Name] [nvarchar](200) NOT NULL,
[Content] [nvarchar](max) NULL,
[Retries] [int] NOT NULL,
......
......@@ -28,7 +28,7 @@ namespace DotNetCore.CAP.SqlServer
return new SqlServerStorageTransaction(this);
}
public async Task<CapPublishedMessage> GetPublishedMessageAsync(int id)
public async Task<CapPublishedMessage> GetPublishedMessageAsync(long id)
{
var sql = $@"SELECT * FROM [{Options.Schema}].[Published] WITH (readpast) WHERE Id={id}";
......@@ -50,7 +50,7 @@ namespace DotNetCore.CAP.SqlServer
}
}
public async Task<int> StoreReceivedMessageAsync(CapReceivedMessage message)
public void StoreReceivedMessage(CapReceivedMessage message)
{
if (message == null)
{
......@@ -58,16 +58,16 @@ namespace DotNetCore.CAP.SqlServer
}
var sql = $@"
INSERT INTO [{Options.Schema}].[Received]([Name],[Group],[Content],[Retries],[Added],[ExpiresAt],[StatusName])
VALUES(@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName);SELECT SCOPE_IDENTITY();";
INSERT INTO [{Options.Schema}].[Received]([Id],[Name],[Group],[Content],[Retries],[Added],[ExpiresAt],[StatusName])
VALUES(@Id,@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName);";
using (var connection = new SqlConnection(Options.ConnectionString))
{
return await connection.ExecuteScalarAsync<int>(sql, message);
connection.Execute(sql, message);
}
}
public async Task<CapReceivedMessage> GetReceivedMessageAsync(int id)
public async Task<CapReceivedMessage> GetReceivedMessageAsync(long id)
{
var sql = $@"SELECT * FROM [{Options.Schema}].[Received] WITH (readpast) WHERE Id={id}";
using (var connection = new SqlConnection(Options.ConnectionString))
......@@ -87,7 +87,7 @@ VALUES(@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName);SELECT SCOP
}
}
public bool ChangePublishedState(int messageId, string state)
public bool ChangePublishedState(long messageId, string state)
{
var sql =
$"UPDATE [{Options.Schema}].[Published] SET Retries=Retries+1,ExpiresAt=NULL,StatusName = '{state}' WHERE Id={messageId}";
......@@ -98,7 +98,7 @@ VALUES(@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName);SELECT SCOP
}
}
public bool ChangeReceivedState(int messageId, string state)
public bool ChangeReceivedState(long messageId, string state)
{
var sql =
$"UPDATE [{Options.Schema}].[Received] SET Retries=Retries+1,ExpiresAt=NULL,StatusName = '{state}' WHERE Id={messageId}";
......@@ -108,9 +108,5 @@ VALUES(@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName);SELECT SCOP
return connection.Execute(sql) > 0;
}
}
public void Dispose()
{
}
}
}
\ No newline at end of file
......@@ -14,7 +14,6 @@ namespace DotNetCore.CAP.SqlServer
{
private readonly IDbConnection _dbConnection;
private readonly IDbTransaction _dbTransaction;
private readonly string _schema;
public SqlServerStorageTransaction(SqlServerStorageConnection connection)
......@@ -24,7 +23,6 @@ namespace DotNetCore.CAP.SqlServer
_dbConnection = new SqlConnection(options.ConnectionString);
_dbConnection.Open();
_dbTransaction = _dbConnection.BeginTransaction(IsolationLevel.ReadCommitted);
}
public void UpdateMessage(CapPublishedMessage message)
......@@ -36,7 +34,7 @@ namespace DotNetCore.CAP.SqlServer
var sql =
$"UPDATE [{_schema}].[Published] SET [Retries] = @Retries,[Content] = @Content,[ExpiresAt] = @ExpiresAt,[StatusName]=@StatusName WHERE Id=@Id;";
_dbConnection.Execute(sql, message, _dbTransaction);
_dbConnection.Execute(sql, message);
}
public void UpdateMessage(CapReceivedMessage message)
......@@ -48,43 +46,17 @@ namespace DotNetCore.CAP.SqlServer
var sql =
$"UPDATE [{_schema}].[Received] SET [Retries] = @Retries,[Content] = @Content,[ExpiresAt] = @ExpiresAt,[StatusName]=@StatusName WHERE Id=@Id;";
_dbConnection.Execute(sql, message, _dbTransaction);
_dbConnection.Execute(sql, message);
}
public Task CommitAsync()
{
_dbTransaction.Commit();
return Task.CompletedTask;
}
public void Dispose()
{
_dbTransaction.Dispose();
_dbConnection.Dispose();
}
public void EnqueueMessage(CapPublishedMessage message)
{
if (message == null)
{
throw new ArgumentNullException(nameof(message));
}
var sql = $"INSERT INTO [{_schema}].[Queue] values(@MessageId,@MessageType);";
_dbConnection.Execute(sql, new CapQueue {MessageId = message.Id, MessageType = MessageType.Publish},
_dbTransaction);
}
public void EnqueueMessage(CapReceivedMessage message)
{
if (message == null)
{
throw new ArgumentNullException(nameof(message));
}
var sql = $"INSERT INTO [{_schema}].[Queue] values(@MessageId,@MessageType);";
_dbConnection.Execute(sql, new CapQueue {MessageId = message.Id, MessageType = MessageType.Subscribe},
_dbTransaction);
}
}
}
\ No newline at end of file
......@@ -4,6 +4,7 @@
using System;
using DotNetCore.CAP;
using DotNetCore.CAP.Dashboard.GatewayProxy;
using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.DependencyInjection;
// ReSharper disable once CheckNamespace
......@@ -12,7 +13,7 @@ namespace Microsoft.AspNetCore.Builder
/// <summary>
/// app extensions for <see cref="IApplicationBuilder" />
/// </summary>
public static class AppBuilderExtensions
internal static class AppBuilderExtensions
{
/// <summary>
/// Enables cap for the current application
......@@ -70,4 +71,17 @@ namespace Microsoft.AspNetCore.Builder
}
}
}
sealed class CapStartupFilter : IStartupFilter
{
public Action<IApplicationBuilder> Configure(Action<IApplicationBuilder> next)
{
return app =>
{
app.UseCap();
next(app);
};
}
}
}
\ No newline at end of file
......@@ -8,6 +8,8 @@ using DotNetCore.CAP.Abstractions;
using DotNetCore.CAP.Internal;
using DotNetCore.CAP.Processor;
using DotNetCore.CAP.Processor.States;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.DependencyInjection.Extensions;
// ReSharper disable once CheckNamespace
......@@ -24,9 +26,7 @@ namespace Microsoft.Extensions.DependencyInjection
/// <param name="services">The services available in the application.</param>
/// <param name="setupAction">An action to configure the <see cref="CapOptions" />.</param>
/// <returns>An <see cref="CapBuilder" /> for application services.</returns>
public static CapBuilder AddCap(
this IServiceCollection services,
Action<CapOptions> setupAction)
public static CapBuilder AddCap(this IServiceCollection services, Action<CapOptions> setupAction)
{
if (setupAction == null)
{
......@@ -34,8 +34,8 @@ namespace Microsoft.Extensions.DependencyInjection
}
services.TryAddSingleton<CapMarkerService>();
services.Configure(setupAction);
//Consumer service
AddSubscribeServices(services);
//Serializer and model binder
......@@ -49,18 +49,18 @@ namespace Microsoft.Extensions.DependencyInjection
services.TryAddSingleton<MethodMatcherCache>();
//Bootstrapper and Processors
services.AddSingleton<IProcessingServer, ConsumerHandler>();
services.AddSingleton<IProcessingServer, CapProcessingServer>();
services.AddSingleton<IBootstrapper, DefaultBootstrapper>();
services.AddSingleton<IStateChanger, StateChanger>();
services.TryAddEnumerable(ServiceDescriptor.Singleton<IProcessingServer, ConsumerHandler>());
services.TryAddEnumerable(ServiceDescriptor.Singleton<IProcessingServer, CapProcessingServer>());
services.TryAddSingleton<IBootstrapper, DefaultBootstrapper>();
services.TryAddSingleton<IStateChanger, StateChanger>();
//Queue's message processor
services.AddTransient<NeedRetryMessageProcessor>();
services.TryAddSingleton<NeedRetryMessageProcessor>();
//Sender and Executors
services.AddSingleton<IDispatcher, Dispatcher>();
services.TryAddSingleton<IDispatcher, Dispatcher>();
// Warning: IPublishMessageSender need to inject at extension project.
services.AddSingleton<ISubscriberExecutor, DefaultSubscriberExecutor>();
services.TryAddSingleton<ISubscriberExecutor, DefaultSubscriberExecutor>();
//Options and extension service
var options = new CapOptions();
......@@ -69,9 +69,11 @@ namespace Microsoft.Extensions.DependencyInjection
{
serviceExtension.AddServices(services);
}
services.AddSingleton(options);
//Startup and Middleware
services.AddTransient<IStartupFilter, CapStartupFilter>();
return new CapBuilder(services);
}
......@@ -90,7 +92,7 @@ namespace Microsoft.Extensions.DependencyInjection
foreach (var service in consumerListenerServices)
{
services.AddTransient(service.Key, service.Value);
services.TryAddEnumerable(ServiceDescriptor.Transient(service.Key, service.Value));
}
}
}
......
......@@ -82,14 +82,14 @@ namespace DotNetCore.CAP.Dashboard
Routes.AddJsonResult("/published/message/(?<Id>.+)", x =>
{
var id = int.Parse(x.UriMatch.Groups["Id"].Value);
var id = long.Parse(x.UriMatch.Groups["Id"].Value);
var message = x.Storage.GetConnection().GetPublishedMessageAsync(id)
.GetAwaiter().GetResult();
return message.Content;
});
Routes.AddJsonResult("/received/message/(?<Id>.+)", x =>
{
var id = int.Parse(x.UriMatch.Groups["Id"].Value);
var id = long.Parse(x.UriMatch.Groups["Id"].Value);
var message = x.Storage.GetConnection().GetReceivedMessageAsync(id)
.GetAwaiter().GetResult();
return message.Content;
......
......@@ -7,7 +7,7 @@ namespace DotNetCore.CAP.Dashboard.Monitoring
{
public class MessageDto
{
public int Id { get; set; }
public long Id { get; set; }
public string Group { get; set; }
......
......@@ -34,13 +34,9 @@
<PackageReference Include="Consul" Version="0.7.2.6" />
<PackageReference Include="Microsoft.AspNetCore.Hosting.Abstractions" Version="2.1.0" />
<PackageReference Include="Microsoft.AspNetCore.Http.Abstractions" Version="2.1.0" />
<PackageReference Include="Microsoft.Extensions.Options" Version="2.1.0" />
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="2.1.0" />
<PackageReference Include="Newtonsoft.Json" Version="11.0.2" />
<PackageReference Include="System.Data.Common" Version="4.3.0" />
<PackageReference Include="System.Diagnostics.DiagnosticSource" Version="4.5.0" />
<PackageReference Include="System.Threading.ThreadPool" Version="4.3.0" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="2.1.0" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="2.1.0" />
</ItemGroup>
......
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System.Data;
using System.Threading;
using System.Threading.Tasks;
using DotNetCore.CAP.Abstractions;
namespace DotNetCore.CAP
{
......@@ -12,66 +11,23 @@ namespace DotNetCore.CAP
/// </summary>
public interface ICapPublisher
{
/// <summary>
/// (EntityFramework) Asynchronous publish an object message.
/// <para>
/// If you are using the EntityFramework, you need to configure the DbContextType first.
/// otherwise you need to use overloaded method with IDbTransaction.
/// </para>
/// </summary>
/// <typeparam name="T">The type of content object.</typeparam>
/// <param name="name">the topic name or exchange router key.</param>
/// <param name="contentObj">message body content, that will be serialized of json.</param>
/// <param name="callbackName">callback subscriber name</param>
Task PublishAsync<T>(string name, T contentObj, string callbackName = null);
ICapTransaction Transaction { get; }
/// <summary>
/// (EntityFramework) Publish an object message.
/// <para>
/// If you are using the EntityFramework, you need to configure the DbContextType first.
/// otherwise you need to use overloaded method with IDbTransaction.
/// </para>
/// Asynchronous publish an object message.
/// </summary>
/// <typeparam name="T">The type of content object.</typeparam>
/// <param name="name">the topic name or exchange router key.</param>
/// <param name="contentObj">message body content, that will be serialized of json.</param>
/// <param name="callbackName">callback subscriber name</param>
void Publish<T>(string name, T contentObj, string callbackName = null);
/// <param name="cancellationToken"></param>
Task PublishAsync<T>(string name, T contentObj, string callbackName = null, CancellationToken cancellationToken = default(CancellationToken));
/// <summary>
/// (ado.net) Asynchronous publish an object message.
/// Publish an object message.
/// </summary>
/// <param name="name">the topic name or exchange router key.</param>
/// <param name="contentObj">message body content, that will be serialized of json.</param>
/// <param name="dbTransaction">the transaction of <see cref="IDbTransaction" /></param>
/// <param name="callbackName">callback subscriber name</param>
Task PublishAsync<T>(string name, T contentObj, IDbTransaction dbTransaction, string callbackName = null);
/// <summary>
/// (ado.net) Publish an object message.
/// </summary>
/// <param name="name">the topic name or exchange router key.</param>
/// <param name="contentObj">message body content, that will be serialized of json.</param>
/// <param name="dbTransaction">the transaction of <see cref="IDbTransaction" /></param>
/// <param name="callbackName">callback subscriber name</param>
void Publish<T>(string name, T contentObj, IDbTransaction dbTransaction, string callbackName = null);
/// <summary>
/// Publish an object message with mongo.
/// </summary>
/// <param name="name">the topic name or exchange router key.</param>
/// <param name="contentObj">message body content, that will be serialized of json.</param>
/// <param name="mongoTransaction">if transaction was set null, the message will be published directly.</param>
/// <param name="callbackName">callback subscriber name</param>
void PublishWithMongo<T>(string name, T contentObj, IMongoTransaction mongoTransaction = null, string callbackName = null);
/// <summary>
/// Asynchronous publish an object message with mongo.
/// </summary>
/// <param name="name">the topic name or exchange router key.</param>
/// <param name="contentObj">message body content, that will be serialized of json.</param>
/// <param name="mongoTransaction">if transaction was set null, the message will be published directly.</param>
/// <param name="callbackName">callback subscriber name</param>
Task PublishWithMongoAsync<T>(string name, T contentObj, IMongoTransaction mongoTransaction = null, string callbackName = null);
void Publish<T>(string name, T contentObj, string callbackName = null);
}
}
\ No newline at end of file
using System.Collections.Generic;
using DotNetCore.CAP.Models;
namespace DotNetCore.CAP
{
public abstract class CapTransactionBase : ICapTransaction
{
private readonly IDispatcher _dispatcher;
private readonly IList<CapPublishedMessage> _bufferList;
protected CapTransactionBase(IDispatcher dispatcher)
{
_dispatcher = dispatcher;
_bufferList = new List<CapPublishedMessage>(1);
}
public bool AutoCommit { get; set; }
public object DbTransaction { get; set; }
protected internal virtual void AddToSent(CapPublishedMessage msg)
{
_bufferList.Add(msg);
}
protected void Flush()
{
foreach (var message in _bufferList)
{
_dispatcher.EnqueueToPublish(message);
}
}
public abstract void Commit();
public abstract void Rollback();
public abstract void Dispose();
}
}
using System;
namespace DotNetCore.CAP
{
public interface ICapTransaction : IDisposable
{
bool AutoCommit { get; set; }
object DbTransaction { get; set; }
void Commit();
void Rollback();
}
}
......@@ -112,6 +112,7 @@ namespace DotNetCore.CAP
var receivedMessage = new CapReceivedMessage(messageContext)
{
Id = SnowflakeId.Default().NextId(),
StatusName = StatusName.Scheduled,
Content = messageBody
};
......@@ -170,10 +171,7 @@ namespace DotNetCore.CAP
private void StoreMessage(CapReceivedMessage receivedMessage)
{
var id = _connection.StoreReceivedMessageAsync(receivedMessage)
.GetAwaiter().GetResult();
receivedMessage.Id = id;
_connection.StoreReceivedMessage(receivedMessage);
}
private (Guid, string) TracingBefore(string topic, string values)
......
......@@ -173,8 +173,6 @@ namespace DotNetCore.CAP
du);
s_diagnosticListener.WritePublishAfter(eventData);
_logger.MessageHasBeenSent(du.TotalSeconds);
}
private void TracingError(Guid operationId, CapPublishedMessage message, OperateResult result, DateTimeOffset startTime, TimeSpan du)
......
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using DotNetCore.CAP.Models;
......@@ -11,7 +10,7 @@ namespace DotNetCore.CAP
/// <summary>
/// Represents a connection to the storage.
/// </summary>
public interface IStorageConnection : IDisposable
public interface IStorageConnection
{
//Sent messages
......@@ -19,7 +18,7 @@ namespace DotNetCore.CAP
/// Returns the message with the given id.
/// </summary>
/// <param name="id">The message's id.</param>
Task<CapPublishedMessage> GetPublishedMessageAsync(int id);
Task<CapPublishedMessage> GetPublishedMessageAsync(long id);
/// <summary>
/// Returns executed failed messages.
......@@ -32,13 +31,13 @@ namespace DotNetCore.CAP
/// Stores the message.
/// </summary>
/// <param name="message">The message to store.</param>
Task<int> StoreReceivedMessageAsync(CapReceivedMessage message);
void StoreReceivedMessage(CapReceivedMessage message);
/// <summary>
/// Returns the message with the given id.
/// </summary>
/// <param name="id">The message's id.</param>
Task<CapReceivedMessage> GetReceivedMessageAsync(int id);
Task<CapReceivedMessage> GetReceivedMessageAsync(long id);
/// <summary>
/// Returns executed failed message.
......@@ -55,13 +54,13 @@ namespace DotNetCore.CAP
/// </summary>
/// <param name="messageId">Message id</param>
/// <param name="state">State name</param>
bool ChangePublishedState(int messageId, string state);
bool ChangePublishedState(long messageId, string state);
/// <summary>
/// Change specified message's state of received message
/// </summary>
/// <param name="messageId">Message id</param>
/// <param name="state">State name</param>
bool ChangeReceivedState(int messageId, string state);
bool ChangeReceivedState(long messageId, string state);
}
}
\ No newline at end of file
// Copyright 2010-2012 Twitter, Inc.
// An object that generates IDs. This is broken into a separate class in case we ever want to support multiple worker threads per process
using System;
namespace DotNetCore.CAP.Infrastructure
{
public class SnowflakeId
{
public const long Twepoch = 1288834974657L;
private const int WorkerIdBits = 5;
private const int DatacenterIdBits = 5;
private const int SequenceBits = 12;
private const long MaxWorkerId = -1L ^ (-1L << WorkerIdBits);
private const long MaxDatacenterId = -1L ^ (-1L << DatacenterIdBits);
private const int WorkerIdShift = SequenceBits;
private const int DatacenterIdShift = SequenceBits + WorkerIdBits;
public const int TimestampLeftShift = SequenceBits + WorkerIdBits + DatacenterIdBits;
private const long SequenceMask = -1L ^ (-1L << SequenceBits);
private static SnowflakeId _snowflakeId;
private readonly object _lock = new object();
private static readonly object s_lock = new object();
private long _lastTimestamp = -1L;
private SnowflakeId(long workerId, long datacenterId, long sequence = 0L)
{
WorkerId = workerId;
DatacenterId = datacenterId;
Sequence = sequence;
// sanity check for workerId
if (workerId > MaxWorkerId || workerId < 0)
throw new ArgumentException($"worker Id can't be greater than {MaxWorkerId} or less than 0");
if (datacenterId > MaxDatacenterId || datacenterId < 0)
throw new ArgumentException($"datacenter Id can't be greater than {MaxDatacenterId} or less than 0");
}
public long WorkerId { get; protected set; }
public long DatacenterId { get; protected set; }
public long Sequence { get; internal set; }
public static SnowflakeId Default(long datacenterId = 0)
{
lock (s_lock)
{
return _snowflakeId ?? (_snowflakeId = new SnowflakeId(AppDomain.CurrentDomain.Id, datacenterId));
}
}
public virtual long NextId()
{
lock (_lock)
{
var timestamp = TimeGen();
if (timestamp < _lastTimestamp)
throw new Exception(
$"InvalidSystemClock: Clock moved backwards, Refusing to generate id for {_lastTimestamp - timestamp} milliseconds");
if (_lastTimestamp == timestamp)
{
Sequence = (Sequence + 1) & SequenceMask;
if (Sequence == 0) timestamp = TilNextMillis(_lastTimestamp);
}
else
{
Sequence = 0;
}
_lastTimestamp = timestamp;
var id = ((timestamp - Twepoch) << TimestampLeftShift) |
(DatacenterId << DatacenterIdShift) |
(WorkerId << WorkerIdShift) | Sequence;
return id;
}
}
protected virtual long TilNextMillis(long lastTimestamp)
{
var timestamp = TimeGen();
while (timestamp <= lastTimestamp) timestamp = TimeGen();
return timestamp;
}
protected virtual long TimeGen()
{
return DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
}
}
}
\ No newline at end of file
......@@ -54,6 +54,7 @@ namespace DotNetCore.CAP.Internal
var publishedMessage = new CapPublishedMessage
{
Id = SnowflakeId.Default().NextId(),
Name = topicName,
Content = content,
StatusName = StatusName.Scheduled
......
......@@ -10,12 +10,12 @@ namespace DotNetCore.CAP
[SuppressMessage("ReSharper", "InconsistentNaming")]
internal static class LoggerExtensions
{
public static void ConsumerExecutedAfterThreshold(this ILogger logger, int messageId, int retries)
public static void ConsumerExecutedAfterThreshold(this ILogger logger, long messageId, int retries)
{
logger.LogWarning($"The Subscriber of the message({messageId}) still fails after {retries}th executions and we will stop retrying.");
}
public static void SenderAfterThreshold(this ILogger logger, int messageId, int retries)
public static void SenderAfterThreshold(this ILogger logger, long messageId, int retries)
{
logger.LogWarning($"The Publisher of the message({messageId}) still fails after {retries}th sends and we will stop retrying.");
}
......@@ -25,22 +25,22 @@ namespace DotNetCore.CAP
logger.LogWarning(ex, "FailedThresholdCallback action raised an exception:" + ex.Message);
}
public static void ConsumerExecutionRetrying(this ILogger logger, int messageId, int retries)
public static void ConsumerExecutionRetrying(this ILogger logger, long messageId, int retries)
{
logger.LogWarning($"The {retries}th retrying consume a message failed. message id: {messageId}");
}
public static void SenderRetrying(this ILogger logger, int messageId, int retries)
public static void SenderRetrying(this ILogger logger, long messageId, int retries)
{
logger.LogWarning($"The {retries}th retrying send a message failed. message id: {messageId} ");
}
public static void MessageHasBeenSent(this ILogger logger, double seconds)
public static void MessageHasBeenSent(this ILogger logger, string name, string content)
{
logger.LogDebug($"Message published. Took: {seconds} secs.");
logger.LogDebug($"Message published. name: {name}, content:{content}.");
}
public static void MessagePublishException(this ILogger logger, int messageId, string reason, Exception ex)
public static void MessagePublishException(this ILogger logger, long messageId, string reason, Exception ex)
{
logger.LogError(ex, $"An exception occured while publishing a message, reason:{reason}. message id:{messageId}");
}
......
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
namespace DotNetCore.CAP.Models
namespace DotNetCore.CAP.Internal
{
public class CapQueue
public class NoopTransaction
{
public int MessageId { get; set; }
/// <summary>
/// 0 is CapSentMessage, 1 is CapReceivedMessage
/// </summary>
public MessageType MessageType { get; set; }
}
}
\ No newline at end of file
}
......@@ -15,7 +15,7 @@ namespace DotNetCore.CAP.Models
Added = DateTime.Now;
}
public int Id { get; set; }
public long Id { get; set; }
public string Name { get; set; }
......
......@@ -22,7 +22,7 @@ namespace DotNetCore.CAP.Models
Content = message.Content;
}
public int Id { get; set; }
public long Id { get; set; }
public string Group { get; set; }
......
......@@ -4,7 +4,6 @@
using System;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Options;
namespace DotNetCore.CAP.Processor
{
......@@ -16,13 +15,13 @@ namespace DotNetCore.CAP.Processor
private readonly TimeSpan _waitingInterval;
public NeedRetryMessageProcessor(
IOptions<CapOptions> options,
CapOptions options,
ISubscriberExecutor subscriberExecutor,
IPublishMessageSender publishMessageSender)
{
_subscriberExecutor = subscriberExecutor;
_publishMessageSender = publishMessageSender;
_waitingInterval = TimeSpan.FromSeconds(options.Value.FailedRetryInterval);
_waitingInterval = TimeSpan.FromSeconds(options.FailedRetryInterval);
}
public async Task ProcessAsync(ProcessingContext context)
......
......@@ -17,20 +17,19 @@ namespace DotNetCore.CAP.MongoDB.Test
{
_api = new MongoDBMonitoringApi(MongoClient, MongoDBOptions);
var helper = new MongoDBUtil();
var collection = Database.GetCollection<CapPublishedMessage>(MongoDBOptions.PublishedCollection);
collection.InsertMany(new[]
{
new CapPublishedMessage
{
Id = helper.GetNextSequenceValue(Database,MongoDBOptions.PublishedCollection),
Id = SnowflakeId.Default().NextId(),
Added = DateTime.Now.AddHours(-1),
StatusName = "Failed",
Content = "abc"
},
new CapPublishedMessage
{
Id = helper.GetNextSequenceValue(Database,MongoDBOptions.PublishedCollection),
Id = SnowflakeId.Default().NextId(),
Added = DateTime.Now,
StatusName = "Failed",
Content = "bbc"
......
......@@ -11,20 +11,23 @@ namespace DotNetCore.CAP.MongoDB.Test
[Collection("MongoDB")]
public class MongoDBStorageConnectionTest : DatabaseTestHost
{
private IStorageConnection _connection =>
private IStorageConnection _connection =>
Provider.GetService<MongoDBStorage>().GetConnection();
[Fact]
public async void StoreReceivedMessageAsync_TestAsync()
public void StoreReceivedMessageAsync_TestAsync()
{
var id = await _connection.StoreReceivedMessageAsync(new CapReceivedMessage(new MessageContext
var messageContext = new MessageContext
{
Group = "test",
Name = "test",
Content = "test-content"
}));
};
id.Should().BeGreaterThan(0);
_connection.StoreReceivedMessage(new CapReceivedMessage(messageContext)
{
Id = SnowflakeId.Default().NextId()
});
}
[Fact]
......@@ -45,14 +48,17 @@ namespace DotNetCore.CAP.MongoDB.Test
msgs.Should().BeEmpty();
var id = SnowflakeId.Default().NextId();
var msg = new CapReceivedMessage
{
Id = id,
Group = "test",
Name = "test",
Content = "test-content",
StatusName = StatusName.Failed
};
var id = await _connection.StoreReceivedMessageAsync(msg);
_connection.StoreReceivedMessage(msg);
var collection = Database.GetCollection<CapReceivedMessage>(MongoDBOptions.ReceivedCollection);
......
using FluentAssertions;
using Microsoft.Extensions.DependencyInjection;
using MongoDB.Bson;
using MongoDB.Driver;
using Xunit;
......@@ -12,18 +10,12 @@ namespace DotNetCore.CAP.MongoDB.Test
[Fact]
public void InitializeAsync_Test()
{
var storage = Provider.GetService<MongoDBStorage>();
var names = MongoClient.ListDatabaseNames()?.ToList();
names.Should().Contain(MongoDBOptions.DatabaseName);
var collections = Database.ListCollectionNames()?.ToList();
collections.Should().Contain(MongoDBOptions.PublishedCollection);
collections.Should().Contain(MongoDBOptions.ReceivedCollection);
collections.Should().Contain(MongoDBOptions.CounterCollection);
var collection = Database.GetCollection<BsonDocument>(MongoDBOptions.CounterCollection);
collection.CountDocuments(new BsonDocument { { "_id", MongoDBOptions.PublishedCollection } }).Should().Be(1);
collection.CountDocuments(new BsonDocument { { "_id", MongoDBOptions.ReceivedCollection } }).Should().Be(1);
}
}
}
\ No newline at end of file
......@@ -22,17 +22,18 @@ namespace DotNetCore.CAP.MySql.Test
[Fact]
public async Task GetPublishedMessageAsync_Test()
{
var sql = "INSERT INTO `cap.published`(`Name`,`Content`,`Retries`,`Added`,`ExpiresAt`,`StatusName`) VALUES(@Name,@Content,@Retries,@Added,@ExpiresAt,@StatusName);SELECT @@IDENTITY;";
var sql = "INSERT INTO `cap.published`(`Id`,`Name`,`Content`,`Retries`,`Added`,`ExpiresAt`,`StatusName`) VALUES(@Id,@Name,@Content,@Retries,@Added,@ExpiresAt,@StatusName);";
var insertedId = SnowflakeId.Default().NextId();
var publishMessage = new CapPublishedMessage
{
Id = insertedId,
Name = "MySqlStorageConnectionTest",
Content = "",
StatusName = StatusName.Scheduled
};
var insertedId = default(int);
using (var connection = ConnectionUtil.CreateConnection())
{
insertedId = connection.QueryFirst<int>(sql, publishMessage);
await connection.ExecuteAsync(sql, publishMessage);
}
var message = await _storage.GetPublishedMessageAsync(insertedId);
Assert.NotNull(message);
......@@ -41,10 +42,11 @@ namespace DotNetCore.CAP.MySql.Test
}
[Fact]
public async Task StoreReceivedMessageAsync_Test()
public void StoreReceivedMessageAsync_Test()
{
var receivedMessage = new CapReceivedMessage
{
Id = SnowflakeId.Default().NextId(),
Name = "MySqlStorageConnectionTest",
Content = "",
Group = "mygroup",
......@@ -54,7 +56,7 @@ namespace DotNetCore.CAP.MySql.Test
Exception exception = null;
try
{
await _storage.StoreReceivedMessageAsync(receivedMessage);
_storage.StoreReceivedMessage(receivedMessage);
}
catch (Exception ex)
{
......@@ -67,23 +69,23 @@ namespace DotNetCore.CAP.MySql.Test
public async Task GetReceivedMessageAsync_Test()
{
var sql = $@"
INSERT INTO `cap.received`(`Name`,`Group`,`Content`,`Retries`,`Added`,`ExpiresAt`,`StatusName`)
VALUES(@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName);SELECT @@IDENTITY;";
INSERT INTO `cap.received`(`Id`,`Name`,`Group`,`Content`,`Retries`,`Added`,`ExpiresAt`,`StatusName`)
VALUES(@Id,@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName);";
var insertedId = SnowflakeId.Default().NextId();
var receivedMessage = new CapReceivedMessage
{
Id = insertedId,
Name = "MySqlStorageConnectionTest",
Content = "",
Group = "mygroup",
StatusName = StatusName.Scheduled
};
var insertedId = default(int);
using (var connection = ConnectionUtil.CreateConnection())
{
insertedId = connection.QueryFirst<int>(sql, receivedMessage);
await connection.ExecuteAsync(sql, receivedMessage);
}
var message = await _storage.GetReceivedMessageAsync(insertedId);
Assert.NotNull(message);
Assert.Equal(StatusName.Scheduled, message.StatusName);
Assert.Equal("MySqlStorageConnectionTest", message.Name);
......
......@@ -22,17 +22,18 @@ namespace DotNetCore.CAP.PostgreSql.Test
[Fact]
public async Task GetPublishedMessageAsync_Test()
{
var sql = @"INSERT INTO ""cap"".""published""(""Name"",""Content"",""Retries"",""Added"",""ExpiresAt"",""StatusName"") VALUES(@Name,@Content,@Retries,@Added,@ExpiresAt,@StatusName) RETURNING ""Id"";";
var sql = @"INSERT INTO ""cap"".""published""(""Id"",""Name"",""Content"",""Retries"",""Added"",""ExpiresAt"",""StatusName"") VALUES(@Id,@Name,@Content,@Retries,@Added,@ExpiresAt,@StatusName);";
var insertedId = SnowflakeId.Default().NextId();
var publishMessage = new CapPublishedMessage
{
Id = insertedId,
Name = "PostgreSqlStorageConnectionTest",
Content = "",
StatusName = StatusName.Scheduled
};
var insertedId = default(int);
using (var connection = ConnectionUtil.CreateConnection())
{
insertedId = connection.QueryFirst<int>(sql, publishMessage);
await connection.ExecuteAsync(sql, publishMessage);
}
var message = await _storage.GetPublishedMessageAsync(insertedId);
Assert.NotNull(message);
......@@ -41,10 +42,11 @@ namespace DotNetCore.CAP.PostgreSql.Test
}
[Fact]
public async Task StoreReceivedMessageAsync_Test()
public void StoreReceivedMessageAsync_Test()
{
var receivedMessage = new CapReceivedMessage
{
Id = SnowflakeId.Default().NextId(),
Name = "PostgreSqlStorageConnectionTest",
Content = "",
Group = "mygroup",
......@@ -54,7 +56,7 @@ namespace DotNetCore.CAP.PostgreSql.Test
Exception exception = null;
try
{
await _storage.StoreReceivedMessageAsync(receivedMessage);
_storage.StoreReceivedMessage(receivedMessage);
}
catch (Exception ex)
{
......@@ -67,19 +69,21 @@ namespace DotNetCore.CAP.PostgreSql.Test
public async Task GetReceivedMessageAsync_Test()
{
var sql = $@"
INSERT INTO ""cap"".""received""(""Name"",""Group"",""Content"",""Retries"",""Added"",""ExpiresAt"",""StatusName"")
VALUES(@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName) RETURNING ""Id"";";
INSERT INTO ""cap"".""received""(""Id"",""Name"",""Group"",""Content"",""Retries"",""Added"",""ExpiresAt"",""StatusName"")
VALUES(@Id,@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName);";
var insertedId = SnowflakeId.Default().NextId();
var receivedMessage = new CapReceivedMessage
{
Id= insertedId,
Name = "PostgreSqlStorageConnectionTest",
Content = "",
Group = "mygroup",
StatusName = StatusName.Scheduled
};
var insertedId = default(int);
using (var connection = ConnectionUtil.CreateConnection())
{
insertedId = connection.QueryFirst<int>(sql, receivedMessage);
await connection.ExecuteAsync(sql, receivedMessage);
}
var message = await _storage.GetReceivedMessageAsync(insertedId);
......
......@@ -6,13 +6,11 @@ namespace DotNetCore.CAP.PostgreSql.Test
[Collection("postgresql")]
public class SqlServerStorageTest : DatabaseTestHost
{
private readonly string _dbName;
private readonly string _masterDbConnectionString;
private readonly string _dbConnectionString;
public SqlServerStorageTest()
{
_dbName = ConnectionUtil.GetDatabaseName();
_masterDbConnectionString = ConnectionUtil.GetMasterConnectionString();
_dbConnectionString = ConnectionUtil.GetConnectionString();
}
......
using System;
using System.Data;
using System.Data.SqlClient;
using System.Threading;
using Dapper;
using DotNetCore.CAP.SqlServer.Diagnostics;
using Microsoft.Extensions.Logging;
using Moq;
namespace DotNetCore.CAP.SqlServer.Test
{
public abstract class DatabaseTestHost : TestHost
public abstract class DatabaseTestHost : IDisposable
{
private static bool _sqlObjectInstalled;
public static object _lock = new object();
protected ILogger<SqlServerStorage> Logger;
protected CapOptions CapOptions;
protected SqlServerOptions SqlSeverOptions;
protected DiagnosticProcessorObserver DiagnosticProcessorObserver;
protected override void PostBuildServices()
public bool SqlObjectInstalled;
protected DatabaseTestHost()
{
base.PostBuildServices();
lock (_lock)
Logger = new Mock<ILogger<SqlServerStorage>>().Object;
CapOptions = new Mock<CapOptions>().Object;
SqlSeverOptions = new SqlServerOptions()
{
if (!_sqlObjectInstalled)
{
InitializeDatabase();
}
}
ConnectionString = ConnectionUtil.GetConnectionString()
};
DiagnosticProcessorObserver = new DiagnosticProcessorObserver(new Mock<IDispatcher>().Object);
InitializeDatabase();
}
public override void Dispose()
public void Dispose()
{
DeleteAllData();
base.Dispose();
}
private void InitializeDatabase()
{
using (CreateScope())
{
var storage = GetService<SqlServerStorage>();
var token = new CancellationTokenSource().Token;
CreateDatabase();
storage.InitializeAsync(token).GetAwaiter().GetResult();
_sqlObjectInstalled = true;
}
}
private void CreateDatabase()
{
var masterConn = ConnectionUtil.GetMasterConnectionString();
var databaseName = ConnectionUtil.GetDatabaseName();
......@@ -50,8 +46,12 @@ namespace DotNetCore.CAP.SqlServer.Test
IF NOT EXISTS (SELECT * FROM sysdatabases WHERE name = N'{databaseName}')
CREATE DATABASE [{databaseName}];");
}
new SqlServerStorage(Logger, CapOptions, SqlSeverOptions, DiagnosticProcessorObserver).InitializeAsync().GetAwaiter().GetResult();
SqlObjectInstalled = true;
}
private void DeleteAllData()
{
var conn = ConnectionUtil.GetConnectionString();
......
......@@ -10,30 +10,31 @@ namespace DotNetCore.CAP.SqlServer.Test
[Collection("sqlserver")]
public class SqlServerStorageConnectionTest : DatabaseTestHost
{
private SqlServerStorageConnection _storage;
private readonly SqlServerStorageConnection _storage;
public SqlServerStorageConnectionTest()
{
var options = GetService<SqlServerOptions>();
var capOptions = GetService<CapOptions>();
_storage = new SqlServerStorageConnection(options, capOptions);
_storage = new SqlServerStorageConnection(SqlSeverOptions, CapOptions);
}
[Fact]
public async Task GetPublishedMessageAsync_Test()
{
var sql = "INSERT INTO [Cap].[Published]([Name],[Content],[Retries],[Added],[ExpiresAt],[StatusName]) OUTPUT INSERTED.Id VALUES(@Name,@Content,@Retries,@Added,@ExpiresAt,@StatusName);";
var sql = "INSERT INTO [Cap].[Published]([Id],[Name],[Content],[Retries],[Added],[ExpiresAt],[StatusName]) VALUES(@Id,@Name,@Content,@Retries,@Added,@ExpiresAt,@StatusName);";
var insertedId = SnowflakeId.Default().NextId();
var publishMessage = new CapPublishedMessage
{
Id= insertedId,
Name = "SqlServerStorageConnectionTest",
Content = "",
StatusName = StatusName.Scheduled
};
var insertedId = default(int);
using (var connection = ConnectionUtil.CreateConnection())
{
insertedId = connection.QueryFirst<int>(sql, publishMessage);
await connection.ExecuteAsync(sql, publishMessage);
}
var message = await _storage.GetPublishedMessageAsync(insertedId);
Assert.NotNull(message);
Assert.Equal("SqlServerStorageConnectionTest", message.Name);
......@@ -41,7 +42,7 @@ namespace DotNetCore.CAP.SqlServer.Test
}
[Fact]
public async Task StoreReceivedMessageAsync_Test()
public void StoreReceivedMessageAsync_Test()
{
var receivedMessage = new CapReceivedMessage
{
......@@ -54,7 +55,7 @@ namespace DotNetCore.CAP.SqlServer.Test
Exception exception = null;
try
{
await _storage.StoreReceivedMessageAsync(receivedMessage);
_storage.StoreReceivedMessage(receivedMessage);
}
catch (Exception ex)
{
......@@ -66,20 +67,20 @@ namespace DotNetCore.CAP.SqlServer.Test
[Fact]
public async Task GetReceivedMessageAsync_Test()
{
var sql = $@"
INSERT INTO [Cap].[Received]([Name],[Group],[Content],[Retries],[Added],[ExpiresAt],[StatusName]) OUTPUT INSERTED.Id
VALUES(@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName);";
var sql = @"INSERT INTO [Cap].[Received]([Id],[Name],[Group],[Content],[Retries],[Added],[ExpiresAt],[StatusName]) VALUES(@Id,@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName);";
var insertedId = SnowflakeId.Default().NextId();
var receivedMessage = new CapReceivedMessage
{
Id= insertedId,
Name = "SqlServerStorageConnectionTest",
Content = "",
Group = "mygroup",
StatusName = StatusName.Scheduled
};
var insertedId = default(int);
using (var connection = ConnectionUtil.CreateConnection())
{
insertedId = connection.QueryFirst<int>(sql, receivedMessage);
await connection.ExecuteAsync(sql, receivedMessage);
}
var message = await _storage.GetReceivedMessageAsync(insertedId);
......
using System;
using Microsoft.Extensions.DependencyInjection;
namespace DotNetCore.CAP.SqlServer.Test
{
public abstract class TestHost : IDisposable
{
protected IServiceCollection _services;
protected string _connectionString;
private IServiceProvider _provider;
private IServiceProvider _scopedProvider;
public TestHost()
{
CreateServiceCollection();
PreBuildServices();
BuildServices();
PostBuildServices();
}
protected IServiceProvider Provider => _scopedProvider ?? _provider;
private void CreateServiceCollection()
{
var services = new ServiceCollection();
services.AddOptions();
services.AddLogging();
_connectionString = ConnectionUtil.GetConnectionString();
services.AddSingleton(new SqlServerOptions { ConnectionString = _connectionString });
services.AddSingleton(new CapOptions());
services.AddSingleton<SqlServerStorage>();
_services = services;
}
protected virtual void PreBuildServices()
{
}
private void BuildServices()
{
_provider = _services.BuildServiceProvider();
}
protected virtual void PostBuildServices()
{
}
public IDisposable CreateScope()
{
var scope = CreateScope(_provider);
var loc = scope.ServiceProvider;
_scopedProvider = loc;
return new DelegateDisposable(() =>
{
if (_scopedProvider == loc)
{
_scopedProvider = null;
}
scope.Dispose();
});
}
public IServiceScope CreateScope(IServiceProvider provider)
{
var scope = provider.GetService<IServiceScopeFactory>().CreateScope();
return scope;
}
public T GetService<T>() => Provider.GetService<T>();
public T Ensure<T>(ref T service)
where T : class
=> service ?? (service = GetService<T>());
public virtual void Dispose()
{
(_provider as IDisposable)?.Dispose();
}
private class DelegateDisposable : IDisposable
{
private Action _dispose;
public DelegateDisposable(Action dispose)
{
_dispose = dispose;
}
public void Dispose()
{
_dispose();
}
}
}
}
\ No newline at end of file
using System;
using System.Data;
using System.Threading;
using System.Threading.Tasks;
using DotNetCore.CAP.Abstractions;
using DotNetCore.CAP.Models;
......@@ -119,62 +119,15 @@ namespace DotNetCore.CAP.Test
private class MyProducerService : ICapPublisher
{
public void Publish<T>(string name, T contentObj, string callbackName = null)
{
throw new NotImplementedException();
}
public ICapTransaction Transaction { get; }
public void Publish<T>(string name, T contentObj, IDbTransaction dbTransaction, string callbackName = null)
public Task PublishAsync<T>(string name, T contentObj, string callbackName = null,
CancellationToken cancellationToken = default(CancellationToken))
{
throw new NotImplementedException();
}
public void Publish<T>(string name, T contentObj, object mongoSession, string callbackName = null)
{
throw new NotImplementedException();
}
public Task PublishAsync(string topic, string content)
{
throw new NotImplementedException();
}
public Task PublishAsync<T>(string topic, T contentObj)
{
throw new NotImplementedException();
}
public Task PublishAsync(string topic, string content, IDbConnection dbConnection)
{
throw new NotImplementedException();
}
public Task PublishAsync(string topic, string content, IDbConnection dbConnection, IDbTransaction dbTransaction)
{
throw new NotImplementedException();
}
public Task PublishAsync<T>(string name, T contentObj, IDbConnection dbConnection, IDbTransaction dbTransaction = null)
{
throw new NotImplementedException();
}
public Task PublishAsync<T>(string name, T contentObj, string callbackName = null)
{
throw new NotImplementedException();
}
public Task PublishAsync<T>(string name, T contentObj, IDbTransaction dbTransaction, string callbackName = null)
{
throw new NotImplementedException();
}
public void PublishWithMongo<T>(string name, T contentObj, IMongoTransaction mongoTransaction = null, string callbackName = null)
{
throw new NotImplementedException();
}
public Task PublishWithMongoAsync<T>(string name, T contentObj, IMongoTransaction mongoTransaction = null, string callbackName = null)
public void Publish<T>(string name, T contentObj, string callbackName = null)
{
throw new NotImplementedException();
}
......
......@@ -16,6 +16,7 @@ namespace DotNetCore.CAP.Test
var fixture = Create();
var message = new CapPublishedMessage
{
Id = SnowflakeId.Default().NextId(),
StatusName = StatusName.Scheduled
};
var state = Mock.Of<IState>(s => s.Name == "s" && s.ExpiresAfter == null);
......@@ -39,6 +40,7 @@ namespace DotNetCore.CAP.Test
var fixture = Create();
var message = new CapPublishedMessage
{
Id = SnowflakeId.Default().NextId(),
StatusName = StatusName.Scheduled
};
var state = Mock.Of<IState>(s => s.Name == "s" && s.ExpiresAfter == TimeSpan.FromHours(1));
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment