Commit fe1c235b authored by Marc Gravell's avatar Marc Gravell

Completely rewrite the profiling public API, so that all the external consumer...

Completely rewrite the profiling public API, so that all the external consumer sees is a function that may provide profiling sessions; everything else is the caller's issue; remove all extraneous profiler tracking from the lib
parent 6b66c8c5
......@@ -36,8 +36,7 @@
[assembly: TypeForwardedTo(typeof(global::StackExchange.Redis.IConnectionMultiplexer))]
[assembly: TypeForwardedTo(typeof(global::StackExchange.Redis.IDatabase))]
[assembly: TypeForwardedTo(typeof(global::StackExchange.Redis.IDatabaseAsync))]
[assembly: TypeForwardedTo(typeof(global::StackExchange.Redis.IProfiledCommand))]
[assembly: TypeForwardedTo(typeof(global::StackExchange.Redis.IProfiler))]
[assembly: TypeForwardedTo(typeof(global::StackExchange.Redis.Profiling.IProfiledCommand))]
[assembly: TypeForwardedTo(typeof(global::StackExchange.Redis.IReconnectRetryPolicy))]
[assembly: TypeForwardedTo(typeof(global::StackExchange.Redis.IRedis))]
[assembly: TypeForwardedTo(typeof(global::StackExchange.Redis.IRedisAsync))]
......@@ -52,7 +51,8 @@
[assembly: TypeForwardedTo(typeof(global::StackExchange.Redis.LuaScript))]
[assembly: TypeForwardedTo(typeof(global::StackExchange.Redis.MigrateOptions))]
[assembly: TypeForwardedTo(typeof(global::StackExchange.Redis.Order))]
[assembly: TypeForwardedTo(typeof(global::StackExchange.Redis.ProfiledCommandEnumerable))]
[assembly: TypeForwardedTo(typeof(global::StackExchange.Redis.Profiling.ProfiledCommandEnumerable))]
[assembly: TypeForwardedTo(typeof(global::StackExchange.Redis.Profiling.ProfilingSession))]
[assembly: TypeForwardedTo(typeof(global::StackExchange.Redis.Proxy))]
[assembly: TypeForwardedTo(typeof(global::StackExchange.Redis.RedisChannel))]
[assembly: TypeForwardedTo(typeof(global::StackExchange.Redis.RedisCommandException))]
......
......@@ -5,6 +5,7 @@
using System.Net;
using System.Threading;
using System.Threading.Tasks;
using StackExchange.Redis.Profiling;
using Xunit;
using Xunit.Abstractions;
......@@ -580,34 +581,31 @@ public void GetFromRightNodeBasedOnFlags(CommandFlags flags, bool isSlave)
private static string Describe(EndPoint endpoint) => endpoint?.ToString() ?? "(unknown)";
private class TestProfiler : IProfiler
{
public object MyContext = new object();
public object GetContext() => MyContext;
}
[Fact]
public void SimpleProfiling()
{
using (var conn = Create())
{
var profiler = new TestProfiler();
var profiler = new ProfilingSession();
var key = Me();
var db = conn.GetDatabase();
db.KeyDelete(key, CommandFlags.FireAndForget);
conn.RegisterProfiler(profiler);
conn.BeginProfiling(profiler.MyContext);
conn.RegisterProfiler(() => profiler);
db.StringSet(key, "world");
var val = db.StringGet(key);
Assert.Equal("world", val);
var msgs = conn.FinishProfiling(profiler.MyContext);
var msgs = profiler.GetCommands();
Log("Checking GET...");
Assert.Contains(msgs, m => m.Command == "GET");
Log("Checking SET...");
Assert.Contains(msgs, m => m.Command == "SET");
Assert.Equal(2, msgs.Count());
var arr = msgs.ToArray();
Assert.Equal("SET", arr[0].Command);
Assert.Equal("GET", arr[1].Command);
}
}
......
......@@ -6,6 +6,7 @@
using System.Collections.Concurrent;
using Xunit;
using Xunit.Abstractions;
using StackExchange.Redis.Profiling;
namespace StackExchange.Redis.Tests
{
......@@ -13,22 +14,17 @@ public class Profiling : TestBase
{
public Profiling(ITestOutputHelper output) : base (output) { }
private class TestProfiler : IProfiler
{
public object MyContext = new object();
public object GetContext() => MyContext;
}
[Fact]
public void Simple()
{
using (var conn = Create())
{
var profiler = new TestProfiler();
var key = Me();
conn.RegisterProfiler(profiler);
conn.BeginProfiling(profiler.MyContext);
var session = new ProfilingSession();
conn.RegisterProfiler(() => session);
var dbId = TestConfig.GetDedicatedDB();
var db = conn.GetDatabase(dbId);
db.StringSet(key, "world");
......@@ -36,14 +32,18 @@ public void Simple()
Assert.Equal("world", result.AsString());
var val = db.StringGet(key);
Assert.Equal("world", (string)val);
var s = (string)db.Execute("ECHO", "fii");
Assert.Equal("fii", s);
var cmds = conn.FinishProfiling(profiler.MyContext);
var cmds = session.GetCommands();
var i = 0;
foreach (var cmd in cmds)
{
Log("Command {0} (DB: {1}): {2}", i++, cmd.Db, cmd.ToString().Replace("\n", ", "));
}
var all = string.Join(",", cmds.Select(x => x.Command));
Assert.Equal("SET,EVAL,GET,ECHO", all);
Log("Checking for SET");
var set = cmds.SingleOrDefault(cmd => cmd.Command == "SET");
Assert.NotNull(set);
......@@ -53,8 +53,11 @@ public void Simple()
Log("Checking for EVAL");
var eval = cmds.SingleOrDefault(cmd => cmd.Command == "EVAL");
Assert.NotNull(eval);
Log("Checking for ECHO");
var echo = cmds.SingleOrDefault(cmd => cmd.Command == "ECHO");
Assert.NotNull(echo);
Assert.Equal(3, cmds.Count());
Assert.Equal(4, cmds.Count());
Assert.True(set.CommandCreated <= eval.CommandCreated);
Assert.True(eval.CommandCreated <= get.CommandCreated);
......@@ -64,6 +67,10 @@ public void Simple()
AssertProfiledCommandValues(get, conn, dbId);
AssertProfiledCommandValues(eval, conn, dbId);
AssertProfiledCommandValues(echo, conn, dbId);
}
}
......@@ -86,11 +93,10 @@ public void ManyThreads()
{
using (var conn = Create())
{
var profiler = new TestProfiler();
var session = new ProfilingSession();
var prefix = Me();
conn.RegisterProfiler(profiler);
conn.BeginProfiling(profiler.MyContext);
conn.RegisterProfiler(() => session);
var threads = new List<Thread>();
const int CountPer = 100;
......@@ -115,7 +121,7 @@ public void ManyThreads()
threads.ForEach(thread => thread.Start());
threads.ForEach(thread => thread.Join());
var allVals = conn.FinishProfiling(profiler.MyContext);
var allVals = session.GetCommands();
var relevant = allVals.Where(cmd => cmd.Db > 0).ToList();
var kinds = relevant.Select(cmd => cmd.Command).Distinct().ToList();
......@@ -141,37 +147,16 @@ public void ManyThreads()
}
}
private class TestProfiler2 : IProfiler
{
private readonly ConcurrentDictionary<int, object> Contexts = new ConcurrentDictionary<int, object>();
public void RegisterContext(object context)
{
Contexts[Thread.CurrentThread.ManagedThreadId] = context;
}
public object GetContext()
{
if (!Contexts.TryGetValue(Thread.CurrentThread.ManagedThreadId, out object ret)) ret = null;
return ret;
}
}
[FactLongRunning]
public void ManyContexts()
{
using (var conn = Create())
{
var profiler = new TestProfiler2();
var profiler = new PerThreadProfiler();
var prefix = Me();
conn.RegisterProfiler(profiler);
var perThreadContexts = new List<object>();
for (var i = 0; i < 16; i++)
{
perThreadContexts.Add(new object());
}
conn.RegisterProfiler(profiler.GetSession);
var threads = new List<Thread>();
var results = new IEnumerable<IProfiledCommand>[16];
......@@ -181,10 +166,6 @@ public void ManyContexts()
var ix = i;
var thread = new Thread(() =>
{
var ctx = perThreadContexts[ix];
profiler.RegisterContext(ctx);
conn.BeginProfiling(ctx);
var db = conn.GetDatabase(ix);
var allTasks = new List<Task>();
......@@ -197,7 +178,7 @@ public void ManyContexts()
Task.WaitAll(allTasks.ToArray());
results[ix] = conn.FinishProfiling(ctx);
results[ix] = profiler.GetSession().GetCommands();
});
threads.Add(thread);
......@@ -221,198 +202,11 @@ public void ManyContexts()
}
}
private class TestProfiler3 : IProfiler
{
private readonly ConcurrentDictionary<int, object> Contexts = new ConcurrentDictionary<int, object>();
public void RegisterContext(object context)
{
Contexts[Thread.CurrentThread.ManagedThreadId] = context;
}
public object AnyContext() => Contexts.First().Value;
public void Reset() => Contexts.Clear();
public object GetContext()
{
if (!Contexts.TryGetValue(Thread.CurrentThread.ManagedThreadId, out object ret)) ret = null;
return ret;
}
}
// This is a separate method for target=DEBUG purposes.
// In release builds, the runtime is smart enough to figure out
// that the contexts are unreachable and should be collected but in
// debug builds... well, it's not very smart.
private object LeaksCollectedAndRePooled_Initialize(ConnectionMultiplexer conn, int threadCount)
{
var profiler = new TestProfiler3();
conn.RegisterProfiler(profiler);
var perThreadContexts = new List<object>();
for (var i = 0; i < threadCount; i++)
{
perThreadContexts.Add(new object());
}
var threads = new List<Thread>();
var results = new IEnumerable<IProfiledCommand>[threadCount];
for (var i = 0; i < threadCount; i++)
{
var ix = i;
var thread = new Thread(() =>
{
var ctx = perThreadContexts[ix];
profiler.RegisterContext(ctx);
conn.BeginProfiling(ctx);
var db = conn.GetDatabase(ix);
var allTasks = new List<Task>();
for (var j = 0; j < 1000; j++)
{
allTasks.Add(db.StringGetAsync("hello" + ix));
allTasks.Add(db.StringSetAsync("hello" + ix, "world" + ix));
}
Task.WaitAll(allTasks.ToArray());
// intentionally leaking!
});
threads.Add(thread);
}
threads.ForEach(t => t.Start());
threads.ForEach(t => t.Join());
var anyContext = profiler.AnyContext();
profiler.Reset();
return anyContext;
}
[FactLongRunning]
public async Task LeaksCollectedAndRePooled()
private class PerThreadProfiler
{
const int ThreadCount = 16;
using (var conn = Create())
{
var anyContext = LeaksCollectedAndRePooled_Initialize(conn, ThreadCount);
ThreadLocal<ProfilingSession> perThreadSession = new ThreadLocal<ProfilingSession>(() => new ProfilingSession());
// force collection of everything but `anyContext`
GC.Collect(3, GCCollectionMode.Forced);
GC.WaitForPendingFinalizers();
await Task.Delay(TimeSpan.FromMinutes(1.01)).ForAwait();
conn.FinishProfiling(anyContext);
// make sure we haven't left anything in the active contexts dictionary
Assert.Equal(0, conn.profiledCommands.ContextCount);
Assert.Equal(ThreadCount, ConcurrentProfileStorageCollection.AllocationCount);
Assert.Equal(ThreadCount, ConcurrentProfileStorageCollection.CountInPool());
}
}
[FactLongRunning]
public void ReuseStorage()
{
const int ThreadCount = 16;
// have to reset so other tests don't clober
ConcurrentProfileStorageCollection.AllocationCount = 0;
using (var conn = Create())
{
var profiler = new TestProfiler2();
var prefix = Me();
conn.RegisterProfiler(profiler);
var perThreadContexts = new List<object>();
for (var i = 0; i < 16; i++)
{
perThreadContexts.Add(new object());
}
var threads = new List<Thread>();
var results = new List<IEnumerable<IProfiledCommand>>[16];
for (var i = 0; i < 16; i++)
{
results[i] = new List<IEnumerable<IProfiledCommand>>();
}
for (var i = 0; i < ThreadCount; i++)
{
var ix = i;
var thread = new Thread(() =>
{
for (var k = 0; k < 10; k++)
{
var ctx = perThreadContexts[ix];
profiler.RegisterContext(ctx);
conn.BeginProfiling(ctx);
var db = conn.GetDatabase(ix);
var allTasks = new List<Task>();
for (var j = 0; j < 1000; j++)
{
allTasks.Add(db.StringGetAsync(prefix + ix));
allTasks.Add(db.StringSetAsync(prefix + ix, "world" + ix));
}
Task.WaitAll(allTasks.ToArray());
results[ix].Add(conn.FinishProfiling(ctx));
}
});
threads.Add(thread);
}
threads.ForEach(t => t.Start());
threads.ForEach(t => t.Join());
// only 16 allocations can ever be in flight at once
var allocCount = ConcurrentProfileStorageCollection.AllocationCount;
Assert.True(allocCount <= ThreadCount, allocCount.ToString());
// correctness check for all allocations
for (var i = 0; i < results.Length; i++)
{
var resList = results[i];
foreach (var res in resList)
{
Assert.NotNull(res);
var numGets = res.Count(r => r.Command == "GET");
var numSets = res.Count(r => r.Command == "SET");
Assert.Equal(1000, numGets);
Assert.Equal(1000, numSets);
Assert.True(res.All(cmd => cmd.Db == i));
}
}
// no crossed streams
var everything = results.SelectMany(r => r).ToList();
for (var i = 0; i < everything.Count; i++)
{
for (var j = 0; j < everything.Count; j++)
{
if (i == j) continue;
if (object.ReferenceEquals(everything[i], everything[j]))
{
Assert.True(false, "Profilings were jumbled");
}
}
}
}
public ProfilingSession GetSession() => perThreadSession.Value;
}
[Fact]
......@@ -422,10 +216,8 @@ public void LowAllocationEnumerable()
using (var conn = Create())
{
var profiler = new TestProfiler();
conn.RegisterProfiler(profiler);
conn.BeginProfiling(profiler.MyContext);
var session = new ProfilingSession();
conn.RegisterProfiler(() => session);
var prefix = Me();
var db = conn.GetDatabase(1);
......@@ -446,7 +238,7 @@ public void LowAllocationEnumerable()
conn.WaitAll(allTasks.ToArray());
var res = conn.FinishProfiling(profiler.MyContext);
var res = session.GetCommands();
Assert.True(res.GetType().IsValueType);
using (var e = res.GetEnumerator())
......@@ -469,16 +261,6 @@ public void LowAllocationEnumerable()
}
}
private class ToyProfiler : IProfiler
{
public ConcurrentDictionary<Thread, object> Contexts = new ConcurrentDictionary<Thread, object>();
public object GetContext()
{
if (!Contexts.TryGetValue(Thread.CurrentThread, out object ctx)) ctx = null;
return ctx;
}
}
[FactLongRunning]
public void ProfilingMD_Ex1()
......@@ -486,11 +268,10 @@ public void ProfilingMD_Ex1()
using (var c = Create())
{
ConnectionMultiplexer conn = c;
var profiler = new ToyProfiler();
var session = new ProfilingSession();
var prefix = Me();
var thisGroupContext = new object();
conn.RegisterProfiler(profiler);
conn.RegisterProfiler(() => session);
var threads = new List<Thread>();
......@@ -511,17 +292,13 @@ public void ProfilingMD_Ex1()
Task.WaitAll(threadTasks.ToArray());
});
profiler.Contexts[thread] = thisGroupContext;
threads.Add(thread);
}
conn.BeginProfiling(thisGroupContext);
threads.ForEach(thread => thread.Start());
threads.ForEach(thread => thread.Join());
IEnumerable<IProfiledCommand> timings = conn.FinishProfiling(thisGroupContext);
IEnumerable<IProfiledCommand> timings = session.GetCommands();
Assert.Equal(16000, timings.Count());
}
......@@ -533,10 +310,10 @@ public void ProfilingMD_Ex2()
using (var c = Create())
{
ConnectionMultiplexer conn = c;
var profiler = new ToyProfiler();
var profiler = new PerThreadProfiler();
var prefix = Me();
conn.RegisterProfiler(profiler);
conn.RegisterProfiler(profiler.GetSession);
var threads = new List<Thread>();
......@@ -549,9 +326,7 @@ public void ProfilingMD_Ex2()
var thread = new Thread(() =>
{
var threadTasks = new List<Task>();
conn.BeginProfiling(Thread.CurrentThread);
for (var j = 0; j < 1000; j++)
{
var task = db.StringSetAsync(prefix + j, "" + j);
......@@ -560,11 +335,9 @@ public void ProfilingMD_Ex2()
Task.WaitAll(threadTasks.ToArray());
perThreadTimings[Thread.CurrentThread] = conn.FinishProfiling(Thread.CurrentThread).ToList();
perThreadTimings[Thread.CurrentThread] = profiler.GetSession().GetCommands().ToList();
});
profiler.Contexts[thread] = thread;
threads.Add(thread);
}
......
using System;
using StackExchange.Redis.Profiling;
namespace StackExchange.Redis
{
public partial class ConnectionMultiplexer
{
private IProfiler profiler;
// internal for test purposes
internal ProfileContextTracker profiledCommands;
/// <summary>
/// <para>Sets an IProfiler instance for this ConnectionMultiplexer.</para>
/// <para>
/// An IProfiler instances is used to determine which context to associate an
/// IProfiledCommand with. See BeginProfiling(object) and FinishProfiling(object)
/// for more details.
/// </para>
/// </summary>
/// <param name="profiler">The profiler to register.</param>
public void RegisterProfiler(IProfiler profiler)
{
if (this.profiler != null) throw new InvalidOperationException("IProfiler already registered for this ConnectionMultiplexer");
this.profiler = profiler ?? throw new ArgumentNullException(nameof(profiler));
profiledCommands = new ProfileContextTracker();
}
/// <summary>
/// <para>Begins profiling for the given context.</para>
/// <para>
/// If the same context object is returned by the registered IProfiler, the IProfiledCommands
/// will be associated with each other.
/// </para>
/// <para>Call FinishProfiling with the same context to get the assocated commands.</para>
/// <para>Note that forContext cannot be a WeakReference or a WeakReference&lt;T</para>&gt;
/// </summary>
/// <param name="forContext">The context to begin profiling.</param>
public void BeginProfiling(object forContext)
{
if (profiler == null) throw new InvalidOperationException("Cannot begin profiling if no IProfiler has been registered with RegisterProfiler");
if (forContext == null) throw new ArgumentNullException(nameof(forContext));
if (forContext is WeakReference) throw new ArgumentException("Context object cannot be a WeakReference", nameof(forContext));
if (!profiledCommands.TryCreate(forContext))
{
throw ExceptionFactory.BeganProfilingWithDuplicateContext(forContext);
}
}
Func<ProfilingSession> _profilingSessionProvider;
/// <summary>
/// <para>Stops profiling for the given context, returns all IProfiledCommands associated.</para>
/// <para>By default this may do a sweep for dead profiling contexts, you can disable this by passing "allowCleanupSweep: false".</para>
/// Register a callback to provide an on-demand ambient session provider based on the
/// calling context; the implementing code is responsible for reliably resolving the same provider
/// based on ambient context, or returning null to not profile
/// </summary>
/// <param name="forContext">The context to begin profiling.</param>
/// <param name="allowCleanupSweep">Whether to allow cleanup of old profiling sessions.</param>
public ProfiledCommandEnumerable FinishProfiling(object forContext, bool allowCleanupSweep = true)
{
if (profiler == null) throw new InvalidOperationException("Cannot begin profiling if no IProfiler has been registered with RegisterProfiler");
if (forContext == null) throw new ArgumentNullException(nameof(forContext));
if (!profiledCommands.TryRemove(forContext, out ProfiledCommandEnumerable ret))
{
throw ExceptionFactory.FinishedProfilingWithInvalidContext(forContext);
}
// conditional, because it could hurt and that may sometimes be unacceptable
if (allowCleanupSweep)
{
profiledCommands.TryCleanup();
}
return ret;
}
public void RegisterProfiler(Func<ProfilingSession> profilingSessionProvider) => _profilingSessionProvider = profilingSessionProvider;
}
}
......@@ -11,6 +11,7 @@
using System.Reflection;
using System.IO.Compression;
using System.Runtime.CompilerServices;
using StackExchange.Redis.Profiling;
namespace StackExchange.Redis
{
......@@ -1868,10 +1869,10 @@ private WriteResult TryPushMessageToBridge<T>(Message message, ResultProcessor<T
if (server != null)
{
var profCtx = profiler?.GetContext();
if (profCtx != null && profiledCommands.TryGetValue(profCtx, out ConcurrentProfileStorageCollection inFlightForCtx))
var profilingSession = _profilingSessionProvider?.Invoke();
if (profilingSession != null)
{
message.SetProfileStorage(ProfileStorage.NewWithContext(inFlightForCtx, server));
message.SetProfileStorage(ProfiledCommand.NewWithContext(profilingSession, server));
}
if (message.Db >= 0)
......@@ -1952,6 +1953,7 @@ public bool IsConnecting
public void Close(bool allowCommandsToComplete = true)
{
isDisposed = true;
_profilingSessionProvider = null;
using (var tmp = pulse)
{
pulse = null;
......
......@@ -2,6 +2,7 @@
using System.IO;
using System.Net;
using System.Threading.Tasks;
using StackExchange.Redis.Profiling;
namespace StackExchange.Redis
{
......@@ -69,36 +70,11 @@ public interface IConnectionMultiplexer
int StormLogThreshold { get; set; }
/// <summary>
/// Sets an IProfiler instance for this ConnectionMultiplexer.
///
/// An IProfiler instances is used to determine which context to associate an
/// IProfiledCommand with. See BeginProfiling(object) and FinishProfiling(object)
/// for more details.
/// Register a callback to provide an on-demand ambient session provider based on the
/// calling context; the implementing code is responsible for reliably resolving the same provider
/// based on ambient context, or returning null to not profile
/// </summary>
/// <param name="profiler">The profiler to register.</param>
void RegisterProfiler(IProfiler profiler);
/// <summary>
/// Begins profiling for the given context.
///
/// If the same context object is returned by the registered IProfiler, the IProfiledCommands
/// will be associated with each other.
///
/// Call FinishProfiling with the same context to get the assocated commands.
///
/// Note that forContext cannot be a WeakReference or a WeakReference&lt;T&gt;
/// </summary>
/// <param name="forContext">The context to begin profiling for.</param>
void BeginProfiling(object forContext);
/// <summary>
/// Stops profiling for the given context, returns all IProfiledCommands associated.
///
/// By default this may do a sweep for dead profiling contexts, you can disable this by passing "allowCleanupSweep: false".
/// </summary>
/// <param name="forContext">The context to finish profiling for.</param>
/// <param name="allowCleanupSweep">Whether to allow a cleanup sweep of dead profiling contexts.</param>
ProfiledCommandEnumerable FinishProfiling(object forContext, bool allowCleanupSweep = true);
void RegisterProfiler(Func<ProfilingSession> profilingSessionProvider);
/// <summary>
/// Get summary statistics associates with this server
......
......@@ -5,6 +5,7 @@
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using StackExchange.Redis.Profiling;
namespace StackExchange.Redis
{
......@@ -81,7 +82,7 @@ internal abstract class Message : ICompletable
private ResultProcessor resultProcessor;
// All for profiling purposes
private ProfileStorage performance;
private ProfiledCommand performance;
internal DateTime createdDateTime;
internal long createdTimestamp;
......@@ -135,7 +136,7 @@ internal void SetMasterOnly()
}
}
internal void SetProfileStorage(ProfileStorage storage)
internal void SetProfileStorage(ProfiledCommand storage)
{
performance = storage;
performance.SetMessage(this);
......@@ -152,7 +153,7 @@ internal void PrepareToResend(ServerEndPoint resendTo, bool isMoved)
createdDateTime = DateTime.UtcNow;
createdTimestamp = System.Diagnostics.Stopwatch.GetTimestamp();
performance = ProfileStorage.NewAttachedToSameContext(oldPerformance, resendTo, isMoved);
performance = ProfiledCommand.NewAttachedToSameContext(oldPerformance, resendTo, isMoved);
performance.SetMessage(this);
Status = CommandStatus.WaitingToBeSent;
}
......
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
namespace StackExchange.Redis
{
/// <summary>
/// Big ol' wrapper around most of the profiling storage logic, 'cause it got too big to just live in ConnectionMultiplexer.
/// </summary>
internal sealed class ProfileContextTracker
{
/// <summary>
/// <para>Necessary, because WeakReference can't be readily comparable (since the reference is... weak).</para>
/// <para>This lets us detect leaks* with some reasonable confidence, and cleanup periodically.</para>
/// <para>
/// Some calisthenics are done to avoid allocating WeakReferences for no reason, as often
/// we're just looking up ProfileStorage.
/// </para>
/// <para>* Somebody starts profiling, but for whatever reason never *stops* with a context object</para>
/// </summary>
private readonly struct ProfileContextCell : IEquatable<ProfileContextCell>
{
// This is a union of (object|WeakReference); if it's a WeakReference
// then we're actually interested in it's Target, otherwise
// we're concerned about the actual value of Reference
private readonly object Reference;
// It is absolutely crucial that this value **never change** once instantiated
private readonly int HashCode;
public bool IsContextLeaked => !TryGetTarget(out _);
private ProfileContextCell(object forObj, bool isEphemeral)
{
HashCode = forObj.GetHashCode();
if (isEphemeral)
{
Reference = forObj;
}
else
{
Reference = new WeakReference(forObj, trackResurrection: true); // ughhh, have to handle finalizers
}
}
/// <summary>
/// <para>Suitable for use as a key into something.</para>
/// <para>
/// This instance **WILL NOT** keep forObj alive, so it can
/// be copied out of the calling method's scope.
/// </para>
/// </summary>
/// <param name="forObj">The object to get a context for.</param>
public static ProfileContextCell ToStoreUnder(object forObj) => new ProfileContextCell(forObj, isEphemeral: false);
/// <summary>
/// <para>Only suitable for looking up.</para>
/// <para>
/// This instance **ABSOLUTELY WILL** keep forObj alive, so this
/// had better not be copied into anything outside the scope of the
/// calling method.
/// </para>
/// </summary>
/// <param name="forObj">The object to lookup a context by.</param>
public static ProfileContextCell ToLookupBy(object forObj) => new ProfileContextCell(forObj, isEphemeral: true);
private bool TryGetTarget(out object target)
{
var asWeakRef = Reference as WeakReference;
if (asWeakRef == null)
{
target = Reference;
return true;
}
// Do not use IsAlive here, it's race city
target = asWeakRef.Target;
return target != null;
}
public override bool Equals(object obj)
{
if (!(obj is ProfileContextCell)) return false;
return Equals((ProfileContextCell)obj);
}
public override int GetHashCode() => HashCode;
public bool Equals(ProfileContextCell other)
{
if (other.TryGetTarget(out object otherObj) != TryGetTarget(out object thisObj)) return false;
// dead references are equal
if (thisObj == null) return true;
return thisObj.Equals(otherObj);
}
}
// provided so default behavior doesn't do any boxing, for sure
private sealed class ProfileContextCellComparer : IEqualityComparer<ProfileContextCell>
{
public static readonly ProfileContextCellComparer Singleton = new ProfileContextCellComparer();
private ProfileContextCellComparer() { }
public bool Equals(ProfileContextCell x, ProfileContextCell y)
{
return x.Equals(y);
}
public int GetHashCode(ProfileContextCell obj)
{
return obj.GetHashCode();
}
}
private long lastCleanupSweep;
private readonly ConcurrentDictionary<ProfileContextCell, ConcurrentProfileStorageCollection> profiledCommands;
public int ContextCount => profiledCommands.Count;
public ProfileContextTracker()
{
profiledCommands = new ConcurrentDictionary<ProfileContextCell, ConcurrentProfileStorageCollection>(ProfileContextCellComparer.Singleton);
lastCleanupSweep = DateTime.UtcNow.Ticks;
}
/// <summary>
/// <para>Registers the passed context with a collection that can be retried with subsequent calls to TryGetValue.</para>
/// <para>Returns false if the passed context object is already registered.</para>
/// </summary>
/// <param name="ctx">The context to use.</param>
public bool TryCreate(object ctx)
{
var cell = ProfileContextCell.ToStoreUnder(ctx);
// we can't pass this as a delegate, because TryAdd may invoke the factory multiple times,
// which would lead to over allocation.
var storage = ConcurrentProfileStorageCollection.GetOrCreate();
return profiledCommands.TryAdd(cell, storage);
}
/// <summary>
/// <para>
/// Returns true and sets val to the tracking collection associated with the given context if the context
/// was registered with TryCreate.
/// </para>
/// <para>Otherwise returns false and sets val to null.</para>
/// </summary>
/// <param name="ctx">The context to get a value for.</param>
/// <param name="val">The collection (if present) for <paramref name="ctx"/>.</param>
public bool TryGetValue(object ctx, out ConcurrentProfileStorageCollection val)
{
var cell = ProfileContextCell.ToLookupBy(ctx);
return profiledCommands.TryGetValue(cell, out val);
}
/// <summary>
/// <para>
/// Removes a context, setting all commands to a (non-thread safe) enumerable of
/// all the commands attached to that context.
/// </para>
/// <para>If the context was never registered, will return false and set commands to null.</para>
/// <para>
/// Subsequent calls to TryRemove with the same context will return false unless it is
/// re-registered with TryCreate.
/// </para>
/// </summary>
/// <param name="ctx">The context to remove for.</param>
/// <param name="commands">The commands to remove.</param>
public bool TryRemove(object ctx, out ProfiledCommandEnumerable commands)
{
var cell = ProfileContextCell.ToLookupBy(ctx);
if (!profiledCommands.TryRemove(cell, out ConcurrentProfileStorageCollection storage))
{
commands = default(ProfiledCommandEnumerable);
return false;
}
commands = storage.EnumerateAndReturnForReuse();
return true;
}
/// <summary>
/// If enough time has passed (1 minute) since the last call, this does walk of all contexts
/// and removes those that the GC has collected.
/// </summary>
public bool TryCleanup()
{
const long SweepEveryTicks = 600000000; // once a minute, tops
var now = DateTime.UtcNow.Ticks; // resolution on this isn't great, but it's good enough
var last = lastCleanupSweep;
var since = now - last;
if (since < SweepEveryTicks) return false;
// this is just to keep other threads from wasting time, in theory
// it'd be perfectly safe for this to run concurrently
var saw = Interlocked.CompareExchange(ref lastCleanupSweep, now, last);
if (saw != last) return false;
if (profiledCommands.Count == 0) return false;
using (var e = profiledCommands.GetEnumerator())
{
while (e.MoveNext())
{
var pair = e.Current;
if (pair.Key.IsContextLeaked && profiledCommands.TryRemove(pair.Key, out ConcurrentProfileStorageCollection abandoned))
{
// shove it back in the pool, but don't bother enumerating
abandoned.ReturnForReuse();
}
}
}
return true;
}
}
}
using System;
using System.Net;
namespace StackExchange.Redis
namespace StackExchange.Redis.Profiling
{
/// <summary>
/// <para>A profiled command against a redis instance.</para>
......@@ -89,21 +89,4 @@ public interface IProfiledCommand
/// </summary>
RetransmissionReasonType? RetransmissionReason { get; }
}
/// <summary>
/// Interface for profiling individual commands against an Redis ConnectionMulitplexer.
/// </summary>
public interface IProfiler
{
/// <summary>
/// Called to provide a context object.
///
/// This method is called before the method which triggers work against redis (such as StringSet(Async)) returns,
/// and will always be called on the same thread as that method.
///
/// Note that GetContext() may be called even if ConnectionMultiplexer.BeginProfiling() has not been called.
/// You may return `null` to prevent any tracking of commands.
/// </summary>
object GetContext();
}
}
......@@ -3,16 +3,16 @@
using System.Net;
using System.Threading;
namespace StackExchange.Redis
namespace StackExchange.Redis.Profiling
{
internal class ProfileStorage : IProfiledCommand
internal sealed class ProfiledCommand : IProfiledCommand
{
#region IProfiledCommand Impl
public EndPoint EndPoint => Server.EndPoint;
public int Db => Message.Db;
public string Command => Message.Command.ToString();
public string Command => Message is RedisDatabase.ExecuteMessage em ? em.Command : Message.Command.ToString();
public CommandFlags Flags => Message.Flags;
......@@ -34,11 +34,11 @@ internal class ProfileStorage : IProfiledCommand
#endregion
public ProfileStorage NextElement { get; set; }
public ProfiledCommand NextElement { get; set; }
private Message Message;
private readonly ServerEndPoint Server;
private readonly ProfileStorage OriginalProfiling;
private readonly ProfiledCommand OriginalProfiling;
private DateTime MessageCreatedDateTime;
private long MessageCreatedTimeStamp;
......@@ -47,9 +47,9 @@ internal class ProfileStorage : IProfiledCommand
private long ResponseReceivedTimeStamp;
private long CompletedTimeStamp;
private readonly ConcurrentProfileStorageCollection PushToWhenFinished;
private readonly ProfilingSession PushToWhenFinished;
private ProfileStorage(ConcurrentProfileStorageCollection pushTo, ServerEndPoint server, ProfileStorage resentFor, RetransmissionReasonType? reason)
private ProfiledCommand(ProfilingSession pushTo, ServerEndPoint server, ProfiledCommand resentFor, RetransmissionReasonType? reason)
{
PushToWhenFinished = pushTo;
OriginalProfiling = resentFor;
......@@ -57,14 +57,14 @@ private ProfileStorage(ConcurrentProfileStorageCollection pushTo, ServerEndPoint
RetransmissionReason = reason;
}
public static ProfileStorage NewWithContext(ConcurrentProfileStorageCollection pushTo, ServerEndPoint server)
public static ProfiledCommand NewWithContext(ProfilingSession pushTo, ServerEndPoint server)
{
return new ProfileStorage(pushTo, server, null, null);
return new ProfiledCommand(pushTo, server, null, null);
}
public static ProfileStorage NewAttachedToSameContext(ProfileStorage resentFor, ServerEndPoint server, bool isMoved)
public static ProfiledCommand NewAttachedToSameContext(ProfiledCommand resentFor, ServerEndPoint server, bool isMoved)
{
return new ProfileStorage(resentFor.PushToWhenFinished, server, resentFor, isMoved ? RetransmissionReasonType.Moved : RetransmissionReasonType.Ask);
return new ProfiledCommand(resentFor.PushToWhenFinished, server, resentFor, isMoved ? RetransmissionReasonType.Moved : RetransmissionReasonType.Ask);
}
public void SetMessage(Message msg)
......@@ -112,7 +112,7 @@ public void SetCompleted()
if (oldVal != 0) return;
// only push on the first call, no dupes!
PushToWhenFinished.Add(this);
PushToWhenFinished?.Add(this);
}
public override string ToString()
......
using System.Collections.Generic;
using System.Threading;
namespace StackExchange.Redis
namespace StackExchange.Redis.Profiling
{
/// <summary>
/// <para>A collection of IProfiledCommands.</para>
......@@ -24,13 +23,13 @@ namespace StackExchange.Redis
/// </summary>
public struct Enumerator : IEnumerator<IProfiledCommand>
{
private ProfileStorage Head;
private ProfileStorage CurrentBacker;
private ProfiledCommand Head;
private ProfiledCommand CurrentBacker;
private bool IsEmpty => Head == null;
private bool IsUnstartedOrFinished => CurrentBacker == null;
internal Enumerator(ProfileStorage head)
internal Enumerator(ProfiledCommand head)
{
Head = head;
CurrentBacker = null;
......@@ -81,9 +80,9 @@ public void Dispose()
}
}
private readonly ProfileStorage Head;
private readonly ProfiledCommand Head;
internal ProfiledCommandEnumerable(ProfileStorage head)
internal ProfiledCommandEnumerable(ProfiledCommand head)
{
Head = head;
}
......@@ -101,117 +100,4 @@ internal ProfiledCommandEnumerable(ProfileStorage head)
System.Collections.IEnumerator System.Collections.IEnumerable.GetEnumerator() => GetEnumerator();
}
/// <summary>
/// <para>
/// A thread-safe collection tailored to the "always append, with high contention, then enumerate once with no contention"
/// behavior of our profiling.
/// </para>
/// <para>Performs better than ConcurrentBag, which is important since profiling code shouldn't impact timings.</para>
/// </summary>
internal sealed class ConcurrentProfileStorageCollection
{
// internal for test purposes
internal static int AllocationCount = 0;
// It is, by definition, impossible for an element to be in 2 intrusive collections
// and we force Enumeration to release any reference to the collection object
// so we can **always** pool these.
private const int PoolSize = 64;
private static readonly ConcurrentProfileStorageCollection[] Pool = new ConcurrentProfileStorageCollection[PoolSize];
private volatile ProfileStorage Head;
private ConcurrentProfileStorageCollection() { }
// for testing purposes only
internal static int CountInPool()
{
var ret = 0;
for (var i = 0; i < PoolSize; i++)
{
var inPool = Pool[i];
if (inPool != null) ret++;
}
return ret;
}
/// <summary>
/// <para>This method is thread-safe.</para>
/// <para>Adds an element to the bag.</para>
/// <para>Order is not preserved.</para>
/// <para>The element can only be a member of *one* bag.</para>
/// </summary>
/// <param name="command">The command to add.</param>
public void Add(ProfileStorage command)
{
while (true)
{
var cur = Head;
command.NextElement = cur;
// Interlocked references to volatile fields are perfectly cromulent
#pragma warning disable 420
var got = Interlocked.CompareExchange(ref Head, command, cur);
#pragma warning restore 420
if (object.ReferenceEquals(got, cur)) break;
}
}
/// <summary>
/// <para>
/// This method returns an enumerable view of the bag, and returns it to
/// an internal pool for reuse by GetOrCreate().
/// </para>
/// <para>It is not thread safe.</para>
/// <para>It should only be called once the bag is finished being mutated.</para>
/// </summary>
public ProfiledCommandEnumerable EnumerateAndReturnForReuse()
{
var ret = new ProfiledCommandEnumerable(Head);
ReturnForReuse();
return ret;
}
/// <summary>
/// This returns the ConcurrentProfileStorageCollection to an internal pool for reuse by GetOrCreate().
/// </summary>
public void ReturnForReuse()
{
// no need for interlocking, this isn't a thread safe method
Head = null;
for (var i = 0; i < PoolSize; i++)
{
if (Interlocked.CompareExchange(ref Pool[i], this, null) == null) break;
}
}
/// <summary>
/// <para>Returns a ConcurrentProfileStorageCollection to use.</para>
/// <para>
/// It *may* have allocated a new one, or it may return one that has previously been released.
/// To return the collection, call EnumerateAndReturnForReuse()
/// </para>
/// </summary>
public static ConcurrentProfileStorageCollection GetOrCreate()
{
ConcurrentProfileStorageCollection found;
for (int i = 0; i < PoolSize; i++)
{
if ((found = Interlocked.Exchange(ref Pool[i], null)) != null)
{
return found;
}
}
Interlocked.Increment(ref AllocationCount);
return new ConcurrentProfileStorageCollection();
}
}
}
using System.Threading;
namespace StackExchange.Redis.Profiling
{
/// <summary>
/// Lightweight profiling session that can be optionally registered (via ConnectionMultiplexer.RegisterProfiler) to track messages
/// </summary>
public sealed class ProfilingSession
{
/// <summary>
/// Caller-defined state object
/// </summary>
public object UserToken { get; }
/// <summary>
/// Create a new profiling session, optionally including a caller-defined state object
/// </summary>
public ProfilingSession(object userToken = null) => UserToken = userToken;
object _untypedHead;
internal void Add(ProfiledCommand command)
{
if (command == null) return;
object cur = Thread.VolatileRead(ref _untypedHead); ;
while (true)
{
command.NextElement = (ProfiledCommand)cur;
var got = Interlocked.CompareExchange(ref _untypedHead, command, cur);
if (ReferenceEquals(got, cur)) break; // successful update
cur = got; // retry; no need to re-fetch the field, we just did that
}
}
/// <summary>
/// Yield the commands that were captured as part of this session, resetting the session
/// </summary>
public ProfiledCommandEnumerable GetCommands()
{
var head = (ProfiledCommand)Interlocked.Exchange(ref _untypedHead, null);
// reverse the list so everything is ordered the way the consumer expected them
ProfiledCommand previous = null, current = head, next;
while(current != null)
{
next = current.NextElement;
current.NextElement = previous;
previous = current;
current = next;
}
return new ProfiledCommandEnumerable(previous);
}
}
}
......@@ -3212,6 +3212,8 @@ internal sealed class ExecuteMessage : Message
{
private readonly string _command;
private readonly ICollection<object> args;
public new string Command => _command;
public ExecuteMessage(int db, CommandFlags flags, string command, ICollection<object> args) : base(db, flags, RedisCommand.UNKNOWN)
{
_command = command;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment