Commit 4fb1e080 authored by Savorboard's avatar Savorboard Committed by GitHub

Merge pull request #43 from dotnetcore/dev_2.0

Release 2.0.0
parents 9aaf1a6e 2b6c54c4
......@@ -33,4 +33,5 @@ bin/
/.idea/.idea.CAP
/.idea/.idea.CAP
/.idea
Properties
\ No newline at end of file
Properties
/pack.bat
......@@ -6,13 +6,13 @@ matrix:
include:
- os: linux
dist: trusty # Ubuntu 14.04
dotnet: 1.0.1
dotnet: 2.0.0
mono: none
env: DOTNETCORE=1
sudo: required
- os: osx
osx_image: xcode7.3 # macOS 10.11
dotnet: 1.0.1
osx_image: xcode8.3 # macOS 10.12
dotnet: 2.0.0
mono: none
env: DOTNETCORE=1
......

Microsoft Visual Studio Solution File, Format Version 12.00
# Visual Studio 15
VisualStudioVersion = 15.0.26430.16
VisualStudioVersion = 15.0.26730.3
MinimumVisualStudioVersion = 10.0.40219.1
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "src", "src", "{9B2AE124-6636-4DE9-83A3-70360DABD0C4}"
EndProject
......@@ -22,15 +22,6 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution
README.zh-cn.md = README.zh-cn.md
EndProjectSection
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Shared", "Shared", "{9E5A7F49-8E31-4A71-90CC-1DA9AEDA99EE}"
ProjectSection(SolutionItems) = preProject
test\Shared\MessageManagerTestBase.cs = test\Shared\MessageManagerTestBase.cs
test\Shared\TestLogger.cs = test\Shared\TestLogger.cs
EndProjectSection
ProjectSection(FolderStartupServices) = postProject
{82A7F48D-3B50-4B1E-B82E-3ADA8210C358} = {82A7F48D-3B50-4B1E-B82E-3ADA8210C358}
EndProjectSection
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "DotNetCore.CAP", "src\DotNetCore.CAP\DotNetCore.CAP.csproj", "{E8AF8611-0EA4-4B19-BC48-87C57A87DC66}"
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "samples", "samples", "{3A6B6931-A123-477A-9469-8B468B5385AF}"
......@@ -63,7 +54,13 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "DotNetCore.CAP.MySql.Test",
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Sample.RabbitMQ.MySql", "samples\Sample.RabbitMQ.MySql\Sample.RabbitMQ.MySql.csproj", "{9F3F9BFE-7B6A-4A7A-A6E6-8B517D611873}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Sample.Kafka.SqlServer", "samples\Sample.Kafka.SqlServer\Sample.Kafka.SqlServer.csproj", "{AF17B956-B79E-48B7-9B5B-EB15A386B112}"
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Sample.RabbitMQ.SqlServer", "samples\Sample.RabbitMQ.SqlServer\Sample.RabbitMQ.SqlServer.csproj", "{AF17B956-B79E-48B7-9B5B-EB15A386B112}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "DotNetCore.CAP.PostgreSql", "src\DotNetCore.CAP.PostgreSql\DotNetCore.CAP.PostgreSql.csproj", "{82C403AB-ED68-4084-9A1D-11334F9F08F9}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Sample.RabbitMQ.PostgreSql", "samples\Sample.RabbitMQ.PostgreSql\Sample.RabbitMQ.PostgreSql.csproj", "{A17E8E72-DFFC-4822-BB38-73D59A8B264E}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "DotNetCore.CAP.PostgreSql.Test", "test\DotNetCore.CAP.PostgreSql.Test\DotNetCore.CAP.PostgreSql.Test.csproj", "{7CA3625D-1817-4695-881D-7E79A1E1DED2}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
......@@ -110,12 +107,23 @@ Global
{AF17B956-B79E-48B7-9B5B-EB15A386B112}.Debug|Any CPU.Build.0 = Debug|Any CPU
{AF17B956-B79E-48B7-9B5B-EB15A386B112}.Release|Any CPU.ActiveCfg = Release|Any CPU
{AF17B956-B79E-48B7-9B5B-EB15A386B112}.Release|Any CPU.Build.0 = Release|Any CPU
{82C403AB-ED68-4084-9A1D-11334F9F08F9}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{82C403AB-ED68-4084-9A1D-11334F9F08F9}.Debug|Any CPU.Build.0 = Debug|Any CPU
{82C403AB-ED68-4084-9A1D-11334F9F08F9}.Release|Any CPU.ActiveCfg = Release|Any CPU
{82C403AB-ED68-4084-9A1D-11334F9F08F9}.Release|Any CPU.Build.0 = Release|Any CPU
{A17E8E72-DFFC-4822-BB38-73D59A8B264E}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{A17E8E72-DFFC-4822-BB38-73D59A8B264E}.Debug|Any CPU.Build.0 = Debug|Any CPU
{A17E8E72-DFFC-4822-BB38-73D59A8B264E}.Release|Any CPU.ActiveCfg = Release|Any CPU
{A17E8E72-DFFC-4822-BB38-73D59A8B264E}.Release|Any CPU.Build.0 = Release|Any CPU
{7CA3625D-1817-4695-881D-7E79A1E1DED2}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{7CA3625D-1817-4695-881D-7E79A1E1DED2}.Debug|Any CPU.Build.0 = Debug|Any CPU
{7CA3625D-1817-4695-881D-7E79A1E1DED2}.Release|Any CPU.ActiveCfg = Release|Any CPU
{7CA3625D-1817-4695-881D-7E79A1E1DED2}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
EndGlobalSection
GlobalSection(NestedProjects) = preSolution
{9E5A7F49-8E31-4A71-90CC-1DA9AEDA99EE} = {C09CDAB0-6DD4-46E9-B7F3-3EF2A4741EA0}
{E8AF8611-0EA4-4B19-BC48-87C57A87DC66} = {9B2AE124-6636-4DE9-83A3-70360DABD0C4}
{C42CDE33-0878-4BA0-96F2-4CB7C8FDEAAD} = {9B2AE124-6636-4DE9-83A3-70360DABD0C4}
{9961B80E-0718-4280-B2A0-271B003DE26B} = {9B2AE124-6636-4DE9-83A3-70360DABD0C4}
......@@ -126,5 +134,11 @@ Global
{80A84F62-1558-427B-BA74-B47AA8A665B5} = {C09CDAB0-6DD4-46E9-B7F3-3EF2A4741EA0}
{9F3F9BFE-7B6A-4A7A-A6E6-8B517D611873} = {3A6B6931-A123-477A-9469-8B468B5385AF}
{AF17B956-B79E-48B7-9B5B-EB15A386B112} = {3A6B6931-A123-477A-9469-8B468B5385AF}
{82C403AB-ED68-4084-9A1D-11334F9F08F9} = {9B2AE124-6636-4DE9-83A3-70360DABD0C4}
{A17E8E72-DFFC-4822-BB38-73D59A8B264E} = {3A6B6931-A123-477A-9469-8B468B5385AF}
{7CA3625D-1817-4695-881D-7E79A1E1DED2} = {C09CDAB0-6DD4-46E9-B7F3-3EF2A4741EA0}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {2E70565D-94CF-40B4-BFE1-AC18D5F736AB}
EndGlobalSection
EndGlobal
# Contributor Covenant Code of Conduct
## Our Pledge
In the interest of fostering an open and welcoming environment, we as contributors and maintainers pledge to making participation in our project and our community a harassment-free experience for everyone, regardless of age, body size, disability, ethnicity, gender identity and expression, level of experience, nationality, personal appearance, race, religion, or sexual identity and orientation.
## Our Standards
Examples of behavior that contributes to creating a positive environment include:
* Using welcoming and inclusive language
* Being respectful of differing viewpoints and experiences
* Gracefully accepting constructive criticism
* Focusing on what is best for the community
* Showing empathy towards other community members
Examples of unacceptable behavior by participants include:
* The use of sexualized language or imagery and unwelcome sexual attention or advances
* Trolling, insulting/derogatory comments, and personal or political attacks
* Public or private harassment
* Publishing others' private information, such as a physical or electronic address, without explicit permission
* Other conduct which could reasonably be considered inappropriate in a professional setting
## Our Responsibilities
Project maintainers are responsible for clarifying the standards of acceptable behavior and are expected to take appropriate and fair corrective action in response to any instances of unacceptable behavior.
Project maintainers have the right and responsibility to remove, edit, or reject comments, commits, code, wiki edits, issues, and other contributions that are not aligned to this Code of Conduct, or to ban temporarily or permanently any contributor for other behaviors that they deem inappropriate, threatening, offensive, or harmful.
## Scope
This Code of Conduct applies both within project spaces and in public spaces when an individual is representing the project or its community. Examples of representing a project or community include using an official project e-mail address, posting via an official social media account, or acting as an appointed representative at an online or offline event. Representation of a project may be further defined and clarified by project maintainers.
## Enforcement
Instances of abusive, harassing, or otherwise unacceptable behavior may be reported by contacting the project team at yangxiaodong1214@126.com. The project team will review and investigate all complaints, and will respond in a way that it deems appropriate to the circumstances. The project team is obligated to maintain confidentiality with regard to the reporter of an incident. Further details of specific enforcement policies may be posted separately.
Project maintainers who do not follow or enforce the Code of Conduct in good faith may face temporary or permanent repercussions as determined by other members of the project's leadership.
## Attribution
This Code of Conduct is adapted from the [Contributor Covenant][homepage], version 1.4, available at [http://contributor-covenant.org/version/1/4][version]
[homepage]: http://contributor-covenant.org
[version]: http://contributor-covenant.org/version/1/4/
......@@ -44,10 +44,13 @@ If your Message Queue is using RabbitMQ, you can:
PM> Install-Package DotNetCore.CAP.RabbitMQ
```
CAP provides EntityFramework as default database store extension (The MySQL version is under development)
CAP supported SqlServer, MySql, PostgreSql as message store extension
```
//Select a database provider you are using
PM> Install-Package DotNetCore.CAP.SqlServer
PM> Install-Package DotNetCore.CAP.MySql
PM> Install-Package DotNetCore.CAP.PostgreSql
```
### Configuration
......@@ -66,9 +69,11 @@ public void ConfigureServices(IServiceCollection services)
// If your SqlServer is using EF for data operations, you need to add the following configuration:
// Notice: You don't need to config x.UseSqlServer(""") again!
x.UseEntityFramework<AppDbContext>();
// If you are using Dapper,you need to add the config:
x.UseSqlServer("Your ConnectionStrings");
//x.UseMySql("Your ConnectionStrings");
//x.UsePostgreSql("Your ConnectionStrings");
// If your Message Queue is using RabbitMQ you need to add the config:
x.UseRabbitMQ("localhost");
......@@ -82,7 +87,7 @@ public void Configure(IApplicationBuilder app)
{
.....
app.UseCap();
app.UseCap();
}
```
......@@ -114,12 +119,12 @@ public class PublishController : Controller
[Route("~/checkAccountWithTrans")]
public async Task<IActionResult> PublishMessageWithTransaction([FromServices]AppDbContext dbContext)
{
using (var trans = dbContext.Database.BeginTransaction())
{
using (var trans = dbContext.Database.BeginTransaction())
{
await _publisher.PublishAsync("xxx.services.account.check", new Person { Name = "Foo", Age = 11 });
trans.Commit();
}
}
return Ok();
}
}
......@@ -174,7 +179,7 @@ namespace xxx.Service
[CapSubscribe("xxx.services.account.check")]
public void CheckReceivedMessage(Person person)
{
}
}
}
......
......@@ -8,6 +8,8 @@
CAP 是一个在分布式系统(SOA、MicroService)中实现最终一致性的库,它具有轻量级、易使用、高性能等特点。
你可以在这里[CAP Wiki](https://github.com/dotnetcore/CAP/wiki)看到更多详细资料。
## 预览(OverView)
CAP 是在一个 ASP.NET Core 项目中使用的库,当然他可以用于 ASP.NET Core On .NET Framework 中。
......@@ -44,10 +46,13 @@ PM> Install-Package DotNetCore.CAP.Kafka
PM> Install-Package DotNetCore.CAP.RabbitMQ
```
CAP 默认提供了 Sql Server 的扩展作为数据库存储(MySql的正在开发中)
CAP 提供了 Sql Server, MySql, PostgreSQL 的扩展作为数据库存储
```
// 按需选择安装你正在使用的数据库
PM> Install-Package DotNetCore.CAP.SqlServer
PM> Install-Package DotNetCore.CAP.MySql
PM> Install-Package DotNetCore.CAP.PostgreSql
```
### Configuration
......
version: '{build}'
os: Visual Studio 2015
os: Visual Studio 2017
environment:
BUILDING_ON_PLATFORM: win
BuildEnvironment: appveyor
Cap_SqlServer_ConnectionStringTemplate: Server=(local)\SQL2014;Database={0};User ID=sa;Password=Password12!
Cap_MySql_ConnectionStringTemplate: Server=localhost;Database={0};Uid=root;Pwd=Password12!
Cap_PostgreSql_ConnectionStringTemplate: Server=localhost;Database={0};UserId=postgres;Password=Password12!
services:
- mssql2014
- mysql
- postgresql
build_script:
- ps: ./ConfigureMSDTC.ps1
- ps: ./build.ps1
......
dotnet --info
dotnet restore
dotnet test test/DotNetCore.CAP.Test/DotNetCore.CAP.Test.csproj -f netcoreapp1.1
\ No newline at end of file
dotnet test test/DotNetCore.CAP.Test/DotNetCore.CAP.Test.csproj -f netcoreapp2.0
\ No newline at end of file
......@@ -2,10 +2,6 @@
<Import Project="version.props" />
<PropertyGroup Label="Build">
<TargetFrameworks>netstandard1.6</TargetFrameworks>
</PropertyGroup>
<PropertyGroup Label="Package">
<Product>CAP</Product>
<Authors>savorboard;dotnetcore</Authors>
......@@ -14,7 +10,7 @@
<PackageIconUrl>https://avatars2.githubusercontent.com/u/19404084</PackageIconUrl>
<PackageProjectUrl>https://github.com/dotnetcore/CAP</PackageProjectUrl>
<PackageLicenseUrl>https://github.com/dotnetcore/CAP/blob/master/LICENSE.txt</PackageLicenseUrl>
<PackageTags>eventbus;rabbitmq;kafka;cap;transaction;</PackageTags>
<PackageTags>CAP;EventBus;Distributed Transaction</PackageTags>
<Description>EventBus and eventually consistency in distributed architectures.</Description>
</PropertyGroup>
......
......@@ -20,6 +20,6 @@ Configuration: {Build.Configuration}
public static string CreateStamp()
{
var seconds = (long)(DateTime.UtcNow - new DateTime(2017, 1, 1)).TotalSeconds;
return seconds.ToString().PadLeft(11, (char)'0');
return seconds.ToString();
}
}
<Project>
<PropertyGroup>
<VersionMajor>1</VersionMajor>
<VersionMinor>1</VersionMinor>
<VersionMajor>2</VersionMajor>
<VersionMinor>0</VersionMinor>
<VersionPatch>0</VersionPatch>
<VersionQuality></VersionQuality>
<VersionPrefix>$(VersionMajor).$(VersionMinor).$(VersionPatch)</VersionPrefix>
......
......@@ -10,7 +10,8 @@ namespace Sample.RabbitMQ.MySql
{
protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder)
{
optionsBuilder.UseMySql("Server=localhost;Database=Sample.RabbitMQ.MySql;Uid=root;Pwd=123123;");
//optionsBuilder.UseMySql("Server=localhost;Database=Sample.RabbitMQ.MySql;UserId=root;Password=123123;");
optionsBuilder.UseMySql("Server=192.168.2.206;Database=Sample.RabbitMQ.MySql;UserId=root;Password=123123;");
}
}
}
......@@ -23,7 +23,16 @@ namespace Sample.RabbitMQ.MySql.Controllers
[Route("~/publish")]
public IActionResult PublishMessage()
{
_capBus.Publish("sample.kafka.sqlserver", "");
_capBus.Publish("sample.rabbitmq.mysql", DateTime.Now);
return Ok();
}
[Route("~/publish2")]
public IActionResult PublishMessage2()
{
_capBus.Publish("sample.kafka.sqlserver4", DateTime.Now);
return Ok();
}
......@@ -34,6 +43,7 @@ namespace Sample.RabbitMQ.MySql.Controllers
using (var trans = await _dbContext.Database.BeginTransactionAsync())
{
await _capBus.PublishAsync("sample.kafka.sqlserver", "");
trans.Commit();
}
return Ok();
......@@ -41,10 +51,9 @@ namespace Sample.RabbitMQ.MySql.Controllers
[NonAction]
[CapSubscribe("sample.rabbitmq.mysql")]
public void ReceiveMessage()
public void ReceiveMessage(DateTime time)
{
Console.WriteLine("[sample.rabbitmq.mysql] message received");
Debug.WriteLine("[sample.rabbitmq.mysql] message received");
Console.WriteLine("[sample.rabbitmq.mysql] message received: "+ DateTime.Now.ToString() +" , sent time: " + time.ToString());
}
}
}
<Project Sdk="Microsoft.NET.Sdk.Web">
<PropertyGroup>
<TargetFramework>netcoreapp1.1</TargetFramework>
<TargetFramework>netcoreapp2.0</TargetFramework>
</PropertyGroup>
<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|AnyCPU'">
<NoWarn>1701;1702;1705;3277;</NoWarn>
<WarningsAsErrors>NU1605;MSB3277</WarningsAsErrors>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Microsoft.AspNetCore" Version="1.1.2" />
<PackageReference Include="Microsoft.AspNetCore.Mvc" Version="1.1.3" />
<PackageReference Include="Microsoft.EntityFrameworkCore" Version="1.1.2" />
<PackageReference Include="Microsoft.EntityFrameworkCore.Design" Version="1.1.2" />
<PackageReference Include="Microsoft.EntityFrameworkCore.Tools" Version="1.1.1" />
<PackageReference Include="Microsoft.Extensions.Configuration.CommandLine" Version="1.1.2" />
<PackageReference Include="Microsoft.Extensions.Logging.Debug" Version="1.1.2" />
<PackageReference Include="Pomelo.EntityFrameworkCore.MySql" Version="1.1.2" />
<PackageReference Include="Pomelo.EntityFrameworkCore.MySql.Design" Version="1.1.2" />
<PackageReference Include="Microsoft.AspNetCore" Version="2.0.0" />
<PackageReference Include="Microsoft.AspNetCore.Mvc" Version="2.0.0" />
<PackageReference Include="Microsoft.EntityFrameworkCore" Version="2.0.0" />
<PackageReference Include="Microsoft.EntityFrameworkCore.Design" Version="2.0.0" />
<PackageReference Include="Microsoft.EntityFrameworkCore.Tools" Version="2.0.0" />
<PackageReference Include="Microsoft.Extensions.Configuration.CommandLine" Version="2.0.0" />
<PackageReference Include="Microsoft.Extensions.Logging.Debug" Version="2.0.0" />
<PackageReference Include="Pomelo.EntityFrameworkCore.MySql" Version="2.0.0-rtm-10056" />
</ItemGroup>
<ItemGroup>
<DotNetCliToolReference Include="Microsoft.VisualStudio.Web.CodeGeneration.Tools" Version="1.0.1" />
<DotNetCliToolReference Include="Microsoft.EntityFrameworkCore.Tools.DotNet" Version="1.0.0" />
<DotNetCliToolReference Include="Microsoft.EntityFrameworkCore.Tools.DotNet" Version="2.0.0" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\..\src\DotNetCore.CAP.Kafka\DotNetCore.CAP.Kafka.csproj" />
<ProjectReference Include="..\..\src\DotNetCore.CAP.MySql\DotNetCore.CAP.MySql.csproj" />
<ProjectReference Include="..\..\src\DotNetCore.CAP.RabbitMQ\DotNetCore.CAP.RabbitMQ.csproj" />
<ProjectReference Include="..\..\src\DotNetCore.CAP\DotNetCore.CAP.csproj" />
</ItemGroup>
......
......@@ -18,7 +18,11 @@ namespace Sample.RabbitMQ.MySql
services.AddCap(x =>
{
x.UseEntityFramework<AppDbContext>();
x.UseKafka("localhost:9092");
x.UseRabbitMQ(y => {
y.HostName = "192.168.2.206";
y.UserName = "admin";
y.Password = "123123";
});
});
services.AddMvc();
......
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
namespace Sample.RabbitMQ.PostgreSql
{
public class AppDbContext : DbContext
{
protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder)
{
optionsBuilder.UseNpgsql("Server=localhost;Database=Sample.RabbitMQ.PostgreSql;UserId=postgre;Password=123123;");
}
}
}
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading.Tasks;
using DotNetCore.CAP;
using Microsoft.AspNetCore.Mvc;
namespace Sample.Kafka.Controllers
namespace Sample.RabbitMQ.PostgreSql.Controllers
{
[Route("api/[controller]")]
public class ValuesController : Controller, ICapSubscribe
public class ValuesController : Controller
{
private readonly ICapPublisher _capBus;
private readonly AppDbContext _dbContext;
private readonly ICapPublisher _capBus;
public ValuesController(ICapPublisher producer, AppDbContext dbContext)
public ValuesController(AppDbContext dbContext, ICapPublisher capPublisher)
{
_capBus = producer;
_dbContext = dbContext;
_capBus = capPublisher;
}
[Route("~/publish")]
public IActionResult PublishMessage()
{
_capBus.Publish("sample.rabbitmq.mysql", "");
_capBus.Publish("sample.rabbitmq.mysql", DateTime.Now);
return Ok();
}
[Route("~/publish2")]
public IActionResult PublishMessage2()
{
_capBus.Publish("sample.kafka.sqlserver4", DateTime.Now);
return Ok();
}
......@@ -30,18 +41,19 @@ namespace Sample.Kafka.Controllers
{
using (var trans = await _dbContext.Database.BeginTransactionAsync())
{
await _capBus.PublishAsync("sample.rabbitmq.mysql", "");
await _capBus.PublishAsync("sample.kafka.sqlserver", "");
trans.Commit();
}
return Ok();
}
[NonAction]
[CapSubscribe("sample.kafka.sqlserver", Group = "test")]
public void KafkaTest()
[CapSubscribe("sample.rabbitmq.mysql")]
public void ReceiveMessage()
{
Console.WriteLine("[sample.kafka.sqlserver] message received");
Debug.WriteLine("[sample.kafka.sqlserver] message received");
Console.WriteLine("[sample.rabbitmq.mysql] message received");
Debug.WriteLine("[sample.rabbitmq.mysql] message received");
}
}
}
\ No newline at end of file
}
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.AspNetCore;
using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging;
namespace Sample.RabbitMQ.PostgreSql
{
public class Program
{
public static void Main(string[] args)
{
BuildWebHost(args).Run();
}
public static IWebHost BuildWebHost(string[] args) =>
WebHost.CreateDefaultBuilder(args)
.UseStartup<Startup>()
.Build();
}
}
<Project Sdk="Microsoft.NET.Sdk.Web">
<PropertyGroup>
<TargetFramework>netcoreapp2.0</TargetFramework>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Microsoft.AspNetCore.All" Version="2.0.0" />
<PackageReference Include="Npgsql.EntityFrameworkCore.PostgreSQL" Version="2.0.0" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\..\src\DotNetCore.CAP.PostgreSql\DotNetCore.CAP.PostgreSql.csproj" />
<ProjectReference Include="..\..\src\DotNetCore.CAP.RabbitMQ\DotNetCore.CAP.RabbitMQ.csproj" />
<ProjectReference Include="..\..\src\DotNetCore.CAP\DotNetCore.CAP.csproj" />
</ItemGroup>
</Project>
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
namespace Sample.RabbitMQ.PostgreSql
{
public class Startup
{
// This method gets called by the runtime. Use this method to add services to the container.
public void ConfigureServices(IServiceCollection services)
{
services.AddMvc();
}
// This method gets called by the runtime. Use this method to configure the HTTP request pipeline.
public void Configure(IApplicationBuilder app, IHostingEnvironment env)
{
app.UseMvc();
}
}
}
using Microsoft.EntityFrameworkCore;
namespace Sample.Kafka
namespace Sample.RabbitMQ.SqlServer
{
public class AppDbContext : DbContext
{
protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder)
{
//optionsBuilder.UseSqlServer("Server=192.168.2.206;Initial Catalog=Test;User Id=cmswuliu;Password=h7xY81agBn*Veiu3;MultipleActiveResultSets=True");
optionsBuilder.UseSqlServer("Server=DESKTOP-M9R8T31;Initial Catalog=Sample.Kafka.SqlServer;User Id=sa;Password=P@ssw0rd;MultipleActiveResultSets=True");
optionsBuilder.UseSqlServer("Server=192.168.2.206;Initial Catalog=TestCap;User Id=cmswuliu;Password=h7xY81agBn*Veiu3;MultipleActiveResultSets=True");
//optionsBuilder.UseSqlServer("Server=DESKTOP-M9R8T31;Initial Catalog=Sample.Kafka.SqlServer;User Id=sa;Password=P@ssw0rd;MultipleActiveResultSets=True");
}
}
}
using System;
using System.Diagnostics;
using System.Threading.Tasks;
using DotNetCore.CAP;
using Microsoft.AspNetCore.Mvc;
namespace Sample.RabbitMQ.SqlServer.Controllers
{
public class Person
{
public string Name { get; set; }
public int Age { get; set; }
public override string ToString()
{
return "Name:" + Name + ";Age:" + Age;
}
}
[Route("api/[controller]")]
public class ValuesController : Controller, ICapSubscribe
{
private readonly ICapPublisher _capBus;
private readonly AppDbContext _dbContext;
public ValuesController(ICapPublisher producer, AppDbContext dbContext)
{
_capBus = producer;
_dbContext = dbContext;
}
[Route("~/publish")]
public IActionResult PublishMessage()
{
using(var trans = _dbContext.Database.BeginTransaction())
{
//_capBus.Publish("sample.rabbitmq.mysql22222", DateTime.Now);
_capBus.Publish("sample.rabbitmq.mysql33333", new Person { Name = "宜兴", Age = 11 });
trans.Commit();
}
return Ok();
}
[Route("~/publishWithTrans")]
public async Task<IActionResult> PublishMessageWithTransaction()
{
using (var trans = await _dbContext.Database.BeginTransactionAsync())
{
await _capBus.PublishAsync("sample.rabbitmq.mysql", "");
trans.Commit();
}
return Ok();
}
[CapSubscribe("sample.rabbitmq.mysql33333")]
public void KafkaTest22(Person person)
{
var aa = _dbContext.Database;
_dbContext.Dispose();
Console.WriteLine("[sample.kafka.sqlserver] message received " + person.ToString());
Debug.WriteLine("[sample.kafka.sqlserver] message received " + person.ToString());
}
//[CapSubscribe("sample.rabbitmq.mysql22222")]
//public void KafkaTest22(DateTime time)
//{
// Console.WriteLine("[sample.kafka.sqlserver] message received " + time.ToString());
// Debug.WriteLine("[sample.kafka.sqlserver] message received " + time.ToString());
//}
[CapSubscribe("sample.rabbitmq.mysql22222")]
public async Task<DateTime> KafkaTest33(DateTime time)
{
Console.WriteLine("[sample.kafka.sqlserver] message received " + time.ToString());
Debug.WriteLine("[sample.kafka.sqlserver] message received " + time.ToString());
return await Task.FromResult(time);
}
[NonAction]
[CapSubscribe("sample.kafka.sqlserver3")]
[CapSubscribe("sample.kafka.sqlserver4")]
public void KafkaTest()
{
Console.WriteLine("[sample.kafka.sqlserver] message received");
Debug.WriteLine("[sample.kafka.sqlserver] message received");
}
}
}
\ No newline at end of file
......@@ -3,7 +3,7 @@ using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.Configuration;
namespace Sample.Kafka
namespace Sample.RabbitMQ.SqlServer
{
public class Program
{
......
<Project Sdk="Microsoft.NET.Sdk.Web">
<PropertyGroup>
<TargetFramework>netcoreapp1.1</TargetFramework>
<AssemblyName>Sample.Kafka.SqlServer</AssemblyName>
<TargetFramework>netcoreapp2.0</TargetFramework>
<AssemblyName>Sample.RabbitMQ.SqlServer</AssemblyName>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Microsoft.AspNetCore" Version="1.1.2" />
<PackageReference Include="Microsoft.AspNetCore.Mvc" Version="1.1.3" />
<PackageReference Include="Microsoft.EntityFrameworkCore" Version="1.1.2" />
<PackageReference Include="Microsoft.EntityFrameworkCore.Design" Version="1.1.2" />
<PackageReference Include="Microsoft.EntityFrameworkCore.SqlServer.Design" Version="1.1.2" />
<PackageReference Include="Microsoft.EntityFrameworkCore.Tools" Version="1.1.1" />
<PackageReference Include="Microsoft.Extensions.Configuration.CommandLine" Version="1.1.2" />
<PackageReference Include="Microsoft.Extensions.Logging.Debug" Version="1.1.2" />
<PackageReference Include="Microsoft.AspNetCore" Version="2.0.0" />
<PackageReference Include="Microsoft.AspNetCore.Mvc" Version="2.0.0" />
<PackageReference Include="Microsoft.EntityFrameworkCore" Version="2.0.0" />
<PackageReference Include="Microsoft.EntityFrameworkCore.Design" Version="2.0.0" />
<PackageReference Include="Microsoft.EntityFrameworkCore.SqlServer" Version="2.0.0" />
<PackageReference Include="Microsoft.EntityFrameworkCore.Tools" Version="2.0.0" />
<PackageReference Include="Microsoft.Extensions.Configuration.CommandLine" Version="2.0.0" />
<PackageReference Include="Microsoft.Extensions.Logging.Debug" Version="2.0.0" />
</ItemGroup>
<ItemGroup>
<DotNetCliToolReference Include="Microsoft.VisualStudio.Web.CodeGeneration.Tools" Version="1.0.1" />
<DotNetCliToolReference Include="Microsoft.EntityFrameworkCore.Tools.DotNet" Version="1.0.0" />
<DotNetCliToolReference Include="Microsoft.EntityFrameworkCore.Tools.DotNet" Version="2.0.0" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\..\src\DotNetCore.CAP.Kafka\DotNetCore.CAP.Kafka.csproj" />
<ProjectReference Include="..\..\src\DotNetCore.CAP.RabbitMQ\DotNetCore.CAP.RabbitMQ.csproj" />
<ProjectReference Include="..\..\src\DotNetCore.CAP.SqlServer\DotNetCore.CAP.SqlServer.csproj" />
<ProjectReference Include="..\..\src\DotNetCore.CAP\DotNetCore.CAP.csproj" />
</ItemGroup>
......
......@@ -3,7 +3,7 @@ using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
namespace Sample.Kafka
namespace Sample.RabbitMQ.SqlServer
{
public class Startup
{
......@@ -14,7 +14,11 @@ namespace Sample.Kafka
services.AddCap(x =>
{
x.UseEntityFramework<AppDbContext>();
x.UseKafka("localhost:9092");
x.UseRabbitMQ(y=> {
y.HostName = "192.168.2.206";
y.UserName = "admin";
y.Password = "123123";
});
});
services.AddMvc();
......
......@@ -42,7 +42,7 @@ namespace DotNetCore.CAP
{
throw new ArgumentNullException(nameof(Servers));
}
MainConfig.Add("bootstrap.servers", Servers);
MainConfig["queue.buffering.max.ms"] = "10";
......
......@@ -3,16 +3,16 @@
<Import Project="..\..\build\common.props" />
<PropertyGroup>
<TargetFramework>netstandard1.6</TargetFramework>
<NetStandardImplicitPackageVersion>1.6.1</NetStandardImplicitPackageVersion>
<TargetFramework>netstandard2.0</TargetFramework>
<AssemblyName>DotNetCore.CAP.Kafka</AssemblyName>
<PackageId>DotNetCore.CAP.Kafka</PackageId>
<PackageTargetFallback>$(PackageTargetFallback);dnxcore50</PackageTargetFallback>
<GenerateAssemblyConfigurationAttribute>false</GenerateAssemblyConfigurationAttribute>
<GenerateAssemblyCompanyAttribute>false</GenerateAssemblyCompanyAttribute>
<GenerateAssemblyProductAttribute>false</GenerateAssemblyProductAttribute>
<PackageTags>$(PackageTags);Kafka</PackageTags>
</PropertyGroup>
<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|AnyCPU'">
<WarningsAsErrors>NU1605</WarningsAsErrors>
<NoWarn>NU1701</NoWarn>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Confluent.Kafka" Version="0.11.0" />
</ItemGroup>
......
using System;
using Microsoft.Extensions.Options;
namespace DotNetCore.CAP.Kafka
namespace DotNetCore.CAP.Kafka
{
internal sealed class KafkaConsumerClientFactory : IConsumerClientFactory
{
......
......@@ -12,13 +12,15 @@ namespace DotNetCore.CAP.Kafka
private readonly ILogger _logger;
private readonly KafkaOptions _kafkaOptions;
public PublishQueueExecutor(IStateChanger stateChanger,
KafkaOptions options,
public PublishQueueExecutor(
CapOptions options,
IStateChanger stateChanger,
KafkaOptions kafkaOptions,
ILogger<PublishQueueExecutor> logger)
: base(stateChanger, logger)
: base(options, stateChanger, logger)
{
_logger = logger;
_kafkaOptions = options;
_kafkaOptions = kafkaOptions;
}
public override Task<OperateResult> PublishAsync(string keyName, string content)
......
......@@ -21,6 +21,7 @@ namespace DotNetCore.CAP
services.AddSingleton<IStorage, MySqlStorage>();
services.AddScoped<IStorageConnection, MySqlStorageConnection>();
services.AddScoped<ICapPublisher, CapPublisher>();
services.AddTransient<ICallbackPublisher, CapPublisher>();
services.AddTransient<IAdditionalProcessor, DefaultAdditionalProcessor>();
var mysqlOptions = new MySqlOptions();
......@@ -28,17 +29,17 @@ namespace DotNetCore.CAP
if (mysqlOptions.DbContextType != null)
{
var provider = TempBuildService(services);
var dbContextObj = provider.GetService(mysqlOptions.DbContextType);
var dbContext = (DbContext)dbContextObj;
mysqlOptions.ConnectionString = dbContext.Database.GetDbConnection().ConnectionString;
services.AddSingleton(x =>
{
var dbContext = (DbContext)x.GetService(mysqlOptions.DbContextType);
mysqlOptions.ConnectionString = dbContext.Database.GetDbConnection().ConnectionString;
return mysqlOptions;
});
}
else
{
services.AddSingleton(mysqlOptions);
}
services.AddSingleton(mysqlOptions);
}
private IServiceProvider TempBuildService(IServiceCollection services)
{
return services.BuildServiceProvider();
}
}
}
\ No newline at end of file
......@@ -2,34 +2,28 @@
using System.Data;
using System.Threading.Tasks;
using Dapper;
using DotNetCore.CAP.Infrastructure;
using DotNetCore.CAP.Abstractions;
using DotNetCore.CAP.Models;
using DotNetCore.CAP.Processor;
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Storage;
using Microsoft.Extensions.Logging;
using MySql.Data.MySqlClient;
namespace DotNetCore.CAP.MySql
{
public class CapPublisher : ICapPublisher
public class CapPublisher : CapPublisherBase, ICallbackPublisher
{
private readonly ILogger _logger;
private readonly MySqlOptions _options;
private readonly DbContext _dbContext;
protected bool IsCapOpenedTrans { get; set; }
protected bool IsUsingEF { get; }
protected IServiceProvider ServiceProvider { get; }
public CapPublisher(IServiceProvider provider,
ILogger<CapPublisher> logger,
MySqlOptions options)
{
ServiceProvider = provider;
_logger = logger;
_options = options;
_logger = logger;
if (_options.DbContextType != null)
{
......@@ -38,160 +32,45 @@ namespace DotNetCore.CAP.MySql
}
}
public void Publish<T>(string name, T contentObj)
{
CheckIsUsingEF(name);
var content = Serialize(contentObj);
PublishCore(name, content);
}
public Task PublishAsync<T>(string name, T contentObj)
{
CheckIsUsingEF(name);
var content = Serialize(contentObj);
return PublishCoreAsync(name, content);
}
public void Publish<T>(string name, T contentObj, IDbConnection dbConnection, IDbTransaction dbTransaction = null)
{
CheckIsAdoNet(name);
PrepareConnection(dbConnection, ref dbTransaction);
var content = Serialize(contentObj);
PublishWithTrans(name, content, dbConnection, dbTransaction);
}
public Task PublishAsync<T>(string name, T contentObj, IDbConnection dbConnection, IDbTransaction dbTransaction = null)
{
CheckIsAdoNet(name);
PrepareConnection(dbConnection, ref dbTransaction);
var content = Serialize(contentObj);
return PublishWithTransAsync(name, content, dbConnection, dbTransaction);
}
#region private methods
private string Serialize<T>(T obj)
{
string content = string.Empty;
if (Helper.IsComplexType(typeof(T)))
{
content = Helper.ToJson(obj);
}
else
{
content = obj?.ToString();
}
return content;
}
private void PrepareConnection(IDbConnection dbConnection, ref IDbTransaction dbTransaction)
protected override void PrepareConnectionForEF()
{
if (dbConnection == null)
throw new ArgumentNullException(nameof(dbConnection));
if (dbConnection.State != ConnectionState.Open)
dbConnection.Open();
if (dbTransaction == null)
DbConnection = _dbContext.Database.GetDbConnection();
var dbContextTransaction = _dbContext.Database.CurrentTransaction;
var dbTrans = dbContextTransaction?.GetDbTransaction();
//DbTransaction is dispose in original
if (dbTrans?.Connection == null)
{
IsCapOpenedTrans = true;
dbTransaction = dbConnection.BeginTransaction(IsolationLevel.ReadCommitted);
dbContextTransaction?.Dispose();
dbContextTransaction = _dbContext.Database.BeginTransaction(IsolationLevel.ReadCommitted);
dbTrans = dbContextTransaction.GetDbTransaction();
}
DbTranasaction = dbTrans;
}
private void CheckIsUsingEF(string name)
protected override void Execute(IDbConnection dbConnection, IDbTransaction dbTransaction, CapPublishedMessage message)
{
if (name == null) throw new ArgumentNullException(nameof(name));
if (!IsUsingEF)
throw new InvalidOperationException("If you are using the EntityFramework, you need to configure the DbContextType first." +
" otherwise you need to use overloaded method with IDbConnection and IDbTransaction.");
}
dbConnection.Execute(PrepareSql(), message, dbTransaction);
private void CheckIsAdoNet(string name)
{
if (name == null) throw new ArgumentNullException(nameof(name));
if (IsUsingEF)
throw new InvalidOperationException("If you are using the EntityFramework, you do not need to use this overloaded.");
_logger.LogInformation("Published Message has been persisted in the database. name:" + message.ToString());
}
private async Task PublishCoreAsync(string name, string content)
protected override async Task ExecuteAsync(IDbConnection dbConnection, IDbTransaction dbTransaction, CapPublishedMessage message)
{
var connection = _dbContext.Database.GetDbConnection();
var transaction = _dbContext.Database.CurrentTransaction;
if (transaction == null)
{
IsCapOpenedTrans = true;
transaction = await _dbContext.Database.BeginTransactionAsync(IsolationLevel.ReadCommitted);
}
var dbTransaction = transaction.GetDbTransaction();
await PublishWithTransAsync(name, content, connection, dbTransaction);
}
await dbConnection.ExecuteAsync(PrepareSql(), message, dbTransaction);
private void PublishCore(string name, string content)
{
var connection = _dbContext.Database.GetDbConnection();
var transaction = _dbContext.Database.CurrentTransaction;
if (transaction == null)
{
IsCapOpenedTrans = true;
transaction = _dbContext.Database.BeginTransaction(IsolationLevel.ReadCommitted);
}
var dbTransaction = transaction.GetDbTransaction();
PublishWithTrans(name, content, connection, dbTransaction);
_logger.LogInformation("Published Message has been persisted in the database. name:" + message.ToString());
}
private async Task PublishWithTransAsync(string name, string content, IDbConnection dbConnection, IDbTransaction dbTransaction)
public async Task PublishAsync(CapPublishedMessage message)
{
var message = new CapPublishedMessage
{
Name = name,
Content = content,
StatusName = StatusName.Scheduled
};
await dbConnection.ExecuteAsync(PrepareSql(), message, transaction: dbTransaction);
_logger.LogInformation("Message has been persisted in the database. name:" + name);
if (IsCapOpenedTrans)
using (var conn = new MySqlConnection(_options.ConnectionString))
{
dbTransaction.Commit();
dbTransaction.Dispose();
dbConnection.Dispose();
await conn.ExecuteAsync(PrepareSql(), message);
}
PublishQueuer.PulseEvent.Set();
}
private void PublishWithTrans(string name, string content, IDbConnection dbConnection, IDbTransaction dbTransaction)
{
var message = new CapPublishedMessage
{
Name = name,
Content = content,
StatusName = StatusName.Scheduled
};
var count = dbConnection.Execute(PrepareSql(), message, transaction: dbTransaction);
_logger.LogInformation("Message has been persisted in the database. name:" + name);
if (IsCapOpenedTrans)
{
dbTransaction.Commit();
dbTransaction.Dispose();
dbConnection.Dispose();
}
PublishQueuer.PulseEvent.Set();
}
#region private methods
private string PrepareSql()
{
......
......@@ -3,21 +3,16 @@
<Import Project="..\..\build\common.props" />
<PropertyGroup>
<TargetFramework>netstandard1.6</TargetFramework>
<TargetFramework>netstandard2.0</TargetFramework>
<AssemblyName>DotNetCore.CAP.MySql</AssemblyName>
<PackageId>DotNetCore.CAP.MySql</PackageId>
<NetStandardImplicitPackageVersion>1.6.1</NetStandardImplicitPackageVersion>
<PackageTargetFallback>$(PackageTargetFallback);dnxcore50</PackageTargetFallback>
<GenerateAssemblyConfigurationAttribute>false</GenerateAssemblyConfigurationAttribute>
<GenerateAssemblyCompanyAttribute>false</GenerateAssemblyCompanyAttribute>
<GenerateAssemblyProductAttribute>false</GenerateAssemblyProductAttribute>
<PackageTags>$(PackageTags);MySQL</PackageTags>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Dapper" Version="1.50.2" />
<PackageReference Include="Microsoft.EntityFrameworkCore" Version="1.1.2" />
<PackageReference Include="Microsoft.EntityFrameworkCore.Relational" Version="1.1.2" />
<PackageReference Include="MySqlConnector" Version="0.23.0" />
<PackageReference Include="Microsoft.EntityFrameworkCore" Version="2.0.0" />
<PackageReference Include="Microsoft.EntityFrameworkCore.Relational" Version="2.0.0" />
<PackageReference Include="MySqlConnector" Version="0.25.1" />
</ItemGroup>
<ItemGroup>
......
......@@ -15,7 +15,7 @@ namespace DotNetCore.CAP.MySql
private const int MaxBatch = 1000;
private readonly TimeSpan _delay = TimeSpan.FromSeconds(1);
private readonly TimeSpan _waitingInterval = TimeSpan.FromHours(2);
private readonly TimeSpan _waitingInterval = TimeSpan.FromMinutes(5);
public DefaultAdditionalProcessor(
IServiceProvider provider,
......
using System;
// ReSharper disable once CheckNamespace
namespace DotNetCore.CAP
{
public class EFOptions
{
public const string DefaultSchema = "cap";
/// <summary>
/// Gets or sets the schema to use when creating database objects.
/// Default is <see cref="DefaultSchema"/>.
/// </summary>
public string Schema { get; set; } = DefaultSchema;
internal Type DbContextType { get; set; }
}
}
\ No newline at end of file
using System;
using DotNetCore.CAP;
using Microsoft.EntityFrameworkCore;
// ReSharper disable once CheckNamespace
namespace Microsoft.Extensions.DependencyInjection
{
public static class CapOptionsExtensions
{
public static CapOptions UsePostgreSql(this CapOptions options, string connectionString)
{
return options.UsePostgreSql(opt =>
{
opt.ConnectionString = connectionString;
});
}
public static CapOptions UsePostgreSql(this CapOptions options, Action<PostgreSqlOptions> configure)
{
if (configure == null) throw new ArgumentNullException(nameof(configure));
options.RegisterExtension(new PostgreSqlCapOptionsExtension(configure));
return options;
}
public static CapOptions UseEntityFramework<TContext>(this CapOptions options)
where TContext : DbContext
{
return options.UseEntityFramework<TContext>(opt =>
{
opt.DbContextType = typeof(TContext);
});
}
public static CapOptions UseEntityFramework<TContext>(this CapOptions options, Action<EFOptions> configure)
where TContext : DbContext
{
if (configure == null) throw new ArgumentNullException(nameof(configure));
var efOptions = new EFOptions { DbContextType = typeof(TContext) };
configure(efOptions);
options.RegisterExtension(new PostgreSqlCapOptionsExtension(configure));
return options;
}
}
}
\ No newline at end of file
using System;
using DotNetCore.CAP.PostgreSql;
using DotNetCore.CAP.Processor;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.DependencyInjection;
// ReSharper disable once CheckNamespace
namespace DotNetCore.CAP
{
internal class PostgreSqlCapOptionsExtension : ICapOptionsExtension
{
private readonly Action<PostgreSqlOptions> _configure;
public PostgreSqlCapOptionsExtension(Action<PostgreSqlOptions> configure)
{
_configure = configure;
}
public void AddServices(IServiceCollection services)
{
services.AddSingleton<IStorage, PostgreSqlStorage>();
services.AddScoped<IStorageConnection, PostgreSqlStorageConnection>();
services.AddScoped<ICapPublisher, CapPublisher>();
services.AddTransient<ICallbackPublisher, CapPublisher>();
services.AddTransient<IAdditionalProcessor, DefaultAdditionalProcessor>();
var postgreSqlOptions = new PostgreSqlOptions();
_configure(postgreSqlOptions);
if (postgreSqlOptions.DbContextType != null)
{
services.AddSingleton(x =>
{
var dbContext = (DbContext)x.GetService(postgreSqlOptions.DbContextType);
postgreSqlOptions.ConnectionString = dbContext.Database.GetDbConnection().ConnectionString;
return postgreSqlOptions;
});
}
else
{
services.AddSingleton(postgreSqlOptions);
}
}
}
}
\ No newline at end of file
// ReSharper disable once CheckNamespace
namespace DotNetCore.CAP
{
public class PostgreSqlOptions : EFOptions
{
/// <summary>
/// Gets or sets the database's connection string that will be used to store database entities.
/// </summary>
public string ConnectionString { get; set; }
}
}
\ No newline at end of file
using System;
using System.Data;
using System.Threading.Tasks;
using Dapper;
using DotNetCore.CAP.Abstractions;
using DotNetCore.CAP.Models;
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Storage;
using Microsoft.Extensions.Logging;
using Npgsql;
namespace DotNetCore.CAP.PostgreSql
{
public class CapPublisher : CapPublisherBase, ICallbackPublisher
{
private readonly ILogger _logger;
private readonly PostgreSqlOptions _options;
private readonly DbContext _dbContext;
public CapPublisher(IServiceProvider provider,
ILogger<CapPublisher> logger,
PostgreSqlOptions options)
{
ServiceProvider = provider;
_options = options;
_logger = logger;
if (_options.DbContextType != null)
{
IsUsingEF = true;
_dbContext = (DbContext)ServiceProvider.GetService(_options.DbContextType);
}
}
protected override void PrepareConnectionForEF()
{
DbConnection = _dbContext.Database.GetDbConnection();
var dbContextTransaction = _dbContext.Database.CurrentTransaction;
var dbTrans = dbContextTransaction?.GetDbTransaction();
//DbTransaction is dispose in original
if (dbTrans?.Connection == null)
{
IsCapOpenedTrans = true;
dbContextTransaction?.Dispose();
dbContextTransaction = _dbContext.Database.BeginTransaction(IsolationLevel.ReadCommitted);
dbTrans = dbContextTransaction.GetDbTransaction();
}
DbTranasaction = dbTrans;
}
protected override void Execute(IDbConnection dbConnection, IDbTransaction dbTransaction, CapPublishedMessage message)
{
dbConnection.Execute(PrepareSql(), message, dbTransaction);
_logger.LogInformation("Published Message has been persisted in the database. name:" + message.ToString());
}
protected override async Task ExecuteAsync(IDbConnection dbConnection, IDbTransaction dbTransaction, CapPublishedMessage message)
{
await dbConnection.ExecuteAsync(PrepareSql(), message, dbTransaction);
_logger.LogInformation("Published Message has been persisted in the database. name:" + message.ToString());
}
public async Task PublishAsync(CapPublishedMessage message)
{
using (var conn = new NpgsqlConnection(_options.ConnectionString))
{
await conn.ExecuteAsync(PrepareSql(), message);
}
}
#region private methods
private string PrepareSql()
{
return $"INSERT INTO \"{_options.Schema}\".\"published\" (\"Name\",\"Content\",\"Retries\",\"Added\",\"ExpiresAt\",\"StatusName\")VALUES(@Name,@Content,@Retries,@Added,@ExpiresAt,@StatusName)";
}
#endregion private methods
}
}
\ No newline at end of file
<Project Sdk="Microsoft.NET.Sdk">
<Import Project="..\..\build\common.props" />
<PropertyGroup>
<TargetFramework>netstandard2.0</TargetFramework>
<AssemblyName>DotNetCore.CAP.PostgreSql</AssemblyName>
<PackageTags>$(PackageTags);PostgreSQL</PackageTags>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Dapper" Version="1.50.2" />
<PackageReference Include="Microsoft.EntityFrameworkCore" Version="2.0.0" />
<PackageReference Include="Microsoft.EntityFrameworkCore.Relational" Version="2.0.0" />
<PackageReference Include="Npgsql" Version="3.2.5" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\DotNetCore.CAP\DotNetCore.CAP.csproj" />
</ItemGroup>
</Project>
using DotNetCore.CAP.Models;
namespace DotNetCore.CAP.PostgreSql
{
internal class FetchedMessage
{
public int MessageId { get; set; }
public MessageType MessageType { get; set; }
}
}
\ No newline at end of file
using System;
using System.Threading.Tasks;
using Dapper;
using DotNetCore.CAP.Processor;
using Microsoft.Extensions.Logging;
using Npgsql;
namespace DotNetCore.CAP.PostgreSql
{
internal class DefaultAdditionalProcessor : IAdditionalProcessor
{
private readonly IServiceProvider _provider;
private readonly ILogger _logger;
private readonly PostgreSqlOptions _options;
private const int MaxBatch = 1000;
private readonly TimeSpan _delay = TimeSpan.FromSeconds(1);
private readonly TimeSpan _waitingInterval = TimeSpan.FromMinutes(5);
private static readonly string[] Tables =
{
"published","received"
};
public DefaultAdditionalProcessor(
IServiceProvider provider,
ILogger<DefaultAdditionalProcessor> logger,
PostgreSqlOptions sqlServerOptions)
{
_logger = logger;
_provider = provider;
_options = sqlServerOptions;
}
public async Task ProcessAsync(ProcessingContext context)
{
_logger.LogDebug("Collecting expired entities.");
foreach (var table in Tables)
{
var removedCount = 0;
do
{
using (var connection = new NpgsqlConnection(_options.ConnectionString))
{
removedCount = await connection.ExecuteAsync($"DELETE FROM \"{_options.Schema}\".\"{table}\" WHERE \"ExpiresAt\" < @now AND \"Id\" IN (SELECT \"Id\" FROM \"{_options.Schema}\".\"{table}\" LIMIT @count);",
new { now = DateTime.Now, count = MaxBatch });
}
if (removedCount != 0)
{
await context.WaitAsync(_delay);
context.ThrowIfStopping();
}
} while (removedCount != 0);
}
await context.WaitAsync(_waitingInterval);
}
}
}
\ No newline at end of file
using System;
using System.Data;
using System.Threading;
using Dapper;
using DotNetCore.CAP.Models;
namespace DotNetCore.CAP.PostgreSql
{
public class PostgreSqlFetchedMessage : IFetchedMessage
{
private readonly IDbConnection _connection;
private readonly IDbTransaction _transaction;
private readonly Timer _timer;
private static readonly TimeSpan KeepAliveInterval = TimeSpan.FromMinutes(1);
private readonly object _lockObject = new object();
public PostgreSqlFetchedMessage(int messageId,
MessageType type,
IDbConnection connection,
IDbTransaction transaction)
{
MessageId = messageId;
MessageType = type;
_connection = connection;
_transaction = transaction;
_timer = new Timer(ExecuteKeepAliveQuery, null, KeepAliveInterval, KeepAliveInterval);
}
public int MessageId { get; }
public MessageType MessageType { get; }
public void RemoveFromQueue()
{
lock (_lockObject)
{
_transaction.Commit();
}
}
public void Requeue()
{
lock (_lockObject)
{
_transaction.Rollback();
}
}
public void Dispose()
{
lock (_lockObject)
{
_timer?.Dispose();
_transaction.Dispose();
_connection.Dispose();
}
}
private void ExecuteKeepAliveQuery(object obj)
{
lock (_lockObject)
{
try
{
_connection?.Execute("SELECT 1", _transaction);
}
catch
{
// ignored
}
}
}
}
}
\ No newline at end of file
using System.Threading;
using System.Threading.Tasks;
using Dapper;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Logging;
using Npgsql;
namespace DotNetCore.CAP.PostgreSql
{
public class PostgreSqlStorage : IStorage
{
private readonly PostgreSqlOptions _options;
private readonly ILogger _logger;
public PostgreSqlStorage(ILogger<PostgreSqlStorage> logger, PostgreSqlOptions options)
{
_options = options;
_logger = logger;
}
public async Task InitializeAsync(CancellationToken cancellationToken)
{
if (cancellationToken.IsCancellationRequested) return;
var sql = CreateDbTablesScript(_options.Schema);
using (var connection = new NpgsqlConnection(_options.ConnectionString))
{
await connection.ExecuteAsync(sql);
}
_logger.LogDebug("Ensuring all create database tables script are applied.");
}
protected virtual string CreateDbTablesScript(string schema)
{
var batchSql = $@"
CREATE SCHEMA IF NOT EXISTS ""{schema}"";
CREATE TABLE IF NOT EXISTS ""{schema}"".""queue""(
""MessageId"" int NOT NULL ,
""MessageType"" int NOT NULL
);
CREATE TABLE IF NOT EXISTS ""{schema}"".""received""(
""Id"" SERIAL PRIMARY KEY NOT NULL,
""Name"" VARCHAR(200) NOT NULL,
""Group"" VARCHAR(200) NULL,
""Content"" TEXT NULL,
""Retries"" INT NOT NULL,
""Added"" TIMESTAMP NOT NULL,
""ExpiresAt"" TIMESTAMP NULL,
""StatusName"" VARCHAR(50) NOT NULL
);
CREATE TABLE IF NOT EXISTS ""{schema}"".""published""(
""Id"" SERIAL PRIMARY KEY NOT NULL,
""Name"" VARCHAR(200) NOT NULL,
""Content"" TEXT NULL,
""Retries"" INT NOT NULL,
""Added"" TIMESTAMP NOT NULL,
""ExpiresAt"" TIMESTAMP NULL,
""StatusName"" VARCHAR(50) NOT NULL
);";
return batchSql;
}
}
}
\ No newline at end of file
using System;
using System.Collections.Generic;
using System.Data;
using System.Threading.Tasks;
using Dapper;
using DotNetCore.CAP.Infrastructure;
using DotNetCore.CAP.Models;
using Npgsql;
namespace DotNetCore.CAP.PostgreSql
{
public class PostgreSqlStorageConnection : IStorageConnection
{
private readonly PostgreSqlOptions _options;
public PostgreSqlStorageConnection(PostgreSqlOptions options)
{
_options = options;
}
public PostgreSqlOptions Options => _options;
public IStorageTransaction CreateTransaction()
{
return new PostgreSqlStorageTransaction(this);
}
public async Task<CapPublishedMessage> GetPublishedMessageAsync(int id)
{
var sql = $"SELECT * FROM \"{_options.Schema}\".\"published\" WHERE \"Id\"={id}";
using (var connection = new NpgsqlConnection(_options.ConnectionString))
{
return await connection.QueryFirstOrDefaultAsync<CapPublishedMessage>(sql);
}
}
public Task<IFetchedMessage> FetchNextMessageAsync()
{
var sql = $@"DELETE FROM ""{_options.Schema}"".""queue"" WHERE ""MessageId"" = (SELECT ""MessageId"" FROM ""{_options.Schema}"".""queue"" FOR UPDATE SKIP LOCKED LIMIT 1) RETURNING *;";
return FetchNextMessageCoreAsync(sql);
}
public async Task<CapPublishedMessage> GetNextPublishedMessageToBeEnqueuedAsync()
{
var sql = $"SELECT * FROM \"{_options.Schema}\".\"published\" WHERE \"StatusName\" = '{StatusName.Scheduled}' FOR UPDATE SKIP LOCKED LIMIT 1;";
using (var connection = new NpgsqlConnection(_options.ConnectionString))
{
return await connection.QueryFirstOrDefaultAsync<CapPublishedMessage>(sql);
}
}
public async Task<IEnumerable<CapPublishedMessage>> GetFailedPublishedMessages()
{
var sql = $"SELECT * FROM \"{_options.Schema}\".\"published\" WHERE \"StatusName\"='{StatusName.Failed}' LIMIT 1000;";
using (var connection = new NpgsqlConnection(_options.ConnectionString))
{
return await connection.QueryAsync<CapPublishedMessage>(sql);
}
}
// CapReceviedMessage
public async Task StoreReceivedMessageAsync(CapReceivedMessage message)
{
if (message == null) throw new ArgumentNullException(nameof(message));
var sql = $"INSERT INTO \"{_options.Schema}\".\"received\"(\"Name\",\"Group\",\"Content\",\"Retries\",\"Added\",\"ExpiresAt\",\"StatusName\")VALUES(@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName);";
using (var connection = new NpgsqlConnection(_options.ConnectionString))
{
await connection.ExecuteAsync(sql, message);
}
}
public async Task<CapReceivedMessage> GetReceivedMessageAsync(int id)
{
var sql = $"SELECT * FROM \"{_options.Schema}\".\"received\" WHERE \"Id\"={id}";
using (var connection = new NpgsqlConnection(_options.ConnectionString))
{
return await connection.QueryFirstOrDefaultAsync<CapReceivedMessage>(sql);
}
}
public async Task<CapReceivedMessage> GetNextReceviedMessageToBeEnqueuedAsync()
{
var sql = $"SELECT * FROM \"{_options.Schema}\".\"received\" WHERE \"StatusName\" = '{StatusName.Scheduled}' FOR UPDATE SKIP LOCKED LIMIT 1;";
using (var connection = new NpgsqlConnection(_options.ConnectionString))
{
return await connection.QueryFirstOrDefaultAsync<CapReceivedMessage>(sql);
}
}
public async Task<IEnumerable<CapReceivedMessage>> GetFailedReceviedMessages()
{
var sql = $"SELECT * FROM \"{_options.Schema}\".\"received\" WHERE \"StatusName\"='{StatusName.Failed}' LIMIT 1000;";
using (var connection = new NpgsqlConnection(_options.ConnectionString))
{
return await connection.QueryAsync<CapReceivedMessage>(sql);
}
}
public void Dispose()
{
}
private async Task<IFetchedMessage> FetchNextMessageCoreAsync(string sql, object args = null)
{
//here don't use `using` to dispose
var connection = new NpgsqlConnection(_options.ConnectionString);
await connection.OpenAsync();
var transaction = connection.BeginTransaction(IsolationLevel.ReadCommitted);
FetchedMessage fetchedMessage = null;
try
{
fetchedMessage = await connection.QueryFirstOrDefaultAsync<FetchedMessage>(sql, args, transaction);
}
catch (NpgsqlException)
{
transaction.Dispose();
throw;
}
if (fetchedMessage == null)
{
transaction.Rollback();
transaction.Dispose();
connection.Dispose();
return null;
}
return new PostgreSqlFetchedMessage(fetchedMessage.MessageId, fetchedMessage.MessageType, connection, transaction);
}
}
}
\ No newline at end of file
using System;
using System.Data;
using System.Threading.Tasks;
using Dapper;
using DotNetCore.CAP.Models;
using Npgsql;
namespace DotNetCore.CAP.PostgreSql
{
public class PostgreSqlStorageTransaction : IStorageTransaction, IDisposable
{
private readonly string _schema;
private readonly IDbTransaction _dbTransaction;
private readonly IDbConnection _dbConnection;
public PostgreSqlStorageTransaction(PostgreSqlStorageConnection connection)
{
var options = connection.Options;
_schema = options.Schema;
_dbConnection = new NpgsqlConnection(options.ConnectionString);
_dbConnection.Open();
_dbTransaction = _dbConnection.BeginTransaction(IsolationLevel.ReadCommitted);
}
public void UpdateMessage(CapPublishedMessage message)
{
if (message == null) throw new ArgumentNullException(nameof(message));
var sql = $@"UPDATE ""{_schema}"".""published"" SET ""Retries""=@Retries,""ExpiresAt""=@ExpiresAt,""StatusName""=@StatusName WHERE ""Id""=@Id;";
_dbConnection.Execute(sql, message, _dbTransaction);
}
public void UpdateMessage(CapReceivedMessage message)
{
if (message == null) throw new ArgumentNullException(nameof(message));
var sql = $@"UPDATE ""{_schema}"".""received"" SET ""Retries""=@Retries,""ExpiresAt""=@ExpiresAt,""StatusName""=@StatusName WHERE ""Id""=@Id;";
_dbConnection.Execute(sql, message, _dbTransaction);
}
public void EnqueueMessage(CapPublishedMessage message)
{
if (message == null) throw new ArgumentNullException(nameof(message));
var sql = $@"INSERT INTO ""{_schema}"".""queue"" values(@MessageId,@MessageType);";
_dbConnection.Execute(sql, new CapQueue { MessageId = message.Id, MessageType = MessageType.Publish }, _dbTransaction);
}
public void EnqueueMessage(CapReceivedMessage message)
{
if (message == null) throw new ArgumentNullException(nameof(message));
var sql = $@"INSERT INTO ""{_schema}"".""queue"" values(@MessageId,@MessageType);";
_dbConnection.Execute(sql, new CapQueue { MessageId = message.Id, MessageType = MessageType.Subscribe }, _dbTransaction);
}
public Task CommitAsync()
{
_dbTransaction.Commit();
return Task.CompletedTask;
}
public void Dispose()
{
_dbTransaction.Dispose();
_dbConnection.Dispose();
}
}
}
\ No newline at end of file
......@@ -21,6 +21,10 @@ namespace DotNetCore.CAP
services.AddSingleton(options);
services.AddSingleton<IConsumerClientFactory, RabbitMQConsumerClientFactory>();
services.AddSingleton<ConnectionPool>();
services.AddScoped(x => x.GetService<ConnectionPool>().Rent());
services.AddTransient<IQueueExecutor, PublishQueueExecutor>();
}
}
......
using System;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Threading;
using RabbitMQ.Client;
namespace DotNetCore.CAP.RabbitMQ
{
public class ConnectionPool : IConnectionPool, IDisposable
{
private const int DefaultPoolSize = 32;
private readonly ConcurrentQueue<IConnection> _pool = new ConcurrentQueue<IConnection>();
private readonly Func<IConnection> _activator;
private int _maxSize;
private int _count;
public ConnectionPool(RabbitMQOptions options)
{
_maxSize = DefaultPoolSize;
_activator = CreateActivator(options);
}
private static Func<IConnection> CreateActivator(RabbitMQOptions options)
{
var factory = new ConnectionFactory()
{
HostName = options.HostName,
UserName = options.UserName,
Port = options.Port,
Password = options.Password,
VirtualHost = options.VirtualHost,
RequestedConnectionTimeout = options.RequestedConnectionTimeout,
SocketReadTimeout = options.SocketReadTimeout,
SocketWriteTimeout = options.SocketWriteTimeout
};
return () => factory.CreateConnection();
}
public virtual IConnection Rent()
{
if (_pool.TryDequeue(out IConnection connection))
{
Interlocked.Decrement(ref _count);
Debug.Assert(_count >= 0);
return connection;
}
connection = _activator();
return connection;
}
public virtual bool Return(IConnection connection)
{
if (Interlocked.Increment(ref _count) <= _maxSize)
{
_pool.Enqueue(connection);
return true;
}
Interlocked.Decrement(ref _count);
Debug.Assert(_maxSize == 0 || _pool.Count <= _maxSize);
return false;
}
IConnection IConnectionPool.Rent() => Rent();
bool IConnectionPool.Return(IConnection connection) => Return(connection);
public void Dispose()
{
_maxSize = 0;
IConnection context;
while (_pool.TryDequeue(out context))
{
context.Dispose();
}
}
}
}
......@@ -3,8 +3,9 @@
<Import Project="..\..\build\common.props" />
<PropertyGroup>
<TargetFramework>netstandard1.6</TargetFramework>
<NetStandardImplicitPackageVersion>1.6.1</NetStandardImplicitPackageVersion>
<TargetFramework>netstandard2.0</TargetFramework>
<AssemblyName>DotNetCore.CAP.RabbitMQ</AssemblyName>
<PackageTags>$(PackageTags);RabbitMQ</PackageTags>
</PropertyGroup>
<ItemGroup>
......
using System;
using System.Collections.Generic;
using System.Text;
using RabbitMQ.Client;
namespace DotNetCore.CAP.RabbitMQ
{
public interface IConnectionPool
{
IConnection Rent();
bool Return(IConnection context);
}
}
......@@ -3,7 +3,6 @@ using System.Text;
using System.Threading.Tasks;
using DotNetCore.CAP.Processor.States;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using RabbitMQ.Client;
namespace DotNetCore.CAP.RabbitMQ
......@@ -11,35 +10,27 @@ namespace DotNetCore.CAP.RabbitMQ
internal sealed class PublishQueueExecutor : BasePublishQueueExecutor
{
private readonly ILogger _logger;
private readonly IConnection _connection;
private readonly RabbitMQOptions _rabbitMQOptions;
public PublishQueueExecutor(IStateChanger stateChanger,
RabbitMQOptions options,
public PublishQueueExecutor(
CapOptions options,
IStateChanger stateChanger,
IConnection connection,
RabbitMQOptions rabbitMQOptions,
ILogger<PublishQueueExecutor> logger)
: base(stateChanger, logger)
: base(options, stateChanger, logger)
{
_logger = logger;
_rabbitMQOptions = options;
_connection = connection;
_rabbitMQOptions = rabbitMQOptions;
}
public override Task<OperateResult> PublishAsync(string keyName, string content)
{
var factory = new ConnectionFactory()
{
HostName = _rabbitMQOptions.HostName,
UserName = _rabbitMQOptions.UserName,
Port = _rabbitMQOptions.Port,
Password = _rabbitMQOptions.Password,
VirtualHost = _rabbitMQOptions.VirtualHost,
RequestedConnectionTimeout = _rabbitMQOptions.RequestedConnectionTimeout,
SocketReadTimeout = _rabbitMQOptions.SocketReadTimeout,
SocketWriteTimeout = _rabbitMQOptions.SocketWriteTimeout
};
try
{
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
using (var channel = _connection.CreateModel())
{
var body = Encoding.UTF8.GetBytes(content);
......
......@@ -14,7 +14,6 @@ namespace DotNetCore.CAP.RabbitMQ
private readonly string _queueName;
private readonly RabbitMQOptions _rabbitMQOptions;
private IConnectionFactory _connectionFactory;
private IConnection _connection;
private IModel _channel;
private ulong _deliveryTag;
......@@ -23,9 +22,12 @@ namespace DotNetCore.CAP.RabbitMQ
public event EventHandler<string> OnError;
public RabbitMQConsumerClient(string queueName, RabbitMQOptions options)
public RabbitMQConsumerClient(string queueName,
IConnection connection,
RabbitMQOptions options)
{
_queueName = queueName;
_connection = connection;
_rabbitMQOptions = options;
_exchageName = options.TopicExchangeName;
......@@ -34,19 +36,6 @@ namespace DotNetCore.CAP.RabbitMQ
private void InitClient()
{
_connectionFactory = new ConnectionFactory()
{
HostName = _rabbitMQOptions.HostName,
UserName = _rabbitMQOptions.UserName,
Port = _rabbitMQOptions.Port,
Password = _rabbitMQOptions.Password,
VirtualHost = _rabbitMQOptions.VirtualHost,
RequestedConnectionTimeout = _rabbitMQOptions.RequestedConnectionTimeout,
SocketReadTimeout = _rabbitMQOptions.SocketReadTimeout,
SocketWriteTimeout = _rabbitMQOptions.SocketWriteTimeout
};
_connection = _connectionFactory.CreateConnection();
_channel = _connection.CreateModel();
_channel.ExchangeDeclare(
......
using Microsoft.Extensions.Options;
using RabbitMQ.Client;
namespace DotNetCore.CAP.RabbitMQ
{
internal sealed class RabbitMQConsumerClientFactory : IConsumerClientFactory
{
private readonly RabbitMQOptions _rabbitMQOptions;
private readonly IConnection _connection;
public RabbitMQConsumerClientFactory(RabbitMQOptions rabbitMQOptions)
public RabbitMQConsumerClientFactory(RabbitMQOptions rabbitMQOptions, IConnection connection)
{
_rabbitMQOptions = rabbitMQOptions;
_connection = connection;
}
public IConsumerClient Create(string groupId)
{
return new RabbitMQConsumerClient(groupId, _rabbitMQOptions);
return new RabbitMQConsumerClient(groupId, _connection, _rabbitMQOptions);
}
}
}
\ No newline at end of file
......@@ -21,24 +21,26 @@ namespace DotNetCore.CAP
services.AddSingleton<IStorage, SqlServerStorage>();
services.AddScoped<IStorageConnection, SqlServerStorageConnection>();
services.AddScoped<ICapPublisher, CapPublisher>();
services.AddTransient<ICallbackPublisher, CapPublisher>();
services.AddTransient<IAdditionalProcessor, DefaultAdditionalProcessor>();
var sqlServerOptions = new SqlServerOptions();
_configure(sqlServerOptions);
if (sqlServerOptions.DbContextType != null)
{
var provider = TempBuildService(services);
var dbContextObj = provider.GetService(sqlServerOptions.DbContextType);
var dbContext = (DbContext)dbContextObj;
sqlServerOptions.ConnectionString = dbContext.Database.GetDbConnection().ConnectionString;
services.AddSingleton(x =>
{
var dbContext = (DbContext)x.GetService(sqlServerOptions.DbContextType);
sqlServerOptions.ConnectionString = dbContext.Database.GetDbConnection().ConnectionString;
return sqlServerOptions;
});
}
else
{
services.AddSingleton(sqlServerOptions);
}
services.AddSingleton(sqlServerOptions);
}
private IServiceProvider TempBuildService(IServiceCollection services)
{
return services.BuildServiceProvider();
}
}
}
\ No newline at end of file
using System;
using System.Data;
using System.Data.SqlClient;
using System.Threading.Tasks;
using Dapper;
using DotNetCore.CAP.Infrastructure;
using DotNetCore.CAP.Abstractions;
using DotNetCore.CAP.Models;
using DotNetCore.CAP.Processor;
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Storage;
using Microsoft.Extensions.Logging;
namespace DotNetCore.CAP.SqlServer
{
public class CapPublisher : ICapPublisher
public class CapPublisher : CapPublisherBase, ICallbackPublisher
{
private readonly ILogger _logger;
private readonly SqlServerOptions _options;
private readonly DbContext _dbContext;
protected bool IsCapOpenedTrans { get; set; }
protected bool IsUsingEF { get; }
protected IServiceProvider ServiceProvider { get; }
public CapPublisher(IServiceProvider provider,
ILogger<CapPublisher> logger,
SqlServerOptions options)
......@@ -38,158 +32,45 @@ namespace DotNetCore.CAP.SqlServer
}
}
public void Publish<T>(string name, T contentObj)
{
CheckIsUsingEF(name);
var content = Serialize(contentObj);
PublishCore(name, content);
}
public Task PublishAsync<T>(string name, T contentObj)
{
CheckIsUsingEF(name);
var content = Serialize(contentObj);
return PublishCoreAsync(name, content);
}
public void Publish<T>(string name, T contentObj, IDbConnection dbConnection, IDbTransaction dbTransaction = null)
{
CheckIsAdoNet(name);
PrepareConnection(dbConnection, ref dbTransaction);
var content = Serialize(contentObj);
PublishWithTrans(name, content, dbConnection, dbTransaction);
}
public Task PublishAsync<T>(string name, T contentObj, IDbConnection dbConnection, IDbTransaction dbTransaction = null)
{
CheckIsAdoNet(name);
PrepareConnection(dbConnection, ref dbTransaction);
var content = Serialize(contentObj);
return PublishWithTransAsync(name, content, dbConnection, dbTransaction);
}
#region private methods
private string Serialize<T>(T obj)
{
string content = string.Empty;
if (Helper.IsComplexType(typeof(T)))
{
content = Helper.ToJson(obj);
}
else
{
content = obj.ToString();
}
return content;
}
private void PrepareConnection(IDbConnection dbConnection, ref IDbTransaction dbTransaction)
protected override void PrepareConnectionForEF()
{
if (dbConnection == null)
throw new ArgumentNullException(nameof(dbConnection));
if (dbConnection.State != ConnectionState.Open)
dbConnection.Open();
if (dbTransaction == null)
DbConnection = _dbContext.Database.GetDbConnection();
var dbContextTransaction = _dbContext.Database.CurrentTransaction;
var dbTrans = dbContextTransaction?.GetDbTransaction();
//DbTransaction is dispose in original
if (dbTrans?.Connection == null)
{
IsCapOpenedTrans = true;
dbTransaction = dbConnection.BeginTransaction(IsolationLevel.ReadCommitted);
dbContextTransaction?.Dispose();
dbContextTransaction = _dbContext.Database.BeginTransaction(IsolationLevel.ReadCommitted);
dbTrans = dbContextTransaction.GetDbTransaction();
}
DbTranasaction = dbTrans;
}
private void CheckIsUsingEF(string name)
protected override void Execute(IDbConnection dbConnection, IDbTransaction dbTransaction, CapPublishedMessage message)
{
if (name == null) throw new ArgumentNullException(nameof(name));
if (!IsUsingEF)
throw new InvalidOperationException("If you are using the EntityFramework, you need to configure the DbContextType first." +
" otherwise you need to use overloaded method with IDbConnection and IDbTransaction.");
}
dbConnection.Execute(PrepareSql(), message, dbTransaction);
private void CheckIsAdoNet(string name)
{
if (name == null) throw new ArgumentNullException(nameof(name));
if (IsUsingEF)
throw new InvalidOperationException("If you are using the EntityFramework, you do not need to use this overloaded.");
_logger.LogInformation("Published Message has been persisted in the database. name:" + message.ToString());
}
private async Task PublishCoreAsync(string name, string content)
protected override async Task ExecuteAsync(IDbConnection dbConnection, IDbTransaction dbTransaction, CapPublishedMessage message)
{
var connection = _dbContext.Database.GetDbConnection();
var transaction = _dbContext.Database.CurrentTransaction;
if (transaction == null)
{
IsCapOpenedTrans = true;
transaction = await _dbContext.Database.BeginTransactionAsync(IsolationLevel.ReadCommitted);
}
var dbTransaction = transaction.GetDbTransaction();
await PublishWithTransAsync(name, content, connection, dbTransaction);
}
await dbConnection.ExecuteAsync(PrepareSql(), message, dbTransaction);
private void PublishCore(string name, string content)
{
var connection = _dbContext.Database.GetDbConnection();
var transaction = _dbContext.Database.CurrentTransaction;
if (transaction == null)
{
IsCapOpenedTrans = true;
transaction = _dbContext.Database.BeginTransaction(IsolationLevel.ReadCommitted);
}
var dbTransaction = transaction.GetDbTransaction();
PublishWithTrans(name, content, connection, dbTransaction);
_logger.LogInformation("Published Message has been persisted in the database. name:" + message.ToString());
}
private async Task PublishWithTransAsync(string name, string content, IDbConnection dbConnection, IDbTransaction dbTransaction)
public async Task PublishAsync(CapPublishedMessage message)
{
var message = new CapPublishedMessage
using (var conn = new SqlConnection(_options.ConnectionString))
{
Name = name,
Content = content,
StatusName = StatusName.Scheduled
};
await dbConnection.ExecuteAsync(PrepareSql(), message, transaction: dbTransaction);
_logger.LogInformation("Message has been persisted in the database. name:" + name);
if (IsCapOpenedTrans)
{
dbTransaction.Commit();
dbTransaction.Dispose();
dbConnection.Dispose();
await conn.ExecuteAsync(PrepareSql(), message);
}
PublishQueuer.PulseEvent.Set();
}
private void PublishWithTrans(string name, string content, IDbConnection dbConnection, IDbTransaction dbTransaction)
{
var message = new CapPublishedMessage
{
Name = name,
Content = content,
StatusName = StatusName.Scheduled
};
var count = dbConnection.Execute(PrepareSql(), message, transaction: dbTransaction);
_logger.LogInformation("Message has been persisted in the database. name:" + name);
if (IsCapOpenedTrans)
{
dbTransaction.Commit();
dbTransaction.Dispose();
dbConnection.Dispose();
}
PublishQueuer.PulseEvent.Set();
}
#region private methods
private string PrepareSql()
{
......
......@@ -3,20 +3,16 @@
<Import Project="..\..\build\common.props" />
<PropertyGroup>
<TargetFramework>netstandard1.6</TargetFramework>
<TargetFramework>netstandard2.0</TargetFramework>
<AssemblyName>DotNetCore.CAP.SqlServer</AssemblyName>
<PackageId>DotNetCore.CAP.SqlServer</PackageId>
<NetStandardImplicitPackageVersion>1.6.1</NetStandardImplicitPackageVersion>
<PackageTargetFallback>$(PackageTargetFallback);dnxcore50</PackageTargetFallback>
<GenerateAssemblyConfigurationAttribute>false</GenerateAssemblyConfigurationAttribute>
<GenerateAssemblyCompanyAttribute>false</GenerateAssemblyCompanyAttribute>
<GenerateAssemblyProductAttribute>false</GenerateAssemblyProductAttribute>
<PackageTags>$(PackageTags);SQL Server</PackageTags>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Dapper" Version="1.50.2" />
<PackageReference Include="Microsoft.EntityFrameworkCore" Version="1.1.2" />
<PackageReference Include="Microsoft.EntityFrameworkCore.Relational" Version="1.1.2" />
<PackageReference Include="Microsoft.EntityFrameworkCore" Version="2.0.0" />
<PackageReference Include="Microsoft.EntityFrameworkCore.Relational" Version="2.0.0" />
<PackageReference Include="System.Data.SqlClient" Version="4.4.0" />
</ItemGroup>
<ItemGroup>
......
......@@ -15,7 +15,7 @@ namespace DotNetCore.CAP.SqlServer
private const int MaxBatch = 1000;
private readonly TimeSpan _delay = TimeSpan.FromSeconds(1);
private readonly TimeSpan _waitingInterval = TimeSpan.FromHours(2);
private readonly TimeSpan _waitingInterval = TimeSpan.FromMinutes(5);
private static readonly string[] Tables =
{
......
......@@ -2,7 +2,6 @@ using System.Data.SqlClient;
using System.Threading;
using System.Threading.Tasks;
using Dapper;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Logging;
namespace DotNetCore.CAP.SqlServer
......
using System;
using System.Data;
using System.Threading.Tasks;
using DotNetCore.CAP.Infrastructure;
using DotNetCore.CAP.Models;
using DotNetCore.CAP.Processor;
namespace DotNetCore.CAP.Abstractions
{
public abstract class CapPublisherBase : ICapPublisher, IDisposable
{
protected IDbConnection DbConnection { get; set; }
protected IDbTransaction DbTranasaction { get; set; }
protected bool IsCapOpenedTrans { get; set; }
protected bool IsCapOpenedConn { get; set; }
protected bool IsUsingEF { get; set; }
protected IServiceProvider ServiceProvider { get; set; }
public void Publish<T>(string name, T contentObj, string callbackName = null)
{
CheckIsUsingEF(name);
PrepareConnectionForEF();
var content = Serialize(contentObj, callbackName);
PublishWithTrans(name, content);
}
public Task PublishAsync<T>(string name, T contentObj, string callbackName = null)
{
CheckIsUsingEF(name);
PrepareConnectionForEF();
var content = Serialize(contentObj, callbackName);
return PublishWithTransAsync(name, content);
}
public void Publish<T>(string name, T contentObj, IDbConnection dbConnection,
string callbackName = null, IDbTransaction dbTransaction = null)
{
CheckIsAdoNet(name);
PrepareConnectionForAdo(dbConnection, dbTransaction);
var content = Serialize(contentObj, callbackName);
PublishWithTrans(name, content);
}
public Task PublishAsync<T>(string name, T contentObj, IDbConnection dbConnection,
string callbackName = null, IDbTransaction dbTransaction = null)
{
CheckIsAdoNet(name);
PrepareConnectionForAdo(dbConnection, dbTransaction);
var content = Serialize(contentObj, callbackName);
return PublishWithTransAsync(name, content);
}
protected abstract void PrepareConnectionForEF();
protected abstract void Execute(IDbConnection dbConnection, IDbTransaction dbTransaction, CapPublishedMessage message);
protected abstract Task ExecuteAsync(IDbConnection dbConnection, IDbTransaction dbTransaction, CapPublishedMessage message);
#region private methods
private string Serialize<T>(T obj, string callbackName = null)
{
var message = new Message(obj)
{
CallbackName = callbackName
};
return Helper.ToJson(message);
}
private void PrepareConnectionForAdo(IDbConnection dbConnection, IDbTransaction dbTransaction)
{
DbConnection = dbConnection ?? throw new ArgumentNullException(nameof(dbConnection));
if (DbConnection.State != ConnectionState.Open)
{
IsCapOpenedConn = true;
DbConnection.Open();
}
DbTranasaction = dbTransaction;
if (DbTranasaction == null)
{
IsCapOpenedTrans = true;
DbTranasaction = dbConnection.BeginTransaction(IsolationLevel.ReadCommitted);
}
}
private void CheckIsUsingEF(string name)
{
if (name == null) throw new ArgumentNullException(nameof(name));
if (!IsUsingEF)
throw new InvalidOperationException("If you are using the EntityFramework, you need to configure the DbContextType first." +
" otherwise you need to use overloaded method with IDbConnection and IDbTransaction.");
}
private void CheckIsAdoNet(string name)
{
if (name == null) throw new ArgumentNullException(nameof(name));
if (IsUsingEF)
throw new InvalidOperationException("If you are using the EntityFramework, you do not need to use this overloaded.");
}
private async Task PublishWithTransAsync(string name, string content)
{
var message = new CapPublishedMessage
{
Name = name,
Content = content,
StatusName = StatusName.Scheduled
};
await ExecuteAsync(DbConnection, DbTranasaction, message);
ClosedCap();
PublishQueuer.PulseEvent.Set();
}
private void PublishWithTrans(string name, string content)
{
var message = new CapPublishedMessage
{
Name = name,
Content = content,
StatusName = StatusName.Scheduled
};
Execute(DbConnection, DbTranasaction, message);
ClosedCap();
PublishQueuer.PulseEvent.Set();
}
private void ClosedCap()
{
if (IsCapOpenedTrans)
{
DbTranasaction.Commit();
DbTranasaction.Dispose();
}
if (IsCapOpenedConn)
{
DbConnection.Dispose();
}
}
public void Dispose()
{
DbTranasaction?.Dispose();
DbConnection?.Dispose();
}
#endregion private methods
}
}
\ No newline at end of file
......@@ -20,27 +20,48 @@ namespace DotNetCore.CAP
/// </summary>
public const int DefaultQueueProcessorCount = 2;
/// <summary>
/// Default successed message expriation timespan, in seconds.
/// </summary>
public const int DefaultSuccessMessageExpirationAfter = 3600;
/// <summary>
/// Failed message retry waiting interval.
/// </summary>
public const int DefaultFailedMessageWaitingInterval = 180;
public CapOptions()
{
PollingDelay = DefaultPollingDelay;
QueueProcessorCount = DefaultQueueProcessorCount;
SuccessedMessageExpiredAfter = DefaultSuccessMessageExpirationAfter;
FailedMessageWaitingInterval = DefaultFailedMessageWaitingInterval;
Extensions = new List<ICapOptionsExtension>();
}
/// <summary>
/// Productor job polling delay time. Default is 15 sec.
/// Productor job polling delay time.
/// Default is 15 sec.
/// </summary>
public int PollingDelay { get; set; }
/// <summary>
/// Gets or sets the messages queue (Cap.Queue table) processor count.
/// Default is 2 processor.
/// </summary>
public int QueueProcessorCount { get; set; }
/// <summary>
/// Failed messages polling delay time. Default is 3 min.
/// Sent or received successed message after timespan of due, then the message will be deleted at due time.
/// Dafault is 3600 seconds.
/// </summary>
public int SuccessedMessageExpiredAfter { get; set; }
/// <summary>
/// Failed messages polling delay time.
/// Default is 180 seconds.
/// </summary>
public int FailedMessageWaitingInterval { get; set; } = (int)TimeSpan.FromMinutes(3).TotalSeconds;
public int FailedMessageWaitingInterval { get; set; }
/// <summary>
/// We’ll invoke this call-back with message type,name,content when requeue failed message.
......
......@@ -67,18 +67,20 @@ namespace Microsoft.Extensions.DependencyInjection
private static void AddSubscribeServices(IServiceCollection services)
{
var consumerListenerServices = new Dictionary<Type, Type>();
var consumerListenerServices = new List<KeyValuePair<Type, Type>>();
foreach (var rejectedServices in services)
{
if (rejectedServices.ImplementationType != null
&& typeof(ICapSubscribe).IsAssignableFrom(rejectedServices.ImplementationType))
consumerListenerServices.Add(typeof(ICapSubscribe), rejectedServices.ImplementationType);
{
consumerListenerServices.Add(new KeyValuePair<Type, Type>(typeof(ICapSubscribe),
rejectedServices.ImplementationType));
}
}
foreach (var service in consumerListenerServices)
{
services.AddSingleton(service.Key, service.Value);
services.AddTransient(service.Key, service.Value);
}
var types = Assembly.GetEntryAssembly().ExportedTypes;
......@@ -86,7 +88,7 @@ namespace Microsoft.Extensions.DependencyInjection
{
if (Helper.IsController(type.GetTypeInfo()))
{
services.AddSingleton(typeof(object), type);
services.AddTransient(typeof(object), type);
}
}
}
......
......@@ -3,26 +3,21 @@
<Import Project="..\..\build\common.props" />
<PropertyGroup>
<PackageId>DotNetCore.CAP</PackageId>
<TargetFramework>netstandard1.6</TargetFramework>
<NetStandardImplicitPackageVersion>1.6.1</NetStandardImplicitPackageVersion>
<PackageTargetFallback>$(PackageTargetFallback);dnxcore50</PackageTargetFallback>
</PropertyGroup>
<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|AnyCPU'">
<AllowUnsafeBlocks>False</AllowUnsafeBlocks>
<TargetFramework>netstandard2.0</TargetFramework>
<AssemblyName>DotNetCore.CAP</AssemblyName>
<PackageTags>$(PackageTags);</PackageTags>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Microsoft.AspNetCore.Hosting.Abstractions" Version="1.1.2" />
<PackageReference Include="Microsoft.AspNetCore.Http.Abstractions" Version="1.1.2" />
<PackageReference Include="Microsoft.Extensions.Options" Version="1.1.2" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="1.1.1" />
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="1.1.2" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="1.1.1" />
<PackageReference Include="Microsoft.AspNetCore.Hosting.Abstractions" Version="2.0.0" />
<PackageReference Include="Microsoft.AspNetCore.Http.Abstractions" Version="2.0.0" />
<PackageReference Include="Microsoft.Extensions.Options" Version="2.0.0" />
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="2.0.0" />
<PackageReference Include="Newtonsoft.Json" Version="10.0.3" />
<PackageReference Include="System.Data.Common" Version="4.3.0" />
<PackageReference Include="System.Threading.ThreadPool" Version="4.3.0" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="2.0.0" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="2.0.0" />
</ItemGroup>
</Project>
using System.Threading.Tasks;
using DotNetCore.CAP.Models;
namespace DotNetCore.CAP
{
public interface ICallbackPublisher
{
Task PublishAsync(CapPublishedMessage obj);
}
}
......@@ -18,7 +18,8 @@ namespace DotNetCore.CAP
/// <typeparam name="T">The type of conetent object.</typeparam>
/// <param name="name">the topic name or exchange router key.</param>
/// <param name="contentObj">message body content, that will be serialized of json.</param>
Task PublishAsync<T>(string name, T contentObj);
/// <param name="callbackName">callback subscriber name</param>
Task PublishAsync<T>(string name, T contentObj, string callbackName = null);
/// <summary>
/// (EntityFramework) Publish a object message.
......@@ -30,24 +31,27 @@ namespace DotNetCore.CAP
/// <typeparam name="T">The type of conetent object.</typeparam>
/// <param name="name">the topic name or exchange router key.</param>
/// <param name="contentObj">message body content, that will be serialized of json.</param>
void Publish<T>(string name, T contentObj);
/// <param name="callbackName">callback subscriber name</param>
void Publish<T>(string name, T contentObj, string callbackName = null);
/// <summary>
/// (ado.net) Asynchronous publish a object message.
/// </summary>
/// <param name="name">the topic name or exchange router key.</param>
/// <param name="contentObj">message body content, that will be serialized of json.</param>
/// <param name="callbackName">callback subscriber name</param>
/// <param name="dbConnection">the connection of <see cref="IDbConnection"/></param>
/// <param name="dbTransaction">the transaction of <see cref="IDbTransaction"/></param>
Task PublishAsync<T>(string name, T contentObj, IDbConnection dbConnection, IDbTransaction dbTransaction = null);
Task PublishAsync<T>(string name, T contentObj, IDbConnection dbConnection, string callbackName = null, IDbTransaction dbTransaction = null);
/// <summary>
/// (ado.net) Publish a object message.
/// </summary>
/// <param name="name">the topic name or exchange router key.</param>
/// <param name="contentObj">message body content, that will be serialized of json.</param>
/// <param name="callbackName">callback subscriber name</param>
/// <param name="dbConnection">the connection of <see cref="IDbConnection"/></param>
/// <param name="dbTransaction">the transaction of <see cref="IDbTransaction"/></param>
void Publish<T>(string name, T contentObj, IDbConnection dbConnection, IDbTransaction dbTransaction = null);
void Publish<T>(string name, T contentObj, IDbConnection dbConnection, string callbackName = null, IDbTransaction dbTransaction = null);
}
}
\ No newline at end of file
......@@ -10,12 +10,16 @@ namespace DotNetCore.CAP
{
public abstract class BasePublishQueueExecutor : IQueueExecutor
{
private readonly CapOptions _options;
private readonly IStateChanger _stateChanger;
private readonly ILogger _logger;
protected BasePublishQueueExecutor(IStateChanger stateChanger,
protected BasePublishQueueExecutor(
CapOptions options,
IStateChanger stateChanger,
ILogger<BasePublishQueueExecutor> logger)
{
_options = options;
_stateChanger = stateChanger;
_logger = logger;
}
......@@ -54,7 +58,7 @@ namespace DotNetCore.CAP
}
else
{
newState = new SucceededState();
newState = new SucceededState(_options.SuccessedMessageExpiredAfter);
}
await _stateChanger.ChangeStateAsync(message, newState, connection);
......
......@@ -15,16 +15,18 @@ namespace DotNetCore.CAP
private readonly IConsumerInvokerFactory _consumerInvokerFactory;
private readonly IStateChanger _stateChanger;
private readonly ILogger _logger;
private readonly CapOptions _options;
private readonly MethodMatcherCache _selector;
public SubscibeQueueExecutor(
IStateChanger stateChanger,
MethodMatcherCache selector,
CapOptions options,
IConsumerInvokerFactory consumerInvokerFactory,
ILogger<BasePublishQueueExecutor> logger)
{
_selector = selector;
_options = options;
_consumerInvokerFactory = consumerInvokerFactory;
_stateChanger = stateChanger;
_logger = logger;
......@@ -62,7 +64,7 @@ namespace DotNetCore.CAP
}
else
{
newState = new SucceededState();
newState = new SucceededState(_options.SuccessedMessageExpiredAfter);
}
await _stateChanger.ChangeStateAsync(message, newState, connection);
......
......@@ -72,7 +72,26 @@ namespace DotNetCore.CAP.Infrastructure
public static bool IsComplexType(Type type)
{
return !TypeDescriptor.GetConverter(type).CanConvertFrom(typeof(string));
return !CanConvertFromString(type);
}
private static bool CanConvertFromString(Type destinationType)
{
destinationType = Nullable.GetUnderlyingType(destinationType) ?? destinationType;
return IsSimpleType(destinationType) ||
TypeDescriptor.GetConverter(destinationType).CanConvertFrom(typeof(string));
}
private static bool IsSimpleType(Type type)
{
return type.GetTypeInfo().IsPrimitive ||
type.Equals(typeof(decimal)) ||
type.Equals(typeof(string)) ||
type.Equals(typeof(DateTime)) ||
type.Equals(typeof(Guid)) ||
type.Equals(typeof(DateTimeOffset)) ||
type.Equals(typeof(TimeSpan)) ||
type.Equals(typeof(Uri));
}
}
}
\ No newline at end of file
This diff is collapsed.
using System;
using DotNetCore.CAP.Abstractions;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
namespace DotNetCore.CAP.Internal
......@@ -22,12 +23,15 @@ namespace DotNetCore.CAP.Internal
public IConsumerInvoker CreateInvoker(ConsumerContext consumerContext)
{
var context = new ConsumerInvokerContext(consumerContext)
using (var scope = _serviceProvider.CreateScope())
{
Result = new DefaultConsumerInvoker(_logger, _serviceProvider, _modelBinderFactory, consumerContext)
};
var context = new ConsumerInvokerContext(consumerContext)
{
Result = new DefaultConsumerInvoker(_logger, scope.ServiceProvider, _modelBinderFactory, consumerContext)
};
return context.Result;
return context.Result;
}
}
}
}
\ No newline at end of file
using System.Collections.Generic;
namespace DotNetCore.CAP.Internal
{
public class ConsumerMethodExecutor
{
public static object[] PrepareArguments(
IDictionary<string, object> actionParameters,
ObjectMethodExecutor actionMethodExecutor)
{
var declaredParameterInfos = actionMethodExecutor.MethodParameters;
var count = declaredParameterInfos.Length;
if (count == 0)
{
return null;
}
var arguments = new object[count];
for (var index = 0; index < count; index++)
{
var parameterInfo = declaredParameterInfos[index];
object value;
if (!actionParameters.TryGetValue(parameterInfo.Name, out value))
{
value = actionMethodExecutor.GetDefaultValueForParameter(index);
}
arguments[index] = value;
}
return arguments;
}
}
}
\ No newline at end of file
using System;
using System.Threading.Tasks;
using DotNetCore.CAP.Abstractions;
using DotNetCore.CAP.Infrastructure;
using DotNetCore.CAP.Models;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Internal;
using Microsoft.Extensions.Logging;
namespace DotNetCore.CAP.Internal
......@@ -21,49 +24,102 @@ namespace DotNetCore.CAP.Internal
{
_modelBinderFactory = modelBinderFactory;
_serviceProvider = serviceProvider;
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
_logger = logger;
_consumerContext = consumerContext;
_consumerContext = consumerContext ?? throw new ArgumentNullException(nameof(consumerContext));
_executor = ObjectMethodExecutor.Create(_consumerContext.ConsumerDescriptor.MethodInfo,
_consumerContext.ConsumerDescriptor.ImplTypeInfo);
}
public async Task InvokeAsync()
{
using (_logger.BeginScope("consumer invoker begin"))
_logger.LogDebug("Executing consumer Topic: {0}", _consumerContext.ConsumerDescriptor.MethodInfo.Name);
var obj = ActivatorUtilities.GetServiceOrCreateInstance(_serviceProvider,
_consumerContext.ConsumerDescriptor.ImplTypeInfo.AsType());
var jsonConent = _consumerContext.DeliverMessage.Content;
var message = Helper.FromJson<Message>(jsonConent);
object result = null;
if (_executor.MethodParameters.Length > 0)
{
result = await ExecuteWithParameterAsync(obj, message.Content.ToString());
}
else
{
_logger.LogDebug("Executing consumer Topic: {0}", _consumerContext.ConsumerDescriptor.MethodInfo.Name);
result = await ExecuteAsync(obj);
}
var obj = ActivatorUtilities.GetServiceOrCreateInstance(_serviceProvider,
_consumerContext.ConsumerDescriptor.ImplTypeInfo.AsType());
if (!string.IsNullOrEmpty(message.CallbackName))
{
await SentCallbackMessage(message.Id, message.CallbackName, result);
}
}
var value = _consumerContext.DeliverMessage.Content;
if (_executor.MethodParameters.Length > 0)
private async Task<object> ExecuteAsync(object @class)
{
if (_executor.IsMethodAsync)
{
return await _executor.ExecuteAsync(@class);
}
else
{
return _executor.Execute(@class);
}
}
private async Task<object> ExecuteWithParameterAsync(object @class, string parameterString)
{
var firstParameter = _executor.MethodParameters[0];
try
{
var binder = _modelBinderFactory.CreateBinder(firstParameter);
var bindResult = await binder.BindModelAsync(parameterString);
if (bindResult.IsSuccess)
{
var firstParameter = _executor.MethodParameters[0];
try
if (_executor.IsMethodAsync)
{
var binder = _modelBinderFactory.CreateBinder(firstParameter);
var result = await binder.BindModelAsync(value);
if (result.IsSuccess)
{
_executor.Execute(obj, result.Model);
}
else
{
_logger.LogWarning($"Parameters:{firstParameter.Name} bind failed! the content is:" + value);
}
return await _executor.ExecuteAsync(@class, bindResult.Model);
}
catch (FormatException ex)
else
{
_logger.ModelBinderFormattingException(_executor.MethodInfo?.Name, firstParameter.Name, value, ex);
return _executor.Execute(@class, bindResult.Model);
}
}
else
{
_executor.Execute(obj);
throw new MethodBindException($"Parameters:{firstParameter.Name} bind failed! ParameterString is: {parameterString} ");
}
}
catch (FormatException ex)
{
_logger.ModelBinderFormattingException(_executor.MethodInfo?.Name, firstParameter.Name, parameterString, ex);
return null;
}
}
private async Task SentCallbackMessage(string messageId, string topicName, object bodyObj)
{
var callbackMessage = new Message
{
Id = messageId,
Content = bodyObj
};
using (var scope = _serviceProvider.CreateScope())
{
var provider = scope.ServiceProvider;
var publisher = provider.GetRequiredService<ICallbackPublisher>();
var publishedMessage = new CapPublishedMessage
{
Name = topicName,
Content = Helper.ToJson(callbackMessage),
StatusName = StatusName.Scheduled
};
await publisher.PublishAsync(publishedMessage);
}
}
}
}
\ No newline at end of file
......@@ -20,7 +20,9 @@ namespace DotNetCore.CAP.Internal
try
{
var type = _parameterInfo.ParameterType;
var value = Helper.FromJson(content, type);
return Task.FromResult(ModelBindingResult.Success(value));
}
catch (Exception)
......
using System;
namespace DotNetCore.CAP.Internal
{
[Serializable]
public class MethodBindException : Exception
{
public MethodBindException()
{
}
public MethodBindException(string message) : base(message)
{
}
public MethodBindException(string message, Exception inner) : base(message, inner)
{
}
}
}
\ No newline at end of file
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;
namespace DotNetCore.CAP.Internal
{
/// <summary>
/// Provides access to the combined list of attributes associated a <see cref="Type"/> or property.
/// </summary>
public class ModelAttributes
{
/// <summary>
/// Creates a new <see cref="ModelAttributes"/> for a <see cref="Type"/>.
/// </summary>
/// <param name="typeAttributes">The set of attributes for the <see cref="Type"/>.</param>
public ModelAttributes(IEnumerable<object> typeAttributes)
{
Attributes = typeAttributes?.ToArray() ?? throw new ArgumentNullException(nameof(typeAttributes));
TypeAttributes = Attributes;
}
/// <summary>
/// Creates a new <see cref="ModelAttributes"/> for a property.
/// </summary>
/// <param name="propertyAttributes">The set of attributes for the property.</param>
/// <param name="typeAttributes">
/// The set of attributes for the property's <see cref="Type"/>. See <see cref="PropertyInfo.PropertyType"/>.
/// </param>
public ModelAttributes(IEnumerable<object> propertyAttributes, IEnumerable<object> typeAttributes)
{
PropertyAttributes = propertyAttributes?.ToArray()
?? throw new ArgumentNullException(nameof(propertyAttributes));
TypeAttributes = typeAttributes?.ToArray()
?? throw new ArgumentNullException(nameof(typeAttributes));
Attributes = PropertyAttributes.Concat(TypeAttributes).ToArray();
}
/// <summary>
/// Gets the set of all attributes. If this instance represents the attributes for a property, the attributes
/// on the property definition are before those on the property's <see cref="Type"/>.
/// </summary>
public IReadOnlyList<object> Attributes { get; }
/// <summary>
/// Gets the set of attributes on the property, or <c>null</c> if this instance represents the attributes
/// for a <see cref="Type"/>.
/// </summary>
public IReadOnlyList<object> PropertyAttributes { get; }
/// <summary>
/// Gets the set of attributes on the <see cref="Type"/>. If this instance represents a property,
/// then <see cref="TypeAttributes"/> contains attributes retrieved from
/// <see cref="PropertyInfo.PropertyType"/>.
/// </summary>
public IReadOnlyList<object> TypeAttributes { get; }
/// <summary>
/// Gets the attributes for the given <paramref name="property"/>.
/// </summary>
/// <param name="type">The <see cref="Type"/> in which caller found <paramref name="property"/>.
/// </param>
/// <param name="property">A <see cref="PropertyInfo"/> for which attributes need to be resolved.
/// </param>
/// <returns>A <see cref="ModelAttributes"/> instance with the attributes of the property.</returns>
public static ModelAttributes GetAttributesForProperty(Type type, PropertyInfo property)
{
if (type == null)
{
throw new ArgumentNullException(nameof(type));
}
if (property == null)
{
throw new ArgumentNullException(nameof(property));
}
var propertyAttributes = property.GetCustomAttributes();
var typeAttributes = property.PropertyType.GetTypeInfo().GetCustomAttributes();
return new ModelAttributes(propertyAttributes, typeAttributes);
}
/// <summary>
/// Gets the attributes for the given <paramref name="type"/>.
/// </summary>
/// <param name="type">The <see cref="Type"/> for which attributes need to be resolved.
/// </param>
/// <returns>A <see cref="ModelAttributes"/> instance with the attributes of the <see cref="Type"/>.</returns>
public static ModelAttributes GetAttributesForType(Type type)
{
if (type == null)
{
throw new ArgumentNullException(nameof(type));
}
var attributes = type.GetTypeInfo().GetCustomAttributes();
return new ModelAttributes(attributes);
}
}
}
\ No newline at end of file
This diff is collapsed.
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.Linq;
using System.Reflection;
using System.Runtime.CompilerServices;
namespace Microsoft.Extensions.Internal
{
internal struct AwaitableInfo
{
public Type AwaiterType { get; }
public PropertyInfo AwaiterIsCompletedProperty { get; }
public MethodInfo AwaiterGetResultMethod { get; }
public MethodInfo AwaiterOnCompletedMethod { get; }
public MethodInfo AwaiterUnsafeOnCompletedMethod { get; }
public Type ResultType { get; }
public MethodInfo GetAwaiterMethod { get; }
public AwaitableInfo(
Type awaiterType,
PropertyInfo awaiterIsCompletedProperty,
MethodInfo awaiterGetResultMethod,
MethodInfo awaiterOnCompletedMethod,
MethodInfo awaiterUnsafeOnCompletedMethod,
Type resultType,
MethodInfo getAwaiterMethod)
{
AwaiterType = awaiterType;
AwaiterIsCompletedProperty = awaiterIsCompletedProperty;
AwaiterGetResultMethod = awaiterGetResultMethod;
AwaiterOnCompletedMethod = awaiterOnCompletedMethod;
AwaiterUnsafeOnCompletedMethod = awaiterUnsafeOnCompletedMethod;
ResultType = resultType;
GetAwaiterMethod = getAwaiterMethod;
}
public static bool IsTypeAwaitable(Type type, out AwaitableInfo awaitableInfo)
{
// Based on Roslyn code: http://source.roslyn.io/#Microsoft.CodeAnalysis.Workspaces/Shared/Extensions/ISymbolExtensions.cs,db4d48ba694b9347
// Awaitable must have method matching "object GetAwaiter()"
var getAwaiterMethod = type.GetRuntimeMethods().FirstOrDefault(m =>
m.Name.Equals("GetAwaiter", StringComparison.OrdinalIgnoreCase)
&& m.GetParameters().Length == 0
&& m.ReturnType != null);
if (getAwaiterMethod == null)
{
awaitableInfo = default(AwaitableInfo);
return false;
}
var awaiterType = getAwaiterMethod.ReturnType;
// Awaiter must have property matching "bool IsCompleted { get; }"
var isCompletedProperty = awaiterType.GetRuntimeProperties().FirstOrDefault(p =>
p.Name.Equals("IsCompleted", StringComparison.OrdinalIgnoreCase)
&& p.PropertyType == typeof(bool)
&& p.GetMethod != null);
if (isCompletedProperty == null)
{
awaitableInfo = default(AwaitableInfo);
return false;
}
// Awaiter must implement INotifyCompletion
var awaiterInterfaces = awaiterType.GetInterfaces();
var implementsINotifyCompletion = awaiterInterfaces.Any(t => t == typeof(INotifyCompletion));
if (!implementsINotifyCompletion)
{
awaitableInfo = default(AwaitableInfo);
return false;
}
// INotifyCompletion supplies a method matching "void OnCompleted(Action action)"
var iNotifyCompletionMap = awaiterType
.GetTypeInfo()
.GetRuntimeInterfaceMap(typeof(INotifyCompletion));
var onCompletedMethod = iNotifyCompletionMap.InterfaceMethods.Single(m =>
m.Name.Equals("OnCompleted", StringComparison.OrdinalIgnoreCase)
&& m.ReturnType == typeof(void)
&& m.GetParameters().Length == 1
&& m.GetParameters()[0].ParameterType == typeof(Action));
// Awaiter optionally implements ICriticalNotifyCompletion
var implementsICriticalNotifyCompletion = awaiterInterfaces.Any(t => t == typeof(ICriticalNotifyCompletion));
MethodInfo unsafeOnCompletedMethod;
if (implementsICriticalNotifyCompletion)
{
// ICriticalNotifyCompletion supplies a method matching "void UnsafeOnCompleted(Action action)"
var iCriticalNotifyCompletionMap = awaiterType
.GetTypeInfo()
.GetRuntimeInterfaceMap(typeof(ICriticalNotifyCompletion));
unsafeOnCompletedMethod = iCriticalNotifyCompletionMap.InterfaceMethods.Single(m =>
m.Name.Equals("UnsafeOnCompleted", StringComparison.OrdinalIgnoreCase)
&& m.ReturnType == typeof(void)
&& m.GetParameters().Length == 1
&& m.GetParameters()[0].ParameterType == typeof(Action));
}
else
{
unsafeOnCompletedMethod = null;
}
// Awaiter must have method matching "void GetResult" or "T GetResult()"
var getResultMethod = awaiterType.GetRuntimeMethods().FirstOrDefault(m =>
m.Name.Equals("GetResult")
&& m.GetParameters().Length == 0);
if (getResultMethod == null)
{
awaitableInfo = default(AwaitableInfo);
return false;
}
awaitableInfo = new AwaitableInfo(
awaiterType,
isCompletedProperty,
getResultMethod,
onCompletedMethod,
unsafeOnCompletedMethod,
getResultMethod.ReturnType,
getAwaiterMethod);
return true;
}
}
}
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.Linq.Expressions;
namespace Microsoft.Extensions.Internal
{
internal struct CoercedAwaitableInfo
{
public AwaitableInfo AwaitableInfo { get; }
public Expression CoercerExpression { get; }
public Type CoercerResultType { get; }
public bool RequiresCoercion => CoercerExpression != null;
public CoercedAwaitableInfo(AwaitableInfo awaitableInfo)
{
AwaitableInfo = awaitableInfo;
CoercerExpression = null;
CoercerResultType = null;
}
public CoercedAwaitableInfo(Expression coercerExpression, Type coercerResultType, AwaitableInfo coercedAwaitableInfo)
{
CoercerExpression = coercerExpression;
CoercerResultType = coercerResultType;
AwaitableInfo = coercedAwaitableInfo;
}
public static bool IsTypeAwaitable(Type type, out CoercedAwaitableInfo info)
{
if (AwaitableInfo.IsTypeAwaitable(type, out var directlyAwaitableInfo))
{
info = new CoercedAwaitableInfo(directlyAwaitableInfo);
return true;
}
// It's not directly awaitable, but maybe we can coerce it.
// Currently we support coercing FSharpAsync<T>.
if (ObjectMethodExecutorFSharpSupport.TryBuildCoercerFromFSharpAsyncToAwaitable(type,
out var coercerExpression,
out var coercerResultType))
{
if (AwaitableInfo.IsTypeAwaitable(coercerResultType, out var coercedAwaitableInfo))
{
info = new CoercedAwaitableInfo(coercerExpression, coercerResultType, coercedAwaitableInfo);
return true;
}
}
info = default(CoercedAwaitableInfo);
return false;
}
}
}
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.Runtime.CompilerServices;
namespace Microsoft.Extensions.Internal
{
/// <summary>
/// Provides a common awaitable structure that <see cref="ObjectMethodExecutor.ExecuteAsync"/> can
/// return, regardless of whether the underlying value is a System.Task, an FSharpAsync, or an
/// application-defined custom awaitable.
/// </summary>
internal struct ObjectMethodExecutorAwaitable
{
private readonly object _customAwaitable;
private readonly Func<object, object> _getAwaiterMethod;
private readonly Func<object, bool> _isCompletedMethod;
private readonly Func<object, object> _getResultMethod;
private readonly Action<object, Action> _onCompletedMethod;
private readonly Action<object, Action> _unsafeOnCompletedMethod;
// Perf note: since we're requiring the customAwaitable to be supplied here as an object,
// this will trigger a further allocation if it was a value type (i.e., to box it). We can't
// fix this by making the customAwaitable type generic, because the calling code typically
// does not know the type of the awaitable/awaiter at compile-time anyway.
//
// However, we could fix it by not passing the customAwaitable here at all, and instead
// passing a func that maps directly from the target object (e.g., controller instance),
// target method (e.g., action method info), and params array to the custom awaiter in the
// GetAwaiter() method below. In effect, by delaying the actual method call until the
// upstream code calls GetAwaiter on this ObjectMethodExecutorAwaitable instance.
// This optimization is not currently implemented because:
// [1] It would make no difference when the awaitable was an object type, which is
// by far the most common scenario (e.g., System.Task<T>).
// [2] It would be complex - we'd need some kind of object pool to track all the parameter
// arrays until we needed to use them in GetAwaiter().
// We can reconsider this in the future if there's a need to optimize for ValueTask<T>
// or other value-typed awaitables.
public ObjectMethodExecutorAwaitable(
object customAwaitable,
Func<object, object> getAwaiterMethod,
Func<object, bool> isCompletedMethod,
Func<object, object> getResultMethod,
Action<object, Action> onCompletedMethod,
Action<object, Action> unsafeOnCompletedMethod)
{
_customAwaitable = customAwaitable;
_getAwaiterMethod = getAwaiterMethod;
_isCompletedMethod = isCompletedMethod;
_getResultMethod = getResultMethod;
_onCompletedMethod = onCompletedMethod;
_unsafeOnCompletedMethod = unsafeOnCompletedMethod;
}
public Awaiter GetAwaiter()
{
var customAwaiter = _getAwaiterMethod(_customAwaitable);
return new Awaiter(customAwaiter, _isCompletedMethod, _getResultMethod, _onCompletedMethod, _unsafeOnCompletedMethod);
}
public struct Awaiter : ICriticalNotifyCompletion
{
private readonly object _customAwaiter;
private readonly Func<object, bool> _isCompletedMethod;
private readonly Func<object, object> _getResultMethod;
private readonly Action<object, Action> _onCompletedMethod;
private readonly Action<object, Action> _unsafeOnCompletedMethod;
public Awaiter(
object customAwaiter,
Func<object, bool> isCompletedMethod,
Func<object, object> getResultMethod,
Action<object, Action> onCompletedMethod,
Action<object, Action> unsafeOnCompletedMethod)
{
_customAwaiter = customAwaiter;
_isCompletedMethod = isCompletedMethod;
_getResultMethod = getResultMethod;
_onCompletedMethod = onCompletedMethod;
_unsafeOnCompletedMethod = unsafeOnCompletedMethod;
}
public bool IsCompleted => _isCompletedMethod(_customAwaiter);
public object GetResult() => _getResultMethod(_customAwaiter);
public void OnCompleted(Action continuation)
{
_onCompletedMethod(_customAwaiter, continuation);
}
public void UnsafeOnCompleted(Action continuation)
{
// If the underlying awaitable implements ICriticalNotifyCompletion, use its UnsafeOnCompleted.
// If not, fall back on using its OnCompleted.
//
// Why this is safe:
// - Implementing ICriticalNotifyCompletion is a way of saying the caller can choose whether it
// needs the execution context to be preserved (which it signals by calling OnCompleted), or
// that it doesn't (which it signals by calling UnsafeOnCompleted). Obviously it's faster *not*
// to preserve and restore the context, so we prefer that where possible.
// - If a caller doesn't need the execution context to be preserved and hence calls UnsafeOnCompleted,
// there's no harm in preserving it anyway - it's just a bit of wasted cost. That's what will happen
// if a caller sees that the proxy implements ICriticalNotifyCompletion but the proxy chooses to
// pass the call on to the underlying awaitable's OnCompleted method.
var underlyingMethodToUse = _unsafeOnCompletedMethod ?? _onCompletedMethod;
underlyingMethodToUse(_customAwaiter, continuation);
}
}
}
}
\ No newline at end of file
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.Linq;
using System.Linq.Expressions;
using System.Reflection;
using System.Threading;
using System.Threading.Tasks;
namespace Microsoft.Extensions.Internal
{
/// <summary>
/// Helper for detecting whether a given type is FSharpAsync`1, and if so, supplying
/// an <see cref="Expression"/> for mapping instances of that type to a C# awaitable.
/// </summary>
/// <remarks>
/// The main design goal here is to avoid taking a compile-time dependency on
/// FSharp.Core.dll, because non-F# applications wouldn't use it. So all the references
/// to FSharp types have to be constructed dynamically at runtime.
/// </remarks>
internal static class ObjectMethodExecutorFSharpSupport
{
private static object _fsharpValuesCacheLock = new object();
private static Assembly _fsharpCoreAssembly;
private static MethodInfo _fsharpAsyncStartAsTaskGenericMethod;
private static PropertyInfo _fsharpOptionOfTaskCreationOptionsNoneProperty;
private static PropertyInfo _fsharpOptionOfCancellationTokenNoneProperty;
public static bool TryBuildCoercerFromFSharpAsyncToAwaitable(
Type possibleFSharpAsyncType,
out Expression coerceToAwaitableExpression,
out Type awaitableType)
{
var methodReturnGenericType = possibleFSharpAsyncType.IsGenericType
? possibleFSharpAsyncType.GetGenericTypeDefinition()
: null;
if (!IsFSharpAsyncOpenGenericType(methodReturnGenericType))
{
coerceToAwaitableExpression = null;
awaitableType = null;
return false;
}
var awaiterResultType = possibleFSharpAsyncType.GetGenericArguments().Single();
awaitableType = typeof(Task<>).MakeGenericType(awaiterResultType);
// coerceToAwaitableExpression = (object fsharpAsync) =>
// {
// return (object)FSharpAsync.StartAsTask<TResult>(
// (Microsoft.FSharp.Control.FSharpAsync<TResult>)fsharpAsync,
// FSharpOption<TaskCreationOptions>.None,
// FSharpOption<CancellationToken>.None);
// };
var startAsTaskClosedMethod = _fsharpAsyncStartAsTaskGenericMethod
.MakeGenericMethod(awaiterResultType);
var coerceToAwaitableParam = Expression.Parameter(typeof(object));
coerceToAwaitableExpression = Expression.Lambda(
Expression.Convert(
Expression.Call(
startAsTaskClosedMethod,
Expression.Convert(coerceToAwaitableParam, possibleFSharpAsyncType),
Expression.MakeMemberAccess(null, _fsharpOptionOfTaskCreationOptionsNoneProperty),
Expression.MakeMemberAccess(null, _fsharpOptionOfCancellationTokenNoneProperty)),
typeof(object)),
coerceToAwaitableParam);
return true;
}
private static bool IsFSharpAsyncOpenGenericType(Type possibleFSharpAsyncGenericType)
{
var typeFullName = possibleFSharpAsyncGenericType?.FullName;
if (!string.Equals(typeFullName, "Microsoft.FSharp.Control.FSharpAsync`1", StringComparison.Ordinal))
{
return false;
}
lock (_fsharpValuesCacheLock)
{
if (_fsharpCoreAssembly != null)
{
// Since we've already found the real FSharpAsync.Core assembly, we just have
// to check that the supplied FSharpAsync`1 type is the one from that assembly.
return possibleFSharpAsyncGenericType.Assembly == _fsharpCoreAssembly;
}
else
{
// We'll keep trying to find the FSharp types/values each time any type called
// FSharpAsync`1 is supplied.
return TryPopulateFSharpValueCaches(possibleFSharpAsyncGenericType);
}
}
}
private static bool TryPopulateFSharpValueCaches(Type possibleFSharpAsyncGenericType)
{
var assembly = possibleFSharpAsyncGenericType.Assembly;
var fsharpOptionType = assembly.GetType("Microsoft.FSharp.Core.FSharpOption`1");
var fsharpAsyncType = assembly.GetType("Microsoft.FSharp.Control.FSharpAsync");
if (fsharpOptionType == null || fsharpAsyncType == null)
{
return false;
}
// Get a reference to FSharpOption<TaskCreationOptions>.None
var fsharpOptionOfTaskCreationOptionsType = fsharpOptionType
.MakeGenericType(typeof(TaskCreationOptions));
_fsharpOptionOfTaskCreationOptionsNoneProperty = fsharpOptionOfTaskCreationOptionsType
.GetTypeInfo()
.GetRuntimeProperty("None");
// Get a reference to FSharpOption<CancellationToken>.None
var fsharpOptionOfCancellationTokenType = fsharpOptionType
.MakeGenericType(typeof(CancellationToken));
_fsharpOptionOfCancellationTokenNoneProperty = fsharpOptionOfCancellationTokenType
.GetTypeInfo()
.GetRuntimeProperty("None");
// Get a reference to FSharpAsync.StartAsTask<>
var fsharpAsyncMethods = fsharpAsyncType
.GetRuntimeMethods()
.Where(m => m.Name.Equals("StartAsTask", StringComparison.Ordinal));
foreach (var candidateMethodInfo in fsharpAsyncMethods)
{
var parameters = candidateMethodInfo.GetParameters();
if (parameters.Length == 3
&& TypesHaveSameIdentity(parameters[0].ParameterType, possibleFSharpAsyncGenericType)
&& parameters[1].ParameterType == fsharpOptionOfTaskCreationOptionsType
&& parameters[2].ParameterType == fsharpOptionOfCancellationTokenType)
{
// This really does look like the correct method (and hence assembly).
_fsharpAsyncStartAsTaskGenericMethod = candidateMethodInfo;
_fsharpCoreAssembly = assembly;
break;
}
}
return _fsharpCoreAssembly != null;
}
private static bool TypesHaveSameIdentity(Type type1, Type type2)
{
return type1.Assembly == type2.Assembly
&& string.Equals(type1.Namespace, type2.Namespace, StringComparison.Ordinal)
&& string.Equals(type1.Name, type2.Name, StringComparison.Ordinal);
}
}
}
......@@ -34,5 +34,10 @@ namespace DotNetCore.CAP.Models
public int Retries { get; set; }
public string StatusName { get; set; }
public override string ToString()
{
return "name:" + Name + ", content:" + Content;
}
}
}
\ No newline at end of file
......@@ -47,5 +47,10 @@ namespace DotNetCore.CAP.Models
Content = Content
};
}
public override string ToString()
{
return "name:" + Name + ", content:" + Content;
}
}
}
\ No newline at end of file
using System;
namespace DotNetCore.CAP.Models
{
public class Message
{
public string Id { get; set; }
public DateTime Timestamp { get; set; }
public object Content { get; set; }
public string CallbackName { get; set; }
public Message()
{
Id = ObjectId.GenerateNewStringId();
Timestamp = DateTime.Now;
}
public Message(object content) : this()
{
Content = content;
}
}
}
\ No newline at end of file
......@@ -7,10 +7,20 @@ namespace DotNetCore.CAP.Processor.States
{
public const string StateName = "Succeeded";
public TimeSpan? ExpiresAfter => TimeSpan.FromHours(1);
public TimeSpan? ExpiresAfter { get; private set; }
public string Name => StateName;
public SucceededState()
{
ExpiresAfter = TimeSpan.FromHours(1);
}
public SucceededState(int ExpireAfterSeconds)
{
ExpiresAfter = TimeSpan.FromSeconds(ExpireAfterSeconds);
}
public void Apply(CapPublishedMessage message, IStorageTransaction transaction)
{
}
......
using System.Data;
using System.Threading;
using Dapper;
using Microsoft.EntityFrameworkCore;
......
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFramework>netcoreapp1.1</TargetFramework>
<TargetFramework>netcoreapp2.0</TargetFramework>
<TreatWarningsAsErrors>true</TreatWarningsAsErrors>
<AssemblyName>DotNetCore.CAP.MySql.Test</AssemblyName>
<PackageId>DotNetCore.CAP.MySql.Test</PackageId>
<GenerateRuntimeConfigurationFiles>true</GenerateRuntimeConfigurationFiles>
<PackageTargetFallback>$(PackageTargetFallback);dnxcore50;portable-net451+win8</PackageTargetFallback>
<RuntimeFrameworkVersion>1.1.1</RuntimeFrameworkVersion>
<GenerateAssemblyConfigurationAttribute>false</GenerateAssemblyConfigurationAttribute>
<GenerateAssemblyCompanyAttribute>false</GenerateAssemblyCompanyAttribute>
<GenerateAssemblyProductAttribute>false</GenerateAssemblyProductAttribute>
</PropertyGroup>
<ItemGroup>
<Compile Include="..\Shared\*.cs" Exclude="bin\**;obj\**;**\*.xproj;packages\**" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\..\src\DotNetCore.CAP.MySql\DotNetCore.CAP.MySql.csproj" />
<ProjectReference Include="..\..\src\DotNetCore.CAP\DotNetCore.CAP.csproj" />
......@@ -24,23 +15,19 @@
<ItemGroup>
<PackageReference Include="Dapper" Version="1.50.2" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="15.0.0" />
<PackageReference Include="MySqlConnector" Version="0.24.0" />
<PackageReference Include="xunit.runner.visualstudio" Version="2.2.0" />
<PackageReference Include="xunit" Version="2.2.0" />
<PackageReference Include="Microsoft.AspNetCore.Http" Version="1.1.2" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="1.1.1" />
<PackageReference Include="Moq" Version="4.7.63" />
<PackageReference Include="Microsoft.Extensions.Logging" Version="1.1.2" />
<PackageReference Include="Microsoft.Extensions.Configuration.Abstractions" Version="1.1.2" />
<PackageReference Include="Microsoft.Extensions.Configuration" Version="1.1.2" />
<PackageReference Include="Microsoft.Extensions.Configuration.FileExtensions" Version="1.1.2" />
<PackageReference Include="Microsoft.Extensions.Configuration.Json" Version="1.1.2" />
<PackageReference Include="Microsoft.Extensions.Configuration.EnvironmentVariables" Version="1.1.2" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="15.5.0-preview-20170810-02" />
<PackageReference Include="MySqlConnector" Version="0.25.1" />
<PackageReference Include="xunit.runner.visualstudio" Version="2.3.0-beta3-build3705" />
<PackageReference Include="xunit" Version="2.3.0-beta3-build3705" />
<PackageReference Include="Microsoft.AspNetCore.Http" Version="2.0.0" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="2.0.0" />
<PackageReference Include="Moq" Version="4.7.99" />
<PackageReference Include="Microsoft.Extensions.Logging" Version="2.0.0" />
<PackageReference Include="Microsoft.Extensions.Configuration.Abstractions" Version="2.0.0" />
<PackageReference Include="Microsoft.Extensions.Configuration" Version="2.0.0" />
<PackageReference Include="Microsoft.Extensions.Configuration.FileExtensions" Version="2.0.0" />
<PackageReference Include="Microsoft.Extensions.Configuration.Json" Version="2.0.0" />
<PackageReference Include="Microsoft.Extensions.Configuration.EnvironmentVariables" Version="2.0.0" />
</ItemGroup>
<ItemGroup>
<Service Include="{82a7f48d-3b50-4b1e-b82e-3ada8210c358}" />
</ItemGroup>
</Project>
\ No newline at end of file
......@@ -85,7 +85,6 @@ namespace DotNetCore.CAP.MySql.Test
[Fact]
public async Task GetReceivedMessageAsync_Test()
{
var sql = $@"
INSERT INTO `cap.received`(`Name`,`Group`,`Content`,`Retries`,`Added`,`ExpiresAt`,`StatusName`)
VALUES(@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName);SELECT @@IDENTITY;";
......@@ -129,6 +128,5 @@ namespace DotNetCore.CAP.MySql.Test
Assert.Equal("MySqlStorageConnectionTest", message.Name);
Assert.Equal("mygroup", message.Group);
}
}
}
}
\ No newline at end of file
using Xunit;
using Dapper;
using Dapper;
using Xunit;
namespace DotNetCore.CAP.MySql.Test
{
......@@ -9,7 +9,6 @@ namespace DotNetCore.CAP.MySql.Test
private readonly string _dbName;
private readonly string _masterDbConnectionString;
public MySqlStorageTest()
{
_dbName = ConnectionUtil.GetDatabaseName();
......@@ -29,36 +28,12 @@ namespace DotNetCore.CAP.MySql.Test
}
}
[Fact]
public void DatabaseTable_Published_IsExists()
{
var tableName = "cap.published";
using (var connection = ConnectionUtil.CreateConnection(_masterDbConnectionString))
{
var sql = $"SELECT TABLE_NAME FROM `TABLES` WHERE TABLE_SCHEMA='{_dbName}' AND TABLE_NAME = '{tableName}'";
var result = connection.QueryFirstOrDefault<string>(sql);
Assert.NotNull(result);
Assert.Equal(tableName, result);
}
}
[Fact]
public void DatabaseTable_Queue_IsExists()
{
var tableName = "cap.queue";
using (var connection = ConnectionUtil.CreateConnection(_masterDbConnectionString))
{
var sql = $"SELECT TABLE_NAME FROM `TABLES` WHERE TABLE_SCHEMA='{_dbName}' AND TABLE_NAME = '{tableName}'";
var result = connection.QueryFirstOrDefault<string>(sql);
Assert.NotNull(result);
Assert.Equal(tableName, result);
}
}
[Fact]
public void DatabaseTable_Received_IsExists()
[Theory]
[InlineData("cap.published")]
[InlineData("cap.queue")]
[InlineData("cap.received")]
public void DatabaseTable_IsExists(string tableName)
{
var tableName = "cap.received";
using (var connection = ConnectionUtil.CreateConnection(_masterDbConnectionString))
{
var sql = $"SELECT TABLE_NAME FROM `TABLES` WHERE TABLE_SCHEMA='{_dbName}' AND TABLE_NAME = '{tableName}'";
......@@ -68,4 +43,4 @@ namespace DotNetCore.CAP.MySql.Test
}
}
}
}
}
\ No newline at end of file
using System;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.DependencyInjection;
namespace DotNetCore.CAP.MySql.Test
......
using System;
using Npgsql;
namespace DotNetCore.CAP.PostgreSql.Test
{
public static class ConnectionUtil
{
private const string DatabaseVariable = "Cap_PostgreSql_DatabaseName";
private const string ConnectionStringTemplateVariable = "Cap_PostgreSql_ConnectionStringTemplate";
private const string MasterDatabaseName = "postgres";
private const string DefaultDatabaseName = @"DotNetCore.CAP.PostgreSql.Test";
private const string DefaultConnectionStringTemplate =
@"Server=localhost;Database={0};UserId=postgres;Password=123123;";
public static string GetDatabaseName()
{
return Environment.GetEnvironmentVariable(DatabaseVariable) ?? DefaultDatabaseName;
}
public static string GetMasterConnectionString()
{
return string.Format(GetConnectionStringTemplate(), MasterDatabaseName);
}
public static string GetConnectionString()
{
return string.Format(GetConnectionStringTemplate(), GetDatabaseName());
}
private static string GetConnectionStringTemplate()
{
return
Environment.GetEnvironmentVariable(ConnectionStringTemplateVariable) ??
DefaultConnectionStringTemplate;
}
public static NpgsqlConnection CreateConnection(string connectionString = null)
{
connectionString = connectionString ?? GetConnectionString();
var connection = new NpgsqlConnection(connectionString);
connection.Open();
return connection;
}
}
}
\ No newline at end of file
using System.Threading;
using Dapper;
using Microsoft.EntityFrameworkCore;
namespace DotNetCore.CAP.PostgreSql.Test
{
public abstract class DatabaseTestHost : TestHost
{
private static bool _sqlObjectInstalled;
public static object _lock = new object();
protected override void PostBuildServices()
{
base.PostBuildServices();
lock (_lock)
{
if (!_sqlObjectInstalled)
{
InitializeDatabase();
}
}
}
public override void Dispose()
{
DeleteAllData();
base.Dispose();
}
private void InitializeDatabase()
{
using (CreateScope())
{
var storage = GetService<PostgreSqlStorage>();
var token = new CancellationTokenSource().Token;
CreateDatabase();
storage.InitializeAsync(token).GetAwaiter().GetResult();
_sqlObjectInstalled = true;
}
}
private void CreateDatabase()
{
var masterConn = ConnectionUtil.GetMasterConnectionString();
var databaseName = ConnectionUtil.GetDatabaseName();
using (var connection = ConnectionUtil.CreateConnection(masterConn))
{
connection.Execute($@"
DROP DATABASE IF EXISTS ""{databaseName}"";
CREATE DATABASE ""{databaseName}"";");
}
}
private void DeleteAllData()
{
var conn = ConnectionUtil.GetConnectionString();
using (var connection = ConnectionUtil.CreateConnection(conn))
{
connection.Execute($@"
TRUNCATE TABLE ""cap"".""published"";
TRUNCATE TABLE ""cap"".""received"";
TRUNCATE TABLE ""cap"".""queue"";");
}
}
}
}
\ No newline at end of file
This diff is collapsed.
This diff is collapsed.
using System.Reflection;
using System.Runtime.InteropServices;
// General Information about an assembly is controlled through the following
// set of attributes. Change these attribute values to modify the information
// associated with an assembly.
[assembly: AssemblyConfiguration("")]
[assembly: AssemblyCompany("")]
[assembly: AssemblyProduct("DotNetCore.CAP.EntityFrameworkCore.Test")]
[assembly: AssemblyTrademark("")]
// Setting ComVisible to false makes the types in this assembly not visible
// to COM components. If you need to access a type in this assembly from
// COM, set the ComVisible attribute to true on that type.
[assembly: ComVisible(false)]
// The following GUID is for the ID of the typelib if this project is exposed to COM
[assembly: Guid("7442c942-1ddc-40e4-8f1b-654e721eaa45")]
\ No newline at end of file
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment