Commit 2876ed04 authored by mgravell's avatar mgravell

include state tracking on the read path

parent dd808a7f
...@@ -213,9 +213,10 @@ void add(string lk, string sk, string v) ...@@ -213,9 +213,10 @@ void add(string lk, string sk, string v)
if (message.QueuePosition >= 0) add("QueuePosition", null, message.QueuePosition.ToString()); // the position the item was when added to the queue if (message.QueuePosition >= 0) add("QueuePosition", null, message.QueuePosition.ToString()); // the position the item was when added to the queue
if ((int)message.ConnectionWriteState >= 0) add("WriteState", null, message.ConnectionWriteState.ToString()); // what the physical was doing when it was added to the queue if ((int)message.ConnectionWriteState >= 0) add("WriteState", null, message.ConnectionWriteState.ToString()); // what the physical was doing when it was added to the queue
#endif #endif
if (message.TryGetPhysicalState(out var state, out var sentDelta, out var receivedDelta)) if (message.TryGetPhysicalState(out var ws, out var rs, out var sentDelta, out var receivedDelta))
{ {
add("PhysicalState", "phys", state.ToString()); add("Write-State", null, ws.ToString());
add("Read-State", null, rs.ToString());
// these might not always be available // these might not always be available
if (sentDelta >= 0) if (sentDelta >= 0)
{ {
...@@ -233,12 +234,14 @@ void add(string lk, string sk, string v) ...@@ -233,12 +234,14 @@ void add(string lk, string sk, string v)
// Add server data, if we have it // Add server data, if we have it
if (server != null) if (server != null)
{ {
server.GetOutstandingCount(message.Command, out int inst, out int qs, out long @in, out int qu, out bool aw, out long toRead, out long toWrite, out var bs); server.GetOutstandingCount(message.Command, out int inst, out int qs, out long @in, out int qu, out bool aw, out long toRead, out long toWrite, out var bs, out var rs, out var ws);
add("OpsSinceLastHeartbeat", "inst", inst.ToString()); add("OpsSinceLastHeartbeat", "inst", inst.ToString());
add("Queue-Awaiting-Write", "qu", qu.ToString()); add("Queue-Awaiting-Write", "qu", qu.ToString());
add("Queue-Awaiting-Response", "qs", qs.ToString()); add("Queue-Awaiting-Response", "qs", qs.ToString());
add("Active-Writer", "aw", aw.ToString()); add("Active-Writer", "aw", aw.ToString());
if (qu != 0) add("Backlog-Writer", "bw", bs.ToString()); if (qu != 0) add("Backlog-Writer", "bw", bs.ToString());
if (rs != PhysicalConnection.ReadStatus.NA) add("Read-State", "rs", rs.ToString());
if (ws != PhysicalConnection.WriteStatus.NA) add("Write-State", "ws", ws.ToString());
if (@in >= 0) add("Inbound-Bytes", "in", @in.ToString()); if (@in >= 0) add("Inbound-Bytes", "in", @in.ToString());
if (toRead >= 0) add("Inbound-Pipe-Bytes", "in-pipe", toRead.ToString()); if (toRead >= 0) add("Inbound-Pipe-Bytes", "in-pipe", toRead.ToString());
......
...@@ -62,7 +62,7 @@ internal void SetBacklogState(int position, PhysicalConnection physical) ...@@ -62,7 +62,7 @@ internal void SetBacklogState(int position, PhysicalConnection physical)
{ {
#if DEBUG #if DEBUG
QueuePosition = position; QueuePosition = position;
ConnectionWriteState = physical?.Status ?? (PhysicalConnection.WriteStatus)(-1); ConnectionWriteState = physical?.GetWriteStatus() ?? PhysicalConnection.WriteStatus.NA;
#endif #endif
} }
...@@ -631,7 +631,7 @@ internal void SetEnqueued(PhysicalConnection connection) ...@@ -631,7 +631,7 @@ internal void SetEnqueued(PhysicalConnection connection)
{ {
#if DEBUG #if DEBUG
QueuePosition = -1; QueuePosition = -1;
ConnectionWriteState = (PhysicalConnection.WriteStatus)(-1); ConnectionWriteState = PhysicalConnection.WriteStatus.NA;
#endif #endif
SetWriteTime(); SetWriteTime();
performance?.SetEnqueued(); performance?.SetEnqueued();
...@@ -646,13 +646,14 @@ internal void SetEnqueued(PhysicalConnection connection) ...@@ -646,13 +646,14 @@ internal void SetEnqueued(PhysicalConnection connection)
} }
} }
internal bool TryGetPhysicalState(out PhysicalConnection.WriteStatus status, out long sentDelta, out long receivedDelta) internal bool TryGetPhysicalState(out PhysicalConnection.WriteStatus ws, out PhysicalConnection.ReadStatus rs, out long sentDelta, out long receivedDelta)
{ {
var connection = _enqueuedTo; var connection = _enqueuedTo;
sentDelta = receivedDelta = -1; sentDelta = receivedDelta = -1;
if (connection != null) if (connection != null)
{ {
status = connection.Status; ws = connection.GetWriteStatus();
rs = connection.GetReadStatus();
connection.GetBytes(out var sent, out var received); connection.GetBytes(out var sent, out var received);
if (sent >= 0 && _queuedStampSent >= 0) sentDelta = sent - _queuedStampSent; if (sent >= 0 && _queuedStampSent >= 0) sentDelta = sent - _queuedStampSent;
if (received >= 0 && _queuedStampReceived >= 0) receivedDelta = received - _queuedStampReceived; if (received >= 0 && _queuedStampReceived >= 0) receivedDelta = received - _queuedStampReceived;
...@@ -660,7 +661,8 @@ internal bool TryGetPhysicalState(out PhysicalConnection.WriteStatus status, out ...@@ -660,7 +661,8 @@ internal bool TryGetPhysicalState(out PhysicalConnection.WriteStatus status, out
} }
else else
{ {
status = default; ws = PhysicalConnection.WriteStatus.NA;
rs = PhysicalConnection.ReadStatus.NA;
return false; return false;
} }
} }
......
...@@ -290,7 +290,8 @@ private void ShutdownSubscriptionQueue() ...@@ -290,7 +290,8 @@ private void ShutdownSubscriptionQueue()
internal bool TryEnqueueBackgroundSubscriptionWrite(in PendingSubscriptionState state) internal bool TryEnqueueBackgroundSubscriptionWrite(in PendingSubscriptionState state)
=> isDisposed ? false : (_subscriptionBackgroundQueue ?? GetSubscriptionQueue()).Writer.TryWrite(state); => isDisposed ? false : (_subscriptionBackgroundQueue ?? GetSubscriptionQueue()).Writer.TryWrite(state);
internal void GetOutstandingCount(out int inst, out int qs, out long @in, out int qu, out bool aw, out long toRead, out long toWrite, out BacklogStatus bs) internal void GetOutstandingCount(out int inst, out int qs, out long @in, out int qu, out bool aw, out long toRead, out long toWrite,
out BacklogStatus bs, out PhysicalConnection.ReadStatus rs, out PhysicalConnection.WriteStatus ws)
{ {
inst = (int)(Interlocked.Read(ref operationCount) - Interlocked.Read(ref profileLastLog)); inst = (int)(Interlocked.Read(ref operationCount) - Interlocked.Read(ref profileLastLog));
lock(_backlog) lock(_backlog)
...@@ -304,11 +305,15 @@ internal void GetOutstandingCount(out int inst, out int qs, out long @in, out in ...@@ -304,11 +305,15 @@ internal void GetOutstandingCount(out int inst, out int qs, out long @in, out in
{ {
qs = 0; qs = 0;
toRead = toWrite = @in = -1; toRead = toWrite = @in = -1;
rs = PhysicalConnection.ReadStatus.NA;
ws = PhysicalConnection.WriteStatus.NA;
} }
else else
{ {
qs = tmp.GetSentAwaitingResponseCount(); qs = tmp.GetSentAwaitingResponseCount();
@in = tmp.GetSocketBytes(out toRead, out toWrite); @in = tmp.GetSocketBytes(out toRead, out toWrite);
rs = tmp.GetReadStatus();
ws = tmp.GetWriteStatus();
} }
} }
......
...@@ -368,7 +368,7 @@ void add(string lk, string sk, string v) ...@@ -368,7 +368,7 @@ void add(string lk, string sk, string v)
if (bridge != null) if (bridge != null)
{ {
exMessage.Append(" on ").Append(Format.ToString(bridge.ServerEndPoint?.EndPoint)).Append("/").Append(connectionType) exMessage.Append(" on ").Append(Format.ToString(bridge.ServerEndPoint?.EndPoint)).Append("/").Append(connectionType)
.Append(", ").Append(_writeStatus) .Append(", ").Append(_writeStatus).Append("/").Append(_readStatus)
.Append(", last: ").Append(bridge.LastCommand); .Append(", last: ").Append(bridge.LastCommand);
data.Add(Tuple.Create("FailureType", failureType.ToString())); data.Add(Tuple.Create("FailureType", failureType.ToString()));
...@@ -447,7 +447,7 @@ void add(string lk, string sk, string v) ...@@ -447,7 +447,7 @@ void add(string lk, string sk, string v)
private volatile WriteStatus _writeStatus; private volatile WriteStatus _writeStatus;
internal WriteStatus Status => _writeStatus; internal WriteStatus GetWriteStatus() => _writeStatus;
internal enum WriteStatus internal enum WriteStatus
{ {
...@@ -456,6 +456,8 @@ internal enum WriteStatus ...@@ -456,6 +456,8 @@ internal enum WriteStatus
Writing, Writing,
Flushing, Flushing,
Flushed, Flushed,
NA = -1,
} }
/// <summary>Returns a string that represents the current object.</summary> /// <summary>Returns a string that represents the current object.</summary>
...@@ -607,7 +609,7 @@ internal void OnBridgeHeartbeat() ...@@ -607,7 +609,7 @@ internal void OnBridgeHeartbeat()
{ {
if (msg.HasAsyncTimedOut(now, timeout, out var elapsed)) if (msg.HasAsyncTimedOut(now, timeout, out var elapsed))
{ {
bool haveDeltas = msg.TryGetPhysicalState(out _, out long sentDelta, out var receivedDelta) && sentDelta >= 0 && receivedDelta >= 0; bool haveDeltas = msg.TryGetPhysicalState(out _, out _, out long sentDelta, out var receivedDelta) && sentDelta >= 0 && receivedDelta >= 0;
var timeoutEx = ExceptionFactory.Timeout(bridge.Multiplexer, haveDeltas var timeoutEx = ExceptionFactory.Timeout(bridge.Multiplexer, haveDeltas
? $"Timeout awaiting response (outbound={sentDelta >> 10}KiB, inbound={receivedDelta >> 10}KiB, {elapsed}ms elapsed, timeout is {timeout}ms)" ? $"Timeout awaiting response (outbound={sentDelta >> 10}KiB, inbound={receivedDelta >> 10}KiB, {elapsed}ms elapsed, timeout is {timeout}ms)"
: $"Timeout awaiting response ({elapsed}ms elapsed, timeout is {timeout}ms)", msg, server); : $"Timeout awaiting response ({elapsed}ms elapsed, timeout is {timeout}ms)", msg, server);
...@@ -1331,8 +1333,9 @@ private void MatchResult(in RawResult result) ...@@ -1331,8 +1333,9 @@ private void MatchResult(in RawResult result)
var items = result.GetItems(); var items = result.GetItems();
if (items.Length >= 3 && items[0].IsEqual(message)) if (items.Length >= 3 && items[0].IsEqual(message))
{ {
// special-case the configuration change broadcasts (we don't keep that in the usual pub/sub registry) _readStatus = ReadStatus.PubSubMessage;
// special-case the configuration change broadcasts (we don't keep that in the usual pub/sub registry)
var configChanged = muxer.ConfigurationChangedChannel; var configChanged = muxer.ConfigurationChangedChannel;
if (configChanged != null && items[1].IsEqual(configChanged)) if (configChanged != null && items[1].IsEqual(configChanged))
{ {
...@@ -1346,6 +1349,7 @@ private void MatchResult(in RawResult result) ...@@ -1346,6 +1349,7 @@ private void MatchResult(in RawResult result)
} }
catch { /* no biggie */ } catch { /* no biggie */ }
Trace("Configuration changed: " + Format.ToString(blame)); Trace("Configuration changed: " + Format.ToString(blame));
_readStatus = ReadStatus.Reconfigure;
muxer.ReconfigureIfNeeded(blame, true, "broadcast"); muxer.ReconfigureIfNeeded(blame, true, "broadcast");
} }
...@@ -1354,17 +1358,21 @@ private void MatchResult(in RawResult result) ...@@ -1354,17 +1358,21 @@ private void MatchResult(in RawResult result)
Trace("MESSAGE: " + channel); Trace("MESSAGE: " + channel);
if (!channel.IsNull) if (!channel.IsNull)
{ {
_readStatus = ReadStatus.InvokePubSub;
muxer.OnMessage(channel, channel, items[2].AsRedisValue()); muxer.OnMessage(channel, channel, items[2].AsRedisValue());
} }
return; // AND STOP PROCESSING! return; // AND STOP PROCESSING!
} }
else if (items.Length >= 4 && items[0].IsEqual(pmessage)) else if (items.Length >= 4 && items[0].IsEqual(pmessage))
{ {
_readStatus = ReadStatus.PubSubPMessage;
var channel = items[2].AsRedisChannel(ChannelPrefix, RedisChannel.PatternMode.Literal); var channel = items[2].AsRedisChannel(ChannelPrefix, RedisChannel.PatternMode.Literal);
Trace("PMESSAGE: " + channel); Trace("PMESSAGE: " + channel);
if (!channel.IsNull) if (!channel.IsNull)
{ {
var sub = items[1].AsRedisChannel(ChannelPrefix, RedisChannel.PatternMode.Pattern); var sub = items[1].AsRedisChannel(ChannelPrefix, RedisChannel.PatternMode.Pattern);
_readStatus = ReadStatus.InvokePubSub;
muxer.OnMessage(sub, channel, items[3].AsRedisValue()); muxer.OnMessage(sub, channel, items[3].AsRedisValue());
} }
return; // AND STOP PROCESSING! return; // AND STOP PROCESSING!
...@@ -1374,6 +1382,7 @@ private void MatchResult(in RawResult result) ...@@ -1374,6 +1382,7 @@ private void MatchResult(in RawResult result)
} }
Trace("Matching result..."); Trace("Matching result...");
Message msg; Message msg;
_readStatus = ReadStatus.DequeueResult;
lock (_writtenAwaitingResponse) lock (_writtenAwaitingResponse)
{ {
if (_writtenAwaitingResponse.Count == 0) if (_writtenAwaitingResponse.Count == 0)
...@@ -1382,8 +1391,10 @@ private void MatchResult(in RawResult result) ...@@ -1382,8 +1391,10 @@ private void MatchResult(in RawResult result)
} }
Trace("Response to: " + msg); Trace("Response to: " + msg);
_readStatus = ReadStatus.ComputeResult;
if (msg.ComputeResult(this, result)) if (msg.ComputeResult(this, result))
{ {
_readStatus = ReadStatus.CompletePendingMessage;
msg.Complete(); msg.Complete();
} }
} }
...@@ -1409,6 +1420,7 @@ private async Task ReadFromPipe() ...@@ -1409,6 +1420,7 @@ private async Task ReadFromPipe()
bool allowSyncRead = true, isReading = false; bool allowSyncRead = true, isReading = false;
try try
{ {
_readStatus = ReadStatus.Init;
while (true) while (true)
{ {
var input = _ioPipe?.Input; var input = _ioPipe?.Input;
...@@ -1417,13 +1429,17 @@ private async Task ReadFromPipe() ...@@ -1417,13 +1429,17 @@ private async Task ReadFromPipe()
// note: TryRead will give us back the same buffer in a tight loop // note: TryRead will give us back the same buffer in a tight loop
// - so: only use that if we're making progress // - so: only use that if we're making progress
isReading = true; isReading = true;
_readStatus = ReadStatus.ReadSync;
if (!(allowSyncRead && input.TryRead(out var readResult))) if (!(allowSyncRead && input.TryRead(out var readResult)))
{ {
_readStatus = ReadStatus.ReadAsync;
readResult = await input.ReadAsync().ForAwait(); readResult = await input.ReadAsync().ForAwait();
} }
isReading = false; isReading = false;
_readStatus = ReadStatus.UpdateWriteTime;
UpdateLastReadTime(); UpdateLastReadTime();
_readStatus = ReadStatus.ProcessBuffer;
var buffer = readResult.Buffer; var buffer = readResult.Buffer;
int handled = 0; int handled = 0;
if (!buffer.IsEmpty) if (!buffer.IsEmpty)
...@@ -1433,6 +1449,7 @@ private async Task ReadFromPipe() ...@@ -1433,6 +1449,7 @@ private async Task ReadFromPipe()
allowSyncRead = handled != 0; allowSyncRead = handled != 0;
_readStatus = ReadStatus.MarkProcessed;
Trace($"Processed {handled} messages"); Trace($"Processed {handled} messages");
input.AdvanceTo(buffer.Start, buffer.End); input.AdvanceTo(buffer.Start, buffer.End);
...@@ -1443,9 +1460,11 @@ private async Task ReadFromPipe() ...@@ -1443,9 +1460,11 @@ private async Task ReadFromPipe()
} }
Trace("EOF"); Trace("EOF");
RecordConnectionFailed(ConnectionFailureType.SocketClosed); RecordConnectionFailed(ConnectionFailureType.SocketClosed);
_readStatus = ReadStatus.RanToCompletion;
} }
catch (Exception ex) catch (Exception ex)
{ {
_readStatus = ReadStatus.Faulted;
// this CEX is just a hardcore "seriously, read the actual value" - there's no // this CEX is just a hardcore "seriously, read the actual value" - there's no
// convenient "Thread.VolatileRead<T>(ref T field) where T : class", and I don't // convenient "Thread.VolatileRead<T>(ref T field) where T : class", and I don't
// want to make the field volatile just for this one place that needs it // want to make the field volatile just for this one place that needs it
...@@ -1482,6 +1501,7 @@ private int ProcessBuffer(ref ReadOnlySequence<byte> buffer) ...@@ -1482,6 +1501,7 @@ private int ProcessBuffer(ref ReadOnlySequence<byte> buffer)
while (!buffer.IsEmpty) while (!buffer.IsEmpty)
{ {
_readStatus = ReadStatus.TryParseResult;
var reader = new BufferReader(buffer); var reader = new BufferReader(buffer);
var result = TryParseResult(_arena, in buffer, ref reader, IncludeDetailInExceptions, BridgeCouldBeNull?.ServerEndPoint); var result = TryParseResult(_arena, in buffer, ref reader, IncludeDetailInExceptions, BridgeCouldBeNull?.ServerEndPoint);
try try
...@@ -1492,6 +1512,7 @@ private int ProcessBuffer(ref ReadOnlySequence<byte> buffer) ...@@ -1492,6 +1512,7 @@ private int ProcessBuffer(ref ReadOnlySequence<byte> buffer)
messageCount++; messageCount++;
Trace(result.ToString()); Trace(result.ToString());
_readStatus = ReadStatus.MatchResult;
MatchResult(result); MatchResult(result);
} }
else else
...@@ -1621,6 +1642,32 @@ private static RawResult ReadLineTerminatedString(ResultType type, ref BufferRea ...@@ -1621,6 +1642,32 @@ private static RawResult ReadLineTerminatedString(ResultType type, ref BufferRea
return new RawResult(type, payload, false); return new RawResult(type, payload, false);
} }
internal enum ReadStatus
{
NotStarted,
Init,
RanToCompletion,
Faulted,
ReadSync,
ReadAsync,
UpdateWriteTime,
ProcessBuffer,
MarkProcessed,
TryParseResult,
MatchResult,
PubSubMessage,
PubSubPMessage,
Reconfigure,
InvokePubSub,
DequeueResult,
ComputeResult,
CompletePendingMessage,
NA = -1,
}
private volatile ReadStatus _readStatus;
internal ReadStatus GetReadStatus() => _readStatus;
internal void StartReading() => ReadFromPipe().RedisFireAndForget(); internal void StartReading() => ReadFromPipe().RedisFireAndForget();
internal static RawResult TryParseResult(Arena<RawResult> arena, in ReadOnlySequence<byte> buffer, ref BufferReader reader, internal static RawResult TryParseResult(Arena<RawResult> arena, in ReadOnlySequence<byte> buffer, ref BufferReader reader,
......
...@@ -376,7 +376,8 @@ internal ServerCounters GetCounters() ...@@ -376,7 +376,8 @@ internal ServerCounters GetCounters()
return counters; return counters;
} }
internal void GetOutstandingCount(RedisCommand command, out int inst, out int qs, out long @in, out int qu, out bool aw, out long toRead, out long toWrite, out BacklogStatus bs) internal void GetOutstandingCount(RedisCommand command, out int inst, out int qs, out long @in, out int qu, out bool aw, out long toRead, out long toWrite,
out BacklogStatus bs, out PhysicalConnection.ReadStatus rs, out PhysicalConnection.WriteStatus ws)
{ {
var bridge = GetBridge(command, false); var bridge = GetBridge(command, false);
if (bridge == null) if (bridge == null)
...@@ -385,10 +386,12 @@ internal void GetOutstandingCount(RedisCommand command, out int inst, out int qs ...@@ -385,10 +386,12 @@ internal void GetOutstandingCount(RedisCommand command, out int inst, out int qs
@in = toRead = toWrite = 0; @in = toRead = toWrite = 0;
aw = false; aw = false;
bs = BacklogStatus.Inactive; bs = BacklogStatus.Inactive;
rs = PhysicalConnection.ReadStatus.NA;
ws = PhysicalConnection.WriteStatus.NA;
} }
else else
{ {
bridge.GetOutstandingCount(out inst, out qs, out @in, out qu, out aw, out toRead, out toWrite, out bs); bridge.GetOutstandingCount(out inst, out qs, out @in, out qu, out aw, out toRead, out toWrite, out bs, out rs, out ws);
} }
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment