Commit 31eedc47 authored by Nick Craver's avatar Nick Craver

Tests: key fixes and cleanup

Also parallelizes most Config test for quicker runs - moved the unfriendly ones to Failover.
parent 6de5908a
...@@ -9,41 +9,44 @@ namespace StackExchange.Redis.Tests.Booksleeve.Issues ...@@ -9,41 +9,44 @@ namespace StackExchange.Redis.Tests.Booksleeve.Issues
{ {
public class Massive_Delete : BookSleeveTestBase public class Massive_Delete : BookSleeveTestBase
{ {
public Massive_Delete(ITestOutputHelper output) : base(output) public Massive_Delete(ITestOutputHelper output) : base(output) { }
private void Prep(int db, string key)
{ {
var prefix = Me();
using (var muxer = GetUnsecuredConnection(allowAdmin: true)) using (var muxer = GetUnsecuredConnection(allowAdmin: true))
{ {
GetServer(muxer).FlushDatabase(db); GetServer(muxer).FlushDatabase(db);
Task last = null; Task last = null;
var conn = muxer.GetDatabase(db); var conn = muxer.GetDatabase(db);
for (int i = 0; i < 100000; i++) for (int i = 0; i < 10000; i++)
{ {
string key = "key" + i; string iKey = prefix + i;
conn.StringSetAsync(key, key); conn.StringSetAsync(iKey, iKey);
last = conn.SetAddAsync(todoKey, key); last = conn.SetAddAsync(key, iKey);
} }
conn.Wait(last); conn.Wait(last);
} }
} }
private const int db = 4; [FactLongRunning]
private const string todoKey = "todo";
[Fact]
public async Task ExecuteMassiveDelete() public async Task ExecuteMassiveDelete()
{ {
const int db = 4;
var key = Me();
Prep(db, key);
var watch = Stopwatch.StartNew(); var watch = Stopwatch.StartNew();
using (var muxer = GetUnsecuredConnection()) using (var muxer = GetUnsecuredConnection())
using (var throttle = new SemaphoreSlim(1)) using (var throttle = new SemaphoreSlim(1))
{ {
var conn = muxer.GetDatabase(db); var conn = muxer.GetDatabase(db);
var originally = await conn.SetLengthAsync(todoKey); var originally = await conn.SetLengthAsync(key).ForAwait();
int keepChecking = 1; int keepChecking = 1;
Task last = null; Task last = null;
while (Volatile.Read(ref keepChecking) == 1) while (Volatile.Read(ref keepChecking) == 1)
{ {
throttle.Wait(); // acquire throttle.Wait(); // acquire
var x = conn.SetPopAsync(todoKey).ContinueWith(task => var x = conn.SetPopAsync(key).ContinueWith(task =>
{ {
throttle.Release(); throttle.Release();
if (task.IsCompleted) if (task.IsCompleted)
...@@ -65,7 +68,7 @@ public async Task ExecuteMassiveDelete() ...@@ -65,7 +68,7 @@ public async Task ExecuteMassiveDelete()
await last; await last;
} }
watch.Stop(); watch.Stop();
long remaining = await conn.SetLengthAsync(todoKey); long remaining = await conn.SetLengthAsync(key).ForAwait();
Output.WriteLine("From {0} to {1}; {2}ms", originally, remaining, Output.WriteLine("From {0} to {1}; {2}ms", originally, remaining,
watch.ElapsedMilliseconds); watch.ElapsedMilliseconds);
......
...@@ -90,7 +90,7 @@ public async Task PubSubGetAllAnyOrder() ...@@ -90,7 +90,7 @@ public async Task PubSubGetAllAnyOrder()
var syncLock = new object(); var syncLock = new object();
var data = new HashSet<int>(); var data = new HashSet<int>();
await sub.SubscribeAsync(channel, (key, val) => await sub.SubscribeAsync(channel, (_, val) =>
{ {
bool pulse; bool pulse;
lock (data) lock (data)
...@@ -139,15 +139,15 @@ public async Task PubSubGetAllCorrectOrder() ...@@ -139,15 +139,15 @@ public async Task PubSubGetAllCorrectOrder()
var syncLock = new object(); var syncLock = new object();
var data = new List<int>(count); var data = new List<int>(count);
var subChannel = await sub.SubscribeAsync(channel); var subChannel = await sub.SubscribeAsync(channel).ForAwait();
await sub.PingAsync(); await sub.PingAsync().ForAwait();
async Task RunLoop() async Task RunLoop()
{ {
while (!subChannel.IsCompleted) while (!subChannel.IsCompleted)
{ {
var work = await subChannel.ReadAsync(); var work = await subChannel.ReadAsync().ForAwait();
int i = int.Parse(Encoding.UTF8.GetString(work.Message)); int i = int.Parse(Encoding.UTF8.GetString(work.Message));
lock (data) lock (data)
{ {
...@@ -185,9 +185,8 @@ async Task RunLoop() ...@@ -185,9 +185,8 @@ async Task RunLoop()
Assert.True(subChannel.IsCompleted); Assert.True(subChannel.IsCompleted);
await Assert.ThrowsAsync<ChannelClosedException>(async delegate await Assert.ThrowsAsync<ChannelClosedException>(async delegate
{ {
var final = await subChannel.ReadAsync(); var final = await subChannel.ReadAsync().ForAwait();
}); }).ForAwait();
} }
} }
...@@ -203,7 +202,7 @@ public async Task PubSubGetAllCorrectOrder_OnMessage_Sync() ...@@ -203,7 +202,7 @@ public async Task PubSubGetAllCorrectOrder_OnMessage_Sync()
var syncLock = new object(); var syncLock = new object();
var data = new List<int>(count); var data = new List<int>(count);
var subChannel = await sub.SubscribeAsync(channel); var subChannel = await sub.SubscribeAsync(channel).ForAwait();
subChannel.OnMessage(msg => subChannel.OnMessage(msg =>
{ {
int i = int.Parse(Encoding.UTF8.GetString(msg.Message)); int i = int.Parse(Encoding.UTF8.GetString(msg.Message));
...@@ -222,7 +221,7 @@ public async Task PubSubGetAllCorrectOrder_OnMessage_Sync() ...@@ -222,7 +221,7 @@ public async Task PubSubGetAllCorrectOrder_OnMessage_Sync()
} }
} }
}); });
await sub.PingAsync(); await sub.PingAsync().ForAwait();
lock (syncLock) lock (syncLock)
{ {
...@@ -246,9 +245,8 @@ public async Task PubSubGetAllCorrectOrder_OnMessage_Sync() ...@@ -246,9 +245,8 @@ public async Task PubSubGetAllCorrectOrder_OnMessage_Sync()
Assert.True(subChannel.IsCompleted); Assert.True(subChannel.IsCompleted);
await Assert.ThrowsAsync<ChannelClosedException>(async delegate await Assert.ThrowsAsync<ChannelClosedException>(async delegate
{ {
var final = await subChannel.ReadAsync(); var final = await subChannel.ReadAsync().ForAwait();
}); }).ForAwait();
} }
} }
...@@ -264,7 +262,7 @@ public async Task PubSubGetAllCorrectOrder_OnMessage_Async() ...@@ -264,7 +262,7 @@ public async Task PubSubGetAllCorrectOrder_OnMessage_Async()
var syncLock = new object(); var syncLock = new object();
var data = new List<int>(count); var data = new List<int>(count);
var subChannel = await sub.SubscribeAsync(channel); var subChannel = await sub.SubscribeAsync(channel).ForAwait();
subChannel.OnMessage(msg => subChannel.OnMessage(msg =>
{ {
int i = int.Parse(Encoding.UTF8.GetString(msg.Message)); int i = int.Parse(Encoding.UTF8.GetString(msg.Message));
...@@ -284,7 +282,7 @@ public async Task PubSubGetAllCorrectOrder_OnMessage_Async() ...@@ -284,7 +282,7 @@ public async Task PubSubGetAllCorrectOrder_OnMessage_Async()
} }
return i % 2 == 0 ? null : Task.CompletedTask; return i % 2 == 0 ? null : Task.CompletedTask;
}); });
await sub.PingAsync(); await sub.PingAsync().ForAwait();
lock (syncLock) lock (syncLock)
{ {
...@@ -308,9 +306,8 @@ public async Task PubSubGetAllCorrectOrder_OnMessage_Async() ...@@ -308,9 +306,8 @@ public async Task PubSubGetAllCorrectOrder_OnMessage_Async()
Assert.True(subChannel.IsCompleted); Assert.True(subChannel.IsCompleted);
await Assert.ThrowsAsync<ChannelClosedException>(async delegate await Assert.ThrowsAsync<ChannelClosedException>(async delegate
{ {
var final = await subChannel.ReadAsync(); var final = await subChannel.ReadAsync().ForAwait();
}); }).ForAwait();
} }
} }
......
...@@ -46,7 +46,7 @@ public async Task ConnectUsesSingleSocket() ...@@ -46,7 +46,7 @@ public async Task ConnectUsesSingleSocket()
{ {
using (var muxer = Create(failMessage: i + ": ", log: sw)) using (var muxer = Create(failMessage: i + ": ", log: sw))
{ {
await Task.Delay(500); await Task.Delay(500).ForAwait();
foreach (var ep in muxer.GetEndPoints()) foreach (var ep in muxer.GetEndPoints())
{ {
var srv = muxer.GetServer(ep); var srv = muxer.GetServer(ep);
......
using System; using System;
using System.Diagnostics;
using System.IO; using System.IO;
using System.Linq; using System.Linq;
using System.Threading; using System.Threading;
using System.Threading.Tasks;
using Xunit; using Xunit;
using Xunit.Abstractions; using Xunit.Abstractions;
namespace StackExchange.Redis.Tests namespace StackExchange.Redis.Tests
{ {
[Collection(NonParallelCollection.Name)]
public class Config : TestBase public class Config : TestBase
{ {
public Config(ITestOutputHelper output) : base (output) { } public Config(ITestOutputHelper output) : base (output) { }
...@@ -140,30 +137,6 @@ public void ReadConfig() ...@@ -140,30 +137,6 @@ public void ReadConfig()
} }
} }
[Fact]
public async System.Threading.Tasks.Task TestConfigureAsync()
{
using (var muxer = Create())
{
Thread.Sleep(1000);
Debug.WriteLine("About to reconfigure.....");
await muxer.ConfigureAsync().ForAwait();
Debug.WriteLine("Reconfigured");
}
}
[Fact]
public void TestConfigureSync()
{
using (var muxer = Create())
{
Thread.Sleep(1000);
Debug.WriteLine("About to reconfigure.....");
muxer.Configure();
Debug.WriteLine("Reconfigured");
}
}
[Fact] [Fact]
public void GetTime() public void GetTime()
{ {
......
...@@ -45,6 +45,30 @@ private static ConfigurationOptions GetMasterSlaveConfig() ...@@ -45,6 +45,30 @@ private static ConfigurationOptions GetMasterSlaveConfig()
}; };
} }
[Fact]
public async Task ConfigureAsync()
{
using (var muxer = Create())
{
Thread.Sleep(1000);
Output.WriteLine("About to reconfigure.....");
await muxer.ConfigureAsync().ForAwait();
Output.WriteLine("Reconfigured");
}
}
[Fact]
public void ConfigureSync()
{
using (var muxer = Create())
{
Thread.Sleep(1000);
Output.WriteLine("About to reconfigure.....");
muxer.Configure();
Output.WriteLine("Reconfigured");
}
}
[Fact] [Fact]
public async Task ConfigVerifyReceiveConfigChangeBroadcast() public async Task ConfigVerifyReceiveConfigChangeBroadcast()
{ {
......
...@@ -27,17 +27,17 @@ public async Task SetMembers() ...@@ -27,17 +27,17 @@ public async Task SetMembers()
var key = Me(); var key = Me();
const int count = (int)5e6; const int count = (int)5e6;
var len = await db.SetLengthAsync(key); var len = await db.SetLengthAsync(key).ForAwait();
if (len != count) if (len != count)
{ {
await db.KeyDeleteAsync(key); await db.KeyDeleteAsync(key).ForAwait();
foreach (var _ in Enumerable.Range(0, count)) foreach (var _ in Enumerable.Range(0, count))
db.SetAdd(key, Guid.NewGuid().ToByteArray(), CommandFlags.FireAndForget); db.SetAdd(key, Guid.NewGuid().ToByteArray(), CommandFlags.FireAndForget);
Assert.Equal(count, await db.SetLengthAsync(key)); // SCARD for set Assert.Equal(count, await db.SetLengthAsync(key).ForAwait()); // SCARD for set
} }
var result = await db.SetMembersAsync(key); var result = await db.SetMembersAsync(key).ForAwait();
Assert.Equal(count, result.Length); // SMEMBERS result length Assert.Equal(count, result.Length); // SMEMBERS result length
} }
} }
...@@ -55,25 +55,24 @@ public async Task SetUnion() ...@@ -55,25 +55,24 @@ public async Task SetUnion()
const int count = (int)5e6; const int count = (int)5e6;
var len1 = await db.SetLengthAsync(key1); var len1 = await db.SetLengthAsync(key1).ForAwait();
var len2 = await db.SetLengthAsync(key2); var len2 = await db.SetLengthAsync(key2).ForAwait();
await db.KeyDeleteAsync(dstkey); await db.KeyDeleteAsync(dstkey).ForAwait();
if (len1 != count || len2 != count) if (len1 != count || len2 != count)
{ {
await db.KeyDeleteAsync(key1); await db.KeyDeleteAsync(key1).ForAwait();
await db.KeyDeleteAsync(key2); await db.KeyDeleteAsync(key2).ForAwait();
foreach (var _ in Enumerable.Range(0, count)) foreach (var _ in Enumerable.Range(0, count))
{ {
db.SetAdd(key1, Guid.NewGuid().ToByteArray(), CommandFlags.FireAndForget); db.SetAdd(key1, Guid.NewGuid().ToByteArray(), CommandFlags.FireAndForget);
db.SetAdd(key2, Guid.NewGuid().ToByteArray(), CommandFlags.FireAndForget); db.SetAdd(key2, Guid.NewGuid().ToByteArray(), CommandFlags.FireAndForget);
} }
Assert.Equal(count, await db.SetLengthAsync(key1)); // SCARD for set 1 Assert.Equal(count, await db.SetLengthAsync(key1).ForAwait()); // SCARD for set 1
Assert.Equal(count, await db.SetLengthAsync(key2)); // SCARD for set 2 Assert.Equal(count, await db.SetLengthAsync(key2).ForAwait()); // SCARD for set 2
} }
await db.SetCombineAndStoreAsync(SetOperation.Union, dstkey, key1, key2); await db.SetCombineAndStoreAsync(SetOperation.Union, dstkey, key1, key2).ForAwait();
var dstLen = db.SetLength(dstkey); var dstLen = db.SetLength(dstkey);
Assert.Equal(count * 2, dstLen); // SCARD for destination set Assert.Equal(count * 2, dstLen); // SCARD for destination set
} }
......
...@@ -66,7 +66,7 @@ public void SentinelGetMasterAddressByNameNegativeTest() ...@@ -66,7 +66,7 @@ public void SentinelGetMasterAddressByNameNegativeTest()
[Fact] [Fact]
public async Task SentinelGetMasterAddressByNameAsyncNegativeTest() public async Task SentinelGetMasterAddressByNameAsyncNegativeTest()
{ {
var endpoint = await Server.SentinelGetMasterAddressByNameAsync("FakeServiceName"); var endpoint = await Server.SentinelGetMasterAddressByNameAsync("FakeServiceName").ForAwait();
Assert.Null(endpoint); Assert.Null(endpoint);
} }
......
...@@ -65,17 +65,17 @@ public void StreamAddMultipleValuePairsWithAutoId() ...@@ -65,17 +65,17 @@ public void StreamAddMultipleValuePairsWithAutoId()
Assert.True(entries.Length == 1); Assert.True(entries.Length == 1);
Assert.Equal(messageId, entries[0].Id); Assert.Equal(messageId, entries[0].Id);
Assert.True(entries[0].Values.Length == 2); Assert.True(entries[0].Values.Length == 2);
Assert.True(entries[0].Values[0].Name == "field1" && Assert.True(entries[0].Values[0].Name == "field1");
entries[0].Values[0].Value == "value1"); Assert.True(entries[0].Values[0].Value == "value1");
Assert.True(entries[0].Values[1].Name == "field2" && Assert.True(entries[0].Values[1].Name == "field2");
entries[0].Values[1].Value == "value2"); Assert.True(entries[0].Values[1].Value == "value2");
} }
} }
[Fact] [Fact]
public void StreamAddWithManualId() public void StreamAddWithManualId()
{ {
var id = "42-0"; const string id = "42-0";
var key = GetUniqueKey("manual_id"); var key = GetUniqueKey("manual_id");
using (var conn = Create()) using (var conn = Create())
...@@ -92,7 +92,7 @@ public void StreamAddWithManualId() ...@@ -92,7 +92,7 @@ public void StreamAddWithManualId()
[Fact] [Fact]
public void StreamAddMultipleValuePairsWithManualId() public void StreamAddMultipleValuePairsWithManualId()
{ {
var id = "42-0"; const string id = "42-0";
var key = GetUniqueKey("manual_id_multiple_values"); var key = GetUniqueKey("manual_id_multiple_values");
using (var conn = Create()) using (var conn = Create())
...@@ -121,7 +121,7 @@ public void StreamAddMultipleValuePairsWithManualId() ...@@ -121,7 +121,7 @@ public void StreamAddMultipleValuePairsWithManualId()
public void StreamConsumerGroupWithNoConsumers() public void StreamConsumerGroupWithNoConsumers()
{ {
var key = GetUniqueKey("group_with_no_consumers"); var key = GetUniqueKey("group_with_no_consumers");
var groupName = "test_group"; const string groupName = "test_group";
using (var conn = Create()) using (var conn = Create())
{ {
...@@ -146,7 +146,7 @@ public void StreamConsumerGroupWithNoConsumers() ...@@ -146,7 +146,7 @@ public void StreamConsumerGroupWithNoConsumers()
public void StreamCreateConsumerGroup() public void StreamCreateConsumerGroup()
{ {
var key = GetUniqueKey("group_create"); var key = GetUniqueKey("group_create");
var groupName = "test_group"; const string groupName = "test_group";
using (var conn = Create()) using (var conn = Create())
{ {
...@@ -168,7 +168,7 @@ public void StreamCreateConsumerGroup() ...@@ -168,7 +168,7 @@ public void StreamCreateConsumerGroup()
public void StreamConsumerGroupReadOnlyNewMessagesWithEmptyResponse() public void StreamConsumerGroupReadOnlyNewMessagesWithEmptyResponse()
{ {
var key = GetUniqueKey("group_read"); var key = GetUniqueKey("group_read");
var groupName = "test_group"; const string groupName = "test_group";
using (var conn = Create()) using (var conn = Create())
{ {
...@@ -194,7 +194,7 @@ public void StreamConsumerGroupReadOnlyNewMessagesWithEmptyResponse() ...@@ -194,7 +194,7 @@ public void StreamConsumerGroupReadOnlyNewMessagesWithEmptyResponse()
public void StreamConsumerGroupReadFromStreamBeginning() public void StreamConsumerGroupReadFromStreamBeginning()
{ {
var key = GetUniqueKey("group_read_beginning"); var key = GetUniqueKey("group_read_beginning");
var groupName = "test_group"; const string groupName = "test_group";
using (var conn = Create()) using (var conn = Create())
{ {
...@@ -219,7 +219,7 @@ public void StreamConsumerGroupReadFromStreamBeginning() ...@@ -219,7 +219,7 @@ public void StreamConsumerGroupReadFromStreamBeginning()
public void StreamConsumerGroupReadFromStreamBeginningWithCount() public void StreamConsumerGroupReadFromStreamBeginningWithCount()
{ {
var key = GetUniqueKey("group_read_with_count"); var key = GetUniqueKey("group_read_with_count");
var groupName = "test_group"; const string groupName = "test_group";
using (var conn = Create()) using (var conn = Create())
{ {
...@@ -248,8 +248,8 @@ public void StreamConsumerGroupReadFromStreamBeginningWithCount() ...@@ -248,8 +248,8 @@ public void StreamConsumerGroupReadFromStreamBeginningWithCount()
public void StreamConsumerGroupAcknowledgeMessage() public void StreamConsumerGroupAcknowledgeMessage()
{ {
var key = GetUniqueKey("group_ack"); var key = GetUniqueKey("group_ack");
var groupName = "test_group"; const string groupName = "test_group";
var consumer = "test_consumer"; const string consumer = "test_consumer";
using (var conn = Create()) using (var conn = Create())
{ {
...@@ -290,9 +290,9 @@ public void StreamConsumerGroupAcknowledgeMessage() ...@@ -290,9 +290,9 @@ public void StreamConsumerGroupAcknowledgeMessage()
public void StreamConsumerGroupClaimMessages() public void StreamConsumerGroupClaimMessages()
{ {
var key = GetUniqueKey("group_claim"); var key = GetUniqueKey("group_claim");
var groupName = "test_group"; const string groupName = "test_group";
var consumer1 = "test_consumer_1"; const string consumer1 = "test_consumer_1";
var consumer2 = "test_consumer_2"; const string consumer2 = "test_consumer_2";
using (var conn = Create()) using (var conn = Create())
{ {
...@@ -341,9 +341,9 @@ public void StreamConsumerGroupClaimMessages() ...@@ -341,9 +341,9 @@ public void StreamConsumerGroupClaimMessages()
public void StreamConsumerGroupClaimMessagesReturningIds() public void StreamConsumerGroupClaimMessagesReturningIds()
{ {
var key = GetUniqueKey("group_claim_view_ids"); var key = GetUniqueKey("group_claim_view_ids");
var groupName = "test_group"; const string groupName = "test_group";
var consumer1 = "test_consumer_1"; const string consumer1 = "test_consumer_1";
var consumer2 = "test_consumer_2"; const string consumer2 = "test_consumer_2";
using (var conn = Create()) using (var conn = Create())
{ {
...@@ -384,14 +384,13 @@ public void StreamConsumerGroupClaimMessagesReturningIds() ...@@ -384,14 +384,13 @@ public void StreamConsumerGroupClaimMessagesReturningIds()
Assert.Equal(id3, messageIds[1]); Assert.Equal(id3, messageIds[1]);
Assert.Equal(id4, messageIds[2]); Assert.Equal(id4, messageIds[2]);
} }
} }
[Fact] [Fact]
public void StreamConsumerGroupViewPendingInfoNoConsumers() public void StreamConsumerGroupViewPendingInfoNoConsumers()
{ {
var key = GetUniqueKey("group_pending_info_no_consumers"); var key = GetUniqueKey("group_pending_info_no_consumers");
var groupName = "test_group"; const string groupName = "test_group";
using (var conn = Create()) using (var conn = Create())
{ {
...@@ -417,7 +416,7 @@ public void StreamConsumerGroupViewPendingInfoNoConsumers() ...@@ -417,7 +416,7 @@ public void StreamConsumerGroupViewPendingInfoNoConsumers()
public void StreamConsumerGroupViewPendingInfoWhenNothingPending() public void StreamConsumerGroupViewPendingInfoWhenNothingPending()
{ {
var key = GetUniqueKey("group_pending_info_nothing_pending"); var key = GetUniqueKey("group_pending_info_nothing_pending");
var groupName = "test_group"; const string groupName = "test_group";
using (var conn = Create()) using (var conn = Create())
{ {
...@@ -443,9 +442,9 @@ public void StreamConsumerGroupViewPendingInfoWhenNothingPending() ...@@ -443,9 +442,9 @@ public void StreamConsumerGroupViewPendingInfoWhenNothingPending()
public void StreamConsumerGroupViewPendingInfoSummary() public void StreamConsumerGroupViewPendingInfoSummary()
{ {
var key = GetUniqueKey("group_pending_info"); var key = GetUniqueKey("group_pending_info");
var groupName = "test_group"; const string groupName = "test_group";
var consumer1 = "test_consumer_1"; const string consumer1 = "test_consumer_1";
var consumer2 = "test_consumer_2"; const string consumer2 = "test_consumer_2";
using (var conn = Create()) using (var conn = Create())
{ {
...@@ -485,9 +484,9 @@ public void StreamConsumerGroupViewPendingInfoSummary() ...@@ -485,9 +484,9 @@ public void StreamConsumerGroupViewPendingInfoSummary()
public async Task StreamConsumerGroupViewPendingMessageInfo() public async Task StreamConsumerGroupViewPendingMessageInfo()
{ {
var key = GetUniqueKey("group_pending_messages"); var key = GetUniqueKey("group_pending_messages");
var groupName = "test_group"; const string groupName = "test_group";
var consumer1 = "test_consumer_1"; const string consumer1 = "test_consumer_1";
var consumer2 = "test_consumer_2"; const string consumer2 = "test_consumer_2";
using (var conn = Create()) using (var conn = Create())
{ {
...@@ -508,7 +507,7 @@ public async Task StreamConsumerGroupViewPendingMessageInfo() ...@@ -508,7 +507,7 @@ public async Task StreamConsumerGroupViewPendingMessageInfo()
// Read the remaining messages into the second consumer. // Read the remaining messages into the second consumer.
var consumer2Messages = db.StreamReadGroup(key, groupName, consumer2); var consumer2Messages = db.StreamReadGroup(key, groupName, consumer2);
await Task.Delay(10); await Task.Delay(10).ForAwait();
// Get the pending info about the messages themselves. // Get the pending info about the messages themselves.
var pendingMessageInfoList = db.StreamPendingMessages(key, groupName, 10, RedisValue.Null); var pendingMessageInfoList = db.StreamPendingMessages(key, groupName, 10, RedisValue.Null);
...@@ -526,9 +525,9 @@ public async Task StreamConsumerGroupViewPendingMessageInfo() ...@@ -526,9 +525,9 @@ public async Task StreamConsumerGroupViewPendingMessageInfo()
public void StreamConsumerGroupViewPendingMessageInfoForConsumer() public void StreamConsumerGroupViewPendingMessageInfoForConsumer()
{ {
var key = GetUniqueKey("group_pending_for_consumer"); var key = GetUniqueKey("group_pending_for_consumer");
var groupName = "test_group"; const string groupName = "test_group";
var consumer1 = "test_consumer_1"; const string consumer1 = "test_consumer_1";
var consumer2 = "test_consumer_2"; const string consumer2 = "test_consumer_2";
using (var conn = Create()) using (var conn = Create())
{ {
...@@ -612,10 +611,10 @@ public void StreamDeleteMessages() ...@@ -612,10 +611,10 @@ public void StreamDeleteMessages()
public void StreamGroupInfoGet() public void StreamGroupInfoGet()
{ {
var key = GetUniqueKey("group_info"); var key = GetUniqueKey("group_info");
var group1 = "test_group_1"; const string group1 = "test_group_1";
var group2 = "test_group_2"; const string group2 = "test_group_2";
var consumer1 = "test_consumer_1"; const string consumer1 = "test_consumer_1";
var consumer2 = "test_consumer_2"; const string consumer2 = "test_consumer_2";
using (var conn = Create()) using (var conn = Create())
{ {
...@@ -654,9 +653,9 @@ public void StreamGroupInfoGet() ...@@ -654,9 +653,9 @@ public void StreamGroupInfoGet()
public void StreamGroupConsumerInfoGet() public void StreamGroupConsumerInfoGet()
{ {
var key = GetUniqueKey("group_consumer_info"); var key = GetUniqueKey("group_consumer_info");
var group = "test_group"; const string group = "test_group";
var consumer1 = "test_consumer_1"; const string consumer1 = "test_consumer_1";
var consumer2 = "test_consumer_2"; const string consumer2 = "test_consumer_2";
using (var conn = Create()) using (var conn = Create())
{ {
...@@ -763,7 +762,7 @@ public void StreamNoConsumerGroups() ...@@ -763,7 +762,7 @@ public void StreamNoConsumerGroups()
public void StreamPendingNoMessagesOrConsumers() public void StreamPendingNoMessagesOrConsumers()
{ {
var key = GetUniqueKey("stream_pending_empty"); var key = GetUniqueKey("stream_pending_empty");
var groupName = "test_group"; const string groupName = "test_group";
using (var conn = Create()) using (var conn = Create())
{ {
...@@ -885,7 +884,6 @@ public void StreamReadExpectedExceptionInvalidCountMultipleStream() ...@@ -885,7 +884,6 @@ public void StreamReadExpectedExceptionInvalidCountMultipleStream()
new StreamIdPair("key2", "0-0") new StreamIdPair("key2", "0-0")
}; };
var db = conn.GetDatabase(); var db = conn.GetDatabase();
Assert.Throws<ArgumentOutOfRangeException>(() => db.StreamRead(streamPairs, 0)); Assert.Throws<ArgumentOutOfRangeException>(() => db.StreamRead(streamPairs, 0));
} }
......
...@@ -26,5 +26,6 @@ public static Task<T> ObserveErrors<T>(this Task<T> task) ...@@ -26,5 +26,6 @@ public static Task<T> ObserveErrors<T>(this Task<T> task)
public static ConfiguredTaskAwaitable ForAwait(this Task task) => task.ConfigureAwait(false); public static ConfiguredTaskAwaitable ForAwait(this Task task) => task.ConfigureAwait(false);
public static ConfiguredTaskAwaitable<T> ForAwait<T>(this Task<T> task) => task.ConfigureAwait(false); public static ConfiguredTaskAwaitable<T> ForAwait<T>(this Task<T> task) => task.ConfigureAwait(false);
public static ConfiguredValueTaskAwaitable<T> ForAwait<T>(this ValueTask<T> task) => task.ConfigureAwait(false);
} }
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment