Commit c730b18f authored by Savorboard's avatar Savorboard

solve database dirty reading problems. now, only support PostgreSQL v9.5+

parent 3dec4f15
...@@ -37,13 +37,13 @@ namespace DotNetCore.CAP.PostgreSql ...@@ -37,13 +37,13 @@ namespace DotNetCore.CAP.PostgreSql
public Task<IFetchedMessage> FetchNextMessageAsync() public Task<IFetchedMessage> FetchNextMessageAsync()
{ {
var sql = $@"DELETE FROM ""{_options.Schema}"".""queue"" WHERE ""MessageId"" = (SELECT ""MessageId"" FROM ""{_options.Schema}"".""queue"" LIMIT 1) RETURNING *;"; var sql = $@"DELETE FROM ""{_options.Schema}"".""queue"" WHERE ""MessageId"" = (SELECT ""MessageId"" FROM ""{_options.Schema}"".""queue"" FOR UPDATE SKIP LOCKED LIMIT 1) RETURNING *;";
return FetchNextMessageCoreAsync(sql); return FetchNextMessageCoreAsync(sql);
} }
public async Task<CapPublishedMessage> GetNextPublishedMessageToBeEnqueuedAsync() public async Task<CapPublishedMessage> GetNextPublishedMessageToBeEnqueuedAsync()
{ {
var sql = $"SELECT * FROM \"{_options.Schema}\".\"published\" WHERE \"StatusName\" = '{StatusName.Scheduled}' LIMIT 1;"; var sql = $"SELECT * FROM \"{_options.Schema}\".\"published\" WHERE \"StatusName\" = '{StatusName.Scheduled}' FOR UPDATE SKIP LOCKED LIMIT 1;";
using (var connection = new NpgsqlConnection(_options.ConnectionString)) using (var connection = new NpgsqlConnection(_options.ConnectionString))
{ {
...@@ -53,7 +53,7 @@ namespace DotNetCore.CAP.PostgreSql ...@@ -53,7 +53,7 @@ namespace DotNetCore.CAP.PostgreSql
public async Task<IEnumerable<CapPublishedMessage>> GetFailedPublishedMessages() public async Task<IEnumerable<CapPublishedMessage>> GetFailedPublishedMessages()
{ {
var sql = $"SELECT * FROM \"{_options.Schema}\".\"published\" WHERE \"StatusName\"='{StatusName.Failed}'"; var sql = $"SELECT * FROM \"{_options.Schema}\".\"published\" WHERE \"StatusName\"='{StatusName.Failed}' LIMIT 1000;";
using (var connection = new NpgsqlConnection(_options.ConnectionString)) using (var connection = new NpgsqlConnection(_options.ConnectionString))
{ {
...@@ -86,7 +86,7 @@ namespace DotNetCore.CAP.PostgreSql ...@@ -86,7 +86,7 @@ namespace DotNetCore.CAP.PostgreSql
public async Task<CapReceivedMessage> GetNextReceviedMessageToBeEnqueuedAsync() public async Task<CapReceivedMessage> GetNextReceviedMessageToBeEnqueuedAsync()
{ {
var sql = $"SELECT * FROM \"{_options.Schema}\".\"received\" WHERE \"StatusName\" = '{StatusName.Scheduled}' LIMIT 1;"; var sql = $"SELECT * FROM \"{_options.Schema}\".\"received\" WHERE \"StatusName\" = '{StatusName.Scheduled}' FOR UPDATE SKIP LOCKED LIMIT 1;";
using (var connection = new NpgsqlConnection(_options.ConnectionString)) using (var connection = new NpgsqlConnection(_options.ConnectionString))
{ {
return await connection.QueryFirstOrDefaultAsync<CapReceivedMessage>(sql); return await connection.QueryFirstOrDefaultAsync<CapReceivedMessage>(sql);
...@@ -95,7 +95,7 @@ namespace DotNetCore.CAP.PostgreSql ...@@ -95,7 +95,7 @@ namespace DotNetCore.CAP.PostgreSql
public async Task<IEnumerable<CapReceivedMessage>> GetFailedReceviedMessages() public async Task<IEnumerable<CapReceivedMessage>> GetFailedReceviedMessages()
{ {
var sql = $"SELECT * FROM \"{_options.Schema}\".\"received\" WHERE \"StatusName\"='{StatusName.Failed}'"; var sql = $"SELECT * FROM \"{_options.Schema}\".\"received\" WHERE \"StatusName\"='{StatusName.Failed}' LIMIT 1000;";
using (var connection = new NpgsqlConnection(_options.ConnectionString)) using (var connection = new NpgsqlConnection(_options.ConnectionString))
{ {
return await connection.QueryAsync<CapReceivedMessage>(sql); return await connection.QueryAsync<CapReceivedMessage>(sql);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment