Unverified Commit b2db13fc authored by Shadi Massalha's avatar Shadi Massalha Committed by GitHub

Sentinel Support Derived from pr-692 (#1067)

This PR is derived from PR-692 and have been merged with the latest master commit.
Things that have been done:
1. review code for PR-692
2. fixed potential infinite loop in the code
3. Adapt code to success build with the latest master commit
4. Manual testing on 3 Sentinel nodes and 3 Redis nodes (connection and failover)

Usage:
```C#
                ConfigurationOptions sentinelConfig = new ConfigurationOptions();
                sentinelConfig.ServiceName = "mymaster";
                sentinelConfig.EndPoints.Add("192.168.99.102", 26379);
                sentinelConfig.EndPoints.Add("192.168.99.102", 26380);
                sentinelConfig.EndPoints.Add("192.168.99.102", 26381);
                sentinelConfig.TieBreaker = "";
                sentinelConfig.DefaultVersion = new Version(4, 0, 11);                 
                // its important to set the Sentinel commands supported
                sentinelConfig.CommandMap = CommandMap.Sentinel;

                // Get sentinel connection
                ConnectionMultiplexer sentinelConnection = ConnectionMultiplexer.Connect(sentinelConfig, Console.Out);
                // Create master service configuration
                ConfigurationOptions masterConfig = new ConfigurationOptions { ServiceName = "mymaster" };
                // Get master Redis connection
                var redisMasterConnection = sentinelConnection.GetSentinelMasterConnection(masterConfig);

                ...
               IDatabase db = redisMasterConnection.GetDatabase();                
               db.StringSet(key, value);
               ...
               string value1 = db.StringGet(key);

```
parent 748f1fa7
......@@ -39,6 +39,8 @@ install:
redis-server.exe --service-install --service-name "redis-26380" "..\Sentinel\sentinel-26380.conf" --sentinel
redis-server.exe --service-install --service-name "redis-26381" "..\Sentinel\sentinel-26381.conf" --sentinel
cd ..\..\..
- sh: >-
cd tests/RedisConfigs
......
......@@ -1007,8 +1007,15 @@ private static ConnectionMultiplexer ConnectImpl(object configuration, TextWrite
muxer.LastException = ExceptionFactory.UnableToConnect(muxer, "ConnectTimeout");
}
}
if (!task.Result) throw ExceptionFactory.UnableToConnect(muxer, muxer.failureMessage);
killMe = null;
if (muxer.ServerSelectionStrategy.ServerType == ServerType.Sentinel)
{
// Initialize the Sentinel handlers
muxer.InitializeSentinel(logProxy);
}
return muxer;
}
finally
......@@ -2093,6 +2100,334 @@ public bool IsConnecting
internal ServerSelectionStrategy ServerSelectionStrategy { get; }
internal Timer sentinelMasterReconnectTimer;
internal Dictionary<string, ConnectionMultiplexer> sentinelConnectionChildren;
/// <summary>
/// Initializes the connection as a Sentinel connection and adds
/// the necessary event handlers to track changes to the managed
/// masters.
/// </summary>
/// <param name="logProxy"></param>
internal void InitializeSentinel(LogProxy logProxy)
{
if (ServerSelectionStrategy.ServerType != ServerType.Sentinel)
{
return;
}
sentinelConnectionChildren = new Dictionary<string, ConnectionMultiplexer>();
// Subscribe to sentinel change events
ISubscriber sub = GetSubscriber();
if (sub.SubscribedEndpoint("+switch-master") == null)
{
sub.Subscribe("+switch-master", (channel, message) =>
{
string[] messageParts = ((string)message).Split(new[] { ' ' }, StringSplitOptions.RemoveEmptyEntries);
EndPoint switchBlame = Format.TryParseEndPoint(string.Format("{0}:{1}", messageParts[1], messageParts[2]));
lock (sentinelConnectionChildren)
{
// Switch the master if we have connections for that service
if (sentinelConnectionChildren.ContainsKey(messageParts[0]))
{
ConnectionMultiplexer child = sentinelConnectionChildren[messageParts[0]];
// Is the connection still valid?
if (child.IsDisposed)
{
child.ConnectionFailed -= OnManagedConnectionFailed;
child.ConnectionRestored -= OnManagedConnectionRestored;
sentinelConnectionChildren.Remove(messageParts[0]);
}
else
{
SwitchMaster(switchBlame, sentinelConnectionChildren[messageParts[0]]);
}
}
}
});
}
// If we lose connection to a sentinel server,
// We need to reconfigure to make sure we still have
// a subscription to the +switch-master channel.
ConnectionFailed += (sender, e) =>
{
// Reconfigure to get subscriptions back online
ReconfigureAsync(false, true, logProxy, e.EndPoint, "Lost sentinel connection", false).Wait();
};
// Subscribe to new sentinels being added
if (sub.SubscribedEndpoint("+sentinel") == null)
{
sub.Subscribe("+sentinel", (channel, message) =>
{
string[] messageParts = ((string)message).Split(new[] { ' ' }, StringSplitOptions.RemoveEmptyEntries);
UpdateSentinelAddressList(messageParts[0]);
});
}
}
private const string FAILED_CONFIGURE_MASTER_FOR_SERVICE_MSG =
"Failed connecting to configured master for service: {0}, after {1} retries within interval {2}";
/// <summary>
/// Returns a managed connection to the master server indicated by
/// the ServiceName in the config.
/// </summary>
/// <param name="config">the configuration to be used when connecting to the master</param>
/// <param name="log"></param>
public ConnectionMultiplexer GetSentinelMasterConnection(ConfigurationOptions config, TextWriter log = null)
{
if (ServerSelectionStrategy.ServerType != ServerType.Sentinel)
throw new NotImplementedException("The ConnectionMultiplexer is not a Sentinel connection.");
if (string.IsNullOrEmpty(config.ServiceName))
throw new ArgumentException("A ServiceName must be specified.");
lock (sentinelConnectionChildren)
{
if (sentinelConnectionChildren.ContainsKey(config.ServiceName) && !sentinelConnectionChildren[config.ServiceName].IsDisposed)
return sentinelConnectionChildren[config.ServiceName];
}
// Clear out the endpoints
config.EndPoints.Clear();
// Get an initial endpoint
var msg = string.Format(FAILED_CONFIGURE_MASTER_FOR_SERVICE_MSG, config.ServiceName, 5, 10);
EndPoint initialMasterEndPoint = Retry<EndPoint>(5, 10, () => GetConfiguredMasterForService(config.ServiceName), msg);
//SHADI: do
//{
// initialMasterEndPoint = GetConfiguredMasterForService(config.ServiceName);
//} while(initialMasterEndPoint == null);
config.EndPoints.Add(initialMasterEndPoint);
ConnectionMultiplexer connection = Connect(config, log);
// Attach to reconnect event to ensure proper connection to the new master
connection.ConnectionRestored += OnManagedConnectionRestored;
// If we lost the connection, run a switch to a least try and get updated info about the master
connection.ConnectionFailed += OnManagedConnectionFailed;
lock (sentinelConnectionChildren)
{
sentinelConnectionChildren[connection.RawConfig.ServiceName] = connection;
}
// Perform the initial switchover
SwitchMaster(RawConfig.EndPoints[0], connection, log);
return connection;
}
internal void OnManagedConnectionRestored(object sender, ConnectionFailedEventArgs e)
{
ConnectionMultiplexer connection = (ConnectionMultiplexer)sender;
if(connection.sentinelMasterReconnectTimer != null)
{
connection.sentinelMasterReconnectTimer.Dispose();
connection.sentinelMasterReconnectTimer = null;
}
// Run a switch to make sure we have update-to-date
// information about which master we should connect to
SwitchMaster(e.EndPoint, connection);
try
{
// Verify that the reconnected endpoint is a master,
// and the correct one otherwise we should reconnect
if(connection.GetServer(e.EndPoint).IsSlave || e.EndPoint != connection.currentSentinelMasterEndPoint)
{
// Wait for things to smooth out
Thread.Sleep(200);
// This isn't a master, so try connecting again
SwitchMaster(e.EndPoint, connection);
}
}
catch(Exception)
{
// If we get here it means that we tried to reconnect to a server that is no longer
// considered a master by Sentinel and was removed from the list of endpoints.
// Wait for things to smooth out
Thread.Sleep(200);
// If we caught an exception, we may have gotten a stale endpoint
// we are not aware of, so retry
SwitchMaster(e.EndPoint, connection);
}
}
internal void OnManagedConnectionFailed(object sender, ConnectionFailedEventArgs e)
{
ConnectionMultiplexer connection = (ConnectionMultiplexer)sender;
// Periodically check to see if we can reconnect to the proper master.
// This is here in case we lost our subscription to a good sentinel instance
// or if we miss the published master change
if (connection.sentinelMasterReconnectTimer == null)
{
connection.sentinelMasterReconnectTimer = new Timer((_) =>
{
SwitchMaster(e.EndPoint, connection);
}, null, TimeSpan.FromSeconds(0), TimeSpan.FromSeconds(1));
//connection.sentinelMasterReconnectTimer.AutoReset = true;
//connection.sentinelMasterReconnectTimer.Start();
}
}
internal EndPoint GetConfiguredMasterForService(string serviceName, int timeoutmillis = -1)
{
Task<EndPoint>[] sentinelMasters = GetServerSnapshot().ToArray()
.Where(s => s.ServerType == ServerType.Sentinel)
.Select(s => GetServer(s.EndPoint).SentinelGetMasterAddressByNameAsync(serviceName))
.ToArray();
Task<Task<EndPoint>> firstCompleteRequest = WaitFirstNonNullIgnoreErrorsAsync(sentinelMasters);
if (!firstCompleteRequest.Wait(timeoutmillis))
throw new TimeoutException("Timeout resolving master for service");
if (firstCompleteRequest.Result.Result == null)
throw new Exception("Unable to determine master");
return firstCompleteRequest.Result.Result;
}
private static async Task<Task<T>> WaitFirstNonNullIgnoreErrorsAsync<T>(Task<T>[] tasks)
{
if (tasks == null) throw new ArgumentNullException("tasks");
if (tasks.Length == 0) return null;
var typeNullable = (Nullable.GetUnderlyingType(typeof(T)) != null);
var taskList = tasks.Cast<Task>().ToList();
try
{
while (taskList.Count > 0)
{
var allTasksAwaitingAny = Task.WhenAny(taskList).ObserveErrors();
var result = await allTasksAwaitingAny.ForAwait();
taskList.Remove((Task<T>)result);
if (((Task<T>)result).IsFaulted) continue;
if ((!typeNullable) || ((Task<T>)result).Result != null)
return (Task<T>)result;
}
}
catch
{ }
return null;
}
internal EndPoint currentSentinelMasterEndPoint;
/// <summary>
/// Switches the SentinelMasterConnection over to a new master.
/// </summary>
/// <param name="switchBlame">the endpoing responsible for the switch</param>
/// <param name="connection">the connection that should be switched over to a new master endpoint</param>
/// <param name="log">log output</param>
internal void SwitchMaster(EndPoint switchBlame, ConnectionMultiplexer connection, TextWriter log = null)
{
if (log == null) log = TextWriter.Null;
using (var logProxy = LogProxy.TryCreate(log))
{
string serviceName = connection.RawConfig.ServiceName;
// Get new master
var msg = string.Format(FAILED_CONFIGURE_MASTER_FOR_SERVICE_MSG, serviceName, 5, 10);
EndPoint masterEndPoint = Retry<EndPoint>(5, 10, () => GetConfiguredMasterForService(serviceName), msg);
//Shadi: do
//{
// masterEndPoint = GetConfiguredMasterForService(serviceName);
//} while(masterEndPoint == null);
connection.currentSentinelMasterEndPoint = masterEndPoint;
if (!connection.servers.Contains(masterEndPoint))
{
connection.RawConfig.EndPoints.Clear();
connection.servers.Clear();
//connection._serverSnapshot = new ServerEndPoint[0];
connection.RawConfig.EndPoints.Add(masterEndPoint);
Trace(string.Format("Switching master to {0}", masterEndPoint));
// Trigger a reconfigure
connection.ReconfigureAsync(false, false, logProxy, switchBlame, string.Format("master switch {0}", serviceName), false, CommandFlags.PreferMaster).Wait();
}
UpdateSentinelAddressList(serviceName);
}
}
/// <summary>
/// retry mechanism that executing func t times to get a non-null result within a constant interval between retries.
/// if t exceeds the times parameter without success, it will throw an exception with a descriptive message
/// </summary>
/// <typeparam name="T">A generic result type expected to return</typeparam>
/// <param name="times">retries times trying to get a result before throw exception</param>
/// <param name="interval">interval between retries in millisconds</param>
/// <param name="func">delegate repeatedly runs until getting the desired result</param>
/// <param name="message">message to be used within NullReferenceException that should be thrown in case
/// there is no result after n retries</param>
/// <returns>object of type T</returns>
private T Retry<T>(int times, int interval, Func<T> func, string message)
{
for (var t = 0; t < times; t++)
{
var result = func();
if (result != null)
{
return result;
}
Thread.Sleep(interval);
}
throw new NullReferenceException(message);
}
internal void UpdateSentinelAddressList(string serviceName, int timeoutmillis = 500)
{
Task<EndPoint[]>[] sentinels = GetServerSnapshot().ToArray()
.Where(s => s.ServerType == ServerType.Sentinel)
.Select(s => GetServer(s.EndPoint).SentinelGetSentinelAddresses(serviceName))
.ToArray();
Task<Task<EndPoint[]>> firstCompleteRequest = WaitFirstNonNullIgnoreErrorsAsync(sentinels);
// Ignore errors, as having an updated sentinel list is
// not essential
if (firstCompleteRequest.Result?.Result == null)
return;
if (!firstCompleteRequest.Wait(timeoutmillis))
return;
if (firstCompleteRequest.Result.Result == null)
return;
bool hasNew = false;
foreach (EndPoint newSentinel in firstCompleteRequest.Result.Result.Where(x => !RawConfig.EndPoints.Contains(x)))
{
hasNew = true;
RawConfig.EndPoints.Add(newSentinel);
}
if (hasNew)
{
// Reconfigure the sentinel multiplexer if we added new endpoints
ReconfigureAsync(false, true, null, RawConfig.EndPoints[0], "Updating Sentinel List", false).Wait();
}
}
/// <summary>
/// Close all connections and release all resources associated with this object
/// </summary>
......
......@@ -752,6 +752,15 @@ public partial interface IServer : IRedis
/// <remarks>https://redis.io/topics/sentinel</remarks>
Task<EndPoint> SentinelGetMasterAddressByNameAsync(string serviceName, CommandFlags flags = CommandFlags.None);
/// <summary>
/// Returns the ip and port numbers of all known Sentinels
/// for the given service name.
/// </summary>
/// <param name="serviveName">the sentinel service name</param>
/// <param name="flags"></param>
/// <returns>a list of the sentinel ips and ports</returns>
Task<EndPoint[]> SentinelGetSentinelAddresses(string serviveName, CommandFlags flags = CommandFlags.None);
/// <summary>
/// Show the state and info of the specified master.
/// </summary>
......
......@@ -807,6 +807,12 @@ public Task<EndPoint> SentinelGetMasterAddressByNameAsync(string serviceName, Co
return ExecuteAsync(msg, ResultProcessor.SentinelMasterEndpoint);
}
public Task<EndPoint[]> SentinelGetSentinelAddresses(string serviceName, CommandFlags flags = CommandFlags.None)
{
var msg = Message.Create(-1, flags, RedisCommand.SENTINEL, RedisLiterals.SENTINELS, (RedisValue)serviceName);
return ExecuteAsync(msg, ResultProcessor.SentinelAddressesEndPoints);
}
public KeyValuePair<string, string>[] SentinelMaster(string serviceName, CommandFlags flags = CommandFlags.None)
{
var msg = Message.Create(-1, flags, RedisCommand.SENTINEL, RedisLiterals.MASTER, (RedisValue)serviceName);
......
......@@ -125,6 +125,9 @@ public static readonly StreamPendingMessagesProcessor
public static readonly ResultProcessor<EndPoint>
SentinelMasterEndpoint = new SentinelGetMasterAddressByNameProcessor();
public static readonly ResultProcessor<EndPoint[]>
SentinelAddressesEndPoints = new SentinelGetSentinelAddresses();
public static readonly ResultProcessor<KeyValuePair<string, string>[][]>
SentinelArrayOfArrays = new SentinelArrayOfArraysProcessor();
......@@ -714,6 +717,11 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes
}
}
}
else if (message?.Command == RedisCommand.SENTINEL)
{
server.ServerType = ServerType.Sentinel;
server.Multiplexer.Trace("Auto-configured server-type: sentinel");
}
SetResult(message, true);
return true;
case ResultType.MultiBulk:
......@@ -765,6 +773,11 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes
}
}
}
else if (message?.Command == RedisCommand.SENTINEL)
{
server.ServerType = ServerType.Sentinel;
server.Multiplexer.Trace("Auto-configured server-type: sentinel");
}
SetResult(message, true);
return true;
}
......@@ -1970,12 +1983,62 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes
return true;
}
break;
}
return false;
}
}
private sealed class SentinelGetSentinelAddresses : ResultProcessor<EndPoint[]>
{
protected override bool SetResultCore(PhysicalConnection connection, Message message, in RawResult result)
{
List<EndPoint> endPoints = new List<EndPoint>();
switch (result.Type)
{
case ResultType.MultiBulk:
foreach (RawResult item in result.GetItems())
{
var arr = item.GetItemsAsValues();
string ip = null;
string portStr = null;
for (int i = 0; i < arr.Length && (ip == null || portStr == null); i += 2)
{
string name = arr[i];
string value = arr[i + 1];
switch (name)
{
case "ip":
ip = value;
break;
case "port":
portStr = value;
break;
}
}
if (ip != null && portStr != null && int.TryParse(portStr, out int port))
{
endPoints.Add(Format.ParseEndPoint(ip, port));
}
}
break;
case ResultType.SimpleString:
//We don't want to blow up if the master is not found
if (result.IsNull)
return true;
break;
}
if (endPoints.Count > 0)
{
SetResult(message, endPoints.ToArray());
return true;
}
return false;
}
}
......
......@@ -284,6 +284,12 @@ internal void AutoConfigure(PhysicalConnection connection)
msg.SetInternalCall();
WriteDirectOrQueueFireAndForgetSync(connection, msg, ResultProcessor.AutoConfigure);
}
if (commandMap.IsAvailable(RedisCommand.SENTINEL))
{
msg = Message.Create(-1, flags, RedisCommand.SENTINEL, RedisLiterals.MASTERS);
msg.SetInternalCall();
WriteDirectOrQueueFireAndForgetSync(connection, msg, ResultProcessor.AutoConfigure);
}
if (commandMap.IsAvailable(RedisCommand.INFO))
{
lastInfoReplicationCheckTicks = Environment.TickCount;
......
port 26381
sentinel monitor mymaster 127.0.0.1 7011 2
sentinel monitor mymaster 127.0.0.1 7010 1
sentinel down-after-milliseconds mymaster 1000
sentinel failover-timeout mymaster 1000
sentinel config-epoch mymaster 0
......
......@@ -33,7 +33,7 @@ redis-server cluster-7005.conf &>/dev/null &
popd > /dev/null
#Sentinel Servers
echo Starting Sentinel: 7010-7011,26379-26380
echo Starting Sentinel: 7010-7011,26379-26381
pushd Sentinel > /dev/null
echo "${INDENT}Targets: 7010-7011"
redis-server redis-7010.conf &>/dev/null &
......
......@@ -79,7 +79,9 @@ public class Config
public string RemoteServerAndPort => RemoteServer + ":" + RemotePort.ToString();
public string SentinelServer { get; set; } = "127.0.0.1";
public int SentinelPort { get; set; } = 26379;
public int SentinelPortA { get; set; } = 26379;
public int SentinelPortB { get; set; } = 26380;
public int SentinelPortC { get; set; } = 26381;
public string SentinelSeviceName { get; set; } = "mymaster";
public string ClusterServer { get; set; } = "127.0.0.1";
......
using System.IO;
using System;
using System.Collections.Generic;
using System.IO;
using System.Net;
using System.Threading;
using System.Threading.Tasks;
......@@ -12,7 +14,10 @@ public class Sentinel : TestBase
private string ServiceName => TestConfig.Current.SentinelSeviceName;
private ConnectionMultiplexer Conn { get; }
private IServer Server { get; }
private IServer SentinelServerA { get; }
private IServer SentinelServerB { get; }
private IServer SentinelServerC { get; }
public IServer[] SentinelsServers { get; }
protected StringWriter ConnectionLog { get; }
public Sentinel(ITestOutputHelper output) : base(output)
......@@ -25,7 +30,11 @@ public Sentinel(ITestOutputHelper output) : base(output)
var options = new ConfigurationOptions()
{
CommandMap = CommandMap.Sentinel,
EndPoints = { { TestConfig.Current.SentinelServer, TestConfig.Current.SentinelPort } },
EndPoints = {
{ TestConfig.Current.SentinelServer, TestConfig.Current.SentinelPortA },
{ TestConfig.Current.SentinelServer, TestConfig.Current.SentinelPortB },
{ TestConfig.Current.SentinelServer, TestConfig.Current.SentinelPortC }
},
AllowAdmin = true,
TieBreaker = "",
ServiceName = TestConfig.Current.SentinelSeviceName,
......@@ -34,70 +43,220 @@ public Sentinel(ITestOutputHelper output) : base(output)
Conn = ConnectionMultiplexer.Connect(options, ConnectionLog);
Thread.Sleep(3000);
Assert.True(Conn.IsConnected);
Server = Conn.GetServer(TestConfig.Current.SentinelServer, TestConfig.Current.SentinelPort);
SentinelServerA = Conn.GetServer(TestConfig.Current.SentinelServer, TestConfig.Current.SentinelPortA);
SentinelServerB = Conn.GetServer(TestConfig.Current.SentinelServer, TestConfig.Current.SentinelPortB);
SentinelServerC = Conn.GetServer(TestConfig.Current.SentinelServer, TestConfig.Current.SentinelPortC);
SentinelsServers = new IServer[] { SentinelServerA, SentinelServerB, SentinelServerC };
}
[Fact]
public void PingTest()
{
var test = Server.Ping();
Log("ping took {0} ms", test.TotalMilliseconds);
var test = SentinelServerA.Ping();
Log("ping to sentinel {0}:{1} took {2} ms", TestConfig.Current.SentinelServer,
TestConfig.Current.SentinelPortA, test.TotalMilliseconds);
test = SentinelServerB.Ping();
Log("ping to sentinel {0}:{1} took {1} ms", TestConfig.Current.SentinelServer,
TestConfig.Current.SentinelPortB, test.TotalMilliseconds);
test = SentinelServerC.Ping();
Log("ping to sentinel {0}:{1} took {1} ms", TestConfig.Current.SentinelServer,
TestConfig.Current.SentinelPortC, test.TotalMilliseconds);
}
[Fact]
public void SentinelGetMasterAddressByNameTest()
{
var endpoint = Server.SentinelGetMasterAddressByName(ServiceName);
foreach (var server in SentinelsServers)
{
var master = server.SentinelMaster(ServiceName);
var endpoint = server.SentinelGetMasterAddressByName(ServiceName);
Assert.NotNull(endpoint);
var ipEndPoint = endpoint as IPEndPoint;
Assert.NotNull(ipEndPoint);
Assert.Equal(master.ToDictionary()["ip"], ipEndPoint.Address.ToString());
Assert.Equal(master.ToDictionary()["port"], ipEndPoint.Port.ToString());
Log("{0}:{1}", ipEndPoint.Address, ipEndPoint.Port);
}
}
[Fact]
public async Task SentinelGetMasterAddressByNameAsyncTest()
{
foreach (var server in SentinelsServers)
{
var master = server.SentinelMaster(ServiceName);
var endpoint = await server.SentinelGetMasterAddressByNameAsync(ServiceName).ForAwait();
Assert.NotNull(endpoint);
var ipEndPoint = endpoint as IPEndPoint;
Assert.NotNull(ipEndPoint);
Assert.Equal(master.ToDictionary()["ip"], ipEndPoint.Address.ToString());
Assert.Equal(master.ToDictionary()["port"], ipEndPoint.Port.ToString());
Log("{0}:{1}", ipEndPoint.Address, ipEndPoint.Port);
}
}
[Fact]
public void SentinelGetMasterAddressByNameNegativeTest()
{
var endpoint = Server.SentinelGetMasterAddressByName("FakeServiceName");
foreach (var server in SentinelsServers)
{
var endpoint = server.SentinelGetMasterAddressByName("FakeServiceName");
Assert.Null(endpoint);
}
}
[Fact]
public async Task SentinelGetMasterAddressByNameAsyncNegativeTest()
{
var endpoint = await Server.SentinelGetMasterAddressByNameAsync("FakeServiceName").ForAwait();
foreach (var server in SentinelsServers)
{
var endpoint = await server.SentinelGetMasterAddressByNameAsync("FakeServiceName").ForAwait();
Assert.Null(endpoint);
}
}
[Fact]
public void SentinelMasterTest()
{
var dict = Server.SentinelMaster(ServiceName).ToDictionary();
foreach (var server in SentinelsServers)
{
var dict = server.SentinelMaster(ServiceName).ToDictionary();
Assert.Equal(ServiceName, dict["name"]);
Assert.Equal("master", dict["flags"]);
foreach (var kvp in dict)
{
Log("{0}:{1}", kvp.Key, kvp.Value);
}
}
}
[Fact]
public async Task SentinelMasterAsyncTest()
{
foreach (var server in SentinelsServers)
{
var results = await server.SentinelMasterAsync(ServiceName).ForAwait();
Assert.Equal(ServiceName, results.ToDictionary()["name"]);
Assert.Equal("master", results.ToDictionary()["flags"]);
foreach (var kvp in results)
{
Log("{0}:{1}", kvp.Key, kvp.Value);
}
}
}
[Fact]
public void SentinelSentinelsTest()
{
var sentinels = Server.SentinelSentinels("mymaster");
Assert.True(sentinels[0].ToDictionary().ContainsKey("name"));
foreach (var config in sentinels)
var sentinels = SentinelServerA.SentinelSentinels(ServiceName);
var Server26380Info = SentinelServerB.Info();
var expected = new List<string> {
SentinelServerB.EndPoint.ToString(),
SentinelServerC.EndPoint.ToString()
};
var actual = new List<string>();
foreach (var kv in sentinels)
{
foreach (var kvp in config)
var data = kv.ToDictionary();
actual.Add(data["ip"] + ":" + data["port"]);
}
Assert.All(expected, ep => Assert.NotEqual(ep, SentinelServerA.EndPoint.ToString()));
Assert.True(sentinels.Length == 2);
Assert.All(expected, ep => Assert.Contains(ep, actual));
sentinels = SentinelServerB.SentinelSentinels(ServiceName);
foreach (var kv in sentinels)
{
Writer.WriteLine("{0}:{1}", kvp.Key, kvp.Value);
var data = kv.ToDictionary();
actual.Add(data["ip"] + ":" + data["port"]);
}
expected = new List<string> {
SentinelServerA.EndPoint.ToString(),
SentinelServerC.EndPoint.ToString()
};
Assert.All(expected, ep => Assert.NotEqual(ep, SentinelServerB.EndPoint.ToString()));
Assert.True(sentinels.Length == 2);
Assert.All(expected, ep => Assert.Contains(ep, actual));
sentinels = SentinelServerC.SentinelSentinels(ServiceName);
foreach (var kv in sentinels)
{
var data = kv.ToDictionary();
actual.Add(data["ip"] + ":" + data["port"]);
}
expected = new List<string> {
SentinelServerA.EndPoint.ToString(),
SentinelServerB.EndPoint.ToString()
};
Assert.All(expected, ep => Assert.NotEqual(ep, SentinelServerC.EndPoint.ToString()));
Assert.True(sentinels.Length == 2);
Assert.All(expected, ep => Assert.Contains(ep, actual));
}
[Fact]
public async Task SentinelSentinelsAsyncTest()
{
var sentinels = await SentinelServerA.SentinelSentinelsAsync(ServiceName).ForAwait();
var expected = new List<string> {
SentinelServerB.EndPoint.ToString(),
SentinelServerC.EndPoint.ToString()
};
var actual = new List<string>();
foreach (var kv in sentinels)
{
var data = kv.ToDictionary();
actual.Add(data["ip"] + ":" + data["port"]);
}
Assert.All(expected, ep => Assert.NotEqual(ep, SentinelServerA.EndPoint.ToString()));
Assert.True(sentinels.Length == 2);
Assert.All(expected, ep => Assert.Contains(ep, actual));
sentinels = await SentinelServerB.SentinelSentinelsAsync(ServiceName).ForAwait();
expected = new List<string> {
SentinelServerA.EndPoint.ToString(),
SentinelServerC.EndPoint.ToString()
};
actual = new List<string>();
foreach (var kv in sentinels)
{
var data = kv.ToDictionary();
actual.Add(data["ip"] + ":" + data["port"]);
}
Assert.All(expected, ep => Assert.NotEqual(ep, SentinelServerB.EndPoint.ToString()));
Assert.True(sentinels.Length == 2);
Assert.All(expected, ep => Assert.Contains(ep, actual));
sentinels = await SentinelServerC.SentinelSentinelsAsync(ServiceName).ForAwait();
expected = new List<string> {
SentinelServerA.EndPoint.ToString(),
SentinelServerB.EndPoint.ToString()
};
actual = new List<string>();
foreach (var kv in sentinels)
{
var data = kv.ToDictionary();
actual.Add(data["ip"] + ":" + data["port"]);
}
Assert.All(expected, ep => Assert.NotEqual(ep, SentinelServerC.EndPoint.ToString()));
Assert.True(sentinels.Length == 2);
Assert.All(expected, ep => Assert.Contains(ep, actual));
}
[Fact]
public void SentinelMastersTest()
{
var masterConfigs = Server.SentinelMasters();
var masterConfigs = SentinelServerA.SentinelMasters();
Assert.Single(masterConfigs);
Assert.True(masterConfigs[0].ToDictionary().ContainsKey("name"));
Assert.Equal(ServiceName, masterConfigs[0].ToDictionary()["name"]);
Assert.Equal("master", masterConfigs[0].ToDictionary()["flags"]);
foreach (var config in masterConfigs)
{
foreach (var kvp in config)
......@@ -108,25 +267,189 @@ public void SentinelMastersTest()
}
[Fact]
public void SentinelSlavesTest()
public async Task SentinelMastersAsyncTest()
{
var masterConfigs = await SentinelServerA.SentinelMastersAsync().ForAwait();
Assert.Single(masterConfigs);
Assert.True(masterConfigs[0].ToDictionary().ContainsKey("name"));
Assert.Equal(ServiceName, masterConfigs[0].ToDictionary()["name"]);
Assert.Equal("master", masterConfigs[0].ToDictionary()["flags"]);
foreach (var config in masterConfigs)
{
var slaveConfigs = Server.SentinelSlaves(ServiceName);
if (slaveConfigs.Length > 0)
foreach (var kvp in config)
{
Log("{0}:{1}", kvp.Key, kvp.Value);
}
}
}
[Fact]
public void SentinelSlavesTest()
{
var slaveConfigs = SentinelServerA.SentinelSlaves(ServiceName);
Assert.True(slaveConfigs.Length > 0);
Assert.True(slaveConfigs[0].ToDictionary().ContainsKey("name"));
Assert.Equal("slave", slaveConfigs[0].ToDictionary()["flags"]);
foreach (var config in slaveConfigs)
{
foreach (var kvp in config)
{
Log("{0}:{1}", kvp.Key, kvp.Value);
}
}
}
[Fact]
public async Task SentinelSlavesAsyncTest()
{
var slaveConfigs = await SentinelServerA.SentinelSlavesAsync(ServiceName).ForAwait();
Assert.True(slaveConfigs.Length > 0);
Assert.True(slaveConfigs[0].ToDictionary().ContainsKey("name"));
Assert.Equal("slave", slaveConfigs[0].ToDictionary()["flags"]);
foreach (var config in slaveConfigs)
{
foreach (var kvp in config) {
foreach (var kvp in config)
{
Log("{0}:{1}", kvp.Key, kvp.Value);
}
}
}
[Fact]
public void SentinelFailoverTest()
public async Task SentinelFailoverTest()
{
foreach (var server in SentinelsServers)
{
Server.SentinelFailover(ServiceName);
var master = server.SentinelGetMasterAddressByName(ServiceName);
var slaves = server.SentinelSlaves(ServiceName);
server.SentinelFailover(ServiceName);
await Task.Delay(2000).ForAwait();
var newMaster = server.SentinelGetMasterAddressByName(ServiceName);
var newSlave = server.SentinelSlaves(ServiceName);
Assert.Equal(slaves[0].ToDictionary()["name"], newMaster.ToString());
Assert.Equal(master.ToString(), newSlave[0].ToDictionary()["name"]);
}
}
[Fact]
public async Task SentinelFailoverAsyncTest()
{
foreach (var server in SentinelsServers)
{
var master = server.SentinelGetMasterAddressByName(ServiceName);
var slaves = server.SentinelSlaves(ServiceName);
await server.SentinelFailoverAsync(ServiceName).ForAwait();
await Task.Delay(2000).ForAwait();
var newMaster = server.SentinelGetMasterAddressByName(ServiceName);
var newSlave = server.SentinelSlaves(ServiceName);
Assert.Equal(slaves[0].ToDictionary()["name"], newMaster.ToString());
Assert.Equal(master.ToString(), newSlave[0].ToDictionary()["name"]);
}
}
[Fact]
public async Task GetSentinelMasterConnectionFailoverTest()
{
var conn = Conn.GetSentinelMasterConnection(new ConfigurationOptions { ServiceName = ServiceName });
var endpoint = conn.currentSentinelMasterEndPoint.ToString();
SentinelServerA.SentinelFailover(ServiceName);
await Task.Delay(2000).ForAwait();
var conn1 = Conn.GetSentinelMasterConnection(new ConfigurationOptions { ServiceName = ServiceName });
var endpoint1 = conn1.currentSentinelMasterEndPoint.ToString();
Assert.NotEqual(endpoint, endpoint1);
}
[Fact]
public async Task GetSentinelMasterConnectionFailoverAsyncTest()
{
var conn = Conn.GetSentinelMasterConnection(new ConfigurationOptions { ServiceName = ServiceName });
var endpoint = conn.currentSentinelMasterEndPoint.ToString();
await SentinelServerA.SentinelFailoverAsync(ServiceName).ForAwait();
await Task.Delay(2000).ForAwait();
var conn1 = Conn.GetSentinelMasterConnection(new ConfigurationOptions { ServiceName = ServiceName });
var endpoint1 = conn1.currentSentinelMasterEndPoint.ToString();
Assert.NotEqual(endpoint, endpoint1);
}
[Fact]
public async Task GetSentinelMasterConnectionWriteReadFailover()
{
var conn = Conn.GetSentinelMasterConnection(new ConfigurationOptions { ServiceName = ServiceName });
var s = conn.currentSentinelMasterEndPoint.ToString();
IDatabase db = conn.GetDatabase();
var expected = DateTime.Now.Ticks.ToString();
db.StringSet("beforeFailOverValue", expected);
SentinelServerA.SentinelFailover(ServiceName);
await Task.Delay(2000).ForAwait();
var conn1 = Conn.GetSentinelMasterConnection(new ConfigurationOptions { ServiceName = ServiceName });
var s1 = conn1.currentSentinelMasterEndPoint.ToString();
var db1 = conn1.GetDatabase();
var actual = db1.StringGet("beforeFailOverValue");
Assert.NotNull(s);
Assert.NotNull(s1);
Assert.NotEmpty(s);
Assert.NotEmpty(s1);
Assert.NotEqual(s, s1);
Assert.Equal(expected, actual);
}
[Fact]
public async Task SentinelGetSentinelAddressesTest()
{
var addresses = await SentinelServerA.SentinelGetSentinelAddresses(ServiceName).ForAwait();
Assert.Contains(SentinelServerB.EndPoint, addresses);
Assert.Contains(SentinelServerC.EndPoint, addresses);
addresses = await SentinelServerB.SentinelGetSentinelAddresses(ServiceName).ForAwait();
Assert.Contains(SentinelServerA.EndPoint, addresses);
Assert.Contains(SentinelServerC.EndPoint, addresses);
addresses = await SentinelServerC.SentinelGetSentinelAddresses(ServiceName).ForAwait();
Assert.Contains(SentinelServerA.EndPoint, addresses);
Assert.Contains(SentinelServerB.EndPoint, addresses);
}
[Fact]
public async Task ReadOnlyConnectionSlavesTest()
{
var slaves = SentinelServerA.SentinelSlaves(ServiceName);
var config = new ConfigurationOptions
{
TieBreaker = "",
ServiceName = TestConfig.Current.SentinelSeviceName,
};
foreach (var kv in slaves)
{
Assert.Equal("slave", kv.ToDictionary()["flags"]);
config.EndPoints.Add(kv.ToDictionary()["name"]);
}
var readonlyConn = ConnectionMultiplexer.Connect(config);
await Task.Delay(2000).ForAwait();
Assert.True(readonlyConn.IsConnected);
var db = readonlyConn.GetDatabase();
var s = db.StringGet("test");
Assert.True(s.IsNullOrEmpty);
//var ex = Assert.Throws<RedisConnectionException>(() => db.StringSet("test", "try write to read only instance"));
//Assert.StartsWith("No connection is available to service this operation", ex.Message);
}
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment