Commit be2fb0e5 authored by Nick Craver's avatar Nick Craver

Tests: ensure setup on each failover run

parent 8c548885
......@@ -9,7 +9,30 @@ namespace StackExchange.Redis.Tests
public class Failover : TestBase
{
protected override string GetConfiguration() => GetMasterSlaveConfig().ToString();
public Failover(ITestOutputHelper output) : base(output) { }
public Failover(ITestOutputHelper output) : base(output)
{
using (var mutex = Create())
{
var shouldBeMaster = mutex.GetServer(TestConfig.Current.FailoverMasterServerAndPort);
if (shouldBeMaster.IsSlave)
{
Output.WriteLine(shouldBeMaster.EndPoint + " should be master, fixing...");
shouldBeMaster.MakeMaster(ReplicationChangeOptions.SetTiebreaker);
}
Output.WriteLine("Flushing all databases...");
shouldBeMaster.FlushAllDatabases(CommandFlags.FireAndForget);
var shouldBeReplica = mutex.GetServer(TestConfig.Current.FailoverSlaveServerAndPort);
if (!shouldBeReplica.IsSlave)
{
Output.WriteLine(shouldBeReplica.EndPoint + " should be a slave, fixing...");
shouldBeReplica.SlaveOf(shouldBeMaster.EndPoint);
Thread.Sleep(2000);
}
}
}
private static ConfigurationOptions GetMasterSlaveConfig()
{
......@@ -154,136 +177,105 @@ public async Task SubscriptionsSurviveMasterSwitchAsync(bool useSharedSocketMana
using (var a = Create(allowAdmin: true, useSharedSocketManager: useSharedSocketManager))
using (var b = Create(allowAdmin: true, useSharedSocketManager: useSharedSocketManager))
{
RedisChannel channel = Me();
var subA = a.GetSubscriber();
var subB = b.GetSubscriber();
long masterChanged = 0, aCount = 0, bCount = 0;
a.ConfigurationChangedBroadcast += delegate
{
Output.WriteLine("A noticed config broadcast: " + Interlocked.Increment(ref masterChanged));
};
b.ConfigurationChangedBroadcast += delegate
{
Output.WriteLine("B noticed config broadcast: " + Interlocked.Increment(ref masterChanged));
};
subA.Subscribe(channel, (_, message) =>
{
Output.WriteLine("A got message: " + message);
Interlocked.Increment(ref aCount);
});
subB.Subscribe(channel, (_, message) =>
{
Output.WriteLine("B got message: " + message);
Interlocked.Increment(ref bCount);
});
Assert.False(a.GetServer(TestConfig.Current.FailoverMasterServerAndPort).IsSlave, $"A Connection: {TestConfig.Current.FailoverMasterServerAndPort} should be a master");
Assert.True(a.GetServer(TestConfig.Current.FailoverSlaveServerAndPort).IsSlave, $"A Connection: {TestConfig.Current.FailoverSlaveServerAndPort} should be a slave");
Assert.False(b.GetServer(TestConfig.Current.FailoverMasterServerAndPort).IsSlave, $"B Connection: {TestConfig.Current.FailoverMasterServerAndPort} should be a master");
Assert.True(b.GetServer(TestConfig.Current.FailoverSlaveServerAndPort).IsSlave, $"B Connection: {TestConfig.Current.FailoverSlaveServerAndPort} should be a slave");
var epA = subA.SubscribedEndpoint(channel);
var epB = subB.SubscribedEndpoint(channel);
Output.WriteLine("A: " + EndPointCollection.ToString(epA));
Output.WriteLine("B: " + EndPointCollection.ToString(epB));
subA.Publish(channel, "A1");
subB.Publish(channel, "B1");
subA.Ping();
subB.Ping();
Assert.Equal(2, Interlocked.Read(ref aCount));
Assert.Equal(2, Interlocked.Read(ref bCount));
Assert.Equal(0, Interlocked.Read(ref masterChanged));
try
{
// Ensure config setup
await EnsureMasterSlaveSetupAsync(a).ForAwait();
RedisChannel channel = Me();
var subA = a.GetSubscriber();
var subB = b.GetSubscriber();
long masterChanged = 0, aCount = 0, bCount = 0;
a.ConfigurationChangedBroadcast += delegate
{
Output.WriteLine("A noticed config broadcast: " + Interlocked.Increment(ref masterChanged));
};
b.ConfigurationChangedBroadcast += delegate
{
Output.WriteLine("B noticed config broadcast: " + Interlocked.Increment(ref masterChanged));
};
subA.Subscribe(channel, (_, message) =>
Interlocked.Exchange(ref masterChanged, 0);
Interlocked.Exchange(ref aCount, 0);
Interlocked.Exchange(ref bCount, 0);
Output.WriteLine("Changing master...");
using (var sw = new StringWriter())
{
Output.WriteLine("A got message: " + message);
Interlocked.Increment(ref aCount);
});
subB.Subscribe(channel, (_, message) =>
{
Output.WriteLine("B got message: " + message);
Interlocked.Increment(ref bCount);
});
Assert.False(a.GetServer(TestConfig.Current.FailoverMasterServerAndPort).IsSlave, $"A Connection: {TestConfig.Current.FailoverMasterServerAndPort} should be a master");
Assert.True(a.GetServer(TestConfig.Current.FailoverSlaveServerAndPort).IsSlave, $"A Connection: {TestConfig.Current.FailoverSlaveServerAndPort} should be a slave");
Assert.False(b.GetServer(TestConfig.Current.FailoverMasterServerAndPort).IsSlave, $"B Connection: {TestConfig.Current.FailoverMasterServerAndPort} should be a master");
Assert.True(b.GetServer(TestConfig.Current.FailoverSlaveServerAndPort).IsSlave, $"B Connection: {TestConfig.Current.FailoverSlaveServerAndPort} should be a slave");
var epA = subA.SubscribedEndpoint(channel);
var epB = subB.SubscribedEndpoint(channel);
a.GetServer(TestConfig.Current.FailoverSlaveServerAndPort).MakeMaster(ReplicationChangeOptions.All, sw);
Output.WriteLine(sw.ToString());
}
await Task.Delay(5000).ForAwait();
subA.Ping();
subB.Ping();
Output.WriteLine("Pausing...");
Output.WriteLine("A " + TestConfig.Current.FailoverMasterServerAndPort + " status: " + (a.GetServer(TestConfig.Current.FailoverMasterServerAndPort).IsSlave ? "Slave" : "Master"));
Output.WriteLine("A " + TestConfig.Current.FailoverSlaveServerAndPort + " status: " + (a.GetServer(TestConfig.Current.FailoverSlaveServerAndPort).IsSlave ? "Slave" : "Master"));
Output.WriteLine("B " + TestConfig.Current.FailoverMasterServerAndPort + " status: " + (b.GetServer(TestConfig.Current.FailoverMasterServerAndPort).IsSlave ? "Slave" : "Master"));
Output.WriteLine("B " + TestConfig.Current.FailoverSlaveServerAndPort + " status: " + (b.GetServer(TestConfig.Current.FailoverSlaveServerAndPort).IsSlave ? "Slave" : "Master"));
Assert.True(a.GetServer(TestConfig.Current.FailoverMasterServerAndPort).IsSlave, $"A Connection: {TestConfig.Current.FailoverMasterServerAndPort} should be a slave");
Assert.False(a.GetServer(TestConfig.Current.FailoverSlaveServerAndPort).IsSlave, $"A Connection: {TestConfig.Current.FailoverSlaveServerAndPort} should be a master");
Assert.True(b.GetServer(TestConfig.Current.FailoverMasterServerAndPort).IsSlave, $"B Connection: {TestConfig.Current.FailoverMasterServerAndPort} should be a slave");
Assert.False(b.GetServer(TestConfig.Current.FailoverSlaveServerAndPort).IsSlave, $"B Connection: {TestConfig.Current.FailoverSlaveServerAndPort} should be a master");
Output.WriteLine("Pause complete");
Output.WriteLine("A outstanding: " + a.GetCounters().TotalOutstanding);
Output.WriteLine("B outstanding: " + b.GetCounters().TotalOutstanding);
subA.Ping();
subB.Ping();
await Task.Delay(2000).ForAwait();
epA = subA.SubscribedEndpoint(channel);
epB = subB.SubscribedEndpoint(channel);
Output.WriteLine("A: " + EndPointCollection.ToString(epA));
Output.WriteLine("B: " + EndPointCollection.ToString(epB));
subA.Publish(channel, "A1");
subB.Publish(channel, "B1");
Output.WriteLine("A2 sent to: " + subA.Publish(channel, "A2"));
Output.WriteLine("B2 sent to: " + subB.Publish(channel, "B2"));
subA.Ping();
subB.Ping();
Output.WriteLine("Checking...");
Assert.Equal(2, Interlocked.Read(ref aCount));
Assert.Equal(2, Interlocked.Read(ref bCount));
Assert.Equal(0, Interlocked.Read(ref masterChanged));
try
{
Interlocked.Exchange(ref masterChanged, 0);
Interlocked.Exchange(ref aCount, 0);
Interlocked.Exchange(ref bCount, 0);
Output.WriteLine("Changing master...");
using (var sw = new StringWriter())
{
a.GetServer(TestConfig.Current.FailoverSlaveServerAndPort).MakeMaster(ReplicationChangeOptions.All, sw);
Output.WriteLine(sw.ToString());
}
await Task.Delay(5000).ForAwait();
subA.Ping();
subB.Ping();
Output.WriteLine("Pausing...");
Output.WriteLine("A " + TestConfig.Current.FailoverMasterServerAndPort + " status: " + (a.GetServer(TestConfig.Current.FailoverMasterServerAndPort).IsSlave ? "Slave" : "Master"));
Output.WriteLine("A " + TestConfig.Current.FailoverSlaveServerAndPort + " status: " + (a.GetServer(TestConfig.Current.FailoverSlaveServerAndPort).IsSlave ? "Slave" : "Master"));
Output.WriteLine("B " + TestConfig.Current.FailoverMasterServerAndPort + " status: " + (b.GetServer(TestConfig.Current.FailoverMasterServerAndPort).IsSlave ? "Slave" : "Master"));
Output.WriteLine("B " + TestConfig.Current.FailoverSlaveServerAndPort + " status: " + (b.GetServer(TestConfig.Current.FailoverSlaveServerAndPort).IsSlave ? "Slave" : "Master"));
Assert.True(a.GetServer(TestConfig.Current.FailoverMasterServerAndPort).IsSlave, $"A Connection: {TestConfig.Current.FailoverMasterServerAndPort} should be a slave");
Assert.False(a.GetServer(TestConfig.Current.FailoverSlaveServerAndPort).IsSlave, $"A Connection: {TestConfig.Current.FailoverSlaveServerAndPort} should be a master");
Assert.True(b.GetServer(TestConfig.Current.FailoverMasterServerAndPort).IsSlave, $"B Connection: {TestConfig.Current.FailoverMasterServerAndPort} should be a slave");
Assert.False(b.GetServer(TestConfig.Current.FailoverSlaveServerAndPort).IsSlave, $"B Connection: {TestConfig.Current.FailoverSlaveServerAndPort} should be a master");
Output.WriteLine("Pause complete");
Output.WriteLine("A outstanding: " + a.GetCounters().TotalOutstanding);
Output.WriteLine("B outstanding: " + b.GetCounters().TotalOutstanding);
subA.Ping();
subB.Ping();
await Task.Delay(2000).ForAwait();
epA = subA.SubscribedEndpoint(channel);
epB = subB.SubscribedEndpoint(channel);
Output.WriteLine("A: " + EndPointCollection.ToString(epA));
Output.WriteLine("B: " + EndPointCollection.ToString(epB));
Output.WriteLine("A2 sent to: " + subA.Publish(channel, "A2"));
Output.WriteLine("B2 sent to: " + subB.Publish(channel, "B2"));
subA.Ping();
subB.Ping();
Output.WriteLine("Checking...");
Assert.Equal(2, Interlocked.Read(ref aCount));
Assert.Equal(2, Interlocked.Read(ref bCount));
// Expect 6, because a sees a, but b sees a and b due to replication
Assert.Equal(6, Interlocked.CompareExchange(ref masterChanged, 0, 0));
}
finally
{
Output.WriteLine("Restoring configuration...");
try
{
a.GetServer(TestConfig.Current.FailoverMasterServerAndPort).MakeMaster(ReplicationChangeOptions.All);
await Task.Delay(1000).ForAwait();
}
catch { }
}
// Expect 6, because a sees a, but b sees a and b due to replication
Assert.Equal(6, Interlocked.CompareExchange(ref masterChanged, 0, 0));
}
finally
{
// Put it back, even if we fail...
await EnsureMasterSlaveSetupAsync(a);
Output.WriteLine("Restoring configuration...");
try
{
a.GetServer(TestConfig.Current.FailoverMasterServerAndPort).MakeMaster(ReplicationChangeOptions.All);
await Task.Delay(1000).ForAwait();
}
catch { }
}
}
}
protected async Task EnsureMasterSlaveSetupAsync(ConnectionMultiplexer mutex)
{
var shouldBeMaster = mutex.GetServer(TestConfig.Current.FailoverMasterServerAndPort);
if (shouldBeMaster.IsSlave)
{
Output.WriteLine(shouldBeMaster.EndPoint + " should be master, fixing...");
shouldBeMaster.MakeMaster(ReplicationChangeOptions.SetTiebreaker);
}
Output.WriteLine("Flushing all databases...");
shouldBeMaster.FlushAllDatabases(CommandFlags.FireAndForget);
var shouldBeReplica = mutex.GetServer(TestConfig.Current.FailoverSlaveServerAndPort);
if (!shouldBeReplica.IsSlave)
{
Output.WriteLine(shouldBeReplica.EndPoint + " should be a slave, fixing...");
shouldBeReplica.SlaveOf(shouldBeMaster.EndPoint);
await Task.Delay(1000).ForAwait();
}
}
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment