Commit e8f6b512 authored by Nick Craver's avatar Nick Craver

Merge branch 'master' into pipelines

parents 0c2e6ddc ea85ae80
...@@ -838,6 +838,13 @@ public void StreamClaimMessagesReturningIds() ...@@ -838,6 +838,13 @@ public void StreamClaimMessagesReturningIds()
mock.Verify(_ => _.StreamClaimIdsOnly("prefix:key", "group", "consumer", 1000, messageIds, CommandFlags.HighPriority)); mock.Verify(_ => _.StreamClaimIdsOnly("prefix:key", "group", "consumer", 1000, messageIds, CommandFlags.HighPriority));
} }
[Fact]
public void StreamConsumerGroupSetPosition()
{
wrapper.StreamConsumerGroupSetPosition("key", "group", Position.Beginning, CommandFlags.HighPriority);
mock.Verify(_ => _.StreamConsumerGroupSetPosition("prefix:key", "group", Position.Beginning, CommandFlags.HighPriority));
}
[Fact] [Fact]
public void StreamConsumerInfoGet() public void StreamConsumerInfoGet()
{ {
...@@ -848,8 +855,8 @@ public void StreamConsumerInfoGet() ...@@ -848,8 +855,8 @@ public void StreamConsumerInfoGet()
[Fact] [Fact]
public void StreamCreateConsumerGroup() public void StreamCreateConsumerGroup()
{ {
wrapper.StreamCreateConsumerGroup("key", "group", "0-0", CommandFlags.HighPriority); wrapper.StreamCreateConsumerGroup("key", "group", Position.Beginning, CommandFlags.HighPriority);
mock.Verify(_ => _.StreamCreateConsumerGroup("prefix:key", "group", "0-0", CommandFlags.HighPriority)); mock.Verify(_ => _.StreamCreateConsumerGroup("prefix:key", "group", Position.Beginning, CommandFlags.HighPriority));
} }
[Fact] [Fact]
...@@ -881,6 +888,20 @@ public void StreamMessagesDelete() ...@@ -881,6 +888,20 @@ public void StreamMessagesDelete()
mock.Verify(_ => _.StreamDelete("prefix:key", messageIds, CommandFlags.HighPriority)); mock.Verify(_ => _.StreamDelete("prefix:key", messageIds, CommandFlags.HighPriority));
} }
[Fact]
public void StreamDeleteConsumer()
{
wrapper.StreamDeleteConsumer("key", "group", "consumer", CommandFlags.HighPriority);
mock.Verify(_ => _.StreamDeleteConsumer("prefix:key", "group", "consumer", CommandFlags.HighPriority));
}
[Fact]
public void StreamDeleteConsumerGroup()
{
wrapper.StreamDeleteConsumerGroup("key", "group", CommandFlags.HighPriority);
mock.Verify(_ => _.StreamDeleteConsumerGroup("prefix:key", "group", CommandFlags.HighPriority));
}
[Fact] [Fact]
public void StreamPendingInfoGet() public void StreamPendingInfoGet()
{ {
...@@ -891,37 +912,45 @@ public void StreamPendingInfoGet() ...@@ -891,37 +912,45 @@ public void StreamPendingInfoGet()
[Fact] [Fact]
public void StreamPendingMessageInfoGet() public void StreamPendingMessageInfoGet()
{ {
wrapper.StreamPendingMessages("key", "group", 10, RedisValue.Null, null, null, CommandFlags.HighPriority); wrapper.StreamPendingMessages("key", "group", 10, RedisValue.Null, "-", "+", CommandFlags.HighPriority);
mock.Verify(_ => _.StreamPendingMessages("prefix:key", "group", 10, RedisValue.Null, null, null, CommandFlags.HighPriority)); mock.Verify(_ => _.StreamPendingMessages("prefix:key", "group", 10, RedisValue.Null, "-", "+", CommandFlags.HighPriority));
} }
[Fact] [Fact]
public void StreamRange() public void StreamRange()
{ {
wrapper.StreamRange("key", "-", "+", null, Order.Ascending, CommandFlags.HighPriority); wrapper.StreamRange("key", "-", "+", null, Order.Ascending, CommandFlags.HighPriority);
mock.Verify(_ => _.StreamRange("prefix:key", "-", "+",null, Order.Ascending, CommandFlags.HighPriority)); mock.Verify(_ => _.StreamRange("prefix:key", "-", "+", null, Order.Ascending, CommandFlags.HighPriority));
} }
[Fact] [Fact]
public void StreamRead_1() public void StreamRead_1()
{ {
var keysAndIds = new StreamIdPair[0] { }; var streamPositions = new StreamPosition[0] { };
wrapper.StreamRead(keysAndIds, null, CommandFlags.HighPriority); wrapper.StreamRead(streamPositions, null, CommandFlags.HighPriority);
mock.Verify(_ => _.StreamRead(keysAndIds, null, CommandFlags.HighPriority)); mock.Verify(_ => _.StreamRead(streamPositions, null, CommandFlags.HighPriority));
} }
[Fact] [Fact]
public void StreamRead_2() public void StreamRead_2()
{ {
wrapper.StreamRead("key", "0-0", null, CommandFlags.HighPriority); wrapper.StreamRead("key", new Position("0-0"), null, CommandFlags.HighPriority);
mock.Verify(_ => _.StreamRead("prefix:key", "0-0", null, CommandFlags.HighPriority)); mock.Verify(_ => _.StreamRead("prefix:key", new Position("0-0"), null, CommandFlags.HighPriority));
}
[Fact]
public void StreamStreamReadGroup_1()
{
wrapper.StreamReadGroup("key", "group", "consumer", new Position("0-0"), 10, CommandFlags.HighPriority);
mock.Verify(_ => _.StreamReadGroup("prefix:key", "group", "consumer", new Position("0-0"), 10, CommandFlags.HighPriority));
} }
[Fact] [Fact]
public void StreamStreamReadGroup() public void StreamStreamReadGroup_2()
{ {
wrapper.StreamReadGroup("key", "group", "consumer", "0-0", 10, CommandFlags.HighPriority); var streamPositions = new StreamPosition[0] { };
mock.Verify(_ => _.StreamReadGroup("prefix:key", "group", "consumer", "0-0", 10, CommandFlags.HighPriority)); wrapper.StreamReadGroup(streamPositions, "group", "consumer", 10, CommandFlags.HighPriority);
mock.Verify(_ => _.StreamReadGroup(streamPositions, "group", "consumer", 10, CommandFlags.HighPriority));
} }
[Fact] [Fact]
......
...@@ -88,6 +88,8 @@ public void CheckDatabaseMethodsUseKeys(Type type) ...@@ -88,6 +88,8 @@ public void CheckDatabaseMethodsUseKeys(Type type)
case nameof(IDatabaseAsync.ScriptEvaluateAsync): case nameof(IDatabaseAsync.ScriptEvaluateAsync):
case nameof(IDatabase.StreamRead): case nameof(IDatabase.StreamRead):
case nameof(IDatabase.StreamReadAsync): case nameof(IDatabase.StreamReadAsync):
case nameof(IDatabase.StreamReadGroup):
case nameof(IDatabase.StreamReadGroupAsync):
continue; // they're fine, but don't want to widen check to return type continue; // they're fine, but don't want to widen check to return type
} }
...@@ -99,7 +101,6 @@ public void CheckDatabaseMethodsUseKeys(Type type) ...@@ -99,7 +101,6 @@ public void CheckDatabaseMethodsUseKeys(Type type)
private static bool UsesKey(Type type) private static bool UsesKey(Type type)
{ {
if (type == typeof(RedisKey)) return true; if (type == typeof(RedisKey)) return true;
if (type == typeof(StreamIdPair)) return true;
if (type.IsArray) if (type.IsArray)
{ {
......
...@@ -117,6 +117,41 @@ public void StreamAddMultipleValuePairsWithManualId() ...@@ -117,6 +117,41 @@ public void StreamAddMultipleValuePairsWithManualId()
} }
} }
[Fact]
public void StreamConsumerGroupSetId()
{
var key = GetUniqueKey("group_set_id");
var groupName = "test_group";
var consumer = "consumer";
using (var conn = Create())
{
Skip.IfMissingFeature(conn, nameof(RedisFeatures.Streams), r => r.Streams);
var db = conn.GetDatabase();
// Create a stream
db.StreamAdd(key, "field1", "value1");
db.StreamAdd(key, "field2", "value2");
// Create a group and set the position to deliver new messages only.
db.StreamCreateConsumerGroup(key, groupName, Position.New);
// Read into the group, expect nothing
var firstRead = db.StreamReadGroup(key, groupName, consumer, Position.New);
// Reset the ID back to read from the beginning.
db.StreamConsumerGroupSetPosition(key, groupName, Position.Beginning);
var secondRead = db.StreamReadGroup(key, groupName, consumer, Position.New);
Assert.NotNull(firstRead);
Assert.NotNull(secondRead);
Assert.True(firstRead.Length == 0);
Assert.True(secondRead.Length == 2);
}
}
[Fact] [Fact]
public void StreamConsumerGroupWithNoConsumers() public void StreamConsumerGroupWithNoConsumers()
{ {
...@@ -133,7 +168,7 @@ public void StreamConsumerGroupWithNoConsumers() ...@@ -133,7 +168,7 @@ public void StreamConsumerGroupWithNoConsumers()
db.StreamAdd(key, "field1", "value1"); db.StreamAdd(key, "field1", "value1");
// Create a group // Create a group
db.StreamCreateConsumerGroup(key, groupName, "0-0"); db.StreamCreateConsumerGroup(key, groupName, new Position("0-0"));
// Query redis for the group consumers, expect an empty list in response. // Query redis for the group consumers, expect an empty list in response.
var consumers = db.StreamConsumerInfo(key, groupName); var consumers = db.StreamConsumerInfo(key, groupName);
...@@ -158,7 +193,7 @@ public void StreamCreateConsumerGroup() ...@@ -158,7 +193,7 @@ public void StreamCreateConsumerGroup()
db.StreamAdd(key, "field1", "value1"); db.StreamAdd(key, "field1", "value1");
// Create a group // Create a group
var result = db.StreamCreateConsumerGroup(key, groupName, "-"); var result = db.StreamCreateConsumerGroup(key, groupName, Position.Beginning);
Assert.True(result); Assert.True(result);
} }
...@@ -184,7 +219,7 @@ public void StreamConsumerGroupReadOnlyNewMessagesWithEmptyResponse() ...@@ -184,7 +219,7 @@ public void StreamConsumerGroupReadOnlyNewMessagesWithEmptyResponse()
db.StreamCreateConsumerGroup(key, groupName); db.StreamCreateConsumerGroup(key, groupName);
// Read, expect no messages // Read, expect no messages
var entries = db.StreamReadGroup(key, groupName, "test_consumer", "0-0"); var entries = db.StreamReadGroup(key, groupName, "test_consumer", new Position("0-0"));
Assert.True(entries.Length == 0); Assert.True(entries.Length == 0);
} }
...@@ -205,9 +240,9 @@ public void StreamConsumerGroupReadFromStreamBeginning() ...@@ -205,9 +240,9 @@ public void StreamConsumerGroupReadFromStreamBeginning()
var id1 = db.StreamAdd(key, "field1", "value1"); var id1 = db.StreamAdd(key, "field1", "value1");
var id2 = db.StreamAdd(key, "field2", "value2"); var id2 = db.StreamAdd(key, "field2", "value2");
db.StreamCreateConsumerGroup(key, groupName, "-"); db.StreamCreateConsumerGroup(key, groupName, Position.Beginning);
var entries = db.StreamReadGroup(key, groupName, "test_consumer", "0-0"); var entries = db.StreamReadGroup(key, groupName, "test_consumer", new Position("0-0"));
Assert.True(entries.Length == 2); Assert.True(entries.Length == 2);
Assert.True(id1 == entries[0].Id); Assert.True(id1 == entries[0].Id);
...@@ -232,10 +267,10 @@ public void StreamConsumerGroupReadFromStreamBeginningWithCount() ...@@ -232,10 +267,10 @@ public void StreamConsumerGroupReadFromStreamBeginningWithCount()
var id3 = db.StreamAdd(key, "field3", "value3"); var id3 = db.StreamAdd(key, "field3", "value3");
var id4 = db.StreamAdd(key, "field4", "value4"); var id4 = db.StreamAdd(key, "field4", "value4");
db.StreamCreateConsumerGroup(key, groupName, "-");
// Start reading after id1. // Start reading after id1.
var entries = db.StreamReadGroup(key, groupName, "test_consumer", id1, 2); db.StreamCreateConsumerGroup(key, groupName, new Position(id1));
var entries = db.StreamReadGroup(key, groupName, "test_consumer", Position.New, 2);
// Ensure we only received the requested count and that the IDs match the expected values. // Ensure we only received the requested count and that the IDs match the expected values.
Assert.True(entries.Length == 2); Assert.True(entries.Length == 2);
...@@ -262,10 +297,10 @@ public void StreamConsumerGroupAcknowledgeMessage() ...@@ -262,10 +297,10 @@ public void StreamConsumerGroupAcknowledgeMessage()
var id3 = db.StreamAdd(key, "field3", "value3"); var id3 = db.StreamAdd(key, "field3", "value3");
var id4 = db.StreamAdd(key, "field4", "value4"); var id4 = db.StreamAdd(key, "field4", "value4");
db.StreamCreateConsumerGroup(key, groupName, "-"); db.StreamCreateConsumerGroup(key, groupName, Position.Beginning);
// Read all 4 messages, they will be assigned to the consumer // Read all 4 messages, they will be assigned to the consumer
var entries = db.StreamReadGroup(key, groupName, consumer, "0-0"); var entries = db.StreamReadGroup(key, groupName, consumer, new Position("0-0"));
// Send XACK for 3 of the messages // Send XACK for 3 of the messages
...@@ -276,7 +311,7 @@ public void StreamConsumerGroupAcknowledgeMessage() ...@@ -276,7 +311,7 @@ public void StreamConsumerGroupAcknowledgeMessage()
var twoAck = db.StreamAcknowledge(key, groupName, new RedisValue[] { id3, id4 }); var twoAck = db.StreamAcknowledge(key, groupName, new RedisValue[] { id3, id4 });
// Read the group again, it should only return the unacknowledged message. // Read the group again, it should only return the unacknowledged message.
var notAcknowledged = db.StreamReadGroup(key, groupName, consumer, "0-0"); var notAcknowledged = db.StreamReadGroup(key, groupName, consumer, new Position("0-0"));
Assert.True(entries.Length == 4); Assert.True(entries.Length == 4);
Assert.Equal(1, oneAck); Assert.Equal(1, oneAck);
...@@ -305,7 +340,7 @@ public void StreamConsumerGroupClaimMessages() ...@@ -305,7 +340,7 @@ public void StreamConsumerGroupClaimMessages()
var id3 = db.StreamAdd(key, "field3", "value3"); var id3 = db.StreamAdd(key, "field3", "value3");
var id4 = db.StreamAdd(key, "field4", "value4"); var id4 = db.StreamAdd(key, "field4", "value4");
db.StreamCreateConsumerGroup(key, groupName, "0-0"); db.StreamCreateConsumerGroup(key, groupName, new Position("0-0"));
// Read a single message into the first consumer. // Read a single message into the first consumer.
db.StreamReadGroup(key, groupName, consumer1, count: 1); db.StreamReadGroup(key, groupName, consumer1, count: 1);
...@@ -356,10 +391,10 @@ public void StreamConsumerGroupClaimMessagesReturningIds() ...@@ -356,10 +391,10 @@ public void StreamConsumerGroupClaimMessagesReturningIds()
var id3 = db.StreamAdd(key, "field3", "value3"); var id3 = db.StreamAdd(key, "field3", "value3");
var id4 = db.StreamAdd(key, "field4", "value4"); var id4 = db.StreamAdd(key, "field4", "value4");
db.StreamCreateConsumerGroup(key, groupName, "-"); db.StreamCreateConsumerGroup(key, groupName, Position.Beginning);
// Read a single message into the first consumer. // Read a single message into the first consumer.
var consumer1Messages = db.StreamReadGroup(key, groupName, consumer1, "-", 1); var consumer1Messages = db.StreamReadGroup(key, groupName, consumer1, Position.Beginning, 1);
// Read the remaining messages into the second consumer. // Read the remaining messages into the second consumer.
var consumer2Messages = db.StreamReadGroup(key, groupName, consumer2); var consumer2Messages = db.StreamReadGroup(key, groupName, consumer2);
...@@ -386,6 +421,175 @@ public void StreamConsumerGroupClaimMessagesReturningIds() ...@@ -386,6 +421,175 @@ public void StreamConsumerGroupClaimMessagesReturningIds()
} }
} }
[Fact]
public void StreamConsumerGroupReadMultipleOneReadBeginningOneReadNew()
{
// Create a group for each stream. One set to read from the beginning of the
// stream and the other to begin reading only new messages.
// Ask redis to read from the beginning of both stream, expect messages
// for only the stream set to read from the beginning.
var groupName = "test_group";
var stream1 = GetUniqueKey("stream1");
var stream2 = GetUniqueKey("stream2");
using (var conn = Create())
{
Skip.IfMissingFeature(conn, nameof(RedisFeatures.Streams), r => r.Streams);
var db = conn.GetDatabase();
db.StreamAdd(stream1, "field1-1", "value1-1");
db.StreamAdd(stream1, "field1-2", "value1-2");
db.StreamAdd(stream2, "field2-1", "value2-1");
db.StreamAdd(stream2, "field2-2", "value2-2");
db.StreamAdd(stream2, "field2-3", "value2-3");
// stream1 set up to read only new messages.
db.StreamCreateConsumerGroup(stream1, groupName);
// stream2 set up to read from the beginning of the stream
db.StreamCreateConsumerGroup(stream2, groupName, Position.Beginning);
// Read for both streams from the beginning. We shouldn't get anything back for stream1.
var pairs = new StreamPosition[]
{
new StreamPosition(stream1, Position.Beginning),
new StreamPosition(stream2, Position.Beginning)
};
var streams = db.StreamReadGroup(pairs, groupName, "test_consumer");
Assert.NotNull(streams);
Assert.True(streams.Length == 2);
Assert.True(streams[0].Entries.Length == 0);
Assert.True(streams[1].Entries.Length == 3);
}
}
[Fact]
public void StreamConsumerGroupReadMultipleOnlyNewMessagesExpectNoResult()
{
var groupName = "test_group";
var stream1 = GetUniqueKey("stream1");
var stream2 = GetUniqueKey("stream2");
using (var conn = Create())
{
Skip.IfMissingFeature(conn, nameof(RedisFeatures.Streams), r => r.Streams);
var db = conn.GetDatabase();
db.StreamAdd(stream1, "field1-1", "value1-1");
db.StreamAdd(stream2, "field2-1", "value2-1");
// set both streams to read only new messages (default behavior).
db.StreamCreateConsumerGroup(stream1, groupName);
db.StreamCreateConsumerGroup(stream2, groupName);
// We shouldn't get anything for either stream.
var pairs = new StreamPosition[]
{
new StreamPosition(stream1, Position.Beginning),
new StreamPosition(stream2, Position.Beginning)
};
var streams = db.StreamReadGroup(pairs, groupName, "test_consumer");
Assert.NotNull(streams);
Assert.True(streams.Length == 2);
Assert.True(streams[0].Entries.Length == 0);
Assert.True(streams[1].Entries.Length == 0);
}
}
[Fact]
public void StreamConsumerGroupReadMultipleOnlyNewMessagesExpect1Result()
{
var groupName = "test_group";
var stream1 = GetUniqueKey("stream1");
var stream2 = GetUniqueKey("stream2");
using (var conn = Create())
{
Skip.IfMissingFeature(conn, nameof(RedisFeatures.Streams), r => r.Streams);
var db = conn.GetDatabase();
// These messages won't be read.
db.StreamAdd(stream1, "field1-1", "value1-1");
db.StreamAdd(stream2, "field2-1", "value2-1");
// set both streams to read only new messages (default behavior).
db.StreamCreateConsumerGroup(stream1, groupName);
db.StreamCreateConsumerGroup(stream2, groupName);
// We should read these though.
var id1 = db.StreamAdd(stream1, "field1-2", "value1-2");
var id2 = db.StreamAdd(stream2, "field2-2", "value2-2");
// Read the new messages (messages created after the group was created).
var pairs = new StreamPosition[]
{
new StreamPosition(stream1, Position.New),
new StreamPosition(stream2, Position.New)
};
var streams = db.StreamReadGroup(pairs, groupName, "test_consumer");
Assert.NotNull(streams);
Assert.True(streams.Length == 2);
Assert.True(streams[0].Entries.Length == 1);
Assert.True(streams[1].Entries.Length == 1);
Assert.Equal(id1, streams[0].Entries[0].Id);
Assert.Equal(id2, streams[1].Entries[0].Id);
}
}
[Fact]
public void StreamConsumerGroupReadMultipleRestrictCount()
{
var groupName = "test_group";
var stream1 = GetUniqueKey("stream1");
var stream2 = GetUniqueKey("stream2");
using (var conn = Create())
{
Skip.IfMissingFeature(conn, nameof(RedisFeatures.Streams), r => r.Streams);
var db = conn.GetDatabase();
var id1_1 = db.StreamAdd(stream1, "field1-1", "value1-1");
var id1_2 = db.StreamAdd(stream1, "field1-2", "value1-2");
var id2_1 = db.StreamAdd(stream2, "field2-1", "value2-1");
var id2_2 = db.StreamAdd(stream2, "field2-2", "value2-2");
var id2_3 = db.StreamAdd(stream2, "field2-3", "value2-3");
// Allow reading from the beginning in both streams
db.StreamCreateConsumerGroup(stream1, groupName, Position.Beginning);
db.StreamCreateConsumerGroup(stream2, groupName, Position.Beginning);
var pairs = new StreamPosition[]
{
// Read after the first id in both streams
new StreamPosition(stream1, new Position(id1_1)),
new StreamPosition(stream2, new Position(id2_1))
};
// Restrict the count to 2 (expect only 1 message from first stream, 2 from the second).
var streams = db.StreamReadGroup(pairs, groupName, "test_consumer", 2);
Assert.NotNull(streams);
Assert.True(streams.Length == 2);
Assert.True(streams[0].Entries.Length == 1);
Assert.True(streams[1].Entries.Length == 2);
Assert.Equal(id1_2, streams[0].Entries[0].Id);
}
}
[Fact] [Fact]
public void StreamConsumerGroupViewPendingInfoNoConsumers() public void StreamConsumerGroupViewPendingInfoNoConsumers()
{ {
...@@ -400,7 +604,7 @@ public void StreamConsumerGroupViewPendingInfoNoConsumers() ...@@ -400,7 +604,7 @@ public void StreamConsumerGroupViewPendingInfoNoConsumers()
var id1 = db.StreamAdd(key, "field1", "value1"); var id1 = db.StreamAdd(key, "field1", "value1");
db.StreamCreateConsumerGroup(key, groupName, "-"); db.StreamCreateConsumerGroup(key, groupName, Position.Beginning);
var pendingInfo = db.StreamPending(key, groupName); var pendingInfo = db.StreamPending(key, groupName);
...@@ -426,7 +630,7 @@ public void StreamConsumerGroupViewPendingInfoWhenNothingPending() ...@@ -426,7 +630,7 @@ public void StreamConsumerGroupViewPendingInfoWhenNothingPending()
var id1 = db.StreamAdd(key, "field1", "value1"); var id1 = db.StreamAdd(key, "field1", "value1");
db.StreamCreateConsumerGroup(key, groupName, "0-0"); db.StreamCreateConsumerGroup(key, groupName, new Position("0-0"));
var pendingMessages = db.StreamPendingMessages(key, var pendingMessages = db.StreamPendingMessages(key,
groupName, groupName,
...@@ -457,10 +661,10 @@ public void StreamConsumerGroupViewPendingInfoSummary() ...@@ -457,10 +661,10 @@ public void StreamConsumerGroupViewPendingInfoSummary()
var id3 = db.StreamAdd(key, "field3", "value3"); var id3 = db.StreamAdd(key, "field3", "value3");
var id4 = db.StreamAdd(key, "field4", "value4"); var id4 = db.StreamAdd(key, "field4", "value4");
db.StreamCreateConsumerGroup(key, groupName, "-"); db.StreamCreateConsumerGroup(key, groupName, Position.Beginning);
// Read a single message into the first consumer. // Read a single message into the first consumer.
var consumer1Messages = db.StreamReadGroup(key, groupName, consumer1, "-", 1); var consumer1Messages = db.StreamReadGroup(key, groupName, consumer1, Position.Beginning, 1);
// Read the remaining messages into the second consumer. // Read the remaining messages into the second consumer.
var consumer2Messages = db.StreamReadGroup(key, groupName, consumer2); var consumer2Messages = db.StreamReadGroup(key, groupName, consumer2);
...@@ -499,7 +703,7 @@ public async Task StreamConsumerGroupViewPendingMessageInfo() ...@@ -499,7 +703,7 @@ public async Task StreamConsumerGroupViewPendingMessageInfo()
var id3 = db.StreamAdd(key, "field3", "value3"); var id3 = db.StreamAdd(key, "field3", "value3");
var id4 = db.StreamAdd(key, "field4", "value4"); var id4 = db.StreamAdd(key, "field4", "value4");
db.StreamCreateConsumerGroup(key, groupName, "-"); db.StreamCreateConsumerGroup(key, groupName, Position.Beginning);
// Read a single message into the first consumer. // Read a single message into the first consumer.
var consumer1Messages = db.StreamReadGroup(key, groupName, consumer1, count: 1); var consumer1Messages = db.StreamReadGroup(key, groupName, consumer1, count: 1);
...@@ -540,7 +744,7 @@ public void StreamConsumerGroupViewPendingMessageInfoForConsumer() ...@@ -540,7 +744,7 @@ public void StreamConsumerGroupViewPendingMessageInfoForConsumer()
var id3 = db.StreamAdd(key, "field3", "value3"); var id3 = db.StreamAdd(key, "field3", "value3");
var id4 = db.StreamAdd(key, "field4", "value4"); var id4 = db.StreamAdd(key, "field4", "value4");
db.StreamCreateConsumerGroup(key, groupName, "-"); db.StreamCreateConsumerGroup(key, groupName, Position.Beginning);
// Read a single message into the first consumer. // Read a single message into the first consumer.
var consumer1Messages = db.StreamReadGroup(key, groupName, consumer1, count: 1); var consumer1Messages = db.StreamReadGroup(key, groupName, consumer1, count: 1);
...@@ -559,6 +763,74 @@ public void StreamConsumerGroupViewPendingMessageInfoForConsumer() ...@@ -559,6 +763,74 @@ public void StreamConsumerGroupViewPendingMessageInfoForConsumer()
} }
} }
[Fact]
public void StreamDeleteConsumer()
{
var key = GetUniqueKey("delete_consumer_group");
var groupName = "test_group";
var consumer = "test_consumer";
using (var conn = Create())
{
Skip.IfMissingFeature(conn, nameof(RedisFeatures.Streams), r => r.Streams);
var db = conn.GetDatabase();
// Add a message to create the stream.
db.StreamAdd(key, "field1", "value1");
db.StreamAdd(key, "fiedl2", "value2");
// Create a consumer group and read the message.
db.StreamCreateConsumerGroup(key, groupName, Position.Beginning);
db.StreamReadGroup(key, groupName, consumer, Position.Beginning);
var preDeleteConsumers = db.StreamConsumerInfo(key, groupName);
// Delete the consumer.
var deleteResult = db.StreamDeleteConsumer(key, groupName, consumer);
// Should get 2 messages in the deleteResult.
var postDeleteConsumers = db.StreamConsumerInfo(key, groupName);
Assert.Equal(2, deleteResult);
Assert.True(preDeleteConsumers.Length == 1);
Assert.True(postDeleteConsumers.Length == 0);
}
}
[Fact]
public void StreamDeleteConsumerGroup()
{
var key = GetUniqueKey("delete_consumer_group");
var groupName = "test_group";
var consumer = "test_consumer";
using (var conn = Create())
{
Skip.IfMissingFeature(conn, nameof(RedisFeatures.Streams), r => r.Streams);
var db = conn.GetDatabase();
// Add a message to create the stream.
db.StreamAdd(key, "field1", "value1");
// Create a consumer group and read the messages.
db.StreamCreateConsumerGroup(key, groupName, Position.Beginning);
db.StreamReadGroup(key, groupName, consumer, Position.Beginning);
var preDeleteInfo = db.StreamInfo(key);
// Now delete the group.
var deleteResult = db.StreamDeleteConsumerGroup(key, groupName);
var postDeleteInfo = db.StreamInfo(key);
Assert.True(deleteResult);
Assert.True(preDeleteInfo.ConsumerGroupCount == 1);
Assert.True(postDeleteInfo.ConsumerGroupCount == 0);
}
}
[Fact] [Fact]
public void StreamDeleteMessage() public void StreamDeleteMessage()
{ {
...@@ -576,7 +848,7 @@ public void StreamDeleteMessage() ...@@ -576,7 +848,7 @@ public void StreamDeleteMessage()
var id4 = db.StreamAdd(key, "field4", "value4"); var id4 = db.StreamAdd(key, "field4", "value4");
var deletedCount = db.StreamDelete(key, new RedisValue[] { id3 }); var deletedCount = db.StreamDelete(key, new RedisValue[] { id3 });
var messages = db.StreamRange(key, "-", "+"); var messages = db.StreamRange(key);
Assert.Equal(1, deletedCount); Assert.Equal(1, deletedCount);
Assert.Equal(3, messages.Length); Assert.Equal(3, messages.Length);
...@@ -600,7 +872,7 @@ public void StreamDeleteMessages() ...@@ -600,7 +872,7 @@ public void StreamDeleteMessages()
var id4 = db.StreamAdd(key, "field4", "value4"); var id4 = db.StreamAdd(key, "field4", "value4");
var deletedCount = db.StreamDelete(key, new RedisValue[] { id2, id3 }, CommandFlags.None); var deletedCount = db.StreamDelete(key, new RedisValue[] { id2, id3 }, CommandFlags.None);
var messages = db.StreamRange(key, "-", "+"); var messages = db.StreamRange(key);
Assert.Equal(2, deletedCount); Assert.Equal(2, deletedCount);
Assert.Equal(2, messages.Length); Assert.Equal(2, messages.Length);
...@@ -627,8 +899,8 @@ public void StreamGroupInfoGet() ...@@ -627,8 +899,8 @@ public void StreamGroupInfoGet()
var id3 = db.StreamAdd(key, "field3", "value3"); var id3 = db.StreamAdd(key, "field3", "value3");
var id4 = db.StreamAdd(key, "field4", "value4"); var id4 = db.StreamAdd(key, "field4", "value4");
db.StreamCreateConsumerGroup(key, group1, "-"); db.StreamCreateConsumerGroup(key, group1, Position.Beginning);
db.StreamCreateConsumerGroup(key, group2, "-"); db.StreamCreateConsumerGroup(key, group2, Position.Beginning);
// Read a single message into the first consumer. // Read a single message into the first consumer.
var consumer1Messages = db.StreamReadGroup(key, group1, consumer1, count: 1); var consumer1Messages = db.StreamReadGroup(key, group1, consumer1, count: 1);
...@@ -668,7 +940,7 @@ public void StreamGroupConsumerInfoGet() ...@@ -668,7 +940,7 @@ public void StreamGroupConsumerInfoGet()
var id3 = db.StreamAdd(key, "field3", "value3"); var id3 = db.StreamAdd(key, "field3", "value3");
var id4 = db.StreamAdd(key, "field4", "value4"); var id4 = db.StreamAdd(key, "field4", "value4");
db.StreamCreateConsumerGroup(key, group, "-"); db.StreamCreateConsumerGroup(key, group, Position.Beginning);
db.StreamReadGroup(key, group, consumer1, count: 1); db.StreamReadGroup(key, group, consumer1, count: 1);
db.StreamReadGroup(key, group, consumer2); db.StreamReadGroup(key, group, consumer2);
...@@ -773,7 +1045,7 @@ public void StreamPendingNoMessagesOrConsumers() ...@@ -773,7 +1045,7 @@ public void StreamPendingNoMessagesOrConsumers()
var id = db.StreamAdd(key, "field1", "value1"); var id = db.StreamAdd(key, "field1", "value1");
db.StreamDelete(key, new RedisValue[] { id }); db.StreamDelete(key, new RedisValue[] { id });
db.StreamCreateConsumerGroup(key, groupName, "0-0"); db.StreamCreateConsumerGroup(key, groupName, new Position("0-0"));
var pendingInfo = db.StreamPending(key, "test_group"); var pendingInfo = db.StreamPending(key, "test_group");
...@@ -784,6 +1056,43 @@ public void StreamPendingNoMessagesOrConsumers() ...@@ -784,6 +1056,43 @@ public void StreamPendingNoMessagesOrConsumers()
Assert.True(pendingInfo.Consumers.Length == 0); Assert.True(pendingInfo.Consumers.Length == 0);
} }
} }
[Fact]
public void StreamPositionDefaultValueIsBeginning()
{
Position position = default(Position);
Assert.Equal(StreamConstants.ReadMinValue, position.ResolveForCommand(RedisCommand.XREAD));
Assert.Equal(StreamConstants.ReadMinValue, position.ResolveForCommand(RedisCommand.XREADGROUP));
Assert.Equal(StreamConstants.ReadMinValue, position.ResolveForCommand(RedisCommand.XGROUP));
}
[Fact]
public void StreamPositionValidateBeginning()
{
var position = Position.Beginning;
Assert.Equal(StreamConstants.ReadMinValue, position.ResolveForCommand(RedisCommand.XREAD));
}
[Fact]
public void StreamPositionValidateExplicit()
{
var explicitValue = "1-0";
var position = new Position(explicitValue);
Assert.Equal(explicitValue, position.ResolveForCommand(RedisCommand.XREAD));
}
[Fact]
public void StreamPositionValidateNew()
{
var position = Position.New;
Assert.Equal(StreamConstants.NewMessages, position.ResolveForCommand(RedisCommand.XGROUP));
Assert.Equal(StreamConstants.UndeliveredMessages, position.ResolveForCommand(RedisCommand.XREADGROUP));
Assert.ThrowsAny<InvalidOperationException>(() => position.ResolveForCommand(RedisCommand.XREAD));
}
[Fact] [Fact]
public void StreamRead() public void StreamRead()
...@@ -801,7 +1110,7 @@ public void StreamRead() ...@@ -801,7 +1110,7 @@ public void StreamRead()
var id3 = db.StreamAdd(key, "field3", "value3"); var id3 = db.StreamAdd(key, "field3", "value3");
// Read the entire stream from the beginning. // Read the entire stream from the beginning.
var entries = db.StreamRead(key, "0-0"); var entries = db.StreamRead(key, new Position("0-0"));
Assert.True(entries.Length == 3); Assert.True(entries.Length == 3);
Assert.Equal(id1, entries[0].Id); Assert.Equal(id1, entries[0].Id);
...@@ -829,7 +1138,7 @@ public void StreamReadEmptyStream() ...@@ -829,7 +1138,7 @@ public void StreamReadEmptyStream()
var len = db.StreamLength(key); var len = db.StreamLength(key);
// Read the entire stream from the beginning. // Read the entire stream from the beginning.
var entries = db.StreamRead(key, "0-0"); var entries = db.StreamRead(key, new Position("0-0"));
Assert.True(entries.Length == 0); Assert.True(entries.Length == 0);
Assert.Equal(0, len); Assert.Equal(0, len);
...@@ -860,8 +1169,8 @@ public void StreamReadEmptyStreams() ...@@ -860,8 +1169,8 @@ public void StreamReadEmptyStreams()
var len2 = db.StreamLength(key2); var len2 = db.StreamLength(key2);
// Read the entire stream from the beginning. // Read the entire stream from the beginning.
var entries1 = db.StreamRead(key1, "0-0"); var entries1 = db.StreamRead(key1, new Position("0-0"));
var entries2 = db.StreamRead(key2, "0-0"); var entries2 = db.StreamRead(key2, new Position("0-0"));
Assert.True(entries1.Length == 0); Assert.True(entries1.Length == 0);
Assert.True(entries2.Length == 0); Assert.True(entries2.Length == 0);
...@@ -878,14 +1187,14 @@ public void StreamReadExpectedExceptionInvalidCountMultipleStream() ...@@ -878,14 +1187,14 @@ public void StreamReadExpectedExceptionInvalidCountMultipleStream()
{ {
Skip.IfMissingFeature(conn, nameof(RedisFeatures.Streams), r => r.Streams); Skip.IfMissingFeature(conn, nameof(RedisFeatures.Streams), r => r.Streams);
var streamPairs = new StreamIdPair[] var streamPositions = new StreamPosition[]
{ {
new StreamIdPair("key1", "0-0"), new StreamPosition("key1", new Position("0-0")),
new StreamIdPair("key2", "0-0") new StreamPosition("key2", new Position("0-0"))
}; };
var db = conn.GetDatabase(); var db = conn.GetDatabase();
Assert.Throws<ArgumentOutOfRangeException>(() => db.StreamRead(streamPairs, 0)); Assert.Throws<ArgumentOutOfRangeException>(() => db.StreamRead(streamPositions, 0));
} }
} }
...@@ -899,7 +1208,7 @@ public void StreamReadExpectedExceptionInvalidCountSingleStream() ...@@ -899,7 +1208,7 @@ public void StreamReadExpectedExceptionInvalidCountSingleStream()
Skip.IfMissingFeature(conn, nameof(RedisFeatures.Streams), r => r.Streams); Skip.IfMissingFeature(conn, nameof(RedisFeatures.Streams), r => r.Streams);
var db = conn.GetDatabase(); var db = conn.GetDatabase();
Assert.Throws<ArgumentOutOfRangeException>(() => db.StreamRead(key, "0-0", 0)); Assert.Throws<ArgumentOutOfRangeException>(() => db.StreamRead(key, new Position("0-0"), 0));
} }
} }
...@@ -924,7 +1233,7 @@ public void StreamReadExpectedExceptionEmptyStreamList() ...@@ -924,7 +1233,7 @@ public void StreamReadExpectedExceptionEmptyStreamList()
var db = conn.GetDatabase(); var db = conn.GetDatabase();
var emptyList = new StreamIdPair[0]; var emptyList = new StreamPosition[0];
Assert.Throws<ArgumentOutOfRangeException>(() => db.StreamRead(emptyList)); Assert.Throws<ArgumentOutOfRangeException>(() => db.StreamRead(emptyList));
} }
...@@ -948,10 +1257,10 @@ public void StreamReadMultipleStreams() ...@@ -948,10 +1257,10 @@ public void StreamReadMultipleStreams()
var id4 = db.StreamAdd(key2, "field4", "value4"); var id4 = db.StreamAdd(key2, "field4", "value4");
// Read from both streams at the same time. // Read from both streams at the same time.
var streamList = new StreamIdPair[2] var streamList = new StreamPosition[2]
{ {
new StreamIdPair(key1, "0-0"), new StreamPosition(key1, new Position("0-0")),
new StreamIdPair(key2, "0-0") new StreamPosition(key2, new Position("0-0"))
}; };
var streams = db.StreamRead(streamList); var streams = db.StreamRead(streamList);
...@@ -987,10 +1296,10 @@ public void StreamReadMultipleStreamsWithCount() ...@@ -987,10 +1296,10 @@ public void StreamReadMultipleStreamsWithCount()
var id3 = db.StreamAdd(key2, "field3", "value3"); var id3 = db.StreamAdd(key2, "field3", "value3");
var id4 = db.StreamAdd(key2, "field4", "value4"); var id4 = db.StreamAdd(key2, "field4", "value4");
var streamList = new StreamIdPair[2] var streamList = new StreamPosition[2]
{ {
new StreamIdPair(key1, "0-0"), new StreamPosition(key1, new Position("0-0")),
new StreamIdPair(key2, "0-0") new StreamPosition(key2, new Position("0-0"))
}; };
var streams = db.StreamRead(streamList, countPerStream: 1); var streams = db.StreamRead(streamList, countPerStream: 1);
...@@ -1025,12 +1334,12 @@ public void StreamReadMultipleStreamsWithReadPastSecondStream() ...@@ -1025,12 +1334,12 @@ public void StreamReadMultipleStreamsWithReadPastSecondStream()
var id3 = db.StreamAdd(key2, "field3", "value3"); var id3 = db.StreamAdd(key2, "field3", "value3");
var id4 = db.StreamAdd(key2, "field4", "value4"); var id4 = db.StreamAdd(key2, "field4", "value4");
var streamList = new StreamIdPair[2] var streamList = new StreamPosition[]
{ {
new StreamIdPair(key1, "0-0"), new StreamPosition(key1, new Position("0-0")),
// read past the end of stream # 2 // read past the end of stream # 2
new StreamIdPair(key2, id4) new StreamPosition(key2, new Position(id4))
}; };
var streams = db.StreamRead(streamList); var streams = db.StreamRead(streamList);
...@@ -1060,11 +1369,11 @@ public void StreamReadMultipleStreamsWithEmptyResponse() ...@@ -1060,11 +1369,11 @@ public void StreamReadMultipleStreamsWithEmptyResponse()
var id3 = db.StreamAdd(key2, "field3", "value3"); var id3 = db.StreamAdd(key2, "field3", "value3");
var id4 = db.StreamAdd(key2, "field4", "value4"); var id4 = db.StreamAdd(key2, "field4", "value4");
var streamList = new StreamIdPair[] var streamList = new StreamPosition[]
{ {
// Read past the end of both streams. // Read past the end of both streams.
new StreamIdPair(key1, id2), new StreamPosition(key1, new Position(id2)),
new StreamIdPair(key2, id4) new StreamPosition(key2, new Position(id4))
}; };
var streams = db.StreamRead(streamList); var streams = db.StreamRead(streamList);
...@@ -1090,7 +1399,7 @@ public void StreamReadPastEndOfStream() ...@@ -1090,7 +1399,7 @@ public void StreamReadPastEndOfStream()
// Read after the final ID in the stream, we expect an empty array as a response. // Read after the final ID in the stream, we expect an empty array as a response.
var entries = db.StreamRead(key, id2); var entries = db.StreamRead(key, new Position(id2));
Assert.True(entries.Length == 0); Assert.True(entries.Length == 0);
} }
...@@ -1134,7 +1443,7 @@ public void StreamReadRangeOfEmptyStream() ...@@ -1134,7 +1443,7 @@ public void StreamReadRangeOfEmptyStream()
var deleted = db.StreamDelete(key, new RedisValue[] { id1, id2 }); var deleted = db.StreamDelete(key, new RedisValue[] { id1, id2 });
var entries = db.StreamRange(key, "-", "+"); var entries = db.StreamRange(key);
Assert.Equal(2, deleted); Assert.Equal(2, deleted);
Assert.NotNull(entries); Assert.NotNull(entries);
...@@ -1198,8 +1507,8 @@ public void StreamReadRangeReverseWithCount() ...@@ -1198,8 +1507,8 @@ public void StreamReadRangeReverseWithCount()
var id1 = db.StreamAdd(key, "field1", "value1"); var id1 = db.StreamAdd(key, "field1", "value1");
var id2 = db.StreamAdd(key, "fiedl2", "value2"); var id2 = db.StreamAdd(key, "fiedl2", "value2");
var entries = db.StreamRange(key, count: 1, messageOrder: Order.Descending); var entries = db.StreamRange(key, id1, id2, 1, Order.Descending);
Assert.True(entries.Length == 1); Assert.True(entries.Length == 1);
Assert.Equal(id2, entries[0].Id); Assert.Equal(id2, entries[0].Id);
...@@ -1222,7 +1531,7 @@ public void StreamReadWithAfterIdAndCount_1() ...@@ -1222,7 +1531,7 @@ public void StreamReadWithAfterIdAndCount_1()
var id3 = db.StreamAdd(key, "field3", "value3"); var id3 = db.StreamAdd(key, "field3", "value3");
// Only read a single item from the stream. // Only read a single item from the stream.
var entries = db.StreamRead(key, id1, 1); var entries = db.StreamRead(key, new Position(id1), 1);
Assert.True(entries.Length == 1); Assert.True(entries.Length == 1);
Assert.Equal(id2, entries[0].Id); Assert.Equal(id2, entries[0].Id);
...@@ -1246,7 +1555,7 @@ public void StreamReadWithAfterIdAndCount_2() ...@@ -1246,7 +1555,7 @@ public void StreamReadWithAfterIdAndCount_2()
var id4 = db.StreamAdd(key, "field4", "value4"); var id4 = db.StreamAdd(key, "field4", "value4");
// Read multiple items from the stream. // Read multiple items from the stream.
var entries = db.StreamRead(key, id1, 2); var entries = db.StreamRead(key, new Position(id1), 2);
Assert.True(entries.Length == 2); Assert.True(entries.Length == 2);
Assert.Equal(id2, entries[0].Id); Assert.Equal(id2, entries[0].Id);
......
...@@ -803,11 +803,18 @@ public void StreamConsumerInfoGetAsync() ...@@ -803,11 +803,18 @@ public void StreamConsumerInfoGetAsync()
mock.Verify(_ => _.StreamConsumerInfoAsync("prefix:key", "group", CommandFlags.HighPriority)); mock.Verify(_ => _.StreamConsumerInfoAsync("prefix:key", "group", CommandFlags.HighPriority));
} }
[Fact]
public void StreamConsumerGroupSetPositionAsync()
{
wrapper.StreamConsumerGroupSetPositionAsync("key", "group", Position.Beginning, CommandFlags.HighPriority);
mock.Verify(_ => _.StreamConsumerGroupSetPositionAsync("prefix:key", "group", Position.Beginning, CommandFlags.HighPriority));
}
[Fact] [Fact]
public void StreamCreateConsumerGroupAsync() public void StreamCreateConsumerGroupAsync()
{ {
wrapper.StreamCreateConsumerGroupAsync("key", "group", "0-0", CommandFlags.HighPriority); wrapper.StreamCreateConsumerGroupAsync("key", "group", new Position("0-0"), CommandFlags.HighPriority);
mock.Verify(_ => _.StreamCreateConsumerGroupAsync("prefix:key", "group", "0-0", CommandFlags.HighPriority)); mock.Verify(_ => _.StreamCreateConsumerGroupAsync("prefix:key", "group", new Position("0-0"), CommandFlags.HighPriority));
} }
[Fact] [Fact]
...@@ -839,6 +846,20 @@ public void StreamMessagesDeleteAsync() ...@@ -839,6 +846,20 @@ public void StreamMessagesDeleteAsync()
mock.Verify(_ => _.StreamDeleteAsync("prefix:key", messageIds, CommandFlags.HighPriority)); mock.Verify(_ => _.StreamDeleteAsync("prefix:key", messageIds, CommandFlags.HighPriority));
} }
[Fact]
public void StreamDeleteConsumerAsync()
{
wrapper.StreamDeleteConsumerAsync("key", "group", "consumer", CommandFlags.HighPriority);
mock.Verify(_ => _.StreamDeleteConsumerAsync("prefix:key", "group", "consumer", CommandFlags.HighPriority));
}
[Fact]
public void StreamDeleteConsumerGroupAsync()
{
wrapper.StreamDeleteConsumerGroupAsync("key", "group", CommandFlags.HighPriority);
mock.Verify(_ => _.StreamDeleteConsumerGroupAsync("prefix:key", "group", CommandFlags.HighPriority));
}
[Fact] [Fact]
public void StreamPendingInfoGetAsync() public void StreamPendingInfoGetAsync()
{ {
...@@ -863,23 +884,31 @@ public void StreamRangeAsync() ...@@ -863,23 +884,31 @@ public void StreamRangeAsync()
[Fact] [Fact]
public void StreamReadAsync_1() public void StreamReadAsync_1()
{ {
var keysAndIds = new StreamIdPair[0] { }; var streamPositions = new StreamPosition[0] { };
wrapper.StreamReadAsync(keysAndIds, null, CommandFlags.HighPriority); wrapper.StreamReadAsync(streamPositions, null, CommandFlags.HighPriority);
mock.Verify(_ => _.StreamReadAsync(keysAndIds, null, CommandFlags.HighPriority)); mock.Verify(_ => _.StreamReadAsync(streamPositions, null, CommandFlags.HighPriority));
} }
[Fact] [Fact]
public void StreamReadAsync_2() public void StreamReadAsync_2()
{ {
wrapper.StreamReadAsync("key", "0-0", null, CommandFlags.HighPriority); wrapper.StreamReadAsync("key", new Position("0-0"), null, CommandFlags.HighPriority);
mock.Verify(_ => _.StreamReadAsync("prefix:key", "0-0", null, CommandFlags.HighPriority)); mock.Verify(_ => _.StreamReadAsync("prefix:key", new Position("0-0"), null, CommandFlags.HighPriority));
}
[Fact]
public void StreamReadGroupAsync_1()
{
wrapper.StreamReadGroupAsync("key", "group", "consumer", Position.Beginning, 10, CommandFlags.HighPriority);
mock.Verify(_ => _.StreamReadGroupAsync("prefix:key", "group", "consumer", Position.Beginning, 10, CommandFlags.HighPriority));
} }
[Fact] [Fact]
public void StreamReadGroupAsync() public void StreamStreamReadGroupAsync_2()
{ {
wrapper.StreamReadGroupAsync("key", "group", "consumer", "0-0", 10, CommandFlags.HighPriority); var streamPositions = new StreamPosition[0] { };
mock.Verify(_ => _.StreamReadGroupAsync("prefix:key", "group", "consumer", "0-0", 10, CommandFlags.HighPriority)); wrapper.StreamReadGroupAsync(streamPositions, "group", "consumer", 10, CommandFlags.HighPriority);
mock.Verify(_ => _.StreamReadGroupAsync(streamPositions, "group", "consumer", 10, CommandFlags.HighPriority));
} }
[Fact] [Fact]
......
namespace StackExchange.Redis
{
internal enum PositionKind
{
Beginning = 0,
Explicit = 1,
New = 2
}
}
...@@ -1497,7 +1497,7 @@ public interface IDatabase : IRedis, IDatabaseAsync ...@@ -1497,7 +1497,7 @@ public interface IDatabase : IRedis, IDatabaseAsync
/// <param name="flags">The flags to use for this operation.</param> /// <param name="flags">The flags to use for this operation.</param>
/// <returns>The messages successfully claimed by the given consumer.</returns> /// <returns>The messages successfully claimed by the given consumer.</returns>
/// <remarks>https://redis.io/topics/streams-intro</remarks> /// <remarks>https://redis.io/topics/streams-intro</remarks>
RedisStreamEntry[] StreamClaim(RedisKey key, RedisValue consumerGroup, RedisValue claimingConsumer, long minIdleTimeInMs, RedisValue[] messageIds, CommandFlags flags = CommandFlags.None); StreamEntry[] StreamClaim(RedisKey key, RedisValue consumerGroup, RedisValue claimingConsumer, long minIdleTimeInMs, RedisValue[] messageIds, CommandFlags flags = CommandFlags.None);
/// <summary> /// <summary>
/// Change ownership of messages consumed, but not yet acknowledged, by a different consumer. This method returns the IDs for the claimed message(s). /// Change ownership of messages consumed, but not yet acknowledged, by a different consumer. This method returns the IDs for the claimed message(s).
...@@ -1512,6 +1512,16 @@ public interface IDatabase : IRedis, IDatabaseAsync ...@@ -1512,6 +1512,16 @@ public interface IDatabase : IRedis, IDatabaseAsync
/// <remarks>https://redis.io/topics/streams-intro</remarks> /// <remarks>https://redis.io/topics/streams-intro</remarks>
RedisValue[] StreamClaimIdsOnly(RedisKey key, RedisValue consumerGroup, RedisValue claimingConsumer, long minIdleTimeInMs, RedisValue[] messageIds, CommandFlags flags = CommandFlags.None); RedisValue[] StreamClaimIdsOnly(RedisKey key, RedisValue consumerGroup, RedisValue claimingConsumer, long minIdleTimeInMs, RedisValue[] messageIds, CommandFlags flags = CommandFlags.None);
/// <summary>
/// Set the position from which to read a stream for a consumer group.
/// </summary>
/// <param name="key">The key of the stream.</param>
/// <param name="groupName">The name of the consumer group.</param>
/// <param name="position">The position from which to read for the consumer group.</param>
/// <param name="flags">The flags to use for this operation.</param>
/// <returns>True if successful, otherwise false.</returns>
bool StreamConsumerGroupSetPosition(RedisKey key, RedisValue groupName, Position position, CommandFlags flags = CommandFlags.None);
/// <summary> /// <summary>
/// Retrieve information about the consumers for the given consumer group. This is the equivalent of calling "XINFO GROUPS key group". /// Retrieve information about the consumers for the given consumer group. This is the equivalent of calling "XINFO GROUPS key group".
/// </summary> /// </summary>
...@@ -1527,11 +1537,11 @@ public interface IDatabase : IRedis, IDatabaseAsync ...@@ -1527,11 +1537,11 @@ public interface IDatabase : IRedis, IDatabaseAsync
/// </summary> /// </summary>
/// <param name="key">The key of the stream.</param> /// <param name="key">The key of the stream.</param>
/// <param name="groupName">The name of the group to create.</param> /// <param name="groupName">The name of the group to create.</param>
/// <param name="readFrom">The beginning position in the stream from which to read. If null, the method will send the option ("$") to only read new messages.</param> /// <param name="position">The position to begin reading the stream. Defaults to <see cref="Position.New"/>.</param>
/// <param name="flags">The flags to use for this operation.</param> /// <param name="flags">The flags to use for this operation.</param>
/// <returns>True if the group was created.</returns> /// <returns>True if the group was created.</returns>
/// <remarks>https://redis.io/topics/streams-intro</remarks> /// <remarks>https://redis.io/topics/streams-intro</remarks>
bool StreamCreateConsumerGroup(RedisKey key, RedisValue groupName, RedisValue? readFrom = null, CommandFlags flags = CommandFlags.None); bool StreamCreateConsumerGroup(RedisKey key, RedisValue groupName, Position? position = null, CommandFlags flags = CommandFlags.None);
/// <summary> /// <summary>
/// Delete messages in the stream. This method does not delete the stream. /// Delete messages in the stream. This method does not delete the stream.
...@@ -1543,6 +1553,25 @@ public interface IDatabase : IRedis, IDatabaseAsync ...@@ -1543,6 +1553,25 @@ public interface IDatabase : IRedis, IDatabaseAsync
/// <remarks>https://redis.io/topics/streams-intro</remarks> /// <remarks>https://redis.io/topics/streams-intro</remarks>
long StreamDelete(RedisKey key, RedisValue[] messageIds, CommandFlags flags = CommandFlags.None); long StreamDelete(RedisKey key, RedisValue[] messageIds, CommandFlags flags = CommandFlags.None);
/// <summary>
/// Delete a consumer from a consumer group.
/// </summary>
/// <param name="key">The key of the stream.</param>
/// <param name="groupName">The name of the consumer group.</param>
/// <param name="consumerName">The name of the consumer.</param>
/// <param name="flags">The flags to use for this operation.</param>
/// <returns>The number of messages that were pending for the deleted consumer.</returns>
long StreamDeleteConsumer(RedisKey key, RedisValue groupName, RedisValue consumerName, CommandFlags flags = CommandFlags.None);
/// <summary>
/// Delete a consumer group.
/// </summary>
/// <param name="key">The key of the stream.</param>
/// <param name="groupName">The name of the consumer group.</param>
/// <param name="flags">The flags to use for this operation.</param>
/// <returns>True if deleted, otherwise false.</returns>
bool StreamDeleteConsumerGroup(RedisKey key, RedisValue groupName, CommandFlags flags = CommandFlags.None);
/// <summary> /// <summary>
/// Retrieve information about the groups created for the given stream. This is the equivalent of calling "XINFO GROUPS key". /// Retrieve information about the groups created for the given stream. This is the equivalent of calling "XINFO GROUPS key".
/// </summary> /// </summary>
...@@ -1571,7 +1600,7 @@ public interface IDatabase : IRedis, IDatabaseAsync ...@@ -1571,7 +1600,7 @@ public interface IDatabase : IRedis, IDatabaseAsync
long StreamLength(RedisKey key, CommandFlags flags = CommandFlags.None); long StreamLength(RedisKey key, CommandFlags flags = CommandFlags.None);
/// <summary> /// <summary>
/// View information about pending messages for a stream. A pending message is a message read using StreamReadGroup (XREADGROUP) but not yet acknowledged. /// View information about pending messages for a stream.
/// </summary> /// </summary>
/// <param name="key">The key of the stream.</param> /// <param name="key">The key of the stream.</param>
/// <param name="groupName">The name of the consumer group</param> /// <param name="groupName">The name of the consumer group</param>
...@@ -1605,45 +1634,59 @@ public interface IDatabase : IRedis, IDatabaseAsync ...@@ -1605,45 +1634,59 @@ public interface IDatabase : IRedis, IDatabaseAsync
/// <param name="count">The maximum number of messages to return.</param> /// <param name="count">The maximum number of messages to return.</param>
/// <param name="messageOrder">The order of the messages. <see cref="Order.Ascending"/> will execute XRANGE and <see cref="Order.Descending"/> wil execute XREVRANGE.</param> /// <param name="messageOrder">The order of the messages. <see cref="Order.Ascending"/> will execute XRANGE and <see cref="Order.Descending"/> wil execute XREVRANGE.</param>
/// <param name="flags">The flags to use for this operation.</param> /// <param name="flags">The flags to use for this operation.</param>
/// <returns>Returns an instance of <see cref="RedisStreamEntry"/> for each message returned.</returns> /// <returns>Returns an instance of <see cref="StreamEntry"/> for each message returned.</returns>
/// <remarks>https://redis.io/commands/xrange</remarks> /// <remarks>https://redis.io/commands/xrange</remarks>
RedisStreamEntry[] StreamRange(RedisKey key, RedisValue? minId = null, RedisValue? maxId = null, int? count = null, Order messageOrder = Order.Ascending, CommandFlags flags = CommandFlags.None); StreamEntry[] StreamRange(RedisKey key, RedisValue? minId = null, RedisValue? maxId = null, int? count = null, Order messageOrder = Order.Ascending, CommandFlags flags = CommandFlags.None);
/// <summary> /// <summary>
/// Read from a single stream. /// Read from a single stream.
/// </summary> /// </summary>
/// <param name="key">The key of the stream.</param> /// <param name="key">The key of the stream.</param>
/// <param name="afterId">The position from within the stream to begin reading.</param> /// <param name="position">The position from which to read the stream.</param>
/// <param name="count">The maximum number of messages to return.</param> /// <param name="count">The maximum number of messages to return.</param>
/// <param name="flags">The flags to use for this operation.</param> /// <param name="flags">The flags to use for this operation.</param>
/// <returns>Returns an instance of <see cref="RedisStreamEntry"/> for each message returned.</returns> /// <returns>Returns a value of <see cref="StreamEntry"/> for each message returned.</returns>
/// <remarks>Equivalent of calling XREAD COUNT num STREAMS key id.</remarks> /// <remarks>Equivalent of calling XREAD COUNT num STREAMS key id.</remarks>
/// <remarks>https://redis.io/commands/xread</remarks> /// <remarks>https://redis.io/commands/xread</remarks>
RedisStreamEntry[] StreamRead(RedisKey key, RedisValue afterId, int? count = null, CommandFlags flags = CommandFlags.None); StreamEntry[] StreamRead(RedisKey key, Position position, int? count = null, CommandFlags flags = CommandFlags.None);
/// <summary> /// <summary>
/// Read from multiple streams. /// Read from multiple streams.
/// </summary> /// </summary>
/// <param name="streamIdPairs">The list of streams and the ID from which to begin reading for each stream.</param> /// <param name="streamPositions">Array of streams and the positions from which to begin reading for each stream.</param>
/// <param name="countPerStream">The maximum number of messages to return from each stream.</param> /// <param name="countPerStream">The maximum number of messages to return from each stream.</param>
/// <param name="flags">The flags to use for this operation.</param> /// <param name="flags">The flags to use for this operation.</param>
/// <returns>An instance of <see cref="RedisStream"/> for each stream.</returns> /// <returns>A value of <see cref="RedisStream"/> for each stream.</returns>
/// <remarks>Equivalent of calling XREAD COUNT num STREAMS key1 key2 id1 id2.</remarks> /// <remarks>Equivalent of calling XREAD COUNT num STREAMS key1 key2 id1 id2.</remarks>
/// <remarks>https://redis.io/commands/xread</remarks> /// <remarks>https://redis.io/commands/xread</remarks>
RedisStream[] StreamRead(StreamIdPair[] streamIdPairs, int? countPerStream = null, CommandFlags flags = CommandFlags.None); RedisStream[] StreamRead(StreamPosition[] streamPositions, int? countPerStream = null, CommandFlags flags = CommandFlags.None);
/// <summary> /// <summary>
/// Read messages from a stream and an associated consumer group. /// Read messages from a stream into an associated consumer group.
/// </summary> /// </summary>
/// <param name="key">The key of the stream.</param> /// <param name="key">The key of the stream.</param>
/// <param name="groupName">The name of the consumer group.</param> /// <param name="groupName">The name of the consumer group.</param>
/// <param name="consumerName">The consumer name.</param> /// <param name="consumerName">The consumer name.</param>
/// <param name="readFromId">The ID from within the stream to begin reading. If null, the method will send the option (">") to only read new, previously undelivered messages.</param> /// <param name="position">The position from which to read the stream. Defaults to <see cref="Position.New"/> when null.</param>
/// <param name="count">The maximum number of messages to return.</param> /// <param name="count">The maximum number of messages to return.</param>
/// <param name="flags">The flags to use for this operation.</param> /// <param name="flags">The flags to use for this operation.</param>
/// <returns>Returns an instance of <see cref="RedisStreamEntry"/> for each message returned.</returns> /// <returns>Returns a value of <see cref="StreamEntry"/> for each message returned.</returns>
/// <remarks>https://redis.io/commands/xreadgroup</remarks>
StreamEntry[] StreamReadGroup(RedisKey key, RedisValue groupName, RedisValue consumerName, Position? position = null, int? count = null, CommandFlags flags = CommandFlags.None);
/// <summary>
/// Read from multiple streams into the given consumer group. The consumer group with the given <paramref name="groupName"/>
/// will need to have been created for each stream prior to calling this method.
/// </summary>
/// <param name="streamPositions">Array of streams and the positions from which to begin reading for each stream.</param>
/// <param name="groupName">The name of the consumer group.</param>
/// <param name="consumerName"></param>
/// <param name="countPerStream">The maximum number of messages to return from each stream.</param>
/// <param name="flags">The flags to use for this operation.</param>
/// <returns>A value of <see cref="RedisStream"/> for each stream.</returns>
/// <remarks>Equivalent of calling XREADGROUP GROUP groupName consumerName COUNT countPerStream STREAMS stream1 stream2 id1 id2</remarks>
/// <remarks>https://redis.io/commands/xreadgroup</remarks> /// <remarks>https://redis.io/commands/xreadgroup</remarks>
RedisStreamEntry[] StreamReadGroup(RedisKey key, RedisValue groupName, RedisValue consumerName, RedisValue? readFromId = null, int? count = null, CommandFlags flags = CommandFlags.None); RedisStream[] StreamReadGroup(StreamPosition[] streamPositions, RedisValue groupName, RedisValue consumerName, int? countPerStream = null, CommandFlags flags = CommandFlags.None);
/// <summary> /// <summary>
/// Trim the stream to a specified maximum length. /// Trim the stream to a specified maximum length.
......
...@@ -1408,7 +1408,7 @@ public interface IDatabaseAsync : IRedisAsync ...@@ -1408,7 +1408,7 @@ public interface IDatabaseAsync : IRedisAsync
/// <param name="flags">The flags to use for this operation.</param> /// <param name="flags">The flags to use for this operation.</param>
/// <returns>The messages successfully claimed by the given consumer.</returns> /// <returns>The messages successfully claimed by the given consumer.</returns>
/// <remarks>https://redis.io/topics/streams-intro</remarks> /// <remarks>https://redis.io/topics/streams-intro</remarks>
Task<RedisStreamEntry[]> StreamClaimAsync(RedisKey key, RedisValue consumerGroup, RedisValue claimingConsumer, long minIdleTimeInMs, RedisValue[] messageIds, CommandFlags flags = CommandFlags.None); Task<StreamEntry[]> StreamClaimAsync(RedisKey key, RedisValue consumerGroup, RedisValue claimingConsumer, long minIdleTimeInMs, RedisValue[] messageIds, CommandFlags flags = CommandFlags.None);
/// <summary> /// <summary>
/// Change ownership of messages consumed, but not yet acknowledged, by a different consumer. This method returns the IDs for the claimed message(s). /// Change ownership of messages consumed, but not yet acknowledged, by a different consumer. This method returns the IDs for the claimed message(s).
...@@ -1423,6 +1423,16 @@ public interface IDatabaseAsync : IRedisAsync ...@@ -1423,6 +1423,16 @@ public interface IDatabaseAsync : IRedisAsync
/// <remarks>https://redis.io/topics/streams-intro</remarks> /// <remarks>https://redis.io/topics/streams-intro</remarks>
Task<RedisValue[]> StreamClaimIdsOnlyAsync(RedisKey key, RedisValue consumerGroup, RedisValue claimingConsumer, long minIdleTimeInMs, RedisValue[] messageIds, CommandFlags flags = CommandFlags.None); Task<RedisValue[]> StreamClaimIdsOnlyAsync(RedisKey key, RedisValue consumerGroup, RedisValue claimingConsumer, long minIdleTimeInMs, RedisValue[] messageIds, CommandFlags flags = CommandFlags.None);
/// <summary>
/// Set the position from which to read a stream for a consumer group.
/// </summary>
/// <param name="key">The key of the stream.</param>
/// <param name="groupName">The name of the consumer group.</param>
/// <param name="position">The position from which to read for the consumer group.</param>
/// <param name="flags">The flags to use for this operation.</param>
/// <returns>True if successful, otherwise false.</returns>
Task<bool> StreamConsumerGroupSetPositionAsync(RedisKey key, RedisValue groupName, Position position, CommandFlags flags = CommandFlags.None);
/// <summary> /// <summary>
/// Retrieve information about the consumers for the given consumer group. This is the equivalent of calling "XINFO GROUPS key group". /// Retrieve information about the consumers for the given consumer group. This is the equivalent of calling "XINFO GROUPS key group".
/// </summary> /// </summary>
...@@ -1434,15 +1444,15 @@ public interface IDatabaseAsync : IRedisAsync ...@@ -1434,15 +1444,15 @@ public interface IDatabaseAsync : IRedisAsync
Task<StreamConsumerInfo[]> StreamConsumerInfoAsync(RedisKey key, RedisValue groupName, CommandFlags flags = CommandFlags.None); Task<StreamConsumerInfo[]> StreamConsumerInfoAsync(RedisKey key, RedisValue groupName, CommandFlags flags = CommandFlags.None);
/// <summary> /// <summary>
/// Create a consumer group for the given stream. /// Create a consumer group for the given stream.
/// </summary> /// </summary>
/// <param name="key">The key of the stream.</param> /// <param name="key">The key of the stream.</param>
/// <param name="groupName">The name of the group to create.</param> /// <param name="groupName">The name of the group to create.</param>
/// <param name="readFrom">The beginning position in the stream from which to read. If null, the method will send the option ("$") to only read new messages.</param> /// <param name="position">The position to begin reading the stream. Defaults to <see cref="Position.New"/>.</param>
/// <param name="flags">The flags to use for this operation.</param> /// <param name="flags">The flags to use for this operation.</param>
/// <returns>True if the group was created.</returns> /// <returns>True if the group was created.</returns>
/// <remarks>https://redis.io/topics/streams-intro</remarks> /// <remarks>https://redis.io/topics/streams-intro</remarks>
Task<bool> StreamCreateConsumerGroupAsync(RedisKey key, RedisValue groupName, RedisValue? readFrom = null, CommandFlags flags = CommandFlags.None); Task<bool> StreamCreateConsumerGroupAsync(RedisKey key, RedisValue groupName, Position? position = null, CommandFlags flags = CommandFlags.None);
/// <summary> /// <summary>
/// Delete messages in the stream. This method does not delete the stream. /// Delete messages in the stream. This method does not delete the stream.
...@@ -1454,6 +1464,25 @@ public interface IDatabaseAsync : IRedisAsync ...@@ -1454,6 +1464,25 @@ public interface IDatabaseAsync : IRedisAsync
/// <remarks>https://redis.io/topics/streams-intro</remarks> /// <remarks>https://redis.io/topics/streams-intro</remarks>
Task<long> StreamDeleteAsync(RedisKey key, RedisValue[] messageIds, CommandFlags flags = CommandFlags.None); Task<long> StreamDeleteAsync(RedisKey key, RedisValue[] messageIds, CommandFlags flags = CommandFlags.None);
/// <summary>
/// Delete a consumer from a consumer group.
/// </summary>
/// <param name="key">The key of the stream.</param>
/// <param name="groupName">The name of the consumer group.</param>
/// <param name="consumerName">The name of the consumer.</param>
/// <param name="flags">The flags to use for this operation.</param>
/// <returns>The number of messages that were pending for the deleted consumer.</returns>
Task<long> StreamDeleteConsumerAsync(RedisKey key, RedisValue groupName, RedisValue consumerName, CommandFlags flags = CommandFlags.None);
/// <summary>
/// Delete a consumer group.
/// </summary>
/// <param name="key">The key of the stream.</param>
/// <param name="groupName">The name of the consumer group.</param>
/// <param name="flags">The flags to use for this operation.</param>
/// <returns>True if deleted, otherwise false.</returns>
Task<bool> StreamDeleteConsumerGroupAsync(RedisKey key, RedisValue groupName, CommandFlags flags = CommandFlags.None);
/// <summary> /// <summary>
/// Retrieve information about the groups created for the given stream. This is the equivalent of calling "XINFO GROUPS key". /// Retrieve information about the groups created for the given stream. This is the equivalent of calling "XINFO GROUPS key".
/// </summary> /// </summary>
...@@ -1516,45 +1545,59 @@ public interface IDatabaseAsync : IRedisAsync ...@@ -1516,45 +1545,59 @@ public interface IDatabaseAsync : IRedisAsync
/// <param name="count">The maximum number of messages to return.</param> /// <param name="count">The maximum number of messages to return.</param>
/// <param name="messageOrder">The order of the messages. <see cref="Order.Ascending"/> will execute XRANGE and <see cref="Order.Descending"/> wil execute XREVRANGE.</param> /// <param name="messageOrder">The order of the messages. <see cref="Order.Ascending"/> will execute XRANGE and <see cref="Order.Descending"/> wil execute XREVRANGE.</param>
/// <param name="flags">The flags to use for this operation.</param> /// <param name="flags">The flags to use for this operation.</param>
/// <returns>Returns an instance of <see cref="RedisStreamEntry"/> for each message returned.</returns> /// <returns>Returns an instance of <see cref="StreamEntry"/> for each message returned.</returns>
/// <remarks>https://redis.io/commands/xrange</remarks> /// <remarks>https://redis.io/commands/xrange</remarks>
Task<RedisStreamEntry[]> StreamRangeAsync(RedisKey key, RedisValue? minId = null, RedisValue? maxId = null, int? count = null, Order messageOrder = Order.Ascending, CommandFlags flags = CommandFlags.None); Task<StreamEntry[]> StreamRangeAsync(RedisKey key, RedisValue? minId = null, RedisValue? maxId = null, int? count = null, Order messageOrder = Order.Ascending, CommandFlags flags = CommandFlags.None);
/// <summary> /// <summary>
/// Read from a single stream. /// Read from a single stream.
/// </summary> /// </summary>
/// <param name="key">The key of the stream.</param> /// <param name="key">The key of the stream.</param>
/// <param name="afterId">The message ID from within the stream to begin reading.</param> /// <param name="position">The position from which to read the stream.</param>
/// <param name="count">The maximum number of messages to return.</param> /// <param name="count">The maximum number of messages to return.</param>
/// <param name="flags">The flags to use for this operation.</param> /// <param name="flags">The flags to use for this operation.</param>
/// <returns>Returns an instance of <see cref="RedisStreamEntry"/> for each message returned.</returns> /// <returns>Returns an instance of <see cref="StreamEntry"/> for each message returned.</returns>
/// <remarks>Equivalent of calling XREAD COUNT num STREAMS key id.</remarks> /// <remarks>Equivalent of calling XREAD COUNT num STREAMS key id.</remarks>
/// <remarks>https://redis.io/commands/xread</remarks> /// <remarks>https://redis.io/commands/xread</remarks>
Task<RedisStreamEntry[]> StreamReadAsync(RedisKey key, RedisValue afterId, int? count = null, CommandFlags flags = CommandFlags.None); Task<StreamEntry[]> StreamReadAsync(RedisKey key, Position position, int? count = null, CommandFlags flags = CommandFlags.None);
/// <summary> /// <summary>
/// Read from multiple streams. /// Read from multiple streams.
/// </summary> /// </summary>
/// <param name="streamIdPairs">The list of streams and the ID from which to begin reading for each stream.</param> /// <param name="streamPositions">Array of streams and the positions from which to begin reading for each stream.</param>
/// <param name="countPerStream">The maximum number of messages to return from each stream.</param> /// <param name="countPerStream">The maximum number of messages to return from each stream.</param>
/// <param name="flags">The flags to use for this operation.</param> /// <param name="flags">The flags to use for this operation.</param>
/// <returns>An instance of <see cref="RedisStream"/> for each stream.</returns> /// <returns>A value of <see cref="RedisStream"/> for each stream.</returns>
/// <remarks>Equivalent of calling XREAD COUNT num STREAMS key1 key2 id1 id2.</remarks> /// <remarks>Equivalent of calling XREAD COUNT num STREAMS key1 key2 id1 id2.</remarks>
/// <remarks>https://redis.io/commands/xread</remarks> /// <remarks>https://redis.io/commands/xread</remarks>
Task<RedisStream[]> StreamReadAsync(StreamIdPair[] streamIdPairs, int? countPerStream = null, CommandFlags flags = CommandFlags.None); Task<RedisStream[]> StreamReadAsync(StreamPosition[] streamPositions, int? countPerStream = null, CommandFlags flags = CommandFlags.None);
/// <summary> /// <summary>
/// Read messages from a stream and an associated consumer group. /// Read messages from a stream into an associated consumer group.
/// </summary> /// </summary>
/// <param name="key">The key of the stream.</param> /// <param name="key">The key of the stream.</param>
/// <param name="groupName">The name of the consumer group.</param> /// <param name="groupName">The name of the consumer group.</param>
/// <param name="consumerName">The consumer name.</param> /// <param name="consumerName">The consumer name.</param>
/// <param name="readFromId">The ID from within the stream to begin reading. If null, the method will send the option (">") to only read new, previously undelivered messages.</param> /// <param name="position">The position from which to read the stream. Defaults to <see cref="Position.New"/> when null.</param>
/// <param name="count">The maximum number of messages to return.</param> /// <param name="count">The maximum number of messages to return.</param>
/// <param name="flags">The flags to use for this operation.</param> /// <param name="flags">The flags to use for this operation.</param>
/// <returns>Returns an instance of <see cref="RedisStreamEntry"/> for each message returned.</returns> /// <returns>Returns a value of <see cref="StreamEntry"/> for each message returned.</returns>
/// <remarks>https://redis.io/commands/xreadgroup</remarks>
Task<StreamEntry[]> StreamReadGroupAsync(RedisKey key, RedisValue groupName, RedisValue consumerName, Position? position = null, int? count = null, CommandFlags flags = CommandFlags.None);
/// <summary>
/// Read from multiple streams into the given consumer group. The consumer group with the given <paramref name="groupName"/>
/// will need to have been created for each stream prior to calling this method.
/// </summary>
/// <param name="streamPositions">Array of streams and the positions from which to begin reading for each stream.</param>
/// <param name="groupName">The name of the consumer group.</param>
/// <param name="consumerName"></param>
/// <param name="countPerStream">The maximum number of messages to return from each stream.</param>
/// <param name="flags">The flags to use for this operation.</param>
/// <returns>A value of <see cref="RedisStream"/> for each stream.</returns>
/// <remarks>Equivalent of calling XREADGROUP GROUP groupName consumerName COUNT countPerStream STREAMS stream1 stream2 id1 id2</remarks>
/// <remarks>https://redis.io/commands/xreadgroup</remarks> /// <remarks>https://redis.io/commands/xreadgroup</remarks>
Task<RedisStreamEntry[]> StreamReadGroupAsync(RedisKey key, RedisValue groupName, RedisValue consumerName, RedisValue? readFromId = null, int? count = null, CommandFlags flags = CommandFlags.None); Task<RedisStream[]> StreamReadGroupAsync(StreamPosition[] streamPositions, RedisValue groupName, RedisValue consumerName, int? countPerStream = null, CommandFlags flags = CommandFlags.None);
/// <summary> /// <summary>
/// Trim the stream to a specified maximum length. /// Trim the stream to a specified maximum length.
......
...@@ -626,7 +626,7 @@ public RedisValue StreamAdd(RedisKey key, NameValueEntry[] streamPairs, RedisVal ...@@ -626,7 +626,7 @@ public RedisValue StreamAdd(RedisKey key, NameValueEntry[] streamPairs, RedisVal
return Inner.StreamAdd(ToInner(key), streamPairs, messageId, maxLength, useApproximateMaxLength, flags); return Inner.StreamAdd(ToInner(key), streamPairs, messageId, maxLength, useApproximateMaxLength, flags);
} }
public RedisStreamEntry[] StreamClaim(RedisKey key, RedisValue consumerGroup, RedisValue claimingConsumer, long minIdleTimeInMs, RedisValue[] messageIds, CommandFlags flags = CommandFlags.None) public StreamEntry[] StreamClaim(RedisKey key, RedisValue consumerGroup, RedisValue claimingConsumer, long minIdleTimeInMs, RedisValue[] messageIds, CommandFlags flags = CommandFlags.None)
{ {
return Inner.StreamClaim(ToInner(key), consumerGroup, claimingConsumer, minIdleTimeInMs, messageIds, flags); return Inner.StreamClaim(ToInner(key), consumerGroup, claimingConsumer, minIdleTimeInMs, messageIds, flags);
} }
...@@ -636,9 +636,14 @@ public RedisValue[] StreamClaimIdsOnly(RedisKey key, RedisValue consumerGroup, R ...@@ -636,9 +636,14 @@ public RedisValue[] StreamClaimIdsOnly(RedisKey key, RedisValue consumerGroup, R
return Inner.StreamClaimIdsOnly(ToInner(key), consumerGroup, claimingConsumer, minIdleTimeInMs, messageIds, flags); return Inner.StreamClaimIdsOnly(ToInner(key), consumerGroup, claimingConsumer, minIdleTimeInMs, messageIds, flags);
} }
public bool StreamCreateConsumerGroup(RedisKey key, RedisValue groupName, RedisValue? readFrom = null, CommandFlags flags = CommandFlags.None) public bool StreamConsumerGroupSetPosition(RedisKey key, RedisValue groupName, Position position, CommandFlags flags = CommandFlags.None)
{ {
return Inner.StreamCreateConsumerGroup(ToInner(key), groupName, readFrom, flags); return Inner.StreamConsumerGroupSetPosition(ToInner(key), groupName, position, flags);
}
public bool StreamCreateConsumerGroup(RedisKey key, RedisValue groupName, Position? position = null, CommandFlags flags = CommandFlags.None)
{
return Inner.StreamCreateConsumerGroup(ToInner(key), groupName, position, flags);
} }
public StreamInfo StreamInfo(RedisKey key, CommandFlags flags = CommandFlags.None) public StreamInfo StreamInfo(RedisKey key, CommandFlags flags = CommandFlags.None)
...@@ -666,6 +671,16 @@ public long StreamDelete(RedisKey key, RedisValue[] messageIds, CommandFlags fla ...@@ -666,6 +671,16 @@ public long StreamDelete(RedisKey key, RedisValue[] messageIds, CommandFlags fla
return Inner.StreamDelete(ToInner(key), messageIds, flags); return Inner.StreamDelete(ToInner(key), messageIds, flags);
} }
public long StreamDeleteConsumer(RedisKey key, RedisValue groupName, RedisValue consumerName, CommandFlags flags = CommandFlags.None)
{
return Inner.StreamDeleteConsumer(ToInner(key), groupName, consumerName, flags);
}
public bool StreamDeleteConsumerGroup(RedisKey key, RedisValue groupName, CommandFlags flags = CommandFlags.None)
{
return Inner.StreamDeleteConsumerGroup(ToInner(key), groupName, flags);
}
public StreamPendingInfo StreamPending(RedisKey key, RedisValue groupName, CommandFlags flags = CommandFlags.None) public StreamPendingInfo StreamPending(RedisKey key, RedisValue groupName, CommandFlags flags = CommandFlags.None)
{ {
return Inner.StreamPending(ToInner(key), groupName, flags); return Inner.StreamPending(ToInner(key), groupName, flags);
...@@ -676,24 +691,29 @@ public StreamPendingMessageInfo[] StreamPendingMessages(RedisKey key, RedisValue ...@@ -676,24 +691,29 @@ public StreamPendingMessageInfo[] StreamPendingMessages(RedisKey key, RedisValue
return Inner.StreamPendingMessages(ToInner(key), groupName, count, consumerName, minId, maxId, flags); return Inner.StreamPendingMessages(ToInner(key), groupName, count, consumerName, minId, maxId, flags);
} }
public RedisStreamEntry[] StreamRange(RedisKey key, RedisValue? minId = null, RedisValue? maxId = null, int? count = null, Order messageOrder = Order.Ascending, CommandFlags flags = CommandFlags.None) public StreamEntry[] StreamRange(RedisKey key, RedisValue? minId = null, RedisValue? maxId = null, int? count = null, Order messageOrder = Order.Ascending, CommandFlags flags = CommandFlags.None)
{ {
return Inner.StreamRange(ToInner(key), minId, maxId, count, messageOrder, flags); return Inner.StreamRange(ToInner(key), minId, maxId, count, messageOrder, flags);
} }
public RedisStreamEntry[] StreamRead(RedisKey key, RedisValue afterId, int? count = null, CommandFlags flags = CommandFlags.None) public StreamEntry[] StreamRead(RedisKey key, Position position, int? count = null, CommandFlags flags = CommandFlags.None)
{
return Inner.StreamRead(ToInner(key), position, count, flags);
}
public RedisStream[] StreamRead(StreamPosition[] streamPositions, int? countPerStream = null, CommandFlags flags = CommandFlags.None)
{ {
return Inner.StreamRead(ToInner(key), afterId, count, flags); return Inner.StreamRead(streamPositions, countPerStream, flags);
} }
public RedisStream[] StreamRead(StreamIdPair[] streamIdPairs, int? countPerStream = null, CommandFlags flags = CommandFlags.None) public StreamEntry[] StreamReadGroup(RedisKey key, RedisValue groupName, RedisValue consumerName, Position? position = null, int? count = null, CommandFlags flags = CommandFlags.None)
{ {
return Inner.StreamRead(streamIdPairs, countPerStream, flags); return Inner.StreamReadGroup(ToInner(key), groupName, consumerName, position, count, flags);
} }
public RedisStreamEntry[] StreamReadGroup(RedisKey key, RedisValue groupName, RedisValue consumerName, RedisValue? readFromId = null, int? count = null, CommandFlags flags = CommandFlags.None) public RedisStream[] StreamReadGroup(StreamPosition[] streamPositions, RedisValue groupName, RedisValue consumerName, int? countPerStream = null, CommandFlags flags = CommandFlags.None)
{ {
return Inner.StreamReadGroup(ToInner(key), groupName, consumerName, readFromId, count, flags); return Inner.StreamReadGroup(streamPositions, groupName, consumerName, countPerStream, flags);
} }
public long StreamTrim(RedisKey key, int maxLength, bool useApproximateMaxLength = false, CommandFlags flags = CommandFlags.None) public long StreamTrim(RedisKey key, int maxLength, bool useApproximateMaxLength = false, CommandFlags flags = CommandFlags.None)
......
...@@ -606,7 +606,7 @@ public Task<RedisValue> StreamAddAsync(RedisKey key, NameValueEntry[] streamPair ...@@ -606,7 +606,7 @@ public Task<RedisValue> StreamAddAsync(RedisKey key, NameValueEntry[] streamPair
return Inner.StreamAddAsync(ToInner(key), streamPairs, messageId, maxLength, useApproximateMaxLength, flags); return Inner.StreamAddAsync(ToInner(key), streamPairs, messageId, maxLength, useApproximateMaxLength, flags);
} }
public Task<RedisStreamEntry[]> StreamClaimAsync(RedisKey key, RedisValue consumerGroup, RedisValue claimingConsumer, long minIdleTimeInMs, RedisValue[] messageIds, CommandFlags flags = CommandFlags.None) public Task<StreamEntry[]> StreamClaimAsync(RedisKey key, RedisValue consumerGroup, RedisValue claimingConsumer, long minIdleTimeInMs, RedisValue[] messageIds, CommandFlags flags = CommandFlags.None)
{ {
return Inner.StreamClaimAsync(ToInner(key), consumerGroup, claimingConsumer, minIdleTimeInMs, messageIds, flags); return Inner.StreamClaimAsync(ToInner(key), consumerGroup, claimingConsumer, minIdleTimeInMs, messageIds, flags);
} }
...@@ -616,9 +616,14 @@ public Task<RedisValue[]> StreamClaimIdsOnlyAsync(RedisKey key, RedisValue consu ...@@ -616,9 +616,14 @@ public Task<RedisValue[]> StreamClaimIdsOnlyAsync(RedisKey key, RedisValue consu
return Inner.StreamClaimIdsOnlyAsync(ToInner(key), consumerGroup, claimingConsumer, minIdleTimeInMs, messageIds, flags); return Inner.StreamClaimIdsOnlyAsync(ToInner(key), consumerGroup, claimingConsumer, minIdleTimeInMs, messageIds, flags);
} }
public Task<bool> StreamCreateConsumerGroupAsync(RedisKey key, RedisValue groupName, RedisValue? readFrom = null, CommandFlags flags = CommandFlags.None) public Task<bool> StreamConsumerGroupSetPositionAsync(RedisKey key, RedisValue groupName, Position position, CommandFlags flags = CommandFlags.None)
{ {
return Inner.StreamCreateConsumerGroupAsync(ToInner(key), groupName, readFrom, flags); return Inner.StreamConsumerGroupSetPositionAsync(ToInner(key), groupName, position, flags);
}
public Task<bool> StreamCreateConsumerGroupAsync(RedisKey key, RedisValue groupName, Position? position = null, CommandFlags flags = CommandFlags.None)
{
return Inner.StreamCreateConsumerGroupAsync(ToInner(key), groupName, position, flags);
} }
public Task<StreamInfo> StreamInfoAsync(RedisKey key, CommandFlags flags = CommandFlags.None) public Task<StreamInfo> StreamInfoAsync(RedisKey key, CommandFlags flags = CommandFlags.None)
...@@ -646,6 +651,16 @@ public Task<long> StreamDeleteAsync(RedisKey key, RedisValue[] messageIds, Comma ...@@ -646,6 +651,16 @@ public Task<long> StreamDeleteAsync(RedisKey key, RedisValue[] messageIds, Comma
return Inner.StreamDeleteAsync(ToInner(key), messageIds, flags); return Inner.StreamDeleteAsync(ToInner(key), messageIds, flags);
} }
public Task<long> StreamDeleteConsumerAsync(RedisKey key, RedisValue groupName, RedisValue consumerName, CommandFlags flags = CommandFlags.None)
{
return Inner.StreamDeleteConsumerAsync(ToInner(key), groupName, consumerName, flags);
}
public Task<bool> StreamDeleteConsumerGroupAsync(RedisKey key, RedisValue groupName, CommandFlags flags = CommandFlags.None)
{
return Inner.StreamDeleteConsumerGroupAsync(ToInner(key), groupName, flags);
}
public Task<StreamPendingInfo> StreamPendingAsync(RedisKey key, RedisValue groupName, CommandFlags flags = CommandFlags.None) public Task<StreamPendingInfo> StreamPendingAsync(RedisKey key, RedisValue groupName, CommandFlags flags = CommandFlags.None)
{ {
return Inner.StreamPendingAsync(ToInner(key), groupName, flags); return Inner.StreamPendingAsync(ToInner(key), groupName, flags);
...@@ -656,24 +671,29 @@ public Task<StreamPendingMessageInfo[]> StreamPendingMessagesAsync(RedisKey key, ...@@ -656,24 +671,29 @@ public Task<StreamPendingMessageInfo[]> StreamPendingMessagesAsync(RedisKey key,
return Inner.StreamPendingMessagesAsync(ToInner(key), groupName, count, consumerName, minId, maxId, flags); return Inner.StreamPendingMessagesAsync(ToInner(key), groupName, count, consumerName, minId, maxId, flags);
} }
public Task<RedisStreamEntry[]> StreamRangeAsync(RedisKey key, RedisValue? minId = null, RedisValue? maxId = null, int? count = null, Order messageOrder = Order.Ascending, CommandFlags flags = CommandFlags.None) public Task<StreamEntry[]> StreamRangeAsync(RedisKey key, RedisValue? minId = null, RedisValue? maxId = null, int? count = null, Order messageOrder = Order.Ascending, CommandFlags flags = CommandFlags.None)
{ {
return Inner.StreamRangeAsync(ToInner(key), minId, maxId, count, messageOrder, flags); return Inner.StreamRangeAsync(ToInner(key), minId, maxId, count, messageOrder, flags);
} }
public Task<RedisStreamEntry[]> StreamReadAsync(RedisKey key, RedisValue afterId, int? count = null, CommandFlags flags = CommandFlags.None) public Task<StreamEntry[]> StreamReadAsync(RedisKey key, Position position, int? count = null, CommandFlags flags = CommandFlags.None)
{
return Inner.StreamReadAsync(ToInner(key), position, count, flags);
}
public Task<RedisStream[]> StreamReadAsync(StreamPosition[] streamPositions, int? countPerStream = null, CommandFlags flags = CommandFlags.None)
{ {
return Inner.StreamReadAsync(ToInner(key), afterId, count, flags); return Inner.StreamReadAsync(streamPositions, countPerStream, flags);
} }
public Task<RedisStream[]> StreamReadAsync(StreamIdPair[] streamIdPairs, int? countPerStream = null, CommandFlags flags = CommandFlags.None) public Task<StreamEntry[]> StreamReadGroupAsync(RedisKey key, RedisValue groupName, RedisValue consumerName, Position? position = null, int? count = null, CommandFlags flags = CommandFlags.None)
{ {
return Inner.StreamReadAsync(streamIdPairs, countPerStream, flags); return Inner.StreamReadGroupAsync(ToInner(key), groupName, consumerName, position, count, flags);
} }
public Task<RedisStreamEntry[]> StreamReadGroupAsync(RedisKey key, RedisValue groupName, RedisValue consumerName, RedisValue? readFromId = null, int? count = null, CommandFlags flags = CommandFlags.None) public Task<RedisStream[]> StreamReadGroupAsync(StreamPosition[] streamPositions, RedisValue groupName, RedisValue consumerName, int? countPerStream = null, CommandFlags flags = CommandFlags.None)
{ {
return Inner.StreamReadGroupAsync(ToInner(key), groupName, consumerName, readFromId, count, flags); return Inner.StreamReadGroupAsync(streamPositions, groupName, consumerName, countPerStream, flags);
} }
public Task<long> StreamTrimAsync(RedisKey key, int maxLength, bool useApproximateMaxLength = false, CommandFlags flags = CommandFlags.None) public Task<long> StreamTrimAsync(RedisKey key, int maxLength, bool useApproximateMaxLength = false, CommandFlags flags = CommandFlags.None)
......
using System;
namespace StackExchange.Redis
{
/// <summary>
/// A position within a stream. Defaults to <see cref="Position.New"/>.
/// </summary>
public struct Position
{
/// <summary>
/// Indicate a position from which to read a stream.
/// </summary>
/// <param name="readAfter">The position from which to read a stream.</param>
public Position(RedisValue readAfter)
{
if (readAfter == RedisValue.Null) throw new ArgumentNullException(nameof(readAfter), "readAfter cannot be RedisValue.Null.");
Kind = PositionKind.Explicit;
ExplicitValue = readAfter;
}
private Position(PositionKind kind)
{
Kind = kind;
ExplicitValue = RedisValue.Null;
}
private PositionKind Kind { get; }
private RedisValue ExplicitValue { get; }
/// <summary>
/// Read new messages.
/// </summary>
public static Position New = new Position(PositionKind.New);
/// <summary>
/// Read from the beginning of a stream.
/// </summary>
public static Position Beginning = new Position(PositionKind.Beginning);
internal RedisValue ResolveForCommand(RedisCommand command)
{
if (Kind == PositionKind.Explicit) return ExplicitValue;
if (Kind == PositionKind.Beginning) return StreamConstants.ReadMinValue;
// PositionKind.New
if (command == RedisCommand.XREAD) throw new InvalidOperationException("Position.New cannot be used with StreamRead.");
if (command == RedisCommand.XREADGROUP) return StreamConstants.UndeliveredMessages;
if (command == RedisCommand.XGROUP) return StreamConstants.NewMessages;
throw new ArgumentException($"Unsupported command in ResolveForCommand: {command}.", nameof(command));
}
}
}
...@@ -1716,7 +1716,7 @@ public Task<RedisValue> StreamAddAsync(RedisKey key, NameValueEntry[] streamPair ...@@ -1716,7 +1716,7 @@ public Task<RedisValue> StreamAddAsync(RedisKey key, NameValueEntry[] streamPair
return ExecuteAsync(msg, ResultProcessor.RedisValue); return ExecuteAsync(msg, ResultProcessor.RedisValue);
} }
public RedisStreamEntry[] StreamClaim(RedisKey key, RedisValue consumerGroup, RedisValue claimingConsumer, long minIdleTimeInMs, RedisValue[] messageIds, CommandFlags flags = CommandFlags.None) public StreamEntry[] StreamClaim(RedisKey key, RedisValue consumerGroup, RedisValue claimingConsumer, long minIdleTimeInMs, RedisValue[] messageIds, CommandFlags flags = CommandFlags.None)
{ {
var msg = GetStreamClaimMessage(key, var msg = GetStreamClaimMessage(key,
consumerGroup, consumerGroup,
...@@ -1729,7 +1729,7 @@ public RedisStreamEntry[] StreamClaim(RedisKey key, RedisValue consumerGroup, Re ...@@ -1729,7 +1729,7 @@ public RedisStreamEntry[] StreamClaim(RedisKey key, RedisValue consumerGroup, Re
return ExecuteSync(msg, ResultProcessor.SingleStream); return ExecuteSync(msg, ResultProcessor.SingleStream);
} }
public Task<RedisStreamEntry[]> StreamClaimAsync(RedisKey key, RedisValue consumerGroup, RedisValue claimingConsumer, long minIdleTimeInMs, RedisValue[] messageIds, CommandFlags flags = CommandFlags.None) public Task<StreamEntry[]> StreamClaimAsync(RedisKey key, RedisValue consumerGroup, RedisValue claimingConsumer, long minIdleTimeInMs, RedisValue[] messageIds, CommandFlags flags = CommandFlags.None)
{ {
var msg = GetStreamClaimMessage(key, var msg = GetStreamClaimMessage(key,
consumerGroup, consumerGroup,
...@@ -1768,8 +1768,42 @@ public Task<RedisValue[]> StreamClaimIdsOnlyAsync(RedisKey key, RedisValue consu ...@@ -1768,8 +1768,42 @@ public Task<RedisValue[]> StreamClaimIdsOnlyAsync(RedisKey key, RedisValue consu
return ExecuteAsync(msg, ResultProcessor.RedisValueArray); return ExecuteAsync(msg, ResultProcessor.RedisValueArray);
} }
public bool StreamCreateConsumerGroup(RedisKey key, RedisValue groupName, RedisValue? readFrom = null, CommandFlags flags = CommandFlags.None) public bool StreamConsumerGroupSetPosition(RedisKey key, RedisValue groupName, Position position, CommandFlags flags = CommandFlags.None)
{ {
var msg = Message.Create(Database,
flags,
RedisCommand.XGROUP,
new RedisValue[]
{
StreamConstants.SetId,
key.AsRedisValue(),
groupName,
position.ResolveForCommand(RedisCommand.XGROUP)
});
return ExecuteSync(msg, ResultProcessor.Boolean);
}
public Task<bool> StreamConsumerGroupSetPositionAsync(RedisKey key, RedisValue groupName, Position position, CommandFlags flags = CommandFlags.None)
{
var msg = Message.Create(Database,
flags,
RedisCommand.XGROUP,
new RedisValue[]
{
StreamConstants.SetId,
key.AsRedisValue(),
groupName,
position.ResolveForCommand(RedisCommand.XGROUP)
});
return ExecuteAsync(msg, ResultProcessor.Boolean);
}
public bool StreamCreateConsumerGroup(RedisKey key, RedisValue groupName, Position? position = null, CommandFlags flags = CommandFlags.None)
{
var actualPosition = position ?? Position.New;
var msg = Message.Create(Database, var msg = Message.Create(Database,
flags, flags,
RedisCommand.XGROUP, RedisCommand.XGROUP,
...@@ -1778,14 +1812,16 @@ public bool StreamCreateConsumerGroup(RedisKey key, RedisValue groupName, RedisV ...@@ -1778,14 +1812,16 @@ public bool StreamCreateConsumerGroup(RedisKey key, RedisValue groupName, RedisV
StreamConstants.Create, StreamConstants.Create,
key.AsRedisValue(), key.AsRedisValue(),
groupName, groupName,
readFrom ?? StreamConstants.NewMessages actualPosition.ResolveForCommand(RedisCommand.XGROUP)
}); });
return ExecuteSync(msg, ResultProcessor.Boolean); return ExecuteSync(msg, ResultProcessor.Boolean);
} }
public Task<bool> StreamCreateConsumerGroupAsync(RedisKey key, RedisValue groupName, RedisValue? readFrom = null, CommandFlags flags = CommandFlags.None) public Task<bool> StreamCreateConsumerGroupAsync(RedisKey key, RedisValue groupName, Position? position = null, CommandFlags flags = CommandFlags.None)
{ {
var actualPosition = position ?? Position.New;
var msg = Message.Create(Database, var msg = Message.Create(Database,
flags, flags,
RedisCommand.XGROUP, RedisCommand.XGROUP,
...@@ -1794,7 +1830,7 @@ public Task<bool> StreamCreateConsumerGroupAsync(RedisKey key, RedisValue groupN ...@@ -1794,7 +1830,7 @@ public Task<bool> StreamCreateConsumerGroupAsync(RedisKey key, RedisValue groupN
StreamConstants.Create, StreamConstants.Create,
key.AsRedisValue(), key.AsRedisValue(),
groupName, groupName,
readFrom ?? StreamConstants.NewMessages actualPosition.ResolveForCommand(RedisCommand.XGROUP)
}); });
return ExecuteAsync(msg, ResultProcessor.Boolean); return ExecuteAsync(msg, ResultProcessor.Boolean);
...@@ -1888,6 +1924,68 @@ public Task<long> StreamDeleteAsync(RedisKey key, RedisValue[] messageIds, Comma ...@@ -1888,6 +1924,68 @@ public Task<long> StreamDeleteAsync(RedisKey key, RedisValue[] messageIds, Comma
return ExecuteAsync(msg, ResultProcessor.Int64); return ExecuteAsync(msg, ResultProcessor.Int64);
} }
public long StreamDeleteConsumer(RedisKey key, RedisValue groupName, RedisValue consumerName, CommandFlags flags = CommandFlags.None)
{
var msg = Message.Create(Database,
flags,
RedisCommand.XGROUP,
new RedisValue[]
{
StreamConstants.DeleteConsumer,
key.AsRedisValue(),
groupName,
consumerName
});
return ExecuteSync(msg, ResultProcessor.Int64);
}
public Task<long> StreamDeleteConsumerAsync(RedisKey key, RedisValue groupName, RedisValue consumerName, CommandFlags flags = CommandFlags.None)
{
var msg = Message.Create(Database,
flags,
RedisCommand.XGROUP,
new RedisValue[]
{
StreamConstants.DeleteConsumer,
key.AsRedisValue(),
groupName,
consumerName
});
return ExecuteAsync(msg, ResultProcessor.Int64);
}
public bool StreamDeleteConsumerGroup(RedisKey key, RedisValue groupName, CommandFlags flags = CommandFlags.None)
{
var msg = Message.Create(Database,
flags,
RedisCommand.XGROUP,
new RedisValue[]
{
StreamConstants.Destroy,
key.AsRedisValue(),
groupName
});
return ExecuteSync(msg, ResultProcessor.Boolean);
}
public Task<bool> StreamDeleteConsumerGroupAsync(RedisKey key, RedisValue groupName, CommandFlags flags = CommandFlags.None)
{
var msg = Message.Create(Database,
flags,
RedisCommand.XGROUP,
new RedisValue[]
{
StreamConstants.Destroy,
key.AsRedisValue(),
groupName
});
return ExecuteAsync(msg, ResultProcessor.Boolean);
}
public StreamPendingInfo StreamPending(RedisKey key, RedisValue groupName, CommandFlags flags = CommandFlags.None) public StreamPendingInfo StreamPending(RedisKey key, RedisValue groupName, CommandFlags flags = CommandFlags.None)
{ {
var msg = Message.Create(Database, flags, RedisCommand.XPENDING, key, groupName); var msg = Message.Create(Database, flags, RedisCommand.XPENDING, key, groupName);
...@@ -1902,64 +2000,126 @@ public Task<StreamPendingInfo> StreamPendingAsync(RedisKey key, RedisValue group ...@@ -1902,64 +2000,126 @@ public Task<StreamPendingInfo> StreamPendingAsync(RedisKey key, RedisValue group
public StreamPendingMessageInfo[] StreamPendingMessages(RedisKey key, RedisValue groupName, int count, RedisValue consumerName, RedisValue? minId = null, RedisValue? maxId = null, CommandFlags flags = CommandFlags.None) public StreamPendingMessageInfo[] StreamPendingMessages(RedisKey key, RedisValue groupName, int count, RedisValue consumerName, RedisValue? minId = null, RedisValue? maxId = null, CommandFlags flags = CommandFlags.None)
{ {
var msg = GetStreamPendingMessagesMessage(key, groupName, minId, maxId, count, consumerName, flags); var msg = GetStreamPendingMessagesMessage(key,
groupName,
minId,
maxId,
count,
consumerName,
flags);
return ExecuteSync(msg, ResultProcessor.StreamPendingMessages); return ExecuteSync(msg, ResultProcessor.StreamPendingMessages);
} }
public Task<StreamPendingMessageInfo[]> StreamPendingMessagesAsync(RedisKey key, RedisValue groupName, int count, RedisValue consumerName, RedisValue? minId = null, RedisValue? maxId = null, CommandFlags flags = CommandFlags.None) public Task<StreamPendingMessageInfo[]> StreamPendingMessagesAsync(RedisKey key, RedisValue groupName, int count, RedisValue consumerName, RedisValue? minId = null, RedisValue? maxId = null, CommandFlags flags = CommandFlags.None)
{ {
var msg = GetStreamPendingMessagesMessage(key, groupName, minId, maxId, count, consumerName, flags); var msg = GetStreamPendingMessagesMessage(key,
groupName,
minId,
maxId,
count,
consumerName,
flags);
return ExecuteAsync(msg, ResultProcessor.StreamPendingMessages); return ExecuteAsync(msg, ResultProcessor.StreamPendingMessages);
} }
public RedisStreamEntry[] StreamRange(RedisKey key, RedisValue? minId = null, RedisValue? maxId = null, int? count = null, Order messageOrder = Order.Ascending, CommandFlags flags = CommandFlags.None) public StreamEntry[] StreamRange(RedisKey key, RedisValue? minId = null, RedisValue? maxId = null, int? count = null, Order messageOrder = Order.Ascending, CommandFlags flags = CommandFlags.None)
{ {
var msg = GetStreamRangeMessage(key, minId, maxId, count, messageOrder, flags); var msg = GetStreamRangeMessage(key,
minId,
maxId,
count,
messageOrder,
flags);
return ExecuteSync(msg, ResultProcessor.SingleStream); return ExecuteSync(msg, ResultProcessor.SingleStream);
} }
public Task<RedisStreamEntry[]> StreamRangeAsync(RedisKey key, RedisValue? minId = null, RedisValue? maxId = null, int? count = null, Order messageOrder = Order.Ascending, CommandFlags flags = CommandFlags.None) public Task<StreamEntry[]> StreamRangeAsync(RedisKey key, RedisValue? minId = null, RedisValue? maxId = null, int? count = null, Order messageOrder = Order.Ascending, CommandFlags flags = CommandFlags.None)
{ {
var msg = GetStreamRangeMessage(key, minId, maxId, count, messageOrder, flags); var msg = GetStreamRangeMessage(key,
minId,
maxId,
count,
messageOrder,
flags);
return ExecuteAsync(msg, ResultProcessor.SingleStream); return ExecuteAsync(msg, ResultProcessor.SingleStream);
} }
public RedisStreamEntry[] StreamRead(RedisKey key, RedisValue afterId, int? count = null, CommandFlags flags = CommandFlags.None) public StreamEntry[] StreamRead(RedisKey key, Position position, int? count = null, CommandFlags flags = CommandFlags.None)
{ {
var msg = GetSingleStreamReadMessage(key, afterId, count, flags); var msg = GetSingleStreamReadMessage(key,
position.ResolveForCommand(RedisCommand.XREAD),
count,
flags);
return ExecuteSync(msg, ResultProcessor.SingleStreamWithNameSkip); return ExecuteSync(msg, ResultProcessor.SingleStreamWithNameSkip);
} }
public Task<RedisStreamEntry[]> StreamReadAsync(RedisKey key, RedisValue afterId, int? count = null, CommandFlags flags = CommandFlags.None) public Task<StreamEntry[]> StreamReadAsync(RedisKey key, Position position, int? count = null, CommandFlags flags = CommandFlags.None)
{ {
var msg = GetSingleStreamReadMessage(key, afterId, count, flags); var msg = GetSingleStreamReadMessage(key,
position.ResolveForCommand(RedisCommand.XREAD),
count,
flags);
return ExecuteAsync(msg, ResultProcessor.SingleStreamWithNameSkip); return ExecuteAsync(msg, ResultProcessor.SingleStreamWithNameSkip);
} }
public RedisStream[] StreamRead(StreamIdPair[] streamIdPairs, int? countPerStream = null, CommandFlags flags = CommandFlags.None) public RedisStream[] StreamRead(StreamPosition[] streamPositions, int? countPerStream = null, CommandFlags flags = CommandFlags.None)
{ {
var msg = GetMultiStreamReadMessage(streamIdPairs, countPerStream, flags); var msg = GetMultiStreamReadMessage(streamPositions, countPerStream, flags);
return ExecuteSync(msg, ResultProcessor.MultiStream); return ExecuteSync(msg, ResultProcessor.MultiStream);
} }
public Task<RedisStream[]> StreamReadAsync(StreamIdPair[] streamIdPairs, int? countPerStream = null, CommandFlags flags = CommandFlags.None) public Task<RedisStream[]> StreamReadAsync(StreamPosition[] streamPositions, int? countPerStream = null, CommandFlags flags = CommandFlags.None)
{ {
var msg = GetMultiStreamReadMessage(streamIdPairs, countPerStream, flags); var msg = GetMultiStreamReadMessage(streamPositions, countPerStream, flags);
return ExecuteAsync(msg, ResultProcessor.MultiStream); return ExecuteAsync(msg, ResultProcessor.MultiStream);
} }
public RedisStreamEntry[] StreamReadGroup(RedisKey key, RedisValue groupName, RedisValue consumerName, RedisValue? readFromId = null, int? count = null, CommandFlags flags = CommandFlags.None) public StreamEntry[] StreamReadGroup(RedisKey key, RedisValue groupName, RedisValue consumerName, Position? position = null, int? count = null, CommandFlags flags = CommandFlags.None)
{ {
var msg = GetStreamReadGroupMessage(key, groupName, consumerName, readFromId, count, flags); var actualPosition = position ?? Position.New;
var msg = GetStreamReadGroupMessage(key,
groupName,
consumerName,
actualPosition.ResolveForCommand(RedisCommand.XREADGROUP),
count,
flags);
return ExecuteSync(msg, ResultProcessor.SingleStreamWithNameSkip); return ExecuteSync(msg, ResultProcessor.SingleStreamWithNameSkip);
} }
public Task<RedisStreamEntry[]> StreamReadGroupAsync(RedisKey key, RedisValue groupName, RedisValue consumerName, RedisValue? readFromId = null, int? count = null, CommandFlags flags = CommandFlags.None) public Task<StreamEntry[]> StreamReadGroupAsync(RedisKey key, RedisValue groupName, RedisValue consumerName, Position? position = null, int? count = null, CommandFlags flags = CommandFlags.None)
{ {
var msg = GetStreamReadGroupMessage(key, groupName, consumerName, readFromId, count, flags); var actualPosition = position ?? Position.New;
var msg = GetStreamReadGroupMessage(key,
groupName,
consumerName,
actualPosition.ResolveForCommand(RedisCommand.XREADGROUP),
count,
flags);
return ExecuteAsync(msg, ResultProcessor.SingleStreamWithNameSkip); return ExecuteAsync(msg, ResultProcessor.SingleStreamWithNameSkip);
} }
public RedisStream[] StreamReadGroup(StreamPosition[] streamPositions, RedisValue groupName, RedisValue consumerName, int? countPerStream = null, CommandFlags flags = CommandFlags.None)
{
var msg = GetMultiStreamReadGroupMessage(streamPositions, groupName, consumerName, countPerStream, flags);
return ExecuteSync(msg, ResultProcessor.MultiStream);
}
public Task<RedisStream[]> StreamReadGroupAsync(StreamPosition[] streamPositions, RedisValue groupName, RedisValue consumerName, int? countPerStream = null, CommandFlags flags = CommandFlags.None)
{
var msg = GetMultiStreamReadGroupMessage(streamPositions, groupName, consumerName, countPerStream, flags);
return ExecuteAsync(msg, ResultProcessor.MultiStream);
}
public long StreamTrim(RedisKey key, int maxLength, bool useApproximateMaxLength = false, CommandFlags flags = CommandFlags.None) public long StreamTrim(RedisKey key, int maxLength, bool useApproximateMaxLength = false, CommandFlags flags = CommandFlags.None)
{ {
var msg = GetStreamTrimMessage(key, maxLength, useApproximateMaxLength, flags); var msg = GetStreamTrimMessage(key, maxLength, useApproximateMaxLength, flags);
...@@ -2329,12 +2489,55 @@ private RedisValue GetLexRange(RedisValue value, Exclude exclude, bool isStart) ...@@ -2329,12 +2489,55 @@ private RedisValue GetLexRange(RedisValue value, Exclude exclude, bool isStart)
return result; return result;
} }
private Message GetMultiStreamReadMessage(StreamIdPair[] streamIdPairs, int? countPerStream, CommandFlags flags) private Message GetMultiStreamReadGroupMessage(StreamPosition[] streamPositions, RedisValue groupName, RedisValue consumerName, int? countPerStream, CommandFlags flags)
{
// Example: XREADGROUP GROUP groupName consumerName COUNT countPerStream STREAMS stream1 stream2 id1 id2
if (streamPositions == null) throw new ArgumentNullException(nameof(streamPositions));
if (streamPositions.Length == 0) throw new ArgumentOutOfRangeException(nameof(streamPositions), "streamOffsetPairs must contain at least one item.");
if (countPerStream.HasValue && countPerStream <= 0)
{
throw new ArgumentOutOfRangeException(nameof(countPerStream), "countPerStream must be greater than 0.");
}
var values = new RedisValue[
4 // Room for GROUP groupName consumerName & STREAMS
+ (streamPositions.Length * 2) // Enough room for the stream keys and associated IDs.
+ (countPerStream.HasValue ? 2 : 0)]; // Room for "COUNT num" or 0 if countPerStream is null.
var offset = 0;
values[offset++] = StreamConstants.Group;
values[offset++] = groupName;
values[offset++] = consumerName;
if (countPerStream.HasValue)
{
values[offset++] = StreamConstants.Count;
values[offset++] = countPerStream;
}
values[offset++] = StreamConstants.Streams;
var pairCount = streamPositions.Length;
for (var i = 0; i < pairCount; i++)
{
values[offset] = streamPositions[i].Key.AsRedisValue();
values[offset + pairCount] = streamPositions[i].Position.ResolveForCommand(RedisCommand.XREADGROUP);
offset++;
}
return Message.Create(Database, flags, RedisCommand.XREADGROUP, values);
}
private Message GetMultiStreamReadMessage(StreamPosition[] streamPositions, int? countPerStream, CommandFlags flags)
{ {
// Example: XREAD COUNT 2 STREAMS mystream writers 0-0 0-0 // Example: XREAD COUNT 2 STREAMS mystream writers 0-0 0-0
if (streamIdPairs == null) throw new ArgumentNullException(nameof(streamIdPairs)); if (streamPositions == null) throw new ArgumentNullException(nameof(streamPositions));
if (streamIdPairs.Length == 0) throw new ArgumentOutOfRangeException(nameof(streamIdPairs), "streamAndIdPairs must contain at least one item."); if (streamPositions.Length == 0) throw new ArgumentOutOfRangeException(nameof(streamPositions), "streamOffsetPairs must contain at least one item.");
if (countPerStream.HasValue && countPerStream <= 0) if (countPerStream.HasValue && countPerStream <= 0)
{ {
...@@ -2343,7 +2546,7 @@ private Message GetMultiStreamReadMessage(StreamIdPair[] streamIdPairs, int? cou ...@@ -2343,7 +2546,7 @@ private Message GetMultiStreamReadMessage(StreamIdPair[] streamIdPairs, int? cou
var values = new RedisValue[ var values = new RedisValue[
1 // Streams keyword. 1 // Streams keyword.
+ (streamIdPairs.Length * 2) // Room for the stream names and the ID from which to begin reading. + (streamPositions.Length * 2) // Room for the stream names and the ID after which to begin reading.
+ (countPerStream.HasValue ? 2 : 0)]; // Room for "COUNT num" or 0 if countPerStream is null. + (countPerStream.HasValue ? 2 : 0)]; // Room for "COUNT num" or 0 if countPerStream is null.
var offset = 0; var offset = 0;
...@@ -2372,12 +2575,12 @@ private Message GetMultiStreamReadMessage(StreamIdPair[] streamIdPairs, int? cou ...@@ -2372,12 +2575,12 @@ private Message GetMultiStreamReadMessage(StreamIdPair[] streamIdPairs, int? cou
* *
* */ * */
var pairCount = streamIdPairs.Length; var pairCount = streamPositions.Length;
for (var i = 0; i < pairCount; i++) for (var i = 0; i < pairCount; i++)
{ {
values[offset] = streamIdPairs[i].Key.AsRedisValue(); values[offset] = streamPositions[i].Key.AsRedisValue();
values[offset + pairCount] = streamIdPairs[i].Id; values[offset + pairCount] = streamPositions[i].Position.ResolveForCommand(RedisCommand.XREAD);
offset++; offset++;
} }
...@@ -2821,7 +3024,7 @@ private Message GetStreamRangeMessage(RedisKey key, RedisValue? minId, RedisValu ...@@ -2821,7 +3024,7 @@ private Message GetStreamRangeMessage(RedisKey key, RedisValue? minId, RedisValu
values); values);
} }
private Message GetStreamReadGroupMessage(RedisKey key, RedisValue groupName, RedisValue consumerName, RedisValue? readFromId, int? count, CommandFlags flags) private Message GetStreamReadGroupMessage(RedisKey key, RedisValue groupName, RedisValue consumerName, RedisValue afterId, int? count, CommandFlags flags)
{ {
// Example: > XREADGROUP GROUP mygroup Alice COUNT 1 STREAMS mystream > // Example: > XREADGROUP GROUP mygroup Alice COUNT 1 STREAMS mystream >
if (count.HasValue && count <= 0) if (count.HasValue && count <= 0)
...@@ -2846,7 +3049,7 @@ private Message GetStreamReadGroupMessage(RedisKey key, RedisValue groupName, Re ...@@ -2846,7 +3049,7 @@ private Message GetStreamReadGroupMessage(RedisKey key, RedisValue groupName, Re
values[offset++] = StreamConstants.Streams; values[offset++] = StreamConstants.Streams;
values[offset++] = key.AsRedisValue(); values[offset++] = key.AsRedisValue();
values[offset] = readFromId ?? StreamConstants.UndeliveredMessages; values[offset] = afterId;
return Message.Create(Database, return Message.Create(Database,
flags, flags,
......
...@@ -5,7 +5,7 @@ ...@@ -5,7 +5,7 @@
/// </summary> /// </summary>
public readonly struct RedisStream public readonly struct RedisStream
{ {
internal RedisStream(RedisKey key, RedisStreamEntry[] entries) internal RedisStream(RedisKey key, StreamEntry[] entries)
{ {
Key = key; Key = key;
Entries = entries; Entries = entries;
...@@ -19,6 +19,6 @@ internal RedisStream(RedisKey key, RedisStreamEntry[] entries) ...@@ -19,6 +19,6 @@ internal RedisStream(RedisKey key, RedisStreamEntry[] entries)
/// <summary> /// <summary>
/// An arry of entries contained within the stream. /// An arry of entries contained within the stream.
/// </summary> /// </summary>
public RedisStreamEntry[] Entries { get; } public StreamEntry[] Entries { get; }
} }
} }
...@@ -1330,7 +1330,7 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes ...@@ -1330,7 +1330,7 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes
} }
} }
internal sealed class SingleStreamProcessor : StreamProcessorBase<RedisStreamEntry[]> internal sealed class SingleStreamProcessor : StreamProcessorBase<StreamEntry[]>
{ {
private readonly bool skipStreamName; private readonly bool skipStreamName;
...@@ -1344,7 +1344,7 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes ...@@ -1344,7 +1344,7 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes
if (result.IsNull) if (result.IsNull)
{ {
// Server returns 'nil' if no entries are returned for the given stream. // Server returns 'nil' if no entries are returned for the given stream.
SetResult(message, Array.Empty<RedisStreamEntry>()); SetResult(message, Array.Empty<StreamEntry>());
return true; return true;
} }
...@@ -1353,7 +1353,7 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes ...@@ -1353,7 +1353,7 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes
return false; return false;
} }
RedisStreamEntry[] entries = null; StreamEntry[] entries = null;
if (skipStreamName) if (skipStreamName)
{ {
...@@ -1688,7 +1688,7 @@ internal abstract class StreamProcessorBase<T> : ResultProcessor<T> ...@@ -1688,7 +1688,7 @@ internal abstract class StreamProcessorBase<T> : ResultProcessor<T>
{ {
// For command response formats see https://redis.io/topics/streams-intro. // For command response formats see https://redis.io/topics/streams-intro.
protected RedisStreamEntry[] ParseRedisStreamEntries(RawResult result) protected StreamEntry[] ParseRedisStreamEntries(RawResult result)
{ {
if (result.Type != ResultType.MultiBulk) if (result.Type != ResultType.MultiBulk)
{ {
...@@ -1701,7 +1701,7 @@ protected RedisStreamEntry[] ParseRedisStreamEntries(RawResult result) ...@@ -1701,7 +1701,7 @@ protected RedisStreamEntry[] ParseRedisStreamEntries(RawResult result)
{ {
if (item.IsNull || item.Type != ResultType.MultiBulk) if (item.IsNull || item.Type != ResultType.MultiBulk)
{ {
return RedisStreamEntry.Null; return StreamEntry.Null;
} }
// Process the Multibulk array for each entry. The entry contains the following elements: // Process the Multibulk array for each entry. The entry contains the following elements:
...@@ -1709,7 +1709,7 @@ protected RedisStreamEntry[] ParseRedisStreamEntries(RawResult result) ...@@ -1709,7 +1709,7 @@ protected RedisStreamEntry[] ParseRedisStreamEntries(RawResult result)
// [1] = Multibulk array of the name/value pairs of the stream entry's data // [1] = Multibulk array of the name/value pairs of the stream entry's data
var entryDetails = item.GetItems(); var entryDetails = item.GetItems();
return new RedisStreamEntry(id: entryDetails[0].AsRedisValue(), return new StreamEntry(id: entryDetails[0].AsRedisValue(),
values: ParseStreamEntryValues(entryDetails[1])); values: ParseStreamEntryValues(entryDetails[1]));
}); });
} }
...@@ -1719,7 +1719,7 @@ protected NameValueEntry[] ParseStreamEntryValues(RawResult result) ...@@ -1719,7 +1719,7 @@ protected NameValueEntry[] ParseStreamEntryValues(RawResult result)
// The XRANGE, XREVRANGE, XREAD commands return stream entries // The XRANGE, XREVRANGE, XREAD commands return stream entries
// in the following format. The name/value pairs are interleaved // in the following format. The name/value pairs are interleaved
// in the same fashion as the HGETALL response. // in the same fashion as the HGETALL response.
// //
// 1) 1) 1518951480106-0 // 1) 1) 1518951480106-0
// 2) 1) "sensor-id" // 2) 1) "sensor-id"
// 2) "1234" // 2) "1234"
...@@ -1817,7 +1817,7 @@ public override bool SetResult(PhysicalConnection connection, Message message, R ...@@ -1817,7 +1817,7 @@ public override bool SetResult(PhysicalConnection connection, Message message, R
} }
return final; return final;
} }
protected override bool SetResultCore(PhysicalConnection connection, Message message, RawResult result) protected override bool SetResultCore(PhysicalConnection connection, Message message, RawResult result)
{ {
bool happy; bool happy;
......
...@@ -42,12 +42,18 @@ internal static class StreamConstants ...@@ -42,12 +42,18 @@ internal static class StreamConstants
internal static readonly RedisValue Create = "CREATE"; internal static readonly RedisValue Create = "CREATE";
internal static readonly RedisValue DeleteConsumer = "DELCONSUMER";
internal static readonly RedisValue Destroy = "DESTROY";
internal static readonly RedisValue Group = "GROUP"; internal static readonly RedisValue Group = "GROUP";
internal static readonly RedisValue Groups = "GROUPS"; internal static readonly RedisValue Groups = "GROUPS";
internal static readonly RedisValue JustId = "JUSTID"; internal static readonly RedisValue JustId = "JUSTID";
internal static readonly RedisValue SetId = "SETID";
internal static readonly RedisValue MaxLen = "MAXLEN"; internal static readonly RedisValue MaxLen = "MAXLEN";
internal static readonly RedisValue Stream = "STREAM"; internal static readonly RedisValue Stream = "STREAM";
......
...@@ -3,9 +3,9 @@ ...@@ -3,9 +3,9 @@
/// <summary> /// <summary>
/// Describes an entry contained in a Redis Stream. /// Describes an entry contained in a Redis Stream.
/// </summary> /// </summary>
public readonly struct RedisStreamEntry public readonly struct StreamEntry
{ {
internal RedisStreamEntry(RedisValue id, NameValueEntry[] values) internal StreamEntry(RedisValue id, NameValueEntry[] values)
{ {
Id = id; Id = id;
Values = values; Values = values;
...@@ -14,7 +14,7 @@ internal RedisStreamEntry(RedisValue id, NameValueEntry[] values) ...@@ -14,7 +14,7 @@ internal RedisStreamEntry(RedisValue id, NameValueEntry[] values)
/// <summary> /// <summary>
/// A null stream entry. /// A null stream entry.
/// </summary> /// </summary>
public static RedisStreamEntry Null { get; } = new RedisStreamEntry(RedisValue.Null, null); public static StreamEntry Null { get; } = new StreamEntry(RedisValue.Null, null);
/// <summary> /// <summary>
/// The ID assigned to the message. /// The ID assigned to the message.
......
...@@ -10,8 +10,8 @@ namespace StackExchange.Redis ...@@ -10,8 +10,8 @@ namespace StackExchange.Redis
int radixTreeKeys, int radixTreeKeys,
int radixTreeNodes, int radixTreeNodes,
int groups, int groups,
RedisStreamEntry firstEntry, StreamEntry firstEntry,
RedisStreamEntry lastEntry) StreamEntry lastEntry)
{ {
Length = length; Length = length;
RadixTreeKeys = radixTreeKeys; RadixTreeKeys = radixTreeKeys;
...@@ -44,11 +44,11 @@ namespace StackExchange.Redis ...@@ -44,11 +44,11 @@ namespace StackExchange.Redis
/// <summary> /// <summary>
/// The first entry in the stream. /// The first entry in the stream.
/// </summary> /// </summary>
public RedisStreamEntry FirstEntry { get; } public StreamEntry FirstEntry { get; }
/// <summary> /// <summary>
/// The last entry in the stream. /// The last entry in the stream.
/// </summary> /// </summary>
public RedisStreamEntry LastEntry { get; } public StreamEntry LastEntry { get; }
} }
} }
 namespace StackExchange.Redis
namespace StackExchange.Redis
{ {
/// <summary> /// <summary>
/// Describes a pair consisting of the Stream Key and the ID from which to read. /// Describes a pair consisting of the Stream Key and the <see cref="Position"/> from which to begin reading a stream.
/// </summary> /// </summary>
/// <remarks><see cref="IDatabase.StreamRead(StreamIdPair[], int?, CommandFlags)"/></remarks> public struct StreamPosition
public readonly struct StreamIdPair
{ {
/// <summary> /// <summary>
/// Initializes a <see cref="StreamIdPair"/> value. /// Initializes a <see cref="StreamPosition"/> value.
/// </summary> /// </summary>
/// <param name="key">The key for the stream.</param> /// <param name="key">The key for the stream.</param>
/// <param name="id">The ID from which to begin reading the stream.</param> /// <param name="position">The position from which to begin reading the stream.</param>
public StreamIdPair(RedisKey key, RedisValue id) public StreamPosition(RedisKey key, Position position)
{ {
Key = key; Key = key;
Id = id; Position = position;
} }
/// <summary> /// <summary>
/// The key for the stream. /// The stream key.
/// </summary> /// </summary>
public RedisKey Key { get; } public RedisKey Key { get; }
/// <summary> /// <summary>
/// The ID from which to begin reading the stream. /// The offset at which to begin reading the stream.
/// </summary> /// </summary>
public RedisValue Id { get; } public Position Position { get; }
/// <summary>
/// See Object.ToString()
/// </summary>
public override string ToString() => $"{Key}: {Id}";
} }
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment