Commit 71df4dfd authored by Savorboard's avatar Savorboard

Introduced rewrite table name option

parent 14d0b34c
......@@ -13,10 +13,11 @@ namespace DotNetCore.CAP.SqlServer.Diagnostics
{
internal class DiagnosticObserver : IObserver<KeyValuePair<string, object>>
{
private const string SqlClientPrefix = "System.Data.SqlClient.";
public const string SqlAfterCommitTransaction = "System.Data.SqlClient.WriteTransactionCommitAfter";
public const string SqlAfterCommitTransactionMicrosoft = "Microsoft.Data.SqlClient.WriteTransactionCommitAfter";
public const string SqlErrorCommitTransaction = "System.Data.SqlClient.WriteTransactionCommitError";
public const string SqlErrorCommitTransactionMicrosoft = "Microsoft.Data.SqlClient.WriteTransactionCommitError";
public const string SqlAfterCommitTransaction = SqlClientPrefix + "WriteTransactionCommitAfter";
public const string SqlErrorCommitTransaction = SqlClientPrefix + "WriteTransactionCommitError";
private readonly ConcurrentDictionary<Guid, List<MediumMessage>> _bufferList;
private readonly IDispatcher _dispatcher;
......@@ -37,9 +38,9 @@ namespace DotNetCore.CAP.SqlServer.Diagnostics
public void OnNext(KeyValuePair<string, object> evt)
{
if (evt.Key == SqlAfterCommitTransaction)
if (evt.Key == SqlAfterCommitTransaction || evt.Key == SqlAfterCommitTransactionMicrosoft)
{
var sqlConnection = (SqlConnection) GetProperty(evt.Value, "Connection");
var sqlConnection = (SqlConnection)GetProperty(evt.Value, "Connection");
var transactionKey = sqlConnection.ClientConnectionId;
if (_bufferList.TryRemove(transactionKey, out var msgList))
foreach (var message in msgList)
......@@ -47,9 +48,9 @@ namespace DotNetCore.CAP.SqlServer.Diagnostics
_dispatcher.EnqueueToPublish(message);
}
}
else if (evt.Key == SqlErrorCommitTransaction)
else if (evt.Key == SqlErrorCommitTransaction || evt.Key == SqlErrorCommitTransactionMicrosoft)
{
var sqlConnection = (SqlConnection) GetProperty(evt.Value, "Connection");
var sqlConnection = (SqlConnection)GetProperty(evt.Value, "Connection");
var transactionKey = sqlConnection.ClientConnectionId;
_bufferList.TryRemove(transactionKey, out _);
......
......@@ -22,19 +22,26 @@ namespace DotNetCore.CAP.SqlServer
{
private readonly IOptions<CapOptions> _capOptions;
private readonly IOptions<SqlServerOptions> _options;
private readonly IStorageInitializer _initializer;
private readonly string _pubName;
private readonly string _recName;
public SqlServerDataStorage(
IOptions<CapOptions> capOptions,
IOptions<SqlServerOptions> options)
IOptions<SqlServerOptions> options,
IStorageInitializer initializer)
{
_options = options;
_initializer = initializer;
_capOptions = capOptions;
_pubName = initializer.GetPublishedTableName();
_recName = initializer.GetReceivedTableName();
}
public async Task ChangePublishStateAsync(MediumMessage message, StatusName state)
{
var sql =
$"UPDATE [{_options.Value.Schema}].[Published] SET Retries=@Retries,ExpiresAt=@ExpiresAt,StatusName=@StatusName WHERE Id=@Id";
$"UPDATE {_pubName} SET Retries=@Retries,ExpiresAt=@ExpiresAt,StatusName=@StatusName WHERE Id=@Id";
await using var connection = new SqlConnection(_options.Value.ConnectionString);
await connection.ExecuteAsync(sql, new
{
......@@ -48,7 +55,7 @@ namespace DotNetCore.CAP.SqlServer
public async Task ChangeReceiveStateAsync(MediumMessage message, StatusName state)
{
var sql =
$"UPDATE [{_options.Value.Schema}].[Received] SET Retries=@Retries,ExpiresAt=@ExpiresAt,StatusName=@StatusName WHERE Id=@Id";
$"UPDATE {_recName} SET Retries=@Retries,ExpiresAt=@ExpiresAt,StatusName=@StatusName WHERE Id=@Id";
await using var connection = new SqlConnection(_options.Value.ConnectionString);
await connection.ExecuteAsync(sql, new
{
......@@ -62,7 +69,7 @@ namespace DotNetCore.CAP.SqlServer
public async Task<MediumMessage> StoreMessageAsync(string name, Message content, object dbTransaction = null,
CancellationToken cancellationToken = default)
{
var sql = $"INSERT INTO {_options.Value.Schema}.[Published] ([Id],[Version],[Name],[Content],[Retries],[Added],[ExpiresAt],[StatusName])" +
var sql = $"INSERT INTO {_pubName} ([Id],[Version],[Name],[Content],[Retries],[Added],[ExpiresAt],[StatusName])" +
$"VALUES(@Id,'{_options.Value}',@Name,@Content,@Retries,@Added,@ExpiresAt,@StatusName);";
var message = new MediumMessage
......@@ -107,7 +114,7 @@ namespace DotNetCore.CAP.SqlServer
public async Task StoreReceivedExceptionMessageAsync(string name, string group, string content)
{
var sql =
$"INSERT INTO [{_options.Value.Schema}].[Received]([Id],[Version],[Name],[Group],[Content],[Retries],[Added],[ExpiresAt],[StatusName])" +
$"INSERT INTO {_recName}([Id],[Version],[Name],[Group],[Content],[Retries],[Added],[ExpiresAt],[StatusName])" +
$"VALUES(@Id,'{_capOptions.Value.Version}',@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName);";
await using var connection = new SqlConnection(_options.Value.ConnectionString);
......@@ -127,7 +134,7 @@ namespace DotNetCore.CAP.SqlServer
public async Task<MediumMessage> StoreReceivedMessageAsync(string name, string group, Message message)
{
var sql =
$"INSERT INTO [{_options.Value.Schema}].[Received]([Id],[Version],[Name],[Group],[Content],[Retries],[Added],[ExpiresAt],[StatusName])" +
$"INSERT INTO {_recName}([Id],[Version],[Name],[Group],[Content],[Retries],[Added],[ExpiresAt],[StatusName])" +
$"VALUES(@Id,'{_capOptions.Value.Version}',@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName);";
var mdMessage = new MediumMessage
......@@ -159,14 +166,14 @@ namespace DotNetCore.CAP.SqlServer
{
await using var connection = new SqlConnection(_options.Value.ConnectionString);
return await connection.ExecuteAsync(
$"DELETE TOP (@batchCount) FROM [{_options.Value.Schema}].[{table}] WITH (readpast) WHERE ExpiresAt < @timeout;",
new {timeout, batchCount});
$"DELETE TOP (@batchCount) FROM {table} WITH (readpast) WHERE ExpiresAt < @timeout;",
new { timeout, batchCount });
}
public async Task<IEnumerable<MediumMessage>> GetPublishedMessagesOfNeedRetry()
{
var fourMinAgo = DateTime.Now.AddMinutes(-4).ToString("O");
var sql = $"SELECT TOP (200) * FROM [{_options.Value.Schema}].[Published] WITH (readpast) WHERE Retries<{_capOptions.Value.FailedRetryCount} " +
var sql = $"SELECT TOP (200) * FROM {_pubName} WITH (readpast) WHERE Retries<{_capOptions.Value.FailedRetryCount} " +
$"AND Version='{_capOptions.Value.Version}' AND Added<'{fourMinAgo}' AND (StatusName = '{StatusName.Failed}' OR StatusName = '{StatusName.Scheduled}')";
var result = new List<MediumMessage>();
......@@ -190,7 +197,7 @@ namespace DotNetCore.CAP.SqlServer
{
var fourMinAgo = DateTime.Now.AddMinutes(-4).ToString("O");
var sql =
$"SELECT TOP (200) * FROM [{_options.Value.Schema}].[Received] WITH (readpast) WHERE Retries<{_capOptions.Value.FailedRetryCount} " +
$"SELECT TOP (200) * FROM {_recName} WITH (readpast) WHERE Retries<{_capOptions.Value.FailedRetryCount} " +
$"AND Version='{_capOptions.Value.Version}' AND Added<'{fourMinAgo}' AND (StatusName = '{StatusName.Failed}' OR StatusName = '{StatusName.Scheduled}')";
var result = new List<MediumMessage>();
......@@ -213,7 +220,7 @@ namespace DotNetCore.CAP.SqlServer
public IMonitoringApi GetMonitoringApi()
{
return new SqlServerMonitoringApi(_options);
return new SqlServerMonitoringApi(_options, _initializer);
}
}
}
\ No newline at end of file
......@@ -19,21 +19,24 @@ namespace DotNetCore.CAP.SqlServer
internal class SqlServerMonitoringApi : IMonitoringApi
{
private readonly SqlServerOptions _options;
private readonly string _pubName;
private readonly string _recName;
public SqlServerMonitoringApi(IOptions<SqlServerOptions> options)
public SqlServerMonitoringApi(IOptions<SqlServerOptions> options, IStorageInitializer initializer)
{
_options = options.Value ?? throw new ArgumentNullException(nameof(options));
_pubName = initializer.GetPublishedTableName();
_recName = initializer.GetReceivedTableName();
}
public StatisticsDto GetStatistics()
{
var sql = string.Format(@"
var sql = $@"
set transaction isolation level read committed;
select count(Id) from [{0}].Published with (nolock) where StatusName = N'Succeeded';
select count(Id) from [{0}].Received with (nolock) where StatusName = N'Succeeded';
select count(Id) from [{0}].Published with (nolock) where StatusName = N'Failed';
select count(Id) from [{0}].Received with (nolock) where StatusName = N'Failed';",
_options.Schema);
select count(Id) from {_pubName} with (nolock) where StatusName = N'Succeeded';
select count(Id) from {_recName} with (nolock) where StatusName = N'Succeeded';
select count(Id) from {_pubName} with (nolock) where StatusName = N'Failed';
select count(Id) from {_recName} with (nolock) where StatusName = N'Failed';";
var statistics = UseConnection(connection =>
{
......@@ -54,21 +57,21 @@ select count(Id) from [{0}].Received with (nolock) where StatusName = N'Failed';
public IDictionary<DateTime, int> HourlyFailedJobs(MessageType type)
{
var tableName = type == MessageType.Publish ? "Published" : "Received";
var tableName = type == MessageType.Publish ? _pubName : _recName;
return UseConnection(connection =>
GetHourlyTimelineStats(connection, tableName, nameof(StatusName.Failed)));
}
public IDictionary<DateTime, int> HourlySucceededJobs(MessageType type)
{
var tableName = type == MessageType.Publish ? "Published" : "Received";
var tableName = type == MessageType.Publish ? _pubName : _recName;
return UseConnection(connection =>
GetHourlyTimelineStats(connection, tableName, nameof(StatusName.Succeeded)));
}
public IList<MessageDto> Messages(MessageQueryDto queryDto)
{
var tableName = queryDto.MessageType == MessageType.Publish ? "Published" : "Received";
var tableName = queryDto.MessageType == MessageType.Publish ? _pubName : _recName;
var where = string.Empty;
if (!string.IsNullOrEmpty(queryDto.StatusName)) where += " and statusname=@StatusName";
......@@ -80,13 +83,13 @@ select count(Id) from [{0}].Received with (nolock) where StatusName = N'Failed';
var sqlQuery2008 =
$@"select * from
(SELECT t.*, ROW_NUMBER() OVER(order by t.Added desc) AS rownumber
from [{_options.Schema}].{tableName} as t
(SELECT t.*, ROW_NUMBER() OVER(order by t.Added desc) AS row_number
from {tableName} as t
where 1=1 {where}) as tbl
where tbl.rownumber between @offset and @offset + @limit";
where tbl.row_number between @offset and @offset + @limit";
var sqlQuery =
$"select * from [{_options.Schema}].{tableName} where 1=1 {where} order by Added desc offset @Offset rows fetch next @Limit rows only";
$"select * from {tableName} where 1=1 {where} order by Added desc offset @Offset rows fetch next @Limit rows only";
return UseConnection(conn => conn.Query<MessageDto>(_options.IsSqlServer2008 ? sqlQuery2008 : sqlQuery, new
{
......@@ -101,34 +104,34 @@ select count(Id) from [{0}].Received with (nolock) where StatusName = N'Failed';
public int PublishedFailedCount()
{
return UseConnection(conn => GetNumberOfMessage(conn, "Published", nameof(StatusName.Failed)));
return UseConnection(conn => GetNumberOfMessage(conn, _pubName, nameof(StatusName.Failed)));
}
public int PublishedSucceededCount()
{
return UseConnection(conn => GetNumberOfMessage(conn, "Published", nameof(StatusName.Succeeded)));
return UseConnection(conn => GetNumberOfMessage(conn, _pubName, nameof(StatusName.Succeeded)));
}
public int ReceivedFailedCount()
{
return UseConnection(conn => GetNumberOfMessage(conn, "Received", nameof(StatusName.Failed)));
return UseConnection(conn => GetNumberOfMessage(conn, _recName, nameof(StatusName.Failed)));
}
public int ReceivedSucceededCount()
{
return UseConnection(conn => GetNumberOfMessage(conn, "Received", nameof(StatusName.Succeeded)));
return UseConnection(conn => GetNumberOfMessage(conn, _recName, nameof(StatusName.Succeeded)));
}
public async Task<MediumMessage> GetPublishedMessageAsync(long id)
{
var sql = $@"SELECT * FROM [{_options.Schema}].[Published] WITH (readpast) WHERE Id={id}";
var sql = $@"SELECT * FROM {_pubName} WITH (readpast) WHERE Id={id}";
await using var connection = new SqlConnection(_options.ConnectionString);
return await connection.QueryFirstOrDefaultAsync<MediumMessage>(sql);
}
public async Task<MediumMessage> GetReceivedMessageAsync(long id)
{
var sql = $@"SELECT * FROM [{_options.Schema}].[Received] WITH (readpast) WHERE Id={id}";
var sql = $@"SELECT * FROM {_recName} WITH (readpast) WHERE Id={id}";
await using var connection = new SqlConnection(_options.ConnectionString);
return await connection.QueryFirstOrDefaultAsync<MediumMessage>(sql);
}
......@@ -136,9 +139,9 @@ select count(Id) from [{0}].Received with (nolock) where StatusName = N'Failed';
private int GetNumberOfMessage(IDbConnection connection, string tableName, string statusName)
{
var sqlQuery =
$"select count(Id) from [{_options.Schema}].{tableName} with (nolock) where StatusName = @state";
$"select count(Id) from {tableName} with (nolock) where StatusName = @state";
var count = connection.ExecuteScalar<int>(sqlQuery, new {state = statusName});
var count = connection.ExecuteScalar<int>(sqlQuery, new { state = statusName });
return count;
}
......@@ -173,7 +176,7 @@ select count(Id) from [{0}].Received with (nolock) where StatusName = N'Failed';
with aggr as (
select replace(convert(varchar, Added, 111), '/','-') + '-' + CONVERT(varchar, DATEPART(hh, Added)) as [Key],
count(id) [Count]
from [{_options.Schema}].{tableName}
from {tableName}
where StatusName = @statusName
group by replace(convert(varchar, Added, 111), '/','-') + '-' + CONVERT(varchar, DATEPART(hh, Added))
)
......@@ -184,15 +187,14 @@ select [Key], [Count] from aggr with (nolock) where [Key] in @keys;";
with aggr as (
select FORMAT(Added,'yyyy-MM-dd-HH') as [Key],
count(id) [Count]
from [{_options.Schema}].{tableName}
from {tableName}
where StatusName = @statusName
group by FORMAT(Added,'yyyy-MM-dd-HH')
)
select [Key], [Count] from aggr with (nolock) where [Key] in @keys;";
var valuesMap = connection.Query<TimelineCounter>(
_options.IsSqlServer2008 ? sqlQuery2008 : sqlQuery,
new {keys = keyMaps.Keys, statusName})
var valuesMap = connection
.Query<TimelineCounter>(_options.IsSqlServer2008 ? sqlQuery2008 : sqlQuery, new { keys = keyMaps.Keys, statusName })
.ToDictionary(x => x.Key, x => x.Count);
foreach (var key in keyMaps.Keys)
......
......@@ -29,12 +29,12 @@ namespace DotNetCore.CAP.SqlServer
_logger = logger;
}
public string GetPublishedTableName()
public virtual string GetPublishedTableName()
{
return $"[{_options.Value.Schema}].[Published]";
}
public string GetReceivedTableName()
public virtual string GetReceivedTableName()
{
return $"[{_options.Value.Schema}].[Received]";
}
......@@ -64,7 +64,7 @@ BEGIN
EXEC('CREATE SCHEMA [{schema}]')
END;
IF OBJECT_ID(N'[{schema}].[Received]',N'U') IS NULL
IF OBJECT_ID(N'{GetReceivedTableName()}',N'U') IS NULL
BEGIN
CREATE TABLE [{schema}].[Received](
[Id] [bigint] NOT NULL,
......@@ -83,7 +83,7 @@ CREATE TABLE [{schema}].[Received](
) ON [PRIMARY] TEXTIMAGE_ON [PRIMARY]
END;
IF OBJECT_ID(N'[{schema}].[Published]',N'U') IS NULL
IF OBJECT_ID(N'{GetPublishedTableName()}',N'U') IS NULL
BEGIN
CREATE TABLE [{schema}].[Published](
[Id] [bigint] NOT NULL,
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment