Commit 087baa90 authored by mgravell's avatar mgravell

still honing in on the write stall; measure/record write and flush excesses

parent 0e031eee
...@@ -643,40 +643,57 @@ private WriteResult WriteMessageInsideLock(PhysicalConnection physical, Message ...@@ -643,40 +643,57 @@ private WriteResult WriteMessageInsideLock(PhysicalConnection physical, Message
Multiplexer?.OnInfoMessage($"reentrant call to WriteMessageTakingWriteLock for {message.CommandAndKey}, {existingMessage.CommandAndKey} is still active"); Multiplexer?.OnInfoMessage($"reentrant call to WriteMessageTakingWriteLock for {message.CommandAndKey}, {existingMessage.CommandAndKey} is still active");
return WriteResult.NoConnectionAvailable; return WriteResult.NoConnectionAvailable;
} }
physical.SetWriting(); int startWriteTime = Environment.TickCount;
var messageIsSent = false; try
if (message is IMultiMessage)
{ {
SelectDatabaseInsideWriteLock(physical, message); // need to switch database *before* the transaction physical.SetWriting();
foreach (var subCommand in ((IMultiMessage)message).GetMessages(physical)) var messageIsSent = false;
if (message is IMultiMessage)
{ {
result = WriteMessageToServerInsideWriteLock(physical, subCommand); SelectDatabaseInsideWriteLock(physical, message); // need to switch database *before* the transaction
if (result != WriteResult.Success) foreach (var subCommand in ((IMultiMessage)message).GetMessages(physical))
{
result = WriteMessageToServerInsideWriteLock(physical, subCommand);
if (result != WriteResult.Success)
{
// we screwed up; abort; note that WriteMessageToServer already
// killed the underlying connection
Trace("Unable to write to server");
message.Fail(ConnectionFailureType.ProtocolFailure, null, "failure before write: " + result.ToString());
message.Complete();
return result;
}
//The parent message (next) may be returned from GetMessages
//and should not be marked as sent again below
messageIsSent = messageIsSent || subCommand == message;
}
if (!messageIsSent)
{ {
// we screwed up; abort; note that WriteMessageToServer already message.SetRequestSent(); // well, it was attempted, at least...
// killed the underlying connection
Trace("Unable to write to server");
message.Fail(ConnectionFailureType.ProtocolFailure, null, "failure before write: " + result.ToString());
message.Complete();
return result;
} }
//The parent message (next) may be returned from GetMessages
//and should not be marked as sent again below return WriteResult.Success;
messageIsSent = messageIsSent || subCommand == message;
} }
if (!messageIsSent) else
{ {
message.SetRequestSent(); // well, it was attempted, at least... return WriteMessageToServerInsideWriteLock(physical, message);
} }
return WriteResult.Success;
} }
else finally
{ {
return WriteMessageToServerInsideWriteLock(physical, message); int endWriteTime = Environment.TickCount;
int writeDuration = unchecked(endWriteTime - startWriteTime);
if (writeDuration > _maxWriteTime)
{
_maxWriteTime = writeDuration;
_maxWriteCommand = message?.Command ?? default;
}
} }
} }
private volatile int _maxWriteTime = -1;
private RedisCommand _maxWriteCommand;
[Obsolete("prefer async")] [Obsolete("prefer async")]
internal WriteResult WriteMessageTakingWriteLockSync(PhysicalConnection physical, Message message) internal WriteResult WriteMessageTakingWriteLockSync(PhysicalConnection physical, Message message)
{ {
...@@ -823,7 +840,11 @@ private void ProcessBacklog() ...@@ -823,7 +840,11 @@ private void ProcessBacklog()
_backlogStatus = BacklogStatus.RecordingTimeout; _backlogStatus = BacklogStatus.RecordingTimeout;
var ex = Multiplexer.GetException(WriteResult.TimeoutBeforeWrite, message, ServerEndPoint); var ex = Multiplexer.GetException(WriteResult.TimeoutBeforeWrite, message, ServerEndPoint);
ex.Data["Redis-BacklogStartDelay"] = msToStartWorker; ex.Data["Redis-BacklogStartDelay"] = msToStartWorker;
ex.Data["Redis-BacklogGetLocDelay"] = msToGetLock; ex.Data["Redis-BacklogGetLockDelay"] = msToGetLock;
if (_maxWriteTime >= 0) ex.Data["Redis-MaxWrite"] = _maxWriteTime.ToString() + ", " + _maxWriteCommand.ToString();
var maxFlush = physical?.MaxFlushTime ?? -1;
if (maxFlush >= 0) ex.Data["Redis-MaxFlush"] = maxFlush;
message.SetExceptionAndComplete(ex, this); message.SetExceptionAndComplete(ex, this);
} }
else else
......
...@@ -842,11 +842,12 @@ internal static int WriteRaw(Span<byte> span, long value, bool withLengthPrefix ...@@ -842,11 +842,12 @@ internal static int WriteRaw(Span<byte> span, long value, bool withLengthPrefix
return WriteCrlf(span, offset); return WriteCrlf(span, offset);
} }
private static async ValueTask<WriteResult> FlushAsync_Awaited(PhysicalConnection connection, ValueTask<FlushResult> flush, bool throwOnFailure) private async ValueTask<WriteResult> FlushAsync_Awaited(PhysicalConnection connection, ValueTask<FlushResult> flush, bool throwOnFailure, int startFlush)
{ {
try try
{ {
await flush.ForAwait(); await flush.ForAwait();
RecordEndFlush(startFlush);
connection._writeStatus = WriteStatus.Flushed; connection._writeStatus = WriteStatus.Flushed;
connection.UpdateLastWriteTime(); connection.UpdateLastWriteTime();
return WriteResult.Success; return WriteResult.Success;
...@@ -869,7 +870,11 @@ internal WriteResult FlushSync(bool throwOnFailure, int millisecondsTimeout) ...@@ -869,7 +870,11 @@ internal WriteResult FlushSync(bool throwOnFailure, int millisecondsTimeout)
} }
return flush.Result; return flush.Result;
void ThrowTimeout() => throw new TimeoutException("timeout while synchronously flushing"); void ThrowTimeout()
{
if (millisecondsTimeout > _maxFlushTime) _maxFlushTime = millisecondsTimeout; // a fair bet even if we didn't measure
throw new TimeoutException("timeout while synchronously flushing");
}
} }
internal ValueTask<WriteResult> FlushAsync(bool throwOnFailure) internal ValueTask<WriteResult> FlushAsync(bool throwOnFailure)
{ {
...@@ -878,8 +883,10 @@ internal ValueTask<WriteResult> FlushAsync(bool throwOnFailure) ...@@ -878,8 +883,10 @@ internal ValueTask<WriteResult> FlushAsync(bool throwOnFailure)
try try
{ {
_writeStatus = WriteStatus.Flushing; _writeStatus = WriteStatus.Flushing;
int startFlush = Environment.TickCount;
var flush = tmp.FlushAsync(); var flush = tmp.FlushAsync();
if (!flush.IsCompletedSuccessfully) return FlushAsync_Awaited(this, flush, throwOnFailure); if (!flush.IsCompletedSuccessfully) return FlushAsync_Awaited(this, flush, throwOnFailure, startFlush);
RecordEndFlush(startFlush);
_writeStatus = WriteStatus.Flushed; _writeStatus = WriteStatus.Flushed;
UpdateLastWriteTime(); UpdateLastWriteTime();
return new ValueTask<WriteResult>(WriteResult.Success); return new ValueTask<WriteResult>(WriteResult.Success);
...@@ -890,6 +897,14 @@ internal ValueTask<WriteResult> FlushAsync(bool throwOnFailure) ...@@ -890,6 +897,14 @@ internal ValueTask<WriteResult> FlushAsync(bool throwOnFailure)
return new ValueTask<WriteResult>(WriteResult.WriteFailure); return new ValueTask<WriteResult>(WriteResult.WriteFailure);
} }
} }
private void RecordEndFlush(int start)
{
var end = Environment.TickCount;
int taken = unchecked(end - start);
if (taken > _maxFlushTime) _maxFlushTime = taken;
}
private volatile int _maxFlushTime = -1;
internal int MaxFlushTime => _maxFlushTime;
private static readonly ReadOnlyMemory<byte> NullBulkString = Encoding.ASCII.GetBytes("$-1\r\n"), EmptyBulkString = Encoding.ASCII.GetBytes("$0\r\n\r\n"); private static readonly ReadOnlyMemory<byte> NullBulkString = Encoding.ASCII.GetBytes("$-1\r\n"), EmptyBulkString = Encoding.ASCII.GetBytes("$0\r\n\r\n");
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment