Commit cf6f5576 authored by Marc Gravell's avatar Marc Gravell

Hot wait loop; better init logging

parent deb8b4f4
...@@ -559,6 +559,26 @@ private static bool WaitAllIgnoreErrors(Task[] tasks, int timeout) ...@@ -559,6 +559,26 @@ private static bool WaitAllIgnoreErrors(Task[] tasks, int timeout)
} }
return false; return false;
} }
private void LogLockedWithThreadPoolStats(TextWriter log, string message)
{
if(log != null)
{
var sb = new StringBuilder();
sb.Append(message);
AppendThreadPoolStats(sb);
LogLocked(log, sb.ToString());
}
}
static bool AllComplete(Task[] tasks)
{
for(int i = 0 ; i < tasks.Length ; i++)
{
var task = tasks[i];
if (!task.IsCanceled && !task.IsCompleted && !task.IsFaulted)
return false;
}
return true;
}
private async Task<bool> WaitAllIgnoreErrorsAsync(Task[] tasks, int timeoutMilliseconds, TextWriter log) private async Task<bool> WaitAllIgnoreErrorsAsync(Task[] tasks, int timeoutMilliseconds, TextWriter log)
{ {
if (tasks == null) throw new ArgumentNullException("tasks"); if (tasks == null) throw new ArgumentNullException("tasks");
...@@ -568,8 +588,36 @@ private async Task<bool> WaitAllIgnoreErrorsAsync(Task[] tasks, int timeoutMilli ...@@ -568,8 +588,36 @@ private async Task<bool> WaitAllIgnoreErrorsAsync(Task[] tasks, int timeoutMilli
return true; return true;
} }
LogLocked(log, "Awaiting task completion..."); if (AllComplete(tasks))
{
LogLocked(log, "All tasks are already complete");
return true;
}
// try and allow them to finish without needing to await; re-acquiring the thread can be massively problematic
int delay = 1;
var watch = Stopwatch.StartNew(); var watch = Stopwatch.StartNew();
for (int i = 0; i < 10; i++)
{
var remaining = timeoutMilliseconds - checked((int)watch.ElapsedMilliseconds);
if(remaining <= 0)
{
LogLockedWithThreadPoolStats(log, "Timeout waiting for tasks");
return false;
}
if (delay > remaining) delay = remaining;
Thread.Sleep(delay);
if (AllComplete(tasks))
{
LogLocked(log, "Tasks completed in sleep-loop");
return true;
}
delay = (delay * 3) >> 1;
if (delay == 1) delay = 2;
}
LogLockedWithThreadPoolStats(log, "Awaiting task completion");
try try
{ {
...@@ -583,7 +631,7 @@ private async Task<bool> WaitAllIgnoreErrorsAsync(Task[] tasks, int timeoutMilli ...@@ -583,7 +631,7 @@ private async Task<bool> WaitAllIgnoreErrorsAsync(Task[] tasks, int timeoutMilli
var any = Task.WhenAny(allTasks, Task.Delay(timeoutMilliseconds)).ObserveErrors(); var any = Task.WhenAny(allTasks, Task.Delay(timeoutMilliseconds)).ObserveErrors();
#endif #endif
bool all = await any.ForAwait() == allTasks; bool all = await any.ForAwait() == allTasks;
LogLocked(log, all ? "All tasks completed cleanly" : "Not all tasks completed cleanly"); LogLockedWithThreadPoolStats(log, all ? "All tasks completed cleanly" : "Not all tasks completed cleanly");
return all; return all;
} }
catch catch
...@@ -599,7 +647,7 @@ private async Task<bool> WaitAllIgnoreErrorsAsync(Task[] tasks, int timeoutMilli ...@@ -599,7 +647,7 @@ private async Task<bool> WaitAllIgnoreErrorsAsync(Task[] tasks, int timeoutMilli
var remaining = timeoutMilliseconds - checked((int)watch.ElapsedMilliseconds); var remaining = timeoutMilliseconds - checked((int)watch.ElapsedMilliseconds);
if (remaining <= 0) if (remaining <= 0)
{ {
LogLocked(log, "Timeout awaiting tasks"); LogLockedWithThreadPoolStats(log, "Timeout awaiting tasks");
return false; return false;
} }
try try
...@@ -615,7 +663,7 @@ private async Task<bool> WaitAllIgnoreErrorsAsync(Task[] tasks, int timeoutMilli ...@@ -615,7 +663,7 @@ private async Task<bool> WaitAllIgnoreErrorsAsync(Task[] tasks, int timeoutMilli
{ } { }
} }
} }
LogLocked(log, "Finished awaiting tasks"); LogLockedWithThreadPoolStats(log, "Finished awaiting tasks");
return false; return false;
} }
...@@ -1854,7 +1902,6 @@ internal T ExecuteSyncImpl<T>(Message message, ResultProcessor<T> processor, Ser ...@@ -1854,7 +1902,6 @@ internal T ExecuteSyncImpl<T>(Message message, ResultProcessor<T> processor, Ser
return val; return val;
} }
} }
private static void AppendThreadPoolStats(StringBuilder errorMessage) private static void AppendThreadPoolStats(StringBuilder errorMessage)
{ {
//BusyThreads = TP.GetMaxThreads() –TP.GetAVailable(); //BusyThreads = TP.GetMaxThreads() –TP.GetAVailable();
...@@ -1872,8 +1919,8 @@ private static void AppendThreadPoolStats(StringBuilder errorMessage) ...@@ -1872,8 +1919,8 @@ private static void AppendThreadPoolStats(StringBuilder errorMessage)
int busyIoThreads = maxIoThreads - freeIoThreads; int busyIoThreads = maxIoThreads - freeIoThreads;
int busyWorkerThreads = maxWorkerThreads - freeWorkerThreads; int busyWorkerThreads = maxWorkerThreads - freeWorkerThreads;
errorMessage.AppendFormat(", IOCP:(Busy={0},Min={1},Max={2})", busyIoThreads, minIoThreads, maxIoThreads); errorMessage.AppendFormat(", IOCP:(Busy={0},Free={1},Min={2},Max={3})", busyIoThreads, freeIoThreads, minIoThreads, maxIoThreads);
errorMessage.AppendFormat(", WORKER:(Busy={0},Min={1},Max={2})", busyWorkerThreads, minWorkerThreads, maxWorkerThreads); errorMessage.AppendFormat(", WORKER:(Busy={0},Free={1},Min={2},Max={3})", busyWorkerThreads, freeWorkerThreads, minWorkerThreads, maxWorkerThreads);
} }
/// <summary> /// <summary>
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment