Commit 8d12e231 authored by Nick Craver's avatar Nick Craver

Task.Delay: cancel delay tasks expediently

Before these were left in the scheduler on every success, which piles up needlessly. This cancels them upon success.
parent adfc504d
...@@ -641,8 +641,10 @@ private async Task<bool> WaitAllIgnoreErrorsAsync(Task[] tasks, int timeoutMilli ...@@ -641,8 +641,10 @@ private async Task<bool> WaitAllIgnoreErrorsAsync(Task[] tasks, int timeoutMilli
} }
var allTasks = Task.WhenAll(tasks).ObserveErrors(); var allTasks = Task.WhenAll(tasks).ObserveErrors();
var any = Task.WhenAny(allTasks, Task.Delay(remaining)).ObserveErrors(); var cts = new CancellationTokenSource();
var any = Task.WhenAny(allTasks, Task.Delay(remaining, cts.Token)).ObserveErrors();
bool all = await any.ForAwait() == allTasks; bool all = await any.ForAwait() == allTasks;
cts.Cancel();
LogLockedWithThreadPoolStats(log, all ? "All tasks completed cleanly" : $"Not all tasks completed cleanly (from {caller}#{callerLineNumber}, timeout {timeoutMilliseconds}ms)", out busyWorkerCount); LogLockedWithThreadPoolStats(log, all ? "All tasks completed cleanly" : $"Not all tasks completed cleanly (from {caller}#{callerLineNumber}, timeout {timeoutMilliseconds}ms)", out busyWorkerCount);
return all; return all;
} }
...@@ -1287,11 +1289,13 @@ internal async Task<bool> ReconfigureAsync(bool first, bool reconfigureAll, Text ...@@ -1287,11 +1289,13 @@ internal async Task<bool> ReconfigureAsync(bool first, bool reconfigureAll, Text
{ {
if (configuration.ResolveDns && configuration.HasDnsEndPoints()) if (configuration.ResolveDns && configuration.HasDnsEndPoints())
{ {
var cts = new CancellationTokenSource();
var dns = configuration.ResolveEndPointsAsync(this, log).ObserveErrors(); var dns = configuration.ResolveEndPointsAsync(this, log).ObserveErrors();
if ((await Task.WhenAny(dns, Task.Delay(TimeoutMilliseconds)).ForAwait()) != dns) if ((await Task.WhenAny(dns, Task.Delay(TimeoutMilliseconds, cts.Token)).ForAwait()) != dns)
{ {
throw new TimeoutException("Timeout resolving endpoints"); throw new TimeoutException("Timeout resolving endpoints");
} }
cts.Cancel();
} }
foreach (var endpoint in configuration.EndPoints) foreach (var endpoint in configuration.EndPoints)
{ {
......
...@@ -106,11 +106,12 @@ internal async void BeginConnectAsync(TextWriter log) ...@@ -106,11 +106,12 @@ internal async void BeginConnectAsync(TextWriter log)
RemoteEndPoint = endpoint, RemoteEndPoint = endpoint,
}; };
_socketArgs.Completed += SocketAwaitable.Callback; _socketArgs.Completed += SocketAwaitable.Callback;
CancellationTokenSource timeoutSource = null;
try try
{ {
if (_socket.ConnectAsync(_socketArgs)) if (_socket.ConnectAsync(_socketArgs))
{ // asynchronous operation is pending { // asynchronous operation is pending
ConfigureTimeout(_socketArgs, Multiplexer.RawConfig.ConnectTimeout); timeoutSource = ConfigureTimeout(_socketArgs, Multiplexer.RawConfig.ConnectTimeout);
} }
else else
{ // completed synchronously { // completed synchronously
...@@ -125,6 +126,7 @@ internal async void BeginConnectAsync(TextWriter log) ...@@ -125,6 +126,7 @@ internal async void BeginConnectAsync(TextWriter log)
if (ignoreConnect) return; if (ignoreConnect) return;
await awaitable; // wait for the connect to complete or fail (will throw) await awaitable; // wait for the connect to complete or fail (will throw)
timeoutSource?.Cancel();
if (await ConnectedAsync(_socket, log, Multiplexer.SocketManager).ForAwait()) if (await ConnectedAsync(_socket, log, Multiplexer.SocketManager).ForAwait())
{ {
...@@ -175,9 +177,10 @@ internal async void BeginConnectAsync(TextWriter log) ...@@ -175,9 +177,10 @@ internal async void BeginConnectAsync(TextWriter log)
} }
} }
private static void ConfigureTimeout(SocketAsyncEventArgs args, int timeoutMilliseconds) private static CancellationTokenSource ConfigureTimeout(SocketAsyncEventArgs args, int timeoutMilliseconds)
{ {
var timeout = Task.Delay(timeoutMilliseconds); var cts = new CancellationTokenSource();
var timeout = Task.Delay(timeoutMilliseconds, cts.Token);
timeout.ContinueWith((_, state) => timeout.ContinueWith((_, state) =>
{ {
try try
...@@ -190,6 +193,7 @@ private static void ConfigureTimeout(SocketAsyncEventArgs args, int timeoutMilli ...@@ -190,6 +193,7 @@ private static void ConfigureTimeout(SocketAsyncEventArgs args, int timeoutMilli
} }
catch { } catch { }
}, args); }, args);
return cts;
} }
private enum ReadMode : byte private enum ReadMode : byte
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment