Commit 0e428a7c authored by Marc Gravell's avatar Marc Gravell Committed by Nick Craver

make sure there is only **one** path that can cause endpoints to be added, and...

make sure there is only **one** path that can cause endpoints to be added, and ensure that the activation happens after the constructor - removes race condition that can lead to abandoned endpoints; note as part of this, switch the server-snapshot to span-based
parent 97d9e375
......@@ -38,7 +38,7 @@ public static TaskFactory Factory
/// </summary>
public ServerCounters GetCounters()
{
var snapshot = serverSnapshot;
var snapshot = GetServerSnapshot();
var counters = new ServerCounters(null);
for (int i = 0; i < snapshot.Length; i++)
......@@ -233,7 +233,7 @@ public void ExportConfiguration(Stream destination, ExportOptions options = Expo
using (var zip = new ZipArchive(destination, ZipArchiveMode.Create, true))
{
var arr = serverSnapshot;
var arr = GetServerSnapshot();
foreach (var server in arr)
{
const CommandFlags flags = CommandFlags.None;
......@@ -328,7 +328,7 @@ internal void MakeMaster(ServerEndPoint server, ReplicationChangeOptions options
throw;
}
var nodes = serverSnapshot;
var nodes = GetServerSnapshot();
RedisValue newMaster = Format.ToString(server.EndPoint);
RedisKey tieBreakerKey = default(RedisKey);
......@@ -509,7 +509,7 @@ public EndPoint[] GetEndPoints(bool configuredOnly = false)
{
if (configuredOnly) return configuration.EndPoints.ToArray();
return Array.ConvertAll(serverSnapshot, x => x.EndPoint);
return _serverSnapshot.GetEndPoints();
}
private readonly ConfigurationOptions configuration;
......@@ -698,7 +698,7 @@ internal void OnHashSlotMoved(int hashSlot, EndPoint old, EndPoint @new)
internal ServerEndPoint AnyConnected(ServerType serverType, uint startOffset, RedisCommand command, CommandFlags flags)
{
var tmp = serverSnapshot;
var tmp = GetServerSnapshot();
int len = tmp.Length;
ServerEndPoint fallback = null;
for (int i = 0; i < len; i++)
......@@ -865,13 +865,60 @@ private static ConnectionMultiplexer ConnectImpl(Func<ConnectionMultiplexer> mul
private string failureMessage;
private readonly Hashtable servers = new Hashtable();
private volatile ServerEndPoint[] serverSnapshot = Array.Empty<ServerEndPoint>();
internal ServerEndPoint GetServerEndPoint(EndPoint endpoint)
private volatile ServerSnapshot _serverSnapshot = ServerSnapshot.Empty;
internal ReadOnlySpan<ServerEndPoint> GetServerSnapshot() => _serverSnapshot.Span;
sealed class ServerSnapshot
{
public static ServerSnapshot Empty { get; } = new ServerSnapshot(Array.Empty<ServerEndPoint>(), 0);
private ServerSnapshot(ServerEndPoint[] arr, int count)
{
_arr = arr;
_count = count;
}
private ServerEndPoint[] _arr;
private int _count;
public ReadOnlySpan<ServerEndPoint> Span => new ReadOnlySpan<ServerEndPoint>(_arr, 0, _count);
internal ServerSnapshot Add(ServerEndPoint value)
{
if (value == null) return this;
ServerEndPoint[] arr;
if (_arr.Length > _count)
{
arr = _arr;
}
else
{
// no more room; need a new array
int newLen = _arr.Length << 1;
if (newLen == 0) newLen = 4;
arr = new ServerEndPoint[newLen];
_arr.CopyTo(arr, 0);
}
arr[_count] = value;
return new ServerSnapshot(arr, _count + 1);
}
internal EndPoint[] GetEndPoints()
{
if (_count == 0) return Array.Empty<EndPoint>();
var arr = new EndPoint[_count];
for(int i = 0; i < _count; i++)
{
arr[i] = _arr[i].EndPoint;
}
return arr;
}
}
internal ServerEndPoint GetServerEndPoint(EndPoint endpoint, TextWriter log = null, bool activate = true)
{
if (endpoint == null) return null;
var server = (ServerEndPoint)servers[endpoint];
if (server == null)
{
bool isNew = false;
lock (servers)
{
server = (ServerEndPoint)servers[endpoint];
......@@ -879,19 +926,14 @@ internal ServerEndPoint GetServerEndPoint(EndPoint endpoint)
{
if (isDisposed) throw new ObjectDisposedException(ToString());
server = new ServerEndPoint(this, endpoint, null);
// ^^ this causes ReconfigureAsync() which calls GetServerEndpoint() which can modify servers, so double check!
if (!servers.ContainsKey(endpoint))
{
servers.Add(endpoint, server);
}
var newSnapshot = serverSnapshot;
Array.Resize(ref newSnapshot, newSnapshot.Length + 1);
newSnapshot[newSnapshot.Length - 1] = server;
serverSnapshot = newSnapshot;
server = new ServerEndPoint(this, endpoint, log);
servers.Add(endpoint, server);
isNew = true;
_serverSnapshot = _serverSnapshot.Add(server);
}
}
// spin up the connection if this is new
if (isNew && activate) server.Activate(ConnectionType.Interactive, log);
}
return server;
}
......@@ -943,7 +985,7 @@ private void OnHeartbeat()
Interlocked.Exchange(ref lastGlobalHeartbeatTicks, now);
Trace("heartbeat");
var tmp = serverSnapshot;
var tmp = GetServerSnapshot();
for (int i = 0; i < tmp.Length; i++)
tmp[i].OnHeartbeat();
}
......@@ -1097,7 +1139,7 @@ public long OperationCount
get
{
long total = 0;
var snapshot = serverSnapshot;
var snapshot = GetServerSnapshot();
for (int i = 0; i < snapshot.Length; i++) total += snapshot[i].OperationCount;
return total;
}
......@@ -1193,7 +1235,7 @@ public void GetStatus(TextWriter log)
{
if (log == null) return;
var tmp = serverSnapshot;
var tmp = GetServerSnapshot();
foreach (var server in tmp)
{
LogLocked(log, server.Summary());
......@@ -1204,6 +1246,17 @@ public void GetStatus(TextWriter log)
Interlocked.Read(ref syncTimeouts), Interlocked.Read(ref fireAndForgets), LastHeartbeatSecondsAgo);
}
private void ActivateAllServers(TextWriter log)
{
foreach (var server in GetServerSnapshot())
{
server.Activate(ConnectionType.Interactive, log);
if (CommandMap.IsAvailable(RedisCommand.SUBSCRIBE))
{
server.Activate(ConnectionType.Subscription, null); // no need to log the SUB stuff
}
}
}
internal async Task<bool> ReconfigureAsync(bool first, bool reconfigureAll, TextWriter log, EndPoint blame, string cause, bool publishReconfigure = false, CommandFlags publishReconfigureFlags = CommandFlags.None)
{
if (isDisposed) throw new ObjectDisposedException(ToString());
......@@ -1240,34 +1293,11 @@ internal async Task<bool> ReconfigureAsync(bool first, bool reconfigureAll, Text
throw new TimeoutException("Timeout resolving endpoints");
}
}
int index = 0;
lock (servers)
foreach (var endpoint in configuration.EndPoints)
{
var newSnapshot = new ServerEndPoint[configuration.EndPoints.Count];
foreach (var endpoint in configuration.EndPoints)
{
var server = (ServerEndPoint)servers[endpoint];
if (server == null)
{
server = new ServerEndPoint(this, endpoint, log);
// ^^ this causes ReconfigureAsync() which calls GetServerEndpoint() which can modify servers, so double check!
if (!servers.ContainsKey(endpoint))
{
servers.Add(endpoint, server);
}
}
newSnapshot[index++] = server;
}
serverSnapshot = newSnapshot;
}
foreach (var server in serverSnapshot)
{
server.Activate(ConnectionType.Interactive, log);
if (CommandMap.IsAvailable(RedisCommand.SUBSCRIBE))
{
server.Activate(ConnectionType.Subscription, null); // no need to log the SUB stuff
}
GetServerEndPoint(endpoint, log, false);
}
ActivateAllServers(log);
}
int attemptsLeft = first ? configuration.ConnectRetry : 1;
......@@ -1562,7 +1592,7 @@ private async Task<EndPointCollection> GetEndpointsFromClusterNodes(ServerEndPoi
private void ResetAllNonConnected()
{
var snapshot = serverSnapshot;
var snapshot = GetServerSnapshot();
foreach (var server in snapshot)
{
server.ResetNonConnected();
......@@ -1735,12 +1765,6 @@ internal void UpdateClusterRange(ClusterConfiguration configuration)
private Timer pulse;
internal ServerEndPoint[] GetServerSnapshot()
{
var tmp = serverSnapshot;
return tmp;
}
internal ServerEndPoint SelectServer(Message message)
{
if (message == null) return null;
......@@ -1837,7 +1861,7 @@ public bool IsConnected
{
get
{
var tmp = serverSnapshot;
var tmp = GetServerSnapshot();
for (int i = 0; i < tmp.Length; i++)
if (tmp[i].IsConnected) return true;
return false;
......@@ -1851,7 +1875,7 @@ public bool IsConnecting
{
get
{
var tmp = serverSnapshot;
var tmp = GetServerSnapshot();
for (int i = 0; i < tmp.Length; i++)
if (tmp[i].IsConnecting) return true;
return false;
......
......@@ -104,7 +104,7 @@ internal static string GetInnerMostExceptionMessage(Exception e)
}
}
internal static Exception NoConnectionAvailable(bool includeDetail, bool includePerformanceCounters, RedisCommand command, Message message, ServerEndPoint server, ServerEndPoint[] serverSnapshot)
internal static Exception NoConnectionAvailable(bool includeDetail, bool includePerformanceCounters, RedisCommand command, Message message, ServerEndPoint server, ReadOnlySpan<ServerEndPoint> serverSnapshot)
{
string commandLabel = GetLabel(includeDetail, command, message);
......@@ -138,7 +138,7 @@ internal static Exception NoConnectionAvailable(bool includeDetail, bool include
return ex;
}
internal static Exception PopulateInnerExceptions(ServerEndPoint[] serverSnapshot)
internal static Exception PopulateInnerExceptions(ReadOnlySpan<ServerEndPoint> serverSnapshot)
{
var innerExceptions = new List<Exception>();
if (serverSnapshot != null)
......
......@@ -53,7 +53,6 @@ public ServerEndPoint(ConnectionMultiplexer multiplexer, EndPoint endpoint, Text
isSlave = false;
databases = 0;
writeEverySeconds = config.KeepAlive > 0 ? config.KeepAlive : 60;
interactive = CreateBridge(ConnectionType.Interactive, log);
serverType = ServerType.Standalone;
// overrides for twemproxy
......@@ -635,7 +634,6 @@ internal void WriteDirectOrQueueFireAndForget<T>(PhysicalConnection connection,
}
}
}
private PhysicalBridge CreateBridge(ConnectionType type, TextWriter log)
{
Multiplexer.Trace(type.ToString());
......
......@@ -163,11 +163,11 @@ internal async void BeginConnectAsync(EndPoint endpoint, Socket socket, Physical
try
{
if (socket.ConnectAsync(args))
{
{ // asynchronous operation is pending
ConfigureTimeout(args, multiplexer.RawConfig.ConnectTimeout);
}
else
{
{ // completed synchronously
SocketAwaitable.OnCompleted(args);
}
......@@ -177,7 +177,8 @@ internal async void BeginConnectAsync(EndPoint endpoint, Socket socket, Physical
bool ignoreConnect = false;
ShouldIgnoreConnect(physicalConnection, ref ignoreConnect);
if (ignoreConnect) return;
await awaitable;
await awaitable; // wait for the connect to complete or fail (will throw)
switch (physicalConnection == null ? SocketMode.Abort : await physicalConnection.ConnectedAsync(socket, log, this).ForAwait())
{
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment