Commit 86949f20 authored by Marc Gravell's avatar Marc Gravell

hot damn, it compiles; this could be very good or very bad

parent 996b182f
......@@ -26,20 +26,20 @@ internal sealed partial class PhysicalConnection : IDisposable, ISocketCallback
private static readonly byte[] Crlf = Encoding.ASCII.GetBytes("\r\n");
private static readonly AsyncCallback endRead = result =>
{
PhysicalConnection physical;
if (result.CompletedSynchronously || (physical = result.AsyncState as PhysicalConnection) == null) return;
try
{
physical.Multiplexer.Trace("Completed asynchronously: processing in callback", physical.physicalName);
if (physical.EndReading(result)) physical.BeginReading();
}
catch (Exception ex)
{
physical.RecordConnectionFailed(ConnectionFailureType.InternalFailure, ex);
}
};
//private static readonly AsyncCallback endRead = result =>
//{
// PhysicalConnection physical;
// if (result.CompletedSynchronously || (physical = result.AsyncState as PhysicalConnection) == null) return;
// try
// {
// physical.Multiplexer.Trace("Completed asynchronously: processing in callback", physical.physicalName);
// if (physical.EndReading(result)) physical.BeginReading();
// }
// catch (Exception ex)
// {
// physical.RecordConnectionFailed(ConnectionFailureType.InternalFailure, ex);
// }
//};
private static readonly byte[] message = Encoding.UTF8.GetBytes("message"), pmessage = Encoding.UTF8.GetBytes("pmessage");
......@@ -975,7 +975,7 @@ void ISocketCallback.OnHeartbeat()
partial void OnWrapForLogging(ref IDuplexPipe pipe, string name);
private async Task ReadFromPipe()
private async void ReadFromPipe() // yes it is an async void; deal with it!
{
try
{
......@@ -989,9 +989,9 @@ private async Task ReadFromPipe()
}
var buffer = readResult.Buffer;
int handled = ProcessBuffer(ref buffer);
int handled = ProcessBuffer(in buffer, out var consumed);
Multiplexer.Trace($"Processed {handled} messages", physicalName);
input.AdvanceTo(buffer.Start, buffer.End);
input.AdvanceTo(buffer.GetPosition(consumed), buffer.End);
}
Multiplexer.Trace("EOF", physicalName);
RecordConnectionFailed(ConnectionFailureType.SocketClosed);
......@@ -1002,27 +1002,27 @@ private async Task ReadFromPipe()
RecordConnectionFailed(ConnectionFailureType.InternalFailure, ex);
}
}
private int ProcessBuffer(ref ReadOnlySequence<byte> buffer)
private int ProcessBuffer(in ReadOnlySequence<byte> entireBuffer, out long consumed)
{
int messageCount = 0;
RawResult result;
while (!buffer.IsEmpty)
var remainingBuffer = entireBuffer; // create a snapshot so we can trim it after each decoded message
// (so that slicing later doesn't require us to keep skipping segments)
consumed = 0;
while (!remainingBuffer.IsEmpty)
{
// we want TryParseResult to be able to mess with these without consequence
var snapshot = buffer;
result = TryParseResult(ref buffer);
var reader = new BufferReader(remainingBuffer);
var result = TryParseResult(in remainingBuffer, ref reader);
if (result.HasValue)
{
messageCount++;
consumed += reader.TotalConsumed;
remainingBuffer = remainingBuffer.Slice(reader.TotalConsumed);
messageCount++;
Multiplexer.Trace(result.ToString(), physicalName);
MatchResult(result);
}
else
{
buffer = snapshot; // just in case TryParseResult toyed with it
break;
}
}
return messageCount;
}
......@@ -1060,9 +1060,9 @@ bool ISocketCallback.IsDataAvailable
}
}
private RawResult ReadArray(byte[] buffer, ref int offset, ref int count)
private RawResult ReadArray(in ReadOnlySequence<byte> buffer, ref BufferReader reader)
{
var itemCount = ReadLineTerminatedString(ResultType.Integer, buffer, ref offset, ref count);
var itemCount = ReadLineTerminatedString(ResultType.Integer, in buffer, ref reader);
if (itemCount.HasValue)
{
if (!itemCount.TryGetInt64(out long i64)) throw ExceptionFactory.ConnectionFailure(Multiplexer.IncludeDetailInExceptions, ConnectionFailureType.ProtocolFailure, "Invalid array length", Bridge.ServerEndPoint);
......@@ -1071,18 +1071,18 @@ private RawResult ReadArray(byte[] buffer, ref int offset, ref int count)
if (itemCountActual < 0)
{
//for null response by command like EXEC, RESP array: *-1\r\n
return new RawResult(ResultType.SimpleString, null, 0, 0);
return RawResult.NullMultiBulk;
}
else if (itemCountActual == 0)
{
//for zero array response by command like SCAN, Resp array: *0\r\n
return RawResult.EmptyArray;
return RawResult.EmptyMultiBulk;
}
var arr = new RawResult[itemCountActual];
for (int i = 0; i < itemCountActual; i++)
{
if (!(arr[i] = TryParseResult(buffer, ref offset, ref count)).HasValue)
if (!(arr[i] = TryParseResult(in buffer, ref reader)).HasValue)
return RawResult.Nil;
}
return new RawResult(arr);
......@@ -1090,33 +1090,37 @@ private RawResult ReadArray(byte[] buffer, ref int offset, ref int count)
return RawResult.Nil;
}
private RawResult ReadBulkString(byte[] buffer, ref int offset, ref int count)
private RawResult ReadBulkString(in ReadOnlySequence<byte> buffer, ref BufferReader reader)
{
var prefix = ReadLineTerminatedString(ResultType.Integer, buffer, ref offset, ref count);
var prefix = ReadLineTerminatedString(ResultType.Integer, in buffer, ref reader);
if (prefix.HasValue)
{
if (!prefix.TryGetInt64(out long i64)) throw ExceptionFactory.ConnectionFailure(Multiplexer.IncludeDetailInExceptions, ConnectionFailureType.ProtocolFailure, "Invalid bulk string length", Bridge.ServerEndPoint);
int bodySize = checked((int)i64);
if (bodySize < 0)
{
return new RawResult(ResultType.BulkString, null, 0, 0);
return new RawResult(ResultType.BulkString, ReadOnlySequence<byte>.Empty, true);
}
else if (count >= bodySize + 2)
int from = reader.TotalConsumed;
if(reader.TryConsume(bodySize))
{
if (buffer[offset + bodySize] != '\r' || buffer[offset + bodySize + 1] != '\n')
switch(reader.TryConsumeCRLF())
{
throw ExceptionFactory.ConnectionFailure(Multiplexer.IncludeDetailInExceptions, ConnectionFailureType.ProtocolFailure, "Invalid bulk string terminator", Bridge.ServerEndPoint);
}
var result = new RawResult(ResultType.BulkString, buffer, offset, bodySize);
offset += bodySize + 2;
count -= bodySize + 2;
return result;
case ConsumeResult.NeedMoreData:
break; // see NilResult below
case ConsumeResult.Success:
var payload = bodySize == 0 ? ReadOnlySequence<byte>.Empty : buffer.Slice(from, bodySize);
return new RawResult(ResultType.BulkString, payload, false);
default:
throw ExceptionFactory.ConnectionFailure(Multiplexer.IncludeDetailInExceptions, ConnectionFailureType.ProtocolFailure, "Invalid bulk string terminator", Bridge.ServerEndPoint);
}
}
}
return RawResult.Nil;
}
private RawResult ReadLineTerminatedString(ResultType type, ref ReadOnlySequence<byte> buffer, ref BufferReader reader)
private RawResult ReadLineTerminatedString(ResultType type, in ReadOnlySequence<byte> buffer, ref BufferReader reader)
{
int crlf = BufferReader.FindNextCrLf(reader);
......@@ -1126,48 +1130,38 @@ private RawResult ReadLineTerminatedString(ResultType type, ref ReadOnlySequence
var inner = buffer.Slice(reader.TotalConsumed, crlf);
reader.Consume(crlf + 2);
var result = new RawResult(type, inner);
return new RawResult(type, inner, false);
}
void ISocketCallback.StartReading()
{
BeginReading();
}
void ISocketCallback.StartReading() => ReadFromPipe();
private RawResult TryParseResult(ref ReadOnlySequence<byte> buffer)
private RawResult TryParseResult(in ReadOnlySequence<byte> buffer, ref BufferReader reader)
{
if (buffer.IsEmpty) return RawResult.Nil;
// so we have *at least* one byte
char resultType = (char)buffer.First.Span[0];
var reader = new BufferReader(buffer);
reader.Consume(1);
RawResult result;
switch (resultType)
var prefix = reader.ConsumeByte();
if (prefix < 0) return RawResult.Nil; // EOF
switch (prefix)
{
case '+': // simple string
result = ReadLineTerminatedString(ResultType.SimpleString, ref buffer, ref reader);
break;
return ReadLineTerminatedString(ResultType.SimpleString, in buffer, ref reader);
case '-': // error
result = ReadLineTerminatedString(ResultType.Error, ref buffer, ref reader);
break;
return ReadLineTerminatedString(ResultType.Error, in buffer, ref reader);
case ':': // integer
result = ReadLineTerminatedString(ResultType.Integer, ref buffer, ref reader);
break;
return ReadLineTerminatedString(ResultType.Integer, in buffer, ref reader);
case '$': // bulk string
result = ReadBulkString(buffer, ref reader);
break;
return ReadBulkString(in buffer, ref reader);
case '*': // array
result = ReadArray(buffer, ref reader);
break;
return ReadArray(in buffer, ref reader);
default:
throw new InvalidOperationException("Unexpected response prefix: " + (char)resultType);
throw new InvalidOperationException("Unexpected response prefix: " + (char)prefix);
}
buffer = buffer.Slice(reader.TotalConsumed);
return result;
}
public enum ConsumeResult
{
Failure,
Success,
NeedMoreData,
}
ref struct BufferReader
{
private ReadOnlySequence<byte>.Enumerator _iterator;
......@@ -1177,20 +1171,24 @@ private RawResult TryParseResult(ref ReadOnlySequence<byte> buffer)
public int OffsetThisSpan { get; private set; }
public int TotalConsumed { get; private set; }
public int RemainingThisSpan { get; private set; }
bool FetchNext()
public bool IsEmpty => RemainingThisSpan == 0;
bool FetchNextSegment()
{
if(_iterator.MoveNext())
do
{
if (!_iterator.MoveNext())
{
OffsetThisSpan = RemainingThisSpan = 0;
return false;
}
_current = _iterator.Current.Span;
OffsetThisSpan = 0;
RemainingThisSpan = _current.Length;
return true;
}
else
{
OffsetThisSpan = RemainingThisSpan = 0;
return false;
}
} while (IsEmpty); // skip empty segments, they don't help us!
return true;
}
public BufferReader(ReadOnlySequence<byte> buffer)
{
......@@ -1198,9 +1196,64 @@ public BufferReader(ReadOnlySequence<byte> buffer)
_current = default;
OffsetThisSpan = RemainingThisSpan = TotalConsumed = 0;
FetchNext();
FetchNextSegment();
}
static readonly byte[] CRLF = { (byte)'\r', (byte)'\n' };
/// <summary>
/// Note that in results other than success, no guarantees are made about final state; if you care: snapshot
/// </summary>
public ConsumeResult TryConsumeCRLF()
{
switch(RemainingThisSpan)
{
case 0:
return ConsumeResult.NeedMoreData;
case 1:
if (_current[OffsetThisSpan] != (byte)'\r') return ConsumeResult.Failure;
Consume(1);
if (IsEmpty) return ConsumeResult.NeedMoreData;
var next = _current[OffsetThisSpan];
Consume(1);
return next == '\n' ? ConsumeResult.Success : ConsumeResult.Failure;
default:
var offset = OffsetThisSpan;
var result = _current[offset++] == (byte)'\r' && _current[offset] == (byte)'\n'
? ConsumeResult.Success : ConsumeResult.Failure;
Consume(2);
return result;
}
}
public bool TryConsume(int count)
{
if (count < 0) throw new ArgumentOutOfRangeException(nameof(count));
do
{
var available = RemainingThisSpan;
if (count <= available)
{
// consume part of this span
TotalConsumed += count;
RemainingThisSpan -= count;
OffsetThisSpan += count;
if (count == available) FetchNextSegment(); // burned all of it; fetch next
return true;
}
// consume all of this span
TotalConsumed += available;
count -= available;
} while (FetchNextSegment());
return false;
}
public void Consume(int count)
{
if (!TryConsume(count)) throw new EndOfStreamException();
}
internal static int FindNextCrLf(BufferReader reader) // very deliberately not ref; want snapshot
{
// is it in the current span? (we need to handle the offsets differently if so)
......@@ -1227,30 +1280,28 @@ public BufferReader(ReadOnlySequence<byte> buffer)
totalSkipped += span.Length;
}
}
while (reader.FetchNext());
while (reader.FetchNextSegment());
return -1;
}
public void Consume(int count)
//internal static bool HasBytes(BufferReader reader, int count) // very deliberately not ref; want snapshot
//{
// if (count < 0) throw new ArgumentOutOfRangeException(nameof(count));
// do
// {
// var available = reader.RemainingThisSpan;
// if (count <= available) return true;
// count -= available;
// } while (reader.FetchNextSegment());
// return false;
//}
public int ConsumeByte()
{
if (count < 0) throw new ArgumentOutOfRangeException(nameof(count));
while(count != 0)
{
if(count < RemainingThisSpan)
{
// consume part of this span
TotalConsumed += count;
RemainingThisSpan -= count;
OffsetThisSpan += count;
count = 0;
}
else
{
// consume all of this span
TotalConsumed += RemainingThisSpan;
count -= RemainingThisSpan;
if (!FetchNext()) throw new EndOfStreamException();
}
}
if (IsEmpty) return -1;
var value = _current[OffsetThisSpan];
Consume(1);
return value;
}
}
......
......@@ -6,8 +6,9 @@ namespace StackExchange.Redis
{
internal readonly struct RawResult
{
public static readonly RawResult EmptyArray = new RawResult(new RawResult[0]);
public static readonly RawResult Nil = new RawResult();
internal static readonly RawResult NullMultiBulk = new RawResult((RawResult[])null);
internal static readonly RawResult EmptyMultiBulk = new RawResult(new RawResult[0]);
internal static readonly RawResult Nil = new RawResult();
private readonly ReadOnlySequence<byte> _payload;
......@@ -36,8 +37,9 @@ public RawResult(ResultType resultType, ReadOnlySequence<byte> payload, bool isN
public RawResult(RawResult[] arr)
{
_type = ResultType.MultiBulk;
if (arr == null) _type |= NullResultTypeBit;
_payload = default;
_subArray = arr ?? throw new ArgumentNullException(nameof(arr));
_subArray = arr;
}
public bool HasValue => Type != ResultType.None;
......@@ -298,7 +300,7 @@ internal string[] GetItemsAsStrings()
internal RawResult[] GetItemsAsRawResults() => GetItems();
internal string GetString()
internal unsafe string GetString()
{
if (IsNull) return null;
if (_payload.IsEmpty) return "";
......@@ -306,15 +308,44 @@ internal string GetString()
if (_payload.IsSingleSegment)
{
var span = _payload.First.Span;
unsafe
fixed (byte* ptr = &span[0])
{
fixed (byte* ptr = &span[0])
return Encoding.UTF8.GetString(ptr, span.Length);
}
}
var decoder = Encoding.UTF8.GetDecoder();
int charCount = 0;
foreach(var segment in _payload)
{
var span = segment.Span;
if (span.IsEmpty) continue;
fixed(byte* bPtr = &span[0])
{
charCount += decoder.GetCharCount(bPtr, span.Length, false);
}
}
decoder.Reset();
string s = new string((char)0, charCount);
fixed (char* sPtr = s)
{
char* cPtr = sPtr;
foreach (var segment in _payload)
{
var span = segment.Span;
if (span.IsEmpty) continue;
fixed (byte* bPtr = &span[0])
{
return Encoding.UTF8.GetString(ptr, span.Length);
var written = decoder.GetChars(bPtr, span.Length, cPtr, charCount, false);
cPtr += written;
charCount -= written;
}
}
}
return Encoding.UTF8.GetString(blob, offset, count);
return s;
}
internal bool TryGetDouble(out double val)
......
......@@ -1233,7 +1233,7 @@ private static GeoRadiusResult Parse(GeoRadiusOptions options, RawResult item)
return new GeoRadiusResult(item.AsRedisValue(), null, null, null);
}
// If WITHCOORD, WITHDIST or WITHHASH options are specified, the command returns an array of arrays, where each sub-array represents a single item.
var arr = item.GetArrayOfRawResults();
var arr = item.GetItems();
int index = 0;
// the first item in the sub-array is always the name of the returned item.
......@@ -1251,7 +1251,7 @@ private static GeoRadiusResult Parse(GeoRadiusOptions options, RawResult item)
if ((options & GeoRadiusOptions.WithGeoHash) != 0) { hash = (long?)arr[index++].AsRedisValue(); }
if ((options & GeoRadiusOptions.WithCoordinates) != 0)
{
var coords = arr[index++].GetArrayOfRawResults();
var coords = arr[index++].GetItems();
double longitude = (double)coords[0].AsRedisValue(), latitude = (double)coords[1].AsRedisValue();
position = new GeoPosition(longitude, latitude);
}
......@@ -1451,7 +1451,7 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes
switch (result.Type)
{
case ResultType.MultiBulk:
var arrayOfArrays = result.GetArrayOfRawResults();
var arrayOfArrays = result.GetItems();
var returnArray = new KeyValuePair<string, string>[arrayOfArrays.Length][];
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment