Commit 72eb2581 authored by Nick Craver's avatar Nick Craver

Cleanup: ConnectionMultiplexer

parent fda90baf
......@@ -14,41 +14,12 @@
namespace StackExchange.Redis
{
internal static partial class TaskExtensions
{
private static readonly Action<Task> observeErrors = ObverveErrors;
private static void ObverveErrors(this Task task)
{
if (task != null) GC.KeepAlive(task.Exception);
}
public static Task ObserveErrors(this Task task)
{
task?.ContinueWith(observeErrors, TaskContinuationOptions.OnlyOnFaulted);
return task;
}
public static Task<T> ObserveErrors<T>(this Task<T> task)
{
task?.ContinueWith(observeErrors, TaskContinuationOptions.OnlyOnFaulted);
return task;
}
public static ConfiguredTaskAwaitable ForAwait(this Task task)
{
return task.ConfigureAwait(false);
}
public static ConfiguredTaskAwaitable<T> ForAwait<T>(this Task<T> task)
{
return task.ConfigureAwait(false);
}
}
/// <summary>
/// Represents an inter-related group of connections to redis servers
/// </summary>
public sealed partial class ConnectionMultiplexer : IConnectionMultiplexer, IDisposable
{
private static readonly string timeoutHelpLink = "https://stackexchange.github.io/StackExchange.Redis/Timeouts";
private const string timeoutHelpLink = "https://stackexchange.github.io/StackExchange.Redis/Timeouts";
private static TaskFactory _factory = null;
......@@ -58,15 +29,8 @@ public sealed partial class ConnectionMultiplexer : IConnectionMultiplexer, IDis
/// </summary>
public static TaskFactory Factory
{
get
{
return _factory ?? Task.Factory;
}
set
{
_factory = value;
}
get => _factory ?? Task.Factory;
set => _factory = value;
}
/// <summary>
......@@ -88,19 +52,15 @@ public ServerCounters GetCounters()
/// <summary>
/// Gets the client-name that will be used on all new connections
/// </summary>
public string ClientName => configuration.ClientName ?? ConnectionMultiplexer.GetDefaultClientName();
public string ClientName => configuration.ClientName ?? GetDefaultClientName();
private static string defaultClientName;
private static string GetDefaultClientName()
{
if (defaultClientName == null)
{
defaultClientName = TryGetAzureRoleInstanceIdNoThrow()
return defaultClientName ?? (defaultClientName = TryGetAzureRoleInstanceIdNoThrow()
?? Environment.MachineName
?? Environment.GetEnvironmentVariable("ComputerName")
?? "StackExchange.Redis";
}
return defaultClientName;
?? "StackExchange.Redis");
}
/// <summary>
......@@ -109,9 +69,11 @@ private static string GetDefaultClientName()
/// </summary>
internal static string TryGetAzureRoleInstanceIdNoThrow()
{
#if NETSTANDARD1_5
return null;
#else
string roleInstanceId = null;
// TODO: CoreCLR port pending https://github.com/dotnet/coreclr/issues/919
#if !NETSTANDARD1_5
try
{
Assembly asm = null;
......@@ -146,8 +108,8 @@ internal static string TryGetAzureRoleInstanceIdNoThrow()
//silently ignores the exception
roleInstanceId = null;
}
#endif
return roleInstanceId;
#endif
}
/// <summary>
......@@ -170,7 +132,8 @@ internal void OnConnectionFailed(EndPoint endpoint, ConnectionType connectionTyp
ReconfigureIfNeeded(endpoint, false, "connection failed");
}
}
internal void OnInternalError(Exception exception, EndPoint endpoint = null, ConnectionType connectionType = ConnectionType.None, [System.Runtime.CompilerServices.CallerMemberName] string origin = null)
internal void OnInternalError(Exception exception, EndPoint endpoint = null, ConnectionType connectionType = ConnectionType.None, [CallerMemberName] string origin = null)
{
try
{
......@@ -202,7 +165,6 @@ internal void OnConnectionRestored(EndPoint endpoint, ConnectionType connectionT
ReconfigureIfNeeded(endpoint, false, "connection restored");
}
private void OnEndpointChanged(EndPoint endpoint, EventHandler<EndPointEventArgs> handler)
{
if (isDisposed) return;
......@@ -213,14 +175,9 @@ private void OnEndpointChanged(EndPoint endpoint, EventHandler<EndPointEventArgs
);
}
}
internal void OnConfigurationChanged(EndPoint endpoint)
{
OnEndpointChanged(endpoint, ConfigurationChanged);
}
internal void OnConfigurationChangedBroadcast(EndPoint endpoint)
{
OnEndpointChanged(endpoint, ConfigurationChangedBroadcast);
}
internal void OnConfigurationChanged(EndPoint endpoint) => OnEndpointChanged(endpoint, ConfigurationChanged);
internal void OnConfigurationChangedBroadcast(EndPoint endpoint) => OnEndpointChanged(endpoint, ConfigurationChangedBroadcast);
/// <summary>
/// A server replied with an error message;
......@@ -239,7 +196,7 @@ internal void OnErrorMessage(EndPoint endpoint, string message)
}
[System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Usage", "CA2202:Do not dispose objects multiple times")]
static void Write<T>(ZipArchive zip, string name, Task task, Action<T, StreamWriter> callback)
private static void Write<T>(ZipArchive zip, string name, Task task, Action<T, StreamWriter> callback)
{
var entry = zip.CreateEntry(name,
#if __MonoCS__
......@@ -270,6 +227,8 @@ static void Write<T>(ZipArchive zip, string name, Task task, Action<T, StreamWri
/// <summary>
/// Write the configuration of all servers to an output stream
/// </summary>
/// <param name="destination">The destination stream to write the export to.</param>
/// <param name="options">The options to use for this export.</param>
public void ExportConfiguration(Stream destination, ExportOptions options = ExportOptions.All)
{
if (destination == null) throw new ArgumentNullException(nameof(destination));
......@@ -364,7 +323,8 @@ internal void MakeMaster(ServerEndPoint server, ReplicationChangeOptions options
try
{
srv.Ping(flags); // if it isn't happy, we're not happy
} catch (Exception ex)
}
catch (Exception ex)
{
LogLocked(log, "Operation failed on {0}, aborting: {1}", Format.ToString(srv.EndPoint), ex.Message);
throw;
......@@ -394,7 +354,8 @@ internal void MakeMaster(ServerEndPoint server, ReplicationChangeOptions options
try
{
srv.SlaveOf(null, flags);
} catch (Exception ex)
}
catch (Exception ex)
{
LogLocked(log, "Operation failed on {0}, aborting: {1}", Format.ToString(srv.EndPoint), ex.Message);
throw;
......@@ -408,8 +369,6 @@ internal void MakeMaster(ServerEndPoint server, ReplicationChangeOptions options
server.QueueDirectFireAndForget(msg, ResultProcessor.DemandOK);
}
// try and broadcast this everywhere, to catch the maximum audience
if ((options & ReplicationChangeOptions.Broadcast) != 0 && ConfigurationChangedChannel != null
&& CommandMap.IsAvailable(RedisCommand.PUBLISH))
......@@ -424,7 +383,6 @@ internal void MakeMaster(ServerEndPoint server, ReplicationChangeOptions options
}
}
if ((options & ReplicationChangeOptions.EnslaveSubordinates) != 0)
{
foreach (var node in nodes)
......@@ -450,24 +408,28 @@ internal void MakeMaster(ServerEndPoint server, ReplicationChangeOptions options
/// </summary>
private object LogSyncLock => UniqueId;
// we know this has strong identity: readonly and unique to us
// we know this has strong identity: readonly and unique to us
internal void LogLocked(TextWriter log, string line)
{
if (log != null) lock (LogSyncLock) { log.WriteLine(line); }
}
internal void LogLocked(TextWriter log, string line, object arg)
{
if (log != null) lock (LogSyncLock) { log.WriteLine(line, arg); }
}
internal void LogLocked(TextWriter log, string line, object arg0, object arg1)
{
if (log != null) lock (LogSyncLock) { log.WriteLine(line, arg0, arg1); }
}
internal void LogLocked(TextWriter log, string line, object arg0, object arg1, object arg2)
{
if (log != null) lock (LogSyncLock) { log.WriteLine(line, arg0, arg1, arg2); }
}
internal void LogLocked(TextWriter log, string line, params object[] args)
{
if (log != null) lock (LogSyncLock) { log.WriteLine(line, args); }
......@@ -480,7 +442,7 @@ internal void CheckMessage(Message message)
CommandMap.AssertAvailable(message.Command);
}
static void WriteNormalizingLineEndings(string source, StreamWriter writer)
private static void WriteNormalizingLineEndings(string source, StreamWriter writer)
{
using (var reader = new StringReader(source))
{
......@@ -524,7 +486,7 @@ static void WriteNormalizingLineEndings(string source, StreamWriter writer)
/// <summary>
/// Gets all endpoints defined on the server
/// </summary>
/// <returns></returns>
/// <param name="configuredOnly">Whether to get only the endpoints specified explicitly in the config.</param>
public EndPoint[] GetEndPoints(bool configuredOnly = false)
{
if (configuredOnly) return configuration.EndPoints.ToArray();
......@@ -536,16 +498,15 @@ public EndPoint[] GetEndPoints(bool configuredOnly = false)
private readonly ConfigurationOptions configuration;
internal bool TryResend(int hashSlot, Message message, EndPoint endpoint, bool isMoved)
{
return serverSelectionStrategy.TryResend(hashSlot, message, endpoint, isMoved);
}
/// <summary>
/// Wait for a given asynchronous operation to complete (or timeout)
/// </summary>
/// <param name="task">The task to wait on.</param>
public void Wait(Task task)
{
if (task == null) throw new ArgumentNullException(nameof(task));
......@@ -555,16 +516,19 @@ public void Wait(Task task)
/// <summary>
/// Wait for a given asynchronous operation to complete (or timeout)
/// </summary>
/// <typeparam name="T">The type contains in the task to wait on.</typeparam>
/// <param name="task">The task to wait on.</param>
public T Wait<T>(Task<T> task)
{
if (task == null) throw new ArgumentNullException(nameof(task));
if (!task.Wait(timeoutMilliseconds)) throw new TimeoutException();
return task.Result;
}
/// <summary>
/// Wait for the given asynchronous operations to complete (or timeout)
/// </summary>
/// <param name="tasks">The tasks to wait on.</param>
public void WaitAll(params Task[] tasks)
{
if (tasks == null) throw new ArgumentNullException(nameof(tasks));
......@@ -572,10 +536,8 @@ public void WaitAll(params Task[] tasks)
if (!Task.WaitAll(tasks, timeoutMilliseconds)) throw new TimeoutException();
}
private bool WaitAllIgnoreErrors(Task[] tasks)
{
return WaitAllIgnoreErrors(tasks, timeoutMilliseconds);
}
private bool WaitAllIgnoreErrors(Task[] tasks) => WaitAllIgnoreErrors(tasks, timeoutMilliseconds);
private static bool WaitAllIgnoreErrors(Task[] tasks, int timeout)
{
if (tasks == null) throw new ArgumentNullException(nameof(tasks));
......@@ -612,21 +574,20 @@ private static bool WaitAllIgnoreErrors(Task[] tasks, int timeout)
private void LogLockedWithThreadPoolStats(TextWriter log, string message, out int busyWorkerCount)
{
busyWorkerCount = 0;
if(log != null)
if (log != null)
{
var sb = new StringBuilder();
sb.Append(message);
string iocp, worker;
busyWorkerCount = GetThreadPoolStats(out iocp, out worker);
busyWorkerCount = GetThreadPoolStats(out string iocp, out string worker);
sb.Append(", IOCP: ").Append(iocp).Append(", WORKER: ").Append(worker);
LogLocked(log, sb.ToString());
}
}
#endif
static bool AllComplete(Task[] tasks)
private static bool AllComplete(Task[] tasks)
{
for(int i = 0 ; i < tasks.Length ; i++)
for (int i = 0; i < tasks.Length; i++)
{
var task = tasks[i];
if (!task.IsCanceled && !task.IsCompleted && !task.IsFaulted)
......@@ -634,6 +595,7 @@ static bool AllComplete(Task[] tasks)
}
return true;
}
private async Task<bool> WaitAllIgnoreErrorsAsync(Task[] tasks, int timeoutMilliseconds, TextWriter log)
{
if (tasks == null) throw new ArgumentNullException(nameof(tasks));
......@@ -651,8 +613,7 @@ private async Task<bool> WaitAllIgnoreErrorsAsync(Task[] tasks, int timeoutMilli
var watch = Stopwatch.StartNew();
#if FEATURE_THREADPOOL
int busyWorkerCount;
LogLockedWithThreadPoolStats(log, "Awaiting task completion", out busyWorkerCount);
LogLockedWithThreadPoolStats(log, "Awaiting task completion", out int busyWorkerCount);
#endif
try
{
......@@ -706,7 +667,6 @@ private async Task<bool> WaitAllIgnoreErrorsAsync(Task[] tasks, int timeoutMilli
return false;
}
/// <summary>
/// Raised when a hash-slot has been relocated
/// </summary>
......@@ -726,10 +686,8 @@ internal void OnHashSlotMoved(int hashSlot, EndPoint old, EndPoint @new)
/// <summary>
/// Compute the hash-slot of a specified key
/// </summary>
public int HashSlot(RedisKey key)
{
return serverSelectionStrategy.HashSlot(key);
}
/// <param name="key">The key to get a hash slot ID for.</param>
public int HashSlot(RedisKey key) => serverSelectionStrategy.HashSlot(key);
internal ServerEndPoint AnyConnected(ServerType serverType, uint startOffset, RedisCommand command, CommandFlags flags)
{
......@@ -752,7 +710,8 @@ internal ServerEndPoint AnyConnected(ServerType serverType, uint startOffset, Re
fallback = server;
break;
}
} else
}
else
{
switch (flags)
{
......@@ -769,12 +728,14 @@ internal ServerEndPoint AnyConnected(ServerType serverType, uint startOffset, Re
return fallback;
}
volatile bool isDisposed;
private volatile bool isDisposed;
internal bool IsDisposed => isDisposed;
/// <summary>
/// Create a new ConnectionMultiplexer instance
/// </summary>
/// <param name="configuration">The string configuration to use for this multiplexer.</param>
/// <param name="log">The <see cref="TextWriter"/> to log to.</param>
public static async Task<ConnectionMultiplexer> ConnectAsync(string configuration, TextWriter log = null)
{
IDisposable killMe = null;
......@@ -789,7 +750,8 @@ public static async Task<ConnectionMultiplexer> ConnectAsync(string configuratio
}
killMe = null;
return muxer;
} finally
}
finally
{
if (killMe != null) try { killMe.Dispose(); } catch { }
}
......@@ -798,6 +760,8 @@ public static async Task<ConnectionMultiplexer> ConnectAsync(string configuratio
/// <summary>
/// Create a new ConnectionMultiplexer instance
/// </summary>
/// <param name="configuration">The configuration options to use for this multiplexer.</param>
/// <param name="log">The <see cref="TextWriter"/> to log to.</param>
public static async Task<ConnectionMultiplexer> ConnectAsync(ConfigurationOptions configuration, TextWriter log = null)
{
IDisposable killMe = null;
......@@ -812,23 +776,26 @@ public static async Task<ConnectionMultiplexer> ConnectAsync(ConfigurationOption
}
killMe = null;
return muxer;
} finally
}
finally
{
if (killMe != null) try { killMe.Dispose(); } catch { }
}
}
static ConnectionMultiplexer CreateMultiplexer(object configuration)
private static ConnectionMultiplexer CreateMultiplexer(object configuration)
{
if (configuration == null) throw new ArgumentNullException(nameof(configuration));
ConfigurationOptions config;
if (configuration is string)
{
config = ConfigurationOptions.Parse((string)configuration);
} else if (configuration is ConfigurationOptions)
}
else if (configuration is ConfigurationOptions)
{
config = ((ConfigurationOptions)configuration).Clone();
} else
}
else
{
throw new ArgumentException("configuration");
}
......@@ -836,9 +803,12 @@ static ConnectionMultiplexer CreateMultiplexer(object configuration)
config.SetDefaultPorts();
return new ConnectionMultiplexer(config);
}
/// <summary>
/// Create a new ConnectionMultiplexer instance
/// </summary>
/// <param name="configuration">The string configuration to use for this multiplexer.</param>
/// <param name="log">The <see cref="TextWriter"/> to log to.</param>
public static ConnectionMultiplexer Connect(string configuration, TextWriter log = null)
{
return ConnectImpl(() => CreateMultiplexer(configuration), log);
......@@ -847,6 +817,8 @@ public static ConnectionMultiplexer Connect(string configuration, TextWriter log
/// <summary>
/// Create a new ConnectionMultiplexer instance
/// </summary>
/// <param name="configuration">The configurtion options to use for this multiplexer.</param>
/// <param name="log">The <see cref="TextWriter"/> to log to.</param>
public static ConnectionMultiplexer Connect(ConfigurationOptions configuration, TextWriter log = null)
{
return ConnectImpl(() => CreateMultiplexer(configuration), log);
......@@ -915,7 +887,6 @@ internal ServerEndPoint GetServerEndPoint(EndPoint endpoint)
newSnapshot[newSnapshot.Length - 1] = server;
serverSnapshot = newSnapshot;
}
}
}
return server;
......@@ -924,16 +895,15 @@ internal ServerEndPoint GetServerEndPoint(EndPoint endpoint)
internal readonly CommandMap CommandMap;
private ConnectionMultiplexer(ConfigurationOptions configuration)
{
if (configuration == null) throw new ArgumentNullException(nameof(configuration));
IncludeDetailInExceptions = true;
IncludePerformanceCountersInExceptions = false;
this.configuration = configuration;
this.configuration = configuration ?? throw new ArgumentNullException(nameof(configuration));
var map = CommandMap = configuration.CommandMap;
if (!string.IsNullOrWhiteSpace(configuration.Password)) map.AssertAvailable(RedisCommand.AUTH);
if(!map.IsAvailable(RedisCommand.ECHO) && !map.IsAvailable(RedisCommand.PING) && !map.IsAvailable(RedisCommand.TIME))
if (!map.IsAvailable(RedisCommand.ECHO) && !map.IsAvailable(RedisCommand.PING) && !map.IsAvailable(RedisCommand.TIME))
{ // I mean really, give me a CHANCE! I need *something* to check the server is available to me...
// see also: SendTracer (matching logic)
map.AssertAvailable(RedisCommand.EXISTS);
......@@ -958,10 +928,7 @@ private ConnectionMultiplexer(ConfigurationOptions configuration)
internal const int MillisecondsPerHeartbeat = 1000;
private static readonly TimerCallback heartbeat = state =>
{
((ConnectionMultiplexer)state).OnHeartbeat();
};
private static readonly TimerCallback heartbeat = state => ((ConnectionMultiplexer)state).OnHeartbeat();
private int _activeHeartbeatErrors;
private void OnHeartbeat()
......@@ -995,8 +962,10 @@ private void OnHeartbeat()
private int lastHeartbeatTicks;
private static int lastGlobalHeartbeatTicks = Environment.TickCount;
internal long LastHeartbeatSecondsAgo {
get {
internal long LastHeartbeatSecondsAgo
{
get
{
if (pulse == null) return -1;
return unchecked(Environment.TickCount - VolatileWrapper.Read(ref lastHeartbeatTicks)) / 1000;
}
......@@ -1011,14 +980,18 @@ private void OnHeartbeat()
/// <summary>
/// Obtain a pub/sub subscriber connection to the specified server
/// </summary>
/// <param name="asyncState">The async state object to pass to the created <see cref="RedisSubscriber"/>.</param>
public ISubscriber GetSubscriber(object asyncState = null)
{
if (RawConfig.Proxy == Proxy.Twemproxy) throw new NotSupportedException("The pub/sub API is not available via twemproxy");
return new RedisSubscriber(this, asyncState);
}
/// <summary>
/// Obtain an interactive connection to a database inside redis
/// </summary>
/// <param name="db">The ID to get a database for.</param>
/// <param name="asyncState">The async state to pass into the resulting <see cref="RedisDatabase"/>.</param>
public IDatabase GetDatabase(int db = -1, object asyncState = null)
{
if (db == -1)
......@@ -1033,7 +1006,7 @@ public IDatabase GetDatabase(int db = -1, object asyncState = null)
}
// DB zero is stored separately, since 0-only is a massively common use-case
const int MaxCachedDatabaseInstance = 16; // 17 items - [0,16]
private const int MaxCachedDatabaseInstance = 16; // 17 items - [0,16]
// side note: "databases 16" is the default in redis.conf; happy to store one extra to get nice alignment etc
private IDatabase dbCacheZero;
private IDatabase[] dbCacheLow;
......@@ -1042,7 +1015,7 @@ public IDatabase GetDatabase(int db = -1, object asyncState = null)
// note we don't need to worry about *always* returning the same instance
// - if two threads ask for db 3 at the same time, it is OK for them to get
// different instances, one of which (arbitrarily) ends up cached for later use
if(db == 0)
if (db == 0)
{
return dbCacheZero ?? (dbCacheZero = new RedisDatabase(this, 0, null));
}
......@@ -1053,28 +1026,30 @@ public IDatabase GetDatabase(int db = -1, object asyncState = null)
/// <summary>
/// Obtain a configuration API for an individual server
/// </summary>
public IServer GetServer(string host, int port, object asyncState = null)
{
return GetServer(Format.ParseEndPoint(host, port), asyncState);
}
/// <param name="host">The host to get a server for.</param>
/// <param name="port">The port for <paramref name="host"/> to get a server for.</param>
/// <param name="asyncState">The async state to pass into the resulting <see cref="RedisServer"/>.</param>
public IServer GetServer(string host, int port, object asyncState = null) => GetServer(Format.ParseEndPoint(host, port), asyncState);
/// <summary>
/// Obtain a configuration API for an individual server
/// </summary>
public IServer GetServer(string hostAndPort, object asyncState = null)
{
return GetServer(Format.TryParseEndPoint(hostAndPort), asyncState);
}
/// <param name="hostAndPort">The "host:port" string to get a server for.</param>
/// <param name="asyncState">The async state to pass into the resulting <see cref="RedisServer"/>.</param>
public IServer GetServer(string hostAndPort, object asyncState = null) => GetServer(Format.TryParseEndPoint(hostAndPort), asyncState);
/// <summary>
/// Obtain a configuration API for an individual server
/// </summary>
public IServer GetServer(IPAddress host, int port)
{
return GetServer(new IPEndPoint(host, port));
}
/// <param name="host">The host to get a server for.</param>
/// <param name="port">The port for <paramref name="host"/> to get a server for.</param>
public IServer GetServer(IPAddress host, int port) => GetServer(new IPEndPoint(host, port));
/// <summary>
/// Obtain a configuration API for an individual server
/// </summary>
/// <param name="endpoint">The endpoint to get a server for.</param>
/// <param name="asyncState">The async state to pass into the resulting <see cref="RedisServer"/>.</param>
public IServer GetServer(EndPoint endpoint, object asyncState = null)
{
if (endpoint == null) throw new ArgumentNullException(nameof(endpoint));
......@@ -1084,14 +1059,14 @@ public IServer GetServer(EndPoint endpoint, object asyncState = null)
return new RedisServer(this, server, asyncState);
}
[Conditional("VERBOSE")]
internal void Trace(string message, [System.Runtime.CompilerServices.CallerMemberName] string category = null)
internal void Trace(string message, [CallerMemberName] string category = null)
{
OnTrace(message, category);
}
[Conditional("VERBOSE")]
internal void Trace(bool condition, string message, [System.Runtime.CompilerServices.CallerMemberName] string category = null)
internal void Trace(bool condition, string message, [CallerMemberName] string category = null)
{
if (condition) OnTrace(message, category);
}
......@@ -1100,14 +1075,15 @@ internal void Trace(bool condition, string message, [System.Runtime.CompilerServ
static partial void OnTraceWithoutContext(string message, string category);
[Conditional("VERBOSE")]
internal static void TraceWithoutContext(string message, [System.Runtime.CompilerServices.CallerMemberName] string category = null)
internal static void TraceWithoutContext(string message, [CallerMemberName] string category = null)
{
OnTraceWithoutContext(message, category);
}
[Conditional("VERBOSE")]
internal static void TraceWithoutContext(bool condition, string message, [System.Runtime.CompilerServices.CallerMemberName] string category = null)
internal static void TraceWithoutContext(bool condition, string message, [CallerMemberName] string category = null)
{
if(condition) OnTraceWithoutContext(message, category);
if (condition) OnTraceWithoutContext(message, category);
}
private readonly CompletionManager unprocessableCompletionManager;
......@@ -1115,7 +1091,8 @@ internal static void TraceWithoutContext(bool condition, string message, [System
/// <summary>
/// The number of operations that have been performed on all connections
/// </summary>
public long OperationCount {
public long OperationCount
{
get
{
long total = 0;
......@@ -1125,7 +1102,7 @@ internal static void TraceWithoutContext(bool condition, string message, [System
}
}
string activeConfigCause;
private string activeConfigCause;
internal bool ReconfigureIfNeeded(EndPoint blame, bool fromBroadcast, string cause, bool publishReconfigure = false, CommandFlags flags = CommandFlags.None)
{
......@@ -1140,7 +1117,8 @@ internal bool ReconfigureIfNeeded(EndPoint blame, bool fromBroadcast, string cau
Trace("Configuration change detected; checking nodes", "Configuration");
ReconfigureAsync(false, reconfigureAll, null, blame, cause, publishReconfigure, flags).ObserveErrors();
return true;
} else
}
else
{
Trace("Configuration change skipped; already in progress via " + activeCause, "Configuration");
return false;
......@@ -1150,13 +1128,16 @@ internal bool ReconfigureIfNeeded(EndPoint blame, bool fromBroadcast, string cau
/// <summary>
/// Reconfigure the current connections based on the existing configuration
/// </summary>
/// <param name="log">The <see cref="TextWriter"/> to log to.</param>
public Task<bool> ConfigureAsync(TextWriter log = null)
{
return ReconfigureAsync(false, true, log, null, "configure").ObserveErrors();
}
/// <summary>
/// Reconfigure the current connections based on the existing configuration
/// </summary>
/// <param name="log">The <see cref="TextWriter"/> to log to.</param>
public bool Configure(TextWriter log = null)
{
// note we expect ReconfigureAsync to internally allow [n] duration,
......@@ -1190,20 +1171,23 @@ internal int SyncConnectTimeout(bool forConnect)
if (timeout >= int.MaxValue - 500) return int.MaxValue;
return timeout + Math.Min(500, timeout);
}
/// <summary>
/// Provides a text overview of the status of all connections
/// </summary>
public string GetStatus()
{
using(var sw = new StringWriter())
using (var sw = new StringWriter())
{
GetStatus(sw);
return sw.ToString();
}
}
/// <summary>
/// Provides a text overview of the status of all connections
/// </summary>
/// <param name="log">The <see cref="TextWriter"/> to log to.</param>
public void GetStatus(TextWriter log)
{
if (log == null) return;
......@@ -1218,6 +1202,7 @@ public void GetStatus(TextWriter log)
LogLocked(log, "Sync timeouts: {0}; fire and forget: {1}; last heartbeat: {2}s ago",
Interlocked.Read(ref syncTimeouts), Interlocked.Read(ref fireAndForgets), LastHeartbeatSecondsAgo);
}
internal async Task<bool> ReconfigureAsync(bool first, bool reconfigureAll, TextWriter log, EndPoint blame, string cause, bool publishReconfigure = false, CommandFlags publishReconfigureFlags = CommandFlags.None)
{
if (isDisposed) throw new ObjectDisposedException(ToString());
......@@ -1244,7 +1229,6 @@ internal async Task<bool> ReconfigureAsync(bool first, bool reconfigureAll, Text
LogLocked(log, configuration.ToString(includePassword: false));
LogLocked(log, "");
if (first)
{
if (configuration.ResolveDns && configuration.HasDnsEndPoints())
......@@ -1488,7 +1472,6 @@ internal async Task<bool> ReconfigureAsync(bool first, bool reconfigureAll, Text
long coveredSlots = serverSelectionStrategy.CountCoveredSlots();
LogLocked(log, "Cluster: {0} of {1} slots covered",
coveredSlots, serverSelectionStrategy.TotalSlots);
}
if (!first)
{
......@@ -1523,7 +1506,7 @@ internal async Task<bool> ReconfigureAsync(bool first, bool reconfigureAll, Text
//WTF("?: " + attempts);
} while (first && !healthy && attemptsLeft > 0);
if(first && configuration.AbortOnConnectFail && !healthy)
if (first && configuration.AbortOnConnectFail && !healthy)
{
return false;
}
......@@ -1532,7 +1515,7 @@ internal async Task<bool> ReconfigureAsync(bool first, bool reconfigureAll, Text
LogLocked(log, "Starting heartbeat...");
pulse = new Timer(heartbeat, this, MillisecondsPerHeartbeat, MillisecondsPerHeartbeat);
}
if(publishReconfigure)
if (publishReconfigure)
{
try
{
......@@ -1543,8 +1526,8 @@ internal async Task<bool> ReconfigureAsync(bool first, bool reconfigureAll, Text
{ }
}
return true;
} catch (Exception ex)
}
catch (Exception ex)
{
Trace(ex.Message);
throw;
......@@ -1574,17 +1557,16 @@ private async Task<EndPointCollection> GetEndpointsFromClusterNodes(ServerEndPoi
}
}
private void ResetAllNonConnected()
{
var snapshot = serverSnapshot;
foreach(var server in snapshot)
foreach (var server in snapshot)
{
server.ResetNonConnected();
}
}
partial void OnTraceLog(TextWriter log, [System.Runtime.CompilerServices.CallerMemberName] string caller = null);
partial void OnTraceLog(TextWriter log, [CallerMemberName] string caller = null);
private async Task<ServerEndPoint> NominatePreferredMaster(TextWriter log, ServerEndPoint[] servers, bool useTieBreakers, Task<string>[] tieBreakers, List<ServerEndPoint> masters)
{
Dictionary<string, int> uniques = null;
......@@ -1607,8 +1589,7 @@ private async Task<ServerEndPoint> NominatePreferredMaster(TextWriter log, Serve
else
{
LogLocked(log, "{0} nominates: {1}", Format.ToString(ep), s);
int count;
if (!uniques.TryGetValue(s, out count)) count = 0;
if (!uniques.TryGetValue(s, out int count)) count = 0;
uniques[s] = count + 1;
}
break;
......@@ -1627,7 +1608,6 @@ private async Task<ServerEndPoint> NominatePreferredMaster(TextWriter log, Serve
}
}
switch (masters.Count)
{
case 0:
......@@ -1685,16 +1665,13 @@ private async Task<ServerEndPoint> NominatePreferredMaster(TextWriter log, Serve
return highest;
}
break;
}
}
break;
}
LogLocked(log, "Choosing master arbitrarily: {0}", Format.ToString(masters[0].EndPoint));
return masters[0];
}
private ServerEndPoint SelectServerByElection(ServerEndPoint[] servers, string endpoint, TextWriter log)
......@@ -1718,7 +1695,7 @@ private ServerEndPoint SelectServerByElection(ServerEndPoint[] servers, string e
return null;
}
static string DeDotifyHost(string input)
private static string DeDotifyHost(string input)
{
if (string.IsNullOrWhiteSpace(input)) return input; // GIGO
......@@ -1772,6 +1749,7 @@ internal ServerEndPoint SelectServer(int db, RedisCommand command, CommandFlags
{
return serverSelectionStrategy.Select(db, command, key, flags);
}
private bool TryPushMessageToBridge<T>(Message message, ResultProcessor<T> processor, ResultBox<T> resultBox, ref ServerEndPoint server)
{
message.SetSource(processor, resultBox);
......@@ -1782,13 +1760,12 @@ private bool TryPushMessageToBridge<T>(Message message, ResultProcessor<T> proce
}
else // a server was specified; do we trust their choice, though?
{
if (message.IsMasterOnly() && server.IsSlave)
{
throw ExceptionFactory.MasterOnly(IncludeDetailInExceptions, message.Command, message, server);
}
switch(server.ServerType)
switch (server.ServerType)
{
case ServerType.Cluster:
case ServerType.Twemproxy: // strictly speaking twemproxy uses a different hashing algo, but the hash-tag behavior is
......@@ -1809,20 +1786,18 @@ private bool TryPushMessageToBridge<T>(Message message, ResultProcessor<T> proce
if (server != null)
{
var profCtx = profiler?.GetContext();
if (profCtx != null)
{
ConcurrentProfileStorageCollection inFlightForCtx;
if (profiledCommands.TryGetValue(profCtx, out inFlightForCtx))
if (profCtx != null && profiledCommands.TryGetValue(profCtx, out ConcurrentProfileStorageCollection inFlightForCtx))
{
message.SetProfileStorage(ProfileStorage.NewWithContext(inFlightForCtx, server));
}
}
if (message.Db >= 0)
{
int availableDatabases = server.Databases;
if (availableDatabases > 0 && message.Db >= availableDatabases) throw ExceptionFactory.DatabaseOutfRange(
IncludeDetailInExceptions, message.Db, message, server);
if (availableDatabases > 0 && message.Db >= availableDatabases)
{
throw ExceptionFactory.DatabaseOutfRange(IncludeDetailInExceptions, message.Db, message, server);
}
}
Trace("Queueing on server: " + message);
......@@ -1832,7 +1807,6 @@ private bool TryPushMessageToBridge<T>(Message message, ResultProcessor<T> proce
return false;
}
/// <summary>
/// See Object.ToString()
/// </summary>
......@@ -1846,7 +1820,6 @@ public override string ToString()
internal readonly byte[] ConfigurationChangedChannel; // this gets accessed for every received event; let's make sure we can process it "raw"
internal readonly byte[] UniqueId = Guid.NewGuid().ToByteArray(); // unique identifier used when tracing
/// <summary>
/// Gets or sets whether asynchronous operations should be invoked in a way that guarantees their original delivery order
/// </summary>
......@@ -1870,10 +1843,10 @@ public bool IsConnected
internal ServerSelectionStrategy ServerSelectionStrategy => serverSelectionStrategy;
/// <summary>
/// Close all connections and release all resources associated with this object
/// </summary>
/// <param name="allowCommandsToComplete">Whether to allow all in-queue commands to complete first.</param>
public void Close(bool allowCommandsToComplete = true)
{
isDisposed = true;
......@@ -1890,6 +1863,7 @@ public void Close(bool allowCommandsToComplete = true)
DisposeAndClearServers();
OnCloseReaderWriter();
}
partial void OnCloseReaderWriter();
private void DisposeAndClearServers()
......@@ -1925,6 +1899,7 @@ private Task[] QuitAllServers()
/// <summary>
/// Close all connections and release all resources associated with this object
/// </summary>
/// <param name="allowCommandsToComplete">Whether to allow all in-queue commands to complete first.</param>
public async Task CloseAsync(bool allowCommandsToComplete = true)
{
isDisposed = true;
......@@ -1950,7 +1925,6 @@ public void Dispose()
Close(!isDisposed);
}
internal Task<T> ExecuteAsyncImpl<T>(Message message, ResultProcessor<T> processor, object state, ServerEndPoint server)
{
if (isDisposed) throw new ObjectDisposedException(ToString());
......@@ -1982,13 +1956,15 @@ internal static void ThrowFailed<T>(TaskCompletionSource<T> source, Exception un
try
{
throw unthrownException;
} catch (Exception ex)
}
catch (Exception ex)
{
source.TrySetException(ex);
GC.KeepAlive(source.Task.Exception);
GC.SuppressFinalize(source.Task);
}
}
internal T ExecuteSyncImpl<T>(Message message, ResultProcessor<T> processor, ServerEndPoint server)
{
if (isDisposed) throw new ObjectDisposedException(ToString());
......@@ -2017,11 +1993,11 @@ internal T ExecuteSyncImpl<T>(Message message, ResultProcessor<T> processor, Ser
if (Monitor.Wait(source, timeoutMilliseconds))
{
Trace("Timeley response to " + message.ToString());
Trace("Timeley response to " + message);
}
else
{
Trace("Timeout performing " + message.ToString());
Trace("Timeout performing " + message);
Interlocked.Increment(ref syncTimeouts);
string errMessage;
List<Tuple<string, string>> data = null;
......@@ -2031,21 +2007,19 @@ internal T ExecuteSyncImpl<T>(Message message, ResultProcessor<T> processor, Ser
}
else
{
int inst, qu, qs, qc, wr, wq, @in, ar;
#if FEATURE_SOCKET_MODE_POLL
var mgrState = socketManager.State;
var lastError = socketManager.LastErrorTimeRelative();
#endif
var sb = new StringBuilder("Timeout performing ").Append(message.CommandAndKey);
data = new List<Tuple<string, string>> {Tuple.Create("Message", message.CommandAndKey)};
Action<string, string, string> add = (lk, sk, v) =>
data = new List<Tuple<string, string>> { Tuple.Create("Message", message.CommandAndKey) };
void add(string lk, string sk, string v)
{
data.Add(Tuple.Create(lk, v));
sb.Append(", " + sk + ": " + v);
};
sb.Append(", ").Append(sk).Append(": ").Append(v);
}
int queue = server.GetOutstandingCount(message.Command, out inst, out qu, out qs, out qc, out wr, out wq, out @in, out ar);
int queue = server.GetOutstandingCount(message.Command, out int inst, out int qu, out int qs, out int qc, out int wr, out int wq, out int @in, out int ar);
add("Instantaneous", "inst", inst.ToString());
#if FEATURE_SOCKET_MODE_POLL
add("Manager-State", "mgr", mgrState.ToString());
......@@ -2069,8 +2043,7 @@ internal T ExecuteSyncImpl<T>(Message message, ResultProcessor<T> processor, Ser
add("Key-HashSlot", "keyHashSlot", message.GetHashSlot(this.ServerSelectionStrategy).ToString());
}
#if FEATURE_THREADPOOL
string iocp, worker;
int busyWorkerCount = GetThreadPoolStats(out iocp, out worker);
int busyWorkerCount = GetThreadPoolStats(out string iocp, out string worker);
add("ThreadPool-IO-Completion", "IOCP", iocp);
add("ThreadPool-Workers", "WORKER", worker);
data.Add(Tuple.Create("Busy-Workers", busyWorkerCount.ToString()));
......@@ -2085,7 +2058,7 @@ internal T ExecuteSyncImpl<T>(Message message, ResultProcessor<T> processor, Ser
sb.Append(timeoutHelpLink);
sb.Append(")");
errMessage = sb.ToString();
if (stormLogThreshold >= 0 && queue >= stormLogThreshold && Interlocked.CompareExchange(ref haveStormLog, 1, 0) == 0)
if (StormLogThreshold >= 0 && queue >= StormLogThreshold && Interlocked.CompareExchange(ref haveStormLog, 1, 0) == 0)
{
var log = server.GetStormLog(message.Command);
if (string.IsNullOrWhiteSpace(log)) Interlocked.Exchange(ref haveStormLog, 0);
......@@ -2107,9 +2080,7 @@ internal T ExecuteSyncImpl<T>(Message message, ResultProcessor<T> processor, Ser
}
}
// snapshot these so that we can recycle the box
Exception ex;
T val;
ResultBox<T>.UnwrapAndRecycle(source, true, out val, out ex); // now that we aren't locking it...
ResultBox<T>.UnwrapAndRecycle(source, true, out T val, out Exception ex); // now that we aren't locking it...
if (ex != null) throw ex;
Trace(message + " received " + val);
return val;
......@@ -2119,36 +2090,24 @@ internal T ExecuteSyncImpl<T>(Message message, ResultProcessor<T> processor, Ser
#if FEATURE_PERFCOUNTER
internal static string GetThreadPoolAndCPUSummary(bool includePerformanceCounters)
{
string iocp, worker;
GetThreadPoolStats(out iocp, out worker);
GetThreadPoolStats(out string iocp, out string worker);
var cpu = includePerformanceCounters ? GetSystemCpuPercent() : "n/a";
return $"IOCP: {iocp}, WORKER: {worker}, Local-CPU: {cpu}";
}
private static string GetSystemCpuPercent()
{
float systemCPU;
if (PerfCounterHelper.TryGetSystemCPU(out systemCPU))
{
return Math.Round(systemCPU, 2) + "%";
}
return "unavailable";
return (PerfCounterHelper.TryGetSystemCPU(out float systemCPU))
? Math.Round(systemCPU, 2) + "%"
: "unavailable";
}
#endif
#if FEATURE_THREADPOOL
private static int GetThreadPoolStats(out string iocp, out string worker)
{
//BusyThreads = TP.GetMaxThreads() –TP.GetAVailable();
//If BusyThreads >= TP.GetMinThreads(), then threadpool growth throttling is possible.
int maxIoThreads, maxWorkerThreads;
ThreadPool.GetMaxThreads(out maxWorkerThreads, out maxIoThreads);
int freeIoThreads, freeWorkerThreads;
ThreadPool.GetAvailableThreads(out freeWorkerThreads, out freeIoThreads);
int minIoThreads, minWorkerThreads;
ThreadPool.GetMinThreads(out minWorkerThreads, out minIoThreads);
ThreadPool.GetMaxThreads(out int maxWorkerThreads, out int maxIoThreads);
ThreadPool.GetAvailableThreads(out int freeWorkerThreads, out int freeIoThreads);
ThreadPool.GetMinThreads(out int minWorkerThreads, out int minIoThreads);
int busyIoThreads = maxIoThreads - freeIoThreads;
int busyWorkerThreads = maxWorkerThreads - freeWorkerThreads;
......@@ -2169,13 +2128,14 @@ private static int GetThreadPoolStats(out string iocp, out string worker)
/// </summary>
public bool IncludePerformanceCountersInExceptions { get; set; }
int haveStormLog = 0, stormLogThreshold = 15;
string stormLogSnapshot;
private int haveStormLog = 0;
private string stormLogSnapshot;
/// <summary>
/// Limit at which to start recording unusual busy patterns (only one log will be retained at a time;
/// set to a negative value to disable this feature)
/// </summary>
public int StormLogThreshold { get { return stormLogThreshold; } set { stormLogThreshold = value; } }
public int StormLogThreshold { get; set; } = 15;
/// <summary>
/// Obtains the log of unusual busy patterns
/// </summary>
......@@ -2192,11 +2152,13 @@ public void ResetStormLog()
Interlocked.Exchange(ref stormLogSnapshot, null);
Interlocked.Exchange(ref haveStormLog, 0);
}
private long syncTimeouts, fireAndForgets;
/// <summary>
/// Request all compatible clients to reconfigure or reconnect
/// </summary>
/// <param name="flags">The command flags to use.</param>2
/// <returns>The number of instances known to have received the message (however, the actual number can be higher; returns -1 if the operation is pending)</returns>
public long PublishReconfigure(CommandFlags flags = CommandFlags.None)
{
......@@ -2211,6 +2173,7 @@ public long PublishReconfigure(CommandFlags flags = CommandFlags.None)
return PublishReconfigureImpl(flags);
}
}
private long PublishReconfigureImpl(CommandFlags flags)
{
byte[] channel = ConfigurationChangedChannel;
......@@ -2221,6 +2184,7 @@ private long PublishReconfigureImpl(CommandFlags flags)
/// <summary>
/// Request all compatible clients to reconfigure or reconnect
/// </summary>
/// <param name="flags">The command flags to use.</param>
/// <returns>The number of instances known to have received the message (however, the actual number can be higher)</returns>
public Task<long> PublishReconfigureAsync(CommandFlags flags = CommandFlags.None)
{
......
......@@ -54,10 +54,9 @@ public static IDatabase WithKeyPrefix(this IDatabase database, RedisKey keyPrefi
return database; // fine - you can keep using the original, then
}
if(database is DatabaseWrapper)
if (database is DatabaseWrapper wrapper)
{
// combine the key in advance to minimize indirection
var wrapper = (DatabaseWrapper)database;
keyPrefix = wrapper.ToInner(keyPrefix);
database = wrapper.Inner;
}
......
using System;
using System.Runtime.CompilerServices;
using System.Threading.Tasks;
namespace StackExchange.Redis
{
internal static class TaskExtensions
{
private static readonly Action<Task> observeErrors = ObverveErrors;
private static void ObverveErrors(this Task task)
{
if (task != null) GC.KeepAlive(task.Exception);
}
public static Task ObserveErrors(this Task task)
{
task?.ContinueWith(observeErrors, TaskContinuationOptions.OnlyOnFaulted);
return task;
}
public static Task<T> ObserveErrors<T>(this Task<T> task)
{
task?.ContinueWith(observeErrors, TaskContinuationOptions.OnlyOnFaulted);
return task;
}
public static ConfiguredTaskAwaitable ForAwait(this Task task) => task.ConfigureAwait(false);
public static ConfiguredTaskAwaitable<T> ForAwait<T>(this Task<T> task) => task.ConfigureAwait(false);
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment