Commit 3f50de25 authored by Savorboard's avatar Savorboard

Refactoring inmemory storage implementation for version 3.0

parent 004ed569
......@@ -2,7 +2,7 @@
// Licensed under the MIT License. See License.txt in the project root for license information.
using DotNetCore.CAP.InMemoryStorage;
using DotNetCore.CAP.Processor;
using DotNetCore.CAP.Persistence;
using Microsoft.Extensions.DependencyInjection;
// ReSharper disable once CheckNamespace
......@@ -13,14 +13,10 @@ namespace DotNetCore.CAP
public void AddServices(IServiceCollection services)
{
services.AddSingleton<CapStorageMarkerService>();
services.AddSingleton<IStorage, InMemoryStorage.InMemoryStorage>();
services.AddSingleton<IStorageConnection, InMemoryStorageConnection>();
services.AddSingleton<ICapPublisher, InMemoryPublisher>();
services.AddSingleton<ICallbackPublisher>(x => (InMemoryPublisher)x.GetService<ICapPublisher>());
services.AddSingleton<ICollectProcessor, InMemoryCollectProcessor>();
services.AddTransient<CapTransactionBase, InMemoryCapTransaction>();
services.AddSingleton<IDataStorage, InMemoryStorage.InMemoryStorage>();
services.AddSingleton<IStorageInitializer, InMemoryStorageInitializer>();
}
}
}
\ No newline at end of file
......@@ -11,10 +11,6 @@
<NoWarn>1701;1702;1705;CS1591</NoWarn>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="AsyncEnumerator" Version="3.0.0-beta1" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\DotNetCore.CAP\DotNetCore.CAP.csproj" />
</ItemGroup>
......
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using System.Threading;
using System.Threading.Tasks;
using DotNetCore.CAP.Abstractions;
using DotNetCore.CAP.Messages;
using Microsoft.Extensions.DependencyInjection;
namespace DotNetCore.CAP.InMemoryStorage
{
public class InMemoryPublisher : CapPublisherBase, ICallbackPublisher
{
public InMemoryPublisher(IServiceProvider provider) : base(provider)
{
}
public async Task PublishCallbackAsync(CapPublishedMessage message)
{
await PublishAsyncInternal(message);
}
protected override Task ExecuteAsync(CapPublishedMessage message, ICapTransaction transaction,
CancellationToken cancel = default(CancellationToken))
{
var connection = (InMemoryStorageConnection)ServiceProvider.GetService<IStorageConnection>();
connection.PublishedMessages.Add(message);
return Task.CompletedTask;
}
}
}
\ No newline at end of file
......@@ -2,9 +2,13 @@
// Licensed under the MIT License. See License.txt in the project root for license information.
// ReSharper disable once CheckNamespace
namespace DotNetCore.CAP
using System.Threading;
using System.Threading.Tasks;
namespace DotNetCore.CAP.InMemoryStorage
{
public class InMemoryCapTransaction : CapTransactionBase
internal class InMemoryCapTransaction : CapTransactionBase
{
public InMemoryCapTransaction(IDispatcher dispatcher) : base(dispatcher)
{
......@@ -15,11 +19,21 @@ namespace DotNetCore.CAP
Flush();
}
public override Task CommitAsync(CancellationToken cancellationToken = default)
{
return Task.CompletedTask;
}
public override void Rollback()
{
//Ignore
}
public override Task RollbackAsync(CancellationToken cancellationToken = default)
{
return Task.CompletedTask;
}
public override void Dispose()
{
}
......
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using System.Threading.Tasks;
using DotNetCore.CAP.Processor;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.DependencyInjection;
namespace DotNetCore.CAP.InMemoryStorage
{
internal class InMemoryCollectProcessor : ICollectProcessor
{
private readonly ILogger _logger;
private readonly TimeSpan _waitingInterval = TimeSpan.FromMinutes(5);
public InMemoryCollectProcessor(ILogger<InMemoryCollectProcessor> logger)
{
_logger = logger;
}
public async Task ProcessAsync(ProcessingContext context)
{
_logger.LogDebug($"Collecting expired data from memory list.");
var connection = (InMemoryStorageConnection)context.Provider.GetService<IStorageConnection>();
connection.PublishedMessages.RemoveAll(x => x.ExpiresAt < DateTime.Now);
connection.ReceivedMessages.RemoveAll(x => x.ExpiresAt < DateTime.Now);
await context.WaitAsync(_waitingInterval);
}
}
}
\ No newline at end of file
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using DotNetCore.CAP.Internal;
using DotNetCore.CAP.Messages;
using DotNetCore.CAP.Monitoring;
using DotNetCore.CAP.Persistence;
using DotNetCore.CAP.Serialization;
using Microsoft.Extensions.Options;
namespace DotNetCore.CAP.InMemoryStorage
{
internal class InMemoryStorage : IDataStorage
{
private readonly IOptions<CapOptions> _capOptions;
public InMemoryStorage(IOptions<CapOptions> capOptions)
{
_capOptions = capOptions;
}
public static IList<MemoryMessage> PublishedMessages { get; } = new List<MemoryMessage>();
public static IList<MemoryMessage> ReceivedMessages { get; } = new List<MemoryMessage>();
public Task ChangePublishStateAsync(MediumMessage message, StatusName state)
{
PublishedMessages.First(x => x.DbId == message.DbId).StatusName = state;
return Task.CompletedTask;
}
public Task ChangeReceiveStateAsync(MediumMessage message, StatusName state)
{
ReceivedMessages.First(x => x.DbId == message.DbId).StatusName = state;
return Task.CompletedTask;
}
public Task<MediumMessage> StoreMessageAsync(string name, Message content, object dbTransaction = null,
CancellationToken cancellationToken = default)
{
var message = new MediumMessage
{
DbId = content.GetId(),
Origin = content,
Content = StringSerializer.Serialize(content),
Added = DateTime.Now,
ExpiresAt = null,
Retries = 0
};
PublishedMessages.Add(new MemoryMessage()
{
DbId = message.DbId,
Name = name,
Content = message.Content,
Retries = message.Retries,
Added = message.Added,
ExpiresAt = message.ExpiresAt,
StatusName = StatusName.Scheduled
});
return Task.FromResult(message);
}
public Task StoreReceivedExceptionMessageAsync(string name, string group, string content)
{
ReceivedMessages.Add(new MemoryMessage
{
DbId = SnowflakeId.Default().NextId().ToString(),
Group = group,
Name = name,
Content = content,
Retries = _capOptions.Value.FailedRetryCount,
Added = DateTime.Now,
ExpiresAt = DateTime.Now.AddDays(15),
StatusName = StatusName.Failed
});
return Task.CompletedTask;
}
public Task<MediumMessage> StoreReceivedMessageAsync(string name, string @group, Message message)
{
var mdMessage = new MediumMessage
{
DbId = SnowflakeId.Default().NextId().ToString(),
Origin = message,
Added = DateTime.Now,
ExpiresAt = null,
Retries = 0
};
ReceivedMessages.Add(new MemoryMessage
{
DbId = mdMessage.DbId,
Group = group,
Name = name,
Content = StringSerializer.Serialize(mdMessage.Origin),
Retries = mdMessage.Retries,
Added = mdMessage.Added,
ExpiresAt = mdMessage.ExpiresAt,
StatusName = StatusName.Failed
});
return Task.FromResult(mdMessage);
}
public Task<int> DeleteExpiresAsync(string table, DateTime timeout, int batchCount = 1000, CancellationToken token = default)
{
var ret = table == nameof(PublishedMessages)
? ((List<MemoryMessage>)PublishedMessages).RemoveAll(x => x.ExpiresAt < timeout)
: ((List<MemoryMessage>)ReceivedMessages).RemoveAll(x => x.ExpiresAt < timeout);
return Task.FromResult(ret);
}
public Task<IEnumerable<MediumMessage>> GetPublishedMessagesOfNeedRetry()
{
var ret = PublishedMessages
.Where(x => x.Retries < _capOptions.Value.FailedRetryCount
&& x.Added < DateTime.Now.AddSeconds(-10)
&& (x.StatusName == StatusName.Scheduled || x.StatusName == StatusName.Failed))
.Take(200)
.Select(x => (MediumMessage)x);
return Task.FromResult(ret);
}
public Task<IEnumerable<MediumMessage>> GetReceivedMessagesOfNeedRetry()
{
var ret = ReceivedMessages
.Where(x => x.Retries < _capOptions.Value.FailedRetryCount
&& x.Added < DateTime.Now.AddSeconds(-10)
&& (x.StatusName == StatusName.Scheduled || x.StatusName == StatusName.Failed))
.Take(200)
.Select(x => (MediumMessage)x);
return Task.FromResult(ret);
}
public IMonitoringApi GetMonitoringApi()
{
return new InMemoryMonitoringApi();
}
}
}
\ No newline at end of file
......@@ -3,56 +3,59 @@
using System;
using System.Collections.Generic;
using System.Globalization;
using System.Linq;
using DotNetCore.CAP.Dashboard;
using DotNetCore.CAP.Dashboard.Monitoring;
using DotNetCore.CAP.Infrastructure;
using System.Threading.Tasks;
using DotNetCore.CAP.Internal;
using DotNetCore.CAP.Messages;
using DotNetCore.CAP.Monitoring;
using DotNetCore.CAP.Persistence;
namespace DotNetCore.CAP.InMemoryStorage
{
internal class InMemoryMonitoringApi : IMonitoringApi
{
private readonly IStorage _storage;
public Task<MediumMessage> GetPublishedMessageAsync(long id)
{
return Task.FromResult((MediumMessage)InMemoryStorage.PublishedMessages.First(x => x.DbId == id.ToString(CultureInfo.InvariantCulture)));
}
public InMemoryMonitoringApi(IStorage storage)
public Task<MediumMessage> GetReceivedMessageAsync(long id)
{
_storage = storage;
return Task.FromResult((MediumMessage)InMemoryStorage.ReceivedMessages.First(x => x.DbId == id.ToString(CultureInfo.InvariantCulture)));
}
public StatisticsDto GetStatistics()
{
var connection = GetConnection();
var stats = new StatisticsDto
{
PublishedSucceeded = connection.PublishedMessages.Count(x => x.StatusName == StatusName.Succeeded),
ReceivedSucceeded = connection.ReceivedMessages.Count(x => x.StatusName == StatusName.Succeeded),
PublishedFailed = connection.PublishedMessages.Count(x => x.StatusName == StatusName.Failed),
ReceivedFailed = connection.ReceivedMessages.Count(x => x.StatusName == StatusName.Failed)
PublishedSucceeded = InMemoryStorage.PublishedMessages.Count(x => x.StatusName == StatusName.Succeeded),
ReceivedSucceeded = InMemoryStorage.ReceivedMessages.Count(x => x.StatusName == StatusName.Succeeded),
PublishedFailed = InMemoryStorage.PublishedMessages.Count(x => x.StatusName == StatusName.Failed),
ReceivedFailed = InMemoryStorage.ReceivedMessages.Count(x => x.StatusName == StatusName.Failed)
};
return stats;
}
public IDictionary<DateTime, int> HourlyFailedJobs(MessageType type)
{
return GetHourlyTimelineStats(type, StatusName.Failed);
return GetHourlyTimelineStats(type, nameof(StatusName.Failed));
}
public IDictionary<DateTime, int> HourlySucceededJobs(MessageType type)
{
return GetHourlyTimelineStats(type, StatusName.Succeeded);
return GetHourlyTimelineStats(type, nameof(StatusName.Succeeded));
}
public IList<MessageDto> Messages(MessageQueryDto queryDto)
{
var connection = GetConnection();
if (queryDto.MessageType == MessageType.Publish)
{
var expression = connection.PublishedMessages.Where(x => true);
var expression = InMemoryStorage.PublishedMessages.Where(x => true);
if (!string.IsNullOrEmpty(queryDto.StatusName))
{
expression = expression.Where(x => x.StatusName.ToLower() == queryDto.StatusName);
expression = expression.Where(x => x.StatusName.ToString() == queryDto.StatusName);
}
if (!string.IsNullOrEmpty(queryDto.Name))
......@@ -73,19 +76,19 @@ namespace DotNetCore.CAP.InMemoryStorage
Added = x.Added,
Content = x.Content,
ExpiresAt = x.ExpiresAt,
Id = x.Id,
Id = long.Parse(x.DbId),
Name = x.Name,
Retries = x.Retries,
StatusName = x.StatusName
StatusName = x.StatusName.ToString()
}).ToList();
}
else
{
var expression = connection.ReceivedMessages.Where(x => true);
var expression = InMemoryStorage.ReceivedMessages.Where(x => true);
if (!string.IsNullOrEmpty(queryDto.StatusName))
{
expression = expression.Where(x => x.StatusName.ToLower() == queryDto.StatusName);
expression = expression.Where(x => x.StatusName.ToString() == queryDto.StatusName);
}
if (!string.IsNullOrEmpty(queryDto.Name))
......@@ -113,37 +116,32 @@ namespace DotNetCore.CAP.InMemoryStorage
Version = "N/A",
Content = x.Content,
ExpiresAt = x.ExpiresAt,
Id = x.Id,
Id = long.Parse(x.DbId),
Name = x.Name,
Retries = x.Retries,
StatusName = x.StatusName
StatusName = x.StatusName.ToString()
}).ToList();
}
}
public int PublishedFailedCount()
{
return GetConnection().PublishedMessages.Count(x => x.StatusName == StatusName.Failed);
return InMemoryStorage.PublishedMessages.Count(x => x.StatusName == StatusName.Failed);
}
public int PublishedSucceededCount()
{
return GetConnection().PublishedMessages.Count(x => x.StatusName == StatusName.Succeeded);
return InMemoryStorage.PublishedMessages.Count(x => x.StatusName == StatusName.Succeeded);
}
public int ReceivedFailedCount()
{
return GetConnection().ReceivedMessages.Count(x => x.StatusName == StatusName.Failed);
return InMemoryStorage.ReceivedMessages.Count(x => x.StatusName == StatusName.Failed);
}
public int ReceivedSucceededCount()
{
return GetConnection().ReceivedMessages.Count(x => x.StatusName == StatusName.Succeeded);
}
private InMemoryStorageConnection GetConnection()
{
return (InMemoryStorageConnection)_storage.GetConnection();
return InMemoryStorage.ReceivedMessages.Count(x => x.StatusName == StatusName.Succeeded);
}
private Dictionary<DateTime, int> GetHourlyTimelineStats(MessageType type, string statusName)
......@@ -158,20 +156,19 @@ namespace DotNetCore.CAP.InMemoryStorage
var keyMaps = dates.ToDictionary(x => x.ToString("yyyy-MM-dd-HH"), x => x);
var connection = GetConnection();
Dictionary<string, int> valuesMap;
if (type == MessageType.Publish)
{
valuesMap = connection.PublishedMessages
.Where(x => x.StatusName == statusName)
valuesMap = InMemoryStorage.PublishedMessages
.Where(x => x.StatusName.ToString() == statusName)
.GroupBy(x => x.Added.ToString("yyyy-MM-dd-HH"))
.ToDictionary(x => x.Key, x => x.Count());
}
else
{
valuesMap = connection.ReceivedMessages
.Where(x => x.StatusName == statusName)
valuesMap = InMemoryStorage.ReceivedMessages
.Where(x => x.StatusName.ToString() == statusName)
.GroupBy(x => x.Added.ToString("yyyy-MM-dd-HH"))
.ToDictionary(x => x.Key, x => x.Count());
}
......
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using System.Collections.Async;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using DotNetCore.CAP.Infrastructure;
using DotNetCore.CAP.Messages;
using Microsoft.Extensions.Options;
namespace DotNetCore.CAP.InMemoryStorage
{
public class InMemoryStorageConnection : IStorageConnection
{
private readonly CapOptions _capOptions;
public InMemoryStorageConnection(IOptions<CapOptions> capOptions)
{
_capOptions = capOptions.Value;
PublishedMessages = new List<CapPublishedMessage>();
ReceivedMessages = new List<CapReceivedMessage>();
}
internal List<CapPublishedMessage> PublishedMessages { get; }
internal List<CapReceivedMessage> ReceivedMessages { get; }
public IStorageTransaction CreateTransaction()
{
return new InMemoryStorageTransaction(this);
}
public Task<CapPublishedMessage> GetPublishedMessageAsync(long id)
{
return PublishedMessages.ToAsyncEnumerable().FirstOrDefaultAsync(x => x.Id == id);
}
public async Task<IEnumerable<CapPublishedMessage>> GetPublishedMessagesOfNeedRetry()
{
return await PublishedMessages.ToAsyncEnumerable()
.Where(x => x.Retries < _capOptions.FailedRetryCount
&& x.Added < DateTime.Now.AddSeconds(-10)
&& (x.StatusName == StatusName.Scheduled || x.StatusName == StatusName.Failed))
.Take(200)
.ToListAsync();
}
public void StoreReceivedMessage(CapReceivedMessage message)
{
ReceivedMessages.Add(message);
}
public Task<CapReceivedMessage> GetReceivedMessageAsync(long id)
{
return ReceivedMessages.ToAsyncEnumerable().FirstOrDefaultAsync(x => x.Id == id);
}
public async Task<IEnumerable<CapReceivedMessage>> GetReceivedMessagesOfNeedRetry()
{
return await ReceivedMessages.ToAsyncEnumerable()
.Where(x => x.Retries < _capOptions.FailedRetryCount
&& x.Added < DateTime.Now.AddSeconds(-10)
&& (x.StatusName == StatusName.Scheduled || x.StatusName == StatusName.Failed))
.Take(200)
.ToListAsync();
}
public bool ChangePublishedState(long messageId, string state)
{
var msg = PublishedMessages.First(x => x.Id == messageId);
msg.Retries++;
msg.ExpiresAt = null;
msg.StatusName = state;
return true;
}
public bool ChangeReceivedState(long messageId, string state)
{
var msg = ReceivedMessages.First(x => x.Id == messageId);
msg.Retries++;
msg.ExpiresAt = null;
msg.StatusName = state;
return true;
}
}
}
\ No newline at end of file
......@@ -3,27 +3,20 @@
using System.Threading;
using System.Threading.Tasks;
using DotNetCore.CAP.Dashboard;
using DotNetCore.CAP.Persistence;
namespace DotNetCore.CAP.InMemoryStorage
{
public class InMemoryStorage : IStorage
internal class InMemoryStorageInitializer : IStorageInitializer
{
private readonly IStorageConnection _connection;
public InMemoryStorage(IStorageConnection connection)
{
_connection = connection;
}
public IStorageConnection GetConnection()
public string GetPublishedTableName()
{
return _connection;
return nameof(InMemoryStorage.PublishedMessages);
}
public IMonitoringApi GetMonitoringApi()
public string GetReceivedTableName()
{
return new InMemoryMonitoringApi(this);
return nameof(InMemoryStorage.ReceivedMessages);
}
public Task InitializeAsync(CancellationToken cancellationToken)
......
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using System.Linq;
using System.Threading.Tasks;
using DotNetCore.CAP.Messages;
namespace DotNetCore.CAP.InMemoryStorage
{
public class InMemoryStorageTransaction : IStorageTransaction
{
private readonly InMemoryStorageConnection _connection;
public InMemoryStorageTransaction(InMemoryStorageConnection connection)
{
_connection = connection;
}
public void UpdateMessage(CapPublishedMessage message)
{
if (message == null)
{
throw new ArgumentNullException(nameof(message));
}
var msg = _connection.PublishedMessages.FirstOrDefault(x => message.Id == x.Id);
if (msg == null) return;
msg.Retries = message.Retries;
msg.Content = message.Content;
msg.ExpiresAt = message.ExpiresAt;
msg.StatusName = message.StatusName;
}
public void UpdateMessage(CapReceivedMessage message)
{
if (message == null)
{
throw new ArgumentNullException(nameof(message));
}
var msg = _connection.ReceivedMessages.FirstOrDefault(x => message.Id == x.Id);
if (msg == null) return;
msg.Retries = message.Retries;
msg.Content = message.Content;
msg.ExpiresAt = message.ExpiresAt;
msg.StatusName = message.StatusName;
}
public Task CommitAsync()
{
return Task.CompletedTask;
}
public void Dispose()
{
}
}
}
\ No newline at end of file
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using DotNetCore.CAP.Internal;
using DotNetCore.CAP.Persistence;
namespace DotNetCore.CAP.InMemoryStorage
{
internal class MemoryMessage : MediumMessage
{
public string Name { get; set; }
public StatusName StatusName { get; set; }
public string Group { get; set; }
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment