Commit 079db398 authored by Marc Gravell's avatar Marc Gravell

fix pathological case when consuming *really really* buffers (SMEMBERS on 5M...

fix pathological case when consuming *really really* buffers (SMEMBERS on 5M rows for example) - have BufferReader keep a SequencePosition "snapshot", and compute sub-buffers using that

(also fixes bug where LoggingPipe wasn't using the right thresholds, but that's a debug tool only)
parent 102039d5
......@@ -323,13 +323,13 @@ partial class PhysicalConnection
// echo = null;
// }
//}
partial void OnWrapForLogging(ref IDuplexPipe pipe, string name)
partial void OnWrapForLogging(ref IDuplexPipe pipe, string name, SocketManager mgr)
{
foreach(var c in Path.GetInvalidFileNameChars())
{
name = name.Replace(c, '_');
}
pipe = new LoggingPipe(pipe, $"{name}.in", $"{name}.out");
pipe = new LoggingPipe(pipe, $"{name}.in", $"{name}.out", mgr);
}
}
#endif
......
......@@ -11,7 +11,7 @@ sealed class LoggingPipe : IDuplexPipe
{
private IDuplexPipe _inner;
public LoggingPipe(IDuplexPipe inner, string inPath, string outPath)
public LoggingPipe(IDuplexPipe inner, string inPath, string outPath, SocketManager mgr)
{
_inner = inner;
if (string.IsNullOrWhiteSpace(inPath))
......@@ -20,7 +20,7 @@ public LoggingPipe(IDuplexPipe inner, string inPath, string outPath)
}
else
{
var pipe = new Pipe();
var pipe = new Pipe(mgr.ReceivePipeOptions);
Input = pipe.Reader;
CloneAsync(inPath, inner.Input, pipe.Writer);
}
......@@ -31,7 +31,7 @@ public LoggingPipe(IDuplexPipe inner, string inPath, string outPath)
}
else
{
var pipe = new Pipe();
var pipe = new Pipe(mgr.SendPipeOptions);
Output = pipe.Writer;
CloneAsync(outPath, pipe.Reader, inner.Output);
}
......
......@@ -919,7 +919,7 @@ async ValueTask<SocketMode> ISocketCallback.ConnectedAsync(Socket socket, TextWr
{
pipe = SocketConnection.Create(socket, manager.SendPipeOptions, manager.ReceivePipeOptions, name: Bridge.Name);
}
OnWrapForLogging(ref pipe, physicalName);
OnWrapForLogging(ref pipe, physicalName, manager);
_ioPipe = pipe;
......@@ -1021,7 +1021,7 @@ void ISocketCallback.OnHeartbeat()
}
}
partial void OnWrapForLogging(ref IDuplexPipe pipe, string name);
partial void OnWrapForLogging(ref IDuplexPipe pipe, string name, SocketManager mgr);
private async void ReadFromPipe() // yes it is an async void; deal with it!
{
......@@ -1177,15 +1177,13 @@ private RawResult ReadBulkString(in ReadOnlySequence<byte> buffer, ref BufferRea
return new RawResult(ResultType.BulkString, ReadOnlySequence<byte>.Empty, true);
}
int from = reader.TotalConsumed;
if(reader.TryConsume(bodySize))
if(reader.TryConsumeAsBuffer(bodySize, out var payload))
{
switch(reader.TryConsumeCRLF())
{
case ConsumeResult.NeedMoreData:
break; // see NilResult below
case ConsumeResult.Success:
var payload = bodySize == 0 ? ReadOnlySequence<byte>.Empty : buffer.Slice(from, bodySize);
return new RawResult(ResultType.BulkString, payload, false);
default:
throw ExceptionFactory.ConnectionFailure(Multiplexer.IncludeDetailInExceptions, ConnectionFailureType.ProtocolFailure, "Invalid bulk string terminator", Bridge.ServerEndPoint);
......@@ -1200,8 +1198,8 @@ private RawResult ReadLineTerminatedString(ResultType type, in ReadOnlySequence<
int crlfOffsetFromCurrent = BufferReader.FindNextCrLf(reader);
if (crlfOffsetFromCurrent < 0) return RawResult.Nil;
var payload = buffer.Slice(reader.TotalConsumed, crlfOffsetFromCurrent);
reader.Consume(crlfOffsetFromCurrent + 2);
var payload = reader.ConsumeAsBuffer(crlfOffsetFromCurrent);
reader.Consume(2);
return new RawResult(type, payload, false);
}
......@@ -1270,6 +1268,9 @@ private bool FetchNextSegment()
public BufferReader(ReadOnlySequence<byte> buffer)
{
_buffer = buffer;
_lastSnapshotPosition = buffer.Start;
_lastSnapshotBytes = 0;
_iterator = buffer.GetEnumerator();
_current = default;
OffsetThisSpan = RemainingThisSpan = TotalConsumed = 0;
......@@ -1303,7 +1304,6 @@ public ConsumeResult TryConsumeCRLF()
return result;
}
}
public bool TryConsume(int count)
{
if (count < 0) throw new ArgumentOutOfRangeException(nameof(count));
......@@ -1328,6 +1328,40 @@ public bool TryConsume(int count)
return false;
}
readonly ReadOnlySequence<byte> _buffer;
SequencePosition _lastSnapshotPosition;
long _lastSnapshotBytes;
// makes an internal note of where we are, as a SequencePosition; useful
// to avoid having to use buffer.Slice on huge ranges
SequencePosition SnapshotPosition()
{
var consumed = TotalConsumed;
var delta = consumed - _lastSnapshotBytes;
if (delta == 0) return _lastSnapshotPosition;
var pos = _buffer.GetPosition(delta, _lastSnapshotPosition);
_lastSnapshotBytes = consumed;
return _lastSnapshotPosition = pos;
}
public ReadOnlySequence<byte> ConsumeAsBuffer(int count)
{
if(!TryConsumeAsBuffer(count, out var buffer)) throw new EndOfStreamException();
return buffer;
}
public bool TryConsumeAsBuffer(int count, out ReadOnlySequence<byte> buffer)
{
var from = SnapshotPosition();
if (!TryConsume(count))
{
buffer = default;
return false;
}
var to = SnapshotPosition();
buffer = _buffer.Slice(from, to);
return true;
}
public void Consume(int count)
{
if (!TryConsume(count)) throw new EndOfStreamException();
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment