Commit dbe1c4d0 authored by Marc Gravell's avatar Marc Gravell

Merge pull request #127 from PashaPash/stale-connection-fix

Azure stale connection detection
parents 57f1c3b9 d29f54fb
...@@ -8,7 +8,7 @@ namespace StackExchange.Redis.Tests ...@@ -8,7 +8,7 @@ namespace StackExchange.Redis.Tests
public class ConnectingFailDetection : TestBase public class ConnectingFailDetection : TestBase
{ {
#if DEBUG #if DEBUG
[TestCase] [Test]
public void FastNoticesFailOnConnectingSync() public void FastNoticesFailOnConnectingSync()
{ {
try try
...@@ -34,16 +34,15 @@ public void FastNoticesFailOnConnectingSync() ...@@ -34,16 +34,15 @@ public void FastNoticesFailOnConnectingSync()
Assert.IsTrue(muxer.IsConnected); Assert.IsTrue(muxer.IsConnected);
} }
ClearAmbientFailures();
} }
finally finally
{ {
SocketManager.ConnectCompletionType = CompletionType.Any; SocketManager.ConnectCompletionType = CompletionType.Any;
ClearAmbientFailures();
} }
} }
[TestCase] [Test]
public void ConnectsWhenBeginConnectCompletesSynchronously() public void ConnectsWhenBeginConnectCompletesSynchronously()
{ {
try try
...@@ -57,16 +56,15 @@ public void ConnectsWhenBeginConnectCompletesSynchronously() ...@@ -57,16 +56,15 @@ public void ConnectsWhenBeginConnectCompletesSynchronously()
Assert.IsTrue(muxer.IsConnected); Assert.IsTrue(muxer.IsConnected);
} }
ClearAmbientFailures();
} }
finally finally
{ {
SocketManager.ConnectCompletionType = CompletionType.Any; SocketManager.ConnectCompletionType = CompletionType.Any;
ClearAmbientFailures();
} }
} }
[TestCase] [Test]
public void FastNoticesFailOnConnectingAsync() public void FastNoticesFailOnConnectingAsync()
{ {
try try
...@@ -92,15 +90,43 @@ public void FastNoticesFailOnConnectingAsync() ...@@ -92,15 +90,43 @@ public void FastNoticesFailOnConnectingAsync()
Thread.Sleep(2000); Thread.Sleep(2000);
Assert.IsTrue(muxer.IsConnected); Assert.IsTrue(muxer.IsConnected);
ClearAmbientFailures();
} }
} }
finally finally
{ {
SocketManager.ConnectCompletionType = CompletionType.Any; SocketManager.ConnectCompletionType = CompletionType.Any;
ClearAmbientFailures();
}
}
[Test]
public void ReconnectsOnStaleConnection()
{
try
{
using (var muxer = Create(keepAlive: 1, connectTimeout: 3000))
{
var conn = muxer.GetDatabase();
conn.Ping();
Assert.IsTrue(muxer.IsConnected);
PhysicalConnection.EmulateStaleConnection = true;
Thread.Sleep(500);
Assert.IsFalse(muxer.IsConnected);
PhysicalConnection.EmulateStaleConnection = false;
Thread.Sleep(1000);
Assert.IsTrue(muxer.IsConnected);
}
}
finally
{
PhysicalConnection.EmulateStaleConnection = false;
ClearAmbientFailures();
} }
} }
#endif #endif
} }
} }
...@@ -260,6 +260,26 @@ bool ISocketCallback.IgnoreConnect ...@@ -260,6 +260,26 @@ bool ISocketCallback.IgnoreConnect
{ {
get { return multiplexer.IgnoreConnect; } get { return multiplexer.IgnoreConnect; }
} }
private volatile static bool emulateStaleConnection;
public static bool EmulateStaleConnection
{ get
{
return emulateStaleConnection;
}
set
{
emulateStaleConnection = value;
}
}
partial void DebugEmulateStaleConnection(ref int firstUnansweredWrite)
{
if (emulateStaleConnection)
{
firstUnansweredWrite = Environment.TickCount - 100000;
}
}
} }
#endif #endif
......
...@@ -679,23 +679,6 @@ private bool ChangeState(State oldState, State newState) ...@@ -679,23 +679,6 @@ private bool ChangeState(State oldState, State newState)
return result; return result;
} }
private void Flush()
{
var tmp = physical;
if (tmp != null)
{
try
{
Trace(connectionType + " flushed");
tmp.Flush();
}
catch (Exception ex)
{
OnInternalError(ex);
}
}
}
private PhysicalConnection GetConnection() private PhysicalConnection GetConnection()
{ {
if (state == (int)State.Disconnected) if (state == (int)State.Disconnected)
......
...@@ -64,6 +64,7 @@ private static readonly Message ...@@ -64,6 +64,7 @@ private static readonly Message
int ioBufferBytes = 0; int ioBufferBytes = 0;
int lastWriteTickCount, lastReadTickCount, lastBeatTickCount; int lastWriteTickCount, lastReadTickCount, lastBeatTickCount;
int firstUnansweredWriteTickCount;
private Stream netStream, outStream; private Stream netStream, outStream;
...@@ -85,6 +86,7 @@ public PhysicalConnection(PhysicalBridge bridge) ...@@ -85,6 +86,7 @@ public PhysicalConnection(PhysicalBridge bridge)
public void BeginConnect() public void BeginConnect()
{ {
Thread.VolatileWrite(ref firstUnansweredWriteTickCount, 0);
var endpoint = this.bridge.ServerEndPoint.EndPoint; var endpoint = this.bridge.ServerEndPoint.EndPoint;
multiplexer.Trace("Connecting...", physicalName); multiplexer.Trace("Connecting...", physicalName);
...@@ -164,14 +166,15 @@ public void RecordConnectionFailed(ConnectionFailureType failureType, Exception ...@@ -164,14 +166,15 @@ public void RecordConnectionFailed(ConnectionFailureType failureType, Exception
if (isCurrent && Interlocked.CompareExchange(ref failureReported, 1, 0) == 0) if (isCurrent && Interlocked.CompareExchange(ref failureReported, 1, 0) == 0)
{ {
//try
//{
int now = Environment.TickCount, lastRead = Thread.VolatileRead(ref lastReadTickCount), lastWrite = Thread.VolatileRead(ref lastWriteTickCount), int now = Environment.TickCount, lastRead = Thread.VolatileRead(ref lastReadTickCount), lastWrite = Thread.VolatileRead(ref lastWriteTickCount),
lastBeat = Thread.VolatileRead(ref lastBeatTickCount); lastBeat = Thread.VolatileRead(ref lastBeatTickCount);
int unansweredRead = Thread.VolatileRead(ref firstUnansweredWriteTickCount);
string message = failureType + " on " + Format.ToString(bridge.ServerEndPoint.EndPoint) + "/" + connectionType string message = failureType + " on " + Format.ToString(bridge.ServerEndPoint.EndPoint) + "/" + connectionType
+ ", input-buffer: " + ioBufferBytes + ", outstanding: " + GetSentAwaitingResponseCount() + ", input-buffer: " + ioBufferBytes + ", outstanding: " + GetSentAwaitingResponseCount()
+ ", last-read: " + unchecked(now - lastRead) / 1000 + "s ago, last-write: " + unchecked(now - lastWrite) / 1000 + "s ago, keep-alive: " + bridge.ServerEndPoint.WriteEverySeconds + "s, pending: " + ", last-read: " + unchecked(now - lastRead) / 1000 + "s ago, last-write: " + unchecked(now - lastWrite) / 1000 + "s ago"
+ ", unanswered-write: " + unchecked(now - unansweredRead) / 1000 + "s ago"
+ ", keep-alive: " + bridge.ServerEndPoint.WriteEverySeconds + "s, pending: "
+ bridge.GetPendingCount() + ", state: " + oldState + ", last-heartbeat: " + (lastBeat == 0 ? "never" : (unchecked(now - lastBeat) / 1000 + "s ago")) + bridge.GetPendingCount() + ", state: " + oldState + ", last-heartbeat: " + (lastBeat == 0 ? "never" : (unchecked(now - lastBeat) / 1000 + "s ago"))
+ (bridge.IsBeating ? " (mid-beat)" : "") + ", last-mbeat: " + multiplexer.LastHeartbeatSecondsAgo + "s ago, global: " + (bridge.IsBeating ? " (mid-beat)" : "") + ", last-mbeat: " + multiplexer.LastHeartbeatSecondsAgo + "s ago, global: "
+ ConnectionMultiplexer.LastGlobalHeartbeatSecondsAgo + "s ago"; + ConnectionMultiplexer.LastGlobalHeartbeatSecondsAgo + "s ago";
...@@ -181,13 +184,6 @@ public void RecordConnectionFailed(ConnectionFailureType failureType, Exception ...@@ -181,13 +184,6 @@ public void RecordConnectionFailed(ConnectionFailureType failureType, Exception
: new RedisConnectionException(failureType, message, innerException); : new RedisConnectionException(failureType, message, innerException);
bridge.OnConnectionFailed(this, failureType, ex); bridge.OnConnectionFailed(this, failureType, ex);
// throw ex;
//}
//catch (Exception caught)
//{
// bridge.OnConnectionFailed(this, failureType, caught);
//}
} }
// cleanup // cleanup
...@@ -390,6 +386,10 @@ internal void WriteHeader(RedisCommand command, int arguments) ...@@ -390,6 +386,10 @@ internal void WriteHeader(RedisCommand command, int arguments)
throw ExceptionFactory.CommandDisabled(multiplexer.IncludeDetailInExceptions, command, null, bridge.ServerEndPoint); throw ExceptionFactory.CommandDisabled(multiplexer.IncludeDetailInExceptions, command, null, bridge.ServerEndPoint);
} }
outStream.WriteByte((byte)'*'); outStream.WriteByte((byte)'*');
// remember the time of the first write that still not followed by read
Interlocked.CompareExchange(ref firstUnansweredWriteTickCount, Environment.TickCount, 0);
WriteRaw(outStream, arguments + 1); WriteRaw(outStream, arguments + 1);
WriteUnified(outStream, commandBytes); WriteUnified(outStream, commandBytes);
} }
...@@ -825,6 +825,10 @@ private bool ProcessReadBytes(int bytesRead) ...@@ -825,6 +825,10 @@ private bool ProcessReadBytes(int bytesRead)
} }
Interlocked.Exchange(ref lastReadTickCount, Environment.TickCount); Interlocked.Exchange(ref lastReadTickCount, Environment.TickCount);
// reset unanswered write timestamp
Thread.VolatileWrite(ref firstUnansweredWriteTickCount, 0);
ioBufferBytes += bytesRead; ioBufferBytes += bytesRead;
multiplexer.Trace("More bytes available: " + bytesRead + " (" + ioBufferBytes + ")", physicalName); multiplexer.Trace("More bytes available: " + bytesRead + " (" + ioBufferBytes + ")", physicalName);
int offset = 0, count = ioBufferBytes; int offset = 0, count = ioBufferBytes;
...@@ -962,5 +966,22 @@ RawResult TryParseResult(byte[] buffer, ref int offset, ref int count) ...@@ -962,5 +966,22 @@ RawResult TryParseResult(byte[] buffer, ref int offset, ref int count)
throw new InvalidOperationException("Unexpected response prefix: " + (char)resultType); throw new InvalidOperationException("Unexpected response prefix: " + (char)resultType);
} }
} }
partial void DebugEmulateStaleConnection(ref int firstUnansweredWrite);
public void CheckForStaleConnection()
{
int firstUnansweredWrite;
firstUnansweredWrite = Thread.VolatileRead(ref firstUnansweredWriteTickCount);
DebugEmulateStaleConnection(ref firstUnansweredWrite);
int now = Environment.TickCount;
if (firstUnansweredWrite != 0 && (now - firstUnansweredWrite) > this.multiplexer.RawConfig.SyncTimeout)
{
this.RecordConnectionFailed(ConnectionFailureType.SocketFailure, origin: "CheckForStaleConnection");
}
}
} }
} }
...@@ -178,6 +178,7 @@ internal ManagerState State ...@@ -178,6 +178,7 @@ internal ManagerState State
private void ReadImpl() private void ReadImpl()
{ {
List<IntPtr> dead = null, active = new List<IntPtr>(); List<IntPtr> dead = null, active = new List<IntPtr>();
List<ISocketCallback> activeCallbacks = new List<ISocketCallback>();
IntPtr[] readSockets = EmptyPointers, errorSockets = EmptyPointers; IntPtr[] readSockets = EmptyPointers, errorSockets = EmptyPointers;
long lastHeartbeat = Environment.TickCount; long lastHeartbeat = Environment.TickCount;
SocketPair[] allSocketPairs = null; SocketPair[] allSocketPairs = null;
...@@ -185,6 +186,7 @@ private void ReadImpl() ...@@ -185,6 +186,7 @@ private void ReadImpl()
{ {
managerState = ManagerState.CheckForHeartbeat; managerState = ManagerState.CheckForHeartbeat;
active.Clear(); active.Clear();
activeCallbacks.Clear();
if (dead != null) dead.Clear(); if (dead != null) dead.Clear();
// this check is actually a pace-maker; sometimes the Timer callback stalls for // this check is actually a pace-maker; sometimes the Timer callback stalls for
...@@ -227,6 +229,7 @@ private void ReadImpl() ...@@ -227,6 +229,7 @@ private void ReadImpl()
if (pair.Value.Socket.Connected) if (pair.Value.Socket.Connected)
{ {
active.Add(pair.Key); active.Add(pair.Key);
activeCallbacks.Add(pair.Value.Callback);
} }
else else
{ {
...@@ -267,6 +270,13 @@ private void ReadImpl() ...@@ -267,6 +270,13 @@ private void ReadImpl()
ready = select(0, readSockets, null, errorSockets, ref timeout); ready = select(0, readSockets, null, errorSockets, ref timeout);
if (ready <= 0) if (ready <= 0)
{ {
if (ready == 0)
{
foreach (var s in activeCallbacks)
{
s.CheckForStaleConnection();
}
}
continue; // -ve typically means a socket was disposed just before; just retry continue; // -ve typically means a socket was disposed just before; just retry
} }
......
...@@ -39,6 +39,9 @@ internal partial interface ISocketCallback ...@@ -39,6 +39,9 @@ internal partial interface ISocketCallback
/// Indicates that we cannot know whether data is available, and that the consume should commence reading asynchronously /// Indicates that we cannot know whether data is available, and that the consume should commence reading asynchronously
/// </summary> /// </summary>
void StartReading(); void StartReading();
// check for write-read timeout
void CheckForStaleConnection();
} }
internal struct SocketToken internal struct SocketToken
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment