Commit 8c62c8f7 authored by mgravell's avatar mgravell

add a new cluster kind for redis-cluster-proxy; in most ways it is similar to twemproxy

parent f798868d
......@@ -21,7 +21,7 @@ internal CommandMap(CommandBytes[] map)
public static CommandMap Default { get; } = CreateImpl(null, null);
/// <summary>
/// The commands available to <a href="twemproxy">https://github.com/twitter/twemproxy</a>
/// The commands available to <a href="https://github.com/twitter/twemproxy/">twemproxy</a>
/// </summary>
/// <remarks>https://github.com/twitter/twemproxy/blob/master/notes/redis.md</remarks>
public static CommandMap Twemproxy { get; } = CreateImpl(null, exclusions: new HashSet<RedisCommand>
......@@ -53,6 +53,49 @@ internal CommandMap(CommandBytes[] map)
RedisCommand.SHUTDOWN, RedisCommand.SLAVEOF, RedisCommand.SLOWLOG, RedisCommand.SYNC, RedisCommand.TIME
});
/// <summary>
/// The commands available to <a href="https://github.com/RedisLabs/redis-cluster-proxy/">redis-cluster-proxy</a>
/// </summary>
/// <remarks>https://github.com/RedisLabs/redis-cluster-proxy/</remarks>
public static CommandMap RedisClusterProxy { get; } = CreateImpl(null, exclusions: new HashSet<RedisCommand>
{
// via "proxy command unsupported"
// RedisCommand.ACL,
RedisCommand.ASKING,
RedisCommand.CLIENT,
RedisCommand.CLUSTER,
RedisCommand.CONFIG,
RedisCommand.DEBUG,
// RedisCommand.HELLO,
RedisCommand.INFO,
RedisCommand.LATENCY,
RedisCommand.MEMORY,
RedisCommand.MIGRATE,
// RedisCommand.MODULE,
RedisCommand.MONITOR,
// RedisCommand.PFDEBUG,
// RedisCommand.PFSELFTEST,
RedisCommand.PSUBSCRIBE,
// RedisCommand.PSYNC,
RedisCommand.PUBLISH,
RedisCommand.PUBSUB,
RedisCommand.PUNSUBSCRIBE,
RedisCommand.READONLY,
RedisCommand.READWRITE,
// RedisCommand.REPLCONF,
// RedisCommand.REPLICAOF,
// RedisCommand.ROLE,
RedisCommand.SCRIPT,
RedisCommand.SHUTDOWN,
RedisCommand.SLAVEOF,
RedisCommand.SLOWLOG,
RedisCommand.SUBSCRIBE,
RedisCommand.SYNC,
RedisCommand.TIME,
RedisCommand.UNSUBSCRIBE,
// RedisCommand.WAIT,
});
/// <summary>
/// The commands available to <a href="ssdb">http://www.ideawu.com/ssdb/</a>
/// </summary>
......
......@@ -251,6 +251,8 @@ public CommandMap CommandMap
{
case Proxy.Twemproxy:
return CommandMap.Twemproxy;
case Proxy.RedisClusterProxy:
return CommandMap.RedisClusterProxy;
default:
return CommandMap.Default;
}
......@@ -481,9 +483,16 @@ public ConfigurationOptions Clone()
/// Resolve the default port for any endpoints that did not have a port explicitly specified
/// </summary>
public void SetDefaultPorts()
{
if (Proxy == Proxy.RedisClusterProxy && !Ssl)
{
EndPoints.SetDefaultPorts(7777); // default port for redis-cluster-proxy
}
else
{
EndPoints.SetDefaultPorts(Ssl ? 6380 : 6379);
}
}
/// <summary>
/// Returns the effective configuration string for this configuration, including Redis credentials.
......
......@@ -1248,7 +1248,12 @@ internal long LastHeartbeatSecondsAgo
/// <param name="asyncState">The async state object to pass to the created <see cref="RedisSubscriber"/>.</param>
public ISubscriber GetSubscriber(object asyncState = null)
{
if (RawConfig.Proxy == Proxy.Twemproxy) throw new NotSupportedException("The pub/sub API is not available via twemproxy");
switch (RawConfig.Proxy)
{
case Proxy.Twemproxy:
case Proxy.RedisClusterProxy:
throw new NotSupportedException($"The pub/sub API is not available via {RawConfig.Proxy}");
}
return new RedisSubscriber(this, asyncState);
}
......@@ -1263,7 +1268,15 @@ public IDatabase GetDatabase(int db = -1, object asyncState = null)
db = RawConfig.DefaultDatabase ?? 0;
if (db < 0) throw new ArgumentOutOfRangeException(nameof(db));
if (db != 0 && RawConfig.Proxy == Proxy.Twemproxy) throw new NotSupportedException("Twemproxy only supports database 0");
if (db != 0)
{
switch (RawConfig.Proxy)
{
case Proxy.Twemproxy:
case Proxy.RedisClusterProxy:
throw new NotSupportedException($"{RawConfig.Proxy} only supports database 0");
}
}
// if there's no async-state, and the DB is suitable, we can hand out a re-used instance
return (asyncState == null && db <= MaxCachedDatabaseInstance)
......@@ -1318,7 +1331,12 @@ public IDatabase GetDatabase(int db = -1, object asyncState = null)
public IServer GetServer(EndPoint endpoint, object asyncState = null)
{
if (endpoint == null) throw new ArgumentNullException(nameof(endpoint));
if (RawConfig.Proxy == Proxy.Twemproxy) throw new NotSupportedException("The server API is not available via twemproxy");
switch (RawConfig.Proxy)
{
case Proxy.RedisClusterProxy:
case Proxy.Twemproxy:
throw new NotSupportedException($"The server API is not available via {RawConfig.Proxy}");
}
var server = (ServerEndPoint)servers[endpoint];
if (server == null) throw new ArgumentException("The specified endpoint is not defined", nameof(endpoint));
return new RedisServer(this, server, asyncState);
......@@ -1634,6 +1652,7 @@ internal async Task<bool> ReconfigureAsync(bool first, bool reconfigureAll, LogP
{
case ServerType.Twemproxy:
case ServerType.Standalone:
case ServerType.RedisClusterProxy:
standaloneCount++;
break;
case ServerType.Sentinel:
......@@ -1657,6 +1676,7 @@ internal async Task<bool> ReconfigureAsync(bool first, bool reconfigureAll, LogP
switch (server.ServerType)
{
case ServerType.Twemproxy:
case ServerType.RedisClusterProxy:
case ServerType.Sentinel:
case ServerType.Standalone:
case ServerType.Cluster:
......@@ -1701,11 +1721,16 @@ internal async Task<bool> ReconfigureAsync(bool first, bool reconfigureAll, LogP
if (clusterCount == 0)
{
// set the serverSelectionStrategy
if (RawConfig.Proxy == Proxy.Twemproxy)
switch (RawConfig.Proxy)
{
case Proxy.Twemproxy:
ServerSelectionStrategy.ServerType = ServerType.Twemproxy;
}
else if (standaloneCount == 0 && sentinelCount > 0)
break;
case Proxy.RedisClusterProxy:
ServerSelectionStrategy.ServerType = ServerType.RedisClusterProxy;
break;
default:
if (standaloneCount == 0 && sentinelCount > 0)
{
ServerSelectionStrategy.ServerType = ServerType.Sentinel;
}
......@@ -1713,6 +1738,8 @@ internal async Task<bool> ReconfigureAsync(bool first, bool reconfigureAll, LogP
{
ServerSelectionStrategy.ServerType = ServerType.Standalone;
}
break;
}
var preferred = await NominatePreferredMaster(log, servers, useTieBreakers, tieBreakers, masters).ObserveErrors().ForAwait();
foreach (var master in masters)
{
......
......@@ -12,6 +12,10 @@ public enum Proxy
/// <summary>
/// Communication via <a href="https://github.com/twitter/twemproxy">twemproxy</a>
/// </summary>
Twemproxy
Twemproxy,
/// <summary>
/// Communication via <a href="https://github.com/RedisLabs/redis-cluster-proxy">redis-cluster-proxy</a>
/// </summary>
RedisClusterProxy,
}
}
......@@ -20,6 +20,10 @@ public enum ServerType
/// <summary>
/// Distributed redis installation via <a href="https://github.com/twitter/twemproxy">twemproxy</a>
/// </summary>
Twemproxy
Twemproxy,
/// <summary>
/// Distributed redis installation via <a href="https://github.com/RedisLabs/redis-cluster-proxy">redis-cluster-proxy</a>
/// </summary>
RedisClusterProxy,
}
}
......@@ -57,11 +57,19 @@ public ServerEndPoint(ConnectionMultiplexer multiplexer, EndPoint endpoint)
writeEverySeconds = config.KeepAlive > 0 ? config.KeepAlive : 60;
serverType = ServerType.Standalone;
// overrides for twemproxy
if (multiplexer.RawConfig.Proxy == Proxy.Twemproxy)
// overrides for twemproxy etc
switch (multiplexer.RawConfig.Proxy)
{
case Proxy.Twemproxy:
databases = 1;
serverType = ServerType.Twemproxy;
break;
case Proxy.RedisClusterProxy:
databases = 1;
serverType = ServerType.RedisClusterProxy;
if (version < RedisFeatures.v3_0_0) // cluster is at least 3.0
version = RedisFeatures.v3_0_0;
break;
}
}
......@@ -254,8 +262,10 @@ internal void AddScript(string script, byte[] hash)
internal void AutoConfigure(PhysicalConnection connection)
{
if (serverType == ServerType.Twemproxy)
switch (serverType)
{
case ServerType.Twemproxy:
case ServerType.RedisClusterProxy:
// don't try to detect configuration; all the config commands are disabled, and
// the fallback master/slave detection won't help
return;
......
using System;
using Xunit;
using Xunit.Abstractions;
namespace StackExchange.Redis.Tests
{
public class RedisClusterProxyTests : TestBase
{
public RedisClusterProxyTests(ITestOutputHelper output) : base(output) { }
protected override string GetConfiguration() => "127.0.0.1,proxy=RedisClusterProxy,version=5.0";
[Fact(Skip = "No CI build for this yet")]
public void CanConnectToClusterProxy()
{
using (var conn = Create())
{
var expected = Guid.NewGuid().ToString();
var db = conn.GetDatabase();
RedisKey key = Me();
db.StringSet(key, expected);
var actual = (string)db.StringGet(key);
Assert.Equal(expected, actual);
// check it knows that we're dealing with a cluster
var ex = Assert.Throws<NotSupportedException>(() => conn.GetServer("abc"));
Assert.Equal("The server API is not available via RedisClusterProxy", ex.Message);
ex = Assert.Throws<NotSupportedException>(() => conn.GetSubscriber("abc"));
Assert.Equal("The pub/sub API is not available via RedisClusterProxy", ex.Message);
// test a script
const string LUA_SCRIPT = "return redis.call('info')";
var name = (string)db.ScriptEvaluate(LUA_SCRIPT);
Log($"client: {name}");
// run it twice to check we didn't rely on script hashing (SCRIPT is disabled)
_ = db.ScriptEvaluate(LUA_SCRIPT);
}
}
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment