Commit e887b3b3 authored by Pavel Pochobut's avatar Pavel Pochobut

fix for #124 - Azure timeouts/disconnection detection

parent 3bf6cba7
...@@ -98,6 +98,35 @@ public void FastNoticesFailOnConnectingAsync() ...@@ -98,6 +98,35 @@ public void FastNoticesFailOnConnectingAsync()
ClearAmbientFailures(); ClearAmbientFailures();
} }
} }
[Test]
public void ReconnectsOnStaleConnection()
{
try
{
using (var muxer = Create(keepAlive: 1, connectTimeout: 3000))
{
var conn = muxer.GetDatabase();
conn.Ping();
Assert.IsTrue(muxer.IsConnected);
PhysicalConnection.EmulateStaleConnection = true;
Thread.Sleep(500);
Assert.IsFalse(muxer.IsConnected);
PhysicalConnection.EmulateStaleConnection = false;
Thread.Sleep(1000);
Assert.IsTrue(muxer.IsConnected);
}
}
finally
{
PhysicalConnection.EmulateStaleConnection = false;
ClearAmbientFailures();
}
}
#endif #endif
} }
} }
...@@ -260,6 +260,26 @@ bool ISocketCallback.IgnoreConnect ...@@ -260,6 +260,26 @@ bool ISocketCallback.IgnoreConnect
{ {
get { return multiplexer.IgnoreConnect; } get { return multiplexer.IgnoreConnect; }
} }
private volatile static bool emulateStaleConnection;
public static bool EmulateStaleConnection
{ get
{
return emulateStaleConnection;
}
set
{
emulateStaleConnection = value;
}
}
partial void DebugEmulateStaleConnection(ref int lastRead)
{
if (emulateStaleConnection)
{
lastRead -= 100500;
}
}
} }
#endif #endif
......
...@@ -953,5 +953,21 @@ RawResult TryParseResult(byte[] buffer, ref int offset, ref int count) ...@@ -953,5 +953,21 @@ RawResult TryParseResult(byte[] buffer, ref int offset, ref int count)
throw new InvalidOperationException("Unexpected response prefix: " + (char)resultType); throw new InvalidOperationException("Unexpected response prefix: " + (char)resultType);
} }
} }
partial void DebugEmulateStaleConnection(ref int lastWrite);
public void CheckForStaleConnection()
{
int lastRead, lastWrite;
lastRead = Thread.VolatileRead(ref this.lastReadTickCount);
lastWrite = Thread.VolatileRead(ref this.lastWriteTickCount);
DebugEmulateStaleConnection(ref lastRead);
if ((lastWrite - lastRead) > this.multiplexer.RawConfig.SyncTimeout)
{
this.RecordConnectionFailed(ConnectionFailureType.SocketFailure, origin: "CheckForStaleConnection");
}
}
} }
} }
...@@ -178,6 +178,7 @@ internal ManagerState State ...@@ -178,6 +178,7 @@ internal ManagerState State
private void ReadImpl() private void ReadImpl()
{ {
List<IntPtr> dead = null, active = new List<IntPtr>(); List<IntPtr> dead = null, active = new List<IntPtr>();
List<ISocketCallback> activeCallbacks = new List<ISocketCallback>();
IntPtr[] readSockets = EmptyPointers, errorSockets = EmptyPointers; IntPtr[] readSockets = EmptyPointers, errorSockets = EmptyPointers;
long lastHeartbeat = Environment.TickCount; long lastHeartbeat = Environment.TickCount;
SocketPair[] allSocketPairs = null; SocketPair[] allSocketPairs = null;
...@@ -185,6 +186,7 @@ private void ReadImpl() ...@@ -185,6 +186,7 @@ private void ReadImpl()
{ {
managerState = ManagerState.CheckForHeartbeat; managerState = ManagerState.CheckForHeartbeat;
active.Clear(); active.Clear();
activeCallbacks.Clear();
if (dead != null) dead.Clear(); if (dead != null) dead.Clear();
// this check is actually a pace-maker; sometimes the Timer callback stalls for // this check is actually a pace-maker; sometimes the Timer callback stalls for
...@@ -227,6 +229,7 @@ private void ReadImpl() ...@@ -227,6 +229,7 @@ private void ReadImpl()
if (pair.Value.Socket.Connected) if (pair.Value.Socket.Connected)
{ {
active.Add(pair.Key); active.Add(pair.Key);
activeCallbacks.Add(pair.Value.Callback);
} }
else else
{ {
...@@ -267,6 +270,13 @@ private void ReadImpl() ...@@ -267,6 +270,13 @@ private void ReadImpl()
ready = select(0, readSockets, null, errorSockets, ref timeout); ready = select(0, readSockets, null, errorSockets, ref timeout);
if (ready <= 0) if (ready <= 0)
{ {
if (ready == 0)
{
foreach (var s in activeCallbacks)
{
s.CheckForStaleConnection();
}
}
continue; // -ve typically means a socket was disposed just before; just retry continue; // -ve typically means a socket was disposed just before; just retry
} }
......
...@@ -39,6 +39,9 @@ internal partial interface ISocketCallback ...@@ -39,6 +39,9 @@ internal partial interface ISocketCallback
/// Indicates that we cannot know whether data is available, and that the consume should commence reading asynchronously /// Indicates that we cannot know whether data is available, and that the consume should commence reading asynchronously
/// </summary> /// </summary>
void StartReading(); void StartReading();
// check for write-read timeout
void CheckForStaleConnection();
} }
internal struct SocketToken internal struct SocketToken
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment