Commit 768a62c3 authored by Savorboard's avatar Savorboard

refactor

parent 3ec25195
...@@ -13,7 +13,8 @@ namespace Sample.Kafka ...@@ -13,7 +13,8 @@ namespace Sample.Kafka
{ {
protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder) protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder)
{ {
optionsBuilder.UseSqlServer("Server=192.168.2.206;Initial Catalog=Test;User Id=cmswuliu;Password=h7xY81agBn*Veiu3;MultipleActiveResultSets=True"); //optionsBuilder.UseSqlServer("Server=192.168.2.206;Initial Catalog=Test;User Id=cmswuliu;Password=h7xY81agBn*Veiu3;MultipleActiveResultSets=True");
optionsBuilder.UseSqlServer("Server=DESKTOP-M9R8T31;Initial Catalog=Test;User Id=sa;Password=P@ssw0rd;MultipleActiveResultSets=True");
} }
} }
} }
using System; using System;
using System.Threading.Tasks; using System.Threading.Tasks;
using DotNetCore.CAP; using DotNetCore.CAP;
using DotNetCore.CAP.Kafka; using DotNetCore.CAP.RabbitMQ;
using Microsoft.AspNetCore.Hosting; using Microsoft.AspNetCore.Hosting;
using Microsoft.AspNetCore.Mvc; using Microsoft.AspNetCore.Mvc;
using Dapper;
namespace Sample.Kafka.Controllers namespace Sample.Kafka.Controllers
{ {
...@@ -11,10 +12,12 @@ namespace Sample.Kafka.Controllers ...@@ -11,10 +12,12 @@ namespace Sample.Kafka.Controllers
public class ValuesController : Controller, ICapSubscribe public class ValuesController : Controller, ICapSubscribe
{ {
private readonly ICapPublisher _producer; private readonly ICapPublisher _producer;
private readonly AppDbContext _dbContext ;
public ValuesController(ICapPublisher producer) public ValuesController(ICapPublisher producer, AppDbContext dbContext)
{ {
_producer = producer; _producer = producer;
_dbContext = dbContext;
} }
[Route("/")] [Route("/")]
...@@ -33,11 +36,11 @@ namespace Sample.Kafka.Controllers ...@@ -33,11 +36,11 @@ namespace Sample.Kafka.Controllers
} }
[Route("~/send")] [Route("~/send")]
public async Task<IActionResult> SendTopic([FromServices] AppDbContext dbContext) public async Task<IActionResult> SendTopic()
{ {
using (var trans = dbContext.Database.BeginTransaction()) using (var trans = _dbContext.Database.BeginTransaction())
{ {
await _producer.PublishAsync("zzwl.topic.finace.callBack", new Person { Name = "Test", Age = 11 }); await _producer.PublishAsync("zzwl.topic.finace.callBack","");
trans.Commit(); trans.Commit();
} }
......
...@@ -25,7 +25,6 @@ ...@@ -25,7 +25,6 @@
</ItemGroup> </ItemGroup>
<ItemGroup> <ItemGroup>
<ProjectReference Include="..\..\src\DotNetCore.CAP.EntityFrameworkCore\DotNetCore.CAP.EntityFrameworkCore.csproj" /> <ProjectReference Include="..\..\src\DotNetCore.CAP.EntityFrameworkCore\DotNetCore.CAP.EntityFrameworkCore.csproj" />
<ProjectReference Include="..\..\src\DotNetCore.CAP.Kafka\DotNetCore.CAP.Kafka.csproj" />
<ProjectReference Include="..\..\src\DotNetCore.CAP.RabbitMQ\DotNetCore.CAP.RabbitMQ.csproj" /> <ProjectReference Include="..\..\src\DotNetCore.CAP.RabbitMQ\DotNetCore.CAP.RabbitMQ.csproj" />
<ProjectReference Include="..\..\src\DotNetCore.CAP\DotNetCore.CAP.csproj" /> <ProjectReference Include="..\..\src\DotNetCore.CAP\DotNetCore.CAP.csproj" />
</ItemGroup> </ItemGroup>
......
...@@ -28,13 +28,14 @@ namespace Sample.Kafka ...@@ -28,13 +28,14 @@ namespace Sample.Kafka
services.AddCap() services.AddCap()
.AddEntityFrameworkStores<AppDbContext>(x=> { .AddEntityFrameworkStores<AppDbContext>(x=> {
x.ConnectionString = "Server=192.168.2.206;Initial Catalog=Test;User Id=cmswuliu;Password=h7xY81agBn*Veiu3;MultipleActiveResultSets=True"; //x.ConnectionString = "Server=192.168.2.206;Initial Catalog=Test;User Id=cmswuliu;Password=h7xY81agBn*Veiu3;MultipleActiveResultSets=True";
x.ConnectionString = "Server=DESKTOP-M9R8T31;Initial Catalog=Test;User Id=sa;Password=P@ssw0rd;MultipleActiveResultSets=True";
}) })
.AddRabbitMQ(x => .AddRabbitMQ(x =>
{ {
x.HostName = "192.168.2.206"; x.HostName = "localhost";
x.UserName = "admin"; // x.UserName = "admin";
x.Password = "123123"; // x.Password = "123123";
}); });
//.AddKafka(x => x.Servers = ""); //.AddKafka(x => x.Servers = "");
......
...@@ -23,8 +23,10 @@ namespace Microsoft.Extensions.DependencyInjection ...@@ -23,8 +23,10 @@ namespace Microsoft.Extensions.DependencyInjection
builder.Services.AddSingleton<IStorage, EFStorage>(); builder.Services.AddSingleton<IStorage, EFStorage>();
builder.Services.AddScoped<IStorageConnection, EFStorageConnection>(); builder.Services.AddScoped<IStorageConnection, EFStorageConnection>();
builder.Services.AddTransient<IAdditionalProcessor, DefaultAdditionalProcessor>(); builder.Services.AddScoped<ICapPublisher, CapPublisher>();
builder.Services.AddTransient<IAdditionalProcessor, DefaultAdditionalProcessor>();
builder.Services.Configure(actionOptions); builder.Services.Configure(actionOptions);
var sqlServerOptions = new SqlServerOptions(); var sqlServerOptions = new SqlServerOptions();
......
...@@ -12,7 +12,7 @@ namespace DotNetCore.CAP.EntityFrameworkCore ...@@ -12,7 +12,7 @@ namespace DotNetCore.CAP.EntityFrameworkCore
public EFOptions() public EFOptions()
{ {
ConnectionString = "Server=DESKTOP-M9R8T31;Initial Catalog=WebApp1;User Id=sa;Password=P@ssw0rd;MultipleActiveResultSets=True"; ConnectionString = "Server=DESKTOP-M9R8T31;Initial Catalog=Test;User Id=sa;Password=P@ssw0rd;MultipleActiveResultSets=True";
} }
/// <summary> /// <summary>
......
...@@ -47,7 +47,7 @@ namespace DotNetCore.CAP.EntityFrameworkCore ...@@ -47,7 +47,7 @@ namespace DotNetCore.CAP.EntityFrameworkCore
/// </param> /// </param>
protected override void OnModelCreating(ModelBuilder modelBuilder) protected override void OnModelCreating(ModelBuilder modelBuilder)
{ {
//_sqlServerOptions = new SqlServerOptions(); _sqlServerOptions = new SqlServerOptions();
modelBuilder.HasDefaultSchema(_sqlServerOptions.Schema); modelBuilder.HasDefaultSchema(_sqlServerOptions.Schema);
modelBuilder.Entity<CapSentMessage>(b => modelBuilder.Entity<CapSentMessage>(b =>
...@@ -67,7 +67,8 @@ namespace DotNetCore.CAP.EntityFrameworkCore ...@@ -67,7 +67,8 @@ namespace DotNetCore.CAP.EntityFrameworkCore
protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder) protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder)
{ {
// optionsBuilder.UseSqlServer("Server=192.168.2.206;Initial Catalog=Test;User Id=cmswuliu;Password=h7xY81agBn*Veiu3;MultipleActiveResultSets=True"); // optionsBuilder.UseSqlServer("Server=192.168.2.206;Initial Catalog=Test;User Id=cmswuliu;Password=h7xY81agBn*Veiu3;MultipleActiveResultSets=True");
optionsBuilder.UseSqlServer("Server=DESKTOP-M9R8T31;Initial Catalog=Test;User Id=sa;Password=P@ssw0rd;MultipleActiveResultSets=True");
} }
} }
} }
\ No newline at end of file
using System;
using System.Collections.Generic;
using System.Data;
using System.Text;
using System.Threading.Tasks;
using Dapper;
using DotNetCore.CAP.Infrastructure;
using DotNetCore.CAP.Models;
using DotNetCore.CAP.Processor;
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Infrastructure;
using Microsoft.EntityFrameworkCore.Storage;
using Microsoft.Extensions.DependencyInjection;
namespace DotNetCore.CAP.EntityFrameworkCore
{
public class CapPublisher : ICapPublisher
{
private readonly SqlServerOptions _options;
private readonly IServiceProvider _provider;
private readonly DbContext _dbContext;
public CapPublisher(SqlServerOptions options, IServiceProvider provider)
{
_options = options;
_provider = provider;
_dbContext = (DbContext)_provider.GetService(_options.DbContextType);
}
public async Task PublishAsync(string topic, string content)
{
var connection = _dbContext.Database.GetDbConnection();
var transaction = _dbContext.Database.CurrentTransaction;
transaction = transaction ?? await _dbContext.Database.BeginTransactionAsync(IsolationLevel.ReadCommitted);
var dbTransaction = transaction.GetDbTransaction();
var message = new CapSentMessage
{
KeyName = topic,
Content = content,
StatusName = StatusName.Scheduled
};
var sql = "INSERT INTO [cap].[CapSentMessages] ([Id],[Added],[Content],[KeyName],[ExpiresAt],[Retries],[StatusName])VALUES(@Id,@Added,@Content,@KeyName,@ExpiresAt,@Retries,@StatusName)";
await connection.ExecuteAsync(sql, message, transaction: dbTransaction);
PublishQueuer.PulseEvent.Set();
}
public Task PublishAsync<T>(string topic, T contentObj)
{
throw new NotImplementedException();
}
}
}
...@@ -89,7 +89,7 @@ WHERE StatusName = '{StatusName.Scheduled}'"; ...@@ -89,7 +89,7 @@ WHERE StatusName = '{StatusName.Scheduled}'";
var sql = $@" var sql = $@"
SELECT TOP (1) * SELECT TOP (1) *
FROM [{_options.Schema}].[{nameof(CapDbContext.CapReceivedMessages)}] WITH (readpast) FROM [{_options.Schema}].[{nameof(CapDbContext.CapReceivedMessages)}] WITH (readpast)
WHERE StateName = '{StatusName.Enqueued}'"; WHERE StatusName = '{StatusName.Enqueued}'";
var connection = _context.GetDbConnection(); var connection = _context.GetDbConnection();
var message = (await connection.QueryAsync<CapReceivedMessage>(sql)).FirstOrDefault(); var message = (await connection.QueryAsync<CapReceivedMessage>(sql)).FirstOrDefault();
......
...@@ -55,14 +55,14 @@ namespace Microsoft.Extensions.DependencyInjection ...@@ -55,14 +55,14 @@ namespace Microsoft.Extensions.DependencyInjection
//Processors //Processors
services.AddTransient<PublishQueuer>(); services.AddTransient<PublishQueuer>();
services.AddTransient<SubscribeQueuer>(); services.AddTransient<SubscribeQueuer>();
services.AddTransient<IMessageProcessor, DefaultMessageProcessor>(); services.AddTransient<IDispatcher, DefaultDispatcher>();
//Executors //Executors
services.AddSingleton<IQueueExecutorFactory, QueueExecutorFactory>(); services.AddSingleton<IQueueExecutorFactory, QueueExecutorFactory>();
services.AddSingleton<IQueueExecutor, SubscibeQueueExecutor>(); services.AddSingleton<IQueueExecutor, SubscibeQueueExecutor>();
services.TryAddScoped<ICapPublisher, DefaultCapPublisher>(); // services.TryAddScoped<ICapPublisher, DefaultCapPublisher>();
return new CapBuilder(services); return new CapBuilder(services);
} }
......
...@@ -15,7 +15,7 @@ ...@@ -15,7 +15,7 @@
<ItemGroup> <ItemGroup>
<None Include="IQueueExecutor.Subscibe.cs" /> <None Include="IQueueExecutor.Subscibe.cs" />
<None Include="Processor\IProcessor.Message.Default.cs" /> <None Include="Processor\IDispatcher.Default.cs" />
</ItemGroup> </ItemGroup>
<ItemGroup> <ItemGroup>
......
...@@ -8,7 +8,7 @@ using Microsoft.Extensions.Options; ...@@ -8,7 +8,7 @@ using Microsoft.Extensions.Options;
namespace DotNetCore.CAP.Processor namespace DotNetCore.CAP.Processor
{ {
public class DefaultMessageProcessor : IMessageProcessor public class DefaultDispatcher : IDispatcher
{ {
private readonly IQueueExecutorFactory _queueExecutorFactory; private readonly IQueueExecutorFactory _queueExecutorFactory;
private readonly IServiceProvider _provider; private readonly IServiceProvider _provider;
...@@ -19,11 +19,11 @@ namespace DotNetCore.CAP.Processor ...@@ -19,11 +19,11 @@ namespace DotNetCore.CAP.Processor
internal static readonly AutoResetEvent PulseEvent = new AutoResetEvent(true); internal static readonly AutoResetEvent PulseEvent = new AutoResetEvent(true);
public DefaultMessageProcessor( public DefaultDispatcher(
IServiceProvider provider, IServiceProvider provider,
IQueueExecutorFactory queueExecutorFactory, IQueueExecutorFactory queueExecutorFactory,
IOptions<CapOptions> capOptions, IOptions<CapOptions> capOptions,
ILogger<DefaultMessageProcessor> logger) ILogger<DefaultDispatcher> logger)
{ {
_logger = logger; _logger = logger;
_queueExecutorFactory = queueExecutorFactory; _queueExecutorFactory = queueExecutorFactory;
......
...@@ -4,7 +4,7 @@ using System.Text; ...@@ -4,7 +4,7 @@ using System.Text;
namespace DotNetCore.CAP.Processor namespace DotNetCore.CAP.Processor
{ {
public interface IMessageProcessor : IProcessor public interface IDispatcher : IProcessor
{ {
bool Waiting { get; } bool Waiting { get; }
} }
......
...@@ -18,7 +18,7 @@ namespace DotNetCore.CAP.Processor ...@@ -18,7 +18,7 @@ namespace DotNetCore.CAP.Processor
private readonly CapOptions _options; private readonly CapOptions _options;
private IProcessor[] _processors; private IProcessor[] _processors;
private IList<IMessageProcessor> _messageProcessors; private IList<IDispatcher> _messageDispatchers;
private ProcessingContext _context; private ProcessingContext _context;
private Task _compositeTask; private Task _compositeTask;
private bool _disposed; private bool _disposed;
...@@ -34,6 +34,7 @@ namespace DotNetCore.CAP.Processor ...@@ -34,6 +34,7 @@ namespace DotNetCore.CAP.Processor
_provider = provider; _provider = provider;
_options = options.Value; _options = options.Value;
_cts = new CancellationTokenSource(); _cts = new CancellationTokenSource();
_messageDispatchers = new List<IDispatcher>();
} }
public void Start() public void Start()
...@@ -90,7 +91,7 @@ namespace DotNetCore.CAP.Processor ...@@ -90,7 +91,7 @@ namespace DotNetCore.CAP.Processor
private bool AllProcessorsWaiting() private bool AllProcessorsWaiting()
{ {
foreach (var processor in _messageProcessors) foreach (var processor in _messageDispatchers)
{ {
if (!processor.Waiting) if (!processor.Waiting)
{ {
...@@ -110,10 +111,10 @@ namespace DotNetCore.CAP.Processor ...@@ -110,10 +111,10 @@ namespace DotNetCore.CAP.Processor
var returnedProcessors = new List<IProcessor>(); var returnedProcessors = new List<IProcessor>();
for (int i = 0; i < processorCount; i++) for (int i = 0; i < processorCount; i++)
{ {
var messageProcessors = _provider.GetService<IMessageProcessor>(); var messageProcessors = _provider.GetService<IDispatcher>();
_messageProcessors.Add(messageProcessors); _messageDispatchers.Add(messageProcessors);
} }
returnedProcessors.AddRange(_messageProcessors); returnedProcessors.AddRange(_messageDispatchers);
returnedProcessors.Add(_provider.GetService<PublishQueuer>()); returnedProcessors.Add(_provider.GetService<PublishQueuer>());
returnedProcessors.Add(_provider.GetService<SubscribeQueuer>()); returnedProcessors.Add(_provider.GetService<SubscribeQueuer>());
......
...@@ -59,7 +59,7 @@ namespace DotNetCore.CAP.Processor ...@@ -59,7 +59,7 @@ namespace DotNetCore.CAP.Processor
context.ThrowIfStopping(); context.ThrowIfStopping();
DefaultMessageProcessor.PulseEvent.Set(); DefaultDispatcher.PulseEvent.Set();
await WaitHandleEx.WaitAnyAsync(PulseEvent, await WaitHandleEx.WaitAnyAsync(PulseEvent,
context.CancellationToken.WaitHandle, _pollingDelay); context.CancellationToken.WaitHandle, _pollingDelay);
......
...@@ -59,7 +59,7 @@ namespace DotNetCore.CAP.Processor ...@@ -59,7 +59,7 @@ namespace DotNetCore.CAP.Processor
context.ThrowIfStopping(); context.ThrowIfStopping();
DefaultMessageProcessor.PulseEvent.Set(); DefaultDispatcher.PulseEvent.Set();
await WaitHandleEx.WaitAnyAsync(PulseEvent, await WaitHandleEx.WaitAnyAsync(PulseEvent,
context.CancellationToken.WaitHandle, _pollingDelay); context.CancellationToken.WaitHandle, _pollingDelay);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment