Commit 0190e1a0 authored by Marc Gravell's avatar Marc Gravell

track what the SocketManager is doing at all times; this could be relevent to timeouts

parent 50547eba
...@@ -1766,9 +1766,11 @@ internal T ExecuteSyncImpl<T>(Message message, ResultProcessor<T> processor, Ser ...@@ -1766,9 +1766,11 @@ internal T ExecuteSyncImpl<T>(Message message, ResultProcessor<T> processor, Ser
else else
{ {
int inst, qu, qs, qc, wr, wq, @in, ar; int inst, qu, qs, qc, wr, wq, @in, ar;
var mgrState = socketManager.State;
int queue = server.GetOutstandingCount(message.Command, out inst, out qu, out qs, out qc, out wr, out wq, out @in, out ar); int queue = server.GetOutstandingCount(message.Command, out inst, out qu, out qs, out qc, out wr, out wq, out @in, out ar);
var sb = new StringBuilder("Timeout performing ").Append(message.CommandAndKey) var sb = new StringBuilder("Timeout performing ").Append(message.CommandAndKey)
.Append(", inst: ").Append(inst) .Append(", inst: ").Append(inst)
.Append(", mgr: ").Append(mgrState)
.Append(", queue: ").Append(queue).Append(", qu=").Append(qu) .Append(", queue: ").Append(queue).Append(", qu=").Append(qu)
.Append(", qs=").Append(qs).Append(", qc=").Append(qc) .Append(", qs=").Append(qs).Append(", qc=").Append(qc)
.Append(", wr=").Append(wr).Append("/").Append(wq) .Append(", wr=").Append(wr).Append("/").Append(wq)
......
...@@ -20,7 +20,7 @@ partial class SocketManager ...@@ -20,7 +20,7 @@ partial class SocketManager
if (qdsl != null && qdsl.Consume()) if (qdsl != null && qdsl.Consume())
{ {
var mgr = qdsl.Manager; var mgr = qdsl.Manager;
mgr.ProcessItems(); mgr.ProcessItems(false);
qdsl.Pulse(); qdsl.Pulse();
} }
}; };
...@@ -94,6 +94,7 @@ private void OnAddRead(Socket socket, ISocketCallback callback) ...@@ -94,6 +94,7 @@ private void OnAddRead(Socket socket, ISocketCallback callback)
} }
} }
} }
partial void OnDispose() partial void OnDispose()
{ {
lock (socketLookup) lock (socketLookup)
...@@ -112,9 +113,11 @@ private void OnAddRead(Socket socket, ISocketCallback callback) ...@@ -112,9 +113,11 @@ private void OnAddRead(Socket socket, ISocketCallback callback)
} }
} }
private void ProcessItems() private void ProcessItems(bool setState)
{ {
if(setState) managerState = ManagerState.ProcessReadQueue;
ProcessItems(socketLookup, readQueue, CallbackOperation.Read); ProcessItems(socketLookup, readQueue, CallbackOperation.Read);
if (setState) managerState = ManagerState.ProcessErrorQueue;
ProcessItems(socketLookup, errorQueue, CallbackOperation.Error); ProcessItems(socketLookup, errorQueue, CallbackOperation.Error);
} }
private void Read() private void Read()
...@@ -123,10 +126,19 @@ private void Read() ...@@ -123,10 +126,19 @@ private void Read()
try try
{ {
weAreReader = Interlocked.CompareExchange(ref readerCount, 1, 0) == 0; weAreReader = Interlocked.CompareExchange(ref readerCount, 1, 0) == 0;
if (weAreReader) ReadImpl(); if (weAreReader)
{
managerState = ManagerState.Preparing;
ReadImpl();
managerState = ManagerState.Inactive;
}
} }
catch (Exception ex) catch (Exception ex)
{ {
if (weAreReader)
{
managerState = ManagerState.Faulted;
}
Debug.WriteLine(ex); Debug.WriteLine(ex);
Trace.WriteLine(ex); Trace.WriteLine(ex);
} }
...@@ -135,6 +147,33 @@ private void Read() ...@@ -135,6 +147,33 @@ private void Read()
if (weAreReader) Interlocked.Exchange(ref readerCount, 0); if (weAreReader) Interlocked.Exchange(ref readerCount, 0);
} }
} }
internal enum ManagerState
{
Inactive,
Preparing,
Faulted,
CheckForHeartbeat,
ExecuteHeartbeat,
LocateActiveSockets,
NoSocketsPause,
PrepareActiveSockets,
CullDeadSockets,
NoActiveSocketsPause,
GrowingSocketArray,
CopyingPointersForSelect,
ExecuteSelect,
EnqueueRead,
EnqueueError,
RequestAssistance,
ProcessQueues,
ProcessReadQueue,
ProcessErrorQueue,
}
internal ManagerState State
{
get { return managerState; }
}
private volatile ManagerState managerState;
private void ReadImpl() private void ReadImpl()
{ {
...@@ -144,6 +183,7 @@ private void ReadImpl() ...@@ -144,6 +183,7 @@ private void ReadImpl()
SocketPair[] allSocketPairs = null; SocketPair[] allSocketPairs = null;
while (true) while (true)
{ {
managerState = ManagerState.CheckForHeartbeat;
active.Clear(); active.Clear();
if (dead != null) dead.Clear(); if (dead != null) dead.Clear();
...@@ -152,6 +192,7 @@ private void ReadImpl() ...@@ -152,6 +192,7 @@ private void ReadImpl()
long now = Environment.TickCount; long now = Environment.TickCount;
if (unchecked(now - lastHeartbeat) >= 15000) if (unchecked(now - lastHeartbeat) >= 15000)
{ {
managerState = ManagerState.ExecuteHeartbeat;
lastHeartbeat = now; lastHeartbeat = now;
lock (socketLookup) lock (socketLookup)
{ {
...@@ -166,7 +207,7 @@ private void ReadImpl() ...@@ -166,7 +207,7 @@ private void ReadImpl()
} }
} }
managerState = ManagerState.LocateActiveSockets;
lock (socketLookup) lock (socketLookup)
{ {
if (isDisposed) return; if (isDisposed) return;
...@@ -174,9 +215,11 @@ private void ReadImpl() ...@@ -174,9 +215,11 @@ private void ReadImpl()
if (socketLookup.Count == 0) if (socketLookup.Count == 0)
{ {
// if empty, give it a few seconds chance before exiting // if empty, give it a few seconds chance before exiting
managerState = ManagerState.NoSocketsPause;
Monitor.Wait(socketLookup, TimeSpan.FromSeconds(20)); Monitor.Wait(socketLookup, TimeSpan.FromSeconds(20));
if (socketLookup.Count == 0) return; // nothing new came in, so exit if (socketLookup.Count == 0) return; // nothing new came in, so exit
} }
managerState = ManagerState.PrepareActiveSockets;
foreach (var pair in socketLookup) foreach (var pair in socketLookup)
{ {
var socket = pair.Value.Socket; var socket = pair.Value.Socket;
...@@ -192,6 +235,7 @@ private void ReadImpl() ...@@ -192,6 +235,7 @@ private void ReadImpl()
} }
if (dead != null && dead.Count != 0) if (dead != null && dead.Count != 0)
{ {
managerState = ManagerState.CullDeadSockets;
foreach (var socket in dead) socketLookup.Remove(socket); foreach (var socket in dead) socketLookup.Remove(socket);
} }
} }
...@@ -199,16 +243,19 @@ private void ReadImpl() ...@@ -199,16 +243,19 @@ private void ReadImpl()
if (pollingSockets == 0) if (pollingSockets == 0)
{ {
// nobody had actual sockets; just sleep // nobody had actual sockets; just sleep
managerState = ManagerState.NoActiveSocketsPause;
Thread.Sleep(10); Thread.Sleep(10);
continue; continue;
} }
if (readSockets.Length < active.Count + 1) if (readSockets.Length < active.Count + 1)
{ {
managerState = ManagerState.GrowingSocketArray;
ConnectionMultiplexer.TraceWithoutContext("Resizing socket array for " + active.Count + " sockets"); ConnectionMultiplexer.TraceWithoutContext("Resizing socket array for " + active.Count + " sockets");
readSockets = new IntPtr[active.Count + 6]; // leave so space for growth readSockets = new IntPtr[active.Count + 6]; // leave so space for growth
errorSockets = new IntPtr[active.Count + 6]; errorSockets = new IntPtr[active.Count + 6];
} }
managerState = ManagerState.CopyingPointersForSelect;
readSockets[0] = errorSockets[0] = (IntPtr)active.Count; readSockets[0] = errorSockets[0] = (IntPtr)active.Count;
active.CopyTo(readSockets, 1); active.CopyTo(readSockets, 1);
active.CopyTo(errorSockets, 1); active.CopyTo(errorSockets, 1);
...@@ -216,6 +263,7 @@ private void ReadImpl() ...@@ -216,6 +263,7 @@ private void ReadImpl()
try try
{ {
var timeout = new TimeValue(1000); var timeout = new TimeValue(1000);
managerState = ManagerState.ExecuteSelect;
ready = select(0, readSockets, null, errorSockets, ref timeout); ready = select(0, readSockets, null, errorSockets, ref timeout);
if (ready <= 0) if (ready <= 0)
{ {
...@@ -232,31 +280,40 @@ private void ReadImpl() ...@@ -232,31 +280,40 @@ private void ReadImpl()
} }
int queueCount = (int)readSockets[0]; int queueCount = (int)readSockets[0];
lock (readQueue) if (queueCount != 0)
{ {
for (int i = 1; i <= queueCount; i++) managerState = ManagerState.EnqueueRead;
lock (readQueue)
{ {
readQueue.Enqueue(readSockets[i]); for (int i = 1; i <= queueCount; i++)
{
readQueue.Enqueue(readSockets[i]);
}
} }
} }
queueCount = (int)errorSockets[0]; queueCount = (int)errorSockets[0];
lock (errorQueue) if (queueCount != 0)
{ {
for (int i = 1; i <= queueCount; i++) managerState = ManagerState.EnqueueError;
lock (errorQueue)
{ {
errorQueue.Enqueue(errorSockets[i]); for (int i = 1; i <= queueCount; i++)
{
errorQueue.Enqueue(errorSockets[i]);
}
} }
} }
if (ready >= 5) // number of sockets we should attempt to process by ourself before asking for help if (ready >= 5) // number of sockets we should attempt to process by ourself before asking for help
{ {
// seek help, work in parallel, then synchronize // seek help, work in parallel, then synchronize
var obj = new QueueDrainSyncLock(this); var obj = new QueueDrainSyncLock(this);
lock (obj) lock (obj)
{ {
managerState = ManagerState.RequestAssistance;
ThreadPool.QueueUserWorkItem(HelpProcessItems, obj); ThreadPool.QueueUserWorkItem(HelpProcessItems, obj);
ProcessItems(); managerState = ManagerState.ProcessQueues;
ProcessItems(true);
if (!obj.Consume()) if (!obj.Consume())
{ // then our worker arrived and picked up work; we need { // then our worker arrived and picked up work; we need
// to let it finish; note that if it *didn't* get that far // to let it finish; note that if it *didn't* get that far
...@@ -268,7 +325,8 @@ private void ReadImpl() ...@@ -268,7 +325,8 @@ private void ReadImpl()
else else
{ {
// just do it ourself // just do it ourself
ProcessItems(); managerState = ManagerState.ProcessQueues;
ProcessItems(true);
} }
} }
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment