Commit 3785b696 authored by yangxiaodong's avatar yangxiaodong

resolving conflicts

parents f30c905f d646c518
...@@ -35,7 +35,7 @@ bin/ ...@@ -35,7 +35,7 @@ bin/
/.idea /.idea
Properties Properties
/pack.bat /pack.bat
/src/DotNetCore.CAP/DotNetCore.CAP.Net47.csproj
/src/DotNetCore.CAP/packages.config
/src/DotNetCore.CAP/project.json /src/DotNetCore.CAP/project.json
/src/DotNetCore.CAP/packages.config
/src/DotNetCore.CAP/DotNetCore.CAP.Net47.csproj
/NuGet.config /NuGet.config
using System.IO; using System.IO;
using Microsoft.AspNetCore;
using Microsoft.AspNetCore.Builder; using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting; using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.Configuration; using Microsoft.Extensions.Configuration;
...@@ -7,22 +8,30 @@ namespace Sample.RabbitMQ.SqlServer ...@@ -7,22 +8,30 @@ namespace Sample.RabbitMQ.SqlServer
{ {
public class Program public class Program
{ {
//var config = new ConfigurationBuilder()
// .AddCommandLine(args)
// .AddEnvironmentVariables("ASPNETCORE_")
// .Build();
//var host = new WebHostBuilder()
// .UseConfiguration(config)
// .UseKestrel()
// .UseContentRoot(Directory.GetCurrentDirectory())
// .UseIISIntegration()
// .UseStartup<Startup>()
// .Build();
//host.Run();
public static void Main(string[] args) public static void Main(string[] args)
{ {
var config = new ConfigurationBuilder() BuildWebHost(args).Run();
.AddCommandLine(args) }
.AddEnvironmentVariables("ASPNETCORE_")
.Build();
var host = new WebHostBuilder() public static IWebHost BuildWebHost(string[] args) =>
.UseConfiguration(config) WebHost.CreateDefaultBuilder(args)
.UseKestrel()
.UseContentRoot(Directory.GetCurrentDirectory())
.UseIISIntegration()
.UseStartup<Startup>() .UseStartup<Startup>()
.Build(); .Build();
host.Run();
}
} }
} }
\ No newline at end of file
...@@ -21,7 +21,7 @@ namespace DotNetCore.CAP ...@@ -21,7 +21,7 @@ namespace DotNetCore.CAP
services.AddSingleton(kafkaOptions); services.AddSingleton(kafkaOptions);
services.AddSingleton<IConsumerClientFactory, KafkaConsumerClientFactory>(); services.AddSingleton<IConsumerClientFactory, KafkaConsumerClientFactory>();
services.AddTransient<IQueueExecutor, PublishQueueExecutor>(); services.AddSingleton<IQueueExecutor, PublishQueueExecutor>();
} }
} }
} }
\ No newline at end of file
...@@ -23,9 +23,8 @@ namespace DotNetCore.CAP ...@@ -23,9 +23,8 @@ namespace DotNetCore.CAP
services.AddSingleton<IConsumerClientFactory, RabbitMQConsumerClientFactory>(); services.AddSingleton<IConsumerClientFactory, RabbitMQConsumerClientFactory>();
services.AddSingleton<ConnectionPool>(); services.AddSingleton<ConnectionPool>();
services.AddScoped(x => x.GetService<ConnectionPool>().Rent());
services.AddTransient<IQueueExecutor, PublishQueueExecutor>(); services.AddSingleton<IQueueExecutor, PublishQueueExecutor>();
} }
} }
} }
\ No newline at end of file
...@@ -10,19 +10,19 @@ namespace DotNetCore.CAP.RabbitMQ ...@@ -10,19 +10,19 @@ namespace DotNetCore.CAP.RabbitMQ
internal sealed class PublishQueueExecutor : BasePublishQueueExecutor internal sealed class PublishQueueExecutor : BasePublishQueueExecutor
{ {
private readonly ILogger _logger; private readonly ILogger _logger;
private readonly IConnection _connection; private readonly ConnectionPool _connectionPool;
private readonly RabbitMQOptions _rabbitMQOptions; private readonly RabbitMQOptions _rabbitMQOptions;
public PublishQueueExecutor( public PublishQueueExecutor(
CapOptions options, CapOptions options,
IStateChanger stateChanger, IStateChanger stateChanger,
IConnection connection, ConnectionPool connectionPool,
RabbitMQOptions rabbitMQOptions, RabbitMQOptions rabbitMQOptions,
ILogger<PublishQueueExecutor> logger) ILogger<PublishQueueExecutor> logger)
: base(options, stateChanger, logger) : base(options, stateChanger, logger)
{ {
_logger = logger; _logger = logger;
_connection = connection; _connectionPool = connectionPool;
_rabbitMQOptions = rabbitMQOptions; _rabbitMQOptions = rabbitMQOptions;
} }
...@@ -30,7 +30,8 @@ namespace DotNetCore.CAP.RabbitMQ ...@@ -30,7 +30,8 @@ namespace DotNetCore.CAP.RabbitMQ
{ {
try try
{ {
using (var channel = _connection.CreateModel()) var connection = _connectionPool.Rent();
using (var channel = connection.CreateModel())
{ {
var body = Encoding.UTF8.GetBytes(content); var body = Encoding.UTF8.GetBytes(content);
......
...@@ -9,10 +9,10 @@ namespace DotNetCore.CAP.RabbitMQ ...@@ -9,10 +9,10 @@ namespace DotNetCore.CAP.RabbitMQ
private readonly IConnection _connection; private readonly IConnection _connection;
public RabbitMQConsumerClientFactory(RabbitMQOptions rabbitMQOptions, IConnection connection) public RabbitMQConsumerClientFactory(RabbitMQOptions rabbitMQOptions, ConnectionPool pool)
{ {
_rabbitMQOptions = rabbitMQOptions; _rabbitMQOptions = rabbitMQOptions;
_connection = connection; _connection = pool.Rent();
} }
public IConsumerClient Create(string groupId) public IConsumerClient Create(string groupId)
......
...@@ -19,11 +19,15 @@ namespace DotNetCore.CAP ...@@ -19,11 +19,15 @@ namespace DotNetCore.CAP
public void AddServices(IServiceCollection services) public void AddServices(IServiceCollection services)
{ {
services.AddSingleton<IStorage, SqlServerStorage>(); services.AddSingleton<IStorage, SqlServerStorage>();
services.AddScoped<IStorageConnection, SqlServerStorageConnection>(); services.AddSingleton<IStorageConnection, SqlServerStorageConnection>();
services.AddScoped<ICapPublisher, CapPublisher>(); services.AddTransient<ICapPublisher, CapPublisher>();
services.AddTransient<ICallbackPublisher, CapPublisher>(); services.AddTransient<ICallbackPublisher, CapPublisher>();
services.AddTransient<IAdditionalProcessor, DefaultAdditionalProcessor>(); services.AddTransient<IAdditionalProcessor, DefaultAdditionalProcessor>();
AddSqlServerOptions(services);
}
private void AddSqlServerOptions(IServiceCollection services)
{
var sqlServerOptions = new SqlServerOptions(); var sqlServerOptions = new SqlServerOptions();
_configure(sqlServerOptions); _configure(sqlServerOptions);
...@@ -32,9 +36,13 @@ namespace DotNetCore.CAP ...@@ -32,9 +36,13 @@ namespace DotNetCore.CAP
{ {
services.AddSingleton(x => services.AddSingleton(x =>
{ {
var dbContext = (DbContext)x.GetService(sqlServerOptions.DbContextType); using (var scope = x.CreateScope())
sqlServerOptions.ConnectionString = dbContext.Database.GetDbConnection().ConnectionString; {
return sqlServerOptions; var provider = scope.ServiceProvider;
var dbContext = (DbContext)provider.GetService(sqlServerOptions.DbContextType);
sqlServerOptions.ConnectionString = dbContext.Database.GetDbConnection().ConnectionString;
return sqlServerOptions;
}
}); });
} }
else else
......
...@@ -82,15 +82,6 @@ namespace Microsoft.Extensions.DependencyInjection ...@@ -82,15 +82,6 @@ namespace Microsoft.Extensions.DependencyInjection
{ {
services.AddTransient(service.Key, service.Value); services.AddTransient(service.Key, service.Value);
} }
var types = Assembly.GetEntryAssembly().ExportedTypes;
foreach (var type in types)
{
if (Helper.IsController(type.GetTypeInfo()))
{
services.AddTransient(typeof(object), type);
}
}
} }
} }
} }
\ No newline at end of file
...@@ -25,14 +25,13 @@ namespace DotNetCore.CAP ...@@ -25,14 +25,13 @@ namespace DotNetCore.CAP
IOptions<CapOptions> options, IOptions<CapOptions> options,
IStorage storage, IStorage storage,
IApplicationLifetime appLifetime, IApplicationLifetime appLifetime,
IServiceProvider provider) IEnumerable<IProcessingServer> servers)
{ {
_logger = logger; _logger = logger;
_appLifetime = appLifetime; _appLifetime = appLifetime;
Options = options.Value; Options = options.Value;
Storage = storage; Storage = storage;
Provider = provider; Servers = servers;
Servers = Provider.GetServices<IProcessingServer>();
_cts = new CancellationTokenSource(); _cts = new CancellationTokenSource();
_ctsRegistration = appLifetime.ApplicationStopping.Register(() => _ctsRegistration = appLifetime.ApplicationStopping.Register(() =>
...@@ -55,8 +54,6 @@ namespace DotNetCore.CAP ...@@ -55,8 +54,6 @@ namespace DotNetCore.CAP
protected IEnumerable<IProcessingServer> Servers { get; } protected IEnumerable<IProcessingServer> Servers { get; }
public IServiceProvider Provider { get; private set; }
public Task BootstrapAsync() public Task BootstrapAsync()
{ {
return (_bootstrappingTask = BootstrapTaskAsync()); return (_bootstrappingTask = BootstrapTaskAsync());
......
...@@ -67,17 +67,16 @@ namespace DotNetCore.CAP.Internal ...@@ -67,17 +67,16 @@ namespace DotNetCore.CAP.Internal
IServiceProvider provider) IServiceProvider provider)
{ {
var executorDescriptorList = new List<ConsumerExecutorDescriptor>(); var executorDescriptorList = new List<ConsumerExecutorDescriptor>();
// at cap startup time, find all Controller into the DI container,the type is object.
var controllers = provider.GetServices<object>();
foreach (var controller in controllers)
{
var typeInfo = controller.GetType().GetTypeInfo();
//double check
if (!Helper.IsController(typeInfo)) continue;
executorDescriptorList.AddRange(GetTopicAttributesDescription(typeInfo)); var types = Assembly.GetEntryAssembly().ExportedTypes;
} foreach (var type in types)
{
var typeInfo = type.GetTypeInfo();
if (Helper.IsController(typeInfo))
{
executorDescriptorList.AddRange(GetTopicAttributesDescription(typeInfo));
}
}
return executorDescriptorList; return executorDescriptorList;
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment