Commit e1d1898f authored by Savorboard's avatar Savorboard

cleanup code.

parent 081fcd42
...@@ -13,7 +13,6 @@ namespace Microsoft.Extensions.DependencyInjection ...@@ -13,7 +13,6 @@ namespace Microsoft.Extensions.DependencyInjection
/// Adds an Entity Framework implementation of message stores. /// Adds an Entity Framework implementation of message stores.
/// </summary> /// </summary>
/// <typeparam name="TContext">The Entity Framework database context to use.</typeparam> /// <typeparam name="TContext">The Entity Framework database context to use.</typeparam>
/// <param name="services">The <see cref="CapBuilder"/> instance this method extends.</param>
/// <returns>The <see cref="CapBuilder"/> instance this method extends.</returns> /// <returns>The <see cref="CapBuilder"/> instance this method extends.</returns>
public static CapBuilder AddEntityFrameworkStores<TContext>(this CapBuilder builder) public static CapBuilder AddEntityFrameworkStores<TContext>(this CapBuilder builder)
where TContext : DbContext where TContext : DbContext
......
...@@ -6,8 +6,6 @@ namespace DotNetCore.CAP.EntityFrameworkCore ...@@ -6,8 +6,6 @@ namespace DotNetCore.CAP.EntityFrameworkCore
/// <summary> /// <summary>
/// Base class for the Entity Framework database context used for CAP. /// Base class for the Entity Framework database context used for CAP.
/// </summary> /// </summary>
/// <typeparam name="TMessage">The type of message objects.</typeparam>
/// <typeparam name="Tkey">The type of the primarky key for messages.</typeparam>
public class CapDbContext : DbContext public class CapDbContext : DbContext
{ {
/// <summary> /// <summary>
...@@ -22,10 +20,13 @@ namespace DotNetCore.CAP.EntityFrameworkCore ...@@ -22,10 +20,13 @@ namespace DotNetCore.CAP.EntityFrameworkCore
public CapDbContext(DbContextOptions options) : base(options) { } public CapDbContext(DbContextOptions options) : base(options) { }
/// <summary> /// <summary>
/// Gets or sets the <see cref="DbSet{ConsistencyMessage}"/> of Messages. /// Gets or sets the <see cref="CapSentMessage"/> of Messages.
/// </summary> /// </summary>
public DbSet<CapSentMessage> CapSentMessages { get; set; } public DbSet<CapSentMessage> CapSentMessages { get; set; }
/// <summary>
/// Gets or sets the <see cref="CapReceivedMessages"/> of Messages.
/// </summary>
public DbSet<CapReceivedMessage> CapReceivedMessages { get; set; } public DbSet<CapReceivedMessage> CapReceivedMessages { get; set; }
/// <summary> /// <summary>
......
...@@ -8,13 +8,11 @@ namespace DotNetCore.CAP.EntityFrameworkCore ...@@ -8,13 +8,11 @@ namespace DotNetCore.CAP.EntityFrameworkCore
/// <summary> /// <summary>
/// Represents a new instance of a persistence store for the specified message types. /// Represents a new instance of a persistence store for the specified message types.
/// </summary> /// </summary>
/// <typeparam name="ConsistencyMessage">The type representing a message.</typeparam>
/// <typeparam name="TContext">The type of the data context class used to access the store.</typeparam> /// <typeparam name="TContext">The type of the data context class used to access the store.</typeparam>
/// <typeparam name="TKey">The type of the primary key for a message.</typeparam>
public class CapMessageStore<TContext> : ICapMessageStore where TContext : DbContext public class CapMessageStore<TContext> : ICapMessageStore where TContext : DbContext
{ {
/// <summary> /// <summary>
/// Constructs a new instance of <see cref="ConsistencyMessageStore{ConsistencyMessage, TContext, TKey}"/>. /// Constructs a new instance of <see cref="TContext"/>.
/// </summary> /// </summary>
/// <param name="context">The <see cref="DbContext"/>.</param> /// <param name="context">The <see cref="DbContext"/>.</param>
public CapMessageStore(TContext context) public CapMessageStore(TContext context)
...@@ -24,9 +22,9 @@ namespace DotNetCore.CAP.EntityFrameworkCore ...@@ -24,9 +22,9 @@ namespace DotNetCore.CAP.EntityFrameworkCore
public TContext Context { get; private set; } public TContext Context { get; private set; }
private DbSet<CapSentMessage> SentMessages { get { return Context.Set<CapSentMessage>(); } } private DbSet<CapSentMessage> SentMessages => Context.Set<CapSentMessage>();
private DbSet<CapReceivedMessage> ReceivedMessages { get { return Context.Set<CapReceivedMessage>(); } } private DbSet<CapReceivedMessage> ReceivedMessages => Context.Set<CapReceivedMessage>();
/// <summary> /// <summary>
/// Creates the specified <paramref name="message"/> in the cap message store. /// Creates the specified <paramref name="message"/> in the cap message store.
...@@ -41,7 +39,8 @@ namespace DotNetCore.CAP.EntityFrameworkCore ...@@ -41,7 +39,8 @@ namespace DotNetCore.CAP.EntityFrameworkCore
return OperateResult.Success; return OperateResult.Success;
} }
public async Task<OperateResult> ChangeSentMessageStateAsync(CapSentMessage message, string status, bool autoSaveChanges = true) public async Task<OperateResult> ChangeSentMessageStateAsync(CapSentMessage message, string status,
bool autoSaveChanges = true)
{ {
Context.Attach(message); Context.Attach(message);
message.LastRun = DateTime.Now; message.LastRun = DateTime.Now;
...@@ -55,7 +54,12 @@ namespace DotNetCore.CAP.EntityFrameworkCore ...@@ -55,7 +54,12 @@ namespace DotNetCore.CAP.EntityFrameworkCore
} }
catch (DbUpdateConcurrencyException ex) catch (DbUpdateConcurrencyException ex)
{ {
return OperateResult.Failed(new OperateError() { Code = "DbUpdateConcurrencyException", Description = ex.Message }); return OperateResult.Failed(
new OperateError()
{
Code = "DbUpdateConcurrencyException",
Description = ex.Message
});
} }
return OperateResult.Success; return OperateResult.Success;
} }
...@@ -87,7 +91,11 @@ namespace DotNetCore.CAP.EntityFrameworkCore ...@@ -87,7 +91,11 @@ namespace DotNetCore.CAP.EntityFrameworkCore
} }
catch (DbUpdateConcurrencyException ex) catch (DbUpdateConcurrencyException ex)
{ {
return OperateResult.Failed(new OperateError() { Code = "DbUpdateConcurrencyException", Description = ex.Message }); return OperateResult.Failed(new OperateError()
{
Code = "DbUpdateConcurrencyException",
Description = ex.Message
});
} }
} }
...@@ -107,7 +115,11 @@ namespace DotNetCore.CAP.EntityFrameworkCore ...@@ -107,7 +115,11 @@ namespace DotNetCore.CAP.EntityFrameworkCore
} }
catch (DbUpdateConcurrencyException ex) catch (DbUpdateConcurrencyException ex)
{ {
return OperateResult.Failed(new OperateError() { Code = "DbUpdateConcurrencyException", Description = ex.Message }); return OperateResult.Failed(new OperateError()
{
Code = "DbUpdateConcurrencyException",
Description = ex.Message
});
} }
} }
...@@ -124,7 +136,8 @@ namespace DotNetCore.CAP.EntityFrameworkCore ...@@ -124,7 +136,8 @@ namespace DotNetCore.CAP.EntityFrameworkCore
return OperateResult.Success; return OperateResult.Success;
} }
public async Task<OperateResult> ChangeReceivedMessageStateAsync(CapReceivedMessage message, string status, bool autoSaveChanges = true) public async Task<OperateResult> ChangeReceivedMessageStateAsync(CapReceivedMessage message, string status,
bool autoSaveChanges = true)
{ {
Context.Attach(message); Context.Attach(message);
message.LastRun = DateTime.Now; message.LastRun = DateTime.Now;
...@@ -138,7 +151,11 @@ namespace DotNetCore.CAP.EntityFrameworkCore ...@@ -138,7 +151,11 @@ namespace DotNetCore.CAP.EntityFrameworkCore
} }
catch (DbUpdateConcurrencyException ex) catch (DbUpdateConcurrencyException ex)
{ {
return OperateResult.Failed(new OperateError() { Code = "DbUpdateConcurrencyException", Description = ex.Message }); return OperateResult.Failed(new OperateError()
{
Code = "DbUpdateConcurrencyException",
Description = ex.Message
});
} }
return OperateResult.Success; return OperateResult.Success;
} }
...@@ -167,9 +184,12 @@ namespace DotNetCore.CAP.EntityFrameworkCore ...@@ -167,9 +184,12 @@ namespace DotNetCore.CAP.EntityFrameworkCore
} }
catch (DbUpdateConcurrencyException ex) catch (DbUpdateConcurrencyException ex)
{ {
return OperateResult.Failed(new OperateError() { Code = "DbUpdateConcurrencyException", Description = ex.Message }); return OperateResult.Failed(new OperateError()
{
Code = "DbUpdateConcurrencyException",
Description = ex.Message
});
} }
} }
} }
} }
\ No newline at end of file
...@@ -32,16 +32,16 @@ namespace DotNetCore.CAP.Kafka ...@@ -32,16 +32,16 @@ namespace DotNetCore.CAP.Kafka
internal IEnumerable<KeyValuePair<string, object>> AsRdkafkaConfig() internal IEnumerable<KeyValuePair<string, object>> AsRdkafkaConfig()
{ {
if (!MainConfig.ContainsKey("bootstrap.servers")) if (MainConfig.ContainsKey("bootstrap.servers"))
return MainConfig.AsEnumerable();
if (string.IsNullOrEmpty(Servers))
{ {
if (string.IsNullOrEmpty(Servers)) throw new ArgumentNullException(nameof(Servers));
{ }
throw new ArgumentNullException(nameof(Servers)); else
} {
else MainConfig.Add("bootstrap.servers", Servers);
{
MainConfig.Add("bootstrap.servers", Servers);
}
} }
return MainConfig.AsEnumerable(); return MainConfig.AsEnumerable();
} }
......
...@@ -5,13 +5,17 @@ namespace DotNetCore.CAP.Kafka ...@@ -5,13 +5,17 @@ namespace DotNetCore.CAP.Kafka
public class CapSubscribeAttribute : TopicAttribute public class CapSubscribeAttribute : TopicAttribute
{ {
public CapSubscribeAttribute(string topicName) public CapSubscribeAttribute(string topicName)
: this(topicName, 0) { } : this(topicName, 0)
{
}
/// <summary> /// <summary>
/// Not support /// Not support
/// </summary> /// </summary>
public CapSubscribeAttribute(string topicName, int partition) public CapSubscribeAttribute(string topicName, int partition)
: this(topicName, partition, 0) { } : this(topicName, partition, 0)
{
}
/// <summary> /// <summary>
/// Not support /// Not support
...@@ -27,9 +31,9 @@ namespace DotNetCore.CAP.Kafka ...@@ -27,9 +31,9 @@ namespace DotNetCore.CAP.Kafka
public long Offset { get; } public long Offset { get; }
public bool IsPartition { get { return Partition == 0; } } public bool IsPartition => Partition == 0;
public bool IsOffset { get { return Offset == 0; } } public bool IsOffset => Offset == 0;
public override string ToString() public override string ToString()
{ {
......
...@@ -16,14 +16,13 @@ namespace DotNetCore.CAP.Kafka ...@@ -16,14 +16,13 @@ namespace DotNetCore.CAP.Kafka
{ {
public class KafkaJobProcessor : IJobProcessor public class KafkaJobProcessor : IJobProcessor
{ {
private readonly CapOptions _capOptions;
private readonly KafkaOptions _kafkaOptions; private readonly KafkaOptions _kafkaOptions;
private readonly CancellationTokenSource _cts; private readonly CancellationTokenSource _cts;
private readonly IServiceProvider _provider; private readonly IServiceProvider _provider;
private readonly ILogger _logger; private readonly ILogger _logger;
private TimeSpan _pollingDelay; private readonly TimeSpan _pollingDelay;
public KafkaJobProcessor( public KafkaJobProcessor(
IOptions<CapOptions> capOptions, IOptions<CapOptions> capOptions,
...@@ -32,11 +31,10 @@ namespace DotNetCore.CAP.Kafka ...@@ -32,11 +31,10 @@ namespace DotNetCore.CAP.Kafka
IServiceProvider provider) IServiceProvider provider)
{ {
_logger = logger; _logger = logger;
_capOptions = capOptions.Value;
_kafkaOptions = kafkaOptions.Value; _kafkaOptions = kafkaOptions.Value;
_provider = provider; _provider = provider;
_cts = new CancellationTokenSource(); _cts = new CancellationTokenSource();
_pollingDelay = TimeSpan.FromSeconds(_capOptions.PollingDelay); _pollingDelay = TimeSpan.FromSeconds(capOptions.Value.PollingDelay);
} }
public bool Waiting { get; private set; } public bool Waiting { get; private set; }
...@@ -85,28 +83,25 @@ namespace DotNetCore.CAP.Kafka ...@@ -85,28 +83,25 @@ namespace DotNetCore.CAP.Kafka
var provider = scopedContext.Provider; var provider = scopedContext.Provider;
var messageStore = provider.GetRequiredService<ICapMessageStore>(); var messageStore = provider.GetRequiredService<ICapMessageStore>();
var message = await messageStore.GetNextSentMessageToBeEnqueuedAsync(); var message = await messageStore.GetNextSentMessageToBeEnqueuedAsync();
if (message != null) if (message == null) return true;
try
{ {
try var sp = Stopwatch.StartNew();
{ message.StatusName = StatusName.Processing;
await messageStore.UpdateSentMessageAsync(message);
var sp = Stopwatch.StartNew();
message.StatusName = StatusName.Processing; await ExecuteJobAsync(message.KeyName, message.Content);
await messageStore.UpdateSentMessageAsync(message);
sp.Stop();
await ExecuteJobAsync(message.KeyName, message.Content);
message.StatusName = StatusName.Succeeded;
sp.Stop(); await messageStore.UpdateSentMessageAsync(message);
_logger.JobExecuted(sp.Elapsed.TotalSeconds);
message.StatusName = StatusName.Succeeded; }
await messageStore.UpdateSentMessageAsync(message); catch (Exception ex)
_logger.JobExecuted(sp.Elapsed.TotalSeconds); {
} _logger.ExceptionOccuredWhileExecutingJob(message.KeyName, ex);
catch (Exception ex) return false;
{
_logger.ExceptionOccuredWhileExecutingJob(message.KeyName, ex);
return false;
}
} }
} }
return true; return true;
......
...@@ -5,18 +5,18 @@ namespace DotNetCore.CAP.Kafka ...@@ -5,18 +5,18 @@ namespace DotNetCore.CAP.Kafka
{ {
internal static class LoggerExtensions internal static class LoggerExtensions
{ {
private static Action<ILogger, Exception> _collectingExpiredEntities; private static readonly Action<ILogger, Exception> _collectingExpiredEntities;
private static Action<ILogger, Exception> _installing; private static readonly Action<ILogger, Exception> _installing;
private static Action<ILogger, Exception> _installingError; private static readonly Action<ILogger, Exception> _installingError;
private static Action<ILogger, Exception> _installingSuccess; private static readonly Action<ILogger, Exception> _installingSuccess;
private static Action<ILogger, Exception> _jobFailed; private static readonly Action<ILogger, Exception> _jobFailed;
private static Action<ILogger, Exception> _jobFailedWillRetry; private static readonly Action<ILogger, Exception> _jobFailedWillRetry;
private static Action<ILogger, double, Exception> _jobExecuted; private static readonly Action<ILogger, double, Exception> _jobExecuted;
private static Action<ILogger, int, Exception> _jobRetrying; private static readonly Action<ILogger, int, Exception> _jobRetrying;
private static Action<ILogger, int, Exception> _jobCouldNotBeLoaded; private static readonly Action<ILogger, int, Exception> _jobCouldNotBeLoaded;
private static Action<ILogger, string, Exception> _exceptionOccuredWhileExecutingJob; private static readonly Action<ILogger, string, Exception> _exceptionOccuredWhileExecutingJob;
static LoggerExtensions() static LoggerExtensions()
{ {
......
...@@ -72,8 +72,7 @@ ...@@ -72,8 +72,7 @@
public int SocketWriteTimeout { get; set; } = DefaultConnectionTimeout; public int SocketWriteTimeout { get; set; } = DefaultConnectionTimeout;
/// <summary> /// <summary>
/// The port to connect on. <see cref="AmqpTcpEndpoint.UseDefaultPort"/> /// The port to connect on.
/// indicates the default for the protocol should be used.
/// </summary> /// </summary>
public int Port { get; set; } = -1; public int Port { get; set; } = -1;
} }
......
...@@ -14,14 +14,13 @@ namespace DotNetCore.CAP.RabbitMQ ...@@ -14,14 +14,13 @@ namespace DotNetCore.CAP.RabbitMQ
{ {
public class RabbitJobProcessor : IJobProcessor public class RabbitJobProcessor : IJobProcessor
{ {
private readonly CapOptions _capOptions; private readonly RabbitMQOptions _rabbitMqOptions;
private readonly RabbitMQOptions _rabbitMQOptions;
private readonly CancellationTokenSource _cts; private readonly CancellationTokenSource _cts;
private readonly IServiceProvider _provider; private readonly IServiceProvider _provider;
private readonly ILogger _logger; private readonly ILogger _logger;
private TimeSpan _pollingDelay; private readonly TimeSpan _pollingDelay;
public RabbitJobProcessor( public RabbitJobProcessor(
IOptions<CapOptions> capOptions, IOptions<CapOptions> capOptions,
...@@ -30,11 +29,12 @@ namespace DotNetCore.CAP.RabbitMQ ...@@ -30,11 +29,12 @@ namespace DotNetCore.CAP.RabbitMQ
IServiceProvider provider) IServiceProvider provider)
{ {
_logger = logger; _logger = logger;
_capOptions = capOptions.Value; _rabbitMqOptions = rabbitMQOptions.Value;
_rabbitMQOptions = rabbitMQOptions.Value;
_provider = provider; _provider = provider;
_cts = new CancellationTokenSource(); _cts = new CancellationTokenSource();
_pollingDelay = TimeSpan.FromSeconds(_capOptions.PollingDelay);
var capOptions1 = capOptions.Value;
_pollingDelay = TimeSpan.FromSeconds(capOptions1.PollingDelay);
} }
public bool Waiting { get; private set; } public bool Waiting { get; private set; }
...@@ -62,7 +62,8 @@ namespace DotNetCore.CAP.RabbitMQ ...@@ -62,7 +62,8 @@ namespace DotNetCore.CAP.RabbitMQ
var token = GetTokenToWaitOn(context); var token = GetTokenToWaitOn(context);
} }
await WaitHandleEx.WaitAnyAsync(WaitHandleEx.PulseEvent, context.CancellationToken.WaitHandle, _pollingDelay); await WaitHandleEx.WaitAnyAsync(WaitHandleEx.PulseEvent,
context.CancellationToken.WaitHandle, _pollingDelay);
} }
finally finally
{ {
...@@ -102,7 +103,7 @@ namespace DotNetCore.CAP.RabbitMQ ...@@ -102,7 +103,7 @@ namespace DotNetCore.CAP.RabbitMQ
} }
catch (Exception ex) catch (Exception ex)
{ {
_logger.ExceptionOccuredWhileExecutingJob(message.KeyName, ex); _logger.ExceptionOccuredWhileExecutingJob(message?.KeyName, ex);
return false; return false;
} }
} }
...@@ -113,14 +114,14 @@ namespace DotNetCore.CAP.RabbitMQ ...@@ -113,14 +114,14 @@ namespace DotNetCore.CAP.RabbitMQ
{ {
var factory = new ConnectionFactory() var factory = new ConnectionFactory()
{ {
HostName = _rabbitMQOptions.HostName, HostName = _rabbitMqOptions.HostName,
UserName = _rabbitMQOptions.UserName, UserName = _rabbitMqOptions.UserName,
Port = _rabbitMQOptions.Port, Port = _rabbitMqOptions.Port,
Password = _rabbitMQOptions.Password, Password = _rabbitMqOptions.Password,
VirtualHost = _rabbitMQOptions.VirtualHost, VirtualHost = _rabbitMqOptions.VirtualHost,
RequestedConnectionTimeout = _rabbitMQOptions.RequestedConnectionTimeout, RequestedConnectionTimeout = _rabbitMqOptions.RequestedConnectionTimeout,
SocketReadTimeout = _rabbitMQOptions.SocketReadTimeout, SocketReadTimeout = _rabbitMqOptions.SocketReadTimeout,
SocketWriteTimeout = _rabbitMQOptions.SocketWriteTimeout SocketWriteTimeout = _rabbitMqOptions.SocketWriteTimeout
}; };
using (var connection = factory.CreateConnection()) using (var connection = factory.CreateConnection())
...@@ -128,8 +129,8 @@ namespace DotNetCore.CAP.RabbitMQ ...@@ -128,8 +129,8 @@ namespace DotNetCore.CAP.RabbitMQ
{ {
var body = Encoding.UTF8.GetBytes(content); var body = Encoding.UTF8.GetBytes(content);
channel.ExchangeDeclare(_rabbitMQOptions.TopicExchangeName, _rabbitMQOptions.EXCHANGE_TYPE); channel.ExchangeDeclare(_rabbitMqOptions.TopicExchangeName, _rabbitMqOptions.EXCHANGE_TYPE);
channel.BasicPublish(exchange: _rabbitMQOptions.TopicExchangeName, channel.BasicPublish(exchange: _rabbitMqOptions.TopicExchangeName,
routingKey: topic, routingKey: topic,
basicProperties: null, basicProperties: null,
body: body); body: body);
......
...@@ -10,7 +10,7 @@ namespace DotNetCore.CAP.Abstractions ...@@ -10,7 +10,7 @@ namespace DotNetCore.CAP.Abstractions
public ConsumerInvokerContext(ConsumerContext consumerContext) public ConsumerInvokerContext(ConsumerContext consumerContext)
{ {
ConsumerContext = consumerContext ?? ConsumerContext = consumerContext ??
throw new ArgumentNullException(nameof(consumerContext)); throw new ArgumentNullException(nameof(consumerContext));
} }
public ConsumerContext ConsumerContext { get; set; } public ConsumerContext ConsumerContext { get; set; }
......
...@@ -23,6 +23,7 @@ namespace DotNetCore.CAP.Abstractions ...@@ -23,6 +23,7 @@ namespace DotNetCore.CAP.Abstractions
/// <param name="key">topic or exchange router key.</param> /// <param name="key">topic or exchange router key.</param>
/// <param name="candidates">the set of <see cref="ConsumerExecutorDescriptor"/> candidates.</param> /// <param name="candidates">the set of <see cref="ConsumerExecutorDescriptor"/> candidates.</param>
/// <returns></returns> /// <returns></returns>
ConsumerExecutorDescriptor SelectBestCandidate(string key, IReadOnlyList<ConsumerExecutorDescriptor> candidates); ConsumerExecutorDescriptor
SelectBestCandidate(string key, IReadOnlyList<ConsumerExecutorDescriptor> candidates);
} }
} }
\ No newline at end of file
...@@ -3,7 +3,7 @@ using Microsoft.Extensions.Primitives; ...@@ -3,7 +3,7 @@ using Microsoft.Extensions.Primitives;
namespace DotNetCore.CAP.Abstractions.ModelBinding namespace DotNetCore.CAP.Abstractions.ModelBinding
{ {
// <summary> /// <summary>
/// A context that contains operating information for model binding and validation. /// A context that contains operating information for model binding and validation.
/// </summary> /// </summary>
public class ModelBindingContext public class ModelBindingContext
......
...@@ -5,23 +5,18 @@ namespace DotNetCore.CAP.Abstractions ...@@ -5,23 +5,18 @@ namespace DotNetCore.CAP.Abstractions
/// <summary> /// <summary>
/// An abstract attribute that for kafka attribute or rabbitmq attribute /// An abstract attribute that for kafka attribute or rabbitmq attribute
/// </summary> /// </summary>
[AttributeUsage(AttributeTargets.Method | AttributeTargets.Class, Inherited = true, AllowMultiple = true)] [AttributeUsage(AttributeTargets.Method | AttributeTargets.Class, AllowMultiple = true)]
public abstract class TopicAttribute : Attribute public abstract class TopicAttribute : Attribute
{ {
private readonly string _name; protected TopicAttribute(string topicName)
public TopicAttribute(string topicName)
{ {
this._name = topicName; Name = topicName;
} }
/// <summary> /// <summary>
/// topic or exchange route key name. /// topic or exchange route key name.
/// </summary> /// </summary>
public string Name public string Name { get; }
{
get { return _name; }
}
/// <summary> /// <summary>
/// kafak --> groups.id /// kafak --> groups.id
......
using System; using System;
using DotNetCore.CAP;
using DotNetCore.CAP.Job; using DotNetCore.CAP.Job;
using Microsoft.Extensions.DependencyInjection;
namespace Microsoft.Extensions.DependencyInjection namespace DotNetCore.CAP
{ {
/// <summary> /// <summary>
/// Used to verify cap service was called on a ServiceCollection /// Used to verify cap service was called on a ServiceCollection
/// </summary> /// </summary>
public class CapMarkerService { } public class CapMarkerService
{
}
/// <summary> /// <summary>
/// Allows fine grained configuration of CAP services. /// Allows fine grained configuration of CAP services.
......
using DotNetCore.CAP.Job; using DotNetCore.CAP.Job;
namespace DotNetCore.CAP.Infrastructure namespace DotNetCore.CAP
{ {
/// <summary> /// <summary>
/// Represents all the options you can use to configure the system. /// Represents all the options you can use to configure the system.
......
...@@ -4,9 +4,8 @@ using DotNetCore.CAP.Infrastructure; ...@@ -4,9 +4,8 @@ using DotNetCore.CAP.Infrastructure;
namespace DotNetCore.CAP namespace DotNetCore.CAP
{ {
/// <summary> /// <summary>
/// Provides an abstraction for a store which manages consistent message. /// Provides an abstraction for a store which manages CAP message.
/// </summary> /// </summary>
/// <typeparam name="ConsistencyMessage"></typeparam>
public interface ICapMessageStore public interface ICapMessageStore
{ {
/// <summary> /// <summary>
...@@ -20,8 +19,10 @@ namespace DotNetCore.CAP ...@@ -20,8 +19,10 @@ namespace DotNetCore.CAP
/// </summary> /// </summary>
/// <param name="message">The type of <see cref="CapSentMessage"/>.</param> /// <param name="message">The type of <see cref="CapSentMessage"/>.</param>
/// <param name="statusName">The status name.</param> /// <param name="statusName">The status name.</param>
/// <param name="autoSaveChanges">auto save dbcontext changes.</param>
/// <returns></returns> /// <returns></returns>
Task<OperateResult> ChangeSentMessageStateAsync(CapSentMessage message, string statusName, bool autoSaveChanges = true); Task<OperateResult> ChangeSentMessageStateAsync(CapSentMessage message, string statusName,
bool autoSaveChanges = true);
/// <summary> /// <summary>
/// Fetches the next message to be executed. /// Fetches the next message to be executed.
...@@ -42,8 +43,6 @@ namespace DotNetCore.CAP ...@@ -42,8 +43,6 @@ namespace DotNetCore.CAP
Task<OperateResult> RemoveSentMessageAsync(CapSentMessage message); Task<OperateResult> RemoveSentMessageAsync(CapSentMessage message);
/// <summary> /// <summary>
/// Creates a new message in a store as an asynchronous operation. /// Creates a new message in a store as an asynchronous operation.
/// </summary> /// </summary>
...@@ -56,8 +55,10 @@ namespace DotNetCore.CAP ...@@ -56,8 +55,10 @@ namespace DotNetCore.CAP
/// </summary> /// </summary>
/// <param name="message">The type of <see cref="CapReceivedMessage"/>.</param> /// <param name="message">The type of <see cref="CapReceivedMessage"/>.</param>
/// <param name="statusName">The status name.</param> /// <param name="statusName">The status name.</param>
/// <param name="autoSaveChanges">auto save dbcontext changes.</param>
/// <returns></returns> /// <returns></returns>
Task<OperateResult> ChangeReceivedMessageStateAsync(CapReceivedMessage message, string statusName, bool autoSaveChanges = true); Task<OperateResult> ChangeReceivedMessageStateAsync(CapReceivedMessage message, string statusName,
bool autoSaveChanges = true);
/// <summary> /// <summary>
/// Fetches the next message to be executed. /// Fetches the next message to be executed.
......
...@@ -45,10 +45,10 @@ namespace DotNetCore.CAP ...@@ -45,10 +45,10 @@ namespace DotNetCore.CAP
var message = new CapSentMessage var message = new CapSentMessage
{ {
KeyName = topic, KeyName = topic,
Content = content Content = content,
StatusName = StatusName.Enqueued
}; };
message.StatusName = StatusName.Enqueued;
await _store.StoreSentMessageAsync(message); await _store.StoreSentMessageAsync(message);
WaitHandleEx.PulseEvent.Set(); WaitHandleEx.PulseEvent.Set();
......
...@@ -103,12 +103,12 @@ namespace DotNetCore.CAP ...@@ -103,12 +103,12 @@ namespace DotNetCore.CAP
var invoker = _consumerInvokerFactory.CreateInvoker(consumerContext); var invoker = _consumerInvokerFactory.CreateInvoker(consumerContext);
invoker.InvokeAsync(); invoker.InvokeAsync();
messageStore.ChangeReceivedMessageStateAsync(capMessage,StatusName.Succeeded).Wait(); messageStore.ChangeReceivedMessageStateAsync(capMessage, StatusName.Succeeded).Wait();
} }
catch (Exception ex) catch (Exception ex)
{ {
_logger.ConsumerMethodExecutingFailed(executeDescriptor.MethodInfo.Name, ex); _logger.ConsumerMethodExecutingFailed(executeDescriptor?.MethodInfo.Name, ex);
} }
} }
} }
...@@ -126,7 +126,7 @@ namespace DotNetCore.CAP ...@@ -126,7 +126,7 @@ namespace DotNetCore.CAP
try try
{ {
_compositeTask.Wait((int)TimeSpan.FromSeconds(60).TotalMilliseconds); _compositeTask.Wait((int) TimeSpan.FromSeconds(60).TotalMilliseconds);
} }
catch (AggregateException ex) catch (AggregateException ex)
{ {
......
...@@ -13,13 +13,13 @@ namespace DotNetCore.CAP.Infrastructure ...@@ -13,13 +13,13 @@ namespace DotNetCore.CAP.Infrastructure
/// <remarks> /// <remarks>
/// The Id property is initialized to from a new GUID string value. /// The Id property is initialized to from a new GUID string value.
/// </remarks> /// </remarks>
public CapMessage() protected CapMessage()
{ {
Id = Guid.NewGuid().ToString(); Id = Guid.NewGuid().ToString();
Added = DateTime.Now; Added = DateTime.Now;
} }
public CapMessage(MessageBase message) protected CapMessage(MessageBase message)
{ {
KeyName = message.KeyName; KeyName = message.KeyName;
Content = message.Content; Content = message.Content;
......
...@@ -4,10 +4,10 @@ ...@@ -4,10 +4,10 @@
{ {
public CapReceivedMessage() public CapReceivedMessage()
{ {
} }
public CapReceivedMessage(MessageBase baseMessage) : base(baseMessage) public CapReceivedMessage(MessageBase baseMessage)
: base(baseMessage)
{ {
} }
} }
......
...@@ -2,6 +2,5 @@ ...@@ -2,6 +2,5 @@
{ {
public class CapSentMessage : CapMessage public class CapSentMessage : CapMessage
{ {
} }
} }
\ No newline at end of file
...@@ -7,24 +7,24 @@ namespace DotNetCore.CAP.Infrastructure ...@@ -7,24 +7,24 @@ namespace DotNetCore.CAP.Infrastructure
internal static class Helper internal static class Helper
{ {
private static readonly DateTime Epoch = new DateTime(1970, 1, 1, 0, 0, 0, DateTimeKind.Utc); private static readonly DateTime Epoch = new DateTime(1970, 1, 1, 0, 0, 0, DateTimeKind.Utc);
private static JsonSerializerSettings SerializerSettings; private static JsonSerializerSettings _serializerSettings;
public static void SetSerializerSettings(JsonSerializerSettings setting) public static void SetSerializerSettings(JsonSerializerSettings setting)
{ {
SerializerSettings = setting; _serializerSettings = setting;
} }
public static string ToJson(object value) public static string ToJson(object value)
{ {
return value != null return value != null
? JsonConvert.SerializeObject(value, SerializerSettings) ? JsonConvert.SerializeObject(value, _serializerSettings)
: null; : null;
} }
public static T FromJson<T>(string value) public static T FromJson<T>(string value)
{ {
return value != null return value != null
? JsonConvert.DeserializeObject<T>(value, SerializerSettings) ? JsonConvert.DeserializeObject<T>(value, _serializerSettings)
: default(T); : default(T);
} }
...@@ -33,14 +33,14 @@ namespace DotNetCore.CAP.Infrastructure ...@@ -33,14 +33,14 @@ namespace DotNetCore.CAP.Infrastructure
if (type == null) throw new ArgumentNullException(nameof(type)); if (type == null) throw new ArgumentNullException(nameof(type));
return value != null return value != null
? JsonConvert.DeserializeObject(value, type, SerializerSettings) ? JsonConvert.DeserializeObject(value, type, _serializerSettings)
: null; : null;
} }
public static long ToTimestamp(DateTime value) public static long ToTimestamp(DateTime value)
{ {
var elapsedTime = value - Epoch; var elapsedTime = value - Epoch;
return (long)elapsedTime.TotalSeconds; return (long) elapsedTime.TotalSeconds;
} }
public static DateTime FromTimestamp(long value) public static DateTime FromTimestamp(long value)
...@@ -65,17 +65,8 @@ namespace DotNetCore.CAP.Infrastructure ...@@ -65,17 +65,8 @@ namespace DotNetCore.CAP.Infrastructure
return false; return false;
} }
if (typeInfo.ContainsGenericParameters) return !typeInfo.ContainsGenericParameters
{ && typeInfo.Name.EndsWith("Controller", StringComparison.OrdinalIgnoreCase);
return false;
}
if (!typeInfo.Name.EndsWith("Controller", StringComparison.OrdinalIgnoreCase))
{
return false;
}
return true;
} }
} }
} }
\ No newline at end of file
...@@ -23,7 +23,7 @@ namespace DotNetCore.CAP.Infrastructure ...@@ -23,7 +23,7 @@ namespace DotNetCore.CAP.Infrastructure
var tcs = new TaskCompletionSource<bool>(); var tcs = new TaskCompletionSource<bool>();
registeredHandle = ThreadPool.RegisterWaitForSingleObject( registeredHandle = ThreadPool.RegisterWaitForSingleObject(
handle, handle,
(state, timedOut) => ((TaskCompletionSource<bool>)state).TrySetResult(!timedOut), (state, timedOut) => ((TaskCompletionSource<bool>) state).TrySetResult(!timedOut),
tcs, tcs,
timeout, timeout,
true); true);
...@@ -31,10 +31,7 @@ namespace DotNetCore.CAP.Infrastructure ...@@ -31,10 +31,7 @@ namespace DotNetCore.CAP.Infrastructure
} }
finally finally
{ {
if (registeredHandle != null) registeredHandle?.Unregister(null);
{
registeredHandle.Unregister(null);
}
} }
} }
} }
......
...@@ -24,9 +24,10 @@ namespace DotNetCore.CAP.Internal ...@@ -24,9 +24,10 @@ namespace DotNetCore.CAP.Internal
public IConsumerInvoker CreateInvoker(ConsumerContext consumerContext) public IConsumerInvoker CreateInvoker(ConsumerContext consumerContext)
{ {
var context = new ConsumerInvokerContext(consumerContext); var context = new ConsumerInvokerContext(consumerContext)
{
context.Result = new DefaultConsumerInvoker(_logger, _serviceProvider, _modelBinder, consumerContext); Result = new DefaultConsumerInvoker(_logger, _serviceProvider, _modelBinder, consumerContext)
};
return context.Result; return context.Result;
} }
......
...@@ -17,7 +17,7 @@ namespace DotNetCore.CAP.Internal ...@@ -17,7 +17,7 @@ namespace DotNetCore.CAP.Internal
{ {
bindingContext.Model = CreateModel(bindingContext); bindingContext.Model = CreateModel(bindingContext);
} }
bindingContext.Result = Helper.FromJson(bindingContext.Values, bindingContext.ModelType); bindingContext.Result = Helper.FromJson(bindingContext.Values, bindingContext.ModelType);
return Task.CompletedTask; return Task.CompletedTask;
...@@ -30,19 +30,17 @@ namespace DotNetCore.CAP.Internal ...@@ -30,19 +30,17 @@ namespace DotNetCore.CAP.Internal
throw new ArgumentNullException(nameof(bindingContext)); throw new ArgumentNullException(nameof(bindingContext));
} }
if (_modelCreator == null) if (_modelCreator != null) return _modelCreator();
var modelTypeInfo = bindingContext.ModelType.GetTypeInfo();
if (modelTypeInfo.IsAbstract || modelTypeInfo.GetConstructor(Type.EmptyTypes) == null)
{ {
var modelTypeInfo = bindingContext.ModelType.GetTypeInfo(); throw new InvalidOperationException();
if (modelTypeInfo.IsAbstract || modelTypeInfo.GetConstructor(Type.EmptyTypes) == null)
{
throw new InvalidOperationException();
}
_modelCreator = Expression
.Lambda<Func<object>>(Expression.New(bindingContext.ModelType))
.Compile();
} }
_modelCreator = Expression
.Lambda<Func<object>>(Expression.New(bindingContext.ModelType))
.Compile();
return _modelCreator(); return _modelCreator();
} }
} }
......
...@@ -9,9 +9,10 @@ namespace DotNetCore.CAP.Internal ...@@ -9,9 +9,10 @@ namespace DotNetCore.CAP.Internal
{ {
public class DefaultConsumerInvoker : IConsumerInvoker public class DefaultConsumerInvoker : IConsumerInvoker
{ {
protected readonly ILogger _logger; protected readonly ILogger Logger;
protected readonly IServiceProvider _serviceProvider; protected readonly IServiceProvider ServiceProvider;
protected readonly ConsumerContext _consumerContext; protected readonly ConsumerContext ConsumerContext;
private readonly IModelBinder _modelBinder; private readonly IModelBinder _modelBinder;
private readonly ObjectMethodExecutor _executor; private readonly ObjectMethodExecutor _executor;
...@@ -20,23 +21,25 @@ namespace DotNetCore.CAP.Internal ...@@ -20,23 +21,25 @@ namespace DotNetCore.CAP.Internal
IModelBinder modelBinder, IModelBinder modelBinder,
ConsumerContext consumerContext) ConsumerContext consumerContext)
{ {
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
_serviceProvider = serviceProvider;
_modelBinder = modelBinder; _modelBinder = modelBinder;
_consumerContext = consumerContext ?? throw new ArgumentNullException(nameof(consumerContext)); _executor = ObjectMethodExecutor.Create(ConsumerContext.ConsumerDescriptor.MethodInfo,
_executor = ObjectMethodExecutor.Create(_consumerContext.ConsumerDescriptor.MethodInfo, ConsumerContext.ConsumerDescriptor.ImplTypeInfo);
_consumerContext.ConsumerDescriptor.ImplTypeInfo);
Logger = logger ?? throw new ArgumentNullException(nameof(logger));
ServiceProvider = serviceProvider;
ConsumerContext = consumerContext ?? throw new ArgumentNullException(nameof(consumerContext));
} }
public Task InvokeAsync() public Task InvokeAsync()
{ {
using (_logger.BeginScope("consumer invoker begin")) using (Logger.BeginScope("consumer invoker begin"))
{ {
_logger.LogDebug("Executing consumer Topic: {0}", _consumerContext.ConsumerDescriptor.MethodInfo.Name); Logger.LogDebug("Executing consumer Topic: {0}", ConsumerContext.ConsumerDescriptor.MethodInfo.Name);
var obj = ActivatorUtilities.GetServiceOrCreateInstance(_serviceProvider, _consumerContext.ConsumerDescriptor.ImplTypeInfo.AsType()); var obj = ActivatorUtilities.GetServiceOrCreateInstance(ServiceProvider,
ConsumerContext.ConsumerDescriptor.ImplTypeInfo.AsType());
var value = _consumerContext.DeliverMessage.Content; var value = ConsumerContext.DeliverMessage.Content;
if (_executor.MethodParameters.Length > 0) if (_executor.MethodParameters.Length > 0)
{ {
......
...@@ -18,19 +18,15 @@ namespace DotNetCore.CAP.Internal ...@@ -18,19 +18,15 @@ namespace DotNetCore.CAP.Internal
/// <summary> /// <summary>
/// Creates a new <see cref="DefaultConsumerServiceSelector"/>. /// Creates a new <see cref="DefaultConsumerServiceSelector"/>.
/// </summary> /// </summary>
/// <param name="serviceProvider"></param>
public DefaultConsumerServiceSelector(IServiceProvider serviceProvider) public DefaultConsumerServiceSelector(IServiceProvider serviceProvider)
{ {
_serviceProvider = serviceProvider; _serviceProvider = serviceProvider;
} }
/// <summary> /// <summary>
/// Selects the best <see cref="ConsumerExecutorDescriptor"/> candidate from <paramref name="candidates"/> for the /// Selects the best <see cref="ConsumerExecutorDescriptor"/> candidate from <paramref name="executeDescriptor"/> for the
/// current message associated. /// current message associated.
/// </summary> /// </summary>
/// <param name="key"></param>
/// <param name="executeDescriptor"></param>
/// <returns></returns>
public ConsumerExecutorDescriptor SelectBestCandidate(string key, public ConsumerExecutorDescriptor SelectBestCandidate(string key,
IReadOnlyList<ConsumerExecutorDescriptor> executeDescriptor) IReadOnlyList<ConsumerExecutorDescriptor> executeDescriptor)
{ {
...@@ -49,7 +45,7 @@ namespace DotNetCore.CAP.Internal ...@@ -49,7 +45,7 @@ namespace DotNetCore.CAP.Internal
} }
private IReadOnlyList<ConsumerExecutorDescriptor> FindConsumersFromInterfaceTypes( private static IEnumerable<ConsumerExecutorDescriptor> FindConsumersFromInterfaceTypes(
IServiceProvider provider) IServiceProvider provider)
{ {
var executorDescriptorList = new List<ConsumerExecutorDescriptor>(); var executorDescriptorList = new List<ConsumerExecutorDescriptor>();
...@@ -74,7 +70,7 @@ namespace DotNetCore.CAP.Internal ...@@ -74,7 +70,7 @@ namespace DotNetCore.CAP.Internal
return executorDescriptorList; return executorDescriptorList;
} }
private IReadOnlyList<ConsumerExecutorDescriptor> FindConsumersFromControllerTypes( private static IEnumerable<ConsumerExecutorDescriptor> FindConsumersFromControllerTypes(
IServiceProvider provider) IServiceProvider provider)
{ {
var executorDescriptorList = new List<ConsumerExecutorDescriptor>(); var executorDescriptorList = new List<ConsumerExecutorDescriptor>();
...@@ -83,24 +79,23 @@ namespace DotNetCore.CAP.Internal ...@@ -83,24 +79,23 @@ namespace DotNetCore.CAP.Internal
foreach (var controller in controllers) foreach (var controller in controllers)
{ {
var typeInfo = controller.GetType().GetTypeInfo(); var typeInfo = controller.GetType().GetTypeInfo();
//double check //double check
if (Helper.IsController(typeInfo)) if (!Helper.IsController(typeInfo)) continue;
foreach (var method in typeInfo.DeclaredMethods)
{ {
foreach (var method in typeInfo.DeclaredMethods) var topicAttr = method.GetCustomAttribute<TopicAttribute>(true);
{ if (topicAttr == null) continue;
var topicAttr = method.GetCustomAttribute<TopicAttribute>(true);
if (topicAttr == null) continue;
executorDescriptorList.Add(InitDescriptor(topicAttr, method, typeInfo)); executorDescriptorList.Add(InitDescriptor(topicAttr, method, typeInfo));
}
} }
continue;
} }
return executorDescriptorList; return executorDescriptorList;
} }
private ConsumerExecutorDescriptor InitDescriptor( private static ConsumerExecutorDescriptor InitDescriptor(
TopicAttribute attr, TopicAttribute attr,
MethodInfo methodInfo, MethodInfo methodInfo,
TypeInfo implType) TypeInfo implType)
......
...@@ -15,14 +15,13 @@ namespace DotNetCore.CAP.Internal ...@@ -15,14 +15,13 @@ namespace DotNetCore.CAP.Internal
public ConcurrentDictionary<string, ConsumerExecutorDescriptor> GetCandidatesMethods(IServiceProvider provider) public ConcurrentDictionary<string, ConsumerExecutorDescriptor> GetCandidatesMethods(IServiceProvider provider)
{ {
if (Entries.Count == 0) if (Entries.Count != 0) return Entries;
{
var executorCollection = _selector.SelectCandidates(provider); var executorCollection = _selector.SelectCandidates(provider);
foreach (var item in executorCollection) foreach (var item in executorCollection)
{ {
Entries.GetOrAdd(item.Attribute.Name, item); Entries.GetOrAdd(item.Attribute.Name, item);
}
} }
return Entries; return Entries;
} }
...@@ -38,6 +37,6 @@ namespace DotNetCore.CAP.Internal ...@@ -38,6 +37,6 @@ namespace DotNetCore.CAP.Internal
} }
public ConcurrentDictionary<string, ConsumerExecutorDescriptor> Entries { get; } = public ConcurrentDictionary<string, ConsumerExecutorDescriptor> Entries { get; } =
new ConcurrentDictionary<string, ConsumerExecutorDescriptor>(); new ConcurrentDictionary<string, ConsumerExecutorDescriptor>();
} }
} }
\ No newline at end of file
...@@ -16,12 +16,7 @@ namespace DotNetCore.CAP.Internal ...@@ -16,12 +16,7 @@ namespace DotNetCore.CAP.Internal
/// <param name="typeAttributes">The set of attributes for the <see cref="Type"/>.</param> /// <param name="typeAttributes">The set of attributes for the <see cref="Type"/>.</param>
public ModelAttributes(IEnumerable<object> typeAttributes) public ModelAttributes(IEnumerable<object> typeAttributes)
{ {
if (typeAttributes == null) Attributes = typeAttributes?.ToArray() ?? throw new ArgumentNullException(nameof(typeAttributes));
{
throw new ArgumentNullException(nameof(typeAttributes));
}
Attributes = typeAttributes.ToArray();
TypeAttributes = Attributes; TypeAttributes = Attributes;
} }
...@@ -34,18 +29,12 @@ namespace DotNetCore.CAP.Internal ...@@ -34,18 +29,12 @@ namespace DotNetCore.CAP.Internal
/// </param> /// </param>
public ModelAttributes(IEnumerable<object> propertyAttributes, IEnumerable<object> typeAttributes) public ModelAttributes(IEnumerable<object> propertyAttributes, IEnumerable<object> typeAttributes)
{ {
if (propertyAttributes == null) PropertyAttributes = propertyAttributes?.ToArray()
{ ?? throw new ArgumentNullException(nameof(propertyAttributes));
throw new ArgumentNullException(nameof(propertyAttributes));
}
if (typeAttributes == null) TypeAttributes = typeAttributes?.ToArray()
{ ?? throw new ArgumentNullException(nameof(typeAttributes));
throw new ArgumentNullException(nameof(typeAttributes));
}
PropertyAttributes = propertyAttributes.ToArray();
TypeAttributes = typeAttributes.ToArray();
Attributes = PropertyAttributes.Concat(TypeAttributes).ToArray(); Attributes = PropertyAttributes.Concat(TypeAttributes).ToArray();
} }
......
...@@ -15,27 +15,20 @@ namespace DotNetCore.CAP.Internal ...@@ -15,27 +15,20 @@ namespace DotNetCore.CAP.Internal
private readonly ConsumerMethodExecutor _executor; private readonly ConsumerMethodExecutor _executor;
private static readonly MethodInfo _convertOfTMethod = private static readonly MethodInfo _convertOfTMethod =
typeof(ObjectMethodExecutor).GetRuntimeMethods().Single(methodInfo => methodInfo.Name == nameof(ObjectMethodExecutor.Convert)); typeof(ObjectMethodExecutor).GetRuntimeMethods()
.Single(methodInfo => methodInfo.Name == nameof(Convert));
private ObjectMethodExecutor(MethodInfo methodInfo, TypeInfo targetTypeInfo) private ObjectMethodExecutor(MethodInfo methodInfo, TypeInfo targetTypeInfo)
{ {
if (methodInfo == null) MethodInfo = methodInfo ?? throw new ArgumentNullException(nameof(methodInfo));
{
throw new ArgumentNullException(nameof(methodInfo));
}
MethodInfo = methodInfo;
TargetTypeInfo = targetTypeInfo; TargetTypeInfo = targetTypeInfo;
MethodParameters = methodInfo.GetParameters(); MethodParameters = methodInfo.GetParameters();
MethodReturnType = methodInfo.ReturnType; MethodReturnType = methodInfo.ReturnType;
IsMethodAsync = typeof(Task).IsAssignableFrom(MethodReturnType); IsMethodAsync = typeof(Task).IsAssignableFrom(MethodReturnType);
TaskGenericType = IsMethodAsync ? GetTaskInnerTypeOrNull(MethodReturnType) : null; TaskGenericType = IsMethodAsync ? GetTaskInnerTypeOrNull(MethodReturnType) : null;
//IsTypeAssignableFromIActionResult = typeof(IActionResult).IsAssignableFrom(TaskGenericType ?? MethodReturnType);
if (IsMethodAsync && TaskGenericType != null) if (IsMethodAsync && TaskGenericType != null)
{ {
// For backwards compatibility we're creating a sync-executor for an async method. This was
// supported in the past even though MVC wouldn't have called it.
_executor = GetExecutor(methodInfo, targetTypeInfo); _executor = GetExecutor(methodInfo, targetTypeInfo);
_executorAsync = GetExecutorAsync(TaskGenericType, methodInfo, targetTypeInfo); _executorAsync = GetExecutorAsync(TaskGenericType, methodInfo, targetTypeInfo);
} }
...@@ -129,21 +122,25 @@ namespace DotNetCore.CAP.Internal ...@@ -129,21 +122,25 @@ namespace DotNetCore.CAP.Internal
{ {
// must coerce methodCall to match ActionExecutor signature // must coerce methodCall to match ActionExecutor signature
var castMethodCall = Expression.Convert(methodCall, typeof(object)); var castMethodCall = Expression.Convert(methodCall, typeof(object));
var lambda = Expression.Lambda<ConsumerMethodExecutor>(castMethodCall, targetParameter, parametersParameter); var lambda =
Expression.Lambda<ConsumerMethodExecutor>(castMethodCall, targetParameter, parametersParameter);
return lambda.Compile(); return lambda.Compile();
} }
} }
private static ConsumerMethodExecutor WrapVoidAction(VoidActionExecutor executor) private static ConsumerMethodExecutor WrapVoidAction(VoidActionExecutor executor)
{ {
return delegate (object target, object[] parameters) return delegate(object target, object[] parameters)
{ {
executor(target, parameters); executor(target, parameters);
return null; return null;
}; };
} }
private static ConsumerMethodExecutorAsync GetExecutorAsync(Type taskInnerType, MethodInfo methodInfo, TypeInfo targetTypeInfo) private static ConsumerMethodExecutorAsync GetExecutorAsync(
Type taskInnerType,
MethodInfo methodInfo,
TypeInfo targetTypeInfo)
{ {
// Parameters to executor // Parameters to executor
var targetParameter = Expression.Parameter(typeof(object), "target"); var targetParameter = Expression.Parameter(typeof(object), "target");
...@@ -167,7 +164,10 @@ namespace DotNetCore.CAP.Internal ...@@ -167,7 +164,10 @@ namespace DotNetCore.CAP.Internal
var methodCall = Expression.Call(instanceCast, methodInfo, parameters); var methodCall = Expression.Call(instanceCast, methodInfo, parameters);
var coerceMethodCall = GetCoerceMethodCallExpression(taskInnerType, methodCall, methodInfo); var coerceMethodCall = GetCoerceMethodCallExpression(taskInnerType, methodCall, methodInfo);
var lambda = Expression.Lambda<ConsumerMethodExecutorAsync>(coerceMethodCall, targetParameter, parametersParameter);
var lambda = Expression.Lambda<ConsumerMethodExecutorAsync>(coerceMethodCall,
targetParameter, parametersParameter);
return lambda.Compile(); return lambda.Compile();
} }
...@@ -181,8 +181,6 @@ namespace DotNetCore.CAP.Internal ...@@ -181,8 +181,6 @@ namespace DotNetCore.CAP.Internal
MethodInfo methodInfo) MethodInfo methodInfo)
{ {
var castMethodCall = Expression.Convert(methodCall, typeof(object)); var castMethodCall = Expression.Convert(methodCall, typeof(object));
// for: public Task<T> Action()
// constructs: return (Task<object>)Convert<T>((Task<T>)result)
var genericMethodInfo = _convertOfTMethod.MakeGenericMethod(taskValueType); var genericMethodInfo = _convertOfTMethod.MakeGenericMethod(taskValueType);
var genericMethodCall = Expression.Call(null, genericMethodInfo, castMethodCall); var genericMethodCall = Expression.Call(null, genericMethodInfo, castMethodCall);
var convertedResult = Expression.Convert(genericMethodCall, typeof(Task<object>)); var convertedResult = Expression.Convert(genericMethodCall, typeof(Task<object>));
...@@ -194,7 +192,7 @@ namespace DotNetCore.CAP.Internal ...@@ -194,7 +192,7 @@ namespace DotNetCore.CAP.Internal
/// </summary> /// </summary>
private static async Task<object> CastToObject<T>(Task<T> task) private static async Task<object> CastToObject<T>(Task<T> task)
{ {
return (object)await task; return (object) await task;
} }
private static Type GetTaskInnerTypeOrNull(Type type) private static Type GetTaskInnerTypeOrNull(Type type)
...@@ -281,7 +279,7 @@ namespace DotNetCore.CAP.Internal ...@@ -281,7 +279,7 @@ namespace DotNetCore.CAP.Internal
private static Task<object> Convert<T>(object taskAsObject) private static Task<object> Convert<T>(object taskAsObject)
{ {
var task = (Task<T>)taskAsObject; var task = (Task<T>) taskAsObject;
return CastToObject<T>(task); return CastToObject<T>(task);
} }
......
...@@ -5,7 +5,7 @@ namespace DotNetCore.CAP.Job ...@@ -5,7 +5,7 @@ namespace DotNetCore.CAP.Job
{ {
public class ComputedCronJob public class ComputedCronJob
{ {
private CronJobRegistry.Entry _entry; private readonly CronJobRegistry.Entry _entry;
public ComputedCronJob() public ComputedCronJob()
{ {
......
...@@ -96,7 +96,7 @@ namespace DotNetCore.CAP.Job ...@@ -96,7 +96,7 @@ namespace DotNetCore.CAP.Job
/// <param name="minute">The minute in which the schedule will be activated (0-59).</param> /// <param name="minute">The minute in which the schedule will be activated (0-59).</param>
public static string Weekly(DayOfWeek dayOfWeek, int hour, int minute) public static string Weekly(DayOfWeek dayOfWeek, int hour, int minute)
{ {
return string.Format("{0} {1} * * {2}", minute, hour, (int)dayOfWeek); return string.Format("{0} {1} * * {2}", minute, hour, (int) dayOfWeek);
} }
/// <summary> /// <summary>
...@@ -192,7 +192,7 @@ namespace DotNetCore.CAP.Job ...@@ -192,7 +192,7 @@ namespace DotNetCore.CAP.Job
/// <param name="minute">The minute in which the schedule will be activated (0-59).</param> /// <param name="minute">The minute in which the schedule will be activated (0-59).</param>
public static string Yearly(int month, int day, int hour, int minute) public static string Yearly(int month, int day, int hour, int minute)
{ {
return string.Format("{0} {1} {2} {3} *", minute, hour, day, month); return $"{minute} {hour} {day} {month} *";
} }
} }
} }
\ No newline at end of file
...@@ -5,13 +5,11 @@ namespace DotNetCore.CAP.Job ...@@ -5,13 +5,11 @@ namespace DotNetCore.CAP.Job
{ {
public class DefaultCronJobRegistry : CronJobRegistry public class DefaultCronJobRegistry : CronJobRegistry
{ {
private readonly CapOptions _options;
public DefaultCronJobRegistry(IOptions<CapOptions> options) public DefaultCronJobRegistry(IOptions<CapOptions> options)
{ {
_options = options.Value; var options1 = options.Value;
RegisterJob<CapJob>(nameof(DefaultCronJobRegistry), _options.CronExp, RetryBehavior.DefaultRetry); RegisterJob<CapJob>(nameof(DefaultCronJobRegistry), options1.CronExp, RetryBehavior.DefaultRetry);
} }
} }
} }
\ No newline at end of file
...@@ -7,9 +7,9 @@ namespace DotNetCore.CAP.Job ...@@ -7,9 +7,9 @@ namespace DotNetCore.CAP.Job
{ {
public abstract class CronJobRegistry public abstract class CronJobRegistry
{ {
private List<Entry> _entries; private readonly List<Entry> _entries;
public CronJobRegistry() protected CronJobRegistry()
{ {
_entries = new List<Entry>(); _entries = new List<Entry>();
} }
......
...@@ -57,7 +57,6 @@ namespace DotNetCore.CAP.Job ...@@ -57,7 +57,6 @@ namespace DotNetCore.CAP.Job
{ {
_logger.ReceivedMessageRetryExecutingFailed(nextReceivedMessage.KeyName, ex); _logger.ReceivedMessageRetryExecutingFailed(nextReceivedMessage.KeyName, ex);
} }
} }
} }
} }
......
...@@ -10,9 +10,9 @@ namespace DotNetCore.CAP.Job ...@@ -10,9 +10,9 @@ namespace DotNetCore.CAP.Job
{ {
public class CronJobProcessor : IJobProcessor public class CronJobProcessor : IJobProcessor
{ {
private ILogger _logger; private readonly ILogger _logger;
private IServiceProvider _provider; private IServiceProvider _provider;
private DefaultCronJobRegistry _jobRegistry; private readonly DefaultCronJobRegistry _jobRegistry;
public CronJobProcessor( public CronJobProcessor(
DefaultCronJobRegistry jobRegistry, DefaultCronJobRegistry jobRegistry,
...@@ -105,12 +105,7 @@ namespace DotNetCore.CAP.Job ...@@ -105,12 +105,7 @@ namespace DotNetCore.CAP.Job
if (success) if (success)
{ {
//var connection = provider.GetRequiredService<IStorageConnection>();
//await connection.AttachCronJobAsync(computedJob.Job);
computedJob.Update(DateTime.UtcNow); computedJob.Update(DateTime.UtcNow);
//await connection.UpdateCronJobAsync(computedJob.Job);
} }
} }
} }
......
...@@ -6,8 +6,8 @@ namespace DotNetCore.CAP.Job ...@@ -6,8 +6,8 @@ namespace DotNetCore.CAP.Job
{ {
public class InfiniteRetryProcessor : IJobProcessor public class InfiniteRetryProcessor : IJobProcessor
{ {
private IJobProcessor _inner; private readonly IJobProcessor _inner;
private ILogger _logger; private readonly ILogger _logger;
public InfiniteRetryProcessor( public InfiniteRetryProcessor(
IJobProcessor inner, IJobProcessor inner,
......
...@@ -4,12 +4,11 @@ using System.Linq; ...@@ -4,12 +4,11 @@ using System.Linq;
using System.Threading; using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
using DotNetCore.CAP.Infrastructure; using DotNetCore.CAP.Infrastructure;
using DotNetCore.CAP.Job;
using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options; using Microsoft.Extensions.Options;
namespace DotNetCore.CAP namespace DotNetCore.CAP.Job
{ {
public class JobProcessingServer : IProcessingServer, IDisposable public class JobProcessingServer : IProcessingServer, IDisposable
{ {
...@@ -43,7 +42,6 @@ namespace DotNetCore.CAP ...@@ -43,7 +42,6 @@ namespace DotNetCore.CAP
public void Start() public void Start()
{ {
var processorCount = Environment.ProcessorCount; var processorCount = Environment.ProcessorCount;
processorCount = 1;
_processors = GetProcessors(processorCount); _processors = GetProcessors(processorCount);
_logger.ServerStarting(processorCount, 1); _logger.ServerStarting(processorCount, 1);
...@@ -53,7 +51,7 @@ namespace DotNetCore.CAP ...@@ -53,7 +51,7 @@ namespace DotNetCore.CAP
_cts.Token); _cts.Token);
var processorTasks = _processors var processorTasks = _processors
.Select(p => InfiniteRetry(p)) .Select(InfiniteRetry)
.Select(p => p.ProcessAsync(_context)); .Select(p => p.ProcessAsync(_context));
_compositeTask = Task.WhenAll(processorTasks); _compositeTask = Task.WhenAll(processorTasks);
} }
...@@ -70,7 +68,7 @@ namespace DotNetCore.CAP ...@@ -70,7 +68,7 @@ namespace DotNetCore.CAP
_cts.Cancel(); _cts.Cancel();
try try
{ {
_compositeTask.Wait((int)TimeSpan.FromSeconds(60).TotalMilliseconds); _compositeTask.Wait((int) TimeSpan.FromSeconds(60).TotalMilliseconds);
} }
catch (AggregateException ex) catch (AggregateException ex)
{ {
...@@ -97,7 +95,7 @@ namespace DotNetCore.CAP ...@@ -97,7 +95,7 @@ namespace DotNetCore.CAP
{ {
if (processor is CronJobProcessor) if (processor is CronJobProcessor)
{ {
if (i == 0) // only add first cronJob if (i == 0) // only add first cronJob
returnedProcessors.Add(processor); returnedProcessors.Add(processor);
} }
else else
......
...@@ -58,10 +58,7 @@ namespace DotNetCore.CAP.Job ...@@ -58,10 +58,7 @@ namespace DotNetCore.CAP.Job
public void Dispose() public void Dispose()
{ {
if (_scope != null) _scope?.Dispose();
{
_scope.Dispose();
}
} }
} }
} }
\ No newline at end of file
...@@ -10,15 +10,15 @@ namespace DotNetCore.CAP.Job ...@@ -10,15 +10,15 @@ namespace DotNetCore.CAP.Job
public static readonly RetryBehavior DefaultRetry; public static readonly RetryBehavior DefaultRetry;
public static readonly RetryBehavior NoRetry; public static readonly RetryBehavior NoRetry;
private static Random _random = new Random(); private static readonly Random _random = new Random();
private Func<int, int> _retryInThunk; private readonly Func<int, int> _retryInThunk;
static RetryBehavior() static RetryBehavior()
{ {
DefaultRetryCount = 25; DefaultRetryCount = 25;
DefaultRetryInThunk = retries => DefaultRetryInThunk = retries =>
(int)Math.Round(Math.Pow(retries - 1, 4) + 15 + (_random.Next(30) * (retries))); (int) Math.Round(Math.Pow(retries - 1, 4) + 15 + (_random.Next(30) * (retries)));
DefaultRetry = new RetryBehavior(true); DefaultRetry = new RetryBehavior(true);
NoRetry = new RetryBehavior(false); NoRetry = new RetryBehavior(false);
......
...@@ -8,20 +8,20 @@ namespace DotNetCore.CAP ...@@ -8,20 +8,20 @@ namespace DotNetCore.CAP
{ {
internal static class LoggerExtensions internal static class LoggerExtensions
{ {
private static Action<ILogger, int, int, Exception> _serverStarting; private static readonly Action<ILogger, int, int, Exception> _serverStarting;
private static Action<ILogger, Exception> _serverStartingError; private static readonly Action<ILogger, Exception> _serverStartingError;
private static Action<ILogger, Exception> _serverShuttingDown; private static readonly Action<ILogger, Exception> _serverShuttingDown;
private static Action<ILogger, string, Exception> _expectedOperationCanceledException; private static readonly Action<ILogger, string, Exception> _expectedOperationCanceledException;
private static Action<ILogger, Exception> _cronJobsNotFound; private static readonly Action<ILogger, Exception> _cronJobsNotFound;
private static Action<ILogger, int, Exception> _cronJobsScheduling; private static readonly Action<ILogger, int, Exception> _cronJobsScheduling;
private static Action<ILogger, string, double, Exception> _cronJobExecuted; private static readonly Action<ILogger, string, double, Exception> _cronJobExecuted;
private static Action<ILogger, string, Exception> _cronJobFailed; private static readonly Action<ILogger, string, Exception> _cronJobFailed;
private static Action<ILogger, string, string, Exception> _enqueuingSentMessage; private static readonly Action<ILogger, string, string, Exception> _enqueuingSentMessage;
private static Action<ILogger, string, string, Exception> _enqueuingReceivdeMessage; private static readonly Action<ILogger, string, string, Exception> _enqueuingReceivdeMessage;
private static Action<ILogger, string, Exception> _executingConsumerMethod; private static readonly Action<ILogger, string, Exception> _executingConsumerMethod;
private static Action<ILogger, string, Exception> _receivedMessageRetryExecuting; private static readonly Action<ILogger, string, Exception> _receivedMessageRetryExecuting;
static LoggerExtensions() static LoggerExtensions()
{ {
...@@ -31,9 +31,9 @@ namespace DotNetCore.CAP ...@@ -31,9 +31,9 @@ namespace DotNetCore.CAP
"Starting the processing server. Detected {MachineProcessorCount} machine processor(s). Initiating {ProcessorCount} job processor(s)."); "Starting the processing server. Detected {MachineProcessorCount} machine processor(s). Initiating {ProcessorCount} job processor(s).");
_serverStartingError = LoggerMessage.Define( _serverStartingError = LoggerMessage.Define(
LogLevel.Error, LogLevel.Error,
5, 5,
"Starting the processing server throw an exception."); "Starting the processing server throw an exception.");
_serverShuttingDown = LoggerMessage.Define( _serverShuttingDown = LoggerMessage.Define(
LogLevel.Debug, LogLevel.Debug,
......
...@@ -9,7 +9,6 @@ namespace DotNetCore.CAP ...@@ -9,7 +9,6 @@ namespace DotNetCore.CAP
public class OperateResult public class OperateResult
{ {
// ReSharper disable once InconsistentNaming // ReSharper disable once InconsistentNaming
private static readonly OperateResult _success = new OperateResult { Succeeded = true };
// ReSharper disable once FieldCanBeMadeReadOnly.Local // ReSharper disable once FieldCanBeMadeReadOnly.Local
private List<OperateError> _errors = new List<OperateError>(); private List<OperateError> _errors = new List<OperateError>();
...@@ -30,7 +29,7 @@ namespace DotNetCore.CAP ...@@ -30,7 +29,7 @@ namespace DotNetCore.CAP
/// Returns an <see cref="OperateResult"/> indicating a successful identity operation. /// Returns an <see cref="OperateResult"/> indicating a successful identity operation.
/// </summary> /// </summary>
/// <returns>An <see cref="OperateResult"/> indicating a successful operation.</returns> /// <returns>An <see cref="OperateResult"/> indicating a successful operation.</returns>
public static OperateResult Success => _success; public static OperateResult Success { get; } = new OperateResult {Succeeded = true};
/// <summary> /// <summary>
/// Creates an <see cref="OperateResult"/> indicating a failed operation, with a list of <paramref name="errors"/> if applicable. /// Creates an <see cref="OperateResult"/> indicating a failed operation, with a list of <paramref name="errors"/> if applicable.
...@@ -39,7 +38,7 @@ namespace DotNetCore.CAP ...@@ -39,7 +38,7 @@ namespace DotNetCore.CAP
/// <returns>An <see cref="OperateResult"/> indicating a failed operation, with a list of <paramref name="errors"/> if applicable.</returns> /// <returns>An <see cref="OperateResult"/> indicating a failed operation, with a list of <paramref name="errors"/> if applicable.</returns>
public static OperateResult Failed(params OperateError[] errors) public static OperateResult Failed(params OperateError[] errors)
{ {
var result = new OperateResult { Succeeded = false }; var result = new OperateResult {Succeeded = false};
if (errors != null) if (errors != null)
{ {
result._errors.AddRange(errors); result._errors.AddRange(errors);
...@@ -57,9 +56,9 @@ namespace DotNetCore.CAP ...@@ -57,9 +56,9 @@ namespace DotNetCore.CAP
/// </remarks> /// </remarks>
public override string ToString() public override string ToString()
{ {
return Succeeded ? return Succeeded
"Succeeded" : ? "Succeeded"
string.Format("{0} : {1}", "Failed", string.Join(",", Errors.Select(x => x.Code).ToList())); : string.Format("{0} : {1}", "Failed", string.Join(",", Errors.Select(x => x.Code).ToList()));
} }
} }
......
...@@ -7,6 +7,5 @@ namespace DotNetCore.CAP.EntityFrameworkCore.Test ...@@ -7,6 +7,5 @@ namespace DotNetCore.CAP.EntityFrameworkCore.Test
{ {
public class CapMessageStoreTest public class CapMessageStoreTest
{ {
} }
} }
\ No newline at end of file
...@@ -11,7 +11,8 @@ namespace DotNetCore.CAP.EntityFrameworkCore.Test ...@@ -11,7 +11,8 @@ namespace DotNetCore.CAP.EntityFrameworkCore.Test
private const string MasterDatabaseName = "master"; private const string MasterDatabaseName = "master";
private const string DefaultDatabaseName = @"DotNetCore.CAP.EntityFrameworkCore.Test"; private const string DefaultDatabaseName = @"DotNetCore.CAP.EntityFrameworkCore.Test";
private const string DefaultConnectionStringTemplate = @"Server=192.168.2.206;Initial Catalog={0};User Id=sa;Password=123123;MultipleActiveResultSets=True"; private const string DefaultConnectionStringTemplate =
@"Server=192.168.2.206;Initial Catalog={0};User Id=sa;Password=123123;MultipleActiveResultSets=True";
public static string GetDatabaseName() public static string GetDatabaseName()
{ {
...@@ -43,4 +44,4 @@ namespace DotNetCore.CAP.EntityFrameworkCore.Test ...@@ -43,4 +44,4 @@ namespace DotNetCore.CAP.EntityFrameworkCore.Test
return connection; return connection;
} }
} }
} }
\ No newline at end of file
...@@ -5,58 +5,58 @@ using Microsoft.EntityFrameworkCore; ...@@ -5,58 +5,58 @@ using Microsoft.EntityFrameworkCore;
namespace DotNetCore.CAP.EntityFrameworkCore.Test namespace DotNetCore.CAP.EntityFrameworkCore.Test
{ {
public abstract class DatabaseTestHost : TestHost public abstract class DatabaseTestHost : TestHost
{ {
private static bool _sqlObjectInstalled; private static bool _sqlObjectInstalled;
protected override void PostBuildServices() protected override void PostBuildServices()
{ {
base.PostBuildServices(); base.PostBuildServices();
InitializeDatabase(); InitializeDatabase();
} }
public override void Dispose() public override void Dispose()
{ {
DeleteAllData(); DeleteAllData();
base.Dispose(); base.Dispose();
} }
private void InitializeDatabase() private void InitializeDatabase()
{ {
if (!_sqlObjectInstalled) if (!_sqlObjectInstalled)
{ {
using (CreateScope()) using (CreateScope())
{ {
var context = GetService<TestDbContext>(); var context = GetService<TestDbContext>();
context.Database.EnsureDeleted(); context.Database.EnsureDeleted();
context.Database.Migrate(); context.Database.Migrate();
_sqlObjectInstalled = true; _sqlObjectInstalled = true;
} }
} }
} }
private void DeleteAllData() private void DeleteAllData()
{ {
using (CreateScope()) using (CreateScope())
{ {
var context = GetService<TestDbContext>(); var context = GetService<TestDbContext>();
var commands = new[] var commands = new[]
{ {
"DISABLE TRIGGER ALL ON ?", "DISABLE TRIGGER ALL ON ?",
"ALTER TABLE ? NOCHECK CONSTRAINT ALL", "ALTER TABLE ? NOCHECK CONSTRAINT ALL",
"DELETE FROM ?", "DELETE FROM ?",
"ALTER TABLE ? CHECK CONSTRAINT ALL", "ALTER TABLE ? CHECK CONSTRAINT ALL",
"ENABLE TRIGGER ALL ON ?" "ENABLE TRIGGER ALL ON ?"
}; };
foreach (var command in commands) foreach (var command in commands)
{ {
context.Database.GetDbConnection().Execute( context.Database.GetDbConnection().Execute(
"sp_MSforeachtable", "sp_MSforeachtable",
new { command1 = command }, new {command1 = command},
commandType: CommandType.StoredProcedure); commandType: CommandType.StoredProcedure);
} }
} }
} }
} }
} }
\ No newline at end of file
...@@ -23,7 +23,7 @@ namespace DotNetCore.CAP.EntityFrameworkCore.Test ...@@ -23,7 +23,7 @@ namespace DotNetCore.CAP.EntityFrameworkCore.Test
db.Attach(message).State = Microsoft.EntityFrameworkCore.EntityState.Added; db.Attach(message).State = Microsoft.EntityFrameworkCore.EntityState.Added;
db.SaveChanges(); db.SaveChanges();
Assert.True(db.CapSentMessages.Any(u => u.Id == guid)); Assert.True(db.CapSentMessages.Any(u => u.Id == guid));
Assert.NotNull(db.CapSentMessages.FirstOrDefault(u => u.StatusName == StatusName.Enqueued)); Assert.NotNull(db.CapSentMessages.FirstOrDefault(u => u.StatusName == StatusName.Enqueued));
} }
......
...@@ -20,10 +20,7 @@ namespace DotNetCore.CAP.EntityFrameworkCore.Test.Migrations ...@@ -20,10 +20,7 @@ namespace DotNetCore.CAP.EntityFrameworkCore.Test.Migrations
Retries = table.Column<int>(nullable: false), Retries = table.Column<int>(nullable: false),
StatusName = table.Column<string>(maxLength: 50, nullable: true) StatusName = table.Column<string>(maxLength: 50, nullable: true)
}, },
constraints: table => constraints: table => { table.PrimaryKey("PK_CapReceivedMessages", x => x.Id); });
{
table.PrimaryKey("PK_CapReceivedMessages", x => x.Id);
});
migrationBuilder.CreateTable( migrationBuilder.CreateTable(
name: "CapSentMessages", name: "CapSentMessages",
...@@ -37,10 +34,7 @@ namespace DotNetCore.CAP.EntityFrameworkCore.Test.Migrations ...@@ -37,10 +34,7 @@ namespace DotNetCore.CAP.EntityFrameworkCore.Test.Migrations
Retries = table.Column<int>(nullable: false), Retries = table.Column<int>(nullable: false),
StatusName = table.Column<string>(maxLength: 50, nullable: true) StatusName = table.Column<string>(maxLength: 50, nullable: true)
}, },
constraints: table => constraints: table => { table.PrimaryKey("PK_CapSentMessages", x => x.Id); });
{
table.PrimaryKey("PK_CapSentMessages", x => x.Id);
});
} }
protected override void Down(MigrationBuilder migrationBuilder) protected override void Down(MigrationBuilder migrationBuilder)
...@@ -52,4 +46,4 @@ namespace DotNetCore.CAP.EntityFrameworkCore.Test.Migrations ...@@ -52,4 +46,4 @@ namespace DotNetCore.CAP.EntityFrameworkCore.Test.Migrations
name: "CapSentMessages"); name: "CapSentMessages");
} }
} }
} }
\ No newline at end of file
...@@ -17,50 +17,50 @@ namespace DotNetCore.CAP.EntityFrameworkCore.Test.Migrations ...@@ -17,50 +17,50 @@ namespace DotNetCore.CAP.EntityFrameworkCore.Test.Migrations
.HasAnnotation("SqlServer:ValueGenerationStrategy", SqlServerValueGenerationStrategy.IdentityColumn); .HasAnnotation("SqlServer:ValueGenerationStrategy", SqlServerValueGenerationStrategy.IdentityColumn);
modelBuilder.Entity("DotNetCore.CAP.Infrastructure.CapReceivedMessage", b => modelBuilder.Entity("DotNetCore.CAP.Infrastructure.CapReceivedMessage", b =>
{ {
b.Property<string>("Id") b.Property<string>("Id")
.ValueGeneratedOnAdd(); .ValueGeneratedOnAdd();
b.Property<DateTime>("Added"); b.Property<DateTime>("Added");
b.Property<string>("Content"); b.Property<string>("Content");
b.Property<string>("KeyName"); b.Property<string>("KeyName");
b.Property<DateTime>("LastRun"); b.Property<DateTime>("LastRun");
b.Property<int>("Retries"); b.Property<int>("Retries");
b.Property<string>("StatusName") b.Property<string>("StatusName")
.HasMaxLength(50); .HasMaxLength(50);
b.HasKey("Id"); b.HasKey("Id");
b.ToTable("CapReceivedMessages"); b.ToTable("CapReceivedMessages");
}); });
modelBuilder.Entity("DotNetCore.CAP.Infrastructure.CapSentMessage", b => modelBuilder.Entity("DotNetCore.CAP.Infrastructure.CapSentMessage", b =>
{ {
b.Property<string>("Id") b.Property<string>("Id")
.ValueGeneratedOnAdd(); .ValueGeneratedOnAdd();
b.Property<DateTime>("Added"); b.Property<DateTime>("Added");
b.Property<string>("Content"); b.Property<string>("Content");
b.Property<string>("KeyName"); b.Property<string>("KeyName");
b.Property<DateTime>("LastRun"); b.Property<DateTime>("LastRun");
b.Property<int>("Retries"); b.Property<int>("Retries");
b.Property<string>("StatusName") b.Property<string>("StatusName")
.HasMaxLength(50); .HasMaxLength(50);
b.HasKey("Id"); b.HasKey("Id");
b.ToTable("CapSentMessages"); b.ToTable("CapSentMessages");
}); });
} }
} }
} }
\ No newline at end of file
...@@ -10,4 +10,4 @@ namespace DotNetCore.CAP.EntityFrameworkCore.Test ...@@ -10,4 +10,4 @@ namespace DotNetCore.CAP.EntityFrameworkCore.Test
optionsBuilder.UseSqlServer(connectionString); optionsBuilder.UseSqlServer(connectionString);
} }
} }
} }
\ No newline at end of file
...@@ -4,94 +4,94 @@ using Microsoft.Extensions.DependencyInjection; ...@@ -4,94 +4,94 @@ using Microsoft.Extensions.DependencyInjection;
namespace DotNetCore.CAP.EntityFrameworkCore.Test namespace DotNetCore.CAP.EntityFrameworkCore.Test
{ {
public abstract class TestHost : IDisposable public abstract class TestHost : IDisposable
{ {
protected IServiceCollection _services; protected IServiceCollection _services;
private IServiceProvider _provider; private IServiceProvider _provider;
private IServiceProvider _scopedProvider; private IServiceProvider _scopedProvider;
public TestHost() public TestHost()
{ {
CreateServiceCollection(); CreateServiceCollection();
PreBuildServices(); PreBuildServices();
BuildServices(); BuildServices();
PostBuildServices(); PostBuildServices();
} }
protected IServiceProvider Provider => _scopedProvider ?? _provider; protected IServiceProvider Provider => _scopedProvider ?? _provider;
private void CreateServiceCollection() private void CreateServiceCollection()
{ {
var services = new ServiceCollection(); var services = new ServiceCollection();
services.AddOptions(); services.AddOptions();
services.AddLogging(); services.AddLogging();
var connectionString = ConnectionUtil.GetConnectionString(); var connectionString = ConnectionUtil.GetConnectionString();
//services.AddSingleton(new SqlServerOptions { ConnectionString = connectionString }); //services.AddSingleton(new SqlServerOptions { ConnectionString = connectionString });
services.AddDbContext<TestDbContext>(options => options.UseSqlServer(connectionString)); services.AddDbContext<TestDbContext>(options => options.UseSqlServer(connectionString));
_services = services; _services = services;
} }
protected virtual void PreBuildServices() protected virtual void PreBuildServices()
{ {
} }
private void BuildServices() private void BuildServices()
{ {
_provider = _services.BuildServiceProvider(); _provider = _services.BuildServiceProvider();
} }
protected virtual void PostBuildServices() protected virtual void PostBuildServices()
{ {
} }
public IDisposable CreateScope() public IDisposable CreateScope()
{ {
var scope = CreateScope(_provider); var scope = CreateScope(_provider);
var loc = scope.ServiceProvider; var loc = scope.ServiceProvider;
_scopedProvider = loc; _scopedProvider = loc;
return new DelegateDisposable(() => return new DelegateDisposable(() =>
{ {
if (_scopedProvider == loc) if (_scopedProvider == loc)
{ {
_scopedProvider = null; _scopedProvider = null;
} }
scope.Dispose(); scope.Dispose();
}); });
} }
public IServiceScope CreateScope(IServiceProvider provider) public IServiceScope CreateScope(IServiceProvider provider)
{ {
var scope = provider.GetService<IServiceScopeFactory>().CreateScope(); var scope = provider.GetService<IServiceScopeFactory>().CreateScope();
return scope; return scope;
} }
public T GetService<T>() => Provider.GetService<T>(); public T GetService<T>() => Provider.GetService<T>();
public T Ensure<T>(ref T service) public T Ensure<T>(ref T service)
where T : class where T : class
=> service ?? (service = GetService<T>()); => service ?? (service = GetService<T>());
public virtual void Dispose() public virtual void Dispose()
{ {
(_provider as IDisposable)?.Dispose(); (_provider as IDisposable)?.Dispose();
} }
private class DelegateDisposable : IDisposable private class DelegateDisposable : IDisposable
{ {
private Action _dispose; private Action _dispose;
public DelegateDisposable(Action dispose) public DelegateDisposable(Action dispose)
{ {
_dispose = dispose; _dispose = dispose;
} }
public void Dispose() public void Dispose()
{ {
_dispose(); _dispose();
} }
} }
} }
} }
\ No newline at end of file
...@@ -17,7 +17,6 @@ namespace DotNetCore.CAP.Test ...@@ -17,7 +17,6 @@ namespace DotNetCore.CAP.Test
services.AddCap().AddMessageStore<MyMessageStore>(); services.AddCap().AddMessageStore<MyMessageStore>();
var thingy = services.BuildServiceProvider() var thingy = services.BuildServiceProvider()
.GetRequiredService<ICapMessageStore>() as MyMessageStore; .GetRequiredService<ICapMessageStore>() as MyMessageStore;
Assert.NotNull(thingy); Assert.NotNull(thingy);
...@@ -55,7 +54,8 @@ namespace DotNetCore.CAP.Test ...@@ -55,7 +54,8 @@ namespace DotNetCore.CAP.Test
throw new NotImplementedException(); throw new NotImplementedException();
} }
public Task SendAsync<T>(string topic, T contentObj) { public Task SendAsync<T>(string topic, T contentObj)
{
throw new NotImplementedException(); throw new NotImplementedException();
} }
} }
...@@ -71,12 +71,14 @@ namespace DotNetCore.CAP.Test ...@@ -71,12 +71,14 @@ namespace DotNetCore.CAP.Test
private class MyMessageStore : ICapMessageStore private class MyMessageStore : ICapMessageStore
{ {
public Task<OperateResult> ChangeReceivedMessageStateAsync(CapReceivedMessage message, string statusName, bool autoSaveChanges = true) public Task<OperateResult> ChangeReceivedMessageStateAsync(CapReceivedMessage message, string statusName,
bool autoSaveChanges = true)
{ {
throw new NotImplementedException(); throw new NotImplementedException();
} }
public Task<OperateResult> ChangeSentMessageStateAsync(CapSentMessage message, string statusName, bool autoSaveChanges = true) public Task<OperateResult> ChangeSentMessageStateAsync(CapSentMessage message, string statusName,
bool autoSaveChanges = true)
{ {
throw new NotImplementedException(); throw new NotImplementedException();
} }
......
...@@ -2,6 +2,5 @@ ...@@ -2,6 +2,5 @@
{ {
public class ConsistencyOptionsTest public class ConsistencyOptionsTest
{ {
} }
} }
\ No newline at end of file
...@@ -11,7 +11,6 @@ namespace DotNetCore.CAP.Test ...@@ -11,7 +11,6 @@ namespace DotNetCore.CAP.Test
{ {
public class ConsumerServiceSelectorTest public class ConsumerServiceSelectorTest
{ {
private IServiceProvider _provider; private IServiceProvider _provider;
public ConsumerServiceSelectorTest() public ConsumerServiceSelectorTest()
...@@ -45,7 +44,6 @@ namespace DotNetCore.CAP.Test ...@@ -45,7 +44,6 @@ namespace DotNetCore.CAP.Test
Assert.NotNull(bestCandidates.MethodInfo); Assert.NotNull(bestCandidates.MethodInfo);
Assert.Equal(bestCandidates.MethodInfo.ReturnType, typeof(Task)); Assert.Equal(bestCandidates.MethodInfo.ReturnType, typeof(Task));
} }
} }
public class CandidatesTopic : TopicAttribute public class CandidatesTopic : TopicAttribute
...@@ -55,8 +53,13 @@ namespace DotNetCore.CAP.Test ...@@ -55,8 +53,13 @@ namespace DotNetCore.CAP.Test
} }
} }
public interface IFooTest { } public interface IFooTest
public interface IBarTest { } {
}
public interface IBarTest
{
}
public class CandidatesFooTest : IFooTest, ICapSubscribe public class CandidatesFooTest : IFooTest, ICapSubscribe
{ {
...@@ -94,5 +97,4 @@ namespace DotNetCore.CAP.Test ...@@ -94,5 +97,4 @@ namespace DotNetCore.CAP.Test
Console.WriteLine("GetBar3() method has bee excuted."); Console.WriteLine("GetBar3() method has bee excuted.");
} }
} }
}
} \ No newline at end of file
...@@ -53,4 +53,4 @@ namespace DotNetCore.CAP.Test.Job ...@@ -53,4 +53,4 @@ namespace DotNetCore.CAP.Test.Job
Assert.True(computed.Next > now); Assert.True(computed.Next > now);
} }
} }
} }
\ No newline at end of file
...@@ -33,7 +33,7 @@ namespace DotNetCore.CAP.Test.Job ...@@ -33,7 +33,7 @@ namespace DotNetCore.CAP.Test.Job
services.AddTransient<DefaultCronJobRegistry>(); services.AddTransient<DefaultCronJobRegistry>();
services.AddLogging(); services.AddLogging();
services.AddSingleton(_options); services.AddSingleton(_options);
services.AddSingleton(_mockStorage.Object); services.AddSingleton(_mockStorage.Object);
_provider = services.BuildServiceProvider(); _provider = services.BuildServiceProvider();
_context = new ProcessingContext(_provider, null, _cancellationTokenSource.Token); _context = new ProcessingContext(_provider, null, _cancellationTokenSource.Token);
...@@ -182,4 +182,4 @@ namespace DotNetCore.CAP.Test.Job ...@@ -182,4 +182,4 @@ namespace DotNetCore.CAP.Test.Job
// public void Throw() { throw new Exception(); } // public void Throw() { throw new Exception(); }
//} //}
} }
} }
\ No newline at end of file
...@@ -7,12 +7,14 @@ namespace DotNetCore.CAP.Test ...@@ -7,12 +7,14 @@ namespace DotNetCore.CAP.Test
{ {
public class NoopMessageStore : ICapMessageStore public class NoopMessageStore : ICapMessageStore
{ {
public Task<OperateResult> ChangeReceivedMessageStateAsync(CapReceivedMessage message, string statusName, bool autoSaveChanges = true) public Task<OperateResult> ChangeReceivedMessageStateAsync(CapReceivedMessage message, string statusName,
bool autoSaveChanges = true)
{ {
throw new NotImplementedException(); throw new NotImplementedException();
} }
public Task<OperateResult> ChangeSentMessageStateAsync(CapSentMessage message, string statusName, bool autoSaveChanges = true) public Task<OperateResult> ChangeSentMessageStateAsync(CapSentMessage message, string statusName,
bool autoSaveChanges = true)
{ {
throw new NotImplementedException(); throw new NotImplementedException();
} }
......
...@@ -9,7 +9,6 @@ namespace DotNetCore.CAP.Test ...@@ -9,7 +9,6 @@ namespace DotNetCore.CAP.Test
{ {
public class ObjectMethodExecutorTest public class ObjectMethodExecutorTest
{ {
[Fact] [Fact]
public void CanCreateInstance() public void CanCreateInstance()
{ {
...@@ -72,7 +71,6 @@ namespace DotNetCore.CAP.Test ...@@ -72,7 +71,6 @@ namespace DotNetCore.CAP.Test
{ {
public void Foo() public void Foo()
{ {
} }
public int GetThree() public int GetThree()
...@@ -87,7 +85,6 @@ namespace DotNetCore.CAP.Test ...@@ -87,7 +85,6 @@ namespace DotNetCore.CAP.Test
public void WithDefaultValue(string aaa = "aaa", string bbb = "bbb") public void WithDefaultValue(string aaa = "aaa", string bbb = "bbb")
{ {
} }
} }
} }
\ No newline at end of file
...@@ -6,7 +6,8 @@ namespace DotNetCore.CAP.Test ...@@ -6,7 +6,8 @@ namespace DotNetCore.CAP.Test
public class OperateResultTest public class OperateResultTest
{ {
[Fact] [Fact]
public void VerifyDefaultConstructor() { public void VerifyDefaultConstructor()
{
var result = new OperateResult(); var result = new OperateResult();
Assert.False(result.Succeeded); Assert.False(result.Succeeded);
...@@ -14,7 +15,8 @@ namespace DotNetCore.CAP.Test ...@@ -14,7 +15,8 @@ namespace DotNetCore.CAP.Test
} }
[Fact] [Fact]
public void NullFaildUsesEmptyErrors() { public void NullFaildUsesEmptyErrors()
{
var result = OperateResult.Failed(); var result = OperateResult.Failed();
Assert.False(result.Succeeded); Assert.False(result.Succeeded);
......
...@@ -8,7 +8,6 @@ using Xunit; ...@@ -8,7 +8,6 @@ using Xunit;
namespace DotNetCore.CAP.Test namespace DotNetCore.CAP.Test
{ {
public abstract class MessageManagerTestBase public abstract class MessageManagerTestBase
{ {
private const string NullValue = "(null)"; private const string NullValue = "(null)";
...@@ -27,7 +26,8 @@ namespace DotNetCore.CAP.Test ...@@ -27,7 +26,8 @@ namespace DotNetCore.CAP.Test
services.AddSingleton<ILogger<ICapMessageStore>>(new TestLogger<ICapMessageStore>()); services.AddSingleton<ILogger<ICapMessageStore>>(new TestLogger<ICapMessageStore>());
} }
protected virtual ICapMessageStore CreateManager(object context = null, IServiceCollection services = null, Action<IServiceCollection> configureServices = null) protected virtual ICapMessageStore CreateManager(object context = null, IServiceCollection services = null,
Action<IServiceCollection> configureServices = null)
{ {
if (services == null) if (services == null)
{ {
...@@ -109,5 +109,4 @@ namespace DotNetCore.CAP.Test ...@@ -109,5 +109,4 @@ namespace DotNetCore.CAP.Test
Assert.Equal(message, storeMessage); Assert.Equal(message, storeMessage);
} }
} }
} }
\ No newline at end of file
...@@ -3,5 +3,4 @@ using DotNetCore.CAP.Infrastructure; ...@@ -3,5 +3,4 @@ using DotNetCore.CAP.Infrastructure;
namespace DotNetCore.CAP.Test namespace DotNetCore.CAP.Test
{ {
}
} \ No newline at end of file
...@@ -13,24 +13,28 @@ namespace DotNetCore.CAP.Test ...@@ -13,24 +13,28 @@ namespace DotNetCore.CAP.Test
{ {
public IList<string> LogMessages { get; } = new List<string>(); public IList<string> LogMessages { get; } = new List<string>();
public IDisposable BeginScope<TState>(TState state) { public IDisposable BeginScope<TState>(TState state)
{
LogMessages.Add(state?.ToString()); LogMessages.Add(state?.ToString());
return null; return null;
} }
public bool IsEnabled(LogLevel logLevel) { public bool IsEnabled(LogLevel logLevel)
{
return true; return true;
} }
public void Log<TState>(LogLevel logLevel, EventId eventId, TState state, Exception exception, Func<TState, Exception, string> formatter) { public void Log<TState>(LogLevel logLevel, EventId eventId, TState state, Exception exception,
if (formatter == null) { Func<TState, Exception, string> formatter)
{
if (formatter == null)
{
LogMessages.Add(state.ToString()); LogMessages.Add(state.ToString());
} }
else { else
{
LogMessages.Add(formatter(state, exception)); LogMessages.Add(formatter(state, exception));
} }
} }
} }
} }
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment