Commit 9b36a8ea authored by Nick Craver's avatar Nick Craver

Tests: ensure restoration of replication after pub/sub failure we know about

parent 43d21d08
...@@ -40,105 +40,113 @@ public async Task SubscriptionsSurviveMasterSwitchAsync(bool useSharedSocketMana ...@@ -40,105 +40,113 @@ public async Task SubscriptionsSurviveMasterSwitchAsync(bool useSharedSocketMana
using (var a = Create(allowAdmin: true, useSharedSocketManager: useSharedSocketManager)) using (var a = Create(allowAdmin: true, useSharedSocketManager: useSharedSocketManager))
using (var b = Create(allowAdmin: true, useSharedSocketManager: useSharedSocketManager)) using (var b = Create(allowAdmin: true, useSharedSocketManager: useSharedSocketManager))
{ {
// Ensure config setup
await EnsureMasterSlaveSetupAsync(a).ForAwait();
RedisChannel channel = Me();
var subA = a.GetSubscriber();
var subB = b.GetSubscriber();
long masterChanged = 0, aCount = 0, bCount = 0;
a.ConfigurationChangedBroadcast += delegate
{
Output.WriteLine("A noticed config broadcast: " + Interlocked.Increment(ref masterChanged));
};
b.ConfigurationChangedBroadcast += delegate
{
Output.WriteLine("B noticed config broadcast: " + Interlocked.Increment(ref masterChanged));
};
subA.Subscribe(channel, (_, message) =>
{
Output.WriteLine("A got message: " + message);
Interlocked.Increment(ref aCount);
});
subB.Subscribe(channel, (_, message) =>
{
Output.WriteLine("B got message: " + message);
Interlocked.Increment(ref bCount);
});
Assert.False(a.GetServer(TestConfig.Current.MasterServerAndPort).IsSlave, $"A Connection: {TestConfig.Current.MasterServerAndPort} should be a master");
Assert.True(a.GetServer(TestConfig.Current.SlaveServerAndPort).IsSlave, $"A Connection: {TestConfig.Current.SlaveServerAndPort} should be a slave");
Assert.False(b.GetServer(TestConfig.Current.MasterServerAndPort).IsSlave, $"B Connection: {TestConfig.Current.MasterServerAndPort} should be a master");
Assert.True(b.GetServer(TestConfig.Current.SlaveServerAndPort).IsSlave, $"B Connection: {TestConfig.Current.SlaveServerAndPort} should be a slave");
var epA = subA.SubscribedEndpoint(channel);
var epB = subB.SubscribedEndpoint(channel);
Output.WriteLine("A: " + EndPointCollection.ToString(epA));
Output.WriteLine("B: " + EndPointCollection.ToString(epB));
subA.Publish(channel, "A1");
subB.Publish(channel, "B1");
subA.Ping();
subB.Ping();
Assert.Equal(2, Interlocked.Read(ref aCount));
Assert.Equal(2, Interlocked.Read(ref bCount));
Assert.Equal(0, Interlocked.Read(ref masterChanged));
try try
{ {
Interlocked.Exchange(ref masterChanged, 0); // Ensure config setup
Interlocked.Exchange(ref aCount, 0); await EnsureMasterSlaveSetupAsync(a).ForAwait();
Interlocked.Exchange(ref bCount, 0); RedisChannel channel = Me();
Output.WriteLine("Changing master..."); var subA = a.GetSubscriber();
using (var sw = new StringWriter()) var subB = b.GetSubscriber();
long masterChanged = 0, aCount = 0, bCount = 0;
a.ConfigurationChangedBroadcast += delegate
{ {
a.GetServer(TestConfig.Current.SlaveServerAndPort).MakeMaster(ReplicationChangeOptions.All, sw); Output.WriteLine("A noticed config broadcast: " + Interlocked.Increment(ref masterChanged));
Output.WriteLine(sw.ToString()); };
} b.ConfigurationChangedBroadcast += delegate
await Task.Delay(5000).ForAwait(); {
subA.Ping(); Output.WriteLine("B noticed config broadcast: " + Interlocked.Increment(ref masterChanged));
subB.Ping(); };
Output.WriteLine("Pausing..."); subA.Subscribe(channel, (_, message) =>
Output.WriteLine("A " + TestConfig.Current.MasterServerAndPort + " status: " + (a.GetServer(TestConfig.Current.MasterServerAndPort).IsSlave ? "Slave" : "Master")); {
Output.WriteLine("A " + TestConfig.Current.SlaveServerAndPort + " status: " + (a.GetServer(TestConfig.Current.SlaveServerAndPort).IsSlave ? "Slave" : "Master")); Output.WriteLine("A got message: " + message);
Output.WriteLine("B " + TestConfig.Current.MasterServerAndPort + " status: " + (b.GetServer(TestConfig.Current.MasterServerAndPort).IsSlave ? "Slave" : "Master")); Interlocked.Increment(ref aCount);
Output.WriteLine("B " + TestConfig.Current.SlaveServerAndPort + " status: " + (b.GetServer(TestConfig.Current.SlaveServerAndPort).IsSlave ? "Slave" : "Master")); });
subB.Subscribe(channel, (_, message) =>
{
Output.WriteLine("B got message: " + message);
Interlocked.Increment(ref bCount);
});
Assert.True(a.GetServer(TestConfig.Current.MasterServerAndPort).IsSlave, $"A Connection: {TestConfig.Current.MasterServerAndPort} should be a slave"); Assert.False(a.GetServer(TestConfig.Current.MasterServerAndPort).IsSlave, $"A Connection: {TestConfig.Current.MasterServerAndPort} should be a master");
Assert.False(a.GetServer(TestConfig.Current.SlaveServerAndPort).IsSlave, $"A Connection: {TestConfig.Current.SlaveServerAndPort} should be a master"); Assert.True(a.GetServer(TestConfig.Current.SlaveServerAndPort).IsSlave, $"A Connection: {TestConfig.Current.SlaveServerAndPort} should be a slave");
Assert.True(b.GetServer(TestConfig.Current.MasterServerAndPort).IsSlave, $"B Connection: {TestConfig.Current.MasterServerAndPort} should be a slave"); Assert.False(b.GetServer(TestConfig.Current.MasterServerAndPort).IsSlave, $"B Connection: {TestConfig.Current.MasterServerAndPort} should be a master");
Assert.False(b.GetServer(TestConfig.Current.SlaveServerAndPort).IsSlave, $"B Connection: {TestConfig.Current.SlaveServerAndPort} should be a master"); Assert.True(b.GetServer(TestConfig.Current.SlaveServerAndPort).IsSlave, $"B Connection: {TestConfig.Current.SlaveServerAndPort} should be a slave");
Output.WriteLine("Pause complete"); var epA = subA.SubscribedEndpoint(channel);
Output.WriteLine("A outstanding: " + a.GetCounters().TotalOutstanding); var epB = subB.SubscribedEndpoint(channel);
Output.WriteLine("B outstanding: " + b.GetCounters().TotalOutstanding);
subA.Ping();
subB.Ping();
await Task.Delay(2000).ForAwait();
epA = subA.SubscribedEndpoint(channel);
epB = subB.SubscribedEndpoint(channel);
Output.WriteLine("A: " + EndPointCollection.ToString(epA)); Output.WriteLine("A: " + EndPointCollection.ToString(epA));
Output.WriteLine("B: " + EndPointCollection.ToString(epB)); Output.WriteLine("B: " + EndPointCollection.ToString(epB));
Output.WriteLine("A2 sent to: " + subA.Publish(channel, "A2")); subA.Publish(channel, "A1");
Output.WriteLine("B2 sent to: " + subB.Publish(channel, "B2")); subB.Publish(channel, "B1");
subA.Ping(); subA.Ping();
subB.Ping(); subB.Ping();
Output.WriteLine("Checking...");
Assert.Equal(2, Interlocked.Read(ref aCount)); Assert.Equal(2, Interlocked.Read(ref aCount));
Assert.Equal(2, Interlocked.Read(ref bCount)); Assert.Equal(2, Interlocked.Read(ref bCount));
// Expect 6, because a sees a, but b sees a and b due to replication Assert.Equal(0, Interlocked.Read(ref masterChanged));
Assert.Equal(6, Interlocked.CompareExchange(ref masterChanged, 0, 0));
}
finally
{
Output.WriteLine("Restoring configuration...");
try try
{ {
a.GetServer(TestConfig.Current.MasterServerAndPort).MakeMaster(ReplicationChangeOptions.All); Interlocked.Exchange(ref masterChanged, 0);
await Task.Delay(1000).ForAwait(); Interlocked.Exchange(ref aCount, 0);
Interlocked.Exchange(ref bCount, 0);
Output.WriteLine("Changing master...");
using (var sw = new StringWriter())
{
a.GetServer(TestConfig.Current.SlaveServerAndPort).MakeMaster(ReplicationChangeOptions.All, sw);
Output.WriteLine(sw.ToString());
}
await Task.Delay(5000).ForAwait();
subA.Ping();
subB.Ping();
Output.WriteLine("Pausing...");
Output.WriteLine("A " + TestConfig.Current.MasterServerAndPort + " status: " + (a.GetServer(TestConfig.Current.MasterServerAndPort).IsSlave ? "Slave" : "Master"));
Output.WriteLine("A " + TestConfig.Current.SlaveServerAndPort + " status: " + (a.GetServer(TestConfig.Current.SlaveServerAndPort).IsSlave ? "Slave" : "Master"));
Output.WriteLine("B " + TestConfig.Current.MasterServerAndPort + " status: " + (b.GetServer(TestConfig.Current.MasterServerAndPort).IsSlave ? "Slave" : "Master"));
Output.WriteLine("B " + TestConfig.Current.SlaveServerAndPort + " status: " + (b.GetServer(TestConfig.Current.SlaveServerAndPort).IsSlave ? "Slave" : "Master"));
Assert.True(a.GetServer(TestConfig.Current.MasterServerAndPort).IsSlave, $"A Connection: {TestConfig.Current.MasterServerAndPort} should be a slave");
Assert.False(a.GetServer(TestConfig.Current.SlaveServerAndPort).IsSlave, $"A Connection: {TestConfig.Current.SlaveServerAndPort} should be a master");
Assert.True(b.GetServer(TestConfig.Current.MasterServerAndPort).IsSlave, $"B Connection: {TestConfig.Current.MasterServerAndPort} should be a slave");
Assert.False(b.GetServer(TestConfig.Current.SlaveServerAndPort).IsSlave, $"B Connection: {TestConfig.Current.SlaveServerAndPort} should be a master");
Output.WriteLine("Pause complete");
Output.WriteLine("A outstanding: " + a.GetCounters().TotalOutstanding);
Output.WriteLine("B outstanding: " + b.GetCounters().TotalOutstanding);
subA.Ping();
subB.Ping();
await Task.Delay(2000).ForAwait();
epA = subA.SubscribedEndpoint(channel);
epB = subB.SubscribedEndpoint(channel);
Output.WriteLine("A: " + EndPointCollection.ToString(epA));
Output.WriteLine("B: " + EndPointCollection.ToString(epB));
Output.WriteLine("A2 sent to: " + subA.Publish(channel, "A2"));
Output.WriteLine("B2 sent to: " + subB.Publish(channel, "B2"));
subA.Ping();
subB.Ping();
Output.WriteLine("Checking...");
Assert.Equal(2, Interlocked.Read(ref aCount));
Assert.Equal(2, Interlocked.Read(ref bCount));
// Expect 6, because a sees a, but b sees a and b due to replication
Assert.Equal(6, Interlocked.CompareExchange(ref masterChanged, 0, 0));
}
finally
{
Output.WriteLine("Restoring configuration...");
try
{
a.GetServer(TestConfig.Current.MasterServerAndPort).MakeMaster(ReplicationChangeOptions.All);
await Task.Delay(1000).ForAwait();
}
catch { }
} }
catch { } }
finally
{
// Put it back, even if we fail...
await EnsureMasterSlaveSetupAsync(a);
} }
} }
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment