Commit e11171dc authored by Nick Craver's avatar Nick Craver

Auto-props cleanup

parent c76b249d
...@@ -45,7 +45,7 @@ public ServerCounters GetCounters() ...@@ -45,7 +45,7 @@ public ServerCounters GetCounters()
{ {
counters.Add(snapshot[i].GetCounters()); counters.Add(snapshot[i].GetCounters());
} }
unprocessableCompletionManager.GetCounters(counters.Other); UnprocessableCompletionManager.GetCounters(counters.Other);
return counters; return counters;
} }
...@@ -123,7 +123,7 @@ internal void OnConnectionFailed(EndPoint endpoint, ConnectionType connectionTyp ...@@ -123,7 +123,7 @@ internal void OnConnectionFailed(EndPoint endpoint, ConnectionType connectionTyp
var handler = ConnectionFailed; var handler = ConnectionFailed;
if (handler != null) if (handler != null)
{ {
unprocessableCompletionManager.CompleteSyncOrAsync( UnprocessableCompletionManager.CompleteSyncOrAsync(
new ConnectionFailedEventArgs(handler, this, endpoint, connectionType, failureType, exception) new ConnectionFailedEventArgs(handler, this, endpoint, connectionType, failureType, exception)
); );
} }
...@@ -142,7 +142,7 @@ internal void OnInternalError(Exception exception, EndPoint endpoint = null, Con ...@@ -142,7 +142,7 @@ internal void OnInternalError(Exception exception, EndPoint endpoint = null, Con
var handler = InternalError; var handler = InternalError;
if (handler != null) if (handler != null)
{ {
unprocessableCompletionManager.CompleteSyncOrAsync( UnprocessableCompletionManager.CompleteSyncOrAsync(
new InternalErrorEventArgs(handler, this, endpoint, connectionType, exception, origin) new InternalErrorEventArgs(handler, this, endpoint, connectionType, exception, origin)
); );
} }
...@@ -158,7 +158,7 @@ internal void OnConnectionRestored(EndPoint endpoint, ConnectionType connectionT ...@@ -158,7 +158,7 @@ internal void OnConnectionRestored(EndPoint endpoint, ConnectionType connectionT
var handler = ConnectionRestored; var handler = ConnectionRestored;
if (handler != null) if (handler != null)
{ {
unprocessableCompletionManager.CompleteSyncOrAsync( UnprocessableCompletionManager.CompleteSyncOrAsync(
new ConnectionFailedEventArgs(handler, this, endpoint, connectionType, ConnectionFailureType.None, null) new ConnectionFailedEventArgs(handler, this, endpoint, connectionType, ConnectionFailureType.None, null)
); );
} }
...@@ -170,7 +170,7 @@ private void OnEndpointChanged(EndPoint endpoint, EventHandler<EndPointEventArgs ...@@ -170,7 +170,7 @@ private void OnEndpointChanged(EndPoint endpoint, EventHandler<EndPointEventArgs
if (isDisposed) return; if (isDisposed) return;
if (handler != null) if (handler != null)
{ {
unprocessableCompletionManager.CompleteSyncOrAsync( UnprocessableCompletionManager.CompleteSyncOrAsync(
new EndPointEventArgs(handler, this, endpoint) new EndPointEventArgs(handler, this, endpoint)
); );
} }
...@@ -189,7 +189,7 @@ internal void OnErrorMessage(EndPoint endpoint, string message) ...@@ -189,7 +189,7 @@ internal void OnErrorMessage(EndPoint endpoint, string message)
var handler = ErrorMessage; var handler = ErrorMessage;
if (handler != null) if (handler != null)
{ {
unprocessableCompletionManager.CompleteSyncOrAsync( UnprocessableCompletionManager.CompleteSyncOrAsync(
new RedisErrorEventArgs(handler, this, endpoint, message) new RedisErrorEventArgs(handler, this, endpoint, message)
); );
} }
...@@ -481,7 +481,7 @@ private static void WriteNormalizingLineEndings(string source, StreamWriter writ ...@@ -481,7 +481,7 @@ private static void WriteNormalizingLineEndings(string source, StreamWriter writ
/// <summary> /// <summary>
/// Gets the timeout associated with the connections /// Gets the timeout associated with the connections
/// </summary> /// </summary>
public int TimeoutMilliseconds => timeoutMilliseconds; public int TimeoutMilliseconds { get; }
/// <summary> /// <summary>
/// Gets all endpoints defined on the server /// Gets all endpoints defined on the server
...@@ -494,13 +494,11 @@ public EndPoint[] GetEndPoints(bool configuredOnly = false) ...@@ -494,13 +494,11 @@ public EndPoint[] GetEndPoints(bool configuredOnly = false)
return ConvertHelper.ConvertAll(serverSnapshot, x => x.EndPoint); return ConvertHelper.ConvertAll(serverSnapshot, x => x.EndPoint);
} }
private readonly int timeoutMilliseconds;
private readonly ConfigurationOptions configuration; private readonly ConfigurationOptions configuration;
internal bool TryResend(int hashSlot, Message message, EndPoint endpoint, bool isMoved) internal bool TryResend(int hashSlot, Message message, EndPoint endpoint, bool isMoved)
{ {
return serverSelectionStrategy.TryResend(hashSlot, message, endpoint, isMoved); return ServerSelectionStrategy.TryResend(hashSlot, message, endpoint, isMoved);
} }
/// <summary> /// <summary>
...@@ -510,7 +508,7 @@ internal bool TryResend(int hashSlot, Message message, EndPoint endpoint, bool i ...@@ -510,7 +508,7 @@ internal bool TryResend(int hashSlot, Message message, EndPoint endpoint, bool i
public void Wait(Task task) public void Wait(Task task)
{ {
if (task == null) throw new ArgumentNullException(nameof(task)); if (task == null) throw new ArgumentNullException(nameof(task));
if (!task.Wait(timeoutMilliseconds)) throw new TimeoutException(); if (!task.Wait(TimeoutMilliseconds)) throw new TimeoutException();
} }
/// <summary> /// <summary>
...@@ -521,7 +519,7 @@ public void Wait(Task task) ...@@ -521,7 +519,7 @@ public void Wait(Task task)
public T Wait<T>(Task<T> task) public T Wait<T>(Task<T> task)
{ {
if (task == null) throw new ArgumentNullException(nameof(task)); if (task == null) throw new ArgumentNullException(nameof(task));
if (!task.Wait(timeoutMilliseconds)) throw new TimeoutException(); if (!task.Wait(TimeoutMilliseconds)) throw new TimeoutException();
return task.Result; return task.Result;
} }
...@@ -533,10 +531,10 @@ public void WaitAll(params Task[] tasks) ...@@ -533,10 +531,10 @@ public void WaitAll(params Task[] tasks)
{ {
if (tasks == null) throw new ArgumentNullException(nameof(tasks)); if (tasks == null) throw new ArgumentNullException(nameof(tasks));
if (tasks.Length == 0) return; if (tasks.Length == 0) return;
if (!Task.WaitAll(tasks, timeoutMilliseconds)) throw new TimeoutException(); if (!Task.WaitAll(tasks, TimeoutMilliseconds)) throw new TimeoutException();
} }
private bool WaitAllIgnoreErrors(Task[] tasks) => WaitAllIgnoreErrors(tasks, timeoutMilliseconds); private bool WaitAllIgnoreErrors(Task[] tasks) => WaitAllIgnoreErrors(tasks, TimeoutMilliseconds);
private static bool WaitAllIgnoreErrors(Task[] tasks, int timeout) private static bool WaitAllIgnoreErrors(Task[] tasks, int timeout)
{ {
...@@ -677,7 +675,7 @@ internal void OnHashSlotMoved(int hashSlot, EndPoint old, EndPoint @new) ...@@ -677,7 +675,7 @@ internal void OnHashSlotMoved(int hashSlot, EndPoint old, EndPoint @new)
var handler = HashSlotMoved; var handler = HashSlotMoved;
if (handler != null) if (handler != null)
{ {
unprocessableCompletionManager.CompleteSyncOrAsync( UnprocessableCompletionManager.CompleteSyncOrAsync(
new HashSlotMovedEventArgs(handler, this, hashSlot, old, @new) new HashSlotMovedEventArgs(handler, this, hashSlot, old, @new)
); );
} }
...@@ -687,7 +685,7 @@ internal void OnHashSlotMoved(int hashSlot, EndPoint old, EndPoint @new) ...@@ -687,7 +685,7 @@ internal void OnHashSlotMoved(int hashSlot, EndPoint old, EndPoint @new)
/// Compute the hash-slot of a specified key /// Compute the hash-slot of a specified key
/// </summary> /// </summary>
/// <param name="key">The key to get a hash slot ID for.</param> /// <param name="key">The key to get a hash slot ID for.</param>
public int HashSlot(RedisKey key) => serverSelectionStrategy.HashSlot(key); public int HashSlot(RedisKey key) => ServerSelectionStrategy.HashSlot(key);
internal ServerEndPoint AnyConnected(ServerType serverType, uint startOffset, RedisCommand command, CommandFlags flags) internal ServerEndPoint AnyConnected(ServerType serverType, uint startOffset, RedisCommand command, CommandFlags flags)
{ {
...@@ -910,11 +908,11 @@ private ConnectionMultiplexer(ConfigurationOptions configuration) ...@@ -910,11 +908,11 @@ private ConnectionMultiplexer(ConfigurationOptions configuration)
} }
PreserveAsyncOrder = configuration.PreserveAsyncOrder; PreserveAsyncOrder = configuration.PreserveAsyncOrder;
timeoutMilliseconds = configuration.SyncTimeout; TimeoutMilliseconds = configuration.SyncTimeout;
OnCreateReaderWriter(configuration); OnCreateReaderWriter(configuration);
unprocessableCompletionManager = new CompletionManager(this, "multiplexer"); UnprocessableCompletionManager = new CompletionManager(this, "multiplexer");
serverSelectionStrategy = new ServerSelectionStrategy(this); ServerSelectionStrategy = new ServerSelectionStrategy(this);
var configChannel = configuration.ConfigurationChannel; var configChannel = configuration.ConfigurationChannel;
if (!string.IsNullOrWhiteSpace(configChannel)) if (!string.IsNullOrWhiteSpace(configChannel))
...@@ -975,7 +973,7 @@ internal long LastHeartbeatSecondsAgo ...@@ -975,7 +973,7 @@ internal long LastHeartbeatSecondsAgo
internal static long LastGlobalHeartbeatSecondsAgo => unchecked(Environment.TickCount - VolatileWrapper.Read(ref lastGlobalHeartbeatTicks)) / 1000; internal static long LastGlobalHeartbeatSecondsAgo => unchecked(Environment.TickCount - VolatileWrapper.Read(ref lastGlobalHeartbeatTicks)) / 1000;
internal CompletionManager UnprocessableCompletionManager => unprocessableCompletionManager; internal CompletionManager UnprocessableCompletionManager { get; }
/// <summary> /// <summary>
/// Obtain a pub/sub subscriber connection to the specified server /// Obtain a pub/sub subscriber connection to the specified server
...@@ -1086,8 +1084,6 @@ internal static void TraceWithoutContext(bool condition, string message, [Caller ...@@ -1086,8 +1084,6 @@ internal static void TraceWithoutContext(bool condition, string message, [Caller
if (condition) OnTraceWithoutContext(message, category); if (condition) OnTraceWithoutContext(message, category);
} }
private readonly CompletionManager unprocessableCompletionManager;
/// <summary> /// <summary>
/// The number of operations that have been performed on all connections /// The number of operations that have been performed on all connections
/// </summary> /// </summary>
...@@ -1234,7 +1230,7 @@ internal async Task<bool> ReconfigureAsync(bool first, bool reconfigureAll, Text ...@@ -1234,7 +1230,7 @@ internal async Task<bool> ReconfigureAsync(bool first, bool reconfigureAll, Text
if (configuration.ResolveDns && configuration.HasDnsEndPoints()) if (configuration.ResolveDns && configuration.HasDnsEndPoints())
{ {
var dns = configuration.ResolveEndPointsAsync(this, log).ObserveErrors(); var dns = configuration.ResolveEndPointsAsync(this, log).ObserveErrors();
if ((await Task.WhenAny(dns, Task.Delay(timeoutMilliseconds)).ForAwait()) != dns) if ((await Task.WhenAny(dns, Task.Delay(TimeoutMilliseconds)).ForAwait()) != dns)
{ {
throw new TimeoutException("Timeout resolving endpoints"); throw new TimeoutException("Timeout resolving endpoints");
} }
...@@ -1443,15 +1439,15 @@ internal async Task<bool> ReconfigureAsync(bool first, bool reconfigureAll, Text ...@@ -1443,15 +1439,15 @@ internal async Task<bool> ReconfigureAsync(bool first, bool reconfigureAll, Text
// set the serverSelectionStrategy // set the serverSelectionStrategy
if (RawConfig.Proxy == Proxy.Twemproxy) if (RawConfig.Proxy == Proxy.Twemproxy)
{ {
serverSelectionStrategy.ServerType = ServerType.Twemproxy; ServerSelectionStrategy.ServerType = ServerType.Twemproxy;
} }
else if (standaloneCount == 0 && sentinelCount > 0) else if (standaloneCount == 0 && sentinelCount > 0)
{ {
serverSelectionStrategy.ServerType = ServerType.Sentinel; ServerSelectionStrategy.ServerType = ServerType.Sentinel;
} }
else else
{ {
serverSelectionStrategy.ServerType = ServerType.Standalone; ServerSelectionStrategy.ServerType = ServerType.Standalone;
} }
var preferred = await NominatePreferredMaster(log, servers, useTieBreakers, tieBreakers, masters).ObserveErrors().ForAwait(); var preferred = await NominatePreferredMaster(log, servers, useTieBreakers, tieBreakers, masters).ObserveErrors().ForAwait();
foreach (var master in masters) foreach (var master in masters)
...@@ -1468,10 +1464,10 @@ internal async Task<bool> ReconfigureAsync(bool first, bool reconfigureAll, Text ...@@ -1468,10 +1464,10 @@ internal async Task<bool> ReconfigureAsync(bool first, bool reconfigureAll, Text
} }
else else
{ {
serverSelectionStrategy.ServerType = ServerType.Cluster; ServerSelectionStrategy.ServerType = ServerType.Cluster;
long coveredSlots = serverSelectionStrategy.CountCoveredSlots(); long coveredSlots = ServerSelectionStrategy.CountCoveredSlots();
LogLocked(log, "Cluster: {0} of {1} slots covered", LogLocked(log, "Cluster: {0} of {1} slots covered",
coveredSlots, serverSelectionStrategy.TotalSlots); coveredSlots, ServerSelectionStrategy.TotalSlots);
} }
if (!first) if (!first)
{ {
...@@ -1724,15 +1720,13 @@ internal void UpdateClusterRange(ClusterConfiguration configuration) ...@@ -1724,15 +1720,13 @@ internal void UpdateClusterRange(ClusterConfiguration configuration)
foreach (var slot in node.Slots) foreach (var slot in node.Slots)
{ {
var server = GetServerEndPoint(node.EndPoint); var server = GetServerEndPoint(node.EndPoint);
if (server != null) serverSelectionStrategy.UpdateClusterRange(slot.From, slot.To, server); if (server != null) ServerSelectionStrategy.UpdateClusterRange(slot.From, slot.To, server);
} }
} }
} }
private Timer pulse; private Timer pulse;
private readonly ServerSelectionStrategy serverSelectionStrategy;
internal ServerEndPoint[] GetServerSnapshot() internal ServerEndPoint[] GetServerSnapshot()
{ {
var tmp = serverSnapshot; var tmp = serverSnapshot;
...@@ -1742,12 +1736,12 @@ internal ServerEndPoint[] GetServerSnapshot() ...@@ -1742,12 +1736,12 @@ internal ServerEndPoint[] GetServerSnapshot()
internal ServerEndPoint SelectServer(Message message) internal ServerEndPoint SelectServer(Message message)
{ {
if (message == null) return null; if (message == null) return null;
return serverSelectionStrategy.Select(message); return ServerSelectionStrategy.Select(message);
} }
internal ServerEndPoint SelectServer(int db, RedisCommand command, CommandFlags flags, RedisKey key) internal ServerEndPoint SelectServer(int db, RedisCommand command, CommandFlags flags, RedisKey key)
{ {
return serverSelectionStrategy.Select(db, command, key, flags); return ServerSelectionStrategy.Select(db, command, key, flags);
} }
private bool TryPushMessageToBridge<T>(Message message, ResultProcessor<T> processor, ResultBox<T> resultBox, ref ServerEndPoint server) private bool TryPushMessageToBridge<T>(Message message, ResultProcessor<T> processor, ResultBox<T> resultBox, ref ServerEndPoint server)
...@@ -1855,7 +1849,7 @@ public bool IsConnecting ...@@ -1855,7 +1849,7 @@ public bool IsConnecting
internal ConfigurationOptions RawConfig => configuration; internal ConfigurationOptions RawConfig => configuration;
internal ServerSelectionStrategy ServerSelectionStrategy => serverSelectionStrategy; internal ServerSelectionStrategy ServerSelectionStrategy { get; }
/// <summary> /// <summary>
/// Close all connections and release all resources associated with this object /// Close all connections and release all resources associated with this object
...@@ -2005,7 +1999,7 @@ internal T ExecuteSyncImpl<T>(Message message, ResultProcessor<T> processor, Ser ...@@ -2005,7 +1999,7 @@ internal T ExecuteSyncImpl<T>(Message message, ResultProcessor<T> processor, Ser
throw ExceptionFactory.NoConnectionAvailable(IncludeDetailInExceptions, IncludePerformanceCountersInExceptions, message.Command, message, server, GetServerSnapshot()); throw ExceptionFactory.NoConnectionAvailable(IncludeDetailInExceptions, IncludePerformanceCountersInExceptions, message.Command, message, server, GetServerSnapshot());
} }
if (Monitor.Wait(source, timeoutMilliseconds)) if (Monitor.Wait(source, TimeoutMilliseconds))
{ {
Trace("Timeley response to " + message); Trace("Timeley response to " + message);
} }
......
using System; using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.Net; using System.Net;
using System.Threading; using System.Threading;
...@@ -74,7 +74,7 @@ internal void OnMessage(RedisChannel subscription, RedisChannel channel, RedisVa ...@@ -74,7 +74,7 @@ internal void OnMessage(RedisChannel subscription, RedisChannel channel, RedisVa
completable = sub.ForInvoke(channel, payload); completable = sub.ForInvoke(channel, payload);
} }
} }
if (completable != null) unprocessableCompletionManager.CompleteSyncOrAsync(completable); if (completable != null) UnprocessableCompletionManager.CompleteSyncOrAsync(completable);
} }
internal Task RemoveAllSubscriptions(CommandFlags flags, object asyncState) internal Task RemoveAllSubscriptions(CommandFlags flags, object asyncState)
...@@ -322,4 +322,4 @@ public Task UnsubscribeAsync(RedisChannel channel, Action<RedisChannel, RedisVal ...@@ -322,4 +322,4 @@ public Task UnsubscribeAsync(RedisChannel channel, Action<RedisChannel, RedisVal
return multiplexer.RemoveSubscription(channel, handler, flags, asyncState); return multiplexer.RemoveSubscription(channel, handler, flags, asyncState);
} }
} }
} }
\ No newline at end of file
...@@ -27,10 +27,8 @@ internal sealed partial class ServerEndPoint : IDisposable ...@@ -27,10 +27,8 @@ internal sealed partial class ServerEndPoint : IDisposable
internal volatile ServerEndPoint[] Slaves = NoSlaves; internal volatile ServerEndPoint[] Slaves = NoSlaves;
private static readonly Regex nameSanitizer = new Regex("[^!-~]", RegexOptions.Compiled); private static readonly Regex nameSanitizer = new Regex("[^!-~]", RegexOptions.Compiled);
private static readonly ServerEndPoint[] NoSlaves = new ServerEndPoint[0]; private static readonly ServerEndPoint[] NoSlaves = new ServerEndPoint[0];
private readonly EndPoint endpoint;
private readonly Hashtable knownScripts = new Hashtable(StringComparer.Ordinal); private readonly Hashtable knownScripts = new Hashtable(StringComparer.Ordinal);
private readonly ConnectionMultiplexer multiplexer;
private int databases, writeEverySeconds; private int databases, writeEverySeconds;
private PhysicalBridge interactive, subscription; private PhysicalBridge interactive, subscription;
...@@ -48,8 +46,8 @@ internal void ResetNonConnected() ...@@ -48,8 +46,8 @@ internal void ResetNonConnected()
public ServerEndPoint(ConnectionMultiplexer multiplexer, EndPoint endpoint, TextWriter log) public ServerEndPoint(ConnectionMultiplexer multiplexer, EndPoint endpoint, TextWriter log)
{ {
this.multiplexer = multiplexer; Multiplexer = multiplexer;
this.endpoint = endpoint; EndPoint = endpoint;
var config = multiplexer.RawConfig; var config = multiplexer.RawConfig;
version = config.DefaultVersion; version = config.DefaultVersion;
slaveReadOnly = true; slaveReadOnly = true;
...@@ -71,7 +69,7 @@ public ServerEndPoint(ConnectionMultiplexer multiplexer, EndPoint endpoint, Text ...@@ -71,7 +69,7 @@ public ServerEndPoint(ConnectionMultiplexer multiplexer, EndPoint endpoint, Text
public int Databases { get { return databases; } set { SetConfig(ref databases, value); } } public int Databases { get { return databases; } set { SetConfig(ref databases, value); } }
public EndPoint EndPoint => endpoint; public EndPoint EndPoint { get; }
public bool HasDatabases => serverType == ServerType.Standalone; public bool HasDatabases => serverType == ServerType.Standalone;
...@@ -130,7 +128,8 @@ public long OperationCount ...@@ -130,7 +128,8 @@ public long OperationCount
public Version Version { get { return version; } set { SetConfig(ref version, value); } } public Version Version { get { return version; } set { SetConfig(ref version, value); } }
public int WriteEverySeconds { get { return writeEverySeconds; } set { SetConfig(ref writeEverySeconds, value); } } public int WriteEverySeconds { get { return writeEverySeconds; } set { SetConfig(ref writeEverySeconds, value); } }
internal ConnectionMultiplexer Multiplexer => multiplexer;
internal ConnectionMultiplexer Multiplexer { get; }
public void ClearUnselectable(UnselectableFlags flags) public void ClearUnselectable(UnselectableFlags flags)
{ {
...@@ -140,7 +139,7 @@ public void ClearUnselectable(UnselectableFlags flags) ...@@ -140,7 +139,7 @@ public void ClearUnselectable(UnselectableFlags flags)
unselectableReasons &= ~flags; unselectableReasons &= ~flags;
if (unselectableReasons != oldFlags) if (unselectableReasons != oldFlags)
{ {
multiplexer.Trace(unselectableReasons == 0 ? "Now usable" : ("Now unusable: " + flags), ToString()); Multiplexer.Trace(unselectableReasons == 0 ? "Now usable" : ("Now unusable: " + flags), ToString());
} }
} }
} }
...@@ -196,9 +195,9 @@ public void SetClusterConfiguration(ClusterConfiguration configuration) ...@@ -196,9 +195,9 @@ public void SetClusterConfiguration(ClusterConfiguration configuration)
if (configuration != null) if (configuration != null)
{ {
multiplexer.Trace("Updating cluster ranges..."); Multiplexer.Trace("Updating cluster ranges...");
multiplexer.UpdateClusterRange(configuration); Multiplexer.UpdateClusterRange(configuration);
multiplexer.Trace("Resolving genealogy..."); Multiplexer.Trace("Resolving genealogy...");
var thisNode = configuration.Nodes.FirstOrDefault(x => x.EndPoint.Equals(EndPoint)); var thisNode = configuration.Nodes.FirstOrDefault(x => x.EndPoint.Equals(EndPoint));
if (thisNode != null) if (thisNode != null)
{ {
...@@ -208,17 +207,17 @@ public void SetClusterConfiguration(ClusterConfiguration configuration) ...@@ -208,17 +207,17 @@ public void SetClusterConfiguration(ClusterConfiguration configuration)
{ {
if (node.NodeId == thisNode.ParentNodeId) if (node.NodeId == thisNode.ParentNodeId)
{ {
master = multiplexer.GetServerEndPoint(node.EndPoint); master = Multiplexer.GetServerEndPoint(node.EndPoint);
} }
else if (node.ParentNodeId == thisNode.NodeId) else if (node.ParentNodeId == thisNode.NodeId)
{ {
(slaves ?? (slaves = new List<ServerEndPoint>())).Add(multiplexer.GetServerEndPoint(node.EndPoint)); (slaves ?? (slaves = new List<ServerEndPoint>())).Add(Multiplexer.GetServerEndPoint(node.EndPoint));
} }
} }
Master = master; Master = master;
Slaves = slaves?.ToArray() ?? NoSlaves; Slaves = slaves?.ToArray() ?? NoSlaves;
} }
multiplexer.Trace("Cluster configured"); Multiplexer.Trace("Cluster configured");
} }
} }
...@@ -230,7 +229,7 @@ public void SetUnselectable(UnselectableFlags flags) ...@@ -230,7 +229,7 @@ public void SetUnselectable(UnselectableFlags flags)
unselectableReasons |= flags; unselectableReasons |= flags;
if (unselectableReasons != oldFlags) if (unselectableReasons != oldFlags)
{ {
multiplexer.Trace(unselectableReasons == 0 ? "Now usable" : ("Now unusable: " + flags), ToString()); Multiplexer.Trace(unselectableReasons == 0 ? "Now usable" : ("Now unusable: " + flags), ToString());
} }
} }
} }
...@@ -261,7 +260,7 @@ internal void AutoConfigure(PhysicalConnection connection) ...@@ -261,7 +260,7 @@ internal void AutoConfigure(PhysicalConnection connection)
return; return;
} }
var commandMap = multiplexer.CommandMap; var commandMap = Multiplexer.CommandMap;
const CommandFlags flags = CommandFlags.FireAndForget | CommandFlags.HighPriority | CommandFlags.NoRedirect; const CommandFlags flags = CommandFlags.FireAndForget | CommandFlags.HighPriority | CommandFlags.NoRedirect;
var features = GetFeatures(); var features = GetFeatures();
...@@ -269,7 +268,7 @@ internal void AutoConfigure(PhysicalConnection connection) ...@@ -269,7 +268,7 @@ internal void AutoConfigure(PhysicalConnection connection)
if (commandMap.IsAvailable(RedisCommand.CONFIG)) if (commandMap.IsAvailable(RedisCommand.CONFIG))
{ {
if (multiplexer.RawConfig.KeepAlive <= 0) if (Multiplexer.RawConfig.KeepAlive <= 0)
{ {
msg = Message.Create(-1, flags, RedisCommand.CONFIG, RedisLiterals.GET, RedisLiterals.timeout); msg = Message.Create(-1, flags, RedisCommand.CONFIG, RedisLiterals.GET, RedisLiterals.timeout);
msg.SetInternalCall(); msg.SetInternalCall();
...@@ -325,7 +324,7 @@ internal void AutoConfigure(PhysicalConnection connection) ...@@ -325,7 +324,7 @@ internal void AutoConfigure(PhysicalConnection connection)
internal Task Close() internal Task Close()
{ {
var tmp = interactive; var tmp = interactive;
if (tmp == null || !tmp.IsConnected || !multiplexer.CommandMap.IsAvailable(RedisCommand.QUIT)) if (tmp == null || !tmp.IsConnected || !Multiplexer.CommandMap.IsAvailable(RedisCommand.QUIT))
{ {
return CompletedTask<bool>.Default(null); return CompletedTask<bool>.Default(null);
} }
...@@ -362,7 +361,7 @@ internal string RunId ...@@ -362,7 +361,7 @@ internal string RunId
internal ServerCounters GetCounters() internal ServerCounters GetCounters()
{ {
var counters = new ServerCounters(endpoint); var counters = new ServerCounters(EndPoint);
interactive?.GetCounters(counters.Interactive); interactive?.GetCounters(counters.Interactive);
subscription?.GetCounters(counters.Subscription); subscription?.GetCounters(counters.Subscription);
return counters; return counters;
...@@ -416,12 +415,12 @@ internal Message GetTracerMessage(bool assertIdentity) ...@@ -416,12 +415,12 @@ internal Message GetTracerMessage(bool assertIdentity)
// we'll do the best with what we have available. // we'll do the best with what we have available.
// note that the muxer-ctor asserts that one of ECHO, PING, TIME of GET is available // note that the muxer-ctor asserts that one of ECHO, PING, TIME of GET is available
// see also: TracerProcessor // see also: TracerProcessor
var map = multiplexer.CommandMap; var map = Multiplexer.CommandMap;
Message msg; Message msg;
const CommandFlags flags = CommandFlags.NoRedirect | CommandFlags.FireAndForget; const CommandFlags flags = CommandFlags.NoRedirect | CommandFlags.FireAndForget;
if (assertIdentity && map.IsAvailable(RedisCommand.ECHO)) if (assertIdentity && map.IsAvailable(RedisCommand.ECHO))
{ {
msg = Message.Create(-1, flags, RedisCommand.ECHO, (RedisValue)multiplexer.UniqueId); msg = Message.Create(-1, flags, RedisCommand.ECHO, (RedisValue)Multiplexer.UniqueId);
} }
else if (map.IsAvailable(RedisCommand.PING)) else if (map.IsAvailable(RedisCommand.PING))
{ {
...@@ -434,12 +433,12 @@ internal Message GetTracerMessage(bool assertIdentity) ...@@ -434,12 +433,12 @@ internal Message GetTracerMessage(bool assertIdentity)
else if (!assertIdentity && map.IsAvailable(RedisCommand.ECHO)) else if (!assertIdentity && map.IsAvailable(RedisCommand.ECHO))
{ {
// we'll use echo as a PING substitute if it is all we have (in preference to EXISTS) // we'll use echo as a PING substitute if it is all we have (in preference to EXISTS)
msg = Message.Create(-1, flags, RedisCommand.ECHO, (RedisValue)multiplexer.UniqueId); msg = Message.Create(-1, flags, RedisCommand.ECHO, (RedisValue)Multiplexer.UniqueId);
} }
else else
{ {
map.AssertAvailable(RedisCommand.EXISTS); map.AssertAvailable(RedisCommand.EXISTS);
msg = Message.Create(0, flags, RedisCommand.EXISTS, (RedisValue)multiplexer.UniqueId); msg = Message.Create(0, flags, RedisCommand.EXISTS, (RedisValue)Multiplexer.UniqueId);
} }
msg.SetInternalCall(); msg.SetInternalCall();
return msg; return msg;
...@@ -472,9 +471,9 @@ internal void OnFullyEstablished(PhysicalConnection connection) ...@@ -472,9 +471,9 @@ internal void OnFullyEstablished(PhysicalConnection connection)
var bridge = connection.Bridge; var bridge = connection.Bridge;
if (bridge == subscription) if (bridge == subscription)
{ {
multiplexer.ResendSubscriptions(this); Multiplexer.ResendSubscriptions(this);
} }
multiplexer.OnConnectionRestored(endpoint, bridge.ConnectionType); Multiplexer.OnConnectionRestored(EndPoint, bridge.ConnectionType);
} }
catch (Exception ex) catch (Exception ex)
{ {
...@@ -498,7 +497,7 @@ internal bool CheckInfoReplication() ...@@ -498,7 +497,7 @@ internal bool CheckInfoReplication()
{ {
lastInfoReplicationCheckTicks = Environment.TickCount; lastInfoReplicationCheckTicks = Environment.TickCount;
PhysicalBridge bridge; PhysicalBridge bridge;
if (version >= RedisFeatures.v2_8_0 && multiplexer.CommandMap.IsAvailable(RedisCommand.INFO) if (version >= RedisFeatures.v2_8_0 && Multiplexer.CommandMap.IsAvailable(RedisCommand.INFO)
&& (bridge = GetBridge(ConnectionType.Interactive, false)) != null) && (bridge = GetBridge(ConnectionType.Interactive, false)) != null)
{ {
var msg = Message.Create(-1, CommandFlags.FireAndForget | CommandFlags.HighPriority | CommandFlags.NoRedirect, RedisCommand.INFO, RedisLiterals.replication); var msg = Message.Create(-1, CommandFlags.FireAndForget | CommandFlags.HighPriority | CommandFlags.NoRedirect, RedisCommand.INFO, RedisLiterals.replication);
...@@ -524,7 +523,7 @@ internal void OnHeartbeat() ...@@ -524,7 +523,7 @@ internal void OnHeartbeat()
} }
catch (Exception ex) catch (Exception ex)
{ {
multiplexer.OnInternalError(ex, EndPoint); Multiplexer.OnInternalError(ex, EndPoint);
} }
finally finally
{ {
...@@ -541,7 +540,7 @@ internal Task<T> QueueDirectAsync<T>(Message message, ResultProcessor<T> process ...@@ -541,7 +540,7 @@ internal Task<T> QueueDirectAsync<T>(Message message, ResultProcessor<T> process
if (bridge == null) bridge = GetBridge(message.Command); if (bridge == null) bridge = GetBridge(message.Command);
if (!bridge.TryEnqueue(message, isSlave)) if (!bridge.TryEnqueue(message, isSlave))
{ {
ConnectionMultiplexer.ThrowFailed(tcs, ExceptionFactory.NoConnectionAvailable(multiplexer.IncludeDetailInExceptions, multiplexer.IncludePerformanceCountersInExceptions, message.Command, message, this, multiplexer.GetServerSnapshot())); ConnectionMultiplexer.ThrowFailed(tcs, ExceptionFactory.NoConnectionAvailable(Multiplexer.IncludeDetailInExceptions, Multiplexer.IncludePerformanceCountersInExceptions, message.Command, message, this, Multiplexer.GetServerSnapshot()));
} }
return tcs.Task; return tcs.Task;
} }
...@@ -551,7 +550,7 @@ internal void QueueDirectFireAndForget<T>(Message message, ResultProcessor<T> pr ...@@ -551,7 +550,7 @@ internal void QueueDirectFireAndForget<T>(Message message, ResultProcessor<T> pr
if (message != null) if (message != null)
{ {
message.SetSource(processor, null); message.SetSource(processor, null);
multiplexer.Trace("Enqueue: " + message); Multiplexer.Trace("Enqueue: " + message);
(bridge ?? GetBridge(message.Command)).TryEnqueue(message, isSlave); (bridge ?? GetBridge(message.Command)).TryEnqueue(message, isSlave);
} }
} }
...@@ -571,7 +570,7 @@ internal Task<bool> SendTracer(TextWriter log = null) ...@@ -571,7 +570,7 @@ internal Task<bool> SendTracer(TextWriter log = null)
internal string Summary() internal string Summary()
{ {
var sb = new StringBuilder(Format.ToString(endpoint)) var sb = new StringBuilder(Format.ToString(EndPoint))
.Append(": ").Append(serverType).Append(" v").Append(version).Append(", ").Append(isSlave ? "slave" : "master"); .Append(": ").Append(serverType).Append(" v").Append(version).Append(", ").Append(isSlave ? "slave" : "master");
if (databases > 0) sb.Append("; ").Append(databases).Append(" databases"); if (databases > 0) sb.Append("; ").Append(databases).Append(" databases");
...@@ -609,12 +608,12 @@ internal void WriteDirectOrQueueFireAndForget<T>(PhysicalConnection connection, ...@@ -609,12 +608,12 @@ internal void WriteDirectOrQueueFireAndForget<T>(PhysicalConnection connection,
message.SetSource(processor, null); message.SetSource(processor, null);
if (connection == null) if (connection == null)
{ {
multiplexer.Trace("Enqueue: " + message); Multiplexer.Trace("Enqueue: " + message);
GetBridge(message.Command).TryEnqueue(message, isSlave); GetBridge(message.Command).TryEnqueue(message, isSlave);
} }
else else
{ {
multiplexer.Trace("Writing direct: " + message); Multiplexer.Trace("Writing direct: " + message);
connection.Bridge.WriteMessageDirect(connection, message); connection.Bridge.WriteMessageDirect(connection, message);
} }
} }
...@@ -622,7 +621,7 @@ internal void WriteDirectOrQueueFireAndForget<T>(PhysicalConnection connection, ...@@ -622,7 +621,7 @@ internal void WriteDirectOrQueueFireAndForget<T>(PhysicalConnection connection,
private PhysicalBridge CreateBridge(ConnectionType type, TextWriter log) private PhysicalBridge CreateBridge(ConnectionType type, TextWriter log)
{ {
multiplexer.Trace(type.ToString()); Multiplexer.Trace(type.ToString());
var bridge = new PhysicalBridge(this, type); var bridge = new PhysicalBridge(this, type);
bridge.TryConnect(log); bridge.TryConnect(log);
return bridge; return bridge;
...@@ -630,30 +629,30 @@ private PhysicalBridge CreateBridge(ConnectionType type, TextWriter log) ...@@ -630,30 +629,30 @@ private PhysicalBridge CreateBridge(ConnectionType type, TextWriter log)
private void Handshake(PhysicalConnection connection, TextWriter log) private void Handshake(PhysicalConnection connection, TextWriter log)
{ {
multiplexer.LogLocked(log, "Server handshake"); Multiplexer.LogLocked(log, "Server handshake");
if (connection == null) if (connection == null)
{ {
multiplexer.Trace("No connection!?"); Multiplexer.Trace("No connection!?");
return; return;
} }
Message msg; Message msg;
string password = multiplexer.RawConfig.Password; string password = Multiplexer.RawConfig.Password;
if (!string.IsNullOrWhiteSpace(password)) if (!string.IsNullOrWhiteSpace(password))
{ {
multiplexer.LogLocked(log, "Authenticating (password)"); Multiplexer.LogLocked(log, "Authenticating (password)");
msg = Message.Create(-1, CommandFlags.FireAndForget, RedisCommand.AUTH, (RedisValue)password); msg = Message.Create(-1, CommandFlags.FireAndForget, RedisCommand.AUTH, (RedisValue)password);
msg.SetInternalCall(); msg.SetInternalCall();
WriteDirectOrQueueFireAndForget(connection, msg, ResultProcessor.DemandOK); WriteDirectOrQueueFireAndForget(connection, msg, ResultProcessor.DemandOK);
} }
if (multiplexer.CommandMap.IsAvailable(RedisCommand.CLIENT)) if (Multiplexer.CommandMap.IsAvailable(RedisCommand.CLIENT))
{ {
string name = multiplexer.ClientName; string name = Multiplexer.ClientName;
if (!string.IsNullOrWhiteSpace(name)) if (!string.IsNullOrWhiteSpace(name))
{ {
name = nameSanitizer.Replace(name, ""); name = nameSanitizer.Replace(name, "");
if (!string.IsNullOrWhiteSpace(name)) if (!string.IsNullOrWhiteSpace(name))
{ {
multiplexer.LogLocked(log, "Setting client name: {0}", name); Multiplexer.LogLocked(log, "Setting client name: {0}", name);
msg = Message.Create(-1, CommandFlags.FireAndForget, RedisCommand.CLIENT, RedisLiterals.SETNAME, (RedisValue)name); msg = Message.Create(-1, CommandFlags.FireAndForget, RedisCommand.CLIENT, RedisLiterals.SETNAME, (RedisValue)name);
msg.SetInternalCall(); msg.SetInternalCall();
WriteDirectOrQueueFireAndForget(connection, msg, ResultProcessor.DemandOK); WriteDirectOrQueueFireAndForget(connection, msg, ResultProcessor.DemandOK);
...@@ -665,10 +664,10 @@ private void Handshake(PhysicalConnection connection, TextWriter log) ...@@ -665,10 +664,10 @@ private void Handshake(PhysicalConnection connection, TextWriter log)
if (connType == ConnectionType.Interactive) if (connType == ConnectionType.Interactive)
{ {
multiplexer.LogLocked(log, "Auto-configure..."); Multiplexer.LogLocked(log, "Auto-configure...");
AutoConfigure(connection); AutoConfigure(connection);
} }
multiplexer.LogLocked(log, "Sending critical tracer: {0}", connection.Bridge); Multiplexer.LogLocked(log, "Sending critical tracer: {0}", connection.Bridge);
var tracer = GetTracerMessage(true); var tracer = GetTracerMessage(true);
tracer = LoggingMessage.Create(log, tracer); tracer = LoggingMessage.Create(log, tracer);
WriteDirectOrQueueFireAndForget(connection, tracer, ResultProcessor.EstablishConnection); WriteDirectOrQueueFireAndForget(connection, tracer, ResultProcessor.EstablishConnection);
...@@ -677,14 +676,14 @@ private void Handshake(PhysicalConnection connection, TextWriter log) ...@@ -677,14 +676,14 @@ private void Handshake(PhysicalConnection connection, TextWriter log)
// we will be in subscriber mode: regular commands cannot be sent // we will be in subscriber mode: regular commands cannot be sent
if (connType == ConnectionType.Subscription) if (connType == ConnectionType.Subscription)
{ {
var configChannel = multiplexer.ConfigurationChangedChannel; var configChannel = Multiplexer.ConfigurationChangedChannel;
if (configChannel != null) if (configChannel != null)
{ {
msg = Message.Create(-1, CommandFlags.FireAndForget, RedisCommand.SUBSCRIBE, (RedisChannel)configChannel); msg = Message.Create(-1, CommandFlags.FireAndForget, RedisCommand.SUBSCRIBE, (RedisChannel)configChannel);
WriteDirectOrQueueFireAndForget(connection, msg, ResultProcessor.TrackSubscriptions); WriteDirectOrQueueFireAndForget(connection, msg, ResultProcessor.TrackSubscriptions);
} }
} }
multiplexer.LogLocked(log, "Flushing outbound buffer"); Multiplexer.LogLocked(log, "Flushing outbound buffer");
connection.Flush(); connection.Flush();
} }
...@@ -692,9 +691,9 @@ private void SetConfig<T>(ref T field, T value, [CallerMemberName] string caller ...@@ -692,9 +691,9 @@ private void SetConfig<T>(ref T field, T value, [CallerMemberName] string caller
{ {
if (!EqualityComparer<T>.Default.Equals(field, value)) if (!EqualityComparer<T>.Default.Equals(field, value))
{ {
multiplexer.Trace(caller + " changed from " + field + " to " + value, "Configuration"); Multiplexer.Trace(caller + " changed from " + field + " to " + value, "Configuration");
field = value; field = value;
multiplexer.ReconfigureIfNeeded(endpoint, false, caller); Multiplexer.ReconfigureIfNeeded(EndPoint, false, caller);
} }
} }
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment