Commit a4d087f6 authored by Nick Craver's avatar Nick Craver Committed by Marc Gravell

Simplify Position down into RedisValue (#912)

Not 100% sure this is what we figured out, but good first pass at getting there if not.
parent ea85ae80
......@@ -831,8 +831,8 @@ public void StreamClaimMessagesReturningIds()
[Fact]
public void StreamConsumerGroupSetPosition()
{
wrapper.StreamConsumerGroupSetPosition("key", "group", Position.Beginning, CommandFlags.HighPriority);
mock.Verify(_ => _.StreamConsumerGroupSetPosition("prefix:key", "group", Position.Beginning, CommandFlags.HighPriority));
wrapper.StreamConsumerGroupSetPosition("key", "group", StreamPosition.Beginning, CommandFlags.HighPriority);
mock.Verify(_ => _.StreamConsumerGroupSetPosition("prefix:key", "group", StreamPosition.Beginning, CommandFlags.HighPriority));
}
[Fact]
......@@ -845,8 +845,8 @@ public void StreamConsumerInfoGet()
[Fact]
public void StreamCreateConsumerGroup()
{
wrapper.StreamCreateConsumerGroup("key", "group", Position.Beginning, CommandFlags.HighPriority);
mock.Verify(_ => _.StreamCreateConsumerGroup("prefix:key", "group", Position.Beginning, CommandFlags.HighPriority));
wrapper.StreamCreateConsumerGroup("key", "group", StreamPosition.Beginning, CommandFlags.HighPriority);
mock.Verify(_ => _.StreamCreateConsumerGroup("prefix:key", "group", StreamPosition.Beginning, CommandFlags.HighPriority));
}
[Fact]
......@@ -924,15 +924,15 @@ public void StreamRead_1()
[Fact]
public void StreamRead_2()
{
wrapper.StreamRead("key", new Position("0-0"), null, CommandFlags.HighPriority);
mock.Verify(_ => _.StreamRead("prefix:key", new Position("0-0"), null, CommandFlags.HighPriority));
wrapper.StreamRead("key", "0-0", null, CommandFlags.HighPriority);
mock.Verify(_ => _.StreamRead("prefix:key", "0-0", null, CommandFlags.HighPriority));
}
[Fact]
public void StreamStreamReadGroup_1()
{
wrapper.StreamReadGroup("key", "group", "consumer", new Position("0-0"), 10, CommandFlags.HighPriority);
mock.Verify(_ => _.StreamReadGroup("prefix:key", "group", "consumer", new Position("0-0"), 10, CommandFlags.HighPriority));
wrapper.StreamReadGroup("key", "group", "consumer", "0-0", 10, CommandFlags.HighPriority);
mock.Verify(_ => _.StreamReadGroup("prefix:key", "group", "consumer", "0-0", 10, CommandFlags.HighPriority));
}
[Fact]
......
......@@ -135,15 +135,15 @@ public void StreamConsumerGroupSetId()
db.StreamAdd(key, "field2", "value2");
// Create a group and set the position to deliver new messages only.
db.StreamCreateConsumerGroup(key, groupName, Position.New);
db.StreamCreateConsumerGroup(key, groupName, StreamPosition.NewMessages);
// Read into the group, expect nothing
var firstRead = db.StreamReadGroup(key, groupName, consumer, Position.New);
var firstRead = db.StreamReadGroup(key, groupName, consumer, StreamPosition.NewMessages);
// Reset the ID back to read from the beginning.
db.StreamConsumerGroupSetPosition(key, groupName, Position.Beginning);
db.StreamConsumerGroupSetPosition(key, groupName, StreamPosition.Beginning);
var secondRead = db.StreamReadGroup(key, groupName, consumer, Position.New);
var secondRead = db.StreamReadGroup(key, groupName, consumer, StreamPosition.NewMessages);
Assert.NotNull(firstRead);
Assert.NotNull(secondRead);
......@@ -168,7 +168,7 @@ public void StreamConsumerGroupWithNoConsumers()
db.StreamAdd(key, "field1", "value1");
// Create a group
db.StreamCreateConsumerGroup(key, groupName, new Position("0-0"));
db.StreamCreateConsumerGroup(key, groupName, "0-0");
// Query redis for the group consumers, expect an empty list in response.
var consumers = db.StreamConsumerInfo(key, groupName);
......@@ -193,7 +193,7 @@ public void StreamCreateConsumerGroup()
db.StreamAdd(key, "field1", "value1");
// Create a group
var result = db.StreamCreateConsumerGroup(key, groupName, Position.Beginning);
var result = db.StreamCreateConsumerGroup(key, groupName, StreamPosition.Beginning);
Assert.True(result);
}
......@@ -219,7 +219,7 @@ public void StreamConsumerGroupReadOnlyNewMessagesWithEmptyResponse()
db.StreamCreateConsumerGroup(key, groupName);
// Read, expect no messages
var entries = db.StreamReadGroup(key, groupName, "test_consumer", new Position("0-0"));
var entries = db.StreamReadGroup(key, groupName, "test_consumer", "0-0");
Assert.True(entries.Length == 0);
}
......@@ -240,9 +240,9 @@ public void StreamConsumerGroupReadFromStreamBeginning()
var id1 = db.StreamAdd(key, "field1", "value1");
var id2 = db.StreamAdd(key, "field2", "value2");
db.StreamCreateConsumerGroup(key, groupName, Position.Beginning);
db.StreamCreateConsumerGroup(key, groupName, StreamPosition.Beginning);
var entries = db.StreamReadGroup(key, groupName, "test_consumer", new Position("0-0"));
var entries = db.StreamReadGroup(key, groupName, "test_consumer", "0-0");
Assert.True(entries.Length == 2);
Assert.True(id1 == entries[0].Id);
......@@ -268,9 +268,9 @@ public void StreamConsumerGroupReadFromStreamBeginningWithCount()
var id4 = db.StreamAdd(key, "field4", "value4");
// Start reading after id1.
db.StreamCreateConsumerGroup(key, groupName, new Position(id1));
db.StreamCreateConsumerGroup(key, groupName, id1);
var entries = db.StreamReadGroup(key, groupName, "test_consumer", Position.New, 2);
var entries = db.StreamReadGroup(key, groupName, "test_consumer", StreamPosition.NewMessages, 2);
// Ensure we only received the requested count and that the IDs match the expected values.
Assert.True(entries.Length == 2);
......@@ -297,10 +297,10 @@ public void StreamConsumerGroupAcknowledgeMessage()
var id3 = db.StreamAdd(key, "field3", "value3");
var id4 = db.StreamAdd(key, "field4", "value4");
db.StreamCreateConsumerGroup(key, groupName, Position.Beginning);
db.StreamCreateConsumerGroup(key, groupName, StreamPosition.Beginning);
// Read all 4 messages, they will be assigned to the consumer
var entries = db.StreamReadGroup(key, groupName, consumer, new Position("0-0"));
var entries = db.StreamReadGroup(key, groupName, consumer, "0-0");
// Send XACK for 3 of the messages
......@@ -311,7 +311,7 @@ public void StreamConsumerGroupAcknowledgeMessage()
var twoAck = db.StreamAcknowledge(key, groupName, new RedisValue[] { id3, id4 });
// Read the group again, it should only return the unacknowledged message.
var notAcknowledged = db.StreamReadGroup(key, groupName, consumer, new Position("0-0"));
var notAcknowledged = db.StreamReadGroup(key, groupName, consumer, "0-0");
Assert.True(entries.Length == 4);
Assert.Equal(1, oneAck);
......@@ -340,7 +340,7 @@ public void StreamConsumerGroupClaimMessages()
var id3 = db.StreamAdd(key, "field3", "value3");
var id4 = db.StreamAdd(key, "field4", "value4");
db.StreamCreateConsumerGroup(key, groupName, new Position("0-0"));
db.StreamCreateConsumerGroup(key, groupName, "0-0");
// Read a single message into the first consumer.
db.StreamReadGroup(key, groupName, consumer1, count: 1);
......@@ -391,10 +391,10 @@ public void StreamConsumerGroupClaimMessagesReturningIds()
var id3 = db.StreamAdd(key, "field3", "value3");
var id4 = db.StreamAdd(key, "field4", "value4");
db.StreamCreateConsumerGroup(key, groupName, Position.Beginning);
db.StreamCreateConsumerGroup(key, groupName, StreamPosition.Beginning);
// Read a single message into the first consumer.
var consumer1Messages = db.StreamReadGroup(key, groupName, consumer1, Position.Beginning, 1);
var consumer1Messages = db.StreamReadGroup(key, groupName, consumer1, StreamPosition.Beginning, 1);
// Read the remaining messages into the second consumer.
var consumer2Messages = db.StreamReadGroup(key, groupName, consumer2);
......@@ -452,13 +452,13 @@ public void StreamConsumerGroupReadMultipleOneReadBeginningOneReadNew()
db.StreamCreateConsumerGroup(stream1, groupName);
// stream2 set up to read from the beginning of the stream
db.StreamCreateConsumerGroup(stream2, groupName, Position.Beginning);
db.StreamCreateConsumerGroup(stream2, groupName, StreamPosition.Beginning);
// Read for both streams from the beginning. We shouldn't get anything back for stream1.
var pairs = new StreamPosition[]
{
new StreamPosition(stream1, Position.Beginning),
new StreamPosition(stream2, Position.Beginning)
new StreamPosition(stream1, StreamPosition.Beginning),
new StreamPosition(stream2, StreamPosition.Beginning)
};
var streams = db.StreamReadGroup(pairs, groupName, "test_consumer");
......@@ -493,8 +493,8 @@ public void StreamConsumerGroupReadMultipleOnlyNewMessagesExpectNoResult()
// We shouldn't get anything for either stream.
var pairs = new StreamPosition[]
{
new StreamPosition(stream1, Position.Beginning),
new StreamPosition(stream2, Position.Beginning)
new StreamPosition(stream1, StreamPosition.Beginning),
new StreamPosition(stream2, StreamPosition.Beginning)
};
var streams = db.StreamReadGroup(pairs, groupName, "test_consumer");
......@@ -534,8 +534,8 @@ public void StreamConsumerGroupReadMultipleOnlyNewMessagesExpect1Result()
// Read the new messages (messages created after the group was created).
var pairs = new StreamPosition[]
{
new StreamPosition(stream1, Position.New),
new StreamPosition(stream2, Position.New)
new StreamPosition(stream1, StreamPosition.NewMessages),
new StreamPosition(stream2, StreamPosition.NewMessages)
};
var streams = db.StreamReadGroup(pairs, groupName, "test_consumer");
......@@ -570,14 +570,14 @@ public void StreamConsumerGroupReadMultipleRestrictCount()
var id2_3 = db.StreamAdd(stream2, "field2-3", "value2-3");
// Allow reading from the beginning in both streams
db.StreamCreateConsumerGroup(stream1, groupName, Position.Beginning);
db.StreamCreateConsumerGroup(stream2, groupName, Position.Beginning);
db.StreamCreateConsumerGroup(stream1, groupName, StreamPosition.Beginning);
db.StreamCreateConsumerGroup(stream2, groupName, StreamPosition.Beginning);
var pairs = new StreamPosition[]
{
// Read after the first id in both streams
new StreamPosition(stream1, new Position(id1_1)),
new StreamPosition(stream2, new Position(id2_1))
new StreamPosition(stream1, id1_1),
new StreamPosition(stream2, id2_1)
};
// Restrict the count to 2 (expect only 1 message from first stream, 2 from the second).
......@@ -605,7 +605,7 @@ public void StreamConsumerGroupViewPendingInfoNoConsumers()
var id1 = db.StreamAdd(key, "field1", "value1");
db.StreamCreateConsumerGroup(key, groupName, Position.Beginning);
db.StreamCreateConsumerGroup(key, groupName, StreamPosition.Beginning);
var pendingInfo = db.StreamPending(key, groupName);
......@@ -631,7 +631,7 @@ public void StreamConsumerGroupViewPendingInfoWhenNothingPending()
var id1 = db.StreamAdd(key, "field1", "value1");
db.StreamCreateConsumerGroup(key, groupName, new Position("0-0"));
db.StreamCreateConsumerGroup(key, groupName, "0-0");
var pendingMessages = db.StreamPendingMessages(key,
groupName,
......@@ -662,10 +662,10 @@ public void StreamConsumerGroupViewPendingInfoSummary()
var id3 = db.StreamAdd(key, "field3", "value3");
var id4 = db.StreamAdd(key, "field4", "value4");
db.StreamCreateConsumerGroup(key, groupName, Position.Beginning);
db.StreamCreateConsumerGroup(key, groupName, StreamPosition.Beginning);
// Read a single message into the first consumer.
var consumer1Messages = db.StreamReadGroup(key, groupName, consumer1, Position.Beginning, 1);
var consumer1Messages = db.StreamReadGroup(key, groupName, consumer1, StreamPosition.Beginning, 1);
// Read the remaining messages into the second consumer.
var consumer2Messages = db.StreamReadGroup(key, groupName, consumer2);
......@@ -704,7 +704,7 @@ public async Task StreamConsumerGroupViewPendingMessageInfo()
var id3 = db.StreamAdd(key, "field3", "value3");
var id4 = db.StreamAdd(key, "field4", "value4");
db.StreamCreateConsumerGroup(key, groupName, Position.Beginning);
db.StreamCreateConsumerGroup(key, groupName, StreamPosition.Beginning);
// Read a single message into the first consumer.
var consumer1Messages = db.StreamReadGroup(key, groupName, consumer1, count: 1);
......@@ -745,7 +745,7 @@ public void StreamConsumerGroupViewPendingMessageInfoForConsumer()
var id3 = db.StreamAdd(key, "field3", "value3");
var id4 = db.StreamAdd(key, "field4", "value4");
db.StreamCreateConsumerGroup(key, groupName, Position.Beginning);
db.StreamCreateConsumerGroup(key, groupName, StreamPosition.Beginning);
// Read a single message into the first consumer.
var consumer1Messages = db.StreamReadGroup(key, groupName, consumer1, count: 1);
......@@ -782,8 +782,8 @@ public void StreamDeleteConsumer()
db.StreamAdd(key, "fiedl2", "value2");
// Create a consumer group and read the message.
db.StreamCreateConsumerGroup(key, groupName, Position.Beginning);
db.StreamReadGroup(key, groupName, consumer, Position.Beginning);
db.StreamCreateConsumerGroup(key, groupName, StreamPosition.Beginning);
db.StreamReadGroup(key, groupName, consumer, StreamPosition.Beginning);
var preDeleteConsumers = db.StreamConsumerInfo(key, groupName);
......@@ -816,8 +816,8 @@ public void StreamDeleteConsumerGroup()
db.StreamAdd(key, "field1", "value1");
// Create a consumer group and read the messages.
db.StreamCreateConsumerGroup(key, groupName, Position.Beginning);
db.StreamReadGroup(key, groupName, consumer, Position.Beginning);
db.StreamCreateConsumerGroup(key, groupName, StreamPosition.Beginning);
db.StreamReadGroup(key, groupName, consumer, StreamPosition.Beginning);
var preDeleteInfo = db.StreamInfo(key);
......@@ -900,8 +900,8 @@ public void StreamGroupInfoGet()
var id3 = db.StreamAdd(key, "field3", "value3");
var id4 = db.StreamAdd(key, "field4", "value4");
db.StreamCreateConsumerGroup(key, group1, Position.Beginning);
db.StreamCreateConsumerGroup(key, group2, Position.Beginning);
db.StreamCreateConsumerGroup(key, group1, StreamPosition.Beginning);
db.StreamCreateConsumerGroup(key, group2, StreamPosition.Beginning);
// Read a single message into the first consumer.
var consumer1Messages = db.StreamReadGroup(key, group1, consumer1, count: 1);
......@@ -941,7 +941,7 @@ public void StreamGroupConsumerInfoGet()
var id3 = db.StreamAdd(key, "field3", "value3");
var id4 = db.StreamAdd(key, "field4", "value4");
db.StreamCreateConsumerGroup(key, group, Position.Beginning);
db.StreamCreateConsumerGroup(key, group, StreamPosition.Beginning);
db.StreamReadGroup(key, group, consumer1, count: 1);
db.StreamReadGroup(key, group, consumer2);
......@@ -1046,7 +1046,7 @@ public void StreamPendingNoMessagesOrConsumers()
var id = db.StreamAdd(key, "field1", "value1");
db.StreamDelete(key, new RedisValue[] { id });
db.StreamCreateConsumerGroup(key, groupName, new Position("0-0"));
db.StreamCreateConsumerGroup(key, groupName, "0-0");
var pendingInfo = db.StreamPending(key, "test_group");
......@@ -1061,38 +1061,38 @@ public void StreamPendingNoMessagesOrConsumers()
[Fact]
public void StreamPositionDefaultValueIsBeginning()
{
Position position = default(Position);
RedisValue position = StreamPosition.Beginning;
Assert.Equal(StreamConstants.ReadMinValue, position.ResolveForCommand(RedisCommand.XREAD));
Assert.Equal(StreamConstants.ReadMinValue, position.ResolveForCommand(RedisCommand.XREADGROUP));
Assert.Equal(StreamConstants.ReadMinValue, position.ResolveForCommand(RedisCommand.XGROUP));
Assert.Equal(StreamConstants.ReadMinValue, StreamPosition.Resolve(position, RedisCommand.XREAD));
Assert.Equal(StreamConstants.ReadMinValue, StreamPosition.Resolve(position, RedisCommand.XREADGROUP));
Assert.Equal(StreamConstants.ReadMinValue, StreamPosition.Resolve(position, RedisCommand.XGROUP));
}
[Fact]
public void StreamPositionValidateBeginning()
{
var position = Position.Beginning;
var position = StreamPosition.Beginning;
Assert.Equal(StreamConstants.ReadMinValue, position.ResolveForCommand(RedisCommand.XREAD));
Assert.Equal(StreamConstants.ReadMinValue, StreamPosition.Resolve(position, RedisCommand.XREAD));
}
[Fact]
public void StreamPositionValidateExplicit()
{
var explicitValue = "1-0";
var position = new Position(explicitValue);
var position = explicitValue;
Assert.Equal(explicitValue, position.ResolveForCommand(RedisCommand.XREAD));
Assert.Equal(explicitValue, StreamPosition.Resolve(position, RedisCommand.XREAD));
}
[Fact]
public void StreamPositionValidateNew()
{
var position = Position.New;
var position = StreamPosition.NewMessages;
Assert.Equal(StreamConstants.NewMessages, position.ResolveForCommand(RedisCommand.XGROUP));
Assert.Equal(StreamConstants.UndeliveredMessages, position.ResolveForCommand(RedisCommand.XREADGROUP));
Assert.ThrowsAny<InvalidOperationException>(() => position.ResolveForCommand(RedisCommand.XREAD));
Assert.Equal(StreamConstants.NewMessages, StreamPosition.Resolve(position, RedisCommand.XGROUP));
Assert.Equal(StreamConstants.UndeliveredMessages, StreamPosition.Resolve(position, RedisCommand.XREADGROUP));
Assert.ThrowsAny<InvalidOperationException>(() => StreamPosition.Resolve(position, RedisCommand.XREAD));
}
[Fact]
......@@ -1111,7 +1111,7 @@ public void StreamRead()
var id3 = db.StreamAdd(key, "field3", "value3");
// Read the entire stream from the beginning.
var entries = db.StreamRead(key, new Position("0-0"));
var entries = db.StreamRead(key, "0-0");
Assert.True(entries.Length == 3);
Assert.Equal(id1, entries[0].Id);
......@@ -1139,7 +1139,7 @@ public void StreamReadEmptyStream()
var len = db.StreamLength(key);
// Read the entire stream from the beginning.
var entries = db.StreamRead(key, new Position("0-0"));
var entries = db.StreamRead(key, "0-0");
Assert.True(entries.Length == 0);
Assert.Equal(0, len);
......@@ -1170,8 +1170,8 @@ public void StreamReadEmptyStreams()
var len2 = db.StreamLength(key2);
// Read the entire stream from the beginning.
var entries1 = db.StreamRead(key1, new Position("0-0"));
var entries2 = db.StreamRead(key2, new Position("0-0"));
var entries1 = db.StreamRead(key1, "0-0");
var entries2 = db.StreamRead(key2, "0-0");
Assert.True(entries1.Length == 0);
Assert.True(entries2.Length == 0);
......@@ -1190,8 +1190,8 @@ public void StreamReadExpectedExceptionInvalidCountMultipleStream()
var streamPositions = new StreamPosition[]
{
new StreamPosition("key1", new Position("0-0")),
new StreamPosition("key2", new Position("0-0"))
new StreamPosition("key1", "0-0"),
new StreamPosition("key2", "0-0")
};
......@@ -1210,7 +1210,7 @@ public void StreamReadExpectedExceptionInvalidCountSingleStream()
Skip.IfMissingFeature(conn, nameof(RedisFeatures.Streams), r => r.Streams);
var db = conn.GetDatabase();
Assert.Throws<ArgumentOutOfRangeException>(() => db.StreamRead(key, new Position("0-0"), 0));
Assert.Throws<ArgumentOutOfRangeException>(() => db.StreamRead(key, "0-0", 0));
}
}
......@@ -1261,8 +1261,8 @@ public void StreamReadMultipleStreams()
// Read from both streams at the same time.
var streamList = new StreamPosition[2]
{
new StreamPosition(key1, new Position("0-0")),
new StreamPosition(key2, new Position("0-0"))
new StreamPosition(key1, "0-0"),
new StreamPosition(key2, "0-0")
};
var streams = db.StreamRead(streamList);
......@@ -1300,8 +1300,8 @@ public void StreamReadMultipleStreamsWithCount()
var streamList = new StreamPosition[2]
{
new StreamPosition(key1, new Position("0-0")),
new StreamPosition(key2, new Position("0-0"))
new StreamPosition(key1, "0-0"),
new StreamPosition(key2, "0-0")
};
var streams = db.StreamRead(streamList, countPerStream: 1);
......@@ -1338,10 +1338,10 @@ public void StreamReadMultipleStreamsWithReadPastSecondStream()
var streamList = new StreamPosition[]
{
new StreamPosition(key1, new Position("0-0")),
new StreamPosition(key1, "0-0"),
// read past the end of stream # 2
new StreamPosition(key2, new Position(id4))
new StreamPosition(key2, id4)
};
var streams = db.StreamRead(streamList);
......@@ -1374,8 +1374,8 @@ public void StreamReadMultipleStreamsWithEmptyResponse()
var streamList = new StreamPosition[]
{
// Read past the end of both streams.
new StreamPosition(key1, new Position(id2)),
new StreamPosition(key2, new Position(id4))
new StreamPosition(key1, id2),
new StreamPosition(key2, id4)
};
var streams = db.StreamRead(streamList);
......@@ -1401,7 +1401,7 @@ public void StreamReadPastEndOfStream()
// Read after the final ID in the stream, we expect an empty array as a response.
var entries = db.StreamRead(key, new Position(id2));
var entries = db.StreamRead(key, id2);
Assert.True(entries.Length == 0);
}
......@@ -1533,7 +1533,7 @@ public void StreamReadWithAfterIdAndCount_1()
var id3 = db.StreamAdd(key, "field3", "value3");
// Only read a single item from the stream.
var entries = db.StreamRead(key, new Position(id1), 1);
var entries = db.StreamRead(key, id1, 1);
Assert.True(entries.Length == 1);
Assert.Equal(id2, entries[0].Id);
......@@ -1557,7 +1557,7 @@ public void StreamReadWithAfterIdAndCount_2()
var id4 = db.StreamAdd(key, "field4", "value4");
// Read multiple items from the stream.
var entries = db.StreamRead(key, new Position(id1), 2);
var entries = db.StreamRead(key, id1, 2);
Assert.True(entries.Length == 2);
Assert.Equal(id2, entries[0].Id);
......
......@@ -796,15 +796,15 @@ public void StreamConsumerInfoGetAsync()
[Fact]
public void StreamConsumerGroupSetPositionAsync()
{
wrapper.StreamConsumerGroupSetPositionAsync("key", "group", Position.Beginning, CommandFlags.HighPriority);
mock.Verify(_ => _.StreamConsumerGroupSetPositionAsync("prefix:key", "group", Position.Beginning, CommandFlags.HighPriority));
wrapper.StreamConsumerGroupSetPositionAsync("key", "group", StreamPosition.Beginning, CommandFlags.HighPriority);
mock.Verify(_ => _.StreamConsumerGroupSetPositionAsync("prefix:key", "group", StreamPosition.Beginning, CommandFlags.HighPriority));
}
[Fact]
public void StreamCreateConsumerGroupAsync()
{
wrapper.StreamCreateConsumerGroupAsync("key", "group", new Position("0-0"), CommandFlags.HighPriority);
mock.Verify(_ => _.StreamCreateConsumerGroupAsync("prefix:key", "group", new Position("0-0"), CommandFlags.HighPriority));
wrapper.StreamCreateConsumerGroupAsync("key", "group", "0-0", CommandFlags.HighPriority);
mock.Verify(_ => _.StreamCreateConsumerGroupAsync("prefix:key", "group", "0-0", CommandFlags.HighPriority));
}
[Fact]
......@@ -882,15 +882,15 @@ public void StreamReadAsync_1()
[Fact]
public void StreamReadAsync_2()
{
wrapper.StreamReadAsync("key", new Position("0-0"), null, CommandFlags.HighPriority);
mock.Verify(_ => _.StreamReadAsync("prefix:key", new Position("0-0"), null, CommandFlags.HighPriority));
wrapper.StreamReadAsync("key", "0-0", null, CommandFlags.HighPriority);
mock.Verify(_ => _.StreamReadAsync("prefix:key", "0-0", null, CommandFlags.HighPriority));
}
[Fact]
public void StreamReadGroupAsync_1()
{
wrapper.StreamReadGroupAsync("key", "group", "consumer", Position.Beginning, 10, CommandFlags.HighPriority);
mock.Verify(_ => _.StreamReadGroupAsync("prefix:key", "group", "consumer", Position.Beginning, 10, CommandFlags.HighPriority));
wrapper.StreamReadGroupAsync("key", "group", "consumer", StreamPosition.Beginning, 10, CommandFlags.HighPriority);
mock.Verify(_ => _.StreamReadGroupAsync("prefix:key", "group", "consumer", StreamPosition.Beginning, 10, CommandFlags.HighPriority));
}
[Fact]
......
......@@ -1472,7 +1472,7 @@ public interface IDatabase : IRedis, IDatabaseAsync
/// <param name="position">The position from which to read for the consumer group.</param>
/// <param name="flags">The flags to use for this operation.</param>
/// <returns>True if successful, otherwise false.</returns>
bool StreamConsumerGroupSetPosition(RedisKey key, RedisValue groupName, Position position, CommandFlags flags = CommandFlags.None);
bool StreamConsumerGroupSetPosition(RedisKey key, RedisValue groupName, RedisValue position, CommandFlags flags = CommandFlags.None);
/// <summary>
/// Retrieve information about the consumers for the given consumer group. This is the equivalent of calling "XINFO GROUPS key group".
......@@ -1489,11 +1489,11 @@ public interface IDatabase : IRedis, IDatabaseAsync
/// </summary>
/// <param name="key">The key of the stream.</param>
/// <param name="groupName">The name of the group to create.</param>
/// <param name="position">The position to begin reading the stream. Defaults to <see cref="Position.New"/>.</param>
/// <param name="position">The position to begin reading the stream. Defaults to <see cref="StreamPosition.NewMessages"/>.</param>
/// <param name="flags">The flags to use for this operation.</param>
/// <returns>True if the group was created.</returns>
/// <remarks>https://redis.io/topics/streams-intro</remarks>
bool StreamCreateConsumerGroup(RedisKey key, RedisValue groupName, Position? position = null, CommandFlags flags = CommandFlags.None);
bool StreamCreateConsumerGroup(RedisKey key, RedisValue groupName, RedisValue? position = null, CommandFlags flags = CommandFlags.None);
/// <summary>
/// Delete messages in the stream. This method does not delete the stream.
......@@ -1600,7 +1600,7 @@ public interface IDatabase : IRedis, IDatabaseAsync
/// <returns>Returns a value of <see cref="StreamEntry"/> for each message returned.</returns>
/// <remarks>Equivalent of calling XREAD COUNT num STREAMS key id.</remarks>
/// <remarks>https://redis.io/commands/xread</remarks>
StreamEntry[] StreamRead(RedisKey key, Position position, int? count = null, CommandFlags flags = CommandFlags.None);
StreamEntry[] StreamRead(RedisKey key, RedisValue position, int? count = null, CommandFlags flags = CommandFlags.None);
/// <summary>
/// Read from multiple streams.
......@@ -1619,12 +1619,12 @@ public interface IDatabase : IRedis, IDatabaseAsync
/// <param name="key">The key of the stream.</param>
/// <param name="groupName">The name of the consumer group.</param>
/// <param name="consumerName">The consumer name.</param>
/// <param name="position">The position from which to read the stream. Defaults to <see cref="Position.New"/> when null.</param>
/// <param name="position">The position from which to read the stream. Defaults to <see cref="StreamPosition.NewMessages"/> when null.</param>
/// <param name="count">The maximum number of messages to return.</param>
/// <param name="flags">The flags to use for this operation.</param>
/// <returns>Returns a value of <see cref="StreamEntry"/> for each message returned.</returns>
/// <remarks>https://redis.io/commands/xreadgroup</remarks>
StreamEntry[] StreamReadGroup(RedisKey key, RedisValue groupName, RedisValue consumerName, Position? position = null, int? count = null, CommandFlags flags = CommandFlags.None);
StreamEntry[] StreamReadGroup(RedisKey key, RedisValue groupName, RedisValue consumerName, RedisValue? position = null, int? count = null, CommandFlags flags = CommandFlags.None);
/// <summary>
/// Read from multiple streams into the given consumer group. The consumer group with the given <paramref name="groupName"/>
......
......@@ -1382,7 +1382,7 @@ public interface IDatabaseAsync : IRedisAsync
/// <param name="position">The position from which to read for the consumer group.</param>
/// <param name="flags">The flags to use for this operation.</param>
/// <returns>True if successful, otherwise false.</returns>
Task<bool> StreamConsumerGroupSetPositionAsync(RedisKey key, RedisValue groupName, Position position, CommandFlags flags = CommandFlags.None);
Task<bool> StreamConsumerGroupSetPositionAsync(RedisKey key, RedisValue groupName, RedisValue position, CommandFlags flags = CommandFlags.None);
/// <summary>
/// Retrieve information about the consumers for the given consumer group. This is the equivalent of calling "XINFO GROUPS key group".
......@@ -1399,11 +1399,11 @@ public interface IDatabaseAsync : IRedisAsync
/// </summary>
/// <param name="key">The key of the stream.</param>
/// <param name="groupName">The name of the group to create.</param>
/// <param name="position">The position to begin reading the stream. Defaults to <see cref="Position.New"/>.</param>
/// <param name="position">The position to begin reading the stream. Defaults to <see cref="StreamPosition.NewMessages"/>.</param>
/// <param name="flags">The flags to use for this operation.</param>
/// <returns>True if the group was created.</returns>
/// <remarks>https://redis.io/topics/streams-intro</remarks>
Task<bool> StreamCreateConsumerGroupAsync(RedisKey key, RedisValue groupName, Position? position = null, CommandFlags flags = CommandFlags.None);
Task<bool> StreamCreateConsumerGroupAsync(RedisKey key, RedisValue groupName, RedisValue? position = null, CommandFlags flags = CommandFlags.None);
/// <summary>
/// Delete messages in the stream. This method does not delete the stream.
......@@ -1510,7 +1510,7 @@ public interface IDatabaseAsync : IRedisAsync
/// <returns>Returns an instance of <see cref="StreamEntry"/> for each message returned.</returns>
/// <remarks>Equivalent of calling XREAD COUNT num STREAMS key id.</remarks>
/// <remarks>https://redis.io/commands/xread</remarks>
Task<StreamEntry[]> StreamReadAsync(RedisKey key, Position position, int? count = null, CommandFlags flags = CommandFlags.None);
Task<StreamEntry[]> StreamReadAsync(RedisKey key, RedisValue position, int? count = null, CommandFlags flags = CommandFlags.None);
/// <summary>
/// Read from multiple streams.
......@@ -1529,12 +1529,12 @@ public interface IDatabaseAsync : IRedisAsync
/// <param name="key">The key of the stream.</param>
/// <param name="groupName">The name of the consumer group.</param>
/// <param name="consumerName">The consumer name.</param>
/// <param name="position">The position from which to read the stream. Defaults to <see cref="Position.New"/> when null.</param>
/// <param name="position">The position from which to read the stream. Defaults to <see cref="StreamPosition.NewMessages"/> when null.</param>
/// <param name="count">The maximum number of messages to return.</param>
/// <param name="flags">The flags to use for this operation.</param>
/// <returns>Returns a value of <see cref="StreamEntry"/> for each message returned.</returns>
/// <remarks>https://redis.io/commands/xreadgroup</remarks>
Task<StreamEntry[]> StreamReadGroupAsync(RedisKey key, RedisValue groupName, RedisValue consumerName, Position? position = null, int? count = null, CommandFlags flags = CommandFlags.None);
Task<StreamEntry[]> StreamReadGroupAsync(RedisKey key, RedisValue groupName, RedisValue consumerName, RedisValue? position = null, int? count = null, CommandFlags flags = CommandFlags.None);
/// <summary>
/// Read from multiple streams into the given consumer group. The consumer group with the given <paramref name="groupName"/>
......
......@@ -617,12 +617,12 @@ public RedisValue[] StreamClaimIdsOnly(RedisKey key, RedisValue consumerGroup, R
return Inner.StreamClaimIdsOnly(ToInner(key), consumerGroup, claimingConsumer, minIdleTimeInMs, messageIds, flags);
}
public bool StreamConsumerGroupSetPosition(RedisKey key, RedisValue groupName, Position position, CommandFlags flags = CommandFlags.None)
public bool StreamConsumerGroupSetPosition(RedisKey key, RedisValue groupName, RedisValue position, CommandFlags flags = CommandFlags.None)
{
return Inner.StreamConsumerGroupSetPosition(ToInner(key), groupName, position, flags);
}
public bool StreamCreateConsumerGroup(RedisKey key, RedisValue groupName, Position? position = null, CommandFlags flags = CommandFlags.None)
public bool StreamCreateConsumerGroup(RedisKey key, RedisValue groupName, RedisValue? position = null, CommandFlags flags = CommandFlags.None)
{
return Inner.StreamCreateConsumerGroup(ToInner(key), groupName, position, flags);
}
......@@ -677,7 +677,7 @@ public StreamEntry[] StreamRange(RedisKey key, RedisValue? minId = null, RedisVa
return Inner.StreamRange(ToInner(key), minId, maxId, count, messageOrder, flags);
}
public StreamEntry[] StreamRead(RedisKey key, Position position, int? count = null, CommandFlags flags = CommandFlags.None)
public StreamEntry[] StreamRead(RedisKey key, RedisValue position, int? count = null, CommandFlags flags = CommandFlags.None)
{
return Inner.StreamRead(ToInner(key), position, count, flags);
}
......@@ -687,7 +687,7 @@ public RedisStream[] StreamRead(StreamPosition[] streamPositions, int? countPerS
return Inner.StreamRead(streamPositions, countPerStream, flags);
}
public StreamEntry[] StreamReadGroup(RedisKey key, RedisValue groupName, RedisValue consumerName, Position? position = null, int? count = null, CommandFlags flags = CommandFlags.None)
public StreamEntry[] StreamReadGroup(RedisKey key, RedisValue groupName, RedisValue consumerName, RedisValue? position = null, int? count = null, CommandFlags flags = CommandFlags.None)
{
return Inner.StreamReadGroup(ToInner(key), groupName, consumerName, position, count, flags);
}
......
......@@ -596,12 +596,12 @@ public Task<RedisValue[]> StreamClaimIdsOnlyAsync(RedisKey key, RedisValue consu
return Inner.StreamClaimIdsOnlyAsync(ToInner(key), consumerGroup, claimingConsumer, minIdleTimeInMs, messageIds, flags);
}
public Task<bool> StreamConsumerGroupSetPositionAsync(RedisKey key, RedisValue groupName, Position position, CommandFlags flags = CommandFlags.None)
public Task<bool> StreamConsumerGroupSetPositionAsync(RedisKey key, RedisValue groupName, RedisValue position, CommandFlags flags = CommandFlags.None)
{
return Inner.StreamConsumerGroupSetPositionAsync(ToInner(key), groupName, position, flags);
}
public Task<bool> StreamCreateConsumerGroupAsync(RedisKey key, RedisValue groupName, Position? position = null, CommandFlags flags = CommandFlags.None)
public Task<bool> StreamCreateConsumerGroupAsync(RedisKey key, RedisValue groupName, RedisValue? position = null, CommandFlags flags = CommandFlags.None)
{
return Inner.StreamCreateConsumerGroupAsync(ToInner(key), groupName, position, flags);
}
......@@ -656,7 +656,7 @@ public Task<StreamEntry[]> StreamRangeAsync(RedisKey key, RedisValue? minId = nu
return Inner.StreamRangeAsync(ToInner(key), minId, maxId, count, messageOrder, flags);
}
public Task<StreamEntry[]> StreamReadAsync(RedisKey key, Position position, int? count = null, CommandFlags flags = CommandFlags.None)
public Task<StreamEntry[]> StreamReadAsync(RedisKey key, RedisValue position, int? count = null, CommandFlags flags = CommandFlags.None)
{
return Inner.StreamReadAsync(ToInner(key), position, count, flags);
}
......@@ -666,7 +666,7 @@ public Task<RedisStream[]> StreamReadAsync(StreamPosition[] streamPositions, int
return Inner.StreamReadAsync(streamPositions, countPerStream, flags);
}
public Task<StreamEntry[]> StreamReadGroupAsync(RedisKey key, RedisValue groupName, RedisValue consumerName, Position? position = null, int? count = null, CommandFlags flags = CommandFlags.None)
public Task<StreamEntry[]> StreamReadGroupAsync(RedisKey key, RedisValue groupName, RedisValue consumerName, RedisValue? position = null, int? count = null, CommandFlags flags = CommandFlags.None)
{
return Inner.StreamReadGroupAsync(ToInner(key), groupName, consumerName, position, count, flags);
}
......
using System;
namespace StackExchange.Redis
{
/// <summary>
/// A position within a stream. Defaults to <see cref="Position.New"/>.
/// </summary>
public struct Position
{
/// <summary>
/// Indicate a position from which to read a stream.
/// </summary>
/// <param name="readAfter">The position from which to read a stream.</param>
public Position(RedisValue readAfter)
{
if (readAfter == RedisValue.Null) throw new ArgumentNullException(nameof(readAfter), "readAfter cannot be RedisValue.Null.");
Kind = PositionKind.Explicit;
ExplicitValue = readAfter;
}
private Position(PositionKind kind)
{
Kind = kind;
ExplicitValue = RedisValue.Null;
}
private PositionKind Kind { get; }
private RedisValue ExplicitValue { get; }
/// <summary>
/// Read new messages.
/// </summary>
public static Position New = new Position(PositionKind.New);
/// <summary>
/// Read from the beginning of a stream.
/// </summary>
public static Position Beginning = new Position(PositionKind.Beginning);
internal RedisValue ResolveForCommand(RedisCommand command)
{
if (Kind == PositionKind.Explicit) return ExplicitValue;
if (Kind == PositionKind.Beginning) return StreamConstants.ReadMinValue;
// PositionKind.New
if (command == RedisCommand.XREAD) throw new InvalidOperationException("Position.New cannot be used with StreamRead.");
if (command == RedisCommand.XREADGROUP) return StreamConstants.UndeliveredMessages;
if (command == RedisCommand.XGROUP) return StreamConstants.NewMessages;
throw new ArgumentException($"Unsupported command in ResolveForCommand: {command}.", nameof(command));
}
}
}
......@@ -1703,7 +1703,7 @@ public Task<RedisValue[]> StreamClaimIdsOnlyAsync(RedisKey key, RedisValue consu
return ExecuteAsync(msg, ResultProcessor.RedisValueArray);
}
public bool StreamConsumerGroupSetPosition(RedisKey key, RedisValue groupName, Position position, CommandFlags flags = CommandFlags.None)
public bool StreamConsumerGroupSetPosition(RedisKey key, RedisValue groupName, RedisValue position, CommandFlags flags = CommandFlags.None)
{
var msg = Message.Create(Database,
flags,
......@@ -1713,13 +1713,13 @@ public bool StreamConsumerGroupSetPosition(RedisKey key, RedisValue groupName, P
StreamConstants.SetId,
key.AsRedisValue(),
groupName,
position.ResolveForCommand(RedisCommand.XGROUP)
StreamPosition.Resolve(position, RedisCommand.XGROUP)
});
return ExecuteSync(msg, ResultProcessor.Boolean);
}
public Task<bool> StreamConsumerGroupSetPositionAsync(RedisKey key, RedisValue groupName, Position position, CommandFlags flags = CommandFlags.None)
public Task<bool> StreamConsumerGroupSetPositionAsync(RedisKey key, RedisValue groupName, RedisValue position, CommandFlags flags = CommandFlags.None)
{
var msg = Message.Create(Database,
flags,
......@@ -1729,15 +1729,15 @@ public Task<bool> StreamConsumerGroupSetPositionAsync(RedisKey key, RedisValue g
StreamConstants.SetId,
key.AsRedisValue(),
groupName,
position.ResolveForCommand(RedisCommand.XGROUP)
StreamPosition.Resolve(position, RedisCommand.XGROUP)
});
return ExecuteAsync(msg, ResultProcessor.Boolean);
}
public bool StreamCreateConsumerGroup(RedisKey key, RedisValue groupName, Position? position = null, CommandFlags flags = CommandFlags.None)
public bool StreamCreateConsumerGroup(RedisKey key, RedisValue groupName, RedisValue? position = null, CommandFlags flags = CommandFlags.None)
{
var actualPosition = position ?? Position.New;
var actualPosition = position ?? StreamConstants.NewMessages;
var msg = Message.Create(Database,
flags,
......@@ -1747,15 +1747,15 @@ public bool StreamCreateConsumerGroup(RedisKey key, RedisValue groupName, Positi
StreamConstants.Create,
key.AsRedisValue(),
groupName,
actualPosition.ResolveForCommand(RedisCommand.XGROUP)
StreamPosition.Resolve(actualPosition, RedisCommand.XGROUP)
});
return ExecuteSync(msg, ResultProcessor.Boolean);
}
public Task<bool> StreamCreateConsumerGroupAsync(RedisKey key, RedisValue groupName, Position? position = null, CommandFlags flags = CommandFlags.None)
public Task<bool> StreamCreateConsumerGroupAsync(RedisKey key, RedisValue groupName, RedisValue? position = null, CommandFlags flags = CommandFlags.None)
{
var actualPosition = position ?? Position.New;
var actualPosition = position ?? StreamPosition.NewMessages;
var msg = Message.Create(Database,
flags,
......@@ -1765,7 +1765,7 @@ public Task<bool> StreamCreateConsumerGroupAsync(RedisKey key, RedisValue groupN
StreamConstants.Create,
key.AsRedisValue(),
groupName,
actualPosition.ResolveForCommand(RedisCommand.XGROUP)
StreamPosition.Resolve(actualPosition, RedisCommand.XGROUP)
});
return ExecuteAsync(msg, ResultProcessor.Boolean);
......@@ -1983,20 +1983,20 @@ public Task<StreamEntry[]> StreamRangeAsync(RedisKey key, RedisValue? minId = nu
return ExecuteAsync(msg, ResultProcessor.SingleStream);
}
public StreamEntry[] StreamRead(RedisKey key, Position position, int? count = null, CommandFlags flags = CommandFlags.None)
public StreamEntry[] StreamRead(RedisKey key, RedisValue position, int? count = null, CommandFlags flags = CommandFlags.None)
{
var msg = GetSingleStreamReadMessage(key,
position.ResolveForCommand(RedisCommand.XREAD),
StreamPosition.Resolve(position, RedisCommand.XREAD),
count,
flags);
return ExecuteSync(msg, ResultProcessor.SingleStreamWithNameSkip);
}
public Task<StreamEntry[]> StreamReadAsync(RedisKey key, Position position, int? count = null, CommandFlags flags = CommandFlags.None)
public Task<StreamEntry[]> StreamReadAsync(RedisKey key, RedisValue position, int? count = null, CommandFlags flags = CommandFlags.None)
{
var msg = GetSingleStreamReadMessage(key,
position.ResolveForCommand(RedisCommand.XREAD),
StreamPosition.Resolve(position, RedisCommand.XREAD),
count,
flags);
......@@ -2015,28 +2015,28 @@ public Task<RedisStream[]> StreamReadAsync(StreamPosition[] streamPositions, int
return ExecuteAsync(msg, ResultProcessor.MultiStream);
}
public StreamEntry[] StreamReadGroup(RedisKey key, RedisValue groupName, RedisValue consumerName, Position? position = null, int? count = null, CommandFlags flags = CommandFlags.None)
public StreamEntry[] StreamReadGroup(RedisKey key, RedisValue groupName, RedisValue consumerName, RedisValue? position = null, int? count = null, CommandFlags flags = CommandFlags.None)
{
var actualPosition = position ?? Position.New;
var actualPosition = position ?? StreamPosition.NewMessages;
var msg = GetStreamReadGroupMessage(key,
groupName,
consumerName,
actualPosition.ResolveForCommand(RedisCommand.XREADGROUP),
StreamPosition.Resolve(actualPosition, RedisCommand.XREADGROUP),
count,
flags);
return ExecuteSync(msg, ResultProcessor.SingleStreamWithNameSkip);
}
public Task<StreamEntry[]> StreamReadGroupAsync(RedisKey key, RedisValue groupName, RedisValue consumerName, Position? position = null, int? count = null, CommandFlags flags = CommandFlags.None)
public Task<StreamEntry[]> StreamReadGroupAsync(RedisKey key, RedisValue groupName, RedisValue consumerName, RedisValue? position = null, int? count = null, CommandFlags flags = CommandFlags.None)
{
var actualPosition = position ?? Position.New;
var actualPosition = position ?? StreamPosition.NewMessages;
var msg = GetStreamReadGroupMessage(key,
groupName,
consumerName,
actualPosition.ResolveForCommand(RedisCommand.XREADGROUP),
StreamPosition.Resolve(actualPosition, RedisCommand.XREADGROUP),
count,
flags);
......@@ -2459,7 +2459,7 @@ private Message GetMultiStreamReadGroupMessage(StreamPosition[] streamPositions,
for (var i = 0; i < pairCount; i++)
{
values[offset] = streamPositions[i].Key.AsRedisValue();
values[offset + pairCount] = streamPositions[i].Position.ResolveForCommand(RedisCommand.XREADGROUP);
values[offset + pairCount] = StreamPosition.Resolve(streamPositions[i].Position, RedisCommand.XREADGROUP);
offset++;
}
......@@ -2515,7 +2515,7 @@ private Message GetMultiStreamReadMessage(StreamPosition[] streamPositions, int?
for (var i = 0; i < pairCount; i++)
{
values[offset] = streamPositions[i].Key.AsRedisValue();
values[offset + pairCount] = streamPositions[i].Position.ResolveForCommand(RedisCommand.XREAD);
values[offset + pairCount] = StreamPosition.Resolve(streamPositions[i].Position, RedisCommand.XREAD);
offset++;
}
......
namespace StackExchange.Redis
using System;
namespace StackExchange.Redis
{
/// <summary>
/// Describes a pair consisting of the Stream Key and the <see cref="Position"/> from which to begin reading a stream.
/// </summary>
public struct StreamPosition
{
/// <summary>
/// Read from the beginning of a stream.
/// </summary>
public static RedisValue Beginning => StreamConstants.ReadMinValue;
/// <summary>
/// Read new messages.
/// </summary>
public static RedisValue NewMessages => StreamConstants.NewMessages;
/// <summary>
/// Initializes a <see cref="StreamPosition"/> value.
/// </summary>
/// <param name="key">The key for the stream.</param>
/// <param name="position">The position from which to begin reading the stream.</param>
public StreamPosition(RedisKey key, Position position)
public StreamPosition(RedisKey key, RedisValue position)
{
Key = key;
Position = position;
......@@ -24,6 +36,22 @@ public StreamPosition(RedisKey key, Position position)
/// <summary>
/// The offset at which to begin reading the stream.
/// </summary>
public Position Position { get; }
public RedisValue Position { get; }
internal static RedisValue Resolve(RedisValue value, RedisCommand command)
{
if (value == NewMessages)
{
switch (command)
{
case RedisCommand.XREAD: throw new InvalidOperationException("StreamPosition.NewMessages cannot be used with StreamRead.");
case RedisCommand.XREADGROUP: return StreamConstants.UndeliveredMessages;
case RedisCommand.XGROUP: return StreamConstants.NewMessages;
default: // new is only valid for the above
throw new ArgumentException($"Unsupported command in StreamPosition.Resolve: {command}.", nameof(command));
}
}
return value;
}
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment