Commit bb687561 authored by Savorboard's avatar Savorboard

Merge branch 'develop' of https://github.com/dotnetcore/CAP

# Conflicts:
#	build/version.props
#	samples/Sample.Kafka.MySql/Sample.Kafka.MySql.csproj
#	samples/Sample.RabbitMQ.MySql/AppDbContext.cs
#	samples/Sample.RabbitMQ.MySql/Program.cs
#	samples/Sample.RabbitMQ.MySql/Sample.RabbitMQ.MySql.csproj
#	src/DotNetCore.CAP/LoggerExtensions.cs
parents c0538601 5457b41a
<!--
Thank you for reporting an issue.
1. It's RECOMMENDED to submit PR for typo or tiny bug fix.
2. If this's a FEATURE request, please provide: details, pseudo codes if necessary.
3. If this's a BUG, please provide: course repetition, error log and configuration. Fill in as much of the template below as you're able.
感谢您向我们反馈问题。
1. 提交问题前,请先阅读 https://github.com/dotnetcore/CAP/wiki 上的文档。
2. 我们推荐如果是小问题(错别字修改,小的 bug fix)直接提交 PR。
3. 如果是一个新需求,请提供:详细需求描述,最好是有伪代码实现。
4. 如果是一个 BUG,请提供:复现步骤,错误日志以及相关配置,并尽量填写下面的模板中的条目。
6. 扩展阅读:[如何向开源项目提交无法解答的问题](https://zhuanlan.zhihu.com/p/25795393)
-->
Please answer these questions before submitting your issue.
- Why do you submit this issue?
- [ ] Question or discussion
- [ ] Bug
- [ ] Requirement
- [ ] Feature or performance improvement
___
### Question
- What do you want to know?
___
### Bug
- Which version of CAP, OS and .NET Core?
- Which company or project?
- What happen?
If possible, provide a way for reproducing the error. e.g. demo application, component version.
___
### Requirement or improvement
- Please describe about your requirements or improvement suggestions.
\ No newline at end of file
language: cpp language: csharp
sudo: required sudo: required
dist: trusty dist: trusty
solution: CAP.sln
dotnet: 2.1.300
mono: none
matrix: matrix:
include: include:
...@@ -10,33 +13,9 @@ matrix: ...@@ -10,33 +13,9 @@ matrix:
- os: osx - os: osx
osx_image: xcode8.3 # macOS 10.12 osx_image: xcode8.3 # macOS 10.12
env:
global:
- DOTNET_SKIP_FIRST_TIME_EXPERIENCE: true
- DOTNET_CLI_TELEMETRY_OPTOUT: 1
- CLI_VERSION=2.0.0
addons:
apt:
packages:
- gettext
- libcurl4-openssl-dev
- libicu-dev
- libssl-dev
- libunwind8
- zlib1g
# Make sure build dependencies are installed.
before_install:
- if test "$TRAVIS_OS_NAME" == "osx"; then ln -s /usr/local/opt/openssl/lib/libcrypto.1.0.0.dylib /usr/local/lib/; ln -s /usr/local/opt/openssl/lib/libssl.1.0.0.dylib /usr/local/lib/; fi
- export DOTNET_INSTALL_DIR="$PWD/.dotnetcli"
- export PATH="$DOTNET_INSTALL_DIR:$PATH"
install:
- travis_retry curl -sSL https://dot.net/v1/dotnet-install.sh | bash /dev/stdin --channel 2.0 --version "$CLI_VERSION" --install-dir "$DOTNET_INSTALL_DIR"
# Run the build script # Run the build script
script: script:
- dotnet --info - dotnet --info
- dotnet restore - dotnet restore CAP.sln
- dotnet test test/DotNetCore.CAP.Test/DotNetCore.CAP.Test.csproj -f netcoreapp2.0 - dotnet build CAP.sln
- dotnet test test/DotNetCore.CAP.Test/DotNetCore.CAP.Test.csproj
...@@ -17,6 +17,7 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution ...@@ -17,6 +17,7 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution
CHANGELOG.md = CHANGELOG.md CHANGELOG.md = CHANGELOG.md
CODE_OF_CONDUCT.md = CODE_OF_CONDUCT.md CODE_OF_CONDUCT.md = CODE_OF_CONDUCT.md
ConfigureMSDTC.ps1 = ConfigureMSDTC.ps1 ConfigureMSDTC.ps1 = ConfigureMSDTC.ps1
.github\ISSUE_TEMPLATE = .github\ISSUE_TEMPLATE
LICENSE.txt = LICENSE.txt LICENSE.txt = LICENSE.txt
README.md = README.md README.md = README.md
README.zh-cn.md = README.zh-cn.md README.zh-cn.md = README.zh-cn.md
...@@ -35,7 +36,6 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "build", "build", "{10C0818D ...@@ -35,7 +36,6 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "build", "build", "{10C0818D
build.cake = build.cake build.cake = build.cake
build.ps1 = build.ps1 build.ps1 = build.ps1
build.sh = build.sh build.sh = build.sh
build\common.props = build\common.props
build\index.cake = build\index.cake build\index.cake = build\index.cake
build\util.cake = build\util.cake build\util.cake = build\util.cake
build\version.cake = build\version.cake build\version.cake = build\version.cake
......
<Project>
<Import Project="build\version.props" />
<PropertyGroup Label="Package">
<Product>CAP</Product>
<Authors>.NET Core Community;Savorboard</Authors>
<RepositoryUrl>https://github.com/dotnetcore/CAP</RepositoryUrl>
<RepositoryType>git</RepositoryType>
<RepositoryRoot>$(MSBuildThisFileDirectory)</RepositoryRoot>
<PackageIconUrl>https://avatars2.githubusercontent.com/u/19404084</PackageIconUrl>
<PackageProjectUrl>https://github.com/dotnetcore/CAP</PackageProjectUrl>
<PackageLicenseUrl>https://github.com/dotnetcore/CAP/blob/master/LICENSE.txt</PackageLicenseUrl>
<PackageTags>CAP;EventBus;Distributed Transaction</PackageTags>
<Description>EventBus and eventually consistency in distributed architectures.</Description>
</PropertyGroup>
</Project>
\ No newline at end of file
...@@ -3,7 +3,7 @@ ...@@ -3,7 +3,7 @@
[![AppVeyor](https://ci.appveyor.com/api/projects/status/4mpe0tbu7n126vyw?svg=true)](https://ci.appveyor.com/project/yuleyule66/cap) [![AppVeyor](https://ci.appveyor.com/api/projects/status/4mpe0tbu7n126vyw?svg=true)](https://ci.appveyor.com/project/yuleyule66/cap)
[![NuGet](https://img.shields.io/nuget/v/DotNetCore.CAP.svg)](https://www.nuget.org/packages/DotNetCore.CAP/) [![NuGet](https://img.shields.io/nuget/v/DotNetCore.CAP.svg)](https://www.nuget.org/packages/DotNetCore.CAP/)
[![NuGet Preview](https://img.shields.io/nuget/vpre/DotNetCore.CAP.svg?label=nuget-pre)](https://www.nuget.org/packages/DotNetCore.CAP/) [![NuGet Preview](https://img.shields.io/nuget/vpre/DotNetCore.CAP.svg?label=nuget-pre)](https://www.nuget.org/packages/DotNetCore.CAP/)
[![Member project of .NET China Foundation](https://img.shields.io/badge/member_project_of-.NET_CHINA-red.svg?style=flat&colorB=9E20C8)](https://github.com/dotnetcore) [![Member project of .NET Core Community](https://img.shields.io/badge/member%20project%20of-NCC-9e20c9.svg)](https://github.com/dotnetcore)
[![GitHub license](https://img.shields.io/badge/license-MIT-blue.svg)](https://raw.githubusercontent.com/dotnetcore/CAP/master/LICENSE.txt) [![GitHub license](https://img.shields.io/badge/license-MIT-blue.svg)](https://raw.githubusercontent.com/dotnetcore/CAP/master/LICENSE.txt)
CAP is a library based on .Net standard, which is a solution to deal with distributed transactions, also has the function of EventBus, it is lightweight, easy to use, and efficiently. CAP is a library based on .Net standard, which is a solution to deal with distributed transactions, also has the function of EventBus, it is lightweight, easy to use, and efficiently.
......
...@@ -3,7 +3,7 @@ ...@@ -3,7 +3,7 @@
[![AppVeyor](https://ci.appveyor.com/api/projects/status/4mpe0tbu7n126vyw?svg=true)](https://ci.appveyor.com/project/yuleyule66/cap) [![AppVeyor](https://ci.appveyor.com/api/projects/status/4mpe0tbu7n126vyw?svg=true)](https://ci.appveyor.com/project/yuleyule66/cap)
[![NuGet](https://img.shields.io/nuget/v/DotNetCore.CAP.svg)](https://www.nuget.org/packages/DotNetCore.CAP/) [![NuGet](https://img.shields.io/nuget/v/DotNetCore.CAP.svg)](https://www.nuget.org/packages/DotNetCore.CAP/)
[![NuGet Preview](https://img.shields.io/nuget/vpre/DotNetCore.CAP.svg?label=nuget-pre)](https://www.nuget.org/packages/DotNetCore.CAP/) [![NuGet Preview](https://img.shields.io/nuget/vpre/DotNetCore.CAP.svg?label=nuget-pre)](https://www.nuget.org/packages/DotNetCore.CAP/)
[![Member project of .NET China Foundation](https://img.shields.io/badge/member_project_of-.NET_CHINA-red.svg?style=flat&colorB=9E20C8)](https://github.com/dotnetcore) [![Member project of .NET Core Community](https://img.shields.io/badge/member%20project%20of-NCC-9e20c9.svg)](https://github.com/dotnetcore)
[![GitHub license](https://img.shields.io/badge/license-MIT-blue.svg)](https://raw.githubusercontent.com/dotnetcore/CAP/master/LICENSE.txt) [![GitHub license](https://img.shields.io/badge/license-MIT-blue.svg)](https://raw.githubusercontent.com/dotnetcore/CAP/master/LICENSE.txt)
CAP 是一个基于 .NET Standard 的 C# 库,它是一种处理分布式事务的解决方案,同样具有 EventBus 的功能,它具有轻量级、易使用、高性能等特点。 CAP 是一个基于 .NET Standard 的 C# 库,它是一种处理分布式事务的解决方案,同样具有 EventBus 的功能,它具有轻量级、易使用、高性能等特点。
......
<Project>
<Import Project="version.props" />
<PropertyGroup Label="Package">
<Product>CAP</Product>
<Authors>savorboard;dotnetcore</Authors>
<RepositoryUrl>https://github.com/dotnetcore/CAP</RepositoryUrl>
<RepositoryType>git</RepositoryType>
<PackageIconUrl>https://avatars2.githubusercontent.com/u/19404084</PackageIconUrl>
<PackageProjectUrl>https://github.com/dotnetcore/CAP</PackageProjectUrl>
<PackageLicenseUrl>https://github.com/dotnetcore/CAP/blob/master/LICENSE.txt</PackageLicenseUrl>
<PackageTags>CAP;EventBus;Distributed Transaction</PackageTags>
<Description>EventBus and eventually consistency in distributed architectures.</Description>
</PropertyGroup>
</Project>
...@@ -34,7 +34,7 @@ namespace Sample.Kafka.MySql.Controllers ...@@ -34,7 +34,7 @@ namespace Sample.Kafka.MySql.Controllers
return Ok("publish successful!"); return Ok("publish successful!");
} }
[CapSubscribe("xxx.xxx.test2")] [CapSubscribe("#.test2")]
public void Test2(int value) public void Test2(int value)
{ {
Console.WriteLine("Subscriber output message: " + value); Console.WriteLine("Subscriber output message: " + value);
......
...@@ -10,8 +10,8 @@ namespace Sample.Kafka.MySql ...@@ -10,8 +10,8 @@ namespace Sample.Kafka.MySql
{ {
services.AddCap(x => services.AddCap(x =>
{ {
x.UseMySql("Server=192.168.10.110;Database=testcap;UserId=root;Password=123123;"); x.UseMySql("Server=localhost;Database=testcap;UserId=root;Password=123123;");
x.UseKafka("192.168.10.110:9092"); x.UseKafka("localhost:9092");
x.UseDashboard(); x.UseDashboard();
}); });
......
{
"Logging": {
"IncludeScopes": false,
"LogLevel": {
"Default": "Debug"
}
}
}
...@@ -25,7 +25,6 @@ namespace Sample.RabbitMQ.MySql.Controllers ...@@ -25,7 +25,6 @@ namespace Sample.RabbitMQ.MySql.Controllers
return Ok(); return Ok();
} }
[Route("~/publish2")] [Route("~/publish2")]
public IActionResult PublishMessage2() public IActionResult PublishMessage2()
{ {
...@@ -47,7 +46,7 @@ namespace Sample.RabbitMQ.MySql.Controllers ...@@ -47,7 +46,7 @@ namespace Sample.RabbitMQ.MySql.Controllers
} }
[NonAction] [NonAction]
[CapSubscribe("sample.rabbitmq.mysql")] [CapSubscribe("#.rabbitmq.mysql")]
public void ReceiveMessage(DateTime time) public void ReceiveMessage(DateTime time)
{ {
Console.WriteLine("[sample.rabbitmq.mysql] message received: " + DateTime.Now + ",sent time: " + time); Console.WriteLine("[sample.rabbitmq.mysql] message received: " + DateTime.Now + ",sent time: " + time);
......
...@@ -23,9 +23,6 @@ namespace Sample.RabbitMQ.MySql ...@@ -23,9 +23,6 @@ namespace Sample.RabbitMQ.MySql
public void Configure(IApplicationBuilder app, IHostingEnvironment env, ILoggerFactory loggerFactory) public void Configure(IApplicationBuilder app, IHostingEnvironment env, ILoggerFactory loggerFactory)
{ {
loggerFactory.AddConsole();
loggerFactory.AddDebug();
app.UseMvc(); app.UseMvc();
app.UseCap(); app.UseCap();
......
{
"Logging": {
"IncludeScopes": false,
"LogLevel": {
"Default": "Debug"
}
}
}
<Project Sdk="Microsoft.NET.Sdk"> <Project Sdk="Microsoft.NET.Sdk">
<Import Project="..\..\build\common.props" />
<PropertyGroup> <PropertyGroup>
<TargetFramework>netstandard2.0</TargetFramework> <TargetFramework>netstandard2.0</TargetFramework>
<AssemblyName>DotNetCore.CAP.Kafka</AssemblyName> <AssemblyName>DotNetCore.CAP.Kafka</AssemblyName>
......
...@@ -6,23 +6,29 @@ using System.Collections.Concurrent; ...@@ -6,23 +6,29 @@ using System.Collections.Concurrent;
using System.Diagnostics; using System.Diagnostics;
using System.Threading; using System.Threading;
using Confluent.Kafka; using Confluent.Kafka;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
namespace DotNetCore.CAP.Kafka namespace DotNetCore.CAP.Kafka
{ {
public class ConnectionPool : IConnectionPool, IDisposable public class ConnectionPool : IConnectionPool, IDisposable
{ {
private readonly ILogger<ConnectionPool> _logger;
private readonly Func<Producer> _activator; private readonly Func<Producer> _activator;
private readonly ConcurrentQueue<Producer> _pool = new ConcurrentQueue<Producer>(); private readonly ConcurrentQueue<Producer> _pool;
private int _count; private int _count;
private int _maxSize; private int _maxSize;
public ConnectionPool(KafkaOptions options) public ConnectionPool(ILogger<ConnectionPool> logger, KafkaOptions options)
{ {
_logger = logger;
_pool = new ConcurrentQueue<Producer>();
_maxSize = options.ConnectionPoolSize; _maxSize = options.ConnectionPoolSize;
_activator = CreateActivator(options); _activator = CreateActivator(options);
ServersAddress = options.Servers; ServersAddress = options.Servers;
_logger.LogDebug("Kafka configuration of CAP :\r\n {0}",
JsonConvert.SerializeObject(options.AsKafkaConfig(), Formatting.Indented));
} }
public string ServersAddress { get; } public string ServersAddress { get; }
......
...@@ -26,7 +26,7 @@ namespace DotNetCore.CAP ...@@ -26,7 +26,7 @@ namespace DotNetCore.CAP
services.AddSingleton<IStorageConnection, MySqlStorageConnection>(); services.AddSingleton<IStorageConnection, MySqlStorageConnection>();
services.AddScoped<ICapPublisher, CapPublisher>(); services.AddScoped<ICapPublisher, CapPublisher>();
services.AddScoped<ICallbackPublisher, CapPublisher>(); services.AddScoped<ICallbackPublisher, CapPublisher>();
services.AddTransient<IAdditionalProcessor, DefaultAdditionalProcessor>(); services.AddTransient<ICollectProcessor, MySqlCollectProcessor>();
AddSingletionMySqlOptions(services); AddSingletionMySqlOptions(services);
} }
......
<Project Sdk="Microsoft.NET.Sdk"> <Project Sdk="Microsoft.NET.Sdk">
<Import Project="..\..\build\common.props" />
<PropertyGroup> <PropertyGroup>
<TargetFramework>netstandard2.0</TargetFramework> <TargetFramework>netstandard2.0</TargetFramework>
<AssemblyName>DotNetCore.CAP.MySql</AssemblyName> <AssemblyName>DotNetCore.CAP.MySql</AssemblyName>
...@@ -14,10 +12,10 @@ ...@@ -14,10 +12,10 @@
</PropertyGroup> </PropertyGroup>
<ItemGroup> <ItemGroup>
<PackageReference Include="Dapper" Version="1.50.4" /> <PackageReference Include="Dapper" Version="1.50.5" />
<PackageReference Include="Microsoft.EntityFrameworkCore" Version="2.0.2" /> <PackageReference Include="Microsoft.EntityFrameworkCore" Version="2.1.0" />
<PackageReference Include="Microsoft.EntityFrameworkCore.Relational" Version="2.0.2" /> <PackageReference Include="Microsoft.EntityFrameworkCore.Relational" Version="2.1.0" />
<PackageReference Include="MySqlConnector" Version="0.38.0" /> <PackageReference Include="MySqlConnector" Version="0.40.4" />
</ItemGroup> </ItemGroup>
<ItemGroup> <ItemGroup>
......
...@@ -10,7 +10,7 @@ using MySql.Data.MySqlClient; ...@@ -10,7 +10,7 @@ using MySql.Data.MySqlClient;
namespace DotNetCore.CAP.MySql namespace DotNetCore.CAP.MySql
{ {
internal class DefaultAdditionalProcessor : IAdditionalProcessor internal class MySqlCollectProcessor : ICollectProcessor
{ {
private const int MaxBatch = 1000; private const int MaxBatch = 1000;
private readonly TimeSpan _delay = TimeSpan.FromSeconds(1); private readonly TimeSpan _delay = TimeSpan.FromSeconds(1);
...@@ -18,7 +18,7 @@ namespace DotNetCore.CAP.MySql ...@@ -18,7 +18,7 @@ namespace DotNetCore.CAP.MySql
private readonly MySqlOptions _options; private readonly MySqlOptions _options;
private readonly TimeSpan _waitingInterval = TimeSpan.FromMinutes(5); private readonly TimeSpan _waitingInterval = TimeSpan.FromMinutes(5);
public DefaultAdditionalProcessor(ILogger<DefaultAdditionalProcessor> logger, public MySqlCollectProcessor(ILogger<MySqlCollectProcessor> logger,
MySqlOptions mysqlOptions) MySqlOptions mysqlOptions)
{ {
_logger = logger; _logger = logger;
...@@ -27,8 +27,6 @@ namespace DotNetCore.CAP.MySql ...@@ -27,8 +27,6 @@ namespace DotNetCore.CAP.MySql
public async Task ProcessAsync(ProcessingContext context) public async Task ProcessAsync(ProcessingContext context)
{ {
_logger.LogDebug("Collecting expired entities.");
var tables = new[] var tables = new[]
{ {
$"{_options.TableNamePrefix}.published", $"{_options.TableNamePrefix}.published",
...@@ -37,6 +35,8 @@ namespace DotNetCore.CAP.MySql ...@@ -37,6 +35,8 @@ namespace DotNetCore.CAP.MySql
foreach (var table in tables) foreach (var table in tables)
{ {
_logger.LogDebug($"Collecting expired data from table [{table}].");
int removedCount; int removedCount;
do do
{ {
......
...@@ -126,7 +126,7 @@ select count(Id) from `{0}.received` where StatusName = N'Failed';", _prefix); ...@@ -126,7 +126,7 @@ select count(Id) from `{0}.received` where StatusName = N'Failed';", _prefix);
{ {
var sqlQuery = $"select count(Id) from `{_prefix}.{tableName}` where StatusName = @state"; var sqlQuery = $"select count(Id) from `{_prefix}.{tableName}` where StatusName = @state";
var count = connection.ExecuteScalar<int>(sqlQuery, new {state = statusName}); var count = connection.ExecuteScalar<int>(sqlQuery, new { state = statusName });
return count; return count;
} }
...@@ -167,10 +167,10 @@ select aggr.* from ( ...@@ -167,10 +167,10 @@ select aggr.* from (
group by date_format(`Added`,'%Y-%m-%d-%H') group by date_format(`Added`,'%Y-%m-%d-%H')
) aggr where `Key` in @keys;"; ) aggr where `Key` in @keys;";
var valuesMap = connection.Query( var valuesMap = connection.Query<TimelineCounter>(
sqlQuery, sqlQuery,
new {keys = keyMaps.Keys, statusName}) new { keys = keyMaps.Keys, statusName })
.ToDictionary(x => (string) x.Key, x => (int) x.Count); .ToDictionary(x => x.Key, x => x.Count);
foreach (var key in keyMaps.Keys) foreach (var key in keyMaps.Keys)
{ {
......
...@@ -26,7 +26,7 @@ namespace DotNetCore.CAP ...@@ -26,7 +26,7 @@ namespace DotNetCore.CAP
services.AddSingleton<IStorageConnection, PostgreSqlStorageConnection>(); services.AddSingleton<IStorageConnection, PostgreSqlStorageConnection>();
services.AddScoped<ICapPublisher, CapPublisher>(); services.AddScoped<ICapPublisher, CapPublisher>();
services.AddScoped<ICallbackPublisher, CapPublisher>(); services.AddScoped<ICallbackPublisher, CapPublisher>();
services.AddTransient<IAdditionalProcessor, DefaultAdditionalProcessor>(); services.AddTransient<ICollectProcessor, PostgreSqlCollectProcessor>();
AddSingletonPostgreSqlOptions(services); AddSingletonPostgreSqlOptions(services);
} }
......
<Project Sdk="Microsoft.NET.Sdk"> <Project Sdk="Microsoft.NET.Sdk">
<Import Project="..\..\build\common.props" />
<PropertyGroup> <PropertyGroup>
<TargetFramework>netstandard2.0</TargetFramework> <TargetFramework>netstandard2.0</TargetFramework>
<AssemblyName>DotNetCore.CAP.PostgreSql</AssemblyName> <AssemblyName>DotNetCore.CAP.PostgreSql</AssemblyName>
...@@ -14,10 +12,10 @@ ...@@ -14,10 +12,10 @@
</PropertyGroup> </PropertyGroup>
<ItemGroup> <ItemGroup>
<PackageReference Include="Dapper" Version="1.50.4" /> <PackageReference Include="Dapper" Version="1.50.5" />
<PackageReference Include="Microsoft.EntityFrameworkCore" Version="2.0.2" /> <PackageReference Include="Microsoft.EntityFrameworkCore" Version="2.1.0" />
<PackageReference Include="Microsoft.EntityFrameworkCore.Relational" Version="2.0.2" /> <PackageReference Include="Microsoft.EntityFrameworkCore.Relational" Version="2.1.0" />
<PackageReference Include="Npgsql" Version="3.2.7" /> <PackageReference Include="Npgsql" Version="4.0.0" />
</ItemGroup> </ItemGroup>
<ItemGroup> <ItemGroup>
......
...@@ -10,7 +10,7 @@ using Npgsql; ...@@ -10,7 +10,7 @@ using Npgsql;
namespace DotNetCore.CAP.PostgreSql namespace DotNetCore.CAP.PostgreSql
{ {
internal class DefaultAdditionalProcessor : IAdditionalProcessor internal class PostgreSqlCollectProcessor : ICollectProcessor
{ {
private const int MaxBatch = 1000; private const int MaxBatch = 1000;
...@@ -24,7 +24,7 @@ namespace DotNetCore.CAP.PostgreSql ...@@ -24,7 +24,7 @@ namespace DotNetCore.CAP.PostgreSql
private readonly PostgreSqlOptions _options; private readonly PostgreSqlOptions _options;
private readonly TimeSpan _waitingInterval = TimeSpan.FromMinutes(5); private readonly TimeSpan _waitingInterval = TimeSpan.FromMinutes(5);
public DefaultAdditionalProcessor(ILogger<DefaultAdditionalProcessor> logger, public PostgreSqlCollectProcessor(ILogger<PostgreSqlCollectProcessor> logger,
PostgreSqlOptions sqlServerOptions) PostgreSqlOptions sqlServerOptions)
{ {
_logger = logger; _logger = logger;
...@@ -33,10 +33,10 @@ namespace DotNetCore.CAP.PostgreSql ...@@ -33,10 +33,10 @@ namespace DotNetCore.CAP.PostgreSql
public async Task ProcessAsync(ProcessingContext context) public async Task ProcessAsync(ProcessingContext context)
{ {
_logger.LogDebug("Collecting expired entities.");
foreach (var table in Tables) foreach (var table in Tables)
{ {
_logger.LogDebug($"Collecting expired data from table [{_options.Schema}].[{table}].");
var removedCount = 0; var removedCount = 0;
do do
{ {
......
...@@ -128,7 +128,7 @@ select count(""Id"") from ""{0}"".""received"" where ""StatusName"" = N'Failed' ...@@ -128,7 +128,7 @@ select count(""Id"") from ""{0}"".""received"" where ""StatusName"" = N'Failed'
var sqlQuery = var sqlQuery =
$"select count(\"Id\") from \"{_options.Schema}\".\"{tableName}\" where Lower(\"StatusName\") = Lower(@state)"; $"select count(\"Id\") from \"{_options.Schema}\".\"{tableName}\" where Lower(\"StatusName\") = Lower(@state)";
var count = connection.ExecuteScalar<int>(sqlQuery, new {state = statusName}); var count = connection.ExecuteScalar<int>(sqlQuery, new { state = statusName });
return count; return count;
} }
...@@ -170,9 +170,9 @@ with aggr as ( ...@@ -170,9 +170,9 @@ with aggr as (
) )
select ""Key"",""Count"" from aggr where ""Key""= Any(@keys);"; select ""Key"",""Count"" from aggr where ""Key""= Any(@keys);";
var valuesMap = connection.Query(sqlQuery, new {keys = keyMaps.Keys.ToList(), statusName}) var valuesMap = connection.Query<TimelineCounter>(sqlQuery, new { keys = keyMaps.Keys.ToList(), statusName })
.ToList() .ToList()
.ToDictionary(x => (string) x.Key, x => (int) x.Count); .ToDictionary(x => x.Key, x => x.Count);
foreach (var key in keyMaps.Keys) foreach (var key in keyMaps.Keys)
{ {
......
<Project Sdk="Microsoft.NET.Sdk"> <Project Sdk="Microsoft.NET.Sdk">
<Import Project="..\..\build\common.props" />
<PropertyGroup> <PropertyGroup>
<TargetFramework>netstandard2.0</TargetFramework> <TargetFramework>netstandard2.0</TargetFramework>
<AssemblyName>DotNetCore.CAP.RabbitMQ</AssemblyName> <AssemblyName>DotNetCore.CAP.RabbitMQ</AssemblyName>
......
...@@ -6,6 +6,7 @@ using System.Collections.Concurrent; ...@@ -6,6 +6,7 @@ using System.Collections.Concurrent;
using System.Diagnostics; using System.Diagnostics;
using System.Threading; using System.Threading;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using RabbitMQ.Client; using RabbitMQ.Client;
namespace DotNetCore.CAP.RabbitMQ namespace DotNetCore.CAP.RabbitMQ
...@@ -15,21 +16,24 @@ namespace DotNetCore.CAP.RabbitMQ ...@@ -15,21 +16,24 @@ namespace DotNetCore.CAP.RabbitMQ
private const int DefaultPoolSize = 15; private const int DefaultPoolSize = 15;
private readonly Func<IConnection> _connectionActivator; private readonly Func<IConnection> _connectionActivator;
private readonly ILogger<ConnectionChannelPool> _logger; private readonly ILogger<ConnectionChannelPool> _logger;
private readonly ConcurrentQueue<IModel> _pool = new ConcurrentQueue<IModel>(); private readonly ConcurrentQueue<IModel> _pool;
private IConnection _connection; private IConnection _connection;
private int _count; private int _count;
private int _maxSize; private int _maxSize;
public ConnectionChannelPool(ILogger<ConnectionChannelPool> logger, public ConnectionChannelPool(ILogger<ConnectionChannelPool> logger, RabbitMQOptions options)
RabbitMQOptions options)
{ {
_logger = logger; _logger = logger;
_maxSize = DefaultPoolSize; _maxSize = DefaultPoolSize;
_pool = new ConcurrentQueue<IModel>();
_connectionActivator = CreateConnection(options); _connectionActivator = CreateConnection(options);
HostAddress = options.HostName + ":" + options.Port; HostAddress = options.HostName + ":" + options.Port;
Exchange = options.ExchangeName; Exchange = options.ExchangeName;
_logger.LogDebug("RabbitMQ configuration of CAP :\r\n {0}",
JsonConvert.SerializeObject(options, Formatting.Indented));
} }
IModel IConnectionChannelPool.Rent() IModel IConnectionChannelPool.Rent()
...@@ -87,7 +91,7 @@ namespace DotNetCore.CAP.RabbitMQ ...@@ -87,7 +91,7 @@ namespace DotNetCore.CAP.RabbitMQ
private void RabbitMQ_ConnectionShutdown(object sender, ShutdownEventArgs e) private void RabbitMQ_ConnectionShutdown(object sender, ShutdownEventArgs e)
{ {
_logger.LogWarning($"RabbitMQ client connection closed! {e}"); _logger.LogWarning($"RabbitMQ client connection closed! --> {e.ReplyText}");
} }
public virtual IModel Rent() public virtual IModel Rent()
......
...@@ -155,7 +155,7 @@ namespace DotNetCore.CAP.RabbitMQ ...@@ -155,7 +155,7 @@ namespace DotNetCore.CAP.RabbitMQ
var args = new LogMessageEventArgs var args = new LogMessageEventArgs
{ {
LogType = MqLogType.ConsumerShutdown, LogType = MqLogType.ConsumerShutdown,
Reason = e.ToString() Reason = e.ReplyText
}; };
OnLog?.Invoke(sender, args); OnLog?.Invoke(sender, args);
} }
......
...@@ -26,7 +26,7 @@ namespace DotNetCore.CAP ...@@ -26,7 +26,7 @@ namespace DotNetCore.CAP
services.AddSingleton<IStorageConnection, SqlServerStorageConnection>(); services.AddSingleton<IStorageConnection, SqlServerStorageConnection>();
services.AddScoped<ICapPublisher, CapPublisher>(); services.AddScoped<ICapPublisher, CapPublisher>();
services.AddScoped<ICallbackPublisher, CapPublisher>(); services.AddScoped<ICallbackPublisher, CapPublisher>();
services.AddTransient<IAdditionalProcessor, DefaultAdditionalProcessor>(); services.AddTransient<ICollectProcessor, SqlServerCollectProcessor>();
AddSqlServerOptions(services); AddSqlServerOptions(services);
} }
......
<Project Sdk="Microsoft.NET.Sdk"> <Project Sdk="Microsoft.NET.Sdk">
<Import Project="..\..\build\common.props" />
<PropertyGroup> <PropertyGroup>
<TargetFramework>netstandard2.0</TargetFramework> <TargetFramework>netstandard2.0</TargetFramework>
<AssemblyName>DotNetCore.CAP.SqlServer</AssemblyName> <AssemblyName>DotNetCore.CAP.SqlServer</AssemblyName>
<PackageTags>$(PackageTags);SQL Server</PackageTags> <PackageTags>$(PackageTags);SQL Server</PackageTags>
</PropertyGroup> </PropertyGroup>
<PropertyGroup> <PropertyGroup>
...@@ -14,10 +13,10 @@ ...@@ -14,10 +13,10 @@
</PropertyGroup> </PropertyGroup>
<ItemGroup> <ItemGroup>
<PackageReference Include="Dapper" Version="1.50.4" /> <PackageReference Include="Dapper" Version="1.50.5" />
<PackageReference Include="Microsoft.EntityFrameworkCore" Version="2.0.2" /> <PackageReference Include="Microsoft.EntityFrameworkCore" Version="2.1.0" />
<PackageReference Include="Microsoft.EntityFrameworkCore.Relational" Version="2.0.2" /> <PackageReference Include="Microsoft.EntityFrameworkCore.Relational" Version="2.1.0" />
<PackageReference Include="System.Data.SqlClient" Version="4.4.3" /> <PackageReference Include="System.Data.SqlClient" Version="4.5.0" />
</ItemGroup> </ItemGroup>
<ItemGroup> <ItemGroup>
......
...@@ -10,7 +10,7 @@ using Microsoft.Extensions.Logging; ...@@ -10,7 +10,7 @@ using Microsoft.Extensions.Logging;
namespace DotNetCore.CAP.SqlServer namespace DotNetCore.CAP.SqlServer
{ {
public class DefaultAdditionalProcessor : IAdditionalProcessor public class SqlServerCollectProcessor : ICollectProcessor
{ {
private const int MaxBatch = 1000; private const int MaxBatch = 1000;
...@@ -24,7 +24,7 @@ namespace DotNetCore.CAP.SqlServer ...@@ -24,7 +24,7 @@ namespace DotNetCore.CAP.SqlServer
private readonly SqlServerOptions _options; private readonly SqlServerOptions _options;
private readonly TimeSpan _waitingInterval = TimeSpan.FromMinutes(5); private readonly TimeSpan _waitingInterval = TimeSpan.FromMinutes(5);
public DefaultAdditionalProcessor(ILogger<DefaultAdditionalProcessor> logger, public SqlServerCollectProcessor(ILogger<SqlServerCollectProcessor> logger,
SqlServerOptions sqlServerOptions) SqlServerOptions sqlServerOptions)
{ {
_logger = logger; _logger = logger;
...@@ -33,10 +33,10 @@ namespace DotNetCore.CAP.SqlServer ...@@ -33,10 +33,10 @@ namespace DotNetCore.CAP.SqlServer
public async Task ProcessAsync(ProcessingContext context) public async Task ProcessAsync(ProcessingContext context)
{ {
_logger.LogDebug("Collecting expired entities.");
foreach (var table in Tables) foreach (var table in Tables)
{ {
_logger.LogDebug($"Collecting expired data from table [{_options.Schema}].[{table}].");
int removedCount; int removedCount;
do do
{ {
......
...@@ -128,7 +128,7 @@ select count(Id) from [{0}].Received with (nolock) where StatusName = N'Failed'; ...@@ -128,7 +128,7 @@ select count(Id) from [{0}].Received with (nolock) where StatusName = N'Failed';
var sqlQuery = var sqlQuery =
$"select count(Id) from [{_options.Schema}].{tableName} with (nolock) where StatusName = @state"; $"select count(Id) from [{_options.Schema}].{tableName} with (nolock) where StatusName = @state";
var count = connection.ExecuteScalar<int>(sqlQuery, new {state = statusName}); var count = connection.ExecuteScalar<int>(sqlQuery, new { state = statusName });
return count; return count;
} }
...@@ -171,10 +171,10 @@ with aggr as ( ...@@ -171,10 +171,10 @@ with aggr as (
) )
select [Key], [Count] from aggr with (nolock) where [Key] in @keys;"; select [Key], [Count] from aggr with (nolock) where [Key] in @keys;";
var valuesMap = connection.Query( var valuesMap = connection.Query<TimelineCounter>(
sqlQuery, sqlQuery,
new {keys = keyMaps.Keys, statusName}) new { keys = keyMaps.Keys, statusName })
.ToDictionary(x => (string) x.Key, x => (int) x.Count); .ToDictionary(x => x.Key, x => x.Count);
foreach (var key in keyMaps.Keys) foreach (var key in keyMaps.Keys)
{ {
......
namespace DotNetCore.CAP.Dashboard
{
public class TimelineCounter
{
public string Key { get; set; }
public int Count { get; set; }
}
}
<Project Sdk="Microsoft.NET.Sdk"> <Project Sdk="Microsoft.NET.Sdk">
<Import Project="..\..\build\common.props" />
<PropertyGroup> <PropertyGroup>
<TargetFramework>netstandard2.0</TargetFramework> <TargetFramework>netstandard2.0</TargetFramework>
<AssemblyName>DotNetCore.CAP</AssemblyName>
<PackageTags>$(PackageTags);</PackageTags>
</PropertyGroup> </PropertyGroup>
<PropertyGroup> <PropertyGroup>
<DocumentationFile>bin\$(Configuration)\netstandard2.0\DotNetCore.CAP.xml</DocumentationFile> <DocumentationFile>bin\$(Configuration)\netstandard2.0\DotNetCore.CAP.xml</DocumentationFile>
<NoWarn>1701;1702;1705;CS1591</NoWarn> <NoWarn>1701;1702;1705;CS1591</NoWarn>
</PropertyGroup> </PropertyGroup>
<ItemGroup>
<None Remove="Dashboard\Content\css\bootstrap.min.css" />
<None Remove="Dashboard\Content\css\jsonview.min.css" />
<None Remove="Dashboard\Content\css\rickshaw.min.css" />
<None Remove="Dashboard\Content\fonts\glyphicons-halflings-regular.eot" />
<None Remove="Dashboard\Content\fonts\glyphicons-halflings-regular.svg" />
<None Remove="Dashboard\Content\fonts\glyphicons-halflings-regular.ttf" />
<None Remove="Dashboard\Content\fonts\glyphicons-halflings-regular.woff" />
<None Remove="Dashboard\Content\fonts\glyphicons-halflings-regular.woff2" />
<None Remove="Dashboard\Content\js\bootstrap.min.js" />
<None Remove="Dashboard\Content\js\d3.layout.min.js" />
<None Remove="Dashboard\Content\js\d3.min.js" />
<None Remove="Dashboard\Content\js\jquery-2.1.4.min.js" />
<None Remove="Dashboard\Content\js\jsonview.min.js" />
<None Remove="Dashboard\Content\js\moment-with-locales.min.js" />
<None Remove="Dashboard\Content\js\moment.min.js" />
<None Remove="Dashboard\Content\js\rickshaw.min.js" />
</ItemGroup>
<ItemGroup> <ItemGroup>
<EmbeddedResource Include="Dashboard\Content\css\bootstrap.min.css" /> <EmbeddedResource Include="Dashboard\Content\css\bootstrap.min.css" />
<EmbeddedResource Include="Dashboard\Content\css\cap.css" /> <EmbeddedResource Include="Dashboard\Content\css\cap.css" />
...@@ -47,19 +29,21 @@ ...@@ -47,19 +29,21 @@
<EmbeddedResource Include="Dashboard\Content\js\moment.min.js" /> <EmbeddedResource Include="Dashboard\Content\js\moment.min.js" />
<EmbeddedResource Include="Dashboard\Content\js\rickshaw.min.js" /> <EmbeddedResource Include="Dashboard\Content\js\rickshaw.min.js" />
</ItemGroup> </ItemGroup>
<ItemGroup> <ItemGroup>
<PackageReference Include="Consul" Version="0.7.2.4" /> <PackageReference Include="Consul" Version="0.7.2.4" />
<PackageReference Include="Microsoft.AspNetCore.Hosting.Abstractions" Version="2.0.2" /> <PackageReference Include="Microsoft.AspNetCore.Hosting.Abstractions" Version="2.1.0" />
<PackageReference Include="Microsoft.AspNetCore.Http.Abstractions" Version="2.0.2" /> <PackageReference Include="Microsoft.AspNetCore.Http.Abstractions" Version="2.1.0" />
<PackageReference Include="Microsoft.Extensions.Options" Version="2.0.1" /> <PackageReference Include="Microsoft.Extensions.Options" Version="2.1.0" />
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="2.0.1" /> <PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="2.1.0" />
<PackageReference Include="Newtonsoft.Json" Version="11.0.2" /> <PackageReference Include="Newtonsoft.Json" Version="11.0.2" />
<PackageReference Include="System.Data.Common" Version="4.3.0" /> <PackageReference Include="System.Data.Common" Version="4.3.0" />
<PackageReference Include="System.Diagnostics.DiagnosticSource" Version="4.4.1" /> <PackageReference Include="System.Diagnostics.DiagnosticSource" Version="4.5.0" />
<PackageReference Include="System.Threading.ThreadPool" Version="4.3.0" /> <PackageReference Include="System.Threading.ThreadPool" Version="4.3.0" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="2.0.0" /> <PackageReference Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="2.1.0" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="2.0.0" /> <PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="2.1.0" />
</ItemGroup> </ItemGroup>
<ItemGroup> <ItemGroup>
<Compile Update="Dashboard\Content\resx\Strings.Designer.cs"> <Compile Update="Dashboard\Content\resx\Strings.Designer.cs">
<DesignTime>True</DesignTime> <DesignTime>True</DesignTime>
...@@ -143,9 +127,4 @@ ...@@ -143,9 +127,4 @@
<LastGenOutput>Strings.Designer.cs</LastGenOutput> <LastGenOutput>Strings.Designer.cs</LastGenOutput>
</EmbeddedResource> </EmbeddedResource>
</ItemGroup> </ItemGroup>
<ItemGroup>
<None Update="Dashboard\Pages\PublishedPage.cshtml">
<Generator>RazorGenerator</Generator>
</None>
</ItemGroup>
</Project> </Project>
\ No newline at end of file
...@@ -59,6 +59,8 @@ namespace DotNetCore.CAP ...@@ -59,6 +59,8 @@ namespace DotNetCore.CAP
private async Task BootstrapTaskAsync() private async Task BootstrapTaskAsync()
{ {
_logger.LogInformation("### CAP starting...");
await Storage.InitializeAsync(_cts.Token); await Storage.InitializeAsync(_cts.Token);
if (_cts.IsCancellationRequested) if (_cts.IsCancellationRequested)
...@@ -83,6 +85,8 @@ namespace DotNetCore.CAP ...@@ -83,6 +85,8 @@ namespace DotNetCore.CAP
_ctsRegistration.Dispose(); _ctsRegistration.Dispose();
_cts.Dispose(); _cts.Dispose();
_logger.LogInformation("### CAP started!");
} }
protected virtual Task BootstrapCoreAsync() protected virtual Task BootstrapCoreAsync()
......
...@@ -146,22 +146,22 @@ namespace DotNetCore.CAP ...@@ -146,22 +146,22 @@ namespace DotNetCore.CAP
switch (logmsg.LogType) switch (logmsg.LogType)
{ {
case MqLogType.ConsumerCancelled: case MqLogType.ConsumerCancelled:
_logger.LogWarning("RabbitMQ consumer cancelled. reason: " + logmsg.Reason); _logger.LogWarning("RabbitMQ consumer cancelled. --> " + logmsg.Reason);
break; break;
case MqLogType.ConsumerRegistered: case MqLogType.ConsumerRegistered:
_logger.LogInformation("RabbitMQ consumer registered. " + logmsg.Reason); _logger.LogInformation("RabbitMQ consumer registered. --> " + logmsg.Reason);
break; break;
case MqLogType.ConsumerUnregistered: case MqLogType.ConsumerUnregistered:
_logger.LogWarning("RabbitMQ consumer unregistered. reason: " + logmsg.Reason); _logger.LogWarning("RabbitMQ consumer unregistered. --> " + logmsg.Reason);
break; break;
case MqLogType.ConsumerShutdown: case MqLogType.ConsumerShutdown:
_logger.LogWarning("RabbitMQ consumer shutdown. reason:" + logmsg.Reason); _logger.LogWarning("RabbitMQ consumer shutdown. --> " + logmsg.Reason);
break; break;
case MqLogType.ConsumeError: case MqLogType.ConsumeError:
_logger.LogError("Kakfa client consume error. reason:" + logmsg.Reason); _logger.LogError("Kakfa client consume error. --> " + logmsg.Reason);
break; break;
case MqLogType.ServerConnError: case MqLogType.ServerConnError:
_logger.LogCritical("Kafka server connection error. reason:" + logmsg.Reason); _logger.LogCritical("Kafka server connection error. --> " + logmsg.Reason);
break; break;
default: default:
throw new ArgumentOutOfRangeException(); throw new ArgumentOutOfRangeException();
......
...@@ -72,7 +72,7 @@ namespace DotNetCore.CAP ...@@ -72,7 +72,7 @@ namespace DotNetCore.CAP
} }
catch (Exception ex) catch (Exception ex)
{ {
_logger.ExceptionOccuredWhileExecuting(message.Name, ex); _logger.LogError(ex, $"An exception occurred while executing the subscription method. Topic:{message.Name}, Id:{message.Id}");
await SetFailedState(message, ex, out bool stillRetry); await SetFailedState(message, ex, out bool stillRetry);
if (stillRetry) if (stillRetry)
......
...@@ -5,6 +5,7 @@ using System; ...@@ -5,6 +5,7 @@ using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.Linq; using System.Linq;
using System.Reflection; using System.Reflection;
using System.Text.RegularExpressions;
using DotNetCore.CAP.Abstractions; using DotNetCore.CAP.Abstractions;
using DotNetCore.CAP.Infrastructure; using DotNetCore.CAP.Infrastructure;
using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.DependencyInjection;
...@@ -19,6 +20,8 @@ namespace DotNetCore.CAP.Internal ...@@ -19,6 +20,8 @@ namespace DotNetCore.CAP.Internal
{ {
private readonly CapOptions _capOptions; private readonly CapOptions _capOptions;
private readonly IServiceProvider _serviceProvider; private readonly IServiceProvider _serviceProvider;
private List<RegexExecuteDescriptor<ConsumerExecutorDescriptor>> _asteriskList;
private List<RegexExecuteDescriptor<ConsumerExecutorDescriptor>> _poundList;
/// <summary> /// <summary>
/// Creates a new <see cref="DefaultConsumerServiceSelector" />. /// Creates a new <see cref="DefaultConsumerServiceSelector" />.
...@@ -29,17 +32,6 @@ namespace DotNetCore.CAP.Internal ...@@ -29,17 +32,6 @@ namespace DotNetCore.CAP.Internal
_capOptions = capOptions; _capOptions = capOptions;
} }
/// <summary>
/// Selects the best <see cref="ConsumerExecutorDescriptor" /> candidate from <paramref name="executeDescriptor" /> for
/// the
/// current message associated.
/// </summary>
public ConsumerExecutorDescriptor SelectBestCandidate(string key,
IReadOnlyList<ConsumerExecutorDescriptor> executeDescriptor)
{
return executeDescriptor.FirstOrDefault(x => x.Attribute.Name == key);
}
public IReadOnlyList<ConsumerExecutorDescriptor> SelectCandidates() public IReadOnlyList<ConsumerExecutorDescriptor> SelectCandidates()
{ {
var executorDescriptorList = new List<ConsumerExecutorDescriptor>(); var executorDescriptorList = new List<ConsumerExecutorDescriptor>();
...@@ -51,6 +43,26 @@ namespace DotNetCore.CAP.Internal ...@@ -51,6 +43,26 @@ namespace DotNetCore.CAP.Internal
return executorDescriptorList; return executorDescriptorList;
} }
public ConsumerExecutorDescriptor SelectBestCandidate(string key, IReadOnlyList<ConsumerExecutorDescriptor> executeDescriptor)
{
var result = MatchUsingName(key, executeDescriptor);
if (result != null)
{
return result;
}
//[*] match with regex, i.e. foo.*.abc
result = MatchAsteriskUsingRegex(key, executeDescriptor);
if (result != null)
{
return result;
}
//[#] match regex, i.e. foo.#
result = MatchPoundUsingRegex(key, executeDescriptor);
return result;
}
private IEnumerable<ConsumerExecutorDescriptor> FindConsumersFromInterfaceTypes( private IEnumerable<ConsumerExecutorDescriptor> FindConsumersFromInterfaceTypes(
IServiceProvider provider) IServiceProvider provider)
{ {
...@@ -130,5 +142,65 @@ namespace DotNetCore.CAP.Internal ...@@ -130,5 +142,65 @@ namespace DotNetCore.CAP.Internal
return descriptor; return descriptor;
} }
private ConsumerExecutorDescriptor MatchUsingName(string key, IReadOnlyList<ConsumerExecutorDescriptor> executeDescriptor)
{
return executeDescriptor.FirstOrDefault(x => x.Attribute.Name == key);
}
private ConsumerExecutorDescriptor MatchAsteriskUsingRegex(string key, IReadOnlyList<ConsumerExecutorDescriptor> executeDescriptor)
{
if (_asteriskList == null)
{
_asteriskList = executeDescriptor
.Where(x => x.Attribute.Name.IndexOf('*') >= 0)
.Select(x => new RegexExecuteDescriptor<ConsumerExecutorDescriptor>
{
Name = ("^" + x.Attribute.Name + "$").Replace("*", "[a-zA-Z]+").Replace(".", "\\."),
Descriptor = x
}).ToList();
}
foreach (var red in _asteriskList)
{
if (Regex.IsMatch(key, red.Name, RegexOptions.Singleline))
{
return red.Descriptor;
}
}
return null;
}
private ConsumerExecutorDescriptor MatchPoundUsingRegex(string key, IReadOnlyList<ConsumerExecutorDescriptor> executeDescriptor)
{
if (_poundList == null)
{
_poundList = executeDescriptor
.Where(x => x.Attribute.Name.IndexOf('#') >= 0)
.Select(x => new RegexExecuteDescriptor<ConsumerExecutorDescriptor>
{
Name = ("^" + x.Attribute.Name + "$").Replace("#", "[a-zA-Z\\.]+"),
Descriptor = x
}).ToList();
}
foreach (var red in _poundList)
{
if (Regex.IsMatch(key, red.Name, RegexOptions.Singleline))
{
return red.Descriptor;
}
}
return null;
}
private class RegexExecuteDescriptor<T>
{
public string Name { get; set; }
public T Descriptor { get; set; }
}
} }
} }
\ No newline at end of file
...@@ -22,8 +22,6 @@ namespace DotNetCore.CAP.Internal ...@@ -22,8 +22,6 @@ namespace DotNetCore.CAP.Internal
/// </summary> /// </summary>
/// <param name="key">topic or exchange router key.</param> /// <param name="key">topic or exchange router key.</param>
/// <param name="candidates">the set of <see cref="ConsumerExecutorDescriptor" /> candidates.</param> /// <param name="candidates">the set of <see cref="ConsumerExecutorDescriptor" /> candidates.</param>
/// <returns></returns> ConsumerExecutorDescriptor SelectBestCandidate(string key, IReadOnlyList<ConsumerExecutorDescriptor> candidates);
ConsumerExecutorDescriptor
SelectBestCandidate(string key, IReadOnlyList<ConsumerExecutorDescriptor> candidates);
} }
} }
\ No newline at end of file
...@@ -11,21 +11,20 @@ namespace DotNetCore.CAP.Internal ...@@ -11,21 +11,20 @@ namespace DotNetCore.CAP.Internal
internal class MethodMatcherCache internal class MethodMatcherCache
{ {
private readonly IConsumerServiceSelector _selector; private readonly IConsumerServiceSelector _selector;
private List<string> _allTopics;
public MethodMatcherCache(IConsumerServiceSelector selector) public MethodMatcherCache(IConsumerServiceSelector selector)
{ {
_selector = selector; _selector = selector;
Entries = new ConcurrentDictionary<string, IList<ConsumerExecutorDescriptor>>(); Entries = new ConcurrentDictionary<string, IReadOnlyList<ConsumerExecutorDescriptor>>();
} }
private ConcurrentDictionary<string, IList<ConsumerExecutorDescriptor>> Entries { get; } private ConcurrentDictionary<string, IReadOnlyList<ConsumerExecutorDescriptor>> Entries { get; }
/// <summary> /// <summary>
/// Get a dictionary of candidates.In the dictionary, /// Get a dictionary of candidates.In the dictionary,
/// the Key is the CAPSubscribeAttribute Group, the Value for the current Group of candidates /// the Key is the CAPSubscribeAttribute Group, the Value for the current Group of candidates
/// </summary> /// </summary>
public ConcurrentDictionary<string, IList<ConsumerExecutorDescriptor>> GetCandidatesMethodsOfGroupNameGrouped() public ConcurrentDictionary<string, IReadOnlyList<ConsumerExecutorDescriptor>> GetCandidatesMethodsOfGroupNameGrouped()
{ {
if (Entries.Count != 0) if (Entries.Count != 0)
{ {
...@@ -44,28 +43,6 @@ namespace DotNetCore.CAP.Internal ...@@ -44,28 +43,6 @@ namespace DotNetCore.CAP.Internal
return Entries; return Entries;
} }
/// <summary>
/// Get a dictionary of specify topic candidates.
/// The Key is Group name, the value is specify topic candidates.
/// </summary>
/// <param name="topicName">message topic name</param>
public IDictionary<string, IList<ConsumerExecutorDescriptor>> GetTopicExector(string topicName)
{
if (Entries == null)
{
throw new ArgumentNullException(nameof(Entries));
}
var dic = new Dictionary<string, IList<ConsumerExecutorDescriptor>>();
foreach (var item in Entries)
{
var topicCandidates = item.Value.Where(x => x.Attribute.Name == topicName);
dic.Add(item.Key, topicCandidates.ToList());
}
return dic;
}
/// <summary> /// <summary>
/// Attempts to get the topic exector associated with the specified topic name and group name from the /// Attempts to get the topic exector associated with the specified topic name and group name from the
/// <see cref="Entries" />. /// <see cref="Entries" />.
...@@ -86,36 +63,12 @@ namespace DotNetCore.CAP.Internal ...@@ -86,36 +63,12 @@ namespace DotNetCore.CAP.Internal
if (Entries.TryGetValue(groupName, out var groupMatchTopics)) if (Entries.TryGetValue(groupName, out var groupMatchTopics))
{ {
matchTopic = groupMatchTopics.FirstOrDefault(x => x.Attribute.Name == topicName); matchTopic = _selector.SelectBestCandidate(topicName, groupMatchTopics);
return matchTopic != null; return matchTopic != null;
} }
return false; return false;
} }
/// <summary>
/// Get all subscribe topics name.
/// </summary>
public IEnumerable<string> GetSubscribeTopics()
{
if (_allTopics != null)
{
return _allTopics;
}
if (Entries == null)
{
throw new ArgumentNullException(nameof(Entries));
}
_allTopics = new List<string>();
foreach (var descriptors in Entries.Values)
{
_allTopics.AddRange(descriptors.Select(x => x.Attribute.Name));
}
return _allTopics;
}
} }
} }
\ No newline at end of file
...@@ -3,7 +3,7 @@ ...@@ -3,7 +3,7 @@
namespace DotNetCore.CAP.Processor namespace DotNetCore.CAP.Processor
{ {
public interface IAdditionalProcessor : IProcessor public interface ICollectProcessor : IProcessor
{ {
} }
} }
\ No newline at end of file
...@@ -65,7 +65,7 @@ namespace DotNetCore.CAP.Processor ...@@ -65,7 +65,7 @@ namespace DotNetCore.CAP.Processor
} }
catch (Exception ex) catch (Exception ex)
{ {
_logger.ExceptionOccuredWhileExecuting(message.Name, ex); _logger.LogError(ex, $"An exception occurred when sending a message to the MQ. Topic:{message.Name}, Id:{message.Id}");
} }
} }
} }
...@@ -81,16 +81,9 @@ namespace DotNetCore.CAP.Processor ...@@ -81,16 +81,9 @@ namespace DotNetCore.CAP.Processor
try try
{ {
foreach (var message in _receivedMessageQueue.GetConsumingEnumerable(_cts.Token)) foreach (var message in _receivedMessageQueue.GetConsumingEnumerable(_cts.Token))
{
try
{ {
_executor.ExecuteAsync(message); _executor.ExecuteAsync(message);
} }
catch (Exception ex)
{
_logger.ExceptionOccuredWhileExecuting(message.Name, ex);
}
}
} }
catch (OperationCanceledException) catch (OperationCanceledException)
{ {
......
...@@ -57,13 +57,14 @@ namespace DotNetCore.CAP.Processor ...@@ -57,13 +57,14 @@ namespace DotNetCore.CAP.Processor
return; return;
} }
try
{
_disposed = true; _disposed = true;
_logger.ServerShuttingDown(); _logger.ServerShuttingDown();
_cts.Cancel(); _cts.Cancel();
try
{ _compositeTask?.Wait((int)TimeSpan.FromSeconds(10).TotalMilliseconds);
_compositeTask.Wait((int) TimeSpan.FromSeconds(10).TotalMilliseconds);
} }
catch (AggregateException ex) catch (AggregateException ex)
{ {
...@@ -73,6 +74,14 @@ namespace DotNetCore.CAP.Processor ...@@ -73,6 +74,14 @@ namespace DotNetCore.CAP.Processor
_logger.ExpectedOperationCanceledException(innerEx); _logger.ExpectedOperationCanceledException(innerEx);
} }
} }
catch (Exception ex)
{
_logger.LogWarning(ex, "An exception was occured when disposing.");
}
finally
{
_logger.LogInformation("### CAP shutdown!");
}
} }
private IProcessor InfiniteRetry(IProcessor inner) private IProcessor InfiniteRetry(IProcessor inner)
...@@ -85,7 +94,7 @@ namespace DotNetCore.CAP.Processor ...@@ -85,7 +94,7 @@ namespace DotNetCore.CAP.Processor
var returnedProcessors = new List<IProcessor> var returnedProcessors = new List<IProcessor>
{ {
_provider.GetRequiredService<NeedRetryMessageProcessor>(), _provider.GetRequiredService<NeedRetryMessageProcessor>(),
_provider.GetRequiredService<IAdditionalProcessor>() _provider.GetRequiredService<ICollectProcessor>()
}; };
return returnedProcessors.ToArray(); return returnedProcessors.ToArray();
......
<Project Sdk="Microsoft.NET.Sdk"> <Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup> <PropertyGroup>
<TargetFramework>netcoreapp2.0</TargetFramework> <TargetFramework>netcoreapp2.1</TargetFramework>
<TreatWarningsAsErrors>true</TreatWarningsAsErrors> <IsPackable>false</IsPackable>
<AssemblyName>DotNetCore.CAP.MySql.Test</AssemblyName>
<PackageId>DotNetCore.CAP.MySql.Test</PackageId>
<GenerateRuntimeConfigurationFiles>true</GenerateRuntimeConfigurationFiles>
</PropertyGroup> </PropertyGroup>
<ItemGroup> <ItemGroup>
...@@ -14,20 +11,20 @@ ...@@ -14,20 +11,20 @@
</ItemGroup> </ItemGroup>
<ItemGroup> <ItemGroup>
<PackageReference Include="Dapper" Version="1.50.4" /> <PackageReference Include="Dapper" Version="1.50.5" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="15.6.2" /> <PackageReference Include="Microsoft.NET.Test.Sdk" Version="15.7.2" />
<PackageReference Include="MySqlConnector" Version="0.38.0" /> <PackageReference Include="MySqlConnector" Version="0.40.4" />
<PackageReference Include="xunit.runner.visualstudio" Version="2.3.1" /> <PackageReference Include="xunit.runner.visualstudio" Version="2.3.1" />
<PackageReference Include="xunit" Version="2.3.1" /> <PackageReference Include="xunit" Version="2.3.1" />
<PackageReference Include="Microsoft.AspNetCore.Http" Version="2.0.2" /> <PackageReference Include="Microsoft.AspNetCore.Http" Version="2.1.0" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="2.0.0" /> <PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="2.1.0" />
<PackageReference Include="Moq" Version="4.8.2" /> <PackageReference Include="Moq" Version="4.8.2" />
<PackageReference Include="Microsoft.Extensions.Logging" Version="2.0.1" /> <PackageReference Include="Microsoft.Extensions.Logging" Version="2.1.0" />
<PackageReference Include="Microsoft.Extensions.Configuration.Abstractions" Version="2.0.1" /> <PackageReference Include="Microsoft.Extensions.Configuration.Abstractions" Version="2.1.0" />
<PackageReference Include="Microsoft.Extensions.Configuration" Version="2.0.1" /> <PackageReference Include="Microsoft.Extensions.Configuration" Version="2.1.0" />
<PackageReference Include="Microsoft.Extensions.Configuration.FileExtensions" Version="2.0.1" /> <PackageReference Include="Microsoft.Extensions.Configuration.FileExtensions" Version="2.1.0" />
<PackageReference Include="Microsoft.Extensions.Configuration.Json" Version="2.0.1" /> <PackageReference Include="Microsoft.Extensions.Configuration.Json" Version="2.1.0" />
<PackageReference Include="Microsoft.Extensions.Configuration.EnvironmentVariables" Version="2.0.1" /> <PackageReference Include="Microsoft.Extensions.Configuration.EnvironmentVariables" Version="2.1.0" />
</ItemGroup> </ItemGroup>
</Project> </Project>
\ No newline at end of file
<Project Sdk="Microsoft.NET.Sdk"> <Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup> <PropertyGroup>
<TargetFramework>netcoreapp2.0</TargetFramework> <TargetFramework>netcoreapp2.1</TargetFramework>
<IsPackable>false</IsPackable> <IsPackable>false</IsPackable>
</PropertyGroup> </PropertyGroup>
<ItemGroup> <ItemGroup>
<PackageReference Include="Dapper" Version="1.50.4" /> <PackageReference Include="Dapper" Version="1.50.5" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="15.6.2" /> <PackageReference Include="Microsoft.NET.Test.Sdk" Version="15.7.2" />
<PackageReference Include="Npgsql" Version="3.2.7" /> <PackageReference Include="Npgsql" Version="4.0.0" />
<PackageReference Include="xunit" Version="2.3.1" /> <PackageReference Include="xunit" Version="2.3.1" />
<PackageReference Include="xunit.runner.visualstudio" Version="2.3.1" /> <PackageReference Include="xunit.runner.visualstudio" Version="2.3.1" />
</ItemGroup> </ItemGroup>
......
<Project Sdk="Microsoft.NET.Sdk"> <Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup> <PropertyGroup>
<TargetFramework>netcoreapp2.0</TargetFramework> <TargetFramework>netcoreapp2.1</TargetFramework>
<IsPackable>false</IsPackable> <IsPackable>false</IsPackable>
</PropertyGroup> </PropertyGroup>
...@@ -11,20 +11,20 @@ ...@@ -11,20 +11,20 @@
</ItemGroup> </ItemGroup>
<ItemGroup> <ItemGroup>
<PackageReference Include="Dapper" Version="1.50.4" /> <PackageReference Include="Dapper" Version="1.50.5" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="15.6.2" /> <PackageReference Include="Microsoft.NET.Test.Sdk" Version="15.7.2" />
<PackageReference Include="System.Data.SqlClient" Version="4.4.3" /> <PackageReference Include="System.Data.SqlClient" Version="4.5.0" />
<PackageReference Include="xunit.runner.visualstudio" Version="2.3.1" /> <PackageReference Include="xunit.runner.visualstudio" Version="2.3.1" />
<PackageReference Include="xunit" Version="2.3.1" /> <PackageReference Include="xunit" Version="2.3.1" />
<PackageReference Include="Microsoft.AspNetCore.Http" Version="2.0.2" /> <PackageReference Include="Microsoft.AspNetCore.Http" Version="2.1.0" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="2.0.0" /> <PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="2.1.0" />
<PackageReference Include="Moq" Version="4.8.2" /> <PackageReference Include="Moq" Version="4.8.2" />
<PackageReference Include="Microsoft.Extensions.Logging" Version="2.0.1" /> <PackageReference Include="Microsoft.Extensions.Logging" Version="2.1.0" />
<PackageReference Include="Microsoft.Extensions.Configuration.Abstractions" Version="2.0.1" /> <PackageReference Include="Microsoft.Extensions.Configuration.Abstractions" Version="2.1.0" />
<PackageReference Include="Microsoft.Extensions.Configuration" Version="2.0.1" /> <PackageReference Include="Microsoft.Extensions.Configuration" Version="2.1.0" />
<PackageReference Include="Microsoft.Extensions.Configuration.FileExtensions" Version="2.0.1" /> <PackageReference Include="Microsoft.Extensions.Configuration.FileExtensions" Version="2.1.0" />
<PackageReference Include="Microsoft.Extensions.Configuration.Json" Version="2.0.1" /> <PackageReference Include="Microsoft.Extensions.Configuration.Json" Version="2.1.0" />
<PackageReference Include="Microsoft.Extensions.Configuration.EnvironmentVariables" Version="2.0.1" /> <PackageReference Include="Microsoft.Extensions.Configuration.EnvironmentVariables" Version="2.1.0" />
</ItemGroup> </ItemGroup>
</Project> </Project>
...@@ -28,7 +28,7 @@ namespace DotNetCore.CAP.Test ...@@ -28,7 +28,7 @@ namespace DotNetCore.CAP.Test
var selector = _provider.GetRequiredService<IConsumerServiceSelector>(); var selector = _provider.GetRequiredService<IConsumerServiceSelector>();
var candidates = selector.SelectCandidates(); var candidates = selector.SelectCandidates();
Assert.Equal(2, candidates.Count); Assert.Equal(6, candidates.Count);
} }
[Fact] [Fact]
...@@ -42,6 +42,66 @@ namespace DotNetCore.CAP.Test ...@@ -42,6 +42,66 @@ namespace DotNetCore.CAP.Test
Assert.NotNull(bestCandidates.MethodInfo); Assert.NotNull(bestCandidates.MethodInfo);
Assert.Equal(typeof(Task), bestCandidates.MethodInfo.ReturnType); Assert.Equal(typeof(Task), bestCandidates.MethodInfo.ReturnType);
} }
[Theory]
[InlineData("Candidates.Asterisk")]
[InlineData("candidates.Asterisk")]
[InlineData("AAA.BBB.Asterisk")]
[InlineData("aaa.bbb.Asterisk")]
public void CanFindAsteriskTopic(string topic)
{
var selector = _provider.GetRequiredService<IConsumerServiceSelector>();
var candidates = selector.SelectCandidates();
var bestCandidates = selector.SelectBestCandidate(topic, candidates);
Assert.NotNull(bestCandidates);
}
[Theory]
[InlineData("Candidates.Asterisk.AAA")]
[InlineData("AAA.BBB.CCC.Asterisk")]
[InlineData("aaa.BBB.ccc.Asterisk")]
[InlineData("Asterisk.aaa.bbb")]
public void CanNotFindAsteriskTopic(string topic)
{
var selector = _provider.GetRequiredService<IConsumerServiceSelector>();
var candidates = selector.SelectCandidates();
var bestCandidates = selector.SelectBestCandidate(topic, candidates);
Assert.Null(bestCandidates);
}
[Theory]
[InlineData("Candidates.Pound.AAA")]
[InlineData("Candidates.Pound.AAA.BBB")]
[InlineData("AAA.Pound")]
[InlineData("aaa.Pound")]
[InlineData("aaa.bbb.Pound")]
[InlineData("aaa.BBB.Pound")]
public void CanFindPoundTopic(string topic)
{
var selector = _provider.GetRequiredService<IConsumerServiceSelector>();
var candidates = selector.SelectCandidates();
var bestCandidates = selector.SelectBestCandidate(topic, candidates);
Assert.NotNull(bestCandidates);
}
[Theory]
[InlineData("Pound")]
[InlineData("aaa.Pound.AAA.BBB")]
[InlineData("Pound.AAA")]
[InlineData("Pound.aaa")]
[InlineData("AAA.Pound.aaa")]
public void CanNotFindPoundTopic(string topic)
{
var selector = _provider.GetRequiredService<IConsumerServiceSelector>();
var candidates = selector.SelectCandidates();
var bestCandidates = selector.SelectBestCandidate(topic, candidates);
Assert.Null(bestCandidates);
}
} }
public class CandidatesTopic : TopicAttribute public class CandidatesTopic : TopicAttribute
...@@ -73,6 +133,21 @@ namespace DotNetCore.CAP.Test ...@@ -73,6 +133,21 @@ namespace DotNetCore.CAP.Test
{ {
Console.WriteLine("GetFoo2() method has bee excuted."); Console.WriteLine("GetFoo2() method has bee excuted.");
} }
[CandidatesTopic("*.*.Asterisk")]
[CandidatesTopic("*.Asterisk")]
public void GetFooAsterisk()
{
Console.WriteLine("GetFoo2Asterisk() method has bee excuted.");
}
[CandidatesTopic("Candidates.Pound.#")]
[CandidatesTopic("#.Pound")]
public void GetFooPound()
{
Console.WriteLine("GetFoo2Pound() method has bee excuted.");
}
} }
public class CandidatesBarTest : IBarTest public class CandidatesBarTest : IBarTest
......
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Reflection;
using System.Runtime.CompilerServices;
using DotNetCore.CAP.Diagnostics;
using DotNetCore.CAP.Internal;
using Xunit;
namespace DotNetCore.CAP.Test
{
public class DiagnosticsTest
{
private static readonly DiagnosticListener s_diagnosticListener =
new DiagnosticListener(CapDiagnosticListenerExtensions.DiagnosticListenerName);
[Fact]
public void WritePublishBeforeTest()
{
Guid operationId = Guid.NewGuid();
DiagnosticsWapper(() =>
{
var eventData = new BrokerPublishEventData(operationId, "", "", "", "", DateTimeOffset.UtcNow);
s_diagnosticListener.WritePublishBefore(eventData);
}, kvp =>
{
if (kvp.Key.Equals(CapDiagnosticListenerExtensions.CapBeforePublish))
{
Assert.NotNull(kvp.Value);
Assert.IsType<BrokerPublishEventData>(kvp.Value);
Assert.Equal(operationId, ((BrokerPublishEventData)kvp.Value).OperationId);
}
});
}
[Fact]
public void WritePublishAfterTest()
{
Guid operationId = Guid.NewGuid();
DiagnosticsWapper(() =>
{
var eventData = new BrokerPublishEndEventData(operationId, "", "", "", "", DateTimeOffset.UtcNow, TimeSpan.FromMinutes(1));
s_diagnosticListener.WritePublishAfter(eventData);
}, kvp =>
{
if (kvp.Key.Equals(CapDiagnosticListenerExtensions.CapAfterPublish))
{
Assert.NotNull(kvp.Value);
Assert.IsType<BrokerPublishEndEventData>(kvp.Value);
Assert.Equal(operationId, ((BrokerPublishEndEventData)kvp.Value).OperationId);
Assert.Equal(TimeSpan.FromMinutes(1), ((BrokerPublishEndEventData)kvp.Value).Duration);
}
});
}
[Fact]
public void WritePublishErrorTest()
{
Guid operationId = Guid.NewGuid();
var ex = new Exception("WritePublishErrorTest");
DiagnosticsWapper(() =>
{
var eventData = new BrokerPublishErrorEventData(operationId, "", "", "", "", ex, DateTimeOffset.UtcNow, default(TimeSpan));
s_diagnosticListener.WritePublishError(eventData);
}, kvp =>
{
if (kvp.Key.Equals(CapDiagnosticListenerExtensions.CapErrorPublish))
{
Assert.NotNull(kvp.Value);
Assert.IsType<BrokerPublishErrorEventData>(kvp.Value);
Assert.Equal(operationId, ((BrokerPublishErrorEventData)kvp.Value).OperationId);
Assert.Equal(ex, ((BrokerPublishErrorEventData)kvp.Value).Exception);
}
});
}
[Fact]
public void WriteConsumeBeforeTest()
{
Guid operationId = Guid.NewGuid();
DiagnosticsWapper(() =>
{
var eventData = new BrokerConsumeEventData(operationId, "", "", "", "", DateTimeOffset.UtcNow);
s_diagnosticListener.WriteConsumeBefore(eventData);
}, kvp =>
{
if (kvp.Key.Equals(CapDiagnosticListenerExtensions.CapBeforeConsume))
{
Assert.NotNull(kvp.Value);
Assert.IsType<BrokerConsumeEventData>(kvp.Value);
Assert.Equal(operationId, ((BrokerConsumeEventData)kvp.Value).OperationId);
}
});
}
[Fact]
public void WriteConsumeAfterTest()
{
Guid operationId = Guid.NewGuid();
DiagnosticsWapper(() =>
{
var eventData = new BrokerConsumeEndEventData(operationId, "", "", "", "", DateTimeOffset.UtcNow, TimeSpan.FromMinutes(1));
s_diagnosticListener.WriteConsumeAfter(eventData);
}, kvp =>
{
if (kvp.Key.Equals(CapDiagnosticListenerExtensions.CapAfterConsume))
{
Assert.NotNull(kvp.Value);
Assert.IsType<BrokerConsumeEndEventData>(kvp.Value);
Assert.Equal(operationId, ((BrokerConsumeEndEventData)kvp.Value).OperationId);
Assert.Equal(TimeSpan.FromMinutes(1), ((BrokerConsumeEndEventData)kvp.Value).Duration);
}
});
}
[Fact]
public void WriteConsumeErrorTest()
{
Guid operationId = Guid.NewGuid();
var ex = new Exception("WriteConsumeErrorTest");
DiagnosticsWapper(() =>
{
var eventData = new BrokerConsumeErrorEventData(operationId, "", "", "", "", ex, DateTimeOffset.UtcNow, default(TimeSpan));
s_diagnosticListener.WriteConsumeError(eventData);
}, kvp =>
{
if (kvp.Key.Equals(CapDiagnosticListenerExtensions.CapErrorPublish))
{
Assert.NotNull(kvp.Value);
Assert.IsType<BrokerConsumeErrorEventData>(kvp.Value);
Assert.Equal(operationId, ((BrokerConsumeErrorEventData)kvp.Value).OperationId);
Assert.Equal(ex, ((BrokerConsumeErrorEventData)kvp.Value).Exception);
}
});
}
[Fact]
public void WriteSubscriberInvokeBeforeTest()
{
DiagnosticsWapper(() =>
{
s_diagnosticListener.WriteSubscriberInvokeBefore(FackConsumerContext());
}, kvp =>
{
if (kvp.Key.Equals(CapDiagnosticListenerExtensions.CapBeforeSubscriberInvoke))
{
Assert.NotNull(kvp.Value);
Assert.IsType<SubscriberInvokeEventData>(kvp.Value);
}
});
}
[Fact]
public void WriteSubscriberInvokeAfterTest()
{
Guid operationId = Guid.NewGuid();
DiagnosticsWapper(() =>
{
s_diagnosticListener.WriteSubscriberInvokeAfter(operationId, FackConsumerContext(), DateTimeOffset.Now, TimeSpan.FromMinutes(1));
}, kvp =>
{
if (kvp.Key.Equals(CapDiagnosticListenerExtensions.CapAfterSubscriberInvoke))
{
Assert.NotNull(kvp.Value);
Assert.IsType<SubscriberInvokeEndEventData>(kvp.Value);
Assert.Equal(operationId, ((SubscriberInvokeEndEventData)kvp.Value).OperationId);
}
});
}
[Fact]
public void WriteSubscriberInvokeErrorTest()
{
Guid operationId = Guid.NewGuid();
var ex = new Exception("WriteConsumeErrorTest");
DiagnosticsWapper(() =>
{
s_diagnosticListener.WriteSubscriberInvokeError(operationId, FackConsumerContext(), ex,
DateTimeOffset.Now, TimeSpan.MaxValue);
}, kvp =>
{
if (kvp.Key.Equals(CapDiagnosticListenerExtensions.CapErrorSubscriberInvoke))
{
Assert.NotNull(kvp.Value);
Assert.IsType<SubscriberInvokeErrorEventData>(kvp.Value);
Assert.Equal(operationId, ((SubscriberInvokeErrorEventData)kvp.Value).OperationId);
Assert.Equal(ex, ((SubscriberInvokeErrorEventData)kvp.Value).Exception);
}
});
}
private ConsumerContext FackConsumerContext()
{
//Mock description
var description = new ConsumerExecutorDescriptor
{
MethodInfo = GetType().GetMethod("WriteSubscriberInvokeAfterTest"),
Attribute = new CapSubscribeAttribute("xxx"),
ImplTypeInfo = GetType().GetTypeInfo()
};
//Mock messageContext
var messageContext = new MessageContext
{
Name= "Name",
Group= "Group",
Content = "Content"
};
return new ConsumerContext(description, messageContext);
}
private void DiagnosticsWapper(Action operation, Action<KeyValuePair<string, object>> assert, [CallerMemberName]string methodName = "")
{
FakeDiagnosticListenerObserver diagnosticListenerObserver = new FakeDiagnosticListenerObserver(assert);
diagnosticListenerObserver.Enable();
using (DiagnosticListener.AllListeners.Subscribe(diagnosticListenerObserver))
{
Console.WriteLine(string.Format("Test: {0} Enabled Listeners", methodName));
operation();
}
}
}
}
<Project Sdk="Microsoft.NET.Sdk"> <Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup> <PropertyGroup>
<TargetFramework>netcoreapp2.0</TargetFramework> <TargetFramework>netcoreapp2.1</TargetFramework>
<TreatWarningsAsErrors>true</TreatWarningsAsErrors> <IsPackable>false</IsPackable>
<AssemblyName>DotNetCore.CAP.Test</AssemblyName>
<GenerateRuntimeConfigurationFiles>true</GenerateRuntimeConfigurationFiles>
</PropertyGroup> </PropertyGroup>
<ItemGroup> <ItemGroup>
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="15.6.2" /> <PackageReference Include="Microsoft.NET.Test.Sdk" Version="15.7.2" />
<PackageReference Include="System.Data.Common" Version="4.3.0" /> <PackageReference Include="System.Data.Common" Version="4.3.0" />
<PackageReference Include="xunit.runner.visualstudio" Version="2.3.1" /> <PackageReference Include="xunit.runner.visualstudio" Version="2.3.1" />
<PackageReference Include="xunit" Version="2.3.1" /> <PackageReference Include="xunit" Version="2.3.1" />
<PackageReference Include="Microsoft.AspNetCore.Http" Version="2.0.2" /> <PackageReference Include="Microsoft.AspNetCore.Http" Version="2.1.0" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="2.0.0" /> <PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="2.1.0" />
<PackageReference Include="Moq" Version="4.8.2" /> <PackageReference Include="Moq" Version="4.8.2" />
<PackageReference Include="Microsoft.Extensions.Logging" Version="2.0.1" /> <PackageReference Include="Microsoft.Extensions.Logging" Version="2.1.0" />
</ItemGroup> </ItemGroup>
<ItemGroup> <ItemGroup>
......
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Text;
using DotNetCore.CAP.Diagnostics;
namespace DotNetCore.CAP.Test
{
public sealed class FakeDiagnosticListenerObserver : IObserver<DiagnosticListener>
{
private class FakeDiagnosticSourceWriteObserver : IObserver<KeyValuePair<string, object>>
{
private readonly Action<KeyValuePair<string, object>> _writeCallback;
public FakeDiagnosticSourceWriteObserver(Action<KeyValuePair<string, object>> writeCallback)
{
_writeCallback = writeCallback;
}
public void OnCompleted()
{
}
public void OnError(Exception error)
{
}
public void OnNext(KeyValuePair<string, object> value)
{
_writeCallback(value);
}
}
private readonly Action<KeyValuePair<string, object>> _writeCallback;
private bool _writeObserverEnabled;
public FakeDiagnosticListenerObserver(Action<KeyValuePair<string, object>> writeCallback)
{
_writeCallback = writeCallback;
}
public void OnCompleted()
{
}
public void OnError(Exception error)
{
}
public void OnNext(DiagnosticListener value)
{
if (value.Name.Equals(CapDiagnosticListenerExtensions.DiagnosticListenerName))
{
value.Subscribe(new FakeDiagnosticSourceWriteObserver(_writeCallback), IsEnabled);
}
}
public void Enable()
{
_writeObserverEnabled = true;
}
public void Disable()
{
_writeObserverEnabled = false;
}
private bool IsEnabled(string s)
{
return _writeObserverEnabled;
}
}
}
using System;
using System.Reflection;
using DotNetCore.CAP.Diagnostics;
using DotNetCore.CAP.Infrastructure;
using Newtonsoft.Json.Linq;
using Xunit;
namespace DotNetCore.CAP.Test
{
public class HelperTest
{
[Fact]
public void ToTimestampTest()
{
//Arrange
var time = DateTime.Parse("2018-01-01 00:00:00");
//Act
var result = Helper.ToTimestamp(time);
//Assert
Assert.Equal(1514764800, result);
}
[Fact]
public void FromTimestampTest()
{
//Arrange
var time = DateTime.Parse("2018-01-01 00:00:00");
//Act
var result = Helper.FromTimestamp(1514764800);
//Assert
Assert.Equal(time, result);
}
[Fact]
public void IsControllerTest()
{
//Arrange
var typeInfo = typeof(HomeController).GetTypeInfo();
//Act
var result = Helper.IsController(typeInfo);
//Assert
Assert.True(result);
}
[Theory]
[InlineData(typeof(string))]
[InlineData(typeof(decimal))]
[InlineData(typeof(DateTime))]
[InlineData(typeof(DateTimeOffset))]
[InlineData(typeof(Guid))]
[InlineData(typeof(TimeSpan))]
[InlineData(typeof(Uri))]
public void IsSimpleTypeTest(Type type)
{
//Act
var result = Helper.IsComplexType(type);
//Assert
Assert.False(result);
}
[Theory]
[InlineData(typeof(HomeController))]
[InlineData(typeof(Exception))]
[InlineData(typeof(Person))]
public void IsComplexTypeTest(Type type)
{
//Act
var result = Helper.IsComplexType(type);
//Assert
Assert.True(result);
}
[Fact]
public void AddExceptionPropertyTest()
{
//Arrange
var json = "{}";
var exception = new Exception("Test Exception Message")
{
Source = "Test Source",
InnerException = { }
};
var expected = new
{
ExceptionMessage = new
{
Source = "Test Source",
Message = "Test Exception Message",
InnerMessage = new { }
}
};
//Act
var result = Helper.AddExceptionProperty(json, exception);
//Assert
var jObj = JObject.Parse(result);
Assert.Equal(jObj["ExceptionMessage"]["Source"].Value<string>(), expected.ExceptionMessage.Source);
Assert.Equal(jObj["ExceptionMessage"]["Message"].Value<string>(), expected.ExceptionMessage.Message);
}
[Theory]
[InlineData("10.0.0.1")]
[InlineData("172.16.0.1")]
[InlineData("192.168.1.1")]
public void IsInnerIPTest(string ipAddress)
{
Assert.True(Helper.IsInnerIP(ipAddress));
}
[Fact]
public void AddTracingHeaderPropertyTest()
{
//Arrange
var json = "{}";
var header = new TracingHeaders { { "key", "value" } };
//Act
var result = Helper.AddTracingHeaderProperty(json, header);
//Assert
var expected = "{\"TracingHeaders\":{\"key\":\"value\"}}";
Assert.Equal(expected, result);
}
[Fact]
public void TryExtractTracingHeadersTest()
{
//Arrange
var json = "{\"TracingHeaders\":{\"key\":\"value\"}}";
TracingHeaders header = null;
string removedHeadersJson = "";
//Act
var result = Helper.TryExtractTracingHeaders(json, out header, out removedHeadersJson);
//Assert
Assert.True(result);
Assert.NotNull(header);
Assert.Single(header);
Assert.Equal("{}", removedHeadersJson);
}
}
}
//using System;
//using System.Threading;
//using System.Threading.Tasks;
//using DotNetCore.CAP.Models;
//using DotNetCore.CAP.Processor;
//using Microsoft.Extensions.DependencyInjection;
//using Microsoft.Extensions.Options;
//using Moq;
//using Xunit;
//namespace DotNetCore.CAP.Test
//{
// public class DefaultDispatcherTest
// {
// private CancellationTokenSource _cancellationTokenSource;
// private ProcessingContext _context;
// private IServiceProvider _provider;
// private Mock<IStorageConnection> _mockStorageConnection;
// public DefaultDispatcherTest()
// {
// _mockStorageConnection = new Mock<IStorageConnection>();
// _cancellationTokenSource = new CancellationTokenSource();
// var services = new ServiceCollection();
// services.AddLogging();
// services.Configure<IOptions<CapOptions>>(x => { });
// services.AddOptions();
// services.AddSingleton(_mockStorageConnection.Object);
// _provider = services.BuildServiceProvider();
// _context = new ProcessingContext(_provider, _cancellationTokenSource.Token);
// }
// [Fact]
// public void MockTest()
// {
// Assert.NotNull(_provider.GetServices<IStorageConnection>());
// }
// [Fact]
// public async void ProcessAsync_CancellationTokenCancelled_ThrowsImmediately()
// {
// // Arrange
// _cancellationTokenSource.Cancel();
// var fixture = Create();
// // Act
// await Assert.ThrowsAsync<OperationCanceledException>(() => fixture.ProcessAsync(_context));
// }
// [Fact]
// public async Task ProcessAsync()
// {
// // Arrange
// var job = new CapPublishedMessage
// {
// };
// var mockFetchedJob = Mock.Get(Mock.Of<IFetchedMessage>(fj => fj.MessageId == 42 && fj.MessageType == MessageType.Publish));
// _mockStorageConnection
// .Setup(m => m.FetchNextMessageAsync())
// .ReturnsAsync(mockFetchedJob.Object).Verifiable();
// _mockQueueExecutor
// .Setup(x => x.ExecuteAsync(_mockStorageConnection.Object, mockFetchedJob.Object))
// .Returns(Task.FromResult(OperateResult.Success));
// var fixture = Create();
// // Act
// await fixture.ProcessAsync(_context);
// // Assert
// _mockStorageConnection.VerifyAll();
// }
// private DefaultDispatcher Create()
// => _provider.GetService<DefaultDispatcher>();
// }
//}
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment