Commit 73d087d6 authored by mgravell's avatar mgravell

on message, track what the *physical* is doing when we add it to backlog

parent 1bc10e16
...@@ -212,6 +212,7 @@ void add(string lk, string sk, string v) ...@@ -212,6 +212,7 @@ void add(string lk, string sk, string v)
try try
{ {
if (message.QueuePosition >= 0) add("QueuePosition", null, message.QueuePosition.ToString()); // the position the item was when added to the queue if (message.QueuePosition >= 0) add("QueuePosition", null, message.QueuePosition.ToString()); // the position the item was when added to the queue
if ((int)message.ConnectionWriteState >= 0) add("WriteState", null, message.ConnectionWriteState.ToString()); // what the physical was doing when it was added to the queue
if (message.TryGetPhysicalState(out var state, out var sentDelta, out var receivedDelta)) if (message.TryGetPhysicalState(out var state, out var sentDelta, out var receivedDelta))
{ {
add("PhysicalState", "phys", state.ToString()); add("PhysicalState", "phys", state.ToString());
......
...@@ -51,7 +51,14 @@ protected override void WriteImpl(PhysicalConnection physical) ...@@ -51,7 +51,14 @@ protected override void WriteImpl(PhysicalConnection physical)
internal abstract class Message : ICompletable internal abstract class Message : ICompletable
{ {
public readonly int Db; public readonly int Db;
internal int QueuePosition { get; set; } internal int QueuePosition { get; private set; }
internal PhysicalConnection.WriteStatus ConnectionWriteState { get; private set; }
internal void SetBacklogState(int position, PhysicalConnection physical)
{
QueuePosition = position;
ConnectionWriteState = physical?.Status ?? (PhysicalConnection.WriteStatus)(-1);
}
internal const CommandFlags InternalCallFlag = (CommandFlags)128; internal const CommandFlags InternalCallFlag = (CommandFlags)128;
...@@ -617,6 +624,7 @@ internal bool TrySetResult<T>(T value) ...@@ -617,6 +624,7 @@ internal bool TrySetResult<T>(T value)
internal void SetEnqueued(PhysicalConnection connection) internal void SetEnqueued(PhysicalConnection connection)
{ {
QueuePosition = -1; QueuePosition = -1;
ConnectionWriteState = (PhysicalConnection.WriteStatus)(-1);
SetWriteTime(); SetWriteTime();
performance?.SetEnqueued(); performance?.SetEnqueued();
_enqueuedTo = connection; _enqueuedTo = connection;
......
...@@ -140,7 +140,7 @@ private WriteResult QueueOrFailMessage(Message message) ...@@ -140,7 +140,7 @@ private WriteResult QueueOrFailMessage(Message message)
message.SetEnqueued(null); message.SetEnqueued(null);
lock (_backlog) lock (_backlog)
{ {
message.QueuePosition = _backlog.Count; message.SetBacklogState(_backlog.Count, null);
_backlog.Enqueue(message); _backlog.Enqueue(message);
} }
return WriteResult.Success; // we'll take it... return WriteResult.Success; // we'll take it...
...@@ -728,7 +728,7 @@ private bool PushToBacklog(Message message, bool onlyIfExists) ...@@ -728,7 +728,7 @@ private bool PushToBacklog(Message message, bool onlyIfExists)
wasEmpty = count == 0; wasEmpty = count == 0;
if (wasEmpty & onlyIfExists) return false; if (wasEmpty & onlyIfExists) return false;
message.QueuePosition = count; message.SetBacklogState(count, physical);
_backlog.Enqueue(message); _backlog.Enqueue(message);
} }
if (wasEmpty) StartBacklogProcessor(); if (wasEmpty) StartBacklogProcessor();
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment