Commit b719bc48 authored by Marc Gravell's avatar Marc Gravell

improve configuration and tracking of async timeouts; only enable the...

improve configuration and tracking of async timeouts; only enable the time-marking if it is async-enabled (and not F+F)
parent 0187c9f6
......@@ -9,7 +9,7 @@ namespace StackExchange.Redis.Tests
{
public class AsyncTests : TestBase
{
public AsyncTests(ITestOutputHelper output) : base (output) { }
public AsyncTests(ITestOutputHelper output) : base(output) { }
protected override string GetConfiguration() => TestConfig.Current.MasterServerAndPort;
......@@ -49,13 +49,25 @@ public async Task AsyncTimeoutIsNoticed()
{
using (var conn = Create(syncTimeout: 1000))
{
var opt = ConfigurationOptions.Parse(conn.Configuration);
if (!Debugger.IsAttached)
{ // we max the timeouts if a degugger is detected
Assert.Equal(1000, opt.AsyncTimeout);
}
RedisKey key = Me();
var val = Guid.NewGuid().ToString();
var db = conn.GetDatabase();
db.StringSet(key, val);
Assert.Contains("; async timeouts: 0;", conn.GetStatus());
await db.ExecuteAsync("client", "pause", 4000); // client pause returns immediately
var ms = Stopwatch.StartNew();
var ex = await Assert.ThrowsAsync<RedisTimeoutException>(async () =>
{
await db.PingAsync(); // but *subsequent* operations are paused
var actual = await db.StringGetAsync(key); // but *subsequent* operations are paused
ms.Stop();
Writer.WriteLine($"Unexpectedly succeeded after {ms.ElapsedMilliseconds}ms");
});
......@@ -64,6 +76,10 @@ public async Task AsyncTimeoutIsNoticed()
Assert.Contains("Timeout awaiting response", ex.Message);
Writer.WriteLine(ex.Message);
string status = conn.GetStatus();
Writer.WriteLine(status);
Assert.Contains("; async timeouts: 1;", status);
}
}
}
......
......@@ -10,6 +10,7 @@ internal static class CompletedTask<T>
public static Task<T> FromResult(T value, object asyncState)
{
if (asyncState == null) return Task.FromResult<T>(value);
// note we do not need to deny exec-sync here; the value will be known
// before we hand it to them
var tcs = TaskSource.Create<T>(asyncState);
......
......@@ -65,6 +65,7 @@ internal static void Unknown(string key)
internal const string
AbortOnConnectFail = "abortConnect",
AllowAdmin = "allowAdmin",
AsyncTimeout = "asyncTimeout",
ChannelPrefix = "channelPrefix",
ConfigChannel = "configChannel",
ConfigCheckSeconds = "configCheckSeconds",
......@@ -92,6 +93,7 @@ internal const string
{
AbortOnConnectFail,
AllowAdmin,
AsyncTimeout,
ChannelPrefix,
ClientName,
ConfigChannel,
......@@ -133,7 +135,7 @@ public static string TryNormalize(string value)
private Version defaultVersion;
private int? keepAlive, syncTimeout, connectTimeout, responseTimeout, writeBuffer, connectRetry, configCheckSeconds;
private int? keepAlive, asyncTimeout, syncTimeout, connectTimeout, responseTimeout, writeBuffer, connectRetry, configCheckSeconds;
private Proxy? proxy;
......@@ -163,6 +165,11 @@ public static string TryNormalize(string value)
/// </summary>
public bool AllowAdmin { get { return allowAdmin.GetValueOrDefault(); } set { allowAdmin = value; } }
/// <summary>
/// Specifies the time in milliseconds that the system should allow for asynchronous operations (defaults to SyncTimeout)
/// </summary>
public int AsyncTimeout { get { return asyncTimeout ?? SyncTimeout; } set { asyncTimeout = value; } }
/// <summary>
/// Indicates whether the connection should be encrypted
/// </summary>
......@@ -412,6 +419,7 @@ public ConfigurationOptions Clone()
ServiceName = ServiceName,
keepAlive = keepAlive,
syncTimeout = syncTimeout,
asyncTimeout = asyncTimeout,
allowAdmin = allowAdmin,
defaultVersion = defaultVersion,
connectTimeout = connectTimeout,
......@@ -474,6 +482,7 @@ public string ToString(bool includePassword)
Append(sb, OptionKeys.ServiceName, ServiceName);
Append(sb, OptionKeys.KeepAlive, keepAlive);
Append(sb, OptionKeys.SyncTimeout, syncTimeout);
Append(sb, OptionKeys.AsyncTimeout, asyncTimeout);
Append(sb, OptionKeys.AllowAdmin, allowAdmin);
Append(sb, OptionKeys.Version, defaultVersion);
Append(sb, OptionKeys.ConnectTimeout, connectTimeout);
......@@ -569,7 +578,7 @@ private static void Append(StringBuilder sb, string prefix, object value)
private void Clear()
{
ClientName = ServiceName = Password = tieBreaker = sslHost = configChannel = null;
keepAlive = syncTimeout = connectTimeout = writeBuffer = connectRetry = configCheckSeconds = DefaultDatabase = null;
keepAlive = syncTimeout = asyncTimeout = connectTimeout = writeBuffer = connectRetry = configCheckSeconds = DefaultDatabase = null;
allowAdmin = abortOnConnectFail = highPrioritySocketThreads = resolveDns = ssl = null;
defaultVersion = null;
EndPoints.Clear();
......@@ -618,6 +627,9 @@ private void DoParse(string configuration, bool ignoreUnknown)
case OptionKeys.SyncTimeout:
SyncTimeout = OptionKeys.ParseInt32(key, value, minValue: 1);
break;
case OptionKeys.AsyncTimeout:
AsyncTimeout = OptionKeys.ParseInt32(key, value, minValue: 1);
break;
case OptionKeys.AllowAdmin:
AllowAdmin = OptionKeys.ParseBoolean(key, value);
break;
......
......@@ -497,9 +497,13 @@ private static void WriteNormalizingLineEndings(string source, StreamWriter writ
public event EventHandler<EndPointEventArgs> ConfigurationChangedBroadcast;
/// <summary>
/// Gets the timeout associated with the connections
/// Gets the synchronous timeout associated with the connections
/// </summary>
public int TimeoutMilliseconds { get; }
/// <summary>
/// Gets the asynchronous timeout associated with the connections
/// </summary>
internal int AsyncTimeoutMilliseconds { get; }
/// <summary>
/// Gets all endpoints defined on the server
......@@ -956,6 +960,7 @@ private ConnectionMultiplexer(ConfigurationOptions configuration)
}
TimeoutMilliseconds = configuration.SyncTimeout;
AsyncTimeoutMilliseconds = configuration.AsyncTimeout;
OnCreateReaderWriter(configuration);
UnprocessableCompletionManager = new CompletionManager(this, "multiplexer");
......@@ -1242,8 +1247,9 @@ public void GetStatus(TextWriter log)
LogLocked(log, server.GetCounters().ToString());
LogLocked(log, server.GetProfile());
}
LogLocked(log, "Sync timeouts: {0}; fire and forget: {1}; last heartbeat: {2}s ago",
Interlocked.Read(ref syncTimeouts), Interlocked.Read(ref fireAndForgets), LastHeartbeatSecondsAgo);
LogLocked(log, "Sync timeouts: {0}; async timeouts: {1}; fire and forget: {2}; last heartbeat: {3}s ago",
Interlocked.Read(ref syncTimeouts), Interlocked.Read(ref asyncTimeouts),
Interlocked.Read(ref fireAndForgets), LastHeartbeatSecondsAgo);
}
private void ActivateAllServers(TextWriter log)
......@@ -1955,7 +1961,7 @@ public async Task CloseAsync(bool allowCommandsToComplete = true)
if (allowCommandsToComplete)
{
var quits = QuitAllServers();
await WaitAllIgnoreErrorsAsync(quits, configuration.SyncTimeout, null).ForAwait();
await WaitAllIgnoreErrorsAsync(quits, configuration.AsyncTimeout, null).ForAwait();
}
DisposeAndClearServers();
......@@ -2199,7 +2205,9 @@ public void ResetStormLog()
Interlocked.Exchange(ref haveStormLog, 0);
}
private long syncTimeouts, fireAndForgets;
private long syncTimeouts, fireAndForgets, asyncTimeouts;
internal void OnAsyncTimeout() => Interlocked.Increment(ref asyncTimeouts);
/// <summary>
/// Request all compatible clients to reconfigure or reconnect
......
......@@ -54,7 +54,7 @@ internal abstract class Message : ICompletable
private const CommandFlags AskingFlag = (CommandFlags)32,
ScriptUnavailableFlag = (CommandFlags)256,
TimedOutFlag = (CommandFlags)1024;
NeedsAsyncTimeoutCheckFlag = (CommandFlags)1024;
private const CommandFlags MaskMasterServerPreference = CommandFlags.DemandMaster
| CommandFlags.DemandSlave
......@@ -619,20 +619,24 @@ internal void SetEnqueued()
internal void SetRequestSent()
{
Status = CommandStatus.Sent;
_writeTickCount = Environment.TickCount; // note this might be reset if we resend a message, cluster-moved etc; I'm OK with that
if ((flags & NeedsAsyncTimeoutCheckFlag) != 0)
{
_writeTickCount = Environment.TickCount; // note this might be reset if we resend a message, cluster-moved etc; I'm OK with that
}
performance?.SetRequestSent();
}
// the time (ticks) at which this message was considered written
private int _writeTickCount;
internal bool HasTimedOut(int now, int timeoutMilliseconds, out int millisecondsTaken)
private void SetNeedsTimeoutCheck() => flags |= NeedsAsyncTimeoutCheckFlag;
internal bool HasAsyncTimedOut(int now, int timeoutMilliseconds, out int millisecondsTaken)
{
if ((flags & TimedOutFlag) == 0)
if ((flags & NeedsAsyncTimeoutCheckFlag) != 0)
{
millisecondsTaken = unchecked(now - _writeTickCount); // note: we can't just check "if sent < cutoff" because of wrap-aro
if (millisecondsTaken >= timeoutMilliseconds)
{
flags |= TimedOutFlag; // note: we don't remove it from the queue - still might need to marry it up; but: it is toast
flags &= ~NeedsAsyncTimeoutCheckFlag; // note: we don't remove it from the queue - still might need to marry it up; but: it is toast
return true;
}
}
......@@ -666,12 +670,14 @@ internal void SetPreferSlave()
internal void SetSource(ResultProcessor resultProcessor, ResultBox resultBox)
{ // note order here reversed to prevent overload resolution errors
if (resultBox != null && resultBox.IsAsync) SetNeedsTimeoutCheck();
this.resultBox = resultBox;
this.resultProcessor = resultProcessor;
}
internal void SetSource<T>(ResultBox<T> resultBox, ResultProcessor<T> resultProcessor)
{
if (resultBox != null && resultBox.IsAsync) SetNeedsTimeoutCheck();
this.resultBox = resultBox;
this.resultProcessor = resultProcessor;
}
......
......@@ -498,14 +498,15 @@ internal void OnBridgeHeartbeat()
{
bool includeDetail = Multiplexer.IncludeDetailInExceptions;
var server = Bridge.ServerEndPoint;
var timeout = Multiplexer.TimeoutMilliseconds;
var timeout = Multiplexer.AsyncTimeoutMilliseconds;
foreach (var msg in _writtenAwaitingResponse)
{
if (msg.HasTimedOut(now, timeout, out var elapsed))
if (msg.HasAsyncTimedOut(now, timeout, out var elapsed))
{
var timeoutEx = ExceptionFactory.Timeout(includeDetail, $"Timeout awaiting response ({elapsed}ms elapsed, timeout is {timeout}ms)", msg, server);
msg.SetException(timeoutEx); // tell the message that it is doomed
Bridge.CompleteSyncOrAsync(msg); // prod it - kicks off async continuations etc
Multiplexer.OnAsyncTimeout();
}
// note: it is important that we **do not** remove the message unless we're tearing down the socket; that
// would disrupt the chain for MatchResult; we just pre-emptively abort the message from the caller's
......
......@@ -8,6 +8,7 @@ namespace StackExchange.Redis
internal abstract partial class ResultBox
{
protected Exception _exception;
public abstract bool IsAsync { get; }
public void SetException(Exception exception) => _exception = exception;
......@@ -82,6 +83,8 @@ public void SetResult(T value)
this.value = value;
}
public override bool IsAsync => stateOrCompletionSource is TaskCompletionSource<T>;
public override bool TryComplete(bool isAsync)
{
if (stateOrCompletionSource is TaskCompletionSource<T> tcs)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment