Commit aaa6afa0 authored by Nick Craver's avatar Nick Craver

Lib: general cleanup

parent 4ac77037
...@@ -10,7 +10,7 @@ public class AggregationTest : RediSearchTestBase ...@@ -10,7 +10,7 @@ public class AggregationTest : RediSearchTestBase
public AggregationTest(ITestOutputHelper output) : base(output) { } public AggregationTest(ITestOutputHelper output) : base(output) { }
[Fact] [Fact]
public void testAggregations() public void TestAggregations()
{ {
/** /**
127.0.0.1:6379> FT.CREATE test_index SCHEMA name TEXT SORTABLE count NUMERIC SORTABLE 127.0.0.1:6379> FT.CREATE test_index SCHEMA name TEXT SORTABLE count NUMERIC SORTABLE
...@@ -35,7 +35,6 @@ public void testAggregations() ...@@ -35,7 +35,6 @@ public void testAggregations()
.GroupBy("@name", Reducers.Sum("@count").As("sum")) .GroupBy("@name", Reducers.Sum("@count").As("sum"))
.SortBy(SortedField.Descending("@sum"), 10); .SortBy(SortedField.Descending("@sum"), 10);
// actual search // actual search
AggregationResult res = cl.Aggregate(r); AggregationResult res = cl.Aggregate(r);
var r1 = res.GetRow(0); var r1 = res.GetRow(0);
......
...@@ -11,7 +11,7 @@ public SortedField(string field, Order order) ...@@ -11,7 +11,7 @@ public SortedField(string field, Order order)
Field = field; Field = field;
Order = order; Order = order;
} }
public string Field { get; } public string Field { get; }
public Order Order { get; } public Order Order { get; }
......
...@@ -26,8 +26,8 @@ public async Task TestManualIncr() ...@@ -26,8 +26,8 @@ public async Task TestManualIncr()
for (int i = 0; i < 10; i++) for (int i = 0; i < 10; i++)
{ {
conn.KeyDelete(key, CommandFlags.FireAndForget); conn.KeyDelete(key, CommandFlags.FireAndForget);
Assert.Equal(1, await ManualIncrAsync(conn, key)); Assert.Equal(1, await ManualIncrAsync(conn, key).ForAwait());
Assert.Equal(2, await ManualIncrAsync(conn, key)); Assert.Equal(2, await ManualIncrAsync(conn, key).ForAwait());
Assert.Equal(2, (long)conn.StringGet(key)); Assert.Equal(2, (long)conn.StringGet(key));
} }
} }
......
...@@ -21,7 +21,7 @@ public async Task DBExecute() ...@@ -21,7 +21,7 @@ public async Task DBExecute()
var actual = (string)db.Execute("GET", key); var actual = (string)db.Execute("GET", key);
Assert.Equal("some value", actual); Assert.Equal("some value", actual);
actual = (string)await db.ExecuteAsync("GET", key); actual = (string)await db.ExecuteAsync("GET", key).ForAwait();
Assert.Equal("some value", actual); Assert.Equal("some value", actual);
} }
} }
...@@ -35,7 +35,7 @@ public async Task ServerExecute() ...@@ -35,7 +35,7 @@ public async Task ServerExecute()
var actual = (string)server.Execute("echo", "some value"); var actual = (string)server.Execute("echo", "some value");
Assert.Equal("some value", actual); Assert.Equal("some value", actual);
actual = (string)await server.ExecuteAsync("echo", "some value"); actual = (string)await server.ExecuteAsync("echo", "some value").ForAwait();
Assert.Equal("some value", actual); Assert.Equal("some value", actual);
} }
} }
......
...@@ -12,6 +12,24 @@ public class MassiveOps : TestBase ...@@ -12,6 +12,24 @@ public class MassiveOps : TestBase
{ {
public MassiveOps(ITestOutputHelper output) : base(output) { } public MassiveOps(ITestOutputHelper output) : base(output) { }
[Fact]
public async Task LongRunning()
{
var key = Me();
using (var conn = Create())
{
var db = conn.GetDatabase();
db.KeyDelete(key, CommandFlags.FireAndForget);
db.StringSet(key, "test value", flags: CommandFlags.FireAndForget);
for (var i = 0; i < 200; i++)
{
var val = await db.StringGetAsync(key).ForAwait();
Assert.Equal("test value", (string)val);
await Task.Delay(50).ForAwait();
}
}
}
[Theory] [Theory]
[InlineData(true)] [InlineData(true)]
[InlineData(false)] [InlineData(false)]
......
...@@ -69,8 +69,6 @@ public void Simple() ...@@ -69,8 +69,6 @@ public void Simple()
AssertProfiledCommandValues(eval, conn, dbId); AssertProfiledCommandValues(eval, conn, dbId);
AssertProfiledCommandValues(echo, conn, dbId); AssertProfiledCommandValues(echo, conn, dbId);
} }
} }
...@@ -156,7 +154,6 @@ public void ManyContexts() ...@@ -156,7 +154,6 @@ public void ManyContexts()
var prefix = Me(); var prefix = Me();
conn.RegisterProfiler(profiler.GetSession); conn.RegisterProfiler(profiler.GetSession);
var threads = new List<Thread>(); var threads = new List<Thread>();
var results = new IEnumerable<IProfiledCommand>[16]; var results = new IEnumerable<IProfiledCommand>[16];
...@@ -204,13 +201,13 @@ public void ManyContexts() ...@@ -204,13 +201,13 @@ public void ManyContexts()
internal class PerThreadProfiler internal class PerThreadProfiler
{ {
ThreadLocal<ProfilingSession> perThreadSession = new ThreadLocal<ProfilingSession>(() => new ProfilingSession()); private readonly ThreadLocal<ProfilingSession> perThreadSession = new ThreadLocal<ProfilingSession>(() => new ProfilingSession());
public ProfilingSession GetSession() => perThreadSession.Value; public ProfilingSession GetSession() => perThreadSession.Value;
} }
internal class AsyncLocalProfiler internal class AsyncLocalProfiler
{ {
AsyncLocal<ProfilingSession> perThreadSession = new AsyncLocal<ProfilingSession>(); private readonly AsyncLocal<ProfilingSession> perThreadSession = new AsyncLocal<ProfilingSession>();
public ProfilingSession GetSession() public ProfilingSession GetSession()
{ {
...@@ -275,7 +272,6 @@ public void LowAllocationEnumerable() ...@@ -275,7 +272,6 @@ public void LowAllocationEnumerable()
} }
} }
[FactLongRunning] [FactLongRunning]
public void ProfilingMD_Ex1() public void ProfilingMD_Ex1()
{ {
...@@ -340,7 +336,7 @@ public void ProfilingMD_Ex2() ...@@ -340,7 +336,7 @@ public void ProfilingMD_Ex2()
var thread = new Thread(() => var thread = new Thread(() =>
{ {
var threadTasks = new List<Task>(); var threadTasks = new List<Task>();
for (var j = 0; j < 1000; j++) for (var j = 0; j < 1000; j++)
{ {
var task = db.StringSetAsync(prefix + j, "" + j); var task = db.StringSetAsync(prefix + j, "" + j);
...@@ -385,7 +381,7 @@ public async Task ProfilingMD_Ex2_Async() ...@@ -385,7 +381,7 @@ public async Task ProfilingMD_Ex2_Async()
{ {
for (var j = 0; j < 100; j++) for (var j = 0; j < 100; j++)
{ {
await db.StringSetAsync(prefix + j, "" + j); await db.StringSetAsync(prefix + j, "" + j).ForAwait();
} }
perThreadTimings.Add(profiler.GetSession().FinishProfiling().ToList()); perThreadTimings.Add(profiler.GetSession().FinishProfiling().ToList());
...@@ -396,11 +392,11 @@ public async Task ProfilingMD_Ex2_Async() ...@@ -396,11 +392,11 @@ public async Task ProfilingMD_Ex2_Async()
var timeout = Task.Delay(10000); var timeout = Task.Delay(10000);
var complete = Task.WhenAll(tasks); var complete = Task.WhenAll(tasks);
if (timeout == await Task.WhenAny(timeout, complete)) if (timeout == await Task.WhenAny(timeout, complete).ForAwait())
{ {
throw new TimeoutException(); throw new TimeoutException();
} }
Assert.Equal(16, perThreadTimings.Count); Assert.Equal(16, perThreadTimings.Count);
foreach(var item in perThreadTimings) foreach(var item in perThreadTimings)
{ {
......
...@@ -147,25 +147,25 @@ public async Task ConnectToSSLServer(bool useSsl, bool specifyHost) ...@@ -147,25 +147,25 @@ public async Task ConnectToSSLServer(bool useSsl, bool specifyHost)
} }
} }
private void TestConcurrent(IDatabase db, RedisKey key, int SyncLoop, int Threads) //private void TestConcurrent(IDatabase db, RedisKey key, int SyncLoop, int Threads)
{ //{
long value; // long value;
db.KeyDelete(key, CommandFlags.FireAndForget); // db.KeyDelete(key, CommandFlags.FireAndForget);
var time = RunConcurrent(delegate // var time = RunConcurrent(delegate
{ // {
for (int i = 0; i < SyncLoop; i++) // for (int i = 0; i < SyncLoop; i++)
{ // {
db.StringIncrement(key); // db.StringIncrement(key);
} // }
}, Threads, timeout: 45000); // }, Threads, timeout: 45000);
value = (long)db.StringGet(key); // value = (long)db.StringGet(key);
Assert.Equal(SyncLoop * Threads, value); // Assert.Equal(SyncLoop * Threads, value);
Log("Sync: {0} INCR using {1} threads, {2:###,##0}ms, {3} ops/s; final value: {4}", // Log("Sync: {0} INCR using {1} threads, {2:###,##0}ms, {3} ops/s; final value: {4}",
SyncLoop * Threads, Threads, // SyncLoop * Threads, Threads,
(long)time.TotalMilliseconds, // (long)time.TotalMilliseconds,
(long)((SyncLoop * Threads) / time.TotalSeconds), // (long)((SyncLoop * Threads) / time.TotalSeconds),
value); // value);
} //}
[Fact] [Fact]
public void RedisLabsSSL() public void RedisLabsSSL()
......
...@@ -280,7 +280,7 @@ protected static TimeSpan RunConcurrent(Action work, int threads, int timeout = ...@@ -280,7 +280,7 @@ protected static TimeSpan RunConcurrent(Action work, int threads, int timeout =
ManualResetEvent allDone = new ManualResetEvent(false); ManualResetEvent allDone = new ManualResetEvent(false);
object token = new object(); object token = new object();
int active = 0; int active = 0;
ThreadStart callback = delegate void callback()
{ {
lock (token) lock (token)
{ {
...@@ -301,7 +301,7 @@ protected static TimeSpan RunConcurrent(Action work, int threads, int timeout = ...@@ -301,7 +301,7 @@ protected static TimeSpan RunConcurrent(Action work, int threads, int timeout =
watch.Stop(); watch.Stop();
allDone.Set(); allDone.Set();
} }
}; }
var threadArr = new Thread[threads]; var threadArr = new Thread[threads];
for (int i = 0; i < threads; i++) for (int i = 0; i < threads; i++)
......
...@@ -21,9 +21,11 @@ namespace StackExchange.Redis ...@@ -21,9 +21,11 @@ namespace StackExchange.Redis
/// See Object.GetHashCode /// See Object.GetHashCode
/// </summary> /// </summary>
public override int GetHashCode() => Channel.GetHashCode() ^ Message.GetHashCode(); public override int GetHashCode() => Channel.GetHashCode() ^ Message.GetHashCode();
/// <summary> /// <summary>
/// See Object.Equals /// See Object.Equals
/// </summary> /// </summary>
/// <param name="obj">The <see cref="object"/> to compare.</param>
public override bool Equals(object obj) => obj is ChannelMessage cm public override bool Equals(object obj) => obj is ChannelMessage cm
&& cm.Channel == Channel && cm.Message == Message; && cm.Channel == Channel && cm.Message == Message;
internal ChannelMessage(ChannelMessageQueue queue, RedisChannel channel, RedisValue value) internal ChannelMessage(ChannelMessageQueue queue, RedisChannel channel, RedisValue value)
...@@ -48,7 +50,6 @@ internal ChannelMessage(ChannelMessageQueue queue, RedisChannel channel, RedisVa ...@@ -48,7 +50,6 @@ internal ChannelMessage(ChannelMessageQueue queue, RedisChannel channel, RedisVa
public RedisValue Message { get; } public RedisValue Message { get; }
} }
/// <summary> /// <summary>
/// Represents a message queue of ordered pub/sub notifications /// Represents a message queue of ordered pub/sub notifications
/// </summary> /// </summary>
...@@ -80,7 +81,7 @@ internal ChannelMessageQueue(RedisChannel redisChannel, RedisSubscriber parent) ...@@ -80,7 +81,7 @@ internal ChannelMessageQueue(RedisChannel redisChannel, RedisSubscriber parent)
_queue.Reader.Completion.ContinueWith( _queue.Reader.Completion.ContinueWith(
(t, state) => ((ChannelMessageQueue)state).IsCompleted = true, this, TaskContinuationOptions.ExecuteSynchronously); (t, state) => ((ChannelMessageQueue)state).IsCompleted = true, this, TaskContinuationOptions.ExecuteSynchronously);
} }
static readonly UnboundedChannelOptions s_ChannelOptions = new UnboundedChannelOptions private static readonly UnboundedChannelOptions s_ChannelOptions = new UnboundedChannelOptions
{ {
SingleWriter = true, SingleWriter = true,
SingleReader = false, SingleReader = false,
...@@ -102,21 +103,23 @@ private void HandleMessage(RedisChannel channel, RedisValue value) ...@@ -102,21 +103,23 @@ private void HandleMessage(RedisChannel channel, RedisValue value)
} }
} }
/// <summary> /// <summary>
/// Consume a message from the channel /// Consume a message from the channel.
/// </summary> /// </summary>
/// <param name="cancellationToken">The <see cref="CancellationToken"/> to use.</param>
public ValueTask<ChannelMessage> ReadAsync(CancellationToken cancellationToken = default) public ValueTask<ChannelMessage> ReadAsync(CancellationToken cancellationToken = default)
=> _queue.Reader.ReadAsync(cancellationToken); => _queue.Reader.ReadAsync(cancellationToken);
/// <summary> /// <summary>
/// Attempt to synchronously consume a message from the channel /// Attempt to synchronously consume a message from the channel.
/// </summary> /// </summary>
/// <param name="item">The <see cref="ChannelMessage"/> read from the Channel.</param>
public bool TryRead(out ChannelMessage item) => _queue.Reader.TryRead(out item); public bool TryRead(out ChannelMessage item) => _queue.Reader.TryRead(out item);
/// <summary> /// <summary>
/// Attempt to query the backlog length of the queue /// Attempt to query the backlog length of the queue.
/// </summary> /// </summary>
/// <param name="count">The (approximate) count of items in the Channel.</param>
public bool TryGetCount(out int count) public bool TryGetCount(out int count)
{ {
// get this using the reflection // get this using the reflection
...@@ -141,9 +144,11 @@ private void AssertOnMessage(Delegate handler) ...@@ -141,9 +144,11 @@ private void AssertOnMessage(Delegate handler)
if (Interlocked.CompareExchange(ref _onMessageHandler, handler, null) != null) if (Interlocked.CompareExchange(ref _onMessageHandler, handler, null) != null)
throw new InvalidOperationException("Only a single " + nameof(OnMessage) + " is allowed"); throw new InvalidOperationException("Only a single " + nameof(OnMessage) + " is allowed");
} }
/// <summary> /// <summary>
/// Create a message loop that processes messages sequentially /// Create a message loop that processes messages sequentially.
/// </summary> /// </summary>
/// <param name="handler">The handler to run when receiving a message.</param>
public void OnMessage(Action<ChannelMessage> handler) public void OnMessage(Action<ChannelMessage> handler)
{ {
AssertOnMessage(handler); AssertOnMessage(handler);
...@@ -170,8 +175,9 @@ private async void OnMessageSyncImpl() ...@@ -170,8 +175,9 @@ private async void OnMessageSyncImpl()
} }
/// <summary> /// <summary>
/// Create a message loop that processes messages sequentially /// Create a message loop that processes messages sequentially.
/// </summary> /// </summary>
/// <param name="handler">The handler to execute when receiving a message.</param>
public void OnMessage(Func<ChannelMessage, Task> handler) public void OnMessage(Func<ChannelMessage, Task> handler)
{ {
AssertOnMessage(handler); AssertOnMessage(handler);
...@@ -226,7 +232,7 @@ internal static bool IsOneOf(Action<RedisChannel, RedisValue> handler) ...@@ -226,7 +232,7 @@ internal static bool IsOneOf(Action<RedisChannel, RedisValue> handler)
{ {
try try
{ {
return handler != null && handler.Target is ChannelMessageQueue return handler?.Target is ChannelMessageQueue
&& handler.Method.Name == nameof(HandleMessage); && handler.Method.Name == nameof(HandleMessage);
} }
catch catch
...@@ -236,12 +242,15 @@ internal static bool IsOneOf(Action<RedisChannel, RedisValue> handler) ...@@ -236,12 +242,15 @@ internal static bool IsOneOf(Action<RedisChannel, RedisValue> handler)
} }
/// <summary> /// <summary>
/// Stop receiving messages on this channel /// Stop receiving messages on this channel.
/// </summary> /// </summary>
/// <param name="flags">The flags to use when unsubscribing.</param>
public void Unsubscribe(CommandFlags flags = CommandFlags.None) => UnsubscribeImpl(null, flags); public void Unsubscribe(CommandFlags flags = CommandFlags.None) => UnsubscribeImpl(null, flags);
/// <summary> /// <summary>
/// Stop receiving messages on this channel /// Stop receiving messages on this channel.
/// </summary> /// </summary>
/// <param name="flags">The flags to use when unsubscribing.</param>
public Task UnsubscribeAsync(CommandFlags flags = CommandFlags.None) => UnsubscribeAsyncImpl(null, flags); public Task UnsubscribeAsync(CommandFlags flags = CommandFlags.None) => UnsubscribeAsyncImpl(null, flags);
} }
} }
...@@ -5,13 +5,14 @@ namespace StackExchange.Redis ...@@ -5,13 +5,14 @@ namespace StackExchange.Redis
{ {
public partial class ConnectionMultiplexer public partial class ConnectionMultiplexer
{ {
Func<ProfilingSession> _profilingSessionProvider; private Func<ProfilingSession> _profilingSessionProvider;
/// <summary> /// <summary>
/// Register a callback to provide an on-demand ambient session provider based on the /// Register a callback to provide an on-demand ambient session provider based on the
/// calling context; the implementing code is responsible for reliably resolving the same provider /// calling context; the implementing code is responsible for reliably resolving the same provider
/// based on ambient context, or returning null to not profile /// based on ambient context, or returning null to not profile
/// </summary> /// </summary>
/// <param name="profilingSessionProvider">The session provider to register.</param>
public void RegisterProfiler(Func<ProfilingSession> profilingSessionProvider) => _profilingSessionProvider = profilingSessionProvider; public void RegisterProfiler(Func<ProfilingSession> profilingSessionProvider) => _profilingSessionProvider = profilingSessionProvider;
} }
} }
...@@ -945,7 +945,7 @@ internal ServerEndPoint GetServerEndPoint(EndPoint endpoint, TextWriter log = nu ...@@ -945,7 +945,7 @@ internal ServerEndPoint GetServerEndPoint(EndPoint endpoint, TextWriter log = nu
{ {
if (isDisposed) throw new ObjectDisposedException(ToString()); if (isDisposed) throw new ObjectDisposedException(ToString());
server = new ServerEndPoint(this, endpoint, log); server = new ServerEndPoint(this, endpoint);
servers.Add(endpoint, server); servers.Add(endpoint, server);
isNew = true; isNew = true;
_serverSnapshot = _serverSnapshot.Add(server); _serverSnapshot = _serverSnapshot.Add(server);
...@@ -991,7 +991,6 @@ private ConnectionMultiplexer(ConfigurationOptions configuration) ...@@ -991,7 +991,6 @@ private ConnectionMultiplexer(ConfigurationOptions configuration)
partial void OnCreateReaderWriter(ConfigurationOptions configuration); partial void OnCreateReaderWriter(ConfigurationOptions configuration);
internal const int MillisecondsPerHeartbeat = 1000; internal const int MillisecondsPerHeartbeat = 1000;
private sealed class TimerToken private sealed class TimerToken
{ {
...@@ -1020,7 +1019,6 @@ public TimerToken(ConnectionMultiplexer muxer) ...@@ -1020,7 +1019,6 @@ public TimerToken(ConnectionMultiplexer muxer)
} }
}; };
internal static IDisposable Create(ConnectionMultiplexer connection) internal static IDisposable Create(ConnectionMultiplexer connection)
{ {
var token = new TimerToken(connection); var token = new TimerToken(connection);
...@@ -1030,8 +1028,6 @@ internal static IDisposable Create(ConnectionMultiplexer connection) ...@@ -1030,8 +1028,6 @@ internal static IDisposable Create(ConnectionMultiplexer connection)
} }
} }
private int _activeHeartbeatErrors; private int _activeHeartbeatErrors;
private void OnHeartbeat() private void OnHeartbeat()
{ {
......
...@@ -74,6 +74,7 @@ public interface IConnectionMultiplexer ...@@ -74,6 +74,7 @@ public interface IConnectionMultiplexer
/// calling context; the implementing code is responsible for reliably resolving the same provider /// calling context; the implementing code is responsible for reliably resolving the same provider
/// based on ambient context, or returning null to not profile /// based on ambient context, or returning null to not profile
/// </summary> /// </summary>
/// <param name="profilingSessionProvider">The profiling session provider.</param>
void RegisterProfiler(Func<ProfilingSession> profilingSessionProvider); void RegisterProfiler(Func<ProfilingSession> profilingSessionProvider);
/// <summary> /// <summary>
...@@ -259,10 +260,10 @@ public interface IConnectionMultiplexer ...@@ -259,10 +260,10 @@ public interface IConnectionMultiplexer
/// <returns>The number of instances known to have received the message (however, the actual number can be higher)</returns> /// <returns>The number of instances known to have received the message (however, the actual number can be higher)</returns>
Task<long> PublishReconfigureAsync(CommandFlags flags = CommandFlags.None); Task<long> PublishReconfigureAsync(CommandFlags flags = CommandFlags.None);
/// <summary> /// <summary>
/// Get the hash-slot associated with a given key, if applicable; this can be useful for grouping operations /// Get the hash-slot associated with a given key, if applicable; this can be useful for grouping operations
/// </summary> /// </summary>
/// <param name="key">The key to get a the slot for.</param>
int GetHashSlot(RedisKey key); int GetHashSlot(RedisKey key);
} }
} }
...@@ -591,12 +591,18 @@ public partial interface IServer : IRedis ...@@ -591,12 +591,18 @@ public partial interface IServer : IRedis
/// <summary> /// <summary>
/// Swaps two Redis databases, so that immediately all the clients connected to a given database will see the data of the other database, and the other way around /// Swaps two Redis databases, so that immediately all the clients connected to a given database will see the data of the other database, and the other way around
/// </summary> /// </summary>
/// <param name="first">The ID of the first database.</param>
/// <param name="second">The ID of the second database.</param>
/// <param name="flags">The command flags to use.</param>
/// <remarks>https://redis.io/commands/swapdb</remarks> /// <remarks>https://redis.io/commands/swapdb</remarks>
void SwapDatabases(int first, int second, CommandFlags flags = CommandFlags.None); void SwapDatabases(int first, int second, CommandFlags flags = CommandFlags.None);
/// <summary> /// <summary>
/// Swaps two Redis databases, so that immediately all the clients connected to a given database will see the data of the other database, and the other way around /// Swaps two Redis databases, so that immediately all the clients connected to a given database will see the data of the other database, and the other way around
/// </summary> /// </summary>
/// <param name="first">The ID of the first database.</param>
/// <param name="second">The ID of the second database.</param>
/// <param name="flags">The command flags to use.</param>
/// <remarks>https://redis.io/commands/swapdb</remarks> /// <remarks>https://redis.io/commands/swapdb</remarks>
Task SwapDatabasesAsync(int first, int second, CommandFlags flags = CommandFlags.None); Task SwapDatabasesAsync(int first, int second, CommandFlags flags = CommandFlags.None);
......
...@@ -614,8 +614,6 @@ internal void SetException(Exception exception) ...@@ -614,8 +614,6 @@ internal void SetException(Exception exception)
internal void SetEnqueued()=> performance?.SetEnqueued(); internal void SetEnqueued()=> performance?.SetEnqueued();
internal void SetRequestSent() internal void SetRequestSent()
{ {
Status = CommandStatus.Sent; Status = CommandStatus.Sent;
......
...@@ -38,7 +38,6 @@ public bool TryComplete(bool isAsync) ...@@ -38,7 +38,6 @@ public bool TryComplete(bool isAsync)
} }
else else
{ {
if (syncHandler != null) if (syncHandler != null)
{ {
ConnectionMultiplexer.TraceWithoutContext("Invoking (sync)...: " + (string)channel, "Subscription"); ConnectionMultiplexer.TraceWithoutContext("Invoking (sync)...: " + (string)channel, "Subscription");
......
...@@ -84,7 +84,7 @@ public PhysicalConnection(PhysicalBridge bridge) ...@@ -84,7 +84,7 @@ public PhysicalConnection(PhysicalBridge bridge)
if (ChannelPrefix?.Length == 0) ChannelPrefix = null; // null tests are easier than null+empty if (ChannelPrefix?.Length == 0) ChannelPrefix = null; // null tests are easier than null+empty
var endpoint = bridge.ServerEndPoint.EndPoint; var endpoint = bridge.ServerEndPoint.EndPoint;
physicalName = connectionType + "#" + Interlocked.Increment(ref totalCount) + "@" + Format.ToString(endpoint); physicalName = connectionType + "#" + Interlocked.Increment(ref totalCount) + "@" + Format.ToString(endpoint);
OnCreateEcho(); OnCreateEcho();
} }
...@@ -96,7 +96,6 @@ internal async void BeginConnectAsync(TextWriter log) ...@@ -96,7 +96,6 @@ internal async void BeginConnectAsync(TextWriter log)
if(endpoint == null) if(endpoint == null)
{ {
log?.WriteLine("No endpoint"); log?.WriteLine("No endpoint");
} }
Trace("Connecting..."); Trace("Connecting...");
...@@ -108,7 +107,7 @@ internal async void BeginConnectAsync(TextWriter log) ...@@ -108,7 +107,7 @@ internal async void BeginConnectAsync(TextWriter log)
try try
{ {
var awaitable = new SocketAwaitable(); var awaitable = new SocketAwaitable();
using (var _socketArgs = new SocketAsyncEventArgs using (var _socketArgs = new SocketAsyncEventArgs
{ {
UserToken = awaitable, UserToken = awaitable,
...@@ -116,7 +115,7 @@ internal async void BeginConnectAsync(TextWriter log) ...@@ -116,7 +115,7 @@ internal async void BeginConnectAsync(TextWriter log)
}) })
{ {
_socketArgs.Completed += SocketAwaitable.Callback; _socketArgs.Completed += SocketAwaitable.Callback;
if (_socket.ConnectAsync(_socketArgs)) if (_socket.ConnectAsync(_socketArgs))
{ // asynchronous operation is pending { // asynchronous operation is pending
timeoutSource = ConfigureTimeout(_socketArgs, bridge.Multiplexer.RawConfig.ConnectTimeout); timeoutSource = ConfigureTimeout(_socketArgs, bridge.Multiplexer.RawConfig.ConnectTimeout);
...@@ -223,14 +222,7 @@ private enum ReadMode : byte ...@@ -223,14 +222,7 @@ private enum ReadMode : byte
public long LastWriteSecondsAgo => unchecked(Environment.TickCount - Thread.VolatileRead(ref lastWriteTickCount)) / 1000; public long LastWriteSecondsAgo => unchecked(Environment.TickCount - Thread.VolatileRead(ref lastWriteTickCount)) / 1000;
private bool IncludeDetailInExceptions private bool IncludeDetailInExceptions => BridgeCouldBeNull?.Multiplexer.IncludeDetailInExceptions ?? false;
{
get
{
var bridge = BridgeCouldBeNull;
return bridge == null ? false : bridge.Multiplexer.IncludeDetailInExceptions;
}
}
[Conditional("VERBOSE")] [Conditional("VERBOSE")]
internal void Trace(string message) => BridgeCouldBeNull?.Multiplexer?.Trace(message, physicalName); internal void Trace(string message) => BridgeCouldBeNull?.Multiplexer?.Trace(message, physicalName);
...@@ -249,7 +241,6 @@ internal void Shutdown() ...@@ -249,7 +241,6 @@ internal void Shutdown()
_ioPipe = null; _ioPipe = null;
_socket = null; _socket = null;
if (ioPipe != null) if (ioPipe != null)
{ {
Trace("Disconnecting..."); Trace("Disconnecting...");
...@@ -259,8 +250,8 @@ internal void Shutdown() ...@@ -259,8 +250,8 @@ internal void Shutdown()
try { ioPipe.Output?.Complete(); } catch { } try { ioPipe.Output?.Complete(); } catch { }
try { using (ioPipe as IDisposable) { } } catch { } try { using (ioPipe as IDisposable) { } } catch { }
} }
if (socket != null) if (socket != null)
{ {
try { socket.Shutdown(SocketShutdown.Both); } catch { } try { socket.Shutdown(SocketShutdown.Both); } catch { }
...@@ -336,7 +327,7 @@ public void RecordConnectionFailed(ConnectionFailureType failureType, Exception ...@@ -336,7 +327,7 @@ public void RecordConnectionFailed(ConnectionFailureType failureType, Exception
var bridge = BridgeCouldBeNull; var bridge = BridgeCouldBeNull;
if (bridge != null) if (bridge != null)
{ {
exMessage.Append(" on " + Format.ToString(bridge.ServerEndPoint?.EndPoint) + "/" + connectionType); exMessage.Append(" on ").Append(Format.ToString(bridge.ServerEndPoint?.EndPoint)).Append("/").Append(connectionType);
data.Add(Tuple.Create("FailureType", failureType.ToString())); data.Add(Tuple.Create("FailureType", failureType.ToString()));
data.Add(Tuple.Create("EndPoint", Format.ToString(bridge.ServerEndPoint?.EndPoint))); data.Add(Tuple.Create("EndPoint", Format.ToString(bridge.ServerEndPoint?.EndPoint)));
...@@ -533,12 +524,11 @@ internal void OnBridgeHeartbeat() ...@@ -533,12 +524,11 @@ internal void OnBridgeHeartbeat()
{ {
var now = Environment.TickCount; var now = Environment.TickCount;
Interlocked.Exchange(ref lastBeatTickCount, now); Interlocked.Exchange(ref lastBeatTickCount, now);
lock (_writtenAwaitingResponse) lock (_writtenAwaitingResponse)
{ {
if (_writtenAwaitingResponse.Count != 0) if (_writtenAwaitingResponse.Count != 0)
{ {
var bridge = BridgeCouldBeNull; var bridge = BridgeCouldBeNull;
if (bridge == null) return; if (bridge == null) return;
...@@ -579,9 +569,9 @@ internal void SetUnknownDatabase() ...@@ -579,9 +569,9 @@ internal void SetUnknownDatabase()
internal void Write(RedisKey key) internal void Write(RedisKey key)
{ {
var val = key.KeyValue; var val = key.KeyValue;
if (val is string) if (val is string s)
{ {
WriteUnified(_ioPipe.Output, key.KeyPrefix, (string)val, outEncoder); WriteUnified(_ioPipe.Output, key.KeyPrefix, s, outEncoder);
} }
else else
{ {
...@@ -923,7 +913,7 @@ internal static void WriteUnified(PipeWriter writer, byte[] prefix, string value ...@@ -923,7 +913,7 @@ internal static void WriteUnified(PipeWriter writer, byte[] prefix, string value
// ${total-len}\r\n 3 + MaxInt32TextLen // ${total-len}\r\n 3 + MaxInt32TextLen
// {prefix}{value}\r\n // {prefix}{value}\r\n
int encodedLength = Encoding.UTF8.GetByteCount(value), int encodedLength = Encoding.UTF8.GetByteCount(value),
prefixLength = prefix == null ? 0 : prefix.Length, prefixLength = prefix?.Length ?? 0,
totalLength = prefixLength + encodedLength; totalLength = prefixLength + encodedLength;
if (totalLength == 0) if (totalLength == 0)
...@@ -1178,7 +1168,7 @@ private void MatchResult(RawResult result) ...@@ -1178,7 +1168,7 @@ private void MatchResult(RawResult result)
if (items.Length >= 3 && items[0].IsEqual(message)) if (items.Length >= 3 && items[0].IsEqual(message))
{ {
// special-case the configuration change broadcasts (we don't keep that in the usual pub/sub registry) // special-case the configuration change broadcasts (we don't keep that in the usual pub/sub registry)
var configChanged = muxer.ConfigurationChangedChannel; var configChanged = muxer.ConfigurationChangedChannel;
if (configChanged != null && items[1].IsEqual(configChanged)) if (configChanged != null && items[1].IsEqual(configChanged))
{ {
...@@ -1271,7 +1261,7 @@ internal void OnHeartbeat() ...@@ -1271,7 +1261,7 @@ internal void OnHeartbeat()
// - so: only use that if we're making progress // - so: only use that if we're making progress
if (!(allowSyncRead && input.TryRead(out var readResult))) if (!(allowSyncRead && input.TryRead(out var readResult)))
{ {
readResult = await input.ReadAsync(); readResult = await input.ReadAsync().ForAwait();
} }
var buffer = readResult.Buffer; var buffer = readResult.Buffer;
...@@ -1458,7 +1448,7 @@ private static RawResult ReadLineTerminatedString(ResultType type, in ReadOnlySe ...@@ -1458,7 +1448,7 @@ private static RawResult ReadLineTerminatedString(ResultType type, in ReadOnlySe
throw new InvalidOperationException("Unexpected response prefix: " + (char)prefix); throw new InvalidOperationException("Unexpected response prefix: " + (char)prefix);
} }
} }
static RawResult ParseInlineProtocol(RawResult line) private static RawResult ParseInlineProtocol(RawResult line)
{ {
if (!line.HasValue) return RawResult.Nil; // incomplete line if (!line.HasValue) return RawResult.Nil; // incomplete line
...@@ -1472,7 +1462,5 @@ static RawResult ParseInlineProtocol(RawResult line) ...@@ -1472,7 +1462,5 @@ static RawResult ParseInlineProtocol(RawResult line)
} }
return new RawResult(oversized, count); return new RawResult(oversized, count);
} }
} }
} }
...@@ -3,26 +3,27 @@ ...@@ -3,26 +3,27 @@
namespace StackExchange.Redis.Profiling namespace StackExchange.Redis.Profiling
{ {
/// <summary> /// <summary>
/// Lightweight profiling session that can be optionally registered (via ConnectionMultiplexer.RegisterProfiler) to track messages /// Lightweight profiling session that can be optionally registered (via ConnectionMultiplexer.RegisterProfiler) to track messages.
/// </summary> /// </summary>
public sealed class ProfilingSession public sealed class ProfilingSession
{ {
/// <summary> /// <summary>
/// Caller-defined state object /// Caller-defined state object.
/// </summary> /// </summary>
public object UserToken { get; } public object UserToken { get; }
/// <summary> /// <summary>
/// Create a new profiling session, optionally including a caller-defined state object /// Create a new profiling session, optionally including a caller-defined state object.
/// </summary> /// </summary>
/// <param name="userToken">The state object to use for this session.</param>
public ProfilingSession(object userToken = null) => UserToken = userToken; public ProfilingSession(object userToken = null) => UserToken = userToken;
object _untypedHead; private object _untypedHead;
internal void Add(ProfiledCommand command) internal void Add(ProfiledCommand command)
{ {
if (command == null) return; if (command == null) return;
object cur = Thread.VolatileRead(ref _untypedHead); ; object cur = Thread.VolatileRead(ref _untypedHead);
while (true) while (true)
{ {
command.NextElement = (ProfiledCommand)cur; command.NextElement = (ProfiledCommand)cur;
...@@ -34,7 +35,7 @@ internal void Add(ProfiledCommand command) ...@@ -34,7 +35,7 @@ internal void Add(ProfiledCommand command)
/// <summary> /// <summary>
/// Reset the session and yield the commands that were captured for enumeration; if additional commands /// Reset the session and yield the commands that were captured for enumeration; if additional commands
/// are added, they can be retrieved via additional calls to FinishProfiling /// are added, they can be retrieved via additional calls to FinishProfiling.
/// </summary> /// </summary>
public ProfiledCommandEnumerable FinishProfiling() public ProfiledCommandEnumerable FinishProfiling()
{ {
......
...@@ -89,8 +89,8 @@ public override string ToString() ...@@ -89,8 +89,8 @@ public override string ToString()
// specifically; the line: abc "def ghi" jkl // specifically; the line: abc "def ghi" jkl
// is 3 tokens: "abc", "def ghi" and "jkl" // is 3 tokens: "abc", "def ghi" and "jkl"
public Tokenizer GetEnumerator() => this; public Tokenizer GetEnumerator() => this;
BufferReader _value; private BufferReader _value;
public Tokenizer(ReadOnlySequence<byte> value) public Tokenizer(ReadOnlySequence<byte> value)
{ {
_value = new BufferReader(value); _value = new BufferReader(value);
...@@ -116,7 +116,7 @@ public bool MoveNext() ...@@ -116,7 +116,7 @@ public bool MoveNext()
_value.Consume(1); _value.Consume(1);
break; break;
} }
int end = BufferReader.FindNext(_value, terminator); int end = BufferReader.FindNext(_value, terminator);
if (end < 0) if (end < 0)
{ {
...@@ -128,10 +128,8 @@ public bool MoveNext() ...@@ -128,10 +128,8 @@ public bool MoveNext()
_value.Consume(1); // drop the terminator itself; _value.Consume(1); // drop the terminator itself;
} }
return true; return true;
} }
public ReadOnlySequence<byte> Current { get; private set; } public ReadOnlySequence<byte> Current { get; private set; }
} }
internal RedisChannel AsRedisChannel(byte[] channelPrefix, RedisChannel.PatternMode mode) internal RedisChannel AsRedisChannel(byte[] channelPrefix, RedisChannel.PatternMode mode)
{ {
......
...@@ -15,7 +15,6 @@ public abstract class RedisResult ...@@ -15,7 +15,6 @@ public abstract class RedisResult
/// <returns> new <see cref="RedisResult"/>.</returns> /// <returns> new <see cref="RedisResult"/>.</returns>
public static RedisResult Create(RedisValue value, ResultType? resultType = null) => new SingleRedisResult(value, resultType); public static RedisResult Create(RedisValue value, ResultType? resultType = null) => new SingleRedisResult(value, resultType);
/// <summary> /// <summary>
/// Create a new RedisResult representing an array of values. /// Create a new RedisResult representing an array of values.
/// </summary> /// </summary>
...@@ -267,8 +266,8 @@ internal override byte[][] AsByteArrayArray() ...@@ -267,8 +266,8 @@ internal override byte[][] AsByteArrayArray()
: _value.Length == 0 ? Array.Empty<byte[]>() : _value.Length == 0 ? Array.Empty<byte[]>()
: Array.ConvertAll(_value, x => x.AsByteArray()); : Array.ConvertAll(_value, x => x.AsByteArray());
private bool IsSingleton => _value != null && _value.Length == 1; private bool IsSingleton => _value?.Length == 1;
private bool IsEmpty => _value != null && _value.Length == 0; private bool IsEmpty => _value?.Length == 0;
internal override double AsDouble() internal override double AsDouble()
{ {
if (IsSingleton) return _value[0].AsDouble(); if (IsSingleton) return _value[0].AsDouble();
...@@ -363,12 +362,15 @@ internal override string[] AsStringArray() ...@@ -363,12 +362,15 @@ internal override string[] AsStringArray()
} }
/// <summary> /// <summary>
/// Create a RedisResult from a key /// Create a <see cref="RedisResult"/> from a key.
/// </summary> /// </summary>
/// <param name="key">The <see cref="RedisKey"/> to create a <see cref="RedisResult"/> from.</param>
public static RedisResult Create(RedisKey key) => Create(key.AsRedisValue(), ResultType.BulkString); public static RedisResult Create(RedisKey key) => Create(key.AsRedisValue(), ResultType.BulkString);
/// <summary> /// <summary>
/// Create a RedisResult from a channel /// Create a <see cref="RedisResult"/> from a channel.
/// </summary> /// </summary>
/// <param name="channel">The <see cref="RedisChannel"/> to create a <see cref="RedisResult"/> from.</param>
public static RedisResult Create(RedisChannel channel) => Create((byte[])channel, ResultType.BulkString); public static RedisResult Create(RedisChannel channel) => Create((byte[])channel, ResultType.BulkString);
private sealed class ErrorRedisResult : RedisResult private sealed class ErrorRedisResult : RedisResult
......
...@@ -89,7 +89,7 @@ internal Task RemoveAllSubscriptions(CommandFlags flags, object asyncState) ...@@ -89,7 +89,7 @@ internal Task RemoveAllSubscriptions(CommandFlags flags, object asyncState)
if(msg != null) UnprocessableCompletionManager?.CompleteSyncOrAsync(msg); if(msg != null) UnprocessableCompletionManager?.CompleteSyncOrAsync(msg);
pair.Value.Remove(true, null); pair.Value.Remove(true, null);
pair.Value.Remove(false, null); pair.Value.Remove(false, null);
var task = pair.Value.UnsubscribeFromServer(pair.Key, flags, asyncState, false); var task = pair.Value.UnsubscribeFromServer(pair.Key, flags, asyncState, false);
if (task != null) last = task; if (task != null) last = task;
} }
...@@ -325,7 +325,7 @@ public Task SubscribeAsync(RedisChannel channel, Action<RedisChannel, RedisValue ...@@ -325,7 +325,7 @@ public Task SubscribeAsync(RedisChannel channel, Action<RedisChannel, RedisValue
public async Task<ChannelMessageQueue> SubscribeAsync(RedisChannel channel, CommandFlags flags = CommandFlags.None) public async Task<ChannelMessageQueue> SubscribeAsync(RedisChannel channel, CommandFlags flags = CommandFlags.None)
{ {
var c = new ChannelMessageQueue(channel, this); var c = new ChannelMessageQueue(channel, this);
await c.SubscribeAsync(flags); await c.SubscribeAsync(flags).ForAwait();
return c; return c;
} }
......
...@@ -26,9 +26,6 @@ private RedisValue(long overlappedValue64, ReadOnlyMemory<byte> memory, object o ...@@ -26,9 +26,6 @@ private RedisValue(long overlappedValue64, ReadOnlyMemory<byte> memory, object o
} }
private readonly static object Sentinel_Integer = new object(); private readonly static object Sentinel_Integer = new object();
private readonly static object Sentinel_Raw = new object(); private readonly static object Sentinel_Raw = new object();
private readonly static object Sentinel_Double = new object(); private readonly static object Sentinel_Double = new object();
...@@ -41,9 +38,11 @@ public object Box() ...@@ -41,9 +38,11 @@ public object Box()
if (obj is null || obj is string || obj is byte[]) return obj; if (obj is null || obj is string || obj is byte[]) return obj;
return this; return this;
} }
/// <summary> /// <summary>
/// Parse this object as a value - to be used alongside Box /// Parse this object as a value - to be used alongside Box.
/// </summary> /// </summary>
/// <param name="value">The value to unbox.</param>
public static RedisValue Unbox(object value) public static RedisValue Unbox(object value)
{ {
if (value == null) return RedisValue.Null; if (value == null) return RedisValue.Null;
......
...@@ -180,7 +180,7 @@ public virtual bool SetResult(PhysicalConnection connection, Message message, Ra ...@@ -180,7 +180,7 @@ public virtual bool SetResult(PhysicalConnection connection, Message message, Ra
if (result.IsError) if (result.IsError)
{ {
if (result.AssertStarts(NOAUTH)) bridge?.Multiplexer?.SetAuthSuspect(); if (result.AssertStarts(NOAUTH)) bridge?.Multiplexer?.SetAuthSuspect();
var server = bridge.ServerEndPoint; var server = bridge.ServerEndPoint;
bool log = !message.IsInternalCall; bool log = !message.IsInternalCall;
bool isMoved = result.AssertStarts(MOVED); bool isMoved = result.AssertStarts(MOVED);
...@@ -572,7 +572,7 @@ public override bool SetResult(PhysicalConnection connection, Message message, R ...@@ -572,7 +572,7 @@ public override bool SetResult(PhysicalConnection connection, Message message, R
var server = bridge.ServerEndPoint; var server = bridge.ServerEndPoint;
server.Multiplexer.Trace("Auto-configured role: slave"); server.Multiplexer.Trace("Auto-configured role: slave");
server.IsSlave = true; server.IsSlave = true;
} }
} }
return base.SetResult(connection, message, result); return base.SetResult(connection, message, result);
} }
...@@ -1606,7 +1606,7 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes ...@@ -1606,7 +1606,7 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes
var entries = ParseRedisStreamEntries(tmp); var entries = ParseRedisStreamEntries(tmp);
// note: don't .Recycle(), would be a stack overflow because // note: don't .Recycle(), would be a stack overflow because
// it would bridge the fake and real result set // it would bridge the fake and real result set
ArrayPool<RawResult>.Shared.Return(leased); ArrayPool<RawResult>.Shared.Return(leased);
var streamInfo = new StreamInfo(length: (int)arr[1].AsRedisValue(), var streamInfo = new StreamInfo(length: (int)arr[1].AsRedisValue(),
radixTreeKeys: (int)arr[3].AsRedisValue(), radixTreeKeys: (int)arr[3].AsRedisValue(),
......
...@@ -105,26 +105,25 @@ private static string MakeOrdinalScriptWithoutKeys(string rawScript, string[] ar ...@@ -105,26 +105,25 @@ private static string MakeOrdinalScriptWithoutKeys(string rawScript, string[] ar
return ret.ToString(); return ret.ToString();
} }
private static Dictionary<Type, MethodInfo> _conversionOperators; private static readonly Dictionary<Type, MethodInfo> _conversionOperators;
static ScriptParameterMapper() static ScriptParameterMapper()
{ {
var tmp = new Dictionary<Type, MethodInfo>(); var tmp = new Dictionary<Type, MethodInfo>();
foreach(var method in typeof(RedisValue).GetMethods(BindingFlags.Public | BindingFlags.Static)) foreach (var method in typeof(RedisValue).GetMethods(BindingFlags.Public | BindingFlags.Static))
{ {
if(method.ReturnType == typeof(RedisValue) && if (method.ReturnType == typeof(RedisValue) && (method.Name == "op_Implicit" || method.Name == "op_Explicit"))
(method.Name == "op_Implicit" || method.Name == "op_Explicit"))
{ {
var p = method.GetParameters(); var p = method.GetParameters();
if (p != null && p.Length == 1) if (p?.Length == 1)
{ {
tmp[p[0].ParameterType] = method; tmp[p[0].ParameterType] = method;
} }
} }
} }
_conversionOperators = tmp; _conversionOperators = tmp;
} }
/// <summary> /// <summary>
/// Turns a script with @namedParameters into a LuaScript that can be executed /// Turns a script with @namedParameters into a LuaScript that can be executed
/// against a given IDatabase(Async) object /// against a given IDatabase(Async) object
...@@ -176,7 +175,8 @@ public static bool IsValidParameterHash(Type t, LuaScript script, out string mis ...@@ -176,7 +175,8 @@ public static bool IsValidParameterHash(Type t, LuaScript script, out string mis
} }
var memberType = member is FieldInfo ? ((FieldInfo)member).FieldType : ((PropertyInfo)member).PropertyType; var memberType = member is FieldInfo ? ((FieldInfo)member).FieldType : ((PropertyInfo)member).PropertyType;
if(!ConvertableTypes.Contains(memberType)){ if (!ConvertableTypes.Contains(memberType))
{
missingMember = null; missingMember = null;
badTypeMember = argName; badTypeMember = argName;
return false; return false;
...@@ -186,7 +186,7 @@ public static bool IsValidParameterHash(Type t, LuaScript script, out string mis ...@@ -186,7 +186,7 @@ public static bool IsValidParameterHash(Type t, LuaScript script, out string mis
missingMember = badTypeMember = null; missingMember = badTypeMember = null;
return true; return true;
} }
/// <summary> /// <summary>
/// <para>Creates a Func that extracts parameters from the given type for use by a LuaScript.</para> /// <para>Creates a Func that extracts parameters from the given type for use by a LuaScript.</para>
/// <para> /// <para>
...@@ -210,7 +210,7 @@ public static bool IsValidParameterHash(Type t, LuaScript script, out string mis ...@@ -210,7 +210,7 @@ public static bool IsValidParameterHash(Type t, LuaScript script, out string mis
Expression GetMember(Expression root, MemberInfo member) Expression GetMember(Expression root, MemberInfo member)
{ {
switch(member.MemberType) switch (member.MemberType)
{ {
case MemberTypes.Property: case MemberTypes.Property:
return Expression.Property(root, (PropertyInfo)member); return Expression.Property(root, (PropertyInfo)member);
...@@ -222,7 +222,7 @@ Expression GetMember(Expression root, MemberInfo member) ...@@ -222,7 +222,7 @@ Expression GetMember(Expression root, MemberInfo member)
} }
var keys = new List<MemberInfo>(); var keys = new List<MemberInfo>();
var args = new List<MemberInfo>(); var args = new List<MemberInfo>();
for (var i = 0; i < script.Arguments.Length; i++) for (var i = 0; i < script.Arguments.Length; i++)
{ {
var argName = script.Arguments[i]; var argName = script.Arguments[i];
...@@ -244,7 +244,7 @@ Expression GetMember(Expression root, MemberInfo member) ...@@ -244,7 +244,7 @@ Expression GetMember(Expression root, MemberInfo member)
var objUntyped = Expression.Parameter(typeof(object), "obj"); var objUntyped = Expression.Parameter(typeof(object), "obj");
var objTyped = Expression.Convert(objUntyped, t); var objTyped = Expression.Convert(objUntyped, t);
var keyPrefix = Expression.Parameter(typeof(RedisKey?), "keyPrefix"); var keyPrefix = Expression.Parameter(typeof(RedisKey?), "keyPrefix");
Expression keysResult, valuesResult; Expression keysResult, valuesResult;
MethodInfo asRedisValue = null; MethodInfo asRedisValue = null;
Expression[] keysResultArr = null; Expression[] keysResultArr = null;
...@@ -264,7 +264,7 @@ Expression GetMember(Expression root, MemberInfo member) ...@@ -264,7 +264,7 @@ Expression GetMember(Expression root, MemberInfo member)
BindingFlags.NonPublic | BindingFlags.Instance); BindingFlags.NonPublic | BindingFlags.Instance);
keysResultArr = new Expression[keys.Count]; keysResultArr = new Expression[keys.Count];
for(int i = 0; i < keysResultArr.Length; i++) for (int i = 0; i < keysResultArr.Length; i++)
{ {
var member = GetMember(objTyped, keys[i]); var member = GetMember(objTyped, keys[i]);
keysResultArr[i] = Expression.Condition(needsKeyPrefix, keysResultArr[i] = Expression.Condition(needsKeyPrefix,
...@@ -274,7 +274,6 @@ Expression GetMember(Expression root, MemberInfo member) ...@@ -274,7 +274,6 @@ Expression GetMember(Expression root, MemberInfo member)
keysResult = Expression.NewArrayInit(typeof(RedisKey), keysResultArr); keysResult = Expression.NewArrayInit(typeof(RedisKey), keysResultArr);
} }
if (args.Count == 0) if (args.Count == 0)
{ {
// if there are no args, don't allocate // if there are no args, don't allocate
......
...@@ -43,7 +43,7 @@ internal void ResetNonConnected() ...@@ -43,7 +43,7 @@ internal void ResetNonConnected()
subscription?.ResetNonConnected(); subscription?.ResetNonConnected();
} }
public ServerEndPoint(ConnectionMultiplexer multiplexer, EndPoint endpoint, TextWriter log) public ServerEndPoint(ConnectionMultiplexer multiplexer, EndPoint endpoint)
{ {
Multiplexer = multiplexer; Multiplexer = multiplexer;
EndPoint = endpoint; EndPoint = endpoint;
...@@ -642,7 +642,6 @@ internal void WriteDirectOrQueueFireAndForget<T>(PhysicalConnection connection, ...@@ -642,7 +642,6 @@ internal void WriteDirectOrQueueFireAndForget<T>(PhysicalConnection connection,
{ {
bridge.WriteMessageTakingWriteLock(connection, message); bridge.WriteMessageTakingWriteLock(connection, message);
} }
} }
} }
} }
......
...@@ -36,6 +36,5 @@ namespace StackExchange.Redis ...@@ -36,6 +36,5 @@ namespace StackExchange.Redis
/// An array of consumers within the consumer group that have pending messages. /// An array of consumers within the consumer group that have pending messages.
/// </summary> /// </summary>
public StreamConsumer[] Consumers { get; } public StreamConsumer[] Consumers { get; }
} }
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment