Unverified Commit 49f24a2c authored by Nick Craver's avatar Nick Craver Committed by GitHub

Sentinel: Several break fixes (#1402)

* fix naming of SentinelGetSentinelAddressesAsync

* tyop

* Sentinel: don't use async over sync with .Result
Co-authored-by: 's avatarmgravell <marc.gravell@gmail.com>
parent 46be338b
...@@ -2303,53 +2303,16 @@ internal void OnManagedConnectionFailed(object sender, ConnectionFailedEventArgs ...@@ -2303,53 +2303,16 @@ internal void OnManagedConnectionFailed(object sender, ConnectionFailedEventArgs
{ {
SwitchMaster(e.EndPoint, connection); SwitchMaster(e.EndPoint, connection);
}, null, TimeSpan.FromSeconds(0), TimeSpan.FromSeconds(1)); }, null, TimeSpan.FromSeconds(0), TimeSpan.FromSeconds(1));
//connection.sentinelMasterReconnectTimer.AutoReset = true;
//connection.sentinelMasterReconnectTimer.Start();
} }
} }
internal EndPoint GetConfiguredMasterForService(string serviceName, int timeoutmillis = -1) internal EndPoint GetConfiguredMasterForService(string serviceName) =>
{ GetServerSnapshot()
Task<EndPoint>[] sentinelMasters = GetServerSnapshot().ToArray() .ToArray()
.Where(s => s.ServerType == ServerType.Sentinel) .Where(s => s.ServerType == ServerType.Sentinel)
.Select(s => GetServer(s.EndPoint).SentinelGetMasterAddressByNameAsync(serviceName)) .AsParallel()
.ToArray(); .Select(s => GetServer(s.EndPoint).SentinelGetMasterAddressByName(serviceName))
.First(r => r != null);
Task<Task<EndPoint>> firstCompleteRequest = WaitFirstNonNullIgnoreErrorsAsync(sentinelMasters);
if (!firstCompleteRequest.Wait(timeoutmillis))
throw new TimeoutException("Timeout resolving master for service");
if (firstCompleteRequest.Result?.Result == null)
throw new Exception("Unable to determine master");
return firstCompleteRequest.Result.Result;
}
private static async Task<Task<T>> WaitFirstNonNullIgnoreErrorsAsync<T>(Task<T>[] tasks)
{
if (tasks == null) throw new ArgumentNullException("tasks");
if (tasks.Length == 0) return null;
var typeNullable = (Nullable.GetUnderlyingType(typeof(T)) != null);
var taskList = tasks.Cast<Task>().ToList();
try
{
while (taskList.Count > 0)
{
var allTasksAwaitingAny = Task.WhenAny(taskList).ObserveErrors();
var result = await allTasksAwaitingAny.ForAwait();
taskList.Remove((Task<T>)result);
if (((Task<T>)result).IsFaulted) continue;
if ((!typeNullable) || ((Task<T>)result).Result != null)
return (Task<T>)result;
}
}
catch
{ }
return null;
}
internal EndPoint currentSentinelMasterEndPoint; internal EndPoint currentSentinelMasterEndPoint;
...@@ -2419,26 +2382,22 @@ private T Retry<T>(int times, int interval, Func<T> func, string message) ...@@ -2419,26 +2382,22 @@ private T Retry<T>(int times, int interval, Func<T> func, string message)
throw new NullReferenceException(message); throw new NullReferenceException(message);
} }
internal void UpdateSentinelAddressList(string serviceName, int timeoutmillis = 500) internal void UpdateSentinelAddressList(string serviceName)
{ {
Task<EndPoint[]>[] sentinels = GetServerSnapshot().ToArray() var firstCompleteRequest = GetServerSnapshot()
.Where(s => s.ServerType == ServerType.Sentinel) .ToArray()
.Select(s => GetServer(s.EndPoint).SentinelGetSentinelAddresses(serviceName)) .Where(s => s.ServerType == ServerType.Sentinel)
.ToArray(); .AsParallel()
.Select(s => GetServer(s.EndPoint).SentinelGetSentinelAddresses(serviceName))
Task<Task<EndPoint[]>> firstCompleteRequest = WaitFirstNonNullIgnoreErrorsAsync(sentinels); .First(r => r != null);
// Ignore errors, as having an updated sentinel list is // Ignore errors, as having an updated sentinel list is
// not essential // not essential
if (firstCompleteRequest.Result?.Result == null) if (firstCompleteRequest == null)
return;
if (!firstCompleteRequest.Wait(timeoutmillis))
return;
if (firstCompleteRequest.Result.Result == null)
return; return;
bool hasNew = false; bool hasNew = false;
foreach (EndPoint newSentinel in firstCompleteRequest.Result.Result.Where(x => !RawConfig.EndPoints.Contains(x))) foreach (EndPoint newSentinel in firstCompleteRequest.Where(x => !RawConfig.EndPoints.Contains(x)))
{ {
hasNew = true; hasNew = true;
RawConfig.EndPoints.Add(newSentinel); RawConfig.EndPoints.Add(newSentinel);
......
...@@ -756,10 +756,19 @@ public partial interface IServer : IRedis ...@@ -756,10 +756,19 @@ public partial interface IServer : IRedis
/// Returns the ip and port numbers of all known Sentinels /// Returns the ip and port numbers of all known Sentinels
/// for the given service name. /// for the given service name.
/// </summary> /// </summary>
/// <param name="serviveName">the sentinel service name</param> /// <param name="serviceName">the sentinel service name</param>
/// <param name="flags"></param> /// <param name="flags"></param>
/// <returns>a list of the sentinel ips and ports</returns> /// <returns>a list of the sentinel ips and ports</returns>
Task<EndPoint[]> SentinelGetSentinelAddresses(string serviveName, CommandFlags flags = CommandFlags.None); EndPoint[] SentinelGetSentinelAddresses(string serviceName, CommandFlags flags = CommandFlags.None);
/// <summary>
/// Returns the ip and port numbers of all known Sentinels
/// for the given service name.
/// </summary>
/// <param name="serviceName">the sentinel service name</param>
/// <param name="flags"></param>
/// <returns>a list of the sentinel ips and ports</returns>
Task<EndPoint[]> SentinelGetSentinelAddressesAsync(string serviceName, CommandFlags flags = CommandFlags.None);
/// <summary> /// <summary>
/// Show the state and info of the specified master. /// Show the state and info of the specified master.
......
...@@ -807,10 +807,16 @@ public Task<EndPoint> SentinelGetMasterAddressByNameAsync(string serviceName, Co ...@@ -807,10 +807,16 @@ public Task<EndPoint> SentinelGetMasterAddressByNameAsync(string serviceName, Co
return ExecuteAsync(msg, ResultProcessor.SentinelMasterEndpoint); return ExecuteAsync(msg, ResultProcessor.SentinelMasterEndpoint);
} }
public Task<EndPoint[]> SentinelGetSentinelAddresses(string serviceName, CommandFlags flags = CommandFlags.None) public EndPoint[] SentinelGetSentinelAddresses(string serviceName, CommandFlags flags = CommandFlags.None)
{ {
var msg = Message.Create(-1, flags, RedisCommand.SENTINEL, RedisLiterals.SENTINELS, (RedisValue)serviceName); var msg = Message.Create(-1, flags, RedisCommand.SENTINEL, RedisLiterals.SENTINELS, (RedisValue)serviceName);
return ExecuteAsync(msg, ResultProcessor.SentinelAddressesEndPoints); return ExecuteSync(msg, ResultProcessor.SentinelAddressesEndPoints);
}
public Task<EndPoint[]> SentinelGetSentinelAddressesAsync(string serviceName, CommandFlags flags = CommandFlags.None)
{
var msg = Message.Create(-1, flags, RedisCommand.SENTINEL, RedisLiterals.SENTINELS, (RedisValue)serviceName);
return ExecuteAsync(msg, ResultProcessor.SentinelAddressesEndPoints);
} }
public KeyValuePair<string, string>[] SentinelMaster(string serviceName, CommandFlags flags = CommandFlags.None) public KeyValuePair<string, string>[] SentinelMaster(string serviceName, CommandFlags flags = CommandFlags.None)
......
...@@ -562,15 +562,15 @@ public async Task GetSentinelMasterConnectionWriteReadFailover() ...@@ -562,15 +562,15 @@ public async Task GetSentinelMasterConnectionWriteReadFailover()
[Fact] [Fact]
public async Task SentinelGetSentinelAddressesTest() public async Task SentinelGetSentinelAddressesTest()
{ {
var addresses = await SentinelServerA.SentinelGetSentinelAddresses(ServiceName).ForAwait(); var addresses = await SentinelServerA.SentinelGetSentinelAddressesAsync(ServiceName).ForAwait();
Assert.Contains(SentinelServerB.EndPoint, addresses); Assert.Contains(SentinelServerB.EndPoint, addresses);
Assert.Contains(SentinelServerC.EndPoint, addresses); Assert.Contains(SentinelServerC.EndPoint, addresses);
addresses = await SentinelServerB.SentinelGetSentinelAddresses(ServiceName).ForAwait(); addresses = await SentinelServerB.SentinelGetSentinelAddressesAsync(ServiceName).ForAwait();
Assert.Contains(SentinelServerA.EndPoint, addresses); Assert.Contains(SentinelServerA.EndPoint, addresses);
Assert.Contains(SentinelServerC.EndPoint, addresses); Assert.Contains(SentinelServerC.EndPoint, addresses);
addresses = await SentinelServerC.SentinelGetSentinelAddresses(ServiceName).ForAwait(); addresses = await SentinelServerC.SentinelGetSentinelAddressesAsync(ServiceName).ForAwait();
Assert.Contains(SentinelServerA.EndPoint, addresses); Assert.Contains(SentinelServerA.EndPoint, addresses);
Assert.Contains(SentinelServerB.EndPoint, addresses); Assert.Contains(SentinelServerB.EndPoint, addresses);
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment