Commit 34e90105 authored by yangxiaodong's avatar yangxiaodong

add FailedJobProcessor to infinite retry failed messages.

parent 14320e86
using System;
using System.Collections.Generic;
using System.Data;
using System.Data.SqlClient;
using System.Threading.Tasks;
......@@ -55,6 +56,16 @@ OUTPUT DELETED.MessageId,DELETED.[MessageType];";
}
}
public async Task<IEnumerable<CapPublishedMessage>> GetFailedPublishedMessages()
{
var sql = $"SELECT * FROM [{_options.Schema}].[Published] WITH (readpast) WHERE StatusName = '{StatusName.Failed}'";
using (var connection = new SqlConnection(_options.ConnectionString))
{
return await connection.QueryAsync<CapPublishedMessage>(sql);
}
}
// CapReceviedMessage
public async Task StoreReceivedMessageAsync(CapReceivedMessage message)
......@@ -89,6 +100,15 @@ VALUES(@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName);";
}
}
public async Task<IEnumerable<CapReceivedMessage>> GetFailedReceviedMessages()
{
var sql = $"SELECT TOP (1) * FROM [{_options.Schema}].[Received] WITH (readpast) WHERE StatusName = '{StatusName.Failed}'";
using (var connection = new SqlConnection(_options.ConnectionString))
{
return await connection.QueryAsync<CapReceivedMessage>(sql);
}
}
public void Dispose()
{
}
......
......@@ -22,9 +22,14 @@ namespace DotNetCore.CAP
}
/// <summary>
/// Productor job polling delay time. Default is 8 sec.
/// Productor job polling delay time. Default is 5 sec.
/// </summary>
public int PollingDelay { get; set; } = 8;
public int PollingDelay { get; set; } = 5;
/// <summary>
/// Failed messages polling delay time. Default is 2 min.
/// </summary>
public TimeSpan FailedMessageWaitingInterval = TimeSpan.FromMinutes(2);
/// <summary>
/// We’ll send a POST request to the URL below with details of any subscribed events.
......
......@@ -47,6 +47,7 @@ namespace Microsoft.Extensions.DependencyInjection
//Processors
services.AddTransient<PublishQueuer>();
services.AddTransient<SubscribeQueuer>();
services.AddTransient<FailedJobProcessor>();
services.AddTransient<IDispatcher, DefaultDispatcher>();
//Executors
......
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using DotNetCore.CAP.Models;
......@@ -27,6 +28,11 @@ namespace DotNetCore.CAP
/// </summary>
Task<CapPublishedMessage> GetNextPublishedMessageToBeEnqueuedAsync();
/// <summary>
/// Returns executed failed messages.
/// </summary>
Task<IEnumerable<CapPublishedMessage>> GetFailedPublishedMessages();
// Received messages
/// <summary>
......@@ -46,6 +52,10 @@ namespace DotNetCore.CAP
/// </summary>
Task<CapReceivedMessage> GetNextReceviedMessageToBeEnqueuedAsync();
/// <summary>
/// Returns executed failed message.
/// </summary>
Task<IEnumerable<CapReceivedMessage>> GetFailedReceviedMessages();
//-----------------------------------------
/// <summary>
......
......@@ -117,6 +117,7 @@ namespace DotNetCore.CAP.Processor
returnedProcessors.Add(_provider.GetRequiredService<PublishQueuer>());
returnedProcessors.Add(_provider.GetRequiredService<SubscribeQueuer>());
returnedProcessors.Add(_provider.GetRequiredService<FailedJobProcessor>());
returnedProcessors.Add(_provider.GetRequiredService<IAdditionalProcessor>());
......
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
using DotNetCore.CAP.Processor.States;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
namespace DotNetCore.CAP.Processor
{
public class FailedJobProcessor : IProcessor
{
private readonly CapOptions _options;
private readonly ILogger _logger;
private readonly IServiceProvider _provider;
private readonly IStateChanger _stateChanger;
private readonly TimeSpan _delay = TimeSpan.FromSeconds(1);
private readonly TimeSpan _waitingInterval;
public FailedJobProcessor(
IOptions<CapOptions> options,
ILogger<FailedJobProcessor> logger,
IServiceProvider provider,
IStateChanger stateChanger)
{
_options = options.Value;
_logger = logger;
_provider = provider;
_stateChanger = stateChanger;
_waitingInterval = _options.FailedMessageWaitingInterval;
}
public async Task ProcessAsync(ProcessingContext context)
{
if (context == null)
throw new ArgumentNullException(nameof(context));
using (var scope = _provider.CreateScope())
{
var provider = scope.ServiceProvider;
var connection = provider.GetRequiredService<IStorageConnection>();
await Task.WhenAll(
ProcessPublishedAsync(connection, context),
ProcessReceivededAsync(connection, context));
DefaultDispatcher.PulseEvent.Set();
await context.WaitAsync(_waitingInterval);
}
}
private async Task ProcessPublishedAsync(IStorageConnection connection, ProcessingContext context)
{
var messages = await connection.GetFailedPublishedMessages();
foreach (var message in messages)
{
using (var transaction = connection.CreateTransaction())
{
_stateChanger.ChangeState(message, new EnqueuedState(), transaction);
await transaction.CommitAsync();
}
context.ThrowIfStopping();
await context.WaitAsync(_delay);
}
}
private async Task ProcessReceivededAsync(IStorageConnection connection, ProcessingContext context)
{
var messages = await connection.GetFailedReceviedMessages();
foreach (var message in messages)
{
using (var transaction = connection.CreateTransaction())
{
_stateChanger.ChangeState(message, new EnqueuedState(), transaction);
await transaction.CommitAsync();
}
context.ThrowIfStopping();
await context.WaitAsync(_delay);
}
}
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment