Commit 4ebacd62 authored by Marc Gravell's avatar Marc Gravell

optionally re-add the "write on async/await path" code rather than relying on...

optionally re-add the "write on async/await path" code rather than relying on the backlog; add supporting code for both; heartbeat should also help clear down anything in the backlog
parent 1ad2f95e
...@@ -489,6 +489,8 @@ internal void OnHeartbeat(bool ifConnectedOnly) ...@@ -489,6 +489,8 @@ internal void OnHeartbeat(bool ifConnectedOnly)
bool runThisTime = false; bool runThisTime = false;
try try
{ {
CheckBacklogForTimeouts();
runThisTime = !isDisposed && Interlocked.CompareExchange(ref beating, 1, 0) == 0; runThisTime = !isDisposed && Interlocked.CompareExchange(ref beating, 1, 0) == 0;
if (!runThisTime) return; if (!runThisTime) return;
...@@ -687,28 +689,13 @@ internal WriteResult WriteMessageTakingWriteLockSync(PhysicalConnection physical ...@@ -687,28 +689,13 @@ internal WriteResult WriteMessageTakingWriteLockSync(PhysicalConnection physical
{ {
// we can't get it *instantaneously*; is there // we can't get it *instantaneously*; is there
// perhaps a backlog and active backlog processor? // perhaps a backlog and active backlog processor?
bool haveBacklog; if (PushToBacklog(message, onlyIfExists: true)) return WriteResult.Success; // queued counts as success
lock (_backlog)
{
haveBacklog = _backlog.Count != 0;
}
if (haveBacklog)
{
PushToBacklog(message);
return WriteResult.Success; // queued counts as success
}
// no backlog... try to wait with the timeout; // no backlog... try to wait with the timeout;
// if we *still* can't get it: that counts as // if we *still* can't get it: that counts as
// an actual timeout // an actual timeout
token = _singleWriterMutex.TryWait(); token = _singleWriterMutex.TryWait();
if (!token.Success) if (!token.Success) return TimedOutBeforeWrite(message);
{
message.Cancel();
Multiplexer?.OnMessageFaulted(message, null);
message.Complete();
return WriteResult.TimeoutBeforeWrite;
}
} }
var result = WriteMessageInsideLock(physical, message); var result = WriteMessageInsideLock(physical, message);
...@@ -730,15 +717,18 @@ internal WriteResult WriteMessageTakingWriteLockSync(PhysicalConnection physical ...@@ -730,15 +717,18 @@ internal WriteResult WriteMessageTakingWriteLockSync(PhysicalConnection physical
} }
[MethodImpl(MethodImplOptions.AggressiveInlining)] [MethodImpl(MethodImplOptions.AggressiveInlining)]
private void PushToBacklog(Message message) private bool PushToBacklog(Message message, bool onlyIfExists)
{ {
bool startWorker; bool wasEmpty;
lock (_backlog) lock (_backlog)
{ {
startWorker = _backlog.Count == 0; wasEmpty = _backlog.Count == 0;
if (wasEmpty & onlyIfExists) return false;
_backlog.Enqueue(message); _backlog.Enqueue(message);
} }
if (startWorker) StartBacklogProcessor(); if (wasEmpty) StartBacklogProcessor();
return true;
} }
[MethodImpl(MethodImplOptions.AggressiveInlining)] [MethodImpl(MethodImplOptions.AggressiveInlining)]
private void StartBacklogProcessor() private void StartBacklogProcessor()
...@@ -754,6 +744,27 @@ private void StartBacklogProcessor() ...@@ -754,6 +744,27 @@ private void StartBacklogProcessor()
if (bridge != null) bridge.ProcessBacklog(); if (bridge != null) bridge.ProcessBacklog();
}; };
private void CheckBacklogForTimeouts() // check the head of the backlog queue, consuming anything that looks dead
{
lock (_backlog)
{
var now = Environment.TickCount;
var timeout = TimeoutMilliseconds;
while (_backlog.Count != 0)
{
var message = _backlog.Peek();
if (message.IsInternalCall) break; // don't stomp these (not that they should have the async timeout flag, but...)
if (!message.HasAsyncTimedOut(now, timeout, out var _)) break; // not a timeout - we can stop looking
_backlog.Dequeue(); // consume it for real
// tell the message that it failed
var ex = Multiplexer.GetException(WriteResult.TimeoutBeforeWrite, message, ServerEndPoint);
message.SetExceptionAndComplete(ex, this);
}
}
}
private void ProcessBacklog() private void ProcessBacklog()
{ {
LockToken token = default; LockToken token = default;
...@@ -780,7 +791,7 @@ private void ProcessBacklog() ...@@ -780,7 +791,7 @@ private void ProcessBacklog()
try try
{ {
if (message.HasAsyncTimedOut(Environment.TickCount, timeout, out var elapsed)) if (message.HasAsyncTimedOut(Environment.TickCount, timeout, out var _))
{ {
var ex = Multiplexer.GetException(WriteResult.TimeoutBeforeWrite, message, ServerEndPoint); var ex = Multiplexer.GetException(WriteResult.TimeoutBeforeWrite, message, ServerEndPoint);
message.SetExceptionAndComplete(ex, this); message.SetExceptionAndComplete(ex, this);
...@@ -817,6 +828,14 @@ private void ProcessBacklog() ...@@ -817,6 +828,14 @@ private void ProcessBacklog()
} }
} }
private WriteResult TimedOutBeforeWrite(Message message)
{
message.Cancel();
Multiplexer?.OnMessageFaulted(message, null);
message.Complete();
return WriteResult.TimeoutBeforeWrite;
}
/// <summary> /// <summary>
/// This writes a message to the output stream /// This writes a message to the output stream
/// </summary> /// </summary>
...@@ -824,10 +843,22 @@ private void ProcessBacklog() ...@@ -824,10 +843,22 @@ private void ProcessBacklog()
/// <param name="message">The message to be written.</param> /// <param name="message">The message to be written.</param>
internal ValueTask<WriteResult> WriteMessageTakingWriteLockAsync(PhysicalConnection physical, Message message) internal ValueTask<WriteResult> WriteMessageTakingWriteLockAsync(PhysicalConnection physical, Message message)
{ {
/* design decision/choice; the code works fine either way, but if this is
* set to *true*, then when we can't take the writer-lock *right away*,
* we push the message to the backlog (starting a worker if needed)
*
* otherwise, we go for a TryWaitAsync and rely on the await machinery
*
* "true" seems to give faster times *when under heavy contention*, based on profiling
* but it involves the backlog concept; "false" works well under low contention, and
* makes more use of async
*/
const bool ALWAYS_USE_BACKLOG_IF_CANNOT_GET_SYNC_LOCK = true;
Trace("Writing: " + message); Trace("Writing: " + message);
message.SetEnqueued(physical); // this also records the read/write stats at this point message.SetEnqueued(physical); // this also records the read/write stats at this point
bool releaseLock = true; bool releaseLock = true; // fine to default to true, as it doesn't matter until token is a "success"
LockToken token = default; LockToken token = default;
try try
{ {
...@@ -835,10 +866,21 @@ internal ValueTask<WriteResult> WriteMessageTakingWriteLockAsync(PhysicalConnect ...@@ -835,10 +866,21 @@ internal ValueTask<WriteResult> WriteMessageTakingWriteLockAsync(PhysicalConnect
// note: timeout is specified in mutex-constructor // note: timeout is specified in mutex-constructor
token = _singleWriterMutex.TryWait(options: WaitOptions.NoDelay); token = _singleWriterMutex.TryWait(options: WaitOptions.NoDelay);
if (!token.Success) // (in particular, me might hand the lifetime to CompleteWriteAndReleaseLockAsync) if (!token.Success)
{ {
PushToBacklog(message); // we can't get it *instantaneously*; is there
// perhaps a backlog and active backlog processor?
if (PushToBacklog(message, onlyIfExists: !ALWAYS_USE_BACKLOG_IF_CANNOT_GET_SYNC_LOCK))
return new ValueTask<WriteResult>(WriteResult.Success); // queued counts as success return new ValueTask<WriteResult>(WriteResult.Success); // queued counts as success
// no backlog... try to wait with the timeout;
// if we *still* can't get it: that counts as
// an actual timeout
var pending = _singleWriterMutex.TryWaitAsync(options: WaitOptions.DisableAsyncContext);
if (!pending.IsCompletedSuccessfully) return WriteMessageTakingWriteLockAsync_Awaited(pending, physical, message);
token = pending.Result; // fine since we know we got a result
if (!token.Success) return new ValueTask<WriteResult>(TimedOutBeforeWrite(message));
} }
var result = WriteMessageInsideLock(physical, message); var result = WriteMessageInsideLock(physical, message);
...@@ -867,6 +909,33 @@ internal ValueTask<WriteResult> WriteMessageTakingWriteLockAsync(PhysicalConnect ...@@ -867,6 +909,33 @@ internal ValueTask<WriteResult> WriteMessageTakingWriteLockAsync(PhysicalConnect
} }
} }
private async ValueTask<WriteResult> WriteMessageTakingWriteLockAsync_Awaited(ValueTask<LockToken> pending, PhysicalConnection physical, Message message)
{
try
{
using (var token = await pending)
{
if (!token.Success) return TimedOutBeforeWrite(message);
var result = WriteMessageInsideLock(physical, message);
if (result == WriteResult.Success)
{
result = await physical.FlushAsync(false);
}
UnmarkActiveMessage(message);
physical.SetIdle();
return result;
}
}
catch (Exception ex)
{
return HandleWriteException(message, ex);
}
}
private async ValueTask<WriteResult> CompleteWriteAndReleaseLockAsync(LockToken lockToken, ValueTask<WriteResult> flush, Message message) private async ValueTask<WriteResult> CompleteWriteAndReleaseLockAsync(LockToken lockToken, ValueTask<WriteResult> flush, Message message)
{ {
using (lockToken) using (lockToken)
......
...@@ -15,6 +15,6 @@ ...@@ -15,6 +15,6 @@
</ItemGroup> </ItemGroup>
<ItemGroup> <ItemGroup>
<PackageReference Include="StackExchange.Redis" Version="[2.0.545]" /> <!-- [1.2.6] for previous major --> <PackageReference Include="StackExchange.Redis" Version="[2.0.558]" /> <!-- [1.2.6] for previous major -->
</ItemGroup> </ItemGroup>
</Project> </Project>
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment