Commit 05125fc6 authored by yangxiaodong's avatar yangxiaodong

Refactor.

parent 00f42253
...@@ -16,7 +16,7 @@ namespace DotNetCore.CAP.Kafka ...@@ -16,7 +16,7 @@ namespace DotNetCore.CAP.Kafka
{ {
public class KafkaJobProcessor : IJobProcessor public class KafkaJobProcessor : IJobProcessor
{ {
private readonly ConsistencyOptions _options; private readonly CapOptions _options;
private readonly CancellationTokenSource _cts; private readonly CancellationTokenSource _cts;
private readonly IServiceProvider _provider; private readonly IServiceProvider _provider;
...@@ -26,7 +26,7 @@ namespace DotNetCore.CAP.Kafka ...@@ -26,7 +26,7 @@ namespace DotNetCore.CAP.Kafka
private TimeSpan _pollingDelay; private TimeSpan _pollingDelay;
public KafkaJobProcessor( public KafkaJobProcessor(
IOptions<ConsistencyOptions> options, IOptions<CapOptions> options,
ILogger<KafkaJobProcessor> logger, ILogger<KafkaJobProcessor> logger,
IServiceProvider provider) IServiceProvider provider)
{ {
......
...@@ -9,10 +9,10 @@ namespace DotNetCore.CAP.RabbitMQ ...@@ -9,10 +9,10 @@ namespace DotNetCore.CAP.RabbitMQ
{ {
public class RabbitMQProducerClient : ICapProducerService public class RabbitMQProducerClient : ICapProducerService
{ {
private readonly ConsistencyOptions _options; private readonly CapOptions _options;
private readonly ILogger _logger; private readonly ILogger _logger;
public RabbitMQProducerClient(IOptions<ConsistencyOptions> options, ILoggerFactory loggerFactory) public RabbitMQProducerClient(IOptions<CapOptions> options, ILoggerFactory loggerFactory)
{ {
_options = options.Value; _options = options.Value;
_logger = loggerFactory.CreateLogger(nameof(RabbitMQProducerClient)); _logger = loggerFactory.CreateLogger(nameof(RabbitMQProducerClient));
......
using System.Collections.Generic;
using DotNetCore.CAP.Abstractions;
namespace DotNetCore.CAP.Abstractions
{
/// <summary>
/// Defines an interface for selecting an cosumer service method to invoke for the current message.
/// </summary>
public interface IConsumerServiceSelector
{
/// <summary>
/// Selects a set of <see cref="ConsumerExecutorDescriptor"/> candidates for the current message associated with
/// <paramref name="context"/>.
/// </summary>
/// <param name="context">The <see cref="CapStartContext"/> associated with the current message.</param>
/// <returns>A set of <see cref="ConsumerExecutorDescriptor"/> candidates or <c>null</c>.</returns>
IReadOnlyList<ConsumerExecutorDescriptor> SelectCandidates(CapStartContext context);
/// <summary>
/// Selects the best <see cref="ConsumerExecutorDescriptor"/> candidate from <paramref name="candidates"/> for the
/// current message associated with <paramref name="context"/>.
/// </summary>
/// <param name="key">topic or exchange router key.</param>
/// <param name="candidates">the set of <see cref="ConsumerExecutorDescriptor"/> candidates.</param>
/// <returns></returns>
ConsumerExecutorDescriptor SelectBestCandidate(string key, IReadOnlyList<ConsumerExecutorDescriptor> candidates);
}
}
\ No newline at end of file
...@@ -5,7 +5,7 @@ namespace DotNetCore.CAP.Infrastructure ...@@ -5,7 +5,7 @@ namespace DotNetCore.CAP.Infrastructure
/// <summary> /// <summary>
/// Represents all the options you can use to configure the system. /// Represents all the options you can use to configure the system.
/// </summary> /// </summary>
public class ConsistencyOptions public class CapOptions
{ {
public string BrokerUrlList { get; set; } = "localhost:9092"; public string BrokerUrlList { get; set; } = "localhost:9092";
......
...@@ -2,6 +2,7 @@ ...@@ -2,6 +2,7 @@
using System.Collections.Generic; using System.Collections.Generic;
using System.Reflection; using System.Reflection;
using DotNetCore.CAP; using DotNetCore.CAP;
using DotNetCore.CAP.Abstractions;
using DotNetCore.CAP.Abstractions.ModelBinding; using DotNetCore.CAP.Abstractions.ModelBinding;
using DotNetCore.CAP.Infrastructure; using DotNetCore.CAP.Infrastructure;
using DotNetCore.CAP.Internal; using DotNetCore.CAP.Internal;
...@@ -22,7 +23,7 @@ namespace Microsoft.Extensions.DependencyInjection ...@@ -22,7 +23,7 @@ namespace Microsoft.Extensions.DependencyInjection
/// <returns>An <see cref="CapBuilder"/> for application services.</returns> /// <returns>An <see cref="CapBuilder"/> for application services.</returns>
public static CapBuilder AddConsistency(this IServiceCollection services) public static CapBuilder AddConsistency(this IServiceCollection services)
{ {
services.AddConsistency(x => new ConsistencyOptions()); services.AddConsistency(x => new CapOptions());
return new CapBuilder(services); return new CapBuilder(services);
} }
...@@ -31,18 +32,18 @@ namespace Microsoft.Extensions.DependencyInjection ...@@ -31,18 +32,18 @@ namespace Microsoft.Extensions.DependencyInjection
/// Adds and configures the consistence services for the consitence. /// Adds and configures the consistence services for the consitence.
/// </summary> /// </summary>
/// <param name="services">The services available in the application.</param> /// <param name="services">The services available in the application.</param>
/// <param name="setupAction">An action to configure the <see cref="ConsistencyOptions"/>.</param> /// <param name="setupAction">An action to configure the <see cref="CapOptions"/>.</param>
/// <returns>An <see cref="CapBuilder"/> for application services.</returns> /// <returns>An <see cref="CapBuilder"/> for application services.</returns>
public static CapBuilder AddConsistency( public static CapBuilder AddConsistency(
this IServiceCollection services, this IServiceCollection services,
Action<ConsistencyOptions> setupAction) Action<CapOptions> setupAction)
{ {
services.TryAddSingleton<CapMarkerService>(); services.TryAddSingleton<CapMarkerService>();
services.Configure(setupAction); services.Configure(setupAction);
AddConsumerServices(services); AddConsumerServices(services);
services.TryAddSingleton<IConsumerExcutorSelector, ConsumerExcutorSelector>(); services.TryAddSingleton<IConsumerServiceSelector, DefaultConsumerServiceSelector>();
services.TryAddSingleton<IModelBinder, DefaultModelBinder>(); services.TryAddSingleton<IModelBinder, DefaultModelBinder>();
services.TryAddSingleton<IConsumerInvokerFactory, ConsumerInvokerFactory>(); services.TryAddSingleton<IConsumerInvokerFactory, ConsumerInvokerFactory>();
services.TryAddSingleton<MethodMatcherCache>(); services.TryAddSingleton<MethodMatcherCache>();
......
...@@ -3,13 +3,13 @@ using System.Threading; ...@@ -3,13 +3,13 @@ using System.Threading;
namespace DotNetCore.CAP namespace DotNetCore.CAP
{ {
public class TopicContext public class CapStartContext
{ {
public TopicContext() public CapStartContext()
{ {
} }
public TopicContext(IServiceProvider provider, CancellationToken cancellationToken) public CapStartContext(IServiceProvider provider, CancellationToken cancellationToken)
{ {
ServiceProvider = provider; ServiceProvider = provider;
CancellationToken = cancellationToken; CancellationToken = cancellationToken;
......
...@@ -17,7 +17,7 @@ namespace DotNetCore.CAP ...@@ -17,7 +17,7 @@ namespace DotNetCore.CAP
private Task _bootstrappingTask; private Task _bootstrappingTask;
public DefaultBootstrapper( public DefaultBootstrapper(
IOptions<ConsistencyOptions> options, IOptions<CapOptions> options,
ICapMessageStore storage, ICapMessageStore storage,
IApplicationLifetime appLifetime, IApplicationLifetime appLifetime,
IServiceProvider provider) IServiceProvider provider)
...@@ -41,7 +41,7 @@ namespace DotNetCore.CAP ...@@ -41,7 +41,7 @@ namespace DotNetCore.CAP
}); });
} }
protected ConsistencyOptions Options { get; } protected CapOptions Options { get; }
protected ICapMessageStore Storage { get; } protected ICapMessageStore Storage { get; }
......
...@@ -20,13 +20,13 @@ namespace DotNetCore.CAP ...@@ -20,13 +20,13 @@ namespace DotNetCore.CAP
private readonly ILogger _logger; private readonly ILogger _logger;
private readonly MethodMatcherCache _selector; private readonly MethodMatcherCache _selector;
private readonly ConsistencyOptions _options; private readonly CapOptions _options;
private readonly ICapMessageStore _messageStore; private readonly ICapMessageStore _messageStore;
private readonly CancellationTokenSource _cts; private readonly CancellationTokenSource _cts;
public event EventHandler<ConsistencyMessage> MessageReceieved; public event EventHandler<ConsistencyMessage> MessageReceieved;
private TopicContext _context; private CapStartContext _context;
private Task _compositeTask; private Task _compositeTask;
private bool _disposed; private bool _disposed;
...@@ -37,7 +37,7 @@ namespace DotNetCore.CAP ...@@ -37,7 +37,7 @@ namespace DotNetCore.CAP
ILoggerFactory loggerFactory, ILoggerFactory loggerFactory,
ICapMessageStore messageStore, ICapMessageStore messageStore,
MethodMatcherCache selector, MethodMatcherCache selector,
IOptions<ConsistencyOptions> options) { IOptions<CapOptions> options) {
_selector = selector; _selector = selector;
_logger = loggerFactory.CreateLogger<ConsumerHandler>(); _logger = loggerFactory.CreateLogger<ConsumerHandler>();
_loggerFactory = loggerFactory; _loggerFactory = loggerFactory;
...@@ -54,7 +54,7 @@ namespace DotNetCore.CAP ...@@ -54,7 +54,7 @@ namespace DotNetCore.CAP
} }
public void Start() { public void Start() {
_context = new TopicContext(_serviceProvider, _cts.Token); _context = new CapStartContext(_serviceProvider, _cts.Token);
var matchs = _selector.GetCandidatesMethods(_context); var matchs = _selector.GetCandidatesMethods(_context);
......
using System.Collections.Generic;
using DotNetCore.CAP.Abstractions;
namespace DotNetCore.CAP.Infrastructure
{
public interface IConsumerExcutorSelector
{
IReadOnlyList<ConsumerExecutorDescriptor> SelectCandidates(TopicContext context);
ConsumerExecutorDescriptor SelectBestCandidate(string key, IReadOnlyList<ConsumerExecutorDescriptor> executeDescriptor);
}
}
\ No newline at end of file
...@@ -3,26 +3,39 @@ using System.Collections.Generic; ...@@ -3,26 +3,39 @@ using System.Collections.Generic;
using System.Linq; using System.Linq;
using System.Reflection; using System.Reflection;
using DotNetCore.CAP.Abstractions; using DotNetCore.CAP.Abstractions;
using DotNetCore.CAP.Infrastructure;
using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.DependencyInjection;
namespace DotNetCore.CAP.Internal namespace DotNetCore.CAP.Internal
{ {
public class ConsumerExcutorSelector : IConsumerExcutorSelector /// <summary>
/// A default <see cref="IConsumerServiceSelector"/> implementation.
/// </summary>
public class DefaultConsumerServiceSelector : IConsumerServiceSelector
{ {
private readonly IServiceProvider _serviceProvider; private readonly IServiceProvider _serviceProvider;
public ConsumerExcutorSelector(IServiceProvider serviceProvider) /// <summary>
/// Creates a new <see cref="DefaultConsumerServiceSelector"/>.
/// </summary>
/// <param name="serviceProvider"></param>
public DefaultConsumerServiceSelector(IServiceProvider serviceProvider)
{ {
_serviceProvider = serviceProvider; _serviceProvider = serviceProvider;
} }
public ConsumerExecutorDescriptor SelectBestCandidate(string key, IReadOnlyList<ConsumerExecutorDescriptor> executeDescriptor) /// <summary>
///
/// </summary>
/// <param name="key"></param>
/// <param name="executeDescriptor"></param>
/// <returns></returns>
public ConsumerExecutorDescriptor SelectBestCandidate(string key,
IReadOnlyList<ConsumerExecutorDescriptor> executeDescriptor)
{ {
return executeDescriptor.FirstOrDefault(x => x.Attribute.Name == key); return executeDescriptor.FirstOrDefault(x => x.Attribute.Name == key);
} }
public IReadOnlyList<ConsumerExecutorDescriptor> SelectCandidates(TopicContext context) public IReadOnlyList<ConsumerExecutorDescriptor> SelectCandidates(CapStartContext context)
{ {
var consumerServices = context.ServiceProvider.GetServices<IConsumerService>(); var consumerServices = context.ServiceProvider.GetServices<IConsumerService>();
......
...@@ -7,14 +7,14 @@ namespace DotNetCore.CAP.Internal ...@@ -7,14 +7,14 @@ namespace DotNetCore.CAP.Internal
{ {
public class MethodMatcherCache public class MethodMatcherCache
{ {
private readonly IConsumerExcutorSelector _selector; private readonly IConsumerServiceSelector _selector;
public MethodMatcherCache(IConsumerExcutorSelector selector) public MethodMatcherCache(IConsumerServiceSelector selector)
{ {
_selector = selector; _selector = selector;
} }
public ConcurrentDictionary<string, ConsumerExecutorDescriptor> GetCandidatesMethods(TopicContext routeContext) public ConcurrentDictionary<string, ConsumerExecutorDescriptor> GetCandidatesMethods(CapStartContext routeContext)
{ {
if (Entries.Count == 0) if (Entries.Count == 0)
{ {
......
...@@ -5,9 +5,9 @@ namespace DotNetCore.CAP.Job ...@@ -5,9 +5,9 @@ namespace DotNetCore.CAP.Job
{ {
public class DefaultCronJobRegistry : CronJobRegistry public class DefaultCronJobRegistry : CronJobRegistry
{ {
private readonly ConsistencyOptions _options; private readonly CapOptions _options;
public DefaultCronJobRegistry(IOptions<ConsistencyOptions> options) public DefaultCronJobRegistry(IOptions<CapOptions> options)
{ {
_options = options.Value; _options = options.Value;
......
...@@ -18,7 +18,7 @@ namespace DotNetCore.CAP ...@@ -18,7 +18,7 @@ namespace DotNetCore.CAP
private IServiceProvider _provider; private IServiceProvider _provider;
private IJobProcessor[] _processors; private IJobProcessor[] _processors;
private CancellationTokenSource _cts; private CancellationTokenSource _cts;
private ConsistencyOptions _options; private CapOptions _options;
private ProcessingContext _context; private ProcessingContext _context;
private DefaultCronJobRegistry _defaultJobRegistry; private DefaultCronJobRegistry _defaultJobRegistry;
private Task _compositeTask; private Task _compositeTask;
...@@ -29,7 +29,7 @@ namespace DotNetCore.CAP ...@@ -29,7 +29,7 @@ namespace DotNetCore.CAP
ILoggerFactory loggerFactory, ILoggerFactory loggerFactory,
IServiceProvider provider, IServiceProvider provider,
DefaultCronJobRegistry defaultJobRegistry, DefaultCronJobRegistry defaultJobRegistry,
IOptions<ConsistencyOptions> options) IOptions<CapOptions> options)
{ {
_logger = logger; _logger = logger;
_loggerFactory = loggerFactory; _loggerFactory = loggerFactory;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment