Commit 9093a6f9 authored by Savorboard's avatar Savorboard

merged pr#113

parents 0bbfccbd d11f9298
......@@ -54,15 +54,11 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "DotNetCore.CAP.MySql.Test",
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Sample.RabbitMQ.MySql", "samples\Sample.RabbitMQ.MySql\Sample.RabbitMQ.MySql.csproj", "{9F3F9BFE-7B6A-4A7A-A6E6-8B517D611873}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Sample.RabbitMQ.SqlServer", "samples\Sample.RabbitMQ.SqlServer\Sample.RabbitMQ.SqlServer.csproj", "{AF17B956-B79E-48B7-9B5B-EB15A386B112}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "DotNetCore.CAP.PostgreSql", "src\DotNetCore.CAP.PostgreSql\DotNetCore.CAP.PostgreSql.csproj", "{82C403AB-ED68-4084-9A1D-11334F9F08F9}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Sample.RabbitMQ.PostgreSql", "samples\Sample.RabbitMQ.PostgreSql\Sample.RabbitMQ.PostgreSql.csproj", "{A17E8E72-DFFC-4822-BB38-73D59A8B264E}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "DotNetCore.CAP.PostgreSql.Test", "test\DotNetCore.CAP.PostgreSql.Test\DotNetCore.CAP.PostgreSql.Test.csproj", "{7CA3625D-1817-4695-881D-7E79A1E1DED2}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Sample.Kafka.SqlServer", "samples\Sample.Kafka.SqlServer\Sample.Kafka.SqlServer.csproj", "{573B4D39-5489-48B3-9B6C-5234249CB980}"
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Sample.Kafka.MySql", "samples\Sample.Kafka.MySql\Sample.Kafka.MySql.csproj", "{9CB51105-A85B-42A4-AFDE-A4FC34D9EA91}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
......@@ -105,26 +101,18 @@ Global
{9F3F9BFE-7B6A-4A7A-A6E6-8B517D611873}.Debug|Any CPU.Build.0 = Debug|Any CPU
{9F3F9BFE-7B6A-4A7A-A6E6-8B517D611873}.Release|Any CPU.ActiveCfg = Release|Any CPU
{9F3F9BFE-7B6A-4A7A-A6E6-8B517D611873}.Release|Any CPU.Build.0 = Release|Any CPU
{AF17B956-B79E-48B7-9B5B-EB15A386B112}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{AF17B956-B79E-48B7-9B5B-EB15A386B112}.Debug|Any CPU.Build.0 = Debug|Any CPU
{AF17B956-B79E-48B7-9B5B-EB15A386B112}.Release|Any CPU.ActiveCfg = Release|Any CPU
{AF17B956-B79E-48B7-9B5B-EB15A386B112}.Release|Any CPU.Build.0 = Release|Any CPU
{82C403AB-ED68-4084-9A1D-11334F9F08F9}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{82C403AB-ED68-4084-9A1D-11334F9F08F9}.Debug|Any CPU.Build.0 = Debug|Any CPU
{82C403AB-ED68-4084-9A1D-11334F9F08F9}.Release|Any CPU.ActiveCfg = Release|Any CPU
{82C403AB-ED68-4084-9A1D-11334F9F08F9}.Release|Any CPU.Build.0 = Release|Any CPU
{A17E8E72-DFFC-4822-BB38-73D59A8B264E}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{A17E8E72-DFFC-4822-BB38-73D59A8B264E}.Debug|Any CPU.Build.0 = Debug|Any CPU
{A17E8E72-DFFC-4822-BB38-73D59A8B264E}.Release|Any CPU.ActiveCfg = Release|Any CPU
{A17E8E72-DFFC-4822-BB38-73D59A8B264E}.Release|Any CPU.Build.0 = Release|Any CPU
{7CA3625D-1817-4695-881D-7E79A1E1DED2}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{7CA3625D-1817-4695-881D-7E79A1E1DED2}.Debug|Any CPU.Build.0 = Debug|Any CPU
{7CA3625D-1817-4695-881D-7E79A1E1DED2}.Release|Any CPU.ActiveCfg = Release|Any CPU
{7CA3625D-1817-4695-881D-7E79A1E1DED2}.Release|Any CPU.Build.0 = Release|Any CPU
{573B4D39-5489-48B3-9B6C-5234249CB980}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{573B4D39-5489-48B3-9B6C-5234249CB980}.Debug|Any CPU.Build.0 = Debug|Any CPU
{573B4D39-5489-48B3-9B6C-5234249CB980}.Release|Any CPU.ActiveCfg = Release|Any CPU
{573B4D39-5489-48B3-9B6C-5234249CB980}.Release|Any CPU.Build.0 = Release|Any CPU
{9CB51105-A85B-42A4-AFDE-A4FC34D9EA91}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{9CB51105-A85B-42A4-AFDE-A4FC34D9EA91}.Debug|Any CPU.Build.0 = Debug|Any CPU
{9CB51105-A85B-42A4-AFDE-A4FC34D9EA91}.Release|Any CPU.ActiveCfg = Release|Any CPU
{9CB51105-A85B-42A4-AFDE-A4FC34D9EA91}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
......@@ -139,11 +127,9 @@ Global
{FA15685A-778A-4D2A-A2FE-27FAD2FFA65B} = {9B2AE124-6636-4DE9-83A3-70360DABD0C4}
{80A84F62-1558-427B-BA74-B47AA8A665B5} = {C09CDAB0-6DD4-46E9-B7F3-3EF2A4741EA0}
{9F3F9BFE-7B6A-4A7A-A6E6-8B517D611873} = {3A6B6931-A123-477A-9469-8B468B5385AF}
{AF17B956-B79E-48B7-9B5B-EB15A386B112} = {3A6B6931-A123-477A-9469-8B468B5385AF}
{82C403AB-ED68-4084-9A1D-11334F9F08F9} = {9B2AE124-6636-4DE9-83A3-70360DABD0C4}
{A17E8E72-DFFC-4822-BB38-73D59A8B264E} = {3A6B6931-A123-477A-9469-8B468B5385AF}
{7CA3625D-1817-4695-881D-7E79A1E1DED2} = {C09CDAB0-6DD4-46E9-B7F3-3EF2A4741EA0}
{573B4D39-5489-48B3-9B6C-5234249CB980} = {3A6B6931-A123-477A-9469-8B468B5385AF}
{9CB51105-A85B-42A4-AFDE-A4FC34D9EA91} = {3A6B6931-A123-477A-9469-8B468B5385AF}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {2E70565D-94CF-40B4-BFE1-AC18D5F736AB}
......
#
# Enable MSDTC
#
Write-Host "Enabling MSDTC..." -ForegroundColor Yellow
$DTCSecurity = "Incoming"
$RegPath = "HKLM:\SOFTWARE\Microsoft\MSDTC\"
#Set Security and MSDTC path
$RegSecurityPath = "$RegPath\Security"
Set-ItemProperty path $RegSecurityPath name "NetworkDtcAccess" value 1
Set-ItemProperty path $RegSecurityPath name "NetworkDtcAccessClients" value 1
Set-ItemProperty path $RegSecurityPath name "NetworkDtcAccessTransactions" value 1
Set-ItemProperty path $RegSecurityPath name "NetworkDtcAccessInbound" value 1
Set-ItemProperty path $RegSecurityPath name "NetworkDtcAccessOutbound" value 1
Set-ItemProperty path $RegSecurityPath name "LuTransactions" value 1
if ($DTCSecurity eq "None")
{
Set-ItemProperty path $RegPath name "TurnOffRpcSecurity" value 1
Set-ItemProperty path $RegPath name "AllowOnlySecureRpcCalls" value 0
Set-ItemProperty path $RegPath name "FallbackToUnsecureRPCIfNecessary" value 0
}
elseif ($DTCSecurity eq "Incoming")
{
Set-ItemProperty path $RegPath name "TurnOffRpcSecurity" value 0
Set-ItemProperty path $RegPath name "AllowOnlySecureRpcCalls" value 0
Set-ItemProperty path $RegPath name "FallbackToUnsecureRPCIfNecessary" value 1
}
else
{
Set-ItemProperty path $RegPath name "TurnOffRpcSecurity" value 0
Set-ItemProperty path $RegPath name "AllowOnlySecureRpcCalls" value 1
Set-ItemProperty path $RegPath name "FallbackToUnsecureRPCIfNecessary" value 0
}
Restart-Service MSDTC
Write-Host "MSDTC has been configured" foregroundcolor green
......@@ -187,7 +187,10 @@ Then inject your `ISubscriberService` class in Startup.cs
```c#
public void ConfigureServices(IServiceCollection services)
{
//Note: The injection of services needs before of `services.AddCap()`
services.AddTransient<ISubscriberService,SubscriberService>();
services.AddCap(x=>{});
}
```
......@@ -218,6 +221,8 @@ services.AddCap(x =>
});
```
The default dashboard address is :[http://localhost:xxx/cap](http://localhost:xxx/cap) , you can also change the `cap` suffix to others with `d.MatchPath` configuration options.
![dashboard](http://images2017.cnblogs.com/blog/250417/201710/250417-20171004220827302-189215107.png)
![received](http://images2017.cnblogs.com/blog/250417/201710/250417-20171004220934115-1107747665.png)
......
......@@ -187,7 +187,10 @@ namespace xxx.Service
```c#
public void ConfigureServices(IServiceCollection services)
{
//注意: 注入的服务需要在 `services.AddCap()` 之前
services.AddTransient<ISubscriberService,SubscriberService>();
services.AddCap(x=>{});
}
```
......@@ -218,6 +221,8 @@ services.AddCap(x =>
});
```
仪表盘默认的访问地址是:[http://localhost:xxx/cap](http://localhost:xxx/cap),你可以在`d.MatchPath`配置项中修改`cap`路径后缀为其他的名字。
![dashboard](http://images2017.cnblogs.com/blog/250417/201710/250417-20171004220827302-189215107.png)
![received](http://images2017.cnblogs.com/blog/250417/201710/250417-20171004220934115-1107747665.png)
......
......@@ -11,7 +11,6 @@ services:
- mysql
- postgresql
build_script:
- ps: ./ConfigureMSDTC.ps1
- ps: ./build.ps1
test: off
artifacts:
......
<Project>
<PropertyGroup>
<VersionMajor>2</VersionMajor>
<VersionMinor>1</VersionMinor>
<VersionPatch>4</VersionPatch>
<VersionMinor>2</VersionMinor>
<VersionPatch>0</VersionPatch>
<VersionQuality></VersionQuality>
<VersionPrefix>$(VersionMajor).$(VersionMinor).$(VersionPatch)</VersionPrefix>
</PropertyGroup>
......
using System;
using System.Threading.Tasks;
using DotNetCore.CAP;
using Microsoft.AspNetCore.Mvc;
using MySql.Data.MySqlClient;
namespace Sample.Kafka.MySql.Controllers
{
[Route("api/[controller]")]
public class ValuesController : Controller, ICapSubscribe
{
private readonly ICapPublisher _capBus;
public ValuesController(ICapPublisher producer)
{
_capBus = producer;
}
[Route("~/publish")]
public async Task<IActionResult> PublishMessage()
{
using (var connection = new MySqlConnection("Server=192.168.10.110;Database=testcap;UserId=root;Password=123123;"))
{
connection.Open();
var transaction = connection.BeginTransaction();
//your business code here
await _capBus.PublishAsync("xxx.xxx.test2", 123456, transaction);
transaction.Commit();
}
return Ok("publish successful!");
}
[CapSubscribe("xxx.xxx.test2")]
public void Test2(int value)
{
Console.WriteLine(value);
}
}
}
\ No newline at end of file
using System.IO;
using Microsoft.AspNetCore;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore;
using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.Configuration;
namespace Sample.Kafka.SqlServer
namespace Sample.Kafka.MySql
{
public class Program
{
......
......@@ -2,20 +2,22 @@
<PropertyGroup>
<TargetFramework>netcoreapp2.0</TargetFramework>
<AssemblyName>Sample.Kafka.SqlServer</AssemblyName>
<AssemblyName>Sample.Kafka.MySql</AssemblyName>
<WarningsAsErrors>NU1701</WarningsAsErrors>
<NoWarn>NU1701</NoWarn>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Microsoft.AspNetCore.All" Version="2.0.5" />
<PackageReference Include="Microsoft.AspNetCore.All" Version="2.0.6" />
<PackageReference Include="MySqlConnector" Version="0.38.0" />
<PackageReference Include="zipkin4net" Version="1.2.0" />
</ItemGroup>
<ItemGroup>
<DotNetCliToolReference Include="Microsoft.EntityFrameworkCore.Tools.DotNet" Version="2.0.0" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\..\src\DotNetCore.CAP.Kafka\DotNetCore.CAP.Kafka.csproj" />
<ProjectReference Include="..\..\src\DotNetCore.CAP.SqlServer\DotNetCore.CAP.SqlServer.csproj" />
<ProjectReference Include="..\..\src\DotNetCore.CAP.MySql\DotNetCore.CAP.MySql.csproj" />
<ProjectReference Include="..\..\src\DotNetCore.CAP\DotNetCore.CAP.csproj" />
</ItemGroup>
......
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.DependencyInjection;
namespace Sample.RabbitMQ.PostgreSql
namespace Sample.Kafka.MySql
{
public class Startup
{
// This method gets called by the runtime. Use this method to add services to the container.
public void ConfigureServices(IServiceCollection services)
{
services.AddDbContext<AppDbContext>();
services.AddCap(x =>
{
x.UseEntityFramework<AppDbContext>();
x.UseRabbitMQ("localhost");
x.UseMySql("Server=192.168.10.110;Database=testcap;UserId=root;Password=123123;");
x.UseKafka("192.168.10.110:9092");
x.UseDashboard();
x.UseDiscovery(d =>
{
d.DiscoveryServerHostName = "localhost";
d.DiscoveryServerPort = 8500;
d.CurrentNodeHostName = "localhost";
d.CurrentNodePort = 5800;
d.NodeName = "CAP 一号节点";
});
});
services.AddMvc();
}
// This method gets called by the runtime. Use this method to configure the HTTP request pipeline.
public void Configure(IApplicationBuilder app, IHostingEnvironment env)
public void Configure(IApplicationBuilder app)
{
app.UseMvc();
app.UseCap();
}
}
}
}
\ No newline at end of file
using Microsoft.EntityFrameworkCore;
namespace Sample.Kafka.SqlServer
{
public class AppDbContext : DbContext
{
protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder)
{
//optionsBuilder.UseSqlServer("Server=192.168.2.206;Initial Catalog=Sample.Kafka.SqlServer;User Id=cmswuliu;Password=h7xY81agBn*Veiu3;MultipleActiveResultSets=True");
optionsBuilder.UseSqlServer("Server=192.168.2.206;Initial Catalog=TestCap;User Id=cmswuliu;Password=h7xY81agBn*Veiu3;MultipleActiveResultSets=True");
}
}
}
using System;
using DotNetCore.CAP.Abstractions;
using DotNetCore.CAP.Models;
using Newtonsoft.Json;
namespace Sample.RabbitMQ.SqlServer
{
public class MessageContent : CapMessage
{
[JsonProperty("id")]
public override string Id { get; set; }
[JsonProperty("createdTime")]
public override DateTime Timestamp { get; set; }
[JsonProperty("msgBody")]
public override string Content { get; set; }
[JsonProperty("callbackTopicName")]
public override string CallbackName { get; set; }
}
public class MyMessagePacker : IMessagePacker
{
private readonly IContentSerializer _serializer;
public MyMessagePacker(IContentSerializer serializer)
{
_serializer = serializer;
}
public string Pack(CapMessage obj)
{
var content = new MessageContent
{
Id = obj.Id,
Content = obj.Content,
CallbackName = obj.CallbackName,
Timestamp = obj.Timestamp
};
return _serializer.Serialize(content);
}
public CapMessage UnPack(string packingMessage)
{
return _serializer.DeSerialize<MessageContent>(packingMessage);
}
}
}
using System;
using System.Diagnostics;
using System.Threading.Tasks;
using DotNetCore.CAP;
using Microsoft.AspNetCore.Authorization;
using Microsoft.AspNetCore.Mvc;
using Newtonsoft.Json;
namespace Sample.Kafka.SqlServer.Controllers
{
public class Person
{
[JsonProperty("id")]
public string Id { get; set; }
[JsonProperty("uname")]
public string Name { get; set; }
public HAHA Haha { get; set; }
public override string ToString()
{
return "Name:" + Name + ";Id:" + Id + "Haha:" + Haha?.ToString();
}
}
public class HAHA
{
[JsonProperty("id")]
public string Id { get; set; }
[JsonProperty("uname")]
public string Name { get; set; }
public override string ToString()
{
return "Name:" + Name + ";Id:" + Id;
}
}
[Route("api/[controller]")]
public class ValuesController : Controller, ICapSubscribe
{
private readonly ICapPublisher _capBus;
private readonly AppDbContext _dbContext;
public ValuesController(ICapPublisher producer, AppDbContext dbContext)
{
_capBus = producer;
_dbContext = dbContext;
}
[Route("~/publish")]
public IActionResult PublishMessage()
{
var p = new Person
{
Id = Guid.NewGuid().ToString(),
Name = "杨晓东",
Haha = new HAHA
{
Id = Guid.NewGuid().ToString(),
Name = "1-1杨晓东",
}
};
_capBus.Publish("wl.yxd.test", p, "wl.yxd.test.callback");
//_capBus.Publish("wl.cj.test", p);
return Ok();
}
[CapSubscribe("wl.yxd.test.callback")]
public void KafkaTestCallback(Person p)
{
Console.WriteLine("回调内容:" + p);
}
[CapSubscribe("wl.cj.test")]
public string KafkaTestReceived(Person person)
{
Console.WriteLine(person);
Debug.WriteLine(person);
return "this is callback message";
}
[Route("~/publishWithTrans")]
public async Task<IActionResult> PublishMessageWithTransaction()
{
using (var trans = await _dbContext.Database.BeginTransactionAsync())
{
await _capBus.PublishAsync("sample.rabbitmq.mysql", "");
trans.Commit();
}
return Ok();
}
[CapSubscribe("sample.rabbitmq.mysql33333", Group = "Test.Group")]
public void KafkaTest22(Person person)
{
var aa = _dbContext.Database;
_dbContext.Dispose();
Console.WriteLine("[sample.kafka.sqlserver] message received " + person.ToString());
Debug.WriteLine("[sample.kafka.sqlserver] message received " + person.ToString());
}
//[CapSubscribe("sample.rabbitmq.mysql22222")]
//public void KafkaTest22(DateTime time)
//{
// Console.WriteLine("[sample.kafka.sqlserver] message received " + time.ToString());
// Debug.WriteLine("[sample.kafka.sqlserver] message received " + time.ToString());
//}
[CapSubscribe("sample.rabbitmq.mysql22222")]
public async Task<DateTime> KafkaTest33(DateTime time)
{
Console.WriteLine("[sample.kafka.sqlserver] message received " + time.ToString());
Debug.WriteLine("[sample.kafka.sqlserver] message received " + time.ToString());
return await Task.FromResult(time);
}
[NonAction]
[CapSubscribe("sample.kafka.sqlserver3")]
[CapSubscribe("sample.kafka.sqlserver4")]
public void KafkaTest()
{
Console.WriteLine("[sample.kafka.sqlserver] message received");
Debug.WriteLine("[sample.kafka.sqlserver] message received");
}
}
}
\ No newline at end of file
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Sample.RabbitMQ.SqlServer;
namespace Sample.Kafka.SqlServer
{
public class Startup
{
public void ConfigureServices(IServiceCollection services)
{
services.AddDbContext<AppDbContext>();
services.AddCap(x =>
{
x.UseEntityFramework<AppDbContext>();
x.UseKafka("192.168.2.215:9092");
x.UseDashboard();
//x.UseDiscovery(d =>
//{
// d.DiscoveryServerHostName = "localhost";
// d.DiscoveryServerPort = 8500;
// d.CurrentNodeHostName = "localhost";
// d.CurrentNodePort = 5820;
// d.NodeName = "CAP 2号节点";
//});
}).AddMessagePacker<MyMessagePacker>();
services.AddMvc();
}
public void Configure(IApplicationBuilder app, IHostingEnvironment env, ILoggerFactory loggerFactory)
{
app.UseMvc();
app.UseCap();
}
}
}
\ No newline at end of file
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading.Tasks;
using DotNetCore.CAP;
using Microsoft.AspNetCore.Mvc;
......
{
"iisSettings": {
"windowsAuthentication": false,
"anonymousAuthentication": true,
"iisExpress": {
"applicationUrl": "http://localhost:57171/",
"sslPort": 0
}
},
"profiles": {
"IIS Express": {
"commandName": "IISExpress",
"launchBrowser": true,
"launchUrl": "cap",
"environmentVariables": {
"ASPNETCORE_ENVIRONMENT": "Development"
}
},
"Sample.RabbitMQ.MySql": {
"commandName": "Project",
"launchBrowser": true,
"launchUrl": "cap",
"environmentVariables": {
"ASPNETCORE_ENVIRONMENT": "Development"
},
"applicationUrl": "http://localhost:57173/"
}
}
}
\ No newline at end of file
......@@ -10,7 +10,7 @@
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Microsoft.AspNetCore.All" Version="2.0.5" />
<PackageReference Include="Microsoft.AspNetCore.All" Version="2.0.6" />
<PackageReference Include="Pomelo.EntityFrameworkCore.MySql" Version="2.0.1" />
</ItemGroup>
<ItemGroup>
......
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
......
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
namespace Sample.RabbitMQ.PostgreSql
{
public class AppDbContext : DbContext
{
protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder)
{
optionsBuilder.UseNpgsql("Server=localhost;Database=Sample.RabbitMQ.PostgreSql;UserId=postgres;Password=123123;");
}
}
}
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading.Tasks;
using DotNetCore.CAP;
using Microsoft.AspNetCore.Mvc;
namespace Sample.RabbitMQ.PostgreSql.Controllers
{
[Route("api/[controller]")]
public class ValuesController : Controller
{
private readonly AppDbContext _dbContext;
private readonly ICapPublisher _capBus;
public ValuesController(AppDbContext dbContext, ICapPublisher capPublisher)
{
_dbContext = dbContext;
_capBus = capPublisher;
}
[Route("~/publish")]
public IActionResult PublishMessage()
{
_capBus.Publish("sample.rabbitmq.mysql", DateTime.Now);
return Ok();
}
[Route("~/publish2")]
public IActionResult PublishMessage2()
{
_capBus.Publish("sample.kafka.sqlserver4", DateTime.Now);
return Ok();
}
[Route("~/publishWithTrans")]
public async Task<IActionResult> PublishMessageWithTransaction()
{
using (var trans = await _dbContext.Database.BeginTransactionAsync())
{
await _capBus.PublishAsync("sample.kafka.sqlserver", "");
trans.Commit();
}
return Ok();
}
[NonAction]
[CapSubscribe("sample.rabbitmq.mysql")]
public void ReceiveMessage()
{
Console.WriteLine("[sample.rabbitmq.mysql] message received");
Debug.WriteLine("[sample.rabbitmq.mysql] message received");
}
}
}
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.AspNetCore;
using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging;
namespace Sample.RabbitMQ.PostgreSql
{
public class Program
{
public static void Main(string[] args)
{
BuildWebHost(args).Run();
}
public static IWebHost BuildWebHost(string[] args) =>
WebHost.CreateDefaultBuilder(args)
.UseStartup<Startup>()
.Build();
}
}
<Project Sdk="Microsoft.NET.Sdk.Web">
<PropertyGroup>
<TargetFramework>netcoreapp2.0</TargetFramework>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Microsoft.AspNetCore.All" Version="2.0.5" />
<PackageReference Include="Npgsql.EntityFrameworkCore.PostgreSQL" Version="2.0.1" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\..\src\DotNetCore.CAP.PostgreSql\DotNetCore.CAP.PostgreSql.csproj" />
<ProjectReference Include="..\..\src\DotNetCore.CAP.RabbitMQ\DotNetCore.CAP.RabbitMQ.csproj" />
<ProjectReference Include="..\..\src\DotNetCore.CAP\DotNetCore.CAP.csproj" />
</ItemGroup>
</Project>
using Microsoft.EntityFrameworkCore;
using Sample.RabbitMQ.SqlServer.Controllers;
namespace Sample.RabbitMQ.SqlServer
{
public class AppDbContext : DbContext
{
public DbSet<Person> Persons { get; set; }
protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder)
{
//optionsBuilder.UseSqlServer("Server=192.168.2.206;Initial Catalog=TestCap;User Id=cmswuliu;Password=h7xY81agBn*Veiu3;MultipleActiveResultSets=True");
optionsBuilder.UseSqlServer("Server=DESKTOP-M9R8T31;Initial Catalog=Sample.Kafka.SqlServer;User Id=sa;Password=P@ssw0rd;MultipleActiveResultSets=True");
}
}
}
using System;
using System.Diagnostics;
using System.Threading.Tasks;
using DotNetCore.CAP;
using Microsoft.AspNetCore.Authorization;
using Microsoft.AspNetCore.Mvc;
namespace Sample.RabbitMQ.SqlServer.Controllers
{
public class Person
{
public int Id { get; set; }
public string Name { get; set; }
public int Age { get; set; }
public override string ToString()
{
return "Name:" + Name + ";Age:" + Age;
}
}
[Route("api/[controller]")]
public class ValuesController : Controller, ICapSubscribe
{
private readonly ICapPublisher _capBus;
private readonly AppDbContext _dbContext;
public ValuesController(ICapPublisher producer, AppDbContext dbContext)
{
_capBus = producer;
_dbContext = dbContext;
}
[Route("~/publish")]
public IActionResult PublishMessage()
{
_capBus.Publish("sample.rabbitmq.sqlserver.order.check", DateTime.Now);
//var person = new Person
//{
// Name = "杨晓东",
// Age = 11,
// Id = 23
//};
//_capBus.Publish("sample.rabbitmq.mysql33333", person);
return Ok();
}
[Route("~/publishWithTrans")]
public async Task<IActionResult> PublishMessageWithTransaction()
{
using (var trans = await _dbContext.Database.BeginTransactionAsync())
{
await _capBus.PublishAsync("sample.rabbitmq.mysql", "");
trans.Commit();
}
return Ok();
}
[CapSubscribe("sample.rabbitmq.mysql33333",Group ="Test.Group")]
public void KafkaTest22(Person person)
{
var aa = _dbContext.Database;
_dbContext.Dispose();
Console.WriteLine("[sample.kafka.sqlserver] message received " + person.ToString());
Debug.WriteLine("[sample.kafka.sqlserver] message received " + person.ToString());
}
//[CapSubscribe("sample.rabbitmq.mysql22222")]
//public void KafkaTest22(DateTime time)
//{
// Console.WriteLine("[sample.kafka.sqlserver] message received " + time.ToString());
// Debug.WriteLine("[sample.kafka.sqlserver] message received " + time.ToString());
//}
[CapSubscribe("sample.rabbitmq.mysql22222")]
public async Task<DateTime> KafkaTest33(DateTime time)
{
Console.WriteLine("[sample.kafka.sqlserver] message received " + time.ToString());
Debug.WriteLine("[sample.kafka.sqlserver] message received " + time.ToString());
return await Task.FromResult(time);
}
[NonAction]
[CapSubscribe("sample.kafka.sqlserver3")]
[CapSubscribe("sample.kafka.sqlserver4")]
public void KafkaTest()
{
Console.WriteLine("[sample.kafka.sqlserver] message received");
Debug.WriteLine("[sample.kafka.sqlserver] message received");
}
}
}
\ No newline at end of file
// <auto-generated />
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Infrastructure;
using Microsoft.EntityFrameworkCore.Metadata;
using Microsoft.EntityFrameworkCore.Migrations;
using Microsoft.EntityFrameworkCore.Storage;
using Microsoft.EntityFrameworkCore.Storage.Internal;
using Sample.RabbitMQ.SqlServer;
using System;
namespace Sample.RabbitMQ.SqlServer.Migrations
{
[DbContext(typeof(AppDbContext))]
[Migration("20170824130007_AddPersons")]
partial class AddPersons
{
protected override void BuildTargetModel(ModelBuilder modelBuilder)
{
#pragma warning disable 612, 618
modelBuilder
.HasAnnotation("ProductVersion", "2.0.0-rtm-26452")
.HasAnnotation("SqlServer:ValueGenerationStrategy", SqlServerValueGenerationStrategy.IdentityColumn);
modelBuilder.Entity("Sample.RabbitMQ.SqlServer.Controllers.Person", b =>
{
b.Property<int>("Id")
.ValueGeneratedOnAdd();
b.Property<int>("Age");
b.Property<string>("Name");
b.HasKey("Id");
b.ToTable("Persons");
});
#pragma warning restore 612, 618
}
}
}
using Microsoft.EntityFrameworkCore.Metadata;
using Microsoft.EntityFrameworkCore.Migrations;
using System;
using System.Collections.Generic;
namespace Sample.RabbitMQ.SqlServer.Migrations
{
public partial class AddPersons : Migration
{
protected override void Up(MigrationBuilder migrationBuilder)
{
migrationBuilder.CreateTable(
name: "Persons",
columns: table => new
{
Id = table.Column<int>(type: "int", nullable: false)
.Annotation("SqlServer:ValueGenerationStrategy", SqlServerValueGenerationStrategy.IdentityColumn),
Age = table.Column<int>(type: "int", nullable: false),
Name = table.Column<string>(type: "nvarchar(max)", nullable: true)
},
constraints: table =>
{
table.PrimaryKey("PK_Persons", x => x.Id);
});
}
protected override void Down(MigrationBuilder migrationBuilder)
{
migrationBuilder.DropTable(
name: "Persons");
}
}
}
// <auto-generated />
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Infrastructure;
using Microsoft.EntityFrameworkCore.Metadata;
using Microsoft.EntityFrameworkCore.Migrations;
using Microsoft.EntityFrameworkCore.Storage;
using Microsoft.EntityFrameworkCore.Storage.Internal;
using Sample.RabbitMQ.SqlServer;
using System;
namespace Sample.RabbitMQ.SqlServer.Migrations
{
[DbContext(typeof(AppDbContext))]
partial class AppDbContextModelSnapshot : ModelSnapshot
{
protected override void BuildModel(ModelBuilder modelBuilder)
{
#pragma warning disable 612, 618
modelBuilder
.HasAnnotation("ProductVersion", "2.0.0-rtm-26452")
.HasAnnotation("SqlServer:ValueGenerationStrategy", SqlServerValueGenerationStrategy.IdentityColumn);
modelBuilder.Entity("Sample.RabbitMQ.SqlServer.Controllers.Person", b =>
{
b.Property<int>("Id")
.ValueGeneratedOnAdd();
b.Property<int>("Age");
b.Property<string>("Name");
b.HasKey("Id");
b.ToTable("Persons");
});
#pragma warning restore 612, 618
}
}
}
using System.IO;
using Microsoft.AspNetCore;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.Configuration;
namespace Sample.RabbitMQ.SqlServer
{
public class Program
{
public static void Main(string[] args)
{
BuildWebHost(args).Run();
}
public static IWebHost BuildWebHost(string[] args) =>
WebHost.CreateDefaultBuilder(args)
.UseUrls("http://*:5800")
.UseStartup<Startup>()
.Build();
}
}
\ No newline at end of file
<Project Sdk="Microsoft.NET.Sdk.Web">
<PropertyGroup>
<TargetFramework>netcoreapp2.0</TargetFramework>
<AssemblyName>Sample.RabbitMQ.SqlServer</AssemblyName>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Microsoft.AspNetCore.All" Version="2.0.5" />
</ItemGroup>
<ItemGroup>
<DotNetCliToolReference Include="Microsoft.EntityFrameworkCore.Tools.DotNet" Version="2.0.0" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\..\src\DotNetCore.CAP.RabbitMQ\DotNetCore.CAP.RabbitMQ.csproj" />
<ProjectReference Include="..\..\src\DotNetCore.CAP.SqlServer\DotNetCore.CAP.SqlServer.csproj" />
<ProjectReference Include="..\..\src\DotNetCore.CAP\DotNetCore.CAP.csproj" />
</ItemGroup>
</Project>
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using DotNetCore.CAP;
namespace Sample.RabbitMQ.SqlServer.Services
{
public interface ICmsService
{
void Add();
}
}
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
namespace Sample.RabbitMQ.SqlServer.Services
{
public interface IOrderService
{
void Check();
}
}
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using DotNetCore.CAP;
namespace Sample.RabbitMQ.SqlServer.Services.Impl
{
public class CmsService : ICmsService, ICapSubscribe
{
public void Add()
{
throw new NotImplementedException();
}
}
}
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using DotNetCore.CAP;
namespace Sample.RabbitMQ.SqlServer.Services.Impl
{
public class OrderService : IOrderService, ICapSubscribe
{
[CapSubscribe("sample.rabbitmq.sqlserver.order.check")]
public void Check()
{
Console.WriteLine("out");
}
}
}
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Sample.RabbitMQ.SqlServer.Services;
using Sample.RabbitMQ.SqlServer.Services.Impl;
namespace Sample.RabbitMQ.SqlServer
{
public class Startup
{
public void ConfigureServices(IServiceCollection services)
{
services.AddDbContext<AppDbContext>();
services.AddScoped<IOrderService, OrderService>();
services.AddTransient<ICmsService, CmsService>();
services.AddCap(x =>
{
x.UseEntityFramework<AppDbContext>();
x.UseRabbitMQ("localhost");
x.UseDashboard();
x.UseDiscovery(d =>
{
d.DiscoveryServerHostName = "localhost";
d.DiscoveryServerPort = 8500;
d.CurrentNodeHostName = "192.168.1.11";
d.CurrentNodePort = 5800;
d.NodeName = "CAP Node Windows";
d.NodeId = 1;
});
});
services.AddMvc();
}
public void Configure(IApplicationBuilder app, IHostingEnvironment env, ILoggerFactory loggerFactory)
{
loggerFactory.AddConsole();
loggerFactory.AddDebug();
app.UseMvc();
app.UseCap();
}
}
}
\ No newline at end of file
using System;
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using DotNetCore.CAP.Kafka;
using Microsoft.Extensions.DependencyInjection;
......@@ -23,9 +26,9 @@ namespace DotNetCore.CAP
services.AddSingleton(kafkaOptions);
services.AddSingleton<IConsumerClientFactory, KafkaConsumerClientFactory>();
services.AddSingleton<IQueueExecutor, PublishQueueExecutor>();
services.AddSingleton<IPublishExecutor, PublishQueueExecutor>();
services.AddSingleton<ConnectionPool>();
services.AddSingleton<IPublishExecutor, KafkaPublishMessageSender>();
services.AddSingleton<IPublishMessageSender, KafkaPublishMessageSender>();
services.AddSingleton<IConnectionPool,ConnectionPool>();
}
}
}
\ No newline at end of file
using System;
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
......@@ -45,16 +48,19 @@ namespace DotNetCore.CAP
if (_kafkaConfig == null)
{
if (string.IsNullOrWhiteSpace(Servers))
{
throw new ArgumentNullException(nameof(Servers));
}
MainConfig["bootstrap.servers"] = Servers;
MainConfig["queue.buffering.max.ms"] = "10";
MainConfig["socket.blocking.max.ms"] = "10";
MainConfig["enable.auto.commit"] = "false";
MainConfig["log.connection.close"] = "false";
_kafkaConfig = MainConfig.AsEnumerable();
}
return _kafkaConfig;
}
}
......
using System;
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using DotNetCore.CAP;
// ReSharper disable once CheckNamespace
......@@ -24,7 +27,10 @@ namespace Microsoft.Extensions.DependencyInjection
/// <returns></returns>
public static CapOptions UseKafka(this CapOptions options, Action<KafkaOptions> configure)
{
if (configure == null) throw new ArgumentNullException(nameof(configure));
if (configure == null)
{
throw new ArgumentNullException(nameof(configure));
}
options.RegisterExtension(new KafkaCapOptionsExtension(configure));
......
using DotNetCore.CAP.Abstractions;
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using DotNetCore.CAP.Abstractions;
// ReSharper disable once CheckNamespace
namespace DotNetCore.CAP
......
using System;
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Threading;
......@@ -18,8 +21,12 @@ namespace DotNetCore.CAP.Kafka
{
_maxSize = options.ConnectionPoolSize;
_activator = CreateActivator(options);
ServersAddress = options.Servers;
}
public string ServersAddress { get; }
Producer IConnectionPool.Rent()
{
return Rent();
......@@ -35,7 +42,9 @@ namespace DotNetCore.CAP.Kafka
_maxSize = 0;
while (_pool.TryDequeue(out var context))
{
context.Dispose();
}
}
private static Func<Producer> CreateActivator(KafkaOptions options)
......
......@@ -15,7 +15,7 @@
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Confluent.Kafka" Version="0.11.3" />
<PackageReference Include="Confluent.Kafka" Version="0.11.4" />
</ItemGroup>
<ItemGroup>
......

// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using Confluent.Kafka;
namespace DotNetCore.CAP.Kafka
{
public interface IConnectionPool
{
string ServersAddress { get; }
Producer Rent();
bool Return(Producer context);
......
using System;
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using System.Text;
using System.Threading.Tasks;
using DotNetCore.CAP.Internal;
using DotNetCore.CAP.Processor.States;
using Microsoft.Extensions.Logging;
namespace DotNetCore.CAP.Kafka
{
internal class PublishQueueExecutor : BasePublishQueueExecutor
internal class KafkaPublishMessageSender : BasePublishMessageSender
{
private readonly ConnectionPool _connectionPool;
private readonly IConnectionPool _connectionPool;
private readonly ILogger _logger;
public PublishQueueExecutor(
CapOptions options,
IStateChanger stateChanger,
ConnectionPool connectionPool,
ILogger<PublishQueueExecutor> logger)
: base(options, stateChanger, logger)
public KafkaPublishMessageSender(
CapOptions options, IStateChanger stateChanger, IStorageConnection connection,
IConnectionPool connectionPool, ILogger<KafkaPublishMessageSender> logger)
: base(logger, options, connection, stateChanger)
{
_logger = logger;
_connectionPool = connectionPool;
ServersAddress = _connectionPool.ServersAddress;
}
public override async Task<OperateResult> PublishAsync(string keyName, string content)
{
var producer = _connectionPool.Rent();
try
{
var contentBytes = Encoding.UTF8.GetBytes(content);
var message = await producer.ProduceAsync(keyName, null, contentBytes);
if (!message.Error.HasError)
if (message.Error.HasError)
{
_logger.LogDebug($"kafka topic message [{keyName}] has been published.");
return OperateResult.Success;
return OperateResult.Failed(new OperateError
{
Code = message.Error.Code.ToString(),
Description = message.Error.Reason
});
}
return OperateResult.Failed(new OperateError
{
Code = message.Error.Code.ToString(),
Description = message.Error.Reason
});
_logger.LogDebug($"kafka topic message [{keyName}] has been published.");
return OperateResult.Success;
}
catch (Exception ex)
{
_logger.LogError(ex,
$"An error occurred during sending the topic message to kafka. Topic:[{keyName}], Exception: {ex.Message}");
var wapperEx = new PublisherSentFailedException(ex.Message, ex);
return OperateResult.Failed(ex);
return OperateResult.Failed(wapperEx);
}
finally
{
var returned = _connectionPool.Return(producer);
if (!returned)
{
producer.Dispose();
}
}
}
}
}
}
\ No newline at end of file
using System;
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
......@@ -26,13 +29,19 @@ namespace DotNetCore.CAP.Kafka
public event EventHandler<LogMessageEventArgs> OnLog;
public string ServersAddress => _kafkaOptions.Servers;
public void Subscribe(IEnumerable<string> topics)
{
if (topics == null)
{
throw new ArgumentNullException(nameof(topics));
}
if (_consumerClient == null)
{
InitKafkaClient();
}
_consumerClient.Subscribe(topics);
}
......@@ -44,6 +53,7 @@ namespace DotNetCore.CAP.Kafka
cancellationToken.ThrowIfCancellationRequested();
_consumerClient.Poll(timeout);
}
// ReSharper disable once FunctionNeverReturns
}
......
namespace DotNetCore.CAP.Kafka
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
namespace DotNetCore.CAP.Kafka
{
internal sealed class KafkaConsumerClientFactory : IConsumerClientFactory
{
......
using System;
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
// ReSharper disable once CheckNamespace
namespace DotNetCore.CAP
......
using System;
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using DotNetCore.CAP.MySql;
using DotNetCore.CAP.Processor;
using Microsoft.EntityFrameworkCore;
......
// ReSharper disable once CheckNamespace
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
namespace DotNetCore.CAP
{
public class MySqlOptions : EFOptions
......
using System;
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using DotNetCore.CAP;
using Microsoft.EntityFrameworkCore;
......@@ -14,7 +17,10 @@ namespace Microsoft.Extensions.DependencyInjection
public static CapOptions UseMySql(this CapOptions options, Action<MySqlOptions> configure)
{
if (configure == null) throw new ArgumentNullException(nameof(configure));
if (configure == null)
{
throw new ArgumentNullException(nameof(configure));
}
options.RegisterExtension(new MySqlCapOptionsExtension(configure));
......@@ -31,7 +37,10 @@ namespace Microsoft.Extensions.DependencyInjection
public static CapOptions UseEntityFramework<TContext>(this CapOptions options, Action<EFOptions> configure)
where TContext : DbContext
{
if (configure == null) throw new ArgumentNullException(nameof(configure));
if (configure == null)
{
throw new ArgumentNullException(nameof(configure));
}
options.RegisterExtension(new MySqlCapOptionsExtension(x =>
{
......
using System;
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using System.Data;
using System.Threading.Tasks;
using Dapper;
......@@ -14,27 +17,31 @@ namespace DotNetCore.CAP.MySql
public class CapPublisher : CapPublisherBase, ICallbackPublisher
{
private readonly DbContext _dbContext;
private readonly ILogger _logger;
private readonly MySqlOptions _options;
public CapPublisher(IServiceProvider provider,
ILogger<CapPublisher> logger,
public CapPublisher(ILogger<CapPublisher> logger, IDispatcher dispatcher, IServiceProvider provider,
MySqlOptions options)
: base(logger, dispatcher)
{
ServiceProvider = provider;
_options = options;
_logger = logger;
if (_options.DbContextType == null) return;
if (_options.DbContextType == null)
{
return;
}
IsUsingEF = true;
_dbContext = (DbContext) ServiceProvider.GetService(_options.DbContextType);
}
public async Task PublishAsync(CapPublishedMessage message)
public async Task PublishCallbackAsync(CapPublishedMessage message)
{
using (var conn = new MySqlConnection(_options.ConnectionString))
{
await conn.ExecuteAsync(PrepareSql(), message);
var id = await conn.ExecuteScalarAsync<int>(PrepareSql(), message);
message.Id = id;
Enqueue(message);
}
}
......@@ -51,23 +58,20 @@ namespace DotNetCore.CAP.MySql
dbContextTransaction = _dbContext.Database.BeginTransaction(IsolationLevel.ReadCommitted);
dbTrans = dbContextTransaction.GetDbTransaction();
}
DbTransaction = dbTrans;
}
protected override void Execute(IDbConnection dbConnection, IDbTransaction dbTransaction,
protected override int Execute(IDbConnection dbConnection, IDbTransaction dbTransaction,
CapPublishedMessage message)
{
dbConnection.Execute(PrepareSql(), message, dbTransaction);
_logger.LogInformation("Published Message has been persisted in the database. name:" + message);
return dbConnection.ExecuteScalar<int>(PrepareSql(), message, dbTransaction);
}
protected override async Task ExecuteAsync(IDbConnection dbConnection, IDbTransaction dbTransaction,
protected override async Task<int> ExecuteAsync(IDbConnection dbConnection, IDbTransaction dbTransaction,
CapPublishedMessage message)
{
await dbConnection.ExecuteAsync(PrepareSql(), message, dbTransaction);
_logger.LogInformation("Published Message has been persisted in the database. name:" + message);
return await dbConnection.ExecuteScalarAsync<int>(PrepareSql(), message, dbTransaction);
}
#region private methods
......@@ -75,7 +79,7 @@ namespace DotNetCore.CAP.MySql
private string PrepareSql()
{
return
$"INSERT INTO `{_options.TableNamePrefix}.published` (`Name`,`Content`,`Retries`,`Added`,`ExpiresAt`,`StatusName`)VALUES(@Name,@Content,@Retries,@Added,@ExpiresAt,@StatusName)";
$"INSERT INTO `{_options.TableNamePrefix}.published` (`Name`,`Content`,`Retries`,`Added`,`ExpiresAt`,`StatusName`)VALUES(@Name,@Content,@Retries,@Added,@ExpiresAt,@StatusName);SELECT LAST_INSERT_ID()";
}
#endregion private methods
......
......@@ -15,9 +15,9 @@
<ItemGroup>
<PackageReference Include="Dapper" Version="1.50.4" />
<PackageReference Include="Microsoft.EntityFrameworkCore" Version="2.0.1" />
<PackageReference Include="Microsoft.EntityFrameworkCore.Relational" Version="2.0.1" />
<PackageReference Include="MySqlConnector" Version="0.36.0" />
<PackageReference Include="Microsoft.EntityFrameworkCore" Version="2.0.2" />
<PackageReference Include="Microsoft.EntityFrameworkCore.Relational" Version="2.0.2" />
<PackageReference Include="MySqlConnector" Version="0.38.0" />
</ItemGroup>
<ItemGroup>
......
using DotNetCore.CAP.Models;
namespace DotNetCore.CAP.MySql
{
internal class FetchedMessage
{
public int MessageId { get; set; }
public MessageType MessageType { get; set; }
}
}
\ No newline at end of file
using System;
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using System.Threading.Tasks;
using Dapper;
using DotNetCore.CAP.Processor;
......
using Dapper;
using DotNetCore.CAP.Models;
using MySql.Data.MySqlClient;
namespace DotNetCore.CAP.MySql
{
public class MySqlFetchedMessage : IFetchedMessage
{
private readonly MySqlOptions _options;
private readonly string _processId;
public MySqlFetchedMessage(int messageId, MessageType type, string processId, MySqlOptions options)
{
MessageId = messageId;
MessageType = type;
_processId = processId;
_options = options;
}
public int MessageId { get; }
public MessageType MessageType { get; }
public void RemoveFromQueue()
{
using (var connection = new MySqlConnection(_options.ConnectionString))
{
connection.Execute($"DELETE FROM `{_options.TableNamePrefix}.queue` WHERE `ProcessId`=@ProcessId"
, new { ProcessId = _processId });
}
}
public void Requeue()
{
using (var connection = new MySqlConnection(_options.ConnectionString))
{
connection.Execute($"UPDATE `{_options.TableNamePrefix}.queue` SET `ProcessId`=NULL WHERE `ProcessId`=@ProcessId"
, new { ProcessId = _processId });
}
}
public void Dispose()
{
// ignored
}
}
}
\ No newline at end of file
using System;
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using System.Collections.Generic;
using System.Data;
using System.Linq;
......@@ -28,9 +31,7 @@ set transaction isolation level read committed;
select count(Id) from `{0}.published` where StatusName = N'Succeeded';
select count(Id) from `{0}.received` where StatusName = N'Succeeded';
select count(Id) from `{0}.published` where StatusName = N'Failed';
select count(Id) from `{0}.received` where StatusName = N'Failed';
select count(Id) from `{0}.published` where StatusName in (N'Processing',N'Scheduled',N'Enqueued');
select count(Id) from `{0}.received` where StatusName in (N'Processing',N'Scheduled',N'Enqueued');", _prefix);
select count(Id) from `{0}.received` where StatusName = N'Failed';", _prefix);
var statistics = UseConnection(connection =>
{
......@@ -42,10 +43,8 @@ select count(Id) from `{0}.received` where StatusName in (N'Processing',N'Sched
stats.PublishedFailed = multi.ReadSingle<int>();
stats.ReceivedFailed = multi.ReadSingle<int>();
stats.PublishedProcessing = multi.ReadSingle<int>();
stats.ReceivedProcessing = multi.ReadSingle<int>();
}
return stats;
});
return statistics;
......@@ -70,17 +69,24 @@ select count(Id) from `{0}.received` where StatusName in (N'Processing',N'Sched
var tableName = queryDto.MessageType == MessageType.Publish ? "published" : "received";
var where = string.Empty;
if (!string.IsNullOrEmpty(queryDto.StatusName))
if (string.Equals(queryDto.StatusName, StatusName.Processing,
StringComparison.CurrentCultureIgnoreCase))
where += " and StatusName in (N'Processing',N'Scheduled',N'Enqueued')";
else
where += " and StatusName=@StatusName";
{
where += " and StatusName=@StatusName";
}
if (!string.IsNullOrEmpty(queryDto.Name))
{
where += " and Name=@Name";
}
if (!string.IsNullOrEmpty(queryDto.Group))
{
where += " and Group=@Group";
}
if (!string.IsNullOrEmpty(queryDto.Content))
{
where += " and Content like '%@Content%'";
}
var sqlQuery =
$"select * from `{_prefix}.{tableName}` where 1=1 {where} order by Added desc limit @Limit offset @Offset";
......@@ -101,11 +107,6 @@ select count(Id) from `{0}.received` where StatusName in (N'Processing',N'Sched
return UseConnection(conn => GetNumberOfMessage(conn, "published", StatusName.Failed));
}
public int PublishedProcessingCount()
{
return UseConnection(conn => GetNumberOfMessage(conn, "published", StatusName.Processing));
}
public int PublishedSucceededCount()
{
return UseConnection(conn => GetNumberOfMessage(conn, "published", StatusName.Succeeded));
......@@ -116,11 +117,6 @@ select count(Id) from `{0}.received` where StatusName in (N'Processing',N'Sched
return UseConnection(conn => GetNumberOfMessage(conn, "received", StatusName.Failed));
}
public int ReceivedProcessingCount()
{
return UseConnection(conn => GetNumberOfMessage(conn, "received", StatusName.Processing));
}
public int ReceivedSucceededCount()
{
return UseConnection(conn => GetNumberOfMessage(conn, "received", StatusName.Succeeded));
......@@ -128,9 +124,7 @@ select count(Id) from `{0}.received` where StatusName in (N'Processing',N'Sched
private int GetNumberOfMessage(IDbConnection connection, string tableName, string statusName)
{
var sqlQuery = statusName == StatusName.Processing
? $"select count(Id) from `{_prefix}.{tableName}` where StatusName in (N'Processing',N'Scheduled',N'Enqueued')"
: $"select count(Id) from `{_prefix}.{tableName}` where StatusName = @state";
var sqlQuery = $"select count(Id) from `{_prefix}.{tableName}` where StatusName = @state";
var count = connection.ExecuteScalar<int>(sqlQuery, new {state = statusName});
return count;
......@@ -179,7 +173,12 @@ select aggr.* from (
.ToDictionary(x => (string) x.Key, x => (int) x.Count);
foreach (var key in keyMaps.Keys)
if (!valuesMap.ContainsKey(key)) valuesMap.Add(key, 0);
{
if (!valuesMap.ContainsKey(key))
{
valuesMap.Add(key, 0);
}
}
var result = new Dictionary<DateTime, int>();
for (var i = 0; i < keyMaps.Count; i++)
......
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using System.Data;
using System.Threading;
......@@ -11,10 +14,10 @@ namespace DotNetCore.CAP.MySql
{
public class MySqlStorage : IStorage
{
private readonly CapOptions _capOptions;
private readonly IDbConnection _existingConnection = null;
private readonly ILogger _logger;
private readonly MySqlOptions _options;
private readonly CapOptions _capOptions;
public MySqlStorage(ILogger<MySqlStorage> logger,
MySqlOptions options,
......@@ -37,7 +40,11 @@ namespace DotNetCore.CAP.MySql
public async Task InitializeAsync(CancellationToken cancellationToken)
{
if (cancellationToken.IsCancellationRequested) return;
if (cancellationToken.IsCancellationRequested)
{
return;
}
var sql = CreateDbTablesScript(_options.TableNamePrefix);
using (var connection = new MySqlConnection(_options.ConnectionString))
{
......@@ -51,11 +58,7 @@ namespace DotNetCore.CAP.MySql
{
var batchSql =
$@"
CREATE TABLE IF NOT EXISTS `{prefix}.queue` (
`MessageId` int(11) NOT NULL,
`MessageType` tinyint(4) NOT NULL,
`ProcessId` varchar(50) DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
DROP TABLE IF EXISTS `{prefix}.queue`;
CREATE TABLE IF NOT EXISTS `{prefix}.received` (
`Id` int(127) NOT NULL AUTO_INCREMENT,
......@@ -102,7 +105,9 @@ CREATE TABLE IF NOT EXISTS `{prefix}.published` (
var connection = _existingConnection ?? new MySqlConnection(_options.ConnectionString);
if (connection.State == ConnectionState.Closed)
{
connection.Open();
}
return connection;
}
......@@ -115,7 +120,9 @@ CREATE TABLE IF NOT EXISTS `{prefix}.published` (
internal void ReleaseConnection(IDbConnection connection)
{
if (connection != null && !IsExistingConnection(connection))
{
connection.Dispose();
}
}
}
}
\ No newline at end of file
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
......@@ -13,8 +16,6 @@ namespace DotNetCore.CAP.MySql
private readonly CapOptions _capOptions;
private readonly string _prefix;
private const string DateTimeMaxValue = "9999-12-31 23:59:59";
public MySqlStorageConnection(MySqlOptions options, CapOptions capOptions)
{
_capOptions = capOptions;
......@@ -39,51 +40,32 @@ namespace DotNetCore.CAP.MySql
}
}
public Task<IFetchedMessage> FetchNextMessageAsync()
public async Task<IEnumerable<CapPublishedMessage>> GetPublishedMessagesOfNeedRetry()
{
var processId = ObjectId.GenerateNewStringId();
var sql = $@"
UPDATE `{_prefix}.queue` SET `ProcessId`=@ProcessId WHERE `ProcessId` IS NULL LIMIT 1;
SELECT `MessageId`,`MessageType` FROM `{_prefix}.queue` WHERE `ProcessId`=@ProcessId;";
return FetchNextMessageCoreAsync(sql, processId);
}
public async Task<CapPublishedMessage> GetNextPublishedMessageToBeEnqueuedAsync()
{
var sql = $@"
UPDATE `{_prefix}.published` SET Id=LAST_INSERT_ID(Id),ExpiresAt='{DateTimeMaxValue}' WHERE ExpiresAt IS NULL AND `StatusName` = '{StatusName.Scheduled}' LIMIT 1;
SELECT * FROM `{_prefix}.published` WHERE Id=LAST_INSERT_ID();";
var fourMinsAgo = DateTime.Now.AddMinutes(-4);
var sql =
$"SELECT * FROM `{_prefix}.published` WHERE `Retries`<{_capOptions.FailedRetryCount} AND `Added`<'{fourMinsAgo}' AND (`StatusName` = '{StatusName.Failed}' OR `StatusName` = '{StatusName.Scheduled}') LIMIT 200;";
using (var connection = new MySqlConnection(Options.ConnectionString))
{
connection.Open();
connection.Execute("SELECT LAST_INSERT_ID(0)");
return await connection.QueryFirstOrDefaultAsync<CapPublishedMessage>(sql);
return await connection.QueryAsync<CapPublishedMessage>(sql);
}
}
public async Task<IEnumerable<CapPublishedMessage>> GetFailedPublishedMessages()
public async Task<int> StoreReceivedMessageAsync(CapReceivedMessage message)
{
var sql = $"SELECT * FROM `{_prefix}.published` WHERE `Retries`<{_capOptions.FailedRetryCount} AND `StatusName` = '{StatusName.Failed}' LIMIT 200;";
using (var connection = new MySqlConnection(Options.ConnectionString))
if (message == null)
{
return await connection.QueryAsync<CapPublishedMessage>(sql);
throw new ArgumentNullException(nameof(message));
}
}
public async Task StoreReceivedMessageAsync(CapReceivedMessage message)
{
if (message == null) throw new ArgumentNullException(nameof(message));
var sql = $@"
INSERT INTO `{_prefix}.received`(`Name`,`Group`,`Content`,`Retries`,`Added`,`ExpiresAt`,`StatusName`)
VALUES(@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName);";
VALUES(@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName);SELECT LAST_INSERT_ID();";
using (var connection = new MySqlConnection(Options.ConnectionString))
{
await connection.ExecuteAsync(sql, message);
return await connection.ExecuteScalarAsync<int>(sql, message);
}
}
......@@ -96,23 +78,11 @@ VALUES(@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName);";
}
}
public async Task<CapReceivedMessage> GetNextReceivedMessageToBeEnqueuedAsync()
public async Task<IEnumerable<CapReceivedMessage>> GetReceivedMessagesOfNeedRetry()
{
var sql = $@"
UPDATE `{_prefix}.received` SET Id=LAST_INSERT_ID(Id),ExpiresAt='{DateTimeMaxValue}' WHERE ExpiresAt IS NULL AND `StatusName` = '{StatusName.Scheduled}' LIMIT 1;
SELECT * FROM `{_prefix}.received` WHERE Id=LAST_INSERT_ID();";
using (var connection = new MySqlConnection(Options.ConnectionString))
{
connection.Open();
connection.Execute("SELECT LAST_INSERT_ID(0)");
return await connection.QueryFirstOrDefaultAsync<CapReceivedMessage>(sql);
}
}
public async Task<IEnumerable<CapReceivedMessage>> GetFailedReceivedMessages()
{
var sql = $"SELECT * FROM `{_prefix}.received` WHERE `Retries`<{_capOptions.FailedRetryCount} AND `StatusName` = '{StatusName.Failed}' LIMIT 200;";
var fourMinsAgo = DateTime.Now.AddMinutes(-4);
var sql =
$"SELECT * FROM `{_prefix}.received` WHERE `Retries`<{_capOptions.FailedRetryCount} AND `Added`<'{fourMinsAgo}' AND (`StatusName` = '{StatusName.Failed}' OR `StatusName` = '{StatusName.Scheduled}') LIMIT 200;";
using (var connection = new MySqlConnection(Options.ConnectionString))
{
return await connection.QueryAsync<CapReceivedMessage>(sql);
......@@ -141,20 +111,6 @@ SELECT * FROM `{_prefix}.received` WHERE Id=LAST_INSERT_ID();";
}
}
private async Task<IFetchedMessage> FetchNextMessageCoreAsync(string sql, string processId)
{
FetchedMessage fetchedMessage;
using (var connection = new MySqlConnection(Options.ConnectionString))
{
fetchedMessage = await connection.QuerySingleOrDefaultAsync<FetchedMessage>(sql, new { ProcessId = processId });
}
if (fetchedMessage == null)
return null;
return new MySqlFetchedMessage(fetchedMessage.MessageId, fetchedMessage.MessageType, processId, Options);
}
public void Dispose()
{
}
......
using System;
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using System.Data;
using System.Threading.Tasks;
using Dapper;
......@@ -11,7 +14,7 @@ namespace DotNetCore.CAP.MySql
{
private readonly IDbConnection _dbConnection;
private readonly IDbTransaction _dbTransaction;
//private readonly IDbTransaction _dbTransaction;
private readonly string _prefix;
public MySqlStorageTransaction(MySqlStorageConnection connection)
......@@ -20,55 +23,45 @@ namespace DotNetCore.CAP.MySql
_prefix = options.TableNamePrefix;
_dbConnection = new MySqlConnection(options.ConnectionString);
_dbConnection.Open();
_dbTransaction = _dbConnection.BeginTransaction(IsolationLevel.ReadCommitted);
// _dbConnection.Open(); for performance
// _dbTransaction = _dbConnection.BeginTransaction(IsolationLevel.ReadCommitted);
}
public void UpdateMessage(CapPublishedMessage message)
{
if (message == null) throw new ArgumentNullException(nameof(message));
if (message == null)
{
throw new ArgumentNullException(nameof(message));
}
var sql =
$"UPDATE `{_prefix}.published` SET `Retries` = @Retries,`Content`= @Content,`ExpiresAt` = @ExpiresAt,`StatusName`=@StatusName WHERE `Id`=@Id;";
_dbConnection.Execute(sql, message, _dbTransaction);
_dbConnection.Execute(sql, message);
}
public void UpdateMessage(CapReceivedMessage message)
{
if (message == null) throw new ArgumentNullException(nameof(message));
if (message == null)
{
throw new ArgumentNullException(nameof(message));
}
var sql =
$"UPDATE `{_prefix}.received` SET `Retries` = @Retries,`Content`= @Content,`ExpiresAt` = @ExpiresAt,`StatusName`=@StatusName WHERE `Id`=@Id;";
_dbConnection.Execute(sql, message, _dbTransaction);
}
public void EnqueueMessage(CapPublishedMessage message)
{
if (message == null) throw new ArgumentNullException(nameof(message));
var sql = $"INSERT INTO `{_prefix}.queue`(`MessageId`,`MessageType`) values(@MessageId,@MessageType);";
_dbConnection.Execute(sql, new CapQueue {MessageId = message.Id, MessageType = MessageType.Publish},
_dbTransaction);
}
public void EnqueueMessage(CapReceivedMessage message)
{
if (message == null) throw new ArgumentNullException(nameof(message));
var sql = $"INSERT INTO `{_prefix}.queue`(`MessageId`,`MessageType`) values(@MessageId,@MessageType);";
_dbConnection.Execute(sql, new CapQueue {MessageId = message.Id, MessageType = MessageType.Subscribe},
_dbTransaction);
_dbConnection.Execute(sql, message);
}
public Task CommitAsync()
{
_dbTransaction.Commit();
_dbConnection.Close();
_dbConnection.Dispose();
//_dbTransaction.Commit();
return Task.CompletedTask;
}
public void Dispose()
{
_dbTransaction.Dispose();
//_dbTransaction.Dispose();
_dbConnection.Dispose();
}
}
......
using System;
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
// ReSharper disable once CheckNamespace
namespace DotNetCore.CAP
......
using System;
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using DotNetCore.CAP;
using Microsoft.EntityFrameworkCore;
......@@ -14,7 +17,10 @@ namespace Microsoft.Extensions.DependencyInjection
public static CapOptions UsePostgreSql(this CapOptions options, Action<PostgreSqlOptions> configure)
{
if (configure == null) throw new ArgumentNullException(nameof(configure));
if (configure == null)
{
throw new ArgumentNullException(nameof(configure));
}
options.RegisterExtension(new PostgreSqlCapOptionsExtension(configure));
......@@ -30,7 +36,10 @@ namespace Microsoft.Extensions.DependencyInjection
public static CapOptions UseEntityFramework<TContext>(this CapOptions options, Action<EFOptions> configure)
where TContext : DbContext
{
if (configure == null) throw new ArgumentNullException(nameof(configure));
if (configure == null)
{
throw new ArgumentNullException(nameof(configure));
}
options.RegisterExtension(new PostgreSqlCapOptionsExtension(x =>
{
......
using System;
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using DotNetCore.CAP.PostgreSql;
using DotNetCore.CAP.Processor;
using Microsoft.EntityFrameworkCore;
......@@ -40,7 +43,7 @@ namespace DotNetCore.CAP
using (var scope = x.CreateScope())
{
var provider = scope.ServiceProvider;
var dbContext = (DbContext)provider.GetService(postgreSqlOptions.DbContextType);
var dbContext = (DbContext) provider.GetService(postgreSqlOptions.DbContextType);
postgreSqlOptions.ConnectionString = dbContext.Database.GetDbConnection().ConnectionString;
return postgreSqlOptions;
}
......
// ReSharper disable once CheckNamespace
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
namespace DotNetCore.CAP
{
......
using System;
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using System.Data;
using System.Threading.Tasks;
using Dapper;
......@@ -14,16 +17,14 @@ namespace DotNetCore.CAP.PostgreSql
public class CapPublisher : CapPublisherBase, ICallbackPublisher
{
private readonly DbContext _dbContext;
private readonly ILogger _logger;
private readonly PostgreSqlOptions _options;
public CapPublisher(IServiceProvider provider,
ILogger<CapPublisher> logger,
PostgreSqlOptions options)
public CapPublisher(ILogger<CapPublisher> logger, IDispatcher dispatcher,
IServiceProvider provider, PostgreSqlOptions options)
: base(logger, dispatcher)
{
ServiceProvider = provider;
_options = options;
_logger = logger;
if (_options.DbContextType != null)
{
......@@ -31,12 +32,14 @@ namespace DotNetCore.CAP.PostgreSql
_dbContext = (DbContext) ServiceProvider.GetService(_options.DbContextType);
}
}
public async Task PublishAsync(CapPublishedMessage message)
public async Task PublishCallbackAsync(CapPublishedMessage message)
{
using (var conn = new NpgsqlConnection(_options.ConnectionString))
{
await conn.ExecuteAsync(PrepareSql(), message);
var id = await conn.ExecuteScalarAsync<int>(PrepareSql(), message);
message.Id = id;
Enqueue(message);
}
}
......@@ -53,23 +56,20 @@ namespace DotNetCore.CAP.PostgreSql
dbContextTransaction = _dbContext.Database.BeginTransaction(IsolationLevel.ReadCommitted);
dbTrans = dbContextTransaction.GetDbTransaction();
}
DbTransaction = dbTrans;
}
protected override void Execute(IDbConnection dbConnection, IDbTransaction dbTransaction,
protected override int Execute(IDbConnection dbConnection, IDbTransaction dbTransaction,
CapPublishedMessage message)
{
dbConnection.Execute(PrepareSql(), message, dbTransaction);
_logger.LogInformation("Published Message has been persisted in the database. name:" + message);
return dbConnection.ExecuteScalar<int>(PrepareSql(), message, dbTransaction);
}
protected override async Task ExecuteAsync(IDbConnection dbConnection, IDbTransaction dbTransaction,
protected override Task<int> ExecuteAsync(IDbConnection dbConnection, IDbTransaction dbTransaction,
CapPublishedMessage message)
{
await dbConnection.ExecuteAsync(PrepareSql(), message, dbTransaction);
_logger.LogInformation("Published Message has been persisted in the database. name:" + message);
return dbConnection.ExecuteScalarAsync<int>(PrepareSql(), message, dbTransaction);
}
#region private methods
......@@ -77,7 +77,7 @@ namespace DotNetCore.CAP.PostgreSql
private string PrepareSql()
{
return
$"INSERT INTO \"{_options.Schema}\".\"published\" (\"Name\",\"Content\",\"Retries\",\"Added\",\"ExpiresAt\",\"StatusName\")VALUES(@Name,@Content,@Retries,@Added,@ExpiresAt,@StatusName)";
$"INSERT INTO \"{_options.Schema}\".\"published\" (\"Name\",\"Content\",\"Retries\",\"Added\",\"ExpiresAt\",\"StatusName\")VALUES(@Name,@Content,@Retries,@Added,@ExpiresAt,@StatusName) RETURNING \"Id\";";
}
#endregion private methods
......
......@@ -15,8 +15,8 @@
<ItemGroup>
<PackageReference Include="Dapper" Version="1.50.4" />
<PackageReference Include="Microsoft.EntityFrameworkCore" Version="2.0.1" />
<PackageReference Include="Microsoft.EntityFrameworkCore.Relational" Version="2.0.1" />
<PackageReference Include="Microsoft.EntityFrameworkCore" Version="2.0.2" />
<PackageReference Include="Microsoft.EntityFrameworkCore.Relational" Version="2.0.2" />
<PackageReference Include="Npgsql" Version="3.2.7" />
</ItemGroup>
......
using DotNetCore.CAP.Models;
namespace DotNetCore.CAP.PostgreSql
{
internal class FetchedMessage
{
public int MessageId { get; set; }
public MessageType MessageType { get; set; }
}
}
\ No newline at end of file
using System;
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using System.Threading.Tasks;
using Dapper;
using DotNetCore.CAP.Processor;
......
using System;
using System.Data;
using System.Threading;
using Dapper;
using DotNetCore.CAP.Models;
namespace DotNetCore.CAP.PostgreSql
{
public class PostgreSqlFetchedMessage : IFetchedMessage
{
private static readonly TimeSpan KeepAliveInterval = TimeSpan.FromMinutes(1);
private readonly IDbConnection _connection;
private readonly object _lockObject = new object();
private readonly Timer _timer;
private readonly IDbTransaction _transaction;
public PostgreSqlFetchedMessage(int messageId,
MessageType type,
IDbConnection connection,
IDbTransaction transaction)
{
MessageId = messageId;
MessageType = type;
_connection = connection;
_transaction = transaction;
_timer = new Timer(ExecuteKeepAliveQuery, null, KeepAliveInterval, KeepAliveInterval);
}
public int MessageId { get; }
public MessageType MessageType { get; }
public void RemoveFromQueue()
{
lock (_lockObject)
{
_transaction.Commit();
}
}
public void Requeue()
{
lock (_lockObject)
{
_transaction.Rollback();
}
}
public void Dispose()
{
lock (_lockObject)
{
_timer?.Dispose();
_transaction.Dispose();
_connection.Dispose();
}
}
private void ExecuteKeepAliveQuery(object obj)
{
lock (_lockObject)
{
try
{
_connection?.Execute("SELECT 1", _transaction);
}
catch
{
// ignored
}
}
}
}
}
\ No newline at end of file
using System;
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using System.Collections.Generic;
using System.Data;
using System.Linq;
......@@ -27,9 +30,7 @@ namespace DotNetCore.CAP.PostgreSql
select count(""Id"") from ""{0}"".""published"" where ""StatusName"" = N'Succeeded';
select count(""Id"") from ""{0}"".""received"" where ""StatusName"" = N'Succeeded';
select count(""Id"") from ""{0}"".""published"" where ""StatusName"" = N'Failed';
select count(""Id"") from ""{0}"".""received"" where ""StatusName"" = N'Failed';
select count(""Id"") from ""{0}"".""published"" where ""StatusName"" in (N'Processing',N'Scheduled',N'Enqueued');
select count(""Id"") from ""{0}"".""received"" where ""StatusName"" in (N'Processing',N'Scheduled',N'Enqueued');",
select count(""Id"") from ""{0}"".""received"" where ""StatusName"" = N'Failed';",
_options.Schema);
var statistics = UseConnection(connection =>
......@@ -42,10 +43,8 @@ select count(""Id"") from ""{0}"".""received"" where ""StatusName"" in (N'Proce
stats.PublishedFailed = multi.ReadSingle<int>();
stats.ReceivedFailed = multi.ReadSingle<int>();
stats.PublishedProcessing = multi.ReadSingle<int>();
stats.ReceivedProcessing = multi.ReadSingle<int>();
}
return stats;
});
return statistics;
......@@ -57,17 +56,24 @@ select count(""Id"") from ""{0}"".""received"" where ""StatusName"" in (N'Proce
var where = string.Empty;
if (!string.IsNullOrEmpty(queryDto.StatusName))
if (string.Equals(queryDto.StatusName, StatusName.Processing,
StringComparison.CurrentCultureIgnoreCase))
where += " and \"StatusName\" in (N'Processing',N'Scheduled',N'Enqueued')";
else
where += " and Lower(\"StatusName\") = Lower(@StatusName)";
{
where += " and Lower(\"StatusName\") = Lower(@StatusName)";
}
if (!string.IsNullOrEmpty(queryDto.Name))
{
where += " and Lower(\"Name\") = Lower(@Name)";
}
if (!string.IsNullOrEmpty(queryDto.Group))
{
where += " and Lower(\"Group\") = Lower(@Group)";
}
if (!string.IsNullOrEmpty(queryDto.Content))
{
where += " and \"Content\" ILike '%@Content%'";
}
var sqlQuery =
$"select * from \"{_options.Schema}\".\"{tableName}\" where 1=1 {where} order by \"Added\" desc offset @Offset limit @Limit";
......@@ -88,11 +94,6 @@ select count(""Id"") from ""{0}"".""received"" where ""StatusName"" in (N'Proce
return UseConnection(conn => GetNumberOfMessage(conn, "published", StatusName.Failed));
}
public int PublishedProcessingCount()
{
return UseConnection(conn => GetNumberOfMessage(conn, "published", StatusName.Processing));
}
public int PublishedSucceededCount()
{
return UseConnection(conn => GetNumberOfMessage(conn, "published", StatusName.Succeeded));
......@@ -103,11 +104,6 @@ select count(""Id"") from ""{0}"".""received"" where ""StatusName"" in (N'Proce
return UseConnection(conn => GetNumberOfMessage(conn, "received", StatusName.Failed));
}
public int ReceivedProcessingCount()
{
return UseConnection(conn => GetNumberOfMessage(conn, "received", StatusName.Processing));
}
public int ReceivedSucceededCount()
{
return UseConnection(conn => GetNumberOfMessage(conn, "received", StatusName.Succeeded));
......@@ -129,11 +125,10 @@ select count(""Id"") from ""{0}"".""received"" where ""StatusName"" in (N'Proce
private int GetNumberOfMessage(IDbConnection connection, string tableName, string statusName)
{
var sqlQuery = statusName == StatusName.Processing
? $"select count(\"Id\") from \"{_options.Schema}\".\"{tableName}\" where \"StatusName\" in (N'Processing',N'Scheduled',N'Enqueued')"
: $"select count(\"Id\") from \"{_options.Schema}\".\"{tableName}\" where Lower(\"StatusName\") = Lower(@state)";
var sqlQuery =
$"select count(\"Id\") from \"{_options.Schema}\".\"{tableName}\" where Lower(\"StatusName\") = Lower(@state)";
var count = connection.ExecuteScalar<int>(sqlQuery, new { state = statusName });
var count = connection.ExecuteScalar<int>(sqlQuery, new {state = statusName});
return count;
}
......@@ -175,12 +170,17 @@ with aggr as (
)
select ""Key"",""Count"" from aggr where ""Key""= Any(@keys);";
var valuesMap = connection.Query(sqlQuery,new { keys = keyMaps.Keys.ToList(), statusName })
.ToList()
.ToDictionary(x => (string)x.Key, x => (int)x.Count);
var valuesMap = connection.Query(sqlQuery, new {keys = keyMaps.Keys.ToList(), statusName})
.ToList()
.ToDictionary(x => (string) x.Key, x => (int) x.Count);
foreach (var key in keyMaps.Keys)
if (!valuesMap.ContainsKey(key)) valuesMap.Add(key, 0);
{
if (!valuesMap.ContainsKey(key))
{
valuesMap.Add(key, 0);
}
}
var result = new Dictionary<DateTime, int>();
for (var i = 0; i < keyMaps.Count; i++)
......
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using System.Data;
using System.Threading;
......@@ -11,9 +14,9 @@ namespace DotNetCore.CAP.PostgreSql
{
public class PostgreSqlStorage : IStorage
{
private readonly CapOptions _capOptions;
private readonly IDbConnection _existingConnection = null;
private readonly ILogger _logger;
private readonly CapOptions _capOptions;
private readonly PostgreSqlOptions _options;
public PostgreSqlStorage(ILogger<PostgreSqlStorage> logger,
......@@ -37,7 +40,10 @@ namespace DotNetCore.CAP.PostgreSql
public async Task InitializeAsync(CancellationToken cancellationToken)
{
if (cancellationToken.IsCancellationRequested) return;
if (cancellationToken.IsCancellationRequested)
{
return;
}
var sql = CreateDbTablesScript(_options.Schema);
......@@ -45,6 +51,7 @@ namespace DotNetCore.CAP.PostgreSql
{
await connection.ExecuteAsync(sql);
}
_logger.LogDebug("Ensuring all create database tables script are applied.");
}
......@@ -68,7 +75,9 @@ namespace DotNetCore.CAP.PostgreSql
var connection = _existingConnection ?? new NpgsqlConnection(_options.ConnectionString);
if (connection.State == ConnectionState.Closed)
{
connection.Open();
}
return connection;
}
......@@ -81,7 +90,9 @@ namespace DotNetCore.CAP.PostgreSql
internal void ReleaseConnection(IDbConnection connection)
{
if (connection != null && !IsExistingConnection(connection))
{
connection.Dispose();
}
}
protected virtual string CreateDbTablesScript(string schema)
......@@ -89,10 +100,7 @@ namespace DotNetCore.CAP.PostgreSql
var batchSql = $@"
CREATE SCHEMA IF NOT EXISTS ""{schema}"";
CREATE TABLE IF NOT EXISTS ""{schema}"".""queue""(
""MessageId"" int NOT NULL ,
""MessageType"" int NOT NULL
);
DROP TABLE IF EXISTS ""{schema}"".""queue"";
CREATE TABLE IF NOT EXISTS ""{schema}"".""received""(
""Id"" SERIAL PRIMARY KEY NOT NULL,
......
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using System.Collections.Generic;
using System.Data;
using System.Threading.Tasks;
using Dapper;
using DotNetCore.CAP.Infrastructure;
......@@ -36,44 +38,31 @@ namespace DotNetCore.CAP.PostgreSql
}
}
public Task<IFetchedMessage> FetchNextMessageAsync()
{
var sql = $@"DELETE FROM ""{Options.Schema}"".""queue"" WHERE ""MessageId"" = (SELECT ""MessageId"" FROM ""{Options.Schema}"".""queue"" FOR UPDATE SKIP LOCKED LIMIT 1) RETURNING *;";
return FetchNextMessageCoreAsync(sql);
}
public async Task<CapPublishedMessage> GetNextPublishedMessageToBeEnqueuedAsync()
public async Task<IEnumerable<CapPublishedMessage>> GetPublishedMessagesOfNeedRetry()
{
var fourMinsAgo = DateTime.Now.AddMinutes(-4);
var sql =
$"SELECT * FROM \"{Options.Schema}\".\"published\" WHERE \"StatusName\" = '{StatusName.Scheduled}' FOR UPDATE SKIP LOCKED LIMIT 1;";
$"SELECT * FROM \"{Options.Schema}\".\"published\" WHERE \"Retries\"<{_capOptions.FailedRetryCount} AND \"Added\"<'{fourMinsAgo}' AND (\"StatusName\"='{StatusName.Failed}' OR \"StatusName\"='{StatusName.Scheduled}') LIMIT 200;";
using (var connection = new NpgsqlConnection(Options.ConnectionString))
{
return await connection.QueryFirstOrDefaultAsync<CapPublishedMessage>(sql);
return await connection.QueryAsync<CapPublishedMessage>(sql);
}
}
public async Task<IEnumerable<CapPublishedMessage>> GetFailedPublishedMessages()
public async Task<int> StoreReceivedMessageAsync(CapReceivedMessage message)
{
var sql =
$"SELECT * FROM \"{Options.Schema}\".\"published\" WHERE \"Retries\"<{_capOptions.FailedRetryCount} AND \"StatusName\"='{StatusName.Failed}' LIMIT 200;";
using (var connection = new NpgsqlConnection(Options.ConnectionString))
if (message == null)
{
return await connection.QueryAsync<CapPublishedMessage>(sql);
throw new ArgumentNullException(nameof(message));
}
}
public async Task StoreReceivedMessageAsync(CapReceivedMessage message)
{
if (message == null) throw new ArgumentNullException(nameof(message));
var sql =
$"INSERT INTO \"{Options.Schema}\".\"received\"(\"Name\",\"Group\",\"Content\",\"Retries\",\"Added\",\"ExpiresAt\",\"StatusName\")VALUES(@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName);";
$"INSERT INTO \"{Options.Schema}\".\"received\"(\"Name\",\"Group\",\"Content\",\"Retries\",\"Added\",\"ExpiresAt\",\"StatusName\")VALUES(@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName) RETURNING \"Id\";";
using (var connection = new NpgsqlConnection(Options.ConnectionString))
{
await connection.ExecuteAsync(sql, message);
return await connection.ExecuteScalarAsync<int>(sql, message);
}
}
......@@ -86,20 +75,11 @@ namespace DotNetCore.CAP.PostgreSql
}
}
public async Task<CapReceivedMessage> GetNextReceivedMessageToBeEnqueuedAsync()
{
var sql =
$"SELECT * FROM \"{Options.Schema}\".\"received\" WHERE \"StatusName\" = '{StatusName.Scheduled}' FOR UPDATE SKIP LOCKED LIMIT 1;";
using (var connection = new NpgsqlConnection(Options.ConnectionString))
{
return await connection.QueryFirstOrDefaultAsync<CapReceivedMessage>(sql);
}
}
public async Task<IEnumerable<CapReceivedMessage>> GetFailedReceivedMessages()
public async Task<IEnumerable<CapReceivedMessage>> GetReceivedMessagesOfNeedRetry()
{
var fourMinsAgo = DateTime.Now.AddMinutes(-4);
var sql =
$"SELECT * FROM \"{Options.Schema}\".\"received\" WHERE \"Retries\"<{_capOptions.FailedRetryCount} AND \"StatusName\"='{StatusName.Failed}' LIMIT 200;";
$"SELECT * FROM \"{Options.Schema}\".\"received\" WHERE \"Retries\"<{_capOptions.FailedRetryCount} AND \"Added\"<'{fourMinsAgo}' AND (\"StatusName\"='{StatusName.Failed}' OR \"StatusName\"='{StatusName.Scheduled}') LIMIT 200;";
using (var connection = new NpgsqlConnection(Options.ConnectionString))
{
return await connection.QueryAsync<CapReceivedMessage>(sql);
......@@ -131,35 +111,5 @@ namespace DotNetCore.CAP.PostgreSql
return connection.Execute(sql) > 0;
}
}
private async Task<IFetchedMessage> FetchNextMessageCoreAsync(string sql, object args = null)
{
//here don't use `using` to dispose
var connection = new NpgsqlConnection(Options.ConnectionString);
await connection.OpenAsync();
var transaction = connection.BeginTransaction(IsolationLevel.ReadCommitted);
FetchedMessage fetchedMessage;
try
{
fetchedMessage = await connection.QueryFirstOrDefaultAsync<FetchedMessage>(sql, args, transaction);
}
catch (NpgsqlException)
{
transaction.Dispose();
connection.Dispose();
throw;
}
if (fetchedMessage == null)
{
transaction.Rollback();
transaction.Dispose();
connection.Dispose();
return null;
}
return new PostgreSqlFetchedMessage(fetchedMessage.MessageId, fetchedMessage.MessageType, connection,
transaction);
}
}
}
\ No newline at end of file
using System;
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using System.Data;
using System.Threading.Tasks;
using Dapper;
......@@ -26,7 +29,10 @@ namespace DotNetCore.CAP.PostgreSql
public void UpdateMessage(CapPublishedMessage message)
{
if (message == null) throw new ArgumentNullException(nameof(message));
if (message == null)
{
throw new ArgumentNullException(nameof(message));
}
var sql =
$@"UPDATE ""{
......@@ -37,7 +43,10 @@ namespace DotNetCore.CAP.PostgreSql
public void UpdateMessage(CapReceivedMessage message)
{
if (message == null) throw new ArgumentNullException(nameof(message));
if (message == null)
{
throw new ArgumentNullException(nameof(message));
}
var sql =
$@"UPDATE ""{
......@@ -46,9 +55,24 @@ namespace DotNetCore.CAP.PostgreSql
_dbConnection.Execute(sql, message, _dbTransaction);
}
public Task CommitAsync()
{
_dbTransaction.Commit();
return Task.CompletedTask;
}
public void Dispose()
{
_dbTransaction.Dispose();
_dbConnection.Dispose();
}
public void EnqueueMessage(CapPublishedMessage message)
{
if (message == null) throw new ArgumentNullException(nameof(message));
if (message == null)
{
throw new ArgumentNullException(nameof(message));
}
var sql = $@"INSERT INTO ""{_schema}"".""queue"" values(@MessageId,@MessageType);";
_dbConnection.Execute(sql, new CapQueue {MessageId = message.Id, MessageType = MessageType.Publish},
......@@ -57,23 +81,14 @@ namespace DotNetCore.CAP.PostgreSql
public void EnqueueMessage(CapReceivedMessage message)
{
if (message == null) throw new ArgumentNullException(nameof(message));
if (message == null)
{
throw new ArgumentNullException(nameof(message));
}
var sql = $@"INSERT INTO ""{_schema}"".""queue"" values(@MessageId,@MessageType);";
_dbConnection.Execute(sql, new CapQueue {MessageId = message.Id, MessageType = MessageType.Subscribe},
_dbTransaction);
}
public Task CommitAsync()
{
_dbTransaction.Commit();
return Task.CompletedTask;
}
public void Dispose()
{
_dbTransaction.Dispose();
_dbConnection.Dispose();
}
}
}
\ No newline at end of file
using System;
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using DotNetCore.CAP;
// ReSharper disable once CheckNamespace
......@@ -13,7 +16,10 @@ namespace Microsoft.Extensions.DependencyInjection
public static CapOptions UseRabbitMQ(this CapOptions options, Action<RabbitMQOptions> configure)
{
if (configure == null) throw new ArgumentNullException(nameof(configure));
if (configure == null)
{
throw new ArgumentNullException(nameof(configure));
}
options.RegisterExtension(new RabbitMQCapOptionsExtension(configure));
......
// ReSharper disable once CheckNamespace
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
// ReSharper disable once CheckNamespace
namespace DotNetCore.CAP
{
public class RabbitMQOptions
......@@ -56,7 +58,7 @@ namespace DotNetCore.CAP
/// <summary>
/// Topic exchange name when declare a topic exchange.
/// </summary>
public string TopicExchangeName { get; set; } = DefaultExchangeName;
public string ExchangeName { get; set; } = DefaultExchangeName;
/// <summary>
/// Timeout setting for connection attempts (in milliseconds).
......
using System;
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using DotNetCore.CAP.RabbitMQ;
using Microsoft.Extensions.DependencyInjection;
......@@ -24,8 +27,8 @@ namespace DotNetCore.CAP
services.AddSingleton<IConsumerClientFactory, RabbitMQConsumerClientFactory>();
services.AddSingleton<IConnectionChannelPool, ConnectionChannelPool>();
services.AddSingleton<IQueueExecutor, PublishQueueExecutor>();
services.AddSingleton<IPublishExecutor, PublishQueueExecutor>();
services.AddSingleton<IPublishExecutor, RabbitMQPublishMessageSender>();
services.AddSingleton<IPublishMessageSender, RabbitMQPublishMessageSender>();
}
}
}
\ No newline at end of file
using DotNetCore.CAP.Abstractions;
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using DotNetCore.CAP.Abstractions;
// ReSharper disable once CheckNamespace
namespace DotNetCore.CAP
......
using System;
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Threading;
......@@ -25,6 +28,8 @@ namespace DotNetCore.CAP.RabbitMQ
_maxSize = DefaultPoolSize;
_connectionActivator = CreateConnection(options);
HostAddress = options.HostName + ":" + options.Port;
Exchange = options.ExchangeName;
}
IModel IConnectionChannelPool.Rent()
......@@ -37,10 +42,17 @@ namespace DotNetCore.CAP.RabbitMQ
return Return(connection);
}
public string HostAddress { get; }
public string Exchange { get; }
public IConnection GetConnection()
{
if (_connection != null && _connection.IsOpen)
{
return _connection;
}
_connection = _connectionActivator();
_connection.ConnectionShutdown += RabbitMQ_ConnectionShutdown;
return _connection;
......@@ -51,7 +63,9 @@ namespace DotNetCore.CAP.RabbitMQ
_maxSize = 0;
while (_pool.TryDequeue(out var context))
{
context.Dispose();
}
}
private static Func<IConnection> CreateConnection(RabbitMQOptions options)
......
using RabbitMQ.Client;
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using RabbitMQ.Client;
namespace DotNetCore.CAP.RabbitMQ
{
public interface IConnectionChannelPool
{
string HostAddress { get; }
string Exchange { get; }
IConnection GetConnection();
IModel Rent();
......
using System;
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using System.Text;
using System.Threading.Tasks;
using DotNetCore.CAP.Internal;
using DotNetCore.CAP.Processor.States;
using Microsoft.Extensions.Logging;
using RabbitMQ.Client;
namespace DotNetCore.CAP.RabbitMQ
{
internal sealed class PublishQueueExecutor : BasePublishQueueExecutor
internal sealed class RabbitMQPublishMessageSender : BasePublishMessageSender
{
private readonly IConnectionChannelPool _connectionChannelPool;
private readonly ILogger _logger;
private readonly RabbitMQOptions _rabbitMQOptions;
private readonly string _exchange;
public PublishQueueExecutor(ILogger<PublishQueueExecutor> logger, CapOptions options,
RabbitMQOptions rabbitMQOptions, IConnectionChannelPool connectionChannelPool, IStateChanger stateChanger)
: base(options, stateChanger, logger)
public RabbitMQPublishMessageSender(ILogger<RabbitMQPublishMessageSender> logger, CapOptions options,
IStorageConnection connection, IConnectionChannelPool connectionChannelPool, IStateChanger stateChanger)
: base(logger, options, connection, stateChanger)
{
_logger = logger;
_connectionChannelPool = connectionChannelPool;
_rabbitMQOptions = rabbitMQOptions;
_exchange = _connectionChannelPool.Exchange;
ServersAddress = _connectionChannelPool.HostAddress;
}
public override Task<OperateResult> PublishAsync(string keyName, string content)
......@@ -28,12 +33,8 @@ namespace DotNetCore.CAP.RabbitMQ
try
{
var body = Encoding.UTF8.GetBytes(content);
channel.ExchangeDeclare(_rabbitMQOptions.TopicExchangeName, RabbitMQOptions.ExchangeType, true);
channel.BasicPublish(_rabbitMQOptions.TopicExchangeName,
keyName,
null,
body);
channel.ExchangeDeclare(_exchange, RabbitMQOptions.ExchangeType, true);
channel.BasicPublish(_exchange, keyName, null, body);
_logger.LogDebug($"RabbitMQ topic message [{keyName}] has been published.");
......@@ -41,21 +42,22 @@ namespace DotNetCore.CAP.RabbitMQ
}
catch (Exception ex)
{
_logger.LogError(
$"RabbitMQ topic message [{keyName}] has been raised an exception of sending. the exception is: {ex.Message}");
var wapperEx = new PublisherSentFailedException(ex.Message, ex);
var errors = new OperateError
{
Code = ex.HResult.ToString(),
Description = ex.Message
};
return Task.FromResult(OperateResult.Failed(ex,
new OperateError
{
Code = ex.HResult.ToString(),
Description = ex.Message
}));
return Task.FromResult(OperateResult.Failed(wapperEx, errors));
}
finally
{
var returned = _connectionChannelPool.Return(channel);
if (!returned)
{
channel.Dispose();
}
}
}
}
......
using System;
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
......@@ -13,9 +16,9 @@ namespace DotNetCore.CAP.RabbitMQ
private readonly string _exchageName;
private readonly string _queueName;
private readonly RabbitMQOptions _rabbitMQOptions;
private IModel _channel;
private IConnection _connection;
private IModel _channel;
private ulong _deliveryTag;
public RabbitMQConsumerClient(string queueName,
......@@ -25,7 +28,7 @@ namespace DotNetCore.CAP.RabbitMQ
_queueName = queueName;
_connectionChannelPool = connectionChannelPool;
_rabbitMQOptions = options;
_exchageName = options.TopicExchangeName;
_exchageName = options.ExchangeName;
InitClient();
}
......@@ -34,12 +37,19 @@ namespace DotNetCore.CAP.RabbitMQ
public event EventHandler<LogMessageEventArgs> OnLog;
public string ServersAddress => _rabbitMQOptions.HostName;
public void Subscribe(IEnumerable<string> topics)
{
if (topics == null) throw new ArgumentNullException(nameof(topics));
if (topics == null)
{
throw new ArgumentNullException(nameof(topics));
}
foreach (var topic in topics)
{
_channel.QueueBind(_queueName, _exchageName, topic);
}
}
public void Listening(TimeSpan timeout, CancellationToken cancellationToken)
......@@ -58,6 +68,7 @@ namespace DotNetCore.CAP.RabbitMQ
cancellationToken.ThrowIfCancellationRequested();
cancellationToken.WaitHandle.WaitOne(timeout);
}
// ReSharper disable once FunctionNeverReturns
}
......@@ -88,8 +99,9 @@ namespace DotNetCore.CAP.RabbitMQ
RabbitMQOptions.ExchangeType,
true);
var arguments = new Dictionary<string, object> {
{ "x-message-ttl", _rabbitMQOptions.QueueMessageExpires }
var arguments = new Dictionary<string, object>
{
{"x-message-ttl", _rabbitMQOptions.QueueMessageExpires}
};
_channel.QueueDeclare(_queueName, true, false, false, arguments);
}
......@@ -100,7 +112,7 @@ namespace DotNetCore.CAP.RabbitMQ
{
var args = new LogMessageEventArgs
{
LogType = MqLogType.ConsumerCancelled,
LogType = MqLogType.ConsumerCancelled,
Reason = e.ConsumerTag
};
OnLog?.Invoke(sender, args);
......
namespace DotNetCore.CAP.RabbitMQ
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
namespace DotNetCore.CAP.RabbitMQ
{
internal sealed class RabbitMQConsumerClientFactory : IConsumerClientFactory
{
......
using System;
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
// ReSharper disable once CheckNamespace
namespace DotNetCore.CAP
......
using System;
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using DotNetCore.CAP;
using Microsoft.EntityFrameworkCore;
......@@ -14,7 +17,10 @@ namespace Microsoft.Extensions.DependencyInjection
public static CapOptions UseSqlServer(this CapOptions options, Action<SqlServerOptions> configure)
{
if (configure == null) throw new ArgumentNullException(nameof(configure));
if (configure == null)
{
throw new ArgumentNullException(nameof(configure));
}
options.RegisterExtension(new SqlServerCapOptionsExtension(configure));
......@@ -30,7 +36,10 @@ namespace Microsoft.Extensions.DependencyInjection
public static CapOptions UseEntityFramework<TContext>(this CapOptions options, Action<EFOptions> configure)
where TContext : DbContext
{
if (configure == null) throw new ArgumentNullException(nameof(configure));
if (configure == null)
{
throw new ArgumentNullException(nameof(configure));
}
options.RegisterExtension(new SqlServerCapOptionsExtension(x =>
{
......
using System;
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using DotNetCore.CAP.Processor;
using DotNetCore.CAP.SqlServer;
using Microsoft.EntityFrameworkCore;
......
// ReSharper disable once CheckNamespace
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
namespace DotNetCore.CAP
{
......
using System;
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using System.Data;
using System.Data.SqlClient;
using System.Threading.Tasks;
......@@ -14,28 +17,31 @@ namespace DotNetCore.CAP.SqlServer
public class CapPublisher : CapPublisherBase, ICallbackPublisher
{
private readonly DbContext _dbContext;
private readonly ILogger _logger;
private readonly SqlServerOptions _options;
public CapPublisher(IServiceProvider provider,
ILogger<CapPublisher> logger,
SqlServerOptions options)
public CapPublisher(ILogger<CapPublisher> logger, IDispatcher dispatcher,
IServiceProvider provider, SqlServerOptions options)
: base(logger, dispatcher)
{
ServiceProvider = provider;
_logger = logger;
_options = options;
if (_options.DbContextType == null) return;
if (_options.DbContextType == null)
{
return;
}
IsUsingEF = true;
_dbContext = (DbContext)ServiceProvider.GetService(_options.DbContextType);
_dbContext = (DbContext) ServiceProvider.GetService(_options.DbContextType);
}
public async Task PublishAsync(CapPublishedMessage message)
public async Task PublishCallbackAsync(CapPublishedMessage message)
{
using (var conn = new SqlConnection(_options.ConnectionString))
{
await conn.ExecuteAsync(PrepareSql(), message);
var id = await conn.ExecuteScalarAsync<int>(PrepareSql(), message);
message.Id = id;
Enqueue(message);
}
}
......@@ -52,23 +58,20 @@ namespace DotNetCore.CAP.SqlServer
dbContextTransaction = _dbContext.Database.BeginTransaction(IsolationLevel.ReadCommitted);
dbTrans = dbContextTransaction.GetDbTransaction();
}
DbTransaction = dbTrans;
}
protected override void Execute(IDbConnection dbConnection, IDbTransaction dbTransaction,
protected override int Execute(IDbConnection dbConnection, IDbTransaction dbTransaction,
CapPublishedMessage message)
{
dbConnection.Execute(PrepareSql(), message, dbTransaction);
_logger.LogInformation("published message has been persisted to the database. name:" + message);
return dbConnection.ExecuteScalar<int>(PrepareSql(), message, dbTransaction);
}
protected override async Task ExecuteAsync(IDbConnection dbConnection, IDbTransaction dbTransaction,
protected override Task<int> ExecuteAsync(IDbConnection dbConnection, IDbTransaction dbTransaction,
CapPublishedMessage message)
{
await dbConnection.ExecuteAsync(PrepareSql(), message, dbTransaction);
_logger.LogInformation("published message has been persisted to the database. name:" + message);
return dbConnection.ExecuteScalarAsync<int>(PrepareSql(), message, dbTransaction);
}
#region private methods
......@@ -76,7 +79,7 @@ namespace DotNetCore.CAP.SqlServer
private string PrepareSql()
{
return
$"INSERT INTO {_options.Schema}.[Published] ([Name],[Content],[Retries],[Added],[ExpiresAt],[StatusName])VALUES(@Name,@Content,@Retries,@Added,@ExpiresAt,@StatusName)";
$"INSERT INTO {_options.Schema}.[Published] ([Name],[Content],[Retries],[Added],[ExpiresAt],[StatusName])VALUES(@Name,@Content,@Retries,@Added,@ExpiresAt,@StatusName);SELECT SCOPE_IDENTITY();";
}
#endregion private methods
......
......@@ -15,9 +15,9 @@
<ItemGroup>
<PackageReference Include="Dapper" Version="1.50.4" />
<PackageReference Include="Microsoft.EntityFrameworkCore" Version="2.0.1" />
<PackageReference Include="Microsoft.EntityFrameworkCore.Relational" Version="2.0.1" />
<PackageReference Include="System.Data.SqlClient" Version="4.4.2" />
<PackageReference Include="Microsoft.EntityFrameworkCore" Version="2.0.2" />
<PackageReference Include="Microsoft.EntityFrameworkCore.Relational" Version="2.0.2" />
<PackageReference Include="System.Data.SqlClient" Version="4.4.3" />
</ItemGroup>
<ItemGroup>
......
using DotNetCore.CAP.Models;
namespace DotNetCore.CAP.SqlServer
{
internal class FetchedMessage
{
public int MessageId { get; set; }
public MessageType MessageType { get; set; }
}
}
\ No newline at end of file
using System;
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using System.Data.SqlClient;
using System.Threading.Tasks;
using Dapper;
......
using System;
using System.Data;
using System.Threading;
using Dapper;
using DotNetCore.CAP.Models;
namespace DotNetCore.CAP.SqlServer
{
public class SqlServerFetchedMessage : IFetchedMessage
{
private static readonly TimeSpan KeepAliveInterval = TimeSpan.FromMinutes(1);
private readonly IDbConnection _connection;
private readonly object _lockObject = new object();
private readonly Timer _timer;
private readonly IDbTransaction _transaction;
public SqlServerFetchedMessage(int messageId,
MessageType type,
IDbConnection connection,
IDbTransaction transaction)
{
MessageId = messageId;
MessageType = type;
_connection = connection;
_transaction = transaction;
_timer = new Timer(ExecuteKeepAliveQuery, null, KeepAliveInterval, KeepAliveInterval);
}
public int MessageId { get; }
public MessageType MessageType { get; }
public void RemoveFromQueue()
{
lock (_lockObject)
{
_transaction.Commit();
}
}
public void Requeue()
{
lock (_lockObject)
{
_transaction.Rollback();
}
}
public void Dispose()
{
lock (_lockObject)
{
_timer?.Dispose();
_transaction.Dispose();
_connection.Dispose();
}
}
private void ExecuteKeepAliveQuery(object obj)
{
lock (_lockObject)
{
try
{
_connection?.Execute("SELECT 1", _transaction);
}
catch
{
// ignored
}
}
}
}
}
\ No newline at end of file
using System;
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using System.Collections.Generic;
using System.Data;
using System.Linq;
......@@ -28,9 +31,7 @@ set transaction isolation level read committed;
select count(Id) from [{0}].Published with (nolock) where StatusName = N'Succeeded';
select count(Id) from [{0}].Received with (nolock) where StatusName = N'Succeeded';
select count(Id) from [{0}].Published with (nolock) where StatusName = N'Failed';
select count(Id) from [{0}].Received with (nolock) where StatusName = N'Failed';
select count(Id) from [{0}].Published with (nolock) where StatusName in (N'Processing',N'Scheduled',N'Enqueued');
select count(Id) from [{0}].Received with (nolock) where StatusName in (N'Processing',N'Scheduled',N'Enqueued');",
select count(Id) from [{0}].Received with (nolock) where StatusName = N'Failed';",
_options.Schema);
var statistics = UseConnection(connection =>
......@@ -43,10 +44,8 @@ select count(Id) from [{0}].Received with (nolock) where StatusName in (N'Proces
stats.PublishedFailed = multi.ReadSingle<int>();
stats.ReceivedFailed = multi.ReadSingle<int>();
stats.PublishedProcessing = multi.ReadSingle<int>();
stats.ReceivedProcessing = multi.ReadSingle<int>();
}
return stats;
});
return statistics;
......@@ -71,17 +70,24 @@ select count(Id) from [{0}].Received with (nolock) where StatusName in (N'Proces
var tableName = queryDto.MessageType == MessageType.Publish ? "Published" : "Received";
var where = string.Empty;
if (!string.IsNullOrEmpty(queryDto.StatusName))
if (string.Equals(queryDto.StatusName, StatusName.Processing,
StringComparison.CurrentCultureIgnoreCase))
where += " and statusname in (N'Processing',N'Scheduled',N'Enqueued')";
else
where += " and statusname=@StatusName";
{
where += " and statusname=@StatusName";
}
if (!string.IsNullOrEmpty(queryDto.Name))
{
where += " and name=@Name";
}
if (!string.IsNullOrEmpty(queryDto.Group))
{
where += " and group=@Group";
}
if (!string.IsNullOrEmpty(queryDto.Content))
{
where += " and content like '%@Content%'";
}
var sqlQuery =
$"select * from [{_options.Schema}].{tableName} where 1=1 {where} order by Added desc offset @Offset rows fetch next @Limit rows only";
......@@ -102,11 +108,6 @@ select count(Id) from [{0}].Received with (nolock) where StatusName in (N'Proces
return UseConnection(conn => GetNumberOfMessage(conn, "Published", StatusName.Failed));
}
public int PublishedProcessingCount()
{
return UseConnection(conn => GetNumberOfMessage(conn, "Published", StatusName.Processing));
}
public int PublishedSucceededCount()
{
return UseConnection(conn => GetNumberOfMessage(conn, "Published", StatusName.Succeeded));
......@@ -117,11 +118,6 @@ select count(Id) from [{0}].Received with (nolock) where StatusName in (N'Proces
return UseConnection(conn => GetNumberOfMessage(conn, "Received", StatusName.Failed));
}
public int ReceivedProcessingCount()
{
return UseConnection(conn => GetNumberOfMessage(conn, "Received", StatusName.Processing));
}
public int ReceivedSucceededCount()
{
return UseConnection(conn => GetNumberOfMessage(conn, "Received", StatusName.Succeeded));
......@@ -129,9 +125,8 @@ select count(Id) from [{0}].Received with (nolock) where StatusName in (N'Proces
private int GetNumberOfMessage(IDbConnection connection, string tableName, string statusName)
{
var sqlQuery = statusName == StatusName.Processing
? $"select count(Id) from [{_options.Schema}].{tableName} with (nolock) where StatusName in (N'Processing',N'Scheduled',N'Enqueued')"
: $"select count(Id) from [{_options.Schema}].{tableName} with (nolock) where StatusName = @state";
var sqlQuery =
$"select count(Id) from [{_options.Schema}].{tableName} with (nolock) where StatusName = @state";
var count = connection.ExecuteScalar<int>(sqlQuery, new {state = statusName});
return count;
......@@ -182,7 +177,12 @@ select [Key], [Count] from aggr with (nolock) where [Key] in @keys;";
.ToDictionary(x => (string) x.Key, x => (int) x.Count);
foreach (var key in keyMaps.Keys)
if (!valuesMap.ContainsKey(key)) valuesMap.Add(key, 0);
{
if (!valuesMap.ContainsKey(key))
{
valuesMap.Add(key, 0);
}
}
var result = new Dictionary<DateTime, int>();
for (var i = 0; i < keyMaps.Count; i++)
......
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using System.Data;
using System.Data.SqlClient;
......@@ -11,9 +14,9 @@ namespace DotNetCore.CAP.SqlServer
{
public class SqlServerStorage : IStorage
{
private readonly CapOptions _capOptions;
private readonly IDbConnection _existingConnection = null;
private readonly ILogger _logger;
private readonly CapOptions _capOptions;
private readonly SqlServerOptions _options;
public SqlServerStorage(ILogger<SqlServerStorage> logger,
......@@ -37,7 +40,10 @@ namespace DotNetCore.CAP.SqlServer
public async Task InitializeAsync(CancellationToken cancellationToken)
{
if (cancellationToken.IsCancellationRequested) return;
if (cancellationToken.IsCancellationRequested)
{
return;
}
var sql = CreateDbTablesScript(_options.Schema);
......@@ -45,6 +51,7 @@ namespace DotNetCore.CAP.SqlServer
{
await connection.ExecuteAsync(sql);
}
_logger.LogDebug("Ensuring all create database tables script are applied.");
}
......@@ -57,12 +64,9 @@ BEGIN
EXEC('CREATE SCHEMA [{schema}]')
END;
IF OBJECT_ID(N'[{schema}].[Queue]',N'U') IS NULL
IF OBJECT_ID(N'[{schema}].[Queue]',N'U') IS NOT NULL
BEGIN
CREATE TABLE [{schema}].[Queue](
[MessageId] [int] NOT NULL,
[MessageType] [tinyint] NOT NULL
) ON [PRIMARY]
DROP TABLE [{schema}].[Queue];
END;
IF OBJECT_ID(N'[{schema}].[Received]',N'U') IS NULL
......@@ -122,7 +126,9 @@ END;";
var connection = _existingConnection ?? new SqlConnection(_options.ConnectionString);
if (connection.State == ConnectionState.Closed)
{
connection.Open();
}
return connection;
}
......@@ -135,7 +141,9 @@ END;";
internal void ReleaseConnection(IDbConnection connection)
{
if (connection != null && !IsExistingConnection(connection))
{
connection.Dispose();
}
}
}
}
\ No newline at end of file
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using System.Collections.Generic;
using System.Data;
using System.Data.SqlClient;
using System.Threading.Tasks;
using Dapper;
......@@ -36,49 +38,32 @@ namespace DotNetCore.CAP.SqlServer
}
}
public Task<IFetchedMessage> FetchNextMessageAsync()
{
var sql = $@"
DELETE TOP (1)
FROM [{Options.Schema}].[Queue] WITH (readpast, updlock, rowlock)
OUTPUT DELETED.MessageId,DELETED.[MessageType];";
return FetchNextMessageCoreAsync(sql);
}
public async Task<CapPublishedMessage> GetNextPublishedMessageToBeEnqueuedAsync()
public async Task<IEnumerable<CapPublishedMessage>> GetPublishedMessagesOfNeedRetry()
{
var fourMinsAgo = DateTime.Now.AddMinutes(-4);
var sql =
$"SELECT TOP (1) * FROM [{Options.Schema}].[Published] WITH (readpast) WHERE StatusName = '{StatusName.Scheduled}'";
$"SELECT TOP (200) * FROM [{Options.Schema}].[Published] WITH (readpast) WHERE Retries<{_capOptions.FailedRetryCount} AND Added<'{fourMinsAgo}' AND (StatusName = '{StatusName.Failed}' OR StatusName = '{StatusName.Scheduled}')";
using (var connection = new SqlConnection(Options.ConnectionString))
{
return await connection.QueryFirstOrDefaultAsync<CapPublishedMessage>(sql);
return await connection.QueryAsync<CapPublishedMessage>(sql);
}
}
public async Task<IEnumerable<CapPublishedMessage>> GetFailedPublishedMessages()
public async Task<int> StoreReceivedMessageAsync(CapReceivedMessage message)
{
var sql =
$"SELECT TOP (200) * FROM [{Options.Schema}].[Published] WITH (readpast) WHERE Retries<{_capOptions.FailedRetryCount} AND StatusName = '{StatusName.Failed}'";
using (var connection = new SqlConnection(Options.ConnectionString))
if (message == null)
{
return await connection.QueryAsync<CapPublishedMessage>(sql);
throw new ArgumentNullException(nameof(message));
}
}
public async Task StoreReceivedMessageAsync(CapReceivedMessage message)
{
if (message == null) throw new ArgumentNullException(nameof(message));
var sql = $@"
INSERT INTO [{Options.Schema}].[Received]([Name],[Group],[Content],[Retries],[Added],[ExpiresAt],[StatusName])
VALUES(@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName);";
VALUES(@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName);SELECT SCOPE_IDENTITY();";
using (var connection = new SqlConnection(Options.ConnectionString))
{
await connection.ExecuteAsync(sql, message);
return await connection.ExecuteScalarAsync<int>(sql, message);
}
}
......@@ -91,20 +76,11 @@ VALUES(@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName);";
}
}
public async Task<CapReceivedMessage> GetNextReceivedMessageToBeEnqueuedAsync()
{
var sql =
$"SELECT TOP (1) * FROM [{Options.Schema}].[Received] WITH (readpast) WHERE StatusName = '{StatusName.Scheduled}'";
using (var connection = new SqlConnection(Options.ConnectionString))
{
return await connection.QueryFirstOrDefaultAsync<CapReceivedMessage>(sql);
}
}
public async Task<IEnumerable<CapReceivedMessage>> GetFailedReceivedMessages()
public async Task<IEnumerable<CapReceivedMessage>> GetReceivedMessagesOfNeedRetry()
{
var fourMinsAgo = DateTime.Now.AddMinutes(-4);
var sql =
$"SELECT TOP (200) * FROM [{Options.Schema}].[Received] WITH (readpast) WHERE Retries<{_capOptions.FailedRetryCount} AND StatusName = '{StatusName.Failed}'";
$"SELECT TOP (200) * FROM [{Options.Schema}].[Received] WITH (readpast) WHERE Retries<{_capOptions.FailedRetryCount} AND Added<'{fourMinsAgo}' AND (StatusName = '{StatusName.Failed}' OR StatusName = '{StatusName.Scheduled}')";
using (var connection = new SqlConnection(Options.ConnectionString))
{
return await connection.QueryAsync<CapReceivedMessage>(sql);
......@@ -136,35 +112,5 @@ VALUES(@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName);";
public void Dispose()
{
}
private async Task<IFetchedMessage> FetchNextMessageCoreAsync(string sql, object args = null)
{
//here don't use `using` to dispose
var connection = new SqlConnection(Options.ConnectionString);
await connection.OpenAsync();
var transaction = connection.BeginTransaction(IsolationLevel.ReadCommitted);
FetchedMessage fetchedMessage;
try
{
fetchedMessage = await connection.QueryFirstOrDefaultAsync<FetchedMessage>(sql, args, transaction);
}
catch (SqlException)
{
transaction.Dispose();
connection.Dispose();
throw;
}
if (fetchedMessage == null)
{
transaction.Rollback();
transaction.Dispose();
connection.Dispose();
return null;
}
return new SqlServerFetchedMessage(fetchedMessage.MessageId, fetchedMessage.MessageType, connection,
transaction);
}
}
}
\ No newline at end of file
using System;
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using System.Data;
using System.Data.SqlClient;
using System.Threading.Tasks;
......@@ -26,7 +29,10 @@ namespace DotNetCore.CAP.SqlServer
public void UpdateMessage(CapPublishedMessage message)
{
if (message == null) throw new ArgumentNullException(nameof(message));
if (message == null)
{
throw new ArgumentNullException(nameof(message));
}
var sql =
$"UPDATE [{_schema}].[Published] SET [Retries] = @Retries,[Content] = @Content,[ExpiresAt] = @ExpiresAt,[StatusName]=@StatusName WHERE Id=@Id;";
......@@ -35,16 +41,34 @@ namespace DotNetCore.CAP.SqlServer
public void UpdateMessage(CapReceivedMessage message)
{
if (message == null) throw new ArgumentNullException(nameof(message));
if (message == null)
{
throw new ArgumentNullException(nameof(message));
}
var sql =
$"UPDATE [{_schema}].[Received] SET [Retries] = @Retries,[Content] = @Content,[ExpiresAt] = @ExpiresAt,[StatusName]=@StatusName WHERE Id=@Id;";
_dbConnection.Execute(sql, message, _dbTransaction);
}
public Task CommitAsync()
{
_dbTransaction.Commit();
return Task.CompletedTask;
}
public void Dispose()
{
_dbTransaction.Dispose();
_dbConnection.Dispose();
}
public void EnqueueMessage(CapPublishedMessage message)
{
if (message == null) throw new ArgumentNullException(nameof(message));
if (message == null)
{
throw new ArgumentNullException(nameof(message));
}
var sql = $"INSERT INTO [{_schema}].[Queue] values(@MessageId,@MessageType);";
_dbConnection.Execute(sql, new CapQueue {MessageId = message.Id, MessageType = MessageType.Publish},
......@@ -53,23 +77,14 @@ namespace DotNetCore.CAP.SqlServer
public void EnqueueMessage(CapReceivedMessage message)
{
if (message == null) throw new ArgumentNullException(nameof(message));
if (message == null)
{
throw new ArgumentNullException(nameof(message));
}
var sql = $"INSERT INTO [{_schema}].[Queue] values(@MessageId,@MessageType);";
_dbConnection.Execute(sql, new CapQueue {MessageId = message.Id, MessageType = MessageType.Subscribe},
_dbTransaction);
}
public Task CommitAsync()
{
_dbTransaction.Commit();
return Task.CompletedTask;
}
public void Dispose()
{
_dbTransaction.Dispose();
_dbConnection.Dispose();
}
}
}
\ No newline at end of file
using System;
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using System.Data;
using System.Diagnostics;
using System.Threading.Tasks;
using DotNetCore.CAP.Diagnostics;
using DotNetCore.CAP.Infrastructure;
using DotNetCore.CAP.Models;
using DotNetCore.CAP.Processor;
using Microsoft.Extensions.Logging;
namespace DotNetCore.CAP.Abstractions
{
public abstract class CapPublisherBase : ICapPublisher, IDisposable
{
private readonly IDispatcher _dispatcher;
private readonly ILogger _logger;
// diagnostics listener
// ReSharper disable once InconsistentNaming
private static readonly DiagnosticListener s_diagnosticListener =
new DiagnosticListener(CapDiagnosticListenerExtensions.DiagnosticListenerName);
protected CapPublisherBase(ILogger<CapPublisherBase> logger, IDispatcher dispatcher)
{
_logger = logger;
_dispatcher = dispatcher;
}
protected IDbConnection DbConnection { get; set; }
protected IDbTransaction DbTransaction { get; set; }
protected bool IsCapOpenedTrans { get; set; }
......@@ -21,9 +40,7 @@ namespace DotNetCore.CAP.Abstractions
CheckIsUsingEF(name);
PrepareConnectionForEF();
var content = Serialize(contentObj, callbackName);
PublishWithTrans(name, content);
PublishWithTrans(name, contentObj, callbackName);
}
public Task PublishAsync<T>(string name, T contentObj, string callbackName = null)
......@@ -31,9 +48,7 @@ namespace DotNetCore.CAP.Abstractions
CheckIsUsingEF(name);
PrepareConnectionForEF();
var content = Serialize(contentObj, callbackName);
return PublishWithTransAsync(name, content);
return PublishWithTransAsync(name, contentObj, callbackName);
}
public void Publish<T>(string name, T contentObj, IDbTransaction dbTransaction, string callbackName = null)
......@@ -41,9 +56,7 @@ namespace DotNetCore.CAP.Abstractions
CheckIsAdoNet(name);
PrepareConnectionForAdo(dbTransaction);
var content = Serialize(contentObj, callbackName);
PublishWithTrans(name, content);
PublishWithTrans(name, contentObj, callbackName);
}
public Task PublishAsync<T>(string name, T contentObj, IDbTransaction dbTransaction, string callbackName = null)
......@@ -51,17 +64,20 @@ namespace DotNetCore.CAP.Abstractions
CheckIsAdoNet(name);
PrepareConnectionForAdo(dbTransaction);
var content = Serialize(contentObj, callbackName);
return PublishWithTransAsync(name, contentObj, callbackName);
}
return PublishWithTransAsync(name, content);
protected void Enqueue(CapPublishedMessage message)
{
_dispatcher.EnqueueToPublish(message);
}
protected abstract void PrepareConnectionForEF();
protected abstract void Execute(IDbConnection dbConnection, IDbTransaction dbTransaction,
protected abstract int Execute(IDbConnection dbConnection, IDbTransaction dbTransaction,
CapPublishedMessage message);
protected abstract Task ExecuteAsync(IDbConnection dbConnection, IDbTransaction dbTransaction,
protected abstract Task<int> ExecuteAsync(IDbConnection dbConnection, IDbTransaction dbTransaction,
CapPublishedMessage message);
protected virtual string Serialize<T>(T obj, string callbackName = null)
......@@ -89,7 +105,6 @@ namespace DotNetCore.CAP.Abstractions
{
CallbackName = callbackName
};
return packer.Pack(message);
}
......@@ -108,23 +123,38 @@ namespace DotNetCore.CAP.Abstractions
private void CheckIsUsingEF(string name)
{
if (name == null) throw new ArgumentNullException(nameof(name));
if (name == null)
{
throw new ArgumentNullException(nameof(name));
}
if (!IsUsingEF)
{
throw new InvalidOperationException(
"If you are using the EntityFramework, you need to configure the DbContextType first." +
" otherwise you need to use overloaded method with IDbConnection and IDbTransaction.");
}
}
private void CheckIsAdoNet(string name)
{
if (name == null) throw new ArgumentNullException(nameof(name));
if (name == null)
{
throw new ArgumentNullException(nameof(name));
}
if (IsUsingEF)
{
throw new InvalidOperationException(
"If you are using the EntityFramework, you do not need to use this overloaded.");
}
}
private async Task PublishWithTransAsync(string name, string content)
private async Task PublishWithTransAsync<T>(string name, T contentObj, string callbackName = null)
{
Guid operationId = default(Guid);
var content = Serialize(contentObj, callbackName);
var message = new CapPublishedMessage
{
Name = name,
......@@ -132,15 +162,39 @@ namespace DotNetCore.CAP.Abstractions
StatusName = StatusName.Scheduled
};
await ExecuteAsync(DbConnection, DbTransaction, message);
try
{
operationId = s_diagnosticListener.WritePublishMessageStoreBefore(message);
ClosedCap();
var id = await ExecuteAsync(DbConnection, DbTransaction, message);
ClosedCap();
if (id > 0)
{
_logger.LogInformation($"message [{message}] has been persisted in the database.");
s_diagnosticListener.WritePublishMessageStoreAfter(operationId, message);
PublishQueuer.PulseEvent.Set();
message.Id = id;
Enqueue(message);
}
}
catch (Exception e)
{
_logger.LogError("An exception was occurred when publish message. exception message:" + e.Message, e);
s_diagnosticListener.WritePublishMessageStoreError(operationId, message, e);
Console.WriteLine(e);
throw;
}
}
private void PublishWithTrans(string name, string content)
private void PublishWithTrans<T>(string name, T contentObj, string callbackName = null)
{
Guid operationId = default(Guid);
var content = Serialize(contentObj, callbackName);
var message = new CapPublishedMessage
{
Name = name,
......@@ -148,11 +202,29 @@ namespace DotNetCore.CAP.Abstractions
StatusName = StatusName.Scheduled
};
Execute(DbConnection, DbTransaction, message);
try
{
operationId = s_diagnosticListener.WritePublishMessageStoreBefore(message);
var id = Execute(DbConnection, DbTransaction, message);
ClosedCap();
ClosedCap();
PublishQueuer.PulseEvent.Set();
if (id > 0)
{
_logger.LogInformation($"message [{message}] has been persisted in the database.");
s_diagnosticListener.WritePublishMessageStoreAfter(operationId, message);
message.Id = id;
Enqueue(message);
}
}
catch (Exception e)
{
_logger.LogError("An exception was occurred when publish message. exception message:" + e.Message, e);
s_diagnosticListener.WritePublishMessageStoreError(operationId, message, e);
Console.WriteLine(e);
throw;
}
}
private void ClosedCap()
......@@ -162,8 +234,11 @@ namespace DotNetCore.CAP.Abstractions
DbTransaction.Commit();
DbTransaction.Dispose();
}
if (IsCapOpenedConn)
{
DbConnection.Dispose();
}
}
public void Dispose()
......
using System;
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using DotNetCore.CAP.Models;
namespace DotNetCore.CAP.Abstractions
{
/// <summary>
/// Message content serializer.
/// <para>By default, CAP will use Json as a serializer, and you can customize this interface to achieve serialization of other methods.</para>
/// <para>
/// By default, CAP will use Json as a serializer, and you can customize this interface to achieve serialization of
/// other methods.
/// </para>
/// </summary>
public interface IContentSerializer
{
......@@ -40,7 +46,7 @@ namespace DotNetCore.CAP.Abstractions
/// </summary>
/// <remarks>
/// We use the wrapper to provide some additional information for the message content,which is important for CAP。
/// Typically, we may need to customize the field display name of the message,
/// Typically, we may need to customize the field display name of the message,
/// which includes interacting with other message components, which can be adapted in this manner
/// </remarks>
public interface IMessagePacker
......@@ -52,9 +58,9 @@ namespace DotNetCore.CAP.Abstractions
string Pack(CapMessage obj);
/// <summary>
/// Unpack a message strings to <see cref="CapMessage"/> object.
/// Unpack a message strings to <see cref="CapMessage" /> object.
/// </summary>
/// <param name="packingMessage">The string of packed message.</param>
CapMessage UnPack(string packingMessage);
}
}
}
\ No newline at end of file
using System.Reflection;
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System.Reflection;
using DotNetCore.CAP.Abstractions.ModelBinding;
namespace DotNetCore.CAP.Abstractions
......
using System.Threading.Tasks;
using DotNetCore.CAP.Models;
namespace DotNetCore.CAP.Abstractions
{
/// <summary>
/// Consumer method executor.
/// </summary>
public interface ISubscriberExecutor
{
/// <summary>
/// Execute the consumer method.
/// </summary>
/// <param name="receivedMessage">The received message.</param>
Task<OperateResult> ExecuteAsync(CapReceivedMessage receivedMessage);
}
}
using System.Threading.Tasks;
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System.Threading.Tasks;
namespace DotNetCore.CAP.Abstractions.ModelBinding
{
......
using DotNetCore.CAP.Internal;
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using DotNetCore.CAP.Internal;
namespace DotNetCore.CAP.Abstractions.ModelBinding
{
......@@ -42,7 +45,10 @@ namespace DotNetCore.CAP.Abstractions.ModelBinding
public override string ToString()
{
if (IsSuccess)
{
return $"Success '{Model}'";
}
return "Failed";
}
......@@ -50,7 +56,10 @@ namespace DotNetCore.CAP.Abstractions.ModelBinding
{
var other = obj as ModelBindingResult?;
if (other == null)
{
return false;
}
return Equals(other.Value);
}
......
using System;
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
namespace DotNetCore.CAP.Abstractions
{
......
using System;
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using DotNetCore.CAP;
using DotNetCore.CAP.Dashboard.GatewayProxy;
using Microsoft.Extensions.DependencyInjection;
......@@ -19,7 +22,9 @@ namespace Microsoft.AspNetCore.Builder
public static IApplicationBuilder UseCap(this IApplicationBuilder app)
{
if (app == null)
{
throw new ArgumentNullException(nameof(app));
}
CheckRequirement(app);
......@@ -31,7 +36,10 @@ namespace Microsoft.AspNetCore.Builder
if (provider.GetService<DashboardOptions>() != null)
{
if (provider.GetService<DiscoveryOptions>() != null)
{
app.UseMiddleware<GatewayProxyMiddleware>();
}
app.UseMiddleware<DashboardMiddleware>();
}
......@@ -42,18 +50,24 @@ namespace Microsoft.AspNetCore.Builder
{
var marker = app.ApplicationServices.GetService<CapMarkerService>();
if (marker == null)
{
throw new InvalidOperationException(
"AddCap() must be called on the service collection. eg: services.AddCap(...)");
}
var messageQueueMarker = app.ApplicationServices.GetService<CapMessageQueueMakerService>();
if (messageQueueMarker == null)
{
throw new InvalidOperationException(
"You must be config used message queue provider at AddCap() options! eg: services.AddCap(options=>{ options.UseKafka(...) })");
}
var databaseMarker = app.ApplicationServices.GetService<CapDatabaseStorageMarkerService>();
if (databaseMarker == null)
{
throw new InvalidOperationException(
"You must be config used database provider at AddCap() options! eg: services.AddCap(options=>{ options.UseSqlServer(...) })");
}
}
}
}
\ No newline at end of file
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment