Commit 129ec274 authored by Marc Gravell's avatar Marc Gravell

we were actually allocating a quite alarming number of encoders; use [ThreadStatic] instead

parent 76efc9c6
...@@ -290,22 +290,7 @@ public void Log(string message) ...@@ -290,22 +290,7 @@ public void Log(string message)
} }
} }
static Encoder s_sharedEncoder; // swapped in/out to avoid alloc on the public WriteResponse API public static async ValueTask WriteResponseAsync(RedisClient client, PipeWriter output, TypedRedisValue value)
public static ValueTask WriteResponseAsync(RedisClient client, PipeWriter output, TypedRedisValue value)
{
async ValueTask Awaited(ValueTask wwrite, Encoder eenc)
{
await wwrite;
Interlocked.Exchange(ref s_sharedEncoder, eenc);
}
var enc = Interlocked.Exchange(ref s_sharedEncoder, null) ?? Encoding.UTF8.GetEncoder();
var write = WriteResponseAsync(client, output, value, enc);
if (!write.IsCompletedSuccessfully) return Awaited(write, enc);
Interlocked.Exchange(ref s_sharedEncoder, enc);
return default;
}
internal static async ValueTask WriteResponseAsync(RedisClient client, PipeWriter output, TypedRedisValue value, Encoder encoder)
{ {
void WritePrefix(PipeWriter ooutput, char pprefix) void WritePrefix(PipeWriter ooutput, char pprefix)
{ {
...@@ -331,11 +316,11 @@ void WritePrefix(PipeWriter ooutput, char pprefix) ...@@ -331,11 +316,11 @@ void WritePrefix(PipeWriter ooutput, char pprefix)
WritePrefix(output, prefix); WritePrefix(output, prefix);
var val = (string)value.AsRedisValue(); var val = (string)value.AsRedisValue();
var expectedLength = Encoding.UTF8.GetByteCount(val); var expectedLength = Encoding.UTF8.GetByteCount(val);
PhysicalConnection.WriteRaw(output, val, expectedLength, encoder); PhysicalConnection.WriteRaw(output, val, expectedLength);
PhysicalConnection.WriteCrlf(output); PhysicalConnection.WriteCrlf(output);
break; break;
case ResultType.BulkString: case ResultType.BulkString:
PhysicalConnection.WriteBulkString(value.AsRedisValue(), output, encoder); PhysicalConnection.WriteBulkString(value.AsRedisValue(), output);
break; break;
case ResultType.MultiBulk: case ResultType.MultiBulk:
if (value.IsNullArray) if (value.IsNullArray)
...@@ -355,7 +340,7 @@ void WritePrefix(PipeWriter ooutput, char pprefix) ...@@ -355,7 +340,7 @@ void WritePrefix(PipeWriter ooutput, char pprefix)
throw new InvalidOperationException("Array element cannot be nil, index " + i); throw new InvalidOperationException("Array element cannot be nil, index " + i);
// note: don't pass client down; this would impact SkipReplies // note: don't pass client down; this would impact SkipReplies
await WriteResponseAsync(null, output, item, encoder); await WriteResponseAsync(null, output, item);
} }
} }
break; break;
......
...@@ -571,7 +571,7 @@ internal void Write(RedisKey key) ...@@ -571,7 +571,7 @@ internal void Write(RedisKey key)
var val = key.KeyValue; var val = key.KeyValue;
if (val is string s) if (val is string s)
{ {
WriteUnifiedPrefixedString(_ioPipe.Output, key.KeyPrefix, s, outEncoder); WriteUnifiedPrefixedString(_ioPipe.Output, key.KeyPrefix, s);
} }
else else
{ {
...@@ -584,8 +584,8 @@ internal void Write(RedisChannel channel) ...@@ -584,8 +584,8 @@ internal void Write(RedisChannel channel)
[MethodImpl(MethodImplOptions.AggressiveInlining)] [MethodImpl(MethodImplOptions.AggressiveInlining)]
internal void WriteBulkString(RedisValue value) internal void WriteBulkString(RedisValue value)
=> WriteBulkString(value, _ioPipe.Output, outEncoder); => WriteBulkString(value, _ioPipe.Output);
internal static void WriteBulkString(RedisValue value, PipeWriter output, Encoder outEncoder) internal static void WriteBulkString(RedisValue value, PipeWriter output)
{ {
switch (value.Type) switch (value.Type)
{ {
...@@ -597,7 +597,7 @@ internal static void WriteBulkString(RedisValue value, PipeWriter output, Encode ...@@ -597,7 +597,7 @@ internal static void WriteBulkString(RedisValue value, PipeWriter output, Encode
break; break;
case RedisValue.StorageType.Double: // use string case RedisValue.StorageType.Double: // use string
case RedisValue.StorageType.String: case RedisValue.StorageType.String:
WriteUnifiedPrefixedString(output, null, (string)value, outEncoder); WriteUnifiedPrefixedString(output, null, (string)value);
break; break;
case RedisValue.StorageType.Raw: case RedisValue.StorageType.Raw:
WriteUnifiedSpan(output, ((ReadOnlyMemory<byte>)value).Span); WriteUnifiedSpan(output, ((ReadOnlyMemory<byte>)value).Span);
...@@ -901,7 +901,7 @@ internal static byte ToHexNibble(int value) ...@@ -901,7 +901,7 @@ internal static byte ToHexNibble(int value)
return value < 10 ? (byte)('0' + value) : (byte)('a' - 10 + value); return value < 10 ? (byte)('0' + value) : (byte)('a' - 10 + value);
} }
internal static void WriteUnifiedPrefixedString(PipeWriter writer, byte[] prefix, string value, Encoder outEncoder) internal static void WriteUnifiedPrefixedString(PipeWriter writer, byte[] prefix, string value)
{ {
if (value == null) if (value == null)
{ {
...@@ -929,13 +929,29 @@ internal static void WriteUnifiedPrefixedString(PipeWriter writer, byte[] prefix ...@@ -929,13 +929,29 @@ internal static void WriteUnifiedPrefixedString(PipeWriter writer, byte[] prefix
writer.Advance(bytes); writer.Advance(bytes);
if (prefixLength != 0) writer.Write(prefix); if (prefixLength != 0) writer.Write(prefix);
if (encodedLength != 0) WriteRaw(writer, value, encodedLength, outEncoder); if (encodedLength != 0) WriteRaw(writer, value, encodedLength);
WriteCrlf(writer); WriteCrlf(writer);
} }
} }
} }
unsafe static internal void WriteRaw(PipeWriter writer, string value, int expectedLength, Encoder outEncoder) [ThreadStatic]
static Encoder s_PerThreadEncoder;
static Encoder GetPerThreadEncoder()
{
var encoder = s_PerThreadEncoder;
if(encoder == null)
{
s_PerThreadEncoder = encoder = Encoding.UTF8.GetEncoder();
}
else
{
encoder.Reset();
}
return encoder;
}
unsafe static internal void WriteRaw(PipeWriter writer, string value, int expectedLength)
{ {
const int MaxQuickEncodeSize = 512; const int MaxQuickEncodeSize = 512;
...@@ -955,7 +971,7 @@ unsafe static internal void WriteRaw(PipeWriter writer, string value, int expect ...@@ -955,7 +971,7 @@ unsafe static internal void WriteRaw(PipeWriter writer, string value, int expect
else else
{ {
// use an encoder in a loop // use an encoder in a loop
outEncoder.Reset(); var encoder = GetPerThreadEncoder();
int charsRemaining = value.Length, charOffset = 0; int charsRemaining = value.Length, charOffset = 0;
totalBytes = 0; totalBytes = 0;
...@@ -968,7 +984,7 @@ unsafe static internal void WriteRaw(PipeWriter writer, string value, int expect ...@@ -968,7 +984,7 @@ unsafe static internal void WriteRaw(PipeWriter writer, string value, int expect
bool completed; bool completed;
fixed (byte* bPtr = &MemoryMarshal.GetReference(span)) fixed (byte* bPtr = &MemoryMarshal.GetReference(span))
{ {
outEncoder.Convert(cPtr + charOffset, charsRemaining, bPtr, span.Length, final, out charsUsed, out bytesUsed, out completed); encoder.Convert(cPtr + charOffset, charsRemaining, bPtr, span.Length, final, out charsUsed, out bytesUsed, out completed);
} }
writer.Advance(bytesUsed); writer.Advance(bytesUsed);
totalBytes += bytesUsed; totalBytes += bytesUsed;
...@@ -988,8 +1004,6 @@ unsafe static internal void WriteRaw(PipeWriter writer, string value, int expect ...@@ -988,8 +1004,6 @@ unsafe static internal void WriteRaw(PipeWriter writer, string value, int expect
} }
} }
private readonly Encoder outEncoder = Encoding.UTF8.GetEncoder();
private static void WriteUnifiedPrefixedBlob(PipeWriter writer, byte[] prefix, byte[] value) private static void WriteUnifiedPrefixedBlob(PipeWriter writer, byte[] prefix, byte[] value)
{ {
// ${total-len}\r\n // ${total-len}\r\n
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment