Commit 090cb210 authored by Marc Gravell's avatar Marc Gravell

Merge branch 'profiling' into pipelines

parents 6b66c8c5 7c920855
...@@ -36,8 +36,7 @@ ...@@ -36,8 +36,7 @@
[assembly: TypeForwardedTo(typeof(global::StackExchange.Redis.IConnectionMultiplexer))] [assembly: TypeForwardedTo(typeof(global::StackExchange.Redis.IConnectionMultiplexer))]
[assembly: TypeForwardedTo(typeof(global::StackExchange.Redis.IDatabase))] [assembly: TypeForwardedTo(typeof(global::StackExchange.Redis.IDatabase))]
[assembly: TypeForwardedTo(typeof(global::StackExchange.Redis.IDatabaseAsync))] [assembly: TypeForwardedTo(typeof(global::StackExchange.Redis.IDatabaseAsync))]
[assembly: TypeForwardedTo(typeof(global::StackExchange.Redis.IProfiledCommand))] [assembly: TypeForwardedTo(typeof(global::StackExchange.Redis.Profiling.IProfiledCommand))]
[assembly: TypeForwardedTo(typeof(global::StackExchange.Redis.IProfiler))]
[assembly: TypeForwardedTo(typeof(global::StackExchange.Redis.IReconnectRetryPolicy))] [assembly: TypeForwardedTo(typeof(global::StackExchange.Redis.IReconnectRetryPolicy))]
[assembly: TypeForwardedTo(typeof(global::StackExchange.Redis.IRedis))] [assembly: TypeForwardedTo(typeof(global::StackExchange.Redis.IRedis))]
[assembly: TypeForwardedTo(typeof(global::StackExchange.Redis.IRedisAsync))] [assembly: TypeForwardedTo(typeof(global::StackExchange.Redis.IRedisAsync))]
...@@ -52,7 +51,8 @@ ...@@ -52,7 +51,8 @@
[assembly: TypeForwardedTo(typeof(global::StackExchange.Redis.LuaScript))] [assembly: TypeForwardedTo(typeof(global::StackExchange.Redis.LuaScript))]
[assembly: TypeForwardedTo(typeof(global::StackExchange.Redis.MigrateOptions))] [assembly: TypeForwardedTo(typeof(global::StackExchange.Redis.MigrateOptions))]
[assembly: TypeForwardedTo(typeof(global::StackExchange.Redis.Order))] [assembly: TypeForwardedTo(typeof(global::StackExchange.Redis.Order))]
[assembly: TypeForwardedTo(typeof(global::StackExchange.Redis.ProfiledCommandEnumerable))] [assembly: TypeForwardedTo(typeof(global::StackExchange.Redis.Profiling.ProfiledCommandEnumerable))]
[assembly: TypeForwardedTo(typeof(global::StackExchange.Redis.Profiling.ProfilingSession))]
[assembly: TypeForwardedTo(typeof(global::StackExchange.Redis.Proxy))] [assembly: TypeForwardedTo(typeof(global::StackExchange.Redis.Proxy))]
[assembly: TypeForwardedTo(typeof(global::StackExchange.Redis.RedisChannel))] [assembly: TypeForwardedTo(typeof(global::StackExchange.Redis.RedisChannel))]
[assembly: TypeForwardedTo(typeof(global::StackExchange.Redis.RedisCommandException))] [assembly: TypeForwardedTo(typeof(global::StackExchange.Redis.RedisCommandException))]
......
...@@ -5,6 +5,7 @@ ...@@ -5,6 +5,7 @@
using System.Net; using System.Net;
using System.Threading; using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
using StackExchange.Redis.Profiling;
using Xunit; using Xunit;
using Xunit.Abstractions; using Xunit.Abstractions;
...@@ -580,34 +581,31 @@ public void GetFromRightNodeBasedOnFlags(CommandFlags flags, bool isSlave) ...@@ -580,34 +581,31 @@ public void GetFromRightNodeBasedOnFlags(CommandFlags flags, bool isSlave)
private static string Describe(EndPoint endpoint) => endpoint?.ToString() ?? "(unknown)"; private static string Describe(EndPoint endpoint) => endpoint?.ToString() ?? "(unknown)";
private class TestProfiler : IProfiler
{
public object MyContext = new object();
public object GetContext() => MyContext;
}
[Fact] [Fact]
public void SimpleProfiling() public void SimpleProfiling()
{ {
using (var conn = Create()) using (var conn = Create())
{ {
var profiler = new TestProfiler(); var profiler = new ProfilingSession();
var key = Me(); var key = Me();
var db = conn.GetDatabase(); var db = conn.GetDatabase();
db.KeyDelete(key, CommandFlags.FireAndForget); db.KeyDelete(key, CommandFlags.FireAndForget);
conn.RegisterProfiler(profiler); conn.RegisterProfiler(() => profiler);
conn.BeginProfiling(profiler.MyContext);
db.StringSet(key, "world"); db.StringSet(key, "world");
var val = db.StringGet(key); var val = db.StringGet(key);
Assert.Equal("world", val); Assert.Equal("world", val);
var msgs = conn.FinishProfiling(profiler.MyContext); var msgs = profiler.GetCommands();
Log("Checking GET..."); Log("Checking GET...");
Assert.Contains(msgs, m => m.Command == "GET"); Assert.Contains(msgs, m => m.Command == "GET");
Log("Checking SET..."); Log("Checking SET...");
Assert.Contains(msgs, m => m.Command == "SET"); Assert.Contains(msgs, m => m.Command == "SET");
Assert.Equal(2, msgs.Count()); Assert.Equal(2, msgs.Count());
var arr = msgs.ToArray();
Assert.Equal("SET", arr[0].Command);
Assert.Equal("GET", arr[1].Command);
} }
} }
......
...@@ -6,6 +6,7 @@ ...@@ -6,6 +6,7 @@
using System.Collections.Concurrent; using System.Collections.Concurrent;
using Xunit; using Xunit;
using Xunit.Abstractions; using Xunit.Abstractions;
using StackExchange.Redis.Profiling;
namespace StackExchange.Redis.Tests namespace StackExchange.Redis.Tests
{ {
...@@ -13,22 +14,17 @@ public class Profiling : TestBase ...@@ -13,22 +14,17 @@ public class Profiling : TestBase
{ {
public Profiling(ITestOutputHelper output) : base (output) { } public Profiling(ITestOutputHelper output) : base (output) { }
private class TestProfiler : IProfiler
{
public object MyContext = new object();
public object GetContext() => MyContext;
}
[Fact] [Fact]
public void Simple() public void Simple()
{ {
using (var conn = Create()) using (var conn = Create())
{ {
var profiler = new TestProfiler();
var key = Me(); var key = Me();
conn.RegisterProfiler(profiler); var session = new ProfilingSession();
conn.BeginProfiling(profiler.MyContext);
conn.RegisterProfiler(() => session);
var dbId = TestConfig.GetDedicatedDB(); var dbId = TestConfig.GetDedicatedDB();
var db = conn.GetDatabase(dbId); var db = conn.GetDatabase(dbId);
db.StringSet(key, "world"); db.StringSet(key, "world");
...@@ -36,14 +32,18 @@ public void Simple() ...@@ -36,14 +32,18 @@ public void Simple()
Assert.Equal("world", result.AsString()); Assert.Equal("world", result.AsString());
var val = db.StringGet(key); var val = db.StringGet(key);
Assert.Equal("world", (string)val); Assert.Equal("world", (string)val);
var s = (string)db.Execute("ECHO", "fii");
Assert.Equal("fii", s);
var cmds = conn.FinishProfiling(profiler.MyContext); var cmds = session.GetCommands();
var i = 0; var i = 0;
foreach (var cmd in cmds) foreach (var cmd in cmds)
{ {
Log("Command {0} (DB: {1}): {2}", i++, cmd.Db, cmd.ToString().Replace("\n", ", ")); Log("Command {0} (DB: {1}): {2}", i++, cmd.Db, cmd.ToString().Replace("\n", ", "));
} }
var all = string.Join(",", cmds.Select(x => x.Command));
Assert.Equal("SET,EVAL,GET,ECHO", all);
Log("Checking for SET"); Log("Checking for SET");
var set = cmds.SingleOrDefault(cmd => cmd.Command == "SET"); var set = cmds.SingleOrDefault(cmd => cmd.Command == "SET");
Assert.NotNull(set); Assert.NotNull(set);
...@@ -53,8 +53,11 @@ public void Simple() ...@@ -53,8 +53,11 @@ public void Simple()
Log("Checking for EVAL"); Log("Checking for EVAL");
var eval = cmds.SingleOrDefault(cmd => cmd.Command == "EVAL"); var eval = cmds.SingleOrDefault(cmd => cmd.Command == "EVAL");
Assert.NotNull(eval); Assert.NotNull(eval);
Log("Checking for ECHO");
var echo = cmds.SingleOrDefault(cmd => cmd.Command == "ECHO");
Assert.NotNull(echo);
Assert.Equal(3, cmds.Count()); Assert.Equal(4, cmds.Count());
Assert.True(set.CommandCreated <= eval.CommandCreated); Assert.True(set.CommandCreated <= eval.CommandCreated);
Assert.True(eval.CommandCreated <= get.CommandCreated); Assert.True(eval.CommandCreated <= get.CommandCreated);
...@@ -64,6 +67,10 @@ public void Simple() ...@@ -64,6 +67,10 @@ public void Simple()
AssertProfiledCommandValues(get, conn, dbId); AssertProfiledCommandValues(get, conn, dbId);
AssertProfiledCommandValues(eval, conn, dbId); AssertProfiledCommandValues(eval, conn, dbId);
AssertProfiledCommandValues(echo, conn, dbId);
} }
} }
...@@ -74,7 +81,7 @@ private static void AssertProfiledCommandValues(IProfiledCommand command, Connec ...@@ -74,7 +81,7 @@ private static void AssertProfiledCommandValues(IProfiledCommand command, Connec
Assert.True(command.CreationToEnqueued > TimeSpan.Zero, nameof(command.CreationToEnqueued)); Assert.True(command.CreationToEnqueued > TimeSpan.Zero, nameof(command.CreationToEnqueued));
Assert.True(command.EnqueuedToSending > TimeSpan.Zero, nameof(command.EnqueuedToSending)); Assert.True(command.EnqueuedToSending > TimeSpan.Zero, nameof(command.EnqueuedToSending));
Assert.True(command.SentToResponse > TimeSpan.Zero, nameof(command.SentToResponse)); Assert.True(command.SentToResponse > TimeSpan.Zero, nameof(command.SentToResponse));
Assert.True(command.ResponseToCompletion > TimeSpan.Zero, nameof(command.ResponseToCompletion)); Assert.True(command.ResponseToCompletion >= TimeSpan.Zero, nameof(command.ResponseToCompletion));
Assert.True(command.ElapsedTime > TimeSpan.Zero, nameof(command.ElapsedTime)); Assert.True(command.ElapsedTime > TimeSpan.Zero, nameof(command.ElapsedTime));
Assert.True(command.ElapsedTime > command.CreationToEnqueued && command.ElapsedTime > command.EnqueuedToSending && command.ElapsedTime > command.SentToResponse, "Comparisons"); Assert.True(command.ElapsedTime > command.CreationToEnqueued && command.ElapsedTime > command.EnqueuedToSending && command.ElapsedTime > command.SentToResponse, "Comparisons");
Assert.True(command.RetransmissionOf == null, nameof(command.RetransmissionOf)); Assert.True(command.RetransmissionOf == null, nameof(command.RetransmissionOf));
...@@ -86,11 +93,10 @@ public void ManyThreads() ...@@ -86,11 +93,10 @@ public void ManyThreads()
{ {
using (var conn = Create()) using (var conn = Create())
{ {
var profiler = new TestProfiler(); var session = new ProfilingSession();
var prefix = Me(); var prefix = Me();
conn.RegisterProfiler(profiler); conn.RegisterProfiler(() => session);
conn.BeginProfiling(profiler.MyContext);
var threads = new List<Thread>(); var threads = new List<Thread>();
const int CountPer = 100; const int CountPer = 100;
...@@ -115,7 +121,7 @@ public void ManyThreads() ...@@ -115,7 +121,7 @@ public void ManyThreads()
threads.ForEach(thread => thread.Start()); threads.ForEach(thread => thread.Start());
threads.ForEach(thread => thread.Join()); threads.ForEach(thread => thread.Join());
var allVals = conn.FinishProfiling(profiler.MyContext); var allVals = session.GetCommands();
var relevant = allVals.Where(cmd => cmd.Db > 0).ToList(); var relevant = allVals.Where(cmd => cmd.Db > 0).ToList();
var kinds = relevant.Select(cmd => cmd.Command).Distinct().ToList(); var kinds = relevant.Select(cmd => cmd.Command).Distinct().ToList();
...@@ -141,37 +147,16 @@ public void ManyThreads() ...@@ -141,37 +147,16 @@ public void ManyThreads()
} }
} }
private class TestProfiler2 : IProfiler
{
private readonly ConcurrentDictionary<int, object> Contexts = new ConcurrentDictionary<int, object>();
public void RegisterContext(object context)
{
Contexts[Thread.CurrentThread.ManagedThreadId] = context;
}
public object GetContext()
{
if (!Contexts.TryGetValue(Thread.CurrentThread.ManagedThreadId, out object ret)) ret = null;
return ret;
}
}
[FactLongRunning] [FactLongRunning]
public void ManyContexts() public void ManyContexts()
{ {
using (var conn = Create()) using (var conn = Create())
{ {
var profiler = new TestProfiler2(); var profiler = new PerThreadProfiler();
var prefix = Me(); var prefix = Me();
conn.RegisterProfiler(profiler); conn.RegisterProfiler(profiler.GetSession);
var perThreadContexts = new List<object>();
for (var i = 0; i < 16; i++)
{
perThreadContexts.Add(new object());
}
var threads = new List<Thread>(); var threads = new List<Thread>();
var results = new IEnumerable<IProfiledCommand>[16]; var results = new IEnumerable<IProfiledCommand>[16];
...@@ -181,10 +166,6 @@ public void ManyContexts() ...@@ -181,10 +166,6 @@ public void ManyContexts()
var ix = i; var ix = i;
var thread = new Thread(() => var thread = new Thread(() =>
{ {
var ctx = perThreadContexts[ix];
profiler.RegisterContext(ctx);
conn.BeginProfiling(ctx);
var db = conn.GetDatabase(ix); var db = conn.GetDatabase(ix);
var allTasks = new List<Task>(); var allTasks = new List<Task>();
...@@ -197,7 +178,7 @@ public void ManyContexts() ...@@ -197,7 +178,7 @@ public void ManyContexts()
Task.WaitAll(allTasks.ToArray()); Task.WaitAll(allTasks.ToArray());
results[ix] = conn.FinishProfiling(ctx); results[ix] = profiler.GetSession().GetCommands();
}); });
threads.Add(thread); threads.Add(thread);
...@@ -221,197 +202,24 @@ public void ManyContexts() ...@@ -221,197 +202,24 @@ public void ManyContexts()
} }
} }
private class TestProfiler3 : IProfiler private class PerThreadProfiler
{ {
private readonly ConcurrentDictionary<int, object> Contexts = new ConcurrentDictionary<int, object>(); ThreadLocal<ProfilingSession> perThreadSession = new ThreadLocal<ProfilingSession>(() => new ProfilingSession());
public void RegisterContext(object context)
{
Contexts[Thread.CurrentThread.ManagedThreadId] = context;
}
public object AnyContext() => Contexts.First().Value; public ProfilingSession GetSession() => perThreadSession.Value;
public void Reset() => Contexts.Clear();
public object GetContext()
{
if (!Contexts.TryGetValue(Thread.CurrentThread.ManagedThreadId, out object ret)) ret = null;
return ret;
}
} }
private class AsyncLocalProfiler
// This is a separate method for target=DEBUG purposes.
// In release builds, the runtime is smart enough to figure out
// that the contexts are unreachable and should be collected but in
// debug builds... well, it's not very smart.
private object LeaksCollectedAndRePooled_Initialize(ConnectionMultiplexer conn, int threadCount)
{ {
var profiler = new TestProfiler3(); AsyncLocal<ProfilingSession> perThreadSession = new AsyncLocal<ProfilingSession>();
conn.RegisterProfiler(profiler);
var perThreadContexts = new List<object>();
for (var i = 0; i < threadCount; i++)
{
perThreadContexts.Add(new object());
}
var threads = new List<Thread>();
var results = new IEnumerable<IProfiledCommand>[threadCount];
for (var i = 0; i < threadCount; i++) public ProfilingSession GetSession()
{ {
var ix = i; var val = perThreadSession.Value;
var thread = new Thread(() => if(val == null)
{ {
var ctx = perThreadContexts[ix]; perThreadSession.Value = val = new ProfilingSession();
profiler.RegisterContext(ctx);
conn.BeginProfiling(ctx);
var db = conn.GetDatabase(ix);
var allTasks = new List<Task>();
for (var j = 0; j < 1000; j++)
{
allTasks.Add(db.StringGetAsync("hello" + ix));
allTasks.Add(db.StringSetAsync("hello" + ix, "world" + ix));
}
Task.WaitAll(allTasks.ToArray());
// intentionally leaking!
});
threads.Add(thread);
}
threads.ForEach(t => t.Start());
threads.ForEach(t => t.Join());
var anyContext = profiler.AnyContext();
profiler.Reset();
return anyContext;
}
[FactLongRunning]
public async Task LeaksCollectedAndRePooled()
{
const int ThreadCount = 16;
using (var conn = Create())
{
var anyContext = LeaksCollectedAndRePooled_Initialize(conn, ThreadCount);
// force collection of everything but `anyContext`
GC.Collect(3, GCCollectionMode.Forced);
GC.WaitForPendingFinalizers();
await Task.Delay(TimeSpan.FromMinutes(1.01)).ForAwait();
conn.FinishProfiling(anyContext);
// make sure we haven't left anything in the active contexts dictionary
Assert.Equal(0, conn.profiledCommands.ContextCount);
Assert.Equal(ThreadCount, ConcurrentProfileStorageCollection.AllocationCount);
Assert.Equal(ThreadCount, ConcurrentProfileStorageCollection.CountInPool());
}
}
[FactLongRunning]
public void ReuseStorage()
{
const int ThreadCount = 16;
// have to reset so other tests don't clober
ConcurrentProfileStorageCollection.AllocationCount = 0;
using (var conn = Create())
{
var profiler = new TestProfiler2();
var prefix = Me();
conn.RegisterProfiler(profiler);
var perThreadContexts = new List<object>();
for (var i = 0; i < 16; i++)
{
perThreadContexts.Add(new object());
}
var threads = new List<Thread>();
var results = new List<IEnumerable<IProfiledCommand>>[16];
for (var i = 0; i < 16; i++)
{
results[i] = new List<IEnumerable<IProfiledCommand>>();
}
for (var i = 0; i < ThreadCount; i++)
{
var ix = i;
var thread = new Thread(() =>
{
for (var k = 0; k < 10; k++)
{
var ctx = perThreadContexts[ix];
profiler.RegisterContext(ctx);
conn.BeginProfiling(ctx);
var db = conn.GetDatabase(ix);
var allTasks = new List<Task>();
for (var j = 0; j < 1000; j++)
{
allTasks.Add(db.StringGetAsync(prefix + ix));
allTasks.Add(db.StringSetAsync(prefix + ix, "world" + ix));
}
Task.WaitAll(allTasks.ToArray());
results[ix].Add(conn.FinishProfiling(ctx));
}
});
threads.Add(thread);
}
threads.ForEach(t => t.Start());
threads.ForEach(t => t.Join());
// only 16 allocations can ever be in flight at once
var allocCount = ConcurrentProfileStorageCollection.AllocationCount;
Assert.True(allocCount <= ThreadCount, allocCount.ToString());
// correctness check for all allocations
for (var i = 0; i < results.Length; i++)
{
var resList = results[i];
foreach (var res in resList)
{
Assert.NotNull(res);
var numGets = res.Count(r => r.Command == "GET");
var numSets = res.Count(r => r.Command == "SET");
Assert.Equal(1000, numGets);
Assert.Equal(1000, numSets);
Assert.True(res.All(cmd => cmd.Db == i));
}
}
// no crossed streams
var everything = results.SelectMany(r => r).ToList();
for (var i = 0; i < everything.Count; i++)
{
for (var j = 0; j < everything.Count; j++)
{
if (i == j) continue;
if (object.ReferenceEquals(everything[i], everything[j]))
{
Assert.True(false, "Profilings were jumbled");
}
}
} }
return val;
} }
} }
...@@ -422,10 +230,8 @@ public void LowAllocationEnumerable() ...@@ -422,10 +230,8 @@ public void LowAllocationEnumerable()
using (var conn = Create()) using (var conn = Create())
{ {
var profiler = new TestProfiler(); var session = new ProfilingSession();
conn.RegisterProfiler(profiler); conn.RegisterProfiler(() => session);
conn.BeginProfiling(profiler.MyContext);
var prefix = Me(); var prefix = Me();
var db = conn.GetDatabase(1); var db = conn.GetDatabase(1);
...@@ -446,7 +252,7 @@ public void LowAllocationEnumerable() ...@@ -446,7 +252,7 @@ public void LowAllocationEnumerable()
conn.WaitAll(allTasks.ToArray()); conn.WaitAll(allTasks.ToArray());
var res = conn.FinishProfiling(profiler.MyContext); var res = session.GetCommands();
Assert.True(res.GetType().IsValueType); Assert.True(res.GetType().IsValueType);
using (var e = res.GetEnumerator()) using (var e = res.GetEnumerator())
...@@ -469,16 +275,6 @@ public void LowAllocationEnumerable() ...@@ -469,16 +275,6 @@ public void LowAllocationEnumerable()
} }
} }
private class ToyProfiler : IProfiler
{
public ConcurrentDictionary<Thread, object> Contexts = new ConcurrentDictionary<Thread, object>();
public object GetContext()
{
if (!Contexts.TryGetValue(Thread.CurrentThread, out object ctx)) ctx = null;
return ctx;
}
}
[FactLongRunning] [FactLongRunning]
public void ProfilingMD_Ex1() public void ProfilingMD_Ex1()
...@@ -486,11 +282,10 @@ public void ProfilingMD_Ex1() ...@@ -486,11 +282,10 @@ public void ProfilingMD_Ex1()
using (var c = Create()) using (var c = Create())
{ {
ConnectionMultiplexer conn = c; ConnectionMultiplexer conn = c;
var profiler = new ToyProfiler(); var session = new ProfilingSession();
var prefix = Me(); var prefix = Me();
var thisGroupContext = new object();
conn.RegisterProfiler(profiler); conn.RegisterProfiler(() => session);
var threads = new List<Thread>(); var threads = new List<Thread>();
...@@ -511,17 +306,13 @@ public void ProfilingMD_Ex1() ...@@ -511,17 +306,13 @@ public void ProfilingMD_Ex1()
Task.WaitAll(threadTasks.ToArray()); Task.WaitAll(threadTasks.ToArray());
}); });
profiler.Contexts[thread] = thisGroupContext;
threads.Add(thread); threads.Add(thread);
} }
conn.BeginProfiling(thisGroupContext);
threads.ForEach(thread => thread.Start()); threads.ForEach(thread => thread.Start());
threads.ForEach(thread => thread.Join()); threads.ForEach(thread => thread.Join());
IEnumerable<IProfiledCommand> timings = conn.FinishProfiling(thisGroupContext); IEnumerable<IProfiledCommand> timings = session.GetCommands();
Assert.Equal(16000, timings.Count()); Assert.Equal(16000, timings.Count());
} }
...@@ -533,10 +324,10 @@ public void ProfilingMD_Ex2() ...@@ -533,10 +324,10 @@ public void ProfilingMD_Ex2()
using (var c = Create()) using (var c = Create())
{ {
ConnectionMultiplexer conn = c; ConnectionMultiplexer conn = c;
var profiler = new ToyProfiler(); var profiler = new PerThreadProfiler();
var prefix = Me(); var prefix = Me();
conn.RegisterProfiler(profiler); conn.RegisterProfiler(profiler.GetSession);
var threads = new List<Thread>(); var threads = new List<Thread>();
...@@ -549,9 +340,7 @@ public void ProfilingMD_Ex2() ...@@ -549,9 +340,7 @@ public void ProfilingMD_Ex2()
var thread = new Thread(() => var thread = new Thread(() =>
{ {
var threadTasks = new List<Task>(); var threadTasks = new List<Task>();
conn.BeginProfiling(Thread.CurrentThread);
for (var j = 0; j < 1000; j++) for (var j = 0; j < 1000; j++)
{ {
var task = db.StringSetAsync(prefix + j, "" + j); var task = db.StringSetAsync(prefix + j, "" + j);
...@@ -560,11 +349,9 @@ public void ProfilingMD_Ex2() ...@@ -560,11 +349,9 @@ public void ProfilingMD_Ex2()
Task.WaitAll(threadTasks.ToArray()); Task.WaitAll(threadTasks.ToArray());
perThreadTimings[Thread.CurrentThread] = conn.FinishProfiling(Thread.CurrentThread).ToList(); perThreadTimings[Thread.CurrentThread] = profiler.GetSession().GetCommands().ToList();
}); });
profiler.Contexts[thread] = thread;
threads.Add(thread); threads.Add(thread);
} }
...@@ -575,5 +362,51 @@ public void ProfilingMD_Ex2() ...@@ -575,5 +362,51 @@ public void ProfilingMD_Ex2()
Assert.True(perThreadTimings.All(kv => kv.Value.Count == 1000)); Assert.True(perThreadTimings.All(kv => kv.Value.Count == 1000));
} }
} }
[FactLongRunning]
public async Task ProfilingMD_Ex2_Async()
{
using (var c = Create())
{
ConnectionMultiplexer conn = c;
var profiler = new AsyncLocalProfiler();
var prefix = Me();
conn.RegisterProfiler(profiler.GetSession);
var tasks = new List<Task>();
var perThreadTimings = new ConcurrentBag<List<IProfiledCommand>>();
for (var i = 0; i < 16; i++)
{
var db = conn.GetDatabase(i);
var task = Task.Run(async () =>
{
for (var j = 0; j < 100; j++)
{
await db.StringSetAsync(prefix + j, "" + j);
}
perThreadTimings.Add(profiler.GetSession().GetCommands().ToList());
});
tasks.Add(task);
}
var timeout = Task.Delay(10000);
var complete = Task.WhenAll(tasks);
if (timeout == await Task.WhenAny(timeout, complete))
{
throw new TimeoutException();
}
Assert.Equal(16, perThreadTimings.Count);
foreach(var item in perThreadTimings)
{
Assert.Equal(100, item.Count);
}
}
}
} }
} }
using System; using System;
using StackExchange.Redis.Profiling;
namespace StackExchange.Redis namespace StackExchange.Redis
{ {
public partial class ConnectionMultiplexer public partial class ConnectionMultiplexer
{ {
private IProfiler profiler; Func<ProfilingSession> _profilingSessionProvider;
// internal for test purposes
internal ProfileContextTracker profiledCommands;
/// <summary>
/// <para>Sets an IProfiler instance for this ConnectionMultiplexer.</para>
/// <para>
/// An IProfiler instances is used to determine which context to associate an
/// IProfiledCommand with. See BeginProfiling(object) and FinishProfiling(object)
/// for more details.
/// </para>
/// </summary>
/// <param name="profiler">The profiler to register.</param>
public void RegisterProfiler(IProfiler profiler)
{
if (this.profiler != null) throw new InvalidOperationException("IProfiler already registered for this ConnectionMultiplexer");
this.profiler = profiler ?? throw new ArgumentNullException(nameof(profiler));
profiledCommands = new ProfileContextTracker();
}
/// <summary>
/// <para>Begins profiling for the given context.</para>
/// <para>
/// If the same context object is returned by the registered IProfiler, the IProfiledCommands
/// will be associated with each other.
/// </para>
/// <para>Call FinishProfiling with the same context to get the assocated commands.</para>
/// <para>Note that forContext cannot be a WeakReference or a WeakReference&lt;T</para>&gt;
/// </summary>
/// <param name="forContext">The context to begin profiling.</param>
public void BeginProfiling(object forContext)
{
if (profiler == null) throw new InvalidOperationException("Cannot begin profiling if no IProfiler has been registered with RegisterProfiler");
if (forContext == null) throw new ArgumentNullException(nameof(forContext));
if (forContext is WeakReference) throw new ArgumentException("Context object cannot be a WeakReference", nameof(forContext));
if (!profiledCommands.TryCreate(forContext))
{
throw ExceptionFactory.BeganProfilingWithDuplicateContext(forContext);
}
}
/// <summary> /// <summary>
/// <para>Stops profiling for the given context, returns all IProfiledCommands associated.</para> /// Register a callback to provide an on-demand ambient session provider based on the
/// <para>By default this may do a sweep for dead profiling contexts, you can disable this by passing "allowCleanupSweep: false".</para> /// calling context; the implementing code is responsible for reliably resolving the same provider
/// based on ambient context, or returning null to not profile
/// </summary> /// </summary>
/// <param name="forContext">The context to begin profiling.</param> public void RegisterProfiler(Func<ProfilingSession> profilingSessionProvider) => _profilingSessionProvider = profilingSessionProvider;
/// <param name="allowCleanupSweep">Whether to allow cleanup of old profiling sessions.</param>
public ProfiledCommandEnumerable FinishProfiling(object forContext, bool allowCleanupSweep = true)
{
if (profiler == null) throw new InvalidOperationException("Cannot begin profiling if no IProfiler has been registered with RegisterProfiler");
if (forContext == null) throw new ArgumentNullException(nameof(forContext));
if (!profiledCommands.TryRemove(forContext, out ProfiledCommandEnumerable ret))
{
throw ExceptionFactory.FinishedProfilingWithInvalidContext(forContext);
}
// conditional, because it could hurt and that may sometimes be unacceptable
if (allowCleanupSweep)
{
profiledCommands.TryCleanup();
}
return ret;
}
} }
} }
...@@ -11,6 +11,7 @@ ...@@ -11,6 +11,7 @@
using System.Reflection; using System.Reflection;
using System.IO.Compression; using System.IO.Compression;
using System.Runtime.CompilerServices; using System.Runtime.CompilerServices;
using StackExchange.Redis.Profiling;
namespace StackExchange.Redis namespace StackExchange.Redis
{ {
...@@ -1868,10 +1869,10 @@ private WriteResult TryPushMessageToBridge<T>(Message message, ResultProcessor<T ...@@ -1868,10 +1869,10 @@ private WriteResult TryPushMessageToBridge<T>(Message message, ResultProcessor<T
if (server != null) if (server != null)
{ {
var profCtx = profiler?.GetContext(); var profilingSession = _profilingSessionProvider?.Invoke();
if (profCtx != null && profiledCommands.TryGetValue(profCtx, out ConcurrentProfileStorageCollection inFlightForCtx)) if (profilingSession != null)
{ {
message.SetProfileStorage(ProfileStorage.NewWithContext(inFlightForCtx, server)); message.SetProfileStorage(ProfiledCommand.NewWithContext(profilingSession, server));
} }
if (message.Db >= 0) if (message.Db >= 0)
...@@ -1952,6 +1953,7 @@ public bool IsConnecting ...@@ -1952,6 +1953,7 @@ public bool IsConnecting
public void Close(bool allowCommandsToComplete = true) public void Close(bool allowCommandsToComplete = true)
{ {
isDisposed = true; isDisposed = true;
_profilingSessionProvider = null;
using (var tmp = pulse) using (var tmp = pulse)
{ {
pulse = null; pulse = null;
......
...@@ -2,6 +2,7 @@ ...@@ -2,6 +2,7 @@
using System.IO; using System.IO;
using System.Net; using System.Net;
using System.Threading.Tasks; using System.Threading.Tasks;
using StackExchange.Redis.Profiling;
namespace StackExchange.Redis namespace StackExchange.Redis
{ {
...@@ -69,36 +70,11 @@ public interface IConnectionMultiplexer ...@@ -69,36 +70,11 @@ public interface IConnectionMultiplexer
int StormLogThreshold { get; set; } int StormLogThreshold { get; set; }
/// <summary> /// <summary>
/// Sets an IProfiler instance for this ConnectionMultiplexer. /// Register a callback to provide an on-demand ambient session provider based on the
/// /// calling context; the implementing code is responsible for reliably resolving the same provider
/// An IProfiler instances is used to determine which context to associate an /// based on ambient context, or returning null to not profile
/// IProfiledCommand with. See BeginProfiling(object) and FinishProfiling(object)
/// for more details.
/// </summary> /// </summary>
/// <param name="profiler">The profiler to register.</param> void RegisterProfiler(Func<ProfilingSession> profilingSessionProvider);
void RegisterProfiler(IProfiler profiler);
/// <summary>
/// Begins profiling for the given context.
///
/// If the same context object is returned by the registered IProfiler, the IProfiledCommands
/// will be associated with each other.
///
/// Call FinishProfiling with the same context to get the assocated commands.
///
/// Note that forContext cannot be a WeakReference or a WeakReference&lt;T&gt;
/// </summary>
/// <param name="forContext">The context to begin profiling for.</param>
void BeginProfiling(object forContext);
/// <summary>
/// Stops profiling for the given context, returns all IProfiledCommands associated.
///
/// By default this may do a sweep for dead profiling contexts, you can disable this by passing "allowCleanupSweep: false".
/// </summary>
/// <param name="forContext">The context to finish profiling for.</param>
/// <param name="allowCleanupSweep">Whether to allow a cleanup sweep of dead profiling contexts.</param>
ProfiledCommandEnumerable FinishProfiling(object forContext, bool allowCleanupSweep = true);
/// <summary> /// <summary>
/// Get summary statistics associates with this server /// Get summary statistics associates with this server
......
...@@ -5,6 +5,7 @@ ...@@ -5,6 +5,7 @@
using System.Text; using System.Text;
using System.Threading; using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
using StackExchange.Redis.Profiling;
namespace StackExchange.Redis namespace StackExchange.Redis
{ {
...@@ -81,7 +82,7 @@ internal abstract class Message : ICompletable ...@@ -81,7 +82,7 @@ internal abstract class Message : ICompletable
private ResultProcessor resultProcessor; private ResultProcessor resultProcessor;
// All for profiling purposes // All for profiling purposes
private ProfileStorage performance; private ProfiledCommand performance;
internal DateTime createdDateTime; internal DateTime createdDateTime;
internal long createdTimestamp; internal long createdTimestamp;
...@@ -135,7 +136,7 @@ internal void SetMasterOnly() ...@@ -135,7 +136,7 @@ internal void SetMasterOnly()
} }
} }
internal void SetProfileStorage(ProfileStorage storage) internal void SetProfileStorage(ProfiledCommand storage)
{ {
performance = storage; performance = storage;
performance.SetMessage(this); performance.SetMessage(this);
...@@ -152,7 +153,7 @@ internal void PrepareToResend(ServerEndPoint resendTo, bool isMoved) ...@@ -152,7 +153,7 @@ internal void PrepareToResend(ServerEndPoint resendTo, bool isMoved)
createdDateTime = DateTime.UtcNow; createdDateTime = DateTime.UtcNow;
createdTimestamp = System.Diagnostics.Stopwatch.GetTimestamp(); createdTimestamp = System.Diagnostics.Stopwatch.GetTimestamp();
performance = ProfileStorage.NewAttachedToSameContext(oldPerformance, resendTo, isMoved); performance = ProfiledCommand.NewAttachedToSameContext(oldPerformance, resendTo, isMoved);
performance.SetMessage(this); performance.SetMessage(this);
Status = CommandStatus.WaitingToBeSent; Status = CommandStatus.WaitingToBeSent;
} }
...@@ -437,33 +438,32 @@ public override string ToString() ...@@ -437,33 +438,32 @@ public override string ToString()
return $"[{Db}]:{CommandAndKey} ({resultProcessor?.GetType().Name ?? "(n/a)"})"; return $"[{Db}]:{CommandAndKey} ({resultProcessor?.GetType().Name ?? "(n/a)"})";
} }
public void SetResponseReceived() public void SetResponseReceived() => performance?.SetResponseReceived();
{
performance?.SetResponseReceived();
}
public bool TryComplete(bool isAsync) public bool TryComplete(bool isAsync)
{ {
//Ensure we can never call TryComplete on the same resultBox from two threads by grabbing it now //Ensure we can never call TryComplete on the same resultBox from two threads by grabbing it now
var currBox = Interlocked.Exchange(ref resultBox, null); var currBox = Interlocked.Exchange(ref resultBox, null);
if (!isAsync)
{ // set the performance completion the first chance we get (sync comes first)
performance?.SetCompleted();
}
if (currBox != null) if (currBox != null)
{ {
var ret = currBox.TryComplete(isAsync); var ret = currBox.TryComplete(isAsync);
//in async mode TryComplete will have unwrapped and recycled resultBox //in async mode TryComplete will have unwrapped and recycled resultBox
if (!(ret && isAsync)) if (!(ret || isAsync))
{ {
//put result box back if it was not already recycled //put result box back if it was not already recycled
Interlocked.Exchange(ref resultBox, currBox); Interlocked.Exchange(ref resultBox, currBox);
} }
performance?.SetCompleted();
return ret; return ret;
} }
else else
{ {
ConnectionMultiplexer.TraceWithoutContext("No result-box to complete for " + Command, "Message"); ConnectionMultiplexer.TraceWithoutContext("No result-box to complete for " + Command, "Message");
performance?.SetCompleted();
return true; return true;
} }
} }
...@@ -612,10 +612,7 @@ internal void SetException(Exception exception) ...@@ -612,10 +612,7 @@ internal void SetException(Exception exception)
resultBox?.SetException(exception); resultBox?.SetException(exception);
} }
internal void SetEnqueued() internal void SetEnqueued()=> performance?.SetEnqueued();
{
performance?.SetEnqueued();
}
......
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
namespace StackExchange.Redis
{
/// <summary>
/// Big ol' wrapper around most of the profiling storage logic, 'cause it got too big to just live in ConnectionMultiplexer.
/// </summary>
internal sealed class ProfileContextTracker
{
/// <summary>
/// <para>Necessary, because WeakReference can't be readily comparable (since the reference is... weak).</para>
/// <para>This lets us detect leaks* with some reasonable confidence, and cleanup periodically.</para>
/// <para>
/// Some calisthenics are done to avoid allocating WeakReferences for no reason, as often
/// we're just looking up ProfileStorage.
/// </para>
/// <para>* Somebody starts profiling, but for whatever reason never *stops* with a context object</para>
/// </summary>
private readonly struct ProfileContextCell : IEquatable<ProfileContextCell>
{
// This is a union of (object|WeakReference); if it's a WeakReference
// then we're actually interested in it's Target, otherwise
// we're concerned about the actual value of Reference
private readonly object Reference;
// It is absolutely crucial that this value **never change** once instantiated
private readonly int HashCode;
public bool IsContextLeaked => !TryGetTarget(out _);
private ProfileContextCell(object forObj, bool isEphemeral)
{
HashCode = forObj.GetHashCode();
if (isEphemeral)
{
Reference = forObj;
}
else
{
Reference = new WeakReference(forObj, trackResurrection: true); // ughhh, have to handle finalizers
}
}
/// <summary>
/// <para>Suitable for use as a key into something.</para>
/// <para>
/// This instance **WILL NOT** keep forObj alive, so it can
/// be copied out of the calling method's scope.
/// </para>
/// </summary>
/// <param name="forObj">The object to get a context for.</param>
public static ProfileContextCell ToStoreUnder(object forObj) => new ProfileContextCell(forObj, isEphemeral: false);
/// <summary>
/// <para>Only suitable for looking up.</para>
/// <para>
/// This instance **ABSOLUTELY WILL** keep forObj alive, so this
/// had better not be copied into anything outside the scope of the
/// calling method.
/// </para>
/// </summary>
/// <param name="forObj">The object to lookup a context by.</param>
public static ProfileContextCell ToLookupBy(object forObj) => new ProfileContextCell(forObj, isEphemeral: true);
private bool TryGetTarget(out object target)
{
var asWeakRef = Reference as WeakReference;
if (asWeakRef == null)
{
target = Reference;
return true;
}
// Do not use IsAlive here, it's race city
target = asWeakRef.Target;
return target != null;
}
public override bool Equals(object obj)
{
if (!(obj is ProfileContextCell)) return false;
return Equals((ProfileContextCell)obj);
}
public override int GetHashCode() => HashCode;
public bool Equals(ProfileContextCell other)
{
if (other.TryGetTarget(out object otherObj) != TryGetTarget(out object thisObj)) return false;
// dead references are equal
if (thisObj == null) return true;
return thisObj.Equals(otherObj);
}
}
// provided so default behavior doesn't do any boxing, for sure
private sealed class ProfileContextCellComparer : IEqualityComparer<ProfileContextCell>
{
public static readonly ProfileContextCellComparer Singleton = new ProfileContextCellComparer();
private ProfileContextCellComparer() { }
public bool Equals(ProfileContextCell x, ProfileContextCell y)
{
return x.Equals(y);
}
public int GetHashCode(ProfileContextCell obj)
{
return obj.GetHashCode();
}
}
private long lastCleanupSweep;
private readonly ConcurrentDictionary<ProfileContextCell, ConcurrentProfileStorageCollection> profiledCommands;
public int ContextCount => profiledCommands.Count;
public ProfileContextTracker()
{
profiledCommands = new ConcurrentDictionary<ProfileContextCell, ConcurrentProfileStorageCollection>(ProfileContextCellComparer.Singleton);
lastCleanupSweep = DateTime.UtcNow.Ticks;
}
/// <summary>
/// <para>Registers the passed context with a collection that can be retried with subsequent calls to TryGetValue.</para>
/// <para>Returns false if the passed context object is already registered.</para>
/// </summary>
/// <param name="ctx">The context to use.</param>
public bool TryCreate(object ctx)
{
var cell = ProfileContextCell.ToStoreUnder(ctx);
// we can't pass this as a delegate, because TryAdd may invoke the factory multiple times,
// which would lead to over allocation.
var storage = ConcurrentProfileStorageCollection.GetOrCreate();
return profiledCommands.TryAdd(cell, storage);
}
/// <summary>
/// <para>
/// Returns true and sets val to the tracking collection associated with the given context if the context
/// was registered with TryCreate.
/// </para>
/// <para>Otherwise returns false and sets val to null.</para>
/// </summary>
/// <param name="ctx">The context to get a value for.</param>
/// <param name="val">The collection (if present) for <paramref name="ctx"/>.</param>
public bool TryGetValue(object ctx, out ConcurrentProfileStorageCollection val)
{
var cell = ProfileContextCell.ToLookupBy(ctx);
return profiledCommands.TryGetValue(cell, out val);
}
/// <summary>
/// <para>
/// Removes a context, setting all commands to a (non-thread safe) enumerable of
/// all the commands attached to that context.
/// </para>
/// <para>If the context was never registered, will return false and set commands to null.</para>
/// <para>
/// Subsequent calls to TryRemove with the same context will return false unless it is
/// re-registered with TryCreate.
/// </para>
/// </summary>
/// <param name="ctx">The context to remove for.</param>
/// <param name="commands">The commands to remove.</param>
public bool TryRemove(object ctx, out ProfiledCommandEnumerable commands)
{
var cell = ProfileContextCell.ToLookupBy(ctx);
if (!profiledCommands.TryRemove(cell, out ConcurrentProfileStorageCollection storage))
{
commands = default(ProfiledCommandEnumerable);
return false;
}
commands = storage.EnumerateAndReturnForReuse();
return true;
}
/// <summary>
/// If enough time has passed (1 minute) since the last call, this does walk of all contexts
/// and removes those that the GC has collected.
/// </summary>
public bool TryCleanup()
{
const long SweepEveryTicks = 600000000; // once a minute, tops
var now = DateTime.UtcNow.Ticks; // resolution on this isn't great, but it's good enough
var last = lastCleanupSweep;
var since = now - last;
if (since < SweepEveryTicks) return false;
// this is just to keep other threads from wasting time, in theory
// it'd be perfectly safe for this to run concurrently
var saw = Interlocked.CompareExchange(ref lastCleanupSweep, now, last);
if (saw != last) return false;
if (profiledCommands.Count == 0) return false;
using (var e = profiledCommands.GetEnumerator())
{
while (e.MoveNext())
{
var pair = e.Current;
if (pair.Key.IsContextLeaked && profiledCommands.TryRemove(pair.Key, out ConcurrentProfileStorageCollection abandoned))
{
// shove it back in the pool, but don't bother enumerating
abandoned.ReturnForReuse();
}
}
}
return true;
}
}
}
using System; using System;
using System.Net; using System.Net;
namespace StackExchange.Redis namespace StackExchange.Redis.Profiling
{ {
/// <summary> /// <summary>
/// <para>A profiled command against a redis instance.</para> /// <para>A profiled command against a redis instance.</para>
...@@ -89,21 +89,4 @@ public interface IProfiledCommand ...@@ -89,21 +89,4 @@ public interface IProfiledCommand
/// </summary> /// </summary>
RetransmissionReasonType? RetransmissionReason { get; } RetransmissionReasonType? RetransmissionReason { get; }
} }
/// <summary>
/// Interface for profiling individual commands against an Redis ConnectionMulitplexer.
/// </summary>
public interface IProfiler
{
/// <summary>
/// Called to provide a context object.
///
/// This method is called before the method which triggers work against redis (such as StringSet(Async)) returns,
/// and will always be called on the same thread as that method.
///
/// Note that GetContext() may be called even if ConnectionMultiplexer.BeginProfiling() has not been called.
/// You may return `null` to prevent any tracking of commands.
/// </summary>
object GetContext();
}
} }
using System; using System;
using System.Diagnostics; using System.Diagnostics;
using System.Net; using System.Net;
using System.Runtime.CompilerServices;
using System.Threading; using System.Threading;
namespace StackExchange.Redis namespace StackExchange.Redis.Profiling
{ {
internal class ProfileStorage : IProfiledCommand internal sealed class ProfiledCommand : IProfiledCommand
{ {
#region IProfiledCommand Impl #region IProfiledCommand Impl
public EndPoint EndPoint => Server.EndPoint; public EndPoint EndPoint => Server.EndPoint;
public int Db => Message.Db; public int Db => Message.Db;
public string Command => Message.Command.ToString(); public string Command => Message is RedisDatabase.ExecuteMessage em ? em.Command : Message.Command.ToString();
public CommandFlags Flags => Message.Flags; public CommandFlags Flags => Message.Flags;
...@@ -34,11 +35,11 @@ internal class ProfileStorage : IProfiledCommand ...@@ -34,11 +35,11 @@ internal class ProfileStorage : IProfiledCommand
#endregion #endregion
public ProfileStorage NextElement { get; set; } public ProfiledCommand NextElement { get; set; }
private Message Message; private Message Message;
private readonly ServerEndPoint Server; private readonly ServerEndPoint Server;
private readonly ProfileStorage OriginalProfiling; private readonly ProfiledCommand OriginalProfiling;
private DateTime MessageCreatedDateTime; private DateTime MessageCreatedDateTime;
private long MessageCreatedTimeStamp; private long MessageCreatedTimeStamp;
...@@ -47,9 +48,9 @@ internal class ProfileStorage : IProfiledCommand ...@@ -47,9 +48,9 @@ internal class ProfileStorage : IProfiledCommand
private long ResponseReceivedTimeStamp; private long ResponseReceivedTimeStamp;
private long CompletedTimeStamp; private long CompletedTimeStamp;
private readonly ConcurrentProfileStorageCollection PushToWhenFinished; private readonly ProfilingSession PushToWhenFinished;
private ProfileStorage(ConcurrentProfileStorageCollection pushTo, ServerEndPoint server, ProfileStorage resentFor, RetransmissionReasonType? reason) private ProfiledCommand(ProfilingSession pushTo, ServerEndPoint server, ProfiledCommand resentFor, RetransmissionReasonType? reason)
{ {
PushToWhenFinished = pushTo; PushToWhenFinished = pushTo;
OriginalProfiling = resentFor; OriginalProfiling = resentFor;
...@@ -57,14 +58,14 @@ private ProfileStorage(ConcurrentProfileStorageCollection pushTo, ServerEndPoint ...@@ -57,14 +58,14 @@ private ProfileStorage(ConcurrentProfileStorageCollection pushTo, ServerEndPoint
RetransmissionReason = reason; RetransmissionReason = reason;
} }
public static ProfileStorage NewWithContext(ConcurrentProfileStorageCollection pushTo, ServerEndPoint server) public static ProfiledCommand NewWithContext(ProfilingSession pushTo, ServerEndPoint server)
{ {
return new ProfileStorage(pushTo, server, null, null); return new ProfiledCommand(pushTo, server, null, null);
} }
public static ProfileStorage NewAttachedToSameContext(ProfileStorage resentFor, ServerEndPoint server, bool isMoved) public static ProfiledCommand NewAttachedToSameContext(ProfiledCommand resentFor, ServerEndPoint server, bool isMoved)
{ {
return new ProfileStorage(resentFor.PushToWhenFinished, server, resentFor, isMoved ? RetransmissionReasonType.Moved : RetransmissionReasonType.Ask); return new ProfiledCommand(resentFor.PushToWhenFinished, server, resentFor, isMoved ? RetransmissionReasonType.Moved : RetransmissionReasonType.Ask);
} }
public void SetMessage(Message msg) public void SetMessage(Message msg)
...@@ -77,27 +78,17 @@ public void SetMessage(Message msg) ...@@ -77,27 +78,17 @@ public void SetMessage(Message msg)
MessageCreatedTimeStamp = msg.createdTimestamp; MessageCreatedTimeStamp = msg.createdTimestamp;
} }
public void SetEnqueued() public void SetEnqueued() => SetTimestamp(ref EnqueuedTimeStamp);
{
// This method should never be called twice
if (EnqueuedTimeStamp > 0) throw new InvalidOperationException($"{nameof(SetEnqueued)} called more than once");
EnqueuedTimeStamp = Stopwatch.GetTimestamp(); public void SetRequestSent() => SetTimestamp(ref RequestSentTimeStamp);
}
public void SetRequestSent()
{
// This method should never be called twice
if (RequestSentTimeStamp > 0) throw new InvalidOperationException($"{nameof(SetRequestSent)} called more than once");
RequestSentTimeStamp = Stopwatch.GetTimestamp(); public void SetResponseReceived() => SetTimestamp(ref ResponseReceivedTimeStamp);
}
public void SetResponseReceived() [MethodImpl(MethodImplOptions.AggressiveInlining)]
private static void SetTimestamp(ref long field)
{ {
if (ResponseReceivedTimeStamp > 0) throw new InvalidOperationException($"{nameof(SetResponseReceived)} called more than once"); var now = Stopwatch.GetTimestamp();
Interlocked.CompareExchange(ref field, now, 0);
ResponseReceivedTimeStamp = Stopwatch.GetTimestamp();
} }
public void SetCompleted() public void SetCompleted()
...@@ -108,11 +99,13 @@ public void SetCompleted() ...@@ -108,11 +99,13 @@ public void SetCompleted()
var now = Stopwatch.GetTimestamp(); var now = Stopwatch.GetTimestamp();
var oldVal = Interlocked.CompareExchange(ref CompletedTimeStamp, now, 0); var oldVal = Interlocked.CompareExchange(ref CompletedTimeStamp, now, 0);
// second call
if (oldVal != 0) return;
// only push on the first call, no dupes! // only push on the first call, no dupes!
PushToWhenFinished.Add(this); if (oldVal == 0)
{
// fake a response if we completed prematurely (timeout, broken connection, etc)
Interlocked.CompareExchange(ref ResponseReceivedTimeStamp, now, 0);
PushToWhenFinished?.Add(this);
}
} }
public override string ToString() public override string ToString()
......
using System.Collections.Generic; using System.Collections.Generic;
using System.Threading;
namespace StackExchange.Redis namespace StackExchange.Redis.Profiling
{ {
/// <summary> /// <summary>
/// <para>A collection of IProfiledCommands.</para> /// <para>A collection of IProfiledCommands.</para>
...@@ -24,13 +23,13 @@ namespace StackExchange.Redis ...@@ -24,13 +23,13 @@ namespace StackExchange.Redis
/// </summary> /// </summary>
public struct Enumerator : IEnumerator<IProfiledCommand> public struct Enumerator : IEnumerator<IProfiledCommand>
{ {
private ProfileStorage Head; private ProfiledCommand Head;
private ProfileStorage CurrentBacker; private ProfiledCommand CurrentBacker;
private bool IsEmpty => Head == null; private bool IsEmpty => Head == null;
private bool IsUnstartedOrFinished => CurrentBacker == null; private bool IsUnstartedOrFinished => CurrentBacker == null;
internal Enumerator(ProfileStorage head) internal Enumerator(ProfiledCommand head)
{ {
Head = head; Head = head;
CurrentBacker = null; CurrentBacker = null;
...@@ -81,9 +80,9 @@ public void Dispose() ...@@ -81,9 +80,9 @@ public void Dispose()
} }
} }
private readonly ProfileStorage Head; private readonly ProfiledCommand Head;
internal ProfiledCommandEnumerable(ProfileStorage head) internal ProfiledCommandEnumerable(ProfiledCommand head)
{ {
Head = head; Head = head;
} }
...@@ -101,117 +100,4 @@ internal ProfiledCommandEnumerable(ProfileStorage head) ...@@ -101,117 +100,4 @@ internal ProfiledCommandEnumerable(ProfileStorage head)
System.Collections.IEnumerator System.Collections.IEnumerable.GetEnumerator() => GetEnumerator(); System.Collections.IEnumerator System.Collections.IEnumerable.GetEnumerator() => GetEnumerator();
} }
/// <summary>
/// <para>
/// A thread-safe collection tailored to the "always append, with high contention, then enumerate once with no contention"
/// behavior of our profiling.
/// </para>
/// <para>Performs better than ConcurrentBag, which is important since profiling code shouldn't impact timings.</para>
/// </summary>
internal sealed class ConcurrentProfileStorageCollection
{
// internal for test purposes
internal static int AllocationCount = 0;
// It is, by definition, impossible for an element to be in 2 intrusive collections
// and we force Enumeration to release any reference to the collection object
// so we can **always** pool these.
private const int PoolSize = 64;
private static readonly ConcurrentProfileStorageCollection[] Pool = new ConcurrentProfileStorageCollection[PoolSize];
private volatile ProfileStorage Head;
private ConcurrentProfileStorageCollection() { }
// for testing purposes only
internal static int CountInPool()
{
var ret = 0;
for (var i = 0; i < PoolSize; i++)
{
var inPool = Pool[i];
if (inPool != null) ret++;
}
return ret;
}
/// <summary>
/// <para>This method is thread-safe.</para>
/// <para>Adds an element to the bag.</para>
/// <para>Order is not preserved.</para>
/// <para>The element can only be a member of *one* bag.</para>
/// </summary>
/// <param name="command">The command to add.</param>
public void Add(ProfileStorage command)
{
while (true)
{
var cur = Head;
command.NextElement = cur;
// Interlocked references to volatile fields are perfectly cromulent
#pragma warning disable 420
var got = Interlocked.CompareExchange(ref Head, command, cur);
#pragma warning restore 420
if (object.ReferenceEquals(got, cur)) break;
}
}
/// <summary>
/// <para>
/// This method returns an enumerable view of the bag, and returns it to
/// an internal pool for reuse by GetOrCreate().
/// </para>
/// <para>It is not thread safe.</para>
/// <para>It should only be called once the bag is finished being mutated.</para>
/// </summary>
public ProfiledCommandEnumerable EnumerateAndReturnForReuse()
{
var ret = new ProfiledCommandEnumerable(Head);
ReturnForReuse();
return ret;
}
/// <summary>
/// This returns the ConcurrentProfileStorageCollection to an internal pool for reuse by GetOrCreate().
/// </summary>
public void ReturnForReuse()
{
// no need for interlocking, this isn't a thread safe method
Head = null;
for (var i = 0; i < PoolSize; i++)
{
if (Interlocked.CompareExchange(ref Pool[i], this, null) == null) break;
}
}
/// <summary>
/// <para>Returns a ConcurrentProfileStorageCollection to use.</para>
/// <para>
/// It *may* have allocated a new one, or it may return one that has previously been released.
/// To return the collection, call EnumerateAndReturnForReuse()
/// </para>
/// </summary>
public static ConcurrentProfileStorageCollection GetOrCreate()
{
ConcurrentProfileStorageCollection found;
for (int i = 0; i < PoolSize; i++)
{
if ((found = Interlocked.Exchange(ref Pool[i], null)) != null)
{
return found;
}
}
Interlocked.Increment(ref AllocationCount);
return new ConcurrentProfileStorageCollection();
}
}
} }
using System.Threading;
namespace StackExchange.Redis.Profiling
{
/// <summary>
/// Lightweight profiling session that can be optionally registered (via ConnectionMultiplexer.RegisterProfiler) to track messages
/// </summary>
public sealed class ProfilingSession
{
/// <summary>
/// Caller-defined state object
/// </summary>
public object UserToken { get; }
/// <summary>
/// Create a new profiling session, optionally including a caller-defined state object
/// </summary>
public ProfilingSession(object userToken = null) => UserToken = userToken;
object _untypedHead;
internal void Add(ProfiledCommand command)
{
if (command == null) return;
object cur = Thread.VolatileRead(ref _untypedHead); ;
while (true)
{
command.NextElement = (ProfiledCommand)cur;
var got = Interlocked.CompareExchange(ref _untypedHead, command, cur);
if (ReferenceEquals(got, cur)) break; // successful update
cur = got; // retry; no need to re-fetch the field, we just did that
}
}
/// <summary>
/// Yield the commands that were captured as part of this session, resetting the session
/// </summary>
public ProfiledCommandEnumerable GetCommands()
{
var head = (ProfiledCommand)Interlocked.Exchange(ref _untypedHead, null);
// reverse the list so everything is ordered the way the consumer expected them
ProfiledCommand previous = null, current = head, next;
while(current != null)
{
next = current.NextElement;
current.NextElement = previous;
previous = current;
current = next;
}
return new ProfiledCommandEnumerable(previous);
}
}
}
...@@ -3212,6 +3212,8 @@ internal sealed class ExecuteMessage : Message ...@@ -3212,6 +3212,8 @@ internal sealed class ExecuteMessage : Message
{ {
private readonly string _command; private readonly string _command;
private readonly ICollection<object> args; private readonly ICollection<object> args;
public new string Command => _command;
public ExecuteMessage(int db, CommandFlags flags, string command, ICollection<object> args) : base(db, flags, RedisCommand.UNKNOWN) public ExecuteMessage(int db, CommandFlags flags, string command, ICollection<object> args) : base(db, flags, RedisCommand.UNKNOWN)
{ {
_command = command; _command = command;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment