Commit 95271bbb authored by Nick Craver's avatar Nick Craver

Multiplexer: normalize line endings

parent a86a5e5f
...@@ -63,12 +63,12 @@ private static string GetDefaultClientName() ...@@ -63,12 +63,12 @@ private static string GetDefaultClientName()
?? "StackExchange.Redis"); ?? "StackExchange.Redis");
} }
/// <summary> /// <summary>
/// Tries to get the Roleinstance Id if Microsoft.WindowsAzure.ServiceRuntime is loaded. /// Tries to get the Roleinstance Id if Microsoft.WindowsAzure.ServiceRuntime is loaded.
/// In case of any failure, swallows the exception and returns null /// In case of any failure, swallows the exception and returns null
/// </summary> /// </summary>
internal static string TryGetAzureRoleInstanceIdNoThrow() internal static string TryGetAzureRoleInstanceIdNoThrow()
{ {
#if NETSTANDARD1_5 #if NETSTANDARD1_5
return null; return null;
#else #else
...@@ -132,7 +132,7 @@ internal void OnConnectionFailed(EndPoint endpoint, ConnectionType connectionTyp ...@@ -132,7 +132,7 @@ internal void OnConnectionFailed(EndPoint endpoint, ConnectionType connectionTyp
ReconfigureIfNeeded(endpoint, false, "connection failed"); ReconfigureIfNeeded(endpoint, false, "connection failed");
} }
} }
internal void OnInternalError(Exception exception, EndPoint endpoint = null, ConnectionType connectionType = ConnectionType.None, [CallerMemberName] string origin = null) internal void OnInternalError(Exception exception, EndPoint endpoint = null, ConnectionType connectionType = ConnectionType.None, [CallerMemberName] string origin = null)
{ {
try try
...@@ -175,8 +175,8 @@ private void OnEndpointChanged(EndPoint endpoint, EventHandler<EndPointEventArgs ...@@ -175,8 +175,8 @@ private void OnEndpointChanged(EndPoint endpoint, EventHandler<EndPointEventArgs
); );
} }
} }
internal void OnConfigurationChanged(EndPoint endpoint) => OnEndpointChanged(endpoint, ConfigurationChanged); internal void OnConfigurationChanged(EndPoint endpoint) => OnEndpointChanged(endpoint, ConfigurationChanged);
internal void OnConfigurationChangedBroadcast(EndPoint endpoint) => OnEndpointChanged(endpoint, ConfigurationChangedBroadcast); internal void OnConfigurationChangedBroadcast(EndPoint endpoint) => OnEndpointChanged(endpoint, ConfigurationChangedBroadcast);
/// <summary> /// <summary>
...@@ -223,11 +223,11 @@ private static void Write<T>(ZipArchive zip, string name, Task task, Action<T, S ...@@ -223,11 +223,11 @@ private static void Write<T>(ZipArchive zip, string name, Task task, Action<T, S
break; break;
} }
} }
} }
/// <summary> /// <summary>
/// Write the configuration of all servers to an output stream /// Write the configuration of all servers to an output stream
/// </summary> /// </summary>
/// <param name="destination">The destination stream to write the export to.</param> /// <param name="destination">The destination stream to write the export to.</param>
/// <param name="options">The options to use for this export.</param> /// <param name="options">The options to use for this export.</param>
public void ExportConfiguration(Stream destination, ExportOptions options = ExportOptions.All) public void ExportConfiguration(Stream destination, ExportOptions options = ExportOptions.All)
{ {
...@@ -323,7 +323,7 @@ internal void MakeMaster(ServerEndPoint server, ReplicationChangeOptions options ...@@ -323,7 +323,7 @@ internal void MakeMaster(ServerEndPoint server, ReplicationChangeOptions options
try try
{ {
srv.Ping(flags); // if it isn't happy, we're not happy srv.Ping(flags); // if it isn't happy, we're not happy
} }
catch (Exception ex) catch (Exception ex)
{ {
LogLocked(log, "Operation failed on {0}, aborting: {1}", Format.ToString(srv.EndPoint), ex.Message); LogLocked(log, "Operation failed on {0}, aborting: {1}", Format.ToString(srv.EndPoint), ex.Message);
...@@ -354,7 +354,7 @@ internal void MakeMaster(ServerEndPoint server, ReplicationChangeOptions options ...@@ -354,7 +354,7 @@ internal void MakeMaster(ServerEndPoint server, ReplicationChangeOptions options
try try
{ {
srv.SlaveOf(null, flags); srv.SlaveOf(null, flags);
} }
catch (Exception ex) catch (Exception ex)
{ {
LogLocked(log, "Operation failed on {0}, aborting: {1}", Format.ToString(srv.EndPoint), ex.Message); LogLocked(log, "Operation failed on {0}, aborting: {1}", Format.ToString(srv.EndPoint), ex.Message);
...@@ -406,30 +406,30 @@ internal void MakeMaster(ServerEndPoint server, ReplicationChangeOptions options ...@@ -406,30 +406,30 @@ internal void MakeMaster(ServerEndPoint server, ReplicationChangeOptions options
/// <summary> /// <summary>
/// Used internally to synchronize loggine without depending on locking the log instance /// Used internally to synchronize loggine without depending on locking the log instance
/// </summary> /// </summary>
private object LogSyncLock => UniqueId; private object LogSyncLock => UniqueId;
// we know this has strong identity: readonly and unique to us // we know this has strong identity: readonly and unique to us
internal void LogLocked(TextWriter log, string line) internal void LogLocked(TextWriter log, string line)
{ {
if (log != null) lock (LogSyncLock) { log.WriteLine(line); } if (log != null) lock (LogSyncLock) { log.WriteLine(line); }
} }
internal void LogLocked(TextWriter log, string line, object arg) internal void LogLocked(TextWriter log, string line, object arg)
{ {
if (log != null) lock (LogSyncLock) { log.WriteLine(line, arg); } if (log != null) lock (LogSyncLock) { log.WriteLine(line, arg); }
} }
internal void LogLocked(TextWriter log, string line, object arg0, object arg1) internal void LogLocked(TextWriter log, string line, object arg0, object arg1)
{ {
if (log != null) lock (LogSyncLock) { log.WriteLine(line, arg0, arg1); } if (log != null) lock (LogSyncLock) { log.WriteLine(line, arg0, arg1); }
} }
internal void LogLocked(TextWriter log, string line, object arg0, object arg1, object arg2) internal void LogLocked(TextWriter log, string line, object arg0, object arg1, object arg2)
{ {
if (log != null) lock (LogSyncLock) { log.WriteLine(line, arg0, arg1, arg2); } if (log != null) lock (LogSyncLock) { log.WriteLine(line, arg0, arg1, arg2); }
} }
internal void LogLocked(TextWriter log, string line, params object[] args) internal void LogLocked(TextWriter log, string line, params object[] args)
{ {
if (log != null) lock (LogSyncLock) { log.WriteLine(line, args); } if (log != null) lock (LogSyncLock) { log.WriteLine(line, args); }
...@@ -481,11 +481,11 @@ private static void WriteNormalizingLineEndings(string source, StreamWriter writ ...@@ -481,11 +481,11 @@ private static void WriteNormalizingLineEndings(string source, StreamWriter writ
/// <summary> /// <summary>
/// Gets the timeout associated with the connections /// Gets the timeout associated with the connections
/// </summary> /// </summary>
public int TimeoutMilliseconds => timeoutMilliseconds; public int TimeoutMilliseconds => timeoutMilliseconds;
/// <summary> /// <summary>
/// Gets all endpoints defined on the server /// Gets all endpoints defined on the server
/// </summary> /// </summary>
/// <param name="configuredOnly">Whether to get only the endpoints specified explicitly in the config.</param> /// <param name="configuredOnly">Whether to get only the endpoints specified explicitly in the config.</param>
public EndPoint[] GetEndPoints(bool configuredOnly = false) public EndPoint[] GetEndPoints(bool configuredOnly = false)
{ {
...@@ -501,33 +501,33 @@ public EndPoint[] GetEndPoints(bool configuredOnly = false) ...@@ -501,33 +501,33 @@ public EndPoint[] GetEndPoints(bool configuredOnly = false)
internal bool TryResend(int hashSlot, Message message, EndPoint endpoint, bool isMoved) internal bool TryResend(int hashSlot, Message message, EndPoint endpoint, bool isMoved)
{ {
return serverSelectionStrategy.TryResend(hashSlot, message, endpoint, isMoved); return serverSelectionStrategy.TryResend(hashSlot, message, endpoint, isMoved);
} }
/// <summary> /// <summary>
/// Wait for a given asynchronous operation to complete (or timeout) /// Wait for a given asynchronous operation to complete (or timeout)
/// </summary> /// </summary>
/// <param name="task">The task to wait on.</param> /// <param name="task">The task to wait on.</param>
public void Wait(Task task) public void Wait(Task task)
{ {
if (task == null) throw new ArgumentNullException(nameof(task)); if (task == null) throw new ArgumentNullException(nameof(task));
if (!task.Wait(timeoutMilliseconds)) throw new TimeoutException(); if (!task.Wait(timeoutMilliseconds)) throw new TimeoutException();
} }
/// <summary> /// <summary>
/// Wait for a given asynchronous operation to complete (or timeout) /// Wait for a given asynchronous operation to complete (or timeout)
/// </summary> /// </summary>
/// <typeparam name="T">The type contains in the task to wait on.</typeparam> /// <typeparam name="T">The type contains in the task to wait on.</typeparam>
/// <param name="task">The task to wait on.</param> /// <param name="task">The task to wait on.</param>
public T Wait<T>(Task<T> task) public T Wait<T>(Task<T> task)
{ {
if (task == null) throw new ArgumentNullException(nameof(task)); if (task == null) throw new ArgumentNullException(nameof(task));
if (!task.Wait(timeoutMilliseconds)) throw new TimeoutException(); if (!task.Wait(timeoutMilliseconds)) throw new TimeoutException();
return task.Result; return task.Result;
} }
/// <summary> /// <summary>
/// Wait for the given asynchronous operations to complete (or timeout) /// Wait for the given asynchronous operations to complete (or timeout)
/// </summary> /// </summary>
/// <param name="tasks">The tasks to wait on.</param> /// <param name="tasks">The tasks to wait on.</param>
public void WaitAll(params Task[] tasks) public void WaitAll(params Task[] tasks)
{ {
...@@ -537,7 +537,7 @@ public void WaitAll(params Task[] tasks) ...@@ -537,7 +537,7 @@ public void WaitAll(params Task[] tasks)
} }
private bool WaitAllIgnoreErrors(Task[] tasks) => WaitAllIgnoreErrors(tasks, timeoutMilliseconds); private bool WaitAllIgnoreErrors(Task[] tasks) => WaitAllIgnoreErrors(tasks, timeoutMilliseconds);
private static bool WaitAllIgnoreErrors(Task[] tasks, int timeout) private static bool WaitAllIgnoreErrors(Task[] tasks, int timeout)
{ {
if (tasks == null) throw new ArgumentNullException(nameof(tasks)); if (tasks == null) throw new ArgumentNullException(nameof(tasks));
...@@ -569,7 +569,7 @@ private static bool WaitAllIgnoreErrors(Task[] tasks, int timeout) ...@@ -569,7 +569,7 @@ private static bool WaitAllIgnoreErrors(Task[] tasks, int timeout)
} }
return false; return false;
} }
#if FEATURE_THREADPOOL #if FEATURE_THREADPOOL
private void LogLockedWithThreadPoolStats(TextWriter log, string message, out int busyWorkerCount) private void LogLockedWithThreadPoolStats(TextWriter log, string message, out int busyWorkerCount)
{ {
...@@ -595,7 +595,7 @@ private static bool AllComplete(Task[] tasks) ...@@ -595,7 +595,7 @@ private static bool AllComplete(Task[] tasks)
} }
return true; return true;
} }
private async Task<bool> WaitAllIgnoreErrorsAsync(Task[] tasks, int timeoutMilliseconds, TextWriter log) private async Task<bool> WaitAllIgnoreErrorsAsync(Task[] tasks, int timeoutMilliseconds, TextWriter log)
{ {
if (tasks == null) throw new ArgumentNullException(nameof(tasks)); if (tasks == null) throw new ArgumentNullException(nameof(tasks));
...@@ -681,11 +681,11 @@ internal void OnHashSlotMoved(int hashSlot, EndPoint old, EndPoint @new) ...@@ -681,11 +681,11 @@ internal void OnHashSlotMoved(int hashSlot, EndPoint old, EndPoint @new)
new HashSlotMovedEventArgs(handler, this, hashSlot, old, @new) new HashSlotMovedEventArgs(handler, this, hashSlot, old, @new)
); );
} }
} }
/// <summary> /// <summary>
/// Compute the hash-slot of a specified key /// Compute the hash-slot of a specified key
/// </summary> /// </summary>
/// <param name="key">The key to get a hash slot ID for.</param> /// <param name="key">The key to get a hash slot ID for.</param>
public int HashSlot(RedisKey key) => serverSelectionStrategy.HashSlot(key); public int HashSlot(RedisKey key) => serverSelectionStrategy.HashSlot(key);
...@@ -710,7 +710,7 @@ internal ServerEndPoint AnyConnected(ServerType serverType, uint startOffset, Re ...@@ -710,7 +710,7 @@ internal ServerEndPoint AnyConnected(ServerType serverType, uint startOffset, Re
fallback = server; fallback = server;
break; break;
} }
} }
else else
{ {
switch (flags) switch (flags)
...@@ -729,12 +729,12 @@ internal ServerEndPoint AnyConnected(ServerType serverType, uint startOffset, Re ...@@ -729,12 +729,12 @@ internal ServerEndPoint AnyConnected(ServerType serverType, uint startOffset, Re
} }
private volatile bool isDisposed; private volatile bool isDisposed;
internal bool IsDisposed => isDisposed; internal bool IsDisposed => isDisposed;
/// <summary> /// <summary>
/// Create a new ConnectionMultiplexer instance /// Create a new ConnectionMultiplexer instance
/// </summary> /// </summary>
/// <param name="configuration">The string configuration to use for this multiplexer.</param> /// <param name="configuration">The string configuration to use for this multiplexer.</param>
/// <param name="log">The <see cref="TextWriter"/> to log to.</param> /// <param name="log">The <see cref="TextWriter"/> to log to.</param>
public static async Task<ConnectionMultiplexer> ConnectAsync(string configuration, TextWriter log = null) public static async Task<ConnectionMultiplexer> ConnectAsync(string configuration, TextWriter log = null)
{ {
...@@ -750,7 +750,7 @@ public static async Task<ConnectionMultiplexer> ConnectAsync(string configuratio ...@@ -750,7 +750,7 @@ public static async Task<ConnectionMultiplexer> ConnectAsync(string configuratio
} }
killMe = null; killMe = null;
return muxer; return muxer;
} }
finally finally
{ {
if (killMe != null) try { killMe.Dispose(); } catch { } if (killMe != null) try { killMe.Dispose(); } catch { }
...@@ -759,8 +759,8 @@ public static async Task<ConnectionMultiplexer> ConnectAsync(string configuratio ...@@ -759,8 +759,8 @@ public static async Task<ConnectionMultiplexer> ConnectAsync(string configuratio
/// <summary> /// <summary>
/// Create a new ConnectionMultiplexer instance /// Create a new ConnectionMultiplexer instance
/// </summary> /// </summary>
/// <param name="configuration">The configuration options to use for this multiplexer.</param> /// <param name="configuration">The configuration options to use for this multiplexer.</param>
/// <param name="log">The <see cref="TextWriter"/> to log to.</param> /// <param name="log">The <see cref="TextWriter"/> to log to.</param>
public static async Task<ConnectionMultiplexer> ConnectAsync(ConfigurationOptions configuration, TextWriter log = null) public static async Task<ConnectionMultiplexer> ConnectAsync(ConfigurationOptions configuration, TextWriter log = null)
{ {
...@@ -776,7 +776,7 @@ public static async Task<ConnectionMultiplexer> ConnectAsync(ConfigurationOption ...@@ -776,7 +776,7 @@ public static async Task<ConnectionMultiplexer> ConnectAsync(ConfigurationOption
} }
killMe = null; killMe = null;
return muxer; return muxer;
} }
finally finally
{ {
if (killMe != null) try { killMe.Dispose(); } catch { } if (killMe != null) try { killMe.Dispose(); } catch { }
...@@ -790,11 +790,11 @@ private static ConnectionMultiplexer CreateMultiplexer(object configuration) ...@@ -790,11 +790,11 @@ private static ConnectionMultiplexer CreateMultiplexer(object configuration)
if (configuration is string) if (configuration is string)
{ {
config = ConfigurationOptions.Parse((string)configuration); config = ConfigurationOptions.Parse((string)configuration);
} }
else if (configuration is ConfigurationOptions) else if (configuration is ConfigurationOptions)
{ {
config = ((ConfigurationOptions)configuration).Clone(); config = ((ConfigurationOptions)configuration).Clone();
} }
else else
{ {
throw new ArgumentException("configuration"); throw new ArgumentException("configuration");
...@@ -806,8 +806,8 @@ private static ConnectionMultiplexer CreateMultiplexer(object configuration) ...@@ -806,8 +806,8 @@ private static ConnectionMultiplexer CreateMultiplexer(object configuration)
/// <summary> /// <summary>
/// Create a new ConnectionMultiplexer instance /// Create a new ConnectionMultiplexer instance
/// </summary> /// </summary>
/// <param name="configuration">The string configuration to use for this multiplexer.</param> /// <param name="configuration">The string configuration to use for this multiplexer.</param>
/// <param name="log">The <see cref="TextWriter"/> to log to.</param> /// <param name="log">The <see cref="TextWriter"/> to log to.</param>
public static ConnectionMultiplexer Connect(string configuration, TextWriter log = null) public static ConnectionMultiplexer Connect(string configuration, TextWriter log = null)
{ {
...@@ -816,8 +816,8 @@ public static ConnectionMultiplexer Connect(string configuration, TextWriter log ...@@ -816,8 +816,8 @@ public static ConnectionMultiplexer Connect(string configuration, TextWriter log
/// <summary> /// <summary>
/// Create a new ConnectionMultiplexer instance /// Create a new ConnectionMultiplexer instance
/// </summary> /// </summary>
/// <param name="configuration">The configurtion options to use for this multiplexer.</param> /// <param name="configuration">The configurtion options to use for this multiplexer.</param>
/// <param name="log">The <see cref="TextWriter"/> to log to.</param> /// <param name="log">The <see cref="TextWriter"/> to log to.</param>
public static ConnectionMultiplexer Connect(ConfigurationOptions configuration, TextWriter log = null) public static ConnectionMultiplexer Connect(ConfigurationOptions configuration, TextWriter log = null)
{ {
...@@ -962,9 +962,9 @@ private void OnHeartbeat() ...@@ -962,9 +962,9 @@ private void OnHeartbeat()
private int lastHeartbeatTicks; private int lastHeartbeatTicks;
private static int lastGlobalHeartbeatTicks = Environment.TickCount; private static int lastGlobalHeartbeatTicks = Environment.TickCount;
internal long LastHeartbeatSecondsAgo internal long LastHeartbeatSecondsAgo
{ {
get get
{ {
if (pulse == null) return -1; if (pulse == null) return -1;
return unchecked(Environment.TickCount - VolatileWrapper.Read(ref lastHeartbeatTicks)) / 1000; return unchecked(Environment.TickCount - VolatileWrapper.Read(ref lastHeartbeatTicks)) / 1000;
...@@ -975,22 +975,22 @@ internal long LastHeartbeatSecondsAgo ...@@ -975,22 +975,22 @@ internal long LastHeartbeatSecondsAgo
internal static long LastGlobalHeartbeatSecondsAgo => unchecked(Environment.TickCount - VolatileWrapper.Read(ref lastGlobalHeartbeatTicks)) / 1000; internal static long LastGlobalHeartbeatSecondsAgo => unchecked(Environment.TickCount - VolatileWrapper.Read(ref lastGlobalHeartbeatTicks)) / 1000;
internal CompletionManager UnprocessableCompletionManager => unprocessableCompletionManager; internal CompletionManager UnprocessableCompletionManager => unprocessableCompletionManager;
/// <summary> /// <summary>
/// Obtain a pub/sub subscriber connection to the specified server /// Obtain a pub/sub subscriber connection to the specified server
/// </summary> /// </summary>
/// <param name="asyncState">The async state object to pass to the created <see cref="RedisSubscriber"/>.</param> /// <param name="asyncState">The async state object to pass to the created <see cref="RedisSubscriber"/>.</param>
public ISubscriber GetSubscriber(object asyncState = null) public ISubscriber GetSubscriber(object asyncState = null)
{ {
if (RawConfig.Proxy == Proxy.Twemproxy) throw new NotSupportedException("The pub/sub API is not available via twemproxy"); if (RawConfig.Proxy == Proxy.Twemproxy) throw new NotSupportedException("The pub/sub API is not available via twemproxy");
return new RedisSubscriber(this, asyncState); return new RedisSubscriber(this, asyncState);
} }
/// <summary> /// <summary>
/// Obtain an interactive connection to a database inside redis /// Obtain an interactive connection to a database inside redis
/// </summary> /// </summary>
/// <param name="db">The ID to get a database for.</param> /// <param name="db">The ID to get a database for.</param>
/// <param name="asyncState">The async state to pass into the resulting <see cref="RedisDatabase"/>.</param> /// <param name="asyncState">The async state to pass into the resulting <see cref="RedisDatabase"/>.</param>
public IDatabase GetDatabase(int db = -1, object asyncState = null) public IDatabase GetDatabase(int db = -1, object asyncState = null)
{ {
...@@ -1025,30 +1025,30 @@ public IDatabase GetDatabase(int db = -1, object asyncState = null) ...@@ -1025,30 +1025,30 @@ public IDatabase GetDatabase(int db = -1, object asyncState = null)
/// <summary> /// <summary>
/// Obtain a configuration API for an individual server /// Obtain a configuration API for an individual server
/// </summary> /// </summary>
/// <param name="host">The host to get a server for.</param> /// <param name="host">The host to get a server for.</param>
/// <param name="port">The port for <paramref name="host"/> to get a server for.</param> /// <param name="port">The port for <paramref name="host"/> to get a server for.</param>
/// <param name="asyncState">The async state to pass into the resulting <see cref="RedisServer"/>.</param> /// <param name="asyncState">The async state to pass into the resulting <see cref="RedisServer"/>.</param>
public IServer GetServer(string host, int port, object asyncState = null) => GetServer(Format.ParseEndPoint(host, port), asyncState); public IServer GetServer(string host, int port, object asyncState = null) => GetServer(Format.ParseEndPoint(host, port), asyncState);
/// <summary> /// <summary>
/// Obtain a configuration API for an individual server /// Obtain a configuration API for an individual server
/// </summary> /// </summary>
/// <param name="hostAndPort">The "host:port" string to get a server for.</param> /// <param name="hostAndPort">The "host:port" string to get a server for.</param>
/// <param name="asyncState">The async state to pass into the resulting <see cref="RedisServer"/>.</param> /// <param name="asyncState">The async state to pass into the resulting <see cref="RedisServer"/>.</param>
public IServer GetServer(string hostAndPort, object asyncState = null) => GetServer(Format.TryParseEndPoint(hostAndPort), asyncState); public IServer GetServer(string hostAndPort, object asyncState = null) => GetServer(Format.TryParseEndPoint(hostAndPort), asyncState);
/// <summary> /// <summary>
/// Obtain a configuration API for an individual server /// Obtain a configuration API for an individual server
/// </summary> /// </summary>
/// <param name="host">The host to get a server for.</param> /// <param name="host">The host to get a server for.</param>
/// <param name="port">The port for <paramref name="host"/> to get a server for.</param> /// <param name="port">The port for <paramref name="host"/> to get a server for.</param>
public IServer GetServer(IPAddress host, int port) => GetServer(new IPEndPoint(host, port)); public IServer GetServer(IPAddress host, int port) => GetServer(new IPEndPoint(host, port));
/// <summary> /// <summary>
/// Obtain a configuration API for an individual server /// Obtain a configuration API for an individual server
/// </summary> /// </summary>
/// <param name="endpoint">The endpoint to get a server for.</param> /// <param name="endpoint">The endpoint to get a server for.</param>
/// <param name="asyncState">The async state to pass into the resulting <see cref="RedisServer"/>.</param> /// <param name="asyncState">The async state to pass into the resulting <see cref="RedisServer"/>.</param>
public IServer GetServer(EndPoint endpoint, object asyncState = null) public IServer GetServer(EndPoint endpoint, object asyncState = null)
{ {
...@@ -1064,7 +1064,7 @@ internal void Trace(string message, [CallerMemberName] string category = null) ...@@ -1064,7 +1064,7 @@ internal void Trace(string message, [CallerMemberName] string category = null)
{ {
OnTrace(message, category); OnTrace(message, category);
} }
[Conditional("VERBOSE")] [Conditional("VERBOSE")]
internal void Trace(bool condition, string message, [CallerMemberName] string category = null) internal void Trace(bool condition, string message, [CallerMemberName] string category = null)
{ {
...@@ -1079,7 +1079,7 @@ internal static void TraceWithoutContext(string message, [CallerMemberName] stri ...@@ -1079,7 +1079,7 @@ internal static void TraceWithoutContext(string message, [CallerMemberName] stri
{ {
OnTraceWithoutContext(message, category); OnTraceWithoutContext(message, category);
} }
[Conditional("VERBOSE")] [Conditional("VERBOSE")]
internal static void TraceWithoutContext(bool condition, string message, [CallerMemberName] string category = null) internal static void TraceWithoutContext(bool condition, string message, [CallerMemberName] string category = null)
{ {
...@@ -1091,7 +1091,7 @@ internal static void TraceWithoutContext(bool condition, string message, [Caller ...@@ -1091,7 +1091,7 @@ internal static void TraceWithoutContext(bool condition, string message, [Caller
/// <summary> /// <summary>
/// The number of operations that have been performed on all connections /// The number of operations that have been performed on all connections
/// </summary> /// </summary>
public long OperationCount public long OperationCount
{ {
get get
{ {
...@@ -1117,17 +1117,17 @@ internal bool ReconfigureIfNeeded(EndPoint blame, bool fromBroadcast, string cau ...@@ -1117,17 +1117,17 @@ internal bool ReconfigureIfNeeded(EndPoint blame, bool fromBroadcast, string cau
Trace("Configuration change detected; checking nodes", "Configuration"); Trace("Configuration change detected; checking nodes", "Configuration");
ReconfigureAsync(false, reconfigureAll, null, blame, cause, publishReconfigure, flags).ObserveErrors(); ReconfigureAsync(false, reconfigureAll, null, blame, cause, publishReconfigure, flags).ObserveErrors();
return true; return true;
} }
else else
{ {
Trace("Configuration change skipped; already in progress via " + activeCause, "Configuration"); Trace("Configuration change skipped; already in progress via " + activeCause, "Configuration");
return false; return false;
} }
} }
/// <summary> /// <summary>
/// Reconfigure the current connections based on the existing configuration /// Reconfigure the current connections based on the existing configuration
/// </summary> /// </summary>
/// <param name="log">The <see cref="TextWriter"/> to log to.</param> /// <param name="log">The <see cref="TextWriter"/> to log to.</param>
public Task<bool> ConfigureAsync(TextWriter log = null) public Task<bool> ConfigureAsync(TextWriter log = null)
{ {
...@@ -1136,7 +1136,7 @@ public Task<bool> ConfigureAsync(TextWriter log = null) ...@@ -1136,7 +1136,7 @@ public Task<bool> ConfigureAsync(TextWriter log = null)
/// <summary> /// <summary>
/// Reconfigure the current connections based on the existing configuration /// Reconfigure the current connections based on the existing configuration
/// </summary> /// </summary>
/// <param name="log">The <see cref="TextWriter"/> to log to.</param> /// <param name="log">The <see cref="TextWriter"/> to log to.</param>
public bool Configure(TextWriter log = null) public bool Configure(TextWriter log = null)
{ {
...@@ -1186,7 +1186,7 @@ public string GetStatus() ...@@ -1186,7 +1186,7 @@ public string GetStatus()
/// <summary> /// <summary>
/// Provides a text overview of the status of all connections /// Provides a text overview of the status of all connections
/// </summary> /// </summary>
/// <param name="log">The <see cref="TextWriter"/> to log to.</param> /// <param name="log">The <see cref="TextWriter"/> to log to.</param>
public void GetStatus(TextWriter log) public void GetStatus(TextWriter log)
{ {
...@@ -1202,7 +1202,7 @@ public void GetStatus(TextWriter log) ...@@ -1202,7 +1202,7 @@ public void GetStatus(TextWriter log)
LogLocked(log, "Sync timeouts: {0}; fire and forget: {1}; last heartbeat: {2}s ago", LogLocked(log, "Sync timeouts: {0}; fire and forget: {1}; last heartbeat: {2}s ago",
Interlocked.Read(ref syncTimeouts), Interlocked.Read(ref fireAndForgets), LastHeartbeatSecondsAgo); Interlocked.Read(ref syncTimeouts), Interlocked.Read(ref fireAndForgets), LastHeartbeatSecondsAgo);
} }
internal async Task<bool> ReconfigureAsync(bool first, bool reconfigureAll, TextWriter log, EndPoint blame, string cause, bool publishReconfigure = false, CommandFlags publishReconfigureFlags = CommandFlags.None) internal async Task<bool> ReconfigureAsync(bool first, bool reconfigureAll, TextWriter log, EndPoint blame, string cause, bool publishReconfigure = false, CommandFlags publishReconfigureFlags = CommandFlags.None)
{ {
if (isDisposed) throw new ObjectDisposedException(ToString()); if (isDisposed) throw new ObjectDisposedException(ToString());
...@@ -1526,7 +1526,7 @@ internal async Task<bool> ReconfigureAsync(bool first, bool reconfigureAll, Text ...@@ -1526,7 +1526,7 @@ internal async Task<bool> ReconfigureAsync(bool first, bool reconfigureAll, Text
{ } { }
} }
return true; return true;
} }
catch (Exception ex) catch (Exception ex)
{ {
Trace(ex.Message); Trace(ex.Message);
...@@ -1749,7 +1749,7 @@ internal ServerEndPoint SelectServer(int db, RedisCommand command, CommandFlags ...@@ -1749,7 +1749,7 @@ internal ServerEndPoint SelectServer(int db, RedisCommand command, CommandFlags
{ {
return serverSelectionStrategy.Select(db, command, key, flags); return serverSelectionStrategy.Select(db, command, key, flags);
} }
private bool TryPushMessageToBridge<T>(Message message, ResultProcessor<T> processor, ResultBox<T> resultBox, ref ServerEndPoint server) private bool TryPushMessageToBridge<T>(Message message, ResultProcessor<T> processor, ResultBox<T> resultBox, ref ServerEndPoint server)
{ {
message.SetSource(processor, resultBox); message.SetSource(processor, resultBox);
...@@ -1768,8 +1768,8 @@ private bool TryPushMessageToBridge<T>(Message message, ResultProcessor<T> proce ...@@ -1768,8 +1768,8 @@ private bool TryPushMessageToBridge<T>(Message message, ResultProcessor<T> proce
switch (server.ServerType) switch (server.ServerType)
{ {
case ServerType.Cluster: case ServerType.Cluster:
case ServerType.Twemproxy: // strictly speaking twemproxy uses a different hashing algo, but the hash-tag behavior is case ServerType.Twemproxy: // strictly speaking twemproxy uses a different hashing algo, but the hash-tag behavior is
// the same, so this does a pretty good job of spotting illegal commands before sending them // the same, so this does a pretty good job of spotting illegal commands before sending them
if (message.GetHashSlot(ServerSelectionStrategy) == ServerSelectionStrategy.MultipleSlots) if (message.GetHashSlot(ServerSelectionStrategy) == ServerSelectionStrategy.MultipleSlots)
{ {
throw ExceptionFactory.MultiSlot(IncludeDetailInExceptions, message); throw ExceptionFactory.MultiSlot(IncludeDetailInExceptions, message);
...@@ -1787,17 +1787,17 @@ private bool TryPushMessageToBridge<T>(Message message, ResultProcessor<T> proce ...@@ -1787,17 +1787,17 @@ private bool TryPushMessageToBridge<T>(Message message, ResultProcessor<T> proce
{ {
var profCtx = profiler?.GetContext(); var profCtx = profiler?.GetContext();
if (profCtx != null && profiledCommands.TryGetValue(profCtx, out ConcurrentProfileStorageCollection inFlightForCtx)) if (profCtx != null && profiledCommands.TryGetValue(profCtx, out ConcurrentProfileStorageCollection inFlightForCtx))
{ {
message.SetProfileStorage(ProfileStorage.NewWithContext(inFlightForCtx, server)); message.SetProfileStorage(ProfileStorage.NewWithContext(inFlightForCtx, server));
} }
if (message.Db >= 0) if (message.Db >= 0)
{ {
int availableDatabases = server.Databases; int availableDatabases = server.Databases;
if (availableDatabases > 0 && message.Db >= availableDatabases) if (availableDatabases > 0 && message.Db >= availableDatabases)
{ {
throw ExceptionFactory.DatabaseOutfRange(IncludeDetailInExceptions, message.Db, message, server); throw ExceptionFactory.DatabaseOutfRange(IncludeDetailInExceptions, message.Db, message, server);
} }
} }
Trace("Queueing on server: " + message); Trace("Queueing on server: " + message);
...@@ -1855,11 +1855,11 @@ public bool IsConnecting ...@@ -1855,11 +1855,11 @@ public bool IsConnecting
internal ConfigurationOptions RawConfig => configuration; internal ConfigurationOptions RawConfig => configuration;
internal ServerSelectionStrategy ServerSelectionStrategy => serverSelectionStrategy; internal ServerSelectionStrategy ServerSelectionStrategy => serverSelectionStrategy;
/// <summary> /// <summary>
/// Close all connections and release all resources associated with this object /// Close all connections and release all resources associated with this object
/// </summary> /// </summary>
/// <param name="allowCommandsToComplete">Whether to allow all in-queue commands to complete first.</param> /// <param name="allowCommandsToComplete">Whether to allow all in-queue commands to complete first.</param>
public void Close(bool allowCommandsToComplete = true) public void Close(bool allowCommandsToComplete = true)
{ {
...@@ -1877,7 +1877,7 @@ public void Close(bool allowCommandsToComplete = true) ...@@ -1877,7 +1877,7 @@ public void Close(bool allowCommandsToComplete = true)
DisposeAndClearServers(); DisposeAndClearServers();
OnCloseReaderWriter(); OnCloseReaderWriter();
} }
partial void OnCloseReaderWriter(); partial void OnCloseReaderWriter();
private void DisposeAndClearServers() private void DisposeAndClearServers()
...@@ -1912,7 +1912,7 @@ private Task[] QuitAllServers() ...@@ -1912,7 +1912,7 @@ private Task[] QuitAllServers()
/// <summary> /// <summary>
/// Close all connections and release all resources associated with this object /// Close all connections and release all resources associated with this object
/// </summary> /// </summary>
/// <param name="allowCommandsToComplete">Whether to allow all in-queue commands to complete first.</param> /// <param name="allowCommandsToComplete">Whether to allow all in-queue commands to complete first.</param>
public async Task CloseAsync(bool allowCommandsToComplete = true) public async Task CloseAsync(bool allowCommandsToComplete = true)
{ {
...@@ -1970,7 +1970,7 @@ internal static void ThrowFailed<T>(TaskCompletionSource<T> source, Exception un ...@@ -1970,7 +1970,7 @@ internal static void ThrowFailed<T>(TaskCompletionSource<T> source, Exception un
try try
{ {
throw unthrownException; throw unthrownException;
} }
catch (Exception ex) catch (Exception ex)
{ {
source.TrySetException(ex); source.TrySetException(ex);
...@@ -1978,7 +1978,7 @@ internal static void ThrowFailed<T>(TaskCompletionSource<T> source, Exception un ...@@ -1978,7 +1978,7 @@ internal static void ThrowFailed<T>(TaskCompletionSource<T> source, Exception un
GC.SuppressFinalize(source.Task); GC.SuppressFinalize(source.Task);
} }
} }
internal T ExecuteSyncImpl<T>(Message message, ResultProcessor<T> processor, ServerEndPoint server) internal T ExecuteSyncImpl<T>(Message message, ResultProcessor<T> processor, ServerEndPoint server)
{ {
if (isDisposed) throw new ObjectDisposedException(ToString()); if (isDisposed) throw new ObjectDisposedException(ToString());
...@@ -2027,7 +2027,7 @@ internal T ExecuteSyncImpl<T>(Message message, ResultProcessor<T> processor, Ser ...@@ -2027,7 +2027,7 @@ internal T ExecuteSyncImpl<T>(Message message, ResultProcessor<T> processor, Ser
#endif #endif
var sb = new StringBuilder("Timeout performing ").Append(message.CommandAndKey); var sb = new StringBuilder("Timeout performing ").Append(message.CommandAndKey);
data = new List<Tuple<string, string>> { Tuple.Create("Message", message.CommandAndKey) }; data = new List<Tuple<string, string>> { Tuple.Create("Message", message.CommandAndKey) };
void add(string lk, string sk, string v) void add(string lk, string sk, string v)
{ {
data.Add(Tuple.Create(lk, v)); data.Add(Tuple.Create(lk, v));
sb.Append(", ").Append(sk).Append(": ").Append(v); sb.Append(", ").Append(sk).Append(": ").Append(v);
...@@ -2063,9 +2063,9 @@ void add(string lk, string sk, string v) ...@@ -2063,9 +2063,9 @@ void add(string lk, string sk, string v)
data.Add(Tuple.Create("Busy-Workers", busyWorkerCount.ToString())); data.Add(Tuple.Create("Busy-Workers", busyWorkerCount.ToString()));
#endif #endif
#if FEATURE_PERFCOUNTER #if FEATURE_PERFCOUNTER
if (IncludePerformanceCountersInExceptions) if (IncludePerformanceCountersInExceptions)
{ {
add("Local-CPU", "Local-CPU", GetSystemCpuPercent()); add("Local-CPU", "Local-CPU", GetSystemCpuPercent());
} }
#endif #endif
sb.Append(" (Please take a look at this article for some common client-side issues that can cause timeouts: "); sb.Append(" (Please take a look at this article for some common client-side issues that can cause timeouts: ");
...@@ -2099,24 +2099,24 @@ void add(string lk, string sk, string v) ...@@ -2099,24 +2099,24 @@ void add(string lk, string sk, string v)
Trace(message + " received " + val); Trace(message + " received " + val);
return val; return val;
} }
} }
#if FEATURE_PERFCOUNTER #if FEATURE_PERFCOUNTER
internal static string GetThreadPoolAndCPUSummary(bool includePerformanceCounters) internal static string GetThreadPoolAndCPUSummary(bool includePerformanceCounters)
{ {
GetThreadPoolStats(out string iocp, out string worker); GetThreadPoolStats(out string iocp, out string worker);
var cpu = includePerformanceCounters ? GetSystemCpuPercent() : "n/a"; var cpu = includePerformanceCounters ? GetSystemCpuPercent() : "n/a";
return $"IOCP: {iocp}, WORKER: {worker}, Local-CPU: {cpu}"; return $"IOCP: {iocp}, WORKER: {worker}, Local-CPU: {cpu}";
} }
private static string GetSystemCpuPercent() private static string GetSystemCpuPercent()
{ {
return (PerfCounterHelper.TryGetSystemCPU(out float systemCPU)) return (PerfCounterHelper.TryGetSystemCPU(out float systemCPU))
? Math.Round(systemCPU, 2) + "%" ? Math.Round(systemCPU, 2) + "%"
: "unavailable"; : "unavailable";
} }
#endif #endif
#if FEATURE_THREADPOOL #if FEATURE_THREADPOOL
private static int GetThreadPoolStats(out string iocp, out string worker) private static int GetThreadPoolStats(out string iocp, out string worker)
{ {
ThreadPool.GetMaxThreads(out int maxWorkerThreads, out int maxIoThreads); ThreadPool.GetMaxThreads(out int maxWorkerThreads, out int maxIoThreads);
...@@ -2148,8 +2148,8 @@ private static int GetThreadPoolStats(out string iocp, out string worker) ...@@ -2148,8 +2148,8 @@ private static int GetThreadPoolStats(out string iocp, out string worker)
/// Limit at which to start recording unusual busy patterns (only one log will be retained at a time; /// Limit at which to start recording unusual busy patterns (only one log will be retained at a time;
/// set to a negative value to disable this feature) /// set to a negative value to disable this feature)
/// </summary> /// </summary>
public int StormLogThreshold { get; set; } = 15; public int StormLogThreshold { get; set; } = 15;
/// <summary> /// <summary>
/// Obtains the log of unusual busy patterns /// Obtains the log of unusual busy patterns
/// </summary> /// </summary>
...@@ -2166,12 +2166,12 @@ public void ResetStormLog() ...@@ -2166,12 +2166,12 @@ public void ResetStormLog()
Interlocked.Exchange(ref stormLogSnapshot, null); Interlocked.Exchange(ref stormLogSnapshot, null);
Interlocked.Exchange(ref haveStormLog, 0); Interlocked.Exchange(ref haveStormLog, 0);
} }
private long syncTimeouts, fireAndForgets; private long syncTimeouts, fireAndForgets;
/// <summary> /// <summary>
/// Request all compatible clients to reconfigure or reconnect /// Request all compatible clients to reconfigure or reconnect
/// </summary> /// </summary>
/// <param name="flags">The command flags to use.</param>2 /// <param name="flags">The command flags to use.</param>2
/// <returns>The number of instances known to have received the message (however, the actual number can be higher; returns -1 if the operation is pending)</returns> /// <returns>The number of instances known to have received the message (however, the actual number can be higher; returns -1 if the operation is pending)</returns>
public long PublishReconfigure(CommandFlags flags = CommandFlags.None) public long PublishReconfigure(CommandFlags flags = CommandFlags.None)
...@@ -2187,17 +2187,17 @@ public long PublishReconfigure(CommandFlags flags = CommandFlags.None) ...@@ -2187,17 +2187,17 @@ public long PublishReconfigure(CommandFlags flags = CommandFlags.None)
return PublishReconfigureImpl(flags); return PublishReconfigureImpl(flags);
} }
} }
private long PublishReconfigureImpl(CommandFlags flags) private long PublishReconfigureImpl(CommandFlags flags)
{ {
byte[] channel = ConfigurationChangedChannel; byte[] channel = ConfigurationChangedChannel;
if (channel == null) return 0; if (channel == null) return 0;
return GetSubscriber().Publish(channel, RedisLiterals.Wildcard, flags); return GetSubscriber().Publish(channel, RedisLiterals.Wildcard, flags);
} }
/// <summary> /// <summary>
/// Request all compatible clients to reconfigure or reconnect /// Request all compatible clients to reconfigure or reconnect
/// </summary> /// </summary>
/// <param name="flags">The command flags to use.</param> /// <param name="flags">The command flags to use.</param>
/// <returns>The number of instances known to have received the message (however, the actual number can be higher)</returns> /// <returns>The number of instances known to have received the message (however, the actual number can be higher)</returns>
public Task<long> PublishReconfigureAsync(CommandFlags flags = CommandFlags.None) public Task<long> PublishReconfigureAsync(CommandFlags flags = CommandFlags.None)
...@@ -2208,4 +2208,4 @@ public Task<long> PublishReconfigureAsync(CommandFlags flags = CommandFlags.None ...@@ -2208,4 +2208,4 @@ public Task<long> PublishReconfigureAsync(CommandFlags flags = CommandFlags.None
return GetSubscriber().PublishAsync(channel, RedisLiterals.Wildcard, flags); return GetSubscriber().PublishAsync(channel, RedisLiterals.Wildcard, flags);
} }
} }
} }
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment