Commit 343d1345 authored by Nick Craver's avatar Nick Craver

Cleanup: SocketManager

parent 70c9882f
#if !FEATURE_SOCKET_MODE_POLL #if !FEATURE_SOCKET_MODE_POLL
namespace StackExchange.Redis namespace StackExchange.Redis
{ {
partial class SocketManager public partial class SocketManager
{ {
internal const SocketMode DefaultSocketMode = SocketMode.Async; internal const SocketMode DefaultSocketMode = SocketMode.Async;
...@@ -12,5 +11,4 @@ private void OnAddRead(System.Net.Sockets.Socket socket, ISocketCallback callbac ...@@ -12,5 +11,4 @@ private void OnAddRead(System.Net.Sockets.Socket socket, ISocketCallback callbac
} }
} }
} }
#endif #endif
\ No newline at end of file
...@@ -8,14 +8,13 @@ ...@@ -8,14 +8,13 @@
namespace StackExchange.Redis namespace StackExchange.Redis
{ {
partial class SocketManager public partial class SocketManager
{ {
internal const SocketMode DefaultSocketMode = SocketMode.Poll; internal const SocketMode DefaultSocketMode = SocketMode.Poll;
static readonly IntPtr[] EmptyPointers = new IntPtr[0]; private static readonly IntPtr[] EmptyPointers = new IntPtr[0];
static readonly WaitCallback HelpProcessItems = state => private static readonly WaitCallback HelpProcessItems = state =>
{ {
var qdsl = state as QueueDrainSyncLock; if (state is QueueDrainSyncLock qdsl && qdsl.Consume())
if (qdsl != null && qdsl.Consume())
{ {
var mgr = qdsl.Manager; var mgr = qdsl.Manager;
mgr.ProcessItems(false); mgr.ProcessItems(false);
...@@ -23,9 +22,9 @@ partial class SocketManager ...@@ -23,9 +22,9 @@ partial class SocketManager
} }
}; };
private static ParameterizedThreadStart read = state => ((SocketManager)state).Read(); private static readonly ParameterizedThreadStart read = state => ((SocketManager)state).Read();
readonly Queue<ISocketCallback> readQueue = new Queue<ISocketCallback>(), errorQueue = new Queue<ISocketCallback>(); private readonly Queue<ISocketCallback> readQueue = new Queue<ISocketCallback>(), errorQueue = new Queue<ISocketCallback>();
private readonly Dictionary<IntPtr, SocketPair> socketLookup = new Dictionary<IntPtr, SocketPair>(); private readonly Dictionary<IntPtr, SocketPair> socketLookup = new Dictionary<IntPtr, SocketPair>();
...@@ -36,7 +35,6 @@ partial class SocketManager ...@@ -36,7 +35,6 @@ partial class SocketManager
internal static extern int select([In] int ignoredParameter, [In, Out] IntPtr[] readfds, [In, Out] IntPtr[] writefds, [In, Out] IntPtr[] exceptfds, [In] ref TimeValue timeout); internal static extern int select([In] int ignoredParameter, [In, Out] IntPtr[] readfds, [In, Out] IntPtr[] writefds, [In, Out] IntPtr[] exceptfds, [In] ref TimeValue timeout);
private static void ProcessItems(Queue<ISocketCallback> queue, CallbackOperation operation) private static void ProcessItems(Queue<ISocketCallback> queue, CallbackOperation operation)
{ {
if (queue == null) return; if (queue == null) return;
while (true) while (true)
...@@ -73,7 +71,7 @@ private void OnAddRead(Socket socket, ISocketCallback callback) ...@@ -73,7 +71,7 @@ private void OnAddRead(Socket socket, ISocketCallback callback)
lock (socketLookup) lock (socketLookup)
{ {
if (isDisposed) throw new ObjectDisposedException(name); if (isDisposed) throw new ObjectDisposedException(Name);
var handle = socket.Handle; var handle = socket.Handle;
if (handle == IntPtr.Zero) throw new ObjectDisposedException("socket"); if (handle == IntPtr.Zero) throw new ObjectDisposedException("socket");
...@@ -112,6 +110,7 @@ private void ProcessItems(bool setState) ...@@ -112,6 +110,7 @@ private void ProcessItems(bool setState)
if (setState) managerState = ManagerState.ProcessErrorQueue; if (setState) managerState = ManagerState.ProcessErrorQueue;
ProcessItems(errorQueue, CallbackOperation.Error); ProcessItems(errorQueue, CallbackOperation.Error);
} }
private void Read() private void Read()
{ {
bool weAreReader = false; bool weAreReader = false;
...@@ -149,18 +148,19 @@ internal string LastErrorTimeRelative() ...@@ -149,18 +148,19 @@ internal string LastErrorTimeRelative()
if (tmp == 0) return "never"; if (tmp == 0) return "never";
return unchecked(Environment.TickCount - tmp) + "ms ago"; return unchecked(Environment.TickCount - tmp) + "ms ago";
} }
private ISocketCallback GetCallback(IntPtr key) private ISocketCallback GetCallback(IntPtr key)
{ {
lock(socketLookup) lock(socketLookup)
{ {
SocketPair pair; return socketLookup.TryGetValue(key, out SocketPair pair) ? pair.Callback : null;
return socketLookup.TryGetValue(key, out pair) ? pair.Callback : null;
} }
} }
private void ReadImpl() private void ReadImpl()
{ {
List<IntPtr> dead = null, active = new List<IntPtr>(); List<IntPtr> dead = null, active = new List<IntPtr>();
List<ISocketCallback> activeCallbacks = new List<ISocketCallback>(); var activeCallbacks = new List<ISocketCallback>();
IntPtr[] readSockets = EmptyPointers, errorSockets = EmptyPointers; IntPtr[] readSockets = EmptyPointers, errorSockets = EmptyPointers;
long lastHeartbeat = Environment.TickCount; long lastHeartbeat = Environment.TickCount;
SocketPair[] allSocketPairs = null; SocketPair[] allSocketPairs = null;
...@@ -208,6 +208,7 @@ private void ReadImpl() ...@@ -208,6 +208,7 @@ private void ReadImpl()
{ {
var socket = pair.Value.Socket; var socket = pair.Value.Socket;
if (socket.Handle == pair.Key && socket.Connected) if (socket.Handle == pair.Key && socket.Connected)
{
if (pair.Value.Socket.Connected) if (pair.Value.Socket.Connected)
{ {
active.Add(pair.Key); active.Add(pair.Key);
...@@ -218,6 +219,7 @@ private void ReadImpl() ...@@ -218,6 +219,7 @@ private void ReadImpl()
(dead ?? (dead = new List<IntPtr>())).Add(pair.Key); (dead ?? (dead = new List<IntPtr>())).Add(pair.Key);
} }
} }
}
if (dead != null && dead.Count != 0) if (dead != null && dead.Count != 0)
{ {
managerState = ManagerState.CullDeadSockets; managerState = ManagerState.CullDeadSockets;
...@@ -341,7 +343,6 @@ private void ReadImpl() ...@@ -341,7 +343,6 @@ private void ReadImpl()
} }
} }
if (ready >= 5) // number of sockets we should attempt to process by ourself before asking for help if (ready >= 5) // number of sockets we should attempt to process by ourself before asking for help
{ {
// seek help, work in parallel, then synchronize // seek help, work in parallel, then synchronize
...@@ -374,11 +375,12 @@ private void StartReader() ...@@ -374,11 +375,12 @@ private void StartReader()
var thread = new Thread(read, 32*1024) // don't need a huge stack var thread = new Thread(read, 32*1024) // don't need a huge stack
{ {
Priority = useHighPrioritySocketThreads ? ThreadPriority.AboveNormal : ThreadPriority.Normal, Priority = useHighPrioritySocketThreads ? ThreadPriority.AboveNormal : ThreadPriority.Normal,
Name = name + ":Read", Name = Name + ":Read",
IsBackground = true IsBackground = true
}; };
thread.Start(this); thread.Start(this);
} }
[StructLayout(LayoutKind.Sequential)] [StructLayout(LayoutKind.Sequential)]
internal struct TimeValue internal struct TimeValue
{ {
...@@ -391,7 +393,7 @@ public TimeValue(int microSeconds) ...@@ -391,7 +393,7 @@ public TimeValue(int microSeconds)
} }
} }
struct SocketPair private struct SocketPair
{ {
public readonly ISocketCallback Callback; public readonly ISocketCallback Callback;
public readonly Socket Socket; public readonly Socket Socket;
...@@ -401,13 +403,15 @@ public SocketPair(Socket socket, ISocketCallback callback) ...@@ -401,13 +403,15 @@ public SocketPair(Socket socket, ISocketCallback callback)
Callback = callback; Callback = callback;
} }
} }
sealed class QueueDrainSyncLock
private sealed class QueueDrainSyncLock
{ {
private int workers; private int workers;
public QueueDrainSyncLock(SocketManager manager) public QueueDrainSyncLock(SocketManager manager)
{ {
Manager = manager; Manager = manager;
} }
public SocketManager Manager { get; } public SocketManager Manager { get; }
internal bool Consume() internal bool Consume()
......
...@@ -17,6 +17,7 @@ internal enum SocketMode ...@@ -17,6 +17,7 @@ internal enum SocketMode
Poll, Poll,
Async Async
} }
/// <summary> /// <summary>
/// Allows callbacks from SocketManager as work is discovered /// Allows callbacks from SocketManager as work is discovered
/// </summary> /// </summary>
...@@ -25,7 +26,10 @@ internal partial interface ISocketCallback ...@@ -25,7 +26,10 @@ internal partial interface ISocketCallback
/// <summary> /// <summary>
/// Indicates that a socket has connected /// Indicates that a socket has connected
/// </summary> /// </summary>
/// <param name="stream">The network stream for this socket.</param>
/// <param name="log">A text logger to write to.</param>
SocketMode Connected(Stream stream, TextWriter log); SocketMode Connected(Stream stream, TextWriter log);
/// <summary> /// <summary>
/// Indicates that the socket has signalled an error condition /// Indicates that the socket has signalled an error condition
/// </summary> /// </summary>
...@@ -37,6 +41,7 @@ internal partial interface ISocketCallback ...@@ -37,6 +41,7 @@ internal partial interface ISocketCallback
/// Indicates that data is available on the socket, and that the consumer should read synchronously from the socket while there is data /// Indicates that data is available on the socket, and that the consumer should read synchronously from the socket while there is data
/// </summary> /// </summary>
void Read(); void Read();
/// <summary> /// <summary>
/// Indicates that we cannot know whether data is available, and that the consume should commence reading asynchronously /// Indicates that we cannot know whether data is available, and that the consume should commence reading asynchronously
/// </summary> /// </summary>
...@@ -55,6 +60,7 @@ public SocketToken(Socket socket) ...@@ -55,6 +60,7 @@ public SocketToken(Socket socket)
{ {
Socket = socket; Socket = socket;
} }
public int Available => Socket?.Available ?? 0; public int Available => Socket?.Available ?? 0;
public bool HasValue => Socket != null; public bool HasValue => Socket != null;
...@@ -100,8 +106,8 @@ internal enum ManagerState ...@@ -100,8 +106,8 @@ internal enum ManagerState
ProcessQueues, ProcessQueues,
ProcessReadQueue, ProcessReadQueue,
ProcessErrorQueue, ProcessErrorQueue,
} }
private static readonly ParameterizedThreadStart writeAllQueues = context => private static readonly ParameterizedThreadStart writeAllQueues = context =>
{ {
try { ((SocketManager)context).WriteAllQueues(); } catch { } try { ((SocketManager)context).WriteAllQueues(); } catch { }
...@@ -109,37 +115,41 @@ internal enum ManagerState ...@@ -109,37 +115,41 @@ internal enum ManagerState
private static readonly WaitCallback writeOneQueue = context => private static readonly WaitCallback writeOneQueue = context =>
{ {
try { ((SocketManager)context).WriteOneQueue(); } catch { } try { ((SocketManager)context).WriteOneQueue(); } catch { }
}; };
private readonly string name;
private readonly Queue<PhysicalBridge> writeQueue = new Queue<PhysicalBridge>(); private readonly Queue<PhysicalBridge> writeQueue = new Queue<PhysicalBridge>();
private bool isDisposed;
private readonly bool useHighPrioritySocketThreads = true;
bool isDisposed; /// <summary>
private bool useHighPrioritySocketThreads = true; /// Gets the name of this SocketManager instance
/// </summary>
public string Name { get; }
/// <summary> /// <summary>
/// Creates a new (optionally named) SocketManager instance /// Creates a new (optionally named) <see cref="SocketManager"/> instance
/// </summary> /// </summary>
/// <param name="name">The name for this <see cref="SocketManager"/>.</param>
public SocketManager(string name = null) : this(name, true) { } public SocketManager(string name = null) : this(name, true) { }
/// <summary> /// <summary>
/// Creates a new SocketManager instance /// Creates a new <see cref="SocketManager"/> instance
/// </summary> /// </summary>
/// <param name="name">The name for this <see cref="SocketManager"/>.</param>
/// <param name="useHighPrioritySocketThreads">Whether this <see cref="SocketManager"/> should use high priority sockets.</param>
public SocketManager(string name, bool useHighPrioritySocketThreads) public SocketManager(string name, bool useHighPrioritySocketThreads)
{ {
if (string.IsNullOrWhiteSpace(name)) name = GetType().Name; if (string.IsNullOrWhiteSpace(name)) name = GetType().Name;
this.name = name; Name = name;
this.useHighPrioritySocketThreads = useHighPrioritySocketThreads; this.useHighPrioritySocketThreads = useHighPrioritySocketThreads;
// we need a dedicated writer, because when under heavy ambient load // we need a dedicated writer, because when under heavy ambient load
// (a busy asp.net site, for example), workers are not reliable enough // (a busy asp.net site, for example), workers are not reliable enough
#if NETSTANDARD1_5 #if NETSTANDARD1_5
Thread dedicatedWriter = new Thread(writeAllQueues); var dedicatedWriter = new Thread(writeAllQueues);
#else #else
Thread dedicatedWriter = new Thread(writeAllQueues, 32 * 1024); // don't need a huge stack; var dedicatedWriter = new Thread(writeAllQueues, 32 * 1024); // don't need a huge stack;
dedicatedWriter.Priority = useHighPrioritySocketThreads ? ThreadPriority.AboveNormal : ThreadPriority.Normal; dedicatedWriter.Priority = useHighPrioritySocketThreads ? ThreadPriority.AboveNormal : ThreadPriority.Normal;
#endif #endif
dedicatedWriter.Name = name + ":Write"; dedicatedWriter.Name = name + ":Write";
...@@ -153,11 +163,6 @@ private enum CallbackOperation ...@@ -153,11 +163,6 @@ private enum CallbackOperation
Error Error
} }
/// <summary>
/// Gets the name of this SocketManager instance
/// </summary>
public string Name => name;
/// <summary> /// <summary>
/// Releases all resources associated with this instance /// Releases all resources associated with this instance
/// </summary> /// </summary>
...@@ -185,11 +190,9 @@ internal SocketToken BeginConnect(EndPoint endpoint, ISocketCallback callback, C ...@@ -185,11 +190,9 @@ internal SocketToken BeginConnect(EndPoint endpoint, ISocketCallback callback, C
var formattedEndpoint = Format.ToString(endpoint); var formattedEndpoint = Format.ToString(endpoint);
var tuple = Tuple.Create(socket, callback); var tuple = Tuple.Create(socket, callback);
if (endpoint is DnsEndPoint) if (endpoint is DnsEndPoint dnsEndpoint)
{ {
// A work-around for a Mono bug in BeginConnect(EndPoint endpoint, AsyncCallback callback, object state) // A work-around for a Mono bug in BeginConnect(EndPoint endpoint, AsyncCallback callback, object state)
DnsEndPoint dnsEndpoint = (DnsEndPoint)endpoint;
#if !FEATURE_THREADPOOL #if !FEATURE_THREADPOOL
multiplexer.LogLocked(log, "BeginConnect: {0}", formattedEndpoint); multiplexer.LogLocked(log, "BeginConnect: {0}", formattedEndpoint);
socket.ConnectAsync(dnsEndpoint.Host, dnsEndpoint.Port).ContinueWith(t => socket.ConnectAsync(dnsEndpoint.Host, dnsEndpoint.Port).ContinueWith(t =>
...@@ -247,6 +250,7 @@ internal SocketToken BeginConnect(EndPoint endpoint, ISocketCallback callback, C ...@@ -247,6 +250,7 @@ internal SocketToken BeginConnect(EndPoint endpoint, ISocketCallback callback, C
var token = new SocketToken(socket); var token = new SocketToken(socket);
return token; return token;
} }
internal void SetFastLoopbackOption(Socket socket) internal void SetFastLoopbackOption(Socket socket)
{ {
// SIO_LOOPBACK_FAST_PATH (https://msdn.microsoft.com/en-us/library/windows/desktop/jj841212%28v=vs.85%29.aspx) // SIO_LOOPBACK_FAST_PATH (https://msdn.microsoft.com/en-us/library/windows/desktop/jj841212%28v=vs.85%29.aspx)
...@@ -278,7 +282,7 @@ internal void SetFastLoopbackOption(Socket socket) ...@@ -278,7 +282,7 @@ internal void SetFastLoopbackOption(Socket socket)
{ {
// Win8/Server2012+ only // Win8/Server2012+ only
var osVersion = Environment.OSVersion.Version; var osVersion = Environment.OSVersion.Version;
if (osVersion.Major > 6 || osVersion.Major == 6 && osVersion.Minor >= 2) if (osVersion.Major > 6 || (osVersion.Major == 6 && osVersion.Minor >= 2))
{ {
byte[] optionInValue = BitConverter.GetBytes(1); byte[] optionInValue = BitConverter.GetBytes(1);
socket.IOControl(SIO_LOOPBACK_FAST_PATH, optionInValue, null); socket.IOControl(SIO_LOOPBACK_FAST_PATH, optionInValue, null);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment