Commit 98dbfd4b authored by Marc Gravell's avatar Marc Gravell

Merge pull request #311 from antoinecellerier/cluster-fixes

Add resilience to non ideal cluster topology in CLUSTER NODES response parsing
parents e5f6773f 069d5429
...@@ -172,6 +172,7 @@ public sealed class ClusterConfiguration ...@@ -172,6 +172,7 @@ public sealed class ClusterConfiguration
private readonly ServerSelectionStrategy serverSelectionStrategy; private readonly ServerSelectionStrategy serverSelectionStrategy;
internal ClusterConfiguration(ServerSelectionStrategy serverSelectionStrategy, string nodes, EndPoint origin) internal ClusterConfiguration(ServerSelectionStrategy serverSelectionStrategy, string nodes, EndPoint origin)
{ {
// Beware: Any exception thrown here will wreak silent havoc like inability to connect to cluster nodes or non returning calls
this.serverSelectionStrategy = serverSelectionStrategy; this.serverSelectionStrategy = serverSelectionStrategy;
this.origin = origin; this.origin = origin;
using (var reader = new StringReader(nodes)) using (var reader = new StringReader(nodes))
...@@ -180,12 +181,39 @@ internal ClusterConfiguration(ServerSelectionStrategy serverSelectionStrategy, s ...@@ -180,12 +181,39 @@ internal ClusterConfiguration(ServerSelectionStrategy serverSelectionStrategy, s
while ((line = reader.ReadLine()) != null) while ((line = reader.ReadLine()) != null)
{ {
if (string.IsNullOrWhiteSpace(line)) continue; if (string.IsNullOrWhiteSpace(line)) continue;
var node = new ClusterNode(this, line, origin); var node = new ClusterNode(this, line, origin);
// Be resilient to ":0 {master,slave},fail,noaddr" nodes
if (node.IsNoAddr)
continue;
if (nodeLookup.ContainsKey(node.EndPoint))
{
// Deal with conflicting node entries for the same endpoint
// This can happen in dynamic environments when a node goes down and a new one is created
// to replace it.
if (!node.IsConnected)
{
// The node we're trying to add is probably about to become stale. Ignore it.
continue;
}
else if (!nodeLookup[node.EndPoint].IsConnected)
{
// The node we registered previously is probably stale. Replace it with a known good node.
nodeLookup[node.EndPoint] = node;
}
else
{
// We have conflicting connected nodes. There's nothing much we can do other than
// wait for the cluster state to converge and refresh on the next pass.
// The same is true if we have multiple disconnected nodes.
}
}
else
{
nodeLookup.Add(node.EndPoint, node); nodeLookup.Add(node.EndPoint, node);
} }
} }
} }
}
/// <summary> /// <summary>
/// Gets all nodes contained in the configuration /// Gets all nodes contained in the configuration
...@@ -262,6 +290,10 @@ public sealed class ClusterNode : IEquatable<ClusterNode>, IComparable<ClusterN ...@@ -262,6 +290,10 @@ public sealed class ClusterNode : IEquatable<ClusterNode>, IComparable<ClusterN
private readonly bool isSlave; private readonly bool isSlave;
private readonly bool isNoAddr;
private readonly bool isConnected;
private readonly string nodeId, parentNodeId, raw; private readonly string nodeId, parentNodeId, raw;
private readonly IList<SlotRange> slots; private readonly IList<SlotRange> slots;
...@@ -275,6 +307,7 @@ public sealed class ClusterNode : IEquatable<ClusterNode>, IComparable<ClusterN ...@@ -275,6 +307,7 @@ public sealed class ClusterNode : IEquatable<ClusterNode>, IComparable<ClusterN
internal ClusterNode() { } internal ClusterNode() { }
internal ClusterNode(ClusterConfiguration configuration, string raw, EndPoint origin) internal ClusterNode(ClusterConfiguration configuration, string raw, EndPoint origin)
{ {
// http://redis.io/commands/cluster-nodes
this.configuration = configuration; this.configuration = configuration;
this.raw = raw; this.raw = raw;
var parts = raw.Split(StringSplits.Space); var parts = raw.Split(StringSplits.Space);
...@@ -285,6 +318,7 @@ internal ClusterNode(ClusterConfiguration configuration, string raw, EndPoint or ...@@ -285,6 +318,7 @@ internal ClusterNode(ClusterConfiguration configuration, string raw, EndPoint or
nodeId = parts[0]; nodeId = parts[0];
isSlave = flags.Contains("slave"); isSlave = flags.Contains("slave");
isNoAddr = flags.Contains("noaddr");
parentNodeId = string.IsNullOrWhiteSpace(parts[3]) ? null : parts[3]; parentNodeId = string.IsNullOrWhiteSpace(parts[3]) ? null : parts[3];
List<SlotRange> slots = null; List<SlotRange> slots = null;
...@@ -299,6 +333,7 @@ internal ClusterNode(ClusterConfiguration configuration, string raw, EndPoint or ...@@ -299,6 +333,7 @@ internal ClusterNode(ClusterConfiguration configuration, string raw, EndPoint or
} }
} }
this.slots = slots == null ? NoSlots : slots.AsReadOnly(); this.slots = slots == null ? NoSlots : slots.AsReadOnly();
this.isConnected = parts[7] == "connected"; // Can be "connected" or "disconnected"
} }
/// <summary> /// <summary>
/// Gets all child nodes of the current node /// Gets all child nodes of the current node
...@@ -333,6 +368,16 @@ public IList<ClusterNode> Children ...@@ -333,6 +368,16 @@ public IList<ClusterNode> Children
/// </summary> /// </summary>
public bool IsSlave { get { return isSlave; } } public bool IsSlave { get { return isSlave; } }
/// <summary>
/// Gets whether this node is flagged as noaddr
/// </summary>
public bool IsNoAddr { get { return isNoAddr; } }
/// <summary>
/// Gets the node's connection status
/// </summary>
public bool IsConnected { get { return isConnected; } }
/// <summary> /// <summary>
/// Gets the unique node-id of the current node /// Gets the unique node-id of the current node
/// </summary> /// </summary>
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment