Commit c2df2877 authored by Savorboard's avatar Savorboard

Because of deadlock and performance issues, mysql does not use transactions...

Because of deadlock and performance issues, mysql does not use transactions when consuming messages, and requeue the message to queue table when they exception. (#36,#68)
parent 664c3b8f
using System; using Dapper;
using System.Data;
using System.Threading;
using Dapper;
using DotNetCore.CAP.Models; using DotNetCore.CAP.Models;
using MySql.Data.MySqlClient;
namespace DotNetCore.CAP.MySql namespace DotNetCore.CAP.MySql
{ {
public class MySqlFetchedMessage : IFetchedMessage public class MySqlFetchedMessage : IFetchedMessage
{ {
private static readonly TimeSpan KeepAliveInterval = TimeSpan.FromMinutes(1); private readonly string _connectionString = null;
private readonly IDbConnection _connection;
private readonly object _lockObject = new object();
private readonly Timer _timer;
private readonly IDbTransaction _transaction;
public MySqlFetchedMessage(int messageId, public MySqlFetchedMessage(int messageId, MessageType type, string connectionString)
MessageType type,
IDbConnection connection,
IDbTransaction transaction)
{ {
MessageId = messageId; MessageId = messageId;
MessageType = type; MessageType = type;
_connection = connection;
_transaction = transaction; _connectionString = connectionString;
_timer = new Timer(ExecuteKeepAliveQuery, null, KeepAliveInterval, KeepAliveInterval);
} }
public int MessageId { get; } public int MessageId { get; }
...@@ -32,43 +22,21 @@ namespace DotNetCore.CAP.MySql ...@@ -32,43 +22,21 @@ namespace DotNetCore.CAP.MySql
public void RemoveFromQueue() public void RemoveFromQueue()
{ {
lock (_lockObject) // ignored
{
_transaction.Commit();
}
} }
public void Requeue() public void Requeue()
{ {
lock (_lockObject) using (var connection = new MySqlConnection(_connectionString))
{ {
_transaction.Rollback(); connection.Execute("insert into `cap.queue`(`MessageId`,`MessageType`) values(@MessageId,@MessageType);"
, new {MessageId, MessageType });
} }
} }
public void Dispose() public void Dispose()
{
lock (_lockObject)
{
_timer?.Dispose();
_transaction.Dispose();
_connection.Dispose();
}
}
private void ExecuteKeepAliveQuery(object obj)
{
lock (_lockObject)
{
try
{
_connection?.Execute("SELECT 1", _transaction);
}
catch
{ {
// ignored // ignored
} }
} }
}
}
} }
\ No newline at end of file
...@@ -53,7 +53,8 @@ namespace DotNetCore.CAP.MySql ...@@ -53,7 +53,8 @@ namespace DotNetCore.CAP.MySql
$@" $@"
CREATE TABLE IF NOT EXISTS `{prefix}.queue` ( CREATE TABLE IF NOT EXISTS `{prefix}.queue` (
`MessageId` int(11) NOT NULL, `MessageId` int(11) NOT NULL,
`MessageType` tinyint(4) NOT NULL `MessageType` tinyint(4) NOT NULL,
`ProcessId` varchar(50) DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8; ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
CREATE TABLE IF NOT EXISTS `{prefix}.received` ( CREATE TABLE IF NOT EXISTS `{prefix}.received` (
......
using System; using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.Data;
using System.Threading.Tasks; using System.Threading.Tasks;
using Dapper; using Dapper;
using DotNetCore.CAP.Infrastructure; using DotNetCore.CAP.Infrastructure;
...@@ -43,15 +42,11 @@ namespace DotNetCore.CAP.MySql ...@@ -43,15 +42,11 @@ namespace DotNetCore.CAP.MySql
public Task<IFetchedMessage> FetchNextMessageAsync() public Task<IFetchedMessage> FetchNextMessageAsync()
{ {
var sql = $@" var sql = $@"
SELECT `MessageId`,`MessageType` FROM `{_prefix}.queue` LIMIT 1 FOR UPDATE; UPDATE `{_prefix}.queue` SET `ProcessId`=@ProcessId WHERE `ProcessId` IS NULL LIMIT 1;
DELETE FROM `{_prefix}.queue` LIMIT 1;"; SELECT `MessageId`,`MessageType` FROM `{_prefix}.queue` WHERE `ProcessId`=@ProcessId;
DELETE FROM `{_prefix}.queue` WHERE `ProcessId`=@ProcessId";
// The following `sql` can improve performance, but repeated consumption occurs in multiple instances return FetchNextMessageCoreAsync(sql, new { ProcessId = Guid.NewGuid().ToString() });
//var sql = $@"
//SELECT @MId:=`MessageId` as MessageId, @MType:=`MessageType` as MessageType FROM `{_prefix}.queue` LIMIT 1;
//DELETE FROM `{_prefix}.queue` where `MessageId` = @MId AND `MessageType`=@MType;";
return FetchNextMessageCoreAsync(sql);
} }
public async Task<CapPublishedMessage> GetNextPublishedMessageToBeEnqueuedAsync() public async Task<CapPublishedMessage> GetNextPublishedMessageToBeEnqueuedAsync()
...@@ -122,11 +117,6 @@ SELECT * FROM `{_prefix}.received` WHERE Id=LAST_INSERT_ID();"; ...@@ -122,11 +117,6 @@ SELECT * FROM `{_prefix}.received` WHERE Id=LAST_INSERT_ID();";
} }
} }
public void Dispose()
{
}
public bool ChangePublishedState(int messageId, string state) public bool ChangePublishedState(int messageId, string state)
{ {
var sql = var sql =
...@@ -151,44 +141,20 @@ SELECT * FROM `{_prefix}.received` WHERE Id=LAST_INSERT_ID();"; ...@@ -151,44 +141,20 @@ SELECT * FROM `{_prefix}.received` WHERE Id=LAST_INSERT_ID();";
private async Task<IFetchedMessage> FetchNextMessageCoreAsync(string sql, object args = null) private async Task<IFetchedMessage> FetchNextMessageCoreAsync(string sql, object args = null)
{ {
//here don't use `using` to dispose FetchedMessage fetchedMessage;
var connection = new MySqlConnection(Options.ConnectionString); using (var connection = new MySqlConnection(Options.ConnectionString))
await connection.OpenAsync();
var transaction = connection.BeginTransaction(IsolationLevel.ReadCommitted);
FetchedMessage fetchedMessage = null;
try
{
//fetchedMessage = await connection.QuerySingleOrDefaultAsync<FetchedMessage>(sql, args, transaction);
// An anomaly with unknown causes, sometimes QuerySingleOrDefaultAsync can't return expected result.
using (var reader = connection.ExecuteReader(sql, args, transaction))
{
while (reader.Read())
{
fetchedMessage = new FetchedMessage
{
MessageId = (int)reader.GetInt64(0),
MessageType = (MessageType)reader.GetInt64(1)
};
}
}
}
catch (MySqlException)
{ {
transaction.Dispose(); fetchedMessage = await connection.QuerySingleOrDefaultAsync<FetchedMessage>(sql, args);
connection.Dispose();
throw;
} }
if (fetchedMessage == null) if (fetchedMessage == null)
{
transaction.Rollback();
transaction.Dispose();
connection.Dispose();
return null; return null;
return new MySqlFetchedMessage(fetchedMessage.MessageId, fetchedMessage.MessageType, Options.ConnectionString);
} }
return new MySqlFetchedMessage(fetchedMessage.MessageId, fetchedMessage.MessageType, connection, public void Dispose()
transaction); {
} }
} }
} }
\ No newline at end of file
...@@ -46,7 +46,7 @@ namespace DotNetCore.CAP.MySql ...@@ -46,7 +46,7 @@ namespace DotNetCore.CAP.MySql
{ {
if (message == null) throw new ArgumentNullException(nameof(message)); if (message == null) throw new ArgumentNullException(nameof(message));
var sql = $"INSERT INTO `{_prefix}.queue` values(@MessageId,@MessageType);"; var sql = $"INSERT INTO `{_prefix}.queue`(`MessageId`,`MessageType`) values(@MessageId,@MessageType);";
_dbConnection.Execute(sql, new CapQueue {MessageId = message.Id, MessageType = MessageType.Publish}, _dbConnection.Execute(sql, new CapQueue {MessageId = message.Id, MessageType = MessageType.Publish},
_dbTransaction); _dbTransaction);
} }
...@@ -55,7 +55,7 @@ namespace DotNetCore.CAP.MySql ...@@ -55,7 +55,7 @@ namespace DotNetCore.CAP.MySql
{ {
if (message == null) throw new ArgumentNullException(nameof(message)); if (message == null) throw new ArgumentNullException(nameof(message));
var sql = $"INSERT INTO `{_prefix}.queue` values(@MessageId,@MessageType);"; var sql = $"INSERT INTO `{_prefix}.queue`(`MessageId`,`MessageType`) values(@MessageId,@MessageType);";
_dbConnection.Execute(sql, new CapQueue {MessageId = message.Id, MessageType = MessageType.Subscribe}, _dbConnection.Execute(sql, new CapQueue {MessageId = message.Id, MessageType = MessageType.Subscribe},
_dbTransaction); _dbTransaction);
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment