Commit 82a87cc1 authored by Marc Gravell's avatar Marc Gravell

lib update to v88 (the Kestrel edition); fix MovedProfiling test

parent 995e06b2
...@@ -740,7 +740,7 @@ public void MovedProfiling() ...@@ -740,7 +740,7 @@ public void MovedProfiling()
Assert.True(msg.CreationToEnqueued > TimeSpan.Zero); Assert.True(msg.CreationToEnqueued > TimeSpan.Zero);
Assert.True(msg.EnqueuedToSending > TimeSpan.Zero); Assert.True(msg.EnqueuedToSending > TimeSpan.Zero);
Assert.True(msg.SentToResponse > TimeSpan.Zero); Assert.True(msg.SentToResponse > TimeSpan.Zero);
Assert.True(msg.ResponseToCompletion > TimeSpan.Zero); Assert.True(msg.ResponseToCompletion >= TimeSpan.Zero); // this can be immeasurably fast
Assert.True(msg.ElapsedTime > TimeSpan.Zero); Assert.True(msg.ElapsedTime > TimeSpan.Zero);
if (msg.RetransmissionOf != null) if (msg.RetransmissionOf != null)
......
...@@ -23,7 +23,10 @@ public void CheckForSocketLeaks() ...@@ -23,7 +23,10 @@ public void CheckForSocketLeaks()
// Force GC before memory dump in debug below... // Force GC before memory dump in debug below...
CollectGarbage(); CollectGarbage();
Debugger.Break(); if (Debugger.IsAttached)
{
Debugger.Break();
}
} }
} }
} }
...@@ -90,83 +90,80 @@ internal async void BeginConnectAsync(TextWriter log) ...@@ -90,83 +90,80 @@ internal async void BeginConnectAsync(TextWriter log)
CancellationTokenSource timeoutSource = null; CancellationTokenSource timeoutSource = null;
try try
{ {
var awaitable = new SocketAwaitable(); using (var args = new SocketAwaitableEventArgs
using (var _socketArgs = new SocketAsyncEventArgs
{ {
UserToken = awaitable,
RemoteEndPoint = endpoint, RemoteEndPoint = endpoint,
}) })
{ {
_socketArgs.Completed += SocketAwaitable.Callback;
var x = VolatileSocket; var x = VolatileSocket;
if (x == null) if (x == null)
{ {
awaitable.TryComplete(0, SocketError.ConnectionAborted); args.Abort();
} }
else if (x.ConnectAsync(_socketArgs)) else if (x.ConnectAsync(args))
{ // asynchronous operation is pending { // asynchronous operation is pending
timeoutSource = ConfigureTimeout(_socketArgs, bridge.Multiplexer.RawConfig.ConnectTimeout); timeoutSource = ConfigureTimeout(args, bridge.Multiplexer.RawConfig.ConnectTimeout);
} }
else else
{ // completed synchronously { // completed synchronously
SocketAwaitable.OnCompleted(_socketArgs); args.Complete();
} }
}
// Complete connection
try
{
// If we're told to ignore connect, abort here
if (BridgeCouldBeNull?.Multiplexer?.IgnoreConnect ?? false) return;
await awaitable; // wait for the connect to complete or fail (will throw) // Complete connection
if (timeoutSource != null) try
{
timeoutSource.Cancel();
timeoutSource.Dispose();
}
var x = VolatileSocket;
if (x == null)
{
ConnectionMultiplexer.TraceWithoutContext("Socket was already aborted");
}
else if (await ConnectedAsync(x, log, bridge.Multiplexer.SocketManager).ForAwait())
{ {
bridge.Multiplexer.LogLocked(log, "Starting read"); // If we're told to ignore connect, abort here
try if (BridgeCouldBeNull?.Multiplexer?.IgnoreConnect ?? false) return;
await args; // wait for the connect to complete or fail (will throw)
if (timeoutSource != null)
{
timeoutSource.Cancel();
timeoutSource.Dispose();
}
x = VolatileSocket;
if (x == null)
{
ConnectionMultiplexer.TraceWithoutContext("Socket was already aborted");
}
else if (await ConnectedAsync(x, log, bridge.Multiplexer.SocketManager).ForAwait())
{ {
StartReading(); bridge.Multiplexer.LogLocked(log, "Starting read");
// Normal return try
{
StartReading();
// Normal return
}
catch (Exception ex)
{
ConnectionMultiplexer.TraceWithoutContext(ex.Message);
Shutdown();
}
} }
catch (Exception ex) else
{ {
ConnectionMultiplexer.TraceWithoutContext(ex.Message); ConnectionMultiplexer.TraceWithoutContext("Aborting socket");
Shutdown(); Shutdown();
} }
} }
else catch (ObjectDisposedException)
{
ConnectionMultiplexer.TraceWithoutContext("Aborting socket");
Shutdown();
}
}
catch (ObjectDisposedException)
{
bridge.Multiplexer.LogLocked(log, "(socket shutdown)");
try { RecordConnectionFailed(ConnectionFailureType.UnableToConnect, isInitialConnect: true); }
catch (Exception inner)
{ {
ConnectionMultiplexer.TraceWithoutContext(inner.Message); bridge.Multiplexer.LogLocked(log, "(socket shutdown)");
try { RecordConnectionFailed(ConnectionFailureType.UnableToConnect, isInitialConnect: true); }
catch (Exception inner)
{
ConnectionMultiplexer.TraceWithoutContext(inner.Message);
}
} }
} catch (Exception outer)
catch (Exception outer)
{
ConnectionMultiplexer.TraceWithoutContext(outer.Message);
try { RecordConnectionFailed(ConnectionFailureType.UnableToConnect, isInitialConnect: true); }
catch (Exception inner)
{ {
ConnectionMultiplexer.TraceWithoutContext(inner.Message); ConnectionMultiplexer.TraceWithoutContext(outer.Message);
try { RecordConnectionFailed(ConnectionFailureType.UnableToConnect, isInitialConnect: true); }
catch (Exception inner)
{
ConnectionMultiplexer.TraceWithoutContext(inner.Message);
}
} }
} }
} }
...@@ -184,7 +181,7 @@ internal async void BeginConnectAsync(TextWriter log) ...@@ -184,7 +181,7 @@ internal async void BeginConnectAsync(TextWriter log)
} }
} }
private static CancellationTokenSource ConfigureTimeout(SocketAsyncEventArgs args, int timeoutMilliseconds) private static CancellationTokenSource ConfigureTimeout(SocketAwaitableEventArgs args, int timeoutMilliseconds)
{ {
var cts = new CancellationTokenSource(); var cts = new CancellationTokenSource();
var timeout = Task.Delay(timeoutMilliseconds, cts.Token); var timeout = Task.Delay(timeoutMilliseconds, cts.Token);
...@@ -192,11 +189,9 @@ private static CancellationTokenSource ConfigureTimeout(SocketAsyncEventArgs arg ...@@ -192,11 +189,9 @@ private static CancellationTokenSource ConfigureTimeout(SocketAsyncEventArgs arg
{ {
try try
{ {
var a = (SocketAsyncEventArgs)state; var a = (SocketAwaitableEventArgs)state;
if (((SocketAwaitable)a.UserToken).TryComplete(0, SocketError.TimedOut)) a.Abort(SocketError.TimedOut);
{ Socket.CancelConnectAsync(a);
Socket.CancelConnectAsync(a);
}
} }
catch { } catch { }
}, args); }, args);
......
...@@ -32,8 +32,10 @@ internal RedisValue(object obj, long val) ...@@ -32,8 +32,10 @@ internal RedisValue(object obj, long val)
_objectOrSentinel = obj; _objectOrSentinel = obj;
_memory = default; _memory = default;
} }
#pragma warning disable RCS1085 // Use auto-implemented property.
internal object DirectObject => _objectOrSentinel; internal object DirectObject => _objectOrSentinel;
internal long DirectInt64 => _overlappedValue64; internal long DirectInt64 => _overlappedValue64;
#pragma warning restore RCS1085 // Use auto-implemented property.
private readonly static object Sentinel_Integer = new object(); private readonly static object Sentinel_Integer = new object();
private readonly static object Sentinel_Raw = new object(); private readonly static object Sentinel_Raw = new object();
......
...@@ -19,7 +19,7 @@ ...@@ -19,7 +19,7 @@
</PropertyGroup> </PropertyGroup>
<ItemGroup> <ItemGroup>
<PackageReference Include="Pipelines.Sockets.Unofficial" Version="0.2.1-alpha.87" /> <PackageReference Include="Pipelines.Sockets.Unofficial" Version="0.2.1-alpha.88" />
<PackageReference Include="System.Diagnostics.PerformanceCounter" Version="4.5.0" /> <PackageReference Include="System.Diagnostics.PerformanceCounter" Version="4.5.0" />
<PackageReference Include="System.IO.Pipelines" Version="4.5.0" /> <PackageReference Include="System.IO.Pipelines" Version="4.5.0" />
<PackageReference Include="System.Threading.Channels" Version="4.5.0" /> <PackageReference Include="System.Threading.Channels" Version="4.5.0" />
......
using System; using System;
using System.Collections.Generic;
using System.Diagnostics; using System.Diagnostics;
using System.Runtime.CompilerServices; using System.Runtime.CompilerServices;
using System.Runtime.InteropServices; using System.Runtime.InteropServices;
...@@ -22,6 +21,7 @@ private static void Main() ...@@ -22,6 +21,7 @@ private static void Main()
Console.WriteLine(i++); Console.WriteLine(i++);
RunCompetingBatchesOnSameMuxer(); RunCompetingBatchesOnSameMuxer();
} while (DateTime.UtcNow < stop); } while (DateTime.UtcNow < stop);
Console.WriteLine($"Completed {i} iterations, {2 * i * IterationCount * InnerCount} operations");
} }
private static ConnectionMultiplexer Create() private static ConnectionMultiplexer Create()
{ {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment