Commit e7d99983 authored by Savorboard's avatar Savorboard Committed by GitHub

Merge pull request #46 from dotnetcore/dev_2.0

merge .net standard 2.0 to develop branch 
parents 54be9a9d 5c82ba1d
......@@ -33,4 +33,9 @@ bin/
/.idea/.idea.CAP
/.idea/.idea.CAP
/.idea
Properties
\ No newline at end of file
Properties
/pack.bat
/src/DotNetCore.CAP/project.json
/src/DotNetCore.CAP/packages.config
/src/DotNetCore.CAP/DotNetCore.CAP.Net47.csproj
/NuGet.config
......@@ -6,13 +6,13 @@ matrix:
include:
- os: linux
dist: trusty # Ubuntu 14.04
dotnet: 1.0.1
dotnet: 2.0.0
mono: none
env: DOTNETCORE=1
sudo: required
- os: osx
osx_image: xcode7.3 # macOS 10.11
dotnet: 1.0.1
osx_image: xcode8.3 # macOS 10.12
dotnet: 2.0.0
mono: none
env: DOTNETCORE=1
......

Microsoft Visual Studio Solution File, Format Version 12.00
# Visual Studio 15
VisualStudioVersion = 15.0.26730.0
VisualStudioVersion = 15.0.26730.3
MinimumVisualStudioVersion = 10.0.40219.1
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "src", "src", "{9B2AE124-6636-4DE9-83A3-70360DABD0C4}"
EndProject
......@@ -22,15 +22,6 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution
README.zh-cn.md = README.zh-cn.md
EndProjectSection
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Shared", "Shared", "{9E5A7F49-8E31-4A71-90CC-1DA9AEDA99EE}"
ProjectSection(SolutionItems) = preProject
test\Shared\MessageManagerTestBase.cs = test\Shared\MessageManagerTestBase.cs
test\Shared\TestLogger.cs = test\Shared\TestLogger.cs
EndProjectSection
ProjectSection(FolderStartupServices) = postProject
{82A7F48D-3B50-4B1E-B82E-3ADA8210C358} = {82A7F48D-3B50-4B1E-B82E-3ADA8210C358}
EndProjectSection
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "DotNetCore.CAP", "src\DotNetCore.CAP\DotNetCore.CAP.csproj", "{E8AF8611-0EA4-4B19-BC48-87C57A87DC66}"
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "samples", "samples", "{3A6B6931-A123-477A-9469-8B468B5385AF}"
......@@ -63,10 +54,14 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "DotNetCore.CAP.MySql.Test",
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Sample.RabbitMQ.MySql", "samples\Sample.RabbitMQ.MySql\Sample.RabbitMQ.MySql.csproj", "{9F3F9BFE-7B6A-4A7A-A6E6-8B517D611873}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Sample.Kafka.SqlServer", "samples\Sample.Kafka.SqlServer\Sample.Kafka.SqlServer.csproj", "{AF17B956-B79E-48B7-9B5B-EB15A386B112}"
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Sample.RabbitMQ.SqlServer", "samples\Sample.RabbitMQ.SqlServer\Sample.RabbitMQ.SqlServer.csproj", "{AF17B956-B79E-48B7-9B5B-EB15A386B112}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "DotNetCore.CAP.PostgreSql", "src\DotNetCore.CAP.PostgreSql\DotNetCore.CAP.PostgreSql.csproj", "{82C403AB-ED68-4084-9A1D-11334F9F08F9}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Sample.RabbitMQ.PostgreSql", "samples\Sample.RabbitMQ.PostgreSql\Sample.RabbitMQ.PostgreSql.csproj", "{A17E8E72-DFFC-4822-BB38-73D59A8B264E}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "DotNetCore.CAP.PostgreSql.Test", "test\DotNetCore.CAP.PostgreSql.Test\DotNetCore.CAP.PostgreSql.Test.csproj", "{7CA3625D-1817-4695-881D-7E79A1E1DED2}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
......@@ -116,12 +111,19 @@ Global
{82C403AB-ED68-4084-9A1D-11334F9F08F9}.Debug|Any CPU.Build.0 = Debug|Any CPU
{82C403AB-ED68-4084-9A1D-11334F9F08F9}.Release|Any CPU.ActiveCfg = Release|Any CPU
{82C403AB-ED68-4084-9A1D-11334F9F08F9}.Release|Any CPU.Build.0 = Release|Any CPU
{A17E8E72-DFFC-4822-BB38-73D59A8B264E}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{A17E8E72-DFFC-4822-BB38-73D59A8B264E}.Debug|Any CPU.Build.0 = Debug|Any CPU
{A17E8E72-DFFC-4822-BB38-73D59A8B264E}.Release|Any CPU.ActiveCfg = Release|Any CPU
{A17E8E72-DFFC-4822-BB38-73D59A8B264E}.Release|Any CPU.Build.0 = Release|Any CPU
{7CA3625D-1817-4695-881D-7E79A1E1DED2}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{7CA3625D-1817-4695-881D-7E79A1E1DED2}.Debug|Any CPU.Build.0 = Debug|Any CPU
{7CA3625D-1817-4695-881D-7E79A1E1DED2}.Release|Any CPU.ActiveCfg = Release|Any CPU
{7CA3625D-1817-4695-881D-7E79A1E1DED2}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
EndGlobalSection
GlobalSection(NestedProjects) = preSolution
{9E5A7F49-8E31-4A71-90CC-1DA9AEDA99EE} = {C09CDAB0-6DD4-46E9-B7F3-3EF2A4741EA0}
{E8AF8611-0EA4-4B19-BC48-87C57A87DC66} = {9B2AE124-6636-4DE9-83A3-70360DABD0C4}
{C42CDE33-0878-4BA0-96F2-4CB7C8FDEAAD} = {9B2AE124-6636-4DE9-83A3-70360DABD0C4}
{9961B80E-0718-4280-B2A0-271B003DE26B} = {9B2AE124-6636-4DE9-83A3-70360DABD0C4}
......@@ -133,6 +135,8 @@ Global
{9F3F9BFE-7B6A-4A7A-A6E6-8B517D611873} = {3A6B6931-A123-477A-9469-8B468B5385AF}
{AF17B956-B79E-48B7-9B5B-EB15A386B112} = {3A6B6931-A123-477A-9469-8B468B5385AF}
{82C403AB-ED68-4084-9A1D-11334F9F08F9} = {9B2AE124-6636-4DE9-83A3-70360DABD0C4}
{A17E8E72-DFFC-4822-BB38-73D59A8B264E} = {3A6B6931-A123-477A-9469-8B468B5385AF}
{7CA3625D-1817-4695-881D-7E79A1E1DED2} = {C09CDAB0-6DD4-46E9-B7F3-3EF2A4741EA0}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {2E70565D-94CF-40B4-BFE1-AC18D5F736AB}
......
......@@ -44,10 +44,13 @@ If your Message Queue is using RabbitMQ, you can:
PM> Install-Package DotNetCore.CAP.RabbitMQ
```
CAP provides EntityFramework as default database store extension (The MySQL version is under development)
CAP supported SqlServer, MySql, PostgreSql as message store extension
```
//Select a database provider you are using
PM> Install-Package DotNetCore.CAP.SqlServer
PM> Install-Package DotNetCore.CAP.MySql
PM> Install-Package DotNetCore.CAP.PostgreSql
```
### Configuration
......@@ -66,9 +69,11 @@ public void ConfigureServices(IServiceCollection services)
// If your SqlServer is using EF for data operations, you need to add the following configuration:
// Notice: You don't need to config x.UseSqlServer(""") again!
x.UseEntityFramework<AppDbContext>();
// If you are using Dapper,you need to add the config:
x.UseSqlServer("Your ConnectionStrings");
//x.UseMySql("Your ConnectionStrings");
//x.UsePostgreSql("Your ConnectionStrings");
// If your Message Queue is using RabbitMQ you need to add the config:
x.UseRabbitMQ("localhost");
......@@ -82,7 +87,7 @@ public void Configure(IApplicationBuilder app)
{
.....
app.UseCap();
app.UseCap();
}
```
......@@ -114,12 +119,12 @@ public class PublishController : Controller
[Route("~/checkAccountWithTrans")]
public async Task<IActionResult> PublishMessageWithTransaction([FromServices]AppDbContext dbContext)
{
using (var trans = dbContext.Database.BeginTransaction())
{
using (var trans = dbContext.Database.BeginTransaction())
{
await _publisher.PublishAsync("xxx.services.account.check", new Person { Name = "Foo", Age = 11 });
trans.Commit();
}
}
return Ok();
}
}
......@@ -174,7 +179,7 @@ namespace xxx.Service
[CapSubscribe("xxx.services.account.check")]
public void CheckReceivedMessage(Person person)
{
}
}
}
......
version: '{build}'
os: Visual Studio 2017 Preview
os: Visual Studio 2017
environment:
BUILDING_ON_PLATFORM: win
BuildEnvironment: appveyor
Cap_SqlServer_ConnectionStringTemplate: Server=(local)\SQL2014;Database={0};User ID=sa;Password=Password12!
Cap_MySql_ConnectionStringTemplate: Server=localhost;Database={0};Uid=root;Pwd=Password12!
Cap_PostgreSql_ConnectionStringTemplate: Server=localhost;Database={0};UserId=postgres;Password=Password12!
services:
- mssql2014
- mysql
- postgresql
build_script:
- ps: ./ConfigureMSDTC.ps1
- ps: ./build.ps1
......
dotnet --info
dotnet restore
dotnet test test/DotNetCore.CAP.Test/DotNetCore.CAP.Test.csproj -f netcoreapp1.1
\ No newline at end of file
dotnet test test/DotNetCore.CAP.Test/DotNetCore.CAP.Test.csproj -f netcoreapp2.0
\ No newline at end of file
......@@ -20,6 +20,6 @@ Configuration: {Build.Configuration}
public static string CreateStamp()
{
var seconds = (long)(DateTime.UtcNow - new DateTime(2017, 1, 1)).TotalSeconds;
return seconds.ToString().PadLeft(11, (char)'0');
return seconds.ToString();
}
}
<Project>
<PropertyGroup>
<VersionMajor>1</VersionMajor>
<VersionMinor>2</VersionMinor>
<VersionPatch>0</VersionPatch>
<VersionMajor>2</VersionMajor>
<VersionMinor>0</VersionMinor>
<VersionPatch>1</VersionPatch>
<VersionQuality></VersionQuality>
<VersionPrefix>$(VersionMajor).$(VersionMinor).$(VersionPatch)</VersionPrefix>
</PropertyGroup>
......
......@@ -10,7 +10,8 @@ namespace Sample.RabbitMQ.MySql
{
protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder)
{
optionsBuilder.UseMySql("Server=localhost;Database=Sample.RabbitMQ.MySql;Uid=root;Pwd=123123;");
//optionsBuilder.UseMySql("Server=localhost;Database=Sample.RabbitMQ.MySql;UserId=root;Password=123123;");
optionsBuilder.UseMySql("Server=192.168.2.206;Database=Sample.RabbitMQ.MySql;UserId=root;Password=123123;");
}
}
}
......@@ -23,7 +23,16 @@ namespace Sample.RabbitMQ.MySql.Controllers
[Route("~/publish")]
public IActionResult PublishMessage()
{
_capBus.Publish("sample.kafka.sqlserver", "");
_capBus.Publish("sample.rabbitmq.mysql", DateTime.Now);
return Ok();
}
[Route("~/publish2")]
public IActionResult PublishMessage2()
{
_capBus.Publish("sample.kafka.sqlserver4", DateTime.Now);
return Ok();
}
......@@ -34,6 +43,7 @@ namespace Sample.RabbitMQ.MySql.Controllers
using (var trans = await _dbContext.Database.BeginTransactionAsync())
{
await _capBus.PublishAsync("sample.kafka.sqlserver", "");
trans.Commit();
}
return Ok();
......@@ -41,10 +51,9 @@ namespace Sample.RabbitMQ.MySql.Controllers
[NonAction]
[CapSubscribe("sample.rabbitmq.mysql")]
public void ReceiveMessage()
public void ReceiveMessage(DateTime time)
{
Console.WriteLine("[sample.rabbitmq.mysql] message received");
Debug.WriteLine("[sample.rabbitmq.mysql] message received");
Console.WriteLine("[sample.rabbitmq.mysql] message received: "+ DateTime.Now.ToString() +" , sent time: " + time.ToString());
}
}
}
<Project Sdk="Microsoft.NET.Sdk.Web">
<PropertyGroup>
<TargetFramework>netcoreapp1.1</TargetFramework>
<TargetFramework>netcoreapp2.0</TargetFramework>
</PropertyGroup>
<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|AnyCPU'">
<NoWarn>1701;1702;1705;3277;</NoWarn>
<WarningsAsErrors>NU1605;MSB3277</WarningsAsErrors>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Microsoft.AspNetCore" Version="1.1.2" />
<PackageReference Include="Microsoft.AspNetCore.Mvc" Version="1.1.3" />
<PackageReference Include="Microsoft.EntityFrameworkCore" Version="1.1.2" />
<PackageReference Include="Microsoft.EntityFrameworkCore.Design" Version="1.1.2" />
<PackageReference Include="Microsoft.EntityFrameworkCore.Tools" Version="1.1.1" />
<PackageReference Include="Microsoft.Extensions.Configuration.CommandLine" Version="1.1.2" />
<PackageReference Include="Microsoft.Extensions.Logging.Debug" Version="1.1.2" />
<PackageReference Include="Pomelo.EntityFrameworkCore.MySql" Version="1.1.2" />
<PackageReference Include="Pomelo.EntityFrameworkCore.MySql.Design" Version="1.1.2" />
<PackageReference Include="Microsoft.AspNetCore" Version="2.0.0" />
<PackageReference Include="Microsoft.AspNetCore.Mvc" Version="2.0.0" />
<PackageReference Include="Microsoft.EntityFrameworkCore" Version="2.0.0" />
<PackageReference Include="Microsoft.EntityFrameworkCore.Design" Version="2.0.0" />
<PackageReference Include="Microsoft.EntityFrameworkCore.Tools" Version="2.0.0" />
<PackageReference Include="Microsoft.Extensions.Configuration.CommandLine" Version="2.0.0" />
<PackageReference Include="Microsoft.Extensions.Logging.Debug" Version="2.0.0" />
<PackageReference Include="Pomelo.EntityFrameworkCore.MySql" Version="2.0.0-rtm-10056" />
</ItemGroup>
<ItemGroup>
<DotNetCliToolReference Include="Microsoft.VisualStudio.Web.CodeGeneration.Tools" Version="1.0.1" />
<DotNetCliToolReference Include="Microsoft.EntityFrameworkCore.Tools.DotNet" Version="1.0.0" />
<DotNetCliToolReference Include="Microsoft.EntityFrameworkCore.Tools.DotNet" Version="2.0.0" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\..\src\DotNetCore.CAP.Kafka\DotNetCore.CAP.Kafka.csproj" />
<ProjectReference Include="..\..\src\DotNetCore.CAP.MySql\DotNetCore.CAP.MySql.csproj" />
<ProjectReference Include="..\..\src\DotNetCore.CAP.RabbitMQ\DotNetCore.CAP.RabbitMQ.csproj" />
<ProjectReference Include="..\..\src\DotNetCore.CAP\DotNetCore.CAP.csproj" />
</ItemGroup>
......
......@@ -18,7 +18,11 @@ namespace Sample.RabbitMQ.MySql
services.AddCap(x =>
{
x.UseEntityFramework<AppDbContext>();
x.UseKafka("localhost:9092");
x.UseRabbitMQ(y => {
y.HostName = "192.168.2.206";
y.UserName = "admin";
y.Password = "123123";
});
});
services.AddMvc();
......
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
namespace Sample.RabbitMQ.PostgreSql
{
public class AppDbContext : DbContext
{
protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder)
{
optionsBuilder.UseNpgsql("Server=localhost;Database=Sample.RabbitMQ.PostgreSql;UserId=postgre;Password=123123;");
}
}
}
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading.Tasks;
using DotNetCore.CAP;
using Microsoft.AspNetCore.Mvc;
namespace Sample.Kafka.Controllers
namespace Sample.RabbitMQ.PostgreSql.Controllers
{
[Route("api/[controller]")]
public class ValuesController : Controller, ICapSubscribe
public class ValuesController : Controller
{
private readonly ICapPublisher _capBus;
private readonly AppDbContext _dbContext;
private readonly ICapPublisher _capBus;
public ValuesController(ICapPublisher producer, AppDbContext dbContext)
public ValuesController(AppDbContext dbContext, ICapPublisher capPublisher)
{
_capBus = producer;
_dbContext = dbContext;
_capBus = capPublisher;
}
[Route("~/publish")]
public IActionResult PublishMessage()
{
_capBus.Publish("sample.rabbitmq.mysql", "");
_capBus.Publish("sample.rabbitmq.mysql", DateTime.Now);
return Ok();
}
[Route("~/publish2")]
public IActionResult PublishMessage2()
{
_capBus.Publish("sample.kafka.sqlserver4", DateTime.Now);
return Ok();
}
......@@ -30,18 +41,19 @@ namespace Sample.Kafka.Controllers
{
using (var trans = await _dbContext.Database.BeginTransactionAsync())
{
await _capBus.PublishAsync("sample.rabbitmq.mysql", "");
await _capBus.PublishAsync("sample.kafka.sqlserver", "");
trans.Commit();
}
return Ok();
}
[NonAction]
[CapSubscribe("sample.kafka.sqlserver", Group = "test")]
public void KafkaTest()
[CapSubscribe("sample.rabbitmq.mysql")]
public void ReceiveMessage()
{
Console.WriteLine("[sample.kafka.sqlserver] message received");
Debug.WriteLine("[sample.kafka.sqlserver] message received");
Console.WriteLine("[sample.rabbitmq.mysql] message received");
Debug.WriteLine("[sample.rabbitmq.mysql] message received");
}
}
}
\ No newline at end of file
}
using System.IO;
using Microsoft.AspNetCore.Builder;
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.AspNetCore;
using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging;
namespace Sample.Kafka
namespace Sample.RabbitMQ.PostgreSql
{
public class Program
{
public static void Main(string[] args)
{
var config = new ConfigurationBuilder()
.AddCommandLine(args)
.AddEnvironmentVariables("ASPNETCORE_")
.Build();
BuildWebHost(args).Run();
}
var host = new WebHostBuilder()
.UseConfiguration(config)
.UseKestrel()
.UseContentRoot(Directory.GetCurrentDirectory())
.UseIISIntegration()
public static IWebHost BuildWebHost(string[] args) =>
WebHost.CreateDefaultBuilder(args)
.UseStartup<Startup>()
.Build();
host.Run();
}
}
}
\ No newline at end of file
}
<Project Sdk="Microsoft.NET.Sdk.Web">
<PropertyGroup>
<TargetFramework>netcoreapp2.0</TargetFramework>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Microsoft.AspNetCore.All" Version="2.0.0" />
<PackageReference Include="Npgsql.EntityFrameworkCore.PostgreSQL" Version="2.0.0" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\..\src\DotNetCore.CAP.PostgreSql\DotNetCore.CAP.PostgreSql.csproj" />
<ProjectReference Include="..\..\src\DotNetCore.CAP.RabbitMQ\DotNetCore.CAP.RabbitMQ.csproj" />
<ProjectReference Include="..\..\src\DotNetCore.CAP\DotNetCore.CAP.csproj" />
</ItemGroup>
</Project>
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
namespace Sample.RabbitMQ.PostgreSql
{
public class Startup
{
// This method gets called by the runtime. Use this method to add services to the container.
public void ConfigureServices(IServiceCollection services)
{
services.AddMvc();
}
// This method gets called by the runtime. Use this method to configure the HTTP request pipeline.
public void Configure(IApplicationBuilder app, IHostingEnvironment env)
{
app.UseMvc();
}
}
}
using Microsoft.EntityFrameworkCore;
namespace Sample.Kafka
namespace Sample.RabbitMQ.SqlServer
{
public class AppDbContext : DbContext
{
protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder)
{
//optionsBuilder.UseSqlServer("Server=192.168.2.206;Initial Catalog=Test;User Id=cmswuliu;Password=h7xY81agBn*Veiu3;MultipleActiveResultSets=True");
optionsBuilder.UseSqlServer("Server=DESKTOP-M9R8T31;Initial Catalog=Sample.Kafka.SqlServer;User Id=sa;Password=P@ssw0rd;MultipleActiveResultSets=True");
optionsBuilder.UseSqlServer("Server=192.168.2.206;Initial Catalog=TestCap;User Id=cmswuliu;Password=h7xY81agBn*Veiu3;MultipleActiveResultSets=True");
//optionsBuilder.UseSqlServer("Server=DESKTOP-M9R8T31;Initial Catalog=Sample.Kafka.SqlServer;User Id=sa;Password=P@ssw0rd;MultipleActiveResultSets=True");
}
}
}
using System;
using System.Diagnostics;
using System.Threading.Tasks;
using DotNetCore.CAP;
using Microsoft.AspNetCore.Mvc;
namespace Sample.RabbitMQ.SqlServer.Controllers
{
public class Person
{
public string Name { get; set; }
public int Age { get; set; }
public override string ToString()
{
return "Name:" + Name + ";Age:" + Age;
}
}
[Route("api/[controller]")]
public class ValuesController : Controller, ICapSubscribe
{
private readonly ICapPublisher _capBus;
private readonly AppDbContext _dbContext;
public ValuesController(ICapPublisher producer, AppDbContext dbContext)
{
_capBus = producer;
_dbContext = dbContext;
}
[Route("~/publish")]
public IActionResult PublishMessage()
{
using(var trans = _dbContext.Database.BeginTransaction())
{
//_capBus.Publish("sample.rabbitmq.mysql22222", DateTime.Now);
_capBus.Publish("sample.rabbitmq.mysql33333", new Person { Name = "宜兴", Age = 11 });
trans.Commit();
}
return Ok();
}
[Route("~/publishWithTrans")]
public async Task<IActionResult> PublishMessageWithTransaction()
{
using (var trans = await _dbContext.Database.BeginTransactionAsync())
{
await _capBus.PublishAsync("sample.rabbitmq.mysql", "");
trans.Commit();
}
return Ok();
}
[CapSubscribe("sample.rabbitmq.mysql33333")]
public void KafkaTest22(Person person)
{
var aa = _dbContext.Database;
_dbContext.Dispose();
Console.WriteLine("[sample.kafka.sqlserver] message received " + person.ToString());
Debug.WriteLine("[sample.kafka.sqlserver] message received " + person.ToString());
}
//[CapSubscribe("sample.rabbitmq.mysql22222")]
//public void KafkaTest22(DateTime time)
//{
// Console.WriteLine("[sample.kafka.sqlserver] message received " + time.ToString());
// Debug.WriteLine("[sample.kafka.sqlserver] message received " + time.ToString());
//}
[CapSubscribe("sample.rabbitmq.mysql22222")]
public async Task<DateTime> KafkaTest33(DateTime time)
{
Console.WriteLine("[sample.kafka.sqlserver] message received " + time.ToString());
Debug.WriteLine("[sample.kafka.sqlserver] message received " + time.ToString());
return await Task.FromResult(time);
}
[NonAction]
[CapSubscribe("sample.kafka.sqlserver3")]
[CapSubscribe("sample.kafka.sqlserver4")]
public void KafkaTest()
{
Console.WriteLine("[sample.kafka.sqlserver] message received");
Debug.WriteLine("[sample.kafka.sqlserver] message received");
}
}
}
\ No newline at end of file
using System.IO;
using Microsoft.AspNetCore;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.Configuration;
namespace Sample.RabbitMQ.SqlServer
{
public class Program
{
//var config = new ConfigurationBuilder()
// .AddCommandLine(args)
// .AddEnvironmentVariables("ASPNETCORE_")
// .Build();
//var host = new WebHostBuilder()
// .UseConfiguration(config)
// .UseKestrel()
// .UseContentRoot(Directory.GetCurrentDirectory())
// .UseIISIntegration()
// .UseStartup<Startup>()
// .Build();
//host.Run();
public static void Main(string[] args)
{
BuildWebHost(args).Run();
}
public static IWebHost BuildWebHost(string[] args) =>
WebHost.CreateDefaultBuilder(args)
.UseStartup<Startup>()
.Build();
}
}
\ No newline at end of file
<Project Sdk="Microsoft.NET.Sdk.Web">
<PropertyGroup>
<TargetFramework>netcoreapp1.1</TargetFramework>
<AssemblyName>Sample.Kafka.SqlServer</AssemblyName>
<TargetFramework>netcoreapp2.0</TargetFramework>
<AssemblyName>Sample.RabbitMQ.SqlServer</AssemblyName>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Microsoft.AspNetCore" Version="1.1.2" />
<PackageReference Include="Microsoft.AspNetCore.Mvc" Version="1.1.3" />
<PackageReference Include="Microsoft.EntityFrameworkCore" Version="1.1.2" />
<PackageReference Include="Microsoft.EntityFrameworkCore.Design" Version="1.1.2" />
<PackageReference Include="Microsoft.EntityFrameworkCore.SqlServer.Design" Version="1.1.2" />
<PackageReference Include="Microsoft.EntityFrameworkCore.Tools" Version="1.1.1" />
<PackageReference Include="Microsoft.Extensions.Configuration.CommandLine" Version="1.1.2" />
<PackageReference Include="Microsoft.Extensions.Logging.Debug" Version="1.1.2" />
<PackageReference Include="Microsoft.AspNetCore" Version="2.0.0" />
<PackageReference Include="Microsoft.AspNetCore.Mvc" Version="2.0.0" />
<PackageReference Include="Microsoft.EntityFrameworkCore" Version="2.0.0" />
<PackageReference Include="Microsoft.EntityFrameworkCore.Design" Version="2.0.0" />
<PackageReference Include="Microsoft.EntityFrameworkCore.SqlServer" Version="2.0.0" />
<PackageReference Include="Microsoft.EntityFrameworkCore.Tools" Version="2.0.0" />
<PackageReference Include="Microsoft.Extensions.Configuration.CommandLine" Version="2.0.0" />
<PackageReference Include="Microsoft.Extensions.Logging.Debug" Version="2.0.0" />
</ItemGroup>
<ItemGroup>
<DotNetCliToolReference Include="Microsoft.VisualStudio.Web.CodeGeneration.Tools" Version="1.0.1" />
<DotNetCliToolReference Include="Microsoft.EntityFrameworkCore.Tools.DotNet" Version="1.0.0" />
<DotNetCliToolReference Include="Microsoft.EntityFrameworkCore.Tools.DotNet" Version="2.0.0" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\..\src\DotNetCore.CAP.Kafka\DotNetCore.CAP.Kafka.csproj" />
<ProjectReference Include="..\..\src\DotNetCore.CAP.RabbitMQ\DotNetCore.CAP.RabbitMQ.csproj" />
<ProjectReference Include="..\..\src\DotNetCore.CAP.SqlServer\DotNetCore.CAP.SqlServer.csproj" />
<ProjectReference Include="..\..\src\DotNetCore.CAP\DotNetCore.CAP.csproj" />
</ItemGroup>
......
......@@ -3,7 +3,7 @@ using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
namespace Sample.Kafka
namespace Sample.RabbitMQ.SqlServer
{
public class Startup
{
......@@ -14,7 +14,11 @@ namespace Sample.Kafka
services.AddCap(x =>
{
x.UseEntityFramework<AppDbContext>();
x.UseKafka("localhost:9092");
x.UseRabbitMQ(y=> {
y.HostName = "192.168.2.206";
y.UserName = "admin";
y.Password = "123123";
});
});
services.AddMvc();
......
......@@ -21,7 +21,7 @@ namespace DotNetCore.CAP
services.AddSingleton(kafkaOptions);
services.AddSingleton<IConsumerClientFactory, KafkaConsumerClientFactory>();
services.AddTransient<IQueueExecutor, PublishQueueExecutor>();
services.AddSingleton<IQueueExecutor, PublishQueueExecutor>();
}
}
}
\ No newline at end of file
......@@ -42,7 +42,7 @@ namespace DotNetCore.CAP
{
throw new ArgumentNullException(nameof(Servers));
}
MainConfig.Add("bootstrap.servers", Servers);
MainConfig["queue.buffering.max.ms"] = "10";
......
......@@ -3,11 +3,16 @@
<Import Project="..\..\build\common.props" />
<PropertyGroup>
<TargetFrameworks>netstandard1.6;</TargetFrameworks>
<TargetFramework>netstandard2.0</TargetFramework>
<AssemblyName>DotNetCore.CAP.Kafka</AssemblyName>
<PackageTags>$(PackageTags);Kafka</PackageTags>
</PropertyGroup>
<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|AnyCPU'">
<WarningsAsErrors>NU1605</WarningsAsErrors>
<NoWarn>NU1701</NoWarn>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Confluent.Kafka" Version="0.11.0" />
</ItemGroup>
......
using System;
using Microsoft.Extensions.Options;
namespace DotNetCore.CAP.Kafka
namespace DotNetCore.CAP.Kafka
{
internal sealed class KafkaConsumerClientFactory : IConsumerClientFactory
{
......
......@@ -12,13 +12,15 @@ namespace DotNetCore.CAP.Kafka
private readonly ILogger _logger;
private readonly KafkaOptions _kafkaOptions;
public PublishQueueExecutor(IStateChanger stateChanger,
KafkaOptions options,
public PublishQueueExecutor(
CapOptions options,
IStateChanger stateChanger,
KafkaOptions kafkaOptions,
ILogger<PublishQueueExecutor> logger)
: base(stateChanger, logger)
: base(options, stateChanger, logger)
{
_logger = logger;
_kafkaOptions = options;
_kafkaOptions = kafkaOptions;
}
public override Task<OperateResult> PublishAsync(string keyName, string content)
......
......@@ -21,6 +21,7 @@ namespace DotNetCore.CAP
services.AddSingleton<IStorage, MySqlStorage>();
services.AddScoped<IStorageConnection, MySqlStorageConnection>();
services.AddScoped<ICapPublisher, CapPublisher>();
services.AddTransient<ICallbackPublisher, CapPublisher>();
services.AddTransient<IAdditionalProcessor, DefaultAdditionalProcessor>();
var mysqlOptions = new MySqlOptions();
......@@ -28,24 +29,17 @@ namespace DotNetCore.CAP
if (mysqlOptions.DbContextType != null)
{
var provider = TempBuildService(services);
var dbContextObj = provider.GetService(mysqlOptions.DbContextType);
var dbContext = (DbContext)dbContextObj;
mysqlOptions.ConnectionString = dbContext.Database.GetDbConnection().ConnectionString;
services.AddSingleton(x =>
{
var dbContext = (DbContext)x.GetService(mysqlOptions.DbContextType);
mysqlOptions.ConnectionString = dbContext.Database.GetDbConnection().ConnectionString;
return mysqlOptions;
});
}
else
{
services.AddSingleton(mysqlOptions);
}
services.AddSingleton(mysqlOptions);
}
#if NETSTANDARD1_6
private IServiceProvider TempBuildService(IServiceCollection services)
{
return services.BuildServiceProvider();
}
#else
private ServiceProvider TempBuildService(IServiceCollection services)
{
return services.BuildServiceProvider();
}
#endif
}
}
\ No newline at end of file
......@@ -7,10 +7,11 @@ using DotNetCore.CAP.Models;
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Storage;
using Microsoft.Extensions.Logging;
using MySql.Data.MySqlClient;
namespace DotNetCore.CAP.MySql
{
public class CapPublisher : CapPublisherBase
public class CapPublisher : CapPublisherBase, ICallbackPublisher
{
private readonly ILogger _logger;
private readonly MySqlOptions _options;
......@@ -34,13 +35,17 @@ namespace DotNetCore.CAP.MySql
protected override void PrepareConnectionForEF()
{
DbConnection = _dbContext.Database.GetDbConnection();
var transaction = _dbContext.Database.CurrentTransaction;
if (transaction == null)
var dbContextTransaction = _dbContext.Database.CurrentTransaction;
var dbTrans = dbContextTransaction?.GetDbTransaction();
//DbTransaction is dispose in original
if (dbTrans?.Connection == null)
{
IsCapOpenedTrans = true;
transaction = _dbContext.Database.BeginTransaction(IsolationLevel.ReadCommitted);
dbContextTransaction?.Dispose();
dbContextTransaction = _dbContext.Database.BeginTransaction(IsolationLevel.ReadCommitted);
dbTrans = dbContextTransaction.GetDbTransaction();
}
DbTranasaction = transaction.GetDbTransaction();
DbTranasaction = dbTrans;
}
protected override void Execute(IDbConnection dbConnection, IDbTransaction dbTransaction, CapPublishedMessage message)
......@@ -53,19 +58,25 @@ namespace DotNetCore.CAP.MySql
protected override async Task ExecuteAsync(IDbConnection dbConnection, IDbTransaction dbTransaction, CapPublishedMessage message)
{
await dbConnection.ExecuteAsync(PrepareSql(), message, dbTransaction);
_logger.LogInformation("Published Message has been persisted in the database. name:" + message.ToString());
}
#region private methods
public async Task PublishAsync(CapPublishedMessage message)
{
using (var conn = new MySqlConnection(_options.ConnectionString))
{
await conn.ExecuteAsync(PrepareSql(), message);
}
}
#region private methods
private string PrepareSql()
{
return $"INSERT INTO `{_options.TableNamePrefix}.published` (`Name`,`Content`,`Retries`,`Added`,`ExpiresAt`,`StatusName`)VALUES(@Name,@Content,@Retries,@Added,@ExpiresAt,@StatusName)";
}
#endregion private methods
}
}
\ No newline at end of file
......@@ -3,20 +3,16 @@
<Import Project="..\..\build\common.props" />
<PropertyGroup>
<TargetFrameworks>netstandard1.6;netstandard2.0;</TargetFrameworks>
<TargetFramework>netstandard2.0</TargetFramework>
<AssemblyName>DotNetCore.CAP.MySql</AssemblyName>
<PackageTags>$(PackageTags);MySQL</PackageTags>
</PropertyGroup>
<PropertyGroup Condition="'$(Configuration)|$(TargetFramework)|$(Platform)'=='Debug|netstandard1.6|AnyCPU'">
<DefineConstants>TRACE;DEBUG</DefineConstants>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Dapper" Version="1.50.2" />
<PackageReference Include="Microsoft.EntityFrameworkCore" Version="1.1.2" />
<PackageReference Include="Microsoft.EntityFrameworkCore.Relational" Version="1.1.2" />
<PackageReference Include="MySqlConnector" Version="0.24.0" />
<PackageReference Include="Microsoft.EntityFrameworkCore" Version="2.0.0" />
<PackageReference Include="Microsoft.EntityFrameworkCore.Relational" Version="2.0.0" />
<PackageReference Include="MySqlConnector" Version="0.25.1" />
</ItemGroup>
<ItemGroup>
......
......@@ -15,7 +15,7 @@ namespace DotNetCore.CAP.MySql
private const int MaxBatch = 1000;
private readonly TimeSpan _delay = TimeSpan.FromSeconds(1);
private readonly TimeSpan _waitingInterval = TimeSpan.FromHours(2);
private readonly TimeSpan _waitingInterval = TimeSpan.FromMinutes(5);
public DefaultAdditionalProcessor(
IServiceProvider provider,
......
using System;
using DotNetCore.CAP.Processor;
using DotNetCore.CAP.PostgreSql;
using DotNetCore.CAP.Processor;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.DependencyInjection;
......@@ -21,6 +21,7 @@ namespace DotNetCore.CAP
services.AddSingleton<IStorage, PostgreSqlStorage>();
services.AddScoped<IStorageConnection, PostgreSqlStorageConnection>();
services.AddScoped<ICapPublisher, CapPublisher>();
services.AddTransient<ICallbackPublisher, CapPublisher>();
services.AddTransient<IAdditionalProcessor, DefaultAdditionalProcessor>();
var postgreSqlOptions = new PostgreSqlOptions();
......@@ -28,24 +29,17 @@ namespace DotNetCore.CAP
if (postgreSqlOptions.DbContextType != null)
{
var provider = TempBuildService(services);
var dbContextObj = provider.GetService(postgreSqlOptions.DbContextType);
var dbContext = (DbContext)dbContextObj;
postgreSqlOptions.ConnectionString = dbContext.Database.GetDbConnection().ConnectionString;
services.AddSingleton(x =>
{
var dbContext = (DbContext)x.GetService(postgreSqlOptions.DbContextType);
postgreSqlOptions.ConnectionString = dbContext.Database.GetDbConnection().ConnectionString;
return postgreSqlOptions;
});
}
else
{
services.AddSingleton(postgreSqlOptions);
}
services.AddSingleton(postgreSqlOptions);
}
#if NETSTANDARD1_6
private IServiceProvider TempBuildService(IServiceCollection services)
{
return services.BuildServiceProvider();
}
#else
private ServiceProvider TempBuildService(IServiceCollection services)
{
return services.BuildServiceProvider();
}
#endif
}
}
\ No newline at end of file
......@@ -2,15 +2,16 @@
using System.Data;
using System.Threading.Tasks;
using Dapper;
using DotNetCore.CAP.Models;
using DotNetCore.CAP.Abstractions;
using DotNetCore.CAP.Models;
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Storage;
using Microsoft.Extensions.Logging;
using Npgsql;
namespace DotNetCore.CAP.PostgreSql
{
public class CapPublisher : CapPublisherBase
public class CapPublisher : CapPublisherBase, ICallbackPublisher
{
private readonly ILogger _logger;
private readonly PostgreSqlOptions _options;
......@@ -27,20 +28,24 @@ namespace DotNetCore.CAP.PostgreSql
if (_options.DbContextType != null)
{
IsUsingEF = true;
_dbContext = (DbContext)ServiceProvider.GetService(_options.DbContextType);
_dbContext = (DbContext)ServiceProvider.GetService(_options.DbContextType);
}
}
protected override void PrepareConnectionForEF()
{
DbConnection = _dbContext.Database.GetDbConnection();
var transaction = _dbContext.Database.CurrentTransaction;
if (transaction == null)
var dbContextTransaction = _dbContext.Database.CurrentTransaction;
var dbTrans = dbContextTransaction?.GetDbTransaction();
//DbTransaction is dispose in original
if (dbTrans?.Connection == null)
{
IsCapOpenedTrans = true;
transaction = _dbContext.Database.BeginTransaction(IsolationLevel.ReadCommitted);
dbContextTransaction?.Dispose();
dbContextTransaction = _dbContext.Database.BeginTransaction(IsolationLevel.ReadCommitted);
dbTrans = dbContextTransaction.GetDbTransaction();
}
DbTranasaction = transaction.GetDbTransaction();
DbTranasaction = dbTrans;
}
protected override void Execute(IDbConnection dbConnection, IDbTransaction dbTransaction, CapPublishedMessage message)
......@@ -57,9 +62,21 @@ namespace DotNetCore.CAP.PostgreSql
_logger.LogInformation("Published Message has been persisted in the database. name:" + message.ToString());
}
public async Task PublishAsync(CapPublishedMessage message)
{
using (var conn = new NpgsqlConnection(_options.ConnectionString))
{
await conn.ExecuteAsync(PrepareSql(), message);
}
}
#region private methods
private string PrepareSql()
{
return $"INSERT INTO \"{_options.Schema}\".\"published\" (\"Name\",\"Content\",\"Retries\",\"Added\",\"ExpiresAt\",\"StatusName\")VALUES(@Name,@Content,@Retries,@Added,@ExpiresAt,@StatusName)";
}
#endregion private methods
}
}
\ No newline at end of file
......@@ -3,19 +3,15 @@
<Import Project="..\..\build\common.props" />
<PropertyGroup>
<TargetFrameworks>netstandard1.6;netstandard2.0;</TargetFrameworks>
<TargetFramework>netstandard2.0</TargetFramework>
<AssemblyName>DotNetCore.CAP.PostgreSql</AssemblyName>
<PackageTags>$(PackageTags);PostgreSQL</PackageTags>
</PropertyGroup>
<PropertyGroup Condition="'$(Configuration)|$(TargetFramework)|$(Platform)'=='Debug|netstandard1.6|AnyCPU'">
<DefineConstants>TRACE;DEBUG</DefineConstants>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Dapper" Version="1.50.2" />
<PackageReference Include="Microsoft.EntityFrameworkCore" Version="1.1.2" />
<PackageReference Include="Microsoft.EntityFrameworkCore.Relational" Version="1.1.2" />
<PackageReference Include="Microsoft.EntityFrameworkCore" Version="2.0.0" />
<PackageReference Include="Microsoft.EntityFrameworkCore.Relational" Version="2.0.0" />
<PackageReference Include="Npgsql" Version="3.2.5" />
</ItemGroup>
......
......@@ -15,7 +15,7 @@ namespace DotNetCore.CAP.PostgreSql
private const int MaxBatch = 1000;
private readonly TimeSpan _delay = TimeSpan.FromSeconds(1);
private readonly TimeSpan _waitingInterval = TimeSpan.FromHours(2);
private readonly TimeSpan _waitingInterval = TimeSpan.FromMinutes(5);
private static readonly string[] Tables =
{
......@@ -44,7 +44,7 @@ namespace DotNetCore.CAP.PostgreSql
using (var connection = new NpgsqlConnection(_options.ConnectionString))
{
removedCount = await connection.ExecuteAsync($"DELETE FROM \"{_options.Schema}\".\"{table}\" WHERE \"ExpiresAt\" < @now AND \"Id\" IN (SELECT \"Id\" FROM \"{_options.Schema}\".\"{table}\" LIMIT @count);",
new { now = DateTime.Now, count = MaxBatch });
new { now = DateTime.Now, count = MaxBatch });
}
if (removedCount != 0)
......
......@@ -21,7 +21,10 @@ namespace DotNetCore.CAP
services.AddSingleton(options);
services.AddSingleton<IConsumerClientFactory, RabbitMQConsumerClientFactory>();
services.AddTransient<IQueueExecutor, PublishQueueExecutor>();
services.AddSingleton<ConnectionPool>();
services.AddSingleton<IQueueExecutor, PublishQueueExecutor>();
}
}
}
\ No newline at end of file
using System;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Threading;
using RabbitMQ.Client;
namespace DotNetCore.CAP.RabbitMQ
{
public class ConnectionPool : IConnectionPool, IDisposable
{
private const int DefaultPoolSize = 15;
private readonly ConcurrentQueue<IConnection> _pool = new ConcurrentQueue<IConnection>();
private readonly Func<IConnection> _activator;
private int _maxSize;
private int _count;
public ConnectionPool(RabbitMQOptions options)
{
_maxSize = DefaultPoolSize;
_activator = CreateActivator(options);
}
private static Func<IConnection> CreateActivator(RabbitMQOptions options)
{
var factory = new ConnectionFactory()
{
HostName = options.HostName,
UserName = options.UserName,
Port = options.Port,
Password = options.Password,
VirtualHost = options.VirtualHost,
RequestedConnectionTimeout = options.RequestedConnectionTimeout,
SocketReadTimeout = options.SocketReadTimeout,
SocketWriteTimeout = options.SocketWriteTimeout
};
return () => factory.CreateConnection();
}
public virtual IConnection Rent()
{
if (_pool.TryDequeue(out IConnection connection))
{
Interlocked.Decrement(ref _count);
Debug.Assert(_count >= 0);
return connection;
}
connection = _activator();
return connection;
}
public virtual bool Return(IConnection connection)
{
if (Interlocked.Increment(ref _count) <= _maxSize)
{
_pool.Enqueue(connection);
return true;
}
Interlocked.Decrement(ref _count);
Debug.Assert(_maxSize == 0 || _pool.Count <= _maxSize);
return false;
}
IConnection IConnectionPool.Rent() => Rent();
bool IConnectionPool.Return(IConnection connection) => Return(connection);
public void Dispose()
{
_maxSize = 0;
IConnection context;
while (_pool.TryDequeue(out context))
{
context.Dispose();
}
}
}
}
......@@ -3,7 +3,7 @@
<Import Project="..\..\build\common.props" />
<PropertyGroup>
<TargetFrameworks>netstandard1.6;</TargetFrameworks>
<TargetFramework>netstandard2.0</TargetFramework>
<AssemblyName>DotNetCore.CAP.RabbitMQ</AssemblyName>
<PackageTags>$(PackageTags);RabbitMQ</PackageTags>
</PropertyGroup>
......
using System;
using System.Collections.Generic;
using System.Text;
using RabbitMQ.Client;
namespace DotNetCore.CAP.RabbitMQ
{
public interface IConnectionPool
{
IConnection Rent();
bool Return(IConnection context);
}
}
......@@ -3,7 +3,6 @@ using System.Text;
using System.Threading.Tasks;
using DotNetCore.CAP.Processor.States;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using RabbitMQ.Client;
namespace DotNetCore.CAP.RabbitMQ
......@@ -11,34 +10,28 @@ namespace DotNetCore.CAP.RabbitMQ
internal sealed class PublishQueueExecutor : BasePublishQueueExecutor
{
private readonly ILogger _logger;
private readonly ConnectionPool _connectionPool;
private readonly RabbitMQOptions _rabbitMQOptions;
public PublishQueueExecutor(IStateChanger stateChanger,
RabbitMQOptions options,
public PublishQueueExecutor(
CapOptions options,
IStateChanger stateChanger,
ConnectionPool connectionPool,
RabbitMQOptions rabbitMQOptions,
ILogger<PublishQueueExecutor> logger)
: base(stateChanger, logger)
: base(options, stateChanger, logger)
{
_logger = logger;
_rabbitMQOptions = options;
_connectionPool = connectionPool;
_rabbitMQOptions = rabbitMQOptions;
}
public override Task<OperateResult> PublishAsync(string keyName, string content)
{
var factory = new ConnectionFactory()
{
HostName = _rabbitMQOptions.HostName,
UserName = _rabbitMQOptions.UserName,
Port = _rabbitMQOptions.Port,
Password = _rabbitMQOptions.Password,
VirtualHost = _rabbitMQOptions.VirtualHost,
RequestedConnectionTimeout = _rabbitMQOptions.RequestedConnectionTimeout,
SocketReadTimeout = _rabbitMQOptions.SocketReadTimeout,
SocketWriteTimeout = _rabbitMQOptions.SocketWriteTimeout
};
var connection = _connectionPool.Rent();
try
{
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
var body = Encoding.UTF8.GetBytes(content);
......@@ -64,6 +57,10 @@ namespace DotNetCore.CAP.RabbitMQ
Description = ex.Message
}));
}
finally
{
_connectionPool.Return(connection);
}
}
}
}
\ No newline at end of file
......@@ -14,8 +14,7 @@ namespace DotNetCore.CAP.RabbitMQ
private readonly string _queueName;
private readonly RabbitMQOptions _rabbitMQOptions;
private IConnectionFactory _connectionFactory;
private IConnection _connection;
private ConnectionPool _connectionPool;
private IModel _channel;
private ulong _deliveryTag;
......@@ -23,9 +22,12 @@ namespace DotNetCore.CAP.RabbitMQ
public event EventHandler<string> OnError;
public RabbitMQConsumerClient(string queueName, RabbitMQOptions options)
public RabbitMQConsumerClient(string queueName,
ConnectionPool connectionPool,
RabbitMQOptions options)
{
_queueName = queueName;
_connectionPool = connectionPool;
_rabbitMQOptions = options;
_exchageName = options.TopicExchangeName;
......@@ -34,20 +36,9 @@ namespace DotNetCore.CAP.RabbitMQ
private void InitClient()
{
_connectionFactory = new ConnectionFactory()
{
HostName = _rabbitMQOptions.HostName,
UserName = _rabbitMQOptions.UserName,
Port = _rabbitMQOptions.Port,
Password = _rabbitMQOptions.Password,
VirtualHost = _rabbitMQOptions.VirtualHost,
RequestedConnectionTimeout = _rabbitMQOptions.RequestedConnectionTimeout,
SocketReadTimeout = _rabbitMQOptions.SocketReadTimeout,
SocketWriteTimeout = _rabbitMQOptions.SocketWriteTimeout
};
var connection = _connectionPool.Rent();
_connection = _connectionFactory.CreateConnection();
_channel = _connection.CreateModel();
_channel = connection.CreateModel();
_channel.ExchangeDeclare(
exchange: _exchageName,
......@@ -60,6 +51,8 @@ namespace DotNetCore.CAP.RabbitMQ
exclusive: false,
autoDelete: false,
arguments: arguments);
_connectionPool.Return(connection);
}
public void Subscribe(IEnumerable<string> topics)
......@@ -92,7 +85,6 @@ namespace DotNetCore.CAP.RabbitMQ
public void Dispose()
{
_channel.Dispose();
_connection.Dispose();
}
private void OnConsumerReceived(object sender, BasicDeliverEventArgs e)
......
using Microsoft.Extensions.Options;
using RabbitMQ.Client;
namespace DotNetCore.CAP.RabbitMQ
{
internal sealed class RabbitMQConsumerClientFactory : IConsumerClientFactory
{
private readonly RabbitMQOptions _rabbitMQOptions;
private readonly ConnectionPool _connectionPool;
public RabbitMQConsumerClientFactory(RabbitMQOptions rabbitMQOptions)
public RabbitMQConsumerClientFactory(RabbitMQOptions rabbitMQOptions, ConnectionPool pool)
{
_rabbitMQOptions = rabbitMQOptions;
_connectionPool = pool;
}
public IConsumerClient Create(string groupId)
{
return new RabbitMQConsumerClient(groupId, _rabbitMQOptions);
return new RabbitMQConsumerClient(groupId, _connectionPool, _rabbitMQOptions);
}
}
}
\ No newline at end of file
......@@ -19,34 +19,36 @@ namespace DotNetCore.CAP
public void AddServices(IServiceCollection services)
{
services.AddSingleton<IStorage, SqlServerStorage>();
services.AddScoped<IStorageConnection, SqlServerStorageConnection>();
services.AddScoped<ICapPublisher, CapPublisher>();
services.AddSingleton<IStorageConnection, SqlServerStorageConnection>();
services.AddTransient<ICapPublisher, CapPublisher>();
services.AddTransient<ICallbackPublisher, CapPublisher>();
services.AddTransient<IAdditionalProcessor, DefaultAdditionalProcessor>();
AddSqlServerOptions(services);
}
private void AddSqlServerOptions(IServiceCollection services)
{
var sqlServerOptions = new SqlServerOptions();
_configure(sqlServerOptions);
if (sqlServerOptions.DbContextType != null)
{
var provider = TempBuildService(services);
var dbContextObj = provider.GetService(sqlServerOptions.DbContextType);
var dbContext = (DbContext)dbContextObj;
sqlServerOptions.ConnectionString = dbContext.Database.GetDbConnection().ConnectionString;
services.AddSingleton(x =>
{
using (var scope = x.CreateScope())
{
var provider = scope.ServiceProvider;
var dbContext = (DbContext)provider.GetService(sqlServerOptions.DbContextType);
sqlServerOptions.ConnectionString = dbContext.Database.GetDbConnection().ConnectionString;
return sqlServerOptions;
}
});
}
else
{
services.AddSingleton(sqlServerOptions);
}
services.AddSingleton(sqlServerOptions);
}
#if NETSTANDARD1_6
private IServiceProvider TempBuildService(IServiceCollection services)
{
return services.BuildServiceProvider();
}
#else
private ServiceProvider TempBuildService(IServiceCollection services)
{
return services.BuildServiceProvider();
}
#endif
}
}
\ No newline at end of file
using System;
using System.Data;
using System.Data.SqlClient;
using System.Threading.Tasks;
using Dapper;
using DotNetCore.CAP.Abstractions;
......@@ -10,7 +11,7 @@ using Microsoft.Extensions.Logging;
namespace DotNetCore.CAP.SqlServer
{
public class CapPublisher : CapPublisherBase
public class CapPublisher : CapPublisherBase, ICallbackPublisher
{
private readonly ILogger _logger;
private readonly SqlServerOptions _options;
......@@ -34,13 +35,17 @@ namespace DotNetCore.CAP.SqlServer
protected override void PrepareConnectionForEF()
{
DbConnection = _dbContext.Database.GetDbConnection();
var transaction = _dbContext.Database.CurrentTransaction;
if (transaction == null)
var dbContextTransaction = _dbContext.Database.CurrentTransaction;
var dbTrans = dbContextTransaction?.GetDbTransaction();
//DbTransaction is dispose in original
if (dbTrans?.Connection == null)
{
IsCapOpenedTrans = true;
transaction = _dbContext.Database.BeginTransaction(IsolationLevel.ReadCommitted);
dbContextTransaction?.Dispose();
dbContextTransaction = _dbContext.Database.BeginTransaction(IsolationLevel.ReadCommitted);
dbTrans = dbContextTransaction.GetDbTransaction();
}
DbTranasaction = transaction.GetDbTransaction();
DbTranasaction = dbTrans;
}
protected override void Execute(IDbConnection dbConnection, IDbTransaction dbTransaction, CapPublishedMessage message)
......@@ -57,6 +62,14 @@ namespace DotNetCore.CAP.SqlServer
_logger.LogInformation("Published Message has been persisted in the database. name:" + message.ToString());
}
public async Task PublishAsync(CapPublishedMessage message)
{
using (var conn = new SqlConnection(_options.ConnectionString))
{
await conn.ExecuteAsync(PrepareSql(), message);
}
}
#region private methods
private string PrepareSql()
......@@ -64,7 +77,6 @@ namespace DotNetCore.CAP.SqlServer
return $"INSERT INTO {_options.Schema}.[Published] ([Name],[Content],[Retries],[Added],[ExpiresAt],[StatusName])VALUES(@Name,@Content,@Retries,@Added,@ExpiresAt,@StatusName)";
}
#endregion private methods
}
}
\ No newline at end of file
......@@ -3,19 +3,16 @@
<Import Project="..\..\build\common.props" />
<PropertyGroup>
<TargetFrameworks>netstandard1.6;netstandard2.0;</TargetFrameworks>
<TargetFramework>netstandard2.0</TargetFramework>
<AssemblyName>DotNetCore.CAP.SqlServer</AssemblyName>
<PackageTags>$(PackageTags);SQL Server</PackageTags>
</PropertyGroup>
<PropertyGroup Condition="'$(Configuration)|$(TargetFramework)|$(Platform)'=='Debug|netstandard1.6|AnyCPU'">
<DefineConstants>TRACE;DEBUG</DefineConstants>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Dapper" Version="1.50.2" />
<PackageReference Include="Microsoft.EntityFrameworkCore" Version="1.1.2" />
<PackageReference Include="Microsoft.EntityFrameworkCore.Relational" Version="1.1.2" />
<PackageReference Include="Microsoft.EntityFrameworkCore" Version="2.0.0" />
<PackageReference Include="Microsoft.EntityFrameworkCore.Relational" Version="2.0.0" />
<PackageReference Include="System.Data.SqlClient" Version="4.4.0" />
</ItemGroup>
<ItemGroup>
......
......@@ -15,7 +15,7 @@ namespace DotNetCore.CAP.SqlServer
private const int MaxBatch = 1000;
private readonly TimeSpan _delay = TimeSpan.FromSeconds(1);
private readonly TimeSpan _waitingInterval = TimeSpan.FromHours(2);
private readonly TimeSpan _waitingInterval = TimeSpan.FromMinutes(5);
private static readonly string[] Tables =
{
......
......@@ -2,7 +2,6 @@ using System.Data.SqlClient;
using System.Threading;
using System.Threading.Tasks;
using Dapper;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Logging;
namespace DotNetCore.CAP.SqlServer
......
......@@ -7,52 +7,55 @@ using DotNetCore.CAP.Processor;
namespace DotNetCore.CAP.Abstractions
{
public abstract class CapPublisherBase : ICapPublisher
public abstract class CapPublisherBase : ICapPublisher, IDisposable
{
protected IDbConnection DbConnection { get; set; }
protected IDbTransaction DbTranasaction { get; set; }
protected bool IsCapOpenedTrans { get; set; }
protected bool IsCapOpenedConn { get; set; }
protected bool IsUsingEF { get; set; }
protected IServiceProvider ServiceProvider { get; set; }
public void Publish<T>(string name, T contentObj)
public void Publish<T>(string name, T contentObj, string callbackName = null)
{
CheckIsUsingEF(name);
PrepareConnectionForEF();
var content = Serialize(contentObj);
var content = Serialize(contentObj, callbackName);
PublishWithTrans(name, content, DbConnection, DbTranasaction);
PublishWithTrans(name, content);
}
public Task PublishAsync<T>(string name, T contentObj)
public Task PublishAsync<T>(string name, T contentObj, string callbackName = null)
{
CheckIsUsingEF(name);
PrepareConnectionForEF();
var content = Serialize(contentObj);
var content = Serialize(contentObj, callbackName);
return PublishWithTransAsync(name, content, DbConnection, DbTranasaction);
return PublishWithTransAsync(name, content);
}
public void Publish<T>(string name, T contentObj, IDbConnection dbConnection, IDbTransaction dbTransaction = null)
public void Publish<T>(string name, T contentObj, IDbConnection dbConnection,
string callbackName = null, IDbTransaction dbTransaction = null)
{
CheckIsAdoNet(name);
PrepareConnectionForAdo(dbConnection, ref dbTransaction);
PrepareConnectionForAdo(dbConnection, dbTransaction);
var content = Serialize(contentObj);
var content = Serialize(contentObj, callbackName);
PublishWithTrans(name, content, dbConnection, dbTransaction);
PublishWithTrans(name, content);
}
public Task PublishAsync<T>(string name, T contentObj, IDbConnection dbConnection, IDbTransaction dbTransaction = null)
public Task PublishAsync<T>(string name, T contentObj, IDbConnection dbConnection,
string callbackName = null, IDbTransaction dbTransaction = null)
{
CheckIsAdoNet(name);
PrepareConnectionForAdo(dbConnection, ref dbTransaction);
PrepareConnectionForAdo(dbConnection, dbTransaction);
var content = Serialize(contentObj);
var content = Serialize(contentObj, callbackName);
return PublishWithTransAsync(name, content, dbConnection, dbTransaction);
return PublishWithTransAsync(name, content);
}
protected abstract void PrepareConnectionForEF();
......@@ -63,32 +66,29 @@ namespace DotNetCore.CAP.Abstractions
#region private methods
private string Serialize<T>(T obj)
private string Serialize<T>(T obj, string callbackName = null)
{
string content = string.Empty;
if (Helper.IsComplexType(typeof(T)))
{
content = Helper.ToJson(obj);
}
else
var message = new Message(obj)
{
content = obj.ToString();
}
return content;
CallbackName = callbackName
};
return Helper.ToJson(message);
}
private void PrepareConnectionForAdo(IDbConnection dbConnection, ref IDbTransaction dbTransaction)
private void PrepareConnectionForAdo(IDbConnection dbConnection, IDbTransaction dbTransaction)
{
if (dbConnection == null)
throw new ArgumentNullException(nameof(dbConnection));
if (dbConnection.State != ConnectionState.Open)
dbConnection.Open();
if (dbTransaction == null)
DbConnection = dbConnection ?? throw new ArgumentNullException(nameof(dbConnection));
if (DbConnection.State != ConnectionState.Open)
{
IsCapOpenedConn = true;
DbConnection.Open();
}
DbTranasaction = dbTransaction;
if (DbTranasaction == null)
{
IsCapOpenedTrans = true;
dbTransaction = dbConnection.BeginTransaction(IsolationLevel.ReadCommitted);
DbTranasaction = dbConnection.BeginTransaction(IsolationLevel.ReadCommitted);
}
}
......@@ -107,7 +107,7 @@ namespace DotNetCore.CAP.Abstractions
throw new InvalidOperationException("If you are using the EntityFramework, you do not need to use this overloaded.");
}
private async Task PublishWithTransAsync(string name, string content, IDbConnection dbConnection, IDbTransaction dbTransaction)
private async Task PublishWithTransAsync(string name, string content)
{
var message = new CapPublishedMessage
{
......@@ -116,18 +116,14 @@ namespace DotNetCore.CAP.Abstractions
StatusName = StatusName.Scheduled
};
await ExecuteAsync(dbConnection, dbTransaction, message);
await ExecuteAsync(DbConnection, DbTranasaction, message);
ClosedCap();
if (IsCapOpenedTrans)
{
dbTransaction.Commit();
dbTransaction.Dispose();
dbConnection.Dispose();
}
PublishQueuer.PulseEvent.Set();
}
private void PublishWithTrans(string name, string content, IDbConnection dbConnection, IDbTransaction dbTransaction)
private void PublishWithTrans(string name, string content)
{
var message = new CapPublishedMessage
{
......@@ -136,17 +132,32 @@ namespace DotNetCore.CAP.Abstractions
StatusName = StatusName.Scheduled
};
Execute(dbConnection, dbTransaction, message);
Execute(DbConnection, DbTranasaction, message);
ClosedCap();
PublishQueuer.PulseEvent.Set();
}
private void ClosedCap()
{
if (IsCapOpenedTrans)
{
dbTransaction.Commit();
dbTransaction.Dispose();
dbConnection.Dispose();
DbTranasaction.Commit();
DbTranasaction.Dispose();
}
PublishQueuer.PulseEvent.Set();
if (IsCapOpenedConn)
{
DbConnection.Dispose();
}
}
public void Dispose()
{
DbTranasaction?.Dispose();
DbConnection?.Dispose();
}
#endregion private methods
}
}
}
\ No newline at end of file
......@@ -11,10 +11,9 @@ namespace DotNetCore.CAP.Abstractions
/// <summary>
/// Selects a set of <see cref="ConsumerExecutorDescriptor"/> candidates for the current message associated with
/// <paramref name="provider"/>.
/// </summary>
/// <param name="provider"> <see cref="IServiceProvider"/>.</param>
/// </summary>
/// <returns>A set of <see cref="ConsumerExecutorDescriptor"/> candidates or <c>null</c>.</returns>
IReadOnlyList<ConsumerExecutorDescriptor> SelectCandidates(IServiceProvider provider);
IReadOnlyList<ConsumerExecutorDescriptor> SelectCandidates();
/// <summary>
/// Selects the best <see cref="ConsumerExecutorDescriptor"/> candidate from <paramref name="candidates"/> for the
......
......@@ -20,27 +20,48 @@ namespace DotNetCore.CAP
/// </summary>
public const int DefaultQueueProcessorCount = 2;
/// <summary>
/// Default successed message expriation timespan, in seconds.
/// </summary>
public const int DefaultSuccessMessageExpirationAfter = 3600;
/// <summary>
/// Failed message retry waiting interval.
/// </summary>
public const int DefaultFailedMessageWaitingInterval = 180;
public CapOptions()
{
PollingDelay = DefaultPollingDelay;
QueueProcessorCount = DefaultQueueProcessorCount;
SuccessedMessageExpiredAfter = DefaultSuccessMessageExpirationAfter;
FailedMessageWaitingInterval = DefaultFailedMessageWaitingInterval;
Extensions = new List<ICapOptionsExtension>();
}
/// <summary>
/// Productor job polling delay time. Default is 15 sec.
/// Productor job polling delay time.
/// Default is 15 sec.
/// </summary>
public int PollingDelay { get; set; }
/// <summary>
/// Gets or sets the messages queue (Cap.Queue table) processor count.
/// Default is 2 processor.
/// </summary>
public int QueueProcessorCount { get; set; }
/// <summary>
/// Failed messages polling delay time. Default is 3 min.
/// Sent or received successed message after timespan of due, then the message will be deleted at due time.
/// Dafault is 3600 seconds.
/// </summary>
public int SuccessedMessageExpiredAfter { get; set; }
/// <summary>
/// Failed messages polling delay time.
/// Default is 180 seconds.
/// </summary>
public int FailedMessageWaitingInterval { get; set; } = (int)TimeSpan.FromMinutes(3).TotalSeconds;
public int FailedMessageWaitingInterval { get; set; }
/// <summary>
/// We’ll invoke this call-back with message type,name,content when requeue failed message.
......
......@@ -67,27 +67,20 @@ namespace Microsoft.Extensions.DependencyInjection
private static void AddSubscribeServices(IServiceCollection services)
{
var consumerListenerServices = new Dictionary<Type, Type>();
var consumerListenerServices = new List<KeyValuePair<Type, Type>>();
foreach (var rejectedServices in services)
{
if (rejectedServices.ImplementationType != null
&& typeof(ICapSubscribe).IsAssignableFrom(rejectedServices.ImplementationType))
consumerListenerServices.Add(typeof(ICapSubscribe), rejectedServices.ImplementationType);
{
consumerListenerServices.Add(new KeyValuePair<Type, Type>(typeof(ICapSubscribe),
rejectedServices.ImplementationType));
}
}
foreach (var service in consumerListenerServices)
{
services.AddSingleton(service.Key, service.Value);
}
var types = Assembly.GetEntryAssembly().ExportedTypes;
foreach (var type in types)
{
if (Helper.IsController(type.GetTypeInfo()))
{
services.AddSingleton(typeof(object), type);
}
services.AddTransient(service.Key, service.Value);
}
}
}
......
......@@ -3,28 +3,21 @@
<Import Project="..\..\build\common.props" />
<PropertyGroup>
<TargetFrameworks>netstandard1.6;netstandard2.0;</TargetFrameworks>
<TargetFramework>netstandard2.0</TargetFramework>
<AssemblyName>DotNetCore.CAP</AssemblyName>
<PackageTags>$(PackageTags);</PackageTags>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Microsoft.AspNetCore.Hosting.Abstractions" Version="1.1.2" />
<PackageReference Include="Microsoft.AspNetCore.Http.Abstractions" Version="1.1.2" />
<PackageReference Include="Microsoft.Extensions.Options" Version="1.1.2" />
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="1.1.2" />
<PackageReference Include="Microsoft.AspNetCore.Hosting.Abstractions" Version="2.0.0" />
<PackageReference Include="Microsoft.AspNetCore.Http.Abstractions" Version="2.0.0" />
<PackageReference Include="Microsoft.Extensions.Options" Version="2.0.0" />
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="2.0.0" />
<PackageReference Include="Newtonsoft.Json" Version="10.0.3" />
<PackageReference Include="System.Data.Common" Version="4.3.0" />
<PackageReference Include="System.Threading.ThreadPool" Version="4.3.0" />
</ItemGroup>
<ItemGroup Condition="'$(TargetFramework)' == 'netstandard1.6'">
<PackageReference Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="1.1.1" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="1.1.1" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="2.0.0" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="2.0.0" />
</ItemGroup>
<ItemGroup Condition="'$(TargetFramework)' == 'netstandard2.0'">
<PackageReference Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="2.0.0-*" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="2.0.0-*" />
</ItemGroup>
</Project>
......@@ -25,14 +25,13 @@ namespace DotNetCore.CAP
IOptions<CapOptions> options,
IStorage storage,
IApplicationLifetime appLifetime,
IServiceProvider provider)
IEnumerable<IProcessingServer> servers)
{
_logger = logger;
_appLifetime = appLifetime;
Options = options.Value;
Storage = storage;
Provider = provider;
Servers = Provider.GetServices<IProcessingServer>();
Servers = servers;
_cts = new CancellationTokenSource();
_ctsRegistration = appLifetime.ApplicationStopping.Register(() =>
......@@ -55,8 +54,6 @@ namespace DotNetCore.CAP
protected IEnumerable<IProcessingServer> Servers { get; }
public IServiceProvider Provider { get; private set; }
public Task BootstrapAsync()
{
return (_bootstrappingTask = BootstrapTaskAsync());
......
using System.Threading.Tasks;
using DotNetCore.CAP.Models;
namespace DotNetCore.CAP
{
public interface ICallbackPublisher
{
Task PublishAsync(CapPublishedMessage obj);
}
}
......@@ -18,7 +18,8 @@ namespace DotNetCore.CAP
/// <typeparam name="T">The type of conetent object.</typeparam>
/// <param name="name">the topic name or exchange router key.</param>
/// <param name="contentObj">message body content, that will be serialized of json.</param>
Task PublishAsync<T>(string name, T contentObj);
/// <param name="callbackName">callback subscriber name</param>
Task PublishAsync<T>(string name, T contentObj, string callbackName = null);
/// <summary>
/// (EntityFramework) Publish a object message.
......@@ -30,24 +31,27 @@ namespace DotNetCore.CAP
/// <typeparam name="T">The type of conetent object.</typeparam>
/// <param name="name">the topic name or exchange router key.</param>
/// <param name="contentObj">message body content, that will be serialized of json.</param>
void Publish<T>(string name, T contentObj);
/// <param name="callbackName">callback subscriber name</param>
void Publish<T>(string name, T contentObj, string callbackName = null);
/// <summary>
/// (ado.net) Asynchronous publish a object message.
/// </summary>
/// <param name="name">the topic name or exchange router key.</param>
/// <param name="contentObj">message body content, that will be serialized of json.</param>
/// <param name="callbackName">callback subscriber name</param>
/// <param name="dbConnection">the connection of <see cref="IDbConnection"/></param>
/// <param name="dbTransaction">the transaction of <see cref="IDbTransaction"/></param>
Task PublishAsync<T>(string name, T contentObj, IDbConnection dbConnection, IDbTransaction dbTransaction = null);
Task PublishAsync<T>(string name, T contentObj, IDbConnection dbConnection, string callbackName = null, IDbTransaction dbTransaction = null);
/// <summary>
/// (ado.net) Publish a object message.
/// </summary>
/// <param name="name">the topic name or exchange router key.</param>
/// <param name="contentObj">message body content, that will be serialized of json.</param>
/// <param name="callbackName">callback subscriber name</param>
/// <param name="dbConnection">the connection of <see cref="IDbConnection"/></param>
/// <param name="dbTransaction">the transaction of <see cref="IDbTransaction"/></param>
void Publish<T>(string name, T contentObj, IDbConnection dbConnection, IDbTransaction dbTransaction = null);
void Publish<T>(string name, T contentObj, IDbConnection dbConnection, string callbackName = null, IDbTransaction dbTransaction = null);
}
}
\ No newline at end of file
......@@ -47,7 +47,7 @@ namespace DotNetCore.CAP
public void Start()
{
var groupingMatchs = _selector.GetCandidatesMethodsOfGroupNameGrouped(_serviceProvider);
var groupingMatchs = _selector.GetCandidatesMethodsOfGroupNameGrouped();
foreach (var matchGroup in groupingMatchs)
{
......
......@@ -10,12 +10,16 @@ namespace DotNetCore.CAP
{
public abstract class BasePublishQueueExecutor : IQueueExecutor
{
private readonly CapOptions _options;
private readonly IStateChanger _stateChanger;
private readonly ILogger _logger;
protected BasePublishQueueExecutor(IStateChanger stateChanger,
protected BasePublishQueueExecutor(
CapOptions options,
IStateChanger stateChanger,
ILogger<BasePublishQueueExecutor> logger)
{
_options = options;
_stateChanger = stateChanger;
_logger = logger;
}
......@@ -54,7 +58,7 @@ namespace DotNetCore.CAP
}
else
{
newState = new SucceededState();
newState = new SucceededState(_options.SuccessedMessageExpiredAfter);
}
await _stateChanger.ChangeStateAsync(message, newState, connection);
......
......@@ -15,16 +15,18 @@ namespace DotNetCore.CAP
private readonly IConsumerInvokerFactory _consumerInvokerFactory;
private readonly IStateChanger _stateChanger;
private readonly ILogger _logger;
private readonly CapOptions _options;
private readonly MethodMatcherCache _selector;
public SubscibeQueueExecutor(
IStateChanger stateChanger,
MethodMatcherCache selector,
CapOptions options,
IConsumerInvokerFactory consumerInvokerFactory,
ILogger<BasePublishQueueExecutor> logger)
{
_selector = selector;
_options = options;
_consumerInvokerFactory = consumerInvokerFactory;
_stateChanger = stateChanger;
_logger = logger;
......@@ -62,7 +64,7 @@ namespace DotNetCore.CAP
}
else
{
newState = new SucceededState();
newState = new SucceededState(_options.SuccessedMessageExpiredAfter);
}
await _stateChanger.ChangeStateAsync(message, newState, connection);
......
This diff is collapsed.
......@@ -24,7 +24,8 @@ namespace DotNetCore.CAP.Internal
{
var context = new ConsumerInvokerContext(consumerContext)
{
Result = new DefaultConsumerInvoker(_logger, _serviceProvider, _modelBinderFactory, consumerContext)
Result = new DefaultConsumerInvoker(_logger, _serviceProvider,
_modelBinderFactory, consumerContext)
};
return context.Result;
......
using System.Collections.Generic;
namespace DotNetCore.CAP.Internal
{
public class ConsumerMethodExecutor
{
public static object[] PrepareArguments(
IDictionary<string, object> actionParameters,
ObjectMethodExecutor actionMethodExecutor)
{
var declaredParameterInfos = actionMethodExecutor.MethodParameters;
var count = declaredParameterInfos.Length;
if (count == 0)
{
return null;
}
var arguments = new object[count];
for (var index = 0; index < count; index++)
{
var parameterInfo = declaredParameterInfos[index];
object value;
if (!actionParameters.TryGetValue(parameterInfo.Name, out value))
{
value = actionMethodExecutor.GetDefaultValueForParameter(index);
}
arguments[index] = value;
}
return arguments;
}
}
}
\ No newline at end of file
using System;
using System.Threading.Tasks;
using DotNetCore.CAP.Abstractions;
using DotNetCore.CAP.Infrastructure;
using DotNetCore.CAP.Models;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Internal;
using Microsoft.Extensions.Logging;
namespace DotNetCore.CAP.Internal
......@@ -21,49 +24,106 @@ namespace DotNetCore.CAP.Internal
{
_modelBinderFactory = modelBinderFactory;
_serviceProvider = serviceProvider;
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
_logger = logger;
_consumerContext = consumerContext;
_consumerContext = consumerContext ?? throw new ArgumentNullException(nameof(consumerContext));
_executor = ObjectMethodExecutor.Create(_consumerContext.ConsumerDescriptor.MethodInfo,
_consumerContext.ConsumerDescriptor.ImplTypeInfo);
}
public async Task InvokeAsync()
{
using (_logger.BeginScope("consumer invoker begin"))
_logger.LogDebug("Executing consumer Topic: {0}", _consumerContext.ConsumerDescriptor.MethodInfo.Name);
using (var scope = _serviceProvider.CreateScope())
{
_logger.LogDebug("Executing consumer Topic: {0}", _consumerContext.ConsumerDescriptor.MethodInfo.Name);
var provider = scope.ServiceProvider;
var serviceType = _consumerContext.ConsumerDescriptor.ImplTypeInfo.AsType();
var obj = ActivatorUtilities.GetServiceOrCreateInstance(provider, serviceType);
var obj = ActivatorUtilities.GetServiceOrCreateInstance(_serviceProvider,
_consumerContext.ConsumerDescriptor.ImplTypeInfo.AsType());
var jsonConent = _consumerContext.DeliverMessage.Content;
var message = Helper.FromJson<Message>(jsonConent);
var value = _consumerContext.DeliverMessage.Content;
object result = null;
if (_executor.MethodParameters.Length > 0)
{
var firstParameter = _executor.MethodParameters[0];
try
result = await ExecuteWithParameterAsync(obj, message.Content.ToString());
}
else
{
result = await ExecuteAsync(obj);
}
if (!string.IsNullOrEmpty(message.CallbackName))
{
await SentCallbackMessage(message.Id, message.CallbackName, result);
}
}
}
private async Task<object> ExecuteAsync(object @class)
{
if (_executor.IsMethodAsync)
{
return await _executor.ExecuteAsync(@class);
}
else
{
return _executor.Execute(@class);
}
}
private async Task<object> ExecuteWithParameterAsync(object @class, string parameterString)
{
var firstParameter = _executor.MethodParameters[0];
try
{
var binder = _modelBinderFactory.CreateBinder(firstParameter);
var bindResult = await binder.BindModelAsync(parameterString);
if (bindResult.IsSuccess)
{
if (_executor.IsMethodAsync)
{
var binder = _modelBinderFactory.CreateBinder(firstParameter);
var result = await binder.BindModelAsync(value);
if (result.IsSuccess)
{
_executor.Execute(obj, result.Model);
}
else
{
_logger.LogWarning($"Parameters:{firstParameter.Name} bind failed! the content is:" + value);
}
return await _executor.ExecuteAsync(@class, bindResult.Model);
}
catch (FormatException ex)
else
{
_logger.ModelBinderFormattingException(_executor.MethodInfo?.Name, firstParameter.Name, value, ex);
return _executor.Execute(@class, bindResult.Model);
}
}
else
{
_executor.Execute(obj);
throw new MethodBindException($"Parameters:{firstParameter.Name} bind failed! ParameterString is: {parameterString} ");
}
}
catch (FormatException ex)
{
_logger.ModelBinderFormattingException(_executor.MethodInfo?.Name, firstParameter.Name, parameterString, ex);
return null;
}
}
private async Task SentCallbackMessage(string messageId, string topicName, object bodyObj)
{
var callbackMessage = new Message
{
Id = messageId,
Content = bodyObj
};
using (var scope = _serviceProvider.CreateScope())
{
var provider = scope.ServiceProvider;
var publisher = provider.GetRequiredService<ICallbackPublisher>();
var publishedMessage = new CapPublishedMessage
{
Name = topicName,
Content = Helper.ToJson(callbackMessage),
StatusName = StatusName.Scheduled
};
await publisher.PublishAsync(publishedMessage);
}
}
}
}
\ No newline at end of file
......@@ -33,13 +33,13 @@ namespace DotNetCore.CAP.Internal
return executeDescriptor.FirstOrDefault(x => x.Attribute.Name == key);
}
public IReadOnlyList<ConsumerExecutorDescriptor> SelectCandidates(IServiceProvider provider)
public IReadOnlyList<ConsumerExecutorDescriptor> SelectCandidates()
{
var executorDescriptorList = new List<ConsumerExecutorDescriptor>();
executorDescriptorList.AddRange(FindConsumersFromInterfaceTypes(provider));
executorDescriptorList.AddRange(FindConsumersFromInterfaceTypes(_serviceProvider));
executorDescriptorList.AddRange(FindConsumersFromControllerTypes(provider));
executorDescriptorList.AddRange(FindConsumersFromControllerTypes(_serviceProvider));
return executorDescriptorList;
}
......@@ -48,35 +48,38 @@ namespace DotNetCore.CAP.Internal
IServiceProvider provider)
{
var executorDescriptorList = new List<ConsumerExecutorDescriptor>();
var consumerServices = provider.GetServices<ICapSubscribe>();
foreach (var service in consumerServices)
using (var scoped = provider.CreateScope())
{
var typeInfo = service.GetType().GetTypeInfo();
if (!typeof(ICapSubscribe).GetTypeInfo().IsAssignableFrom(typeInfo))
var scopedProvider = scoped.ServiceProvider;
var consumerServices = scopedProvider.GetServices<ICapSubscribe>();
foreach (var service in consumerServices)
{
continue;
}
var typeInfo = service.GetType().GetTypeInfo();
if (!typeof(ICapSubscribe).GetTypeInfo().IsAssignableFrom(typeInfo))
{
continue;
}
executorDescriptorList.AddRange(GetTopicAttributesDescription(typeInfo));
executorDescriptorList.AddRange(GetTopicAttributesDescription(typeInfo));
}
return executorDescriptorList;
}
return executorDescriptorList;
}
private static IEnumerable<ConsumerExecutorDescriptor> FindConsumersFromControllerTypes(
IServiceProvider provider)
{
var executorDescriptorList = new List<ConsumerExecutorDescriptor>();
// at cap startup time, find all Controller into the DI container,the type is object.
var controllers = provider.GetServices<object>();
foreach (var controller in controllers)
{
var typeInfo = controller.GetType().GetTypeInfo();
//double check
if (!Helper.IsController(typeInfo)) continue;
executorDescriptorList.AddRange(GetTopicAttributesDescription(typeInfo));
var types = Assembly.GetEntryAssembly().ExportedTypes;
foreach (var type in types)
{
var typeInfo = type.GetTypeInfo();
if (Helper.IsController(typeInfo))
{
executorDescriptorList.AddRange(GetTopicAttributesDescription(typeInfo));
}
}
return executorDescriptorList;
......
......@@ -20,7 +20,9 @@ namespace DotNetCore.CAP.Internal
try
{
var type = _parameterInfo.ParameterType;
var value = Helper.FromJson(content, type);
return Task.FromResult(ModelBindingResult.Success(value));
}
catch (Exception)
......
using System;
namespace DotNetCore.CAP.Internal
{
[Serializable]
public class MethodBindException : Exception
{
public MethodBindException()
{
}
public MethodBindException(string message) : base(message)
{
}
public MethodBindException(string message, Exception inner) : base(message, inner)
{
}
}
}
\ No newline at end of file
......@@ -22,12 +22,11 @@ namespace DotNetCore.CAP.Internal
/// Get a dictionary of candidates.In the dictionary,
/// the Key is the CAPSubscribeAttribute Group, the Value for the current Group of candidates
/// </summary>
/// <param name="provider"><see cref="IServiceProvider"/></param>
public ConcurrentDictionary<string, IList<ConsumerExecutorDescriptor>> GetCandidatesMethodsOfGroupNameGrouped(IServiceProvider provider)
public ConcurrentDictionary<string, IList<ConsumerExecutorDescriptor>> GetCandidatesMethodsOfGroupNameGrouped()
{
if (Entries.Count != 0) return Entries;
var executorCollection = _selector.SelectCandidates(provider);
var executorCollection = _selector.SelectCandidates();
var groupedCandidates = executorCollection.GroupBy(x => x.Attribute.Group);
......
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;
namespace DotNetCore.CAP.Internal
{
/// <summary>
/// Provides access to the combined list of attributes associated a <see cref="Type"/> or property.
/// </summary>
public class ModelAttributes
{
/// <summary>
/// Creates a new <see cref="ModelAttributes"/> for a <see cref="Type"/>.
/// </summary>
/// <param name="typeAttributes">The set of attributes for the <see cref="Type"/>.</param>
public ModelAttributes(IEnumerable<object> typeAttributes)
{
Attributes = typeAttributes?.ToArray() ?? throw new ArgumentNullException(nameof(typeAttributes));
TypeAttributes = Attributes;
}
/// <summary>
/// Creates a new <see cref="ModelAttributes"/> for a property.
/// </summary>
/// <param name="propertyAttributes">The set of attributes for the property.</param>
/// <param name="typeAttributes">
/// The set of attributes for the property's <see cref="Type"/>. See <see cref="PropertyInfo.PropertyType"/>.
/// </param>
public ModelAttributes(IEnumerable<object> propertyAttributes, IEnumerable<object> typeAttributes)
{
PropertyAttributes = propertyAttributes?.ToArray()
?? throw new ArgumentNullException(nameof(propertyAttributes));
TypeAttributes = typeAttributes?.ToArray()
?? throw new ArgumentNullException(nameof(typeAttributes));
Attributes = PropertyAttributes.Concat(TypeAttributes).ToArray();
}
/// <summary>
/// Gets the set of all attributes. If this instance represents the attributes for a property, the attributes
/// on the property definition are before those on the property's <see cref="Type"/>.
/// </summary>
public IReadOnlyList<object> Attributes { get; }
/// <summary>
/// Gets the set of attributes on the property, or <c>null</c> if this instance represents the attributes
/// for a <see cref="Type"/>.
/// </summary>
public IReadOnlyList<object> PropertyAttributes { get; }
/// <summary>
/// Gets the set of attributes on the <see cref="Type"/>. If this instance represents a property,
/// then <see cref="TypeAttributes"/> contains attributes retrieved from
/// <see cref="PropertyInfo.PropertyType"/>.
/// </summary>
public IReadOnlyList<object> TypeAttributes { get; }
/// <summary>
/// Gets the attributes for the given <paramref name="property"/>.
/// </summary>
/// <param name="type">The <see cref="Type"/> in which caller found <paramref name="property"/>.
/// </param>
/// <param name="property">A <see cref="PropertyInfo"/> for which attributes need to be resolved.
/// </param>
/// <returns>A <see cref="ModelAttributes"/> instance with the attributes of the property.</returns>
public static ModelAttributes GetAttributesForProperty(Type type, PropertyInfo property)
{
if (type == null)
{
throw new ArgumentNullException(nameof(type));
}
if (property == null)
{
throw new ArgumentNullException(nameof(property));
}
var propertyAttributes = property.GetCustomAttributes();
var typeAttributes = property.PropertyType.GetTypeInfo().GetCustomAttributes();
return new ModelAttributes(propertyAttributes, typeAttributes);
}
/// <summary>
/// Gets the attributes for the given <paramref name="type"/>.
/// </summary>
/// <param name="type">The <see cref="Type"/> for which attributes need to be resolved.
/// </param>
/// <returns>A <see cref="ModelAttributes"/> instance with the attributes of the <see cref="Type"/>.</returns>
public static ModelAttributes GetAttributesForType(Type type)
{
if (type == null)
{
throw new ArgumentNullException(nameof(type));
}
var attributes = type.GetTypeInfo().GetCustomAttributes();
return new ModelAttributes(attributes);
}
}
}
\ No newline at end of file
This diff is collapsed.
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.Linq;
using System.Reflection;
using System.Runtime.CompilerServices;
namespace Microsoft.Extensions.Internal
{
internal struct AwaitableInfo
{
public Type AwaiterType { get; }
public PropertyInfo AwaiterIsCompletedProperty { get; }
public MethodInfo AwaiterGetResultMethod { get; }
public MethodInfo AwaiterOnCompletedMethod { get; }
public MethodInfo AwaiterUnsafeOnCompletedMethod { get; }
public Type ResultType { get; }
public MethodInfo GetAwaiterMethod { get; }
public AwaitableInfo(
Type awaiterType,
PropertyInfo awaiterIsCompletedProperty,
MethodInfo awaiterGetResultMethod,
MethodInfo awaiterOnCompletedMethod,
MethodInfo awaiterUnsafeOnCompletedMethod,
Type resultType,
MethodInfo getAwaiterMethod)
{
AwaiterType = awaiterType;
AwaiterIsCompletedProperty = awaiterIsCompletedProperty;
AwaiterGetResultMethod = awaiterGetResultMethod;
AwaiterOnCompletedMethod = awaiterOnCompletedMethod;
AwaiterUnsafeOnCompletedMethod = awaiterUnsafeOnCompletedMethod;
ResultType = resultType;
GetAwaiterMethod = getAwaiterMethod;
}
public static bool IsTypeAwaitable(Type type, out AwaitableInfo awaitableInfo)
{
// Based on Roslyn code: http://source.roslyn.io/#Microsoft.CodeAnalysis.Workspaces/Shared/Extensions/ISymbolExtensions.cs,db4d48ba694b9347
// Awaitable must have method matching "object GetAwaiter()"
var getAwaiterMethod = type.GetRuntimeMethods().FirstOrDefault(m =>
m.Name.Equals("GetAwaiter", StringComparison.OrdinalIgnoreCase)
&& m.GetParameters().Length == 0
&& m.ReturnType != null);
if (getAwaiterMethod == null)
{
awaitableInfo = default(AwaitableInfo);
return false;
}
var awaiterType = getAwaiterMethod.ReturnType;
// Awaiter must have property matching "bool IsCompleted { get; }"
var isCompletedProperty = awaiterType.GetRuntimeProperties().FirstOrDefault(p =>
p.Name.Equals("IsCompleted", StringComparison.OrdinalIgnoreCase)
&& p.PropertyType == typeof(bool)
&& p.GetMethod != null);
if (isCompletedProperty == null)
{
awaitableInfo = default(AwaitableInfo);
return false;
}
// Awaiter must implement INotifyCompletion
var awaiterInterfaces = awaiterType.GetInterfaces();
var implementsINotifyCompletion = awaiterInterfaces.Any(t => t == typeof(INotifyCompletion));
if (!implementsINotifyCompletion)
{
awaitableInfo = default(AwaitableInfo);
return false;
}
// INotifyCompletion supplies a method matching "void OnCompleted(Action action)"
var iNotifyCompletionMap = awaiterType
.GetTypeInfo()
.GetRuntimeInterfaceMap(typeof(INotifyCompletion));
var onCompletedMethod = iNotifyCompletionMap.InterfaceMethods.Single(m =>
m.Name.Equals("OnCompleted", StringComparison.OrdinalIgnoreCase)
&& m.ReturnType == typeof(void)
&& m.GetParameters().Length == 1
&& m.GetParameters()[0].ParameterType == typeof(Action));
// Awaiter optionally implements ICriticalNotifyCompletion
var implementsICriticalNotifyCompletion = awaiterInterfaces.Any(t => t == typeof(ICriticalNotifyCompletion));
MethodInfo unsafeOnCompletedMethod;
if (implementsICriticalNotifyCompletion)
{
// ICriticalNotifyCompletion supplies a method matching "void UnsafeOnCompleted(Action action)"
var iCriticalNotifyCompletionMap = awaiterType
.GetTypeInfo()
.GetRuntimeInterfaceMap(typeof(ICriticalNotifyCompletion));
unsafeOnCompletedMethod = iCriticalNotifyCompletionMap.InterfaceMethods.Single(m =>
m.Name.Equals("UnsafeOnCompleted", StringComparison.OrdinalIgnoreCase)
&& m.ReturnType == typeof(void)
&& m.GetParameters().Length == 1
&& m.GetParameters()[0].ParameterType == typeof(Action));
}
else
{
unsafeOnCompletedMethod = null;
}
// Awaiter must have method matching "void GetResult" or "T GetResult()"
var getResultMethod = awaiterType.GetRuntimeMethods().FirstOrDefault(m =>
m.Name.Equals("GetResult")
&& m.GetParameters().Length == 0);
if (getResultMethod == null)
{
awaitableInfo = default(AwaitableInfo);
return false;
}
awaitableInfo = new AwaitableInfo(
awaiterType,
isCompletedProperty,
getResultMethod,
onCompletedMethod,
unsafeOnCompletedMethod,
getResultMethod.ReturnType,
getAwaiterMethod);
return true;
}
}
}
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.Linq.Expressions;
namespace Microsoft.Extensions.Internal
{
internal struct CoercedAwaitableInfo
{
public AwaitableInfo AwaitableInfo { get; }
public Expression CoercerExpression { get; }
public Type CoercerResultType { get; }
public bool RequiresCoercion => CoercerExpression != null;
public CoercedAwaitableInfo(AwaitableInfo awaitableInfo)
{
AwaitableInfo = awaitableInfo;
CoercerExpression = null;
CoercerResultType = null;
}
public CoercedAwaitableInfo(Expression coercerExpression, Type coercerResultType, AwaitableInfo coercedAwaitableInfo)
{
CoercerExpression = coercerExpression;
CoercerResultType = coercerResultType;
AwaitableInfo = coercedAwaitableInfo;
}
public static bool IsTypeAwaitable(Type type, out CoercedAwaitableInfo info)
{
if (AwaitableInfo.IsTypeAwaitable(type, out var directlyAwaitableInfo))
{
info = new CoercedAwaitableInfo(directlyAwaitableInfo);
return true;
}
// It's not directly awaitable, but maybe we can coerce it.
// Currently we support coercing FSharpAsync<T>.
if (ObjectMethodExecutorFSharpSupport.TryBuildCoercerFromFSharpAsyncToAwaitable(type,
out var coercerExpression,
out var coercerResultType))
{
if (AwaitableInfo.IsTypeAwaitable(coercerResultType, out var coercedAwaitableInfo))
{
info = new CoercedAwaitableInfo(coercerExpression, coercerResultType, coercedAwaitableInfo);
return true;
}
}
info = default(CoercedAwaitableInfo);
return false;
}
}
}
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.Runtime.CompilerServices;
namespace Microsoft.Extensions.Internal
{
/// <summary>
/// Provides a common awaitable structure that <see cref="ObjectMethodExecutor.ExecuteAsync"/> can
/// return, regardless of whether the underlying value is a System.Task, an FSharpAsync, or an
/// application-defined custom awaitable.
/// </summary>
internal struct ObjectMethodExecutorAwaitable
{
private readonly object _customAwaitable;
private readonly Func<object, object> _getAwaiterMethod;
private readonly Func<object, bool> _isCompletedMethod;
private readonly Func<object, object> _getResultMethod;
private readonly Action<object, Action> _onCompletedMethod;
private readonly Action<object, Action> _unsafeOnCompletedMethod;
// Perf note: since we're requiring the customAwaitable to be supplied here as an object,
// this will trigger a further allocation if it was a value type (i.e., to box it). We can't
// fix this by making the customAwaitable type generic, because the calling code typically
// does not know the type of the awaitable/awaiter at compile-time anyway.
//
// However, we could fix it by not passing the customAwaitable here at all, and instead
// passing a func that maps directly from the target object (e.g., controller instance),
// target method (e.g., action method info), and params array to the custom awaiter in the
// GetAwaiter() method below. In effect, by delaying the actual method call until the
// upstream code calls GetAwaiter on this ObjectMethodExecutorAwaitable instance.
// This optimization is not currently implemented because:
// [1] It would make no difference when the awaitable was an object type, which is
// by far the most common scenario (e.g., System.Task<T>).
// [2] It would be complex - we'd need some kind of object pool to track all the parameter
// arrays until we needed to use them in GetAwaiter().
// We can reconsider this in the future if there's a need to optimize for ValueTask<T>
// or other value-typed awaitables.
public ObjectMethodExecutorAwaitable(
object customAwaitable,
Func<object, object> getAwaiterMethod,
Func<object, bool> isCompletedMethod,
Func<object, object> getResultMethod,
Action<object, Action> onCompletedMethod,
Action<object, Action> unsafeOnCompletedMethod)
{
_customAwaitable = customAwaitable;
_getAwaiterMethod = getAwaiterMethod;
_isCompletedMethod = isCompletedMethod;
_getResultMethod = getResultMethod;
_onCompletedMethod = onCompletedMethod;
_unsafeOnCompletedMethod = unsafeOnCompletedMethod;
}
public Awaiter GetAwaiter()
{
var customAwaiter = _getAwaiterMethod(_customAwaitable);
return new Awaiter(customAwaiter, _isCompletedMethod, _getResultMethod, _onCompletedMethod, _unsafeOnCompletedMethod);
}
public struct Awaiter : ICriticalNotifyCompletion
{
private readonly object _customAwaiter;
private readonly Func<object, bool> _isCompletedMethod;
private readonly Func<object, object> _getResultMethod;
private readonly Action<object, Action> _onCompletedMethod;
private readonly Action<object, Action> _unsafeOnCompletedMethod;
public Awaiter(
object customAwaiter,
Func<object, bool> isCompletedMethod,
Func<object, object> getResultMethod,
Action<object, Action> onCompletedMethod,
Action<object, Action> unsafeOnCompletedMethod)
{
_customAwaiter = customAwaiter;
_isCompletedMethod = isCompletedMethod;
_getResultMethod = getResultMethod;
_onCompletedMethod = onCompletedMethod;
_unsafeOnCompletedMethod = unsafeOnCompletedMethod;
}
public bool IsCompleted => _isCompletedMethod(_customAwaiter);
public object GetResult() => _getResultMethod(_customAwaiter);
public void OnCompleted(Action continuation)
{
_onCompletedMethod(_customAwaiter, continuation);
}
public void UnsafeOnCompleted(Action continuation)
{
// If the underlying awaitable implements ICriticalNotifyCompletion, use its UnsafeOnCompleted.
// If not, fall back on using its OnCompleted.
//
// Why this is safe:
// - Implementing ICriticalNotifyCompletion is a way of saying the caller can choose whether it
// needs the execution context to be preserved (which it signals by calling OnCompleted), or
// that it doesn't (which it signals by calling UnsafeOnCompleted). Obviously it's faster *not*
// to preserve and restore the context, so we prefer that where possible.
// - If a caller doesn't need the execution context to be preserved and hence calls UnsafeOnCompleted,
// there's no harm in preserving it anyway - it's just a bit of wasted cost. That's what will happen
// if a caller sees that the proxy implements ICriticalNotifyCompletion but the proxy chooses to
// pass the call on to the underlying awaitable's OnCompleted method.
var underlyingMethodToUse = _unsafeOnCompletedMethod ?? _onCompletedMethod;
underlyingMethodToUse(_customAwaiter, continuation);
}
}
}
}
\ No newline at end of file
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.Linq;
using System.Linq.Expressions;
using System.Reflection;
using System.Threading;
using System.Threading.Tasks;
namespace Microsoft.Extensions.Internal
{
/// <summary>
/// Helper for detecting whether a given type is FSharpAsync`1, and if so, supplying
/// an <see cref="Expression"/> for mapping instances of that type to a C# awaitable.
/// </summary>
/// <remarks>
/// The main design goal here is to avoid taking a compile-time dependency on
/// FSharp.Core.dll, because non-F# applications wouldn't use it. So all the references
/// to FSharp types have to be constructed dynamically at runtime.
/// </remarks>
internal static class ObjectMethodExecutorFSharpSupport
{
private static object _fsharpValuesCacheLock = new object();
private static Assembly _fsharpCoreAssembly;
private static MethodInfo _fsharpAsyncStartAsTaskGenericMethod;
private static PropertyInfo _fsharpOptionOfTaskCreationOptionsNoneProperty;
private static PropertyInfo _fsharpOptionOfCancellationTokenNoneProperty;
public static bool TryBuildCoercerFromFSharpAsyncToAwaitable(
Type possibleFSharpAsyncType,
out Expression coerceToAwaitableExpression,
out Type awaitableType)
{
var methodReturnGenericType = possibleFSharpAsyncType.IsGenericType
? possibleFSharpAsyncType.GetGenericTypeDefinition()
: null;
if (!IsFSharpAsyncOpenGenericType(methodReturnGenericType))
{
coerceToAwaitableExpression = null;
awaitableType = null;
return false;
}
var awaiterResultType = possibleFSharpAsyncType.GetGenericArguments().Single();
awaitableType = typeof(Task<>).MakeGenericType(awaiterResultType);
// coerceToAwaitableExpression = (object fsharpAsync) =>
// {
// return (object)FSharpAsync.StartAsTask<TResult>(
// (Microsoft.FSharp.Control.FSharpAsync<TResult>)fsharpAsync,
// FSharpOption<TaskCreationOptions>.None,
// FSharpOption<CancellationToken>.None);
// };
var startAsTaskClosedMethod = _fsharpAsyncStartAsTaskGenericMethod
.MakeGenericMethod(awaiterResultType);
var coerceToAwaitableParam = Expression.Parameter(typeof(object));
coerceToAwaitableExpression = Expression.Lambda(
Expression.Convert(
Expression.Call(
startAsTaskClosedMethod,
Expression.Convert(coerceToAwaitableParam, possibleFSharpAsyncType),
Expression.MakeMemberAccess(null, _fsharpOptionOfTaskCreationOptionsNoneProperty),
Expression.MakeMemberAccess(null, _fsharpOptionOfCancellationTokenNoneProperty)),
typeof(object)),
coerceToAwaitableParam);
return true;
}
private static bool IsFSharpAsyncOpenGenericType(Type possibleFSharpAsyncGenericType)
{
var typeFullName = possibleFSharpAsyncGenericType?.FullName;
if (!string.Equals(typeFullName, "Microsoft.FSharp.Control.FSharpAsync`1", StringComparison.Ordinal))
{
return false;
}
lock (_fsharpValuesCacheLock)
{
if (_fsharpCoreAssembly != null)
{
// Since we've already found the real FSharpAsync.Core assembly, we just have
// to check that the supplied FSharpAsync`1 type is the one from that assembly.
return possibleFSharpAsyncGenericType.Assembly == _fsharpCoreAssembly;
}
else
{
// We'll keep trying to find the FSharp types/values each time any type called
// FSharpAsync`1 is supplied.
return TryPopulateFSharpValueCaches(possibleFSharpAsyncGenericType);
}
}
}
private static bool TryPopulateFSharpValueCaches(Type possibleFSharpAsyncGenericType)
{
var assembly = possibleFSharpAsyncGenericType.Assembly;
var fsharpOptionType = assembly.GetType("Microsoft.FSharp.Core.FSharpOption`1");
var fsharpAsyncType = assembly.GetType("Microsoft.FSharp.Control.FSharpAsync");
if (fsharpOptionType == null || fsharpAsyncType == null)
{
return false;
}
// Get a reference to FSharpOption<TaskCreationOptions>.None
var fsharpOptionOfTaskCreationOptionsType = fsharpOptionType
.MakeGenericType(typeof(TaskCreationOptions));
_fsharpOptionOfTaskCreationOptionsNoneProperty = fsharpOptionOfTaskCreationOptionsType
.GetTypeInfo()
.GetRuntimeProperty("None");
// Get a reference to FSharpOption<CancellationToken>.None
var fsharpOptionOfCancellationTokenType = fsharpOptionType
.MakeGenericType(typeof(CancellationToken));
_fsharpOptionOfCancellationTokenNoneProperty = fsharpOptionOfCancellationTokenType
.GetTypeInfo()
.GetRuntimeProperty("None");
// Get a reference to FSharpAsync.StartAsTask<>
var fsharpAsyncMethods = fsharpAsyncType
.GetRuntimeMethods()
.Where(m => m.Name.Equals("StartAsTask", StringComparison.Ordinal));
foreach (var candidateMethodInfo in fsharpAsyncMethods)
{
var parameters = candidateMethodInfo.GetParameters();
if (parameters.Length == 3
&& TypesHaveSameIdentity(parameters[0].ParameterType, possibleFSharpAsyncGenericType)
&& parameters[1].ParameterType == fsharpOptionOfTaskCreationOptionsType
&& parameters[2].ParameterType == fsharpOptionOfCancellationTokenType)
{
// This really does look like the correct method (and hence assembly).
_fsharpAsyncStartAsTaskGenericMethod = candidateMethodInfo;
_fsharpCoreAssembly = assembly;
break;
}
}
return _fsharpCoreAssembly != null;
}
private static bool TypesHaveSameIdentity(Type type1, Type type2)
{
return type1.Assembly == type2.Assembly
&& string.Equals(type1.Namespace, type2.Namespace, StringComparison.Ordinal)
&& string.Equals(type1.Name, type2.Name, StringComparison.Ordinal);
}
}
}
using System;
namespace DotNetCore.CAP.Models
{
public class Message
{
public string Id { get; set; }
public DateTime Timestamp { get; set; }
public object Content { get; set; }
public string CallbackName { get; set; }
public Message()
{
Id = ObjectId.GenerateNewStringId();
Timestamp = DateTime.Now;
}
public Message(object content) : this()
{
Content = content;
}
}
}
\ No newline at end of file
......@@ -7,10 +7,20 @@ namespace DotNetCore.CAP.Processor.States
{
public const string StateName = "Succeeded";
public TimeSpan? ExpiresAfter => TimeSpan.FromHours(1);
public TimeSpan? ExpiresAfter { get; private set; }
public string Name => StateName;
public SucceededState()
{
ExpiresAfter = TimeSpan.FromHours(1);
}
public SucceededState(int ExpireAfterSeconds)
{
ExpiresAfter = TimeSpan.FromSeconds(ExpireAfterSeconds);
}
public void Apply(CapPublishedMessage message, IStorageTransaction transaction)
{
}
......
using System.Data;
using System.Threading;
using Dapper;
using Microsoft.EntityFrameworkCore;
......
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFramework>netcoreapp1.1</TargetFramework>
<TargetFramework>netcoreapp2.0</TargetFramework>
<TreatWarningsAsErrors>true</TreatWarningsAsErrors>
<AssemblyName>DotNetCore.CAP.MySql.Test</AssemblyName>
<PackageId>DotNetCore.CAP.MySql.Test</PackageId>
<GenerateRuntimeConfigurationFiles>true</GenerateRuntimeConfigurationFiles>
<PackageTargetFallback>$(PackageTargetFallback);dnxcore50;portable-net451+win8</PackageTargetFallback>
<RuntimeFrameworkVersion>1.1.1</RuntimeFrameworkVersion>
<GenerateAssemblyConfigurationAttribute>false</GenerateAssemblyConfigurationAttribute>
<GenerateAssemblyCompanyAttribute>false</GenerateAssemblyCompanyAttribute>
<GenerateAssemblyProductAttribute>false</GenerateAssemblyProductAttribute>
</PropertyGroup>
<ItemGroup>
<Compile Include="..\Shared\*.cs" Exclude="bin\**;obj\**;**\*.xproj;packages\**" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\..\src\DotNetCore.CAP.MySql\DotNetCore.CAP.MySql.csproj" />
<ProjectReference Include="..\..\src\DotNetCore.CAP\DotNetCore.CAP.csproj" />
......@@ -24,23 +15,19 @@
<ItemGroup>
<PackageReference Include="Dapper" Version="1.50.2" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="15.0.0" />
<PackageReference Include="MySqlConnector" Version="0.24.0" />
<PackageReference Include="xunit.runner.visualstudio" Version="2.2.0" />
<PackageReference Include="xunit" Version="2.2.0" />
<PackageReference Include="Microsoft.AspNetCore.Http" Version="1.1.2" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="1.1.1" />
<PackageReference Include="Moq" Version="4.7.63" />
<PackageReference Include="Microsoft.Extensions.Logging" Version="1.1.2" />
<PackageReference Include="Microsoft.Extensions.Configuration.Abstractions" Version="1.1.2" />
<PackageReference Include="Microsoft.Extensions.Configuration" Version="1.1.2" />
<PackageReference Include="Microsoft.Extensions.Configuration.FileExtensions" Version="1.1.2" />
<PackageReference Include="Microsoft.Extensions.Configuration.Json" Version="1.1.2" />
<PackageReference Include="Microsoft.Extensions.Configuration.EnvironmentVariables" Version="1.1.2" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="15.5.0-preview-20170810-02" />
<PackageReference Include="MySqlConnector" Version="0.25.1" />
<PackageReference Include="xunit.runner.visualstudio" Version="2.3.0-beta3-build3705" />
<PackageReference Include="xunit" Version="2.3.0-beta3-build3705" />
<PackageReference Include="Microsoft.AspNetCore.Http" Version="2.0.0" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="2.0.0" />
<PackageReference Include="Moq" Version="4.7.99" />
<PackageReference Include="Microsoft.Extensions.Logging" Version="2.0.0" />
<PackageReference Include="Microsoft.Extensions.Configuration.Abstractions" Version="2.0.0" />
<PackageReference Include="Microsoft.Extensions.Configuration" Version="2.0.0" />
<PackageReference Include="Microsoft.Extensions.Configuration.FileExtensions" Version="2.0.0" />
<PackageReference Include="Microsoft.Extensions.Configuration.Json" Version="2.0.0" />
<PackageReference Include="Microsoft.Extensions.Configuration.EnvironmentVariables" Version="2.0.0" />
</ItemGroup>
<ItemGroup>
<Service Include="{82a7f48d-3b50-4b1e-b82e-3ada8210c358}" />
</ItemGroup>
</Project>
\ No newline at end of file
......@@ -85,7 +85,6 @@ namespace DotNetCore.CAP.MySql.Test
[Fact]
public async Task GetReceivedMessageAsync_Test()
{
var sql = $@"
INSERT INTO `cap.received`(`Name`,`Group`,`Content`,`Retries`,`Added`,`ExpiresAt`,`StatusName`)
VALUES(@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName);SELECT @@IDENTITY;";
......@@ -129,6 +128,5 @@ namespace DotNetCore.CAP.MySql.Test
Assert.Equal("MySqlStorageConnectionTest", message.Name);
Assert.Equal("mygroup", message.Group);
}
}
}
}
\ No newline at end of file
using Xunit;
using Dapper;
using Dapper;
using Xunit;
namespace DotNetCore.CAP.MySql.Test
{
......@@ -9,7 +9,6 @@ namespace DotNetCore.CAP.MySql.Test
private readonly string _dbName;
private readonly string _masterDbConnectionString;
public MySqlStorageTest()
{
_dbName = ConnectionUtil.GetDatabaseName();
......@@ -29,36 +28,12 @@ namespace DotNetCore.CAP.MySql.Test
}
}
[Fact]
public void DatabaseTable_Published_IsExists()
{
var tableName = "cap.published";
using (var connection = ConnectionUtil.CreateConnection(_masterDbConnectionString))
{
var sql = $"SELECT TABLE_NAME FROM `TABLES` WHERE TABLE_SCHEMA='{_dbName}' AND TABLE_NAME = '{tableName}'";
var result = connection.QueryFirstOrDefault<string>(sql);
Assert.NotNull(result);
Assert.Equal(tableName, result);
}
}
[Fact]
public void DatabaseTable_Queue_IsExists()
{
var tableName = "cap.queue";
using (var connection = ConnectionUtil.CreateConnection(_masterDbConnectionString))
{
var sql = $"SELECT TABLE_NAME FROM `TABLES` WHERE TABLE_SCHEMA='{_dbName}' AND TABLE_NAME = '{tableName}'";
var result = connection.QueryFirstOrDefault<string>(sql);
Assert.NotNull(result);
Assert.Equal(tableName, result);
}
}
[Fact]
public void DatabaseTable_Received_IsExists()
[Theory]
[InlineData("cap.published")]
[InlineData("cap.queue")]
[InlineData("cap.received")]
public void DatabaseTable_IsExists(string tableName)
{
var tableName = "cap.received";
using (var connection = ConnectionUtil.CreateConnection(_masterDbConnectionString))
{
var sql = $"SELECT TABLE_NAME FROM `TABLES` WHERE TABLE_SCHEMA='{_dbName}' AND TABLE_NAME = '{tableName}'";
......@@ -68,4 +43,4 @@ namespace DotNetCore.CAP.MySql.Test
}
}
}
}
}
\ No newline at end of file
using System;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.DependencyInjection;
namespace DotNetCore.CAP.MySql.Test
......
using System;
using Npgsql;
namespace DotNetCore.CAP.PostgreSql.Test
{
public static class ConnectionUtil
{
private const string DatabaseVariable = "Cap_PostgreSql_DatabaseName";
private const string ConnectionStringTemplateVariable = "Cap_PostgreSql_ConnectionStringTemplate";
private const string MasterDatabaseName = "postgres";
private const string DefaultDatabaseName = @"DotNetCore.CAP.PostgreSql.Test";
private const string DefaultConnectionStringTemplate =
@"Server=localhost;Database={0};UserId=postgres;Password=123123;";
public static string GetDatabaseName()
{
return Environment.GetEnvironmentVariable(DatabaseVariable) ?? DefaultDatabaseName;
}
public static string GetMasterConnectionString()
{
return string.Format(GetConnectionStringTemplate(), MasterDatabaseName);
}
public static string GetConnectionString()
{
return string.Format(GetConnectionStringTemplate(), GetDatabaseName());
}
private static string GetConnectionStringTemplate()
{
return
Environment.GetEnvironmentVariable(ConnectionStringTemplateVariable) ??
DefaultConnectionStringTemplate;
}
public static NpgsqlConnection CreateConnection(string connectionString = null)
{
connectionString = connectionString ?? GetConnectionString();
var connection = new NpgsqlConnection(connectionString);
connection.Open();
return connection;
}
}
}
\ No newline at end of file
using System.Threading;
using Dapper;
using Microsoft.EntityFrameworkCore;
namespace DotNetCore.CAP.PostgreSql.Test
{
public abstract class DatabaseTestHost : TestHost
{
private static bool _sqlObjectInstalled;
public static object _lock = new object();
protected override void PostBuildServices()
{
base.PostBuildServices();
lock (_lock)
{
if (!_sqlObjectInstalled)
{
InitializeDatabase();
}
}
}
public override void Dispose()
{
DeleteAllData();
base.Dispose();
}
private void InitializeDatabase()
{
using (CreateScope())
{
var storage = GetService<PostgreSqlStorage>();
var token = new CancellationTokenSource().Token;
CreateDatabase();
storage.InitializeAsync(token).GetAwaiter().GetResult();
_sqlObjectInstalled = true;
}
}
private void CreateDatabase()
{
var masterConn = ConnectionUtil.GetMasterConnectionString();
var databaseName = ConnectionUtil.GetDatabaseName();
using (var connection = ConnectionUtil.CreateConnection(masterConn))
{
connection.Execute($@"
DROP DATABASE IF EXISTS ""{databaseName}"";
CREATE DATABASE ""{databaseName}"";");
}
}
private void DeleteAllData()
{
var conn = ConnectionUtil.GetConnectionString();
using (var connection = ConnectionUtil.CreateConnection(conn))
{
connection.Execute($@"
TRUNCATE TABLE ""cap"".""published"";
TRUNCATE TABLE ""cap"".""received"";
TRUNCATE TABLE ""cap"".""queue"";");
}
}
}
}
\ No newline at end of file
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFramework>netcoreapp2.0</TargetFramework>
<IsPackable>false</IsPackable>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Dapper" Version="1.50.2" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="15.3.0" />
<PackageReference Include="Npgsql" Version="3.2.5" />
<PackageReference Include="xunit" Version="2.2.0" />
<PackageReference Include="xunit.runner.visualstudio" Version="2.2.0" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\..\src\DotNetCore.CAP.PostgreSql\DotNetCore.CAP.PostgreSql.csproj" />
<ProjectReference Include="..\..\src\DotNetCore.CAP\DotNetCore.CAP.csproj" />
</ItemGroup>
</Project>
using System;
using System.Threading.Tasks;
using Dapper;
using DotNetCore.CAP.Infrastructure;
using DotNetCore.CAP.Models;
using Xunit;
namespace DotNetCore.CAP.PostgreSql.Test
{
[Collection("postgresql")]
public class PostgreSqlStorageConnectionTest : DatabaseTestHost
{
private PostgreSqlStorageConnection _storage;
public PostgreSqlStorageConnectionTest()
{
var options = GetService<PostgreSqlOptions>();
_storage = new PostgreSqlStorageConnection(options);
}
[Fact]
public async Task GetPublishedMessageAsync_Test()
{
var sql = @"INSERT INTO ""cap"".""published""(""Name"",""Content"",""Retries"",""Added"",""ExpiresAt"",""StatusName"") VALUES(@Name,@Content,@Retries,@Added,@ExpiresAt,@StatusName) RETURNING ""Id"";";
var publishMessage = new CapPublishedMessage
{
Name = "PostgreSqlStorageConnectionTest",
Content = "",
StatusName = StatusName.Scheduled
};
var insertedId = default(int);
using (var connection = ConnectionUtil.CreateConnection())
{
insertedId = connection.QueryFirst<int>(sql, publishMessage);
}
var message = await _storage.GetPublishedMessageAsync(insertedId);
Assert.NotNull(message);
Assert.Equal("PostgreSqlStorageConnectionTest", message.Name);
Assert.Equal(StatusName.Scheduled, message.StatusName);
}
[Fact]
public async Task FetchNextMessageAsync_Test()
{
var sql = @"INSERT INTO ""cap"".""queue""(""MessageId"",""MessageType"") VALUES(@MessageId,@MessageType);";
var queue = new CapQueue
{
MessageId = 3333,
MessageType = MessageType.Publish
};
using (var connection = ConnectionUtil.CreateConnection())
{
connection.Execute(sql, queue);
}
var fetchedMessage = await _storage.FetchNextMessageAsync();
fetchedMessage.Dispose();
Assert.NotNull(fetchedMessage);
Assert.Equal(MessageType.Publish, fetchedMessage.MessageType);
Assert.Equal(3333, fetchedMessage.MessageId);
}
[Fact]
public async Task StoreReceivedMessageAsync_Test()
{
var receivedMessage = new CapReceivedMessage
{
Name = "PostgreSqlStorageConnectionTest",
Content = "",
Group = "mygroup",
StatusName = StatusName.Scheduled
};
Exception exception = null;
try
{
await _storage.StoreReceivedMessageAsync(receivedMessage);
}
catch (Exception ex)
{
exception = ex;
}
Assert.Null(exception);
}
[Fact]
public async Task GetReceivedMessageAsync_Test()
{
var sql = $@"
INSERT INTO ""cap"".""received""(""Name"",""Group"",""Content"",""Retries"",""Added"",""ExpiresAt"",""StatusName"")
VALUES(@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName) RETURNING ""Id"";";
var receivedMessage = new CapReceivedMessage
{
Name = "PostgreSqlStorageConnectionTest",
Content = "",
Group = "mygroup",
StatusName = StatusName.Scheduled
};
var insertedId = default(int);
using (var connection = ConnectionUtil.CreateConnection())
{
insertedId = connection.QueryFirst<int>(sql, receivedMessage);
}
var message = await _storage.GetReceivedMessageAsync(insertedId);
Assert.NotNull(message);
Assert.Equal(StatusName.Scheduled, message.StatusName);
Assert.Equal("PostgreSqlStorageConnectionTest", message.Name);
Assert.Equal("mygroup", message.Group);
}
[Fact]
public async Task GetNextReceviedMessageToBeEnqueuedAsync_Test()
{
var receivedMessage = new CapReceivedMessage
{
Name = "PostgreSqlStorageConnectionTest",
Content = "",
Group = "mygroup",
StatusName = StatusName.Scheduled
};
await _storage.StoreReceivedMessageAsync(receivedMessage);
var message = await _storage.GetNextReceviedMessageToBeEnqueuedAsync();
Assert.NotNull(message);
Assert.Equal(StatusName.Scheduled, message.StatusName);
Assert.Equal("PostgreSqlStorageConnectionTest", message.Name);
Assert.Equal("mygroup", message.Group);
}
}
}
\ No newline at end of file
using Dapper;
using Xunit;
namespace DotNetCore.CAP.PostgreSql.Test
{
[Collection("postgresql")]
public class SqlServerStorageTest : DatabaseTestHost
{
private readonly string _dbName;
private readonly string _masterDbConnectionString;
private readonly string _dbConnectionString;
public SqlServerStorageTest()
{
_dbName = ConnectionUtil.GetDatabaseName();
_masterDbConnectionString = ConnectionUtil.GetMasterConnectionString();
_dbConnectionString = ConnectionUtil.GetConnectionString();
}
[Fact]
public void Database_IsExists()
{
using (var connection = ConnectionUtil.CreateConnection(_masterDbConnectionString))
{
var databaseName = ConnectionUtil.GetDatabaseName();
var sql = $@"select * from pg_database where datname = '{databaseName}'";
var result = connection.QueryFirstOrDefault<string>(sql);
Assert.NotNull(result);
Assert.True(databaseName.Equals(result, System.StringComparison.CurrentCultureIgnoreCase));
}
}
[Theory]
[InlineData("cap.published")]
[InlineData("cap.queue")]
[InlineData("cap.received")]
public void DatabaseTable_IsExists(string tableName)
{
using (var connection = ConnectionUtil.CreateConnection(_dbConnectionString))
{
var sql = $"SELECT to_regclass('{tableName}') is not null;";
var result = connection.QueryFirstOrDefault<bool>(sql);
Assert.True(result);
}
}
}
}
\ No newline at end of file
using System;
using Microsoft.Extensions.DependencyInjection;
namespace DotNetCore.CAP.PostgreSql.Test
{
public abstract class TestHost : IDisposable
{
protected IServiceCollection _services;
protected string _connectionString;
private IServiceProvider _provider;
private IServiceProvider _scopedProvider;
public TestHost()
{
CreateServiceCollection();
PreBuildServices();
BuildServices();
PostBuildServices();
}
protected IServiceProvider Provider => _scopedProvider ?? _provider;
private void CreateServiceCollection()
{
var services = new ServiceCollection();
services.AddOptions();
services.AddLogging();
_connectionString = ConnectionUtil.GetConnectionString();
services.AddSingleton(new PostgreSqlOptions { ConnectionString = _connectionString });
services.AddSingleton<PostgreSqlStorage>();
_services = services;
}
protected virtual void PreBuildServices()
{
}
private void BuildServices()
{
_provider = _services.BuildServiceProvider();
}
protected virtual void PostBuildServices()
{
}
public IDisposable CreateScope()
{
var scope = CreateScope(_provider);
var loc = scope.ServiceProvider;
_scopedProvider = loc;
return new DelegateDisposable(() =>
{
if (_scopedProvider == loc)
{
_scopedProvider = null;
}
scope.Dispose();
});
}
public IServiceScope CreateScope(IServiceProvider provider)
{
var scope = provider.GetService<IServiceScopeFactory>().CreateScope();
return scope;
}
public T GetService<T>() => Provider.GetService<T>();
public T Ensure<T>(ref T service)
where T : class
=> service ?? (service = GetService<T>());
public virtual void Dispose()
{
(_provider as IDisposable)?.Dispose();
}
private class DelegateDisposable : IDisposable
{
private Action _dispose;
public DelegateDisposable(Action dispose)
{
_dispose = dispose;
}
public void Dispose()
{
_dispose();
}
}
}
}
\ No newline at end of file
using System.Data;
using System.Data.SqlClient;
using System.Threading;
using Dapper;
using Microsoft.EntityFrameworkCore;
......@@ -54,21 +55,20 @@ CREATE DATABASE [{databaseName}];");
private void DeleteAllData()
{
using (CreateScope())
var conn = ConnectionUtil.GetConnectionString();
using (var connection = new SqlConnection(conn))
{
var context = GetService<TestDbContext>();
var commands = new[]
{
var commands = new[] {
"DISABLE TRIGGER ALL ON ?",
"ALTER TABLE ? NOCHECK CONSTRAINT ALL",
"DELETE FROM ?",
"ALTER TABLE ? CHECK CONSTRAINT ALL",
"ENABLE TRIGGER ALL ON ?"
};
foreach (var command in commands)
{
context.Database.GetDbConnection().Execute(
connection.Execute(
"sp_MSforeachtable",
new { command1 = command },
commandType: CommandType.StoredProcedure);
......
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFramework>netcoreapp1.1</TargetFramework>
<TreatWarningsAsErrors>true</TreatWarningsAsErrors>
<AssemblyName>DotNetCore.CAP.SqlServer.Test</AssemblyName>
<PackageId>DotNetCore.CAP.SqlServer.Test</PackageId>
<GenerateRuntimeConfigurationFiles>true</GenerateRuntimeConfigurationFiles>
<PackageTargetFallback>$(PackageTargetFallback);dnxcore50;portable-net451+win8</PackageTargetFallback>
<RuntimeFrameworkVersion>1.1.1</RuntimeFrameworkVersion>
<GenerateAssemblyConfigurationAttribute>false</GenerateAssemblyConfigurationAttribute>
<GenerateAssemblyCompanyAttribute>false</GenerateAssemblyCompanyAttribute>
<GenerateAssemblyProductAttribute>false</GenerateAssemblyProductAttribute>
<TargetFramework>netcoreapp2.0</TargetFramework>
<IsPackable>false</IsPackable>
</PropertyGroup>
<ItemGroup>
<Compile Include="..\Shared\*.cs" Exclude="bin\**;obj\**;**\*.xproj;packages\**" />
</ItemGroup>
<ItemGroup>
<ItemGroup>
<ProjectReference Include="..\..\src\DotNetCore.CAP.SqlServer\DotNetCore.CAP.SqlServer.csproj" />
<ProjectReference Include="..\..\src\DotNetCore.CAP\DotNetCore.CAP.csproj" />
</ItemGroup>
<ItemGroup>
<PackageReference Include="Dapper" Version="1.50.2" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="15.0.0" />
<PackageReference Include="xunit.runner.visualstudio" Version="2.2.0" />
<PackageReference Include="xunit" Version="2.2.0" />
<PackageReference Include="Microsoft.AspNetCore.Http" Version="1.1.2" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="1.1.1" />
<PackageReference Include="Moq" Version="4.7.63" />
<PackageReference Include="Microsoft.Extensions.Logging" Version="1.1.2" />
<PackageReference Include="Microsoft.Extensions.Configuration.Abstractions" Version="1.1.2" />
<PackageReference Include="Microsoft.Extensions.Configuration" Version="1.1.2" />
<PackageReference Include="Microsoft.Extensions.Configuration.FileExtensions" Version="1.1.2" />
<PackageReference Include="Microsoft.Extensions.Configuration.Json" Version="1.1.2" />
<PackageReference Include="Microsoft.Extensions.Configuration.EnvironmentVariables" Version="1.1.2" />
<PackageReference Include="System.Data.SqlClient" Version="4.3.1" />
<PackageReference Include="Microsoft.EntityFrameworkCore.Design" Version="1.1.2" />
<PackageReference Include="Microsoft.EntityFrameworkCore.SqlServer" Version="1.1.2" />
</ItemGroup>
<ItemGroup>
<Service Include="{82a7f48d-3b50-4b1e-b82e-3ada8210c358}" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="15.5.0-preview-20170810-02" />
<PackageReference Include="System.Data.SqlClient" Version="4.4.0" />
<PackageReference Include="xunit.runner.visualstudio" Version="2.3.0-beta3-build3705" />
<PackageReference Include="xunit" Version="2.3.0-beta3-build3705" />
<PackageReference Include="Microsoft.AspNetCore.Http" Version="2.0.0" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="2.0.0" />
<PackageReference Include="Moq" Version="4.7.99" />
<PackageReference Include="Microsoft.Extensions.Logging" Version="2.0.0" />
<PackageReference Include="Microsoft.Extensions.Configuration.Abstractions" Version="2.0.0" />
<PackageReference Include="Microsoft.Extensions.Configuration" Version="2.0.0" />
<PackageReference Include="Microsoft.Extensions.Configuration.FileExtensions" Version="2.0.0" />
<PackageReference Include="Microsoft.Extensions.Configuration.Json" Version="2.0.0" />
<PackageReference Include="Microsoft.Extensions.Configuration.EnvironmentVariables" Version="2.0.0" />
</ItemGroup>
</Project>
using System.Reflection;
using System.Runtime.InteropServices;
// General Information about an assembly is controlled through the following
// set of attributes. Change these attribute values to modify the information
// associated with an assembly.
[assembly: AssemblyConfiguration("")]
[assembly: AssemblyCompany("")]
[assembly: AssemblyProduct("DotNetCore.CAP.EntityFrameworkCore.Test")]
[assembly: AssemblyTrademark("")]
// Setting ComVisible to false makes the types in this assembly not visible
// to COM components. If you need to access a type in this assembly from
// COM, set the ComVisible attribute to true on that type.
[assembly: ComVisible(false)]
// The following GUID is for the ID of the typelib if this project is exposed to COM
[assembly: Guid("7442c942-1ddc-40e4-8f1b-654e721eaa45")]
\ No newline at end of file
......@@ -85,7 +85,6 @@ namespace DotNetCore.CAP.SqlServer.Test
[Fact]
public async Task GetReceivedMessageAsync_Test()
{
var sql = $@"
INSERT INTO [Cap].[Received]([Name],[Group],[Content],[Retries],[Added],[ExpiresAt],[StatusName]) OUTPUT INSERTED.Id
VALUES(@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName);";
......@@ -129,6 +128,5 @@ namespace DotNetCore.CAP.SqlServer.Test
Assert.Equal("SqlServerStorageConnectionTest", message.Name);
Assert.Equal("mygroup", message.Group);
}
}
}
}
\ No newline at end of file
using Xunit;
using Dapper;
using Dapper;
using Xunit;
namespace DotNetCore.CAP.SqlServer.Test
{
......@@ -14,58 +14,31 @@ namespace DotNetCore.CAP.SqlServer.Test
{
var databaseName = ConnectionUtil.GetDatabaseName();
var sql = $@"
IF EXISTS (SELECT * FROM sysdatabases WHERE name = N'{databaseName}')
IF EXISTS (SELECT * FROM sysdatabases WHERE name = N'{databaseName}')
SELECT 'True'
ELSE
SELECT 'False'";
var result = connection.QueryFirst<bool>(sql);
Assert.Equal(true, result);
Assert.True(result);
}
}
[Fact]
public void DatabaseTable_Published_IsExists()
{
using (var connection = ConnectionUtil.CreateConnection())
{
var sql = @"
IF OBJECT_ID(N'[Cap].[Published]',N'U') IS NOT NULL
SELECT 'True'
ELSE
SELECT 'False'";
var result = connection.QueryFirst<bool>(sql);
Assert.Equal(true, result);
}
}
[Fact]
public void DatabaseTable_Queue_IsExists()
[Theory]
[InlineData("[Cap].[Published]")]
[InlineData("[Cap].[Queue]")]
[InlineData("[Cap].[Received]")]
public void DatabaseTable_IsExists(string tableName)
{
using (var connection = ConnectionUtil.CreateConnection())
{
var sql = @"
IF OBJECT_ID(N'[Cap].[Queue]',N'U') IS NOT NULL
SELECT 'True'
ELSE
SELECT 'False'";
var result = connection.QueryFirst<bool>(sql);
Assert.Equal(true, result);
}
}
[Fact]
public void DatabaseTable_Received_IsExists()
{
using (var connection = ConnectionUtil.CreateConnection())
{
var sql = @"
IF OBJECT_ID(N'[Cap].[Received]',N'U') IS NOT NULL
var sql = $@"
IF OBJECT_ID(N'{tableName}',N'U') IS NOT NULL
SELECT 'True'
ELSE
SELECT 'False'";
var result = connection.QueryFirst<bool>(sql);
Assert.Equal(true, result);
Assert.True(result);
}
}
}
}
}
\ No newline at end of file
using Microsoft.EntityFrameworkCore;
namespace DotNetCore.CAP.SqlServer.Test
{
public class TestDbContext : DbContext
{
protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder)
{
var connectionString = ConnectionUtil.GetConnectionString();
optionsBuilder.UseSqlServer(connectionString);
}
}
}
\ No newline at end of file
using System;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.DependencyInjection;
namespace DotNetCore.CAP.SqlServer.Test
......@@ -31,7 +30,6 @@ namespace DotNetCore.CAP.SqlServer.Test
_connectionString = ConnectionUtil.GetConnectionString();
services.AddSingleton(new SqlServerOptions { ConnectionString = _connectionString });
services.AddSingleton<SqlServerStorage>();
services.AddDbContext<TestDbContext>(options => options.UseSqlServer(_connectionString));
_services = services;
}
......
using System;
using System.Data;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;
using Xunit;
using System.Data;
namespace DotNetCore.CAP.Test
{
......@@ -36,7 +36,6 @@ namespace DotNetCore.CAP.Test
Assert.NotNull(markService);
}
[Fact]
public void CanOverridePublishService()
{
......@@ -61,47 +60,47 @@ namespace DotNetCore.CAP.Test
private class MyProducerService : ICapPublisher
{
public void Publish(string name, string content)
public void Publish<T>(string name, T contentObj, string callbackName = null)
{
throw new NotImplementedException();
}
public void Publish<T>(string name, T contentObj)
public void Publish<T>(string name, T contentObj, IDbConnection dbConnection, string callbackName = null, IDbTransaction dbTransaction = null)
{
throw new NotImplementedException();
}
public void Publish(string name, string content, IDbConnection dbConnection, IDbTransaction dbTransaction = null)
public Task PublishAsync(string topic, string content)
{
throw new NotImplementedException();
}
public void Publish<T>(string name, T contentObj, IDbConnection dbConnection, IDbTransaction dbTransaction = null)
public Task PublishAsync<T>(string topic, T contentObj)
{
throw new NotImplementedException();
}
public Task PublishAsync(string topic, string content)
public Task PublishAsync(string topic, string content, IDbConnection dbConnection)
{
throw new NotImplementedException();
}
public Task PublishAsync<T>(string topic, T contentObj)
public Task PublishAsync(string topic, string content, IDbConnection dbConnection, IDbTransaction dbTransaction)
{
throw new NotImplementedException();
}
public Task PublishAsync(string topic, string content, IDbConnection dbConnection)
public Task PublishAsync<T>(string name, T contentObj, IDbConnection dbConnection, IDbTransaction dbTransaction = null)
{
throw new NotImplementedException();
}
public Task PublishAsync(string topic, string content, IDbConnection dbConnection, IDbTransaction dbTransaction)
public Task PublishAsync<T>(string name, T contentObj, string callbackName = null)
{
throw new NotImplementedException();
}
public Task PublishAsync<T>(string name, T contentObj, IDbConnection dbConnection, IDbTransaction dbTransaction = null)
public Task PublishAsync<T>(string name, T contentObj, IDbConnection dbConnection, string callbackName = null, IDbTransaction dbTransaction = null)
{
throw new NotImplementedException();
}
......
using System;
using System.Linq;
using System.Reflection;
using DotNetCore.CAP.Abstractions;
using DotNetCore.CAP.Internal;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Xunit;
namespace DotNetCore.CAP.Test
{
public class ConsumerInvokerFactoryTest
{
private IConsumerInvokerFactory consumerInvokerFactory;
public ConsumerInvokerFactoryTest()
{
var services = new ServiceCollection();
services.AddLogging();
var provider = services.BuildServiceProvider();
var logFactory = provider.GetRequiredService<ILoggerFactory>();
var binder = new ModelBinderFactory();
consumerInvokerFactory = new ConsumerInvokerFactory(logFactory, binder, provider);
}
[Fact]
public void CreateInvokerTest()
{
var methodInfo = typeof(Sample).GetRuntimeMethods()
.Single(x => x.Name == nameof(Sample.ThrowException));
var description = new ConsumerExecutorDescriptor
{
MethodInfo = methodInfo,
ImplTypeInfo = typeof(Sample).GetTypeInfo()
};
var messageContext = new MessageContext();
var context = new ConsumerContext(description, messageContext);
var invoker = consumerInvokerFactory.CreateInvoker(context);
Assert.NotNull(invoker);
}
[Theory]
[InlineData(nameof(Sample.ThrowException))]
[InlineData(nameof(Sample.AsyncMethod))]
public async void InvokeMethodTest(string methodName)
{
var methodInfo = typeof(Sample).GetRuntimeMethods()
.Single(x => x.Name == methodName);
var description = new ConsumerExecutorDescriptor
{
MethodInfo = methodInfo,
ImplTypeInfo = typeof(Sample).GetTypeInfo()
};
var messageContext = new MessageContext();
var context = new ConsumerContext(description, messageContext);
var invoker = consumerInvokerFactory.CreateInvoker(context);
await Assert.ThrowsAsync(typeof(Exception), async () =>
{
await invoker.InvokeAsync();
});
}
}
}
\ No newline at end of file
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
using DotNetCore.CAP.Abstractions;
using DotNetCore.CAP.Internal;
using Microsoft.Extensions.DependencyInjection;
using Xunit;
......@@ -20,7 +17,7 @@ namespace DotNetCore.CAP.Test
services.AddScoped<IFooTest, CandidatesFooTest>();
services.AddScoped<IBarTest, CandidatesBarTest>();
services.AddLogging();
services.AddCap(x=> { });
services.AddCap(x => { });
_provider = services.BuildServiceProvider();
}
......@@ -28,7 +25,7 @@ namespace DotNetCore.CAP.Test
public void CanFindAllConsumerService()
{
var selector = _provider.GetRequiredService<IConsumerServiceSelector>();
var candidates = selector.SelectCandidates(_provider);
var candidates = selector.SelectCandidates();
Assert.Equal(2, candidates.Count);
}
......@@ -37,12 +34,12 @@ namespace DotNetCore.CAP.Test
public void CanFindSpecifiedTopic()
{
var selector = _provider.GetRequiredService<IConsumerServiceSelector>();
var candidates = selector.SelectCandidates(_provider);
var candidates = selector.SelectCandidates();
var bestCandidates = selector.SelectBestCandidate("Candidates.Foo", candidates);
Assert.NotNull(bestCandidates);
Assert.NotNull(bestCandidates.MethodInfo);
Assert.Equal(bestCandidates.MethodInfo.ReturnType, typeof(Task));
Assert.Equal(typeof(Task), bestCandidates.MethodInfo.ReturnType);
}
}
......
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment