Commit 996b182f authored by Marc Gravell's avatar Marc Gravell

pipeline reading code; completely non-compiling right now

parent 6af1811b
......@@ -10,6 +10,7 @@
<OutputTypeEx>Library</OutputTypeEx>
<SignAssembly>true</SignAssembly>
<PublicSign Condition=" '$(OS)' != 'Windows_NT' ">true</PublicSign>
<LangVersion>7.2</LangVersion>
</PropertyGroup>
<ItemGroup Condition=" '$(TargetFramework)' == 'net45' OR '$(TargetFramework)' == 'net46' ">
......
using System;
using System.Buffers;
using System.Buffers.Text;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
......@@ -478,7 +479,8 @@ private void WriteHeader(byte[] commandBytes, int arguments)
_ioPipe.Output.Advance(offset);
}
const int MaxInt32TextLen = 11, // -2,147,483,648 (not including the commas)
internal const int
MaxInt32TextLen = 11, // -2,147,483,648 (not including the commas)
MaxInt64TextLen = 20; // -9,223,372,036,854,775,808 (not including the commas)
[MethodImpl(MethodImplOptions.AggressiveInlining)]
......@@ -556,23 +558,36 @@ private static int WriteRaw(Span<byte> span, long value, bool withLengthPrefix =
}
else
{
unsafe
// we're going to write it, but *to the wrong place*
var availableChunk = span.Slice(offset);
if (!Utf8Formatter.TryFormat(value, availableChunk, out int formattedLength))
{
byte* bytes = stackalloc byte[MaxInt32TextLen];
var s = Format.ToString(value); // need an alloc-free version of this...
int len;
fixed (char* c = s)
throw new InvalidOperationException("TryFormat failed");
}
if (withLengthPrefix)
{
// now we know how large the prefix is: write the prefix, then write the value
if (!Utf8Formatter.TryFormat(formattedLength, availableChunk, out int prefixLength))
{
len = Encoding.ASCII.GetBytes(c, s.Length, bytes, MaxInt32TextLen);
throw new InvalidOperationException("TryFormat failed");
}
if (withLengthPrefix)
offset += prefixLength;
offset = WriteCrlf(span, offset);
availableChunk = span.Slice(offset);
if (!Utf8Formatter.TryFormat(value, availableChunk, out int finalLength))
{
offset = WriteRaw(span, len, false, offset);
throw new InvalidOperationException("TryFormat failed");
}
new ReadOnlySpan<byte>(bytes, len).CopyTo(span.Slice(offset));
offset += len;
offset += finalLength;
Debug.Assert(finalLength == formattedLength);
}
else
{
offset += formattedLength;
}
}
return WriteCrlf(span, offset);
}
......@@ -784,30 +799,7 @@ private static void WriteUnified(PipeWriter writer, long value)
var bytes = WriteRaw(span, value, withLengthPrefix: true, offset: 1);
writer.Advance(bytes);
}
private void BeginReading()
{
bool keepReading;
try
{
do
{
keepReading = false;
int space = EnsureSpaceAndComputeBytesToRead();
Multiplexer.Trace("Beginning async read...", physicalName);
var result = netStream.BeginRead(ioBuffer, ioBufferBytes, space, endRead, this);
if (result.CompletedSynchronously)
{
Multiplexer.Trace("Completed synchronously: processing immediately", physicalName);
keepReading = EndReading(result);
}
} while (keepReading);
}
catch (IOException ex)
{
Multiplexer.Trace("Could not connect: " + ex.Message, physicalName);
}
}
private int haveReader;
......@@ -897,31 +889,6 @@ async ValueTask<SocketMode> ISocketCallback.ConnectedAsync(IDuplexPipe pipe, Tex
}
}
private bool EndReading(IAsyncResult result)
{
try
{
int bytesRead = netStream?.EndRead(result) ?? 0;
return ProcessReadBytes(bytesRead);
}
catch (Exception ex)
{
RecordConnectionFailed(ConnectionFailureType.InternalFailure, ex);
return false;
}
}
private int EnsureSpaceAndComputeBytesToRead()
{
int space = ioBuffer.Length - ioBufferBytes;
if (space == 0)
{
Array.Resize(ref ioBuffer, ioBuffer.Length * 2);
space = ioBuffer.Length - ioBufferBytes;
}
return space;
}
void ISocketCallback.Error()
{
RecordConnectionFailed(ConnectionFailureType.SocketFailure);
......@@ -1007,87 +974,82 @@ void ISocketCallback.OnHeartbeat()
}
partial void OnWrapForLogging(ref IDuplexPipe pipe, string name);
private int ProcessBuffer(byte[] underlying, ref int offset, ref int count)
private async Task ReadFromPipe()
{
int messageCount = 0;
RawResult result;
do
try
{
int tmpOffset = offset, tmpCount = count;
// we want TryParseResult to be able to mess with these without consequence
result = TryParseResult(underlying, ref tmpOffset, ref tmpCount);
if (result.HasValue)
while (true)
{
messageCount++;
// entire message: update the external counters
offset = tmpOffset;
count = tmpCount;
var input = _ioPipe.Input;
var readResult = await input.ReadAsync();
if (readResult.IsCompleted && readResult.Buffer.IsEmpty)
{
break; // we're all done
}
var buffer = readResult.Buffer;
Multiplexer.Trace(result.ToString(), physicalName);
MatchResult(result);
int handled = ProcessBuffer(ref buffer);
Multiplexer.Trace($"Processed {handled} messages", physicalName);
input.AdvanceTo(buffer.Start, buffer.End);
}
} while (result.HasValue);
return messageCount;
}
private bool ProcessReadBytes(int bytesRead)
{
if (bytesRead <= 0)
{
Multiplexer.Trace("EOF", physicalName);
RecordConnectionFailed(ConnectionFailureType.SocketClosed);
return false;
}
Interlocked.Exchange(ref lastReadTickCount, Environment.TickCount);
// reset unanswered write timestamp
Thread.VolatileWrite(ref firstUnansweredWriteTickCount, 0);
ioBufferBytes += bytesRead;
Multiplexer.Trace("More bytes available: " + bytesRead + " (" + ioBufferBytes + ")", physicalName);
int offset = 0, count = ioBufferBytes;
int handled = ProcessBuffer(ioBuffer, ref offset, ref count);
Multiplexer.Trace("Processed: " + handled, physicalName);
if (handled != 0)
catch (Exception ex)
{
// read stuff
if (count != 0)
{
Multiplexer.Trace("Copying remaining bytes: " + count, physicalName);
// if anything was left over, we need to copy it to
// the start of the buffer so it can be used next time
Buffer.BlockCopy(ioBuffer, offset, ioBuffer, 0, count);
}
ioBufferBytes = count;
Multiplexer.Trace("Faulted", physicalName);
RecordConnectionFailed(ConnectionFailureType.InternalFailure, ex);
}
return true;
}
void ISocketCallback.Read()
private int ProcessBuffer(ref ReadOnlySequence<byte> buffer)
{
Interlocked.Increment(ref haveReader);
try
int messageCount = 0;
RawResult result;
while (!buffer.IsEmpty)
{
do
// we want TryParseResult to be able to mess with these without consequence
var snapshot = buffer;
result = TryParseResult(ref buffer);
if (result.HasValue)
{
int space = EnsureSpaceAndComputeBytesToRead();
int bytesRead = netStream?.Read(ioBuffer, ioBufferBytes, space) ?? 0;
messageCount++;
if (!ProcessReadBytes(bytesRead)) return; // EOF
} while (socketToken.Available != 0);
Multiplexer.Trace("Buffer exhausted", physicalName);
// ^^^ note that the socket manager will call us again when there is something to do
}
catch (Exception ex)
{
RecordConnectionFailed(ConnectionFailureType.InternalFailure, ex);
}
finally
{
Interlocked.Decrement(ref haveReader);
Multiplexer.Trace(result.ToString(), physicalName);
MatchResult(result);
}
else
{
buffer = snapshot; // just in case TryParseResult toyed with it
break;
}
}
return messageCount;
}
//void ISocketCallback.Read()
//{
// Interlocked.Increment(ref haveReader);
// try
// {
// do
// {
// int space = EnsureSpaceAndComputeBytesToRead();
// int bytesRead = netStream?.Read(ioBuffer, ioBufferBytes, space) ?? 0;
// if (!ProcessReadBytes(bytesRead)) return; // EOF
// } while (socketToken.Available != 0);
// Multiplexer.Trace("Buffer exhausted", physicalName);
// // ^^^ note that the socket manager will call us again when there is something to do
// }
// catch (Exception ex)
// {
// RecordConnectionFailed(ConnectionFailureType.InternalFailure, ex);
// }
// finally
// {
// Interlocked.Decrement(ref haveReader);
// }
//}
bool ISocketCallback.IsDataAvailable
{
......@@ -1154,21 +1116,17 @@ private RawResult ReadBulkString(byte[] buffer, ref int offset, ref int count)
return RawResult.Nil;
}
private RawResult ReadLineTerminatedString(ResultType type, byte[] buffer, ref int offset, ref int count)
private RawResult ReadLineTerminatedString(ResultType type, ref ReadOnlySequence<byte> buffer, ref BufferReader reader)
{
int max = offset + count - 2;
for (int i = offset; i < max; i++)
{
if (buffer[i + 1] == '\r' && buffer[i + 2] == '\n')
{
int len = i - offset + 1;
var result = new RawResult(type, buffer, offset, len);
count -= (len + 2);
offset += (len + 2);
return result;
}
}
return RawResult.Nil;
int crlf = BufferReader.FindNextCrLf(reader);
if (crlf < 0) return RawResult.Nil;
var inner = buffer.Slice(reader.TotalConsumed, crlf);
reader.Consume(crlf + 2);
var result = new RawResult(type, inner);
}
void ISocketCallback.StartReading()
......@@ -1176,27 +1134,124 @@ void ISocketCallback.StartReading()
BeginReading();
}
private RawResult TryParseResult(byte[] buffer, ref int offset, ref int count)
private RawResult TryParseResult(ref ReadOnlySequence<byte> buffer)
{
if (count == 0) return RawResult.Nil;
if (buffer.IsEmpty) return RawResult.Nil;
char resultType = (char)buffer[offset++];
count--;
// so we have *at least* one byte
char resultType = (char)buffer.First.Span[0];
var reader = new BufferReader(buffer);
reader.Consume(1);
RawResult result;
switch (resultType)
{
case '+': // simple string
return ReadLineTerminatedString(ResultType.SimpleString, buffer, ref offset, ref count);
result = ReadLineTerminatedString(ResultType.SimpleString, ref buffer, ref reader);
break;
case '-': // error
return ReadLineTerminatedString(ResultType.Error, buffer, ref offset, ref count);
result = ReadLineTerminatedString(ResultType.Error, ref buffer, ref reader);
break;
case ':': // integer
return ReadLineTerminatedString(ResultType.Integer, buffer, ref offset, ref count);
result = ReadLineTerminatedString(ResultType.Integer, ref buffer, ref reader);
break;
case '$': // bulk string
return ReadBulkString(buffer, ref offset, ref count);
result = ReadBulkString(buffer, ref reader);
break;
case '*': // array
return ReadArray(buffer, ref offset, ref count);
result = ReadArray(buffer, ref reader);
break;
default:
throw new InvalidOperationException("Unexpected response prefix: " + (char)resultType);
}
buffer = buffer.Slice(reader.TotalConsumed);
return result;
}
ref struct BufferReader
{
private ReadOnlySequence<byte>.Enumerator _iterator;
private ReadOnlySpan<byte> _current;
public ReadOnlySpan<byte> Span => _current;
public int OffsetThisSpan { get; private set; }
public int TotalConsumed { get; private set; }
public int RemainingThisSpan { get; private set; }
bool FetchNext()
{
if(_iterator.MoveNext())
{
_current = _iterator.Current.Span;
OffsetThisSpan = 0;
RemainingThisSpan = _current.Length;
return true;
}
else
{
OffsetThisSpan = RemainingThisSpan = 0;
return false;
}
}
public BufferReader(ReadOnlySequence<byte> buffer)
{
_iterator = buffer.GetEnumerator();
_current = default;
OffsetThisSpan = RemainingThisSpan = TotalConsumed = 0;
FetchNext();
}
static readonly byte[] CRLF = { (byte)'\r', (byte)'\n' };
internal static int FindNextCrLf(BufferReader reader) // very deliberately not ref; want snapshot
{
// is it in the current span? (we need to handle the offsets differently if so)
int totalSkipped = 0;
bool haveTrailingCR = false;
do
{
var span = reader.Span;
if (reader.OffsetThisSpan != 0) span = span.Slice(reader.OffsetThisSpan);
if (span.IsEmpty)
{
haveTrailingCR = false;
}
else
{
if (haveTrailingCR && span[0] == '\n') return totalSkipped - 1;
int found = span.IndexOf(CRLF);
if (found >= 0) return totalSkipped + found;
haveTrailingCR = span[span.Length - 1] == '\r';
totalSkipped += span.Length;
}
}
while (reader.FetchNext());
return -1;
}
public void Consume(int count)
{
if (count < 0) throw new ArgumentOutOfRangeException(nameof(count));
while(count != 0)
{
if(count < RemainingThisSpan)
{
// consume part of this span
TotalConsumed += count;
RemainingThisSpan -= count;
OffsetThisSpan += count;
count = 0;
}
else
{
// consume all of this span
TotalConsumed += RemainingThisSpan;
count -= RemainingThisSpan;
if (!FetchNext()) throw new EndOfStreamException();
}
}
}
}
partial void DebugEmulateStaleConnection(ref int firstUnansweredWrite);
......
using System;
using System.Buffers;
using System.Text;
namespace StackExchange.Redis
{
internal struct RawResult
internal readonly struct RawResult
{
public static readonly RawResult EmptyArray = new RawResult(new RawResult[0]);
public static readonly RawResult Nil = new RawResult();
private static readonly byte[] emptyBlob = new byte[0];
private readonly int offset, count;
private readonly Array arr;
public RawResult(ResultType resultType, byte[] buffer, int offset, int count)
private readonly ReadOnlySequence<byte> _payload;
private readonly RawResult[] _subArray;
private readonly ResultType _type;
const ResultType NullResultTypeBit = unchecked((ResultType)(1 << 31));
public RawResult(ResultType resultType, ReadOnlySequence<byte> payload, bool isNull)
{
switch (resultType)
{
......@@ -22,35 +27,31 @@ public RawResult(ResultType resultType, byte[] buffer, int offset, int count)
default:
throw new ArgumentOutOfRangeException(nameof(resultType));
}
Type = resultType;
arr = buffer;
this.offset = offset;
this.count = count;
if (isNull) resultType |= NullResultTypeBit;
_type = resultType;
_payload = payload;
_subArray = default;
}
public RawResult(RawResult[] arr)
{
if (arr == null) throw new ArgumentNullException(nameof(arr));
Type = ResultType.MultiBulk;
offset = 0;
count = arr.Length;
this.arr = arr;
_type = ResultType.MultiBulk;
_payload = default;
_subArray = arr ?? throw new ArgumentNullException(nameof(arr));
}
public bool HasValue => Type != ResultType.None;
public bool IsError => Type == ResultType.Error;
public ResultType Type { get; }
public ResultType Type => _type & ~NullResultTypeBit;
internal bool IsNull => arr == null;
internal bool IsNull => (_type & NullResultTypeBit) != 0;
public override string ToString()
{
if (arr == null)
{
return "(null)";
}
if (IsNull) return "(null)";
switch (Type)
{
case ResultType.SimpleString:
......@@ -58,11 +59,11 @@ public override string ToString()
case ResultType.Error:
return $"{Type}: {GetString()}";
case ResultType.BulkString:
return $"{Type}: {count} bytes";
return $"{Type}: {_payload.Length} bytes";
case ResultType.MultiBulk:
return $"{Type}: {count} items";
return $"{Type}: {_subArray.Length} items";
default:
return "(unknown)";
return $"(unknown: {Type})";
}
}
......@@ -78,10 +79,8 @@ internal RedisChannel AsRedisChannel(byte[] channelPrefix, RedisChannel.PatternM
}
if (AssertStarts(channelPrefix))
{
var src = (byte[])arr;
byte[] copy = new byte[count - channelPrefix.Length];
Buffer.BlockCopy(src, offset + channelPrefix.Length, copy, 0, copy.Length);
byte[] copy = _payload.Slice(channelPrefix.Length).ToArray();
return new RedisChannel(copy, mode);
}
return default(RedisChannel);
......@@ -120,26 +119,20 @@ internal RedisValue AsRedisValue()
internal unsafe bool IsEqual(byte[] expected)
{
if (expected == null) throw new ArgumentNullException(nameof(expected));
if (expected.Length != count) return false;
var actual = arr as byte[];
if (actual == null) return false;
int octets = count / 8, spare = count % 8;
fixed (byte* actual8 = &actual[offset])
fixed (byte* expected8 = expected)
var rangeToCheck = _payload;
if (expected.Length != rangeToCheck.Length) return false;
if (rangeToCheck.IsSingleSegment) return rangeToCheck.First.Span.SequenceEqual(expected);
int offset = 0;
foreach (var segment in rangeToCheck)
{
long* actual64 = (long*)actual8;
long* expected64 = (long*)expected8;
var from = segment.Span;
var to = new Span<byte>(expected, offset, from.Length);
if (!from.SequenceEqual(to)) return false;
for (int i = 0; i < octets; i++)
{
if (actual64[i] != expected64[i]) return false;
}
int index = count - spare;
while (spare-- != 0)
{
if (actual8[index] != expected8[index]) return false;
}
offset += from.Length;
}
return true;
}
......@@ -147,35 +140,36 @@ internal unsafe bool IsEqual(byte[] expected)
internal bool AssertStarts(byte[] expected)
{
if (expected == null) throw new ArgumentNullException(nameof(expected));
if (expected.Length > count) return false;
var actual = arr as byte[];
if (actual == null) return false;
if (expected.Length > _payload.Length) return false;
var rangeToCheck = _payload.Slice(0, expected.Length);
if (rangeToCheck.IsSingleSegment) return rangeToCheck.First.Span.SequenceEqual(expected);
for (int i = 0; i < expected.Length; i++)
int offset = 0;
foreach(var segment in rangeToCheck)
{
if (expected[i] != actual[offset + i]) return false;
var from = segment.Span;
var to = new Span<byte>(expected, offset, from.Length);
if (!from.SequenceEqual(to)) return false;
offset += from.Length;
}
return true;
}
internal byte[] GetBlob()
{
var src = (byte[])arr;
if (src == null) return null;
if (IsNull) return null;
if (count == 0) return emptyBlob;
if (_payload.IsEmpty) return Array.Empty<byte>();
byte[] copy = new byte[count];
Buffer.BlockCopy(src, offset, copy, 0, count);
return copy;
return _payload.ToArray();
}
internal bool GetBoolean()
{
if (count != 1) throw new InvalidCastException();
byte[] actual = arr as byte[];
if (actual == null) throw new InvalidCastException();
switch (actual[offset])
if (_payload.Length != 1) throw new InvalidCastException();
switch (_payload.First.Span[0])
{
case (byte)'1': return true;
case (byte)'0': return false;
......@@ -185,7 +179,8 @@ internal bool GetBoolean()
internal RawResult[] GetItems()
{
return (RawResult[])arr;
if (Type == ResultType.MultiBulk) return _subArray;
throw new InvalidOperationException();
}
internal RedisKey[] GetItemsAsKeys()
......@@ -263,7 +258,7 @@ internal string[] GetItemsAsStrings()
return null;
}
var coords = items[0].GetArrayOfRawResults();
var coords = items[0].GetItems();
if (coords == null)
{
return null;
......@@ -287,7 +282,7 @@ internal string[] GetItemsAsStrings()
var arr = new GeoPosition?[items.Length];
for (int i = 0; i < arr.Length; i++)
{
RawResult[] item = items[i].GetArrayOfRawResults();
RawResult[] item = items[i].GetItems();
if (item == null)
{
arr[i] = null;
......@@ -301,45 +296,30 @@ internal string[] GetItemsAsStrings()
}
}
internal RawResult[] GetItemsAsRawResults()
{
return GetItems();
}
internal RawResult[] GetItemsAsRawResults() => GetItems();
// returns an array of RawResults
internal RawResult[] GetArrayOfRawResults()
internal string GetString()
{
if (arr == null)
{
return null;
}
else if (arr.Length == 0)
{
return new RawResult[0];
}
else
if (IsNull) return null;
if (_payload.IsEmpty) return "";
if (_payload.IsSingleSegment)
{
var rawResultArray = new RawResult[arr.Length];
for (int i = 0; i < arr.Length; i++)
var span = _payload.First.Span;
unsafe
{
var rawResult = (RawResult)arr.GetValue(i);
rawResultArray.SetValue(rawResult, i);
fixed (byte* ptr = &span[0])
{
return Encoding.UTF8.GetString(ptr, span.Length);
}
}
return rawResultArray;
}
}
internal string GetString()
{
if (arr == null) return null;
var blob = (byte[])arr;
if (blob.Length == 0) return "";
return Encoding.UTF8.GetString(blob, offset, count);
}
internal bool TryGetDouble(out double val)
{
if (arr == null)
if (IsNull)
{
val = 0;
return false;
......@@ -354,12 +334,17 @@ internal bool TryGetDouble(out double val)
internal bool TryGetInt64(out long value)
{
if (arr == null)
if(IsNull || _payload.IsEmpty || _payload.Length > PhysicalConnection.MaxInt64TextLen)
{
value = 0;
return false;
}
return RedisValue.TryParseInt64(arr as byte[], offset, count, out value);
if (_payload.IsSingleSegment) return RedisValue.TryParseInt64(_payload.First.Span, out value);
Span<byte> span = stackalloc byte[PhysicalConnection.MaxInt64TextLen];
_payload.CopyTo(span);
return RedisValue.TryParseInt64(span, out value);
}
}
}
......
......@@ -189,17 +189,18 @@ internal static unsafe int GetHashCode(byte[] value)
return acc;
}
}
internal static bool TryParseInt64(byte[] value, int offset, int count, out long result)
=> TryParseInt64(new ReadOnlySpan<byte>(value, offset, count), out result);
internal static bool TryParseInt64(ReadOnlySpan<byte> value, out long result)
{
result = 0;
if (value == null || count <= 0) return false;
if (value.IsEmpty) return false;
checked
{
int max = offset + count;
if (value[offset] == '-')
int max = value.Length;
if (value[0] == '-')
{
for (int i = offset + 1; i < max; i++)
for (int i = 1; i < max; i++)
{
var b = value[i];
if (b < '0' || b > '9') return false;
......@@ -209,7 +210,7 @@ internal static bool TryParseInt64(byte[] value, int offset, int count, out long
}
else
{
for (int i = offset; i < max; i++)
for (int i = 0; i < max; i++)
{
var b = value[i];
if (b < '0' || b > '9') return false;
......
......@@ -37,10 +37,10 @@ internal partial interface ISocketCallback
void OnHeartbeat();
/// <summary>
/// Indicates that data is available on the socket, and that the consumer should read synchronously from the socket while there is data
/// </summary>
void Read();
///// <summary>
///// Indicates that data is available on the socket, and that the consumer should read synchronously from the socket while there is data
///// </summary>
//void Read();
/// <summary>
/// Indicates that we cannot know whether data is available, and that the consume should commence reading asynchronously
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment