Commit 77282fe1 authored by Savorboard's avatar Savorboard Committed by GitHub

Remove database queue mode (#102)

* Improve the implementation mechanism of queue mode. #96

* refactor the code .

* add Copyright & License description into header text of code files.

* refactor

* set version to 2.2.0

* update unit tests.

* add exception class to process the publish send and subscriber exectution exception

* modify first retry time to three.

* code refactor.

* add retry mechanism

* code refactor

* refactor consumer execution

* fix spell error

* remove dashboard  `processing ` content.

* Modify the retry to retry the message only 4 minutes ago.

* update samples

* update ci configuration.
parent 9829f2a8
......@@ -54,15 +54,11 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "DotNetCore.CAP.MySql.Test",
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Sample.RabbitMQ.MySql", "samples\Sample.RabbitMQ.MySql\Sample.RabbitMQ.MySql.csproj", "{9F3F9BFE-7B6A-4A7A-A6E6-8B517D611873}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Sample.RabbitMQ.SqlServer", "samples\Sample.RabbitMQ.SqlServer\Sample.RabbitMQ.SqlServer.csproj", "{AF17B956-B79E-48B7-9B5B-EB15A386B112}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "DotNetCore.CAP.PostgreSql", "src\DotNetCore.CAP.PostgreSql\DotNetCore.CAP.PostgreSql.csproj", "{82C403AB-ED68-4084-9A1D-11334F9F08F9}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Sample.RabbitMQ.PostgreSql", "samples\Sample.RabbitMQ.PostgreSql\Sample.RabbitMQ.PostgreSql.csproj", "{A17E8E72-DFFC-4822-BB38-73D59A8B264E}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "DotNetCore.CAP.PostgreSql.Test", "test\DotNetCore.CAP.PostgreSql.Test\DotNetCore.CAP.PostgreSql.Test.csproj", "{7CA3625D-1817-4695-881D-7E79A1E1DED2}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Sample.Kafka.SqlServer", "samples\Sample.Kafka.SqlServer\Sample.Kafka.SqlServer.csproj", "{573B4D39-5489-48B3-9B6C-5234249CB980}"
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Sample.Kafka.MySql", "samples\Sample.Kafka.SqlServer\Sample.Kafka.MySql.csproj", "{573B4D39-5489-48B3-9B6C-5234249CB980}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
......@@ -105,18 +101,10 @@ Global
{9F3F9BFE-7B6A-4A7A-A6E6-8B517D611873}.Debug|Any CPU.Build.0 = Debug|Any CPU
{9F3F9BFE-7B6A-4A7A-A6E6-8B517D611873}.Release|Any CPU.ActiveCfg = Release|Any CPU
{9F3F9BFE-7B6A-4A7A-A6E6-8B517D611873}.Release|Any CPU.Build.0 = Release|Any CPU
{AF17B956-B79E-48B7-9B5B-EB15A386B112}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{AF17B956-B79E-48B7-9B5B-EB15A386B112}.Debug|Any CPU.Build.0 = Debug|Any CPU
{AF17B956-B79E-48B7-9B5B-EB15A386B112}.Release|Any CPU.ActiveCfg = Release|Any CPU
{AF17B956-B79E-48B7-9B5B-EB15A386B112}.Release|Any CPU.Build.0 = Release|Any CPU
{82C403AB-ED68-4084-9A1D-11334F9F08F9}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{82C403AB-ED68-4084-9A1D-11334F9F08F9}.Debug|Any CPU.Build.0 = Debug|Any CPU
{82C403AB-ED68-4084-9A1D-11334F9F08F9}.Release|Any CPU.ActiveCfg = Release|Any CPU
{82C403AB-ED68-4084-9A1D-11334F9F08F9}.Release|Any CPU.Build.0 = Release|Any CPU
{A17E8E72-DFFC-4822-BB38-73D59A8B264E}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{A17E8E72-DFFC-4822-BB38-73D59A8B264E}.Debug|Any CPU.Build.0 = Debug|Any CPU
{A17E8E72-DFFC-4822-BB38-73D59A8B264E}.Release|Any CPU.ActiveCfg = Release|Any CPU
{A17E8E72-DFFC-4822-BB38-73D59A8B264E}.Release|Any CPU.Build.0 = Release|Any CPU
{7CA3625D-1817-4695-881D-7E79A1E1DED2}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{7CA3625D-1817-4695-881D-7E79A1E1DED2}.Debug|Any CPU.Build.0 = Debug|Any CPU
{7CA3625D-1817-4695-881D-7E79A1E1DED2}.Release|Any CPU.ActiveCfg = Release|Any CPU
......@@ -139,9 +127,7 @@ Global
{FA15685A-778A-4D2A-A2FE-27FAD2FFA65B} = {9B2AE124-6636-4DE9-83A3-70360DABD0C4}
{80A84F62-1558-427B-BA74-B47AA8A665B5} = {C09CDAB0-6DD4-46E9-B7F3-3EF2A4741EA0}
{9F3F9BFE-7B6A-4A7A-A6E6-8B517D611873} = {3A6B6931-A123-477A-9469-8B468B5385AF}
{AF17B956-B79E-48B7-9B5B-EB15A386B112} = {3A6B6931-A123-477A-9469-8B468B5385AF}
{82C403AB-ED68-4084-9A1D-11334F9F08F9} = {9B2AE124-6636-4DE9-83A3-70360DABD0C4}
{A17E8E72-DFFC-4822-BB38-73D59A8B264E} = {3A6B6931-A123-477A-9469-8B468B5385AF}
{7CA3625D-1817-4695-881D-7E79A1E1DED2} = {C09CDAB0-6DD4-46E9-B7F3-3EF2A4741EA0}
{573B4D39-5489-48B3-9B6C-5234249CB980} = {3A6B6931-A123-477A-9469-8B468B5385AF}
EndGlobalSection
......
#
# Enable MSDTC
#
Write-Host "Enabling MSDTC..." -ForegroundColor Yellow
$DTCSecurity = "Incoming"
$RegPath = "HKLM:\SOFTWARE\Microsoft\MSDTC\"
#Set Security and MSDTC path
$RegSecurityPath = "$RegPath\Security"
Set-ItemProperty path $RegSecurityPath name "NetworkDtcAccess" value 1
Set-ItemProperty path $RegSecurityPath name "NetworkDtcAccessClients" value 1
Set-ItemProperty path $RegSecurityPath name "NetworkDtcAccessTransactions" value 1
Set-ItemProperty path $RegSecurityPath name "NetworkDtcAccessInbound" value 1
Set-ItemProperty path $RegSecurityPath name "NetworkDtcAccessOutbound" value 1
Set-ItemProperty path $RegSecurityPath name "LuTransactions" value 1
if ($DTCSecurity eq "None")
{
Set-ItemProperty path $RegPath name "TurnOffRpcSecurity" value 1
Set-ItemProperty path $RegPath name "AllowOnlySecureRpcCalls" value 0
Set-ItemProperty path $RegPath name "FallbackToUnsecureRPCIfNecessary" value 0
}
elseif ($DTCSecurity eq "Incoming")
{
Set-ItemProperty path $RegPath name "TurnOffRpcSecurity" value 0
Set-ItemProperty path $RegPath name "AllowOnlySecureRpcCalls" value 0
Set-ItemProperty path $RegPath name "FallbackToUnsecureRPCIfNecessary" value 1
}
else
{
Set-ItemProperty path $RegPath name "TurnOffRpcSecurity" value 0
Set-ItemProperty path $RegPath name "AllowOnlySecureRpcCalls" value 1
Set-ItemProperty path $RegPath name "FallbackToUnsecureRPCIfNecessary" value 0
}
Restart-Service MSDTC
Write-Host "MSDTC has been configured" foregroundcolor green
......@@ -11,7 +11,6 @@ services:
- mysql
- postgresql
build_script:
- ps: ./ConfigureMSDTC.ps1
- ps: ./build.ps1
test: off
artifacts:
......
<Project>
<PropertyGroup>
<VersionMajor>2</VersionMajor>
<VersionMinor>1</VersionMinor>
<VersionPatch>4</VersionPatch>
<VersionMinor>2</VersionMinor>
<VersionPatch>0</VersionPatch>
<VersionQuality></VersionQuality>
<VersionPrefix>$(VersionMajor).$(VersionMinor).$(VersionPatch)</VersionPrefix>
</PropertyGroup>
......
using Microsoft.EntityFrameworkCore;
namespace Sample.Kafka.SqlServer
{
public class AppDbContext : DbContext
{
protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder)
{
//optionsBuilder.UseSqlServer("Server=192.168.2.206;Initial Catalog=Sample.Kafka.SqlServer;User Id=cmswuliu;Password=h7xY81agBn*Veiu3;MultipleActiveResultSets=True");
optionsBuilder.UseSqlServer("Server=192.168.2.206;Initial Catalog=TestCap;User Id=cmswuliu;Password=h7xY81agBn*Veiu3;MultipleActiveResultSets=True");
}
}
}
using System;
using DotNetCore.CAP.Abstractions;
using DotNetCore.CAP.Models;
using Newtonsoft.Json;
namespace Sample.RabbitMQ.SqlServer
{
public class MessageContent : CapMessage
{
[JsonProperty("id")]
public override string Id { get; set; }
[JsonProperty("createdTime")]
public override DateTime Timestamp { get; set; }
[JsonProperty("msgBody")]
public override string Content { get; set; }
[JsonProperty("callbackTopicName")]
public override string CallbackName { get; set; }
}
public class MyMessagePacker : IMessagePacker
{
private readonly IContentSerializer _serializer;
public MyMessagePacker(IContentSerializer serializer)
{
_serializer = serializer;
}
public string Pack(CapMessage obj)
{
var content = new MessageContent
{
Id = obj.Id,
Content = obj.Content,
CallbackName = obj.CallbackName,
Timestamp = obj.Timestamp
};
return _serializer.Serialize(content);
}
public CapMessage UnPack(string packingMessage)
{
return _serializer.DeSerialize<MessageContent>(packingMessage);
}
}
}
using System;
using System.Diagnostics;
using System.Threading.Tasks;
using DotNetCore.CAP;
using Microsoft.AspNetCore.Authorization;
using Microsoft.AspNetCore.Mvc;
using Newtonsoft.Json;
using MySql.Data.MySqlClient;
namespace Sample.Kafka.SqlServer.Controllers
namespace Sample.Kafka.MySql.Controllers
{
public class Person
{
[JsonProperty("id")]
public string Id { get; set; }
[JsonProperty("uname")]
public string Name { get; set; }
public HAHA Haha { get; set; }
public override string ToString()
{
return "Name:" + Name + ";Id:" + Id + "Haha:" + Haha?.ToString();
}
}
public class HAHA
{
[JsonProperty("id")]
public string Id { get; set; }
[JsonProperty("uname")]
public string Name { get; set; }
public override string ToString()
{
return "Name:" + Name + ";Id:" + Id;
}
}
[Route("api/[controller]")]
public class ValuesController : Controller, ICapSubscribe
{
private readonly ICapPublisher _capBus;
private readonly AppDbContext _dbContext;
public ValuesController(ICapPublisher producer, AppDbContext dbContext)
public ValuesController(ICapPublisher producer)
{
_capBus = producer;
_dbContext = dbContext;
}
[Route("~/publish")]
public IActionResult PublishMessage()
public async Task<IActionResult> PublishMessage()
{
var p = new Person
using (var connection = new MySqlConnection("Server=192.168.10.110;Database=testcap;UserId=root;Password=123123;"))
{
Id = Guid.NewGuid().ToString(),
Name = "杨晓东",
Haha = new HAHA
{
Id = Guid.NewGuid().ToString(),
Name = "1-1杨晓东",
}
};
_capBus.Publish("wl.yxd.test", p, "wl.yxd.test.callback");
//_capBus.Publish("wl.cj.test", p);
return Ok();
}
[CapSubscribe("wl.yxd.test.callback")]
public void KafkaTestCallback(Person p)
{
Console.WriteLine("回调内容:" + p);
}
connection.Open();
var transaction = connection.BeginTransaction();
//your business code here
[CapSubscribe("wl.cj.test")]
public string KafkaTestReceived(Person person)
{
Console.WriteLine(person);
Debug.WriteLine(person);
return "this is callback message";
}
[Route("~/publishWithTrans")]
public async Task<IActionResult> PublishMessageWithTransaction()
{
using (var trans = await _dbContext.Database.BeginTransactionAsync())
{
await _capBus.PublishAsync("sample.rabbitmq.mysql", "");
await _capBus.PublishAsync("xxx.xxx.test2", 123456, transaction);
trans.Commit();
transaction.Commit();
}
return Ok();
}
[CapSubscribe("sample.rabbitmq.mysql33333", Group = "Test.Group")]
public void KafkaTest22(Person person)
{
var aa = _dbContext.Database;
_dbContext.Dispose();
Console.WriteLine("[sample.kafka.sqlserver] message received " + person.ToString());
Debug.WriteLine("[sample.kafka.sqlserver] message received " + person.ToString());
}
//[CapSubscribe("sample.rabbitmq.mysql22222")]
//public void KafkaTest22(DateTime time)
//{
// Console.WriteLine("[sample.kafka.sqlserver] message received " + time.ToString());
// Debug.WriteLine("[sample.kafka.sqlserver] message received " + time.ToString());
//}
[CapSubscribe("sample.rabbitmq.mysql22222")]
public async Task<DateTime> KafkaTest33(DateTime time)
{
Console.WriteLine("[sample.kafka.sqlserver] message received " + time.ToString());
Debug.WriteLine("[sample.kafka.sqlserver] message received " + time.ToString());
return await Task.FromResult(time);
return Ok("publish successful!");
}
[NonAction]
[CapSubscribe("sample.kafka.sqlserver3")]
[CapSubscribe("sample.kafka.sqlserver4")]
public void KafkaTest()
[CapSubscribe("xxx.xxx.test2")]
public void Test2(int value)
{
Console.WriteLine("[sample.kafka.sqlserver] message received");
Debug.WriteLine("[sample.kafka.sqlserver] message received");
Console.WriteLine(value);
}
}
}
\ No newline at end of file
using System.IO;
using Microsoft.AspNetCore;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore;
using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.Configuration;
namespace Sample.Kafka.SqlServer
namespace Sample.Kafka.MySql
{
public class Program
{
......
{
"iisSettings": {
"windowsAuthentication": false,
"anonymousAuthentication": true,
"iisExpress": {
"applicationUrl": "http://localhost:57172/",
"sslPort": 0
}
},
"profiles": {
"IIS Express": {
"commandName": "IISExpress",
"launchBrowser": true,
"launchUrl": "cap",
"environmentVariables": {
"ASPNETCORE_ENVIRONMENT": "Development"
}
},
"Sample.Kafka.SqlServer": {
"commandName": "Project",
"launchBrowser": true,
"launchUrl": "cap",
"environmentVariables": {
"ASPNETCORE_ENVIRONMENT": "Development"
},
"applicationUrl": "http://localhost:57174/"
}
}
}
\ No newline at end of file
......@@ -2,20 +2,21 @@
<PropertyGroup>
<TargetFramework>netcoreapp2.0</TargetFramework>
<AssemblyName>Sample.Kafka.SqlServer</AssemblyName>
<AssemblyName>Sample.Kafka.MySql</AssemblyName>
<WarningsAsErrors>NU1701</WarningsAsErrors>
<NoWarn>NU1701</NoWarn>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Microsoft.AspNetCore.All" Version="2.0.5" />
<PackageReference Include="Microsoft.AspNetCore.All" Version="2.0.6" />
<PackageReference Include="MySqlConnector" Version="0.37.1" />
</ItemGroup>
<ItemGroup>
<DotNetCliToolReference Include="Microsoft.EntityFrameworkCore.Tools.DotNet" Version="2.0.0" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\..\src\DotNetCore.CAP.Kafka\DotNetCore.CAP.Kafka.csproj" />
<ProjectReference Include="..\..\src\DotNetCore.CAP.SqlServer\DotNetCore.CAP.SqlServer.csproj" />
<ProjectReference Include="..\..\src\DotNetCore.CAP.MySql\DotNetCore.CAP.MySql.csproj" />
<ProjectReference Include="..\..\src\DotNetCore.CAP\DotNetCore.CAP.csproj" />
</ItemGroup>
......
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Sample.RabbitMQ.SqlServer;
namespace Sample.Kafka.SqlServer
namespace Sample.Kafka.MySql
{
public class Startup
{
public void ConfigureServices(IServiceCollection services)
{
services.AddDbContext<AppDbContext>();
services.AddCap(x =>
{
x.UseEntityFramework<AppDbContext>();
x.UseKafka("192.168.2.215:9092");
x.UseMySql("Server=192.168.10.110;Database=testcap;UserId=root;Password=123123;");
x.UseKafka("192.168.10.110:9092");
x.UseDashboard();
//x.UseDiscovery(d =>
//{
// d.DiscoveryServerHostName = "localhost";
......@@ -26,11 +21,12 @@ namespace Sample.Kafka.SqlServer
// d.CurrentNodePort = 5820;
// d.NodeName = "CAP 2号节点";
//});
}).AddMessagePacker<MyMessagePacker>();
});
services.AddMvc();
}
public void Configure(IApplicationBuilder app, IHostingEnvironment env, ILoggerFactory loggerFactory)
public void Configure(IApplicationBuilder app)
{
app.UseMvc();
......
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading.Tasks;
using DotNetCore.CAP;
using Microsoft.AspNetCore.Mvc;
......
{
"iisSettings": {
"windowsAuthentication": false,
"anonymousAuthentication": true,
"iisExpress": {
"applicationUrl": "http://localhost:57171/",
"sslPort": 0
}
},
"profiles": {
"IIS Express": {
"commandName": "IISExpress",
"launchBrowser": true,
"launchUrl": "cap",
"environmentVariables": {
"ASPNETCORE_ENVIRONMENT": "Development"
}
},
"Sample.RabbitMQ.MySql": {
"commandName": "Project",
"launchBrowser": true,
"launchUrl": "cap",
"environmentVariables": {
"ASPNETCORE_ENVIRONMENT": "Development"
},
"applicationUrl": "http://localhost:57173/"
}
}
}
\ No newline at end of file
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
......
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
namespace Sample.RabbitMQ.PostgreSql
{
public class AppDbContext : DbContext
{
protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder)
{
optionsBuilder.UseNpgsql("Server=localhost;Database=Sample.RabbitMQ.PostgreSql;UserId=postgres;Password=123123;");
}
}
}
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading.Tasks;
using DotNetCore.CAP;
using Microsoft.AspNetCore.Mvc;
namespace Sample.RabbitMQ.PostgreSql.Controllers
{
[Route("api/[controller]")]
public class ValuesController : Controller
{
private readonly AppDbContext _dbContext;
private readonly ICapPublisher _capBus;
public ValuesController(AppDbContext dbContext, ICapPublisher capPublisher)
{
_dbContext = dbContext;
_capBus = capPublisher;
}
[Route("~/publish")]
public IActionResult PublishMessage()
{
_capBus.Publish("sample.rabbitmq.mysql", DateTime.Now);
return Ok();
}
[Route("~/publish2")]
public IActionResult PublishMessage2()
{
_capBus.Publish("sample.kafka.sqlserver4", DateTime.Now);
return Ok();
}
[Route("~/publishWithTrans")]
public async Task<IActionResult> PublishMessageWithTransaction()
{
using (var trans = await _dbContext.Database.BeginTransactionAsync())
{
await _capBus.PublishAsync("sample.kafka.sqlserver", "");
trans.Commit();
}
return Ok();
}
[NonAction]
[CapSubscribe("sample.rabbitmq.mysql")]
public void ReceiveMessage()
{
Console.WriteLine("[sample.rabbitmq.mysql] message received");
Debug.WriteLine("[sample.rabbitmq.mysql] message received");
}
}
}
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.AspNetCore;
using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging;
namespace Sample.RabbitMQ.PostgreSql
{
public class Program
{
public static void Main(string[] args)
{
BuildWebHost(args).Run();
}
public static IWebHost BuildWebHost(string[] args) =>
WebHost.CreateDefaultBuilder(args)
.UseStartup<Startup>()
.Build();
}
}
<Project Sdk="Microsoft.NET.Sdk.Web">
<PropertyGroup>
<TargetFramework>netcoreapp2.0</TargetFramework>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Microsoft.AspNetCore.All" Version="2.0.5" />
<PackageReference Include="Npgsql.EntityFrameworkCore.PostgreSQL" Version="2.0.1" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\..\src\DotNetCore.CAP.PostgreSql\DotNetCore.CAP.PostgreSql.csproj" />
<ProjectReference Include="..\..\src\DotNetCore.CAP.RabbitMQ\DotNetCore.CAP.RabbitMQ.csproj" />
<ProjectReference Include="..\..\src\DotNetCore.CAP\DotNetCore.CAP.csproj" />
</ItemGroup>
</Project>
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.DependencyInjection;
namespace Sample.RabbitMQ.PostgreSql
{
public class Startup
{
// This method gets called by the runtime. Use this method to add services to the container.
public void ConfigureServices(IServiceCollection services)
{
services.AddDbContext<AppDbContext>();
services.AddCap(x =>
{
x.UseEntityFramework<AppDbContext>();
x.UseRabbitMQ("localhost");
x.UseDashboard();
x.UseDiscovery(d =>
{
d.DiscoveryServerHostName = "localhost";
d.DiscoveryServerPort = 8500;
d.CurrentNodeHostName = "localhost";
d.CurrentNodePort = 5800;
d.NodeName = "CAP 一号节点";
});
});
services.AddMvc();
}
// This method gets called by the runtime. Use this method to configure the HTTP request pipeline.
public void Configure(IApplicationBuilder app, IHostingEnvironment env)
{
app.UseMvc();
app.UseCap();
}
}
}
using Microsoft.EntityFrameworkCore;
using Sample.RabbitMQ.SqlServer.Controllers;
namespace Sample.RabbitMQ.SqlServer
{
public class AppDbContext : DbContext
{
public DbSet<Person> Persons { get; set; }
protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder)
{
//optionsBuilder.UseSqlServer("Server=192.168.2.206;Initial Catalog=TestCap;User Id=cmswuliu;Password=h7xY81agBn*Veiu3;MultipleActiveResultSets=True");
optionsBuilder.UseSqlServer("Server=DESKTOP-M9R8T31;Initial Catalog=Sample.Kafka.SqlServer;User Id=sa;Password=P@ssw0rd;MultipleActiveResultSets=True");
}
}
}
using System;
using System.Diagnostics;
using System.Threading.Tasks;
using DotNetCore.CAP;
using Microsoft.AspNetCore.Authorization;
using Microsoft.AspNetCore.Mvc;
namespace Sample.RabbitMQ.SqlServer.Controllers
{
public class Person
{
public int Id { get; set; }
public string Name { get; set; }
public int Age { get; set; }
public override string ToString()
{
return "Name:" + Name + ";Age:" + Age;
}
}
[Route("api/[controller]")]
public class ValuesController : Controller, ICapSubscribe
{
private readonly ICapPublisher _capBus;
private readonly AppDbContext _dbContext;
public ValuesController(ICapPublisher producer, AppDbContext dbContext)
{
_capBus = producer;
_dbContext = dbContext;
}
[Route("~/publish")]
public IActionResult PublishMessage()
{
_capBus.Publish("sample.rabbitmq.sqlserver.order.check", DateTime.Now);
//var person = new Person
//{
// Name = "杨晓东",
// Age = 11,
// Id = 23
//};
//_capBus.Publish("sample.rabbitmq.mysql33333", person);
return Ok();
}
[Route("~/publishWithTrans")]
public async Task<IActionResult> PublishMessageWithTransaction()
{
using (var trans = await _dbContext.Database.BeginTransactionAsync())
{
await _capBus.PublishAsync("sample.rabbitmq.mysql", "");
trans.Commit();
}
return Ok();
}
[CapSubscribe("sample.rabbitmq.mysql33333",Group ="Test.Group")]
public void KafkaTest22(Person person)
{
var aa = _dbContext.Database;
_dbContext.Dispose();
Console.WriteLine("[sample.kafka.sqlserver] message received " + person.ToString());
Debug.WriteLine("[sample.kafka.sqlserver] message received " + person.ToString());
}
//[CapSubscribe("sample.rabbitmq.mysql22222")]
//public void KafkaTest22(DateTime time)
//{
// Console.WriteLine("[sample.kafka.sqlserver] message received " + time.ToString());
// Debug.WriteLine("[sample.kafka.sqlserver] message received " + time.ToString());
//}
[CapSubscribe("sample.rabbitmq.mysql22222")]
public async Task<DateTime> KafkaTest33(DateTime time)
{
Console.WriteLine("[sample.kafka.sqlserver] message received " + time.ToString());
Debug.WriteLine("[sample.kafka.sqlserver] message received " + time.ToString());
return await Task.FromResult(time);
}
[NonAction]
[CapSubscribe("sample.kafka.sqlserver3")]
[CapSubscribe("sample.kafka.sqlserver4")]
public void KafkaTest()
{
Console.WriteLine("[sample.kafka.sqlserver] message received");
Debug.WriteLine("[sample.kafka.sqlserver] message received");
}
}
}
\ No newline at end of file
// <auto-generated />
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Infrastructure;
using Microsoft.EntityFrameworkCore.Metadata;
using Microsoft.EntityFrameworkCore.Migrations;
using Microsoft.EntityFrameworkCore.Storage;
using Microsoft.EntityFrameworkCore.Storage.Internal;
using Sample.RabbitMQ.SqlServer;
using System;
namespace Sample.RabbitMQ.SqlServer.Migrations
{
[DbContext(typeof(AppDbContext))]
[Migration("20170824130007_AddPersons")]
partial class AddPersons
{
protected override void BuildTargetModel(ModelBuilder modelBuilder)
{
#pragma warning disable 612, 618
modelBuilder
.HasAnnotation("ProductVersion", "2.0.0-rtm-26452")
.HasAnnotation("SqlServer:ValueGenerationStrategy", SqlServerValueGenerationStrategy.IdentityColumn);
modelBuilder.Entity("Sample.RabbitMQ.SqlServer.Controllers.Person", b =>
{
b.Property<int>("Id")
.ValueGeneratedOnAdd();
b.Property<int>("Age");
b.Property<string>("Name");
b.HasKey("Id");
b.ToTable("Persons");
});
#pragma warning restore 612, 618
}
}
}
using Microsoft.EntityFrameworkCore.Metadata;
using Microsoft.EntityFrameworkCore.Migrations;
using System;
using System.Collections.Generic;
namespace Sample.RabbitMQ.SqlServer.Migrations
{
public partial class AddPersons : Migration
{
protected override void Up(MigrationBuilder migrationBuilder)
{
migrationBuilder.CreateTable(
name: "Persons",
columns: table => new
{
Id = table.Column<int>(type: "int", nullable: false)
.Annotation("SqlServer:ValueGenerationStrategy", SqlServerValueGenerationStrategy.IdentityColumn),
Age = table.Column<int>(type: "int", nullable: false),
Name = table.Column<string>(type: "nvarchar(max)", nullable: true)
},
constraints: table =>
{
table.PrimaryKey("PK_Persons", x => x.Id);
});
}
protected override void Down(MigrationBuilder migrationBuilder)
{
migrationBuilder.DropTable(
name: "Persons");
}
}
}
// <auto-generated />
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Infrastructure;
using Microsoft.EntityFrameworkCore.Metadata;
using Microsoft.EntityFrameworkCore.Migrations;
using Microsoft.EntityFrameworkCore.Storage;
using Microsoft.EntityFrameworkCore.Storage.Internal;
using Sample.RabbitMQ.SqlServer;
using System;
namespace Sample.RabbitMQ.SqlServer.Migrations
{
[DbContext(typeof(AppDbContext))]
partial class AppDbContextModelSnapshot : ModelSnapshot
{
protected override void BuildModel(ModelBuilder modelBuilder)
{
#pragma warning disable 612, 618
modelBuilder
.HasAnnotation("ProductVersion", "2.0.0-rtm-26452")
.HasAnnotation("SqlServer:ValueGenerationStrategy", SqlServerValueGenerationStrategy.IdentityColumn);
modelBuilder.Entity("Sample.RabbitMQ.SqlServer.Controllers.Person", b =>
{
b.Property<int>("Id")
.ValueGeneratedOnAdd();
b.Property<int>("Age");
b.Property<string>("Name");
b.HasKey("Id");
b.ToTable("Persons");
});
#pragma warning restore 612, 618
}
}
}
using System.IO;
using Microsoft.AspNetCore;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.Configuration;
namespace Sample.RabbitMQ.SqlServer
{
public class Program
{
public static void Main(string[] args)
{
BuildWebHost(args).Run();
}
public static IWebHost BuildWebHost(string[] args) =>
WebHost.CreateDefaultBuilder(args)
.UseUrls("http://*:5800")
.UseStartup<Startup>()
.Build();
}
}
\ No newline at end of file
<Project Sdk="Microsoft.NET.Sdk.Web">
<PropertyGroup>
<TargetFramework>netcoreapp2.0</TargetFramework>
<AssemblyName>Sample.RabbitMQ.SqlServer</AssemblyName>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Microsoft.AspNetCore.All" Version="2.0.5" />
</ItemGroup>
<ItemGroup>
<DotNetCliToolReference Include="Microsoft.EntityFrameworkCore.Tools.DotNet" Version="2.0.0" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\..\src\DotNetCore.CAP.RabbitMQ\DotNetCore.CAP.RabbitMQ.csproj" />
<ProjectReference Include="..\..\src\DotNetCore.CAP.SqlServer\DotNetCore.CAP.SqlServer.csproj" />
<ProjectReference Include="..\..\src\DotNetCore.CAP\DotNetCore.CAP.csproj" />
</ItemGroup>
</Project>
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using DotNetCore.CAP;
namespace Sample.RabbitMQ.SqlServer.Services
{
public interface ICmsService
{
void Add();
}
}
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
namespace Sample.RabbitMQ.SqlServer.Services
{
public interface IOrderService
{
void Check();
}
}
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using DotNetCore.CAP;
namespace Sample.RabbitMQ.SqlServer.Services.Impl
{
public class CmsService : ICmsService, ICapSubscribe
{
public void Add()
{
throw new NotImplementedException();
}
}
}
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using DotNetCore.CAP;
namespace Sample.RabbitMQ.SqlServer.Services.Impl
{
public class OrderService : IOrderService, ICapSubscribe
{
[CapSubscribe("sample.rabbitmq.sqlserver.order.check")]
public void Check()
{
Console.WriteLine("out");
}
}
}
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Sample.RabbitMQ.SqlServer.Services;
using Sample.RabbitMQ.SqlServer.Services.Impl;
namespace Sample.RabbitMQ.SqlServer
{
public class Startup
{
public void ConfigureServices(IServiceCollection services)
{
services.AddDbContext<AppDbContext>();
services.AddScoped<IOrderService, OrderService>();
services.AddTransient<ICmsService, CmsService>();
services.AddCap(x =>
{
x.UseEntityFramework<AppDbContext>();
x.UseRabbitMQ("localhost");
x.UseDashboard();
x.UseDiscovery(d =>
{
d.DiscoveryServerHostName = "localhost";
d.DiscoveryServerPort = 8500;
d.CurrentNodeHostName = "192.168.1.11";
d.CurrentNodePort = 5800;
d.NodeName = "CAP Node Windows";
d.NodeId = 1;
});
});
services.AddMvc();
}
public void Configure(IApplicationBuilder app, IHostingEnvironment env, ILoggerFactory loggerFactory)
{
loggerFactory.AddConsole();
loggerFactory.AddDebug();
app.UseMvc();
app.UseCap();
}
}
}
\ No newline at end of file
using System;
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using DotNetCore.CAP.Kafka;
using Microsoft.Extensions.DependencyInjection;
......@@ -23,8 +26,8 @@ namespace DotNetCore.CAP
services.AddSingleton(kafkaOptions);
services.AddSingleton<IConsumerClientFactory, KafkaConsumerClientFactory>();
services.AddSingleton<IQueueExecutor, PublishQueueExecutor>();
services.AddSingleton<IPublishExecutor, PublishQueueExecutor>();
services.AddSingleton<IPublishExecutor, KafkaPublishMessageSender>();
services.AddSingleton<IPublishMessageSender, KafkaPublishMessageSender>();
services.AddSingleton<ConnectionPool>();
}
}
......
using System;
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
......@@ -45,16 +48,19 @@ namespace DotNetCore.CAP
if (_kafkaConfig == null)
{
if (string.IsNullOrWhiteSpace(Servers))
{
throw new ArgumentNullException(nameof(Servers));
}
MainConfig["bootstrap.servers"] = Servers;
MainConfig["queue.buffering.max.ms"] = "10";
MainConfig["socket.blocking.max.ms"] = "10";
MainConfig["enable.auto.commit"] = "false";
MainConfig["log.connection.close"] = "false";
_kafkaConfig = MainConfig.AsEnumerable();
}
return _kafkaConfig;
}
}
......
using System;
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using DotNetCore.CAP;
// ReSharper disable once CheckNamespace
......@@ -24,7 +27,10 @@ namespace Microsoft.Extensions.DependencyInjection
/// <returns></returns>
public static CapOptions UseKafka(this CapOptions options, Action<KafkaOptions> configure)
{
if (configure == null) throw new ArgumentNullException(nameof(configure));
if (configure == null)
{
throw new ArgumentNullException(nameof(configure));
}
options.RegisterExtension(new KafkaCapOptionsExtension(configure));
......
using DotNetCore.CAP.Abstractions;
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using DotNetCore.CAP.Abstractions;
// ReSharper disable once CheckNamespace
namespace DotNetCore.CAP
......
using System;
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Threading;
......@@ -35,7 +38,9 @@ namespace DotNetCore.CAP.Kafka
_maxSize = 0;
while (_pool.TryDequeue(out var context))
{
context.Dispose();
}
}
private static Func<Producer> CreateActivator(KafkaOptions options)
......

// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using Confluent.Kafka;
namespace DotNetCore.CAP.Kafka
......
using System;
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using System.Diagnostics;
using System.Text;
using System.Threading.Tasks;
using DotNetCore.CAP.Internal;
using DotNetCore.CAP.Processor.States;
using Microsoft.Extensions.Logging;
namespace DotNetCore.CAP.Kafka
{
internal class PublishQueueExecutor : BasePublishQueueExecutor
internal class KafkaPublishMessageSender : BasePublishMessageSender
{
private readonly ConnectionPool _connectionPool;
private readonly ILogger _logger;
public PublishQueueExecutor(
CapOptions options,
IStateChanger stateChanger,
ConnectionPool connectionPool,
ILogger<PublishQueueExecutor> logger)
: base(options, stateChanger, logger)
public KafkaPublishMessageSender(
CapOptions options, IStateChanger stateChanger, IStorageConnection connection,
ConnectionPool connectionPool, ILogger<KafkaPublishMessageSender> logger)
: base(logger, options, connection, stateChanger)
{
_logger = logger;
_connectionPool = connectionPool;
......@@ -25,37 +28,37 @@ namespace DotNetCore.CAP.Kafka
public override async Task<OperateResult> PublishAsync(string keyName, string content)
{
var producer = _connectionPool.Rent();
try
{
var contentBytes = Encoding.UTF8.GetBytes(content);
var message = await producer.ProduceAsync(keyName, null, contentBytes);
if (!message.Error.HasError)
if (message.Error.HasError)
{
_logger.LogDebug($"kafka topic message [{keyName}] has been published.");
return OperateResult.Success;
return OperateResult.Failed(new OperateError
{
Code = message.Error.Code.ToString(),
Description = message.Error.Reason
});
}
return OperateResult.Failed(new OperateError
{
Code = message.Error.Code.ToString(),
Description = message.Error.Reason
});
_logger.LogDebug($"kafka topic message [{keyName}] has been published.");
return OperateResult.Success;
}
catch (Exception ex)
{
_logger.LogError(ex,
$"An error occurred during sending the topic message to kafka. Topic:[{keyName}], Exception: {ex.Message}");
return OperateResult.Failed(ex);
var wapperEx = new PublisherSentFailedException(ex.Message, ex);
return OperateResult.Failed(wapperEx);
}
finally
{
var returned = _connectionPool.Return(producer);
if (!returned)
{
producer.Dispose();
}
}
}
}
......
using System;
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
......@@ -29,10 +32,14 @@ namespace DotNetCore.CAP.Kafka
public void Subscribe(IEnumerable<string> topics)
{
if (topics == null)
{
throw new ArgumentNullException(nameof(topics));
}
if (_consumerClient == null)
{
InitKafkaClient();
}
_consumerClient.Subscribe(topics);
}
......@@ -44,6 +51,7 @@ namespace DotNetCore.CAP.Kafka
cancellationToken.ThrowIfCancellationRequested();
_consumerClient.Poll(timeout);
}
// ReSharper disable once FunctionNeverReturns
}
......
namespace DotNetCore.CAP.Kafka
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
namespace DotNetCore.CAP.Kafka
{
internal sealed class KafkaConsumerClientFactory : IConsumerClientFactory
{
......
using System;
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
// ReSharper disable once CheckNamespace
namespace DotNetCore.CAP
......
using System;
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using DotNetCore.CAP.MySql;
using DotNetCore.CAP.Processor;
using Microsoft.EntityFrameworkCore;
......@@ -41,7 +44,7 @@ namespace DotNetCore.CAP
using (var scope = x.CreateScope())
{
var provider = scope.ServiceProvider;
var dbContext = (DbContext)provider.GetService(mysqlOptions.DbContextType);
var dbContext = (DbContext) provider.GetService(mysqlOptions.DbContextType);
mysqlOptions.ConnectionString = dbContext.Database.GetDbConnection().ConnectionString;
return mysqlOptions;
}
......
// ReSharper disable once CheckNamespace
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
namespace DotNetCore.CAP
{
public class MySqlOptions : EFOptions
......
using System;
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using DotNetCore.CAP;
using Microsoft.EntityFrameworkCore;
......@@ -14,7 +17,10 @@ namespace Microsoft.Extensions.DependencyInjection
public static CapOptions UseMySql(this CapOptions options, Action<MySqlOptions> configure)
{
if (configure == null) throw new ArgumentNullException(nameof(configure));
if (configure == null)
{
throw new ArgumentNullException(nameof(configure));
}
options.RegisterExtension(new MySqlCapOptionsExtension(configure));
......@@ -31,7 +37,10 @@ namespace Microsoft.Extensions.DependencyInjection
public static CapOptions UseEntityFramework<TContext>(this CapOptions options, Action<EFOptions> configure)
where TContext : DbContext
{
if (configure == null) throw new ArgumentNullException(nameof(configure));
if (configure == null)
{
throw new ArgumentNullException(nameof(configure));
}
options.RegisterExtension(new MySqlCapOptionsExtension(x =>
{
......
using System;
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using System.Data;
using System.Threading.Tasks;
using Dapper;
......@@ -14,27 +17,31 @@ namespace DotNetCore.CAP.MySql
public class CapPublisher : CapPublisherBase, ICallbackPublisher
{
private readonly DbContext _dbContext;
private readonly ILogger _logger;
private readonly MySqlOptions _options;
public CapPublisher(IServiceProvider provider,
ILogger<CapPublisher> logger,
public CapPublisher(ILogger<CapPublisher> logger, IDispatcher dispatcher, IServiceProvider provider,
MySqlOptions options)
: base(logger, dispatcher)
{
ServiceProvider = provider;
_options = options;
_logger = logger;
if (_options.DbContextType == null) return;
if (_options.DbContextType == null)
{
return;
}
IsUsingEF = true;
_dbContext = (DbContext) ServiceProvider.GetService(_options.DbContextType);
}
public async Task PublishAsync(CapPublishedMessage message)
public async Task PublishCallbackAsync(CapPublishedMessage message)
{
using (var conn = new MySqlConnection(_options.ConnectionString))
{
await conn.ExecuteAsync(PrepareSql(), message);
var id = await conn.ExecuteScalarAsync<int>(PrepareSql(), message);
message.Id = id;
Enqueue(message);
}
}
......@@ -51,23 +58,20 @@ namespace DotNetCore.CAP.MySql
dbContextTransaction = _dbContext.Database.BeginTransaction(IsolationLevel.ReadCommitted);
dbTrans = dbContextTransaction.GetDbTransaction();
}
DbTransaction = dbTrans;
}
protected override void Execute(IDbConnection dbConnection, IDbTransaction dbTransaction,
protected override int Execute(IDbConnection dbConnection, IDbTransaction dbTransaction,
CapPublishedMessage message)
{
dbConnection.Execute(PrepareSql(), message, dbTransaction);
_logger.LogInformation("Published Message has been persisted in the database. name:" + message);
return dbConnection.ExecuteScalar<int>(PrepareSql(), message, dbTransaction);
}
protected override async Task ExecuteAsync(IDbConnection dbConnection, IDbTransaction dbTransaction,
protected override async Task<int> ExecuteAsync(IDbConnection dbConnection, IDbTransaction dbTransaction,
CapPublishedMessage message)
{
await dbConnection.ExecuteAsync(PrepareSql(), message, dbTransaction);
_logger.LogInformation("Published Message has been persisted in the database. name:" + message);
return await dbConnection.ExecuteScalarAsync<int>(PrepareSql(), message, dbTransaction);
}
#region private methods
......@@ -75,7 +79,7 @@ namespace DotNetCore.CAP.MySql
private string PrepareSql()
{
return
$"INSERT INTO `{_options.TableNamePrefix}.published` (`Name`,`Content`,`Retries`,`Added`,`ExpiresAt`,`StatusName`)VALUES(@Name,@Content,@Retries,@Added,@ExpiresAt,@StatusName)";
$"INSERT INTO `{_options.TableNamePrefix}.published` (`Name`,`Content`,`Retries`,`Added`,`ExpiresAt`,`StatusName`)VALUES(@Name,@Content,@Retries,@Added,@ExpiresAt,@StatusName);SELECT LAST_INSERT_ID()";
}
#endregion private methods
......
using DotNetCore.CAP.Models;
namespace DotNetCore.CAP.MySql
{
internal class FetchedMessage
{
public int MessageId { get; set; }
public MessageType MessageType { get; set; }
}
}
\ No newline at end of file
using System;
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using System.Threading.Tasks;
using Dapper;
using DotNetCore.CAP.Processor;
......
using Dapper;
using DotNetCore.CAP.Models;
using MySql.Data.MySqlClient;
namespace DotNetCore.CAP.MySql
{
public class MySqlFetchedMessage : IFetchedMessage
{
private readonly MySqlOptions _options;
private readonly string _processId;
public MySqlFetchedMessage(int messageId, MessageType type, string processId, MySqlOptions options)
{
MessageId = messageId;
MessageType = type;
_processId = processId;
_options = options;
}
public int MessageId { get; }
public MessageType MessageType { get; }
public void RemoveFromQueue()
{
using (var connection = new MySqlConnection(_options.ConnectionString))
{
connection.Execute($"DELETE FROM `{_options.TableNamePrefix}.queue` WHERE `ProcessId`=@ProcessId"
, new { ProcessId = _processId });
}
}
public void Requeue()
{
using (var connection = new MySqlConnection(_options.ConnectionString))
{
connection.Execute($"UPDATE `{_options.TableNamePrefix}.queue` SET `ProcessId`=NULL WHERE `ProcessId`=@ProcessId"
, new { ProcessId = _processId });
}
}
public void Dispose()
{
// ignored
}
}
}
\ No newline at end of file
using System;
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using System.Collections.Generic;
using System.Data;
using System.Linq;
......@@ -28,9 +31,7 @@ set transaction isolation level read committed;
select count(Id) from `{0}.published` where StatusName = N'Succeeded';
select count(Id) from `{0}.received` where StatusName = N'Succeeded';
select count(Id) from `{0}.published` where StatusName = N'Failed';
select count(Id) from `{0}.received` where StatusName = N'Failed';
select count(Id) from `{0}.published` where StatusName in (N'Processing',N'Scheduled',N'Enqueued');
select count(Id) from `{0}.received` where StatusName in (N'Processing',N'Scheduled',N'Enqueued');", _prefix);
select count(Id) from `{0}.received` where StatusName = N'Failed';", _prefix);
var statistics = UseConnection(connection =>
{
......@@ -42,10 +43,8 @@ select count(Id) from `{0}.received` where StatusName in (N'Processing',N'Sched
stats.PublishedFailed = multi.ReadSingle<int>();
stats.ReceivedFailed = multi.ReadSingle<int>();
stats.PublishedProcessing = multi.ReadSingle<int>();
stats.ReceivedProcessing = multi.ReadSingle<int>();
}
return stats;
});
return statistics;
......@@ -70,17 +69,24 @@ select count(Id) from `{0}.received` where StatusName in (N'Processing',N'Sched
var tableName = queryDto.MessageType == MessageType.Publish ? "published" : "received";
var where = string.Empty;
if (!string.IsNullOrEmpty(queryDto.StatusName))
if (string.Equals(queryDto.StatusName, StatusName.Processing,
StringComparison.CurrentCultureIgnoreCase))
where += " and StatusName in (N'Processing',N'Scheduled',N'Enqueued')";
else
where += " and StatusName=@StatusName";
{
where += " and StatusName=@StatusName";
}
if (!string.IsNullOrEmpty(queryDto.Name))
{
where += " and Name=@Name";
}
if (!string.IsNullOrEmpty(queryDto.Group))
{
where += " and Group=@Group";
}
if (!string.IsNullOrEmpty(queryDto.Content))
{
where += " and Content like '%@Content%'";
}
var sqlQuery =
$"select * from `{_prefix}.{tableName}` where 1=1 {where} order by Added desc limit @Limit offset @Offset";
......@@ -101,11 +107,6 @@ select count(Id) from `{0}.received` where StatusName in (N'Processing',N'Sched
return UseConnection(conn => GetNumberOfMessage(conn, "published", StatusName.Failed));
}
public int PublishedProcessingCount()
{
return UseConnection(conn => GetNumberOfMessage(conn, "published", StatusName.Processing));
}
public int PublishedSucceededCount()
{
return UseConnection(conn => GetNumberOfMessage(conn, "published", StatusName.Succeeded));
......@@ -116,11 +117,6 @@ select count(Id) from `{0}.received` where StatusName in (N'Processing',N'Sched
return UseConnection(conn => GetNumberOfMessage(conn, "received", StatusName.Failed));
}
public int ReceivedProcessingCount()
{
return UseConnection(conn => GetNumberOfMessage(conn, "received", StatusName.Processing));
}
public int ReceivedSucceededCount()
{
return UseConnection(conn => GetNumberOfMessage(conn, "received", StatusName.Succeeded));
......@@ -128,9 +124,7 @@ select count(Id) from `{0}.received` where StatusName in (N'Processing',N'Sched
private int GetNumberOfMessage(IDbConnection connection, string tableName, string statusName)
{
var sqlQuery = statusName == StatusName.Processing
? $"select count(Id) from `{_prefix}.{tableName}` where StatusName in (N'Processing',N'Scheduled',N'Enqueued')"
: $"select count(Id) from `{_prefix}.{tableName}` where StatusName = @state";
var sqlQuery = $"select count(Id) from `{_prefix}.{tableName}` where StatusName = @state";
var count = connection.ExecuteScalar<int>(sqlQuery, new {state = statusName});
return count;
......@@ -179,7 +173,12 @@ select aggr.* from (
.ToDictionary(x => (string) x.Key, x => (int) x.Count);
foreach (var key in keyMaps.Keys)
if (!valuesMap.ContainsKey(key)) valuesMap.Add(key, 0);
{
if (!valuesMap.ContainsKey(key))
{
valuesMap.Add(key, 0);
}
}
var result = new Dictionary<DateTime, int>();
for (var i = 0; i < keyMaps.Count; i++)
......
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using System.Data;
using System.Threading;
......@@ -11,10 +14,10 @@ namespace DotNetCore.CAP.MySql
{
public class MySqlStorage : IStorage
{
private readonly CapOptions _capOptions;
private readonly IDbConnection _existingConnection = null;
private readonly ILogger _logger;
private readonly MySqlOptions _options;
private readonly CapOptions _capOptions;
public MySqlStorage(ILogger<MySqlStorage> logger,
MySqlOptions options,
......@@ -37,7 +40,11 @@ namespace DotNetCore.CAP.MySql
public async Task InitializeAsync(CancellationToken cancellationToken)
{
if (cancellationToken.IsCancellationRequested) return;
if (cancellationToken.IsCancellationRequested)
{
return;
}
var sql = CreateDbTablesScript(_options.TableNamePrefix);
using (var connection = new MySqlConnection(_options.ConnectionString))
{
......@@ -51,11 +58,7 @@ namespace DotNetCore.CAP.MySql
{
var batchSql =
$@"
CREATE TABLE IF NOT EXISTS `{prefix}.queue` (
`MessageId` int(11) NOT NULL,
`MessageType` tinyint(4) NOT NULL,
`ProcessId` varchar(50) DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
DROP TABLE IF EXISTS `{prefix}.queue`;
CREATE TABLE IF NOT EXISTS `{prefix}.received` (
`Id` int(127) NOT NULL AUTO_INCREMENT,
......@@ -102,7 +105,9 @@ CREATE TABLE IF NOT EXISTS `{prefix}.published` (
var connection = _existingConnection ?? new MySqlConnection(_options.ConnectionString);
if (connection.State == ConnectionState.Closed)
{
connection.Open();
}
return connection;
}
......@@ -115,7 +120,9 @@ CREATE TABLE IF NOT EXISTS `{prefix}.published` (
internal void ReleaseConnection(IDbConnection connection)
{
if (connection != null && !IsExistingConnection(connection))
{
connection.Dispose();
}
}
}
}
\ No newline at end of file
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
......@@ -13,8 +16,6 @@ namespace DotNetCore.CAP.MySql
private readonly CapOptions _capOptions;
private readonly string _prefix;
private const string DateTimeMaxValue = "9999-12-31 23:59:59";
public MySqlStorageConnection(MySqlOptions options, CapOptions capOptions)
{
_capOptions = capOptions;
......@@ -39,51 +40,32 @@ namespace DotNetCore.CAP.MySql
}
}
public Task<IFetchedMessage> FetchNextMessageAsync()
public async Task<IEnumerable<CapPublishedMessage>> GetPublishedMessagesOfNeedRetry()
{
var processId = ObjectId.GenerateNewStringId();
var sql = $@"
UPDATE `{_prefix}.queue` SET `ProcessId`=@ProcessId WHERE `ProcessId` IS NULL LIMIT 1;
SELECT `MessageId`,`MessageType` FROM `{_prefix}.queue` WHERE `ProcessId`=@ProcessId;";
return FetchNextMessageCoreAsync(sql, processId);
}
public async Task<CapPublishedMessage> GetNextPublishedMessageToBeEnqueuedAsync()
{
var sql = $@"
UPDATE `{_prefix}.published` SET Id=LAST_INSERT_ID(Id),ExpiresAt='{DateTimeMaxValue}' WHERE ExpiresAt IS NULL AND `StatusName` = '{StatusName.Scheduled}' LIMIT 1;
SELECT * FROM `{_prefix}.published` WHERE Id=LAST_INSERT_ID();";
var fourMinsAgo = DateTime.Now.AddMinutes(-4);
var sql =
$"SELECT * FROM `{_prefix}.published` WHERE `Retries`<{_capOptions.FailedRetryCount} AND `Added`<'{fourMinsAgo}' AND (`StatusName` = '{StatusName.Failed}' OR `StatusName` = '{StatusName.Scheduled}') LIMIT 200;";
using (var connection = new MySqlConnection(Options.ConnectionString))
{
connection.Open();
connection.Execute("SELECT LAST_INSERT_ID(0)");
return await connection.QueryFirstOrDefaultAsync<CapPublishedMessage>(sql);
return await connection.QueryAsync<CapPublishedMessage>(sql);
}
}
public async Task<IEnumerable<CapPublishedMessage>> GetFailedPublishedMessages()
public async Task<int> StoreReceivedMessageAsync(CapReceivedMessage message)
{
var sql = $"SELECT * FROM `{_prefix}.published` WHERE `Retries`<{_capOptions.FailedRetryCount} AND `StatusName` = '{StatusName.Failed}' LIMIT 200;";
using (var connection = new MySqlConnection(Options.ConnectionString))
if (message == null)
{
return await connection.QueryAsync<CapPublishedMessage>(sql);
throw new ArgumentNullException(nameof(message));
}
}
public async Task StoreReceivedMessageAsync(CapReceivedMessage message)
{
if (message == null) throw new ArgumentNullException(nameof(message));
var sql = $@"
INSERT INTO `{_prefix}.received`(`Name`,`Group`,`Content`,`Retries`,`Added`,`ExpiresAt`,`StatusName`)
VALUES(@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName);";
VALUES(@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName);SELECT LAST_INSERT_ID();";
using (var connection = new MySqlConnection(Options.ConnectionString))
{
await connection.ExecuteAsync(sql, message);
return await connection.ExecuteScalarAsync<int>(sql, message);
}
}
......@@ -96,23 +78,11 @@ VALUES(@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName);";
}
}
public async Task<CapReceivedMessage> GetNextReceivedMessageToBeEnqueuedAsync()
public async Task<IEnumerable<CapReceivedMessage>> GetReceivedMessagesOfNeedRetry()
{
var sql = $@"
UPDATE `{_prefix}.received` SET Id=LAST_INSERT_ID(Id),ExpiresAt='{DateTimeMaxValue}' WHERE ExpiresAt IS NULL AND `StatusName` = '{StatusName.Scheduled}' LIMIT 1;
SELECT * FROM `{_prefix}.received` WHERE Id=LAST_INSERT_ID();";
using (var connection = new MySqlConnection(Options.ConnectionString))
{
connection.Open();
connection.Execute("SELECT LAST_INSERT_ID(0)");
return await connection.QueryFirstOrDefaultAsync<CapReceivedMessage>(sql);
}
}
public async Task<IEnumerable<CapReceivedMessage>> GetFailedReceivedMessages()
{
var sql = $"SELECT * FROM `{_prefix}.received` WHERE `Retries`<{_capOptions.FailedRetryCount} AND `StatusName` = '{StatusName.Failed}' LIMIT 200;";
var fourMinsAgo = DateTime.Now.AddMinutes(-4);
var sql =
$"SELECT * FROM `{_prefix}.received` WHERE `Retries`<{_capOptions.FailedRetryCount} AND `Added`<'{fourMinsAgo}' AND (`StatusName` = '{StatusName.Failed}' OR `StatusName` = '{StatusName.Scheduled}') LIMIT 200;";
using (var connection = new MySqlConnection(Options.ConnectionString))
{
return await connection.QueryAsync<CapReceivedMessage>(sql);
......@@ -141,20 +111,6 @@ SELECT * FROM `{_prefix}.received` WHERE Id=LAST_INSERT_ID();";
}
}
private async Task<IFetchedMessage> FetchNextMessageCoreAsync(string sql, string processId)
{
FetchedMessage fetchedMessage;
using (var connection = new MySqlConnection(Options.ConnectionString))
{
fetchedMessage = await connection.QuerySingleOrDefaultAsync<FetchedMessage>(sql, new { ProcessId = processId });
}
if (fetchedMessage == null)
return null;
return new MySqlFetchedMessage(fetchedMessage.MessageId, fetchedMessage.MessageType, processId, Options);
}
public void Dispose()
{
}
......
using System;
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using System.Data;
using System.Threading.Tasks;
using Dapper;
......@@ -11,7 +14,7 @@ namespace DotNetCore.CAP.MySql
{
private readonly IDbConnection _dbConnection;
private readonly IDbTransaction _dbTransaction;
//private readonly IDbTransaction _dbTransaction;
private readonly string _prefix;
public MySqlStorageTransaction(MySqlStorageConnection connection)
......@@ -20,55 +23,45 @@ namespace DotNetCore.CAP.MySql
_prefix = options.TableNamePrefix;
_dbConnection = new MySqlConnection(options.ConnectionString);
_dbConnection.Open();
_dbTransaction = _dbConnection.BeginTransaction(IsolationLevel.ReadCommitted);
// _dbConnection.Open(); for performance
// _dbTransaction = _dbConnection.BeginTransaction(IsolationLevel.ReadCommitted);
}
public void UpdateMessage(CapPublishedMessage message)
{
if (message == null) throw new ArgumentNullException(nameof(message));
if (message == null)
{
throw new ArgumentNullException(nameof(message));
}
var sql =
$"UPDATE `{_prefix}.published` SET `Retries` = @Retries,`Content`= @Content,`ExpiresAt` = @ExpiresAt,`StatusName`=@StatusName WHERE `Id`=@Id;";
_dbConnection.Execute(sql, message, _dbTransaction);
_dbConnection.Execute(sql, message);
}
public void UpdateMessage(CapReceivedMessage message)
{
if (message == null) throw new ArgumentNullException(nameof(message));
if (message == null)
{
throw new ArgumentNullException(nameof(message));
}
var sql =
$"UPDATE `{_prefix}.received` SET `Retries` = @Retries,`Content`= @Content,`ExpiresAt` = @ExpiresAt,`StatusName`=@StatusName WHERE `Id`=@Id;";
_dbConnection.Execute(sql, message, _dbTransaction);
}
public void EnqueueMessage(CapPublishedMessage message)
{
if (message == null) throw new ArgumentNullException(nameof(message));
var sql = $"INSERT INTO `{_prefix}.queue`(`MessageId`,`MessageType`) values(@MessageId,@MessageType);";
_dbConnection.Execute(sql, new CapQueue {MessageId = message.Id, MessageType = MessageType.Publish},
_dbTransaction);
}
public void EnqueueMessage(CapReceivedMessage message)
{
if (message == null) throw new ArgumentNullException(nameof(message));
var sql = $"INSERT INTO `{_prefix}.queue`(`MessageId`,`MessageType`) values(@MessageId,@MessageType);";
_dbConnection.Execute(sql, new CapQueue {MessageId = message.Id, MessageType = MessageType.Subscribe},
_dbTransaction);
_dbConnection.Execute(sql, message);
}
public Task CommitAsync()
{
_dbTransaction.Commit();
_dbConnection.Close();
_dbConnection.Dispose();
//_dbTransaction.Commit();
return Task.CompletedTask;
}
public void Dispose()
{
_dbTransaction.Dispose();
//_dbTransaction.Dispose();
_dbConnection.Dispose();
}
}
......
using System;
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
// ReSharper disable once CheckNamespace
namespace DotNetCore.CAP
......
using System;
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using DotNetCore.CAP;
using Microsoft.EntityFrameworkCore;
......@@ -14,7 +17,10 @@ namespace Microsoft.Extensions.DependencyInjection
public static CapOptions UsePostgreSql(this CapOptions options, Action<PostgreSqlOptions> configure)
{
if (configure == null) throw new ArgumentNullException(nameof(configure));
if (configure == null)
{
throw new ArgumentNullException(nameof(configure));
}
options.RegisterExtension(new PostgreSqlCapOptionsExtension(configure));
......@@ -30,7 +36,10 @@ namespace Microsoft.Extensions.DependencyInjection
public static CapOptions UseEntityFramework<TContext>(this CapOptions options, Action<EFOptions> configure)
where TContext : DbContext
{
if (configure == null) throw new ArgumentNullException(nameof(configure));
if (configure == null)
{
throw new ArgumentNullException(nameof(configure));
}
options.RegisterExtension(new PostgreSqlCapOptionsExtension(x =>
{
......
using System;
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using DotNetCore.CAP.PostgreSql;
using DotNetCore.CAP.Processor;
using Microsoft.EntityFrameworkCore;
......@@ -40,7 +43,7 @@ namespace DotNetCore.CAP
using (var scope = x.CreateScope())
{
var provider = scope.ServiceProvider;
var dbContext = (DbContext)provider.GetService(postgreSqlOptions.DbContextType);
var dbContext = (DbContext) provider.GetService(postgreSqlOptions.DbContextType);
postgreSqlOptions.ConnectionString = dbContext.Database.GetDbConnection().ConnectionString;
return postgreSqlOptions;
}
......
// ReSharper disable once CheckNamespace
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
namespace DotNetCore.CAP
{
......
using System;
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using System.Data;
using System.Threading.Tasks;
using Dapper;
......@@ -14,16 +17,14 @@ namespace DotNetCore.CAP.PostgreSql
public class CapPublisher : CapPublisherBase, ICallbackPublisher
{
private readonly DbContext _dbContext;
private readonly ILogger _logger;
private readonly PostgreSqlOptions _options;
public CapPublisher(IServiceProvider provider,
ILogger<CapPublisher> logger,
PostgreSqlOptions options)
public CapPublisher(ILogger<CapPublisher> logger, IDispatcher dispatcher,
IServiceProvider provider, PostgreSqlOptions options)
: base(logger, dispatcher)
{
ServiceProvider = provider;
_options = options;
_logger = logger;
if (_options.DbContextType != null)
{
......@@ -31,12 +32,14 @@ namespace DotNetCore.CAP.PostgreSql
_dbContext = (DbContext) ServiceProvider.GetService(_options.DbContextType);
}
}
public async Task PublishAsync(CapPublishedMessage message)
public async Task PublishCallbackAsync(CapPublishedMessage message)
{
using (var conn = new NpgsqlConnection(_options.ConnectionString))
{
await conn.ExecuteAsync(PrepareSql(), message);
var id = await conn.ExecuteScalarAsync<int>(PrepareSql(), message);
message.Id = id;
Enqueue(message);
}
}
......@@ -53,23 +56,20 @@ namespace DotNetCore.CAP.PostgreSql
dbContextTransaction = _dbContext.Database.BeginTransaction(IsolationLevel.ReadCommitted);
dbTrans = dbContextTransaction.GetDbTransaction();
}
DbTransaction = dbTrans;
}
protected override void Execute(IDbConnection dbConnection, IDbTransaction dbTransaction,
protected override int Execute(IDbConnection dbConnection, IDbTransaction dbTransaction,
CapPublishedMessage message)
{
dbConnection.Execute(PrepareSql(), message, dbTransaction);
_logger.LogInformation("Published Message has been persisted in the database. name:" + message);
return dbConnection.ExecuteScalar<int>(PrepareSql(), message, dbTransaction);
}
protected override async Task ExecuteAsync(IDbConnection dbConnection, IDbTransaction dbTransaction,
protected override Task<int> ExecuteAsync(IDbConnection dbConnection, IDbTransaction dbTransaction,
CapPublishedMessage message)
{
await dbConnection.ExecuteAsync(PrepareSql(), message, dbTransaction);
_logger.LogInformation("Published Message has been persisted in the database. name:" + message);
return dbConnection.ExecuteScalarAsync<int>(PrepareSql(), message, dbTransaction);
}
#region private methods
......@@ -77,7 +77,7 @@ namespace DotNetCore.CAP.PostgreSql
private string PrepareSql()
{
return
$"INSERT INTO \"{_options.Schema}\".\"published\" (\"Name\",\"Content\",\"Retries\",\"Added\",\"ExpiresAt\",\"StatusName\")VALUES(@Name,@Content,@Retries,@Added,@ExpiresAt,@StatusName)";
$"INSERT INTO \"{_options.Schema}\".\"published\" (\"Name\",\"Content\",\"Retries\",\"Added\",\"ExpiresAt\",\"StatusName\")VALUES(@Name,@Content,@Retries,@Added,@ExpiresAt,@StatusName) RETURNING \"Id\";";
}
#endregion private methods
......
using DotNetCore.CAP.Models;
namespace DotNetCore.CAP.PostgreSql
{
internal class FetchedMessage
{
public int MessageId { get; set; }
public MessageType MessageType { get; set; }
}
}
\ No newline at end of file
using System;
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using System.Threading.Tasks;
using Dapper;
using DotNetCore.CAP.Processor;
......
using System;
using System.Data;
using System.Threading;
using Dapper;
using DotNetCore.CAP.Models;
namespace DotNetCore.CAP.PostgreSql
{
public class PostgreSqlFetchedMessage : IFetchedMessage
{
private static readonly TimeSpan KeepAliveInterval = TimeSpan.FromMinutes(1);
private readonly IDbConnection _connection;
private readonly object _lockObject = new object();
private readonly Timer _timer;
private readonly IDbTransaction _transaction;
public PostgreSqlFetchedMessage(int messageId,
MessageType type,
IDbConnection connection,
IDbTransaction transaction)
{
MessageId = messageId;
MessageType = type;
_connection = connection;
_transaction = transaction;
_timer = new Timer(ExecuteKeepAliveQuery, null, KeepAliveInterval, KeepAliveInterval);
}
public int MessageId { get; }
public MessageType MessageType { get; }
public void RemoveFromQueue()
{
lock (_lockObject)
{
_transaction.Commit();
}
}
public void Requeue()
{
lock (_lockObject)
{
_transaction.Rollback();
}
}
public void Dispose()
{
lock (_lockObject)
{
_timer?.Dispose();
_transaction.Dispose();
_connection.Dispose();
}
}
private void ExecuteKeepAliveQuery(object obj)
{
lock (_lockObject)
{
try
{
_connection?.Execute("SELECT 1", _transaction);
}
catch
{
// ignored
}
}
}
}
}
\ No newline at end of file
using System;
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using System.Collections.Generic;
using System.Data;
using System.Linq;
......@@ -27,9 +30,7 @@ namespace DotNetCore.CAP.PostgreSql
select count(""Id"") from ""{0}"".""published"" where ""StatusName"" = N'Succeeded';
select count(""Id"") from ""{0}"".""received"" where ""StatusName"" = N'Succeeded';
select count(""Id"") from ""{0}"".""published"" where ""StatusName"" = N'Failed';
select count(""Id"") from ""{0}"".""received"" where ""StatusName"" = N'Failed';
select count(""Id"") from ""{0}"".""published"" where ""StatusName"" in (N'Processing',N'Scheduled',N'Enqueued');
select count(""Id"") from ""{0}"".""received"" where ""StatusName"" in (N'Processing',N'Scheduled',N'Enqueued');",
select count(""Id"") from ""{0}"".""received"" where ""StatusName"" = N'Failed';",
_options.Schema);
var statistics = UseConnection(connection =>
......@@ -42,10 +43,8 @@ select count(""Id"") from ""{0}"".""received"" where ""StatusName"" in (N'Proce
stats.PublishedFailed = multi.ReadSingle<int>();
stats.ReceivedFailed = multi.ReadSingle<int>();
stats.PublishedProcessing = multi.ReadSingle<int>();
stats.ReceivedProcessing = multi.ReadSingle<int>();
}
return stats;
});
return statistics;
......@@ -57,17 +56,24 @@ select count(""Id"") from ""{0}"".""received"" where ""StatusName"" in (N'Proce
var where = string.Empty;
if (!string.IsNullOrEmpty(queryDto.StatusName))
if (string.Equals(queryDto.StatusName, StatusName.Processing,
StringComparison.CurrentCultureIgnoreCase))
where += " and \"StatusName\" in (N'Processing',N'Scheduled',N'Enqueued')";
else
where += " and Lower(\"StatusName\") = Lower(@StatusName)";
{
where += " and Lower(\"StatusName\") = Lower(@StatusName)";
}
if (!string.IsNullOrEmpty(queryDto.Name))
{
where += " and Lower(\"Name\") = Lower(@Name)";
}
if (!string.IsNullOrEmpty(queryDto.Group))
{
where += " and Lower(\"Group\") = Lower(@Group)";
}
if (!string.IsNullOrEmpty(queryDto.Content))
{
where += " and \"Content\" ILike '%@Content%'";
}
var sqlQuery =
$"select * from \"{_options.Schema}\".\"{tableName}\" where 1=1 {where} order by \"Added\" desc offset @Offset limit @Limit";
......@@ -88,11 +94,6 @@ select count(""Id"") from ""{0}"".""received"" where ""StatusName"" in (N'Proce
return UseConnection(conn => GetNumberOfMessage(conn, "published", StatusName.Failed));
}
public int PublishedProcessingCount()
{
return UseConnection(conn => GetNumberOfMessage(conn, "published", StatusName.Processing));
}
public int PublishedSucceededCount()
{
return UseConnection(conn => GetNumberOfMessage(conn, "published", StatusName.Succeeded));
......@@ -103,11 +104,6 @@ select count(""Id"") from ""{0}"".""received"" where ""StatusName"" in (N'Proce
return UseConnection(conn => GetNumberOfMessage(conn, "received", StatusName.Failed));
}
public int ReceivedProcessingCount()
{
return UseConnection(conn => GetNumberOfMessage(conn, "received", StatusName.Processing));
}
public int ReceivedSucceededCount()
{
return UseConnection(conn => GetNumberOfMessage(conn, "received", StatusName.Succeeded));
......@@ -129,11 +125,10 @@ select count(""Id"") from ""{0}"".""received"" where ""StatusName"" in (N'Proce
private int GetNumberOfMessage(IDbConnection connection, string tableName, string statusName)
{
var sqlQuery = statusName == StatusName.Processing
? $"select count(\"Id\") from \"{_options.Schema}\".\"{tableName}\" where \"StatusName\" in (N'Processing',N'Scheduled',N'Enqueued')"
: $"select count(\"Id\") from \"{_options.Schema}\".\"{tableName}\" where Lower(\"StatusName\") = Lower(@state)";
var sqlQuery =
$"select count(\"Id\") from \"{_options.Schema}\".\"{tableName}\" where Lower(\"StatusName\") = Lower(@state)";
var count = connection.ExecuteScalar<int>(sqlQuery, new { state = statusName });
var count = connection.ExecuteScalar<int>(sqlQuery, new {state = statusName});
return count;
}
......@@ -175,12 +170,17 @@ with aggr as (
)
select ""Key"",""Count"" from aggr where ""Key""= Any(@keys);";
var valuesMap = connection.Query(sqlQuery,new { keys = keyMaps.Keys.ToList(), statusName })
.ToList()
.ToDictionary(x => (string)x.Key, x => (int)x.Count);
var valuesMap = connection.Query(sqlQuery, new {keys = keyMaps.Keys.ToList(), statusName})
.ToList()
.ToDictionary(x => (string) x.Key, x => (int) x.Count);
foreach (var key in keyMaps.Keys)
if (!valuesMap.ContainsKey(key)) valuesMap.Add(key, 0);
{
if (!valuesMap.ContainsKey(key))
{
valuesMap.Add(key, 0);
}
}
var result = new Dictionary<DateTime, int>();
for (var i = 0; i < keyMaps.Count; i++)
......
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using System.Data;
using System.Threading;
......@@ -11,9 +14,9 @@ namespace DotNetCore.CAP.PostgreSql
{
public class PostgreSqlStorage : IStorage
{
private readonly CapOptions _capOptions;
private readonly IDbConnection _existingConnection = null;
private readonly ILogger _logger;
private readonly CapOptions _capOptions;
private readonly PostgreSqlOptions _options;
public PostgreSqlStorage(ILogger<PostgreSqlStorage> logger,
......@@ -37,7 +40,10 @@ namespace DotNetCore.CAP.PostgreSql
public async Task InitializeAsync(CancellationToken cancellationToken)
{
if (cancellationToken.IsCancellationRequested) return;
if (cancellationToken.IsCancellationRequested)
{
return;
}
var sql = CreateDbTablesScript(_options.Schema);
......@@ -45,6 +51,7 @@ namespace DotNetCore.CAP.PostgreSql
{
await connection.ExecuteAsync(sql);
}
_logger.LogDebug("Ensuring all create database tables script are applied.");
}
......@@ -68,7 +75,9 @@ namespace DotNetCore.CAP.PostgreSql
var connection = _existingConnection ?? new NpgsqlConnection(_options.ConnectionString);
if (connection.State == ConnectionState.Closed)
{
connection.Open();
}
return connection;
}
......@@ -81,7 +90,9 @@ namespace DotNetCore.CAP.PostgreSql
internal void ReleaseConnection(IDbConnection connection)
{
if (connection != null && !IsExistingConnection(connection))
{
connection.Dispose();
}
}
protected virtual string CreateDbTablesScript(string schema)
......@@ -89,10 +100,7 @@ namespace DotNetCore.CAP.PostgreSql
var batchSql = $@"
CREATE SCHEMA IF NOT EXISTS ""{schema}"";
CREATE TABLE IF NOT EXISTS ""{schema}"".""queue""(
""MessageId"" int NOT NULL ,
""MessageType"" int NOT NULL
);
DROP TABLE IF EXISTS ""{schema}"".""queue"";
CREATE TABLE IF NOT EXISTS ""{schema}"".""received""(
""Id"" SERIAL PRIMARY KEY NOT NULL,
......
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using System.Collections.Generic;
using System.Data;
using System.Threading.Tasks;
using Dapper;
using DotNetCore.CAP.Infrastructure;
......@@ -36,44 +38,31 @@ namespace DotNetCore.CAP.PostgreSql
}
}
public Task<IFetchedMessage> FetchNextMessageAsync()
{
var sql = $@"DELETE FROM ""{Options.Schema}"".""queue"" WHERE ""MessageId"" = (SELECT ""MessageId"" FROM ""{Options.Schema}"".""queue"" FOR UPDATE SKIP LOCKED LIMIT 1) RETURNING *;";
return FetchNextMessageCoreAsync(sql);
}
public async Task<CapPublishedMessage> GetNextPublishedMessageToBeEnqueuedAsync()
public async Task<IEnumerable<CapPublishedMessage>> GetPublishedMessagesOfNeedRetry()
{
var fourMinsAgo = DateTime.Now.AddMinutes(-4);
var sql =
$"SELECT * FROM \"{Options.Schema}\".\"published\" WHERE \"StatusName\" = '{StatusName.Scheduled}' FOR UPDATE SKIP LOCKED LIMIT 1;";
$"SELECT * FROM \"{Options.Schema}\".\"published\" WHERE \"Retries\"<{_capOptions.FailedRetryCount} AND \"Added\"<'{fourMinsAgo}' AND (\"StatusName\"='{StatusName.Failed}' OR \"StatusName\"='{StatusName.Scheduled}') LIMIT 200;";
using (var connection = new NpgsqlConnection(Options.ConnectionString))
{
return await connection.QueryFirstOrDefaultAsync<CapPublishedMessage>(sql);
return await connection.QueryAsync<CapPublishedMessage>(sql);
}
}
public async Task<IEnumerable<CapPublishedMessage>> GetFailedPublishedMessages()
public async Task<int> StoreReceivedMessageAsync(CapReceivedMessage message)
{
var sql =
$"SELECT * FROM \"{Options.Schema}\".\"published\" WHERE \"Retries\"<{_capOptions.FailedRetryCount} AND \"StatusName\"='{StatusName.Failed}' LIMIT 200;";
using (var connection = new NpgsqlConnection(Options.ConnectionString))
if (message == null)
{
return await connection.QueryAsync<CapPublishedMessage>(sql);
throw new ArgumentNullException(nameof(message));
}
}
public async Task StoreReceivedMessageAsync(CapReceivedMessage message)
{
if (message == null) throw new ArgumentNullException(nameof(message));
var sql =
$"INSERT INTO \"{Options.Schema}\".\"received\"(\"Name\",\"Group\",\"Content\",\"Retries\",\"Added\",\"ExpiresAt\",\"StatusName\")VALUES(@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName);";
$"INSERT INTO \"{Options.Schema}\".\"received\"(\"Name\",\"Group\",\"Content\",\"Retries\",\"Added\",\"ExpiresAt\",\"StatusName\")VALUES(@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName) RETURNING \"Id\";";
using (var connection = new NpgsqlConnection(Options.ConnectionString))
{
await connection.ExecuteAsync(sql, message);
return await connection.ExecuteScalarAsync<int>(sql, message);
}
}
......@@ -86,20 +75,11 @@ namespace DotNetCore.CAP.PostgreSql
}
}
public async Task<CapReceivedMessage> GetNextReceivedMessageToBeEnqueuedAsync()
{
var sql =
$"SELECT * FROM \"{Options.Schema}\".\"received\" WHERE \"StatusName\" = '{StatusName.Scheduled}' FOR UPDATE SKIP LOCKED LIMIT 1;";
using (var connection = new NpgsqlConnection(Options.ConnectionString))
{
return await connection.QueryFirstOrDefaultAsync<CapReceivedMessage>(sql);
}
}
public async Task<IEnumerable<CapReceivedMessage>> GetFailedReceivedMessages()
public async Task<IEnumerable<CapReceivedMessage>> GetReceivedMessagesOfNeedRetry()
{
var fourMinsAgo = DateTime.Now.AddMinutes(-4);
var sql =
$"SELECT * FROM \"{Options.Schema}\".\"received\" WHERE \"Retries\"<{_capOptions.FailedRetryCount} AND \"StatusName\"='{StatusName.Failed}' LIMIT 200;";
$"SELECT * FROM \"{Options.Schema}\".\"received\" WHERE \"Retries\"<{_capOptions.FailedRetryCount} AND \"Added\"<'{fourMinsAgo}' AND (\"StatusName\"='{StatusName.Failed}' OR \"StatusName\"='{StatusName.Scheduled}') LIMIT 200;";
using (var connection = new NpgsqlConnection(Options.ConnectionString))
{
return await connection.QueryAsync<CapReceivedMessage>(sql);
......@@ -131,35 +111,5 @@ namespace DotNetCore.CAP.PostgreSql
return connection.Execute(sql) > 0;
}
}
private async Task<IFetchedMessage> FetchNextMessageCoreAsync(string sql, object args = null)
{
//here don't use `using` to dispose
var connection = new NpgsqlConnection(Options.ConnectionString);
await connection.OpenAsync();
var transaction = connection.BeginTransaction(IsolationLevel.ReadCommitted);
FetchedMessage fetchedMessage;
try
{
fetchedMessage = await connection.QueryFirstOrDefaultAsync<FetchedMessage>(sql, args, transaction);
}
catch (NpgsqlException)
{
transaction.Dispose();
connection.Dispose();
throw;
}
if (fetchedMessage == null)
{
transaction.Rollback();
transaction.Dispose();
connection.Dispose();
return null;
}
return new PostgreSqlFetchedMessage(fetchedMessage.MessageId, fetchedMessage.MessageType, connection,
transaction);
}
}
}
\ No newline at end of file
using System;
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using System.Data;
using System.Threading.Tasks;
using Dapper;
......@@ -26,7 +29,10 @@ namespace DotNetCore.CAP.PostgreSql
public void UpdateMessage(CapPublishedMessage message)
{
if (message == null) throw new ArgumentNullException(nameof(message));
if (message == null)
{
throw new ArgumentNullException(nameof(message));
}
var sql =
$@"UPDATE ""{
......@@ -37,7 +43,10 @@ namespace DotNetCore.CAP.PostgreSql
public void UpdateMessage(CapReceivedMessage message)
{
if (message == null) throw new ArgumentNullException(nameof(message));
if (message == null)
{
throw new ArgumentNullException(nameof(message));
}
var sql =
$@"UPDATE ""{
......@@ -46,9 +55,24 @@ namespace DotNetCore.CAP.PostgreSql
_dbConnection.Execute(sql, message, _dbTransaction);
}
public Task CommitAsync()
{
_dbTransaction.Commit();
return Task.CompletedTask;
}
public void Dispose()
{
_dbTransaction.Dispose();
_dbConnection.Dispose();
}
public void EnqueueMessage(CapPublishedMessage message)
{
if (message == null) throw new ArgumentNullException(nameof(message));
if (message == null)
{
throw new ArgumentNullException(nameof(message));
}
var sql = $@"INSERT INTO ""{_schema}"".""queue"" values(@MessageId,@MessageType);";
_dbConnection.Execute(sql, new CapQueue {MessageId = message.Id, MessageType = MessageType.Publish},
......@@ -57,23 +81,14 @@ namespace DotNetCore.CAP.PostgreSql
public void EnqueueMessage(CapReceivedMessage message)
{
if (message == null) throw new ArgumentNullException(nameof(message));
if (message == null)
{
throw new ArgumentNullException(nameof(message));
}
var sql = $@"INSERT INTO ""{_schema}"".""queue"" values(@MessageId,@MessageType);";
_dbConnection.Execute(sql, new CapQueue {MessageId = message.Id, MessageType = MessageType.Subscribe},
_dbTransaction);
}
public Task CommitAsync()
{
_dbTransaction.Commit();
return Task.CompletedTask;
}
public void Dispose()
{
_dbTransaction.Dispose();
_dbConnection.Dispose();
}
}
}
\ No newline at end of file
using System;
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using DotNetCore.CAP;
// ReSharper disable once CheckNamespace
......@@ -13,7 +16,10 @@ namespace Microsoft.Extensions.DependencyInjection
public static CapOptions UseRabbitMQ(this CapOptions options, Action<RabbitMQOptions> configure)
{
if (configure == null) throw new ArgumentNullException(nameof(configure));
if (configure == null)
{
throw new ArgumentNullException(nameof(configure));
}
options.RegisterExtension(new RabbitMQCapOptionsExtension(configure));
......
// ReSharper disable once CheckNamespace
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
// ReSharper disable once CheckNamespace
namespace DotNetCore.CAP
{
public class RabbitMQOptions
......
using System;
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using DotNetCore.CAP.RabbitMQ;
using Microsoft.Extensions.DependencyInjection;
......@@ -24,8 +27,8 @@ namespace DotNetCore.CAP
services.AddSingleton<IConsumerClientFactory, RabbitMQConsumerClientFactory>();
services.AddSingleton<IConnectionChannelPool, ConnectionChannelPool>();
services.AddSingleton<IQueueExecutor, PublishQueueExecutor>();
services.AddSingleton<IPublishExecutor, PublishQueueExecutor>();
services.AddSingleton<IPublishExecutor, RabbitMQPublishMessageSender>();
services.AddSingleton<IPublishMessageSender, RabbitMQPublishMessageSender>();
}
}
}
\ No newline at end of file
using DotNetCore.CAP.Abstractions;
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using DotNetCore.CAP.Abstractions;
// ReSharper disable once CheckNamespace
namespace DotNetCore.CAP
......
using System;
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Threading;
......@@ -40,7 +43,10 @@ namespace DotNetCore.CAP.RabbitMQ
public IConnection GetConnection()
{
if (_connection != null && _connection.IsOpen)
{
return _connection;
}
_connection = _connectionActivator();
_connection.ConnectionShutdown += RabbitMQ_ConnectionShutdown;
return _connection;
......@@ -51,7 +57,9 @@ namespace DotNetCore.CAP.RabbitMQ
_maxSize = 0;
while (_pool.TryDequeue(out var context))
{
context.Dispose();
}
}
private static Func<IConnection> CreateConnection(RabbitMQOptions options)
......
using RabbitMQ.Client;
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using RabbitMQ.Client;
namespace DotNetCore.CAP.RabbitMQ
{
......
using System;
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using System.Text;
using System.Threading.Tasks;
using DotNetCore.CAP.Internal;
using DotNetCore.CAP.Processor.States;
using Microsoft.Extensions.Logging;
using RabbitMQ.Client;
namespace DotNetCore.CAP.RabbitMQ
{
internal sealed class PublishQueueExecutor : BasePublishQueueExecutor
internal sealed class RabbitMQPublishMessageSender : BasePublishMessageSender
{
private readonly IConnectionChannelPool _connectionChannelPool;
private readonly ILogger _logger;
private readonly RabbitMQOptions _rabbitMQOptions;
public PublishQueueExecutor(ILogger<PublishQueueExecutor> logger, CapOptions options,
RabbitMQOptions rabbitMQOptions, IConnectionChannelPool connectionChannelPool, IStateChanger stateChanger)
: base(options, stateChanger, logger)
public RabbitMQPublishMessageSender(ILogger<RabbitMQPublishMessageSender> logger, CapOptions options, RabbitMQOptions rabbitMQOptions,
IStorageConnection connection, IConnectionChannelPool connectionChannelPool, IStateChanger stateChanger)
: base(logger, options, connection, stateChanger)
{
_logger = logger;
_connectionChannelPool = connectionChannelPool;
......@@ -30,10 +34,7 @@ namespace DotNetCore.CAP.RabbitMQ
var body = Encoding.UTF8.GetBytes(content);
channel.ExchangeDeclare(_rabbitMQOptions.TopicExchangeName, RabbitMQOptions.ExchangeType, true);
channel.BasicPublish(_rabbitMQOptions.TopicExchangeName,
keyName,
null,
body);
channel.BasicPublish(_rabbitMQOptions.TopicExchangeName, keyName, null, body);
_logger.LogDebug($"RabbitMQ topic message [{keyName}] has been published.");
......@@ -41,21 +42,22 @@ namespace DotNetCore.CAP.RabbitMQ
}
catch (Exception ex)
{
_logger.LogError(
$"RabbitMQ topic message [{keyName}] has been raised an exception of sending. the exception is: {ex.Message}");
return Task.FromResult(OperateResult.Failed(ex,
new OperateError
{
Code = ex.HResult.ToString(),
Description = ex.Message
}));
var wapperEx = new PublisherSentFailedException(ex.Message, ex);
var errors = new OperateError
{
Code = ex.HResult.ToString(),
Description = ex.Message
};
return Task.FromResult(OperateResult.Failed(wapperEx, errors));
}
finally
{
var returned = _connectionChannelPool.Return(channel);
if (!returned)
{
channel.Dispose();
}
}
}
}
......
using System;
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
......@@ -13,9 +16,9 @@ namespace DotNetCore.CAP.RabbitMQ
private readonly string _exchageName;
private readonly string _queueName;
private readonly RabbitMQOptions _rabbitMQOptions;
private IModel _channel;
private IConnection _connection;
private IModel _channel;
private ulong _deliveryTag;
public RabbitMQConsumerClient(string queueName,
......@@ -36,10 +39,15 @@ namespace DotNetCore.CAP.RabbitMQ
public void Subscribe(IEnumerable<string> topics)
{
if (topics == null) throw new ArgumentNullException(nameof(topics));
if (topics == null)
{
throw new ArgumentNullException(nameof(topics));
}
foreach (var topic in topics)
{
_channel.QueueBind(_queueName, _exchageName, topic);
}
}
public void Listening(TimeSpan timeout, CancellationToken cancellationToken)
......@@ -58,6 +66,7 @@ namespace DotNetCore.CAP.RabbitMQ
cancellationToken.ThrowIfCancellationRequested();
cancellationToken.WaitHandle.WaitOne(timeout);
}
// ReSharper disable once FunctionNeverReturns
}
......@@ -88,8 +97,9 @@ namespace DotNetCore.CAP.RabbitMQ
RabbitMQOptions.ExchangeType,
true);
var arguments = new Dictionary<string, object> {
{ "x-message-ttl", _rabbitMQOptions.QueueMessageExpires }
var arguments = new Dictionary<string, object>
{
{"x-message-ttl", _rabbitMQOptions.QueueMessageExpires}
};
_channel.QueueDeclare(_queueName, true, false, false, arguments);
}
......@@ -100,7 +110,7 @@ namespace DotNetCore.CAP.RabbitMQ
{
var args = new LogMessageEventArgs
{
LogType = MqLogType.ConsumerCancelled,
LogType = MqLogType.ConsumerCancelled,
Reason = e.ConsumerTag
};
OnLog?.Invoke(sender, args);
......
namespace DotNetCore.CAP.RabbitMQ
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
namespace DotNetCore.CAP.RabbitMQ
{
internal sealed class RabbitMQConsumerClientFactory : IConsumerClientFactory
{
......
using System;
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
// ReSharper disable once CheckNamespace
namespace DotNetCore.CAP
......
using System;
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using DotNetCore.CAP;
using Microsoft.EntityFrameworkCore;
......@@ -14,7 +17,10 @@ namespace Microsoft.Extensions.DependencyInjection
public static CapOptions UseSqlServer(this CapOptions options, Action<SqlServerOptions> configure)
{
if (configure == null) throw new ArgumentNullException(nameof(configure));
if (configure == null)
{
throw new ArgumentNullException(nameof(configure));
}
options.RegisterExtension(new SqlServerCapOptionsExtension(configure));
......@@ -30,7 +36,10 @@ namespace Microsoft.Extensions.DependencyInjection
public static CapOptions UseEntityFramework<TContext>(this CapOptions options, Action<EFOptions> configure)
where TContext : DbContext
{
if (configure == null) throw new ArgumentNullException(nameof(configure));
if (configure == null)
{
throw new ArgumentNullException(nameof(configure));
}
options.RegisterExtension(new SqlServerCapOptionsExtension(x =>
{
......
using System;
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using DotNetCore.CAP.Processor;
using DotNetCore.CAP.SqlServer;
using Microsoft.EntityFrameworkCore;
......@@ -41,7 +44,7 @@ namespace DotNetCore.CAP
using (var scope = x.CreateScope())
{
var provider = scope.ServiceProvider;
var dbContext = (DbContext)provider.GetService(sqlServerOptions.DbContextType);
var dbContext = (DbContext) provider.GetService(sqlServerOptions.DbContextType);
sqlServerOptions.ConnectionString = dbContext.Database.GetDbConnection().ConnectionString;
return sqlServerOptions;
}
......
// ReSharper disable once CheckNamespace
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
namespace DotNetCore.CAP
{
......
using System;
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using System.Data;
using System.Data.SqlClient;
using System.Threading.Tasks;
......@@ -14,28 +17,31 @@ namespace DotNetCore.CAP.SqlServer
public class CapPublisher : CapPublisherBase, ICallbackPublisher
{
private readonly DbContext _dbContext;
private readonly ILogger _logger;
private readonly SqlServerOptions _options;
public CapPublisher(IServiceProvider provider,
ILogger<CapPublisher> logger,
SqlServerOptions options)
public CapPublisher(ILogger<CapPublisher> logger, IDispatcher dispatcher,
IServiceProvider provider, SqlServerOptions options)
: base(logger, dispatcher)
{
ServiceProvider = provider;
_logger = logger;
_options = options;
if (_options.DbContextType == null) return;
if (_options.DbContextType == null)
{
return;
}
IsUsingEF = true;
_dbContext = (DbContext)ServiceProvider.GetService(_options.DbContextType);
_dbContext = (DbContext) ServiceProvider.GetService(_options.DbContextType);
}
public async Task PublishAsync(CapPublishedMessage message)
public async Task PublishCallbackAsync(CapPublishedMessage message)
{
using (var conn = new SqlConnection(_options.ConnectionString))
{
await conn.ExecuteAsync(PrepareSql(), message);
var id = await conn.ExecuteScalarAsync<int>(PrepareSql(), message);
message.Id = id;
Enqueue(message);
}
}
......@@ -52,23 +58,20 @@ namespace DotNetCore.CAP.SqlServer
dbContextTransaction = _dbContext.Database.BeginTransaction(IsolationLevel.ReadCommitted);
dbTrans = dbContextTransaction.GetDbTransaction();
}
DbTransaction = dbTrans;
}
protected override void Execute(IDbConnection dbConnection, IDbTransaction dbTransaction,
protected override int Execute(IDbConnection dbConnection, IDbTransaction dbTransaction,
CapPublishedMessage message)
{
dbConnection.Execute(PrepareSql(), message, dbTransaction);
_logger.LogInformation("published message has been persisted to the database. name:" + message);
return dbConnection.ExecuteScalar<int>(PrepareSql(), message, dbTransaction);
}
protected override async Task ExecuteAsync(IDbConnection dbConnection, IDbTransaction dbTransaction,
protected override Task<int> ExecuteAsync(IDbConnection dbConnection, IDbTransaction dbTransaction,
CapPublishedMessage message)
{
await dbConnection.ExecuteAsync(PrepareSql(), message, dbTransaction);
_logger.LogInformation("published message has been persisted to the database. name:" + message);
return dbConnection.ExecuteScalarAsync<int>(PrepareSql(), message, dbTransaction);
}
#region private methods
......@@ -76,7 +79,7 @@ namespace DotNetCore.CAP.SqlServer
private string PrepareSql()
{
return
$"INSERT INTO {_options.Schema}.[Published] ([Name],[Content],[Retries],[Added],[ExpiresAt],[StatusName])VALUES(@Name,@Content,@Retries,@Added,@ExpiresAt,@StatusName)";
$"INSERT INTO {_options.Schema}.[Published] ([Name],[Content],[Retries],[Added],[ExpiresAt],[StatusName])VALUES(@Name,@Content,@Retries,@Added,@ExpiresAt,@StatusName);SELECT SCOPE_IDENTITY();";
}
#endregion private methods
......
using DotNetCore.CAP.Models;
namespace DotNetCore.CAP.SqlServer
{
internal class FetchedMessage
{
public int MessageId { get; set; }
public MessageType MessageType { get; set; }
}
}
\ No newline at end of file
using System;
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using System.Data.SqlClient;
using System.Threading.Tasks;
using Dapper;
......
using System;
using System.Data;
using System.Threading;
using Dapper;
using DotNetCore.CAP.Models;
namespace DotNetCore.CAP.SqlServer
{
public class SqlServerFetchedMessage : IFetchedMessage
{
private static readonly TimeSpan KeepAliveInterval = TimeSpan.FromMinutes(1);
private readonly IDbConnection _connection;
private readonly object _lockObject = new object();
private readonly Timer _timer;
private readonly IDbTransaction _transaction;
public SqlServerFetchedMessage(int messageId,
MessageType type,
IDbConnection connection,
IDbTransaction transaction)
{
MessageId = messageId;
MessageType = type;
_connection = connection;
_transaction = transaction;
_timer = new Timer(ExecuteKeepAliveQuery, null, KeepAliveInterval, KeepAliveInterval);
}
public int MessageId { get; }
public MessageType MessageType { get; }
public void RemoveFromQueue()
{
lock (_lockObject)
{
_transaction.Commit();
}
}
public void Requeue()
{
lock (_lockObject)
{
_transaction.Rollback();
}
}
public void Dispose()
{
lock (_lockObject)
{
_timer?.Dispose();
_transaction.Dispose();
_connection.Dispose();
}
}
private void ExecuteKeepAliveQuery(object obj)
{
lock (_lockObject)
{
try
{
_connection?.Execute("SELECT 1", _transaction);
}
catch
{
// ignored
}
}
}
}
}
\ No newline at end of file
using System;
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using System.Collections.Generic;
using System.Data;
using System.Linq;
......@@ -28,9 +31,7 @@ set transaction isolation level read committed;
select count(Id) from [{0}].Published with (nolock) where StatusName = N'Succeeded';
select count(Id) from [{0}].Received with (nolock) where StatusName = N'Succeeded';
select count(Id) from [{0}].Published with (nolock) where StatusName = N'Failed';
select count(Id) from [{0}].Received with (nolock) where StatusName = N'Failed';
select count(Id) from [{0}].Published with (nolock) where StatusName in (N'Processing',N'Scheduled',N'Enqueued');
select count(Id) from [{0}].Received with (nolock) where StatusName in (N'Processing',N'Scheduled',N'Enqueued');",
select count(Id) from [{0}].Received with (nolock) where StatusName = N'Failed';",
_options.Schema);
var statistics = UseConnection(connection =>
......@@ -43,10 +44,8 @@ select count(Id) from [{0}].Received with (nolock) where StatusName in (N'Proces
stats.PublishedFailed = multi.ReadSingle<int>();
stats.ReceivedFailed = multi.ReadSingle<int>();
stats.PublishedProcessing = multi.ReadSingle<int>();
stats.ReceivedProcessing = multi.ReadSingle<int>();
}
return stats;
});
return statistics;
......@@ -71,17 +70,24 @@ select count(Id) from [{0}].Received with (nolock) where StatusName in (N'Proces
var tableName = queryDto.MessageType == MessageType.Publish ? "Published" : "Received";
var where = string.Empty;
if (!string.IsNullOrEmpty(queryDto.StatusName))
if (string.Equals(queryDto.StatusName, StatusName.Processing,
StringComparison.CurrentCultureIgnoreCase))
where += " and statusname in (N'Processing',N'Scheduled',N'Enqueued')";
else
where += " and statusname=@StatusName";
{
where += " and statusname=@StatusName";
}
if (!string.IsNullOrEmpty(queryDto.Name))
{
where += " and name=@Name";
}
if (!string.IsNullOrEmpty(queryDto.Group))
{
where += " and group=@Group";
}
if (!string.IsNullOrEmpty(queryDto.Content))
{
where += " and content like '%@Content%'";
}
var sqlQuery =
$"select * from [{_options.Schema}].{tableName} where 1=1 {where} order by Added desc offset @Offset rows fetch next @Limit rows only";
......@@ -102,11 +108,6 @@ select count(Id) from [{0}].Received with (nolock) where StatusName in (N'Proces
return UseConnection(conn => GetNumberOfMessage(conn, "Published", StatusName.Failed));
}
public int PublishedProcessingCount()
{
return UseConnection(conn => GetNumberOfMessage(conn, "Published", StatusName.Processing));
}
public int PublishedSucceededCount()
{
return UseConnection(conn => GetNumberOfMessage(conn, "Published", StatusName.Succeeded));
......@@ -117,11 +118,6 @@ select count(Id) from [{0}].Received with (nolock) where StatusName in (N'Proces
return UseConnection(conn => GetNumberOfMessage(conn, "Received", StatusName.Failed));
}
public int ReceivedProcessingCount()
{
return UseConnection(conn => GetNumberOfMessage(conn, "Received", StatusName.Processing));
}
public int ReceivedSucceededCount()
{
return UseConnection(conn => GetNumberOfMessage(conn, "Received", StatusName.Succeeded));
......@@ -129,9 +125,8 @@ select count(Id) from [{0}].Received with (nolock) where StatusName in (N'Proces
private int GetNumberOfMessage(IDbConnection connection, string tableName, string statusName)
{
var sqlQuery = statusName == StatusName.Processing
? $"select count(Id) from [{_options.Schema}].{tableName} with (nolock) where StatusName in (N'Processing',N'Scheduled',N'Enqueued')"
: $"select count(Id) from [{_options.Schema}].{tableName} with (nolock) where StatusName = @state";
var sqlQuery =
$"select count(Id) from [{_options.Schema}].{tableName} with (nolock) where StatusName = @state";
var count = connection.ExecuteScalar<int>(sqlQuery, new {state = statusName});
return count;
......@@ -182,7 +177,12 @@ select [Key], [Count] from aggr with (nolock) where [Key] in @keys;";
.ToDictionary(x => (string) x.Key, x => (int) x.Count);
foreach (var key in keyMaps.Keys)
if (!valuesMap.ContainsKey(key)) valuesMap.Add(key, 0);
{
if (!valuesMap.ContainsKey(key))
{
valuesMap.Add(key, 0);
}
}
var result = new Dictionary<DateTime, int>();
for (var i = 0; i < keyMaps.Count; i++)
......
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using System.Data;
using System.Data.SqlClient;
......@@ -11,9 +14,9 @@ namespace DotNetCore.CAP.SqlServer
{
public class SqlServerStorage : IStorage
{
private readonly CapOptions _capOptions;
private readonly IDbConnection _existingConnection = null;
private readonly ILogger _logger;
private readonly CapOptions _capOptions;
private readonly SqlServerOptions _options;
public SqlServerStorage(ILogger<SqlServerStorage> logger,
......@@ -37,7 +40,10 @@ namespace DotNetCore.CAP.SqlServer
public async Task InitializeAsync(CancellationToken cancellationToken)
{
if (cancellationToken.IsCancellationRequested) return;
if (cancellationToken.IsCancellationRequested)
{
return;
}
var sql = CreateDbTablesScript(_options.Schema);
......@@ -45,6 +51,7 @@ namespace DotNetCore.CAP.SqlServer
{
await connection.ExecuteAsync(sql);
}
_logger.LogDebug("Ensuring all create database tables script are applied.");
}
......@@ -57,12 +64,9 @@ BEGIN
EXEC('CREATE SCHEMA [{schema}]')
END;
IF OBJECT_ID(N'[{schema}].[Queue]',N'U') IS NULL
IF OBJECT_ID(N'[{schema}].[Queue]',N'U') IS NOT NULL
BEGIN
CREATE TABLE [{schema}].[Queue](
[MessageId] [int] NOT NULL,
[MessageType] [tinyint] NOT NULL
) ON [PRIMARY]
DROP TABLE [{schema}].[Queue];
END;
IF OBJECT_ID(N'[{schema}].[Received]',N'U') IS NULL
......@@ -122,7 +126,9 @@ END;";
var connection = _existingConnection ?? new SqlConnection(_options.ConnectionString);
if (connection.State == ConnectionState.Closed)
{
connection.Open();
}
return connection;
}
......@@ -135,7 +141,9 @@ END;";
internal void ReleaseConnection(IDbConnection connection)
{
if (connection != null && !IsExistingConnection(connection))
{
connection.Dispose();
}
}
}
}
\ No newline at end of file
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using System.Collections.Generic;
using System.Data;
using System.Data.SqlClient;
using System.Threading.Tasks;
using Dapper;
......@@ -36,49 +38,32 @@ namespace DotNetCore.CAP.SqlServer
}
}
public Task<IFetchedMessage> FetchNextMessageAsync()
{
var sql = $@"
DELETE TOP (1)
FROM [{Options.Schema}].[Queue] WITH (readpast, updlock, rowlock)
OUTPUT DELETED.MessageId,DELETED.[MessageType];";
return FetchNextMessageCoreAsync(sql);
}
public async Task<CapPublishedMessage> GetNextPublishedMessageToBeEnqueuedAsync()
public async Task<IEnumerable<CapPublishedMessage>> GetPublishedMessagesOfNeedRetry()
{
var fourMinsAgo = DateTime.Now.AddMinutes(-4);
var sql =
$"SELECT TOP (1) * FROM [{Options.Schema}].[Published] WITH (readpast) WHERE StatusName = '{StatusName.Scheduled}'";
$"SELECT TOP (200) * FROM [{Options.Schema}].[Published] WITH (readpast) WHERE Retries<{_capOptions.FailedRetryCount} AND Added<'{fourMinsAgo}' AND (StatusName = '{StatusName.Failed}' OR StatusName = '{StatusName.Scheduled}')";
using (var connection = new SqlConnection(Options.ConnectionString))
{
return await connection.QueryFirstOrDefaultAsync<CapPublishedMessage>(sql);
return await connection.QueryAsync<CapPublishedMessage>(sql);
}
}
public async Task<IEnumerable<CapPublishedMessage>> GetFailedPublishedMessages()
public async Task<int> StoreReceivedMessageAsync(CapReceivedMessage message)
{
var sql =
$"SELECT TOP (200) * FROM [{Options.Schema}].[Published] WITH (readpast) WHERE Retries<{_capOptions.FailedRetryCount} AND StatusName = '{StatusName.Failed}'";
using (var connection = new SqlConnection(Options.ConnectionString))
if (message == null)
{
return await connection.QueryAsync<CapPublishedMessage>(sql);
throw new ArgumentNullException(nameof(message));
}
}
public async Task StoreReceivedMessageAsync(CapReceivedMessage message)
{
if (message == null) throw new ArgumentNullException(nameof(message));
var sql = $@"
INSERT INTO [{Options.Schema}].[Received]([Name],[Group],[Content],[Retries],[Added],[ExpiresAt],[StatusName])
VALUES(@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName);";
VALUES(@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName);SELECT SCOPE_IDENTITY();";
using (var connection = new SqlConnection(Options.ConnectionString))
{
await connection.ExecuteAsync(sql, message);
return await connection.ExecuteScalarAsync<int>(sql, message);
}
}
......@@ -91,20 +76,11 @@ VALUES(@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName);";
}
}
public async Task<CapReceivedMessage> GetNextReceivedMessageToBeEnqueuedAsync()
{
var sql =
$"SELECT TOP (1) * FROM [{Options.Schema}].[Received] WITH (readpast) WHERE StatusName = '{StatusName.Scheduled}'";
using (var connection = new SqlConnection(Options.ConnectionString))
{
return await connection.QueryFirstOrDefaultAsync<CapReceivedMessage>(sql);
}
}
public async Task<IEnumerable<CapReceivedMessage>> GetFailedReceivedMessages()
public async Task<IEnumerable<CapReceivedMessage>> GetReceivedMessagesOfNeedRetry()
{
var fourMinsAgo = DateTime.Now.AddMinutes(-4);
var sql =
$"SELECT TOP (200) * FROM [{Options.Schema}].[Received] WITH (readpast) WHERE Retries<{_capOptions.FailedRetryCount} AND StatusName = '{StatusName.Failed}'";
$"SELECT TOP (200) * FROM [{Options.Schema}].[Received] WITH (readpast) WHERE Retries<{_capOptions.FailedRetryCount} AND Added<'{fourMinsAgo}' AND (StatusName = '{StatusName.Failed}' OR StatusName = '{StatusName.Scheduled}')";
using (var connection = new SqlConnection(Options.ConnectionString))
{
return await connection.QueryAsync<CapReceivedMessage>(sql);
......@@ -136,35 +112,5 @@ VALUES(@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName);";
public void Dispose()
{
}
private async Task<IFetchedMessage> FetchNextMessageCoreAsync(string sql, object args = null)
{
//here don't use `using` to dispose
var connection = new SqlConnection(Options.ConnectionString);
await connection.OpenAsync();
var transaction = connection.BeginTransaction(IsolationLevel.ReadCommitted);
FetchedMessage fetchedMessage;
try
{
fetchedMessage = await connection.QueryFirstOrDefaultAsync<FetchedMessage>(sql, args, transaction);
}
catch (SqlException)
{
transaction.Dispose();
connection.Dispose();
throw;
}
if (fetchedMessage == null)
{
transaction.Rollback();
transaction.Dispose();
connection.Dispose();
return null;
}
return new SqlServerFetchedMessage(fetchedMessage.MessageId, fetchedMessage.MessageType, connection,
transaction);
}
}
}
\ No newline at end of file
using System;
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using System.Data;
using System.Data.SqlClient;
using System.Threading.Tasks;
......@@ -26,7 +29,10 @@ namespace DotNetCore.CAP.SqlServer
public void UpdateMessage(CapPublishedMessage message)
{
if (message == null) throw new ArgumentNullException(nameof(message));
if (message == null)
{
throw new ArgumentNullException(nameof(message));
}
var sql =
$"UPDATE [{_schema}].[Published] SET [Retries] = @Retries,[Content] = @Content,[ExpiresAt] = @ExpiresAt,[StatusName]=@StatusName WHERE Id=@Id;";
......@@ -35,16 +41,34 @@ namespace DotNetCore.CAP.SqlServer
public void UpdateMessage(CapReceivedMessage message)
{
if (message == null) throw new ArgumentNullException(nameof(message));
if (message == null)
{
throw new ArgumentNullException(nameof(message));
}
var sql =
$"UPDATE [{_schema}].[Received] SET [Retries] = @Retries,[Content] = @Content,[ExpiresAt] = @ExpiresAt,[StatusName]=@StatusName WHERE Id=@Id;";
_dbConnection.Execute(sql, message, _dbTransaction);
}
public Task CommitAsync()
{
_dbTransaction.Commit();
return Task.CompletedTask;
}
public void Dispose()
{
_dbTransaction.Dispose();
_dbConnection.Dispose();
}
public void EnqueueMessage(CapPublishedMessage message)
{
if (message == null) throw new ArgumentNullException(nameof(message));
if (message == null)
{
throw new ArgumentNullException(nameof(message));
}
var sql = $"INSERT INTO [{_schema}].[Queue] values(@MessageId,@MessageType);";
_dbConnection.Execute(sql, new CapQueue {MessageId = message.Id, MessageType = MessageType.Publish},
......@@ -53,23 +77,14 @@ namespace DotNetCore.CAP.SqlServer
public void EnqueueMessage(CapReceivedMessage message)
{
if (message == null) throw new ArgumentNullException(nameof(message));
if (message == null)
{
throw new ArgumentNullException(nameof(message));
}
var sql = $"INSERT INTO [{_schema}].[Queue] values(@MessageId,@MessageType);";
_dbConnection.Execute(sql, new CapQueue {MessageId = message.Id, MessageType = MessageType.Subscribe},
_dbTransaction);
}
public Task CommitAsync()
{
_dbTransaction.Commit();
return Task.CompletedTask;
}
public void Dispose()
{
_dbTransaction.Dispose();
_dbConnection.Dispose();
}
}
}
\ No newline at end of file
using System;
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using System.Data;
using System.Threading.Tasks;
using DotNetCore.CAP.Infrastructure;
using DotNetCore.CAP.Models;
using DotNetCore.CAP.Processor;
using Microsoft.Extensions.Logging;
namespace DotNetCore.CAP.Abstractions
{
public abstract class CapPublisherBase : ICapPublisher, IDisposable
{
private readonly IDispatcher _dispatcher;
private readonly ILogger _logger;
protected CapPublisherBase(ILogger<CapPublisherBase> logger, IDispatcher dispatcher)
{
_logger = logger;
_dispatcher = dispatcher;
}
protected IDbConnection DbConnection { get; set; }
protected IDbTransaction DbTransaction { get; set; }
protected bool IsCapOpenedTrans { get; set; }
......@@ -21,9 +33,7 @@ namespace DotNetCore.CAP.Abstractions
CheckIsUsingEF(name);
PrepareConnectionForEF();
var content = Serialize(contentObj, callbackName);
PublishWithTrans(name, content);
PublishWithTrans(name, contentObj, callbackName);
}
public Task PublishAsync<T>(string name, T contentObj, string callbackName = null)
......@@ -31,9 +41,7 @@ namespace DotNetCore.CAP.Abstractions
CheckIsUsingEF(name);
PrepareConnectionForEF();
var content = Serialize(contentObj, callbackName);
return PublishWithTransAsync(name, content);
return PublishWithTransAsync(name, contentObj, callbackName);
}
public void Publish<T>(string name, T contentObj, IDbTransaction dbTransaction, string callbackName = null)
......@@ -41,9 +49,7 @@ namespace DotNetCore.CAP.Abstractions
CheckIsAdoNet(name);
PrepareConnectionForAdo(dbTransaction);
var content = Serialize(contentObj, callbackName);
PublishWithTrans(name, content);
PublishWithTrans(name, contentObj, callbackName);
}
public Task PublishAsync<T>(string name, T contentObj, IDbTransaction dbTransaction, string callbackName = null)
......@@ -51,28 +57,31 @@ namespace DotNetCore.CAP.Abstractions
CheckIsAdoNet(name);
PrepareConnectionForAdo(dbTransaction);
var content = Serialize(contentObj, callbackName);
return PublishWithTransAsync(name, contentObj, callbackName);
}
return PublishWithTransAsync(name, content);
protected void Enqueue(CapPublishedMessage message)
{
_dispatcher.EnqueueToPublish(message);
}
protected abstract void PrepareConnectionForEF();
protected abstract void Execute(IDbConnection dbConnection, IDbTransaction dbTransaction,
protected abstract int Execute(IDbConnection dbConnection, IDbTransaction dbTransaction,
CapPublishedMessage message);
protected abstract Task ExecuteAsync(IDbConnection dbConnection, IDbTransaction dbTransaction,
protected abstract Task<int> ExecuteAsync(IDbConnection dbConnection, IDbTransaction dbTransaction,
CapPublishedMessage message);
protected virtual string Serialize<T>(T obj, string callbackName = null)
{
var packer = (IMessagePacker)ServiceProvider.GetService(typeof(IMessagePacker));
var packer = (IMessagePacker) ServiceProvider.GetService(typeof(IMessagePacker));
string content;
if (obj != null)
{
if (Helper.IsComplexType(obj.GetType()))
{
var serializer = (IContentSerializer)ServiceProvider.GetService(typeof(IContentSerializer));
var serializer = (IContentSerializer) ServiceProvider.GetService(typeof(IContentSerializer));
content = serializer.Serialize(obj);
}
else
......@@ -89,7 +98,6 @@ namespace DotNetCore.CAP.Abstractions
{
CallbackName = callbackName
};
return packer.Pack(message);
}
......@@ -108,51 +116,99 @@ namespace DotNetCore.CAP.Abstractions
private void CheckIsUsingEF(string name)
{
if (name == null) throw new ArgumentNullException(nameof(name));
if (name == null)
{
throw new ArgumentNullException(nameof(name));
}
if (!IsUsingEF)
{
throw new InvalidOperationException(
"If you are using the EntityFramework, you need to configure the DbContextType first." +
" otherwise you need to use overloaded method with IDbConnection and IDbTransaction.");
}
}
private void CheckIsAdoNet(string name)
{
if (name == null) throw new ArgumentNullException(nameof(name));
if (name == null)
{
throw new ArgumentNullException(nameof(name));
}
if (IsUsingEF)
{
throw new InvalidOperationException(
"If you are using the EntityFramework, you do not need to use this overloaded.");
}
}
private async Task PublishWithTransAsync(string name, string content)
private async Task PublishWithTransAsync<T>(string name, T contentObj, string callbackName = null)
{
var message = new CapPublishedMessage
try
{
Name = name,
Content = content,
StatusName = StatusName.Scheduled
};
var content = Serialize(contentObj, callbackName);
await ExecuteAsync(DbConnection, DbTransaction, message);
var message = new CapPublishedMessage
{
Name = name,
Content = content,
StatusName = StatusName.Scheduled
};
var id = await ExecuteAsync(DbConnection, DbTransaction, message);
ClosedCap();
ClosedCap();
if (id > 0)
{
_logger.LogInformation($"message [{message}] has been persisted in the database.");
message.Id = id;
PublishQueuer.PulseEvent.Set();
Enqueue(message);
}
}
catch (Exception e)
{
_logger.LogError("An exception was occurred when publish message. exception message:" + e.Message, e);
Console.WriteLine(e);
throw;
}
}
private void PublishWithTrans(string name, string content)
private void PublishWithTrans<T>(string name, T contentObj, string callbackName = null)
{
var message = new CapPublishedMessage
try
{
Name = name,
Content = content,
StatusName = StatusName.Scheduled
};
var content = Serialize(contentObj, callbackName);
var message = new CapPublishedMessage
{
Name = name,
Content = content,
StatusName = StatusName.Scheduled
};
var id = Execute(DbConnection, DbTransaction, message);
ClosedCap();
Execute(DbConnection, DbTransaction, message);
if (id > 0)
{
_logger.LogInformation($"message [{message}] has been persisted in the database.");
ClosedCap();
message.Id = id;
PublishQueuer.PulseEvent.Set();
Enqueue(message);
}
}
catch (Exception e)
{
_logger.LogError("An exception was occurred when publish message. exception message:" + e.Message, e);
Console.WriteLine(e);
throw;
}
}
private void ClosedCap()
......@@ -162,8 +218,11 @@ namespace DotNetCore.CAP.Abstractions
DbTransaction.Commit();
DbTransaction.Dispose();
}
if (IsCapOpenedConn)
{
DbConnection.Dispose();
}
}
public void Dispose()
......
using System;
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using DotNetCore.CAP.Models;
namespace DotNetCore.CAP.Abstractions
{
/// <summary>
/// Message content serializer.
/// <para>By default, CAP will use Json as a serializer, and you can customize this interface to achieve serialization of other methods.</para>
/// <para>
/// By default, CAP will use Json as a serializer, and you can customize this interface to achieve serialization of
/// other methods.
/// </para>
/// </summary>
public interface IContentSerializer
{
......@@ -40,7 +46,7 @@ namespace DotNetCore.CAP.Abstractions
/// </summary>
/// <remarks>
/// We use the wrapper to provide some additional information for the message content,which is important for CAP。
/// Typically, we may need to customize the field display name of the message,
/// Typically, we may need to customize the field display name of the message,
/// which includes interacting with other message components, which can be adapted in this manner
/// </remarks>
public interface IMessagePacker
......@@ -52,9 +58,9 @@ namespace DotNetCore.CAP.Abstractions
string Pack(CapMessage obj);
/// <summary>
/// Unpack a message strings to <see cref="CapMessage"/> object.
/// Unpack a message strings to <see cref="CapMessage" /> object.
/// </summary>
/// <param name="packingMessage">The string of packed message.</param>
CapMessage UnPack(string packingMessage);
}
}
}
\ No newline at end of file
using System.Reflection;
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System.Reflection;
using DotNetCore.CAP.Abstractions.ModelBinding;
namespace DotNetCore.CAP.Abstractions
......
using System.Threading.Tasks;
using DotNetCore.CAP.Models;
namespace DotNetCore.CAP.Abstractions
{
/// <summary>
/// Consumer method executor.
/// </summary>
public interface ISubscriberExecutor
{
/// <summary>
/// Execute the consumer method.
/// </summary>
/// <param name="receivedMessage">The received message.</param>
Task<OperateResult> ExecuteAsync(CapReceivedMessage receivedMessage);
}
}
using System.Threading.Tasks;
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System.Threading.Tasks;
namespace DotNetCore.CAP.Abstractions.ModelBinding
{
......
using DotNetCore.CAP.Internal;
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using DotNetCore.CAP.Internal;
namespace DotNetCore.CAP.Abstractions.ModelBinding
{
......@@ -42,7 +45,10 @@ namespace DotNetCore.CAP.Abstractions.ModelBinding
public override string ToString()
{
if (IsSuccess)
{
return $"Success '{Model}'";
}
return "Failed";
}
......@@ -50,7 +56,10 @@ namespace DotNetCore.CAP.Abstractions.ModelBinding
{
var other = obj as ModelBindingResult?;
if (other == null)
{
return false;
}
return Equals(other.Value);
}
......
using System;
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
namespace DotNetCore.CAP.Abstractions
{
......
using System;
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using DotNetCore.CAP;
using DotNetCore.CAP.Dashboard.GatewayProxy;
using Microsoft.Extensions.DependencyInjection;
......@@ -19,7 +22,9 @@ namespace Microsoft.AspNetCore.Builder
public static IApplicationBuilder UseCap(this IApplicationBuilder app)
{
if (app == null)
{
throw new ArgumentNullException(nameof(app));
}
CheckRequirement(app);
......@@ -31,7 +36,10 @@ namespace Microsoft.AspNetCore.Builder
if (provider.GetService<DashboardOptions>() != null)
{
if (provider.GetService<DiscoveryOptions>() != null)
{
app.UseMiddleware<GatewayProxyMiddleware>();
}
app.UseMiddleware<DashboardMiddleware>();
}
......@@ -42,18 +50,24 @@ namespace Microsoft.AspNetCore.Builder
{
var marker = app.ApplicationServices.GetService<CapMarkerService>();
if (marker == null)
{
throw new InvalidOperationException(
"AddCap() must be called on the service collection. eg: services.AddCap(...)");
}
var messageQueueMarker = app.ApplicationServices.GetService<CapMessageQueueMakerService>();
if (messageQueueMarker == null)
{
throw new InvalidOperationException(
"You must be config used message queue provider at AddCap() options! eg: services.AddCap(options=>{ options.UseKafka(...) })");
}
var databaseMarker = app.ApplicationServices.GetService<CapDatabaseStorageMarkerService>();
if (databaseMarker == null)
{
throw new InvalidOperationException(
"You must be config used database provider at AddCap() options! eg: services.AddCap(options=>{ options.UseSqlServer(...) })");
}
}
}
}
\ No newline at end of file
using System;
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using DotNetCore.CAP.Abstractions;
using Microsoft.Extensions.DependencyInjection;
......
using System;
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using System.Collections.Generic;
using System.Reflection;
using DotNetCore.CAP.Models;
......@@ -10,16 +13,6 @@ namespace DotNetCore.CAP
/// </summary>
public class CapOptions
{
/// <summary>
/// Default value for polling delay timeout, in seconds.
/// </summary>
public const int DefaultPollingDelay = 15;
/// <summary>
/// Default processor count to process messages of cap.queue.
/// </summary>
public const int DefaultQueueProcessorCount = 2;
/// <summary>
/// Default succeeded message expiration time span, in seconds.
/// </summary>
......@@ -28,18 +21,16 @@ namespace DotNetCore.CAP
/// <summary>
/// Failed message retry waiting interval.
/// </summary>
public const int DefaultFailedMessageWaitingInterval = 600;
public const int DefaultFailedMessageWaitingInterval = 60;
/// <summary>
/// Failed message retry count.
/// </summary>
public const int DefaultFailedRetryCount = 100;
public const int DefaultFailedRetryCount = 50;
public CapOptions()
{
PollingDelay = DefaultPollingDelay;
QueueProcessorCount = DefaultQueueProcessorCount;
SucceedMessageExpiredAfter = DefaultSucceedMessageExpirationAfter;
FailedRetryInterval = DefaultFailedMessageWaitingInterval;
FailedRetryCount = DefaultFailedRetryCount;
......@@ -54,18 +45,6 @@ namespace DotNetCore.CAP
/// </summary>
public string DefaultGroup { get; set; }
/// <summary>
/// Producer job polling delay time.
/// Default is 15 sec.
/// </summary>
public int PollingDelay { get; set; }
/// <summary>
/// Gets or sets the messages queue (Cap.Queue table) processor count.
/// Default is 2 processor.
/// </summary>
public int QueueProcessorCount { get; set; }
/// <summary>
/// Sent or received succeed message after time span of due, then the message will be deleted at due time.
/// Default is 24*3600 seconds.
......@@ -74,7 +53,7 @@ namespace DotNetCore.CAP
/// <summary>
/// Failed messages polling delay time.
/// Default is 600 seconds.
/// Default is 60 seconds.
/// </summary>
public int FailedRetryInterval { get; set; }
......@@ -85,7 +64,7 @@ namespace DotNetCore.CAP
/// <summary>
/// The number of message retries, the retry will stop when the threshold is reached.
/// Default is 100 times.
/// Default is 50 times.
/// </summary>
public int FailedRetryCount { get; set; }
......@@ -96,7 +75,9 @@ namespace DotNetCore.CAP
public void RegisterExtension(ICapOptionsExtension extension)
{
if (extension == null)
{
throw new ArgumentNullException(nameof(extension));
}
Extensions.Add(extension);
}
......
using System;
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using System.Collections.Generic;
using DotNetCore.CAP;
using DotNetCore.CAP.Abstractions;
......@@ -25,7 +28,10 @@ namespace Microsoft.Extensions.DependencyInjection
this IServiceCollection services,
Action<CapOptions> setupAction)
{
if (setupAction == null) throw new ArgumentNullException(nameof(setupAction));
if (setupAction == null)
{
throw new ArgumentNullException(nameof(setupAction));
}
services.TryAddSingleton<CapMarkerService>();
services.Configure(setupAction);
......@@ -49,21 +55,21 @@ namespace Microsoft.Extensions.DependencyInjection
services.AddSingleton<IStateChanger, StateChanger>();
//Queue's message processor
services.AddTransient<PublishQueuer>();
services.AddTransient<SubscribeQueuer>();
services.AddTransient<FailedProcessor>();
services.AddTransient<IDispatcher, DefaultDispatcher>();
services.AddTransient<NeedRetryMessageProcessor>();
//Executors
services.AddSingleton<IQueueExecutorFactory, QueueExecutorFactory>();
services.AddSingleton<IQueueExecutor, SubscribeQueueExecutor>();
services.TryAddSingleton<ISubscriberExecutor, DefaultSubscriberExecutor>();
//Sender and Executors
services.AddSingleton<IDispatcher, Dispatcher>();
// Warning: IPublishMessageSender need to inject at extension project.
services.AddSingleton<ISubscriberExecutor, DefaultSubscriberExecutor>();
//Options and extension service
var options = new CapOptions();
setupAction(options);
foreach (var serviceExtension in options.Extensions)
{
serviceExtension.AddServices(services);
}
services.AddSingleton(options);
return new CapBuilder(services);
......@@ -73,13 +79,19 @@ namespace Microsoft.Extensions.DependencyInjection
{
var consumerListenerServices = new List<KeyValuePair<Type, Type>>();
foreach (var rejectedServices in services)
{
if (rejectedServices.ImplementationType != null
&& typeof(ICapSubscribe).IsAssignableFrom(rejectedServices.ImplementationType))
{
consumerListenerServices.Add(new KeyValuePair<Type, Type>(typeof(ICapSubscribe),
rejectedServices.ImplementationType));
}
}
foreach (var service in consumerListenerServices)
{
services.AddTransient(service.Key, service.Value);
}
}
}
}
\ No newline at end of file
using System;
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using System.Net;
using System.Threading.Tasks;
......
using System;
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using System.Linq;
using System.Net;
using System.Threading.Tasks;
......@@ -27,7 +30,10 @@ namespace DotNetCore.CAP
public Task Invoke(HttpContext context)
{
if (!context.Request.Path.StartsWithSegments(_options.PathMatch,
out var matchedPath, out var remainingPath)) return _next(context);
out var matchedPath, out var remainingPath))
{
return _next(context);
}
// Update the path
var path = context.Request.Path;
......@@ -41,7 +47,9 @@ namespace DotNetCore.CAP
var findResult = _routes.FindDispatcher(context.Request.Path.Value);
if (findResult == null)
{
return _next.Invoke(context);
}
if (_options.Authorization.Any(filter => !filter.Authorize(dashboardContext)))
{
......
using System.Collections.Generic;
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System.Collections.Generic;
using DotNetCore.CAP.Dashboard;
// ReSharper disable once CheckNamespace
......
using System;
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using DotNetCore.CAP;
using DotNetCore.CAP.Dashboard;
using DotNetCore.CAP.Dashboard.GatewayProxy;
......@@ -40,7 +43,10 @@ namespace Microsoft.Extensions.DependencyInjection
public static CapOptions UseDashboard(this CapOptions capOptions, Action<DashboardOptions> options)
{
if (options == null) throw new ArgumentNullException(nameof(options));
if (options == null)
{
throw new ArgumentNullException(nameof(options));
}
capOptions.RegisterExtension(new DashboardOptionsExtension(options));
......
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment