Commit 5892e528 authored by Marc Gravell's avatar Marc Gravell

Explicitly track busy worker count

parent ce3785ec
...@@ -560,14 +560,15 @@ private static bool WaitAllIgnoreErrors(Task[] tasks, int timeout) ...@@ -560,14 +560,15 @@ private static bool WaitAllIgnoreErrors(Task[] tasks, int timeout)
} }
return false; return false;
} }
private void LogLockedWithThreadPoolStats(TextWriter log, string message) private void LogLockedWithThreadPoolStats(TextWriter log, string message, out int busyWorkerCount)
{ {
busyWorkerCount = 0;
if(log != null) if(log != null)
{ {
var sb = new StringBuilder(); var sb = new StringBuilder();
sb.Append(message); sb.Append(message);
string iocp, worker; string iocp, worker;
GetThreadPoolStats(out iocp, out worker); busyWorkerCount = GetThreadPoolStats(out iocp, out worker);
sb.Append(", IOCP: ").Append(iocp).Append(", WORKER: ").Append(worker); sb.Append(", IOCP: ").Append(iocp).Append(", WORKER: ").Append(worker);
LogLocked(log, sb.ToString()); LogLocked(log, sb.ToString());
} }
...@@ -598,8 +599,8 @@ private async Task<bool> WaitAllIgnoreErrorsAsync(Task[] tasks, int timeoutMilli ...@@ -598,8 +599,8 @@ private async Task<bool> WaitAllIgnoreErrorsAsync(Task[] tasks, int timeoutMilli
} }
var watch = Stopwatch.StartNew(); var watch = Stopwatch.StartNew();
int busyWorkerCount;
LogLockedWithThreadPoolStats(log, "Awaiting task completion"); LogLockedWithThreadPoolStats(log, "Awaiting task completion", out busyWorkerCount);
try try
{ {
...@@ -607,7 +608,7 @@ private async Task<bool> WaitAllIgnoreErrorsAsync(Task[] tasks, int timeoutMilli ...@@ -607,7 +608,7 @@ private async Task<bool> WaitAllIgnoreErrorsAsync(Task[] tasks, int timeoutMilli
var remaining = timeoutMilliseconds - checked((int)watch.ElapsedMilliseconds); var remaining = timeoutMilliseconds - checked((int)watch.ElapsedMilliseconds);
if (remaining <= 0) if (remaining <= 0)
{ {
LogLockedWithThreadPoolStats(log, "Timeout before awaiting for tasks"); LogLockedWithThreadPoolStats(log, "Timeout before awaiting for tasks", out busyWorkerCount);
return false; return false;
} }
...@@ -619,7 +620,7 @@ private async Task<bool> WaitAllIgnoreErrorsAsync(Task[] tasks, int timeoutMilli ...@@ -619,7 +620,7 @@ private async Task<bool> WaitAllIgnoreErrorsAsync(Task[] tasks, int timeoutMilli
var any = Task.WhenAny(allTasks, Task.Delay(remaining)).ObserveErrors(); var any = Task.WhenAny(allTasks, Task.Delay(remaining)).ObserveErrors();
#endif #endif
bool all = await any.ForAwait() == allTasks; bool all = await any.ForAwait() == allTasks;
LogLockedWithThreadPoolStats(log, all ? "All tasks completed cleanly" : "Not all tasks completed cleanly"); LogLockedWithThreadPoolStats(log, all ? "All tasks completed cleanly" : "Not all tasks completed cleanly", out busyWorkerCount);
return all; return all;
} }
catch catch
...@@ -635,7 +636,7 @@ private async Task<bool> WaitAllIgnoreErrorsAsync(Task[] tasks, int timeoutMilli ...@@ -635,7 +636,7 @@ private async Task<bool> WaitAllIgnoreErrorsAsync(Task[] tasks, int timeoutMilli
var remaining = timeoutMilliseconds - checked((int)watch.ElapsedMilliseconds); var remaining = timeoutMilliseconds - checked((int)watch.ElapsedMilliseconds);
if (remaining <= 0) if (remaining <= 0)
{ {
LogLockedWithThreadPoolStats(log, "Timeout awaiting tasks"); LogLockedWithThreadPoolStats(log, "Timeout awaiting tasks", out busyWorkerCount);
return false; return false;
} }
try try
...@@ -651,7 +652,7 @@ private async Task<bool> WaitAllIgnoreErrorsAsync(Task[] tasks, int timeoutMilli ...@@ -651,7 +652,7 @@ private async Task<bool> WaitAllIgnoreErrorsAsync(Task[] tasks, int timeoutMilli
{ } { }
} }
} }
LogLockedWithThreadPoolStats(log, "Finished awaiting tasks"); LogLockedWithThreadPoolStats(log, "Finished awaiting tasks", out busyWorkerCount);
return false; return false;
} }
...@@ -1888,7 +1889,7 @@ internal T ExecuteSyncImpl<T>(Message message, ResultProcessor<T> processor, Ser ...@@ -1888,7 +1889,7 @@ internal T ExecuteSyncImpl<T>(Message message, ResultProcessor<T> processor, Ser
}; };
int queue = server.GetOutstandingCount(message.Command, out inst, out qu, out qs, out qc, out wr, out wq, out @in, out ar); int queue = server.GetOutstandingCount(message.Command, out inst, out qu, out qs, out qc, out wr, out wq, out @in, out ar);
GetThreadPoolStats(out iocp, out worker); int busyWorkerCount = GetThreadPoolStats(out iocp, out worker);
add("Instantaneous", "inst", inst.ToString()); add("Instantaneous", "inst", inst.ToString());
#if !__MonoCS__ #if !__MonoCS__
add("Manager-State", "mgr", mgrState.ToString()); add("Manager-State", "mgr", mgrState.ToString());
...@@ -1905,7 +1906,7 @@ internal T ExecuteSyncImpl<T>(Message message, ResultProcessor<T> processor, Ser ...@@ -1905,7 +1906,7 @@ internal T ExecuteSyncImpl<T>(Message message, ResultProcessor<T> processor, Ser
add("ThreadPool-IO-Completion", "IOCP", iocp); add("ThreadPool-IO-Completion", "IOCP", iocp);
add("ThreadPool-Workers", "WORKER", worker); add("ThreadPool-Workers", "WORKER", worker);
data.Add(Tuple.Create("Busy-Workers", busyWorkerCount.ToString()));
errMessage = sb.ToString(); errMessage = sb.ToString();
if (stormLogThreshold >= 0 && queue >= stormLogThreshold && Interlocked.CompareExchange(ref haveStormLog, 1, 0) == 0) if (stormLogThreshold >= 0 && queue >= stormLogThreshold && Interlocked.CompareExchange(ref haveStormLog, 1, 0) == 0)
{ {
...@@ -1935,7 +1936,7 @@ internal T ExecuteSyncImpl<T>(Message message, ResultProcessor<T> processor, Ser ...@@ -1935,7 +1936,7 @@ internal T ExecuteSyncImpl<T>(Message message, ResultProcessor<T> processor, Ser
return val; return val;
} }
} }
private static void GetThreadPoolStats(out string iocp, out string worker) private static int GetThreadPoolStats(out string iocp, out string worker)
{ {
//BusyThreads = TP.GetMaxThreads() –TP.GetAVailable(); //BusyThreads = TP.GetMaxThreads() –TP.GetAVailable();
//If BusyThreads >= TP.GetMinThreads(), then threadpool growth throttling is possible. //If BusyThreads >= TP.GetMinThreads(), then threadpool growth throttling is possible.
...@@ -1954,6 +1955,7 @@ private static void GetThreadPoolStats(out string iocp, out string worker) ...@@ -1954,6 +1955,7 @@ private static void GetThreadPoolStats(out string iocp, out string worker)
iocp = string.Format("(Busy={0},Free={1},Min={2},Max={3})", busyIoThreads, freeIoThreads, minIoThreads, maxIoThreads); iocp = string.Format("(Busy={0},Free={1},Min={2},Max={3})", busyIoThreads, freeIoThreads, minIoThreads, maxIoThreads);
worker = string.Format("(Busy={0},Free={1},Min={2},Max={3})", busyWorkerThreads, freeWorkerThreads, minWorkerThreads, maxWorkerThreads); worker = string.Format("(Busy={0},Free={1},Min={2},Max={3})", busyWorkerThreads, freeWorkerThreads, minWorkerThreads, maxWorkerThreads);
return busyWorkerThreads;
} }
/// <summary> /// <summary>
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment