Commit 838305e7 authored by Marc Gravell's avatar Marc Gravell

Stronger detection of failing connects

parent 460bf46a
using NUnit.Framework;
using System.Threading;
namespace StackExchange.Redis.Tests
{
[TestFixture]
public class ConnectFailTimeout : TestBase
{
[TestCase]
public void NoticesConnectFail()
{
SetExpectedAmbientFailureCount(-1);
using (var conn = Create(allowAdmin: true))
{
var server = conn.GetServer(conn.GetEndPoints()[0]);
conn.IgnoreConnect = true;
conn.ConnectionFailed += (s,a) => {
System.Console.WriteLine("Disconnected: " + EndPointCollection.ToString(a.EndPoint));
};
conn.ConnectionRestored += (s,a) => {
System.Console.WriteLine("Reconnected: " + EndPointCollection.ToString(a.EndPoint));
};
server.SimulateConnectionFailure();
Thread.Sleep(2000);
try
{
server.Ping();
Assert.Fail("Did not expect PING to succeed");
} catch(RedisConnectionException) { /* expected */ }
conn.IgnoreConnect = false;
Thread.Sleep(2000);
var time = server.Ping();
System.Console.WriteLine(time);
}
}
}
}
...@@ -65,6 +65,7 @@ ...@@ -65,6 +65,7 @@
<Compile Include="Bits.cs" /> <Compile Include="Bits.cs" />
<Compile Include="Cluster.cs" /> <Compile Include="Cluster.cs" />
<Compile Include="Commands.cs" /> <Compile Include="Commands.cs" />
<Compile Include="ConnectFailTimeout.cs" />
<Compile Include="ConnectionShutdown.cs" /> <Compile Include="ConnectionShutdown.cs" />
<Compile Include="Databases.cs" /> <Compile Include="Databases.cs" />
<Compile Include="DefaultPorts.cs" /> <Compile Include="DefaultPorts.cs" />
......
...@@ -849,7 +849,7 @@ private void OnHeartbeat() ...@@ -849,7 +849,7 @@ private void OnHeartbeat()
{ {
try try
{ {
long now = Environment.TickCount; int now = Environment.TickCount;
Interlocked.Exchange(ref lastHeartbeatTicks, now); Interlocked.Exchange(ref lastHeartbeatTicks, now);
Interlocked.Exchange(ref lastGlobalHeartbeatTicks, now); Interlocked.Exchange(ref lastGlobalHeartbeatTicks, now);
Trace("heartbeat"); Trace("heartbeat");
...@@ -863,16 +863,16 @@ private void OnHeartbeat() ...@@ -863,16 +863,16 @@ private void OnHeartbeat()
} }
} }
private long lastHeartbeatTicks; private int lastHeartbeatTicks;
private static long lastGlobalHeartbeatTicks = Environment.TickCount; private static int lastGlobalHeartbeatTicks = Environment.TickCount;
internal long LastHeartbeatSecondsAgo { internal long LastHeartbeatSecondsAgo {
get { get {
if (pulse == null) return -1; if (pulse == null) return -1;
return unchecked(Environment.TickCount - Interlocked.Read(ref lastHeartbeatTicks)) / 1000; return unchecked(Environment.TickCount - Thread.VolatileRead(ref lastHeartbeatTicks)) / 1000;
} }
} }
internal static long LastGlobalHeartbeatSecondsAgo internal static long LastGlobalHeartbeatSecondsAgo
{ get { return unchecked(Environment.TickCount - Interlocked.Read(ref lastGlobalHeartbeatTicks)) / 1000; } } { get { return unchecked(Environment.TickCount - Thread.VolatileRead(ref lastGlobalHeartbeatTicks)) / 1000; } }
internal CompletionManager UnprocessableCompletionManager { get { return unprocessableCompletionManager; } } internal CompletionManager UnprocessableCompletionManager { get { return unprocessableCompletionManager; } }
......
...@@ -176,7 +176,24 @@ public static long GetAsyncCompletionWorkerCount() ...@@ -176,7 +176,24 @@ public static long GetAsyncCompletionWorkerCount()
/// For debugging; when not enabled, servers cannot connect /// For debugging; when not enabled, servers cannot connect
/// </summary> /// </summary>
public bool AllowConnect { get { return allowConnect; } set { allowConnect = value; } } public bool AllowConnect { get { return allowConnect; } set { allowConnect = value; } }
private volatile bool allowConnect = true; private volatile bool allowConnect = true, ignoreConnect = false;
/// <summary>
/// For debugging; when not enabled, end-connect is silently ignored (to simulate a long-running connect)
/// </summary>
public bool IgnoreConnect { get { return ignoreConnect; } set { ignoreConnect = value; } }
}
partial class SocketManager
{
partial void ShouldIgnoreConnect(ISocketCallback callback, ref bool ignore)
{
ignore = callback.IgnoreConnect;
}
}
partial interface ISocketCallback
{
bool IgnoreConnect { get; }
} }
partial class MessageQueue partial class MessageQueue
...@@ -227,6 +244,11 @@ partial class PhysicalConnection ...@@ -227,6 +244,11 @@ partial class PhysicalConnection
throw new RedisConnectionException(ConnectionFailureType.InternalFailure, "debugging"); throw new RedisConnectionException(ConnectionFailureType.InternalFailure, "debugging");
} }
} }
bool ISocketCallback.IgnoreConnect
{
get { return multiplexer.IgnoreConnect; }
}
} }
#endif #endif
......
...@@ -363,6 +363,7 @@ internal void OnFullyEstablished(PhysicalConnection connection) ...@@ -363,6 +363,7 @@ internal void OnFullyEstablished(PhysicalConnection connection)
} }
} }
private int connectStartTicks;
internal void OnHeartbeat(bool ifConnectedOnly) internal void OnHeartbeat(bool ifConnectedOnly)
{ {
bool runThisTime = false; bool runThisTime = false;
...@@ -378,6 +379,24 @@ internal void OnHeartbeat(bool ifConnectedOnly) ...@@ -378,6 +379,24 @@ internal void OnHeartbeat(bool ifConnectedOnly)
switch (state) switch (state)
{ {
case (int)State.Connecting:
int connectTimeMilliseconds = unchecked(Environment.TickCount - Thread.VolatileRead(ref connectStartTicks));
if (connectTimeMilliseconds >= multiplexer.RawConfig.ConnectTimeout)
{
Trace("Aborting connect");
// abort and reconnect
var snapshot = physical;
bool isCurrent;
State oldState;
OnDisconnected(ConnectionFailureType.UnableToConnect, snapshot, out isCurrent, out oldState);
using (snapshot) { } // dispose etc
TryConnect();
}
if (!ifConnectedOnly)
{
AbortUnsent();
}
break;
case (int)State.ConnectedEstablishing: case (int)State.ConnectedEstablishing:
case (int)State.ConnectedEstablished: case (int)State.ConnectedEstablished:
var tmp = physical; var tmp = physical;
...@@ -416,7 +435,6 @@ internal void OnHeartbeat(bool ifConnectedOnly) ...@@ -416,7 +435,6 @@ internal void OnHeartbeat(bool ifConnectedOnly)
GetConnection(); GetConnection();
} }
break; break;
case (int)State.Connecting:
default: default:
if (!ifConnectedOnly) if (!ifConnectedOnly)
{ {
...@@ -666,6 +684,7 @@ private PhysicalConnection GetConnection() ...@@ -666,6 +684,7 @@ private PhysicalConnection GetConnection()
if (ChangeState(State.Disconnected, State.Connecting)) if (ChangeState(State.Disconnected, State.Connecting))
{ {
Interlocked.Increment(ref socketCount); Interlocked.Increment(ref socketCount);
Interlocked.Exchange(ref connectStartTicks, Environment.TickCount);
physical = new PhysicalConnection(this); physical = new PhysicalConnection(this);
} }
} }
......
...@@ -62,7 +62,7 @@ private static readonly Message ...@@ -62,7 +62,7 @@ private static readonly Message
int ioBufferBytes = 0; int ioBufferBytes = 0;
long lastWriteTickCount, lastReadTickCount, lastBeatTickCount; int lastWriteTickCount, lastReadTickCount, lastBeatTickCount;
private Stream netStream, outStream; private Stream netStream, outStream;
...@@ -99,7 +99,7 @@ public long LastWriteSecondsAgo ...@@ -99,7 +99,7 @@ public long LastWriteSecondsAgo
{ {
get get
{ {
return unchecked(Environment.TickCount - Interlocked.Read(ref lastWriteTickCount)) / 1000; return unchecked(Environment.TickCount - Thread.VolatileRead(ref lastWriteTickCount)) / 1000;
} }
} }
...@@ -161,8 +161,8 @@ public void RecordConnectionFailed(ConnectionFailureType failureType, Exception ...@@ -161,8 +161,8 @@ public void RecordConnectionFailed(ConnectionFailureType failureType, Exception
{ {
//try //try
//{ //{
long now = Environment.TickCount, lastRead = Interlocked.Read(ref lastReadTickCount), lastWrite = Interlocked.Read(ref lastWriteTickCount), int now = Environment.TickCount, lastRead = Thread.VolatileRead(ref lastReadTickCount), lastWrite = Thread.VolatileRead(ref lastWriteTickCount),
lastBeat = Interlocked.Read(ref lastBeatTickCount); lastBeat = Thread.VolatileRead(ref lastBeatTickCount);
string message = failureType + " on " + Format.ToString(bridge.ServerEndPoint.EndPoint) + "/" + connectionType string message = failureType + " on " + Format.ToString(bridge.ServerEndPoint.EndPoint) + "/" + connectionType
+ ", input-buffer: " + ioBufferBytes + ", outstanding: " + GetSentAwaitingResponseCount() + ", input-buffer: " + ioBufferBytes + ", outstanding: " + GetSentAwaitingResponseCount()
......
...@@ -18,7 +18,7 @@ internal enum SocketMode ...@@ -18,7 +18,7 @@ internal enum SocketMode
/// <summary> /// <summary>
/// Allows callbacks from SocketManager as work is discovered /// Allows callbacks from SocketManager as work is discovered
/// </summary> /// </summary>
internal interface ISocketCallback internal partial interface ISocketCallback
{ {
/// <summary> /// <summary>
/// Indicates that a socket has connected /// Indicates that a socket has connected
...@@ -124,7 +124,12 @@ internal SocketToken BeginConnect(EndPoint endpoint, ISocketCallback callback) ...@@ -124,7 +124,12 @@ internal SocketToken BeginConnect(EndPoint endpoint, ISocketCallback callback)
socket.NoDelay = true; socket.NoDelay = true;
try try
{ {
socket.BeginConnect(endpoint, EndConnect, Tuple.Create(socket, callback)); var ar = socket.BeginConnect(endpoint, EndConnect, Tuple.Create(socket, callback));
if (ar.CompletedSynchronously)
{
ConnectionMultiplexer.TraceWithoutContext("EndConnect (sync)");
EndConnectImpl(ar);
}
} catch (NotImplementedException ex) } catch (NotImplementedException ex)
{ {
if (!(endpoint is IPEndPoint)) if (!(endpoint is IPEndPoint))
...@@ -180,11 +185,22 @@ internal void Shutdown(SocketToken token) ...@@ -180,11 +185,22 @@ internal void Shutdown(SocketToken token)
} }
private void EndConnect(IAsyncResult ar) private void EndConnect(IAsyncResult ar)
{
if (!ar.CompletedSynchronously)
{
ConnectionMultiplexer.TraceWithoutContext("EndConnect (async)");
EndConnectImpl(ar);
}
}
private void EndConnectImpl(IAsyncResult ar)
{ {
Tuple<Socket, ISocketCallback> tuple = null; Tuple<Socket, ISocketCallback> tuple = null;
try try
{ {
tuple = (Tuple<Socket, ISocketCallback>)ar.AsyncState; tuple = (Tuple<Socket, ISocketCallback>)ar.AsyncState;
bool ignoreConnect = false;
ShouldIgnoreConnect(tuple.Item2, ref ignoreConnect);
if (ignoreConnect) return;
var socket = tuple.Item1; var socket = tuple.Item1;
var callback = tuple.Item2; var callback = tuple.Item2;
socket.EndConnect(ar); socket.EndConnect(ar);
...@@ -193,28 +209,35 @@ private void EndConnect(IAsyncResult ar) ...@@ -193,28 +209,35 @@ private void EndConnect(IAsyncResult ar)
switch (socketMode) switch (socketMode)
{ {
case SocketMode.Poll: case SocketMode.Poll:
ConnectionMultiplexer.TraceWithoutContext("Starting poll");
OnAddRead(socket, callback); OnAddRead(socket, callback);
break; break;
case SocketMode.Async: case SocketMode.Async:
ConnectionMultiplexer.TraceWithoutContext("Starting read");
try try
{ callback.StartReading(); } { callback.StartReading(); }
catch catch (Exception ex)
{ Shutdown(socket); } {
ConnectionMultiplexer.TraceWithoutContext(ex.Message);
Shutdown(socket);
}
break; break;
default: default:
ConnectionMultiplexer.TraceWithoutContext("Aborting socket");
Shutdown(socket); Shutdown(socket);
break; break;
} }
} }
catch catch(Exception outer)
{ {
ConnectionMultiplexer.TraceWithoutContext(outer.Message);
if (tuple != null) if (tuple != null)
{ {
try try
{ tuple.Item2.Error(); } { tuple.Item2.Error(); }
catch (Exception ex) catch (Exception inner)
{ {
Trace.WriteLine(ex); ConnectionMultiplexer.TraceWithoutContext(inner.Message);
} }
} }
} }
...@@ -223,6 +246,8 @@ private void EndConnect(IAsyncResult ar) ...@@ -223,6 +246,8 @@ private void EndConnect(IAsyncResult ar)
partial void OnDispose(); partial void OnDispose();
partial void OnShutdown(Socket socket); partial void OnShutdown(Socket socket);
partial void ShouldIgnoreConnect(ISocketCallback callback, ref bool ignore);
[System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Usage", "CA2202:Do not dispose objects multiple times")] [System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Usage", "CA2202:Do not dispose objects multiple times")]
private void Shutdown(Socket socket) private void Shutdown(Socket socket)
{ {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment