Commit ffeb5590 authored by Savorboard's avatar Savorboard

Introduced rewrite table name option

parent b6699e1d
using Microsoft.AspNetCore;
using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.Hosting;
namespace Sample.Kafka.MySql
{
public class Program
{
public static void Main(string[] args)
{
BuildWebHost(args).Run();
CreateHostBuilder(args).Build().Run();
}
public static IWebHost BuildWebHost(string[] args) =>
WebHost.CreateDefaultBuilder(args)
.UseStartup<Startup>()
.Build();
public static IHostBuilder CreateHostBuilder(string[] args) =>
Host.CreateDefaultBuilder(args)
.ConfigureWebHostDefaults(webBuilder =>
{
webBuilder.UseStartup<Startup>();
});
}
}
\ No newline at end of file
......@@ -5,6 +5,7 @@ using System;
using DotNetCore.CAP.MySql;
using DotNetCore.CAP.Persistence;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection.Extensions;
using Microsoft.Extensions.Options;
// ReSharper disable once CheckNamespace
......@@ -24,7 +25,7 @@ namespace DotNetCore.CAP
services.AddSingleton<CapStorageMarkerService>();
services.AddSingleton<IDataStorage, MySqlDataStorage>();
services.AddSingleton<IStorageInitializer, MySqlStorageInitializer>();
services.TryAddSingleton<IStorageInitializer, MySqlStorageInitializer>();
services.AddTransient<CapTransactionBase, MySqlCapTransaction>();
//Add MySqlOptions
......
......@@ -22,18 +22,23 @@ namespace DotNetCore.CAP.MySql
{
private readonly IOptions<MySqlOptions> _options;
private readonly IOptions<CapOptions> _capOptions;
private readonly IStorageInitializer _initializer;
public MySqlDataStorage(IOptions<MySqlOptions> options, IOptions<CapOptions> capOptions)
public MySqlDataStorage(
IOptions<MySqlOptions> options,
IOptions<CapOptions> capOptions,
IStorageInitializer initializer)
{
_options = options;
_capOptions = capOptions;
_initializer = initializer;
}
public async Task ChangePublishStateAsync(MediumMessage message, StatusName state)
{
await using var connection = new MySqlConnection(_options.Value.ConnectionString);
var sql = $"UPDATE `{_options.Value.TableNamePrefix}.published` SET `Retries` = @Retries,`ExpiresAt` = @ExpiresAt,`StatusName`=@StatusName WHERE `Id`=@Id;";
var sql = $"UPDATE `{_initializer.GetPublishedTableName()}` SET `Retries` = @Retries,`ExpiresAt` = @ExpiresAt,`StatusName`=@StatusName WHERE `Id`=@Id;";
await connection.ExecuteAsync(sql, new
{
......@@ -48,7 +53,7 @@ namespace DotNetCore.CAP.MySql
{
await using var connection = new MySqlConnection(_options.Value.ConnectionString);
var sql = $"UPDATE `{_options.Value.TableNamePrefix}.received` SET `Retries` = @Retries,`ExpiresAt` = @ExpiresAt,`StatusName`=@StatusName WHERE `Id`=@Id;";
var sql = $"UPDATE `{_initializer.GetReceivedTableName()}` SET `Retries` = @Retries,`ExpiresAt` = @ExpiresAt,`StatusName`=@StatusName WHERE `Id`=@Id;";
await connection.ExecuteAsync(sql, new
{
......@@ -61,7 +66,7 @@ namespace DotNetCore.CAP.MySql
public async Task<MediumMessage> StoreMessageAsync(string name, Message content, object dbTransaction = null, CancellationToken cancellationToken = default)
{
var sql = $"INSERT INTO `{_options.Value.TableNamePrefix}.published`(`Id`,`Version`,`Name`,`Content`,`Retries`,`Added`,`ExpiresAt`,`StatusName`) VALUES(@Id,'{_options.Value.Version}',@Name,@Content,@Retries,@Added,@ExpiresAt,@StatusName);";
var sql = $"INSERT INTO `{_initializer.GetPublishedTableName()}`(`Id`,`Version`,`Name`,`Content`,`Retries`,`Added`,`ExpiresAt`,`StatusName`) VALUES(@Id,'{_options.Value.Version}',@Name,@Content,@Retries,@Added,@ExpiresAt,@StatusName);";
var message = new MediumMessage
{
......@@ -106,7 +111,7 @@ namespace DotNetCore.CAP.MySql
public async Task StoreReceivedExceptionMessageAsync(string name, string group, string content)
{
var sql = $@"INSERT INTO `{_options.Value.TableNamePrefix}.received`(`Id`,`Version`,`Name`,`Group`,`Content`,`Retries`,`Added`,`ExpiresAt`,`StatusName`) VALUES(@Id,'{_options.Value.Version}',@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName);";
var sql = $@"INSERT INTO `{_initializer.GetReceivedTableName()}`(`Id`,`Version`,`Name`,`Group`,`Content`,`Retries`,`Added`,`ExpiresAt`,`StatusName`) VALUES(@Id,'{_options.Value.Version}',@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName);";
await using var connection = new MySqlConnection(_options.Value.ConnectionString);
await connection.ExecuteAsync(sql, new
......@@ -124,7 +129,7 @@ namespace DotNetCore.CAP.MySql
public async Task<MediumMessage> StoreReceivedMessageAsync(string name, string group, Message message)
{
var sql = $@"INSERT INTO `{_options.Value.TableNamePrefix}.received`(`Id`,`Version`,`Name`,`Group`,`Content`,`Retries`,`Added`,`ExpiresAt`,`StatusName`) VALUES(@Id,'{_options.Value.Version}',@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName);";
var sql = $@"INSERT INTO `{_initializer.GetReceivedTableName()}`(`Id`,`Version`,`Name`,`Group`,`Content`,`Retries`,`Added`,`ExpiresAt`,`StatusName`) VALUES(@Id,'{_options.Value.Version}',@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName);";
var mdMessage = new MediumMessage
{
......@@ -162,7 +167,7 @@ namespace DotNetCore.CAP.MySql
public async Task<IEnumerable<MediumMessage>> GetPublishedMessagesOfNeedRetry()
{
var fourMinAgo = DateTime.Now.AddMinutes(-4).ToString("O");
var sql = $"SELECT * FROM `{_options.Value.TableNamePrefix}.published` WHERE `Retries`<{_capOptions.Value.FailedRetryCount} AND `Version`='{_capOptions.Value.Version}' AND `Added`<'{fourMinAgo}' AND (`StatusName` = '{StatusName.Failed}' OR `StatusName` = '{StatusName.Scheduled}') LIMIT 200;";
var sql = $"SELECT * FROM `{_initializer.GetPublishedTableName()}` WHERE `Retries`<{_capOptions.Value.FailedRetryCount} AND `Version`='{_capOptions.Value.Version}' AND `Added`<'{fourMinAgo}' AND (`StatusName` = '{StatusName.Failed}' OR `StatusName` = '{StatusName.Scheduled}') LIMIT 200;";
var result = new List<MediumMessage>();
await using var connection = new MySqlConnection(_options.Value.ConnectionString);
......@@ -184,7 +189,7 @@ namespace DotNetCore.CAP.MySql
{
var fourMinAgo = DateTime.Now.AddMinutes(-4).ToString("O");
var sql =
$"SELECT * FROM `{_options.Value.TableNamePrefix}.received` WHERE `Retries`<{_capOptions.Value.FailedRetryCount} AND `Version`='{_capOptions.Value.Version}' AND `Added`<'{fourMinAgo}' AND (`StatusName` = '{StatusName.Failed}' OR `StatusName` = '{StatusName.Scheduled}') LIMIT 200;";
$"SELECT * FROM `{_initializer.GetReceivedTableName()}` WHERE `Retries`<{_capOptions.Value.FailedRetryCount} AND `Version`='{_capOptions.Value.Version}' AND `Added`<'{fourMinAgo}' AND (`StatusName` = '{StatusName.Failed}' OR `StatusName` = '{StatusName.Scheduled}') LIMIT 200;";
var result = new List<MediumMessage>();
......
......@@ -19,22 +19,24 @@ namespace DotNetCore.CAP.MySql
internal class MySqlMonitoringApi : IMonitoringApi
{
private readonly IOptions<MySqlOptions> _options;
private readonly string _prefix;
private readonly string _pubName;
private readonly string _recName;
public MySqlMonitoringApi(IOptions<MySqlOptions> options)
public MySqlMonitoringApi(IOptions<MySqlOptions> options, IStorageInitializer initializer)
{
_options = options;
_prefix = options.Value.TableNamePrefix ?? throw new ArgumentNullException(nameof(options));
_pubName = initializer.GetPublishedTableName();
_recName = initializer.GetReceivedTableName();
}
public StatisticsDto GetStatistics()
{
var sql = string.Format(@"
var sql = $@"
set transaction isolation level read committed;
select count(Id) from `{0}.published` where StatusName = N'Succeeded';
select count(Id) from `{0}.received` where StatusName = N'Succeeded';
select count(Id) from `{0}.published` where StatusName = N'Failed';
select count(Id) from `{0}.received` where StatusName = N'Failed';", _prefix);
select count(Id) from `{_pubName}` where StatusName = N'Succeeded';
select count(Id) from `{_recName}` where StatusName = N'Succeeded';
select count(Id) from `{_pubName}` where StatusName = N'Failed';
select count(Id) from `{_recName}` where StatusName = N'Failed';";
var statistics = UseConnection(connection =>
{
......@@ -55,21 +57,21 @@ select count(Id) from `{0}.received` where StatusName = N'Failed';", _prefix);
public IDictionary<DateTime, int> HourlyFailedJobs(MessageType type)
{
var tableName = type == MessageType.Publish ? "published" : "received";
var tableName = type == MessageType.Publish ? _pubName : _recName;
return UseConnection(connection =>
GetHourlyTimelineStats(connection, tableName,nameof(StatusName.Failed)));
GetHourlyTimelineStats(connection, tableName, nameof(StatusName.Failed)));
}
public IDictionary<DateTime, int> HourlySucceededJobs(MessageType type)
{
var tableName = type == MessageType.Publish ? "published" : "received";
var tableName = type == MessageType.Publish ? _pubName : _recName;
return UseConnection(connection =>
GetHourlyTimelineStats(connection, tableName, nameof( StatusName.Succeeded)));
GetHourlyTimelineStats(connection, tableName, nameof(StatusName.Succeeded)));
}
public IList<MessageDto> Messages(MessageQueryDto queryDto)
{
var tableName = queryDto.MessageType == MessageType.Publish ? "published" : "received";
var tableName = queryDto.MessageType == MessageType.Publish ? _pubName : _recName;
var where = string.Empty;
if (!string.IsNullOrEmpty(queryDto.StatusName))
{
......@@ -92,7 +94,7 @@ select count(Id) from `{0}.received` where StatusName = N'Failed';", _prefix);
}
var sqlQuery =
$"select * from `{_prefix}.{tableName}` where 1=1 {where} order by Added desc limit @Limit offset @Offset";
$"select * from `{tableName}` where 1=1 {where} order by Added desc limit @Limit offset @Offset";
return UseConnection(conn => conn.Query<MessageDto>(sqlQuery, new
{
......@@ -107,27 +109,27 @@ select count(Id) from `{0}.received` where StatusName = N'Failed';", _prefix);
public int PublishedFailedCount()
{
return UseConnection(conn => GetNumberOfMessage(conn, "published", nameof( StatusName.Failed)));
return UseConnection(conn => GetNumberOfMessage(conn, _pubName, nameof(StatusName.Failed)));
}
public int PublishedSucceededCount()
{
return UseConnection(conn => GetNumberOfMessage(conn, "published", nameof(StatusName.Succeeded)));
return UseConnection(conn => GetNumberOfMessage(conn, _pubName, nameof(StatusName.Succeeded)));
}
public int ReceivedFailedCount()
{
return UseConnection(conn => GetNumberOfMessage(conn, "received", nameof(StatusName.Failed)));
return UseConnection(conn => GetNumberOfMessage(conn, _recName, nameof(StatusName.Failed)));
}
public int ReceivedSucceededCount()
{
return UseConnection(conn => GetNumberOfMessage(conn, "received", nameof(StatusName.Succeeded)));
return UseConnection(conn => GetNumberOfMessage(conn, _recName, nameof(StatusName.Succeeded)));
}
private int GetNumberOfMessage(IDbConnection connection, string tableName, string statusName)
{
var sqlQuery = $"select count(Id) from `{_prefix}.{tableName}` where StatusName = @state";
var sqlQuery = $"select count(Id) from `{tableName}` where StatusName = @state";
var count = connection.ExecuteScalar<int>(sqlQuery, new { state = statusName });
return count;
......@@ -165,7 +167,7 @@ select count(Id) from `{0}.received` where StatusName = N'Failed';", _prefix);
select aggr.* from (
select date_format(`Added`,'%Y-%m-%d-%H') as `Key`,
count(id) `Count`
from `{_prefix}.{tableName}`
from `{tableName}`
where StatusName = @statusName
group by date_format(`Added`,'%Y-%m-%d-%H')
) aggr where `Key` in @keys;";
......@@ -195,7 +197,7 @@ select aggr.* from (
public async Task<MediumMessage> GetPublishedMessageAsync(long id)
{
var sql = $@"SELECT * FROM `{_prefix}.published` WHERE `Id`={id};";
var sql = $@"SELECT * FROM `{_pubName}` WHERE `Id`={id};";
await using var connection = new MySqlConnection(_options.Value.ConnectionString);
return await connection.QueryFirstOrDefaultAsync<MediumMessage>(sql);
......@@ -203,7 +205,7 @@ select aggr.* from (
public async Task<MediumMessage> GetReceivedMessageAsync(long id)
{
var sql = $@"SELECT * FROM `{_prefix}.received` WHERE Id={id};";
var sql = $@"SELECT * FROM `{_recName}` WHERE Id={id};";
await using var connection = new MySqlConnection(_options.Value.ConnectionString);
return await connection.QueryFirstOrDefaultAsync<MediumMessage>(sql);
}
......
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using System.Threading;
using System.Threading.Tasks;
using Dapper;
......@@ -25,12 +24,12 @@ namespace DotNetCore.CAP.MySql
_logger = logger;
}
public string GetPublishedTableName()
public virtual string GetPublishedTableName()
{
return $"{_options.Value.TableNamePrefix}.published";
}
public string GetReceivedTableName()
public virtual string GetReceivedTableName()
{
return $"{_options.Value.TableNamePrefix}.received";
}
......@@ -42,7 +41,7 @@ namespace DotNetCore.CAP.MySql
return;
}
var sql = CreateDbTablesScript(_options.Value.TableNamePrefix);
var sql = CreateDbTablesScript();
await using (var connection = new MySqlConnection(_options.Value.ConnectionString))
{
await connection.ExecuteAsync(sql);
......@@ -52,11 +51,11 @@ namespace DotNetCore.CAP.MySql
}
protected virtual string CreateDbTablesScript(string prefix)
protected virtual string CreateDbTablesScript()
{
var batchSql =
$@"
CREATE TABLE IF NOT EXISTS `{prefix}.received` (
CREATE TABLE IF NOT EXISTS `{GetReceivedTableName()}` (
`Id` bigint NOT NULL,
`Version` varchar(20) DEFAULT NULL,
`Name` varchar(400) NOT NULL,
......@@ -70,7 +69,7 @@ CREATE TABLE IF NOT EXISTS `{prefix}.received` (
INDEX `IX_ExpiresAt`(`ExpiresAt`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
CREATE TABLE IF NOT EXISTS `{prefix}.published` (
CREATE TABLE IF NOT EXISTS `{GetPublishedTableName()}` (
`Id` bigint NOT NULL,
`Version` varchar(20) DEFAULT NULL,
`Name` varchar(200) NOT NULL,
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment