Commit b8a98e44 authored by dverma's avatar dverma Committed by Nick Craver

SE.Redis Reconnect retry policy (#510)

* SE.Redis Reconnect retry policy
* retry policy code review updates
parent 09f9d8ab
using System;
using NUnit.Framework;
namespace StackExchange.Redis.Tests
{
[TestFixture]
public class TransientErrorTests : TestBase
{
[TestCase]
public void TestExponentialRetry()
{
IReconnectRetryPolicy exponentialRetry = new ExponentialRetry(5000);
Assert.False(exponentialRetry.ShouldRetry(0, 0));
Assert.True(exponentialRetry.ShouldRetry(1, 5600));
Assert.True(exponentialRetry.ShouldRetry(2, 6050));
Assert.False(exponentialRetry.ShouldRetry(2, 4050));
}
[TestCase]
public void TestExponentialMaxRetry()
{
IReconnectRetryPolicy exponentialRetry = new ExponentialRetry(5000);
Assert.True(exponentialRetry.ShouldRetry(long.MaxValue, (int)TimeSpan.FromSeconds(30).TotalMilliseconds));
}
[TestCase]
public void TestLinearRetry()
{
IReconnectRetryPolicy linearRetry = new LinearRetry(5000);
Assert.False(linearRetry.ShouldRetry(0, 0));
Assert.False(linearRetry.ShouldRetry(2, 4999));
Assert.True(linearRetry.ShouldRetry(1, 5000));
}
}
}
...@@ -115,6 +115,8 @@ public static string TryNormalize(string value) ...@@ -115,6 +115,8 @@ public static string TryNormalize(string value)
private Proxy? proxy; private Proxy? proxy;
private IReconnectRetryPolicy reconnectRetryPolicy;
/// <summary> /// <summary>
/// A LocalCertificateSelectionCallback delegate responsible for selecting the certificate used for authentication; note /// A LocalCertificateSelectionCallback delegate responsible for selecting the certificate used for authentication; note
/// that this cannot be specified in the configuration-string. /// that this cannot be specified in the configuration-string.
...@@ -207,6 +209,12 @@ public CommandMap CommandMap ...@@ -207,6 +209,12 @@ public CommandMap CommandMap
set { connectTimeout = value; } set { connectTimeout = value; }
} }
/// <summary>
/// The retry policy to be used for connection reconnects
/// </summary>
public IReconnectRetryPolicy ReconnectRetryPolicy { get { return reconnectRetryPolicy ?? (reconnectRetryPolicy = new LinearRetry(ConnectTimeout)); } set { reconnectRetryPolicy = value; } }
/// <summary> /// <summary>
/// The server version to assume /// The server version to assume
/// </summary> /// </summary>
...@@ -350,6 +358,7 @@ public ConfigurationOptions Clone() ...@@ -350,6 +358,7 @@ public ConfigurationOptions Clone()
configCheckSeconds = configCheckSeconds, configCheckSeconds = configCheckSeconds,
responseTimeout = responseTimeout, responseTimeout = responseTimeout,
defaultDatabase = defaultDatabase, defaultDatabase = defaultDatabase,
ReconnectRetryPolicy = reconnectRetryPolicy,
}; };
foreach (var item in endpoints) foreach (var item in endpoints)
options.endpoints.Add(item); options.endpoints.Add(item);
......
using System;
namespace StackExchange.Redis
{
/// <summary>
/// Represents a retry policy that performs retries, using a randomized exponential back off scheme to determine the interval between retries.
/// </summary>
public class ExponentialRetry : IReconnectRetryPolicy
{
private int deltaBackOffMilliseconds;
private int maxDeltaBackOffMilliseconds = (int)TimeSpan.FromSeconds(10).TotalMilliseconds;
[ThreadStatic]
private static Random r;
/// <summary>
/// Initializes a new instance using the specified back off interval with default maxDeltaBackOffMilliseconds of 10 seconds
/// </summary>
/// <param name="deltaBackOffMilliseconds">time in milliseconds for the back-off interval between retries</param>
public ExponentialRetry(int deltaBackOffMilliseconds) : this(deltaBackOffMilliseconds, (int)TimeSpan.FromSeconds(10).TotalMilliseconds)
{
}
/// <summary>
/// Initializes a new instance using the specified back off interval.
/// </summary>
/// <param name="deltaBackOffMilliseconds">time in milliseconds for the back-off interval between retries</param>
/// <param name="maxDeltaBackOffMilliseconds">time in milliseconds for the maximum value that the back-off interval can exponentailly grow upto</param>
public ExponentialRetry(int deltaBackOffMilliseconds, int maxDeltaBackOffMilliseconds)
{
this.deltaBackOffMilliseconds = deltaBackOffMilliseconds;
this.maxDeltaBackOffMilliseconds = maxDeltaBackOffMilliseconds;
}
/// <summary>
/// This method is called by the ConnectionMultiplexer to determine if a reconnect operation can be retried now.
/// </summary>
/// <param name="currentRetryCount">The number of times reconnect retries have already been made by the ConnectionMultiplexer while it was in the connecting state</param>
/// <param name="timeElapsedMillisecondsSinceLastRetry">Total elapsed time in milliseconds since the last reconnect retry was made</param>
public bool ShouldRetry(long currentRetryCount, int timeElapsedMillisecondsSinceLastRetry)
{
var exponential = (int)Math.Min(maxDeltaBackOffMilliseconds, deltaBackOffMilliseconds * Math.Pow(1.1, currentRetryCount));
int random;
r = r ?? new Random();
random = r.Next((int)deltaBackOffMilliseconds, exponential);
return timeElapsedMillisecondsSinceLastRetry >= random;
//exponential backoff with deltaBackOff of 5000ms
//deltabackoff exponential
//5000 5500
//5000 6050
//5000 6655
//5000 8053
//5000 10718
//5000 17261
//5000 37001
//5000 127738
}
}
}
\ No newline at end of file
using System;
namespace StackExchange.Redis
{
/// <summary>
/// Describes retry policy functionality that can be provided to the multiplexer to be used for connection reconnects
/// </summary>
public interface IReconnectRetryPolicy
{
/// <summary>
/// This method is called by the multiplexer to determine if a reconnect operation can be retried now.
/// </summary>
/// <param name="currentRetryCount">The number of times reconnect retries have already been made by the multiplexer while it was in connecting state</param>
/// <param name="timeElapsedMillisecondsSinceLastRetry">Total time elapsed in milliseconds since the last reconnect retry was made</param>
bool ShouldRetry(long currentRetryCount, int timeElapsedMillisecondsSinceLastRetry);
}
}
\ No newline at end of file
using System;
namespace StackExchange.Redis
{
/// <summary>
/// Represents a retry policy that performs retries at a fixed interval. The retries are performed upto a maximum allowed time.
/// </summary>
public class LinearRetry : IReconnectRetryPolicy
{
private int maxRetryElapsedTimeAllowedMilliseconds;
/// <summary>
/// Initializes a new instance using the specified maximum retry elapsed time allowed.
/// </summary>
/// <param name="maxRetryElapsedTimeAllowedMilliseconds">maximum elapsed time in milliseconds to be allowed for it to perform retries</param>
public LinearRetry(int maxRetryElapsedTimeAllowedMilliseconds)
{
this.maxRetryElapsedTimeAllowedMilliseconds = maxRetryElapsedTimeAllowedMilliseconds;
}
/// <summary>
/// This method is called by the ConnectionMultiplexer to determine if a reconnect operation can be retried now.
/// </summary>
/// <param name="currentRetryCount">The number of times reconnect retries have already been made by the ConnectionMultiplexer while it was in the connecting state</param>
/// <param name="timeElapsedMillisecondsSinceLastRetry">Total elapsed time in milliseconds since the last reconnect retry was made</param>
public bool ShouldRetry(long currentRetryCount, int timeElapsedMillisecondsSinceLastRetry)
{
return timeElapsedMillisecondsSinceLastRetry >= maxRetryElapsedTimeAllowedMilliseconds;
}
}
}
\ No newline at end of file
...@@ -47,6 +47,7 @@ sealed partial class PhysicalBridge : IDisposable ...@@ -47,6 +47,7 @@ sealed partial class PhysicalBridge : IDisposable
int profileLogIndex; int profileLogIndex;
volatile bool reportNextFailure = true, reconfigureNextFailure = false; volatile bool reportNextFailure = true, reconfigureNextFailure = false;
private volatile int state = (int)State.Disconnected; private volatile int state = (int)State.Disconnected;
public PhysicalBridge(ServerEndPoint serverEndPoint, ConnectionType type) public PhysicalBridge(ServerEndPoint serverEndPoint, ConnectionType type)
{ {
ServerEndPoint = serverEndPoint; ServerEndPoint = serverEndPoint;
...@@ -364,6 +365,8 @@ internal void OnFullyEstablished(PhysicalConnection connection) ...@@ -364,6 +365,8 @@ internal void OnFullyEstablished(PhysicalConnection connection)
} }
private int connectStartTicks; private int connectStartTicks;
private long connectTimeoutRetryCount = 0;
internal void OnHeartbeat(bool ifConnectedOnly) internal void OnHeartbeat(bool ifConnectedOnly)
{ {
bool runThisTime = false; bool runThisTime = false;
...@@ -381,8 +384,10 @@ internal void OnHeartbeat(bool ifConnectedOnly) ...@@ -381,8 +384,10 @@ internal void OnHeartbeat(bool ifConnectedOnly)
{ {
case (int)State.Connecting: case (int)State.Connecting:
int connectTimeMilliseconds = unchecked(Environment.TickCount - VolatileWrapper.Read(ref connectStartTicks)); int connectTimeMilliseconds = unchecked(Environment.TickCount - VolatileWrapper.Read(ref connectStartTicks));
if (connectTimeMilliseconds >= Multiplexer.RawConfig.ConnectTimeout) bool shouldRetry = Multiplexer.RawConfig.ReconnectRetryPolicy.ShouldRetry(Interlocked.Read(ref connectTimeoutRetryCount), connectTimeMilliseconds);
if (shouldRetry)
{ {
Interlocked.Increment(ref connectTimeoutRetryCount);
LastException = ExceptionFactory.UnableToConnect("ConnectTimeout"); LastException = ExceptionFactory.UnableToConnect("ConnectTimeout");
Trace("Aborting connect"); Trace("Aborting connect");
// abort and reconnect // abort and reconnect
...@@ -405,6 +410,7 @@ internal void OnHeartbeat(bool ifConnectedOnly) ...@@ -405,6 +410,7 @@ internal void OnHeartbeat(bool ifConnectedOnly)
{ {
if(state == (int)State.ConnectedEstablished) if(state == (int)State.ConnectedEstablished)
{ {
Interlocked.Exchange(ref connectTimeoutRetryCount, 0);
tmp.Bridge.ServerEndPoint.ClearUnselectable(UnselectableFlags.DidNotRespond); tmp.Bridge.ServerEndPoint.ClearUnselectable(UnselectableFlags.DidNotRespond);
} }
tmp.OnHeartbeat(); tmp.OnHeartbeat();
...@@ -441,6 +447,7 @@ internal void OnHeartbeat(bool ifConnectedOnly) ...@@ -441,6 +447,7 @@ internal void OnHeartbeat(bool ifConnectedOnly)
} }
break; break;
case (int)State.Disconnected: case (int)State.Disconnected:
Interlocked.Exchange(ref connectTimeoutRetryCount, 0);
if (!ifConnectedOnly) if (!ifConnectedOnly)
{ {
AbortUnsent(); AbortUnsent();
...@@ -449,6 +456,7 @@ internal void OnHeartbeat(bool ifConnectedOnly) ...@@ -449,6 +456,7 @@ internal void OnHeartbeat(bool ifConnectedOnly)
} }
break; break;
default: default:
Interlocked.Exchange(ref connectTimeoutRetryCount, 0);
if (!ifConnectedOnly) if (!ifConnectedOnly)
{ {
AbortUnsent(); AbortUnsent();
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment