Commit bbad0492 authored by Nick Craver's avatar Nick Craver

This is a relatively minor pass at simplifying the connect logic.

This was an expedition, but here's the history:
- There are 2 forks in the connection logic
 - One for Mono (the DnsEndpoint split, see #20 and #155)
 - One for .BeginConnect() and one for ConnectAsync (because netstandard1.x was "async all the things!")
- There's a behind-the-scenes fork on completion type, but for actual library consumers (and not tests), it was always .Any, which was effectively .Sync.

There was also a critical bug fix in #113 that unfortuantely added a lot of complexity here. The maintenance problem is this complexity wasn't even used, as far as I can tell. It was only ever exposed to even be possibly used in the test project. So we had complicated connection logic, only for the sake of testing it. Unless I'm an idiot, which is entirely possible.

This commit removes that logic and simplifies things, as a first step that's a unit in itself. It does not fix Mono. There are other issues there, but thanks to WSL I can readily run the test suite under Mono on Linux now. Mono has a littany of other issues which I'll comment on in #314.

The most important thing here is that we don't regress what was fixed in #113, as this doesn't readily show in tests.
parent b987ab3f
......@@ -10,11 +10,8 @@ public class ConnectToUnexistingHost : TestBase
public ConnectToUnexistingHost(ITestOutputHelper output) : base (output) { }
#if DEBUG
[Theory]
[InlineData(CompletionType.Any)]
[InlineData(CompletionType.Sync)]
[InlineData(CompletionType.Async)]
public void FailsWithinTimeout(CompletionType completionType)
[Fact]
public void FailsWithinTimeout()
{
const int timeout = 1000;
var sw = Stopwatch.StartNew();
......@@ -25,9 +22,7 @@ public void FailsWithinTimeout(CompletionType completionType)
EndPoints = { { "invalid", 1234 } },
ConnectTimeout = timeout
};
SocketManager.ConnectCompletionType = completionType;
using (var muxer = ConnectionMultiplexer.Connect(config, Writer))
{
Thread.Sleep(10000);
......@@ -42,11 +37,7 @@ public void FailsWithinTimeout(CompletionType completionType)
Output.WriteLine("Timeout: " + timeout);
Assert.True(elapsed < 9000, "Connect should fail within ConnectTimeout, ElapsedMs: " + elapsed);
}
finally
{
SocketManager.ConnectCompletionType = CompletionType.Any;
}
}
#endif
}
}
\ No newline at end of file
}
......@@ -23,7 +23,6 @@ public void FastNoticesFailOnConnectingSyncComlpetion()
var server2 = muxer.GetServer(muxer.GetEndPoints()[1]);
muxer.AllowConnect = false;
SocketManager.ConnectCompletionType = CompletionType.Sync;
// muxer.IsConnected is true of *any* are connected, simulate failure for all cases.
server.SimulateConnectionFailure();
......@@ -46,7 +45,6 @@ public void FastNoticesFailOnConnectingSyncComlpetion()
}
finally
{
SocketManager.ConnectCompletionType = CompletionType.Any;
ClearAmbientFailures();
}
}
......@@ -56,8 +54,6 @@ public void ConnectsWhenBeginConnectCompletesSynchronously()
{
try
{
SocketManager.ConnectCompletionType = CompletionType.Sync;
using (var muxer = Create(keepAlive: 1, connectTimeout: 3000))
{
var conn = muxer.GetDatabase();
......@@ -68,7 +64,6 @@ public void ConnectsWhenBeginConnectCompletesSynchronously()
}
finally
{
SocketManager.ConnectCompletionType = CompletionType.Any;
ClearAmbientFailures();
}
}
......@@ -87,7 +82,6 @@ public void FastNoticesFailOnConnectingAsyncComlpetion()
var server2 = muxer.GetServer(muxer.GetEndPoints()[1]);
muxer.AllowConnect = false;
SocketManager.ConnectCompletionType = CompletionType.Async;
// muxer.IsConnected is true of *any* are connected, simulate failure for all cases.
server.SimulateConnectionFailure();
......@@ -110,7 +104,6 @@ public void FastNoticesFailOnConnectingAsyncComlpetion()
}
finally
{
SocketManager.ConnectCompletionType = CompletionType.Any;
ClearAmbientFailures();
}
}
......
......@@ -114,7 +114,6 @@ public void CheckFailureRecovered()
var server = muxer.GetServer(muxer.GetEndPoints()[0]);
muxer.AllowConnect = false;
SocketManager.ConnectCompletionType = CompletionType.Async;
server.SimulateConnectionFailure();
......@@ -129,7 +128,6 @@ public void CheckFailureRecovered()
}
finally
{
SocketManager.ConnectCompletionType = CompletionType.Any;
ClearAmbientFailures();
}
}
......
......@@ -36,7 +36,6 @@ public void MultipleEndpointsThrowAggregateException()
{
var conn = muxer.GetDatabase();
muxer.AllowConnect = false;
SocketManager.ConnectCompletionType = CompletionType.Async;
foreach (var endpoint in muxer.GetEndPoints())
{
......@@ -56,7 +55,6 @@ public void MultipleEndpointsThrowAggregateException()
}
finally
{
SocketManager.ConnectCompletionType = CompletionType.Any;
ClearAmbientFailures();
}
}
......@@ -70,7 +68,6 @@ public void NullInnerExceptionForMultipleEndpointsWithNoLastException()
{
var conn = muxer.GetDatabase();
muxer.AllowConnect = false;
SocketManager.ConnectCompletionType = CompletionType.Async;
var ex = ExceptionFactory.NoConnectionAvailable(true, true, new RedisCommand(), null, null, muxer.GetServerSnapshot());
Assert.IsType<RedisConnectionException>(ex);
Assert.Null(ex.InnerException);
......@@ -78,7 +75,6 @@ public void NullInnerExceptionForMultipleEndpointsWithNoLastException()
}
finally
{
SocketManager.ConnectCompletionType = CompletionType.Any;
ClearAmbientFailures();
}
}
......@@ -92,7 +88,6 @@ public void ServerTakesPrecendenceOverSnapshot()
{
var conn = muxer.GetDatabase();
muxer.AllowConnect = false;
SocketManager.ConnectCompletionType = CompletionType.Async;
muxer.GetServer(muxer.GetEndPoints()[0]).SimulateConnectionFailure();
......@@ -104,7 +99,6 @@ public void ServerTakesPrecendenceOverSnapshot()
}
finally
{
SocketManager.ConnectCompletionType = CompletionType.Any;
ClearAmbientFailures();
}
}
......
......@@ -185,16 +185,6 @@ public partial class SocketManager
{
ignore = callback.IgnoreConnect;
}
/// <summary>
/// Completion type for BeginConnect call
/// </summary>
public static CompletionType ConnectCompletionType { get; set; }
partial void ShouldForceConnectCompletionType(ref CompletionType completionType)
{
completionType = SocketManager.ConnectCompletionType;
}
}
internal partial interface ISocketCallback
......@@ -270,25 +260,6 @@ public static bool EmulateStaleConnection
}
#endif
/// <summary>
/// Completion type for CompletionTypeHelper
/// </summary>
public enum CompletionType
{
/// <summary>
/// Retain original completion type (either sync or async)
/// </summary>
Any = 0,
/// <summary>
/// Force sync completion
/// </summary>
Sync = 1,
/// <summary>
/// Force async completion
/// </summary>
Async = 2
}
#if FEATURE_PERFCOUNTER
internal static class PerfCounterHelper
{
......@@ -336,49 +307,6 @@ public static bool TryGetSystemCPU(out float value)
}
}
#endif
#if FEATURE_THREADPOOL
internal static class CompletionTypeHelper
{
public static void RunWithCompletionType(Func<AsyncCallback, IAsyncResult> beginAsync, AsyncCallback callback, CompletionType completionType)
{
AsyncCallback proxyCallback;
if (completionType == CompletionType.Any)
{
proxyCallback = ar =>
{
if (!ar.CompletedSynchronously)
{
callback(ar);
}
};
}
else
{
proxyCallback = _ => { };
}
var result = beginAsync(proxyCallback);
if (completionType == CompletionType.Any && !result.CompletedSynchronously)
{
return;
}
result.AsyncWaitHandle.WaitOne();
switch (completionType)
{
case CompletionType.Async:
ThreadPool.QueueUserWorkItem(_ => callback(result));
break;
case CompletionType.Any:
case CompletionType.Sync:
callback(result);
break;
}
}
}
#endif
#if VERBOSE
......
......@@ -179,22 +179,39 @@ public void Dispose()
internal SocketToken BeginConnect(EndPoint endpoint, ISocketCallback callback, ConnectionMultiplexer multiplexer, TextWriter log)
{
#if !NETSTANDARD1_5
void RunWithCompletionType(Func<AsyncCallback, IAsyncResult> beginAsync, AsyncCallback asyncCallback)
{
void proxyCallback(IAsyncResult ar)
{
if (!ar.CompletedSynchronously)
{
asyncCallback(ar);
}
}
var result = beginAsync(proxyCallback);
if (result.CompletedSynchronously)
{
result.AsyncWaitHandle.WaitOne();
asyncCallback(result);
}
}
#endif
var addressFamily = endpoint.AddressFamily == AddressFamily.Unspecified ? AddressFamily.InterNetwork : endpoint.AddressFamily;
var socket = new Socket(addressFamily, SocketType.Stream, ProtocolType.Tcp);
SetFastLoopbackOption(socket);
socket.NoDelay = true;
try
{
var connectCompletionType = CompletionType.Any;
ShouldForceConnectCompletionType(ref connectCompletionType);
var formattedEndpoint = Format.ToString(endpoint);
var tuple = Tuple.Create(socket, callback);
multiplexer.LogLocked(log, "BeginConnect: {0}", formattedEndpoint);
// A work-around for a Mono bug in BeginConnect(EndPoint endpoint, AsyncCallback callback, object state)
if (endpoint is DnsEndPoint dnsEndpoint)
{
// A work-around for a Mono bug in BeginConnect(EndPoint endpoint, AsyncCallback callback, object state)
#if !FEATURE_THREADPOOL
multiplexer.LogLocked(log, "BeginConnect: {0}", formattedEndpoint);
#if NETSTANDARD1_5 // No BeginConnect there, because everything was an Async push...we should drop this
socket.ConnectAsync(dnsEndpoint.Host, dnsEndpoint.Port).ContinueWith(t =>
{
multiplexer.LogLocked(log, "EndConnect: {0}", formattedEndpoint);
......@@ -202,40 +219,31 @@ internal SocketToken BeginConnect(EndPoint endpoint, ISocketCallback callback, C
multiplexer.LogLocked(log, "Connect complete: {0}", formattedEndpoint);
});
#else
CompletionTypeHelper.RunWithCompletionType(
cb => {
multiplexer.LogLocked(log, "BeginConnect: {0}", formattedEndpoint);
return socket.BeginConnect(dnsEndpoint.Host, dnsEndpoint.Port, cb, tuple);
},
RunWithCompletionType(
cb => socket.BeginConnect(dnsEndpoint.Host, dnsEndpoint.Port, cb, tuple),
ar => {
multiplexer.LogLocked(log, "EndConnect: {0}", formattedEndpoint);
EndConnectImpl(ar, multiplexer, log, tuple);
multiplexer.LogLocked(log, "Connect complete: {0}", formattedEndpoint);
},
connectCompletionType);
});
#endif
}
else
{
#if !FEATURE_THREADPOOL
multiplexer.LogLocked(log, "BeginConnect: {0}", formattedEndpoint);
#if NETSTANDARD1_5 // No BeginConnect there, because everything was an Async push...we should drop this
socket.ConnectAsync(endpoint).ContinueWith(t =>
{
multiplexer.LogLocked(log, "EndConnect: {0}", formattedEndpoint);
EndConnectImpl(t, multiplexer, log, tuple);
});
#else
CompletionTypeHelper.RunWithCompletionType(
cb => {
multiplexer.LogLocked(log, "BeginConnect: {0}", formattedEndpoint);
return socket.BeginConnect(endpoint, cb, tuple);
},
RunWithCompletionType(
cb => socket.BeginConnect(endpoint, cb, tuple),
ar => {
multiplexer.LogLocked(log, "EndConnect: {0}", formattedEndpoint);
EndConnectImpl(ar, multiplexer, log, tuple);
multiplexer.LogLocked(log, "Connect complete: {0}", formattedEndpoint);
},
connectCompletionType);
});
#endif
}
}
......@@ -385,9 +393,7 @@ private void EndConnectImpl(IAsyncResult ar, ConnectionMultiplexer multiplexer,
partial void OnShutdown(Socket socket);
partial void ShouldIgnoreConnect(ISocketCallback callback, ref bool ignore);
partial void ShouldForceConnectCompletionType(ref CompletionType completionType);
[System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Usage", "CA2202:Do not dispose objects multiple times")]
private void Shutdown(Socket socket)
{
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment