Commit b1d6d788 authored by Marc Gravell's avatar Marc Gravell

Kill pendingCount; redundant - risks being incorrect

parent d4be9a3b
...@@ -45,11 +45,12 @@ public Message PeekPing(out int queueLength) ...@@ -45,11 +45,12 @@ public Message PeekPing(out int queueLength)
return null; return null;
} }
public void Push(Message message) public bool Push(Message message)
{ {
lock (regular) lock (regular)
{ {
(message.IsHighPriority ? high : regular).Enqueue(message); (message.IsHighPriority ? high : regular).Enqueue(message);
return high.Count + regular.Count == 1;
} }
} }
......
...@@ -25,7 +25,6 @@ private static readonly Message ...@@ -25,7 +25,6 @@ private static readonly Message
volatile bool isDisposed; volatile bool isDisposed;
//private volatile int missedHeartbeats; //private volatile int missedHeartbeats;
private long operationCount, socketCount; private long operationCount, socketCount;
private int pendingCount;
private volatile PhysicalConnection physical; private volatile PhysicalConnection physical;
...@@ -116,7 +115,6 @@ public bool TryEnqueue(Message message, bool isSlave) ...@@ -116,7 +115,6 @@ public bool TryEnqueue(Message message, bool isSlave)
// you can go in the queue, but we won't be starting // you can go in the queue, but we won't be starting
// a worker, because the handshake has not completed // a worker, because the handshake has not completed
queue.Push(message); queue.Push(message);
Interlocked.Increment(ref pendingCount);
return true; return true;
} }
else else
...@@ -126,12 +124,11 @@ public bool TryEnqueue(Message message, bool isSlave) ...@@ -126,12 +124,11 @@ public bool TryEnqueue(Message message, bool isSlave)
} }
} }
queue.Push(message); bool reqWrite = queue.Push(message);
LogNonPreferred(message.Flags, isSlave); LogNonPreferred(message.Flags, isSlave);
int newPendingCount = Interlocked.Increment(ref pendingCount); Trace("Now pending: " + GetPendingCount());
Trace("Now pending: " + newPendingCount);
if (newPendingCount == 1) if (reqWrite)
{ {
multiplexer.RequestWrite(this, false); multiplexer.RequestWrite(this, false);
} }
...@@ -336,7 +333,7 @@ internal void OnFullyEstablished(PhysicalConnection connection) ...@@ -336,7 +333,7 @@ internal void OnFullyEstablished(PhysicalConnection connection)
} }
internal int GetPendingCount() internal int GetPendingCount()
{ {
return Thread.VolatileRead(ref pendingCount); return queue.Count();
} }
internal void OnHeartbeat() internal void OnHeartbeat()
{ {
...@@ -358,8 +355,8 @@ internal void OnHeartbeat() ...@@ -358,8 +355,8 @@ internal void OnHeartbeat()
var tmp = physical; var tmp = physical;
if (tmp != null) if (tmp != null)
{ {
int writeEvery = serverEndPoint.WriteEverySeconds; int writeEverySeconds = serverEndPoint.WriteEverySeconds;
if (writeEvery > 0 && tmp.LastWriteSecondsAgo >= writeEvery && Thread.VolatileRead(ref pendingCount) == 0) if (writeEverySeconds > 0 && tmp.LastWriteSecondsAgo >= writeEverySeconds)
{ {
Trace("OnHeartbeat - overdue"); Trace("OnHeartbeat - overdue");
if (state == (int)State.ConnectedEstablished) if (state == (int)State.ConnectedEstablished)
...@@ -417,16 +414,16 @@ internal bool TryEnqueue(List<Message> messages, bool isSlave) ...@@ -417,16 +414,16 @@ internal bool TryEnqueue(List<Message> messages, bool isSlave)
{ {
return false; return false;
} }
bool reqWrite = false;
foreach(var message in messages) foreach(var message in messages)
{ // deliberately not taking a single lock here; we don't care if { // deliberately not taking a single lock here; we don't care if
// other threads manage to interleave - in fact, it would be desirable // other threads manage to interleave - in fact, it would be desirable
// (to avoid a batch monopolising the connection) // (to avoid a batch monopolising the connection)
queue.Push(message); if (queue.Push(message)) reqWrite = true;
LogNonPreferred(message.Flags, isSlave); LogNonPreferred(message.Flags, isSlave);
} }
int newPendingCount = Interlocked.Add(ref pendingCount, messages.Count); Trace("Now pending: " + GetPendingCount());
Trace("Now pending: " + newPendingCount); if(reqWrite) // was empty before
if(newPendingCount == messages.Count) // was empty before
{ {
multiplexer.RequestWrite(this, false); multiplexer.RequestWrite(this, false);
} }
...@@ -679,8 +676,8 @@ internal WriteResult WriteQueue(int maxWork) ...@@ -679,8 +676,8 @@ internal WriteResult WriteQueue(int maxWork)
return WriteResult.QueueEmpty; return WriteResult.QueueEmpty;
} }
last = next; last = next;
var newPendingCount = Interlocked.Decrement(ref pendingCount);
Trace("Now pending: " + newPendingCount); Trace("Now pending: " + GetPendingCount());
WriteMessageDirect(conn, next); WriteMessageDirect(conn, next);
count++; count++;
if (maxWork > 0 && count >= maxWork) if (maxWork > 0 && count >= maxWork)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment