Commit ce3785ec authored by Marc Gravell's avatar Marc Gravell

Check for available data even if socket-poll is failing to report data

parent aa4e6f69
......@@ -931,6 +931,15 @@ void ISocketCallback.Read()
Interlocked.Decrement(ref haveReader);
}
}
bool ISocketCallback.IsDataAvailable
{
get
{
try { return socketToken.Available > 0; }
catch { return false; }
}
}
private RawResult ReadArray(byte[] buffer, ref int offset, ref int count)
{
var itemCount = ReadLineTerminatedString(ResultType.Integer, buffer, ref offset, ref count);
......
......@@ -27,7 +27,7 @@ partial class SocketManager
private static ParameterizedThreadStart read = state => ((SocketManager)state).Read();
readonly Queue<IntPtr> readQueue = new Queue<IntPtr>(), errorQueue = new Queue<IntPtr>();
readonly Queue<ISocketCallback> readQueue = new Queue<ISocketCallback>(), errorQueue = new Queue<ISocketCallback>();
private readonly Dictionary<IntPtr, SocketPair> socketLookup = new Dictionary<IntPtr, SocketPair>();
......@@ -37,25 +37,19 @@ partial class SocketManager
[DllImport("ws2_32.dll", SetLastError = true)]
internal static extern int select([In] int ignoredParameter, [In, Out] IntPtr[] readfds, [In, Out] IntPtr[] writefds, [In, Out] IntPtr[] exceptfds, [In] ref TimeValue timeout);
private static void ProcessItems(Dictionary<IntPtr, SocketPair> socketLookup, Queue<IntPtr> queue, CallbackOperation operation)
private static void ProcessItems(Queue<ISocketCallback> queue, CallbackOperation operation)
{
if (queue == null) return;
while (true)
{
// get the next item (note we could be competing with a worker here, hence lock)
IntPtr handle;
ISocketCallback callback;
lock (queue)
{
if (queue.Count == 0) break;
handle = queue.Dequeue();
callback = queue.Dequeue();
}
SocketPair pair;
lock (socketLookup)
{
if (!socketLookup.TryGetValue(handle, out pair)) continue;
}
var callback = pair.Callback;
if (callback != null)
{
try
......@@ -116,9 +110,9 @@ private void OnAddRead(Socket socket, ISocketCallback callback)
private void ProcessItems(bool setState)
{
if(setState) managerState = ManagerState.ProcessReadQueue;
ProcessItems(socketLookup, readQueue, CallbackOperation.Read);
ProcessItems(readQueue, CallbackOperation.Read);
if (setState) managerState = ManagerState.ProcessErrorQueue;
ProcessItems(socketLookup, errorQueue, CallbackOperation.Error);
ProcessItems(errorQueue, CallbackOperation.Error);
}
private void Read()
{
......@@ -166,6 +160,7 @@ internal enum ManagerState
CheckForStaleConnections,
EnqueueRead,
EnqueueError,
EnqueueReadFallback,
RequestAssistance,
ProcessQueues,
ProcessReadQueue,
......@@ -183,6 +178,14 @@ internal string LastErrorTimeRelative()
if (tmp == 0) return "never";
return unchecked(Environment.TickCount - tmp) + "ms ago";
}
private ISocketCallback GetCallback(IntPtr key)
{
lock(socketLookup)
{
SocketPair pair;
return socketLookup.TryGetValue(key, out pair) ? pair.Callback : null;
}
}
private void ReadImpl()
{
List<IntPtr> dead = null, active = new List<IntPtr>();
......@@ -277,23 +280,33 @@ private void ReadImpl()
managerState = ManagerState.ExecuteSelect;
ready = select(0, readSockets, null, errorSockets, ref timeout);
managerState = ManagerState.ExecuteSelectComplete;
if (ready <= 0)
if (ready <= 0) // -ve typically means a socket was disposed just before; just retry
{
bool hasWorkToDo = false;
if (ready == 0)
{
managerState = ManagerState.CheckForStaleConnections;
foreach (var s in activeCallbacks)
{
s.CheckForStaleConnection();
if (s.IsDataAvailable)
{
hasWorkToDo = true;
}
else
{
s.CheckForStaleConnection();
}
}
}
else
{
lastErrorTicks = Environment.TickCount;
}
continue; // -ve typically means a socket was disposed just before; just retry
if (!hasWorkToDo)
{
continue;
}
}
ConnectionMultiplexer.TraceWithoutContext((int)readSockets[0] != 0, "Read sockets: " + (int)readSockets[0]);
ConnectionMultiplexer.TraceWithoutContext((int)errorSockets[0] != 0, "Error sockets: " + (int)errorSockets[0]);
}
......@@ -303,6 +316,7 @@ private void ReadImpl()
continue;
}
bool haveWork = false;
int queueCount = (int)readSockets[0];
if (queueCount != 0)
{
......@@ -311,7 +325,12 @@ private void ReadImpl()
{
for (int i = 1; i <= queueCount; i++)
{
readQueue.Enqueue(readSockets[i]);
var callback = GetCallback(readSockets[i]);
if (callback != null)
{
readQueue.Enqueue(callback);
haveWork = true;
}
}
}
}
......@@ -323,10 +342,31 @@ private void ReadImpl()
{
for (int i = 1; i <= queueCount; i++)
{
errorQueue.Enqueue(errorSockets[i]);
var callback = GetCallback(errorSockets[i]);
if (callback != null)
{
errorQueue.Enqueue(callback);
haveWork = true;
}
}
}
}
if(!haveWork)
{
// edge case: select is returning 0, but data could still be available
managerState = ManagerState.EnqueueReadFallback;
lock (readQueue)
{
foreach (var callback in activeCallbacks)
{
if(callback.IsDataAvailable)
{
readQueue.Enqueue(callback);
}
}
}
}
if (ready >= 5) // number of sockets we should attempt to process by ourself before asking for help
{
......
......@@ -41,6 +41,8 @@ internal partial interface ISocketCallback
// check for write-read timeout
void CheckForStaleConnection();
bool IsDataAvailable { get; }
}
internal struct SocketToken
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment