Commit 6ba8f745 authored by Marc Gravell's avatar Marc Gravell

fix one more large slice based on the BufferReader; also increase default segment size

parent 079db398
...@@ -1079,7 +1079,7 @@ private int ProcessBuffer(ref ReadOnlySequence<byte> buffer) ...@@ -1079,7 +1079,7 @@ private int ProcessBuffer(ref ReadOnlySequence<byte> buffer)
{ {
if (result.HasValue) if (result.HasValue)
{ {
buffer = buffer.Slice(reader.TotalConsumed); buffer = reader.SliceFromCurrent();
messageCount++; messageCount++;
Multiplexer.Trace(result.ToString(), physicalName); Multiplexer.Trace(result.ToString(), physicalName);
...@@ -1243,7 +1243,7 @@ public enum ConsumeResult ...@@ -1243,7 +1243,7 @@ public enum ConsumeResult
public ReadOnlySpan<byte> SlicedSpan => _current.Slice(OffsetThisSpan, RemainingThisSpan); public ReadOnlySpan<byte> SlicedSpan => _current.Slice(OffsetThisSpan, RemainingThisSpan);
public int OffsetThisSpan { get; private set; } public int OffsetThisSpan { get; private set; }
public int TotalConsumed { get; private set; } private int TotalConsumed { get; set; } // hide this; callers should use the snapshot-aware methods instead
public int RemainingThisSpan { get; private set; } public int RemainingThisSpan { get; private set; }
public bool IsEmpty => RemainingThisSpan == 0; public bool IsEmpty => RemainingThisSpan == 0;
...@@ -1413,6 +1413,12 @@ public int ConsumeByte() ...@@ -1413,6 +1413,12 @@ public int ConsumeByte()
Consume(1); Consume(1);
return value; return value;
} }
public ReadOnlySequence<byte> SliceFromCurrent()
{
var from = SnapshotPosition();
return _buffer.Slice(from);
}
} }
partial void DebugEmulateStaleConnection(ref int firstUnansweredWrite); partial void DebugEmulateStaleConnection(ref int firstUnansweredWrite);
......
...@@ -147,7 +147,7 @@ internal static SocketManager Shared ...@@ -147,7 +147,7 @@ internal static SocketManager Shared
public SocketManager(string name, bool useHighPrioritySocketThreads) public SocketManager(string name, bool useHighPrioritySocketThreads)
: this(name, useHighPrioritySocketThreads, DEFAULT_MIN_THREADS, DEFAULT_MAX_THREADS) { } : this(name, useHighPrioritySocketThreads, DEFAULT_MIN_THREADS, DEFAULT_MAX_THREADS) { }
private const int DEFAULT_MIN_THREADS = 1, DEFAULT_MAX_THREADS = 5; private const int DEFAULT_MIN_THREADS = 1, DEFAULT_MAX_THREADS = 5, MINIMUM_SEGMENT_SIZE = 8 * 1024;
private SocketManager(string name, bool useHighPrioritySocketThreads, int minThreads, int maxThreads) private SocketManager(string name, bool useHighPrioritySocketThreads, int minThreads, int maxThreads)
{ {
...@@ -165,13 +165,13 @@ private SocketManager(string name, bool useHighPrioritySocketThreads, int minThr ...@@ -165,13 +165,13 @@ private SocketManager(string name, bool useHighPrioritySocketThreads, int minThr
defaultPipeOptions.Pool, _scheduler, _scheduler, defaultPipeOptions.Pool, _scheduler, _scheduler,
pauseWriterThreshold: defaultPipeOptions.PauseWriterThreshold, pauseWriterThreshold: defaultPipeOptions.PauseWriterThreshold,
resumeWriterThreshold: defaultPipeOptions.ResumeWriterThreshold, resumeWriterThreshold: defaultPipeOptions.ResumeWriterThreshold,
defaultPipeOptions.MinimumSegmentSize, minimumSegmentSize: Math.Max(defaultPipeOptions.MinimumSegmentSize, MINIMUM_SEGMENT_SIZE),
useSynchronizationContext: false); useSynchronizationContext: false);
ReceivePipeOptions = new PipeOptions( ReceivePipeOptions = new PipeOptions(
defaultPipeOptions.Pool, _scheduler, _scheduler, defaultPipeOptions.Pool, _scheduler, _scheduler,
pauseWriterThreshold: Receive_PauseWriterThreshold, pauseWriterThreshold: Receive_PauseWriterThreshold,
resumeWriterThreshold: Receive_ResumeWriterThreshold, resumeWriterThreshold: Receive_ResumeWriterThreshold,
defaultPipeOptions.MinimumSegmentSize, minimumSegmentSize: Math.Max(defaultPipeOptions.MinimumSegmentSize, MINIMUM_SEGMENT_SIZE),
useSynchronizationContext: false); useSynchronizationContext: false);
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment