Commit d5314e1a authored by Savorboard's avatar Savorboard

bug fixed

parent ce06c23c
using System; using System;
using System.Collections.Generic;
using System.Data; using System.Data;
using System.Threading.Tasks; using System.Threading.Tasks;
using Dapper; using Dapper;
...@@ -36,7 +35,7 @@ namespace Sample.RabbitMQ.MySql.Controllers ...@@ -36,7 +35,7 @@ namespace Sample.RabbitMQ.MySql.Controllers
{ {
using (var connection = new MySqlConnection(AppDbContext.ConnectionString)) using (var connection = new MySqlConnection(AppDbContext.ConnectionString))
{ {
using (var transaction = connection.BeginTransaction(_capBus, autoCommit: false)) using (var transaction = connection.BeginTransaction(_capBus, true))
{ {
//your business code //your business code
connection.Execute("insert into test(name) values('test')", transaction: (IDbTransaction)transaction.DbTransaction); connection.Execute("insert into test(name) values('test')", transaction: (IDbTransaction)transaction.DbTransaction);
...@@ -45,8 +44,6 @@ namespace Sample.RabbitMQ.MySql.Controllers ...@@ -45,8 +44,6 @@ namespace Sample.RabbitMQ.MySql.Controllers
//{ //{
_capBus.Publish("sample.rabbitmq.mysql", DateTime.Now); _capBus.Publish("sample.rabbitmq.mysql", DateTime.Now);
//} //}
transaction.Commit();
} }
} }
...@@ -60,7 +57,7 @@ namespace Sample.RabbitMQ.MySql.Controllers ...@@ -60,7 +57,7 @@ namespace Sample.RabbitMQ.MySql.Controllers
{ {
dbContext.Persons.Add(new Person() { Name = "ef.transaction" }); dbContext.Persons.Add(new Person() { Name = "ef.transaction" });
for (int i = 0; i < 5; i++) for (int i = 0; i < 1; i++)
{ {
_capBus.Publish("sample.rabbitmq.mysql", DateTime.Now); _capBus.Publish("sample.rabbitmq.mysql", DateTime.Now);
} }
...@@ -74,7 +71,7 @@ namespace Sample.RabbitMQ.MySql.Controllers ...@@ -74,7 +71,7 @@ namespace Sample.RabbitMQ.MySql.Controllers
[NonAction] [NonAction]
[CapSubscribe("sample.rabbitmq.mysql")] [CapSubscribe("sample.rabbitmq.mysql")]
public void Subscriber(Person2 p) public void Subscriber(DateTime p)
{ {
Console.WriteLine($@"{DateTime.Now} Subscriber invoked, Info: {p}"); Console.WriteLine($@"{DateTime.Now} Subscriber invoked, Info: {p}");
} }
......
...@@ -5,7 +5,7 @@ ...@@ -5,7 +5,7 @@
</PropertyGroup> </PropertyGroup>
<ItemGroup> <ItemGroup>
<PackageReference Include="Pomelo.EntityFrameworkCore.MySql" Version="2.2.6" /> <PackageReference Include="Pomelo.EntityFrameworkCore.MySql" Version="3.0.0-rc1.final" />
</ItemGroup> </ItemGroup>
<ItemGroup> <ItemGroup>
<ProjectReference Include="..\..\src\DotNetCore.CAP.MySql\DotNetCore.CAP.MySql.csproj" /> <ProjectReference Include="..\..\src\DotNetCore.CAP.MySql\DotNetCore.CAP.MySql.csproj" />
......
...@@ -34,8 +34,8 @@ namespace DotNetCore.CAP.MySql ...@@ -34,8 +34,8 @@ namespace DotNetCore.CAP.MySql
await connection.ExecuteAsync(sql, new await connection.ExecuteAsync(sql, new
{ {
Id = message.DbId, Id = message.DbId,
Retries = message.Retries, message.Retries,
ExpiresAt = message.ExpiresAt, message.ExpiresAt,
StatusName = state.ToString("G") StatusName = state.ToString("G")
}); });
} }
...@@ -50,8 +50,8 @@ namespace DotNetCore.CAP.MySql ...@@ -50,8 +50,8 @@ namespace DotNetCore.CAP.MySql
await connection.ExecuteAsync(sql, new await connection.ExecuteAsync(sql, new
{ {
Id = message.DbId, Id = message.DbId,
Retries = message.Retries, message.Retries,
ExpiresAt = message.ExpiresAt, message.ExpiresAt,
StatusName = state.ToString("G") StatusName = state.ToString("G")
}); });
} }
...@@ -61,7 +61,7 @@ namespace DotNetCore.CAP.MySql ...@@ -61,7 +61,7 @@ namespace DotNetCore.CAP.MySql
{ {
var sql = $"INSERT INTO `{_options.Value.TableNamePrefix}.published`(`Id`,`Version`,`Name`,`Content`,`Retries`,`Added`,`ExpiresAt`,`StatusName`) VALUES(@Id,'{_options.Value.Version}',@Name,@Content,@Retries,@Added,@ExpiresAt,@StatusName);"; var sql = $"INSERT INTO `{_options.Value.TableNamePrefix}.published`(`Id`,`Version`,`Name`,`Content`,`Retries`,`Added`,`ExpiresAt`,`StatusName`) VALUES(@Id,'{_options.Value.Version}',@Name,@Content,@Retries,@Added,@ExpiresAt,@StatusName);";
var message = new MediumMessage() var message = new MediumMessage
{ {
DbId = content.GetId(), DbId = content.GetId(),
Origin = content, Origin = content,
...@@ -75,9 +75,9 @@ namespace DotNetCore.CAP.MySql ...@@ -75,9 +75,9 @@ namespace DotNetCore.CAP.MySql
Id = message.DbId, Id = message.DbId,
Name = name, Name = name,
Content = StringSerializer.Serialize(message.Origin), Content = StringSerializer.Serialize(message.Origin),
Retries = message.Retries, message.Retries,
Added = message.Added, message.Added,
ExpiresAt = message.ExpiresAt, message.ExpiresAt,
StatusName = StatusName.Scheduled StatusName = StatusName.Scheduled
}; };
...@@ -103,37 +103,56 @@ namespace DotNetCore.CAP.MySql ...@@ -103,37 +103,56 @@ namespace DotNetCore.CAP.MySql
return message; return message;
} }
public Task<MediumMessage> StoreMessageAsync(string name, string group, Message content, CancellationToken cancellationToken = default) public async Task StoreReceivedExceptionMessageAsync(string name, string group, string content)
{ {
var sql = $@"INSERT INTO `{_options.Value.TableNamePrefix}.received`(`Id`,`Version`,`Name`,`Group`,`Content`,`Retries`,`Added`,`ExpiresAt`,`StatusName`) VALUES(@Id,'{_options.Value.Version}',@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName);"; var sql = $@"INSERT INTO `{_options.Value.TableNamePrefix}.received`(`Id`,`Version`,`Name`,`Group`,`Content`,`Retries`,`Added`,`ExpiresAt`,`StatusName`) VALUES(@Id,'{_options.Value.Version}',@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName);";
var message = new MediumMessage() using (var connection = new MySqlConnection(_options.Value.ConnectionString))
{
await connection.ExecuteAsync(sql, new
{
Id = SnowflakeId.Default().NextId().ToString(),
Group = group,
Name = name,
Content = content,
Retries = _capOptions.Value.FailedRetryCount,
Added = DateTime.Now,
ExpiresAt = DateTime.Now.AddDays(15),
StatusName = nameof(StatusName.Failed)
});
}
}
public Task<MediumMessage> StoreReceivedMessageAsync(string name, string group, Message message)
{
var sql = $@"INSERT INTO `{_options.Value.TableNamePrefix}.received`(`Id`,`Version`,`Name`,`Group`,`Content`,`Retries`,`Added`,`ExpiresAt`,`StatusName`) VALUES(@Id,'{_options.Value.Version}',@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName);";
var mdMessage = new MediumMessage
{ {
DbId = SnowflakeId.Default().NextId().ToString(), DbId = SnowflakeId.Default().NextId().ToString(),
Origin = content, Origin = message,
Added = DateTime.Now, Added = DateTime.Now,
ExpiresAt = null, ExpiresAt = null,
Retries = 0 Retries = 0
}; };
var content = StringSerializer.Serialize(mdMessage.Origin);
var po = new
{
Id = message.DbId,
Group = group,
Name = name,
Content = StringSerializer.Serialize(message.Origin),
Retries = message.Retries,
Added = message.Added,
ExpiresAt = message.ExpiresAt,
StatusName = nameof(StatusName.Scheduled)
};
using (var connection = new MySqlConnection(_options.Value.ConnectionString)) using (var connection = new MySqlConnection(_options.Value.ConnectionString))
{ {
connection.Execute(sql, po);
connection.Execute(sql, new
{
Id = mdMessage.DbId,
Group = group,
Name = name,
Content = content,
mdMessage.Retries,
mdMessage.Added,
mdMessage.ExpiresAt,
StatusName = nameof(StatusName.Scheduled)
});
} }
return Task.FromResult(message); return Task.FromResult(mdMessage);
} }
public async Task<int> DeleteExpiresAsync(string table, DateTime timeout, int batchCount = 1000, CancellationToken token = default) public async Task<int> DeleteExpiresAsync(string table, DateTime timeout, int batchCount = 1000, CancellationToken token = default)
...@@ -157,7 +176,7 @@ namespace DotNetCore.CAP.MySql ...@@ -157,7 +176,7 @@ namespace DotNetCore.CAP.MySql
var reader = await connection.ExecuteReaderAsync(sql); var reader = await connection.ExecuteReaderAsync(sql);
while (reader.Read()) while (reader.Read())
{ {
result.Add(new MediumMessage() result.Add(new MediumMessage
{ {
DbId = reader.GetInt64(0).ToString(), DbId = reader.GetInt64(0).ToString(),
Origin = StringSerializer.DeSerialize(reader.GetString(3)), Origin = StringSerializer.DeSerialize(reader.GetString(3)),
...@@ -181,7 +200,7 @@ namespace DotNetCore.CAP.MySql ...@@ -181,7 +200,7 @@ namespace DotNetCore.CAP.MySql
var reader = await connection.ExecuteReaderAsync(sql); var reader = await connection.ExecuteReaderAsync(sql);
while (reader.Read()) while (reader.Read())
{ {
result.Add(new MediumMessage() result.Add(new MediumMessage
{ {
DbId = reader.GetInt64(0).ToString(), DbId = reader.GetInt64(0).ToString(),
Origin = StringSerializer.DeSerialize(reader.GetString(3)), Origin = StringSerializer.DeSerialize(reader.GetString(3)),
......
...@@ -57,7 +57,7 @@ namespace DotNetCore.CAP.Abstractions ...@@ -57,7 +57,7 @@ namespace DotNetCore.CAP.Abstractions
optionHeaders.Add(Headers.CorrelationSequence, 0.ToString()); optionHeaders.Add(Headers.CorrelationSequence, 0.ToString());
} }
optionHeaders.Add(Headers.MessageName, name); optionHeaders.Add(Headers.MessageName, name);
optionHeaders.Add(Headers.Type, typeof(T).AssemblyQualifiedName); optionHeaders.Add(Headers.Type, typeof(T).FullName);
optionHeaders.Add(Headers.SentTime, DateTimeOffset.Now.ToString()); optionHeaders.Add(Headers.SentTime, DateTimeOffset.Now.ToString());
var message = new Message(optionHeaders, value); var message = new Message(optionHeaders, value);
...@@ -79,7 +79,7 @@ namespace DotNetCore.CAP.Abstractions ...@@ -79,7 +79,7 @@ namespace DotNetCore.CAP.Abstractions
{ {
var transaction = (CapTransactionBase)Transaction.Value; var transaction = (CapTransactionBase)Transaction.Value;
var mediumMessage = await _storage.StoreMessageAsync(name, message, transaction, cancellationToken); var mediumMessage = await _storage.StoreMessageAsync(name, message, transaction.DbTransaction, cancellationToken);
s_diagnosticListener.WritePublishMessageStoreAfter(operationId, message); s_diagnosticListener.WritePublishMessageStoreAfter(operationId, message);
......
...@@ -4,6 +4,8 @@ ...@@ -4,6 +4,8 @@
using System; using System;
using System.Diagnostics; using System.Diagnostics;
using System.Linq; using System.Linq;
using System.Text;
using System.Text.Json;
using System.Threading; using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
using DotNetCore.CAP.Diagnostics; using DotNetCore.CAP.Diagnostics;
...@@ -170,18 +172,44 @@ namespace DotNetCore.CAP ...@@ -170,18 +172,44 @@ namespace DotNetCore.CAP
var type = executor.Parameters.FirstOrDefault(x => x.IsFromCap == false)?.ParameterType; var type = executor.Parameters.FirstOrDefault(x => x.IsFromCap == false)?.ParameterType;
var message = await _serializer.DeserializeAsync(transportMessage, type); Message message;
try
{
message = await _serializer.DeserializeAsync(transportMessage, type);
}
catch (Exception e)
{
transportMessage.Headers.Add(Headers.Exception, e.Message);
var dataUri = $"data:{transportMessage.Headers[Headers.Type]};base64," + Convert.ToBase64String(transportMessage.Body);
message = new Message(transportMessage.Headers, dataUri);
}
var mediumMessage = await _storage.StoreMessageAsync(name, group, message); if (message.HasException())
{
var content = StringSerializer.Serialize(message);
await _storage.StoreReceivedExceptionMessageAsync(name, group, content);
client.Commit(); client.Commit();
if (operationId != null) if (operationId != null)
{ {
TracingAfter(operationId.Value, message, startTime, stopwatch.Elapsed); TracingAfter(operationId.Value, message, startTime, stopwatch.Elapsed);
}
} }
else
{
var mediumMessage = await _storage.StoreReceivedMessageAsync(name, group, message);
mediumMessage.Origin = message;
_dispatcher.EnqueueToExecute(mediumMessage, executor); client.Commit();
if (operationId != null)
{
TracingAfter(operationId.Value, message, startTime, stopwatch.Elapsed);
}
_dispatcher.EnqueueToExecute(mediumMessage, executor);
}
} }
catch (Exception e) catch (Exception e)
{ {
......
...@@ -23,5 +23,7 @@ ...@@ -23,5 +23,7 @@
public const string Group = "cap-msg-group"; public const string Group = "cap-msg-group";
public const string SentTime = "cap-senttime"; public const string SentTime = "cap-senttime";
public const string Exception = "cap-exception";
} }
} }
...@@ -51,6 +51,11 @@ namespace DotNetCore.CAP.Messages ...@@ -51,6 +51,11 @@ namespace DotNetCore.CAP.Messages
return 0; return 0;
} }
public static bool HasException(this Message message)
{
return message.Headers.ContainsKey(Headers.Exception);
}
} }
} }
\ No newline at end of file
...@@ -15,7 +15,9 @@ namespace DotNetCore.CAP.Persistence ...@@ -15,7 +15,9 @@ namespace DotNetCore.CAP.Persistence
Task<MediumMessage> StoreMessageAsync(string name, Message content, object dbTransaction = null, CancellationToken cancellationToken = default); Task<MediumMessage> StoreMessageAsync(string name, Message content, object dbTransaction = null, CancellationToken cancellationToken = default);
Task<MediumMessage> StoreMessageAsync(string name, string group, Message content, CancellationToken cancellationToken = default); Task StoreReceivedExceptionMessageAsync(string name, string group, string content);
Task<MediumMessage> StoreReceivedMessageAsync(string name, string group, Message content);
Task<int> DeleteExpiresAsync(string table, DateTime timeout, int batchCount = 1000, CancellationToken token = default); Task<int> DeleteExpiresAsync(string table, DateTime timeout, int batchCount = 1000, CancellationToken token = default);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment