Commit 87d4dc84 authored by hrishi18pathak

Make auto-discovery of cluster nodes synchronous; actually fix issue #300

parent 6b317395
...@@ -1203,114 +1203,153 @@ internal async Task<bool> ReconfigureAsync(bool first, bool reconfigureAll, Text ...@@ -1203,114 +1203,153 @@ internal async Task<bool> ReconfigureAsync(bool first, bool reconfigureAll, Text
} }
const CommandFlags flags = CommandFlags.NoRedirect | CommandFlags.HighPriority; const CommandFlags flags = CommandFlags.NoRedirect | CommandFlags.HighPriority;
var available = new Task<bool>[endpoints.Count]; List<ServerEndPoint> masters = new List<ServerEndPoint>(endpoints.Count);
var servers = new ServerEndPoint[available.Length];
bool useTieBreakers = !string.IsNullOrWhiteSpace(configuration.TieBreaker); bool useTieBreakers = !string.IsNullOrWhiteSpace(configuration.TieBreaker);
var tieBreakers = useTieBreakers ? new Task<string>[endpoints.Count] : null;
RedisKey tieBreakerKey = useTieBreakers ? (RedisKey)configuration.TieBreaker : default(RedisKey); ServerEndPoint[] servers = null;
for (int i = 0; i < available.Length; i++) Task<string>[] tieBreakers = null;
bool encounteredConnectedServer = false;
Stopwatch watch = null;
int iterCount = first ? 2 : 1;
// This is the fix for https://github.com/StackExchange/StackExchange.Redis/issues/300:
// auto-discovery of cluster nodes is made synchronous.
// We try to connect to the endpoints specified in the user-provided configuration,
// and as soon as we encounter one endpoint to which we can successfully connect,
// we get the list of cluster nodes from that endpoint and proactively connect
// to those nodes instead of relying on auto-configure.
for (int iter = 0; iter < iterCount; ++iter)
{ {
Trace("Testing: " + Format.ToString(endpoints[i])); if (endpoints == null) break;
var server = GetServerEndPoint(endpoints[i]);
//server.ReportNextFailure(); var available = new Task<bool>[endpoints.Count];
servers[i] = server; tieBreakers = useTieBreakers ? new Task<string>[endpoints.Count] : null;
if (reconfigureAll && server.IsConnected) servers = new ServerEndPoint[available.Length];
RedisKey tieBreakerKey = useTieBreakers ? (RedisKey)configuration.TieBreaker : default(RedisKey);
for (int i = 0; i < available.Length; i++)
{ {
LogLocked(log, "Refreshing {0}...", Format.ToString(server.EndPoint)); Trace("Testing: " + Format.ToString(endpoints[i]));
// note that these will be processed synchronously *BEFORE* the tracer is processed, var server = GetServerEndPoint(endpoints[i]);
// so we know that the configuration will be up to date if we see the tracer //server.ReportNextFailure();
server.AutoConfigure(null); servers[i] = server;
} if (reconfigureAll && server.IsConnected)
available[i] = server.SendTracer(log); {
if (useTieBreakers) LogLocked(log, "Refreshing {0}...", Format.ToString(server.EndPoint));
{ // note that these will be processed synchronously *BEFORE* the tracer is processed,
LogLocked(log, "Requesting tie-break from {0} > {1}...", Format.ToString(server.EndPoint), configuration.TieBreaker); // so we know that the configuration will be up to date if we see the tracer
Message msg = Message.Create(0, flags, RedisCommand.GET, tieBreakerKey); server.AutoConfigure(null);
msg.SetInternalCall(); }
msg = LoggingMessage.Create(log, msg); available[i] = server.SendTracer(log);
tieBreakers[i] = server.QueueDirectAsync(msg, ResultProcessor.String); if (useTieBreakers)
{
LogLocked(log, "Requesting tie-break from {0} > {1}...", Format.ToString(server.EndPoint), configuration.TieBreaker);
Message msg = Message.Create(0, flags, RedisCommand.GET, tieBreakerKey);
msg.SetInternalCall();
msg = LoggingMessage.Create(log, msg);
tieBreakers[i] = server.QueueDirectAsync(msg, ResultProcessor.String);
}
} }
}
LogLocked(log, "Allowing endpoints {0} to respond...", TimeSpan.FromMilliseconds(configuration.ConnectTimeout)); watch = watch ?? Stopwatch.StartNew();
Trace("Allowing endpoints " + TimeSpan.FromMilliseconds(configuration.ConnectTimeout) + " to respond..."); var remaining = configuration.ConnectTimeout - checked((int)watch.ElapsedMilliseconds);
await WaitAllIgnoreErrorsAsync(available, configuration.ConnectTimeout, log).ForAwait(); LogLocked(log, "Allowing endpoints {0} to respond...", TimeSpan.FromMilliseconds(remaining));
List<ServerEndPoint> masters = new List<ServerEndPoint>(available.Length); Trace("Allowing endpoints " + TimeSpan.FromMilliseconds(remaining) + " to respond...");
await WaitAllIgnoreErrorsAsync(available, remaining, log).ForAwait();
for (int i = 0; i < available.Length; i++) EndPointCollection updatedEndpointCollection = null;
{ for (int i = 0; i < available.Length; i++)
var task = available[i];
Trace(Format.ToString(endpoints[i]) + ": " + task.Status);
if (task.IsFaulted)
{ {
servers[i].SetUnselectable(UnselectableFlags.DidNotRespond); var task = available[i];
var aex = task.Exception; Trace(Format.ToString(endpoints[i]) + ": " + task.Status);
foreach (var ex in aex.InnerExceptions) if (task.IsFaulted)
{ {
LogLocked(log, "{0} faulted: {1}", Format.ToString(endpoints[i]), ex.Message); servers[i].SetUnselectable(UnselectableFlags.DidNotRespond);
failureMessage = ex.Message; var aex = task.Exception;
foreach (var ex in aex.InnerExceptions)
{
LogLocked(log, "{0} faulted: {1}", Format.ToString(endpoints[i]), ex.Message);
failureMessage = ex.Message;
}
} }
} else if (task.IsCanceled)
else if (task.IsCanceled) {
{ servers[i].SetUnselectable(UnselectableFlags.DidNotRespond);
servers[i].SetUnselectable(UnselectableFlags.DidNotRespond); LogLocked(log, "{0} was canceled", Format.ToString(endpoints[i]));
LogLocked(log, "{0} was canceled", Format.ToString(endpoints[i])); }
} else if (task.IsCompleted)
else if (task.IsCompleted)
{
var server = servers[i];
if (task.Result)
{ {
servers[i].ClearUnselectable(UnselectableFlags.DidNotRespond); var server = servers[i];
LogLocked(log, "{0} returned with success", Format.ToString(endpoints[i])); if (task.Result)
UpdateClusterConfigIfNeeded(server, log);
// count the server types
switch (server.ServerType)
{ {
case ServerType.Twemproxy: servers[i].ClearUnselectable(UnselectableFlags.DidNotRespond);
case ServerType.Standalone: LogLocked(log, "{0} returned with success", Format.ToString(endpoints[i]));
standaloneCount++; if (!encounteredConnectedServer)
break; {
case ServerType.Sentinel: // we have encountered a connected server for the first time.
sentinelCount++; // so we will get list of other nodes from this server using "CLUSTER NODES" command
break; // and try to connect to these other nodes in the next iteration
case ServerType.Cluster: encounteredConnectedServer = true;
clusterCount++; updatedEndpointCollection = GetEndpointsFromClusterNodes(server, log);
break; }
} // count the server types
switch (server.ServerType)
{
case ServerType.Twemproxy:
case ServerType.Standalone:
standaloneCount++;
break;
case ServerType.Sentinel:
sentinelCount++;
break;
case ServerType.Cluster:
clusterCount++;
break;
}
// set the server UnselectableFlags and update masters list // set the server UnselectableFlags and update masters list
switch (server.ServerType) switch (server.ServerType)
{
case ServerType.Twemproxy:
case ServerType.Sentinel:
case ServerType.Standalone:
case ServerType.Cluster:
servers[i].ClearUnselectable(UnselectableFlags.ServerType);
if (server.IsSlave)
{
servers[i].ClearUnselectable(UnselectableFlags.RedundantMaster);
}
else
{
masters.Add(server);
}
break;
default:
servers[i].SetUnselectable(UnselectableFlags.ServerType);
break;
}
}
else
{ {
case ServerType.Twemproxy: servers[i].SetUnselectable(UnselectableFlags.DidNotRespond);
case ServerType.Sentinel: LogLocked(log, "{0} returned, but incorrectly", Format.ToString(endpoints[i]));
case ServerType.Standalone:
case ServerType.Cluster:
servers[i].ClearUnselectable(UnselectableFlags.ServerType);
if (server.IsSlave)
{
servers[i].ClearUnselectable(UnselectableFlags.RedundantMaster);
}
else
{
masters.Add(server);
}
break;
default:
servers[i].SetUnselectable(UnselectableFlags.ServerType);
break;
} }
} }
else else
{ {
servers[i].SetUnselectable(UnselectableFlags.DidNotRespond); servers[i].SetUnselectable(UnselectableFlags.DidNotRespond);
LogLocked(log, "{0} returned, but incorrectly", Format.ToString(endpoints[i])); LogLocked(log, "{0} did not respond", Format.ToString(endpoints[i]));
} }
} }
if (encounteredConnectedServer)
{
endpoints = updatedEndpointCollection;
}
else else
{ {
servers[i].SetUnselectable(UnselectableFlags.DidNotRespond); break; // will be retried by the outer do while loop
LogLocked(log, "{0} did not respond", Format.ToString(endpoints[i]));
} }
} }
...@@ -1419,33 +1458,23 @@ internal async Task<bool> ReconfigureAsync(bool first, bool reconfigureAll, Text ...@@ -1419,33 +1458,23 @@ internal async Task<bool> ReconfigureAsync(bool first, bool reconfigureAll, Text
} }
} }
private void UpdateClusterConfigIfNeeded(ServerEndPoint server, TextWriter log) private EndPointCollection GetEndpointsFromClusterNodes(ServerEndPoint server, TextWriter log)
{ {
var message = Message.Create(-1, CommandFlags.None, RedisCommand.CLUSTER, RedisLiterals.NODES); var message = Message.Create(-1, CommandFlags.None, RedisCommand.CLUSTER, RedisLiterals.NODES);
ClusterConfiguration clusterConfig = null; ClusterConfiguration clusterConfig = null;
try try
{ {
clusterConfig = this.ExecuteSyncImpl(message, ResultProcessor.ClusterNodes, server); clusterConfig = this.ExecuteSyncImpl(message, ResultProcessor.ClusterNodes, server);
} return new EndPointCollection(clusterConfig.Nodes.Select(node => node.EndPoint).ToList());
catch (Exception ex)
{
if (ex.Message.Contains("ERR This instance has cluster support disabled"))
{
LogLocked(log, "Cluster support disabled. Continuing without updating cluster config...");
return;
}
LogLocked(log, "Encountered error while updating cluster config: " + ex.Message);
} }
catch (Exception ex)
if (clusterConfig != null)
{ {
this.UpdateClusterRange(clusterConfig); LogLocked(log, "Encountered error while updating cluster config: " + ex.Message);
LogLocked(log, "Updated cluster config"); return null;
} }
} }
private void ResetAllNonConnected() private void ResetAllNonConnected()
{ {
var snapshot = serverSnapshot; var snapshot = serverSnapshot;
......
using System; using System;
using System.Collections.Generic;
using System.Collections.ObjectModel; using System.Collections.ObjectModel;
using System.Net; using System.Net;
...@@ -9,6 +10,14 @@ namespace StackExchange.Redis ...@@ -9,6 +10,14 @@ namespace StackExchange.Redis
/// </summary> /// </summary>
public sealed class EndPointCollection : Collection<EndPoint> public sealed class EndPointCollection : Collection<EndPoint>
{ {
public EndPointCollection() : base()
{
}
public EndPointCollection(IList<EndPoint> endpoints) : base(endpoints)
{
}
/// <summary> /// <summary>
/// Format an endpoint /// Format an endpoint
/// </summary> /// </summary>
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment