Commit 65a5ea83 authored by Savorboard's avatar Savorboard

Refactoring sqlserver implementation for version 3.0

parent df677948
...@@ -17,10 +17,7 @@ namespace Microsoft.Extensions.DependencyInjection ...@@ -17,10 +17,7 @@ namespace Microsoft.Extensions.DependencyInjection
public static CapOptions UseSqlServer(this CapOptions options, Action<SqlServerOptions> configure) public static CapOptions UseSqlServer(this CapOptions options, Action<SqlServerOptions> configure)
{ {
if (configure == null) if (configure == null) throw new ArgumentNullException(nameof(configure));
{
throw new ArgumentNullException(nameof(configure));
}
configure += x => x.Version = options.Version; configure += x => x.Version = options.Version;
...@@ -38,10 +35,7 @@ namespace Microsoft.Extensions.DependencyInjection ...@@ -38,10 +35,7 @@ namespace Microsoft.Extensions.DependencyInjection
public static CapOptions UseEntityFramework<TContext>(this CapOptions options, Action<EFOptions> configure) public static CapOptions UseEntityFramework<TContext>(this CapOptions options, Action<EFOptions> configure)
where TContext : DbContext where TContext : DbContext
{ {
if (configure == null) if (configure == null) throw new ArgumentNullException(nameof(configure));
{
throw new ArgumentNullException(nameof(configure));
}
options.RegisterExtension(new SqlServerCapOptionsExtension(x => options.RegisterExtension(new SqlServerCapOptionsExtension(x =>
{ {
......
...@@ -2,7 +2,7 @@ ...@@ -2,7 +2,7 @@
// Licensed under the MIT License. See License.txt in the project root for license information. // Licensed under the MIT License. See License.txt in the project root for license information.
using System; using System;
using DotNetCore.CAP.Processor; using DotNetCore.CAP.Persistence;
using DotNetCore.CAP.SqlServer; using DotNetCore.CAP.SqlServer;
using DotNetCore.CAP.SqlServer.Diagnostics; using DotNetCore.CAP.SqlServer.Diagnostics;
using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.DependencyInjection;
...@@ -25,12 +25,8 @@ namespace DotNetCore.CAP ...@@ -25,12 +25,8 @@ namespace DotNetCore.CAP
services.AddSingleton<CapStorageMarkerService>(); services.AddSingleton<CapStorageMarkerService>();
services.AddSingleton<DiagnosticProcessorObserver>(); services.AddSingleton<DiagnosticProcessorObserver>();
services.AddSingleton<IStorage, SqlServerStorage>(); services.AddSingleton<IDataStorage, SqlServerDataStorage>();
services.AddSingleton<IStorageConnection, SqlServerStorageConnection>(); services.AddSingleton<IStorageInitializer, SqlServerStorageInitializer>();
services.AddSingleton<ICapPublisher, SqlServerPublisher>();
services.AddSingleton<ICallbackPublisher>(x => (SqlServerPublisher)x.GetService<ICapPublisher>());
services.AddSingleton<ICollectProcessor, SqlServerCollectProcessor>();
services.AddTransient<CapTransactionBase, SqlServerCapTransaction>(); services.AddTransient<CapTransactionBase, SqlServerCapTransaction>();
services.Configure(_configure); services.Configure(_configure);
......
...@@ -28,17 +28,12 @@ namespace DotNetCore.CAP ...@@ -28,17 +28,12 @@ namespace DotNetCore.CAP
public void Configure(SqlServerOptions options) public void Configure(SqlServerOptions options)
{ {
if (options.DbContextType != null) if (options.DbContextType == null) return;
{
using (var scope = _serviceScopeFactory.CreateScope()) using var scope = _serviceScopeFactory.CreateScope();
{ var provider = scope.ServiceProvider;
var provider = scope.ServiceProvider; using var dbContext = (DbContext)provider.GetRequiredService(options.DbContextType);
using (var dbContext = (DbContext)provider.GetRequiredService(options.DbContextType)) options.ConnectionString = dbContext.Database.GetDbConnection().ConnectionString;
{
options.ConnectionString = dbContext.Database.GetDbConnection().ConnectionString;
}
}
}
} }
} }
} }
\ No newline at end of file
...@@ -4,9 +4,9 @@ ...@@ -4,9 +4,9 @@
using System; using System;
using System.Collections.Concurrent; using System.Collections.Concurrent;
using System.Collections.Generic; using System.Collections.Generic;
using System.Data.SqlClient;
using System.Reflection; using System.Reflection;
using DotNetCore.CAP.Messages; using DotNetCore.CAP.Persistence;
using Microsoft.Data.SqlClient;
namespace DotNetCore.CAP.SqlServer.Diagnostics namespace DotNetCore.CAP.SqlServer.Diagnostics
{ {
...@@ -16,11 +16,11 @@ namespace DotNetCore.CAP.SqlServer.Diagnostics ...@@ -16,11 +16,11 @@ namespace DotNetCore.CAP.SqlServer.Diagnostics
public const string SqlAfterCommitTransaction = SqlClientPrefix + "WriteTransactionCommitAfter"; public const string SqlAfterCommitTransaction = SqlClientPrefix + "WriteTransactionCommitAfter";
public const string SqlErrorCommitTransaction = SqlClientPrefix + "WriteTransactionCommitError"; public const string SqlErrorCommitTransaction = SqlClientPrefix + "WriteTransactionCommitError";
private readonly ConcurrentDictionary<Guid, List<CapPublishedMessage>> _bufferList; private readonly ConcurrentDictionary<Guid, List<MediumMessage>> _bufferList;
private readonly IDispatcher _dispatcher; private readonly IDispatcher _dispatcher;
public DiagnosticObserver(IDispatcher dispatcher, public DiagnosticObserver(IDispatcher dispatcher,
ConcurrentDictionary<Guid, List<CapPublishedMessage>> bufferList) ConcurrentDictionary<Guid, List<MediumMessage>> bufferList)
{ {
_dispatcher = dispatcher; _dispatcher = dispatcher;
_bufferList = bufferList; _bufferList = bufferList;
...@@ -41,12 +41,10 @@ namespace DotNetCore.CAP.SqlServer.Diagnostics ...@@ -41,12 +41,10 @@ namespace DotNetCore.CAP.SqlServer.Diagnostics
var sqlConnection = (SqlConnection) GetProperty(evt.Value, "Connection"); var sqlConnection = (SqlConnection) GetProperty(evt.Value, "Connection");
var transactionKey = sqlConnection.ClientConnectionId; var transactionKey = sqlConnection.ClientConnectionId;
if (_bufferList.TryRemove(transactionKey, out var msgList)) if (_bufferList.TryRemove(transactionKey, out var msgList))
{
foreach (var message in msgList) foreach (var message in msgList)
{ {
_dispatcher.EnqueueToPublish(message); _dispatcher.EnqueueToPublish(message);
} }
}
} }
else if (evt.Key == SqlErrorCommitTransaction) else if (evt.Key == SqlErrorCommitTransaction)
{ {
......
...@@ -5,7 +5,7 @@ using System; ...@@ -5,7 +5,7 @@ using System;
using System.Collections.Concurrent; using System.Collections.Concurrent;
using System.Collections.Generic; using System.Collections.Generic;
using System.Diagnostics; using System.Diagnostics;
using DotNetCore.CAP.Messages; using DotNetCore.CAP.Persistence;
namespace DotNetCore.CAP.SqlServer.Diagnostics namespace DotNetCore.CAP.SqlServer.Diagnostics
{ {
...@@ -17,10 +17,10 @@ namespace DotNetCore.CAP.SqlServer.Diagnostics ...@@ -17,10 +17,10 @@ namespace DotNetCore.CAP.SqlServer.Diagnostics
public DiagnosticProcessorObserver(IDispatcher dispatcher) public DiagnosticProcessorObserver(IDispatcher dispatcher)
{ {
_dispatcher = dispatcher; _dispatcher = dispatcher;
BufferList = new ConcurrentDictionary<Guid, List<CapPublishedMessage>>(); BufferList = new ConcurrentDictionary<Guid, List<MediumMessage>>();
} }
public ConcurrentDictionary<Guid, List<CapPublishedMessage>> BufferList { get; } public ConcurrentDictionary<Guid, List<MediumMessage>> BufferList { get; }
public void OnCompleted() public void OnCompleted()
{ {
...@@ -33,9 +33,7 @@ namespace DotNetCore.CAP.SqlServer.Diagnostics ...@@ -33,9 +33,7 @@ namespace DotNetCore.CAP.SqlServer.Diagnostics
public void OnNext(DiagnosticListener listener) public void OnNext(DiagnosticListener listener)
{ {
if (listener.Name == DiagnosticListenerName) if (listener.Name == DiagnosticListenerName)
{
listener.Subscribe(new DiagnosticObserver(_dispatcher, BufferList)); listener.Subscribe(new DiagnosticObserver(_dispatcher, BufferList));
}
} }
} }
} }
\ No newline at end of file
<Project Sdk="Microsoft.NET.Sdk"> <Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup> <PropertyGroup>
<TargetFramework>netstandard2.0</TargetFramework> <TargetFramework>netstandard2.1</TargetFramework>
<AssemblyName>DotNetCore.CAP.SqlServer</AssemblyName> <AssemblyName>DotNetCore.CAP.SqlServer</AssemblyName>
<PackageTags>$(PackageTags);SQL Server</PackageTags> <PackageTags>$(PackageTags);SQL Server</PackageTags>
</PropertyGroup> </PropertyGroup>
<PropertyGroup> <PropertyGroup>
<DocumentationFile>bin\$(Configuration)\netstandard2.0\DotNetCore.CAP.SqlServer.xml</DocumentationFile> <DocumentationFile>bin\$(Configuration)\netstandard2.0\DotNetCore.CAP.SqlServer.xml</DocumentationFile>
<NoWarn>1701;1702;1705;CS1591</NoWarn> <NoWarn>1701;1702;1705;CS1591</NoWarn>
<LangVersion>8</LangVersion>
</PropertyGroup> </PropertyGroup>
<ItemGroup> <ItemGroup>
<PackageReference Include="Dapper" Version="1.60.6" /> <PackageReference Include="Dapper" Version="2.0.30" />
<PackageReference Include="Microsoft.EntityFrameworkCore" Version="2.2.0" /> <PackageReference Include="Microsoft.Data.SqlClient" Version="1.1.0-preview2.19309.1" />
<PackageReference Include="Microsoft.EntityFrameworkCore.Relational" Version="2.2.0" /> <PackageReference Include="Microsoft.EntityFrameworkCore" Version="3.0.0" />
<PackageReference Include="System.Data.SqlClient" Version="4.6.0" /> <PackageReference Include="Microsoft.EntityFrameworkCore.Relational" Version="3.0.0" />
</ItemGroup> </ItemGroup>
<ItemGroup> <ItemGroup>
<ProjectReference Include="..\DotNetCore.CAP\DotNetCore.CAP.csproj" /> <ProjectReference Include="..\DotNetCore.CAP\DotNetCore.CAP.csproj" />
</ItemGroup> </ItemGroup>
</Project> </Project>
\ No newline at end of file
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using System.Data;
using System.Data.SqlClient;
using System.Threading;
using System.Threading.Tasks;
using Dapper;
using DotNetCore.CAP.Abstractions;
using DotNetCore.CAP.Messages;
using Microsoft.EntityFrameworkCore.Storage;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Options;
namespace DotNetCore.CAP.SqlServer
{
public class SqlServerPublisher : CapPublisherBase, ICallbackPublisher
{
private readonly SqlServerOptions _options;
public SqlServerPublisher(IServiceProvider provider) : base(provider)
{
_options = ServiceProvider.GetService<IOptions<SqlServerOptions>>().Value;
}
public async Task PublishCallbackAsync(CapPublishedMessage message)
{
await PublishAsyncInternal(message);
}
protected override async Task ExecuteAsync(CapPublishedMessage message, ICapTransaction transaction = null,
CancellationToken cancel = default(CancellationToken))
{
if (transaction == null)
{
using (var connection = new SqlConnection(_options.ConnectionString))
{
await connection.ExecuteAsync(PrepareSql(), message);
return;
}
}
var dbTrans = transaction.DbTransaction as IDbTransaction;
if (dbTrans == null && transaction.DbTransaction is IDbContextTransaction dbContextTrans)
{
dbTrans = dbContextTrans.GetDbTransaction();
}
var conn = dbTrans?.Connection;
await conn.ExecuteAsync(PrepareSql(), message, dbTrans);
}
#region private methods
private string PrepareSql()
{
return
$"INSERT INTO {_options.Schema}.[Published] ([Id],[Version],[Name],[Content],[Retries],[Added],[ExpiresAt],[StatusName])VALUES(@Id,'{_options.Version}',@Name,@Content,@Retries,@Added,@ExpiresAt,@StatusName);";
}
#endregion private methods
}
}
\ No newline at end of file
...@@ -4,10 +4,12 @@ ...@@ -4,10 +4,12 @@
using System; using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.Data; using System.Data;
using System.Data.SqlClient; using System.Threading;
using System.Threading.Tasks;
using DotNetCore.CAP.Internal; using DotNetCore.CAP.Internal;
using DotNetCore.CAP.Messages; using DotNetCore.CAP.Persistence;
using DotNetCore.CAP.SqlServer.Diagnostics; using DotNetCore.CAP.SqlServer.Diagnostics;
using Microsoft.Data.SqlClient;
using Microsoft.EntityFrameworkCore; using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Infrastructure; using Microsoft.EntityFrameworkCore.Infrastructure;
using Microsoft.EntityFrameworkCore.Storage; using Microsoft.EntityFrameworkCore.Storage;
...@@ -28,14 +30,12 @@ namespace DotNetCore.CAP ...@@ -28,14 +30,12 @@ namespace DotNetCore.CAP
{ {
var sqlServerOptions = serviceProvider.GetService<IOptions<SqlServerOptions>>().Value; var sqlServerOptions = serviceProvider.GetService<IOptions<SqlServerOptions>>().Value;
if (sqlServerOptions.DbContextType != null) if (sqlServerOptions.DbContextType != null)
{
_dbContext = serviceProvider.GetService(sqlServerOptions.DbContextType) as DbContext; _dbContext = serviceProvider.GetService(sqlServerOptions.DbContextType) as DbContext;
}
_diagnosticProcessor = serviceProvider.GetRequiredService<DiagnosticProcessorObserver>(); _diagnosticProcessor = serviceProvider.GetRequiredService<DiagnosticProcessorObserver>();
} }
protected override void AddToSent(CapPublishedMessage msg) protected override void AddToSent(MediumMessage msg)
{ {
if (DbTransaction is NoopTransaction) if (DbTransaction is NoopTransaction)
{ {
...@@ -47,24 +47,19 @@ namespace DotNetCore.CAP ...@@ -47,24 +47,19 @@ namespace DotNetCore.CAP
if (dbTransaction == null) if (dbTransaction == null)
{ {
if (DbTransaction is IDbContextTransaction dbContextTransaction) if (DbTransaction is IDbContextTransaction dbContextTransaction)
{
dbTransaction = dbContextTransaction.GetDbTransaction(); dbTransaction = dbContextTransaction.GetDbTransaction();
}
if (dbTransaction == null) if (dbTransaction == null) throw new ArgumentNullException(nameof(DbTransaction));
{
throw new ArgumentNullException(nameof(DbTransaction));
}
} }
var transactionKey = ((SqlConnection)dbTransaction.Connection).ClientConnectionId; var transactionKey = ((SqlConnection) dbTransaction.Connection).ClientConnectionId;
if (_diagnosticProcessor.BufferList.TryGetValue(transactionKey, out var list)) if (_diagnosticProcessor.BufferList.TryGetValue(transactionKey, out var list))
{ {
list.Add(msg); list.Add(msg);
} }
else else
{ {
var msgList = new List<CapPublishedMessage>(1) { msg }; var msgList = new List<MediumMessage>(1) {msg};
_diagnosticProcessor.BufferList.TryAdd(transactionKey, msgList); _diagnosticProcessor.BufferList.TryAdd(transactionKey, msgList);
} }
} }
...@@ -86,6 +81,23 @@ namespace DotNetCore.CAP ...@@ -86,6 +81,23 @@ namespace DotNetCore.CAP
} }
} }
public override async Task CommitAsync(CancellationToken cancellationToken = default)
{
switch (DbTransaction)
{
case NoopTransaction _:
Flush();
break;
case IDbTransaction dbTransaction:
dbTransaction.Commit();
break;
case IDbContextTransaction dbContextTransaction:
await _dbContext.SaveChangesAsync(cancellationToken);
await dbContextTransaction.CommitAsync(cancellationToken);
break;
}
}
public override void Rollback() public override void Rollback()
{ {
switch (DbTransaction) switch (DbTransaction)
...@@ -99,6 +111,19 @@ namespace DotNetCore.CAP ...@@ -99,6 +111,19 @@ namespace DotNetCore.CAP
} }
} }
public override async Task RollbackAsync(CancellationToken cancellationToken = default)
{
switch (DbTransaction)
{
case IDbTransaction dbTransaction:
dbTransaction.Rollback();
break;
case IDbContextTransaction dbContextTransaction:
await dbContextTransaction.RollbackAsync(cancellationToken);
break;
}
}
public override void Dispose() public override void Dispose()
{ {
switch (DbTransaction) switch (DbTransaction)
...@@ -110,6 +135,7 @@ namespace DotNetCore.CAP ...@@ -110,6 +135,7 @@ namespace DotNetCore.CAP
dbContextTransaction.Dispose(); dbContextTransaction.Dispose();
break; break;
} }
DbTransaction = null; DbTransaction = null;
} }
} }
...@@ -144,15 +170,12 @@ namespace DotNetCore.CAP ...@@ -144,15 +170,12 @@ namespace DotNetCore.CAP
public static IDbTransaction BeginTransaction(this IDbConnection dbConnection, public static IDbTransaction BeginTransaction(this IDbConnection dbConnection,
ICapPublisher publisher, bool autoCommit = false) ICapPublisher publisher, bool autoCommit = false)
{ {
if (dbConnection.State == ConnectionState.Closed) if (dbConnection.State == ConnectionState.Closed) dbConnection.Open();
{
dbConnection.Open();
}
var dbTransaction = dbConnection.BeginTransaction(); var dbTransaction = dbConnection.BeginTransaction();
publisher.Transaction.Value = publisher.ServiceProvider.GetService<CapTransactionBase>(); publisher.Transaction.Value = publisher.ServiceProvider.GetService<CapTransactionBase>();
var capTransaction = publisher.Transaction.Value.Begin(dbTransaction, autoCommit); var capTransaction = publisher.Transaction.Value.Begin(dbTransaction, autoCommit);
return (IDbTransaction)capTransaction.DbTransaction; return (IDbTransaction) capTransaction.DbTransaction;
} }
/// <summary> /// <summary>
......
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using System.Data.SqlClient;
using System.Threading.Tasks;
using Dapper;
using DotNetCore.CAP.Processor;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
namespace DotNetCore.CAP.SqlServer
{
public class SqlServerCollectProcessor : ICollectProcessor
{
private const int MaxBatch = 1000;
private static readonly string[] Tables =
{
"Published", "Received"
};
private readonly ILogger _logger;
private readonly SqlServerOptions _options;
private readonly TimeSpan _delay = TimeSpan.FromSeconds(1);
private readonly TimeSpan _waitingInterval = TimeSpan.FromMinutes(5);
public SqlServerCollectProcessor(
ILogger<SqlServerCollectProcessor> logger,
IOptions<SqlServerOptions> sqlServerOptions)
{
_logger = logger;
_options = sqlServerOptions.Value;
}
public async Task ProcessAsync(ProcessingContext context)
{
foreach (var table in Tables)
{
_logger.LogDebug($"Collecting expired data from table [{_options.Schema}].[{table}].");
int removedCount;
do
{
using (var connection = new SqlConnection(_options.ConnectionString))
{
removedCount = await connection.ExecuteAsync($@"
DELETE TOP (@count)
FROM [{_options.Schema}].[{table}] WITH (readpast)
WHERE ExpiresAt < @now;", new {now = DateTime.Now, count = MaxBatch});
}
if (removedCount != 0)
{
await context.WaitAsync(_delay);
context.ThrowIfStopping();
}
} while (removedCount != 0);
}
await context.WaitAsync(_waitingInterval);
}
}
}
\ No newline at end of file
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using System.Collections.Generic;
using System.Data;
using System.Threading;
using System.Threading.Tasks;
using Dapper;
using DotNetCore.CAP.Internal;
using DotNetCore.CAP.Messages;
using DotNetCore.CAP.Monitoring;
using DotNetCore.CAP.Persistence;
using DotNetCore.CAP.Serialization;
using Microsoft.Data.SqlClient;
using Microsoft.EntityFrameworkCore.Storage;
using Microsoft.Extensions.Options;
namespace DotNetCore.CAP.SqlServer
{
public class SqlServerDataStorage : IDataStorage
{
private readonly IOptions<CapOptions> _capOptions;
private readonly IOptions<SqlServerOptions> _options;
public SqlServerDataStorage(
IOptions<CapOptions> capOptions,
IOptions<SqlServerOptions> options)
{
_options = options;
_capOptions = capOptions;
}
public async Task ChangePublishStateAsync(MediumMessage message, StatusName state)
{
var sql =
$"UPDATE [{_options.Value.Schema}].[Published] SET Retries=@Retries,ExpiresAt=@ExpiresAt,StatusName=@StatusName WHERE Id=@Id";
await using var connection = new SqlConnection(_options.Value.ConnectionString);
await connection.ExecuteAsync(sql, new
{
Id = message.DbId,
message.Retries,
message.ExpiresAt,
StatusName = state.ToString("G")
});
}
public async Task ChangeReceiveStateAsync(MediumMessage message, StatusName state)
{
var sql =
$"UPDATE [{_options.Value.Schema}].[Received] SET Retries=@Retries,ExpiresAt=@ExpiresAt,StatusName=@StatusName WHERE Id=@Id";
await using var connection = new SqlConnection(_options.Value.ConnectionString);
await connection.ExecuteAsync(sql, new
{
Id = message.DbId,
message.Retries,
message.ExpiresAt,
StatusName = state.ToString("G")
});
}
public async Task<MediumMessage> StoreMessageAsync(string name, Message content, object dbTransaction = null,
CancellationToken cancellationToken = default)
{
var sql = $"INSERT INTO {_options.Value.Schema}.[Published] ([Id],[Version],[Name],[Content],[Retries],[Added],[ExpiresAt],[StatusName])" +
$"VALUES(@Id,'{_options.Value}',@Name,@Content,@Retries,@Added,@ExpiresAt,@StatusName);";
var message = new MediumMessage
{
DbId = content.GetId(),
Origin = content,
Content = StringSerializer.Serialize(content),
Added = DateTime.Now,
ExpiresAt = null,
Retries = 0
};
var po = new
{
Id = message.DbId,
Name = name,
message.Content,
message.Retries,
message.Added,
message.ExpiresAt,
StatusName = StatusName.Scheduled
};
if (dbTransaction == null)
{
await using var connection = new SqlConnection(_options.Value.ConnectionString);
await connection.ExecuteAsync(sql, po);
}
else
{
var dbTrans = dbTransaction as IDbTransaction;
if (dbTrans == null && dbTransaction is IDbContextTransaction dbContextTrans)
dbTrans = dbContextTrans.GetDbTransaction();
var conn = dbTrans?.Connection;
await conn.ExecuteAsync(sql, po, dbTrans);
}
return message;
}
public async Task StoreReceivedExceptionMessageAsync(string name, string group, string content)
{
var sql =
$"INSERT INTO [{_options.Value.Schema}].[Received]([Id],[Version],[Name],[Group],[Content],[Retries],[Added],[ExpiresAt],[StatusName])" +
$"VALUES(@Id,'{_capOptions.Value.Version}',@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName);";
await using var connection = new SqlConnection(_options.Value.ConnectionString);
await connection.ExecuteAsync(sql, new
{
Id = SnowflakeId.Default().NextId().ToString(),
Group = group,
Name = name,
Content = content,
Retries = _capOptions.Value.FailedRetryCount,
Added = DateTime.Now,
ExpiresAt = DateTime.Now.AddDays(15),
StatusName = nameof(StatusName.Failed)
});
}
public async Task<MediumMessage> StoreReceivedMessageAsync(string name, string group, Message message)
{
var sql =
$"INSERT INTO [{_options.Value.Schema}].[Received]([Id],[Version],[Name],[Group],[Content],[Retries],[Added],[ExpiresAt],[StatusName])" +
$"VALUES(@Id,'{_capOptions.Value.Version}',@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName);";
var mdMessage = new MediumMessage
{
DbId = SnowflakeId.Default().NextId().ToString(),
Origin = message,
Added = DateTime.Now,
ExpiresAt = null,
Retries = 0
};
var content = StringSerializer.Serialize(mdMessage.Origin);
await using var connection = new SqlConnection(_options.Value.ConnectionString);
await connection.ExecuteAsync(sql, new
{
Id = mdMessage.DbId,
Group = group,
Name = name,
Content = content,
mdMessage.Retries,
mdMessage.Added,
mdMessage.ExpiresAt,
StatusName = nameof(StatusName.Scheduled)
});
return mdMessage;
}
public async Task<int> DeleteExpiresAsync(string table, DateTime timeout, int batchCount = 1000,
CancellationToken token = default)
{
await using var connection = new SqlConnection(_options.Value.ConnectionString);
return await connection.ExecuteAsync(
$"DELETE TOP (@batchCount) FROM [{_options.Value.Schema}].[{table}] WITH (readpast) WHERE ExpiresAt < @timeout;",
new {timeout, batchCount});
}
public async Task<IEnumerable<MediumMessage>> GetPublishedMessagesOfNeedRetry()
{
var fourMinAgo = DateTime.Now.AddMinutes(-4).ToString("O");
var sql = $"SELECT TOP (200) * FROM [{_options.Value.Schema}].[Published] WITH (readpast) WHERE Retries<{_capOptions.Value.FailedRetryCount} " +
$"AND Version='{_capOptions.Value.Version}' AND Added<'{fourMinAgo}' AND (StatusName = '{StatusName.Failed}' OR StatusName = '{StatusName.Scheduled}')";
var result = new List<MediumMessage>();
await using var connection = new SqlConnection(_options.Value.ConnectionString);
var reader = await connection.ExecuteReaderAsync(sql);
while (reader.Read())
{
result.Add(new MediumMessage
{
DbId = reader.GetInt64(0).ToString(),
Origin = StringSerializer.DeSerialize(reader.GetString(3)),
Retries = reader.GetInt32(4),
Added = reader.GetDateTime(5)
});
}
return result;
}
public async Task<IEnumerable<MediumMessage>> GetReceivedMessagesOfNeedRetry()
{
var fourMinAgo = DateTime.Now.AddMinutes(-4).ToString("O");
var sql =
$"SELECT TOP (200) * FROM [{_options.Value.Schema}].[Received] WITH (readpast) WHERE Retries<{_capOptions.Value.FailedRetryCount} " +
$"AND Version='{_capOptions.Value.Version}' AND Added<'{fourMinAgo}' AND (StatusName = '{StatusName.Failed}' OR StatusName = '{StatusName.Scheduled}')";
var result = new List<MediumMessage>();
await using var connection = new SqlConnection(_options.Value.ConnectionString);
var reader = await connection.ExecuteReaderAsync(sql);
while (reader.Read())
{
result.Add(new MediumMessage
{
DbId = reader.GetInt64(0).ToString(),
Origin = StringSerializer.DeSerialize(reader.GetString(3)),
Retries = reader.GetInt32(4),
Added = reader.GetDateTime(5)
});
}
return result;
}
public IMonitoringApi GetMonitoringApi()
{
return new SqlServerMonitoringApi(_options);
}
}
}
\ No newline at end of file
...@@ -2,6 +2,8 @@ ...@@ -2,6 +2,8 @@
// Licensed under the MIT License. See License.txt in the project root for license information. // Licensed under the MIT License. See License.txt in the project root for license information.
using System; using System;
using System.Threading;
using System.Threading.Tasks;
using DotNetCore.CAP; using DotNetCore.CAP;
// ReSharper disable once CheckNamespace // ReSharper disable once CheckNamespace
...@@ -18,6 +20,8 @@ namespace Microsoft.EntityFrameworkCore.Storage ...@@ -18,6 +20,8 @@ namespace Microsoft.EntityFrameworkCore.Storage
TransactionId = dbContextTransaction.TransactionId; TransactionId = dbContextTransaction.TransactionId;
} }
public Guid TransactionId { get; }
public void Dispose() public void Dispose()
{ {
_transaction.Dispose(); _transaction.Dispose();
...@@ -33,6 +37,19 @@ namespace Microsoft.EntityFrameworkCore.Storage ...@@ -33,6 +37,19 @@ namespace Microsoft.EntityFrameworkCore.Storage
_transaction.Rollback(); _transaction.Rollback();
} }
public Guid TransactionId { get; } public async Task CommitAsync(CancellationToken cancellationToken = default)
{
await _transaction.CommitAsync(cancellationToken);
}
public async Task RollbackAsync(CancellationToken cancellationToken = default)
{
await _transaction.RollbackAsync(cancellationToken);
}
public ValueTask DisposeAsync()
{
return new ValueTask(Task.Run(() => _transaction.Dispose()));
}
} }
} }
\ No newline at end of file
...@@ -5,11 +5,13 @@ using System; ...@@ -5,11 +5,13 @@ using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.Data; using System.Data;
using System.Linq; using System.Linq;
using System.Threading.Tasks;
using Dapper; using Dapper;
using DotNetCore.CAP.Dashboard; using DotNetCore.CAP.Internal;
using DotNetCore.CAP.Dashboard.Monitoring;
using DotNetCore.CAP.Infrastructure;
using DotNetCore.CAP.Messages; using DotNetCore.CAP.Messages;
using DotNetCore.CAP.Monitoring;
using DotNetCore.CAP.Persistence;
using Microsoft.Data.SqlClient;
using Microsoft.Extensions.Options; using Microsoft.Extensions.Options;
namespace DotNetCore.CAP.SqlServer namespace DotNetCore.CAP.SqlServer
...@@ -17,12 +19,10 @@ namespace DotNetCore.CAP.SqlServer ...@@ -17,12 +19,10 @@ namespace DotNetCore.CAP.SqlServer
internal class SqlServerMonitoringApi : IMonitoringApi internal class SqlServerMonitoringApi : IMonitoringApi
{ {
private readonly SqlServerOptions _options; private readonly SqlServerOptions _options;
private readonly SqlServerStorage _storage;
public SqlServerMonitoringApi(IStorage storage, IOptions<SqlServerOptions> options) public SqlServerMonitoringApi(IOptions<SqlServerOptions> options)
{ {
_options = options.Value ?? throw new ArgumentNullException(nameof(options)); _options = options.Value ?? throw new ArgumentNullException(nameof(options));
_storage = storage as SqlServerStorage ?? throw new ArgumentNullException(nameof(storage));
} }
public StatisticsDto GetStatistics() public StatisticsDto GetStatistics()
...@@ -56,39 +56,27 @@ select count(Id) from [{0}].Received with (nolock) where StatusName = N'Failed'; ...@@ -56,39 +56,27 @@ select count(Id) from [{0}].Received with (nolock) where StatusName = N'Failed';
{ {
var tableName = type == MessageType.Publish ? "Published" : "Received"; var tableName = type == MessageType.Publish ? "Published" : "Received";
return UseConnection(connection => return UseConnection(connection =>
GetHourlyTimelineStats(connection, tableName, StatusName.Failed)); GetHourlyTimelineStats(connection, tableName, nameof(StatusName.Failed)));
} }
public IDictionary<DateTime, int> HourlySucceededJobs(MessageType type) public IDictionary<DateTime, int> HourlySucceededJobs(MessageType type)
{ {
var tableName = type == MessageType.Publish ? "Published" : "Received"; var tableName = type == MessageType.Publish ? "Published" : "Received";
return UseConnection(connection => return UseConnection(connection =>
GetHourlyTimelineStats(connection, tableName, StatusName.Succeeded)); GetHourlyTimelineStats(connection, tableName, nameof(StatusName.Succeeded)));
} }
public IList<MessageDto> Messages(MessageQueryDto queryDto) public IList<MessageDto> Messages(MessageQueryDto queryDto)
{ {
var tableName = queryDto.MessageType == MessageType.Publish ? "Published" : "Received"; var tableName = queryDto.MessageType == MessageType.Publish ? "Published" : "Received";
var where = string.Empty; var where = string.Empty;
if (!string.IsNullOrEmpty(queryDto.StatusName)) if (!string.IsNullOrEmpty(queryDto.StatusName)) where += " and statusname=@StatusName";
{
where += " and statusname=@StatusName";
}
if (!string.IsNullOrEmpty(queryDto.Name)) if (!string.IsNullOrEmpty(queryDto.Name)) where += " and name=@Name";
{
where += " and name=@Name";
}
if (!string.IsNullOrEmpty(queryDto.Group)) if (!string.IsNullOrEmpty(queryDto.Group)) where += " and [group]=@Group";
{
where += " and [group]=@Group";
}
if (!string.IsNullOrEmpty(queryDto.Content)) if (!string.IsNullOrEmpty(queryDto.Content)) where += " and content like '%@Content%'";
{
where += " and content like '%@Content%'";
}
var sqlQuery2008 = var sqlQuery2008 =
$@"select * from $@"select * from
...@@ -113,22 +101,36 @@ select count(Id) from [{0}].Received with (nolock) where StatusName = N'Failed'; ...@@ -113,22 +101,36 @@ select count(Id) from [{0}].Received with (nolock) where StatusName = N'Failed';
public int PublishedFailedCount() public int PublishedFailedCount()
{ {
return UseConnection(conn => GetNumberOfMessage(conn, "Published", StatusName.Failed)); return UseConnection(conn => GetNumberOfMessage(conn, "Published", nameof(StatusName.Failed)));
} }
public int PublishedSucceededCount() public int PublishedSucceededCount()
{ {
return UseConnection(conn => GetNumberOfMessage(conn, "Published", StatusName.Succeeded)); return UseConnection(conn => GetNumberOfMessage(conn, "Published", nameof(StatusName.Succeeded)));
} }
public int ReceivedFailedCount() public int ReceivedFailedCount()
{ {
return UseConnection(conn => GetNumberOfMessage(conn, "Received", StatusName.Failed)); return UseConnection(conn => GetNumberOfMessage(conn, "Received", nameof(StatusName.Failed)));
} }
public int ReceivedSucceededCount() public int ReceivedSucceededCount()
{ {
return UseConnection(conn => GetNumberOfMessage(conn, "Received", StatusName.Succeeded)); return UseConnection(conn => GetNumberOfMessage(conn, "Received", nameof(StatusName.Succeeded)));
}
public async Task<MediumMessage> GetPublishedMessageAsync(long id)
{
var sql = $@"SELECT * FROM [{_options.Schema}].[Published] WITH (readpast) WHERE Id={id}";
await using var connection = new SqlConnection(_options.ConnectionString);
return await connection.QueryFirstOrDefaultAsync<MediumMessage>(sql);
}
public async Task<MediumMessage> GetReceivedMessageAsync(long id)
{
var sql = $@"SELECT * FROM [{_options.Schema}].[Received] WITH (readpast) WHERE Id={id}";
await using var connection = new SqlConnection(_options.ConnectionString);
return await connection.QueryFirstOrDefaultAsync<MediumMessage>(sql);
} }
private int GetNumberOfMessage(IDbConnection connection, string tableName, string statusName) private int GetNumberOfMessage(IDbConnection connection, string tableName, string statusName)
...@@ -142,7 +144,7 @@ select count(Id) from [{0}].Received with (nolock) where StatusName = N'Failed'; ...@@ -142,7 +144,7 @@ select count(Id) from [{0}].Received with (nolock) where StatusName = N'Failed';
private T UseConnection<T>(Func<IDbConnection, T> action) private T UseConnection<T>(Func<IDbConnection, T> action)
{ {
return _storage.UseConnection(action); return action(new SqlConnection(_options.ConnectionString));
} }
private Dictionary<DateTime, int> GetHourlyTimelineStats(IDbConnection connection, string tableName, private Dictionary<DateTime, int> GetHourlyTimelineStats(IDbConnection connection, string tableName,
...@@ -195,10 +197,7 @@ select [Key], [Count] from aggr with (nolock) where [Key] in @keys;"; ...@@ -195,10 +197,7 @@ select [Key], [Count] from aggr with (nolock) where [Key] in @keys;";
foreach (var key in keyMaps.Keys) foreach (var key in keyMaps.Keys)
{ {
if (!valuesMap.ContainsKey(key)) if (!valuesMap.ContainsKey(key)) valuesMap.Add(key, 0);
{
valuesMap.Add(key, 0);
}
} }
var result = new Dictionary<DateTime, int>(); var result = new Dictionary<DateTime, int>();
...@@ -211,4 +210,11 @@ select [Key], [Count] from aggr with (nolock) where [Key] in @keys;"; ...@@ -211,4 +210,11 @@ select [Key], [Count] from aggr with (nolock) where [Key] in @keys;";
return result; return result;
} }
} }
internal class TimelineCounter
{
public string Key { get; set; }
public int Count { get; set; }
}
} }
\ No newline at end of file
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using System.Collections.Generic;
using System.Data.SqlClient;
using System.Threading.Tasks;
using Dapper;
using DotNetCore.CAP.Infrastructure;
using DotNetCore.CAP.Messages;
using Microsoft.Extensions.Options;
namespace DotNetCore.CAP.SqlServer
{
public class SqlServerStorageConnection : IStorageConnection
{
private readonly CapOptions _capOptions;
public SqlServerStorageConnection(
IOptions<SqlServerOptions> options,
IOptions<CapOptions> capOptions)
{
_capOptions = capOptions.Value;
Options = options.Value;
}
public SqlServerOptions Options { get; }
public IStorageTransaction CreateTransaction()
{
return new SqlServerStorageTransaction(this);
}
public async Task<CapPublishedMessage> GetPublishedMessageAsync(long id)
{
var sql = $@"SELECT * FROM [{Options.Schema}].[Published] WITH (readpast) WHERE Id={id}";
using (var connection = new SqlConnection(Options.ConnectionString))
{
return await connection.QueryFirstOrDefaultAsync<CapPublishedMessage>(sql);
}
}
public async Task<IEnumerable<CapPublishedMessage>> GetPublishedMessagesOfNeedRetry()
{
var fourMinsAgo = DateTime.Now.AddMinutes(-4).ToString("O");
var sql =
$"SELECT TOP (200) * FROM [{Options.Schema}].[Published] WITH (readpast) WHERE Retries<{_capOptions.FailedRetryCount} AND Version='{_capOptions.Version}' AND Added<'{fourMinsAgo}' AND (StatusName = '{StatusName.Failed}' OR StatusName = '{StatusName.Scheduled}')";
using (var connection = new SqlConnection(Options.ConnectionString))
{
return await connection.QueryAsync<CapPublishedMessage>(sql);
}
}
public void StoreReceivedMessage(CapReceivedMessage message)
{
if (message == null)
{
throw new ArgumentNullException(nameof(message));
}
var sql = $@"
INSERT INTO [{Options.Schema}].[Received]([Id],[Version],[Name],[Group],[Content],[Retries],[Added],[ExpiresAt],[StatusName])
VALUES(@Id,'{_capOptions.Version}',@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName);";
using (var connection = new SqlConnection(Options.ConnectionString))
{
connection.Execute(sql, message);
}
}
public async Task<CapReceivedMessage> GetReceivedMessageAsync(long id)
{
var sql = $@"SELECT * FROM [{Options.Schema}].[Received] WITH (readpast) WHERE Id={id}";
using (var connection = new SqlConnection(Options.ConnectionString))
{
return await connection.QueryFirstOrDefaultAsync<CapReceivedMessage>(sql);
}
}
public async Task<IEnumerable<CapReceivedMessage>> GetReceivedMessagesOfNeedRetry()
{
var fourMinsAgo = DateTime.Now.AddMinutes(-4).ToString("O");
var sql =
$"SELECT TOP (200) * FROM [{Options.Schema}].[Received] WITH (readpast) WHERE Retries<{_capOptions.FailedRetryCount} AND Version='{_capOptions.Version}' AND Added<'{fourMinsAgo}' AND (StatusName = '{StatusName.Failed}' OR StatusName = '{StatusName.Scheduled}')";
using (var connection = new SqlConnection(Options.ConnectionString))
{
return await connection.QueryAsync<CapReceivedMessage>(sql);
}
}
public bool ChangePublishedState(long messageId, string state)
{
var sql =
$"UPDATE [{Options.Schema}].[Published] SET Retries=Retries+1,ExpiresAt=NULL,StatusName = '{state}' WHERE Id={messageId}";
using (var connection = new SqlConnection(Options.ConnectionString))
{
return connection.Execute(sql) > 0;
}
}
public bool ChangeReceivedState(long messageId, string state)
{
var sql =
$"UPDATE [{Options.Schema}].[Received] SET Retries=Retries+1,ExpiresAt=NULL,StatusName = '{state}' WHERE Id={messageId}";
using (var connection = new SqlConnection(Options.ConnectionString))
{
return connection.Execute(sql) > 0;
}
}
}
}
\ No newline at end of file
// Copyright (c) .NET Core Community. All rights reserved. // Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information. // Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using System.Data;
using System.Data.SqlClient;
using System.Diagnostics; using System.Diagnostics;
using System.Threading; using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
using Dapper; using Dapper;
using DotNetCore.CAP.Dashboard; using DotNetCore.CAP.Persistence;
using DotNetCore.CAP.SqlServer.Diagnostics; using DotNetCore.CAP.SqlServer.Diagnostics;
using Microsoft.Data.SqlClient;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options; using Microsoft.Extensions.Options;
namespace DotNetCore.CAP.SqlServer namespace DotNetCore.CAP.SqlServer
{ {
public class SqlServerStorage : IStorage public class SqlServerStorageInitializer : IStorageInitializer
{ {
private readonly DiagnosticProcessorObserver _diagnosticProcessorObserver;
private readonly ILogger _logger; private readonly ILogger _logger;
private readonly IOptions<CapOptions> _capOptions;
private readonly IOptions<SqlServerOptions> _options; private readonly IOptions<SqlServerOptions> _options;
private readonly IDbConnection _existingConnection = null;
private readonly DiagnosticProcessorObserver _diagnosticProcessorObserver;
public SqlServerStorage( public SqlServerStorageInitializer(
ILogger<SqlServerStorage> logger, ILogger<SqlServerStorageInitializer> logger,
IOptions<CapOptions> capOptions,
IOptions<SqlServerOptions> options, IOptions<SqlServerOptions> options,
DiagnosticProcessorObserver diagnosticProcessorObserver) DiagnosticProcessorObserver diagnosticProcessorObserver)
{ {
_options = options; _options = options;
_diagnosticProcessorObserver = diagnosticProcessorObserver; _diagnosticProcessorObserver = diagnosticProcessorObserver;
_logger = logger; _logger = logger;
_capOptions = capOptions;
} }
public IStorageConnection GetConnection() public string GetPublishedTableName()
{ {
return new SqlServerStorageConnection(_options, _capOptions); return $"[{_options.Value.Schema}].[Published]";
} }
public IMonitoringApi GetMonitoringApi() public string GetReceivedTableName()
{ {
return new SqlServerMonitoringApi(this, _options); return $"[{_options.Value.Schema}].[Received]";
} }
public async Task InitializeAsync(CancellationToken cancellationToken = default(CancellationToken)) public async Task InitializeAsync(CancellationToken cancellationToken)
{ {
if (cancellationToken.IsCancellationRequested) if (cancellationToken.IsCancellationRequested) return;
{
return;
}
var sql = CreateDbTablesScript(_options.Value.Schema); var sql = CreateDbTablesScript(_options.Value.Schema);
using (var connection = new SqlConnection(_options.Value.ConnectionString)) using (var connection = new SqlConnection(_options.Value.ConnectionString))
{ {
await connection.ExecuteAsync(sql); await connection.ExecuteAsync(sql);
...@@ -64,6 +54,7 @@ namespace DotNetCore.CAP.SqlServer ...@@ -64,6 +54,7 @@ namespace DotNetCore.CAP.SqlServer
DiagnosticListener.AllListeners.Subscribe(_diagnosticProcessorObserver); DiagnosticListener.AllListeners.Subscribe(_diagnosticProcessorObserver);
} }
protected virtual string CreateDbTablesScript(string schema) protected virtual string CreateDbTablesScript(string schema)
{ {
var batchSql = var batchSql =
...@@ -111,45 +102,5 @@ CREATE TABLE [{schema}].[Published]( ...@@ -111,45 +102,5 @@ CREATE TABLE [{schema}].[Published](
END;"; END;";
return batchSql; return batchSql;
} }
internal T UseConnection<T>(Func<IDbConnection, T> func)
{
IDbConnection connection = null;
try
{
connection = CreateAndOpenConnection();
return func(connection);
}
finally
{
ReleaseConnection(connection);
}
}
internal IDbConnection CreateAndOpenConnection()
{
var connection = _existingConnection ?? new SqlConnection(_options.Value.ConnectionString);
if (connection.State == ConnectionState.Closed)
{
connection.Open();
}
return connection;
}
internal bool IsExistingConnection(IDbConnection connection)
{
return connection != null && ReferenceEquals(connection, _existingConnection);
}
internal void ReleaseConnection(IDbConnection connection)
{
if (connection != null && !IsExistingConnection(connection))
{
connection.Dispose();
}
}
} }
} }
\ No newline at end of file
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using System.Data;
using System.Data.SqlClient;
using System.Threading.Tasks;
using Dapper;
using DotNetCore.CAP.Messages;
namespace DotNetCore.CAP.SqlServer
{
public class SqlServerStorageTransaction : IStorageTransaction
{
private readonly IDbConnection _dbConnection;
private readonly string _schema;
public SqlServerStorageTransaction(SqlServerStorageConnection connection)
{
var options = connection.Options;
_schema = options.Schema;
_dbConnection = new SqlConnection(options.ConnectionString);
_dbConnection.Open();
}
public void UpdateMessage(CapPublishedMessage message)
{
if (message == null)
{
throw new ArgumentNullException(nameof(message));
}
var sql =
$"UPDATE [{_schema}].[Published] SET [Retries] = @Retries,[Content] = @Content,[ExpiresAt] = @ExpiresAt,[StatusName]=@StatusName WHERE Id=@Id;";
_dbConnection.Execute(sql, message);
}
public void UpdateMessage(CapReceivedMessage message)
{
if (message == null)
{
throw new ArgumentNullException(nameof(message));
}
var sql =
$"UPDATE [{_schema}].[Received] SET [Retries] = @Retries,[Content] = @Content,[ExpiresAt] = @ExpiresAt,[StatusName]=@StatusName WHERE Id=@Id;";
_dbConnection.Execute(sql, message);
}
public Task CommitAsync()
{
return Task.CompletedTask;
}
public void Dispose()
{
_dbConnection.Dispose();
}
}
}
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment