Commit ed4c9987 authored by mgravell's avatar mgravell

track the pipe lengths; trying to understand what is happening re stalls

parent 0355af0f
......@@ -179,6 +179,7 @@ Global
{D082703F-1652-4C35-840D-7D377F6B9979} = {96E891CD-2ED7-4293-A7AB-4C6F5D8D2B05}
{8375813E-FBAF-4DA3-A2C7-E4645B39B931} = {E25031D3-5C64-430D-B86F-697B66816FD8}
{3DA1EEED-E9FE-43D9-B293-E000CFCCD91A} = {E25031D3-5C64-430D-B86F-697B66816FD8}
{153A10E4-E668-41AD-9E0F-6785CE7EED66} = {3AD17044-6BFF-4750-9AC2-2CA466375F2A}
{D58114AE-4998-4647-AFCA-9353D20495AE} = {E25031D3-5C64-430D-B86F-697B66816FD8}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
......
......@@ -229,12 +229,14 @@ void add(string lk, string sk, string v)
// Add server data, if we have it
if (server != null)
{
server.GetOutstandingCount(message.Command, out int inst, out int qs, out int @in, out int qu);
server.GetOutstandingCount(message.Command, out int inst, out int qs, out long @in, out int qu, out long toRead, out long toWrite);
add("OpsSinceLastHeartbeat", "inst", inst.ToString());
add("Queue-Awaiting-Write", "qu", qu.ToString());
add("Queue-Awaiting-Response", "qs", qs.ToString());
if (@in >= 0) add("Socket-Inbound-Bytes", "in", @in.ToString());
if (@in >= 0) add("Inbound-Bytes", "in", @in.ToString());
if (toRead >= 0) add("Inbound-Pipe-Bytes", "in-pipe", toRead.ToString());
if (toWrite >= 0) add("Outbound-Pipe-Bytes", "out-pipe", toWrite.ToString());
if (mutiplexer.StormLogThreshold >= 0 && qs >= mutiplexer.StormLogThreshold && Interlocked.CompareExchange(ref mutiplexer.haveStormLog, 1, 0) == 0)
{
......
......@@ -289,7 +289,7 @@ private void ShutdownSubscriptionQueue()
internal bool TryEnqueueBackgroundSubscriptionWrite(in PendingSubscriptionState state)
=> isDisposed ? false : (_subscriptionBackgroundQueue ?? GetSubscriptionQueue()).Writer.TryWrite(state);
internal void GetOutstandingCount(out int inst, out int qs, out int @in, out int qu)
internal void GetOutstandingCount(out int inst, out int qs, out long @in, out int qu, out long toRead, out long toWrite)
{
inst = (int)(Interlocked.Read(ref operationCount) - Interlocked.Read(ref profileLastLog));
lock(_backlog)
......@@ -300,12 +300,12 @@ internal void GetOutstandingCount(out int inst, out int qs, out int @in, out int
if (tmp == null)
{
qs = 0;
@in = -1;
toRead = toWrite = @in = -1;
}
else
{
qs = tmp.GetSentAwaitingResponseCount();
@in = tmp.GetAvailableInboundBytes();
@in = tmp.GetSocketBytes(out toRead, out toWrite);
}
}
......
......@@ -305,7 +305,7 @@ public Task FlushAsync()
// stop anything new coming in...
bridge?.Trace("Failed: " + failureType);
int @in = -1;
long @in = -1, @toRead = -1, @toWrite = -1;
PhysicalBridge.State oldState = PhysicalBridge.State.Disconnected;
bool isCurrent = false;
bridge?.OnDisconnected(failureType, this, out isCurrent, out oldState);
......@@ -313,7 +313,7 @@ public Task FlushAsync()
{
try
{
@in = GetAvailableInboundBytes();
@in = GetSocketBytes(out toRead, out toWrite);
}
catch { /* best effort only */ }
}
......@@ -383,10 +383,9 @@ void add(string lk, string sk, string v)
add("Keep-Alive", "keep-alive", bridge.ServerEndPoint?.WriteEverySeconds + "s");
add("Previous-Physical-State", "state", oldState.ToString());
add("Manager", "mgr", bridge.Multiplexer.SocketManager?.GetState());
if (@in >= 0)
{
add("Inbound-Bytes", "in", @in.ToString());
}
if (@in >= 0) add("Inbound-Bytes", "in", @in.ToString());
if (toRead >= 0) add("Inbound-Pipe-Bytes", "in-pipe", toRead.ToString());
if (toWrite >= 0) add("Outbound-Pipe-Bytes", "out-pipe", toWrite.ToString());
add("Last-Heartbeat", "last-heartbeat", (lastBeat == 0 ? "never" : ((unchecked(now - lastBeat) / 1000) + "s ago")) + (BridgeCouldBeNull.IsBeating ? " (mid-beat)" : ""));
var mbeat = bridge.Multiplexer.LastHeartbeatSecondsAgo;
......@@ -1156,7 +1155,18 @@ internal static void WriteInteger(PipeWriter writer, long value)
writer.Advance(bytes);
}
internal int GetAvailableInboundBytes() => VolatileSocket?.Available ?? -1;
internal long GetSocketBytes(out long readCount, out long writeCount)
{
if (_ioPipe is SocketConnection conn)
{
var counters = conn.GetCounters();
readCount = counters.BytesWaitingToBeRead;
writeCount = counters.BytesWaitingToBeSent;
return counters.BytesAvailableOnSocket;
}
readCount = writeCount = -1;
return VolatileSocket?.Available ?? -1;
}
private RemoteCertificateValidationCallback GetAmbientIssuerCertificateCallback()
{
......
......@@ -375,16 +375,17 @@ internal ServerCounters GetCounters()
return counters;
}
internal void GetOutstandingCount(RedisCommand command, out int inst, out int qs, out int @in, out int qu)
internal void GetOutstandingCount(RedisCommand command, out int inst, out int qs, out long @in, out int qu, out long toRead, out long toWrite)
{
var bridge = GetBridge(command, false);
if (bridge == null)
{
inst = qs = @in = qu = 0;
inst = qs = qu = 0;
@in = toRead = toWrite = 0;
}
else
{
bridge.GetOutstandingCount(out inst, out qs, out @in, out qu);
bridge.GetOutstandingCount(out inst, out qs, out @in, out qu, out toRead, out toWrite);
}
}
......
......@@ -15,7 +15,7 @@
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Pipelines.Sockets.Unofficial" Version="2.0.7" />
<PackageReference Include="Pipelines.Sockets.Unofficial" Version="2.0.9" />
<PackageReference Include="System.Diagnostics.PerformanceCounter" Version="4.5.0" />
<PackageReference Include="System.IO.Pipelines" Version="4.5.1" />
<PackageReference Include="System.Threading.Channels" Version="4.5.0" />
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment