Commit 926769d3 authored by yangxiaodong's avatar yangxiaodong

cleanup code.

parent 153dc812
...@@ -21,7 +21,7 @@ namespace DotNetCore.CAP ...@@ -21,7 +21,7 @@ namespace DotNetCore.CAP
var kafkaOptions = new KafkaOptions(); var kafkaOptions = new KafkaOptions();
_configure(kafkaOptions); _configure(kafkaOptions);
services.AddSingleton(kafkaOptions); services.AddSingleton(kafkaOptions);
services.AddSingleton<IConsumerClientFactory, KafkaConsumerClientFactory>(); services.AddSingleton<IConsumerClientFactory, KafkaConsumerClientFactory>();
services.AddTransient<IQueueExecutor, PublishQueueExecutor>(); services.AddTransient<IQueueExecutor, PublishQueueExecutor>();
} }
......
...@@ -2,7 +2,6 @@ ...@@ -2,7 +2,6 @@
using System.Text; using System.Text;
using System.Threading.Tasks; using System.Threading.Tasks;
using Confluent.Kafka; using Confluent.Kafka;
using Confluent.Kafka.Serialization;
using DotNetCore.CAP.Processor.States; using DotNetCore.CAP.Processor.States;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options; using Microsoft.Extensions.Options;
......
using System; using System;
using DotNetCore.CAP.Processor;
using DotNetCore.CAP.MySql; using DotNetCore.CAP.MySql;
using DotNetCore.CAP.Processor;
using Microsoft.EntityFrameworkCore; using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.DependencyInjection;
......
using System; using System;
using System.Data.SqlClient;
using System.Threading.Tasks; using System.Threading.Tasks;
using Dapper; using Dapper;
using DotNetCore.CAP.Processor; using DotNetCore.CAP.Processor;
......
using System.Data.SqlClient;
using System.Threading; using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
using Dapper; using Dapper;
......
...@@ -46,7 +46,7 @@ namespace DotNetCore.CAP.MySql ...@@ -46,7 +46,7 @@ namespace DotNetCore.CAP.MySql
//SELECT MessageId,MessageType FROM `{_prefix}.queue` LIMIT 1 FOR UPDATE; //SELECT MessageId,MessageType FROM `{_prefix}.queue` LIMIT 1 FOR UPDATE;
//DELETE FROM `{_prefix}.queue` LIMIT 1; //DELETE FROM `{_prefix}.queue` LIMIT 1;
//COMMIT; //COMMIT;
var sql = $@" var sql = $@"
SELECT `MessageId`,`MessageType` FROM `{_prefix}.queue` LIMIT 1 FOR UPDATE; SELECT `MessageId`,`MessageType` FROM `{_prefix}.queue` LIMIT 1 FOR UPDATE;
DELETE FROM `{_prefix}.queue` LIMIT 1;"; DELETE FROM `{_prefix}.queue` LIMIT 1;";
...@@ -123,7 +123,7 @@ VALUES(@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName);"; ...@@ -123,7 +123,7 @@ VALUES(@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName);";
private async Task<IFetchedMessage> FetchNextMessageCoreAsync(string sql, object args = null) private async Task<IFetchedMessage> FetchNextMessageCoreAsync(string sql, object args = null)
{ {
//here don't use `using` to dispose //here don't use `using` to dispose
var connection = new MySqlConnection(_options.ConnectionString); var connection = new MySqlConnection(_options.ConnectionString);
await connection.OpenAsync(); await connection.OpenAsync();
var transaction = connection.BeginTransaction(IsolationLevel.ReadCommitted); var transaction = connection.BeginTransaction(IsolationLevel.ReadCommitted);
......
...@@ -19,6 +19,7 @@ namespace DotNetCore.CAP.RabbitMQ ...@@ -19,6 +19,7 @@ namespace DotNetCore.CAP.RabbitMQ
private ulong _deliveryTag; private ulong _deliveryTag;
public event EventHandler<MessageContext> OnMessageReceieved; public event EventHandler<MessageContext> OnMessageReceieved;
public event EventHandler<string> OnError; public event EventHandler<string> OnError;
public RabbitMQConsumerClient(string queueName, RabbitMQOptions options) public RabbitMQConsumerClient(string queueName, RabbitMQOptions options)
......
...@@ -55,7 +55,7 @@ namespace DotNetCore.CAP.SqlServer ...@@ -55,7 +55,7 @@ namespace DotNetCore.CAP.SqlServer
return PublishCoreAsync(name, content); return PublishCoreAsync(name, content);
} }
public void Publish<T>(string name, T contentObj, IDbConnection dbConnection, IDbTransaction dbTransaction = null) public void Publish<T>(string name, T contentObj, IDbConnection dbConnection, IDbTransaction dbTransaction = null)
{ {
CheckIsAdoNet(name); CheckIsAdoNet(name);
......
...@@ -114,7 +114,7 @@ VALUES(@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName);"; ...@@ -114,7 +114,7 @@ VALUES(@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName);";
private async Task<IFetchedMessage> FetchNextMessageCoreAsync(string sql, object args = null) private async Task<IFetchedMessage> FetchNextMessageCoreAsync(string sql, object args = null)
{ {
//here don't use `using` to dispose //here don't use `using` to dispose
var connection = new SqlConnection(_options.ConnectionString); var connection = new SqlConnection(_options.ConnectionString);
await connection.OpenAsync(); await connection.OpenAsync();
var transaction = connection.BeginTransaction(IsolationLevel.ReadCommitted); var transaction = connection.BeginTransaction(IsolationLevel.ReadCommitted);
......
using System; using System;
using DotNetCore.CAP.Infrastructure;
namespace DotNetCore.CAP.Abstractions namespace DotNetCore.CAP.Abstractions
{ {
......
...@@ -102,4 +102,4 @@ namespace DotNetCore.CAP.Abstractions.ModelBinding ...@@ -102,4 +102,4 @@ namespace DotNetCore.CAP.Abstractions.ModelBinding
return !x.Equals(y); return !x.Equals(y);
} }
} }
} }
\ No newline at end of file
...@@ -3,7 +3,6 @@ using System.Collections.Generic; ...@@ -3,7 +3,6 @@ using System.Collections.Generic;
using System.Reflection; using System.Reflection;
using DotNetCore.CAP; using DotNetCore.CAP;
using DotNetCore.CAP.Abstractions; using DotNetCore.CAP.Abstractions;
using DotNetCore.CAP.Abstractions.ModelBinding;
using DotNetCore.CAP.Infrastructure; using DotNetCore.CAP.Infrastructure;
using DotNetCore.CAP.Internal; using DotNetCore.CAP.Internal;
using DotNetCore.CAP.Processor; using DotNetCore.CAP.Processor;
...@@ -60,7 +59,7 @@ namespace Microsoft.Extensions.DependencyInjection ...@@ -60,7 +59,7 @@ namespace Microsoft.Extensions.DependencyInjection
foreach (var serviceExtension in options.Extensions) foreach (var serviceExtension in options.Extensions)
{ {
serviceExtension.AddServices(services); serviceExtension.AddServices(services);
} }
services.AddSingleton(options); services.AddSingleton(options);
return new CapBuilder(services); return new CapBuilder(services);
......
...@@ -56,6 +56,7 @@ namespace DotNetCore.CAP ...@@ -56,6 +56,7 @@ namespace DotNetCore.CAP
/// Returns executed failed message. /// Returns executed failed message.
/// </summary> /// </summary>
Task<IEnumerable<CapReceivedMessage>> GetFailedReceviedMessages(); Task<IEnumerable<CapReceivedMessage>> GetFailedReceviedMessages();
//----------------------------------------- //-----------------------------------------
/// <summary> /// <summary>
......
using System; using System;
using System.Collections.Generic;
using System.Text;
namespace DotNetCore.CAP.Infrastructure namespace DotNetCore.CAP.Infrastructure
{ {
...@@ -11,4 +9,4 @@ namespace DotNetCore.CAP.Infrastructure ...@@ -11,4 +9,4 @@ namespace DotNetCore.CAP.Infrastructure
throw new NotImplementedException(); throw new NotImplementedException();
} }
} }
} }
\ No newline at end of file
using System; using System;
using DotNetCore.CAP.Abstractions; using DotNetCore.CAP.Abstractions;
using DotNetCore.CAP.Abstractions.ModelBinding;
using DotNetCore.CAP.Infrastructure;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
namespace DotNetCore.CAP.Internal namespace DotNetCore.CAP.Internal
......
...@@ -78,4 +78,4 @@ namespace DotNetCore.CAP.Internal ...@@ -78,4 +78,4 @@ namespace DotNetCore.CAP.Internal
return new HashCodeCombiner(0x1505L); return new HashCodeCombiner(0x1505L);
} }
} }
} }
\ No newline at end of file
...@@ -57,7 +57,7 @@ namespace DotNetCore.CAP.Internal ...@@ -57,7 +57,7 @@ namespace DotNetCore.CAP.Internal
catch (FormatException ex) catch (FormatException ex)
{ {
_logger.ModelBinderFormattingException(_executor.MethodInfo?.Name, firstParameter.Name, value, ex); _logger.ModelBinderFormattingException(_executor.MethodInfo?.Name, firstParameter.Name, value, ex);
} }
} }
else else
{ {
......
...@@ -26,7 +26,6 @@ namespace DotNetCore.CAP.Internal ...@@ -26,7 +26,6 @@ namespace DotNetCore.CAP.Internal
catch (Exception) catch (Exception)
{ {
return Task.FromResult(ModelBindingResult.Failed()); return Task.FromResult(ModelBindingResult.Failed());
} }
} }
} }
......
...@@ -83,4 +83,4 @@ namespace DotNetCore.CAP.Internal ...@@ -83,4 +83,4 @@ namespace DotNetCore.CAP.Internal
return !type.GetTypeInfo().IsValueType || isNullableValueType; return !type.GetTypeInfo().IsValueType || isNullableValueType;
} }
} }
} }
\ No newline at end of file
...@@ -7,4 +7,4 @@ namespace DotNetCore.CAP.Internal ...@@ -7,4 +7,4 @@ namespace DotNetCore.CAP.Internal
{ {
IModelBinder CreateBinder(ParameterInfo parameter); IModelBinder CreateBinder(ParameterInfo parameter);
} }
} }
\ No newline at end of file
using System; using System;
using DotNetCore.CAP.Infrastructure;
namespace DotNetCore.CAP.Models namespace DotNetCore.CAP.Models
{ {
......
using System; using System;
using DotNetCore.CAP.Infrastructure;
namespace DotNetCore.CAP.Models namespace DotNetCore.CAP.Models
{ {
......
...@@ -61,7 +61,7 @@ namespace DotNetCore.CAP.Processor ...@@ -61,7 +61,7 @@ namespace DotNetCore.CAP.Processor
_logger.LogTrace("Pulsing the Queuer."); _logger.LogTrace("Pulsing the Queuer.");
PublishQueuer.PulseEvent.Set(); PublishQueuer.PulseEvent.Set();
} }
public void Dispose() public void Dispose()
......
using System; using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks; using System.Threading.Tasks;
using DotNetCore.CAP.Processor.States; using DotNetCore.CAP.Processor.States;
using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.DependencyInjection;
...@@ -83,4 +80,4 @@ namespace DotNetCore.CAP.Processor ...@@ -83,4 +80,4 @@ namespace DotNetCore.CAP.Processor
} }
} }
} }
} }
\ No newline at end of file
...@@ -18,8 +18,8 @@ namespace DotNetCore.CAP ...@@ -18,8 +18,8 @@ namespace DotNetCore.CAP
{ {
var queueExectors = _serviceProvider.GetServices<IQueueExecutor>(); var queueExectors = _serviceProvider.GetServices<IQueueExecutor>();
return messageType == MessageType.Publish return messageType == MessageType.Publish
? queueExectors.FirstOrDefault(x => x is BasePublishQueueExecutor) ? queueExectors.FirstOrDefault(x => x is BasePublishQueueExecutor)
: queueExectors.FirstOrDefault(x => !(x is BasePublishQueueExecutor)); : queueExectors.FirstOrDefault(x => !(x is BasePublishQueueExecutor));
} }
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment