Commit d80ca807 authored by Marc Gravell's avatar Marc Gravell

Twemproxy support

parent 87cb5f21
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using NUnit.Framework;
namespace StackExchange.Redis.Tests.Issues
{
[TestFixture]
public class Issue6 : TestBase
{
[Test]
public void ShouldWorkWithoutEchoOrPing()
{
using(var conn = Create(proxy: Proxy.Twemproxy))
{
Console.WriteLine("config: " + conn.Configuration);
var db = conn.GetDatabase();
var time = db.Ping();
Console.WriteLine("ping time: " + time);
}
}
}
}
...@@ -70,6 +70,7 @@ ...@@ -70,6 +70,7 @@
<Compile Include="Expiry.cs" /> <Compile Include="Expiry.cs" />
<Compile Include="Config.cs" /> <Compile Include="Config.cs" />
<Compile Include="FloatingPoint.cs" /> <Compile Include="FloatingPoint.cs" />
<Compile Include="Issues\Issue6.cs" />
<Compile Include="Keys.cs" /> <Compile Include="Keys.cs" />
<Compile Include="KeysAndValues.cs" /> <Compile Include="KeysAndValues.cs" />
<Compile Include="Lists.cs" /> <Compile Include="Lists.cs" />
......
...@@ -144,7 +144,7 @@ protected IServer GetServer(ConnectionMultiplexer muxer) ...@@ -144,7 +144,7 @@ protected IServer GetServer(ConnectionMultiplexer muxer)
string clientName = null, int? syncTimeout = null, bool? allowAdmin = null, int? keepAlive = null, string clientName = null, int? syncTimeout = null, bool? allowAdmin = null, int? keepAlive = null,
int? connectTimeout = null, string password = null, string tieBreaker = null, TextWriter log = null, int? connectTimeout = null, string password = null, string tieBreaker = null, TextWriter log = null,
bool fail = true, string[] disabledCommands = null, bool checkConnect = true, bool pause = true, string failMessage = null, bool fail = true, string[] disabledCommands = null, bool checkConnect = true, bool pause = true, string failMessage = null,
string channelPrefix = null, bool useSharedSocketManager = true) string channelPrefix = null, bool useSharedSocketManager = true, Proxy? proxy = null)
{ {
if(pause) Thread.Sleep(500); // get a lot of glitches when hammering new socket creations etc; pace it out a bit if(pause) Thread.Sleep(500); // get a lot of glitches when hammering new socket creations etc; pace it out a bit
string configuration = GetConfiguration(); string configuration = GetConfiguration();
...@@ -171,6 +171,7 @@ protected IServer GetServer(ConnectionMultiplexer muxer) ...@@ -171,6 +171,7 @@ protected IServer GetServer(ConnectionMultiplexer muxer)
if (allowAdmin != null) config.AllowAdmin = allowAdmin.Value; if (allowAdmin != null) config.AllowAdmin = allowAdmin.Value;
if (keepAlive != null) config.KeepAlive = keepAlive.Value; if (keepAlive != null) config.KeepAlive = keepAlive.Value;
if (connectTimeout != null) config.ConnectTimeout = connectTimeout.Value; if (connectTimeout != null) config.ConnectTimeout = connectTimeout.Value;
if (proxy != null) config.Proxy = proxy.Value;
var watch = Stopwatch.StartNew(); var watch = Stopwatch.StartNew();
var task = ConnectionMultiplexer.ConnectAsync(config, log ?? Console.Out); var task = ConnectionMultiplexer.ConnectAsync(config, log ?? Console.Out);
if (!task.Wait(config.ConnectTimeout >= (int.MaxValue / 2) ? int.MaxValue : config.ConnectTimeout * 2)) if (!task.Wait(config.ConnectTimeout >= (int.MaxValue / 2) ? int.MaxValue : config.ConnectTimeout * 2))
......
...@@ -9,7 +9,36 @@ namespace StackExchange.Redis ...@@ -9,7 +9,36 @@ namespace StackExchange.Redis
/// </summary> /// </summary>
public sealed class CommandMap public sealed class CommandMap
{ {
private static readonly CommandMap @default = CreateImpl(null); private static readonly CommandMap
@default = CreateImpl(null, null),
twemproxy = CreateImpl(null, exclusions: new HashSet<RedisCommand>
{
// see https://github.com/twitter/twemproxy/blob/master/notes/redis.md
RedisCommand.KEYS, RedisCommand.MIGRATE, RedisCommand.MOVE, RedisCommand.OBJECT, RedisCommand.RANDOMKEY,
RedisCommand.RENAME, RedisCommand.RENAMENX, RedisCommand.SORT, RedisCommand.SCAN,
RedisCommand.BITOP, RedisCommand.MSET, RedisCommand.MSETNX,
RedisCommand.HSCAN,
RedisCommand.BLPOP, RedisCommand.BRPOP, RedisCommand.BRPOPLPUSH, // yeah, me neither!
RedisCommand.SSCAN,
RedisCommand.ZSCAN,
RedisCommand.PSUBSCRIBE, RedisCommand.PUBLISH, RedisCommand.PUNSUBSCRIBE, RedisCommand.SUBSCRIBE, RedisCommand.UNSUBSCRIBE,
RedisCommand.DISCARD, RedisCommand.EXEC, RedisCommand.MULTI, RedisCommand.UNWATCH, RedisCommand.WATCH,
RedisCommand.SCRIPT,
RedisCommand.AUTH, RedisCommand.ECHO, RedisCommand.PING, RedisCommand.QUIT, RedisCommand.SELECT,
RedisCommand.BGREWRITEAOF, RedisCommand.BGSAVE, RedisCommand.CLIENT, RedisCommand.CLUSTER, RedisCommand.CONFIG, RedisCommand.DBSIZE,
RedisCommand.DEBUG, RedisCommand.FLUSHALL, RedisCommand.FLUSHDB, RedisCommand.INFO, RedisCommand.LASTSAVE, RedisCommand.MONITOR, RedisCommand.SAVE,
RedisCommand.SHUTDOWN, RedisCommand.SLAVEOF, RedisCommand.SLOWLOG, RedisCommand.SYNC, RedisCommand.TIME
});
private readonly byte[][] map; private readonly byte[][] map;
internal CommandMap(byte[][] map) internal CommandMap(byte[][] map)
...@@ -21,6 +50,12 @@ internal CommandMap(byte[][] map) ...@@ -21,6 +50,12 @@ internal CommandMap(byte[][] map)
/// </summary> /// </summary>
public static CommandMap Default { get { return @default; } } public static CommandMap Default { get { return @default; } }
/// <summary>
/// The commands available to <a href="twemproxy">https://github.com/twitter/twemproxy</a>
/// </summary>
/// <remarks>https://github.com/twitter/twemproxy/blob/master/notes/redis.md</remarks>
public static CommandMap Twemproxy { get { return twemproxy; } }
/// <summary> /// <summary>
/// Create a new CommandMap, customizing some commands /// Create a new CommandMap, customizing some commands
/// </summary> /// </summary>
...@@ -28,8 +63,65 @@ public static CommandMap Create(Dictionary<string, string> overrides) ...@@ -28,8 +63,65 @@ public static CommandMap Create(Dictionary<string, string> overrides)
{ {
if (overrides == null || overrides.Count == 0) return Default; if (overrides == null || overrides.Count == 0) return Default;
return CreateImpl(overrides); if (ReferenceEquals(overrides.Comparer, StringComparer.OrdinalIgnoreCase) ||
ReferenceEquals(overrides.Comparer, StringComparer.InvariantCultureIgnoreCase))
{
// that's ok; we're happy with ordinal/invariant case-insensitive
// (but not culture-specific insensitive; completely untested)
}
else
{
// need case insensitive
overrides = new Dictionary<string, string>(overrides, StringComparer.OrdinalIgnoreCase);
}
return CreateImpl(overrides, null);
}
/// <summary>
/// Creates a CommandMap by specifying which commands are available or unavailable
/// </summary>
public static CommandMap Create(HashSet<string> commands, bool available = true)
{
if (available)
{
var dictionary = new Dictionary<string, string>(StringComparer.OrdinalIgnoreCase);
// nix everything
foreach (RedisCommand command in Enum.GetValues(typeof(RedisCommand)))
{
dictionary[command.ToString()] = null;
}
if (commands != null)
{
// then include (by removal) the things that are available
foreach (string command in commands)
{
dictionary.Remove(command);
}
}
return CreateImpl(dictionary, null);
}
else
{
HashSet<RedisCommand> exclusions = null;
if (commands != null)
{
// nix the things that are specified
foreach (var command in commands)
{
RedisCommand parsed;
if (Enum.TryParse(command, true, out parsed))
{
(exclusions ?? (exclusions = new HashSet<RedisCommand>())).Add(parsed);
}
}
}
if (exclusions == null || exclusions.Count == 0) return Default;
return CreateImpl(null, exclusions);
}
} }
/// <summary> /// <summary>
/// See Object.ToString() /// See Object.ToString()
/// </summary> /// </summary>
...@@ -69,33 +161,37 @@ internal bool IsAvailable(RedisCommand command) ...@@ -69,33 +161,37 @@ internal bool IsAvailable(RedisCommand command)
return map[(int)command] != null; return map[(int)command] != null;
} }
private static CommandMap CreateImpl(Dictionary<string, string> overrides) private static CommandMap CreateImpl(Dictionary<string, string> caseInsensitiveOverrides, HashSet<RedisCommand> exclusions)
{ {
RedisCommand[] values = (RedisCommand[])Enum.GetValues(typeof(RedisCommand)); var commands = (RedisCommand[])Enum.GetValues(typeof(RedisCommand));
byte[][] map = new byte[values.Length][]; byte[][] map = new byte[commands.Length][];
bool haveDelta = false; bool haveDelta = false;
for (int i = 0; i < values.Length; i++) for (int i = 0; i < commands.Length; i++)
{ {
int idx = (int)values[i]; int idx = (int)commands[i];
string name = values[i].ToString(), value = name; string name = commands[i].ToString(), value = name;
if (overrides != null) if (exclusions != null && exclusions.Contains(commands[i]))
{
map[idx] = null;
}
else
{ {
foreach (var pair in overrides) if (caseInsensitiveOverrides != null)
{ {
if (string.Equals(name, pair.Key, StringComparison.OrdinalIgnoreCase)) string tmp;
if (caseInsensitiveOverrides.TryGetValue(name, out tmp))
{ {
value = pair.Value; value = tmp;
break;
} }
} }
} if (value != name) haveDelta = true;
if (value != name) haveDelta = true;
haveDelta = true; haveDelta = true;
byte[] val = string.IsNullOrWhiteSpace(value) ? null : Encoding.UTF8.GetBytes(value); byte[] val = string.IsNullOrWhiteSpace(value) ? null : Encoding.UTF8.GetBytes(value);
map[idx] = val; map[idx] = val;
}
} }
if (!haveDelta && @default != null) return @default; if (!haveDelta && @default != null) return @default;
......
...@@ -76,7 +76,7 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes ...@@ -76,7 +76,7 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes
{ {
switch(result.Type) switch(result.Type)
{ {
case ResultType.Array: case ResultType.MultiBulk:
var parts = result.GetItems(); var parts = result.GetItems();
CommandTrace[] arr = new CommandTrace[parts.Length]; CommandTrace[] arr = new CommandTrace[parts.Length];
for (int i = 0; i < parts.Length; i++) for (int i = 0; i < parts.Length; i++)
......
...@@ -8,6 +8,22 @@ ...@@ -8,6 +8,22 @@
namespace StackExchange.Redis namespace StackExchange.Redis
{ {
/// <summary>
/// Specifies the proxy that is being used to communicate to redis
/// </summary>
public enum Proxy
{
/// <summary>
/// Direct communication to the redis server(s)
/// </summary>
None,
/// <summary>
/// Communication via <a href="https://github.com/twitter/twemproxy">twemproxy</a>
/// </summary>
Twemproxy
}
/// <summary> /// <summary>
/// The options relevant to a set of redis connections /// The options relevant to a set of redis connections
/// </summary> /// </summary>
...@@ -20,7 +36,7 @@ public sealed class ConfigurationOptions : ICloneable ...@@ -20,7 +36,7 @@ public sealed class ConfigurationOptions : ICloneable
VersionPrefix = "version=", ConnectTimeoutPrefix = "connectTimeout=", PasswordPrefix = "password=", VersionPrefix = "version=", ConnectTimeoutPrefix = "connectTimeout=", PasswordPrefix = "password=",
TieBreakerPrefix = "tiebreaker=", WriteBufferPrefix = "writeBuffer=", SslHostPrefix = "sslHost=", TieBreakerPrefix = "tiebreaker=", WriteBufferPrefix = "writeBuffer=", SslHostPrefix = "sslHost=",
ConfigChannelPrefix = "configChannel=", AbortOnConnectFailPrefix = "abortConnect=", ResolveDnsPrefix = "resolveDns=", ConfigChannelPrefix = "configChannel=", AbortOnConnectFailPrefix = "abortConnect=", ResolveDnsPrefix = "resolveDns=",
ChannelPrefixPrefix = "channelPrefix="; ChannelPrefixPrefix = "channelPrefix=", ProxyPrefix = "proxy=";
private readonly EndPointCollection endpoints = new EndPointCollection(); private readonly EndPointCollection endpoints = new EndPointCollection();
...@@ -31,20 +47,13 @@ public sealed class ConfigurationOptions : ICloneable ...@@ -31,20 +47,13 @@ public sealed class ConfigurationOptions : ICloneable
private bool? allowAdmin, abortOnConnectFail, resolveDns; private bool? allowAdmin, abortOnConnectFail, resolveDns;
private Proxy? proxy;
private CommandMap commandMap;
private string clientName, serviceName, password, tieBreaker, sslHost, configChannel; private string clientName, serviceName, password, tieBreaker, sslHost, configChannel;
private Version defaultVersion; private Version defaultVersion;
private int? keepAlive, syncTimeout, connectTimeout, writeBuffer; private int? keepAlive, syncTimeout, connectTimeout, writeBuffer;
/// <summary>
/// Create a new ConfigurationOptions instance
/// </summary>
public ConfigurationOptions()
{
CommandMap = CommandMap.Default;
}
/// <summary> /// <summary>
/// A LocalCertificateSelectionCallback delegate responsible for selecting the certificate used for authentication; note /// A LocalCertificateSelectionCallback delegate responsible for selecting the certificate used for authentication; note
/// that this cannot be specified in the configuration-string. /// that this cannot be specified in the configuration-string.
...@@ -63,7 +72,12 @@ public ConfigurationOptions() ...@@ -63,7 +72,12 @@ public ConfigurationOptions()
/// Gets or sets the SocketManager instance to be used with these options; if this is null a per-multiplexer /// Gets or sets the SocketManager instance to be used with these options; if this is null a per-multiplexer
/// SocketManager is created automatically. /// SocketManager is created automatically.
/// </summary> /// </summary>
public SocketManager SocketManager { get;set; } public SocketManager SocketManager { get;set; }
/// <summary>
/// Indicates whether admin operations should be allowed
/// </summary>
public Proxy Proxy { get { return proxy.GetValueOrDefault(); } set { proxy = value; } }
/// <summary> /// <summary>
/// Indicates whether admin operations should be allowed /// Indicates whether admin operations should be allowed
...@@ -83,7 +97,24 @@ public ConfigurationOptions() ...@@ -83,7 +97,24 @@ public ConfigurationOptions()
/// <summary> /// <summary>
/// The command-map associated with this configuration /// The command-map associated with this configuration
/// </summary> /// </summary>
public CommandMap CommandMap { get; set; } public CommandMap CommandMap
{
get
{
if (commandMap != null) return commandMap;
switch(Proxy)
{
case Redis.Proxy.Twemproxy:
return CommandMap.Twemproxy;
default:
return CommandMap.Default;
}
}
set {
if (value == null) throw new ArgumentNullException("value");
commandMap = value;
}
}
/// <summary> /// <summary>
/// Channel to use for broadcasting and listening for configuration change notification /// Channel to use for broadcasting and listening for configuration change notification
...@@ -180,7 +211,8 @@ public ConfigurationOptions Clone() ...@@ -180,7 +211,8 @@ public ConfigurationOptions Clone()
configChannel = configChannel, configChannel = configChannel,
abortOnConnectFail = abortOnConnectFail, abortOnConnectFail = abortOnConnectFail,
resolveDns = resolveDns, resolveDns = resolveDns,
CommandMap = CommandMap, proxy = proxy,
commandMap = commandMap,
CertificateValidationCallback = CertificateValidationCallback, CertificateValidationCallback = CertificateValidationCallback,
CertificateSelectionCallback = CertificateSelectionCallback, CertificateSelectionCallback = CertificateSelectionCallback,
ChannelPrefix = ChannelPrefix.Clone(), ChannelPrefix = ChannelPrefix.Clone(),
...@@ -217,7 +249,8 @@ public override string ToString() ...@@ -217,7 +249,8 @@ public override string ToString()
Append(sb, AbortOnConnectFailPrefix, abortOnConnectFail); Append(sb, AbortOnConnectFailPrefix, abortOnConnectFail);
Append(sb, ResolveDnsPrefix, resolveDns); Append(sb, ResolveDnsPrefix, resolveDns);
Append(sb, ChannelPrefixPrefix, (string)ChannelPrefix); Append(sb, ChannelPrefixPrefix, (string)ChannelPrefix);
CommandMap.AppendDeltas(sb); Append(sb, ProxyPrefix, proxy);
if(commandMap != null) commandMap.AppendDeltas(sb);
return sb.ToString(); return sb.ToString();
} }
...@@ -301,9 +334,10 @@ void Clear() ...@@ -301,9 +334,10 @@ void Clear()
allowAdmin = abortOnConnectFail = resolveDns = null; allowAdmin = abortOnConnectFail = resolveDns = null;
defaultVersion = null; defaultVersion = null;
endpoints.Clear(); endpoints.Clear();
commandMap = null;
CertificateSelection = null; CertificateSelection = null;
CertificateValidation = null; CertificateValidation = null;
CommandMap = CommandMap.Default;
ChannelPrefix = default(RedisChannel); ChannelPrefix = default(RedisChannel);
SocketManager = null; SocketManager = null;
} }
...@@ -396,12 +430,16 @@ private void DoParse(string configuration) ...@@ -396,12 +430,16 @@ private void DoParse(string configuration)
{ {
int tmp; int tmp;
if (Format.TryParseInt32(value.Trim(), out tmp)) WriteBuffer = tmp; if (Format.TryParseInt32(value.Trim(), out tmp)) WriteBuffer = tmp;
} else if(IsOption(option, ProxyPrefix))
{
Proxy tmp;
if (Enum.TryParse(option, true, out tmp)) Proxy = tmp;
} }
else if(option[0]=='$') else if(option[0]=='$')
{ {
RedisCommand cmd; RedisCommand cmd;
option = option.Substring(1, idx-1); option = option.Substring(1, idx-1);
if (Enum.TryParse<RedisCommand>(option, true, out cmd)) if (Enum.TryParse(option, true, out cmd))
{ {
if (map == null) map = new Dictionary<string, string>(StringComparer.InvariantCultureIgnoreCase); if (map == null) map = new Dictionary<string, string>(StringComparer.InvariantCultureIgnoreCase);
map[option] = value; map[option] = value;
...@@ -418,7 +456,10 @@ private void DoParse(string configuration) ...@@ -418,7 +456,10 @@ private void DoParse(string configuration)
if (ep != null && !endpoints.Contains(ep)) endpoints.Add(ep); if (ep != null && !endpoints.Contains(ep)) endpoints.Add(ep);
} }
} }
this.CommandMap = CommandMap.Create(map); if (map != null && map.Count != 0)
{
this.CommandMap = CommandMap.Create(map);
}
} }
} }
} }
......
...@@ -793,10 +793,15 @@ private ConnectionMultiplexer(ConfigurationOptions configuration) ...@@ -793,10 +793,15 @@ private ConnectionMultiplexer(ConfigurationOptions configuration)
IncludeDetailInExceptions = true; IncludeDetailInExceptions = true;
this.configuration = configuration; this.configuration = configuration;
this.CommandMap = configuration.CommandMap;
this.CommandMap.AssertAvailable(RedisCommand.PING); var map = this.CommandMap = configuration.CommandMap;
this.CommandMap.AssertAvailable(RedisCommand.ECHO); if (!string.IsNullOrWhiteSpace(configuration.Password)) map.AssertAvailable(RedisCommand.AUTH);
if (!string.IsNullOrWhiteSpace(configuration.Password)) this.CommandMap.AssertAvailable(RedisCommand.AUTH);
if(!map.IsAvailable(RedisCommand.ECHO) && !map.IsAvailable(RedisCommand.PING) && !map.IsAvailable(RedisCommand.TIME))
{ // I mean really, give me a CHANCE! I need *something* to check the server is available to me...
// see also: SendTracer (matching logic)
map.AssertAvailable(RedisCommand.EXISTS);
}
PreserveAsyncOrder = true; // safest default PreserveAsyncOrder = true; // safest default
this.timeoutMilliseconds = configuration.SyncTimeout; this.timeoutMilliseconds = configuration.SyncTimeout;
...@@ -857,6 +862,7 @@ internal static long LastGlobalHeartbeatSecondsAgo ...@@ -857,6 +862,7 @@ internal static long LastGlobalHeartbeatSecondsAgo
/// </summary> /// </summary>
public ISubscriber GetSubscriber(object asyncState = null) public ISubscriber GetSubscriber(object asyncState = null)
{ {
if (RawConfig.Proxy == Proxy.Twemproxy) throw new NotSupportedException("The pub/sub API is not available via twemproxy");
return new RedisSubscriber(this, asyncState); return new RedisSubscriber(this, asyncState);
} }
/// <summary> /// <summary>
...@@ -865,6 +871,7 @@ public ISubscriber GetSubscriber(object asyncState = null) ...@@ -865,6 +871,7 @@ public ISubscriber GetSubscriber(object asyncState = null)
public IDatabase GetDatabase(int db = 0, object asyncState = null) public IDatabase GetDatabase(int db = 0, object asyncState = null)
{ {
if (db < 0) throw new ArgumentOutOfRangeException("db"); if (db < 0) throw new ArgumentOutOfRangeException("db");
if (db != 0 && RawConfig.Proxy == Proxy.Twemproxy) throw new NotSupportedException("Twemproxy only supports database 0");
return new RedisDatabase(this, db, asyncState); return new RedisDatabase(this, db, asyncState);
} }
...@@ -897,7 +904,7 @@ public IServer GetServer(IPAddress host, int port) ...@@ -897,7 +904,7 @@ public IServer GetServer(IPAddress host, int port)
public IServer GetServer(EndPoint endpoint, object asyncState = null) public IServer GetServer(EndPoint endpoint, object asyncState = null)
{ {
if (endpoint == null) throw new ArgumentNullException("endpoint"); if (endpoint == null) throw new ArgumentNullException("endpoint");
if (RawConfig.Proxy == Proxy.Twemproxy) throw new NotSupportedException("The server API is not available via twemproxy");
var server = (ServerEndPoint)servers[endpoint]; var server = (ServerEndPoint)servers[endpoint];
if (server == null) throw new ArgumentException("The specified endpoint is not defined", "endpoint"); if (server == null) throw new ArgumentException("The specified endpoint is not defined", "endpoint");
return new RedisServer(this, server, asyncState); return new RedisServer(this, server, asyncState);
...@@ -1079,10 +1086,10 @@ internal async Task<bool> ReconfigureAsync(bool first, bool reconfigureAll, Text ...@@ -1079,10 +1086,10 @@ internal async Task<bool> ReconfigureAsync(bool first, bool reconfigureAll, Text
} }
foreach (var server in serverSnapshot) foreach (var server in serverSnapshot)
{ {
server.Activate(RedisCommand.ECHO); server.Activate(ConnectionType.Interactive);
if (this.CommandMap.IsAvailable(RedisCommand.SUBSCRIBE)) if (this.CommandMap.IsAvailable(RedisCommand.SUBSCRIBE))
{ {
server.Activate(RedisCommand.SUBSCRIBE); server.Activate(ConnectionType.Subscription);
} }
} }
} }
...@@ -1110,8 +1117,8 @@ internal async Task<bool> ReconfigureAsync(bool first, bool reconfigureAll, Text ...@@ -1110,8 +1117,8 @@ internal async Task<bool> ReconfigureAsync(bool first, bool reconfigureAll, Text
if (reconfigureAll && server.IsConnected) if (reconfigureAll && server.IsConnected)
{ {
LogLocked(log, "Refreshing {0}...", Format.ToString(server.EndPoint)); LogLocked(log, "Refreshing {0}...", Format.ToString(server.EndPoint));
// note that these will be processed synchronously *BEFORE* the PONG is processed, // note that these will be processed synchronously *BEFORE* the tracer is processed,
// so we know that the configuration will be up to date if we see the PONG // so we know that the configuration will be up to date if we see the tracer
server.AutoConfigure(null); server.AutoConfigure(null);
} }
available[i] = server.SendTracer(); available[i] = server.SendTracer();
...@@ -1159,6 +1166,7 @@ internal async Task<bool> ReconfigureAsync(bool first, bool reconfigureAll, Text ...@@ -1159,6 +1166,7 @@ internal async Task<bool> ReconfigureAsync(bool first, bool reconfigureAll, Text
switch (server.ServerType) switch (server.ServerType)
{ {
case ServerType.Twemproxy:
case ServerType.Standalone: case ServerType.Standalone:
servers[i].ClearUnselectable(UnselectableFlags.ServerType); servers[i].ClearUnselectable(UnselectableFlags.ServerType);
standaloneCount++; standaloneCount++;
...@@ -1202,7 +1210,7 @@ internal async Task<bool> ReconfigureAsync(bool first, bool reconfigureAll, Text ...@@ -1202,7 +1210,7 @@ internal async Task<bool> ReconfigureAsync(bool first, bool reconfigureAll, Text
if (clusterCount == 0) if (clusterCount == 0)
{ {
this.serverSelectionStrategy.ServerType = ServerType.Standalone; this.serverSelectionStrategy.ServerType = RawConfig.Proxy == Proxy.Twemproxy ? ServerType.Twemproxy : ServerType.Standalone;
var preferred = await NominatePreferredMaster(log, servers, useTieBreakers, tieBreakers, masters).ObserveErrors().ForAwait(); var preferred = await NominatePreferredMaster(log, servers, useTieBreakers, tieBreakers, masters).ObserveErrors().ForAwait();
foreach (var master in masters) foreach (var master in masters)
{ {
...@@ -1437,12 +1445,16 @@ private bool TryPushMessageToBridge<T>(Message message, ResultProcessor<T> proce ...@@ -1437,12 +1445,16 @@ private bool TryPushMessageToBridge<T>(Message message, ResultProcessor<T> proce
throw ExceptionFactory.MasterOnly(IncludeDetailInExceptions, message.Command, message, server); throw ExceptionFactory.MasterOnly(IncludeDetailInExceptions, message.Command, message, server);
} }
if (server.ServerType == ServerType.Cluster) switch(server.ServerType)
{ {
if (message.GetHashSlot(ServerSelectionStrategy) == ServerSelectionStrategy.MultipleSlots) case ServerType.Cluster:
{ case ServerType.Twemproxy: // strictly speaking twemproxy uses a different hashing algo, but the hash-tag behavior is
throw ExceptionFactory.MultiSlot(IncludeDetailInExceptions, message); // the same, so this does a pretty good job of spotting illegal commands before sending them
} if (message.GetHashSlot(ServerSelectionStrategy) == ServerSelectionStrategy.MultipleSlots)
{
throw ExceptionFactory.MultiSlot(IncludeDetailInExceptions, message);
}
break;
} }
if (!server.IsConnected) if (!server.IsConnected)
{ {
......
...@@ -278,6 +278,9 @@ public static bool IsMasterOnly(RedisCommand command) ...@@ -278,6 +278,9 @@ public static bool IsMasterOnly(RedisCommand command)
{ {
case RedisCommand.APPEND: case RedisCommand.APPEND:
case RedisCommand.BITOP: case RedisCommand.BITOP:
case RedisCommand.BLPOP:
case RedisCommand.BRPOP:
case RedisCommand.BRPOPLPUSH:
case RedisCommand.DECR: case RedisCommand.DECR:
case RedisCommand.DECRBY: case RedisCommand.DECRBY:
case RedisCommand.DEL: case RedisCommand.DEL:
...@@ -305,6 +308,7 @@ public static bool IsMasterOnly(RedisCommand command) ...@@ -305,6 +308,7 @@ public static bool IsMasterOnly(RedisCommand command)
case RedisCommand.MIGRATE: case RedisCommand.MIGRATE:
case RedisCommand.MOVE: case RedisCommand.MOVE:
case RedisCommand.MSET: case RedisCommand.MSET:
case RedisCommand.MSETNX:
case RedisCommand.PERSIST: case RedisCommand.PERSIST:
case RedisCommand.PEXPIRE: case RedisCommand.PEXPIRE:
case RedisCommand.PEXPIREAT: case RedisCommand.PEXPIREAT:
......
...@@ -236,11 +236,8 @@ internal void KeepAlive() ...@@ -236,11 +236,8 @@ internal void KeepAlive()
switch (connectionType) switch (connectionType)
{ {
case ConnectionType.Interactive: case ConnectionType.Interactive:
if (commandMap.IsAvailable(RedisCommand.PING)) msg = serverEndPoint.GetTracerMessage(false);
{ msg.SetSource(ResultProcessor.Tracer, null);
msg = Message.Create(-1, CommandFlags.FireAndForget, RedisCommand.PING);
msg.SetSource(ResultProcessor.DemandPONG, null);
}
break; break;
case ConnectionType.Subscription: case ConnectionType.Subscription:
if (commandMap.IsAvailable(RedisCommand.UNSUBSCRIBE)) if (commandMap.IsAvailable(RedisCommand.UNSUBSCRIBE))
......
...@@ -294,7 +294,7 @@ internal Message GetSelectDatabaseCommand(int targetDatabase, Message message) ...@@ -294,7 +294,7 @@ internal Message GetSelectDatabaseCommand(int targetDatabase, Message message)
var serverEndpoint = bridge.ServerEndPoint; var serverEndpoint = bridge.ServerEndPoint;
int available = serverEndpoint.Databases; int available = serverEndpoint.Databases;
if (!serverEndpoint.HasDatabases) // only db0 is available on cluster if (!serverEndpoint.HasDatabases) // only db0 is available on cluster/twemproxy
{ {
if (targetDatabase != 0) if (targetDatabase != 0)
{ // should never see this, since the API doesn't allow it; thus not too worried about ExceptionFactory { // should never see this, since the API doesn't allow it; thus not too worried about ExceptionFactory
...@@ -538,7 +538,7 @@ int EnsureSpaceAndComputeBytesToRead() ...@@ -538,7 +538,7 @@ int EnsureSpaceAndComputeBytesToRead()
void MatchResult(RawResult result) void MatchResult(RawResult result)
{ {
// check to see if it could be an out-of-band pubsub message // check to see if it could be an out-of-band pubsub message
if (connectionType == ConnectionType.Subscription && result.Type == ResultType.Array) if (connectionType == ConnectionType.Subscription && result.Type == ResultType.MultiBulk)
{ // out of band message does not match to a queued message { // out of band message does not match to a queued message
var items = result.GetItems(); var items = result.GetItems();
if (items.Length >= 3 && items[0].Assert(message)) if (items.Length >= 3 && items[0].Assert(message))
......
...@@ -34,7 +34,7 @@ public RawResult(ResultType resultType, byte[] buffer, int offset, int count) ...@@ -34,7 +34,7 @@ public RawResult(ResultType resultType, byte[] buffer, int offset, int count)
public RawResult(RawResult[] arr) public RawResult(RawResult[] arr)
{ {
if (arr == null) throw new ArgumentNullException("arr"); if (arr == null) throw new ArgumentNullException("arr");
this.resultType = ResultType.Array; this.resultType = ResultType.MultiBulk;
this.offset = 0; this.offset = 0;
this.count = arr.Length; this.count = arr.Length;
this.arr = arr; this.arr = arr;
...@@ -59,7 +59,7 @@ public override string ToString() ...@@ -59,7 +59,7 @@ public override string ToString()
return string.Format("{0}: {1}", resultType, GetString()); return string.Format("{0}: {1}", resultType, GetString());
case ResultType.BulkString: case ResultType.BulkString:
return string.Format("{0}: {1} bytes", resultType, count); return string.Format("{0}: {1} bytes", resultType, count);
case ResultType.Array: case ResultType.MultiBulk:
return string.Format("{0}: {1} items", resultType, count); return string.Format("{0}: {1} items", resultType, count);
default: default:
return "(unknown)"; return "(unknown)";
......
...@@ -16,16 +16,29 @@ internal RedisBase(ConnectionMultiplexer multiplexer, object asyncState) ...@@ -16,16 +16,29 @@ internal RedisBase(ConnectionMultiplexer multiplexer, object asyncState)
this.asyncState = asyncState; this.asyncState = asyncState;
} }
private ResultProcessor.TimingProcessor.TimerMessage GetTimerMessage(CommandFlags flags)
{
// do the best we can with available commands
var map = multiplexer.CommandMap;
if(map.IsAvailable(RedisCommand.PING))
return ResultProcessor.TimingProcessor.CreateMessage(-1, flags, RedisCommand.PING);
if(map.IsAvailable(RedisCommand.TIME))
return ResultProcessor.TimingProcessor.CreateMessage(-1, flags, RedisCommand.TIME);
if (map.IsAvailable(RedisCommand.ECHO))
return ResultProcessor.TimingProcessor.CreateMessage(-1, flags, RedisCommand.ECHO, RedisLiterals.PING);
// as our fallback, we'll do something odd... we'll treat a key like a value, out of sheer desperation
// note: this usually means: twemproxy - in which case we're fine anyway, since the proxy does the routing
return ResultProcessor.TimingProcessor.CreateMessage(0, flags, RedisCommand.EXISTS, (RedisValue)multiplexer.UniqueId);
}
public virtual TimeSpan Ping(CommandFlags flags = CommandFlags.None) public virtual TimeSpan Ping(CommandFlags flags = CommandFlags.None)
{ {
var msg = ResultProcessor.TimingProcessor.CreateMessage(flags, RedisCommand.PING); var msg = GetTimerMessage(flags);
return ExecuteSync(msg, ResultProcessor.ResponseTimer); return ExecuteSync(msg, ResultProcessor.ResponseTimer);
} }
public virtual Task<TimeSpan> PingAsync(CommandFlags flags = CommandFlags.None) public virtual Task<TimeSpan> PingAsync(CommandFlags flags = CommandFlags.None)
{ {
var msg = ResultProcessor.TimingProcessor.CreateMessage(flags, RedisCommand.PING); var msg = GetTimerMessage(flags);
return ExecuteAsync(msg, ResultProcessor.ResponseTimer); return ExecuteAsync(msg, ResultProcessor.ResponseTimer);
} }
......
...@@ -11,6 +11,9 @@ enum RedisCommand ...@@ -11,6 +11,9 @@ enum RedisCommand
BITCOUNT, BITCOUNT,
BITOP, BITOP,
BITPOS, BITPOS,
BLPOP,
BRPOP,
BRPOPLPUSH,
CLIENT, CLIENT,
CLUSTER, CLUSTER,
...@@ -50,6 +53,7 @@ enum RedisCommand ...@@ -50,6 +53,7 @@ enum RedisCommand
HLEN, HLEN,
HMGET, HMGET,
HMSET, HMSET,
HSCAN,
HSET, HSET,
HSETNX, HSETNX,
HVALS, HVALS,
...@@ -78,8 +82,11 @@ enum RedisCommand ...@@ -78,8 +82,11 @@ enum RedisCommand
MONITOR, MONITOR,
MOVE, MOVE,
MSET, MSET,
MSETNX,
MULTI, MULTI,
OBJECT,
PERSIST, PERSIST,
PEXPIRE, PEXPIRE,
PEXPIREAT, PEXPIREAT,
......
...@@ -1688,6 +1688,7 @@ private Message GetStringSetMessage(KeyValuePair<RedisKey, RedisValue>[] values, ...@@ -1688,6 +1688,7 @@ private Message GetStringSetMessage(KeyValuePair<RedisKey, RedisValue>[] values,
case 0: return null; case 0: return null;
case 1: return GetStringSetMessage(values[0].Key, values[0].Value, null, when, flags); case 1: return GetStringSetMessage(values[0].Key, values[0].Value, null, when, flags);
default: default:
WhenAlwaysOrNotExists(when);
int slot = ServerSelectionStrategy.NoSlot, offset = 0; int slot = ServerSelectionStrategy.NoSlot, offset = 0;
var args = new RedisValue[values.Length * 2]; var args = new RedisValue[values.Length * 2];
var serverSelectionStrategy = multiplexer.ServerSelectionStrategy; var serverSelectionStrategy = multiplexer.ServerSelectionStrategy;
...@@ -1697,7 +1698,7 @@ private Message GetStringSetMessage(KeyValuePair<RedisKey, RedisValue>[] values, ...@@ -1697,7 +1698,7 @@ private Message GetStringSetMessage(KeyValuePair<RedisKey, RedisValue>[] values,
args[offset++] = values[i].Value; args[offset++] = values[i].Value;
slot = serverSelectionStrategy.CombineSlot(slot, values[i].Key); slot = serverSelectionStrategy.CombineSlot(slot, values[i].Key);
} }
return Message.CreateInSlot(Db, slot, flags, RedisCommand.MSET, args); return Message.CreateInSlot(Db, slot, flags, when == When.NotExists ? RedisCommand.MSETNX : RedisCommand.MSET, args);
} }
} }
Message GetStringSetMessage(RedisKey key, RedisValue value, TimeSpan? expiry = null, When when = When.Always, CommandFlags flags = CommandFlags.None) Message GetStringSetMessage(RedisKey key, RedisValue value, TimeSpan? expiry = null, When when = When.Always, CommandFlags flags = CommandFlags.None)
...@@ -1780,10 +1781,10 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes ...@@ -1780,10 +1781,10 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes
{ {
switch (result.Type) switch (result.Type)
{ {
case ResultType.Array: case ResultType.MultiBulk:
var arr = result.GetItems(); var arr = result.GetItems();
long i64; long i64;
if (arr.Length == 2 && arr[1].Type == ResultType.Array && arr[0].TryGetInt64(out i64)) if (arr.Length == 2 && arr[1].Type == ResultType.MultiBulk && arr[0].TryGetInt64(out i64))
{ {
var sscanResult = new SetScanResult(i64, arr[1].GetItemsAsValues()); var sscanResult = new SetScanResult(i64, arr[1].GetItemsAsValues());
SetResult(message, sscanResult); SetResult(message, sscanResult);
......
...@@ -51,6 +51,7 @@ public static readonly RedisValue ...@@ -51,6 +51,7 @@ public static readonly RedisValue
LOAD = "LOAD", LOAD = "LOAD",
EXISTS = "EXISTS", EXISTS = "EXISTS",
FLUSH = "FLUSH", FLUSH = "FLUSH",
PING = "PING",
// DO NOT CHANGE CASE: these are configuration settings and MUST be as-is // DO NOT CHANGE CASE: these are configuration settings and MUST be as-is
databases = "databases", databases = "databases",
...@@ -62,8 +63,9 @@ public static readonly RedisValue ...@@ -62,8 +63,9 @@ public static readonly RedisValue
server = "server", server = "server",
Wildcard = "*"; Wildcard = "*";
public static readonly byte[] OK = Encoding.UTF8.GetBytes("OK"); public static readonly byte[] BytesOK = Encoding.UTF8.GetBytes("OK");
public static readonly byte[] ByteWildcard = { (byte)'*' }; public static readonly byte[] ByteWildcard = { (byte)'*' };
public static readonly byte[] BytesPONG = Encoding.UTF8.GetBytes("PONG");
internal static RedisValue Get(Bitwise operation) internal static RedisValue Get(Bitwise operation)
......
...@@ -20,7 +20,7 @@ internal static RedisResult TryCreate(PhysicalConnection connection, RawResult r ...@@ -20,7 +20,7 @@ internal static RedisResult TryCreate(PhysicalConnection connection, RawResult r
case ResultType.SimpleString: case ResultType.SimpleString:
case ResultType.BulkString: case ResultType.BulkString:
return new SingleRedisResult(result.AsRedisValue()); return new SingleRedisResult(result.AsRedisValue());
case ResultType.Array: case ResultType.MultiBulk:
var items = result.GetItems(); var items = result.GetItems();
var arr = new RedisResult[items.Length]; var arr = new RedisResult[items.Length];
for (int i = 0; i < arr.Length; i++) for (int i = 0; i < arr.Length; i++)
......
...@@ -547,10 +547,10 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes ...@@ -547,10 +547,10 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes
{ {
switch (result.Type) switch (result.Type)
{ {
case ResultType.Array: case ResultType.MultiBulk:
var arr = result.GetItems(); var arr = result.GetItems();
long i64; long i64;
if (arr.Length == 2 && arr[1].Type == ResultType.Array && arr[0].TryGetInt64(out i64)) if (arr.Length == 2 && arr[1].Type == ResultType.MultiBulk && arr[0].TryGetInt64(out i64))
{ {
var keysResult = new KeysScanResult(i64, arr[1].GetItemsAsKeys()); var keysResult = new KeysScanResult(i64, arr[1].GetItemsAsKeys());
SetResult(message, keysResult); SetResult(message, keysResult);
......
...@@ -17,7 +17,7 @@ public override TimeSpan Ping(CommandFlags flags = CommandFlags.None) ...@@ -17,7 +17,7 @@ public override TimeSpan Ping(CommandFlags flags = CommandFlags.None)
{ {
// can't use regular PING, but we can unsubscribe from something random that we weren't even subscribed to... // can't use regular PING, but we can unsubscribe from something random that we weren't even subscribed to...
RedisValue channel = Guid.NewGuid().ToByteArray(); RedisValue channel = Guid.NewGuid().ToByteArray();
var msg = ResultProcessor.TimingProcessor.CreateMessage(flags, RedisCommand.UNSUBSCRIBE, channel); var msg = ResultProcessor.TimingProcessor.CreateMessage(-1, flags, RedisCommand.UNSUBSCRIBE, channel);
return ExecuteSync(msg, ResultProcessor.ResponseTimer); return ExecuteSync(msg, ResultProcessor.ResponseTimer);
} }
...@@ -25,7 +25,7 @@ public override Task<TimeSpan> PingAsync(CommandFlags flags = CommandFlags.None) ...@@ -25,7 +25,7 @@ public override Task<TimeSpan> PingAsync(CommandFlags flags = CommandFlags.None)
{ {
// can't use regular PING, but we can unsubscribe from something random that we weren't even subscribed to... // can't use regular PING, but we can unsubscribe from something random that we weren't even subscribed to...
RedisValue channel = Guid.NewGuid().ToByteArray(); RedisValue channel = Guid.NewGuid().ToByteArray();
var msg = ResultProcessor.TimingProcessor.CreateMessage(flags, RedisCommand.UNSUBSCRIBE, channel); var msg = ResultProcessor.TimingProcessor.CreateMessage(-1, flags, RedisCommand.UNSUBSCRIBE, channel);
return ExecuteAsync(msg, ResultProcessor.ResponseTimer); return ExecuteAsync(msg, ResultProcessor.ResponseTimer);
} }
......
...@@ -395,14 +395,14 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes ...@@ -395,14 +395,14 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes
switch (result.Type) switch (result.Type)
{ {
case ResultType.SimpleString: case ResultType.SimpleString:
if (tran.IsAborted && result.Assert(RedisLiterals.OK)) if (tran.IsAborted && result.Assert(RedisLiterals.BytesOK))
{ {
connection.Multiplexer.Trace("Acknowledging UNWATCH (aborted electively)"); connection.Multiplexer.Trace("Acknowledging UNWATCH (aborted electively)");
SetResult(message, false); SetResult(message, false);
return true; return true;
} }
break; break;
case ResultType.Array: case ResultType.MultiBulk:
if (!tran.IsAborted) if (!tran.IsAborted)
{ {
var arr = result.GetItems(); var arr = result.GetItems();
......
...@@ -12,13 +12,13 @@ abstract class ResultProcessor ...@@ -12,13 +12,13 @@ abstract class ResultProcessor
{ {
public static readonly ResultProcessor<bool> public static readonly ResultProcessor<bool>
Boolean = new BooleanProcessor(), Boolean = new BooleanProcessor(),
DemandOK = new ExpectBasicStringProcessor(RedisLiterals.OK), DemandOK = new ExpectBasicStringProcessor(RedisLiterals.BytesOK),
DemandPONG = new ExpectBasicStringProcessor("PONG"), DemandPONG = new ExpectBasicStringProcessor("PONG"),
DemandZeroOrOne = new DemandZeroOrOneProcessor(), DemandZeroOrOne = new DemandZeroOrOneProcessor(),
AutoConfigure = new AutoConfigureProcessor(), AutoConfigure = new AutoConfigureProcessor(),
EstablishConnection = new EstablishConnectionProcessor(),
TrackSubscriptions = new TrackSubscriptionsProcessor(), TrackSubscriptions = new TrackSubscriptionsProcessor(),
Tracer = new TracerProcessor(); Tracer = new TracerProcessor(false),
EstablishConnection = new TracerProcessor(true);
public static readonly ResultProcessor<double> public static readonly ResultProcessor<double>
Double = new DoubleProcessor(); Double = new DoubleProcessor();
...@@ -176,7 +176,7 @@ public sealed class TrackSubscriptionsProcessor : ResultProcessor<bool> ...@@ -176,7 +176,7 @@ public sealed class TrackSubscriptionsProcessor : ResultProcessor<bool>
{ {
protected override bool SetResultCore(PhysicalConnection connection, Message message, RawResult result) protected override bool SetResultCore(PhysicalConnection connection, Message message, RawResult result)
{ {
if(result.Type == ResultType.Array) if(result.Type == ResultType.MultiBulk)
{ {
var items = result.GetItems(); var items = result.GetItems();
long count; long count;
...@@ -192,9 +192,9 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes ...@@ -192,9 +192,9 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes
public sealed class TimingProcessor : ResultProcessor<TimeSpan> public sealed class TimingProcessor : ResultProcessor<TimeSpan>
{ {
public static Message CreateMessage(CommandFlags flags, RedisCommand command, RedisValue value = default(RedisValue)) public static TimerMessage CreateMessage(int db, CommandFlags flags, RedisCommand command, RedisValue value = default(RedisValue))
{ {
return new TimerMessage(flags, command, value); return new TimerMessage(db, flags, command, value);
} }
protected override bool SetResultCore(PhysicalConnection connection, Message message, RawResult result) protected override bool SetResultCore(PhysicalConnection connection, Message message, RawResult result)
{ {
...@@ -203,7 +203,8 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes ...@@ -203,7 +203,8 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes
return false; return false;
} }
else else
{ { // don't check the actual reply; there are multiple ways of constructing
// a timing message, and we don't actually care about what approach was used
var timingMessage = message as TimerMessage; var timingMessage = message as TimerMessage;
TimeSpan duration; TimeSpan duration;
if (timingMessage != null) if (timingMessage != null)
...@@ -221,12 +222,12 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes ...@@ -221,12 +222,12 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes
} }
} }
sealed class TimerMessage : Message internal sealed class TimerMessage : Message
{ {
public readonly Stopwatch Watch; public readonly Stopwatch Watch;
private readonly RedisValue value; private readonly RedisValue value;
public TimerMessage(CommandFlags flags, RedisCommand command, RedisValue value) public TimerMessage(int db, CommandFlags flags, RedisCommand command, RedisValue value)
: base(-1, flags, command) : base(db, flags, command)
{ {
this.Watch = Stopwatch.StartNew(); this.Watch = Stopwatch.StartNew();
this.value = value; this.value = value;
...@@ -356,7 +357,7 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes ...@@ -356,7 +357,7 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes
} }
SetResult(message, true); SetResult(message, true);
return true; return true;
case ResultType.Array: case ResultType.MultiBulk:
if (message != null && message.Command == RedisCommand.CONFIG) if (message != null && message.Command == RedisCommand.CONFIG)
{ {
var arr = result.GetItems(); var arr = result.GetItems();
...@@ -489,7 +490,7 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes ...@@ -489,7 +490,7 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes
switch (result.Type) switch (result.Type)
{ {
case ResultType.SimpleString: case ResultType.SimpleString:
if(result.Assert(RedisLiterals.OK)) if(result.Assert(RedisLiterals.BytesOK))
{ {
SetResult(message, true); SetResult(message, true);
} else } else
...@@ -501,7 +502,7 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes ...@@ -501,7 +502,7 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes
case ResultType.BulkString: case ResultType.BulkString:
SetResult(message, result.GetBoolean()); SetResult(message, result.GetBoolean());
return true; return true;
case ResultType.Array: case ResultType.MultiBulk:
var items = result.GetItems(); var items = result.GetItems();
if(items.Length == 1) if(items.Length == 1)
{ // treat an array of 1 like a single reply (for example, SCRIPT EXISTS) { // treat an array of 1 like a single reply (for example, SCRIPT EXISTS)
...@@ -619,7 +620,7 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes ...@@ -619,7 +620,7 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes
return true; return true;
} }
break; break;
case ResultType.Array: case ResultType.MultiBulk:
var arr = result.GetItems(); var arr = result.GetItems();
switch(arr.Length) switch(arr.Length)
{ {
...@@ -646,45 +647,6 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes ...@@ -646,45 +647,6 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes
} }
} }
sealed class EstablishConnectionProcessor : ResultProcessor<bool>
{
static readonly byte[] expected = Encoding.UTF8.GetBytes("PONG"), authFail = Encoding.UTF8.GetBytes("ERR operation not permitted"),
loading = Encoding.UTF8.GetBytes("LOADING ");
public override bool SetResult(PhysicalConnection connection, Message message, RawResult result)
{
var final = base.SetResult(connection, message, result);
if (result.IsError)
{
if (result.Assert(authFail))
{
connection.RecordConnectionFailed(ConnectionFailureType.AuthenticationFailure);
} else if(result.AssertStarts(loading))
{
connection.RecordConnectionFailed(ConnectionFailureType.Loading);
}
else
{
connection.RecordConnectionFailed(ConnectionFailureType.ProtocolFailure);
}
}
return final;
}
protected override bool SetResultCore(PhysicalConnection connection, Message message, RawResult result)
{
if(result.Assert(expected))
{
connection.Bridge.OnFullyEstablished(connection);
SetResult(message, true);
return true;
}
else
{
connection.RecordConnectionFailed(ConnectionFailureType.ProtocolFailure);
return false;
}
}
}
sealed class ExpectBasicStringProcessor : ResultProcessor<bool> sealed class ExpectBasicStringProcessor : ResultProcessor<bool>
{ {
private readonly byte[] expected; private readonly byte[] expected;
...@@ -797,7 +759,7 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes ...@@ -797,7 +759,7 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes
{ {
switch (result.Type) switch (result.Type)
{ {
case ResultType.Array: case ResultType.MultiBulk:
var arr = result.GetItemsAsKeys(); var arr = result.GetItemsAsKeys();
SetResult(message, arr); SetResult(message, arr);
return true; return true;
...@@ -846,7 +808,7 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes ...@@ -846,7 +808,7 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes
{ {
switch (result.Type) switch (result.Type)
{ {
case ResultType.Array: case ResultType.MultiBulk:
var arr = result.GetItemsAsValues(); var arr = result.GetItemsAsValues();
SetResult(message, arr); SetResult(message, arr);
...@@ -861,7 +823,7 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes ...@@ -861,7 +823,7 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes
{ {
switch (result.Type) switch (result.Type)
{ {
case ResultType.Array: case ResultType.MultiBulk:
var arr = result.GetItems(); var arr = result.GetItems();
RedisChannel[] final; RedisChannel[] final;
if (arr.Length == 0) if (arr.Length == 0)
...@@ -982,7 +944,7 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes ...@@ -982,7 +944,7 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes
{ {
switch (result.Type) switch (result.Type)
{ {
case ResultType.Array: case ResultType.MultiBulk:
var arr = result.GetItems(); var arr = result.GetItems();
int count = arr.Length / 2; int count = arr.Length / 2;
KeyValuePair<TKey, TValue>[] pairs; KeyValuePair<TKey, TValue>[] pairs;
...@@ -1013,7 +975,7 @@ private class SortedSetWithScoresProcessor : ResultProcessor<KeyValuePair<RedisV ...@@ -1013,7 +975,7 @@ private class SortedSetWithScoresProcessor : ResultProcessor<KeyValuePair<RedisV
{ {
protected override bool SetResultCore(PhysicalConnection connection, Message message, RawResult result) protected override bool SetResultCore(PhysicalConnection connection, Message message, RawResult result)
{ {
if(result.Type == ResultType.Array) if(result.Type == ResultType.MultiBulk)
{ {
var items = result.GetItems(); var items = result.GetItems();
var arr = new KeyValuePair<RedisValue, double>[items.Length / 2]; var arr = new KeyValuePair<RedisValue, double>[items.Length / 2];
...@@ -1034,11 +996,67 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes ...@@ -1034,11 +996,67 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes
private class TracerProcessor : ResultProcessor<bool> private class TracerProcessor : ResultProcessor<bool>
{ {
private readonly bool establishConnection;
public TracerProcessor(bool establishConnection)
{
this.establishConnection = establishConnection;
}
protected override bool SetResultCore(PhysicalConnection connection, Message message, RawResult result) protected override bool SetResultCore(PhysicalConnection connection, Message message, RawResult result)
{ {
bool happy = result.Type == ResultType.BulkString && result.Assert(connection.Multiplexer.UniqueId); bool happy;
SetResult(message, happy); switch(message.Command)
return true; // we'll always acknowledge that we saw a non-error response {
case RedisCommand.ECHO:
happy = result.Type == ResultType.BulkString && (!establishConnection || result.Assert(connection.Multiplexer.UniqueId));
break;
case RedisCommand.PING:
happy = result.Type == ResultType.SimpleString && result.Assert(RedisLiterals.BytesPONG);
break;
case RedisCommand.TIME:
happy = result.Type == ResultType.MultiBulk && result.GetItems().Length == 2;
break;
case RedisCommand.EXISTS:
happy = result.Type == ResultType.Integer;
break;
default:
happy = true;
break;
}
if(happy)
{
if(establishConnection) connection.Bridge.OnFullyEstablished(connection);
SetResult(message, happy);
return true;
}
else
{
connection.RecordConnectionFailed(ConnectionFailureType.ProtocolFailure);
return false;
}
}
static readonly byte[] expected = RedisLiterals.BytesPONG, authFail = Encoding.UTF8.GetBytes("ERR operation not permitted"),
loading = Encoding.UTF8.GetBytes("LOADING ");
public override bool SetResult(PhysicalConnection connection, Message message, RawResult result)
{
var final = base.SetResult(connection, message, result);
if (result.IsError)
{
if (result.Assert(authFail))
{
connection.RecordConnectionFailed(ConnectionFailureType.AuthenticationFailure);
}
else if (result.AssertStarts(loading))
{
connection.RecordConnectionFailed(ConnectionFailureType.Loading);
}
else
{
connection.RecordConnectionFailed(ConnectionFailureType.ProtocolFailure);
}
}
return final;
} }
} }
......
...@@ -7,6 +7,6 @@ internal enum ResultType : byte ...@@ -7,6 +7,6 @@ internal enum ResultType : byte
Error = 2, Error = 2,
Integer = 3, Integer = 3,
BulkString = 4, BulkString = 4,
Array = 5 MultiBulk = 5
} }
} }
...@@ -58,6 +58,13 @@ public ServerEndPoint(ConnectionMultiplexer multiplexer, EndPoint endpoint) ...@@ -58,6 +58,13 @@ public ServerEndPoint(ConnectionMultiplexer multiplexer, EndPoint endpoint)
writeEverySeconds = config.KeepAlive; writeEverySeconds = config.KeepAlive;
interactive = CreateBridge(ConnectionType.Interactive); interactive = CreateBridge(ConnectionType.Interactive);
serverType = ServerType.Standalone; serverType = ServerType.Standalone;
// overrides for twemproxy
if (multiplexer.RawConfig.Proxy == Proxy.Twemproxy)
{
databases = 1;
serverType = ServerType.Twemproxy;
}
} }
public ClusterConfiguration ClusterConfiguration { get; private set; } public ClusterConfiguration ClusterConfiguration { get; private set; }
...@@ -126,6 +133,18 @@ public void Dispose() ...@@ -126,6 +133,18 @@ public void Dispose()
if (tmp != null) tmp.Dispose(); if (tmp != null) tmp.Dispose();
} }
public PhysicalBridge GetBridge(ConnectionType type, bool create = true)
{
if (isDisposed) return null;
switch (type)
{
case ConnectionType.Interactive:
return interactive ?? (create ? interactive = CreateBridge(ConnectionType.Interactive) : null);
case ConnectionType.Subscription:
return subscription ?? (create ? subscription = CreateBridge(ConnectionType.Subscription) : null);
}
return null;
}
public PhysicalBridge GetBridge(RedisCommand command, bool create = true) public PhysicalBridge GetBridge(RedisCommand command, bool create = true)
{ {
if (isDisposed) return null; if (isDisposed) return null;
...@@ -195,18 +214,26 @@ public bool TryEnqueue(Message message) ...@@ -195,18 +214,26 @@ public bool TryEnqueue(Message message)
return bridge != null && bridge.TryEnqueue(message, isSlave); return bridge != null && bridge.TryEnqueue(message, isSlave);
} }
internal void Activate(RedisCommand command) internal void Activate(ConnectionType type)
{ {
GetBridge(command, true); GetBridge(type, true);
} }
internal void AutoConfigure(PhysicalConnection connection) internal void AutoConfigure(PhysicalConnection connection)
{ {
if (serverType == ServerType.Twemproxy)
{
// don't try to detect configuration; all the config commands are disabled, and
// the fallback master/slave detection won't help
return;
}
var commandMap = multiplexer.CommandMap; var commandMap = multiplexer.CommandMap;
const CommandFlags flags = CommandFlags.FireAndForget | CommandFlags.HighPriority | CommandFlags.NoRedirect; const CommandFlags flags = CommandFlags.FireAndForget | CommandFlags.HighPriority | CommandFlags.NoRedirect;
var features = GetFeatures(); var features = GetFeatures();
Message msg; Message msg;
if (commandMap.IsAvailable(RedisCommand.CONFIG)) if (commandMap.IsAvailable(RedisCommand.CONFIG))
{ {
if (multiplexer.RawConfig.KeepAlive <= 0) if (multiplexer.RawConfig.KeepAlive <= 0)
...@@ -261,7 +288,7 @@ internal Task Close() ...@@ -261,7 +288,7 @@ internal Task Close()
{ {
var tmp = interactive; var tmp = interactive;
Task result; Task result;
if (tmp == null || !tmp.IsConnected) if (tmp == null || !tmp.IsConnected || !multiplexer.CommandMap.IsAvailable(RedisCommand.QUIT))
{ {
result = CompletedTask<bool>.Default(null); result = CompletedTask<bool>.Default(null);
} }
...@@ -466,10 +493,8 @@ void Handshake(PhysicalConnection connection) ...@@ -466,10 +493,8 @@ void Handshake(PhysicalConnection connection)
multiplexer.Trace("Auto-configure..."); multiplexer.Trace("Auto-configure...");
AutoConfigure(connection); AutoConfigure(connection);
} }
multiplexer.Trace("Sending critical ping"); multiplexer.Trace("Sending critical tracer");
msg = Message.Create(-1, CommandFlags.FireAndForget, RedisCommand.PING); WriteDirectOrQueueFireAndForget(connection, GetTracerMessage(true), ResultProcessor.EstablishConnection);
msg.SetInternalCall();
WriteDirectOrQueueFireAndForget(connection, msg, ResultProcessor.EstablishConnection);
// note: this **must** be the last thing on the subscription handshake, because after this // note: this **must** be the last thing on the subscription handshake, because after this
...@@ -499,9 +524,42 @@ private void SetConfig<T>(ref T field, T value, [CallerMemberName] string caller ...@@ -499,9 +524,42 @@ private void SetConfig<T>(ref T field, T value, [CallerMemberName] string caller
internal Task<bool> SendTracer() internal Task<bool> SendTracer()
{ {
var msg = Message.Create(-1, CommandFlags.NoRedirect | CommandFlags.HighPriority, RedisCommand.ECHO,(RedisValue) multiplexer.UniqueId); return QueueDirectAsync(GetTracerMessage(false), ResultProcessor.Tracer);
}
internal Message GetTracerMessage(bool assertIdentity)
{
// different configurations block certain commands, as can ad-hoc local configurations, so
// we'll do the best with what we have available.
// note that the muxer-ctor asserts that one of ECHO, PING, TIME of GET is available
// see also: TracerProcessor
var map = multiplexer.CommandMap;
Message msg;
const CommandFlags flags = CommandFlags.NoRedirect | CommandFlags.FireAndForget;
if (assertIdentity && map.IsAvailable(RedisCommand.ECHO))
{
msg = Message.Create(-1, flags, RedisCommand.ECHO, (RedisValue)multiplexer.UniqueId);
}
else if (map.IsAvailable(RedisCommand.PING))
{
msg = Message.Create(-1, flags, RedisCommand.PING);
}
else if (map.IsAvailable(RedisCommand.TIME))
{
msg = Message.Create(-1, flags, RedisCommand.TIME);
}
else if(!assertIdentity && map.IsAvailable(RedisCommand.ECHO))
{
// we'll use echo as a PING substitute if it is all we have (in preference to EXISTS)
msg = Message.Create(-1, flags, RedisCommand.ECHO, (RedisValue)multiplexer.UniqueId);
}
else
{
map.AssertAvailable(RedisCommand.EXISTS);
msg = Message.Create(0, flags, RedisCommand.EXISTS, (RedisValue)multiplexer.UniqueId);
}
msg.SetInternalCall(); msg.SetInternalCall();
return QueueDirectAsync(msg, ResultProcessor.Tracer); return msg;
} }
internal int GetOutstandingCount(RedisCommand command, out int inst, out int qu, out int qs, out int qc, out int wr, out int wq) internal int GetOutstandingCount(RedisCommand command, out int inst, out int qu, out int qs, out int qc, out int wr, out int wq)
......
...@@ -108,10 +108,16 @@ public ServerEndPoint Select(Message message) ...@@ -108,10 +108,16 @@ public ServerEndPoint Select(Message message)
{ {
if (message == null) throw new ArgumentNullException("message"); if (message == null) throw new ArgumentNullException("message");
int slot = NoSlot; int slot = NoSlot;
if (serverType == ServerType.Cluster) switch (serverType)
{ {
slot = message.GetHashSlot(this); case ServerType.Cluster:
if (slot == MultipleSlots) throw ExceptionFactory.MultiSlot(multiplexer.IncludeDetailInExceptions, message); case ServerType.Twemproxy: // strictly speaking twemproxy uses a different hashing algo, but the hash-tag behavior is
// the same, so this does a pretty good job of spotting illegal commands before sending them
slot = message.GetHashSlot(this);
if (slot == MultipleSlots) throw ExceptionFactory.MultiSlot(multiplexer.IncludeDetailInExceptions, message);
break;
} }
return Select(slot, message.Command, message.Flags); return Select(slot, message.Command, message.Flags);
} }
......
...@@ -16,6 +16,10 @@ public enum ServerType ...@@ -16,6 +16,10 @@ public enum ServerType
/// <summary> /// <summary>
/// Distributed redis-cluster server /// Distributed redis-cluster server
/// </summary> /// </summary>
Cluster Cluster,
/// <summary>
/// Distributed redis installation via <a href="https://github.com/twitter/twemproxy">twemproxy</a>
/// </summary>
Twemproxy
} }
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment