Commit d447c833 authored by Marc Gravell's avatar Marc Gravell

Merge branch 'master' of github.com:StackExchange/StackExchange.Redis

# Conflicts:
#	toys/TestConsoleBaseline/TestConsoleBaseline.csproj
parents 152de5fc 4ae78480
using System;
using System.Threading;
namespace StackExchange.Redis
{
internal static class CompletionManagerHelpers
{
public static void CompleteSyncOrAsync(this PhysicalBridge bridge, ICompletable operation)
=> CompletionManager.CompleteSyncOrAsyncImpl(bridge?.completionManager, operation);
public static void IncrementSyncCount(this PhysicalBridge bridge)
=> bridge?.completionManager?.IncrementSyncCount();
public static void CompleteAsync(this PhysicalBridge bridge, ICompletable operation)
=> CompletionManager.CompleteAsync(bridge?.completionManager, operation);
public static void CompleteSyncOrAsync(this CompletionManager manager, ICompletable operation)
=> CompletionManager.CompleteSyncOrAsyncImpl(manager, operation);
}
internal sealed partial class CompletionManager
{
internal static void CompleteSyncOrAsyncImpl(CompletionManager manager, ICompletable operation)
{
if (operation == null) return;
if (manager != null) manager.PerInstanceCompleteSyncOrAsync(operation);
else SharedCompleteSyncOrAsync(operation);
}
internal void IncrementSyncCount() => Interlocked.Increment(ref completedSync);
internal static void CompleteAsync(CompletionManager manager, ICompletable operation)
{
var sched = manager.multiplexer.SocketManager;
if (sched != null)
{
sched.ScheduleTask(s_AnyOrderCompletionHandler, operation);
Interlocked.Increment(ref manager.completedAsync);
}
else
{
SocketManager.Shared.ScheduleTask(s_AnyOrderCompletionHandler, operation);
}
}
private readonly ConnectionMultiplexer multiplexer;
private readonly string name;
private long completedSync, completedAsync, failedAsync;
public CompletionManager(ConnectionMultiplexer multiplexer, string name)
{
this.multiplexer = multiplexer ?? throw new ArgumentNullException(nameof(multiplexer));
this.name = name;
}
private static void SharedCompleteSyncOrAsync(ICompletable operation)
{
if (!operation.TryComplete(false))
{
SocketManager.Shared.ScheduleTask(s_AnyOrderCompletionHandler, operation);
}
}
private void PerInstanceCompleteSyncOrAsync(ICompletable operation)
{
if (operation == null) { }
else if (operation.TryComplete(false))
{
multiplexer.Trace("Completed synchronously: " + operation, name);
Interlocked.Increment(ref completedSync);
}
else
{
multiplexer.Trace("Using thread-pool for asynchronous completion", name);
(multiplexer.SocketManager ?? SocketManager.Shared).ScheduleTask(s_AnyOrderCompletionHandler, operation);
Interlocked.Increment(ref completedAsync); // k, *technically* we haven't actually completed this yet, but: close enough
}
}
internal void GetCounters(ConnectionCounters counters)
{
counters.CompletedSynchronously = Interlocked.Read(ref completedSync);
counters.CompletedAsynchronously = Interlocked.Read(ref completedAsync);
counters.FailedAsynchronously = Interlocked.Read(ref failedAsync);
}
private static readonly Action<object> s_AnyOrderCompletionHandler = AnyOrderCompletionHandler;
private static void AnyOrderCompletionHandler(object state)
{
try
{
ConnectionMultiplexer.TraceWithoutContext("Completing async (any order): " + state);
((ICompletable)state).TryComplete(true);
}
catch (Exception ex)
{
ConnectionMultiplexer.TraceWithoutContext("Async completion error: " + ex.Message);
}
}
}
}
...@@ -51,10 +51,7 @@ void ICompletable.AppendStormLog(StringBuilder sb) ...@@ -51,10 +51,7 @@ void ICompletable.AppendStormLog(StringBuilder sb)
else sb.Append(Format.ToString(EndPoint)); else sb.Append(Format.ToString(EndPoint));
} }
bool ICompletable.TryComplete(bool isAsync) bool ICompletable.TryComplete(bool isAsync) => ConnectionMultiplexer.TryCompleteHandler(handler, sender, this, isAsync);
{
return ConnectionMultiplexer.TryCompleteHandler(handler, sender, this, isAsync);
}
/// <summary> /// <summary>
/// Returns the physical name of the connection. /// Returns the physical name of the connection.
......
...@@ -79,7 +79,6 @@ public ServerCounters GetCounters() ...@@ -79,7 +79,6 @@ public ServerCounters GetCounters()
{ {
counters.Add(snapshot[i].GetCounters()); counters.Add(snapshot[i].GetCounters());
} }
UnprocessableCompletionManager.GetCounters(counters.Other);
return counters; return counters;
} }
...@@ -153,9 +152,8 @@ internal void OnConnectionFailed(EndPoint endpoint, ConnectionType connectionTyp ...@@ -153,9 +152,8 @@ internal void OnConnectionFailed(EndPoint endpoint, ConnectionType connectionTyp
var handler = ConnectionFailed; var handler = ConnectionFailed;
if (handler != null) if (handler != null)
{ {
UnprocessableCompletionManager.CompleteSyncOrAsync( ConnectionMultiplexer.CompleteAsWorker(
new ConnectionFailedEventArgs(handler, this, endpoint, connectionType, failureType, exception, physicalName) new ConnectionFailedEventArgs(handler, this, endpoint, connectionType, failureType, exception, physicalName));
);
} }
if (reconfigure) if (reconfigure)
{ {
...@@ -172,9 +170,8 @@ internal void OnInternalError(Exception exception, EndPoint endpoint = null, Con ...@@ -172,9 +170,8 @@ internal void OnInternalError(Exception exception, EndPoint endpoint = null, Con
var handler = InternalError; var handler = InternalError;
if (handler != null) if (handler != null)
{ {
UnprocessableCompletionManager.CompleteSyncOrAsync( ConnectionMultiplexer.CompleteAsWorker(
new InternalErrorEventArgs(handler, this, endpoint, connectionType, exception, origin) new InternalErrorEventArgs(handler, this, endpoint, connectionType, exception, origin));
);
} }
} }
catch catch
...@@ -188,9 +185,8 @@ internal void OnConnectionRestored(EndPoint endpoint, ConnectionType connectionT ...@@ -188,9 +185,8 @@ internal void OnConnectionRestored(EndPoint endpoint, ConnectionType connectionT
var handler = ConnectionRestored; var handler = ConnectionRestored;
if (handler != null) if (handler != null)
{ {
UnprocessableCompletionManager.CompleteSyncOrAsync( ConnectionMultiplexer.CompleteAsWorker(
new ConnectionFailedEventArgs(handler, this, endpoint, connectionType, ConnectionFailureType.None, null, physicalName) new ConnectionFailedEventArgs(handler, this, endpoint, connectionType, ConnectionFailureType.None, null, physicalName));
);
} }
ReconfigureIfNeeded(endpoint, false, "connection restored"); ReconfigureIfNeeded(endpoint, false, "connection restored");
} }
...@@ -200,9 +196,7 @@ private void OnEndpointChanged(EndPoint endpoint, EventHandler<EndPointEventArgs ...@@ -200,9 +196,7 @@ private void OnEndpointChanged(EndPoint endpoint, EventHandler<EndPointEventArgs
if (_isDisposed) return; if (_isDisposed) return;
if (handler != null) if (handler != null)
{ {
UnprocessableCompletionManager.CompleteSyncOrAsync( ConnectionMultiplexer.CompleteAsWorker(new EndPointEventArgs(handler, this, endpoint));
new EndPointEventArgs(handler, this, endpoint)
);
} }
} }
...@@ -219,7 +213,7 @@ internal void OnErrorMessage(EndPoint endpoint, string message) ...@@ -219,7 +213,7 @@ internal void OnErrorMessage(EndPoint endpoint, string message)
var handler = ErrorMessage; var handler = ErrorMessage;
if (handler != null) if (handler != null)
{ {
UnprocessableCompletionManager.CompleteSyncOrAsync( ConnectionMultiplexer.CompleteAsWorker(
new RedisErrorEventArgs(handler, this, endpoint, message) new RedisErrorEventArgs(handler, this, endpoint, message)
); );
} }
...@@ -748,12 +742,8 @@ private async Task<bool> WaitAllIgnoreErrorsAsync(Task[] tasks, int timeoutMilli ...@@ -748,12 +742,8 @@ private async Task<bool> WaitAllIgnoreErrorsAsync(Task[] tasks, int timeoutMilli
internal void OnHashSlotMoved(int hashSlot, EndPoint old, EndPoint @new) internal void OnHashSlotMoved(int hashSlot, EndPoint old, EndPoint @new)
{ {
var handler = HashSlotMoved; var handler = HashSlotMoved;
if (handler != null) if (handler != null) ConnectionMultiplexer.CompleteAsWorker(
{ new HashSlotMovedEventArgs(handler, this, hashSlot, old, @new));
UnprocessableCompletionManager.CompleteSyncOrAsync(
new HashSlotMovedEventArgs(handler, this, hashSlot, old, @new)
);
}
} }
/// <summary> /// <summary>
...@@ -1055,7 +1045,6 @@ private ConnectionMultiplexer(ConfigurationOptions configuration) ...@@ -1055,7 +1045,6 @@ private ConnectionMultiplexer(ConfigurationOptions configuration)
AsyncTimeoutMilliseconds = configuration.AsyncTimeout; AsyncTimeoutMilliseconds = configuration.AsyncTimeout;
OnCreateReaderWriter(configuration); OnCreateReaderWriter(configuration);
UnprocessableCompletionManager = new CompletionManager(this, "multiplexer");
ServerSelectionStrategy = new ServerSelectionStrategy(this); ServerSelectionStrategy = new ServerSelectionStrategy(this);
var configChannel = configuration.ConfigurationChannel; var configChannel = configuration.ConfigurationChannel;
...@@ -1150,8 +1139,6 @@ internal long LastHeartbeatSecondsAgo ...@@ -1150,8 +1139,6 @@ internal long LastHeartbeatSecondsAgo
internal static long LastGlobalHeartbeatSecondsAgo => unchecked(Environment.TickCount - Thread.VolatileRead(ref lastGlobalHeartbeatTicks)) / 1000; internal static long LastGlobalHeartbeatSecondsAgo => unchecked(Environment.TickCount - Thread.VolatileRead(ref lastGlobalHeartbeatTicks)) / 1000;
internal CompletionManager UnprocessableCompletionManager { get; }
/// <summary> /// <summary>
/// Obtain a pub/sub subscriber connection to the specified server /// Obtain a pub/sub subscriber connection to the specified server
/// </summary> /// </summary>
...@@ -2170,27 +2157,7 @@ internal Exception GetException(WriteResult result, Message message, ServerEndPo ...@@ -2170,27 +2157,7 @@ internal Exception GetException(WriteResult result, Message message, ServerEndPo
case WriteResult.NoConnectionAvailable: case WriteResult.NoConnectionAvailable:
return ExceptionFactory.NoConnectionAvailable(IncludeDetailInExceptions, IncludePerformanceCountersInExceptions, message.Command, message, server, GetServerSnapshot()); return ExceptionFactory.NoConnectionAvailable(IncludeDetailInExceptions, IncludePerformanceCountersInExceptions, message.Command, message, server, GetServerSnapshot());
case WriteResult.TimeoutBeforeWrite: case WriteResult.TimeoutBeforeWrite:
string bridgeCounters = null, connectionState = null; return ExceptionFactory.Timeout(this, "The timeout was reached before the message could be written to the output buffer, and it was not sent", message, server, result);
try
{
if (message.TryGetPhysicalState(out var state, out var sentDelta, out var receivedDelta))
{
connectionState = (sentDelta >= 0 && receivedDelta >= 0) // these might not always be available
? $", state={state}, outbound={sentDelta >> 10}KiB, inbound={receivedDelta >> 10}KiB"
: $", state={state}";
}
var bridge = server.GetBridge(message.Command, false);
if (bridge != null)
{
var active = bridge.GetActiveMessage();
bridge.GetOutstandingCount(out var inst, out var qs, out var @in);
bridgeCounters = $", inst={inst}, qs={qs}, in={@in}, active={active}";
}
}
catch { }
return ExceptionFactory.Timeout(this, "The timeout was reached before the message could be written to the output buffer, and it was not sent ("
+ Format.ToString(TimeoutMilliseconds) + "ms" + connectionState + bridgeCounters + ")", message, server);
case WriteResult.WriteFailure: case WriteResult.WriteFailure:
default: default:
return ExceptionFactory.ConnectionFailure(IncludeDetailInExceptions, ConnectionFailureType.ProtocolFailure, "An unknown error occurred when writing the message", server); return ExceptionFactory.ConnectionFailure(IncludeDetailInExceptions, ConnectionFailureType.ProtocolFailure, "An unknown error occurred when writing the message", server);
......
using System; using System;
using System.Net; using System.Net;
using System.Text; using System.Text;
...@@ -30,9 +30,6 @@ void ICompletable.AppendStormLog(StringBuilder sb) ...@@ -30,9 +30,6 @@ void ICompletable.AppendStormLog(StringBuilder sb)
else sb.Append(Format.ToString(EndPoint)); else sb.Append(Format.ToString(EndPoint));
} }
bool ICompletable.TryComplete(bool isAsync) bool ICompletable.TryComplete(bool isAsync) => ConnectionMultiplexer.TryCompleteHandler(handler, sender, this, isAsync);
{
return ConnectionMultiplexer.TryCompleteHandler(handler, sender, this, isAsync);
}
} }
} }
\ No newline at end of file
...@@ -181,7 +181,7 @@ internal static string GetLibVersion() ...@@ -181,7 +181,7 @@ internal static string GetLibVersion()
} }
return _libVersion; return _libVersion;
} }
internal static Exception Timeout(ConnectionMultiplexer mutiplexer, string baseErrorMessage, Message message, ServerEndPoint server) internal static Exception Timeout(ConnectionMultiplexer mutiplexer, string baseErrorMessage, Message message, ServerEndPoint server, WriteResult? result = null)
{ {
List<Tuple<string, string>> data = new List<Tuple<string, string>> { Tuple.Create("Message", message.CommandAndKey) }; List<Tuple<string, string>> data = new List<Tuple<string, string>> { Tuple.Create("Message", message.CommandAndKey) };
var sb = new StringBuilder(); var sb = new StringBuilder();
...@@ -203,12 +203,38 @@ void add(string lk, string sk, string v) ...@@ -203,12 +203,38 @@ void add(string lk, string sk, string v)
} }
} }
// Add timeout data, if we have it
if (result == WriteResult.TimeoutBeforeWrite)
{
add("Timeout", "timeout", Format.ToString(mutiplexer.TimeoutMilliseconds));
try
{
if (message.TryGetPhysicalState(out var state, out var sentDelta, out var receivedDelta))
{
add("PhysicalState", "phys", state.ToString());
// these might not always be available
if (sentDelta >= 0)
{
add("OutboundDeltaKB", "outbound", $"{sentDelta >> 10}KiB");
}
if (receivedDelta >= 0)
{
add("InboundDeltaKB", "inbound", $"{receivedDelta >> 10}KiB");
}
}
}
catch { }
}
// Add server data, if we have it
if (server != null) if (server != null)
{ {
server.GetOutstandingCount(message.Command, out int inst, out int qs, out int @in); server.GetOutstandingCount(message.Command, out int inst, out int qs, out int @in, out int qu);
add("Instantaneous", "inst", inst.ToString()); add("OpsSinceLastHeartbeat", "inst", inst.ToString());
add("Queue-Awaiting-Write", "qu", qu.ToString());
add("Queue-Awaiting-Response", "qs", qs.ToString()); add("Queue-Awaiting-Response", "qs", qs.ToString());
if (@in >= 0) add("Inbound-Bytes", "in", @in.ToString());
if (@in >= 0) add("Socket-Inbound-Bytes", "in", @in.ToString());
if (mutiplexer.StormLogThreshold >= 0 && qs >= mutiplexer.StormLogThreshold && Interlocked.CompareExchange(ref mutiplexer.haveStormLog, 1, 0) == 0) if (mutiplexer.StormLogThreshold >= 0 && qs >= mutiplexer.StormLogThreshold && Interlocked.CompareExchange(ref mutiplexer.haveStormLog, 1, 0) == 0)
{ {
......
...@@ -37,10 +37,7 @@ public sealed class HashSlotMovedEventArgs : EventArgs, ICompletable ...@@ -37,10 +37,7 @@ public sealed class HashSlotMovedEventArgs : EventArgs, ICompletable
NewEndPoint = @new; NewEndPoint = @new;
} }
bool ICompletable.TryComplete(bool isAsync) bool ICompletable.TryComplete(bool isAsync) => ConnectionMultiplexer.TryCompleteHandler(handler, sender, this, isAsync);
{
return ConnectionMultiplexer.TryCompleteHandler(handler, sender, this, isAsync);
}
void ICompletable.AppendStormLog(StringBuilder sb) void ICompletable.AppendStormLog(StringBuilder sb)
{ {
......
using System; using System;
using System.Net; using System.Net;
using System.Text; using System.Text;
...@@ -47,9 +47,6 @@ void ICompletable.AppendStormLog(StringBuilder sb) ...@@ -47,9 +47,6 @@ void ICompletable.AppendStormLog(StringBuilder sb)
if (EndPoint != null) sb.Append(", ").Append(Format.ToString(EndPoint)); if (EndPoint != null) sb.Append(", ").Append(Format.ToString(EndPoint));
} }
bool ICompletable.TryComplete(bool isAsync) bool ICompletable.TryComplete(bool isAsync) => ConnectionMultiplexer.TryCompleteHandler(handler, sender, this, isAsync);
{
return ConnectionMultiplexer.TryCompleteHandler(handler, sender, this, isAsync);
}
} }
} }
\ No newline at end of file
...@@ -2,6 +2,7 @@ ...@@ -2,6 +2,7 @@
using System.Collections.Generic; using System.Collections.Generic;
using System.IO; using System.IO;
using System.Linq; using System.Linq;
using System.Runtime.CompilerServices;
using System.Text; using System.Text;
using System.Threading; using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
...@@ -441,32 +442,17 @@ public override string ToString() ...@@ -441,32 +442,17 @@ public override string ToString()
public void SetResponseReceived() => performance?.SetResponseReceived(); public void SetResponseReceived() => performance?.SetResponseReceived();
public bool TryComplete(bool isAsync) bool ICompletable.TryComplete(bool isAsync) { Complete(); return true; }
public void Complete()
{ {
//Ensure we can never call TryComplete on the same resultBox from two threads by grabbing it now //Ensure we can never call Complete on the same resultBox from two threads by grabbing it now
var currBox = Interlocked.Exchange(ref resultBox, null); var currBox = Interlocked.Exchange(ref resultBox, null);
if (!isAsync) // set the completion/performance data
{ // set the performance completion the first chance we get (sync comes first) performance?.SetCompleted();
performance?.SetCompleted();
}
if (currBox != null)
{
var ret = currBox.TryComplete(isAsync);
//in async mode TryComplete will have unwrapped and recycled resultBox currBox?.ActivateContinuations();
if (!(ret || isAsync))
{
//put result box back if it was not already recycled
Interlocked.Exchange(ref resultBox, currBox);
}
return ret;
}
else
{
ConnectionMultiplexer.TraceWithoutContext("No result-box to complete for " + Command, "Message");
return true;
}
} }
internal static Message Create(int db, CommandFlags flags, RedisCommand command, in RedisKey key, RedisKey[] keys) internal static Message Create(int db, CommandFlags flags, RedisCommand command, in RedisKey key, RedisKey[] keys)
...@@ -614,7 +600,7 @@ internal void Fail(ConnectionFailureType failure, Exception innerException, stri ...@@ -614,7 +600,7 @@ internal void Fail(ConnectionFailureType failure, Exception innerException, stri
internal virtual void SetExceptionAndComplete(Exception exception, PhysicalBridge bridge) internal virtual void SetExceptionAndComplete(Exception exception, PhysicalBridge bridge)
{ {
resultBox?.SetException(exception); resultBox?.SetException(exception);
bridge.CompleteSyncOrAsync(this); Complete();
} }
internal bool TrySetResult<T>(T value) internal bool TrySetResult<T>(T value)
...@@ -629,6 +615,7 @@ internal bool TrySetResult<T>(T value) ...@@ -629,6 +615,7 @@ internal bool TrySetResult<T>(T value)
internal void SetEnqueued(PhysicalConnection connection) internal void SetEnqueued(PhysicalConnection connection)
{ {
SetWriteTime();
performance?.SetEnqueued(); performance?.SetEnqueued();
_enqueuedTo = connection; _enqueuedTo = connection;
if (connection == null) if (connection == null)
...@@ -670,6 +657,7 @@ internal void SetRequestSent() ...@@ -670,6 +657,7 @@ internal void SetRequestSent()
} }
// the time (ticks) at which this message was considered written // the time (ticks) at which this message was considered written
[MethodImpl(MethodImplOptions.AggressiveInlining)]
internal void SetWriteTime() internal void SetWriteTime()
{ {
if ((Flags & NeedsAsyncTimeoutCheckFlag) != 0) if ((Flags & NeedsAsyncTimeoutCheckFlag) != 0)
......
...@@ -7,6 +7,7 @@ ...@@ -7,6 +7,7 @@
using System.Threading; using System.Threading;
using System.Threading.Channels; using System.Threading.Channels;
using System.Threading.Tasks; using System.Threading.Tasks;
using Pipelines.Sockets.Unofficial;
using Pipelines.Sockets.Unofficial.Threading; using Pipelines.Sockets.Unofficial.Threading;
using static Pipelines.Sockets.Unofficial.Threading.MutexSlim; using static Pipelines.Sockets.Unofficial.Threading.MutexSlim;
using PendingSubscriptionState = global::StackExchange.Redis.ConnectionMultiplexer.Subscription.PendingSubscriptionState; using PendingSubscriptionState = global::StackExchange.Redis.ConnectionMultiplexer.Subscription.PendingSubscriptionState;
...@@ -23,10 +24,9 @@ internal sealed class PhysicalBridge : IDisposable ...@@ -23,10 +24,9 @@ internal sealed class PhysicalBridge : IDisposable
private static readonly Message ReusableAskingCommand = Message.Create(-1, CommandFlags.FireAndForget, RedisCommand.ASKING); private static readonly Message ReusableAskingCommand = Message.Create(-1, CommandFlags.FireAndForget, RedisCommand.ASKING);
internal readonly CompletionManager completionManager;
private readonly long[] profileLog = new long[ProfileLogSamples]; private readonly long[] profileLog = new long[ProfileLogSamples];
private readonly Queue<Message> _preconnectBacklog = new Queue<Message>(); private readonly Queue<Message> _backlog = new Queue<Message>();
private int activeWriters = 0; private int activeWriters = 0;
private int beating; private int beating;
...@@ -52,7 +52,6 @@ public PhysicalBridge(ServerEndPoint serverEndPoint, ConnectionType type, int ti ...@@ -52,7 +52,6 @@ public PhysicalBridge(ServerEndPoint serverEndPoint, ConnectionType type, int ti
ConnectionType = type; ConnectionType = type;
Multiplexer = serverEndPoint.Multiplexer; Multiplexer = serverEndPoint.Multiplexer;
Name = Format.ToString(serverEndPoint.EndPoint) + "/" + ConnectionType.ToString(); Name = Format.ToString(serverEndPoint.EndPoint) + "/" + ConnectionType.ToString();
completionManager = new CompletionManager(Multiplexer, Name);
TimeoutMilliseconds = timeoutMilliseconds; TimeoutMilliseconds = timeoutMilliseconds;
_singleWriterMutex = new MutexSlim(timeoutMilliseconds: timeoutMilliseconds); _singleWriterMutex = new MutexSlim(timeoutMilliseconds: timeoutMilliseconds);
} }
...@@ -135,10 +134,9 @@ private WriteResult QueueOrFailMessage(Message message) ...@@ -135,10 +134,9 @@ private WriteResult QueueOrFailMessage(Message message)
{ {
// you can go in the queue, but we won't be starting // you can go in the queue, but we won't be starting
// a worker, because the handshake has not completed // a worker, because the handshake has not completed
var queue = _preconnectBacklog; lock (_backlog)
lock (queue)
{ {
queue.Enqueue(message); _backlog.Enqueue(message);
} }
message.SetEnqueued(null); message.SetEnqueued(null);
return WriteResult.Success; // we'll take it... return WriteResult.Success; // we'll take it...
...@@ -148,7 +146,7 @@ private WriteResult QueueOrFailMessage(Message message) ...@@ -148,7 +146,7 @@ private WriteResult QueueOrFailMessage(Message message)
// sorry, we're just not ready for you yet; // sorry, we're just not ready for you yet;
message.Cancel(); message.Cancel();
Multiplexer?.OnMessageFaulted(message, null); Multiplexer?.OnMessageFaulted(message, null);
this.CompleteSyncOrAsync(message); message.Complete();
return WriteResult.NoConnectionAvailable; return WriteResult.NoConnectionAvailable;
} }
} }
...@@ -157,7 +155,7 @@ private WriteResult FailDueToNoConnection(Message message) ...@@ -157,7 +155,7 @@ private WriteResult FailDueToNoConnection(Message message)
{ {
message.Cancel(); message.Cancel();
Multiplexer?.OnMessageFaulted(message, null); Multiplexer?.OnMessageFaulted(message, null);
this.CompleteSyncOrAsync(message); message.Complete();
return WriteResult.NoConnectionAvailable; return WriteResult.NoConnectionAvailable;
} }
...@@ -221,7 +219,6 @@ internal void GetCounters(ConnectionCounters counters) ...@@ -221,7 +219,6 @@ internal void GetCounters(ConnectionCounters counters)
counters.SocketCount = Interlocked.Read(ref socketCount); counters.SocketCount = Interlocked.Read(ref socketCount);
counters.WriterCount = Interlocked.CompareExchange(ref activeWriters, 0, 0); counters.WriterCount = Interlocked.CompareExchange(ref activeWriters, 0, 0);
counters.NonPreferredEndpointCount = Interlocked.Read(ref nonPreferredEndpointCount); counters.NonPreferredEndpointCount = Interlocked.Read(ref nonPreferredEndpointCount);
completionManager.GetCounters(counters);
physical?.GetCounters(counters); physical?.GetCounters(counters);
} }
...@@ -289,9 +286,13 @@ private void ShutdownSubscriptionQueue() ...@@ -289,9 +286,13 @@ private void ShutdownSubscriptionQueue()
internal bool TryEnqueueBackgroundSubscriptionWrite(PendingSubscriptionState state) internal bool TryEnqueueBackgroundSubscriptionWrite(PendingSubscriptionState state)
=> isDisposed ? false : (_subscriptionBackgroundQueue ?? GetSubscriptionQueue()).Writer.TryWrite(state); => isDisposed ? false : (_subscriptionBackgroundQueue ?? GetSubscriptionQueue()).Writer.TryWrite(state);
internal void GetOutstandingCount(out int inst, out int qs, out int @in) internal void GetOutstandingCount(out int inst, out int qs, out int @in, out int qu)
{// defined as: PendingUnsentItems + SentItemsAwaitingResponse + ResponsesAwaitingAsyncCompletion {
inst = (int)(Interlocked.Read(ref operationCount) - Interlocked.Read(ref profileLastLog)); inst = (int)(Interlocked.Read(ref operationCount) - Interlocked.Read(ref profileLastLog));
lock(_backlog)
{
qu = _backlog.Count;
}
var tmp = physical; var tmp = physical;
if (tmp == null) if (tmp == null)
{ {
...@@ -434,35 +435,16 @@ internal void OnDisconnected(ConnectionFailureType failureType, PhysicalConnecti ...@@ -434,35 +435,16 @@ internal void OnDisconnected(ConnectionFailureType failureType, PhysicalConnecti
} }
} }
private Message DequeueNextPendingBacklog()
{
lock (_preconnectBacklog)
{
return _preconnectBacklog.Count == 0 ? null : _preconnectBacklog.Dequeue();
}
}
[Obsolete("prefer async")]
private void WritePendingBacklogSync(PhysicalConnection connection)
{
if (connection != null)
{
Message next;
do
{
next = DequeueNextPendingBacklog();
#pragma warning disable CS0618
if (next != null) WriteMessageTakingWriteLockSync(connection, next);
#pragma warning restore CS0618
} while (next != null);
}
}
private void AbandonPendingBacklog(Exception ex) private void AbandonPendingBacklog(Exception ex)
{ {
Message next; Message next;
do do
{ {
next = DequeueNextPendingBacklog();
lock (_backlog)
{
next = _backlog.Count == 0 ? null : _backlog.Dequeue();
}
if (next != null) if (next != null)
{ {
Multiplexer?.OnMessageFaulted(next, ex); Multiplexer?.OnMessageFaulted(next, ex);
...@@ -480,9 +462,13 @@ internal void OnFullyEstablished(PhysicalConnection connection) ...@@ -480,9 +462,13 @@ internal void OnFullyEstablished(PhysicalConnection connection)
LastException = null; LastException = null;
Interlocked.Exchange(ref failConnectCount, 0); Interlocked.Exchange(ref failConnectCount, 0);
ServerEndPoint.OnFullyEstablished(connection); ServerEndPoint.OnFullyEstablished(connection);
#pragma warning disable CS0618
WritePendingBacklogSync(connection); bool createWorker;
#pragma warning restore CS0618 lock (_backlog) // do we have pending system things to do?
{
createWorker = _backlog.Count != 0;
}
if (createWorker) StartBacklogProcessor();
if (ConnectionType == ConnectionType.Interactive) ServerEndPoint.CheckInfoReplication(); if (ConnectionType == ConnectionType.Interactive) ServerEndPoint.CheckInfoReplication();
} }
...@@ -664,7 +650,7 @@ private WriteResult WriteMessageInsideLock(PhysicalConnection physical, Message ...@@ -664,7 +650,7 @@ private WriteResult WriteMessageInsideLock(PhysicalConnection physical, Message
// killed the underlying connection // killed the underlying connection
Trace("Unable to write to server"); Trace("Unable to write to server");
message.Fail(ConnectionFailureType.ProtocolFailure, null, "failure before write: " + result.ToString()); message.Fail(ConnectionFailureType.ProtocolFailure, null, "failure before write: " + result.ToString());
this.CompleteSyncOrAsync(message); message.Complete();
return result; return result;
} }
//The parent message (next) may be returned from GetMessages //The parent message (next) may be returned from GetMessages
...@@ -684,72 +670,142 @@ private WriteResult WriteMessageInsideLock(PhysicalConnection physical, Message ...@@ -684,72 +670,142 @@ private WriteResult WriteMessageInsideLock(PhysicalConnection physical, Message
} }
} }
private async ValueTask<WriteResult> WriteMessageTakingDelayedWriteLockAsync(ValueTask<LockToken> pendingLock, PhysicalConnection physical, Message message) [Obsolete("prefer async")]
internal WriteResult WriteMessageTakingWriteLockSync(PhysicalConnection physical, Message message)
{ {
Trace("Writing: " + message);
message.SetEnqueued(physical); // this also records the read/write stats at this point
LockToken token = default;
try try
{ {
// WriteMessageTakingWriteLockAsync will have checked for immediate availability, token = _singleWriterMutex.TryWait(WaitOptions.NoDelay);
// so this is the fallback case - fine to go straight to "await" if (!token.Success)
// note: timeout is specified in mutex-constructor
using (var token = await pendingLock)
{ {
// we can't get it *instantaneously*; is there
// perhaps a backlog and active backlog processor?
bool haveBacklog;
lock (_backlog)
{
haveBacklog = _backlog.Count != 0;
}
if (haveBacklog)
{
PushToBacklog(message);
return WriteResult.Success; // queued counts as success
}
// no backlog... try to wait with the timeout;
// if we *still* can't get it: that counts as
// an actual timeout
token = _singleWriterMutex.TryWait();
if (!token.Success) if (!token.Success)
{ {
message.Cancel(); message.Cancel();
Multiplexer?.OnMessageFaulted(message, null); Multiplexer?.OnMessageFaulted(message, null);
this.CompleteSyncOrAsync(message); message.Complete();
return WriteResult.TimeoutBeforeWrite; return WriteResult.TimeoutBeforeWrite;
} }
}
var result = WriteMessageInsideLock(physical, message); var result = WriteMessageInsideLock(physical, message);
if (result == WriteResult.Success)
{
result = await physical.FlushAsync(false).ForAwait();
}
physical.SetIdle(); if (result == WriteResult.Success)
UnmarkActiveMessage(message); {
return result; #pragma warning disable CS0618
result = physical.FlushSync(false, TimeoutMilliseconds);
#pragma warning restore CS0618
} }
UnmarkActiveMessage(message);
physical.SetIdle();
return result;
} }
catch (Exception ex) { return HandleWriteException(message, ex); } catch (Exception ex) { return HandleWriteException(message, ex); }
finally { token.Dispose(); }
} }
[Obsolete("prefer async")] [MethodImpl(MethodImplOptions.AggressiveInlining)]
internal WriteResult WriteMessageTakingWriteLockSync(PhysicalConnection physical, Message message) private void PushToBacklog(Message message)
{ {
Trace("Writing: " + message); bool startWorker;
message.SetEnqueued(physical); // this also records the read/write stats at this point lock (_backlog)
{
startWorker = _backlog.Count == 0;
_backlog.Enqueue(message);
}
if (startWorker) StartBacklogProcessor();
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
private void StartBacklogProcessor()
{
var sched = Multiplexer.SocketManager?.SchedulerPool ?? DedicatedThreadPoolPipeScheduler.Default;
sched.Schedule(s_ProcessBacklog, this);
}
static readonly Action<object> s_ProcessBacklog = s => ((PhysicalBridge)s).ProcessBacklog();
private void ProcessBacklog()
{
LockToken token = default;
try try
{ {
using (var token = _singleWriterMutex.TryWait()) while(true)
{ {
if (!token.Success) // try and get the lock; if unsuccessful, check for termination
token = _singleWriterMutex.TryWait();
if (token) break; // got the lock
lock (_backlog) { if (_backlog.Count == 0) return; }
}
// so now we are the writer; write some things!
Message message;
var timeout = TimeoutMilliseconds;
while(true)
{
lock(_backlog)
{ {
message.Cancel(); if (_backlog.Count == 0) break; // all done
Multiplexer?.OnMessageFaulted(message, null); message = _backlog.Dequeue();
this.CompleteSyncOrAsync(message);
return WriteResult.TimeoutBeforeWrite;
} }
var result = WriteMessageInsideLock(physical, message); try
if (result == WriteResult.Success)
{ {
if (message.HasAsyncTimedOut(Environment.TickCount, timeout, out var elapsed))
{
var ex = Multiplexer.GetException(WriteResult.TimeoutBeforeWrite, message, ServerEndPoint);
message.SetExceptionAndComplete(ex, this);
}
else
{
var result = WriteMessageInsideLock(physical, message);
if (result == WriteResult.Success)
{
#pragma warning disable CS0618 #pragma warning disable CS0618
result = physical.FlushSync(false, TimeoutMilliseconds); result = physical.FlushSync(false, timeout);
#pragma warning restore CS0618 #pragma warning restore CS0618
} }
UnmarkActiveMessage(message); UnmarkActiveMessage(message);
physical.SetIdle(); if (result != WriteResult.Success)
return result; {
var ex = Multiplexer.GetException(result, message, ServerEndPoint);
HandleWriteException(message, ex);
}
}
}
catch (Exception ex)
{
HandleWriteException(message, ex);
}
} }
physical.SetIdle();
}
finally
{
token.Dispose();
} }
catch (Exception ex) { return HandleWriteException(message, ex); }
} }
/// <summary> /// <summary>
...@@ -762,23 +818,18 @@ internal ValueTask<WriteResult> WriteMessageTakingWriteLockAsync(PhysicalConnect ...@@ -762,23 +818,18 @@ internal ValueTask<WriteResult> WriteMessageTakingWriteLockAsync(PhysicalConnect
Trace("Writing: " + message); Trace("Writing: " + message);
message.SetEnqueued(physical); // this also records the read/write stats at this point message.SetEnqueued(physical); // this also records the read/write stats at this point
bool releaseLock = false; bool releaseLock = true;
LockToken token = default; LockToken token = default;
try try
{ {
// try to acquire it synchronously // try to acquire it synchronously
// note: timeout is specified in mutex-constructor // note: timeout is specified in mutex-constructor
var pending = _singleWriterMutex.TryWaitAsync(options: MutexSlim.WaitOptions.DisableAsyncContext); token = _singleWriterMutex.TryWait(options: WaitOptions.NoDelay);
if (!pending.IsCompletedSuccessfully) return WriteMessageTakingDelayedWriteLockAsync(pending, physical, message);
releaseLock = true;
token = pending.Result; // we can't use "using" for this, because we might not want to kill it yet
if (!token.Success) // (in particular, me might hand the lifetime to CompleteWriteAndReleaseLockAsync) if (!token.Success) // (in particular, me might hand the lifetime to CompleteWriteAndReleaseLockAsync)
{ {
message.Cancel(); PushToBacklog(message);
Multiplexer?.OnMessageFaulted(message, null); return new ValueTask<WriteResult>(WriteResult.Success); // queued counts as success
this.CompleteSyncOrAsync(message);
return new ValueTask<WriteResult>(WriteResult.TimeoutBeforeWrite);
} }
var result = WriteMessageInsideLock(physical, message); var result = WriteMessageInsideLock(physical, message);
...@@ -1020,7 +1071,7 @@ private WriteResult WriteMessageToServerInsideWriteLock(PhysicalConnection conne ...@@ -1020,7 +1071,7 @@ private WriteResult WriteMessageToServerInsideWriteLock(PhysicalConnection conne
{ {
Trace("Write failed: " + ex.Message); Trace("Write failed: " + ex.Message);
message.Fail(ConnectionFailureType.InternalFailure, ex, null); message.Fail(ConnectionFailureType.InternalFailure, ex, null);
this.CompleteSyncOrAsync(message); message.Complete();
// this failed without actually writing; we're OK with that... unless there's a transaction // this failed without actually writing; we're OK with that... unless there's a transaction
if (connection?.TransactionActive == true) if (connection?.TransactionActive == true)
...@@ -1035,7 +1086,7 @@ private WriteResult WriteMessageToServerInsideWriteLock(PhysicalConnection conne ...@@ -1035,7 +1086,7 @@ private WriteResult WriteMessageToServerInsideWriteLock(PhysicalConnection conne
{ {
Trace("Write failed: " + ex.Message); Trace("Write failed: " + ex.Message);
message.Fail(ConnectionFailureType.InternalFailure, ex, null); message.Fail(ConnectionFailureType.InternalFailure, ex, null);
this.CompleteSyncOrAsync(message); message.Complete();
// we're not sure *what* happened here; probably an IOException; kill the connection // we're not sure *what* happened here; probably an IOException; kill the connection
connection?.RecordConnectionFailed(ConnectionFailureType.InternalFailure, ex); connection?.RecordConnectionFailed(ConnectionFailureType.InternalFailure, ex);
......
...@@ -413,7 +413,7 @@ void add(string lk, string sk, string v) ...@@ -413,7 +413,7 @@ void add(string lk, string sk, string v)
if (next.Command == RedisCommand.QUIT && next.TrySetResult(true)) if (next.Command == RedisCommand.QUIT && next.TrySetResult(true))
{ {
// fine, death of a socket is close enough // fine, death of a socket is close enough
bridge.CompleteSyncOrAsync(next); next.Complete();
} }
else else
{ {
...@@ -467,7 +467,6 @@ internal static void IdentifyFailureType(Exception exception, ref ConnectionFail ...@@ -467,7 +467,6 @@ internal static void IdentifyFailureType(Exception exception, ref ConnectionFail
internal void EnqueueInsideWriteLock(Message next) internal void EnqueueInsideWriteLock(Message next)
{ {
next.SetWriteTime();
lock (_writtenAwaitingResponse) lock (_writtenAwaitingResponse)
{ {
_writtenAwaitingResponse.Enqueue(next); _writtenAwaitingResponse.Enqueue(next);
...@@ -860,9 +859,11 @@ internal WriteResult FlushSync(bool throwOnFailure, int millisecondsTimeout) ...@@ -860,9 +859,11 @@ internal WriteResult FlushSync(bool throwOnFailure, int millisecondsTimeout)
if (!flush.IsCompletedSuccessfully) if (!flush.IsCompletedSuccessfully)
{ {
// here lies the evil // here lies the evil
if (!flush.AsTask().Wait(millisecondsTimeout)) throw new TimeoutException("timeout while synchronously flushing"); if (!flush.AsTask().Wait(millisecondsTimeout)) ThrowTimeout();
} }
return flush.Result; return flush.Result;
void ThrowTimeout() => throw new TimeoutException("timeout while synchronously flushing");
} }
internal ValueTask<WriteResult> FlushAsync(bool throwOnFailure) internal ValueTask<WriteResult> FlushAsync(bool throwOnFailure)
{ {
...@@ -1324,16 +1325,7 @@ private void MatchResult(in RawResult result) ...@@ -1324,16 +1325,7 @@ private void MatchResult(in RawResult result)
Trace("Response to: " + msg); Trace("Response to: " + msg);
if (msg.ComputeResult(this, result)) if (msg.ComputeResult(this, result))
{ {
if (msg.TryComplete(false)) msg.Complete();
{
BridgeCouldBeNull.IncrementSyncCount();
}
else
{
// got a result, and we couldn't complete it synchronously;
// note that we want to complete it async instead
BridgeCouldBeNull.CompleteAsync(msg);
}
} }
} }
......
...@@ -94,11 +94,10 @@ internal override T ExecuteSync<T>(Message message, ResultProcessor<T> processor ...@@ -94,11 +94,10 @@ internal override T ExecuteSync<T>(Message message, ResultProcessor<T> processor
private void FailNoServer(List<Message> messages) private void FailNoServer(List<Message> messages)
{ {
if (messages == null) return; if (messages == null) return;
var completion = multiplexer.UnprocessableCompletionManager;
foreach(var msg in messages) foreach(var msg in messages)
{ {
msg.Fail(ConnectionFailureType.UnableToResolvePhysicalConnection, null, "unable to write batch"); msg.Fail(ConnectionFailureType.UnableToResolvePhysicalConnection, null, "unable to write batch");
completion.CompleteSyncOrAsync(msg); msg.Complete();
} }
} }
} }
......
...@@ -36,9 +36,6 @@ void ICompletable.AppendStormLog(StringBuilder sb) ...@@ -36,9 +36,6 @@ void ICompletable.AppendStormLog(StringBuilder sb)
sb.Append("event, error: ").Append(Message); sb.Append("event, error: ").Append(Message);
} }
bool ICompletable.TryComplete(bool isAsync) bool ICompletable.TryComplete(bool isAsync) => ConnectionMultiplexer.TryCompleteHandler(handler, sender, this, isAsync);
{
return ConnectionMultiplexer.TryCompleteHandler(handler, sender, this, isAsync);
}
} }
} }
...@@ -12,21 +12,29 @@ public partial class ConnectionMultiplexer ...@@ -12,21 +12,29 @@ public partial class ConnectionMultiplexer
{ {
private readonly Dictionary<RedisChannel, Subscription> subscriptions = new Dictionary<RedisChannel, Subscription>(); private readonly Dictionary<RedisChannel, Subscription> subscriptions = new Dictionary<RedisChannel, Subscription>();
internal static bool TryCompleteHandler<T>(EventHandler<T> handler, object sender, T args, bool isAsync) where T : EventArgs internal static void CompleteAsWorker(ICompletable completable)
{
if (completable != null) ThreadPool.QueueUserWorkItem(s_CompleteAsWorker, completable);
}
static readonly WaitCallback s_CompleteAsWorker = s => ((ICompletable)s).TryComplete(true);
internal static bool TryCompleteHandler<T>(EventHandler<T> handler, object sender, T args, bool isAsync) where T : EventArgs, ICompletable
{ {
if (handler == null) return true; if (handler == null) return true;
if (isAsync) if (isAsync)
{ {
foreach (EventHandler<T> sub in handler.GetInvocationList()) foreach (EventHandler<T> sub in handler.GetInvocationList())
{ {
try try { sub.Invoke(sender, args); }
{ sub.Invoke(sender, args); } catch { }
catch
{ }
} }
return true; return true;
} }
return false; else
{
return false;
}
} }
internal Task AddSubscription(in RedisChannel channel, Action<RedisChannel, RedisValue> handler, CommandFlags flags, object asyncState) internal Task AddSubscription(in RedisChannel channel, Action<RedisChannel, RedisValue> handler, CommandFlags flags, object asyncState)
...@@ -77,7 +85,7 @@ internal void OnMessage(in RedisChannel subscription, in RedisChannel channel, i ...@@ -77,7 +85,7 @@ internal void OnMessage(in RedisChannel subscription, in RedisChannel channel, i
completable = sub.ForInvoke(channel, payload); completable = sub.ForInvoke(channel, payload);
} }
} }
if (completable != null) UnprocessableCompletionManager.CompleteSyncOrAsync(completable); if (completable != null && !completable.TryComplete(false)) ConnectionMultiplexer.CompleteAsWorker(completable);
} }
internal Task RemoveAllSubscriptions(CommandFlags flags, object asyncState) internal Task RemoveAllSubscriptions(CommandFlags flags, object asyncState)
...@@ -88,7 +96,7 @@ internal Task RemoveAllSubscriptions(CommandFlags flags, object asyncState) ...@@ -88,7 +96,7 @@ internal Task RemoveAllSubscriptions(CommandFlags flags, object asyncState)
foreach (var pair in subscriptions) foreach (var pair in subscriptions)
{ {
var msg = pair.Value.ForSyncShutdown(); var msg = pair.Value.ForSyncShutdown();
if (msg != null) UnprocessableCompletionManager.CompleteSyncOrAsync(msg); if (msg != null && !msg.TryComplete(false)) ConnectionMultiplexer.CompleteAsWorker(msg);
pair.Value.Remove(true, null); pair.Value.Remove(true, null);
pair.Value.Remove(false, null); pair.Value.Remove(false, null);
......
...@@ -73,7 +73,7 @@ internal override Task<T> ExecuteAsync<T>(Message message, ResultProcessor<T> pr ...@@ -73,7 +73,7 @@ internal override Task<T> ExecuteAsync<T>(Message message, ResultProcessor<T> pr
} }
else else
{ {
var source = TaskResultBox<T>.Create(out var tcs, asyncState, TaskCreationOptions.RunContinuationsAsynchronously); var source = TaskResultBox<T>.Create(out var tcs, asyncState);
message.SetSource(source, processor); message.SetSource(source, processor);
task = tcs.Task; task = tcs.Task;
} }
...@@ -391,8 +391,9 @@ public IEnumerable<Message> GetMessages(PhysicalConnection connection) ...@@ -391,8 +391,9 @@ public IEnumerable<Message> GetMessages(PhysicalConnection connection)
connection.Trace("Aborting: canceling wrapped messages"); connection.Trace("Aborting: canceling wrapped messages");
foreach (var op in InnerOperations) foreach (var op in InnerOperations)
{ {
op.Wrapped.Cancel(); var inner = op.Wrapped;
bridge.CompleteSyncOrAsync(op.Wrapped); inner.Cancel();
inner.Complete();
} }
} }
connection.Trace("End of transaction: " + Command); connection.Trace("End of transaction: " + Command);
...@@ -440,11 +441,11 @@ public override bool SetResult(PhysicalConnection connection, Message message, i ...@@ -440,11 +441,11 @@ public override bool SetResult(PhysicalConnection connection, Message message, i
if (result.IsError && message is TransactionMessage tran) if (result.IsError && message is TransactionMessage tran)
{ {
string error = result.GetString(); string error = result.GetString();
var bridge = connection.BridgeCouldBeNull;
foreach (var op in tran.InnerOperations) foreach (var op in tran.InnerOperations)
{ {
ServerFail(op.Wrapped, error); var inner = op.Wrapped;
bridge.CompleteSyncOrAsync(op.Wrapped); ServerFail(inner, error);
inner.Complete();
} }
} }
return base.SetResult(connection, message, result); return base.SetResult(connection, message, result);
...@@ -473,8 +474,9 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes ...@@ -473,8 +474,9 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes
//cancel the commands in the transaction and mark them as complete with the completion manager //cancel the commands in the transaction and mark them as complete with the completion manager
foreach (var op in wrapped) foreach (var op in wrapped)
{ {
op.Wrapped.Cancel(); var inner = op.Wrapped;
bridge.CompleteSyncOrAsync(op.Wrapped); inner.Cancel();
inner.Complete();
} }
SetResult(message, false); SetResult(message, false);
return true; return true;
...@@ -490,8 +492,9 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes ...@@ -490,8 +492,9 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes
connection.Trace("Server aborted due to failed WATCH"); connection.Trace("Server aborted due to failed WATCH");
foreach (var op in wrapped) foreach (var op in wrapped)
{ {
op.Wrapped.Cancel(); var inner = op.Wrapped;
bridge.CompleteSyncOrAsync(op.Wrapped); inner.Cancel();
inner.Complete();
} }
SetResult(message, false); SetResult(message, false);
return true; return true;
...@@ -502,10 +505,11 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes ...@@ -502,10 +505,11 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes
connection?.BridgeCouldBeNull?.Multiplexer?.OnTransactionLog($"processing {arr.Length} wrapped messages"); connection?.BridgeCouldBeNull?.Multiplexer?.OnTransactionLog($"processing {arr.Length} wrapped messages");
for (int i = 0; i < arr.Length; i++) for (int i = 0; i < arr.Length; i++)
{ {
connection?.BridgeCouldBeNull?.Multiplexer?.OnTransactionLog($"> got {arr[i]} for {wrapped[i].Wrapped.CommandAndKey}"); var inner = wrapped[i].Wrapped;
if (wrapped[i].Wrapped.ComputeResult(connection, arr[i])) connection?.BridgeCouldBeNull?.Multiplexer?.OnTransactionLog($"> got {arr[i]} for {inner.CommandAndKey}");
if (inner.ComputeResult(connection, arr[i]))
{ {
bridge.CompleteSyncOrAsync(wrapped[i].Wrapped); inner.Complete();
} }
} }
SetResult(message, true); SetResult(message, true);
...@@ -522,7 +526,7 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes ...@@ -522,7 +526,7 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes
if(inner != null) if(inner != null)
{ {
inner.Fail(ConnectionFailureType.ProtocolFailure, null, "transaction failure"); inner.Fail(ConnectionFailureType.ProtocolFailure, null, "transaction failure");
bridge.CompleteSyncOrAsync(inner); inner.Complete();
} }
} }
} }
......
...@@ -9,7 +9,7 @@ internal interface IResultBox ...@@ -9,7 +9,7 @@ internal interface IResultBox
bool IsAsync { get; } bool IsAsync { get; }
bool IsFaulted { get; } bool IsFaulted { get; }
void SetException(Exception ex); void SetException(Exception ex);
bool TryComplete(bool isAsync); void ActivateContinuations();
void Cancel(); void Cancel();
} }
internal interface IResultBox<T> : IResultBox internal interface IResultBox<T> : IResultBox
...@@ -27,14 +27,13 @@ internal abstract class SimpleResultBox : IResultBox ...@@ -27,14 +27,13 @@ internal abstract class SimpleResultBox : IResultBox
void IResultBox.SetException(Exception exception) => _exception = exception ?? CancelledException; void IResultBox.SetException(Exception exception) => _exception = exception ?? CancelledException;
void IResultBox.Cancel() => _exception = CancelledException; void IResultBox.Cancel() => _exception = CancelledException;
bool IResultBox.TryComplete(bool isAsync) void IResultBox.ActivateContinuations()
{ {
lock (this) lock (this)
{ // tell the waiting thread that we're done { // tell the waiting thread that we're done
Monitor.PulseAll(this); Monitor.PulseAll(this);
} }
ConnectionMultiplexer.TraceWithoutContext("Pulsed", "Result"); ConnectionMultiplexer.TraceWithoutContext("Pulsed", "Result");
return true;
} }
// in theory nobody should directly observe this; the only things // in theory nobody should directly observe this; the only things
...@@ -109,46 +108,36 @@ T IResultBox<T>.GetResult(out Exception ex, bool _) ...@@ -109,46 +108,36 @@ T IResultBox<T>.GetResult(out Exception ex, bool _)
// nothing to do re recycle: TaskCompletionSource<T> cannot be recycled // nothing to do re recycle: TaskCompletionSource<T> cannot be recycled
} }
bool IResultBox.TryComplete(bool isAsync) void IResultBox.ActivateContinuations()
{ {
if (isAsync || (Task.CreationOptions & TaskCreationOptions.RunContinuationsAsynchronously) != 0) var val = _value;
var ex = _exception;
if (ex == null)
{ {
// either on the async completion step, or the task is guarded TrySetResult(val);
// againsts thread-stealing; complete it directly
// (note: RunContinuationsAsynchronously is only usable from NET46)
var val = _value;
var ex = _exception;
if (ex == null)
{
TrySetResult(val);
}
else
{
if (ex is TaskCanceledException) TrySetCanceled();
else TrySetException(ex);
// mark any exception as observed
var task = Task;
GC.KeepAlive(task.Exception);
GC.SuppressFinalize(task);
}
return true;
} }
else else
{ {
// could be thread-stealing continuations; push to async to preserve the reader thread if (ex is TaskCanceledException) TrySetCanceled();
return false; else TrySetException(ex);
var task = Task;
GC.KeepAlive(task.Exception); // mark any exception as observed
GC.SuppressFinalize(task); // finalizer only exists for unobserved-exception purposes
} }
} }
public static IResultBox<T> Create(out TaskCompletionSource<T> source, object asyncState, TaskCreationOptions creationOptions = TaskCreationOptions.None) public static IResultBox<T> Create(out TaskCompletionSource<T> source, object asyncState)
{ {
// since 2.0, we only support platforms where this is correctly implemented
const TaskCreationOptions CreationOptions = TaskCreationOptions.RunContinuationsAsynchronously;
// it might look a little odd to return the same object as two different things, // it might look a little odd to return the same object as two different things,
// but that's because it is serving two purposes, and I want to make it clear // but that's because it is serving two purposes, and I want to make it clear
// how it is being used in those 2 different ways; also, the *fact* that they // how it is being used in those 2 different ways; also, the *fact* that they
// are the same underlying object is an implementation detail that the rest of // are the same underlying object is an implementation detail that the rest of
// the code doesn't need to know about // the code doesn't need to know about
var obj = new TaskResultBox<T>(asyncState, creationOptions); var obj = new TaskResultBox<T>(asyncState, CreationOptions);
source = obj; source = obj;
return obj; return obj;
} }
......
...@@ -375,16 +375,16 @@ internal ServerCounters GetCounters() ...@@ -375,16 +375,16 @@ internal ServerCounters GetCounters()
return counters; return counters;
} }
internal void GetOutstandingCount(RedisCommand command, out int inst, out int qs, out int @in) internal void GetOutstandingCount(RedisCommand command, out int inst, out int qs, out int @in, out int qu)
{ {
var bridge = GetBridge(command, false); var bridge = GetBridge(command, false);
if (bridge == null) if (bridge == null)
{ {
inst = qs = @in = 0; inst = qs = @in = qu = 0;
} }
else else
{ {
bridge.GetOutstandingCount(out inst, out qs, out @in); bridge.GetOutstandingCount(out inst, out qs, out @in, out qu);
} }
} }
......
...@@ -55,9 +55,8 @@ public static SocketManager Shared ...@@ -55,9 +55,8 @@ public static SocketManager Shared
public override string ToString() public override string ToString()
{ {
var scheduler = SchedulerPool; var scheduler = SchedulerPool;
var comp = CompletionPool;
return $"scheduler - queue: {scheduler?.TotalServicedByQueue}, pool: {scheduler?.TotalServicedByPool}; completion - queue: {comp ?.TotalServicedByQueue}, pool: {comp?.TotalServicedByPool}"; return $"scheduler - queue: {scheduler?.TotalServicedByQueue}, pool: {scheduler?.TotalServicedByPool}";
} }
private static SocketManager _shared; private static SocketManager _shared;
...@@ -100,16 +99,12 @@ private SocketManager(string name, bool useHighPrioritySocketThreads, int worker ...@@ -100,16 +99,12 @@ private SocketManager(string name, bool useHighPrioritySocketThreads, int worker
resumeWriterThreshold: Receive_ResumeWriterThreshold, resumeWriterThreshold: Receive_ResumeWriterThreshold,
minimumSegmentSize: Math.Max(defaultPipeOptions.MinimumSegmentSize, MINIMUM_SEGMENT_SIZE), minimumSegmentSize: Math.Max(defaultPipeOptions.MinimumSegmentSize, MINIMUM_SEGMENT_SIZE),
useSynchronizationContext: false); useSynchronizationContext: false);
_completionPool = new DedicatedThreadPoolPipeScheduler(name + ":Completion",
workerCount: workerCount, useThreadPoolQueueLength: 1);
} }
private DedicatedThreadPoolPipeScheduler _schedulerPool, _completionPool; private DedicatedThreadPoolPipeScheduler _schedulerPool;
internal readonly PipeOptions SendPipeOptions, ReceivePipeOptions; internal readonly PipeOptions SendPipeOptions, ReceivePipeOptions;
internal DedicatedThreadPoolPipeScheduler SchedulerPool => _schedulerPool; internal DedicatedThreadPoolPipeScheduler SchedulerPool => _schedulerPool;
internal DedicatedThreadPoolPipeScheduler CompletionPool => _completionPool;
private enum CallbackOperation private enum CallbackOperation
{ {
...@@ -128,9 +123,7 @@ private void Dispose(bool disposing) ...@@ -128,9 +123,7 @@ private void Dispose(bool disposing)
// be threads, and those threads will be rooting the DedicatedThreadPool; // be threads, and those threads will be rooting the DedicatedThreadPool;
// but: we can lend a hand! We need to do this even in the finalizer // but: we can lend a hand! We need to do this even in the finalizer
try { _schedulerPool?.Dispose(); } catch { } try { _schedulerPool?.Dispose(); } catch { }
try { _completionPool?.Dispose(); } catch { }
_schedulerPool = null; _schedulerPool = null;
_completionPool = null;
if (disposing) if (disposing)
{ {
GC.SuppressFinalize(this); GC.SuppressFinalize(this);
...@@ -165,8 +158,5 @@ internal string GetState() ...@@ -165,8 +158,5 @@ internal string GetState()
var s = _schedulerPool; var s = _schedulerPool;
return s == null ? null : $"{s.AvailableCount} of {s.WorkerCount} available"; return s == null ? null : $"{s.AvailableCount} of {s.WorkerCount} available";
} }
internal void ScheduleTask(Action<object> action, object state)
=> _completionPool.Schedule(action, state);
} }
} }
...@@ -25,10 +25,14 @@ public static Task<T> ObserveErrors<T>(this Task<T> task) ...@@ -25,10 +25,14 @@ public static Task<T> ObserveErrors<T>(this Task<T> task)
return task; return task;
} }
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public static ConfiguredTaskAwaitable ForAwait(this Task task) => task.ConfigureAwait(false); public static ConfiguredTaskAwaitable ForAwait(this Task task) => task.ConfigureAwait(false);
public static ConfiguredValueTaskAwaitable ForAwait(this ValueTask task) => task.ConfigureAwait(false); [MethodImpl(MethodImplOptions.AggressiveInlining)]
public static ConfiguredValueTaskAwaitable ForAwait(this in ValueTask task) => task.ConfigureAwait(false);
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public static ConfiguredTaskAwaitable<T> ForAwait<T>(this Task<T> task) => task.ConfigureAwait(false); public static ConfiguredTaskAwaitable<T> ForAwait<T>(this Task<T> task) => task.ConfigureAwait(false);
public static ConfiguredValueTaskAwaitable<T> ForAwait<T>(this ValueTask<T> task) => task.ConfigureAwait(false); [MethodImpl(MethodImplOptions.AggressiveInlining)]
public static ConfiguredValueTaskAwaitable<T> ForAwait<T>(this in ValueTask<T> task) => task.ConfigureAwait(false);
internal static void RedisFireAndForget(this Task task) => task?.ContinueWith(t => GC.KeepAlive(t.Exception), TaskContinuationOptions.OnlyOnFaulted); internal static void RedisFireAndForget(this Task task) => task?.ContinueWith(t => GC.KeepAlive(t.Exception), TaskContinuationOptions.OnlyOnFaulted);
......
...@@ -326,7 +326,7 @@ public void Teardown(TextWriter output) ...@@ -326,7 +326,7 @@ public void Teardown(TextWriter output)
} }
//Assert.True(false, $"There were {privateFailCount} private ambient exceptions."); //Assert.True(false, $"There were {privateFailCount} private ambient exceptions.");
} }
TestBase.Log(output, $"Service Counts: (Scheduler) Queue: {SocketManager.Shared?.SchedulerPool?.TotalServicedByQueue.ToString()}, Pool: {SocketManager.Shared?.SchedulerPool?.TotalServicedByPool.ToString()}, (Completion) Queue: {SocketManager.Shared?.CompletionPool?.TotalServicedByQueue.ToString()}, Pool: {SocketManager.Shared?.CompletionPool?.TotalServicedByPool.ToString()}"); TestBase.Log(output, $"Service Counts: (Scheduler) Queue: {SocketManager.Shared?.SchedulerPool?.TotalServicedByQueue.ToString()}, Pool: {SocketManager.Shared?.SchedulerPool?.TotalServicedByPool.ToString()}");
} }
} }
......
...@@ -180,7 +180,7 @@ public void Teardown() ...@@ -180,7 +180,7 @@ public void Teardown()
} }
Skip.Inconclusive($"There were {privateFailCount} private and {sharedFailCount.Value} ambient exceptions; expected {expectedFailCount}."); Skip.Inconclusive($"There were {privateFailCount} private and {sharedFailCount.Value} ambient exceptions; expected {expectedFailCount}.");
} }
Log($"Service Counts: (Scheduler) Queue: {SocketManager.Shared?.SchedulerPool?.TotalServicedByQueue.ToString()}, Pool: {SocketManager.Shared?.SchedulerPool?.TotalServicedByPool.ToString()}, (Completion) Queue: {SocketManager.Shared?.CompletionPool?.TotalServicedByQueue.ToString()}, Pool: {SocketManager.Shared?.CompletionPool?.TotalServicedByPool.ToString()}"); Log($"Service Counts: (Scheduler) Queue: {SocketManager.Shared?.SchedulerPool?.TotalServicedByQueue.ToString()}, Pool: {SocketManager.Shared?.SchedulerPool?.TotalServicedByPool.ToString()}");
} }
protected IServer GetServer(IConnectionMultiplexer muxer) protected IServer GetServer(IConnectionMultiplexer muxer)
......
...@@ -16,17 +16,26 @@ public static async Task Main() ...@@ -16,17 +16,26 @@ public static async Task Main()
var start = DateTime.Now; var start = DateTime.Now;
Show(client.GetCounters()); Show(client.GetCounters());
var tasks = Enumerable.Range(0, 1000).Select(async i => var tasks = Enumerable.Range(0, 1000).Select(async i =>
{ {
int timeoutCount = 0;
RedisKey key = i.ToString();
for (int t = 0; t < 1000; t++) for (int t = 0; t < 1000; t++)
{ {
await db.StringIncrementAsync(i.ToString(), 1); try
// db.StringIncrement(i.ToString(), 1); {
await db.StringIncrementAsync(key, 1);
}
catch (TimeoutException) { timeoutCount++; }
} }
await Task.Yield(); return timeoutCount;
}); }).ToArray();
await Task.WhenAll(tasks); await Task.WhenAll(tasks);
int totalTimeouts = tasks.Sum(x => x.Result);
Console.WriteLine("Total timeouts: " + totalTimeouts);
Console.WriteLine();
Show(client.GetCounters()); Show(client.GetCounters());
var duration = DateTime.Now.Subtract(start).TotalMilliseconds; var duration = DateTime.Now.Subtract(start).TotalMilliseconds;
......
...@@ -15,6 +15,6 @@ ...@@ -15,6 +15,6 @@
</ItemGroup> </ItemGroup>
<ItemGroup> <ItemGroup>
<PackageReference Include="StackExchange.Redis" Version="[1.2.6]" /> <PackageReference Include="StackExchange.Redis" Version="[2.0.545]" /> <!-- [1.2.6] for previous major -->
</ItemGroup> </ItemGroup>
</Project> </Project>
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment