Commit 9c0adcbd authored by Marc Gravell's avatar Marc Gravell

Automatic retry during connect (configurable)

parent ef618429
......@@ -6,6 +6,7 @@ namespace StackExchange.Redis.Tests
[TestFixture]
public class ConnectFailTimeout : TestBase
{
#if DEBUG
[TestCase]
public void NoticesConnectFail()
{
......@@ -34,6 +35,6 @@ public void NoticesConnectFail()
System.Console.WriteLine(time);
}
}
#endif
}
}
......@@ -99,6 +99,7 @@
<Compile Include="TaskTests.cs" />
<Compile Include="TestBase.cs" />
<Compile Include="Transactions.cs" />
<Compile Include="VPNTest.cs" />
</ItemGroup>
<ItemGroup>
<None Include="packages.config" />
......
using NUnit.Framework;
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace StackExchange.Redis.Tests
{
/// <summary>
/// Exercises the initial-connect retry logic by repeatedly connecting to a remote
/// server and pinging it.
/// </summary>
[TestFixture]
public class VPNTest : TestBase
{
    /// <summary>
    /// Connects to <paramref name="config"/> 50 times in a row (with <c>ConnectRetry = 5</c>
    /// and <c>SyncTimeout = 3000</c>), pinging the server on each connection and writing
    /// the ping result to the console.
    /// </summary>
    /// <param name="config">The configuration string to parse and connect with.</param>
    /// <remarks>
    /// If any attempt throws, the connection log captured for that attempt is written to
    /// the console and the test fails with the attempt number and the exception details.
    /// </remarks>
    [Test]
    [MaxTime(100000)]
    [TestCase("or-devredis01.ds.stackexchange.com:6379")]
    public void Execute(string config)
    {
        for (int i = 0; i < 50; i++)
        {
            // a fresh log per attempt, so a failure only dumps the output of the attempt that failed
            using (var log = new StringWriter())
            {
                try
                {
                    var options = ConfigurationOptions.Parse(config);
                    options.SyncTimeout = 3000;
                    options.ConnectRetry = 5;
                    using (var conn = ConnectionMultiplexer.Connect(options, log))
                    {
                        var ttl = conn.GetDatabase().Ping();
                        Console.WriteLine(ttl);
                    }
                }
                catch (Exception ex)
                {
                    Console.WriteLine(log);
                    // surface the actual cause instead of failing with an empty message
                    Assert.Fail("Connect/ping attempt " + i + " failed: " + ex);
                }
            }
            Console.WriteLine();
            Console.WriteLine("===");
            Console.WriteLine();
        }
    }
}
}
......@@ -72,7 +72,7 @@ internal static void Unknown(string key)
Version = "version", ConnectTimeout = "connectTimeout", Password = "password",
TieBreaker = "tiebreaker", WriteBuffer = "writeBuffer", Ssl = "ssl", SslHost = "sslHost",
ConfigChannel = "configChannel", AbortOnConnectFail = "abortConnect", ResolveDns = "resolveDns",
ChannelPrefix = "channelPrefix", Proxy = "proxy";
ChannelPrefix = "channelPrefix", Proxy = "proxy", ConnectRetry = "connectRetry";
private static readonly Dictionary<string, string> normalizedOptions = new[]
{
AllowAdmin, SyncTimeout,
......@@ -80,7 +80,7 @@ internal static void Unknown(string key)
Version, ConnectTimeout, Password,
TieBreaker, WriteBuffer, Ssl, SslHost,
ConfigChannel, AbortOnConnectFail, ResolveDns,
ChannelPrefix, Proxy
ChannelPrefix, Proxy, ConnectRetry
}.ToDictionary(x => x, StringComparer.InvariantCultureIgnoreCase);
public static string TryNormalize(string value)
......@@ -105,7 +105,7 @@ public static string TryNormalize(string value)
private Version defaultVersion;
private int? keepAlive, syncTimeout, connectTimeout, writeBuffer;
private int? keepAlive, syncTimeout, connectTimeout, writeBuffer, connectRetry;
private Proxy? proxy;
......@@ -153,6 +153,11 @@ public static string TryNormalize(string value)
/// </summary>
public string ClientName { get { return clientName; } set { clientName = value; } }
/// <summary>
/// How many times the initial connect cycle is repeated if no servers respond promptly; reads as 3 when no value has been assigned
/// </summary>
public int ConnectRetry
{
    get { return connectRetry.GetValueOrDefault(3); }
    set { connectRetry = value; }
}
/// <summary>
/// The command-map associated with this configuration
/// </summary>
......@@ -302,6 +307,7 @@ public ConfigurationOptions Clone()
CertificateSelectionCallback = CertificateSelectionCallback,
ChannelPrefix = ChannelPrefix.Clone(),
SocketManager = SocketManager,
connectRetry = connectRetry
};
foreach (var item in endpoints)
options.endpoints.Add(item);
......@@ -343,6 +349,7 @@ public override string ToString()
Append(sb, OptionKeys.AbortOnConnectFail, abortOnConnectFail);
Append(sb, OptionKeys.ResolveDns, resolveDns);
Append(sb, OptionKeys.ChannelPrefix, (string)ChannelPrefix);
Append(sb, OptionKeys.ConnectRetry, connectRetry);
Append(sb, OptionKeys.Proxy, proxy);
if(commandMap != null) commandMap.AppendDeltas(sb);
return sb.ToString();
......@@ -433,7 +440,7 @@ static bool IsOption(string option, string prefix)
void Clear()
{
clientName = serviceName = password = tieBreaker = sslHost = configChannel = null;
keepAlive = syncTimeout = connectTimeout = writeBuffer = null;
keepAlive = syncTimeout = connectTimeout = writeBuffer = connectRetry = null;
allowAdmin = abortOnConnectFail = resolveDns = ssl = null;
defaultVersion = null;
endpoints.Clear();
......@@ -500,6 +507,9 @@ private void DoParse(string configuration, bool ignoreUnknown)
case OptionKeys.ConnectTimeout:
ConnectTimeout = OptionKeys.ParseInt32(key, value);
break;
case OptionKeys.ConnectRetry:
ConnectRetry = OptionKeys.ParseInt32(key, value);
break;
case OptionKeys.Version:
DefaultVersion = OptionKeys.ParseVersion(key, value);
break;
......
......@@ -51,7 +51,6 @@ public static ConfiguredTaskAwaitable<T> ForAwait<T>(this Task<T> task)
/// </summary>
public sealed partial class ConnectionMultiplexer : IDisposable
{
/// <summary>
/// Get summary statistics associated with this server
/// </summary>
......@@ -726,7 +725,7 @@ public static ConnectionMultiplexer Connect(string configuration, TextWriter log
killMe = muxer;
// note that task has timeouts internally, so it might take *just over* the regular timeout
var task = muxer.ReconfigureAsync(true, false, log, null, "connect");
if (!task.Wait(muxer.SyncConnectTimeout))
if (!task.Wait(muxer.SyncConnectTimeout(true)))
{
task.ObserveErrors();
if (muxer.RawConfig.AbortOnConnectFail)
......@@ -755,7 +754,7 @@ public static ConnectionMultiplexer Connect(ConfigurationOptions configuration,
killMe = muxer;
// note that task has timeouts internally, so it might take *just over* the regular timeout
var task = muxer.ReconfigureAsync(true, false, log, null, "connect");
if (!task.Wait(muxer.SyncConnectTimeout))
if (!task.Wait(muxer.SyncConnectTimeout(true)))
{
task.ObserveErrors();
if (muxer.RawConfig.AbortOnConnectFail)
......@@ -1005,7 +1004,7 @@ public bool Configure(TextWriter log = null)
// note we expect ReconfigureAsync to internally allow [n] duration,
// so to avoid near misses, here we wait 2*[n]
var task = ReconfigureAsync(false, true, log, null, "configure");
if (!task.Wait(SyncConnectTimeout))
if (!task.Wait(SyncConnectTimeout(false)))
{
task.ObserveErrors();
if (configuration.AbortOnConnectFail)
......@@ -1017,14 +1016,17 @@ public bool Configure(TextWriter log = null)
return task.Result;
}
internal int SyncConnectTimeout
internal int SyncConnectTimeout(bool forConnect)
{
get
{
int timeout = configuration.ConnectTimeout;
if (timeout >= int.MaxValue - 500) return int.MaxValue;
return timeout + Math.Min(500, timeout);
}
int retryCount = forConnect ? RawConfig.ConnectRetry : 1;
if (retryCount <= 0) retryCount = 1;
int timeout = configuration.ConnectTimeout;
if (timeout >= int.MaxValue / retryCount) return int.MaxValue;
timeout *= retryCount;
if (timeout >= int.MaxValue - 500) return int.MaxValue;
return timeout + Math.Min(500, timeout);
}
/// <summary>
/// Provides a text overview of the status of all connections
......@@ -1117,175 +1119,196 @@ internal async Task<bool> ReconfigureAsync(bool first, bool reconfigureAll, Text
}
}
var endpoints = configuration.EndPoints;
LogLocked(log, "{0} unique nodes specified", endpoints.Count);
int attemptsLeft = first ? configuration.ConnectRetry : 1;
if (endpoints.Count == 0)
bool healthy = false;
do
{
throw new InvalidOperationException("No nodes to consider");
}
const CommandFlags flags = CommandFlags.NoRedirect | CommandFlags.HighPriority;
var available = new Task<bool>[endpoints.Count];
var servers = new ServerEndPoint[available.Length];
bool useTieBreakers = !string.IsNullOrWhiteSpace(configuration.TieBreaker);
var tieBreakers = useTieBreakers ? new Task<string>[endpoints.Count] : null;
RedisKey tieBreakerKey = useTieBreakers ? (RedisKey)configuration.TieBreaker : default(RedisKey);
for (int i = 0; i < available.Length; i++)
{
Trace("Testing: " + Format.ToString(endpoints[i]));
var server = GetServerEndPoint(endpoints[i]);
//server.ReportNextFailure();
servers[i] = server;
if (reconfigureAll && server.IsConnected)
if (first)
{
LogLocked(log, "Refreshing {0}...", Format.ToString(server.EndPoint));
// note that these will be processed synchronously *BEFORE* the tracer is processed,
// so we know that the configuration will be up to date if we see the tracer
server.AutoConfigure(null);
attemptsLeft--;
}
available[i] = server.SendTracer();
Message msg;
if (useTieBreakers)
int standaloneCount = 0, clusterCount = 0;
var endpoints = configuration.EndPoints;
LogLocked(log, "{0} unique nodes specified", endpoints.Count);
if (endpoints.Count == 0)
{
LogLocked(log, "Requesting tie-break from {0} > {1}...", Format.ToString(server.EndPoint), configuration.TieBreaker);
msg = Message.Create(0, flags, RedisCommand.GET, tieBreakerKey);
msg.SetInternalCall();
tieBreakers[i] = server.QueueDirectAsync(msg, ResultProcessor.String);
throw new InvalidOperationException("No nodes to consider");
}
}
LogLocked(log, "Allowing endpoints {0} to respond...", TimeSpan.FromMilliseconds(configuration.ConnectTimeout));
Trace("Allowing endpoints " + TimeSpan.FromMilliseconds(configuration.ConnectTimeout) + " to respond...");
await WaitAllIgnoreErrorsAsync(available, configuration.ConnectTimeout).ForAwait();
List<ServerEndPoint> masters = new List<ServerEndPoint>(available.Length);
int standaloneCount = 0, clusterCount = 0;
for (int i = 0; i < available.Length; i++)
{
var task = available[i];
Trace(Format.ToString(endpoints[i]) + ": " + task.Status);
if (task.IsFaulted)
const CommandFlags flags = CommandFlags.NoRedirect | CommandFlags.HighPriority;
var available = new Task<bool>[endpoints.Count];
var servers = new ServerEndPoint[available.Length];
bool useTieBreakers = !string.IsNullOrWhiteSpace(configuration.TieBreaker);
var tieBreakers = useTieBreakers ? new Task<string>[endpoints.Count] : null;
RedisKey tieBreakerKey = useTieBreakers ? (RedisKey)configuration.TieBreaker : default(RedisKey);
for (int i = 0; i < available.Length; i++)
{
servers[i].SetUnselectable(UnselectableFlags.DidNotRespond);
var aex = task.Exception;
foreach (var ex in aex.InnerExceptions)
Trace("Testing: " + Format.ToString(endpoints[i]));
var server = GetServerEndPoint(endpoints[i]);
//server.ReportNextFailure();
servers[i] = server;
if (reconfigureAll && server.IsConnected)
{
LogLocked(log, "{0} faulted: {1}", Format.ToString(endpoints[i]), ex.Message);
LogLocked(log, "Refreshing {0}...", Format.ToString(server.EndPoint));
// note that these will be processed synchronously *BEFORE* the tracer is processed,
// so we know that the configuration will be up to date if we see the tracer
server.AutoConfigure(null);
}
available[i] = server.SendTracer();
Message msg;
if (useTieBreakers)
{
LogLocked(log, "Requesting tie-break from {0} > {1}...", Format.ToString(server.EndPoint), configuration.TieBreaker);
msg = Message.Create(0, flags, RedisCommand.GET, tieBreakerKey);
msg.SetInternalCall();
tieBreakers[i] = server.QueueDirectAsync(msg, ResultProcessor.String);
}
}
else if (task.IsCanceled)
{
servers[i].SetUnselectable(UnselectableFlags.DidNotRespond);
LogLocked(log, "{0} was canceled", Format.ToString(endpoints[i]));
}
else if (task.IsCompleted)
LogLocked(log, "Allowing endpoints {0} to respond...", TimeSpan.FromMilliseconds(configuration.ConnectTimeout));
Trace("Allowing endpoints " + TimeSpan.FromMilliseconds(configuration.ConnectTimeout) + " to respond...");
await WaitAllIgnoreErrorsAsync(available, configuration.ConnectTimeout).ForAwait();
List<ServerEndPoint> masters = new List<ServerEndPoint>(available.Length);
for (int i = 0; i < available.Length; i++)
{
var server = servers[i];
if (task.Result)
var task = available[i];
Trace(Format.ToString(endpoints[i]) + ": " + task.Status);
if (task.IsFaulted)
{
servers[i].SetUnselectable(UnselectableFlags.DidNotRespond);
var aex = task.Exception;
foreach (var ex in aex.InnerExceptions)
{
LogLocked(log, "{0} faulted: {1}", Format.ToString(endpoints[i]), ex.Message);
}
}
else if (task.IsCanceled)
{
servers[i].SetUnselectable(UnselectableFlags.DidNotRespond);
LogLocked(log, "{0} was canceled", Format.ToString(endpoints[i]));
}
else if (task.IsCompleted)
{
servers[i].ClearUnselectable(UnselectableFlags.DidNotRespond);
LogLocked(log, "{0} returned with success", Format.ToString(endpoints[i]));
var server = servers[i];
if (task.Result)
{
servers[i].ClearUnselectable(UnselectableFlags.DidNotRespond);
LogLocked(log, "{0} returned with success", Format.ToString(endpoints[i]));
switch (server.ServerType)
switch (server.ServerType)
{
case ServerType.Twemproxy:
case ServerType.Standalone:
servers[i].ClearUnselectable(UnselectableFlags.ServerType);
standaloneCount++;
if (server.IsSlave)
{
servers[i].ClearUnselectable(UnselectableFlags.RedundantMaster);
}
else
{
masters.Add(server);
}
break;
case ServerType.Cluster:
servers[i].ClearUnselectable(UnselectableFlags.ServerType);
clusterCount++;
if (server.IsSlave)
{
servers[i].ClearUnselectable(UnselectableFlags.RedundantMaster);
}
else
{
masters.Add(server);
}
break;
default:
servers[i].SetUnselectable(UnselectableFlags.ServerType);
break;
}
}
else
{
case ServerType.Twemproxy:
case ServerType.Standalone:
servers[i].ClearUnselectable(UnselectableFlags.ServerType);
standaloneCount++;
if (server.IsSlave)
{
servers[i].ClearUnselectable(UnselectableFlags.RedundantMaster);
}
else
{
masters.Add(server);
}
break;
case ServerType.Cluster:
servers[i].ClearUnselectable(UnselectableFlags.ServerType);
clusterCount++;
if (server.IsSlave)
{
servers[i].ClearUnselectable(UnselectableFlags.RedundantMaster);
}
else
{
masters.Add(server);
}
break;
default:
servers[i].SetUnselectable(UnselectableFlags.ServerType);
break;
servers[i].SetUnselectable(UnselectableFlags.DidNotRespond);
LogLocked(log, "{0} returned, but incorrectly", Format.ToString(endpoints[i]));
}
} else
}
else
{
servers[i].SetUnselectable(UnselectableFlags.DidNotRespond);
LogLocked(log, "{0} returned, but incorrectly", Format.ToString(endpoints[i]));
LogLocked(log, "{0} did not respond", Format.ToString(endpoints[i]));
}
}
else
if (clusterCount == 0)
{
servers[i].SetUnselectable(UnselectableFlags.DidNotRespond);
LogLocked(log, "{0} did not respond", Format.ToString(endpoints[i]));
this.serverSelectionStrategy.ServerType = RawConfig.Proxy == Proxy.Twemproxy ? ServerType.Twemproxy : ServerType.Standalone;
var preferred = await NominatePreferredMaster(log, servers, useTieBreakers, tieBreakers, masters).ObserveErrors().ForAwait();
foreach (var master in masters)
{
if (master == preferred)
{
master.ClearUnselectable(UnselectableFlags.RedundantMaster);
}
else
{
master.SetUnselectable(UnselectableFlags.RedundantMaster);
}
}
}
}
else
{
serverSelectionStrategy.ServerType = ServerType.Cluster;
long coveredSlots = serverSelectionStrategy.CountCoveredSlots();
LogLocked(log, "Cluster: {0} of {1} slots covered",
coveredSlots, serverSelectionStrategy.TotalSlots);
if (clusterCount == 0)
{
this.serverSelectionStrategy.ServerType = RawConfig.Proxy == Proxy.Twemproxy ? ServerType.Twemproxy : ServerType.Standalone;
var preferred = await NominatePreferredMaster(log, servers, useTieBreakers, tieBreakers, masters).ObserveErrors().ForAwait();
foreach (var master in masters)
}
if (!first)
{
if (master == preferred)
long subscriptionChanges = ValidateSubscriptions();
if (subscriptionChanges == 0)
{
master.ClearUnselectable(UnselectableFlags.RedundantMaster);
} else
LogLocked(log, "No subscription changes necessary");
}
else
{
master.SetUnselectable(UnselectableFlags.RedundantMaster);
LogLocked(log, "Subscriptions reconfigured: {0}", subscriptionChanges);
}
}
}
else
{
serverSelectionStrategy.ServerType = ServerType.Cluster;
long coveredSlots = serverSelectionStrategy.CountCoveredSlots();
LogLocked(log, "Cluster: {0} of {1} slots covered",
coveredSlots, serverSelectionStrategy.TotalSlots);
if (showStats)
{
GetStatus(log);
}
}
if (!first)
{
long subscriptionChanges = ValidateSubscriptions();
if (subscriptionChanges == 0)
string stormLog = GetStormLog();
if (!string.IsNullOrWhiteSpace(stormLog))
{
LogLocked(log, "No subscription changes necessary");
} else
LogLocked(log, "");
LogLocked(log, stormLog);
}
healthy = standaloneCount != 0 || clusterCount != 0;
if (first && !healthy && attemptsLeft > 0)
{
LogLocked(log, "Subscriptions reconfigured: {0}", subscriptionChanges);
LogLocked(log, "resetting failing connections to retry...");
ResetAllNonConnected();
LogLocked(log, "retrying; attempts left: " + attemptsLeft + "...");
}
}
if (showStats)
//WTF("?: " + attempts);
} while (first && !healthy && attemptsLeft > 0);
if(first && configuration.AbortOnConnectFail && !healthy)
{
GetStatus(log);
return false;
}
if (first)
{
LogLocked(log, "Starting heartbeat...");
pulse = new Timer(heartbeat, this, MillisecondsPerHeartbeat, MillisecondsPerHeartbeat);
}
string stormLog = GetStormLog();
if (!string.IsNullOrWhiteSpace(stormLog))
{
LogLocked(log, "");
LogLocked(log, stormLog);
}
if(first && configuration.AbortOnConnectFail && (standaloneCount == 0 && clusterCount == 0))
{
return false;
}
return true;
} catch (Exception ex)
......@@ -1303,6 +1326,15 @@ internal async Task<bool> ReconfigureAsync(bool first, bool reconfigureAll, Text
}
}
/// <summary>
/// Calls ResetNonConnected on every server in the current server snapshot
/// </summary>
private void ResetAllNonConnected()
{
    // read the field once so the whole loop runs against the same captured value
    var current = serverSnapshot;
    foreach (var endpoint in current)
    {
        endpoint.ResetNonConnected();
    }
}
partial void OnTraceLog(TextWriter log, [System.Runtime.CompilerServices.CallerMemberName] string caller = null);
private async Task<ServerEndPoint> NominatePreferredMaster(TextWriter log, ServerEndPoint[] servers, bool useTieBreakers, Task<string>[] tieBreakers, List<ServerEndPoint> masters)
{
......
......@@ -301,6 +301,17 @@ internal void OnConnected(PhysicalConnection connection)
}
}
/// <summary>
/// If there is a physical connection and the state is anything other than
/// ConnectedEstablished, records that connection as failed (UnableToConnect);
/// in all cases GetConnection is then called.
/// </summary>
internal void ResetNonConnected()
{
    // capture the field once; the null-check and the call below use the same reference
    var connection = physical;
    if (connection != null)
    {
        if (state != (int)State.ConnectedEstablished)
        {
            connection.RecordConnectionFailed(ConnectionFailureType.UnableToConnect);
        }
    }
    GetConnection();
}
internal void OnConnectionFailed(PhysicalConnection connection, ConnectionFailureType failureType, Exception innerException)
{
if (reportNextFailure)
......
......@@ -46,6 +46,14 @@ internal sealed partial class ServerEndPoint : IDisposable
private Version version;
/// <summary>
/// Forwards ResetNonConnected to the interactive and subscription members,
/// in that order, skipping whichever of them is null.
/// </summary>
internal void ResetNonConnected()
{
    // each field is read once into a local before the null-check and the call
    var interactiveBridge = interactive;
    if (interactiveBridge != null)
    {
        interactiveBridge.ResetNonConnected();
    }

    var subscriptionBridge = subscription;
    if (subscriptionBridge != null)
    {
        subscriptionBridge.ResetNonConnected();
    }
}
public ServerEndPoint(ConnectionMultiplexer multiplexer, EndPoint endpoint)
{
this.multiplexer = multiplexer;
......
......@@ -138,7 +138,8 @@ internal SocketToken BeginConnect(EndPoint endpoint, ISocketCallback callback)
}
throw;
}
return new SocketToken(socket);
var token = new SocketToken(socket);
return token;
}
internal void SetFastLoopbackOption(Socket socket)
{
......@@ -228,6 +229,19 @@ private void EndConnectImpl(IAsyncResult ar)
break;
}
}
catch(ObjectDisposedException)
{
ConnectionMultiplexer.TraceWithoutContext("(socket shutdown)");
if (tuple != null)
{
try
{ tuple.Item2.Error(); }
catch (Exception inner)
{
ConnectionMultiplexer.TraceWithoutContext(inner.Message);
}
}
}
catch(Exception outer)
{
ConnectionMultiplexer.TraceWithoutContext(outer.Message);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment