Unverified Commit 77f3ff69 authored by Marc Gravell's avatar Marc Gravell Committed by GitHub

Pipelines race (#864)

* really nasty commit - incomplete investigating thread-race

* fixed race via scheduler

* revert physicalconnection.cs
parent 83c0ef10
...@@ -31,6 +31,6 @@ ...@@ -31,6 +31,6 @@
<PackageReference Include="System.Memory" Version="$(CoreFxVersion)" /> <PackageReference Include="System.Memory" Version="$(CoreFxVersion)" />
<PackageReference Include="System.Buffers" Version="$(CoreFxVersion)" /> <PackageReference Include="System.Buffers" Version="$(CoreFxVersion)" />
<PackageReference Include="System.IO.Pipelines" Version="$(CoreFxVersion)" /> <PackageReference Include="System.IO.Pipelines" Version="$(CoreFxVersion)" />
<PackageReference Include="Pipelines.Sockets.Unofficial" Version="0.2.0-alpha-007" /> <PackageReference Include="Pipelines.Sockets.Unofficial" Version="0.2.1-alpha.33" />
</ItemGroup> </ItemGroup>
</Project> </Project>
\ No newline at end of file
...@@ -137,7 +137,17 @@ public SocketManager(string name, bool useHighPrioritySocketThreads) ...@@ -137,7 +137,17 @@ public SocketManager(string name, bool useHighPrioritySocketThreads)
_writeOneQueueAsync = () => WriteOneQueueAsync(); _writeOneQueueAsync = () => WriteOneQueueAsync();
Task.Run(() => WriteAllQueuesAsync()); Task.Run(() => WriteAllQueuesAsync());
var defaultPipeOptions = PipeOptions.Default;
_scheduler = new DedicatedThreadPoolPipeScheduler(name, priority: useHighPrioritySocketThreads ? ThreadPriority.AboveNormal : ThreadPriority.Normal);
_pipeOptions = new PipeOptions(
defaultPipeOptions.Pool, _scheduler, _scheduler,
defaultPipeOptions.PauseWriterThreshold, defaultPipeOptions.ResumeWriterThreshold, defaultPipeOptions.MinimumSegmentSize,
useSynchronizationContext: false);
} }
readonly DedicatedThreadPoolPipeScheduler _scheduler;
readonly PipeOptions _pipeOptions;
private readonly Func<Task> _writeOneQueueAsync; private readonly Func<Task> _writeOneQueueAsync;
...@@ -152,6 +162,7 @@ private enum CallbackOperation ...@@ -152,6 +162,7 @@ private enum CallbackOperation
/// </summary> /// </summary>
public void Dispose() public void Dispose()
{ {
_scheduler?.Dispose();
lock (writeQueue) lock (writeQueue)
{ {
// make sure writer threads know to exit // make sure writer threads know to exit
...@@ -160,23 +171,16 @@ public void Dispose() ...@@ -160,23 +171,16 @@ public void Dispose()
} }
OnDispose(); OnDispose();
} }
//static readonly PipeOptions PipeOptions = PipeOptions.Default;
static readonly PipeOptions PipeOptions = new PipeOptions(
readerScheduler: PipeScheduler.Inline, writerScheduler: PipeScheduler.Inline, useSynchronizationContext: false);
internal SocketToken BeginConnect(EndPoint endpoint, ISocketCallback callback, ConnectionMultiplexer multiplexer, TextWriter log) internal SocketToken BeginConnect(EndPoint endpoint, ISocketCallback callback, ConnectionMultiplexer multiplexer, TextWriter log)
{ {
var addressFamily = endpoint.AddressFamily == AddressFamily.Unspecified ? AddressFamily.InterNetwork : endpoint.AddressFamily; var addressFamily = endpoint.AddressFamily == AddressFamily.Unspecified ? AddressFamily.InterNetwork : endpoint.AddressFamily;
var socket = new Socket(addressFamily, SocketType.Stream, ProtocolType.Tcp); var socket = new Socket(addressFamily, SocketType.Stream, ProtocolType.Tcp);
SocketConnection.SetRecommendedClientOptions(socket);
try try
{ {
var formattedEndpoint = Format.ToString(endpoint); var formattedEndpoint = Format.ToString(endpoint);
var t = SocketConnection.ConnectAsync(endpoint, _pipeOptions,
var t = SocketConnection.ConnectAsync(endpoint, PipeOptions,
SocketConnectionOptions.SyncReader | SocketConnectionOptions.SyncWriter,
// SocketConnectionOptions.None,
onConnected: conn => EndConnectAsync(conn, multiplexer, log, callback), onConnected: conn => EndConnectAsync(conn, multiplexer, log, callback),
socket: socket); socket: socket);
GC.KeepAlive(t); // make compiler happier GC.KeepAlive(t); // make compiler happier
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment