Commit cf91c206 authored by Jon Cole's avatar Jon Cole
parents 0cd9148c 2bc2fb8f
......@@ -243,7 +243,7 @@ private static RedisConnection GetOldStyleConnection(string host, int port, bool
protected static TimeSpan RunConcurrent(Action work, int threads, int timeout = 10000, [CallerMemberName] string caller = null)
{
if (work == null) throw new ArgumentNullException("work");
if (threads < 1) throw new ArgumentOutOfRangeException("theads");
if (threads < 1) throw new ArgumentOutOfRangeException("threads");
if(string.IsNullOrWhiteSpace(caller)) caller = Me();
Stopwatch watch = null;
ManualResetEvent allDone = new ManualResetEvent(false);
......
......@@ -172,6 +172,7 @@ public sealed class ClusterConfiguration
private readonly ServerSelectionStrategy serverSelectionStrategy;
internal ClusterConfiguration(ServerSelectionStrategy serverSelectionStrategy, string nodes, EndPoint origin)
{
// Beware: Any exception thrown here will wreak silent havoc like inability to connect to cluster nodes or non returning calls
this.serverSelectionStrategy = serverSelectionStrategy;
this.origin = origin;
using (var reader = new StringReader(nodes))
......@@ -180,9 +181,36 @@ internal ClusterConfiguration(ServerSelectionStrategy serverSelectionStrategy, s
while ((line = reader.ReadLine()) != null)
{
if (string.IsNullOrWhiteSpace(line)) continue;
var node = new ClusterNode(this, line, origin);
nodeLookup.Add(node.EndPoint, node);
// Be resilient to ":0 {master,slave},fail,noaddr" nodes
if (node.IsNoAddr)
continue;
if (nodeLookup.ContainsKey(node.EndPoint))
{
// Deal with conflicting node entries for the same endpoint
// This can happen in dynamic environments when a node goes down and a new one is created
// to replace it.
if (!node.IsConnected)
{
// The node we're trying to add is probably about to become stale. Ignore it.
continue;
}
else if (!nodeLookup[node.EndPoint].IsConnected)
{
// The node we registered previously is probably stale. Replace it with a known good node.
nodeLookup[node.EndPoint] = node;
}
else
{
// We have conflicting connected nodes. There's nothing much we can do other than
// wait for the cluster state to converge and refresh on the next pass.
// The same is true if we have multiple disconnected nodes.
}
}
else
{
nodeLookup.Add(node.EndPoint, node);
}
}
}
}
......@@ -262,6 +290,10 @@ public sealed class ClusterNode : IEquatable<ClusterNode>, IComparable<ClusterN
private readonly bool isSlave;
private readonly bool isNoAddr;
private readonly bool isConnected;
private readonly string nodeId, parentNodeId, raw;
private readonly IList<SlotRange> slots;
......@@ -275,22 +307,18 @@ public sealed class ClusterNode : IEquatable<ClusterNode>, IComparable<ClusterN
internal ClusterNode() { }
internal ClusterNode(ClusterConfiguration configuration, string raw, EndPoint origin)
{
// http://redis.io/commands/cluster-nodes
this.configuration = configuration;
this.raw = raw;
var parts = raw.Split(StringSplits.Space);
var flags = parts[2].Split(StringSplits.Comma);
if (flags.Contains("myself"))
{
endpoint = origin;
}
else
{
endpoint = Format.TryParseEndPoint(parts[1]);
}
endpoint = Format.TryParseEndPoint(parts[1]);
nodeId = parts[0];
isSlave = flags.Contains("slave");
isNoAddr = flags.Contains("noaddr");
parentNodeId = string.IsNullOrWhiteSpace(parts[3]) ? null : parts[3];
List<SlotRange> slots = null;
......@@ -305,6 +333,7 @@ internal ClusterNode(ClusterConfiguration configuration, string raw, EndPoint or
}
}
this.slots = slots == null ? NoSlots : slots.AsReadOnly();
this.isConnected = parts[7] == "connected"; // Can be "connected" or "disconnected"
}
/// <summary>
/// Gets all child nodes of the current node
......@@ -339,6 +368,16 @@ public IList<ClusterNode> Children
/// </summary>
public bool IsSlave { get { return isSlave; } }
/// <summary>
/// Gets whether this node is flagged as noaddr
/// </summary>
public bool IsNoAddr { get { return isNoAddr; } }
/// <summary>
/// Gets the node's connection status
/// </summary>
public bool IsConnected { get { return isConnected; } }
/// <summary>
/// Gets the unique node-id of the current node
/// </summary>
......
......@@ -33,7 +33,7 @@ private static readonly CommandMap
RedisCommand.SCRIPT,
RedisCommand.AUTH, RedisCommand.ECHO, RedisCommand.PING, RedisCommand.QUIT, RedisCommand.SELECT,
RedisCommand.ECHO, RedisCommand.PING, RedisCommand.QUIT, RedisCommand.SELECT,
RedisCommand.BGREWRITEAOF, RedisCommand.BGSAVE, RedisCommand.CLIENT, RedisCommand.CLUSTER, RedisCommand.CONFIG, RedisCommand.DBSIZE,
RedisCommand.DEBUG, RedisCommand.FLUSHALL, RedisCommand.FLUSHDB, RedisCommand.INFO, RedisCommand.LASTSAVE, RedisCommand.MONITOR, RedisCommand.SAVE,
......
......@@ -1203,114 +1203,155 @@ internal async Task<bool> ReconfigureAsync(bool first, bool reconfigureAll, Text
}
const CommandFlags flags = CommandFlags.NoRedirect | CommandFlags.HighPriority;
var available = new Task<bool>[endpoints.Count];
var servers = new ServerEndPoint[available.Length];
List<ServerEndPoint> masters = new List<ServerEndPoint>(endpoints.Count);
bool useTieBreakers = !string.IsNullOrWhiteSpace(configuration.TieBreaker);
var tieBreakers = useTieBreakers ? new Task<string>[endpoints.Count] : null;
RedisKey tieBreakerKey = useTieBreakers ? (RedisKey)configuration.TieBreaker : default(RedisKey);
for (int i = 0; i < available.Length; i++)
ServerEndPoint[] servers = null;
Task<string>[] tieBreakers = null;
bool encounteredConnectedClusterServer = false;
Stopwatch watch = null;
int iterCount = first ? 2 : 1;
// this is fix for https://github.com/StackExchange/StackExchange.Redis/issues/300
// auto discoverability of cluster nodes is made synchronous.
// we try to connect to endpoints specified inside the user provided configuration
// and when we encounter one such endpoint to which we are able to successfully connect,
// we get the list of cluster nodes from this endpoint and try to proactively connect
// to these nodes instead of relying on auto configure
for (int iter = 0; iter < iterCount; ++iter)
{
Trace("Testing: " + Format.ToString(endpoints[i]));
var server = GetServerEndPoint(endpoints[i]);
//server.ReportNextFailure();
servers[i] = server;
if (reconfigureAll && server.IsConnected)
if (endpoints == null) break;
var available = new Task<bool>[endpoints.Count];
tieBreakers = useTieBreakers ? new Task<string>[endpoints.Count] : null;
servers = new ServerEndPoint[available.Length];
RedisKey tieBreakerKey = useTieBreakers ? (RedisKey)configuration.TieBreaker : default(RedisKey);
for (int i = 0; i < available.Length; i++)
{
LogLocked(log, "Refreshing {0}...", Format.ToString(server.EndPoint));
// note that these will be processed synchronously *BEFORE* the tracer is processed,
// so we know that the configuration will be up to date if we see the tracer
server.AutoConfigure(null);
}
available[i] = server.SendTracer(log);
if (useTieBreakers)
{
LogLocked(log, "Requesting tie-break from {0} > {1}...", Format.ToString(server.EndPoint), configuration.TieBreaker);
Message msg = Message.Create(0, flags, RedisCommand.GET, tieBreakerKey);
msg.SetInternalCall();
msg = LoggingMessage.Create(log, msg);
tieBreakers[i] = server.QueueDirectAsync(msg, ResultProcessor.String);
Trace("Testing: " + Format.ToString(endpoints[i]));
var server = GetServerEndPoint(endpoints[i]);
//server.ReportNextFailure();
servers[i] = server;
if (reconfigureAll && server.IsConnected)
{
LogLocked(log, "Refreshing {0}...", Format.ToString(server.EndPoint));
// note that these will be processed synchronously *BEFORE* the tracer is processed,
// so we know that the configuration will be up to date if we see the tracer
server.AutoConfigure(null);
}
available[i] = server.SendTracer(log);
if (useTieBreakers)
{
LogLocked(log, "Requesting tie-break from {0} > {1}...", Format.ToString(server.EndPoint), configuration.TieBreaker);
Message msg = Message.Create(0, flags, RedisCommand.GET, tieBreakerKey);
msg.SetInternalCall();
msg = LoggingMessage.Create(log, msg);
tieBreakers[i] = server.QueueDirectAsync(msg, ResultProcessor.String);
}
}
}
LogLocked(log, "Allowing endpoints {0} to respond...", TimeSpan.FromMilliseconds(configuration.ConnectTimeout));
Trace("Allowing endpoints " + TimeSpan.FromMilliseconds(configuration.ConnectTimeout) + " to respond...");
await WaitAllIgnoreErrorsAsync(available, configuration.ConnectTimeout, log).ForAwait();
List<ServerEndPoint> masters = new List<ServerEndPoint>(available.Length);
watch = watch ?? Stopwatch.StartNew();
var remaining = configuration.ConnectTimeout - checked((int)watch.ElapsedMilliseconds);
LogLocked(log, "Allowing endpoints {0} to respond...", TimeSpan.FromMilliseconds(remaining));
Trace("Allowing endpoints " + TimeSpan.FromMilliseconds(remaining) + " to respond...");
await WaitAllIgnoreErrorsAsync(available, remaining, log).ForAwait();
for (int i = 0; i < available.Length; i++)
{
var task = available[i];
Trace(Format.ToString(endpoints[i]) + ": " + task.Status);
if (task.IsFaulted)
EndPointCollection updatedClusterEndpointCollection = null;
for (int i = 0; i < available.Length; i++)
{
servers[i].SetUnselectable(UnselectableFlags.DidNotRespond);
var aex = task.Exception;
foreach (var ex in aex.InnerExceptions)
var task = available[i];
Trace(Format.ToString(endpoints[i]) + ": " + task.Status);
if (task.IsFaulted)
{
servers[i].SetUnselectable(UnselectableFlags.DidNotRespond);
var aex = task.Exception;
foreach (var ex in aex.InnerExceptions)
{
LogLocked(log, "{0} faulted: {1}", Format.ToString(endpoints[i]), ex.Message);
failureMessage = ex.Message;
}
}
else if (task.IsCanceled)
{
LogLocked(log, "{0} faulted: {1}", Format.ToString(endpoints[i]), ex.Message);
failureMessage = ex.Message;
servers[i].SetUnselectable(UnselectableFlags.DidNotRespond);
LogLocked(log, "{0} was canceled", Format.ToString(endpoints[i]));
}
}
else if (task.IsCanceled)
{
servers[i].SetUnselectable(UnselectableFlags.DidNotRespond);
LogLocked(log, "{0} was canceled", Format.ToString(endpoints[i]));
}
else if (task.IsCompleted)
{
var server = servers[i];
if (task.Result)
else if (task.IsCompleted)
{
servers[i].ClearUnselectable(UnselectableFlags.DidNotRespond);
LogLocked(log, "{0} returned with success", Format.ToString(endpoints[i]));
// count the server types
switch (server.ServerType)
var server = servers[i];
if (task.Result)
{
case ServerType.Twemproxy:
case ServerType.Standalone:
standaloneCount++;
break;
case ServerType.Sentinel:
sentinelCount++;
break;
case ServerType.Cluster:
clusterCount++;
break;
}
servers[i].ClearUnselectable(UnselectableFlags.DidNotRespond);
LogLocked(log, "{0} returned with success", Format.ToString(endpoints[i]));
// count the server types
switch (server.ServerType)
{
case ServerType.Twemproxy:
case ServerType.Standalone:
standaloneCount++;
break;
case ServerType.Sentinel:
sentinelCount++;
break;
case ServerType.Cluster:
clusterCount++;
break;
}
// set the server UnselectableFlags and update masters list
switch (server.ServerType)
if (clusterCount > 0 && !encounteredConnectedClusterServer)
{
// we have encountered a connected server with clustertype for the first time.
// so we will get list of other nodes from this server using "CLUSTER NODES" command
// and try to connect to these other nodes in the next iteration
encounteredConnectedClusterServer = true;
updatedClusterEndpointCollection = GetEndpointsFromClusterNodes(server, log);
}
// set the server UnselectableFlags and update masters list
switch (server.ServerType)
{
case ServerType.Twemproxy:
case ServerType.Sentinel:
case ServerType.Standalone:
case ServerType.Cluster:
servers[i].ClearUnselectable(UnselectableFlags.ServerType);
if (server.IsSlave)
{
servers[i].ClearUnselectable(UnselectableFlags.RedundantMaster);
}
else
{
masters.Add(server);
}
break;
default:
servers[i].SetUnselectable(UnselectableFlags.ServerType);
break;
}
}
else
{
case ServerType.Twemproxy:
case ServerType.Sentinel:
case ServerType.Standalone:
case ServerType.Cluster:
servers[i].ClearUnselectable(UnselectableFlags.ServerType);
if (server.IsSlave)
{
servers[i].ClearUnselectable(UnselectableFlags.RedundantMaster);
}
else
{
masters.Add(server);
}
break;
default:
servers[i].SetUnselectable(UnselectableFlags.ServerType);
break;
servers[i].SetUnselectable(UnselectableFlags.DidNotRespond);
LogLocked(log, "{0} returned, but incorrectly", Format.ToString(endpoints[i]));
}
}
else
{
servers[i].SetUnselectable(UnselectableFlags.DidNotRespond);
LogLocked(log, "{0} returned, but incorrectly", Format.ToString(endpoints[i]));
LogLocked(log, "{0} did not respond", Format.ToString(endpoints[i]));
}
}
if (encounteredConnectedClusterServer)
{
endpoints = updatedClusterEndpointCollection;
}
else
{
servers[i].SetUnselectable(UnselectableFlags.DidNotRespond);
LogLocked(log, "{0} did not respond", Format.ToString(endpoints[i]));
break; // we do not want to repeat the second iteration
}
}
......@@ -1419,6 +1460,23 @@ internal async Task<bool> ReconfigureAsync(bool first, bool reconfigureAll, Text
}
}
private EndPointCollection GetEndpointsFromClusterNodes(ServerEndPoint server, TextWriter log)
{
var message = Message.Create(-1, CommandFlags.None, RedisCommand.CLUSTER, RedisLiterals.NODES);
ClusterConfiguration clusterConfig = null;
try
{
clusterConfig = this.ExecuteSyncImpl(message, ResultProcessor.ClusterNodes, server);
return new EndPointCollection(clusterConfig.Nodes.Select(node => node.EndPoint).ToList());
}
catch (Exception ex)
{
LogLocked(log, "Encountered error while updating cluster config: " + ex.Message);
return null;
}
}
private void ResetAllNonConnected()
{
var snapshot = serverSnapshot;
......
using System;
using System.Collections.Generic;
using System.Collections.ObjectModel;
using System.Net;
......@@ -9,6 +10,14 @@ namespace StackExchange.Redis
/// </summary>
public sealed class EndPointCollection : Collection<EndPoint>
{
public EndPointCollection() : base()
{
}
public EndPointCollection(IList<EndPoint> endpoints) : base(endpoints)
{
}
/// <summary>
/// Format an endpoint
/// </summary>
......
......@@ -133,7 +133,7 @@ internal static bool TryGetHostPort(EndPoint endpoint, out string host, out int
internal static bool TryParseDouble(string s, out double value)
{
if(s == null || s.Length == 0)
if(string.IsNullOrEmpty(s))
{
value = 0;
return false;
......
......@@ -484,7 +484,7 @@ sealed class KeyMigrateCommandMessage : Message.CommandKeyBase // MIGRATE is aty
public KeyMigrateCommandMessage(int db, RedisKey key, EndPoint toServer, int toDatabase, int timeoutMilliseconds, MigrateOptions migrateOptions, CommandFlags flags)
: base(db, flags, RedisCommand.MIGRATE, key)
{
if (toServer == null) throw new ArgumentNullException("server");
if (toServer == null) throw new ArgumentNullException("toServer");
string toHost;
int toPort;
if (!Format.TryGetHostPort(toServer, out toHost, out toPort)) throw new ArgumentException("toServer");
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment