Unverified Commit 988d6aef authored by Marc Gravell's avatar Marc Gravell Committed by GitHub

Arenas (#1075)

* remove the entire concept of the completion manager; it just doesn't make sense any more; everything async and external facing should be via the TP

* use "in" with ForAwait to avoid some extra copies

* experimental "backlog queue" approach

* Cleanup and de-dupe timeout exception data

* WriteMessageTakingWriteLockSync should consider backlog

* don't allocate all those strings

* this compiles, but nothing else; not tested yet

* fix the by-ref indexer

* record that CompletedSynchronously has changed - impacts a test

* update benchmarks and rev unofficial

* update to  pipelines 1.1.2 and simplify

* in debug mode, use a small page size in the arena

* rev pipelines to 1.1.3

* Update src/StackExchange.Redis/CommandTrace.cs
Co-Authored-By: 's avatarmgravell <marc.gravell@gmail.com>

* Perf regression (#1076)

* compare baseline to 1.2.6

* remove the entire concept of the completion manager; it just doesn't make sense any more; everything async and external facing should be via the TP

* use "in" with ForAwait to avoid some extra copies

* experimental "backlog queue" approach

* Cleanup and de-dupe timeout exception data

* WriteMessageTakingWriteLockSync should consider backlog

* don't allocate all those strings
parent cfe491ee
......@@ -78,12 +78,13 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes
case ResultType.MultiBulk:
var parts = result.GetItems();
CommandTrace[] arr = new CommandTrace[parts.Length];
for (int i = 0; i < parts.Length; i++)
int i = 0;
foreach(var item in parts)
{
var subParts = parts[i].GetItems();
var subParts = item.GetItems();
if (!subParts[0].TryGetInt64(out long uniqueid) || !subParts[1].TryGetInt64(out long time) || !subParts[2].TryGetInt64(out long duration))
return false;
arr[i] = new CommandTrace(uniqueid, time, duration, subParts[3].GetItemsAsValues());
arr[i++] = new CommandTrace(uniqueid, time, duration, subParts[3].GetItemsAsValues());
}
SetResult(message, arr);
return true;
......
......@@ -6,6 +6,7 @@
using System.Security.Authentication;
using System.Security.Cryptography.X509Certificates;
using System.Text;
using Pipelines.Sockets.Unofficial.Arenas;
namespace StackExchange.Redis
{
......@@ -271,5 +272,13 @@ internal static int VectorSafeIndexOfCRLF(this ReadOnlySpan<byte> span)
return -1;
}
#endif
[MethodImpl(MethodImplOptions.AggressiveInlining)]
internal static T[] ToArray<T>(in this RawResult result, Projection<RawResult, T> selector)
=> result.IsNull ? null : result.GetItems().ToArray(selector);
[MethodImpl(MethodImplOptions.AggressiveInlining)]
internal static TTo[] ToArray<TTo, TState>(in this RawResult result, Projection<RawResult, TState, TTo> selector, in TState state)
=> result.IsNull ? null : result.GetItems().ToArray(selector, in state);
}
}
......@@ -17,6 +17,7 @@
using System.Threading;
using System.Threading.Tasks;
using Pipelines.Sockets.Unofficial;
using Pipelines.Sockets.Unofficial.Arenas;
namespace StackExchange.Redis
{
......@@ -55,7 +56,7 @@ private static readonly Message
internal void GetBytes(out long sent, out long received)
{
if(_ioPipe is IMeasuredDuplexPipe sc)
if (_ioPipe is IMeasuredDuplexPipe sc)
{
sent = sc.TotalBytesSent;
received = sc.TotalBytesReceived;
......@@ -268,6 +269,7 @@ public void Dispose()
RecordConnectionFailed(ConnectionFailureType.ConnectionDisposed);
}
OnCloseEcho();
_arena.Dispose();
GC.SuppressFinalize(this);
}
......@@ -1411,6 +1413,12 @@ private async Task ReadFromPipe()
}
}
private static readonly ArenaOptions s_arenaOptions = new ArenaOptions(
#if DEBUG
blockSizeBytes: Unsafe.SizeOf<RawResult>() * 8 // force an absurdly small page size to trigger bugs
#endif
);
private readonly Arena<RawResult> _arena = new Arena<RawResult>(s_arenaOptions);
private int ProcessBuffer(ref ReadOnlySequence<byte> buffer)
{
int messageCount = 0;
......@@ -1418,7 +1426,7 @@ private int ProcessBuffer(ref ReadOnlySequence<byte> buffer)
while (!buffer.IsEmpty)
{
var reader = new BufferReader(buffer);
var result = TryParseResult(in buffer, ref reader, IncludeDetailInExceptions, BridgeCouldBeNull?.ServerEndPoint);
var result = TryParseResult(_arena, in buffer, ref reader, IncludeDetailInExceptions, BridgeCouldBeNull?.ServerEndPoint);
try
{
if (result.HasValue)
......@@ -1436,7 +1444,7 @@ private int ProcessBuffer(ref ReadOnlySequence<byte> buffer)
}
finally
{
result.Recycle();
_arena.Reset();
}
}
return messageCount;
......@@ -1466,7 +1474,7 @@ private int ProcessBuffer(ref ReadOnlySequence<byte> buffer)
// }
//}
private static RawResult ReadArray(in ReadOnlySequence<byte> buffer, ref BufferReader reader, bool includeDetailInExceptions, ServerEndPoint server)
private static RawResult ReadArray(Arena<RawResult> arena, in ReadOnlySequence<byte> buffer, ref BufferReader reader, bool includeDetailInExceptions, ServerEndPoint server)
{
var itemCount = ReadLineTerminatedString(ResultType.Integer, ref reader);
if (itemCount.HasValue)
......@@ -1485,14 +1493,31 @@ private static RawResult ReadArray(in ReadOnlySequence<byte> buffer, ref BufferR
return RawResult.EmptyMultiBulk;
}
var oversized = ArrayPool<RawResult>.Shared.Rent(itemCountActual);
var result = new RawResult(oversized, itemCountActual);
for (int i = 0; i < itemCountActual; i++)
var oversized = arena.Allocate(itemCountActual);
var result = new RawResult(oversized, false);
if (oversized.IsSingleSegment)
{
var span = oversized.FirstSpan;
for(int i = 0; i < span.Length; i++)
{
if (!(span[i] = TryParseResult(arena, in buffer, ref reader, includeDetailInExceptions, server)).HasValue)
{
return RawResult.Nil;
}
}
}
else
{
if (!(oversized[i] = TryParseResult(in buffer, ref reader, includeDetailInExceptions, server)).HasValue)
foreach(var span in oversized.Spans)
{
result.Recycle(i); // passing index here means we don't need to "Array.Clear" before-hand
return RawResult.Nil;
for (int i = 0; i < span.Length; i++)
{
if (!(span[i] = TryParseResult(arena, in buffer, ref reader, includeDetailInExceptions, server)).HasValue)
{
return RawResult.Nil;
}
}
}
}
return result;
......@@ -1541,7 +1566,7 @@ private static RawResult ReadLineTerminatedString(ResultType type, ref BufferRea
internal void StartReading() => ReadFromPipe().RedisFireAndForget();
internal static RawResult TryParseResult(in ReadOnlySequence<byte> buffer, ref BufferReader reader,
internal static RawResult TryParseResult(Arena<RawResult> arena, in ReadOnlySequence<byte> buffer, ref BufferReader reader,
bool includeDetilInExceptions, ServerEndPoint server, bool allowInlineProtocol = false)
{
var prefix = reader.PeekByte();
......@@ -1562,15 +1587,15 @@ private static RawResult ReadLineTerminatedString(ResultType type, ref BufferRea
return ReadBulkString(ref reader, includeDetilInExceptions, server);
case '*': // array
reader.Consume(1);
return ReadArray(in buffer, ref reader, includeDetilInExceptions, server);
return ReadArray(arena, in buffer, ref reader, includeDetilInExceptions, server);
default:
// string s = Format.GetString(buffer);
if (allowInlineProtocol) return ParseInlineProtocol(ReadLineTerminatedString(ResultType.SimpleString, ref reader));
if (allowInlineProtocol) return ParseInlineProtocol(arena, ReadLineTerminatedString(ResultType.SimpleString, ref reader));
throw new InvalidOperationException("Unexpected response prefix: " + (char)prefix);
}
}
private static RawResult ParseInlineProtocol(in RawResult line)
private static RawResult ParseInlineProtocol(Arena<RawResult> arena, in RawResult line)
{
if (!line.HasValue) return RawResult.Nil; // incomplete line
......@@ -1578,13 +1603,14 @@ private static RawResult ParseInlineProtocol(in RawResult line)
#pragma warning disable IDE0059
foreach (var _ in line.GetInlineTokenizer()) count++;
#pragma warning restore IDE0059
var oversized = ArrayPool<RawResult>.Shared.Rent(count);
count = 0;
var block = arena.Allocate(count);
var iter = block.GetEnumerator();
foreach (var token in line.GetInlineTokenizer())
{
oversized[count++] = new RawResult(line.Type, token, false);
{ // this assigns *via a reference*, returned via the iterator; just... sweet
iter.GetNext() = new RawResult(line.Type, token, false);
}
return new RawResult(oversized, count);
return new RawResult(block, false);
}
}
}
using System;
using System.Buffers;
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;
using System.Text;
using Pipelines.Sockets.Unofficial.Arenas;
namespace StackExchange.Redis
{
internal readonly struct RawResult
{
internal RawResult this[int index]
{
get
{
if (index >= ItemsCount) throw new IndexOutOfRangeException();
return _itemsOversized[index];
}
}
internal int ItemsCount { get; }
internal ref RawResult this[int index] => ref GetItems()[index];
internal int ItemsCount => (int)_items.Length;
internal ReadOnlySequence<byte> Payload { get; }
internal static readonly RawResult NullMultiBulk = new RawResult(null, 0);
internal static readonly RawResult EmptyMultiBulk = new RawResult(Array.Empty<RawResult>(), 0);
internal static readonly RawResult NullMultiBulk = new RawResult(default(Sequence<RawResult>), isNull: true);
internal static readonly RawResult EmptyMultiBulk = new RawResult(default(Sequence<RawResult>), isNull: false);
internal static readonly RawResult Nil = default;
// note: can't use Memory<RawResult> here - struct recursion breaks runtimr
private readonly RawResult[] _itemsOversized;
private readonly Sequence _items;
private readonly ResultType _type;
private const ResultType NonNullFlag = (ResultType)128;
......@@ -42,17 +38,14 @@ public RawResult(ResultType resultType, in ReadOnlySequence<byte> payload, bool
if (!isNull) resultType |= NonNullFlag;
_type = resultType;
Payload = payload;
_itemsOversized = default;
ItemsCount = default;
_items = default;
}
public RawResult(RawResult[] itemsOversized, int itemCount)
public RawResult(Sequence<RawResult> items, bool isNull)
{
_type = ResultType.MultiBulk;
if (itemsOversized != null) _type |= NonNullFlag;
_type = isNull ? ResultType.MultiBulk : (ResultType.MultiBulk | NonNullFlag);
Payload = default;
_itemsOversized = itemsOversized;
ItemsCount = itemCount;
_items = items.Untyped();
}
public bool IsError => Type == ResultType.Error;
......@@ -195,20 +188,6 @@ internal Lease<byte> AsLease()
throw new InvalidCastException("Cannot convert to Lease: " + Type);
}
internal void Recycle(int limit = -1)
{
var arr = _itemsOversized;
if (arr != null)
{
if (limit < 0) limit = ItemsCount;
for (int i = 0; i < limit; i++)
{
arr[i].Recycle();
}
ArrayPool<RawResult>.Shared.Return(arr, clearArray: false);
}
}
internal bool IsEqual(in CommandBytes expected)
{
if (expected.Length != Payload.Length) return false;
......@@ -284,83 +263,17 @@ internal bool GetBoolean()
}
}
internal ReadOnlySpan<RawResult> GetItems()
{
if (Type == ResultType.MultiBulk)
return new ReadOnlySpan<RawResult>(_itemsOversized, 0, ItemsCount);
throw new InvalidOperationException();
}
internal ReadOnlyMemory<RawResult> GetItemsMemory()
{
if (Type == ResultType.MultiBulk)
return new ReadOnlyMemory<RawResult>(_itemsOversized, 0, ItemsCount);
throw new InvalidOperationException();
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
internal Sequence<RawResult> GetItems() => _items.Cast<RawResult>();
internal RedisKey[] GetItemsAsKeys()
{
var items = GetItems();
if (IsNull)
{
return null;
}
else if (items.Length == 0)
{
return Array.Empty<RedisKey>();
}
else
{
var arr = new RedisKey[items.Length];
for (int i = 0; i < arr.Length; i++)
{
arr[i] = items[i].AsRedisKey();
}
return arr;
}
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
internal RedisKey[] GetItemsAsKeys() => this.ToArray<RedisKey>((in RawResult x) => x.AsRedisKey());
internal RedisValue[] GetItemsAsValues()
{
var items = GetItems();
if (IsNull)
{
return null;
}
else if (items.Length == 0)
{
return RedisValue.EmptyArray;
}
else
{
var arr = new RedisValue[items.Length];
for (int i = 0; i < arr.Length; i++)
{
arr[i] = items[i].AsRedisValue();
}
return arr;
}
}
internal string[] GetItemsAsStrings()
{
var items = GetItems();
if (IsNull)
{
return null;
}
else if (items.Length == 0)
{
return Array.Empty<string>();
}
else
{
var arr = new string[items.Length];
for (int i = 0; i < arr.Length; i++)
{
arr[i] = (string)(items[i].AsRedisValue());
}
return arr;
}
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
internal RedisValue[] GetItemsAsValues() => this.ToArray<RedisValue>((in RawResult x) => x.AsRedisValue());
[MethodImpl(MethodImplOptions.AggressiveInlining)]
internal string[] GetItemsAsStrings() => this.ToArray<string>((in RawResult x) => (string)x.AsRedisValue());
internal GeoPosition? GetItemsAsGeoPosition()
{
......@@ -370,43 +283,36 @@ internal string[] GetItemsAsStrings()
return null;
}
var coords = items[0].GetItems();
if (items[0].IsNull)
ref RawResult root = ref items[0];
if (root.IsNull)
{
return null;
}
return new GeoPosition((double)coords[0].AsRedisValue(), (double)coords[1].AsRedisValue());
return AsGeoPosition(root.GetItems());
}
internal GeoPosition?[] GetItemsAsGeoPositionArray()
static GeoPosition AsGeoPosition(Sequence<RawResult> coords)
{
var items = GetItems();
if (IsNull)
double longitude, latitude;
if (coords.IsSingleSegment)
{
return null;
}
else if (items.Length == 0)
{
return Array.Empty<GeoPosition?>();
var span = coords.FirstSpan;
longitude = (double)span[0].AsRedisValue();
latitude = (double)span[1].AsRedisValue();
}
else
{
var arr = new GeoPosition?[items.Length];
for (int i = 0; i < arr.Length; i++)
{
var item = items[i].GetItems();
if (items[i].IsNull)
{
arr[i] = null;
}
else
{
arr[i] = new GeoPosition((double)item[0].AsRedisValue(), (double)item[1].AsRedisValue());
}
}
return arr;
var iter = coords.GetEnumerator();
longitude = (double)iter.GetNext().AsRedisValue();
latitude = (double)iter.GetNext().AsRedisValue();
}
return new GeoPosition(longitude, latitude);
}
internal GeoPosition?[] GetItemsAsGeoPositionArray()
=> this.ToArray<GeoPosition?>((in RawResult item) => item.IsNull ? (GeoPosition?)null : AsGeoPosition(item.GetItems()));
internal unsafe string GetString()
{
if (IsNull) return null;
......
......@@ -3494,12 +3494,15 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes
{
case ResultType.MultiBulk:
var arr = result.GetItems();
long i64;
if (arr.Length == 2 && arr[1].Type == ResultType.MultiBulk && arr[0].TryGetInt64(out i64))
if (arr.Length == 2)
{
var sscanResult = new ScanIterator<T>.ScanResult(i64, Parse(arr[1]));
SetResult(message, sscanResult);
return true;
ref RawResult inner = ref arr[1];
if (inner.Type == ResultType.MultiBulk && arr[0].TryGetInt64(out var i64))
{
var sscanResult = new ScanIterator<T>.ScanResult(i64, Parse(inner));
SetResult(message, sscanResult);
return true;
}
}
break;
}
......
......@@ -60,11 +60,13 @@ internal static RedisResult TryCreate(PhysicalConnection connection, in RawResul
var items = result.GetItems();
if (items.Length == 0) return EmptyArray;
var arr = new RedisResult[items.Length];
for (int i = 0; i < arr.Length; i++)
int i = 0;
var iter = items.GetEnumerator();
while (iter.MoveNext())
{
var next = TryCreate(connection, items[i]);
var next = TryCreate(connection, in iter.CurrentReference);
if (next == null) return null; // means we didn't understand
arr[i] = next;
arr[i++] = next;
}
return new ArrayRedisResult(arr);
case ResultType.Error:
......
......@@ -755,9 +755,10 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes
case ResultType.MultiBulk:
var arr = result.GetItems();
long i64;
if (arr.Length == 2 && arr[1].Type == ResultType.MultiBulk && arr[0].TryGetInt64(out i64))
RawResult inner;
if (arr.Length == 2 && (inner = arr[1]).Type == ResultType.MultiBulk && arr[0].TryGetInt64(out i64))
{
var keysResult = new ScanResult(i64, arr[1].GetItemsAsKeys());
var keysResult = new ScanResult(i64, inner.GetItemsAsKeys());
SetResult(message, keysResult);
return true;
}
......
......@@ -503,11 +503,14 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes
{
connection.Trace("Server committed; processing nested replies");
connection?.BridgeCouldBeNull?.Multiplexer?.OnTransactionLog($"processing {arr.Length} wrapped messages");
for (int i = 0; i < arr.Length; i++)
int i = 0;
var iter = arr.GetEnumerator();
while(iter.MoveNext())
{
var inner = wrapped[i].Wrapped;
connection?.BridgeCouldBeNull?.Multiplexer?.OnTransactionLog($"> got {arr[i]} for {inner.CommandAndKey}");
if (inner.ComputeResult(connection, arr[i]))
var inner = wrapped[i++].Wrapped;
connection?.BridgeCouldBeNull?.Multiplexer?.OnTransactionLog($"> got {iter.Current} for {inner.CommandAndKey}");
if (inner.ComputeResult(connection, iter.CurrentReference))
{
inner.Complete();
}
......
......@@ -5,8 +5,9 @@
using System.IO;
using System.Linq;
using System.Net;
using System.Text;
using System.Runtime.CompilerServices;
using System.Text.RegularExpressions;
using Pipelines.Sockets.Unofficial.Arenas;
namespace StackExchange.Redis
{
......@@ -563,7 +564,7 @@ public bool TryParse(in RawResult result, out T[] pairs)
}
else
{
int count = arr.Length / 2;
int count = (int)arr.Length / 2;
if (count == 0)
{
pairs = Array.Empty<T>();
......@@ -571,10 +572,22 @@ public bool TryParse(in RawResult result, out T[] pairs)
else
{
pairs = new T[count];
int offset = 0;
for (int i = 0; i < pairs.Length; i++)
if (arr.IsSingleSegment)
{
pairs[i] = Parse(arr[offset++], arr[offset++]);
var span = arr.FirstSpan;
int offset = 0;
for (int i = 0; i < pairs.Length; i++)
{
pairs[i] = Parse(span[offset++], span[offset++]);
}
}
else
{
var iter = arr.GetEnumerator(); // simplest way of getting successive values
for (int i = 0; i < pairs.Length; i++)
{
pairs[i] = Parse(iter.GetNext(), iter.GetNext());
}
}
}
}
......@@ -703,13 +716,14 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes
case ResultType.MultiBulk:
if (message?.Command == RedisCommand.CONFIG)
{
var arr = result.GetItems();
int count = arr.Length / 2;
for (int i = 0; i < count; i++)
var iter = result.GetItems().GetEnumerator();
while(iter.MoveNext())
{
var key = arr[i * 2];
if (key.IsEqual(CommonReplies.timeout) && arr[(i * 2) + 1].TryGetInt64(out long i64))
ref RawResult key = ref iter.CurrentReference;
if (!iter.MoveNext()) break;
ref RawResult val = ref iter.CurrentReference;
if (key.IsEqual(CommonReplies.timeout) && val.TryGetInt64(out long i64))
{
// note the configuration is in seconds
int timeoutSeconds = checked((int)i64), targetSeconds;
......@@ -727,7 +741,7 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes
server.WriteEverySeconds = targetSeconds;
}
}
else if (key.IsEqual(CommonReplies.databases) && arr[(i * 2) + 1].TryGetInt64(out i64))
else if (key.IsEqual(CommonReplies.databases) && val.TryGetInt64(out i64))
{
int dbCount = checked((int)i64);
server.Multiplexer.Trace("Auto-configured databases: " + dbCount);
......@@ -735,7 +749,6 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes
}
else if (key.IsEqual(CommonReplies.slave_read_only))
{
var val = arr[(i * 2) + 1];
if (val.IsEqual(CommonReplies.yes))
{
server.SlaveReadOnly = true;
......@@ -892,7 +905,7 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes
switch (arr.Length)
{
case 1:
if (arr[0].TryGetInt64(out unixTime))
if (arr.FirstSpan[0].TryGetInt64(out unixTime))
{
var time = RedisBase.UnixEpoch.AddSeconds(unixTime);
SetResult(message, time);
......@@ -1101,26 +1114,25 @@ public RedisChannelArrayProcessor(RedisChannel.PatternMode mode)
this.mode = mode;
}
readonly struct ChannelState // I would use a value-tuple here, but that is binding hell
{
public readonly byte[] Prefix;
public readonly RedisChannel.PatternMode Mode;
public ChannelState(byte[] prefix, RedisChannel.PatternMode mode)
{
Prefix = prefix;
Mode = mode;
}
}
protected override bool SetResultCore(PhysicalConnection connection, Message message, in RawResult result)
{
switch (result.Type)
{
case ResultType.MultiBulk:
var arr = result.GetItems();
RedisChannel[] final;
if (arr.Length == 0)
{
final = Array.Empty<RedisChannel>();
}
else
{
final = new RedisChannel[arr.Length];
byte[] channelPrefix = connection.ChannelPrefix;
for (int i = 0; i < final.Length; i++)
{
final[i] = arr[i].AsRedisChannel(channelPrefix, mode);
}
}
var final = result.ToArray(
(in RawResult item, in ChannelState state) => item.AsRedisChannel(state.Prefix, state.Mode),
new ChannelState(connection.ChannelPrefix, mode));
SetResult(message, final);
return true;
}
......@@ -1274,29 +1286,15 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes
switch (result.Type)
{
case ResultType.MultiBulk:
var arr = result.GetItems();
GeoRadiusResult[] typed;
if (result.IsNull)
{
typed = null;
}
else
{
var options = this.options;
typed = new GeoRadiusResult[arr.Length];
for (int i = 0; i < arr.Length; i++)
{
typed[i] = Parse(options, arr[i]);
}
}
var typed = result.ToArray(
(in RawResult item, in GeoRadiusOptions options) => Parse(item, options), this.options);
SetResult(message, typed);
return true;
}
return false;
}
private static GeoRadiusResult Parse(GeoRadiusOptions options, in RawResult item)
private static GeoRadiusResult Parse(in RawResult item, GeoRadiusOptions options)
{
if (options == GeoRadiusOptions.None)
{
......@@ -1304,11 +1302,10 @@ private static GeoRadiusResult Parse(GeoRadiusOptions options, in RawResult item
return new GeoRadiusResult(item.AsRedisValue(), null, null, null);
}
// If WITHCOORD, WITHDIST or WITHHASH options are specified, the command returns an array of arrays, where each sub-array represents a single item.
var arr = item.GetItems();
var iter = item.GetItems().GetEnumerator();
int index = 0;
// the first item in the sub-array is always the name of the returned item.
var member = arr[index++].AsRedisValue();
var member = iter.GetNext().AsRedisValue();
/* The other information is returned in the following order as successive elements of the sub-array.
The distance from the center as a floating point number, in the same unit specified in the radius.
......@@ -1318,11 +1315,11 @@ private static GeoRadiusResult Parse(GeoRadiusOptions options, in RawResult item
double? distance = null;
GeoPosition? position = null;
long? hash = null;
if ((options & GeoRadiusOptions.WithDistance) != 0) { distance = (double?)arr[index++].AsRedisValue(); }
if ((options & GeoRadiusOptions.WithGeoHash) != 0) { hash = (long?)arr[index++].AsRedisValue(); }
if ((options & GeoRadiusOptions.WithDistance) != 0) { distance = (double?)iter.GetNext().AsRedisValue(); }
if ((options & GeoRadiusOptions.WithGeoHash) != 0) { hash = (long?)iter.GetNext().AsRedisValue(); }
if ((options & GeoRadiusOptions.WithCoordinates) != 0)
{
var coords = arr[index++].GetItems();
var coords = iter.GetNext().GetItems();
double longitude = (double)coords[0].AsRedisValue(), latitude = (double)coords[1].AsRedisValue();
position = new GeoPosition(longitude, latitude);
}
......@@ -1495,33 +1492,21 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes
return false;
}
var arr = result.GetItems();
var streams = ConvertAll(arr, item =>
var streams = result.GetItems().ToArray((in RawResult item, in MultiStreamProcessor obj) =>
{
var details = item.GetItems();
var details = item.GetItems().GetEnumerator();
// details[0] = Name of the Stream
// details[1] = Multibulk Array of Stream Entries
return new RedisStream(key: details[0].AsRedisKey(),
entries: ParseRedisStreamEntries(details[1]));
});
return new RedisStream(key: details.GetNext().AsRedisKey(),
entries: obj.ParseRedisStreamEntries(details.GetNext()));
}, this);
SetResult(message, streams);
return true;
}
}
private static T[] ConvertAll<T>(ReadOnlySpan<RawResult> items, Func<RawResult, T> converter)
{
if (items.Length == 0) return Array.Empty<T>();
T[] arr = new T[items.Length];
for (int i = 0; i < arr.Length; i++)
{
arr[i] = converter(items[i]);
}
return arr;
}
internal sealed class StreamConsumerInfoProcessor : InterleavedStreamInfoProcessorBase<StreamConsumerInfo>
{
protected override StreamConsumerInfo ParseItem(in RawResult result)
......@@ -1592,7 +1577,7 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes
}
var arr = result.GetItems();
var parsedItems = ConvertAll(arr, item => ParseItem(item));
var parsedItems = arr.ToArray((in RawResult item, in InterleavedStreamInfoProcessorBase<T> obj) => obj.ParseItem(item), this);
SetResult(message, parsedItems);
return true;
......@@ -1634,9 +1619,10 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes
long length = -1, radixTreeKeys = -1, radixTreeNodes = -1, groups = -1;
var lastGeneratedId = Redis.RedisValue.Null;
StreamEntry firstEntry = StreamEntry.Null, lastEntry = StreamEntry.Null;
for(int index = 0, i = 0; i < max; i++)
var iter = arr.GetEnumerator();
for(int i = 0; i < max; i++)
{
RawResult key = arr[index++], value = arr[index++];
ref RawResult key = ref iter.GetNext(), value = ref iter.GetNext();
if (key.Payload.Length > CommandBytes.MaxLength) continue;
var keyBytes = new CommandBytes(key.Payload);
......@@ -1714,15 +1700,16 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes
// If there are no consumers as of yet for the given group, the last
// item in the response array will be null.
if (!arr[3].IsNull)
ref RawResult third = ref arr[3];
if (!third.IsNull)
{
consumers = ConvertAll(arr[3].GetItems(), item =>
consumers = third.ToArray((in RawResult item) =>
{
var details = item.GetItems();
var details = item.GetItems().GetEnumerator();
return new StreamConsumer(
name: details[0].AsRedisValue(),
pendingMessageCount: (int)details[1].AsRedisValue());
name: details.GetNext().AsRedisValue(),
pendingMessageCount: (int)details.GetNext().AsRedisValue());
});
}
......@@ -1747,16 +1734,14 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes
return false;
}
var arr = result.GetItems();
var messageInfoArray = ConvertAll(arr, item =>
var messageInfoArray = result.GetItems().ToArray((in RawResult item) =>
{
var details = item.GetItems();
var details = item.GetItems().GetEnumerator();
return new StreamPendingMessageInfo(messageId: details[0].AsRedisValue(),
consumerName: details[1].AsRedisValue(),
idleTimeInMs: (long)details[2].AsRedisValue(),
deliveryCount: (int)details[3].AsRedisValue());
return new StreamPendingMessageInfo(messageId: details.GetNext().AsRedisValue(),
consumerName: details.GetNext().AsRedisValue(),
idleTimeInMs: (long)details.GetNext().AsRedisValue(),
deliveryCount: (int)details.GetNext().AsRedisValue());
});
SetResult(message, messageInfoArray);
......@@ -1789,9 +1774,8 @@ protected StreamEntry[] ParseRedisStreamEntries(in RawResult result)
return null;
}
var arr = result.GetItems();
return ConvertAll(arr, item => ParseRedisStreamEntry(item));
return result.GetItems().ToArray(
(in RawResult item, in StreamProcessorBase<T> obj) => obj.ParseRedisStreamEntry(item), this);
}
protected NameValueEntry[] ParseStreamEntryValues(in RawResult result)
......@@ -1819,17 +1803,17 @@ protected NameValueEntry[] ParseStreamEntryValues(in RawResult result)
var arr = result.GetItems();
// Calculate how many name/value pairs are in the stream entry.
int count = arr.Length / 2;
int count = (int)arr.Length / 2;
if (count == 0) return Array.Empty<NameValueEntry>();
var pairs = new NameValueEntry[count];
int offset = 0;
var iter = arr.GetEnumerator();
for (int i = 0; i < pairs.Length; i++)
{
pairs[i] = new NameValueEntry(arr[offset++].AsRedisValue(),
arr[offset++].AsRedisValue());
pairs[i] = new NameValueEntry(iter.GetNext().AsRedisValue(),
iter.GetNext().AsRedisValue());
}
return pairs;
......@@ -2008,14 +1992,12 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes
case ResultType.MultiBulk:
var arrayOfArrays = result.GetItems();
var returnArray = new KeyValuePair<string, string>[arrayOfArrays.Length][];
for (int i = 0; i < arrayOfArrays.Length; i++)
{
var rawInnerArray = arrayOfArrays[i];
innerProcessor.TryParse(rawInnerArray, out KeyValuePair<string, string>[] kvpArray);
returnArray[i] = kvpArray;
}
var returnArray = result.ToArray<KeyValuePair<string, string>[], StringPairInterleavedProcessor>(
(in RawResult rawInnerArray, in StringPairInterleavedProcessor proc) =>
{
proc.TryParse(rawInnerArray, out KeyValuePair<string, string>[] kvpArray);
return kvpArray;
}, innerProcessor);
SetResult(message, returnArray);
return true;
......
......@@ -15,7 +15,7 @@
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Pipelines.Sockets.Unofficial" Version="1.0.26" />
<PackageReference Include="Pipelines.Sockets.Unofficial" Version="1.1.3" />
<PackageReference Include="System.Diagnostics.PerformanceCounter" Version="4.5.0" />
<PackageReference Include="System.IO.Pipelines" Version="4.5.1" />
<PackageReference Include="System.Threading.Channels" Version="4.5.0" />
......
......@@ -12,7 +12,7 @@
</PropertyGroup>
<ItemGroup>
<PackageReference Include="BenchmarkDotNet" Version="0.11.1" />
<PackageReference Include="BenchmarkDotNet" Version="0.11.4" />
<ProjectReference Include="..\..\src\StackExchange.Redis\StackExchange.Redis.csproj" />
</ItemGroup>
......
......@@ -26,7 +26,7 @@ protected virtual Job Configure(Job j)
public CustomConfig()
{
Add(new MemoryDiagnoser());
Add(MemoryDiagnoser.Default);
Add(StatisticColumn.OperationsPerSecond);
Add(JitOptimizationsValidator.FailOnError);
......@@ -64,11 +64,18 @@ public void Setup()
connection = ConnectionMultiplexer.Connect(options);
db = connection.GetDatabase(3);
db.KeyDelete(GeoKey, CommandFlags.FireAndForget);
db.KeyDelete(GeoKey);
db.GeoAdd(GeoKey, 13.361389, 38.115556, "Palermo ");
db.GeoAdd(GeoKey, 15.087269, 37.502669, "Catania");
db.KeyDelete(HashKey);
for (int i = 0; i < 1000; i++)
{
db.HashSet(HashKey, i, i);
}
}
private static readonly RedisKey GeoKey = "GeoTest", IncrByKey = "counter", StringKey = "string";
private static readonly RedisKey GeoKey = "GeoTest", IncrByKey = "counter", StringKey = "string", HashKey = "hash";
void IDisposable.Dispose()
{
mgr?.Dispose();
......@@ -78,16 +85,13 @@ void IDisposable.Dispose()
connection = null;
}
private const int COUNT = 500;
private const int COUNT = 50;
/// <summary>
/// Run INCRBY lots of times
/// </summary>
#if TEST_BASELINE
// [Benchmark(Description = "INCRBY:v1/s", OperationsPerInvoke = COUNT)]
#else
// [Benchmark(Description = "INCRBY:v2/s", OperationsPerInvoke = COUNT)]
#endif
// [Benchmark(Description = "INCRBY/s", OperationsPerInvoke = COUNT)]
public int ExecuteIncrBy()
{
var rand = new Random(12345);
......@@ -108,11 +112,7 @@ public int ExecuteIncrBy()
/// <summary>
/// Run INCRBY lots of times
/// </summary>
#if TEST_BASELINE
// [Benchmark(Description = "INCRBY:v1/a", OperationsPerInvoke = COUNT)]
#else
// [Benchmark(Description = "INCRBY:v2/a", OperationsPerInvoke = COUNT)]
#endif
// [Benchmark(Description = "INCRBY/a", OperationsPerInvoke = COUNT)]
public async Task<int> ExecuteIncrByAsync()
{
var rand = new Random(12345);
......@@ -133,11 +133,7 @@ public async Task<int> ExecuteIncrByAsync()
/// <summary>
/// Run GEORADIUS lots of times
/// </summary>
#if TEST_BASELINE
// [Benchmark(Description = "GEORADIUS:v1/s", OperationsPerInvoke = COUNT)]
#else
// [Benchmark(Description = "GEORADIUS:v2/s", OperationsPerInvoke = COUNT)]
#endif
// [Benchmark(Description = "GEORADIUS/s", OperationsPerInvoke = COUNT)]
public int ExecuteGeoRadius()
{
int total = 0;
......@@ -153,11 +149,7 @@ public int ExecuteGeoRadius()
/// <summary>
/// Run GEORADIUS lots of times
/// </summary>
#if TEST_BASELINE
// [Benchmark(Description = "GEORADIUS:v1/a", OperationsPerInvoke = COUNT)]
#else
// [Benchmark(Description = "GEORADIUS:v2/a", OperationsPerInvoke = COUNT)]
#endif
// [Benchmark(Description = "GEORADIUS/a", OperationsPerInvoke = COUNT)]
public async Task<int> ExecuteGeoRadiusAsync()
{
int total = 0;
......@@ -173,11 +165,7 @@ public async Task<int> ExecuteGeoRadiusAsync()
/// <summary>
/// Run StringSet lots of times
/// </summary>
#if TEST_BASELINE
[Benchmark(Description = "StringSet:v1/a", OperationsPerInvoke = COUNT)]
#else
[Benchmark(Description = "StringSet:v2/a", OperationsPerInvoke = COUNT)]
#endif
[Benchmark(Description = "StringSet/s", OperationsPerInvoke = COUNT)]
public void StringSet()
{
for (int i = 0; i < COUNT; i++)
......@@ -189,11 +177,7 @@ public void StringSet()
/// <summary>
/// Run StringGet lots of times
/// </summary>
#if TEST_BASELINE
[Benchmark(Description = "StringGet:v1/a", OperationsPerInvoke = COUNT)]
#else
[Benchmark(Description = "StringGet:v2/a", OperationsPerInvoke = COUNT)]
#endif
[Benchmark(Description = "StringGet/s", OperationsPerInvoke = COUNT)]
public void StringGet()
{
for (int i = 0; i < COUNT; i++)
......@@ -201,6 +185,33 @@ public void StringGet()
db.StringGet(StringKey);
}
}
/// <summary>
/// Run HashGetAll lots of times
/// </summary>
[Benchmark(Description = "HashGetAll F+F/s", OperationsPerInvoke = COUNT)]
public void HashGetAll_FAF()
{
for (int i = 0; i < COUNT; i++)
{
db.HashGetAll(HashKey, CommandFlags.FireAndForget);
db.Ping(); // to wait for response
}
}
/// <summary>
/// Run HashGetAll lots of times
/// </summary>
[Benchmark(Description = "HashGetAll F+F/a", OperationsPerInvoke = COUNT)]
public async Task HashGetAllAsync_FAF()
{
for (int i = 0; i < COUNT; i++)
{
await db.HashGetAllAsync(HashKey, CommandFlags.FireAndForget);
await db.PingAsync(); // to wait for response
}
}
}
#pragma warning disable CS1591
......
......@@ -17,8 +17,8 @@
</ItemGroup>
<ItemGroup>
<PackageReference Include="BenchmarkDotNet" Version="0.11.1" />
<PackageReference Include="StackExchange.Redis" Version="[1.2.7-alpha-00002]" />
<PackageReference Include="BenchmarkDotNet" Version="0.11.4" />
<PackageReference Include="StackExchange.Redis" Version="[2.0.519]" />
</ItemGroup>
</Project>
......@@ -2,14 +2,15 @@
using System.Buffers;
using System.Collections.Generic;
using System.Text;
using Pipelines.Sockets.Unofficial.Arenas;
using Xunit;
using Xunit.Abstractions;
namespace StackExchange.Redis.Tests
{
public class Parse : TestBase
public class ParseTests : TestBase
{
public Parse(ITestOutputHelper output) : base(output) { }
public ParseTests(ITestOutputHelper output) : base(output) { }
public static IEnumerable<object[]> GetTestData()
{
......@@ -34,7 +35,10 @@ public static IEnumerable<object[]> GetTestData()
public void ParseAsSingleChunk(string ascii, int expected)
{
var buffer = new ReadOnlySequence<byte>(Encoding.ASCII.GetBytes(ascii));
ProcessMessages(buffer, expected);
using (var arena = new Arena<RawResult>())
{
ProcessMessages(arena, buffer, expected);
}
}
[Theory]
......@@ -58,16 +62,19 @@ public void ParseAsLotsOfChunks(string ascii, int expected)
}
var buffer = new ReadOnlySequence<byte>(chain, 0, tail, 1);
Assert.Equal(bytes.Length, buffer.Length);
ProcessMessages(buffer, expected);
using (var arena = new Arena<RawResult>())
{
ProcessMessages(arena, buffer, expected);
}
}
private void ProcessMessages(ReadOnlySequence<byte> buffer, int expected)
private void ProcessMessages(Arena<RawResult> arena, ReadOnlySequence<byte> buffer, int expected)
{
Writer.WriteLine($"chain: {buffer.Length}");
var reader = new BufferReader(buffer);
RawResult result;
int found = 0;
while (!(result = PhysicalConnection.TryParseResult(buffer, ref reader, false, null, false)).IsNull)
while (!(result = PhysicalConnection.TryParseResult(arena, buffer, ref reader, false, null, false)).IsNull)
{
Writer.WriteLine($"{result} - {result.GetString()}");
found++;
......
......@@ -14,6 +14,8 @@ public class TestInfoReplicationChecks : TestBase
[Fact]
public async Task Exec()
{
Skip.Inconclusive("need to think about CompletedSynchronously");
using(var conn = Create())
{
var parsed = ConfigurationOptions.Parse(conn.Configuration);
......
......@@ -34,8 +34,6 @@ internal RedisRequest(in RawResult result)
Count = result.ItemsCount;
}
internal void Recycle() => _inner.Recycle();
public RedisValue GetValue(int index)
=> _inner[index].AsRedisValue();
......
......@@ -10,6 +10,7 @@
using System.Threading;
using System.Threading.Tasks;
using Pipelines.Sockets.Unofficial;
using Pipelines.Sockets.Unofficial.Arenas;
namespace StackExchange.Redis.Server
{
......@@ -241,6 +242,7 @@ protected void DoShutdown(ShutdownReason reason)
public void Dispose() => Dispose(true);
protected virtual void Dispose(bool disposing)
{
_arena.Dispose();
DoShutdown(ShutdownReason.ServerDisposed);
}
......@@ -363,10 +365,11 @@ void WritePrefix(PipeWriter ooutput, char pprefix)
}
await output.FlushAsync().ConfigureAwait(false);
}
public static bool TryParseRequest(ref ReadOnlySequence<byte> buffer, out RedisRequest request)
private static bool TryParseRequest(Arena<RawResult> arena, ref ReadOnlySequence<byte> buffer, out RedisRequest request)
{
var reader = new BufferReader(buffer);
var raw = PhysicalConnection.TryParseResult(in buffer, ref reader, false, null, true);
var raw = PhysicalConnection.TryParseResult(arena, in buffer, ref reader, false, null, true);
if (raw.HasValue)
{
buffer = reader.SliceFromCurrent();
......@@ -377,6 +380,9 @@ public static bool TryParseRequest(ref ReadOnlySequence<byte> buffer, out RedisR
return false;
}
private readonly Arena<RawResult> _arena = new Arena<RawResult>();
public ValueTask<bool> TryProcessRequestAsync(ref ReadOnlySequence<byte> buffer, RedisClient client, PipeWriter output)
{
async ValueTask<bool> Awaited(ValueTask wwrite, TypedRedisValue rresponse)
......@@ -385,11 +391,11 @@ async ValueTask<bool> Awaited(ValueTask wwrite, TypedRedisValue rresponse)
rresponse.Recycle();
return true;
}
if (!buffer.IsEmpty && TryParseRequest(ref buffer, out var request))
if (!buffer.IsEmpty && TryParseRequest(_arena, ref buffer, out var request))
{
TypedRedisValue response;
try { response = Execute(client, request); }
finally { request.Recycle(); }
finally { _arena.Reset(); }
var write = WriteResponseAsync(client, output, response);
if (!write.IsCompletedSuccessfully) return Awaited(write, response);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment