Commit ed264de5 authored by Marc Gravell's avatar Marc Gravell

rework everything to use CommandBytes rather than having tons of byte[] all...

rework everything to use CommandBytes rather than having tons of byte[] all over the place; move Execute command checking earlier, adding ArgCount concept to allow full check
parent b15c4d04
......@@ -54,8 +54,6 @@ public BufferReader(ReadOnlySequence<byte> buffer)
FetchNextSegment();
}
private static readonly byte[] CRLF = { (byte)'\r', (byte)'\n' };
/// <summary>
/// Note that in results other than success, no guarantees are made about final state; if you care: snapshot
/// </summary>
......@@ -170,6 +168,7 @@ public void Consume(int count)
int totalSkipped = 0;
bool haveTrailingCR = false;
ReadOnlySpan<byte> CRLF = stackalloc byte[2] { (byte)'\r', (byte)'\n' };
do
{
if (reader.RemainingThisSpan == 0) continue;
......
......@@ -4,10 +4,27 @@
namespace StackExchange.Redis
{
public readonly struct CommandBytes : IEquatable<CommandBytes>
internal readonly struct CommandBytes : IEquatable<CommandBytes>
{
private static Encoding Encoding => Encoding.UTF8;
internal unsafe static CommandBytes TrimToFit(string value)
{
if (string.IsNullOrWhiteSpace(value)) return default;
value = value.Trim();
var len = Encoding.GetByteCount(value);
if (len <= MaxLength) return new CommandBytes(value); // all fits
fixed (char* c = value)
{
byte* b = stackalloc byte[ChunkLength * sizeof(ulong)];
var encoder = PhysicalConnection.GetPerThreadEncoder();
encoder.Convert(c, value.Length, b, MaxLength, true, out var maxLen, out _, out var isComplete);
if (!isComplete) maxLen--;
return new CommandBytes(value.Substring(0, maxLen));
}
}
// Uses [n=4] x UInt64 values to store a command payload,
// allowing allocation free storage and efficient
// equality tests. If you're glancing at this and thinking
......@@ -32,8 +49,9 @@ public override int GetHashCode()
public bool Equals(CommandBytes value) => _0 == value._0 && _1 == value._1 && _2 == value._2;
public static bool operator == (CommandBytes x, CommandBytes y) => x.Equals(y);
public static bool operator !=(CommandBytes x, CommandBytes y) => !x.Equals(y);
// note: don't add == operators; with the implicit op above, that invalidates "==null" compiler checks (which should report a failure!)
public static implicit operator CommandBytes(string value) => new CommandBytes(value);
public override unsafe string ToString()
{
......@@ -54,6 +72,17 @@ public unsafe int Length
}
}
}
public bool IsEmpty => _0 == 0L; // cheap way of checking zero length
public unsafe void CopyTo(Span<byte> target)
{
fixed (ulong* uPtr = &_0)
{
byte* bPtr = (byte*)uPtr;
new Span<byte>(bPtr + 1, *bPtr).CopyTo(target);
}
}
public unsafe byte this[int index]
{
get
......@@ -70,9 +99,11 @@ public unsafe int Length
public unsafe CommandBytes(string value)
{
var len = Encoding.GetByteCount(value);
if (len > MaxLength) throw new ArgumentOutOfRangeException("Maximum command length exceeed");
_0 = _1 = _2 = 0L;
if (string.IsNullOrEmpty(value)) return;
var len = Encoding.GetByteCount(value);
if (len > MaxLength) throw new ArgumentOutOfRangeException($"Command '{value}' exceeds library limit of {MaxLength} bytes");
fixed (ulong* uPtr = &_0)
{
byte* bPtr = (byte*)uPtr;
......@@ -86,7 +117,7 @@ public unsafe CommandBytes(string value)
public unsafe CommandBytes(ReadOnlySpan<byte> value)
{
if (value.Length > MaxLength) throw new ArgumentOutOfRangeException("Maximum command length exceeed");
if (value.Length > MaxLength) throw new ArgumentOutOfRangeException("Maximum command length exceeed: " + value.Length + " bytes");
_0 = _1 = _2 = 0L;
fixed (ulong* uPtr = &_0)
{
......
using System;
using System.Collections;
using System.Collections.Generic;
using System.Text;
......@@ -10,9 +9,9 @@ namespace StackExchange.Redis
/// </summary>
public sealed class CommandMap
{
private readonly byte[][] map;
private readonly CommandBytes[] map;
internal CommandMap(byte[][] map)
internal CommandMap(CommandBytes[] map)
{
this.map = map;
}
......@@ -154,55 +153,41 @@ internal void AppendDeltas(StringBuilder sb)
{
for (int i = 0; i < map.Length; i++)
{
var key = ((RedisCommand)i).ToString();
var value = map[i] == null ? "" : Encoding.UTF8.GetString(map[i]);
if (key != value)
var keyString = ((RedisCommand)i).ToString();
var keyBytes = new CommandBytes(keyString);
var value = map[i];
if (!keyBytes.Equals(value))
{
if (sb.Length != 0) sb.Append(',');
sb.Append('$').Append(key).Append('=').Append(value);
sb.Append('$').Append(keyString).Append('=').Append(value);
}
}
}
internal void AssertAvailable(RedisCommand command)
{
if (map[(int)command] == null) throw ExceptionFactory.CommandDisabled(false, command, null, null);
if (map[(int)command].IsEmpty) throw ExceptionFactory.CommandDisabled(command);
}
internal byte[] GetBytes(RedisCommand command) => map[(int)command];
internal CommandBytes GetBytes(RedisCommand command) => map[(int)command];
internal byte[] GetBytes(string command)
internal CommandBytes GetBytes(string command)
{
if (command == null) return null;
if (command == null) return default;
if(Enum.TryParse(command, true, out RedisCommand cmd))
{ // we know that one!
return map[(int)cmd];
}
var bytes = (byte[])_unknownCommands[command];
if(bytes == null)
{
lock(_unknownCommands)
{ // double-checked
bytes = (byte[])_unknownCommands[command];
if(bytes == null)
{
bytes = Encoding.ASCII.GetBytes(command);
_unknownCommands[command] = bytes;
}
return new CommandBytes(command);
}
}
return bytes;
}
private static readonly Hashtable _unknownCommands = new Hashtable();
internal bool IsAvailable(RedisCommand command) => map[(int)command] != null;
internal bool IsAvailable(RedisCommand command) => !map[(int)command].IsEmpty;
private static CommandMap CreateImpl(Dictionary<string, string> caseInsensitiveOverrides, HashSet<RedisCommand> exclusions)
{
var commands = (RedisCommand[])Enum.GetValues(typeof(RedisCommand));
var map = new byte[commands.Length][];
var map = new CommandBytes[commands.Length];
bool haveDelta = false;
for (int i = 0; i < commands.Length; i++)
{
......@@ -211,7 +196,7 @@ private static CommandMap CreateImpl(Dictionary<string, string> caseInsensitiveO
if (exclusions?.Contains(commands[i]) == true)
{
map[idx] = null;
map[idx] = default;
}
else
{
......@@ -222,8 +207,7 @@ private static CommandMap CreateImpl(Dictionary<string, string> caseInsensitiveO
if (value != name) haveDelta = true;
// TODO: bug?
haveDelta = true;
byte[] val = string.IsNullOrWhiteSpace(value) ? null : Encoding.UTF8.GetBytes(value);
map[idx] = val;
map[idx] = new CommandBytes(value);
}
}
if (!haveDelta && Default != null) return Default;
......
......@@ -311,6 +311,7 @@ protected override void WriteImpl(PhysicalConnection physical)
physical.WriteBulkString(value);
}
}
public override int ArgCount => value.IsNull ? 1 : 2;
}
}
......
......@@ -463,7 +463,10 @@ internal void CheckMessage(Message message)
{
if (!configuration.AllowAdmin && message.IsAdmin)
throw ExceptionFactory.AdminModeNotEnabled(IncludeDetailInExceptions, message.Command, message, null);
CommandMap.AssertAvailable(message.Command);
if (message.Command != RedisCommand.UNKNOWN) CommandMap.AssertAvailable(message.Command);
// using >= here because we will be adding 1 for the command itself (which is an arg for the purposes of the multi-bulk protocol)
if (message.ArgCount >= PhysicalConnection.REDIS_MAX_ARGS) throw ExceptionFactory.TooManyArgs(message.CommandAndKey, message.ArgCount);
}
private const string NoContent = "(no content)";
private static void WriteNormalizingLineEndings(string source, StreamWriter writer)
......
......@@ -19,29 +19,13 @@ internal static Exception AdminModeNotEnabled(bool includeDetail, RedisCommand c
return ex;
}
internal static Exception CommandDisabled(bool includeDetail, RedisCommand command, Message message, ServerEndPoint server)
{
string s = GetLabel(includeDetail, command, message);
var ex = new RedisCommandException("This operation has been disabled in the command-map and cannot be used: " + s);
if (includeDetail) AddDetail(ex, message, server, s);
return ex;
}
internal static Exception CommandDisabled(RedisCommand command) => CommandDisabled(command.ToString());
internal static Exception TooManyArgs(bool includeDetail, string command, Message message, ServerEndPoint server, int required)
{
string s = GetLabel(includeDetail, command, message);
var ex = new RedisCommandException($"This operation would involve too many arguments ({required} vs the redis limit of {PhysicalConnection.REDIS_MAX_ARGS}): {s}");
if (includeDetail) AddDetail(ex, message, server, s);
return ex;
}
internal static Exception CommandDisabled(string command)
=> new RedisCommandException("This operation has been disabled in the command-map and cannot be used: " + command);
internal static Exception CommandDisabled(bool includeDetail, string command, Message message, ServerEndPoint server)
{
string s = GetLabel(includeDetail, command, message);
var ex = new RedisCommandException("This operation has been disabled in the command-map and cannot be used: " + s);
if (includeDetail) AddDetail(ex, message, server, s);
return ex;
}
internal static Exception TooManyArgs(string command, int argCount)
=> new RedisCommandException($"This operation would involve too many arguments ({(argCount + 1)} vs the redis limit of {PhysicalConnection.REDIS_MAX_ARGS}): {command}");
internal static Exception ConnectionFailure(bool includeDetail, ConnectionFailureType failureType, string message, ServerEndPoint server)
{
......
......@@ -42,6 +42,7 @@ protected override void WriteImpl(PhysicalConnection physical)
catch { }
tail.WriteTo(physical);
}
public override int ArgCount => tail.ArgCount;
public TextWriter Log => log;
}
......@@ -212,6 +213,9 @@ internal void SetScriptUnavailable()
public bool IsInternalCall => (flags & InternalCallFlag) != 0;
public ResultBox ResultBox => resultBox;
public abstract int ArgCount { get; } // note: over-estimate if necessary
public static Message Create(int db, CommandFlags flags, RedisCommand command)
{
if (command == RedisCommand.SELECT)
......@@ -739,6 +743,7 @@ protected override void WriteImpl(PhysicalConnection physical)
physical.WriteHeader(Command, 1);
physical.Write(Channel);
}
public override int ArgCount => 1;
}
private sealed class CommandChannelValueMessage : CommandChannelBase
......@@ -756,6 +761,7 @@ protected override void WriteImpl(PhysicalConnection physical)
physical.Write(Channel);
physical.WriteBulkString(value);
}
public override int ArgCount => 2;
}
private sealed class CommandKeyKeyKeyMessage : CommandKeyBase
......@@ -783,6 +789,7 @@ protected override void WriteImpl(PhysicalConnection physical)
physical.Write(key1);
physical.Write(key2);
}
public override int ArgCount => 3;
}
private class CommandKeyKeyMessage : CommandKeyBase
......@@ -806,6 +813,7 @@ protected override void WriteImpl(PhysicalConnection physical)
physical.Write(Key);
physical.Write(key1);
}
public override int ArgCount => 2;
}
private sealed class CommandKeyKeysMessage : CommandKeyBase
......@@ -839,6 +847,7 @@ protected override void WriteImpl(PhysicalConnection physical)
physical.Write(keys[i]);
}
}
public override int ArgCount => keys.Length + 1;
}
private sealed class CommandKeyKeyValueMessage : CommandKeyKeyMessage
......@@ -857,6 +866,8 @@ protected override void WriteImpl(PhysicalConnection physical)
physical.Write(key1);
physical.WriteBulkString(value);
}
public override int ArgCount => 3;
}
private sealed class CommandKeyMessage : CommandKeyBase
......@@ -868,6 +879,7 @@ protected override void WriteImpl(PhysicalConnection physical)
physical.WriteHeader(Command, 1);
physical.Write(Key);
}
public override int ArgCount => 1;
}
private sealed class CommandValuesMessage : Message
......@@ -890,6 +902,7 @@ protected override void WriteImpl(PhysicalConnection physical)
physical.WriteBulkString(values[i]);
}
}
public override int ArgCount => values.Length;
}
private sealed class CommandKeysMessage : Message
......@@ -922,6 +935,7 @@ protected override void WriteImpl(PhysicalConnection physical)
physical.Write(keys[i]);
}
}
public override int ArgCount => keys.Length;
}
private sealed class CommandKeyValueMessage : CommandKeyBase
......@@ -939,6 +953,7 @@ protected override void WriteImpl(PhysicalConnection physical)
physical.Write(Key);
physical.WriteBulkString(value);
}
public override int ArgCount => 2;
}
private sealed class CommandKeyValuesKeyMessage : CommandKeyBase
......@@ -969,6 +984,7 @@ protected override void WriteImpl(PhysicalConnection physical)
for (int i = 0; i < values.Length; i++) physical.WriteBulkString(values[i]);
physical.Write(key1);
}
public override int ArgCount => values.Length + 2;
}
private sealed class CommandKeyValuesMessage : CommandKeyBase
......@@ -989,6 +1005,7 @@ protected override void WriteImpl(PhysicalConnection physical)
physical.Write(Key);
for (int i = 0; i < values.Length; i++) physical.WriteBulkString(values[i]);
}
public override int ArgCount => values.Length + 1;
}
private sealed class CommandKeyValueValueMessage : CommandKeyBase
......@@ -1009,6 +1026,7 @@ protected override void WriteImpl(PhysicalConnection physical)
physical.WriteBulkString(value0);
physical.WriteBulkString(value1);
}
public override int ArgCount => 3;
}
private sealed class CommandKeyValueValueValueMessage : CommandKeyBase
......@@ -1032,6 +1050,7 @@ protected override void WriteImpl(PhysicalConnection physical)
physical.WriteBulkString(value1);
physical.WriteBulkString(value2);
}
public override int ArgCount => 4;
}
private sealed class CommandKeyValueValueValueValueMessage : CommandKeyBase
......@@ -1058,6 +1077,7 @@ protected override void WriteImpl(PhysicalConnection physical)
physical.WriteBulkString(value2);
physical.WriteBulkString(value3);
}
public override int ArgCount => 5;
}
private sealed class CommandMessage : Message
......@@ -1067,6 +1087,7 @@ protected override void WriteImpl(PhysicalConnection physical)
{
physical.WriteHeader(Command, 0);
}
public override int ArgCount => 0;
}
private class CommandSlotValuesMessage : Message
......@@ -1098,6 +1119,7 @@ protected override void WriteImpl(PhysicalConnection physical)
physical.WriteBulkString(values[i]);
}
}
public override int ArgCount => values.Length;
}
private sealed class CommandValueChannelMessage : CommandChannelBase
......@@ -1115,6 +1137,7 @@ protected override void WriteImpl(PhysicalConnection physical)
physical.WriteBulkString(value);
physical.Write(Channel);
}
public override int ArgCount => 2;
}
private sealed class CommandValueKeyMessage : CommandKeyBase
......@@ -1139,6 +1162,7 @@ protected override void WriteImpl(PhysicalConnection physical)
physical.WriteBulkString(value);
physical.Write(Key);
}
public override int ArgCount => 2;
}
private sealed class CommandValueMessage : Message
......@@ -1155,6 +1179,7 @@ protected override void WriteImpl(PhysicalConnection physical)
physical.WriteHeader(Command, 1);
physical.WriteBulkString(value);
}
public override int ArgCount => 1;
}
private sealed class CommandValueValueMessage : Message
......@@ -1174,6 +1199,7 @@ protected override void WriteImpl(PhysicalConnection physical)
physical.WriteBulkString(value0);
physical.WriteBulkString(value1);
}
public override int ArgCount => 2;
}
private sealed class CommandValueValueValueMessage : Message
......@@ -1196,6 +1222,7 @@ protected override void WriteImpl(PhysicalConnection physical)
physical.WriteBulkString(value1);
physical.WriteBulkString(value2);
}
public override int ArgCount => 3;
}
private sealed class CommandValueValueValueValueValueMessage : Message
......@@ -1224,6 +1251,7 @@ protected override void WriteImpl(PhysicalConnection physical)
physical.WriteBulkString(value3);
physical.WriteBulkString(value4);
}
public override int ArgCount => 5;
}
private sealed class SelectMessage : Message
......@@ -1237,6 +1265,7 @@ protected override void WriteImpl(PhysicalConnection physical)
physical.WriteHeader(Command, 1);
physical.WriteBulkString(Db);
}
public override int ArgCount => 1;
}
}
}
......@@ -26,7 +26,6 @@ internal sealed partial class PhysicalConnection : IDisposable
private const int DefaultRedisDatabaseCount = 16;
private static readonly byte[] Crlf = Encoding.ASCII.GetBytes("\r\n");
//private static readonly AsyncCallback endRead = result =>
//{
......@@ -43,7 +42,7 @@ internal sealed partial class PhysicalConnection : IDisposable
// }
//};
private static readonly byte[] message = Encoding.UTF8.GetBytes("message"), pmessage = Encoding.UTF8.GetBytes("pmessage");
private static readonly CommandBytes message = "message", pmessage = "pmessage";
private static readonly Message[] ReusableChangeDatabaseCommands = Enumerable.Range(0, DefaultRedisDatabaseCount).Select(
i => Message.Create(i, CommandFlags.FireAndForget, RedisCommand.SELECT)).ToArray();
......@@ -607,35 +606,32 @@ internal static void WriteBulkString(RedisValue value, PipeWriter output)
}
}
internal void WriteHeader(RedisCommand command, int arguments)
{
var bridge = BridgeCouldBeNull;
if (bridge == null) throw new ObjectDisposedException(physicalName);
var commandBytes = bridge.Multiplexer.CommandMap.GetBytes(command);
if (commandBytes == null)
{
throw ExceptionFactory.CommandDisabled(IncludeDetailInExceptions, command, null, bridge.ServerEndPoint);
}
WriteHeader(commandBytes, arguments);
}
internal const int REDIS_MAX_ARGS = 1024 * 1024; // there is a <= 1024*1024 max constraint inside redis itself: https://github.com/antirez/redis/blob/6c60526db91e23fb2d666fc52facc9a11780a2a3/src/networking.c#L1024
internal void WriteHeader(string command, int arguments)
internal void WriteHeader(RedisCommand command, int arguments, CommandBytes commandBytes = default)
{
var bridge = BridgeCouldBeNull;
if (bridge == null) throw new ObjectDisposedException(physicalName);
if (arguments >= REDIS_MAX_ARGS) // using >= here because we will be adding 1 for the command itself (which is an arg for the purposes of the multi-bulk protocol)
if (command == RedisCommand.UNKNOWN)
{
throw ExceptionFactory.TooManyArgs(bridge.Multiplexer.IncludeDetailInExceptions, command, null, bridge.ServerEndPoint, arguments + 1);
// using >= here because we will be adding 1 for the command itself (which is an arg for the purposes of the multi-bulk protocol)
if (arguments >= REDIS_MAX_ARGS) throw ExceptionFactory.TooManyArgs(commandBytes.ToString(), arguments);
}
var commandBytes = bridge.Multiplexer.CommandMap.GetBytes(command);
WriteHeader(commandBytes, arguments);
else
{
// using >= here because we will be adding 1 for the command itself (which is an arg for the purposes of the multi-bulk protocol)
if (arguments >= REDIS_MAX_ARGS) throw ExceptionFactory.TooManyArgs(command.ToString(), arguments);
// for everything that isn't custom commands: ask the muxer for the actual bytes
commandBytes = bridge.Multiplexer.CommandMap.GetBytes(command);
}
private void WriteHeader(byte[] commandBytes, int arguments)
{
// in theory we should never see this; CheckMessage dealt with "regular" messages, and
// ExecuteMessage should have dealt with everything else
if (commandBytes.IsEmpty) throw ExceptionFactory.CommandDisabled(command);
// remember the time of the first write that still not followed by read
Interlocked.CompareExchange(ref firstUnansweredWriteTickCount, Environment.TickCount, 0);
......@@ -647,7 +643,7 @@ private void WriteHeader(byte[] commandBytes, int arguments)
int offset = WriteRaw(span, arguments + 1, offset: 1);
offset = AppendToSpanBlob(span, commandBytes, offset: offset);
offset = AppendToSpanCommand(span, commandBytes, offset: offset);
_ioPipe.Output.Advance(offset);
}
......@@ -789,14 +785,14 @@ internal WriteResult WakeWriterAndCheckForThrottle()
}
}
private static readonly byte[] NullBulkString = Encoding.ASCII.GetBytes("$-1\r\n"), EmptyBulkString = Encoding.ASCII.GetBytes("$0\r\n\r\n");
private static readonly ReadOnlyMemory<byte> NullBulkString = Encoding.ASCII.GetBytes("$-1\r\n"), EmptyBulkString = Encoding.ASCII.GetBytes("$0\r\n\r\n");
private static void WriteUnifiedBlob(PipeWriter writer, byte[] value)
{
if (value == null)
{
// special case:
writer.Write(NullBulkString);
writer.Write(NullBulkString.Span);
}
else
{
......@@ -813,7 +809,7 @@ private static void WriteUnifiedSpan(PipeWriter writer, ReadOnlySpan<byte> value
if (value.Length == 0)
{
// special case:
writer.Write(EmptyBulkString);
writer.Write(EmptyBulkString.Span);
}
else if (value.Length <= MaxQuickSpanSize)
{
......@@ -836,17 +832,14 @@ private static void WriteUnifiedSpan(PipeWriter writer, ReadOnlySpan<byte> value
}
}
private static int AppendToSpanBlob(Span<byte> span, byte[] value, int offset = 0)
private static int AppendToSpanCommand(Span<byte> span, CommandBytes value, int offset = 0)
{
span[offset++] = (byte)'$';
if (value == null)
{
offset = WriteRaw(span, -1, offset: offset); // note that not many things like this...
}
else
{
offset = AppendToSpanSpan(span, new ReadOnlySpan<byte>(value), offset);
}
int len = value.Length;
offset = WriteRaw(span, len, offset: offset);
value.CopyTo(span.Slice(offset, len));
offset += value.Length;
offset = WriteCrlf(span, offset);
return offset;
}
......@@ -864,7 +857,7 @@ internal void WriteSha1AsHex(byte[] value)
var writer = _ioPipe.Output;
if (value == null)
{
writer.Write(NullBulkString);
writer.Write(NullBulkString.Span);
}
else if (value.Length == ResultProcessor.ScriptLoadProcessor.Sha1HashLength)
{
......@@ -906,7 +899,7 @@ internal static void WriteUnifiedPrefixedString(PipeWriter writer, byte[] prefix
if (value == null)
{
// special case
writer.Write(NullBulkString);
writer.Write(NullBulkString.Span);
}
else
{
......@@ -919,7 +912,7 @@ internal static void WriteUnifiedPrefixedString(PipeWriter writer, byte[] prefix
if (totalLength == 0)
{
// special-case
writer.Write(EmptyBulkString);
writer.Write(EmptyBulkString.Span);
}
else
{
......@@ -937,7 +930,7 @@ internal static void WriteUnifiedPrefixedString(PipeWriter writer, byte[] prefix
[ThreadStatic]
static Encoder s_PerThreadEncoder;
static Encoder GetPerThreadEncoder()
internal static Encoder GetPerThreadEncoder()
{
var encoder = s_PerThreadEncoder;
if(encoder == null)
......@@ -1189,7 +1182,7 @@ private void MatchResult(RawResult result)
EndPoint blame = null;
try
{
if (!items[2].IsEqual(RedisLiterals.ByteWildcard))
if (!items[2].IsEqual(CommonReplies.wildcard))
{
blame = Format.TryParseEndPoint(items[2].GetString());
}
......
......@@ -13,7 +13,7 @@ internal sealed class ProfiledCommand : IProfiledCommand
public int Db => Message.Db;
public string Command => Message is RedisDatabase.ExecuteMessage em ? em.Command : Message.Command.ToString();
public string Command => Message is RedisDatabase.ExecuteMessage em ? em.Command.ToString() : Message.Command.ToString();
public CommandFlags Flags => Message.Flags;
......
......@@ -142,7 +142,7 @@ internal RedisChannel AsRedisChannel(byte[] channelPrefix, RedisChannel.PatternM
{
return new RedisChannel(GetBlob(), mode);
}
if (AssertStarts(channelPrefix))
if (StartsWith(channelPrefix))
{
byte[] copy = _payload.Slice(channelPrefix.Length).ToArray();
return new RedisChannel(copy, mode);
......@@ -195,6 +195,12 @@ internal void Recycle(int limit = -1)
}
}
internal bool IsEqual(CommandBytes expected)
{
if (expected.Length != _payload.Length) return false;
return new CommandBytes(_payload).Equals(expected);
}
internal unsafe bool IsEqual(byte[] expected)
{
if (expected == null) throw new ArgumentNullException(nameof(expected));
......@@ -216,7 +222,15 @@ internal unsafe bool IsEqual(byte[] expected)
return true;
}
internal bool AssertStarts(byte[] expected)
internal bool StartsWith(CommandBytes expected)
{
var len = expected.Length;
if (len > _payload.Length) return false;
var rangeToCheck = _payload.Slice(0, len);
return rangeToCheck.Equals(expected);
}
internal bool StartsWith(byte[] expected)
{
if (expected == null) throw new ArgumentNullException(nameof(expected));
if (expected.Length > _payload.Length) return false;
......
......@@ -706,6 +706,16 @@ protected override void WriteImpl(PhysicalConnection physical)
if (isCopy) physical.WriteBulkString(RedisLiterals.COPY);
if (isReplace) physical.WriteBulkString(RedisLiterals.REPLACE);
}
public override int ArgCount
{
get
{
bool isCopy = (migrateOptions & MigrateOptions.Copy) != 0;
bool isReplace = (migrateOptions & MigrateOptions.Replace) != 0;
return 5 + (isCopy ? 1 : 0) + (isReplace ? 1 : 0);
}
}
}
public bool KeyMove(RedisKey key, int database, CommandFlags flags = CommandFlags.None)
......@@ -1092,7 +1102,7 @@ public RedisResult Execute(string command, params object[] args)
=> Execute(command, args, CommandFlags.None);
public RedisResult Execute(string command, ICollection<object> args, CommandFlags flags = CommandFlags.None)
{
var msg = new ExecuteMessage(Database, flags, command, args);
var msg = new ExecuteMessage(multiplexer?.CommandMap, Database, flags, command, args);
return ExecuteSync(msg, ResultProcessor.ScriptResult);
}
......@@ -1100,7 +1110,7 @@ public Task<RedisResult> ExecuteAsync(string command, params object[] args)
=> ExecuteAsync(command, args, CommandFlags.None);
public Task<RedisResult> ExecuteAsync(string command, ICollection<object> args, CommandFlags flags = CommandFlags.None)
{
var msg = new ExecuteMessage(Database, flags, command, args);
var msg = new ExecuteMessage(multiplexer?.CommandMap, Database, flags, command, args);
return ExecuteAsync(msg, ResultProcessor.ScriptResult);
}
......@@ -3172,6 +3182,7 @@ protected override void WriteImpl(PhysicalConnection physical)
physical.WriteBulkString(RedisLiterals.LOAD);
physical.WriteBulkString((RedisValue)Script);
}
public override int ArgCount => 2;
}
private sealed class HashScanResultProcessor : ScanResultProcessor<HashEntry>
......@@ -3210,20 +3221,25 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes
internal sealed class ExecuteMessage : Message
{
private readonly string _command;
private readonly ICollection<object> args;
private readonly CommandBytes _command;
private readonly ICollection<object> _args;
public new string Command => _command;
public ExecuteMessage(int db, CommandFlags flags, string command, ICollection<object> args) : base(db, flags, RedisCommand.UNKNOWN)
public new CommandBytes Command => _command;
public ExecuteMessage(CommandMap map, int db, CommandFlags flags, string command, ICollection<object> args) : base(db, flags, RedisCommand.UNKNOWN)
{
_command = command;
this.args = args ?? Array.Empty<object>();
_args = args ?? Array.Empty<object>();
if (args.Count >= PhysicalConnection.REDIS_MAX_ARGS) // using >= here because we will be adding 1 for the command itself (which is an arg for the purposes of the multi-bulk protocol)
{
throw ExceptionFactory.TooManyArgs(command, args.Count);
}
_command = map?.GetBytes(command) ?? default;
if (_command.IsEmpty) throw ExceptionFactory.CommandDisabled(command);
}
protected override void WriteImpl(PhysicalConnection physical)
{
physical.WriteHeader(_command, args.Count);
foreach (object arg in args)
physical.WriteHeader(RedisCommand.UNKNOWN, _args.Count, _command);
foreach (object arg in _args)
{
if (arg is RedisKey key)
{
......@@ -3242,12 +3258,12 @@ protected override void WriteImpl(PhysicalConnection physical)
}
}
public override string CommandAndKey => _command;
public override string CommandAndKey => _command.ToString();
public override int GetHashSlot(ServerSelectionStrategy serverSelectionStrategy)
{
int slot = ServerSelectionStrategy.NoSlot;
foreach (object arg in args)
foreach (object arg in _args)
{
if (arg is RedisKey key)
{
......@@ -3256,6 +3272,7 @@ public override int GetHashSlot(ServerSelectionStrategy serverSelectionStrategy)
}
return slot;
}
public override int ArgCount => _args.Count;
}
private sealed class ScriptEvalMessage : Message, IMultiMessage
......@@ -3347,6 +3364,7 @@ protected override void WriteImpl(PhysicalConnection physical)
for (int i = 0; i < values.Length; i++)
physical.WriteBulkString(values[i]);
}
public override int ArgCount => 2 + keys.Length + values.Length;
}
private sealed class SetScanResultProcessor : ScanResultProcessor<RedisValue>
......@@ -3392,6 +3410,7 @@ protected override void WriteImpl(PhysicalConnection physical)
for (int i = 0; i < values.Length; i++)
physical.WriteBulkString(values[i]);
}
public override int ArgCount => 2 + keys.Length + values.Length;
}
private sealed class SortedSetScanResultProcessor : ScanResultProcessor<SortedSetEntry>
......@@ -3446,6 +3465,7 @@ protected override void WriteImpl(PhysicalConnection physical)
physical.WriteHeader(command, 1);
physical.Write(Key);
}
public override int ArgCount => 1;
}
private class StringGetWithExpiryProcessor : ResultProcessor<RedisValueWithExpiry>
......
......@@ -3,6 +3,29 @@
namespace StackExchange.Redis
{
internal static class CommonReplies
{
public static readonly CommandBytes
ASK = "ASK ",
authFail_trimmed = CommandBytes.TrimToFit("ERR operation not permitted"),
backgroundSavingStarted_trimmed = CommandBytes.TrimToFit("Background saving started"),
databases = "databases",
loading = "LOADING ",
MOVED = "MOVED ",
NOAUTH = "NOAUTH ",
NOSCRIPT = "NOSCRIPT ",
no = "no",
OK = "OK",
one = "1",
PONG = "PONG",
QUEUED = "QUEUED",
READONLY = "READONLY ",
slave_read_only = "slave-read-only",
timeout = "timeout",
wildcard = "*",
yes = "yes",
zero = "0";
}
internal static class RedisLiterals
{
// unlike primary commands, these do not get altered by the command-map; we may as
......@@ -65,15 +88,20 @@ public static readonly RedisValue
MASTER = "MASTER",
SLAVES = "SLAVES",
GETMASTERADDRBYNAME = "GET-MASTER-ADDR-BY-NAME",
// RESET = "RESET",
// RESET = "RESET",
FAILOVER = "FAILOVER",
// Sentinel Literals as of 2.8.4
MONITOR = "MONITOR",
REMOVE = "REMOVE",
// SET = "SET",
// SET = "SET",
// DO NOT CHANGE CASE: these are configuration settings and MUST be as-is
MinusSymbol = "-",
PlusSumbol = "+",
Wildcard = "*",
// misc (config, etc)
databases = "databases",
no = "no",
normal = "normal",
......@@ -83,19 +111,11 @@ public static readonly RedisValue
slave = "slave",
slave_read_only = "slave-read-only",
timeout = "timeout",
yes = "yes",
MinusSymbol = "-",
PlusSumbol = "+",
Wildcard = "*";
yes = "yes";
public static readonly byte[] BytesOK = Encoding.UTF8.GetBytes("OK");
public static readonly byte[] BytesPONG = Encoding.UTF8.GetBytes("PONG");
public static readonly byte[] BytesBackgroundSavingStarted = Encoding.UTF8.GetBytes("Background saving started");
public static readonly byte[] ByteWildcard = { (byte)'*' };
internal static RedisValue Get(Bitwise operation)
{
switch(operation)
switch (operation)
{
case Bitwise.And: return AND;
case Bitwise.Or: return OR;
......
......@@ -70,7 +70,7 @@ private Message GetClientKillMessage(EndPoint endpoint, long? id, ClientType? cl
{
RedisLiterals.KILL
};
if(id != null)
if (id != null)
{
parts.Add(RedisLiterals.ID);
parts.Add(id.Value);
......@@ -78,7 +78,7 @@ private Message GetClientKillMessage(EndPoint endpoint, long? id, ClientType? cl
if (clientType != null)
{
parts.Add(RedisLiterals.TYPE);
switch(clientType.Value)
switch (clientType.Value)
{
case ClientType.Normal:
parts.Add(RedisLiterals.normal);
......@@ -99,7 +99,7 @@ private Message GetClientKillMessage(EndPoint endpoint, long? id, ClientType? cl
parts.Add(RedisLiterals.ADDR);
parts.Add((RedisValue)Format.ToString(endpoint));
}
if(!skipMe)
if (!skipMe)
{
parts.Add(RedisLiterals.SKIPME);
parts.Add(RedisLiterals.no);
......@@ -649,7 +649,7 @@ private void FixFlags(Message message, ServerEndPoint server)
private Message GetSaveMessage(SaveType type, CommandFlags flags = CommandFlags.None)
{
switch(type)
switch (type)
{
case SaveType.BackgroundRewriteAppendOnlyFile: return Message.Create(-1, flags, RedisCommand.BGREWRITEAOF);
case SaveType.BackgroundSave: return Message.Create(-1, flags, RedisCommand.BGSAVE);
......@@ -675,19 +675,17 @@ private ResultProcessor<bool> GetSaveResultProcessor(SaveType type)
private static class ScriptHash
{
private static readonly byte[] hex = {
(byte)'0', (byte)'1', (byte)'2', (byte)'3', (byte)'4', (byte)'5', (byte)'6', (byte)'7',
(byte)'8', (byte)'9', (byte)'a', (byte)'b', (byte)'c', (byte)'d', (byte)'e', (byte)'f' };
public static RedisValue Encode(byte[] value)
{
const string hex = "0123456789abcdef";
if (value == null) return default(RedisValue);
var result = new byte[value.Length * 2];
int offset = 0;
for (int i = 0; i < value.Length; i++)
{
int val = value[i];
result[offset++] = hex[val / 16];
result[offset++] = hex[val % 16];
result[offset++] = (byte)hex[val >> 4];
result[offset++] = (byte)hex[val & 15];
}
return result;
}
......@@ -830,7 +828,7 @@ public Task SentinelFailoverAsync(string serviceName, CommandFlags flags = Comma
public RedisResult Execute(string command, ICollection<object> args, CommandFlags flags = CommandFlags.None)
{
var msg = new RedisDatabase.ExecuteMessage(-1, flags, command, args);
var msg = new RedisDatabase.ExecuteMessage(multiplexer?.CommandMap, -1, flags, command, args);
return ExecuteSync(msg, ResultProcessor.ScriptResult);
}
......@@ -838,7 +836,7 @@ public RedisResult Execute(string command, ICollection<object> args, CommandFlag
public Task<RedisResult> ExecuteAsync(string command, ICollection<object> args, CommandFlags flags = CommandFlags.None)
{
var msg = new RedisDatabase.ExecuteMessage(-1, flags, command, args);
var msg = new RedisDatabase.ExecuteMessage(multiplexer?.CommandMap, -1, flags, command, args);
return ExecuteAsync(msg, ResultProcessor.ScriptResult);
}
......
......@@ -151,15 +151,16 @@ protected override void WriteImpl(PhysicalConnection physical)
Wrapped.WriteTo(physical);
Wrapped.SetRequestSent();
}
public override int ArgCount => Wrapped.ArgCount;
}
private class QueuedProcessor : ResultProcessor<bool>
{
public static readonly ResultProcessor<bool> Default = new QueuedProcessor();
private static readonly byte[] QUEUED = Encoding.UTF8.GetBytes("QUEUED");
protected override bool SetResultCore(PhysicalConnection connection, Message message, RawResult result)
{
if (result.Type == ResultType.SimpleString && result.IsEqual(QUEUED))
if (result.Type == ResultType.SimpleString && result.IsEqual(CommonReplies.QUEUED))
{
if (message is QueuedMessage q)
{
......@@ -351,6 +352,7 @@ protected override void WriteImpl(PhysicalConnection physical)
{
physical.WriteHeader(Command, 0);
}
public override int ArgCount => 0;
private bool AreAllConditionsSatisfied(ConnectionMultiplexer multiplexer)
{
......@@ -400,7 +402,7 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes
switch (result.Type)
{
case ResultType.SimpleString:
if (tran.IsAborted && result.IsEqual(RedisLiterals.BytesOK))
if (tran.IsAborted && result.IsEqual(CommonReplies.OK))
{
connection.Trace("Acknowledging UNWATCH (aborted electively)");
SetResult(message, false);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment