Commit 253c719e authored by Marc Gravell's avatar Marc Gravell

simplify build; **just** netstandard2.0; no #if fun

parent c4a5d673
...@@ -21,7 +21,7 @@ ...@@ -21,7 +21,7 @@
<DefaultLanguage>en-US</DefaultLanguage> <DefaultLanguage>en-US</DefaultLanguage>
<IncludeSymbols>false</IncludeSymbols> <IncludeSymbols>false</IncludeSymbols>
<LibraryTargetFrameworks>net46;netstandard2.0</LibraryTargetFrameworks> <LibraryTargetFrameworks>netstandard2.0</LibraryTargetFrameworks>
<CoreFxVersion>4.5.0</CoreFxVersion> <CoreFxVersion>4.5.0</CoreFxVersion>
<xUnitVersion>2.4.0-beta.2.build3981</xUnitVersion> <xUnitVersion>2.4.0-beta.2.build3981</xUnitVersion>
</PropertyGroup> </PropertyGroup>
......
...@@ -10,27 +10,13 @@ ...@@ -10,27 +10,13 @@
<OutputTypeEx>Library</OutputTypeEx> <OutputTypeEx>Library</OutputTypeEx>
<SignAssembly>true</SignAssembly> <SignAssembly>true</SignAssembly>
<PublicSign Condition=" '$(OS)' != 'Windows_NT' ">true</PublicSign> <PublicSign Condition=" '$(OS)' != 'Windows_NT' ">true</PublicSign>
<LangVersion>7.2</LangVersion> <LangVersion>latest</LangVersion>
</PropertyGroup>
<ItemGroup Condition=" '$(TargetFramework)' == 'net45' OR '$(TargetFramework)' == 'net46' ">
<Reference Include="System" />
<Reference Include="Microsoft.CSharp" />
</ItemGroup>
<PropertyGroup Condition=" '$(TargetFramework)' == 'net45' or '$(TargetFramework)' == 'net46'">
<DefineConstants>$(DefineConstants);FEATURE_PERFCOUNTER;</DefineConstants>
</PropertyGroup> </PropertyGroup>
<ItemGroup> <ItemGroup>
<!-- theses intentionally does not track CoreFxVersion -->
<PackageReference Include="System.IO.Compression" Version="4.3.0" />
<PackageReference Include="System.Reflection.Emit.ILGeneration" Version="4.3.0" />
<PackageReference Include="System.Reflection.Emit.Lightweight" Version="4.3.0" /> <PackageReference Include="System.Reflection.Emit.Lightweight" Version="4.3.0" />
<PackageReference Include="System.Memory" Version="$(CoreFxVersion)" />
<PackageReference Include="System.Buffers" Version="$(CoreFxVersion)" />
<PackageReference Include="System.IO.Pipelines" Version="$(CoreFxVersion)" /> <PackageReference Include="System.IO.Pipelines" Version="$(CoreFxVersion)" />
<PackageReference Include="Pipelines.Sockets.Unofficial" Version="0.2.1-alpha.33" /> <PackageReference Include="Pipelines.Sockets.Unofficial" Version="0.2.1-alpha.43" />
<PackageReference Include="System.Diagnostics.PerformanceCounter" Version="$(CoreFxVersion)" />
</ItemGroup> </ItemGroup>
</Project> </Project>
\ No newline at end of file
...@@ -2046,12 +2046,12 @@ void add(string lk, string sk, string v) ...@@ -2046,12 +2046,12 @@ void add(string lk, string sk, string v)
add("ThreadPool-IO-Completion", "IOCP", iocp); add("ThreadPool-IO-Completion", "IOCP", iocp);
add("ThreadPool-Workers", "WORKER", worker); add("ThreadPool-Workers", "WORKER", worker);
data.Add(Tuple.Create("Busy-Workers", busyWorkerCount.ToString())); data.Add(Tuple.Create("Busy-Workers", busyWorkerCount.ToString()));
#if FEATURE_PERFCOUNTER
if (IncludePerformanceCountersInExceptions) if (IncludePerformanceCountersInExceptions)
{ {
add("Local-CPU", "Local-CPU", GetSystemCpuPercent()); add("Local-CPU", "Local-CPU", GetSystemCpuPercent());
} }
#endif
sb.Append(" (Please take a look at this article for some common client-side issues that can cause timeouts: "); sb.Append(" (Please take a look at this article for some common client-side issues that can cause timeouts: ");
sb.Append(timeoutHelpLink); sb.Append(timeoutHelpLink);
sb.Append(")"); sb.Append(")");
...@@ -2085,7 +2085,6 @@ void add(string lk, string sk, string v) ...@@ -2085,7 +2085,6 @@ void add(string lk, string sk, string v)
} }
} }
#if FEATURE_PERFCOUNTER
internal static string GetThreadPoolAndCPUSummary(bool includePerformanceCounters) internal static string GetThreadPoolAndCPUSummary(bool includePerformanceCounters)
{ {
GetThreadPoolStats(out string iocp, out string worker); GetThreadPoolStats(out string iocp, out string worker);
...@@ -2099,7 +2098,6 @@ private static string GetSystemCpuPercent() ...@@ -2099,7 +2098,6 @@ private static string GetSystemCpuPercent()
? Math.Round(systemCPU, 2) + "%" ? Math.Round(systemCPU, 2) + "%"
: "unavailable"; : "unavailable";
} }
#endif
private static int GetThreadPoolStats(out string iocp, out string worker) private static int GetThreadPoolStats(out string iocp, out string worker)
{ {
ThreadPool.GetMaxThreads(out int maxWorkerThreads, out int maxIoThreads); ThreadPool.GetMaxThreads(out int maxWorkerThreads, out int maxIoThreads);
......
using System; using System;
using System.Diagnostics; using System.Diagnostics;
using System.Runtime.InteropServices;
using System.Text; using System.Text;
using System.Threading; using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
...@@ -260,12 +261,11 @@ public static bool EmulateStaleConnection ...@@ -260,12 +261,11 @@ public static bool EmulateStaleConnection
} }
#endif #endif
#if FEATURE_PERFCOUNTER
internal static class PerfCounterHelper internal static class PerfCounterHelper
{ {
private static readonly object staticLock = new object(); private static readonly object staticLock = new object();
private static volatile PerformanceCounter _cpu; private static volatile PerformanceCounter _cpu;
private static volatile bool _disabled; private static volatile bool _disabled = !RuntimeInformation.IsOSPlatform(OSPlatform.Windows);
public static bool TryGetSystemCPU(out float value) public static bool TryGetSystemCPU(out float value)
{ {
...@@ -306,7 +306,6 @@ public static bool TryGetSystemCPU(out float value) ...@@ -306,7 +306,6 @@ public static bool TryGetSystemCPU(out float value)
return false; return false;
} }
} }
#endif
#if VERBOSE #if VERBOSE
......
...@@ -124,12 +124,10 @@ internal static Exception NoConnectionAvailable(bool includeDetail, bool include ...@@ -124,12 +124,10 @@ internal static Exception NoConnectionAvailable(bool includeDetail, bool include
exceptionmessage.Append("; ").Append(innermostExceptionstring); exceptionmessage.Append("; ").Append(innermostExceptionstring);
} }
#if FEATURE_PERFCOUNTER
if (includeDetail) if (includeDetail)
{ {
exceptionmessage.Append("; ").Append(ConnectionMultiplexer.GetThreadPoolAndCPUSummary(includePerformanceCounters)); exceptionmessage.Append("; ").Append(ConnectionMultiplexer.GetThreadPoolAndCPUSummary(includePerformanceCounters));
} }
#endif
var ex = new RedisConnectionException(ConnectionFailureType.UnableToResolvePhysicalConnection, exceptionmessage.ToString(), innerException, message?.Status ?? CommandStatus.Unknown); var ex = new RedisConnectionException(ConnectionFailureType.UnableToResolvePhysicalConnection, exceptionmessage.ToString(), innerException, message?.Status ?? CommandStatus.Unknown);
......
...@@ -15,6 +15,7 @@ ...@@ -15,6 +15,7 @@
using System.Text; using System.Text;
using System.Threading; using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
using Pipelines.Sockets.Unofficial;
namespace StackExchange.Redis namespace StackExchange.Redis
{ {
...@@ -834,43 +835,47 @@ private static LocalCertificateSelectionCallback GetAmbientCertificateCallback() ...@@ -834,43 +835,47 @@ private static LocalCertificateSelectionCallback GetAmbientCertificateCallback()
return null; return null;
} }
async ValueTask<SocketMode> ISocketCallback.ConnectedAsync(IDuplexPipe pipe, TextWriter log) async ValueTask<SocketMode> ISocketCallback.ConnectedAsync(Socket socket, TextWriter log)
{ {
try try
{ {
var socketMode = SocketManager.DefaultSocketMode; var socketMode = SocketMode.Async;
// disallow connection in some cases // disallow connection in some cases
OnDebugAbort(); OnDebugAbort();
// the order is important here: // the order is important here:
// [network]<==[ssl]<==[logging]<==[buffered] // [Socket]<==[NetworkStream]<==[SslStream]<==[
var config = Multiplexer.RawConfig; var config = Multiplexer.RawConfig;
IDuplexPipe pipe;
if (config.Ssl) if (config.Ssl)
{ {
throw new NotImplementedException("TLS");
//Multiplexer.LogLocked(log, "Configuring SSL"); Multiplexer.LogLocked(log, "Configuring SSL");
//var host = config.SslHost; var host = config.SslHost;
//if (string.IsNullOrWhiteSpace(host)) host = Format.ToStringHostOnly(Bridge.ServerEndPoint.EndPoint); if (string.IsNullOrWhiteSpace(host)) host = Format.ToStringHostOnly(Bridge.ServerEndPoint.EndPoint);
//var ssl = new SslStream(stream, false, config.CertificateValidationCallback, var ssl = new SslStream(new NetworkStream(socket), false, config.CertificateValidationCallback,
// config.CertificateSelectionCallback ?? GetAmbientCertificateCallback(), config.CertificateSelectionCallback ?? GetAmbientCertificateCallback(),
// EncryptionPolicy.RequireEncryption); EncryptionPolicy.RequireEncryption);
//try try
//{ {
// ssl.AuthenticateAsClient(host, config.SslProtocols); ssl.AuthenticateAsClient(host, config.SslProtocols);
// Multiplexer.LogLocked(log, $"SSL connection established successfully using protocol: {ssl.SslProtocol}"); Multiplexer.LogLocked(log, $"SSL connection established successfully using protocol: {ssl.SslProtocol}");
//} }
//catch (AuthenticationException authexception) catch (AuthenticationException authexception)
//{ {
// RecordConnectionFailed(ConnectionFailureType.AuthenticationFailure, authexception); RecordConnectionFailed(ConnectionFailureType.AuthenticationFailure, authexception);
// Multiplexer.Trace("Encryption failure"); Multiplexer.Trace("Encryption failure");
// return SocketMode.Abort; return SocketMode.Abort;
//} }
//stream = ssl; pipe = StreamConnector.GetDuplex(ssl, name: Bridge.Name);
//socketMode = SocketMode.Async; }
else
{
pipe = SocketConnection.Create(socket, name: Bridge.Name);
} }
OnWrapForLogging(ref pipe, physicalName); OnWrapForLogging(ref pipe, physicalName);
......
//using System;
//namespace StackExchange.Redis
//{
// internal static class PlatformHelper
// {
// public static bool IsMono { get; } = Type.GetType("Mono.Runtime") != null;
// public static bool IsUnix { get; } = (int)Environment.OSVersion.Platform == 4
// || (int)Environment.OSVersion.Platform == 6
// || (int)Environment.OSVersion.Platform == 128;
// public static SocketMode DefaultSocketMode = IsMono && IsUnix ? SocketMode.Async : SocketMode.Poll;
// }
//}
...@@ -182,9 +182,7 @@ public virtual bool SetResult(PhysicalConnection connection, Message message, Ra ...@@ -182,9 +182,7 @@ public virtual bool SetResult(PhysicalConnection connection, Message message, Ra
unableToConnectError = true; unableToConnectError = true;
err = $"Endpoint {endpoint} serving hashslot {hashSlot} is not reachable at this point of time. Please check connectTimeout value. If it is low, try increasing it to give the ConnectionMultiplexer a chance to recover from the network disconnect. "; err = $"Endpoint {endpoint} serving hashslot {hashSlot} is not reachable at this point of time. Please check connectTimeout value. If it is low, try increasing it to give the ConnectionMultiplexer a chance to recover from the network disconnect. ";
} }
#if FEATURE_PERFCOUNTER
err += ConnectionMultiplexer.GetThreadPoolAndCPUSummary(bridge.Multiplexer.IncludePerformanceCountersInExceptions); err += ConnectionMultiplexer.GetThreadPoolAndCPUSummary(bridge.Multiplexer.IncludePerformanceCountersInExceptions);
#endif
} }
} }
} }
......
#if !FEATURE_SOCKET_MODE_POLL
namespace StackExchange.Redis
{
public partial class SocketManager
{
internal const SocketMode DefaultSocketMode = SocketMode.Async;
private void OnAddRead(System.Net.Sockets.Socket socket, ISocketCallback callback)
{
throw new System.NotSupportedException();
}
}
}
#endif
\ No newline at end of file
#if FEATURE_SOCKET_MODE_POLL
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Net.Sockets;
using System.Runtime.InteropServices;
using System.Threading;
namespace StackExchange.Redis
{
public partial class SocketManager
{
internal static SocketMode DefaultSocketMode = PlatformHelper.DefaultSocketMode;
private static readonly IntPtr[] EmptyPointers = new IntPtr[0];
private static readonly WaitCallback HelpProcessItems = state =>
{
if (state is QueueDrainSyncLock qdsl && qdsl.Consume())
{
var mgr = qdsl.Manager;
mgr.ProcessItems(false);
qdsl.Pulse();
}
};
private static readonly ParameterizedThreadStart read = state => ((SocketManager)state).Read();
private readonly Queue<ISocketCallback> readQueue = new Queue<ISocketCallback>(), errorQueue = new Queue<ISocketCallback>();
private readonly Dictionary<IntPtr, SocketPair> socketLookup = new Dictionary<IntPtr, SocketPair>();
private int readerCount;
[System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Design", "CA1060:MovePInvokesToNativeMethodsClass")]
[DllImport("ws2_32.dll", SetLastError = true)]
internal static extern int select([In] int ignoredParameter, [In, Out] IntPtr[] readfds, [In, Out] IntPtr[] writefds, [In, Out] IntPtr[] exceptfds, [In] ref TimeValue timeout);
private static void ProcessItems(Queue<ISocketCallback> queue, CallbackOperation operation)
{
if (queue == null) return;
while (true)
{
// get the next item (note we could be competing with a worker here, hence lock)
ISocketCallback callback;
lock (queue)
{
if (queue.Count == 0) break;
callback = queue.Dequeue();
}
if (callback != null)
{
try
{
switch (operation)
{
case CallbackOperation.Read: callback.Read(); break;
case CallbackOperation.Error: callback.Error(); break;
}
}
catch (Exception ex)
{
Trace.WriteLine(ex);
}
}
}
}
private void OnAddRead(Socket socket, ISocketCallback callback)
{
if (socket == null) throw new ArgumentNullException(nameof(socket));
if (callback == null) throw new ArgumentNullException(nameof(callback));
lock (socketLookup)
{
if (isDisposed) throw new ObjectDisposedException(Name);
var handle = socket.Handle;
if (handle == IntPtr.Zero) throw new ObjectDisposedException("socket");
socketLookup.Add(handle, new SocketPair(socket, callback));
if (socketLookup.Count == 1)
{
Monitor.PulseAll(socketLookup);
if (Interlocked.CompareExchange(ref readerCount, 0, 0) == 0)
StartReader();
}
}
}
partial void OnDispose()
{
lock (socketLookup)
{
isDisposed = true;
socketLookup.Clear();
Monitor.PulseAll(socketLookup);
}
}
partial void OnShutdown(Socket socket)
{
lock (socketLookup)
{
socketLookup.Remove(socket.Handle);
}
}
private void ProcessItems(bool setState)
{
if(setState) managerState = ManagerState.ProcessReadQueue;
ProcessItems(readQueue, CallbackOperation.Read);
if (setState) managerState = ManagerState.ProcessErrorQueue;
ProcessItems(errorQueue, CallbackOperation.Error);
}
private void Read()
{
bool weAreReader = false;
try
{
weAreReader = Interlocked.CompareExchange(ref readerCount, 1, 0) == 0;
if (weAreReader)
{
managerState = ManagerState.Preparing;
ReadImpl();
managerState = ManagerState.Inactive;
}
}
catch (Exception ex)
{
if (weAreReader)
{
managerState = ManagerState.Faulted;
}
Debug.WriteLine(ex);
Trace.WriteLine(ex);
}
finally
{
if (weAreReader) Interlocked.Exchange(ref readerCount, 0);
}
}
internal ManagerState State => managerState;
private volatile ManagerState managerState;
private volatile int lastErrorTicks;
internal string LastErrorTimeRelative()
{
var tmp = lastErrorTicks;
if (tmp == 0) return "never";
return unchecked(Environment.TickCount - tmp) + "ms ago";
}
private ISocketCallback GetCallback(IntPtr key)
{
lock(socketLookup)
{
return socketLookup.TryGetValue(key, out SocketPair pair) ? pair.Callback : null;
}
}
private void ReadImpl()
{
List<IntPtr> dead = null, active = new List<IntPtr>();
var activeCallbacks = new List<ISocketCallback>();
IntPtr[] readSockets = EmptyPointers, errorSockets = EmptyPointers;
long lastHeartbeat = Environment.TickCount;
SocketPair[] allSocketPairs = null;
while (true)
{
managerState = ManagerState.CheckForHeartbeat;
active.Clear();
activeCallbacks.Clear();
dead?.Clear();
// this check is actually a pace-maker; sometimes the Timer callback stalls for
// extended periods of time, which can cause socket disconnect
long now = Environment.TickCount;
if (unchecked(now - lastHeartbeat) >= 15000)
{
managerState = ManagerState.ExecuteHeartbeat;
lastHeartbeat = now;
lock (socketLookup)
{
if (allSocketPairs == null || allSocketPairs.Length != socketLookup.Count)
allSocketPairs = new SocketPair[socketLookup.Count];
socketLookup.Values.CopyTo(allSocketPairs, 0);
}
foreach (var pair in allSocketPairs)
{
var callback = pair.Callback;
if (callback != null) try { callback.OnHeartbeat(); } catch { }
}
}
managerState = ManagerState.LocateActiveSockets;
lock (socketLookup)
{
if (isDisposed) return;
if (socketLookup.Count == 0)
{
// if empty, give it a few seconds chance before exiting
managerState = ManagerState.NoSocketsPause;
Monitor.Wait(socketLookup, TimeSpan.FromSeconds(20));
if (socketLookup.Count == 0) return; // nothing new came in, so exit
}
managerState = ManagerState.PrepareActiveSockets;
foreach (var pair in socketLookup)
{
var socket = pair.Value.Socket;
if (socket.Handle == pair.Key && socket.Connected)
{
if (pair.Value.Socket.Connected)
{
active.Add(pair.Key);
activeCallbacks.Add(pair.Value.Callback);
}
else
{
(dead ?? (dead = new List<IntPtr>())).Add(pair.Key);
}
}
}
if (dead != null && dead.Count != 0)
{
managerState = ManagerState.CullDeadSockets;
foreach (var socket in dead) socketLookup.Remove(socket);
}
}
int pollingSockets = active.Count;
if (pollingSockets == 0)
{
// nobody had actual sockets; just sleep
managerState = ManagerState.NoActiveSocketsPause;
Thread.Sleep(10);
continue;
}
if (readSockets.Length < active.Count + 1)
{
managerState = ManagerState.GrowingSocketArray;
ConnectionMultiplexer.TraceWithoutContext("Resizing socket array for " + active.Count + " sockets");
readSockets = new IntPtr[active.Count + 6]; // leave so space for growth
errorSockets = new IntPtr[active.Count + 6];
}
managerState = ManagerState.CopyingPointersForSelect;
readSockets[0] = errorSockets[0] = (IntPtr)active.Count;
active.CopyTo(readSockets, 1);
active.CopyTo(errorSockets, 1);
int ready;
try
{
var timeout = new TimeValue(1000);
managerState = ManagerState.ExecuteSelect;
ready = select(0, readSockets, null, errorSockets, ref timeout);
managerState = ManagerState.ExecuteSelectComplete;
if (ready <= 0) // -ve typically means a socket was disposed just before; just retry
{
bool hasWorkToDo = false;
if (ready == 0)
{
managerState = ManagerState.CheckForStaleConnections;
foreach (var s in activeCallbacks)
{
if (s.IsDataAvailable)
{
hasWorkToDo = true;
}
else
{
#pragma warning disable 0420
s.CheckForStaleConnection(ref managerState);
#pragma warning restore 0420
}
}
managerState = ManagerState.CheckForStaleConnectionsDone;
}
else
{
lastErrorTicks = Environment.TickCount;
}
if (!hasWorkToDo)
{
continue;
}
}
ConnectionMultiplexer.TraceWithoutContext((int)readSockets[0] != 0, "Read sockets: " + (int)readSockets[0]);
ConnectionMultiplexer.TraceWithoutContext((int)errorSockets[0] != 0, "Error sockets: " + (int)errorSockets[0]);
}
catch (Exception ex)
{ // this typically means a socket was disposed just before; just retry
Trace.WriteLine(ex.Message);
continue;
}
bool haveWork = false;
int queueCount = (int)readSockets[0];
if (queueCount != 0)
{
managerState = ManagerState.EnqueueRead;
lock (readQueue)
{
for (int i = 1; i <= queueCount; i++)
{
var callback = GetCallback(readSockets[i]);
if (callback != null)
{
readQueue.Enqueue(callback);
haveWork = true;
}
}
}
}
queueCount = (int)errorSockets[0];
if (queueCount != 0)
{
managerState = ManagerState.EnqueueError;
lock (errorQueue)
{
for (int i = 1; i <= queueCount; i++)
{
var callback = GetCallback(errorSockets[i]);
if (callback != null)
{
errorQueue.Enqueue(callback);
haveWork = true;
}
}
}
}
if(!haveWork)
{
// edge case: select is returning 0, but data could still be available
managerState = ManagerState.EnqueueReadFallback;
lock (readQueue)
{
foreach (var callback in activeCallbacks)
{
if(callback.IsDataAvailable)
{
readQueue.Enqueue(callback);
}
}
}
}
if (ready >= 5) // number of sockets we should attempt to process by ourself before asking for help
{
// seek help, work in parallel, then synchronize
var obj = new QueueDrainSyncLock(this);
lock (obj)
{
managerState = ManagerState.RequestAssistance;
ThreadPool.QueueUserWorkItem(HelpProcessItems, obj);
managerState = ManagerState.ProcessQueues;
ProcessItems(true);
if (!obj.Consume())
{ // then our worker arrived and picked up work; we need
// to let it finish; note that if it *didn't* get that far
// yet, the Consume() call will mean that it never tries
Monitor.Wait(obj);
}
}
}
else
{
// just do it ourself
managerState = ManagerState.ProcessQueues;
ProcessItems(true);
}
}
}
private void StartReader()
{
var thread = new Thread(read, 32*1024) // don't need a huge stack
{
Priority = useHighPrioritySocketThreads ? ThreadPriority.AboveNormal : ThreadPriority.Normal,
Name = Name + ":Read",
IsBackground = true
};
thread.Start(this);
}
[StructLayout(LayoutKind.Sequential)]
internal struct TimeValue
{
public int Seconds;
public int Microseconds;
public TimeValue(int microSeconds)
{
Seconds = (int)(microSeconds / 1000000L);
Microseconds = (int)(microSeconds % 1000000L);
}
}
private struct SocketPair
{
public readonly ISocketCallback Callback;
public readonly Socket Socket;
public SocketPair(Socket socket, ISocketCallback callback)
{
Socket = socket;
Callback = callback;
}
}
private sealed class QueueDrainSyncLock
{
private int workers;
public QueueDrainSyncLock(SocketManager manager)
{
Manager = manager;
}
public SocketManager Manager { get; }
internal bool Consume()
{
return Interlocked.CompareExchange(ref workers, 1, 0) == 0;
}
internal void Pulse()
{
lock (this)
{
Monitor.PulseAll(this);
}
}
}
}
}
#endif
...@@ -26,9 +26,9 @@ internal partial interface ISocketCallback ...@@ -26,9 +26,9 @@ internal partial interface ISocketCallback
/// <summary> /// <summary>
/// Indicates that a socket has connected /// Indicates that a socket has connected
/// </summary> /// </summary>
/// <param name="pipe">The network stream for this socket.</param> /// <param name="socket">The socket.</param>
/// <param name="log">A text logger to write to.</param> /// <param name="log">A text logger to write to.</param>
ValueTask<SocketMode> ConnectedAsync(IDuplexPipe pipe, TextWriter log); ValueTask<SocketMode> ConnectedAsync(Socket socket, TextWriter log);
/// <summary> /// <summary>
/// Indicates that the socket has signalled an error condition /// Indicates that the socket has signalled an error condition
...@@ -173,17 +173,57 @@ public void Dispose() ...@@ -173,17 +173,57 @@ public void Dispose()
} }
internal SocketToken BeginConnect(EndPoint endpoint, ISocketCallback callback, ConnectionMultiplexer multiplexer, TextWriter log) internal SocketToken BeginConnect(EndPoint endpoint, ISocketCallback callback, ConnectionMultiplexer multiplexer, TextWriter log)
{ {
void RunWithCompletionType(Func<AsyncCallback, IAsyncResult> beginAsync, AsyncCallback asyncCallback)
{
void proxyCallback(IAsyncResult ar)
{
if (!ar.CompletedSynchronously)
{
asyncCallback(ar);
}
}
var result = beginAsync(proxyCallback);
if (result.CompletedSynchronously)
{
result.AsyncWaitHandle.WaitOne();
asyncCallback(result);
}
}
var addressFamily = endpoint.AddressFamily == AddressFamily.Unspecified ? AddressFamily.InterNetwork : endpoint.AddressFamily; var addressFamily = endpoint.AddressFamily == AddressFamily.Unspecified ? AddressFamily.InterNetwork : endpoint.AddressFamily;
var socket = new Socket(addressFamily, SocketType.Stream, ProtocolType.Tcp); var socket = new Socket(addressFamily, SocketType.Stream, ProtocolType.Tcp);
SocketConnection.SetRecommendedClientOptions(socket); SocketConnection.SetRecommendedClientOptions(socket);
if (addressFamily == AddressFamily.Unix) socket.NoDelay = false;
try try
{ {
var formattedEndpoint = Format.ToString(endpoint); var formattedEndpoint = Format.ToString(endpoint);
var t = SocketConnection.ConnectAsync(endpoint, _pipeOptions, var tuple = Tuple.Create(socket, callback);
onConnected: conn => EndConnectAsync(conn, multiplexer, log, callback), multiplexer.LogLocked(log, "BeginConnect: {0}", formattedEndpoint);
socket: socket); // A work-around for a Mono bug in BeginConnect(EndPoint endpoint, AsyncCallback callback, object state)
GC.KeepAlive(t); // make compiler happier if (endpoint is DnsEndPoint dnsEndpoint)
{
RunWithCompletionType(
cb => socket.BeginConnect(dnsEndpoint.Host, dnsEndpoint.Port, cb, tuple),
ar => {
multiplexer.LogLocked(log, "EndConnect: {0}", formattedEndpoint);
EndConnectImpl(ar, multiplexer, log, tuple);
multiplexer.LogLocked(log, "Connect complete: {0}", formattedEndpoint);
});
}
else
{
RunWithCompletionType(
cb => socket.BeginConnect(endpoint, cb, tuple),
ar => {
multiplexer.LogLocked(log, "EndConnect: {0}", formattedEndpoint);
EndConnectImpl(ar, multiplexer, log, tuple);
multiplexer.LogLocked(log, "Connect complete: {0}", formattedEndpoint);
});
}
} }
catch (NotImplementedException ex) catch (NotImplementedException ex)
{ {
...@@ -223,16 +263,18 @@ internal void Shutdown(SocketToken token) ...@@ -223,16 +263,18 @@ internal void Shutdown(SocketToken token)
Shutdown(token.Socket); Shutdown(token.Socket);
} }
private async Task EndConnectAsync(SocketConnection connection, ConnectionMultiplexer multiplexer, TextWriter log, ISocketCallback callback) private async void EndConnectImpl(IAsyncResult ar, ConnectionMultiplexer multiplexer, TextWriter log, Tuple<Socket, ISocketCallback> tuple)
{ {
var socket = tuple.Item1;
var callback = tuple.Item2;
try try
{ {
bool ignoreConnect = false; bool ignoreConnect = false;
var socket = connection?.Socket; ShouldIgnoreConnect(tuple.Item2, ref ignoreConnect);
ShouldIgnoreConnect(callback, ref ignoreConnect);
if (ignoreConnect) return; if (ignoreConnect) return;
socket.EndConnect(ar);
var socketMode = callback == null ? SocketMode.Abort : await callback.ConnectedAsync(connection, log); var socketMode = callback == null ? SocketMode.Abort : await callback.ConnectedAsync(socket, log);
switch (socketMode) switch (socketMode)
{ {
case SocketMode.Async: case SocketMode.Async:
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment