Commit 420a024f authored by Marc Gravell's avatar Marc Gravell

use pooled/recycled buffers for RawResult sub-items; these are never exposed...

use pooled/recycled buffers for RawResult sub-items; these are never exposed outside of the framing process, so lifetime is not an issue
parent 9a0f258e
...@@ -5,6 +5,12 @@ namespace StackExchange.Redis.Tests ...@@ -5,6 +5,12 @@ namespace StackExchange.Redis.Tests
{ {
public class RawResultTests public class RawResultTests
{ {
[Fact]
public void TypeLoads()
{
var type = typeof(RawResult);
Assert.Equal(nameof(RawResult), type.Name);
}
[Fact] [Fact]
public void NullWorks() public void NullWorks()
{ {
......
...@@ -16,7 +16,6 @@ ...@@ -16,7 +16,6 @@
</PropertyGroup> </PropertyGroup>
<ItemGroup> <ItemGroup>
<PackageReference Include="System.Numerics.Vectors" Version="4.5.0" />
<PackageReference Include="System.Reflection.Emit.Lightweight" Version="4.3.0" /> <PackageReference Include="System.Reflection.Emit.Lightweight" Version="4.3.0" />
<PackageReference Include="System.IO.Pipelines" Version="$(CoreFxVersion)" /> <PackageReference Include="System.IO.Pipelines" Version="$(CoreFxVersion)" />
<PackageReference Include="System.Diagnostics.PerformanceCounter" Version="$(CoreFxVersion)" /> <PackageReference Include="System.Diagnostics.PerformanceCounter" Version="$(CoreFxVersion)" />
......
...@@ -206,31 +206,5 @@ internal static EndPoint TryParseEndPoint(string endpoint) ...@@ -206,31 +206,5 @@ internal static EndPoint TryParseEndPoint(string endpoint)
return ParseEndPoint(host, port); return ParseEndPoint(host, port);
} }
static readonly Vector<ushort> NonAsciiMask = new Vector<ushort>(0xFF80);
internal static unsafe int GetEncodedLength(string value)
{
if (value.Length == 0) return 0;
int offset = 0;
if (Vector.IsHardwareAccelerated && value.Length >= Vector<ushort>.Count)
{
var vecSpan = MemoryMarshal.Cast<char, Vector<ushort>>(value.AsSpan());
var nonAscii = NonAsciiMask;
int i;
for (i = 0; i < vecSpan.Length; i++)
{
if ((vecSpan[i] & nonAscii) != Vector<ushort>.Zero) break;
}
offset = Vector<ushort>.Count * i;
}
int remaining = value.Length - offset;
if (remaining == 0) return offset; // all ASCII (nice round length, and Vector support)
// handles a) no Vector support, b) anything from the fisrt non-ASCII chunk, c) tail end
fixed (char* ptr = value)
{
return offset + Encoding.UTF8.GetByteCount(ptr + offset, remaining);
}
}
} }
} }
...@@ -40,8 +40,15 @@ public LoggingPipe(IDuplexPipe inner, string inPath, string outPath) ...@@ -40,8 +40,15 @@ public LoggingPipe(IDuplexPipe inner, string inPath, string outPath)
private async void CloneAsync(string path, PipeReader from, PipeWriter to) private async void CloneAsync(string path, PipeReader from, PipeWriter to)
{ {
to.OnReaderCompleted((ex, o) => ((PipeReader)o).Complete(ex), from); to.OnReaderCompleted((ex, o) => {
from.OnWriterCompleted((ex, o) => ((PipeWriter)o).Complete(ex), to); if (ex != null) Console.Error.WriteLine(ex);
((PipeReader)o).Complete(ex);
}, from);
from.OnWriterCompleted((ex, o) =>
{
if (ex != null) Console.Error.WriteLine(ex);
((PipeWriter)o).Complete(ex);
}, to);
while(true) while(true)
{ {
var result = await from.ReadAsync(); var result = await from.ReadAsync();
......
...@@ -722,7 +722,7 @@ private void WriteUnified(PipeWriter writer, byte[] prefix, string value) ...@@ -722,7 +722,7 @@ private void WriteUnified(PipeWriter writer, byte[] prefix, string value)
{ {
// ${total-len}\r\n 3 + MaxInt32TextLen // ${total-len}\r\n 3 + MaxInt32TextLen
// {prefix}{value}\r\n // {prefix}{value}\r\n
int encodedLength = Format.GetEncodedLength(value), int encodedLength = Encoding.UTF8.GetByteCount(value),
prefixLength = prefix == null ? 0 : prefix.Length, prefixLength = prefix == null ? 0 : prefix.Length,
totalLength = prefixLength + encodedLength; totalLength = prefixLength + encodedLength;
...@@ -1045,18 +1045,24 @@ private int ProcessBuffer(ref ReadOnlySequence<byte> buffer) ...@@ -1045,18 +1045,24 @@ private int ProcessBuffer(ref ReadOnlySequence<byte> buffer)
{ {
var reader = new BufferReader(buffer); var reader = new BufferReader(buffer);
var result = TryParseResult(in buffer, ref reader); var result = TryParseResult(in buffer, ref reader);
try
if (result.HasValue)
{ {
buffer = buffer.Slice(reader.TotalConsumed); if (result.HasValue)
{
buffer = buffer.Slice(reader.TotalConsumed);
messageCount++; messageCount++;
Multiplexer.Trace(result.ToString(), physicalName); Multiplexer.Trace(result.ToString(), physicalName);
MatchResult(result); MatchResult(result);
}
else
{
break; // remaining buffer isn't enough; give up
}
} }
else finally
{ {
break; // remaining buffer isn't enough; give up result.Recycle();
} }
} }
return messageCount; return messageCount;
...@@ -1114,13 +1120,17 @@ private RawResult ReadArray(in ReadOnlySequence<byte> buffer, ref BufferReader r ...@@ -1114,13 +1120,17 @@ private RawResult ReadArray(in ReadOnlySequence<byte> buffer, ref BufferReader r
return RawResult.EmptyMultiBulk; return RawResult.EmptyMultiBulk;
} }
var arr = new RawResult[itemCountActual]; var oversized = ArrayPool<RawResult>.Shared.Rent(itemCountActual);
var result = new RawResult(oversized, itemCountActual);
for (int i = 0; i < itemCountActual; i++) for (int i = 0; i < itemCountActual; i++)
{ {
if (!(arr[i] = TryParseResult(in buffer, ref reader)).HasValue) if (!(oversized[i] = TryParseResult(in buffer, ref reader)).HasValue)
{
result.Recycle(i); // passing index here means we don't need to "Array.Clear" before-hand
return RawResult.Nil; return RawResult.Nil;
}
} }
return new RawResult(arr); return result;
} }
return RawResult.Nil; return RawResult.Nil;
} }
......
...@@ -6,13 +6,15 @@ namespace StackExchange.Redis ...@@ -6,13 +6,15 @@ namespace StackExchange.Redis
{ {
internal readonly struct RawResult internal readonly struct RawResult
{ {
internal static readonly RawResult NullMultiBulk = new RawResult((RawResult[])null); internal static readonly RawResult NullMultiBulk = new RawResult(null, 0);
internal static readonly RawResult EmptyMultiBulk = new RawResult(Array.Empty<RawResult>()); internal static readonly RawResult EmptyMultiBulk = new RawResult(Array.Empty<RawResult>(), 0);
internal static readonly RawResult Nil = default; internal static readonly RawResult Nil = default;
private readonly ReadOnlySequence<byte> _payload; private readonly ReadOnlySequence<byte> _payload;
private readonly RawResult[] _subArray; // note: can't use Memory<RawResult> here - struct recursion breaks runtimr
private readonly RawResult[] _itemsOversized;
private readonly int _itemsCount;
private readonly ResultType _type; private readonly ResultType _type;
const ResultType NonNullFlag = (ResultType)128; const ResultType NonNullFlag = (ResultType)128;
...@@ -32,15 +34,17 @@ public RawResult(ResultType resultType, ReadOnlySequence<byte> payload, bool isN ...@@ -32,15 +34,17 @@ public RawResult(ResultType resultType, ReadOnlySequence<byte> payload, bool isN
if (!isNull) resultType |= NonNullFlag; if (!isNull) resultType |= NonNullFlag;
_type = resultType; _type = resultType;
_payload = payload; _payload = payload;
_subArray = default; _itemsOversized = default;
_itemsCount = default;
} }
public RawResult(RawResult[] arr) public RawResult(RawResult[] itemsOversized, int itemCount)
{ {
_type = ResultType.MultiBulk; _type = ResultType.MultiBulk;
if (arr != null) _type |= NonNullFlag; if (itemsOversized != null) _type |= NonNullFlag;
_payload = default; _payload = default;
_subArray = arr; _itemsOversized = itemsOversized;
_itemsCount = itemCount;
} }
public bool IsError => Type == ResultType.Error; public bool IsError => Type == ResultType.Error;
...@@ -62,7 +66,7 @@ public override string ToString() ...@@ -62,7 +66,7 @@ public override string ToString()
case ResultType.BulkString: case ResultType.BulkString:
return $"{Type}: {_payload.Length} bytes"; return $"{Type}: {_payload.Length} bytes";
case ResultType.MultiBulk: case ResultType.MultiBulk:
return $"{Type}: {_subArray.Length} items"; return $"{Type}: {_itemsCount} items";
default: default:
return $"(unknown: {Type})"; return $"(unknown: {Type})";
} }
...@@ -118,6 +122,21 @@ internal RedisValue AsRedisValue() ...@@ -118,6 +122,21 @@ internal RedisValue AsRedisValue()
throw new InvalidCastException("Cannot convert to RedisValue: " + Type); throw new InvalidCastException("Cannot convert to RedisValue: " + Type);
} }
public void Recycle(int limit = -1)
{
var arr = _itemsOversized;
if (limit < 0) limit = _itemsCount;
if (arr != null)
{
for (int i = 0; i < limit; i++)
{
arr[i].Recycle();
}
}
if(_itemsOversized != null)
ArrayPool<RawResult>.Shared.Return(_itemsOversized, clearArray: false);
}
internal unsafe bool IsEqual(byte[] expected) internal unsafe bool IsEqual(byte[] expected)
{ {
if (expected == null) throw new ArgumentNullException(nameof(expected)); if (expected == null) throw new ArgumentNullException(nameof(expected));
...@@ -179,16 +198,17 @@ internal bool GetBoolean() ...@@ -179,16 +198,17 @@ internal bool GetBoolean()
} }
} }
internal RawResult[] GetItems() internal ReadOnlySpan<RawResult> GetItems()
{ {
if (Type == ResultType.MultiBulk) return _subArray; if (Type == ResultType.MultiBulk)
return new ReadOnlySpan<RawResult>(_itemsOversized, 0, _itemsCount);
throw new InvalidOperationException(); throw new InvalidOperationException();
} }
internal RedisKey[] GetItemsAsKeys() internal RedisKey[] GetItemsAsKeys()
{ {
RawResult[] items = GetItems(); var items = GetItems();
if (items == null) if (IsNull)
{ {
return null; return null;
} }
...@@ -209,8 +229,8 @@ internal RedisKey[] GetItemsAsKeys() ...@@ -209,8 +229,8 @@ internal RedisKey[] GetItemsAsKeys()
internal RedisValue[] GetItemsAsValues() internal RedisValue[] GetItemsAsValues()
{ {
RawResult[] items = GetItems(); var items = GetItems();
if (items == null) if (IsNull)
{ {
return null; return null;
} }
...@@ -232,8 +252,8 @@ internal RedisValue[] GetItemsAsValues() ...@@ -232,8 +252,8 @@ internal RedisValue[] GetItemsAsValues()
private static readonly string[] NilStrings = new string[0]; private static readonly string[] NilStrings = new string[0];
internal string[] GetItemsAsStrings() internal string[] GetItemsAsStrings()
{ {
RawResult[] items = GetItems(); var items = GetItems();
if (items == null) if (IsNull)
{ {
return null; return null;
} }
...@@ -254,14 +274,14 @@ internal string[] GetItemsAsStrings() ...@@ -254,14 +274,14 @@ internal string[] GetItemsAsStrings()
internal GeoPosition? GetItemsAsGeoPosition() internal GeoPosition? GetItemsAsGeoPosition()
{ {
RawResult[] items = GetItems(); var items = GetItems();
if (items == null || items.Length == 0) if (IsNull || items.Length == 0)
{ {
return null; return null;
} }
var coords = items[0].GetItems(); var coords = items[0].GetItems();
if (coords == null) if (items[0].IsNull)
{ {
return null; return null;
} }
...@@ -270,8 +290,8 @@ internal string[] GetItemsAsStrings() ...@@ -270,8 +290,8 @@ internal string[] GetItemsAsStrings()
internal GeoPosition?[] GetItemsAsGeoPositionArray() internal GeoPosition?[] GetItemsAsGeoPositionArray()
{ {
RawResult[] items = GetItems(); var items = GetItems();
if (items == null) if (IsNull)
{ {
return null; return null;
} }
...@@ -284,8 +304,8 @@ internal string[] GetItemsAsStrings() ...@@ -284,8 +304,8 @@ internal string[] GetItemsAsStrings()
var arr = new GeoPosition?[items.Length]; var arr = new GeoPosition?[items.Length];
for (int i = 0; i < arr.Length; i++) for (int i = 0; i < arr.Length; i++)
{ {
RawResult[] item = items[i].GetItems(); var item = items[i].GetItems();
if (item == null) if (items[i].IsNull)
{ {
arr[i] = null; arr[i] = null;
} }
...@@ -297,9 +317,6 @@ internal string[] GetItemsAsStrings() ...@@ -297,9 +317,6 @@ internal string[] GetItemsAsStrings()
return arr; return arr;
} }
} }
internal RawResult[] GetItemsAsRawResults() => GetItems();
internal unsafe string GetString() internal unsafe string GetString()
{ {
if (IsNull) return null; if (IsNull) return null;
......
...@@ -255,7 +255,7 @@ internal static byte[] ConcatenateBytes(byte[] a, object b, byte[] c) ...@@ -255,7 +255,7 @@ internal static byte[] ConcatenateBytes(byte[] a, object b, byte[] c)
int aLen = a?.Length ?? 0, int aLen = a?.Length ?? 0,
bLen = b == null ? 0 : (b is string bLen = b == null ? 0 : (b is string
? Format.GetEncodedLength((string)b) ? Encoding.UTF8.GetByteCount((string)b)
: ((byte[])b).Length), : ((byte[])b).Length),
cLen = c?.Length ?? 0; cLen = c?.Length ?? 0;
......
...@@ -29,7 +29,7 @@ internal static RedisResult TryCreate(PhysicalConnection connection, RawResult r ...@@ -29,7 +29,7 @@ internal static RedisResult TryCreate(PhysicalConnection connection, RawResult r
return new SingleRedisResult(result.AsRedisValue()); return new SingleRedisResult(result.AsRedisValue());
case ResultType.MultiBulk: case ResultType.MultiBulk:
var items = result.GetItems(); var items = result.GetItems();
var arr = new RedisResult[items.Length]; var arr = result.IsNull ? null : new RedisResult[items.Length];
for (int i = 0; i < arr.Length; i++) for (int i = 0; i < arr.Length; i++)
{ {
var next = TryCreate(connection, items[i]); var next = TryCreate(connection, items[i]);
......
...@@ -423,7 +423,7 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes ...@@ -423,7 +423,7 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes
if (!tran.IsAborted) if (!tran.IsAborted)
{ {
var arr = result.GetItems(); var arr = result.GetItems();
if (arr == null) if (result.IsNull)
{ {
connection.Multiplexer.Trace("Server aborted due to failed WATCH"); connection.Multiplexer.Trace("Server aborted due to failed WATCH");
foreach (var op in wrapped) foreach (var op in wrapped)
......
...@@ -487,7 +487,7 @@ public bool TryParse(RawResult result, out T[] pairs) ...@@ -487,7 +487,7 @@ public bool TryParse(RawResult result, out T[] pairs)
{ {
case ResultType.MultiBulk: case ResultType.MultiBulk:
var arr = result.GetItems(); var arr = result.GetItems();
if (arr == null) if (result.IsNull)
{ {
pairs = null; pairs = null;
} }
...@@ -962,7 +962,7 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes ...@@ -962,7 +962,7 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes
if (result.Type == ResultType.MultiBulk) if (result.Type == ResultType.MultiBulk)
{ {
var arr = result.GetItems(); var arr = result.GetItems();
if (arr?.Length == 2 && arr[1].TryGetInt64(out long val)) if (arr.Length == 2 && arr[1].TryGetInt64(out long val))
{ {
SetResult(message, val); SetResult(message, val);
return true; return true;
...@@ -1201,10 +1201,10 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes ...@@ -1201,10 +1201,10 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes
switch (result.Type) switch (result.Type)
{ {
case ResultType.MultiBulk: case ResultType.MultiBulk:
var arr = result.GetItemsAsRawResults(); var arr = result.GetItems();
GeoRadiusResult[] typed; GeoRadiusResult[] typed;
if (arr == null) if (result.IsNull)
{ {
typed = null; typed = null;
} }
......
...@@ -36,7 +36,8 @@ static int Main() ...@@ -36,7 +36,8 @@ static int Main()
finally finally
{ {
Console.WriteLine(); Console.WriteLine();
//Console.WriteLine(s); // Console.WriteLine(s);
Console.ReadKey();
} }
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment