Commit 2e1cb035 authored by Savorboard's avatar Savorboard

Fix inmemeory storage bug

parent 26c78f3e
...@@ -2,6 +2,7 @@ ...@@ -2,6 +2,7 @@
// Licensed under the MIT License. See License.txt in the project root for license information. // Licensed under the MIT License. See License.txt in the project root for license information.
using System; using System;
using System.Collections.Concurrent;
using System.Collections.Generic; using System.Collections.Generic;
using System.Linq; using System.Linq;
using System.Threading; using System.Threading;
...@@ -24,19 +25,19 @@ namespace DotNetCore.CAP.InMemoryStorage ...@@ -24,19 +25,19 @@ namespace DotNetCore.CAP.InMemoryStorage
_capOptions = capOptions; _capOptions = capOptions;
} }
public static IList<MemoryMessage> PublishedMessages { get; } = new List<MemoryMessage>(); public static ConcurrentDictionary<string, MemoryMessage> PublishedMessages { get; } = new ConcurrentDictionary<string, MemoryMessage>();
public static IList<MemoryMessage> ReceivedMessages { get; } = new List<MemoryMessage>(); public static ConcurrentDictionary<string, MemoryMessage> ReceivedMessages { get; } = new ConcurrentDictionary<string, MemoryMessage>();
public Task ChangePublishStateAsync(MediumMessage message, StatusName state) public Task ChangePublishStateAsync(MediumMessage message, StatusName state)
{ {
PublishedMessages.First(x => x.DbId == message.DbId).StatusName = state; PublishedMessages[message.DbId].StatusName = state;
return Task.CompletedTask; return Task.CompletedTask;
} }
public Task ChangeReceiveStateAsync(MediumMessage message, StatusName state) public Task ChangeReceiveStateAsync(MediumMessage message, StatusName state)
{ {
ReceivedMessages.First(x => x.DbId == message.DbId).StatusName = state; ReceivedMessages[message.DbId].StatusName = state;
return Task.CompletedTask; return Task.CompletedTask;
} }
...@@ -52,7 +53,7 @@ namespace DotNetCore.CAP.InMemoryStorage ...@@ -52,7 +53,7 @@ namespace DotNetCore.CAP.InMemoryStorage
Retries = 0 Retries = 0
}; };
PublishedMessages.Add(new MemoryMessage() PublishedMessages[message.DbId] = new MemoryMessage()
{ {
DbId = message.DbId, DbId = message.DbId,
Name = name, Name = name,
...@@ -61,24 +62,27 @@ namespace DotNetCore.CAP.InMemoryStorage ...@@ -61,24 +62,27 @@ namespace DotNetCore.CAP.InMemoryStorage
Added = message.Added, Added = message.Added,
ExpiresAt = message.ExpiresAt, ExpiresAt = message.ExpiresAt,
StatusName = StatusName.Scheduled StatusName = StatusName.Scheduled
}); };
return message; return message;
} }
public void StoreReceivedExceptionMessage(string name, string group, string content) public void StoreReceivedExceptionMessage(string name, string group, string content)
{ {
ReceivedMessages.Add(new MemoryMessage var id = SnowflakeId.Default().NextId().ToString();
ReceivedMessages[id] = new MemoryMessage
{ {
DbId = SnowflakeId.Default().NextId().ToString(), DbId = id,
Group = group, Group = group,
Origin = null,
Name = name, Name = name,
Content = content, Content = content,
Retries = _capOptions.Value.FailedRetryCount, Retries = _capOptions.Value.FailedRetryCount,
Added = DateTime.Now, Added = DateTime.Now,
ExpiresAt = DateTime.Now.AddDays(15), ExpiresAt = DateTime.Now.AddDays(15),
StatusName = StatusName.Failed StatusName = StatusName.Failed
}); };
} }
public MediumMessage StoreReceivedMessage(string name, string @group, Message message) public MediumMessage StoreReceivedMessage(string name, string @group, Message message)
...@@ -92,9 +96,10 @@ namespace DotNetCore.CAP.InMemoryStorage ...@@ -92,9 +96,10 @@ namespace DotNetCore.CAP.InMemoryStorage
Retries = 0 Retries = 0
}; };
ReceivedMessages.Add(new MemoryMessage ReceivedMessages[mdMessage.DbId] = new MemoryMessage
{ {
DbId = mdMessage.DbId, DbId = mdMessage.DbId,
Origin = mdMessage.Origin,
Group = group, Group = group,
Name = name, Name = name,
Content = StringSerializer.Serialize(mdMessage.Origin), Content = StringSerializer.Serialize(mdMessage.Origin),
...@@ -102,38 +107,69 @@ namespace DotNetCore.CAP.InMemoryStorage ...@@ -102,38 +107,69 @@ namespace DotNetCore.CAP.InMemoryStorage
Added = mdMessage.Added, Added = mdMessage.Added,
ExpiresAt = mdMessage.ExpiresAt, ExpiresAt = mdMessage.ExpiresAt,
StatusName = StatusName.Failed StatusName = StatusName.Failed
}); };
return mdMessage; return mdMessage;
} }
public Task<int> DeleteExpiresAsync(string table, DateTime timeout, int batchCount = 1000, CancellationToken token = default) public Task<int> DeleteExpiresAsync(string table, DateTime timeout, int batchCount = 1000, CancellationToken token = default)
{ {
var ret = table == nameof(PublishedMessages) var removed = 0;
? ((List<MemoryMessage>)PublishedMessages).RemoveAll(x => x.ExpiresAt < timeout) if (table == nameof(PublishedMessages))
: ((List<MemoryMessage>)ReceivedMessages).RemoveAll(x => x.ExpiresAt < timeout); {
return Task.FromResult(ret); var ids = PublishedMessages.Values.Where(x => x.ExpiresAt < timeout).Select(x => x.DbId).ToList();
foreach (var id in ids)
{
if (PublishedMessages.TryRemove(id, out _))
{
removed++;
}
}
}
else
{
var ids = ReceivedMessages.Values.Where(x => x.ExpiresAt < timeout).Select(x => x.DbId).ToList();
foreach (var id in ids)
{
if (PublishedMessages.TryRemove(id, out _))
{
removed++;
}
}
}
return Task.FromResult(removed);
} }
public Task<IEnumerable<MediumMessage>> GetPublishedMessagesOfNeedRetry() public Task<IEnumerable<MediumMessage>> GetPublishedMessagesOfNeedRetry()
{ {
var ret = PublishedMessages var ret = PublishedMessages.Values
.Where(x => x.Retries < _capOptions.Value.FailedRetryCount .Where(x => x.Retries < _capOptions.Value.FailedRetryCount
&& x.Added < DateTime.Now.AddSeconds(-10) && x.Added < DateTime.Now.AddSeconds(-10)
&& (x.StatusName == StatusName.Scheduled || x.StatusName == StatusName.Failed)) && (x.StatusName == StatusName.Scheduled || x.StatusName == StatusName.Failed))
.Take(200) .Take(200)
.Select(x => (MediumMessage)x); .Select(x => (MediumMessage)x);
foreach (var message in ret)
{
message.Origin = StringSerializer.DeSerialize(message.Content);
}
return Task.FromResult(ret); return Task.FromResult(ret);
} }
public Task<IEnumerable<MediumMessage>> GetReceivedMessagesOfNeedRetry() public Task<IEnumerable<MediumMessage>> GetReceivedMessagesOfNeedRetry()
{ {
var ret = ReceivedMessages var ret = ReceivedMessages.Values
.Where(x => x.Retries < _capOptions.Value.FailedRetryCount .Where(x => x.Retries < _capOptions.Value.FailedRetryCount
&& x.Added < DateTime.Now.AddSeconds(-10) && x.Added < DateTime.Now.AddSeconds(-10)
&& (x.StatusName == StatusName.Scheduled || x.StatusName == StatusName.Failed)) && (x.StatusName == StatusName.Scheduled || x.StatusName == StatusName.Failed))
.Take(200) .Take(200)
.Select(x => (MediumMessage)x); .Select(x => (MediumMessage)x);
foreach (var message in ret)
{
message.Origin = StringSerializer.DeSerialize(message.Content);
}
return Task.FromResult(ret); return Task.FromResult(ret);
} }
......
...@@ -17,22 +17,22 @@ namespace DotNetCore.CAP.InMemoryStorage ...@@ -17,22 +17,22 @@ namespace DotNetCore.CAP.InMemoryStorage
{ {
public Task<MediumMessage> GetPublishedMessageAsync(long id) public Task<MediumMessage> GetPublishedMessageAsync(long id)
{ {
return Task.FromResult((MediumMessage)InMemoryStorage.PublishedMessages.First(x => x.DbId == id.ToString(CultureInfo.InvariantCulture))); return Task.FromResult((MediumMessage)InMemoryStorage.PublishedMessages.Values.First(x => x.DbId == id.ToString(CultureInfo.InvariantCulture)));
} }
public Task<MediumMessage> GetReceivedMessageAsync(long id) public Task<MediumMessage> GetReceivedMessageAsync(long id)
{ {
return Task.FromResult((MediumMessage)InMemoryStorage.ReceivedMessages.First(x => x.DbId == id.ToString(CultureInfo.InvariantCulture))); return Task.FromResult((MediumMessage)InMemoryStorage.ReceivedMessages.Values.First(x => x.DbId == id.ToString(CultureInfo.InvariantCulture)));
} }
public StatisticsDto GetStatistics() public StatisticsDto GetStatistics()
{ {
var stats = new StatisticsDto var stats = new StatisticsDto
{ {
PublishedSucceeded = InMemoryStorage.PublishedMessages.Count(x => x.StatusName == StatusName.Succeeded), PublishedSucceeded = InMemoryStorage.PublishedMessages.Values.Count(x => x.StatusName == StatusName.Succeeded),
ReceivedSucceeded = InMemoryStorage.ReceivedMessages.Count(x => x.StatusName == StatusName.Succeeded), ReceivedSucceeded = InMemoryStorage.ReceivedMessages.Values.Count(x => x.StatusName == StatusName.Succeeded),
PublishedFailed = InMemoryStorage.PublishedMessages.Count(x => x.StatusName == StatusName.Failed), PublishedFailed = InMemoryStorage.PublishedMessages.Values.Count(x => x.StatusName == StatusName.Failed),
ReceivedFailed = InMemoryStorage.ReceivedMessages.Count(x => x.StatusName == StatusName.Failed) ReceivedFailed = InMemoryStorage.ReceivedMessages.Values.Count(x => x.StatusName == StatusName.Failed)
}; };
return stats; return stats;
} }
...@@ -51,7 +51,7 @@ namespace DotNetCore.CAP.InMemoryStorage ...@@ -51,7 +51,7 @@ namespace DotNetCore.CAP.InMemoryStorage
{ {
if (queryDto.MessageType == MessageType.Publish) if (queryDto.MessageType == MessageType.Publish)
{ {
var expression = InMemoryStorage.PublishedMessages.Where(x => true); var expression = InMemoryStorage.PublishedMessages.Values.Where(x => true);
if (!string.IsNullOrEmpty(queryDto.StatusName)) if (!string.IsNullOrEmpty(queryDto.StatusName))
{ {
...@@ -85,7 +85,7 @@ namespace DotNetCore.CAP.InMemoryStorage ...@@ -85,7 +85,7 @@ namespace DotNetCore.CAP.InMemoryStorage
} }
else else
{ {
var expression = InMemoryStorage.ReceivedMessages.Where(x => true); var expression = InMemoryStorage.ReceivedMessages.Values.Where(x => true);
if (!string.IsNullOrEmpty(queryDto.StatusName)) if (!string.IsNullOrEmpty(queryDto.StatusName))
{ {
...@@ -127,22 +127,22 @@ namespace DotNetCore.CAP.InMemoryStorage ...@@ -127,22 +127,22 @@ namespace DotNetCore.CAP.InMemoryStorage
public int PublishedFailedCount() public int PublishedFailedCount()
{ {
return InMemoryStorage.PublishedMessages.Count(x => x.StatusName == StatusName.Failed); return InMemoryStorage.PublishedMessages.Values.Count(x => x.StatusName == StatusName.Failed);
} }
public int PublishedSucceededCount() public int PublishedSucceededCount()
{ {
return InMemoryStorage.PublishedMessages.Count(x => x.StatusName == StatusName.Succeeded); return InMemoryStorage.PublishedMessages.Values.Count(x => x.StatusName == StatusName.Succeeded);
} }
public int ReceivedFailedCount() public int ReceivedFailedCount()
{ {
return InMemoryStorage.ReceivedMessages.Count(x => x.StatusName == StatusName.Failed); return InMemoryStorage.ReceivedMessages.Values.Count(x => x.StatusName == StatusName.Failed);
} }
public int ReceivedSucceededCount() public int ReceivedSucceededCount()
{ {
return InMemoryStorage.ReceivedMessages.Count(x => x.StatusName == StatusName.Succeeded); return InMemoryStorage.ReceivedMessages.Values.Count(x => x.StatusName == StatusName.Succeeded);
} }
private Dictionary<DateTime, int> GetHourlyTimelineStats(MessageType type, string statusName) private Dictionary<DateTime, int> GetHourlyTimelineStats(MessageType type, string statusName)
...@@ -161,14 +161,14 @@ namespace DotNetCore.CAP.InMemoryStorage ...@@ -161,14 +161,14 @@ namespace DotNetCore.CAP.InMemoryStorage
Dictionary<string, int> valuesMap; Dictionary<string, int> valuesMap;
if (type == MessageType.Publish) if (type == MessageType.Publish)
{ {
valuesMap = InMemoryStorage.PublishedMessages valuesMap = InMemoryStorage.PublishedMessages.Values
.Where(x => x.StatusName.ToString() == statusName) .Where(x => x.StatusName.ToString() == statusName)
.GroupBy(x => x.Added.ToString("yyyy-MM-dd-HH")) .GroupBy(x => x.Added.ToString("yyyy-MM-dd-HH"))
.ToDictionary(x => x.Key, x => x.Count()); .ToDictionary(x => x.Key, x => x.Count());
} }
else else
{ {
valuesMap = InMemoryStorage.ReceivedMessages valuesMap = InMemoryStorage.ReceivedMessages.Values
.Where(x => x.StatusName.ToString() == statusName) .Where(x => x.StatusName.ToString() == statusName)
.GroupBy(x => x.Added.ToString("yyyy-MM-dd-HH")) .GroupBy(x => x.Added.ToString("yyyy-MM-dd-HH"))
.ToDictionary(x => x.Key, x => x.Count()); .ToDictionary(x => x.Key, x => x.Count());
......
...@@ -13,6 +13,11 @@ namespace DotNetCore.CAP.Serialization ...@@ -13,6 +13,11 @@ namespace DotNetCore.CAP.Serialization
{ {
public Task<TransportMessage> SerializeAsync(Message message) public Task<TransportMessage> SerializeAsync(Message message)
{ {
if (message == null)
{
throw new ArgumentNullException(nameof(message));
}
if (message.Value == null) if (message.Value == null)
{ {
return Task.FromResult(new TransportMessage(message.Headers, null)); return Task.FromResult(new TransportMessage(message.Headers, null));
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment