Commit df6523ef authored by Marc Gravell's avatar Marc Gravell

Preserve order: didn't (quite); fixed

parent d80ca807
...@@ -110,43 +110,58 @@ private static void ProcessAsyncCompletionQueue(object state) ...@@ -110,43 +110,58 @@ private static void ProcessAsyncCompletionQueue(object state)
} }
partial void OnCompletedAsync(); partial void OnCompletedAsync();
int activeAsyncWorkerThread = 0;
private void ProcessAsyncCompletionQueueImpl() private void ProcessAsyncCompletionQueueImpl()
{ {
int total = 0; int currentThread = Environment.CurrentManagedThreadId;
do try
{ {
ICompletable next; if (Interlocked.CompareExchange(ref activeAsyncWorkerThread, currentThread, 0) != 0) return;
lock (asyncCompletionQueue) int total = 0;
{ do
next = asyncCompletionQueue.Count == 0 ? null
: asyncCompletionQueue.Dequeue();
}
if(next == null && Thread.Yield()) // give it a moment and try again
{ {
ICompletable next;
lock (asyncCompletionQueue) lock (asyncCompletionQueue)
{ {
next = asyncCompletionQueue.Count == 0 ? null next = asyncCompletionQueue.Count == 0 ? null
: asyncCompletionQueue.Dequeue(); : asyncCompletionQueue.Dequeue();
} }
} if (next == null)
if (next == null) break; // nothing to do {
try // give it a moment and try again, noting that we might lose the battle
{ // when we pause
multiplexer.Trace("Completing async (ordered): " + next, name); Interlocked.CompareExchange(ref activeAsyncWorkerThread, 0, currentThread);
next.TryComplete(true); if (Thread.Yield() && Interlocked.CompareExchange(ref activeAsyncWorkerThread, currentThread, 0) == 0)
Interlocked.Increment(ref completedAsync); {
} // we paused, and we got the lock back; anything else?
catch(Exception ex) lock (asyncCompletionQueue)
{ {
multiplexer.Trace("Async completion error: " + ex.Message, name); next = asyncCompletionQueue.Count == 0 ? null
Interlocked.Increment(ref failedAsync); : asyncCompletionQueue.Dequeue();
} }
total++; }
} while (true); }
if (next == null) break; // nothing to do <===== exit point
multiplexer.Trace("Async completion worker processed " + total + " operations", name); try
{
multiplexer.Trace("Completing async (ordered): " + next, name);
next.TryComplete(true);
Interlocked.Increment(ref completedAsync);
}
catch (Exception ex)
{
multiplexer.Trace("Async completion error: " + ex.Message, name);
Interlocked.Increment(ref failedAsync);
}
total++;
} while (true);
multiplexer.Trace("Async completion worker processed " + total + " operations", name);
}
finally
{
Interlocked.CompareExchange(ref activeAsyncWorkerThread, 0, currentThread);
}
} }
} }
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment