Commit 1f2716f9 authored by yangxiaodong's avatar yangxiaodong

refactor.

parent 03dc95fa
...@@ -3,7 +3,7 @@ using System.Linq.Expressions; ...@@ -3,7 +3,7 @@ using System.Linq.Expressions;
using System.Reflection; using System.Reflection;
using System.Threading.Tasks; using System.Threading.Tasks;
using DotNetCore.CAP.Abstractions.ModelBinding; using DotNetCore.CAP.Abstractions.ModelBinding;
using Newtonsoft.Json; using DotNetCore.CAP.Infrastructure;
namespace DotNetCore.CAP.Internal namespace DotNetCore.CAP.Internal
{ {
...@@ -17,8 +17,8 @@ namespace DotNetCore.CAP.Internal ...@@ -17,8 +17,8 @@ namespace DotNetCore.CAP.Internal
{ {
bindingContext.Model = CreateModel(bindingContext); bindingContext.Model = CreateModel(bindingContext);
} }
bindingContext.Result = JsonConvert.DeserializeObject(bindingContext.Values, bindingContext.ModelType); bindingContext.Result = Helper.FromJson(bindingContext.Values, bindingContext.ModelType);
return Task.CompletedTask; return Task.CompletedTask;
} }
......
...@@ -11,9 +11,9 @@ namespace DotNetCore.CAP.Internal ...@@ -11,9 +11,9 @@ namespace DotNetCore.CAP.Internal
{ {
protected readonly ILogger _logger; protected readonly ILogger _logger;
protected readonly IServiceProvider _serviceProvider; protected readonly IServiceProvider _serviceProvider;
protected readonly ConsumerContext _consumerContext;
private readonly IModelBinder _modelBinder; private readonly IModelBinder _modelBinder;
private readonly ObjectMethodExecutor _executor; private readonly ObjectMethodExecutor _executor;
protected readonly ConsumerContext _consumerContext;
public DefaultConsumerInvoker(ILogger logger, public DefaultConsumerInvoker(ILogger logger,
IServiceProvider serviceProvider, IServiceProvider serviceProvider,
...@@ -30,42 +30,29 @@ namespace DotNetCore.CAP.Internal ...@@ -30,42 +30,29 @@ namespace DotNetCore.CAP.Internal
public Task InvokeAsync() public Task InvokeAsync()
{ {
try using (_logger.BeginScope("consumer invoker begin"))
{ {
using (_logger.BeginScope("consumer invoker begin")) _logger.LogDebug("Executing consumer Topic: {0}", _consumerContext.ConsumerDescriptor.MethodInfo.Name);
{
_logger.LogDebug("Executing consumer Topic: {0}", _consumerContext.ConsumerDescriptor.Attribute);
try var obj = ActivatorUtilities.GetServiceOrCreateInstance(_serviceProvider, _consumerContext.ConsumerDescriptor.ImplTypeInfo.AsType());
{
var obj = ActivatorUtilities.GetServiceOrCreateInstance(_serviceProvider, _consumerContext.ConsumerDescriptor.ImplTypeInfo.AsType());
var value = _consumerContext.DeliverMessage.Content; var value = _consumerContext.DeliverMessage.Content;
if (_executor.MethodParameters.Length > 0) if (_executor.MethodParameters.Length > 0)
{ {
var firstParameter = _executor.MethodParameters[0]; var firstParameter = _executor.MethodParameters[0];
var bindingContext = ModelBindingContext.CreateBindingContext(value, var bindingContext = ModelBindingContext.CreateBindingContext(value,
firstParameter.Name, firstParameter.ParameterType); firstParameter.Name, firstParameter.ParameterType);
_modelBinder.BindModelAsync(bindingContext); _modelBinder.BindModelAsync(bindingContext);
_executor.Execute(obj, bindingContext.Result); _executor.Execute(obj, bindingContext.Result);
}
else
{
_executor.Execute(obj);
}
return Task.CompletedTask;
}
finally
{
_logger.LogDebug("Executed consumer method .");
}
} }
} else
finally {
{ _executor.Execute(obj);
}
return Task.CompletedTask;
} }
} }
} }
......
...@@ -41,15 +41,23 @@ namespace DotNetCore.CAP.Job ...@@ -41,15 +41,23 @@ namespace DotNetCore.CAP.Job
var nextReceivedMessage = await messageStore.GetNextReceivedMessageToBeExcuted(); var nextReceivedMessage = await messageStore.GetNextReceivedMessageToBeExcuted();
if (nextReceivedMessage != null) if (nextReceivedMessage != null)
{ {
var executeDescriptor = matchs[nextReceivedMessage.KeyName]; try
var consumerContext = new ConsumerContext(executeDescriptor, nextReceivedMessage); {
var invoker = _consumerInvokerFactory.CreateInvoker(consumerContext); var executeDescriptor = matchs[nextReceivedMessage.KeyName];
var consumerContext = new ConsumerContext(executeDescriptor, nextReceivedMessage);
var invoker = _consumerInvokerFactory.CreateInvoker(consumerContext);
await messageStore.ChangeReceivedMessageStateAsync(nextReceivedMessage, StatusName.Processing); await messageStore.ChangeReceivedMessageStateAsync(nextReceivedMessage, StatusName.Processing);
await invoker.InvokeAsync(); await invoker.InvokeAsync();
await messageStore.ChangeReceivedMessageStateAsync(nextReceivedMessage, StatusName.Succeeded);
}
catch (Exception ex)
{
_logger.ReceivedMessageRetryExecutingFailed(nextReceivedMessage.KeyName, ex);
}
await messageStore.ChangeReceivedMessageStateAsync(nextReceivedMessage,StatusName.Succeeded);
} }
} }
} }
......
...@@ -21,6 +21,7 @@ namespace DotNetCore.CAP ...@@ -21,6 +21,7 @@ namespace DotNetCore.CAP
private static Action<ILogger, string, string, Exception> _enqueuingSentMessage; private static Action<ILogger, string, string, Exception> _enqueuingSentMessage;
private static Action<ILogger, string, string, Exception> _enqueuingReceivdeMessage; private static Action<ILogger, string, string, Exception> _enqueuingReceivdeMessage;
private static Action<ILogger, string, Exception> _executingConsumerMethod; private static Action<ILogger, string, Exception> _executingConsumerMethod;
private static Action<ILogger, string, Exception> _receivedMessageRetryExecuting;
static LoggerExtensions() static LoggerExtensions()
{ {
...@@ -78,6 +79,11 @@ namespace DotNetCore.CAP ...@@ -78,6 +79,11 @@ namespace DotNetCore.CAP
LogLevel.Error, LogLevel.Error,
5, 5,
"Consumer method '{methodName}' failed to execute."); "Consumer method '{methodName}' failed to execute.");
_receivedMessageRetryExecuting = LoggerMessage.Define<string>(
LogLevel.Error,
5,
"Received message topic method '{topicName}' failed to execute.");
} }
public static void ConsumerMethodExecutingFailed(this ILogger logger, string methodName, Exception ex) public static void ConsumerMethodExecutingFailed(this ILogger logger, string methodName, Exception ex)
...@@ -85,6 +91,11 @@ namespace DotNetCore.CAP ...@@ -85,6 +91,11 @@ namespace DotNetCore.CAP
_executingConsumerMethod(logger, methodName, ex); _executingConsumerMethod(logger, methodName, ex);
} }
public static void ReceivedMessageRetryExecutingFailed(this ILogger logger, string topicName, Exception ex)
{
_receivedMessageRetryExecuting(logger, topicName, ex);
}
public static void EnqueuingReceivedMessage(this ILogger logger, string nameKey, string content) public static void EnqueuingReceivedMessage(this ILogger logger, string nameKey, string content)
{ {
_enqueuingReceivdeMessage(logger, nameKey, content, null); _enqueuingReceivdeMessage(logger, nameKey, content, null);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment