Unverified Commit 4091672a authored by Todd Tingen's avatar Todd Tingen Committed by GitHub

Updated StreamCreateConsumerGroup methods to use the MKSTREAM option. (#1141)

* Updated StreamCreateConsumerGroup methods to use the MKSTREAM option.

* Added overload for the createStream parameter on the StreamCreateConsumerGroup methods.

* Corrected the Async overload.
Co-authored-by: 's avatarMarc Gravell <marc.gravell@gmail.com>
parent 162f2b4f
...@@ -1588,7 +1588,19 @@ public interface IDatabase : IRedis, IDatabaseAsync ...@@ -1588,7 +1588,19 @@ public interface IDatabase : IRedis, IDatabaseAsync
/// <param name="flags">The flags to use for this operation.</param> /// <param name="flags">The flags to use for this operation.</param>
/// <returns>True if the group was created.</returns> /// <returns>True if the group was created.</returns>
/// <remarks>https://redis.io/topics/streams-intro</remarks> /// <remarks>https://redis.io/topics/streams-intro</remarks>
bool StreamCreateConsumerGroup(RedisKey key, RedisValue groupName, RedisValue? position = null, CommandFlags flags = CommandFlags.None); bool StreamCreateConsumerGroup(RedisKey key, RedisValue groupName, RedisValue? position, CommandFlags flags);
/// <summary>
/// Create a consumer group for the given stream.
/// </summary>
/// <param name="key">The key of the stream.</param>
/// <param name="groupName">The name of the group to create.</param>
/// <param name="position">The position to begin reading the stream. Defaults to <see cref="StreamPosition.NewMessages"/>.</param>
/// <param name="createStream">Create the stream if it does not already exist.</param>
/// <param name="flags">The flags to use for this operation.</param>
/// <returns>True if the group was created.</returns>
/// <remarks>https://redis.io/topics/streams-intro</remarks>
bool StreamCreateConsumerGroup(RedisKey key, RedisValue groupName, RedisValue? position = null, bool createStream = true, CommandFlags flags = CommandFlags.None);
/// <summary> /// <summary>
/// Delete messages in the stream. This method does not delete the stream. /// Delete messages in the stream. This method does not delete the stream.
......
...@@ -1499,7 +1499,19 @@ public interface IDatabaseAsync : IRedisAsync ...@@ -1499,7 +1499,19 @@ public interface IDatabaseAsync : IRedisAsync
/// <param name="flags">The flags to use for this operation.</param> /// <param name="flags">The flags to use for this operation.</param>
/// <returns>True if the group was created.</returns> /// <returns>True if the group was created.</returns>
/// <remarks>https://redis.io/topics/streams-intro</remarks> /// <remarks>https://redis.io/topics/streams-intro</remarks>
Task<bool> StreamCreateConsumerGroupAsync(RedisKey key, RedisValue groupName, RedisValue? position = null, CommandFlags flags = CommandFlags.None); Task<bool> StreamCreateConsumerGroupAsync(RedisKey key, RedisValue groupName, RedisValue? position, CommandFlags flags);
/// <summary>
/// Create a consumer group for the given stream.
/// </summary>
/// <param name="key">The key of the stream.</param>
/// <param name="groupName">The name of the group to create.</param>
/// <param name="position">The position to begin reading the stream. Defaults to <see cref="StreamPosition.NewMessages"/>.</param>
/// <param name="createStream">Create the stream if it does not already exist.</param>
/// <param name="flags">The flags to use for this operation.</param>
/// <returns>True if the group was created.</returns>
/// <remarks>https://redis.io/topics/streams-intro</remarks>
Task<bool> StreamCreateConsumerGroupAsync(RedisKey key, RedisValue groupName, RedisValue? position = null, bool createStream = true, CommandFlags flags = CommandFlags.None);
/// <summary> /// <summary>
/// Delete messages in the stream. This method does not delete the stream. /// Delete messages in the stream. This method does not delete the stream.
......
...@@ -661,11 +661,16 @@ public bool StreamConsumerGroupSetPosition(RedisKey key, RedisValue groupName, R ...@@ -661,11 +661,16 @@ public bool StreamConsumerGroupSetPosition(RedisKey key, RedisValue groupName, R
return Inner.StreamConsumerGroupSetPosition(ToInner(key), groupName, position, flags); return Inner.StreamConsumerGroupSetPosition(ToInner(key), groupName, position, flags);
} }
public bool StreamCreateConsumerGroup(RedisKey key, RedisValue groupName, RedisValue? position = null, CommandFlags flags = CommandFlags.None) public bool StreamCreateConsumerGroup(RedisKey key, RedisValue groupName, RedisValue? position, CommandFlags flags)
{ {
return Inner.StreamCreateConsumerGroup(ToInner(key), groupName, position, flags); return Inner.StreamCreateConsumerGroup(ToInner(key), groupName, position, flags);
} }
public bool StreamCreateConsumerGroup(RedisKey key, RedisValue groupName, RedisValue? position = null, bool createStream = true, CommandFlags flags = CommandFlags.None)
{
return Inner.StreamCreateConsumerGroup(ToInner(key), groupName, position, createStream, flags);
}
public StreamInfo StreamInfo(RedisKey key, CommandFlags flags = CommandFlags.None) public StreamInfo StreamInfo(RedisKey key, CommandFlags flags = CommandFlags.None)
{ {
return Inner.StreamInfo(ToInner(key), flags); return Inner.StreamInfo(ToInner(key), flags);
......
...@@ -641,11 +641,16 @@ public Task<bool> StreamConsumerGroupSetPositionAsync(RedisKey key, RedisValue g ...@@ -641,11 +641,16 @@ public Task<bool> StreamConsumerGroupSetPositionAsync(RedisKey key, RedisValue g
return Inner.StreamConsumerGroupSetPositionAsync(ToInner(key), groupName, position, flags); return Inner.StreamConsumerGroupSetPositionAsync(ToInner(key), groupName, position, flags);
} }
public Task<bool> StreamCreateConsumerGroupAsync(RedisKey key, RedisValue groupName, RedisValue? position = null, CommandFlags flags = CommandFlags.None) public Task<bool> StreamCreateConsumerGroupAsync(RedisKey key, RedisValue groupName, RedisValue? position, CommandFlags flags)
{ {
return Inner.StreamCreateConsumerGroupAsync(ToInner(key), groupName, position, flags); return Inner.StreamCreateConsumerGroupAsync(ToInner(key), groupName, position, flags);
} }
public Task<bool> StreamCreateConsumerGroupAsync(RedisKey key, RedisValue groupName, RedisValue? position = null, bool createStream = true, CommandFlags flags = CommandFlags.None)
{
return Inner.StreamCreateConsumerGroupAsync(ToInner(key), groupName, position, createStream, flags);
}
public Task<StreamInfo> StreamInfoAsync(RedisKey key, CommandFlags flags = CommandFlags.None) public Task<StreamInfo> StreamInfoAsync(RedisKey key, CommandFlags flags = CommandFlags.None)
{ {
return Inner.StreamInfoAsync(ToInner(key), flags); return Inner.StreamInfoAsync(ToInner(key), flags);
......
...@@ -1878,38 +1878,46 @@ public Task<bool> StreamConsumerGroupSetPositionAsync(RedisKey key, RedisValue g ...@@ -1878,38 +1878,46 @@ public Task<bool> StreamConsumerGroupSetPositionAsync(RedisKey key, RedisValue g
return ExecuteAsync(msg, ResultProcessor.Boolean); return ExecuteAsync(msg, ResultProcessor.Boolean);
} }
public bool StreamCreateConsumerGroup(RedisKey key, RedisValue groupName, RedisValue? position = null, CommandFlags flags = CommandFlags.None) public bool StreamCreateConsumerGroup(RedisKey key, RedisValue groupName, RedisValue? position, CommandFlags flags)
{ {
var actualPosition = position ?? StreamConstants.NewMessages; return StreamCreateConsumerGroup(
key,
groupName,
position,
true,
flags);
}
var msg = Message.Create(Database, public bool StreamCreateConsumerGroup(RedisKey key, RedisValue groupName, RedisValue? position = null, bool createStream = true, CommandFlags flags = CommandFlags.None)
flags, {
RedisCommand.XGROUP, var msg = GetStreamCreateConsumerGroupMessage(
new RedisValue[] key,
{ groupName,
StreamConstants.Create, position,
key.AsRedisValue(), createStream,
groupName, flags);
StreamPosition.Resolve(actualPosition, RedisCommand.XGROUP)
});
return ExecuteSync(msg, ResultProcessor.Boolean); return ExecuteSync(msg, ResultProcessor.Boolean);
} }
public Task<bool> StreamCreateConsumerGroupAsync(RedisKey key, RedisValue groupName, RedisValue? position = null, CommandFlags flags = CommandFlags.None) public Task<bool> StreamCreateConsumerGroupAsync(RedisKey key, RedisValue groupName, RedisValue? position, CommandFlags flags)
{ {
var actualPosition = position ?? StreamPosition.NewMessages; return StreamCreateConsumerGroupAsync(
key,
groupName,
position,
true,
flags);
}
var msg = Message.Create(Database, public Task<bool> StreamCreateConsumerGroupAsync(RedisKey key, RedisValue groupName, RedisValue? position = null, bool createStream = true, CommandFlags flags = CommandFlags.None)
flags, {
RedisCommand.XGROUP, var msg = GetStreamCreateConsumerGroupMessage(
new RedisValue[] key,
{ groupName,
StreamConstants.Create, position,
key.AsRedisValue(), createStream,
groupName, flags);
StreamPosition.Resolve(actualPosition, RedisCommand.XGROUP)
});
return ExecuteAsync(msg, ResultProcessor.Boolean); return ExecuteAsync(msg, ResultProcessor.Boolean);
} }
...@@ -3144,6 +3152,28 @@ private Message GetStreamClaimMessage(RedisKey key, RedisValue consumerGroup, Re ...@@ -3144,6 +3152,28 @@ private Message GetStreamClaimMessage(RedisKey key, RedisValue consumerGroup, Re
return Message.Create(Database, flags, RedisCommand.XCLAIM, values); return Message.Create(Database, flags, RedisCommand.XCLAIM, values);
} }
private Message GetStreamCreateConsumerGroupMessage(RedisKey key, RedisValue groupName, RedisValue? position = null, bool createStream = true, CommandFlags flags = CommandFlags.None)
{
var actualPosition = position ?? StreamConstants.NewMessages;
var values = new RedisValue[createStream ? 5 : 4];
values[0] = StreamConstants.Create;
values[1] = key.AsRedisValue();
values[2] = groupName;
values[3] = StreamPosition.Resolve(actualPosition, RedisCommand.XGROUP);
if (createStream)
{
values[4] = StreamConstants.MkStream;
}
return Message.Create(Database,
flags,
RedisCommand.XGROUP,
values);
}
private Message GetStreamPendingMessagesMessage(RedisKey key, RedisValue groupName, RedisValue? minId, RedisValue? maxId, int count, RedisValue consumerName, CommandFlags flags) private Message GetStreamPendingMessagesMessage(RedisKey key, RedisValue groupName, RedisValue? minId, RedisValue? maxId, int count, RedisValue consumerName, CommandFlags flags)
{ {
// > XPENDING mystream mygroup - + 10 [consumer name] // > XPENDING mystream mygroup - + 10 [consumer name]
......
...@@ -61,6 +61,8 @@ internal static class StreamConstants ...@@ -61,6 +61,8 @@ internal static class StreamConstants
internal static readonly RedisValue MaxLen = "MAXLEN"; internal static readonly RedisValue MaxLen = "MAXLEN";
internal static readonly RedisValue MkStream = "MKSTREAM";
internal static readonly RedisValue NoAck = "NOACK"; internal static readonly RedisValue NoAck = "NOACK";
internal static readonly RedisValue Stream = "STREAM"; internal static readonly RedisValue Stream = "STREAM";
......
...@@ -869,8 +869,8 @@ public void StreamConsumerInfoGet() ...@@ -869,8 +869,8 @@ public void StreamConsumerInfoGet()
[Fact] [Fact]
public void StreamCreateConsumerGroup() public void StreamCreateConsumerGroup()
{ {
wrapper.StreamCreateConsumerGroup("key", "group", StreamPosition.Beginning, CommandFlags.None); wrapper.StreamCreateConsumerGroup("key", "group", StreamPosition.Beginning, false, CommandFlags.None);
mock.Verify(_ => _.StreamCreateConsumerGroup("prefix:key", "group", StreamPosition.Beginning, CommandFlags.None)); mock.Verify(_ => _.StreamCreateConsumerGroup("prefix:key", "group", StreamPosition.Beginning, false, CommandFlags.None));
} }
[Fact] [Fact]
......
...@@ -200,6 +200,80 @@ public void StreamCreateConsumerGroup() ...@@ -200,6 +200,80 @@ public void StreamCreateConsumerGroup()
} }
} }
[Fact]
public void StreamCreateConsumerGroupBeforeCreatingStream()
{
var key = GetUniqueKey("group_create_before_stream");
using (var conn = Create())
{
Skip.IfMissingFeature(conn, nameof(RedisFeatures.Streams), r => r.Streams);
var db = conn.GetDatabase();
// Ensure the key doesn't exist.
var keyExistsBeforeCreate = db.KeyExists(key);
// The 'createStream' parameter is 'true' by default.
var groupCreated = db.StreamCreateConsumerGroup(key, "consumerGroup", StreamPosition.NewMessages);
var keyExistsAfterCreate = db.KeyExists(key);
Assert.False(keyExistsBeforeCreate);
Assert.True(groupCreated);
Assert.True(keyExistsAfterCreate);
}
}
[Fact]
public void StreamCreateConsumerGroupFailsIfKeyDoesntExist()
{
var key = GetUniqueKey("group_create_before_stream_should_fail");
using (var conn = Create())
{
Skip.IfMissingFeature(conn, nameof(RedisFeatures.Streams), r => r.Streams);
var db = conn.GetDatabase();
// Pass 'false' for 'createStream' to ensure that an
// execption is thrown when the stream doesn't exist.
Assert.ThrowsAny<RedisServerException>(() =>
{
db.StreamCreateConsumerGroup(
key,
"consumerGroup",
StreamPosition.NewMessages,
createStream: false);
});
}
}
[Fact]
public void StreamCreateConsumerGroupSucceedsWhenKeyExists()
{
var key = GetUniqueKey("group_create_after_stream");
using (var conn = Create())
{
Skip.IfMissingFeature(conn, nameof(RedisFeatures.Streams), r => r.Streams);
var db = conn.GetDatabase();
db.StreamAdd(key, "f1", "v1");
// Pass 'false' for 'createStream', should create the consumer group
// without issue since the stream already exists.
var groupCreated = db.StreamCreateConsumerGroup(
key,
"consumerGroup",
StreamPosition.NewMessages,
createStream: false);
Assert.True(groupCreated);
}
}
[Fact] [Fact]
public void StreamConsumerGroupReadOnlyNewMessagesWithEmptyResponse() public void StreamConsumerGroupReadOnlyNewMessagesWithEmptyResponse()
{ {
......
...@@ -827,8 +827,8 @@ public void StreamConsumerGroupSetPositionAsync() ...@@ -827,8 +827,8 @@ public void StreamConsumerGroupSetPositionAsync()
[Fact] [Fact]
public void StreamCreateConsumerGroupAsync() public void StreamCreateConsumerGroupAsync()
{ {
wrapper.StreamCreateConsumerGroupAsync("key", "group", "0-0", CommandFlags.None); wrapper.StreamCreateConsumerGroupAsync("key", "group", "0-0", false, CommandFlags.None);
mock.Verify(_ => _.StreamCreateConsumerGroupAsync("prefix:key", "group", "0-0", CommandFlags.None)); mock.Verify(_ => _.StreamCreateConsumerGroupAsync("prefix:key", "group", "0-0", false, CommandFlags.None));
} }
[Fact] [Fact]
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment