Unverified Commit 58e8d63e authored by hamish-omny's avatar hamish-omny Committed by GitHub

Ensure that _activeMessage is always cleared, including if an exception is...

Ensure that _activeMessage is always cleared, including if an exception is thrown while trying to send the message or flush the connection. Leaving _activeMessage set causes WriteMessageInsideLock to always return a "NoConnectionAvailable" error indefinitely (#1374)
Co-authored-by: 's avatarNick Craver <craver@stackoverflow.com>
parent 4f58848a
...@@ -737,12 +737,15 @@ internal WriteResult WriteMessageTakingWriteLockSync(PhysicalConnection physical ...@@ -737,12 +737,15 @@ internal WriteResult WriteMessageTakingWriteLockSync(PhysicalConnection physical
#pragma warning restore CS0618 #pragma warning restore CS0618
} }
UnmarkActiveMessage(message);
physical.SetIdle(); physical.SetIdle();
return result; return result;
} }
catch (Exception ex) { return HandleWriteException(message, ex); } catch (Exception ex) { return HandleWriteException(message, ex); }
finally { token.Dispose(); } finally
{
UnmarkActiveMessage(message);
token.Dispose();
}
} }
...@@ -887,7 +890,6 @@ private void ProcessBacklog() ...@@ -887,7 +890,6 @@ private void ProcessBacklog()
} }
_backlogStatus = BacklogStatus.MarkingInactive; _backlogStatus = BacklogStatus.MarkingInactive;
UnmarkActiveMessage(message);
if (result != WriteResult.Success) if (result != WriteResult.Success)
{ {
_backlogStatus = BacklogStatus.RecordingWriteFailure; _backlogStatus = BacklogStatus.RecordingWriteFailure;
...@@ -901,6 +903,10 @@ private void ProcessBacklog() ...@@ -901,6 +903,10 @@ private void ProcessBacklog()
_backlogStatus = BacklogStatus.RecordingFault; _backlogStatus = BacklogStatus.RecordingFault;
HandleWriteException(message, ex); HandleWriteException(message, ex);
} }
finally
{
UnmarkActiveMessage(message);
}
} }
_backlogStatus = BacklogStatus.SettingIdle; _backlogStatus = BacklogStatus.SettingIdle;
physical.SetIdle(); physical.SetIdle();
...@@ -985,8 +991,7 @@ internal ValueTask<WriteResult> WriteMessageTakingWriteLockAsync(PhysicalConnect ...@@ -985,8 +991,7 @@ internal ValueTask<WriteResult> WriteMessageTakingWriteLockAsync(PhysicalConnect
result = flush.Result; // we know it was completed, this is fine result = flush.Result; // we know it was completed, this is fine
} }
UnmarkActiveMessage(message);
physical.SetIdle(); physical.SetIdle();
return new ValueTask<WriteResult>(result); return new ValueTask<WriteResult>(result);
...@@ -994,12 +999,17 @@ internal ValueTask<WriteResult> WriteMessageTakingWriteLockAsync(PhysicalConnect ...@@ -994,12 +999,17 @@ internal ValueTask<WriteResult> WriteMessageTakingWriteLockAsync(PhysicalConnect
catch (Exception ex) { return new ValueTask<WriteResult>(HandleWriteException(message, ex)); } catch (Exception ex) { return new ValueTask<WriteResult>(HandleWriteException(message, ex)); }
finally finally
{ {
if (releaseLock & token.Success) if (token.Success)
{ {
UnmarkActiveMessage(message);
if (releaseLock)
{
#if DEBUG #if DEBUG
RecordLockDuration(lockTaken); RecordLockDuration(lockTaken);
#endif #endif
token.Dispose(); token.Dispose();
}
} }
} }
} }
...@@ -1029,8 +1039,7 @@ private async ValueTask<WriteResult> WriteMessageTakingWriteLockAsync_Awaited(Va ...@@ -1029,8 +1039,7 @@ private async ValueTask<WriteResult> WriteMessageTakingWriteLockAsync_Awaited(Va
{ {
result = await physical.FlushAsync(false).ForAwait(); result = await physical.FlushAsync(false).ForAwait();
} }
UnmarkActiveMessage(message);
physical.SetIdle(); physical.SetIdle();
#if DEBUG #if DEBUG
...@@ -1043,6 +1052,10 @@ private async ValueTask<WriteResult> WriteMessageTakingWriteLockAsync_Awaited(Va ...@@ -1043,6 +1052,10 @@ private async ValueTask<WriteResult> WriteMessageTakingWriteLockAsync_Awaited(Va
{ {
return HandleWriteException(message, ex); return HandleWriteException(message, ex);
} }
finally
{
UnmarkActiveMessage(message);
}
} }
private async ValueTask<WriteResult> CompleteWriteAndReleaseLockAsync(LockToken lockToken, ValueTask<WriteResult> flush, Message message, int lockTaken) private async ValueTask<WriteResult> CompleteWriteAndReleaseLockAsync(LockToken lockToken, ValueTask<WriteResult> flush, Message message, int lockTaken)
...@@ -1052,7 +1065,6 @@ private async ValueTask<WriteResult> CompleteWriteAndReleaseLockAsync(LockToken ...@@ -1052,7 +1065,6 @@ private async ValueTask<WriteResult> CompleteWriteAndReleaseLockAsync(LockToken
try try
{ {
var result = await flush.ForAwait(); var result = await flush.ForAwait();
UnmarkActiveMessage(message);
physical.SetIdle(); physical.SetIdle();
return result; return result;
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment