Commit 4a0d3538 authored by mgravell's avatar mgravell

add tracking for "now" and "next" messages when recording message timeouts

parent 23ff71b4
...@@ -217,7 +217,7 @@ void add(string lk, string sk, string v) ...@@ -217,7 +217,7 @@ void add(string lk, string sk, string v)
if (message.QueuePosition >= 0) add("QueuePosition", null, message.QueuePosition.ToString()); // the position the item was when added to the queue if (message.QueuePosition >= 0) add("QueuePosition", null, message.QueuePosition.ToString()); // the position the item was when added to the queue
if ((int)message.ConnectionWriteState >= 0) add("WriteState", null, message.ConnectionWriteState.ToString()); // what the physical was doing when it was added to the queue if ((int)message.ConnectionWriteState >= 0) add("WriteState", null, message.ConnectionWriteState.ToString()); // what the physical was doing when it was added to the queue
#endif #endif
if (message.TryGetPhysicalState(out var ws, out var rs, out var sentDelta, out var receivedDelta)) if (message != null && message.TryGetPhysicalState(out var ws, out var rs, out var sentDelta, out var receivedDelta))
{ {
add("Write-State", null, ws.ToString()); add("Write-State", null, ws.ToString());
add("Read-State", null, rs.ToString()); add("Read-State", null, rs.ToString());
...@@ -235,6 +235,13 @@ void add(string lk, string sk, string v) ...@@ -235,6 +235,13 @@ void add(string lk, string sk, string v)
catch { } catch { }
} }
if (message != null)
{
message.TryGetHeadMessages(out var now, out var next);
if (now != null) add("Message-Current", "active", mutiplexer.IncludeDetailInExceptions ? now.CommandAndKey : now.Command.ToString());
if (next != null) add("Message-Next", "next", mutiplexer.IncludeDetailInExceptions ? next.CommandAndKey : next.Command.ToString());
}
// Add server data, if we have it // Add server data, if we have it
if (server != null) if (server != null)
{ {
......
...@@ -647,7 +647,15 @@ internal void SetEnqueued(PhysicalConnection connection) ...@@ -647,7 +647,15 @@ internal void SetEnqueued(PhysicalConnection connection)
} }
} }
internal bool TryGetPhysicalState(out PhysicalConnection.WriteStatus ws, out PhysicalConnection.ReadStatus rs, out long sentDelta, out long receivedDelta) internal void TryGetHeadMessages(out Message now, out Message next)
{
var connection = _enqueuedTo;
now = next = null;
if (connection != null) connection.GetHeadMessages(out now, out next);
}
internal bool TryGetPhysicalState(out PhysicalConnection.WriteStatus ws, out PhysicalConnection.ReadStatus rs,
out long sentDelta, out long receivedDelta)
{ {
var connection = _enqueuedTo; var connection = _enqueuedTo;
sentDelta = receivedDelta = -1; sentDelta = receivedDelta = -1;
......
...@@ -1414,6 +1414,7 @@ private void MatchResult(in RawResult result) ...@@ -1414,6 +1414,7 @@ private void MatchResult(in RawResult result)
throw new InvalidOperationException("Received response with no message waiting: " + result.ToString()); throw new InvalidOperationException("Received response with no message waiting: " + result.ToString());
msg = _writtenAwaitingResponse.Dequeue(); msg = _writtenAwaitingResponse.Dequeue();
} }
_activeMessage = msg;
Trace("Response to: " + msg); Trace("Response to: " + msg);
_readStatus = ReadStatus.ComputeResult; _readStatus = ReadStatus.ComputeResult;
...@@ -1422,6 +1423,19 @@ private void MatchResult(in RawResult result) ...@@ -1422,6 +1423,19 @@ private void MatchResult(in RawResult result)
_readStatus = ReadStatus.CompletePendingMessage; _readStatus = ReadStatus.CompletePendingMessage;
msg.Complete(); msg.Complete();
} }
_activeMessage = null;
}
private volatile Message _activeMessage;
internal void GetHeadMessages(out Message now, out Message next)
{
now = _activeMessage;
lock(_writtenAwaitingResponse)
{
next = _writtenAwaitingResponse.Count == 0 ? null : _writtenAwaitingResponse.Peek();
}
} }
partial void OnCloseEcho(); partial void OnCloseEcho();
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment