Commit 47e1ec70 authored by Marc Gravell's avatar Marc Gravell

do a better job of detecting master/slave changes by optionally performing an...

do a better job of detecting master/slave changes by optionally performing an "info replication" on a loop
parent b4766acb
using NUnit.Framework;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace StackExchange.Redis.Tests.Issues
{
[TestFixture]
public class SO24807536 : TestBase
{
public void Exec()
{
var key = Me();
using(var conn = Create())
{
var cache = conn.GetDatabase();
// setup some data
cache.KeyDelete(key);
cache.HashSet(key, "full", "some value");
cache.KeyExpire(key, TimeSpan.FromSeconds(3));
// test while exists
var exists = cache.KeyExists(key);
var ttl = cache.KeyTimeToLive(key);
var fullWait = cache.HashGetAsync(key, "full", flags: CommandFlags.None);
Assert.IsTrue(exists, "key exists");
Assert.IsNotNull(ttl, "ttl");
Assert.AreEqual("some value", (string)fullWait.Result);
// wait for expiry
Thread.Sleep(TimeSpan.FromSeconds(4));
// test once expired
exists = cache.KeyExists(key);
ttl = cache.KeyTimeToLive(key);
fullWait = cache.HashGetAsync(key, "full", flags: CommandFlags.None);
Assert.IsFalse(exists, "key exists");
Assert.IsNull(ttl, "ttl");
Assert.IsNull((string)fullWait.Result);
}
}
}
}
using NUnit.Framework;
using System;
using System.Threading;
namespace StackExchange.Redis.Tests.Issues
{
[TestFixture]
public class SO25113323 : TestBase
{
[Test]
public void SetExpirationToPassed()
{
var key = Me();
using (var conn = Create())
{
// Given
var cache = conn.GetDatabase();
cache.KeyDelete(key);
cache.HashSet(key, "full", "test", When.NotExists, CommandFlags.PreferMaster);
Thread.Sleep(10 * 1000);
// When
var expiresOn = DateTime.UtcNow.AddSeconds(-10);
var firstResult = cache.KeyExpire(key, expiresOn, CommandFlags.PreferMaster);
var secondResult = cache.KeyExpire(key, expiresOn, CommandFlags.PreferMaster);
var exists = cache.KeyExists(key);
var ttl = cache.KeyTimeToLive(key);
// Then
Assert.IsTrue(firstResult, "first"); // could set the first time, but this nukes the key
Assert.IsFalse(secondResult, "second"); // can't set, since nuked
Assert.IsFalse(exists, "exists"); // does not exist since nuked
Assert.IsNull(ttl, "ttl"); // no expiry since nuked
}
}
}
}
...@@ -78,6 +78,7 @@ ...@@ -78,6 +78,7 @@
<Compile Include="Issues\Issue6.cs" /> <Compile Include="Issues\Issue6.cs" />
<Compile Include="Issues\SO22786599.cs" /> <Compile Include="Issues\SO22786599.cs" />
<Compile Include="Issues\SO23949477.cs" /> <Compile Include="Issues\SO23949477.cs" />
<Compile Include="Issues\SO24807536.cs" />
<Compile Include="Keys.cs" /> <Compile Include="Keys.cs" />
<Compile Include="KeysAndValues.cs" /> <Compile Include="KeysAndValues.cs" />
<Compile Include="Lex.cs" /> <Compile Include="Lex.cs" />
...@@ -96,10 +97,12 @@ ...@@ -96,10 +97,12 @@
<Compile Include="Secure.cs" /> <Compile Include="Secure.cs" />
<Compile Include="Sentinel.cs" /> <Compile Include="Sentinel.cs" />
<Compile Include="Sets.cs" /> <Compile Include="Sets.cs" />
<Compile Include="Issues\SO25113323.cs" />
<Compile Include="SSDB.cs" /> <Compile Include="SSDB.cs" />
<Compile Include="SSL.cs" /> <Compile Include="SSL.cs" />
<Compile Include="TaskTests.cs" /> <Compile Include="TaskTests.cs" />
<Compile Include="TestBase.cs" /> <Compile Include="TestBase.cs" />
<Compile Include="TestInfoReplicationChecks.cs" />
<Compile Include="Transactions.cs" /> <Compile Include="Transactions.cs" />
<Compile Include="VPNTest.cs" /> <Compile Include="VPNTest.cs" />
</ItemGroup> </ItemGroup>
......
using NUnit.Framework;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.IO;
using System.Threading;
namespace StackExchange.Redis.Tests
{
[TestFixture]
public class TestInfoReplicationChecks : TestBase
{
[Test]
public void Exec()
{
using(var conn = Create())
{
var parsed = ConfigurationOptions.Parse(conn.Configuration);
Assert.AreEqual(5, parsed.ConfigCheckSeconds);
var before = conn.GetCounters();
Thread.Sleep(TimeSpan.FromSeconds(13));
var after = conn.GetCounters();
int done = (int)(after.Interactive.CompletedSynchronously - before.Interactive.CompletedSynchronously);
Assert.IsTrue(done >= 2);
}
}
protected override string GetConfiguration()
{
return base.GetConfiguration() + ",configCheckSeconds=5";
}
}
}
...@@ -72,7 +72,8 @@ internal static void Unknown(string key) ...@@ -72,7 +72,8 @@ internal static void Unknown(string key)
Version = "version", ConnectTimeout = "connectTimeout", Password = "password", Version = "version", ConnectTimeout = "connectTimeout", Password = "password",
TieBreaker = "tiebreaker", WriteBuffer = "writeBuffer", Ssl = "ssl", SslHost = "sslHost", TieBreaker = "tiebreaker", WriteBuffer = "writeBuffer", Ssl = "ssl", SslHost = "sslHost",
ConfigChannel = "configChannel", AbortOnConnectFail = "abortConnect", ResolveDns = "resolveDns", ConfigChannel = "configChannel", AbortOnConnectFail = "abortConnect", ResolveDns = "resolveDns",
ChannelPrefix = "channelPrefix", Proxy = "proxy", ConnectRetry = "connectRetry"; ChannelPrefix = "channelPrefix", Proxy = "proxy", ConnectRetry = "connectRetry",
ConfigCheckSeconds = "configCheckSeconds";
private static readonly Dictionary<string, string> normalizedOptions = new[] private static readonly Dictionary<string, string> normalizedOptions = new[]
{ {
AllowAdmin, SyncTimeout, AllowAdmin, SyncTimeout,
...@@ -80,7 +81,8 @@ internal static void Unknown(string key) ...@@ -80,7 +81,8 @@ internal static void Unknown(string key)
Version, ConnectTimeout, Password, Version, ConnectTimeout, Password,
TieBreaker, WriteBuffer, Ssl, SslHost, TieBreaker, WriteBuffer, Ssl, SslHost,
ConfigChannel, AbortOnConnectFail, ResolveDns, ConfigChannel, AbortOnConnectFail, ResolveDns,
ChannelPrefix, Proxy, ConnectRetry ChannelPrefix, Proxy, ConnectRetry,
ConfigCheckSeconds
}.ToDictionary(x => x, StringComparer.InvariantCultureIgnoreCase); }.ToDictionary(x => x, StringComparer.InvariantCultureIgnoreCase);
public static string TryNormalize(string value) public static string TryNormalize(string value)
...@@ -105,7 +107,7 @@ public static string TryNormalize(string value) ...@@ -105,7 +107,7 @@ public static string TryNormalize(string value)
private Version defaultVersion; private Version defaultVersion;
private int? keepAlive, syncTimeout, connectTimeout, writeBuffer, connectRetry; private int? keepAlive, syncTimeout, connectTimeout, writeBuffer, connectRetry, configCheckSeconds;
private Proxy? proxy; private Proxy? proxy;
...@@ -260,6 +262,12 @@ public CommandMap CommandMap ...@@ -260,6 +262,12 @@ public CommandMap CommandMap
// these just rip out the underlying handlers, bypassing the event accessors - needed when creating the SSL stream // these just rip out the underlying handlers, bypassing the event accessors - needed when creating the SSL stream
internal RemoteCertificateValidationCallback CertificateValidationCallback { get { return CertificateValidation; } private set { CertificateValidation = value; } } internal RemoteCertificateValidationCallback CertificateValidationCallback { get { return CertificateValidation; } private set { CertificateValidation = value; } }
/// <summary>
/// Check configuration every n seconds (disabled by default)
/// </summary>
public int ConfigCheckSeconds { get { return configCheckSeconds.GetValueOrDefault(); } set { configCheckSeconds = value; } }
/// <summary> /// <summary>
/// Parse the configuration from a comma-delimited configuration string /// Parse the configuration from a comma-delimited configuration string
/// </summary> /// </summary>
...@@ -311,7 +319,8 @@ public ConfigurationOptions Clone() ...@@ -311,7 +319,8 @@ public ConfigurationOptions Clone()
CertificateSelectionCallback = CertificateSelectionCallback, CertificateSelectionCallback = CertificateSelectionCallback,
ChannelPrefix = ChannelPrefix.Clone(), ChannelPrefix = ChannelPrefix.Clone(),
SocketManager = SocketManager, SocketManager = SocketManager,
connectRetry = connectRetry connectRetry = connectRetry,
configCheckSeconds = configCheckSeconds
}; };
foreach (var item in endpoints) foreach (var item in endpoints)
options.endpoints.Add(item); options.endpoints.Add(item);
...@@ -355,7 +364,8 @@ public override string ToString() ...@@ -355,7 +364,8 @@ public override string ToString()
Append(sb, OptionKeys.ChannelPrefix, (string)ChannelPrefix); Append(sb, OptionKeys.ChannelPrefix, (string)ChannelPrefix);
Append(sb, OptionKeys.ConnectRetry, connectRetry); Append(sb, OptionKeys.ConnectRetry, connectRetry);
Append(sb, OptionKeys.Proxy, proxy); Append(sb, OptionKeys.Proxy, proxy);
if(commandMap != null) commandMap.AppendDeltas(sb); Append(sb, OptionKeys.ConfigCheckSeconds, configCheckSeconds);
if (commandMap != null) commandMap.AppendDeltas(sb);
return sb.ToString(); return sb.ToString();
} }
...@@ -444,7 +454,7 @@ static bool IsOption(string option, string prefix) ...@@ -444,7 +454,7 @@ static bool IsOption(string option, string prefix)
void Clear() void Clear()
{ {
clientName = serviceName = password = tieBreaker = sslHost = configChannel = null; clientName = serviceName = password = tieBreaker = sslHost = configChannel = null;
keepAlive = syncTimeout = connectTimeout = writeBuffer = connectRetry = null; keepAlive = syncTimeout = connectTimeout = writeBuffer = connectRetry = configCheckSeconds = null;
allowAdmin = abortOnConnectFail = resolveDns = ssl = null; allowAdmin = abortOnConnectFail = resolveDns = ssl = null;
defaultVersion = null; defaultVersion = null;
endpoints.Clear(); endpoints.Clear();
...@@ -523,6 +533,9 @@ private void DoParse(string configuration, bool ignoreUnknown) ...@@ -523,6 +533,9 @@ private void DoParse(string configuration, bool ignoreUnknown)
case OptionKeys.ConnectRetry: case OptionKeys.ConnectRetry:
ConnectRetry = OptionKeys.ParseInt32(key, value); ConnectRetry = OptionKeys.ParseInt32(key, value);
break; break;
case OptionKeys.ConfigCheckSeconds:
ConfigCheckSeconds = OptionKeys.ParseInt32(key, value);
break;
case OptionKeys.Version: case OptionKeys.Version:
DefaultVersion = OptionKeys.ParseVersion(key, value); DefaultVersion = OptionKeys.ParseVersion(key, value);
break; break;
......
...@@ -49,6 +49,12 @@ internal static EndPoint ParseEndPoint(string host, int port) ...@@ -49,6 +49,12 @@ internal static EndPoint ParseEndPoint(string host, int port)
if (IPAddress.TryParse(host, out ip)) return new IPEndPoint(ip, port); if (IPAddress.TryParse(host, out ip)) return new IPEndPoint(ip, port);
return new DnsEndPoint(host, port); return new DnsEndPoint(host, port);
} }
internal static EndPoint TryParseEndPoint(string host, string port)
{
if (string.IsNullOrEmpty(host) || string.IsNullOrEmpty(port)) return null;
int i;
return TryParseInt32(port, out i) ? ParseEndPoint(host, i) : null;
}
internal static string ToString(long value) internal static string ToString(long value)
{ {
......
...@@ -367,6 +367,7 @@ internal void OnFullyEstablished(PhysicalConnection connection) ...@@ -367,6 +367,7 @@ internal void OnFullyEstablished(PhysicalConnection connection)
Interlocked.Exchange(ref failConnectCount, 0); Interlocked.Exchange(ref failConnectCount, 0);
serverEndPoint.OnFullyEstablished(connection); serverEndPoint.OnFullyEstablished(connection);
multiplexer.RequestWrite(this, true); multiplexer.RequestWrite(this, true);
if(connectionType == ConnectionType.Interactive) serverEndPoint.CheckInfoReplication();
} }
else else
{ {
...@@ -414,8 +415,16 @@ internal void OnHeartbeat(bool ifConnectedOnly) ...@@ -414,8 +415,16 @@ internal void OnHeartbeat(bool ifConnectedOnly)
if (tmp != null) if (tmp != null)
{ {
tmp.OnHeartbeat(); tmp.OnHeartbeat();
int writeEverySeconds = serverEndPoint.WriteEverySeconds; int writeEverySeconds = serverEndPoint.WriteEverySeconds,
if (writeEverySeconds > 0 && tmp.LastWriteSecondsAgo >= writeEverySeconds) checkConfigSeconds = multiplexer.RawConfig.ConfigCheckSeconds;
if(state == (int)State.ConnectedEstablished && connectionType == ConnectionType.Interactive
&& checkConfigSeconds > 0 && serverEndPoint.LastInfoReplicationCheckSecondsAgo >= checkConfigSeconds
&& serverEndPoint.CheckInfoReplication())
{
// that serves as a keep-alive, if it is accepted
}
else if (writeEverySeconds > 0 && tmp.LastWriteSecondsAgo >= writeEverySeconds)
{ {
Trace("OnHeartbeat - overdue"); Trace("OnHeartbeat - overdue");
if (state == (int)State.ConnectedEstablished) if (state == (int)State.ConnectedEstablished)
......
...@@ -501,6 +501,8 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes ...@@ -501,6 +501,8 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes
SetResult(message, true); SetResult(message, true);
return true; return true;
} }
string masterHost = null, masterPort = null;
bool roleSeen = false;
using (var reader = new StringReader(info)) using (var reader = new StringReader(info))
{ {
while ((line = reader.ReadLine()) != null) while ((line = reader.ReadLine()) != null)
...@@ -510,6 +512,7 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes ...@@ -510,6 +512,7 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes
string val; string val;
if ((val = Extract(line, "role:")) != null) if ((val = Extract(line, "role:")) != null)
{ {
roleSeen = true;
switch (val) switch (val)
{ {
case "master": case "master":
...@@ -522,6 +525,14 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes ...@@ -522,6 +525,14 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes
break; break;
} }
} }
else if ((val = Extract(line, "master_host:")) != null)
{
masterHost = val;
}
else if ((val = Extract(line, "master_port:")) != null)
{
masterPort = val;
}
else if ((val = Extract(line, "redis_version:")) != null) else if ((val = Extract(line, "redis_version:")) != null)
{ {
Version version; Version version;
...@@ -554,6 +565,10 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes ...@@ -554,6 +565,10 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes
server.RunId = val; server.RunId = val;
} }
} }
if (roleSeen)
{ // these are in the same section, if presnt
server.MasterEndPoint = Format.TryParseEndPoint(masterHost, masterPort);
}
} }
} }
SetResult(message, true); SetResult(message, true);
......
...@@ -6,6 +6,7 @@ ...@@ -6,6 +6,7 @@
using System.Runtime.CompilerServices; using System.Runtime.CompilerServices;
using System.Text; using System.Text;
using System.Text.RegularExpressions; using System.Text.RegularExpressions;
using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
namespace StackExchange.Redis namespace StackExchange.Redis
...@@ -271,6 +272,7 @@ internal void AutoConfigure(PhysicalConnection connection) ...@@ -271,6 +272,7 @@ internal void AutoConfigure(PhysicalConnection connection)
} }
if (commandMap.IsAvailable(RedisCommand.INFO)) if (commandMap.IsAvailable(RedisCommand.INFO))
{ {
lastInfoReplicationCheckTicks = Environment.TickCount;
if (features.InfoSections) if (features.InfoSections)
{ {
msg = Message.Create(-1, flags, RedisCommand.INFO, RedisLiterals.replication); msg = Message.Create(-1, flags, RedisCommand.INFO, RedisLiterals.replication);
...@@ -470,6 +472,36 @@ internal void OnFullyEstablished(PhysicalConnection connection) ...@@ -470,6 +472,36 @@ internal void OnFullyEstablished(PhysicalConnection connection)
} }
} }
internal int LastInfoReplicationCheckSecondsAgo
{
get { return unchecked(Environment.TickCount - Thread.VolatileRead(ref lastInfoReplicationCheckTicks)) / 1000; }
}
private EndPoint masterEndPoint;
public EndPoint MasterEndPoint
{
get { return masterEndPoint; }
set { SetConfig(ref masterEndPoint, value); }
}
internal bool CheckInfoReplication()
{
lastInfoReplicationCheckTicks = Environment.TickCount;
PhysicalBridge bridge;
if (version >= RedisFeatures.v2_8_0 && multiplexer.CommandMap.IsAvailable(RedisCommand.INFO)
&& (bridge = GetBridge(ConnectionType.Interactive, false)) != null)
{
var msg = Message.Create(-1, CommandFlags.FireAndForget | CommandFlags.HighPriority | CommandFlags.NoRedirect, RedisCommand.INFO, RedisLiterals.replication);
msg.SetInternalCall();
QueueDirectFireAndForget(msg, ResultProcessor.AutoConfigure, bridge);
return true;
}
return false;
}
private int lastInfoReplicationCheckTicks;
internal void OnHeartbeat() internal void OnHeartbeat()
{ {
try try
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment