Commit 11a37496 authored by Tim Stubbs's avatar Tim Stubbs Committed by Savorboard

Handle messages retrieval failure (#324)

parent 30b308d7
// Copyright (c) .NET Core Community. All rights reserved. // Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information. // Licensed under the MIT License. See License.txt in the project root for license information.
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using System; using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks; using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;
namespace DotNetCore.CAP.Processor namespace DotNetCore.CAP.Processor
{ {
public class NeedRetryMessageProcessor : IProcessor public class NeedRetryMessageProcessor : IProcessor
{ {
private readonly TimeSpan _delay = TimeSpan.FromSeconds(1); private readonly TimeSpan _delay = TimeSpan.FromSeconds(1);
private readonly ILogger<NeedRetryMessageProcessor> _logger;
private readonly IPublishMessageSender _publishMessageSender; private readonly IPublishMessageSender _publishMessageSender;
private readonly ISubscriberExecutor _subscriberExecutor; private readonly ISubscriberExecutor _subscriberExecutor;
private readonly TimeSpan _waitingInterval; private readonly TimeSpan _waitingInterval;
public NeedRetryMessageProcessor( public NeedRetryMessageProcessor(
CapOptions options, CapOptions options,
ILogger<NeedRetryMessageProcessor> logger,
ISubscriberExecutor subscriberExecutor, ISubscriberExecutor subscriberExecutor,
IPublishMessageSender publishMessageSender) IPublishMessageSender publishMessageSender)
{ {
_logger = logger;
_subscriberExecutor = subscriberExecutor; _subscriberExecutor = subscriberExecutor;
_publishMessageSender = publishMessageSender; _publishMessageSender = publishMessageSender;
_waitingInterval = TimeSpan.FromSeconds(options.FailedRetryInterval); _waitingInterval = TimeSpan.FromSeconds(options.FailedRetryInterval);
...@@ -42,7 +48,7 @@ namespace DotNetCore.CAP.Processor ...@@ -42,7 +48,7 @@ namespace DotNetCore.CAP.Processor
private async Task ProcessPublishedAsync(IStorageConnection connection, ProcessingContext context) private async Task ProcessPublishedAsync(IStorageConnection connection, ProcessingContext context)
{ {
var messages = await connection.GetPublishedMessagesOfNeedRetry(); var messages = await GetSafelyAsync(() => connection.GetPublishedMessagesOfNeedRetry());
foreach (var message in messages) foreach (var message in messages)
{ {
...@@ -56,7 +62,7 @@ namespace DotNetCore.CAP.Processor ...@@ -56,7 +62,7 @@ namespace DotNetCore.CAP.Processor
private async Task ProcessReceivedAsync(IStorageConnection connection, ProcessingContext context) private async Task ProcessReceivedAsync(IStorageConnection connection, ProcessingContext context)
{ {
var messages = await connection.GetReceivedMessagesOfNeedRetry(); var messages = await GetSafelyAsync(() => connection.GetReceivedMessagesOfNeedRetry());
foreach (var message in messages) foreach (var message in messages)
{ {
...@@ -67,5 +73,19 @@ namespace DotNetCore.CAP.Processor ...@@ -67,5 +73,19 @@ namespace DotNetCore.CAP.Processor
await context.WaitAsync(_delay); await context.WaitAsync(_delay);
} }
} }
private async Task<IEnumerable<T>> GetSafelyAsync<T>(Func<Task<IEnumerable<T>>> getMessagesAsync)
{
try
{
return await getMessagesAsync();
}
catch (Exception ex)
{
_logger.LogWarning(1, ex, "Get messages of type '{messageType}' failed. Retrying...", typeof(T).Name);
return Enumerable.Empty<T>();
}
}
} }
} }
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment