Unverified Commit 74fed843 authored by Savorboard's avatar Savorboard Committed by GitHub

Release 2.4.0 (#250)

* update version to 2.4.0

* Add version options to config file.

* update resource

* add  message version support  for dashboard

* add  message version support  for dashboard

* Support using version to isolate messages. #220

* update mongo unit tests

* update unit tests

* update unit tests

* Set default versions for consumer groups

* solve the problem of issue#181 (#237)

* Issue#235 (#238)

* solve the problem of issue#181

* solve the problem of issue#235

* refactor

* Fix the message persistence bug. #240

* using new CamelCaseNamingStrategy

* update packages to .net core 2.2

* update test framework to netcoreapp2.2

* Update .travis.yml

* update TargetFramework

* Exclude build samples project
parent 49e78da7
...@@ -2,7 +2,7 @@ language: csharp ...@@ -2,7 +2,7 @@ language: csharp
sudo: required sudo: required
dist: trusty dist: trusty
solution: CAP.sln solution: CAP.sln
dotnet: 2.1.300 dotnet: 2.2.100
mono: none mono: none
matrix: matrix:
......
...@@ -100,7 +100,6 @@ Global ...@@ -100,7 +100,6 @@ Global
{80A84F62-1558-427B-BA74-B47AA8A665B5}.Release|Any CPU.ActiveCfg = Release|Any CPU {80A84F62-1558-427B-BA74-B47AA8A665B5}.Release|Any CPU.ActiveCfg = Release|Any CPU
{80A84F62-1558-427B-BA74-B47AA8A665B5}.Release|Any CPU.Build.0 = Release|Any CPU {80A84F62-1558-427B-BA74-B47AA8A665B5}.Release|Any CPU.Build.0 = Release|Any CPU
{9F3F9BFE-7B6A-4A7A-A6E6-8B517D611873}.Debug|Any CPU.ActiveCfg = Debug|Any CPU {9F3F9BFE-7B6A-4A7A-A6E6-8B517D611873}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{9F3F9BFE-7B6A-4A7A-A6E6-8B517D611873}.Debug|Any CPU.Build.0 = Debug|Any CPU
{9F3F9BFE-7B6A-4A7A-A6E6-8B517D611873}.Release|Any CPU.ActiveCfg = Release|Any CPU {9F3F9BFE-7B6A-4A7A-A6E6-8B517D611873}.Release|Any CPU.ActiveCfg = Release|Any CPU
{9F3F9BFE-7B6A-4A7A-A6E6-8B517D611873}.Release|Any CPU.Build.0 = Release|Any CPU {9F3F9BFE-7B6A-4A7A-A6E6-8B517D611873}.Release|Any CPU.Build.0 = Release|Any CPU
{82C403AB-ED68-4084-9A1D-11334F9F08F9}.Debug|Any CPU.ActiveCfg = Debug|Any CPU {82C403AB-ED68-4084-9A1D-11334F9F08F9}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
...@@ -120,11 +119,9 @@ Global ...@@ -120,11 +119,9 @@ Global
{77C0AC02-C44B-49D5-B969-7D5305FC20A5}.Release|Any CPU.ActiveCfg = Release|Any CPU {77C0AC02-C44B-49D5-B969-7D5305FC20A5}.Release|Any CPU.ActiveCfg = Release|Any CPU
{77C0AC02-C44B-49D5-B969-7D5305FC20A5}.Release|Any CPU.Build.0 = Release|Any CPU {77C0AC02-C44B-49D5-B969-7D5305FC20A5}.Release|Any CPU.Build.0 = Release|Any CPU
{4473DE19-E8D2-4B57-80A8-C8AAA2BFA20F}.Debug|Any CPU.ActiveCfg = Debug|Any CPU {4473DE19-E8D2-4B57-80A8-C8AAA2BFA20F}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{4473DE19-E8D2-4B57-80A8-C8AAA2BFA20F}.Debug|Any CPU.Build.0 = Debug|Any CPU
{4473DE19-E8D2-4B57-80A8-C8AAA2BFA20F}.Release|Any CPU.ActiveCfg = Release|Any CPU {4473DE19-E8D2-4B57-80A8-C8AAA2BFA20F}.Release|Any CPU.ActiveCfg = Release|Any CPU
{4473DE19-E8D2-4B57-80A8-C8AAA2BFA20F}.Release|Any CPU.Build.0 = Release|Any CPU {4473DE19-E8D2-4B57-80A8-C8AAA2BFA20F}.Release|Any CPU.Build.0 = Release|Any CPU
{11563D1A-27CC-45CF-8C04-C16BCC21250A}.Debug|Any CPU.ActiveCfg = Debug|Any CPU {11563D1A-27CC-45CF-8C04-C16BCC21250A}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{11563D1A-27CC-45CF-8C04-C16BCC21250A}.Debug|Any CPU.Build.0 = Debug|Any CPU
{11563D1A-27CC-45CF-8C04-C16BCC21250A}.Release|Any CPU.ActiveCfg = Release|Any CPU {11563D1A-27CC-45CF-8C04-C16BCC21250A}.Release|Any CPU.ActiveCfg = Release|Any CPU
{11563D1A-27CC-45CF-8C04-C16BCC21250A}.Release|Any CPU.Build.0 = Release|Any CPU {11563D1A-27CC-45CF-8C04-C16BCC21250A}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection EndGlobalSection
......
dotnet --info dotnet --info
dotnet restore dotnet restore
dotnet test test/DotNetCore.CAP.Test/DotNetCore.CAP.Test.csproj -f netcoreapp2.0 dotnet test test/DotNetCore.CAP.Test/DotNetCore.CAP.Test.csproj -f netcoreapp2.2
\ No newline at end of file \ No newline at end of file
<Project> <Project>
<PropertyGroup> <PropertyGroup>
<VersionMajor>2</VersionMajor> <VersionMajor>2</VersionMajor>
<VersionMinor>3</VersionMinor> <VersionMinor>4</VersionMinor>
<VersionPatch>1</VersionPatch> <VersionPatch>0</VersionPatch>
<VersionQuality></VersionQuality> <VersionQuality></VersionQuality>
<VersionPrefix>$(VersionMajor).$(VersionMinor).$(VersionPatch)</VersionPrefix> <VersionPrefix>$(VersionMajor).$(VersionMinor).$(VersionPatch)</VersionPrefix>
</PropertyGroup> </PropertyGroup>
......
<Project Sdk="Microsoft.NET.Sdk.Web"> <Project Sdk="Microsoft.NET.Sdk.Web">
<PropertyGroup> <PropertyGroup>
<TargetFramework>netcoreapp2.1</TargetFramework> <TargetFramework>netcoreapp2.2</TargetFramework>
<AssemblyName>Sample.Kafka.MySql</AssemblyName> <AssemblyName>Sample.Kafka.MySql</AssemblyName>
<WarningsAsErrors>NU1701</WarningsAsErrors> <WarningsAsErrors>NU1701</WarningsAsErrors>
<NoWarn>NU1701</NoWarn> <NoWarn>NU1701</NoWarn>
......
<Project Sdk="Microsoft.NET.Sdk.Web"> <Project Sdk="Microsoft.NET.Sdk.Web">
<PropertyGroup> <PropertyGroup>
<TargetFramework>netcoreapp2.1</TargetFramework> <TargetFramework>netcoreapp2.2</TargetFramework>
</PropertyGroup> </PropertyGroup>
<ItemGroup> <ItemGroup>
<PackageReference Include="Microsoft.AspNetCore.App" /> <PackageReference Include="Microsoft.AspNetCore.App" />
...@@ -13,4 +13,4 @@ ...@@ -13,4 +13,4 @@
<ProjectReference Include="..\..\src\DotNetCore.CAP.MongoDB\DotNetCore.CAP.MongoDB.csproj" /> <ProjectReference Include="..\..\src\DotNetCore.CAP.MongoDB\DotNetCore.CAP.MongoDB.csproj" />
</ItemGroup> </ItemGroup>
</Project> </Project>
\ No newline at end of file
<Project Sdk="Microsoft.NET.Sdk.Web"> <Project Sdk="Microsoft.NET.Sdk.Web">
<PropertyGroup> <PropertyGroup>
<TargetFramework>netcoreapp2.1</TargetFramework> <TargetFramework>netcoreapp2.2</TargetFramework>
</PropertyGroup> </PropertyGroup>
<ItemGroup> <ItemGroup>
<PackageReference Include="Microsoft.AspNetCore.App" /> <PackageReference Include="Microsoft.AspNetCore.App" />
<PackageReference Include="Pomelo.EntityFrameworkCore.MySql" Version="2.1.0" /> <PackageReference Include="Pomelo.EntityFrameworkCore.MySql" Version="2.1.4" />
</ItemGroup> </ItemGroup>
<ItemGroup> <ItemGroup>
<ProjectReference Include="..\..\src\DotNetCore.CAP.MySql\DotNetCore.CAP.MySql.csproj" /> <ProjectReference Include="..\..\src\DotNetCore.CAP.MySql\DotNetCore.CAP.MySql.csproj" />
......
...@@ -4,6 +4,8 @@ ...@@ -4,6 +4,8 @@
using System; using System;
using DotNetCore.CAP.Processor; using DotNetCore.CAP.Processor;
using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection.Extensions;
using MongoDB.Driver;
namespace DotNetCore.CAP.MongoDB namespace DotNetCore.CAP.MongoDB
{ {
...@@ -32,6 +34,9 @@ namespace DotNetCore.CAP.MongoDB ...@@ -32,6 +34,9 @@ namespace DotNetCore.CAP.MongoDB
var options = new MongoDBOptions(); var options = new MongoDBOptions();
_configure?.Invoke(options); _configure?.Invoke(options);
services.AddSingleton(options); services.AddSingleton(options);
//Try to add IMongoClient if does not exists
services.TryAddSingleton<IMongoClient>(new MongoClient(options.DatabaseConnection));
} }
} }
} }
\ No newline at end of file
...@@ -29,5 +29,7 @@ namespace DotNetCore.CAP.MongoDB ...@@ -29,5 +29,7 @@ namespace DotNetCore.CAP.MongoDB
/// Default value: "published" /// Default value: "published"
/// </summary> /// </summary>
public string PublishedCollection { get; set; } = "cap.published"; public string PublishedCollection { get; set; } = "cap.published";
internal string Version { get; set; }
} }
} }
\ No newline at end of file
...@@ -27,6 +27,8 @@ namespace Microsoft.Extensions.DependencyInjection ...@@ -27,6 +27,8 @@ namespace Microsoft.Extensions.DependencyInjection
throw new ArgumentNullException(nameof(configure)); throw new ArgumentNullException(nameof(configure));
} }
configure += x => x.Version = options.Version;
options.RegisterExtension(new MongoDBCapOptionsExtension(configure)); options.RegisterExtension(new MongoDBCapOptionsExtension(configure));
return options; return options;
......
...@@ -16,9 +16,8 @@ ...@@ -16,9 +16,8 @@
</ItemGroup> </ItemGroup>
<ItemGroup> <ItemGroup>
<PackageReference Include="MongoDB.Bson" Version="2.7.0" /> <PackageReference Include="MongoDB.Bson" Version="2.7.2" />
<PackageReference Include="MongoDB.Driver" Version="2.7.0" /> <PackageReference Include="MongoDB.Driver" Version="2.7.2" />
<PackageReference Include="MongoDB.Driver.Core" Version="2.7.0" />
</ItemGroup> </ItemGroup>
</Project> </Project>
...@@ -35,15 +35,28 @@ namespace DotNetCore.CAP.MongoDB ...@@ -35,15 +35,28 @@ namespace DotNetCore.CAP.MongoDB
var collection = _client var collection = _client
.GetDatabase(_options.DatabaseName) .GetDatabase(_options.DatabaseName)
.GetCollection<CapPublishedMessage>(_options.PublishedCollection); .GetCollection<PublishedMessage>(_options.PublishedCollection);
var store = new PublishedMessage()
{
Id = message.Id,
Name = message.Name,
Content = message.Content,
Added = message.Added,
StatusName = message.StatusName,
ExpiresAt = message.ExpiresAt,
Retries = message.Retries,
Version = _options.Version,
};
if (NotUseTransaction) if (NotUseTransaction)
{ {
return collection.InsertOneAsync(message, insertOptions, cancel);
return collection.InsertOneAsync(store, insertOptions, cancel);
} }
var dbTrans = (IClientSessionHandle) transaction.DbTransaction; var dbTrans = (IClientSessionHandle) transaction.DbTransaction;
return collection.InsertOneAsync(dbTrans, message, insertOptions, cancel); return collection.InsertOneAsync(dbTrans, store, insertOptions, cancel);
} }
} }
} }
\ No newline at end of file
...@@ -27,9 +27,9 @@ namespace DotNetCore.CAP.MongoDB ...@@ -27,9 +27,9 @@ namespace DotNetCore.CAP.MongoDB
public bool ChangePublishedState(long messageId, string state) public bool ChangePublishedState(long messageId, string state)
{ {
var collection = _database.GetCollection<CapPublishedMessage>(_options.PublishedCollection); var collection = _database.GetCollection<PublishedMessage>(_options.PublishedCollection);
var updateDef = Builders<CapPublishedMessage> var updateDef = Builders<PublishedMessage>
.Update.Inc(x => x.Retries, 1) .Update.Inc(x => x.Retries, 1)
.Set(x => x.ExpiresAt, null) .Set(x => x.ExpiresAt, null)
.Set(x => x.StatusName, state); .Set(x => x.StatusName, state);
...@@ -42,9 +42,9 @@ namespace DotNetCore.CAP.MongoDB ...@@ -42,9 +42,9 @@ namespace DotNetCore.CAP.MongoDB
public bool ChangeReceivedState(long messageId, string state) public bool ChangeReceivedState(long messageId, string state)
{ {
var collection = _database.GetCollection<CapReceivedMessage>(_options.ReceivedCollection); var collection = _database.GetCollection<ReceivedMessage>(_options.ReceivedCollection);
var updateDef = Builders<CapReceivedMessage> var updateDef = Builders<ReceivedMessage>
.Update.Inc(x => x.Retries, 1) .Update.Inc(x => x.Retries, 1)
.Set(x => x.ExpiresAt, null) .Set(x => x.ExpiresAt, null)
.Set(x => x.StatusName, state); .Set(x => x.StatusName, state);
...@@ -62,35 +62,39 @@ namespace DotNetCore.CAP.MongoDB ...@@ -62,35 +62,39 @@ namespace DotNetCore.CAP.MongoDB
public async Task<CapPublishedMessage> GetPublishedMessageAsync(long id) public async Task<CapPublishedMessage> GetPublishedMessageAsync(long id)
{ {
var collection = _database.GetCollection<CapPublishedMessage>(_options.PublishedCollection); var collection = _database.GetCollection<PublishedMessage>(_options.PublishedCollection);
return await collection.Find(x => x.Id == id).FirstOrDefaultAsync(); return await collection.Find(x => x.Id == id).FirstOrDefaultAsync();
} }
public async Task<IEnumerable<CapPublishedMessage>> GetPublishedMessagesOfNeedRetry() public async Task<IEnumerable<CapPublishedMessage>> GetPublishedMessagesOfNeedRetry()
{ {
var fourMinsAgo = DateTime.Now.AddMinutes(-4); var fourMinsAgo = DateTime.Now.AddMinutes(-4);
var collection = _database.GetCollection<CapPublishedMessage>(_options.PublishedCollection); var collection = _database.GetCollection<PublishedMessage>(_options.PublishedCollection);
return await collection return await collection
.Find(x => x.Retries < _capOptions.FailedRetryCount && x.Added < fourMinsAgo && .Find(x => x.Retries < _capOptions.FailedRetryCount
(x.StatusName == StatusName.Failed || x.StatusName == StatusName.Scheduled)) && x.Added < fourMinsAgo
&& x.Version == _capOptions.Version
&& (x.StatusName == StatusName.Failed || x.StatusName == StatusName.Scheduled))
.Limit(200) .Limit(200)
.ToListAsync(); .ToListAsync();
} }
public async Task<CapReceivedMessage> GetReceivedMessageAsync(long id) public async Task<CapReceivedMessage> GetReceivedMessageAsync(long id)
{ {
var collection = _database.GetCollection<CapReceivedMessage>(_options.ReceivedCollection); var collection = _database.GetCollection<ReceivedMessage>(_options.ReceivedCollection);
return await collection.Find(x => x.Id == id).FirstOrDefaultAsync(); return await collection.Find(x => x.Id == id).FirstOrDefaultAsync();
} }
public async Task<IEnumerable<CapReceivedMessage>> GetReceivedMessagesOfNeedRetry() public async Task<IEnumerable<CapReceivedMessage>> GetReceivedMessagesOfNeedRetry()
{ {
var fourMinsAgo = DateTime.Now.AddMinutes(-4); var fourMinsAgo = DateTime.Now.AddMinutes(-4);
var collection = _database.GetCollection<CapReceivedMessage>(_options.ReceivedCollection); var collection = _database.GetCollection<ReceivedMessage>(_options.ReceivedCollection);
return await collection return await collection
.Find(x => x.Retries < _capOptions.FailedRetryCount && x.Added < fourMinsAgo && .Find(x => x.Retries < _capOptions.FailedRetryCount
(x.StatusName == StatusName.Failed || x.StatusName == StatusName.Scheduled)) && x.Added < fourMinsAgo
&& x.Version == _capOptions.Version
&& (x.StatusName == StatusName.Failed || x.StatusName == StatusName.Scheduled))
.Limit(200) .Limit(200)
.ToListAsync(); .ToListAsync();
} }
...@@ -101,10 +105,22 @@ namespace DotNetCore.CAP.MongoDB ...@@ -101,10 +105,22 @@ namespace DotNetCore.CAP.MongoDB
{ {
throw new ArgumentNullException(nameof(message)); throw new ArgumentNullException(nameof(message));
} }
var collection = _database.GetCollection<ReceivedMessage>(_options.ReceivedCollection);
var collection = _database.GetCollection<CapReceivedMessage>(_options.ReceivedCollection); var store = new ReceivedMessage()
{
collection.InsertOne(message); Id = message.Id,
Group = message.Group,
Name = message.Name,
Content = message.Content,
Added = message.Added,
StatusName = message.StatusName,
ExpiresAt = message.ExpiresAt,
Retries = message.Retries,
Version = _capOptions.Version
};
collection.InsertOne(store);
} }
} }
} }
\ No newline at end of file
using DotNetCore.CAP.Models;
namespace DotNetCore.CAP.MongoDB
{
internal class ReceivedMessage : CapReceivedMessage
{
public string Version { get; set; }
}
internal class PublishedMessage : CapPublishedMessage
{
public string Version { get; set; }
}
}
...@@ -19,5 +19,10 @@ namespace DotNetCore.CAP ...@@ -19,5 +19,10 @@ namespace DotNetCore.CAP
/// EF db context type. /// EF db context type.
/// </summary> /// </summary>
internal Type DbContextType { get; set; } internal Type DbContextType { get; set; }
/// <summary>
/// Data version
/// </summary>
internal string Version { get; set; }
} }
} }
\ No newline at end of file
...@@ -22,6 +22,7 @@ namespace Microsoft.Extensions.DependencyInjection ...@@ -22,6 +22,7 @@ namespace Microsoft.Extensions.DependencyInjection
throw new ArgumentNullException(nameof(configure)); throw new ArgumentNullException(nameof(configure));
} }
configure += x => x.Version = options.Version;
options.RegisterExtension(new MySqlCapOptionsExtension(configure)); options.RegisterExtension(new MySqlCapOptionsExtension(configure));
...@@ -46,6 +47,7 @@ namespace Microsoft.Extensions.DependencyInjection ...@@ -46,6 +47,7 @@ namespace Microsoft.Extensions.DependencyInjection
{ {
configure(x); configure(x);
x.DbContextType = typeof(TContext); x.DbContextType = typeof(TContext);
x.Version = options.Version;
})); }));
return options; return options;
......
...@@ -13,9 +13,9 @@ ...@@ -13,9 +13,9 @@
<ItemGroup> <ItemGroup>
<PackageReference Include="Dapper" Version="1.50.5" /> <PackageReference Include="Dapper" Version="1.50.5" />
<PackageReference Include="Microsoft.EntityFrameworkCore" Version="2.1.0" /> <PackageReference Include="Microsoft.EntityFrameworkCore" Version="2.2.0" />
<PackageReference Include="Microsoft.EntityFrameworkCore.Relational" Version="2.1.0" /> <PackageReference Include="Microsoft.EntityFrameworkCore.Relational" Version="2.2.0" />
<PackageReference Include="MySqlConnector" Version="0.46.1" /> <PackageReference Include="MySqlConnector" Version="0.47.1" />
</ItemGroup> </ItemGroup>
<ItemGroup> <ItemGroup>
......
...@@ -55,7 +55,8 @@ namespace DotNetCore.CAP.MySql ...@@ -55,7 +55,8 @@ namespace DotNetCore.CAP.MySql
private string PrepareSql() private string PrepareSql()
{ {
return return
$"INSERT INTO `{_options.TableNamePrefix}.published` (`Id`,`Name`,`Content`,`Retries`,`Added`,`ExpiresAt`,`StatusName`)VALUES(@Id,@Name,@Content,@Retries,@Added,@ExpiresAt,@StatusName);"; $"INSERT INTO `{_options.TableNamePrefix}.published` (`Id`,`Version`,`Name`,`Content`,`Retries`,`Added`,`ExpiresAt`,`StatusName`)" +
$"VALUES(@Id,'{_options.Version}',@Name,@Content,@Retries,@Added,@ExpiresAt,@StatusName);";
} }
#endregion private methods #endregion private methods
......
...@@ -48,7 +48,7 @@ namespace DotNetCore.CAP.MySql ...@@ -48,7 +48,7 @@ namespace DotNetCore.CAP.MySql
var sql = CreateDbTablesScript(_options.TableNamePrefix); var sql = CreateDbTablesScript(_options.TableNamePrefix);
using (var connection = new MySqlConnection(_options.ConnectionString)) using (var connection = new MySqlConnection(_options.ConnectionString))
{ {
await connection.ExecuteAsync(sql); await connection.ExecuteAsync(sql);
} }
_logger.LogDebug("Ensuring all create database tables script are applied."); _logger.LogDebug("Ensuring all create database tables script are applied.");
...@@ -60,6 +60,7 @@ namespace DotNetCore.CAP.MySql ...@@ -60,6 +60,7 @@ namespace DotNetCore.CAP.MySql
$@" $@"
CREATE TABLE IF NOT EXISTS `{prefix}.received` ( CREATE TABLE IF NOT EXISTS `{prefix}.received` (
`Id` bigint NOT NULL, `Id` bigint NOT NULL,
`Version` varchar(20) DEFAULT NULL,
`Name` varchar(400) NOT NULL, `Name` varchar(400) NOT NULL,
`Group` varchar(200) DEFAULT NULL, `Group` varchar(200) DEFAULT NULL,
`Content` longtext, `Content` longtext,
...@@ -72,6 +73,7 @@ CREATE TABLE IF NOT EXISTS `{prefix}.received` ( ...@@ -72,6 +73,7 @@ CREATE TABLE IF NOT EXISTS `{prefix}.received` (
CREATE TABLE IF NOT EXISTS `{prefix}.published` ( CREATE TABLE IF NOT EXISTS `{prefix}.published` (
`Id` bigint NOT NULL, `Id` bigint NOT NULL,
`Version` varchar(20) DEFAULT NULL,
`Name` varchar(200) NOT NULL, `Name` varchar(200) NOT NULL,
`Content` longtext, `Content` longtext,
`Retries` int(11) DEFAULT NULL, `Retries` int(11) DEFAULT NULL,
......
...@@ -44,7 +44,7 @@ namespace DotNetCore.CAP.MySql ...@@ -44,7 +44,7 @@ namespace DotNetCore.CAP.MySql
{ {
var fourMinsAgo = DateTime.Now.AddMinutes(-4).ToString("O"); var fourMinsAgo = DateTime.Now.AddMinutes(-4).ToString("O");
var sql = var sql =
$"SELECT * FROM `{_prefix}.published` WHERE `Retries`<{_capOptions.FailedRetryCount} AND `Added`<'{fourMinsAgo}' AND (`StatusName` = '{StatusName.Failed}' OR `StatusName` = '{StatusName.Scheduled}') LIMIT 200;"; $"SELECT * FROM `{_prefix}.published` WHERE `Retries`<{_capOptions.FailedRetryCount} AND `Version`='{_capOptions.Version}' AND `Added`<'{fourMinsAgo}' AND (`StatusName` = '{StatusName.Failed}' OR `StatusName` = '{StatusName.Scheduled}') LIMIT 200;";
using (var connection = new MySqlConnection(Options.ConnectionString)) using (var connection = new MySqlConnection(Options.ConnectionString))
{ {
...@@ -60,8 +60,8 @@ namespace DotNetCore.CAP.MySql ...@@ -60,8 +60,8 @@ namespace DotNetCore.CAP.MySql
} }
var sql = $@" var sql = $@"
INSERT INTO `{_prefix}.received`(`Id`,`Name`,`Group`,`Content`,`Retries`,`Added`,`ExpiresAt`,`StatusName`) INSERT INTO `{_prefix}.received`(`Id`,`Version`,`Name`,`Group`,`Content`,`Retries`,`Added`,`ExpiresAt`,`StatusName`)
VALUES(@Id,@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName);"; VALUES(@Id,'{_capOptions.Version}',@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName);";
using (var connection = new MySqlConnection(Options.ConnectionString)) using (var connection = new MySqlConnection(Options.ConnectionString))
{ {
...@@ -82,7 +82,7 @@ VALUES(@Id,@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName);"; ...@@ -82,7 +82,7 @@ VALUES(@Id,@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName);";
{ {
var fourMinsAgo = DateTime.Now.AddMinutes(-4).ToString("O"); var fourMinsAgo = DateTime.Now.AddMinutes(-4).ToString("O");
var sql = var sql =
$"SELECT * FROM `{_prefix}.received` WHERE `Retries`<{_capOptions.FailedRetryCount} AND `Added`<'{fourMinsAgo}' AND (`StatusName` = '{StatusName.Failed}' OR `StatusName` = '{StatusName.Scheduled}') LIMIT 200;"; $"SELECT * FROM `{_prefix}.received` WHERE `Retries`<{_capOptions.FailedRetryCount} AND `Version`='{_capOptions.Version}' AND `Added`<'{fourMinsAgo}' AND (`StatusName` = '{StatusName.Failed}' OR `StatusName` = '{StatusName.Scheduled}') LIMIT 200;";
using (var connection = new MySqlConnection(Options.ConnectionString)) using (var connection = new MySqlConnection(Options.ConnectionString))
{ {
return await connection.QueryAsync<CapReceivedMessage>(sql); return await connection.QueryAsync<CapReceivedMessage>(sql);
......
...@@ -17,5 +17,10 @@ namespace DotNetCore.CAP ...@@ -17,5 +17,10 @@ namespace DotNetCore.CAP
public string Schema { get; set; } = DefaultSchema; public string Schema { get; set; } = DefaultSchema;
internal Type DbContextType { get; set; } internal Type DbContextType { get; set; }
/// <summary>
/// Data version
/// </summary>
internal string Version { get; set; }
} }
} }
\ No newline at end of file
...@@ -22,6 +22,8 @@ namespace Microsoft.Extensions.DependencyInjection ...@@ -22,6 +22,8 @@ namespace Microsoft.Extensions.DependencyInjection
throw new ArgumentNullException(nameof(configure)); throw new ArgumentNullException(nameof(configure));
} }
configure += x => x.Version = options.Version;
options.RegisterExtension(new PostgreSqlCapOptionsExtension(configure)); options.RegisterExtension(new PostgreSqlCapOptionsExtension(configure));
return options; return options;
...@@ -44,6 +46,7 @@ namespace Microsoft.Extensions.DependencyInjection ...@@ -44,6 +46,7 @@ namespace Microsoft.Extensions.DependencyInjection
options.RegisterExtension(new PostgreSqlCapOptionsExtension(x => options.RegisterExtension(new PostgreSqlCapOptionsExtension(x =>
{ {
configure(x); configure(x);
x.Version = options.Version;
x.DbContextType = typeof(TContext); x.DbContextType = typeof(TContext);
})); }));
......
...@@ -13,8 +13,8 @@ ...@@ -13,8 +13,8 @@
<ItemGroup> <ItemGroup>
<PackageReference Include="Dapper" Version="1.50.5" /> <PackageReference Include="Dapper" Version="1.50.5" />
<PackageReference Include="Microsoft.EntityFrameworkCore" Version="2.1.0" /> <PackageReference Include="Microsoft.EntityFrameworkCore" Version="2.2.0" />
<PackageReference Include="Microsoft.EntityFrameworkCore.Relational" Version="2.1.0" /> <PackageReference Include="Microsoft.EntityFrameworkCore.Relational" Version="2.2.0" />
<PackageReference Include="Npgsql" Version="4.0.3" /> <PackageReference Include="Npgsql" Version="4.0.3" />
</ItemGroup> </ItemGroup>
......
...@@ -55,7 +55,7 @@ namespace DotNetCore.CAP.PostgreSql ...@@ -55,7 +55,7 @@ namespace DotNetCore.CAP.PostgreSql
private string PrepareSql() private string PrepareSql()
{ {
return return
$"INSERT INTO \"{_options.Schema}\".\"published\" (\"Id\",\"Name\",\"Content\",\"Retries\",\"Added\",\"ExpiresAt\",\"StatusName\")VALUES(@Id,@Name,@Content,@Retries,@Added,@ExpiresAt,@StatusName);"; $"INSERT INTO \"{_options.Schema}\".\"published\" (\"Id\",\"Version\",\"Name\",\"Content\",\"Retries\",\"Added\",\"ExpiresAt\",\"StatusName\")VALUES(@Id,@Version,@Name,@Content,@Retries,@Added,@ExpiresAt,@StatusName);";
} }
private IDbConnection InitDbConnection() private IDbConnection InitDbConnection()
......
...@@ -102,6 +102,7 @@ CREATE SCHEMA IF NOT EXISTS ""{schema}""; ...@@ -102,6 +102,7 @@ CREATE SCHEMA IF NOT EXISTS ""{schema}"";
CREATE TABLE IF NOT EXISTS ""{schema}"".""received""( CREATE TABLE IF NOT EXISTS ""{schema}"".""received""(
""Id"" BIGINT PRIMARY KEY NOT NULL, ""Id"" BIGINT PRIMARY KEY NOT NULL,
""Version"" VARCHAR(20) NOT NULL,
""Name"" VARCHAR(200) NOT NULL, ""Name"" VARCHAR(200) NOT NULL,
""Group"" VARCHAR(200) NULL, ""Group"" VARCHAR(200) NULL,
""Content"" TEXT NULL, ""Content"" TEXT NULL,
...@@ -113,13 +114,18 @@ CREATE TABLE IF NOT EXISTS ""{schema}"".""received""( ...@@ -113,13 +114,18 @@ CREATE TABLE IF NOT EXISTS ""{schema}"".""received""(
CREATE TABLE IF NOT EXISTS ""{schema}"".""published""( CREATE TABLE IF NOT EXISTS ""{schema}"".""published""(
""Id"" BIGINT PRIMARY KEY NOT NULL, ""Id"" BIGINT PRIMARY KEY NOT NULL,
""Version"" VARCHAR(20) NOT NULL,
""Name"" VARCHAR(200) NOT NULL, ""Name"" VARCHAR(200) NOT NULL,
""Content"" TEXT NULL, ""Content"" TEXT NULL,
""Retries"" INT NOT NULL, ""Retries"" INT NOT NULL,
""Added"" TIMESTAMP NOT NULL, ""Added"" TIMESTAMP NOT NULL,
""ExpiresAt"" TIMESTAMP NULL, ""ExpiresAt"" TIMESTAMP NULL,
""StatusName"" VARCHAR(50) NOT NULL ""StatusName"" VARCHAR(50) NOT NULL
);"; );
ALTER TABLE ""{schema}"".""received"" ADD COLUMN IF NOT EXISTS ""Version"" VARCHAR(20) NOT NULL;
ALTER TABLE ""{schema}"".""published"" ADD COLUMN IF NOT EXISTS ""Version"" VARCHAR(20) NOT NULL;
";
return batchSql; return batchSql;
} }
} }
......
...@@ -42,7 +42,7 @@ namespace DotNetCore.CAP.PostgreSql ...@@ -42,7 +42,7 @@ namespace DotNetCore.CAP.PostgreSql
{ {
var fourMinsAgo = DateTime.Now.AddMinutes(-4).ToString("O"); var fourMinsAgo = DateTime.Now.AddMinutes(-4).ToString("O");
var sql = var sql =
$"SELECT * FROM \"{Options.Schema}\".\"published\" WHERE \"Retries\"<{_capOptions.FailedRetryCount} AND \"Added\"<'{fourMinsAgo}' AND (\"StatusName\"='{StatusName.Failed}' OR \"StatusName\"='{StatusName.Scheduled}') LIMIT 200;"; $"SELECT * FROM \"{Options.Schema}\".\"published\" WHERE \"Retries\"<{_capOptions.FailedRetryCount} AND \"Version\"='{_capOptions.Version}' AND \"Added\"<'{fourMinsAgo}' AND (\"StatusName\"='{StatusName.Failed}' OR \"StatusName\"='{StatusName.Scheduled}') LIMIT 200;";
using (var connection = new NpgsqlConnection(Options.ConnectionString)) using (var connection = new NpgsqlConnection(Options.ConnectionString))
{ {
...@@ -58,7 +58,7 @@ namespace DotNetCore.CAP.PostgreSql ...@@ -58,7 +58,7 @@ namespace DotNetCore.CAP.PostgreSql
} }
var sql = var sql =
$"INSERT INTO \"{Options.Schema}\".\"received\"(\"Id\",\"Name\",\"Group\",\"Content\",\"Retries\",\"Added\",\"ExpiresAt\",\"StatusName\")VALUES(@Id,@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName) RETURNING \"Id\";"; $"INSERT INTO \"{Options.Schema}\".\"received\"(\"Id\",\"Version\",\"Name\",\"Group\",\"Content\",\"Retries\",\"Added\",\"ExpiresAt\",\"StatusName\")VALUES(@Id,'{_capOptions.Version}',@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName) RETURNING \"Id\";";
using (var connection = new NpgsqlConnection(Options.ConnectionString)) using (var connection = new NpgsqlConnection(Options.ConnectionString))
{ {
...@@ -79,7 +79,7 @@ namespace DotNetCore.CAP.PostgreSql ...@@ -79,7 +79,7 @@ namespace DotNetCore.CAP.PostgreSql
{ {
var fourMinsAgo = DateTime.Now.AddMinutes(-4).ToString("O"); var fourMinsAgo = DateTime.Now.AddMinutes(-4).ToString("O");
var sql = var sql =
$"SELECT * FROM \"{Options.Schema}\".\"received\" WHERE \"Retries\"<{_capOptions.FailedRetryCount} AND \"Added\"<'{fourMinsAgo}' AND (\"StatusName\"='{StatusName.Failed}' OR \"StatusName\"='{StatusName.Scheduled}') LIMIT 200;"; $"SELECT * FROM \"{Options.Schema}\".\"received\" WHERE \"Retries\"<{_capOptions.FailedRetryCount} AND \"Version\"='{_capOptions.Version}' AND \"Added\"<'{fourMinsAgo}' AND (\"StatusName\"='{StatusName.Failed}' OR \"StatusName\"='{StatusName.Scheduled}') LIMIT 200;";
using (var connection = new NpgsqlConnection(Options.ConnectionString)) using (var connection = new NpgsqlConnection(Options.ConnectionString))
{ {
return await connection.QueryAsync<CapReceivedMessage>(sql); return await connection.QueryAsync<CapReceivedMessage>(sql);
...@@ -107,9 +107,5 @@ namespace DotNetCore.CAP.PostgreSql ...@@ -107,9 +107,5 @@ namespace DotNetCore.CAP.PostgreSql
return connection.Execute(sql) > 0; return connection.Execute(sql) > 0;
} }
} }
public void Dispose()
{
}
} }
} }
\ No newline at end of file
...@@ -22,7 +22,9 @@ namespace DotNetCore.CAP.RabbitMQ ...@@ -22,7 +22,9 @@ namespace DotNetCore.CAP.RabbitMQ
private int _count; private int _count;
private int _maxSize; private int _maxSize;
public ConnectionChannelPool(ILogger<ConnectionChannelPool> logger, RabbitMQOptions options) public ConnectionChannelPool(ILogger<ConnectionChannelPool> logger,
CapOptions capOptions,
RabbitMQOptions options)
{ {
_logger = logger; _logger = logger;
_maxSize = DefaultPoolSize; _maxSize = DefaultPoolSize;
...@@ -30,10 +32,17 @@ namespace DotNetCore.CAP.RabbitMQ ...@@ -30,10 +32,17 @@ namespace DotNetCore.CAP.RabbitMQ
_connectionActivator = CreateConnection(options); _connectionActivator = CreateConnection(options);
HostAddress = options.HostName + ":" + options.Port; HostAddress = options.HostName + ":" + options.Port;
Exchange = options.ExchangeName;
_logger.LogDebug("RabbitMQ configuration of CAP :\r\n {0}", if (CapOptions.DefaultVersion == capOptions.Version)
JsonConvert.SerializeObject(options, Formatting.Indented)); {
Exchange = options.ExchangeName;
}
else
{
Exchange = options.ExchangeName + "." + capOptions.Version;
}
_logger.LogDebug("RabbitMQ configuration of CAP :\r\n {0}", JsonConvert.SerializeObject(options, Formatting.Indented));
} }
IModel IConnectionChannelPool.Rent() IModel IConnectionChannelPool.Rent()
......
...@@ -8,6 +8,7 @@ using DotNetCore.CAP.Internal; ...@@ -8,6 +8,7 @@ using DotNetCore.CAP.Internal;
using DotNetCore.CAP.Processor.States; using DotNetCore.CAP.Processor.States;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
using RabbitMQ.Client; using RabbitMQ.Client;
using RabbitMQ.Client.Framing;
namespace DotNetCore.CAP.RabbitMQ namespace DotNetCore.CAP.RabbitMQ
{ {
...@@ -33,8 +34,13 @@ namespace DotNetCore.CAP.RabbitMQ ...@@ -33,8 +34,13 @@ namespace DotNetCore.CAP.RabbitMQ
try try
{ {
var body = Encoding.UTF8.GetBytes(content); var body = Encoding.UTF8.GetBytes(content);
var props = new BasicProperties()
{
DeliveryMode = 2
};
channel.ExchangeDeclare(_exchange, RabbitMQOptions.ExchangeType, true); channel.ExchangeDeclare(_exchange, RabbitMQOptions.ExchangeType, true);
channel.BasicPublish(_exchange, keyName, null, body); channel.BasicPublish(_exchange, keyName, props, body);
_logger.LogDebug($"RabbitMQ topic message [{keyName}] has been published."); _logger.LogDebug($"RabbitMQ topic message [{keyName}] has been published.");
......
...@@ -28,7 +28,7 @@ namespace DotNetCore.CAP.RabbitMQ ...@@ -28,7 +28,7 @@ namespace DotNetCore.CAP.RabbitMQ
_queueName = queueName; _queueName = queueName;
_connectionChannelPool = connectionChannelPool; _connectionChannelPool = connectionChannelPool;
_rabbitMQOptions = options; _rabbitMQOptions = options;
_exchageName = options.ExchangeName; _exchageName = connectionChannelPool.Exchange;
InitClient(); InitClient();
} }
......
...@@ -23,6 +23,11 @@ namespace DotNetCore.CAP ...@@ -23,6 +23,11 @@ namespace DotNetCore.CAP
internal bool IsSqlServer2008 { get; set; } internal bool IsSqlServer2008 { get; set; }
/// <summary>
/// Data version
/// </summary>
internal string Version { get; set; }
public EFOptions UseSqlServer2008() public EFOptions UseSqlServer2008()
{ {
IsSqlServer2008 = true; IsSqlServer2008 = true;
......
...@@ -22,6 +22,8 @@ namespace Microsoft.Extensions.DependencyInjection ...@@ -22,6 +22,8 @@ namespace Microsoft.Extensions.DependencyInjection
throw new ArgumentNullException(nameof(configure)); throw new ArgumentNullException(nameof(configure));
} }
configure += x => x.Version = options.Version;
options.RegisterExtension(new SqlServerCapOptionsExtension(configure)); options.RegisterExtension(new SqlServerCapOptionsExtension(configure));
return options; return options;
...@@ -44,6 +46,7 @@ namespace Microsoft.Extensions.DependencyInjection ...@@ -44,6 +46,7 @@ namespace Microsoft.Extensions.DependencyInjection
options.RegisterExtension(new SqlServerCapOptionsExtension(x => options.RegisterExtension(new SqlServerCapOptionsExtension(x =>
{ {
configure(x); configure(x);
x.Version = options.Version;
x.DbContextType = typeof(TContext); x.DbContextType = typeof(TContext);
})); }));
......
...@@ -14,9 +14,9 @@ ...@@ -14,9 +14,9 @@
<ItemGroup> <ItemGroup>
<PackageReference Include="Dapper" Version="1.50.5" /> <PackageReference Include="Dapper" Version="1.50.5" />
<PackageReference Include="Microsoft.EntityFrameworkCore" Version="2.1.0" /> <PackageReference Include="Microsoft.EntityFrameworkCore" Version="2.2.0" />
<PackageReference Include="Microsoft.EntityFrameworkCore.Relational" Version="2.1.0" /> <PackageReference Include="Microsoft.EntityFrameworkCore.Relational" Version="2.2.0" />
<PackageReference Include="System.Data.SqlClient" Version="4.5.0" /> <PackageReference Include="System.Data.SqlClient" Version="4.6.0" />
</ItemGroup> </ItemGroup>
<ItemGroup> <ItemGroup>
......
...@@ -55,7 +55,7 @@ namespace DotNetCore.CAP.SqlServer ...@@ -55,7 +55,7 @@ namespace DotNetCore.CAP.SqlServer
private string PrepareSql() private string PrepareSql()
{ {
return return
$"INSERT INTO {_options.Schema}.[Published] ([Id],[Name],[Content],[Retries],[Added],[ExpiresAt],[StatusName])VALUES(@Id,@Name,@Content,@Retries,@Added,@ExpiresAt,@StatusName);"; $"INSERT INTO {_options.Schema}.[Published] ([Id],[Version],[Name],[Content],[Retries],[Added],[ExpiresAt],[StatusName])VALUES(@Id,'{_options.Version}',@Name,@Content,@Retries,@Added,@ExpiresAt,@StatusName);";
} }
#endregion private methods #endregion private methods
......
...@@ -75,6 +75,7 @@ IF OBJECT_ID(N'[{schema}].[Received]',N'U') IS NULL ...@@ -75,6 +75,7 @@ IF OBJECT_ID(N'[{schema}].[Received]',N'U') IS NULL
BEGIN BEGIN
CREATE TABLE [{schema}].[Received]( CREATE TABLE [{schema}].[Received](
[Id] [bigint] NOT NULL, [Id] [bigint] NOT NULL,
[Version] [nvarchar](20) NOT NULL,
[Name] [nvarchar](200) NOT NULL, [Name] [nvarchar](200) NOT NULL,
[Group] [nvarchar](200) NULL, [Group] [nvarchar](200) NULL,
[Content] [nvarchar](max) NULL, [Content] [nvarchar](max) NULL,
...@@ -93,6 +94,7 @@ IF OBJECT_ID(N'[{schema}].[Published]',N'U') IS NULL ...@@ -93,6 +94,7 @@ IF OBJECT_ID(N'[{schema}].[Published]',N'U') IS NULL
BEGIN BEGIN
CREATE TABLE [{schema}].[Published]( CREATE TABLE [{schema}].[Published](
[Id] [bigint] NOT NULL, [Id] [bigint] NOT NULL,
[Version] [nvarchar](20) NOT NULL,
[Name] [nvarchar](200) NOT NULL, [Name] [nvarchar](200) NOT NULL,
[Content] [nvarchar](max) NULL, [Content] [nvarchar](max) NULL,
[Retries] [int] NOT NULL, [Retries] [int] NOT NULL,
......
...@@ -42,7 +42,7 @@ namespace DotNetCore.CAP.SqlServer ...@@ -42,7 +42,7 @@ namespace DotNetCore.CAP.SqlServer
{ {
var fourMinsAgo = DateTime.Now.AddMinutes(-4).ToString("O"); var fourMinsAgo = DateTime.Now.AddMinutes(-4).ToString("O");
var sql = var sql =
$"SELECT TOP (200) * FROM [{Options.Schema}].[Published] WITH (readpast) WHERE Retries<{_capOptions.FailedRetryCount} AND Added<'{fourMinsAgo}' AND (StatusName = '{StatusName.Failed}' OR StatusName = '{StatusName.Scheduled}')"; $"SELECT TOP (200) * FROM [{Options.Schema}].[Published] WITH (readpast) WHERE Retries<{_capOptions.FailedRetryCount} AND Version='{_capOptions.Version}' AND Added<'{fourMinsAgo}' AND (StatusName = '{StatusName.Failed}' OR StatusName = '{StatusName.Scheduled}')";
using (var connection = new SqlConnection(Options.ConnectionString)) using (var connection = new SqlConnection(Options.ConnectionString))
{ {
...@@ -58,8 +58,8 @@ namespace DotNetCore.CAP.SqlServer ...@@ -58,8 +58,8 @@ namespace DotNetCore.CAP.SqlServer
} }
var sql = $@" var sql = $@"
INSERT INTO [{Options.Schema}].[Received]([Id],[Name],[Group],[Content],[Retries],[Added],[ExpiresAt],[StatusName]) INSERT INTO [{Options.Schema}].[Received]([Id],[Version],[Name],[Group],[Content],[Retries],[Added],[ExpiresAt],[StatusName])
VALUES(@Id,@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName);"; VALUES(@Id,'{_capOptions.Version}',@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName);";
using (var connection = new SqlConnection(Options.ConnectionString)) using (var connection = new SqlConnection(Options.ConnectionString))
{ {
...@@ -80,7 +80,7 @@ VALUES(@Id,@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName);"; ...@@ -80,7 +80,7 @@ VALUES(@Id,@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName);";
{ {
var fourMinsAgo = DateTime.Now.AddMinutes(-4).ToString("O"); var fourMinsAgo = DateTime.Now.AddMinutes(-4).ToString("O");
var sql = var sql =
$"SELECT TOP (200) * FROM [{Options.Schema}].[Received] WITH (readpast) WHERE Retries<{_capOptions.FailedRetryCount} AND Added<'{fourMinsAgo}' AND (StatusName = '{StatusName.Failed}' OR StatusName = '{StatusName.Scheduled}')"; $"SELECT TOP (200) * FROM [{Options.Schema}].[Received] WITH (readpast) WHERE Retries<{_capOptions.FailedRetryCount} AND Version='{_capOptions.Version}' AND Added<'{fourMinsAgo}' AND (StatusName = '{StatusName.Failed}' OR StatusName = '{StatusName.Scheduled}')";
using (var connection = new SqlConnection(Options.ConnectionString)) using (var connection = new SqlConnection(Options.ConnectionString))
{ {
return await connection.QueryAsync<CapReceivedMessage>(sql); return await connection.QueryAsync<CapReceivedMessage>(sql);
......
...@@ -28,6 +28,11 @@ namespace DotNetCore.CAP ...@@ -28,6 +28,11 @@ namespace DotNetCore.CAP
/// </summary> /// </summary>
public const int DefaultFailedRetryCount = 50; public const int DefaultFailedRetryCount = 50;
/// <summary>
/// Default version
/// </summary>
public const string DefaultVersion = "v1";
public CapOptions() public CapOptions()
{ {
...@@ -35,6 +40,7 @@ namespace DotNetCore.CAP ...@@ -35,6 +40,7 @@ namespace DotNetCore.CAP
FailedRetryInterval = DefaultFailedMessageWaitingInterval; FailedRetryInterval = DefaultFailedMessageWaitingInterval;
FailedRetryCount = DefaultFailedRetryCount; FailedRetryCount = DefaultFailedRetryCount;
Extensions = new List<ICapOptionsExtension>(); Extensions = new List<ICapOptionsExtension>();
Version = DefaultVersion;
DefaultGroup = "cap.queue." + Assembly.GetEntryAssembly().GetName().Name.ToLower(); DefaultGroup = "cap.queue." + Assembly.GetEntryAssembly().GetName().Name.ToLower();
} }
...@@ -45,6 +51,11 @@ namespace DotNetCore.CAP ...@@ -45,6 +51,11 @@ namespace DotNetCore.CAP
/// </summary> /// </summary>
public string DefaultGroup { get; set; } public string DefaultGroup { get; set; }
/// <summary>
/// The default version of the message, configured to isolate data in the same instance. The length must not exceed 20
/// </summary>
public string Version { get; set; }
/// <summary> /// <summary>
/// Sent or received succeed message after time span of due, then the message will be deleted at due time. /// Sent or received succeed message after time span of due, then the message will be deleted at due time.
/// Default is 24*3600 seconds. /// Default is 24*3600 seconds.
......
...@@ -185,6 +185,15 @@ namespace DotNetCore.CAP.Dashboard.Resources { ...@@ -185,6 +185,15 @@ namespace DotNetCore.CAP.Dashboard.Resources {
return ResourceManager.GetString("Common_Id", resourceCulture); return ResourceManager.GetString("Common_Id", resourceCulture);
} }
} }
/// <summary>
/// Looks up a localized string similar to Version.
/// </summary>
public static string Common_Version {
get {
return ResourceManager.GetString("Common_Version", resourceCulture);
}
}
/// <summary> /// <summary>
/// Looks up a localized string similar to Less details.... /// Looks up a localized string similar to Less details....
...@@ -321,24 +330,6 @@ namespace DotNetCore.CAP.Dashboard.Resources { ...@@ -321,24 +330,6 @@ namespace DotNetCore.CAP.Dashboard.Resources {
} }
} }
/// <summary>
/// Looks up a localized string similar to The queue is empty..
/// </summary>
public static string EnqueuedJobsPage_NoJobs {
get {
return ResourceManager.GetString("EnqueuedJobsPage_NoJobs", resourceCulture);
}
}
/// <summary>
/// Looks up a localized string similar to Enqueued jobs.
/// </summary>
public static string EnqueuedJobsPage_Title {
get {
return ResourceManager.GetString("EnqueuedJobsPage_Title", resourceCulture);
}
}
/// <summary> /// <summary>
/// Looks up a localized string similar to Publish Failed. /// Looks up a localized string similar to Publish Failed.
/// </summary> /// </summary>
...@@ -816,60 +807,6 @@ namespace DotNetCore.CAP.Dashboard.Resources { ...@@ -816,60 +807,6 @@ namespace DotNetCore.CAP.Dashboard.Resources {
} }
} }
/// <summary>
/// Looks up a localized string similar to Heartbeat.
/// </summary>
public static string ServersPage_Table_Heartbeat {
get {
return ResourceManager.GetString("ServersPage_Table_Heartbeat", resourceCulture);
}
}
/// <summary>
/// Looks up a localized string similar to Name.
/// </summary>
public static string ServersPage_Table_Name {
get {
return ResourceManager.GetString("ServersPage_Table_Name", resourceCulture);
}
}
/// <summary>
/// Looks up a localized string similar to Queues.
/// </summary>
public static string ServersPage_Table_Queues {
get {
return ResourceManager.GetString("ServersPage_Table_Queues", resourceCulture);
}
}
/// <summary>
/// Looks up a localized string similar to Started.
/// </summary>
public static string ServersPage_Table_Started {
get {
return ResourceManager.GetString("ServersPage_Table_Started", resourceCulture);
}
}
/// <summary>
/// Looks up a localized string similar to Workers.
/// </summary>
public static string ServersPage_Table_Workers {
get {
return ResourceManager.GetString("ServersPage_Table_Workers", resourceCulture);
}
}
/// <summary>
/// Looks up a localized string similar to Servers.
/// </summary>
public static string ServersPage_Title {
get {
return ResourceManager.GetString("ServersPage_Title", resourceCulture);
}
}
/// <summary> /// <summary>
/// Looks up a localized string similar to Failed. /// Looks up a localized string similar to Failed.
/// </summary> /// </summary>
......
...@@ -123,18 +123,12 @@ ...@@ -123,18 +123,12 @@
<data name="Common_Delete" xml:space="preserve"> <data name="Common_Delete" xml:space="preserve">
<value>Delete</value> <value>Delete</value>
</data> </data>
<data name="Common_DeleteConfirm" xml:space="preserve">
<value>Do you really want to DELETE ALL selected jobs?</value>
</data>
<data name="Common_Deleting" xml:space="preserve"> <data name="Common_Deleting" xml:space="preserve">
<value>Deleting...</value> <value>Deleting...</value>
</data> </data>
<data name="Common_DeleteSelected" xml:space="preserve"> <data name="Common_DeleteSelected" xml:space="preserve">
<value>Delete selected</value> <value>Delete selected</value>
</data> </data>
<data name="Common_EnqueueButton_Text" xml:space="preserve">
<value>Enqueue jobs</value>
</data>
<data name="Common_Enqueueing" xml:space="preserve"> <data name="Common_Enqueueing" xml:space="preserve">
<value>Enqueueing...</value> <value>Enqueueing...</value>
</data> </data>
...@@ -144,6 +138,9 @@ ...@@ -144,6 +138,9 @@
<data name="Common_Id" xml:space="preserve"> <data name="Common_Id" xml:space="preserve">
<value>Id</value> <value>Id</value>
</data> </data>
<data name="Common_Version" xml:space="preserve">
<value>Version</value>
</data>
<data name="Common_LessDetails" xml:space="preserve"> <data name="Common_LessDetails" xml:space="preserve">
<value>Less details...</value> <value>Less details...</value>
</data> </data>
...@@ -177,12 +174,6 @@ ...@@ -177,12 +174,6 @@
<data name="Common_Unknown" xml:space="preserve"> <data name="Common_Unknown" xml:space="preserve">
<value>Unknown</value> <value>Unknown</value>
</data> </data>
<data name="EnqueuedJobsPage_NoJobs" xml:space="preserve">
<value>The queue is empty.</value>
</data>
<data name="EnqueuedJobsPage_Title" xml:space="preserve">
<value>Enqueued jobs</value>
</data>
<data name="HomePage_HistoryGraph" xml:space="preserve"> <data name="HomePage_HistoryGraph" xml:space="preserve">
<value>24h graph</value> <value>24h graph</value>
</data> </data>
...@@ -215,25 +206,7 @@ ...@@ -215,25 +206,7 @@
</data> </data>
<data name="NodePage_NoNodes" xml:space="preserve"> <data name="NodePage_NoNodes" xml:space="preserve">
<value>There are no config distributed node discory. </value> <value>There are no config distributed node discory. </value>
</data> </data>
<data name="ServersPage_Table_Heartbeat" xml:space="preserve">
<value>Heartbeat</value>
</data>
<data name="ServersPage_Table_Name" xml:space="preserve">
<value>Name</value>
</data>
<data name="ServersPage_Table_Queues" xml:space="preserve">
<value>Queues</value>
</data>
<data name="ServersPage_Table_Started" xml:space="preserve">
<value>Started</value>
</data>
<data name="ServersPage_Table_Workers" xml:space="preserve">
<value>Workers</value>
</data>
<data name="ServersPage_Title" xml:space="preserve">
<value>Servers</value>
</data>
<data name="PublishedMessagesPage_Title" xml:space="preserve"> <data name="PublishedMessagesPage_Title" xml:space="preserve">
<value>Published Messages</value> <value>Published Messages</value>
</data> </data>
...@@ -267,9 +240,6 @@ ...@@ -267,9 +240,6 @@
<data name="Metrics_ActiveConnections" xml:space="preserve"> <data name="Metrics_ActiveConnections" xml:space="preserve">
<value>Active Connections</value> <value>Active Connections</value>
</data> </data>
<data name="Metrics_DeletedJobs" xml:space="preserve">
<value>Deleted Jobs</value>
</data>
<data name="Metrics_Retries" xml:space="preserve"> <data name="Metrics_Retries" xml:space="preserve">
<value>Retries</value> <value>Retries</value>
</data> </data>
...@@ -295,7 +265,7 @@ ...@@ -295,7 +265,7 @@
<value>Enqueued / Queues</value> <value>Enqueued / Queues</value>
</data> </data>
<data name="Metrics_FailedCountOrNull" xml:space="preserve"> <data name="Metrics_FailedCountOrNull" xml:space="preserve">
<value>{0} failed job(s) found. Retry or delete them manually.</value> <value>{0} failed message(s) found. </value>
</data> </data>
<data name="HomePage_GraphHover_PFailed" xml:space="preserve"> <data name="HomePage_GraphHover_PFailed" xml:space="preserve">
<value>Publish Failed</value> <value>Publish Failed</value>
...@@ -340,7 +310,7 @@ ...@@ -340,7 +310,7 @@
<value>No messages found.</value> <value>No messages found.</value>
</data> </data>
<data name="PublishedPage_Title" xml:space="preserve"> <data name="PublishedPage_Title" xml:space="preserve">
<value>Published Jobs</value> <value>Published messages</value>
</data> </data>
<data name="MessagesPage_Query_MessageGroup" xml:space="preserve"> <data name="MessagesPage_Query_MessageGroup" xml:space="preserve">
<value>Message group</value> <value>Message group</value>
......
...@@ -144,6 +144,9 @@ ...@@ -144,6 +144,9 @@
<data name="Common_Id" xml:space="preserve"> <data name="Common_Id" xml:space="preserve">
<value>编号</value> <value>编号</value>
</data> </data>
<data name="Common_Version" xml:space="preserve">
<value>版本</value>
</data>
<data name="Common_LessDetails" xml:space="preserve"> <data name="Common_LessDetails" xml:space="preserve">
<value>收起...</value> <value>收起...</value>
</data> </data>
...@@ -174,12 +177,6 @@ ...@@ -174,12 +177,6 @@
<data name="Common_Unknown" xml:space="preserve"> <data name="Common_Unknown" xml:space="preserve">
<value>未知</value> <value>未知</value>
</data> </data>
<data name="EnqueuedJobsPage_NoJobs" xml:space="preserve">
<value>没有任何作业</value>
</data>
<data name="EnqueuedJobsPage_Title" xml:space="preserve">
<value>队列作业</value>
</data>
<data name="HomePage_HistoryGraph" xml:space="preserve"> <data name="HomePage_HistoryGraph" xml:space="preserve">
<value>当日走势</value> <value>当日走势</value>
</data> </data>
...@@ -209,25 +206,7 @@ ...@@ -209,25 +206,7 @@
</data> </data>
<data name="PerPageSelector_ItemsPerPage" xml:space="preserve"> <data name="PerPageSelector_ItemsPerPage" xml:space="preserve">
<value>每页条数</value> <value>每页条数</value>
</data> </data>
<data name="ServersPage_Table_Heartbeat" xml:space="preserve">
<value>心跳</value>
</data>
<data name="ServersPage_Table_Name" xml:space="preserve">
<value>名称</value>
</data>
<data name="ServersPage_Table_Queues" xml:space="preserve">
<value>队列</value>
</data>
<data name="ServersPage_Table_Started" xml:space="preserve">
<value>执行</value>
</data>
<data name="ServersPage_Table_Workers" xml:space="preserve">
<value>工作区</value>
</data>
<data name="ServersPage_Title" xml:space="preserve">
<value>服务器</value>
</data>
<data name="PublishedMessagesPage_Title" xml:space="preserve"> <data name="PublishedMessagesPage_Title" xml:space="preserve">
<value>发送出的消息</value> <value>发送出的消息</value>
</data> </data>
...@@ -271,7 +250,7 @@ ...@@ -271,7 +250,7 @@
<value>队列</value> <value>队列</value>
</data> </data>
<data name="Metrics_FailedCountOrNull" xml:space="preserve"> <data name="Metrics_FailedCountOrNull" xml:space="preserve">
<value>{0} failed job(s) found. Retry or delete them manually.</value> <value>发现了 {0} 个失败的消息</value>
</data> </data>
<data name="HomePage_GraphHover_PFailed" xml:space="preserve"> <data name="HomePage_GraphHover_PFailed" xml:space="preserve">
<value>发送失败</value> <value>发送失败</value>
......
...@@ -34,7 +34,13 @@ namespace DotNetCore.CAP.Dashboard ...@@ -34,7 +34,13 @@ namespace DotNetCore.CAP.Dashboard
var settings = new JsonSerializerSettings var settings = new JsonSerializerSettings
{ {
ContractResolver = new CamelCasePropertyNamesContractResolver(), ContractResolver = new CamelCasePropertyNamesContractResolver(),
Converters = new JsonConverter[] {new StringEnumConverter {CamelCaseText = true}} Converters = new JsonConverter[]
{
new StringEnumConverter
{
NamingStrategy = new CamelCaseNamingStrategy()
}
}
}; };
serialized = JsonConvert.SerializeObject(result, settings); serialized = JsonConvert.SerializeObject(result, settings);
} }
......
...@@ -30,7 +30,13 @@ namespace DotNetCore.CAP.Dashboard ...@@ -30,7 +30,13 @@ namespace DotNetCore.CAP.Dashboard
var settings = new JsonSerializerSettings var settings = new JsonSerializerSettings
{ {
ContractResolver = new CamelCasePropertyNamesContractResolver(), ContractResolver = new CamelCasePropertyNamesContractResolver(),
Converters = new JsonConverter[] {new StringEnumConverter {CamelCaseText = true}} Converters = new JsonConverter[]
{
new StringEnumConverter
{
NamingStrategy = new CamelCaseNamingStrategy()
}
}
}; };
var serialized = JsonConvert.SerializeObject(result, settings); var serialized = JsonConvert.SerializeObject(result, settings);
......
...@@ -9,6 +9,8 @@ namespace DotNetCore.CAP.Dashboard.Monitoring ...@@ -9,6 +9,8 @@ namespace DotNetCore.CAP.Dashboard.Monitoring
{ {
public long Id { get; set; } public long Id { get; set; }
public string Version { get; set; }
public string Group { get; set; } public string Group { get; set; }
public string Name { get; set; } public string Name { get; set; }
......
...@@ -80,7 +80,8 @@ ...@@ -80,7 +80,8 @@
<th style="width: 60px;"> <th style="width: 60px;">
<input type="checkbox" class="js-jobs-list-select-all"/> <input type="checkbox" class="js-jobs-list-select-all"/>
</th> </th>
<th>@Strings.MessagesPage_Table_Code</th> <th>@Strings.Common_Id</th>
<th>@Strings.Common_Version</th>
<th>@Strings.MessagesPage_Table_Name</th> <th>@Strings.MessagesPage_Table_Name</th>
<th class="min-width">@Strings.MessagesPage_Table_Retries</th> <th class="min-width">@Strings.MessagesPage_Table_Retries</th>
@if (string.Equals(StatusName, "Processing", StringComparison.CurrentCultureIgnoreCase)) @if (string.Equals(StatusName, "Processing", StringComparison.CurrentCultureIgnoreCase))
...@@ -95,11 +96,14 @@ ...@@ -95,11 +96,14 @@
{ {
<tr class="js-jobs-list-row hover"> <tr class="js-jobs-list-row hover">
<td> <td>
<input type="checkbox" class="js-jobs-list-checkbox" name="messages[]" value="@message.Id"/> <input type="checkbox" class="js-jobs-list-checkbox" name="messages[]" value="@message.Id" />
</td> </td>
<td class="word-break"> <td class="word-break">
<a href="javascript:;" data-url='@(Url.To("/published/message/") + message.Id)' class="openModal">#@message.Id</a> <a href="javascript:;" data-url='@(Url.To("/published/message/") + message.Id)' class="openModal">#@message.Id</a>
</td> </td>
<td>
@message.Version
</td>
<td> <td>
@message.Name @message.Name
</td> </td>
......
...@@ -267,19 +267,25 @@ namespace DotNetCore.CAP.Dashboard.Pages ...@@ -267,19 +267,25 @@ namespace DotNetCore.CAP.Dashboard.Pages
<th style=""width:60px;""> <th style=""width:60px;"">
<input type=""checkbox"" class=""js-jobs-list-select-all"" /> <input type=""checkbox"" class=""js-jobs-list-select-all"" />
</th> </th>
<th>"); <th style=""width:22%;"">");
#line 83 "..\..\PublishedPage.cshtml" #line 83 "..\..\PublishedPage.cshtml"
Write(Strings.MessagesPage_Table_Code); Write(Strings.Common_Id);
#line default #line default
#line hidden #line hidden
WriteLiteral("</th>\r\n <th>"); WriteLiteral("</th>\r\n <th>");
#line 84 "..\..\PublishedPage.cshtml"
Write(Strings.Common_Version);
#line default
#line hidden
WriteLiteral("</th>\r\n <th>");
#line 84 "..\..\PublishedPage.cshtml" #line 84 "..\..\PublishedPage.cshtml"
Write(Strings.MessagesPage_Table_Name); Write(Strings.MessagesPage_Table_Name);
...@@ -387,7 +393,14 @@ namespace DotNetCore.CAP.Dashboard.Pages ...@@ -387,7 +393,14 @@ namespace DotNetCore.CAP.Dashboard.Pages
" "); " ");
#line 102 "..\..\PublishedPage.cshtml"
Write(message.Version);
#line default
#line hidden
WriteLiteral("</a>\r\n </td>\r\n <td>\r\n " +
" ");
#line 104 "..\..\PublishedPage.cshtml" #line 104 "..\..\PublishedPage.cshtml"
Write(message.Name); Write(message.Name);
......
...@@ -85,7 +85,8 @@ ...@@ -85,7 +85,8 @@
<th style="width: 60px;"> <th style="width: 60px;">
<input type="checkbox" class="js-jobs-list-select-all"/> <input type="checkbox" class="js-jobs-list-select-all"/>
</th> </th>
<th>@Strings.MessagesPage_Table_Code</th> <th>@Strings.Common_Id</th>
<th>@Strings.Common_Version</th>
<th>@Strings.MessagesPage_Table_Group</th> <th>@Strings.MessagesPage_Table_Group</th>
<th>@Strings.MessagesPage_Table_Name</th> <th>@Strings.MessagesPage_Table_Name</th>
<th class="min-width">@Strings.MessagesPage_Table_Retries</th> <th class="min-width">@Strings.MessagesPage_Table_Retries</th>
...@@ -106,6 +107,9 @@ ...@@ -106,6 +107,9 @@
<td class="word-break"> <td class="word-break">
<a href="javascript:;" data-url='@(Url.To("/received/message/") + message.Id)' class="openModal">#@message.Id</a> <a href="javascript:;" data-url='@(Url.To("/received/message/") + message.Id)' class="openModal">#@message.Id</a>
</td> </td>
<td>
@message.Version
</td>
<td> <td>
@message.Group @message.Group
</td> </td>
......
...@@ -297,7 +297,15 @@ namespace DotNetCore.CAP.Dashboard.Pages ...@@ -297,7 +297,15 @@ namespace DotNetCore.CAP.Dashboard.Pages
#line 88 "..\..\ReceivedPage.cshtml" #line 88 "..\..\ReceivedPage.cshtml"
Write(Strings.MessagesPage_Table_Code); Write(Strings.Common_Id);
#line default
#line hidden
WriteLiteral("</th>\r\n <th>");
#line 88 "..\..\ReceivedPage.cshtml"
Write(Strings.Common_Version);
#line default #line default
...@@ -424,6 +432,15 @@ namespace DotNetCore.CAP.Dashboard.Pages ...@@ -424,6 +432,15 @@ namespace DotNetCore.CAP.Dashboard.Pages
#line 111 "..\..\ReceivedPage.cshtml"
Write(message.Version);
#line default
#line hidden
WriteLiteral("\r\n </td>\r\n " +
"<td>\r\n ");
#line 110 "..\..\ReceivedPage.cshtml" #line 110 "..\..\ReceivedPage.cshtml"
Write(message.Group); Write(message.Group);
......
...@@ -32,12 +32,12 @@ ...@@ -32,12 +32,12 @@
<ItemGroup> <ItemGroup>
<PackageReference Include="Consul" Version="0.7.2.6" /> <PackageReference Include="Consul" Version="0.7.2.6" />
<PackageReference Include="Microsoft.AspNetCore.Hosting.Abstractions" Version="2.1.0" /> <PackageReference Include="Microsoft.AspNetCore.Hosting.Abstractions" Version="2.2.0" />
<PackageReference Include="Microsoft.AspNetCore.Http.Abstractions" Version="2.1.0" /> <PackageReference Include="Microsoft.AspNetCore.Http.Abstractions" Version="2.2.0" />
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="2.1.0" /> <PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="2.2.0" />
<PackageReference Include="Newtonsoft.Json" Version="11.0.2" /> <PackageReference Include="Newtonsoft.Json" Version="12.0.1" />
<PackageReference Include="System.Diagnostics.DiagnosticSource" Version="4.5.0" /> <PackageReference Include="System.Diagnostics.DiagnosticSource" Version="4.5.1" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="2.1.0" /> <PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="2.2.0" />
</ItemGroup> </ItemGroup>
<ItemGroup> <ItemGroup>
......
...@@ -6,6 +6,9 @@ using DotNetCore.CAP.Models; ...@@ -6,6 +6,9 @@ using DotNetCore.CAP.Models;
namespace DotNetCore.CAP namespace DotNetCore.CAP
{ {
/// <summary>
/// Consumer execotor
/// </summary>
public interface ISubscriberExecutor public interface ISubscriberExecutor
{ {
Task<OperateResult> ExecuteAsync(CapReceivedMessage message); Task<OperateResult> ExecuteAsync(CapReceivedMessage message);
......
...@@ -9,6 +9,7 @@ using System.Text.RegularExpressions; ...@@ -9,6 +9,7 @@ using System.Text.RegularExpressions;
using DotNetCore.CAP.Abstractions; using DotNetCore.CAP.Abstractions;
using DotNetCore.CAP.Infrastructure; using DotNetCore.CAP.Infrastructure;
using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.DependencyInjection;
using System.Collections.Concurrent;
namespace DotNetCore.CAP.Internal namespace DotNetCore.CAP.Internal
{ {
...@@ -20,8 +21,12 @@ namespace DotNetCore.CAP.Internal ...@@ -20,8 +21,12 @@ namespace DotNetCore.CAP.Internal
{ {
private readonly CapOptions _capOptions; private readonly CapOptions _capOptions;
private readonly IServiceProvider _serviceProvider; private readonly IServiceProvider _serviceProvider;
private List<RegexExecuteDescriptor<ConsumerExecutorDescriptor>> _asteriskList;
private List<RegexExecuteDescriptor<ConsumerExecutorDescriptor>> _poundList; /// <summary>
/// since this class be designed as a Singleton service,the following two list must be thread safe!!!
/// </summary>
private readonly ConcurrentDictionary<string, List<RegexExecuteDescriptor<ConsumerExecutorDescriptor>>> _asteriskList;
private readonly ConcurrentDictionary<string, List<RegexExecuteDescriptor<ConsumerExecutorDescriptor>>> _poundList;
/// <summary> /// <summary>
/// Creates a new <see cref="DefaultConsumerServiceSelector" />. /// Creates a new <see cref="DefaultConsumerServiceSelector" />.
...@@ -30,6 +35,9 @@ namespace DotNetCore.CAP.Internal ...@@ -30,6 +35,9 @@ namespace DotNetCore.CAP.Internal
{ {
_serviceProvider = serviceProvider; _serviceProvider = serviceProvider;
_capOptions = capOptions; _capOptions = capOptions;
_asteriskList = new ConcurrentDictionary<string, List<RegexExecuteDescriptor<ConsumerExecutorDescriptor>>>();
_poundList = new ConcurrentDictionary<string, List<RegexExecuteDescriptor<ConsumerExecutorDescriptor>>>();
} }
public IReadOnlyList<ConsumerExecutorDescriptor> SelectCandidates() public IReadOnlyList<ConsumerExecutorDescriptor> SelectCandidates()
...@@ -120,7 +128,11 @@ namespace DotNetCore.CAP.Internal ...@@ -120,7 +128,11 @@ namespace DotNetCore.CAP.Internal
{ {
if (attr.Group == null) if (attr.Group == null)
{ {
attr.Group = _capOptions.DefaultGroup; attr.Group = _capOptions.DefaultGroup + "." + _capOptions.Version;
}
else
{
attr.Group = attr.Group + "." + _capOptions.Version;
} }
yield return InitDescriptor(attr, method, typeInfo); yield return InitDescriptor(attr, method, typeInfo);
...@@ -150,17 +162,19 @@ namespace DotNetCore.CAP.Internal ...@@ -150,17 +162,19 @@ namespace DotNetCore.CAP.Internal
private ConsumerExecutorDescriptor MatchAsteriskUsingRegex(string key, IReadOnlyList<ConsumerExecutorDescriptor> executeDescriptor) private ConsumerExecutorDescriptor MatchAsteriskUsingRegex(string key, IReadOnlyList<ConsumerExecutorDescriptor> executeDescriptor)
{ {
if (_asteriskList == null) var group = executeDescriptor.First().Attribute.Group;
if (!_asteriskList.TryGetValue(group, out var tmpList))
{ {
_asteriskList = executeDescriptor tmpList = executeDescriptor.Where(x => x.Attribute.Name.IndexOf('*') >= 0)
.Where(x => x.Attribute.Name.IndexOf('*') >= 0)
.Select(x => new RegexExecuteDescriptor<ConsumerExecutorDescriptor> .Select(x => new RegexExecuteDescriptor<ConsumerExecutorDescriptor>
{ {
Name = ("^" + x.Attribute.Name + "$").Replace("*", "[a-zA-Z]+").Replace(".", "\\."), Name = ("^" + x.Attribute.Name + "$").Replace("*", "[0-9_a-zA-Z]+").Replace(".", "\\."),
Descriptor = x Descriptor = x
}).ToList(); }).ToList();
_asteriskList.TryAdd(group, tmpList);
} }
foreach (var red in _asteriskList)
foreach (var red in tmpList)
{ {
if (Regex.IsMatch(key, red.Name, RegexOptions.Singleline)) if (Regex.IsMatch(key, red.Name, RegexOptions.Singleline))
{ {
...@@ -173,18 +187,20 @@ namespace DotNetCore.CAP.Internal ...@@ -173,18 +187,20 @@ namespace DotNetCore.CAP.Internal
private ConsumerExecutorDescriptor MatchPoundUsingRegex(string key, IReadOnlyList<ConsumerExecutorDescriptor> executeDescriptor) private ConsumerExecutorDescriptor MatchPoundUsingRegex(string key, IReadOnlyList<ConsumerExecutorDescriptor> executeDescriptor)
{ {
if (_poundList == null) var group = executeDescriptor.First().Attribute.Group;
if (!_poundList.TryGetValue(group, out var tmpList))
{ {
_poundList = executeDescriptor tmpList = executeDescriptor
.Where(x => x.Attribute.Name.IndexOf('#') >= 0) .Where(x => x.Attribute.Name.IndexOf('#') >= 0)
.Select(x => new RegexExecuteDescriptor<ConsumerExecutorDescriptor> .Select(x => new RegexExecuteDescriptor<ConsumerExecutorDescriptor>
{ {
Name = ("^" + x.Attribute.Name + "$").Replace("#", "[a-zA-Z\\.]+"), Name = ("^" + x.Attribute.Name + "$").Replace("#", "[0-9_a-zA-Z\\.]+"),
Descriptor = x Descriptor = x
}).ToList(); }).ToList();
_poundList.TryAdd(group, tmpList);
} }
foreach (var red in _poundList) foreach (var red in tmpList)
{ {
if (Regex.IsMatch(key, red.Name, RegexOptions.Singleline)) if (Regex.IsMatch(key, red.Name, RegexOptions.Singleline))
{ {
......
...@@ -6,13 +6,13 @@ ...@@ -6,13 +6,13 @@
</PropertyGroup> </PropertyGroup>
<ItemGroup> <ItemGroup>
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="2.1.0" /> <PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="2.2.0" />
<PackageReference Include="Microsoft.Extensions.Logging" Version="2.1.0" /> <PackageReference Include="Microsoft.Extensions.Logging" Version="2.2.0" />
<PackageReference Include="FluentAssertions" Version="5.4.1" /> <PackageReference Include="FluentAssertions" Version="5.5.3" />
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="2.1.0" /> <PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="2.2.0" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="15.9.0" /> <PackageReference Include="Microsoft.NET.Test.Sdk" Version="15.9.0" />
<PackageReference Include="xunit" Version="2.4.0" /> <PackageReference Include="xunit" Version="2.4.1" />
<PackageReference Include="xunit.runner.visualstudio" Version="2.4.0"> <PackageReference Include="xunit.runner.visualstudio" Version="2.4.1">
<PrivateAssets>all</PrivateAssets> <PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers</IncludeAssets> <IncludeAssets>runtime; build; native; contentfiles; analyzers</IncludeAssets>
</PackageReference> </PackageReference>
......
...@@ -17,21 +17,23 @@ namespace DotNetCore.CAP.MongoDB.Test ...@@ -17,21 +17,23 @@ namespace DotNetCore.CAP.MongoDB.Test
{ {
_api = new MongoDBMonitoringApi(MongoClient, MongoDBOptions); _api = new MongoDBMonitoringApi(MongoClient, MongoDBOptions);
var collection = Database.GetCollection<CapPublishedMessage>(MongoDBOptions.PublishedCollection); var collection = Database.GetCollection<PublishedMessage>(MongoDBOptions.PublishedCollection);
collection.InsertMany(new[] collection.InsertMany(new[]
{ {
new CapPublishedMessage new PublishedMessage
{ {
Id = SnowflakeId.Default().NextId(), Id = SnowflakeId.Default().NextId(),
Added = DateTime.Now.AddHours(-1), Added = DateTime.Now.AddHours(-1),
StatusName = "Failed", StatusName = "Failed",
Version = "v1",
Content = "abc" Content = "abc"
}, },
new CapPublishedMessage new PublishedMessage
{ {
Id = SnowflakeId.Default().NextId(), Id = SnowflakeId.Default().NextId(),
Added = DateTime.Now, Added = DateTime.Now,
StatusName = "Failed", StatusName = "Failed",
Version = "v1",
Content = "bbc" Content = "bbc"
} }
}); });
......
...@@ -24,9 +24,13 @@ namespace DotNetCore.CAP.MongoDB.Test ...@@ -24,9 +24,13 @@ namespace DotNetCore.CAP.MongoDB.Test
Content = "test-content" Content = "test-content"
}; };
_connection.StoreReceivedMessage(new CapReceivedMessage(messageContext) _connection.StoreReceivedMessage(new ReceivedMessage()
{ {
Id = SnowflakeId.Default().NextId() Id = SnowflakeId.Default().NextId(),
Group=messageContext.Group,
Content=messageContext.Content,
Name=messageContext.Name,
Version="v1"
}); });
} }
...@@ -34,7 +38,7 @@ namespace DotNetCore.CAP.MongoDB.Test ...@@ -34,7 +38,7 @@ namespace DotNetCore.CAP.MongoDB.Test
public void ChangeReceivedState_Test() public void ChangeReceivedState_Test()
{ {
StoreReceivedMessageAsync_TestAsync(); StoreReceivedMessageAsync_TestAsync();
var collection = Database.GetCollection<CapReceivedMessage>(MongoDBOptions.ReceivedCollection); var collection = Database.GetCollection<ReceivedMessage>(MongoDBOptions.ReceivedCollection);
var msg = collection.Find(x => true).FirstOrDefault(); var msg = collection.Find(x => true).FirstOrDefault();
_connection.ChangeReceivedState(msg.Id, StatusName.Scheduled).Should().BeTrue(); _connection.ChangeReceivedState(msg.Id, StatusName.Scheduled).Should().BeTrue();
...@@ -60,9 +64,9 @@ namespace DotNetCore.CAP.MongoDB.Test ...@@ -60,9 +64,9 @@ namespace DotNetCore.CAP.MongoDB.Test
}; };
_connection.StoreReceivedMessage(msg); _connection.StoreReceivedMessage(msg);
var collection = Database.GetCollection<CapReceivedMessage>(MongoDBOptions.ReceivedCollection); var collection = Database.GetCollection<ReceivedMessage>(MongoDBOptions.ReceivedCollection);
var updateDef = Builders<CapReceivedMessage> var updateDef = Builders<ReceivedMessage>
.Update.Set(x => x.Added, DateTime.Now.AddMinutes(-5)); .Update.Set(x => x.Added, DateTime.Now.AddMinutes(-5));
await collection.UpdateOneAsync(x => x.Id == id, updateDef); await collection.UpdateOneAsync(x => x.Id == id, updateDef);
......
...@@ -13,20 +13,20 @@ ...@@ -13,20 +13,20 @@
<ItemGroup> <ItemGroup>
<PackageReference Include="Dapper" Version="1.50.5" /> <PackageReference Include="Dapper" Version="1.50.5" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="15.9.0" /> <PackageReference Include="Microsoft.NET.Test.Sdk" Version="15.9.0" />
<PackageReference Include="xunit.runner.visualstudio" Version="2.4.0"> <PackageReference Include="xunit.runner.visualstudio" Version="2.4.1">
<PrivateAssets>all</PrivateAssets> <PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers</IncludeAssets> <IncludeAssets>runtime; build; native; contentfiles; analyzers</IncludeAssets>
</PackageReference> </PackageReference>
<PackageReference Include="xunit" Version="2.4.0" /> <PackageReference Include="xunit" Version="2.4.1" />
<PackageReference Include="Microsoft.AspNetCore.Http" Version="2.1.0" /> <PackageReference Include="Microsoft.AspNetCore.Http" Version="2.2.0" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="2.1.0" /> <PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="2.2.0" />
<PackageReference Include="Moq" Version="4.10.0" /> <PackageReference Include="Moq" Version="4.10.1" />
<PackageReference Include="Microsoft.Extensions.Logging" Version="2.1.0" /> <PackageReference Include="Microsoft.Extensions.Logging" Version="2.2.0" />
<PackageReference Include="Microsoft.Extensions.Configuration.Abstractions" Version="2.1.0" /> <PackageReference Include="Microsoft.Extensions.Configuration.Abstractions" Version="2.2.0" />
<PackageReference Include="Microsoft.Extensions.Configuration" Version="2.1.0" /> <PackageReference Include="Microsoft.Extensions.Configuration" Version="2.2.0" />
<PackageReference Include="Microsoft.Extensions.Configuration.FileExtensions" Version="2.1.0" /> <PackageReference Include="Microsoft.Extensions.Configuration.FileExtensions" Version="2.2.0" />
<PackageReference Include="Microsoft.Extensions.Configuration.Json" Version="2.1.0" /> <PackageReference Include="Microsoft.Extensions.Configuration.Json" Version="2.2.0" />
<PackageReference Include="Microsoft.Extensions.Configuration.EnvironmentVariables" Version="2.1.0" /> <PackageReference Include="Microsoft.Extensions.Configuration.EnvironmentVariables" Version="2.2.0" />
</ItemGroup> </ItemGroup>
</Project> </Project>
\ No newline at end of file
...@@ -22,7 +22,7 @@ namespace DotNetCore.CAP.MySql.Test ...@@ -22,7 +22,7 @@ namespace DotNetCore.CAP.MySql.Test
[Fact] [Fact]
public async Task GetPublishedMessageAsync_Test() public async Task GetPublishedMessageAsync_Test()
{ {
var sql = "INSERT INTO `cap.published`(`Id`,`Name`,`Content`,`Retries`,`Added`,`ExpiresAt`,`StatusName`) VALUES(@Id,@Name,@Content,@Retries,@Added,@ExpiresAt,@StatusName);"; var sql = "INSERT INTO `cap.published`(`Id`,`Version`,`Name`,`Content`,`Retries`,`Added`,`ExpiresAt`,`StatusName`) VALUES(@Id,'v1',@Name,@Content,@Retries,@Added,@ExpiresAt,@StatusName);";
var insertedId = SnowflakeId.Default().NextId(); var insertedId = SnowflakeId.Default().NextId();
var publishMessage = new CapPublishedMessage var publishMessage = new CapPublishedMessage
{ {
...@@ -69,8 +69,8 @@ namespace DotNetCore.CAP.MySql.Test ...@@ -69,8 +69,8 @@ namespace DotNetCore.CAP.MySql.Test
public async Task GetReceivedMessageAsync_Test() public async Task GetReceivedMessageAsync_Test()
{ {
var sql = $@" var sql = $@"
INSERT INTO `cap.received`(`Id`,`Name`,`Group`,`Content`,`Retries`,`Added`,`ExpiresAt`,`StatusName`) INSERT INTO `cap.received`(`Id`,`Version`,`Name`,`Group`,`Content`,`Retries`,`Added`,`ExpiresAt`,`StatusName`)
VALUES(@Id,@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName);"; VALUES(@Id,'v1',@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName);";
var insertedId = SnowflakeId.Default().NextId(); var insertedId = SnowflakeId.Default().NextId();
var receivedMessage = new CapReceivedMessage var receivedMessage = new CapReceivedMessage
{ {
......
...@@ -9,8 +9,8 @@ ...@@ -9,8 +9,8 @@
<PackageReference Include="Dapper" Version="1.50.5" /> <PackageReference Include="Dapper" Version="1.50.5" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="15.9.0" /> <PackageReference Include="Microsoft.NET.Test.Sdk" Version="15.9.0" />
<PackageReference Include="Npgsql" Version="4.0.3" /> <PackageReference Include="Npgsql" Version="4.0.3" />
<PackageReference Include="xunit" Version="2.4.0" /> <PackageReference Include="xunit" Version="2.4.1" />
<PackageReference Include="xunit.runner.visualstudio" Version="2.4.0"> <PackageReference Include="xunit.runner.visualstudio" Version="2.4.1">
<PrivateAssets>all</PrivateAssets> <PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers</IncludeAssets> <IncludeAssets>runtime; build; native; contentfiles; analyzers</IncludeAssets>
</PackageReference> </PackageReference>
......
...@@ -22,7 +22,7 @@ namespace DotNetCore.CAP.PostgreSql.Test ...@@ -22,7 +22,7 @@ namespace DotNetCore.CAP.PostgreSql.Test
[Fact] [Fact]
public async Task GetPublishedMessageAsync_Test() public async Task GetPublishedMessageAsync_Test()
{ {
var sql = @"INSERT INTO ""cap"".""published""(""Id"",""Name"",""Content"",""Retries"",""Added"",""ExpiresAt"",""StatusName"") VALUES(@Id,@Name,@Content,@Retries,@Added,@ExpiresAt,@StatusName);"; var sql = @"INSERT INTO ""cap"".""published""(""Id"",""Version"",""Name"",""Content"",""Retries"",""Added"",""ExpiresAt"",""StatusName"") VALUES(@Id,'v1',@Name,@Content,@Retries,@Added,@ExpiresAt,@StatusName);";
var insertedId = SnowflakeId.Default().NextId(); var insertedId = SnowflakeId.Default().NextId();
var publishMessage = new CapPublishedMessage var publishMessage = new CapPublishedMessage
{ {
...@@ -69,8 +69,8 @@ namespace DotNetCore.CAP.PostgreSql.Test ...@@ -69,8 +69,8 @@ namespace DotNetCore.CAP.PostgreSql.Test
public async Task GetReceivedMessageAsync_Test() public async Task GetReceivedMessageAsync_Test()
{ {
var sql = $@" var sql = $@"
INSERT INTO ""cap"".""received""(""Id"",""Name"",""Group"",""Content"",""Retries"",""Added"",""ExpiresAt"",""StatusName"") INSERT INTO ""cap"".""received""(""Id"",""Version"",""Name"",""Group"",""Content"",""Retries"",""Added"",""ExpiresAt"",""StatusName"")
VALUES(@Id,@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName);"; VALUES(@Id,'v1',@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName);";
var insertedId = SnowflakeId.Default().NextId(); var insertedId = SnowflakeId.Default().NextId();
var receivedMessage = new CapReceivedMessage var receivedMessage = new CapReceivedMessage
{ {
......
...@@ -13,21 +13,21 @@ ...@@ -13,21 +13,21 @@
<ItemGroup> <ItemGroup>
<PackageReference Include="Dapper" Version="1.50.5" /> <PackageReference Include="Dapper" Version="1.50.5" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="15.9.0" /> <PackageReference Include="Microsoft.NET.Test.Sdk" Version="15.9.0" />
<PackageReference Include="System.Data.SqlClient" Version="4.5.0" /> <PackageReference Include="System.Data.SqlClient" Version="4.6.0" />
<PackageReference Include="xunit.runner.visualstudio" Version="2.4.0"> <PackageReference Include="xunit.runner.visualstudio" Version="2.4.1">
<PrivateAssets>all</PrivateAssets> <PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers</IncludeAssets> <IncludeAssets>runtime; build; native; contentfiles; analyzers</IncludeAssets>
</PackageReference> </PackageReference>
<PackageReference Include="xunit" Version="2.4.0" /> <PackageReference Include="xunit" Version="2.4.1" />
<PackageReference Include="Microsoft.AspNetCore.Http" Version="2.1.0" /> <PackageReference Include="Microsoft.AspNetCore.Http" Version="2.2.0" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="2.1.0" /> <PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="2.2.0" />
<PackageReference Include="Moq" Version="4.10.0" /> <PackageReference Include="Moq" Version="4.10.1" />
<PackageReference Include="Microsoft.Extensions.Logging" Version="2.1.0" /> <PackageReference Include="Microsoft.Extensions.Logging" Version="2.2.0" />
<PackageReference Include="Microsoft.Extensions.Configuration.Abstractions" Version="2.1.0" /> <PackageReference Include="Microsoft.Extensions.Configuration.Abstractions" Version="2.2.0" />
<PackageReference Include="Microsoft.Extensions.Configuration" Version="2.1.0" /> <PackageReference Include="Microsoft.Extensions.Configuration" Version="2.2.0" />
<PackageReference Include="Microsoft.Extensions.Configuration.FileExtensions" Version="2.1.0" /> <PackageReference Include="Microsoft.Extensions.Configuration.FileExtensions" Version="2.2.0" />
<PackageReference Include="Microsoft.Extensions.Configuration.Json" Version="2.1.0" /> <PackageReference Include="Microsoft.Extensions.Configuration.Json" Version="2.2.0" />
<PackageReference Include="Microsoft.Extensions.Configuration.EnvironmentVariables" Version="2.1.0" /> <PackageReference Include="Microsoft.Extensions.Configuration.EnvironmentVariables" Version="2.2.0" />
</ItemGroup> </ItemGroup>
</Project> </Project>
...@@ -20,7 +20,7 @@ namespace DotNetCore.CAP.SqlServer.Test ...@@ -20,7 +20,7 @@ namespace DotNetCore.CAP.SqlServer.Test
[Fact] [Fact]
public async Task GetPublishedMessageAsync_Test() public async Task GetPublishedMessageAsync_Test()
{ {
var sql = "INSERT INTO [Cap].[Published]([Id],[Name],[Content],[Retries],[Added],[ExpiresAt],[StatusName]) VALUES(@Id,@Name,@Content,@Retries,@Added,@ExpiresAt,@StatusName);"; var sql = "INSERT INTO [Cap].[Published]([Id],[Version],[Name],[Content],[Retries],[Added],[ExpiresAt],[StatusName]) VALUES(@Id,'v1',@Name,@Content,@Retries,@Added,@ExpiresAt,@StatusName);";
var insertedId = SnowflakeId.Default().NextId(); var insertedId = SnowflakeId.Default().NextId();
var publishMessage = new CapPublishedMessage var publishMessage = new CapPublishedMessage
{ {
...@@ -67,7 +67,7 @@ namespace DotNetCore.CAP.SqlServer.Test ...@@ -67,7 +67,7 @@ namespace DotNetCore.CAP.SqlServer.Test
[Fact] [Fact]
public async Task GetReceivedMessageAsync_Test() public async Task GetReceivedMessageAsync_Test()
{ {
var sql = @"INSERT INTO [Cap].[Received]([Id],[Name],[Group],[Content],[Retries],[Added],[ExpiresAt],[StatusName]) VALUES(@Id,@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName);"; var sql = @"INSERT INTO [Cap].[Received]([Id],[Version],[Name],[Group],[Content],[Retries],[Added],[ExpiresAt],[StatusName]) VALUES(@Id,'v1',@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName);";
var insertedId = SnowflakeId.Default().NextId(); var insertedId = SnowflakeId.Default().NextId();
var receivedMessage = new CapReceivedMessage var receivedMessage = new CapReceivedMessage
{ {
......
...@@ -8,15 +8,15 @@ ...@@ -8,15 +8,15 @@
<ItemGroup> <ItemGroup>
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="15.9.0" /> <PackageReference Include="Microsoft.NET.Test.Sdk" Version="15.9.0" />
<PackageReference Include="System.Data.Common" Version="4.3.0" /> <PackageReference Include="System.Data.Common" Version="4.3.0" />
<PackageReference Include="xunit.runner.visualstudio" Version="2.4.0"> <PackageReference Include="xunit.runner.visualstudio" Version="2.4.1">
<PrivateAssets>all</PrivateAssets> <PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers</IncludeAssets> <IncludeAssets>runtime; build; native; contentfiles; analyzers</IncludeAssets>
</PackageReference> </PackageReference>
<PackageReference Include="xunit" Version="2.4.0" /> <PackageReference Include="xunit" Version="2.4.1" />
<PackageReference Include="Microsoft.AspNetCore.Http" Version="2.1.0" /> <PackageReference Include="Microsoft.AspNetCore.Http" Version="2.2.0" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="2.1.0" /> <PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="2.2.0" />
<PackageReference Include="Moq" Version="4.10.0" /> <PackageReference Include="Moq" Version="4.10.1" />
<PackageReference Include="Microsoft.Extensions.Logging" Version="2.1.0" /> <PackageReference Include="Microsoft.Extensions.Logging" Version="2.2.0" />
</ItemGroup> </ItemGroup>
<ItemGroup> <ItemGroup>
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment