Commit 30d8a7c5 authored by Marc Gravell's avatar Marc Gravell

More thread-race avoidance

parent df6523ef
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
using NUnit.Framework;
namespace StackExchange.Redis.Tests
{
[TestFixture]
public class PreserveOrder : TestBase
{
[Test]
[TestCase(true)]
[TestCase(false)]
public void Execute(bool preserveAsyncOrder)
{
using (var conn = Create())
{
var sub = conn.GetSubscriber();
var received = new List<int>();
Console.WriteLine("Subscribing...");
const int COUNT = 1000;
sub.Subscribe("foo", (channel, message) =>
{
lock (received)
{
received.Add((int)message);
if (received.Count == COUNT)
Monitor.PulseAll(received); // wake the test rig
}
Thread.Sleep(1); // you kinda need to be slow, otherwise
// the pool will end up doing everything on one thread
});
conn.PreserveAsyncOrder = preserveAsyncOrder;
Console.WriteLine();
Console.WriteLine("Sending ({0})...", (preserveAsyncOrder ? "preserved order" : "any order"));
lock (received)
{
received.Clear();
// we'll also use received as a wait-detection mechanism; sneaky
// note: this does not do any cheating;
// it all goes to the server and back
for (int i = 0; i < COUNT; i++)
{
sub.Publish("foo", i);
}
Console.WriteLine("Allowing time for delivery etc...");
var watch = Stopwatch.StartNew();
if (!Monitor.Wait(received, 10000))
{
Console.WriteLine("Timed out; expect less data");
}
watch.Stop();
Console.WriteLine("Checking...");
lock (received)
{
Console.WriteLine("Received: {0} in {1}ms", received.Count, watch.ElapsedMilliseconds);
int wrongOrder = 0;
for (int i = 0; i < Math.Min(COUNT, received.Count); i++)
{
if (received[i] != i) wrongOrder++;
}
Console.WriteLine("Out of order: " + wrongOrder);
if (preserveAsyncOrder) Assert.AreEqual(0, wrongOrder);
else Assert.AreNotEqual(0, wrongOrder);
}
}
}
}
}
}
......@@ -77,6 +77,7 @@
<Compile Include="Locking.cs" />
<Compile Include="MultiMaster.cs" />
<Compile Include="Naming.cs" />
<Compile Include="PreserveOrder.cs" />
<Compile Include="Properties\AssemblyInfo.cs" />
<Compile Include="PubSub.cs" />
<Compile Include="RealWorld.cs" />
......
......@@ -117,7 +117,16 @@ private void ProcessAsyncCompletionQueueImpl()
int currentThread = Environment.CurrentManagedThreadId;
try
{
if (Interlocked.CompareExchange(ref activeAsyncWorkerThread, currentThread, 0) != 0) return;
while (Interlocked.CompareExchange(ref activeAsyncWorkerThread, currentThread, 0) != 0)
{
// if we don't win the lock, check whether there is still work; if there is we
// need to retry to prevent a nasty race condition
lock(asyncCompletionQueue)
{
if (asyncCompletionQueue.Count == 0) return; // another thread drained it; can exit
}
Thread.Sleep(1);
}
int total = 0;
do
{
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment