Commit 8a7fc8f4 authored by Nick Craver's avatar Nick Craver Committed by Nick Craver

Simplifying socket code

This moves the socket bits all the physical connection which I think is a lot easier to grok than the double relationship between it and SocketManager. Pushing a PR for sanity check.
parent e00fa9e3
...@@ -56,14 +56,6 @@ public partial class ConnectionMultiplexer ...@@ -56,14 +56,6 @@ public partial class ConnectionMultiplexer
public bool IgnoreConnect { get { return ignoreConnect; } set { ignoreConnect = value; } } public bool IgnoreConnect { get { return ignoreConnect; } set { ignoreConnect = value; } }
} }
public partial class SocketManager
{
partial void ShouldIgnoreConnect(PhysicalConnection callback, ref bool ignore)
{
ignore = callback.IgnoreConnect;
}
}
internal partial class PhysicalBridge internal partial class PhysicalBridge
{ {
internal void SimulateConnectionFailure() internal void SimulateConnectionFailure()
...@@ -78,6 +70,11 @@ internal void SimulateConnectionFailure() ...@@ -78,6 +70,11 @@ internal void SimulateConnectionFailure()
internal partial class PhysicalConnection internal partial class PhysicalConnection
{ {
partial void ShouldIgnoreConnect(ref bool ignore)
{
ignore = IgnoreConnect;
}
partial void OnDebugAbort() partial void OnDebugAbort()
{ {
if (!Multiplexer.AllowConnect) if (!Multiplexer.AllowConnect)
......
...@@ -598,7 +598,7 @@ private PhysicalConnection GetConnection(TextWriter log) ...@@ -598,7 +598,7 @@ private PhysicalConnection GetConnection(TextWriter log)
// separate creation and connection for case when connection completes synchronously // separate creation and connection for case when connection completes synchronously
// in that case PhysicalConnection will call back to PhysicalBridge, and most of PhysicalBridge methods assumes that physical is not null; // in that case PhysicalConnection will call back to PhysicalBridge, and most of PhysicalBridge methods assumes that physical is not null;
physical = new PhysicalConnection(this); physical = new PhysicalConnection(this);
physical.BeginConnect(log); physical.BeginConnectAsync(log);
} }
} }
return null; return null;
......
...@@ -73,6 +73,7 @@ private static readonly Message ...@@ -73,6 +73,7 @@ private static readonly Message
private IDuplexPipe _ioPipe; private IDuplexPipe _ioPipe;
private Socket _socket; private Socket _socket;
private SocketAsyncEventArgs _socketArgs;
public PhysicalConnection(PhysicalBridge bridge) public PhysicalConnection(PhysicalBridge bridge)
{ {
...@@ -88,14 +89,101 @@ public PhysicalConnection(PhysicalBridge bridge) ...@@ -88,14 +89,101 @@ public PhysicalConnection(PhysicalBridge bridge)
OnCreateEcho(); OnCreateEcho();
} }
public void BeginConnect(TextWriter log) internal async void BeginConnectAsync(TextWriter log)
{ {
Thread.VolatileWrite(ref firstUnansweredWriteTickCount, 0); Thread.VolatileWrite(ref firstUnansweredWriteTickCount, 0);
var endpoint = Bridge.ServerEndPoint.EndPoint; var endpoint = Bridge.ServerEndPoint.EndPoint;
Multiplexer.Trace("Connecting...", physicalName); Multiplexer.Trace("Connecting...", physicalName);
_socket = SocketManager.CreateSocket(endpoint); _socket = SocketManager.CreateSocket(endpoint);
Multiplexer.SocketManager.BeginConnectAsync(endpoint, _socket, this, Multiplexer, log);
Multiplexer.LogLocked(log, "BeginConnect: {0}", Format.ToString(endpoint));
var awaitable = new SocketAwaitable();
_socketArgs = new SocketAsyncEventArgs
{
UserToken = awaitable,
RemoteEndPoint = endpoint,
};
_socketArgs.Completed += SocketAwaitable.Callback;
try
{
if (_socket.ConnectAsync(_socketArgs))
{ // asynchronous operation is pending
ConfigureTimeout(_socketArgs, Multiplexer.RawConfig.ConnectTimeout);
}
else
{ // completed synchronously
SocketAwaitable.OnCompleted(_socketArgs);
}
// Complete connection
try
{
bool ignoreConnect = false;
ShouldIgnoreConnect(ref ignoreConnect);
if (ignoreConnect) return;
await awaitable; // wait for the connect to complete or fail (will throw)
if (await ConnectedAsync(_socket, log, Multiplexer.SocketManager).ForAwait())
{
Multiplexer.LogLocked(log, "Starting read");
try
{
StartReading();
// Normal return
}
catch (Exception ex)
{
ConnectionMultiplexer.TraceWithoutContext(ex.Message);
Shutdown();
}
}
else
{
ConnectionMultiplexer.TraceWithoutContext("Aborting socket");
Shutdown();
}
}
catch (ObjectDisposedException)
{
Multiplexer.LogLocked(log, "(socket shutdown)");
try { Error(); }
catch (Exception inner)
{
ConnectionMultiplexer.TraceWithoutContext(inner.Message);
}
}
catch (Exception outer)
{
ConnectionMultiplexer.TraceWithoutContext(outer.Message);
try { Error(); }
catch (Exception inner)
{
ConnectionMultiplexer.TraceWithoutContext(inner.Message);
}
}
}
catch (NotImplementedException ex)
{
if (!(endpoint is IPEndPoint))
{
throw new InvalidOperationException("BeginConnect failed with NotImplementedException; consider using IP endpoints, or enable ResolveDns in the configuration", ex);
}
throw;
}
}
private static void ConfigureTimeout(SocketAsyncEventArgs args, int timeoutMilliseconds)
{
var timeout = Task.Delay(timeoutMilliseconds);
timeout.ContinueWith((_, state) =>
{
var a = (SocketAsyncEventArgs)state;
try { Socket.CancelConnectAsync(a); } catch { }
try { ((SocketAwaitable)a.UserToken).Complete(0, SocketError.TimedOut); } catch { }
}, args);
} }
private enum ReadMode : byte private enum ReadMode : byte
...@@ -115,6 +203,20 @@ private enum ReadMode : byte ...@@ -115,6 +203,20 @@ private enum ReadMode : byte
public bool TransactionActive { get; internal set; } public bool TransactionActive { get; internal set; }
partial void ShouldIgnoreConnect(ref bool ignore);
[System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Usage", "CA2202:Do not dispose objects multiple times")]
private void Shutdown()
{
if (_socket != null)
{
try { _socket.Shutdown(SocketShutdown.Both); } catch { }
try { _socket.Close(); } catch { }
try { _socket.Dispose(); } catch { }
}
try { using (_socketArgs) { } } catch { }
}
public void Dispose() public void Dispose()
{ {
var ioPipe = _ioPipe; var ioPipe = _ioPipe;
...@@ -132,7 +234,7 @@ public void Dispose() ...@@ -132,7 +234,7 @@ public void Dispose()
if (_socket != null) if (_socket != null)
{ {
Multiplexer.SocketManager?.Shutdown(_socket); Shutdown();
_socket = default; _socket = default;
Multiplexer.Trace("Disconnected", physicalName); Multiplexer.Trace("Disconnected", physicalName);
RecordConnectionFailed(ConnectionFailureType.ConnectionDisposed); RecordConnectionFailed(ConnectionFailureType.ConnectionDisposed);
...@@ -244,7 +346,7 @@ void add(string lk, string sk, string v) ...@@ -244,7 +346,7 @@ void add(string lk, string sk, string v)
} }
// burn the socket // burn the socket
Multiplexer.SocketManager?.Shutdown(_socket); Shutdown();
} }
public override string ToString() public override string ToString()
......
...@@ -131,113 +131,7 @@ internal static Socket CreateSocket(EndPoint endpoint) ...@@ -131,113 +131,7 @@ internal static Socket CreateSocket(EndPoint endpoint)
return socket; return socket;
} }
private static void ConfigureTimeout(SocketAsyncEventArgs args, int timeoutMilliseconds)
{
var timeout = Task.Delay(timeoutMilliseconds);
timeout.ContinueWith((_, state) =>
{
var a = (SocketAsyncEventArgs)state;
try { Socket.CancelConnectAsync(a); } catch { }
try { ((SocketAwaitable)a.UserToken).Complete(0, SocketError.TimedOut); } catch { }
}, args);
}
internal async void BeginConnectAsync(EndPoint endpoint, Socket socket, PhysicalConnection physicalConnection, ConnectionMultiplexer multiplexer, TextWriter log)
{
multiplexer.LogLocked(log, "BeginConnect: {0}", Format.ToString(endpoint));
var awaitable = new SocketAwaitable();
var args = new SocketAsyncEventArgs
{
UserToken = awaitable,
RemoteEndPoint = endpoint
};
args.Completed += SocketAwaitable.Callback;
try
{
if (socket.ConnectAsync(args))
{ // asynchronous operation is pending
ConfigureTimeout(args, multiplexer.RawConfig.ConnectTimeout);
}
else
{ // completed synchronously
SocketAwaitable.OnCompleted(args);
}
// Complete connection
try
{
bool ignoreConnect = false;
ShouldIgnoreConnect(physicalConnection, ref ignoreConnect);
if (ignoreConnect) return;
await awaitable; // wait for the connect to complete or fail (will throw)
if (physicalConnection != null && await physicalConnection.ConnectedAsync(socket, log, this).ForAwait())
{
multiplexer.LogLocked(log, "Starting read");
try
{
physicalConnection.StartReading();
// Normal return
}
catch (Exception ex)
{
ConnectionMultiplexer.TraceWithoutContext(ex.Message);
Shutdown(socket);
}
}
else
{
ConnectionMultiplexer.TraceWithoutContext("Aborting socket");
Shutdown(socket);
}
}
catch (ObjectDisposedException)
{
multiplexer.LogLocked(log, "(socket shutdown)");
try { physicalConnection?.Error(); }
catch (Exception inner)
{
ConnectionMultiplexer.TraceWithoutContext(inner.Message);
}
}
catch (Exception outer)
{
ConnectionMultiplexer.TraceWithoutContext(outer.Message);
try { physicalConnection?.Error(); }
catch (Exception inner)
{
ConnectionMultiplexer.TraceWithoutContext(inner.Message);
}
}
}
catch (NotImplementedException ex)
{
if (!(endpoint is IPEndPoint))
{
throw new InvalidOperationException("BeginConnect failed with NotImplementedException; consider using IP endpoints, or enable ResolveDns in the configuration", ex);
}
throw;
}
}
partial void OnDispose(); partial void OnDispose();
partial void OnShutdown(Socket socket);
partial void ShouldIgnoreConnect(PhysicalConnection callback, ref bool ignore);
[System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Usage", "CA2202:Do not dispose objects multiple times")]
internal void Shutdown(Socket socket)
{
if (socket != null)
{
OnShutdown(socket);
try { socket.Shutdown(SocketShutdown.Both); } catch { }
try { socket.Close(); } catch { }
try { socket.Dispose(); } catch { }
}
}
internal string GetState() internal string GetState()
{ {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment