Commit 8739aca9 authored by Marc Gravell's avatar Marc Gravell

Fix: Issue 1 - connection failed/restored now more reliable

parent ff30e3cb
...@@ -104,7 +104,7 @@ private void Muxer_ErrorMessage(object sender, RedisErrorEventArgs e) ...@@ -104,7 +104,7 @@ private void Muxer_ErrorMessage(object sender, RedisErrorEventArgs e)
Log(e.EndPoint + ": " + e.Message); Log(e.EndPoint + ": " + e.Message);
} }
private void Muxer_ConnectionRestored(object sender, EndPointEventArgs e) private void Muxer_ConnectionRestored(object sender, ConnectionFailedEventArgs e)
{ {
Log("Endpoint restored: " + e.EndPoint); Log("Endpoint restored: " + e.EndPoint);
} }
......
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using NUnit.Framework;
namespace StackExchange.Redis.Tests
{
[TestFixture]
public class ConnectionShutdown : TestBase
{
protected override string GetConfiguration()
{
return PrimaryServer + ":" + PrimaryPortString;
}
[Test]
public void ShutdownRaisesConnectionFailedAndRestore()
{
using(var conn = Create(allowAdmin: true))
{
int failed = 0, restored = 0;
Stopwatch watch = Stopwatch.StartNew();
conn.ConnectionFailed += (sender,args)=>
{
Console.WriteLine(watch.Elapsed + ": failed: " + EndPointCollection.ToString(args.EndPoint) + "/" + args.ConnectionType);
Interlocked.Increment(ref failed);
};
conn.ConnectionRestored += (sender, args) =>
{
Console.WriteLine(watch.Elapsed + ": restored: " + EndPointCollection.ToString(args.EndPoint) + "/" + args.ConnectionType);
Interlocked.Increment(ref restored);
};
var db = conn.GetDatabase();
db.Ping();
Assert.AreEqual(0, Interlocked.CompareExchange(ref failed, 0, 0));
Assert.AreEqual(0, Interlocked.CompareExchange(ref restored, 0, 0));
conn.AllowConnect = false;
var server = (IRedisServerDebug)conn.GetServer(PrimaryServer, PrimaryPort);
SetExpectedAmbientFailureCount(2);
server.SimulateConnectionFailure();
db.Ping(CommandFlags.FireAndForget);
Thread.Sleep(250);
Assert.AreEqual(2, Interlocked.CompareExchange(ref failed, 0, 0), "failed");
Assert.AreEqual(0, Interlocked.CompareExchange(ref restored, 0, 0), "restored");
conn.AllowConnect = true;
db.Ping(CommandFlags.FireAndForget);
Thread.Sleep(1500);
Assert.AreEqual(2, Interlocked.CompareExchange(ref failed, 0, 0), "failed");
Assert.AreEqual(2, Interlocked.CompareExchange(ref restored, 0, 0), "restored");
watch.Stop();
}
}
}
}
...@@ -63,6 +63,7 @@ ...@@ -63,6 +63,7 @@
<Compile Include="BasicOps.cs" /> <Compile Include="BasicOps.cs" />
<Compile Include="Cluster.cs" /> <Compile Include="Cluster.cs" />
<Compile Include="Commands.cs" /> <Compile Include="Commands.cs" />
<Compile Include="ConnectionShutdown.cs" />
<Compile Include="Databases.cs" /> <Compile Include="Databases.cs" />
<Compile Include="Expiry.cs" /> <Compile Include="Expiry.cs" />
<Compile Include="Config.cs" /> <Compile Include="Config.cs" />
......
...@@ -10,15 +10,17 @@ namespace StackExchange.Redis ...@@ -10,15 +10,17 @@ namespace StackExchange.Redis
public sealed class ConnectionFailedEventArgs : EventArgs, ICompletable public sealed class ConnectionFailedEventArgs : EventArgs, ICompletable
{ {
private readonly EndPoint endpoint; private readonly EndPoint endpoint;
private readonly ConnectionType connectionType;
private readonly Exception exception; private readonly Exception exception;
private readonly ConnectionFailureType failureType; private readonly ConnectionFailureType failureType;
private readonly EventHandler<ConnectionFailedEventArgs> handler; private readonly EventHandler<ConnectionFailedEventArgs> handler;
private readonly object sender; private readonly object sender;
internal ConnectionFailedEventArgs(EventHandler<ConnectionFailedEventArgs> handler, object sender, EndPoint endPoint, ConnectionFailureType failureType, Exception exception) internal ConnectionFailedEventArgs(EventHandler<ConnectionFailedEventArgs> handler, object sender, EndPoint endPoint, ConnectionType connectionType, ConnectionFailureType failureType, Exception exception)
{ {
this.handler = handler; this.handler = handler;
this.sender = sender; this.sender = sender;
this.endpoint = endPoint; this.endpoint = endPoint;
this.connectionType = connectionType;
this.exception = exception; this.exception = exception;
this.failureType = failureType; this.failureType = failureType;
} }
...@@ -31,6 +33,14 @@ public EndPoint EndPoint ...@@ -31,6 +33,14 @@ public EndPoint EndPoint
get { return endpoint; } get { return endpoint; }
} }
/// <summary>
/// Gets the connection-type of the failing connection
/// </summary>
public ConnectionType ConnectionType
{
get { return connectionType; }
}
/// <summary> /// <summary>
/// Gets the exception if available (this can be null) /// Gets the exception if available (this can be null)
/// </summary> /// </summary>
......
...@@ -75,14 +75,14 @@ public string Configuration ...@@ -75,14 +75,14 @@ public string Configuration
get { return configuration.ToString(); } get { return configuration.ToString(); }
} }
internal void OnConnectionFailed(EndPoint endpoint, ConnectionFailureType failureType, Exception exception, bool reconfigure) internal void OnConnectionFailed(EndPoint endpoint, ConnectionType connectionType, ConnectionFailureType failureType, Exception exception, bool reconfigure)
{ {
if (isDisposed) return; if (isDisposed) return;
var handler = ConnectionFailed; var handler = ConnectionFailed;
if (handler != null) if (handler != null)
{ {
unprocessableCompletionManager.CompleteSyncOrAsync( unprocessableCompletionManager.CompleteSyncOrAsync(
new ConnectionFailedEventArgs(handler, this, endpoint, failureType, exception) new ConnectionFailedEventArgs(handler, this, endpoint, connectionType, failureType, exception)
); );
} }
if (reconfigure) if (reconfigure)
...@@ -90,14 +90,14 @@ internal void OnConnectionFailed(EndPoint endpoint, ConnectionFailureType failur ...@@ -90,14 +90,14 @@ internal void OnConnectionFailed(EndPoint endpoint, ConnectionFailureType failur
ReconfigureIfNeeded(endpoint, false, "connection failed"); ReconfigureIfNeeded(endpoint, false, "connection failed");
} }
} }
internal void OnConnectionRestored(EndPoint endpoint) internal void OnConnectionRestored(EndPoint endpoint, ConnectionType connectionType)
{ {
if (isDisposed) return; if (isDisposed) return;
var handler = ConnectionRestored; var handler = ConnectionRestored;
if (handler != null) if (handler != null)
{ {
unprocessableCompletionManager.CompleteSyncOrAsync( unprocessableCompletionManager.CompleteSyncOrAsync(
new EndPointEventArgs(handler, this, endpoint) new ConnectionFailedEventArgs(handler, this, endpoint, connectionType, ConnectionFailureType.None, null)
); );
} }
ReconfigureIfNeeded(endpoint, false, "connection restored"); ReconfigureIfNeeded(endpoint, false, "connection restored");
...@@ -392,7 +392,7 @@ static void WriteNormalizingLineEndings(string source, StreamWriter writer) ...@@ -392,7 +392,7 @@ static void WriteNormalizingLineEndings(string source, StreamWriter writer)
/// <summary> /// <summary>
/// Raised whenever a physical connection is established /// Raised whenever a physical connection is established
/// </summary> /// </summary>
public event EventHandler<EndPointEventArgs> ConnectionRestored; public event EventHandler<ConnectionFailedEventArgs> ConnectionRestored;
/// <summary> /// <summary>
/// Raised when configuration changes are detected /// Raised when configuration changes are detected
......
...@@ -7,7 +7,7 @@ namespace StackExchange.Redis ...@@ -7,7 +7,7 @@ namespace StackExchange.Redis
/// <summary> /// <summary>
/// Event information related to redis endpoints /// Event information related to redis endpoints
/// </summary> /// </summary>
public sealed class EndPointEventArgs : EventArgs, ICompletable public class EndPointEventArgs : EventArgs, ICompletable
{ {
private readonly EndPoint endpoint; private readonly EndPoint endpoint;
private readonly EventHandler<EndPointEventArgs> handler; private readonly EventHandler<EndPointEventArgs> handler;
......
...@@ -275,15 +275,15 @@ internal void OnConnected(PhysicalConnection connection) ...@@ -275,15 +275,15 @@ internal void OnConnected(PhysicalConnection connection)
internal void OnConnectionFailed(PhysicalConnection connection, ConnectionFailureType failureType, Exception innerException) internal void OnConnectionFailed(PhysicalConnection connection, ConnectionFailureType failureType, Exception innerException)
{ {
if (connection == physical && reportNextFailure) if (reportNextFailure)
{ {
reportNextFailure = false; // until it is restored reportNextFailure = false; // until it is restored
var endpoint = serverEndPoint.EndPoint; var endpoint = serverEndPoint.EndPoint;
multiplexer.OnConnectionFailed(endpoint, failureType, innerException, reconfigureNextFailure); multiplexer.OnConnectionFailed(endpoint, connectionType, failureType, innerException, reconfigureNextFailure);
} }
} }
internal void OnDisconnected(ConnectionFailureType failureType, PhysicalConnection connection) internal void OnDisconnected(ConnectionFailureType failureType, PhysicalConnection connection, out bool isCurrent)
{ {
Trace("OnDisconnected"); Trace("OnDisconnected");
...@@ -298,7 +298,7 @@ internal void OnDisconnected(ConnectionFailureType failureType, PhysicalConnecti ...@@ -298,7 +298,7 @@ internal void OnDisconnected(ConnectionFailureType failureType, PhysicalConnecti
CompleteSyncOrAsync(ping); CompleteSyncOrAsync(ping);
} }
if (physical == connection) if (isCurrent = physical == connection)
{ {
Trace("Bridge noting disconnect from active connection" + (isDisposed ? " (disposed)" : "")); Trace("Bridge noting disconnect from active connection" + (isDisposed ? " (disposed)" : ""));
ChangeState(State.Disconnected); ChangeState(State.Disconnected);
...@@ -368,7 +368,8 @@ internal void OnHeartbeat() ...@@ -368,7 +368,8 @@ internal void OnHeartbeat()
} }
else else
{ {
OnDisconnected(ConnectionFailureType.SocketFailure, tmp); bool ignore;
OnDisconnected(ConnectionFailureType.SocketFailure, tmp, out ignore);
} }
} }
} }
......
...@@ -145,8 +145,10 @@ public void RecordConnectionFailed(ConnectionFailureType failureType, Exception ...@@ -145,8 +145,10 @@ public void RecordConnectionFailed(ConnectionFailureType failureType, Exception
// stop anything new coming in... // stop anything new coming in...
bridge.Trace("Failed: " + failureType); bridge.Trace("Failed: " + failureType);
bridge.OnDisconnected(failureType, this); bool isCurrent;
if (Interlocked.CompareExchange(ref failureReported, 1, 0) == 0) bridge.OnDisconnected(failureType, this, out isCurrent);
if (isCurrent && Interlocked.CompareExchange(ref failureReported, 1, 0) == 0)
{ {
try try
{ {
......
...@@ -310,7 +310,7 @@ internal void OnFullyEstablished(PhysicalConnection connection) ...@@ -310,7 +310,7 @@ internal void OnFullyEstablished(PhysicalConnection connection)
{ {
multiplexer.ResendSubscriptions(this); multiplexer.ResendSubscriptions(this);
} }
multiplexer.OnConnectionRestored(endpoint); multiplexer.OnConnectionRestored(endpoint, bridge.ConnectionType);
} }
catch (Exception ex) catch (Exception ex)
{ {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment