Commit 90f14f65 authored by yangxiaodong's avatar yangxiaodong

cleanup code.

parent 22849556
using System;
// ReSharper disable once CheckNamespace
namespace DotNetCore.CAP
{
public class EFOptions
......
......@@ -2,6 +2,7 @@
using DotNetCore.CAP;
using Microsoft.EntityFrameworkCore;
// ReSharper disable once CheckNamespace
namespace Microsoft.Extensions.DependencyInjection
{
public static class CapOptionsExtensions
......
......@@ -3,11 +3,12 @@ using DotNetCore.CAP.EntityFrameworkCore;
using DotNetCore.CAP.Processor;
using Microsoft.Extensions.DependencyInjection;
// ReSharper disable once CheckNamespace
namespace DotNetCore.CAP
{
public class SqlServerCapOptionsExtension : ICapOptionsExtension
{
private Action<SqlServerOptions> _configure;
private readonly Action<SqlServerOptions> _configure;
public SqlServerCapOptionsExtension(Action<SqlServerOptions> configure)
{
......
namespace DotNetCore.CAP
// ReSharper disable once CheckNamespace
namespace DotNetCore.CAP
{
public class SqlServerOptions : EFOptions
{
/// <summary>
/// Gets or sets the database's connection string that will be used to store database entities.
/// </summary>
public string ConnectionString { get; set; } //= "Server=DESKTOP-M9R8T31;Initial Catalog=Test;User Id=sa;Password=P@ssw0rd;MultipleActiveResultSets=True";
public string ConnectionString { get; set; }
}
}
\ No newline at end of file
......@@ -66,6 +66,7 @@ namespace DotNetCore.CAP.EntityFrameworkCore
}
catch
{
// ignored
}
}
}
......
......@@ -11,8 +11,8 @@ namespace DotNetCore.CAP.EntityFrameworkCore
{
public class SqlServerStorage : IStorage
{
private IServiceProvider _provider;
private ILogger _logger;
private readonly IServiceProvider _provider;
private readonly ILogger _logger;
public SqlServerStorage(
IServiceProvider provider,
......@@ -43,7 +43,7 @@ namespace DotNetCore.CAP.EntityFrameworkCore
protected virtual string CreateDbTablesScript(string schema)
{
var batchSQL =
var batchSql =
$@"
IF NOT EXISTS (SELECT * FROM sys.schemas WHERE name = '{schema}')
BEGIN
......@@ -96,7 +96,7 @@ CREATE TABLE [{schema}].[Published](
) ON [PRIMARY] TEXTIMAGE_ON [PRIMARY]
END
GO";
return batchSQL;
return batchSql;
}
}
}
\ No newline at end of file
......@@ -95,15 +95,13 @@ VALUES(@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName);";
private async Task<IFetchedMessage> FetchNextMessageCoreAsync(string sql, object args = null)
{
FetchedMessage fetched = null;
using (var connection = new SqlConnection(_options.ConnectionString))
{
using (var transaction = connection.BeginTransaction(IsolationLevel.ReadCommitted))
{
try
{
fetched = await connection.QueryFirstOrDefaultAsync<FetchedMessage>(sql, args, transaction);
var fetched = await connection.QueryFirstOrDefaultAsync<FetchedMessage>(sql, args, transaction);
if (fetched == null)
return null;
......
......@@ -9,20 +9,17 @@ namespace DotNetCore.CAP.EntityFrameworkCore
{
public class SqlServerStorageTransaction : IStorageTransaction, IDisposable
{
private readonly SqlServerStorageConnection _connection;
private readonly SqlServerOptions _options;
private readonly string _schema;
private IDbTransaction _dbTransaction;
private IDbConnection _dbConnection;
private readonly IDbTransaction _dbTransaction;
private readonly IDbConnection _dbConnection;
public SqlServerStorageTransaction(SqlServerStorageConnection connection)
{
_connection = connection;
_options = _connection.Options;
_schema = _options.Schema;
var options = connection.Options;
_schema = options.Schema;
_dbConnection = new SqlConnection(_options.ConnectionString);
_dbConnection = new SqlConnection(options.ConnectionString);
_dbTransaction = _dbConnection.BeginTransaction(IsolationLevel.ReadCommitted);
}
......
......@@ -2,11 +2,12 @@
using DotNetCore.CAP.Kafka;
using Microsoft.Extensions.DependencyInjection;
// ReSharper disable once CheckNamespace
namespace DotNetCore.CAP
{
public class KafkaCapOptionsExtension : ICapOptionsExtension
{
private Action<KafkaOptions> _configure;
private readonly Action<KafkaOptions> _configure;
public KafkaCapOptionsExtension(Action<KafkaOptions> configure)
{
......@@ -20,7 +21,7 @@ namespace DotNetCore.CAP
var kafkaOptions = new KafkaOptions();
_configure(kafkaOptions);
services.AddSingleton(kafkaOptions);
services.AddSingleton<IConsumerClientFactory, KafkaConsumerClientFactory>();
services.AddTransient<IQueueExecutor, PublishQueueExecutor>();
}
......
......@@ -2,6 +2,7 @@
using System.Collections.Generic;
using System.Linq;
// ReSharper disable once CheckNamespace
namespace DotNetCore.CAP
{
/// <summary>
......
using System;
using DotNetCore.CAP;
// ReSharper disable once CheckNamespace
namespace Microsoft.Extensions.DependencyInjection
{
public static class CapOptionsExtensions
......
using System;
using DotNetCore.CAP;
// ReSharper disable once CheckNamespace
namespace Microsoft.Extensions.DependencyInjection
{
public static class CapOptionsExtensions
......
namespace DotNetCore.CAP
// ReSharper disable once CheckNamespace
namespace DotNetCore.CAP
{
public class RabbitMQOptions
{
......@@ -34,7 +35,7 @@
public string HostName { get; set; } = "localhost";
/// <summary> The topic exchange type. </summary>
internal string EXCHANGE_TYPE = "topic";
internal const string ExchangeType = "topic";
/// <summary>
/// Password to use when authenticating to the server.
......
......@@ -2,11 +2,12 @@
using DotNetCore.CAP.RabbitMQ;
using Microsoft.Extensions.DependencyInjection;
// ReSharper disable once CheckNamespace
namespace DotNetCore.CAP
{
public class RabbitMQCapOptionsExtension : ICapOptionsExtension
{
private Action<RabbitMQOptions> _configure;
private readonly Action<RabbitMQOptions> _configure;
public RabbitMQCapOptionsExtension(Action<RabbitMQOptions> configure)
{
......
......@@ -11,7 +11,7 @@ namespace DotNetCore.CAP.RabbitMQ
public class PublishQueueExecutor : BasePublishQueueExecutor
{
private readonly ILogger _logger;
private readonly RabbitMQOptions _rabbitMqOptions;
private readonly RabbitMQOptions _rabbitMQOptions;
public PublishQueueExecutor(IStateChanger stateChanger,
IOptions<RabbitMQOptions> options,
......@@ -19,21 +19,21 @@ namespace DotNetCore.CAP.RabbitMQ
: base(stateChanger, logger)
{
_logger = logger;
_rabbitMqOptions = options.Value;
_rabbitMQOptions = options.Value;
}
public override Task<OperateResult> PublishAsync(string keyName, string content)
{
var factory = new ConnectionFactory()
{
HostName = _rabbitMqOptions.HostName,
UserName = _rabbitMqOptions.UserName,
Port = _rabbitMqOptions.Port,
Password = _rabbitMqOptions.Password,
VirtualHost = _rabbitMqOptions.VirtualHost,
RequestedConnectionTimeout = _rabbitMqOptions.RequestedConnectionTimeout,
SocketReadTimeout = _rabbitMqOptions.SocketReadTimeout,
SocketWriteTimeout = _rabbitMqOptions.SocketWriteTimeout
HostName = _rabbitMQOptions.HostName,
UserName = _rabbitMQOptions.UserName,
Port = _rabbitMQOptions.Port,
Password = _rabbitMQOptions.Password,
VirtualHost = _rabbitMQOptions.VirtualHost,
RequestedConnectionTimeout = _rabbitMQOptions.RequestedConnectionTimeout,
SocketReadTimeout = _rabbitMQOptions.SocketReadTimeout,
SocketWriteTimeout = _rabbitMQOptions.SocketWriteTimeout
};
try
......@@ -43,8 +43,8 @@ namespace DotNetCore.CAP.RabbitMQ
{
var body = Encoding.UTF8.GetBytes(content);
channel.ExchangeDeclare(_rabbitMqOptions.TopicExchangeName, _rabbitMqOptions.EXCHANGE_TYPE);
channel.BasicPublish(exchange: _rabbitMqOptions.TopicExchangeName,
channel.ExchangeDeclare(_rabbitMQOptions.TopicExchangeName, RabbitMQOptions.ExchangeType);
channel.BasicPublish(exchange: _rabbitMQOptions.TopicExchangeName,
routingKey: keyName,
basicProperties: null,
body: body);
......
......@@ -46,7 +46,7 @@ namespace DotNetCore.CAP.RabbitMQ
_connection = _connectionFactory.CreateConnection();
_channel = _connection.CreateModel();
_channel.ExchangeDeclare(exchange: _exchageName, type: _rabbitMQOptions.EXCHANGE_TYPE);
_channel.ExchangeDeclare(exchange: _exchageName, type: _rabbitMQOptions.ExchangeType);
_channel.QueueDeclare(_queueName, exclusive: false);
}
......
......@@ -13,7 +13,7 @@ namespace DotNetCore.CAP
private readonly IStateChanger _stateChanger;
private readonly ILogger _logger;
public BasePublishQueueExecutor(IStateChanger stateChanger,
protected BasePublishQueueExecutor(IStateChanger stateChanger,
ILogger<BasePublishQueueExecutor> logger)
{
_stateChanger = stateChanger;
......
......@@ -116,7 +116,8 @@ namespace DotNetCore.CAP
}
catch (SubscriberNotFoundException ex)
{
throw ex;
_logger.LogError("Can not be found subscribe method of name: " + receivedMessage.Name);
return OperateResult.Failed(ex);
}
catch (Exception ex)
{
......
......@@ -17,16 +17,11 @@ namespace DotNetCore.CAP
public IQueueExecutor GetInstance(MessageType messageType)
{
var _queueExectors = _serviceProvider.GetServices<IQueueExecutor>();
var queueExectors = _serviceProvider.GetServices<IQueueExecutor>();
if (messageType == MessageType.Publish)
{
return _queueExectors.FirstOrDefault(x => typeof(BasePublishQueueExecutor).IsAssignableFrom(x.GetType()));
}
else
{
return _queueExectors.FirstOrDefault(x => !typeof(BasePublishQueueExecutor).IsAssignableFrom(x.GetType()));
}
return messageType == MessageType.Publish
? queueExectors.FirstOrDefault(x => x is BasePublishQueueExecutor)
: queueExectors.FirstOrDefault(x => !(x is BasePublishQueueExecutor));
}
}
}
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment