Commit 67cbc1a3 authored by Marc Gravell's avatar Marc Gravell

Only flush the output on a completely no-op call to WriteQueue; if work is...

Only flush the output on a completely no-op call to WriteQueue; if work is done, but it back in in the work-queue and retry; this adds a tiny tiny delay, but is enough to help reduce packet fragmentation from high-perf producers
parent d5ca30fa
...@@ -10,7 +10,8 @@ namespace StackExchange.Redis ...@@ -10,7 +10,8 @@ namespace StackExchange.Redis
{ {
enum WriteResult enum WriteResult
{ {
QueueEmpty, QueueEmptyAfterWrite,
NothingToDo,
MoreWork, MoreWork,
CompetingWriter, CompetingWriter,
NoConnection, NoConnection,
...@@ -529,17 +530,20 @@ internal WriteResult WriteQueue(int maxWork) ...@@ -529,17 +530,20 @@ internal WriteResult WriteQueue(int maxWork)
return WriteResult.NoConnection; return WriteResult.NoConnection;
} }
int count = 0;
Message last = null; Message last = null;
int count = 0;
while (true) while (true)
{ {
var next = queue.Dequeue(); var next = queue.Dequeue();
if (next == null) if (next == null)
{ {
Trace("Nothing to write; exiting"); Trace("Nothing to write; exiting");
Trace(last != null, "Flushed up to: " + last); if(count == 0)
conn.Flush(); {
return WriteResult.QueueEmpty; conn.Flush(); // only flush on an empty run
return WriteResult.NothingToDo;
}
return WriteResult.QueueEmptyAfterWrite;
} }
last = next; last = next;
...@@ -562,7 +566,11 @@ internal WriteResult WriteQueue(int maxWork) ...@@ -562,7 +566,11 @@ internal WriteResult WriteQueue(int maxWork)
} }
catch (IOException ex) catch (IOException ex)
{ {
if (conn != null) conn.RecordConnectionFailed(ConnectionFailureType.SocketFailure, ex); if (conn != null)
{
conn.RecordConnectionFailed(ConnectionFailureType.SocketFailure, ex);
conn = null;
}
AbortUnsent(); AbortUnsent();
} }
catch (Exception ex) catch (Exception ex)
...@@ -578,7 +586,7 @@ internal WriteResult WriteQueue(int maxWork) ...@@ -578,7 +586,7 @@ internal WriteResult WriteQueue(int maxWork)
Trace("Exiting writer"); Trace("Exiting writer");
} }
} }
return queue.Any() ? WriteResult.MoreWork : WriteResult.QueueEmpty; return queue.Any() ? WriteResult.MoreWork : WriteResult.QueueEmptyAfterWrite;
} }
private void AbortUnsent() private void AbortUnsent()
......
...@@ -12,14 +12,6 @@ ...@@ -12,14 +12,6 @@
namespace StackExchange.Redis namespace StackExchange.Redis
{ {
internal enum WorkState
{
Pending,
Failed,
NothingToDo,
Disconnected,
HasWork
}
internal sealed partial class PhysicalConnection : IDisposable, ISocketCallback internal sealed partial class PhysicalConnection : IDisposable, ISocketCallback
{ {
......
...@@ -6,7 +6,7 @@ partial class SocketManager ...@@ -6,7 +6,7 @@ partial class SocketManager
{ {
internal const SocketMode DefaultSocketMode = SocketMode.Async; internal const SocketMode DefaultSocketMode = SocketMode.Async;
partial void OnAddRead(System.Net.Sockets.Socket socket, ISocketCallback callback) private void OnAddRead(System.Net.Sockets.Socket socket, ISocketCallback callback)
{ {
throw new System.NotSupportedException(); throw new System.NotSupportedException();
} }
......
...@@ -71,7 +71,7 @@ private static void ProcessItems(Dictionary<IntPtr, SocketPair> socketLookup, Qu ...@@ -71,7 +71,7 @@ private static void ProcessItems(Dictionary<IntPtr, SocketPair> socketLookup, Qu
} }
} }
partial void OnAddRead(Socket socket, ISocketCallback callback) private void OnAddRead(Socket socket, ISocketCallback callback)
{ {
if (socket == null) throw new ArgumentNullException("socket"); if (socket == null) throw new ArgumentNullException("socket");
if (callback == null) throw new ArgumentNullException("callback"); if (callback == null) throw new ArgumentNullException("callback");
......
...@@ -191,10 +191,6 @@ private void EndConnect(IAsyncResult ar) ...@@ -191,10 +191,6 @@ private void EndConnect(IAsyncResult ar)
} }
} }
/// <summary>
/// Adds a new socket and callback to the manager
/// </summary>
partial void OnAddRead(Socket socket, ISocketCallback callback);
partial void OnDispose(); partial void OnDispose();
partial void OnShutdown(Socket socket); partial void OnShutdown(Socket socket);
...@@ -229,6 +225,7 @@ private void WriteAllQueues() ...@@ -229,6 +225,7 @@ private void WriteAllQueues()
switch (bridge.WriteQueue(200)) switch (bridge.WriteQueue(200))
{ {
case WriteResult.MoreWork: case WriteResult.MoreWork:
case WriteResult.QueueEmptyAfterWrite:
// back of the line! // back of the line!
lock (writeQueue) lock (writeQueue)
{ {
...@@ -240,7 +237,7 @@ private void WriteAllQueues() ...@@ -240,7 +237,7 @@ private void WriteAllQueues()
case WriteResult.NoConnection: case WriteResult.NoConnection:
Interlocked.Exchange(ref bridge.inWriteQueue, 0); Interlocked.Exchange(ref bridge.inWriteQueue, 0);
break; break;
case WriteResult.QueueEmpty: case WriteResult.NothingToDo:
if (!bridge.ConfirmRemoveFromWriteQueue()) if (!bridge.ConfirmRemoveFromWriteQueue())
{ // more snuck in; back of the line! { // more snuck in; back of the line!
lock (writeQueue) lock (writeQueue)
...@@ -267,9 +264,10 @@ private void WriteOneQueue() ...@@ -267,9 +264,10 @@ private void WriteOneQueue()
switch (bridge.WriteQueue(-1)) switch (bridge.WriteQueue(-1))
{ {
case WriteResult.MoreWork: case WriteResult.MoreWork:
case WriteResult.QueueEmptyAfterWrite:
keepGoing = true; keepGoing = true;
break; break;
case WriteResult.QueueEmpty: case WriteResult.NothingToDo:
keepGoing = !bridge.ConfirmRemoveFromWriteQueue(); keepGoing = !bridge.ConfirmRemoveFromWriteQueue();
break; break;
case WriteResult.CompetingWriter: case WriteResult.CompetingWriter:
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment