Commit 11e1482b authored by Marc Gravell's avatar Marc Gravell

perform asunc timeouts in bridge heartbeats

parent 267ec00d
using System.Linq; using System;
using System.Linq;
using System.Threading.Tasks;
using Xunit; using Xunit;
using Xunit.Abstractions; using Xunit.Abstractions;
...@@ -40,5 +42,22 @@ public void AsyncTasksReportFailureIfServerUnavailable() ...@@ -40,5 +42,22 @@ public void AsyncTasksReportFailureIfServerUnavailable()
} }
} }
#endif #endif
[Fact]
public async Task AsyncTimeoutIsNoticed()
{
using (var conn = Create(syncTimeout: 1000))
{
var db = conn.GetDatabase();
var ex = await Assert.ThrowsAsync<RedisTimeoutException>(async () =>
{
await db.ExecuteAsync("client", "pause", 2500); // client pause returns immediately
await db.PingAsync(); // but *subsequent* operations are paused
});
Assert.Contains("Timeout awaiting response", ex.Message);
Writer.WriteLine(ex.Message);
}
}
} }
} }
...@@ -62,5 +62,7 @@ public enum CommandFlags ...@@ -62,5 +62,7 @@ public enum CommandFlags
/// Indicates that script-related operations should use EVAL, not SCRIPT LOAD + EVALSHA /// Indicates that script-related operations should use EVAL, not SCRIPT LOAD + EVALSHA
/// </summary> /// </summary>
NoScriptCache = 512, NoScriptCache = 512,
// 1024: used for timed-out; never user-specified, so not visible on the public API
} }
} }
...@@ -49,10 +49,12 @@ internal abstract class Message : ICompletable ...@@ -49,10 +49,12 @@ internal abstract class Message : ICompletable
public readonly int Db; public readonly int Db;
internal const CommandFlags InternalCallFlag = (CommandFlags)128; internal const CommandFlags InternalCallFlag = (CommandFlags)128;
protected RedisCommand command; protected RedisCommand command;
private const CommandFlags AskingFlag = (CommandFlags)32, private const CommandFlags AskingFlag = (CommandFlags)32,
ScriptUnavailableFlag = (CommandFlags)256; ScriptUnavailableFlag = (CommandFlags)256,
TimedOutFlag = (CommandFlags)1024;
private const CommandFlags MaskMasterServerPreference = CommandFlags.DemandMaster private const CommandFlags MaskMasterServerPreference = CommandFlags.DemandMaster
| CommandFlags.DemandSlave | CommandFlags.DemandSlave
...@@ -612,11 +614,34 @@ internal void SetEnqueued() ...@@ -612,11 +614,34 @@ internal void SetEnqueued()
performance?.SetEnqueued(); performance?.SetEnqueued();
} }
internal void SetRequestSent() internal void SetRequestSent()
{ {
Status = CommandStatus.Sent; Status = CommandStatus.Sent;
_writeTickCount = Environment.TickCount; // note this might be reset if we resend a message, cluster-moved etc; I'm OK with that
performance?.SetRequestSent(); performance?.SetRequestSent();
} }
// the time (ticks) at which this message was considered written
private int _writeTickCount;
internal bool HasTimedOut(int now, int timeoutMilliseconds, out int millisecondsTaken)
{
if ((flags & TimedOutFlag) == 0)
{
millisecondsTaken = unchecked(now - _writeTickCount); // note: we can't just check "if sent < cutoff" because of wrap-aro
if (millisecondsTaken >= timeoutMilliseconds)
{
flags |= TimedOutFlag; // note: we don't remove it from the queue - still might need to marry it up; but: it is toast
return true;
}
}
else
{
millisecondsTaken = default;
}
return false;
}
internal void SetAsking(bool value) internal void SetAsking(bool value)
{ {
......
...@@ -489,7 +489,30 @@ internal void GetStormLog(StringBuilder sb) ...@@ -489,7 +489,30 @@ internal void GetStormLog(StringBuilder sb)
internal void OnBridgeHeartbeat() internal void OnBridgeHeartbeat()
{ {
Interlocked.Exchange(ref lastBeatTickCount, Environment.TickCount); var now = Environment.TickCount;
Interlocked.Exchange(ref lastBeatTickCount, now);
lock (_writtenAwaitingResponse)
{
if (_writtenAwaitingResponse.Count != 0)
{
bool includeDetail = Multiplexer.IncludeDetailInExceptions;
var server = Bridge.ServerEndPoint;
var timeout = Multiplexer.TimeoutMilliseconds;
foreach (var msg in _writtenAwaitingResponse)
{
if (msg.HasTimedOut(now, timeout, out var elapsed))
{
var timeoutEx = ExceptionFactory.Timeout(includeDetail, $"Timeout awaiting response ({elapsed}ms elapsed, timeout is {timeout}ms)", msg, server);
msg.SetException(timeoutEx); // tell the message that it is doomed
Bridge.CompleteSyncOrAsync(msg); // prod it - kicks off async continuations etc
}
// note: it is important that we **do not** remove the message unless we're tearing down the socket; that
// would disrupt the chain for MatchResult; we just pre-emptively abort the message from the caller's
// perspective, and set a flag on the message so we don't keep doing it
}
}
}
} }
internal void OnInternalError(Exception exception, [CallerMemberName] string origin = null) internal void OnInternalError(Exception exception, [CallerMemberName] string origin = null)
......
...@@ -7,20 +7,9 @@ namespace StackExchange.Redis ...@@ -7,20 +7,9 @@ namespace StackExchange.Redis
{ {
internal abstract partial class ResultBox internal abstract partial class ResultBox
{ {
protected Exception exception; protected Exception _exception;
public void SetException(Exception exception) public void SetException(Exception exception) => _exception = exception;
{
this.exception = exception;
//try
//{
// throw exception;
//}
//catch (Exception caught)
//{ // stacktrace etc
// this.exception = caught;
//}
}
public abstract bool TryComplete(bool isAsync); public abstract bool TryComplete(bool isAsync);
...@@ -75,9 +64,9 @@ public static void UnwrapAndRecycle(ResultBox<T> box, bool recycle, out T value, ...@@ -75,9 +64,9 @@ public static void UnwrapAndRecycle(ResultBox<T> box, bool recycle, out T value,
else else
{ {
value = box.value; value = box.value;
exception = box.exception; exception = box._exception;
box.value = default(T); box.value = default(T);
box.exception = null; box._exception = null;
if (recycle) if (recycle)
{ {
for (int i = 0; i < store.Length; i++) for (int i = 0; i < store.Length; i++)
...@@ -134,7 +123,7 @@ public override bool TryComplete(bool isAsync) ...@@ -134,7 +123,7 @@ public override bool TryComplete(bool isAsync)
private void Reset(object stateOrCompletionSource) private void Reset(object stateOrCompletionSource)
{ {
value = default(T); value = default(T);
exception = null; _exception = null;
this.stateOrCompletionSource = stateOrCompletionSource; this.stateOrCompletionSource = stateOrCompletionSource;
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment