Unverified Commit 68739410 authored by Marc Gravell's avatar Marc Gravell Committed by GitHub

WIP - investigate and resolve async-write-path bugs (#1057)

Replace SemaphoreSlim with MutexSlim, due to major impact from https://github.com/dotnet/corefx/issues/35393
parent a8040317
...@@ -2126,19 +2126,22 @@ internal Task<T> ExecuteAsyncImpl<T>(Message message, ResultProcessor<T> process ...@@ -2126,19 +2126,22 @@ internal Task<T> ExecuteAsyncImpl<T>(Message message, ResultProcessor<T> process
return CompletedTask<T>.Default(state); return CompletedTask<T>.Default(state);
} }
if (message.IsFireAndForget) TaskCompletionSource<T> tcs = null;
ResultBox<T> source = null;
if (!message.IsFireAndForget)
{
tcs = TaskSource.Create<T>(state);
source = ResultBox<T>.Get(tcs);
}
var write = TryPushMessageToBridgeAsync(message, processor, source, ref server);
if (!write.IsCompletedSuccessfully) return ExecuteAsyncImpl_Awaited<T>(this, write, tcs, message, server);
if (tcs == null)
{ {
TryPushMessageToBridgeAsync(message, processor, null, ref server);
return CompletedTask<T>.Default(null); // F+F explicitly does not get async-state return CompletedTask<T>.Default(null); // F+F explicitly does not get async-state
} }
else else
{ {
var tcs = TaskSource.Create<T>(state);
var source = ResultBox<T>.Get(tcs);
var write = TryPushMessageToBridgeAsync(message, processor, source, ref server);
if (!write.IsCompletedSuccessfully) return ExecuteAsyncImpl_Awaited<T>(this, write, tcs, message, server);
var result = write.Result; var result = write.Result;
if (result != WriteResult.Success) if (result != WriteResult.Success)
{ {
...@@ -2157,7 +2160,7 @@ private static async Task<T> ExecuteAsyncImpl_Awaited<T>(ConnectionMultiplexer @ ...@@ -2157,7 +2160,7 @@ private static async Task<T> ExecuteAsyncImpl_Awaited<T>(ConnectionMultiplexer @
var ex = @this.GetException(result, message, server); var ex = @this.GetException(result, message, server);
ThrowFailed(tcs, ex); ThrowFailed(tcs, ex);
} }
return await tcs.Task.ForAwait(); return tcs == null ? default(T) : await tcs.Task.ForAwait();
} }
internal Exception GetException(WriteResult result, Message message, ServerEndPoint server) internal Exception GetException(WriteResult result, Message message, ServerEndPoint server)
......
...@@ -7,6 +7,7 @@ ...@@ -7,6 +7,7 @@
using System.Threading; using System.Threading;
using System.Threading.Channels; using System.Threading.Channels;
using System.Threading.Tasks; using System.Threading.Tasks;
using Pipelines.Sockets.Unofficial.Threading;
using PendingSubscriptionState = global::StackExchange.Redis.ConnectionMultiplexer.Subscription.PendingSubscriptionState; using PendingSubscriptionState = global::StackExchange.Redis.ConnectionMultiplexer.Subscription.PendingSubscriptionState;
namespace StackExchange.Redis namespace StackExchange.Redis
...@@ -52,6 +53,8 @@ public PhysicalBridge(ServerEndPoint serverEndPoint, ConnectionType type, int ti ...@@ -52,6 +53,8 @@ public PhysicalBridge(ServerEndPoint serverEndPoint, ConnectionType type, int ti
Name = Format.ToString(serverEndPoint.EndPoint) + "/" + ConnectionType.ToString(); Name = Format.ToString(serverEndPoint.EndPoint) + "/" + ConnectionType.ToString();
completionManager = new CompletionManager(Multiplexer, Name); completionManager = new CompletionManager(Multiplexer, Name);
TimeoutMilliseconds = timeoutMilliseconds; TimeoutMilliseconds = timeoutMilliseconds;
_singleWriterMutex = new MutexSlim(timeoutMilliseconds: timeoutMilliseconds,
scheduler: Multiplexer?.SocketManager?.SchedulerPool);
} }
private readonly int TimeoutMilliseconds; private readonly int TimeoutMilliseconds;
...@@ -634,7 +637,7 @@ internal bool TryEnqueue(List<Message> messages, bool isSlave) ...@@ -634,7 +637,7 @@ internal bool TryEnqueue(List<Message> messages, bool isSlave)
return true; return true;
} }
private readonly SemaphoreSlim _SingleWriterLock = new SemaphoreSlim(1); private readonly MutexSlim _singleWriterMutex;
private Message _activeMesssage; private Message _activeMesssage;
...@@ -681,34 +684,37 @@ private WriteResult WriteMessageInsideLock(PhysicalConnection physical, Message ...@@ -681,34 +684,37 @@ private WriteResult WriteMessageInsideLock(PhysicalConnection physical, Message
} }
} }
private async Task<WriteResult> WriteMessageTakingDelayedWriteLockAsync(PhysicalConnection physical, Message message) private async ValueTask<WriteResult> WriteMessageTakingDelayedWriteLockAsync(MutexSlim.AwaitableLockToken pendingLock, PhysicalConnection physical, Message message)
{ {
bool haveLock = false;
try try
{ {
// WriteMessageTakingWriteLockAsync will have checked for immediate availability, // WriteMessageTakingWriteLockAsync will have checked for immediate availability,
// so this is the fallback case - fine to go straight to "await" // so this is the fallback case - fine to go straight to "await"
haveLock = await _SingleWriterLock.WaitAsync(TimeoutMilliseconds).ForAwait();
if (!haveLock) // note: timeout is specified in mutex-constructor
using (var token = await pendingLock)
{ {
message.Cancel(); if (!token.Success)
Multiplexer?.OnMessageFaulted(message, null); {
this.CompleteSyncOrAsync(message); message.Cancel();
return WriteResult.TimeoutBeforeWrite; Multiplexer?.OnMessageFaulted(message, null);
} this.CompleteSyncOrAsync(message);
return WriteResult.TimeoutBeforeWrite;
}
var result = WriteMessageInsideLock(physical, message); var result = WriteMessageInsideLock(physical, message);
if (result == WriteResult.Success) if (result == WriteResult.Success)
{ {
result = await physical.FlushAsync(false).ForAwait(); result = await physical.FlushAsync(false).ForAwait();
} }
physical.SetIdle(); physical.SetIdle();
return result; UnmarkActiveMessage(message);
return result;
}
} }
catch (Exception ex) { return HandleWriteException(message, ex); } catch (Exception ex) { return HandleWriteException(message, ex); }
finally { if (haveLock) ReleaseSingleWriterLock(message); }
} }
[Obsolete("prefer async")] [Obsolete("prefer async")]
...@@ -717,32 +723,33 @@ internal WriteResult WriteMessageTakingWriteLockSync(PhysicalConnection physical ...@@ -717,32 +723,33 @@ internal WriteResult WriteMessageTakingWriteLockSync(PhysicalConnection physical
Trace("Writing: " + message); Trace("Writing: " + message);
message.SetEnqueued(physical); // this also records the read/write stats at this point message.SetEnqueued(physical); // this also records the read/write stats at this point
bool haveLock = false;
try try
{ {
haveLock = _SingleWriterLock.Wait(TimeoutMilliseconds); using (var token = _singleWriterMutex.TryWait())
if (!haveLock)
{ {
message.Cancel(); if (!token.Success)
Multiplexer?.OnMessageFaulted(message, null); {
this.CompleteSyncOrAsync(message); message.Cancel();
return WriteResult.TimeoutBeforeWrite; Multiplexer?.OnMessageFaulted(message, null);
} this.CompleteSyncOrAsync(message);
return WriteResult.TimeoutBeforeWrite;
}
var result = WriteMessageInsideLock(physical, message); var result = WriteMessageInsideLock(physical, message);
if (result == WriteResult.Success) if (result == WriteResult.Success)
{ {
#pragma warning disable CS0618 #pragma warning disable CS0618
result = physical.FlushSync(false, TimeoutMilliseconds); result = physical.FlushSync(false, TimeoutMilliseconds);
#pragma warning restore CS0618 #pragma warning restore CS0618
} }
physical.SetIdle(); UnmarkActiveMessage(message);
return result; physical.SetIdle();
return result;
}
} }
catch (Exception ex) { return HandleWriteException(message, ex); } catch (Exception ex) { return HandleWriteException(message, ex); }
finally { if (haveLock) ReleaseSingleWriterLock(message); }
} }
/// <summary> /// <summary>
...@@ -755,12 +762,24 @@ internal ValueTask<WriteResult> WriteMessageTakingWriteLockAsync(PhysicalConnect ...@@ -755,12 +762,24 @@ internal ValueTask<WriteResult> WriteMessageTakingWriteLockAsync(PhysicalConnect
Trace("Writing: " + message); Trace("Writing: " + message);
message.SetEnqueued(physical); // this also records the read/write stats at this point message.SetEnqueued(physical); // this also records the read/write stats at this point
bool haveLock = false; bool releaseLock = false;
MutexSlim.LockToken token = default;
try try
{ {
// try to acquire it synchronously // try to acquire it synchronously
haveLock = _SingleWriterLock.Wait(0); // note: timeout is specified in mutex-constructor
if (!haveLock) return new ValueTask<WriteResult>(WriteMessageTakingDelayedWriteLockAsync(physical, message)); var pending = _singleWriterMutex.TryWaitAsync(options: MutexSlim.WaitOptions.DisableAsyncContext);
if (!pending.IsCompletedSuccessfully) return WriteMessageTakingDelayedWriteLockAsync(pending, physical, message);
releaseLock = true;
token = pending.GetResult(); // we can't use "using" for this, because we might not want to kill it yet
if (!token.Success) // (in particular, me might hand the lifetime to CompleteWriteAndReleaseLockAsync)
{
message.Cancel();
Multiplexer?.OnMessageFaulted(message, null);
this.CompleteSyncOrAsync(message);
return new ValueTask<WriteResult>(WriteResult.TimeoutBeforeWrite);
}
var result = WriteMessageInsideLock(physical, message); var result = WriteMessageInsideLock(physical, message);
...@@ -769,25 +788,38 @@ internal ValueTask<WriteResult> WriteMessageTakingWriteLockAsync(PhysicalConnect ...@@ -769,25 +788,38 @@ internal ValueTask<WriteResult> WriteMessageTakingWriteLockAsync(PhysicalConnect
var flush = physical.FlushAsync(false); var flush = physical.FlushAsync(false);
if (!flush.IsCompletedSuccessfully) if (!flush.IsCompletedSuccessfully)
{ {
haveLock = false; // so we don't release prematurely releaseLock = false; // so we don't release prematurely
return new ValueTask<WriteResult>(CompleteWriteAndReleaseLockAsync(flush, message)); return CompleteWriteAndReleaseLockAsync(token, flush, message);
} }
result = flush.Result; // we know it was completed, this is fine result = flush.Result; // we know it was completed, this is fine
} }
UnmarkActiveMessage(message);
physical.SetIdle(); physical.SetIdle();
return new ValueTask<WriteResult>(result); return new ValueTask<WriteResult>(result);
} }
catch (Exception ex) { return new ValueTask<WriteResult>(HandleWriteException(message, ex)); } catch (Exception ex) { return new ValueTask<WriteResult>(HandleWriteException(message, ex)); }
finally { if (haveLock) ReleaseSingleWriterLock(message); } finally
{
if (releaseLock) token.Dispose();
}
} }
private async Task<WriteResult> CompleteWriteAndReleaseLockAsync(ValueTask<WriteResult> flush, Message message) private async ValueTask<WriteResult> CompleteWriteAndReleaseLockAsync(MutexSlim.LockToken lockToken, ValueTask<WriteResult> flush, Message message)
{ {
try { return await flush.ForAwait(); } using (lockToken)
catch (Exception ex) { return HandleWriteException(message, ex); } {
finally { ReleaseSingleWriterLock(message); } try
{
var result = await flush.ForAwait();
UnmarkActiveMessage(message);
physical.SetIdle();
return result;
}
catch (Exception ex) { return HandleWriteException(message, ex); }
}
} }
private WriteResult HandleWriteException(Message message, Exception ex) private WriteResult HandleWriteException(Message message, Exception ex)
...@@ -797,11 +829,9 @@ private WriteResult HandleWriteException(Message message, Exception ex) ...@@ -797,11 +829,9 @@ private WriteResult HandleWriteException(Message message, Exception ex)
return WriteResult.WriteFailure; return WriteResult.WriteFailure;
} }
private void ReleaseSingleWriterLock(Message message) [MethodImpl(MethodImplOptions.AggressiveInlining)]
{ private void UnmarkActiveMessage(Message message)
Interlocked.CompareExchange(ref _activeMesssage, null, message); // remove if it is us => Interlocked.CompareExchange(ref _activeMesssage, null, message); // remove if it is us
_SingleWriterLock.Release();
}
private State ChangeState(State newState) private State ChangeState(State newState)
{ {
......
...@@ -217,9 +217,9 @@ public virtual bool SetResult(PhysicalConnection connection, Message message, in ...@@ -217,9 +217,9 @@ public virtual bool SetResult(PhysicalConnection connection, Message message, in
else else
{ {
unableToConnectError = true; unableToConnectError = true;
err = $"Endpoint {endpoint} serving hashslot {hashSlot} is not reachable at this point of time. Please check connectTimeout value. If it is low, try increasing it to give the ConnectionMultiplexer a chance to recover from the network disconnect. "; err = $"Endpoint {endpoint} serving hashslot {hashSlot} is not reachable at this point of time. Please check connectTimeout value. If it is low, try increasing it to give the ConnectionMultiplexer a chance to recover from the network disconnect. "
+ PerfCounterHelper.GetThreadPoolAndCPUSummary(bridge.Multiplexer.IncludePerformanceCountersInExceptions);
} }
err += PerfCounterHelper.GetThreadPoolAndCPUSummary(bridge.Multiplexer.IncludePerformanceCountersInExceptions);
} }
} }
} }
......
...@@ -8,8 +8,8 @@ internal sealed class ServerSelectionStrategy ...@@ -8,8 +8,8 @@ internal sealed class ServerSelectionStrategy
{ {
public const int NoSlot = -1, MultipleSlots = -2; public const int NoSlot = -1, MultipleSlots = -2;
private const int RedisClusterSlotCount = 16384; private const int RedisClusterSlotCount = 16384;
private static readonly ushort[] crc16tab = private static ReadOnlySpan<ushort> s_crc16tab => new ushort[]
{ { // this syntax allows a special-case population implementation by the compiler/JIT
0x0000,0x1021,0x2042,0x3063,0x4084,0x50a5,0x60c6,0x70e7, 0x0000,0x1021,0x2042,0x3063,0x4084,0x50a5,0x60c6,0x70e7,
0x8108,0x9129,0xa14a,0xb16b,0xc18c,0xd1ad,0xe1ce,0xf1ef, 0x8108,0x9129,0xa14a,0xb16b,0xc18c,0xd1ad,0xe1ce,0xf1ef,
0x1231,0x0210,0x3273,0x2252,0x52b5,0x4294,0x72f7,0x62d6, 0x1231,0x0210,0x3273,0x2252,0x52b5,0x4294,0x72f7,0x62d6,
...@@ -72,6 +72,7 @@ private static unsafe int GetClusterSlot(in RedisKey key) ...@@ -72,6 +72,7 @@ private static unsafe int GetClusterSlot(in RedisKey key)
{ {
var blob = (byte[])key; var blob = (byte[])key;
fixed (byte* ptr = blob) fixed (byte* ptr = blob)
fixed (ushort* crc16tab = s_crc16tab)
{ {
int offset = 0, count = blob.Length, start, end; int offset = 0, count = blob.Length, start, end;
if ((start = IndexOf(ptr, (byte)'{', 0, count - 1)) >= 0 if ((start = IndexOf(ptr, (byte)'{', 0, count - 1)) >= 0
......
...@@ -15,7 +15,7 @@ ...@@ -15,7 +15,7 @@
</PropertyGroup> </PropertyGroup>
<ItemGroup> <ItemGroup>
<PackageReference Include="Pipelines.Sockets.Unofficial" Version="1.0.9" /> <PackageReference Include="Pipelines.Sockets.Unofficial" Version="1.0.18" />
<PackageReference Include="System.Diagnostics.PerformanceCounter" Version="4.5.0" /> <PackageReference Include="System.Diagnostics.PerformanceCounter" Version="4.5.0" />
<PackageReference Include="System.IO.Pipelines" Version="4.5.1" /> <PackageReference Include="System.IO.Pipelines" Version="4.5.1" />
<PackageReference Include="System.Threading.Channels" Version="4.5.0" /> <PackageReference Include="System.Threading.Channels" Version="4.5.0" />
......
using System; using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading.Tasks;
using StackExchange.Redis; using StackExchange.Redis;
namespace TestConsole namespace TestConsole
{ {
internal static class Program internal static class Program
{ {
public static async Task Main() public static void Main()
{ {
using (var muxer = await ConnectionMultiplexer.ConnectAsync("127.0.0.1"))
{
var db = muxer.GetDatabase();
var sub = muxer.GetSubscriber();
Console.WriteLine("subscribing");
ChannelMessageQueue queue = await sub.SubscribeAsync("yolo");
Console.WriteLine("subscribed");
}
} }
} }
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment