Commit 21c8bf36 authored by mgravell's avatar mgravell

The backlog writer appears to be stalling, so add state tracking to investigate where it stalls.

parent e363f069
...@@ -229,11 +229,12 @@ void add(string lk, string sk, string v) ...@@ -229,11 +229,12 @@ void add(string lk, string sk, string v)
// Add server data, if we have it // Add server data, if we have it
if (server != null) if (server != null)
{ {
server.GetOutstandingCount(message.Command, out int inst, out int qs, out long @in, out int qu, out bool aw, out long toRead, out long toWrite); server.GetOutstandingCount(message.Command, out int inst, out int qs, out long @in, out int qu, out bool aw, out long toRead, out long toWrite, out var bs);
add("OpsSinceLastHeartbeat", "inst", inst.ToString()); add("OpsSinceLastHeartbeat", "inst", inst.ToString());
add("Queue-Awaiting-Write", "qu", qu.ToString()); add("Queue-Awaiting-Write", "qu", qu.ToString());
add("Queue-Awaiting-Response", "qs", qs.ToString()); add("Queue-Awaiting-Response", "qs", qs.ToString());
add("Active-Writer", "aw", aw.ToString()); add("Active-Writer", "aw", aw.ToString());
add("Backlog-Writer", "bs", bs.ToString());
if (@in >= 0) add("Inbound-Bytes", "in", @in.ToString()); if (@in >= 0) add("Inbound-Bytes", "in", @in.ToString());
if (toRead >= 0) add("Inbound-Pipe-Bytes", "in-pipe", toRead.ToString()); if (toRead >= 0) add("Inbound-Pipe-Bytes", "in-pipe", toRead.ToString());
......
...@@ -289,7 +289,7 @@ private void ShutdownSubscriptionQueue() ...@@ -289,7 +289,7 @@ private void ShutdownSubscriptionQueue()
internal bool TryEnqueueBackgroundSubscriptionWrite(in PendingSubscriptionState state) internal bool TryEnqueueBackgroundSubscriptionWrite(in PendingSubscriptionState state)
=> isDisposed ? false : (_subscriptionBackgroundQueue ?? GetSubscriptionQueue()).Writer.TryWrite(state); => isDisposed ? false : (_subscriptionBackgroundQueue ?? GetSubscriptionQueue()).Writer.TryWrite(state);
internal void GetOutstandingCount(out int inst, out int qs, out long @in, out int qu, out bool aw, out long toRead, out long toWrite) internal void GetOutstandingCount(out int inst, out int qs, out long @in, out int qu, out bool aw, out long toRead, out long toWrite, out BacklogStatus bs)
{ {
inst = (int)(Interlocked.Read(ref operationCount) - Interlocked.Read(ref profileLastLog)); inst = (int)(Interlocked.Read(ref operationCount) - Interlocked.Read(ref profileLastLog));
lock(_backlog) lock(_backlog)
...@@ -297,6 +297,7 @@ internal void GetOutstandingCount(out int inst, out int qs, out long @in, out in ...@@ -297,6 +297,7 @@ internal void GetOutstandingCount(out int inst, out int qs, out long @in, out in
qu = _backlog.Count; qu = _backlog.Count;
} }
aw = !_singleWriterMutex.IsAvailable; aw = !_singleWriterMutex.IsAvailable;
bs = _backlogStatus;
var tmp = physical; var tmp = physical;
if (tmp == null) if (tmp == null)
{ {
...@@ -763,7 +764,22 @@ private void StartBacklogProcessor() ...@@ -763,7 +764,22 @@ private void StartBacklogProcessor()
} }
} }
} }
/// <summary>
/// Diagnostic marker recording the step that <c>ProcessBacklog</c> most recently entered.
/// The current value is handed out by <c>GetOutstandingCount</c> (the "bs" / "Backlog-Writer" entry)
/// so that a stalled backlog writer can be located.
/// </summary>
internal enum BacklogStatus : byte
{
    /// <summary>Zero value; also assigned by <c>ProcessBacklog</c> after <c>physical.SetIdle()</c> returns.</summary>
    Inactive = 0,

    /// <summary>The processor has obtained the writer lock and is about to start working through the backlog.</summary>
    Started = 1,

    /// <summary>About to lock the backlog queue to dequeue the next message (or stop if it is empty).</summary>
    CheckingForWork = 2,

    /// <summary>About to call <c>HasAsyncTimedOut</c> on the dequeued message.</summary>
    CheckingForTimeout = 3,

    /// <summary>The message timed out before being written; completing it with a <c>TimeoutBeforeWrite</c> exception.</summary>
    RecordingTimeout = 4,

    /// <summary>About to call <c>WriteMessageInsideLock</c> for the dequeued message.</summary>
    WritingMessage = 5,

    /// <summary>The write reported success; about to call <c>FlushSync</c>.</summary>
    Flushing = 6,

    /// <summary>About to call <c>UnmarkActiveMessage</c> after the write/flush attempt.</summary>
    MarkingInactive = 7,

    /// <summary>The write or flush returned a non-success result; handing it to <c>HandleWriteException</c>.</summary>
    RecordingWriteFailure = 8,

    /// <summary>An exception was caught while handling a message; handing it to <c>HandleWriteException</c>.</summary>
    RecordingFault = 9,

    /// <summary>The processing loop has exited; about to call <c>physical.SetIdle()</c>.</summary>
    SettingIdle = 10,

    /// <summary>Assigned by the <c>catch</c> block that precedes the lock-releasing <c>finally</c> in <c>ProcessBacklog</c>.</summary>
    Faulted = 11,
}
private volatile BacklogStatus _backlogStatus;
private void ProcessBacklog() private void ProcessBacklog()
{ {
LockToken token = default; LockToken token = default;
...@@ -776,13 +792,15 @@ private void ProcessBacklog() ...@@ -776,13 +792,15 @@ private void ProcessBacklog()
if (token) break; // got the lock if (token) break; // got the lock
lock (_backlog) { if (_backlog.Count == 0) return; } lock (_backlog) { if (_backlog.Count == 0) return; }
} }
_backlogStatus = BacklogStatus.Started;
// so now we are the writer; write some things! // so now we are the writer; write some things!
Message message; Message message;
var timeout = TimeoutMilliseconds; var timeout = TimeoutMilliseconds;
while(true) while(true)
{ {
lock(_backlog) _backlogStatus = BacklogStatus.CheckingForWork;
lock (_backlog)
{ {
if (_backlog.Count == 0) break; // all done if (_backlog.Count == 0) break; // all done
message = _backlog.Dequeue(); message = _backlog.Dequeue();
...@@ -790,25 +808,31 @@ private void ProcessBacklog() ...@@ -790,25 +808,31 @@ private void ProcessBacklog()
try try
{ {
_backlogStatus = BacklogStatus.CheckingForTimeout;
if (message.HasAsyncTimedOut(Environment.TickCount, timeout, out var _)) if (message.HasAsyncTimedOut(Environment.TickCount, timeout, out var _))
{ {
_backlogStatus = BacklogStatus.RecordingTimeout;
var ex = Multiplexer.GetException(WriteResult.TimeoutBeforeWrite, message, ServerEndPoint); var ex = Multiplexer.GetException(WriteResult.TimeoutBeforeWrite, message, ServerEndPoint);
message.SetExceptionAndComplete(ex, this); message.SetExceptionAndComplete(ex, this);
} }
else else
{ {
_backlogStatus = BacklogStatus.WritingMessage;
var result = WriteMessageInsideLock(physical, message); var result = WriteMessageInsideLock(physical, message);
if (result == WriteResult.Success) if (result == WriteResult.Success)
{ {
_backlogStatus = BacklogStatus.Flushing;
#pragma warning disable CS0618 #pragma warning disable CS0618
result = physical.FlushSync(false, timeout); result = physical.FlushSync(false, timeout);
#pragma warning restore CS0618 #pragma warning restore CS0618
} }
_backlogStatus = BacklogStatus.MarkingInactive;
UnmarkActiveMessage(message); UnmarkActiveMessage(message);
if (result != WriteResult.Success) if (result != WriteResult.Success)
{ {
_backlogStatus = BacklogStatus.RecordingWriteFailure;
var ex = Multiplexer.GetException(result, message, ServerEndPoint); var ex = Multiplexer.GetException(result, message, ServerEndPoint);
HandleWriteException(message, ex); HandleWriteException(message, ex);
} }
...@@ -816,13 +840,20 @@ private void ProcessBacklog() ...@@ -816,13 +840,20 @@ private void ProcessBacklog()
} }
catch (Exception ex) catch (Exception ex)
{ {
_backlogStatus = BacklogStatus.RecordingFault;
HandleWriteException(message, ex); HandleWriteException(message, ex);
} }
} }
_backlogStatus = BacklogStatus.SettingIdle;
physical.SetIdle(); physical.SetIdle();
_backlogStatus = BacklogStatus.Inactive;
} }
finally catch
{ {
_backlogStatus = BacklogStatus.Faulted;
}
finally
{
token.Dispose(); token.Dispose();
} }
} }
......
...@@ -9,6 +9,7 @@ ...@@ -9,6 +9,7 @@
using System.Text.RegularExpressions; using System.Text.RegularExpressions;
using System.Threading; using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
using static StackExchange.Redis.PhysicalBridge;
namespace StackExchange.Redis namespace StackExchange.Redis
{ {
...@@ -375,7 +376,7 @@ internal ServerCounters GetCounters() ...@@ -375,7 +376,7 @@ internal ServerCounters GetCounters()
return counters; return counters;
} }
internal void GetOutstandingCount(RedisCommand command, out int inst, out int qs, out long @in, out int qu, out bool aw, out long toRead, out long toWrite) internal void GetOutstandingCount(RedisCommand command, out int inst, out int qs, out long @in, out int qu, out bool aw, out long toRead, out long toWrite, out BacklogStatus bs)
{ {
var bridge = GetBridge(command, false); var bridge = GetBridge(command, false);
if (bridge == null) if (bridge == null)
...@@ -383,10 +384,11 @@ internal void GetOutstandingCount(RedisCommand command, out int inst, out int qs ...@@ -383,10 +384,11 @@ internal void GetOutstandingCount(RedisCommand command, out int inst, out int qs
inst = qs = qu = 0; inst = qs = qu = 0;
@in = toRead = toWrite = 0; @in = toRead = toWrite = 0;
aw = false; aw = false;
bs = BacklogStatus.Inactive;
} }
else else
{ {
bridge.GetOutstandingCount(out inst, out qs, out @in, out qu, out aw, out toRead, out toWrite); bridge.GetOutstandingCount(out inst, out qs, out @in, out qu, out aw, out toRead, out toWrite, out bs);
} }
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment