Commit 43d21d08 authored by Nick Craver's avatar Nick Craver

Tests: improve PubSubNonParallel

This is a race in the implementation due to pub/sub order and locking. Just improving the test to help illustrate it.

High delays before weren't fixing the test on a stressed server, they were allowing the interval-based reconfiguration to mask the failure.
parent 040f77d2
......@@ -9,6 +9,27 @@ namespace StackExchange.Redis.Tests
[Collection(NonParallelCollection.Name)]
public class PubSubNonParallel : TestBase
{
private async Task EnsureMasterSlaveSetupAsync(ConnectionMultiplexer mutex)
{
var shouldBeMaster = mutex.GetServer(TestConfig.Current.MasterServerAndPort);
if (shouldBeMaster.IsSlave)
{
Output.WriteLine(shouldBeMaster.EndPoint + " should be master, fixing...");
shouldBeMaster.MakeMaster(ReplicationChangeOptions.SetTiebreaker);
}
Output.WriteLine("Flushing all databases...");
shouldBeMaster.FlushAllDatabases(CommandFlags.FireAndForget);
var shouldBeReplica = mutex.GetServer(TestConfig.Current.SlaveServerAndPort);
if (!shouldBeReplica.IsSlave)
{
Output.WriteLine(shouldBeReplica.EndPoint + " should be a slave, fixing...");
shouldBeReplica.SlaveOf(shouldBeMaster.EndPoint);
await Task.Delay(1000).ForAwait();
}
}
public PubSubNonParallel(ITestOutputHelper output) : base(output) { }
[Theory]
......@@ -19,6 +40,8 @@ public async Task SubscriptionsSurviveMasterSwitchAsync(bool useSharedSocketMana
using (var a = Create(allowAdmin: true, useSharedSocketManager: useSharedSocketManager))
using (var b = Create(allowAdmin: true, useSharedSocketManager: useSharedSocketManager))
{
// Ensure config setup
await EnsureMasterSlaveSetupAsync(a).ForAwait();
RedisChannel channel = Me();
var subA = a.GetSubscriber();
var subB = b.GetSubscriber();
......@@ -26,20 +49,20 @@ public async Task SubscriptionsSurviveMasterSwitchAsync(bool useSharedSocketMana
long masterChanged = 0, aCount = 0, bCount = 0;
a.ConfigurationChangedBroadcast += delegate
{
Output.WriteLine("a noticed config broadcast: " + Interlocked.Increment(ref masterChanged));
Output.WriteLine("A noticed config broadcast: " + Interlocked.Increment(ref masterChanged));
};
b.ConfigurationChangedBroadcast += delegate
{
Output.WriteLine("b noticed config broadcast: " + Interlocked.Increment(ref masterChanged));
Output.WriteLine("B noticed config broadcast: " + Interlocked.Increment(ref masterChanged));
};
subA.Subscribe(channel, (ch, message) =>
subA.Subscribe(channel, (_, message) =>
{
Output.WriteLine("a got message: " + message);
Output.WriteLine("A got message: " + message);
Interlocked.Increment(ref aCount);
});
subB.Subscribe(channel, (ch, message) =>
subB.Subscribe(channel, (_, message) =>
{
Output.WriteLine("b got message: " + message);
Output.WriteLine("B got message: " + message);
Interlocked.Increment(ref bCount);
});
......@@ -50,10 +73,10 @@ public async Task SubscriptionsSurviveMasterSwitchAsync(bool useSharedSocketMana
var epA = subA.SubscribedEndpoint(channel);
var epB = subB.SubscribedEndpoint(channel);
Output.WriteLine("a: " + EndPointCollection.ToString(epA));
Output.WriteLine("b: " + EndPointCollection.ToString(epB));
subA.Publish(channel, "a1");
subB.Publish(channel, "b1");
Output.WriteLine("A: " + EndPointCollection.ToString(epA));
Output.WriteLine("B: " + EndPointCollection.ToString(epB));
subA.Publish(channel, "A1");
subB.Publish(channel, "B1");
subA.Ping();
subB.Ping();
......@@ -72,10 +95,14 @@ public async Task SubscriptionsSurviveMasterSwitchAsync(bool useSharedSocketMana
a.GetServer(TestConfig.Current.SlaveServerAndPort).MakeMaster(ReplicationChangeOptions.All, sw);
Output.WriteLine(sw.ToString());
}
await Task.Delay(5000).ForAwait();
subA.Ping();
subB.Ping();
Output.WriteLine("Pausing...");
await Task.Delay(6000).ForAwait();
Output.WriteLine("A " + TestConfig.Current.MasterServerAndPort + " status: " + (a.GetServer(TestConfig.Current.MasterServerAndPort).IsSlave ? "Slave" : "Master"));
Output.WriteLine("A " + TestConfig.Current.SlaveServerAndPort + " status: " + (a.GetServer(TestConfig.Current.SlaveServerAndPort).IsSlave ? "Slave" : "Master"));
Output.WriteLine("B " + TestConfig.Current.MasterServerAndPort + " status: " + (b.GetServer(TestConfig.Current.MasterServerAndPort).IsSlave ? "Slave" : "Master"));
Output.WriteLine("B " + TestConfig.Current.SlaveServerAndPort + " status: " + (b.GetServer(TestConfig.Current.SlaveServerAndPort).IsSlave ? "Slave" : "Master"));
Assert.True(a.GetServer(TestConfig.Current.MasterServerAndPort).IsSlave, $"A Connection: {TestConfig.Current.MasterServerAndPort} should be a slave");
Assert.False(a.GetServer(TestConfig.Current.SlaveServerAndPort).IsSlave, $"A Connection: {TestConfig.Current.SlaveServerAndPort} should be a master");
......@@ -83,19 +110,17 @@ public async Task SubscriptionsSurviveMasterSwitchAsync(bool useSharedSocketMana
Assert.False(b.GetServer(TestConfig.Current.SlaveServerAndPort).IsSlave, $"B Connection: {TestConfig.Current.SlaveServerAndPort} should be a master");
Output.WriteLine("Pause complete");
var counters = a.GetCounters();
Output.WriteLine("a outstanding: " + counters.TotalOutstanding);
counters = b.GetCounters();
Output.WriteLine("b outstanding: " + counters.TotalOutstanding);
Output.WriteLine("A outstanding: " + a.GetCounters().TotalOutstanding);
Output.WriteLine("B outstanding: " + b.GetCounters().TotalOutstanding);
subA.Ping();
subB.Ping();
await Task.Delay(2000).ForAwait();
epA = subA.SubscribedEndpoint(channel);
epB = subB.SubscribedEndpoint(channel);
Output.WriteLine("a: " + EndPointCollection.ToString(epA));
Output.WriteLine("b: " + EndPointCollection.ToString(epB));
Output.WriteLine("a2 sent to: " + subA.Publish(channel, "a2"));
Output.WriteLine("b2 sent to: " + subB.Publish(channel, "b2"));
Output.WriteLine("A: " + EndPointCollection.ToString(epA));
Output.WriteLine("B: " + EndPointCollection.ToString(epB));
Output.WriteLine("A2 sent to: " + subA.Publish(channel, "A2"));
Output.WriteLine("B2 sent to: " + subB.Publish(channel, "B2"));
subA.Ping();
subB.Ping();
Output.WriteLine("Checking...");
......@@ -111,10 +136,9 @@ public async Task SubscriptionsSurviveMasterSwitchAsync(bool useSharedSocketMana
try
{
a.GetServer(TestConfig.Current.MasterServerAndPort).MakeMaster(ReplicationChangeOptions.All);
await Task.Delay(6000).ForAwait();
await Task.Delay(1000).ForAwait();
}
catch
{ }
catch { }
}
}
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment