Commit b81cba51 authored by Marc Gravell's avatar Marc Gravell Committed by Nick Craver

somehow this didn't upload :/

parent 15ebc8ab
...@@ -23,7 +23,7 @@ ...@@ -23,7 +23,7 @@
</ItemGroup> </ItemGroup>
<ItemGroup> <ItemGroup>
<PackageReference Include="Pipelines.Sockets.Unofficial" Version="0.2.1-alpha.58" /> <PackageReference Include="Pipelines.Sockets.Unofficial" Version="0.2.1-alpha.62" />
<PackageReference Include="System.Threading.Channels" Version="4.5.0" /> <PackageReference Include="System.Threading.Channels" Version="4.5.0" />
</ItemGroup> </ItemGroup>
</Project> </Project>
\ No newline at end of file
...@@ -94,7 +94,8 @@ public void BeginConnect(TextWriter log) ...@@ -94,7 +94,8 @@ public void BeginConnect(TextWriter log)
var endpoint = Bridge.ServerEndPoint.EndPoint; var endpoint = Bridge.ServerEndPoint.EndPoint;
Multiplexer.Trace("Connecting...", physicalName); Multiplexer.Trace("Connecting...", physicalName);
_socket = Multiplexer.SocketManager.BeginConnect(endpoint, this, Multiplexer, log); _socket = SocketManager.CreateSocket(endpoint);
Multiplexer.SocketManager.BeginConnectAsync(endpoint, _socket, this, Multiplexer, log);
} }
private enum ReadMode : byte private enum ReadMode : byte
......
...@@ -4,6 +4,7 @@ ...@@ -4,6 +4,7 @@
using System.Net; using System.Net;
using System.Net.Sockets; using System.Net.Sockets;
using System.Threading; using System.Threading;
using System.Threading.Tasks;
using Pipelines.Sockets.Unofficial; using Pipelines.Sockets.Unofficial;
namespace StackExchange.Redis namespace StackExchange.Redis
...@@ -127,58 +128,49 @@ private void Dispose(bool disposing) ...@@ -127,58 +128,49 @@ private void Dispose(bool disposing)
/// </summary> /// </summary>
~SocketManager() => Dispose(false); ~SocketManager() => Dispose(false);
internal Socket BeginConnect(EndPoint endpoint, PhysicalConnection callback, ConnectionMultiplexer multiplexer, TextWriter log) internal static Socket CreateSocket(EndPoint endpoint)
{ {
void RunWithCompletionType(Func<AsyncCallback, IAsyncResult> beginAsync, AsyncCallback asyncCallback)
{
void proxyCallback(IAsyncResult ar)
{
if (!ar.CompletedSynchronously)
{
asyncCallback(ar);
}
}
var result = beginAsync(proxyCallback);
if (result.CompletedSynchronously)
{
result.AsyncWaitHandle.WaitOne();
asyncCallback(result);
}
}
var addressFamily = endpoint.AddressFamily == AddressFamily.Unspecified ? AddressFamily.InterNetwork : endpoint.AddressFamily; var addressFamily = endpoint.AddressFamily == AddressFamily.Unspecified ? AddressFamily.InterNetwork : endpoint.AddressFamily;
var protocolType = addressFamily == AddressFamily.Unix ? ProtocolType.Unspecified : ProtocolType.Tcp; var protocolType = addressFamily == AddressFamily.Unix ? ProtocolType.Unspecified : ProtocolType.Tcp;
var socket = new Socket(addressFamily, SocketType.Stream, protocolType); var socket = new Socket(addressFamily, SocketType.Stream, protocolType);
SocketConnection.SetRecommendedClientOptions(socket); SocketConnection.SetRecommendedClientOptions(socket);
return socket;
}
static void ConfigureTimeout(SocketAsyncEventArgs args, int timeoutMilliseconds)
{
var timeout = Task.Delay(timeoutMilliseconds);
timeout.ContinueWith((t, state) =>
{
var a = (SocketAsyncEventArgs)state;
try { Socket.CancelConnectAsync(a); } catch { }
try try
{ ((SocketAwaitable)a.UserToken).Complete(0, SocketError.TimedOut); }
catch { }
}, args);
}
internal void BeginConnectAsync(EndPoint endpoint, Socket socket, PhysicalConnection physicalConnection, ConnectionMultiplexer multiplexer, TextWriter log)
{ {
var formattedEndpoint = Format.ToString(endpoint); var formattedEndpoint = Format.ToString(endpoint);
multiplexer.LogLocked(log, "BeginConnect: {0}", formattedEndpoint); multiplexer.LogLocked(log, "BeginConnect: {0}", formattedEndpoint);
// A work-around for a Mono bug in BeginConnect(EndPoint endpoint, AsyncCallback callback, object state)
if (endpoint is DnsEndPoint dnsEndpoint) var awaitable = new SocketAwaitable();
var args = new SocketAsyncEventArgs();
args.UserToken = awaitable;
args.RemoteEndPoint = endpoint;
args.Completed += SocketAwaitable.Callback;
try
{ {
RunWithCompletionType( if (socket.ConnectAsync(args))
cb => socket.BeginConnect(dnsEndpoint.Host, dnsEndpoint.Port, cb, null),
ar =>
{ {
multiplexer.LogLocked(log, "EndConnect: {0}", formattedEndpoint); ConfigureTimeout(args, multiplexer.RawConfig.ConnectTimeout);
EndConnectImpl(ar, multiplexer, log, socket, callback);
multiplexer.LogLocked(log, "Connect complete: {0}", formattedEndpoint);
});
} }
else else
{ {
RunWithCompletionType( SocketAwaitable.OnCompleted(args);
cb => socket.BeginConnect(endpoint, cb, null),
ar =>
{
multiplexer.LogLocked(log, "EndConnect: {0}", formattedEndpoint);
EndConnectImpl(ar, multiplexer, log, socket, callback);
multiplexer.LogLocked(log, "Connect complete: {0}", formattedEndpoint);
});
} }
EndConnectAsync(awaitable, multiplexer, log, socket, physicalConnection);
} }
catch (NotImplementedException ex) catch (NotImplementedException ex)
{ {
...@@ -188,16 +180,15 @@ void proxyCallback(IAsyncResult ar) ...@@ -188,16 +180,15 @@ void proxyCallback(IAsyncResult ar)
} }
throw; throw;
} }
return socket;
} }
private async void EndConnectImpl(IAsyncResult ar, ConnectionMultiplexer multiplexer, TextWriter log, Socket socket, PhysicalConnection connection) private async void EndConnectAsync(SocketAwaitable awaitable, ConnectionMultiplexer multiplexer, TextWriter log, Socket socket, PhysicalConnection connection)
{ {
try try
{ {
bool ignoreConnect = false; bool ignoreConnect = false;
ShouldIgnoreConnect(connection, ref ignoreConnect); ShouldIgnoreConnect(connection, ref ignoreConnect);
if (ignoreConnect) return; if (ignoreConnect) return;
socket.EndConnect(ar); await awaitable;
var socketMode = connection == null ? SocketMode.Abort : await connection.ConnectedAsync(socket, log, this).ForAwait(); var socketMode = connection == null ? SocketMode.Abort : await connection.ConnectedAsync(socket, log, this).ForAwait();
switch (socketMode) switch (socketMode)
......
...@@ -14,7 +14,7 @@ ...@@ -14,7 +14,7 @@
<ProjectReference Include="..\StackExchange.Redis\StackExchange.Redis.csproj" /> <ProjectReference Include="..\StackExchange.Redis\StackExchange.Redis.csproj" />
</ItemGroup> </ItemGroup>
<ItemGroup> <ItemGroup>
<PackageReference Include="Pipelines.Sockets.Unofficial" Version="0.2.1-alpha.58" /> <PackageReference Include="Pipelines.Sockets.Unofficial" Version="0.2.1-alpha.62" />
</ItemGroup> </ItemGroup>
</Project> </Project>
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment