Commit 276b8ee8 authored by Savorboard's avatar Savorboard

1

parent a6f60bf9
...@@ -16,8 +16,8 @@ ...@@ -16,8 +16,8 @@
</ItemGroup> </ItemGroup>
<ItemGroup> <ItemGroup>
<PackageReference Include="Microsoft.EntityFrameworkCore" Version="1.1.1" /> <PackageReference Include="Microsoft.EntityFrameworkCore" Version="1.1.2" />
<PackageReference Include="Microsoft.EntityFrameworkCore.Relational" Version="1.1.1" /> <PackageReference Include="Microsoft.EntityFrameworkCore.Relational" Version="1.1.2" />
<PackageReference Include="System.ComponentModel.TypeConverter" Version="4.3.0" /> <PackageReference Include="System.ComponentModel.TypeConverter" Version="4.3.0" />
</ItemGroup> </ItemGroup>
......
...@@ -16,9 +16,27 @@ ...@@ -16,9 +16,27 @@
</PropertyGroup> </PropertyGroup>
<ItemGroup> <ItemGroup>
<PackageReference Include="Microsoft.AspNetCore.Hosting" Version="1.1.1" /> <Compile Remove="x64\**" />
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="1.1.1" /> <Compile Remove="x86\**" />
<PackageReference Include="Microsoft.Extensions.Options" Version="1.1.1" /> <Content Remove="x64\**" />
<Content Remove="x86\**" />
<EmbeddedResource Remove="x64\**" />
<EmbeddedResource Remove="x86\**" />
<None Remove="x64\**" />
<None Remove="x86\**" />
</ItemGroup>
<ItemGroup>
<Content Remove="C:\Users\yangxiaodong\.nuget\packages\rdkafka.internal.librdkafka\0.9.2-ci-28\build\..\runtimes\win7-x64\native\librdkafka.dll" />
<Content Remove="C:\Users\yangxiaodong\.nuget\packages\rdkafka.internal.librdkafka\0.9.2-ci-28\build\..\runtimes\win7-x64\native\zlib.dll" />
<Content Remove="C:\Users\yangxiaodong\.nuget\packages\rdkafka.internal.librdkafka\0.9.2-ci-28\build\..\runtimes\win7-x86\native\librdkafka.dll" />
<Content Remove="C:\Users\yangxiaodong\.nuget\packages\rdkafka.internal.librdkafka\0.9.2-ci-28\build\..\runtimes\win7-x86\native\zlib.dll" />
</ItemGroup>
<ItemGroup>
<PackageReference Include="Microsoft.AspNetCore.Hosting" Version="1.1.2" />
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="1.1.2" />
<PackageReference Include="Microsoft.Extensions.Options" Version="1.1.2" />
<PackageReference Include="RdKafka" Version="0.9.2-ci-189" /> <PackageReference Include="RdKafka" Version="0.9.2-ci-189" />
</ItemGroup> </ItemGroup>
......
...@@ -13,6 +13,8 @@ namespace Cap.Consistency.Abstractions ...@@ -13,6 +13,8 @@ namespace Cap.Consistency.Abstractions
} }
public ConsumerExecutorDescriptor ConsumerDescriptor { get; set; } public ConsumerExecutorDescriptor ConsumerDescriptor { get; set; }
} }
} }
...@@ -8,6 +8,7 @@ namespace Cap.Consistency.Abstractions ...@@ -8,6 +8,7 @@ namespace Cap.Consistency.Abstractions
{ {
public ConsumerInvokerContext(ConsumerContext consumerContext) { public ConsumerInvokerContext(ConsumerContext consumerContext) {
ConsumerContext = consumerContext ?? throw new ArgumentNullException(nameof(consumerContext)); ConsumerContext = consumerContext ?? throw new ArgumentNullException(nameof(consumerContext));
} }
public ConsumerContext ConsumerContext { get; set; } public ConsumerContext ConsumerContext { get; set; }
......
namespace Cap.Consistency
{
public class BrokerOptions
{
public string HostName { get; set; }
}
}
\ No newline at end of file
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
using Cap.Consistency.Routing;
namespace Cap.Consistency.Builder
{
public class ConsistencyMiddleware
{
private readonly ITopicRoute _router;
public ConsistencyMiddleware(ITopicRoute router) {
_router = router;
}
public async Task Invoke() {
var context = new TopicRouteContext();
context.Routes.Add(_router);
await _router.RouteAsync(context);
}
}
}
using System;
using Cap.Consistency;
using Microsoft.Extensions.DependencyInjection;
// ReSharper disable once CheckNamespace
namespace Microsoft.AspNetCore.Builder
{
/// <summary>
/// Consistence extensions for <see cref="IApplicationBuilder"/>
/// </summary>
public static class BuilderExtensions
{
/// <summary>
/// Enables Consistence for the current application
/// </summary>
/// <param name="app">The <see cref="IApplicationBuilder"/> instance this method extends.</param>
/// <returns>The <see cref="IApplicationBuilder"/> instance this method extends.</returns>
public static IApplicationBuilder UseConsistency(this IApplicationBuilder app) {
if (app == null) {
throw new ArgumentNullException(nameof(app));
}
var marker = app.ApplicationServices.GetService<ConsistencyMarkerService>();
if (marker == null) {
throw new InvalidOperationException("Add Consistency must be called on the service collection.");
}
return app;
}
}
}
\ No newline at end of file
...@@ -13,11 +13,11 @@ ...@@ -13,11 +13,11 @@
<ItemGroup> <ItemGroup>
<PackageReference Include="Confluent.Kafka" Version="0.9.5" /> <PackageReference Include="Confluent.Kafka" Version="0.9.5" />
<PackageReference Include="Microsoft.AspNetCore.Http.Abstractions" Version="1.1.1" /> <PackageReference Include="Microsoft.AspNetCore.Http.Abstractions" Version="1.1.2" />
<PackageReference Include="Microsoft.Extensions.Options" Version="1.1.1" /> <PackageReference Include="Microsoft.Extensions.Options" Version="1.1.2" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="1.1.0" /> <PackageReference Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="1.1.1" />
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="1.1.1" /> <PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="1.1.2" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="1.1.0" /> <PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="1.1.1" />
<PackageReference Include="Newtonsoft.Json" Version="10.0.2" /> <PackageReference Include="Newtonsoft.Json" Version="10.0.2" />
</ItemGroup> </ItemGroup>
......
using System;
using System.Reflection;
using System.Collections.Concurrent;
using System.Linq;
using Microsoft.Extensions.DependencyInjection;
using System.Collections.Generic;
namespace Cap.Consistency
{
/// <summary>
/// Helper functions for configuring consistency services.
/// </summary>
public class ConsistencyBuilder
{
/// <summary>
/// Creates a new instance of <see cref="ConsistencyBuilder"/>.
/// </summary>
/// <param name="message">The <see cref="Type"/> to use for the message.</param>
/// <param name="service">The <see cref="IServiceCollection"/> to attach to.</param>
public ConsistencyBuilder(Type message, IServiceCollection service) {
MessageType = message;
Services = service;
}
/// <summary>
/// Gets the <see cref="IServiceCollection"/> services are attached to.
/// </summary>
/// <value>
/// The <see cref="IServiceCollection"/> services are attached to.
/// </value>
public IServiceCollection Services { get; private set; }
/// <summary>
/// Gets the <see cref="Type"/> used for messages.
/// </summary>
/// <value>
/// The <see cref="Type"/> used for messages.
/// </value>
public Type MessageType { get; private set; }
/// <summary>
/// Adds a <see cref="IConsistencyMessageStore{TMessage}"/> for the <seealso cref="MessageType"/>.
/// </summary>
/// <typeparam name="T">The role type held in the store.</typeparam>
/// <returns>The current <see cref="ConsistencyBuilder"/> instance.</returns>
public virtual ConsistencyBuilder AddMessageStore<T>() where T : class {
return AddScoped(typeof(IConsistencyMessageStore<>).MakeGenericType(MessageType), typeof(T));
}
public virtual ConsistencyBuilder AddMessageMethodTable() {
var provider = Services.BuildServiceProvider();
var finder = provider.GetRequiredService<QMessageFinder>();
finder.GetQMessageMethods(Services);
return null;
// Services.AddSingleton(serviceType, concreteType);
// return Add(typeof(IConsistencyMessageStore<>).MakeGenericType(MessageType), typeof(T));
}
/// <summary>
/// Adds a <see cref="ConsistencyMessageManager{TUser}"/> for the <seealso cref="MessageType"/>.
/// </summary>
/// <typeparam name="TMessageManager">The type of the message manager to add.</typeparam>
/// <returns>The current <see cref="ConsistencyBuilder"/> instance.</returns>
public virtual ConsistencyBuilder AddConsistencyMessageManager<TMessageManager>() where TMessageManager : class {
var messageManagerType = typeof(ConsistencyMessageManager<>).MakeGenericType(MessageType);
var customType = typeof(TMessageManager);
if (messageManagerType == customType ||
!messageManagerType.GetTypeInfo().IsAssignableFrom(customType.GetTypeInfo())) {
throw new InvalidOperationException($"Type {customType.Name} must be derive from ConsistencyMessageManager<{MessageType.Name}>");
}
Services.AddScoped(customType, services => services.GetRequiredService(messageManagerType));
return AddScoped(messageManagerType, customType);
}
private ConsistencyBuilder AddScoped(Type serviceType, Type concreteType) {
Services.AddScoped(serviceType, concreteType);
return this;
}
}
}
\ No newline at end of file
namespace Cap.Consistency
{
/// <summary>
/// Used to verify Consistency service was called on a ServiceCollection
/// </summary>
public class ConsistencyMarkerService { }
}
\ No newline at end of file
using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
namespace Cap.Consistency
{
/// <summary>
/// Provides the APIs for managing message in a persistence store.
/// </summary>
/// <typeparam name="TMessage">The type encapsulating a message.</typeparam>
public class ConsistencyMessageManager<TMessage> : IDisposable where TMessage : class
{
private bool _disposed;
private readonly HttpContext _context;
private CancellationToken CancellationToken => _context?.RequestAborted ?? CancellationToken.None;
/// <summary>
/// Constructs a new instance of <see cref="ConsistencyMessageManager{TMessage}"/>.
/// </summary>
/// <param name="store">The persistence store the manager will operate over.</param>
/// <param name="services">The <see cref="IServiceProvider"/> used to resolve services.</param>
/// <param name="logger">The logger used to log messages, warnings and errors.</param>
public ConsistencyMessageManager(IConsistencyMessageStore<TMessage> store,
IServiceProvider services,
ILogger<ConsistencyMessageManager<TMessage>> logger) {
if (store == null) {
throw new ArgumentNullException(nameof(store));
}
Store = store;
Logger = logger;
if (services != null) {
_context = services.GetService<IHttpContextAccessor>()?.HttpContext;
}
}
/// <summary>
/// Gets or sets the persistence store the manager operates over.
/// </summary>
/// <value>The persistence store the manager operates over.</value>
protected internal IConsistencyMessageStore<TMessage> Store { get; set; }
/// <summary>
/// Gets the <see cref="ILogger"/> used to log messages from the manager.
/// </summary>
/// <value>
/// The <see cref="ILogger"/> used to log messages from the manager.
/// </value>
protected internal virtual ILogger Logger { get; set; }
/// <summary>
/// Creates the specified <paramref name="message"/> in the backing store.
/// </summary>
/// <param name="message">The message to create.</param>
/// <returns>
/// The <see cref="Task"/> that represents the asynchronous operation, containing the <see cref="OperateResult"/>
/// of the operation.
/// </returns>
public virtual Task<OperateResult> CreateAsync(TMessage message) {
ThrowIfDisposed();
//todo: validation message fileds is correct
return Store.CreateAsync(message, CancellationToken);
}
/// <summary>
/// Updates the specified <paramref name="message"/> in the backing store.
/// </summary>
/// <param name="message">The message to update.</param>
/// <returns>
/// The <see cref="Task"/> that represents the asynchronous operation, containing the <see cref="OperateResult"/>
/// of the operation.
/// </returns>
public virtual Task<OperateResult> UpdateAsync(TMessage message) {
ThrowIfDisposed();
//todo: validation message fileds is correct
return Store.UpdateAsync(message, CancellationToken);
}
/// <summary>
/// Deletes the specified <paramref name="message"/> in the backing store.
/// </summary>
/// <param name="message">The message to delete.</param>
/// <returns>
/// The <see cref="Task"/> that represents the asynchronous operation, containing the <see cref="OperateResult"/>
/// of the operation.
/// </returns>
public virtual Task<OperateResult> DeleteAsync(TMessage message) {
ThrowIfDisposed();
if (message == null) {
throw new ArgumentNullException(nameof(message));
}
return Store.DeleteAsync(message, CancellationToken);
}
/// <summary>
/// Finds and returns a message, if any, who has the specified <paramref name="messageId"/>.
/// </summary>
/// <param name="messageId">The message ID to search for.</param>
/// <returns>
/// The <see cref="Task"/> that represents the asynchronous operation, containing the user matching the specified <paramref name="messageId"/> if it exists.
/// </returns>
public virtual Task<TMessage> FindByIdAsync(string messageId) {
ThrowIfDisposed();
return Store.FindByIdAsync(messageId, CancellationToken);
}
/// <summary>
/// Gets the message identifier for the specified <paramref name="message"/>.
/// </summary>
/// <param name="message">The message whose identifier should be retrieved.</param>
/// <returns>The <see cref="Task"/> that represents the asynchronous operation, containing the identifier for the specified <paramref name="message"/>.</returns>
public virtual async Task<string> GetMessageIdAsync(TMessage message) {
ThrowIfDisposed();
return await Store.GetMessageIdAsync(message, CancellationToken);
}
public void Dispose() {
Dispose(true);
GC.SuppressFinalize(this);
}
/// <summary>
/// Releases the unmanaged resources used by the message manager and optionally releases the managed resources.
/// </summary>
/// <param name="disposing">true to release both managed and unmanaged resources; false to release only unmanaged resources.</param>
protected virtual void Dispose(bool disposing) {
if (disposing && !_disposed) {
Store.Dispose();
_disposed = true;
}
}
protected void ThrowIfDisposed() {
if (_disposed) {
throw new ObjectDisposedException(GetType().Name);
}
}
}
}
\ No newline at end of file
using Cap.Consistency;
namespace Microsoft.AspNetCore.Builder
{
/// <summary>
/// Represents all the options you can use to configure the system.
/// </summary>
public class ConsistencyOptions
{
/// <summary>
/// Gets or sets the <see cref="BrokerOptions"/> for the consistency system.
/// </summary>
public BrokerOptions Broker { get; set; } = new BrokerOptions();
public long MaxPendingEventNumber { get; set; }
public int MaxPendingEventNumber32 {
get {
if (this.MaxPendingEventNumber < int.MaxValue) {
return (int)this.MaxPendingEventNumber;
}
return int.MaxValue;
}
}
}
}
\ No newline at end of file
...@@ -4,7 +4,7 @@ using System.Text; ...@@ -4,7 +4,7 @@ using System.Text;
using System.Threading.Tasks; using System.Threading.Tasks;
using Cap.Consistency.Abstractions; using Cap.Consistency.Abstractions;
using Cap.Consistency.Infrastructure; using Cap.Consistency.Infrastructure;
using Cap.Consistency.Route; using Cap.Consistency.Routing;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
namespace Cap.Consistency.Consumer namespace Cap.Consistency.Consumer
...@@ -12,6 +12,7 @@ namespace Cap.Consistency.Consumer ...@@ -12,6 +12,7 @@ namespace Cap.Consistency.Consumer
public class ConsumerHandler : IConsumerHandler public class ConsumerHandler : IConsumerHandler
{ {
private readonly IServiceProvider _serviceProvider;
private readonly IConsumerInvokerFactory _consumerInvokerFactory; private readonly IConsumerInvokerFactory _consumerInvokerFactory;
private readonly IConsumerExcutorSelector _selector; private readonly IConsumerExcutorSelector _selector;
private readonly ILoggerFactory _loggerFactory; private readonly ILoggerFactory _loggerFactory;
...@@ -19,30 +20,35 @@ namespace Cap.Consistency.Consumer ...@@ -19,30 +20,35 @@ namespace Cap.Consistency.Consumer
public ConsumerHandler( public ConsumerHandler(
IServiceProvider serviceProvider,
IConsumerInvokerFactory consumerInvokerFactory, IConsumerInvokerFactory consumerInvokerFactory,
IConsumerExcutorSelector selector, IConsumerExcutorSelector selector,
ILoggerFactory loggerFactory) { ILoggerFactory loggerFactory) {
_serviceProvider = serviceProvider;
_consumerInvokerFactory = consumerInvokerFactory; _consumerInvokerFactory = consumerInvokerFactory;
_loggerFactory = loggerFactory; _loggerFactory = loggerFactory;
_selector = selector; _selector = selector;
_logger = loggerFactory.CreateLogger<ConsumerHandler>(); _logger = loggerFactory.CreateLogger<ConsumerHandler>();
} }
public Task Start(TopicRouteContext context) { public Task RouteAsync(TopicRouteContext context) {
if (context == null) { if (context == null) {
throw new ArgumentNullException(nameof(context)); throw new ArgumentNullException(nameof(context));
} }
context.ServiceProvider = _serviceProvider;
var matchs = _selector.SelectCandidates(context); var matchs = _selector.SelectCandidates(context);
if (matchs == null || matchs.Count==0) { if (matchs == null || matchs.Count == 0) {
_logger.LogInformation("can not be fond topic route"); _logger.LogInformation("can not be fond topic route");
return Task.CompletedTask; return Task.CompletedTask;
} }
var executeDescriptor = _selector.SelectBestCandidate(context, matchs); var executeDescriptor = _selector.SelectBestCandidate(context, matchs);
context.Handler = c => { context.Handler = c => {
var consumerContext = new ConsumerContext(executeDescriptor); var consumerContext = new ConsumerContext(executeDescriptor);
...@@ -52,17 +58,8 @@ namespace Cap.Consistency.Consumer ...@@ -52,17 +58,8 @@ namespace Cap.Consistency.Consumer
return invoker.InvokeAsync(); return invoker.InvokeAsync();
}; };
return Task.CompletedTask; return Task.CompletedTask;
} }
public void Start(IEnumerable<IConsumerService> consumers) {
throw new NotImplementedException();
}
public void Stop() {
throw new NotImplementedException();
}
} }
} }
using System; using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.Text; using System.Text;
using Cap.Consistency.Routing;
namespace Cap.Consistency.Consumer namespace Cap.Consistency.Consumer
{ {
public interface IConsumerHandler public interface IConsumerHandler : ITopicRoute
{ {
void Start(IEnumerable<IConsumerService> consumers);
void Stop();
} }
} }
...@@ -17,7 +17,6 @@ namespace Cap.Consistency.Consumer.Kafka ...@@ -17,7 +17,6 @@ namespace Cap.Consistency.Consumer.Kafka
} }
public void Start(TopicRouteContext routeContext ) { public void Start(TopicRouteContext routeContext ) {
string brokerList = null;// args[0]; string brokerList = null;// args[0];
......
using System; using System;
using Cap.Consistency; using Cap.Consistency;
using Cap.Consistency.Routing;
using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.DependencyInjection;
// ReSharper disable once CheckNamespace // ReSharper disable once CheckNamespace
...@@ -27,5 +28,25 @@ namespace Microsoft.AspNetCore.Builder ...@@ -27,5 +28,25 @@ namespace Microsoft.AspNetCore.Builder
return app; return app;
} }
public static IApplicationBuilder UserRouter(this IApplicationBuilder builder, ITopicRoute router) {
if (builder == null) {
throw new ArgumentNullException(nameof(builder));
}
if (router == null) {
throw new ArgumentNullException(nameof(router));
}
var marker = builder.ApplicationServices.GetService<ConsistencyMarkerService>();
if (marker == null) {
throw new InvalidOperationException("Add Consistency must be called on the service collection.");
}
var context = new TopicRouteContext();
}
} }
} }
\ No newline at end of file

// This file is used by Code Analysis to maintain SuppressMessage
// attributes that are applied to this project.
// Project-level suppressions either have no target or are given
// a specific target and scoped to a namespace, type, member, etc.
[assembly: System.Diagnostics.CodeAnalysis.SuppressMessage("Style", "IDE0016:Use 'throw' expression", Justification = "<Pending>", Scope = "member", Target = "~M:Cap.Consistency.Internal.ConsumerInvoker.#ctor(Microsoft.Extensions.Logging.ILogger,Cap.Consistency.Abstractions.ConsumerContext,Cap.Consistency.Internal.ObjectMethodExecutor)")]
using System;
using System.Threading;
using System.Threading.Tasks;
namespace Cap.Consistency
{
/// <summary>
/// Provides an abstraction for a store which manages consistent message.
/// </summary>
/// <typeparam name="TMessage"></typeparam>
public interface IConsistencyMessageStore<TMessage> : IDisposable where TMessage : class
{
/// <summary>
/// Finds and returns a message, if any, who has the specified <paramref name="messageId"/>.
/// </summary>
/// <param name="messageId">The message ID to search for.</param>
/// <param name="cancellationToken">The <see cref="CancellationToken"/> used to propagate notifications that the operation should be canceled.</param>
/// <returns>
/// The <see cref="Task"/> that represents the asynchronous operation, containing the message matching the specified <paramref name="messageId"/> if it exists.
/// </returns>
Task<TMessage> FindByIdAsync(string messageId, CancellationToken cancellationToken);
/// <summary>
/// Creates a new message in a store as an asynchronous operation.
/// </summary>
/// <param name="message">The message to create in the store.</param>
/// <param name="cancellationToken">The <see cref="CancellationToken"/> used to propagate notifications that the operation should be canceled.</param>
/// <returns>A <see cref="Task{TResult}"/> that represents the <see cref="OperateResult"/> of the asynchronous query.</returns>
Task<OperateResult> CreateAsync(TMessage message, CancellationToken cancellationToken);
/// <summary>
/// Updates a message in a store as an asynchronous operation.
/// </summary>
/// <param name="message">The message to update in the store.</param>
/// <param name="cancellationToken">The <see cref="CancellationToken"/> used to propagate notifications that the operation should be canceled.</param>
/// <returns>A <see cref="Task{TResult}"/> that represents the <see cref="OperateResult"/> of the asynchronous query.</returns>
Task<OperateResult> UpdateAsync(TMessage message, CancellationToken cancellationToken);
/// <summary>
/// Deletes a message from the store as an asynchronous operation.
/// </summary>
/// <param name="message">The message to delete in the store.</param>
/// <param name="cancellationToken">The <see cref="CancellationToken"/> used to propagate notifications that the operation should be canceled.</param>
/// <returns>A <see cref="Task{TResult}"/> that represents the <see cref="OperateResult"/> of the asynchronous query.</returns>
Task<OperateResult> DeleteAsync(TMessage message, CancellationToken cancellationToken);
/// <summary>
/// Gets the ID for a message from the store as an asynchronous operation.
/// </summary>
/// <param name="message">The message whose ID should be returned.</param>
/// <param name="cancellationToken">The <see cref="CancellationToken"/> used to propagate notifications that the operation should be canceled.</param>
/// <returns>A <see cref="Task{TResult}"/> that contains the ID of the message.</returns>
Task<string> GetMessageIdAsync(TMessage message, CancellationToken cancellationToken);
}
}
\ No newline at end of file
...@@ -2,7 +2,7 @@ ...@@ -2,7 +2,7 @@
using System.Collections.Generic; using System.Collections.Generic;
using System.Text; using System.Text;
using Cap.Consistency.Abstractions; using Cap.Consistency.Abstractions;
using Cap.Consistency.Route; using Cap.Consistency.Routing;
namespace Cap.Consistency.Infrastructure namespace Cap.Consistency.Infrastructure
{ {
......
using System;
using System.Collections.Generic;
using System.Reflection;
using System.Text;
using System.Threading.Tasks;
using Cap.Consistency.Abstractions;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
namespace Cap.Consistency.Internal
{
public class ConsumerInvoker : IConsumerInvoker
{
protected readonly ILogger _logger;
protected readonly IServiceProvider _serviceProvider;
private readonly ObjectMethodExecutor _executor;
protected readonly ConsumerContext _consumerContext;
private Dictionary<string, object> _arguments;
public ConsumerInvoker(ILogger logger,
IServiceProvider serviceProvider,
ConsumerContext consumerContext,
ObjectMethodExecutor objectMethodExecutor) {
if (logger == null) {
throw new ArgumentNullException(nameof(logger));
}
if (consumerContext == null) {
throw new ArgumentNullException(nameof(consumerContext));
}
if (objectMethodExecutor == null) {
throw new ArgumentNullException(nameof(objectMethodExecutor));
}
_logger = logger;
_serviceProvider = serviceProvider;
_consumerContext = consumerContext;
_executor = ObjectMethodExecutor.Create(_consumerContext.ConsumerDescriptor.MethodInfo,
_consumerContext.ConsumerDescriptor.ImplType.GetTypeInfo());
}
public Task InvokeAsync() {
try {
using (_logger.BeginScope("consumer invoker begin")) {
_logger.LogDebug("Executing consumer Topic: {0}", _consumerContext.ConsumerDescriptor.Topic);
try {
var obj = ActivatorUtilities.GetServiceOrCreateInstance(_serviceProvider, _consumerContext.ConsumerDescriptor.ImplType);
_executor.Execute(obj, null);
return Task.CompletedTask;
}
finally {
_logger.LogDebug("Executed consumer method .");
}
}
}
finally {
}
}
private object _controller;
private async Task InvokeConsumerMethodAsync() {
var controllerContext = _consumerContext;
var executor = _executor;
var controller = _controller;
var arguments = _arguments;
var orderedArguments = ConsumerMethodExecutor.PrepareArguments(arguments, executor);
var logger = _logger;
object result = null;
try {
var returnType = executor.MethodReturnType;
if (returnType == typeof(void)) {
executor.Execute(controller, orderedArguments);
result = new object();
}
else if (returnType == typeof(Task)) {
await (Task)executor.Execute(controller, orderedArguments);
result = new object();
}
//else if (executor.TaskGenericType == typeof(IActionResult)) {
// result = await (Task<IActionResult>)executor.Execute(controller, orderedArguments);
// if (result == null) {
// throw new InvalidOperationException(
// Resources.FormatActionResult_ActionReturnValueCannotBeNull(typeof(IActionResult)));
// }
//}
//else if (executor.IsTypeAssignableFromIActionResult) {
// if (_executor.IsMethodAsync) {
// result = (IActionResult)await _executor.ExecuteAsync(controller, orderedArguments);
// }
// else {
// result = (IActionResult)_executor.Execute(controller, orderedArguments);
// }
// if (result == null) {
// throw new InvalidOperationException(
// Resources.FormatActionResult_ActionReturnValueCannotBeNull(_executor.TaskGenericType ?? returnType));
// }
//}
//else if (!executor.IsMethodAsync) {
// var resultAsObject = executor.Execute(controller, orderedArguments);
// result = resultAsObject as IActionResult ?? new ObjectResult(resultAsObject) {
// DeclaredType = returnType,
// };
//}
//else if (executor.TaskGenericType != null) {
// var resultAsObject = await executor.ExecuteAsync(controller, orderedArguments);
// result = resultAsObject as IActionResult ?? new ObjectResult(resultAsObject) {
// DeclaredType = executor.TaskGenericType,
// };
//}
//else {
// // This will be the case for types which have derived from Task and Task<T> or non Task types.
// throw new InvalidOperationException(Resources.FormatActionExecutor_UnexpectedTaskInstance(
// executor.MethodInfo.Name,
// executor.MethodInfo.DeclaringType));
//}
//_result = result;
// logger.ActionMethodExecuted(controllerContext, result);
}
finally {
}
}
}
}
using System;
using System.Collections.Generic;
using System.Text;
namespace Cap.Consistency.Internal
{
public class ConsumerMethodExecutor
{
public static object[] PrepareArguments(
IDictionary<string, object> actionParameters,
ObjectMethodExecutor actionMethodExecutor) {
var declaredParameterInfos = actionMethodExecutor.ActionParameters;
var count = declaredParameterInfos.Length;
if (count == 0) {
return null;
}
var arguments = new object[count];
for (var index = 0; index < count; index++) {
var parameterInfo = declaredParameterInfos[index];
object value;
if (!actionParameters.TryGetValue(parameterInfo.Name, out value)) {
value = actionMethodExecutor.GetDefaultValueForParameter(index);
}
arguments[index] = value;
}
return arguments;
}
}
}
This diff is collapsed.
...@@ -4,10 +4,11 @@ using System.Text; ...@@ -4,10 +4,11 @@ using System.Text;
using System.Linq; using System.Linq;
using Cap.Consistency.Consumer; using Cap.Consistency.Consumer;
using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.DependencyInjection;
using System.Threading.Tasks;
namespace Cap.Consistency namespace Cap.Consistency
{ {
public class KafkaConsistency public class KafkaConsistency:IRoute
{ {
private IServiceProvider _serviceProvider; private IServiceProvider _serviceProvider;
private IEnumerable<IConsumerHandler> _handlers; private IEnumerable<IConsumerHandler> _handlers;
...@@ -29,5 +30,9 @@ namespace Cap.Consistency ...@@ -29,5 +30,9 @@ namespace Cap.Consistency
handler.Stop(); handler.Stop();
} }
} }
}
public async Task Start() {
}
} }
using System;
namespace Cap.Consistency
{
[AttributeUsage(AttributeTargets.Method, Inherited = true, AllowMultiple = true)]
sealed class QMessageAttribute : Attribute
{
public QMessageAttribute(string messageName) {
MessageName = messageName;
}
public string MessageName { get; private set; }
}
}
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;
using System.Threading.Tasks;
using System.Collections.Concurrent;
using Cap.Consistency.Extensions;
using Microsoft.Extensions.DependencyInjection;
namespace Cap.Consistency
{
public class QMessageFinder
{
public ConcurrentDictionary<string, QMessageMethodInfo> GetQMessageMethods(IServiceCollection serviceColloection) {
if (serviceColloection == null) {
throw new ArgumentNullException(nameof(serviceColloection));
}
var qMessageTypes = new ConcurrentDictionary<string, QMessageMethodInfo>();
foreach (var serviceDescriptor in serviceColloection) {
foreach (var method in serviceDescriptor.ServiceType.GetTypeInfo().DeclaredMethods) {
var messageMethodInfo = new QMessageMethodInfo();
if (method.IsPropertyBinding()) {
continue;
}
var qMessageAttr = method.GetCustomAttribute<QMessageAttribute>();
if (qMessageAttr == null) {
continue;
}
messageMethodInfo.MessageName = qMessageAttr.MessageName;
messageMethodInfo.ImplType = method.DeclaringType;
messageMethodInfo.MethodInfo = method;
qMessageTypes.AddOrUpdate(qMessageAttr.MessageName, messageMethodInfo, (x, y) => y);
}
}
return qMessageTypes;
}
}
}
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;
using System.Threading.Tasks;
namespace Cap.Consistency
{
public class QMessageMethodInfo
{
public MethodInfo MethodInfo { get; set; }
public Type ImplType { get; set; }
public string MessageName { get; set; }
}
}
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
namespace Cap.Consistency.Routing
{
public interface ITopicRoute
{
Task RouteAsync(TopicRouteContext context);
}
}
...@@ -6,12 +6,15 @@ using Cap.Consistency.Abstractions; ...@@ -6,12 +6,15 @@ using Cap.Consistency.Abstractions;
using Cap.Consistency.Consumer; using Cap.Consistency.Consumer;
using Cap.Consistency.Infrastructure; using Cap.Consistency.Infrastructure;
namespace Cap.Consistency.Route namespace Cap.Consistency.Routing
{ {
public delegate Task HandlerConsumer(ConsumerExecutorDescriptor context); public delegate Task HandlerConsumer(ConsumerExecutorDescriptor context);
public class TopicRouteContext public class TopicRouteContext
{ {
public TopicRouteContext() {
}
public TopicRouteContext(DeliverMessage message) { public TopicRouteContext(DeliverMessage message) {
Message = message; Message = message;
...@@ -23,9 +26,11 @@ namespace Cap.Consistency.Route ...@@ -23,9 +26,11 @@ namespace Cap.Consistency.Route
public HandlerConsumer Handler { get; set; } public HandlerConsumer Handler { get; set; }
public IList<IConsumerHandler> Consumers { get; set; }
public IServiceProvider ServiceProvider { get; set; } public IServiceProvider ServiceProvider { get; set; }
public IList<IConsumerHandler> Consumers { get; set; } public IList<ITopicRoute> Routes { get; set; }
} }
} }
using System;
using Cap.Consistency;
using Microsoft.AspNetCore.Builder;
using Microsoft.Extensions.DependencyInjection.Extensions;
// ReSharper disable once CheckNamespace
namespace Microsoft.Extensions.DependencyInjection
{
/// <summary>
/// Contains extension methods to <see cref="IServiceCollection"/> for configuring consistence services.
/// </summary>
public static class ServiceCollectionExtensions
{
/// <summary>
/// Adds and configures the consistence services for the consitence.
/// </summary>
/// <param name="services">The services available in the application.</param>
/// <returns>An <see cref="ConsistencyBuilder"/> for application services.</returns>
public static ConsistencyBuilder AddConsistency<TMessage>(this IServiceCollection services)
where TMessage : class {
return services.AddConsistency<TMessage>(setupAction: null);
}
/// <summary>
/// Adds and configures the consistence services for the consitence.
/// </summary>
/// <param name="services">The services available in the application.</param>
/// <param name="setupAction">An action to configure the <see cref="ConsistencyOptions"/>.</param>
/// <returns>An <see cref="ConsistencyBuilder"/> for application services.</returns>
public static ConsistencyBuilder AddConsistency<TMessage>(this IServiceCollection services, Action<ConsistencyOptions> setupAction)
where TMessage : class {
services.TryAddSingleton<ConsistencyMarkerService>();
services.TryAddScoped<ConsistencyMessageManager<TMessage>, ConsistencyMessageManager<TMessage>>();
services.AddSingleton<QMessageFinder>();
if (setupAction != null) {
services.Configure(setupAction);
}
return new ConsistencyBuilder(typeof(TMessage), services);
}
}
}
\ No newline at end of file
...@@ -23,20 +23,20 @@ ...@@ -23,20 +23,20 @@
</ItemGroup> </ItemGroup>
<ItemGroup> <ItemGroup>
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="15.0.0-preview-20170106-08" /> <PackageReference Include="Microsoft.NET.Test.Sdk" Version="15.0.0" />
<PackageReference Include="xunit.runner.visualstudio" Version="2.2.0-beta5-build1225" /> <PackageReference Include="xunit.runner.visualstudio" Version="2.2.0" />
<PackageReference Include="xunit" Version="2.2.0-beta5-build3474" /> <PackageReference Include="xunit" Version="2.2.0" />
<PackageReference Include="Microsoft.AspNetCore.Http" Version="1.1.1" /> <PackageReference Include="Microsoft.AspNetCore.Http" Version="1.1.2" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="1.1.0" /> <PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="1.1.1" />
<PackageReference Include="Moq" Version="4.6.36-*" /> <PackageReference Include="Moq" Version="4.7.10" />
<PackageReference Include="Microsoft.Extensions.Logging" Version="1.1.1" /> <PackageReference Include="Microsoft.Extensions.Logging" Version="1.1.2" />
<PackageReference Include="Microsoft.Extensions.Configuration.Abstractions" Version="1.1.1" /> <PackageReference Include="Microsoft.Extensions.Configuration.Abstractions" Version="1.1.2" />
<PackageReference Include="Microsoft.Extensions.Configuration" Version="1.1.1" /> <PackageReference Include="Microsoft.Extensions.Configuration" Version="1.1.2" />
<PackageReference Include="Microsoft.Extensions.Configuration.FileExtensions" Version="1.1.1" /> <PackageReference Include="Microsoft.Extensions.Configuration.FileExtensions" Version="1.1.2" />
<PackageReference Include="Microsoft.Extensions.Configuration.Json" Version="1.1.1" /> <PackageReference Include="Microsoft.Extensions.Configuration.Json" Version="1.1.2" />
<PackageReference Include="Microsoft.Extensions.Configuration.EnvironmentVariables" Version="1.1.1" /> <PackageReference Include="Microsoft.Extensions.Configuration.EnvironmentVariables" Version="1.1.2" />
<PackageReference Include="System.Data.SqlClient" Version="4.3.0" /> <PackageReference Include="System.Data.SqlClient" Version="4.3.1" />
<PackageReference Include="Microsoft.EntityFrameworkCore.SqlServer" Version="1.1.1" /> <PackageReference Include="Microsoft.EntityFrameworkCore.SqlServer" Version="1.1.2" />
<PackageReference Include="Microsoft.AspNetCore.Testing" Version="1.2.0-preview1-22815" /> <PackageReference Include="Microsoft.AspNetCore.Testing" Version="1.2.0-preview1-22815" />
</ItemGroup> </ItemGroup>
......
...@@ -19,13 +19,13 @@ ...@@ -19,13 +19,13 @@
</ItemGroup> </ItemGroup>
<ItemGroup> <ItemGroup>
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="15.0.0-preview-20170106-08" /> <PackageReference Include="Microsoft.NET.Test.Sdk" Version="15.0.0" />
<PackageReference Include="xunit.runner.visualstudio" Version="2.2.0-beta5-build1225" /> <PackageReference Include="xunit.runner.visualstudio" Version="2.2.0" />
<PackageReference Include="xunit" Version="2.2.0-beta5-build3474" /> <PackageReference Include="xunit" Version="2.2.0" />
<PackageReference Include="Microsoft.AspNetCore.Http" Version="1.1.1" /> <PackageReference Include="Microsoft.AspNetCore.Http" Version="1.1.2" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="1.1.0" /> <PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="1.1.1" />
<PackageReference Include="Moq" Version="4.6.36-*" /> <PackageReference Include="Moq" Version="4.7.10" />
<PackageReference Include="Microsoft.Extensions.Logging" Version="1.1.1" /> <PackageReference Include="Microsoft.Extensions.Logging" Version="1.1.2" />
</ItemGroup> </ItemGroup>
</Project> </Project>
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment