Unverified Commit 7de44835 authored by Marc Gravell's avatar Marc Gravell Committed by GitHub

improve error reporting (in particular for timeouts) (#1012)

* improve error reporting (in particular for timeouts) by reporting a: what the *offending physical connection was doing*, and b: how much data has flown on the connection while trying to do it

* use IMeasuredDuplexPipe so we can talk about total sent/received even over TLS; needs pipelines-unofficial bump to 1.0.9
parent 9907db28
...@@ -868,7 +868,7 @@ private static ConnectionMultiplexer CreateMultiplexer(object configuration, Tex ...@@ -868,7 +868,7 @@ private static ConnectionMultiplexer CreateMultiplexer(object configuration, Tex
{ {
var muxer = new ConnectionMultiplexer(PrepareConfig(configuration)); var muxer = new ConnectionMultiplexer(PrepareConfig(configuration));
connectHandler = null; connectHandler = null;
if(log != null) if (log != null)
{ {
// create a detachable event-handler to log detailed errors if something happens during connect/handshake // create a detachable event-handler to log detailed errors if something happens during connect/handshake
connectHandler = (_, a) => connectHandler = (_, a) =>
...@@ -2135,21 +2135,27 @@ internal Exception GetException(WriteResult result, Message message, ServerEndPo ...@@ -2135,21 +2135,27 @@ internal Exception GetException(WriteResult result, Message message, ServerEndPo
case WriteResult.NoConnectionAvailable: case WriteResult.NoConnectionAvailable:
return ExceptionFactory.NoConnectionAvailable(IncludeDetailInExceptions, IncludePerformanceCountersInExceptions, message.Command, message, server, GetServerSnapshot()); return ExceptionFactory.NoConnectionAvailable(IncludeDetailInExceptions, IncludePerformanceCountersInExceptions, message.Command, message, server, GetServerSnapshot());
case WriteResult.TimeoutBeforeWrite: case WriteResult.TimeoutBeforeWrite:
string counters = null; string bridgeCounters = null, connectionState = null;
try try
{ {
if (message.TryGetPhysicalState(out var state, out var sentDelta, out var receivedDelta))
{
connectionState = (sentDelta >= 0 && receivedDelta >= 0) // these might not always be available
? $", state={state}, outbound={sentDelta >> 10}KiB, inbound={receivedDelta >> 10}KiB"
: $", state={state}";
}
var bridge = server.GetBridge(message.Command, false); var bridge = server.GetBridge(message.Command, false);
if (bridge != null) if (bridge != null)
{ {
var active = bridge.GetActiveMessage(); var active = bridge.GetActiveMessage();
bridge.GetOutstandingCount(out var inst, out var qs, out var @in); bridge.GetOutstandingCount(out var inst, out var qs, out var @in);
counters = $", inst={inst}, qs={qs}, in={@in}, active={active}"; bridgeCounters = $", inst={inst}, qs={qs}, in={@in}, active={active}";
} }
} }
catch { } catch { }
return ExceptionFactory.Timeout(this, "The timeout was reached before the message could be written to the output buffer, and it was not sent (" return ExceptionFactory.Timeout(this, "The timeout was reached before the message could be written to the output buffer, and it was not sent ("
+ Format.ToString(TimeoutMilliseconds) + "ms" + counters + ")", message, server); + Format.ToString(TimeoutMilliseconds) + "ms" + connectionState + bridgeCounters + ")", message, server);
case WriteResult.WriteFailure: case WriteResult.WriteFailure:
default: default:
return ExceptionFactory.ConnectionFailure(IncludeDetailInExceptions, ConnectionFailureType.ProtocolFailure, "An unknown error occurred when writing the message", server); return ExceptionFactory.ConnectionFailure(IncludeDetailInExceptions, ConnectionFailureType.ProtocolFailure, "An unknown error occurred when writing the message", server);
......
...@@ -617,13 +617,48 @@ internal virtual void SetExceptionAndComplete(Exception exception, PhysicalBridg ...@@ -617,13 +617,48 @@ internal virtual void SetExceptionAndComplete(Exception exception, PhysicalBridg
internal bool TrySetResult<T>(T value) => resultBox is ResultBox<T> typed && typed.TrySetResult(value); internal bool TrySetResult<T>(T value) => resultBox is ResultBox<T> typed && typed.TrySetResult(value);
internal void SetEnqueued() => performance?.SetEnqueued(); internal void SetEnqueued(PhysicalConnection connection)
{
performance?.SetEnqueued();
_enqueuedTo = connection;
if (connection == null)
{
_queuedStampSent = _queuedStampReceived = -1;
}
else
{
connection.GetBytes(out _queuedStampSent, out _queuedStampReceived);
}
}
internal bool TryGetPhysicalState(out PhysicalConnection.WriteStatus status, out long sentDelta, out long receivedDelta)
{
var connection = _enqueuedTo;
sentDelta = receivedDelta = -1;
if (connection != null)
{
status = connection.Status;
connection.GetBytes(out var sent, out var received);
if (sent >= 0 && _queuedStampSent >= 0) sentDelta = sent - _queuedStampSent;
if (received >= 0 && _queuedStampReceived >= 0) receivedDelta = received - _queuedStampReceived;
return true;
}
else
{
status = default;
return false;
}
}
private PhysicalConnection _enqueuedTo;
private long _queuedStampReceived, _queuedStampSent;
internal void SetRequestSent() internal void SetRequestSent()
{ {
Status = CommandStatus.Sent; Status = CommandStatus.Sent;
performance?.SetRequestSent(); performance?.SetRequestSent();
} }
// the time (ticks) at which this message was considered written // the time (ticks) at which this message was considered written
internal void SetWriteTime() internal void SetWriteTime()
{ {
......
...@@ -137,7 +137,7 @@ public WriteResult TryWrite(Message message, bool isSlave) ...@@ -137,7 +137,7 @@ public WriteResult TryWrite(Message message, bool isSlave)
{ {
queue.Enqueue(message); queue.Enqueue(message);
} }
message.SetEnqueued(); message.SetEnqueued(null);
return WriteResult.Success; // we'll take it... return WriteResult.Success; // we'll take it...
} }
else else
...@@ -549,7 +549,7 @@ internal bool TryEnqueue(List<Message> messages, bool isSlave) ...@@ -549,7 +549,7 @@ internal bool TryEnqueue(List<Message> messages, bool isSlave)
internal WriteResult WriteMessageTakingWriteLock(PhysicalConnection physical, Message message) internal WriteResult WriteMessageTakingWriteLock(PhysicalConnection physical, Message message)
{ {
Trace("Writing: " + message); Trace("Writing: " + message);
message.SetEnqueued(); message.SetEnqueued(physical); // this also records the read/write stats at this point
WriteResult result; WriteResult result;
bool haveLock = false; bool haveLock = false;
......
...@@ -53,6 +53,19 @@ private static readonly Message ...@@ -53,6 +53,19 @@ private static readonly Message
private int lastWriteTickCount, lastReadTickCount, lastBeatTickCount; private int lastWriteTickCount, lastReadTickCount, lastBeatTickCount;
private int firstUnansweredWriteTickCount; private int firstUnansweredWriteTickCount;
internal void GetBytes(out long sent, out long received)
{
if(_ioPipe is IMeasuredDuplexPipe sc)
{
sent = sc.TotalBytesSent;
received = sc.TotalBytesReceived;
}
else
{
sent = received = -1;
}
}
private IDuplexPipe _ioPipe; private IDuplexPipe _ioPipe;
private Socket _socket; private Socket _socket;
...@@ -315,7 +328,8 @@ public Task FlushAsync() ...@@ -315,7 +328,8 @@ public Task FlushAsync()
var exMessage = new StringBuilder(failureType.ToString()); var exMessage = new StringBuilder(failureType.ToString());
if ((connectingPipe ?? _ioPipe) is SocketConnection sc) var pipe = connectingPipe ?? _ioPipe;
if (pipe is SocketConnection sc)
{ {
exMessage.Append(" (").Append(sc.ShutdownKind); exMessage.Append(" (").Append(sc.ShutdownKind);
if (sc.SocketError != SocketError.Success) if (sc.SocketError != SocketError.Success)
...@@ -326,6 +340,13 @@ public Task FlushAsync() ...@@ -326,6 +340,13 @@ public Task FlushAsync()
if (sc.BytesSent == 0) exMessage.Append(", 0-sent"); if (sc.BytesSent == 0) exMessage.Append(", 0-sent");
exMessage.Append(", last-recv: ").Append(sc.LastReceived).Append(")"); exMessage.Append(", last-recv: ").Append(sc.LastReceived).Append(")");
} }
else if (pipe is IMeasuredDuplexPipe mdp)
{
long sent = mdp.TotalBytesSent, recd = mdp.TotalBytesReceived;
if (sent == 0) { exMessage.Append(recd == 0 ? " (0-read, 0-sent)" : " (0-sent)"); }
else if (recd == 0) { exMessage.Append(" (0-read)"); }
}
var data = new List<Tuple<string, string>>(); var data = new List<Tuple<string, string>>();
void add(string lk, string sk, string v) void add(string lk, string sk, string v)
...@@ -418,7 +439,10 @@ void add(string lk, string sk, string v) ...@@ -418,7 +439,10 @@ void add(string lk, string sk, string v)
internal void SetWriting() => _writeStatus = WriteStatus.Writing; internal void SetWriting() => _writeStatus = WriteStatus.Writing;
private volatile WriteStatus _writeStatus; private volatile WriteStatus _writeStatus;
private enum WriteStatus
internal WriteStatus Status => _writeStatus;
internal enum WriteStatus
{ {
Initializing, Initializing,
Idle, Idle,
...@@ -577,7 +601,10 @@ internal void OnBridgeHeartbeat() ...@@ -577,7 +601,10 @@ internal void OnBridgeHeartbeat()
{ {
if (msg.HasAsyncTimedOut(now, timeout, out var elapsed)) if (msg.HasAsyncTimedOut(now, timeout, out var elapsed))
{ {
var timeoutEx = ExceptionFactory.Timeout(bridge.Multiplexer, $"Timeout awaiting response ({elapsed}ms elapsed, timeout is {timeout}ms)", msg, server); bool haveDeltas = msg.TryGetPhysicalState(out _, out long sentDelta, out var receivedDelta) && sentDelta >= 0 && receivedDelta >= 0;
var timeoutEx = ExceptionFactory.Timeout(bridge.Multiplexer, haveDeltas
? $"Timeout awaiting response (outbound={sentDelta >> 10}KiB, inbound={receivedDelta >> 10}KiB, {elapsed}ms elapsed, timeout is {timeout}ms)"
: $"Timeout awaiting response ({elapsed}ms elapsed, timeout is {timeout}ms)", msg, server);
bridge.Multiplexer?.OnMessageFaulted(msg, timeoutEx); bridge.Multiplexer?.OnMessageFaulted(msg, timeoutEx);
msg.SetExceptionAndComplete(timeoutEx, bridge); // tell the message that it is doomed msg.SetExceptionAndComplete(timeoutEx, bridge); // tell the message that it is doomed
bridge.Multiplexer.OnAsyncTimeout(); bridge.Multiplexer.OnAsyncTimeout();
......
...@@ -15,7 +15,7 @@ ...@@ -15,7 +15,7 @@
</PropertyGroup> </PropertyGroup>
<ItemGroup> <ItemGroup>
<PackageReference Include="Pipelines.Sockets.Unofficial" Version="1.0.7" /> <PackageReference Include="Pipelines.Sockets.Unofficial" Version="1.0.9" />
<PackageReference Include="System.Diagnostics.PerformanceCounter" Version="4.5.0" /> <PackageReference Include="System.Diagnostics.PerformanceCounter" Version="4.5.0" />
<PackageReference Include="System.IO.Pipelines" Version="4.5.1" /> <PackageReference Include="System.IO.Pipelines" Version="4.5.1" />
<PackageReference Include="System.Threading.Channels" Version="4.5.0" /> <PackageReference Include="System.Threading.Channels" Version="4.5.0" />
......
...@@ -186,7 +186,6 @@ public void StringSet() ...@@ -186,7 +186,6 @@ public void StringSet()
} }
} }
/// <summary> /// <summary>
/// Run StringGet lots of times /// Run StringGet lots of times
/// </summary> /// </summary>
......
...@@ -4,57 +4,59 @@ ...@@ -4,57 +4,59 @@
using System.Threading.Tasks; using System.Threading.Tasks;
using StackExchange.Redis; using StackExchange.Redis;
static class Program namespace TestConsole
{ {
private static int taskCount = 10; internal static class Program
private static int totalRecords = 100000;
static void Main()
{ {
private const int taskCount = 10;
private const int totalRecords = 100000;
private static void Main()
{
#if SEV2 #if SEV2
Pipelines.Sockets.Unofficial.SocketConnection.AssertDependencies(); Pipelines.Sockets.Unofficial.SocketConnection.AssertDependencies();
Console.WriteLine("We loaded the things..."); Console.WriteLine("We loaded the things...");
// Console.ReadLine(); // Console.ReadLine();
#endif #endif
Stopwatch stopwatch = new Stopwatch(); Stopwatch stopwatch = new Stopwatch();
stopwatch.Start(); stopwatch.Start();
var taskList = new List<Task>();
var connection = ConnectionMultiplexer.Connect("127.0.0.1");
for (int i = 0; i < taskCount; i++)
{
var i1 = i;
var task = new Task(() => Run(i1, connection));
task.Start();
taskList.Add(task);
}
Task.WaitAll(taskList.ToArray()); var taskList = new List<Task>();
var connection = ConnectionMultiplexer.Connect("127.0.0.1");
for (int i = 0; i < taskCount; i++)
{
var i1 = i;
var task = new Task(() => Run(i1, connection));
task.Start();
taskList.Add(task);
}
stopwatch.Stop(); Task.WaitAll(taskList.ToArray());
Console.WriteLine($"Done. {stopwatch.ElapsedMilliseconds}"); stopwatch.Stop();
Console.ReadLine();
}
static void Run(int taskId, ConnectionMultiplexer connection) Console.WriteLine($"Done. {stopwatch.ElapsedMilliseconds}");
{ Console.ReadLine();
Console.WriteLine($"{taskId} Started"); }
var database = connection.GetDatabase(0);
for (int i = 0; i < totalRecords; i++) private static void Run(int taskId, ConnectionMultiplexer connection)
{ {
database.StringSet(i.ToString(), i.ToString()); Console.WriteLine($"{taskId} Started");
} var database = connection.GetDatabase(0);
Console.WriteLine($"{taskId} Insert completed"); for (int i = 0; i < totalRecords; i++)
{
database.StringSet(i.ToString(), i.ToString());
}
for (int i = 0; i < totalRecords; i++) Console.WriteLine($"{taskId} Insert completed");
{
var result = database.StringGet(i.ToString()); for (int i = 0; i < totalRecords; i++)
{
var result = database.StringGet(i.ToString());
}
Console.WriteLine($"{taskId} Completed");
} }
Console.WriteLine($"{taskId} Completed");
} }
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment