Commit 6cae200e authored by yangxiaodong's avatar yangxiaodong

fix bug.

parent 0c902940
...@@ -25,12 +25,13 @@ namespace DotNetCore.CAP.SqlServer ...@@ -25,12 +25,13 @@ namespace DotNetCore.CAP.SqlServer
return new SqlServerStorageTransaction(this); return new SqlServerStorageTransaction(this);
} }
public Task<CapPublishedMessage> GetPublishedMessageAsync(int id) public async Task<CapPublishedMessage> GetPublishedMessageAsync(int id)
{ {
var sql = $@"SELECT * FROM [{_options.Schema}].[Published] WITH (readpast) WHERE Id={id}"; var sql = $@"SELECT * FROM [{_options.Schema}].[Published] WITH (readpast) WHERE Id={id}";
using (var connection = new SqlConnection(_options.ConnectionString)) using (var connection = new SqlConnection(_options.ConnectionString))
{ {
return connection.QueryFirstOrDefaultAsync<CapPublishedMessage>(sql); return await connection.QueryFirstOrDefaultAsync<CapPublishedMessage>(sql);
} }
} }
...@@ -56,7 +57,7 @@ OUTPUT DELETED.MessageId,DELETED.[MessageType];"; ...@@ -56,7 +57,7 @@ OUTPUT DELETED.MessageId,DELETED.[MessageType];";
// CapReceviedMessage // CapReceviedMessage
public Task StoreReceivedMessageAsync(CapReceivedMessage message) public async Task StoreReceivedMessageAsync(CapReceivedMessage message)
{ {
if (message == null) throw new ArgumentNullException(nameof(message)); if (message == null) throw new ArgumentNullException(nameof(message));
...@@ -66,16 +67,16 @@ VALUES(@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName);"; ...@@ -66,16 +67,16 @@ VALUES(@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName);";
using (var connection = new SqlConnection(_options.ConnectionString)) using (var connection = new SqlConnection(_options.ConnectionString))
{ {
return connection.ExecuteAsync(sql, message); await connection.ExecuteAsync(sql, message);
} }
} }
public Task<CapReceivedMessage> GetReceivedMessageAsync(int id) public async Task<CapReceivedMessage> GetReceivedMessageAsync(int id)
{ {
var sql = $@"SELECT * FROM [{_options.Schema}].[Received] WITH (readpast) WHERE Id={id}"; var sql = $@"SELECT * FROM [{_options.Schema}].[Received] WITH (readpast) WHERE Id={id}";
using (var connection = new SqlConnection(_options.ConnectionString)) using (var connection = new SqlConnection(_options.ConnectionString))
{ {
return connection.QueryFirstOrDefaultAsync<CapReceivedMessage>(sql); return await connection.QueryFirstOrDefaultAsync<CapReceivedMessage>(sql);
} }
} }
...@@ -94,26 +95,30 @@ VALUES(@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName);"; ...@@ -94,26 +95,30 @@ VALUES(@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName);";
private async Task<IFetchedMessage> FetchNextMessageCoreAsync(string sql, object args = null) private async Task<IFetchedMessage> FetchNextMessageCoreAsync(string sql, object args = null)
{ {
using (var connection = new SqlConnection(_options.ConnectionString)) //here don't use `using` to dispose
var connection = new SqlConnection(_options.ConnectionString);
await connection.OpenAsync();
var transaction = connection.BeginTransaction(IsolationLevel.ReadCommitted);
FetchedMessage fetchedMessage = null;
try
{
fetchedMessage = await connection.QueryFirstOrDefaultAsync<FetchedMessage>(sql, args, transaction);
}
catch (SqlException)
{ {
using (var transaction = connection.BeginTransaction(IsolationLevel.ReadCommitted)) transaction.Dispose();
{ throw;
try
{
var fetched = await connection.QueryFirstOrDefaultAsync<FetchedMessage>(sql, args, transaction);
if (fetched == null)
return null;
return new SqlServerFetchedMessage(fetched.MessageId, fetched.MessageType, connection, transaction);
}
catch (Exception)
{
transaction.Rollback();
return null;
}
}
} }
if (fetchedMessage == null)
{
transaction.Rollback();
transaction.Dispose();
connection.Dispose();
return null;
}
return new SqlServerFetchedMessage(fetchedMessage.MessageId, fetchedMessage.MessageType, connection, transaction);
} }
} }
} }
\ No newline at end of file
...@@ -20,6 +20,7 @@ namespace DotNetCore.CAP.SqlServer ...@@ -20,6 +20,7 @@ namespace DotNetCore.CAP.SqlServer
_schema = options.Schema; _schema = options.Schema;
_dbConnection = new SqlConnection(options.ConnectionString); _dbConnection = new SqlConnection(options.ConnectionString);
_dbConnection.Open();
_dbTransaction = _dbConnection.BeginTransaction(IsolationLevel.ReadCommitted); _dbTransaction = _dbConnection.BeginTransaction(IsolationLevel.ReadCommitted);
} }
...@@ -27,16 +28,16 @@ namespace DotNetCore.CAP.SqlServer ...@@ -27,16 +28,16 @@ namespace DotNetCore.CAP.SqlServer
{ {
if (message == null) throw new ArgumentNullException(nameof(message)); if (message == null) throw new ArgumentNullException(nameof(message));
var sql = $"UPDATE [{_schema}].[Published] SET [ExpiresAt] = @ExpiresAt,[StatusName]=@StatusName WHERE Id=@Id;"; var sql = $"UPDATE [{_schema}].[Published] SET [Retries] = @Retries,[ExpiresAt] = @ExpiresAt,[StatusName]=@StatusName WHERE Id=@Id;";
_dbConnection.Execute(sql, message); _dbConnection.Execute(sql, message, _dbTransaction);
} }
public void UpdateMessage(CapReceivedMessage message) public void UpdateMessage(CapReceivedMessage message)
{ {
if (message == null) throw new ArgumentNullException(nameof(message)); if (message == null) throw new ArgumentNullException(nameof(message));
var sql = $"UPDATE [{_schema}].[Received] SET [ExpiresAt] = @ExpiresAt,[StatusName]=@StatusName WHERE Id=@Id;"; var sql = $"UPDATE [{_schema}].[Received] SET [Retries] = @Retries,[ExpiresAt] = @ExpiresAt,[StatusName]=@StatusName WHERE Id=@Id;";
_dbConnection.Execute(sql, message); _dbConnection.Execute(sql, message, _dbTransaction);
} }
public void EnqueueMessage(CapPublishedMessage message) public void EnqueueMessage(CapPublishedMessage message)
...@@ -44,7 +45,7 @@ namespace DotNetCore.CAP.SqlServer ...@@ -44,7 +45,7 @@ namespace DotNetCore.CAP.SqlServer
if (message == null) throw new ArgumentNullException(nameof(message)); if (message == null) throw new ArgumentNullException(nameof(message));
var sql = $"INSERT INTO [{_schema}].[Queue] values(@MessageId,@MessageType);"; var sql = $"INSERT INTO [{_schema}].[Queue] values(@MessageId,@MessageType);";
_dbConnection.Execute(sql, new CapQueue { MessageId = message.Id, MessageType = MessageType.Publish }); _dbConnection.Execute(sql, new CapQueue { MessageId = message.Id, MessageType = MessageType.Publish }, _dbTransaction);
} }
public void EnqueueMessage(CapReceivedMessage message) public void EnqueueMessage(CapReceivedMessage message)
...@@ -52,7 +53,7 @@ namespace DotNetCore.CAP.SqlServer ...@@ -52,7 +53,7 @@ namespace DotNetCore.CAP.SqlServer
if (message == null) throw new ArgumentNullException(nameof(message)); if (message == null) throw new ArgumentNullException(nameof(message));
var sql = $"INSERT INTO [{_schema}].[Queue] values(@MessageId,@MessageType);"; var sql = $"INSERT INTO [{_schema}].[Queue] values(@MessageId,@MessageType);";
_dbConnection.Execute(sql, new CapQueue { MessageId = message.Id, MessageType = MessageType.Subscribe }); _dbConnection.Execute(sql, new CapQueue { MessageId = message.Id, MessageType = MessageType.Subscribe }, _dbTransaction);
} }
public Task CommitAsync() public Task CommitAsync()
......
...@@ -113,7 +113,7 @@ namespace DotNetCore.CAP ...@@ -113,7 +113,7 @@ namespace DotNetCore.CAP
var messageStore = provider.GetRequiredService<IStorageConnection>(); var messageStore = provider.GetRequiredService<IStorageConnection>();
var receivedMessage = new CapReceivedMessage(messageContext) var receivedMessage = new CapReceivedMessage(messageContext)
{ {
StatusName = StatusName.Enqueued, StatusName = StatusName.Scheduled,
}; };
messageStore.StoreReceivedMessageAsync(receivedMessage).Wait(); messageStore.StoreReceivedMessageAsync(receivedMessage).Wait();
return receivedMessage; return receivedMessage;
......
...@@ -24,56 +24,53 @@ namespace DotNetCore.CAP ...@@ -24,56 +24,53 @@ namespace DotNetCore.CAP
public async Task<OperateResult> ExecuteAsync(IStorageConnection connection, IFetchedMessage fetched) public async Task<OperateResult> ExecuteAsync(IStorageConnection connection, IFetchedMessage fetched)
{ {
using (fetched) var message = await connection.GetPublishedMessageAsync(fetched.MessageId);
try
{ {
var message = await connection.GetPublishedMessageAsync(fetched.MessageId); var sp = Stopwatch.StartNew();
try await _stateChanger.ChangeStateAsync(message, new ProcessingState(), connection);
{
var sp = Stopwatch.StartNew();
await _stateChanger.ChangeStateAsync(message, new ProcessingState(), connection);
if (message.Retries > 0) if (message.Retries > 0)
{ {
_logger.JobRetrying(message.Retries); _logger.JobRetrying(message.Retries);
} }
var result = await PublishAsync(message.Name, message.Content); var result = await PublishAsync(message.Name, message.Content);
sp.Stop(); sp.Stop();
var newState = default(IState); var newState = default(IState);
if (!result.Succeeded) if (!result.Succeeded)
{
var shouldRetry = await UpdateJobForRetryAsync(message, connection);
if (shouldRetry)
{ {
var shouldRetry = await UpdateJobForRetryAsync(message, connection); newState = new ScheduledState();
if (shouldRetry) _logger.JobFailedWillRetry(result.Exception);
{
newState = new ScheduledState();
_logger.JobFailedWillRetry(result.Exception);
}
else
{
newState = new FailedState();
_logger.JobFailed(result.Exception);
}
} }
else else
{ {
newState = new SucceededState(); newState = new FailedState();
_logger.JobFailed(result.Exception);
} }
await _stateChanger.ChangeStateAsync(message, newState, connection); }
else
fetched.RemoveFromQueue(); {
newState = new SucceededState();
}
await _stateChanger.ChangeStateAsync(message, newState, connection);
if (result.Succeeded) fetched.RemoveFromQueue();
{
_logger.JobExecuted(sp.Elapsed.TotalSeconds);
}
return OperateResult.Success; if (result.Succeeded)
}
catch (Exception ex)
{ {
_logger.ExceptionOccuredWhileExecutingJob(message?.Name, ex); _logger.JobExecuted(sp.Elapsed.TotalSeconds);
return OperateResult.Failed(ex);
} }
return OperateResult.Success;
}
catch (Exception ex)
{
_logger.ExceptionOccuredWhileExecutingJob(message?.Name, ex);
return OperateResult.Failed(ex);
} }
} }
...@@ -81,7 +78,7 @@ namespace DotNetCore.CAP ...@@ -81,7 +78,7 @@ namespace DotNetCore.CAP
{ {
var retryBehavior = RetryBehavior.DefaultRetry; var retryBehavior = RetryBehavior.DefaultRetry;
var now = DateTime.UtcNow; var now = DateTime.Now;
var retries = ++message.Retries; var retries = ++message.Retries;
if (retries >= retryBehavior.RetryCount) if (retries >= retryBehavior.RetryCount)
{ {
......
...@@ -37,61 +37,58 @@ namespace DotNetCore.CAP ...@@ -37,61 +37,58 @@ namespace DotNetCore.CAP
public async Task<OperateResult> ExecuteAsync(IStorageConnection connection, IFetchedMessage fetched) public async Task<OperateResult> ExecuteAsync(IStorageConnection connection, IFetchedMessage fetched)
{ {
using (fetched) var message = await connection.GetReceivedMessageAsync(fetched.MessageId);
try
{ {
var message = await connection.GetReceivedMessageAsync(fetched.MessageId); var sp = Stopwatch.StartNew();
try await _stateChanger.ChangeStateAsync(message, new ProcessingState(), connection);
{
var sp = Stopwatch.StartNew();
await _stateChanger.ChangeStateAsync(message, new ProcessingState(), connection);
if (message.Retries > 0) if (message.Retries > 0)
{ {
_logger.JobRetrying(message.Retries); _logger.JobRetrying(message.Retries);
} }
var result = await ExecuteSubscribeAsync(message); var result = await ExecuteSubscribeAsync(message);
sp.Stop(); sp.Stop();
var newState = default(IState); var newState = default(IState);
if (!result.Succeeded) if (!result.Succeeded)
{
var shouldRetry = await UpdateJobForRetryAsync(message, connection);
if (shouldRetry)
{ {
var shouldRetry = await UpdateJobForRetryAsync(message, connection); newState = new ScheduledState();
if (shouldRetry) _logger.JobFailedWillRetry(result.Exception);
{
newState = new ScheduledState();
_logger.JobFailedWillRetry(result.Exception);
}
else
{
newState = new FailedState();
_logger.JobFailed(result.Exception);
}
} }
else else
{ {
newState = new SucceededState(); newState = new FailedState();
} _logger.JobFailed(result.Exception);
await _stateChanger.ChangeStateAsync(message, newState, connection);
fetched.RemoveFromQueue();
if (result.Succeeded)
{
_logger.JobExecuted(sp.Elapsed.TotalSeconds);
} }
return OperateResult.Success;
} }
catch (SubscriberNotFoundException ex) else
{ {
_logger.LogError(ex.Message); newState = new SucceededState();
return OperateResult.Failed(ex);
} }
catch (Exception ex) await _stateChanger.ChangeStateAsync(message, newState, connection);
fetched.RemoveFromQueue();
if (result.Succeeded)
{ {
_logger.ExceptionOccuredWhileExecutingJob(message?.Name, ex); _logger.JobExecuted(sp.Elapsed.TotalSeconds);
return OperateResult.Failed(ex);
} }
return OperateResult.Success;
}
catch (SubscriberNotFoundException ex)
{
_logger.LogError(ex.Message);
return OperateResult.Failed(ex);
}
catch (Exception ex)
{
_logger.ExceptionOccuredWhileExecutingJob(message?.Name, ex);
return OperateResult.Failed(ex);
} }
} }
......
...@@ -18,7 +18,7 @@ ...@@ -18,7 +18,7 @@
</ItemGroup> </ItemGroup>
<ItemGroup> <ItemGroup>
<ProjectReference Include="..\..\src\DotNetCore.CAP.EntityFrameworkCore\DotNetCore.CAP.SqlServer.csproj" /> <ProjectReference Include="..\..\src\DotNetCore.CAP.SqlServer\DotNetCore.CAP.SqlServer.csproj" />
<ProjectReference Include="..\..\src\DotNetCore.CAP\DotNetCore.CAP.csproj" /> <ProjectReference Include="..\..\src\DotNetCore.CAP\DotNetCore.CAP.csproj" />
</ItemGroup> </ItemGroup>
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment