Commit 1bc10e16 authored by mgravell's avatar mgravell

track the position in the queue that an item was *when added*

parent a6c962ca
...@@ -211,6 +211,7 @@ void add(string lk, string sk, string v) ...@@ -211,6 +211,7 @@ void add(string lk, string sk, string v)
add("Timeout", "timeout", Format.ToString(mutiplexer.TimeoutMilliseconds)); add("Timeout", "timeout", Format.ToString(mutiplexer.TimeoutMilliseconds));
try try
{ {
if (message.QueuePosition >= 0) add("QueuePosition", null, message.QueuePosition.ToString()); // the position the item was when added to the queue
if (message.TryGetPhysicalState(out var state, out var sentDelta, out var receivedDelta)) if (message.TryGetPhysicalState(out var state, out var sentDelta, out var receivedDelta))
{ {
add("PhysicalState", "phys", state.ToString()); add("PhysicalState", "phys", state.ToString());
......
...@@ -51,6 +51,7 @@ protected override void WriteImpl(PhysicalConnection physical) ...@@ -51,6 +51,7 @@ protected override void WriteImpl(PhysicalConnection physical)
internal abstract class Message : ICompletable internal abstract class Message : ICompletable
{ {
public readonly int Db; public readonly int Db;
internal int QueuePosition { get; set; }
internal const CommandFlags InternalCallFlag = (CommandFlags)128; internal const CommandFlags InternalCallFlag = (CommandFlags)128;
...@@ -615,6 +616,7 @@ internal bool TrySetResult<T>(T value) ...@@ -615,6 +616,7 @@ internal bool TrySetResult<T>(T value)
internal void SetEnqueued(PhysicalConnection connection) internal void SetEnqueued(PhysicalConnection connection)
{ {
QueuePosition = -1;
SetWriteTime(); SetWriteTime();
performance?.SetEnqueued(); performance?.SetEnqueued();
_enqueuedTo = connection; _enqueuedTo = connection;
......
...@@ -137,11 +137,12 @@ private WriteResult QueueOrFailMessage(Message message) ...@@ -137,11 +137,12 @@ private WriteResult QueueOrFailMessage(Message message)
{ {
// you can go in the queue, but we won't be starting // you can go in the queue, but we won't be starting
// a worker, because the handshake has not completed // a worker, because the handshake has not completed
message.SetEnqueued(null);
lock (_backlog) lock (_backlog)
{ {
message.QueuePosition = _backlog.Count;
_backlog.Enqueue(message); _backlog.Enqueue(message);
} }
message.SetEnqueued(null);
return WriteResult.Success; // we'll take it... return WriteResult.Success; // we'll take it...
} }
else else
...@@ -723,9 +724,11 @@ private bool PushToBacklog(Message message, bool onlyIfExists) ...@@ -723,9 +724,11 @@ private bool PushToBacklog(Message message, bool onlyIfExists)
bool wasEmpty; bool wasEmpty;
lock (_backlog) lock (_backlog)
{ {
wasEmpty = _backlog.Count == 0; int count = _backlog.Count;
wasEmpty = count == 0;
if (wasEmpty & onlyIfExists) return false; if (wasEmpty & onlyIfExists) return false;
message.QueuePosition = count;
_backlog.Enqueue(message); _backlog.Enqueue(message);
} }
if (wasEmpty) StartBacklogProcessor(); if (wasEmpty) StartBacklogProcessor();
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment