Commit 5e432a94 authored by Savorboard's avatar Savorboard

cleanup code and fix spelling

parent 49f4eb6b
......@@ -20,7 +20,7 @@ namespace Sample.Kafka.SqlServer
x.UseDiscovery(d =>
{
d.DiscoveryServerHostName = "localhost";
d.DiscoveryServerProt = 8500;
d.DiscoveryServerPort = 8500;
d.CurrentNodeHostName = "localhost";
d.CurrentNodePort = 5820;
d.NodeName = "CAP 2号节点";
......@@ -40,8 +40,6 @@ namespace Sample.Kafka.SqlServer
app.UseMvc();
app.UseCap();
app.UseCapDashboard();
}
}
}
\ No newline at end of file
......@@ -27,7 +27,7 @@ namespace Sample.RabbitMQ.MySql
x.UseDiscovery(d =>
{
d.DiscoveryServerHostName = "localhost";
d.DiscoveryServerProt = 8500;
d.DiscoveryServerPort = 8500;
d.CurrentNodeHostName = "localhost";
d.CurrentNodePort = 5800;
d.NodeName = "CAP 1号节点";
......@@ -45,7 +45,6 @@ namespace Sample.RabbitMQ.MySql
app.UseMvc();
app.UseCap();
app.UseCapDashboard();
}
}
}
......@@ -26,7 +26,7 @@ namespace Sample.RabbitMQ.PostgreSql
x.UseDiscovery(d =>
{
d.DiscoveryServerHostName = "localhost";
d.DiscoveryServerProt = 8500;
d.DiscoveryServerPort = 8500;
d.CurrentNodeHostName = "localhost";
d.CurrentNodePort = 5800;
d.NodeName = "CAP一号节点";
......
......@@ -29,7 +29,7 @@ namespace Sample.RabbitMQ.SqlServer
x.UseDiscovery(d =>
{
d.DiscoveryServerHostName = "localhost";
d.DiscoveryServerProt = 8500;
d.DiscoveryServerPort = 8500;
d.CurrentNodeHostName = "localhost";
d.CurrentNodePort = 5800;
d.NodeName = "CAP一号节点";
......
......@@ -10,14 +10,6 @@ namespace DotNetCore.CAP
/// </summary>
public class KafkaOptions
{
private IEnumerable<KeyValuePair<string, object>> _kafkaConfig;
public KafkaOptions()
{
MainConfig = new Dictionary<string, object>();
}
/// <summary>
/// librdkafka configuration parameters (refer to https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md).
/// <para>
......@@ -26,22 +18,28 @@ namespace DotNetCore.CAP
/// </summary>
public readonly IDictionary<string, object> MainConfig;
private IEnumerable<KeyValuePair<string, object>> _kafkaConfig;
public KafkaOptions()
{
MainConfig = new Dictionary<string, object>();
}
/// <summary>
/// The `bootstrap.servers` item config of <see cref="MainConfig"/>.
/// The `bootstrap.servers` item config of <see cref="MainConfig" />.
/// <para>
/// Initial list of brokers as a CSV list of broker host or host:port.
/// </para>
/// </summary>
public string Servers { get; set; }
internal IEnumerable<KeyValuePair<string, object>> AskafkaConfig()
internal IEnumerable<KeyValuePair<string, object>> AsKafkaConfig()
{
if (_kafkaConfig == null)
{
if (string.IsNullOrWhiteSpace(Servers))
{
throw new ArgumentNullException(nameof(Servers));
}
MainConfig["bootstrap.servers"] = Servers;
MainConfig["queue.buffering.max.ms"] = "10";
......
......@@ -9,18 +9,17 @@ namespace Microsoft.Extensions.DependencyInjection
/// <summary>
/// Configuration to use kafka in CAP.
/// </summary>
/// <param name="options">CAP configuration options</param>
/// <param name="bootstrapServers">Kafka bootstrap server urls.</param>
public static CapOptions UseKafka(this CapOptions options, string bootstrapServers)
{
return options.UseKafka(opt =>
{
opt.Servers = bootstrapServers;
});
return options.UseKafka(opt => { opt.Servers = bootstrapServers; });
}
/// <summary>
/// Configuration to use kafka in CAP.
/// </summary>
/// <param name="options">CAP configuration options</param>
/// <param name="configure">Provides programmatic configuration for the kafka .</param>
/// <returns></returns>
public static CapOptions UseKafka(this CapOptions options, Action<KafkaOptions> configure)
......
......@@ -13,12 +13,6 @@ namespace DotNetCore.CAP.Kafka
private readonly KafkaOptions _kafkaOptions;
private Consumer<Null, string> _consumerClient;
public event EventHandler<MessageContext> OnMessageReceieved;
public event EventHandler<string> OnError;
public IDeserializer<string> StringDeserializer { get; set; }
public KafkaConsumerClient(string groupId, KafkaOptions options)
{
_groupId = groupId;
......@@ -26,15 +20,19 @@ namespace DotNetCore.CAP.Kafka
StringDeserializer = new StringDeserializer(Encoding.UTF8);
}
public IDeserializer<string> StringDeserializer { get; set; }
public event EventHandler<MessageContext> OnMessageReceived;
public event EventHandler<string> OnError;
public void Subscribe(IEnumerable<string> topics)
{
if (topics == null)
throw new ArgumentNullException(nameof(topics));
if (_consumerClient == null)
{
InitKafkaClient();
}
//_consumerClient.Assign(topics.Select(x=> new TopicPartition(x, 0)));
_consumerClient.Subscribe(topics);
......@@ -65,7 +63,7 @@ namespace DotNetCore.CAP.Kafka
{
_kafkaOptions.MainConfig["group.id"] = _groupId;
var config = _kafkaOptions.AskafkaConfig();
var config = _kafkaOptions.AsKafkaConfig();
_consumerClient = new Consumer<Null, string>(config, null, StringDeserializer);
_consumerClient.OnMessage += ConsumerClient_OnMessage;
......@@ -81,7 +79,7 @@ namespace DotNetCore.CAP.Kafka
Content = e.Value
};
OnMessageReceieved?.Invoke(sender, message);
OnMessageReceived?.Invoke(sender, message);
}
private void ConsumerClient_OnError(object sender, Error e)
......
......@@ -9,8 +9,8 @@ namespace DotNetCore.CAP.Kafka
{
internal class PublishQueueExecutor : BasePublishQueueExecutor
{
private readonly ILogger _logger;
private readonly KafkaOptions _kafkaOptions;
private readonly ILogger _logger;
public PublishQueueExecutor(
CapOptions options,
......@@ -27,7 +27,7 @@ namespace DotNetCore.CAP.Kafka
{
try
{
var config = _kafkaOptions.AskafkaConfig();
var config = _kafkaOptions.AsKafkaConfig();
var contentBytes = Encoding.UTF8.GetBytes(content);
using (var producer = new Producer(config))
{
......@@ -39,8 +39,6 @@ namespace DotNetCore.CAP.Kafka
return Task.FromResult(OperateResult.Success);
}
else
{
return Task.FromResult(OperateResult.Failed(new OperateError
{
Code = message.Error.Code.ToString(),
......@@ -48,10 +46,10 @@ namespace DotNetCore.CAP.Kafka
}));
}
}
}
catch (Exception ex)
{
_logger.LogError($"kafka topic message [{keyName}] has benn raised an exception of sending. the exception is: {ex.Message}");
_logger.LogError(
$"kafka topic message [{keyName}] has benn raised an exception of sending. the exception is: {ex.Message}");
return Task.FromResult(OperateResult.Failed(ex));
}
......
......@@ -6,7 +6,7 @@ namespace DotNetCore.CAP
public class EFOptions
{
/// <summary>
/// EF dbcontext type.
/// EF db context type.
/// </summary>
internal Type DbContextType { get; set; }
}
......
......@@ -29,22 +29,18 @@ namespace DotNetCore.CAP
_configure(mysqlOptions);
if (mysqlOptions.DbContextType != null)
{
services.AddSingleton(x =>
{
using (var scope = x.CreateScope())
{
var provider = scope.ServiceProvider;
var dbContext = (DbContext)provider.GetService(mysqlOptions.DbContextType);
var dbContext = (DbContext) provider.GetService(mysqlOptions.DbContextType);
mysqlOptions.ConnectionString = dbContext.Database.GetDbConnection().ConnectionString;
return mysqlOptions;
}
});
}
else
{
services.AddSingleton(mysqlOptions);
}
}
}
}
\ No newline at end of file
// ReSharper disable once CheckNamespace
namespace DotNetCore.CAP
{
public class MySqlOptions : EFOptions
......
......@@ -9,10 +9,7 @@ namespace Microsoft.Extensions.DependencyInjection
{
public static CapOptions UseMySql(this CapOptions options, string connectionString)
{
return options.UseMySql(opt =>
{
opt.ConnectionString = connectionString;
});
return options.UseMySql(opt => { opt.ConnectionString = connectionString; });
}
public static CapOptions UseMySql(this CapOptions options, Action<MySqlOptions> configure)
......@@ -27,10 +24,7 @@ namespace Microsoft.Extensions.DependencyInjection
public static CapOptions UseEntityFramework<TContext>(this CapOptions options)
where TContext : DbContext
{
return options.UseEntityFramework<TContext>(opt =>
{
opt.DbContextType = typeof(TContext);
});
return options.UseEntityFramework<TContext>(opt => { opt.DbContextType = typeof(TContext); });
}
public static CapOptions UseEntityFramework<TContext>(this CapOptions options, Action<EFOptions> configure)
......@@ -38,7 +32,7 @@ namespace Microsoft.Extensions.DependencyInjection
{
if (configure == null) throw new ArgumentNullException(nameof(configure));
var efOptions = new EFOptions { DbContextType = typeof(TContext) };
var efOptions = new EFOptions {DbContextType = typeof(TContext)};
configure(efOptions);
options.RegisterExtension(new MySqlCapOptionsExtension(configure));
......
......@@ -13,9 +13,9 @@ namespace DotNetCore.CAP.MySql
{
public class CapPublisher : CapPublisherBase, ICallbackPublisher
{
private readonly DbContext _dbContext;
private readonly ILogger _logger;
private readonly MySqlOptions _options;
private readonly DbContext _dbContext;
public CapPublisher(IServiceProvider provider,
ILogger<CapPublisher> logger,
......@@ -28,7 +28,15 @@ namespace DotNetCore.CAP.MySql
if (_options.DbContextType != null)
{
IsUsingEF = true;
_dbContext = (DbContext)ServiceProvider.GetService(_options.DbContextType);
_dbContext = (DbContext) ServiceProvider.GetService(_options.DbContextType);
}
}
public async Task PublishAsync(CapPublishedMessage message)
{
using (var conn = new MySqlConnection(_options.ConnectionString))
{
await conn.ExecuteAsync(PrepareSql(), message);
}
}
......@@ -45,36 +53,31 @@ namespace DotNetCore.CAP.MySql
dbContextTransaction = _dbContext.Database.BeginTransaction(IsolationLevel.ReadCommitted);
dbTrans = dbContextTransaction.GetDbTransaction();
}
DbTranasaction = dbTrans;
DbTransaction = dbTrans;
}
protected override void Execute(IDbConnection dbConnection, IDbTransaction dbTransaction, CapPublishedMessage message)
protected override void Execute(IDbConnection dbConnection, IDbTransaction dbTransaction,
CapPublishedMessage message)
{
dbConnection.Execute(PrepareSql(), message, dbTransaction);
_logger.LogInformation("Published Message has been persisted in the database. name:" + message);
}
protected override async Task ExecuteAsync(IDbConnection dbConnection, IDbTransaction dbTransaction, CapPublishedMessage message)
protected override async Task ExecuteAsync(IDbConnection dbConnection, IDbTransaction dbTransaction,
CapPublishedMessage message)
{
await dbConnection.ExecuteAsync(PrepareSql(), message, dbTransaction);
_logger.LogInformation("Published Message has been persisted in the database. name:" + message);
}
public async Task PublishAsync(CapPublishedMessage message)
{
using (var conn = new MySqlConnection(_options.ConnectionString))
{
await conn.ExecuteAsync(PrepareSql(), message);
}
}
#region private methods
private string PrepareSql()
{
return $"INSERT INTO `{_options.TableNamePrefix}.published` (`Name`,`Content`,`Retries`,`Added`,`ExpiresAt`,`StatusName`)VALUES(@Name,@Content,@Retries,@Added,@ExpiresAt,@StatusName)";
return
$"INSERT INTO `{_options.TableNamePrefix}.published` (`Name`,`Content`,`Retries`,`Added`,`ExpiresAt`,`StatusName`)VALUES(@Name,@Content,@Retries,@Added,@ExpiresAt,@StatusName)";
}
#endregion private methods
......
......@@ -9,11 +9,10 @@ namespace DotNetCore.CAP.MySql
{
internal class DefaultAdditionalProcessor : IAdditionalProcessor
{
private readonly ILogger _logger;
private readonly MySqlOptions _options;
private const int MaxBatch = 1000;
private readonly TimeSpan _delay = TimeSpan.FromSeconds(1);
private readonly ILogger _logger;
private readonly MySqlOptions _options;
private readonly TimeSpan _waitingInterval = TimeSpan.FromMinutes(5);
public DefaultAdditionalProcessor(ILogger<DefaultAdditionalProcessor> logger,
......@@ -27,7 +26,8 @@ namespace DotNetCore.CAP.MySql
{
_logger.LogDebug("Collecting expired entities.");
var tables = new[]{
var tables = new[]
{
$"{_options.TableNamePrefix}.published",
$"{_options.TableNamePrefix}.received"
};
......@@ -39,8 +39,9 @@ namespace DotNetCore.CAP.MySql
{
using (var connection = new MySqlConnection(_options.ConnectionString))
{
removedCount = await connection.ExecuteAsync($@"DELETE FROM `{table}` WHERE ExpiresAt < @now limit @count;",
new { now = DateTime.Now, count = MaxBatch });
removedCount = await connection.ExecuteAsync(
$@"DELETE FROM `{table}` WHERE ExpiresAt < @now limit @count;",
new {now = DateTime.Now, count = MaxBatch});
}
if (removedCount != 0)
......
......@@ -8,11 +8,11 @@ namespace DotNetCore.CAP.MySql
{
public class MySqlFetchedMessage : IFetchedMessage
{
private readonly IDbConnection _connection;
private readonly IDbTransaction _transaction;
private readonly Timer _timer;
private static readonly TimeSpan KeepAliveInterval = TimeSpan.FromMinutes(1);
private readonly IDbConnection _connection;
private readonly object _lockObject = new object();
private readonly Timer _timer;
private readonly IDbTransaction _transaction;
public MySqlFetchedMessage(int messageId,
MessageType type,
......
......@@ -11,9 +11,9 @@ namespace DotNetCore.CAP.MySql
{
public class MySqlStorage : IStorage
{
private readonly MySqlOptions _options;
private readonly ILogger _logger;
private readonly IDbConnection _existingConnection = null;
private readonly ILogger _logger;
private readonly MySqlOptions _options;
public MySqlStorage(ILogger<MySqlStorage> logger, MySqlOptions options)
{
......@@ -97,9 +97,7 @@ CREATE TABLE IF NOT EXISTS `{prefix}.published` (
var connection = _existingConnection ?? new MySqlConnection(_options.ConnectionString);
if (connection.State == ConnectionState.Closed)
{
connection.Open();
}
return connection;
}
......@@ -112,9 +110,7 @@ CREATE TABLE IF NOT EXISTS `{prefix}.published` (
internal void ReleaseConnection(IDbConnection connection)
{
if (connection != null && !IsExistingConnection(connection))
{
connection.Dispose();
}
}
}
}
\ No newline at end of file
......@@ -5,23 +5,21 @@ using System.Threading.Tasks;
using Dapper;
using DotNetCore.CAP.Infrastructure;
using DotNetCore.CAP.Models;
using DotNetCore.CAP.Processor.States;
using MySql.Data.MySqlClient;
namespace DotNetCore.CAP.MySql
{
public class MySqlStorageConnection : IStorageConnection
{
private readonly MySqlOptions _options;
private readonly string _prefix;
public MySqlStorageConnection(MySqlOptions options)
{
_options = options;
_prefix = _options.TableNamePrefix;
Options = options;
_prefix = Options.TableNamePrefix;
}
public MySqlOptions Options => _options;
public MySqlOptions Options { get; }
public IStorageTransaction CreateTransaction()
{
......@@ -32,7 +30,7 @@ namespace DotNetCore.CAP.MySql
{
var sql = $@"SELECT * FROM `{_prefix}.published` WHERE `Id`={id};";
using (var connection = new MySqlConnection(_options.ConnectionString))
using (var connection = new MySqlConnection(Options.ConnectionString))
{
return await connection.QueryFirstOrDefaultAsync<CapPublishedMessage>(sql);
}
......@@ -59,7 +57,7 @@ DELETE FROM `{_prefix}.queue` LIMIT 1;";
{
var sql = $"SELECT * FROM `{_prefix}.published` WHERE `StatusName` = '{StatusName.Scheduled}' LIMIT 1;";
using (var connection = new MySqlConnection(_options.ConnectionString))
using (var connection = new MySqlConnection(Options.ConnectionString))
{
return await connection.QueryFirstOrDefaultAsync<CapPublishedMessage>(sql);
}
......@@ -69,14 +67,12 @@ DELETE FROM `{_prefix}.queue` LIMIT 1;";
{
var sql = $"SELECT * FROM `{_prefix}.published` WHERE `StatusName` = '{StatusName.Failed}';";
using (var connection = new MySqlConnection(_options.ConnectionString))
using (var connection = new MySqlConnection(Options.ConnectionString))
{
return await connection.QueryAsync<CapPublishedMessage>(sql);
}
}
// CapReceviedMessage
public async Task StoreReceivedMessageAsync(CapReceivedMessage message)
{
if (message == null) throw new ArgumentNullException(nameof(message));
......@@ -85,7 +81,7 @@ DELETE FROM `{_prefix}.queue` LIMIT 1;";
INSERT INTO `{_prefix}.received`(`Name`,`Group`,`Content`,`Retries`,`Added`,`ExpiresAt`,`StatusName`)
VALUES(@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName);";
using (var connection = new MySqlConnection(_options.ConnectionString))
using (var connection = new MySqlConnection(Options.ConnectionString))
{
await connection.ExecuteAsync(sql, message);
}
......@@ -94,25 +90,25 @@ VALUES(@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName);";
public async Task<CapReceivedMessage> GetReceivedMessageAsync(int id)
{
var sql = $@"SELECT * FROM `{_prefix}.received` WHERE Id={id};";
using (var connection = new MySqlConnection(_options.ConnectionString))
using (var connection = new MySqlConnection(Options.ConnectionString))
{
return await connection.QueryFirstOrDefaultAsync<CapReceivedMessage>(sql);
}
}
public async Task<CapReceivedMessage> GetNextReceviedMessageToBeEnqueuedAsync()
public async Task<CapReceivedMessage> GetNextReceivedMessageToBeEnqueuedAsync()
{
var sql = $"SELECT * FROM `{_prefix}.received` WHERE `StatusName` = '{StatusName.Scheduled}' LIMIT 1;";
using (var connection = new MySqlConnection(_options.ConnectionString))
using (var connection = new MySqlConnection(Options.ConnectionString))
{
return await connection.QueryFirstOrDefaultAsync<CapReceivedMessage>(sql);
}
}
public async Task<IEnumerable<CapReceivedMessage>> GetFailedReceviedMessages()
public async Task<IEnumerable<CapReceivedMessage>> GetFailedReceivedMessages()
{
var sql = $"SELECT * FROM `{_prefix}.received` WHERE `StatusName` = '{StatusName.Failed}';";
using (var connection = new MySqlConnection(_options.ConnectionString))
using (var connection = new MySqlConnection(Options.ConnectionString))
{
return await connection.QueryAsync<CapReceivedMessage>(sql);
}
......@@ -123,10 +119,32 @@ VALUES(@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName);";
{
}
public bool ChangePublishedState(int messageId, string state)
{
var sql =
$"UPDATE `{_prefix}.published` SET `Retries`=`Retries`+1,`StatusName` = '{state}' WHERE `Id`={messageId}";
using (var connection = new MySqlConnection(Options.ConnectionString))
{
return connection.Execute(sql) > 0;
}
}
public bool ChangeReceivedState(int messageId, string state)
{
var sql =
$"UPDATE `{_prefix}.received` SET `Retries`=`Retries`+1,`StatusName` = '{state}' WHERE `Id`={messageId}";
using (var connection = new MySqlConnection(Options.ConnectionString))
{
return connection.Execute(sql) > 0;
}
}
private async Task<IFetchedMessage> FetchNextMessageCoreAsync(string sql, object args = null)
{
//here don't use `using` to dispose
var connection = new MySqlConnection(_options.ConnectionString);
var connection = new MySqlConnection(Options.ConnectionString);
await connection.OpenAsync();
var transaction = connection.BeginTransaction(IsolationLevel.ReadCommitted);
FetchedMessage fetchedMessage;
......@@ -148,17 +166,8 @@ VALUES(@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName);";
return null;
}
return new MySqlFetchedMessage(fetchedMessage.MessageId, fetchedMessage.MessageType, connection, transaction);
}
public bool ChangePublishedState(int messageId, string state)
{
throw new NotImplementedException();
}
public bool ChangeReceivedState(int messageId, string state)
{
throw new NotImplementedException();
return new MySqlFetchedMessage(fetchedMessage.MessageId, fetchedMessage.MessageType, connection,
transaction);
}
}
}
\ No newline at end of file
......@@ -9,10 +9,10 @@ namespace DotNetCore.CAP.MySql
{
public class MySqlStorageTransaction : IStorageTransaction
{
private readonly string _prefix;
private readonly IDbConnection _dbConnection;
private readonly IDbTransaction _dbTransaction;
private readonly IDbConnection _dbConnection;
private readonly string _prefix;
public MySqlStorageTransaction(MySqlStorageConnection connection)
{
......@@ -28,7 +28,8 @@ namespace DotNetCore.CAP.MySql
{
if (message == null) throw new ArgumentNullException(nameof(message));
var sql = $"UPDATE `{_prefix}.published` SET `Retries` = @Retries,`ExpiresAt` = @ExpiresAt,`StatusName`=@StatusName WHERE `Id`=@Id;";
var sql =
$"UPDATE `{_prefix}.published` SET `Retries` = @Retries,`ExpiresAt` = @ExpiresAt,`StatusName`=@StatusName WHERE `Id`=@Id;";
_dbConnection.Execute(sql, message, _dbTransaction);
}
......@@ -36,7 +37,8 @@ namespace DotNetCore.CAP.MySql
{
if (message == null) throw new ArgumentNullException(nameof(message));
var sql = $"UPDATE `{_prefix}.received` SET `Retries` = @Retries,`Content`= @Content,`ExpiresAt` = @ExpiresAt,`StatusName`=@StatusName WHERE `Id`=@Id;";
var sql =
$"UPDATE `{_prefix}.received` SET `Retries` = @Retries,`Content`= @Content,`ExpiresAt` = @ExpiresAt,`StatusName`=@StatusName WHERE `Id`=@Id;";
_dbConnection.Execute(sql, message, _dbTransaction);
}
......@@ -45,7 +47,8 @@ namespace DotNetCore.CAP.MySql
if (message == null) throw new ArgumentNullException(nameof(message));
var sql = $"INSERT INTO `{_prefix}.queue` values(@MessageId,@MessageType);";
_dbConnection.Execute(sql, new CapQueue { MessageId = message.Id, MessageType = MessageType.Publish }, _dbTransaction);
_dbConnection.Execute(sql, new CapQueue {MessageId = message.Id, MessageType = MessageType.Publish},
_dbTransaction);
}
public void EnqueueMessage(CapReceivedMessage message)
......@@ -53,7 +56,8 @@ namespace DotNetCore.CAP.MySql
if (message == null) throw new ArgumentNullException(nameof(message));
var sql = $"INSERT INTO `{_prefix}.queue` values(@MessageId,@MessageType);";
_dbConnection.Execute(sql, new CapQueue { MessageId = message.Id, MessageType = MessageType.Subscribe }, _dbTransaction);
_dbConnection.Execute(sql, new CapQueue {MessageId = message.Id, MessageType = MessageType.Subscribe},
_dbTransaction);
}
public Task CommitAsync()
......
......@@ -9,7 +9,7 @@ namespace DotNetCore.CAP
/// <summary>
/// Gets or sets the schema to use when creating database objects.
/// Default is <see cref="DefaultSchema"/>.
/// Default is <see cref="DefaultSchema" />.
/// </summary>
public string Schema { get; set; } = DefaultSchema;
......
......@@ -9,10 +9,7 @@ namespace Microsoft.Extensions.DependencyInjection
{
public static CapOptions UsePostgreSql(this CapOptions options, string connectionString)
{
return options.UsePostgreSql(opt =>
{
opt.ConnectionString = connectionString;
});
return options.UsePostgreSql(opt => { opt.ConnectionString = connectionString; });
}
public static CapOptions UsePostgreSql(this CapOptions options, Action<PostgreSqlOptions> configure)
......@@ -27,10 +24,7 @@ namespace Microsoft.Extensions.DependencyInjection
public static CapOptions UseEntityFramework<TContext>(this CapOptions options)
where TContext : DbContext
{
return options.UseEntityFramework<TContext>(opt =>
{
opt.DbContextType = typeof(TContext);
});
return options.UseEntityFramework<TContext>(opt => { opt.DbContextType = typeof(TContext); });
}
public static CapOptions UseEntityFramework<TContext>(this CapOptions options, Action<EFOptions> configure)
......@@ -38,7 +32,7 @@ namespace Microsoft.Extensions.DependencyInjection
{
if (configure == null) throw new ArgumentNullException(nameof(configure));
var efOptions = new EFOptions { DbContextType = typeof(TContext) };
var efOptions = new EFOptions {DbContextType = typeof(TContext)};
configure(efOptions);
options.RegisterExtension(new PostgreSqlCapOptionsExtension(configure));
......
......@@ -29,22 +29,18 @@ namespace DotNetCore.CAP
_configure(postgreSqlOptions);
if (postgreSqlOptions.DbContextType != null)
{
services.AddSingleton(x =>
{
using (var scope = x.CreateScope())
{
var provider = scope.ServiceProvider;
var dbContext = (DbContext)provider.GetService(postgreSqlOptions.DbContextType);
var dbContext = (DbContext) provider.GetService(postgreSqlOptions.DbContextType);
postgreSqlOptions.ConnectionString = dbContext.Database.GetDbConnection().ConnectionString;
return postgreSqlOptions;
}
});
}
else
{
services.AddSingleton(postgreSqlOptions);
}
}
}
}
\ No newline at end of file
// ReSharper disable once CheckNamespace
namespace DotNetCore.CAP
{
public class PostgreSqlOptions : EFOptions
......
......@@ -13,9 +13,9 @@ namespace DotNetCore.CAP.PostgreSql
{
public class CapPublisher : CapPublisherBase, ICallbackPublisher
{
private readonly DbContext _dbContext;
private readonly ILogger _logger;
private readonly PostgreSqlOptions _options;
private readonly DbContext _dbContext;
public CapPublisher(IServiceProvider provider,
ILogger<CapPublisher> logger,
......@@ -28,7 +28,15 @@ namespace DotNetCore.CAP.PostgreSql
if (_options.DbContextType != null)
{
IsUsingEF = true;
_dbContext = (DbContext)ServiceProvider.GetService(_options.DbContextType);
_dbContext = (DbContext) ServiceProvider.GetService(_options.DbContextType);
}
}
public async Task PublishAsync(CapPublishedMessage message)
{
using (var conn = new NpgsqlConnection(_options.ConnectionString))
{
await conn.ExecuteAsync(PrepareSql(), message);
}
}
......@@ -45,36 +53,31 @@ namespace DotNetCore.CAP.PostgreSql
dbContextTransaction = _dbContext.Database.BeginTransaction(IsolationLevel.ReadCommitted);
dbTrans = dbContextTransaction.GetDbTransaction();
}
DbTranasaction = dbTrans;
DbTransaction = dbTrans;
}
protected override void Execute(IDbConnection dbConnection, IDbTransaction dbTransaction, CapPublishedMessage message)
protected override void Execute(IDbConnection dbConnection, IDbTransaction dbTransaction,
CapPublishedMessage message)
{
dbConnection.Execute(PrepareSql(), message, dbTransaction);
_logger.LogInformation("Published Message has been persisted in the database. name:" + message);
}
protected override async Task ExecuteAsync(IDbConnection dbConnection, IDbTransaction dbTransaction, CapPublishedMessage message)
protected override async Task ExecuteAsync(IDbConnection dbConnection, IDbTransaction dbTransaction,
CapPublishedMessage message)
{
await dbConnection.ExecuteAsync(PrepareSql(), message, dbTransaction);
_logger.LogInformation("Published Message has been persisted in the database. name:" + message);
}
public async Task PublishAsync(CapPublishedMessage message)
{
using (var conn = new NpgsqlConnection(_options.ConnectionString))
{
await conn.ExecuteAsync(PrepareSql(), message);
}
}
#region private methods
private string PrepareSql()
{
return $"INSERT INTO \"{_options.Schema}\".\"published\" (\"Name\",\"Content\",\"Retries\",\"Added\",\"ExpiresAt\",\"StatusName\")VALUES(@Name,@Content,@Retries,@Added,@ExpiresAt,@StatusName)";
return
$"INSERT INTO \"{_options.Schema}\".\"published\" (\"Name\",\"Content\",\"Retries\",\"Added\",\"ExpiresAt\",\"StatusName\")VALUES(@Name,@Content,@Retries,@Added,@ExpiresAt,@StatusName)";
}
#endregion private methods
......
......@@ -9,26 +9,22 @@ namespace DotNetCore.CAP.PostgreSql
{
internal class DefaultAdditionalProcessor : IAdditionalProcessor
{
private readonly IServiceProvider _provider;
private readonly ILogger _logger;
private readonly PostgreSqlOptions _options;
private const int MaxBatch = 1000;
private readonly TimeSpan _delay = TimeSpan.FromSeconds(1);
private readonly TimeSpan _waitingInterval = TimeSpan.FromMinutes(5);
private static readonly string[] Tables =
{
"published","received"
"published", "received"
};
public DefaultAdditionalProcessor(
IServiceProvider provider,
ILogger<DefaultAdditionalProcessor> logger,
private readonly TimeSpan _delay = TimeSpan.FromSeconds(1);
private readonly ILogger _logger;
private readonly PostgreSqlOptions _options;
private readonly TimeSpan _waitingInterval = TimeSpan.FromMinutes(5);
public DefaultAdditionalProcessor(ILogger<DefaultAdditionalProcessor> logger,
PostgreSqlOptions sqlServerOptions)
{
_logger = logger;
_provider = provider;
_options = sqlServerOptions;
}
......@@ -43,8 +39,9 @@ namespace DotNetCore.CAP.PostgreSql
{
using (var connection = new NpgsqlConnection(_options.ConnectionString))
{
removedCount = await connection.ExecuteAsync($"DELETE FROM \"{_options.Schema}\".\"{table}\" WHERE \"ExpiresAt\" < @now AND \"Id\" IN (SELECT \"Id\" FROM \"{_options.Schema}\".\"{table}\" LIMIT @count);",
new { now = DateTime.Now, count = MaxBatch });
removedCount = await connection.ExecuteAsync(
$"DELETE FROM \"{_options.Schema}\".\"{table}\" WHERE \"ExpiresAt\" < @now AND \"Id\" IN (SELECT \"Id\" FROM \"{_options.Schema}\".\"{table}\" LIMIT @count);",
new {now = DateTime.Now, count = MaxBatch});
}
if (removedCount != 0)
......
......@@ -8,11 +8,11 @@ namespace DotNetCore.CAP.PostgreSql
{
public class PostgreSqlFetchedMessage : IFetchedMessage
{
private readonly IDbConnection _connection;
private readonly IDbTransaction _transaction;
private readonly Timer _timer;
private static readonly TimeSpan KeepAliveInterval = TimeSpan.FromMinutes(1);
private readonly IDbConnection _connection;
private readonly object _lockObject = new object();
private readonly Timer _timer;
private readonly IDbTransaction _transaction;
public PostgreSqlFetchedMessage(int messageId,
MessageType type,
......
......@@ -10,10 +10,10 @@ using DotNetCore.CAP.Models;
namespace DotNetCore.CAP.PostgreSql
{
public class PostgreSqlMonitoringApi: IMonitoringApi
public class PostgreSqlMonitoringApi : IMonitoringApi
{
private readonly PostgreSqlStorage _storage;
private readonly PostgreSqlOptions _options;
private readonly PostgreSqlStorage _storage;
public PostgreSqlMonitoringApi(IStorage storage, PostgreSqlOptions options)
{
......@@ -23,7 +23,7 @@ namespace DotNetCore.CAP.PostgreSql
public StatisticsDto GetStatistics()
{
string sql = String.Format(@"
var sql = string.Format(@"
select count(Id) from ""{0}"".""published"" where ""StatusName"" = N'Succeeded';
select count(Id) from ""{0}"".""received"" where ""StatusName"" = N'Succeeded';
select count(Id) from ""{0}"".""published"" where ""StatusName"" = N'Failed';
......@@ -57,30 +57,20 @@ select count(Id) from ""{0}"".""received"" where ""StatusName"" in (N'Processin
var where = string.Empty;
if (!string.IsNullOrEmpty(queryDto.StatusName))
{
if (string.Equals(queryDto.StatusName, StatusName.Processing, StringComparison.CurrentCultureIgnoreCase))
{
if (string.Equals(queryDto.StatusName, StatusName.Processing,
StringComparison.CurrentCultureIgnoreCase))
where += " and \"StatusName\" in (N'Processing',N'Scheduled',N'Enqueued')";
}
else
{
where += " and \"StatusName\" = @StatusName";
}
}
if (!string.IsNullOrEmpty(queryDto.Name))
{
where += " and \"Name\" = @Name";
}
if (!string.IsNullOrEmpty(queryDto.Group))
{
where += " and \"Group\" = @Group";
}
if (!string.IsNullOrEmpty(queryDto.Content))
{
where += " and \"Content\" like '%@Content%'";
}
var sqlQuery = $"select * from \"{_options.Schema}\".\"{tableName}\" where 1=1 {where} order by \"Added\" desc offset @Offset limit @Limit";
var sqlQuery =
$"select * from \"{_options.Schema}\".\"{tableName}\" where 1=1 {where} order by \"Added\" desc offset @Offset limit @Limit";
return UseConnection(conn => conn.Query<MessageDto>(sqlQuery, new
{
......@@ -89,7 +79,7 @@ select count(Id) from ""{0}"".""received"" where ""StatusName"" in (N'Processin
queryDto.Name,
queryDto.Content,
Offset = queryDto.CurrentPage * queryDto.PageSize,
Limit = queryDto.PageSize,
Limit = queryDto.PageSize
}).ToList());
}
......@@ -143,7 +133,7 @@ select count(Id) from ""{0}"".""received"" where ""StatusName"" in (N'Processin
? $"select count(Id) from \"{_options.Schema}\".\"{tableName}\" where \"StatusName\" in (N'Processing',N'Scheduled',N'Enqueued')"
: $"select count(Id) from \"{_options.Schema}\".\"{tableName}\" where \"StatusName\" = @state";
var count = connection.ExecuteScalar<int>(sqlQuery, new { state = statusName });
var count = connection.ExecuteScalar<int>(sqlQuery, new {state = statusName});
return count;
}
......@@ -152,7 +142,8 @@ select count(Id) from ""{0}"".""received"" where ""StatusName"" in (N'Processin
return _storage.UseConnection(action);
}
private Dictionary<DateTime, int> GetHourlyTimelineStats(IDbConnection connection, string tableName, string statusName)
private Dictionary<DateTime, int> GetHourlyTimelineStats(IDbConnection connection, string tableName,
string statusName)
{
var endDate = DateTime.Now;
var dates = new List<DateTime>();
......@@ -174,7 +165,7 @@ select count(Id) from ""{0}"".""received"" where ""StatusName"" in (N'Processin
IDictionary<string, DateTime> keyMaps)
{
//SQL Server 2012+
string sqlQuery =
var sqlQuery =
$@"
with aggr as (
select to_char(""Added"",'yyyy-MM-dd-HH') as ""Key"",
......@@ -187,13 +178,11 @@ select ""Key"",""Count"" from aggr where ""Key"" in @keys;";
var valuesMap = connection.Query(
sqlQuery,
new { keys = keyMaps.Keys, statusName })
.ToDictionary(x => (string)x.Key, x => (int)x.Count);
new {keys = keyMaps.Keys, statusName})
.ToDictionary(x => (string) x.Key, x => (int) x.Count);
foreach (var key in keyMaps.Keys)
{
if (!valuesMap.ContainsKey(key)) valuesMap.Add(key, 0);
}
var result = new Dictionary<DateTime, int>();
for (var i = 0; i < keyMaps.Count; i++)
......
......@@ -11,9 +11,9 @@ namespace DotNetCore.CAP.PostgreSql
{
public class PostgreSqlStorage : IStorage
{
private readonly PostgreSqlOptions _options;
private readonly ILogger _logger;
private readonly IDbConnection _existingConnection = null;
private readonly ILogger _logger;
private readonly PostgreSqlOptions _options;
public PostgreSqlStorage(ILogger<PostgreSqlStorage> logger, PostgreSqlOptions options)
{
......@@ -64,9 +64,7 @@ namespace DotNetCore.CAP.PostgreSql
var connection = _existingConnection ?? new NpgsqlConnection(_options.ConnectionString);
if (connection.State == ConnectionState.Closed)
{
connection.Open();
}
return connection;
}
......@@ -79,10 +77,8 @@ namespace DotNetCore.CAP.PostgreSql
internal void ReleaseConnection(IDbConnection connection)
{
if (connection != null && !IsExistingConnection(connection))
{
connection.Dispose();
}
}
protected virtual string CreateDbTablesScript(string schema)
{
......
......@@ -63,8 +63,6 @@ namespace DotNetCore.CAP.PostgreSql
}
}
// CapReceviedMessage
public async Task StoreReceivedMessageAsync(CapReceivedMessage message)
{
if (message == null) throw new ArgumentNullException(nameof(message));
......@@ -87,7 +85,7 @@ namespace DotNetCore.CAP.PostgreSql
}
}
public async Task<CapReceivedMessage> GetNextReceviedMessageToBeEnqueuedAsync()
public async Task<CapReceivedMessage> GetNextReceivedMessageToBeEnqueuedAsync()
{
var sql =
$"SELECT * FROM \"{Options.Schema}\".\"received\" WHERE \"StatusName\" = '{StatusName.Scheduled}' FOR UPDATE SKIP LOCKED LIMIT 1;";
......@@ -97,7 +95,7 @@ namespace DotNetCore.CAP.PostgreSql
}
}
public async Task<IEnumerable<CapReceivedMessage>> GetFailedReceviedMessages()
public async Task<IEnumerable<CapReceivedMessage>> GetFailedReceivedMessages()
{
var sql =
$"SELECT * FROM \"{Options.Schema}\".\"received\" WHERE \"StatusName\"='{StatusName.Failed}' LIMIT 1000;";
......
......@@ -9,10 +9,10 @@ namespace DotNetCore.CAP.PostgreSql
{
public class PostgreSqlStorageTransaction : IStorageTransaction
{
private readonly string _schema;
private readonly IDbConnection _dbConnection;
private readonly IDbTransaction _dbTransaction;
private readonly IDbConnection _dbConnection;
private readonly string _schema;
public PostgreSqlStorageTransaction(PostgreSqlStorageConnection connection)
{
......@@ -28,7 +28,10 @@ namespace DotNetCore.CAP.PostgreSql
{
if (message == null) throw new ArgumentNullException(nameof(message));
var sql = $@"UPDATE ""{_schema}"".""published"" SET ""Retries""=@Retries,""ExpiresAt""=@ExpiresAt,""StatusName""=@StatusName WHERE ""Id""=@Id;";
var sql =
$@"UPDATE ""{
_schema
}"".""published"" SET ""Retries""=@Retries,""ExpiresAt""=@ExpiresAt,""StatusName""=@StatusName WHERE ""Id""=@Id;";
_dbConnection.Execute(sql, message, _dbTransaction);
}
......@@ -36,7 +39,10 @@ namespace DotNetCore.CAP.PostgreSql
{
if (message == null) throw new ArgumentNullException(nameof(message));
var sql = $@"UPDATE ""{_schema}"".""received"" SET ""Retries""=@Retries,""Content""= @Content,""ExpiresAt""=@ExpiresAt,""StatusName""=@StatusName WHERE ""Id""=@Id;";
var sql =
$@"UPDATE ""{
_schema
}"".""received"" SET ""Retries""=@Retries,""Content""= @Content,""ExpiresAt""=@ExpiresAt,""StatusName""=@StatusName WHERE ""Id""=@Id;";
_dbConnection.Execute(sql, message, _dbTransaction);
}
......@@ -45,7 +51,8 @@ namespace DotNetCore.CAP.PostgreSql
if (message == null) throw new ArgumentNullException(nameof(message));
var sql = $@"INSERT INTO ""{_schema}"".""queue"" values(@MessageId,@MessageType);";
_dbConnection.Execute(sql, new CapQueue { MessageId = message.Id, MessageType = MessageType.Publish }, _dbTransaction);
_dbConnection.Execute(sql, new CapQueue {MessageId = message.Id, MessageType = MessageType.Publish},
_dbTransaction);
}
public void EnqueueMessage(CapReceivedMessage message)
......@@ -53,7 +60,8 @@ namespace DotNetCore.CAP.PostgreSql
if (message == null) throw new ArgumentNullException(nameof(message));
var sql = $@"INSERT INTO ""{_schema}"".""queue"" values(@MessageId,@MessageType);";
_dbConnection.Execute(sql, new CapQueue { MessageId = message.Id, MessageType = MessageType.Subscribe }, _dbTransaction);
_dbConnection.Execute(sql, new CapQueue {MessageId = message.Id, MessageType = MessageType.Subscribe},
_dbTransaction);
}
public Task CommitAsync()
......
......@@ -8,10 +8,7 @@ namespace Microsoft.Extensions.DependencyInjection
{
public static CapOptions UseRabbitMQ(this CapOptions options, string hostName)
{
return options.UseRabbitMQ(opt =>
{
opt.HostName = hostName;
});
return options.UseRabbitMQ(opt => { opt.HostName = hostName; });
}
public static CapOptions UseRabbitMQ(this CapOptions options, Action<RabbitMQOptions> configure)
......
// ReSharper disable once CheckNamespace
namespace DotNetCore.CAP
{
public class RabbitMQOptions
......
......@@ -10,13 +10,13 @@ namespace DotNetCore.CAP.RabbitMQ
{
private const int DefaultPoolSize = 15;
private readonly ConcurrentQueue<IConnection> _pool = new ConcurrentQueue<IConnection>();
private readonly Func<IConnection> _activator;
private int _maxSize;
private readonly ConcurrentQueue<IConnection> _pool = new ConcurrentQueue<IConnection>();
private int _count;
private int _maxSize;
public ConnectionPool(RabbitMQOptions options)
{
_maxSize = DefaultPoolSize;
......@@ -24,9 +24,28 @@ namespace DotNetCore.CAP.RabbitMQ
_activator = CreateActivator(options);
}
IConnection IConnectionPool.Rent()
{
return Rent();
}
bool IConnectionPool.Return(IConnection connection)
{
return Return(connection);
}
public void Dispose()
{
_maxSize = 0;
IConnection context;
while (_pool.TryDequeue(out context))
context.Dispose();
}
private static Func<IConnection> CreateActivator(RabbitMQOptions options)
{
var factory = new ConnectionFactory()
var factory = new ConnectionFactory
{
HostName = options.HostName,
UserName = options.UserName,
......@@ -43,7 +62,7 @@ namespace DotNetCore.CAP.RabbitMQ
public virtual IConnection Rent()
{
if (_pool.TryDequeue(out IConnection connection))
if (_pool.TryDequeue(out var connection))
{
Interlocked.Decrement(ref _count);
......@@ -72,20 +91,5 @@ namespace DotNetCore.CAP.RabbitMQ
return false;
}
IConnection IConnectionPool.Rent() => Rent();
bool IConnectionPool.Return(IConnection connection) => Return(connection);
public void Dispose()
{
_maxSize = 0;
IConnection context;
while (_pool.TryDequeue(out context))
{
context.Dispose();
}
}
}
}
\ No newline at end of file
......@@ -9,8 +9,8 @@ namespace DotNetCore.CAP.RabbitMQ
{
internal sealed class PublishQueueExecutor : BasePublishQueueExecutor
{
private readonly ILogger _logger;
private readonly ConnectionPool _connectionPool;
private readonly ILogger _logger;
private readonly RabbitMQOptions _rabbitMQOptions;
public PublishQueueExecutor(
......@@ -36,11 +36,11 @@ namespace DotNetCore.CAP.RabbitMQ
{
var body = Encoding.UTF8.GetBytes(content);
channel.ExchangeDeclare(_rabbitMQOptions.TopicExchangeName, RabbitMQOptions.ExchangeType, durable: true);
channel.BasicPublish(exchange: _rabbitMQOptions.TopicExchangeName,
routingKey: keyName,
basicProperties: null,
body: body);
channel.ExchangeDeclare(_rabbitMQOptions.TopicExchangeName, RabbitMQOptions.ExchangeType, true);
channel.BasicPublish(_rabbitMQOptions.TopicExchangeName,
keyName,
null,
body);
_logger.LogDebug($"rabbitmq topic message [{keyName}] has been published.");
}
......@@ -48,10 +48,11 @@ namespace DotNetCore.CAP.RabbitMQ
}
catch (Exception ex)
{
_logger.LogError($"rabbitmq topic message [{keyName}] has benn raised an exception of sending. the exception is: {ex.Message}");
_logger.LogError(
$"rabbitmq topic message [{keyName}] has benn raised an exception of sending. the exception is: {ex.Message}");
return Task.FromResult(OperateResult.Failed(ex,
new OperateError()
new OperateError
{
Code = ex.HResult.ToString(),
Description = ex.Message
......
......@@ -10,18 +10,14 @@ namespace DotNetCore.CAP.RabbitMQ
{
internal sealed class RabbitMQConsumerClient : IConsumerClient
{
private readonly ConnectionPool _connectionPool;
private readonly string _exchageName;
private readonly string _queueName;
private readonly RabbitMQOptions _rabbitMQOptions;
private readonly ConnectionPool _connectionPool;
private IModel _channel;
private ulong _deliveryTag;
public event EventHandler<MessageContext> OnMessageReceieved;
public event EventHandler<string> OnError;
public RabbitMQConsumerClient(string queueName,
ConnectionPool connectionPool,
RabbitMQOptions options)
......@@ -34,36 +30,17 @@ namespace DotNetCore.CAP.RabbitMQ
InitClient();
}
private void InitClient()
{
var connection = _connectionPool.Rent();
public event EventHandler<MessageContext> OnMessageReceived;
_channel = connection.CreateModel();
_channel.ExchangeDeclare(
exchange: _exchageName,
type: RabbitMQOptions.ExchangeType,
durable: true);
var arguments = new Dictionary<string, object> { { "x-message-ttl", _rabbitMQOptions.QueueMessageExpires } };
_channel.QueueDeclare(_queueName,
durable: true,
exclusive: false,
autoDelete: false,
arguments: arguments);
_connectionPool.Return(connection);
}
public event EventHandler<string> OnError;
public void Subscribe(IEnumerable<string> topics)
{
if (topics == null) throw new ArgumentNullException(nameof(topics));
foreach (var topic in topics)
{
_channel.QueueBind(_queueName, _exchageName, topic);
}
}
public void Listening(TimeSpan timeout, CancellationToken cancellationToken)
{
......@@ -72,10 +49,8 @@ namespace DotNetCore.CAP.RabbitMQ
consumer.Shutdown += OnConsumerShutdown;
_channel.BasicConsume(_queueName, false, consumer);
while (true)
{
Task.Delay(timeout, cancellationToken).GetAwaiter().GetResult();
}
}
public void Commit()
{
......@@ -87,6 +62,27 @@ namespace DotNetCore.CAP.RabbitMQ
_channel.Dispose();
}
private void InitClient()
{
var connection = _connectionPool.Rent();
_channel = connection.CreateModel();
_channel.ExchangeDeclare(
_exchageName,
RabbitMQOptions.ExchangeType,
true);
var arguments = new Dictionary<string, object> {{"x-message-ttl", _rabbitMQOptions.QueueMessageExpires}};
_channel.QueueDeclare(_queueName,
true,
false,
false,
arguments);
_connectionPool.Return(connection);
}
private void OnConsumerReceived(object sender, BasicDeliverEventArgs e)
{
_deliveryTag = e.DeliveryTag;
......@@ -96,7 +92,7 @@ namespace DotNetCore.CAP.RabbitMQ
Name = e.RoutingKey,
Content = Encoding.UTF8.GetString(e.Body)
};
OnMessageReceieved?.Invoke(sender, message);
OnMessageReceived?.Invoke(sender, message);
}
private void OnConsumerShutdown(object sender, ShutdownEventArgs e)
......
......@@ -2,8 +2,8 @@
{
internal sealed class RabbitMQConsumerClientFactory : IConsumerClientFactory
{
private readonly RabbitMQOptions _rabbitMQOptions;
private readonly ConnectionPool _connectionPool;
private readonly RabbitMQOptions _rabbitMQOptions;
public RabbitMQConsumerClientFactory(RabbitMQOptions rabbitMQOptions, ConnectionPool pool)
......
......@@ -9,7 +9,7 @@ namespace DotNetCore.CAP
/// <summary>
/// Gets or sets the schema to use when creating database objects.
/// Default is <see cref="DefaultSchema"/>.
/// Default is <see cref="DefaultSchema" />.
/// </summary>
public string Schema { get; set; } = DefaultSchema;
......
......@@ -9,10 +9,7 @@ namespace Microsoft.Extensions.DependencyInjection
{
public static CapOptions UseSqlServer(this CapOptions options, string connectionString)
{
return options.UseSqlServer(opt =>
{
opt.ConnectionString = connectionString;
});
return options.UseSqlServer(opt => { opt.ConnectionString = connectionString; });
}
public static CapOptions UseSqlServer(this CapOptions options, Action<SqlServerOptions> configure)
......@@ -27,10 +24,7 @@ namespace Microsoft.Extensions.DependencyInjection
public static CapOptions UseEntityFramework<TContext>(this CapOptions options)
where TContext : DbContext
{
return options.UseEntityFramework<TContext>(opt =>
{
opt.DbContextType = typeof(TContext);
});
return options.UseEntityFramework<TContext>(opt => { opt.DbContextType = typeof(TContext); });
}
public static CapOptions UseEntityFramework<TContext>(this CapOptions options, Action<EFOptions> configure)
......@@ -38,7 +32,7 @@ namespace Microsoft.Extensions.DependencyInjection
{
if (configure == null) throw new ArgumentNullException(nameof(configure));
var efOptions = new EFOptions { DbContextType = typeof(TContext) };
var efOptions = new EFOptions {DbContextType = typeof(TContext)};
configure(efOptions);
options.RegisterExtension(new SqlServerCapOptionsExtension(configure));
......
......@@ -34,22 +34,18 @@ namespace DotNetCore.CAP
_configure(sqlServerOptions);
if (sqlServerOptions.DbContextType != null)
{
services.AddSingleton(x =>
{
using (var scope = x.CreateScope())
{
var provider = scope.ServiceProvider;
var dbContext = (DbContext)provider.GetService(sqlServerOptions.DbContextType);
var dbContext = (DbContext) provider.GetService(sqlServerOptions.DbContextType);
sqlServerOptions.ConnectionString = dbContext.Database.GetDbConnection().ConnectionString;
return sqlServerOptions;
}
});
}
else
{
services.AddSingleton(sqlServerOptions);
}
}
}
}
\ No newline at end of file
// ReSharper disable once CheckNamespace
namespace DotNetCore.CAP
{
public class SqlServerOptions : EFOptions
......
......@@ -13,9 +13,9 @@ namespace DotNetCore.CAP.SqlServer
{
public class CapPublisher : CapPublisherBase, ICallbackPublisher
{
private readonly DbContext _dbContext;
private readonly ILogger _logger;
private readonly SqlServerOptions _options;
private readonly DbContext _dbContext;
public CapPublisher(IServiceProvider provider,
ILogger<CapPublisher> logger,
......@@ -28,7 +28,15 @@ namespace DotNetCore.CAP.SqlServer
if (_options.DbContextType != null)
{
IsUsingEF = true;
_dbContext = (DbContext)ServiceProvider.GetService(_options.DbContextType);
_dbContext = (DbContext) ServiceProvider.GetService(_options.DbContextType);
}
}
public async Task PublishAsync(CapPublishedMessage message)
{
using (var conn = new SqlConnection(_options.ConnectionString))
{
await conn.ExecuteAsync(PrepareSql(), message);
}
}
......@@ -45,36 +53,31 @@ namespace DotNetCore.CAP.SqlServer
dbContextTransaction = _dbContext.Database.BeginTransaction(IsolationLevel.ReadCommitted);
dbTrans = dbContextTransaction.GetDbTransaction();
}
DbTranasaction = dbTrans;
DbTransaction = dbTrans;
}
protected override void Execute(IDbConnection dbConnection, IDbTransaction dbTransaction, CapPublishedMessage message)
protected override void Execute(IDbConnection dbConnection, IDbTransaction dbTransaction,
CapPublishedMessage message)
{
dbConnection.Execute(PrepareSql(), message, dbTransaction);
_logger.LogInformation("Published Message has been persisted in the database. name:" + message);
}
protected override async Task ExecuteAsync(IDbConnection dbConnection, IDbTransaction dbTransaction, CapPublishedMessage message)
protected override async Task ExecuteAsync(IDbConnection dbConnection, IDbTransaction dbTransaction,
CapPublishedMessage message)
{
await dbConnection.ExecuteAsync(PrepareSql(), message, dbTransaction);
_logger.LogInformation("Published Message has been persisted in the database. name:" + message);
}
public async Task PublishAsync(CapPublishedMessage message)
{
using (var conn = new SqlConnection(_options.ConnectionString))
{
await conn.ExecuteAsync(PrepareSql(), message);
}
}
#region private methods
private string PrepareSql()
{
return $"INSERT INTO {_options.Schema}.[Published] ([Name],[Content],[Retries],[Added],[ExpiresAt],[StatusName])VALUES(@Name,@Content,@Retries,@Added,@ExpiresAt,@StatusName)";
return
$"INSERT INTO {_options.Schema}.[Published] ([Name],[Content],[Retries],[Added],[ExpiresAt],[StatusName])VALUES(@Name,@Content,@Retries,@Added,@ExpiresAt,@StatusName)";
}
#endregion private methods
......
......@@ -9,18 +9,18 @@ namespace DotNetCore.CAP.SqlServer
{
public class DefaultAdditionalProcessor : IAdditionalProcessor
{
private readonly ILogger _logger;
private readonly SqlServerOptions _options;
private const int MaxBatch = 1000;
private readonly TimeSpan _delay = TimeSpan.FromSeconds(1);
private readonly TimeSpan _waitingInterval = TimeSpan.FromMinutes(5);
private static readonly string[] Tables =
{
"Published","Received"
"Published", "Received"
};
private readonly TimeSpan _delay = TimeSpan.FromSeconds(1);
private readonly ILogger _logger;
private readonly SqlServerOptions _options;
private readonly TimeSpan _waitingInterval = TimeSpan.FromMinutes(5);
public DefaultAdditionalProcessor(ILogger<DefaultAdditionalProcessor> logger,
SqlServerOptions sqlServerOptions)
{
......@@ -42,7 +42,7 @@ namespace DotNetCore.CAP.SqlServer
removedCount = await connection.ExecuteAsync($@"
DELETE TOP (@count)
FROM [{_options.Schema}].[{table}] WITH (readpast)
WHERE ExpiresAt < @now;", new { now = DateTime.Now, count = MaxBatch });
WHERE ExpiresAt < @now;", new {now = DateTime.Now, count = MaxBatch});
}
if (removedCount != 0)
......
......@@ -8,11 +8,11 @@ namespace DotNetCore.CAP.SqlServer
{
public class SqlServerFetchedMessage : IFetchedMessage
{
private readonly IDbConnection _connection;
private readonly IDbTransaction _transaction;
private readonly Timer _timer;
private static readonly TimeSpan KeepAliveInterval = TimeSpan.FromMinutes(1);
private readonly IDbConnection _connection;
private readonly object _lockObject = new object();
private readonly Timer _timer;
private readonly IDbTransaction _transaction;
public SqlServerFetchedMessage(int messageId,
MessageType type,
......
......@@ -12,8 +12,8 @@ namespace DotNetCore.CAP.SqlServer
{
internal class SqlServerMonitoringApi : IMonitoringApi
{
private readonly SqlServerStorage _storage;
private readonly SqlServerOptions _options;
private readonly SqlServerStorage _storage;
public SqlServerMonitoringApi(IStorage storage, SqlServerOptions options)
{
......@@ -23,7 +23,7 @@ namespace DotNetCore.CAP.SqlServer
public StatisticsDto GetStatistics()
{
string sql = String.Format(@"
var sql = string.Format(@"
set transaction isolation level read committed;
select count(Id) from [{0}].Published with (nolock) where StatusName = N'Succeeded';
select count(Id) from [{0}].Received with (nolock) where StatusName = N'Succeeded';
......@@ -31,7 +31,7 @@ select count(Id) from [{0}].Published with (nolock) where StatusName = N'Failed'
select count(Id) from [{0}].Received with (nolock) where StatusName = N'Failed';
select count(Id) from [{0}].Published with (nolock) where StatusName in (N'Processing',N'Scheduled',N'Enqueued');
select count(Id) from [{0}].Received with (nolock) where StatusName in (N'Processing',N'Scheduled',N'Enqueued');",
_options.Schema);
_options.Schema);
var statistics = UseConnection(connection =>
{
......@@ -71,30 +71,20 @@ _options.Schema);
var tableName = queryDto.MessageType == MessageType.Publish ? "Published" : "Received";
var where = string.Empty;
if (!string.IsNullOrEmpty(queryDto.StatusName))
{
if (string.Equals(queryDto.StatusName, StatusName.Processing, StringComparison.CurrentCultureIgnoreCase))
{
if (string.Equals(queryDto.StatusName, StatusName.Processing,
StringComparison.CurrentCultureIgnoreCase))
where += " and statusname in (N'Processing',N'Scheduled',N'Enqueued')";
}
else
{
where += " and statusname=@StatusName";
}
}
if (!string.IsNullOrEmpty(queryDto.Name))
{
where += " and name=@Name";
}
if (!string.IsNullOrEmpty(queryDto.Group))
{
where += " and group=@Group";
}
if (!string.IsNullOrEmpty(queryDto.Content))
{
where += " and content like '%@Content%'";
}
var sqlQuery = $"select * from [{_options.Schema}].{tableName} where 1=1 {where} order by Added desc offset @Offset rows fetch next @Limit rows only";
var sqlQuery =
$"select * from [{_options.Schema}].{tableName} where 1=1 {where} order by Added desc offset @Offset rows fetch next @Limit rows only";
return UseConnection(conn => conn.Query<MessageDto>(sqlQuery, new
{
......@@ -103,7 +93,7 @@ _options.Schema);
queryDto.Name,
queryDto.Content,
Offset = queryDto.CurrentPage * queryDto.PageSize,
Limit = queryDto.PageSize,
Limit = queryDto.PageSize
}).ToList());
}
......@@ -143,7 +133,7 @@ _options.Schema);
? $"select count(Id) from [{_options.Schema}].{tableName} with (nolock) where StatusName in (N'Processing',N'Scheduled',N'Enqueued')"
: $"select count(Id) from [{_options.Schema}].{tableName} with (nolock) where StatusName = @state";
var count = connection.ExecuteScalar<int>(sqlQuery, new { state = statusName });
var count = connection.ExecuteScalar<int>(sqlQuery, new {state = statusName});
return count;
}
......@@ -152,7 +142,8 @@ _options.Schema);
return _storage.UseConnection(action);
}
private Dictionary<DateTime, int> GetHourlyTimelineStats(IDbConnection connection, string tableName, string statusName)
private Dictionary<DateTime, int> GetHourlyTimelineStats(IDbConnection connection, string tableName,
string statusName)
{
var endDate = DateTime.Now;
var dates = new List<DateTime>();
......@@ -174,8 +165,8 @@ _options.Schema);
IDictionary<string, DateTime> keyMaps)
{
//SQL Server 2012+
string sqlQuery =
$@"
var sqlQuery =
$@"
with aggr as (
select FORMAT(Added,'yyyy-MM-dd-HH') as [Key],
count(id) [Count]
......@@ -187,13 +178,11 @@ select [Key], [Count] from aggr with (nolock) where [Key] in @keys;";
var valuesMap = connection.Query(
sqlQuery,
new { keys = keyMaps.Keys, statusName })
.ToDictionary(x => (string)x.Key, x => (int)x.Count);
new {keys = keyMaps.Keys, statusName})
.ToDictionary(x => (string) x.Key, x => (int) x.Count);
foreach (var key in keyMaps.Keys)
{
if (!valuesMap.ContainsKey(key)) valuesMap.Add(key, 0);
}
var result = new Dictionary<DateTime, int>();
for (var i = 0; i < keyMaps.Count; i++)
......
......@@ -11,9 +11,9 @@ namespace DotNetCore.CAP.SqlServer
{
public class SqlServerStorage : IStorage
{
private readonly SqlServerOptions _options;
private readonly ILogger _logger;
private readonly IDbConnection _existingConnection = null;
private readonly ILogger _logger;
private readonly SqlServerOptions _options;
public SqlServerStorage(ILogger<SqlServerStorage> logger, SqlServerOptions options)
{
......@@ -118,9 +118,7 @@ END;";
var connection = _existingConnection ?? new SqlConnection(_options.ConnectionString);
if (connection.State == ConnectionState.Closed)
{
connection.Open();
}
return connection;
}
......@@ -133,10 +131,7 @@ END;";
internal void ReleaseConnection(IDbConnection connection)
{
if (connection != null && !IsExistingConnection(connection))
{
connection.Dispose();
}
}
}
}
\ No newline at end of file
......@@ -101,7 +101,7 @@ VALUES(@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName);";
}
}
public async Task<CapReceivedMessage> GetNextReceviedMessageToBeEnqueuedAsync()
public async Task<CapReceivedMessage> GetNextReceivedMessageToBeEnqueuedAsync()
{
var sql =
$"SELECT TOP (1) * FROM [{Options.Schema}].[Received] WITH (readpast) WHERE StatusName = '{StatusName.Scheduled}'";
......@@ -111,7 +111,7 @@ VALUES(@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName);";
}
}
public async Task<IEnumerable<CapReceivedMessage>> GetFailedReceviedMessages()
public async Task<IEnumerable<CapReceivedMessage>> GetFailedReceivedMessages()
{
var sql =
$"SELECT * FROM [{Options.Schema}].[Received] WITH (readpast) WHERE StatusName = '{StatusName.Failed}'";
......
......@@ -9,10 +9,10 @@ namespace DotNetCore.CAP.SqlServer
{
public class SqlServerStorageTransaction : IStorageTransaction
{
private readonly string _schema;
private readonly IDbConnection _dbConnection;
private readonly IDbTransaction _dbTransaction;
private readonly IDbConnection _dbConnection;
private readonly string _schema;
public SqlServerStorageTransaction(SqlServerStorageConnection connection)
{
......@@ -28,7 +28,8 @@ namespace DotNetCore.CAP.SqlServer
{
if (message == null) throw new ArgumentNullException(nameof(message));
var sql = $"UPDATE [{_schema}].[Published] SET [Retries] = @Retries,[ExpiresAt] = @ExpiresAt,[StatusName]=@StatusName WHERE Id=@Id;";
var sql =
$"UPDATE [{_schema}].[Published] SET [Retries] = @Retries,[ExpiresAt] = @ExpiresAt,[StatusName]=@StatusName WHERE Id=@Id;";
_dbConnection.Execute(sql, message, _dbTransaction);
}
......@@ -36,7 +37,8 @@ namespace DotNetCore.CAP.SqlServer
{
if (message == null) throw new ArgumentNullException(nameof(message));
var sql = $"UPDATE [{_schema}].[Received] SET [Retries] = @Retries,[Content] = @Content,[ExpiresAt] = @ExpiresAt,[StatusName]=@StatusName WHERE Id=@Id;";
var sql =
$"UPDATE [{_schema}].[Received] SET [Retries] = @Retries,[Content] = @Content,[ExpiresAt] = @ExpiresAt,[StatusName]=@StatusName WHERE Id=@Id;";
_dbConnection.Execute(sql, message, _dbTransaction);
}
......@@ -45,7 +47,8 @@ namespace DotNetCore.CAP.SqlServer
if (message == null) throw new ArgumentNullException(nameof(message));
var sql = $"INSERT INTO [{_schema}].[Queue] values(@MessageId,@MessageType);";
_dbConnection.Execute(sql, new CapQueue { MessageId = message.Id, MessageType = MessageType.Publish }, _dbTransaction);
_dbConnection.Execute(sql, new CapQueue {MessageId = message.Id, MessageType = MessageType.Publish},
_dbTransaction);
}
public void EnqueueMessage(CapReceivedMessage message)
......@@ -53,7 +56,8 @@ namespace DotNetCore.CAP.SqlServer
if (message == null) throw new ArgumentNullException(nameof(message));
var sql = $"INSERT INTO [{_schema}].[Queue] values(@MessageId,@MessageType);";
_dbConnection.Execute(sql, new CapQueue { MessageId = message.Id, MessageType = MessageType.Subscribe }, _dbTransaction);
_dbConnection.Execute(sql, new CapQueue {MessageId = message.Id, MessageType = MessageType.Subscribe},
_dbTransaction);
}
public Task CommitAsync()
......
......@@ -10,7 +10,7 @@ namespace DotNetCore.CAP.Abstractions
public abstract class CapPublisherBase : ICapPublisher, IDisposable
{
protected IDbConnection DbConnection { get; set; }
protected IDbTransaction DbTranasaction { get; set; }
protected IDbTransaction DbTransaction { get; set; }
protected bool IsCapOpenedTrans { get; set; }
protected bool IsCapOpenedConn { get; set; }
protected bool IsUsingEF { get; set; }
......@@ -60,13 +60,15 @@ namespace DotNetCore.CAP.Abstractions
protected abstract void PrepareConnectionForEF();
protected abstract void Execute(IDbConnection dbConnection, IDbTransaction dbTransaction, CapPublishedMessage message);
protected abstract void Execute(IDbConnection dbConnection, IDbTransaction dbTransaction,
CapPublishedMessage message);
protected abstract Task ExecuteAsync(IDbConnection dbConnection, IDbTransaction dbTransaction, CapPublishedMessage message);
protected abstract Task ExecuteAsync(IDbConnection dbConnection, IDbTransaction dbTransaction,
CapPublishedMessage message);
protected virtual string Serialize<T>(T obj, string callbackName = null)
{
var serializer = (IContentSerializer)ServiceProvider.GetService(typeof(IContentSerializer));
var serializer = (IContentSerializer) ServiceProvider.GetService(typeof(IContentSerializer));
var message = new CapMessageDto(obj)
{
......@@ -85,11 +87,11 @@ namespace DotNetCore.CAP.Abstractions
IsCapOpenedConn = true;
DbConnection.Open();
}
DbTranasaction = dbTransaction;
if (DbTranasaction == null)
DbTransaction = dbTransaction;
if (DbTransaction == null)
{
IsCapOpenedTrans = true;
DbTranasaction = dbConnection.BeginTransaction(IsolationLevel.ReadCommitted);
DbTransaction = dbConnection.BeginTransaction(IsolationLevel.ReadCommitted);
}
}
......@@ -97,7 +99,8 @@ namespace DotNetCore.CAP.Abstractions
{
if (name == null) throw new ArgumentNullException(nameof(name));
if (!IsUsingEF)
throw new InvalidOperationException("If you are using the EntityFramework, you need to configure the DbContextType first." +
throw new InvalidOperationException(
"If you are using the EntityFramework, you need to configure the DbContextType first." +
" otherwise you need to use overloaded method with IDbConnection and IDbTransaction.");
}
......@@ -105,7 +108,8 @@ namespace DotNetCore.CAP.Abstractions
{
if (name == null) throw new ArgumentNullException(nameof(name));
if (IsUsingEF)
throw new InvalidOperationException("If you are using the EntityFramework, you do not need to use this overloaded.");
throw new InvalidOperationException(
"If you are using the EntityFramework, you do not need to use this overloaded.");
}
private async Task PublishWithTransAsync(string name, string content)
......@@ -117,7 +121,7 @@ namespace DotNetCore.CAP.Abstractions
StatusName = StatusName.Scheduled
};
await ExecuteAsync(DbConnection, DbTranasaction, message);
await ExecuteAsync(DbConnection, DbTransaction, message);
ClosedCap();
......@@ -133,7 +137,7 @@ namespace DotNetCore.CAP.Abstractions
StatusName = StatusName.Scheduled
};
Execute(DbConnection, DbTranasaction, message);
Execute(DbConnection, DbTransaction, message);
ClosedCap();
......@@ -144,18 +148,16 @@ namespace DotNetCore.CAP.Abstractions
{
if (IsCapOpenedTrans)
{
DbTranasaction.Commit();
DbTranasaction.Dispose();
DbTransaction.Commit();
DbTransaction.Dispose();
}
if (IsCapOpenedConn)
{
DbConnection.Dispose();
}
}
public void Dispose()
{
DbTranasaction?.Dispose();
DbTransaction?.Dispose();
DbConnection?.Dispose();
}
......
......@@ -3,15 +3,15 @@
namespace DotNetCore.CAP.Abstractions
{
/// <summary>
/// A context for consumers, it used to be provider wapper of method description and received message.
/// A context for consumers, it used to be provider wrapper of method description and received message.
/// </summary>
public class ConsumerContext
{
/// <summary>
/// create a new instance of <see cref="ConsumerContext"/> .
/// create a new instance of <see cref="ConsumerContext" /> .
/// </summary>
/// <param name="descriptor">consumer method descriptor. </param>
/// <param name="message"> reveied message.</param>
/// <param name="message"> received message.</param>
public ConsumerContext(ConsumerExecutorDescriptor descriptor, MessageContext message)
{
ConsumerDescriptor = descriptor ?? throw new ArgumentNullException(nameof(descriptor));
......@@ -24,7 +24,7 @@ namespace DotNetCore.CAP.Abstractions
public ConsumerExecutorDescriptor ConsumerDescriptor { get; set; }
/// <summary>
/// consumer reveived message.
/// consumer received message.
/// </summary>
public MessageContext DeliverMessage { get; set; }
}
......
......@@ -3,22 +3,22 @@
namespace DotNetCore.CAP.Abstractions
{
/// <summary>
/// Defines an interface for selecting an cosumer service method to invoke for the current message.
/// Defines an interface for selecting an consumer service method to invoke for the current message.
/// </summary>
public interface IConsumerServiceSelector
{
/// <summary>
/// Selects a set of <see cref="ConsumerExecutorDescriptor"/> candidates for the current message associated with
/// Selects a set of <see cref="ConsumerExecutorDescriptor" /> candidates for the current message associated with
/// </summary>
/// <returns>A set of <see cref="ConsumerExecutorDescriptor"/> candidates or <c>null</c>.</returns>
/// <returns>A set of <see cref="ConsumerExecutorDescriptor" /> candidates or <c>null</c>.</returns>
IReadOnlyList<ConsumerExecutorDescriptor> SelectCandidates();
/// <summary>
/// Selects the best <see cref="ConsumerExecutorDescriptor"/> candidate from <paramref name="candidates"/> for the
/// Selects the best <see cref="ConsumerExecutorDescriptor" /> candidate from <paramref name="candidates" /> for the
/// current message associated.
/// </summary>
/// <param name="key">topic or exchange router key.</param>
/// <param name="candidates">the set of <see cref="ConsumerExecutorDescriptor"/> candidates.</param>
/// <param name="candidates">the set of <see cref="ConsumerExecutorDescriptor" /> candidates.</param>
/// <returns></returns>
ConsumerExecutorDescriptor
SelectBestCandidate(string key, IReadOnlyList<ConsumerExecutorDescriptor> candidates);
......
......@@ -8,22 +8,22 @@ namespace DotNetCore.CAP.Abstractions.ModelBinding
public struct ModelBindingResult
{
/// <summary>
/// Creates a <see cref="ModelBindingResult"/> representing a failed model binding operation.
/// Creates a <see cref="ModelBindingResult" /> representing a failed model binding operation.
/// </summary>
/// <returns>A <see cref="ModelBindingResult"/> representing a failed model binding operation.</returns>
/// <returns>A <see cref="ModelBindingResult" /> representing a failed model binding operation.</returns>
public static ModelBindingResult Failed()
{
return new ModelBindingResult(model: null, isSuccess: false);
return new ModelBindingResult(null, false);
}
/// <summary>
/// Creates a <see cref="ModelBindingResult"/> representing a successful model binding operation.
/// Creates a <see cref="ModelBindingResult" /> representing a successful model binding operation.
/// </summary>
/// <param name="model">The model value. May be <c>null.</c></param>
/// <returns>A <see cref="ModelBindingResult"/> representing a successful model bind.</returns>
/// <returns>A <see cref="ModelBindingResult" /> representing a successful model bind.</returns>
public static ModelBindingResult Success(object model)
{
return new ModelBindingResult(model, isSuccess: true);
return new ModelBindingResult(model, true);
}
private ModelBindingResult(object model, bool isSuccess)
......@@ -42,27 +42,17 @@ namespace DotNetCore.CAP.Abstractions.ModelBinding
public override string ToString()
{
if (IsSuccess)
{
return $"Success '{Model}'";
}
else
{
return $"Failed";
}
}
public override bool Equals(object obj)
{
var other = obj as ModelBindingResult?;
if (other == null)
{
return false;
}
else
{
return Equals(other.Value);
}
}
public override int GetHashCode()
{
......@@ -81,10 +71,10 @@ namespace DotNetCore.CAP.Abstractions.ModelBinding
}
/// <summary>
/// Compares <see cref="ModelBindingResult"/> objects for equality.
/// Compares <see cref="ModelBindingResult" /> objects for equality.
/// </summary>
/// <param name="x">A <see cref="ModelBindingResult"/>.</param>
/// <param name="y">A <see cref="ModelBindingResult"/>.</param>
/// <param name="x">A <see cref="ModelBindingResult" />.</param>
/// <param name="y">A <see cref="ModelBindingResult" />.</param>
/// <returns><c>true</c> if the objects are equal, otherwise <c>false</c>.</returns>
public static bool operator ==(ModelBindingResult x, ModelBindingResult y)
{
......@@ -92,10 +82,10 @@ namespace DotNetCore.CAP.Abstractions.ModelBinding
}
/// <summary>
/// Compares <see cref="ModelBindingResult"/> objects for inequality.
/// Compares <see cref="ModelBindingResult" /> objects for inequality.
/// </summary>
/// <param name="x">A <see cref="ModelBindingResult"/>.</param>
/// <param name="y">A <see cref="ModelBindingResult"/>.</param>
/// <param name="x">A <see cref="ModelBindingResult" />.</param>
/// <param name="y">A <see cref="ModelBindingResult" />.</param>
/// <returns><c>true</c> if the objects are not equal, otherwise <c>false</c>.</returns>
public static bool operator !=(ModelBindingResult x, ModelBindingResult y)
{
......
......@@ -4,7 +4,7 @@ namespace DotNetCore.CAP.Abstractions
{
/// <inheritdoc />
/// <summary>
/// An abstract attribute that for kafka attribute or rabbitmq attribute
/// An abstract attribute that for kafka attribute or rabbit mq attribute
/// </summary>
[AttributeUsage(AttributeTargets.Method | AttributeTargets.Class, AllowMultiple = true)]
public abstract class TopicAttribute : Attribute
......@@ -20,8 +20,8 @@ namespace DotNetCore.CAP.Abstractions
public string Name { get; }
/// <summary>
/// kafak --> groups.id
/// rabbitmq --> queue.name
/// kafka --> groups.id
/// rabbit MQ --> queue.name
/// </summary>
public string Group { get; set; } = "cap.default.group";
}
......
......@@ -7,21 +7,19 @@ using Microsoft.Extensions.DependencyInjection;
namespace Microsoft.AspNetCore.Builder
{
/// <summary>
/// app extensions for <see cref="IApplicationBuilder"/>
/// app extensions for <see cref="IApplicationBuilder" />
/// </summary>
public static class AppBuilderExtensions
{
///<summary>
/// <summary>
/// Enables cap for the current application
/// </summary>
/// <param name="app">The <see cref="IApplicationBuilder"/> instance this method extends.</param>
/// <returns>The <see cref="IApplicationBuilder"/> instance this method extends.</returns>
/// <param name="app">The <see cref="IApplicationBuilder" /> instance this method extends.</param>
/// <returns>The <see cref="IApplicationBuilder" /> instance this method extends.</returns>
public static IApplicationBuilder UseCap(this IApplicationBuilder app)
{
if (app == null)
{
throw new ArgumentNullException(nameof(app));
}
CheckRequirement(app);
......@@ -33,9 +31,7 @@ namespace Microsoft.AspNetCore.Builder
if (provider.GetService<DashboardOptions>() != null)
{
if (provider.GetService<DiscoveryOptions>() != null)
{
app.UseMiddleware<GatewayProxyMiddleware>();
}
app.UseMiddleware<DashboardMiddleware>();
}
......@@ -46,21 +42,18 @@ namespace Microsoft.AspNetCore.Builder
{
var marker = app.ApplicationServices.GetService<CapMarkerService>();
if (marker == null)
{
throw new InvalidOperationException("AddCap() must be called on the service collection. eg: services.AddCap(...)");
}
throw new InvalidOperationException(
"AddCap() must be called on the service collection. eg: services.AddCap(...)");
var messageQueuemarker = app.ApplicationServices.GetService<CapMessageQueueMakerService>();
if (messageQueuemarker == null)
{
throw new InvalidOperationException("You must be config used message queue provider at AddCap() options! eg: services.AddCap(options=>{ options.UseKafka(...) })");
}
var messageQueueMarker = app.ApplicationServices.GetService<CapMessageQueueMakerService>();
if (messageQueueMarker == null)
throw new InvalidOperationException(
"You must be config used message queue provider at AddCap() options! eg: services.AddCap(options=>{ options.UseKafka(...) })");
var databaseMarker = app.ApplicationServices.GetService<CapDatabaseStorageMarkerService>();
if (databaseMarker == null)
{
throw new InvalidOperationException("You must be config used database provider at AddCap() options! eg: services.AddCap(options=>{ options.UseSqlServer(...) })");
}
throw new InvalidOperationException(
"You must be config used database provider at AddCap() options! eg: services.AddCap(options=>{ options.UseSqlServer(...) })");
}
}
}
\ No newline at end of file
......@@ -15,7 +15,6 @@ namespace DotNetCore.CAP
/// </summary>
public class CapDatabaseStorageMarkerService
{
}
/// <summary>
......@@ -23,7 +22,6 @@ namespace DotNetCore.CAP
/// </summary>
public class CapMessageQueueMakerService
{
}
/// <summary>
......@@ -37,7 +35,7 @@ namespace DotNetCore.CAP
}
/// <summary>
/// Gets the <see cref="IServiceCollection"/> where MVC services are configured.
/// Gets the <see cref="IServiceCollection" /> where MVC services are configured.
/// </summary>
public IServiceCollection Services { get; }
......@@ -51,7 +49,7 @@ namespace DotNetCore.CAP
}
/// <summary>
/// Add an <see cref="ICapPublisher"/>.
/// Add an <see cref="ICapPublisher" />.
/// </summary>
/// <typeparam name="T">The type of the service.</typeparam>
public virtual CapBuilder AddProducerService<T>()
......
using System;
using System.Collections.Generic;
using DotNetCore.CAP.Models;
namespace DotNetCore.CAP
{
......@@ -8,8 +9,6 @@ namespace DotNetCore.CAP
/// </summary>
public class CapOptions
{
internal IList<ICapOptionsExtension> Extensions { get; }
/// <summary>
/// Default value for polling delay timeout, in seconds.
/// </summary>
......@@ -21,7 +20,7 @@ namespace DotNetCore.CAP
public const int DefaultQueueProcessorCount = 2;
/// <summary>
/// Default succeeded message expriation timespan, in seconds.
/// Default succeeded message expiration time span, in seconds.
/// </summary>
public const int DefaultSucceedMessageExpirationAfter = 24 * 3600;
......@@ -39,8 +38,10 @@ namespace DotNetCore.CAP
Extensions = new List<ICapOptionsExtension>();
}
internal IList<ICapOptionsExtension> Extensions { get; }
/// <summary>
/// Productor job polling delay time.
/// Producer job polling delay time.
/// Default is 15 sec.
/// </summary>
public int PollingDelay { get; set; }
......@@ -53,7 +54,7 @@ namespace DotNetCore.CAP
/// <summary>
/// Sent or received succeed message after timespan of due, then the message will be deleted at due time.
/// Dafault is 24*3600 seconds.
/// Default is 24*3600 seconds.
/// </summary>
public int SucceedMessageExpiredAfter { get; set; }
......@@ -66,7 +67,7 @@ namespace DotNetCore.CAP
/// <summary>
/// We’ll invoke this call-back with message type,name,content when requeue failed message.
/// </summary>
public Action<Models.MessageType, string, string> FailedCallback { get; set; }
public Action<MessageType, string, string> FailedCallback { get; set; }
/// <summary>
/// Registers an extension that will be executed when building services.
......
......@@ -11,16 +11,16 @@ using Microsoft.Extensions.DependencyInjection.Extensions;
namespace Microsoft.Extensions.DependencyInjection
{
/// <summary>
/// Contains extension methods to <see cref="IServiceCollection"/> for configuring consistence services.
/// Contains extension methods to <see cref="IServiceCollection" /> for configuring consistence services.
/// </summary>
public static class ServiceCollectionExtensions
{
/// <summary>
/// Adds and configures the consistence services for the consitence.
/// Adds and configures the consistence services for the consistency.
/// </summary>
/// <param name="services">The services available in the application.</param>
/// <param name="setupAction">An action to configure the <see cref="CapOptions"/>.</param>
/// <returns>An <see cref="CapBuilder"/> for application services.</returns>
/// <param name="setupAction">An action to configure the <see cref="CapOptions" />.</param>
/// <returns>An <see cref="CapBuilder" /> for application services.</returns>
public static CapBuilder AddCap(
this IServiceCollection services,
Action<CapOptions> setupAction)
......@@ -51,15 +51,13 @@ namespace Microsoft.Extensions.DependencyInjection
//Executors
services.AddSingleton<IQueueExecutorFactory, QueueExecutorFactory>();
services.AddSingleton<IQueueExecutor, SubscibeQueueExecutor>();
services.AddSingleton<IQueueExecutor, SubscribeQueueExecutor>();
//Options and extension service
var options = new CapOptions();
setupAction(options);
foreach (var serviceExtension in options.Extensions)
{
serviceExtension.AddServices(services);
}
services.AddSingleton(options);
return new CapBuilder(services);
......@@ -69,19 +67,13 @@ namespace Microsoft.Extensions.DependencyInjection
{
var consumerListenerServices = new List<KeyValuePair<Type, Type>>();
foreach (var rejectedServices in services)
{
if (rejectedServices.ImplementationType != null
&& typeof(ICapSubscribe).IsAssignableFrom(rejectedServices.ImplementationType))
{
consumerListenerServices.Add(new KeyValuePair<Type, Type>(typeof(ICapSubscribe),
rejectedServices.ImplementationType));
}
}
foreach (var service in consumerListenerServices)
{
services.AddTransient(service.Key, service.Value);
}
}
}
}
\ No newline at end of file
This diff is collapsed.
......@@ -28,7 +28,7 @@ namespace DotNetCore.CAP.Dashboard
_command(context, id);
}
context.Response.StatusCode = (int)HttpStatusCode.NoContent;
context.Response.StatusCode = (int) HttpStatusCode.NoContent;
}
}
}
\ No newline at end of file
......@@ -10,12 +10,13 @@ namespace DotNetCore.CAP
{
public class DashboardMiddleware
{
private readonly DashboardOptions _options;
private readonly RequestDelegate _next;
private readonly IStorage _storage;
private readonly DashboardOptions _options;
private readonly RouteCollection _routes;
private readonly IStorage _storage;
public DashboardMiddleware(RequestDelegate next, DashboardOptions options, IStorage storage, RouteCollection routes)
public DashboardMiddleware(RequestDelegate next, DashboardOptions options, IStorage storage,
RouteCollection routes)
{
_next = next ?? throw new ArgumentNullException(nameof(next));
_options = options ?? throw new ArgumentNullException(nameof(options));
......@@ -40,17 +41,15 @@ namespace DotNetCore.CAP
var findResult = _routes.FindDispatcher(context.Request.Path.Value);
if (findResult == null)
{
return _next.Invoke(context);
}
if (_options.Authorization.Any(filter => !filter.Authorize(dashboardContext)))
{
var isAuthenticated = context.User?.Identity?.IsAuthenticated;
context.Response.StatusCode = isAuthenticated == true
? (int)HttpStatusCode.Forbidden
: (int)HttpStatusCode.Unauthorized;
? (int) HttpStatusCode.Forbidden
: (int) HttpStatusCode.Unauthorized;
return Task.CompletedTask;
}
......
......@@ -10,7 +10,7 @@ namespace DotNetCore.CAP
{
AppPath = "/";
PathMatch = "/cap";
Authorization = new[] { new LocalRequestsOnlyAuthorizationFilter() };
Authorization = new[] {new LocalRequestsOnlyAuthorizationFilter()};
StatsPollingInterval = 2000;
}
......@@ -28,5 +28,4 @@ namespace DotNetCore.CAP
/// </summary>
public int StatsPollingInterval { get; set; }
}
}
\ No newline at end of file
using System;
using DotNetCore.CAP;
using DotNetCore.CAP.Dashboard;
using DotNetCore.CAP.Dashboard.GatewayProxy;
using DotNetCore.CAP.Dashboard.GatewayProxy.Requester;
using Microsoft.Extensions.DependencyInjection;
namespace DotNetCore.CAP
{
using Dashboard;
using Dashboard.GatewayProxy;
using Dashboard.GatewayProxy.Requester;
using Microsoft.Extensions.DependencyInjection;
internal sealed class DashboardOptionsExtension : ICapOptionsExtension
{
private readonly Action<DashboardOptions> _options;
......@@ -31,13 +31,11 @@ namespace DotNetCore.CAP
namespace Microsoft.Extensions.DependencyInjection
{
using DotNetCore.CAP;
public static class CapOptionsExtensions
{
public static CapOptions UseDashboard(this CapOptions capOptions)
{
return capOptions.UseDashboard(opt => {});
return capOptions.UseDashboard(opt => { });
}
public static CapOptions UseDashboard(this CapOptions capOptions, Action<DashboardOptions> options)
......
......@@ -22,12 +22,10 @@ namespace DotNetCore.CAP.Dashboard
protected override void WriteResponse(DashboardResponse response)
{
foreach (var resourceName in _resourceNames)
{
WriteResource(
response,
_assembly,
$"{_baseNamespace}.{resourceName}");
}
}
}
}
\ No newline at end of file
......@@ -20,18 +20,14 @@ namespace DotNetCore.CAP.Dashboard
if (!"POST".Equals(request.Method, StringComparison.OrdinalIgnoreCase))
{
response.StatusCode = (int)HttpStatusCode.MethodNotAllowed;
response.StatusCode = (int) HttpStatusCode.MethodNotAllowed;
return Task.FromResult(false);
}
if (_command(context))
{
response.StatusCode = (int)HttpStatusCode.NoContent;
}
response.StatusCode = (int) HttpStatusCode.NoContent;
else
{
response.StatusCode = 422;
}
return Task.FromResult(true);
}
......
......@@ -11,42 +11,6 @@ namespace DotNetCore.CAP.Dashboard
{
private static readonly Dictionary<string, DashboardMetric> Metrics = new Dictionary<string, DashboardMetric>();
static DashboardMetrics()
{
AddMetric(ServerCount);
AddMetric(SubscriberCount);
AddMetric(PublishedFailedCountOrNull);
AddMetric(ReceivedFailedCountOrNull);
AddMetric(PublishedProcessingCount);
AddMetric(ReceivedProcessingCount);
AddMetric(PublishedSucceededCount);
AddMetric(ReceivedSucceededCount);
AddMetric(PublishedFailedCount);
AddMetric(ReceivedFailedCount);
}
public static void AddMetric(DashboardMetric metric)
{
if (metric == null) throw new ArgumentNullException(nameof(metric));
lock (Metrics)
{
Metrics[metric.Name] = metric;
}
}
public static IEnumerable<DashboardMetric> GetMetrics()
{
lock (Metrics)
{
return Metrics.Values.ToList();
}
}
public static readonly DashboardMetric ServerCount = new DashboardMetric(
"servers:count",
"Metrics_Servers",
......@@ -156,5 +120,41 @@ namespace DotNetCore.CAP.Dashboard
Style = page.Statistics.ReceivedFailed > 0 ? MetricStyle.Danger : MetricStyle.Default,
Highlighted = page.Statistics.ReceivedFailed > 0
});
static DashboardMetrics()
{
AddMetric(ServerCount);
AddMetric(SubscriberCount);
AddMetric(PublishedFailedCountOrNull);
AddMetric(ReceivedFailedCountOrNull);
AddMetric(PublishedProcessingCount);
AddMetric(ReceivedProcessingCount);
AddMetric(PublishedSucceededCount);
AddMetric(ReceivedSucceededCount);
AddMetric(PublishedFailedCount);
AddMetric(ReceivedFailedCount);
}
public static void AddMetric(DashboardMetric metric)
{
if (metric == null) throw new ArgumentNullException(nameof(metric));
lock (Metrics)
{
Metrics[metric.Name] = metric;
}
}
public static IEnumerable<DashboardMetric> GetMetrics()
{
lock (Metrics)
{
return Metrics.Values.ToList();
}
}
}
}
\ No newline at end of file
......@@ -35,7 +35,10 @@ namespace DotNetCore.CAP.Dashboard
public override string LocalIpAddress => _context.Connection.LocalIpAddress.ToString();
public override string RemoteIpAddress => _context.Connection.RemoteIpAddress.ToString();
public override string GetQuery(string key) => _context.Request.Query[key];
public override string GetQuery(string key)
{
return _context.Request.Query[key];
}
public override async Task<IList<string>> GetFormValuesAsync(string key)
{
......
......@@ -7,8 +7,8 @@ namespace DotNetCore.CAP.Dashboard
internal class EmbeddedResourceDispatcher : IDashboardDispatcher
{
private readonly Assembly _assembly;
private readonly string _resourceName;
private readonly string _contentType;
private readonly string _resourceName;
public EmbeddedResourceDispatcher(
string contentType,
......@@ -47,9 +47,8 @@ namespace DotNetCore.CAP.Dashboard
using (var inputStream = assembly.GetManifestResourceStream(resourceName))
{
if (inputStream == null)
{
throw new ArgumentException($@"Resource with name {resourceName} not found in assembly {assembly}.");
}
throw new ArgumentException(
$@"Resource with name {resourceName} not found in assembly {assembly}.");
inputStream.CopyTo(response.Body);
}
......
......@@ -16,16 +16,14 @@ namespace DotNetCore.CAP.Dashboard.GatewayProxy
public class GatewayProxyMiddleware
{
public const string NodeCookieName = "cap.node";
private readonly ILogger _logger;
private readonly RequestDelegate _next;
private readonly ILogger _logger;
private readonly IRequestMapper _requestMapper;
private readonly IHttpRequester _requester;
private readonly IRequestMapper _requestMapper;
private INodeDiscoveryProvider _discoveryProvider;
protected HttpRequestMessage DownstreamRequest { get; set; }
public GatewayProxyMiddleware(RequestDelegate next,
ILoggerFactory loggerFactory,
IRequestMapper requestMapper,
......@@ -37,6 +35,8 @@ namespace DotNetCore.CAP.Dashboard.GatewayProxy
_requester = requester;
}
protected HttpRequestMessage DownstreamRequest { get; set; }
public async Task Invoke(HttpContext context,
DiscoveryOptions discoveryOptions,
INodeDiscoveryProvider discoveryProvider)
......@@ -53,7 +53,7 @@ namespace DotNetCore.CAP.Dashboard.GatewayProxy
else
{
//For performance reasons, we need to put this functionality in the else
var isSwitchNode = request.Cookies.TryGetValue(NodeCookieName, out string requestNodeId);
var isSwitchNode = request.Cookies.TryGetValue(NodeCookieName, out var requestNodeId);
var isCurrentNode = discoveryOptions.NodeId.ToString() == requestNodeId;
var isNodesPage = request.Path.StartsWithSegments(new PathString(pathMatch + "/nodes"));
......@@ -65,7 +65,7 @@ namespace DotNetCore.CAP.Dashboard.GatewayProxy
{
_logger.LogDebug("started calling gateway proxy middleware");
if (TryGetRemoteNode(requestNodeId, out Node node))
if (TryGetRemoteNode(requestNodeId, out var node))
{
try
{
......@@ -94,33 +94,28 @@ namespace DotNetCore.CAP.Dashboard.GatewayProxy
public async Task SetResponseOnHttpContext(HttpContext context, HttpResponseMessage response)
{
foreach (var httpResponseHeader in response.Content.Headers)
{
AddHeaderIfDoesntExist(context, httpResponseHeader);
}
var content = await response.Content.ReadAsByteArrayAsync();
AddHeaderIfDoesntExist(context,
new KeyValuePair<string, IEnumerable<string>>("Content-Length", new[] { content.Length.ToString() }));
new KeyValuePair<string, IEnumerable<string>>("Content-Length", new[] {content.Length.ToString()}));
context.Response.OnStarting(state =>
{
var httpContext = (HttpContext)state;
var httpContext = (HttpContext) state;
httpContext.Response.StatusCode = (int)response.StatusCode;
httpContext.Response.StatusCode = (int) response.StatusCode;
return Task.CompletedTask;
}, context);
using (Stream stream = new MemoryStream(content))
{
if (response.StatusCode != HttpStatusCode.NotModified)
{
await stream.CopyToAsync(context.Response.Body);
}
}
}
private bool TryGetRemoteNode(string requestNodeId, out Node node)
{
......@@ -139,10 +134,8 @@ namespace DotNetCore.CAP.Dashboard.GatewayProxy
KeyValuePair<string, IEnumerable<string>> httpResponseHeader)
{
if (!context.Response.Headers.ContainsKey(httpResponseHeader.Key))
{
context.Response.Headers.Add(httpResponseHeader.Key,
new StringValues(httpResponseHeader.Value.ToArray()));
}
}
}
}
\ No newline at end of file
......@@ -12,14 +12,14 @@ namespace DotNetCore.CAP.Dashboard.GatewayProxy
{
public class RequestMapper : IRequestMapper
{
private readonly string[] _unsupportedHeaders = { "host", "cookie" };
private const string SchemeDelimiter = "://";
private readonly string[] _unsupportedHeaders = {"host", "cookie"};
public async Task<HttpRequestMessage> Map(HttpRequest request)
{
try
{
var requestMessage = new HttpRequestMessage()
var requestMessage = new HttpRequestMessage
{
Content = await MapContent(request),
Method = MapMethod(request),
......@@ -45,11 +45,9 @@ namespace DotNetCore.CAP.Dashboard.GatewayProxy
FragmentString fragment = new FragmentString())
{
if (scheme == null)
{
throw new ArgumentNullException(nameof(scheme));
}
var combinedPath = (pathBase.HasValue || path.HasValue) ? (pathBase + path).ToString() : "/";
var combinedPath = pathBase.HasValue || path.HasValue ? (pathBase + path).ToString() : "/";
var encodedHost = host.ToString();
var encodedQuery = query.ToString();
......@@ -77,13 +75,11 @@ namespace DotNetCore.CAP.Dashboard.GatewayProxy
private async Task<HttpContent> MapContent(HttpRequest request)
{
if (request.Body == null)
{
return null;
}
var content = new ByteArrayContent(await ToByteArray(request.Body));
content.Headers.TryAddWithoutValidation("Content-Type", new[] { request.ContentType });
content.Headers.TryAddWithoutValidation("Content-Type", new[] {request.ContentType});
return content;
}
......@@ -101,13 +97,9 @@ namespace DotNetCore.CAP.Dashboard.GatewayProxy
private void MapHeaders(HttpRequest request, HttpRequestMessage requestMessage)
{
foreach (var header in request.Headers)
{
if (IsSupportedHeader(header))
{
requestMessage.Headers.TryAddWithoutValidation(header.Key, header.Value.ToArray());
}
}
}
private async Task<byte[]> ToByteArray(Stream stream)
{
......
namespace DotNetCore.CAP.Dashboard.GatewayProxy
{
using System.Net.Http;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Http;
using System.Net.Http;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Http;
namespace DotNetCore.CAP.Dashboard.GatewayProxy
{
public interface IRequestMapper
{
Task<HttpRequestMessage> Map(HttpRequest request);
......
......@@ -8,7 +8,8 @@ namespace DotNetCore.CAP.Dashboard.GatewayProxy.Requester
{
internal class HttpClientBuilder : IHttpClientBuilder
{
private readonly Dictionary<int, Func<DelegatingHandler>> _handlers = new Dictionary<int, Func<DelegatingHandler>>();
private readonly Dictionary<int, Func<DelegatingHandler>> _handlers =
new Dictionary<int, Func<DelegatingHandler>>();
public IHttpClient Create()
{
......@@ -41,13 +42,13 @@ namespace DotNetCore.CAP.Dashboard.GatewayProxy.Requester
/// </summary>
internal class HttpClientWrapper : IHttpClient
{
public HttpClient Client { get; }
public HttpClientWrapper(HttpClient client)
{
Client = client;
}
public HttpClient Client { get; }
public Task<HttpResponseMessage> SendAsync(HttpRequestMessage request)
{
return Client.SendAsync(request);
......
......@@ -44,15 +44,13 @@ namespace DotNetCore.CAP.Dashboard.GatewayProxy.Requester
var httpClient = _cacheHandlers.Get(cacheKey);
if (httpClient == null)
{
httpClient = builder.Create();
}
return httpClient;
}
private string GetCacheKey(HttpRequestMessage request, IHttpClientBuilder builder)
{
string baseUrl = $"{request.RequestUri.Scheme}://{request.RequestUri.Authority}";
var baseUrl = $"{request.RequestUri.Scheme}://{request.RequestUri.Authority}";
return baseUrl;
}
......
......@@ -5,7 +5,7 @@ namespace DotNetCore.CAP.Dashboard.GatewayProxy.Requester
public interface IHttpClientBuilder
{
/// <summary>
/// Creates the <see cref="HttpClient"/>
/// Creates the <see cref="HttpClient" />
/// </summary>
IHttpClient Create();
}
......
......@@ -5,7 +5,8 @@ namespace DotNetCore.CAP.Dashboard.GatewayProxy.Requester
{
public class MemoryHttpClientCache : IHttpClientCache
{
private readonly ConcurrentDictionary<string, ConcurrentQueue<IHttpClient>> _httpClientsCache = new ConcurrentDictionary<string, ConcurrentQueue<IHttpClient>>();
private readonly ConcurrentDictionary<string, ConcurrentQueue<IHttpClient>> _httpClientsCache =
new ConcurrentDictionary<string, ConcurrentQueue<IHttpClient>>();
public void Set(string id, IHttpClient client, TimeSpan expirationTime)
{
......@@ -30,9 +31,7 @@ namespace DotNetCore.CAP.Dashboard.GatewayProxy.Requester
{
IHttpClient client = null;
if (_httpClientsCache.TryGetValue(id, out var connectionQueue))
{
connectionQueue.TryDequeue(out client);
}
return client;
}
......
......@@ -31,9 +31,7 @@ namespace DotNetCore.CAP.Dashboard
public NonEscapedString MessagesSidebar(MessageType type)
{
if (type == MessageType.Publish)
{
return SidebarMenu(MessagesSidebarMenu.PublishedItems);
}
return SidebarMenu(MessagesSidebarMenu.ReceivedItems);
}
......@@ -80,12 +78,11 @@ namespace DotNetCore.CAP.Dashboard
public NonEscapedString StateLabel(string stateName)
{
if (String.IsNullOrWhiteSpace(stateName))
{
if (string.IsNullOrWhiteSpace(stateName))
return Raw($"<em>{Strings.Common_NoState}</em>");
}
return Raw($"<span class=\"label label-default\" style=\"background-color: {MessageHistoryRenderer.GetForegroundStateColor(stateName)};\">{stateName}</span>");
return Raw(
$"<span class=\"label label-default\" style=\"background-color: {MessageHistoryRenderer.GetForegroundStateColor(stateName)};\">{stateName}</span>");
}
public NonEscapedString RelativeTime(DateTime value)
......@@ -109,52 +106,36 @@ namespace DotNetCore.CAP.Dashboard
var builder = new StringBuilder();
if (displaySign)
{
builder.Append(duration.Value.TotalMilliseconds < 0 ? "-" : "+");
}
duration = duration.Value.Duration();
if (duration.Value.Days > 0)
{
builder.Append($"{duration.Value.Days}d ");
}
if (duration.Value.Hours > 0)
{
builder.Append($"{duration.Value.Hours}h ");
}
if (duration.Value.Minutes > 0)
{
builder.Append($"{duration.Value.Minutes}m ");
}
if (duration.Value.TotalHours < 1)
{
if (duration.Value.Seconds > 0)
{
builder.Append(duration.Value.Seconds);
if (duration.Value.Milliseconds > 0)
{
builder.Append($".{duration.Value.Milliseconds.ToString().PadLeft(3, '0')}");
}
builder.Append("s ");
}
else
{
if (duration.Value.Milliseconds > 0)
{
builder.Append($"{duration.Value.Milliseconds}ms ");
}
}
}
if (builder.Length <= 1)
{
builder.Append(" <1ms ");
}
builder.Remove(builder.Length - 1, 1);
......@@ -163,7 +144,7 @@ namespace DotNetCore.CAP.Dashboard
public string FormatProperties(IDictionary<string, string> properties)
{
return String.Join(", ", properties.Select(x => $"{x.Key}: \"{x.Value}\""));
return string.Join(", ", properties.Select(x => $"{x.Key}: \"{x.Value}\""));
}
public NonEscapedString QueueLabel(string queue)
......@@ -179,7 +160,7 @@ namespace DotNetCore.CAP.Dashboard
{
var parts = serverId.Split(':');
var shortenedId = parts.Length > 1
? String.Join(":", parts.Take(parts.Length - 1))
? string.Join(":", parts.Take(parts.Length - 1))
: serverId;
return new NonEscapedString(
......@@ -188,20 +169,40 @@ namespace DotNetCore.CAP.Dashboard
public NonEscapedString NodeSwitchLink(string id)
{
return Raw($"<a class=\"job-method\" onclick=\"nodeSwitch({id});\" href=\"javascript:;\">{Strings.NodePage_Switch}</a>");
return Raw(
$"<a class=\"job-method\" onclick=\"nodeSwitch({id});\" href=\"javascript:;\">{Strings.NodePage_Switch}</a>");
}
public NonEscapedString StackTrace(string stackTrace)
{
try
{
//return new NonEscapedString(StackTraceFormatter.FormatHtml(stackTrace, StackTraceHtmlFragments));
return new NonEscapedString(stackTrace);
}
catch (RegexMatchTimeoutException)
{
return new NonEscapedString(HtmlEncode(stackTrace));
}
}
public string HtmlEncode(string text)
{
return WebUtility.HtmlEncode(text);
}
#region MethodEscaped
public NonEscapedString MethodEscaped(MethodInfo method)
{
var @public = WrapKeyword("public");
var @async = string.Empty;
var async = string.Empty;
string @return;
var isAwaitable = CoercedAwaitableInfo.IsTypeAwaitable(method.ReturnType, out var coercedAwaitableInfo);
if (isAwaitable)
{
@async = WrapKeyword("async");
async = WrapKeyword("async");
var asyncResultType = coercedAwaitableInfo.AwaitableInfo.ResultType;
@return = WrapType("Task") + WrapIdentifier("<") + WrapType(asyncResultType) + WrapIdentifier(">");
......@@ -211,7 +212,7 @@ namespace DotNetCore.CAP.Dashboard
@return = WrapType(method.ReturnType);
}
var @name = method.Name;
var name = method.Name;
string paramType = null;
string paramName = null;
......@@ -227,7 +228,8 @@ namespace DotNetCore.CAP.Dashboard
var paramString = paramType == null ? "();" : $"({paramType} {paramName});";
var outputString = @public + " " + (string.IsNullOrEmpty(@async) ? "" : @async + " ") + @return + " " + @name + paramString;
var outputString = @public + " " + (string.IsNullOrEmpty(async) ? "" : async + " ") + @return + " " + name +
paramString;
return new NonEscapedString(outputString);
}
......@@ -235,27 +237,16 @@ namespace DotNetCore.CAP.Dashboard
private string WrapType(Type type)
{
if (type == null)
{
return string.Empty;
}
if (type.Name == "Void")
{
return WrapKeyword(type.Name.ToLower());
}
if (Helper.IsComplexType(type))
{
return WrapType(type.Name);
}
if (type.IsPrimitive || type == typeof(string) || type == typeof(decimal))
{
return WrapKeyword(type.Name.ToLower());
}
else
{
return WrapType(type.Name);
}
}
private string WrapIdentifier(string value)
{
......@@ -276,24 +267,7 @@ namespace DotNetCore.CAP.Dashboard
{
return $"<span class=\"{@class}\">{value}</span>";
}
#endregion
public NonEscapedString StackTrace(string stackTrace)
{
try
{
//return new NonEscapedString(StackTraceFormatter.FormatHtml(stackTrace, StackTraceHtmlFragments));
return new NonEscapedString(stackTrace);
}
catch (RegexMatchTimeoutException)
{
return new NonEscapedString(HtmlEncode(stackTrace));
}
}
public string HtmlEncode(string text)
{
return WebUtility.HtmlEncode(text);
}
#endregion
}
}
\ No newline at end of file
......@@ -26,20 +26,18 @@ namespace DotNetCore.CAP.Dashboard
string serialized = null;
if (_command != null)
{
object result = _command(context);
var result = _command(context);
var settings = new JsonSerializerSettings
{
ContractResolver = new CamelCasePropertyNamesContractResolver(),
Converters = new JsonConverter[] { new StringEnumConverter { CamelCaseText = true } }
Converters = new JsonConverter[] {new StringEnumConverter {CamelCaseText = true}}
};
serialized = JsonConvert.SerializeObject(result, settings);
}
if (_jsonCommand != null)
{
serialized = _jsonCommand(context);
}
context.Response.ContentType = "application/json";
await context.Response.WriteAsync(serialized ?? string.Empty);
......
......@@ -27,7 +27,7 @@ namespace DotNetCore.CAP.Dashboard
var settings = new JsonSerializerSettings
{
ContractResolver = new CamelCasePropertyNamesContractResolver(),
Converters = new JsonConverter[] { new StringEnumConverter { CamelCaseText = true } }
Converters = new JsonConverter[] {new StringEnumConverter {CamelCaseText = true}}
};
var serialized = JsonConvert.SerializeObject(result, settings);
......
using System;
namespace DotNetCore.CAP.Dashboard
namespace DotNetCore.CAP.Dashboard
{
public class LocalRequestsOnlyAuthorizationFilter : IDashboardAuthorizationFilter
{
public bool Authorize(DashboardContext context)
{
// if unknown, assume not local
if (String.IsNullOrEmpty(context.Request.RemoteIpAddress))
if (string.IsNullOrEmpty(context.Request.RemoteIpAddress))
return false;
// check if localhost
......
......@@ -20,12 +20,10 @@ namespace DotNetCore.CAP.Dashboard
public IEnumerable<DashboardMetric> GetAllMetrics()
{
var metrics = new List<DashboardMetric> { Metric };
var metrics = new List<DashboardMetric> {Metric};
if (Metrics != null)
{
metrics.AddRange(Metrics);
}
return metrics.Where(x => x != null).ToList();
}
......
using System;
using System.Collections.Generic;
using System.Diagnostics.CodeAnalysis;
using System.Net;
using System.Text;
using DotNetCore.CAP.Infrastructure;
......@@ -16,7 +18,7 @@ namespace DotNetCore.CAP.Dashboard
private static readonly IDictionary<string, string> ForegroundStateColors
= new Dictionary<string, string>();
[System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Performance", "CA1810:InitializeReferenceTypeStaticFieldsInline")]
[SuppressMessage("Microsoft.Performance", "CA1810:InitializeReferenceTypeStaticFieldsInline")]
static MessageHistoryRenderer()
{
Register(StatusName.Succeeded, SucceededRenderer);
......@@ -44,9 +46,7 @@ namespace DotNetCore.CAP.Dashboard
public static string GetBackgroundStateColor(string stateName)
{
if (stateName == null || !BackgroundStateColors.ContainsKey(stateName))
{
return "inherit";
}
return BackgroundStateColors[stateName];
}
......@@ -59,24 +59,19 @@ namespace DotNetCore.CAP.Dashboard
public static string GetForegroundStateColor(string stateName)
{
if (stateName == null || !ForegroundStateColors.ContainsKey(stateName))
{
return "inherit";
}
return ForegroundStateColors[stateName];
}
public static void Register(string state, Func<HtmlHelper, IDictionary<string, string>, NonEscapedString> renderer)
public static void Register(string state,
Func<HtmlHelper, IDictionary<string, string>, NonEscapedString> renderer)
{
if (!Renderers.ContainsKey(state))
{
Renderers.Add(state, renderer);
}
else
{
Renderers[state] = renderer;
}
}
public static bool Exists(string state)
{
......@@ -141,10 +136,10 @@ namespace DotNetCore.CAP.Dashboard
itemsAdded = true;
}
if (stateData.ContainsKey("Result") && !String.IsNullOrWhiteSpace(stateData["Result"]))
if (stateData.ContainsKey("Result") && !string.IsNullOrWhiteSpace(stateData["Result"]))
{
var result = stateData["Result"];
builder.Append($"<dt>Result:</dt><dd>{System.Net.WebUtility.HtmlEncode(result)}</dd>");
builder.Append($"<dt>Result:</dt><dd>{WebUtility.HtmlEncode(result)}</dd>");
itemsAdded = true;
}
......@@ -171,13 +166,9 @@ namespace DotNetCore.CAP.Dashboard
string serverId = null;
if (stateData.ContainsKey("ServerId"))
{
serverId = stateData["ServerId"];
}
else if (stateData.ContainsKey("ServerName"))
{
serverId = stateData["ServerName"];
}
if (serverId != null)
{
......
......@@ -20,7 +20,8 @@ namespace DotNetCore.CAP.Dashboard
Metric = DashboardMetrics.PublishedSucceededCount
});
PublishedItems.Add(page => new MenuItem(Strings.SidebarMenu_Processing, page.Url.To("/published/processing"))
PublishedItems.Add(page =>
new MenuItem(Strings.SidebarMenu_Processing, page.Url.To("/published/processing"))
{
Active = page.RequestPath.StartsWith("/published/processing"),
Metric = DashboardMetrics.PublishedProcessingCount
......
......@@ -20,7 +20,7 @@
Info,
Success,
Warning,
Danger,
Danger
}
internal static class MetricStyleExtensions
......
......@@ -7,9 +7,9 @@ namespace DotNetCore.CAP.Dashboard
{
private const int PageItemsCount = 7;
private const int DefaultRecordsPerPage = 10;
private int _endPageIndex = 1;
private int _startPageIndex = 1;
private int _endPageIndex = 1;
public Pager(int from, int perPage, long total)
{
......@@ -17,7 +17,7 @@ namespace DotNetCore.CAP.Dashboard
RecordsPerPage = perPage > 0 ? perPage : DefaultRecordsPerPage;
TotalRecordCount = total;
CurrentPage = FromRecord / RecordsPerPage + 1;
TotalPageCount = (int)Math.Ceiling((double)TotalRecordCount / RecordsPerPage);
TotalPageCount = (int) Math.Ceiling((double) TotalRecordCount / RecordsPerPage);
PagerItems = GenerateItems();
}
......@@ -110,7 +110,7 @@ namespace DotNetCore.CAP.Dashboard
if (_endPageIndex < TotalPageCount - 1)
{
var index = _startPageIndex + PageItemsCount;
if (index > TotalPageCount) { index = TotalPageCount; }
if (index > TotalPageCount) index = TotalPageCount;
var item = new Item(index, false, ItemType.MorePage);
results.Add(item);
}
......
@* Generator: Template TypeVisibility: Internal GeneratePrettyNames: True *@
@using System
@using System.Collections.Generic
@using DotNetCore.CAP.Models;
@using DotNetCore.CAP.Dashboard
@using DotNetCore.CAP.Dashboard.Pages
@using DotNetCore.CAP.Dashboard.Resources
@using DotNetCore.CAP.Models
@using Newtonsoft.Json
@inherits RazorPage
@inherits DotNetCore.CAP.Dashboard.RazorPage
@{
Layout = new LayoutPage(Strings.HomePage_Title);
var monitor = Storage.GetMonitoringApi();
IDictionary<DateTime, int> publishedSucceeded = monitor.HourlySucceededJobs(MessageType.Publish);
IDictionary<DateTime, int> publishedFailed = monitor.HourlyFailedJobs(MessageType.Publish);
var publishedSucceeded = monitor.HourlySucceededJobs(MessageType.Publish);
var publishedFailed = monitor.HourlyFailedJobs(MessageType.Publish);
IDictionary<DateTime, int> receivedSucceeded = monitor.HourlySucceededJobs(MessageType.Subscribe);
IDictionary<DateTime, int> receivedFailed = monitor.HourlyFailedJobs(MessageType.Subscribe);
var receivedSucceeded = monitor.HourlySucceededJobs(MessageType.Subscribe);
var receivedFailed = monitor.HourlyFailedJobs(MessageType.Subscribe);
}
<div class="row">
......@@ -41,7 +38,8 @@
data-received-succeeded="@Statistics.ReceivedSucceeded"
data-received-failed="@Statistics.ReceivedFailed"
data-received-succeeded-string="@Strings.HomePage_GraphHover_RSucceeded"
data-received-failed-string="@Strings.HomePage_GraphHover_RFailed"></div>
data-received-failed-string="@Strings.HomePage_GraphHover_RFailed">
</div>
<div style="display: none;">
<span data-metric="published_succeeded:count"></span>
<span data-metric="published_failed:count"></span>
......
......@@ -2,10 +2,9 @@
@using System
@using System.Globalization
@using System.Reflection
@using DotNetCore.CAP.Dashboard
@using DotNetCore.CAP.Dashboard.Pages
@using DotNetCore.CAP.Dashboard.Resources
@inherits RazorPage
@inherits DotNetCore.CAP.Dashboard.RazorPage
<!DOCTYPE html>
<html lang="@CultureInfo.CurrentUICulture.TwoLetterISOLanguageName">
<head>
......@@ -16,9 +15,9 @@
@{ var version = GetType().GetTypeInfo().Assembly.GetName().Version; }
<link rel="stylesheet" href="@Url.To($"/css{version.Major}{version.Minor}{version.Build}")">
</head>
<body>
<!-- Wrap all page content here -->
<div id="wrap">
<body>
<!-- Wrap all page content here -->
<div id="wrap">
<!-- Fixed navbar -->
<div class="navbar navbar-default navbar-fixed-top">
......@@ -33,7 +32,8 @@
</div>
<div class="collapse navbar-collapse">
@Html.RenderPartial(new Navigation())
@if(@AppPath != null) {
@if (AppPath != null)
{
<ul class="nav navbar-nav navbar-right">
<li>
<a href="@AppPath">
......@@ -52,27 +52,28 @@
<div class="container" style="margin-bottom: 20px;">
@RenderBody()
</div>
</div>
</div>
<div id="footer">
<div id="footer">
<div class="container">
<ul class="list-inline credit">
<li>
<a href="https://github.com/dotnetcore/cap/" target="_blank">CAP @($"{version.Major}.{version.Minor}.{version.Build}")
<a href="https://github.com/dotnetcore/cap/" target="_blank">
CAP @($"{version.Major}.{version.Minor}.{version.Build}")
</a>
</li>
<li>@Storage</li>
<li>@Strings.LayoutPage_Footer_Time @Html.LocalTime(DateTime.UtcNow)</li>
<li>@String.Format(Strings.LayoutPage_Footer_Generatedms, GenerationTime.Elapsed.TotalMilliseconds.ToString("N"))</li>
<li>@string.Format(Strings.LayoutPage_Footer_Generatedms, GenerationTime.Elapsed.TotalMilliseconds.ToString("N"))</li>
</ul>
</div>
</div>
</div>
<div id="capConfig"
<div id="capConfig"
data-pollinterval="@StatsPollingInterval"
data-pollurl="@(Url.To("/stats"))">
</div>
</div>
<script src="@Url.To($"/js{version.Major}{version.Minor}{version.Build}")"></script>
</body>
<script src="@Url.To($"/js{version.Major}{version.Minor}{version.Build}")"></script>
</body>
</html>
\ No newline at end of file
......@@ -6,12 +6,11 @@ namespace DotNetCore.CAP.Dashboard.Pages
{
internal partial class NodePage
{
private IList<Node> _nodes;
private INodeDiscoveryProvider _discoveryProvider;
private IList<Node> _nodes;
public NodePage()
{
}
public NodePage(string id)
......
@* Generator: Template TypeVisibility: Internal GeneratePrettyNames: True *@
@using DotNetCore.CAP.Dashboard
@using DotNetCore.CAP.Dashboard.Pages
@using DotNetCore.CAP.Dashboard.Resources
@inherits RazorPage
@inherits DotNetCore.CAP.Dashboard.RazorPage
@{
Layout = new LayoutPage(Strings.NodePage_Title);
}
......@@ -33,7 +32,7 @@
<tbody>
@foreach (var node in Nodes)
{
<tr class="@(CurrentNodeId == node.Id? "active":null )">
<tr class="@(CurrentNodeId == node.Id ? "active" : null)">
<td>@node.Id</td>
<td>@node.Name</td>
<td>@node.Address</td>
......
......@@ -13,14 +13,12 @@ namespace DotNetCore.CAP.Dashboard.Pages
public int GetTotal(IMonitoringApi api)
{
if (string.Equals(StatusName, Infrastructure.StatusName.Succeeded, StringComparison.CurrentCultureIgnoreCase))
{
if (string.Equals(StatusName, Infrastructure.StatusName.Succeeded,
StringComparison.CurrentCultureIgnoreCase))
return api.PublishedSucceededCount();
}
if (string.Equals(StatusName, Infrastructure.StatusName.Processing, StringComparison.CurrentCultureIgnoreCase))
{
if (string.Equals(StatusName, Infrastructure.StatusName.Processing,
StringComparison.CurrentCultureIgnoreCase))
return api.PublishedProcessingCount();
}
return api.PublishedFailedCount();
}
}
......
@* Generator: Template TypeVisibility: Internal GeneratePrettyNames: True *@
@using System
@using DotNetCore.CAP.Models;
@using DotNetCore.CAP.Dashboard
@using DotNetCore.CAP.Dashboard.Pages
@using DotNetCore.CAP.Dashboard.Monitoring
@using DotNetCore.CAP.Dashboard.Pages
@using DotNetCore.CAP.Dashboard.Resources
@inherits RazorPage
@using DotNetCore.CAP.Models
@inherits DotNetCore.CAP.Dashboard.RazorPage
@{
Layout = new LayoutPage(Strings.PublishedMessagesPage_Title);
......@@ -13,8 +13,8 @@
int.TryParse(Query("from"), out from);
int.TryParse(Query("count"), out perPage);
string name = Query("name");
string content = Query("content");
var name = Query("name");
var content = Query("content");
var monitor = Storage.GetMonitoringApi();
var pager = new Pager(from, perPage, GetTotal(monitor));
......@@ -49,11 +49,11 @@
<div class="btn-toolbar btn-toolbar-top">
<form class="row">
<span class="col-md-3">
<input type="text" class="form-control" name="name" value="@Query("name")" placeholder="@Strings.MessagesPage_Query_MessageName" />
<input type="text" class="form-control" name="name" value="@Query("name")" placeholder="@Strings.MessagesPage_Query_MessageName"/>
</span>
<div class="col-md-5">
<div class="input-group">
<input type="text" class="form-control" name="content" value="@Query("content")" placeholder="@Strings.MessagesPage_Query_MessageBody" />
<input type="text" class="form-control" name="content" value="@Query("content")" placeholder="@Strings.MessagesPage_Query_MessageBody"/>
<span class="input-group-btn">
<button class="btn btn-info">@Strings.MessagesPage_Query_Button</button>
</span>
......@@ -77,8 +77,8 @@
<table class="table">
<thead>
<tr>
<th style="width:60px;">
<input type="checkbox" class="js-jobs-list-select-all" />
<th style="width: 60px;">
<input type="checkbox" class="js-jobs-list-select-all"/>
</th>
<th>@Strings.MessagesPage_Table_Code</th>
<th>@Strings.MessagesPage_Table_Name</th>
......@@ -95,10 +95,10 @@
{
<tr class="js-jobs-list-row hover">
<td>
<input type="checkbox" class="js-jobs-list-checkbox" name="messages[]" value="@message.Id" />
<input type="checkbox" class="js-jobs-list-checkbox" name="messages[]" value="@message.Id"/>
</td>
<td class="word-break">
<a href="javascript:;" data-url='@(Url.To("/published/message/")+message.Id)' class="openModal">#@message.Id</a>
<a href="javascript:;" data-url='@(Url.To("/published/message/") + message.Id)' class="openModal">#@message.Id</a>
</td>
<td>
@message.Name
......@@ -132,10 +132,12 @@
<div class="modal-dialog" role="document">
<div class="modal-content">
<div class="modal-header">
<button type="button" class="close" data-dismiss="modal" aria-label="Close"><span aria-hidden="true">&times;</span></button>
<button type="button" class="close" data-dismiss="modal" aria-label="Close">
<span aria-hidden="true">&times;</span>
</button>
<h4 class="modal-title">Message Content</h4>
</div>
<div id="jsonContent" style="max-height:500px;overflow-y:auto;" class="modal-body">
<div id="jsonContent" style="max-height: 500px; overflow-y: auto;" class="modal-body">
</div>
<div class="modal-footer">
<button type="button" class="btn btn-sm btn-primary" id="formatBtn" onclick="">@Strings.MessagesPage_Modal_Format</button>
......
......@@ -13,14 +13,12 @@ namespace DotNetCore.CAP.Dashboard.Pages
public int GetTotal(IMonitoringApi api)
{
if (string.Equals(StatusName, Infrastructure.StatusName.Succeeded, StringComparison.CurrentCultureIgnoreCase))
{
if (string.Equals(StatusName, Infrastructure.StatusName.Succeeded,
StringComparison.CurrentCultureIgnoreCase))
return api.ReceivedSucceededCount();
}
if (string.Equals(StatusName, Infrastructure.StatusName.Processing, StringComparison.CurrentCultureIgnoreCase))
{
if (string.Equals(StatusName, Infrastructure.StatusName.Processing,
StringComparison.CurrentCultureIgnoreCase))
return api.ReceivedProcessingCount();
}
return api.ReceivedFailedCount();
}
}
......
@* Generator: Template TypeVisibility: Internal GeneratePrettyNames: True *@
@using System
@using DotNetCore.CAP.Models;
@using DotNetCore.CAP.Dashboard
@using DotNetCore.CAP.Dashboard.Pages
@using DotNetCore.CAP.Dashboard.Monitoring
@using DotNetCore.CAP.Dashboard.Pages
@using DotNetCore.CAP.Dashboard.Resources
@inherits RazorPage
@using DotNetCore.CAP.Models
@inherits DotNetCore.CAP.Dashboard.RazorPage
@{
Layout = new LayoutPage(Strings.ReceivedMessagesPage_Title);
......@@ -13,9 +13,9 @@
int.TryParse(Query("from"), out from);
int.TryParse(Query("count"), out perPage);
string group = Query("group");
string name = Query("name");
string content = Query("content");
var group = Query("group");
var name = Query("name");
var content = Query("content");
var monitor = Storage.GetMonitoringApi();
var pager = new Pager(from, perPage, GetTotal(monitor));
......@@ -51,14 +51,14 @@
<div class="btn-toolbar btn-toolbar-top">
<form class="row">
<span class="col-md-2">
<input type="text" class="form-control" name="group" value="@Query("group")" placeholder="@Strings.MessagesPage_Query_MessageGroup" />
<input type="text" class="form-control" name="group" value="@Query("group")" placeholder="@Strings.MessagesPage_Query_MessageGroup"/>
</span>
<span class="col-md-3">
<input type="text" class="form-control" name="name" value="@Query("name")" placeholder="@Strings.MessagesPage_Query_MessageName" />
<input type="text" class="form-control" name="name" value="@Query("name")" placeholder="@Strings.MessagesPage_Query_MessageName"/>
</span>
<div class="col-md-5">
<div class="input-group">
<input type="text" class="form-control" name="content" value="@Query("content")" placeholder="@Strings.MessagesPage_Query_MessageBody" />
<input type="text" class="form-control" name="content" value="@Query("content")" placeholder="@Strings.MessagesPage_Query_MessageBody"/>
<span class="input-group-btn">
<button class="btn btn-info">@Strings.MessagesPage_Query_Button</button>
</span>
......@@ -82,8 +82,8 @@
<table class="table">
<thead>
<tr>
<th style="width:60px;">
<input type="checkbox" class="js-jobs-list-select-all" />
<th style="width: 60px;">
<input type="checkbox" class="js-jobs-list-select-all"/>
</th>
<th>@Strings.MessagesPage_Table_Code</th>
<th>@Strings.MessagesPage_Table_Group</th>
......@@ -101,10 +101,10 @@
{
<tr class="js-jobs-list-row hover">
<td>
<input type="checkbox" class="js-jobs-list-checkbox" name="messages[]" value="@message.Id" />
<input type="checkbox" class="js-jobs-list-checkbox" name="messages[]" value="@message.Id"/>
</td>
<td class="word-break">
<a href="javascript:;" data-url='@(Url.To("/received/message/")+message.Id)' class="openModal">#@message.Id</a>
<a href="javascript:;" data-url='@(Url.To("/received/message/") + message.Id)' class="openModal">#@message.Id</a>
</td>
<td>
@message.Group
......@@ -141,10 +141,12 @@
<div class="modal-dialog" role="document">
<div class="modal-content">
<div class="modal-header">
<button type="button" class="close" data-dismiss="modal" aria-label="Close"><span aria-hidden="true">&times;</span></button>
<button type="button" class="close" data-dismiss="modal" aria-label="Close">
<span aria-hidden="true">&times;</span>
</button>
<h4 class="modal-title">Message Content</h4>
</div>
<div id="jsonContent" style="max-height:500px;overflow-y:auto;" class="modal-body">
<div id="jsonContent" style="max-height: 500px; overflow-y: auto;" class="modal-body">
</div>
<div class="modal-footer">
<button type="button" class="btn btn-sm btn-primary" id="formatBtn" onclick="">@Strings.MessagesPage_Modal_Format</button>
......
namespace DotNetCore.CAP.Dashboard.Pages
{
partial class Paginator
internal partial class Paginator
{
private readonly Pager _pager;
......
namespace DotNetCore.CAP.Dashboard.Pages
{
partial class PerPageSelector
internal partial class PerPageSelector
{
private readonly Pager _pager;
......
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment