Commit 60f90766 authored by Marc Gravell's avatar Marc Gravell

provide a huge max buffer on the receive pipe (required splitting send/receive options)

parent b3c880b3
...@@ -835,7 +835,7 @@ private static LocalCertificateSelectionCallback GetAmbientCertificateCallback() ...@@ -835,7 +835,7 @@ private static LocalCertificateSelectionCallback GetAmbientCertificateCallback()
return null; return null;
} }
async ValueTask<SocketMode> ISocketCallback.ConnectedAsync(Socket socket, TextWriter log, PipeOptions pipeOptions) async ValueTask<SocketMode> ISocketCallback.ConnectedAsync(Socket socket, TextWriter log, SocketManager manager)
{ {
try try
{ {
...@@ -871,11 +871,11 @@ async ValueTask<SocketMode> ISocketCallback.ConnectedAsync(Socket socket, TextWr ...@@ -871,11 +871,11 @@ async ValueTask<SocketMode> ISocketCallback.ConnectedAsync(Socket socket, TextWr
Multiplexer.Trace("Encryption failure"); Multiplexer.Trace("Encryption failure");
return SocketMode.Abort; return SocketMode.Abort;
} }
pipe = StreamConnector.GetDuplex(ssl, pipeOptions, name: Bridge.Name); pipe = StreamConnector.GetDuplex(ssl, manager.SendPipeOptions, manager.ReceivePipeOptions, name: Bridge.Name);
} }
else else
{ {
pipe = SocketConnection.Create(socket, pipeOptions, name: Bridge.Name); pipe = SocketConnection.Create(socket, manager.SendPipeOptions, manager.ReceivePipeOptions, name: Bridge.Name);
} }
OnWrapForLogging(ref pipe, physicalName); OnWrapForLogging(ref pipe, physicalName);
......
...@@ -28,8 +28,8 @@ internal partial interface ISocketCallback ...@@ -28,8 +28,8 @@ internal partial interface ISocketCallback
/// </summary> /// </summary>
/// <param name="socket">The socket.</param> /// <param name="socket">The socket.</param>
/// <param name="log">A text logger to write to.</param> /// <param name="log">A text logger to write to.</param>
/// <param name="pipeOptions">Pipe configuration</param> /// <param name="manager">The manager that will be owning this socket.</param>
ValueTask<SocketMode> ConnectedAsync(Socket socket, TextWriter log, PipeOptions pipeOptions); ValueTask<SocketMode> ConnectedAsync(Socket socket, TextWriter log, SocketManager manager);
/// <summary> /// <summary>
/// Indicates that the socket has signalled an error condition /// Indicates that the socket has signalled an error condition
...@@ -139,15 +139,25 @@ public SocketManager(string name, bool useHighPrioritySocketThreads) ...@@ -139,15 +139,25 @@ public SocketManager(string name, bool useHighPrioritySocketThreads)
Task.Run(() => WriteAllQueuesAsync()); Task.Run(() => WriteAllQueuesAsync());
const int Receive_PauseWriterThreshold = 1024 * 1024 * 1024; // let's give it up to 1GiB of buffer for now
var defaultPipeOptions = PipeOptions.Default; var defaultPipeOptions = PipeOptions.Default;
_scheduler = new DedicatedThreadPoolPipeScheduler(name, priority: useHighPrioritySocketThreads ? ThreadPriority.AboveNormal : ThreadPriority.Normal); _scheduler = new DedicatedThreadPoolPipeScheduler(name, priority: useHighPrioritySocketThreads ? ThreadPriority.AboveNormal : ThreadPriority.Normal);
_pipeOptions = new PipeOptions( SendPipeOptions = new PipeOptions(
defaultPipeOptions.Pool, _scheduler, _scheduler,
pauseWriterThreshold: defaultPipeOptions.PauseWriterThreshold,
resumeWriterThreshold: defaultPipeOptions.ResumeWriterThreshold,
defaultPipeOptions.MinimumSegmentSize,
useSynchronizationContext: false);
ReceivePipeOptions = new PipeOptions(
defaultPipeOptions.Pool, _scheduler, _scheduler, defaultPipeOptions.Pool, _scheduler, _scheduler,
defaultPipeOptions.PauseWriterThreshold, defaultPipeOptions.ResumeWriterThreshold, defaultPipeOptions.MinimumSegmentSize, pauseWriterThreshold: Receive_PauseWriterThreshold,
resumeWriterThreshold: Receive_PauseWriterThreshold / 2,
defaultPipeOptions.MinimumSegmentSize,
useSynchronizationContext: false); useSynchronizationContext: false);
} }
readonly DedicatedThreadPoolPipeScheduler _scheduler; readonly DedicatedThreadPoolPipeScheduler _scheduler;
readonly PipeOptions _pipeOptions; internal readonly PipeOptions SendPipeOptions, ReceivePipeOptions;
private readonly Func<Task> _writeOneQueueAsync; private readonly Func<Task> _writeOneQueueAsync;
...@@ -274,7 +284,7 @@ private async void EndConnectImpl(IAsyncResult ar, ConnectionMultiplexer multipl ...@@ -274,7 +284,7 @@ private async void EndConnectImpl(IAsyncResult ar, ConnectionMultiplexer multipl
if (ignoreConnect) return; if (ignoreConnect) return;
socket.EndConnect(ar); socket.EndConnect(ar);
var socketMode = callback == null ? SocketMode.Abort : await callback.ConnectedAsync(socket, log, _pipeOptions); var socketMode = callback == null ? SocketMode.Abort : await callback.ConnectedAsync(socket, log, this);
switch (socketMode) switch (socketMode)
{ {
case SocketMode.Async: case SocketMode.Async:
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment