Commit d137e52b authored by mgravell's avatar mgravell

remove (or #if) most of the trace info that was added re the write stall;

parent b2818bc1
# Release Notes # Release Notes
## (unreleased)
- performance: resolve intermittent stall in the write-lock that could lead to unexpected timeouts even when at low/reasonable (but concurrent) load
## 2.0.571 ## 2.0.571
- performance: use new [arena allocation API](https://mgravell.github.io/Pipelines.Sockets.Unofficial/docs/arenas) to avoid `RawResult[]` overhead - performance: use new [arena allocation API](https://mgravell.github.io/Pipelines.Sockets.Unofficial/docs/arenas) to avoid `RawResult[]` overhead
......
...@@ -2152,7 +2152,7 @@ private static async Task<T> ExecuteAsyncImpl_Awaited<T>(ConnectionMultiplexer @ ...@@ -2152,7 +2152,7 @@ private static async Task<T> ExecuteAsyncImpl_Awaited<T>(ConnectionMultiplexer @
return tcs == null ? default(T) : await tcs.Task.ForAwait(); return tcs == null ? default(T) : await tcs.Task.ForAwait();
} }
internal Exception GetException(WriteResult result, Message message, ServerEndPoint server, [CallerMemberName] string caller = null) internal Exception GetException(WriteResult result, Message message, ServerEndPoint server)
{ {
switch (result) switch (result)
{ {
...@@ -2160,7 +2160,7 @@ internal Exception GetException(WriteResult result, Message message, ServerEndPo ...@@ -2160,7 +2160,7 @@ internal Exception GetException(WriteResult result, Message message, ServerEndPo
case WriteResult.NoConnectionAvailable: case WriteResult.NoConnectionAvailable:
return ExceptionFactory.NoConnectionAvailable(IncludeDetailInExceptions, IncludePerformanceCountersInExceptions, message.Command, message, server, GetServerSnapshot()); return ExceptionFactory.NoConnectionAvailable(IncludeDetailInExceptions, IncludePerformanceCountersInExceptions, message.Command, message, server, GetServerSnapshot());
case WriteResult.TimeoutBeforeWrite: case WriteResult.TimeoutBeforeWrite:
return ExceptionFactory.Timeout(this, "The timeout was reached before the message could be written to the output buffer, and it was not sent", message, server, result, origin: caller); return ExceptionFactory.Timeout(this, "The timeout was reached before the message could be written to the output buffer, and it was not sent", message, server, result);
case WriteResult.WriteFailure: case WriteResult.WriteFailure:
default: default:
return ExceptionFactory.ConnectionFailure(IncludeDetailInExceptions, ConnectionFailureType.ProtocolFailure, "An unknown error occurred when writing the message", server); return ExceptionFactory.ConnectionFailure(IncludeDetailInExceptions, ConnectionFailureType.ProtocolFailure, "An unknown error occurred when writing the message", server);
......
...@@ -181,7 +181,7 @@ internal static string GetLibVersion() ...@@ -181,7 +181,7 @@ internal static string GetLibVersion()
} }
return _libVersion; return _libVersion;
} }
internal static Exception Timeout(ConnectionMultiplexer mutiplexer, string baseErrorMessage, Message message, ServerEndPoint server, WriteResult? result = null, string origin = null) internal static Exception Timeout(ConnectionMultiplexer mutiplexer, string baseErrorMessage, Message message, ServerEndPoint server, WriteResult? result = null)
{ {
List<Tuple<string, string>> data = new List<Tuple<string, string>> { Tuple.Create("Message", message.CommandAndKey) }; List<Tuple<string, string>> data = new List<Tuple<string, string>> { Tuple.Create("Message", message.CommandAndKey) };
var sb = new StringBuilder(); var sb = new StringBuilder();
...@@ -203,16 +203,16 @@ void add(string lk, string sk, string v) ...@@ -203,16 +203,16 @@ void add(string lk, string sk, string v)
} }
} }
if (!string.IsNullOrWhiteSpace(origin)) add("Origin", null, origin);
// Add timeout data, if we have it // Add timeout data, if we have it
if (result == WriteResult.TimeoutBeforeWrite) if (result == WriteResult.TimeoutBeforeWrite)
{ {
add("Timeout", "timeout", Format.ToString(mutiplexer.TimeoutMilliseconds)); add("Timeout", "timeout", Format.ToString(mutiplexer.TimeoutMilliseconds));
try try
{ {
#if DEBUG
if (message.QueuePosition >= 0) add("QueuePosition", null, message.QueuePosition.ToString()); // the position the item was when added to the queue if (message.QueuePosition >= 0) add("QueuePosition", null, message.QueuePosition.ToString()); // the position the item was when added to the queue
if ((int)message.ConnectionWriteState >= 0) add("WriteState", null, message.ConnectionWriteState.ToString()); // what the physical was doing when it was added to the queue if ((int)message.ConnectionWriteState >= 0) add("WriteState", null, message.ConnectionWriteState.ToString()); // what the physical was doing when it was added to the queue
#endif
if (message.TryGetPhysicalState(out var state, out var sentDelta, out var receivedDelta)) if (message.TryGetPhysicalState(out var state, out var sentDelta, out var receivedDelta))
{ {
add("PhysicalState", "phys", state.ToString()); add("PhysicalState", "phys", state.ToString());
......
using System; using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.Diagnostics;
using System.IO; using System.IO;
using System.Linq; using System.Linq;
using System.Runtime.CompilerServices; using System.Runtime.CompilerServices;
...@@ -51,13 +52,18 @@ protected override void WriteImpl(PhysicalConnection physical) ...@@ -51,13 +52,18 @@ protected override void WriteImpl(PhysicalConnection physical)
internal abstract class Message : ICompletable internal abstract class Message : ICompletable
{ {
public readonly int Db; public readonly int Db;
#if DEBUG
internal int QueuePosition { get; private set; } internal int QueuePosition { get; private set; }
internal PhysicalConnection.WriteStatus ConnectionWriteState { get; private set; } internal PhysicalConnection.WriteStatus ConnectionWriteState { get; private set; }
#endif
[Conditional("DEBUG")]
internal void SetBacklogState(int position, PhysicalConnection physical) internal void SetBacklogState(int position, PhysicalConnection physical)
{ {
#if DEBUG
QueuePosition = position; QueuePosition = position;
ConnectionWriteState = physical?.Status ?? (PhysicalConnection.WriteStatus)(-1); ConnectionWriteState = physical?.Status ?? (PhysicalConnection.WriteStatus)(-1);
#endif
} }
internal const CommandFlags InternalCallFlag = (CommandFlags)128; internal const CommandFlags InternalCallFlag = (CommandFlags)128;
...@@ -623,8 +629,10 @@ internal bool TrySetResult<T>(T value) ...@@ -623,8 +629,10 @@ internal bool TrySetResult<T>(T value)
internal void SetEnqueued(PhysicalConnection connection) internal void SetEnqueued(PhysicalConnection connection)
{ {
#if DEBUG
QueuePosition = -1; QueuePosition = -1;
ConnectionWriteState = (PhysicalConnection.WriteStatus)(-1); ConnectionWriteState = (PhysicalConnection.WriteStatus)(-1);
#endif
SetWriteTime(); SetWriteTime();
performance?.SetEnqueued(); performance?.SetEnqueued();
_enqueuedTo = connection; _enqueuedTo = connection;
......
...@@ -643,8 +643,10 @@ private WriteResult WriteMessageInsideLock(PhysicalConnection physical, Message ...@@ -643,8 +643,10 @@ private WriteResult WriteMessageInsideLock(PhysicalConnection physical, Message
Multiplexer?.OnInfoMessage($"reentrant call to WriteMessageTakingWriteLock for {message.CommandAndKey}, {existingMessage.CommandAndKey} is still active"); Multiplexer?.OnInfoMessage($"reentrant call to WriteMessageTakingWriteLock for {message.CommandAndKey}, {existingMessage.CommandAndKey} is still active");
return WriteResult.NoConnectionAvailable; return WriteResult.NoConnectionAvailable;
} }
#if DEBUG
int startWriteTime = Environment.TickCount; int startWriteTime = Environment.TickCount;
try try
#endif
{ {
physical.SetWriting(); physical.SetWriting();
var messageIsSent = false; var messageIsSent = false;
...@@ -679,6 +681,7 @@ private WriteResult WriteMessageInsideLock(PhysicalConnection physical, Message ...@@ -679,6 +681,7 @@ private WriteResult WriteMessageInsideLock(PhysicalConnection physical, Message
return WriteMessageToServerInsideWriteLock(physical, message); return WriteMessageToServerInsideWriteLock(physical, message);
} }
} }
#if DEBUG
finally finally
{ {
int endWriteTime = Environment.TickCount; int endWriteTime = Environment.TickCount;
...@@ -689,10 +692,12 @@ private WriteResult WriteMessageInsideLock(PhysicalConnection physical, Message ...@@ -689,10 +692,12 @@ private WriteResult WriteMessageInsideLock(PhysicalConnection physical, Message
_maxWriteCommand = message?.Command ?? default; _maxWriteCommand = message?.Command ?? default;
} }
} }
#endif
} }
#if DEBUG
private volatile int _maxWriteTime = -1; private volatile int _maxWriteTime = -1;
private RedisCommand _maxWriteCommand; private RedisCommand _maxWriteCommand;
#endif
[Obsolete("prefer async")] [Obsolete("prefer async")]
internal WriteResult WriteMessageTakingWriteLockSync(PhysicalConnection physical, Message message) internal WriteResult WriteMessageTakingWriteLockSync(PhysicalConnection physical, Message message)
...@@ -755,10 +760,14 @@ private bool PushToBacklog(Message message, bool onlyIfExists) ...@@ -755,10 +760,14 @@ private bool PushToBacklog(Message message, bool onlyIfExists)
private void StartBacklogProcessor() private void StartBacklogProcessor()
{ {
var sched = Multiplexer.SocketManager?.SchedulerPool ?? DedicatedThreadPoolPipeScheduler.Default; var sched = Multiplexer.SocketManager?.SchedulerPool ?? DedicatedThreadPoolPipeScheduler.Default;
#if DEBUG
_backlogProcessorRequestedTime = Environment.TickCount; _backlogProcessorRequestedTime = Environment.TickCount;
#endif
sched.Schedule(s_ProcessBacklog, _weakRefThis); sched.Schedule(s_ProcessBacklog, _weakRefThis);
} }
#if DEBUG
private volatile int _backlogProcessorRequestedTime; private volatile int _backlogProcessorRequestedTime;
#endif
private static readonly Action<object> s_ProcessBacklog = s => private static readonly Action<object> s_ProcessBacklog = s =>
{ {
...@@ -807,20 +816,26 @@ private void ProcessBacklog() ...@@ -807,20 +816,26 @@ private void ProcessBacklog()
LockToken token = default; LockToken token = default;
try try
{ {
#if DEBUG
int tryToAcquireTime = Environment.TickCount; int tryToAcquireTime = Environment.TickCount;
var msToStartWorker = unchecked(tryToAcquireTime - _backlogProcessorRequestedTime); var msToStartWorker = unchecked(tryToAcquireTime - _backlogProcessorRequestedTime);
int failureCount = 0; int failureCount = 0;
#endif
while(true) while(true)
{ {
// try and get the lock; if unsuccessful, check for termination // try and get the lock; if unsuccessful, check for termination
token = _singleWriterMutex.TryWait(); token = _singleWriterMutex.TryWait();
if (token) break; // got the lock if (token) break; // got the lock
lock (_backlog) { if (_backlog.Count == 0) return; } lock (_backlog) { if (_backlog.Count == 0) return; }
#if DEBUG
failureCount++; failureCount++;
#endif
} }
_backlogStatus = BacklogStatus.Started; _backlogStatus = BacklogStatus.Started;
#if DEBUG
int acquiredTime = Environment.TickCount; int acquiredTime = Environment.TickCount;
var msToGetLock = unchecked(acquiredTime - tryToAcquireTime); var msToGetLock = unchecked(acquiredTime - tryToAcquireTime);
#endif
// so now we are the writer; write some things! // so now we are the writer; write some things!
Message message; Message message;
...@@ -841,6 +856,7 @@ private void ProcessBacklog() ...@@ -841,6 +856,7 @@ private void ProcessBacklog()
{ {
_backlogStatus = BacklogStatus.RecordingTimeout; _backlogStatus = BacklogStatus.RecordingTimeout;
var ex = Multiplexer.GetException(WriteResult.TimeoutBeforeWrite, message, ServerEndPoint); var ex = Multiplexer.GetException(WriteResult.TimeoutBeforeWrite, message, ServerEndPoint);
#if DEBUG // additional tracking
ex.Data["Redis-BacklogStartDelay"] = msToStartWorker; ex.Data["Redis-BacklogStartDelay"] = msToStartWorker;
ex.Data["Redis-BacklogGetLockDelay"] = msToGetLock; ex.Data["Redis-BacklogGetLockDelay"] = msToGetLock;
if (failureCount != 0) ex.Data["Redis-BacklogFailCount"] = failureCount; if (failureCount != 0) ex.Data["Redis-BacklogFailCount"] = failureCount;
...@@ -848,7 +864,7 @@ private void ProcessBacklog() ...@@ -848,7 +864,7 @@ private void ProcessBacklog()
var maxFlush = physical?.MaxFlushTime ?? -1; var maxFlush = physical?.MaxFlushTime ?? -1;
if (maxFlush >= 0) ex.Data["Redis-MaxFlush"] = maxFlush.ToString() + "ms, " + (physical?.MaxFlushBytes ?? -1).ToString(); if (maxFlush >= 0) ex.Data["Redis-MaxFlush"] = maxFlush.ToString() + "ms, " + (physical?.MaxFlushBytes ?? -1).ToString();
if (_maxLockDuration >= 0) ex.Data["Redis-MaxLockDuration"] = _maxLockDuration; if (_maxLockDuration >= 0) ex.Data["Redis-MaxLockDuration"] = _maxLockDuration;
#endif
message.SetExceptionAndComplete(ex, this); message.SetExceptionAndComplete(ex, this);
} }
else else
...@@ -974,26 +990,33 @@ internal ValueTask<WriteResult> WriteMessageTakingWriteLockAsync(PhysicalConnect ...@@ -974,26 +990,33 @@ internal ValueTask<WriteResult> WriteMessageTakingWriteLockAsync(PhysicalConnect
{ {
if (releaseLock & token.Success) if (releaseLock & token.Success)
{ {
#if DEBUG
RecordLockDuration(lockTaken); RecordLockDuration(lockTaken);
#endif
token.Dispose(); token.Dispose();
} }
} }
} }
#if DEBUG
private void RecordLockDuration(int lockTaken) private void RecordLockDuration(int lockTaken)
{ {
var lockDuration = unchecked(Environment.TickCount - lockTaken); var lockDuration = unchecked(Environment.TickCount - lockTaken);
if (lockDuration > _maxLockDuration) _maxLockDuration = lockDuration; if (lockDuration > _maxLockDuration) _maxLockDuration = lockDuration;
} }
volatile int _maxLockDuration = -1; volatile int _maxLockDuration = -1;
#endif
private async ValueTask<WriteResult> WriteMessageTakingWriteLockAsync_Awaited(ValueTask<LockToken> pending, PhysicalConnection physical, Message message) private async ValueTask<WriteResult> WriteMessageTakingWriteLockAsync_Awaited(ValueTask<LockToken> pending, PhysicalConnection physical, Message message)
{ {
try try
{ {
using (var token = await pending.ForAwait()) using (var token = await pending.ForAwait())
{ {
if (!token.Success) return TimedOutBeforeWrite(message); if (!token.Success) return TimedOutBeforeWrite(message);
#if DEBUG
int lockTaken = Environment.TickCount; int lockTaken = Environment.TickCount;
#endif
var result = WriteMessageInsideLock(physical, message); var result = WriteMessageInsideLock(physical, message);
if (result == WriteResult.Success) if (result == WriteResult.Success)
...@@ -1004,7 +1027,9 @@ private async ValueTask<WriteResult> WriteMessageTakingWriteLockAsync_Awaited(Va ...@@ -1004,7 +1027,9 @@ private async ValueTask<WriteResult> WriteMessageTakingWriteLockAsync_Awaited(Va
UnmarkActiveMessage(message); UnmarkActiveMessage(message);
physical.SetIdle(); physical.SetIdle();
#if DEBUG
RecordLockDuration(lockTaken); RecordLockDuration(lockTaken);
#endif
return result; return result;
} }
} }
...@@ -1026,7 +1051,9 @@ private async ValueTask<WriteResult> CompleteWriteAndReleaseLockAsync(LockToken ...@@ -1026,7 +1051,9 @@ private async ValueTask<WriteResult> CompleteWriteAndReleaseLockAsync(LockToken
return result; return result;
} }
catch (Exception ex) { return HandleWriteException(message, ex); } catch (Exception ex) { return HandleWriteException(message, ex); }
#if DEBUG
finally { RecordLockDuration(lockTaken); } finally { RecordLockDuration(lockTaken); }
#endif
} }
} }
......
...@@ -842,12 +842,18 @@ internal static int WriteRaw(Span<byte> span, long value, bool withLengthPrefix ...@@ -842,12 +842,18 @@ internal static int WriteRaw(Span<byte> span, long value, bool withLengthPrefix
return WriteCrlf(span, offset); return WriteCrlf(span, offset);
} }
private async ValueTask<WriteResult> FlushAsync_Awaited(PhysicalConnection connection, ValueTask<FlushResult> flush, bool throwOnFailure, int startFlush, long flushBytes) private async ValueTask<WriteResult> FlushAsync_Awaited(PhysicalConnection connection, ValueTask<FlushResult> flush, bool throwOnFailure
#if DEBUG
, int startFlush, long flushBytes
#endif
)
{ {
try try
{ {
await flush.ForAwait(); await flush.ForAwait();
#if DEBUG
RecordEndFlush(startFlush, flushBytes); RecordEndFlush(startFlush, flushBytes);
#endif
connection._writeStatus = WriteStatus.Flushed; connection._writeStatus = WriteStatus.Flushed;
connection.UpdateLastWriteTime(); connection.UpdateLastWriteTime();
return WriteResult.Success; return WriteResult.Success;
...@@ -872,7 +878,9 @@ internal WriteResult FlushSync(bool throwOnFailure, int millisecondsTimeout) ...@@ -872,7 +878,9 @@ internal WriteResult FlushSync(bool throwOnFailure, int millisecondsTimeout)
void ThrowTimeout() void ThrowTimeout()
{ {
#if DEBUG
if (millisecondsTimeout > _maxFlushTime) _maxFlushTime = millisecondsTimeout; // a fair bet even if we didn't measure if (millisecondsTimeout > _maxFlushTime) _maxFlushTime = millisecondsTimeout; // a fair bet even if we didn't measure
#endif
throw new TimeoutException("timeout while synchronously flushing"); throw new TimeoutException("timeout while synchronously flushing");
} }
} }
...@@ -883,12 +891,20 @@ internal ValueTask<WriteResult> FlushAsync(bool throwOnFailure) ...@@ -883,12 +891,20 @@ internal ValueTask<WriteResult> FlushAsync(bool throwOnFailure)
try try
{ {
_writeStatus = WriteStatus.Flushing; _writeStatus = WriteStatus.Flushing;
#if DEBUG
int startFlush = Environment.TickCount; int startFlush = Environment.TickCount;
long flushBytes = -1; long flushBytes = -1;
if (_ioPipe is SocketConnection sc) flushBytes = sc.GetCounters().BytesWaitingToBeSent; if (_ioPipe is SocketConnection sc) flushBytes = sc.GetCounters().BytesWaitingToBeSent;
#endif
var flush = tmp.FlushAsync(); var flush = tmp.FlushAsync();
if (!flush.IsCompletedSuccessfully) return FlushAsync_Awaited(this, flush, throwOnFailure, startFlush, flushBytes); if (!flush.IsCompletedSuccessfully) return FlushAsync_Awaited(this, flush, throwOnFailure
#if DEBUG
, startFlush, flushBytes
#endif
);
#if DEBUG
RecordEndFlush(startFlush, flushBytes); RecordEndFlush(startFlush, flushBytes);
#endif
_writeStatus = WriteStatus.Flushed; _writeStatus = WriteStatus.Flushed;
UpdateLastWriteTime(); UpdateLastWriteTime();
return new ValueTask<WriteResult>(WriteResult.Success); return new ValueTask<WriteResult>(WriteResult.Success);
...@@ -899,8 +915,10 @@ internal ValueTask<WriteResult> FlushAsync(bool throwOnFailure) ...@@ -899,8 +915,10 @@ internal ValueTask<WriteResult> FlushAsync(bool throwOnFailure)
return new ValueTask<WriteResult>(WriteResult.WriteFailure); return new ValueTask<WriteResult>(WriteResult.WriteFailure);
} }
} }
#if DEBUG
private void RecordEndFlush(int start, long bytes) private void RecordEndFlush(int start, long bytes)
{ {
var end = Environment.TickCount; var end = Environment.TickCount;
int taken = unchecked(end - start); int taken = unchecked(end - start);
if (taken > _maxFlushTime) if (taken > _maxFlushTime)
...@@ -913,8 +931,9 @@ private void RecordEndFlush(int start, long bytes) ...@@ -913,8 +931,9 @@ private void RecordEndFlush(int start, long bytes)
private long _maxFlushBytes = -1; private long _maxFlushBytes = -1;
internal int MaxFlushTime => _maxFlushTime; internal int MaxFlushTime => _maxFlushTime;
internal long MaxFlushBytes => _maxFlushBytes; internal long MaxFlushBytes => _maxFlushBytes;
#endif
private static readonly ReadOnlyMemory<byte> NullBulkString = Encoding.ASCII.GetBytes("$-1\r\n"), EmptyBulkString = Encoding.ASCII.GetBytes("$0\r\n\r\n"); private static readonly ReadOnlyMemory<byte> NullBulkString = Encoding.ASCII.GetBytes("$-1\r\n"), EmptyBulkString = Encoding.ASCII.GetBytes("$0\r\n\r\n");
private static void WriteUnifiedBlob(PipeWriter writer, byte[] value) private static void WriteUnifiedBlob(PipeWriter writer, byte[] value)
{ {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment