Unverified Commit c7e76376 authored by Todd Tingen's avatar Todd Tingen Committed by GitHub

Added support for NOACK in the StreamReadGroup methods. (#1154)

* Added support for NOACK in the StreamReadGroup methods.

* Added overload for noAck parameter on StreamReadGroup methods.
parent faf931d3
...@@ -1719,7 +1719,35 @@ public interface IDatabase : IRedis, IDatabaseAsync ...@@ -1719,7 +1719,35 @@ public interface IDatabase : IRedis, IDatabaseAsync
/// <param name="flags">The flags to use for this operation.</param> /// <param name="flags">The flags to use for this operation.</param>
/// <returns>Returns a value of <see cref="StreamEntry"/> for each message returned.</returns> /// <returns>Returns a value of <see cref="StreamEntry"/> for each message returned.</returns>
/// <remarks>https://redis.io/commands/xreadgroup</remarks> /// <remarks>https://redis.io/commands/xreadgroup</remarks>
StreamEntry[] StreamReadGroup(RedisKey key, RedisValue groupName, RedisValue consumerName, RedisValue? position = null, int? count = null, CommandFlags flags = CommandFlags.None); StreamEntry[] StreamReadGroup(RedisKey key, RedisValue groupName, RedisValue consumerName, RedisValue? position, int? count, CommandFlags flags);
/// <summary>
/// Read messages from a stream into an associated consumer group.
/// </summary>
/// <param name="key">The key of the stream.</param>
/// <param name="groupName">The name of the consumer group.</param>
/// <param name="consumerName">The consumer name.</param>
/// <param name="position">The position from which to read the stream. Defaults to <see cref="StreamPosition.NewMessages"/> when null.</param>
/// <param name="count">The maximum number of messages to return.</param>
/// <param name="noAck">When true, the message will not be added to the pending message list.</param>
/// <param name="flags">The flags to use for this operation.</param>
/// <returns>Returns a value of <see cref="StreamEntry"/> for each message returned.</returns>
/// <remarks>https://redis.io/commands/xreadgroup</remarks>
StreamEntry[] StreamReadGroup(RedisKey key, RedisValue groupName, RedisValue consumerName, RedisValue? position = null, int? count = null, bool noAck = false, CommandFlags flags = CommandFlags.None);
/// <summary>
/// Read from multiple streams into the given consumer group. The consumer group with the given <paramref name="groupName"/>
/// will need to have been created for each stream prior to calling this method.
/// </summary>
/// <param name="streamPositions">Array of streams and the positions from which to begin reading for each stream.</param>
/// <param name="groupName">The name of the consumer group.</param>
/// <param name="consumerName"></param>
/// <param name="countPerStream">The maximum number of messages to return from each stream.</param>
/// <param name="flags">The flags to use for this operation.</param>
/// <returns>A value of <see cref="RedisStream"/> for each stream.</returns>
/// <remarks>Equivalent of calling XREADGROUP GROUP groupName consumerName COUNT countPerStream STREAMS stream1 stream2 id1 id2</remarks>
/// <remarks>https://redis.io/commands/xreadgroup</remarks>
RedisStream[] StreamReadGroup(StreamPosition[] streamPositions, RedisValue groupName, RedisValue consumerName, int? countPerStream, CommandFlags flags);
/// <summary> /// <summary>
/// Read from multiple streams into the given consumer group. The consumer group with the given <paramref name="groupName"/> /// Read from multiple streams into the given consumer group. The consumer group with the given <paramref name="groupName"/>
...@@ -1729,11 +1757,12 @@ public interface IDatabase : IRedis, IDatabaseAsync ...@@ -1729,11 +1757,12 @@ public interface IDatabase : IRedis, IDatabaseAsync
/// <param name="groupName">The name of the consumer group.</param> /// <param name="groupName">The name of the consumer group.</param>
/// <param name="consumerName"></param> /// <param name="consumerName"></param>
/// <param name="countPerStream">The maximum number of messages to return from each stream.</param> /// <param name="countPerStream">The maximum number of messages to return from each stream.</param>
/// <param name="noAck">When true, the message will not be added to the pending message list.</param>
/// <param name="flags">The flags to use for this operation.</param> /// <param name="flags">The flags to use for this operation.</param>
/// <returns>A value of <see cref="RedisStream"/> for each stream.</returns> /// <returns>A value of <see cref="RedisStream"/> for each stream.</returns>
/// <remarks>Equivalent of calling XREADGROUP GROUP groupName consumerName COUNT countPerStream STREAMS stream1 stream2 id1 id2</remarks> /// <remarks>Equivalent of calling XREADGROUP GROUP groupName consumerName COUNT countPerStream STREAMS stream1 stream2 id1 id2</remarks>
/// <remarks>https://redis.io/commands/xreadgroup</remarks> /// <remarks>https://redis.io/commands/xreadgroup</remarks>
RedisStream[] StreamReadGroup(StreamPosition[] streamPositions, RedisValue groupName, RedisValue consumerName, int? countPerStream = null, CommandFlags flags = CommandFlags.None); RedisStream[] StreamReadGroup(StreamPosition[] streamPositions, RedisValue groupName, RedisValue consumerName, int? countPerStream = null, bool noAck = false, CommandFlags flags = CommandFlags.None);
/// <summary> /// <summary>
/// Trim the stream to a specified maximum length. /// Trim the stream to a specified maximum length.
......
...@@ -1630,7 +1630,35 @@ public interface IDatabaseAsync : IRedisAsync ...@@ -1630,7 +1630,35 @@ public interface IDatabaseAsync : IRedisAsync
/// <param name="flags">The flags to use for this operation.</param> /// <param name="flags">The flags to use for this operation.</param>
/// <returns>Returns a value of <see cref="StreamEntry"/> for each message returned.</returns> /// <returns>Returns a value of <see cref="StreamEntry"/> for each message returned.</returns>
/// <remarks>https://redis.io/commands/xreadgroup</remarks> /// <remarks>https://redis.io/commands/xreadgroup</remarks>
Task<StreamEntry[]> StreamReadGroupAsync(RedisKey key, RedisValue groupName, RedisValue consumerName, RedisValue? position = null, int? count = null, CommandFlags flags = CommandFlags.None); Task<StreamEntry[]> StreamReadGroupAsync(RedisKey key, RedisValue groupName, RedisValue consumerName, RedisValue? position, int? count, CommandFlags flags);
/// <summary>
/// Read messages from a stream into an associated consumer group.
/// </summary>
/// <param name="key">The key of the stream.</param>
/// <param name="groupName">The name of the consumer group.</param>
/// <param name="consumerName">The consumer name.</param>
/// <param name="position">The position from which to read the stream. Defaults to <see cref="StreamPosition.NewMessages"/> when null.</param>
/// <param name="count">The maximum number of messages to return.</param>
/// <param name="noAck">When true, the message will not be added to the pending message list.</param>
/// <param name="flags">The flags to use for this operation.</param>
/// <returns>Returns a value of <see cref="StreamEntry"/> for each message returned.</returns>
/// <remarks>https://redis.io/commands/xreadgroup</remarks>
Task<StreamEntry[]> StreamReadGroupAsync(RedisKey key, RedisValue groupName, RedisValue consumerName, RedisValue? position = null, int? count = null, bool noAck = false, CommandFlags flags = CommandFlags.None);
/// <summary>
/// Read from multiple streams into the given consumer group. The consumer group with the given <paramref name="groupName"/>
/// will need to have been created for each stream prior to calling this method.
/// </summary>
/// <param name="streamPositions">Array of streams and the positions from which to begin reading for each stream.</param>
/// <param name="groupName">The name of the consumer group.</param>
/// <param name="consumerName"></param>
/// <param name="countPerStream">The maximum number of messages to return from each stream.</param>
/// <param name="flags">The flags to use for this operation.</param>
/// <returns>A value of <see cref="RedisStream"/> for each stream.</returns>
/// <remarks>Equivalent of calling XREADGROUP GROUP groupName consumerName COUNT countPerStream STREAMS stream1 stream2 id1 id2</remarks>
/// <remarks>https://redis.io/commands/xreadgroup</remarks>
Task<RedisStream[]> StreamReadGroupAsync(StreamPosition[] streamPositions, RedisValue groupName, RedisValue consumerName, int? countPerStream, CommandFlags flags);
/// <summary> /// <summary>
/// Read from multiple streams into the given consumer group. The consumer group with the given <paramref name="groupName"/> /// Read from multiple streams into the given consumer group. The consumer group with the given <paramref name="groupName"/>
...@@ -1640,11 +1668,12 @@ public interface IDatabaseAsync : IRedisAsync ...@@ -1640,11 +1668,12 @@ public interface IDatabaseAsync : IRedisAsync
/// <param name="groupName">The name of the consumer group.</param> /// <param name="groupName">The name of the consumer group.</param>
/// <param name="consumerName"></param> /// <param name="consumerName"></param>
/// <param name="countPerStream">The maximum number of messages to return from each stream.</param> /// <param name="countPerStream">The maximum number of messages to return from each stream.</param>
/// <param name="noAck">When true, the message will not be added to the pending message list.</param>
/// <param name="flags">The flags to use for this operation.</param> /// <param name="flags">The flags to use for this operation.</param>
/// <returns>A value of <see cref="RedisStream"/> for each stream.</returns> /// <returns>A value of <see cref="RedisStream"/> for each stream.</returns>
/// <remarks>Equivalent of calling XREADGROUP GROUP groupName consumerName COUNT countPerStream STREAMS stream1 stream2 id1 id2</remarks> /// <remarks>Equivalent of calling XREADGROUP GROUP groupName consumerName COUNT countPerStream STREAMS stream1 stream2 id1 id2</remarks>
/// <remarks>https://redis.io/commands/xreadgroup</remarks> /// <remarks>https://redis.io/commands/xreadgroup</remarks>
Task<RedisStream[]> StreamReadGroupAsync(StreamPosition[] streamPositions, RedisValue groupName, RedisValue consumerName, int? countPerStream = null, CommandFlags flags = CommandFlags.None); Task<RedisStream[]> StreamReadGroupAsync(StreamPosition[] streamPositions, RedisValue groupName, RedisValue consumerName, int? countPerStream = null, bool noAck = false, CommandFlags flags = CommandFlags.None);
/// <summary> /// <summary>
/// Trim the stream to a specified maximum length. /// Trim the stream to a specified maximum length.
......
...@@ -726,16 +726,26 @@ public RedisStream[] StreamRead(StreamPosition[] streamPositions, int? countPerS ...@@ -726,16 +726,26 @@ public RedisStream[] StreamRead(StreamPosition[] streamPositions, int? countPerS
return Inner.StreamRead(streamPositions, countPerStream, flags); return Inner.StreamRead(streamPositions, countPerStream, flags);
} }
public StreamEntry[] StreamReadGroup(RedisKey key, RedisValue groupName, RedisValue consumerName, RedisValue? position = null, int? count = null, CommandFlags flags = CommandFlags.None) public StreamEntry[] StreamReadGroup(RedisKey key, RedisValue groupName, RedisValue consumerName, RedisValue? position, int? count, CommandFlags flags)
{ {
return Inner.StreamReadGroup(ToInner(key), groupName, consumerName, position, count, flags); return Inner.StreamReadGroup(ToInner(key), groupName, consumerName, position, count, flags);
} }
public RedisStream[] StreamReadGroup(StreamPosition[] streamPositions, RedisValue groupName, RedisValue consumerName, int? countPerStream = null, CommandFlags flags = CommandFlags.None) public StreamEntry[] StreamReadGroup(RedisKey key, RedisValue groupName, RedisValue consumerName, RedisValue? position = null, int? count = null, bool noAck = false, CommandFlags flags = CommandFlags.None)
{
return Inner.StreamReadGroup(ToInner(key), groupName, consumerName, position, count, noAck, flags);
}
public RedisStream[] StreamReadGroup(StreamPosition[] streamPositions, RedisValue groupName, RedisValue consumerName, int? countPerStream, CommandFlags flags)
{ {
return Inner.StreamReadGroup(streamPositions, groupName, consumerName, countPerStream, flags); return Inner.StreamReadGroup(streamPositions, groupName, consumerName, countPerStream, flags);
} }
public RedisStream[] StreamReadGroup(StreamPosition[] streamPositions, RedisValue groupName, RedisValue consumerName, int? countPerStream = null, bool noAck = false, CommandFlags flags = CommandFlags.None)
{
return Inner.StreamReadGroup(streamPositions, groupName, consumerName, countPerStream, noAck, flags);
}
public long StreamTrim(RedisKey key, int maxLength, bool useApproximateMaxLength = false, CommandFlags flags = CommandFlags.None) public long StreamTrim(RedisKey key, int maxLength, bool useApproximateMaxLength = false, CommandFlags flags = CommandFlags.None)
{ {
return Inner.StreamTrim(ToInner(key), maxLength, useApproximateMaxLength, flags); return Inner.StreamTrim(ToInner(key), maxLength, useApproximateMaxLength, flags);
......
...@@ -706,16 +706,26 @@ public Task<RedisStream[]> StreamReadAsync(StreamPosition[] streamPositions, int ...@@ -706,16 +706,26 @@ public Task<RedisStream[]> StreamReadAsync(StreamPosition[] streamPositions, int
return Inner.StreamReadAsync(streamPositions, countPerStream, flags); return Inner.StreamReadAsync(streamPositions, countPerStream, flags);
} }
public Task<StreamEntry[]> StreamReadGroupAsync(RedisKey key, RedisValue groupName, RedisValue consumerName, RedisValue? position = null, int? count = null, CommandFlags flags = CommandFlags.None) public Task<StreamEntry[]> StreamReadGroupAsync(RedisKey key, RedisValue groupName, RedisValue consumerName, RedisValue? position, int? count, CommandFlags flags)
{ {
return Inner.StreamReadGroupAsync(ToInner(key), groupName, consumerName, position, count, flags); return Inner.StreamReadGroupAsync(ToInner(key), groupName, consumerName, position, count, flags);
} }
public Task<RedisStream[]> StreamReadGroupAsync(StreamPosition[] streamPositions, RedisValue groupName, RedisValue consumerName, int? countPerStream = null, CommandFlags flags = CommandFlags.None) public Task<StreamEntry[]> StreamReadGroupAsync(RedisKey key, RedisValue groupName, RedisValue consumerName, RedisValue? position = null, int? count = null, bool noAck = false, CommandFlags flags = CommandFlags.None)
{
return Inner.StreamReadGroupAsync(ToInner(key), groupName, consumerName, position, count, noAck, flags);
}
public Task<RedisStream[]> StreamReadGroupAsync(StreamPosition[] streamPositions, RedisValue groupName, RedisValue consumerName, int? countPerStream, CommandFlags flags)
{ {
return Inner.StreamReadGroupAsync(streamPositions, groupName, consumerName, countPerStream, flags); return Inner.StreamReadGroupAsync(streamPositions, groupName, consumerName, countPerStream, flags);
} }
public Task<RedisStream[]> StreamReadGroupAsync(StreamPosition[] streamPositions, RedisValue groupName, RedisValue consumerName, int? countPerStream = null, bool noAck = false, CommandFlags flags = CommandFlags.None)
{
return Inner.StreamReadGroupAsync(streamPositions, groupName, consumerName, countPerStream, noAck, flags);
}
public Task<long> StreamTrimAsync(RedisKey key, int maxLength, bool useApproximateMaxLength = false, CommandFlags flags = CommandFlags.None) public Task<long> StreamTrimAsync(RedisKey key, int maxLength, bool useApproximateMaxLength = false, CommandFlags flags = CommandFlags.None)
{ {
return Inner.StreamTrimAsync(ToInner(key), maxLength, useApproximateMaxLength, flags); return Inner.StreamTrimAsync(ToInner(key), maxLength, useApproximateMaxLength, flags);
......
...@@ -2158,7 +2158,18 @@ public Task<RedisStream[]> StreamReadAsync(StreamPosition[] streamPositions, int ...@@ -2158,7 +2158,18 @@ public Task<RedisStream[]> StreamReadAsync(StreamPosition[] streamPositions, int
return ExecuteAsync(msg, ResultProcessor.MultiStream); return ExecuteAsync(msg, ResultProcessor.MultiStream);
} }
public StreamEntry[] StreamReadGroup(RedisKey key, RedisValue groupName, RedisValue consumerName, RedisValue? position = null, int? count = null, CommandFlags flags = CommandFlags.None) public StreamEntry[] StreamReadGroup(RedisKey key, RedisValue groupName, RedisValue consumerName, RedisValue? position, int? count, CommandFlags flags)
{
return StreamReadGroup(key,
groupName,
consumerName,
position,
count,
false,
flags);
}
public StreamEntry[] StreamReadGroup(RedisKey key, RedisValue groupName, RedisValue consumerName, RedisValue? position = null, int? count = null, bool noAck = false, CommandFlags flags = CommandFlags.None)
{ {
var actualPosition = position ?? StreamPosition.NewMessages; var actualPosition = position ?? StreamPosition.NewMessages;
...@@ -2167,12 +2178,24 @@ public StreamEntry[] StreamReadGroup(RedisKey key, RedisValue groupName, RedisVa ...@@ -2167,12 +2178,24 @@ public StreamEntry[] StreamReadGroup(RedisKey key, RedisValue groupName, RedisVa
consumerName, consumerName,
StreamPosition.Resolve(actualPosition, RedisCommand.XREADGROUP), StreamPosition.Resolve(actualPosition, RedisCommand.XREADGROUP),
count, count,
noAck,
flags); flags);
return ExecuteSync(msg, ResultProcessor.SingleStreamWithNameSkip); return ExecuteSync(msg, ResultProcessor.SingleStreamWithNameSkip);
} }
public Task<StreamEntry[]> StreamReadGroupAsync(RedisKey key, RedisValue groupName, RedisValue consumerName, RedisValue? position = null, int? count = null, CommandFlags flags = CommandFlags.None) public Task<StreamEntry[]> StreamReadGroupAsync(RedisKey key, RedisValue groupName, RedisValue consumerName, RedisValue? position, int? count, CommandFlags flags)
{
return StreamReadGroupAsync(key,
groupName,
consumerName,
position,
count,
false,
flags);
}
public Task<StreamEntry[]> StreamReadGroupAsync(RedisKey key, RedisValue groupName, RedisValue consumerName, RedisValue? position = null, int? count = null, bool noAck = false, CommandFlags flags = CommandFlags.None)
{ {
var actualPosition = position ?? StreamPosition.NewMessages; var actualPosition = position ?? StreamPosition.NewMessages;
...@@ -2181,20 +2204,53 @@ public Task<StreamEntry[]> StreamReadGroupAsync(RedisKey key, RedisValue groupNa ...@@ -2181,20 +2204,53 @@ public Task<StreamEntry[]> StreamReadGroupAsync(RedisKey key, RedisValue groupNa
consumerName, consumerName,
StreamPosition.Resolve(actualPosition, RedisCommand.XREADGROUP), StreamPosition.Resolve(actualPosition, RedisCommand.XREADGROUP),
count, count,
noAck,
flags); flags);
return ExecuteAsync(msg, ResultProcessor.SingleStreamWithNameSkip); return ExecuteAsync(msg, ResultProcessor.SingleStreamWithNameSkip);
} }
public RedisStream[] StreamReadGroup(StreamPosition[] streamPositions, RedisValue groupName, RedisValue consumerName, int? countPerStream = null, CommandFlags flags = CommandFlags.None) public RedisStream[] StreamReadGroup(StreamPosition[] streamPositions, RedisValue groupName, RedisValue consumerName, int? countPerStream, CommandFlags flags)
{
return StreamReadGroup(streamPositions,
groupName,
consumerName,
countPerStream,
false,
flags);
}
public RedisStream[] StreamReadGroup(StreamPosition[] streamPositions, RedisValue groupName, RedisValue consumerName, int? countPerStream = null, bool noAck = false, CommandFlags flags = CommandFlags.None)
{ {
var msg = GetMultiStreamReadGroupMessage(streamPositions, groupName, consumerName, countPerStream, flags); var msg = GetMultiStreamReadGroupMessage(streamPositions,
groupName,
consumerName,
countPerStream,
noAck,
flags);
return ExecuteSync(msg, ResultProcessor.MultiStream); return ExecuteSync(msg, ResultProcessor.MultiStream);
} }
public Task<RedisStream[]> StreamReadGroupAsync(StreamPosition[] streamPositions, RedisValue groupName, RedisValue consumerName, int? countPerStream = null, CommandFlags flags = CommandFlags.None) public Task<RedisStream[]> StreamReadGroupAsync(StreamPosition[] streamPositions, RedisValue groupName, RedisValue consumerName, int? countPerStream, CommandFlags flags)
{
return StreamReadGroupAsync(streamPositions,
groupName,
consumerName,
countPerStream,
false,
flags);
}
public Task<RedisStream[]> StreamReadGroupAsync(StreamPosition[] streamPositions, RedisValue groupName, RedisValue consumerName, int? countPerStream = null, bool noAck = false, CommandFlags flags = CommandFlags.None)
{ {
var msg = GetMultiStreamReadGroupMessage(streamPositions, groupName, consumerName, countPerStream, flags); var msg = GetMultiStreamReadGroupMessage(streamPositions,
groupName,
consumerName,
countPerStream,
noAck,
flags);
return ExecuteAsync(msg, ResultProcessor.MultiStream); return ExecuteAsync(msg, ResultProcessor.MultiStream);
} }
...@@ -2613,7 +2669,7 @@ private RedisValue GetLexRange(RedisValue value, Exclude exclude, bool isStart) ...@@ -2613,7 +2669,7 @@ private RedisValue GetLexRange(RedisValue value, Exclude exclude, bool isStart)
return result; return result;
} }
private Message GetMultiStreamReadGroupMessage(StreamPosition[] streamPositions, RedisValue groupName, RedisValue consumerName, int? countPerStream, CommandFlags flags) private Message GetMultiStreamReadGroupMessage(StreamPosition[] streamPositions, RedisValue groupName, RedisValue consumerName, int? countPerStream, bool noAck, CommandFlags flags)
{ {
// Example: XREADGROUP GROUP groupName consumerName COUNT countPerStream STREAMS stream1 stream2 id1 id2 // Example: XREADGROUP GROUP groupName consumerName COUNT countPerStream STREAMS stream1 stream2 id1 id2
if (streamPositions == null) throw new ArgumentNullException(nameof(streamPositions)); if (streamPositions == null) throw new ArgumentNullException(nameof(streamPositions));
...@@ -2627,7 +2683,8 @@ private Message GetMultiStreamReadGroupMessage(StreamPosition[] streamPositions, ...@@ -2627,7 +2683,8 @@ private Message GetMultiStreamReadGroupMessage(StreamPosition[] streamPositions,
var values = new RedisValue[ var values = new RedisValue[
4 // Room for GROUP groupName consumerName & STREAMS 4 // Room for GROUP groupName consumerName & STREAMS
+ (streamPositions.Length * 2) // Enough room for the stream keys and associated IDs. + (streamPositions.Length * 2) // Enough room for the stream keys and associated IDs.
+ (countPerStream.HasValue ? 2 : 0)]; // Room for "COUNT num" or 0 if countPerStream is null. + (countPerStream.HasValue ? 2 : 0) // Room for "COUNT num" or 0 if countPerStream is null.
+ (noAck ? 1 : 0)]; // Allow for the NOACK subcommand.
var offset = 0; var offset = 0;
...@@ -2641,6 +2698,11 @@ private Message GetMultiStreamReadGroupMessage(StreamPosition[] streamPositions, ...@@ -2641,6 +2698,11 @@ private Message GetMultiStreamReadGroupMessage(StreamPosition[] streamPositions,
values[offset++] = countPerStream; values[offset++] = countPerStream;
} }
if (noAck)
{
values[offset++] = StreamConstants.NoAck;
}
values[offset++] = StreamConstants.Streams; values[offset++] = StreamConstants.Streams;
var pairCount = streamPositions.Length; var pairCount = streamPositions.Length;
...@@ -3148,7 +3210,7 @@ private Message GetStreamRangeMessage(RedisKey key, RedisValue? minId, RedisValu ...@@ -3148,7 +3210,7 @@ private Message GetStreamRangeMessage(RedisKey key, RedisValue? minId, RedisValu
values); values);
} }
private Message GetStreamReadGroupMessage(RedisKey key, RedisValue groupName, RedisValue consumerName, RedisValue afterId, int? count, CommandFlags flags) private Message GetStreamReadGroupMessage(RedisKey key, RedisValue groupName, RedisValue consumerName, RedisValue afterId, int? count, bool noAck, CommandFlags flags)
{ {
// Example: > XREADGROUP GROUP mygroup Alice COUNT 1 STREAMS mystream > // Example: > XREADGROUP GROUP mygroup Alice COUNT 1 STREAMS mystream >
if (count.HasValue && count <= 0) if (count.HasValue && count <= 0)
...@@ -3156,7 +3218,7 @@ private Message GetStreamReadGroupMessage(RedisKey key, RedisValue groupName, Re ...@@ -3156,7 +3218,7 @@ private Message GetStreamReadGroupMessage(RedisKey key, RedisValue groupName, Re
throw new ArgumentOutOfRangeException(nameof(count), "count must be greater than 0."); throw new ArgumentOutOfRangeException(nameof(count), "count must be greater than 0.");
} }
var totalValueCount = 6 + (count.HasValue ? 2 : 0); var totalValueCount = 6 + (count.HasValue ? 2 : 0) + (noAck ? 1 : 0);
var values = new RedisValue[totalValueCount]; var values = new RedisValue[totalValueCount];
var offset = 0; var offset = 0;
...@@ -3171,6 +3233,11 @@ private Message GetStreamReadGroupMessage(RedisKey key, RedisValue groupName, Re ...@@ -3171,6 +3233,11 @@ private Message GetStreamReadGroupMessage(RedisKey key, RedisValue groupName, Re
values[offset++] = count.Value; values[offset++] = count.Value;
} }
if (noAck)
{
values[offset++] = StreamConstants.NoAck;
}
values[offset++] = StreamConstants.Streams; values[offset++] = StreamConstants.Streams;
values[offset++] = key.AsRedisValue(); values[offset++] = key.AsRedisValue();
values[offset] = afterId; values[offset] = afterId;
......
...@@ -61,6 +61,8 @@ internal static class StreamConstants ...@@ -61,6 +61,8 @@ internal static class StreamConstants
internal static readonly RedisValue MaxLen = "MAXLEN"; internal static readonly RedisValue MaxLen = "MAXLEN";
internal static readonly RedisValue NoAck = "NOACK";
internal static readonly RedisValue Stream = "STREAM"; internal static readonly RedisValue Stream = "STREAM";
internal static readonly RedisValue Streams = "STREAMS"; internal static readonly RedisValue Streams = "STREAMS";
......
...@@ -955,16 +955,16 @@ public void StreamRead_2() ...@@ -955,16 +955,16 @@ public void StreamRead_2()
[Fact] [Fact]
public void StreamStreamReadGroup_1() public void StreamStreamReadGroup_1()
{ {
wrapper.StreamReadGroup("key", "group", "consumer", "0-0", 10, CommandFlags.None); wrapper.StreamReadGroup("key", "group", "consumer", "0-0", 10, false, CommandFlags.None);
mock.Verify(_ => _.StreamReadGroup("prefix:key", "group", "consumer", "0-0", 10, CommandFlags.None)); mock.Verify(_ => _.StreamReadGroup("prefix:key", "group", "consumer", "0-0", 10, false, CommandFlags.None));
} }
[Fact] [Fact]
public void StreamStreamReadGroup_2() public void StreamStreamReadGroup_2()
{ {
var streamPositions = new StreamPosition[0] { }; var streamPositions = new StreamPosition[0] { };
wrapper.StreamReadGroup(streamPositions, "group", "consumer", 10, CommandFlags.None); wrapper.StreamReadGroup(streamPositions, "group", "consumer", 10, false, CommandFlags.None);
mock.Verify(_ => _.StreamReadGroup(streamPositions, "group", "consumer", 10, CommandFlags.None)); mock.Verify(_ => _.StreamReadGroup(streamPositions, "group", "consumer", 10, false, CommandFlags.None));
} }
[Fact] [Fact]
......
...@@ -1638,6 +1638,77 @@ public void AddWithApproxCount() ...@@ -1638,6 +1638,77 @@ public void AddWithApproxCount()
} }
} }
[Fact]
public void StreamReadGroupWithNoAckShowsNoPendingMessages()
{
var key = GetUniqueKey("read_group_noack");
const string groupName = "test_group";
const string consumer = "consumer";
using (var conn = Create())
{
Skip.IfMissingFeature(conn, nameof(RedisFeatures.Streams), r => r.Streams);
var db = conn.GetDatabase();
db.StreamAdd(key, "field1", "value1");
db.StreamAdd(key, "field2", "value2");
db.StreamCreateConsumerGroup(key, groupName, StreamPosition.NewMessages);
var messages = db.StreamReadGroup(key,
groupName,
consumer,
StreamPosition.NewMessages,
noAck: true);
var pendingInfo = db.StreamPending(key, groupName);
Assert.Equal(0, pendingInfo.PendingMessageCount);
}
}
[Fact]
public void StreamReadGroupMultiStreamWithNoAckShowsNoPendingMessages()
{
var key1 = GetUniqueKey("read_group_noack1");
var key2 = GetUniqueKey("read_group_noack2");
const string groupName = "test_group";
const string consumer = "consumer";
using (var conn = Create())
{
Skip.IfMissingFeature(conn, nameof(RedisFeatures.Streams), r => r.Streams);
var db = conn.GetDatabase();
db.StreamAdd(key1, "field1", "value1");
db.StreamAdd(key1, "field2", "value2");
db.StreamAdd(key2, "field3", "value3");
db.StreamAdd(key2, "field4", "value4");
db.StreamCreateConsumerGroup(key1, groupName, StreamPosition.NewMessages);
db.StreamCreateConsumerGroup(key2, groupName, StreamPosition.NewMessages);
var messages = db.StreamReadGroup(
new StreamPosition[]
{
new StreamPosition(key1, StreamPosition.NewMessages),
new StreamPosition(key2, StreamPosition.NewMessages)
},
groupName,
consumer,
noAck: true);
var pending1 = db.StreamPending(key1, groupName);
var pending2 = db.StreamPending(key2, groupName);
Assert.Equal(0, pending1.PendingMessageCount);
Assert.Equal(0, pending2.PendingMessageCount);
}
}
private RedisKey GetUniqueKey(string type) => $"{type}_stream_{DateTimeOffset.UtcNow.ToUnixTimeMilliseconds()}"; private RedisKey GetUniqueKey(string type) => $"{type}_stream_{DateTimeOffset.UtcNow.ToUnixTimeMilliseconds()}";
} }
} }
...@@ -913,16 +913,16 @@ public void StreamReadAsync_2() ...@@ -913,16 +913,16 @@ public void StreamReadAsync_2()
[Fact] [Fact]
public void StreamReadGroupAsync_1() public void StreamReadGroupAsync_1()
{ {
wrapper.StreamReadGroupAsync("key", "group", "consumer", StreamPosition.Beginning, 10, CommandFlags.None); wrapper.StreamReadGroupAsync("key", "group", "consumer", StreamPosition.Beginning, 10, false, CommandFlags.None);
mock.Verify(_ => _.StreamReadGroupAsync("prefix:key", "group", "consumer", StreamPosition.Beginning, 10, CommandFlags.None)); mock.Verify(_ => _.StreamReadGroupAsync("prefix:key", "group", "consumer", StreamPosition.Beginning, 10, false, CommandFlags.None));
} }
[Fact] [Fact]
public void StreamStreamReadGroupAsync_2() public void StreamStreamReadGroupAsync_2()
{ {
var streamPositions = new StreamPosition[0] { }; var streamPositions = new StreamPosition[0] { };
wrapper.StreamReadGroupAsync(streamPositions, "group", "consumer", 10, CommandFlags.None); wrapper.StreamReadGroupAsync(streamPositions, "group", "consumer", 10, false, CommandFlags.None);
mock.Verify(_ => _.StreamReadGroupAsync(streamPositions, "group", "consumer", 10, CommandFlags.None)); mock.Verify(_ => _.StreamReadGroupAsync(streamPositions, "group", "consumer", 10, false, CommandFlags.None));
} }
[Fact] [Fact]
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment