Commit 7c920855 authored by Marc Gravell's avatar Marc Gravell

move performance completion to start of message completion; fix bug that...

move performance completion to start of message completion; fix bug that resurrected result-box; add async profiled test; make all timestamps not throw on duplicates; fake the response received if message aborted (otherwise values can be nonsensical); add aysnc profiled test
parent fe1c235b
......@@ -81,7 +81,7 @@ private static void AssertProfiledCommandValues(IProfiledCommand command, Connec
Assert.True(command.CreationToEnqueued > TimeSpan.Zero, nameof(command.CreationToEnqueued));
Assert.True(command.EnqueuedToSending > TimeSpan.Zero, nameof(command.EnqueuedToSending));
Assert.True(command.SentToResponse > TimeSpan.Zero, nameof(command.SentToResponse));
Assert.True(command.ResponseToCompletion > TimeSpan.Zero, nameof(command.ResponseToCompletion));
Assert.True(command.ResponseToCompletion >= TimeSpan.Zero, nameof(command.ResponseToCompletion));
Assert.True(command.ElapsedTime > TimeSpan.Zero, nameof(command.ElapsedTime));
Assert.True(command.ElapsedTime > command.CreationToEnqueued && command.ElapsedTime > command.EnqueuedToSending && command.ElapsedTime > command.SentToResponse, "Comparisons");
Assert.True(command.RetransmissionOf == null, nameof(command.RetransmissionOf));
......@@ -208,6 +208,20 @@ private class PerThreadProfiler
public ProfilingSession GetSession() => perThreadSession.Value;
}
private class AsyncLocalProfiler
{
AsyncLocal<ProfilingSession> perThreadSession = new AsyncLocal<ProfilingSession>();
public ProfilingSession GetSession()
{
var val = perThreadSession.Value;
if(val == null)
{
perThreadSession.Value = val = new ProfilingSession();
}
return val;
}
}
[Fact]
public void LowAllocationEnumerable()
......@@ -348,5 +362,51 @@ public void ProfilingMD_Ex2()
Assert.True(perThreadTimings.All(kv => kv.Value.Count == 1000));
}
}
[FactLongRunning]
public async Task ProfilingMD_Ex2_Async()
{
using (var c = Create())
{
ConnectionMultiplexer conn = c;
var profiler = new AsyncLocalProfiler();
var prefix = Me();
conn.RegisterProfiler(profiler.GetSession);
var tasks = new List<Task>();
var perThreadTimings = new ConcurrentBag<List<IProfiledCommand>>();
for (var i = 0; i < 16; i++)
{
var db = conn.GetDatabase(i);
var task = Task.Run(async () =>
{
for (var j = 0; j < 100; j++)
{
await db.StringSetAsync(prefix + j, "" + j);
}
perThreadTimings.Add(profiler.GetSession().GetCommands().ToList());
});
tasks.Add(task);
}
var timeout = Task.Delay(10000);
var complete = Task.WhenAll(tasks);
if (timeout == await Task.WhenAny(timeout, complete))
{
throw new TimeoutException();
}
Assert.Equal(16, perThreadTimings.Count);
foreach(var item in perThreadTimings)
{
Assert.Equal(100, item.Count);
}
}
}
}
}
......@@ -438,33 +438,32 @@ public override string ToString()
return $"[{Db}]:{CommandAndKey} ({resultProcessor?.GetType().Name ?? "(n/a)"})";
}
public void SetResponseReceived()
{
performance?.SetResponseReceived();
}
public void SetResponseReceived() => performance?.SetResponseReceived();
public bool TryComplete(bool isAsync)
{
//Ensure we can never call TryComplete on the same resultBox from two threads by grabbing it now
var currBox = Interlocked.Exchange(ref resultBox, null);
if (!isAsync)
{ // set the performance completion the first chance we get (sync comes first)
performance?.SetCompleted();
}
if (currBox != null)
{
var ret = currBox.TryComplete(isAsync);
//in async mode TryComplete will have unwrapped and recycled resultBox
if (!(ret && isAsync))
if (!(ret || isAsync))
{
//put result box back if it was not already recycled
Interlocked.Exchange(ref resultBox, currBox);
}
performance?.SetCompleted();
return ret;
}
else
{
ConnectionMultiplexer.TraceWithoutContext("No result-box to complete for " + Command, "Message");
performance?.SetCompleted();
return true;
}
}
......@@ -613,10 +612,7 @@ internal void SetException(Exception exception)
resultBox?.SetException(exception);
}
internal void SetEnqueued()
{
performance?.SetEnqueued();
}
internal void SetEnqueued()=> performance?.SetEnqueued();
......
using System;
using System.Diagnostics;
using System.Net;
using System.Runtime.CompilerServices;
using System.Threading;
namespace StackExchange.Redis.Profiling
......@@ -77,27 +78,17 @@ public void SetMessage(Message msg)
MessageCreatedTimeStamp = msg.createdTimestamp;
}
public void SetEnqueued()
{
// This method should never be called twice
if (EnqueuedTimeStamp > 0) throw new InvalidOperationException($"{nameof(SetEnqueued)} called more than once");
public void SetEnqueued() => SetTimestamp(ref EnqueuedTimeStamp);
EnqueuedTimeStamp = Stopwatch.GetTimestamp();
}
public void SetRequestSent()
{
// This method should never be called twice
if (RequestSentTimeStamp > 0) throw new InvalidOperationException($"{nameof(SetRequestSent)} called more than once");
public void SetRequestSent() => SetTimestamp(ref RequestSentTimeStamp);
RequestSentTimeStamp = Stopwatch.GetTimestamp();
}
public void SetResponseReceived() => SetTimestamp(ref ResponseReceivedTimeStamp);
public void SetResponseReceived()
[MethodImpl(MethodImplOptions.AggressiveInlining)]
private static void SetTimestamp(ref long field)
{
if (ResponseReceivedTimeStamp > 0) throw new InvalidOperationException($"{nameof(SetResponseReceived)} called more than once");
ResponseReceivedTimeStamp = Stopwatch.GetTimestamp();
var now = Stopwatch.GetTimestamp();
Interlocked.CompareExchange(ref field, now, 0);
}
public void SetCompleted()
......@@ -108,11 +99,13 @@ public void SetCompleted()
var now = Stopwatch.GetTimestamp();
var oldVal = Interlocked.CompareExchange(ref CompletedTimeStamp, now, 0);
// second call
if (oldVal != 0) return;
// only push on the first call, no dupes!
PushToWhenFinished?.Add(this);
if (oldVal == 0)
{
// fake a response if we completed prematurely (timeout, broken connection, etc)
Interlocked.CompareExchange(ref ResponseReceivedTimeStamp, now, 0);
PushToWhenFinished?.Add(this);
}
}
public override string ToString()
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment