Commit 22c4c1ce authored by Nick Craver's avatar Nick Craver

General cleanup

parent 53c43581
...@@ -25,7 +25,7 @@ public async Task ParallelTransactionsWithConditions() ...@@ -25,7 +25,7 @@ public async Task ParallelTransactionsWithConditions()
RedisKey hits = Me(), trigger = Me() + "3"; RedisKey hits = Me(), trigger = Me() + "3";
int expectedSuccess = 0; int expectedSuccess = 0;
await muxers[0].GetDatabase().KeyDeleteAsync(new[] { hits, trigger }); await muxers[0].GetDatabase().KeyDeleteAsync(new[] { hits, trigger }).ForAwait();
Task[] tasks = new Task[Workers]; Task[] tasks = new Task[Workers];
for (int i = 0; i < tasks.Length; i++) for (int i = 0; i < tasks.Length; i++)
...@@ -36,12 +36,12 @@ public async Task ParallelTransactionsWithConditions() ...@@ -36,12 +36,12 @@ public async Task ParallelTransactionsWithConditions()
{ {
for (int j = 0; j < PerThread; j++) for (int j = 0; j < PerThread; j++)
{ {
var oldVal = await scopedDb.StringGetAsync(trigger); var oldVal = await scopedDb.StringGetAsync(trigger).ForAwait();
var tran = scopedDb.CreateTransaction(); var tran = scopedDb.CreateTransaction();
tran.AddCondition(Condition.StringEqual(trigger, oldVal)); tran.AddCondition(Condition.StringEqual(trigger, oldVal));
var x = tran.StringIncrementAsync(trigger); var x = tran.StringIncrementAsync(trigger);
var y = tran.StringIncrementAsync(hits); var y = tran.StringIncrementAsync(hits);
if (await tran.ExecuteAsync()) if (await tran.ExecuteAsync().ForAwait())
{ {
Interlocked.Increment(ref expectedSuccess); Interlocked.Increment(ref expectedSuccess);
await x; await x;
...@@ -49,8 +49,8 @@ public async Task ParallelTransactionsWithConditions() ...@@ -49,8 +49,8 @@ public async Task ParallelTransactionsWithConditions()
} }
else else
{ {
await Assert.ThrowsAsync<TaskCanceledException>(() => x); await Assert.ThrowsAsync<TaskCanceledException>(() => x).ForAwait();
await Assert.ThrowsAsync<TaskCanceledException>(() => y); await Assert.ThrowsAsync<TaskCanceledException>(() => y).ForAwait();
} }
} }
}); });
...@@ -59,7 +59,7 @@ public async Task ParallelTransactionsWithConditions() ...@@ -59,7 +59,7 @@ public async Task ParallelTransactionsWithConditions()
{ {
await tasks[i]; await tasks[i];
} }
var actual = (int)await muxers[0].GetDatabase().StringGetAsync(hits); var actual = (int)await muxers[0].GetDatabase().StringGetAsync(hits).ForAwait();
Assert.Equal(expectedSuccess, actual); Assert.Equal(expectedSuccess, actual);
Writer.WriteLine($"success: {actual} out of {Workers * PerThread} attempts"); Writer.WriteLine($"success: {actual} out of {Workers * PerThread} attempts");
} }
...@@ -81,10 +81,14 @@ public void RunCompetingBatchesOnSameMuxer() ...@@ -81,10 +81,14 @@ public void RunCompetingBatchesOnSameMuxer()
{ {
var db = muxer.GetDatabase(); var db = muxer.GetDatabase();
Thread x = new Thread(state => BatchRunPings((IDatabase)state)); Thread x = new Thread(state => BatchRunPings((IDatabase)state))
x.Name = nameof(BatchRunPings); {
Thread y = new Thread(state => BatchRunIntegers((IDatabase)state)); Name = nameof(BatchRunPings)
y.Name = nameof(BatchRunIntegers); };
Thread y = new Thread(state => BatchRunIntegers((IDatabase)state))
{
Name = nameof(BatchRunIntegers)
};
x.Start(db); x.Start(db);
y.Start(db); y.Start(db);
...@@ -151,8 +155,8 @@ public async Task RunCompetingBatchesOnSameMuxerAsync() ...@@ -151,8 +155,8 @@ public async Task RunCompetingBatchesOnSameMuxerAsync()
private async Task BatchRunIntegersAsync(IDatabase db) private async Task BatchRunIntegersAsync(IDatabase db)
{ {
var key = Me(); var key = Me();
await db.KeyDeleteAsync(key); await db.KeyDeleteAsync(key).ForAwait();
await db.StringSetAsync(key, 1); await db.StringSetAsync(key, 1).ForAwait();
Task[] tasks = new Task[InnerCount]; Task[] tasks = new Task[InnerCount];
for (int i = 0; i < IterationCount; i++) for (int i = 0; i < IterationCount; i++)
{ {
...@@ -168,7 +172,7 @@ private async Task BatchRunIntegersAsync(IDatabase db) ...@@ -168,7 +172,7 @@ private async Task BatchRunIntegersAsync(IDatabase db)
} }
} }
var count = (long)await db.StringGetAsync(key); var count = (long)await db.StringGetAsync(key).ForAwait();
Writer.WriteLine($"tally: {count}"); Writer.WriteLine($"tally: {count}");
} }
...@@ -197,10 +201,14 @@ public void RunCompetingTransactionsOnSameMuxer() ...@@ -197,10 +201,14 @@ public void RunCompetingTransactionsOnSameMuxer()
{ {
var db = muxer.GetDatabase(); var db = muxer.GetDatabase();
Thread x = new Thread(state => TranRunPings((IDatabase)state)); Thread x = new Thread(state => TranRunPings((IDatabase)state))
x.Name = nameof(BatchRunPings); {
Thread y = new Thread(state => TranRunIntegers((IDatabase)state)); Name = nameof(BatchRunPings)
y.Name = nameof(BatchRunIntegers); };
Thread y = new Thread(state => TranRunIntegers((IDatabase)state))
{
Name = nameof(BatchRunIntegers)
};
x.Start(db); x.Start(db);
y.Start(db); y.Start(db);
...@@ -271,8 +279,8 @@ public async Task RunCompetingTransactionsOnSameMuxerAsync() ...@@ -271,8 +279,8 @@ public async Task RunCompetingTransactionsOnSameMuxerAsync()
private async Task TranRunIntegersAsync(IDatabase db) private async Task TranRunIntegersAsync(IDatabase db)
{ {
var key = Me(); var key = Me();
await db.KeyDeleteAsync(key); await db.KeyDeleteAsync(key).ForAwait();
await db.StringSetAsync(key, 1); await db.StringSetAsync(key, 1).ForAwait();
Task[] tasks = new Task[InnerCount]; Task[] tasks = new Task[InnerCount];
for (int i = 0; i < IterationCount; i++) for (int i = 0; i < IterationCount; i++)
{ {
...@@ -282,15 +290,14 @@ private async Task TranRunIntegersAsync(IDatabase db) ...@@ -282,15 +290,14 @@ private async Task TranRunIntegersAsync(IDatabase db)
{ {
tasks[j] = batch.StringIncrementAsync(key); tasks[j] = batch.StringIncrementAsync(key);
} }
await batch.ExecuteAsync(); await batch.ExecuteAsync().ForAwait();
for (int j = tasks.Length - 1; j >= 0; j--) for (int j = tasks.Length - 1; j >= 0; j--)
{ {
await tasks[j]; await tasks[j];
} }
} }
var count = (long)await db.StringGetAsync(key).ForAwait();
var count = (long)await db.StringGetAsync(key);
Writer.WriteLine($"tally: {count}"); Writer.WriteLine($"tally: {count}");
} }
...@@ -307,7 +314,7 @@ private async Task TranRunPingsAsync(IDatabase db) ...@@ -307,7 +314,7 @@ private async Task TranRunPingsAsync(IDatabase db)
{ {
tasks[j] = batch.PingAsync(); tasks[j] = batch.PingAsync();
} }
await batch.ExecuteAsync(); await batch.ExecuteAsync().ForAwait();
for (int j = tasks.Length - 1; j >= 0; j--) for (int j = tasks.Length - 1; j >= 0; j--)
{ {
await tasks[j]; await tasks[j];
......
...@@ -28,6 +28,7 @@ public static IEnumerable<object[]> GetTestData() ...@@ -28,6 +28,7 @@ public static IEnumerable<object[]> GetTestData()
yield return new object[] { "$4\r\nPING\r\n$4\r\nPONG\r\n$4\r\nPONG\r\n", 3 }; yield return new object[] { "$4\r\nPING\r\n$4\r\nPONG\r\n$4\r\nPONG\r\n", 3 };
yield return new object[] { "$4\r\nPING\r\n$4\r\nPONG\r\n$4\r\nPONG\r\n$", 3 }; yield return new object[] { "$4\r\nPING\r\n$4\r\nPONG\r\n$4\r\nPONG\r\n$", 3 };
} }
[Theory] [Theory]
[MemberData(nameof(GetTestData))] [MemberData(nameof(GetTestData))]
public void ParseAsSingleChunk(string ascii, int expected) public void ParseAsSingleChunk(string ascii, int expected)
...@@ -36,7 +37,6 @@ public void ParseAsSingleChunk(string ascii, int expected) ...@@ -36,7 +37,6 @@ public void ParseAsSingleChunk(string ascii, int expected)
ProcessMessages(buffer, expected); ProcessMessages(buffer, expected);
} }
[Theory] [Theory]
[MemberData(nameof(GetTestData))] [MemberData(nameof(GetTestData))]
public void ParseAsLotsOfChunks(string ascii, int expected) public void ParseAsLotsOfChunks(string ascii, int expected)
...@@ -59,10 +59,9 @@ public void ParseAsLotsOfChunks(string ascii, int expected) ...@@ -59,10 +59,9 @@ public void ParseAsLotsOfChunks(string ascii, int expected)
var buffer = new ReadOnlySequence<byte>(chain, 0, tail, 1); var buffer = new ReadOnlySequence<byte>(chain, 0, tail, 1);
Assert.Equal(bytes.Length, buffer.Length); Assert.Equal(bytes.Length, buffer.Length);
ProcessMessages(buffer, expected); ProcessMessages(buffer, expected);
} }
void ProcessMessages(ReadOnlySequence<byte> buffer, int expected)
private void ProcessMessages(ReadOnlySequence<byte> buffer, int expected)
{ {
Writer.WriteLine($"chain: {buffer.Length}"); Writer.WriteLine($"chain: {buffer.Length}");
var reader = new BufferReader(buffer); var reader = new BufferReader(buffer);
...@@ -76,8 +75,7 @@ void ProcessMessages(ReadOnlySequence<byte> buffer, int expected) ...@@ -76,8 +75,7 @@ void ProcessMessages(ReadOnlySequence<byte> buffer, int expected)
Assert.Equal(expected, found); Assert.Equal(expected, found);
} }
private class FragmentedSegment<T> : ReadOnlySequenceSegment<T>
class FragmentedSegment<T> : ReadOnlySequenceSegment<T>
{ {
public FragmentedSegment(long runningIndex, ReadOnlyMemory<T> memory) public FragmentedSegment(long runningIndex, ReadOnlyMemory<T> memory)
{ {
......
...@@ -121,8 +121,8 @@ public void StreamAddMultipleValuePairsWithManualId() ...@@ -121,8 +121,8 @@ public void StreamAddMultipleValuePairsWithManualId()
public void StreamConsumerGroupSetId() public void StreamConsumerGroupSetId()
{ {
var key = GetUniqueKey("group_set_id"); var key = GetUniqueKey("group_set_id");
var groupName = "test_group"; const string groupName = "test_group";
var consumer = "consumer"; const string consumer = "consumer";
using (var conn = Create()) using (var conn = Create())
{ {
...@@ -269,7 +269,7 @@ public void StreamConsumerGroupReadFromStreamBeginningWithCount() ...@@ -269,7 +269,7 @@ public void StreamConsumerGroupReadFromStreamBeginningWithCount()
// Start reading after id1. // Start reading after id1.
db.StreamCreateConsumerGroup(key, groupName, id1); db.StreamCreateConsumerGroup(key, groupName, id1);
var entries = db.StreamReadGroup(key, groupName, "test_consumer", StreamPosition.NewMessages, 2); var entries = db.StreamReadGroup(key, groupName, "test_consumer", StreamPosition.NewMessages, 2);
// Ensure we only received the requested count and that the IDs match the expected values. // Ensure we only received the requested count and that the IDs match the expected values.
...@@ -430,7 +430,7 @@ public void StreamConsumerGroupReadMultipleOneReadBeginningOneReadNew() ...@@ -430,7 +430,7 @@ public void StreamConsumerGroupReadMultipleOneReadBeginningOneReadNew()
// Ask redis to read from the beginning of both stream, expect messages // Ask redis to read from the beginning of both stream, expect messages
// for only the stream set to read from the beginning. // for only the stream set to read from the beginning.
var groupName = "test_group"; const string groupName = "test_group";
var stream1 = GetUniqueKey("stream1"); var stream1 = GetUniqueKey("stream1");
var stream2 = GetUniqueKey("stream2"); var stream2 = GetUniqueKey("stream2");
...@@ -472,7 +472,7 @@ public void StreamConsumerGroupReadMultipleOneReadBeginningOneReadNew() ...@@ -472,7 +472,7 @@ public void StreamConsumerGroupReadMultipleOneReadBeginningOneReadNew()
[Fact] [Fact]
public void StreamConsumerGroupReadMultipleOnlyNewMessagesExpectNoResult() public void StreamConsumerGroupReadMultipleOnlyNewMessagesExpectNoResult()
{ {
var groupName = "test_group"; const string groupName = "test_group";
var stream1 = GetUniqueKey("stream1"); var stream1 = GetUniqueKey("stream1");
var stream2 = GetUniqueKey("stream2"); var stream2 = GetUniqueKey("stream2");
...@@ -484,7 +484,7 @@ public void StreamConsumerGroupReadMultipleOnlyNewMessagesExpectNoResult() ...@@ -484,7 +484,7 @@ public void StreamConsumerGroupReadMultipleOnlyNewMessagesExpectNoResult()
db.StreamAdd(stream1, "field1-1", "value1-1"); db.StreamAdd(stream1, "field1-1", "value1-1");
db.StreamAdd(stream2, "field2-1", "value2-1"); db.StreamAdd(stream2, "field2-1", "value2-1");
// set both streams to read only new messages (default behavior). // set both streams to read only new messages (default behavior).
db.StreamCreateConsumerGroup(stream1, groupName); db.StreamCreateConsumerGroup(stream1, groupName);
db.StreamCreateConsumerGroup(stream2, groupName); db.StreamCreateConsumerGroup(stream2, groupName);
...@@ -508,7 +508,7 @@ public void StreamConsumerGroupReadMultipleOnlyNewMessagesExpectNoResult() ...@@ -508,7 +508,7 @@ public void StreamConsumerGroupReadMultipleOnlyNewMessagesExpectNoResult()
[Fact] [Fact]
public void StreamConsumerGroupReadMultipleOnlyNewMessagesExpect1Result() public void StreamConsumerGroupReadMultipleOnlyNewMessagesExpect1Result()
{ {
var groupName = "test_group"; const string groupName = "test_group";
var stream1 = GetUniqueKey("stream1"); var stream1 = GetUniqueKey("stream1");
var stream2 = GetUniqueKey("stream2"); var stream2 = GetUniqueKey("stream2");
...@@ -551,7 +551,7 @@ public void StreamConsumerGroupReadMultipleOnlyNewMessagesExpect1Result() ...@@ -551,7 +551,7 @@ public void StreamConsumerGroupReadMultipleOnlyNewMessagesExpect1Result()
[Fact] [Fact]
public void StreamConsumerGroupReadMultipleRestrictCount() public void StreamConsumerGroupReadMultipleRestrictCount()
{ {
var groupName = "test_group"; const string groupName = "test_group";
var stream1 = GetUniqueKey("stream1"); var stream1 = GetUniqueKey("stream1");
var stream2 = GetUniqueKey("stream2"); var stream2 = GetUniqueKey("stream2");
...@@ -767,8 +767,8 @@ public void StreamConsumerGroupViewPendingMessageInfoForConsumer() ...@@ -767,8 +767,8 @@ public void StreamConsumerGroupViewPendingMessageInfoForConsumer()
public void StreamDeleteConsumer() public void StreamDeleteConsumer()
{ {
var key = GetUniqueKey("delete_consumer_group"); var key = GetUniqueKey("delete_consumer_group");
var groupName = "test_group"; const string groupName = "test_group";
var consumer = "test_consumer"; const string consumer = "test_consumer";
using (var conn = Create()) using (var conn = Create())
{ {
...@@ -802,8 +802,8 @@ public void StreamDeleteConsumer() ...@@ -802,8 +802,8 @@ public void StreamDeleteConsumer()
public void StreamDeleteConsumerGroup() public void StreamDeleteConsumerGroup()
{ {
var key = GetUniqueKey("delete_consumer_group"); var key = GetUniqueKey("delete_consumer_group");
var groupName = "test_group"; const string groupName = "test_group";
var consumer = "test_consumer"; const string consumer = "test_consumer";
using (var conn = Create()) using (var conn = Create())
{ {
...@@ -1056,7 +1056,7 @@ public void StreamPendingNoMessagesOrConsumers() ...@@ -1056,7 +1056,7 @@ public void StreamPendingNoMessagesOrConsumers()
Assert.True(pendingInfo.Consumers.Length == 0); Assert.True(pendingInfo.Consumers.Length == 0);
} }
} }
[Fact] [Fact]
public void StreamPositionDefaultValueIsBeginning() public void StreamPositionDefaultValueIsBeginning()
{ {
...@@ -1078,8 +1078,8 @@ public void StreamPositionValidateBeginning() ...@@ -1078,8 +1078,8 @@ public void StreamPositionValidateBeginning()
[Fact] [Fact]
public void StreamPositionValidateExplicit() public void StreamPositionValidateExplicit()
{ {
var explicitValue = "1-0"; const string explicitValue = "1-0";
var position = explicitValue; const string position = explicitValue;
Assert.Equal(explicitValue, StreamPosition.Resolve(position, RedisCommand.XREAD)); Assert.Equal(explicitValue, StreamPosition.Resolve(position, RedisCommand.XREAD));
} }
...@@ -1507,7 +1507,7 @@ public void StreamReadRangeReverseWithCount() ...@@ -1507,7 +1507,7 @@ public void StreamReadRangeReverseWithCount()
var id1 = db.StreamAdd(key, "field1", "value1"); var id1 = db.StreamAdd(key, "field1", "value1");
var id2 = db.StreamAdd(key, "fiedl2", "value2"); var id2 = db.StreamAdd(key, "fiedl2", "value2");
var entries = db.StreamRange(key, id1, id2, 1, Order.Descending); var entries = db.StreamRange(key, id1, id2, 1, Order.Descending);
Assert.True(entries.Length == 1); Assert.True(entries.Length == 1);
......
...@@ -299,7 +299,6 @@ protected IServer GetAnyMaster(ConnectionMultiplexer muxer) ...@@ -299,7 +299,6 @@ protected IServer GetAnyMaster(ConnectionMultiplexer muxer)
muxer.Resurrecting += (e, t) => Writer.WriteLine($"Resurrecting {Format.ToString(e)} as {t}"); muxer.Resurrecting += (e, t) => Writer.WriteLine($"Resurrecting {Format.ToString(e)} as {t}");
muxer.Closing += complete => muxer.Closing += complete =>
{ {
int count; int count;
lock(ActiveMultiplexers) lock(ActiveMultiplexers)
{ {
...@@ -310,7 +309,6 @@ protected IServer GetAnyMaster(ConnectionMultiplexer muxer) ...@@ -310,7 +309,6 @@ protected IServer GetAnyMaster(ConnectionMultiplexer muxer)
} }
} }
Writer.WriteLine((complete ? "Closed (" : "Closing... (") + count.ToString() + " remaining)"); Writer.WriteLine((complete ? "Closed (" : "Closing... (") + count.ToString() + " remaining)");
}; };
return muxer; return muxer;
} }
......
...@@ -33,11 +33,12 @@ public interface IProfiledCommand ...@@ -33,11 +33,12 @@ public interface IProfiledCommand
CommandFlags Flags { get; } CommandFlags Flags { get; }
/// <summary> /// <summary>
/// <para>
/// When this command was *created*, will be approximately /// When this command was *created*, will be approximately
/// when the paired method of StackExchange.Redis was called but /// when the paired method of StackExchange.Redis was called but
/// before that method returned. /// before that method returned.
/// /// </para>
/// Note that the resolution of the returned DateTime is limited by DateTime.UtcNow. /// <para>Note that the resolution of the returned DateTime is limited by DateTime.UtcNow.</para>
/// </summary> /// </summary>
DateTime CommandCreated { get; } DateTime CommandCreated { get; }
...@@ -67,17 +68,17 @@ public interface IProfiledCommand ...@@ -67,17 +68,17 @@ public interface IProfiledCommand
TimeSpan ResponseToCompletion { get; } TimeSpan ResponseToCompletion { get; }
/// <summary> /// <summary>
/// How long it took this redis command to be processed, from creation to deserializing the final response. /// <para>How long it took this redis command to be processed, from creation to deserializing the final response.</para>
/// /// <para>Note that this TimeSpan *does not* include time spent awaiting a Task in consumer code.</para>
/// Note that this TimeSpan *does not* include time spent awaiting a Task in consumer code.
/// </summary> /// </summary>
TimeSpan ElapsedTime { get; } TimeSpan ElapsedTime { get; }
/// <summary> /// <summary>
/// <para>
/// If a command has to be resent due to an ASK or MOVED response from redis (in a cluster configuration), /// If a command has to be resent due to an ASK or MOVED response from redis (in a cluster configuration),
/// the second sending of the command will have this property set to the original IProfiledCommand. /// the second sending of the command will have this property set to the original IProfiledCommand.
/// /// </para>
/// This can only be set if redis is configured as a cluster. /// <para>This can only be set if redis is configured as a cluster.</para>
/// </summary> /// </summary>
IProfiledCommand RetransmissionOf { get; } IProfiledCommand RetransmissionOf { get; }
......
...@@ -23,7 +23,7 @@ private static void Main() ...@@ -23,7 +23,7 @@ private static void Main()
RunCompetingBatchesOnSameMuxer(); RunCompetingBatchesOnSameMuxer();
} while (DateTime.UtcNow < stop); } while (DateTime.UtcNow < stop);
} }
static ConnectionMultiplexer Create() private static ConnectionMultiplexer Create()
{ {
var options = new ConfigurationOptions var options = new ConfigurationOptions
{ {
...@@ -42,10 +42,14 @@ public static void RunCompetingBatchesOnSameMuxer() ...@@ -42,10 +42,14 @@ public static void RunCompetingBatchesOnSameMuxer()
{ {
var db = muxer.GetDatabase(); var db = muxer.GetDatabase();
Thread x = new Thread(state => BatchRunPings((IDatabase)state)); Thread x = new Thread(state => BatchRunPings((IDatabase)state))
x.Name = nameof(BatchRunPings); {
Thread y = new Thread(state => BatchRunIntegers((IDatabase)state)); Name = nameof(BatchRunPings)
y.Name = nameof(BatchRunIntegers); };
Thread y = new Thread(state => BatchRunIntegers((IDatabase)state))
{
Name = nameof(BatchRunIntegers)
};
var watch = Stopwatch.StartNew(); var watch = Stopwatch.StartNew();
x.Start(db); x.Start(db);
...@@ -59,7 +63,7 @@ public static void RunCompetingBatchesOnSameMuxer() ...@@ -59,7 +63,7 @@ public static void RunCompetingBatchesOnSameMuxer()
} }
} }
static RedisKey Me([CallerMemberName]string caller = null) => caller; private static RedisKey Me([CallerMemberName]string caller = null) => caller;
private static void BatchRunIntegers(IDatabase db) private static void BatchRunIntegers(IDatabase db)
{ {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment