Commit 246603c9 authored by Marc Gravell's avatar Marc Gravell

Only prevent sync-continuations if explicitly enabled; otherwise it can cause...

Only prevent sync-continuations if explicitly enabled; otherwise it can cause near-deadlocks in the TPL-pool; avoid a dependency on the thread-pool in SocketManager
parent c71ce066
<?xml version="1.0" encoding="utf-8"?>
<packages>
<package id="NuGet.CommandLine" version="2.8.0" />
<package id="Redis-64" version="2.6.12.1" />
<?xml version="1.0" encoding="utf-8"?>
<packages>
<package id="NuGet.CommandLine" version="2.8.0" />
<package id="Redis-64" version="2.6.12.1" />
</packages>
\ No newline at end of file
......@@ -16,19 +16,18 @@ sealed partial class CompletionManager
private readonly string name;
private int asyncResultCounter;
long completedSync, completedAsync, failedAsync;
private readonly bool allowSyncContinuations;
public CompletionManager(ConnectionMultiplexer multiplexer, string name)
{
this.multiplexer = multiplexer;
this.name = name;
this.allowSyncContinuations = multiplexer.RawConfig.AllowSynchronousContinuations;
}
public void CompleteSyncOrAsync(ICompletable operation)
{
if (operation == null) return;
if (operation.TryComplete(false))
if (operation.TryComplete(false, allowSyncContinuations))
{
multiplexer.Trace("Completed synchronously: " + operation, name);
Interlocked.Increment(ref completedSync);
......@@ -39,11 +38,12 @@ public void CompleteSyncOrAsync(ICompletable operation)
if (multiplexer.PreserveAsyncOrder)
{
multiplexer.Trace("Queueing for asynchronous completion", name);
bool startNewWorker;
lock (asyncCompletionQueue)
{
asyncCompletionQueue.Enqueue(operation);
startNewWorker = asyncCompletionQueue.Count == 1;
}
bool startNewWorker = Interlocked.Increment(ref asyncResultCounter) == 1;
if (startNewWorker)
{
multiplexer.Trace("Starting new async completion worker", name);
......@@ -98,7 +98,7 @@ private static void AnyOrderCompletionHandler(object state)
try
{
ConnectionMultiplexer.TraceWithoutContext("Completing async (any order): " + state);
((ICompletable)state).TryComplete(true);
((ICompletable)state).TryComplete(true, true);
}
catch (Exception ex)
{
......@@ -111,16 +111,6 @@ private static void ProcessAsyncCompletionQueue(object state)
((CompletionManager)state).ProcessAsyncCompletionQueueImpl();
}
bool DecrementAsyncCounterAndCheckForMoreAsyncWork()
{
if (Thread.VolatileRead(ref asyncResultCounter) == 1)
{ // if we're on the very last item, then rather than exit immediately,
// let's give it a moment to see if more work comes in
Thread.Yield();
}
return Interlocked.Decrement(ref asyncResultCounter) != 0;
}
partial void OnCompletedAsync();
private void ProcessAsyncCompletionQueueImpl()
{
......@@ -130,19 +120,22 @@ private void ProcessAsyncCompletionQueueImpl()
ICompletable next;
lock (asyncCompletionQueue)
{
if (asyncCompletionQueue.Count == 0)
{
// compete; note that since we didn't do work we do NOT
// want to decr the counter; fortunately "break" gets
// this correct
break;
}
next = asyncCompletionQueue.Dequeue();
next = asyncCompletionQueue.Count == 0 ? null
: asyncCompletionQueue.Dequeue();
}
if(next == null && Thread.Yield()) // give it a moment and try again
{
lock (asyncCompletionQueue)
{
next = asyncCompletionQueue.Count == 0 ? null
: asyncCompletionQueue.Dequeue();
}
}
if (next == null) break; // nothing to do
try
{
multiplexer.Trace("Completing async (ordered): " + next, name);
next.TryComplete(true);
next.TryComplete(true, allowSyncContinuations);
Interlocked.Increment(ref completedAsync);
}
catch(Exception ex)
......@@ -151,7 +144,7 @@ private void ProcessAsyncCompletionQueueImpl()
Interlocked.Increment(ref failedAsync);
}
total++;
} while (DecrementAsyncCounterAndCheckForMoreAsyncWork());
} while (true);
multiplexer.Trace("Async completion worker processed " + total + " operations", name);
}
......
......@@ -20,7 +20,7 @@ public sealed class ConfigurationOptions : ICloneable
VersionPrefix = "version=", ConnectTimeoutPrefix = "connectTimeout=", PasswordPrefix = "password=",
TieBreakerPrefix = "tiebreaker=", WriteBufferPrefix = "writeBuffer=", SslHostPrefix = "sslHost=",
ConfigChannelPrefix = "configChannel=", AbortOnConnectFailPrefix = "abortConnect=", ResolveDnsPrefix = "resolveDns=",
ChannelPrefixPrefix = "channelPrefix=";
ChannelPrefixPrefix = "channelPrefix=", AllowSyncContinuationsPrefix = "syncCont=";
private readonly EndPointCollection endpoints = new EndPointCollection();
......@@ -29,7 +29,7 @@ public sealed class ConfigurationOptions : ICloneable
/// </summary>
public RedisChannel ChannelPrefix { get;set; }
private bool? allowAdmin, abortOnConnectFail, resolveDns;
private bool? allowAdmin, abortOnConnectFail, resolveDns, allowSyncContinuations;
private string clientName, serviceName, password, tieBreaker, sslHost, configChannel;
private Version defaultVersion;
......@@ -147,9 +147,15 @@ public ConfigurationOptions()
/// <summary>
/// Gets or sets whether connect/configuration timeouts should be explicitly notified via a TimeoutException
/// </summary>
public bool AbortOnConnectFail { get { return abortOnConnectFail ?? true; } set { abortOnConnectFail = value; } }
public bool AbortOnConnectFail { get { return abortOnConnectFail ?? true; } set { abortOnConnectFail = value; } }
/// <summary>
/// Gets or sets whether synchronous task continuations should be explicitly avoided (allowed by default)
/// </summary>
public bool AllowSynchronousContinuations { get { return allowSyncContinuations ?? true; } set { allowSyncContinuations = value; } }
/// <summary>
/// Parse the configuration from a comma-delimited configuration string
/// </summary>
......@@ -172,6 +178,7 @@ public ConfigurationOptions Clone()
keepAlive = keepAlive,
syncTimeout = syncTimeout,
allowAdmin = allowAdmin,
allowSyncContinuations = allowSyncContinuations,
defaultVersion = defaultVersion,
connectTimeout = connectTimeout,
password = password,
......@@ -218,6 +225,7 @@ public override string ToString()
Append(sb, AbortOnConnectFailPrefix, abortOnConnectFail);
Append(sb, ResolveDnsPrefix, resolveDns);
Append(sb, ChannelPrefixPrefix, (string)ChannelPrefix);
Append(sb, AllowSyncContinuationsPrefix, allowSyncContinuations);
CommandMap.AppendDeltas(sb);
return sb.ToString();
}
......@@ -299,7 +307,7 @@ void Clear()
{
clientName = serviceName = password = tieBreaker = sslHost = configChannel = null;
keepAlive = syncTimeout = connectTimeout = writeBuffer = null;
allowAdmin = abortOnConnectFail = resolveDns = null;
allowAdmin = abortOnConnectFail = resolveDns = allowSyncContinuations = null;
defaultVersion = null;
endpoints.Clear();
CertificateSelection = null;
......@@ -349,6 +357,11 @@ private void DoParse(string configuration)
{
bool tmp;
if (Format.TryParseBoolean(value.Trim(), out tmp)) ResolveDns = tmp;
}
else if (IsOption(option, AllowSyncContinuationsPrefix))
{
bool tmp;
if (Format.TryParseBoolean(value.Trim(), out tmp)) AllowSynchronousContinuations = tmp;
}
else if (IsOption(option, ServiceNamePrefix))
{
......
......@@ -56,7 +56,7 @@ public ConnectionFailureType FailureType
{
get { return failureType; }
}
bool ICompletable.TryComplete(bool isAsync)
bool ICompletable.TryComplete(bool isAsync, bool allowSyncContinuations)
{
return ConnectionMultiplexer.TryCompleteHandler(handler, sender, this, isAsync);
}
......
......@@ -26,7 +26,7 @@ public EndPoint EndPoint
{
get { return endpoint; }
}
bool ICompletable.TryComplete(bool isAsync)
bool ICompletable.TryComplete(bool isAsync, bool allowSyncContinuations)
{
return ConnectionMultiplexer.TryCompleteHandler(handler, sender, this, isAsync);
}
......
......@@ -36,7 +36,7 @@ public sealed class HashSlotMovedEventArgs : EventArgs, ICompletable
this.@new = @new;
}
bool ICompletable.TryComplete(bool isAsync)
bool ICompletable.TryComplete(bool isAsync, bool allowSyncContinuations)
{
return ConnectionMultiplexer.TryCompleteHandler(handler, sender, this, isAsync);
}
......
......@@ -4,7 +4,7 @@ namespace StackExchange.Redis
{
interface ICompletable
{
bool TryComplete(bool isAsync);
bool TryComplete(bool isAsync, bool allowSyncContinuations);
void AppendStormLog(StringBuilder sb);
}
}
......@@ -56,7 +56,7 @@ public string Origin
{
get { return origin; }
}
bool ICompletable.TryComplete(bool isAsync)
bool ICompletable.TryComplete(bool isAsync, bool allowSyncContinuations)
{
return ConnectionMultiplexer.TryCompleteHandler(handler, sender, this, isAsync);
}
......
......@@ -370,11 +370,11 @@ public override string ToString()
resultProcessor == null ? "(n/a)" : resultProcessor.GetType().Name);
}
public bool TryComplete(bool isAsync)
public bool TryComplete(bool isAsync, bool allowSyncContinuations)
{
if (resultBox != null)
{
return resultBox.TryComplete(isAsync);
return resultBox.TryComplete(isAsync, allowSyncContinuations);
}
else
{
......
......@@ -22,7 +22,7 @@ public override string ToString()
{
return (string)channel;
}
public bool TryComplete(bool isAsync)
public bool TryComplete(bool isAsync, bool allowSyncContinuations)
{
if (handler == null) return true;
if (isAsync)
......
......@@ -38,7 +38,7 @@ void ICompletable.AppendStormLog(StringBuilder sb)
sb.Append("event, error: ").Append(message);
}
bool ICompletable.TryComplete(bool isAsync)
bool ICompletable.TryComplete(bool isAsync, bool allowSyncContinuations)
{
return ConnectionMultiplexer.TryCompleteHandler(handler, sender, this, isAsync);
}
......
......@@ -21,7 +21,7 @@ public void SetException(Exception exception)
// this.exception = caught;
//}
}
public abstract bool TryComplete(bool isAsync);
public abstract bool TryComplete(bool isAsync, bool allowSyncContinuations);
[Conditional("DEBUG")]
protected static void IncrementAllocationCount()
......@@ -94,12 +94,12 @@ public void SetResult(T value)
this.value = value;
}
public override bool TryComplete(bool isAsync)
public override bool TryComplete(bool isAsync, bool allowSyncContinuations)
{
if (stateOrCompletionSource is TaskCompletionSource<T>)
{
var tcs = (TaskCompletionSource<T>)stateOrCompletionSource;
if (isAsync || TaskContinationCheck.NoContinuations(tcs.Task))
if (isAsync || allowSyncContinuations || TaskContinationCheck.NoContinuations(tcs.Task))
{
T val;
Exception ex;
......
......@@ -245,11 +245,17 @@ private void ReadImpl()
if (ready >= 5) // number of sockets we should attempt to process by ourself before asking for help
{
// seek help, work in parallel, then synchronize
lock (QueueDrainSyncLock)
var obj = new QueueDrainSyncLock(this);
lock (obj)
{
ThreadPool.QueueUserWorkItem(HelpProcessItems, this);
ProcessItems();
Monitor.Wait(QueueDrainSyncLock);
if (!obj.Consume())
{ // then our worker arrived and picked up work; we need
// to let it finish; note that if it *didn't* get that far
// yet, the Consume() call will mean that it never tries
Monitor.Wait(obj);
}
}
}
else
......@@ -260,6 +266,30 @@ private void ReadImpl()
}
}
sealed class QueueDrainSyncLock
{
private int workers;
public QueueDrainSyncLock(SocketManager manager)
{
this.manager = manager;
}
private readonly SocketManager manager;
public SocketManager Manager { get { return manager; } }
internal bool Consume()
{
return Interlocked.CompareExchange(ref workers, 1, 0) == 0;
}
internal void Pulse()
{
lock (this)
{
Monitor.PulseAll(this);
}
}
}
[DllImport("ws2_32.dll", SetLastError = true)]
internal static extern int select([In] int ignoredParameter, [In, Out] IntPtr[] readfds, [In, Out] IntPtr[] writefds, [In, Out] IntPtr[] exceptfds, [In] ref TimeValue timeout);
......@@ -293,14 +323,14 @@ internal void Shutdown(SocketToken token)
Shutdown(token.Socket);
}
private readonly object QueueDrainSyncLock = new object();
static readonly WaitCallback HelpProcessItems = state =>
{
var mgr = (SocketManager)state;
mgr.ProcessItems();
lock (mgr.QueueDrainSyncLock)
{
Monitor.PulseAll(mgr.QueueDrainSyncLock);
QueueDrainSyncLock qdsl = (QueueDrainSyncLock)state;
if (qdsl.Consume())
{
var mgr = qdsl.Manager;
mgr.ProcessItems();
qdsl.Pulse();
}
};
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment