Commit ea85ae80 authored by Todd Tingen's avatar Todd Tingen Committed by Nick Craver

Eliminate the need for Stream magic strings & added additional XGROUP… (#878)

* Eliminate the need for Stream magic strings & added additional XGROUP subcommands.

- Replaced the need for magic strings with Range, ReadOffset, and GroupReadOffset structs.
- Condensed some of the method signatures due to the new structs.
- Added methods & tests for two of the XGROUP subcommands (DESTROY & DELCONSUMER).

* Updated tests with their original IDs used.

* Use a single struct to represent stream position for Group, Read, and ReadGroup commands.

- Consolidated GroupCreateOptions, GroupReadOffset, and ReadOffset into a single struct, Position.
- Removed the "Pair" structs and created StreamPosition, this will be used in the multi-stream read commands.
- Renamed RedisStreamEntry to StreamEntry.
- Also added the remaining subcommand for the XGROUP command (SETID) and an associated unit test.

* File cleanup, delete consolidated structs.

* Additional changes and unit tests for the Position struct.

* Update PositionKind enum with explicit zero value.
parent 1ea6c217
......@@ -828,6 +828,13 @@ public void StreamClaimMessagesReturningIds()
mock.Verify(_ => _.StreamClaimIdsOnly("prefix:key", "group", "consumer", 1000, messageIds, CommandFlags.HighPriority));
}
[Fact]
public void StreamConsumerGroupSetPosition()
{
wrapper.StreamConsumerGroupSetPosition("key", "group", Position.Beginning, CommandFlags.HighPriority);
mock.Verify(_ => _.StreamConsumerGroupSetPosition("prefix:key", "group", Position.Beginning, CommandFlags.HighPriority));
}
[Fact]
public void StreamConsumerInfoGet()
{
......@@ -838,8 +845,8 @@ public void StreamConsumerInfoGet()
[Fact]
public void StreamCreateConsumerGroup()
{
wrapper.StreamCreateConsumerGroup("key", "group", "0-0", CommandFlags.HighPriority);
mock.Verify(_ => _.StreamCreateConsumerGroup("prefix:key", "group", "0-0", CommandFlags.HighPriority));
wrapper.StreamCreateConsumerGroup("key", "group", Position.Beginning, CommandFlags.HighPriority);
mock.Verify(_ => _.StreamCreateConsumerGroup("prefix:key", "group", Position.Beginning, CommandFlags.HighPriority));
}
[Fact]
......@@ -871,6 +878,20 @@ public void StreamMessagesDelete()
mock.Verify(_ => _.StreamDelete("prefix:key", messageIds, CommandFlags.HighPriority));
}
[Fact]
public void StreamDeleteConsumer()
{
wrapper.StreamDeleteConsumer("key", "group", "consumer", CommandFlags.HighPriority);
mock.Verify(_ => _.StreamDeleteConsumer("prefix:key", "group", "consumer", CommandFlags.HighPriority));
}
[Fact]
public void StreamDeleteConsumerGroup()
{
wrapper.StreamDeleteConsumerGroup("key", "group", CommandFlags.HighPriority);
mock.Verify(_ => _.StreamDeleteConsumerGroup("prefix:key", "group", CommandFlags.HighPriority));
}
[Fact]
public void StreamPendingInfoGet()
{
......@@ -881,37 +902,45 @@ public void StreamPendingInfoGet()
[Fact]
public void StreamPendingMessageInfoGet()
{
wrapper.StreamPendingMessages("key", "group", 10, RedisValue.Null, null, null, CommandFlags.HighPriority);
mock.Verify(_ => _.StreamPendingMessages("prefix:key", "group", 10, RedisValue.Null, null, null, CommandFlags.HighPriority));
wrapper.StreamPendingMessages("key", "group", 10, RedisValue.Null, "-", "+", CommandFlags.HighPriority);
mock.Verify(_ => _.StreamPendingMessages("prefix:key", "group", 10, RedisValue.Null, "-", "+", CommandFlags.HighPriority));
}
[Fact]
public void StreamRange()
{
wrapper.StreamRange("key", "-", "+", null, Order.Ascending, CommandFlags.HighPriority);
mock.Verify(_ => _.StreamRange("prefix:key", "-", "+",null, Order.Ascending, CommandFlags.HighPriority));
mock.Verify(_ => _.StreamRange("prefix:key", "-", "+", null, Order.Ascending, CommandFlags.HighPriority));
}
[Fact]
public void StreamRead_1()
{
var keysAndIds = new StreamIdPair[0] { };
wrapper.StreamRead(keysAndIds, null, CommandFlags.HighPriority);
mock.Verify(_ => _.StreamRead(keysAndIds, null, CommandFlags.HighPriority));
var streamPositions = new StreamPosition[0] { };
wrapper.StreamRead(streamPositions, null, CommandFlags.HighPriority);
mock.Verify(_ => _.StreamRead(streamPositions, null, CommandFlags.HighPriority));
}
[Fact]
public void StreamRead_2()
{
wrapper.StreamRead("key", "0-0", null, CommandFlags.HighPriority);
mock.Verify(_ => _.StreamRead("prefix:key", "0-0", null, CommandFlags.HighPriority));
wrapper.StreamRead("key", new Position("0-0"), null, CommandFlags.HighPriority);
mock.Verify(_ => _.StreamRead("prefix:key", new Position("0-0"), null, CommandFlags.HighPriority));
}
[Fact]
public void StreamStreamReadGroup_1()
{
wrapper.StreamReadGroup("key", "group", "consumer", new Position("0-0"), 10, CommandFlags.HighPriority);
mock.Verify(_ => _.StreamReadGroup("prefix:key", "group", "consumer", new Position("0-0"), 10, CommandFlags.HighPriority));
}
[Fact]
public void StreamStreamReadGroup()
public void StreamStreamReadGroup_2()
{
wrapper.StreamReadGroup("key", "group", "consumer", "0-0", 10, CommandFlags.HighPriority);
mock.Verify(_ => _.StreamReadGroup("prefix:key", "group", "consumer", "0-0", 10, CommandFlags.HighPriority));
var streamPositions = new StreamPosition[0] { };
wrapper.StreamReadGroup(streamPositions, "group", "consumer", 10, CommandFlags.HighPriority);
mock.Verify(_ => _.StreamReadGroup(streamPositions, "group", "consumer", 10, CommandFlags.HighPriority));
}
[Fact]
......
......@@ -117,6 +117,41 @@ public void StreamAddMultipleValuePairsWithManualId()
}
}
[Fact]
public void StreamConsumerGroupSetId()
{
var key = GetUniqueKey("group_set_id");
var groupName = "test_group";
var consumer = "consumer";
using (var conn = Create())
{
Skip.IfMissingFeature(conn, nameof(RedisFeatures.Streams), r => r.Streams);
var db = conn.GetDatabase();
// Create a stream
db.StreamAdd(key, "field1", "value1");
db.StreamAdd(key, "field2", "value2");
// Create a group and set the position to deliver new messages only.
db.StreamCreateConsumerGroup(key, groupName, Position.New);
// Read into the group, expect nothing
var firstRead = db.StreamReadGroup(key, groupName, consumer, Position.New);
// Reset the ID back to read from the beginning.
db.StreamConsumerGroupSetPosition(key, groupName, Position.Beginning);
var secondRead = db.StreamReadGroup(key, groupName, consumer, Position.New);
Assert.NotNull(firstRead);
Assert.NotNull(secondRead);
Assert.True(firstRead.Length == 0);
Assert.True(secondRead.Length == 2);
}
}
[Fact]
public void StreamConsumerGroupWithNoConsumers()
{
......@@ -133,7 +168,7 @@ public void StreamConsumerGroupWithNoConsumers()
db.StreamAdd(key, "field1", "value1");
// Create a group
db.StreamCreateConsumerGroup(key, groupName, "0-0");
db.StreamCreateConsumerGroup(key, groupName, new Position("0-0"));
// Query redis for the group consumers, expect an empty list in response.
var consumers = db.StreamConsumerInfo(key, groupName);
......@@ -158,7 +193,7 @@ public void StreamCreateConsumerGroup()
db.StreamAdd(key, "field1", "value1");
// Create a group
var result = db.StreamCreateConsumerGroup(key, groupName, "-");
var result = db.StreamCreateConsumerGroup(key, groupName, Position.Beginning);
Assert.True(result);
}
......@@ -184,7 +219,7 @@ public void StreamConsumerGroupReadOnlyNewMessagesWithEmptyResponse()
db.StreamCreateConsumerGroup(key, groupName);
// Read, expect no messages
var entries = db.StreamReadGroup(key, groupName, "test_consumer", "0-0");
var entries = db.StreamReadGroup(key, groupName, "test_consumer", new Position("0-0"));
Assert.True(entries.Length == 0);
}
......@@ -205,9 +240,9 @@ public void StreamConsumerGroupReadFromStreamBeginning()
var id1 = db.StreamAdd(key, "field1", "value1");
var id2 = db.StreamAdd(key, "field2", "value2");
db.StreamCreateConsumerGroup(key, groupName, "-");
db.StreamCreateConsumerGroup(key, groupName, Position.Beginning);
var entries = db.StreamReadGroup(key, groupName, "test_consumer", "0-0");
var entries = db.StreamReadGroup(key, groupName, "test_consumer", new Position("0-0"));
Assert.True(entries.Length == 2);
Assert.True(id1 == entries[0].Id);
......@@ -232,10 +267,10 @@ public void StreamConsumerGroupReadFromStreamBeginningWithCount()
var id3 = db.StreamAdd(key, "field3", "value3");
var id4 = db.StreamAdd(key, "field4", "value4");
db.StreamCreateConsumerGroup(key, groupName, "-");
// Start reading after id1.
var entries = db.StreamReadGroup(key, groupName, "test_consumer", id1, 2);
db.StreamCreateConsumerGroup(key, groupName, new Position(id1));
var entries = db.StreamReadGroup(key, groupName, "test_consumer", Position.New, 2);
// Ensure we only received the requested count and that the IDs match the expected values.
Assert.True(entries.Length == 2);
......@@ -262,10 +297,10 @@ public void StreamConsumerGroupAcknowledgeMessage()
var id3 = db.StreamAdd(key, "field3", "value3");
var id4 = db.StreamAdd(key, "field4", "value4");
db.StreamCreateConsumerGroup(key, groupName, "-");
db.StreamCreateConsumerGroup(key, groupName, Position.Beginning);
// Read all 4 messages, they will be assigned to the consumer
var entries = db.StreamReadGroup(key, groupName, consumer, "0-0");
var entries = db.StreamReadGroup(key, groupName, consumer, new Position("0-0"));
// Send XACK for 3 of the messages
......@@ -276,7 +311,7 @@ public void StreamConsumerGroupAcknowledgeMessage()
var twoAck = db.StreamAcknowledge(key, groupName, new RedisValue[] { id3, id4 });
// Read the group again, it should only return the unacknowledged message.
var notAcknowledged = db.StreamReadGroup(key, groupName, consumer, "0-0");
var notAcknowledged = db.StreamReadGroup(key, groupName, consumer, new Position("0-0"));
Assert.True(entries.Length == 4);
Assert.Equal(1, oneAck);
......@@ -305,7 +340,7 @@ public void StreamConsumerGroupClaimMessages()
var id3 = db.StreamAdd(key, "field3", "value3");
var id4 = db.StreamAdd(key, "field4", "value4");
db.StreamCreateConsumerGroup(key, groupName, "0-0");
db.StreamCreateConsumerGroup(key, groupName, new Position("0-0"));
// Read a single message into the first consumer.
db.StreamReadGroup(key, groupName, consumer1, count: 1);
......@@ -356,10 +391,10 @@ public void StreamConsumerGroupClaimMessagesReturningIds()
var id3 = db.StreamAdd(key, "field3", "value3");
var id4 = db.StreamAdd(key, "field4", "value4");
db.StreamCreateConsumerGroup(key, groupName, "-");
db.StreamCreateConsumerGroup(key, groupName, Position.Beginning);
// Read a single message into the first consumer.
var consumer1Messages = db.StreamReadGroup(key, groupName, consumer1, "-", 1);
var consumer1Messages = db.StreamReadGroup(key, groupName, consumer1, Position.Beginning, 1);
// Read the remaining messages into the second consumer.
var consumer2Messages = db.StreamReadGroup(key, groupName, consumer2);
......@@ -387,6 +422,175 @@ public void StreamConsumerGroupClaimMessagesReturningIds()
}
[Fact]
public void StreamConsumerGroupReadMultipleOneReadBeginningOneReadNew()
{
// Create a group for each stream. One set to read from the beginning of the
// stream and the other to begin reading only new messages.
// Ask redis to read from the beginning of both stream, expect messages
// for only the stream set to read from the beginning.
var groupName = "test_group";
var stream1 = GetUniqueKey("stream1");
var stream2 = GetUniqueKey("stream2");
using (var conn = Create())
{
Skip.IfMissingFeature(conn, nameof(RedisFeatures.Streams), r => r.Streams);
var db = conn.GetDatabase();
db.StreamAdd(stream1, "field1-1", "value1-1");
db.StreamAdd(stream1, "field1-2", "value1-2");
db.StreamAdd(stream2, "field2-1", "value2-1");
db.StreamAdd(stream2, "field2-2", "value2-2");
db.StreamAdd(stream2, "field2-3", "value2-3");
// stream1 set up to read only new messages.
db.StreamCreateConsumerGroup(stream1, groupName);
// stream2 set up to read from the beginning of the stream
db.StreamCreateConsumerGroup(stream2, groupName, Position.Beginning);
// Read for both streams from the beginning. We shouldn't get anything back for stream1.
var pairs = new StreamPosition[]
{
new StreamPosition(stream1, Position.Beginning),
new StreamPosition(stream2, Position.Beginning)
};
var streams = db.StreamReadGroup(pairs, groupName, "test_consumer");
Assert.NotNull(streams);
Assert.True(streams.Length == 2);
Assert.True(streams[0].Entries.Length == 0);
Assert.True(streams[1].Entries.Length == 3);
}
}
[Fact]
public void StreamConsumerGroupReadMultipleOnlyNewMessagesExpectNoResult()
{
var groupName = "test_group";
var stream1 = GetUniqueKey("stream1");
var stream2 = GetUniqueKey("stream2");
using (var conn = Create())
{
Skip.IfMissingFeature(conn, nameof(RedisFeatures.Streams), r => r.Streams);
var db = conn.GetDatabase();
db.StreamAdd(stream1, "field1-1", "value1-1");
db.StreamAdd(stream2, "field2-1", "value2-1");
// set both streams to read only new messages (default behavior).
db.StreamCreateConsumerGroup(stream1, groupName);
db.StreamCreateConsumerGroup(stream2, groupName);
// We shouldn't get anything for either stream.
var pairs = new StreamPosition[]
{
new StreamPosition(stream1, Position.Beginning),
new StreamPosition(stream2, Position.Beginning)
};
var streams = db.StreamReadGroup(pairs, groupName, "test_consumer");
Assert.NotNull(streams);
Assert.True(streams.Length == 2);
Assert.True(streams[0].Entries.Length == 0);
Assert.True(streams[1].Entries.Length == 0);
}
}
[Fact]
public void StreamConsumerGroupReadMultipleOnlyNewMessagesExpect1Result()
{
var groupName = "test_group";
var stream1 = GetUniqueKey("stream1");
var stream2 = GetUniqueKey("stream2");
using (var conn = Create())
{
Skip.IfMissingFeature(conn, nameof(RedisFeatures.Streams), r => r.Streams);
var db = conn.GetDatabase();
// These messages won't be read.
db.StreamAdd(stream1, "field1-1", "value1-1");
db.StreamAdd(stream2, "field2-1", "value2-1");
// set both streams to read only new messages (default behavior).
db.StreamCreateConsumerGroup(stream1, groupName);
db.StreamCreateConsumerGroup(stream2, groupName);
// We should read these though.
var id1 = db.StreamAdd(stream1, "field1-2", "value1-2");
var id2 = db.StreamAdd(stream2, "field2-2", "value2-2");
// Read the new messages (messages created after the group was created).
var pairs = new StreamPosition[]
{
new StreamPosition(stream1, Position.New),
new StreamPosition(stream2, Position.New)
};
var streams = db.StreamReadGroup(pairs, groupName, "test_consumer");
Assert.NotNull(streams);
Assert.True(streams.Length == 2);
Assert.True(streams[0].Entries.Length == 1);
Assert.True(streams[1].Entries.Length == 1);
Assert.Equal(id1, streams[0].Entries[0].Id);
Assert.Equal(id2, streams[1].Entries[0].Id);
}
}
[Fact]
public void StreamConsumerGroupReadMultipleRestrictCount()
{
var groupName = "test_group";
var stream1 = GetUniqueKey("stream1");
var stream2 = GetUniqueKey("stream2");
using (var conn = Create())
{
Skip.IfMissingFeature(conn, nameof(RedisFeatures.Streams), r => r.Streams);
var db = conn.GetDatabase();
var id1_1 = db.StreamAdd(stream1, "field1-1", "value1-1");
var id1_2 = db.StreamAdd(stream1, "field1-2", "value1-2");
var id2_1 = db.StreamAdd(stream2, "field2-1", "value2-1");
var id2_2 = db.StreamAdd(stream2, "field2-2", "value2-2");
var id2_3 = db.StreamAdd(stream2, "field2-3", "value2-3");
// Allow reading from the beginning in both streams
db.StreamCreateConsumerGroup(stream1, groupName, Position.Beginning);
db.StreamCreateConsumerGroup(stream2, groupName, Position.Beginning);
var pairs = new StreamPosition[]
{
// Read after the first id in both streams
new StreamPosition(stream1, new Position(id1_1)),
new StreamPosition(stream2, new Position(id2_1))
};
// Restrict the count to 2 (expect only 1 message from first stream, 2 from the second).
var streams = db.StreamReadGroup(pairs, groupName, "test_consumer", 2);
Assert.NotNull(streams);
Assert.True(streams.Length == 2);
Assert.True(streams[0].Entries.Length == 1);
Assert.True(streams[1].Entries.Length == 2);
Assert.Equal(id1_2, streams[0].Entries[0].Id);
}
}
[Fact]
public void StreamConsumerGroupViewPendingInfoNoConsumers()
{
......@@ -401,7 +605,7 @@ public void StreamConsumerGroupViewPendingInfoNoConsumers()
var id1 = db.StreamAdd(key, "field1", "value1");
db.StreamCreateConsumerGroup(key, groupName, "-");
db.StreamCreateConsumerGroup(key, groupName, Position.Beginning);
var pendingInfo = db.StreamPending(key, groupName);
......@@ -427,7 +631,7 @@ public void StreamConsumerGroupViewPendingInfoWhenNothingPending()
var id1 = db.StreamAdd(key, "field1", "value1");
db.StreamCreateConsumerGroup(key, groupName, "0-0");
db.StreamCreateConsumerGroup(key, groupName, new Position("0-0"));
var pendingMessages = db.StreamPendingMessages(key,
groupName,
......@@ -458,10 +662,10 @@ public void StreamConsumerGroupViewPendingInfoSummary()
var id3 = db.StreamAdd(key, "field3", "value3");
var id4 = db.StreamAdd(key, "field4", "value4");
db.StreamCreateConsumerGroup(key, groupName, "-");
db.StreamCreateConsumerGroup(key, groupName, Position.Beginning);
// Read a single message into the first consumer.
var consumer1Messages = db.StreamReadGroup(key, groupName, consumer1, "-", 1);
var consumer1Messages = db.StreamReadGroup(key, groupName, consumer1, Position.Beginning, 1);
// Read the remaining messages into the second consumer.
var consumer2Messages = db.StreamReadGroup(key, groupName, consumer2);
......@@ -500,7 +704,7 @@ public async Task StreamConsumerGroupViewPendingMessageInfo()
var id3 = db.StreamAdd(key, "field3", "value3");
var id4 = db.StreamAdd(key, "field4", "value4");
db.StreamCreateConsumerGroup(key, groupName, "-");
db.StreamCreateConsumerGroup(key, groupName, Position.Beginning);
// Read a single message into the first consumer.
var consumer1Messages = db.StreamReadGroup(key, groupName, consumer1, count: 1);
......@@ -541,7 +745,7 @@ public void StreamConsumerGroupViewPendingMessageInfoForConsumer()
var id3 = db.StreamAdd(key, "field3", "value3");
var id4 = db.StreamAdd(key, "field4", "value4");
db.StreamCreateConsumerGroup(key, groupName, "-");
db.StreamCreateConsumerGroup(key, groupName, Position.Beginning);
// Read a single message into the first consumer.
var consumer1Messages = db.StreamReadGroup(key, groupName, consumer1, count: 1);
......@@ -560,6 +764,74 @@ public void StreamConsumerGroupViewPendingMessageInfoForConsumer()
}
}
[Fact]
public void StreamDeleteConsumer()
{
var key = GetUniqueKey("delete_consumer_group");
var groupName = "test_group";
var consumer = "test_consumer";
using (var conn = Create())
{
Skip.IfMissingFeature(conn, nameof(RedisFeatures.Streams), r => r.Streams);
var db = conn.GetDatabase();
// Add a message to create the stream.
db.StreamAdd(key, "field1", "value1");
db.StreamAdd(key, "fiedl2", "value2");
// Create a consumer group and read the message.
db.StreamCreateConsumerGroup(key, groupName, Position.Beginning);
db.StreamReadGroup(key, groupName, consumer, Position.Beginning);
var preDeleteConsumers = db.StreamConsumerInfo(key, groupName);
// Delete the consumer.
var deleteResult = db.StreamDeleteConsumer(key, groupName, consumer);
// Should get 2 messages in the deleteResult.
var postDeleteConsumers = db.StreamConsumerInfo(key, groupName);
Assert.Equal(2, deleteResult);
Assert.True(preDeleteConsumers.Length == 1);
Assert.True(postDeleteConsumers.Length == 0);
}
}
[Fact]
public void StreamDeleteConsumerGroup()
{
var key = GetUniqueKey("delete_consumer_group");
var groupName = "test_group";
var consumer = "test_consumer";
using (var conn = Create())
{
Skip.IfMissingFeature(conn, nameof(RedisFeatures.Streams), r => r.Streams);
var db = conn.GetDatabase();
// Add a message to create the stream.
db.StreamAdd(key, "field1", "value1");
// Create a consumer group and read the messages.
db.StreamCreateConsumerGroup(key, groupName, Position.Beginning);
db.StreamReadGroup(key, groupName, consumer, Position.Beginning);
var preDeleteInfo = db.StreamInfo(key);
// Now delete the group.
var deleteResult = db.StreamDeleteConsumerGroup(key, groupName);
var postDeleteInfo = db.StreamInfo(key);
Assert.True(deleteResult);
Assert.True(preDeleteInfo.ConsumerGroupCount == 1);
Assert.True(postDeleteInfo.ConsumerGroupCount == 0);
}
}
[Fact]
public void StreamDeleteMessage()
{
......@@ -577,7 +849,7 @@ public void StreamDeleteMessage()
var id4 = db.StreamAdd(key, "field4", "value4");
var deletedCount = db.StreamDelete(key, new RedisValue[] { id3 });
var messages = db.StreamRange(key, "-", "+");
var messages = db.StreamRange(key);
Assert.Equal(1, deletedCount);
Assert.Equal(3, messages.Length);
......@@ -601,7 +873,7 @@ public void StreamDeleteMessages()
var id4 = db.StreamAdd(key, "field4", "value4");
var deletedCount = db.StreamDelete(key, new RedisValue[] { id2, id3 }, CommandFlags.None);
var messages = db.StreamRange(key, "-", "+");
var messages = db.StreamRange(key);
Assert.Equal(2, deletedCount);
Assert.Equal(2, messages.Length);
......@@ -628,8 +900,8 @@ public void StreamGroupInfoGet()
var id3 = db.StreamAdd(key, "field3", "value3");
var id4 = db.StreamAdd(key, "field4", "value4");
db.StreamCreateConsumerGroup(key, group1, "-");
db.StreamCreateConsumerGroup(key, group2, "-");
db.StreamCreateConsumerGroup(key, group1, Position.Beginning);
db.StreamCreateConsumerGroup(key, group2, Position.Beginning);
// Read a single message into the first consumer.
var consumer1Messages = db.StreamReadGroup(key, group1, consumer1, count: 1);
......@@ -669,7 +941,7 @@ public void StreamGroupConsumerInfoGet()
var id3 = db.StreamAdd(key, "field3", "value3");
var id4 = db.StreamAdd(key, "field4", "value4");
db.StreamCreateConsumerGroup(key, group, "-");
db.StreamCreateConsumerGroup(key, group, Position.Beginning);
db.StreamReadGroup(key, group, consumer1, count: 1);
db.StreamReadGroup(key, group, consumer2);
......@@ -774,7 +1046,7 @@ public void StreamPendingNoMessagesOrConsumers()
var id = db.StreamAdd(key, "field1", "value1");
db.StreamDelete(key, new RedisValue[] { id });
db.StreamCreateConsumerGroup(key, groupName, "0-0");
db.StreamCreateConsumerGroup(key, groupName, new Position("0-0"));
var pendingInfo = db.StreamPending(key, "test_group");
......@@ -786,6 +1058,43 @@ public void StreamPendingNoMessagesOrConsumers()
}
}
[Fact]
public void StreamPositionDefaultValueIsBeginning()
{
Position position = default(Position);
Assert.Equal(StreamConstants.ReadMinValue, position.ResolveForCommand(RedisCommand.XREAD));
Assert.Equal(StreamConstants.ReadMinValue, position.ResolveForCommand(RedisCommand.XREADGROUP));
Assert.Equal(StreamConstants.ReadMinValue, position.ResolveForCommand(RedisCommand.XGROUP));
}
[Fact]
public void StreamPositionValidateBeginning()
{
var position = Position.Beginning;
Assert.Equal(StreamConstants.ReadMinValue, position.ResolveForCommand(RedisCommand.XREAD));
}
[Fact]
public void StreamPositionValidateExplicit()
{
var explicitValue = "1-0";
var position = new Position(explicitValue);
Assert.Equal(explicitValue, position.ResolveForCommand(RedisCommand.XREAD));
}
[Fact]
public void StreamPositionValidateNew()
{
var position = Position.New;
Assert.Equal(StreamConstants.NewMessages, position.ResolveForCommand(RedisCommand.XGROUP));
Assert.Equal(StreamConstants.UndeliveredMessages, position.ResolveForCommand(RedisCommand.XREADGROUP));
Assert.ThrowsAny<InvalidOperationException>(() => position.ResolveForCommand(RedisCommand.XREAD));
}
[Fact]
public void StreamRead()
{
......@@ -802,7 +1111,7 @@ public void StreamRead()
var id3 = db.StreamAdd(key, "field3", "value3");
// Read the entire stream from the beginning.
var entries = db.StreamRead(key, "0-0");
var entries = db.StreamRead(key, new Position("0-0"));
Assert.True(entries.Length == 3);
Assert.Equal(id1, entries[0].Id);
......@@ -830,7 +1139,7 @@ public void StreamReadEmptyStream()
var len = db.StreamLength(key);
// Read the entire stream from the beginning.
var entries = db.StreamRead(key, "0-0");
var entries = db.StreamRead(key, new Position("0-0"));
Assert.True(entries.Length == 0);
Assert.Equal(0, len);
......@@ -861,8 +1170,8 @@ public void StreamReadEmptyStreams()
var len2 = db.StreamLength(key2);
// Read the entire stream from the beginning.
var entries1 = db.StreamRead(key1, "0-0");
var entries2 = db.StreamRead(key2, "0-0");
var entries1 = db.StreamRead(key1, new Position("0-0"));
var entries2 = db.StreamRead(key2, new Position("0-0"));
Assert.True(entries1.Length == 0);
Assert.True(entries2.Length == 0);
......@@ -879,15 +1188,15 @@ public void StreamReadExpectedExceptionInvalidCountMultipleStream()
{
Skip.IfMissingFeature(conn, nameof(RedisFeatures.Streams), r => r.Streams);
var streamPairs = new StreamIdPair[]
var streamPositions = new StreamPosition[]
{
new StreamIdPair("key1", "0-0"),
new StreamIdPair("key2", "0-0")
new StreamPosition("key1", new Position("0-0")),
new StreamPosition("key2", new Position("0-0"))
};
var db = conn.GetDatabase();
Assert.Throws<ArgumentOutOfRangeException>(() => db.StreamRead(streamPairs, 0));
Assert.Throws<ArgumentOutOfRangeException>(() => db.StreamRead(streamPositions, 0));
}
}
......@@ -901,7 +1210,7 @@ public void StreamReadExpectedExceptionInvalidCountSingleStream()
Skip.IfMissingFeature(conn, nameof(RedisFeatures.Streams), r => r.Streams);
var db = conn.GetDatabase();
Assert.Throws<ArgumentOutOfRangeException>(() => db.StreamRead(key, "0-0", 0));
Assert.Throws<ArgumentOutOfRangeException>(() => db.StreamRead(key, new Position("0-0"), 0));
}
}
......@@ -926,7 +1235,7 @@ public void StreamReadExpectedExceptionEmptyStreamList()
var db = conn.GetDatabase();
var emptyList = new StreamIdPair[0];
var emptyList = new StreamPosition[0];
Assert.Throws<ArgumentOutOfRangeException>(() => db.StreamRead(emptyList));
}
......@@ -950,10 +1259,10 @@ public void StreamReadMultipleStreams()
var id4 = db.StreamAdd(key2, "field4", "value4");
// Read from both streams at the same time.
var streamList = new StreamIdPair[2]
var streamList = new StreamPosition[2]
{
new StreamIdPair(key1, "0-0"),
new StreamIdPair(key2, "0-0")
new StreamPosition(key1, new Position("0-0")),
new StreamPosition(key2, new Position("0-0"))
};
var streams = db.StreamRead(streamList);
......@@ -989,10 +1298,10 @@ public void StreamReadMultipleStreamsWithCount()
var id3 = db.StreamAdd(key2, "field3", "value3");
var id4 = db.StreamAdd(key2, "field4", "value4");
var streamList = new StreamIdPair[2]
var streamList = new StreamPosition[2]
{
new StreamIdPair(key1, "0-0"),
new StreamIdPair(key2, "0-0")
new StreamPosition(key1, new Position("0-0")),
new StreamPosition(key2, new Position("0-0"))
};
var streams = db.StreamRead(streamList, countPerStream: 1);
......@@ -1027,12 +1336,12 @@ public void StreamReadMultipleStreamsWithReadPastSecondStream()
var id3 = db.StreamAdd(key2, "field3", "value3");
var id4 = db.StreamAdd(key2, "field4", "value4");
var streamList = new StreamIdPair[2]
var streamList = new StreamPosition[]
{
new StreamIdPair(key1, "0-0"),
new StreamPosition(key1, new Position("0-0")),
// read past the end of stream # 2
new StreamIdPair(key2, id4)
new StreamPosition(key2, new Position(id4))
};
var streams = db.StreamRead(streamList);
......@@ -1062,11 +1371,11 @@ public void StreamReadMultipleStreamsWithEmptyResponse()
var id3 = db.StreamAdd(key2, "field3", "value3");
var id4 = db.StreamAdd(key2, "field4", "value4");
var streamList = new StreamIdPair[]
var streamList = new StreamPosition[]
{
// Read past the end of both streams.
new StreamIdPair(key1, id2),
new StreamIdPair(key2, id4)
new StreamPosition(key1, new Position(id2)),
new StreamPosition(key2, new Position(id4))
};
var streams = db.StreamRead(streamList);
......@@ -1092,7 +1401,7 @@ public void StreamReadPastEndOfStream()
// Read after the final ID in the stream, we expect an empty array as a response.
var entries = db.StreamRead(key, id2);
var entries = db.StreamRead(key, new Position(id2));
Assert.True(entries.Length == 0);
}
......@@ -1136,7 +1445,7 @@ public void StreamReadRangeOfEmptyStream()
var deleted = db.StreamDelete(key, new RedisValue[] { id1, id2 });
var entries = db.StreamRange(key, "-", "+");
var entries = db.StreamRange(key);
Assert.Equal(2, deleted);
Assert.NotNull(entries);
......@@ -1201,7 +1510,7 @@ public void StreamReadRangeReverseWithCount()
var id1 = db.StreamAdd(key, "field1", "value1");
var id2 = db.StreamAdd(key, "fiedl2", "value2");
var entries = db.StreamRange(key, count: 1, messageOrder: Order.Descending);
var entries = db.StreamRange(key, id1, id2, 1, Order.Descending);
Assert.True(entries.Length == 1);
Assert.Equal(id2, entries[0].Id);
......@@ -1224,7 +1533,7 @@ public void StreamReadWithAfterIdAndCount_1()
var id3 = db.StreamAdd(key, "field3", "value3");
// Only read a single item from the stream.
var entries = db.StreamRead(key, id1, 1);
var entries = db.StreamRead(key, new Position(id1), 1);
Assert.True(entries.Length == 1);
Assert.Equal(id2, entries[0].Id);
......@@ -1248,7 +1557,7 @@ public void StreamReadWithAfterIdAndCount_2()
var id4 = db.StreamAdd(key, "field4", "value4");
// Read multiple items from the stream.
var entries = db.StreamRead(key, id1, 2);
var entries = db.StreamRead(key, new Position(id1), 2);
Assert.True(entries.Length == 2);
Assert.Equal(id2, entries[0].Id);
......
......@@ -793,11 +793,18 @@ public void StreamConsumerInfoGetAsync()
mock.Verify(_ => _.StreamConsumerInfoAsync("prefix:key", "group", CommandFlags.HighPriority));
}
[Fact]
public void StreamConsumerGroupSetPositionAsync()
{
wrapper.StreamConsumerGroupSetPositionAsync("key", "group", Position.Beginning, CommandFlags.HighPriority);
mock.Verify(_ => _.StreamConsumerGroupSetPositionAsync("prefix:key", "group", Position.Beginning, CommandFlags.HighPriority));
}
[Fact]
public void StreamCreateConsumerGroupAsync()
{
wrapper.StreamCreateConsumerGroupAsync("key", "group", "0-0", CommandFlags.HighPriority);
mock.Verify(_ => _.StreamCreateConsumerGroupAsync("prefix:key", "group", "0-0", CommandFlags.HighPriority));
wrapper.StreamCreateConsumerGroupAsync("key", "group", new Position("0-0"), CommandFlags.HighPriority);
mock.Verify(_ => _.StreamCreateConsumerGroupAsync("prefix:key", "group", new Position("0-0"), CommandFlags.HighPriority));
}
[Fact]
......@@ -829,6 +836,20 @@ public void StreamMessagesDeleteAsync()
mock.Verify(_ => _.StreamDeleteAsync("prefix:key", messageIds, CommandFlags.HighPriority));
}
[Fact]
public void StreamDeleteConsumerAsync()
{
wrapper.StreamDeleteConsumerAsync("key", "group", "consumer", CommandFlags.HighPriority);
mock.Verify(_ => _.StreamDeleteConsumerAsync("prefix:key", "group", "consumer", CommandFlags.HighPriority));
}
[Fact]
public void StreamDeleteConsumerGroupAsync()
{
wrapper.StreamDeleteConsumerGroupAsync("key", "group", CommandFlags.HighPriority);
mock.Verify(_ => _.StreamDeleteConsumerGroupAsync("prefix:key", "group", CommandFlags.HighPriority));
}
[Fact]
public void StreamPendingInfoGetAsync()
{
......@@ -853,23 +874,31 @@ public void StreamRangeAsync()
[Fact]
public void StreamReadAsync_1()
{
var keysAndIds = new StreamIdPair[0] { };
wrapper.StreamReadAsync(keysAndIds, null, CommandFlags.HighPriority);
mock.Verify(_ => _.StreamReadAsync(keysAndIds, null, CommandFlags.HighPriority));
var streamPositions = new StreamPosition[0] { };
wrapper.StreamReadAsync(streamPositions, null, CommandFlags.HighPriority);
mock.Verify(_ => _.StreamReadAsync(streamPositions, null, CommandFlags.HighPriority));
}
[Fact]
public void StreamReadAsync_2()
{
wrapper.StreamReadAsync("key", "0-0", null, CommandFlags.HighPriority);
mock.Verify(_ => _.StreamReadAsync("prefix:key", "0-0", null, CommandFlags.HighPriority));
wrapper.StreamReadAsync("key", new Position("0-0"), null, CommandFlags.HighPriority);
mock.Verify(_ => _.StreamReadAsync("prefix:key", new Position("0-0"), null, CommandFlags.HighPriority));
}
[Fact]
public void StreamReadGroupAsync_1()
{
wrapper.StreamReadGroupAsync("key", "group", "consumer", Position.Beginning, 10, CommandFlags.HighPriority);
mock.Verify(_ => _.StreamReadGroupAsync("prefix:key", "group", "consumer", Position.Beginning, 10, CommandFlags.HighPriority));
}
[Fact]
public void StreamReadGroupAsync()
public void StreamStreamReadGroupAsync_2()
{
wrapper.StreamReadGroupAsync("key", "group", "consumer", "0-0", 10, CommandFlags.HighPriority);
mock.Verify(_ => _.StreamReadGroupAsync("prefix:key", "group", "consumer", "0-0", 10, CommandFlags.HighPriority));
var streamPositions = new StreamPosition[0] { };
wrapper.StreamReadGroupAsync(streamPositions, "group", "consumer", 10, CommandFlags.HighPriority);
mock.Verify(_ => _.StreamReadGroupAsync(streamPositions, "group", "consumer", 10, CommandFlags.HighPriority));
}
[Fact]
......
namespace StackExchange.Redis
{
internal enum PositionKind
{
Beginning = 0,
Explicit = 1,
New = 2
}
}
......@@ -1449,7 +1449,7 @@ public interface IDatabase : IRedis, IDatabaseAsync
/// <param name="flags">The flags to use for this operation.</param>
/// <returns>The messages successfully claimed by the given consumer.</returns>
/// <remarks>https://redis.io/topics/streams-intro</remarks>
RedisStreamEntry[] StreamClaim(RedisKey key, RedisValue consumerGroup, RedisValue claimingConsumer, long minIdleTimeInMs, RedisValue[] messageIds, CommandFlags flags = CommandFlags.None);
StreamEntry[] StreamClaim(RedisKey key, RedisValue consumerGroup, RedisValue claimingConsumer, long minIdleTimeInMs, RedisValue[] messageIds, CommandFlags flags = CommandFlags.None);
/// <summary>
/// Change ownership of messages consumed, but not yet acknowledged, by a different consumer. This method returns the IDs for the claimed message(s).
......@@ -1464,6 +1464,16 @@ public interface IDatabase : IRedis, IDatabaseAsync
/// <remarks>https://redis.io/topics/streams-intro</remarks>
RedisValue[] StreamClaimIdsOnly(RedisKey key, RedisValue consumerGroup, RedisValue claimingConsumer, long minIdleTimeInMs, RedisValue[] messageIds, CommandFlags flags = CommandFlags.None);
/// <summary>
/// Set the position from which to read a stream for a consumer group.
/// </summary>
/// <param name="key">The key of the stream.</param>
/// <param name="groupName">The name of the consumer group.</param>
/// <param name="position">The position from which to read for the consumer group.</param>
/// <param name="flags">The flags to use for this operation.</param>
/// <returns>True if successful, otherwise false.</returns>
bool StreamConsumerGroupSetPosition(RedisKey key, RedisValue groupName, Position position, CommandFlags flags = CommandFlags.None);
/// <summary>
/// Retrieve information about the consumers for the given consumer group. This is the equivalent of calling "XINFO GROUPS key group".
/// </summary>
......@@ -1479,11 +1489,11 @@ public interface IDatabase : IRedis, IDatabaseAsync
/// </summary>
/// <param name="key">The key of the stream.</param>
/// <param name="groupName">The name of the group to create.</param>
/// <param name="readFrom">The beginning position in the stream from which to read. If null, the method will send the option ("$") to only read new messages.</param>
/// <param name="position">The position to begin reading the stream. Defaults to <see cref="Position.New"/>.</param>
/// <param name="flags">The flags to use for this operation.</param>
/// <returns>True if the group was created.</returns>
/// <remarks>https://redis.io/topics/streams-intro</remarks>
bool StreamCreateConsumerGroup(RedisKey key, RedisValue groupName, RedisValue? readFrom = null, CommandFlags flags = CommandFlags.None);
bool StreamCreateConsumerGroup(RedisKey key, RedisValue groupName, Position? position = null, CommandFlags flags = CommandFlags.None);
/// <summary>
/// Delete messages in the stream. This method does not delete the stream.
......@@ -1495,6 +1505,25 @@ public interface IDatabase : IRedis, IDatabaseAsync
/// <remarks>https://redis.io/topics/streams-intro</remarks>
long StreamDelete(RedisKey key, RedisValue[] messageIds, CommandFlags flags = CommandFlags.None);
/// <summary>
/// Delete a consumer from a consumer group.
/// </summary>
/// <param name="key">The key of the stream.</param>
/// <param name="groupName">The name of the consumer group.</param>
/// <param name="consumerName">The name of the consumer.</param>
/// <param name="flags">The flags to use for this operation.</param>
/// <returns>The number of messages that were pending for the deleted consumer.</returns>
long StreamDeleteConsumer(RedisKey key, RedisValue groupName, RedisValue consumerName, CommandFlags flags = CommandFlags.None);
/// <summary>
/// Delete a consumer group.
/// </summary>
/// <param name="key">The key of the stream.</param>
/// <param name="groupName">The name of the consumer group.</param>
/// <param name="flags">The flags to use for this operation.</param>
/// <returns>True if deleted, otherwise false.</returns>
bool StreamDeleteConsumerGroup(RedisKey key, RedisValue groupName, CommandFlags flags = CommandFlags.None);
/// <summary>
/// Retrieve information about the groups created for the given stream. This is the equivalent of calling "XINFO GROUPS key".
/// </summary>
......@@ -1523,7 +1552,7 @@ public interface IDatabase : IRedis, IDatabaseAsync
long StreamLength(RedisKey key, CommandFlags flags = CommandFlags.None);
/// <summary>
/// View information about pending messages for a stream. A pending message is a message read using StreamReadGroup (XREADGROUP) but not yet acknowledged.
/// View information about pending messages for a stream.
/// </summary>
/// <param name="key">The key of the stream.</param>
/// <param name="groupName">The name of the consumer group</param>
......@@ -1557,45 +1586,59 @@ public interface IDatabase : IRedis, IDatabaseAsync
/// <param name="count">The maximum number of messages to return.</param>
/// <param name="messageOrder">The order of the messages. <see cref="Order.Ascending"/> will execute XRANGE and <see cref="Order.Descending"/> wil execute XREVRANGE.</param>
/// <param name="flags">The flags to use for this operation.</param>
/// <returns>Returns an instance of <see cref="RedisStreamEntry"/> for each message returned.</returns>
/// <returns>Returns an instance of <see cref="StreamEntry"/> for each message returned.</returns>
/// <remarks>https://redis.io/commands/xrange</remarks>
RedisStreamEntry[] StreamRange(RedisKey key, RedisValue? minId = null, RedisValue? maxId = null, int? count = null, Order messageOrder = Order.Ascending, CommandFlags flags = CommandFlags.None);
StreamEntry[] StreamRange(RedisKey key, RedisValue? minId = null, RedisValue? maxId = null, int? count = null, Order messageOrder = Order.Ascending, CommandFlags flags = CommandFlags.None);
/// <summary>
/// Read from a single stream.
/// </summary>
/// <param name="key">The key of the stream.</param>
/// <param name="afterId">The position from within the stream to begin reading.</param>
/// <param name="position">The position from which to read the stream.</param>
/// <param name="count">The maximum number of messages to return.</param>
/// <param name="flags">The flags to use for this operation.</param>
/// <returns>Returns an instance of <see cref="RedisStreamEntry"/> for each message returned.</returns>
/// <returns>Returns a value of <see cref="StreamEntry"/> for each message returned.</returns>
/// <remarks>Equivalent of calling XREAD COUNT num STREAMS key id.</remarks>
/// <remarks>https://redis.io/commands/xread</remarks>
RedisStreamEntry[] StreamRead(RedisKey key, RedisValue afterId, int? count = null, CommandFlags flags = CommandFlags.None);
StreamEntry[] StreamRead(RedisKey key, Position position, int? count = null, CommandFlags flags = CommandFlags.None);
/// <summary>
/// Read from multiple streams.
/// </summary>
/// <param name="streamIdPairs">The list of streams and the ID from which to begin reading for each stream.</param>
/// <param name="streamPositions">Array of streams and the positions from which to begin reading for each stream.</param>
/// <param name="countPerStream">The maximum number of messages to return from each stream.</param>
/// <param name="flags">The flags to use for this operation.</param>
/// <returns>An instance of <see cref="RedisStream"/> for each stream.</returns>
/// <returns>A value of <see cref="RedisStream"/> for each stream.</returns>
/// <remarks>Equivalent of calling XREAD COUNT num STREAMS key1 key2 id1 id2.</remarks>
/// <remarks>https://redis.io/commands/xread</remarks>
RedisStream[] StreamRead(StreamIdPair[] streamIdPairs, int? countPerStream = null, CommandFlags flags = CommandFlags.None);
RedisStream[] StreamRead(StreamPosition[] streamPositions, int? countPerStream = null, CommandFlags flags = CommandFlags.None);
/// <summary>
/// Read messages from a stream and an associated consumer group.
/// Read messages from a stream into an associated consumer group.
/// </summary>
/// <param name="key">The key of the stream.</param>
/// <param name="groupName">The name of the consumer group.</param>
/// <param name="consumerName">The consumer name.</param>
/// <param name="readFromId">The ID from within the stream to begin reading. If null, the method will send the option (">") to only read new, previously undelivered messages.</param>
/// <param name="position">The position from which to read the stream. Defaults to <see cref="Position.New"/> when null.</param>
/// <param name="count">The maximum number of messages to return.</param>
/// <param name="flags">The flags to use for this operation.</param>
/// <returns>Returns an instance of <see cref="RedisStreamEntry"/> for each message returned.</returns>
/// <returns>Returns a value of <see cref="StreamEntry"/> for each message returned.</returns>
/// <remarks>https://redis.io/commands/xreadgroup</remarks>
StreamEntry[] StreamReadGroup(RedisKey key, RedisValue groupName, RedisValue consumerName, Position? position = null, int? count = null, CommandFlags flags = CommandFlags.None);
/// <summary>
/// Read from multiple streams into the given consumer group. The consumer group with the given <paramref name="groupName"/>
/// will need to have been created for each stream prior to calling this method.
/// </summary>
/// <param name="streamPositions">Array of streams and the positions from which to begin reading for each stream.</param>
/// <param name="groupName">The name of the consumer group.</param>
/// <param name="consumerName"></param>
/// <param name="countPerStream">The maximum number of messages to return from each stream.</param>
/// <param name="flags">The flags to use for this operation.</param>
/// <returns>A value of <see cref="RedisStream"/> for each stream.</returns>
/// <remarks>Equivalent of calling XREADGROUP GROUP groupName consumerName COUNT countPerStream STREAMS stream1 stream2 id1 id2</remarks>
/// <remarks>https://redis.io/commands/xreadgroup</remarks>
RedisStreamEntry[] StreamReadGroup(RedisKey key, RedisValue groupName, RedisValue consumerName, RedisValue? readFromId = null, int? count = null, CommandFlags flags = CommandFlags.None);
RedisStream[] StreamReadGroup(StreamPosition[] streamPositions, RedisValue groupName, RedisValue consumerName, int? countPerStream = null, CommandFlags flags = CommandFlags.None);
/// <summary>
/// Trim the stream to a specified maximum length.
......
......@@ -1359,7 +1359,7 @@ public interface IDatabaseAsync : IRedisAsync
/// <param name="flags">The flags to use for this operation.</param>
/// <returns>The messages successfully claimed by the given consumer.</returns>
/// <remarks>https://redis.io/topics/streams-intro</remarks>
Task<RedisStreamEntry[]> StreamClaimAsync(RedisKey key, RedisValue consumerGroup, RedisValue claimingConsumer, long minIdleTimeInMs, RedisValue[] messageIds, CommandFlags flags = CommandFlags.None);
Task<StreamEntry[]> StreamClaimAsync(RedisKey key, RedisValue consumerGroup, RedisValue claimingConsumer, long minIdleTimeInMs, RedisValue[] messageIds, CommandFlags flags = CommandFlags.None);
/// <summary>
/// Change ownership of messages consumed, but not yet acknowledged, by a different consumer. This method returns the IDs for the claimed message(s).
......@@ -1374,6 +1374,16 @@ public interface IDatabaseAsync : IRedisAsync
/// <remarks>https://redis.io/topics/streams-intro</remarks>
Task<RedisValue[]> StreamClaimIdsOnlyAsync(RedisKey key, RedisValue consumerGroup, RedisValue claimingConsumer, long minIdleTimeInMs, RedisValue[] messageIds, CommandFlags flags = CommandFlags.None);
/// <summary>
/// Set the position from which to read a stream for a consumer group.
/// </summary>
/// <param name="key">The key of the stream.</param>
/// <param name="groupName">The name of the consumer group.</param>
/// <param name="position">The position from which to read for the consumer group.</param>
/// <param name="flags">The flags to use for this operation.</param>
/// <returns>True if successful, otherwise false.</returns>
Task<bool> StreamConsumerGroupSetPositionAsync(RedisKey key, RedisValue groupName, Position position, CommandFlags flags = CommandFlags.None);
/// <summary>
/// Retrieve information about the consumers for the given consumer group. This is the equivalent of calling "XINFO GROUPS key group".
/// </summary>
......@@ -1389,11 +1399,11 @@ public interface IDatabaseAsync : IRedisAsync
/// </summary>
/// <param name="key">The key of the stream.</param>
/// <param name="groupName">The name of the group to create.</param>
/// <param name="readFrom">The beginning position in the stream from which to read. If null, the method will send the option ("$") to only read new messages.</param>
/// <param name="position">The position to begin reading the stream. Defaults to <see cref="Position.New"/>.</param>
/// <param name="flags">The flags to use for this operation.</param>
/// <returns>True if the group was created.</returns>
/// <remarks>https://redis.io/topics/streams-intro</remarks>
Task<bool> StreamCreateConsumerGroupAsync(RedisKey key, RedisValue groupName, RedisValue? readFrom = null, CommandFlags flags = CommandFlags.None);
Task<bool> StreamCreateConsumerGroupAsync(RedisKey key, RedisValue groupName, Position? position = null, CommandFlags flags = CommandFlags.None);
/// <summary>
/// Delete messages in the stream. This method does not delete the stream.
......@@ -1405,6 +1415,25 @@ public interface IDatabaseAsync : IRedisAsync
/// <remarks>https://redis.io/topics/streams-intro</remarks>
Task<long> StreamDeleteAsync(RedisKey key, RedisValue[] messageIds, CommandFlags flags = CommandFlags.None);
/// <summary>
/// Delete a consumer from a consumer group.
/// </summary>
/// <param name="key">The key of the stream.</param>
/// <param name="groupName">The name of the consumer group.</param>
/// <param name="consumerName">The name of the consumer.</param>
/// <param name="flags">The flags to use for this operation.</param>
/// <returns>The number of messages that were pending for the deleted consumer.</returns>
Task<long> StreamDeleteConsumerAsync(RedisKey key, RedisValue groupName, RedisValue consumerName, CommandFlags flags = CommandFlags.None);
/// <summary>
/// Delete a consumer group.
/// </summary>
/// <param name="key">The key of the stream.</param>
/// <param name="groupName">The name of the consumer group.</param>
/// <param name="flags">The flags to use for this operation.</param>
/// <returns>True if deleted, otherwise false.</returns>
Task<bool> StreamDeleteConsumerGroupAsync(RedisKey key, RedisValue groupName, CommandFlags flags = CommandFlags.None);
/// <summary>
/// Retrieve information about the groups created for the given stream. This is the equivalent of calling "XINFO GROUPS key".
/// </summary>
......@@ -1467,45 +1496,59 @@ public interface IDatabaseAsync : IRedisAsync
/// <param name="count">The maximum number of messages to return.</param>
/// <param name="messageOrder">The order of the messages. <see cref="Order.Ascending"/> will execute XRANGE and <see cref="Order.Descending"/> wil execute XREVRANGE.</param>
/// <param name="flags">The flags to use for this operation.</param>
/// <returns>Returns an instance of <see cref="RedisStreamEntry"/> for each message returned.</returns>
/// <returns>Returns an instance of <see cref="StreamEntry"/> for each message returned.</returns>
/// <remarks>https://redis.io/commands/xrange</remarks>
Task<RedisStreamEntry[]> StreamRangeAsync(RedisKey key, RedisValue? minId = null, RedisValue? maxId = null, int? count = null, Order messageOrder = Order.Ascending, CommandFlags flags = CommandFlags.None);
Task<StreamEntry[]> StreamRangeAsync(RedisKey key, RedisValue? minId = null, RedisValue? maxId = null, int? count = null, Order messageOrder = Order.Ascending, CommandFlags flags = CommandFlags.None);
/// <summary>
/// Read from a single stream.
/// </summary>
/// <param name="key">The key of the stream.</param>
/// <param name="afterId">The message ID from within the stream to begin reading.</param>
/// <param name="position">The position from which to read the stream.</param>
/// <param name="count">The maximum number of messages to return.</param>
/// <param name="flags">The flags to use for this operation.</param>
/// <returns>Returns an instance of <see cref="RedisStreamEntry"/> for each message returned.</returns>
/// <returns>Returns an instance of <see cref="StreamEntry"/> for each message returned.</returns>
/// <remarks>Equivalent of calling XREAD COUNT num STREAMS key id.</remarks>
/// <remarks>https://redis.io/commands/xread</remarks>
Task<RedisStreamEntry[]> StreamReadAsync(RedisKey key, RedisValue afterId, int? count = null, CommandFlags flags = CommandFlags.None);
Task<StreamEntry[]> StreamReadAsync(RedisKey key, Position position, int? count = null, CommandFlags flags = CommandFlags.None);
/// <summary>
/// Read from multiple streams.
/// </summary>
/// <param name="streamIdPairs">The list of streams and the ID from which to begin reading for each stream.</param>
/// <param name="streamPositions">Array of streams and the positions from which to begin reading for each stream.</param>
/// <param name="countPerStream">The maximum number of messages to return from each stream.</param>
/// <param name="flags">The flags to use for this operation.</param>
/// <returns>An instance of <see cref="RedisStream"/> for each stream.</returns>
/// <returns>A value of <see cref="RedisStream"/> for each stream.</returns>
/// <remarks>Equivalent of calling XREAD COUNT num STREAMS key1 key2 id1 id2.</remarks>
/// <remarks>https://redis.io/commands/xread</remarks>
Task<RedisStream[]> StreamReadAsync(StreamIdPair[] streamIdPairs, int? countPerStream = null, CommandFlags flags = CommandFlags.None);
Task<RedisStream[]> StreamReadAsync(StreamPosition[] streamPositions, int? countPerStream = null, CommandFlags flags = CommandFlags.None);
/// <summary>
/// Read messages from a stream and an associated consumer group.
/// Read messages from a stream into an associated consumer group.
/// </summary>
/// <param name="key">The key of the stream.</param>
/// <param name="groupName">The name of the consumer group.</param>
/// <param name="consumerName">The consumer name.</param>
/// <param name="readFromId">The ID from within the stream to begin reading. If null, the method will send the option (">") to only read new, previously undelivered messages.</param>
/// <param name="position">The position from which to read the stream. Defaults to <see cref="Position.New"/> when null.</param>
/// <param name="count">The maximum number of messages to return.</param>
/// <param name="flags">The flags to use for this operation.</param>
/// <returns>Returns an instance of <see cref="RedisStreamEntry"/> for each message returned.</returns>
/// <returns>Returns a value of <see cref="StreamEntry"/> for each message returned.</returns>
/// <remarks>https://redis.io/commands/xreadgroup</remarks>
Task<StreamEntry[]> StreamReadGroupAsync(RedisKey key, RedisValue groupName, RedisValue consumerName, Position? position = null, int? count = null, CommandFlags flags = CommandFlags.None);
/// <summary>
/// Read from multiple streams into the given consumer group. The consumer group with the given <paramref name="groupName"/>
/// will need to have been created for each stream prior to calling this method.
/// </summary>
/// <param name="streamPositions">Array of streams and the positions from which to begin reading for each stream.</param>
/// <param name="groupName">The name of the consumer group.</param>
/// <param name="consumerName"></param>
/// <param name="countPerStream">The maximum number of messages to return from each stream.</param>
/// <param name="flags">The flags to use for this operation.</param>
/// <returns>A value of <see cref="RedisStream"/> for each stream.</returns>
/// <remarks>Equivalent of calling XREADGROUP GROUP groupName consumerName COUNT countPerStream STREAMS stream1 stream2 id1 id2</remarks>
/// <remarks>https://redis.io/commands/xreadgroup</remarks>
Task<RedisStreamEntry[]> StreamReadGroupAsync(RedisKey key, RedisValue groupName, RedisValue consumerName, RedisValue? readFromId = null, int? count = null, CommandFlags flags = CommandFlags.None);
Task<RedisStream[]> StreamReadGroupAsync(StreamPosition[] streamPositions, RedisValue groupName, RedisValue consumerName, int? countPerStream = null, CommandFlags flags = CommandFlags.None);
/// <summary>
/// Trim the stream to a specified maximum length.
......
......@@ -607,7 +607,7 @@ public RedisValue StreamAdd(RedisKey key, NameValueEntry[] streamPairs, RedisVal
return Inner.StreamAdd(ToInner(key), streamPairs, messageId, maxLength, useApproximateMaxLength, flags);
}
public RedisStreamEntry[] StreamClaim(RedisKey key, RedisValue consumerGroup, RedisValue claimingConsumer, long minIdleTimeInMs, RedisValue[] messageIds, CommandFlags flags = CommandFlags.None)
public StreamEntry[] StreamClaim(RedisKey key, RedisValue consumerGroup, RedisValue claimingConsumer, long minIdleTimeInMs, RedisValue[] messageIds, CommandFlags flags = CommandFlags.None)
{
return Inner.StreamClaim(ToInner(key), consumerGroup, claimingConsumer, minIdleTimeInMs, messageIds, flags);
}
......@@ -617,9 +617,14 @@ public RedisValue[] StreamClaimIdsOnly(RedisKey key, RedisValue consumerGroup, R
return Inner.StreamClaimIdsOnly(ToInner(key), consumerGroup, claimingConsumer, minIdleTimeInMs, messageIds, flags);
}
public bool StreamCreateConsumerGroup(RedisKey key, RedisValue groupName, RedisValue? readFrom = null, CommandFlags flags = CommandFlags.None)
public bool StreamConsumerGroupSetPosition(RedisKey key, RedisValue groupName, Position position, CommandFlags flags = CommandFlags.None)
{
return Inner.StreamCreateConsumerGroup(ToInner(key), groupName, readFrom, flags);
return Inner.StreamConsumerGroupSetPosition(ToInner(key), groupName, position, flags);
}
public bool StreamCreateConsumerGroup(RedisKey key, RedisValue groupName, Position? position = null, CommandFlags flags = CommandFlags.None)
{
return Inner.StreamCreateConsumerGroup(ToInner(key), groupName, position, flags);
}
public StreamInfo StreamInfo(RedisKey key, CommandFlags flags = CommandFlags.None)
......@@ -647,6 +652,16 @@ public long StreamDelete(RedisKey key, RedisValue[] messageIds, CommandFlags fla
return Inner.StreamDelete(ToInner(key), messageIds, flags);
}
public long StreamDeleteConsumer(RedisKey key, RedisValue groupName, RedisValue consumerName, CommandFlags flags = CommandFlags.None)
{
return Inner.StreamDeleteConsumer(ToInner(key), groupName, consumerName, flags);
}
public bool StreamDeleteConsumerGroup(RedisKey key, RedisValue groupName, CommandFlags flags = CommandFlags.None)
{
return Inner.StreamDeleteConsumerGroup(ToInner(key), groupName, flags);
}
public StreamPendingInfo StreamPending(RedisKey key, RedisValue groupName, CommandFlags flags = CommandFlags.None)
{
return Inner.StreamPending(ToInner(key), groupName, flags);
......@@ -657,24 +672,29 @@ public StreamPendingMessageInfo[] StreamPendingMessages(RedisKey key, RedisValue
return Inner.StreamPendingMessages(ToInner(key), groupName, count, consumerName, minId, maxId, flags);
}
public RedisStreamEntry[] StreamRange(RedisKey key, RedisValue? minId = null, RedisValue? maxId = null, int? count = null, Order order = Order.Ascending, CommandFlags flags = CommandFlags.None)
public StreamEntry[] StreamRange(RedisKey key, RedisValue? minId = null, RedisValue? maxId = null, int? count = null, Order messageOrder = Order.Ascending, CommandFlags flags = CommandFlags.None)
{
return Inner.StreamRange(ToInner(key), minId, maxId, count, messageOrder, flags);
}
public StreamEntry[] StreamRead(RedisKey key, Position position, int? count = null, CommandFlags flags = CommandFlags.None)
{
return Inner.StreamRange(ToInner(key), minId, maxId, count, order, flags);
return Inner.StreamRead(ToInner(key), position, count, flags);
}
public RedisStreamEntry[] StreamRead(RedisKey key, RedisValue afterId, int? count = null, CommandFlags flags = CommandFlags.None)
public RedisStream[] StreamRead(StreamPosition[] streamPositions, int? countPerStream = null, CommandFlags flags = CommandFlags.None)
{
return Inner.StreamRead(ToInner(key), afterId, count, flags);
return Inner.StreamRead(streamPositions, countPerStream, flags);
}
public RedisStream[] StreamRead(StreamIdPair[] streamIdPairs, int? countPerStream = null, CommandFlags flags = CommandFlags.None)
public StreamEntry[] StreamReadGroup(RedisKey key, RedisValue groupName, RedisValue consumerName, Position? position = null, int? count = null, CommandFlags flags = CommandFlags.None)
{
return Inner.StreamRead(streamIdPairs, countPerStream, flags);
return Inner.StreamReadGroup(ToInner(key), groupName, consumerName, position, count, flags);
}
public RedisStreamEntry[] StreamReadGroup(RedisKey key, RedisValue groupName, RedisValue consumerName, RedisValue? readFromId = null, int? count = null, CommandFlags flags = CommandFlags.None)
public RedisStream[] StreamReadGroup(StreamPosition[] streamPositions, RedisValue groupName, RedisValue consumerName, int? countPerStream = null, CommandFlags flags = CommandFlags.None)
{
return Inner.StreamReadGroup(ToInner(key), groupName, consumerName, readFromId, count, flags);
return Inner.StreamReadGroup(streamPositions, groupName, consumerName, countPerStream, flags);
}
public long StreamTrim(RedisKey key, int maxLength, bool useApproximateMaxLength = false, CommandFlags flags = CommandFlags.None)
......
......@@ -586,7 +586,7 @@ public Task<RedisValue> StreamAddAsync(RedisKey key, NameValueEntry[] streamPair
return Inner.StreamAddAsync(ToInner(key), streamPairs, messageId, maxLength, useApproximateMaxLength, flags);
}
public Task<RedisStreamEntry[]> StreamClaimAsync(RedisKey key, RedisValue consumerGroup, RedisValue claimingConsumer, long minIdleTimeInMs, RedisValue[] messageIds, CommandFlags flags = CommandFlags.None)
public Task<StreamEntry[]> StreamClaimAsync(RedisKey key, RedisValue consumerGroup, RedisValue claimingConsumer, long minIdleTimeInMs, RedisValue[] messageIds, CommandFlags flags = CommandFlags.None)
{
return Inner.StreamClaimAsync(ToInner(key), consumerGroup, claimingConsumer, minIdleTimeInMs, messageIds, flags);
}
......@@ -596,9 +596,14 @@ public Task<RedisValue[]> StreamClaimIdsOnlyAsync(RedisKey key, RedisValue consu
return Inner.StreamClaimIdsOnlyAsync(ToInner(key), consumerGroup, claimingConsumer, minIdleTimeInMs, messageIds, flags);
}
public Task<bool> StreamCreateConsumerGroupAsync(RedisKey key, RedisValue groupName, RedisValue? readFrom = null, CommandFlags flags = CommandFlags.None)
public Task<bool> StreamConsumerGroupSetPositionAsync(RedisKey key, RedisValue groupName, Position position, CommandFlags flags = CommandFlags.None)
{
return Inner.StreamCreateConsumerGroupAsync(ToInner(key), groupName, readFrom, flags);
return Inner.StreamConsumerGroupSetPositionAsync(ToInner(key), groupName, position, flags);
}
public Task<bool> StreamCreateConsumerGroupAsync(RedisKey key, RedisValue groupName, Position? position = null, CommandFlags flags = CommandFlags.None)
{
return Inner.StreamCreateConsumerGroupAsync(ToInner(key), groupName, position, flags);
}
public Task<StreamInfo> StreamInfoAsync(RedisKey key, CommandFlags flags = CommandFlags.None)
......@@ -626,6 +631,16 @@ public Task<long> StreamDeleteAsync(RedisKey key, RedisValue[] messageIds, Comma
return Inner.StreamDeleteAsync(ToInner(key), messageIds, flags);
}
public Task<long> StreamDeleteConsumerAsync(RedisKey key, RedisValue groupName, RedisValue consumerName, CommandFlags flags = CommandFlags.None)
{
return Inner.StreamDeleteConsumerAsync(ToInner(key), groupName, consumerName, flags);
}
public Task<bool> StreamDeleteConsumerGroupAsync(RedisKey key, RedisValue groupName, CommandFlags flags = CommandFlags.None)
{
return Inner.StreamDeleteConsumerGroupAsync(ToInner(key), groupName, flags);
}
public Task<StreamPendingInfo> StreamPendingAsync(RedisKey key, RedisValue groupName, CommandFlags flags = CommandFlags.None)
{
return Inner.StreamPendingAsync(ToInner(key), groupName, flags);
......@@ -636,24 +651,29 @@ public Task<StreamPendingMessageInfo[]> StreamPendingMessagesAsync(RedisKey key,
return Inner.StreamPendingMessagesAsync(ToInner(key), groupName, count, consumerName, minId, maxId, flags);
}
public Task<RedisStreamEntry[]> StreamRangeAsync(RedisKey key, RedisValue? minId = null, RedisValue? maxId = null, int? count = null, Order order = Order.Ascending, CommandFlags flags = CommandFlags.None)
public Task<StreamEntry[]> StreamRangeAsync(RedisKey key, RedisValue? minId = null, RedisValue? maxId = null, int? count = null, Order messageOrder = Order.Ascending, CommandFlags flags = CommandFlags.None)
{
return Inner.StreamRangeAsync(ToInner(key), minId, maxId, count, messageOrder, flags);
}
public Task<StreamEntry[]> StreamReadAsync(RedisKey key, Position position, int? count = null, CommandFlags flags = CommandFlags.None)
{
return Inner.StreamRangeAsync(ToInner(key), minId, maxId, count, order, flags);
return Inner.StreamReadAsync(ToInner(key), position, count, flags);
}
public Task<RedisStreamEntry[]> StreamReadAsync(RedisKey key, RedisValue afterId, int? count = null, CommandFlags flags = CommandFlags.None)
public Task<RedisStream[]> StreamReadAsync(StreamPosition[] streamPositions, int? countPerStream = null, CommandFlags flags = CommandFlags.None)
{
return Inner.StreamReadAsync(ToInner(key), afterId, count, flags);
return Inner.StreamReadAsync(streamPositions, countPerStream, flags);
}
public Task<RedisStream[]> StreamReadAsync(StreamIdPair[] streamIdPairs, int? countPerStream = null, CommandFlags flags = CommandFlags.None)
public Task<StreamEntry[]> StreamReadGroupAsync(RedisKey key, RedisValue groupName, RedisValue consumerName, Position? position = null, int? count = null, CommandFlags flags = CommandFlags.None)
{
return Inner.StreamReadAsync(streamIdPairs, countPerStream, flags);
return Inner.StreamReadGroupAsync(ToInner(key), groupName, consumerName, position, count, flags);
}
public Task<RedisStreamEntry[]> StreamReadGroupAsync(RedisKey key, RedisValue groupName, RedisValue consumerName, RedisValue? readFromId = null, int? count = null, CommandFlags flags = CommandFlags.None)
public Task<RedisStream[]> StreamReadGroupAsync(StreamPosition[] streamPositions, RedisValue groupName, RedisValue consumerName, int? countPerStream = null, CommandFlags flags = CommandFlags.None)
{
return Inner.StreamReadGroupAsync(ToInner(key), groupName, consumerName, readFromId, count, flags);
return Inner.StreamReadGroupAsync(streamPositions, groupName, consumerName, countPerStream, flags);
}
public Task<long> StreamTrimAsync(RedisKey key, int maxLength, bool useApproximateMaxLength = false, CommandFlags flags = CommandFlags.None)
......
using System;
namespace StackExchange.Redis
{
/// <summary>
/// A position within a stream. Defaults to <see cref="Position.New"/>.
/// </summary>
public struct Position
{
/// <summary>
/// Indicate a position from which to read a stream.
/// </summary>
/// <param name="readAfter">The position from which to read a stream.</param>
public Position(RedisValue readAfter)
{
if (readAfter == RedisValue.Null) throw new ArgumentNullException(nameof(readAfter), "readAfter cannot be RedisValue.Null.");
Kind = PositionKind.Explicit;
ExplicitValue = readAfter;
}
private Position(PositionKind kind)
{
Kind = kind;
ExplicitValue = RedisValue.Null;
}
private PositionKind Kind { get; }
private RedisValue ExplicitValue { get; }
/// <summary>
/// Read new messages.
/// </summary>
public static Position New = new Position(PositionKind.New);
/// <summary>
/// Read from the beginning of a stream.
/// </summary>
public static Position Beginning = new Position(PositionKind.Beginning);
internal RedisValue ResolveForCommand(RedisCommand command)
{
if (Kind == PositionKind.Explicit) return ExplicitValue;
if (Kind == PositionKind.Beginning) return StreamConstants.ReadMinValue;
// PositionKind.New
if (command == RedisCommand.XREAD) throw new InvalidOperationException("Position.New cannot be used with StreamRead.");
if (command == RedisCommand.XREADGROUP) return StreamConstants.UndeliveredMessages;
if (command == RedisCommand.XGROUP) return StreamConstants.NewMessages;
throw new ArgumentException($"Unsupported command in ResolveForCommand: {command}.", nameof(command));
}
}
}
......@@ -1651,7 +1651,7 @@ public Task<RedisValue> StreamAddAsync(RedisKey key, NameValueEntry[] streamPair
return ExecuteAsync(msg, ResultProcessor.RedisValue);
}
public RedisStreamEntry[] StreamClaim(RedisKey key, RedisValue consumerGroup, RedisValue claimingConsumer, long minIdleTimeInMs, RedisValue[] messageIds, CommandFlags flags = CommandFlags.None)
public StreamEntry[] StreamClaim(RedisKey key, RedisValue consumerGroup, RedisValue claimingConsumer, long minIdleTimeInMs, RedisValue[] messageIds, CommandFlags flags = CommandFlags.None)
{
var msg = GetStreamClaimMessage(key,
consumerGroup,
......@@ -1664,7 +1664,7 @@ public RedisStreamEntry[] StreamClaim(RedisKey key, RedisValue consumerGroup, Re
return ExecuteSync(msg, ResultProcessor.SingleStream);
}
public Task<RedisStreamEntry[]> StreamClaimAsync(RedisKey key, RedisValue consumerGroup, RedisValue claimingConsumer, long minIdleTimeInMs, RedisValue[] messageIds, CommandFlags flags = CommandFlags.None)
public Task<StreamEntry[]> StreamClaimAsync(RedisKey key, RedisValue consumerGroup, RedisValue claimingConsumer, long minIdleTimeInMs, RedisValue[] messageIds, CommandFlags flags = CommandFlags.None)
{
var msg = GetStreamClaimMessage(key,
consumerGroup,
......@@ -1703,8 +1703,42 @@ public Task<RedisValue[]> StreamClaimIdsOnlyAsync(RedisKey key, RedisValue consu
return ExecuteAsync(msg, ResultProcessor.RedisValueArray);
}
public bool StreamCreateConsumerGroup(RedisKey key, RedisValue groupName, RedisValue? readFrom = null, CommandFlags flags = CommandFlags.None)
public bool StreamConsumerGroupSetPosition(RedisKey key, RedisValue groupName, Position position, CommandFlags flags = CommandFlags.None)
{
var msg = Message.Create(Database,
flags,
RedisCommand.XGROUP,
new RedisValue[]
{
StreamConstants.SetId,
key.AsRedisValue(),
groupName,
position.ResolveForCommand(RedisCommand.XGROUP)
});
return ExecuteSync(msg, ResultProcessor.Boolean);
}
public Task<bool> StreamConsumerGroupSetPositionAsync(RedisKey key, RedisValue groupName, Position position, CommandFlags flags = CommandFlags.None)
{
var msg = Message.Create(Database,
flags,
RedisCommand.XGROUP,
new RedisValue[]
{
StreamConstants.SetId,
key.AsRedisValue(),
groupName,
position.ResolveForCommand(RedisCommand.XGROUP)
});
return ExecuteAsync(msg, ResultProcessor.Boolean);
}
public bool StreamCreateConsumerGroup(RedisKey key, RedisValue groupName, Position? position = null, CommandFlags flags = CommandFlags.None)
{
var actualPosition = position ?? Position.New;
var msg = Message.Create(Database,
flags,
RedisCommand.XGROUP,
......@@ -1713,14 +1747,16 @@ public bool StreamCreateConsumerGroup(RedisKey key, RedisValue groupName, RedisV
StreamConstants.Create,
key.AsRedisValue(),
groupName,
readFrom ?? StreamConstants.NewMessages
actualPosition.ResolveForCommand(RedisCommand.XGROUP)
});
return ExecuteSync(msg, ResultProcessor.Boolean);
}
public Task<bool> StreamCreateConsumerGroupAsync(RedisKey key, RedisValue groupName, RedisValue? readFrom = null, CommandFlags flags = CommandFlags.None)
public Task<bool> StreamCreateConsumerGroupAsync(RedisKey key, RedisValue groupName, Position? position = null, CommandFlags flags = CommandFlags.None)
{
var actualPosition = position ?? Position.New;
var msg = Message.Create(Database,
flags,
RedisCommand.XGROUP,
......@@ -1729,7 +1765,7 @@ public Task<bool> StreamCreateConsumerGroupAsync(RedisKey key, RedisValue groupN
StreamConstants.Create,
key.AsRedisValue(),
groupName,
readFrom ?? StreamConstants.NewMessages
actualPosition.ResolveForCommand(RedisCommand.XGROUP)
});
return ExecuteAsync(msg, ResultProcessor.Boolean);
......@@ -1823,6 +1859,68 @@ public Task<long> StreamDeleteAsync(RedisKey key, RedisValue[] messageIds, Comma
return ExecuteAsync(msg, ResultProcessor.Int64);
}
public long StreamDeleteConsumer(RedisKey key, RedisValue groupName, RedisValue consumerName, CommandFlags flags = CommandFlags.None)
{
var msg = Message.Create(Database,
flags,
RedisCommand.XGROUP,
new RedisValue[]
{
StreamConstants.DeleteConsumer,
key.AsRedisValue(),
groupName,
consumerName
});
return ExecuteSync(msg, ResultProcessor.Int64);
}
public Task<long> StreamDeleteConsumerAsync(RedisKey key, RedisValue groupName, RedisValue consumerName, CommandFlags flags = CommandFlags.None)
{
var msg = Message.Create(Database,
flags,
RedisCommand.XGROUP,
new RedisValue[]
{
StreamConstants.DeleteConsumer,
key.AsRedisValue(),
groupName,
consumerName
});
return ExecuteAsync(msg, ResultProcessor.Int64);
}
public bool StreamDeleteConsumerGroup(RedisKey key, RedisValue groupName, CommandFlags flags = CommandFlags.None)
{
var msg = Message.Create(Database,
flags,
RedisCommand.XGROUP,
new RedisValue[]
{
StreamConstants.Destroy,
key.AsRedisValue(),
groupName
});
return ExecuteSync(msg, ResultProcessor.Boolean);
}
public Task<bool> StreamDeleteConsumerGroupAsync(RedisKey key, RedisValue groupName, CommandFlags flags = CommandFlags.None)
{
var msg = Message.Create(Database,
flags,
RedisCommand.XGROUP,
new RedisValue[]
{
StreamConstants.Destroy,
key.AsRedisValue(),
groupName
});
return ExecuteAsync(msg, ResultProcessor.Boolean);
}
public StreamPendingInfo StreamPending(RedisKey key, RedisValue groupName, CommandFlags flags = CommandFlags.None)
{
var msg = Message.Create(Database, flags, RedisCommand.XPENDING, key, groupName);
......@@ -1837,64 +1935,126 @@ public Task<StreamPendingInfo> StreamPendingAsync(RedisKey key, RedisValue group
public StreamPendingMessageInfo[] StreamPendingMessages(RedisKey key, RedisValue groupName, int count, RedisValue consumerName, RedisValue? minId = null, RedisValue? maxId = null, CommandFlags flags = CommandFlags.None)
{
var msg = GetStreamPendingMessagesMessage(key, groupName, minId, maxId, count, consumerName, flags);
var msg = GetStreamPendingMessagesMessage(key,
groupName,
minId,
maxId,
count,
consumerName,
flags);
return ExecuteSync(msg, ResultProcessor.StreamPendingMessages);
}
public Task<StreamPendingMessageInfo[]> StreamPendingMessagesAsync(RedisKey key, RedisValue groupName, int count, RedisValue consumerName, RedisValue? minId = null, RedisValue? maxId = null, CommandFlags flags = CommandFlags.None)
{
var msg = GetStreamPendingMessagesMessage(key, groupName, minId, maxId, count, consumerName, flags);
var msg = GetStreamPendingMessagesMessage(key,
groupName,
minId,
maxId,
count,
consumerName,
flags);
return ExecuteAsync(msg, ResultProcessor.StreamPendingMessages);
}
public RedisStreamEntry[] StreamRange(RedisKey key, RedisValue? minId = null, RedisValue? maxId = null, int? count = null, Order messageOrder = Order.Ascending, CommandFlags flags = CommandFlags.None)
public StreamEntry[] StreamRange(RedisKey key, RedisValue? minId = null, RedisValue? maxId = null, int? count = null, Order messageOrder = Order.Ascending, CommandFlags flags = CommandFlags.None)
{
var msg = GetStreamRangeMessage(key, minId, maxId, count, messageOrder, flags);
var msg = GetStreamRangeMessage(key,
minId,
maxId,
count,
messageOrder,
flags);
return ExecuteSync(msg, ResultProcessor.SingleStream);
}
public Task<RedisStreamEntry[]> StreamRangeAsync(RedisKey key, RedisValue? minId = null, RedisValue? maxId = null, int? count = null, Order messageOrder = Order.Ascending, CommandFlags flags = CommandFlags.None)
public Task<StreamEntry[]> StreamRangeAsync(RedisKey key, RedisValue? minId = null, RedisValue? maxId = null, int? count = null, Order messageOrder = Order.Ascending, CommandFlags flags = CommandFlags.None)
{
var msg = GetStreamRangeMessage(key, minId, maxId, count, messageOrder, flags);
var msg = GetStreamRangeMessage(key,
minId,
maxId,
count,
messageOrder,
flags);
return ExecuteAsync(msg, ResultProcessor.SingleStream);
}
public RedisStreamEntry[] StreamRead(RedisKey key, RedisValue afterId, int? count = null, CommandFlags flags = CommandFlags.None)
public StreamEntry[] StreamRead(RedisKey key, Position position, int? count = null, CommandFlags flags = CommandFlags.None)
{
var msg = GetSingleStreamReadMessage(key, afterId, count, flags);
var msg = GetSingleStreamReadMessage(key,
position.ResolveForCommand(RedisCommand.XREAD),
count,
flags);
return ExecuteSync(msg, ResultProcessor.SingleStreamWithNameSkip);
}
public Task<RedisStreamEntry[]> StreamReadAsync(RedisKey key, RedisValue afterId, int? count = null, CommandFlags flags = CommandFlags.None)
public Task<StreamEntry[]> StreamReadAsync(RedisKey key, Position position, int? count = null, CommandFlags flags = CommandFlags.None)
{
var msg = GetSingleStreamReadMessage(key, afterId, count, flags);
var msg = GetSingleStreamReadMessage(key,
position.ResolveForCommand(RedisCommand.XREAD),
count,
flags);
return ExecuteAsync(msg, ResultProcessor.SingleStreamWithNameSkip);
}
public RedisStream[] StreamRead(StreamIdPair[] streamIdPairs, int? countPerStream = null, CommandFlags flags = CommandFlags.None)
public RedisStream[] StreamRead(StreamPosition[] streamPositions, int? countPerStream = null, CommandFlags flags = CommandFlags.None)
{
var msg = GetMultiStreamReadMessage(streamIdPairs, countPerStream, flags);
var msg = GetMultiStreamReadMessage(streamPositions, countPerStream, flags);
return ExecuteSync(msg, ResultProcessor.MultiStream);
}
public Task<RedisStream[]> StreamReadAsync(StreamIdPair[] streamIdPairs, int? countPerStream = null, CommandFlags flags = CommandFlags.None)
public Task<RedisStream[]> StreamReadAsync(StreamPosition[] streamPositions, int? countPerStream = null, CommandFlags flags = CommandFlags.None)
{
var msg = GetMultiStreamReadMessage(streamIdPairs, countPerStream, flags);
var msg = GetMultiStreamReadMessage(streamPositions, countPerStream, flags);
return ExecuteAsync(msg, ResultProcessor.MultiStream);
}
public RedisStreamEntry[] StreamReadGroup(RedisKey key, RedisValue groupName, RedisValue consumerName, RedisValue? readFromId = null, int? count = null, CommandFlags flags = CommandFlags.None)
public StreamEntry[] StreamReadGroup(RedisKey key, RedisValue groupName, RedisValue consumerName, Position? position = null, int? count = null, CommandFlags flags = CommandFlags.None)
{
var msg = GetStreamReadGroupMessage(key, groupName, consumerName, readFromId, count, flags);
var actualPosition = position ?? Position.New;
var msg = GetStreamReadGroupMessage(key,
groupName,
consumerName,
actualPosition.ResolveForCommand(RedisCommand.XREADGROUP),
count,
flags);
return ExecuteSync(msg, ResultProcessor.SingleStreamWithNameSkip);
}
public Task<RedisStreamEntry[]> StreamReadGroupAsync(RedisKey key, RedisValue groupName, RedisValue consumerName, RedisValue? readFromId = null, int? count = null, CommandFlags flags = CommandFlags.None)
public Task<StreamEntry[]> StreamReadGroupAsync(RedisKey key, RedisValue groupName, RedisValue consumerName, Position? position = null, int? count = null, CommandFlags flags = CommandFlags.None)
{
var msg = GetStreamReadGroupMessage(key, groupName, consumerName, readFromId, count, flags);
var actualPosition = position ?? Position.New;
var msg = GetStreamReadGroupMessage(key,
groupName,
consumerName,
actualPosition.ResolveForCommand(RedisCommand.XREADGROUP),
count,
flags);
return ExecuteAsync(msg, ResultProcessor.SingleStreamWithNameSkip);
}
public RedisStream[] StreamReadGroup(StreamPosition[] streamPositions, RedisValue groupName, RedisValue consumerName, int? countPerStream = null, CommandFlags flags = CommandFlags.None)
{
var msg = GetMultiStreamReadGroupMessage(streamPositions, groupName, consumerName, countPerStream, flags);
return ExecuteSync(msg, ResultProcessor.MultiStream);
}
public Task<RedisStream[]> StreamReadGroupAsync(StreamPosition[] streamPositions, RedisValue groupName, RedisValue consumerName, int? countPerStream = null, CommandFlags flags = CommandFlags.None)
{
var msg = GetMultiStreamReadGroupMessage(streamPositions, groupName, consumerName, countPerStream, flags);
return ExecuteAsync(msg, ResultProcessor.MultiStream);
}
public long StreamTrim(RedisKey key, int maxLength, bool useApproximateMaxLength = false, CommandFlags flags = CommandFlags.None)
{
var msg = GetStreamTrimMessage(key, maxLength, useApproximateMaxLength, flags);
......@@ -2264,12 +2424,55 @@ private RedisValue GetLexRange(RedisValue value, Exclude exclude, bool isStart)
return result;
}
private Message GetMultiStreamReadMessage(StreamIdPair[] streamIdPairs, int? countPerStream, CommandFlags flags)
private Message GetMultiStreamReadGroupMessage(StreamPosition[] streamPositions, RedisValue groupName, RedisValue consumerName, int? countPerStream, CommandFlags flags)
{
// Example: XREADGROUP GROUP groupName consumerName COUNT countPerStream STREAMS stream1 stream2 id1 id2
if (streamPositions == null) throw new ArgumentNullException(nameof(streamPositions));
if (streamPositions.Length == 0) throw new ArgumentOutOfRangeException(nameof(streamPositions), "streamOffsetPairs must contain at least one item.");
if (countPerStream.HasValue && countPerStream <= 0)
{
throw new ArgumentOutOfRangeException(nameof(countPerStream), "countPerStream must be greater than 0.");
}
var values = new RedisValue[
4 // Room for GROUP groupName consumerName & STREAMS
+ (streamPositions.Length * 2) // Enough room for the stream keys and associated IDs.
+ (countPerStream.HasValue ? 2 : 0)]; // Room for "COUNT num" or 0 if countPerStream is null.
var offset = 0;
values[offset++] = StreamConstants.Group;
values[offset++] = groupName;
values[offset++] = consumerName;
if (countPerStream.HasValue)
{
values[offset++] = StreamConstants.Count;
values[offset++] = countPerStream;
}
values[offset++] = StreamConstants.Streams;
var pairCount = streamPositions.Length;
for (var i = 0; i < pairCount; i++)
{
values[offset] = streamPositions[i].Key.AsRedisValue();
values[offset + pairCount] = streamPositions[i].Position.ResolveForCommand(RedisCommand.XREADGROUP);
offset++;
}
return Message.Create(Database, flags, RedisCommand.XREADGROUP, values);
}
private Message GetMultiStreamReadMessage(StreamPosition[] streamPositions, int? countPerStream, CommandFlags flags)
{
// Example: XREAD COUNT 2 STREAMS mystream writers 0-0 0-0
if (streamIdPairs == null) throw new ArgumentNullException(nameof(streamIdPairs));
if (streamIdPairs.Length == 0) throw new ArgumentOutOfRangeException(nameof(streamIdPairs), "streamAndIdPairs must contain at least one item.");
if (streamPositions == null) throw new ArgumentNullException(nameof(streamPositions));
if (streamPositions.Length == 0) throw new ArgumentOutOfRangeException(nameof(streamPositions), "streamOffsetPairs must contain at least one item.");
if (countPerStream.HasValue && countPerStream <= 0)
{
......@@ -2278,7 +2481,7 @@ private Message GetMultiStreamReadMessage(StreamIdPair[] streamIdPairs, int? cou
var values = new RedisValue[
1 // Streams keyword.
+ (streamIdPairs.Length * 2) // Room for the stream names and the ID from which to begin reading.
+ (streamPositions.Length * 2) // Room for the stream names and the ID after which to begin reading.
+ (countPerStream.HasValue ? 2 : 0)]; // Room for "COUNT num" or 0 if countPerStream is null.
var offset = 0;
......@@ -2307,12 +2510,12 @@ private Message GetMultiStreamReadMessage(StreamIdPair[] streamIdPairs, int? cou
*
* */
var pairCount = streamIdPairs.Length;
var pairCount = streamPositions.Length;
for (var i = 0; i < pairCount; i++)
{
values[offset] = streamIdPairs[i].Key.AsRedisValue();
values[offset + pairCount] = streamIdPairs[i].Id;
values[offset] = streamPositions[i].Key.AsRedisValue();
values[offset + pairCount] = streamPositions[i].Position.ResolveForCommand(RedisCommand.XREAD);
offset++;
}
......@@ -2756,7 +2959,7 @@ private Message GetStreamRangeMessage(RedisKey key, RedisValue? minId, RedisValu
values);
}
private Message GetStreamReadGroupMessage(RedisKey key, RedisValue groupName, RedisValue consumerName, RedisValue? readFromId, int? count, CommandFlags flags)
private Message GetStreamReadGroupMessage(RedisKey key, RedisValue groupName, RedisValue consumerName, RedisValue afterId, int? count, CommandFlags flags)
{
// Example: > XREADGROUP GROUP mygroup Alice COUNT 1 STREAMS mystream >
if (count.HasValue && count <= 0)
......@@ -2781,7 +2984,7 @@ private Message GetStreamReadGroupMessage(RedisKey key, RedisValue groupName, Re
values[offset++] = StreamConstants.Streams;
values[offset++] = key.AsRedisValue();
values[offset] = readFromId ?? StreamConstants.UndeliveredMessages;
values[offset] = afterId;
return Message.Create(Database,
flags,
......
......@@ -5,7 +5,7 @@
/// </summary>
public struct RedisStream
{
internal RedisStream(RedisKey key, RedisStreamEntry[] entries)
internal RedisStream(RedisKey key, StreamEntry[] entries)
{
Key = key;
Entries = entries;
......@@ -19,6 +19,6 @@ internal RedisStream(RedisKey key, RedisStreamEntry[] entries)
/// <summary>
/// An arry of entries contained within the stream.
/// </summary>
public RedisStreamEntry[] Entries { get; }
public StreamEntry[] Entries { get; }
}
}
......@@ -1325,7 +1325,7 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes
}
}
internal sealed class SingleStreamProcessor : StreamProcessorBase<RedisStreamEntry[]>
internal sealed class SingleStreamProcessor : StreamProcessorBase<StreamEntry[]>
{
private bool skipStreamName;
......@@ -1339,7 +1339,7 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes
if (result.IsNull)
{
// Server returns 'nil' if no entries are returned for the given stream.
SetResult(message, new RedisStreamEntry[0]);
SetResult(message, new StreamEntry[0]);
return true;
}
......@@ -1348,7 +1348,7 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes
return false;
}
RedisStreamEntry[] entries = null;
StreamEntry[] entries = null;
if (skipStreamName)
{
......@@ -1667,7 +1667,7 @@ internal abstract class StreamProcessorBase<T> : ResultProcessor<T>
{
// For command response formats see https://redis.io/topics/streams-intro.
protected RedisStreamEntry[] ParseRedisStreamEntries(RawResult result)
protected StreamEntry[] ParseRedisStreamEntries(RawResult result)
{
if (result.Type != ResultType.MultiBulk)
{
......@@ -1680,7 +1680,7 @@ protected RedisStreamEntry[] ParseRedisStreamEntries(RawResult result)
{
if (item.IsNull || item.Type != ResultType.MultiBulk)
{
return RedisStreamEntry.Null;
return StreamEntry.Null;
}
// Process the Multibulk array for each entry. The entry contains the following elements:
......@@ -1688,7 +1688,7 @@ protected RedisStreamEntry[] ParseRedisStreamEntries(RawResult result)
// [1] = Multibulk array of the name/value pairs of the stream entry's data
var entryDetails = item.GetItems();
return new RedisStreamEntry(id: entryDetails[0].AsRedisValue(),
return new StreamEntry(id: entryDetails[0].AsRedisValue(),
values: ParseStreamEntryValues(entryDetails[1]));
});
}
......
......@@ -42,12 +42,18 @@ internal static class StreamConstants
internal static readonly RedisValue Create = "CREATE";
internal static readonly RedisValue DeleteConsumer = "DELCONSUMER";
internal static readonly RedisValue Destroy = "DESTROY";
internal static readonly RedisValue Group = "GROUP";
internal static readonly RedisValue Groups = "GROUPS";
internal static readonly RedisValue JustId = "JUSTID";
internal static readonly RedisValue SetId = "SETID";
internal static readonly RedisValue MaxLen = "MAXLEN";
internal static readonly RedisValue Stream = "STREAM";
......
......@@ -3,9 +3,9 @@
/// <summary>
/// Describes an entry contained in a Redis Stream.
/// </summary>
public struct RedisStreamEntry
public struct StreamEntry
{
internal RedisStreamEntry(RedisValue id, NameValueEntry[] values)
internal StreamEntry(RedisValue id, NameValueEntry[] values)
{
Id = id;
Values = values;
......@@ -14,7 +14,7 @@ internal RedisStreamEntry(RedisValue id, NameValueEntry[] values)
/// <summary>
/// A null stream entry.
/// </summary>
public static RedisStreamEntry Null { get; } = new RedisStreamEntry(RedisValue.Null, null);
public static StreamEntry Null { get; } = new StreamEntry(RedisValue.Null, null);
/// <summary>
/// The ID assigned to the message.
......
......@@ -10,8 +10,8 @@ public struct StreamInfo
int radixTreeKeys,
int radixTreeNodes,
int groups,
RedisStreamEntry firstEntry,
RedisStreamEntry lastEntry)
StreamEntry firstEntry,
StreamEntry lastEntry)
{
Length = length;
RadixTreeKeys = radixTreeKeys;
......@@ -44,11 +44,11 @@ public struct StreamInfo
/// <summary>
/// The first entry in the stream.
/// </summary>
public RedisStreamEntry FirstEntry { get; }
public StreamEntry FirstEntry { get; }
/// <summary>
/// The last entry in the stream.
/// </summary>
public RedisStreamEntry LastEntry { get; }
public StreamEntry LastEntry { get; }
}
}

namespace StackExchange.Redis
namespace StackExchange.Redis
{
/// <summary>
/// Describes a pair consisting of the Stream Key and the ID from which to read.
/// Describes a pair consisting of the Stream Key and the <see cref="Position"/> from which to begin reading a stream.
/// </summary>
/// <remarks><see cref="IDatabase.StreamRead(StreamIdPair[], int?, CommandFlags)"/></remarks>
public struct StreamIdPair
public struct StreamPosition
{
/// <summary>
/// Initializes a <see cref="StreamIdPair"/> value.
/// Initializes a <see cref="StreamPosition"/> value.
/// </summary>
/// <param name="key">The key for the stream.</param>
/// <param name="id">The ID from which to begin reading the stream.</param>
public StreamIdPair(RedisKey key, RedisValue id)
/// <param name="position">The position from which to begin reading the stream.</param>
public StreamPosition(RedisKey key, Position position)
{
Key = key;
Id = id;
Position = position;
}
/// <summary>
/// The key for the stream.
/// The stream key.
/// </summary>
public RedisKey Key { get; }
/// <summary>
/// The ID from which to begin reading the stream.
/// The offset at which to begin reading the stream.
/// </summary>
public RedisValue Id { get; }
/// <summary>
/// See Object.ToString()
/// </summary>
public override string ToString() => $"{Key}: {Id}";
public Position Position { get; }
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment