Commit 165bdc60 authored by Marc Gravell's avatar Marc Gravell

Moved from Socket.Select to P/Invoke; I will have to revisit this for Mono,...

Moved from Socket.Select to P/Invoke; I will have to revisit this for Mono, but: Socket.Select simply allocates too many damned arrays
parent 66b8773f
...@@ -19,7 +19,7 @@ public void VerifyReceiveConfigChangeBroadcast() ...@@ -19,7 +19,7 @@ public void VerifyReceiveConfigChangeBroadcast()
using (var receiver = Create()) using (var receiver = Create())
{ {
int total = 0; int total = 0;
receiver.ConfigurationChanged += (s, a) => receiver.ConfigurationChangedBroadcast += (s, a) =>
{ {
Console.WriteLine("Config changed: " + (a.EndPoint == null ? "(none)" : a.EndPoint.ToString())); Console.WriteLine("Config changed: " + (a.EndPoint == null ? "(none)" : a.EndPoint.ToString()));
Interlocked.Increment(ref total); Interlocked.Increment(ref total);
...@@ -29,7 +29,7 @@ public void VerifyReceiveConfigChangeBroadcast() ...@@ -29,7 +29,7 @@ public void VerifyReceiveConfigChangeBroadcast()
long count = sender.PublishReconfigure(); long count = sender.PublishReconfigure();
GetServer(receiver).Ping(); GetServer(receiver).Ping();
GetServer(receiver).Ping(); GetServer(receiver).Ping();
Assert.IsTrue(count >= 2); Assert.IsTrue(count >= 2, "subscribers");
Assert.IsTrue(Interlocked.CompareExchange(ref total, 0, 0) >= 1, "total (1st)"); Assert.IsTrue(Interlocked.CompareExchange(ref total, 0, 0) >= 1, "total (1st)");
Interlocked.Exchange(ref total, 0); Interlocked.Exchange(ref total, 0);
......
...@@ -4,6 +4,7 @@ ...@@ -4,6 +4,7 @@
using System.IO; using System.IO;
using System.Net; using System.Net;
using System.Net.Sockets; using System.Net.Sockets;
using System.Runtime.InteropServices;
using System.Threading; using System.Threading;
namespace StackExchange.Redis namespace StackExchange.Redis
...@@ -32,8 +33,18 @@ public SocketManager(string name = null) ...@@ -32,8 +33,18 @@ public SocketManager(string name = null)
public string Name { get { return name; } } public string Name { get { return name; } }
bool isDisposed; bool isDisposed;
private readonly Dictionary<Socket, ISocketCallback> socketLookup = new Dictionary<Socket, ISocketCallback>(); private readonly Dictionary<IntPtr, SocketPair> socketLookup = new Dictionary<IntPtr, SocketPair>();
private readonly List<Socket> readQueue = new List<Socket>(), errorQueue = new List<Socket>();
struct SocketPair
{
public readonly Socket Socket;
public readonly ISocketCallback Callback;
public SocketPair(Socket socket, ISocketCallback callback)
{
this.Socket = socket;
this.Callback = callback;
}
}
/// <summary> /// <summary>
/// Adds a new socket and callback to the manager /// Adds a new socket and callback to the manager
...@@ -46,7 +57,10 @@ private void AddRead(Socket socket, ISocketCallback callback) ...@@ -46,7 +57,10 @@ private void AddRead(Socket socket, ISocketCallback callback)
lock (socketLookup) lock (socketLookup)
{ {
if (isDisposed) throw new ObjectDisposedException(name); if (isDisposed) throw new ObjectDisposedException(name);
socketLookup.Add(socket, callback);
var handle = socket.Handle;
if(handle == IntPtr.Zero) throw new ObjectDisposedException("socket");
socketLookup.Add(handle, new SocketPair(socket, callback));
if (socketLookup.Count == 1) if (socketLookup.Count == 1)
{ {
Monitor.PulseAll(socketLookup); Monitor.PulseAll(socketLookup);
...@@ -86,13 +100,17 @@ private void Read() ...@@ -86,13 +100,17 @@ private void Read()
} }
} }
readonly Queue<IntPtr> readQueue = new Queue<IntPtr>(), errorQueue = new Queue<IntPtr>();
static readonly IntPtr[] EmptyPointers = new IntPtr[0];
private void ReadImpl() private void ReadImpl()
{ {
List<Socket> dead = null; List<IntPtr> dead = null, active = new List<IntPtr>();
IntPtr[] readSockets = EmptyPointers, errorSockets = EmptyPointers;
while (true) while (true)
{ {
readQueue.Clear(); active.Clear();
errorQueue.Clear(); if (dead != null) dead.Clear();
lock (socketLookup) lock (socketLookup)
{ {
if (isDisposed) return; if (isDisposed) return;
...@@ -103,17 +121,17 @@ private void ReadImpl() ...@@ -103,17 +121,17 @@ private void ReadImpl()
Monitor.Wait(socketLookup, TimeSpan.FromSeconds(20)); Monitor.Wait(socketLookup, TimeSpan.FromSeconds(20));
if (socketLookup.Count == 0) return; // nothing new came in, so exit if (socketLookup.Count == 0) return; // nothing new came in, so exit
} }
if (dead != null) dead.Clear();
foreach (var pair in socketLookup) foreach (var pair in socketLookup)
{ {
if (pair.Key.Connected) var socket = pair.Value.Socket;
if(socket.Handle == pair.Key && socket.Connected)
if (pair.Value.Socket.Connected)
{ {
readQueue.Add(pair.Key); active.Add(pair.Key);
errorQueue.Add(pair.Key);
} }
else else
{ {
(dead ?? (dead = new List<Socket>())).Add(pair.Key); (dead ?? (dead = new List<IntPtr>())).Add(pair.Key);
} }
} }
if (dead != null && dead.Count != 0) if (dead != null && dead.Count != 0)
...@@ -121,8 +139,7 @@ private void ReadImpl() ...@@ -121,8 +139,7 @@ private void ReadImpl()
foreach (var socket in dead) socketLookup.Remove(socket); foreach (var socket in dead) socketLookup.Remove(socket);
} }
} }
int pollingSockets = active.Count;
int pollingSockets = readQueue.Count;
if (pollingSockets == 0) if (pollingSockets == 0)
{ {
// nobody had actual sockets; just sleep // nobody had actual sockets; just sleep
...@@ -130,22 +147,53 @@ private void ReadImpl() ...@@ -130,22 +147,53 @@ private void ReadImpl()
continue; continue;
} }
if (readSockets.Length < active.Count + 1)
{
ConnectionMultiplexer.TraceWithoutContext("Resizing socket array for " + active.Count + " sockets");
readSockets = new IntPtr[active.Count + 6]; // leave so space for growth
errorSockets = new IntPtr[active.Count + 6];
}
readSockets[0] = errorSockets[0] = (IntPtr)active.Count;
active.CopyTo(readSockets, 1);
active.CopyTo(errorSockets, 1);
int ready;
try try
{ {
Socket.Select(readQueue, null, errorQueue, 100); var timeout = new TimeValue(100);
ConnectionMultiplexer.TraceWithoutContext(readQueue.Count != 0, "Read sockets: " + readQueue.Count); ready = select(0, readSockets, null, errorSockets, ref timeout);
ConnectionMultiplexer.TraceWithoutContext(errorQueue.Count != 0, "Error sockets: " + errorQueue.Count); if (ready <= 0)
{
continue; // -ve typically means a socket was disposed just before; just retry
}
ConnectionMultiplexer.TraceWithoutContext((int)readSockets[0] != 0, "Read sockets: " + (int)readSockets[0]);
ConnectionMultiplexer.TraceWithoutContext((int)errorSockets[0] != 0, "Error sockets: " + (int)errorSockets[0]);
} }
catch (Exception ex) catch (Exception ex)
{ // this typically means a socket was disposed just before { // this typically means a socket was disposed just before; just retry
Trace.WriteLine(ex.Message); Trace.WriteLine(ex.Message);
continue; continue;
} }
int totalWork = readQueue.Count + errorQueue.Count; int queueCount = (int)readSockets[0];
if (totalWork == 0) continue; lock(readQueue)
{
for (int i = 1; i <= queueCount; i++)
{
readQueue.Enqueue(readSockets[i]);
}
}
queueCount = (int)errorSockets[0];
lock (errorQueue)
{
for (int i = 1; i <= queueCount; i++)
{
errorQueue.Enqueue(errorSockets[i]);
}
}
if (totalWork >= 10) // number of sockets we should attempt to process by ourself before asking for help
if (ready >= 5) // number of sockets we should attempt to process by ourself before asking for help
{ {
// seek help, work in parallel, then synchronize // seek help, work in parallel, then synchronize
lock (QueueDrainSyncLock) lock (QueueDrainSyncLock)
...@@ -163,6 +211,21 @@ private void ReadImpl() ...@@ -163,6 +211,21 @@ private void ReadImpl()
} }
} }
[DllImport("ws2_32.dll", SetLastError = true)]
internal static extern int select([In] int ignoredParameter, [In, Out] IntPtr[] readfds, [In, Out] IntPtr[] writefds, [In, Out] IntPtr[] exceptfds, [In] ref TimeValue timeout);
[StructLayout(LayoutKind.Sequential)]
internal struct TimeValue
{
public int Seconds;
public int Microseconds;
public TimeValue(int microSeconds)
{
Seconds = (int)(microSeconds / 1000000L);
Microseconds = (int)(microSeconds % 1000000L);
}
}
internal void Shutdown(SocketToken token) internal void Shutdown(SocketToken token)
{ {
var socket = token.Socket; var socket = token.Socket;
...@@ -170,7 +233,7 @@ internal void Shutdown(SocketToken token) ...@@ -170,7 +233,7 @@ internal void Shutdown(SocketToken token)
{ {
lock (socketLookup) lock (socketLookup)
{ {
socketLookup.Remove(socket); socketLookup.Remove(socket.Handle);
} }
try { socket.Shutdown(SocketShutdown.Both); } catch { } try { socket.Shutdown(SocketShutdown.Both); } catch { }
try { socket.Close(); } catch { } try { socket.Close(); } catch { }
...@@ -195,26 +258,25 @@ private void ProcessItems() ...@@ -195,26 +258,25 @@ private void ProcessItems()
ProcessItems(socketLookup, errorQueue, CallbackOperation.Error); ProcessItems(socketLookup, errorQueue, CallbackOperation.Error);
} }
private static void ProcessItems(Dictionary<Socket, ISocketCallback> socketLookup, List<Socket> list, CallbackOperation operation) private static void ProcessItems(Dictionary<IntPtr, SocketPair> socketLookup, Queue<IntPtr> queue, CallbackOperation operation)
{ {
if (list == null) return; if (queue == null) return;
while (true) while (true)
{ {
// get the next item (note we could be competing with a worker here, hence lock) // get the next item (note we could be competing with a worker here, hence lock)
Socket socket; IntPtr handle;
lock (list) lock (queue)
{ {
int index = list.Count - 1; if (queue.Count == 0) break;
if (index < 0) break; handle = queue.Dequeue();
socket = list[index];
list.RemoveAt(index); // note: removing from end to avoid moving everything
} }
ISocketCallback callback; SocketPair pair;
lock (socketLookup) lock (socketLookup)
{ {
if (!socketLookup.TryGetValue(socket, out callback)) callback = null; if (!socketLookup.TryGetValue(handle, out pair)) continue;
} }
var callback = pair.Callback;
if (callback != null) if (callback != null)
{ {
#if VERBOSE #if VERBOSE
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment