Commit e30a829e authored by Marc Gravell's avatar Marc Gravell

potentially controversial change; remove PreserveAsyncOrder - it is doomed to pain

also: has PubSubGetAllAnyOrder issues?
parent c14be682
...@@ -14,14 +14,11 @@ public class BasicOpsTests : TestBase ...@@ -14,14 +14,11 @@ public class BasicOpsTests : TestBase
{ {
public BasicOpsTests(ITestOutputHelper output) : base (output) { } public BasicOpsTests(ITestOutputHelper output) : base (output) { }
[Theory] [Fact]
[InlineData(true)] public void PingOnce()
[InlineData(false)]
public void PingOnce(bool preserveOrder)
{ {
using (var muxer = Create()) using (var muxer = Create())
{ {
muxer.PreserveAsyncOrder = preserveOrder;
var conn = muxer.GetDatabase(); var conn = muxer.GetDatabase();
var task = conn.PingAsync(); var task = conn.PingAsync();
...@@ -51,14 +48,11 @@ public void RapidDispose() ...@@ -51,14 +48,11 @@ public void RapidDispose()
} }
} }
[Theory] [Fact]
[InlineData(true)] public void PingMany()
[InlineData(false)]
public void PingMany(bool preserveOrder)
{ {
using (var muxer = Create()) using (var muxer = Create())
{ {
muxer.PreserveAsyncOrder = preserveOrder;
var conn = muxer.GetDatabase(); var conn = muxer.GetDatabase();
var tasks = new Task<TimeSpan>[10000]; var tasks = new Task<TimeSpan>[10000];
...@@ -155,14 +149,11 @@ public void SetWithZeroValue() ...@@ -155,14 +149,11 @@ public void SetWithZeroValue()
} }
} }
[Theory] [Fact]
[InlineData(true)] public async Task GetSetAsync()
[InlineData(false)]
public async Task GetSetAsync(bool preserveOrder)
{ {
using (var muxer = Create()) using (var muxer = Create())
{ {
muxer.PreserveAsyncOrder = preserveOrder;
var conn = muxer.GetDatabase(); var conn = muxer.GetDatabase();
RedisKey key = Me(); RedisKey key = Me();
...@@ -185,14 +176,11 @@ public async Task GetSetAsync(bool preserveOrder) ...@@ -185,14 +176,11 @@ public async Task GetSetAsync(bool preserveOrder)
} }
} }
[Theory] [Fact]
[InlineData(true)] public void GetSetSync()
[InlineData(false)]
public void GetSetSync(bool preserveOrder)
{ {
using (var muxer = Create()) using (var muxer = Create())
{ {
muxer.PreserveAsyncOrder = preserveOrder;
var conn = muxer.GetDatabase(); var conn = muxer.GetDatabase();
RedisKey key = Me(); RedisKey key = Me();
...@@ -298,15 +286,12 @@ public void GetWithExpiryWrongTypeSync() ...@@ -298,15 +286,12 @@ public void GetWithExpiryWrongTypeSync()
} }
#if DEBUG #if DEBUG
[Theory] [Fact]
[InlineData(true)] public void TestQuit()
[InlineData(false)]
public void TestQuit(bool preserveOrder)
{ {
SetExpectedAmbientFailureCount(1); SetExpectedAmbientFailureCount(1);
using (var muxer = Create(allowAdmin: true)) using (var muxer = Create(allowAdmin: true))
{ {
muxer.PreserveAsyncOrder = preserveOrder;
var db = muxer.GetDatabase(); var db = muxer.GetDatabase();
string key = Guid.NewGuid().ToString(); string key = Guid.NewGuid().ToString();
db.KeyDelete(key, CommandFlags.FireAndForget); db.KeyDelete(key, CommandFlags.FireAndForget);
...@@ -349,14 +334,11 @@ public async Task TestSevered(bool preserveOrder) ...@@ -349,14 +334,11 @@ public async Task TestSevered(bool preserveOrder)
} }
#endif #endif
[Theory] [Fact]
[InlineData(true)] public async Task IncrAsync()
[InlineData(false)]
public async Task IncrAsync(bool preserveOrder)
{ {
using (var muxer = Create()) using (var muxer = Create())
{ {
muxer.PreserveAsyncOrder = preserveOrder;
var conn = muxer.GetDatabase(); var conn = muxer.GetDatabase();
RedisKey key = Me(); RedisKey key = Me();
conn.KeyDelete(key, CommandFlags.FireAndForget); conn.KeyDelete(key, CommandFlags.FireAndForget);
...@@ -382,14 +364,11 @@ public async Task IncrAsync(bool preserveOrder) ...@@ -382,14 +364,11 @@ public async Task IncrAsync(bool preserveOrder)
} }
} }
[Theory] [Fact]
[InlineData(true)] public void IncrSync()
[InlineData(false)]
public void IncrSync(bool preserveOrder)
{ {
using (var muxer = Create()) using (var muxer = Create())
{ {
muxer.PreserveAsyncOrder = preserveOrder;
var conn = muxer.GetDatabase(); var conn = muxer.GetDatabase();
RedisKey key = Me(); RedisKey key = Me();
conn.KeyDelete(key, CommandFlags.FireAndForget); conn.KeyDelete(key, CommandFlags.FireAndForget);
......
...@@ -75,18 +75,19 @@ private void TestMassivePublish(ISubscriber conn, string channel, string caption ...@@ -75,18 +75,19 @@ private void TestMassivePublish(ISubscriber conn, string channel, string caption
Assert.True(withFAF.ElapsedMilliseconds < withAsync.ElapsedMilliseconds, caption); Assert.True(withFAF.ElapsedMilliseconds < withAsync.ElapsedMilliseconds, caption);
} }
[FactLongRunning] // [FactLongRunning]
public async Task PubSubOrder() [Fact]
public async Task PubSubGetAllAnyOrder()
{ {
using (var muxer = GetRemoteConnection(waitForOpen: true)) using (var muxer = GetRemoteConnection(waitForOpen: true,
syncTimeout: 20000))
{ {
var sub = muxer.GetSubscriber(); var sub = muxer.GetSubscriber();
const string channel = "PubSubOrder"; RedisChannel channel = Me();
const int count = 500000; const int count = 500000;
var syncLock = new object(); var syncLock = new object();
var data = new List<int>(count); var data = new HashSet<int>();
muxer.PreserveAsyncOrder = true;
await sub.SubscribeAsync(channel, (key, val) => await sub.SubscribeAsync(channel, (key, val) =>
{ {
bool pulse; bool pulse;
...@@ -118,7 +119,7 @@ public async Task PubSubOrder() ...@@ -118,7 +119,7 @@ public async Task PubSubOrder()
} }
for (int i = 0; i < count; i++) for (int i = 0; i < count; i++)
{ {
Assert.Equal(i, data[i]); Assert.Contains(i, data);
} }
} }
} }
......
...@@ -11,11 +11,15 @@ public class Issue791 : TestBase ...@@ -11,11 +11,15 @@ public class Issue791 : TestBase
public void PreserveAsyncOrderImplicitValue_ParsedFromConnectionString() public void PreserveAsyncOrderImplicitValue_ParsedFromConnectionString()
{ {
var options = ConfigurationOptions.Parse("preserveAsyncOrder=true"); var options = ConfigurationOptions.Parse("preserveAsyncOrder=true");
#pragma warning disable CS0618
Assert.True(options.PreserveAsyncOrder); Assert.True(options.PreserveAsyncOrder);
#pragma warning restore CS0618
Assert.Equal("preserveAsyncOrder=True", options.ToString()); Assert.Equal("preserveAsyncOrder=True", options.ToString());
options = ConfigurationOptions.Parse("preserveAsyncOrder=false"); options = ConfigurationOptions.Parse("preserveAsyncOrder=false");
#pragma warning disable CS0618
Assert.False(options.PreserveAsyncOrder); Assert.False(options.PreserveAsyncOrder);
#pragma warning restore CS0618
Assert.Equal("preserveAsyncOrder=False", options.ToString()); Assert.Equal("preserveAsyncOrder=False", options.ToString());
} }
...@@ -23,14 +27,18 @@ public void PreserveAsyncOrderImplicitValue_ParsedFromConnectionString() ...@@ -23,14 +27,18 @@ public void PreserveAsyncOrderImplicitValue_ParsedFromConnectionString()
public void DefaultValue_IsTrue() public void DefaultValue_IsTrue()
{ {
var options = ConfigurationOptions.Parse("ssl=true"); var options = ConfigurationOptions.Parse("ssl=true");
#pragma warning disable CS0618
Assert.True(options.PreserveAsyncOrder); Assert.True(options.PreserveAsyncOrder);
#pragma warning restore CS0618
} }
[Fact] [Fact]
public void PreserveAsyncOrder_SetConnectionMultiplexerProperty() public void PreserveAsyncOrder_SetConnectionMultiplexerProperty()
{ {
var multiplexer = ConnectionMultiplexer.Connect(TestConfig.Current.MasterServerAndPort + ",preserveAsyncOrder=false"); var multiplexer = ConnectionMultiplexer.Connect(TestConfig.Current.MasterServerAndPort + ",preserveAsyncOrder=false");
#pragma warning disable CS0618
Assert.False(multiplexer.PreserveAsyncOrder); Assert.False(multiplexer.PreserveAsyncOrder);
#pragma warning restore CS0618
} }
} }
} }
...@@ -13,18 +13,15 @@ public class MassiveOps : TestBase ...@@ -13,18 +13,15 @@ public class MassiveOps : TestBase
public MassiveOps(ITestOutputHelper output) : base(output) { } public MassiveOps(ITestOutputHelper output) : base(output) { }
[Theory] [Theory]
[InlineData(true, true)] [InlineData(true)]
[InlineData(true, false)] [InlineData(false)]
[InlineData(false, true)] public async Task MassiveBulkOpsAsync(bool withContinuation)
[InlineData(false, false)]
public async Task MassiveBulkOpsAsync(bool preserveOrder, bool withContinuation)
{ {
#if DEBUG #if DEBUG
var oldAsyncCompletionCount = ConnectionMultiplexer.GetAsyncCompletionWorkerCount(); var oldAsyncCompletionCount = ConnectionMultiplexer.GetAsyncCompletionWorkerCount();
#endif #endif
using (var muxer = Create()) using (var muxer = Create())
{ {
muxer.PreserveAsyncOrder = preserveOrder;
RedisKey key = "MBOA"; RedisKey key = "MBOA";
var conn = muxer.GetDatabase(); var conn = muxer.GetDatabase();
await conn.PingAsync().ForAwait(); await conn.PingAsync().ForAwait();
...@@ -43,7 +40,7 @@ public async Task MassiveBulkOpsAsync(bool preserveOrder, bool withContinuation) ...@@ -43,7 +40,7 @@ public async Task MassiveBulkOpsAsync(bool preserveOrder, bool withContinuation)
Assert.Equal(AsyncOpsQty, await conn.StringGetAsync(key).ForAwait()); Assert.Equal(AsyncOpsQty, await conn.StringGetAsync(key).ForAwait());
watch.Stop(); watch.Stop();
Output.WriteLine("{2}: Time for {0} ops: {1}ms ({3}, {4}); ops/s: {5}", AsyncOpsQty, watch.ElapsedMilliseconds, Me(), Output.WriteLine("{2}: Time for {0} ops: {1}ms ({3}, {4}); ops/s: {5}", AsyncOpsQty, watch.ElapsedMilliseconds, Me(),
withContinuation ? "with continuation" : "no continuation", preserveOrder ? "preserve order" : "any order", withContinuation ? "with continuation" : "no continuation", "any order",
AsyncOpsQty / watch.Elapsed.TotalSeconds); AsyncOpsQty / watch.Elapsed.TotalSeconds);
#if DEBUG #if DEBUG
Output.WriteLine("Async completion workers: " + (ConnectionMultiplexer.GetAsyncCompletionWorkerCount() - oldAsyncCompletionCount)); Output.WriteLine("Async completion workers: " + (ConnectionMultiplexer.GetAsyncCompletionWorkerCount() - oldAsyncCompletionCount));
...@@ -52,20 +49,15 @@ public async Task MassiveBulkOpsAsync(bool preserveOrder, bool withContinuation) ...@@ -52,20 +49,15 @@ public async Task MassiveBulkOpsAsync(bool preserveOrder, bool withContinuation)
} }
[Theory] [Theory]
[InlineData(true, 1)] [InlineData(1)]
[InlineData(false, 1)] [InlineData(5)]
[InlineData(true, 5)] [InlineData(10)]
[InlineData(false, 5)] [InlineData(50)]
[InlineData(true, 10)] public void MassiveBulkOpsSync(int threads)
[InlineData(false, 10)]
[InlineData(true, 50)]
[InlineData(false, 50)]
public void MassiveBulkOpsSync(bool preserveOrder, int threads)
{ {
int workPerThread = SyncOpsQty / threads; int workPerThread = SyncOpsQty / threads;
using (var muxer = Create(syncTimeout: 30000)) using (var muxer = Create(syncTimeout: 30000))
{ {
muxer.PreserveAsyncOrder = preserveOrder;
RedisKey key = "MBOS"; RedisKey key = "MBOS";
var conn = muxer.GetDatabase(); var conn = muxer.GetDatabase();
conn.KeyDelete(key); conn.KeyDelete(key);
...@@ -85,7 +77,7 @@ public void MassiveBulkOpsSync(bool preserveOrder, int threads) ...@@ -85,7 +77,7 @@ public void MassiveBulkOpsSync(bool preserveOrder, int threads)
Assert.Equal(workPerThread * threads, val); Assert.Equal(workPerThread * threads, val);
Output.WriteLine("{2}: Time for {0} ops on {4} threads: {1}ms ({3}); ops/s: {5}", Output.WriteLine("{2}: Time for {0} ops on {4} threads: {1}ms ({3}); ops/s: {5}",
threads * workPerThread, timeTaken.TotalMilliseconds, Me() threads * workPerThread, timeTaken.TotalMilliseconds, Me()
, preserveOrder ? "preserve order" : "any order", threads, (workPerThread * threads) / timeTaken.TotalSeconds); , "any order", threads, (workPerThread * threads) / timeTaken.TotalSeconds);
#if DEBUG #if DEBUG
long newAlloc = ConnectionMultiplexer.GetResultBoxAllocationCount(); long newAlloc = ConnectionMultiplexer.GetResultBoxAllocationCount();
long newWorkerCount = ConnectionMultiplexer.GetAsyncCompletionWorkerCount(); long newWorkerCount = ConnectionMultiplexer.GetAsyncCompletionWorkerCount();
...@@ -96,15 +88,12 @@ public void MassiveBulkOpsSync(bool preserveOrder, int threads) ...@@ -96,15 +88,12 @@ public void MassiveBulkOpsSync(bool preserveOrder, int threads)
} }
[Theory] [Theory]
[InlineData(true, 1)] [InlineData(1)]
[InlineData(false, 1)] [InlineData(5)]
[InlineData(true, 5)] public void MassiveBulkOpsFireAndForget(int threads)
[InlineData(false, 5)]
public void MassiveBulkOpsFireAndForget(bool preserveOrder, int threads)
{ {
using (var muxer = Create(syncTimeout: 30000)) using (var muxer = Create(syncTimeout: 30000))
{ {
muxer.PreserveAsyncOrder = preserveOrder;
#if DEBUG #if DEBUG
long oldAlloc = ConnectionMultiplexer.GetResultBoxAllocationCount(); long oldAlloc = ConnectionMultiplexer.GetResultBoxAllocationCount();
#endif #endif
...@@ -127,7 +116,7 @@ public void MassiveBulkOpsFireAndForget(bool preserveOrder, int threads) ...@@ -127,7 +116,7 @@ public void MassiveBulkOpsFireAndForget(bool preserveOrder, int threads)
Output.WriteLine("{2}: Time for {0} ops over {5} threads: {1:###,###}ms ({3}); ops/s: {4:###,###,##0}", Output.WriteLine("{2}: Time for {0} ops over {5} threads: {1:###,###}ms ({3}); ops/s: {4:###,###,##0}",
val, elapsed.TotalMilliseconds, Me(), val, elapsed.TotalMilliseconds, Me(),
preserveOrder ? "preserve order" : "any order", "any order",
val / elapsed.TotalSeconds, threads); val / elapsed.TotalSeconds, threads);
#if DEBUG #if DEBUG
long newAlloc = ConnectionMultiplexer.GetResultBoxAllocationCount(); long newAlloc = ConnectionMultiplexer.GetResultBoxAllocationCount();
......
...@@ -11,10 +11,8 @@ public class PreserveOrder : TestBase ...@@ -11,10 +11,8 @@ public class PreserveOrder : TestBase
{ {
public PreserveOrder(ITestOutputHelper output) : base (output) { } public PreserveOrder(ITestOutputHelper output) : base (output) { }
[Theory] [Fact]
[InlineData(true)] public void Execute()
[InlineData(false)]
public void Execute(bool preserveAsyncOrder)
{ {
using (var conn = Create()) using (var conn = Create())
{ {
...@@ -33,9 +31,8 @@ public void Execute(bool preserveAsyncOrder) ...@@ -33,9 +31,8 @@ public void Execute(bool preserveAsyncOrder)
Thread.Sleep(1); // you kinda need to be slow, otherwise Thread.Sleep(1); // you kinda need to be slow, otherwise
// the pool will end up doing everything on one thread // the pool will end up doing everything on one thread
}); });
conn.PreserveAsyncOrder = preserveAsyncOrder;
Output.WriteLine(""); Output.WriteLine("");
Output.WriteLine("Sending ({0})...", preserveAsyncOrder ? "preserved order" : "any order"); Output.WriteLine("Sending ({0})...", "any order");
lock (received) lock (received)
{ {
received.Clear(); received.Clear();
...@@ -65,11 +62,9 @@ public void Execute(bool preserveAsyncOrder) ...@@ -65,11 +62,9 @@ public void Execute(bool preserveAsyncOrder)
if (received[i] != i) wrongOrder++; if (received[i] != i) wrongOrder++;
} }
Output.WriteLine("Out of order: " + wrongOrder); Output.WriteLine("Out of order: " + wrongOrder);
if (preserveAsyncOrder) Assert.Equal(0, wrongOrder);
else Assert.NotEqual(0, wrongOrder);
} }
} }
} }
} }
} }
} }
\ No newline at end of file
...@@ -42,23 +42,16 @@ public void ExplicitPublishMode() ...@@ -42,23 +42,16 @@ public void ExplicitPublishMode()
} }
[Theory] [Theory]
[InlineData(true, null, false)] [InlineData(null, false)]
[InlineData(false, null, false)] [InlineData("", false)]
[InlineData(true, "", false)] [InlineData("Foo:", false)]
[InlineData(false, "", false)] [InlineData(null, true)]
[InlineData(true, "Foo:", false)] [InlineData("", true)]
[InlineData(false, "Foo:", false)] [InlineData("Foo:", true)]
[InlineData(true, null, true)] public void TestBasicPubSub(string channelPrefix, bool wildCard)
[InlineData(false, null, true)]
[InlineData(true, "", true)]
[InlineData(false, "", true)]
[InlineData(true, "Foo:", true)]
[InlineData(false, "Foo:", true)]
public void TestBasicPubSub(bool preserveOrder, string channelPrefix, bool wildCard)
{ {
using (var muxer = Create(channelPrefix: channelPrefix)) using (var muxer = Create(channelPrefix: channelPrefix))
{ {
muxer.PreserveAsyncOrder = preserveOrder;
var pub = GetAnyMaster(muxer); var pub = GetAnyMaster(muxer);
var sub = muxer.GetSubscriber(); var sub = muxer.GetSubscriber();
Ping(muxer, pub, sub); Ping(muxer, pub, sub);
...@@ -123,14 +116,11 @@ public void TestBasicPubSub(bool preserveOrder, string channelPrefix, bool wildC ...@@ -123,14 +116,11 @@ public void TestBasicPubSub(bool preserveOrder, string channelPrefix, bool wildC
} }
} }
[Theory] [Fact]
[InlineData(true)] public void TestBasicPubSubFireAndForget()
[InlineData(false)]
public void TestBasicPubSubFireAndForget(bool preserveOrder)
{ {
using (var muxer = Create()) using (var muxer = Create())
{ {
muxer.PreserveAsyncOrder = preserveOrder;
var pub = GetAnyMaster(muxer); var pub = GetAnyMaster(muxer);
var sub = muxer.GetSubscriber(); var sub = muxer.GetSubscriber();
...@@ -193,14 +183,11 @@ private static void Ping(ConnectionMultiplexer muxer, IServer pub, ISubscriber s ...@@ -193,14 +183,11 @@ private static void Ping(ConnectionMultiplexer muxer, IServer pub, ISubscriber s
} }
} }
[Theory] [Fact]
[InlineData(true)] public void TestPatternPubSub()
[InlineData(false)]
public void TestPatternPubSub(bool preserveOrder)
{ {
using (var muxer = Create()) using (var muxer = Create())
{ {
muxer.PreserveAsyncOrder = preserveOrder;
var pub = GetAnyMaster(muxer); var pub = GetAnyMaster(muxer);
var sub = muxer.GetSubscriber(); var sub = muxer.GetSubscriber();
......
...@@ -14,14 +14,11 @@ public class Secure : TestBase ...@@ -14,14 +14,11 @@ public class Secure : TestBase
public Secure(ITestOutputHelper output) : base (output) { } public Secure(ITestOutputHelper output) : base (output) { }
[Theory] [Fact]
[InlineData(true)] public void MassiveBulkOpsFireAndForgetSecure()
[InlineData(false)]
public void MassiveBulkOpsFireAndForgetSecure(bool preserveOrder)
{ {
using (var muxer = Create()) using (var muxer = Create())
{ {
muxer.PreserveAsyncOrder = preserveOrder;
#if DEBUG #if DEBUG
long oldAlloc = ConnectionMultiplexer.GetResultBoxAllocationCount(); long oldAlloc = ConnectionMultiplexer.GetResultBoxAllocationCount();
#endif #endif
...@@ -39,7 +36,7 @@ public void MassiveBulkOpsFireAndForgetSecure(bool preserveOrder) ...@@ -39,7 +36,7 @@ public void MassiveBulkOpsFireAndForgetSecure(bool preserveOrder)
Assert.Equal(AsyncOpsQty, val); Assert.Equal(AsyncOpsQty, val);
watch.Stop(); watch.Stop();
Output.WriteLine("{2}: Time for {0} ops: {1}ms ({3}); ops/s: {4}", AsyncOpsQty, watch.ElapsedMilliseconds, Me(), Output.WriteLine("{2}: Time for {0} ops: {1}ms ({3}); ops/s: {4}", AsyncOpsQty, watch.ElapsedMilliseconds, Me(),
preserveOrder ? "preserve order" : "any order", "any order",
AsyncOpsQty / watch.Elapsed.TotalSeconds); AsyncOpsQty / watch.Elapsed.TotalSeconds);
#if DEBUG #if DEBUG
long newAlloc = ConnectionMultiplexer.GetResultBoxAllocationCount(); long newAlloc = ConnectionMultiplexer.GetResultBoxAllocationCount();
......
...@@ -7,16 +7,12 @@ namespace StackExchange.Redis ...@@ -7,16 +7,12 @@ namespace StackExchange.Redis
{ {
internal sealed partial class CompletionManager internal sealed partial class CompletionManager
{ {
private static readonly WaitCallback processAsyncCompletionQueue = ProcessAsyncCompletionQueue,
anyOrderCompletionHandler = AnyOrderCompletionHandler;
private readonly Queue<ICompletable> asyncCompletionQueue = new Queue<ICompletable>(); private readonly Queue<ICompletable> asyncCompletionQueue = new Queue<ICompletable>();
private readonly ConnectionMultiplexer multiplexer; private readonly ConnectionMultiplexer multiplexer;
private readonly string name; private readonly string name;
private int activeAsyncWorkerThread = 0;
private long completedSync, completedAsync, failedAsync; private long completedSync, completedAsync, failedAsync;
public CompletionManager(ConnectionMultiplexer multiplexer, string name) public CompletionManager(ConnectionMultiplexer multiplexer, string name)
{ {
...@@ -34,27 +30,9 @@ public void CompleteSyncOrAsync(ICompletable operation) ...@@ -34,27 +30,9 @@ public void CompleteSyncOrAsync(ICompletable operation)
} }
else else
{ {
if (multiplexer.PreserveAsyncOrder) multiplexer.Trace("Using thread-pool for asynchronous completion", name);
{ multiplexer.SocketManager.ScheduleTask(s_AnyOrderCompletionHandler, operation);
multiplexer.Trace("Queueing for asynchronous completion", name); Interlocked.Increment(ref completedAsync); // k, *technically* we haven't actually completed this yet, but: close enough
bool startNewWorker;
lock (asyncCompletionQueue)
{
asyncCompletionQueue.Enqueue(operation);
startNewWorker = asyncCompletionQueue.Count == 1;
}
if (startNewWorker)
{
multiplexer.Trace("Starting new async completion worker", name);
OnCompletedAsync();
ThreadPool.QueueUserWorkItem(processAsyncCompletionQueue, this);
}
} else
{
multiplexer.Trace("Using thread-pool for asynchronous completion", name);
ThreadPool.QueueUserWorkItem(anyOrderCompletionHandler, operation);
Interlocked.Increment(ref completedAsync); // k, *technically* we haven't actually completed this yet, but: close enough
}
} }
} }
...@@ -93,6 +71,7 @@ internal void GetStormLog(StringBuilder sb) ...@@ -93,6 +71,7 @@ internal void GetStormLog(StringBuilder sb)
} }
} }
private static readonly Action<object> s_AnyOrderCompletionHandler = AnyOrderCompletionHandler;
private static void AnyOrderCompletionHandler(object state) private static void AnyOrderCompletionHandler(object state)
{ {
try try
...@@ -106,86 +85,7 @@ private static void AnyOrderCompletionHandler(object state) ...@@ -106,86 +85,7 @@ private static void AnyOrderCompletionHandler(object state)
} }
} }
private static void ProcessAsyncCompletionQueue(object state)
{
((CompletionManager)state).ProcessAsyncCompletionQueueImpl();
}
partial void OnCompletedAsync(); partial void OnCompletedAsync();
private void ProcessAsyncCompletionQueueImpl()
{
int currentThread = Environment.CurrentManagedThreadId;
try
{
while (Interlocked.CompareExchange(ref activeAsyncWorkerThread, currentThread, 0) != 0)
{
// if we don't win the lock, check whether there is still work; if there is we
// need to retry to prevent a nasty race condition
lock(asyncCompletionQueue)
{
if (asyncCompletionQueue.Count == 0) return; // another thread drained it; can exit
}
Thread.Sleep(1);
}
int total = 0;
while (true)
{
ICompletable next;
lock (asyncCompletionQueue)
{
next = asyncCompletionQueue.Count == 0 ? null
: asyncCompletionQueue.Dequeue();
}
if (next == null)
{
// give it a moment and try again, noting that we might lose the battle
// when we pause
Interlocked.CompareExchange(ref activeAsyncWorkerThread, 0, currentThread);
if (SpinWait() && Interlocked.CompareExchange(ref activeAsyncWorkerThread, currentThread, 0) == 0)
{
// we paused, and we got the lock back; anything else?
lock (asyncCompletionQueue)
{
next = asyncCompletionQueue.Count == 0 ? null
: asyncCompletionQueue.Dequeue();
}
}
}
if (next == null) break; // nothing to do <===== exit point
try
{
multiplexer.Trace("Completing async (ordered): " + next, name);
next.TryComplete(true);
Interlocked.Increment(ref completedAsync);
}
catch (Exception ex)
{
multiplexer.Trace("Async completion error: " + ex.Message, name);
Interlocked.Increment(ref failedAsync);
}
total++;
}
multiplexer.Trace("Async completion worker processed " + total + " operations", name);
}
finally
{
Interlocked.CompareExchange(ref activeAsyncWorkerThread, 0, currentThread);
}
}
private bool SpinWait()
{
var sw = new SpinWait();
byte maxSpins = 128;
do
{
if (sw.NextSpinWillYield)
return true;
maxSpins--;
}
while (maxSpins > 0);
return false;
}
} }
} }
...@@ -258,7 +258,12 @@ public int ConnectTimeout ...@@ -258,7 +258,12 @@ public int ConnectTimeout
/// <summary> /// <summary>
/// Specifies whether asynchronous operations should be invoked in a way that guarantees their original delivery order /// Specifies whether asynchronous operations should be invoked in a way that guarantees their original delivery order
/// </summary> /// </summary>
public bool PreserveAsyncOrder { get { return preserveAsyncOrder.GetValueOrDefault(true); } set { preserveAsyncOrder = value; } } [Obsolete("Not supported; attempting to guarantee delivery order is consistently a cause of major performance problems", false)]
public bool PreserveAsyncOrder
{
get { return preserveAsyncOrder.GetValueOrDefault(true); }
set { preserveAsyncOrder = value; }
}
/// <summary> /// <summary>
/// Type of proxy to use (if any); for example Proxy.Twemproxy. /// Type of proxy to use (if any); for example Proxy.Twemproxy.
...@@ -646,7 +651,9 @@ private void DoParse(string configuration, bool ignoreUnknown) ...@@ -646,7 +651,9 @@ private void DoParse(string configuration, bool ignoreUnknown)
DefaultDatabase = OptionKeys.ParseInt32(key, value); DefaultDatabase = OptionKeys.ParseInt32(key, value);
break; break;
case OptionKeys.PreserveAsyncOrder: case OptionKeys.PreserveAsyncOrder:
#pragma warning disable CS0618
PreserveAsyncOrder = OptionKeys.ParseBoolean(key, value); PreserveAsyncOrder = OptionKeys.ParseBoolean(key, value);
#pragma warning restore CS0618
break; break;
case OptionKeys.SslProtocols: case OptionKeys.SslProtocols:
SslProtocols = OptionKeys.ParseSslProtocols(key, value); SslProtocols = OptionKeys.ParseSslProtocols(key, value);
......
...@@ -899,7 +899,6 @@ private ConnectionMultiplexer(ConfigurationOptions configuration) ...@@ -899,7 +899,6 @@ private ConnectionMultiplexer(ConfigurationOptions configuration)
map.AssertAvailable(RedisCommand.EXISTS); map.AssertAvailable(RedisCommand.EXISTS);
} }
PreserveAsyncOrder = configuration.PreserveAsyncOrder;
TimeoutMilliseconds = configuration.SyncTimeout; TimeoutMilliseconds = configuration.SyncTimeout;
OnCreateReaderWriter(configuration); OnCreateReaderWriter(configuration);
...@@ -1810,7 +1809,12 @@ public override string ToString() ...@@ -1810,7 +1809,12 @@ public override string ToString()
/// <summary> /// <summary>
/// Gets or sets whether asynchronous operations should be invoked in a way that guarantees their original delivery order /// Gets or sets whether asynchronous operations should be invoked in a way that guarantees their original delivery order
/// </summary> /// </summary>
public bool PreserveAsyncOrder { get; set; } [Obsolete("Not supported; attempting to guarantee delivery order is consistently a cause of major performance problems", false)]
public bool PreserveAsyncOrder
{
get => false;
set { }
}
/// <summary> /// <summary>
/// Indicates whether any servers are connected /// Indicates whether any servers are connected
......
using System; using System;
using System.IO; using System.IO;
using System.Net; using System.Net;
using System.Threading.Tasks; using System.Threading.Tasks;
...@@ -44,6 +44,7 @@ public interface IConnectionMultiplexer ...@@ -44,6 +44,7 @@ public interface IConnectionMultiplexer
/// <summary> /// <summary>
/// Gets or sets whether asynchronous operations should be invoked in a way that guarantees their original delivery order /// Gets or sets whether asynchronous operations should be invoked in a way that guarantees their original delivery order
/// </summary> /// </summary>
[Obsolete("Not supported; attempting to guarantee delivery order is consistently a cause of major performance problems", false)]
bool PreserveAsyncOrder { get; set; } bool PreserveAsyncOrder { get; set; }
/// <summary> /// <summary>
...@@ -282,4 +283,4 @@ public interface IConnectionMultiplexer ...@@ -282,4 +283,4 @@ public interface IConnectionMultiplexer
/// <returns>The number of instances known to have received the message (however, the actual number can be higher)</returns> /// <returns>The number of instances known to have received the message (however, the actual number can be higher)</returns>
Task<long> PublishReconfigureAsync(CommandFlags flags = CommandFlags.None); Task<long> PublishReconfigureAsync(CommandFlags flags = CommandFlags.None);
} }
} }
\ No newline at end of file
...@@ -97,7 +97,7 @@ public override bool TryComplete(bool isAsync) ...@@ -97,7 +97,7 @@ public override bool TryComplete(bool isAsync)
{ {
if (stateOrCompletionSource is TaskCompletionSource<T> tcs) if (stateOrCompletionSource is TaskCompletionSource<T> tcs)
{ {
if (isAsync || TaskSource.IsSyncSafe(tcs.Task)) if (isAsync)
{ {
UnwrapAndRecycle(this, true, out T val, out Exception ex); UnwrapAndRecycle(this, true, out T val, out Exception ex);
......
...@@ -158,24 +158,27 @@ private SocketManager(string name, bool useHighPrioritySocketThreads, int minThr ...@@ -158,24 +158,27 @@ private SocketManager(string name, bool useHighPrioritySocketThreads, int minThr
const long Receive_ResumeWriterThreshold = 3L * 1024 * 1024 * 1024; const long Receive_ResumeWriterThreshold = 3L * 1024 * 1024 * 1024;
var defaultPipeOptions = PipeOptions.Default; var defaultPipeOptions = PipeOptions.Default;
_scheduler = new DedicatedThreadPoolPipeScheduler(name, _schedulerPool = new DedicatedThreadPoolPipeScheduler(name + ":IO",
minWorkers: minThreads, maxWorkers: maxThreads, minWorkers: minThreads, maxWorkers: maxThreads,
priority: useHighPrioritySocketThreads ? ThreadPriority.AboveNormal : ThreadPriority.Normal); priority: useHighPrioritySocketThreads ? ThreadPriority.AboveNormal : ThreadPriority.Normal);
SendPipeOptions = new PipeOptions( SendPipeOptions = new PipeOptions(
defaultPipeOptions.Pool, _scheduler, _scheduler, defaultPipeOptions.Pool, _schedulerPool, _schedulerPool,
pauseWriterThreshold: defaultPipeOptions.PauseWriterThreshold, pauseWriterThreshold: defaultPipeOptions.PauseWriterThreshold,
resumeWriterThreshold: defaultPipeOptions.ResumeWriterThreshold, resumeWriterThreshold: defaultPipeOptions.ResumeWriterThreshold,
minimumSegmentSize: Math.Max(defaultPipeOptions.MinimumSegmentSize, MINIMUM_SEGMENT_SIZE), minimumSegmentSize: Math.Max(defaultPipeOptions.MinimumSegmentSize, MINIMUM_SEGMENT_SIZE),
useSynchronizationContext: false); useSynchronizationContext: false);
ReceivePipeOptions = new PipeOptions( ReceivePipeOptions = new PipeOptions(
defaultPipeOptions.Pool, _scheduler, _scheduler, defaultPipeOptions.Pool, _schedulerPool, _schedulerPool,
pauseWriterThreshold: Receive_PauseWriterThreshold, pauseWriterThreshold: Receive_PauseWriterThreshold,
resumeWriterThreshold: Receive_ResumeWriterThreshold, resumeWriterThreshold: Receive_ResumeWriterThreshold,
minimumSegmentSize: Math.Max(defaultPipeOptions.MinimumSegmentSize, MINIMUM_SEGMENT_SIZE), minimumSegmentSize: Math.Max(defaultPipeOptions.MinimumSegmentSize, MINIMUM_SEGMENT_SIZE),
useSynchronizationContext: false); useSynchronizationContext: false);
_completionPool = new DedicatedThreadPoolPipeScheduler(name + ":Completion",
minWorkers: 1, maxWorkers: maxThreads, useThreadPoolQueueLength: 1);
} }
private DedicatedThreadPoolPipeScheduler _scheduler; private DedicatedThreadPoolPipeScheduler _schedulerPool, _completionPool;
internal readonly PipeOptions SendPipeOptions, ReceivePipeOptions; internal readonly PipeOptions SendPipeOptions, ReceivePipeOptions;
private enum CallbackOperation private enum CallbackOperation
...@@ -194,8 +197,10 @@ private void Dispose(bool disposing) ...@@ -194,8 +197,10 @@ private void Dispose(bool disposing)
// note: the scheduler *can't* be collected by itself - there will // note: the scheduler *can't* be collected by itself - there will
// be threads, and those threads will be rooting the DedicatedThreadPool; // be threads, and those threads will be rooting the DedicatedThreadPool;
// but: we can lend a hand! We need to do this even in the finalizer // but: we can lend a hand! We need to do this even in the finalizer
try { _scheduler?.Dispose(); } catch { } try { _schedulerPool?.Dispose(); } catch { }
_scheduler = null; try { _completionPool?.Dispose(); } catch { }
_schedulerPool = null;
_completionPool = null;
if (disposing) if (disposing)
{ {
GC.SuppressFinalize(this); GC.SuppressFinalize(this);
...@@ -354,8 +359,11 @@ private void Shutdown(Socket socket) ...@@ -354,8 +359,11 @@ private void Shutdown(Socket socket)
internal string GetState() internal string GetState()
{ {
var s = _scheduler; var s = _schedulerPool;
return s == null ? null : $"{s.BusyCount} of {s.WorkerCount} busy ({s.MaxWorkerCount} max)"; return s == null ? null : $"{s.BusyCount} of {s.WorkerCount} busy ({s.MaxWorkerCount} max)";
} }
internal void ScheduleTask(Action<object> action, object state)
=> _completionPool.Schedule(action, state);
} }
} }
...@@ -5,86 +5,14 @@ ...@@ -5,86 +5,14 @@
namespace StackExchange.Redis namespace StackExchange.Redis
{ {
/// <summary>
/// We want to prevent callers hijacking the reader thread; this is a bit nasty, but works;
/// see https://stackoverflow.com/a/22588431/23354 for more information; a huge
/// thanks to Eli Arbel for spotting this (even though it is pure evil; it is *my kind of evil*)
/// </summary>
internal static class TaskSource internal static class TaskSource
{ {
// on .NET < 4.6, it was possible to have threads hijacked; this is no longer a problem in 4.6 and core-clr 5,
// thanks to the new TaskCreationOptions.RunContinuationsAsynchronously, however we still need to be a little
// "test and react", as we could be targeting 4.5 but running on a 4.6 machine, in which case *it can still
// work the magic* (thanks to over-the-top install)
/// <summary>
/// Indicates whether the specified task will not hijack threads when results are set
/// </summary>
public static readonly Func<Task, bool> IsSyncSafe;
static TaskSource()
{
try
{
Type taskType = typeof(Task);
FieldInfo continuationField = taskType.GetField("m_continuationObject", BindingFlags.Instance | BindingFlags.NonPublic);
Type safeScenario = taskType.GetNestedType("SetOnInvokeMres", BindingFlags.NonPublic);
if (continuationField != null && continuationField.FieldType == typeof(object) && safeScenario != null)
{
var method = new DynamicMethod("IsSyncSafe", typeof(bool), new[] { typeof(Task) }, typeof(Task), true);
var il = method.GetILGenerator();
//var hasContinuation = il.DefineLabel();
il.Emit(OpCodes.Ldarg_0);
il.Emit(OpCodes.Ldfld, continuationField);
Label nonNull = il.DefineLabel(), goodReturn = il.DefineLabel();
// check if null
il.Emit(OpCodes.Brtrue_S, nonNull);
il.MarkLabel(goodReturn);
il.Emit(OpCodes.Ldc_I4_1);
il.Emit(OpCodes.Ret);
// check if is a SetOnInvokeMres - if so, we're OK
il.MarkLabel(nonNull);
il.Emit(OpCodes.Ldarg_0);
il.Emit(OpCodes.Ldfld, continuationField);
il.Emit(OpCodes.Isinst, safeScenario);
il.Emit(OpCodes.Brtrue_S, goodReturn);
il.Emit(OpCodes.Ldc_I4_0);
il.Emit(OpCodes.Ret);
IsSyncSafe = (Func<Task, bool>)method.CreateDelegate(typeof(Func<Task, bool>));
// and test them (check for an exception etc)
var tcs = new TaskCompletionSource<int>();
bool expectTrue = IsSyncSafe(tcs.Task);
tcs.Task.ContinueWith(delegate { });
bool expectFalse = IsSyncSafe(tcs.Task);
tcs.SetResult(0);
if (!expectTrue || expectFalse)
{
// revert to not trusting /them
IsSyncSafe = null;
}
}
}
catch (Exception)
{
IsSyncSafe = null;
}
if (IsSyncSafe == null)
{
IsSyncSafe = _ => false; // assume: not
}
}
/// <summary> /// <summary>
/// Create a new TaskCompletion source /// Create a new TaskCompletion source
/// </summary> /// </summary>
/// <typeparam name="T">The type for the created <see cref="TaskCompletionSource{TResult}"/>.</typeparam> /// <typeparam name="T">The type for the created <see cref="TaskCompletionSource{TResult}"/>.</typeparam>
/// <param name="asyncState">The state for the created <see cref="TaskCompletionSource{TResult}"/>.</param> /// <param name="asyncState">The state for the created <see cref="TaskCompletionSource{TResult}"/>.</param>
public static TaskCompletionSource<T> Create<T>(object asyncState) public static TaskCompletionSource<T> Create<T>(object asyncState)
{ => new TaskCompletionSource<T>(asyncState, TaskCreationOptions.None);
return new TaskCompletionSource<T>(asyncState, TaskCreationOptions.None);
}
} }
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment