Unverified Commit e8340280 authored by Marc Gravell's avatar Marc Gravell Committed by GitHub

Streams RC (#983)

* fix XINFO

* fix XGROUP ("-" => "0")

* fix XREAD/XREADGROUP ("-" => "0")

* fix XADD: entry-id must **follow** MAXLEN (fix #982)
parent dd0ee609
...@@ -2905,8 +2905,6 @@ private Message GetStreamAddMessage(RedisKey key, RedisValue messageId, int? max ...@@ -2905,8 +2905,6 @@ private Message GetStreamAddMessage(RedisKey key, RedisValue messageId, int? max
var values = new RedisValue[totalLength]; var values = new RedisValue[totalLength];
var offset = 0; var offset = 0;
values[offset++] = messageId;
if (maxLength.HasValue) if (maxLength.HasValue)
{ {
values[offset++] = StreamConstants.MaxLen; values[offset++] = StreamConstants.MaxLen;
...@@ -2922,6 +2920,8 @@ private Message GetStreamAddMessage(RedisKey key, RedisValue messageId, int? max ...@@ -2922,6 +2920,8 @@ private Message GetStreamAddMessage(RedisKey key, RedisValue messageId, int? max
} }
} }
values[offset++] = messageId;
values[offset++] = streamPair.Name; values[offset++] = streamPair.Name;
values[offset] = streamPair.Value; values[offset] = streamPair.Value;
...@@ -2952,8 +2952,6 @@ private Message GetStreamAddMessage(RedisKey key, RedisValue entryId, int? maxLe ...@@ -2952,8 +2952,6 @@ private Message GetStreamAddMessage(RedisKey key, RedisValue entryId, int? maxLe
var offset = 0; var offset = 0;
values[offset++] = entryId;
if (maxLength.HasValue) if (maxLength.HasValue)
{ {
values[offset++] = StreamConstants.MaxLen; values[offset++] = StreamConstants.MaxLen;
...@@ -2966,6 +2964,8 @@ private Message GetStreamAddMessage(RedisKey key, RedisValue entryId, int? maxLe ...@@ -2966,6 +2964,8 @@ private Message GetStreamAddMessage(RedisKey key, RedisValue entryId, int? maxLe
values[offset++] = maxLength.Value; values[offset++] = maxLength.Value;
} }
values[offset++] = entryId;
for (var i = 0; i < streamPairs.Length; i++) for (var i = 0; i < streamPairs.Length; i++)
{ {
values[offset++] = streamPairs[i].Name; values[offset++] = streamPairs[i].Name;
......
...@@ -24,7 +24,16 @@ public static readonly CommandBytes ...@@ -24,7 +24,16 @@ public static readonly CommandBytes
timeout = "timeout", timeout = "timeout",
wildcard = "*", wildcard = "*",
yes = "yes", yes = "yes",
zero = "0"; zero = "0",
// streams
length = "length",
radixTreeKeys = "radix-tree-keys",
radixTreeNodes = "radix-tree-nodes",
groups = "groups",
lastGeneratedId = "last-generated-id",
firstEntry = "first-entry",
lastEntry = "last-entry";
} }
internal static class RedisLiterals internal static class RedisLiterals
{ {
......
...@@ -1593,34 +1593,55 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes ...@@ -1593,34 +1593,55 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes
} }
var arr = result.GetItems(); var arr = result.GetItems();
var max = arr.Length / 2;
if (arr.Length != 12) long length = -1, radixTreeKeys = -1, radixTreeNodes = -1, groups = -1;
var lastGeneratedId = Redis.RedisValue.Null;
StreamEntry firstEntry = StreamEntry.Null, lastEntry = StreamEntry.Null;
for(int index = 0, i = 0; i < max; i++)
{ {
return false; RawResult key = arr[index++], value = arr[index++];
} if (key.Payload.Length > CommandBytes.MaxLength) continue;
// Note: Even if there is only 1 message in the stream, this command returns
// the single entry as the first-entry and last-entry in the response.
// The first 8 items are interleaved name/value pairs.
// Items 9-12 represent the first and last entry in the stream. The values will
// be nil (stored in index 9 & 11) if the stream length is 0.
var leased = ArrayPool<RawResult>.Shared.Rent(2); var keyBytes = new CommandBytes(key.Payload);
leased[0] = arr[9]; if(keyBytes.Equals(CommonReplies.length))
leased[1] = arr[11]; {
var tmp = new RawResult(leased, 2); if (!value.TryGetInt64(out length)) return false;
var entries = ParseRedisStreamEntries(tmp); }
// note: don't .Recycle(), would be a stack overflow because else if (keyBytes.Equals(CommonReplies.radixTreeKeys))
// it would bridge the fake and real result set {
ArrayPool<RawResult>.Shared.Return(leased); if (!value.TryGetInt64(out radixTreeKeys)) return false;
}
else if (keyBytes.Equals(CommonReplies.radixTreeNodes))
{
if (!value.TryGetInt64(out radixTreeNodes)) return false;
}
else if (keyBytes.Equals(CommonReplies.groups))
{
if (!value.TryGetInt64(out groups)) return false;
}
else if (keyBytes.Equals(CommonReplies.lastGeneratedId))
{
lastGeneratedId = value.AsRedisValue();
}
else if (keyBytes.Equals(CommonReplies.firstEntry))
{
firstEntry = ParseRedisStreamEntry(value);
}
else if (keyBytes.Equals(CommonReplies.lastEntry))
{
lastEntry = ParseRedisStreamEntry(value);
}
}
var streamInfo = new StreamInfo(length: (int)arr[1].AsRedisValue(), var streamInfo = new StreamInfo(
radixTreeKeys: (int)arr[3].AsRedisValue(), length: checked((int)length),
radixTreeNodes: (int)arr[5].AsRedisValue(), radixTreeKeys: checked((int)radixTreeKeys),
groups: (int)arr[7].AsRedisValue(), radixTreeNodes: checked((int)radixTreeNodes),
firstEntry: entries[0], groups: checked((int)groups),
lastEntry: entries[1]); firstEntry: firstEntry,
lastEntry: lastEntry,
lastGeneratedId: lastGeneratedId);
SetResult(message, streamInfo); SetResult(message, streamInfo);
return true; return true;
...@@ -1711,6 +1732,20 @@ internal abstract class StreamProcessorBase<T> : ResultProcessor<T> ...@@ -1711,6 +1732,20 @@ internal abstract class StreamProcessorBase<T> : ResultProcessor<T>
{ {
// For command response formats see https://redis.io/topics/streams-intro. // For command response formats see https://redis.io/topics/streams-intro.
protected StreamEntry ParseRedisStreamEntry(RawResult item)
{
if (item.IsNull || item.Type != ResultType.MultiBulk)
{
return StreamEntry.Null;
}
// Process the Multibulk array for each entry. The entry contains the following elements:
// [0] = SimpleString (the ID of the stream entry)
// [1] = Multibulk array of the name/value pairs of the stream entry's data
var entryDetails = item.GetItems();
return new StreamEntry(id: entryDetails[0].AsRedisValue(),
values: ParseStreamEntryValues(entryDetails[1]));
}
protected StreamEntry[] ParseRedisStreamEntries(RawResult result) protected StreamEntry[] ParseRedisStreamEntries(RawResult result)
{ {
if (result.Type != ResultType.MultiBulk) if (result.Type != ResultType.MultiBulk)
...@@ -1720,21 +1755,7 @@ protected StreamEntry[] ParseRedisStreamEntries(RawResult result) ...@@ -1720,21 +1755,7 @@ protected StreamEntry[] ParseRedisStreamEntries(RawResult result)
var arr = result.GetItems(); var arr = result.GetItems();
return ConvertAll(arr, item => return ConvertAll(arr, item => ParseRedisStreamEntry(item));
{
if (item.IsNull || item.Type != ResultType.MultiBulk)
{
return StreamEntry.Null;
}
// Process the Multibulk array for each entry. The entry contains the following elements:
// [0] = SimpleString (the ID of the stream entry)
// [1] = Multibulk array of the name/value pairs of the stream entry's data
var entryDetails = item.GetItems();
return new StreamEntry(id: entryDetails[0].AsRedisValue(),
values: ParseStreamEntryValues(entryDetails[1]));
});
} }
protected NameValueEntry[] ParseStreamEntryValues(RawResult result) protected NameValueEntry[] ParseStreamEntryValues(RawResult result)
......
...@@ -21,6 +21,11 @@ internal static class StreamConstants ...@@ -21,6 +21,11 @@ internal static class StreamConstants
/// </summary> /// </summary>
internal static readonly RedisValue NewMessages = "$"; internal static readonly RedisValue NewMessages = "$";
/// <summary>
/// The "0" value used in the XGROUP command. Indicates reading all messages from the stream.
/// </summary>
internal static readonly RedisValue AllMessages = "0";
/// <summary> /// <summary>
/// The "-" value used in the XRANGE, XREAD, and XREADGROUP commands. Indicates the minimum message ID from the stream. /// The "-" value used in the XRANGE, XREAD, and XREADGROUP commands. Indicates the minimum message ID from the stream.
/// </summary> /// </summary>
......
...@@ -6,12 +6,14 @@ namespace StackExchange.Redis ...@@ -6,12 +6,14 @@ namespace StackExchange.Redis
/// </summary> /// </summary>
public readonly struct StreamInfo public readonly struct StreamInfo
{ {
internal StreamInfo(int length, internal StreamInfo(
int length,
int radixTreeKeys, int radixTreeKeys,
int radixTreeNodes, int radixTreeNodes,
int groups, int groups,
StreamEntry firstEntry, StreamEntry firstEntry,
StreamEntry lastEntry) StreamEntry lastEntry,
RedisValue lastGeneratedId)
{ {
Length = length; Length = length;
RadixTreeKeys = radixTreeKeys; RadixTreeKeys = radixTreeKeys;
...@@ -19,6 +21,7 @@ namespace StackExchange.Redis ...@@ -19,6 +21,7 @@ namespace StackExchange.Redis
ConsumerGroupCount = groups; ConsumerGroupCount = groups;
FirstEntry = firstEntry; FirstEntry = firstEntry;
LastEntry = lastEntry; LastEntry = lastEntry;
LastGeneratedId = lastGeneratedId;
} }
/// <summary> /// <summary>
...@@ -50,5 +53,10 @@ namespace StackExchange.Redis ...@@ -50,5 +53,10 @@ namespace StackExchange.Redis
/// The last entry in the stream. /// The last entry in the stream.
/// </summary> /// </summary>
public StreamEntry LastEntry { get; } public StreamEntry LastEntry { get; }
/// <summary>
/// The last generated id
/// </summary>
public RedisValue LastGeneratedId { get; }
} }
} }
...@@ -50,6 +50,15 @@ internal static RedisValue Resolve(RedisValue value, RedisCommand command) ...@@ -50,6 +50,15 @@ internal static RedisValue Resolve(RedisValue value, RedisCommand command)
default: // new is only valid for the above default: // new is only valid for the above
throw new ArgumentException($"Unsupported command in StreamPosition.Resolve: {command}.", nameof(command)); throw new ArgumentException($"Unsupported command in StreamPosition.Resolve: {command}.", nameof(command));
} }
} else if (value == StreamPosition.Beginning)
{
switch(command)
{
case RedisCommand.XREAD:
case RedisCommand.XREADGROUP:
case RedisCommand.XGROUP:
return StreamConstants.AllMessages;
}
} }
return value; return value;
} }
......
...@@ -894,6 +894,7 @@ public void StreamGroupInfoGet() ...@@ -894,6 +894,7 @@ public void StreamGroupInfoGet()
Skip.IfMissingFeature(conn, nameof(RedisFeatures.Streams), r => r.Streams); Skip.IfMissingFeature(conn, nameof(RedisFeatures.Streams), r => r.Streams);
var db = conn.GetDatabase(); var db = conn.GetDatabase();
db.KeyDelete(key);
var id1 = db.StreamAdd(key, "field1", "value1"); var id1 = db.StreamAdd(key, "field1", "value1");
var id2 = db.StreamAdd(key, "field2", "value2"); var id2 = db.StreamAdd(key, "field2", "value2");
...@@ -1610,6 +1611,34 @@ public void StreamVerifyLength() ...@@ -1610,6 +1611,34 @@ public void StreamVerifyLength()
} }
} }
private string GetUniqueKey(string type) => $"{type}_stream_{DateTimeOffset.UtcNow.ToUnixTimeMilliseconds()}"; [Fact]
public async Task AddWithApproxCountAsync()
{
var key = GetUniqueKey("approx");
using (var conn = Create())
{
Skip.IfMissingFeature(conn, nameof(RedisFeatures.Streams), r => r.Streams);
var db = conn.GetDatabase();
await db.StreamAddAsync(key, "field", "value", maxLength: 10, useApproximateMaxLength: true, flags: CommandFlags.None).ConfigureAwait(false);
}
}
[Fact]
public void AddWithApproxCount()
{
var key = GetUniqueKey("approx");
using (var conn = Create())
{
Skip.IfMissingFeature(conn, nameof(RedisFeatures.Streams), r => r.Streams);
var db = conn.GetDatabase();
db.StreamAdd(key, "field", "value", maxLength: 10, useApproximateMaxLength: true, flags: CommandFlags.None);
}
}
private RedisKey GetUniqueKey(string type) => $"{type}_stream_{DateTimeOffset.UtcNow.ToUnixTimeMilliseconds()}";
} }
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment