Commit aafeb949 authored by Savorboard's avatar Savorboard

optimize consumer related code

parent 323abb7f
...@@ -23,15 +23,9 @@ namespace DotNetCore.CAP.Internal ...@@ -23,15 +23,9 @@ namespace DotNetCore.CAP.Internal
_serviceProvider = serviceProvider; _serviceProvider = serviceProvider;
} }
public IConsumerInvoker CreateInvoker(ConsumerContext consumerContext) public IConsumerInvoker CreateInvoker()
{ {
var context = new ConsumerInvokerContext(consumerContext) return new DefaultConsumerInvoker(_logger, _serviceProvider, _messagePacker, _modelBinderFactory);
{
Result = new DefaultConsumerInvoker(_logger, _serviceProvider, _messagePacker,
_modelBinderFactory, consumerContext)
};
return context.Result;
} }
} }
} }
\ No newline at end of file
using System;
using System.Threading.Tasks;
using DotNetCore.CAP.Abstractions;
using DotNetCore.CAP.Infrastructure;
using DotNetCore.CAP.Models;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
namespace DotNetCore.CAP.Internal
{
internal class CallbackMessageSender : ICallbackMessageSender
{
private readonly ILogger<CallbackMessageSender> _logger;
private readonly IServiceProvider _serviceProvider;
private readonly IContentSerializer _contentSerializer;
private readonly IMessagePacker _messagePacker;
public CallbackMessageSender(
ILogger<CallbackMessageSender> logger,
IServiceProvider serviceProvider,
IContentSerializer contentSerializer,
IMessagePacker messagePacker)
{
_logger = logger;
_serviceProvider = serviceProvider;
_contentSerializer = contentSerializer;
_messagePacker = messagePacker;
}
public async Task SendAsync(string messageId, string topicName, object bodyObj)
{
string body = null;
if (bodyObj != null && Helper.IsComplexType(bodyObj.GetType()))
body = _contentSerializer.Serialize(bodyObj);
else
body = bodyObj?.ToString();
var callbackMessage = new CapMessageDto
{
Id = messageId,
Content = body
};
var content = _messagePacker.Pack(callbackMessage);
var publishedMessage = new CapPublishedMessage
{
Name = topicName,
Content = content,
StatusName = StatusName.Scheduled
};
using (var scope = _serviceProvider.CreateScope())
{
var provider = scope.ServiceProvider;
var callbackPublisher = provider.GetService<ICallbackPublisher>();
await callbackPublisher.PublishAsync(publishedMessage);
}
}
}
}
using System.Threading.Tasks;
namespace DotNetCore.CAP.Internal
{
public interface ICallbackMessageSender
{
Task SendAsync(string messageId, string topicName, object bodyObj);
}
}
\ No newline at end of file
using System; using System;
using System.Threading.Tasks; using System.Threading.Tasks;
using DotNetCore.CAP.Abstractions; using DotNetCore.CAP.Abstractions;
using DotNetCore.CAP.Infrastructure;
using DotNetCore.CAP.Models;
using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Internal; using Microsoft.Extensions.Internal;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
...@@ -11,8 +9,6 @@ namespace DotNetCore.CAP.Internal ...@@ -11,8 +9,6 @@ namespace DotNetCore.CAP.Internal
{ {
public class DefaultConsumerInvoker : IConsumerInvoker public class DefaultConsumerInvoker : IConsumerInvoker
{ {
private readonly ConsumerContext _consumerContext;
private readonly ObjectMethodExecutor _executor;
private readonly ILogger _logger; private readonly ILogger _logger;
private readonly IModelBinderFactory _modelBinderFactory; private readonly IModelBinderFactory _modelBinderFactory;
private readonly IServiceProvider _serviceProvider; private readonly IServiceProvider _serviceProvider;
...@@ -21,99 +17,70 @@ namespace DotNetCore.CAP.Internal ...@@ -21,99 +17,70 @@ namespace DotNetCore.CAP.Internal
public DefaultConsumerInvoker(ILogger logger, public DefaultConsumerInvoker(ILogger logger,
IServiceProvider serviceProvider, IServiceProvider serviceProvider,
IMessagePacker messagePacker, IMessagePacker messagePacker,
IModelBinderFactory modelBinderFactory, IModelBinderFactory modelBinderFactory)
ConsumerContext consumerContext)
{ {
_modelBinderFactory = modelBinderFactory; _modelBinderFactory = modelBinderFactory;
_serviceProvider = serviceProvider; _serviceProvider = serviceProvider;
_messagePacker = messagePacker; _messagePacker = messagePacker;
_logger = logger; _logger = logger;
_consumerContext = consumerContext;
_executor = ObjectMethodExecutor.Create(_consumerContext.ConsumerDescriptor.MethodInfo,
_consumerContext.ConsumerDescriptor.ImplTypeInfo);
} }
public async Task InvokeAsync() public async Task<ConsumerExecutedResult> InvokeAsync(ConsumerContext context)
{ {
_logger.LogDebug("Executing consumer Topic: {0}", _consumerContext.ConsumerDescriptor.MethodInfo.Name); _logger.LogDebug("Executing consumer Topic: {0}", context.ConsumerDescriptor.MethodInfo.Name);
var executor = ObjectMethodExecutor.Create(
context.ConsumerDescriptor.MethodInfo,
context.ConsumerDescriptor.ImplTypeInfo);
using (var scope = _serviceProvider.CreateScope()) using (var scope = _serviceProvider.CreateScope())
{ {
var provider = scope.ServiceProvider; var provider = scope.ServiceProvider;
var serviceType = _consumerContext.ConsumerDescriptor.ImplTypeInfo.AsType(); var serviceType = context.ConsumerDescriptor.ImplTypeInfo.AsType();
var obj = ActivatorUtilities.GetServiceOrCreateInstance(provider, serviceType); var obj = ActivatorUtilities.GetServiceOrCreateInstance(provider, serviceType);
var jsonContent = _consumerContext.DeliverMessage.Content; var jsonContent = context.DeliverMessage.Content;
var message = _messagePacker.UnPack(jsonContent); var message = _messagePacker.UnPack(jsonContent);
object result; object resultObj;
if (_executor.MethodParameters.Length > 0) if (executor.MethodParameters.Length > 0)
result = await ExecuteWithParameterAsync(obj, message.Content); resultObj = await ExecuteWithParameterAsync(executor, obj, message.Content);
else else
result = await ExecuteAsync(obj); resultObj = await ExecuteAsync(executor, obj);
return new ConsumerExecutedResult(resultObj, message.Id, message.CallbackName);
if (!string.IsNullOrEmpty(message.CallbackName))
await SentCallbackMessage(message.Id, message.CallbackName, result);
} }
} }
private async Task<object> ExecuteAsync(object @class) private async Task<object> ExecuteAsync(ObjectMethodExecutor executor, object @class)
{ {
if (_executor.IsMethodAsync) if (executor.IsMethodAsync)
return await _executor.ExecuteAsync(@class); return await executor.ExecuteAsync(@class);
return _executor.Execute(@class); return executor.Execute(@class);
} }
private async Task<object> ExecuteWithParameterAsync(object @class, string parameterString) private async Task<object> ExecuteWithParameterAsync(ObjectMethodExecutor executor,
object @class, string parameterString)
{ {
var firstParameter = _executor.MethodParameters[0]; var firstParameter = executor.MethodParameters[0];
try try
{ {
var binder = _modelBinderFactory.CreateBinder(firstParameter); var binder = _modelBinderFactory.CreateBinder(firstParameter);
var bindResult = await binder.BindModelAsync(parameterString); var bindResult = await binder.BindModelAsync(parameterString);
if (bindResult.IsSuccess) if (bindResult.IsSuccess)
{ {
if (_executor.IsMethodAsync) if (executor.IsMethodAsync)
return await _executor.ExecuteAsync(@class, bindResult.Model); return await executor.ExecuteAsync(@class, bindResult.Model);
return _executor.Execute(@class, bindResult.Model); return executor.Execute(@class, bindResult.Model);
} }
throw new MethodBindException( throw new MethodBindException(
$"Parameters:{firstParameter.Name} bind failed! ParameterString is: {parameterString} "); $"Parameters:{firstParameter.Name} bind failed! ParameterString is: {parameterString} ");
} }
catch (FormatException ex) catch (FormatException ex)
{ {
_logger.ModelBinderFormattingException(_executor.MethodInfo?.Name, firstParameter.Name, parameterString, _logger.ModelBinderFormattingException(executor.MethodInfo?.Name, firstParameter.Name, parameterString,
ex); ex);
return null; return null;
} }
} }
private async Task SentCallbackMessage(string messageId, string topicName, object bodyObj)
{
using (var scope = _serviceProvider.CreateScope())
{
var provider = scope.ServiceProvider;
var publisher = provider.GetRequiredService<ICallbackPublisher>();
var serializer = provider.GetService<IContentSerializer>();
var packer = provider.GetService<IMessagePacker>();
var callbackMessage = new CapMessageDto
{
Id = messageId,
Content = serializer.Serialize(bodyObj)
};
var content = packer.Pack(callbackMessage);
var publishedMessage = new CapPublishedMessage
{
Name = topicName,
Content = content,
StatusName = StatusName.Scheduled
};
await publisher.PublishAsync(publishedMessage);
}
}
} }
} }
\ No newline at end of file
using DotNetCore.CAP.Abstractions; namespace DotNetCore.CAP.Internal
namespace DotNetCore.CAP.Internal
{ {
public interface IConsumerInvokerFactory public interface IConsumerInvokerFactory
{ {
IConsumerInvoker CreateInvoker(ConsumerContext actionContext); IConsumerInvoker CreateInvoker();
} }
} }
\ No newline at end of file
using DotNetCore.CAP.Abstractions; using DotNetCore.CAP.Abstractions;
using DotNetCore.CAP.Infrastructure; using DotNetCore.CAP.Infrastructure;
using DotNetCore.CAP.Models;
namespace DotNetCore.CAP.Internal namespace DotNetCore.CAP.Internal
{ {
......
using System; using System;
using System.Threading.Tasks; using System.Threading.Tasks;
using DotNetCore.CAP.Abstractions;
using DotNetCore.CAP.Models; using DotNetCore.CAP.Models;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
...@@ -8,19 +7,23 @@ namespace DotNetCore.CAP.Internal ...@@ -8,19 +7,23 @@ namespace DotNetCore.CAP.Internal
{ {
public class DefaultSubscriberExecutor : ISubscriberExecutor public class DefaultSubscriberExecutor : ISubscriberExecutor
{ {
private readonly IConsumerInvokerFactory _consumerInvokerFactory; private readonly ICallbackMessageSender _callbackMessageSender;
private readonly ILogger<DefaultSubscriberExecutor> _logger; private readonly ILogger<DefaultSubscriberExecutor> _logger;
private readonly MethodMatcherCache _selector; private readonly MethodMatcherCache _selector;
public IConsumerInvoker Invoker { get; }
public DefaultSubscriberExecutor(MethodMatcherCache selector, public DefaultSubscriberExecutor(MethodMatcherCache selector,
IConsumerInvokerFactory consumerInvokerFactory, IConsumerInvokerFactory consumerInvokerFactory,
ICallbackMessageSender callbackMessageSender,
ILogger<DefaultSubscriberExecutor> logger) ILogger<DefaultSubscriberExecutor> logger)
{ {
_selector = selector; _selector = selector;
_consumerInvokerFactory = consumerInvokerFactory; _callbackMessageSender = callbackMessageSender;
_logger = logger; _logger = logger;
}
Invoker = consumerInvokerFactory.CreateInvoker();
}
public async Task<OperateResult> ExecuteAsync(CapReceivedMessage receivedMessage) public async Task<OperateResult> ExecuteAsync(CapReceivedMessage receivedMessage)
{ {
...@@ -38,7 +41,10 @@ namespace DotNetCore.CAP.Internal ...@@ -38,7 +41,10 @@ namespace DotNetCore.CAP.Internal
var executeDescriptor = executeDescriptorGroup[receivedMessage.Group][0]; var executeDescriptor = executeDescriptorGroup[receivedMessage.Group][0];
var consumerContext = new ConsumerContext(executeDescriptor, receivedMessage.ToMessageContext()); var consumerContext = new ConsumerContext(executeDescriptor, receivedMessage.ToMessageContext());
await _consumerInvokerFactory.CreateInvoker(consumerContext).InvokeAsync(); var ret = await Invoker.InvokeAsync(consumerContext);
if (!string.IsNullOrEmpty(ret.CallbackName))
await _callbackMessageSender.SendAsync(ret.MessageId,ret.CallbackName,ret.Result);
return OperateResult.Success; return OperateResult.Success;
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment