Commit 0f1da2e7 authored by Savorboard's avatar Savorboard

Change persistence message to synchronous method because message queues do not...

Change persistence message to synchronous method because message queues do not support asynchronous commits
parent 838e20d3
......@@ -40,8 +40,7 @@ namespace DotNetCore.CAP.InMemoryStorage
return Task.CompletedTask;
}
public Task<MediumMessage> StoreMessageAsync(string name, Message content, object dbTransaction = null,
CancellationToken cancellationToken = default)
public MediumMessage StoreMessage(string name, Message content, object dbTransaction = null)
{
var message = new MediumMessage
{
......@@ -64,10 +63,10 @@ namespace DotNetCore.CAP.InMemoryStorage
StatusName = StatusName.Scheduled
});
return Task.FromResult(message);
return message;
}
public Task StoreReceivedExceptionMessageAsync(string name, string group, string content)
public void StoreReceivedExceptionMessage(string name, string group, string content)
{
ReceivedMessages.Add(new MemoryMessage
{
......@@ -80,11 +79,9 @@ namespace DotNetCore.CAP.InMemoryStorage
ExpiresAt = DateTime.Now.AddDays(15),
StatusName = StatusName.Failed
});
return Task.CompletedTask;
}
public Task<MediumMessage> StoreReceivedMessageAsync(string name, string @group, Message message)
public MediumMessage StoreReceivedMessage(string name, string @group, Message message)
{
var mdMessage = new MediumMessage
{
......@@ -107,7 +104,7 @@ namespace DotNetCore.CAP.InMemoryStorage
StatusName = StatusName.Failed
});
return Task.FromResult(mdMessage);
return mdMessage;
}
public Task<int> DeleteExpiresAsync(string table, DateTime timeout, int batchCount = 1000, CancellationToken token = default)
......
......@@ -22,7 +22,6 @@ namespace DotNetCore.CAP.MongoDB
private readonly IOptions<CapOptions> _capOptions;
private readonly IMongoClient _client;
private readonly IMongoDatabase _database;
private readonly ILogger<MongoDBDataStorage> _logger;
private readonly IOptions<MongoDBOptions> _options;
public MongoDBDataStorage(
......@@ -34,7 +33,6 @@ namespace DotNetCore.CAP.MongoDB
_capOptions = capOptions;
_options = options;
_client = client;
_logger = logger;
_database = _client.GetDatabase(_options.Value.DatabaseName);
}
......@@ -62,10 +60,9 @@ namespace DotNetCore.CAP.MongoDB
await collection.UpdateOneAsync(x => x.Id == long.Parse(message.DbId), updateDef);
}
public async Task<MediumMessage> StoreMessageAsync(string name, Message content, object dbTransaction = null,
CancellationToken cancellationToken = default)
public MediumMessage StoreMessage(string name, Message content, object dbTransaction = null)
{
var insertOptions = new InsertOneOptions {BypassDocumentValidation = false};
var insertOptions = new InsertOneOptions { BypassDocumentValidation = false };
var message = new MediumMessage
{
......@@ -93,18 +90,18 @@ namespace DotNetCore.CAP.MongoDB
if (dbTransaction == null)
{
await collection.InsertOneAsync(store, insertOptions, cancellationToken);
collection.InsertOne(store, insertOptions);
}
else
{
var dbTrans = dbTransaction as IClientSessionHandle;
await collection.InsertOneAsync(dbTrans, store, insertOptions, cancellationToken);
collection.InsertOne(dbTrans, store, insertOptions);
}
return message;
}
public async Task StoreReceivedExceptionMessageAsync(string name, string group, string content)
public void StoreReceivedExceptionMessage(string name, string group, string content)
{
var collection = _database.GetCollection<ReceivedMessage>(_options.Value.ReceivedCollection);
......@@ -121,10 +118,10 @@ namespace DotNetCore.CAP.MongoDB
StatusName = nameof(StatusName.Failed)
};
await collection.InsertOneAsync(store);
collection.InsertOne(store);
}
public async Task<MediumMessage> StoreReceivedMessageAsync(string name, string group, Message message)
public MediumMessage StoreReceivedMessage(string name, string group, Message message)
{
var mdMessage = new MediumMessage
{
......@@ -151,7 +148,7 @@ namespace DotNetCore.CAP.MongoDB
StatusName = nameof(StatusName.Scheduled)
};
await collection.InsertOneAsync(store);
collection.InsertOne(store);
return mdMessage;
}
......@@ -161,18 +158,15 @@ namespace DotNetCore.CAP.MongoDB
{
if (collection == _options.Value.PublishedCollection)
{
//Builders<PublishedMessage>.Filter.Lt(x => x.ExpiresAt, timeout);
var publishedCollection = _database.GetCollection<PublishedMessage>(_options.Value.PublishedCollection);
var ret = await publishedCollection.DeleteManyAsync(x => x.ExpiresAt < timeout, cancellationToken);
return (int) ret.DeletedCount;
return (int)ret.DeletedCount;
}
else
{
var receivedCollection = _database.GetCollection<ReceivedMessage>(_options.Value.ReceivedCollection);
var ret = await receivedCollection.DeleteManyAsync(x => x.ExpiresAt < timeout, cancellationToken);
;
return (int) ret.DeletedCount;
return (int)ret.DeletedCount;
}
}
......
......@@ -64,7 +64,7 @@ namespace DotNetCore.CAP.MySql
});
}
public async Task<MediumMessage> StoreMessageAsync(string name, Message content, object dbTransaction = null, CancellationToken cancellationToken = default)
public MediumMessage StoreMessage(string name, Message content, object dbTransaction = null)
{
var sql = $"INSERT INTO `{_initializer.GetPublishedTableName()}`(`Id`,`Version`,`Name`,`Content`,`Retries`,`Added`,`ExpiresAt`,`StatusName`) VALUES(@Id,'{_options.Value.Version}',@Name,@Content,@Retries,@Added,@ExpiresAt,@StatusName);";
......@@ -91,8 +91,8 @@ namespace DotNetCore.CAP.MySql
if (dbTransaction == null)
{
await using var connection = new MySqlConnection(_options.Value.ConnectionString);
await connection.ExecuteAsync(sql, po);
using var connection = new MySqlConnection(_options.Value.ConnectionString);
connection.Execute(sql, po);
}
else
{
......@@ -103,18 +103,18 @@ namespace DotNetCore.CAP.MySql
}
var conn = dbTrans?.Connection;
await conn.ExecuteAsync(sql, po, dbTrans);
conn.Execute(sql, po, dbTrans);
}
return message;
}
public async Task StoreReceivedExceptionMessageAsync(string name, string group, string content)
public void StoreReceivedExceptionMessage(string name, string group, string content)
{
var sql = $@"INSERT INTO `{_initializer.GetReceivedTableName()}`(`Id`,`Version`,`Name`,`Group`,`Content`,`Retries`,`Added`,`ExpiresAt`,`StatusName`) VALUES(@Id,'{_options.Value.Version}',@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName);";
await using var connection = new MySqlConnection(_options.Value.ConnectionString);
await connection.ExecuteAsync(sql, new
using var connection = new MySqlConnection(_options.Value.ConnectionString);
connection.Execute(sql, new
{
Id = SnowflakeId.Default().NextId().ToString(),
Group = @group,
......@@ -127,7 +127,7 @@ namespace DotNetCore.CAP.MySql
});
}
public async Task<MediumMessage> StoreReceivedMessageAsync(string name, string group, Message message)
public MediumMessage StoreReceivedMessage(string name, string group, Message message)
{
var sql = $@"INSERT INTO `{_initializer.GetReceivedTableName()}`(`Id`,`Version`,`Name`,`Group`,`Content`,`Retries`,`Added`,`ExpiresAt`,`StatusName`) VALUES(@Id,'{_options.Value.Version}',@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName);";
......@@ -140,8 +140,8 @@ namespace DotNetCore.CAP.MySql
Retries = 0
};
var content = StringSerializer.Serialize(mdMessage.Origin);
await using var connection = new MySqlConnection(_options.Value.ConnectionString);
await connection.ExecuteAsync(sql, new
using var connection = new MySqlConnection(_options.Value.ConnectionString);
connection.Execute(sql, new
{
Id = mdMessage.DbId,
Group = @group,
......
......@@ -66,8 +66,7 @@ namespace DotNetCore.CAP.PostgreSql
});
}
public async Task<MediumMessage> StoreMessageAsync(string name, Message content, object dbTransaction = null,
CancellationToken cancellationToken = default)
public MediumMessage StoreMessage(string name, Message content, object dbTransaction = null)
{
var sql =
$"INSERT INTO {_pubName} (\"Id\",\"Version\",\"Name\",\"Content\",\"Retries\",\"Added\",\"ExpiresAt\",\"StatusName\")" +
......@@ -96,8 +95,8 @@ namespace DotNetCore.CAP.PostgreSql
if (dbTransaction == null)
{
await using var connection = new NpgsqlConnection(_options.Value.ConnectionString);
await connection.ExecuteAsync(sql, po);
using var connection = new NpgsqlConnection(_options.Value.ConnectionString);
connection.Execute(sql, po);
}
else
{
......@@ -106,20 +105,20 @@ namespace DotNetCore.CAP.PostgreSql
dbTrans = dbContextTrans.GetDbTransaction();
var conn = dbTrans?.Connection;
await conn.ExecuteAsync(sql, po, dbTrans);
conn.Execute(sql, po, dbTrans);
}
return message;
}
public async Task StoreReceivedExceptionMessageAsync(string name, string group, string content)
public void StoreReceivedExceptionMessage(string name, string group, string content)
{
var sql =
$"INSERT INTO {_recName}(\"Id\",\"Version\",\"Name\",\"Group\",\"Content\",\"Retries\",\"Added\",\"ExpiresAt\",\"StatusName\")" +
$"VALUES(@Id,'{_capOptions.Value.Version}',@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName) RETURNING \"Id\";";
await using var connection = new NpgsqlConnection(_options.Value.ConnectionString);
await connection.ExecuteAsync(sql, new
using var connection = new NpgsqlConnection(_options.Value.ConnectionString);
connection.Execute(sql, new
{
Id = SnowflakeId.Default().NextId(),
Group = group,
......@@ -132,7 +131,7 @@ namespace DotNetCore.CAP.PostgreSql
});
}
public async Task<MediumMessage> StoreReceivedMessageAsync(string name, string group, Message message)
public MediumMessage StoreReceivedMessage(string name, string group, Message message)
{
var sql =
$"INSERT INTO {_recName}(\"Id\",\"Version\",\"Name\",\"Group\",\"Content\",\"Retries\",\"Added\",\"ExpiresAt\",\"StatusName\")" +
......@@ -147,8 +146,8 @@ namespace DotNetCore.CAP.PostgreSql
Retries = 0
};
var content = StringSerializer.Serialize(mdMessage.Origin);
await using var connection = new NpgsqlConnection(_options.Value.ConnectionString);
await connection.ExecuteAsync(sql, new
using var connection = new NpgsqlConnection(_options.Value.ConnectionString);
connection.Execute(sql, new
{
Id = long.Parse(mdMessage.DbId),
Group = group,
......
......@@ -66,8 +66,7 @@ namespace DotNetCore.CAP.SqlServer
});
}
public async Task<MediumMessage> StoreMessageAsync(string name, Message content, object dbTransaction = null,
CancellationToken cancellationToken = default)
public MediumMessage StoreMessage(string name, Message content, object dbTransaction = null)
{
var sql = $"INSERT INTO {_pubName} ([Id],[Version],[Name],[Content],[Retries],[Added],[ExpiresAt],[StatusName])" +
$"VALUES(@Id,'{_options.Value.Version}',@Name,@Content,@Retries,@Added,@ExpiresAt,@StatusName);";
......@@ -95,8 +94,8 @@ namespace DotNetCore.CAP.SqlServer
if (dbTransaction == null)
{
await using var connection = new SqlConnection(_options.Value.ConnectionString);
await connection.ExecuteAsync(sql, po);
using var connection = new SqlConnection(_options.Value.ConnectionString);
connection.Execute(sql, po);
}
else
{
......@@ -105,20 +104,20 @@ namespace DotNetCore.CAP.SqlServer
dbTrans = dbContextTrans.GetDbTransaction();
var conn = dbTrans?.Connection;
await conn.ExecuteAsync(sql, po, dbTrans);
conn.Execute(sql, po, dbTrans);
}
return message;
}
public async Task StoreReceivedExceptionMessageAsync(string name, string group, string content)
public void StoreReceivedExceptionMessage(string name, string group, string content)
{
var sql =
$"INSERT INTO {_recName}([Id],[Version],[Name],[Group],[Content],[Retries],[Added],[ExpiresAt],[StatusName])" +
$"VALUES(@Id,'{_capOptions.Value.Version}',@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName);";
await using var connection = new SqlConnection(_options.Value.ConnectionString);
await connection.ExecuteAsync(sql, new
using var connection = new SqlConnection(_options.Value.ConnectionString);
connection.Execute(sql, new
{
Id = SnowflakeId.Default().NextId().ToString(),
Group = group,
......@@ -131,7 +130,7 @@ namespace DotNetCore.CAP.SqlServer
});
}
public async Task<MediumMessage> StoreReceivedMessageAsync(string name, string group, Message message)
public MediumMessage StoreReceivedMessage(string name, string group, Message message)
{
var sql =
$"INSERT INTO {_recName}([Id],[Version],[Name],[Group],[Content],[Retries],[Added],[ExpiresAt],[StatusName])" +
......@@ -146,8 +145,8 @@ namespace DotNetCore.CAP.SqlServer
Retries = 0
};
var content = StringSerializer.Serialize(mdMessage.Origin);
await using var connection = new SqlConnection(_options.Value.ConnectionString);
await connection.ExecuteAsync(sql, new
using var connection = new SqlConnection(_options.Value.ConnectionString);
connection.Execute(sql, new
{
Id = mdMessage.DbId,
Group = group,
......
......@@ -35,7 +35,28 @@ namespace DotNetCore.CAP.Internal
public AsyncLocal<ICapTransaction> Transaction { get; }
public async Task PublishAsync<T>(string name, T value, IDictionary<string, string> headers, CancellationToken cancellationToken = default)
public Task PublishAsync<T>(string name, T value, IDictionary<string, string> headers, CancellationToken cancellationToken = default)
{
return Task.Run(() => Publish(name, value, headers), cancellationToken);
}
public Task PublishAsync<T>(string name, T value, string callbackName = null,
CancellationToken cancellationToken = default)
{
return Task.Run(() => Publish(name, value, callbackName), cancellationToken);
}
public void Publish<T>(string name, T value, string callbackName = null)
{
var header = new Dictionary<string, string>
{
{Headers.CallbackName, callbackName}
};
Publish(name, value, header);
}
public void Publish<T>(string name, T value, IDictionary<string, string> headers)
{
if (string.IsNullOrEmpty(name))
{
......@@ -67,7 +88,7 @@ namespace DotNetCore.CAP.Internal
if (Transaction.Value?.DbTransaction == null)
{
var mediumMessage = await _storage.StoreMessageAsync(name, message, cancellationToken: cancellationToken);
var mediumMessage = _storage.StoreMessage(name, message);
TracingAfter(tracingTimestamp, message);
......@@ -77,7 +98,7 @@ namespace DotNetCore.CAP.Internal
{
var transaction = (CapTransactionBase)Transaction.Value;
var mediumMessage = await _storage.StoreMessageAsync(name, message, transaction.DbTransaction, cancellationToken);
var mediumMessage = _storage.StoreMessage(name, message, transaction.DbTransaction);
TracingAfter(tracingTimestamp, message);
......@@ -97,27 +118,6 @@ namespace DotNetCore.CAP.Internal
}
}
public Task PublishAsync<T>(string name, T value, string callbackName = null,
CancellationToken cancellationToken = default)
{
var header = new Dictionary<string, string>
{
{Headers.CallbackName, callbackName}
};
return PublishAsync(name, value, header, cancellationToken);
}
public void Publish<T>(string name, T value, string callbackName = null)
{
PublishAsync(name, value, callbackName).GetAwaiter().GetResult();
}
public void Publish<T>(string name, T value, IDictionary<string, string> headers)
{
PublishAsync(name, value, headers).GetAwaiter().GetResult();
}
#region tracing
private long? TracingBefore(Message message)
......
......@@ -190,15 +190,16 @@ namespace DotNetCore.CAP.Internal
if (message.HasException())
{
var content = StringSerializer.Serialize(message);
await _storage.StoreReceivedExceptionMessageAsync(name, group, content);
_storage.StoreReceivedExceptionMessage(name, group, content);
client.Commit();
TracingAfter(tracingTimestamp, transportMessage, _serverAddress);
}
else
{
var mediumMessage = await _storage.StoreReceivedMessageAsync(name, group, message);
var mediumMessage = _storage.StoreReceivedMessage(name, group, message);
mediumMessage.Origin = message;
client.Commit();
......
......@@ -14,12 +14,11 @@ namespace DotNetCore.CAP.Persistence
Task ChangeReceiveStateAsync(MediumMessage message, StatusName state);
Task<MediumMessage> StoreMessageAsync(string name, Message content, object dbTransaction = null,
CancellationToken cancellationToken = default);
MediumMessage StoreMessage(string name, Message content, object dbTransaction = null);
Task StoreReceivedExceptionMessageAsync(string name, string group, string content);
void StoreReceivedExceptionMessage(string name, string group, string content);
Task<MediumMessage> StoreReceivedMessageAsync(string name, string group, Message content);
MediumMessage StoreReceivedMessage(string name, string group, Message content);
Task<int> DeleteExpiresAsync(string table, DateTime timeout, int batchCount = 1000,
CancellationToken token = default);
......
......@@ -68,7 +68,7 @@ namespace DotNetCore.CAP.Processor
var result = await _sender.SendAsync(message);
if (!result.Succeeded)
{
_logger.MessagePublishException(message.Origin.GetId(),result.ToString(),result.Exception);
_logger.MessagePublishException(message.Origin.GetId(), result.ToString(), result.Exception);
}
}
catch (Exception ex)
......
......@@ -22,7 +22,7 @@ namespace DotNetCore.CAP.MySql.Test
}
[Fact]
public async Task StorageMessageTest()
public void StorageMessageTest()
{
var msgId = SnowflakeId.Default().NextId().ToString();
var header = new Dictionary<string, string>()
......@@ -31,12 +31,12 @@ namespace DotNetCore.CAP.MySql.Test
};
var message = new Message(header, null);
var mdMessage = await _storage.StoreMessageAsync("test.name", message);
var mdMessage = _storage.StoreMessage("test.name", message);
Assert.NotNull(mdMessage);
}
[Fact]
public async Task StoreReceivedMessageTest()
public void StoreReceivedMessageTest()
{
var msgId = SnowflakeId.Default().NextId().ToString();
var header = new Dictionary<string, string>()
......@@ -45,14 +45,14 @@ namespace DotNetCore.CAP.MySql.Test
};
var message = new Message(header, null);
var mdMessage = await _storage.StoreReceivedMessageAsync("test.name", "test.group", message);
var mdMessage = _storage.StoreReceivedMessage("test.name", "test.group", message);
Assert.NotNull(mdMessage);
}
[Fact]
public async Task StoreReceivedExceptionMessageTest()
public void StoreReceivedExceptionMessageTest()
{
await _storage.StoreReceivedExceptionMessageAsync("test.name", "test.group", "");
_storage.StoreReceivedExceptionMessage("test.name", "test.group", "");
}
[Fact]
......@@ -65,7 +65,7 @@ namespace DotNetCore.CAP.MySql.Test
};
var message = new Message(header, null);
var mdMessage = await _storage.StoreMessageAsync("test.name", message);
var mdMessage = _storage.StoreMessage("test.name", message);
await _storage.ChangePublishStateAsync(mdMessage, StatusName.Succeeded);
}
......@@ -80,7 +80,7 @@ namespace DotNetCore.CAP.MySql.Test
};
var message = new Message(header, null);
var mdMessage = await _storage.StoreMessageAsync("test.name", message);
var mdMessage = _storage.StoreMessage("test.name", message);
await _storage.ChangeReceiveStateAsync(mdMessage, StatusName.Succeeded);
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment