Commit ab5bdcac authored by mgravell's avatar mgravell

- add new "feature flag" API for undocumented experimental oddness

- add more tracking levels to read-state
- if PreventThreadTheft feature flag is enabled, explicitly push continuations to the thread-pool
parent 9976b286
......@@ -13,6 +13,7 @@
using System.Runtime.CompilerServices;
using StackExchange.Redis.Profiling;
using Pipelines.Sockets.Unofficial;
using System.ComponentModel;
namespace StackExchange.Redis
{
......@@ -21,6 +22,41 @@ namespace StackExchange.Redis
/// </summary>
public sealed partial class ConnectionMultiplexer : IInternalConnectionMultiplexer // implies : IConnectionMultiplexer and : IDisposable
{
[Flags]
private enum FeatureFlags
{
None,
PreventThreadTheft = 1,
}
private static FeatureFlags s_featureFlags;
/// <summary>
/// Enables or disables a feature flag; this should only be used under support guidance, and should not be rapidly toggled
/// </summary>
[EditorBrowsable(EditorBrowsableState.Never)]
[Browsable(false)]
public static void SetFeatureFlag(string flag, bool enabled)
{
if (Enum.TryParse<FeatureFlags>(flag, true, out var flags))
{
if (enabled) s_featureFlags |= flags;
else s_featureFlags &= ~flags;
}
}
/// <summary>
/// Returns the state of a feature flag; this should only be used under support guidance
/// </summary>
[EditorBrowsable(EditorBrowsableState.Never)]
[Browsable(false)]
public static bool GetFeatureFlag(string flag)
=> Enum.TryParse<FeatureFlags>(flag, true, out var flags)
&& (s_featureFlags & flags) == flags;
internal static bool PreventThreadTheft => (s_featureFlags & FeatureFlags.PreventThreadTheft) != 0;
private static TaskFactory _factory = null;
#if DEBUG
......
......@@ -470,6 +470,15 @@ public void Complete()
currBox?.ActivateContinuations();
}
internal bool ResultBoxIsAsync
{
get
{
var currBox = Volatile.Read(ref resultBox);
return currBox != null && currBox.IsAsync;
}
}
internal static Message Create(int db, CommandFlags flags, RedisCommand command, in RedisKey key, RedisKey[] keys)
{
switch (keys.Length)
......
......@@ -1420,10 +1420,10 @@ private void MatchResult(in RawResult result)
_readStatus = ReadStatus.ComputeResult;
if (msg.ComputeResult(this, result))
{
_readStatus = ReadStatus.CompletePendingMessage;
_readStatus = msg.ResultBoxIsAsync ? ReadStatus.CompletePendingMessageAsync : ReadStatus.CompletePendingMessageSync;
msg.Complete();
}
_readStatus = ReadStatus.MatchResultComplete;
_activeMessage = null;
}
......@@ -1561,9 +1561,11 @@ private int ProcessBuffer(ref ReadOnlySequence<byte> buffer)
}
finally
{
_readStatus = ReadStatus.ResetArena;
_arena.Reset();
}
}
_readStatus = ReadStatus.ProcessBufferComplete;
return messageCount;
}
//void ISocketCallback.Read()
......@@ -1700,8 +1702,11 @@ internal enum ReadStatus
InvokePubSub,
DequeueResult,
ComputeResult,
CompletePendingMessage,
CompletePendingMessageSync,
CompletePendingMessageAsync,
MatchResultComplete,
ResetArena,
ProcessBufferComplete,
NA = -1,
}
private volatile ReadStatus _readStatus;
......
......@@ -27,7 +27,15 @@ internal abstract class SimpleResultBox : IResultBox
void IResultBox.SetException(Exception exception) => _exception = exception ?? CancelledException;
void IResultBox.Cancel() => _exception = CancelledException;
static readonly WaitCallback s_ActivateContinuations = state => ((SimpleResultBox)state).ActivateContinuationsImpl();
void IResultBox.ActivateContinuations()
{
if (ConnectionMultiplexer.PreventThreadTheft)
ThreadPool.QueueUserWorkItem(s_ActivateContinuations, this);
else
ActivateContinuationsImpl();
}
private void ActivateContinuationsImpl()
{
lock (this)
{ // tell the waiting thread that we're done
......@@ -108,7 +116,15 @@ T IResultBox<T>.GetResult(out Exception ex, bool _)
// nothing to do re recycle: TaskCompletionSource<T> cannot be recycled
}
static readonly WaitCallback s_ActivateContinuations = state => ((TaskResultBox<T>)state).ActivateContinuationsImpl();
void IResultBox.ActivateContinuations()
{
if (ConnectionMultiplexer.PreventThreadTheft)
ThreadPool.QueueUserWorkItem(s_ActivateContinuations, this);
else
ActivateContinuationsImpl();
}
private void ActivateContinuationsImpl()
{
var val = _value;
var ex = _exception;
......
using Xunit;
namespace StackExchange.Redis.Tests
{
[Collection(NonParallelCollection.Name)]
public class FeatureFlags
{
[Fact]
public void UnknownFlagToggle()
{
Assert.False(ConnectionMultiplexer.GetFeatureFlag("nope"));
ConnectionMultiplexer.SetFeatureFlag("nope", true);
Assert.False(ConnectionMultiplexer.GetFeatureFlag("nope"));
}
[Fact]
public void KnownFlagToggle()
{
Assert.False(ConnectionMultiplexer.GetFeatureFlag("preventthreadtheft"));
ConnectionMultiplexer.SetFeatureFlag("preventthreadtheft", true);
Assert.True(ConnectionMultiplexer.GetFeatureFlag("preventthreadtheft"));
ConnectionMultiplexer.SetFeatureFlag("preventthreadtheft", false);
Assert.False(ConnectionMultiplexer.GetFeatureFlag("preventthreadtheft"));
}
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment