Commit 70c7c10f authored by savorboard's avatar savorboard

Merge branch 'develop' of https://github.com/dotnetcore/CAP into develop

parents 0d037812 a2efa6b8
<!--
Thank you for reporting an issue.
1. It's RECOMMENDED to submit PR for typo or tiny bug fix.
2. If this's a FEATURE request, please provide: details, pseudo codes if necessary.
3. If this's a BUG, please provide: course repetition, error log and configuration. Fill in as much of the template below as you're able.
感谢您向我们反馈问题。
1. 提交问题前,请先阅读 https://github.com/dotnetcore/CAP/wiki 上的文档。
2. 我们推荐如果是小问题(错别字修改,小的 bug fix)直接提交 PR。
3. 如果是一个新需求,请提供:详细需求描述,最好是有伪代码实现。
4. 如果是一个 BUG,请提供:复现步骤,错误日志以及相关配置,并尽量填写下面的模板中的条目。
6. 扩展阅读:[如何向开源项目提交无法解答的问题](https://zhuanlan.zhihu.com/p/25795393)
-->
Please answer these questions before submitting your issue.
- Why do you submit this issue?
- [ ] Question or discussion
- [ ] Bug
- [ ] Requirement
- [ ] Feature or performance improvement
___
### Question
- What do you want to know?
___
### Bug
- Which version of CAP, OS and .NET Core?
- Which company or project?
- What happen?
If possible, provide a way for reproducing the error. e.g. demo application, component version.
___
### Requirement or improvement
- Please describe about your requirements or improvement suggestions.
\ No newline at end of file
......@@ -17,6 +17,7 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution
CHANGELOG.md = CHANGELOG.md
CODE_OF_CONDUCT.md = CODE_OF_CONDUCT.md
ConfigureMSDTC.ps1 = ConfigureMSDTC.ps1
.github\ISSUE_TEMPLATE = .github\ISSUE_TEMPLATE
LICENSE.txt = LICENSE.txt
README.md = README.md
README.zh-cn.md = README.zh-cn.md
......@@ -54,15 +55,11 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "DotNetCore.CAP.MySql.Test",
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Sample.RabbitMQ.MySql", "samples\Sample.RabbitMQ.MySql\Sample.RabbitMQ.MySql.csproj", "{9F3F9BFE-7B6A-4A7A-A6E6-8B517D611873}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Sample.RabbitMQ.SqlServer", "samples\Sample.RabbitMQ.SqlServer\Sample.RabbitMQ.SqlServer.csproj", "{AF17B956-B79E-48B7-9B5B-EB15A386B112}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "DotNetCore.CAP.PostgreSql", "src\DotNetCore.CAP.PostgreSql\DotNetCore.CAP.PostgreSql.csproj", "{82C403AB-ED68-4084-9A1D-11334F9F08F9}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Sample.RabbitMQ.PostgreSql", "samples\Sample.RabbitMQ.PostgreSql\Sample.RabbitMQ.PostgreSql.csproj", "{A17E8E72-DFFC-4822-BB38-73D59A8B264E}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "DotNetCore.CAP.PostgreSql.Test", "test\DotNetCore.CAP.PostgreSql.Test\DotNetCore.CAP.PostgreSql.Test.csproj", "{7CA3625D-1817-4695-881D-7E79A1E1DED2}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Sample.Kafka.SqlServer", "samples\Sample.Kafka.SqlServer\Sample.Kafka.SqlServer.csproj", "{573B4D39-5489-48B3-9B6C-5234249CB980}"
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Sample.Kafka.MySql", "samples\Sample.Kafka.MySql\Sample.Kafka.MySql.csproj", "{9CB51105-A85B-42A4-AFDE-A4FC34D9EA91}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
......@@ -105,26 +102,18 @@ Global
{9F3F9BFE-7B6A-4A7A-A6E6-8B517D611873}.Debug|Any CPU.Build.0 = Debug|Any CPU
{9F3F9BFE-7B6A-4A7A-A6E6-8B517D611873}.Release|Any CPU.ActiveCfg = Release|Any CPU
{9F3F9BFE-7B6A-4A7A-A6E6-8B517D611873}.Release|Any CPU.Build.0 = Release|Any CPU
{AF17B956-B79E-48B7-9B5B-EB15A386B112}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{AF17B956-B79E-48B7-9B5B-EB15A386B112}.Debug|Any CPU.Build.0 = Debug|Any CPU
{AF17B956-B79E-48B7-9B5B-EB15A386B112}.Release|Any CPU.ActiveCfg = Release|Any CPU
{AF17B956-B79E-48B7-9B5B-EB15A386B112}.Release|Any CPU.Build.0 = Release|Any CPU
{82C403AB-ED68-4084-9A1D-11334F9F08F9}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{82C403AB-ED68-4084-9A1D-11334F9F08F9}.Debug|Any CPU.Build.0 = Debug|Any CPU
{82C403AB-ED68-4084-9A1D-11334F9F08F9}.Release|Any CPU.ActiveCfg = Release|Any CPU
{82C403AB-ED68-4084-9A1D-11334F9F08F9}.Release|Any CPU.Build.0 = Release|Any CPU
{A17E8E72-DFFC-4822-BB38-73D59A8B264E}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{A17E8E72-DFFC-4822-BB38-73D59A8B264E}.Debug|Any CPU.Build.0 = Debug|Any CPU
{A17E8E72-DFFC-4822-BB38-73D59A8B264E}.Release|Any CPU.ActiveCfg = Release|Any CPU
{A17E8E72-DFFC-4822-BB38-73D59A8B264E}.Release|Any CPU.Build.0 = Release|Any CPU
{7CA3625D-1817-4695-881D-7E79A1E1DED2}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{7CA3625D-1817-4695-881D-7E79A1E1DED2}.Debug|Any CPU.Build.0 = Debug|Any CPU
{7CA3625D-1817-4695-881D-7E79A1E1DED2}.Release|Any CPU.ActiveCfg = Release|Any CPU
{7CA3625D-1817-4695-881D-7E79A1E1DED2}.Release|Any CPU.Build.0 = Release|Any CPU
{573B4D39-5489-48B3-9B6C-5234249CB980}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{573B4D39-5489-48B3-9B6C-5234249CB980}.Debug|Any CPU.Build.0 = Debug|Any CPU
{573B4D39-5489-48B3-9B6C-5234249CB980}.Release|Any CPU.ActiveCfg = Release|Any CPU
{573B4D39-5489-48B3-9B6C-5234249CB980}.Release|Any CPU.Build.0 = Release|Any CPU
{9CB51105-A85B-42A4-AFDE-A4FC34D9EA91}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{9CB51105-A85B-42A4-AFDE-A4FC34D9EA91}.Debug|Any CPU.Build.0 = Debug|Any CPU
{9CB51105-A85B-42A4-AFDE-A4FC34D9EA91}.Release|Any CPU.ActiveCfg = Release|Any CPU
{9CB51105-A85B-42A4-AFDE-A4FC34D9EA91}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
......@@ -139,11 +128,9 @@ Global
{FA15685A-778A-4D2A-A2FE-27FAD2FFA65B} = {9B2AE124-6636-4DE9-83A3-70360DABD0C4}
{80A84F62-1558-427B-BA74-B47AA8A665B5} = {C09CDAB0-6DD4-46E9-B7F3-3EF2A4741EA0}
{9F3F9BFE-7B6A-4A7A-A6E6-8B517D611873} = {3A6B6931-A123-477A-9469-8B468B5385AF}
{AF17B956-B79E-48B7-9B5B-EB15A386B112} = {3A6B6931-A123-477A-9469-8B468B5385AF}
{82C403AB-ED68-4084-9A1D-11334F9F08F9} = {9B2AE124-6636-4DE9-83A3-70360DABD0C4}
{A17E8E72-DFFC-4822-BB38-73D59A8B264E} = {3A6B6931-A123-477A-9469-8B468B5385AF}
{7CA3625D-1817-4695-881D-7E79A1E1DED2} = {C09CDAB0-6DD4-46E9-B7F3-3EF2A4741EA0}
{573B4D39-5489-48B3-9B6C-5234249CB980} = {3A6B6931-A123-477A-9469-8B468B5385AF}
{9CB51105-A85B-42A4-AFDE-A4FC34D9EA91} = {3A6B6931-A123-477A-9469-8B468B5385AF}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {2E70565D-94CF-40B4-BFE1-AC18D5F736AB}
......
#
# Enable MSDTC
#
Write-Host "Enabling MSDTC..." -ForegroundColor Yellow
$DTCSecurity = "Incoming"
$RegPath = "HKLM:\SOFTWARE\Microsoft\MSDTC\"
#Set Security and MSDTC path
$RegSecurityPath = "$RegPath\Security"
Set-ItemProperty path $RegSecurityPath name "NetworkDtcAccess" value 1
Set-ItemProperty path $RegSecurityPath name "NetworkDtcAccessClients" value 1
Set-ItemProperty path $RegSecurityPath name "NetworkDtcAccessTransactions" value 1
Set-ItemProperty path $RegSecurityPath name "NetworkDtcAccessInbound" value 1
Set-ItemProperty path $RegSecurityPath name "NetworkDtcAccessOutbound" value 1
Set-ItemProperty path $RegSecurityPath name "LuTransactions" value 1
if ($DTCSecurity eq "None")
{
Set-ItemProperty path $RegPath name "TurnOffRpcSecurity" value 1
Set-ItemProperty path $RegPath name "AllowOnlySecureRpcCalls" value 0
Set-ItemProperty path $RegPath name "FallbackToUnsecureRPCIfNecessary" value 0
}
elseif ($DTCSecurity eq "Incoming")
{
Set-ItemProperty path $RegPath name "TurnOffRpcSecurity" value 0
Set-ItemProperty path $RegPath name "AllowOnlySecureRpcCalls" value 0
Set-ItemProperty path $RegPath name "FallbackToUnsecureRPCIfNecessary" value 1
}
else
{
Set-ItemProperty path $RegPath name "TurnOffRpcSecurity" value 0
Set-ItemProperty path $RegPath name "AllowOnlySecureRpcCalls" value 1
Set-ItemProperty path $RegPath name "FallbackToUnsecureRPCIfNecessary" value 0
}
Restart-Service MSDTC
Write-Host "MSDTC has been configured" foregroundcolor green
......@@ -3,7 +3,7 @@
[![AppVeyor](https://ci.appveyor.com/api/projects/status/4mpe0tbu7n126vyw?svg=true)](https://ci.appveyor.com/project/yuleyule66/cap)
[![NuGet](https://img.shields.io/nuget/v/DotNetCore.CAP.svg)](https://www.nuget.org/packages/DotNetCore.CAP/)
[![NuGet Preview](https://img.shields.io/nuget/vpre/DotNetCore.CAP.svg?label=nuget-pre)](https://www.nuget.org/packages/DotNetCore.CAP/)
[![Member project of .NET China Foundation](https://img.shields.io/badge/member_project_of-.NET_CHINA-red.svg?style=flat&colorB=9E20C8)](https://github.com/dotnetcore)
[![Member project of .NET Core Community](https://img.shields.io/badge/member%20project%20of-NCC-9e20c9.svg)](https://github.com/dotnetcore)
[![GitHub license](https://img.shields.io/badge/license-MIT-blue.svg)](https://raw.githubusercontent.com/dotnetcore/CAP/master/LICENSE.txt)
CAP is a library based on .Net standard, which is a solution to deal with distributed transactions, also has the function of EventBus, it is lightweight, easy to use, and efficiently.
......@@ -187,7 +187,10 @@ Then inject your `ISubscriberService` class in Startup.cs
```c#
public void ConfigureServices(IServiceCollection services)
{
//Note: The injection of services needs before of `services.AddCap()`
services.AddTransient<ISubscriberService,SubscriberService>();
services.AddCap(x=>{});
}
```
......@@ -218,6 +221,8 @@ services.AddCap(x =>
});
```
The default dashboard address is :[http://localhost:xxx/cap](http://localhost:xxx/cap) , you can also change the `cap` suffix to others with `d.MatchPath` configuration options.
![dashboard](http://images2017.cnblogs.com/blog/250417/201710/250417-20171004220827302-189215107.png)
![received](http://images2017.cnblogs.com/blog/250417/201710/250417-20171004220934115-1107747665.png)
......
......@@ -3,7 +3,7 @@
[![AppVeyor](https://ci.appveyor.com/api/projects/status/4mpe0tbu7n126vyw?svg=true)](https://ci.appveyor.com/project/yuleyule66/cap)
[![NuGet](https://img.shields.io/nuget/v/DotNetCore.CAP.svg)](https://www.nuget.org/packages/DotNetCore.CAP/)
[![NuGet Preview](https://img.shields.io/nuget/vpre/DotNetCore.CAP.svg?label=nuget-pre)](https://www.nuget.org/packages/DotNetCore.CAP/)
[![Member project of .NET China Foundation](https://img.shields.io/badge/member_project_of-.NET_CHINA-red.svg?style=flat&colorB=9E20C8)](https://github.com/dotnetcore)
[![Member project of .NET Core Community](https://img.shields.io/badge/member%20project%20of-NCC-9e20c9.svg)](https://github.com/dotnetcore)
[![GitHub license](https://img.shields.io/badge/license-MIT-blue.svg)](https://raw.githubusercontent.com/dotnetcore/CAP/master/LICENSE.txt)
CAP 是一个基于 .NET Standard 的 C# 库,它是一种处理分布式事务的解决方案,同样具有 EventBus 的功能,它具有轻量级、易使用、高性能等特点。
......@@ -187,7 +187,10 @@ namespace xxx.Service
```c#
public void ConfigureServices(IServiceCollection services)
{
//注意: 注入的服务需要在 `services.AddCap()` 之前
services.AddTransient<ISubscriberService,SubscriberService>();
services.AddCap(x=>{});
}
```
......@@ -218,6 +221,8 @@ services.AddCap(x =>
});
```
仪表盘默认的访问地址是:[http://localhost:xxx/cap](http://localhost:xxx/cap),你可以在`d.MatchPath`配置项中修改`cap`路径后缀为其他的名字。
![dashboard](http://images2017.cnblogs.com/blog/250417/201710/250417-20171004220827302-189215107.png)
![received](http://images2017.cnblogs.com/blog/250417/201710/250417-20171004220934115-1107747665.png)
......
......@@ -11,7 +11,6 @@ services:
- mysql
- postgresql
build_script:
- ps: ./ConfigureMSDTC.ps1
- ps: ./build.ps1
test: off
artifacts:
......
......@@ -68,6 +68,7 @@ Task("Pack")
{
Configuration = build.Configuration,
VersionSuffix = build.Version.Suffix,
IncludeSymbols = true,
OutputDirectory = "./artifacts/packages"
};
foreach (var project in build.ProjectFiles)
......
<Project>
<PropertyGroup>
<VersionMajor>2</VersionMajor>
<VersionMinor>1</VersionMinor>
<VersionMinor>2</VersionMinor>
<VersionPatch>3</VersionPatch>
<VersionQuality></VersionQuality>
<VersionPrefix>$(VersionMajor).$(VersionMinor).$(VersionPatch)</VersionPrefix>
......
using System;
using System.Threading.Tasks;
using DotNetCore.CAP;
using Microsoft.AspNetCore.Mvc;
using MySql.Data.MySqlClient;
namespace Sample.Kafka.MySql.Controllers
{
[Route("api/[controller]")]
public class ValuesController : Controller, ICapSubscribe
{
private readonly ICapPublisher _capBus;
public ValuesController(ICapPublisher producer)
{
_capBus = producer;
}
[Route("~/publish")]
public async Task<IActionResult> PublishMessage()
{
using (var connection = new MySqlConnection("Server=192.168.10.110;Database=testcap;UserId=root;Password=123123;"))
{
connection.Open();
var transaction = connection.BeginTransaction();
//your business code here
await _capBus.PublishAsync("xxx.xxx.test2", 123456, transaction);
transaction.Commit();
}
return Ok("publish successful!");
}
[CapSubscribe("xxx.xxx.test2")]
public void Test2(int value)
{
Console.WriteLine("Subscriber output message: " + value);
}
}
}
\ No newline at end of file
using System.IO;
using Microsoft.AspNetCore;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore;
using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.Configuration;
using NLog.Web;
namespace Sample.RabbitMQ.SqlServer
namespace Sample.Kafka.MySql
{
public class Program
{
......@@ -16,9 +14,12 @@ namespace Sample.RabbitMQ.SqlServer
public static IWebHost BuildWebHost(string[] args) =>
WebHost.CreateDefaultBuilder(args)
.UseUrls("http://*:5800")
.UseStartup<Startup>()
.ConfigureLogging((hostingContext, builder) =>
{
hostingContext.HostingEnvironment.ConfigureNLog("nlog.config");
})
.UseNLog()
.Build();
}
}
\ No newline at end of file
......@@ -2,21 +2,28 @@
<PropertyGroup>
<TargetFramework>netcoreapp2.0</TargetFramework>
<AssemblyName>Sample.Kafka.SqlServer</AssemblyName>
<AssemblyName>Sample.Kafka.MySql</AssemblyName>
<WarningsAsErrors>NU1701</WarningsAsErrors>
<NoWarn>NU1701</NoWarn>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Microsoft.AspNetCore.All" Version="2.0.3" />
<PackageReference Include="Microsoft.AspNetCore.All" Version="2.0.6" />
<PackageReference Include="MySqlConnector" Version="0.40.3" />
<PackageReference Include="NLog.Web.AspNetCore" Version="4.5.2" />
</ItemGroup>
<ItemGroup>
<DotNetCliToolReference Include="Microsoft.EntityFrameworkCore.Tools.DotNet" Version="2.0.0" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\..\src\DotNetCore.CAP.Kafka\DotNetCore.CAP.Kafka.csproj" />
<ProjectReference Include="..\..\src\DotNetCore.CAP.SqlServer\DotNetCore.CAP.SqlServer.csproj" />
<ProjectReference Include="..\..\src\DotNetCore.CAP.MySql\DotNetCore.CAP.MySql.csproj" />
<ProjectReference Include="..\..\src\DotNetCore.CAP\DotNetCore.CAP.csproj" />
</ItemGroup>
<ItemGroup>
<Content Update="nlog.config">
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
</Content>
</ItemGroup>
</Project>
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.DependencyInjection;
namespace Sample.RabbitMQ.PostgreSql
namespace Sample.Kafka.MySql
{
public class Startup
{
// This method gets called by the runtime. Use this method to add services to the container.
public void ConfigureServices(IServiceCollection services)
{
services.AddDbContext<AppDbContext>();
services.AddCap(x =>
{
x.UseEntityFramework<AppDbContext>();
x.UseRabbitMQ("localhost");
x.UseMySql("Server=localhost;Database=testcap;UserId=root;Password=123123;");
x.UseKafka("localhost:9092");
x.UseDashboard();
x.UseDiscovery(d =>
{
d.DiscoveryServerHostName = "localhost";
d.DiscoveryServerPort = 8500;
d.CurrentNodeHostName = "localhost";
d.CurrentNodePort = 5800;
d.NodeName = "CAP 一号节点";
});
});
services.AddMvc();
}
// This method gets called by the runtime. Use this method to configure the HTTP request pipeline.
public void Configure(IApplicationBuilder app, IHostingEnvironment env)
public void Configure(IApplicationBuilder app)
{
app.UseMvc();
app.UseCap();
}
}
}
}
\ No newline at end of file
{
"Logging": {
"IncludeScopes": false,
"LogLevel": {
"Default": "Debug"
}
}
}
using Microsoft.EntityFrameworkCore;
namespace Sample.Kafka.SqlServer
{
public class AppDbContext : DbContext
{
protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder)
{
//optionsBuilder.UseSqlServer("Server=192.168.2.206;Initial Catalog=Sample.Kafka.SqlServer;User Id=cmswuliu;Password=h7xY81agBn*Veiu3;MultipleActiveResultSets=True");
optionsBuilder.UseSqlServer("Server=192.168.2.206;Initial Catalog=TestCap;User Id=cmswuliu;Password=h7xY81agBn*Veiu3;MultipleActiveResultSets=True");
}
}
}
using System;
using DotNetCore.CAP.Abstractions;
using DotNetCore.CAP.Models;
using Newtonsoft.Json;
namespace Sample.RabbitMQ.SqlServer
{
public class MessageContent : CapMessage
{
[JsonProperty("id")]
public override string Id { get; set; }
[JsonProperty("createdTime")]
public override DateTime Timestamp { get; set; }
[JsonProperty("msgBody")]
public override string Content { get; set; }
[JsonProperty("callbackTopicName")]
public override string CallbackName { get; set; }
}
public class MyMessagePacker : IMessagePacker
{
private readonly IContentSerializer _serializer;
public MyMessagePacker(IContentSerializer serializer)
{
_serializer = serializer;
}
public string Pack(CapMessage obj)
{
var content = new MessageContent
{
Id = obj.Id,
Content = obj.Content,
CallbackName = obj.CallbackName,
Timestamp = obj.Timestamp
};
return _serializer.Serialize(content);
}
public CapMessage UnPack(string packingMessage)
{
return _serializer.DeSerialize<MessageContent>(packingMessage);
}
}
}
using System;
using System.Diagnostics;
using System.Threading.Tasks;
using DotNetCore.CAP;
using Microsoft.AspNetCore.Authorization;
using Microsoft.AspNetCore.Mvc;
using Newtonsoft.Json;
namespace Sample.Kafka.SqlServer.Controllers
{
public class Person
{
[JsonProperty("id")]
public string Id { get; set; }
[JsonProperty("uname")]
public string Name { get; set; }
public HAHA Haha { get; set; }
public override string ToString()
{
return "Name:" + Name + ";Id:" + Id + "Haha:" + Haha?.ToString();
}
}
public class HAHA
{
[JsonProperty("id")]
public string Id { get; set; }
[JsonProperty("uname")]
public string Name { get; set; }
public override string ToString()
{
return "Name:" + Name + ";Id:" + Id;
}
}
[Route("api/[controller]")]
public class ValuesController : Controller, ICapSubscribe
{
private readonly ICapPublisher _capBus;
private readonly AppDbContext _dbContext;
public ValuesController(ICapPublisher producer, AppDbContext dbContext)
{
_capBus = producer;
_dbContext = dbContext;
}
[Route("~/publish")]
public IActionResult PublishMessage()
{
var p = new Person
{
Id = Guid.NewGuid().ToString(),
Name = "杨晓东",
Haha = new HAHA
{
Id = Guid.NewGuid().ToString(),
Name = "1-1杨晓东",
}
};
_capBus.Publish("wl.yxd.test", p, "wl.yxd.test.callback");
//_capBus.Publish("wl.cj.test", p);
return Ok();
}
[CapSubscribe("wl.yxd.test.callback")]
public void KafkaTestCallback(Person p)
{
Console.WriteLine("回调内容:" + p);
}
[CapSubscribe("wl.cj.test")]
public string KafkaTestReceived(Person person)
{
Console.WriteLine(person);
Debug.WriteLine(person);
return "this is callback message";
}
[Route("~/publishWithTrans")]
public async Task<IActionResult> PublishMessageWithTransaction()
{
using (var trans = await _dbContext.Database.BeginTransactionAsync())
{
await _capBus.PublishAsync("sample.rabbitmq.mysql", "");
trans.Commit();
}
return Ok();
}
[CapSubscribe("sample.rabbitmq.mysql33333", Group = "Test.Group")]
public void KafkaTest22(Person person)
{
var aa = _dbContext.Database;
_dbContext.Dispose();
Console.WriteLine("[sample.kafka.sqlserver] message received " + person.ToString());
Debug.WriteLine("[sample.kafka.sqlserver] message received " + person.ToString());
}
//[CapSubscribe("sample.rabbitmq.mysql22222")]
//public void KafkaTest22(DateTime time)
//{
// Console.WriteLine("[sample.kafka.sqlserver] message received " + time.ToString());
// Debug.WriteLine("[sample.kafka.sqlserver] message received " + time.ToString());
//}
[CapSubscribe("sample.rabbitmq.mysql22222")]
public async Task<DateTime> KafkaTest33(DateTime time)
{
Console.WriteLine("[sample.kafka.sqlserver] message received " + time.ToString());
Debug.WriteLine("[sample.kafka.sqlserver] message received " + time.ToString());
return await Task.FromResult(time);
}
[NonAction]
[CapSubscribe("sample.kafka.sqlserver3")]
[CapSubscribe("sample.kafka.sqlserver4")]
public void KafkaTest()
{
Console.WriteLine("[sample.kafka.sqlserver] message received");
Debug.WriteLine("[sample.kafka.sqlserver] message received");
}
}
}
\ No newline at end of file
using System.IO;
using Microsoft.AspNetCore;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.Configuration;
namespace Sample.Kafka.SqlServer
{
public class Program
{
public static void Main(string[] args)
{
BuildWebHost(args).Run();
}
public static IWebHost BuildWebHost(string[] args) =>
WebHost.CreateDefaultBuilder(args)
.UseStartup<Startup>()
.Build();
}
}
\ No newline at end of file
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Sample.RabbitMQ.SqlServer;
namespace Sample.Kafka.SqlServer
{
public class Startup
{
public void ConfigureServices(IServiceCollection services)
{
services.AddDbContext<AppDbContext>();
services.AddCap(x =>
{
x.UseEntityFramework<AppDbContext>();
x.UseKafka("192.168.2.215:9092");
x.UseDashboard();
//x.UseDiscovery(d =>
//{
// d.DiscoveryServerHostName = "localhost";
// d.DiscoveryServerPort = 8500;
// d.CurrentNodeHostName = "localhost";
// d.CurrentNodePort = 5820;
// d.NodeName = "CAP 2号节点";
//});
}).AddMessagePacker<MyMessagePacker>();
services.AddMvc();
}
public void Configure(IApplicationBuilder app, IHostingEnvironment env, ILoggerFactory loggerFactory)
{
app.UseMvc();
app.UseCap();
}
}
}
\ No newline at end of file
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore;
namespace Sample.RabbitMQ.MySql
{
......@@ -10,8 +6,7 @@ namespace Sample.RabbitMQ.MySql
{
protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder)
{
optionsBuilder.UseMySql("Server=localhost;Database=Sample.RabbitMQ.MySql;UserId=root;Password=123123;Allow User Variables=True");
//optionsBuilder.UseMySql("Server=192.168.2.206;Database=Sample.RabbitMQ.MySql;UserId=root;Password=123123;");
optionsBuilder.UseMySql("Server=localhost;Database=testcap;UserId=root;Password=123123;");
}
}
}
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading.Tasks;
using DotNetCore.CAP;
using Microsoft.AspNetCore.Mvc;
......@@ -24,7 +21,7 @@ namespace Sample.RabbitMQ.MySql.Controllers
public IActionResult PublishMessage()
{
_capBus.Publish("sample.rabbitmq.mysql", DateTime.Now);
return Ok();
}
......@@ -53,7 +50,7 @@ namespace Sample.RabbitMQ.MySql.Controllers
[CapSubscribe("sample.rabbitmq.mysql")]
public void ReceiveMessage(DateTime time)
{
Console.WriteLine("[sample.rabbitmq.mysql] message received: "+ DateTime.Now.ToString() +" , sent time: " + time.ToString());
Console.WriteLine("[sample.rabbitmq.mysql] message received: " + DateTime.Now + ",sent time: " + time);
}
}
}
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.AspNetCore;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore;
using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.Configuration;
using NLog.Web;
namespace Sample.RabbitMQ.MySql
{
......@@ -20,6 +14,11 @@ namespace Sample.RabbitMQ.MySql
public static IWebHost BuildWebHost(string[] args) =>
WebHost.CreateDefaultBuilder(args)
.UseStartup<Startup>()
.ConfigureLogging((hostingContext, builder) =>
{
hostingContext.HostingEnvironment.ConfigureNLog("nlog.config");
})
.UseNLog()
.Build();
}
}
{
"iisSettings": {
"windowsAuthentication": false,
"anonymousAuthentication": true,
"iisExpress": {
"applicationUrl": "http://localhost:57171/",
"sslPort": 0
}
},
"profiles": {
"IIS Express": {
"commandName": "IISExpress",
"launchBrowser": true,
"launchUrl": "cap",
"environmentVariables": {
"ASPNETCORE_ENVIRONMENT": "Development"
}
},
"Sample.RabbitMQ.MySql": {
"commandName": "Project",
"launchBrowser": true,
"launchUrl": "cap",
"environmentVariables": {
"ASPNETCORE_ENVIRONMENT": "Development"
},
"applicationUrl": "http://localhost:57173/"
}
}
}
\ No newline at end of file
......@@ -10,8 +10,9 @@
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Microsoft.AspNetCore.All" Version="2.0.3" />
<PackageReference Include="Microsoft.AspNetCore.All" Version="2.0.6" />
<PackageReference Include="Pomelo.EntityFrameworkCore.MySql" Version="2.0.1" />
<PackageReference Include="NLog.Web.AspNetCore" Version="4.5.2" />
</ItemGroup>
<ItemGroup>
<DotNetCliToolReference Include="Microsoft.EntityFrameworkCore.Tools.DotNet" Version="2.0.0" />
......@@ -21,5 +22,10 @@
<ProjectReference Include="..\..\src\DotNetCore.CAP.RabbitMQ\DotNetCore.CAP.RabbitMQ.csproj" />
<ProjectReference Include="..\..\src\DotNetCore.CAP\DotNetCore.CAP.csproj" />
</ItemGroup>
<ItemGroup>
<Content Update="nlog.config">
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
</Content>
</ItemGroup>
</Project>
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
......
{
"Logging": {
"IncludeScopes": false,
"LogLevel": {
"Default": "Debug"
}
}
}
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
namespace Sample.RabbitMQ.PostgreSql
{
public class AppDbContext : DbContext
{
protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder)
{
optionsBuilder.UseNpgsql("Server=localhost;Database=Sample.RabbitMQ.PostgreSql;UserId=postgres;Password=123123;");
}
}
}
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading.Tasks;
using DotNetCore.CAP;
using Microsoft.AspNetCore.Mvc;
namespace Sample.RabbitMQ.PostgreSql.Controllers
{
[Route("api/[controller]")]
public class ValuesController : Controller
{
private readonly AppDbContext _dbContext;
private readonly ICapPublisher _capBus;
public ValuesController(AppDbContext dbContext, ICapPublisher capPublisher)
{
_dbContext = dbContext;
_capBus = capPublisher;
}
[Route("~/publish")]
public IActionResult PublishMessage()
{
_capBus.Publish("sample.rabbitmq.mysql", DateTime.Now);
return Ok();
}
[Route("~/publish2")]
public IActionResult PublishMessage2()
{
_capBus.Publish("sample.kafka.sqlserver4", DateTime.Now);
return Ok();
}
[Route("~/publishWithTrans")]
public async Task<IActionResult> PublishMessageWithTransaction()
{
using (var trans = await _dbContext.Database.BeginTransactionAsync())
{
await _capBus.PublishAsync("sample.kafka.sqlserver", "");
trans.Commit();
}
return Ok();
}
[NonAction]
[CapSubscribe("sample.rabbitmq.mysql")]
public void ReceiveMessage()
{
Console.WriteLine("[sample.rabbitmq.mysql] message received");
Debug.WriteLine("[sample.rabbitmq.mysql] message received");
}
}
}
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.AspNetCore;
using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging;
namespace Sample.RabbitMQ.PostgreSql
{
public class Program
{
public static void Main(string[] args)
{
BuildWebHost(args).Run();
}
public static IWebHost BuildWebHost(string[] args) =>
WebHost.CreateDefaultBuilder(args)
.UseStartup<Startup>()
.Build();
}
}
<Project Sdk="Microsoft.NET.Sdk.Web">
<PropertyGroup>
<TargetFramework>netcoreapp2.0</TargetFramework>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Microsoft.AspNetCore.All" Version="2.0.3" />
<PackageReference Include="Npgsql.EntityFrameworkCore.PostgreSQL" Version="2.0.0" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\..\src\DotNetCore.CAP.PostgreSql\DotNetCore.CAP.PostgreSql.csproj" />
<ProjectReference Include="..\..\src\DotNetCore.CAP.RabbitMQ\DotNetCore.CAP.RabbitMQ.csproj" />
<ProjectReference Include="..\..\src\DotNetCore.CAP\DotNetCore.CAP.csproj" />
</ItemGroup>
</Project>
using Microsoft.EntityFrameworkCore;
using Sample.RabbitMQ.SqlServer.Controllers;
namespace Sample.RabbitMQ.SqlServer
{
public class AppDbContext : DbContext
{
public DbSet<Person> Persons { get; set; }
protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder)
{
//optionsBuilder.UseSqlServer("Server=192.168.2.206;Initial Catalog=TestCap;User Id=cmswuliu;Password=h7xY81agBn*Veiu3;MultipleActiveResultSets=True");
optionsBuilder.UseSqlServer("Server=DESKTOP-M9R8T31;Initial Catalog=Sample.Kafka.SqlServer;User Id=sa;Password=P@ssw0rd;MultipleActiveResultSets=True");
}
}
}
using System;
using System.Diagnostics;
using System.Threading.Tasks;
using DotNetCore.CAP;
using Microsoft.AspNetCore.Authorization;
using Microsoft.AspNetCore.Mvc;
namespace Sample.RabbitMQ.SqlServer.Controllers
{
public class Person
{
public int Id { get; set; }
public string Name { get; set; }
public int Age { get; set; }
public override string ToString()
{
return "Name:" + Name + ";Age:" + Age;
}
}
[Route("api/[controller]")]
public class ValuesController : Controller, ICapSubscribe
{
private readonly ICapPublisher _capBus;
private readonly AppDbContext _dbContext;
public ValuesController(ICapPublisher producer, AppDbContext dbContext)
{
_capBus = producer;
_dbContext = dbContext;
}
[Route("~/publish")]
public IActionResult PublishMessage()
{
_capBus.Publish("sample.rabbitmq.sqlserver.order.check", DateTime.Now);
//var person = new Person
//{
// Name = "杨晓东",
// Age = 11,
// Id = 23
//};
//_capBus.Publish("sample.rabbitmq.mysql33333", person);
return Ok();
}
[Route("~/publishWithTrans")]
public async Task<IActionResult> PublishMessageWithTransaction()
{
using (var trans = await _dbContext.Database.BeginTransactionAsync())
{
await _capBus.PublishAsync("sample.rabbitmq.mysql", "");
trans.Commit();
}
return Ok();
}
[CapSubscribe("sample.rabbitmq.mysql33333",Group ="Test.Group")]
public void KafkaTest22(Person person)
{
var aa = _dbContext.Database;
_dbContext.Dispose();
Console.WriteLine("[sample.kafka.sqlserver] message received " + person.ToString());
Debug.WriteLine("[sample.kafka.sqlserver] message received " + person.ToString());
}
//[CapSubscribe("sample.rabbitmq.mysql22222")]
//public void KafkaTest22(DateTime time)
//{
// Console.WriteLine("[sample.kafka.sqlserver] message received " + time.ToString());
// Debug.WriteLine("[sample.kafka.sqlserver] message received " + time.ToString());
//}
[CapSubscribe("sample.rabbitmq.mysql22222")]
public async Task<DateTime> KafkaTest33(DateTime time)
{
Console.WriteLine("[sample.kafka.sqlserver] message received " + time.ToString());
Debug.WriteLine("[sample.kafka.sqlserver] message received " + time.ToString());
return await Task.FromResult(time);
}
[NonAction]
[CapSubscribe("sample.kafka.sqlserver3")]
[CapSubscribe("sample.kafka.sqlserver4")]
public void KafkaTest()
{
Console.WriteLine("[sample.kafka.sqlserver] message received");
Debug.WriteLine("[sample.kafka.sqlserver] message received");
}
}
}
\ No newline at end of file
// <auto-generated />
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Infrastructure;
using Microsoft.EntityFrameworkCore.Metadata;
using Microsoft.EntityFrameworkCore.Migrations;
using Microsoft.EntityFrameworkCore.Storage;
using Microsoft.EntityFrameworkCore.Storage.Internal;
using Sample.RabbitMQ.SqlServer;
using System;
namespace Sample.RabbitMQ.SqlServer.Migrations
{
[DbContext(typeof(AppDbContext))]
[Migration("20170824130007_AddPersons")]
partial class AddPersons
{
protected override void BuildTargetModel(ModelBuilder modelBuilder)
{
#pragma warning disable 612, 618
modelBuilder
.HasAnnotation("ProductVersion", "2.0.0-rtm-26452")
.HasAnnotation("SqlServer:ValueGenerationStrategy", SqlServerValueGenerationStrategy.IdentityColumn);
modelBuilder.Entity("Sample.RabbitMQ.SqlServer.Controllers.Person", b =>
{
b.Property<int>("Id")
.ValueGeneratedOnAdd();
b.Property<int>("Age");
b.Property<string>("Name");
b.HasKey("Id");
b.ToTable("Persons");
});
#pragma warning restore 612, 618
}
}
}
using Microsoft.EntityFrameworkCore.Metadata;
using Microsoft.EntityFrameworkCore.Migrations;
using System;
using System.Collections.Generic;
namespace Sample.RabbitMQ.SqlServer.Migrations
{
public partial class AddPersons : Migration
{
protected override void Up(MigrationBuilder migrationBuilder)
{
migrationBuilder.CreateTable(
name: "Persons",
columns: table => new
{
Id = table.Column<int>(type: "int", nullable: false)
.Annotation("SqlServer:ValueGenerationStrategy", SqlServerValueGenerationStrategy.IdentityColumn),
Age = table.Column<int>(type: "int", nullable: false),
Name = table.Column<string>(type: "nvarchar(max)", nullable: true)
},
constraints: table =>
{
table.PrimaryKey("PK_Persons", x => x.Id);
});
}
protected override void Down(MigrationBuilder migrationBuilder)
{
migrationBuilder.DropTable(
name: "Persons");
}
}
}
// <auto-generated />
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Infrastructure;
using Microsoft.EntityFrameworkCore.Metadata;
using Microsoft.EntityFrameworkCore.Migrations;
using Microsoft.EntityFrameworkCore.Storage;
using Microsoft.EntityFrameworkCore.Storage.Internal;
using Sample.RabbitMQ.SqlServer;
using System;
namespace Sample.RabbitMQ.SqlServer.Migrations
{
[DbContext(typeof(AppDbContext))]
partial class AppDbContextModelSnapshot : ModelSnapshot
{
protected override void BuildModel(ModelBuilder modelBuilder)
{
#pragma warning disable 612, 618
modelBuilder
.HasAnnotation("ProductVersion", "2.0.0-rtm-26452")
.HasAnnotation("SqlServer:ValueGenerationStrategy", SqlServerValueGenerationStrategy.IdentityColumn);
modelBuilder.Entity("Sample.RabbitMQ.SqlServer.Controllers.Person", b =>
{
b.Property<int>("Id")
.ValueGeneratedOnAdd();
b.Property<int>("Age");
b.Property<string>("Name");
b.HasKey("Id");
b.ToTable("Persons");
});
#pragma warning restore 612, 618
}
}
}
<Project Sdk="Microsoft.NET.Sdk.Web">
<PropertyGroup>
<TargetFramework>netcoreapp2.0</TargetFramework>
<AssemblyName>Sample.RabbitMQ.SqlServer</AssemblyName>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Microsoft.AspNetCore.All" Version="2.0.3" />
</ItemGroup>
<ItemGroup>
<DotNetCliToolReference Include="Microsoft.EntityFrameworkCore.Tools.DotNet" Version="2.0.0" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\..\src\DotNetCore.CAP.RabbitMQ\DotNetCore.CAP.RabbitMQ.csproj" />
<ProjectReference Include="..\..\src\DotNetCore.CAP.SqlServer\DotNetCore.CAP.SqlServer.csproj" />
<ProjectReference Include="..\..\src\DotNetCore.CAP\DotNetCore.CAP.csproj" />
</ItemGroup>
</Project>
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using DotNetCore.CAP;
namespace Sample.RabbitMQ.SqlServer.Services
{
public interface ICmsService
{
void Add();
}
}
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
namespace Sample.RabbitMQ.SqlServer.Services
{
public interface IOrderService
{
void Check();
}
}
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using DotNetCore.CAP;
namespace Sample.RabbitMQ.SqlServer.Services.Impl
{
public class CmsService : ICmsService, ICapSubscribe
{
public void Add()
{
throw new NotImplementedException();
}
}
}
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using DotNetCore.CAP;
namespace Sample.RabbitMQ.SqlServer.Services.Impl
{
public class OrderService : IOrderService, ICapSubscribe
{
[CapSubscribe("sample.rabbitmq.sqlserver.order.check")]
public void Check()
{
Console.WriteLine("out");
}
}
}
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Sample.RabbitMQ.SqlServer.Services;
using Sample.RabbitMQ.SqlServer.Services.Impl;
namespace Sample.RabbitMQ.SqlServer
{
public class Startup
{
public void ConfigureServices(IServiceCollection services)
{
services.AddDbContext<AppDbContext>();
services.AddScoped<IOrderService, OrderService>();
services.AddTransient<ICmsService, CmsService>();
services.AddCap(x =>
{
x.UseEntityFramework<AppDbContext>();
x.UseRabbitMQ("localhost");
x.UseDashboard();
x.UseDiscovery(d =>
{
d.DiscoveryServerHostName = "localhost";
d.DiscoveryServerPort = 8500;
d.CurrentNodeHostName = "192.168.1.11";
d.CurrentNodePort = 5800;
d.NodeName = "CAP Node Windows";
d.NodeId = 1;
});
});
services.AddMvc();
}
public void Configure(IApplicationBuilder app, IHostingEnvironment env, ILoggerFactory loggerFactory)
{
loggerFactory.AddConsole();
loggerFactory.AddDebug();
app.UseMvc();
app.UseCap();
}
}
}
\ No newline at end of file
using System;
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using DotNetCore.CAP.Kafka;
using Microsoft.Extensions.DependencyInjection;
......@@ -23,9 +26,9 @@ namespace DotNetCore.CAP
services.AddSingleton(kafkaOptions);
services.AddSingleton<IConsumerClientFactory, KafkaConsumerClientFactory>();
services.AddSingleton<IQueueExecutor, PublishQueueExecutor>();
services.AddSingleton<IPublishExecutor, PublishQueueExecutor>();
services.AddSingleton<ConnectionPool>();
services.AddSingleton<IPublishExecutor, KafkaPublishMessageSender>();
services.AddSingleton<IPublishMessageSender, KafkaPublishMessageSender>();
services.AddSingleton<IConnectionPool,ConnectionPool>();
}
}
}
\ No newline at end of file
using System;
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
......@@ -45,16 +48,21 @@ namespace DotNetCore.CAP
if (_kafkaConfig == null)
{
if (string.IsNullOrWhiteSpace(Servers))
{
throw new ArgumentNullException(nameof(Servers));
}
MainConfig["bootstrap.servers"] = Servers;
MainConfig["queue.buffering.max.ms"] = "10";
MainConfig["socket.blocking.max.ms"] = "10";
MainConfig["enable.auto.commit"] = "false";
MainConfig["log.connection.close"] = "false";
MainConfig["request.timeout.ms"] = "3000";
MainConfig["message.timeout.ms"] = "5000";
_kafkaConfig = MainConfig.AsEnumerable();
}
return _kafkaConfig;
}
}
......
using System;
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using DotNetCore.CAP;
// ReSharper disable once CheckNamespace
......@@ -24,7 +27,10 @@ namespace Microsoft.Extensions.DependencyInjection
/// <returns></returns>
public static CapOptions UseKafka(this CapOptions options, Action<KafkaOptions> configure)
{
if (configure == null) throw new ArgumentNullException(nameof(configure));
if (configure == null)
{
throw new ArgumentNullException(nameof(configure));
}
options.RegisterExtension(new KafkaCapOptionsExtension(configure));
......
using DotNetCore.CAP.Abstractions;
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using DotNetCore.CAP.Abstractions;
// ReSharper disable once CheckNamespace
namespace DotNetCore.CAP
......
......@@ -8,14 +8,14 @@
<PackageTags>$(PackageTags);Kafka</PackageTags>
</PropertyGroup>
<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|AnyCPU'">
<PropertyGroup>
<WarningsAsErrors>NU1605;NU1701</WarningsAsErrors>
<NoWarn>NU1701;CS1591</NoWarn>
<DocumentationFile>bin\Debug\netstandard2.0\DotNetCore.CAP.Kafka.xml</DocumentationFile>
<DocumentationFile>bin\$(Configuration)\netstandard2.0\DotNetCore.CAP.Kafka.xml</DocumentationFile>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Confluent.Kafka" Version="0.11.3" />
<PackageReference Include="Confluent.Kafka" Version="0.11.4" />
</ItemGroup>
<ItemGroup>
......
using System;
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Threading;
using Confluent.Kafka;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
namespace DotNetCore.CAP.Kafka
{
public class ConnectionPool : IConnectionPool, IDisposable
{
private readonly ILogger<ConnectionPool> _logger;
private readonly Func<Producer> _activator;
private readonly ConcurrentQueue<Producer> _pool = new ConcurrentQueue<Producer>();
private readonly ConcurrentQueue<Producer> _pool;
private int _count;
private int _maxSize;
public ConnectionPool(KafkaOptions options)
public ConnectionPool(ILogger<ConnectionPool> logger, KafkaOptions options)
{
_logger = logger;
_pool = new ConcurrentQueue<Producer>();
_maxSize = options.ConnectionPoolSize;
_activator = CreateActivator(options);
ServersAddress = options.Servers;
_logger.LogDebug("Kafka configuration of CAP :\r\n {0}",
JsonConvert.SerializeObject(options.AsKafkaConfig(), Formatting.Indented));
}
public string ServersAddress { get; }
Producer IConnectionPool.Rent()
{
return Rent();
......@@ -35,7 +48,9 @@ namespace DotNetCore.CAP.Kafka
_maxSize = 0;
while (_pool.TryDequeue(out var context))
{
context.Dispose();
}
}
private static Func<Producer> CreateActivator(KafkaOptions options)
......

// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using Confluent.Kafka;
namespace DotNetCore.CAP.Kafka
{
public interface IConnectionPool
{
string ServersAddress { get; }
Producer Rent();
bool Return(Producer context);
......
using System;
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using System.Text;
using System.Threading.Tasks;
using DotNetCore.CAP.Internal;
using DotNetCore.CAP.Processor.States;
using Microsoft.Extensions.Logging;
namespace DotNetCore.CAP.Kafka
{
internal class PublishQueueExecutor : BasePublishQueueExecutor
internal class KafkaPublishMessageSender : BasePublishMessageSender
{
private readonly ConnectionPool _connectionPool;
private readonly IConnectionPool _connectionPool;
private readonly ILogger _logger;
public PublishQueueExecutor(
CapOptions options,
IStateChanger stateChanger,
ConnectionPool connectionPool,
ILogger<PublishQueueExecutor> logger)
: base(options, stateChanger, logger)
public KafkaPublishMessageSender(
CapOptions options, IStateChanger stateChanger, IStorageConnection connection,
IConnectionPool connectionPool, ILogger<KafkaPublishMessageSender> logger)
: base(logger, options, connection, stateChanger)
{
_logger = logger;
_connectionPool = connectionPool;
ServersAddress = _connectionPool.ServersAddress;
}
public override async Task<OperateResult> PublishAsync(string keyName, string content)
{
var producer = _connectionPool.Rent();
try
{
var contentBytes = Encoding.UTF8.GetBytes(content);
var message = await producer.ProduceAsync(keyName, null, contentBytes);
if (!message.Error.HasError)
if (message.Error.HasError)
{
_logger.LogDebug($"kafka topic message [{keyName}] has been published.");
return OperateResult.Success;
throw new PublisherSentFailedException(message.Error.ToString());
}
return OperateResult.Failed(new OperateError
{
Code = message.Error.Code.ToString(),
Description = message.Error.Reason
});
_logger.LogDebug($"kafka topic message [{keyName}] has been published.");
return OperateResult.Success;
}
catch (Exception ex)
{
_logger.LogError(ex,
$"An error occurred during sending the topic message to kafka. Topic:[{keyName}], Exception: {ex.Message}");
var wapperEx = new PublisherSentFailedException(ex.Message, ex);
return OperateResult.Failed(ex);
return OperateResult.Failed(wapperEx);
}
finally
{
var returned = _connectionPool.Return(producer);
if (!returned)
{
producer.Dispose();
}
}
}
}
......
using System;
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
......@@ -26,13 +29,19 @@ namespace DotNetCore.CAP.Kafka
public event EventHandler<LogMessageEventArgs> OnLog;
public string ServersAddress => _kafkaOptions.Servers;
public void Subscribe(IEnumerable<string> topics)
{
if (topics == null)
{
throw new ArgumentNullException(nameof(topics));
}
if (_consumerClient == null)
{
InitKafkaClient();
}
_consumerClient.Subscribe(topics);
}
......@@ -44,6 +53,7 @@ namespace DotNetCore.CAP.Kafka
cancellationToken.ThrowIfCancellationRequested();
_consumerClient.Poll(timeout);
}
// ReSharper disable once FunctionNeverReturns
}
......@@ -54,7 +64,7 @@ namespace DotNetCore.CAP.Kafka
public void Reject()
{
// Ignore, Kafka will not commit offset when not commit.
_consumerClient.Assign(_consumerClient.Assignment);
}
public void Dispose()
......@@ -66,16 +76,18 @@ namespace DotNetCore.CAP.Kafka
private void InitKafkaClient()
{
_kafkaOptions.MainConfig["group.id"] = _groupId;
lock (_kafkaOptions)
{
_kafkaOptions.MainConfig["group.id"] = _groupId;
var config = _kafkaOptions.AsKafkaConfig();
_consumerClient = new Consumer<Null, string>(config, null, StringDeserializer);
_consumerClient.OnConsumeError += ConsumerClient_OnConsumeError;
_consumerClient.OnMessage += ConsumerClient_OnMessage;
_consumerClient.OnError += ConsumerClient_OnError;
var config = _kafkaOptions.AsKafkaConfig();
_consumerClient = new Consumer<Null, string>(config, null, StringDeserializer);
_consumerClient.OnConsumeError += ConsumerClient_OnConsumeError;
_consumerClient.OnMessage += ConsumerClient_OnMessage;
_consumerClient.OnError += ConsumerClient_OnError;
}
}
private void ConsumerClient_OnConsumeError(object sender, Message e)
{
var message = e.Deserialize<Null, string>(null, StringDeserializer);
......
namespace DotNetCore.CAP.Kafka
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
namespace DotNetCore.CAP.Kafka
{
internal sealed class KafkaConsumerClientFactory : IConsumerClientFactory
{
......
using System;
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
// ReSharper disable once CheckNamespace
namespace DotNetCore.CAP
{
public class EFOptions
{
public const string DefaultSchema = "cap";
/// <summary>
/// Gets or sets the table name prefix to use when creating database objects.
/// </summary>
public string TableNamePrefix { get; set; } = DefaultSchema;
/// <summary>
/// EF db context type.
/// </summary>
......
using System;
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using DotNetCore.CAP.MySql;
using DotNetCore.CAP.Processor;
using Microsoft.EntityFrameworkCore;
......@@ -23,24 +26,34 @@ namespace DotNetCore.CAP
services.AddSingleton<IStorageConnection, MySqlStorageConnection>();
services.AddScoped<ICapPublisher, CapPublisher>();
services.AddScoped<ICallbackPublisher, CapPublisher>();
services.AddTransient<IAdditionalProcessor, DefaultAdditionalProcessor>();
services.AddTransient<ICollectProcessor, MySqlCollectProcessor>();
AddSingletionMySqlOptions(services);
}
private void AddSingletionMySqlOptions(IServiceCollection services)
{
var mysqlOptions = new MySqlOptions();
_configure(mysqlOptions);
if (mysqlOptions.DbContextType != null)
{
services.AddSingleton(x =>
{
using (var scope = x.CreateScope())
{
var provider = scope.ServiceProvider;
var dbContext = (DbContext) provider.GetService(mysqlOptions.DbContextType);
var dbContext = (DbContext)provider.GetService(mysqlOptions.DbContextType);
mysqlOptions.ConnectionString = dbContext.Database.GetDbConnection().ConnectionString;
return mysqlOptions;
}
});
}
else
{
services.AddSingleton(mysqlOptions);
}
}
}
}
\ No newline at end of file
// ReSharper disable once CheckNamespace
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
namespace DotNetCore.CAP
{
......@@ -8,7 +9,5 @@ namespace DotNetCore.CAP
/// Gets or sets the database's connection string that will be used to store database entities.
/// </summary>
public string ConnectionString { get; set; }
public string TableNamePrefix { get; set; } = "cap";
}
}
\ No newline at end of file
using System;
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using DotNetCore.CAP;
using Microsoft.EntityFrameworkCore;
......@@ -14,7 +17,11 @@ namespace Microsoft.Extensions.DependencyInjection
public static CapOptions UseMySql(this CapOptions options, Action<MySqlOptions> configure)
{
if (configure == null) throw new ArgumentNullException(nameof(configure));
if (configure == null)
{
throw new ArgumentNullException(nameof(configure));
}
options.RegisterExtension(new MySqlCapOptionsExtension(configure));
......@@ -24,18 +31,22 @@ namespace Microsoft.Extensions.DependencyInjection
public static CapOptions UseEntityFramework<TContext>(this CapOptions options)
where TContext : DbContext
{
return options.UseEntityFramework<TContext>(opt => { opt.DbContextType = typeof(TContext); });
return options.UseEntityFramework<TContext>(opt => { });
}
public static CapOptions UseEntityFramework<TContext>(this CapOptions options, Action<EFOptions> configure)
where TContext : DbContext
{
if (configure == null) throw new ArgumentNullException(nameof(configure));
var efOptions = new EFOptions {DbContextType = typeof(TContext)};
configure(efOptions);
options.RegisterExtension(new MySqlCapOptionsExtension(configure));
if (configure == null)
{
throw new ArgumentNullException(nameof(configure));
}
options.RegisterExtension(new MySqlCapOptionsExtension(x =>
{
configure(x);
x.DbContextType = typeof(TContext);
}));
return options;
}
......
using System;
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using System.Data;
using System.Threading.Tasks;
using Dapper;
......@@ -14,27 +17,31 @@ namespace DotNetCore.CAP.MySql
public class CapPublisher : CapPublisherBase, ICallbackPublisher
{
private readonly DbContext _dbContext;
private readonly ILogger _logger;
private readonly MySqlOptions _options;
public CapPublisher(IServiceProvider provider,
ILogger<CapPublisher> logger,
public CapPublisher(ILogger<CapPublisher> logger, IDispatcher dispatcher, IServiceProvider provider,
MySqlOptions options)
: base(logger, dispatcher)
{
ServiceProvider = provider;
_options = options;
_logger = logger;
if (_options.DbContextType == null) return;
if (_options.DbContextType == null)
{
return;
}
IsUsingEF = true;
_dbContext = (DbContext) ServiceProvider.GetService(_options.DbContextType);
}
public async Task PublishAsync(CapPublishedMessage message)
public async Task PublishCallbackAsync(CapPublishedMessage message)
{
using (var conn = new MySqlConnection(_options.ConnectionString))
{
await conn.ExecuteAsync(PrepareSql(), message);
var id = await conn.ExecuteScalarAsync<int>(PrepareSql(), message);
message.Id = id;
Enqueue(message);
}
}
......@@ -51,25 +58,20 @@ namespace DotNetCore.CAP.MySql
dbContextTransaction = _dbContext.Database.BeginTransaction(IsolationLevel.ReadCommitted);
dbTrans = dbContextTransaction.GetDbTransaction();
}
DbTransaction = dbTrans;
}
protected override void Execute(IDbConnection dbConnection, IDbTransaction dbTransaction,
protected override int Execute(IDbConnection dbConnection, IDbTransaction dbTransaction,
CapPublishedMessage message)
{
dbConnection.Execute(PrepareSql(), message, dbTransaction);
_logger.LogInformation("Published Message has been persisted in the database. name:" + message);
return dbConnection.ExecuteScalar<int>(PrepareSql(), message, dbTransaction);
}
protected override Task ExecuteAsync(IDbConnection dbConnection, IDbTransaction dbTransaction,
protected override async Task<int> ExecuteAsync(IDbConnection dbConnection, IDbTransaction dbTransaction,
CapPublishedMessage message)
{
dbConnection.ExecuteAsync(PrepareSql(), message, dbTransaction);
_logger.LogInformation("Published Message has been persisted in the database. name:" + message);
return Task.CompletedTask;
return await dbConnection.ExecuteScalarAsync<int>(PrepareSql(), message, dbTransaction);
}
#region private methods
......@@ -77,7 +79,7 @@ namespace DotNetCore.CAP.MySql
private string PrepareSql()
{
return
$"INSERT INTO `{_options.TableNamePrefix}.published` (`Name`,`Content`,`Retries`,`Added`,`ExpiresAt`,`StatusName`)VALUES(@Name,@Content,@Retries,@Added,@ExpiresAt,@StatusName)";
$"INSERT INTO `{_options.TableNamePrefix}.published` (`Name`,`Content`,`Retries`,`Added`,`ExpiresAt`,`StatusName`)VALUES(@Name,@Content,@Retries,@Added,@ExpiresAt,@StatusName);SELECT LAST_INSERT_ID()";
}
#endregion private methods
......
......@@ -8,16 +8,16 @@
<PackageTags>$(PackageTags);MySQL</PackageTags>
</PropertyGroup>
<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|AnyCPU'">
<DocumentationFile>bin\Debug\netstandard2.0\DotNetCore.CAP.MySql.xml</DocumentationFile>
<PropertyGroup>
<DocumentationFile>bin\$(Configuration)\netstandard2.0\DotNetCore.CAP.MySql.xml</DocumentationFile>
<NoWarn>1701;1702;1705;CS1591</NoWarn>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Dapper" Version="1.50.4" />
<PackageReference Include="Microsoft.EntityFrameworkCore" Version="2.0.1" />
<PackageReference Include="Microsoft.EntityFrameworkCore.Relational" Version="2.0.1" />
<PackageReference Include="MySqlConnector" Version="0.33.2" />
<PackageReference Include="Microsoft.EntityFrameworkCore" Version="2.0.2" />
<PackageReference Include="Microsoft.EntityFrameworkCore.Relational" Version="2.0.2" />
<PackageReference Include="MySqlConnector" Version="0.40.3" />
</ItemGroup>
<ItemGroup>
......
using DotNetCore.CAP.Models;
namespace DotNetCore.CAP.MySql
{
internal class FetchedMessage
{
public int MessageId { get; set; }
public MessageType MessageType { get; set; }
}
}
\ No newline at end of file
using System;
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using System.Threading.Tasks;
using Dapper;
using DotNetCore.CAP.Processor;
......@@ -7,7 +10,7 @@ using MySql.Data.MySqlClient;
namespace DotNetCore.CAP.MySql
{
internal class DefaultAdditionalProcessor : IAdditionalProcessor
internal class MySqlCollectProcessor : ICollectProcessor
{
private const int MaxBatch = 1000;
private readonly TimeSpan _delay = TimeSpan.FromSeconds(1);
......@@ -15,7 +18,7 @@ namespace DotNetCore.CAP.MySql
private readonly MySqlOptions _options;
private readonly TimeSpan _waitingInterval = TimeSpan.FromMinutes(5);
public DefaultAdditionalProcessor(ILogger<DefaultAdditionalProcessor> logger,
public MySqlCollectProcessor(ILogger<MySqlCollectProcessor> logger,
MySqlOptions mysqlOptions)
{
_logger = logger;
......@@ -24,8 +27,6 @@ namespace DotNetCore.CAP.MySql
public async Task ProcessAsync(ProcessingContext context)
{
_logger.LogDebug("Collecting expired entities.");
var tables = new[]
{
$"{_options.TableNamePrefix}.published",
......@@ -34,6 +35,8 @@ namespace DotNetCore.CAP.MySql
foreach (var table in tables)
{
_logger.LogDebug($"Collecting expired data from table [{table}].");
int removedCount;
do
{
......
using Dapper;
using DotNetCore.CAP.Models;
using MySql.Data.MySqlClient;
namespace DotNetCore.CAP.MySql
{
public class MySqlFetchedMessage : IFetchedMessage
{
private readonly MySqlOptions _options;
private readonly string _processId;
public MySqlFetchedMessage(int messageId, MessageType type, string processId, MySqlOptions options)
{
MessageId = messageId;
MessageType = type;
_processId = processId;
_options = options;
}
public int MessageId { get; }
public MessageType MessageType { get; }
public void RemoveFromQueue()
{
using (var connection = new MySqlConnection(_options.ConnectionString))
{
connection.Execute($"DELETE FROM `{_options.TableNamePrefix}.queue` WHERE `ProcessId`=@ProcessId"
, new { ProcessId = _processId });
}
}
public void Requeue()
{
using (var connection = new MySqlConnection(_options.ConnectionString))
{
connection.Execute($"UPDATE `{_options.TableNamePrefix}.queue` SET `ProcessId`=NULL WHERE `ProcessId`=@ProcessId"
, new { ProcessId = _processId });
}
}
public void Dispose()
{
// ignored
}
}
}
\ No newline at end of file
using System;
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using System.Collections.Generic;
using System.Data;
using System.Linq;
......@@ -28,9 +31,7 @@ set transaction isolation level read committed;
select count(Id) from `{0}.published` where StatusName = N'Succeeded';
select count(Id) from `{0}.received` where StatusName = N'Succeeded';
select count(Id) from `{0}.published` where StatusName = N'Failed';
select count(Id) from `{0}.received` where StatusName = N'Failed';
select count(Id) from `{0}.published` where StatusName in (N'Processing',N'Scheduled',N'Enqueued');
select count(Id) from `{0}.received` where StatusName in (N'Processing',N'Scheduled',N'Enqueued');", _prefix);
select count(Id) from `{0}.received` where StatusName = N'Failed';", _prefix);
var statistics = UseConnection(connection =>
{
......@@ -42,10 +43,8 @@ select count(Id) from `{0}.received` where StatusName in (N'Processing',N'Sched
stats.PublishedFailed = multi.ReadSingle<int>();
stats.ReceivedFailed = multi.ReadSingle<int>();
stats.PublishedProcessing = multi.ReadSingle<int>();
stats.ReceivedProcessing = multi.ReadSingle<int>();
}
return stats;
});
return statistics;
......@@ -70,17 +69,24 @@ select count(Id) from `{0}.received` where StatusName in (N'Processing',N'Sched
var tableName = queryDto.MessageType == MessageType.Publish ? "published" : "received";
var where = string.Empty;
if (!string.IsNullOrEmpty(queryDto.StatusName))
if (string.Equals(queryDto.StatusName, StatusName.Processing,
StringComparison.CurrentCultureIgnoreCase))
where += " and StatusName in (N'Processing',N'Scheduled',N'Enqueued')";
else
where += " and StatusName=@StatusName";
{
where += " and StatusName=@StatusName";
}
if (!string.IsNullOrEmpty(queryDto.Name))
{
where += " and Name=@Name";
}
if (!string.IsNullOrEmpty(queryDto.Group))
{
where += " and Group=@Group";
}
if (!string.IsNullOrEmpty(queryDto.Content))
{
where += " and Content like '%@Content%'";
}
var sqlQuery =
$"select * from `{_prefix}.{tableName}` where 1=1 {where} order by Added desc limit @Limit offset @Offset";
......@@ -101,11 +107,6 @@ select count(Id) from `{0}.received` where StatusName in (N'Processing',N'Sched
return UseConnection(conn => GetNumberOfMessage(conn, "published", StatusName.Failed));
}
public int PublishedProcessingCount()
{
return UseConnection(conn => GetNumberOfMessage(conn, "published", StatusName.Processing));
}
public int PublishedSucceededCount()
{
return UseConnection(conn => GetNumberOfMessage(conn, "published", StatusName.Succeeded));
......@@ -116,11 +117,6 @@ select count(Id) from `{0}.received` where StatusName in (N'Processing',N'Sched
return UseConnection(conn => GetNumberOfMessage(conn, "received", StatusName.Failed));
}
public int ReceivedProcessingCount()
{
return UseConnection(conn => GetNumberOfMessage(conn, "received", StatusName.Processing));
}
public int ReceivedSucceededCount()
{
return UseConnection(conn => GetNumberOfMessage(conn, "received", StatusName.Succeeded));
......@@ -128,9 +124,7 @@ select count(Id) from `{0}.received` where StatusName in (N'Processing',N'Sched
private int GetNumberOfMessage(IDbConnection connection, string tableName, string statusName)
{
var sqlQuery = statusName == StatusName.Processing
? $"select count(Id) from `{_prefix}.{tableName}` where StatusName in (N'Processing',N'Scheduled',N'Enqueued')"
: $"select count(Id) from `{_prefix}.{tableName}` where StatusName = @state";
var sqlQuery = $"select count(Id) from `{_prefix}.{tableName}` where StatusName = @state";
var count = connection.ExecuteScalar<int>(sqlQuery, new {state = statusName});
return count;
......@@ -179,7 +173,12 @@ select aggr.* from (
.ToDictionary(x => (string) x.Key, x => (int) x.Count);
foreach (var key in keyMaps.Keys)
if (!valuesMap.ContainsKey(key)) valuesMap.Add(key, 0);
{
if (!valuesMap.ContainsKey(key))
{
valuesMap.Add(key, 0);
}
}
var result = new Dictionary<DateTime, int>();
for (var i = 0; i < keyMaps.Count; i++)
......
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using System.Data;
using System.Threading;
......@@ -11,10 +14,10 @@ namespace DotNetCore.CAP.MySql
{
public class MySqlStorage : IStorage
{
private readonly CapOptions _capOptions;
private readonly IDbConnection _existingConnection = null;
private readonly ILogger _logger;
private readonly MySqlOptions _options;
private readonly CapOptions _capOptions;
public MySqlStorage(ILogger<MySqlStorage> logger,
MySqlOptions options,
......@@ -37,7 +40,11 @@ namespace DotNetCore.CAP.MySql
public async Task InitializeAsync(CancellationToken cancellationToken)
{
if (cancellationToken.IsCancellationRequested) return;
if (cancellationToken.IsCancellationRequested)
{
return;
}
var sql = CreateDbTablesScript(_options.TableNamePrefix);
using (var connection = new MySqlConnection(_options.ConnectionString))
{
......@@ -51,11 +58,7 @@ namespace DotNetCore.CAP.MySql
{
var batchSql =
$@"
CREATE TABLE IF NOT EXISTS `{prefix}.queue` (
`MessageId` int(11) NOT NULL,
`MessageType` tinyint(4) NOT NULL,
`ProcessId` varchar(50) DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
DROP TABLE IF EXISTS `{prefix}.queue`;
CREATE TABLE IF NOT EXISTS `{prefix}.received` (
`Id` int(127) NOT NULL AUTO_INCREMENT,
......@@ -102,7 +105,9 @@ CREATE TABLE IF NOT EXISTS `{prefix}.published` (
var connection = _existingConnection ?? new MySqlConnection(_options.ConnectionString);
if (connection.State == ConnectionState.Closed)
{
connection.Open();
}
return connection;
}
......@@ -115,7 +120,9 @@ CREATE TABLE IF NOT EXISTS `{prefix}.published` (
internal void ReleaseConnection(IDbConnection connection)
{
if (connection != null && !IsExistingConnection(connection))
{
connection.Dispose();
}
}
}
}
\ No newline at end of file
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
......@@ -13,8 +16,6 @@ namespace DotNetCore.CAP.MySql
private readonly CapOptions _capOptions;
private readonly string _prefix;
private const string DateTimeMaxValue = "9999-12-31 23:59:59";
public MySqlStorageConnection(MySqlOptions options, CapOptions capOptions)
{
_capOptions = capOptions;
......@@ -39,50 +40,32 @@ namespace DotNetCore.CAP.MySql
}
}
public Task<IFetchedMessage> FetchNextMessageAsync()
public async Task<IEnumerable<CapPublishedMessage>> GetPublishedMessagesOfNeedRetry()
{
var processId = ObjectId.GenerateNewStringId();
var sql = $@"
UPDATE `{_prefix}.queue` SET `ProcessId`=@ProcessId WHERE `ProcessId` IS NULL LIMIT 1;
SELECT `MessageId`,`MessageType` FROM `{_prefix}.queue` WHERE `ProcessId`=@ProcessId;";
return FetchNextMessageCoreAsync(sql, processId);
}
public async Task<CapPublishedMessage> GetNextPublishedMessageToBeEnqueuedAsync()
{
var sql = $@"
UPDATE `{_prefix}.published` SET Id=LAST_INSERT_ID(Id),ExpiresAt='{DateTimeMaxValue}' WHERE ExpiresAt IS NULL AND `StatusName` = '{StatusName.Scheduled}' LIMIT 1;
SELECT * FROM `{_prefix}.published` WHERE Id=LAST_INSERT_ID();";
var fourMinsAgo = DateTime.Now.AddMinutes(-4);
var sql =
$"SELECT * FROM `{_prefix}.published` WHERE `Retries`<{_capOptions.FailedRetryCount} AND `Added`<'{fourMinsAgo}' AND (`StatusName` = '{StatusName.Failed}' OR `StatusName` = '{StatusName.Scheduled}') LIMIT 200;";
using (var connection = new MySqlConnection(Options.ConnectionString))
{
connection.Execute("SELECT LAST_INSERT_ID(0)");
return await connection.QueryFirstOrDefaultAsync<CapPublishedMessage>(sql);
return await connection.QueryAsync<CapPublishedMessage>(sql);
}
}
public async Task<IEnumerable<CapPublishedMessage>> GetFailedPublishedMessages()
public async Task<int> StoreReceivedMessageAsync(CapReceivedMessage message)
{
var sql = $"SELECT * FROM `{_prefix}.published` WHERE `Retries`<{_capOptions.FailedRetryCount} AND `StatusName` = '{StatusName.Failed}' LIMIT 200;";
using (var connection = new MySqlConnection(Options.ConnectionString))
if (message == null)
{
return await connection.QueryAsync<CapPublishedMessage>(sql);
throw new ArgumentNullException(nameof(message));
}
}
public async Task StoreReceivedMessageAsync(CapReceivedMessage message)
{
if (message == null) throw new ArgumentNullException(nameof(message));
var sql = $@"
INSERT INTO `{_prefix}.received`(`Name`,`Group`,`Content`,`Retries`,`Added`,`ExpiresAt`,`StatusName`)
VALUES(@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName);";
VALUES(@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName);SELECT LAST_INSERT_ID();";
using (var connection = new MySqlConnection(Options.ConnectionString))
{
await connection.ExecuteAsync(sql, message);
return await connection.ExecuteScalarAsync<int>(sql, message);
}
}
......@@ -95,22 +78,11 @@ VALUES(@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName);";
}
}
public async Task<CapReceivedMessage> GetNextReceivedMessageToBeEnqueuedAsync()
public async Task<IEnumerable<CapReceivedMessage>> GetReceivedMessagesOfNeedRetry()
{
var sql = $@"
UPDATE `{_prefix}.received` SET Id=LAST_INSERT_ID(Id),ExpiresAt='{DateTimeMaxValue}' WHERE ExpiresAt IS NULL AND `StatusName` = '{StatusName.Scheduled}' LIMIT 1;
SELECT * FROM `{_prefix}.received` WHERE Id=LAST_INSERT_ID();";
using (var connection = new MySqlConnection(Options.ConnectionString))
{
connection.Execute("SELECT LAST_INSERT_ID(0)");
return await connection.QueryFirstOrDefaultAsync<CapReceivedMessage>(sql);
}
}
public async Task<IEnumerable<CapReceivedMessage>> GetFailedReceivedMessages()
{
var sql = $"SELECT * FROM `{_prefix}.received` WHERE `Retries`<{_capOptions.FailedRetryCount} AND `StatusName` = '{StatusName.Failed}' LIMIT 200;";
var fourMinsAgo = DateTime.Now.AddMinutes(-4);
var sql =
$"SELECT * FROM `{_prefix}.received` WHERE `Retries`<{_capOptions.FailedRetryCount} AND `Added`<'{fourMinsAgo}' AND (`StatusName` = '{StatusName.Failed}' OR `StatusName` = '{StatusName.Scheduled}') LIMIT 200;";
using (var connection = new MySqlConnection(Options.ConnectionString))
{
return await connection.QueryAsync<CapReceivedMessage>(sql);
......@@ -139,20 +111,6 @@ SELECT * FROM `{_prefix}.received` WHERE Id=LAST_INSERT_ID();";
}
}
private async Task<IFetchedMessage> FetchNextMessageCoreAsync(string sql, string processId)
{
FetchedMessage fetchedMessage;
using (var connection = new MySqlConnection(Options.ConnectionString))
{
fetchedMessage = await connection.QuerySingleOrDefaultAsync<FetchedMessage>(sql, new { ProcessId = processId });
}
if (fetchedMessage == null)
return null;
return new MySqlFetchedMessage(fetchedMessage.MessageId, fetchedMessage.MessageType, processId, Options);
}
public void Dispose()
{
}
......
using System;
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using System.Data;
using System.Threading.Tasks;
using Dapper;
......@@ -11,7 +14,7 @@ namespace DotNetCore.CAP.MySql
{
private readonly IDbConnection _dbConnection;
private readonly IDbTransaction _dbTransaction;
//private readonly IDbTransaction _dbTransaction;
private readonly string _prefix;
public MySqlStorageTransaction(MySqlStorageConnection connection)
......@@ -20,55 +23,45 @@ namespace DotNetCore.CAP.MySql
_prefix = options.TableNamePrefix;
_dbConnection = new MySqlConnection(options.ConnectionString);
_dbConnection.Open();
_dbTransaction = _dbConnection.BeginTransaction(IsolationLevel.ReadCommitted);
// _dbConnection.Open(); for performance
// _dbTransaction = _dbConnection.BeginTransaction(IsolationLevel.ReadCommitted);
}
public void UpdateMessage(CapPublishedMessage message)
{
if (message == null) throw new ArgumentNullException(nameof(message));
if (message == null)
{
throw new ArgumentNullException(nameof(message));
}
var sql =
$"UPDATE `{_prefix}.published` SET `Retries` = @Retries,`Content`= @Content,`ExpiresAt` = @ExpiresAt,`StatusName`=@StatusName WHERE `Id`=@Id;";
_dbConnection.Execute(sql, message, _dbTransaction);
_dbConnection.Execute(sql, message);
}
public void UpdateMessage(CapReceivedMessage message)
{
if (message == null) throw new ArgumentNullException(nameof(message));
if (message == null)
{
throw new ArgumentNullException(nameof(message));
}
var sql =
$"UPDATE `{_prefix}.received` SET `Retries` = @Retries,`Content`= @Content,`ExpiresAt` = @ExpiresAt,`StatusName`=@StatusName WHERE `Id`=@Id;";
_dbConnection.Execute(sql, message, _dbTransaction);
}
public void EnqueueMessage(CapPublishedMessage message)
{
if (message == null) throw new ArgumentNullException(nameof(message));
var sql = $"INSERT INTO `{_prefix}.queue`(`MessageId`,`MessageType`) values(@MessageId,@MessageType);";
_dbConnection.Execute(sql, new CapQueue {MessageId = message.Id, MessageType = MessageType.Publish},
_dbTransaction);
}
public void EnqueueMessage(CapReceivedMessage message)
{
if (message == null) throw new ArgumentNullException(nameof(message));
var sql = $"INSERT INTO `{_prefix}.queue`(`MessageId`,`MessageType`) values(@MessageId,@MessageType);";
_dbConnection.Execute(sql, new CapQueue {MessageId = message.Id, MessageType = MessageType.Subscribe},
_dbTransaction);
_dbConnection.Execute(sql, message);
}
public Task CommitAsync()
{
_dbTransaction.Commit();
_dbConnection.Close();
_dbConnection.Dispose();
//_dbTransaction.Commit();
return Task.CompletedTask;
}
public void Dispose()
{
_dbTransaction.Dispose();
//_dbTransaction.Dispose();
_dbConnection.Dispose();
}
}
......
using System;
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
// ReSharper disable once CheckNamespace
namespace DotNetCore.CAP
......
using System;
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using DotNetCore.CAP;
using Microsoft.EntityFrameworkCore;
......@@ -14,7 +17,10 @@ namespace Microsoft.Extensions.DependencyInjection
public static CapOptions UsePostgreSql(this CapOptions options, Action<PostgreSqlOptions> configure)
{
if (configure == null) throw new ArgumentNullException(nameof(configure));
if (configure == null)
{
throw new ArgumentNullException(nameof(configure));
}
options.RegisterExtension(new PostgreSqlCapOptionsExtension(configure));
......@@ -24,18 +30,22 @@ namespace Microsoft.Extensions.DependencyInjection
public static CapOptions UseEntityFramework<TContext>(this CapOptions options)
where TContext : DbContext
{
return options.UseEntityFramework<TContext>(opt => { opt.DbContextType = typeof(TContext); });
return options.UseEntityFramework<TContext>(opt => { });
}
public static CapOptions UseEntityFramework<TContext>(this CapOptions options, Action<EFOptions> configure)
where TContext : DbContext
{
if (configure == null) throw new ArgumentNullException(nameof(configure));
var efOptions = new EFOptions {DbContextType = typeof(TContext)};
configure(efOptions);
options.RegisterExtension(new PostgreSqlCapOptionsExtension(configure));
if (configure == null)
{
throw new ArgumentNullException(nameof(configure));
}
options.RegisterExtension(new PostgreSqlCapOptionsExtension(x =>
{
configure(x);
x.DbContextType = typeof(TContext);
}));
return options;
}
......
using System;
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using DotNetCore.CAP.PostgreSql;
using DotNetCore.CAP.Processor;
using Microsoft.EntityFrameworkCore;
......@@ -23,24 +26,33 @@ namespace DotNetCore.CAP
services.AddSingleton<IStorageConnection, PostgreSqlStorageConnection>();
services.AddScoped<ICapPublisher, CapPublisher>();
services.AddScoped<ICallbackPublisher, CapPublisher>();
services.AddTransient<IAdditionalProcessor, DefaultAdditionalProcessor>();
services.AddTransient<ICollectProcessor, PostgreSqlCollectProcessor>();
AddSingletonPostgreSqlOptions(services);
}
private void AddSingletonPostgreSqlOptions(IServiceCollection services)
{
var postgreSqlOptions = new PostgreSqlOptions();
_configure(postgreSqlOptions);
if (postgreSqlOptions.DbContextType != null)
{
services.AddSingleton(x =>
{
using (var scope = x.CreateScope())
{
var provider = scope.ServiceProvider;
var dbContext = (DbContext)provider.GetService(postgreSqlOptions.DbContextType);
var dbContext = (DbContext) provider.GetService(postgreSqlOptions.DbContextType);
postgreSqlOptions.ConnectionString = dbContext.Database.GetDbConnection().ConnectionString;
return postgreSqlOptions;
}
});
}
else
{
services.AddSingleton(postgreSqlOptions);
}
}
}
}
\ No newline at end of file
// ReSharper disable once CheckNamespace
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
namespace DotNetCore.CAP
{
......
using System;
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using System.Data;
using System.Threading.Tasks;
using Dapper;
......@@ -14,16 +17,14 @@ namespace DotNetCore.CAP.PostgreSql
public class CapPublisher : CapPublisherBase, ICallbackPublisher
{
private readonly DbContext _dbContext;
private readonly ILogger _logger;
private readonly PostgreSqlOptions _options;
public CapPublisher(IServiceProvider provider,
ILogger<CapPublisher> logger,
PostgreSqlOptions options)
public CapPublisher(ILogger<CapPublisher> logger, IDispatcher dispatcher,
IServiceProvider provider, PostgreSqlOptions options)
: base(logger, dispatcher)
{
ServiceProvider = provider;
_options = options;
_logger = logger;
if (_options.DbContextType != null)
{
......@@ -31,12 +32,14 @@ namespace DotNetCore.CAP.PostgreSql
_dbContext = (DbContext) ServiceProvider.GetService(_options.DbContextType);
}
}
public async Task PublishAsync(CapPublishedMessage message)
public async Task PublishCallbackAsync(CapPublishedMessage message)
{
using (var conn = new NpgsqlConnection(_options.ConnectionString))
{
await conn.ExecuteAsync(PrepareSql(), message);
var id = await conn.ExecuteScalarAsync<int>(PrepareSql(), message);
message.Id = id;
Enqueue(message);
}
}
......@@ -53,25 +56,20 @@ namespace DotNetCore.CAP.PostgreSql
dbContextTransaction = _dbContext.Database.BeginTransaction(IsolationLevel.ReadCommitted);
dbTrans = dbContextTransaction.GetDbTransaction();
}
DbTransaction = dbTrans;
}
protected override void Execute(IDbConnection dbConnection, IDbTransaction dbTransaction,
protected override int Execute(IDbConnection dbConnection, IDbTransaction dbTransaction,
CapPublishedMessage message)
{
dbConnection.Execute(PrepareSql(), message, dbTransaction);
_logger.LogInformation("Published Message has been persisted in the database. name:" + message);
return dbConnection.ExecuteScalar<int>(PrepareSql(), message, dbTransaction);
}
protected override Task ExecuteAsync(IDbConnection dbConnection, IDbTransaction dbTransaction,
protected override Task<int> ExecuteAsync(IDbConnection dbConnection, IDbTransaction dbTransaction,
CapPublishedMessage message)
{
dbConnection.ExecuteAsync(PrepareSql(), message, dbTransaction);
_logger.LogInformation("Published Message has been persisted in the database. name:" + message);
return Task.CompletedTask;
return dbConnection.ExecuteScalarAsync<int>(PrepareSql(), message, dbTransaction);
}
#region private methods
......@@ -79,7 +77,7 @@ namespace DotNetCore.CAP.PostgreSql
private string PrepareSql()
{
return
$"INSERT INTO \"{_options.Schema}\".\"published\" (\"Name\",\"Content\",\"Retries\",\"Added\",\"ExpiresAt\",\"StatusName\")VALUES(@Name,@Content,@Retries,@Added,@ExpiresAt,@StatusName)";
$"INSERT INTO \"{_options.Schema}\".\"published\" (\"Name\",\"Content\",\"Retries\",\"Added\",\"ExpiresAt\",\"StatusName\")VALUES(@Name,@Content,@Retries,@Added,@ExpiresAt,@StatusName) RETURNING \"Id\";";
}
#endregion private methods
......
......@@ -8,16 +8,16 @@
<PackageTags>$(PackageTags);PostgreSQL</PackageTags>
</PropertyGroup>
<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|AnyCPU'">
<DocumentationFile>bin\Debug\netstandard2.0\DotNetCore.CAP.PostgreSql.xml</DocumentationFile>
<PropertyGroup>
<DocumentationFile>bin\$(Configuration)\netstandard2.0\DotNetCore.CAP.PostgreSql.xml</DocumentationFile>
<NoWarn>1701;1702;1705;CS1591</NoWarn>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Dapper" Version="1.50.4" />
<PackageReference Include="Microsoft.EntityFrameworkCore" Version="2.0.1" />
<PackageReference Include="Microsoft.EntityFrameworkCore.Relational" Version="2.0.1" />
<PackageReference Include="Npgsql" Version="3.2.6" />
<PackageReference Include="Microsoft.EntityFrameworkCore" Version="2.0.2" />
<PackageReference Include="Microsoft.EntityFrameworkCore.Relational" Version="2.0.2" />
<PackageReference Include="Npgsql" Version="3.2.7" />
</ItemGroup>
<ItemGroup>
......
using DotNetCore.CAP.Models;
namespace DotNetCore.CAP.PostgreSql
{
internal class FetchedMessage
{
public int MessageId { get; set; }
public MessageType MessageType { get; set; }
}
}
\ No newline at end of file
using System;
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using System.Threading.Tasks;
using Dapper;
using DotNetCore.CAP.Processor;
......@@ -7,7 +10,7 @@ using Npgsql;
namespace DotNetCore.CAP.PostgreSql
{
internal class DefaultAdditionalProcessor : IAdditionalProcessor
internal class PostgreSqlCollectProcessor : ICollectProcessor
{
private const int MaxBatch = 1000;
......@@ -21,7 +24,7 @@ namespace DotNetCore.CAP.PostgreSql
private readonly PostgreSqlOptions _options;
private readonly TimeSpan _waitingInterval = TimeSpan.FromMinutes(5);
public DefaultAdditionalProcessor(ILogger<DefaultAdditionalProcessor> logger,
public PostgreSqlCollectProcessor(ILogger<PostgreSqlCollectProcessor> logger,
PostgreSqlOptions sqlServerOptions)
{
_logger = logger;
......@@ -30,10 +33,10 @@ namespace DotNetCore.CAP.PostgreSql
public async Task ProcessAsync(ProcessingContext context)
{
_logger.LogDebug("Collecting expired entities.");
foreach (var table in Tables)
{
_logger.LogDebug($"Collecting expired data from table [{_options.Schema}].[{table}].");
var removedCount = 0;
do
{
......
using System;
using System.Data;
using System.Threading;
using Dapper;
using DotNetCore.CAP.Models;
namespace DotNetCore.CAP.PostgreSql
{
public class PostgreSqlFetchedMessage : IFetchedMessage
{
private static readonly TimeSpan KeepAliveInterval = TimeSpan.FromMinutes(1);
private readonly IDbConnection _connection;
private readonly object _lockObject = new object();
private readonly Timer _timer;
private readonly IDbTransaction _transaction;
public PostgreSqlFetchedMessage(int messageId,
MessageType type,
IDbConnection connection,
IDbTransaction transaction)
{
MessageId = messageId;
MessageType = type;
_connection = connection;
_transaction = transaction;
_timer = new Timer(ExecuteKeepAliveQuery, null, KeepAliveInterval, KeepAliveInterval);
}
public int MessageId { get; }
public MessageType MessageType { get; }
public void RemoveFromQueue()
{
lock (_lockObject)
{
_transaction.Commit();
}
}
public void Requeue()
{
lock (_lockObject)
{
_transaction.Rollback();
}
}
public void Dispose()
{
lock (_lockObject)
{
_timer?.Dispose();
_transaction.Dispose();
_connection.Dispose();
}
}
private void ExecuteKeepAliveQuery(object obj)
{
lock (_lockObject)
{
try
{
_connection?.Execute("SELECT 1", _transaction);
}
catch
{
// ignored
}
}
}
}
}
\ No newline at end of file
using System;
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using System.Collections.Generic;
using System.Data;
using System.Linq;
......@@ -27,9 +30,7 @@ namespace DotNetCore.CAP.PostgreSql
select count(""Id"") from ""{0}"".""published"" where ""StatusName"" = N'Succeeded';
select count(""Id"") from ""{0}"".""received"" where ""StatusName"" = N'Succeeded';
select count(""Id"") from ""{0}"".""published"" where ""StatusName"" = N'Failed';
select count(""Id"") from ""{0}"".""received"" where ""StatusName"" = N'Failed';
select count(""Id"") from ""{0}"".""published"" where ""StatusName"" in (N'Processing',N'Scheduled',N'Enqueued');
select count(""Id"") from ""{0}"".""received"" where ""StatusName"" in (N'Processing',N'Scheduled',N'Enqueued');",
select count(""Id"") from ""{0}"".""received"" where ""StatusName"" = N'Failed';",
_options.Schema);
var statistics = UseConnection(connection =>
......@@ -42,10 +43,8 @@ select count(""Id"") from ""{0}"".""received"" where ""StatusName"" in (N'Proce
stats.PublishedFailed = multi.ReadSingle<int>();
stats.ReceivedFailed = multi.ReadSingle<int>();
stats.PublishedProcessing = multi.ReadSingle<int>();
stats.ReceivedProcessing = multi.ReadSingle<int>();
}
return stats;
});
return statistics;
......@@ -57,17 +56,24 @@ select count(""Id"") from ""{0}"".""received"" where ""StatusName"" in (N'Proce
var where = string.Empty;
if (!string.IsNullOrEmpty(queryDto.StatusName))
if (string.Equals(queryDto.StatusName, StatusName.Processing,
StringComparison.CurrentCultureIgnoreCase))
where += " and \"StatusName\" in (N'Processing',N'Scheduled',N'Enqueued')";
else
where += " and Lower(\"StatusName\") = Lower(@StatusName)";
{
where += " and Lower(\"StatusName\") = Lower(@StatusName)";
}
if (!string.IsNullOrEmpty(queryDto.Name))
{
where += " and Lower(\"Name\") = Lower(@Name)";
}
if (!string.IsNullOrEmpty(queryDto.Group))
{
where += " and Lower(\"Group\") = Lower(@Group)";
}
if (!string.IsNullOrEmpty(queryDto.Content))
{
where += " and \"Content\" ILike '%@Content%'";
}
var sqlQuery =
$"select * from \"{_options.Schema}\".\"{tableName}\" where 1=1 {where} order by \"Added\" desc offset @Offset limit @Limit";
......@@ -88,11 +94,6 @@ select count(""Id"") from ""{0}"".""received"" where ""StatusName"" in (N'Proce
return UseConnection(conn => GetNumberOfMessage(conn, "published", StatusName.Failed));
}
public int PublishedProcessingCount()
{
return UseConnection(conn => GetNumberOfMessage(conn, "published", StatusName.Processing));
}
public int PublishedSucceededCount()
{
return UseConnection(conn => GetNumberOfMessage(conn, "published", StatusName.Succeeded));
......@@ -103,11 +104,6 @@ select count(""Id"") from ""{0}"".""received"" where ""StatusName"" in (N'Proce
return UseConnection(conn => GetNumberOfMessage(conn, "received", StatusName.Failed));
}
public int ReceivedProcessingCount()
{
return UseConnection(conn => GetNumberOfMessage(conn, "received", StatusName.Processing));
}
public int ReceivedSucceededCount()
{
return UseConnection(conn => GetNumberOfMessage(conn, "received", StatusName.Succeeded));
......@@ -129,11 +125,10 @@ select count(""Id"") from ""{0}"".""received"" where ""StatusName"" in (N'Proce
private int GetNumberOfMessage(IDbConnection connection, string tableName, string statusName)
{
var sqlQuery = statusName == StatusName.Processing
? $"select count(\"Id\") from \"{_options.Schema}\".\"{tableName}\" where \"StatusName\" in (N'Processing',N'Scheduled',N'Enqueued')"
: $"select count(\"Id\") from \"{_options.Schema}\".\"{tableName}\" where Lower(\"StatusName\") = Lower(@state)";
var sqlQuery =
$"select count(\"Id\") from \"{_options.Schema}\".\"{tableName}\" where Lower(\"StatusName\") = Lower(@state)";
var count = connection.ExecuteScalar<int>(sqlQuery, new { state = statusName });
var count = connection.ExecuteScalar<int>(sqlQuery, new {state = statusName});
return count;
}
......@@ -175,12 +170,17 @@ with aggr as (
)
select ""Key"",""Count"" from aggr where ""Key""= Any(@keys);";
var valuesMap = connection.Query(sqlQuery,new { keys = keyMaps.Keys.ToList(), statusName })
.ToList()
.ToDictionary(x => (string)x.Key, x => (int)x.Count);
var valuesMap = connection.Query(sqlQuery, new {keys = keyMaps.Keys.ToList(), statusName})
.ToList()
.ToDictionary(x => (string) x.Key, x => (int) x.Count);
foreach (var key in keyMaps.Keys)
if (!valuesMap.ContainsKey(key)) valuesMap.Add(key, 0);
{
if (!valuesMap.ContainsKey(key))
{
valuesMap.Add(key, 0);
}
}
var result = new Dictionary<DateTime, int>();
for (var i = 0; i < keyMaps.Count; i++)
......
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using System.Data;
using System.Threading;
......@@ -11,9 +14,9 @@ namespace DotNetCore.CAP.PostgreSql
{
public class PostgreSqlStorage : IStorage
{
private readonly CapOptions _capOptions;
private readonly IDbConnection _existingConnection = null;
private readonly ILogger _logger;
private readonly CapOptions _capOptions;
private readonly PostgreSqlOptions _options;
public PostgreSqlStorage(ILogger<PostgreSqlStorage> logger,
......@@ -37,7 +40,10 @@ namespace DotNetCore.CAP.PostgreSql
public async Task InitializeAsync(CancellationToken cancellationToken)
{
if (cancellationToken.IsCancellationRequested) return;
if (cancellationToken.IsCancellationRequested)
{
return;
}
var sql = CreateDbTablesScript(_options.Schema);
......@@ -45,6 +51,7 @@ namespace DotNetCore.CAP.PostgreSql
{
await connection.ExecuteAsync(sql);
}
_logger.LogDebug("Ensuring all create database tables script are applied.");
}
......@@ -68,7 +75,9 @@ namespace DotNetCore.CAP.PostgreSql
var connection = _existingConnection ?? new NpgsqlConnection(_options.ConnectionString);
if (connection.State == ConnectionState.Closed)
{
connection.Open();
}
return connection;
}
......@@ -81,7 +90,9 @@ namespace DotNetCore.CAP.PostgreSql
internal void ReleaseConnection(IDbConnection connection)
{
if (connection != null && !IsExistingConnection(connection))
{
connection.Dispose();
}
}
protected virtual string CreateDbTablesScript(string schema)
......@@ -89,10 +100,7 @@ namespace DotNetCore.CAP.PostgreSql
var batchSql = $@"
CREATE SCHEMA IF NOT EXISTS ""{schema}"";
CREATE TABLE IF NOT EXISTS ""{schema}"".""queue""(
""MessageId"" int NOT NULL ,
""MessageType"" int NOT NULL
);
DROP TABLE IF EXISTS ""{schema}"".""queue"";
CREATE TABLE IF NOT EXISTS ""{schema}"".""received""(
""Id"" SERIAL PRIMARY KEY NOT NULL,
......
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using System.Collections.Generic;
using System.Data;
using System.Threading.Tasks;
using Dapper;
using DotNetCore.CAP.Infrastructure;
......@@ -36,44 +38,31 @@ namespace DotNetCore.CAP.PostgreSql
}
}
public Task<IFetchedMessage> FetchNextMessageAsync()
{
var sql = $@"DELETE FROM ""{Options.Schema}"".""queue"" WHERE ""MessageId"" = (SELECT ""MessageId"" FROM ""{Options.Schema}"".""queue"" FOR UPDATE SKIP LOCKED LIMIT 1) RETURNING *;";
return FetchNextMessageCoreAsync(sql);
}
public async Task<CapPublishedMessage> GetNextPublishedMessageToBeEnqueuedAsync()
public async Task<IEnumerable<CapPublishedMessage>> GetPublishedMessagesOfNeedRetry()
{
var fourMinsAgo = DateTime.Now.AddMinutes(-4);
var sql =
$"SELECT * FROM \"{Options.Schema}\".\"published\" WHERE \"StatusName\" = '{StatusName.Scheduled}' FOR UPDATE SKIP LOCKED LIMIT 1;";
$"SELECT * FROM \"{Options.Schema}\".\"published\" WHERE \"Retries\"<{_capOptions.FailedRetryCount} AND \"Added\"<'{fourMinsAgo}' AND (\"StatusName\"='{StatusName.Failed}' OR \"StatusName\"='{StatusName.Scheduled}') LIMIT 200;";
using (var connection = new NpgsqlConnection(Options.ConnectionString))
{
return await connection.QueryFirstOrDefaultAsync<CapPublishedMessage>(sql);
return await connection.QueryAsync<CapPublishedMessage>(sql);
}
}
public async Task<IEnumerable<CapPublishedMessage>> GetFailedPublishedMessages()
public async Task<int> StoreReceivedMessageAsync(CapReceivedMessage message)
{
var sql =
$"SELECT * FROM \"{Options.Schema}\".\"published\" WHERE \"Retries\"<{_capOptions.FailedRetryCount} AND \"StatusName\"='{StatusName.Failed}' LIMIT 200;";
using (var connection = new NpgsqlConnection(Options.ConnectionString))
if (message == null)
{
return await connection.QueryAsync<CapPublishedMessage>(sql);
throw new ArgumentNullException(nameof(message));
}
}
public async Task StoreReceivedMessageAsync(CapReceivedMessage message)
{
if (message == null) throw new ArgumentNullException(nameof(message));
var sql =
$"INSERT INTO \"{Options.Schema}\".\"received\"(\"Name\",\"Group\",\"Content\",\"Retries\",\"Added\",\"ExpiresAt\",\"StatusName\")VALUES(@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName);";
$"INSERT INTO \"{Options.Schema}\".\"received\"(\"Name\",\"Group\",\"Content\",\"Retries\",\"Added\",\"ExpiresAt\",\"StatusName\")VALUES(@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName) RETURNING \"Id\";";
using (var connection = new NpgsqlConnection(Options.ConnectionString))
{
await connection.ExecuteAsync(sql, message);
return await connection.ExecuteScalarAsync<int>(sql, message);
}
}
......@@ -86,20 +75,11 @@ namespace DotNetCore.CAP.PostgreSql
}
}
public async Task<CapReceivedMessage> GetNextReceivedMessageToBeEnqueuedAsync()
{
var sql =
$"SELECT * FROM \"{Options.Schema}\".\"received\" WHERE \"StatusName\" = '{StatusName.Scheduled}' FOR UPDATE SKIP LOCKED LIMIT 1;";
using (var connection = new NpgsqlConnection(Options.ConnectionString))
{
return await connection.QueryFirstOrDefaultAsync<CapReceivedMessage>(sql);
}
}
public async Task<IEnumerable<CapReceivedMessage>> GetFailedReceivedMessages()
public async Task<IEnumerable<CapReceivedMessage>> GetReceivedMessagesOfNeedRetry()
{
var fourMinsAgo = DateTime.Now.AddMinutes(-4);
var sql =
$"SELECT * FROM \"{Options.Schema}\".\"received\" WHERE \"Retries\"<{_capOptions.FailedRetryCount} AND \"StatusName\"='{StatusName.Failed}' LIMIT 200;";
$"SELECT * FROM \"{Options.Schema}\".\"received\" WHERE \"Retries\"<{_capOptions.FailedRetryCount} AND \"Added\"<'{fourMinsAgo}' AND (\"StatusName\"='{StatusName.Failed}' OR \"StatusName\"='{StatusName.Scheduled}') LIMIT 200;";
using (var connection = new NpgsqlConnection(Options.ConnectionString))
{
return await connection.QueryAsync<CapReceivedMessage>(sql);
......@@ -131,35 +111,5 @@ namespace DotNetCore.CAP.PostgreSql
return connection.Execute(sql) > 0;
}
}
private async Task<IFetchedMessage> FetchNextMessageCoreAsync(string sql, object args = null)
{
//here don't use `using` to dispose
var connection = new NpgsqlConnection(Options.ConnectionString);
await connection.OpenAsync();
var transaction = connection.BeginTransaction(IsolationLevel.ReadCommitted);
FetchedMessage fetchedMessage;
try
{
fetchedMessage = await connection.QueryFirstOrDefaultAsync<FetchedMessage>(sql, args, transaction);
}
catch (NpgsqlException)
{
transaction.Dispose();
connection.Dispose();
throw;
}
if (fetchedMessage == null)
{
transaction.Rollback();
transaction.Dispose();
connection.Dispose();
return null;
}
return new PostgreSqlFetchedMessage(fetchedMessage.MessageId, fetchedMessage.MessageType, connection,
transaction);
}
}
}
\ No newline at end of file
using System;
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using System.Data;
using System.Threading.Tasks;
using Dapper;
......@@ -26,7 +29,10 @@ namespace DotNetCore.CAP.PostgreSql
public void UpdateMessage(CapPublishedMessage message)
{
if (message == null) throw new ArgumentNullException(nameof(message));
if (message == null)
{
throw new ArgumentNullException(nameof(message));
}
var sql =
$@"UPDATE ""{
......@@ -37,7 +43,10 @@ namespace DotNetCore.CAP.PostgreSql
public void UpdateMessage(CapReceivedMessage message)
{
if (message == null) throw new ArgumentNullException(nameof(message));
if (message == null)
{
throw new ArgumentNullException(nameof(message));
}
var sql =
$@"UPDATE ""{
......@@ -46,9 +55,24 @@ namespace DotNetCore.CAP.PostgreSql
_dbConnection.Execute(sql, message, _dbTransaction);
}
public Task CommitAsync()
{
_dbTransaction.Commit();
return Task.CompletedTask;
}
public void Dispose()
{
_dbTransaction.Dispose();
_dbConnection.Dispose();
}
public void EnqueueMessage(CapPublishedMessage message)
{
if (message == null) throw new ArgumentNullException(nameof(message));
if (message == null)
{
throw new ArgumentNullException(nameof(message));
}
var sql = $@"INSERT INTO ""{_schema}"".""queue"" values(@MessageId,@MessageType);";
_dbConnection.Execute(sql, new CapQueue {MessageId = message.Id, MessageType = MessageType.Publish},
......@@ -57,23 +81,14 @@ namespace DotNetCore.CAP.PostgreSql
public void EnqueueMessage(CapReceivedMessage message)
{
if (message == null) throw new ArgumentNullException(nameof(message));
if (message == null)
{
throw new ArgumentNullException(nameof(message));
}
var sql = $@"INSERT INTO ""{_schema}"".""queue"" values(@MessageId,@MessageType);";
_dbConnection.Execute(sql, new CapQueue {MessageId = message.Id, MessageType = MessageType.Subscribe},
_dbTransaction);
}
public Task CommitAsync()
{
_dbTransaction.Commit();
return Task.CompletedTask;
}
public void Dispose()
{
_dbTransaction.Dispose();
_dbConnection.Dispose();
}
}
}
\ No newline at end of file
using System;
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using DotNetCore.CAP;
// ReSharper disable once CheckNamespace
......@@ -13,7 +16,10 @@ namespace Microsoft.Extensions.DependencyInjection
public static CapOptions UseRabbitMQ(this CapOptions options, Action<RabbitMQOptions> configure)
{
if (configure == null) throw new ArgumentNullException(nameof(configure));
if (configure == null)
{
throw new ArgumentNullException(nameof(configure));
}
options.RegisterExtension(new RabbitMQCapOptionsExtension(configure));
......
// ReSharper disable once CheckNamespace
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
// ReSharper disable once CheckNamespace
namespace DotNetCore.CAP
{
public class RabbitMQOptions
......@@ -56,7 +58,7 @@ namespace DotNetCore.CAP
/// <summary>
/// Topic exchange name when declare a topic exchange.
/// </summary>
public string TopicExchangeName { get; set; } = DefaultExchangeName;
public string ExchangeName { get; set; } = DefaultExchangeName;
/// <summary>
/// Timeout setting for connection attempts (in milliseconds).
......
using System;
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using DotNetCore.CAP.RabbitMQ;
using Microsoft.Extensions.DependencyInjection;
......@@ -24,8 +27,8 @@ namespace DotNetCore.CAP
services.AddSingleton<IConsumerClientFactory, RabbitMQConsumerClientFactory>();
services.AddSingleton<IConnectionChannelPool, ConnectionChannelPool>();
services.AddSingleton<IQueueExecutor, PublishQueueExecutor>();
services.AddSingleton<IPublishExecutor, PublishQueueExecutor>();
services.AddSingleton<IPublishExecutor, RabbitMQPublishMessageSender>();
services.AddSingleton<IPublishMessageSender, RabbitMQPublishMessageSender>();
}
}
}
\ No newline at end of file
using DotNetCore.CAP.Abstractions;
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using DotNetCore.CAP.Abstractions;
// ReSharper disable once CheckNamespace
namespace DotNetCore.CAP
......
......@@ -8,8 +8,8 @@
<PackageTags>$(PackageTags);RabbitMQ</PackageTags>
</PropertyGroup>
<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|AnyCPU'">
<DocumentationFile>bin\Debug\netstandard2.0\DotNetCore.CAP.RabbitMQ.xml</DocumentationFile>
<PropertyGroup>
<DocumentationFile>bin\$(Configuration)\netstandard2.0\DotNetCore.CAP.RabbitMQ.xml</DocumentationFile>
<NoWarn>1701;1702;1705;CS1591</NoWarn>
</PropertyGroup>
......
using System;
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Threading;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using RabbitMQ.Client;
namespace DotNetCore.CAP.RabbitMQ
......@@ -12,19 +16,24 @@ namespace DotNetCore.CAP.RabbitMQ
private const int DefaultPoolSize = 15;
private readonly Func<IConnection> _connectionActivator;
private readonly ILogger<ConnectionChannelPool> _logger;
private readonly ConcurrentQueue<IModel> _pool = new ConcurrentQueue<IModel>();
private readonly ConcurrentQueue<IModel> _pool;
private IConnection _connection;
private int _count;
private int _maxSize;
public ConnectionChannelPool(ILogger<ConnectionChannelPool> logger,
RabbitMQOptions options)
public ConnectionChannelPool(ILogger<ConnectionChannelPool> logger, RabbitMQOptions options)
{
_logger = logger;
_maxSize = DefaultPoolSize;
_pool = new ConcurrentQueue<IModel>();
_connectionActivator = CreateConnection(options);
HostAddress = options.HostName + ":" + options.Port;
Exchange = options.ExchangeName;
_logger.LogDebug("RabbitMQ configuration of CAP :\r\n {0}",
JsonConvert.SerializeObject(options, Formatting.Indented));
}
IModel IConnectionChannelPool.Rent()
......@@ -37,10 +46,17 @@ namespace DotNetCore.CAP.RabbitMQ
return Return(connection);
}
public string HostAddress { get; }
public string Exchange { get; }
public IConnection GetConnection()
{
if (_connection != null && _connection.IsOpen)
{
return _connection;
}
_connection = _connectionActivator();
_connection.ConnectionShutdown += RabbitMQ_ConnectionShutdown;
return _connection;
......@@ -51,7 +67,9 @@ namespace DotNetCore.CAP.RabbitMQ
_maxSize = 0;
while (_pool.TryDequeue(out var context))
{
context.Dispose();
}
}
private static Func<IConnection> CreateConnection(RabbitMQOptions options)
......@@ -73,7 +91,7 @@ namespace DotNetCore.CAP.RabbitMQ
private void RabbitMQ_ConnectionShutdown(object sender, ShutdownEventArgs e)
{
_logger.LogWarning($"RabbitMQ client connection closed! {e}");
_logger.LogWarning($"RabbitMQ client connection closed! --> {e.ReplyText}");
}
public virtual IModel Rent()
......
using RabbitMQ.Client;
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using RabbitMQ.Client;
namespace DotNetCore.CAP.RabbitMQ
{
public interface IConnectionChannelPool
{
string HostAddress { get; }
string Exchange { get; }
IConnection GetConnection();
IModel Rent();
......
using System;
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using System.Text;
using System.Threading.Tasks;
using DotNetCore.CAP.Internal;
using DotNetCore.CAP.Processor.States;
using Microsoft.Extensions.Logging;
using RabbitMQ.Client;
namespace DotNetCore.CAP.RabbitMQ
{
internal sealed class PublishQueueExecutor : BasePublishQueueExecutor
internal sealed class RabbitMQPublishMessageSender : BasePublishMessageSender
{
private readonly IConnectionChannelPool _connectionChannelPool;
private readonly ILogger _logger;
private readonly RabbitMQOptions _rabbitMQOptions;
private readonly string _exchange;
public PublishQueueExecutor(ILogger<PublishQueueExecutor> logger, CapOptions options,
RabbitMQOptions rabbitMQOptions, IConnectionChannelPool connectionChannelPool, IStateChanger stateChanger)
: base(options, stateChanger, logger)
public RabbitMQPublishMessageSender(ILogger<RabbitMQPublishMessageSender> logger, CapOptions options,
IStorageConnection connection, IConnectionChannelPool connectionChannelPool, IStateChanger stateChanger)
: base(logger, options, connection, stateChanger)
{
_logger = logger;
_connectionChannelPool = connectionChannelPool;
_rabbitMQOptions = rabbitMQOptions;
_exchange = _connectionChannelPool.Exchange;
ServersAddress = _connectionChannelPool.HostAddress;
}
public override Task<OperateResult> PublishAsync(string keyName, string content)
......@@ -28,12 +33,8 @@ namespace DotNetCore.CAP.RabbitMQ
try
{
var body = Encoding.UTF8.GetBytes(content);
channel.ExchangeDeclare(_rabbitMQOptions.TopicExchangeName, RabbitMQOptions.ExchangeType, true);
channel.BasicPublish(_rabbitMQOptions.TopicExchangeName,
keyName,
null,
body);
channel.ExchangeDeclare(_exchange, RabbitMQOptions.ExchangeType, true);
channel.BasicPublish(_exchange, keyName, null, body);
_logger.LogDebug($"RabbitMQ topic message [{keyName}] has been published.");
......@@ -41,21 +42,22 @@ namespace DotNetCore.CAP.RabbitMQ
}
catch (Exception ex)
{
_logger.LogError(
$"RabbitMQ topic message [{keyName}] has been raised an exception of sending. the exception is: {ex.Message}");
var wapperEx = new PublisherSentFailedException(ex.Message, ex);
var errors = new OperateError
{
Code = ex.HResult.ToString(),
Description = ex.Message
};
return Task.FromResult(OperateResult.Failed(ex,
new OperateError
{
Code = ex.HResult.ToString(),
Description = ex.Message
}));
return Task.FromResult(OperateResult.Failed(wapperEx, errors));
}
finally
{
var returned = _connectionChannelPool.Return(channel);
if (!returned)
{
channel.Dispose();
}
}
}
}
......
using System;
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
......@@ -13,9 +16,9 @@ namespace DotNetCore.CAP.RabbitMQ
private readonly string _exchageName;
private readonly string _queueName;
private readonly RabbitMQOptions _rabbitMQOptions;
private IModel _channel;
private IConnection _connection;
private IModel _channel;
private ulong _deliveryTag;
public RabbitMQConsumerClient(string queueName,
......@@ -25,7 +28,7 @@ namespace DotNetCore.CAP.RabbitMQ
_queueName = queueName;
_connectionChannelPool = connectionChannelPool;
_rabbitMQOptions = options;
_exchageName = options.TopicExchangeName;
_exchageName = options.ExchangeName;
InitClient();
}
......@@ -34,12 +37,19 @@ namespace DotNetCore.CAP.RabbitMQ
public event EventHandler<LogMessageEventArgs> OnLog;
public string ServersAddress => _rabbitMQOptions.HostName;
public void Subscribe(IEnumerable<string> topics)
{
if (topics == null) throw new ArgumentNullException(nameof(topics));
if (topics == null)
{
throw new ArgumentNullException(nameof(topics));
}
foreach (var topic in topics)
{
_channel.QueueBind(_queueName, _exchageName, topic);
}
}
public void Listening(TimeSpan timeout, CancellationToken cancellationToken)
......@@ -58,6 +68,7 @@ namespace DotNetCore.CAP.RabbitMQ
cancellationToken.ThrowIfCancellationRequested();
cancellationToken.WaitHandle.WaitOne(timeout);
}
// ReSharper disable once FunctionNeverReturns
}
......@@ -82,14 +93,15 @@ namespace DotNetCore.CAP.RabbitMQ
_connection = _connectionChannelPool.GetConnection();
_channel = _connection.CreateModel();
_channel.ExchangeDeclare(
_exchageName,
RabbitMQOptions.ExchangeType,
true);
var arguments = new Dictionary<string, object> {
{ "x-message-ttl", _rabbitMQOptions.QueueMessageExpires }
var arguments = new Dictionary<string, object>
{
{"x-message-ttl", _rabbitMQOptions.QueueMessageExpires}
};
_channel.QueueDeclare(_queueName, true, false, false, arguments);
}
......@@ -100,7 +112,7 @@ namespace DotNetCore.CAP.RabbitMQ
{
var args = new LogMessageEventArgs
{
LogType = MqLogType.ConsumerCancelled,
LogType = MqLogType.ConsumerCancelled,
Reason = e.ConsumerTag
};
OnLog?.Invoke(sender, args);
......@@ -143,7 +155,7 @@ namespace DotNetCore.CAP.RabbitMQ
var args = new LogMessageEventArgs
{
LogType = MqLogType.ConsumerShutdown,
Reason = e.ToString()
Reason = e.ReplyText
};
OnLog?.Invoke(sender, args);
}
......
namespace DotNetCore.CAP.RabbitMQ
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
namespace DotNetCore.CAP.RabbitMQ
{
internal sealed class RabbitMQConsumerClientFactory : IConsumerClientFactory
{
......
using System;
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
// ReSharper disable once CheckNamespace
namespace DotNetCore.CAP
......
using System;
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using DotNetCore.CAP;
using Microsoft.EntityFrameworkCore;
......@@ -14,7 +17,10 @@ namespace Microsoft.Extensions.DependencyInjection
public static CapOptions UseSqlServer(this CapOptions options, Action<SqlServerOptions> configure)
{
if (configure == null) throw new ArgumentNullException(nameof(configure));
if (configure == null)
{
throw new ArgumentNullException(nameof(configure));
}
options.RegisterExtension(new SqlServerCapOptionsExtension(configure));
......@@ -24,18 +30,22 @@ namespace Microsoft.Extensions.DependencyInjection
public static CapOptions UseEntityFramework<TContext>(this CapOptions options)
where TContext : DbContext
{
return options.UseEntityFramework<TContext>(opt => { opt.DbContextType = typeof(TContext); });
return options.UseEntityFramework<TContext>(opt => { });
}
public static CapOptions UseEntityFramework<TContext>(this CapOptions options, Action<EFOptions> configure)
where TContext : DbContext
{
if (configure == null) throw new ArgumentNullException(nameof(configure));
var efOptions = new EFOptions {DbContextType = typeof(TContext)};
configure(efOptions);
options.RegisterExtension(new SqlServerCapOptionsExtension(configure));
if (configure == null)
{
throw new ArgumentNullException(nameof(configure));
}
options.RegisterExtension(new SqlServerCapOptionsExtension(x =>
{
configure(x);
x.DbContextType = typeof(TContext);
}));
return options;
}
......
using System;
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using DotNetCore.CAP.Processor;
using DotNetCore.CAP.SqlServer;
using Microsoft.EntityFrameworkCore;
......@@ -23,7 +26,8 @@ namespace DotNetCore.CAP
services.AddSingleton<IStorageConnection, SqlServerStorageConnection>();
services.AddScoped<ICapPublisher, CapPublisher>();
services.AddScoped<ICallbackPublisher, CapPublisher>();
services.AddTransient<IAdditionalProcessor, DefaultAdditionalProcessor>();
services.AddTransient<ICollectProcessor, SqlServerCollectProcessor>();
AddSqlServerOptions(services);
}
......@@ -34,18 +38,22 @@ namespace DotNetCore.CAP
_configure(sqlServerOptions);
if (sqlServerOptions.DbContextType != null)
{
services.AddSingleton(x =>
{
using (var scope = x.CreateScope())
{
var provider = scope.ServiceProvider;
var dbContext = (DbContext) provider.GetService(sqlServerOptions.DbContextType);
var dbContext = (DbContext)provider.GetService(sqlServerOptions.DbContextType);
sqlServerOptions.ConnectionString = dbContext.Database.GetDbConnection().ConnectionString;
return sqlServerOptions;
}
});
}
else
{
services.AddSingleton(sqlServerOptions);
}
}
}
}
\ No newline at end of file
// ReSharper disable once CheckNamespace
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
namespace DotNetCore.CAP
{
......
using System;
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using System.Data;
using System.Data.SqlClient;
using System.Threading.Tasks;
......@@ -14,28 +17,31 @@ namespace DotNetCore.CAP.SqlServer
public class CapPublisher : CapPublisherBase, ICallbackPublisher
{
private readonly DbContext _dbContext;
private readonly ILogger _logger;
private readonly SqlServerOptions _options;
public CapPublisher(IServiceProvider provider,
ILogger<CapPublisher> logger,
SqlServerOptions options)
public CapPublisher(ILogger<CapPublisher> logger, IDispatcher dispatcher,
IServiceProvider provider, SqlServerOptions options)
: base(logger, dispatcher)
{
ServiceProvider = provider;
_logger = logger;
_options = options;
if (_options.DbContextType == null) return;
if (_options.DbContextType == null)
{
return;
}
IsUsingEF = true;
_dbContext = (DbContext)ServiceProvider.GetService(_options.DbContextType);
_dbContext = (DbContext) ServiceProvider.GetService(_options.DbContextType);
}
public async Task PublishAsync(CapPublishedMessage message)
public async Task PublishCallbackAsync(CapPublishedMessage message)
{
using (var conn = new SqlConnection(_options.ConnectionString))
{
await conn.ExecuteAsync(PrepareSql(), message);
var id = await conn.ExecuteScalarAsync<int>(PrepareSql(), message);
message.Id = id;
Enqueue(message);
}
}
......@@ -52,25 +58,20 @@ namespace DotNetCore.CAP.SqlServer
dbContextTransaction = _dbContext.Database.BeginTransaction(IsolationLevel.ReadCommitted);
dbTrans = dbContextTransaction.GetDbTransaction();
}
DbTransaction = dbTrans;
}
protected override void Execute(IDbConnection dbConnection, IDbTransaction dbTransaction,
protected override int Execute(IDbConnection dbConnection, IDbTransaction dbTransaction,
CapPublishedMessage message)
{
dbConnection.Execute(PrepareSql(), message, dbTransaction);
_logger.LogInformation("published message has been persisted to the database. name:" + message);
return dbConnection.ExecuteScalar<int>(PrepareSql(), message, dbTransaction);
}
protected override Task ExecuteAsync(IDbConnection dbConnection, IDbTransaction dbTransaction,
protected override Task<int> ExecuteAsync(IDbConnection dbConnection, IDbTransaction dbTransaction,
CapPublishedMessage message)
{
dbConnection.ExecuteAsync(PrepareSql(), message, dbTransaction);
_logger.LogInformation("published message has been persisted to the database. name:" + message);
return Task.CompletedTask;
return dbConnection.ExecuteScalarAsync<int>(PrepareSql(), message, dbTransaction);
}
#region private methods
......@@ -78,7 +79,7 @@ namespace DotNetCore.CAP.SqlServer
private string PrepareSql()
{
return
$"INSERT INTO {_options.Schema}.[Published] ([Name],[Content],[Retries],[Added],[ExpiresAt],[StatusName])VALUES(@Name,@Content,@Retries,@Added,@ExpiresAt,@StatusName)";
$"INSERT INTO {_options.Schema}.[Published] ([Name],[Content],[Retries],[Added],[ExpiresAt],[StatusName])VALUES(@Name,@Content,@Retries,@Added,@ExpiresAt,@StatusName);SELECT SCOPE_IDENTITY();";
}
#endregion private methods
......
......@@ -8,16 +8,16 @@
<PackageTags>$(PackageTags);SQL Server</PackageTags>
</PropertyGroup>
<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|AnyCPU'">
<DocumentationFile>bin\Debug\netstandard2.0\DotNetCore.CAP.SqlServer.xml</DocumentationFile>
<PropertyGroup>
<DocumentationFile>bin\$(Configuration)\netstandard2.0\DotNetCore.CAP.SqlServer.xml</DocumentationFile>
<NoWarn>1701;1702;1705;CS1591</NoWarn>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Dapper" Version="1.50.4" />
<PackageReference Include="Microsoft.EntityFrameworkCore" Version="2.0.1" />
<PackageReference Include="Microsoft.EntityFrameworkCore.Relational" Version="2.0.1" />
<PackageReference Include="System.Data.SqlClient" Version="4.4.2" />
<PackageReference Include="Microsoft.EntityFrameworkCore" Version="2.0.2" />
<PackageReference Include="Microsoft.EntityFrameworkCore.Relational" Version="2.0.2" />
<PackageReference Include="System.Data.SqlClient" Version="4.4.3" />
</ItemGroup>
<ItemGroup>
......
using DotNetCore.CAP.Models;
namespace DotNetCore.CAP.SqlServer
{
internal class FetchedMessage
{
public int MessageId { get; set; }
public MessageType MessageType { get; set; }
}
}
\ No newline at end of file
using System;
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using System.Data.SqlClient;
using System.Threading.Tasks;
using Dapper;
......@@ -7,7 +10,7 @@ using Microsoft.Extensions.Logging;
namespace DotNetCore.CAP.SqlServer
{
public class DefaultAdditionalProcessor : IAdditionalProcessor
public class SqlServerCollectProcessor : ICollectProcessor
{
private const int MaxBatch = 1000;
......@@ -21,7 +24,7 @@ namespace DotNetCore.CAP.SqlServer
private readonly SqlServerOptions _options;
private readonly TimeSpan _waitingInterval = TimeSpan.FromMinutes(5);
public DefaultAdditionalProcessor(ILogger<DefaultAdditionalProcessor> logger,
public SqlServerCollectProcessor(ILogger<SqlServerCollectProcessor> logger,
SqlServerOptions sqlServerOptions)
{
_logger = logger;
......@@ -30,10 +33,10 @@ namespace DotNetCore.CAP.SqlServer
public async Task ProcessAsync(ProcessingContext context)
{
_logger.LogDebug("Collecting expired entities.");
foreach (var table in Tables)
{
_logger.LogDebug($"Collecting expired data from table [{_options.Schema}].[{table}].");
int removedCount;
do
{
......
using System;
using System.Data;
using System.Threading;
using Dapper;
using DotNetCore.CAP.Models;
namespace DotNetCore.CAP.SqlServer
{
public class SqlServerFetchedMessage : IFetchedMessage
{
private static readonly TimeSpan KeepAliveInterval = TimeSpan.FromMinutes(1);
private readonly IDbConnection _connection;
private readonly object _lockObject = new object();
private readonly Timer _timer;
private readonly IDbTransaction _transaction;
public SqlServerFetchedMessage(int messageId,
MessageType type,
IDbConnection connection,
IDbTransaction transaction)
{
MessageId = messageId;
MessageType = type;
_connection = connection;
_transaction = transaction;
_timer = new Timer(ExecuteKeepAliveQuery, null, KeepAliveInterval, KeepAliveInterval);
}
public int MessageId { get; }
public MessageType MessageType { get; }
public void RemoveFromQueue()
{
lock (_lockObject)
{
_transaction.Commit();
}
}
public void Requeue()
{
lock (_lockObject)
{
_transaction.Rollback();
}
}
public void Dispose()
{
lock (_lockObject)
{
_timer?.Dispose();
_transaction.Dispose();
_connection.Dispose();
}
}
private void ExecuteKeepAliveQuery(object obj)
{
lock (_lockObject)
{
try
{
_connection?.Execute("SELECT 1", _transaction);
}
catch
{
// ignored
}
}
}
}
}
\ No newline at end of file
using System;
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using System.Collections.Generic;
using System.Data;
using System.Linq;
......@@ -28,9 +31,7 @@ set transaction isolation level read committed;
select count(Id) from [{0}].Published with (nolock) where StatusName = N'Succeeded';
select count(Id) from [{0}].Received with (nolock) where StatusName = N'Succeeded';
select count(Id) from [{0}].Published with (nolock) where StatusName = N'Failed';
select count(Id) from [{0}].Received with (nolock) where StatusName = N'Failed';
select count(Id) from [{0}].Published with (nolock) where StatusName in (N'Processing',N'Scheduled',N'Enqueued');
select count(Id) from [{0}].Received with (nolock) where StatusName in (N'Processing',N'Scheduled',N'Enqueued');",
select count(Id) from [{0}].Received with (nolock) where StatusName = N'Failed';",
_options.Schema);
var statistics = UseConnection(connection =>
......@@ -43,10 +44,8 @@ select count(Id) from [{0}].Received with (nolock) where StatusName in (N'Proces
stats.PublishedFailed = multi.ReadSingle<int>();
stats.ReceivedFailed = multi.ReadSingle<int>();
stats.PublishedProcessing = multi.ReadSingle<int>();
stats.ReceivedProcessing = multi.ReadSingle<int>();
}
return stats;
});
return statistics;
......@@ -71,17 +70,24 @@ select count(Id) from [{0}].Received with (nolock) where StatusName in (N'Proces
var tableName = queryDto.MessageType == MessageType.Publish ? "Published" : "Received";
var where = string.Empty;
if (!string.IsNullOrEmpty(queryDto.StatusName))
if (string.Equals(queryDto.StatusName, StatusName.Processing,
StringComparison.CurrentCultureIgnoreCase))
where += " and statusname in (N'Processing',N'Scheduled',N'Enqueued')";
else
where += " and statusname=@StatusName";
{
where += " and statusname=@StatusName";
}
if (!string.IsNullOrEmpty(queryDto.Name))
{
where += " and name=@Name";
}
if (!string.IsNullOrEmpty(queryDto.Group))
{
where += " and group=@Group";
}
if (!string.IsNullOrEmpty(queryDto.Content))
{
where += " and content like '%@Content%'";
}
var sqlQuery =
$"select * from [{_options.Schema}].{tableName} where 1=1 {where} order by Added desc offset @Offset rows fetch next @Limit rows only";
......@@ -102,11 +108,6 @@ select count(Id) from [{0}].Received with (nolock) where StatusName in (N'Proces
return UseConnection(conn => GetNumberOfMessage(conn, "Published", StatusName.Failed));
}
public int PublishedProcessingCount()
{
return UseConnection(conn => GetNumberOfMessage(conn, "Published", StatusName.Processing));
}
public int PublishedSucceededCount()
{
return UseConnection(conn => GetNumberOfMessage(conn, "Published", StatusName.Succeeded));
......@@ -117,11 +118,6 @@ select count(Id) from [{0}].Received with (nolock) where StatusName in (N'Proces
return UseConnection(conn => GetNumberOfMessage(conn, "Received", StatusName.Failed));
}
public int ReceivedProcessingCount()
{
return UseConnection(conn => GetNumberOfMessage(conn, "Received", StatusName.Processing));
}
public int ReceivedSucceededCount()
{
return UseConnection(conn => GetNumberOfMessage(conn, "Received", StatusName.Succeeded));
......@@ -129,9 +125,8 @@ select count(Id) from [{0}].Received with (nolock) where StatusName in (N'Proces
private int GetNumberOfMessage(IDbConnection connection, string tableName, string statusName)
{
var sqlQuery = statusName == StatusName.Processing
? $"select count(Id) from [{_options.Schema}].{tableName} with (nolock) where StatusName in (N'Processing',N'Scheduled',N'Enqueued')"
: $"select count(Id) from [{_options.Schema}].{tableName} with (nolock) where StatusName = @state";
var sqlQuery =
$"select count(Id) from [{_options.Schema}].{tableName} with (nolock) where StatusName = @state";
var count = connection.ExecuteScalar<int>(sqlQuery, new {state = statusName});
return count;
......@@ -182,7 +177,12 @@ select [Key], [Count] from aggr with (nolock) where [Key] in @keys;";
.ToDictionary(x => (string) x.Key, x => (int) x.Count);
foreach (var key in keyMaps.Keys)
if (!valuesMap.ContainsKey(key)) valuesMap.Add(key, 0);
{
if (!valuesMap.ContainsKey(key))
{
valuesMap.Add(key, 0);
}
}
var result = new Dictionary<DateTime, int>();
for (var i = 0; i < keyMaps.Count; i++)
......
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using System.Data;
using System.Data.SqlClient;
......@@ -11,9 +14,9 @@ namespace DotNetCore.CAP.SqlServer
{
public class SqlServerStorage : IStorage
{
private readonly CapOptions _capOptions;
private readonly IDbConnection _existingConnection = null;
private readonly ILogger _logger;
private readonly CapOptions _capOptions;
private readonly SqlServerOptions _options;
public SqlServerStorage(ILogger<SqlServerStorage> logger,
......@@ -37,7 +40,10 @@ namespace DotNetCore.CAP.SqlServer
public async Task InitializeAsync(CancellationToken cancellationToken)
{
if (cancellationToken.IsCancellationRequested) return;
if (cancellationToken.IsCancellationRequested)
{
return;
}
var sql = CreateDbTablesScript(_options.Schema);
......@@ -45,6 +51,7 @@ namespace DotNetCore.CAP.SqlServer
{
await connection.ExecuteAsync(sql);
}
_logger.LogDebug("Ensuring all create database tables script are applied.");
}
......@@ -54,15 +61,12 @@ namespace DotNetCore.CAP.SqlServer
$@"
IF NOT EXISTS (SELECT * FROM sys.schemas WHERE name = '{schema}')
BEGIN
EXEC('CREATE SCHEMA {schema}')
EXEC('CREATE SCHEMA [{schema}]')
END;
IF OBJECT_ID(N'[{schema}].[Queue]',N'U') IS NULL
IF OBJECT_ID(N'[{schema}].[Queue]',N'U') IS NOT NULL
BEGIN
CREATE TABLE [{schema}].[Queue](
[MessageId] [int] NOT NULL,
[MessageType] [tinyint] NOT NULL
) ON [PRIMARY]
DROP TABLE [{schema}].[Queue];
END;
IF OBJECT_ID(N'[{schema}].[Received]',N'U') IS NULL
......@@ -122,7 +126,9 @@ END;";
var connection = _existingConnection ?? new SqlConnection(_options.ConnectionString);
if (connection.State == ConnectionState.Closed)
{
connection.Open();
}
return connection;
}
......@@ -135,7 +141,9 @@ END;";
internal void ReleaseConnection(IDbConnection connection)
{
if (connection != null && !IsExistingConnection(connection))
{
connection.Dispose();
}
}
}
}
\ No newline at end of file
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using System.Collections.Generic;
using System.Data;
using System.Data.SqlClient;
using System.Threading.Tasks;
using Dapper;
......@@ -36,49 +38,32 @@ namespace DotNetCore.CAP.SqlServer
}
}
public Task<IFetchedMessage> FetchNextMessageAsync()
{
var sql = $@"
DELETE TOP (1)
FROM [{Options.Schema}].[Queue] WITH (readpast, updlock, rowlock)
OUTPUT DELETED.MessageId,DELETED.[MessageType];";
return FetchNextMessageCoreAsync(sql);
}
public async Task<CapPublishedMessage> GetNextPublishedMessageToBeEnqueuedAsync()
public async Task<IEnumerable<CapPublishedMessage>> GetPublishedMessagesOfNeedRetry()
{
var fourMinsAgo = DateTime.Now.AddMinutes(-4);
var sql =
$"SELECT TOP (1) * FROM [{Options.Schema}].[Published] WITH (readpast) WHERE StatusName = '{StatusName.Scheduled}'";
$"SELECT TOP (200) * FROM [{Options.Schema}].[Published] WITH (readpast) WHERE Retries<{_capOptions.FailedRetryCount} AND Added<'{fourMinsAgo}' AND (StatusName = '{StatusName.Failed}' OR StatusName = '{StatusName.Scheduled}')";
using (var connection = new SqlConnection(Options.ConnectionString))
{
return await connection.QueryFirstOrDefaultAsync<CapPublishedMessage>(sql);
return await connection.QueryAsync<CapPublishedMessage>(sql);
}
}
public async Task<IEnumerable<CapPublishedMessage>> GetFailedPublishedMessages()
public async Task<int> StoreReceivedMessageAsync(CapReceivedMessage message)
{
var sql =
$"SELECT TOP (200) * FROM [{Options.Schema}].[Published] WITH (readpast) WHERE Retries<{_capOptions.FailedRetryCount} AND StatusName = '{StatusName.Failed}'";
using (var connection = new SqlConnection(Options.ConnectionString))
if (message == null)
{
return await connection.QueryAsync<CapPublishedMessage>(sql);
throw new ArgumentNullException(nameof(message));
}
}
public async Task StoreReceivedMessageAsync(CapReceivedMessage message)
{
if (message == null) throw new ArgumentNullException(nameof(message));
var sql = $@"
INSERT INTO [{Options.Schema}].[Received]([Name],[Group],[Content],[Retries],[Added],[ExpiresAt],[StatusName])
VALUES(@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName);";
VALUES(@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName);SELECT SCOPE_IDENTITY();";
using (var connection = new SqlConnection(Options.ConnectionString))
{
await connection.ExecuteAsync(sql, message);
return await connection.ExecuteScalarAsync<int>(sql, message);
}
}
......@@ -91,20 +76,11 @@ VALUES(@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName);";
}
}
public async Task<CapReceivedMessage> GetNextReceivedMessageToBeEnqueuedAsync()
{
var sql =
$"SELECT TOP (1) * FROM [{Options.Schema}].[Received] WITH (readpast) WHERE StatusName = '{StatusName.Scheduled}'";
using (var connection = new SqlConnection(Options.ConnectionString))
{
return await connection.QueryFirstOrDefaultAsync<CapReceivedMessage>(sql);
}
}
public async Task<IEnumerable<CapReceivedMessage>> GetFailedReceivedMessages()
public async Task<IEnumerable<CapReceivedMessage>> GetReceivedMessagesOfNeedRetry()
{
var fourMinsAgo = DateTime.Now.AddMinutes(-4);
var sql =
$"SELECT TOP (200) * FROM [{Options.Schema}].[Received] WITH (readpast) WHERE Retries<{_capOptions.FailedRetryCount} AND StatusName = '{StatusName.Failed}'";
$"SELECT TOP (200) * FROM [{Options.Schema}].[Received] WITH (readpast) WHERE Retries<{_capOptions.FailedRetryCount} AND Added<'{fourMinsAgo}' AND (StatusName = '{StatusName.Failed}' OR StatusName = '{StatusName.Scheduled}')";
using (var connection = new SqlConnection(Options.ConnectionString))
{
return await connection.QueryAsync<CapReceivedMessage>(sql);
......@@ -136,35 +112,5 @@ VALUES(@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName);";
public void Dispose()
{
}
private async Task<IFetchedMessage> FetchNextMessageCoreAsync(string sql, object args = null)
{
//here don't use `using` to dispose
var connection = new SqlConnection(Options.ConnectionString);
await connection.OpenAsync();
var transaction = connection.BeginTransaction(IsolationLevel.ReadCommitted);
FetchedMessage fetchedMessage;
try
{
fetchedMessage = await connection.QueryFirstOrDefaultAsync<FetchedMessage>(sql, args, transaction);
}
catch (SqlException)
{
transaction.Dispose();
connection.Dispose();
throw;
}
if (fetchedMessage == null)
{
transaction.Rollback();
transaction.Dispose();
connection.Dispose();
return null;
}
return new SqlServerFetchedMessage(fetchedMessage.MessageId, fetchedMessage.MessageType, connection,
transaction);
}
}
}
\ No newline at end of file
using System;
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using System.Data;
using System.Data.SqlClient;
using System.Threading.Tasks;
......@@ -26,7 +29,10 @@ namespace DotNetCore.CAP.SqlServer
public void UpdateMessage(CapPublishedMessage message)
{
if (message == null) throw new ArgumentNullException(nameof(message));
if (message == null)
{
throw new ArgumentNullException(nameof(message));
}
var sql =
$"UPDATE [{_schema}].[Published] SET [Retries] = @Retries,[Content] = @Content,[ExpiresAt] = @ExpiresAt,[StatusName]=@StatusName WHERE Id=@Id;";
......@@ -35,16 +41,34 @@ namespace DotNetCore.CAP.SqlServer
public void UpdateMessage(CapReceivedMessage message)
{
if (message == null) throw new ArgumentNullException(nameof(message));
if (message == null)
{
throw new ArgumentNullException(nameof(message));
}
var sql =
$"UPDATE [{_schema}].[Received] SET [Retries] = @Retries,[Content] = @Content,[ExpiresAt] = @ExpiresAt,[StatusName]=@StatusName WHERE Id=@Id;";
_dbConnection.Execute(sql, message, _dbTransaction);
}
public Task CommitAsync()
{
_dbTransaction.Commit();
return Task.CompletedTask;
}
public void Dispose()
{
_dbTransaction.Dispose();
_dbConnection.Dispose();
}
public void EnqueueMessage(CapPublishedMessage message)
{
if (message == null) throw new ArgumentNullException(nameof(message));
if (message == null)
{
throw new ArgumentNullException(nameof(message));
}
var sql = $"INSERT INTO [{_schema}].[Queue] values(@MessageId,@MessageType);";
_dbConnection.Execute(sql, new CapQueue {MessageId = message.Id, MessageType = MessageType.Publish},
......@@ -53,23 +77,14 @@ namespace DotNetCore.CAP.SqlServer
public void EnqueueMessage(CapReceivedMessage message)
{
if (message == null) throw new ArgumentNullException(nameof(message));
if (message == null)
{
throw new ArgumentNullException(nameof(message));
}
var sql = $"INSERT INTO [{_schema}].[Queue] values(@MessageId,@MessageType);";
_dbConnection.Execute(sql, new CapQueue {MessageId = message.Id, MessageType = MessageType.Subscribe},
_dbTransaction);
}
public Task CommitAsync()
{
_dbTransaction.Commit();
return Task.CompletedTask;
}
public void Dispose()
{
_dbTransaction.Dispose();
_dbConnection.Dispose();
}
}
}
\ No newline at end of file
using System;
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using System.Data;
using System.Diagnostics;
using System.Threading.Tasks;
using DotNetCore.CAP.Diagnostics;
using DotNetCore.CAP.Infrastructure;
using DotNetCore.CAP.Models;
using DotNetCore.CAP.Processor;
using Microsoft.Extensions.Logging;
namespace DotNetCore.CAP.Abstractions
{
public abstract class CapPublisherBase : ICapPublisher, IDisposable
{
private readonly IDispatcher _dispatcher;
private readonly ILogger _logger;
// diagnostics listener
// ReSharper disable once InconsistentNaming
private static readonly DiagnosticListener s_diagnosticListener =
new DiagnosticListener(CapDiagnosticListenerExtensions.DiagnosticListenerName);
protected CapPublisherBase(ILogger<CapPublisherBase> logger, IDispatcher dispatcher)
{
_logger = logger;
_dispatcher = dispatcher;
}
protected IDbConnection DbConnection { get; set; }
protected IDbTransaction DbTransaction { get; set; }
protected bool IsCapOpenedTrans { get; set; }
......@@ -21,9 +40,7 @@ namespace DotNetCore.CAP.Abstractions
CheckIsUsingEF(name);
PrepareConnectionForEF();
var content = Serialize(contentObj, callbackName);
PublishWithTrans(name, content);
PublishWithTrans(name, contentObj, callbackName);
}
public Task PublishAsync<T>(string name, T contentObj, string callbackName = null)
......@@ -31,9 +48,7 @@ namespace DotNetCore.CAP.Abstractions
CheckIsUsingEF(name);
PrepareConnectionForEF();
var content = Serialize(contentObj, callbackName);
return PublishWithTransAsync(name, content);
return PublishWithTransAsync(name, contentObj, callbackName);
}
public void Publish<T>(string name, T contentObj, IDbTransaction dbTransaction, string callbackName = null)
......@@ -41,9 +56,7 @@ namespace DotNetCore.CAP.Abstractions
CheckIsAdoNet(name);
PrepareConnectionForAdo(dbTransaction);
var content = Serialize(contentObj, callbackName);
PublishWithTrans(name, content);
PublishWithTrans(name, contentObj, callbackName);
}
public Task PublishAsync<T>(string name, T contentObj, IDbTransaction dbTransaction, string callbackName = null)
......@@ -51,17 +64,20 @@ namespace DotNetCore.CAP.Abstractions
CheckIsAdoNet(name);
PrepareConnectionForAdo(dbTransaction);
var content = Serialize(contentObj, callbackName);
return PublishWithTransAsync(name, contentObj, callbackName);
}
return PublishWithTransAsync(name, content);
protected void Enqueue(CapPublishedMessage message)
{
_dispatcher.EnqueueToPublish(message);
}
protected abstract void PrepareConnectionForEF();
protected abstract void Execute(IDbConnection dbConnection, IDbTransaction dbTransaction,
protected abstract int Execute(IDbConnection dbConnection, IDbTransaction dbTransaction,
CapPublishedMessage message);
protected abstract Task ExecuteAsync(IDbConnection dbConnection, IDbTransaction dbTransaction,
protected abstract Task<int> ExecuteAsync(IDbConnection dbConnection, IDbTransaction dbTransaction,
CapPublishedMessage message);
protected virtual string Serialize<T>(T obj, string callbackName = null)
......@@ -89,7 +105,6 @@ namespace DotNetCore.CAP.Abstractions
{
CallbackName = callbackName
};
return packer.Pack(message);
}
......@@ -108,23 +123,38 @@ namespace DotNetCore.CAP.Abstractions
private void CheckIsUsingEF(string name)
{
if (name == null) throw new ArgumentNullException(nameof(name));
if (name == null)
{
throw new ArgumentNullException(nameof(name));
}
if (!IsUsingEF)
{
throw new InvalidOperationException(
"If you are using the EntityFramework, you need to configure the DbContextType first." +
" otherwise you need to use overloaded method with IDbConnection and IDbTransaction.");
" otherwise you need to use overloaded method with IDbTransaction.");
}
}
private void CheckIsAdoNet(string name)
{
if (name == null) throw new ArgumentNullException(nameof(name));
if (name == null)
{
throw new ArgumentNullException(nameof(name));
}
if (IsUsingEF)
{
throw new InvalidOperationException(
"If you are using the EntityFramework, you do not need to use this overloaded.");
}
}
private async Task PublishWithTransAsync(string name, string content)
private async Task PublishWithTransAsync<T>(string name, T contentObj, string callbackName = null)
{
Guid operationId = default(Guid);
var content = Serialize(contentObj, callbackName);
var message = new CapPublishedMessage
{
Name = name,
......@@ -132,15 +162,39 @@ namespace DotNetCore.CAP.Abstractions
StatusName = StatusName.Scheduled
};
await ExecuteAsync(DbConnection, DbTransaction, message);
try
{
operationId = s_diagnosticListener.WritePublishMessageStoreBefore(message);
ClosedCap();
var id = await ExecuteAsync(DbConnection, DbTransaction, message);
ClosedCap();
if (id > 0)
{
_logger.LogInformation($"message [{message}] has been persisted in the database.");
s_diagnosticListener.WritePublishMessageStoreAfter(operationId, message);
PublishQueuer.PulseEvent.Set();
message.Id = id;
Enqueue(message);
}
}
catch (Exception e)
{
_logger.LogError("An exception was occurred when publish message. exception message:" + e.Message, e);
s_diagnosticListener.WritePublishMessageStoreError(operationId, message, e);
Console.WriteLine(e);
throw;
}
}
private void PublishWithTrans(string name, string content)
private void PublishWithTrans<T>(string name, T contentObj, string callbackName = null)
{
Guid operationId = default(Guid);
var content = Serialize(contentObj, callbackName);
var message = new CapPublishedMessage
{
Name = name,
......@@ -148,11 +202,29 @@ namespace DotNetCore.CAP.Abstractions
StatusName = StatusName.Scheduled
};
Execute(DbConnection, DbTransaction, message);
try
{
operationId = s_diagnosticListener.WritePublishMessageStoreBefore(message);
var id = Execute(DbConnection, DbTransaction, message);
ClosedCap();
ClosedCap();
PublishQueuer.PulseEvent.Set();
if (id > 0)
{
_logger.LogInformation($"message [{message}] has been persisted in the database.");
s_diagnosticListener.WritePublishMessageStoreAfter(operationId, message);
message.Id = id;
Enqueue(message);
}
}
catch (Exception e)
{
_logger.LogError("An exception was occurred when publish message. exception message:" + e.Message, e);
s_diagnosticListener.WritePublishMessageStoreError(operationId, message, e);
Console.WriteLine(e);
throw;
}
}
private void ClosedCap()
......@@ -162,8 +234,11 @@ namespace DotNetCore.CAP.Abstractions
DbTransaction.Commit();
DbTransaction.Dispose();
}
if (IsCapOpenedConn)
{
DbConnection.Dispose();
}
}
public void Dispose()
......
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment