Commit 7a7370b5 authored by yangxiaodong's avatar yangxiaodong

add feature of #24

parent 42c856fc
...@@ -12,13 +12,15 @@ namespace DotNetCore.CAP.Kafka ...@@ -12,13 +12,15 @@ namespace DotNetCore.CAP.Kafka
private readonly ILogger _logger; private readonly ILogger _logger;
private readonly KafkaOptions _kafkaOptions; private readonly KafkaOptions _kafkaOptions;
public PublishQueueExecutor(IStateChanger stateChanger, public PublishQueueExecutor(
KafkaOptions options, CapOptions options,
IStateChanger stateChanger,
KafkaOptions kafkaOptions,
ILogger<PublishQueueExecutor> logger) ILogger<PublishQueueExecutor> logger)
: base(stateChanger, logger) : base(options, stateChanger, logger)
{ {
_logger = logger; _logger = logger;
_kafkaOptions = options; _kafkaOptions = kafkaOptions;
} }
public override Task<OperateResult> PublishAsync(string keyName, string content) public override Task<OperateResult> PublishAsync(string keyName, string content)
......
...@@ -13,11 +13,13 @@ namespace DotNetCore.CAP.RabbitMQ ...@@ -13,11 +13,13 @@ namespace DotNetCore.CAP.RabbitMQ
private readonly IConnection _connection; private readonly IConnection _connection;
private readonly RabbitMQOptions _rabbitMQOptions; private readonly RabbitMQOptions _rabbitMQOptions;
public PublishQueueExecutor(IStateChanger stateChanger, public PublishQueueExecutor(
CapOptions options,
IStateChanger stateChanger,
IConnection connection, IConnection connection,
RabbitMQOptions rabbitMQOptions, RabbitMQOptions rabbitMQOptions,
ILogger<PublishQueueExecutor> logger) ILogger<PublishQueueExecutor> logger)
: base(stateChanger, logger) : base(options, stateChanger, logger)
{ {
_logger = logger; _logger = logger;
_connection = connection; _connection = connection;
......
...@@ -20,27 +20,48 @@ namespace DotNetCore.CAP ...@@ -20,27 +20,48 @@ namespace DotNetCore.CAP
/// </summary> /// </summary>
public const int DefaultQueueProcessorCount = 2; public const int DefaultQueueProcessorCount = 2;
/// <summary>
/// Default successed message expriation timespan, in seconds.
/// </summary>
public const int DefaultSuccessMessageExpirationTimeSpan = 3600;
/// <summary>
/// Failed message retry waiting interval.
/// </summary>
public const int DefaultFailedMessageWaitingInterval = 180;
public CapOptions() public CapOptions()
{ {
PollingDelay = DefaultPollingDelay; PollingDelay = DefaultPollingDelay;
QueueProcessorCount = DefaultQueueProcessorCount; QueueProcessorCount = DefaultQueueProcessorCount;
SuccessedMessageExpiredTimeSpan = DefaultSuccessMessageExpirationTimeSpan;
FailedMessageWaitingInterval = DefaultFailedMessageWaitingInterval;
Extensions = new List<ICapOptionsExtension>(); Extensions = new List<ICapOptionsExtension>();
} }
/// <summary> /// <summary>
/// Productor job polling delay time. Default is 15 sec. /// Productor job polling delay time.
/// Default is 15 sec.
/// </summary> /// </summary>
public int PollingDelay { get; set; } public int PollingDelay { get; set; }
/// <summary> /// <summary>
/// Gets or sets the messages queue (Cap.Queue table) processor count. /// Gets or sets the messages queue (Cap.Queue table) processor count.
/// Default is 2 processor.
/// </summary> /// </summary>
public int QueueProcessorCount { get; set; } public int QueueProcessorCount { get; set; }
/// <summary> /// <summary>
/// Failed messages polling delay time. Default is 3 min. /// Sent or received successed message due timespan, then the message will be deleted at due time.
/// Dafault is 3600 seconds.
/// </summary>
public int SuccessedMessageExpiredTimeSpan { get; set; }
/// <summary>
/// Failed messages polling delay time.
/// Default is 180 seconds.
/// </summary> /// </summary>
public int FailedMessageWaitingInterval { get; set; } = (int)TimeSpan.FromMinutes(3).TotalSeconds; public int FailedMessageWaitingInterval { get; set; }
/// <summary> /// <summary>
/// We’ll invoke this call-back with message type,name,content when requeue failed message. /// We’ll invoke this call-back with message type,name,content when requeue failed message.
......
...@@ -10,12 +10,16 @@ namespace DotNetCore.CAP ...@@ -10,12 +10,16 @@ namespace DotNetCore.CAP
{ {
public abstract class BasePublishQueueExecutor : IQueueExecutor public abstract class BasePublishQueueExecutor : IQueueExecutor
{ {
private readonly CapOptions _options;
private readonly IStateChanger _stateChanger; private readonly IStateChanger _stateChanger;
private readonly ILogger _logger; private readonly ILogger _logger;
protected BasePublishQueueExecutor(IStateChanger stateChanger, protected BasePublishQueueExecutor(
CapOptions options,
IStateChanger stateChanger,
ILogger<BasePublishQueueExecutor> logger) ILogger<BasePublishQueueExecutor> logger)
{ {
_options = options;
_stateChanger = stateChanger; _stateChanger = stateChanger;
_logger = logger; _logger = logger;
} }
...@@ -54,7 +58,7 @@ namespace DotNetCore.CAP ...@@ -54,7 +58,7 @@ namespace DotNetCore.CAP
} }
else else
{ {
newState = new SucceededState(); newState = new SucceededState(_options.SuccessedMessageExpiredTimeSpan);
} }
await _stateChanger.ChangeStateAsync(message, newState, connection); await _stateChanger.ChangeStateAsync(message, newState, connection);
......
...@@ -15,16 +15,18 @@ namespace DotNetCore.CAP ...@@ -15,16 +15,18 @@ namespace DotNetCore.CAP
private readonly IConsumerInvokerFactory _consumerInvokerFactory; private readonly IConsumerInvokerFactory _consumerInvokerFactory;
private readonly IStateChanger _stateChanger; private readonly IStateChanger _stateChanger;
private readonly ILogger _logger; private readonly ILogger _logger;
private readonly CapOptions _options;
private readonly MethodMatcherCache _selector; private readonly MethodMatcherCache _selector;
public SubscibeQueueExecutor( public SubscibeQueueExecutor(
IStateChanger stateChanger, IStateChanger stateChanger,
MethodMatcherCache selector, MethodMatcherCache selector,
CapOptions options,
IConsumerInvokerFactory consumerInvokerFactory, IConsumerInvokerFactory consumerInvokerFactory,
ILogger<BasePublishQueueExecutor> logger) ILogger<BasePublishQueueExecutor> logger)
{ {
_selector = selector; _selector = selector;
_options = options;
_consumerInvokerFactory = consumerInvokerFactory; _consumerInvokerFactory = consumerInvokerFactory;
_stateChanger = stateChanger; _stateChanger = stateChanger;
_logger = logger; _logger = logger;
...@@ -62,7 +64,7 @@ namespace DotNetCore.CAP ...@@ -62,7 +64,7 @@ namespace DotNetCore.CAP
} }
else else
{ {
newState = new SucceededState(); newState = new SucceededState(_options.SuccessedMessageExpiredTimeSpan);
} }
await _stateChanger.ChangeStateAsync(message, newState, connection); await _stateChanger.ChangeStateAsync(message, newState, connection);
......
...@@ -7,10 +7,20 @@ namespace DotNetCore.CAP.Processor.States ...@@ -7,10 +7,20 @@ namespace DotNetCore.CAP.Processor.States
{ {
public const string StateName = "Succeeded"; public const string StateName = "Succeeded";
public TimeSpan? ExpiresAfter => TimeSpan.FromHours(1); public TimeSpan? ExpiresAfter { get; private set; }
public string Name => StateName; public string Name => StateName;
public SucceededState()
{
ExpiresAfter = TimeSpan.FromHours(1);
}
public SucceededState(int ExpireAfterSeconds)
{
ExpiresAfter = TimeSpan.FromSeconds(ExpireAfterSeconds);
}
public void Apply(CapPublishedMessage message, IStorageTransaction transaction) public void Apply(CapPublishedMessage message, IStorageTransaction transaction)
{ {
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment