Commit c3b7d6b6 authored by Savorboard's avatar Savorboard

Message table in database changes the primary key to non auto-Increment. (#180)

parent 3aab6b23
...@@ -40,7 +40,7 @@ namespace DotNetCore.CAP.MySql ...@@ -40,7 +40,7 @@ namespace DotNetCore.CAP.MySql
await PublishAsyncInternal(message); await PublishAsyncInternal(message);
} }
protected override Task<int> ExecuteAsync(CapPublishedMessage message, ICapTransaction transaction, protected override Task ExecuteAsync(CapPublishedMessage message, ICapTransaction transaction,
CancellationToken cancel = default(CancellationToken)) CancellationToken cancel = default(CancellationToken))
{ {
var dbTrans = transaction.DbTransaction as IDbTransaction; var dbTrans = transaction.DbTransaction as IDbTransaction;
......
...@@ -52,7 +52,7 @@ namespace DotNetCore.CAP.MySql ...@@ -52,7 +52,7 @@ namespace DotNetCore.CAP.MySql
} }
} }
public async Task<int> StoreReceivedMessageAsync(CapReceivedMessage message) public Task StoreReceivedMessageAsync(CapReceivedMessage message)
{ {
if (message == null) if (message == null)
{ {
...@@ -61,11 +61,11 @@ namespace DotNetCore.CAP.MySql ...@@ -61,11 +61,11 @@ namespace DotNetCore.CAP.MySql
var sql = $@" var sql = $@"
INSERT INTO `{_prefix}.received`(`Name`,`Group`,`Content`,`Retries`,`Added`,`ExpiresAt`,`StatusName`) INSERT INTO `{_prefix}.received`(`Name`,`Group`,`Content`,`Retries`,`Added`,`ExpiresAt`,`StatusName`)
VALUES(@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName);SELECT LAST_INSERT_ID();"; VALUES(@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName);";
using (var connection = new MySqlConnection(Options.ConnectionString)) using (var connection = new MySqlConnection(Options.ConnectionString))
{ {
return await connection.ExecuteScalarAsync<int>(sql, message); return connection.ExecuteScalarAsync<int>(sql, message);
} }
} }
......
...@@ -40,6 +40,7 @@ namespace DotNetCore.CAP.Abstractions ...@@ -40,6 +40,7 @@ namespace DotNetCore.CAP.Abstractions
{ {
var message = new CapPublishedMessage var message = new CapPublishedMessage
{ {
Id = SnowflakeId.Default().NextId(),
Name = name, Name = name,
Content = Serialize(contentObj, callbackName), Content = Serialize(contentObj, callbackName),
StatusName = StatusName.Scheduled StatusName = StatusName.Scheduled
...@@ -53,6 +54,7 @@ namespace DotNetCore.CAP.Abstractions ...@@ -53,6 +54,7 @@ namespace DotNetCore.CAP.Abstractions
{ {
var message = new CapPublishedMessage var message = new CapPublishedMessage
{ {
Id = SnowflakeId.Default().NextId(),
Name = name, Name = name,
Content = Serialize(contentObj, callbackName), Content = Serialize(contentObj, callbackName),
StatusName = StatusName.Scheduled StatusName = StatusName.Scheduled
...@@ -75,13 +77,11 @@ namespace DotNetCore.CAP.Abstractions ...@@ -75,13 +77,11 @@ namespace DotNetCore.CAP.Abstractions
{ {
operationId = s_diagnosticListener.WritePublishMessageStoreBefore(message); operationId = s_diagnosticListener.WritePublishMessageStoreBefore(message);
message.Id = await ExecuteAsync(message, CapTransaction); await ExecuteAsync(message, CapTransaction);
if (message.Id > 0)
{
_capTransaction.AddToSent(message); _capTransaction.AddToSent(message);
s_diagnosticListener.WritePublishMessageStoreAfter(operationId, message); s_diagnosticListener.WritePublishMessageStoreAfter(operationId, message);
}
if (NotUseTransaction || CapTransaction.AutoCommit) if (NotUseTransaction || CapTransaction.AutoCommit)
{ {
...@@ -105,7 +105,7 @@ namespace DotNetCore.CAP.Abstractions ...@@ -105,7 +105,7 @@ namespace DotNetCore.CAP.Abstractions
protected abstract object GetDbTransaction(); protected abstract object GetDbTransaction();
protected abstract Task<int> ExecuteAsync(CapPublishedMessage message, protected abstract Task ExecuteAsync(CapPublishedMessage message,
ICapTransaction transaction, ICapTransaction transaction,
CancellationToken cancel = default(CancellationToken)); CancellationToken cancel = default(CancellationToken));
......
...@@ -112,6 +112,7 @@ namespace DotNetCore.CAP ...@@ -112,6 +112,7 @@ namespace DotNetCore.CAP
var receivedMessage = new CapReceivedMessage(messageContext) var receivedMessage = new CapReceivedMessage(messageContext)
{ {
Id = SnowflakeId.Default().NextId(),
StatusName = StatusName.Scheduled, StatusName = StatusName.Scheduled,
Content = messageBody Content = messageBody
}; };
...@@ -170,10 +171,7 @@ namespace DotNetCore.CAP ...@@ -170,10 +171,7 @@ namespace DotNetCore.CAP
private void StoreMessage(CapReceivedMessage receivedMessage) private void StoreMessage(CapReceivedMessage receivedMessage)
{ {
var id = _connection.StoreReceivedMessageAsync(receivedMessage) _connection.StoreReceivedMessageAsync(receivedMessage).GetAwaiter().GetResult();
.GetAwaiter().GetResult();
receivedMessage.Id = id;
} }
private (Guid, string) TracingBefore(string topic, string values) private (Guid, string) TracingBefore(string topic, string values)
......
...@@ -32,7 +32,7 @@ namespace DotNetCore.CAP ...@@ -32,7 +32,7 @@ namespace DotNetCore.CAP
/// Stores the message. /// Stores the message.
/// </summary> /// </summary>
/// <param name="message">The message to store.</param> /// <param name="message">The message to store.</param>
Task<int> StoreReceivedMessageAsync(CapReceivedMessage message); Task StoreReceivedMessageAsync(CapReceivedMessage message);
/// <summary> /// <summary>
/// Returns the message with the given id. /// Returns the message with the given id.
......
...@@ -54,6 +54,7 @@ namespace DotNetCore.CAP.Internal ...@@ -54,6 +54,7 @@ namespace DotNetCore.CAP.Internal
var publishedMessage = new CapPublishedMessage var publishedMessage = new CapPublishedMessage
{ {
Id = SnowflakeId.Default().NextId(),
Name = topicName, Name = topicName,
Content = content, Content = content,
StatusName = StatusName.Scheduled StatusName = StatusName.Scheduled
......
...@@ -10,12 +10,12 @@ namespace DotNetCore.CAP ...@@ -10,12 +10,12 @@ namespace DotNetCore.CAP
[SuppressMessage("ReSharper", "InconsistentNaming")] [SuppressMessage("ReSharper", "InconsistentNaming")]
internal static class LoggerExtensions internal static class LoggerExtensions
{ {
public static void ConsumerExecutedAfterThreshold(this ILogger logger, int messageId, int retries) public static void ConsumerExecutedAfterThreshold(this ILogger logger, long messageId, int retries)
{ {
logger.LogWarning($"The Subscriber of the message({messageId}) still fails after {retries}th executions and we will stop retrying."); logger.LogWarning($"The Subscriber of the message({messageId}) still fails after {retries}th executions and we will stop retrying.");
} }
public static void SenderAfterThreshold(this ILogger logger, int messageId, int retries) public static void SenderAfterThreshold(this ILogger logger, long messageId, int retries)
{ {
logger.LogWarning($"The Publisher of the message({messageId}) still fails after {retries}th sends and we will stop retrying."); logger.LogWarning($"The Publisher of the message({messageId}) still fails after {retries}th sends and we will stop retrying.");
} }
...@@ -25,12 +25,12 @@ namespace DotNetCore.CAP ...@@ -25,12 +25,12 @@ namespace DotNetCore.CAP
logger.LogWarning(ex, "FailedThresholdCallback action raised an exception:" + ex.Message); logger.LogWarning(ex, "FailedThresholdCallback action raised an exception:" + ex.Message);
} }
public static void ConsumerExecutionRetrying(this ILogger logger, int messageId, int retries) public static void ConsumerExecutionRetrying(this ILogger logger, long messageId, int retries)
{ {
logger.LogWarning($"The {retries}th retrying consume a message failed. message id: {messageId}"); logger.LogWarning($"The {retries}th retrying consume a message failed. message id: {messageId}");
} }
public static void SenderRetrying(this ILogger logger, int messageId, int retries) public static void SenderRetrying(this ILogger logger, long messageId, int retries)
{ {
logger.LogWarning($"The {retries}th retrying send a message failed. message id: {messageId} "); logger.LogWarning($"The {retries}th retrying send a message failed. message id: {messageId} ");
} }
...@@ -40,7 +40,7 @@ namespace DotNetCore.CAP ...@@ -40,7 +40,7 @@ namespace DotNetCore.CAP
logger.LogDebug($"Message published. name: {name}, content:{content}."); logger.LogDebug($"Message published. name: {name}, content:{content}.");
} }
public static void MessagePublishException(this ILogger logger, int messageId, string reason, Exception ex) public static void MessagePublishException(this ILogger logger, long messageId, string reason, Exception ex)
{ {
logger.LogError(ex, $"An exception occured while publishing a message, reason:{reason}. message id:{messageId}"); logger.LogError(ex, $"An exception occured while publishing a message, reason:{reason}. message id:{messageId}");
} }
......
...@@ -15,7 +15,7 @@ namespace DotNetCore.CAP.Models ...@@ -15,7 +15,7 @@ namespace DotNetCore.CAP.Models
Added = DateTime.Now; Added = DateTime.Now;
} }
public int Id { get; set; } public long Id { get; set; }
public string Name { get; set; } public string Name { get; set; }
......
...@@ -2,6 +2,7 @@ ...@@ -2,6 +2,7 @@
// Licensed under the MIT License. See License.txt in the project root for license information. // Licensed under the MIT License. See License.txt in the project root for license information.
using System; using System;
using DotNetCore.CAP.Infrastructure;
namespace DotNetCore.CAP.Models namespace DotNetCore.CAP.Models
{ {
...@@ -22,7 +23,7 @@ namespace DotNetCore.CAP.Models ...@@ -22,7 +23,7 @@ namespace DotNetCore.CAP.Models
Content = message.Content; Content = message.Content;
} }
public int Id { get; set; } public long Id { get; set; }
public string Group { get; set; } public string Group { get; set; }
......
...@@ -25,6 +25,7 @@ namespace DotNetCore.CAP.MySql.Test ...@@ -25,6 +25,7 @@ namespace DotNetCore.CAP.MySql.Test
var sql = "INSERT INTO `cap.published`(`Name`,`Content`,`Retries`,`Added`,`ExpiresAt`,`StatusName`) VALUES(@Name,@Content,@Retries,@Added,@ExpiresAt,@StatusName);SELECT @@IDENTITY;"; var sql = "INSERT INTO `cap.published`(`Name`,`Content`,`Retries`,`Added`,`ExpiresAt`,`StatusName`) VALUES(@Name,@Content,@Retries,@Added,@ExpiresAt,@StatusName);SELECT @@IDENTITY;";
var publishMessage = new CapPublishedMessage var publishMessage = new CapPublishedMessage
{ {
Id = SnowflakeId.Default().NextId(),
Name = "MySqlStorageConnectionTest", Name = "MySqlStorageConnectionTest",
Content = "", Content = "",
StatusName = StatusName.Scheduled StatusName = StatusName.Scheduled
...@@ -45,6 +46,7 @@ namespace DotNetCore.CAP.MySql.Test ...@@ -45,6 +46,7 @@ namespace DotNetCore.CAP.MySql.Test
{ {
var receivedMessage = new CapReceivedMessage var receivedMessage = new CapReceivedMessage
{ {
Id = SnowflakeId.Default().NextId(),
Name = "MySqlStorageConnectionTest", Name = "MySqlStorageConnectionTest",
Content = "", Content = "",
Group = "mygroup", Group = "mygroup",
...@@ -71,6 +73,7 @@ namespace DotNetCore.CAP.MySql.Test ...@@ -71,6 +73,7 @@ namespace DotNetCore.CAP.MySql.Test
VALUES(@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName);SELECT @@IDENTITY;"; VALUES(@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName);SELECT @@IDENTITY;";
var receivedMessage = new CapReceivedMessage var receivedMessage = new CapReceivedMessage
{ {
Id = SnowflakeId.Default().NextId(),
Name = "MySqlStorageConnectionTest", Name = "MySqlStorageConnectionTest",
Content = "", Content = "",
Group = "mygroup", Group = "mygroup",
......
...@@ -16,6 +16,7 @@ namespace DotNetCore.CAP.Test ...@@ -16,6 +16,7 @@ namespace DotNetCore.CAP.Test
var fixture = Create(); var fixture = Create();
var message = new CapPublishedMessage var message = new CapPublishedMessage
{ {
Id = SnowflakeId.Default().NextId(),
StatusName = StatusName.Scheduled StatusName = StatusName.Scheduled
}; };
var state = Mock.Of<IState>(s => s.Name == "s" && s.ExpiresAfter == null); var state = Mock.Of<IState>(s => s.Name == "s" && s.ExpiresAfter == null);
...@@ -39,6 +40,7 @@ namespace DotNetCore.CAP.Test ...@@ -39,6 +40,7 @@ namespace DotNetCore.CAP.Test
var fixture = Create(); var fixture = Create();
var message = new CapPublishedMessage var message = new CapPublishedMessage
{ {
Id = SnowflakeId.Default().NextId(),
StatusName = StatusName.Scheduled StatusName = StatusName.Scheduled
}; };
var state = Mock.Of<IState>(s => s.Name == "s" && s.ExpiresAfter == TimeSpan.FromHours(1)); var state = Mock.Of<IState>(s => s.Name == "s" && s.ExpiresAfter == TimeSpan.FromHours(1));
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment