Commit 52ec7934 authored by Savorboard's avatar Savorboard

add failed call back.

parent a369ad96
...@@ -53,14 +53,32 @@ namespace DotNetCore.CAP.Processor ...@@ -53,14 +53,32 @@ namespace DotNetCore.CAP.Processor
private async Task ProcessPublishedAsync(IStorageConnection connection, ProcessingContext context) private async Task ProcessPublishedAsync(IStorageConnection connection, ProcessingContext context)
{ {
var messages = await connection.GetFailedPublishedMessages(); var messages = await connection.GetFailedPublishedMessages();
var hasException = false;
foreach (var message in messages) foreach (var message in messages)
{ {
if (!hasException)
{
try
{
_options.FailedCallback?.Invoke(Models.MessageType.Publish, message.Name, message.Content);
}
catch (Exception ex)
{
hasException = true;
_logger.LogWarning("Failed call-back method raised an exception:" + ex.Message);
}
}
using (var transaction = connection.CreateTransaction()) using (var transaction = connection.CreateTransaction())
{ {
_stateChanger.ChangeState(message, new EnqueuedState(), transaction); _stateChanger.ChangeState(message, new EnqueuedState(), transaction);
await transaction.CommitAsync(); await transaction.CommitAsync();
} }
context.ThrowIfStopping(); context.ThrowIfStopping();
await context.WaitAsync(_delay); await context.WaitAsync(_delay);
} }
} }
...@@ -68,14 +86,32 @@ namespace DotNetCore.CAP.Processor ...@@ -68,14 +86,32 @@ namespace DotNetCore.CAP.Processor
private async Task ProcessReceivededAsync(IStorageConnection connection, ProcessingContext context) private async Task ProcessReceivededAsync(IStorageConnection connection, ProcessingContext context)
{ {
var messages = await connection.GetFailedReceviedMessages(); var messages = await connection.GetFailedReceviedMessages();
var hasException = false;
foreach (var message in messages) foreach (var message in messages)
{ {
if (!hasException)
{
try
{
_options.FailedCallback?.Invoke(Models.MessageType.Subscribe, message.Name, message.Content);
}
catch (Exception ex)
{
hasException = true;
_logger.LogWarning("Failed call-back method raised an exception:" + ex.Message);
}
}
using (var transaction = connection.CreateTransaction()) using (var transaction = connection.CreateTransaction())
{ {
_stateChanger.ChangeState(message, new EnqueuedState(), transaction); _stateChanger.ChangeState(message, new EnqueuedState(), transaction);
await transaction.CommitAsync(); await transaction.CommitAsync();
} }
context.ThrowIfStopping(); context.ThrowIfStopping();
await context.WaitAsync(_delay); await context.WaitAsync(_delay);
} }
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment