Commit f5fe0e45 authored by Marc Gravell's avatar Marc Gravell

Merge branch 'profiling-squashed-v4' of...

Merge branch 'profiling-squashed-v4' of https://github.com/kevin-montrose/StackExchange.Redis into kevin-montrose-profiling-squashed-v4

Conflicts:
	StackExchange.Redis.Tests/StackExchange.Redis.Tests.csproj
	StackExchange.Redis/StackExchange/Redis/ConnectionMultiplexer.cs
parent 0c7dd146
...@@ -49,6 +49,7 @@ ...@@ -49,6 +49,7 @@
<None Include="Events.md" /> <None Include="Events.md" />
<None Include="Configuration.md" /> <None Include="Configuration.md" />
<None Include="ExecSync.md" /> <None Include="ExecSync.md" />
<None Include="Profiling.md" />
<None Include="PubSubOrder.md" /> <None Include="PubSubOrder.md" />
<None Include="PipelinesMultiplexers.md" /> <None Include="PipelinesMultiplexers.md" />
<None Include="Transactions.md" /> <None Include="Transactions.md" />
......
Profiling
===
StackExchange.Redis exposes a handful of methods and types to enable performance profiling. Due to its asynchronous and multiplexing
behavior profiling is a somewhat complicated topic.
Interfaces
---
The profiling interface is composed of `IProfiler`, `ConnectionMultiplexer.RegisterProfiler(IProfiler)`, `ConnectionMultiplexer.BeginProfiling(object)`,
`ConnectionMultiplexer.FinishProfiling(object)`, and `IProfiledCommand`.
You register a single `IProfiler` with a `ConnectionMultiplexer` instance, it cannot be changed. You begin profiling for a given context (ie. Thread,
Http Request, and so on) by calling `BeginProfiling(object)`, and finish by calling `FinishProfiling(object)`. `FinishProfiling(object)` returns
a collection of `IProfiledCommand`s which contain timing information for all commands sent to redis by the configured `ConnectionMultiplexer` between
the `(Begin|Finish)Profiling` calls with the given context.
What "context" object should be used is application specific.
Available Timings
---
StackExchange.Redis exposes information about:
- The redis server involved
- The redis DB being queried
- The redis command run
- The flags used to route the command
- The initial creation time of a command
- How long it took to enqueue the command
- How long it took to send the command, after it was enqueued
- How long it took the response from redis to be received, after the command was sent
- How long it took for the response to be processed, after it was received
- If the command was sent in response to a cluster ASK or MOVED response
- If so, what the original command was
`TimeSpan`s are high resolution, if supported by the runtime. `DateTime`s are only as precise as `DateTime.UtcNow`.
Choosing Context
---
Due to StackExchange.Redis's asynchronous interface, profiling requires outside assistance to group related commands together. This is achieved
by providing context objects when you start and end profiling (via the `BeginProfiling(object)` & `FinishProfiling(object)` methods), and when a
command is sent (via the `IProfiler` interface's `GetContext()` method).
A toy example of assocating commands issued from many different threads together
```
class ToyProfiler : IProfiler
{
public ConcurrentDictionary<Thread, object> Contexts = new ConcurrentDictionary<Thread, object>();
public object GetContext()
{
object ctx;
if(!Contexts.TryGetValue(Thread.CurrentThread, out ctx)) ctx = null;
return ctx;
}
}
// ...
ConnectionMultiplexer conn = /* initialization */;
var profiler = new ToyProfiler();
var thisGroupContext = new object();
conn.RegisterProfiler(profiler);
var threads = new List<Thread>();
for (var i = 0; i < 16; i++)
{
var db = conn.GetDatabase(i);
var thread =
new Thread(
delegate()
{
var threadTasks = new List<Task>();
for (var j = 0; j < 1000; j++)
{
var task = db.StringSetAsync("" + j, "" + j);
threadTasks.Add(task);
}
Task.WaitAll(threadTasks.ToArray());
}
);
profiler.Contexts[thread] = thisGroupContext;
threads.Add(thread);
}
conn.BeginProfiling(thisGroupContext);
threads.ForEach(thread => thread.Start());
threads.ForEach(thread => thread.Join());
IEnumerable<IProfiledCommand> timings = conn.FinishProfiling(thisGroupContext);
```
At the end, `timings` will contain 16,000 `IProfiledCommand` objects - one for each command issued to redis.
If instead you did the following:
```
ConnectionMultiplexer conn = /* initialization */;
var profiler = new ToyProfiler();
conn.RegisterProfiler(profiler);
var threads = new List<Thread>();
var perThreadTimings = new ConcurrentDictionary<Thread, List<IProfiledCommand>>();
for (var i = 0; i < 16; i++)
{
var db = conn.GetDatabase(i);
var thread =
new Thread(
delegate()
{
var threadTasks = new List<Task>();
conn.BeginProfiling(Thread.CurrentThread);
for (var j = 0; j < 1000; j++)
{
var task = db.StringSetAsync("" + j, "" + j);
threadTasks.Add(task);
}
Task.WaitAll(threadTasks.ToArray());
perThreadTimings[Thread.CurrentThread] = conn.FinishProfiling(Thread.CurrentThread).ToList();
}
);
profiler.Contexts[thread] = thread;
threads.Add(thread);
}
threads.ForEach(thread => thread.Start());
threads.ForEach(thread => thread.Join());
```
`perThreadTimings` would end up with 16 entries of 1,000 `IProfilingCommand`s, keyed by the `Thread` the issued them.
Moving away from toy examples, here's how you can profile StackExchange.Redis in an MVC5 application.
First register the following `IProfiler` against your `ConnectionMultiplexer`:
```
public class RedisProfiler : IProfiler
{
const string RequestContextKey = "RequestProfilingContext";
public object GetContext()
{
var ctx = HttpContext.Current;
if (ctx == null) return null;
return ctx.Items[RequestContextKey];
}
public object CreateContextForCurrentRequest()
{
var ctx = HttpContext.Current;
if (ctx == null) return null;
object ret;
ctx.Items[RequestContextKey] = ret = new object();
return ret;
}
}
```
Then, add the following to your Global.asax.cs file:
```
protected void Application_BeginRequest()
{
var ctxObj = RedisProfiler.CreateContextForCurrentRequest();
if (ctxObj != null)
{
RedisConnection.BeginProfiling(ctxObj);
}
}
protected void Application_EndRequest()
{
var ctxObj = RedisProfiler.GetContext();
if (ctxObj != null)
{
var timings = RedisConnection.FinishProfiling(ctxObj);
// do what you will with `timings` here
}
}
```
This implementation will group all redis commands, including `async/await`-ed ones, with the http request that initiated them.
\ No newline at end of file
<?xml version="1.0" encoding="utf-8"?> <?xml version="1.0" encoding="utf-8"?>
<Project ToolsVersion="12.0" DefaultTargets="Build" xmlns="http://schemas.microsoft.com/developer/msbuild/2003"> <Project ToolsVersion="12.0" DefaultTargets="Build" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
<Import Project="$(MSBuildExtensionsPath)\$(MSBuildToolsVersion)\Microsoft.Common.props" Condition="Exists('$(MSBuildExtensionsPath)\$(MSBuildToolsVersion)\Microsoft.Common.props')" /> <Import Project="$(MSBuildExtensionsPath)\$(MSBuildToolsVersion)\Microsoft.Common.props" Condition="Exists('$(MSBuildExtensionsPath)\$(MSBuildToolsVersion)\Microsoft.Common.props')" />
<PropertyGroup> <PropertyGroup>
<Configuration Condition=" '$(Configuration)' == '' ">Debug</Configuration> <Configuration Condition=" '$(Configuration)' == '' ">Debug</Configuration>
<Platform Condition=" '$(Platform)' == '' ">AnyCPU</Platform> <Platform Condition=" '$(Platform)' == '' ">AnyCPU</Platform>
<ProjectGuid>{97B45B3A-34DB-43C3-A979-37F217390142}</ProjectGuid> <ProjectGuid>{97B45B3A-34DB-43C3-A979-37F217390142}</ProjectGuid>
<OutputType>Library</OutputType> <OutputType>Library</OutputType>
<AppDesignerFolder>Properties</AppDesignerFolder> <AppDesignerFolder>Properties</AppDesignerFolder>
<RootNamespace>MigratedBookSleeveTestSuite</RootNamespace> <RootNamespace>MigratedBookSleeveTestSuite</RootNamespace>
<AssemblyName>MigratedBookSleeveTestSuite</AssemblyName> <AssemblyName>MigratedBookSleeveTestSuite</AssemblyName>
<TargetFrameworkVersion>v4.5</TargetFrameworkVersion> <TargetFrameworkVersion>v4.5</TargetFrameworkVersion>
<FileAlignment>512</FileAlignment> <FileAlignment>512</FileAlignment>
</PropertyGroup> </PropertyGroup>
<PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|AnyCPU' "> <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|AnyCPU' ">
<PlatformTarget>AnyCPU</PlatformTarget> <PlatformTarget>AnyCPU</PlatformTarget>
<DebugSymbols>true</DebugSymbols> <DebugSymbols>true</DebugSymbols>
<DebugType>full</DebugType> <DebugType>full</DebugType>
<Optimize>false</Optimize> <Optimize>false</Optimize>
<OutputPath>bin\Debug\</OutputPath> <OutputPath>bin\Debug\</OutputPath>
<DefineConstants>DEBUG;TRACE</DefineConstants> <DefineConstants>DEBUG;TRACE</DefineConstants>
<ErrorReport>prompt</ErrorReport> <ErrorReport>prompt</ErrorReport>
<WarningLevel>4</WarningLevel> <WarningLevel>4</WarningLevel>
</PropertyGroup> </PropertyGroup>
<PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Release|AnyCPU' "> <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Release|AnyCPU' ">
<PlatformTarget>AnyCPU</PlatformTarget> <PlatformTarget>AnyCPU</PlatformTarget>
<DebugType>pdbonly</DebugType> <DebugType>pdbonly</DebugType>
<Optimize>true</Optimize> <Optimize>true</Optimize>
<OutputPath>bin\Release\</OutputPath> <OutputPath>bin\Release\</OutputPath>
<DefineConstants>TRACE</DefineConstants> <DefineConstants>TRACE</DefineConstants>
<ErrorReport>prompt</ErrorReport> <ErrorReport>prompt</ErrorReport>
<WarningLevel>4</WarningLevel> <WarningLevel>4</WarningLevel>
</PropertyGroup> </PropertyGroup>
<PropertyGroup> <PropertyGroup>
<StartupObject /> <StartupObject />
</PropertyGroup> </PropertyGroup>
<ItemGroup> <ItemGroup>
<Reference Include="nunit.framework, Version=2.6.4.14350, Culture=neutral, PublicKeyToken=96d09a1eb7f44a77, processorArchitecture=MSIL"> <Reference Include="nunit.framework, Version=2.6.4.14350, Culture=neutral, PublicKeyToken=96d09a1eb7f44a77, processorArchitecture=MSIL">
<SpecificVersion>False</SpecificVersion> <SpecificVersion>False</SpecificVersion>
<HintPath>..\packages\NUnit.2.6.4\lib\nunit.framework.dll</HintPath> <HintPath>..\packages\NUnit.2.6.4\lib\nunit.framework.dll</HintPath>
</Reference> </Reference>
<Reference Include="System" /> <Reference Include="System" />
<Reference Include="System.Core" /> <Reference Include="System.Core" />
<Reference Include="System.Xml.Linq" /> <Reference Include="System.Xml.Linq" />
<Reference Include="System.Data.DataSetExtensions" /> <Reference Include="System.Data.DataSetExtensions" />
<Reference Include="Microsoft.CSharp" /> <Reference Include="Microsoft.CSharp" />
<Reference Include="System.Data" /> <Reference Include="System.Data" />
<Reference Include="System.Xml" /> <Reference Include="System.Xml" />
</ItemGroup> </ItemGroup>
<ItemGroup> <ItemGroup>
<Compile Include="Batches.cs" /> <Compile Include="Batches.cs" />
<Compile Include="Config.cs" /> <Compile Include="Config.cs" />
<Compile Include="Connection.cs" /> <Compile Include="Connection.cs" />
<Compile Include="Constraints.cs" /> <Compile Include="Constraints.cs" />
<Compile Include="Hashes.cs" /> <Compile Include="Hashes.cs" />
<Compile Include="Issues\Issue10.cs" /> <Compile Include="Issues\Issue10.cs" />
<Compile Include="Issues\Massive Delete.cs" /> <Compile Include="Issues\Massive Delete.cs" />
<Compile Include="Issues\SO10504853.cs" /> <Compile Include="Issues\SO10504853.cs" />
<Compile Include="Issues\SO10825542.cs" /> <Compile Include="Issues\SO10825542.cs" />
<Compile Include="Issues\SO11766033.cs" /> <Compile Include="Issues\SO11766033.cs" />
<Compile Include="Keys.cs" /> <Compile Include="Keys.cs" />
<Compile Include="Lists.cs" /> <Compile Include="Lists.cs" />
<Compile Include="Locking.cs" /> <Compile Include="Locking.cs" />
<Compile Include="Performance.cs" /> <Compile Include="Performance.cs" />
<Compile Include="Program.cs" /> <Compile Include="Program.cs" />
<Compile Include="Properties\AssemblyInfo.cs" /> <Compile Include="Properties\AssemblyInfo.cs" />
<Compile Include="PubSub.cs" /> <Compile Include="PubSub.cs" />
<Compile Include="redis-sharp.cs" /> <Compile Include="redis-sharp.cs" />
<Compile Include="Scripting.cs" /> <Compile Include="Scripting.cs" />
<Compile Include="Server.cs" /> <Compile Include="Server.cs" />
<Compile Include="Sets.cs" /> <Compile Include="Sets.cs" />
<Compile Include="SortedSets.cs" /> <Compile Include="SortedSets.cs" />
<Compile Include="Strings.cs" /> <Compile Include="Strings.cs" />
<Compile Include="Transactions.cs" /> <Compile Include="Transactions.cs" />
</ItemGroup> </ItemGroup>
<ItemGroup> <ItemGroup>
<None Include="App.config" /> <None Include="App.config" />
<None Include="packages.config" /> <None Include="packages.config" />
</ItemGroup> </ItemGroup>
<ItemGroup> <ItemGroup>
<ProjectReference Include="..\StackExchange.Redis\StackExchange.Redis.csproj"> <ProjectReference Include="..\StackExchange.Redis\StackExchange.Redis.csproj">
<Project>{7cec07f2-8c03-4c42-b048-738b215824c1}</Project> <Project>{7cec07f2-8c03-4c42-b048-738b215824c1}</Project>
<Name>StackExchange.Redis</Name> <Name>StackExchange.Redis</Name>
</ProjectReference> </ProjectReference>
</ItemGroup> </ItemGroup>
<Import Project="$(MSBuildToolsPath)\Microsoft.CSharp.targets" /> <ItemGroup>
<Service Include="{82A7F48D-3B50-4B1E-B82E-3ADA8210C358}" />
</ItemGroup>
<Import Project="$(MSBuildToolsPath)\Microsoft.CSharp.targets" />
<!-- To modify your build process, add your task inside one of the targets below and uncomment it. <!-- To modify your build process, add your task inside one of the targets below and uncomment it.
Other similar extension points exist, see Microsoft.Common.targets. Other similar extension points exist, see Microsoft.Common.targets.
<Target Name="BeforeBuild"> <Target Name="BeforeBuild">
</Target> </Target>
<Target Name="AfterBuild"> <Target Name="AfterBuild">
</Target> </Target>
--> -->
</Project> </Project>
\ No newline at end of file
...@@ -12,8 +12,9 @@ namespace StackExchange.Redis.Tests ...@@ -12,8 +12,9 @@ namespace StackExchange.Redis.Tests
{ {
[TestFixture] [TestFixture]
public class Cluster : TestBase public class Cluster : TestBase
{ {
private const string ClusterIp = "192.168.0.15"; //private const string ClusterIp = "192.168.0.15"; // marc
private const string ClusterIp = "10.110.11.102"; // kmontrose
private const int ServerCount = 6, FirstPort = 7000; private const int ServerCount = 6, FirstPort = 7000;
protected override string GetConfiguration() protected override string GetConfiguration()
...@@ -545,6 +546,122 @@ public void GetFromRightNodeBasedOnFlags(CommandFlags flags, bool isSlave) ...@@ -545,6 +546,122 @@ public void GetFromRightNodeBasedOnFlags(CommandFlags flags, bool isSlave)
private static string Describe(EndPoint endpoint) private static string Describe(EndPoint endpoint)
{ {
return endpoint == null ? "(unknown)" : endpoint.ToString(); return endpoint == null ? "(unknown)" : endpoint.ToString();
}
class TestProfiler : IProfiler
{
public object MyContext = new object();
public object GetContext()
{
return MyContext;
}
} }
[Test]
public void SimpleProfiling()
{
using (var conn = Create())
{
var profiler = new TestProfiler();
conn.RegisterProfiler(profiler);
conn.BeginProfiling(profiler.MyContext);
var db = conn.GetDatabase();
db.StringSet("hello", "world");
var val = db.StringGet("hello");
Assert.AreEqual("world", (string)val);
var msgs = conn.FinishProfiling(profiler.MyContext);
Assert.AreEqual(2, msgs.Count());
Assert.IsTrue(msgs.Any(m => m.Command == "GET"));
Assert.IsTrue(msgs.Any(m => m.Command == "SET"));
}
}
#if DEBUG
[Test]
public void MovedProfiling()
{
const string Key = "redirected-key";
const string Value = "redirected-value";
var profiler = new TestProfiler();
using (var conn = Create())
{
conn.RegisterProfiler(profiler);
var endpoints = conn.GetEndPoints();
var servers = Array.ConvertAll(endpoints, e => conn.GetServer(e));
conn.BeginProfiling(profiler.MyContext);
var db = conn.GetDatabase();
db.KeyDelete(Key);
db.StringSet(Key, Value);
var config = servers.First().ClusterConfiguration;
Assert.IsNotNull(config);
int slot = conn.HashSlot(Key);
var rightMasterNode = config.GetBySlot(Key);
Assert.IsNotNull(rightMasterNode);
string a = conn.GetServer(rightMasterNode.EndPoint).StringGet(db.Database, Key);
Assert.AreEqual(Value, a, "right master");
var wrongMasterNode = config.Nodes.FirstOrDefault(x => !x.IsSlave && x.NodeId != rightMasterNode.NodeId);
Assert.IsNotNull(wrongMasterNode);
string b = conn.GetServer(wrongMasterNode.EndPoint).StringGet(db.Database, Key);
Assert.AreEqual(Value, b, "wrong master, allow redirect");
var msgs = conn.FinishProfiling(profiler.MyContext).ToList();
// verify that things actually got recorded properly, and the retransmission profilings are connected as expected
{
// expect 1 DEL, 1 SET, 1 GET (to right master), 1 GET (to wrong master) that was responded to by an ASK, and 1 GET (to right master or a slave of it)
Assert.AreEqual(5, msgs.Count);
Assert.AreEqual(1, msgs.Count(c => c.Command == "DEL"));
Assert.AreEqual(1, msgs.Count(c => c.Command == "SET"));
Assert.AreEqual(3, msgs.Count(c => c.Command == "GET"));
var toRightMasterNotRetransmission = msgs.Where(m => m.Command == "GET" && m.EndPoint.Equals(rightMasterNode.EndPoint) && m.RetransmissionOf == null);
Assert.AreEqual(1, toRightMasterNotRetransmission.Count());
var toWrongMasterWithoutRetransmission = msgs.Where(m => m.Command == "GET" && m.EndPoint.Equals(wrongMasterNode.EndPoint) && m.RetransmissionOf == null);
Assert.AreEqual(1, toWrongMasterWithoutRetransmission.Count());
var toRightMasterOrSlaveAsRetransmission = msgs.Where(m => m.Command == "GET" && (m.EndPoint.Equals(rightMasterNode.EndPoint) || rightMasterNode.Children.Any(c => m.EndPoint.Equals(c.EndPoint))) && m.RetransmissionOf != null);
Assert.AreEqual(1, toRightMasterOrSlaveAsRetransmission.Count());
var originalWrongMaster = toWrongMasterWithoutRetransmission.Single();
var retransmissionToRight = toRightMasterOrSlaveAsRetransmission.Single();
Assert.IsTrue(object.ReferenceEquals(originalWrongMaster, retransmissionToRight.RetransmissionOf));
}
foreach(var msg in msgs)
{
Assert.IsTrue(msg.CommandCreated != default(DateTime));
Assert.IsTrue(msg.CreationToEnqueued > TimeSpan.Zero);
Assert.IsTrue(msg.EnqueuedToSending > TimeSpan.Zero);
Assert.IsTrue(msg.SentToResponse > TimeSpan.Zero);
Assert.IsTrue(msg.ResponseToCompletion > TimeSpan.Zero);
Assert.IsTrue(msg.ElapsedTime > TimeSpan.Zero);
if (msg.RetransmissionOf != null)
{
// imprecision of DateTime.UtcNow makes this pretty approximate
Assert.IsTrue(msg.RetransmissionOf.CommandCreated <= msg.CommandCreated);
Assert.AreEqual(RetransmissionReasonType.Moved, msg.RetransmissionReason.Value);
}
else
{
Assert.IsFalse(msg.RetransmissionReason.HasValue);
}
}
}
}
#endif
} }
} }
This diff is collapsed.
...@@ -69,10 +69,11 @@ ...@@ -69,10 +69,11 @@
<Compile Include="AsyncTests.cs" /> <Compile Include="AsyncTests.cs" />
<Compile Include="BasicOps.cs" /> <Compile Include="BasicOps.cs" />
<Compile Include="ConnectingFailDetection.cs" /> <Compile Include="ConnectingFailDetection.cs" />
<Compile Include="ConnectToUnexistingHost.cs" /> <Compile Include="ConnectToUnexistingHost.cs" />
<Compile Include="HyperLogLog.cs" /> <Compile Include="HyperLogLog.cs" />
<Compile Include="Issues\DefaultDatabase.cs" /> <Compile Include="Issues\DefaultDatabase.cs" />
<Compile Include="Issues\Issue182.cs" /> <Compile Include="Profiling.cs" />
<Compile Include="Issues\Issue182.cs" />
<Compile Include="WrapperBaseTests.cs" /> <Compile Include="WrapperBaseTests.cs" />
<Compile Include="TransactionWrapperTests.cs" /> <Compile Include="TransactionWrapperTests.cs" />
<Compile Include="Bits.cs" /> <Compile Include="Bits.cs" />
...@@ -134,6 +135,9 @@ ...@@ -134,6 +135,9 @@
<Name>StackExchange.Redis</Name> <Name>StackExchange.Redis</Name>
</ProjectReference> </ProjectReference>
</ItemGroup> </ItemGroup>
<ItemGroup>
<Service Include="{82A7F48D-3B50-4B1E-B82E-3ADA8210C358}" />
</ItemGroup>
<Import Project="$(MSBuildToolsPath)\Microsoft.CSharp.targets" /> <Import Project="$(MSBuildToolsPath)\Microsoft.CSharp.targets" />
<!-- To modify your build process, add your task inside one of the targets below and uncomment it. <!-- To modify your build process, add your task inside one of the targets below and uncomment it.
Other similar extension points exist, see Microsoft.Common.targets. Other similar extension points exist, see Microsoft.Common.targets.
......
...@@ -70,10 +70,17 @@ ...@@ -70,10 +70,17 @@
<ItemGroup> <ItemGroup>
<Compile Include="StackExchange\Redis\Aggregate.cs" /> <Compile Include="StackExchange\Redis\Aggregate.cs" />
<Compile Include="StackExchange\Redis\ClientType.cs" /> <Compile Include="StackExchange\Redis\ClientType.cs" />
<Compile Include="StackExchange\Redis\ConcurrentProfileStorageCollection.cs" />
<Compile Include="StackExchange\Redis\ConnectionMultiplexer.Profiling.cs">
<DependentUpon>ConnectionMultiplexer.cs</DependentUpon>
</Compile>
<Compile Include="StackExchange\Redis\ExtensionMethods.cs" /> <Compile Include="StackExchange\Redis\ExtensionMethods.cs" />
<Compile Include="StackExchange\Redis\HashEntry.cs" /> <Compile Include="StackExchange\Redis\HashEntry.cs" />
<Compile Include="StackExchange\Redis\InternalErrorEventArgs.cs" /> <Compile Include="StackExchange\Redis\InternalErrorEventArgs.cs" />
<Compile Include="StackExchange\Redis\IProfiler.cs" />
<Compile Include="StackExchange\Redis\MigrateOptions.cs" /> <Compile Include="StackExchange\Redis\MigrateOptions.cs" />
<Compile Include="StackExchange\Redis\ProfileContextTracker.cs" />
<Compile Include="StackExchange\Redis\ProfileStorage.cs" />
<Compile Include="StackExchange\Redis\LuaScript.cs" /> <Compile Include="StackExchange\Redis\LuaScript.cs" />
<Compile Include="StackExchange\Redis\RedisChannel.cs" /> <Compile Include="StackExchange\Redis\RedisChannel.cs" />
<Compile Include="StackExchange\Redis\Bitwise.cs" /> <Compile Include="StackExchange\Redis\Bitwise.cs" />
......
...@@ -64,11 +64,18 @@ ...@@ -64,11 +64,18 @@
<ItemGroup> <ItemGroup>
<Compile Include="StackExchange\Redis\Aggregate.cs" /> <Compile Include="StackExchange\Redis\Aggregate.cs" />
<Compile Include="StackExchange\Redis\ClientType.cs" /> <Compile Include="StackExchange\Redis\ClientType.cs" />
<Compile Include="StackExchange\Redis\ConcurrentProfileStorageCollection.cs" />
<Compile Include="StackExchange\Redis\ConnectionMultiplexer.Profiling.cs">
<DependentUpon>ConnectionMultiplexer.cs</DependentUpon>
</Compile>
<Compile Include="StackExchange\Redis\ExtensionMethods.cs" /> <Compile Include="StackExchange\Redis\ExtensionMethods.cs" />
<Compile Include="StackExchange\Redis\HashEntry.cs" /> <Compile Include="StackExchange\Redis\HashEntry.cs" />
<Compile Include="StackExchange\Redis\InternalErrorEventArgs.cs" /> <Compile Include="StackExchange\Redis\InternalErrorEventArgs.cs" />
<Compile Include="StackExchange\Redis\MigrateOptions.cs" /> <Compile Include="StackExchange\Redis\IProfiler.cs" />
<Compile Include="StackExchange\Redis\LuaScript.cs" /> <Compile Include="StackExchange\Redis\MigrateOptions.cs" />
<Compile Include="StackExchange\Redis\ProfileContextTracker.cs" />
<Compile Include="StackExchange\Redis\ProfileStorage.cs" />
<Compile Include="StackExchange\Redis\LuaScript.cs" />
<Compile Include="StackExchange\Redis\RedisChannel.cs" /> <Compile Include="StackExchange\Redis\RedisChannel.cs" />
<Compile Include="StackExchange\Redis\Bitwise.cs" /> <Compile Include="StackExchange\Redis\Bitwise.cs" />
<Compile Include="StackExchange\Redis\ClientFlags.cs" /> <Compile Include="StackExchange\Redis\ClientFlags.cs" />
......
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace StackExchange.Redis
{
/// <summary>
/// A collection of IProfiledCommands.
///
/// This is a very light weight data structure, only supporting enumeration.
///
/// While it implements IEnumerable, it there are fewer allocations if one uses
/// it's explicit GetEnumerator() method. Using `foreach` does this automatically.
///
/// This type is not threadsafe.
/// </summary>
public struct ProfiledCommandEnumerable : IEnumerable<IProfiledCommand>
{
/// <summary>
/// Implements IEnumerator for ProfiledCommandEnumerable.
/// This implementation is comparable to List.Enumerator and Dictionary.Enumerator,
/// and is provided to reduce allocations in the common (ie. foreach) case.
///
/// This type is not threadsafe.
/// </summary>
public struct Enumerator : IEnumerator<IProfiledCommand>
{
ProfileStorage Head;
ProfileStorage CurrentBacker;
bool IsEmpty { get { return Head == null; } }
bool IsUnstartedOrFinished { get { return CurrentBacker == null; } }
internal Enumerator(ProfileStorage head)
{
Head = head;
CurrentBacker = null;
}
/// <summary>
/// The current element.
/// </summary>
public IProfiledCommand Current
{
get { return CurrentBacker; }
}
object System.Collections.IEnumerator.Current
{
get { return CurrentBacker; }
}
/// <summary>
/// Advances the enumeration, returning true if there is a new element to consume and false
/// if enumeration is complete.
/// </summary>
public bool MoveNext()
{
if (IsEmpty) return false;
if (IsUnstartedOrFinished)
{
CurrentBacker = Head;
}
else
{
CurrentBacker = CurrentBacker.NextElement;
}
return CurrentBacker != null;
}
/// <summary>
/// Resets the enumeration.
/// </summary>
public void Reset()
{
CurrentBacker = null;
}
/// <summary>
/// Disposes the enumeration.
/// subsequent attempts to enumerate results in undefined behavior.
/// </summary>
public void Dispose()
{
CurrentBacker = Head = null;
}
}
ProfileStorage Head;
internal ProfiledCommandEnumerable(ProfileStorage head)
{
Head = head;
}
/// <summary>
/// Returns an implementor of IEnumerator that, provided it isn't accessed
/// though an interface, avoids allocations.
///
/// `foreach` will automatically use this method.
/// </summary>
public Enumerator GetEnumerator()
{
return new Enumerator(Head);
}
IEnumerator<IProfiledCommand> IEnumerable<IProfiledCommand>.GetEnumerator()
{
return GetEnumerator();
}
System.Collections.IEnumerator System.Collections.IEnumerable.GetEnumerator()
{
return GetEnumerator();
}
}
/// <summary>
/// A thread-safe collection tailored to the "always append, with high contention, then enumerate once with no contention"
/// behavior of our profiling.
///
/// Performs better than ConcurrentBag, which is important since profiling code shouldn't impact timings.
/// </summary>
sealed class ConcurrentProfileStorageCollection
{
// internal for test purposes
internal static int AllocationCount = 0;
// It is, by definition, impossible for an element to be in 2 intrusive collections
// and we force Enumeration to release any reference to the collection object
// so we can **always** pool these.
const int PoolSize = 64;
static ConcurrentProfileStorageCollection[] Pool = new ConcurrentProfileStorageCollection[PoolSize];
volatile ProfileStorage Head;
private ConcurrentProfileStorageCollection() { }
// for testing purposes only
internal static int CountInPool()
{
var ret = 0;
for (var i = 0; i < PoolSize; i++)
{
var inPool = Pool[i];
if (inPool != null) ret++;
}
return ret;
}
/// <summary>
/// This method is thread-safe.
///
/// Adds an element to the bag.
///
/// Order is not preserved.
///
/// The element can only be a member of *one* bag.
/// </summary>
public void Add(ProfileStorage command)
{
do
{
var cur = Head;
command.NextElement = cur;
// Interlocked references to volatile fields are perfectly cromulent
#pragma warning disable 420
var got = Interlocked.CompareExchange(ref Head, command, cur);
#pragma warning restore 420
if (object.ReferenceEquals(got, cur)) break;
} while (true);
}
/// <summary>
/// This method returns an enumerable view of the bag, and returns it to
/// an internal pool for reuse by GetOrCreate().
///
/// It is not thread safe.
///
/// It should only be called once the bag is finished being mutated.
/// </summary>
public ProfiledCommandEnumerable EnumerateAndReturnForReuse()
{
var ret = new ProfiledCommandEnumerable(Head);
ReturnForReuse();
return ret;
}
/// <summary>
/// This returns the ConcurrentProfileStorageCollection to an internal pool for reuse by GetOrCreate().
/// </summary>
public void ReturnForReuse()
{
// no need for interlocking, this isn't a thread safe method
Head = null;
for (var i = 0; i < PoolSize; i++)
{
if (Interlocked.CompareExchange(ref Pool[i], this, null) == null) break;
}
}
/// <summary>
/// Returns a ConcurrentProfileStorageCollection to use.
///
/// It *may* have allocated a new one, or it may return one that has previously been released.
/// To return the collection, call EnumerateAndReturnForReuse()
/// </summary>
public static ConcurrentProfileStorageCollection GetOrCreate()
{
ConcurrentProfileStorageCollection found;
for (int i = 0; i < PoolSize; i++)
{
if ((found = Interlocked.Exchange(ref Pool[i], null)) != null)
{
return found;
}
}
Interlocked.Increment(ref AllocationCount);
found = new ConcurrentProfileStorageCollection();
return found;
}
}
}
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace StackExchange.Redis
{
partial class ConnectionMultiplexer
{
private IProfiler profiler;
// internal for test purposes
internal ProfileContextTracker profiledCommands;
/// <summary>
/// Sets an IProfiler instance for this ConnectionMultiplexer.
///
/// An IProfiler instances is used to determine which context to associate an
/// IProfiledCommand with. See BeginProfiling(object) and FinishProfiling(object)
/// for more details.
/// </summary>
public void RegisterProfiler(IProfiler profiler)
{
if (profiler == null) throw new ArgumentNullException("profiler");
if (this.profiler != null) throw new InvalidOperationException("IProfiler already registered for this ConnectionMultiplexer");
this.profiler = profiler;
this.profiledCommands = new ProfileContextTracker();
}
/// <summary>
/// Begins profiling for the given context.
///
/// If the same context object is returned by the registered IProfiler, the IProfiledCommands
/// will be associated with each other.
///
/// Call FinishProfiling with the same context to get the assocated commands.
///
/// Note that forContext cannot be a WeakReference or a WeakReference&lt;T&gt;
/// </summary>
public void BeginProfiling(object forContext)
{
if (profiler == null) throw new InvalidOperationException("Cannot begin profiling if no IProfiler has been registered with RegisterProfiler");
if (forContext == null) throw new ArgumentNullException("forContext");
if (forContext is WeakReference) throw new ArgumentException("Context object cannot be a WeakReference", "forContext");
if (!profiledCommands.TryCreate(forContext))
{
throw ExceptionFactory.BeganProfilingWithDuplicateContext(forContext);
}
}
/// <summary>
/// Stops profiling for the given context, returns all IProfiledCommands associated.
///
/// By default this may do a sweep for dead profiling contexts, you can disable this by passing "allowCleanupSweep: false".
/// </summary>
public ProfiledCommandEnumerable FinishProfiling(object forContext, bool allowCleanupSweep = true)
{
if (profiler == null) throw new InvalidOperationException("Cannot begin profiling if no IProfiler has been registered with RegisterProfiler");
if (forContext == null) throw new ArgumentNullException("forContext");
ProfiledCommandEnumerable ret;
if (!profiledCommands.TryRemove(forContext, out ret))
{
throw ExceptionFactory.FinishedProfilingWithInvalidContext(forContext);
}
// conditional, because it could hurt and that may sometimes be unacceptable
if (allowCleanupSweep)
{
profiledCommands.TryCleanup();
}
return ret;
}
}
}
...@@ -9,6 +9,7 @@ ...@@ -9,6 +9,7 @@
using System.Text; using System.Text;
using System.Threading; using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
using System.Collections.Concurrent;
#if NET40 #if NET40
using Microsoft.Runtime.CompilerServices; using Microsoft.Runtime.CompilerServices;
#else #else
...@@ -1639,8 +1640,23 @@ private bool TryPushMessageToBridge<T>(Message message, ResultProcessor<T> proce ...@@ -1639,8 +1640,23 @@ private bool TryPushMessageToBridge<T>(Message message, ResultProcessor<T> proce
server = null; server = null;
} }
} }
if (server != null) if (server != null)
{ {
if (profiler != null)
{
var profCtx = profiler.GetContext();
if(profCtx != null)
{
ConcurrentProfileStorageCollection inFlightForCtx;
if (profiledCommands.TryGetValue(profCtx, out inFlightForCtx))
{
message.SetProfileStorage(ProfileStorage.NewWithContext(inFlightForCtx, server));
}
}
}
if (message.Db >= 0) if (message.Db >= 0)
{ {
int availableDatabases = server.Databases; int availableDatabases = server.Databases;
...@@ -1782,6 +1798,7 @@ internal Task<T> ExecuteAsyncImpl<T>(Message message, ResultProcessor<T> process ...@@ -1782,6 +1798,7 @@ internal Task<T> ExecuteAsyncImpl<T>(Message message, ResultProcessor<T> process
{ {
return CompletedTask<T>.Default(state); return CompletedTask<T>.Default(state);
} }
if (message.IsFireAndForget) if (message.IsFireAndForget)
{ {
TryPushMessageToBridge(message, processor, null, ref server); TryPushMessageToBridge(message, processor, null, ref server);
......
...@@ -117,5 +117,19 @@ internal static Exception UnableToConnect(string failureMessage=null) ...@@ -117,5 +117,19 @@ internal static Exception UnableToConnect(string failureMessage=null)
return new RedisConnectionException(ConnectionFailureType.UnableToConnect, return new RedisConnectionException(ConnectionFailureType.UnableToConnect,
"It was not possible to connect to the redis server(s); to create a disconnected multiplexer, disable AbortOnConnectFail. " + failureMessage); "It was not possible to connect to the redis server(s); to create a disconnected multiplexer, disable AbortOnConnectFail. " + failureMessage);
} }
internal static Exception BeganProfilingWithDuplicateContext(object forContext)
{
var exc = new InvalidOperationException("Attempted to begin profiling for the same context twice");
exc.Data["forContext"] = forContext;
return exc;
}
internal static Exception FinishedProfilingWithInvalidContext(object forContext)
{
var exc = new InvalidOperationException("Attempted to finish profiling for a context which is no longer valid, or was never begun");
exc.Data["forContext"] = forContext;
return exc;
}
} }
} }
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Text;
using System.Threading.Tasks;
namespace StackExchange.Redis
{
/// <summary>
/// If an IProfiledCommand is a retransmission of a previous command, this enum
/// is used to indicate what prompted the retransmission.
///
/// This can be used to distinguish between transient causes (moving hashslots, joining nodes, etc.)
/// and incorrect routing.
/// </summary>
public enum RetransmissionReasonType
{
/// <summary>
/// No stated reason
/// </summary>
None = 0,
/// <summary>
/// Issued to investigate which node owns a key
/// </summary>
Ask,
/// <summary>
/// A node has indicated that it does *not* own the given key
/// </summary>
Moved
}
/// <summary>
/// A profiled command against a redis instance.
///
/// TimeSpans returned by this interface use a high precision timer if possible.
/// DateTimes returned by this interface are no more precise than DateTime.UtcNow.
/// </summary>
public interface IProfiledCommand
{
/// <summary>
/// The endpoint this command was sent to.
/// </summary>
EndPoint EndPoint { get; }
/// <summary>
/// The Db this command was sent to.
/// </summary>
int Db { get; }
/// <summary>
/// The name of this command.
/// </summary>
string Command { get; }
/// <summary>
/// The CommandFlags the command was submitted with.
/// </summary>
CommandFlags Flags { get; }
/// <summary>
/// When this command was *created*, will be approximately
/// when the paired method of StackExchange.Redis was called but
/// before that method returned.
///
/// Note that the resolution of the returned DateTime is limited by DateTime.UtcNow.
/// </summary>
DateTime CommandCreated { get; }
/// <summary>
/// How long this command waited to be added to the queue of pending
/// redis commands. A large TimeSpan indicates serious contention for
/// the pending queue.
/// </summary>
TimeSpan CreationToEnqueued { get; }
/// <summary>
/// How long this command spent in the pending queue before being sent to redis.
/// A large TimeSpan can indicate a large number of pending events, large pending events,
/// or network issues.
/// </summary>
TimeSpan EnqueuedToSending { get; }
/// <summary>
/// How long before Redis responded to this command and it's response could be handled after it was sent.
/// A large TimeSpan can indicate a large response body, an overtaxed redis instance, or network issues.
/// </summary>
TimeSpan SentToResponse { get; }
/// <summary>
/// How long between Redis responding to this command and awaiting consumers being notified.
/// </summary>
TimeSpan ResponseToCompletion { get; }
/// <summary>
/// How long it took this redis command to be processed, from creation to deserializing the final resposne.
///
/// Note that this TimeSpan *does not* include time spent awaiting a Task in consumer code.
/// </summary>
TimeSpan ElapsedTime { get; }
/// <summary>
/// If a command has to be resent due to an ASK or MOVED response from redis (in a cluster configuration),
/// the second sending of the command will have this property set to the original IProfiledCommand.
///
/// This can only be set if redis is configured as a cluster.
/// </summary>
IProfiledCommand RetransmissionOf { get; }
/// <summary>
/// If RetransmissionOf is not null, this property will be set to either Ask or Moved to indicate
/// what sort of response triggered the retransmission.
///
/// This can be useful for determining the root cause of extra commands.
/// </summary>
RetransmissionReasonType? RetransmissionReason { get; }
}
/// <summary>
/// Interface for profiling individual commands against an Redis ConnectionMulitplexer.
/// </summary>
public interface IProfiler
{
/// <summary>
/// Called to provide a context object.
///
/// This method is called before the method which triggers work against redis (such as StringSet(Async)) returns,
/// and will always be called on the same thread as that method.
///
/// Note that GetContext() may be called even if ConnectionMultiplexer.BeginProfiling() has not been called.
/// You may return `null` to prevent any tracking of commands.
/// </summary>
object GetContext();
}
}
...@@ -122,6 +122,7 @@ internal override void WriteImpl(PhysicalConnection physical) ...@@ -122,6 +122,7 @@ internal override void WriteImpl(PhysicalConnection physical)
public TextWriter Log { get { return log; } } public TextWriter Log { get { return log; } }
} }
abstract class Message : ICompletable abstract class Message : ICompletable
{ {
...@@ -145,7 +146,12 @@ abstract class Message : ICompletable ...@@ -145,7 +146,12 @@ abstract class Message : ICompletable
internal CommandFlags FlagsRaw { get { return flags; } set { flags = value; } } internal CommandFlags FlagsRaw { get { return flags; } set { flags = value; } }
private ResultBox resultBox; private ResultBox resultBox;
private ResultProcessor resultProcessor; private ResultProcessor resultProcessor;
// All for profiling purposes
private ProfileStorage performance;
internal DateTime createdDateTime;
internal long createdTimestamp;
protected Message(int db, CommandFlags flags, RedisCommand command) protected Message(int db, CommandFlags flags, RedisCommand command)
{ {
...@@ -183,7 +189,31 @@ protected Message(int db, CommandFlags flags, RedisCommand command) ...@@ -183,7 +189,31 @@ protected Message(int db, CommandFlags flags, RedisCommand command)
} }
this.Db = db; this.Db = db;
this.command = command; this.command = command;
this.flags = flags & UserSelectableFlags; this.flags = flags & UserSelectableFlags;
createdDateTime = DateTime.UtcNow;
createdTimestamp = System.Diagnostics.Stopwatch.GetTimestamp();
}
internal void SetProfileStorage(ProfileStorage storage)
{
performance = storage;
performance.SetMessage(this);
}
internal void PrepareToResend(ServerEndPoint resendTo, bool isMoved)
{
if (performance == null) return;
var oldPerformance = performance;
oldPerformance.SetCompleted();
performance = null;
createdDateTime = DateTime.UtcNow;
createdTimestamp = System.Diagnostics.Stopwatch.GetTimestamp();
performance = ProfileStorage.NewAttachedToSameContext(oldPerformance, resendTo, isMoved);
performance.SetMessage(this);
} }
public RedisCommand Command { get { return command; } } public RedisCommand Command { get { return command; } }
...@@ -456,17 +486,34 @@ public override string ToString() ...@@ -456,17 +486,34 @@ public override string ToString()
{ {
return string.Format("[{0}]:{1} ({2})", Db, CommandAndKey, return string.Format("[{0}]:{1} ({2})", Db, CommandAndKey,
resultProcessor == null ? "(n/a)" : resultProcessor.GetType().Name); resultProcessor == null ? "(n/a)" : resultProcessor.GetType().Name);
}
public void SetResponseReceived()
{
if (performance != null)
{
performance.SetResponseReceived();
}
} }
public bool TryComplete(bool isAsync) public bool TryComplete(bool isAsync)
{ {
if (resultBox != null) if (resultBox != null)
{ {
return resultBox.TryComplete(isAsync); var ret = resultBox.TryComplete(isAsync);
if (performance != null)
{
performance.SetCompleted();
}
return ret;
} }
else else
{ {
ConnectionMultiplexer.TraceWithoutContext("No result-box to complete for " + Command, "Message"); ConnectionMultiplexer.TraceWithoutContext("No result-box to complete for " + Command, "Message");
if (performance != null)
{
performance.SetCompleted();
}
return true; return true;
} }
} }
...@@ -600,6 +647,22 @@ internal void Fail(ConnectionFailureType failure, Exception innerException) ...@@ -600,6 +647,22 @@ internal void Fail(ConnectionFailureType failure, Exception innerException)
{ {
resultProcessor.ConnectionFail(this, failure, innerException); resultProcessor.ConnectionFail(this, failure, innerException);
} }
}
internal void SetEnqueued()
{
if(performance != null)
{
performance.SetEnqueued();
}
}
internal void SetRequestSent()
{
if (performance != null)
{
performance.SetRequestSent();
}
} }
internal void SetAsking(bool value) internal void SetAsking(bool value)
......
...@@ -136,7 +136,8 @@ public bool TryEnqueue(Message message, bool isSlave) ...@@ -136,7 +136,8 @@ public bool TryEnqueue(Message message, bool isSlave)
{ {
// you can go in the queue, but we won't be starting // you can go in the queue, but we won't be starting
// a worker, because the handshake has not completed // a worker, because the handshake has not completed
queue.Push(message); queue.Push(message);
message.SetEnqueued();
return true; return true;
} }
else else
...@@ -146,7 +147,8 @@ public bool TryEnqueue(Message message, bool isSlave) ...@@ -146,7 +147,8 @@ public bool TryEnqueue(Message message, bool isSlave)
} }
} }
bool reqWrite = queue.Push(message); bool reqWrite = queue.Push(message);
message.SetEnqueued();
LogNonPreferred(message.Flags, isSlave); LogNonPreferred(message.Flags, isSlave);
Trace("Now pending: " + GetPendingCount()); Trace("Now pending: " + GetPendingCount());
...@@ -547,7 +549,10 @@ internal bool WriteMessageDirect(PhysicalConnection tmp, Message next) ...@@ -547,7 +549,10 @@ internal bool WriteMessageDirect(PhysicalConnection tmp, Message next)
CompleteSyncOrAsync(next); CompleteSyncOrAsync(next);
return false; return false;
} }
} }
next.SetRequestSent();
return true; return true;
} }
else else
...@@ -742,7 +747,8 @@ private void SelectDatabase(PhysicalConnection connection, Message message) ...@@ -742,7 +747,8 @@ private void SelectDatabase(PhysicalConnection connection, Message message)
if (sel != null) if (sel != null)
{ {
connection.Enqueue(sel); connection.Enqueue(sel);
sel.WriteImpl(connection); sel.WriteImpl(connection);
sel.SetRequestSent();
IncrementOpCount(); IncrementOpCount();
} }
} }
...@@ -768,7 +774,8 @@ private bool WriteMessageToServer(PhysicalConnection connection, Message message ...@@ -768,7 +774,8 @@ private bool WriteMessageToServer(PhysicalConnection connection, Message message
if (readmode != null) if (readmode != null)
{ {
connection.Enqueue(readmode); connection.Enqueue(readmode);
readmode.WriteTo(connection); readmode.WriteTo(connection);
readmode.SetRequestSent();
IncrementOpCount(); IncrementOpCount();
} }
...@@ -776,7 +783,8 @@ private bool WriteMessageToServer(PhysicalConnection connection, Message message ...@@ -776,7 +783,8 @@ private bool WriteMessageToServer(PhysicalConnection connection, Message message
{ {
var asking = ReusableAskingCommand; var asking = ReusableAskingCommand;
connection.Enqueue(asking); connection.Enqueue(asking);
asking.WriteImpl(connection); asking.WriteImpl(connection);
asking.SetRequestSent();
IncrementOpCount(); IncrementOpCount();
} }
} }
...@@ -794,7 +802,8 @@ private bool WriteMessageToServer(PhysicalConnection connection, Message message ...@@ -794,7 +802,8 @@ private bool WriteMessageToServer(PhysicalConnection connection, Message message
} }
connection.Enqueue(message); connection.Enqueue(message);
message.WriteImpl(connection); message.WriteImpl(connection);
message.SetRequestSent();
IncrementOpCount(); IncrementOpCount();
// some commands smash our ability to trust the database; some commands // some commands smash our ability to trust the database; some commands
......
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace StackExchange.Redis
{
/// <summary>
/// Big ol' wrapper around most of the profiling storage logic, 'cause it got too big to just live in ConnectionMultiplexer.
/// </summary>
sealed class ProfileContextTracker
{
/// <summary>
/// Necessary, because WeakReference can't be readily comparable (since the reference is... weak).
///
/// This lets us detect leaks* with some reasonable confidence, and cleanup periodically.
///
/// Some calisthenics are done to avoid allocating WeakReferences for no reason, as often
/// we're just looking up ProfileStorage.
///
/// * Somebody starts profiling, but for whatever reason never *stops* with a context object
/// </summary>
struct ProfileContextCell : IEquatable<ProfileContextCell>
{
// This is a union of (object|WeakReference); if it's a WeakReference
// then we're actually interested in it's Target, otherwise
// we're concerned about the actual value of Reference
object Reference;
// It is absolutely crucial that this value **never change** once instantiated
readonly int HashCode;
public bool IsContextLeaked
{
get
{
object ignored;
return !TryGetTarget(out ignored);
}
}
private ProfileContextCell(object forObj, bool isEphemeral)
{
HashCode = forObj.GetHashCode();
if (isEphemeral)
{
Reference = forObj;
}
else
{
Reference = new WeakReference(forObj, trackResurrection: true); // ughhh, have to handle finalizers
}
}
/// <summary>
/// Suitable for use as a key into something.
///
/// This instance **WILL NOT** keep forObj alive, so it can
/// be copied out of the calling method's scope.
/// </summary>
public static ProfileContextCell ToStoreUnder(object forObj)
{
return new ProfileContextCell(forObj, isEphemeral: false);
}
/// <summary>
/// Only suitable for looking up.
///
/// This instance **ABSOLUTELY WILL** keep forObj alive, so this
/// had better not be copied into anything outside the scope of the
/// calling method.
/// </summary>
public static ProfileContextCell ToLookupBy(object forObj)
{
return new ProfileContextCell(forObj, isEphemeral: true);
}
bool TryGetTarget(out object target)
{
var asWeakRef = Reference as WeakReference;
if (asWeakRef == null)
{
target = Reference;
return true;
}
// Do not use IsAlive here, it's race city
target = asWeakRef.Target;
return target != null;
}
public override bool Equals(object obj)
{
if (!(obj is ProfileContextCell)) return false;
return Equals((ProfileContextCell)obj);
}
public override int GetHashCode()
{
return HashCode;
}
public bool Equals(ProfileContextCell other)
{
object thisObj, otherObj;
if (other.TryGetTarget(out otherObj) != TryGetTarget(out thisObj)) return false;
// dead references are equal
if (thisObj == null) return true;
return thisObj.Equals(otherObj);
}
}
// provided so default behavior doesn't do any boxing, for sure
sealed class ProfileContextCellComparer : IEqualityComparer<ProfileContextCell>
{
public static readonly ProfileContextCellComparer Singleton = new ProfileContextCellComparer();
private ProfileContextCellComparer() { }
public bool Equals(ProfileContextCell x, ProfileContextCell y)
{
return x.Equals(y);
}
public int GetHashCode(ProfileContextCell obj)
{
return obj.GetHashCode();
}
}
private long lastCleanupSweep;
private ConcurrentDictionary<ProfileContextCell, ConcurrentProfileStorageCollection> profiledCommands;
public int ContextCount { get { return profiledCommands.Count; } }
public ProfileContextTracker()
{
profiledCommands = new ConcurrentDictionary<ProfileContextCell, ConcurrentProfileStorageCollection>(ProfileContextCellComparer.Singleton);
lastCleanupSweep = DateTime.UtcNow.Ticks;
}
/// <summary>
/// Registers the passed context with a collection that can be retried with subsequent calls to TryGetValue.
///
/// Returns false if the passed context object is already registered.
/// </summary>
public bool TryCreate(object ctx)
{
var cell = ProfileContextCell.ToStoreUnder(ctx);
// we can't pass this as a delegate, because TryAdd may invoke the factory multiple times,
// which would lead to over allocation.
var storage = ConcurrentProfileStorageCollection.GetOrCreate();
return profiledCommands.TryAdd(cell, storage);
}
/// <summary>
/// Returns true and sets val to the tracking collection associated with the given context if the context
/// was registered with TryCreate.
///
/// Otherwise returns false and sets val to null.
/// </summary>
public bool TryGetValue(object ctx, out ConcurrentProfileStorageCollection val)
{
var cell = ProfileContextCell.ToLookupBy(ctx);
return profiledCommands.TryGetValue(cell, out val);
}
/// <summary>
/// Removes a context, setting all commands to a (non-thread safe) enumerable of
/// all the commands attached to that context.
///
/// If the context was never registered, will return false and set commands to null.
///
/// Subsequent calls to TryRemove with the same context will return false unless it is
/// re-registered with TryCreate.
/// </summary>
public bool TryRemove(object ctx, out ProfiledCommandEnumerable commands)
{
var cell = ProfileContextCell.ToLookupBy(ctx);
ConcurrentProfileStorageCollection storage;
if (!profiledCommands.TryRemove(cell, out storage))
{
commands = default(ProfiledCommandEnumerable);
return false;
}
commands = storage.EnumerateAndReturnForReuse();
return true;
}
/// <summary>
/// If enough time has passed (1 minute) since the last call, this does walk of all contexts
/// and removes those that the GC has collected.
/// </summary>
public bool TryCleanup()
{
const long SweepEveryTicks = 600000000; // once a minute, tops
var now = DateTime.UtcNow.Ticks; // resolution on this isn't great, but it's good enough
var last = lastCleanupSweep;
var since = now - last;
if (since < SweepEveryTicks) return false;
// this is just to keep other threads from wasting time, in theory
// it'd be perfectly safe for this to run concurrently
var saw = Interlocked.CompareExchange(ref lastCleanupSweep, now, last);
if (saw != last) return false;
if (profiledCommands.Count == 0) return false;
using(var e = profiledCommands.GetEnumerator())
{
while(e.MoveNext())
{
var pair = e.Current;
if(pair.Key.IsContextLeaked)
{
ConcurrentProfileStorageCollection abandoned;
if(profiledCommands.TryRemove(pair.Key, out abandoned))
{
// shove it back in the pool, but don't bother enumerating
abandoned.ReturnForReuse();
}
}
}
}
return true;
}
}
}
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Net;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace StackExchange.Redis
{
class ProfileStorage : IProfiledCommand
{
#region IProfiledCommand Impl
public EndPoint EndPoint
{
get { return Server.EndPoint; }
}
public int Db
{
get { return Message.Db; }
}
public string Command
{
get { return Message.Command.ToString(); }
}
public CommandFlags Flags
{
get { return Message.Flags; }
}
public DateTime CommandCreated
{
get { return MessageCreatedDateTime; }
}
public TimeSpan CreationToEnqueued
{
get { return TimeSpan.FromTicks(EnqueuedTimeStamp - MessageCreatedTimeStamp); }
}
public TimeSpan EnqueuedToSending
{
get { return TimeSpan.FromTicks(RequestSentTimeStamp - EnqueuedTimeStamp); }
}
public TimeSpan SentToResponse
{
get { return TimeSpan.FromTicks(ResponseReceivedTimeStamp - RequestSentTimeStamp); }
}
public TimeSpan ResponseToCompletion
{
get { return TimeSpan.FromTicks(CompletedTimeStamp - ResponseReceivedTimeStamp); }
}
public TimeSpan ElapsedTime
{
get { return TimeSpan.FromTicks(CompletedTimeStamp - MessageCreatedTimeStamp); }
}
public IProfiledCommand RetransmissionOf
{
get { return OriginalProfiling; }
}
public RetransmissionReasonType? RetransmissionReason
{
get { return Reason; }
}
#endregion
public ProfileStorage NextElement { get; set; }
private Message Message;
private ServerEndPoint Server;
private ProfileStorage OriginalProfiling;
private RetransmissionReasonType? Reason;
private DateTime MessageCreatedDateTime;
private long MessageCreatedTimeStamp;
private long EnqueuedTimeStamp;
private long RequestSentTimeStamp;
private long ResponseReceivedTimeStamp;
private long CompletedTimeStamp;
private ConcurrentProfileStorageCollection PushToWhenFinished;
private ProfileStorage(ConcurrentProfileStorageCollection pushTo, ServerEndPoint server, ProfileStorage resentFor, RetransmissionReasonType? reason)
{
PushToWhenFinished = pushTo;
OriginalProfiling = resentFor;
Server = server;
Reason = reason;
}
public static ProfileStorage NewWithContext(ConcurrentProfileStorageCollection pushTo, ServerEndPoint server)
{
return new ProfileStorage(pushTo, server, null, null);
}
public static ProfileStorage NewAttachedToSameContext(ProfileStorage resentFor, ServerEndPoint server, bool isMoved)
{
return new ProfileStorage(resentFor.PushToWhenFinished, server, resentFor, isMoved ? RetransmissionReasonType.Moved : RetransmissionReasonType.Ask);
}
public void SetMessage(Message msg)
{
// This method should never be called twice
if (Message != null) throw new InvalidOperationException();
Message = msg;
MessageCreatedDateTime = msg.createdDateTime;
MessageCreatedTimeStamp = msg.createdTimestamp;
}
public void SetEnqueued()
{
// This method should never be called twice
if (EnqueuedTimeStamp > 0) throw new InvalidOperationException();
EnqueuedTimeStamp = Stopwatch.GetTimestamp();
}
public void SetRequestSent()
{
// This method should never be called twice
if (RequestSentTimeStamp > 0) throw new InvalidOperationException();
RequestSentTimeStamp = Stopwatch.GetTimestamp();
}
public void SetResponseReceived()
{
if (ResponseReceivedTimeStamp > 0) throw new InvalidOperationException();
ResponseReceivedTimeStamp = Stopwatch.GetTimestamp();
}
public void SetCompleted()
{
// this method can be called multiple times, depending on how the task completed (async vs not)
// so we actually have to guard against it.
var now = Stopwatch.GetTimestamp();
var oldVal = Interlocked.CompareExchange(ref CompletedTimeStamp, now, 0);
// second call
if (oldVal != 0) return;
// only push on the first call, no dupes!
PushToWhenFinished.Add(this);
}
public override string ToString()
{
return
string.Format(
@"EndPoint = {0}
Db = {1}
Command = {2}
CommandCreated = {3:u}
CreationToEnqueued = {4}
EnqueuedToSending = {5}
SentToResponse = {6}
ResponseToCompletion = {7}
ElapsedTime = {8}
Flags = {9}
RetransmissionOf = ({10})",
EndPoint,
Db,
Command,
CommandCreated,
CreationToEnqueued,
EnqueuedToSending,
SentToResponse,
ResponseToCompletion,
ElapsedTime,
Flags,
RetransmissionOf
);
}
}
}
...@@ -149,7 +149,8 @@ public bool WasQueued ...@@ -149,7 +149,8 @@ public bool WasQueued
public Message Wrapped { get { return wrapped; } } public Message Wrapped { get { return wrapped; } }
internal override void WriteImpl(PhysicalConnection physical) internal override void WriteImpl(PhysicalConnection physical)
{ {
wrapped.WriteImpl(physical); wrapped.WriteImpl(physical);
wrapped.SetRequestSent();
} }
} }
......
...@@ -7,7 +7,7 @@ namespace StackExchange.Redis ...@@ -7,7 +7,7 @@ namespace StackExchange.Redis
{ {
abstract partial class ResultBox abstract partial class ResultBox
{ {
protected Exception exception; protected Exception exception;
public void SetException(Exception exception) public void SetException(Exception exception)
{ {
...@@ -21,6 +21,7 @@ public void SetException(Exception exception) ...@@ -21,6 +21,7 @@ public void SetException(Exception exception)
// this.exception = caught; // this.exception = caught;
//} //}
} }
public abstract bool TryComplete(bool isAsync); public abstract bool TryComplete(bool isAsync);
[Conditional("DEBUG")] [Conditional("DEBUG")]
...@@ -37,11 +38,11 @@ sealed class ResultBox<T> : ResultBox ...@@ -37,11 +38,11 @@ sealed class ResultBox<T> : ResultBox
private object stateOrCompletionSource; private object stateOrCompletionSource;
private T value; private T value;
public ResultBox(object stateOrCompletionSource) public ResultBox(object stateOrCompletionSource)
{ {
this.stateOrCompletionSource = stateOrCompletionSource; this.stateOrCompletionSource = stateOrCompletionSource;
} }
public object AsyncState public object AsyncState
{ {
...@@ -135,7 +136,8 @@ public override bool TryComplete(bool isAsync) ...@@ -135,7 +136,8 @@ public override bool TryComplete(bool isAsync)
private void Reset(object stateOrCompletionSource) private void Reset(object stateOrCompletionSource)
{ {
value = default(T); value = default(T);
exception = null; exception = null;
this.stateOrCompletionSource = stateOrCompletionSource; this.stateOrCompletionSource = stateOrCompletionSource;
} }
} }
......
...@@ -142,7 +142,9 @@ public virtual bool SetResult(PhysicalConnection connection, Message message, Ra ...@@ -142,7 +142,9 @@ public virtual bool SetResult(PhysicalConnection connection, Message message, Ra
bool log = !message.IsInternalCall; bool log = !message.IsInternalCall;
bool isMoved = result.AssertStarts(MOVED); bool isMoved = result.AssertStarts(MOVED);
if (isMoved || result.AssertStarts(ASK)) if (isMoved || result.AssertStarts(ASK))
{ {
message.SetResponseReceived();
log = false; log = false;
string[] parts = result.GetString().Split(StringSplits.Space, 3); string[] parts = result.GetString().Split(StringSplits.Space, 3);
int hashSlot; int hashSlot;
...@@ -1280,7 +1282,9 @@ internal abstract class ResultProcessor<T> : ResultProcessor ...@@ -1280,7 +1282,9 @@ internal abstract class ResultProcessor<T> : ResultProcessor
protected void SetResult(Message message, T value) protected void SetResult(Message message, T value)
{ {
if (message == null) return; if (message == null) return;
var box = message.ResultBox as ResultBox<T>; var box = message.ResultBox as ResultBox<T>;
message.SetResponseReceived();
if (box != null) box.SetResult(value); if (box != null) box.SetResult(value);
} }
} }
......
...@@ -150,7 +150,8 @@ public bool TryResend(int hashSlot, Message message, EndPoint endpoint, bool isM ...@@ -150,7 +150,8 @@ public bool TryResend(int hashSlot, Message message, EndPoint endpoint, bool isM
multiplexer.Trace("Unable to resend to " + endpoint); multiplexer.Trace("Unable to resend to " + endpoint);
} }
else else
{ {
message.PrepareToResend(resendVia, isMoved);
retry = resendVia.TryEnqueue(message); retry = resendVia.TryEnqueue(message);
} }
} }
......
...@@ -73,6 +73,10 @@ ...@@ -73,6 +73,10 @@
<Compile Include="..\StackExchange.Redis\StackExchange\Redis\ClientType.cs"> <Compile Include="..\StackExchange.Redis\StackExchange\Redis\ClientType.cs">
<Link>ClientType.cs</Link> <Link>ClientType.cs</Link>
</Compile> </Compile>
<Compile Include="..\StackExchange.Redis\StackExchange\Redis\ConcurrentProfileStorageCollection.cs" />
<Compile Include="..\StackExchange.Redis\StackExchange\Redis\ConnectionMultiplexer.Profiling.cs">
<DependentUpon>ConnectionMultiplexer.cs</DependentUpon>
</Compile>
<Compile Include="..\StackExchange.Redis\StackExchange\Redis\ClusterConfiguration.cs" /> <Compile Include="..\StackExchange.Redis\StackExchange\Redis\ClusterConfiguration.cs" />
<Compile Include="..\StackExchange.Redis\StackExchange\Redis\CommandFlags.cs" /> <Compile Include="..\StackExchange.Redis\StackExchange\Redis\CommandFlags.cs" />
<Compile Include="..\StackExchange.Redis\StackExchange\Redis\CommandMap.cs" /> <Compile Include="..\StackExchange.Redis\StackExchange\Redis\CommandMap.cs" />
...@@ -103,6 +107,7 @@ ...@@ -103,6 +107,7 @@
<Compile Include="..\StackExchange.Redis\StackExchange\Redis\IDatabaseAsync.cs" /> <Compile Include="..\StackExchange.Redis\StackExchange\Redis\IDatabaseAsync.cs" />
<Compile Include="..\StackExchange.Redis\StackExchange\Redis\IMultiMessage.cs" /> <Compile Include="..\StackExchange.Redis\StackExchange\Redis\IMultiMessage.cs" />
<Compile Include="..\StackExchange.Redis\StackExchange\Redis\InternalErrorEventArgs.cs" /> <Compile Include="..\StackExchange.Redis\StackExchange\Redis\InternalErrorEventArgs.cs" />
<Compile Include="..\StackExchange.Redis\StackExchange\Redis\IProfiler.cs" />
<Compile Include="..\StackExchange.Redis\StackExchange\Redis\IRedis.cs" /> <Compile Include="..\StackExchange.Redis\StackExchange\Redis\IRedis.cs" />
<Compile Include="..\StackExchange.Redis\StackExchange\Redis\IRedisAsync.cs" /> <Compile Include="..\StackExchange.Redis\StackExchange\Redis\IRedisAsync.cs" />
<Compile Include="..\StackExchange.Redis\StackExchange\Redis\IServer.cs" /> <Compile Include="..\StackExchange.Redis\StackExchange\Redis\IServer.cs" />
...@@ -113,6 +118,8 @@ ...@@ -113,6 +118,8 @@
<Compile Include="..\StackExchange.Redis\StackExchange\Redis\MessageCompletable.cs" /> <Compile Include="..\StackExchange.Redis\StackExchange\Redis\MessageCompletable.cs" />
<Compile Include="..\StackExchange.Redis\StackExchange\Redis\MessageQueue.cs" /> <Compile Include="..\StackExchange.Redis\StackExchange\Redis\MessageQueue.cs" />
<Compile Include="..\StackExchange.Redis\StackExchange\Redis\MigrateOptions.cs" /> <Compile Include="..\StackExchange.Redis\StackExchange\Redis\MigrateOptions.cs" />
<Compile Include="..\StackExchange.Redis\StackExchange\Redis\ProfileContextTracker.cs" />
<Compile Include="..\StackExchange.Redis\StackExchange\Redis\ProfileStorage.cs" />
<Compile Include="..\StackExchange.Redis\StackExchange\Redis\LuaScript.cs" /> <Compile Include="..\StackExchange.Redis\StackExchange\Redis\LuaScript.cs" />
<Compile Include="..\StackExchange.Redis\StackExchange\Redis\Order.cs" /> <Compile Include="..\StackExchange.Redis\StackExchange\Redis\Order.cs" />
<Compile Include="..\StackExchange.Redis\StackExchange\Redis\PhysicalBridge.cs" /> <Compile Include="..\StackExchange.Redis\StackExchange\Redis\PhysicalBridge.cs" />
......
...@@ -70,6 +70,10 @@ ...@@ -70,6 +70,10 @@
<Compile Include="..\StackExchange.Redis\StackExchange\Redis\ClientType.cs"> <Compile Include="..\StackExchange.Redis\StackExchange\Redis\ClientType.cs">
<Link>ClientType.cs</Link> <Link>ClientType.cs</Link>
</Compile> </Compile>
<Compile Include="..\StackExchange.Redis\StackExchange\Redis\ConcurrentProfileStorageCollection.cs" />
<Compile Include="..\StackExchange.Redis\StackExchange\Redis\ConnectionMultiplexer.Profiling.cs">
<DependentUpon>ConnectionMultiplexer.cs</DependentUpon>
</Compile>
<Compile Include="..\StackExchange.Redis\StackExchange\Redis\ClusterConfiguration.cs" /> <Compile Include="..\StackExchange.Redis\StackExchange\Redis\ClusterConfiguration.cs" />
<Compile Include="..\StackExchange.Redis\StackExchange\Redis\CommandFlags.cs" /> <Compile Include="..\StackExchange.Redis\StackExchange\Redis\CommandFlags.cs" />
<Compile Include="..\StackExchange.Redis\StackExchange\Redis\CommandMap.cs" /> <Compile Include="..\StackExchange.Redis\StackExchange\Redis\CommandMap.cs" />
...@@ -100,6 +104,7 @@ ...@@ -100,6 +104,7 @@
<Compile Include="..\StackExchange.Redis\StackExchange\Redis\IDatabaseAsync.cs" /> <Compile Include="..\StackExchange.Redis\StackExchange\Redis\IDatabaseAsync.cs" />
<Compile Include="..\StackExchange.Redis\StackExchange\Redis\IMultiMessage.cs" /> <Compile Include="..\StackExchange.Redis\StackExchange\Redis\IMultiMessage.cs" />
<Compile Include="..\StackExchange.Redis\StackExchange\Redis\InternalErrorEventArgs.cs" /> <Compile Include="..\StackExchange.Redis\StackExchange\Redis\InternalErrorEventArgs.cs" />
<Compile Include="..\StackExchange.Redis\StackExchange\Redis\IProfiler.cs" />
<Compile Include="..\StackExchange.Redis\StackExchange\Redis\IRedis.cs" /> <Compile Include="..\StackExchange.Redis\StackExchange\Redis\IRedis.cs" />
<Compile Include="..\StackExchange.Redis\StackExchange\Redis\IRedisAsync.cs" /> <Compile Include="..\StackExchange.Redis\StackExchange\Redis\IRedisAsync.cs" />
<Compile Include="..\StackExchange.Redis\StackExchange\Redis\IServer.cs" /> <Compile Include="..\StackExchange.Redis\StackExchange\Redis\IServer.cs" />
...@@ -110,6 +115,8 @@ ...@@ -110,6 +115,8 @@
<Compile Include="..\StackExchange.Redis\StackExchange\Redis\MessageCompletable.cs" /> <Compile Include="..\StackExchange.Redis\StackExchange\Redis\MessageCompletable.cs" />
<Compile Include="..\StackExchange.Redis\StackExchange\Redis\MessageQueue.cs" /> <Compile Include="..\StackExchange.Redis\StackExchange\Redis\MessageQueue.cs" />
<Compile Include="..\StackExchange.Redis\StackExchange\Redis\MigrateOptions.cs" /> <Compile Include="..\StackExchange.Redis\StackExchange\Redis\MigrateOptions.cs" />
<Compile Include="..\StackExchange.Redis\StackExchange\Redis\ProfileContextTracker.cs" />
<Compile Include="..\StackExchange.Redis\StackExchange\Redis\ProfileStorage.cs" />
<Compile Include="..\StackExchange.Redis\StackExchange\Redis\LuaScript.cs" /> <Compile Include="..\StackExchange.Redis\StackExchange\Redis\LuaScript.cs" />
<Compile Include="..\StackExchange.Redis\StackExchange\Redis\Order.cs" /> <Compile Include="..\StackExchange.Redis\StackExchange\Redis\Order.cs" />
<Compile Include="..\StackExchange.Redis\StackExchange\Redis\PhysicalBridge.cs" /> <Compile Include="..\StackExchange.Redis\StackExchange\Redis\PhysicalBridge.cs" />
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment