Commit fbf5fa61 authored by Savorboard's avatar Savorboard

Refactoring postgresql implementation for version 3.0

parent c15ae317
......@@ -17,10 +17,7 @@ namespace Microsoft.Extensions.DependencyInjection
public static CapOptions UsePostgreSql(this CapOptions options, Action<PostgreSqlOptions> configure)
{
if (configure == null)
{
throw new ArgumentNullException(nameof(configure));
}
if (configure == null) throw new ArgumentNullException(nameof(configure));
configure += x => x.Version = options.Version;
......@@ -38,10 +35,7 @@ namespace Microsoft.Extensions.DependencyInjection
public static CapOptions UseEntityFramework<TContext>(this CapOptions options, Action<EFOptions> configure)
where TContext : DbContext
{
if (configure == null)
{
throw new ArgumentNullException(nameof(configure));
}
if (configure == null) throw new ArgumentNullException(nameof(configure));
options.RegisterExtension(new PostgreSqlCapOptionsExtension(x =>
{
......
......@@ -2,8 +2,8 @@
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using DotNetCore.CAP.Persistence;
using DotNetCore.CAP.PostgreSql;
using DotNetCore.CAP.Processor;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Options;
......@@ -22,12 +22,10 @@ namespace DotNetCore.CAP
public void AddServices(IServiceCollection services)
{
services.AddSingleton<CapStorageMarkerService>();
services.AddSingleton<IStorage, PostgreSqlStorage>();
services.AddSingleton<IStorageConnection, PostgreSqlStorageConnection>();
services.AddSingleton<ICapPublisher, PostgreSqlPublisher>();
services.AddSingleton<ICallbackPublisher>(provider => (PostgreSqlPublisher)provider.GetService<ICapPublisher>());
services.AddSingleton<ICollectProcessor, PostgreSqlCollectProcessor>();
services.AddSingleton<IDataStorage, PostgreSqlDataStorage>();
services.AddSingleton<IStorageInitializer, PostgreSqlStorageInitializer>();
services.AddTransient<CapTransactionBase, PostgreSqlCapTransaction>();
services.Configure(_configure);
......
......@@ -28,16 +28,14 @@ namespace DotNetCore.CAP
public void Configure(PostgreSqlOptions options)
{
if (options.DbContextType != null)
{
using (var scope = _serviceScopeFactory.CreateScope())
{
var provider = scope.ServiceProvider;
using (var dbContext = (DbContext)provider.GetRequiredService(options.DbContextType))
using (var dbContext = (DbContext) provider.GetRequiredService(options.DbContextType))
{
options.ConnectionString = dbContext.Database.GetDbConnection().ConnectionString;
}
}
}
}
}
}
\ No newline at end of file
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFramework>netstandard2.0</TargetFramework>
<TargetFramework>netstandard2.1</TargetFramework>
<AssemblyName>DotNetCore.CAP.PostgreSql</AssemblyName>
<PackageTags>$(PackageTags);PostgreSQL</PackageTags>
</PropertyGroup>
......@@ -9,13 +9,14 @@
<PropertyGroup>
<DocumentationFile>bin\$(Configuration)\netstandard2.0\DotNetCore.CAP.PostgreSql.xml</DocumentationFile>
<NoWarn>1701;1702;1705;CS1591</NoWarn>
<LangVersion>8</LangVersion>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Dapper" Version="1.60.6" />
<PackageReference Include="Microsoft.EntityFrameworkCore" Version="2.2.0" />
<PackageReference Include="Microsoft.EntityFrameworkCore.Relational" Version="2.2.0" />
<PackageReference Include="Npgsql" Version="4.0.6" />
<PackageReference Include="Dapper" Version="2.0.30" />
<PackageReference Include="Microsoft.EntityFrameworkCore" Version="3.0.0" />
<PackageReference Include="Microsoft.EntityFrameworkCore.Relational" Version="3.0.0" />
<PackageReference Include="Npgsql" Version="4.1.1" />
</ItemGroup>
<ItemGroup>
......
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using System.Data;
using System.Threading;
using System.Threading.Tasks;
using Dapper;
using DotNetCore.CAP.Abstractions;
using DotNetCore.CAP.Messages;
using Microsoft.EntityFrameworkCore.Storage;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Options;
using Npgsql;
namespace DotNetCore.CAP.PostgreSql
{
public class PostgreSqlPublisher : CapPublisherBase, ICallbackPublisher
{
private readonly PostgreSqlOptions _options;
public PostgreSqlPublisher(IServiceProvider provider) : base(provider)
{
_options = provider.GetService<IOptions<PostgreSqlOptions>>().Value;
}
public async Task PublishCallbackAsync(CapPublishedMessage message)
{
await PublishAsyncInternal(message);
}
protected override async Task ExecuteAsync(CapPublishedMessage message, ICapTransaction transaction = null,
CancellationToken cancel = default(CancellationToken))
{
if (transaction == null)
{
using (var connection = InitDbConnection())
{
await connection.ExecuteAsync(PrepareSql(), message);
return;
}
}
var dbTrans = transaction.DbTransaction as IDbTransaction;
if (dbTrans == null && transaction.DbTransaction is IDbContextTransaction dbContextTrans)
{
dbTrans = dbContextTrans.GetDbTransaction();
}
var conn = dbTrans?.Connection;
await conn.ExecuteAsync(PrepareSql(), message, dbTrans);
}
#region private methods
private string PrepareSql()
{
return
$"INSERT INTO \"{_options.Schema}\".\"published\" (\"Id\",\"Version\",\"Name\",\"Content\",\"Retries\",\"Added\",\"ExpiresAt\",\"StatusName\")VALUES(@Id,'{_options.Version}',@Name,@Content,@Retries,@Added,@ExpiresAt,@StatusName);";
}
private IDbConnection InitDbConnection()
{
var conn = new NpgsqlConnection(_options.ConnectionString);
conn.Open();
return conn;
}
#endregion private methods
}
}
\ No newline at end of file
......@@ -3,6 +3,8 @@
using System.Data;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore.Infrastructure;
using Microsoft.EntityFrameworkCore.Storage;
using Microsoft.Extensions.DependencyInjection;
......@@ -33,6 +35,23 @@ namespace DotNetCore.CAP
Flush();
}
public override async Task CommitAsync(CancellationToken cancellationToken = default)
{
Debug.Assert(DbTransaction != null);
switch (DbTransaction)
{
case IDbTransaction dbTransaction:
dbTransaction.Commit();
break;
case IDbContextTransaction dbContextTransaction:
await dbContextTransaction.CommitAsync(cancellationToken);
break;
}
Flush();
}
public override void Rollback()
{
Debug.Assert(DbTransaction != null);
......@@ -48,6 +67,21 @@ namespace DotNetCore.CAP
}
}
public override async Task RollbackAsync(CancellationToken cancellationToken = default)
{
Debug.Assert(DbTransaction != null);
switch (DbTransaction)
{
case IDbTransaction dbTransaction:
dbTransaction.Rollback();
break;
case IDbContextTransaction dbContextTransaction:
await dbContextTransaction.RollbackAsync(cancellationToken);
break;
}
}
public override void Dispose()
{
(DbTransaction as IDbTransaction)?.Dispose();
......@@ -85,10 +119,7 @@ namespace DotNetCore.CAP
public static ICapTransaction BeginTransaction(this IDbConnection dbConnection,
ICapPublisher publisher, bool autoCommit = false)
{
if (dbConnection.State == ConnectionState.Closed)
{
dbConnection.Open();
}
if (dbConnection.State == ConnectionState.Closed) dbConnection.Open();
var dbTransaction = dbConnection.BeginTransaction();
publisher.Transaction.Value = publisher.ServiceProvider.GetService<CapTransactionBase>();
......
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using System.Threading.Tasks;
using Dapper;
using DotNetCore.CAP.Processor;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Npgsql;
namespace DotNetCore.CAP.PostgreSql
{
internal class PostgreSqlCollectProcessor : ICollectProcessor
{
private const int MaxBatch = 1000;
private static readonly string[] Tables =
{
"published", "received"
};
private readonly TimeSpan _delay = TimeSpan.FromSeconds(1);
private readonly ILogger _logger;
private readonly PostgreSqlOptions _options;
private readonly TimeSpan _waitingInterval = TimeSpan.FromMinutes(5);
public PostgreSqlCollectProcessor(ILogger<PostgreSqlCollectProcessor> logger,
IOptions<PostgreSqlOptions> sqlServerOptions)
{
_logger = logger;
_options = sqlServerOptions.Value;
}
public async Task ProcessAsync(ProcessingContext context)
{
foreach (var table in Tables)
{
_logger.LogDebug($"Collecting expired data from table [{_options.Schema}].[{table}].");
var removedCount = 0;
do
{
using (var connection = new NpgsqlConnection(_options.ConnectionString))
{
removedCount = await connection.ExecuteAsync(
$"DELETE FROM \"{_options.Schema}\".\"{table}\" WHERE \"ExpiresAt\" < @now AND \"Id\" IN (SELECT \"Id\" FROM \"{_options.Schema}\".\"{table}\" LIMIT @count);",
new { now = DateTime.Now, count = MaxBatch });
}
if (removedCount != 0)
{
await context.WaitAsync(_delay);
context.ThrowIfStopping();
}
} while (removedCount != 0);
}
await context.WaitAsync(_waitingInterval);
}
}
}
\ No newline at end of file
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using System.Collections.Generic;
using System.Data;
using System.Threading;
using System.Threading.Tasks;
using Dapper;
using DotNetCore.CAP.Internal;
using DotNetCore.CAP.Messages;
using DotNetCore.CAP.Monitoring;
using DotNetCore.CAP.Persistence;
using DotNetCore.CAP.Serialization;
using Microsoft.EntityFrameworkCore.Storage;
using Microsoft.Extensions.Options;
using Npgsql;
namespace DotNetCore.CAP.PostgreSql
{
public class PostgreSqlDataStorage : IDataStorage
{
private readonly IOptions<CapOptions> _capOptions;
private readonly IOptions<PostgreSqlOptions> _options;
public PostgreSqlDataStorage(
IOptions<PostgreSqlOptions> options,
IOptions<CapOptions> capOptions)
{
_capOptions = capOptions;
_options = options;
}
public async Task ChangePublishStateAsync(MediumMessage message, StatusName state)
{
var sql =
$"UPDATE \"{_options.Value.Schema}\".\"published\" SET \"Retries\"=@Retries,\"ExpiresAt\"=@ExpiresAt,\"StatusName\"=@StatusName WHERE \"Id\"=@Id";
await using var connection = new NpgsqlConnection(_options.Value.ConnectionString);
await connection.ExecuteAsync(sql, new
{
Id = message.DbId,
message.Retries,
message.ExpiresAt,
StatusName = state.ToString("G")
});
}
public async Task ChangeReceiveStateAsync(MediumMessage message, StatusName state)
{
var sql =
$"UPDATE \"{_options.Value.Schema}\".\"received\" SET \"Retries\"=@Retries,\"ExpiresAt\"=@ExpiresAt,\"StatusName\"=@StatusName WHERE \"Id\"=@Id";
await using var connection = new NpgsqlConnection(_options.Value.ConnectionString);
await connection.ExecuteAsync(sql, new
{
Id = message.DbId,
message.Retries,
message.ExpiresAt,
StatusName = state.ToString("G")
});
}
public async Task<MediumMessage> StoreMessageAsync(string name, Message content, object dbTransaction = null,
CancellationToken cancellationToken = default)
{
var sql =
$"INSERT INTO \"{_options.Value.Schema}\".\"published\" (\"Id\",\"Version\",\"Name\",\"Content\",\"Retries\",\"Added\",\"ExpiresAt\",\"StatusName\")" +
$"VALUES(@Id,'{_options.Value.Version}',@Name,@Content,@Retries,@Added,@ExpiresAt,@StatusName);";
var message = new MediumMessage
{
DbId = content.GetId(),
Origin = content,
Content = StringSerializer.Serialize(content),
Added = DateTime.Now,
ExpiresAt = null,
Retries = 0
};
var po = new
{
Id = message.DbId,
Name = name,
message.Content,
message.Retries,
message.Added,
message.ExpiresAt,
StatusName = StatusName.Scheduled
};
if (dbTransaction == null)
{
await using var connection = new NpgsqlConnection(_options.Value.ConnectionString);
await connection.ExecuteAsync(sql, po);
}
else
{
var dbTrans = dbTransaction as IDbTransaction;
if (dbTrans == null && dbTransaction is IDbContextTransaction dbContextTrans)
dbTrans = dbContextTrans.GetDbTransaction();
var conn = dbTrans?.Connection;
await conn.ExecuteAsync(sql, po, dbTrans);
}
return message;
}
public async Task StoreReceivedExceptionMessageAsync(string name, string group, string content)
{
var sql =
$"INSERT INTO \"{_options.Value.Schema}\".\"received\"(\"Id\",\"Version\",\"Name\",\"Group\",\"Content\",\"Retries\",\"Added\",\"ExpiresAt\",\"StatusName\")" +
$"VALUES(@Id,'{_capOptions.Value.Version}',@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName) RETURNING \"Id\";";
await using var connection = new NpgsqlConnection(_options.Value.ConnectionString);
await connection.ExecuteAsync(sql, new
{
Id = SnowflakeId.Default().NextId().ToString(),
Group = group,
Name = name,
Content = content,
Retries = _capOptions.Value.FailedRetryCount,
Added = DateTime.Now,
ExpiresAt = DateTime.Now.AddDays(15),
StatusName = nameof(StatusName.Failed)
});
}
public async Task<MediumMessage> StoreReceivedMessageAsync(string name, string group, Message message)
{
var sql =
$"INSERT INTO \"{_options.Value.Schema}\".\"received\"(\"Id\",\"Version\",\"Name\",\"Group\",\"Content\",\"Retries\",\"Added\",\"ExpiresAt\",\"StatusName\")" +
$"VALUES(@Id,'{_capOptions.Value.Version}',@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName) RETURNING \"Id\";";
var mdMessage = new MediumMessage
{
DbId = SnowflakeId.Default().NextId().ToString(),
Origin = message,
Added = DateTime.Now,
ExpiresAt = null,
Retries = 0
};
var content = StringSerializer.Serialize(mdMessage.Origin);
await using var connection = new NpgsqlConnection(_options.Value.ConnectionString);
await connection.ExecuteAsync(sql, new
{
Id = mdMessage.DbId,
Group = group,
Name = name,
Content = content,
mdMessage.Retries,
mdMessage.Added,
mdMessage.ExpiresAt,
StatusName = nameof(StatusName.Scheduled)
});
return mdMessage;
}
public async Task<int> DeleteExpiresAsync(string table, DateTime timeout, int batchCount = 1000,
CancellationToken token = default)
{
await using var connection = new NpgsqlConnection(_options.Value.ConnectionString);
return await connection.ExecuteAsync(
$"DELETE FROM \"{_options.Value.Schema}\".\"{table}\" WHERE \"ExpiresAt\" < @now AND \"Id\" IN (SELECT \"Id\" FROM \"{_options.Value.Schema}\".\"{table}\" LIMIT @count);",
new {timeout, batchCount});
}
public async Task<IEnumerable<MediumMessage>> GetPublishedMessagesOfNeedRetry()
{
var fourMinAgo = DateTime.Now.AddMinutes(-4).ToString("O");
var sql =
$"SELECT * FROM \"{_options.Value.Schema}\".\"published\" WHERE \"Retries\"<{_capOptions.Value.FailedRetryCount} AND \"Version\"='{_capOptions.Value.Version}' AND \"Added\"<'{fourMinAgo}' AND (\"StatusName\"='{StatusName.Failed}' OR \"StatusName\"='{StatusName.Scheduled}') LIMIT 200;";
var result = new List<MediumMessage>();
await using var connection = new NpgsqlConnection(_options.Value.ConnectionString);
var reader = await connection.ExecuteReaderAsync(sql);
while (reader.Read())
{
result.Add(new MediumMessage
{
DbId = reader.GetInt64(0).ToString(),
Origin = StringSerializer.DeSerialize(reader.GetString(3)),
Retries = reader.GetInt32(4),
Added = reader.GetDateTime(5)
});
}
return result;
}
public async Task<IEnumerable<MediumMessage>> GetReceivedMessagesOfNeedRetry()
{
var fourMinAgo = DateTime.Now.AddMinutes(-4).ToString("O");
var sql =
$"SELECT * FROM \"{_options.Value.Schema}\".\"received\" WHERE \"Retries\"<{_capOptions.Value.FailedRetryCount} AND \"Version\"='{_capOptions.Value.Version}' AND \"Added\"<'{fourMinAgo}' AND (\"StatusName\"='{StatusName.Failed}' OR \"StatusName\"='{StatusName.Scheduled}') LIMIT 200;";
var result = new List<MediumMessage>();
await using var connection = new NpgsqlConnection(_options.Value.ConnectionString);
var reader = await connection.ExecuteReaderAsync(sql);
while (reader.Read())
{
result.Add(new MediumMessage
{
DbId = reader.GetInt64(0).ToString(),
Origin = StringSerializer.DeSerialize(reader.GetString(3)),
Retries = reader.GetInt32(4),
Added = reader.GetDateTime(5)
});
}
return result;
}
public IMonitoringApi GetMonitoringApi()
{
return new PostgreSqlMonitoringApi(_options);
}
}
}
\ No newline at end of file
......@@ -2,6 +2,8 @@
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using System.Threading;
using System.Threading.Tasks;
using DotNetCore.CAP;
// ReSharper disable once CheckNamespace
......@@ -33,6 +35,21 @@ namespace Microsoft.EntityFrameworkCore.Storage
_transaction.Rollback();
}
public Task CommitAsync(CancellationToken cancellationToken = default)
{
return _transaction.CommitAsync(cancellationToken);
}
public Task RollbackAsync(CancellationToken cancellationToken = default)
{
return _transaction.CommitAsync(cancellationToken);
}
public Guid TransactionId { get; }
public ValueTask DisposeAsync()
{
return new ValueTask(Task.Run(() => _transaction.Dispose()));
}
}
}
\ No newline at end of file
......@@ -5,24 +5,45 @@ using System;
using System.Collections.Generic;
using System.Data;
using System.Linq;
using System.Threading.Tasks;
using Dapper;
using DotNetCore.CAP.Dashboard;
using DotNetCore.CAP.Dashboard.Monitoring;
using DotNetCore.CAP.Infrastructure;
using DotNetCore.CAP.Internal;
using DotNetCore.CAP.Messages;
using DotNetCore.CAP.Monitoring;
using DotNetCore.CAP.Persistence;
using Microsoft.Extensions.Options;
using Npgsql;
namespace DotNetCore.CAP.PostgreSql
{
public class PostgreSqlMonitoringApi : IMonitoringApi
{
private readonly PostgreSqlOptions _options;
private readonly PostgreSqlStorage _storage;
private readonly IOptions<PostgreSqlOptions> _options;
public PostgreSqlMonitoringApi(IStorage storage, IOptions<PostgreSqlOptions> options)
public PostgreSqlMonitoringApi(IOptions<PostgreSqlOptions> options)
{
_options = options.Value ?? throw new ArgumentNullException(nameof(options));
_storage = storage as PostgreSqlStorage ?? throw new ArgumentNullException(nameof(storage));
_options = options ?? throw new ArgumentNullException(nameof(options));
}
public async Task<MediumMessage> GetPublishedMessageAsync(long id)
{
var sql =
$"SELECT * FROM \"{_options.Value.Schema}\".\"published\" WHERE \"Id\"={id} FOR UPDATE SKIP LOCKED";
using (var connection = new NpgsqlConnection(_options.Value.ConnectionString))
{
return await connection.QueryFirstOrDefaultAsync<MediumMessage>(sql);
}
}
public async Task<MediumMessage> GetReceivedMessageAsync(long id)
{
var sql =
$"SELECT * FROM \"{_options.Value.Schema}\".\"received\" WHERE \"Id\"={id} FOR UPDATE SKIP LOCKED";
using (var connection = new NpgsqlConnection(_options.Value.ConnectionString))
{
return await connection.QueryFirstOrDefaultAsync<MediumMessage>(sql);
}
}
public StatisticsDto GetStatistics()
......@@ -32,7 +53,7 @@ select count(""Id"") from ""{0}"".""published"" where ""StatusName"" = N'Succeed
select count(""Id"") from ""{0}"".""received"" where ""StatusName"" = N'Succeeded';
select count(""Id"") from ""{0}"".""published"" where ""StatusName"" = N'Failed';
select count(""Id"") from ""{0}"".""received"" where ""StatusName"" = N'Failed';",
_options.Schema);
_options.Value.Schema);
var statistics = UseConnection(connection =>
{
......@@ -56,28 +77,16 @@ select count(""Id"") from ""{0}"".""received"" where ""StatusName"" = N'Failed'
var tableName = queryDto.MessageType == MessageType.Publish ? "published" : "received";
var where = string.Empty;
if (!string.IsNullOrEmpty(queryDto.StatusName))
{
where += " and Lower(\"StatusName\") = Lower(@StatusName)";
}
if (!string.IsNullOrEmpty(queryDto.StatusName)) where += " and Lower(\"StatusName\") = Lower(@StatusName)";
if (!string.IsNullOrEmpty(queryDto.Name))
{
where += " and Lower(\"Name\") = Lower(@Name)";
}
if (!string.IsNullOrEmpty(queryDto.Name)) where += " and Lower(\"Name\") = Lower(@Name)";
if (!string.IsNullOrEmpty(queryDto.Group))
{
where += " and Lower(\"Group\") = Lower(@Group)";
}
if (!string.IsNullOrEmpty(queryDto.Group)) where += " and Lower(\"Group\") = Lower(@Group)";
if (!string.IsNullOrEmpty(queryDto.Content))
{
where += " and \"Content\" ILike '%@Content%'";
}
if (!string.IsNullOrEmpty(queryDto.Content)) where += " and \"Content\" ILike '%@Content%'";
var sqlQuery =
$"select * from \"{_options.Schema}\".\"{tableName}\" where 1=1 {where} order by \"Added\" desc offset @Offset limit @Limit";
$"select * from \"{_options.Value.Schema}\".\"{tableName}\" where 1=1 {where} order by \"Added\" desc offset @Offset limit @Limit";
return UseConnection(conn => conn.Query<MessageDto>(sqlQuery, new
{
......@@ -92,42 +101,42 @@ select count(""Id"") from ""{0}"".""received"" where ""StatusName"" = N'Failed'
public int PublishedFailedCount()
{
return UseConnection(conn => GetNumberOfMessage(conn, "published", StatusName.Failed));
return UseConnection(conn => GetNumberOfMessage(conn, "published", nameof(StatusName.Failed)));
}
public int PublishedSucceededCount()
{
return UseConnection(conn => GetNumberOfMessage(conn, "published", StatusName.Succeeded));
return UseConnection(conn => GetNumberOfMessage(conn, "published", nameof(StatusName.Succeeded)));
}
public int ReceivedFailedCount()
{
return UseConnection(conn => GetNumberOfMessage(conn, "received", StatusName.Failed));
return UseConnection(conn => GetNumberOfMessage(conn, "received", nameof(StatusName.Failed)));
}
public int ReceivedSucceededCount()
{
return UseConnection(conn => GetNumberOfMessage(conn, "received", StatusName.Succeeded));
return UseConnection(conn => GetNumberOfMessage(conn, "received", nameof(StatusName.Succeeded)));
}
public IDictionary<DateTime, int> HourlySucceededJobs(MessageType type)
{
var tableName = type == MessageType.Publish ? "published" : "received";
return UseConnection(connection =>
GetHourlyTimelineStats(connection, tableName, StatusName.Succeeded));
GetHourlyTimelineStats(connection, tableName, nameof(StatusName.Succeeded)));
}
public IDictionary<DateTime, int> HourlyFailedJobs(MessageType type)
{
var tableName = type == MessageType.Publish ? "published" : "received";
return UseConnection(connection =>
GetHourlyTimelineStats(connection, tableName, StatusName.Failed));
GetHourlyTimelineStats(connection, tableName, nameof(StatusName.Failed)));
}
private int GetNumberOfMessage(IDbConnection connection, string tableName, string statusName)
{
var sqlQuery =
$"select count(\"Id\") from \"{_options.Schema}\".\"{tableName}\" where Lower(\"StatusName\") = Lower(@state)";
$"select count(\"Id\") from \"{_options.Value.Schema}\".\"{tableName}\" where Lower(\"StatusName\") = Lower(@state)";
var count = connection.ExecuteScalar<int>(sqlQuery, new {state = statusName});
return count;
......@@ -135,7 +144,7 @@ select count(""Id"") from ""{0}"".""received"" where ""StatusName"" = N'Failed'
private T UseConnection<T>(Func<IDbConnection, T> action)
{
return _storage.UseConnection(action);
return action(new NpgsqlConnection(_options.Value.ConnectionString));
}
private Dictionary<DateTime, int> GetHourlyTimelineStats(IDbConnection connection, string tableName,
......@@ -165,7 +174,7 @@ select count(""Id"") from ""{0}"".""received"" where ""StatusName"" = N'Failed'
with aggr as (
select to_char(""Added"",'yyyy-MM-dd-HH') as ""Key"",
count(""Id"") as ""Count""
from ""{_options.Schema}"".""{tableName}""
from ""{_options.Value.Schema}"".""{tableName}""
where ""StatusName"" = @statusName
group by to_char(""Added"", 'yyyy-MM-dd-HH')
)
......@@ -178,9 +187,7 @@ select ""Key"",""Count"" from aggr where ""Key""= Any(@keys);";
foreach (var key in keyMaps.Keys)
{
if (!valuesMap.ContainsKey(key))
{
valuesMap.Add(key, 0);
}
}
var result = new Dictionary<DateTime, int>();
......@@ -193,4 +200,10 @@ select ""Key"",""Count"" from aggr where ""Key""= Any(@keys);";
return result;
}
}
internal class TimelineCounter
{
public string Key { get; set; }
public int Count { get; set; }
}
}
\ No newline at end of file
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Dapper;
using DotNetCore.CAP.Infrastructure;
using DotNetCore.CAP.Messages;
using Microsoft.Extensions.Options;
using Npgsql;
namespace DotNetCore.CAP.PostgreSql
{
public class PostgreSqlStorageConnection : IStorageConnection
{
private readonly CapOptions _capOptions;
public PostgreSqlStorageConnection(
IOptions<PostgreSqlOptions> options,
IOptions<CapOptions> capOptions)
{
_capOptions = capOptions.Value;
Options = options.Value;
}
public PostgreSqlOptions Options { get; }
public IStorageTransaction CreateTransaction()
{
return new PostgreSqlStorageTransaction(this);
}
public async Task<CapPublishedMessage> GetPublishedMessageAsync(long id)
{
var sql = $"SELECT * FROM \"{Options.Schema}\".\"published\" WHERE \"Id\"={id} FOR UPDATE SKIP LOCKED";
using (var connection = new NpgsqlConnection(Options.ConnectionString))
{
return await connection.QueryFirstOrDefaultAsync<CapPublishedMessage>(sql);
}
}
public async Task<IEnumerable<CapPublishedMessage>> GetPublishedMessagesOfNeedRetry()
{
var fourMinsAgo = DateTime.Now.AddMinutes(-4).ToString("O");
var sql =
$"SELECT * FROM \"{Options.Schema}\".\"published\" WHERE \"Retries\"<{_capOptions.FailedRetryCount} AND \"Version\"='{_capOptions.Version}' AND \"Added\"<'{fourMinsAgo}' AND (\"StatusName\"='{StatusName.Failed}' OR \"StatusName\"='{StatusName.Scheduled}') LIMIT 200;";
using (var connection = new NpgsqlConnection(Options.ConnectionString))
{
return await connection.QueryAsync<CapPublishedMessage>(sql);
}
}
public void StoreReceivedMessage(CapReceivedMessage message)
{
if (message == null)
{
throw new ArgumentNullException(nameof(message));
}
var sql =
$"INSERT INTO \"{Options.Schema}\".\"received\"(\"Id\",\"Version\",\"Name\",\"Group\",\"Content\",\"Retries\",\"Added\",\"ExpiresAt\",\"StatusName\")VALUES(@Id,'{_capOptions.Version}',@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName) RETURNING \"Id\";";
using (var connection = new NpgsqlConnection(Options.ConnectionString))
{
connection.Execute(sql, message);
}
}
public async Task<CapReceivedMessage> GetReceivedMessageAsync(long id)
{
var sql = $"SELECT * FROM \"{Options.Schema}\".\"received\" WHERE \"Id\"={id} FOR UPDATE SKIP LOCKED";
using (var connection = new NpgsqlConnection(Options.ConnectionString))
{
return await connection.QueryFirstOrDefaultAsync<CapReceivedMessage>(sql);
}
}
public async Task<IEnumerable<CapReceivedMessage>> GetReceivedMessagesOfNeedRetry()
{
var fourMinsAgo = DateTime.Now.AddMinutes(-4).ToString("O");
var sql =
$"SELECT * FROM \"{Options.Schema}\".\"received\" WHERE \"Retries\"<{_capOptions.FailedRetryCount} AND \"Version\"='{_capOptions.Version}' AND \"Added\"<'{fourMinsAgo}' AND (\"StatusName\"='{StatusName.Failed}' OR \"StatusName\"='{StatusName.Scheduled}') LIMIT 200;";
using (var connection = new NpgsqlConnection(Options.ConnectionString))
{
return await connection.QueryAsync<CapReceivedMessage>(sql);
}
}
public bool ChangePublishedState(long messageId, string state)
{
var sql =
$"UPDATE \"{Options.Schema}\".\"published\" SET \"Retries\"=\"Retries\"+1,\"ExpiresAt\"=NULL,\"StatusName\" = '{state}' WHERE \"Id\"={messageId}";
using (var connection = new NpgsqlConnection(Options.ConnectionString))
{
return connection.Execute(sql) > 0;
}
}
public bool ChangeReceivedState(long messageId, string state)
{
var sql =
$"UPDATE \"{Options.Schema}\".\"received\" SET \"Retries\"=\"Retries\"+1,\"ExpiresAt\"=NULL,\"StatusName\" = '{state}' WHERE \"Id\"={messageId}";
using (var connection = new NpgsqlConnection(Options.ConnectionString))
{
return connection.Execute(sql) > 0;
}
}
}
}
\ No newline at end of file
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using System.Data;
using System.Threading;
using System.Threading.Tasks;
using Dapper;
using DotNetCore.CAP.Dashboard;
using DotNetCore.CAP.Persistence;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Npgsql;
namespace DotNetCore.CAP.PostgreSql
{
public class PostgreSqlStorage : IStorage
public class PostgreSqlStorageInitializer : IStorageInitializer
{
private readonly IOptions<CapOptions> _capOptions;
private readonly IDbConnection _existingConnection = null;
private readonly ILogger _logger;
private readonly IOptions<PostgreSqlOptions> _options;
public PostgreSqlStorage(ILogger<PostgreSqlStorage> logger,
IOptions<CapOptions> capOptions,
public PostgreSqlStorageInitializer(
ILogger<PostgreSqlStorageInitializer> logger,
IOptions<PostgreSqlOptions> options)
{
_options = options;
_logger = logger;
_capOptions = capOptions;
}
public IStorageConnection GetConnection()
public string GetPublishedTableName()
{
return new PostgreSqlStorageConnection(_options, _capOptions);
return $"{_options.Value.Schema}.published";
}
public IMonitoringApi GetMonitoringApi()
public string GetReceivedTableName()
{
return new PostgreSqlMonitoringApi(this, _options);
return $"{_options.Value.Schema}.received";
}
public async Task InitializeAsync(CancellationToken cancellationToken)
{
if (cancellationToken.IsCancellationRequested)
{
return;
}
if (cancellationToken.IsCancellationRequested) return;
var sql = CreateDbTablesScript(_options.Value.Schema);
using (var connection = new NpgsqlConnection(_options.Value.ConnectionString))
{
await connection.ExecuteAsync(sql);
......@@ -56,45 +47,6 @@ namespace DotNetCore.CAP.PostgreSql
_logger.LogDebug("Ensuring all create database tables script are applied.");
}
internal T UseConnection<T>(Func<IDbConnection, T> func)
{
IDbConnection connection = null;
try
{
connection = CreateAndOpenConnection();
return func(connection);
}
finally
{
ReleaseConnection(connection);
}
}
internal IDbConnection CreateAndOpenConnection()
{
var connection = _existingConnection ?? new NpgsqlConnection(_options.Value.ConnectionString);
if (connection.State == ConnectionState.Closed)
{
connection.Open();
}
return connection;
}
internal bool IsExistingConnection(IDbConnection connection)
{
return connection != null && ReferenceEquals(connection, _existingConnection);
}
internal void ReleaseConnection(IDbConnection connection)
{
if (connection != null && !IsExistingConnection(connection))
{
connection.Dispose();
}
}
protected virtual string CreateDbTablesScript(string schema)
{
......
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using System.Data;
using System.Threading.Tasks;
using Dapper;
using DotNetCore.CAP.Messages;
using Npgsql;
namespace DotNetCore.CAP.PostgreSql
{
public class PostgreSqlStorageTransaction : IStorageTransaction
{
private readonly IDbConnection _dbConnection;
private readonly IDbTransaction _dbTransaction;
private readonly string _schema;
public PostgreSqlStorageTransaction(PostgreSqlStorageConnection connection)
{
var options = connection.Options;
_schema = options.Schema;
_dbConnection = new NpgsqlConnection(options.ConnectionString);
_dbConnection.Open();
_dbTransaction = _dbConnection.BeginTransaction(IsolationLevel.ReadCommitted);
}
public void UpdateMessage(CapPublishedMessage message)
{
if (message == null)
{
throw new ArgumentNullException(nameof(message));
}
var sql =
$@"UPDATE ""{_schema}"".""published"" SET ""Retries""=@Retries,""Content""= @Content,""ExpiresAt""=@ExpiresAt,""StatusName""=@StatusName WHERE ""Id""=@Id;";
_dbConnection.Execute(sql, message, _dbTransaction);
}
public void UpdateMessage(CapReceivedMessage message)
{
if (message == null)
{
throw new ArgumentNullException(nameof(message));
}
var sql =
$@"UPDATE ""{_schema}"".""received"" SET ""Retries""=@Retries,""Content""= @Content,""ExpiresAt""=@ExpiresAt,""StatusName""=@StatusName WHERE ""Id""=@Id;";
_dbConnection.Execute(sql, message, _dbTransaction);
}
public Task CommitAsync()
{
_dbTransaction.Commit();
return Task.CompletedTask;
}
public void Dispose()
{
_dbTransaction.Dispose();
_dbConnection.Dispose();
}
}
}
\ No newline at end of file
......@@ -2,7 +2,6 @@
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using DotNetCore.CAP.Internal;
using DotNetCore.CAP.RabbitMQ;
using DotNetCore.CAP.Transport;
using Microsoft.Extensions.DependencyInjection;
......@@ -27,7 +26,6 @@ namespace DotNetCore.CAP
services.AddSingleton<ITransport, RabbitMQMessageSender>();
services.AddSingleton<IConsumerClientFactory, RabbitMQConsumerClientFactory>();
services.AddSingleton<IConnectionChannelPool, ConnectionChannelPool>();
}
}
}
\ No newline at end of file
......@@ -3,7 +3,6 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using DotNetCore.CAP.Messages;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment