Commit c61ae151 authored by yangxiaodong's avatar yangxiaodong

refactor storage

parent 20b90339
...@@ -19,7 +19,6 @@ namespace Microsoft.Extensions.DependencyInjection ...@@ -19,7 +19,6 @@ namespace Microsoft.Extensions.DependencyInjection
where TContext : DbContext where TContext : DbContext
{ {
builder.Services.AddScoped<ICapMessageStore, CapMessageStore<TContext>>(); builder.Services.AddScoped<ICapMessageStore, CapMessageStore<TContext>>();
builder.Services.AddScoped<IStorage, EFStorage>(); builder.Services.AddScoped<IStorage, EFStorage>();
builder.Services.AddScoped<IStorageConnection, EFStorageConnection>(); builder.Services.AddScoped<IStorageConnection, EFStorageConnection>();
...@@ -27,15 +26,27 @@ namespace Microsoft.Extensions.DependencyInjection ...@@ -27,15 +26,27 @@ namespace Microsoft.Extensions.DependencyInjection
} }
public static CapBuilder AddEntityFrameworkStores<TContext>(this CapBuilder builder, Action<EFOptions> options) public static CapBuilder AddEntityFrameworkStores<TContext>(this CapBuilder builder, Action<EFOptions> actionOptions)
where TContext : DbContext where TContext : DbContext
{ {
builder.Services.AddScoped<ICapMessageStore, CapMessageStore<TContext>>(); builder.Services.AddScoped<ICapMessageStore, CapMessageStore<TContext>>();
builder.Services.AddSingleton<IStorage, EFStorage>();
builder.Services.AddScoped<IStorage, EFStorage>();
builder.Services.AddScoped<IStorageConnection, EFStorageConnection>(); builder.Services.AddScoped<IStorageConnection, EFStorageConnection>();
builder.Services.Configure(options); builder.Services.Configure(actionOptions);
var efOptions = new EFOptions();
actionOptions(efOptions);
builder.Services.AddDbContext<CapDbContext>(options =>
{
options.UseSqlServer(efOptions.ConnectionString, sqlOpts =>
{
sqlOpts.MigrationsHistoryTable(
efOptions.MigrationsHistoryTableName,
efOptions.MigrationsHistoryTableSchema ?? efOptions.Schema);
});
});
return builder; return builder;
} }
......
...@@ -7,11 +7,29 @@ namespace DotNetCore.CAP.EntityFrameworkCore ...@@ -7,11 +7,29 @@ namespace DotNetCore.CAP.EntityFrameworkCore
public class EFOptions public class EFOptions
{ {
public const string DefaultSchema = "cap"; public const string DefaultSchema = "cap";
public const string DefaultMigrationsHistoryTableName = "__EFMigrationsHistory";
/// <summary>
/// Gets or sets the database's connection string that will be used to store database entities.
/// </summary>
public string ConnectionString { get; set; }
/// <summary> /// <summary>
/// Gets or sets the schema to use when creating database objects. /// Gets or sets the schema to use when creating database objects.
/// Default is <see cref="DefaultSchema"/>. /// Default is <see cref="DefaultSchema"/>.
/// </summary> /// </summary>
public string Schema { get; set; } = DefaultSchema; public string Schema { get; set; } = DefaultSchema;
/// <summary>
/// Gets or sets the migrations history table's schema.
/// If this is null, <see cref="Schema"/> will be used.
/// </summary>
public string MigrationsHistoryTableSchema { get; set; }
/// <summary>
/// Gets or sets the migrations history table's name.
/// Default is <see cref="DefaultMigrationsHistoryTableName"/>.
/// </summary>
public string MigrationsHistoryTableName { get; set; } = DefaultMigrationsHistoryTableName;
} }
} }
using System; using System;
using System.Threading.Tasks; using System.Threading.Tasks;
using DotNetCore.CAP.Infrastructure;
using DotNetCore.CAP.Models; using DotNetCore.CAP.Models;
using Microsoft.EntityFrameworkCore; using Microsoft.EntityFrameworkCore;
...@@ -25,8 +24,6 @@ namespace DotNetCore.CAP.EntityFrameworkCore ...@@ -25,8 +24,6 @@ namespace DotNetCore.CAP.EntityFrameworkCore
private DbSet<CapSentMessage> SentMessages => Context.Set<CapSentMessage>(); private DbSet<CapSentMessage> SentMessages => Context.Set<CapSentMessage>();
private DbSet<CapReceivedMessage> ReceivedMessages => Context.Set<CapReceivedMessage>();
/// <summary> /// <summary>
/// Creates the specified <paramref name="message"/> in the cap message store. /// Creates the specified <paramref name="message"/> in the cap message store.
/// </summary> /// </summary>
...@@ -39,158 +36,5 @@ namespace DotNetCore.CAP.EntityFrameworkCore ...@@ -39,158 +36,5 @@ namespace DotNetCore.CAP.EntityFrameworkCore
await Context.SaveChangesAsync(); await Context.SaveChangesAsync();
return OperateResult.Success; return OperateResult.Success;
} }
public async Task<OperateResult> ChangeSentMessageStateAsync(CapSentMessage message, string status,
bool autoSaveChanges = true)
{
Context.Attach(message);
message.LastRun = DateTime.Now;
message.StatusName = status;
try
{
if (autoSaveChanges)
{
await Context.SaveChangesAsync();
}
}
catch (DbUpdateConcurrencyException ex)
{
return OperateResult.Failed(
new OperateError()
{
Code = "DbUpdateConcurrencyException",
Description = ex.Message
});
}
return OperateResult.Success;
}
/// <summary>
/// First Enqueued Message.
/// </summary>
public async Task<CapSentMessage> GetNextSentMessageToBeEnqueuedAsync()
{
return await SentMessages.FirstOrDefaultAsync(x => x.StatusName == StatusName.Enqueued);
}
/// <summary>
/// Updates a message in a store as an asynchronous operation.
/// </summary>
/// <param name="message">The message to update in the store.</param>
public async Task<OperateResult> UpdateSentMessageAsync(CapSentMessage message)
{
if (message == null) throw new ArgumentNullException(nameof(message));
Context.Attach(message);
message.LastRun = DateTime.Now;
Context.Update(message);
try
{
await Context.SaveChangesAsync();
return OperateResult.Success;
}
catch (DbUpdateConcurrencyException ex)
{
return OperateResult.Failed(new OperateError()
{
Code = "DbUpdateConcurrencyException",
Description = ex.Message
});
}
}
/// <summary>
/// Deletes the specified <paramref name="message"/> from the consistency message store.
/// </summary>
/// <param name="message">The message to delete.</param>
public async Task<OperateResult> RemoveSentMessageAsync(CapSentMessage message)
{
if (message == null) throw new ArgumentNullException(nameof(message));
Context.Remove(message);
try
{
await Context.SaveChangesAsync();
return OperateResult.Success;
}
catch (DbUpdateConcurrencyException ex)
{
return OperateResult.Failed(new OperateError()
{
Code = "DbUpdateConcurrencyException",
Description = ex.Message
});
}
}
/// <summary>
/// Creates the specified <paramref name="message"/> in the consistency message store.
/// </summary>
/// <param name="message">The message to create.</param>
public async Task<OperateResult> StoreReceivedMessageAsync(CapReceivedMessage message)
{
if (message == null) throw new ArgumentNullException(nameof(message));
Context.Add(message);
await Context.SaveChangesAsync();
return OperateResult.Success;
}
public async Task<OperateResult> ChangeReceivedMessageStateAsync(CapReceivedMessage message, string status,
bool autoSaveChanges = true)
{
Context.Attach(message);
message.LastRun = DateTime.Now;
message.StatusName = status;
try
{
if (autoSaveChanges)
{
await Context.SaveChangesAsync();
}
}
catch (DbUpdateConcurrencyException ex)
{
return OperateResult.Failed(new OperateError()
{
Code = "DbUpdateConcurrencyException",
Description = ex.Message
});
}
return OperateResult.Success;
}
public async Task<CapReceivedMessage> GetNextReceivedMessageToBeExcuted()
{
return await ReceivedMessages.FirstOrDefaultAsync(x => x.StatusName == StatusName.Enqueued);
}
/// <summary>
/// Updates the specified <paramref name="message"/> in the message store.
/// </summary>
/// <param name="message">The message to update.</param>
public async Task<OperateResult> UpdateReceivedMessageAsync(CapReceivedMessage message)
{
if (message == null) throw new ArgumentNullException(nameof(message));
Context.Attach(message);
message.LastRun = DateTime.Now;
Context.Update(message);
try
{
await Context.SaveChangesAsync();
return OperateResult.Success;
}
catch (DbUpdateConcurrencyException ex)
{
return OperateResult.Failed(new OperateError()
{
Code = "DbUpdateConcurrencyException",
Description = ex.Message
});
}
}
} }
} }
\ No newline at end of file
...@@ -16,7 +16,11 @@ ...@@ -16,7 +16,11 @@
<ItemGroup> <ItemGroup>
<PackageReference Include="Microsoft.EntityFrameworkCore" Version="1.1.2" /> <PackageReference Include="Microsoft.EntityFrameworkCore" Version="1.1.2" />
<PackageReference Include="Microsoft.EntityFrameworkCore.Relational" Version="1.1.2" /> <PackageReference Include="Microsoft.EntityFrameworkCore.Relational" Version="1.1.2" />
<PackageReference Include="Microsoft.EntityFrameworkCore.SqlServer" Version="1.1.2" />
<PackageReference Include="System.ComponentModel.TypeConverter" Version="4.3.0" /> <PackageReference Include="System.ComponentModel.TypeConverter" Version="4.3.0" />
<PackageReference Include="Microsoft.EntityFrameworkCore.Design" Version="1.1.2" />
<DotNetCliToolReference Include="Microsoft.EntityFrameworkCore.Tools.DotNet" Version="1.0.1" />
</ItemGroup> </ItemGroup>
<ItemGroup> <ItemGroup>
......
...@@ -29,17 +29,7 @@ namespace DotNetCore.CAP.EntityFrameworkCore ...@@ -29,17 +29,7 @@ namespace DotNetCore.CAP.EntityFrameworkCore
public IStorageTransaction CreateTransaction() public IStorageTransaction CreateTransaction()
{ {
return new EFStorageTransaction(this); return new EFStorageTransaction(this);
} }
public Task StoreSentMessageAsync(CapSentMessage message)
{
if (message == null) throw new ArgumentNullException(nameof(message));
message.LastRun = NormalizeDateTime(message.LastRun);
_context.Add(message);
return _context.SaveChangesAsync();
}
public Task<CapSentMessage> GetSentMessageAsync(string id) public Task<CapSentMessage> GetSentMessageAsync(string id)
{ {
......
...@@ -5,6 +5,8 @@ using System.Threading; ...@@ -5,6 +5,8 @@ using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
using DotNetCore.CAP.Infrastructure; using DotNetCore.CAP.Infrastructure;
using DotNetCore.CAP.Job; using DotNetCore.CAP.Job;
using DotNetCore.CAP.Job.States;
using DotNetCore.CAP.Models;
using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options; using Microsoft.Extensions.Options;
...@@ -16,23 +18,26 @@ namespace DotNetCore.CAP.RabbitMQ ...@@ -16,23 +18,26 @@ namespace DotNetCore.CAP.RabbitMQ
{ {
private readonly RabbitMQOptions _rabbitMqOptions; private readonly RabbitMQOptions _rabbitMqOptions;
private readonly CancellationTokenSource _cts; private readonly CancellationTokenSource _cts;
private readonly IStateChanger _stateChanger;
private readonly IServiceProvider _provider; private readonly IServiceProvider _provider;
private readonly ILogger _logger; private readonly ILogger _logger;
private readonly TimeSpan _pollingDelay; private readonly TimeSpan _pollingDelay;
internal static readonly AutoResetEvent PulseEvent = new AutoResetEvent(true);
public RabbitJobProcessor( public RabbitJobProcessor(
IOptions<CapOptions> capOptions, IOptions<CapOptions> capOptions,
IOptions<RabbitMQOptions> rabbitMQOptions, IOptions<RabbitMQOptions> rabbitMQOptions,
ILogger<RabbitJobProcessor> logger, ILogger<RabbitJobProcessor> logger,
IStateChanger stateChanger,
IServiceProvider provider) IServiceProvider provider)
{ {
_logger = logger; _logger = logger;
_rabbitMqOptions = rabbitMQOptions.Value; _rabbitMqOptions = rabbitMQOptions.Value;
_provider = provider; _provider = provider;
_stateChanger = stateChanger;
_cts = new CancellationTokenSource(); _cts = new CancellationTokenSource();
var capOptions1 = capOptions.Value; var capOptions1 = capOptions.Value;
_pollingDelay = TimeSpan.FromSeconds(capOptions1.PollingDelay); _pollingDelay = TimeSpan.FromSeconds(capOptions1.PollingDelay);
} }
...@@ -62,7 +67,7 @@ namespace DotNetCore.CAP.RabbitMQ ...@@ -62,7 +67,7 @@ namespace DotNetCore.CAP.RabbitMQ
var token = GetTokenToWaitOn(context); var token = GetTokenToWaitOn(context);
} }
await WaitHandleEx.WaitAnyAsync(WaitHandleEx.PulseEvent, await WaitHandleEx.WaitAnyAsync(PulseEvent,
context.CancellationToken.WaitHandle, _pollingDelay); context.CancellationToken.WaitHandle, _pollingDelay);
} }
finally finally
...@@ -78,39 +83,93 @@ namespace DotNetCore.CAP.RabbitMQ ...@@ -78,39 +83,93 @@ namespace DotNetCore.CAP.RabbitMQ
private async Task<bool> Step(ProcessingContext context) private async Task<bool> Step(ProcessingContext context)
{ {
var fetched = default(IFetchedMessage);
using (var scopedContext = context.CreateScope()) using (var scopedContext = context.CreateScope())
{ {
var provider = scopedContext.Provider; var provider = scopedContext.Provider;
var messageStore = provider.GetRequiredService<ICapMessageStore>(); var messageStore = provider.GetRequiredService<ICapMessageStore>();
var message = await messageStore.GetNextSentMessageToBeEnqueuedAsync(); var connection = provider.GetRequiredService<IStorageConnection>();
try
if ((fetched = await connection.FetchNextSentMessageAsync()) != null)
{ {
if (message != null) using (fetched)
{ {
var sp = Stopwatch.StartNew(); var message = await connection.GetSentMessageAsync(fetched.MessageId);
message.StatusName = StatusName.Processing; try
await messageStore.UpdateSentMessageAsync(message); {
var sp = Stopwatch.StartNew();
await _stateChanger.ChangeStateAsync(message, new ProcessingState(), connection);
if (message.Retries > 0)
{
_logger.JobRetrying(message.Retries);
}
var result = ExecuteJob(message.KeyName, message.Content);
sp.Stop();
var newState = default(IState);
if (!result.Succeeded)
{
var shouldRetry = await UpdateJobForRetryAsync(message, connection);
if (shouldRetry)
{
newState = new ScheduledState();
_logger.JobFailedWillRetry(result.Exception);
}
else
{
newState = new FailedState();
_logger.JobFailed(result.Exception);
}
}
else
{
newState = new SucceededState();
}
await _stateChanger.ChangeStateAsync(message, newState, connection);
fetched.RemoveFromQueue();
if (result.Succeeded)
{
_logger.JobExecuted(sp.Elapsed.TotalSeconds);
}
}
catch (Exception ex)
{
_logger.ExceptionOccuredWhileExecutingJob(message?.KeyName, ex);
return false;
}
ExecuteJob(message.KeyName, message.Content); }
}
}
return fetched != null;
}
sp.Stop(); private async Task<bool> UpdateJobForRetryAsync(CapSentMessage message, IStorageConnection connection)
{
var retryBehavior = RetryBehavior.DefaultRetry;
message.StatusName = StatusName.Succeeded; var now = DateTime.UtcNow;
await messageStore.UpdateSentMessageAsync(message); var retries = ++message.Retries;
if (retries >= retryBehavior.RetryCount)
{
return false;
}
_logger.JobExecuted(sp.Elapsed.TotalSeconds); var due = message.Added.AddSeconds(retryBehavior.RetryIn(retries));
} message.LastRun = due;
} using (var transaction = connection.CreateTransaction())
catch (Exception ex) {
{ transaction.UpdateMessage(message);
_logger.ExceptionOccuredWhileExecutingJob(message?.KeyName, ex); await transaction.CommitAsync();
return false;
}
} }
return true; return true;
} }
private void ExecuteJob(string topic, string content) private OperateResult ExecuteJob(string topic, string content)
{ {
var factory = new ConnectionFactory() var factory = new ConnectionFactory()
{ {
...@@ -124,17 +183,26 @@ namespace DotNetCore.CAP.RabbitMQ ...@@ -124,17 +183,26 @@ namespace DotNetCore.CAP.RabbitMQ
SocketWriteTimeout = _rabbitMqOptions.SocketWriteTimeout SocketWriteTimeout = _rabbitMqOptions.SocketWriteTimeout
}; };
using (var connection = factory.CreateConnection()) try
using (var channel = connection.CreateModel())
{ {
var body = Encoding.UTF8.GetBytes(content); using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
var body = Encoding.UTF8.GetBytes(content);
channel.ExchangeDeclare(_rabbitMqOptions.TopicExchangeName, _rabbitMqOptions.EXCHANGE_TYPE); channel.ExchangeDeclare(_rabbitMqOptions.TopicExchangeName, _rabbitMqOptions.EXCHANGE_TYPE);
channel.BasicPublish(exchange: _rabbitMqOptions.TopicExchangeName, channel.BasicPublish(exchange: _rabbitMqOptions.TopicExchangeName,
routingKey: topic, routingKey: topic,
basicProperties: null, basicProperties: null,
body: body); body: body);
}
return OperateResult.Success;
}
catch (Exception ex)
{
return OperateResult.Failed(ex, new OperateError() { Code = ex.HResult.ToString(), Description = ex.Message });
} }
} }
} }
} }
\ No newline at end of file
...@@ -14,62 +14,5 @@ namespace DotNetCore.CAP ...@@ -14,62 +14,5 @@ namespace DotNetCore.CAP
/// </summary> /// </summary>
/// <param name="message">The message to create in the store.</param> /// <param name="message">The message to create in the store.</param>
Task<OperateResult> StoreSentMessageAsync(CapSentMessage message); Task<OperateResult> StoreSentMessageAsync(CapSentMessage message);
/// <summary>
/// Change <see cref="CapSentMessage"/> model status name.
/// </summary>
/// <param name="message">The type of <see cref="CapSentMessage"/>.</param>
/// <param name="statusName">The status name.</param>
/// <param name="autoSaveChanges">auto save dbcontext changes.</param>
/// <returns></returns>
Task<OperateResult> ChangeSentMessageStateAsync(CapSentMessage message, string statusName,
bool autoSaveChanges = true);
/// <summary>
/// Fetches the next message to be executed.
/// </summary>
/// <returns></returns>
Task<CapSentMessage> GetNextSentMessageToBeEnqueuedAsync();
/// <summary>
/// Updates a message in a store as an asynchronous operation.
/// </summary>
/// <param name="message">The message to update in the store.</param>
Task<OperateResult> UpdateSentMessageAsync(CapSentMessage message);
/// <summary>
/// Deletes a message from the store as an asynchronous operation.
/// </summary>
/// <param name="message">The message to delete in the store.</param>
Task<OperateResult> RemoveSentMessageAsync(CapSentMessage message);
/// <summary>
/// Creates a new message in a store as an asynchronous operation.
/// </summary>
/// <param name="message"></param>
/// <returns></returns>
Task<OperateResult> StoreReceivedMessageAsync(CapReceivedMessage message);
/// <summary>
/// Change <see cref="CapReceivedMessage"/> model status name.
/// </summary>
/// <param name="message">The type of <see cref="CapReceivedMessage"/>.</param>
/// <param name="statusName">The status name.</param>
/// <param name="autoSaveChanges">auto save dbcontext changes.</param>
/// <returns></returns>
Task<OperateResult> ChangeReceivedMessageStateAsync(CapReceivedMessage message, string statusName,
bool autoSaveChanges = true);
/// <summary>
/// Fetches the next message to be executed.
/// </summary>
Task<CapReceivedMessage> GetNextReceivedMessageToBeExcuted();
/// <summary>
/// Updates a message in a store as an asynchronous operation.
/// </summary>
/// <param name="message">The message to update in the store.</param>
Task<OperateResult> UpdateReceivedMessageAsync(CapReceivedMessage message);
} }
} }
\ No newline at end of file
...@@ -114,7 +114,7 @@ namespace DotNetCore.CAP ...@@ -114,7 +114,7 @@ namespace DotNetCore.CAP
private CapReceivedMessage StoreMessage(IServiceScope serviceScope, MessageContext messageContext) private CapReceivedMessage StoreMessage(IServiceScope serviceScope, MessageContext messageContext)
{ {
var provider = serviceScope.ServiceProvider; var provider = serviceScope.ServiceProvider;
var messageStore = provider.GetRequiredService<ICapMessageStore>(); var messageStore = provider.GetRequiredService<IStorageConnection>();
var receivedMessage = new CapReceivedMessage(messageContext) var receivedMessage = new CapReceivedMessage(messageContext)
{ {
StatusName = StatusName.Enqueued, StatusName = StatusName.Enqueued,
...@@ -126,13 +126,17 @@ namespace DotNetCore.CAP ...@@ -126,13 +126,17 @@ namespace DotNetCore.CAP
private void ProcessMessage(IServiceScope serviceScope, CapReceivedMessage receivedMessage) private void ProcessMessage(IServiceScope serviceScope, CapReceivedMessage receivedMessage)
{ {
var provider = serviceScope.ServiceProvider; var provider = serviceScope.ServiceProvider;
var messageStore = provider.GetRequiredService<ICapMessageStore>(); var messageStore = provider.GetRequiredService<IStorageConnection>();
try try
{ {
var executeDescriptorGroup = _selector.GetTopicExector(receivedMessage.KeyName); var executeDescriptorGroup = _selector.GetTopicExector(receivedMessage.KeyName);
if (executeDescriptorGroup.ContainsKey(receivedMessage.Group)) if (executeDescriptorGroup.ContainsKey(receivedMessage.Group))
{ {
messageStore.FetchNextReceivedMessageAsync
messageStore.ChangeReceivedMessageStateAsync(receivedMessage, StatusName.Processing).Wait(); messageStore.ChangeReceivedMessageStateAsync(receivedMessage, StatusName.Processing).Wait();
// If there are multiple consumers in the same group, we will take the first // If there are multiple consumers in the same group, we will take the first
......
...@@ -10,13 +10,7 @@ namespace DotNetCore.CAP ...@@ -10,13 +10,7 @@ namespace DotNetCore.CAP
public interface IStorageConnection : IDisposable public interface IStorageConnection : IDisposable
{ {
//Sent messages //Sent messages
/// <summary>
/// Stores the message.
/// </summary>
/// <param name="message">The message to store.</param>
Task StoreSentMessageAsync(CapSentMessage message);
/// <summary> /// <summary>
/// Returns the message with the given id. /// Returns the message with the given id.
...@@ -56,7 +50,7 @@ namespace DotNetCore.CAP ...@@ -56,7 +50,7 @@ namespace DotNetCore.CAP
/// <summary> /// <summary>
/// Returns the next message to be enqueued. /// Returns the next message to be enqueued.
/// </summary> /// </summary>
Task<CapSentMessage> GetNextReceviedMessageToBeEnqueuedAsync(); Task<CapReceivedMessage> GetNextReceviedMessageToBeEnqueuedAsync();
//----------------------------------------- //-----------------------------------------
......
using System;
using System.Threading;
using System.Threading.Tasks;
using DotNetCore.CAP.Job.States;
using DotNetCore.CAP.Models;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
namespace DotNetCore.CAP.Job
{
public class JobQueuer : IJobProcessor
{
private ILogger _logger;
private JobsOptions _options;
private IStateChanger _stateChanger;
private IServiceProvider _provider;
internal static readonly AutoResetEvent PulseEvent = new AutoResetEvent(true);
private TimeSpan _pollingDelay;
public JobQueuer(
ILogger<JobQueuer> logger,
JobsOptions options,
IStateChanger stateChanger,
IServiceProvider provider)
{
_logger = logger;
_options = options;
_stateChanger = stateChanger;
_provider = provider;
_pollingDelay = TimeSpan.FromSeconds(_options.PollingDelay);
}
public async Task ProcessAsync(ProcessingContext context)
{
using (var scope = _provider.CreateScope())
{
CapSentMessage sentMessage;
CapReceivedMessage receivedMessage;
var provider = scope.ServiceProvider;
var connection = provider.GetRequiredService<IStorageConnection>();
while (
!context.IsStopping &&
(sentMessage = await connection.GetNextSentMessageToBeEnqueuedAsync()) != null)
{
var state = new EnqueuedState();
using (var transaction = connection.CreateTransaction())
{
_stateChanger.ChangeState(sentMessage, state, transaction);
await transaction.CommitAsync();
}
}
}
context.ThrowIfStopping();
DelayedJobProcessor.PulseEvent.Set();
await WaitHandleEx.WaitAnyAsync(PulseEvent, context.CancellationToken.WaitHandle, _pollingDelay);
}
}
}
using System;
using DotNetCore.CAP.Models;
namespace DotNetCore.CAP.Job.States
{
public class EnqueuedState : IState
{
public const string StateName = "Enqueued";
public TimeSpan? ExpiresAfter => null;
public string Name => StateName;
public void Apply(CapSentMessage message, IStorageTransaction transaction)
{
transaction.EnqueueMessage(message);
}
public void Apply(CapReceivedMessage message, IStorageTransaction transaction)
{
transaction.EnqueueMessage(message);
}
}
}
\ No newline at end of file
using System;
using DotNetCore.CAP.Models;
namespace DotNetCore.CAP.Job.States
{
public class FailedState : IState
{
public const string StateName = "Failed";
public TimeSpan? ExpiresAfter => TimeSpan.FromDays(15);
public string Name => StateName;
public void Apply(CapSentMessage message, IStorageTransaction transaction)
{
}
public void Apply(CapReceivedMessage message, IStorageTransaction transaction)
{
}
}
}
\ No newline at end of file
using System;
using DotNetCore.CAP.Models;
namespace DotNetCore.CAP.Job.States
{
public class ProcessingState : IState
{
public const string StateName = "Processing";
public TimeSpan? ExpiresAfter => null;
public string Name => StateName;
public void Apply(CapSentMessage message, IStorageTransaction transaction)
{
}
public void Apply(CapReceivedMessage message, IStorageTransaction transaction)
{
}
}
}
\ No newline at end of file
using System;
using DotNetCore.CAP.Models;
namespace DotNetCore.CAP.Job.States
{
public class ScheduledState : IState
{
public const string StateName = "Scheduled";
public TimeSpan? ExpiresAfter => null;
public string Name => StateName;
public void Apply(CapSentMessage message, IStorageTransaction transaction)
{
}
public void Apply(CapReceivedMessage message, IStorageTransaction transaction)
{
}
}
}
\ No newline at end of file
using System;
using DotNetCore.CAP.Models;
namespace DotNetCore.CAP.Job.States
{
public class SucceededState : IState
{
public const string StateName = "Succeeded";
public TimeSpan? ExpiresAfter => TimeSpan.FromHours(1);
public string Name => StateName;
public void Apply(CapSentMessage message, IStorageTransaction transaction)
{
}
public void Apply(CapReceivedMessage message, IStorageTransaction transaction)
{
}
}
}
\ No newline at end of file
using System;
using DotNetCore.CAP.Models;
namespace DotNetCore.CAP.Job.States
{
public interface IState
{
TimeSpan? ExpiresAfter { get; }
string Name { get; }
void Apply(CapSentMessage message, IStorageTransaction transaction);
void Apply(CapReceivedMessage message, IStorageTransaction transaction);
}
}
\ No newline at end of file
using DotNetCore.CAP.Models;
namespace DotNetCore.CAP.Job.States
{
public class StateChanger : IStateChanger
{
public void ChangeState(CapSentMessage message, IState state, IStorageTransaction transaction)
{
//var now = DateTime.UtcNow;
//if (state.ExpiresAfter != null)
//{
// message.ExpiresAt = now.Add(state.ExpiresAfter.Value);
//}
//else
//{
// message.ExpiresAt = null;
//}
message.StatusName = state.Name;
state.Apply(message, transaction);
transaction.UpdateMessage(message);
}
public void ChangeState(CapReceivedMessage message, IState state, IStorageTransaction transaction)
{
//var now = DateTime.UtcNow;
//if (state.ExpiresAfter != null)
//{
// job.ExpiresAt = now.Add(state.ExpiresAfter.Value);
//}
//else
//{
// job.ExpiresAt = null;
//}
message.StatusName = state.Name;
state.Apply(message, transaction);
transaction.UpdateMessage(message);
}
}
}
\ No newline at end of file
using System.Threading.Tasks;
using DotNetCore.CAP.Models;
namespace DotNetCore.CAP.Job.States
{
public static class StateChangerExtensions
{
public static async Task ChangeStateAsync(
this IStateChanger @this, CapSentMessage message, IState state, IStorageConnection connection)
{
using (var transaction = connection.CreateTransaction())
{
@this.ChangeState(message, state, transaction);
await transaction.CommitAsync();
}
}
public static async Task ChangeStateAsync(
this IStateChanger @this, CapReceivedMessage message, IState state, IStorageConnection connection)
{
using (var transaction = connection.CreateTransaction())
{
@this.ChangeState(message, state, transaction);
await transaction.CommitAsync();
}
}
}
}
\ No newline at end of file
using DotNetCore.CAP.Models;
namespace DotNetCore.CAP.Job.States
{
public interface IStateChanger
{
void ChangeState(CapSentMessage message, IState state, IStorageTransaction transaction);
void ChangeState(CapReceivedMessage message, IState state, IStorageTransaction transaction);
}
}
\ No newline at end of file
using System.Collections.Generic; using System;
using System.Collections.Generic;
using System.Linq; using System.Linq;
namespace DotNetCore.CAP namespace DotNetCore.CAP
...@@ -18,6 +19,8 @@ namespace DotNetCore.CAP ...@@ -18,6 +19,8 @@ namespace DotNetCore.CAP
/// </summary> /// </summary>
public bool Succeeded { get; set; } public bool Succeeded { get; set; }
public Exception Exception { get; set; }
/// <summary> /// <summary>
/// An <see cref="IEnumerable{T}"/> of <see cref="OperateError"/>s containing an errors /// An <see cref="IEnumerable{T}"/> of <see cref="OperateError"/>s containing an errors
/// that occurred during the operation. /// that occurred during the operation.
...@@ -46,6 +49,17 @@ namespace DotNetCore.CAP ...@@ -46,6 +49,17 @@ namespace DotNetCore.CAP
return result; return result;
} }
public static OperateResult Failed(Exception ex, params OperateError[] errors)
{
var result = new OperateResult { Succeeded = false };
result.Exception = ex;
if (errors != null)
{
result._errors.AddRange(errors);
}
return result;
}
/// <summary> /// <summary>
/// Converts the value of the current <see cref="OperateResult"/> object to its equivalent string representation. /// Converts the value of the current <see cref="OperateResult"/> object to its equivalent string representation.
/// </summary> /// </summary>
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment