Commit e921f8d7 authored by Michael Colussi's avatar Michael Colussi

Adding support for most Sentinel methods

parent 7e079bf3
using System.Linq;
using System.Net;
using NUnit.Framework;
using System;
using System.Threading;
namespace StackExchange.Redis.Tests
{
[TestFixture, Ignore]
public class Sentinel
{
// TODO fill in these constants before running tests
private const string IP = "127.0.0.1";
private const int Port = 26379;
private const string ServiceName = "mymaster";
private static readonly ConnectionMultiplexer Conn = GetConn();
private static readonly IServer Server = Conn.GetServer(IP, Port);
public static ConnectionMultiplexer GetConn()
{
// create a connection
var options = new ConfigurationOptions()
{
CommandMap = CommandMap.Sentinel,
EndPoints = { { IP, Port } },
AllowAdmin = true,
TieBreaker = "",
ServiceName = ServiceName,
SyncTimeout = 5000
};
var connection = ConnectionMultiplexer.Connect(options, Console.Out);
Thread.Sleep(3000);
Assert.IsTrue(connection.IsConnected);
return connection;
}
[Test]
public void PingTest()
{
var test = Server.Ping();
Console.WriteLine("ping took {0} ms", test.TotalMilliseconds);
}
[Test]
public void SentinelGetMasterAddressByNameTest()
{
var endpoint = Server.SentinelGetMasterAddressByName(ServiceName);
Assert.IsNotNull(endpoint);
var ipEndPoint = endpoint as IPEndPoint;
Assert.IsNotNull(ipEndPoint);
Console.WriteLine("{0}:{1}", ipEndPoint.Address, ipEndPoint.Port);
}
[Test]
public void SentinelGetMasterAddressByNameNegativeTest()
{
var endpoint = Server.SentinelGetMasterAddressByName("FakeServiceName");
Assert.IsNull(endpoint);
}
[Test]
public void SentinelMasterTest()
{
var dict = Server.SentinelMaster(ServiceName).ToDictionary();
Assert.AreEqual(ServiceName, dict["name"]);
foreach (var kvp in dict)
{
Console.WriteLine("{0}:{1}", kvp.Key, kvp.Value);
}
}
[Test]
public void SentinelMastersTest()
{
var masterConfigs = Server.SentinelMasters();
Assert.IsTrue(masterConfigs.First().ToDictionary().ContainsKey("name"));
foreach (var config in masterConfigs)
{
foreach (var kvp in config)
{
Console.WriteLine("{0}:{1}", kvp.Key, kvp.Value);
}
}
}
[Test]
public void SentinelSlavesTest()
{
var slaveConfigs = Server.SentinelSlaves(ServiceName);
if (slaveConfigs.Any())
{
Assert.IsTrue(slaveConfigs.First().ToDictionary().ContainsKey("name"));
}
foreach (var config in slaveConfigs)
{
foreach (var kvp in config) {
Console.WriteLine("{0}:{1}", kvp.Key, kvp.Value);
}
}
}
[Test, Ignore]
public void SentinelFailoverTest()
{
Server.SentinelFailover(ServiceName);
}
}
}
......@@ -94,6 +94,7 @@
<Compile Include="Scans.cs" />
<Compile Include="Scripting.cs" />
<Compile Include="Secure.cs" />
<Compile Include="Sentinel.cs" />
<Compile Include="Sets.cs" />
<Compile Include="SSDB.cs" />
<Compile Include="SSL.cs" />
......
......@@ -44,7 +44,11 @@ private static readonly CommandMap
"get", "set", "del", "incr", "incrby", "mget", "mset", "keys", "getset", "setnx",
"hget", "hset", "hdel", "hincrby", "hkeys", "hvals", "hmget", "hmset", "hlen",
"zscore", "zadd", "zrem", "zrange", "zrangebyscore", "zincrby", "zdecrby", "zcard",
"llen", "lpush", "rpush", "lpop", "rpop", "lrange", "lindex" }, true);
"llen", "lpush", "rpush", "lpop", "rpop", "lrange", "lindex"
}, true),
sentinel = Create(new HashSet<string> {
// see http://redis.io/topics/sentinel
"ping", "info", "sentinel", "subscribe", "psubscribe", "unsubscribe", "punsubscribe" }, true);
private readonly byte[][] map;
internal CommandMap(byte[][] map)
......@@ -68,6 +72,11 @@ internal CommandMap(byte[][] map)
/// <remarks>http://www.ideawu.com/ssdb/docs/redis-to-ssdb.html</remarks>
public static CommandMap SSDB { get { return ssdb; } }
/// <summary>
/// The commands available to <a href="Sentinel">http://redis.io/topics/sentinel</a>
/// </summary>
/// <remarks>http://redis.io/topics/sentinel</remarks>
public static CommandMap Sentinel { get { return sentinel; } }
/// <summary>
/// Create a new CommandMap, customizing some commands
......
......@@ -1129,7 +1129,7 @@ internal async Task<bool> ReconfigureAsync(bool first, bool reconfigureAll, Text
{
attemptsLeft--;
}
int standaloneCount = 0, clusterCount = 0;
int standaloneCount = 0, clusterCount = 0, sentinelCount = 0;
var endpoints = configuration.EndPoints;
LogLocked(log, "{0} unique nodes specified", endpoints.Count);
......@@ -1200,29 +1200,34 @@ internal async Task<bool> ReconfigureAsync(bool first, bool reconfigureAll, Text
servers[i].ClearUnselectable(UnselectableFlags.DidNotRespond);
LogLocked(log, "{0} returned with success", Format.ToString(endpoints[i]));
switch (server.ServerType)
// count the server types
switch (server.ServerType)
{
case ServerType.Twemproxy:
case ServerType.Standalone:
servers[i].ClearUnselectable(UnselectableFlags.ServerType);
standaloneCount++;
if (server.IsSlave)
{
servers[i].ClearUnselectable(UnselectableFlags.RedundantMaster);
}
else
{
masters.Add(server);
}
break;
case ServerType.Sentinel:
sentinelCount++;
break;
case ServerType.Cluster:
servers[i].ClearUnselectable(UnselectableFlags.ServerType);
clusterCount++;
if (server.IsSlave)
break;
}
// set the server UnselectableFlags and update masters list
switch (server.ServerType)
{
case ServerType.Twemproxy:
case ServerType.Sentinel:
case ServerType.Standalone:
case ServerType.Cluster:
servers[i].ClearUnselectable(UnselectableFlags.ServerType);
if (server.IsSlave)
{
servers[i].ClearUnselectable(UnselectableFlags.RedundantMaster);
}
else
}
else
{
masters.Add(server);
}
......@@ -1247,7 +1252,19 @@ internal async Task<bool> ReconfigureAsync(bool first, bool reconfigureAll, Text
if (clusterCount == 0)
{
this.serverSelectionStrategy.ServerType = RawConfig.Proxy == Proxy.Twemproxy ? ServerType.Twemproxy : ServerType.Standalone;
// set the serverSelectionStrategy
if (RawConfig.Proxy == Proxy.Twemproxy)
{
this.serverSelectionStrategy.ServerType = ServerType.Twemproxy;
}
else if (standaloneCount == 0 && sentinelCount > 0)
{
this.serverSelectionStrategy.ServerType = ServerType.Sentinel;
}
else
{
this.serverSelectionStrategy.ServerType = ServerType.Standalone;
}
var preferred = await NominatePreferredMaster(log, servers, useTieBreakers, tieBreakers, masters).ObserveErrors().ForAwait();
foreach (var master in masters)
{
......@@ -1292,7 +1309,7 @@ internal async Task<bool> ReconfigureAsync(bool first, bool reconfigureAll, Text
LogLocked(log, "");
LogLocked(log, stormLog);
}
healthy = standaloneCount != 0 || clusterCount != 0;
healthy = standaloneCount != 0 || clusterCount != 0 || sentinelCount != 0;
if (first && !healthy && attemptsLeft > 0)
{
LogLocked(log, "resetting failing connections to retry...");
......
......@@ -393,6 +393,101 @@ public partial interface IServer : IRedis
/// <returns>The server's current time.</returns>
/// <remarks>http://redis.io/commands/time</remarks>
Task<DateTime> TimeAsync(CommandFlags flags = CommandFlags.None);
#region Sentinel
/// <summary>
/// Returns the ip and port number of the master with that name.
/// If a failover is in progress or terminated successfully for this master it returns the address and port of the promoted slave.
/// </summary>
/// <param name="serviceName">the sentinel service name</param>
/// <param name="flags"></param>
/// <returns>the master ip and port</returns>
/// <remarks>http://redis.io/topics/sentinel</remarks>
EndPoint SentinelGetMasterAddressByName(string serviceName, CommandFlags flags = CommandFlags.None);
/// <summary>
/// Returns the ip and port number of the master with that name.
/// If a failover is in progress or terminated successfully for this master it returns the address and port of the promoted slave.
/// </summary>
/// <param name="serviceName">the sentinel service name</param>
/// <param name="flags"></param>
/// <returns>the master ip and port</returns>
/// <remarks>http://redis.io/topics/sentinel</remarks>
Task<EndPoint> SentinelGetMasterAddressByNameAsync(string serviceName, CommandFlags flags = CommandFlags.None);
/// <summary>
/// Show the state and info of the specified master.
/// </summary>
/// <param name="serviceName">the sentinel service name</param>
/// <param name="flags"></param>
/// <returns>the master state as KeyValuePairs</returns>
/// <remarks>http://redis.io/topics/sentinel</remarks>
KeyValuePair<string, string>[] SentinelMaster(string serviceName, CommandFlags flags = CommandFlags.None);
/// <summary>
/// Force a failover as if the master was not reachable, and without asking for agreement to other Sentinels
/// (however a new version of the configuration will be published so that the other Sentinels will update their configurations).
/// </summary>
/// <param name="serviceName">the sentinel service name</param>
/// <param name="flags"></param>
/// <returns>the master state as KeyValuePairs</returns>
/// <remarks>http://redis.io/topics/sentinel</remarks>
Task<KeyValuePair<string, string>[]> SentinelMasterAsync(string serviceName, CommandFlags flags = CommandFlags.None);
/// <summary>
/// Show a list of monitored masters and their state.
/// </summary>
/// <param name="flags"></param>
/// <returns>an array of master state KeyValuePair arrays</returns>
/// <remarks>http://redis.io/topics/sentinel</remarks>
KeyValuePair<string, string>[][] SentinelMasters(CommandFlags flags = CommandFlags.None);
/// <summary>
/// Show a list of monitored masters and their state.
/// </summary>
/// <param name="flags"></param>
/// <returns>an array of master state KeyValuePair arrays</returns>
/// <remarks>http://redis.io/topics/sentinel</remarks>
Task<KeyValuePair<string, string>[][]> SentinelMastersAsync(CommandFlags flags = CommandFlags.None);
/// <summary>
/// Show a list of slaves for this master, and their state.
/// </summary>
/// <param name="serviceName">the sentinel service name</param>
/// <param name="flags"></param>
/// <returns>an array of slave state KeyValuePair arrays</returns>
/// <remarks>http://redis.io/topics/sentinel</remarks>
KeyValuePair<string, string>[][] SentinelSlaves(string serviceName, CommandFlags flags = CommandFlags.None);
/// <summary>
/// Show a list of slaves for this master, and their state.
/// </summary>
/// <param name="serviceName">the sentinel service name</param>
/// <param name="flags"></param>
/// <returns>an array of slave state KeyValuePair arrays</returns>
/// <remarks>http://redis.io/topics/sentinel</remarks>
Task<KeyValuePair<string, string>[][]> SentinelSlavesAsync(string serviceName, CommandFlags flags = CommandFlags.None);
/// <summary>
/// Force a failover as if the master was not reachable, and without asking for agreement to other Sentinels
/// (however a new version of the configuration will be published so that the other Sentinels will update their configurations).
/// </summary>
/// <param name="serviceName">the sentinel service name</param>
/// <param name="flags"></param>
/// <remarks>http://redis.io/topics/sentinel</remarks>
void SentinelFailover(string serviceName, CommandFlags flags = CommandFlags.None);
/// <summary>
/// Force a failover as if the master was not reachable, and without asking for agreement to other Sentinels
/// (however a new version of the configuration will be published so that the other Sentinels will update their configurations).
/// </summary>
/// <param name="serviceName">the sentinel service name</param>
/// <param name="flags"></param>
/// <remarks>http://redis.io/topics/sentinel</remarks>
Task SentinelFailoverAsync(string serviceName, CommandFlags flags = CommandFlags.None);
#endregion
}
......
......@@ -505,6 +505,7 @@ internal static bool RequiresDatabase(RedisCommand command)
case RedisCommand.SYNC:
case RedisCommand.TIME:
case RedisCommand.UNSUBSCRIBE:
case RedisCommand.SENTINEL:
return false;
default:
return true;
......
......@@ -804,7 +804,7 @@ private RawResult ReadArray(byte[] buffer, ref int offset, ref int count)
if (!itemCount.TryGetInt64(out i64)) throw ExceptionFactory.ConnectionFailure(multiplexer.IncludeDetailInExceptions, ConnectionFailureType.ProtocolFailure, "Invalid array length", bridge.ServerEndPoint);
int itemCountActual = checked((int)i64);
if (itemCountActual == 0) return RawResult.EmptyArray;
if (itemCountActual <= 0) return RawResult.EmptyArray;
var arr = new RawResult[itemCountActual];
for (int i = 0; i < itemCountActual; i++)
......
using System;
using System.Collections.Generic;
using System.Text;
namespace StackExchange.Redis
......@@ -231,6 +232,29 @@ internal RedisValue[] GetItemsAsValues()
}
}
// returns an array of RawResults
internal RawResult[] GetArrayOfRawResults()
{
if (arr == null)
{
return null;
}
else if (arr.Length == 0)
{
return new RawResult[0];
}
else
{
var rawResultArray = new RawResult[arr.Length];
for (int i = 0; i < arr.Length; i++)
{
var rawResult = (RawResult)arr.GetValue(i);
rawResultArray.SetValue(rawResult, i);
}
return rawResultArray;
}
}
internal string GetString()
{
if (arr == null) return null;
......
......@@ -122,6 +122,7 @@ enum RedisCommand
SDIFF,
SDIFFSTORE,
SELECT,
SENTINEL,
SET,
SETBIT,
SETEX,
......
......@@ -61,6 +61,19 @@ public static readonly RedisValue
// Sentinel Literals
MASTERS = "MASTERS",
MASTER = "MASTER",
SLAVES = "SLAVES",
GETMASTERADDRBYNAME = "GET-MASTER-ADDR-BY-NAME",
// RESET = "RESET",
FAILOVER = "FAILOVER",
// Sentinel Literals as of 2.8.4
MONITOR = "MONITOR",
REMOVE = "REMOVE",
// SET = "SET",
// DO NOT CHANGE CASE: these are configuration settings and MUST be as-is
databases = "databases",
no = "no",
......
......@@ -746,5 +746,69 @@ Message CreateMessage(long cursor, bool running)
}
}
}
#region Sentinel
public EndPoint SentinelGetMasterAddressByName(string serviceName, CommandFlags flags = CommandFlags.None)
{
var msg = Message.Create(-1, flags, RedisCommand.SENTINEL, RedisLiterals.GETMASTERADDRBYNAME, (RedisValue)serviceName);
return ExecuteSync(msg, ResultProcessor.SentinelMasterEndpoint);
}
public Task<EndPoint> SentinelGetMasterAddressByNameAsync(string serviceName, CommandFlags flags = CommandFlags.None)
{
var msg = Message.Create(-1, flags, RedisCommand.SENTINEL, RedisLiterals.GETMASTERADDRBYNAME, (RedisValue)serviceName);
return ExecuteAsync(msg, ResultProcessor.SentinelMasterEndpoint);
}
public KeyValuePair<string, string>[] SentinelMaster(string serviceName, CommandFlags flags = CommandFlags.None)
{
var msg = Message.Create(-1, flags, RedisCommand.SENTINEL, RedisLiterals.MASTER, (RedisValue)serviceName);
return ExecuteSync(msg, ResultProcessor.StringPairInterleaved);
}
public Task<KeyValuePair<string, string>[]> SentinelMasterAsync(string serviceName, CommandFlags flags = CommandFlags.None)
{
var msg = Message.Create(-1, flags, RedisCommand.SENTINEL, RedisLiterals.MASTER, (RedisValue)serviceName);
return ExecuteAsync(msg, ResultProcessor.StringPairInterleaved);
}
public void SentinelFailover(string serviceName, CommandFlags flags = CommandFlags.None)
{
var msg = Message.Create(-1, flags, RedisCommand.SENTINEL, RedisLiterals.FAILOVER, (RedisValue)serviceName);
ExecuteSync(msg, ResultProcessor.DemandOK);
}
public Task SentinelFailoverAsync(string serviceName, CommandFlags flags = CommandFlags.None)
{
var msg = Message.Create(-1, flags, RedisCommand.SENTINEL, RedisLiterals.FAILOVER, (RedisValue)serviceName);
return ExecuteAsync(msg, ResultProcessor.DemandOK);
}
public KeyValuePair<string, string>[][] SentinelMasters(CommandFlags flags = CommandFlags.None)
{
var msg = Message.Create(-1, flags, RedisCommand.SENTINEL, RedisLiterals.MASTERS);
return ExecuteSync(msg, ResultProcessor.SentinelArrayOfArrays);
}
public Task<KeyValuePair<string, string>[][]> SentinelMastersAsync(CommandFlags flags = CommandFlags.None)
{
var msg = Message.Create(-1, flags, RedisCommand.SENTINEL, RedisLiterals.MASTERS);
return ExecuteAsync(msg, ResultProcessor.SentinelArrayOfArrays);
}
public KeyValuePair<string, string>[][] SentinelSlaves(string serviceName, CommandFlags flags = CommandFlags.None)
{
var msg = Message.Create(-1, flags, RedisCommand.SENTINEL, RedisLiterals.SLAVES, (RedisValue)serviceName);
return ExecuteSync(msg, ResultProcessor.SentinelArrayOfArrays);
}
public Task<KeyValuePair<string, string>[][]> SentinelSlavesAsync(string serviceName, CommandFlags flags = CommandFlags.None)
{
var msg = Message.Create(-1, flags, RedisCommand.SENTINEL, RedisLiterals.SLAVES, (RedisValue)serviceName);
return ExecuteAsync(msg, ResultProcessor.SentinelArrayOfArrays);
}
#endregion
}
}
......@@ -77,6 +77,17 @@ public static readonly SortedSetEntryArrayProcessor
public static readonly ResultProcessor<string>
String = new StringProcessor(),
ClusterNodesRaw = new ClusterNodesRawProcessor();
#region Sentinel
public static readonly ResultProcessor<EndPoint>
SentinelMasterEndpoint = new SentinelGetMasterAddressByNameProcessor();
public static readonly ResultProcessor<KeyValuePair<string, string>[][]>
SentinelArrayOfArrays = new SentinelArrayOfArraysProcessor();
#endregion
public static readonly ResultProcessor<KeyValuePair<string, string>[]>
StringPairInterleaved = new StringPairInterleavedProcessor();
public static readonly TimeSpanProcessor
......@@ -1153,6 +1164,68 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes
}
}
}
#region Sentinel
sealed class SentinelGetMasterAddressByNameProcessor : ResultProcessor<EndPoint>
{
protected override bool SetResultCore(PhysicalConnection connection, Message message, RawResult result)
{
switch (result.Type)
{
case ResultType.MultiBulk:
var arr = result.GetItemsAsValues();
int port;
if (arr.Count() == 2 && int.TryParse(arr[1], out port))
{
SetResult(message, Format.ParseEndPoint(arr[0], port));
return true;
}
else if (arr.Count() == 0)
{
SetResult(message, null);
return true;
}
break;
}
return false;
}
}
sealed class SentinelArrayOfArraysProcessor : ResultProcessor<KeyValuePair<string, string>[][]>
{
protected override bool SetResultCore(PhysicalConnection connection, Message message, RawResult result)
{
var innerProcessor = StringPairInterleaved as StringPairInterleavedProcessor;
if (innerProcessor == null)
{
return false;
}
switch (result.Type)
{
case ResultType.MultiBulk:
var arrayOfArrays = result.GetArrayOfRawResults();
var returnArray = new KeyValuePair<string, string>[arrayOfArrays.Count()][];
for (int i = 0; i < arrayOfArrays.Count(); i++)
{
var rawInnerArray = arrayOfArrays[i];
KeyValuePair<string, string>[] kvpArray;
innerProcessor.TryParse(rawInnerArray, out kvpArray);
returnArray[i] = kvpArray;
}
SetResult(message, returnArray);
return true;
}
return false;
}
}
#endregion
}
internal abstract class ResultProcessor<T> : ResultProcessor
{
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment