Commit 8e41e0d5 authored by Savorboard's avatar Savorboard

add failed message processor.

parent 5af28b60
...@@ -47,7 +47,7 @@ namespace DotNetCore.CAP.Processor ...@@ -47,7 +47,7 @@ namespace DotNetCore.CAP.Processor
_context = new ProcessingContext(_provider, _cts.Token); _context = new ProcessingContext(_provider, _cts.Token);
var processorTasks = _processors var processorTasks = _processors
.Select(p => InfiniteRetry(p)) .Select(InfiniteRetry)
.Select(p => p.ProcessAsync(_context)); .Select(p => p.ProcessAsync(_context));
_compositeTask = Task.WhenAll(processorTasks); _compositeTask = Task.WhenAll(processorTasks);
} }
...@@ -84,10 +84,7 @@ namespace DotNetCore.CAP.Processor ...@@ -84,10 +84,7 @@ namespace DotNetCore.CAP.Processor
private bool AllProcessorsWaiting() private bool AllProcessorsWaiting()
{ {
foreach (var processor in _messageDispatchers) return _messageDispatchers.All(processor => processor.Waiting);
if (!processor.Waiting)
return false;
return true;
} }
private IProcessor InfiniteRetry(IProcessor inner) private IProcessor InfiniteRetry(IProcessor inner)
...@@ -107,7 +104,7 @@ namespace DotNetCore.CAP.Processor ...@@ -107,7 +104,7 @@ namespace DotNetCore.CAP.Processor
returnedProcessors.Add(_provider.GetRequiredService<PublishQueuer>()); returnedProcessors.Add(_provider.GetRequiredService<PublishQueuer>());
returnedProcessors.Add(_provider.GetRequiredService<SubscribeQueuer>()); returnedProcessors.Add(_provider.GetRequiredService<SubscribeQueuer>());
//returnedProcessors.Add(_provider.GetRequiredService<FailedJobProcessor>()); returnedProcessors.Add(_provider.GetRequiredService<FailedProcessor>());
returnedProcessors.Add(_provider.GetRequiredService<IAdditionalProcessor>()); returnedProcessors.Add(_provider.GetRequiredService<IAdditionalProcessor>());
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment