Unverified Commit b90f991d authored by Nick Craver's avatar Nick Craver Committed by GitHub

Fix up boadcasts, tests, and logging across the board (#1400)

This changes does the following:
- Moves the broadcast to be only before the master reconfiguration to both before *and* after - a fix following https://github.com/StackExchange/StackExchange.Redis/commit/88dcf0c989b25623fb8ffb1f4d48c24593246cfe to fix the gap.
- Tweaks tests by overall lessening runtime (and thus build server time). Overall, fixes a few static timeouts to be conditional (so they short circuit faster if met), and brings some tests that were some variant of the above into RELEASE since they're safe now.
- Changes `UntilCondition` to take a `TimeSpan`, just because clearer. Even though I IntelliSense completed `.FromMinutes()` earlier and watched it like an idiot for a while...I stand by this decision!
- Locks the `ITestOutputHelper` writer because...that was jacked up in races:

![off to the races](https://user-images.githubusercontent.com/454813/77232395-2f799980-6b77-11ea-8fb4-0398de25e313.png)

------

Note: a lot of the test changes are just optimizations to delays which allow longer but short-circuit sooner. The important changes are in broadcast and locking around the test runner. I can think of downsides to neither, but want some @mgravell eyes. This should resolve a lot of flaky-ness with local (and build agent) tests. Not all of it, but a lot of it!
parent 2090d7da
...@@ -453,22 +453,31 @@ internal void MakeMaster(ServerEndPoint server, ReplicationChangeOptions options ...@@ -453,22 +453,31 @@ internal void MakeMaster(ServerEndPoint server, ReplicationChangeOptions options
// ConfigurationOptions.ConfigCheckSeconds interval to identify the current (created by this method call) topology correctly. // ConfigurationOptions.ConfigCheckSeconds interval to identify the current (created by this method call) topology correctly.
var blockingReconfig = Interlocked.CompareExchange(ref activeConfigCause, "Block: Pending Master Reconfig", null) == null; var blockingReconfig = Interlocked.CompareExchange(ref activeConfigCause, "Block: Pending Master Reconfig", null) == null;
// try and broadcast this everywhere, to catch the maximum audience // Try and broadcast the fact a change happened to all members
if ((options & ReplicationChangeOptions.Broadcast) != 0 && ConfigurationChangedChannel != null // We want everyone possible to pick it up.
&& CommandMap.IsAvailable(RedisCommand.PUBLISH)) // We broadcast before *and after* the change to remote members, so that they don't go without detecting a change happened.
{ // This eliminates the race of pub/sub *then* re-slaving happening, since a method both preceeds and follows.
RedisValue channel = ConfigurationChangedChannel; void Broadcast(ReadOnlySpan<ServerEndPoint> serverNodes)
foreach (var node in nodes) {
if ((options & ReplicationChangeOptions.Broadcast) != 0 && ConfigurationChangedChannel != null
&& CommandMap.IsAvailable(RedisCommand.PUBLISH))
{ {
if (!node.IsConnected) continue; RedisValue channel = ConfigurationChangedChannel;
log?.WriteLine($"Broadcasting via {Format.ToString(node.EndPoint)}..."); foreach (var node in serverNodes)
msg = Message.Create(-1, flags, RedisCommand.PUBLISH, channel, newMaster); {
if (!node.IsConnected) continue;
log?.WriteLine($"Broadcasting via {Format.ToString(node.EndPoint)}...");
msg = Message.Create(-1, flags, RedisCommand.PUBLISH, channel, newMaster);
#pragma warning disable CS0618 #pragma warning disable CS0618
node.WriteDirectFireAndForgetSync(msg, ResultProcessor.Int64); node.WriteDirectFireAndForgetSync(msg, ResultProcessor.Int64);
#pragma warning restore CS0618 #pragma warning restore CS0618
}
} }
} }
// Send a message before it happens - because afterwards a new slave may be unresponsive
Broadcast(nodes);
if ((options & ReplicationChangeOptions.EnslaveSubordinates) != 0) if ((options & ReplicationChangeOptions.EnslaveSubordinates) != 0)
{ {
foreach (var node in nodes) foreach (var node in nodes)
...@@ -483,6 +492,11 @@ internal void MakeMaster(ServerEndPoint server, ReplicationChangeOptions options ...@@ -483,6 +492,11 @@ internal void MakeMaster(ServerEndPoint server, ReplicationChangeOptions options
} }
} }
// ...and send one after it happens - because the first broadcast may have landed on a secondary client
// and it can reconfgure before any topology change actually happened. This is most likely to happen
// in low-latency environments.
Broadcast(nodes);
// and reconfigure the muxer // and reconfigure the muxer
log?.WriteLine("Reconfiguring all endpoints..."); log?.WriteLine("Reconfiguring all endpoints...");
// Yes, there is a tiny latency race possible between this code and the next call, but it's far more minute than before. // Yes, there is a tiny latency race possible between this code and the next call, but it's far more minute than before.
......
...@@ -771,8 +771,7 @@ public partial interface IServer : IRedis ...@@ -771,8 +771,7 @@ public partial interface IServer : IRedis
KeyValuePair<string, string>[] SentinelMaster(string serviceName, CommandFlags flags = CommandFlags.None); KeyValuePair<string, string>[] SentinelMaster(string serviceName, CommandFlags flags = CommandFlags.None);
/// <summary> /// <summary>
/// Force a failover as if the master was not reachable, and without asking for agreement to other Sentinels /// Show the state and info of the specified master.
/// (however a new version of the configuration will be published so that the other Sentinels will update their configurations).
/// </summary> /// </summary>
/// <param name="serviceName">The sentinel service name.</param> /// <param name="serviceName">The sentinel service name.</param>
/// <param name="flags">The command flags to use.</param> /// <param name="flags">The command flags to use.</param>
......
...@@ -292,12 +292,13 @@ public async Task TestSevered() ...@@ -292,12 +292,13 @@ public async Task TestSevered()
string key = Guid.NewGuid().ToString(); string key = Guid.NewGuid().ToString();
db.KeyDelete(key, CommandFlags.FireAndForget); db.KeyDelete(key, CommandFlags.FireAndForget);
db.StringSet(key, key, flags: CommandFlags.FireAndForget); db.StringSet(key, key, flags: CommandFlags.FireAndForget);
GetServer(muxer).SimulateConnectionFailure(); var server = GetServer(muxer);
server.SimulateConnectionFailure();
var watch = Stopwatch.StartNew(); var watch = Stopwatch.StartNew();
db.Ping(); await UntilCondition(TimeSpan.FromSeconds(10), () => server.IsConnected);
watch.Stop(); watch.Stop();
Log("Time to re-establish: {0}ms (any order)", watch.ElapsedMilliseconds); Log("Time to re-establish: {0}ms (any order)", watch.ElapsedMilliseconds);
await Task.Delay(2000).ForAwait(); await UntilCondition(TimeSpan.FromSeconds(10), () => key == db.StringGet(key));
Debug.WriteLine("Pinging..."); Debug.WriteLine("Pinging...");
Assert.Equal(key, db.StringGet(key)); Assert.Equal(key, db.StringGet(key));
} }
......
using System.Threading; using System;
using System.Threading.Tasks; using System.Threading.Tasks;
using Xunit; using Xunit;
using Xunit.Abstractions; using Xunit.Abstractions;
...@@ -9,7 +9,6 @@ public class ConnectFailTimeout : TestBase ...@@ -9,7 +9,6 @@ public class ConnectFailTimeout : TestBase
{ {
public ConnectFailTimeout(ITestOutputHelper output) : base (output) { } public ConnectFailTimeout(ITestOutputHelper output) : base (output) { }
#if DEBUG
[Fact] [Fact]
public async Task NoticesConnectFail() public async Task NoticesConnectFail()
{ {
...@@ -32,7 +31,7 @@ public async Task NoticesConnectFail() ...@@ -32,7 +31,7 @@ public async Task NoticesConnectFail()
Assert.Throws<RedisConnectionException>(() => server.Ping()); Assert.Throws<RedisConnectionException>(() => server.Ping());
Log("pinged"); Log("pinged");
// Heartbeat should reconnect by now // Heartbeat should reconnect by now
await Task.Delay(5000).ConfigureAwait(false); await UntilCondition(TimeSpan.FromSeconds(10), () => server.IsConnected);
Log("pinging - expect success"); Log("pinging - expect success");
var time = server.Ping(); var time = server.Ping();
...@@ -40,6 +39,5 @@ public async Task NoticesConnectFail() ...@@ -40,6 +39,5 @@ public async Task NoticesConnectFail()
Log(time.ToString()); Log(time.ToString());
} }
} }
#endif
} }
} }
...@@ -46,7 +46,7 @@ public void CanNotOpenNonsenseConnection_IP() ...@@ -46,7 +46,7 @@ public void CanNotOpenNonsenseConnection_IP()
{ {
var ex = Assert.Throws<RedisConnectionException>(() => var ex = Assert.Throws<RedisConnectionException>(() =>
{ {
using (var conn = ConnectionMultiplexer.Connect(TestConfig.Current.MasterServer + ":6500", Writer)) { } using (var conn = ConnectionMultiplexer.Connect(TestConfig.Current.MasterServer + ":6500,connectTimeout=1000", Writer)) { }
}); });
Log(ex.ToString()); Log(ex.ToString());
} }
...@@ -56,7 +56,7 @@ public async Task CanNotOpenNonsenseConnection_DNS() ...@@ -56,7 +56,7 @@ public async Task CanNotOpenNonsenseConnection_DNS()
{ {
var ex = await Assert.ThrowsAsync<RedisConnectionException>(async () => var ex = await Assert.ThrowsAsync<RedisConnectionException>(async () =>
{ {
using (var conn = await ConnectionMultiplexer.ConnectAsync($"doesnot.exist.ds.{Guid.NewGuid():N}.com:6500", Writer).ForAwait()) { } using (var conn = await ConnectionMultiplexer.ConnectAsync($"doesnot.exist.ds.{Guid.NewGuid():N}.com:6500,connectTimeout=1000", Writer).ForAwait()) { }
}).ForAwait(); }).ForAwait();
Log(ex.ToString()); Log(ex.ToString());
} }
...@@ -64,7 +64,7 @@ public async Task CanNotOpenNonsenseConnection_DNS() ...@@ -64,7 +64,7 @@ public async Task CanNotOpenNonsenseConnection_DNS()
[Fact] [Fact]
public void CreateDisconnectedNonsenseConnection_IP() public void CreateDisconnectedNonsenseConnection_IP()
{ {
using (var conn = ConnectionMultiplexer.Connect(TestConfig.Current.MasterServer + ":6500,abortConnect=false", Writer)) using (var conn = ConnectionMultiplexer.Connect(TestConfig.Current.MasterServer + ":6500,abortConnect=false,connectTimeout=1000", Writer))
{ {
Assert.False(conn.GetServer(conn.GetEndPoints().Single()).IsConnected); Assert.False(conn.GetServer(conn.GetEndPoints().Single()).IsConnected);
Assert.False(conn.GetDatabase().IsConnected(default(RedisKey))); Assert.False(conn.GetDatabase().IsConnected(default(RedisKey)));
...@@ -74,7 +74,7 @@ public void CreateDisconnectedNonsenseConnection_IP() ...@@ -74,7 +74,7 @@ public void CreateDisconnectedNonsenseConnection_IP()
[Fact] [Fact]
public void CreateDisconnectedNonsenseConnection_DNS() public void CreateDisconnectedNonsenseConnection_DNS()
{ {
using (var conn = ConnectionMultiplexer.Connect($"doesnot.exist.ds.{Guid.NewGuid():N}.com:6500, abortConnect=false", Writer)) using (var conn = ConnectionMultiplexer.Connect($"doesnot.exist.ds.{Guid.NewGuid():N}.com:6500,abortConnect=false,connectTimeout=1000", Writer))
{ {
Assert.False(conn.GetServer(conn.GetEndPoints().Single()).IsConnected); Assert.False(conn.GetServer(conn.GetEndPoints().Single()).IsConnected);
Assert.False(conn.GetDatabase().IsConnected(default(RedisKey))); Assert.False(conn.GetDatabase().IsConnected(default(RedisKey)));
......
using System.Threading; using System;
using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
using Xunit; using Xunit;
using Xunit.Abstractions; using Xunit.Abstractions;
...@@ -113,7 +114,7 @@ public void ConnectsWhenBeginConnectCompletesSynchronously() ...@@ -113,7 +114,7 @@ public void ConnectsWhenBeginConnectCompletesSynchronously()
} }
[Fact] [Fact]
public void Issue922_ReconnectRaised() public async Task Issue922_ReconnectRaised()
{ {
var config = ConfigurationOptions.Parse(TestConfig.Current.MasterServerAndPort); var config = ConfigurationOptions.Parse(TestConfig.Current.MasterServerAndPort);
config.AbortOnConnectFail = true; config.AbortOnConnectFail = true;
...@@ -130,15 +131,14 @@ public void Issue922_ReconnectRaised() ...@@ -130,15 +131,14 @@ public void Issue922_ReconnectRaised()
muxer.ConnectionRestored += delegate { Interlocked.Increment(ref restoreCount); }; muxer.ConnectionRestored += delegate { Interlocked.Increment(ref restoreCount); };
var db = muxer.GetDatabase(); var db = muxer.GetDatabase();
db.Ping();
Assert.Equal(0, Volatile.Read(ref failCount)); Assert.Equal(0, Volatile.Read(ref failCount));
Assert.Equal(0, Volatile.Read(ref restoreCount)); Assert.Equal(0, Volatile.Read(ref restoreCount));
var server = muxer.GetServer(TestConfig.Current.MasterServerAndPort); var server = muxer.GetServer(TestConfig.Current.MasterServerAndPort);
server.SimulateConnectionFailure(); server.SimulateConnectionFailure();
Thread.Sleep(1000);
db.Ping(); // interactive+subscriber = 2 await UntilCondition(TimeSpan.FromSeconds(10), () => Volatile.Read(ref failCount) + Volatile.Read(ref restoreCount) == 4);
// interactive+subscriber = 2
Assert.Equal(2, Volatile.Read(ref failCount)); Assert.Equal(2, Volatile.Read(ref failCount));
Assert.Equal(2, Volatile.Read(ref restoreCount)); Assert.Equal(2, Volatile.Read(ref restoreCount));
} }
......
...@@ -35,13 +35,12 @@ public void NullSnapshot() ...@@ -35,13 +35,12 @@ public void NullSnapshot()
Assert.Null(ex.InnerException); Assert.Null(ex.InnerException);
} }
#if DEBUG
[Fact] [Fact]
public void MultipleEndpointsThrowConnectionException() public void MultipleEndpointsThrowConnectionException()
{ {
try try
{ {
using (var muxer = Create(keepAlive: 1, connectTimeout: 10000, allowAdmin: true)) using (var muxer = Create(keepAlive: 1, connectTimeout: 10000, allowAdmin: true, shared: false))
{ {
var conn = muxer.GetDatabase(); var conn = muxer.GetDatabase();
muxer.AllowConnect = false; muxer.AllowConnect = false;
...@@ -55,7 +54,8 @@ public void MultipleEndpointsThrowConnectionException() ...@@ -55,7 +54,8 @@ public void MultipleEndpointsThrowConnectionException()
var outer = Assert.IsType<RedisConnectionException>(ex); var outer = Assert.IsType<RedisConnectionException>(ex);
Assert.Equal(ConnectionFailureType.UnableToResolvePhysicalConnection, outer.FailureType); Assert.Equal(ConnectionFailureType.UnableToResolvePhysicalConnection, outer.FailureType);
var inner = Assert.IsType<RedisConnectionException>(outer.InnerException); var inner = Assert.IsType<RedisConnectionException>(outer.InnerException);
Assert.Equal(ConnectionFailureType.SocketFailure, inner.FailureType); Assert.True(inner.FailureType == ConnectionFailureType.SocketFailure
|| inner.FailureType == ConnectionFailureType.InternalFailure);
} }
} }
finally finally
...@@ -69,7 +69,7 @@ public void ServerTakesPrecendenceOverSnapshot() ...@@ -69,7 +69,7 @@ public void ServerTakesPrecendenceOverSnapshot()
{ {
try try
{ {
using (var muxer = Create(keepAlive: 1, connectTimeout: 10000, allowAdmin: true)) using (var muxer = Create(keepAlive: 1, connectTimeout: 10000, allowAdmin: true, shared: false))
{ {
var conn = muxer.GetDatabase(); var conn = muxer.GetDatabase();
muxer.AllowConnect = false; muxer.AllowConnect = false;
...@@ -87,7 +87,6 @@ public void ServerTakesPrecendenceOverSnapshot() ...@@ -87,7 +87,6 @@ public void ServerTakesPrecendenceOverSnapshot()
ClearAmbientFailures(); ClearAmbientFailures();
} }
} }
#endif
[Fact] [Fact]
public void NullInnerExceptionForMultipleEndpointsWithNoLastException() public void NullInnerExceptionForMultipleEndpointsWithNoLastException()
......
using System.IO; using System;
using System.IO;
using System.Threading; using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
using Xunit; using Xunit;
...@@ -103,6 +104,7 @@ public async Task ConfigVerifyReceiveConfigChangeBroadcast() ...@@ -103,6 +104,7 @@ public async Task ConfigVerifyReceiveConfigChangeBroadcast()
} }
} }
[Fact] [Fact]
public async Task DeslaveGoesToPrimary() public async Task DeslaveGoesToPrimary()
{ {
...@@ -181,6 +183,8 @@ public async Task DeslaveGoesToPrimary() ...@@ -181,6 +183,8 @@ public async Task DeslaveGoesToPrimary()
Assert.Equal(secondary2.EndPoint, db2.IdentifyEndpoint(key, CommandFlags.DemandSlave)); Assert.Equal(secondary2.EndPoint, db2.IdentifyEndpoint(key, CommandFlags.DemandSlave));
} }
await UntilCondition(TimeSpan.FromSeconds(20), () => !primary.IsSlave && secondary.IsSlave);
Assert.False(primary.IsSlave, $"{primary.EndPoint} should be a master."); Assert.False(primary.IsSlave, $"{primary.EndPoint} should be a master.");
Assert.True(secondary.IsSlave, $"{secondary.EndPoint} should be a slave."); Assert.True(secondary.IsSlave, $"{secondary.EndPoint} should be a slave.");
...@@ -205,6 +209,7 @@ public async Task SubscriptionsSurviveMasterSwitchAsync() ...@@ -205,6 +209,7 @@ public async Task SubscriptionsSurviveMasterSwitchAsync()
using (var b = Create(allowAdmin: true, shared: false)) using (var b = Create(allowAdmin: true, shared: false))
{ {
RedisChannel channel = Me(); RedisChannel channel = Me();
Log("Using Channel: " + channel);
var subA = a.GetSubscriber(); var subA = a.GetSubscriber();
var subB = b.GetSubscriber(); var subB = b.GetSubscriber();
...@@ -247,7 +252,7 @@ public async Task SubscriptionsSurviveMasterSwitchAsync() ...@@ -247,7 +252,7 @@ public async Task SubscriptionsSurviveMasterSwitchAsync()
Log(" SubA ping: " + subA.Ping()); Log(" SubA ping: " + subA.Ping());
Log(" SubB ping: " + subB.Ping()); Log(" SubB ping: " + subB.Ping());
// If redis is under load due to this suite, it may take a moment to send across. // If redis is under load due to this suite, it may take a moment to send across.
await UntilCondition(5000, () => Interlocked.Read(ref aCount) == 2 && Interlocked.Read(ref bCount) == 2).ForAwait(); await UntilCondition(TimeSpan.FromSeconds(5), () => Interlocked.Read(ref aCount) == 2 && Interlocked.Read(ref bCount) == 2).ForAwait();
Assert.Equal(2, Interlocked.Read(ref aCount)); Assert.Equal(2, Interlocked.Read(ref aCount));
Assert.Equal(2, Interlocked.Read(ref bCount)); Assert.Equal(2, Interlocked.Read(ref bCount));
...@@ -264,7 +269,8 @@ public async Task SubscriptionsSurviveMasterSwitchAsync() ...@@ -264,7 +269,8 @@ public async Task SubscriptionsSurviveMasterSwitchAsync()
a.GetServer(TestConfig.Current.FailoverSlaveServerAndPort).MakeMaster(ReplicationChangeOptions.All, sw); a.GetServer(TestConfig.Current.FailoverSlaveServerAndPort).MakeMaster(ReplicationChangeOptions.All, sw);
Log(sw.ToString()); Log(sw.ToString());
} }
await UntilCondition(3000, () => b.GetServer(TestConfig.Current.FailoverMasterServerAndPort).IsSlave).ForAwait(); Log("Waiting for connection B to detect...");
await UntilCondition(TimeSpan.FromSeconds(10), () => b.GetServer(TestConfig.Current.FailoverMasterServerAndPort).IsSlave).ForAwait();
subA.Ping(); subA.Ping();
subB.Ping(); subB.Ping();
Log("Falover 2 Attempted. Pausing..."); Log("Falover 2 Attempted. Pausing...");
...@@ -281,10 +287,19 @@ public async Task SubscriptionsSurviveMasterSwitchAsync() ...@@ -281,10 +287,19 @@ public async Task SubscriptionsSurviveMasterSwitchAsync()
Assert.True(a.GetServer(TestConfig.Current.FailoverMasterServerAndPort).IsSlave, $"A Connection: {TestConfig.Current.FailoverMasterServerAndPort} should be a slave"); Assert.True(a.GetServer(TestConfig.Current.FailoverMasterServerAndPort).IsSlave, $"A Connection: {TestConfig.Current.FailoverMasterServerAndPort} should be a slave");
Assert.False(a.GetServer(TestConfig.Current.FailoverSlaveServerAndPort).IsSlave, $"A Connection: {TestConfig.Current.FailoverSlaveServerAndPort} should be a master"); Assert.False(a.GetServer(TestConfig.Current.FailoverSlaveServerAndPort).IsSlave, $"A Connection: {TestConfig.Current.FailoverSlaveServerAndPort} should be a master");
await UntilCondition(TimeSpan.FromSeconds(10), () => b.GetServer(TestConfig.Current.FailoverMasterServerAndPort).IsSlave).ForAwait();
var sanityCheck = b.GetServer(TestConfig.Current.FailoverMasterServerAndPort).IsSlave; var sanityCheck = b.GetServer(TestConfig.Current.FailoverMasterServerAndPort).IsSlave;
if (!sanityCheck) if (!sanityCheck)
{ {
Skip.Inconclusive("Not enough latency."); Log("FAILURE: B has not detected the topology change.");
foreach (var server in b.GetServerSnapshot().ToArray())
{
Log(" Server" + server.EndPoint);
Log(" State: " + server.ConnectionState);
Log(" IsMaster: " + !server.IsSlave);
Log(" Type: " + server.ServerType);
}
//Skip.Inconclusive("Not enough latency.");
} }
Assert.True(sanityCheck, $"B Connection: {TestConfig.Current.FailoverMasterServerAndPort} should be a slave"); Assert.True(sanityCheck, $"B Connection: {TestConfig.Current.FailoverMasterServerAndPort} should be a slave");
Assert.False(b.GetServer(TestConfig.Current.FailoverSlaveServerAndPort).IsSlave, $"B Connection: {TestConfig.Current.FailoverSlaveServerAndPort} should be a master"); Assert.False(b.GetServer(TestConfig.Current.FailoverSlaveServerAndPort).IsSlave, $"B Connection: {TestConfig.Current.FailoverSlaveServerAndPort} should be a master");
...@@ -294,18 +309,20 @@ public async Task SubscriptionsSurviveMasterSwitchAsync() ...@@ -294,18 +309,20 @@ public async Task SubscriptionsSurviveMasterSwitchAsync()
Log(" B outstanding: " + b.GetCounters().TotalOutstanding); Log(" B outstanding: " + b.GetCounters().TotalOutstanding);
subA.Ping(); subA.Ping();
subB.Ping(); subB.Ping();
await Task.Delay(1000).ForAwait(); await Task.Delay(5000).ForAwait();
epA = subA.SubscribedEndpoint(channel); epA = subA.SubscribedEndpoint(channel);
epB = subB.SubscribedEndpoint(channel); epB = subB.SubscribedEndpoint(channel);
Log("Subscription complete"); Log("Subscription complete");
Log(" A: " + EndPointCollection.ToString(epA)); Log(" A: " + EndPointCollection.ToString(epA));
Log(" B: " + EndPointCollection.ToString(epB)); Log(" B: " + EndPointCollection.ToString(epB));
Log(" A2 sent to: " + subA.Publish(channel, "A2")); var aSentTo = subA.Publish(channel, "A2");
Log(" B2 sent to: " + subB.Publish(channel, "B2")); var bSentTo = subB.Publish(channel, "B2");
Log(" A2 sent to: " + aSentTo);
Log(" B2 sent to: " + bSentTo);
subA.Ping(); subA.Ping();
subB.Ping(); subB.Ping();
Log("Ping Complete. Checking..."); Log("Ping Complete. Checking...");
await UntilCondition(10000, () => Interlocked.Read(ref aCount) == 2 && Interlocked.Read(ref bCount) == 2).ForAwait(); await UntilCondition(TimeSpan.FromSeconds(10), () => Interlocked.Read(ref aCount) == 2 && Interlocked.Read(ref bCount) == 2).ForAwait();
Log("Counts so far:"); Log("Counts so far:");
Log(" aCount: " + Interlocked.Read(ref aCount)); Log(" aCount: " + Interlocked.Read(ref aCount));
...@@ -314,8 +331,15 @@ public async Task SubscriptionsSurviveMasterSwitchAsync() ...@@ -314,8 +331,15 @@ public async Task SubscriptionsSurviveMasterSwitchAsync()
Assert.Equal(2, Interlocked.Read(ref aCount)); Assert.Equal(2, Interlocked.Read(ref aCount));
Assert.Equal(2, Interlocked.Read(ref bCount)); Assert.Equal(2, Interlocked.Read(ref bCount));
// Expect 6, because a sees a, but b sees a and b due to replication // Expect 10, because a sees a, but b sees a and b due to replication
Assert.Equal(6, Interlocked.CompareExchange(ref masterChanged, 0, 0)); Assert.Equal(10, Interlocked.CompareExchange(ref masterChanged, 0, 0));
}
catch
{
LogNoTime("");
Log("ERROR: Something went bad - see above! Roooooolling back. Back it up. Baaaaaack it on up.");
LogNoTime("");
throw;
} }
finally finally
{ {
......
using System.Collections.Generic; using System;
using System.Collections.Generic;
using System.Linq; using System.Linq;
using System.Threading; using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
...@@ -40,9 +41,9 @@ public async Task ExecuteWithUnsubscribeViaChannel() ...@@ -40,9 +41,9 @@ public async Task ExecuteWithUnsubscribeViaChannel()
return Task.CompletedTask; return Task.CompletedTask;
}); });
second.OnMessage(_ => Interlocked.Increment(ref i)); second.OnMessage(_ => Interlocked.Increment(ref i));
await Task.Delay(100); await Task.Delay(200);
await pubsub.PublishAsync(name, "abc"); await pubsub.PublishAsync(name, "abc");
await Task.Delay(100); await UntilCondition(TimeSpan.FromSeconds(10), () => values.Count == 1);
lock (values) lock (values)
{ {
Assert.Equal("abc", Assert.Single(values)); Assert.Equal("abc", Assert.Single(values));
...@@ -53,9 +54,9 @@ public async Task ExecuteWithUnsubscribeViaChannel() ...@@ -53,9 +54,9 @@ public async Task ExecuteWithUnsubscribeViaChannel()
Assert.False(second.Completion.IsCompleted, "completed"); Assert.False(second.Completion.IsCompleted, "completed");
await first.UnsubscribeAsync(); await first.UnsubscribeAsync();
await Task.Delay(100); await Task.Delay(200);
await pubsub.PublishAsync(name, "def"); await pubsub.PublishAsync(name, "def");
await Task.Delay(100); await UntilCondition(TimeSpan.FromSeconds(10), () => values.Count == 1 && Volatile.Read(ref i) == 2);
lock (values) lock (values)
{ {
Assert.Equal("abc", Assert.Single(values)); Assert.Equal("abc", Assert.Single(values));
...@@ -66,9 +67,9 @@ public async Task ExecuteWithUnsubscribeViaChannel() ...@@ -66,9 +67,9 @@ public async Task ExecuteWithUnsubscribeViaChannel()
AssertCounts(pubsub, name, true, 0, 1); AssertCounts(pubsub, name, true, 0, 1);
await second.UnsubscribeAsync(); await second.UnsubscribeAsync();
await Task.Delay(100); await Task.Delay(200);
await pubsub.PublishAsync(name, "ghi"); await pubsub.PublishAsync(name, "ghi");
await Task.Delay(100); await UntilCondition(TimeSpan.FromSeconds(10), () => values.Count == 1);
lock (values) lock (values)
{ {
Assert.Equal("abc", Assert.Single(values)); Assert.Equal("abc", Assert.Single(values));
...@@ -110,7 +111,7 @@ public async Task ExecuteWithUnsubscribeViaSubscriber() ...@@ -110,7 +111,7 @@ public async Task ExecuteWithUnsubscribeViaSubscriber()
await Task.Delay(100); await Task.Delay(100);
await pubsub.PublishAsync(name, "abc"); await pubsub.PublishAsync(name, "abc");
await Task.Delay(100); await UntilCondition(TimeSpan.FromSeconds(10), () => values.Count == 1);
lock (values) lock (values)
{ {
Assert.Equal("abc", Assert.Single(values)); Assert.Equal("abc", Assert.Single(values));
...@@ -123,7 +124,7 @@ public async Task ExecuteWithUnsubscribeViaSubscriber() ...@@ -123,7 +124,7 @@ public async Task ExecuteWithUnsubscribeViaSubscriber()
await pubsub.UnsubscribeAsync(name); await pubsub.UnsubscribeAsync(name);
await Task.Delay(100); await Task.Delay(100);
await pubsub.PublishAsync(name, "def"); await pubsub.PublishAsync(name, "def");
await Task.Delay(100); await UntilCondition(TimeSpan.FromSeconds(10), () => values.Count == 1);
lock (values) lock (values)
{ {
Assert.Equal("abc", Assert.Single(values)); Assert.Equal("abc", Assert.Single(values));
...@@ -161,7 +162,7 @@ public async Task ExecuteWithUnsubscribeViaClearAll() ...@@ -161,7 +162,7 @@ public async Task ExecuteWithUnsubscribeViaClearAll()
second.OnMessage(_ => Interlocked.Increment(ref i)); second.OnMessage(_ => Interlocked.Increment(ref i));
await Task.Delay(100); await Task.Delay(100);
await pubsub.PublishAsync(name, "abc"); await pubsub.PublishAsync(name, "abc");
await Task.Delay(100); await UntilCondition(TimeSpan.FromSeconds(10), () => values.Count == 1);
lock (values) lock (values)
{ {
Assert.Equal("abc", Assert.Single(values)); Assert.Equal("abc", Assert.Single(values));
...@@ -174,7 +175,7 @@ public async Task ExecuteWithUnsubscribeViaClearAll() ...@@ -174,7 +175,7 @@ public async Task ExecuteWithUnsubscribeViaClearAll()
await pubsub.UnsubscribeAllAsync(); await pubsub.UnsubscribeAllAsync();
await Task.Delay(100); await Task.Delay(100);
await pubsub.PublishAsync(name, "def"); await pubsub.PublishAsync(name, "def");
await Task.Delay(100); await UntilCondition(TimeSpan.FromSeconds(10), () => values.Count == 1);
lock (values) lock (values)
{ {
Assert.Equal("abc", Assert.Single(values)); Assert.Equal("abc", Assert.Single(values));
......
...@@ -21,7 +21,7 @@ public async Task Exec() ...@@ -21,7 +21,7 @@ public async Task Exec()
// setup some data // setup some data
cache.KeyDelete(key, CommandFlags.FireAndForget); cache.KeyDelete(key, CommandFlags.FireAndForget);
cache.HashSet(key, "full", "some value", flags: CommandFlags.FireAndForget); cache.HashSet(key, "full", "some value", flags: CommandFlags.FireAndForget);
cache.KeyExpire(key, TimeSpan.FromSeconds(3), CommandFlags.FireAndForget); cache.KeyExpire(key, TimeSpan.FromSeconds(1), CommandFlags.FireAndForget);
// test while exists // test while exists
var keyExists = cache.KeyExists(key); var keyExists = cache.KeyExists(key);
...@@ -32,7 +32,7 @@ public async Task Exec() ...@@ -32,7 +32,7 @@ public async Task Exec()
Assert.Equal("some value", fullWait.Result); Assert.Equal("some value", fullWait.Result);
// wait for expiry // wait for expiry
await Task.Delay(4000).ForAwait(); await Task.Delay(2000).ForAwait();
// test once expired // test once expired
keyExists = cache.KeyExists(key); keyExists = cache.KeyExists(key);
......
...@@ -30,11 +30,14 @@ public async Task Basic() ...@@ -30,11 +30,14 @@ public async Task Basic()
toDb.KeyDelete(key, CommandFlags.FireAndForget); toDb.KeyDelete(key, CommandFlags.FireAndForget);
fromDb.StringSet(key, "foo", flags: CommandFlags.FireAndForget); fromDb.StringSet(key, "foo", flags: CommandFlags.FireAndForget);
var dest = to.GetEndPoints(true).Single(); var dest = to.GetEndPoints(true).Single();
fromDb.KeyMigrate(key, dest); fromDb.KeyMigrate(key, dest, migrateOptions: MigrateOptions.Replace);
await Task.Delay(1000); // this is *meant* to be synchronous at the redis level, but
// this is *meant* to be synchronous at the redis level, but
// we keep seeing it fail on the CI server where the key has *left* the origin, but // we keep seeing it fail on the CI server where the key has *left* the origin, but
// has *not* yet arrived at the destination; adding a pause while we investigate with // has *not* yet arrived at the destination; adding a pause while we investigate with
// the redis folks // the redis folks
await UntilCondition(TimeSpan.FromSeconds(5), () => !fromDb.KeyExists(key) && toDb.KeyExists(key));
Assert.False(fromDb.KeyExists(key)); Assert.False(fromDb.KeyExists(key));
Assert.True(toDb.KeyExists(key)); Assert.True(toDb.KeyExists(key));
string s = toDb.StringGet(key); string s = toDb.StringGet(key);
......
...@@ -27,21 +27,20 @@ public async Task ExplicitPublishMode() ...@@ -27,21 +27,20 @@ public async Task ExplicitPublishMode()
pub.Subscribe(new RedisChannel("ab*d", RedisChannel.PatternMode.Auto), (x, y) => Interlocked.Increment(ref c)); pub.Subscribe(new RedisChannel("ab*d", RedisChannel.PatternMode.Auto), (x, y) => Interlocked.Increment(ref c));
pub.Subscribe("abc*", (x, y) => Interlocked.Increment(ref d)); pub.Subscribe("abc*", (x, y) => Interlocked.Increment(ref d));
await Task.Delay(4100).ForAwait(); await Task.Delay(1000).ForAwait();
pub.Publish("abcd", "efg"); pub.Publish("abcd", "efg");
await Task.Delay(500).ForAwait(); await UntilCondition(TimeSpan.FromSeconds(10),
() => Thread.VolatileRead(ref b) == 1
&& Thread.VolatileRead(ref c) == 1
&& Thread.VolatileRead(ref d) == 1);
Assert.Equal(0, Thread.VolatileRead(ref a)); Assert.Equal(0, Thread.VolatileRead(ref a));
Assert.Equal(1, Thread.VolatileRead(ref b)); Assert.Equal(1, Thread.VolatileRead(ref b));
Assert.Equal(1, Thread.VolatileRead(ref c)); Assert.Equal(1, Thread.VolatileRead(ref c));
Assert.Equal(1, Thread.VolatileRead(ref d)); Assert.Equal(1, Thread.VolatileRead(ref d));
pub.Publish("*bcd", "efg"); pub.Publish("*bcd", "efg");
await Task.Delay(500).ForAwait(); await UntilCondition(TimeSpan.FromSeconds(10), () => Thread.VolatileRead(ref a) == 1);
Assert.Equal(1, Thread.VolatileRead(ref a)); Assert.Equal(1, Thread.VolatileRead(ref a));
//Assert.Equal(1, Thread.VolatileRead(ref b));
//Assert.Equal(1, Thread.VolatileRead(ref c));
//Assert.Equal(1, Thread.VolatileRead(ref d));
} }
} }
......
This diff is collapsed.
...@@ -38,16 +38,22 @@ protected TestBase(ITestOutputHelper output, SharedConnectionFixture fixture = n ...@@ -38,16 +38,22 @@ protected TestBase(ITestOutputHelper output, SharedConnectionFixture fixture = n
protected void LogNoTime(string message) => LogNoTime(Writer, message); protected void LogNoTime(string message) => LogNoTime(Writer, message);
internal static void LogNoTime(TextWriter output, string message) internal static void LogNoTime(TextWriter output, string message)
{ {
output.WriteLine(message); lock (output)
{
output.WriteLine(message);
}
if (TestConfig.Current.LogToConsole) if (TestConfig.Current.LogToConsole)
{ {
Console.WriteLine(message); Console.WriteLine(message);
} }
} }
protected void Log(string message) => Log(Writer, message); protected void Log(string message) => LogNoTime(Writer, message);
public static void Log(TextWriter output, string message) public static void Log(TextWriter output, string message)
{ {
output?.WriteLine(Time() + ": " + message); lock (output)
{
output?.WriteLine(Time() + ": " + message);
}
if (TestConfig.Current.LogToConsole) if (TestConfig.Current.LogToConsole)
{ {
Console.WriteLine(message); Console.WriteLine(message);
...@@ -55,7 +61,10 @@ public static void Log(TextWriter output, string message) ...@@ -55,7 +61,10 @@ public static void Log(TextWriter output, string message)
} }
protected void Log(string message, params object[] args) protected void Log(string message, params object[] args)
{ {
Output.WriteLine(Time() + ": " + message, args); lock (Output)
{
Output.WriteLine(Time() + ": " + message, args);
}
if (TestConfig.Current.LogToConsole) if (TestConfig.Current.LogToConsole)
{ {
Console.WriteLine(message, args); Console.WriteLine(message, args);
...@@ -409,13 +418,15 @@ void callback() ...@@ -409,13 +418,15 @@ void callback()
return watch.Elapsed; return watch.Elapsed;
} }
protected async Task UntilCondition(int maxMilliseconds, Func<bool> predicate, int perLoop = 100) private static readonly TimeSpan DefaultWaitPerLoop = TimeSpan.FromMilliseconds(50);
protected async Task UntilCondition(TimeSpan maxWaitTime, Func<bool> predicate, TimeSpan? waitPerLoop = null)
{ {
var spent = 0; TimeSpan spent = TimeSpan.Zero;
while (spent < maxMilliseconds && !predicate()) while (spent < maxWaitTime && !predicate())
{ {
await Task.Delay(perLoop).ForAwait(); var wait = waitPerLoop ?? DefaultWaitPerLoop;
spent += perLoop; await Task.Delay(wait).ForAwait();
spent += wait;
} }
} }
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment