Commit 91415c0e authored by Marc Gravell's avatar Marc Gravell

Move writer queue into the SocketManager

parent 187094f9
@..\packages\Redis-64.2.6.12.1\tools\redis-server.exe master.conf
@..\packages\Redis-64.2.6.12.1\tools\redis-server.exe slave.conf
@..\packages\Redis-64.2.6.12.1\tools\redis-server.exe secure.conf
\ No newline at end of file
......@@ -39,6 +39,7 @@ public void ShutdownRaisesConnectionFailedAndRestore()
Assert.AreEqual(0, Interlocked.CompareExchange(ref failed, 0, 0));
Assert.AreEqual(0, Interlocked.CompareExchange(ref restored, 0, 0));
#if DEBUG
conn.AllowConnect = false;
var server = (IRedisServerDebug)conn.GetServer(PrimaryServer, PrimaryPort);
......@@ -54,6 +55,7 @@ public void ShutdownRaisesConnectionFailedAndRestore()
Thread.Sleep(1500);
Assert.AreEqual(2, Interlocked.CompareExchange(ref failed, 0, 0), "failed");
Assert.AreEqual(2, Interlocked.CompareExchange(ref restored, 0, 0), "restored");
#endif
watch.Stop();
}
......
......@@ -24,6 +24,7 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Redis Configs", "Redis Conf
Redis Configs\redis-cli master.cmd = Redis Configs\redis-cli master.cmd
Redis Configs\redis-cli secure.cmd = Redis Configs\redis-cli secure.cmd
Redis Configs\redis-cli slave.cmd = Redis Configs\redis-cli slave.cmd
Redis Configs\redis-server alll local.cmd = Redis Configs\redis-server alll local.cmd
Redis Configs\redis-server master.cmd = Redis Configs\redis-server master.cmd
Redis Configs\redis-server secure.cmd = Redis Configs\redis-server secure.cmd
Redis Configs\redis-server slave.cmd = Redis Configs\redis-server slave.cmd
......
......@@ -14,139 +14,26 @@ partial class ConnectionMultiplexer
{
this.ownsSocketManager = configuration.SocketManager == null;
this.socketManager = configuration.SocketManager ?? new SocketManager(configuration.ClientName);
// we need a dedicated writer, because when under heavy ambient load
// (a busy asp.net site, for example), workers are not reliable enough
Thread dedicatedWriter = new Thread(writeAllQueues);
dedicatedWriter.Name = socketManager.Name + ":Write";
dedicatedWriter.IsBackground = true; // should not keep process alive
dedicatedWriter.Start(this); // will self-exit when disposed
}
partial void OnCloseReaderWriter()
{
lock (writeQueue)
{ // make sure writer threads know to exit
Monitor.PulseAll(writeQueue);
}
if (ownsSocketManager) socketManager.Dispose();
socketManager = null;
}
private readonly Queue<PhysicalBridge> writeQueue = new Queue<PhysicalBridge>();
internal void RequestWrite(PhysicalBridge bridge, bool forced)
{
if (bridge == null) return;
Trace("Requesting write: " + bridge.Name);
if (Interlocked.CompareExchange(ref bridge.inWriteQueue, 1, 0) == 0 || forced)
var tmp = SocketManager;
if (tmp != null)
{
lock (writeQueue)
{
writeQueue.Enqueue(bridge);
if (writeQueue.Count == 1)
{
Monitor.PulseAll(writeQueue);
}
else if (writeQueue.Count >= 2)
{ // struggling are we? let's have some help dealing with the backlog
ThreadPool.QueueUserWorkItem(writeOneQueue, this);
}
}
Trace("Requesting write: " + bridge.Name);
tmp.RequestWrite(bridge, forced);
}
}
partial void OnWriterCreated();
private static readonly ParameterizedThreadStart writeAllQueues = context =>
{
try { ((ConnectionMultiplexer)context).WriteAllQueues(); } catch { }
};
private static readonly WaitCallback writeOneQueue = context =>
{
try { ((ConnectionMultiplexer)context).WriteOneQueue(); } catch { }
};
private void WriteAllQueues()
{
OnWriterCreated();
while (true)
{
PhysicalBridge bridge;
lock (writeQueue)
{
if (writeQueue.Count == 0)
{
if (isDisposed) break; // <========= exit point
Monitor.Wait(writeQueue, 500);
continue;
}
bridge = writeQueue.Dequeue();
}
switch (bridge.WriteQueue(200))
{
case WriteResult.MoreWork:
// back of the line!
lock (writeQueue)
{
writeQueue.Enqueue(bridge);
}
break;
case WriteResult.CompetingWriter:
break;
case WriteResult.NoConnection:
Interlocked.Exchange(ref bridge.inWriteQueue, 0);
break;
case WriteResult.QueueEmpty:
if (!bridge.ConfirmRemoveFromWriteQueue())
{ // more snuck in; back of the line!
lock (writeQueue)
{
writeQueue.Enqueue(bridge);
}
}
break;
}
}
}
private void WriteOneQueue()
{
OnWriterCreated();
PhysicalBridge bridge;
lock (writeQueue)
{
bridge = writeQueue.Count == 0 ? null : writeQueue.Dequeue();
}
if (bridge == null) return;
bool keepGoing;
do
{
switch (bridge.WriteQueue(-1))
{
case WriteResult.MoreWork:
keepGoing = true;
break;
case WriteResult.QueueEmpty:
keepGoing = !bridge.ConfirmRemoveFromWriteQueue();
break;
case WriteResult.CompetingWriter:
keepGoing = false;
break;
case WriteResult.NoConnection:
Interlocked.Exchange(ref bridge.inWriteQueue, 0);
keepGoing = false;
break;
default:
keepGoing = false;
break;
}
} while (keepGoing);
}
//private void Read()
//{
// throw new NotImplementedException();
//}
}
}
......@@ -187,16 +187,6 @@ public static long GetAsyncCompletionWorkerCount()
/// </summary>
public bool AllowConnect { get { return allowConnect; } set { allowConnect = value; } }
private volatile bool allowConnect = true;
static long writerCount;
partial void OnWriterCreated()
{
Interlocked.Increment(ref writerCount);
}
internal static long GetWriterCount()
{
return Interlocked.Read(ref writerCount);
}
}
partial class MessageQueue
......
......@@ -25,6 +25,14 @@ public SocketManager(string name = null)
{
if (string.IsNullOrWhiteSpace(name)) name = GetType().Name;
this.name = name;
// we need a dedicated writer, because when under heavy ambient load
// (a busy asp.net site, for example), workers are not reliable enough
Thread dedicatedWriter = new Thread(writeAllQueues, 32 * 1024); // don't need a huge stack;
dedicatedWriter.Priority = ThreadPriority.AboveNormal; // time critical
dedicatedWriter.Name = name + ":Write";
dedicatedWriter.IsBackground = true; // should not keep process alive
dedicatedWriter.Start(this); // will self-exit when disposed
}
/// <summary>
......@@ -71,6 +79,25 @@ private void AddRead(Socket socket, ISocketCallback callback)
}
}
internal void RequestWrite(PhysicalBridge bridge, bool forced)
{
if (Interlocked.CompareExchange(ref bridge.inWriteQueue, 1, 0) == 0 || forced)
{
lock (writeQueue)
{
writeQueue.Enqueue(bridge);
if (writeQueue.Count == 1)
{
Monitor.PulseAll(writeQueue);
}
else if (writeQueue.Count >= 2)
{ // struggling are we? let's have some help dealing with the backlog
ThreadPool.QueueUserWorkItem(writeOneQueue, this);
}
}
}
}
private void StartReader()
{
var thread = new Thread(read, 32 * 1024); // don't need a huge stack
......@@ -315,6 +342,10 @@ private enum CallbackOperation
/// </summary>
public void Dispose()
{
lock (writeQueue)
{ // make sure writer threads know to exit
Monitor.PulseAll(writeQueue);
}
lock (socketLookup)
{
isDisposed = true;
......@@ -323,6 +354,93 @@ public void Dispose()
}
}
private readonly Queue<PhysicalBridge> writeQueue = new Queue<PhysicalBridge>();
private static readonly ParameterizedThreadStart writeAllQueues = context =>
{
try { ((SocketManager)context).WriteAllQueues(); } catch { }
};
private static readonly WaitCallback writeOneQueue = context =>
{
try { ((SocketManager)context).WriteOneQueue(); } catch { }
};
private void WriteAllQueues()
{
while (true)
{
PhysicalBridge bridge;
lock (writeQueue)
{
if (writeQueue.Count == 0)
{
if (isDisposed) break; // <========= exit point
Monitor.Wait(writeQueue, 500);
continue;
}
bridge = writeQueue.Dequeue();
}
switch (bridge.WriteQueue(200))
{
case WriteResult.MoreWork:
// back of the line!
lock (writeQueue)
{
writeQueue.Enqueue(bridge);
}
break;
case WriteResult.CompetingWriter:
break;
case WriteResult.NoConnection:
Interlocked.Exchange(ref bridge.inWriteQueue, 0);
break;
case WriteResult.QueueEmpty:
if (!bridge.ConfirmRemoveFromWriteQueue())
{ // more snuck in; back of the line!
lock (writeQueue)
{
writeQueue.Enqueue(bridge);
}
}
break;
}
}
}
private void WriteOneQueue()
{
PhysicalBridge bridge;
lock (writeQueue)
{
bridge = writeQueue.Count == 0 ? null : writeQueue.Dequeue();
}
if (bridge == null) return;
bool keepGoing;
do
{
switch (bridge.WriteQueue(-1))
{
case WriteResult.MoreWork:
keepGoing = true;
break;
case WriteResult.QueueEmpty:
keepGoing = !bridge.ConfirmRemoveFromWriteQueue();
break;
case WriteResult.CompetingWriter:
keepGoing = false;
break;
case WriteResult.NoConnection:
Interlocked.Exchange(ref bridge.inWriteQueue, 0);
keepGoing = false;
break;
default:
keepGoing = false;
break;
}
} while (keepGoing);
}
internal SocketToken BeginConnect(EndPoint endpoint, ISocketCallback callback)
{
var socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment