Unverified Commit 4ae78480 authored by Marc Gravell's avatar Marc Gravell Committed by GitHub

Perf regression (#1076)

* compare baseline to 1.2.6

* remove the entire concept of the completion manager; it just doesn't make sense any more; everything async and external facing should be via the TP

* use "in" with ForAwait to avoid some extra copies

* experimental "backlog queue" approach

* Cleanup and de-dupe timeout exception data

* WriteMessageTakingWriteLockSync should consider backlog

* don't allocate all those strings
parent 0b75c7c3
using System;
using System.Threading;
namespace StackExchange.Redis
{
internal static class CompletionManagerHelpers
{
public static void CompleteSyncOrAsync(this PhysicalBridge bridge, ICompletable operation)
=> CompletionManager.CompleteSyncOrAsyncImpl(bridge?.completionManager, operation);
public static void IncrementSyncCount(this PhysicalBridge bridge)
=> bridge?.completionManager?.IncrementSyncCount();
public static void CompleteAsync(this PhysicalBridge bridge, ICompletable operation)
=> CompletionManager.CompleteAsync(bridge?.completionManager, operation);
public static void CompleteSyncOrAsync(this CompletionManager manager, ICompletable operation)
=> CompletionManager.CompleteSyncOrAsyncImpl(manager, operation);
}
internal sealed partial class CompletionManager
{
internal static void CompleteSyncOrAsyncImpl(CompletionManager manager, ICompletable operation)
{
if (operation == null) return;
if (manager != null) manager.PerInstanceCompleteSyncOrAsync(operation);
else SharedCompleteSyncOrAsync(operation);
}
internal void IncrementSyncCount() => Interlocked.Increment(ref completedSync);
internal static void CompleteAsync(CompletionManager manager, ICompletable operation)
{
var sched = manager.multiplexer.SocketManager;
if (sched != null)
{
sched.ScheduleTask(s_AnyOrderCompletionHandler, operation);
Interlocked.Increment(ref manager.completedAsync);
}
else
{
SocketManager.Shared.ScheduleTask(s_AnyOrderCompletionHandler, operation);
}
}
private readonly ConnectionMultiplexer multiplexer;
private readonly string name;
private long completedSync, completedAsync, failedAsync;
public CompletionManager(ConnectionMultiplexer multiplexer, string name)
{
this.multiplexer = multiplexer ?? throw new ArgumentNullException(nameof(multiplexer));
this.name = name;
}
private static void SharedCompleteSyncOrAsync(ICompletable operation)
{
if (!operation.TryComplete(false))
{
SocketManager.Shared.ScheduleTask(s_AnyOrderCompletionHandler, operation);
}
}
private void PerInstanceCompleteSyncOrAsync(ICompletable operation)
{
if (operation == null) { }
else if (operation.TryComplete(false))
{
multiplexer.Trace("Completed synchronously: " + operation, name);
Interlocked.Increment(ref completedSync);
}
else
{
multiplexer.Trace("Using thread-pool for asynchronous completion", name);
(multiplexer.SocketManager ?? SocketManager.Shared).ScheduleTask(s_AnyOrderCompletionHandler, operation);
Interlocked.Increment(ref completedAsync); // k, *technically* we haven't actually completed this yet, but: close enough
}
}
internal void GetCounters(ConnectionCounters counters)
{
counters.CompletedSynchronously = Interlocked.Read(ref completedSync);
counters.CompletedAsynchronously = Interlocked.Read(ref completedAsync);
counters.FailedAsynchronously = Interlocked.Read(ref failedAsync);
}
private static readonly Action<object> s_AnyOrderCompletionHandler = AnyOrderCompletionHandler;
private static void AnyOrderCompletionHandler(object state)
{
try
{
ConnectionMultiplexer.TraceWithoutContext("Completing async (any order): " + state);
((ICompletable)state).TryComplete(true);
}
catch (Exception ex)
{
ConnectionMultiplexer.TraceWithoutContext("Async completion error: " + ex.Message);
}
}
}
}
......@@ -51,10 +51,7 @@ void ICompletable.AppendStormLog(StringBuilder sb)
else sb.Append(Format.ToString(EndPoint));
}
bool ICompletable.TryComplete(bool isAsync)
{
return ConnectionMultiplexer.TryCompleteHandler(handler, sender, this, isAsync);
}
bool ICompletable.TryComplete(bool isAsync) => ConnectionMultiplexer.TryCompleteHandler(handler, sender, this, isAsync);
/// <summary>
/// Returns the physical name of the connection.
......
......@@ -79,7 +79,6 @@ public ServerCounters GetCounters()
{
counters.Add(snapshot[i].GetCounters());
}
UnprocessableCompletionManager.GetCounters(counters.Other);
return counters;
}
......@@ -153,9 +152,8 @@ internal void OnConnectionFailed(EndPoint endpoint, ConnectionType connectionTyp
var handler = ConnectionFailed;
if (handler != null)
{
UnprocessableCompletionManager.CompleteSyncOrAsync(
new ConnectionFailedEventArgs(handler, this, endpoint, connectionType, failureType, exception, physicalName)
);
ConnectionMultiplexer.CompleteAsWorker(
new ConnectionFailedEventArgs(handler, this, endpoint, connectionType, failureType, exception, physicalName));
}
if (reconfigure)
{
......@@ -172,9 +170,8 @@ internal void OnInternalError(Exception exception, EndPoint endpoint = null, Con
var handler = InternalError;
if (handler != null)
{
UnprocessableCompletionManager.CompleteSyncOrAsync(
new InternalErrorEventArgs(handler, this, endpoint, connectionType, exception, origin)
);
ConnectionMultiplexer.CompleteAsWorker(
new InternalErrorEventArgs(handler, this, endpoint, connectionType, exception, origin));
}
}
catch
......@@ -188,9 +185,8 @@ internal void OnConnectionRestored(EndPoint endpoint, ConnectionType connectionT
var handler = ConnectionRestored;
if (handler != null)
{
UnprocessableCompletionManager.CompleteSyncOrAsync(
new ConnectionFailedEventArgs(handler, this, endpoint, connectionType, ConnectionFailureType.None, null, physicalName)
);
ConnectionMultiplexer.CompleteAsWorker(
new ConnectionFailedEventArgs(handler, this, endpoint, connectionType, ConnectionFailureType.None, null, physicalName));
}
ReconfigureIfNeeded(endpoint, false, "connection restored");
}
......@@ -200,9 +196,7 @@ private void OnEndpointChanged(EndPoint endpoint, EventHandler<EndPointEventArgs
if (_isDisposed) return;
if (handler != null)
{
UnprocessableCompletionManager.CompleteSyncOrAsync(
new EndPointEventArgs(handler, this, endpoint)
);
ConnectionMultiplexer.CompleteAsWorker(new EndPointEventArgs(handler, this, endpoint));
}
}
......@@ -219,7 +213,7 @@ internal void OnErrorMessage(EndPoint endpoint, string message)
var handler = ErrorMessage;
if (handler != null)
{
UnprocessableCompletionManager.CompleteSyncOrAsync(
ConnectionMultiplexer.CompleteAsWorker(
new RedisErrorEventArgs(handler, this, endpoint, message)
);
}
......@@ -748,12 +742,8 @@ private async Task<bool> WaitAllIgnoreErrorsAsync(Task[] tasks, int timeoutMilli
internal void OnHashSlotMoved(int hashSlot, EndPoint old, EndPoint @new)
{
var handler = HashSlotMoved;
if (handler != null)
{
UnprocessableCompletionManager.CompleteSyncOrAsync(
new HashSlotMovedEventArgs(handler, this, hashSlot, old, @new)
);
}
if (handler != null) ConnectionMultiplexer.CompleteAsWorker(
new HashSlotMovedEventArgs(handler, this, hashSlot, old, @new));
}
/// <summary>
......@@ -1055,7 +1045,6 @@ private ConnectionMultiplexer(ConfigurationOptions configuration)
AsyncTimeoutMilliseconds = configuration.AsyncTimeout;
OnCreateReaderWriter(configuration);
UnprocessableCompletionManager = new CompletionManager(this, "multiplexer");
ServerSelectionStrategy = new ServerSelectionStrategy(this);
var configChannel = configuration.ConfigurationChannel;
......@@ -1150,8 +1139,6 @@ internal long LastHeartbeatSecondsAgo
internal static long LastGlobalHeartbeatSecondsAgo => unchecked(Environment.TickCount - Thread.VolatileRead(ref lastGlobalHeartbeatTicks)) / 1000;
internal CompletionManager UnprocessableCompletionManager { get; }
/// <summary>
/// Obtain a pub/sub subscriber connection to the specified server
/// </summary>
......@@ -2170,27 +2157,7 @@ internal Exception GetException(WriteResult result, Message message, ServerEndPo
case WriteResult.NoConnectionAvailable:
return ExceptionFactory.NoConnectionAvailable(IncludeDetailInExceptions, IncludePerformanceCountersInExceptions, message.Command, message, server, GetServerSnapshot());
case WriteResult.TimeoutBeforeWrite:
string bridgeCounters = null, connectionState = null;
try
{
if (message.TryGetPhysicalState(out var state, out var sentDelta, out var receivedDelta))
{
connectionState = (sentDelta >= 0 && receivedDelta >= 0) // these might not always be available
? $", state={state}, outbound={sentDelta >> 10}KiB, inbound={receivedDelta >> 10}KiB"
: $", state={state}";
}
var bridge = server.GetBridge(message.Command, false);
if (bridge != null)
{
var active = bridge.GetActiveMessage();
bridge.GetOutstandingCount(out var inst, out var qs, out var @in);
bridgeCounters = $", inst={inst}, qs={qs}, in={@in}, active={active}";
}
}
catch { }
return ExceptionFactory.Timeout(this, "The timeout was reached before the message could be written to the output buffer, and it was not sent ("
+ Format.ToString(TimeoutMilliseconds) + "ms" + connectionState + bridgeCounters + ")", message, server);
return ExceptionFactory.Timeout(this, "The timeout was reached before the message could be written to the output buffer, and it was not sent", message, server, result);
case WriteResult.WriteFailure:
default:
return ExceptionFactory.ConnectionFailure(IncludeDetailInExceptions, ConnectionFailureType.ProtocolFailure, "An unknown error occurred when writing the message", server);
......
using System;
using System;
using System.Net;
using System.Text;
......@@ -30,9 +30,6 @@ void ICompletable.AppendStormLog(StringBuilder sb)
else sb.Append(Format.ToString(EndPoint));
}
bool ICompletable.TryComplete(bool isAsync)
{
return ConnectionMultiplexer.TryCompleteHandler(handler, sender, this, isAsync);
}
bool ICompletable.TryComplete(bool isAsync) => ConnectionMultiplexer.TryCompleteHandler(handler, sender, this, isAsync);
}
}
......@@ -181,7 +181,7 @@ internal static string GetLibVersion()
}
return _libVersion;
}
internal static Exception Timeout(ConnectionMultiplexer mutiplexer, string baseErrorMessage, Message message, ServerEndPoint server)
internal static Exception Timeout(ConnectionMultiplexer mutiplexer, string baseErrorMessage, Message message, ServerEndPoint server, WriteResult? result = null)
{
List<Tuple<string, string>> data = new List<Tuple<string, string>> { Tuple.Create("Message", message.CommandAndKey) };
var sb = new StringBuilder();
......@@ -203,12 +203,38 @@ void add(string lk, string sk, string v)
}
}
// Add timeout data, if we have it
if (result == WriteResult.TimeoutBeforeWrite)
{
add("Timeout", "timeout", Format.ToString(mutiplexer.TimeoutMilliseconds));
try
{
if (message.TryGetPhysicalState(out var state, out var sentDelta, out var receivedDelta))
{
add("PhysicalState", "phys", state.ToString());
// these might not always be available
if (sentDelta >= 0)
{
add("OutboundDeltaKB", "outbound", $"{sentDelta >> 10}KiB");
}
if (receivedDelta >= 0)
{
add("InboundDeltaKB", "inbound", $"{receivedDelta >> 10}KiB");
}
}
}
catch { }
}
// Add server data, if we have it
if (server != null)
{
server.GetOutstandingCount(message.Command, out int inst, out int qs, out int @in);
add("Instantaneous", "inst", inst.ToString());
server.GetOutstandingCount(message.Command, out int inst, out int qs, out int @in, out int qu);
add("OpsSinceLastHeartbeat", "inst", inst.ToString());
add("Queue-Awaiting-Write", "qu", qu.ToString());
add("Queue-Awaiting-Response", "qs", qs.ToString());
if (@in >= 0) add("Inbound-Bytes", "in", @in.ToString());
if (@in >= 0) add("Socket-Inbound-Bytes", "in", @in.ToString());
if (mutiplexer.StormLogThreshold >= 0 && qs >= mutiplexer.StormLogThreshold && Interlocked.CompareExchange(ref mutiplexer.haveStormLog, 1, 0) == 0)
{
......
......@@ -37,10 +37,7 @@ public sealed class HashSlotMovedEventArgs : EventArgs, ICompletable
NewEndPoint = @new;
}
bool ICompletable.TryComplete(bool isAsync)
{
return ConnectionMultiplexer.TryCompleteHandler(handler, sender, this, isAsync);
}
bool ICompletable.TryComplete(bool isAsync) => ConnectionMultiplexer.TryCompleteHandler(handler, sender, this, isAsync);
void ICompletable.AppendStormLog(StringBuilder sb)
{
......
using System;
using System;
using System.Net;
using System.Text;
......@@ -47,9 +47,6 @@ void ICompletable.AppendStormLog(StringBuilder sb)
if (EndPoint != null) sb.Append(", ").Append(Format.ToString(EndPoint));
}
bool ICompletable.TryComplete(bool isAsync)
{
return ConnectionMultiplexer.TryCompleteHandler(handler, sender, this, isAsync);
}
bool ICompletable.TryComplete(bool isAsync) => ConnectionMultiplexer.TryCompleteHandler(handler, sender, this, isAsync);
}
}
......@@ -2,6 +2,7 @@
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
......@@ -441,32 +442,17 @@ public override string ToString()
public void SetResponseReceived() => performance?.SetResponseReceived();
public bool TryComplete(bool isAsync)
bool ICompletable.TryComplete(bool isAsync) { Complete(); return true; }
public void Complete()
{
//Ensure we can never call TryComplete on the same resultBox from two threads by grabbing it now
//Ensure we can never call Complete on the same resultBox from two threads by grabbing it now
var currBox = Interlocked.Exchange(ref resultBox, null);
if (!isAsync)
{ // set the performance completion the first chance we get (sync comes first)
// set the completion/performance data
performance?.SetCompleted();
}
if (currBox != null)
{
var ret = currBox.TryComplete(isAsync);
//in async mode TryComplete will have unwrapped and recycled resultBox
if (!(ret || isAsync))
{
//put result box back if it was not already recycled
Interlocked.Exchange(ref resultBox, currBox);
}
return ret;
}
else
{
ConnectionMultiplexer.TraceWithoutContext("No result-box to complete for " + Command, "Message");
return true;
}
currBox?.ActivateContinuations();
}
internal static Message Create(int db, CommandFlags flags, RedisCommand command, in RedisKey key, RedisKey[] keys)
......@@ -614,7 +600,7 @@ internal void Fail(ConnectionFailureType failure, Exception innerException, stri
internal virtual void SetExceptionAndComplete(Exception exception, PhysicalBridge bridge)
{
resultBox?.SetException(exception);
bridge.CompleteSyncOrAsync(this);
Complete();
}
internal bool TrySetResult<T>(T value)
......@@ -629,6 +615,7 @@ internal bool TrySetResult<T>(T value)
internal void SetEnqueued(PhysicalConnection connection)
{
SetWriteTime();
performance?.SetEnqueued();
_enqueuedTo = connection;
if (connection == null)
......@@ -670,6 +657,7 @@ internal void SetRequestSent()
}
// the time (ticks) at which this message was considered written
[MethodImpl(MethodImplOptions.AggressiveInlining)]
internal void SetWriteTime()
{
if ((Flags & NeedsAsyncTimeoutCheckFlag) != 0)
......
......@@ -7,6 +7,7 @@
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using Pipelines.Sockets.Unofficial;
using Pipelines.Sockets.Unofficial.Threading;
using static Pipelines.Sockets.Unofficial.Threading.MutexSlim;
using PendingSubscriptionState = global::StackExchange.Redis.ConnectionMultiplexer.Subscription.PendingSubscriptionState;
......@@ -23,10 +24,9 @@ internal sealed class PhysicalBridge : IDisposable
private static readonly Message ReusableAskingCommand = Message.Create(-1, CommandFlags.FireAndForget, RedisCommand.ASKING);
internal readonly CompletionManager completionManager;
private readonly long[] profileLog = new long[ProfileLogSamples];
private readonly Queue<Message> _preconnectBacklog = new Queue<Message>();
private readonly Queue<Message> _backlog = new Queue<Message>();
private int activeWriters = 0;
private int beating;
......@@ -52,7 +52,6 @@ public PhysicalBridge(ServerEndPoint serverEndPoint, ConnectionType type, int ti
ConnectionType = type;
Multiplexer = serverEndPoint.Multiplexer;
Name = Format.ToString(serverEndPoint.EndPoint) + "/" + ConnectionType.ToString();
completionManager = new CompletionManager(Multiplexer, Name);
TimeoutMilliseconds = timeoutMilliseconds;
_singleWriterMutex = new MutexSlim(timeoutMilliseconds: timeoutMilliseconds);
}
......@@ -135,10 +134,9 @@ private WriteResult QueueOrFailMessage(Message message)
{
// you can go in the queue, but we won't be starting
// a worker, because the handshake has not completed
var queue = _preconnectBacklog;
lock (queue)
lock (_backlog)
{
queue.Enqueue(message);
_backlog.Enqueue(message);
}
message.SetEnqueued(null);
return WriteResult.Success; // we'll take it...
......@@ -148,7 +146,7 @@ private WriteResult QueueOrFailMessage(Message message)
// sorry, we're just not ready for you yet;
message.Cancel();
Multiplexer?.OnMessageFaulted(message, null);
this.CompleteSyncOrAsync(message);
message.Complete();
return WriteResult.NoConnectionAvailable;
}
}
......@@ -157,7 +155,7 @@ private WriteResult FailDueToNoConnection(Message message)
{
message.Cancel();
Multiplexer?.OnMessageFaulted(message, null);
this.CompleteSyncOrAsync(message);
message.Complete();
return WriteResult.NoConnectionAvailable;
}
......@@ -221,7 +219,6 @@ internal void GetCounters(ConnectionCounters counters)
counters.SocketCount = Interlocked.Read(ref socketCount);
counters.WriterCount = Interlocked.CompareExchange(ref activeWriters, 0, 0);
counters.NonPreferredEndpointCount = Interlocked.Read(ref nonPreferredEndpointCount);
completionManager.GetCounters(counters);
physical?.GetCounters(counters);
}
......@@ -289,9 +286,13 @@ private void ShutdownSubscriptionQueue()
internal bool TryEnqueueBackgroundSubscriptionWrite(PendingSubscriptionState state)
=> isDisposed ? false : (_subscriptionBackgroundQueue ?? GetSubscriptionQueue()).Writer.TryWrite(state);
internal void GetOutstandingCount(out int inst, out int qs, out int @in)
{// defined as: PendingUnsentItems + SentItemsAwaitingResponse + ResponsesAwaitingAsyncCompletion
internal void GetOutstandingCount(out int inst, out int qs, out int @in, out int qu)
{
inst = (int)(Interlocked.Read(ref operationCount) - Interlocked.Read(ref profileLastLog));
lock(_backlog)
{
qu = _backlog.Count;
}
var tmp = physical;
if (tmp == null)
{
......@@ -434,35 +435,16 @@ internal void OnDisconnected(ConnectionFailureType failureType, PhysicalConnecti
}
}
private Message DequeueNextPendingBacklog()
{
lock (_preconnectBacklog)
{
return _preconnectBacklog.Count == 0 ? null : _preconnectBacklog.Dequeue();
}
}
[Obsolete("prefer async")]
private void WritePendingBacklogSync(PhysicalConnection connection)
{
if (connection != null)
{
Message next;
do
{
next = DequeueNextPendingBacklog();
#pragma warning disable CS0618
if (next != null) WriteMessageTakingWriteLockSync(connection, next);
#pragma warning restore CS0618
} while (next != null);
}
}
private void AbandonPendingBacklog(Exception ex)
{
Message next;
do
{
next = DequeueNextPendingBacklog();
lock (_backlog)
{
next = _backlog.Count == 0 ? null : _backlog.Dequeue();
}
if (next != null)
{
Multiplexer?.OnMessageFaulted(next, ex);
......@@ -480,9 +462,13 @@ internal void OnFullyEstablished(PhysicalConnection connection)
LastException = null;
Interlocked.Exchange(ref failConnectCount, 0);
ServerEndPoint.OnFullyEstablished(connection);
#pragma warning disable CS0618
WritePendingBacklogSync(connection);
#pragma warning restore CS0618
bool createWorker;
lock (_backlog) // do we have pending system things to do?
{
createWorker = _backlog.Count != 0;
}
if (createWorker) StartBacklogProcessor();
if (ConnectionType == ConnectionType.Interactive) ServerEndPoint.CheckInfoReplication();
}
......@@ -664,7 +650,7 @@ private WriteResult WriteMessageInsideLock(PhysicalConnection physical, Message
// killed the underlying connection
Trace("Unable to write to server");
message.Fail(ConnectionFailureType.ProtocolFailure, null, "failure before write: " + result.ToString());
this.CompleteSyncOrAsync(message);
message.Complete();
return result;
}
//The parent message (next) may be returned from GetMessages
......@@ -684,72 +670,142 @@ private WriteResult WriteMessageInsideLock(PhysicalConnection physical, Message
}
}
private async ValueTask<WriteResult> WriteMessageTakingDelayedWriteLockAsync(ValueTask<LockToken> pendingLock, PhysicalConnection physical, Message message)
[Obsolete("prefer async")]
internal WriteResult WriteMessageTakingWriteLockSync(PhysicalConnection physical, Message message)
{
Trace("Writing: " + message);
message.SetEnqueued(physical); // this also records the read/write stats at this point
LockToken token = default;
try
{
// WriteMessageTakingWriteLockAsync will have checked for immediate availability,
// so this is the fallback case - fine to go straight to "await"
// note: timeout is specified in mutex-constructor
using (var token = await pendingLock)
token = _singleWriterMutex.TryWait(WaitOptions.NoDelay);
if (!token.Success)
{
// we can't get it *instantaneously*; is there
// perhaps a backlog and active backlog processor?
bool haveBacklog;
lock (_backlog)
{
haveBacklog = _backlog.Count != 0;
}
if (haveBacklog)
{
PushToBacklog(message);
return WriteResult.Success; // queued counts as success
}
// no backlog... try to wait with the timeout;
// if we *still* can't get it: that counts as
// an actual timeout
token = _singleWriterMutex.TryWait();
if (!token.Success)
{
message.Cancel();
Multiplexer?.OnMessageFaulted(message, null);
this.CompleteSyncOrAsync(message);
message.Complete();
return WriteResult.TimeoutBeforeWrite;
}
}
var result = WriteMessageInsideLock(physical, message);
if (result == WriteResult.Success)
{
result = await physical.FlushAsync(false).ForAwait();
#pragma warning disable CS0618
result = physical.FlushSync(false, TimeoutMilliseconds);
#pragma warning restore CS0618
}
physical.SetIdle();
UnmarkActiveMessage(message);
physical.SetIdle();
return result;
}
}
catch (Exception ex) { return HandleWriteException(message, ex); }
finally { token.Dispose(); }
}
[Obsolete("prefer async")]
internal WriteResult WriteMessageTakingWriteLockSync(PhysicalConnection physical, Message message)
[MethodImpl(MethodImplOptions.AggressiveInlining)]
private void PushToBacklog(Message message)
{
Trace("Writing: " + message);
message.SetEnqueued(physical); // this also records the read/write stats at this point
bool startWorker;
lock (_backlog)
{
startWorker = _backlog.Count == 0;
_backlog.Enqueue(message);
}
if (startWorker) StartBacklogProcessor();
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
private void StartBacklogProcessor()
{
var sched = Multiplexer.SocketManager?.SchedulerPool ?? DedicatedThreadPoolPipeScheduler.Default;
sched.Schedule(s_ProcessBacklog, this);
}
static readonly Action<object> s_ProcessBacklog = s => ((PhysicalBridge)s).ProcessBacklog();
private void ProcessBacklog()
{
LockToken token = default;
try
{
using (var token = _singleWriterMutex.TryWait())
while(true)
{
if (!token.Success)
// try and get the lock; if unsuccessful, check for termination
token = _singleWriterMutex.TryWait();
if (token) break; // got the lock
lock (_backlog) { if (_backlog.Count == 0) return; }
}
// so now we are the writer; write some things!
Message message;
var timeout = TimeoutMilliseconds;
while(true)
{
message.Cancel();
Multiplexer?.OnMessageFaulted(message, null);
this.CompleteSyncOrAsync(message);
return WriteResult.TimeoutBeforeWrite;
lock(_backlog)
{
if (_backlog.Count == 0) break; // all done
message = _backlog.Dequeue();
}
try
{
if (message.HasAsyncTimedOut(Environment.TickCount, timeout, out var elapsed))
{
var ex = Multiplexer.GetException(WriteResult.TimeoutBeforeWrite, message, ServerEndPoint);
message.SetExceptionAndComplete(ex, this);
}
else
{
var result = WriteMessageInsideLock(physical, message);
if (result == WriteResult.Success)
{
#pragma warning disable CS0618
result = physical.FlushSync(false, TimeoutMilliseconds);
result = physical.FlushSync(false, timeout);
#pragma warning restore CS0618
}
UnmarkActiveMessage(message);
if (result != WriteResult.Success)
{
var ex = Multiplexer.GetException(result, message, ServerEndPoint);
HandleWriteException(message, ex);
}
}
}
catch (Exception ex)
{
HandleWriteException(message, ex);
}
}
physical.SetIdle();
return result;
}
finally
{
token.Dispose();
}
catch (Exception ex) { return HandleWriteException(message, ex); }
}
/// <summary>
......@@ -762,23 +818,18 @@ internal ValueTask<WriteResult> WriteMessageTakingWriteLockAsync(PhysicalConnect
Trace("Writing: " + message);
message.SetEnqueued(physical); // this also records the read/write stats at this point
bool releaseLock = false;
bool releaseLock = true;
LockToken token = default;
try
{
// try to acquire it synchronously
// note: timeout is specified in mutex-constructor
var pending = _singleWriterMutex.TryWaitAsync(options: MutexSlim.WaitOptions.DisableAsyncContext);
if (!pending.IsCompletedSuccessfully) return WriteMessageTakingDelayedWriteLockAsync(pending, physical, message);
token = _singleWriterMutex.TryWait(options: WaitOptions.NoDelay);
releaseLock = true;
token = pending.Result; // we can't use "using" for this, because we might not want to kill it yet
if (!token.Success) // (in particular, me might hand the lifetime to CompleteWriteAndReleaseLockAsync)
{
message.Cancel();
Multiplexer?.OnMessageFaulted(message, null);
this.CompleteSyncOrAsync(message);
return new ValueTask<WriteResult>(WriteResult.TimeoutBeforeWrite);
PushToBacklog(message);
return new ValueTask<WriteResult>(WriteResult.Success); // queued counts as success
}
var result = WriteMessageInsideLock(physical, message);
......@@ -1020,7 +1071,7 @@ private WriteResult WriteMessageToServerInsideWriteLock(PhysicalConnection conne
{
Trace("Write failed: " + ex.Message);
message.Fail(ConnectionFailureType.InternalFailure, ex, null);
this.CompleteSyncOrAsync(message);
message.Complete();
// this failed without actually writing; we're OK with that... unless there's a transaction
if (connection?.TransactionActive == true)
......@@ -1035,7 +1086,7 @@ private WriteResult WriteMessageToServerInsideWriteLock(PhysicalConnection conne
{
Trace("Write failed: " + ex.Message);
message.Fail(ConnectionFailureType.InternalFailure, ex, null);
this.CompleteSyncOrAsync(message);
message.Complete();
// we're not sure *what* happened here; probably an IOException; kill the connection
connection?.RecordConnectionFailed(ConnectionFailureType.InternalFailure, ex);
......
......@@ -413,7 +413,7 @@ void add(string lk, string sk, string v)
if (next.Command == RedisCommand.QUIT && next.TrySetResult(true))
{
// fine, death of a socket is close enough
bridge.CompleteSyncOrAsync(next);
next.Complete();
}
else
{
......@@ -467,7 +467,6 @@ internal static void IdentifyFailureType(Exception exception, ref ConnectionFail
internal void EnqueueInsideWriteLock(Message next)
{
next.SetWriteTime();
lock (_writtenAwaitingResponse)
{
_writtenAwaitingResponse.Enqueue(next);
......@@ -860,9 +859,11 @@ internal WriteResult FlushSync(bool throwOnFailure, int millisecondsTimeout)
if (!flush.IsCompletedSuccessfully)
{
// here lies the evil
if (!flush.AsTask().Wait(millisecondsTimeout)) throw new TimeoutException("timeout while synchronously flushing");
if (!flush.AsTask().Wait(millisecondsTimeout)) ThrowTimeout();
}
return flush.Result;
void ThrowTimeout() => throw new TimeoutException("timeout while synchronously flushing");
}
internal ValueTask<WriteResult> FlushAsync(bool throwOnFailure)
{
......@@ -1324,16 +1325,7 @@ private void MatchResult(in RawResult result)
Trace("Response to: " + msg);
if (msg.ComputeResult(this, result))
{
if (msg.TryComplete(false))
{
BridgeCouldBeNull.IncrementSyncCount();
}
else
{
// got a result, and we couldn't complete it synchronously;
// note that we want to complete it async instead
BridgeCouldBeNull.CompleteAsync(msg);
}
msg.Complete();
}
}
......
......@@ -94,11 +94,10 @@ internal override T ExecuteSync<T>(Message message, ResultProcessor<T> processor
private void FailNoServer(List<Message> messages)
{
if (messages == null) return;
var completion = multiplexer.UnprocessableCompletionManager;
foreach(var msg in messages)
{
msg.Fail(ConnectionFailureType.UnableToResolvePhysicalConnection, null, "unable to write batch");
completion.CompleteSyncOrAsync(msg);
msg.Complete();
}
}
}
......
......@@ -36,9 +36,6 @@ void ICompletable.AppendStormLog(StringBuilder sb)
sb.Append("event, error: ").Append(Message);
}
bool ICompletable.TryComplete(bool isAsync)
{
return ConnectionMultiplexer.TryCompleteHandler(handler, sender, this, isAsync);
}
bool ICompletable.TryComplete(bool isAsync) => ConnectionMultiplexer.TryCompleteHandler(handler, sender, this, isAsync);
}
}
......@@ -12,22 +12,30 @@ public partial class ConnectionMultiplexer
{
private readonly Dictionary<RedisChannel, Subscription> subscriptions = new Dictionary<RedisChannel, Subscription>();
internal static bool TryCompleteHandler<T>(EventHandler<T> handler, object sender, T args, bool isAsync) where T : EventArgs
internal static void CompleteAsWorker(ICompletable completable)
{
if (completable != null) ThreadPool.QueueUserWorkItem(s_CompleteAsWorker, completable);
}
static readonly WaitCallback s_CompleteAsWorker = s => ((ICompletable)s).TryComplete(true);
internal static bool TryCompleteHandler<T>(EventHandler<T> handler, object sender, T args, bool isAsync) where T : EventArgs, ICompletable
{
if (handler == null) return true;
if (isAsync)
{
foreach (EventHandler<T> sub in handler.GetInvocationList())
{
try
{ sub.Invoke(sender, args); }
catch
{ }
try { sub.Invoke(sender, args); }
catch { }
}
return true;
}
else
{
return false;
}
}
internal Task AddSubscription(in RedisChannel channel, Action<RedisChannel, RedisValue> handler, CommandFlags flags, object asyncState)
{
......@@ -77,7 +85,7 @@ internal void OnMessage(in RedisChannel subscription, in RedisChannel channel, i
completable = sub.ForInvoke(channel, payload);
}
}
if (completable != null) UnprocessableCompletionManager.CompleteSyncOrAsync(completable);
if (completable != null && !completable.TryComplete(false)) ConnectionMultiplexer.CompleteAsWorker(completable);
}
internal Task RemoveAllSubscriptions(CommandFlags flags, object asyncState)
......@@ -88,7 +96,7 @@ internal Task RemoveAllSubscriptions(CommandFlags flags, object asyncState)
foreach (var pair in subscriptions)
{
var msg = pair.Value.ForSyncShutdown();
if (msg != null) UnprocessableCompletionManager.CompleteSyncOrAsync(msg);
if (msg != null && !msg.TryComplete(false)) ConnectionMultiplexer.CompleteAsWorker(msg);
pair.Value.Remove(true, null);
pair.Value.Remove(false, null);
......
......@@ -73,7 +73,7 @@ internal override Task<T> ExecuteAsync<T>(Message message, ResultProcessor<T> pr
}
else
{
var source = TaskResultBox<T>.Create(out var tcs, asyncState, TaskCreationOptions.RunContinuationsAsynchronously);
var source = TaskResultBox<T>.Create(out var tcs, asyncState);
message.SetSource(source, processor);
task = tcs.Task;
}
......@@ -391,8 +391,9 @@ public IEnumerable<Message> GetMessages(PhysicalConnection connection)
connection.Trace("Aborting: canceling wrapped messages");
foreach (var op in InnerOperations)
{
op.Wrapped.Cancel();
bridge.CompleteSyncOrAsync(op.Wrapped);
var inner = op.Wrapped;
inner.Cancel();
inner.Complete();
}
}
connection.Trace("End of transaction: " + Command);
......@@ -440,11 +441,11 @@ public override bool SetResult(PhysicalConnection connection, Message message, i
if (result.IsError && message is TransactionMessage tran)
{
string error = result.GetString();
var bridge = connection.BridgeCouldBeNull;
foreach (var op in tran.InnerOperations)
{
ServerFail(op.Wrapped, error);
bridge.CompleteSyncOrAsync(op.Wrapped);
var inner = op.Wrapped;
ServerFail(inner, error);
inner.Complete();
}
}
return base.SetResult(connection, message, result);
......@@ -473,8 +474,9 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes
//cancel the commands in the transaction and mark them as complete with the completion manager
foreach (var op in wrapped)
{
op.Wrapped.Cancel();
bridge.CompleteSyncOrAsync(op.Wrapped);
var inner = op.Wrapped;
inner.Cancel();
inner.Complete();
}
SetResult(message, false);
return true;
......@@ -490,8 +492,9 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes
connection.Trace("Server aborted due to failed WATCH");
foreach (var op in wrapped)
{
op.Wrapped.Cancel();
bridge.CompleteSyncOrAsync(op.Wrapped);
var inner = op.Wrapped;
inner.Cancel();
inner.Complete();
}
SetResult(message, false);
return true;
......@@ -502,10 +505,11 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes
connection?.BridgeCouldBeNull?.Multiplexer?.OnTransactionLog($"processing {arr.Length} wrapped messages");
for (int i = 0; i < arr.Length; i++)
{
connection?.BridgeCouldBeNull?.Multiplexer?.OnTransactionLog($"> got {arr[i]} for {wrapped[i].Wrapped.CommandAndKey}");
if (wrapped[i].Wrapped.ComputeResult(connection, arr[i]))
var inner = wrapped[i].Wrapped;
connection?.BridgeCouldBeNull?.Multiplexer?.OnTransactionLog($"> got {arr[i]} for {inner.CommandAndKey}");
if (inner.ComputeResult(connection, arr[i]))
{
bridge.CompleteSyncOrAsync(wrapped[i].Wrapped);
inner.Complete();
}
}
SetResult(message, true);
......@@ -522,7 +526,7 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes
if(inner != null)
{
inner.Fail(ConnectionFailureType.ProtocolFailure, null, "transaction failure");
bridge.CompleteSyncOrAsync(inner);
inner.Complete();
}
}
}
......
......@@ -9,7 +9,7 @@ internal interface IResultBox
bool IsAsync { get; }
bool IsFaulted { get; }
void SetException(Exception ex);
bool TryComplete(bool isAsync);
void ActivateContinuations();
void Cancel();
}
internal interface IResultBox<T> : IResultBox
......@@ -27,14 +27,13 @@ internal abstract class SimpleResultBox : IResultBox
void IResultBox.SetException(Exception exception) => _exception = exception ?? CancelledException;
void IResultBox.Cancel() => _exception = CancelledException;
bool IResultBox.TryComplete(bool isAsync)
void IResultBox.ActivateContinuations()
{
lock (this)
{ // tell the waiting thread that we're done
Monitor.PulseAll(this);
}
ConnectionMultiplexer.TraceWithoutContext("Pulsed", "Result");
return true;
}
// in theory nobody should directly observe this; the only things
......@@ -109,13 +108,8 @@ T IResultBox<T>.GetResult(out Exception ex, bool _)
// nothing to do re recycle: TaskCompletionSource<T> cannot be recycled
}
bool IResultBox.TryComplete(bool isAsync)
void IResultBox.ActivateContinuations()
{
if (isAsync || (Task.CreationOptions & TaskCreationOptions.RunContinuationsAsynchronously) != 0)
{
// either on the async completion step, or the task is guarded
// againsts thread-stealing; complete it directly
// (note: RunContinuationsAsynchronously is only usable from NET46)
var val = _value;
var ex = _exception;
......@@ -127,28 +121,23 @@ bool IResultBox.TryComplete(bool isAsync)
{
if (ex is TaskCanceledException) TrySetCanceled();
else TrySetException(ex);
// mark any exception as observed
var task = Task;
GC.KeepAlive(task.Exception);
GC.SuppressFinalize(task);
}
return true;
}
else
{
// could be thread-stealing continuations; push to async to preserve the reader thread
return false;
GC.KeepAlive(task.Exception); // mark any exception as observed
GC.SuppressFinalize(task); // finalizer only exists for unobserved-exception purposes
}
}
public static IResultBox<T> Create(out TaskCompletionSource<T> source, object asyncState, TaskCreationOptions creationOptions = TaskCreationOptions.None)
public static IResultBox<T> Create(out TaskCompletionSource<T> source, object asyncState)
{
// since 2.0, we only support platforms where this is correctly implemented
const TaskCreationOptions CreationOptions = TaskCreationOptions.RunContinuationsAsynchronously;
// it might look a little odd to return the same object as two different things,
// but that's because it is serving two purposes, and I want to make it clear
// how it is being used in those 2 different ways; also, the *fact* that they
// are the same underlying object is an implementation detail that the rest of
// the code doesn't need to know about
var obj = new TaskResultBox<T>(asyncState, creationOptions);
var obj = new TaskResultBox<T>(asyncState, CreationOptions);
source = obj;
return obj;
}
......
......@@ -375,16 +375,16 @@ internal ServerCounters GetCounters()
return counters;
}
internal void GetOutstandingCount(RedisCommand command, out int inst, out int qs, out int @in)
internal void GetOutstandingCount(RedisCommand command, out int inst, out int qs, out int @in, out int qu)
{
var bridge = GetBridge(command, false);
if (bridge == null)
{
inst = qs = @in = 0;
inst = qs = @in = qu = 0;
}
else
{
bridge.GetOutstandingCount(out inst, out qs, out @in);
bridge.GetOutstandingCount(out inst, out qs, out @in, out qu);
}
}
......
......@@ -55,9 +55,8 @@ public static SocketManager Shared
public override string ToString()
{
var scheduler = SchedulerPool;
var comp = CompletionPool;
return $"scheduler - queue: {scheduler?.TotalServicedByQueue}, pool: {scheduler?.TotalServicedByPool}; completion - queue: {comp ?.TotalServicedByQueue}, pool: {comp?.TotalServicedByPool}";
return $"scheduler - queue: {scheduler?.TotalServicedByQueue}, pool: {scheduler?.TotalServicedByPool}";
}
private static SocketManager _shared;
......@@ -100,16 +99,12 @@ private SocketManager(string name, bool useHighPrioritySocketThreads, int worker
resumeWriterThreshold: Receive_ResumeWriterThreshold,
minimumSegmentSize: Math.Max(defaultPipeOptions.MinimumSegmentSize, MINIMUM_SEGMENT_SIZE),
useSynchronizationContext: false);
_completionPool = new DedicatedThreadPoolPipeScheduler(name + ":Completion",
workerCount: workerCount, useThreadPoolQueueLength: 1);
}
private DedicatedThreadPoolPipeScheduler _schedulerPool, _completionPool;
private DedicatedThreadPoolPipeScheduler _schedulerPool;
internal readonly PipeOptions SendPipeOptions, ReceivePipeOptions;
internal DedicatedThreadPoolPipeScheduler SchedulerPool => _schedulerPool;
internal DedicatedThreadPoolPipeScheduler CompletionPool => _completionPool;
private enum CallbackOperation
{
......@@ -128,9 +123,7 @@ private void Dispose(bool disposing)
// be threads, and those threads will be rooting the DedicatedThreadPool;
// but: we can lend a hand! We need to do this even in the finalizer
try { _schedulerPool?.Dispose(); } catch { }
try { _completionPool?.Dispose(); } catch { }
_schedulerPool = null;
_completionPool = null;
if (disposing)
{
GC.SuppressFinalize(this);
......@@ -165,8 +158,5 @@ internal string GetState()
var s = _schedulerPool;
return s == null ? null : $"{s.AvailableCount} of {s.WorkerCount} available";
}
internal void ScheduleTask(Action<object> action, object state)
=> _completionPool.Schedule(action, state);
}
}
......@@ -25,10 +25,14 @@ public static Task<T> ObserveErrors<T>(this Task<T> task)
return task;
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public static ConfiguredTaskAwaitable ForAwait(this Task task) => task.ConfigureAwait(false);
public static ConfiguredValueTaskAwaitable ForAwait(this ValueTask task) => task.ConfigureAwait(false);
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public static ConfiguredValueTaskAwaitable ForAwait(this in ValueTask task) => task.ConfigureAwait(false);
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public static ConfiguredTaskAwaitable<T> ForAwait<T>(this Task<T> task) => task.ConfigureAwait(false);
public static ConfiguredValueTaskAwaitable<T> ForAwait<T>(this ValueTask<T> task) => task.ConfigureAwait(false);
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public static ConfiguredValueTaskAwaitable<T> ForAwait<T>(this in ValueTask<T> task) => task.ConfigureAwait(false);
internal static void RedisFireAndForget(this Task task) => task?.ContinueWith(t => GC.KeepAlive(t.Exception), TaskContinuationOptions.OnlyOnFaulted);
......
......@@ -326,7 +326,7 @@ public void Teardown(TextWriter output)
}
//Assert.True(false, $"There were {privateFailCount} private ambient exceptions.");
}
TestBase.Log(output, $"Service Counts: (Scheduler) Queue: {SocketManager.Shared?.SchedulerPool?.TotalServicedByQueue.ToString()}, Pool: {SocketManager.Shared?.SchedulerPool?.TotalServicedByPool.ToString()}, (Completion) Queue: {SocketManager.Shared?.CompletionPool?.TotalServicedByQueue.ToString()}, Pool: {SocketManager.Shared?.CompletionPool?.TotalServicedByPool.ToString()}");
TestBase.Log(output, $"Service Counts: (Scheduler) Queue: {SocketManager.Shared?.SchedulerPool?.TotalServicedByQueue.ToString()}, Pool: {SocketManager.Shared?.SchedulerPool?.TotalServicedByPool.ToString()}");
}
}
......
......@@ -180,7 +180,7 @@ public void Teardown()
}
Skip.Inconclusive($"There were {privateFailCount} private and {sharedFailCount.Value} ambient exceptions; expected {expectedFailCount}.");
}
Log($"Service Counts: (Scheduler) Queue: {SocketManager.Shared?.SchedulerPool?.TotalServicedByQueue.ToString()}, Pool: {SocketManager.Shared?.SchedulerPool?.TotalServicedByPool.ToString()}, (Completion) Queue: {SocketManager.Shared?.CompletionPool?.TotalServicedByQueue.ToString()}, Pool: {SocketManager.Shared?.CompletionPool?.TotalServicedByPool.ToString()}");
Log($"Service Counts: (Scheduler) Queue: {SocketManager.Shared?.SchedulerPool?.TotalServicedByQueue.ToString()}, Pool: {SocketManager.Shared?.SchedulerPool?.TotalServicedByPool.ToString()}");
}
protected IServer GetServer(IConnectionMultiplexer muxer)
......
......@@ -16,17 +16,26 @@ public static async Task Main()
var start = DateTime.Now;
Show(client.GetCounters());
var tasks = Enumerable.Range(0, 1000).Select(async i =>
{
int timeoutCount = 0;
RedisKey key = i.ToString();
for (int t = 0; t < 1000; t++)
{
await db.StringIncrementAsync(i.ToString(), 1);
// db.StringIncrement(i.ToString(), 1);
try
{
await db.StringIncrementAsync(key, 1);
}
await Task.Yield();
});
catch (TimeoutException) { timeoutCount++; }
}
return timeoutCount;
}).ToArray();
await Task.WhenAll(tasks);
int totalTimeouts = tasks.Sum(x => x.Result);
Console.WriteLine("Total timeouts: " + totalTimeouts);
Console.WriteLine();
Show(client.GetCounters());
var duration = DateTime.Now.Subtract(start).TotalMilliseconds;
......
......@@ -15,6 +15,6 @@
</ItemGroup>
<ItemGroup>
<PackageReference Include="StackExchange.Redis" Version="[1.2.7-alpha-00002]" />
<PackageReference Include="StackExchange.Redis" Version="[2.0.545]" /> <!-- [1.2.6] for previous major -->
</ItemGroup>
</Project>
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment