Commit 4a8061f1 authored by Todd Tingen's avatar Todd Tingen Committed by Nick Craver

Initial implementation of Redis Streams. (#860)

The Streams data type is available in release 5.0 RC1 and above.

- Implemented Sync & Async methods for all Stream related commands (minus the blocking options) as of 5.0 RC1.
- Added tests for the synchronous versions of the Streams API but the testing is a work in progress. Need to refactor for reuse within the streams tests and write a more thorough suite of tests.
- Added a NameValueEntry struct which mimicks HashEntry. Using HashEntry for the name/value pairs of stream entries seemed wrong. Perhaps refactor the usage of HashEntry to the more generic NameValueEntry and deprecate HashEntry?
parent 1799c39d
...@@ -782,6 +782,145 @@ public void SortedSetScore() ...@@ -782,6 +782,145 @@ public void SortedSetScore()
mock.Verify(_ => _.SortedSetScore("prefix:key", "member", CommandFlags.HighPriority)); mock.Verify(_ => _.SortedSetScore("prefix:key", "member", CommandFlags.HighPriority));
} }
[Fact]
public void StreamAcknowledge_1()
{
wrapper.StreamAcknowledge("key", "group", "0-0", CommandFlags.HighPriority);
mock.Verify(_ => _.StreamAcknowledge("prefix:key", "group", "0-0", CommandFlags.HighPriority));
}
[Fact]
public void StreamAcknowledge_2()
{
var messageIds = new RedisValue[] { "0-0", "0-1", "0-2" };
wrapper.StreamAcknowledge("key", "group", messageIds, CommandFlags.HighPriority);
mock.Verify(_ => _.StreamAcknowledge("prefix:key", "group", messageIds, CommandFlags.HighPriority));
}
[Fact]
public void StreamAdd_1()
{
wrapper.StreamAdd("key", "field1", "value1", "*", 1000, true, CommandFlags.HighPriority);
mock.Verify(_ => _.StreamAdd("prefix:key", "field1", "value1", "*", 1000, true, CommandFlags.HighPriority));
}
[Fact]
public void StreamAdd_2()
{
var fields = new NameValueEntry[0];
wrapper.StreamAdd("key", fields, "*", 1000, true, CommandFlags.HighPriority);
mock.Verify(_ => _.StreamAdd("prefix:key", fields, "*", 1000, true, CommandFlags.HighPriority));
}
[Fact]
public void StreamClaimMessages()
{
var messageIds = new RedisValue[0];
wrapper.StreamClaim("key", "group", "consumer", 1000, messageIds, CommandFlags.HighPriority);
mock.Verify(_ => _.StreamClaim("prefix:key", "group", "consumer", 1000, messageIds, CommandFlags.HighPriority));
}
[Fact]
public void StreamClaimMessagesReturningIds()
{
var messageIds = new RedisValue[0];
wrapper.StreamClaimIdsOnly("key", "group", "consumer", 1000, messageIds, CommandFlags.HighPriority);
mock.Verify(_ => _.StreamClaimIdsOnly("prefix:key", "group", "consumer", 1000, messageIds, CommandFlags.HighPriority));
}
[Fact]
public void StreamConsumerInfoGet()
{
wrapper.StreamConsumerInfo("key", "group", CommandFlags.HighPriority);
mock.Verify(_ => _.StreamConsumerInfo("prefix:key", "group", CommandFlags.HighPriority));
}
[Fact]
public void StreamCreateConsumerGroup()
{
wrapper.StreamCreateConsumerGroup("key", "group", "0-0", CommandFlags.HighPriority);
mock.Verify(_ => _.StreamCreateConsumerGroup("prefix:key", "group", "0-0", CommandFlags.HighPriority));
}
[Fact]
public void StreamGroupInfoGet()
{
wrapper.StreamGroupInfo("key", CommandFlags.HighPriority);
mock.Verify(_ => _.StreamGroupInfo("prefix:key", CommandFlags.HighPriority));
}
[Fact]
public void StreamInfoGet()
{
wrapper.StreamInfo("key", CommandFlags.HighPriority);
mock.Verify(_ => _.StreamInfo("prefix:key", CommandFlags.HighPriority));
}
[Fact]
public void StreamLength()
{
wrapper.StreamLength("key", CommandFlags.HighPriority);
mock.Verify(_ => _.StreamLength("prefix:key", CommandFlags.HighPriority));
}
[Fact]
public void StreamMessagesDelete()
{
var messageIds = new RedisValue[0] { };
wrapper.StreamDelete("key", messageIds, CommandFlags.HighPriority);
mock.Verify(_ => _.StreamDelete("prefix:key", messageIds, CommandFlags.HighPriority));
}
[Fact]
public void StreamPendingInfoGet()
{
wrapper.StreamPending("key", "group", CommandFlags.HighPriority);
mock.Verify(_ => _.StreamPending("prefix:key", "group", CommandFlags.HighPriority));
}
[Fact]
public void StreamPendingMessageInfoGet()
{
wrapper.StreamPendingMessages("key", "group", 10, RedisValue.Null, null, null, CommandFlags.HighPriority);
mock.Verify(_ => _.StreamPendingMessages("prefix:key", "group", 10, RedisValue.Null, null, null, CommandFlags.HighPriority));
}
[Fact]
public void StreamRange()
{
wrapper.StreamRange("key", "-", "+", null, Order.Ascending, CommandFlags.HighPriority);
mock.Verify(_ => _.StreamRange("prefix:key", "-", "+",null, Order.Ascending, CommandFlags.HighPriority));
}
[Fact]
public void StreamRead_1()
{
var keysAndIds = new StreamIdPair[0] { };
wrapper.StreamRead(keysAndIds, null, CommandFlags.HighPriority);
mock.Verify(_ => _.StreamRead(keysAndIds, null, CommandFlags.HighPriority));
}
[Fact]
public void StreamRead_2()
{
wrapper.StreamRead("key", "0-0", null, CommandFlags.HighPriority);
mock.Verify(_ => _.StreamRead("prefix:key", "0-0", null, CommandFlags.HighPriority));
}
[Fact]
public void StreamStreamReadGroup()
{
wrapper.StreamReadGroup("key", "group", "consumer", "0-0", 10, CommandFlags.HighPriority);
mock.Verify(_ => _.StreamReadGroup("prefix:key", "group", "consumer", "0-0", 10, CommandFlags.HighPriority));
}
[Fact]
public void StreamTrim()
{
wrapper.StreamTrim("key", 1000, true, CommandFlags.HighPriority);
mock.Verify(_ => _.StreamTrim("prefix:key", 1000, true, CommandFlags.HighPriority));
}
[Fact] [Fact]
public void StringAppend() public void StringAppend()
{ {
......
using System;
using System.Linq;
using Xunit;
using Xunit.Abstractions;
namespace StackExchange.Redis.Tests
{
public class Streams : TestBase
{
public Streams(ITestOutputHelper output) : base(output) { }
[Fact]
public void IsStreamType()
{
using (var conn = Create())
{
var key = GetUniqueKey("type_check");
Skip.IfMissingFeature(conn, nameof(RedisFeatures.Streams), r => r.Streams);
var db = conn.GetDatabase();
db.StreamAdd(key, "field1", "value1");
var keyType = db.KeyType(key);
Assert.Equal(RedisType.Stream, keyType);
}
}
[Fact]
public void StreamAddSinglePairWithAutoId()
{
using (var conn = Create())
{
Skip.IfMissingFeature(conn, nameof(RedisFeatures.Streams), r => r.Streams);
var db = conn.GetDatabase();
var messageId = db.StreamAdd(GetUniqueKey("auto_id"), "field1", "value1");
Assert.True(messageId != RedisValue.Null && ((string)messageId).Length > 0);
}
}
[Fact]
public void StreamAddMultipleValuePairsWithAutoId()
{
using (var conn = Create())
{
Skip.IfMissingFeature(conn, nameof(RedisFeatures.Streams), r => r.Streams);
var key = GetUniqueKey("multiple_value_pairs");
var fields = new NameValueEntry[]
{
new NameValueEntry("field1", "value1"),
new NameValueEntry("field2", "value2")
};
var db = conn.GetDatabase();
var messageId = db.StreamAdd(key, fields);
var entries = db.StreamRange(key);
Assert.True(entries.Length == 1);
Assert.Equal(messageId, entries[0].Id);
Assert.True(entries[0].Values.Length == 2);
Assert.True(entries[0].Values[0].Name == "field1" &&
entries[0].Values[0].Value == "value1");
Assert.True(entries[0].Values[1].Name == "field2" &&
entries[0].Values[1].Value == "value2");
}
}
[Fact]
public void StreamAddWithManualId()
{
var id = "42-0";
var key = GetUniqueKey("manual_id");
using (var conn = Create())
{
Skip.IfMissingFeature(conn, nameof(RedisFeatures.Streams), r => r.Streams);
var db = conn.GetDatabase();
var messageId = db.StreamAdd(key, "field1", "value1", id);
Assert.Equal(id, messageId);
}
}
[Fact]
public void StreamAddMultipleValuePairsWithManualId()
{
var id = "42-0";
var key = GetUniqueKey("manual_id_multiple_values");
using (var conn = Create())
{
Skip.IfMissingFeature(conn, nameof(RedisFeatures.Streams), r => r.Streams);
var db = conn.GetDatabase();
var fields = new NameValueEntry[]
{
new NameValueEntry("field1", "value1"),
new NameValueEntry("field2", "value2")
};
var messageId = db.StreamAdd(key, fields, id);
var entries = db.StreamRange(key);
Assert.Equal(id, messageId);
Assert.NotNull(entries);
Assert.True(entries.Length == 1);
Assert.Equal(id, entries[0].Id);
}
}
[Fact]
public void StreamConsumerGroupWithNoConsumers()
{
var key = GetUniqueKey("group_with_no_consumers");
var groupName = "test_group";
using (var conn = Create())
{
Skip.IfMissingFeature(conn, nameof(RedisFeatures.Streams), r => r.Streams);
var db = conn.GetDatabase();
// Create a stream
db.StreamAdd(key, "field1", "value1");
// Create a group
db.StreamCreateConsumerGroup(key, groupName, "0-0");
// Query redis for the group consumers, expect an empty list in response.
var consumers = db.StreamConsumerInfo(key, groupName);
Assert.True(consumers.Length == 0);
}
}
[Fact]
public void StreamCreateConsumerGroup()
{
var key = GetUniqueKey("group_create");
var groupName = "test_group";
using (var conn = Create())
{
Skip.IfMissingFeature(conn, nameof(RedisFeatures.Streams), r => r.Streams);
var db = conn.GetDatabase();
// Create a stream
db.StreamAdd(key, "field1", "value1");
// Create a group
var result = db.StreamCreateConsumerGroup(key, groupName, "-");
Assert.True(result);
}
}
[Fact]
public void StreamConsumerGroupReadOnlyNewMessagesWithEmptyResponse()
{
var key = GetUniqueKey("group_read");
var groupName = "test_group";
using (var conn = Create())
{
Skip.IfMissingFeature(conn, nameof(RedisFeatures.Streams), r => r.Streams);
var db = conn.GetDatabase();
// Create a stream
db.StreamAdd(key, "field1", "value1");
db.StreamAdd(key, "field2", "value2");
// Create a group.
db.StreamCreateConsumerGroup(key, groupName);
// Read, expect no messages
var entries = db.StreamReadGroup(key, groupName, "test_consumer", "0-0");
Assert.True(entries.Length == 0);
}
}
[Fact]
public void StreamConsumerGroupReadFromStreamBeginning()
{
var key = GetUniqueKey("group_read_beginning");
var groupName = "test_group";
using (var conn = Create())
{
Skip.IfMissingFeature(conn, nameof(RedisFeatures.Streams), r => r.Streams);
var db = conn.GetDatabase();
var id1 = db.StreamAdd(key, "field1", "value1");
var id2 = db.StreamAdd(key, "field2", "value2");
db.StreamCreateConsumerGroup(key, groupName, "-");
var entries = db.StreamReadGroup(key, groupName, "test_consumer", "0-0");
Assert.True(entries.Length == 2);
Assert.True(id1 == entries[0].Id);
Assert.True(id2 == entries[1].Id);
}
}
[Fact]
public void StreamConsumerGroupReadFromStreamBeginningWithCount()
{
var key = GetUniqueKey("group_read_with_count");
var groupName = "test_group";
using (var conn = Create())
{
Skip.IfMissingFeature(conn, nameof(RedisFeatures.Streams), r => r.Streams);
var db = conn.GetDatabase();
var id1 = db.StreamAdd(key, "field1", "value1");
var id2 = db.StreamAdd(key, "field2", "value2");
var id3 = db.StreamAdd(key, "field3", "value3");
var id4 = db.StreamAdd(key, "field4", "value4");
db.StreamCreateConsumerGroup(key, groupName, "-");
// Start reading after id1.
var entries = db.StreamReadGroup(key, groupName, "test_consumer", id1, 2);
// Ensure we only received the requested count and that the IDs match the expected values.
Assert.True(entries.Length == 2);
Assert.True(id2 == entries[0].Id);
Assert.True(id3 == entries[1].Id);
}
}
[Fact]
public void StreamConsumerGroupAcknowledgeMessage()
{
var key = GetUniqueKey("group_ack");
var groupName = "test_group";
var consumer = "test_consumer";
using (var conn = Create())
{
Skip.IfMissingFeature(conn, nameof(RedisFeatures.Streams), r => r.Streams);
var db = conn.GetDatabase();
var id1 = db.StreamAdd(key, "field1", "value1");
var id2 = db.StreamAdd(key, "field2", "value2");
var id3 = db.StreamAdd(key, "field3", "value3");
var id4 = db.StreamAdd(key, "field4", "value4");
db.StreamCreateConsumerGroup(key, groupName, "-");
// Read all 4 messages, they will be assigned to the consumer
var entries = db.StreamReadGroup(key, groupName, consumer, "0-0");
// Send XACK for 3 of the messages
// Single message Id overload.
var oneAck = db.StreamAcknowledge(key, groupName, id1);
// Multiple message Id overload.
var twoAck = db.StreamAcknowledge(key, groupName, new RedisValue[] { id3, id4 });
// Read the group again, it should only return the unacknowledged message.
var notAcknowledged = db.StreamReadGroup(key, groupName, consumer, "0-0");
Assert.True(entries.Length == 4);
Assert.Equal(1, oneAck);
Assert.Equal(2, twoAck);
Assert.True(notAcknowledged.Length == 1);
Assert.Equal(id2, notAcknowledged[0].Id);
}
}
[Fact]
public void StreamConsumerGroupClaimMessages()
{
var key = GetUniqueKey("group_claim");
var groupName = "test_group";
var consumer1 = "test_consumer_1";
var consumer2 = "test_consumer_2";
using (var conn = Create())
{
Skip.IfMissingFeature(conn, nameof(RedisFeatures.Streams), r => r.Streams);
var db = conn.GetDatabase();
var id1 = db.StreamAdd(key, "field1", "value1");
var id2 = db.StreamAdd(key, "field2", "value2");
var id3 = db.StreamAdd(key, "field3", "value3");
var id4 = db.StreamAdd(key, "field4", "value4");
db.StreamCreateConsumerGroup(key, groupName, "0-0");
// Read a single message into the first consumer.
db.StreamReadGroup(key, groupName, consumer1, count: 1);
// Read the remaining messages into the second consumer.
db.StreamReadGroup(key, groupName, consumer2);
// Claim the 3 messages consumed by consumer2 for consumer1.
// Get the pending messages for consumer2.
var pendingMessages = db.StreamPendingMessages(key, groupName,
10,
consumer2);
// Claim the messages for consumer1.
var messages = db.StreamClaim(key,
groupName,
consumer1,
0, // Min message idle time
messageIds: pendingMessages.Select(pm => pm.MessageId).ToArray());
// Now see how many messages are pending for each consumer
var pendingSummary = db.StreamPending(key, groupName);
Assert.NotNull(pendingSummary.Consumers);
Assert.True(pendingSummary.Consumers.Length == 1);
Assert.Equal(4, pendingSummary.Consumers[0].PendingMessageCount);
Assert.True(pendingMessages.Length == messages.Length);
}
}
[Fact]
public void StreamConsumerGroupClaimMessagesReturningIds()
{
var key = GetUniqueKey("group_claim_view_ids");
var groupName = "test_group";
var consumer1 = "test_consumer_1";
var consumer2 = "test_consumer_2";
using (var conn = Create())
{
Skip.IfMissingFeature(conn, nameof(RedisFeatures.Streams), r => r.Streams);
var db = conn.GetDatabase();
var id1 = db.StreamAdd(key, "field1", "value1");
var id2 = db.StreamAdd(key, "field2", "value2");
var id3 = db.StreamAdd(key, "field3", "value3");
var id4 = db.StreamAdd(key, "field4", "value4");
db.StreamCreateConsumerGroup(key, groupName, "-");
// Read a single message into the first consumer.
var consumer1Messages = db.StreamReadGroup(key, groupName, consumer1, "-", 1);
// Read the remaining messages into the second consumer.
var consumer2Messages = db.StreamReadGroup(key, groupName, consumer2);
// Claim the 3 messages consumed by consumer2 for consumer1.
// Get the pending messages for consumer2.
var pendingMessages = db.StreamPendingMessages(key, groupName,
10,
consumer2);
// Claim the messages for consumer1.
var messageIds = db.StreamClaimIdsOnly(key,
groupName,
consumer1,
0, // Min message idle time
messageIds: pendingMessages.Select(pm => pm.MessageId).ToArray());
// We should get an array of 3 message IDs.
Assert.Equal(3, messageIds.Length);
Assert.Equal(id2, messageIds[0]);
Assert.Equal(id3, messageIds[1]);
Assert.Equal(id4, messageIds[2]);
}
}
[Fact]
public void StreamConsumerGroupViewPendingInfoNoConsumers()
{
var key = GetUniqueKey("group_pending_info_no_consumers");
var groupName = "test_group";
using (var conn = Create())
{
Skip.IfMissingFeature(conn, nameof(RedisFeatures.Streams), r => r.Streams);
var db = conn.GetDatabase();
var id1 = db.StreamAdd(key, "field1", "value1");
db.StreamCreateConsumerGroup(key, groupName, "-");
var pendingInfo = db.StreamPending(key, groupName);
Assert.Equal(0, pendingInfo.PendingMessageCount);
Assert.True(pendingInfo.LowestPendingMessageId == RedisValue.Null);
Assert.True(pendingInfo.HighestPendingMessageId == RedisValue.Null);
Assert.NotNull(pendingInfo.Consumers);
Assert.True(pendingInfo.Consumers.Length == 0);
}
}
[Fact]
public void StreamConsumerGroupViewPendingInfoWhenNothingPending()
{
var key = GetUniqueKey("group_pending_info_nothing_pending");
var groupName = "test_group";
using (var conn = Create())
{
Skip.IfMissingFeature(conn, nameof(RedisFeatures.Streams), r => r.Streams);
var db = conn.GetDatabase();
var id1 = db.StreamAdd(key, "field1", "value1");
db.StreamCreateConsumerGroup(key, groupName, "0-0");
var pendingMessages = db.StreamPendingMessages(key,
groupName,
10,
consumerName: RedisValue.Null);
Assert.NotNull(pendingMessages);
Assert.True(pendingMessages.Length == 0);
}
}
[Fact]
public void StreamConsumerGroupViewPendingInfoSummary()
{
var key = GetUniqueKey("group_pending_info");
var groupName = "test_group";
var consumer1 = "test_consumer_1";
var consumer2 = "test_consumer_2";
using (var conn = Create())
{
Skip.IfMissingFeature(conn, nameof(RedisFeatures.Streams), r => r.Streams);
var db = conn.GetDatabase();
var id1 = db.StreamAdd(key, "field1", "value1");
var id2 = db.StreamAdd(key, "field2", "value2");
var id3 = db.StreamAdd(key, "field3", "value3");
var id4 = db.StreamAdd(key, "field4", "value4");
db.StreamCreateConsumerGroup(key, groupName, "-");
// Read a single message into the first consumer.
var consumer1Messages = db.StreamReadGroup(key, groupName, consumer1, "-", 1);
// Read the remaining messages into the second consumer.
var consumer2Messages = db.StreamReadGroup(key, groupName, consumer2);
var pendingInfo = db.StreamPending(key, groupName);
Assert.Equal(4, pendingInfo.PendingMessageCount);
Assert.Equal(id1, pendingInfo.LowestPendingMessageId);
Assert.Equal(id4, pendingInfo.HighestPendingMessageId);
Assert.True(pendingInfo.Consumers.Length == 2);
var consumer1Count = pendingInfo.Consumers.First(c => c.Name == consumer1).PendingMessageCount;
var consumer2Count = pendingInfo.Consumers.First(c => c.Name == consumer2).PendingMessageCount;
Assert.Equal(1, consumer1Count);
Assert.Equal(3, consumer2Count);
}
}
[Fact]
public void StreamConsumerGroupViewPendingMessageInfo()
{
var key = GetUniqueKey("group_pending_messages");
var groupName = "test_group";
var consumer1 = "test_consumer_1";
var consumer2 = "test_consumer_2";
using (var conn = Create())
{
Skip.IfMissingFeature(conn, nameof(RedisFeatures.Streams), r => r.Streams);
var db = conn.GetDatabase();
var id1 = db.StreamAdd(key, "field1", "value1");
var id2 = db.StreamAdd(key, "field2", "value2");
var id3 = db.StreamAdd(key, "field3", "value3");
var id4 = db.StreamAdd(key, "field4", "value4");
db.StreamCreateConsumerGroup(key, groupName, "-");
// Read a single message into the first consumer.
var consumer1Messages = db.StreamReadGroup(key, groupName, consumer1, count: 1);
// Read the remaining messages into the second consumer.
var consumer2Messages = db.StreamReadGroup(key, groupName, consumer2);
// Get the pending info about the messages themselves.
var pendingMessageInfoList = db.StreamPendingMessages(key, groupName, 10, RedisValue.Null);
Assert.NotNull(pendingMessageInfoList);
Assert.Equal(4, pendingMessageInfoList.Length);
Assert.Equal(consumer1, pendingMessageInfoList[0].ConsumerName);
Assert.Equal(1, pendingMessageInfoList[0].DeliveryCount);
Assert.True((int)pendingMessageInfoList[0].IdleTimeInMilliseconds > 0);
Assert.Equal(id1, pendingMessageInfoList[0].MessageId);
}
}
[Fact]
public void StreamConsumerGroupViewPendingMessageInfoForConsumer()
{
var key = GetUniqueKey("group_pending_for_consumer");
var groupName = "test_group";
var consumer1 = "test_consumer_1";
var consumer2 = "test_consumer_2";
using (var conn = Create())
{
Skip.IfMissingFeature(conn, nameof(RedisFeatures.Streams), r => r.Streams);
var db = conn.GetDatabase();
var id1 = db.StreamAdd(key, "field1", "value1");
var id2 = db.StreamAdd(key, "field2", "value2");
var id3 = db.StreamAdd(key, "field3", "value3");
var id4 = db.StreamAdd(key, "field4", "value4");
db.StreamCreateConsumerGroup(key, groupName, "-");
// Read a single message into the first consumer.
var consumer1Messages = db.StreamReadGroup(key, groupName, consumer1, count: 1);
// Read the remaining messages into the second consumer.
var consumer2Messages = db.StreamReadGroup(key, groupName, consumer2);
// Get the pending info about the messages themselves.
var pendingMessageInfoList = db.StreamPendingMessages(key,
groupName,
10,
consumer2);
Assert.NotNull(pendingMessageInfoList);
Assert.Equal(3, pendingMessageInfoList.Length);
}
}
[Fact]
public void StreamDeleteMessage()
{
var key = GetUniqueKey("delete_msg");
using (var conn = Create())
{
Skip.IfMissingFeature(conn, nameof(RedisFeatures.Streams), r => r.Streams);
var db = conn.GetDatabase();
var id1 = db.StreamAdd(key, "field1", "value1");
var id2 = db.StreamAdd(key, "field2", "value2");
var id3 = db.StreamAdd(key, "field3", "value3");
var id4 = db.StreamAdd(key, "field4", "value4");
var deletedCount = db.StreamDelete(key, new RedisValue[] { id3 });
var messages = db.StreamRange(key, "-", "+");
Assert.Equal(1, deletedCount);
Assert.Equal(3, messages.Length);
}
}
[Fact]
public void StreamDeleteMessages()
{
var key = GetUniqueKey("delete_msgs");
using (var conn = Create())
{
Skip.IfMissingFeature(conn, nameof(RedisFeatures.Streams), r => r.Streams);
var db = conn.GetDatabase();
var id1 = db.StreamAdd(key, "field1", "value1");
var id2 = db.StreamAdd(key, "field2", "value2");
var id3 = db.StreamAdd(key, "field3", "value3");
var id4 = db.StreamAdd(key, "field4", "value4");
var deletedCount = db.StreamDelete(key, new RedisValue[] { id2, id3 }, CommandFlags.None);
var messages = db.StreamRange(key, "-", "+");
Assert.Equal(2, deletedCount);
Assert.Equal(2, messages.Length);
}
}
[Fact]
public void StreamGroupInfoGet()
{
var key = GetUniqueKey("group_info");
var group1 = "test_group_1";
var group2 = "test_group_2";
var consumer1 = "test_consumer_1";
var consumer2 = "test_consumer_2";
using (var conn = Create())
{
Skip.IfMissingFeature(conn, nameof(RedisFeatures.Streams), r => r.Streams);
var db = conn.GetDatabase();
var id1 = db.StreamAdd(key, "field1", "value1");
var id2 = db.StreamAdd(key, "field2", "value2");
var id3 = db.StreamAdd(key, "field3", "value3");
var id4 = db.StreamAdd(key, "field4", "value4");
db.StreamCreateConsumerGroup(key, group1, "-");
db.StreamCreateConsumerGroup(key, group2, "-");
// Read a single message into the first consumer.
var consumer1Messages = db.StreamReadGroup(key, group1, consumer1, count: 1);
// Read the remaining messages into the second consumer.
var consumer2Messages = db.StreamReadGroup(key, group2, consumer2);
var groupInfoList = db.StreamGroupInfo(key);
Assert.NotNull(groupInfoList);
Assert.Equal(2, groupInfoList.Length);
Assert.Equal(group1, groupInfoList[0].Name);
Assert.Equal(1, groupInfoList[0].PendingMessageCount);
Assert.Equal(group2, groupInfoList[1].Name);
Assert.Equal(4, groupInfoList[1].PendingMessageCount);
}
}
[Fact]
public void StreamGroupConsumerInfoGet()
{
var key = GetUniqueKey("group_consumer_info");
var group = "test_group";
var consumer1 = "test_consumer_1";
var consumer2 = "test_consumer_2";
using (var conn = Create())
{
Skip.IfMissingFeature(conn, nameof(RedisFeatures.Streams), r => r.Streams);
var db = conn.GetDatabase();
var id1 = db.StreamAdd(key, "field1", "value1");
var id2 = db.StreamAdd(key, "field2", "value2");
var id3 = db.StreamAdd(key, "field3", "value3");
var id4 = db.StreamAdd(key, "field4", "value4");
db.StreamCreateConsumerGroup(key, group, "-");
db.StreamReadGroup(key, group, consumer1, count: 1);
db.StreamReadGroup(key, group, consumer2);
var consumerInfoList = db.StreamConsumerInfo(key, group);
Assert.NotNull(consumerInfoList);
Assert.Equal(2, consumerInfoList.Length);
Assert.Equal(consumer1, consumerInfoList[0].Name);
Assert.Equal(consumer2, consumerInfoList[1].Name);
Assert.Equal(1, consumerInfoList[0].PendingMessageCount);
Assert.Equal(3, consumerInfoList[1].PendingMessageCount);
}
}
[Fact]
public void StreamInfoGet()
{
var key = GetUniqueKey("stream_info");
using (var conn = Create())
{
Skip.IfMissingFeature(conn, nameof(RedisFeatures.Streams), r => r.Streams);
var db = conn.GetDatabase();
var id1 = db.StreamAdd(key, "field1", "value1");
var id2 = db.StreamAdd(key, "field2", "value2");
var id3 = db.StreamAdd(key, "field3", "value3");
var id4 = db.StreamAdd(key, "field4", "value4");
var streamInfo = db.StreamInfo(key);
Assert.Equal(4, streamInfo.Length);
Assert.True(streamInfo.RadixTreeKeys > 0);
Assert.True(streamInfo.RadixTreeNodes > 0);
Assert.Equal(id1, streamInfo.FirstEntry.Id);
Assert.Equal(id4, streamInfo.LastEntry.Id);
}
}
[Fact]
public void StreamInfoGetWithEmptyStream()
{
var key = GetUniqueKey("stream_info_empty");
using (var conn = Create())
{
Skip.IfMissingFeature(conn, nameof(RedisFeatures.Streams), r => r.Streams);
var db = conn.GetDatabase();
// Add an entry and then delete it so the stream is empty, then run streaminfo
// to ensure it functions properly on an empty stream. Namely, the first-entry
// and last-entry messages should be null.
var id = db.StreamAdd(key, "field1", "value1");
db.StreamDelete(key, new RedisValue[] { id });
Assert.Equal(0, db.StreamLength(key));
var streamInfo = db.StreamInfo(key);
Assert.True(streamInfo.FirstEntry.IsNull);
Assert.True(streamInfo.LastEntry.IsNull);
}
}
[Fact]
public void StreamNoConsumerGroups()
{
var key = GetUniqueKey("stream_with_no_consumers");
using (var conn = Create())
{
Skip.IfMissingFeature(conn, nameof(RedisFeatures.Streams), r => r.Streams);
var db = conn.GetDatabase();
db.StreamAdd(key, "field1", "value1");
var groups = db.StreamGroupInfo(key);
Assert.NotNull(groups);
Assert.True(groups.Length == 0);
}
}
[Fact]
public void StreamPendingNoMessagesOrConsumers()
{
var key = GetUniqueKey("stream_pending_empty");
var groupName = "test_group";
using (var conn = Create())
{
Skip.IfMissingFeature(conn, nameof(RedisFeatures.Streams), r => r.Streams);
var db = conn.GetDatabase();
var id = db.StreamAdd(key, "field1", "value1");
db.StreamDelete(key, new RedisValue[] { id });
db.StreamCreateConsumerGroup(key, groupName, "0-0");
var pendingInfo = db.StreamPending(key, "test_group");
Assert.Equal(0, pendingInfo.PendingMessageCount);
Assert.Equal(RedisValue.Null, pendingInfo.LowestPendingMessageId);
Assert.Equal(RedisValue.Null, pendingInfo.HighestPendingMessageId);
Assert.NotNull(pendingInfo.Consumers);
Assert.True(pendingInfo.Consumers.Length == 0);
}
}
[Fact]
public void StreamRead()
{
var key = GetUniqueKey("read");
using (var conn = Create())
{
Skip.IfMissingFeature(conn, nameof(RedisFeatures.Streams), r => r.Streams);
var db = conn.GetDatabase();
var id1 = db.StreamAdd(key, "field1", "value1");
var id2 = db.StreamAdd(key, "fiedl2", "value2");
var id3 = db.StreamAdd(key, "field3", "value3");
// Read the entire stream from the beginning.
var entries = db.StreamRead(key, "0-0");
Assert.True(entries.Length == 3);
Assert.Equal(id1, entries[0].Id);
Assert.Equal(id2, entries[1].Id);
Assert.Equal(id3, entries[2].Id);
}
}
[Fact]
public void StreamReadEmptyStream()
{
var key = GetUniqueKey("read_empty_stream");
using (var conn = Create())
{
Skip.IfMissingFeature(conn, nameof(RedisFeatures.Streams), r => r.Streams);
var db = conn.GetDatabase();
// Write to a stream to create the key.
var id1 = db.StreamAdd(key, "field1", "value1");
// Delete the key to empty the stream.
db.StreamDelete(key, new RedisValue[] { id1 });
var len = db.StreamLength(key);
// Read the entire stream from the beginning.
var entries = db.StreamRead(key, "0-0");
Assert.True(entries.Length == 0);
Assert.Equal(0, len);
}
}
[Fact]
public void StreamReadEmptyStreams()
{
var key1 = GetUniqueKey("read_empty_stream_1");
var key2 = GetUniqueKey("read_empty_stream_2");
using (var conn = Create())
{
Skip.IfMissingFeature(conn, nameof(RedisFeatures.Streams), r => r.Streams);
var db = conn.GetDatabase();
// Write to a stream to create the key.
var id1 = db.StreamAdd(key1, "field1", "value1");
var id2 = db.StreamAdd(key2, "field2", "value2");
// Delete the key to empty the stream.
db.StreamDelete(key1, new RedisValue[] { id1 });
db.StreamDelete(key2, new RedisValue[] { id2 });
var len1 = db.StreamLength(key1);
var len2 = db.StreamLength(key2);
// Read the entire stream from the beginning.
var entries1 = db.StreamRead(key1, "0-0");
var entries2 = db.StreamRead(key2, "0-0");
Assert.True(entries1.Length == 0);
Assert.True(entries2.Length == 0);
Assert.Equal(0, len1);
Assert.Equal(0, len2);
}
}
[Fact]
public void StreamReadExpectedExceptionInvalidCountMultipleStream()
{
using (var conn = Create())
{
Skip.IfMissingFeature(conn, nameof(RedisFeatures.Streams), r => r.Streams);
var streamPairs = new StreamIdPair[]
{
new StreamIdPair("key1", "0-0"),
new StreamIdPair("key2", "0-0")
};
var db = conn.GetDatabase();
Assert.Throws<ArgumentOutOfRangeException>(() => db.StreamRead(streamPairs, 0));
}
}
[Fact]
public void StreamReadExpectedExceptionInvalidCountSingleStream()
{
var key = GetUniqueKey("read_exception_invalid_count_single");
using (var conn = Create())
{
Skip.IfMissingFeature(conn, nameof(RedisFeatures.Streams), r => r.Streams);
var db = conn.GetDatabase();
Assert.Throws<ArgumentOutOfRangeException>(() => db.StreamRead(key, "0-0", 0));
}
}
[Fact]
public void StreamReadExpectedExceptionNullStreamList()
{
using (var conn = Create())
{
Skip.IfMissingFeature(conn, nameof(RedisFeatures.Streams), r => r.Streams);
var db = conn.GetDatabase();
Assert.Throws<ArgumentNullException>(() => db.StreamRead(null));
}
}
[Fact]
public void StreamReadExpectedExceptionEmptyStreamList()
{
using (var conn = Create())
{
Skip.IfMissingFeature(conn, nameof(RedisFeatures.Streams), r => r.Streams);
var db = conn.GetDatabase();
var emptyList = new StreamIdPair[0];
Assert.Throws<ArgumentOutOfRangeException>(() => db.StreamRead(emptyList));
}
}
[Fact]
public void StreamReadMultipleStreams()
{
var key1 = GetUniqueKey("read_multi_1");
var key2 = GetUniqueKey("read_multi_2");
using (var conn = Create())
{
Skip.IfMissingFeature(conn, nameof(RedisFeatures.Streams), r => r.Streams);
var db = conn.GetDatabase();
var id1 = db.StreamAdd(key1, "field1", "value1");
var id2 = db.StreamAdd(key1, "fiedl2", "value2");
var id3 = db.StreamAdd(key2, "field3", "value3");
var id4 = db.StreamAdd(key2, "field4", "value4");
// Read from both streams at the same time.
var streamList = new StreamIdPair[2]
{
new StreamIdPair(key1, "0-0"),
new StreamIdPair(key2, "0-0")
};
var streams = db.StreamRead(streamList);
Assert.True(streams.Length == 2);
Assert.Equal(key1, streams[0].Key);
Assert.True(streams[0].Entries.Length == 2);
Assert.Equal(id1, streams[0].Entries[0].Id);
Assert.Equal(id2, streams[0].Entries[1].Id);
Assert.Equal(key2, streams[1].Key);
Assert.True(streams[1].Entries.Length == 2);
Assert.Equal(id3, streams[1].Entries[0].Id);
Assert.Equal(id4, streams[1].Entries[1].Id);
}
}
[Fact]
public void StreamReadMultipleStreamsWithCount()
{
var key1 = GetUniqueKey("read_multi_count_1");
var key2 = GetUniqueKey("read_multi_count_2");
using (var conn = Create())
{
Skip.IfMissingFeature(conn, nameof(RedisFeatures.Streams), r => r.Streams);
var db = conn.GetDatabase();
var id1 = db.StreamAdd(key1, "field1", "value1");
var id2 = db.StreamAdd(key1, "fiedl2", "value2");
var id3 = db.StreamAdd(key2, "field3", "value3");
var id4 = db.StreamAdd(key2, "field4", "value4");
var streamList = new StreamIdPair[2]
{
new StreamIdPair(key1, "0-0"),
new StreamIdPair(key2, "0-0")
};
var streams = db.StreamRead(streamList, countPerStream: 1);
// We should get both streams back.
Assert.True(streams.Length == 2);
// Ensure we only got one message per stream.
Assert.True(streams[0].Entries.Length == 1);
Assert.True(streams[1].Entries.Length == 1);
// Check the message IDs as well.
Assert.Equal(id1, streams[0].Entries[0].Id);
Assert.Equal(id3, streams[1].Entries[0].Id);
}
}
[Fact]
public void StreamReadMultipleStreamsWithReadPastSecondStream()
{
var key1 = GetUniqueKey("read_multi_1");
var key2 = GetUniqueKey("read_multi_2");
using (var conn = Create())
{
Skip.IfMissingFeature(conn, nameof(RedisFeatures.Streams), r => r.Streams);
var db = conn.GetDatabase();
var id1 = db.StreamAdd(key1, "field1", "value1");
var id2 = db.StreamAdd(key1, "fiedl2", "value2");
var id3 = db.StreamAdd(key2, "field3", "value3");
var id4 = db.StreamAdd(key2, "field4", "value4");
var streamList = new StreamIdPair[2]
{
new StreamIdPair(key1, "0-0"),
// read past the end of stream # 2
new StreamIdPair(key2, id4)
};
var streams = db.StreamRead(streamList);
// We should only get the first stream back.
Assert.True(streams.Length == 1);
Assert.Equal(key1, streams[0].Key);
Assert.True(streams[0].Entries.Length == 2);
}
}
[Fact]
public void StreamReadMultipleStreamsWithEmptyResponse()
{
var key1 = GetUniqueKey("read_multi_1");
var key2 = GetUniqueKey("read_multi_2");
using (var conn = Create())
{
Skip.IfMissingFeature(conn, nameof(RedisFeatures.Streams), r => r.Streams);
var db = conn.GetDatabase();
var id1 = db.StreamAdd(key1, "field1", "value1");
var id2 = db.StreamAdd(key1, "fiedl2", "value2");
var id3 = db.StreamAdd(key2, "field3", "value3");
var id4 = db.StreamAdd(key2, "field4", "value4");
var streamList = new StreamIdPair[]
{
// Read past the end of both streams.
new StreamIdPair(key1, id2),
new StreamIdPair(key2, id4)
};
var streams = db.StreamRead(streamList);
// We expect an empty response.
Assert.True(streams.Length == 0);
}
}
[Fact]
public void StreamReadPastEndOfStream()
{
var key = GetUniqueKey("read_empty");
using (var conn = Create())
{
Skip.IfMissingFeature(conn, nameof(RedisFeatures.Streams), r => r.Streams);
var db = conn.GetDatabase();
var id1 = db.StreamAdd(key, "field1", "value1");
var id2 = db.StreamAdd(key, "fiedl2", "value2");
// Read after the final ID in the stream, we expect an empty array as a response.
var entries = db.StreamRead(key, id2);
Assert.True(entries.Length == 0);
}
}
[Fact]
public void StreamReadRange()
{
var key = GetUniqueKey("range");
using (var conn = Create())
{
Skip.IfMissingFeature(conn, nameof(RedisFeatures.Streams), r => r.Streams);
var db = conn.GetDatabase();
var id1 = db.StreamAdd(key, "field1", "value1");
var id2 = db.StreamAdd(key, "fiedl2", "value2");
var entries = db.StreamRange(key);
Assert.Equal(2, entries.Length);
Assert.Equal(id1, entries[0].Id);
Assert.Equal(id2, entries[1].Id);
}
}
[Fact]
public void StreamReadRangeOfEmptyStream()
{
var key = GetUniqueKey("range_empty");
using (var conn = Create())
{
Skip.IfMissingFeature(conn, nameof(RedisFeatures.Streams), r => r.Streams);
var db = conn.GetDatabase();
var id1 = db.StreamAdd(key, "field1", "value1");
var id2 = db.StreamAdd(key, "fiedl2", "value2");
var deleted = db.StreamDelete(key, new RedisValue[] { id1, id2 });
var entries = db.StreamRange(key, "-", "+");
Assert.Equal(2, deleted);
Assert.NotNull(entries);
Assert.True(entries.Length == 0);
}
}
[Fact]
public void StreamReadRangeWithCount()
{
var key = GetUniqueKey("range_count");
using (var conn = Create())
{
Skip.IfMissingFeature(conn, nameof(RedisFeatures.Streams), r => r.Streams);
var db = conn.GetDatabase();
var id1 = db.StreamAdd(key, "field1", "value1");
var id2 = db.StreamAdd(key, "fiedl2", "value2");
var entries = db.StreamRange(key, count: 1);
Assert.True(entries.Length == 1);
Assert.Equal(id1, entries[0].Id);
}
}
[Fact]
public void StreamReadRangeReverse()
{
var key = GetUniqueKey("rangerev");
using (var conn = Create())
{
Skip.IfMissingFeature(conn, nameof(RedisFeatures.Streams), r => r.Streams);
var db = conn.GetDatabase();
var id1 = db.StreamAdd(key, "field1", "value1");
var id2 = db.StreamAdd(key, "fiedl2", "value2");
var entries = db.StreamRange(key, messageOrder: Order.Descending);
Assert.True(entries.Length == 2);
Assert.Equal(id2, entries[0].Id);
Assert.Equal(id1, entries[1].Id);
}
}
[Fact]
public void StreamReadRangeReverseWithCount()
{
var key = GetUniqueKey("rangerev_count");
using (var conn = Create())
{
Skip.IfMissingFeature(conn, nameof(RedisFeatures.Streams), r => r.Streams);
var db = conn.GetDatabase();
var id1 = db.StreamAdd(key, "field1", "value1");
var id2 = db.StreamAdd(key, "fiedl2", "value2");
var entries = db.StreamRange(key, count: 1, messageOrder: Order.Descending);
Assert.True(entries.Length == 1);
Assert.Equal(id2, entries[0].Id);
}
}
[Fact]
public void StreamReadWithAfterIdAndCount_1()
{
var key = GetUniqueKey("read");
using (var conn = Create())
{
Skip.IfMissingFeature(conn, nameof(RedisFeatures.Streams), r => r.Streams);
var db = conn.GetDatabase();
var id1 = db.StreamAdd(key, "field1", "value1");
var id2 = db.StreamAdd(key, "fiedl2", "value2");
var id3 = db.StreamAdd(key, "field3", "value3");
// Only read a single item from the stream.
var entries = db.StreamRead(key, id1, 1);
Assert.True(entries.Length == 1);
Assert.Equal(id2, entries[0].Id);
}
}
[Fact]
public void StreamReadWithAfterIdAndCount_2()
{
var key = GetUniqueKey("read");
using (var conn = Create())
{
Skip.IfMissingFeature(conn, nameof(RedisFeatures.Streams), r => r.Streams);
var db = conn.GetDatabase();
var id1 = db.StreamAdd(key, "field1", "value1");
var id2 = db.StreamAdd(key, "fiedl2", "value2");
var id3 = db.StreamAdd(key, "field3", "value3");
var id4 = db.StreamAdd(key, "field4", "value4");
// Read multiple items from the stream.
var entries = db.StreamRead(key, id1, 2);
Assert.True(entries.Length == 2);
Assert.Equal(id2, entries[0].Id);
Assert.Equal(id3, entries[1].Id);
}
}
[Fact]
public void StreamTrimLength()
{
var key = GetUniqueKey("trimlen");
using (var conn = Create())
{
Skip.IfMissingFeature(conn, nameof(RedisFeatures.Streams), r => r.Streams);
var db = conn.GetDatabase();
// Add a couple items and check length.
db.StreamAdd(key, "field1", "value1");
db.StreamAdd(key, "fiedl2", "value2");
db.StreamAdd(key, "field3", "value3");
db.StreamAdd(key, "field4", "value4");
var numRemoved = db.StreamTrim(key, 1);
var len = db.StreamLength(key);
Assert.Equal(3, numRemoved);
Assert.Equal(1, len);
}
}
[Fact]
public void StreamVerifyLength()
{
var key = GetUniqueKey("len");
using (var conn = Create())
{
Skip.IfMissingFeature(conn, nameof(RedisFeatures.Streams), r => r.Streams);
var db = conn.GetDatabase();
// Add a couple items and check length.
db.StreamAdd(key, "field1", "value1");
db.StreamAdd(key, "fiedl2", "value2");
var len = db.StreamLength(key);
Assert.Equal(2, len);
}
}
private string GetUniqueKey(string type) => $"{type}_stream_{DateTimeOffset.UtcNow.ToUnixTimeMilliseconds()}";
}
}
...@@ -740,6 +740,145 @@ public void SortedSetScoreAsync() ...@@ -740,6 +740,145 @@ public void SortedSetScoreAsync()
mock.Verify(_ => _.SortedSetScoreAsync("prefix:key", "member", CommandFlags.HighPriority)); mock.Verify(_ => _.SortedSetScoreAsync("prefix:key", "member", CommandFlags.HighPriority));
} }
[Fact]
public void StreamAcknowledgeAsync_1()
{
wrapper.StreamAcknowledgeAsync("key", "group", "0-0", CommandFlags.HighPriority);
mock.Verify(_ => _.StreamAcknowledgeAsync("prefix:key", "group", "0-0", CommandFlags.HighPriority));
}
[Fact]
public void StreamAcknowledgeAsync_2()
{
var messageIds = new RedisValue[] { "0-0", "0-1", "0-2" };
wrapper.StreamAcknowledgeAsync("key", "group", messageIds, CommandFlags.HighPriority);
mock.Verify(_ => _.StreamAcknowledgeAsync("prefix:key", "group", messageIds, CommandFlags.HighPriority));
}
[Fact]
public void StreamAddAsync_1()
{
wrapper.StreamAddAsync("key", "field1", "value1", "*", 1000, true, CommandFlags.HighPriority);
mock.Verify(_ => _.StreamAddAsync("prefix:key", "field1", "value1", "*", 1000, true, CommandFlags.HighPriority));
}
[Fact]
public void StreamAddAsync_2()
{
var fields = new NameValueEntry[0];
wrapper.StreamAddAsync("key", fields, "*", 1000, true, CommandFlags.HighPriority);
mock.Verify(_ => _.StreamAddAsync("prefix:key", fields, "*", 1000, true, CommandFlags.HighPriority));
}
[Fact]
public void StreamClaimMessagesAsync()
{
var messageIds = new RedisValue[0];
wrapper.StreamClaimAsync("key", "group", "consumer", 1000, messageIds, CommandFlags.HighPriority);
mock.Verify(_ => _.StreamClaimAsync("prefix:key", "group", "consumer", 1000, messageIds, CommandFlags.HighPriority));
}
[Fact]
public void StreamClaimMessagesReturningIdsAsync()
{
var messageIds = new RedisValue[0];
wrapper.StreamClaimIdsOnlyAsync("key", "group", "consumer", 1000, messageIds, CommandFlags.HighPriority);
mock.Verify(_ => _.StreamClaimIdsOnlyAsync("prefix:key", "group", "consumer", 1000, messageIds, CommandFlags.HighPriority));
}
[Fact]
public void StreamConsumerInfoGetAsync()
{
wrapper.StreamConsumerInfoAsync("key", "group", CommandFlags.HighPriority);
mock.Verify(_ => _.StreamConsumerInfoAsync("prefix:key", "group", CommandFlags.HighPriority));
}
[Fact]
public void StreamCreateConsumerGroupAsync()
{
wrapper.StreamCreateConsumerGroupAsync("key", "group", "0-0", CommandFlags.HighPriority);
mock.Verify(_ => _.StreamCreateConsumerGroupAsync("prefix:key", "group", "0-0", CommandFlags.HighPriority));
}
[Fact]
public void StreamGroupInfoGetAsync()
{
wrapper.StreamGroupInfoAsync("key", CommandFlags.HighPriority);
mock.Verify(_ => _.StreamGroupInfoAsync("prefix:key", CommandFlags.HighPriority));
}
[Fact]
public void StreamInfoGetAsync()
{
wrapper.StreamInfoAsync("key", CommandFlags.HighPriority);
mock.Verify(_ => _.StreamInfoAsync("prefix:key", CommandFlags.HighPriority));
}
[Fact]
public void StreamLengthAsync()
{
wrapper.StreamLengthAsync("key", CommandFlags.HighPriority);
mock.Verify(_ => _.StreamLengthAsync("prefix:key", CommandFlags.HighPriority));
}
[Fact]
public void StreamMessagesDeleteAsync()
{
var messageIds = new RedisValue[0] { };
wrapper.StreamDeleteAsync("key", messageIds, CommandFlags.HighPriority);
mock.Verify(_ => _.StreamDeleteAsync("prefix:key", messageIds, CommandFlags.HighPriority));
}
[Fact]
public void StreamPendingInfoGetAsync()
{
wrapper.StreamPendingAsync("key", "group", CommandFlags.HighPriority);
mock.Verify(_ => _.StreamPendingAsync("prefix:key", "group", CommandFlags.HighPriority));
}
[Fact]
public void StreamPendingMessageInfoGetAsync()
{
wrapper.StreamPendingMessagesAsync("key", "group", 10, RedisValue.Null, "-", "+", CommandFlags.HighPriority);
mock.Verify(_ => _.StreamPendingMessagesAsync("prefix:key", "group", 10, RedisValue.Null, "-", "+", CommandFlags.HighPriority));
}
[Fact]
public void StreamRangeAsync()
{
wrapper.StreamRangeAsync("key", "-", "+", null, Order.Ascending, CommandFlags.HighPriority);
mock.Verify(_ => _.StreamRangeAsync("prefix:key", "-", "+", null, Order.Ascending, CommandFlags.HighPriority));
}
[Fact]
public void StreamReadAsync_1()
{
var keysAndIds = new StreamIdPair[0] { };
wrapper.StreamReadAsync(keysAndIds, null, CommandFlags.HighPriority);
mock.Verify(_ => _.StreamReadAsync(keysAndIds, null, CommandFlags.HighPriority));
}
[Fact]
public void StreamReadAsync_2()
{
wrapper.StreamReadAsync("key", "0-0", null, CommandFlags.HighPriority);
mock.Verify(_ => _.StreamReadAsync("prefix:key", "0-0", null, CommandFlags.HighPriority));
}
[Fact]
public void StreamReadGroupAsync()
{
wrapper.StreamReadGroupAsync("key", "group", "consumer", "0-0", 10, CommandFlags.HighPriority);
mock.Verify(_ => _.StreamReadGroupAsync("prefix:key", "group", "consumer", "0-0", 10, CommandFlags.HighPriority));
}
[Fact]
public void StreamTrimAsync()
{
wrapper.StreamTrimAsync("key", 1000, true, CommandFlags.HighPriority);
mock.Verify(_ => _.StreamTrimAsync("prefix:key", 1000, true, CommandFlags.HighPriority));
}
[Fact] [Fact]
public void StringAppendAsync() public void StringAppendAsync()
{ {
......
...@@ -163,6 +163,20 @@ internal enum RedisCommand ...@@ -163,6 +163,20 @@ internal enum RedisCommand
WATCH, WATCH,
XACK,
XADD,
XCLAIM,
XDEL,
XGROUP,
XINFO,
XLEN,
XPENDING,
XRANGE,
XREAD,
XREADGROUP,
XREVRANGE,
XTRIM,
ZADD, ZADD,
ZCARD, ZCARD,
ZCOUNT, ZCOUNT,
......
...@@ -38,6 +38,13 @@ public enum RedisType ...@@ -38,6 +38,13 @@ public enum RedisType
/// <remarks>https://redis.io/commands#hash</remarks> /// <remarks>https://redis.io/commands#hash</remarks>
Hash, Hash,
/// <summary> /// <summary>
/// A Redis Stream is a data structure which models the behavior of an append only log but it has more
/// advanced features for manipulating the data contained within the stream. Each entry in a
/// stream contains a unique message ID and a list of name/value pairs containing the entry's data.
/// </summary>
/// <remarks>https://redis.io/commands#stream</remarks>
Stream,
/// <summary>
/// The data-type was not recognised by the client library /// The data-type was not recognised by the client library
/// </summary> /// </summary>
Unknown, Unknown,
......
...@@ -122,6 +122,18 @@ public static class ExtensionMethods ...@@ -122,6 +122,18 @@ public static class ExtensionMethods
return result; return result;
} }
private static readonly RedisValue[] nixValues = new RedisValue[0];
/// <summary>
/// Create an array of RedisValues from an array of strings.
/// </summary>
/// <param name="values">The string array to convert to RedisValues</param>
public static RedisValue[] ToRedisValueArray(this string[] values)
{
if (values == null) return null;
if (values.Length == 0) return nixValues;
return Array.ConvertAll(values, x => (RedisValue)x);
}
private static readonly string[] nix = new string[0]; private static readonly string[] nix = new string[0];
/// <summary> /// <summary>
/// Create an array of strings from an array of values /// Create an array of strings from an array of values
......
using System; using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.Net; using System.Net;
...@@ -1389,6 +1389,225 @@ public interface IDatabase : IRedis, IDatabaseAsync ...@@ -1389,6 +1389,225 @@ public interface IDatabase : IRedis, IDatabaseAsync
/// <remarks>https://redis.io/commands/zscore</remarks> /// <remarks>https://redis.io/commands/zscore</remarks>
double? SortedSetScore(RedisKey key, RedisValue member, CommandFlags flags = CommandFlags.None); double? SortedSetScore(RedisKey key, RedisValue member, CommandFlags flags = CommandFlags.None);
/// <summary>
/// Allow the consumer to mark a pending message as correctly processed. Returns the number of messages acknowledged.
/// </summary>
/// <param name="key">The key of the stream.</param>
/// <param name="groupName">The name of the consumer group that received the message.</param>
/// <param name="messageId">The ID of the message to acknowledge.</param>
/// <param name="flags">The flags to use for this operation.</param>
/// <returns>The number of messages acknowledged.</returns>
/// <remarks>https://redis.io/topics/streams-intro</remarks>
long StreamAcknowledge(RedisKey key, RedisValue groupName, RedisValue messageId, CommandFlags flags = CommandFlags.None);
/// <summary>
/// Allow the consumer to mark a pending message as correctly processed. Returns the number of messages acknowledged.
/// </summary>
/// <param name="key">The key of the stream.</param>
/// <param name="groupName">The name of the consumer group that received the message.</param>
/// <param name="messageIds">The IDs of the messages to acknowledge.</param>
/// <param name="flags">The flags to use for this operation.</param>
/// <returns>The number of messages acknowledged.</returns>
/// <remarks>https://redis.io/topics/streams-intro</remarks>
long StreamAcknowledge(RedisKey key, RedisValue groupName, RedisValue[] messageIds, CommandFlags flags = CommandFlags.None);
/// <summary>
/// Adds an entry using the specified values to the given stream key. If key does not exist, a new key holding a stream is created. The command returns the ID of the newly created stream entry.
/// </summary>
/// <param name="key">The key of the stream.</param>
/// <param name="streamField">The field name for the stream entry.</param>
/// <param name="streamValue">The value to set in the stream entry.</param>
/// <param name="messageId">The ID to assign to the stream entry, defaults to an auto-generated ID ("*").</param>
/// <param name="maxLength">The maximum length of the stream.</param>
/// <param name="useApproximateMaxLength">If true, the "~" argument is used to allow the stream to exceed max length by a small number. This improves performance when removing messages.</param>
/// <param name="flags">The flags to use for this operation.</param>
/// <returns>The ID of the newly created message.</returns>
/// <remarks>https://redis.io/commands/xadd</remarks>
RedisValue StreamAdd(RedisKey key, RedisValue streamField, RedisValue streamValue, RedisValue? messageId = null, int? maxLength = null, bool useApproximateMaxLength = false, CommandFlags flags = CommandFlags.None);
/// <summary>
/// Adds an entry using the specified values to the given stream key. If key does not exist, a new key holding a stream is created. The command returns the ID of the newly created stream entry.
/// </summary>
/// <param name="key">The key of the stream.</param>
/// <param name="streamPairs">The fields and their associated values to set in the stream entry.</param>
/// <param name="messageId">The ID to assign to the stream entry, defaults to an auto-generated ID ("*").</param>
/// <param name="maxLength">The maximum length of the stream.</param>
/// <param name="useApproximateMaxLength">If true, the "~" argument is used to allow the stream to exceed max length by a small number. This improves performance when removing messages.</param>
/// <param name="flags">The flags to use for this operation.</param>
/// <returns>The ID of the newly created message.</returns>
/// <remarks>https://redis.io/commands/xadd</remarks>
RedisValue StreamAdd(RedisKey key, NameValueEntry[] streamPairs, RedisValue? messageId = null, int? maxLength = null, bool useApproximateMaxLength = false, CommandFlags flags = CommandFlags.None);
/// <summary>
/// Change ownership of messages consumed, but not yet acknowledged, by a different consumer. This method returns the complete message for the claimed message(s).
/// </summary>
/// <param name="key">The key of the stream.</param>
/// <param name="consumerGroup">The consumer group.</param>
/// <param name="claimingConsumer">The consumer claiming the given message(s).</param>
/// <param name="minIdleTimeInMs">The minimum message idle time to allow the reassignment of the message(s).</param>
/// <param name="messageIds">The IDs of the messages to claim for the given consumer.</param>
/// <param name="flags">The flags to use for this operation.</param>
/// <returns>The messages successfully claimed by the given consumer.</returns>
/// <remarks>https://redis.io/topics/streams-intro</remarks>
RedisStreamEntry[] StreamClaim(RedisKey key, RedisValue consumerGroup, RedisValue claimingConsumer, long minIdleTimeInMs, RedisValue[] messageIds, CommandFlags flags = CommandFlags.None);
/// <summary>
/// Change ownership of messages consumed, but not yet acknowledged, by a different consumer. This method returns the IDs for the claimed message(s).
/// </summary>
/// <param name="key">The key of the stream.</param>
/// <param name="consumerGroup">The consumer group.</param>
/// <param name="claimingConsumer">The consumer claiming the given message(s).</param>
/// <param name="minIdleTimeInMs">The minimum message idle time to allow the reassignment of the message(s).</param>
/// <param name="messageIds">The IDs of the messages to claim for the given consumer.</param>
/// <param name="flags">The flags to use for this operation.</param>
/// <returns>The message IDs for the messages successfully claimed by the given consumer.</returns>
/// <remarks>https://redis.io/topics/streams-intro</remarks>
RedisValue[] StreamClaimIdsOnly(RedisKey key, RedisValue consumerGroup, RedisValue claimingConsumer, long minIdleTimeInMs, RedisValue[] messageIds, CommandFlags flags = CommandFlags.None);
/// <summary>
/// Retrieve information about the consumers for the given consumer group. This is the equivalent of calling "XINFO GROUPS key group".
/// </summary>
/// <param name="key">The key of the stream.</param>
/// <param name="groupName">The consumer group name.</param>
/// <param name="flags">The flags to use for this operation.</param>
/// <returns>An instance of <see cref="Redis.StreamConsumerInfo"/> for each of the consumer group's consumers.</returns>
/// <remarks>https://redis.io/topics/streams-intro</remarks>
StreamConsumerInfo[] StreamConsumerInfo(RedisKey key, RedisValue groupName, CommandFlags flags = CommandFlags.None);
/// <summary>
/// Create a consumer group for the given stream.
/// </summary>
/// <param name="key">The key of the stream.</param>
/// <param name="groupName">The name of the group to create.</param>
/// <param name="readFrom">The beginning position in the stream from which to read. If null, the method will send the option ("$") to only read new messages.</param>
/// <param name="flags">The flags to use for this operation.</param>
/// <returns>True if the group was created.</returns>
/// <remarks>https://redis.io/topics/streams-intro</remarks>
bool StreamCreateConsumerGroup(RedisKey key, RedisValue groupName, RedisValue? readFrom = null, CommandFlags flags = CommandFlags.None);
/// <summary>
/// Delete messages in the stream. This method does not delete the stream.
/// </summary>
/// <param name="key">The key of the stream.</param>
/// <param name="messageIds">The IDs of the messages to delete.</param>
/// <param name="flags">The flags to use for this operation.</param>
/// <returns>Returns the number of messages successfully deleted from the stream.</returns>
/// <remarks>https://redis.io/topics/streams-intro</remarks>
long StreamDelete(RedisKey key, RedisValue[] messageIds, CommandFlags flags = CommandFlags.None);
/// <summary>
/// Retrieve information about the groups created for the given stream. This is the equivalent of calling "XINFO GROUPS key".
/// </summary>
/// <param name="key">The key of the stream.</param>
/// <param name="flags">The flags to use for this operation.</param>
/// <returns>An instance of <see cref="Redis.StreamGroupInfo"/> for each of the stream's groups.</returns>
/// <remarks>https://redis.io/topics/streams-intro</remarks>
StreamGroupInfo[] StreamGroupInfo(RedisKey key, CommandFlags flags = CommandFlags.None);
/// <summary>
/// Retrieve information about the given stream. This is the equivalent of calling "XINFO STREAM key".
/// </summary>
/// <param name="key">The key of the stream.</param>
/// <param name="flags">The flags to use for this operation.</param>
/// <returns>A <see cref="Redis.StreamInfo"/> instance with information about the stream.</returns>
/// <remarks>https://redis.io/topics/streams-intro</remarks>
StreamInfo StreamInfo(RedisKey key, CommandFlags flags = CommandFlags.None);
/// <summary>
/// Return the number of entries in a stream.
/// </summary>
/// <param name="key">The key of the stream.</param>
/// <param name="flags">The flags to use for this operation.</param>
/// <returns>The number of entries inside the given stream.</returns>
/// <remarks>https://redis.io/commands/xlen</remarks>
long StreamLength(RedisKey key, CommandFlags flags = CommandFlags.None);
/// <summary>
/// View information about pending messages for a stream. A pending message is a message read using StreamReadGroup (XREADGROUP) but not yet acknowledged.
/// </summary>
/// <param name="key">The key of the stream.</param>
/// <param name="groupName">The name of the consumer group</param>
/// <param name="flags">The flags to use for this operation.</param>
/// <returns>An instance of <see cref="StreamPendingInfo"/>. <see cref="StreamPendingInfo"/> contains the number of pending messages, the highest and lowest ID of the pending messages, and the consumers with their pending message count.</returns>
/// <remarks>The equivalent of calling XPENDING key group.</remarks>
/// <remarks>https://redis.io/commands/xpending</remarks>
StreamPendingInfo StreamPending(RedisKey key, RedisValue groupName, CommandFlags flags = CommandFlags.None);
/// <summary>
/// View information about each pending message.
/// </summary>
/// <param name="key">The key of the stream.</param>
/// <param name="groupName">The name of the consumer group.</param>
/// <param name="count">The maximum number of pending messages to return.</param>
/// <param name="consumerName">The consumer name for the pending messages. Pass RedisValue.Null to include pending messages for all consumers.</param>
/// <param name="minId">The minimum ID from which to read the stream of pending messages. The method will default to reading from the beginning of the stream.</param>
/// <param name="maxId">The maximum ID to read to within the stream of pending messages. The method will default to reading to the end of the stream.</param>
/// <param name="flags">The flags to use for this operation.</param>
/// <returns>An instance of <see cref="StreamPendingMessageInfo"/> for each pending message.</returns>
/// <remarks>Equivalent of calling XPENDING key group start-id end-id count consumer-name.</remarks>
/// <remarks>https://redis.io/commands/xpending</remarks>
StreamPendingMessageInfo[] StreamPendingMessages(RedisKey key, RedisValue groupName, int count, RedisValue consumerName, RedisValue? minId = null, RedisValue? maxId = null, CommandFlags flags = CommandFlags.None);
/// <summary>
/// Read a stream using the given range of IDs.
/// </summary>
/// <param name="key">The key of the stream.</param>
/// <param name="minId">The minimum ID from which to read the stream. The method will default to reading from the beginning of the stream.</param>
/// <param name="maxId">The maximum ID to read to within the stream. The method will default to reading to the end of the stream.</param>
/// <param name="count">The maximum number of messages to return.</param>
/// <param name="messageOrder">The order of the messages. <see cref="Order.Ascending"/> will execute XRANGE and <see cref="Order.Descending"/> wil execute XREVRANGE.</param>
/// <param name="flags">The flags to use for this operation.</param>
/// <returns>Returns an instance of <see cref="RedisStreamEntry"/> for each message returned.</returns>
/// <remarks>https://redis.io/commands/xrange</remarks>
RedisStreamEntry[] StreamRange(RedisKey key, RedisValue? minId = null, RedisValue? maxId = null, int? count = null, Order messageOrder = Order.Ascending, CommandFlags flags = CommandFlags.None);
/// <summary>
/// Read from a single stream.
/// </summary>
/// <param name="key">The key of the stream.</param>
/// <param name="afterId">The position from within the stream to begin reading.</param>
/// <param name="count">The maximum number of messages to return.</param>
/// <param name="flags">The flags to use for this operation.</param>
/// <returns>Returns an instance of <see cref="RedisStreamEntry"/> for each message returned.</returns>
/// <remarks>Equivalent of calling XREAD COUNT num STREAMS key id.</remarks>
/// <remarks>https://redis.io/commands/xread</remarks>
RedisStreamEntry[] StreamRead(RedisKey key, RedisValue afterId, int? count = null, CommandFlags flags = CommandFlags.None);
/// <summary>
/// Read from multiple streams.
/// </summary>
/// <param name="streamIdPairs">The list of streams and the ID from which to begin reading for each stream.</param>
/// <param name="countPerStream">The maximum number of messages to return from each stream.</param>
/// <param name="flags">The flags to use for this operation.</param>
/// <returns>An instance of <see cref="RedisStream"/> for each stream.</returns>
/// <remarks>Equivalent of calling XREAD COUNT num STREAMS key1 key2 id1 id2.</remarks>
/// <remarks>https://redis.io/commands/xread</remarks>
RedisStream[] StreamRead(StreamIdPair[] streamIdPairs, int? countPerStream = null, CommandFlags flags = CommandFlags.None);
/// <summary>
/// Read messages from a stream and an associated consumer group.
/// </summary>
/// <param name="key">The key of the stream.</param>
/// <param name="groupName">The name of the consumer group.</param>
/// <param name="consumerName">The consumer name.</param>
/// <param name="readFromId">The ID from within the stream to begin reading. If null, the method will send the option (">") to only read new, previously undelivered messages.</param>
/// <param name="count">The maximum number of messages to return.</param>
/// <param name="flags">The flags to use for this operation.</param>
/// <returns>Returns an instance of <see cref="RedisStreamEntry"/> for each message returned.</returns>
/// <remarks>https://redis.io/commands/xreadgroup</remarks>
RedisStreamEntry[] StreamReadGroup(RedisKey key, RedisValue groupName, RedisValue consumerName, RedisValue? readFromId = null, int? count = null, CommandFlags flags = CommandFlags.None);
/// <summary>
/// Trim the stream to a specified maximum length.
/// </summary>
/// <param name="key">The key of the stream.</param>
/// <param name="maxLength">The maximum length of the stream.</param>
/// <param name="useApproximateMaxLength">If true, the "~" argument is used to allow the stream to exceed max length by a small number. This improves performance when removing messages.</param>
/// <param name="flags">The flags to use for this operation.</param>
/// <returns>The number of messages removed from the stream.</returns>
/// <remarks>https://redis.io/topics/streams-intro</remarks>
long StreamTrim(RedisKey key, int maxLength, bool useApproximateMaxLength = false, CommandFlags flags = CommandFlags.None);
/// <summary> /// <summary>
/// If key already exists and is a string, this command appends the value at the end of the string. If key does not exist it is created and set as an empty string, /// If key already exists and is a string, this command appends the value at the end of the string. If key does not exist it is created and set as an empty string,
/// so APPEND will be similar to SET in this special case. /// so APPEND will be similar to SET in this special case.
......
...@@ -1299,6 +1299,225 @@ public interface IDatabaseAsync : IRedisAsync ...@@ -1299,6 +1299,225 @@ public interface IDatabaseAsync : IRedisAsync
/// <remarks>https://redis.io/commands/zscore</remarks> /// <remarks>https://redis.io/commands/zscore</remarks>
Task<double?> SortedSetScoreAsync(RedisKey key, RedisValue member, CommandFlags flags = CommandFlags.None); Task<double?> SortedSetScoreAsync(RedisKey key, RedisValue member, CommandFlags flags = CommandFlags.None);
/// <summary>
/// Allow the consumer to mark a pending message as correctly processed. Returns the number of messages acknowledged.
/// </summary>
/// <param name="key">The key of the stream.</param>
/// <param name="groupName">The name of the consumer group that received the message.</param>
/// <param name="messageId">The ID of the message to acknowledge.</param>
/// <param name="flags">The flags to use for this operation.</param>
/// <returns>The number of messages acknowledged.</returns>
/// <remarks>https://redis.io/topics/streams-intro</remarks>
Task<long> StreamAcknowledgeAsync(RedisKey key, RedisValue groupName, RedisValue messageId, CommandFlags flags = CommandFlags.None);
/// <summary>
/// Allow the consumer to mark a pending message as correctly processed. Returns the number of messages acknowledged.
/// </summary>
/// <param name="key">The key of the stream.</param>
/// <param name="groupName">The name of the consumer group that received the message.</param>
/// <param name="messageIds">The IDs of the messages to acknowledge.</param>
/// <param name="flags">The flags to use for this operation.</param>
/// <returns>The number of messages acknowledged.</returns>
/// <remarks>https://redis.io/topics/streams-intro</remarks>
Task<long> StreamAcknowledgeAsync(RedisKey key, RedisValue groupName, RedisValue[] messageIds, CommandFlags flags = CommandFlags.None);
/// <summary>
/// Adds an entry using the specified values to the given stream key. If key does not exist, a new key holding a stream is created. The command returns the ID of the newly created stream entry.
/// </summary>
/// <param name="key">The key of the stream.</param>
/// <param name="streamField">The field name for the stream entry.</param>
/// <param name="streamValue">The value to set in the stream entry.</param>
/// <param name="messageId">The ID to assign to the stream entry, defaults to an auto-generated ID ("*").</param>
/// <param name="maxLength">The maximum length of the stream.</param>
/// <param name="useApproximateMaxLength">If true, the "~" argument is used to allow the stream to exceed max length by a small number. This improves performance when removing messages.</param>
/// <param name="flags">The flags to use for this operation.</param>
/// <returns>The ID of the newly created message.</returns>
/// <remarks>https://redis.io/commands/xadd</remarks>
Task<RedisValue> StreamAddAsync(RedisKey key, RedisValue streamField, RedisValue streamValue, RedisValue? messageId = null, int? maxLength = null, bool useApproximateMaxLength = false, CommandFlags flags = CommandFlags.None);
/// <summary>
/// Adds an entry using the specified values to the given stream key. If key does not exist, a new key holding a stream is created. The command returns the ID of the newly created stream entry.
/// </summary>
/// <param name="key">The key of the stream.</param>
/// <param name="streamPairs">The fields and their associated values to set in the stream entry.</param>
/// <param name="messageId">The ID to assign to the stream entry, defaults to an auto-generated ID ("*").</param>
/// <param name="maxLength">The maximum length of the stream.</param>
/// <param name="useApproximateMaxLength">If true, the "~" argument is used to allow the stream to exceed max length by a small number. This improves performance when removing messages.</param>
/// <param name="flags">The flags to use for this operation.</param>
/// <returns>The ID of the newly created message.</returns>
/// <remarks>https://redis.io/commands/xadd</remarks>
Task<RedisValue> StreamAddAsync(RedisKey key, NameValueEntry[] streamPairs, RedisValue? messageId = null, int? maxLength = null, bool useApproximateMaxLength = false, CommandFlags flags = CommandFlags.None);
/// <summary>
/// Change ownership of messages consumed, but not yet acknowledged, by a different consumer. This method returns the complete message for the claimed message(s).
/// </summary>
/// <param name="key">The key of the stream.</param>
/// <param name="consumerGroup">The consumer group.</param>
/// <param name="claimingConsumer">The consumer claiming the given messages.</param>
/// <param name="minIdleTimeInMs">The minimum message idle time to allow the reassignment of the message(s).</param>
/// <param name="messageIds">The IDs of the messages to claim for the given consumer.</param>
/// <param name="flags">The flags to use for this operation.</param>
/// <returns>The messages successfully claimed by the given consumer.</returns>
/// <remarks>https://redis.io/topics/streams-intro</remarks>
Task<RedisStreamEntry[]> StreamClaimAsync(RedisKey key, RedisValue consumerGroup, RedisValue claimingConsumer, long minIdleTimeInMs, RedisValue[] messageIds, CommandFlags flags = CommandFlags.None);
/// <summary>
/// Change ownership of messages consumed, but not yet acknowledged, by a different consumer. This method returns the IDs for the claimed message(s).
/// </summary>
/// <param name="key">The key of the stream.</param>
/// <param name="consumerGroup">The consumer group.</param>
/// <param name="claimingConsumer">The consumer claiming the given message(s).</param>
/// <param name="minIdleTimeInMs">The minimum message idle time to allow the reassignment of the message(s).</param>
/// <param name="messageIds">The IDs of the messages to claim for the given consumer.</param>
/// <param name="flags">The flags to use for this operation.</param>
/// <returns>The message IDs for the messages successfully claimed by the given consumer.</returns>
/// <remarks>https://redis.io/topics/streams-intro</remarks>
Task<RedisValue[]> StreamClaimIdsOnlyAsync(RedisKey key, RedisValue consumerGroup, RedisValue claimingConsumer, long minIdleTimeInMs, RedisValue[] messageIds, CommandFlags flags = CommandFlags.None);
/// <summary>
/// Retrieve information about the consumers for the given consumer group. This is the equivalent of calling "XINFO GROUPS key group".
/// </summary>
/// <param name="key">The key of the stream.</param>
/// <param name="groupName">The consumer group name.</param>
/// <param name="flags">The flags to use for this operation.</param>
/// <returns>An instance of <see cref="StreamConsumerInfo"/> for each of the consumer group's consumers.</returns>
/// <remarks>https://redis.io/topics/streams-intro</remarks>
Task<StreamConsumerInfo[]> StreamConsumerInfoAsync(RedisKey key, RedisValue groupName, CommandFlags flags = CommandFlags.None);
/// <summary>
/// Create a consumer group for the given stream.
/// </summary>
/// <param name="key">The key of the stream.</param>
/// <param name="groupName">The name of the group to create.</param>
/// <param name="readFrom">The beginning position in the stream from which to read. If null, the method will send the option ("$") to only read new messages.</param>
/// <param name="flags">The flags to use for this operation.</param>
/// <returns>True if the group was created.</returns>
/// <remarks>https://redis.io/topics/streams-intro</remarks>
Task<bool> StreamCreateConsumerGroupAsync(RedisKey key, RedisValue groupName, RedisValue? readFrom = null, CommandFlags flags = CommandFlags.None);
/// <summary>
/// Delete messages in the stream. This method does not delete the stream.
/// </summary>
/// <param name="key">The key of the stream.</param>
/// <param name="messageIds">The IDs of the messages to delete.</param>
/// <param name="flags">The flags to use for this operation.</param>
/// <returns>Returns the number of messages successfully deleted from the stream.</returns>
/// <remarks>https://redis.io/topics/streams-intro</remarks>
Task<long> StreamDeleteAsync(RedisKey key, RedisValue[] messageIds, CommandFlags flags = CommandFlags.None);
/// <summary>
/// Retrieve information about the groups created for the given stream. This is the equivalent of calling "XINFO GROUPS key".
/// </summary>
/// <param name="key">The key of the stream.</param>
/// <param name="flags">The flags to use for this operation.</param>
/// <returns>An instance of <see cref="StreamGroupInfo"/> for each of the stream's groups.</returns>
/// <remarks>https://redis.io/topics/streams-intro</remarks>
Task<StreamGroupInfo[]> StreamGroupInfoAsync(RedisKey key, CommandFlags flags = CommandFlags.None);
/// <summary>
/// Retrieve information about the given stream. This is the equivalent of calling "XINFO STREAM key".
/// </summary>
/// <param name="key">The key of the stream.</param>
/// <param name="flags">The flags to use for this operation.</param>
/// <returns>A <see cref="StreamInfo"/> instance with information about the stream.</returns>
/// <remarks>https://redis.io/topics/streams-intro</remarks>
Task<StreamInfo> StreamInfoAsync(RedisKey key, CommandFlags flags = CommandFlags.None);
/// <summary>
/// Return the number of entries in a stream.
/// </summary>
/// <param name="key">The key of the stream.</param>
/// <param name="flags">The flags to use for this operation.</param>
/// <returns>The number of entries inside the given stream.</returns>
/// <remarks>https://redis.io/commands/xlen</remarks>
Task<long> StreamLengthAsync(RedisKey key, CommandFlags flags = CommandFlags.None);
/// <summary>
/// View information about pending messages for a stream. A pending message is a message read using StreamReadGroup (XREADGROUP) but not yet acknowledged.
/// </summary>
/// <param name="key">The key of the stream.</param>
/// <param name="groupName">The name of the consumer group</param>
/// <param name="flags">The flags to use for this operation.</param>
/// <returns>An instance of <see cref="StreamPendingInfo"/>. <see cref="StreamPendingInfo"/> contains the number of pending messages, the highest and lowest ID of the pending messages, and the consumers with their pending message count.</returns>
/// <remarks>The equivalent of calling XPENDING key group.</remarks>
/// <remarks>https://redis.io/commands/xpending</remarks>
Task<StreamPendingInfo> StreamPendingAsync(RedisKey key, RedisValue groupName, CommandFlags flags = CommandFlags.None);
/// <summary>
/// View information about each pending message.
/// </summary>
/// <param name="key">The key of the stream.</param>
/// <param name="groupName">The name of the consumer group.</param>
/// <param name="count">The maximum number of pending messages to return.</param>
/// <param name="consumerName">The consumer name for the pending messages. Pass RedisValue.Null to include pending messages for all consumers.</param>
/// <param name="minId">The minimum ID from which to read the stream of pending messages. The method will default to reading from the beginning of the stream.</param>
/// <param name="maxId">The maximum ID to read to within the stream of pending messages. The method will default to reading to the end of the stream.</param>
/// <param name="flags">The flags to use for this operation.</param>
/// <returns>An instance of <see cref="StreamPendingMessageInfo"/> for each pending message.</returns>
/// <remarks>Equivalent of calling XPENDING key group start-id end-id count consumer-name.</remarks>
/// <remarks>https://redis.io/commands/xpending</remarks>
Task<StreamPendingMessageInfo[]> StreamPendingMessagesAsync(RedisKey key, RedisValue groupName, int count, RedisValue consumerName, RedisValue? minId = null, RedisValue? maxId = null, CommandFlags flags = CommandFlags.None);
/// <summary>
/// Read a stream using the given range of IDs.
/// </summary>
/// <param name="key">The key of the stream.</param>
/// <param name="minId">The minimum ID from which to read the stream. The method will default to reading from the beginning of the stream.</param>
/// <param name="maxId">The maximum ID to read to within the stream. The method will default to reading to the end of the stream.</param>
/// <param name="count">The maximum number of messages to return.</param>
/// <param name="messageOrder">The order of the messages. <see cref="Order.Ascending"/> will execute XRANGE and <see cref="Order.Descending"/> wil execute XREVRANGE.</param>
/// <param name="flags">The flags to use for this operation.</param>
/// <returns>Returns an instance of <see cref="RedisStreamEntry"/> for each message returned.</returns>
/// <remarks>https://redis.io/commands/xrange</remarks>
Task<RedisStreamEntry[]> StreamRangeAsync(RedisKey key, RedisValue? minId = null, RedisValue? maxId = null, int? count = null, Order messageOrder = Order.Ascending, CommandFlags flags = CommandFlags.None);
/// <summary>
/// Read from a single stream.
/// </summary>
/// <param name="key">The key of the stream.</param>
/// <param name="afterId">The message ID from within the stream to begin reading.</param>
/// <param name="count">The maximum number of messages to return.</param>
/// <param name="flags">The flags to use for this operation.</param>
/// <returns>Returns an instance of <see cref="RedisStreamEntry"/> for each message returned.</returns>
/// <remarks>Equivalent of calling XREAD COUNT num STREAMS key id.</remarks>
/// <remarks>https://redis.io/commands/xread</remarks>
Task<RedisStreamEntry[]> StreamReadAsync(RedisKey key, RedisValue afterId, int? count = null, CommandFlags flags = CommandFlags.None);
/// <summary>
/// Read from multiple streams.
/// </summary>
/// <param name="streamIdPairs">The list of streams and the ID from which to begin reading for each stream.</param>
/// <param name="countPerStream">The maximum number of messages to return from each stream.</param>
/// <param name="flags">The flags to use for this operation.</param>
/// <returns>An instance of <see cref="RedisStream"/> for each stream.</returns>
/// <remarks>Equivalent of calling XREAD COUNT num STREAMS key1 key2 id1 id2.</remarks>
/// <remarks>https://redis.io/commands/xread</remarks>
Task<RedisStream[]> StreamReadAsync(StreamIdPair[] streamIdPairs, int? countPerStream = null, CommandFlags flags = CommandFlags.None);
/// <summary>
/// Read messages from a stream and an associated consumer group.
/// </summary>
/// <param name="key">The key of the stream.</param>
/// <param name="groupName">The name of the consumer group.</param>
/// <param name="consumerName">The consumer name.</param>
/// <param name="readFromId">The ID from within the stream to begin reading. If null, the method will send the option (">") to only read new, previously undelivered messages.</param>
/// <param name="count">The maximum number of messages to return.</param>
/// <param name="flags">The flags to use for this operation.</param>
/// <returns>Returns an instance of <see cref="RedisStreamEntry"/> for each message returned.</returns>
/// <remarks>https://redis.io/commands/xreadgroup</remarks>
Task<RedisStreamEntry[]> StreamReadGroupAsync(RedisKey key, RedisValue groupName, RedisValue consumerName, RedisValue? readFromId = null, int? count = null, CommandFlags flags = CommandFlags.None);
/// <summary>
/// Trim the stream to a specified maximum length.
/// </summary>
/// <param name="key">The key of the stream.</param>
/// <param name="maxLength">The maximum length of the stream.</param>
/// <param name="useApproximateMaxLength">If true, the "~" argument is used to allow the stream to exceed max length by a small number. This improves performance when removing messages.</param>
/// <param name="flags">The flags to use for this operation.</param>
/// <returns>The number of messages removed from the stream.</returns>
/// <remarks>https://redis.io/topics/streams-intro</remarks>
Task<long> StreamTrimAsync(RedisKey key, int maxLength, bool useApproximateMaxLength = false, CommandFlags flags = CommandFlags.None);
/// <summary> /// <summary>
/// If key already exists and is a string, this command appends the value at the end of the string. If key does not exist it is created and set as an empty string, /// If key already exists and is a string, this command appends the value at the end of the string. If key does not exist it is created and set as an empty string,
/// so APPEND will be similar to SET in this special case. /// so APPEND will be similar to SET in this special case.
......
...@@ -587,6 +587,101 @@ public long SortedSetRemoveRangeByValue(RedisKey key, RedisValue min, RedisValue ...@@ -587,6 +587,101 @@ public long SortedSetRemoveRangeByValue(RedisKey key, RedisValue min, RedisValue
return Inner.SortedSetScore(ToInner(key), member, flags); return Inner.SortedSetScore(ToInner(key), member, flags);
} }
public long StreamAcknowledge(RedisKey key, RedisValue groupName, RedisValue messageId, CommandFlags flags = CommandFlags.None)
{
return Inner.StreamAcknowledge(ToInner(key), groupName, messageId, flags);
}
public long StreamAcknowledge(RedisKey key, RedisValue groupName, RedisValue[] messageIds, CommandFlags flags = CommandFlags.None)
{
return Inner.StreamAcknowledge(ToInner(key), groupName, messageIds, flags);
}
public RedisValue StreamAdd(RedisKey key, RedisValue streamField, RedisValue streamValue, RedisValue? messageId = null, int? maxLength = null, bool useApproximateMaxLength = false, CommandFlags flags = CommandFlags.None)
{
return Inner.StreamAdd(ToInner(key), streamField, streamValue, messageId, maxLength, useApproximateMaxLength, flags);
}
public RedisValue StreamAdd(RedisKey key, NameValueEntry[] streamPairs, RedisValue? messageId = null, int? maxLength = null, bool useApproximateMaxLength = false, CommandFlags flags = CommandFlags.None)
{
return Inner.StreamAdd(ToInner(key), streamPairs, messageId, maxLength, useApproximateMaxLength, flags);
}
public RedisStreamEntry[] StreamClaim(RedisKey key, RedisValue consumerGroup, RedisValue claimingConsumer, long minIdleTimeInMs, RedisValue[] messageIds, CommandFlags flags = CommandFlags.None)
{
return Inner.StreamClaim(ToInner(key), consumerGroup, claimingConsumer, minIdleTimeInMs, messageIds, flags);
}
public RedisValue[] StreamClaimIdsOnly(RedisKey key, RedisValue consumerGroup, RedisValue claimingConsumer, long minIdleTimeInMs, RedisValue[] messageIds, CommandFlags flags = CommandFlags.None)
{
return Inner.StreamClaimIdsOnly(ToInner(key), consumerGroup, claimingConsumer, minIdleTimeInMs, messageIds, flags);
}
public bool StreamCreateConsumerGroup(RedisKey key, RedisValue groupName, RedisValue? readFrom = null, CommandFlags flags = CommandFlags.None)
{
return Inner.StreamCreateConsumerGroup(ToInner(key), groupName, readFrom, flags);
}
public StreamInfo StreamInfo(RedisKey key, CommandFlags flags = CommandFlags.None)
{
return Inner.StreamInfo(ToInner(key), flags);
}
public StreamGroupInfo[] StreamGroupInfo(RedisKey key, CommandFlags flags = CommandFlags.None)
{
return Inner.StreamGroupInfo(ToInner(key), flags);
}
public StreamConsumerInfo[] StreamConsumerInfo(RedisKey key, RedisValue groupName, CommandFlags flags = CommandFlags.None)
{
return Inner.StreamConsumerInfo(ToInner(key), groupName, flags);
}
public long StreamLength(RedisKey key, CommandFlags flags = CommandFlags.None)
{
return Inner.StreamLength(ToInner(key), flags);
}
public long StreamDelete(RedisKey key, RedisValue[] messageIds, CommandFlags flags = CommandFlags.None)
{
return Inner.StreamDelete(ToInner(key), messageIds, flags);
}
public StreamPendingInfo StreamPending(RedisKey key, RedisValue groupName, CommandFlags flags = CommandFlags.None)
{
return Inner.StreamPending(ToInner(key), groupName, flags);
}
public StreamPendingMessageInfo[] StreamPendingMessages(RedisKey key, RedisValue groupName, int count, RedisValue consumerName, RedisValue? minId = null, RedisValue? maxId = null, CommandFlags flags = CommandFlags.None)
{
return Inner.StreamPendingMessages(ToInner(key), groupName, count, consumerName, minId, maxId, flags);
}
public RedisStreamEntry[] StreamRange(RedisKey key, RedisValue? minId = null, RedisValue? maxId = null, int? count = null, Order order = Order.Ascending, CommandFlags flags = CommandFlags.None)
{
return Inner.StreamRange(ToInner(key), minId, maxId, count, order, flags);
}
public RedisStreamEntry[] StreamRead(RedisKey key, RedisValue afterId, int? count = null, CommandFlags flags = CommandFlags.None)
{
return Inner.StreamRead(ToInner(key), afterId, count, flags);
}
public RedisStream[] StreamRead(StreamIdPair[] streamIdPairs, int? countPerStream = null, CommandFlags flags = CommandFlags.None)
{
return Inner.StreamRead(streamIdPairs, countPerStream, flags);
}
public RedisStreamEntry[] StreamReadGroup(RedisKey key, RedisValue groupName, RedisValue consumerName, RedisValue? readFromId = null, int? count = null, CommandFlags flags = CommandFlags.None)
{
return Inner.StreamReadGroup(ToInner(key), groupName, consumerName, readFromId, count, flags);
}
public long StreamTrim(RedisKey key, int maxLength, bool useApproximateMaxLength = false, CommandFlags flags = CommandFlags.None)
{
return Inner.StreamTrim(ToInner(key), maxLength, useApproximateMaxLength, flags);
}
public long StringAppend(RedisKey key, RedisValue value, CommandFlags flags = CommandFlags.None) public long StringAppend(RedisKey key, RedisValue value, CommandFlags flags = CommandFlags.None)
{ {
return Inner.StringAppend(ToInner(key), value, flags); return Inner.StringAppend(ToInner(key), value, flags);
......
...@@ -566,6 +566,101 @@ public Task<long> SortedSetRemoveRangeByValueAsync(RedisKey key, RedisValue min, ...@@ -566,6 +566,101 @@ public Task<long> SortedSetRemoveRangeByValueAsync(RedisKey key, RedisValue min,
return Inner.SortedSetScoreAsync(ToInner(key), member, flags); return Inner.SortedSetScoreAsync(ToInner(key), member, flags);
} }
public Task<long> StreamAcknowledgeAsync(RedisKey key, RedisValue groupName, RedisValue messageId, CommandFlags flags = CommandFlags.None)
{
return Inner.StreamAcknowledgeAsync(ToInner(key), groupName, messageId, flags);
}
public Task<long> StreamAcknowledgeAsync(RedisKey key, RedisValue groupName, RedisValue[] messageIds, CommandFlags flags = CommandFlags.None)
{
return Inner.StreamAcknowledgeAsync(ToInner(key), groupName, messageIds, flags);
}
public Task<RedisValue> StreamAddAsync(RedisKey key, RedisValue streamField, RedisValue streamValue, RedisValue? messageId = null, int? maxLength = null, bool useApproximateMaxLength = false, CommandFlags flags = CommandFlags.None)
{
return Inner.StreamAddAsync(ToInner(key), streamField, streamValue, messageId, maxLength, useApproximateMaxLength, flags);
}
public Task<RedisValue> StreamAddAsync(RedisKey key, NameValueEntry[] streamPairs, RedisValue? messageId = null, int? maxLength = null, bool useApproximateMaxLength = false, CommandFlags flags = CommandFlags.None)
{
return Inner.StreamAddAsync(ToInner(key), streamPairs, messageId, maxLength, useApproximateMaxLength, flags);
}
public Task<RedisStreamEntry[]> StreamClaimAsync(RedisKey key, RedisValue consumerGroup, RedisValue claimingConsumer, long minIdleTimeInMs, RedisValue[] messageIds, CommandFlags flags = CommandFlags.None)
{
return Inner.StreamClaimAsync(ToInner(key), consumerGroup, claimingConsumer, minIdleTimeInMs, messageIds, flags);
}
public Task<RedisValue[]> StreamClaimIdsOnlyAsync(RedisKey key, RedisValue consumerGroup, RedisValue claimingConsumer, long minIdleTimeInMs, RedisValue[] messageIds, CommandFlags flags = CommandFlags.None)
{
return Inner.StreamClaimIdsOnlyAsync(ToInner(key), consumerGroup, claimingConsumer, minIdleTimeInMs, messageIds, flags);
}
public Task<bool> StreamCreateConsumerGroupAsync(RedisKey key, RedisValue groupName, RedisValue? readFrom = null, CommandFlags flags = CommandFlags.None)
{
return Inner.StreamCreateConsumerGroupAsync(ToInner(key), groupName, readFrom, flags);
}
public Task<StreamInfo> StreamInfoAsync(RedisKey key, CommandFlags flags = CommandFlags.None)
{
return Inner.StreamInfoAsync(ToInner(key), flags);
}
public Task<StreamGroupInfo[]> StreamGroupInfoAsync(RedisKey key, CommandFlags flags = CommandFlags.None)
{
return Inner.StreamGroupInfoAsync(ToInner(key), flags);
}
public Task<StreamConsumerInfo[]> StreamConsumerInfoAsync(RedisKey key, RedisValue groupName, CommandFlags flags = CommandFlags.None)
{
return Inner.StreamConsumerInfoAsync(ToInner(key), groupName, flags);
}
public Task<long> StreamLengthAsync(RedisKey key, CommandFlags flags = CommandFlags.None)
{
return Inner.StreamLengthAsync(ToInner(key), flags);
}
public Task<long> StreamDeleteAsync(RedisKey key, RedisValue[] messageIds, CommandFlags flags = CommandFlags.None)
{
return Inner.StreamDeleteAsync(ToInner(key), messageIds, flags);
}
public Task<StreamPendingInfo> StreamPendingAsync(RedisKey key, RedisValue groupName, CommandFlags flags = CommandFlags.None)
{
return Inner.StreamPendingAsync(ToInner(key), groupName, flags);
}
public Task<StreamPendingMessageInfo[]> StreamPendingMessagesAsync(RedisKey key, RedisValue groupName, int count, RedisValue consumerName, RedisValue? minId = null, RedisValue? maxId = null, CommandFlags flags = CommandFlags.None)
{
return Inner.StreamPendingMessagesAsync(ToInner(key), groupName, count, consumerName, minId, maxId, flags);
}
public Task<RedisStreamEntry[]> StreamRangeAsync(RedisKey key, RedisValue? minId = null, RedisValue? maxId = null, int? count = null, Order order = Order.Ascending, CommandFlags flags = CommandFlags.None)
{
return Inner.StreamRangeAsync(ToInner(key), minId, maxId, count, order, flags);
}
public Task<RedisStreamEntry[]> StreamReadAsync(RedisKey key, RedisValue afterId, int? count = null, CommandFlags flags = CommandFlags.None)
{
return Inner.StreamReadAsync(ToInner(key), afterId, count, flags);
}
public Task<RedisStream[]> StreamReadAsync(StreamIdPair[] streamIdPairs, int? countPerStream = null, CommandFlags flags = CommandFlags.None)
{
return Inner.StreamReadAsync(streamIdPairs, countPerStream, flags);
}
public Task<RedisStreamEntry[]> StreamReadGroupAsync(RedisKey key, RedisValue groupName, RedisValue consumerName, RedisValue? readFromId = null, int? count = null, CommandFlags flags = CommandFlags.None)
{
return Inner.StreamReadGroupAsync(ToInner(key), groupName, consumerName, readFromId, count, flags);
}
public Task<long> StreamTrimAsync(RedisKey key, int maxLength, bool useApproximateMaxLength = false, CommandFlags flags = CommandFlags.None)
{
return Inner.StreamTrimAsync(ToInner(key), maxLength, useApproximateMaxLength, flags);
}
public Task<long> StringAppendAsync(RedisKey key, RedisValue value, CommandFlags flags = CommandFlags.None) public Task<long> StringAppendAsync(RedisKey key, RedisValue value, CommandFlags flags = CommandFlags.None)
{ {
return Inner.StringAppendAsync(ToInner(key), value, flags); return Inner.StringAppendAsync(ToInner(key), value, flags);
......
using System;
using System.Collections.Generic;
namespace StackExchange.Redis
{
/// <summary>
/// Describes a value contained in a stream (a name/value pair).
/// </summary>
public struct NameValueEntry : IEquatable<NameValueEntry>
{
internal readonly RedisValue name, value;
/// <summary>
/// Initializes a <see cref="NameValueEntry"/> value.
/// </summary>
/// <param name="name">The name for this entry.</param>
/// <param name="value">The value for this entry.</param>
public NameValueEntry(RedisValue name, RedisValue value)
{
this.name = name;
this.value = value;
}
/// <summary>
/// The name of the field.
/// </summary>
public RedisValue Name => name;
/// <summary>
/// The value of the field.
/// </summary>
public RedisValue Value => value;
/// <summary>
/// Converts to a key/value pair
/// </summary>
/// <param name="value">The <see cref="NameValueEntry"/> to create a <see cref="KeyValuePair{TKey, TValue}"/> from.</param>
public static implicit operator KeyValuePair<RedisValue, RedisValue>(NameValueEntry value) =>
new KeyValuePair<RedisValue, RedisValue>(value.name, value.value);
/// <summary>
/// Converts from a key/value pair
/// </summary>
/// <param name="value">The <see cref="KeyValuePair{TKey, TValue}"/> to get a <see cref="NameValueEntry"/> from.</param>
public static implicit operator NameValueEntry(KeyValuePair<RedisValue, RedisValue> value) =>
new NameValueEntry(value.Key, value.Value);
/// <summary>
/// See Object.ToString()
/// </summary>
public override string ToString() => name + ": " + value;
/// <summary>
/// See Object.GetHashCode()
/// </summary>
public override int GetHashCode() => name.GetHashCode() ^ value.GetHashCode();
/// <summary>
/// Compares two values for equality.
/// </summary>
/// <param name="obj">The <see cref="NameValueEntry"/> to compare to.</param>
public override bool Equals(object obj) => obj is NameValueEntry heObj && Equals(heObj);
/// <summary>
/// Compares two values for equality.
/// </summary>
/// <param name="other">The <see cref="NameValueEntry"/> to compare to.</param>
public bool Equals(NameValueEntry other) => name == other.name && value == other.value;
/// <summary>
/// Compares two values for equality
/// </summary>
/// <param name="x">The first <see cref="NameValueEntry"/> to compare.</param>
/// <param name="y">The second <see cref="NameValueEntry"/> to compare.</param>
public static bool operator ==(NameValueEntry x, NameValueEntry y) => x.name == y.name && x.value == y.value;
/// <summary>
/// Compares two values for non-equality
/// </summary>
/// <param name="x">The first <see cref="NameValueEntry"/> to compare.</param>
/// <param name="y">The second <see cref="NameValueEntry"/> to compare.</param>
public static bool operator !=(NameValueEntry x, NameValueEntry y) => x.name != y.name || x.value != y.value;
}
}
...@@ -7,6 +7,9 @@ internal struct RawResult ...@@ -7,6 +7,9 @@ internal struct RawResult
{ {
public static readonly RawResult EmptyArray = new RawResult(new RawResult[0]); public static readonly RawResult EmptyArray = new RawResult(new RawResult[0]);
public static readonly RawResult Nil = new RawResult(); public static readonly RawResult Nil = new RawResult();
public static RawResult CreateMultiBulk(params RawResult[] results) => new RawResult(results);
private static readonly byte[] emptyBlob = new byte[0]; private static readonly byte[] emptyBlob = new byte[0];
private readonly int offset, count; private readonly int offset, count;
private readonly Array arr; private readonly Array arr;
......
...@@ -1579,6 +1579,334 @@ public IEnumerable<SortedSetEntry> SortedSetScan(RedisKey key, RedisValue patter ...@@ -1579,6 +1579,334 @@ public IEnumerable<SortedSetEntry> SortedSetScan(RedisKey key, RedisValue patter
return ExecuteAsync(msg, ResultProcessor.NullableDouble); return ExecuteAsync(msg, ResultProcessor.NullableDouble);
} }
public long StreamAcknowledge(RedisKey key, RedisValue groupName, RedisValue messageId, CommandFlags flags = CommandFlags.None)
{
var msg = GetStreamAcknowledgeMessage(key, groupName, messageId, flags);
return ExecuteSync(msg, ResultProcessor.Int64);
}
public Task<long> StreamAcknowledgeAsync(RedisKey key, RedisValue groupName, RedisValue messageId, CommandFlags flags = CommandFlags.None)
{
var msg = GetStreamAcknowledgeMessage(key, groupName, messageId, flags);
return ExecuteAsync(msg, ResultProcessor.Int64);
}
public long StreamAcknowledge(RedisKey key, RedisValue groupName, RedisValue[] messageIds, CommandFlags flags = CommandFlags.None)
{
var msg = GetStreamAcknowledgeMessage(key, groupName, messageIds, flags);
return ExecuteSync(msg, ResultProcessor.Int64);
}
public Task<long> StreamAcknowledgeAsync(RedisKey key, RedisValue groupName, RedisValue[] messageIds, CommandFlags flags = CommandFlags.None)
{
var msg = GetStreamAcknowledgeMessage(key, groupName, messageIds, flags);
return ExecuteAsync(msg, ResultProcessor.Int64);
}
public RedisValue StreamAdd(RedisKey key, RedisValue streamField, RedisValue streamValue, RedisValue? messageId = null, int? maxLength = null, bool useApproximateMaxLength = false, CommandFlags flags = CommandFlags.None)
{
var msg = GetStreamAddMessage(key,
messageId ?? StreamConstants.AutoGeneratedId,
maxLength,
useApproximateMaxLength,
new NameValueEntry(streamField, streamValue),
flags);
return ExecuteSync(msg, ResultProcessor.RedisValue);
}
public Task<RedisValue> StreamAddAsync(RedisKey key, RedisValue streamField, RedisValue streamValue, RedisValue? messageId = null, int? maxLength = null, bool useApproximateMaxLength = false, CommandFlags flags = CommandFlags.None)
{
var msg = GetStreamAddMessage(key,
messageId ?? StreamConstants.AutoGeneratedId,
maxLength,
useApproximateMaxLength,
new NameValueEntry(streamField, streamValue),
flags);
return ExecuteAsync(msg, ResultProcessor.RedisValue);
}
public RedisValue StreamAdd(RedisKey key, NameValueEntry[] streamPairs, RedisValue? messageId = null, int? maxLength = null, bool useApproximateMaxLength = false, CommandFlags flags = CommandFlags.None)
{
var msg = GetStreamAddMessage(key,
messageId ?? StreamConstants.AutoGeneratedId,
maxLength,
useApproximateMaxLength,
streamPairs,
flags);
return ExecuteSync(msg, ResultProcessor.RedisValue);
}
public Task<RedisValue> StreamAddAsync(RedisKey key, NameValueEntry[] streamPairs, RedisValue? messageId = null, int? maxLength = null, bool useApproximateMaxLength = false, CommandFlags flags = CommandFlags.None)
{
var msg = GetStreamAddMessage(key,
messageId ?? StreamConstants.AutoGeneratedId,
maxLength,
useApproximateMaxLength,
streamPairs,
flags);
return ExecuteAsync(msg, ResultProcessor.RedisValue);
}
public RedisStreamEntry[] StreamClaim(RedisKey key, RedisValue consumerGroup, RedisValue claimingConsumer, long minIdleTimeInMs, RedisValue[] messageIds, CommandFlags flags = CommandFlags.None)
{
var msg = GetStreamClaimMessage(key,
consumerGroup,
claimingConsumer,
minIdleTimeInMs,
messageIds,
returnJustIds: false,
flags: flags);
return ExecuteSync(msg, ResultProcessor.SingleStream);
}
public Task<RedisStreamEntry[]> StreamClaimAsync(RedisKey key, RedisValue consumerGroup, RedisValue claimingConsumer, long minIdleTimeInMs, RedisValue[] messageIds, CommandFlags flags = CommandFlags.None)
{
var msg = GetStreamClaimMessage(key,
consumerGroup,
claimingConsumer,
minIdleTimeInMs,
messageIds,
returnJustIds: false,
flags: flags);
return ExecuteAsync(msg, ResultProcessor.SingleStream);
}
public RedisValue[] StreamClaimIdsOnly(RedisKey key, RedisValue consumerGroup, RedisValue claimingConsumer, long minIdleTimeInMs, RedisValue[] messageIds, CommandFlags flags = CommandFlags.None)
{
var msg = GetStreamClaimMessage(key,
consumerGroup,
claimingConsumer,
minIdleTimeInMs,
messageIds,
returnJustIds: true,
flags: flags);
return ExecuteSync(msg, ResultProcessor.RedisValueArray);
}
public Task<RedisValue[]> StreamClaimIdsOnlyAsync(RedisKey key, RedisValue consumerGroup, RedisValue claimingConsumer, long minIdleTimeInMs, RedisValue[] messageIds, CommandFlags flags = CommandFlags.None)
{
var msg = GetStreamClaimMessage(key,
consumerGroup,
claimingConsumer,
minIdleTimeInMs,
messageIds,
returnJustIds: true,
flags: flags);
return ExecuteAsync(msg, ResultProcessor.RedisValueArray);
}
public bool StreamCreateConsumerGroup(RedisKey key, RedisValue groupName, RedisValue? readFrom = null, CommandFlags flags = CommandFlags.None)
{
var msg = Message.Create(Database,
flags,
RedisCommand.XGROUP,
new RedisValue[]
{
StreamConstants.Create,
key.AsRedisValue(),
groupName,
readFrom ?? StreamConstants.NewMessages
});
return ExecuteSync(msg, ResultProcessor.Boolean);
}
public Task<bool> StreamCreateConsumerGroupAsync(RedisKey key, RedisValue groupName, RedisValue? readFrom = null, CommandFlags flags = CommandFlags.None)
{
var msg = Message.Create(Database,
flags,
RedisCommand.XGROUP,
new RedisValue[]
{
StreamConstants.Create,
key.AsRedisValue(),
groupName,
readFrom ?? StreamConstants.NewMessages
});
return ExecuteAsync(msg, ResultProcessor.Boolean);
}
public StreamConsumerInfo[] StreamConsumerInfo(RedisKey key, RedisValue groupName, CommandFlags flags = CommandFlags.None)
{
var msg = Message.Create(Database,
flags,
RedisCommand.XINFO,
new RedisValue[]
{
StreamConstants.Consumers,
key.AsRedisValue(),
groupName
});
return ExecuteSync(msg, ResultProcessor.StreamConsumerInfo);
}
public Task<StreamConsumerInfo[]> StreamConsumerInfoAsync(RedisKey key, RedisValue groupName, CommandFlags flags = CommandFlags.None)
{
var msg = Message.Create(Database,
flags,
RedisCommand.XINFO,
new RedisValue[]
{
StreamConstants.Consumers,
key.AsRedisValue(),
groupName
});
return ExecuteAsync(msg, ResultProcessor.StreamConsumerInfo);
}
public StreamGroupInfo[] StreamGroupInfo(RedisKey key, CommandFlags flags = CommandFlags.None)
{
var msg = Message.Create(Database, flags, RedisCommand.XINFO, StreamConstants.Groups, key);
return ExecuteSync(msg, ResultProcessor.StreamGroupInfo);
}
public Task<StreamGroupInfo[]> StreamGroupInfoAsync(RedisKey key, CommandFlags flags = CommandFlags.None)
{
var msg = Message.Create(Database, flags, RedisCommand.XINFO, StreamConstants.Groups, key);
return ExecuteAsync(msg, ResultProcessor.StreamGroupInfo);
}
public StreamInfo StreamInfo(RedisKey key, CommandFlags flags = CommandFlags.None)
{
var msg = Message.Create(Database, flags, RedisCommand.XINFO, StreamConstants.Stream, key);
return ExecuteSync(msg, ResultProcessor.StreamInfo);
}
public Task<StreamInfo> StreamInfoAsync(RedisKey key, CommandFlags flags = CommandFlags.None)
{
var msg = Message.Create(Database, flags, RedisCommand.XINFO, StreamConstants.Stream, key);
return ExecuteAsync(msg, ResultProcessor.StreamInfo);
}
public long StreamLength(RedisKey key, CommandFlags flags = CommandFlags.None)
{
var msg = Message.Create(Database, flags, RedisCommand.XLEN, key);
return ExecuteSync(msg, ResultProcessor.Int64);
}
public Task<long> StreamLengthAsync(RedisKey key, CommandFlags flags = CommandFlags.None)
{
var msg = Message.Create(Database, flags, RedisCommand.XLEN, key);
return ExecuteAsync(msg, ResultProcessor.Int64);
}
public long StreamDelete(RedisKey key, RedisValue[] messageIds, CommandFlags flags = CommandFlags.None)
{
var msg = Message.Create(Database,
flags,
RedisCommand.XDEL,
key,
messageIds);
return ExecuteSync(msg, ResultProcessor.Int64);
}
public Task<long> StreamDeleteAsync(RedisKey key, RedisValue[] messageIds, CommandFlags flags = CommandFlags.None)
{
var msg = Message.Create(Database,
flags,
RedisCommand.XDEL,
key,
messageIds);
return ExecuteAsync(msg, ResultProcessor.Int64);
}
public StreamPendingInfo StreamPending(RedisKey key, RedisValue groupName, CommandFlags flags = CommandFlags.None)
{
var msg = Message.Create(Database, flags, RedisCommand.XPENDING, key, groupName);
return ExecuteSync(msg, ResultProcessor.StreamPendingInfo);
}
public Task<StreamPendingInfo> StreamPendingAsync(RedisKey key, RedisValue groupName, CommandFlags flags = CommandFlags.None)
{
var msg = Message.Create(Database, flags, RedisCommand.XPENDING, key, groupName);
return ExecuteAsync(msg, ResultProcessor.StreamPendingInfo);
}
public StreamPendingMessageInfo[] StreamPendingMessages(RedisKey key, RedisValue groupName, int count, RedisValue consumerName, RedisValue? minId = null, RedisValue? maxId = null, CommandFlags flags = CommandFlags.None)
{
var msg = GetStreamPendingMessagesMessage(key, groupName, minId, maxId, count, consumerName, flags);
return ExecuteSync(msg, ResultProcessor.StreamPendingMessages);
}
public Task<StreamPendingMessageInfo[]> StreamPendingMessagesAsync(RedisKey key, RedisValue groupName, int count, RedisValue consumerName, RedisValue? minId = null, RedisValue? maxId = null, CommandFlags flags = CommandFlags.None)
{
var msg = GetStreamPendingMessagesMessage(key, groupName, minId, maxId, count, consumerName, flags);
return ExecuteAsync(msg, ResultProcessor.StreamPendingMessages);
}
public RedisStreamEntry[] StreamRange(RedisKey key, RedisValue? minId = null, RedisValue? maxId = null, int? count = null, Order messageOrder = Order.Ascending, CommandFlags flags = CommandFlags.None)
{
var msg = GetStreamRangeMessage(key, minId, maxId, count, messageOrder, flags);
return ExecuteSync(msg, ResultProcessor.SingleStream);
}
public Task<RedisStreamEntry[]> StreamRangeAsync(RedisKey key, RedisValue? minId = null, RedisValue? maxId = null, int? count = null, Order messageOrder = Order.Ascending, CommandFlags flags = CommandFlags.None)
{
var msg = GetStreamRangeMessage(key, minId, maxId, count, messageOrder, flags);
return ExecuteAsync(msg, ResultProcessor.SingleStream);
}
public RedisStreamEntry[] StreamRead(RedisKey key, RedisValue afterId, int? count = null, CommandFlags flags = CommandFlags.None)
{
var msg = GetSingleStreamReadMessage(key, afterId, count, flags);
return ExecuteSync(msg, ResultProcessor.SingleStreamWithNameSkip);
}
public Task<RedisStreamEntry[]> StreamReadAsync(RedisKey key, RedisValue afterId, int? count = null, CommandFlags flags = CommandFlags.None)
{
var msg = GetSingleStreamReadMessage(key, afterId, count, flags);
return ExecuteAsync(msg, ResultProcessor.SingleStreamWithNameSkip);
}
public RedisStream[] StreamRead(StreamIdPair[] streamIdPairs, int? countPerStream = null, CommandFlags flags = CommandFlags.None)
{
var msg = GetMultiStreamReadMessage(streamIdPairs, countPerStream, flags);
return ExecuteSync(msg, ResultProcessor.MultiStream);
}
public Task<RedisStream[]> StreamReadAsync(StreamIdPair[] streamIdPairs, int? countPerStream = null, CommandFlags flags = CommandFlags.None)
{
var msg = GetMultiStreamReadMessage(streamIdPairs, countPerStream, flags);
return ExecuteAsync(msg, ResultProcessor.MultiStream);
}
public RedisStreamEntry[] StreamReadGroup(RedisKey key, RedisValue groupName, RedisValue consumerName, RedisValue? readFromId = null, int? count = null, CommandFlags flags = CommandFlags.None)
{
var msg = GetStreamReadGroupMessage(key, groupName, consumerName, readFromId, count, flags);
return ExecuteSync(msg, ResultProcessor.SingleStreamWithNameSkip);
}
public Task<RedisStreamEntry[]> StreamReadGroupAsync(RedisKey key, RedisValue groupName, RedisValue consumerName, RedisValue? readFromId = null, int? count = null, CommandFlags flags = CommandFlags.None)
{
var msg = GetStreamReadGroupMessage(key, groupName, consumerName, readFromId, count, flags);
return ExecuteAsync(msg, ResultProcessor.SingleStreamWithNameSkip);
}
public long StreamTrim(RedisKey key, int maxLength, bool useApproximateMaxLength = false, CommandFlags flags = CommandFlags.None)
{
var msg = GetStreamTrimMessage(key, maxLength, useApproximateMaxLength, flags);
return ExecuteSync(msg, ResultProcessor.Int64);
}
public Task<long> StreamTrimAsync(RedisKey key, int maxLength, bool useApproximateMaxLength = false, CommandFlags flags = CommandFlags.None)
{
var msg = GetStreamTrimMessage(key, maxLength, useApproximateMaxLength, flags);
return ExecuteAsync(msg, ResultProcessor.Int64);
}
public long StringAppend(RedisKey key, RedisValue value, CommandFlags flags = CommandFlags.None) public long StringAppend(RedisKey key, RedisValue value, CommandFlags flags = CommandFlags.None)
{ {
var msg = Message.Create(Database, flags, RedisCommand.APPEND, key, value); var msg = Message.Create(Database, flags, RedisCommand.APPEND, key, value);
...@@ -1936,6 +2264,62 @@ private RedisValue GetLexRange(RedisValue value, Exclude exclude, bool isStart) ...@@ -1936,6 +2264,62 @@ private RedisValue GetLexRange(RedisValue value, Exclude exclude, bool isStart)
return result; return result;
} }
private Message GetMultiStreamReadMessage(StreamIdPair[] streamIdPairs, int? countPerStream, CommandFlags flags)
{
// Example: XREAD COUNT 2 STREAMS mystream writers 0-0 0-0
if (streamIdPairs == null) throw new ArgumentNullException(nameof(streamIdPairs));
if (streamIdPairs.Length == 0) throw new ArgumentOutOfRangeException(nameof(streamIdPairs), "streamAndIdPairs must contain at least one item.");
if (countPerStream.HasValue && countPerStream <= 0)
{
throw new ArgumentOutOfRangeException(nameof(countPerStream), "countPerStream must be greater than 0.");
}
var values = new RedisValue[
1 // Streams keyword.
+ (streamIdPairs.Length * 2) // Room for the stream names and the ID from which to begin reading.
+ (countPerStream.HasValue ? 2 : 0)]; // Room for "COUNT num" or 0 if countPerStream is null.
var offset = 0;
if (countPerStream.HasValue)
{
values[offset++] = StreamConstants.Count;
values[offset++] = countPerStream;
}
values[offset++] = StreamConstants.Streams;
// Write the stream names and the message IDs from which to read for the associated stream. Each pair
// will be separated by an offset of the index of the stream name plus the pair count.
/*
* [0] = COUNT
* [1] = 2
* [3] = STREAMS
* [4] = stream1
* [5] = stream2
* [6] = stream3
* [7] = id1
* [8] = id2
* [9] = id3
*
* */
var pairCount = streamIdPairs.Length;
for (var i = 0; i < pairCount; i++)
{
values[offset] = streamIdPairs[i].Key.AsRedisValue();
values[offset + pairCount] = streamIdPairs[i].Id;
offset++;
}
return Message.Create(Database, flags, RedisCommand.XREAD, values);
}
private RedisValue GetRange(double value, Exclude exclude, bool isStart) private RedisValue GetRange(double value, Exclude exclude, bool isStart)
{ {
if (isStart) if (isStart)
...@@ -2164,6 +2548,302 @@ private Message GetSortedSetRemoveRangeByScoreMessage(RedisKey key, double start ...@@ -2164,6 +2548,302 @@ private Message GetSortedSetRemoveRangeByScoreMessage(RedisKey key, double start
GetRange(start, exclude, true), GetRange(stop, exclude, false)); GetRange(start, exclude, true), GetRange(stop, exclude, false));
} }
private Message GetStreamAcknowledgeMessage(RedisKey key, RedisValue groupName, RedisValue messageId, CommandFlags flags)
{
var values = new RedisValue[]
{
key.AsRedisValue(),
groupName,
messageId
};
return Message.Create(Database, flags, RedisCommand.XACK, values);
}
private Message GetStreamAcknowledgeMessage(RedisKey key, RedisValue groupName, RedisValue[] messageIds, CommandFlags flags)
{
if (messageIds == null) throw new ArgumentNullException(nameof(messageIds));
if (messageIds.Length == 0) throw new ArgumentOutOfRangeException(nameof(messageIds), "messageIds must contain at least one item.");
var values = new RedisValue[messageIds.Length + 2];
var offset = 0;
values[offset++] = key.AsRedisValue();
values[offset++] = groupName;
for (var i = 0; i < messageIds.Length; i++)
{
values[offset++] = messageIds[i];
}
return Message.Create(Database, flags, RedisCommand.XACK, values);
}
private Message GetStreamAddMessage(RedisKey key, RedisValue messageId, int? maxLength, bool useApproximateMaxLength, NameValueEntry streamPair, CommandFlags flags)
{
// Calculate the correct number of arguments:
// 3 array elements for Entry ID & NameValueEntry.Name & NameValueEntry.Value.
// 2 elements if using MAXLEN (keyword & value), otherwise 0.
// 1 element if using Approximate Length (~), otherwise 0.
var totalLength = 3 + (maxLength.HasValue ? 2 : 0)
+ (maxLength.HasValue && useApproximateMaxLength ? 1 : 0);
var values = new RedisValue[totalLength];
var offset = 0;
values[offset++] = messageId;
if (maxLength.HasValue)
{
values[offset++] = StreamConstants.MaxLen;
if (useApproximateMaxLength)
{
values[offset++] = StreamConstants.ApproximateMaxLen;
values[offset++] = maxLength.Value;
}
else
{
values[offset++] = maxLength.Value;
}
}
values[offset++] = streamPair.Name;
values[offset] = streamPair.Value;
return Message.Create(Database, flags, RedisCommand.XADD, key, values);
}
private Message GetStreamAddMessage(RedisKey key, RedisValue entryId, int? maxLength, bool useApproximateMaxLength, NameValueEntry[] streamPairs, CommandFlags flags)
{
// See https://redis.io/commands/xadd.
if (streamPairs == null) throw new ArgumentNullException(nameof(streamPairs));
if (streamPairs.Length == 0) throw new ArgumentOutOfRangeException(nameof(streamPairs), "streamPairs must contain at least one item.");
if (maxLength.HasValue && maxLength <= 0)
{
throw new ArgumentOutOfRangeException(nameof(maxLength), "maxLength must be greater than 0.");
}
var includeMaxLen = maxLength.HasValue ? 2 : 0;
var includeApproxLen = maxLength.HasValue && useApproximateMaxLength ? 1 : 0;
var totalLength = (streamPairs.Length * 2) // Room for the name/value pairs
+ 1 // The stream entry ID
+ includeMaxLen // 2 or 0 (MAXLEN keyword & the count)
+ includeApproxLen; // 1 or 0
var values = new RedisValue[totalLength];
var offset = 0;
values[offset++] = entryId;
if (maxLength.HasValue)
{
values[offset++] = StreamConstants.MaxLen;
if (useApproximateMaxLength)
{
values[offset++] = StreamConstants.ApproximateMaxLen;
}
values[offset++] = maxLength.Value;
}
for (var i = 0; i < streamPairs.Length; i++)
{
values[offset++] = streamPairs[i].Name;
values[offset++] = streamPairs[i].Value;
}
return Message.Create(Database, flags, RedisCommand.XADD, key, values);
}
private Message GetStreamClaimMessage(RedisKey key, RedisValue consumerGroup, RedisValue assignToConsumer, long minIdleTimeInMs, RedisValue[] messageIds, bool returnJustIds, CommandFlags flags)
{
if (messageIds == null) throw new ArgumentNullException(nameof(messageIds));
if (messageIds.Length == 0) throw new ArgumentOutOfRangeException(nameof(messageIds), "messageIds must contain at least one item.");
// XCLAIM <key> <group> <consumer> <min-idle-time> <ID-1> <ID-2> ... <ID-N>
var values = new RedisValue[4 + messageIds.Length + (returnJustIds ? 1 : 0)];
var offset = 0;
values[offset++] = key.AsRedisValue();
values[offset++] = consumerGroup;
values[offset++] = assignToConsumer;
values[offset++] = minIdleTimeInMs;
for (var i = 0; i < messageIds.Length; i++)
{
values[offset++] = messageIds[i];
}
if (returnJustIds)
{
values[offset] = StreamConstants.JustId;
}
return Message.Create(Database, flags, RedisCommand.XCLAIM, values);
}
private Message GetStreamPendingMessagesMessage(RedisKey key, RedisValue groupName, RedisValue? minId, RedisValue? maxId, int count, RedisValue consumerName, CommandFlags flags)
{
// > XPENDING mystream mygroup - + 10 [consumer name]
// 1) 1) 1526569498055 - 0
// 2) "Bob"
// 3) (integer)74170458
// 4) (integer)1
// 2) 1) 1526569506935 - 0
// 2) "Bob"
// 3) (integer)74170458
// 4) (integer)1
// See https://redis.io/topics/streams-intro.
if (count <= 0)
{
throw new ArgumentOutOfRangeException(nameof(count), "count must be greater than 0.");
}
var values = new RedisValue[consumerName == RedisValue.Null ? 5 : 6];
values[0] = key.AsRedisValue();
values[1] = groupName;
values[2] = minId ?? StreamConstants.ReadMinValue;
values[3] = maxId ?? StreamConstants.ReadMaxValue;
values[4] = count;
if (consumerName != RedisValue.Null)
{
values[5] = consumerName;
}
return Message.Create(Database,
flags,
RedisCommand.XPENDING,
values);
}
private Message GetStreamRangeMessage(RedisKey key, RedisValue? minId, RedisValue? maxId, int? count, Order messageOrder, CommandFlags flags)
{
if (count.HasValue && count <= 0)
{
throw new ArgumentOutOfRangeException(nameof(count), "count must be greater than 0.");
}
var actualMin = minId ?? StreamConstants.ReadMinValue;
var actualMax = maxId ?? StreamConstants.ReadMaxValue;
var values = new RedisValue[2 + (count.HasValue ? 2 : 0)];
values[0] = (messageOrder == Order.Ascending ? actualMin : actualMax);
values[1] = (messageOrder == Order.Ascending ? actualMax : actualMin);
if (count.HasValue)
{
values[2] = StreamConstants.Count;
values[3] = count.Value;
}
return Message.Create(Database,
flags,
messageOrder == Order.Ascending ? RedisCommand.XRANGE : RedisCommand.XREVRANGE,
key,
values);
}
private Message GetStreamReadGroupMessage(RedisKey key, RedisValue groupName, RedisValue consumerName, RedisValue? readFromId, int? count, CommandFlags flags)
{
// Example: > XREADGROUP GROUP mygroup Alice COUNT 1 STREAMS mystream >
if (count.HasValue && count <= 0)
{
throw new ArgumentOutOfRangeException(nameof(count), "count must be greater than 0.");
}
var totalValueCount = 6 + (count.HasValue ? 2 : 0);
var values = new RedisValue[totalValueCount];
var offset = 0;
values[offset++] = StreamConstants.Group;
values[offset++] = groupName;
values[offset++] = consumerName;
if (count.HasValue)
{
values[offset++] = StreamConstants.Count;
values[offset++] = count.Value;
}
values[offset++] = StreamConstants.Streams;
values[offset++] = key.AsRedisValue();
values[offset] = readFromId ?? StreamConstants.UndeliveredMessages;
return Message.Create(Database,
flags,
RedisCommand.XREADGROUP,
values);
}
private Message GetSingleStreamReadMessage(RedisKey key, RedisValue afterId, int? count, CommandFlags flags)
{
if (count.HasValue && count <= 0)
{
throw new ArgumentOutOfRangeException(nameof(count), "count must be greater than 0.");
}
var values = new RedisValue[3 + (count.HasValue ? 2 : 0)];
var offset = 0;
if (count.HasValue)
{
values[offset++] = StreamConstants.Count;
values[offset++] = count.Value;
}
values[offset++] = StreamConstants.Streams;
values[offset++] = key.AsRedisValue();
values[offset] = afterId;
// Example: > XREAD COUNT 2 STREAMS writers 1526999352406-0
return Message.Create(Database,
flags,
RedisCommand.XREAD,
values);
}
private Message GetStreamTrimMessage(RedisKey key, int maxLength, bool useApproximateMaxLength, CommandFlags flags)
{
if (maxLength <= 0)
{
throw new ArgumentOutOfRangeException(nameof(maxLength), "maxLength must be greater than 0.");
}
var values = new RedisValue[2 + (useApproximateMaxLength ? 1 : 0)];
values[0] = StreamConstants.MaxLen;
if (useApproximateMaxLength)
{
values[1] = StreamConstants.ApproximateMaxLen;
values[2] = maxLength;
}
else
{
values[1] = maxLength;
}
return Message.Create(Database,
flags,
RedisCommand.XTRIM,
key,
values);
}
private Message GetStringBitOperationMessage(Bitwise operation, RedisKey destination, RedisKey[] keys, CommandFlags flags) private Message GetStringBitOperationMessage(Bitwise operation, RedisKey destination, RedisKey[] keys, CommandFlags flags)
{ {
if (keys == null) throw new ArgumentNullException(nameof(keys)); if (keys == null) throw new ArgumentNullException(nameof(keys));
......
...@@ -28,7 +28,8 @@ public struct RedisFeatures ...@@ -28,7 +28,8 @@ public struct RedisFeatures
v2_8_18 = new Version(2, 8, 18), v2_8_18 = new Version(2, 8, 18),
v2_9_5 = new Version(2, 9, 5), v2_9_5 = new Version(2, 9, 5),
v3_0_0 = new Version(3, 0, 0), v3_0_0 = new Version(3, 0, 0),
v3_2_0 = new Version(3, 2, 0); v3_2_0 = new Version(3, 2, 0),
v4_9_1 = new Version(4, 9, 1); // 5.0 RC1 is version 4.9.1
private readonly Version version; private readonly Version version;
/// <summary> /// <summary>
...@@ -120,6 +121,11 @@ public RedisFeatures(Version version) ...@@ -120,6 +121,11 @@ public RedisFeatures(Version version)
/// </summary> /// </summary>
public bool SetVaradicAddRemove => Version >= v2_4_0; public bool SetVaradicAddRemove => Version >= v2_4_0;
/// <summary>
/// Are Redis Streams available?
/// </summary>
public bool Streams => Version >= v4_9_1;
/// <summary> /// <summary>
/// Is STRLEN available? /// Is STRLEN available?
/// </summary> /// </summary>
......
namespace StackExchange.Redis
{
/// <summary>
/// Describes a Redis Stream with an associated array of entries.
/// </summary>
public struct RedisStream
{
internal RedisStream(RedisKey key, RedisStreamEntry[] entries)
{
Key = key;
Entries = entries;
}
/// <summary>
/// The key for the stream.
/// </summary>
public RedisKey Key { get; }
/// <summary>
/// An arry of entries contained within the stream.
/// </summary>
public RedisStreamEntry[] Entries { get; }
}
}
namespace StackExchange.Redis
{
/// <summary>
/// Describes an entry contained in a Redis Stream.
/// </summary>
public struct RedisStreamEntry
{
internal RedisStreamEntry(RedisValue id, NameValueEntry[] values)
{
Id = id;
Values = values;
}
/// <summary>
/// A null stream entry.
/// </summary>
public static RedisStreamEntry Null { get; } = new RedisStreamEntry(RedisValue.Null, null);
/// <summary>
/// The ID assigned to the message.
/// </summary>
public RedisValue Id { get; }
/// <summary>
/// The values contained within the message.
/// </summary>
public NameValueEntry[] Values { get; }
/// <summary>
/// Indicates that the Redis Stream Entry is null.
/// </summary>
public bool IsNull => Id == RedisValue.Null && Values == null;
}
}
...@@ -40,6 +40,9 @@ internal abstract class ResultProcessor ...@@ -40,6 +40,9 @@ internal abstract class ResultProcessor
public static readonly ResultProcessor<IGrouping<string, KeyValuePair<string, string>>[]> public static readonly ResultProcessor<IGrouping<string, KeyValuePair<string, string>>[]>
Info = new InfoProcessor(); Info = new InfoProcessor();
public static readonly MultiStreamProcessor
MultiStream = new MultiStreamProcessor();
public static readonly ResultProcessor<long> public static readonly ResultProcessor<long>
Int64 = new Int64Processor(), Int64 = new Int64Processor(),
PubSubNumSub = new PubSubNumSubProcessor(); PubSubNumSub = new PubSubNumSubProcessor();
...@@ -84,6 +87,27 @@ internal abstract class ResultProcessor ...@@ -84,6 +87,27 @@ internal abstract class ResultProcessor
public static readonly SortedSetEntryArrayProcessor public static readonly SortedSetEntryArrayProcessor
SortedSetWithScores = new SortedSetEntryArrayProcessor(); SortedSetWithScores = new SortedSetEntryArrayProcessor();
public static readonly SingleStreamProcessor
SingleStream = new SingleStreamProcessor();
public static readonly SingleStreamProcessor
SingleStreamWithNameSkip = new SingleStreamProcessor(skipStreamName: true);
public static readonly StreamConsumerInfoProcessor
StreamConsumerInfo = new StreamConsumerInfoProcessor();
public static readonly StreamGroupInfoProcessor
StreamGroupInfo = new StreamGroupInfoProcessor();
public static readonly StreamInfoProcessor
StreamInfo = new StreamInfoProcessor();
public static readonly StreamPendingInfoProcessor
StreamPendingInfo = new StreamPendingInfoProcessor();
public static readonly StreamPendingMessagesProcessor
StreamPendingMessages = new StreamPendingMessagesProcessor();
public static ResultProcessor<GeoRadiusResult[]> GeoRadiusArray(GeoRadiusOptions options) => GeoRadiusResultArrayProcessor.Get(options); public static ResultProcessor<GeoRadiusResult[]> GeoRadiusArray(GeoRadiusOptions options) => GeoRadiusResultArrayProcessor.Get(options);
public static readonly ResultProcessor<string> public static readonly ResultProcessor<string>
...@@ -1301,6 +1325,424 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes ...@@ -1301,6 +1325,424 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes
} }
} }
internal sealed class SingleStreamProcessor : StreamProcessorBase<RedisStreamEntry[]>
{
private bool skipStreamName;
public SingleStreamProcessor(bool skipStreamName = false)
{
this.skipStreamName = skipStreamName;
}
protected override bool SetResultCore(PhysicalConnection connection, Message message, RawResult result)
{
if (result.IsNull)
{
// Server returns 'nil' if no entries are returned for the given stream.
SetResult(message, new RedisStreamEntry[0]);
return true;
}
if (result.Type != ResultType.MultiBulk)
{
return false;
}
RedisStreamEntry[] entries = null;
if (skipStreamName)
{
// Skip the first element in the array (i.e., the stream name).
// See https://redis.io/commands/xread.
// > XREAD COUNT 2 STREAMS mystream 0
// 1) 1) "mystream" <== Skip the stream name
// 2) 1) 1) 1519073278252 - 0 <== Index 1 contains the array of stream entries
// 2) 1) "foo"
// 2) "value_1"
// 2) 1) 1519073279157 - 0
// 2) 1) "foo"
// 2) "value_2"
// Retrieve the initial array. For XREAD of a single stream it will
// be an array of only 1 element in the response.
var readResult = result.GetItems();
// Within that single element, GetItems will return an array of
// 2 elements: the stream name and the stream entries.
// Skip the stream name (index 0) and only process the stream entries (index 1).
entries = ParseRedisStreamEntries(readResult[0].GetItems()[1]);
}
else
{
entries = ParseRedisStreamEntries(result);
}
SetResult(message, entries);
return true;
}
}
internal sealed class MultiStreamProcessor : StreamProcessorBase<RedisStream[]>
{
/*
The result is similar to the XRANGE result (see SingleStreamProcessor)
with the addition of the stream name as the first element of top level
Multibulk array.
See https://redis.io/commands/xread.
> XREAD COUNT 2 STREAMS mystream writers 0-0 0-0
1) 1) "mystream"
2) 1) 1) 1526984818136-0
2) 1) "duration"
2) "1532"
3) "event-id"
4) "5"
2) 1) 1526999352406-0
2) 1) "duration"
2) "812"
3) "event-id"
4) "9"
2) 1) "writers"
2) 1) 1) 1526985676425-0
2) 1) "name"
2) "Virginia"
3) "surname"
4) "Woolf"
2) 1) 1526985685298-0
2) 1) "name"
2) "Jane"
3) "surname"
4) "Austen"
*/
protected override bool SetResultCore(PhysicalConnection connection, Message message, RawResult result)
{
if (result.IsNull)
{
// Nothing returned for any of the requested streams. The server returns 'nil'.
SetResult(message, new RedisStream[0]);
return true;
}
if (result.Type != ResultType.MultiBulk)
{
return false;
}
var arr = result.GetItems();
var streams = Array.ConvertAll(arr, item =>
{
var details = item.GetItems();
// details[0] = Name of the Stream
// details[1] = Multibulk Array of Stream Entries
return new RedisStream(key: details[0].AsRedisKey(),
entries: ParseRedisStreamEntries(details[1]));
});
SetResult(message, streams);
return true;
}
}
internal sealed class StreamConsumerInfoProcessor : InterleavedStreamInfoProcessorBase<StreamConsumerInfo>
{
protected override StreamConsumerInfo ParseItem(RawResult result)
{
// Note: the base class passes a single consumer from the response into this method.
// Response format:
// > XINFO CONSUMERS mystream mygroup
// 1) 1) name
// 2) "Alice"
// 3) pending
// 4) (integer)1
// 5) idle
// 6) (integer)9104628
// 2) 1) name
// 2) "Bob"
// 3) pending
// 4) (integer)1
// 5) idle
// 6) (integer)83841983
var arr = result.GetItems();
return new StreamConsumerInfo(name: arr[1].AsRedisValue(),
pendingMessageCount: (int)arr[3].AsRedisValue(),
idleTimeInMilliseconds: (long)arr[5].AsRedisValue());
}
}
internal sealed class StreamGroupInfoProcessor : InterleavedStreamInfoProcessorBase<StreamGroupInfo>
{
protected override StreamGroupInfo ParseItem(RawResult result)
{
// Note: the base class passes a single item from the response into this method.
// Response format:
// > XINFO GROUPS mystream
// 1) 1) name
// 2) "mygroup"
// 3) consumers
// 4) (integer)2
// 5) pending
// 6) (integer)2
// 2) 1) name
// 2) "some-other-group"
// 3) consumers
// 4) (integer)1
// 5) pending
// 6) (integer)0
var arr = result.GetItems();
return new StreamGroupInfo(name: arr[1].AsRedisValue(),
consumerCount: (int)arr[3].AsRedisValue(),
pendingMessageCount: (int)arr[5].AsRedisValue());
}
}
internal abstract class InterleavedStreamInfoProcessorBase<T> : ResultProcessor<T[]>
{
protected abstract T ParseItem(RawResult result);
protected override bool SetResultCore(PhysicalConnection connection, Message message, RawResult result)
{
if (result.Type != ResultType.MultiBulk)
{
return false;
}
var arr = result.GetItems();
var parsedItems = Array.ConvertAll(arr, item => ParseItem(item));
SetResult(message, parsedItems);
return true;
}
}
internal sealed class StreamInfoProcessor : StreamProcessorBase<StreamInfo>
{
// Parse the following format:
// > XINFO mystream
// 1) length
// 2) (integer) 13
// 3) radix-tree-keys
// 4) (integer) 1
// 5) radix-tree-nodes
// 6) (integer) 2
// 7) groups
// 8) (integer) 2
// 9) first-entry
// 10) 1) 1524494395530-0
// 2) 1) "a"
// 2) "1"
// 3) "b"
// 4) "2"
// 11) last-entry
// 12) 1) 1526569544280-0
// 2) 1) "message"
// 2) "banana"
protected override bool SetResultCore(PhysicalConnection connection, Message message, RawResult result)
{
if (result.Type != ResultType.MultiBulk)
{
return false;
}
var arr = result.GetItems();
if (arr.Length != 12)
{
return false;
}
// Note: Even if there is only 1 message in the stream, this command returns
// the single entry as the first-entry and last-entry in the response.
// The first 8 items are interleaved name/value pairs.
// Items 9-12 represent the first and last entry in the stream. The values will
// be nil (stored in index 9 & 11) if the stream length is 0.
var entries = ParseRedisStreamEntries(RawResult.CreateMultiBulk(arr[9], arr[11]));
var streamInfo = new StreamInfo(length: (int)arr[1].AsRedisValue(),
radixTreeKeys: (int)arr[3].AsRedisValue(),
radixTreeNodes: (int)arr[5].AsRedisValue(),
groups: (int)arr[7].AsRedisValue(),
firstEntry: entries[0],
lastEntry: entries[1]);
SetResult(message, streamInfo);
return true;
}
}
internal sealed class StreamPendingInfoProcessor : ResultProcessor<StreamPendingInfo>
{
protected override bool SetResultCore(PhysicalConnection connection, Message message, RawResult result)
{
// Example:
// > XPENDING mystream mygroup
// 1) (integer)2
// 2) 1526569498055 - 0
// 3) 1526569506935 - 0
// 4) 1) 1) "Bob"
// 2) "2"
// 5) 1) 1) "Joe"
// 2) "8"
if (result.Type != ResultType.MultiBulk)
{
return false;
}
var arr = result.GetItems();
if (arr.Length != 4)
{
return false;
}
StreamConsumer[] consumers = null;
// If there are no consumers as of yet for the given group, the last
// item in the response array will be null.
if (!arr[3].IsNull)
{
consumers = Array.ConvertAll(arr[3].GetItems(), item =>
{
var details = item.GetItems();
return new StreamConsumer(
name: details[0].AsRedisValue(),
pendingMessageCount: (int)details[1].AsRedisValue());
});
}
var pendingInfo = new StreamPendingInfo(pendingMessageCount: (int)arr[0].AsRedisValue(),
lowestId: arr[1].AsRedisValue(),
highestId: arr[2].AsRedisValue(),
consumers: consumers ?? new StreamConsumer[0]);
// ^^^^^
// Should we bother allocating an empty array only to prevent the need for a null check?
SetResult(message, pendingInfo);
return true;
}
}
internal sealed class StreamPendingMessagesProcessor : ResultProcessor<StreamPendingMessageInfo[]>
{
protected override bool SetResultCore(PhysicalConnection connection, Message message, RawResult result)
{
if (result.Type != ResultType.MultiBulk)
{
return false;
}
var arr = result.GetItems();
var messageInfoArray = Array.ConvertAll(arr, item =>
{
var details = item.GetItems();
return new StreamPendingMessageInfo(messageId: details[0].AsRedisValue(),
consumerName: details[1].AsRedisValue(),
idleTimeInMs: (long)details[2].AsRedisValue(),
deliveryCount: (int)details[3].AsRedisValue());
});
SetResult(message, messageInfoArray);
return true;
}
}
internal abstract class StreamProcessorBase<T> : ResultProcessor<T>
{
// For command response formats see https://redis.io/topics/streams-intro.
protected RedisStreamEntry[] ParseRedisStreamEntries(RawResult result)
{
if (result.Type != ResultType.MultiBulk)
{
return null;
}
var arr = result.GetItems();
return Array.ConvertAll(arr, item =>
{
if (item.IsNull || item.Type != ResultType.MultiBulk)
{
return RedisStreamEntry.Null;
}
// Process the Multibulk array for each entry. The entry contains the following elements:
// [0] = SimpleString (the ID of the stream entry)
// [1] = Multibulk array of the name/value pairs of the stream entry's data
var entryDetails = item.GetItems();
return new RedisStreamEntry(id: entryDetails[0].AsRedisValue(),
values: ParseStreamEntryValues(entryDetails[1]));
});
}
protected NameValueEntry[] ParseStreamEntryValues(RawResult result)
{
// The XRANGE, XREVRANGE, XREAD commands return stream entries
// in the following format. The name/value pairs are interleaved
// in the same fashion as the HGETALL response.
//
// 1) 1) 1518951480106-0
// 2) 1) "sensor-id"
// 2) "1234"
// 3) "temperature"
// 4) "19.8"
// 2) 1) 1518951482479-0
// 2) 1) "sensor-id"
// 2) "9999"
// 3) "temperature"
// 4) "18.2"
if (result.Type != ResultType.MultiBulk)
{
return null;
}
var arr = result.GetItems();
if (arr == null)
{
return null;
}
// Calculate how many name/value pairs are in the stream entry.
int count = arr.Length / 2;
if (count == 0)
{
return new NameValueEntry[0];
}
var pairs = new NameValueEntry[count];
int offset = 0;
for (int i = 0; i < pairs.Length; i++)
{
pairs[i] = new NameValueEntry(arr[offset++].AsRedisValue(),
arr[offset++].AsRedisValue());
}
return pairs;
}
}
private sealed class StringPairInterleavedProcessor : ValuePairInterleavedProcessorBase<KeyValuePair<string, string>> private sealed class StringPairInterleavedProcessor : ValuePairInterleavedProcessorBase<KeyValuePair<string, string>>
{ {
protected override KeyValuePair<string, string> Parse(RawResult first, RawResult second) protected override KeyValuePair<string, string> Parse(RawResult first, RawResult second)
......

namespace StackExchange.Redis
{
/// <summary>
/// Constants representing values used in Redis Stream commands.
/// </summary>
internal static class StreamConstants
{
/// <summary>
/// The "~" value used with the MAXLEN option.
/// </summary>
internal static readonly RedisValue ApproximateMaxLen = "~";
/// <summary>
/// The "*" value used with the XADD command.
/// </summary>
internal static readonly RedisValue AutoGeneratedId = "*";
/// <summary>
/// The "$" value used in the XGROUP command. Indicates reading only new messages from the stream.
/// </summary>
internal static readonly RedisValue NewMessages = "$";
/// <summary>
/// The "-" value used in the XRANGE, XREAD, and XREADGROUP commands. Indicates the minimum message ID from the stream.
/// </summary>
internal static readonly RedisValue ReadMinValue = "-";
/// <summary>
/// The "+" value used in the XRANGE, XREAD, and XREADGROUP commands. Indicates the maximum message ID from the stream.
/// </summary>
internal static readonly RedisValue ReadMaxValue = "+";
/// <summary>
/// The ">" value used in the XREADGROUP command. Use this to read messages that have not been delivered to a consumer group.
/// </summary>
internal static readonly RedisValue UndeliveredMessages = ">";
internal static readonly RedisValue Consumers = "CONSUMERS";
internal static readonly RedisValue Count = "COUNT";
internal static readonly RedisValue Create = "CREATE";
internal static readonly RedisValue Group = "GROUP";
internal static readonly RedisValue Groups = "GROUPS";
internal static readonly RedisValue JustId = "JUSTID";
internal static readonly RedisValue MaxLen = "MAXLEN";
internal static readonly RedisValue Stream = "STREAM";
internal static readonly RedisValue Streams = "STREAMS";
}
}

namespace StackExchange.Redis
{
/// <summary>
/// Describes a consumer off a Redis Stream.
/// </summary>
public struct StreamConsumer
{
internal StreamConsumer(RedisValue name, int pendingMessageCount)
{
Name = name;
PendingMessageCount = pendingMessageCount;
}
/// <summary>
/// The name of the consumer.
/// </summary>
public RedisValue Name { get; }
/// <summary>
/// The number of messages that have been delivered by not yet acknowledged by the consumer.
/// </summary>
public int PendingMessageCount { get; }
}
}

namespace StackExchange.Redis
{
/// <summary>
/// Describes a consumer within a consumer group, retrieved using the XINFO CONSUMERS command. <see cref="IDatabase.StreamConsumerInfo"/>
/// </summary>
public struct StreamConsumerInfo
{
internal StreamConsumerInfo(string name, int pendingMessageCount, long idleTimeInMilliseconds)
{
Name = name;
PendingMessageCount = pendingMessageCount;
IdleTimeInMilliseconds = idleTimeInMilliseconds;
}
/// <summary>
/// The name of the consumer.
/// </summary>
public string Name { get; }
/// <summary>
/// The number of pending messages for the consumer. A pending message is one that has been
/// received by the consumer but not yet acknowledged.
/// </summary>
public int PendingMessageCount { get; }
/// <summary>
/// The idle time, if any, for the consumer.
/// </summary>
public long IdleTimeInMilliseconds { get; }
}
}

namespace StackExchange.Redis
{
/// <summary>
/// Describes a consumer group retrieved using the XINFO GROUPS command. <see cref="IDatabase.StreamGroupInfo"/>
/// </summary>
public struct StreamGroupInfo
{
internal StreamGroupInfo(string name, int consumerCount, int pendingMessageCount)
{
Name = name;
ConsumerCount = consumerCount;
PendingMessageCount = pendingMessageCount;
}
/// <summary>
/// The name of the consumer group.
/// </summary>
public string Name { get; }
/// <summary>
/// The number of consumers within the consumer group.
/// </summary>
public int ConsumerCount { get; }
/// <summary>
/// The total number of pending messages for the consumer group. A pending message is one that has been
/// received by a consumer but not yet acknowledged.
/// </summary>
public int PendingMessageCount { get; }
}
}

namespace StackExchange.Redis
{
/// <summary>
/// Describes a pair consisting of the Stream Key and the ID from which to read.
/// </summary>
/// <remarks><see cref="IDatabase.StreamRead(StreamIdPair[], int?, CommandFlags)"/></remarks>
public struct StreamIdPair
{
/// <summary>
/// Initializes a <see cref="StreamIdPair"/> value.
/// </summary>
/// <param name="key">The key for the stream.</param>
/// <param name="id">The ID from which to begin reading the stream.</param>
public StreamIdPair(RedisKey key, RedisValue id)
{
Key = key;
Id = id;
}
/// <summary>
/// The key for the stream.
/// </summary>
public RedisKey Key { get; }
/// <summary>
/// The ID from which to begin reading the stream.
/// </summary>
public RedisValue Id { get; }
/// <summary>
/// See Object.ToString()
/// </summary>
public override string ToString() => $"{Key}: {Id}";
}
}

namespace StackExchange.Redis
{
/// <summary>
/// Describes stream information retrieved using the XINFO STREAM command. <see cref="IDatabase.StreamInfo"/>
/// </summary>
public struct StreamInfo
{
internal StreamInfo(int length,
int radixTreeKeys,
int radixTreeNodes,
int groups,
RedisStreamEntry firstEntry,
RedisStreamEntry lastEntry)
{
Length = length;
RadixTreeKeys = radixTreeKeys;
RadixTreeNodes = radixTreeNodes;
ConsumerGroupCount = groups;
FirstEntry = firstEntry;
LastEntry = lastEntry;
}
/// <summary>
/// The number of entries in the stream.
/// </summary>
public int Length { get; }
/// <summary>
/// The number of radix tree keys in the stream.
/// </summary>
public int RadixTreeKeys { get; }
/// <summary>
/// The number of radix tree nodes in the stream.
/// </summary>
public int RadixTreeNodes { get; }
/// <summary>
/// The number of consumers groups in the stream.
/// </summary>
public int ConsumerGroupCount { get; }
/// <summary>
/// The first entry in the stream.
/// </summary>
public RedisStreamEntry FirstEntry { get; }
/// <summary>
/// The last entry in the stream.
/// </summary>
public RedisStreamEntry LastEntry { get; }
}
}

namespace StackExchange.Redis
{
/// <summary>
/// Describes basic information about pending messages for a consumer group.
/// </summary>
public struct StreamPendingInfo
{
internal StreamPendingInfo(int pendingMessageCount,
RedisValue lowestId,
RedisValue highestId,
StreamConsumer[] consumers)
{
PendingMessageCount = pendingMessageCount;
LowestPendingMessageId = lowestId;
HighestPendingMessageId = highestId;
Consumers = consumers;
}
/// <summary>
/// The number of pending messages. A pending message is a message that has been consumed but not yet acknowledged.
/// </summary>
public int PendingMessageCount { get; }
/// <summary>
/// The lowest message ID in the set of pending messages.
/// </summary>
public RedisValue LowestPendingMessageId { get; }
/// <summary>
/// The highest message ID in the set of pending messages.
/// </summary>
public RedisValue HighestPendingMessageId { get; }
/// <summary>
/// An array of consumers within the consumer group that have pending messages.
/// </summary>
public StreamConsumer[] Consumers { get; }
}
}

namespace StackExchange.Redis
{
/// <summary>
/// Describes properties of a pending message. A pending message is one that has
/// been received by a consumer but has not yet been acknowledged.
/// </summary>
public struct StreamPendingMessageInfo
{
internal StreamPendingMessageInfo(RedisValue messageId,
RedisValue consumerName,
long idleTimeInMs,
int deliveryCount)
{
MessageId = messageId;
ConsumerName = consumerName;
IdleTimeInMilliseconds = idleTimeInMs;
DeliveryCount = deliveryCount;
}
/// <summary>
/// The ID of the pending message.
/// </summary>
public RedisValue MessageId { get; }
/// <summary>
/// The consumer that received the pending message.
/// </summary>
public RedisValue ConsumerName { get; }
/// <summary>
/// The time that has passed since the message was last delivered to a consumer.
/// </summary>
public long IdleTimeInMilliseconds { get; }
/// <summary>
/// The number of times the message has been delivered to a consumer.
/// </summary>
public int DeliveryCount { get; }
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment