Commit 879cef27 authored by Marc Gravell's avatar Marc Gravell

Merge branch 'AddGeoAdd' of https://github.com/wjdavis5/StackExchange.Redis into wjdavis5-AddGeoAdd

Apply feedback from https://github.com/StackExchange/StackExchange.Redis/pull/489#issuecomment-270615647 - note still untested; that's next

# Conflicts:
#	StackExchange.Redis.Tests/ConnectionFailedErrors.cs
#	StackExchange.Redis/StackExchange/Redis/IDatabase.cs
#	StackExchange.Redis_Net40/StackExchange.Redis_Net40.StrongName.csproj
#	StackExchange.Redis_Net40/StackExchange.Redis_Net40.csproj
#	StackExchange.Redis_Net45/StackExchange.Redis_Net45.StrongName.csproj
#	StackExchange.Redis_Net45/StackExchange.Redis_Net45.csproj
#	StackExchange.Redis_Net46/StackExchange.Redis_Net46.StrongName.csproj
#	StackExchange.Redis_Net46/StackExchange.Redis_Net46.csproj
parents bed3c2bc d0d108bf
......@@ -134,6 +134,7 @@ public void CheckFailureRecovered()
ClearAmbientFailures();
}
}
#endif
[Test]
public void TryGetAzureRoleInstanceIdNoThrow()
......
using System.Linq;
using System.Runtime.Remoting;
using NUnit.Framework;
namespace StackExchange.Redis.Tests
{
[TestFixture]
public class GeoTests : TestBase
{
[Test]
public void GeoAddEveryWay()
{
using (var conn = Create())
{
var db = conn.GetDatabase(3);
var added1 = db.GeoAdd("Sicily", 14.361389, 39.115556, "PalermoPlusOne");
var geo1 = new GeoEntry(13.361389, 38.115556, "Palermo");
var geo2 = new GeoEntry(15.087269, 37.502669, "Catania");
var added2 = db.GeoAdd("Sicily",new GeoEntry[] {geo1,geo2});
Assert.IsTrue(added1 & (added2==2));
}
}
[Test]
public void GetGeoDist()
{
using (var conn = Create())
{
var db = conn.GetDatabase(3);
var geo1 = new GeoEntry(13.361389, 38.115556, "Palermo");
var geo2 = new GeoEntry(15.087269, 37.502669, "Catania");
var added2 = db.GeoAdd("Sicily", new GeoEntry[] { geo1, geo2 });
var val = db.GeoDistance("Sicily", "Palermo", "Catania",GeoUnit.Meters);
Assert.Equals(166274.15156960039, (double) val);
}
}
[Test]
public void AddSetEveryWay()
{
using (var conn = Create())
{
var db = conn.GetDatabase(3);
RedisKey key = Me();
db.KeyDelete(key);
db.SetAdd(key, "a");
db.SetAdd(key, new RedisValue[] { "b" });
db.SetAdd(key, new RedisValue[] { "c", "d" });
db.SetAdd(key, new RedisValue[] { "e", "f", "g" });
db.SetAdd(key, new RedisValue[] { "h", "i", "j", "k" });
var vals = db.SetMembers(key);
string s = string.Join(",", vals.OrderByDescending(x => x));
Assert.AreEqual("k,j,i,h,g,f,e,d,c,b,a", s);
}
}
[Test]
public void AddSetEveryWayNumbers()
{
using (var conn = Create())
{
var db = conn.GetDatabase(3);
RedisKey key = Me();
db.KeyDelete(key);
db.SetAdd(key, "a");
db.SetAdd(key, new RedisValue[] { "1" });
db.SetAdd(key, new RedisValue[] { "11", "2" });
db.SetAdd(key, new RedisValue[] { "10", "3", "1.5" });
db.SetAdd(key, new RedisValue[] { "2.2", "-1", "s", "t" });
var vals = db.SetMembers(key);
string s = string.Join(",", vals.OrderByDescending(x => x));
Assert.AreEqual("t,s,a,11,10,3,2.2,2,1.5,1,-1", s);
}
}
}
}
......@@ -135,7 +135,7 @@ public void Teardown()
}
protected const int PrimaryPort = 6379, SlavePort = 6380, SecurePort = 6381;
protected const string PrimaryServer = "127.0.0.1", SecurePassword = "changeme", PrimaryPortString = "6379", SlavePortString = "6380", SecurePortString = "6381";
protected const string PrimaryServer = "192.168.10.120", SecurePassword = "changeme", PrimaryPortString = "6379", SlavePortString = "6380", SecurePortString = "6381";
internal static Task Swallow(Task task)
{
if (task != null) task.ContinueWith(swallowErrors, TaskContinuationOptions.OnlyOnFaulted);
......
......@@ -2,7 +2,7 @@
namespace StackExchange.Redis
{
/// <summary>
/// <summary>
/// Behaviour markers associated with a given command
/// </summary>
[Flags]
......
using System;
namespace StackExchange.Redis
{
/// <summary>
/// GeoRadius command options.
/// </summary>
[Flags]
public enum GeoRadiusOptions
{
/// <summary>
/// No Options
/// </summary>
None = 0,
/// <summary>
/// Redis will return the coordinates of any results.
/// </summary>
WithCoordinates = 1,
/// <summary>
/// Redis will return the distance from center for all results.
/// </summary>
WithDistance = 2,
/// <summary>
/// Redis will return the geo hash value as an integer. (This is the score in the sorted set)
/// </summary>
WithGeoHash = 4,
/// <summary>
/// Populates the commonly used values from the entry (the integer hash is not returned as it is not commonly useful)
/// </summary>
Default = WithCoordinates | GeoRadiusOptions.WithDistance
}
/// <summary>
/// The result of a GeoRadius command.
/// </summary>
public struct GeoRadiusResult
{
/// <summary>
/// The matched member.
/// </summary>
public RedisValue Member { get; }
/// <summary>
/// The distance of the matched member from the center of the geo radius command.
/// </summary>
public double? Distance { get; }
/// <summary>
/// The hash value of the matched member as an integer. (The key in the sorted set)
/// </summary>
/// <remarks>Note that this is not the same as the hash returned from GeoHash</remarks>
public long? Hash { get; }
/// <summary>
/// The coordinates of the matched member.
/// </summary>
public GeoPosition? Position { get; }
/// <summary>
/// Returns a new GeoRadiusResult
/// </summary>
internal GeoRadiusResult(RedisValue member, double? distance, long? hash, GeoPosition? position)
{
Member = member;
Distance = distance;
Hash = hash;
Position = position;
}
}
/// <summary>
/// Describes the longitude and latitude of a GeoEntry
/// </summary>
public struct GeoPosition : IEquatable<GeoPosition>
{
internal static string GetRedisUnit(GeoUnit unit)
{
switch (unit)
{
case GeoUnit.Meters: return "m";
case GeoUnit.Kilometers: return "km";
case GeoUnit.Miles: return "mi";
case GeoUnit.Feet: return "ft";
default:
throw new ArgumentOutOfRangeException(nameof(unit));
}
}
/// <summary>
/// The Latitude of the GeoPosition
/// </summary>
public double Latitude { get; }
/// <summary>
/// The Logitude of the GeoPosition
/// </summary>
public double Longitude { get; }
/// <summary>
/// Creates a new GeoPosition
/// </summary>
/// <param name="longitude"></param>
/// <param name="latitude"></param>
public GeoPosition(double longitude, double latitude)
{
Longitude = longitude;
Latitude = latitude;
}
/// <summary>
/// See Object.ToString()
/// </summary>
public override string ToString()
{
return string.Format("{0} {1}", Longitude, Latitude);
}
/// <summary>
/// See Object.GetHashCode()
/// Diagonals not an issue in the case of lat/long
/// </summary>
public override int GetHashCode()
{
// diagonals not an issue in the case of lat/long
return Longitude.GetHashCode() ^ Latitude.GetHashCode();
}
/// <summary>
/// Compares two values for equality
/// </summary>
public override bool Equals(object obj)
{
return obj is GeoPosition && Equals((GeoPosition)obj);
}
/// <summary>
/// Compares two values for equality
/// </summary>
public bool Equals(GeoPosition value)
{
return this == value;
}
/// <summary>
/// Compares two values for equality
/// </summary>
public static bool operator ==(GeoPosition x, GeoPosition y)
{
return x.Longitude == y.Longitude && x.Latitude == y.Latitude;
}
/// <summary>
/// Compares two values for non-equality
/// </summary>
public static bool operator !=(GeoPosition x, GeoPosition y)
{
return x.Longitude != y.Longitude || x.Latitude != y.Latitude;
}
}
/// <summary>
/// Describes a GeoEntry element with the corresponding value
/// GeoEntries are stored in redis as SortedSetEntries
/// </summary>
public struct GeoEntry : IEquatable<GeoEntry>
{
/// <summary>
/// The name of the geo entry
/// </summary>
public RedisValue Member { get; }
/// <summary>
/// Describes the longitude and latitude of a GeoEntry
/// </summary>
public GeoPosition Point { get; }
/// <summary>
/// Initializes a GeoEntry value
/// </summary>
public GeoEntry(double longitude, double latitude, RedisValue member)
{
Member = member;
Point = new GeoPosition(longitude, latitude);
}
/// <summary>
/// The longitude of the geo entry
/// </summary>
public double Longitude => Point.Longitude;
/// <summary>
/// The latitude of the geo entry
/// </summary>
public double Latitude => Point.Latitude;
/// <summary>
/// See Object.ToString()
/// </summary>
public override string ToString()
{
return $"({Longitude},{Latitude})={Member}";
}
/// <summary>
/// See Object.GetHashCode()
/// </summary>
public override int GetHashCode()
{
return Point.GetHashCode() ^ Member.GetHashCode();
}
/// <summary>
/// Compares two values for equality
/// </summary>
public override bool Equals(object obj)
{
return obj is GeoEntry && Equals((GeoEntry)obj);
}
/// <summary>
/// Compares two values for equality
/// </summary>
public bool Equals(GeoEntry value)
{
return this == value;
}
/// <summary>
/// Compares two values for equality
/// </summary>
public static bool operator ==(GeoEntry x, GeoEntry y)
{
return x.Point == y.Point && x.Member == y.Member;
}
/// <summary>
/// Compares two values for non-equality
/// </summary>
public static bool operator !=(GeoEntry x, GeoEntry y)
{
return x.Point != y.Point || x.Member != y.Member;
}
}
}
\ No newline at end of file
using System;
using System.ComponentModel;
namespace StackExchange.Redis
{
/// <summary>
/// Units associated with Geo Commands
/// </summary>
public enum GeoUnit
{
/// <summary>
/// Meters
/// </summary>
Meters,
/// <summary>
/// Kilometers
/// </summary>
Kilometers,
/// <summary>
/// Miles
/// </summary>
Miles,
/// <summary>
/// Feet
/// </summary>
Feet
}
}
\ No newline at end of file
......@@ -44,6 +44,86 @@ public interface IDatabase : IRedis, IDatabaseAsync
/// <remarks>http://redis.io/commands/debug-object</remarks>
RedisValue DebugObject(RedisKey key, CommandFlags flags = CommandFlags.None);
/// <summary>
/// Add the specified member to the set stored at key. Specified members that are already a member of this set are ignored. If key does not exist, a new set is created before adding the specified members.
/// </summary>
/// <returns>True if the specified member was not already present in the set, else False</returns>
/// <remarks>http://redis.io/commands/geoadd</remarks>
bool GeoAdd(RedisKey key, double longitude, double latitude, RedisValue member, CommandFlags flags = CommandFlags.None);
/// <summary>
/// Add the specified member to the set stored at key. Specified members that are already a member of this set are ignored. If key does not exist, a new set is created before adding the specified members.
/// </summary>
/// <returns>True if the specified member was not already present in the set, else False</returns>
/// <remarks>http://redis.io/commands/geoadd</remarks>
bool GeoAdd(RedisKey key, StackExchange.Redis.GeoEntry value, CommandFlags flags = CommandFlags.None);
/// <summary>
/// Add the specified members to the set stored at key. Specified members that are already a member of this set are ignored. If key does not exist, a new set is created before adding the specified members.
/// </summary>
/// <returns>the number of elements that were added to the set, not including all the elements already present into the set.</returns>
/// <remarks>http://redis.io/commands/geoadd</remarks>
long GeoAdd(RedisKey key, GeoEntry[] values, CommandFlags flags = CommandFlags.None);
/// <summary>
/// Removes the specified member from the geo sorted set stored at key. Non existing members are ignored.
/// </summary>
/// <returns>True if the member existed in the sorted set and was removed; False otherwise.</returns>
/// <remarks>http://redis.io/commands/zrem</remarks>
bool GeoRemove(RedisKey key, RedisValue member, CommandFlags flags = CommandFlags.None);
/// <summary>
/// Return the distance between two members in the geospatial index represented by the sorted set.
/// </summary>
/// <returns>The command returns the distance as a double (represented as a string) in the specified unit, or NULL if one or both the elements are missing.</returns>
/// <remarks>http://redis.io/commands/geodist</remarks>
double GeoDistance(RedisKey key, RedisValue member1, RedisValue member2, GeoUnit unit = GeoUnit.Meters, CommandFlags flags = CommandFlags.None);
/// <summary>
/// Return valid Geohash strings representing the position of one or more elements in a sorted set value representing a geospatial index (where elements were added using GEOADD).
/// </summary>
/// <returns>The command returns an array where each element is the Geohash corresponding to each member name passed as argument to the command.</returns>
/// <remarks>http://redis.io/commands/geohash</remarks>
string[] GeoHash(RedisKey key, RedisValue[] members, CommandFlags flags = CommandFlags.None);
/// <summary>
/// Return valid Geohash strings representing the position of one or more elements in a sorted set value representing a geospatial index (where elements were added using GEOADD).
/// </summary>
/// <returns>The command returns an array where each element is the Geohash corresponding to each member name passed as argument to the command.</returns>
/// <remarks>http://redis.io/commands/geohash</remarks>
string[] GeoHash(RedisKey key, RedisValue member, CommandFlags flags = CommandFlags.None);
/// <summary>
/// Return the positions (longitude,latitude) of all the specified members of the geospatial index represented by the sorted set at key.
/// </summary>
/// <returns>The command returns an array where each element is a two elements array representing longitude and latitude (x,y) of each member name passed as argument to the command.Non existing elements are reported as NULL elements of the array.</returns>
/// <remarks>http://redis.io/commands/geopos</remarks>
GeoPosition?[] GeoPosition(RedisKey key, RedisValue[] members, CommandFlags flags = CommandFlags.None);
/// <summary>
/// Return the positions (longitude,latitude) of all the specified members of the geospatial index represented by the sorted set at key.
/// </summary>
/// <returns>The command returns an array where each element is a two elements array representing longitude and latitude (x,y) of each member name passed as argument to the command.Non existing elements are reported as NULL elements of the array.</returns>
/// <remarks>http://redis.io/commands/geopos</remarks>
GeoPosition? GeoPosition(RedisKey key, RedisValue member, CommandFlags flags = CommandFlags.None);
/// <summary>
/// Return the members of a sorted set populated with geospatial information using GEOADD, which are within the borders of the area specified with the center location and the maximum distance from the center (the radius).
/// </summary>
/// <returns>GeoRadiusResult[]</returns>
/// <remarks>http://redis.io/commands/georadius</remarks>
GeoRadiusResult[] GeoRadius(RedisKey key, RedisValue member, double radius, GeoUnit unit = GeoUnit.Meters, int count = -1, Order? order = null, GeoRadiusOptions options = GeoRadiusOptions.Default, CommandFlags flags = CommandFlags.None);
/// <summary>
/// Return the members of a sorted set populated with geospatial information using GEOADD, which are within the borders of the area specified with the center location and the maximum distance from the center (the radius).
/// </summary>
/// <returns>GeoRadiusResult[]</returns>
/// <remarks>http://redis.io/commands/georadius</remarks>
GeoRadiusResult[] GeoRadius(RedisKey key, double longitude, double latitude, double radius, GeoUnit unit = GeoUnit.Meters, int count = -1, Order? order = null, GeoRadiusOptions options = GeoRadiusOptions.Default, CommandFlags flags = CommandFlags.None);
/// <summary>
/// Decrements the number stored at field in the hash stored at key by decrement. If key does not exist, a new key holding a hash is created. If field does not exist or holds a string that cannot be interpreted as integer, the value is set to 0 before the operation is performed.
/// </summary>
......
......@@ -27,6 +27,59 @@ public RedisValue DebugObject(RedisKey key, CommandFlags flags = CommandFlags.No
return Inner.DebugObject(ToInner(key), flags);
}
public bool GeoAdd(RedisKey key, double longitude, double latitude, RedisValue member, CommandFlags flags = CommandFlags.None)
{
return Inner.GeoAdd(ToInner(key), longitude, latitude, member, flags);
}
public long GeoAdd(RedisKey key, GeoEntry[] geoEntries, CommandFlags flags = CommandFlags.None)
{
return Inner.GeoAdd(ToInner(key), geoEntries, flags);
}
public bool GeoAdd(RedisKey key, GeoEntry geoEntry, CommandFlags flags = CommandFlags.None)
{
return Inner.GeoAdd(key, geoEntry, flags);
}
public bool GeoRemove(RedisKey key, RedisValue member, CommandFlags flags = CommandFlags.None)
{
return Inner.GeoRemove(key, member, flags);
}
public double GeoDistance(RedisKey key, RedisValue value0, RedisValue value1, GeoUnit geoUnit = GeoUnit.Meters,CommandFlags flags = CommandFlags.None)
{
return Inner.GeoDistance(ToInner(key), value0, value1, geoUnit, flags);
}
public string[] GeoHash(RedisKey key, RedisValue[] members, CommandFlags flags = CommandFlags.None)
{
return Inner.GeoHash(key, members, flags);
}
public string[] GeoHash(RedisKey key, RedisValue member, CommandFlags flags = CommandFlags.None)
{
return Inner.GeoHash(key, member, flags);
}
public GeoPosition?[] GeoPosition(RedisKey key, RedisValue[] members, CommandFlags flags = CommandFlags.None)
{
return Inner.GeoPosition(key, members, flags);
}
public GeoPosition? GeoPosition(RedisKey key, RedisValue member, CommandFlags flags = CommandFlags.None)
{
return Inner.GeoPosition(key, member, flags);
}
public GeoRadiusResult[] GeoRadius(RedisKey key, RedisValue member, double radius, GeoUnit unit = GeoUnit.Meters, int count = -1, Order? order = null,GeoRadiusOptions options = GeoRadiusOptions.Default, CommandFlags flags = CommandFlags.None)
{
return Inner.GeoRadius(key, member, radius, unit, count, order, options, flags);
}
public GeoRadiusResult[] GeoRadius(RedisKey key, double longitude, double latitude, double radius, GeoUnit unit = GeoUnit.Meters, int count = -1, Order? order = null, GeoRadiusOptions options = GeoRadiusOptions.Default, CommandFlags flags = CommandFlags.None)
{
return Inner.GeoRadius(key, longitude, latitude, radius, unit, count, order, options, flags);
}
public double HashDecrement(RedisKey key, RedisValue hashField, double value, CommandFlags flags = CommandFlags.None)
{
return Inner.HashDecrement(ToInner(key), hashField, value, flags);
......
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
#if FEATURE_SERIALIZATION
using System.Runtime.Serialization;
#endif
using System.Text;
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
#if FEATURE_SERIALIZATION
using System.Runtime.Serialization;
#endif
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace StackExchange.Redis
{
/// <summary>
/// Indicates that a command was illegal and was not sent to the server
/// </summary>
#if FEATURE_SERIALIZATION
[Serializable]
#endif
public sealed class RedisCommandException : Exception
{
#if FEATURE_SERIALIZATION
private RedisCommandException(SerializationInfo info, StreamingContext ctx) : base(info, ctx) { }
#endif
internal RedisCommandException(string message) : base(message) { }
internal RedisCommandException(string message, Exception innerException) : base(message, innerException) { }
}
/// <summary>
/// Indicates a connection fault when communicating with redis
/// </summary>
#if FEATURE_SERIALIZATION
[Serializable]
#endif
public sealed class RedisConnectionException : RedisException
{
#if FEATURE_SERIALIZATION
private RedisConnectionException(SerializationInfo info, StreamingContext ctx) : base(info, ctx)
{
FailureType = (ConnectionFailureType)info.GetInt32("failureType");
}
/// <summary>
/// Serialization implementation; not intended for general usage
/// </summary>
public override void GetObjectData(SerializationInfo info, StreamingContext context)
{
base.GetObjectData(info, context);
info.AddValue("failureType", (int)FailureType);
}
#endif
internal RedisConnectionException(ConnectionFailureType failureType, string message) : base(message)
{
FailureType = failureType;
}
internal RedisConnectionException(ConnectionFailureType failureType, string message, Exception innerException) : base(message, innerException)
{
FailureType = failureType;
}
/// <summary>
/// The type of connection failure
/// </summary>
public ConnectionFailureType FailureType { get; }
}
/// <summary>
/// Indicates an issue communicating with redis
/// </summary>
#if FEATURE_SERIALIZATION
[Serializable]
#endif
public class RedisException : Exception
{
/// <summary>
/// Deserialization constructor; not intended for general usage
/// </summary>
#if FEATURE_SERIALIZATION
protected RedisException(SerializationInfo info, StreamingContext ctx) : base(info, ctx) { }
#endif
internal RedisException(string message) : base(message) { }
internal RedisException(string message, Exception innerException) : base(message, innerException) { }
}
/// <summary>
/// Indicates an exception raised by a redis server
/// </summary>
#if FEATURE_SERIALIZATION
[Serializable]
#endif
public sealed class RedisServerException : RedisException
{
#if FEATURE_SERIALIZATION
private RedisServerException(SerializationInfo info, StreamingContext ctx) : base(info, ctx) { }
#endif
internal RedisServerException(string message) : base(message) { }
}
sealed class LoggingMessage : Message
{
public readonly TextWriter log;
private readonly Message tail;
public static Message Create(TextWriter log, Message tail)
{
return log == null ? tail : new LoggingMessage(log, tail);
}
private LoggingMessage(TextWriter log, Message tail) : base(tail.Db, tail.Flags, tail.Command)
{
this.log = log;
this.tail = tail;
FlagsRaw = tail.FlagsRaw;
}
public override string CommandAndKey => tail.CommandAndKey;
public override void AppendStormLog(StringBuilder sb)
{
tail.AppendStormLog(sb);
}
public override int GetHashSlot(ServerSelectionStrategy serverSelectionStrategy)
{
return tail.GetHashSlot(serverSelectionStrategy);
}
internal override void WriteImpl(PhysicalConnection physical)
{
try
{
physical.Multiplexer.LogLocked(log, "Writing to {0}: {1}", physical.Bridge, tail.CommandAndKey);
}
catch { }
tail.WriteImpl(physical);
}
public TextWriter Log => log;
}
abstract class Message : ICompletable
{
public static readonly Message[] EmptyArray = new Message[0];
public readonly int Db;
internal const CommandFlags InternalCallFlag = (CommandFlags)128;
protected RedisCommand command;
private const CommandFlags AskingFlag = (CommandFlags)32,
ScriptUnavailableFlag = (CommandFlags)256;
const CommandFlags MaskMasterServerPreference = CommandFlags.DemandMaster | CommandFlags.DemandSlave | CommandFlags.PreferMaster | CommandFlags.PreferSlave;
private const CommandFlags UserSelectableFlags
= CommandFlags.None | CommandFlags.DemandMaster | CommandFlags.DemandSlave
| CommandFlags.PreferMaster | CommandFlags.PreferSlave
| CommandFlags.HighPriority | CommandFlags.FireAndForget | CommandFlags.NoRedirect;
private CommandFlags flags;
internal CommandFlags FlagsRaw { get { return flags; } set { flags = value; } }
private ResultBox resultBox;
using System.Threading.Tasks;
namespace StackExchange.Redis
{
/// <summary>
/// Indicates that a command was illegal and was not sent to the server
/// </summary>
#if FEATURE_SERIALIZATION
[Serializable]
#endif
public sealed class RedisCommandException : Exception
{
#if FEATURE_SERIALIZATION
private RedisCommandException(SerializationInfo info, StreamingContext ctx) : base(info, ctx) { }
#endif
internal RedisCommandException(string message) : base(message) { }
internal RedisCommandException(string message, Exception innerException) : base(message, innerException) { }
}
/// <summary>
/// Indicates a connection fault when communicating with redis
/// </summary>
#if FEATURE_SERIALIZATION
[Serializable]
#endif
public sealed class RedisConnectionException : RedisException
{
#if FEATURE_SERIALIZATION
private RedisConnectionException(SerializationInfo info, StreamingContext ctx) : base(info, ctx)
{
FailureType = (ConnectionFailureType)info.GetInt32("failureType");
}
/// <summary>
/// Serialization implementation; not intended for general usage
/// </summary>
public override void GetObjectData(SerializationInfo info, StreamingContext context)
{
base.GetObjectData(info, context);
info.AddValue("failureType", (int)FailureType);
}
#endif
internal RedisConnectionException(ConnectionFailureType failureType, string message) : base(message)
{
FailureType = failureType;
}
internal RedisConnectionException(ConnectionFailureType failureType, string message, Exception innerException) : base(message, innerException)
{
FailureType = failureType;
}
/// <summary>
/// The type of connection failure
/// </summary>
public ConnectionFailureType FailureType { get; }
}
/// <summary>
/// Indicates an issue communicating with redis
/// </summary>
#if FEATURE_SERIALIZATION
[Serializable]
#endif
public class RedisException : Exception
{
/// <summary>
/// Deserialization constructor; not intended for general usage
/// </summary>
#if FEATURE_SERIALIZATION
protected RedisException(SerializationInfo info, StreamingContext ctx) : base(info, ctx) { }
#endif
internal RedisException(string message) : base(message) { }
internal RedisException(string message, Exception innerException) : base(message, innerException) { }
}
/// <summary>
/// Indicates an exception raised by a redis server
/// </summary>
#if FEATURE_SERIALIZATION
[Serializable]
#endif
public sealed class RedisServerException : RedisException
{
#if FEATURE_SERIALIZATION
private RedisServerException(SerializationInfo info, StreamingContext ctx) : base(info, ctx) { }
#endif
internal RedisServerException(string message) : base(message) { }
}
sealed class LoggingMessage : Message
{
public readonly TextWriter log;
private readonly Message tail;
public static Message Create(TextWriter log, Message tail)
{
return log == null ? tail : new LoggingMessage(log, tail);
}
private LoggingMessage(TextWriter log, Message tail) : base(tail.Db, tail.Flags, tail.Command)
{
this.log = log;
this.tail = tail;
FlagsRaw = tail.FlagsRaw;
}
public override string CommandAndKey => tail.CommandAndKey;
public override void AppendStormLog(StringBuilder sb)
{
tail.AppendStormLog(sb);
}
public override int GetHashSlot(ServerSelectionStrategy serverSelectionStrategy)
{
return tail.GetHashSlot(serverSelectionStrategy);
}
internal override void WriteImpl(PhysicalConnection physical)
{
try
{
physical.Multiplexer.LogLocked(log, "Writing to {0}: {1}", physical.Bridge, tail.CommandAndKey);
}
catch { }
tail.WriteImpl(physical);
}
public TextWriter Log => log;
}
abstract class Message : ICompletable
{
public static readonly Message[] EmptyArray = new Message[0];
public readonly int Db;
internal const CommandFlags InternalCallFlag = (CommandFlags)128;
protected RedisCommand command;
private const CommandFlags AskingFlag = (CommandFlags)32,
ScriptUnavailableFlag = (CommandFlags)256;
const CommandFlags MaskMasterServerPreference = CommandFlags.DemandMaster | CommandFlags.DemandSlave | CommandFlags.PreferMaster | CommandFlags.PreferSlave;
private const CommandFlags UserSelectableFlags
= CommandFlags.None | CommandFlags.DemandMaster | CommandFlags.DemandSlave
| CommandFlags.PreferMaster | CommandFlags.PreferSlave
| CommandFlags.HighPriority | CommandFlags.FireAndForget | CommandFlags.NoRedirect;
private CommandFlags flags;
internal CommandFlags FlagsRaw { get { return flags; } set { flags = value; } }
private ResultBox resultBox;
private ResultProcessor resultProcessor;
// All for profiling purposes
private ProfileStorage performance;
internal DateTime createdDateTime;
internal long createdTimestamp;
protected Message(int db, CommandFlags flags, RedisCommand command)
{
bool dbNeeded = RequiresDatabase(command);
if (db < 0)
{
if (dbNeeded)
{
throw ExceptionFactory.DatabaseRequired(false, command);
}
}
else
{
if (!dbNeeded)
{
throw ExceptionFactory.DatabaseNotRequired(false, command);
}
}
bool masterOnly = IsMasterOnly(command);
Db = db;
this.command = command;
internal long createdTimestamp;
protected Message(int db, CommandFlags flags, RedisCommand command)
{
bool dbNeeded = RequiresDatabase(command);
if (db < 0)
{
if (dbNeeded)
{
throw ExceptionFactory.DatabaseRequired(false, command);
}
}
else
{
if (!dbNeeded)
{
throw ExceptionFactory.DatabaseNotRequired(false, command);
}
}
bool masterOnly = IsMasterOnly(command);
Db = db;
this.command = command;
this.flags = flags & UserSelectableFlags;
if (masterOnly) SetMasterOnly();
......@@ -216,8 +216,8 @@ internal void SetProfileStorage(ProfileStorage storage)
{
performance = storage;
performance.SetMessage(this);
}
}
internal void PrepareToResend(ServerEndPoint resendTo, bool isMoved)
{
if (performance == null) return;
......@@ -231,269 +231,292 @@ internal void PrepareToResend(ServerEndPoint resendTo, bool isMoved)
createdTimestamp = System.Diagnostics.Stopwatch.GetTimestamp();
performance = ProfileStorage.NewAttachedToSameContext(oldPerformance, resendTo, isMoved);
performance.SetMessage(this);
}
public RedisCommand Command => command;
public virtual string CommandAndKey => Command.ToString();
public CommandFlags Flags => flags;
/// <summary>
/// Things with the potential to cause harm, or to reveal configuration information
/// </summary>
public bool IsAdmin
{
get
{
switch (Command)
{
case RedisCommand.BGREWRITEAOF:
case RedisCommand.BGSAVE:
case RedisCommand.CLIENT:
case RedisCommand.CLUSTER:
case RedisCommand.CONFIG:
case RedisCommand.DEBUG:
case RedisCommand.FLUSHALL:
case RedisCommand.FLUSHDB:
case RedisCommand.INFO:
case RedisCommand.KEYS:
case RedisCommand.MONITOR:
case RedisCommand.SAVE:
case RedisCommand.SHUTDOWN:
case RedisCommand.SLAVEOF:
case RedisCommand.SLOWLOG:
case RedisCommand.SYNC:
return true;
default:
return false;
}
}
}
public bool IsAsking => (flags & AskingFlag) != 0;
internal bool IsScriptUnavailable => (flags & ScriptUnavailableFlag) != 0;
internal void SetScriptUnavailable()
{
flags |= ScriptUnavailableFlag;
}
public bool IsFireAndForget => (flags & CommandFlags.FireAndForget) != 0;
public bool IsHighPriority => (flags & CommandFlags.HighPriority) != 0;
public bool IsInternalCall => (flags & InternalCallFlag) != 0;
public ResultBox ResultBox => resultBox;
public static Message Create(int db, CommandFlags flags, RedisCommand command)
{
if (command == RedisCommand.SELECT)
return new SelectMessage(db, flags);
return new CommandMessage(db, flags, command);
}
public static Message Create(int db, CommandFlags flags, RedisCommand command, RedisKey key)
{
return new CommandKeyMessage(db, flags, command, key);
}
public static Message Create(int db, CommandFlags flags, RedisCommand command, RedisKey key0, RedisKey key1)
{
return new CommandKeyKeyMessage(db, flags, command, key0, key1);
}
public static Message Create(int db, CommandFlags flags, RedisCommand command, RedisKey key0, RedisKey key1, RedisValue value)
{
return new CommandKeyKeyValueMessage(db, flags, command, key0, key1, value);
}
public static Message Create(int db, CommandFlags flags, RedisCommand command, RedisKey key0, RedisKey key1, RedisKey key2)
{
return new CommandKeyKeyKeyMessage(db, flags, command, key0, key1, key2);
}
public static Message Create(int db, CommandFlags flags, RedisCommand command, RedisValue value)
{
return new CommandValueMessage(db, flags, command, value);
}
public static Message Create(int db, CommandFlags flags, RedisCommand command, RedisKey key, RedisValue value)
{
return new CommandKeyValueMessage(db, flags, command, key, value);
}
public static Message Create(int db, CommandFlags flags, RedisCommand command, RedisChannel channel)
{
return new CommandChannelMessage(db, flags, command, channel);
}
public static Message Create(int db, CommandFlags flags, RedisCommand command, RedisChannel channel, RedisValue value)
{
return new CommandChannelValueMessage(db, flags, command, channel, value);
}
public static Message Create(int db, CommandFlags flags, RedisCommand command, RedisValue value, RedisChannel channel)
{
return new CommandValueChannelMessage(db, flags, command, value, channel);
}
public static Message Create(int db, CommandFlags flags, RedisCommand command, RedisKey key, RedisValue value0, RedisValue value1)
{
return new CommandKeyValueValueMessage(db, flags, command, key, value0, value1);
}
public static Message Create(int db, CommandFlags flags, RedisCommand command, RedisKey key, RedisValue value0, RedisValue value1, RedisValue value2)
{
return new CommandKeyValueValueValueMessage(db, flags, command, key, value0, value1, value2);
}
public static Message Create(int db, CommandFlags flags, RedisCommand command, RedisKey key, RedisValue value0, RedisValue value1, RedisValue value2, RedisValue value3)
{
return new CommandKeyValueValueValueValueMessage(db, flags, command, key, value0, value1, value2, value3);
}
public static Message Create(int db, CommandFlags flags, RedisCommand command, RedisValue value0, RedisValue value1)
{
return new CommandValueValueMessage(db, flags, command, value0, value1);
}
public static Message Create(int db, CommandFlags flags, RedisCommand command, RedisValue value, RedisKey key)
{
return new CommandValueKeyMessage(db, flags, command, value, key);
}
public static Message Create(int db, CommandFlags flags, RedisCommand command, RedisValue value0, RedisValue value1, RedisValue value2)
{
return new CommandValueValueValueMessage(db, flags, command, value0, value1, value2);
}
public static Message Create(int db, CommandFlags flags, RedisCommand command, RedisValue value0, RedisValue value1, RedisValue value2, RedisValue value3, RedisValue value4)
{
return new CommandValueValueValueValueValueMessage(db, flags, command, value0, value1, value2, value3, value4);
}
public static Message CreateInSlot(int db, int slot, CommandFlags flags, RedisCommand command, RedisValue[] values)
{
return new CommandSlotValuesMessage(db, slot, flags, command, values);
}
public static bool IsMasterOnly(RedisCommand command)
{
switch (command)
{
case RedisCommand.APPEND:
case RedisCommand.BITOP:
case RedisCommand.BLPOP:
case RedisCommand.BRPOP:
case RedisCommand.BRPOPLPUSH:
case RedisCommand.DECR:
case RedisCommand.DECRBY:
case RedisCommand.DEL:
case RedisCommand.EXPIRE:
case RedisCommand.EXPIREAT:
case RedisCommand.FLUSHALL:
case RedisCommand.FLUSHDB:
case RedisCommand.GETSET:
case RedisCommand.HDEL:
case RedisCommand.HINCRBY:
case RedisCommand.HINCRBYFLOAT:
case RedisCommand.HMSET:
case RedisCommand.HSET:
case RedisCommand.HSETNX:
case RedisCommand.INCR:
case RedisCommand.INCRBY:
case RedisCommand.INCRBYFLOAT:
case RedisCommand.LINSERT:
case RedisCommand.LPOP:
case RedisCommand.LPUSH:
case RedisCommand.LPUSHX:
case RedisCommand.LREM:
case RedisCommand.LSET:
case RedisCommand.LTRIM:
case RedisCommand.MIGRATE:
case RedisCommand.MOVE:
case RedisCommand.MSET:
case RedisCommand.MSETNX:
case RedisCommand.PERSIST:
case RedisCommand.PEXPIRE:
case RedisCommand.PEXPIREAT:
case RedisCommand.PFADD:
case RedisCommand.PFMERGE:
case RedisCommand.PSETEX:
case RedisCommand.RENAME:
case RedisCommand.RENAMENX:
case RedisCommand.RESTORE:
case RedisCommand.RPOP:
case RedisCommand.RPOPLPUSH:
case RedisCommand.RPUSH:
case RedisCommand.RPUSHX:
case RedisCommand.SADD:
case RedisCommand.SDIFFSTORE:
case RedisCommand.SET:
case RedisCommand.SETBIT:
case RedisCommand.SETEX:
case RedisCommand.SETNX:
case RedisCommand.SETRANGE:
case RedisCommand.SINTERSTORE:
case RedisCommand.SMOVE:
case RedisCommand.SPOP:
case RedisCommand.SREM:
case RedisCommand.SUNIONSTORE:
case RedisCommand.ZADD:
case RedisCommand.ZINTERSTORE:
case RedisCommand.ZINCRBY:
case RedisCommand.ZREM:
case RedisCommand.ZREMRANGEBYLEX:
case RedisCommand.ZREMRANGEBYRANK:
case RedisCommand.ZREMRANGEBYSCORE:
case RedisCommand.ZUNIONSTORE:
return true;
default:
return false;
}
}
public virtual void AppendStormLog(StringBuilder sb)
{
if (Db >= 0) sb.Append(Db).Append(':');
sb.Append(CommandAndKey);
}
public virtual int GetHashSlot(ServerSelectionStrategy serverSelectionStrategy) { return ServerSelectionStrategy.NoSlot; }
public bool IsMasterOnly()
{
// note that the constructor runs the switch statement above, so
// this will alread be true for master-only commands, even if the
// user specified PreferMaster etc
return GetMasterSlaveFlags(flags) == CommandFlags.DemandMaster;
}
/// <summary>
/// This does a few important things:
/// 1: it suppresses error events for commands that the user isn't interested in
/// (i.e. "why does my standalone server keep saying ERR unknown command 'cluster' ?")
/// 2: it allows the initial PING and GET (during connect) to get queued rather
/// than be rejected as no-server-available (note that this doesn't apply to
/// handshake messages, as they bypass the queue completely)
/// 3: it disables non-pref logging, as it is usually server-targeted
/// </summary>
public void SetInternalCall()
{
flags |= InternalCallFlag;
}
public override string ToString()
{
return $"[{Db}]:{CommandAndKey} ({resultProcessor?.GetType().Name ?? "(n/a)"})";
}
public void SetResponseReceived()
{
performance?.SetResponseReceived();
}
}
public RedisCommand Command => command;
public virtual string CommandAndKey => Command.ToString();
public CommandFlags Flags => flags;
/// <summary>
/// Things with the potential to cause harm, or to reveal configuration information
/// </summary>
public bool IsAdmin
{
get
{
switch (Command)
{
case RedisCommand.BGREWRITEAOF:
case RedisCommand.BGSAVE:
case RedisCommand.CLIENT:
case RedisCommand.CLUSTER:
case RedisCommand.CONFIG:
case RedisCommand.DEBUG:
case RedisCommand.FLUSHALL:
case RedisCommand.FLUSHDB:
case RedisCommand.INFO:
case RedisCommand.KEYS:
case RedisCommand.MONITOR:
case RedisCommand.SAVE:
case RedisCommand.SHUTDOWN:
case RedisCommand.SLAVEOF:
case RedisCommand.SLOWLOG:
case RedisCommand.SYNC:
return true;
default:
return false;
}
}
}
public bool IsAsking => (flags & AskingFlag) != 0;
internal bool IsScriptUnavailable => (flags & ScriptUnavailableFlag) != 0;
internal void SetScriptUnavailable()
{
flags |= ScriptUnavailableFlag;
}
public bool IsFireAndForget => (flags & CommandFlags.FireAndForget) != 0;
public bool IsHighPriority => (flags & CommandFlags.HighPriority) != 0;
public bool IsInternalCall => (flags & InternalCallFlag) != 0;
public ResultBox ResultBox => resultBox;
public static Message Create(int db, CommandFlags flags, RedisCommand command)
{
if (command == RedisCommand.SELECT)
return new SelectMessage(db, flags);
return new CommandMessage(db, flags, command);
}
public static Message Create(int db, CommandFlags flags, RedisCommand command, RedisKey key)
{
return new CommandKeyMessage(db, flags, command, key);
}
public static Message Create(int db, CommandFlags flags, RedisCommand command, RedisKey key0, RedisKey key1)
{
return new CommandKeyKeyMessage(db, flags, command, key0, key1);
}
public static Message Create(int db, CommandFlags flags, RedisCommand command, RedisKey key0, RedisKey key1, RedisValue value)
{
return new CommandKeyKeyValueMessage(db, flags, command, key0, key1, value);
}
public static Message Create(int db, CommandFlags flags, RedisCommand command, RedisKey key0, RedisKey key1, RedisKey key2)
{
return new CommandKeyKeyKeyMessage(db, flags, command, key0, key1, key2);
}
public static Message Create(int db, CommandFlags flags, RedisCommand command, RedisValue value)
{
return new CommandValueMessage(db, flags, command, value);
}
public static Message Create(int db, CommandFlags flags, RedisCommand command, RedisKey key, RedisValue value)
{
return new CommandKeyValueMessage(db, flags, command, key, value);
}
public static Message Create(int db, CommandFlags flags, RedisCommand command, RedisChannel channel)
{
return new CommandChannelMessage(db, flags, command, channel);
}
public static Message Create(int db, CommandFlags flags, RedisCommand command, RedisChannel channel, RedisValue value)
{
return new CommandChannelValueMessage(db, flags, command, channel, value);
}
public static Message Create(int db, CommandFlags flags, RedisCommand command, RedisValue value, RedisChannel channel)
{
return new CommandValueChannelMessage(db, flags, command, value, channel);
}
public static Message Create(int db, CommandFlags flags, RedisCommand command, RedisKey key, RedisValue value0, RedisValue value1)
{
return new CommandKeyValueValueMessage(db, flags, command, key, value0, value1);
}
public static Message Create(int db, CommandFlags flags, RedisCommand command, RedisKey key, RedisValue value0, RedisValue value1, RedisValue value2)
{
return new CommandKeyValueValueValueMessage(db, flags, command, key, value0, value1, value2);
}
public static Message Create(int db, CommandFlags flags, RedisCommand command, RedisKey key, GeoEntry[] values)
{
if (values == null) throw new ArgumentNullException(nameof(values));
if (values.Length == 0)
{
throw new ArgumentOutOfRangeException(nameof(values));
}
if (values.Length == 1)
{
var value = values[0];
return Message.Create(db, flags, command, key, value.Longitude, value.Latitude, value.Member);
}
var arr = new RedisValue[3 * values.Length];
int index = 0;
foreach (var value in values)
{
arr[index++] = value.Longitude;
arr[index++] = value.Latitude;
arr[index++] = value.Member;
}
return new CommandKeyValuesMessage(db, flags, command, key, arr);
}
public static Message Create(int db, CommandFlags flags, RedisCommand command, RedisKey key, RedisValue value0, RedisValue value1, RedisValue value2, RedisValue value3)
{
return new CommandKeyValueValueValueValueMessage(db, flags, command, key, value0, value1, value2, value3);
}
public static Message Create(int db, CommandFlags flags, RedisCommand command, RedisValue value0, RedisValue value1)
{
return new CommandValueValueMessage(db, flags, command, value0, value1);
}
public static Message Create(int db, CommandFlags flags, RedisCommand command, RedisValue value, RedisKey key)
{
return new CommandValueKeyMessage(db, flags, command, value, key);
}
public static Message Create(int db, CommandFlags flags, RedisCommand command, RedisValue value0, RedisValue value1, RedisValue value2)
{
return new CommandValueValueValueMessage(db, flags, command, value0, value1, value2);
}
public static Message Create(int db, CommandFlags flags, RedisCommand command, RedisValue value0, RedisValue value1, RedisValue value2, RedisValue value3, RedisValue value4)
{
return new CommandValueValueValueValueValueMessage(db, flags, command, value0, value1, value2, value3, value4);
}
public static Message CreateInSlot(int db, int slot, CommandFlags flags, RedisCommand command, RedisValue[] values)
{
return new CommandSlotValuesMessage(db, slot, flags, command, values);
}
public static bool IsMasterOnly(RedisCommand command)
{
switch (command)
{
case RedisCommand.APPEND:
case RedisCommand.BITOP:
case RedisCommand.BLPOP:
case RedisCommand.BRPOP:
case RedisCommand.BRPOPLPUSH:
case RedisCommand.DECR:
case RedisCommand.DECRBY:
case RedisCommand.DEL:
case RedisCommand.EXPIRE:
case RedisCommand.EXPIREAT:
case RedisCommand.FLUSHALL:
case RedisCommand.FLUSHDB:
case RedisCommand.GETSET:
case RedisCommand.HDEL:
case RedisCommand.HINCRBY:
case RedisCommand.HINCRBYFLOAT:
case RedisCommand.HMSET:
case RedisCommand.HSET:
case RedisCommand.HSETNX:
case RedisCommand.INCR:
case RedisCommand.INCRBY:
case RedisCommand.INCRBYFLOAT:
case RedisCommand.LINSERT:
case RedisCommand.LPOP:
case RedisCommand.LPUSH:
case RedisCommand.LPUSHX:
case RedisCommand.LREM:
case RedisCommand.LSET:
case RedisCommand.LTRIM:
case RedisCommand.MIGRATE:
case RedisCommand.MOVE:
case RedisCommand.MSET:
case RedisCommand.MSETNX:
case RedisCommand.PERSIST:
case RedisCommand.PEXPIRE:
case RedisCommand.PEXPIREAT:
case RedisCommand.PFADD:
case RedisCommand.PFMERGE:
case RedisCommand.PSETEX:
case RedisCommand.RENAME:
case RedisCommand.RENAMENX:
case RedisCommand.RESTORE:
case RedisCommand.RPOP:
case RedisCommand.RPOPLPUSH:
case RedisCommand.RPUSH:
case RedisCommand.RPUSHX:
case RedisCommand.SADD:
case RedisCommand.SDIFFSTORE:
case RedisCommand.SET:
case RedisCommand.SETBIT:
case RedisCommand.SETEX:
case RedisCommand.SETNX:
case RedisCommand.SETRANGE:
case RedisCommand.SINTERSTORE:
case RedisCommand.SMOVE:
case RedisCommand.SPOP:
case RedisCommand.SREM:
case RedisCommand.SUNIONSTORE:
case RedisCommand.ZADD:
case RedisCommand.ZINTERSTORE:
case RedisCommand.ZINCRBY:
case RedisCommand.ZREM:
case RedisCommand.ZREMRANGEBYLEX:
case RedisCommand.ZREMRANGEBYRANK:
case RedisCommand.ZREMRANGEBYSCORE:
case RedisCommand.ZUNIONSTORE:
return true;
default:
return false;
}
}
public virtual void AppendStormLog(StringBuilder sb)
{
if (Db >= 0) sb.Append(Db).Append(':');
sb.Append(CommandAndKey);
}
public virtual int GetHashSlot(ServerSelectionStrategy serverSelectionStrategy) { return ServerSelectionStrategy.NoSlot; }
public bool IsMasterOnly()
{
// note that the constructor runs the switch statement above, so
// this will alread be true for master-only commands, even if the
// user specified PreferMaster etc
return GetMasterSlaveFlags(flags) == CommandFlags.DemandMaster;
}
/// <summary>
/// This does a few important things:
/// 1: it suppresses error events for commands that the user isn't interested in
/// (i.e. "why does my standalone server keep saying ERR unknown command 'cluster' ?")
/// 2: it allows the initial PING and GET (during connect) to get queued rather
/// than be rejected as no-server-available (note that this doesn't apply to
/// handshake messages, as they bypass the queue completely)
/// 3: it disables non-pref logging, as it is usually server-targeted
/// </summary>
public void SetInternalCall()
{
flags |= InternalCallFlag;
}
public override string ToString()
{
return $"[{Db}]:{CommandAndKey} ({resultProcessor?.GetType().Name ?? "(n/a)"})";
}
public void SetResponseReceived()
{
performance?.SetResponseReceived();
}
public bool TryComplete(bool isAsync)
{
//Ensure we can never call TryComplete on the same resultBox from two threads by grabbing it now
......@@ -507,719 +530,719 @@ public bool TryComplete(bool isAsync)
{
//put result box back if it was not already recycled
Interlocked.Exchange(ref resultBox, currBox);
}
performance?.SetCompleted();
return ret;
}
else
{
ConnectionMultiplexer.TraceWithoutContext("No result-box to complete for " + Command, "Message");
performance?.SetCompleted();
return true;
}
}
internal static Message Create(int db, CommandFlags flags, RedisCommand command, RedisKey key, RedisKey[] keys)
{
switch (keys.Length)
{
case 0: return new CommandKeyMessage(db, flags, command, key);
case 1: return new CommandKeyKeyMessage(db, flags, command, key, keys[0]);
case 2: return new CommandKeyKeyKeyMessage(db, flags, command, key, keys[0], keys[1]);
default: return new CommandKeyKeysMessage(db, flags, command, key, keys);
}
}
internal static Message Create(int db, CommandFlags flags, RedisCommand command, IList<RedisKey> keys)
{
switch (keys.Count)
{
case 0: return new CommandMessage(db, flags, command);
case 1: return new CommandKeyMessage(db, flags, command, keys[0]);
case 2: return new CommandKeyKeyMessage(db, flags, command, keys[0], keys[1]);
case 3: return new CommandKeyKeyKeyMessage(db, flags, command, keys[0], keys[1], keys[2]);
default: return new CommandKeysMessage(db, flags, command, (keys as RedisKey[]) ?? keys.ToArray());
}
}
internal static Message Create(int db, CommandFlags flags, RedisCommand command, IList<RedisValue> values)
{
switch (values.Count)
{
case 0: return new CommandMessage(db, flags, command);
case 1: return new CommandValueMessage(db, flags, command, values[0]);
case 2: return new CommandValueValueMessage(db, flags, command, values[0], values[1]);
case 3: return new CommandValueValueValueMessage(db, flags, command, values[0], values[1], values[2]);
// no 4; not worth adding
case 5: return new CommandValueValueValueValueValueMessage(db, flags, command, values[0], values[1], values[2], values[3], values[4]);
default: return new CommandValuesMessage(db, flags, command, (values as RedisValue[]) ?? values.ToArray());
}
}
internal static Message Create(int db, CommandFlags flags, RedisCommand command, RedisKey key, RedisValue[] values)
{
if (values == null) throw new ArgumentNullException(nameof(values));
switch (values.Length)
{
case 0: return new CommandKeyMessage(db, flags, command, key);
case 1: return new CommandKeyValueMessage(db, flags, command, key, values[0]);
case 2: return new CommandKeyValueValueMessage(db, flags, command, key, values[0], values[1]);
case 3: return new CommandKeyValueValueValueMessage(db, flags, command, key, values[0], values[1], values[2]);
case 4: return new CommandKeyValueValueValueValueMessage(db, flags, command, key, values[0], values[1], values[2], values[3]);
default: return new CommandKeyValuesMessage(db, flags, command, key, values);
}
}
internal static Message Create(int db, CommandFlags flags, RedisCommand command, RedisKey key0, RedisValue[] values, RedisKey key1)
{
if (values == null) throw new ArgumentNullException(nameof(values));
return new CommandKeyValuesKeyMessage(db, flags, command, key0, values, key1);
}
internal static CommandFlags GetMasterSlaveFlags(CommandFlags flags)
{
// for the purposes of the switch, we only care about two bits
return flags & MaskMasterServerPreference;
}
internal static bool RequiresDatabase(RedisCommand command)
{
switch (command)
{
case RedisCommand.ASKING:
case RedisCommand.AUTH:
case RedisCommand.BGREWRITEAOF:
case RedisCommand.BGSAVE:
case RedisCommand.CLIENT:
case RedisCommand.CLUSTER:
case RedisCommand.CONFIG:
case RedisCommand.DISCARD:
case RedisCommand.ECHO:
case RedisCommand.FLUSHALL:
case RedisCommand.INFO:
case RedisCommand.LASTSAVE:
case RedisCommand.MONITOR:
case RedisCommand.MULTI:
case RedisCommand.PING:
case RedisCommand.PUBLISH:
case RedisCommand.PUBSUB:
case RedisCommand.PUNSUBSCRIBE:
case RedisCommand.PSUBSCRIBE:
case RedisCommand.QUIT:
case RedisCommand.READONLY:
case RedisCommand.READWRITE:
case RedisCommand.SAVE:
case RedisCommand.SCRIPT:
case RedisCommand.SHUTDOWN:
case RedisCommand.SLAVEOF:
case RedisCommand.SLOWLOG:
case RedisCommand.SUBSCRIBE:
case RedisCommand.SYNC:
case RedisCommand.TIME:
case RedisCommand.UNSUBSCRIBE:
case RedisCommand.SENTINEL:
return false;
default:
return true;
}
}
internal static CommandFlags SetMasterSlaveFlags(CommandFlags everything, CommandFlags masterSlave)
{
// take away the two flags we don't want, and add back the ones we care about
return (everything & ~(CommandFlags.DemandMaster | CommandFlags.DemandSlave | CommandFlags.PreferMaster | CommandFlags.PreferSlave))
| masterSlave;
}
internal void Cancel()
{
resultProcessor?.SetException(this, new TaskCanceledException());
}
// true if ready to be completed (i.e. false if re-issued to another server)
internal bool ComputeResult(PhysicalConnection connection, RawResult result)
{
return resultProcessor == null || resultProcessor.SetResult(connection, this, result);
}
internal void Fail(ConnectionFailureType failure, Exception innerException)
{
PhysicalConnection.IdentifyFailureType(innerException, ref failure);
resultProcessor?.ConnectionFail(this, failure, innerException);
}
internal void SetEnqueued()
{
performance?.SetEnqueued();
}
internal void SetRequestSent()
{
performance?.SetRequestSent();
}
internal void SetAsking(bool value)
{
if (value) flags |= AskingFlag; // the bits giveth
else flags &= ~AskingFlag; // and the bits taketh away
}
internal void SetNoRedirect()
{
flags |= CommandFlags.NoRedirect;
}
internal void SetPreferMaster()
{
flags = (flags & ~MaskMasterServerPreference) | CommandFlags.PreferMaster;
}
internal void SetPreferSlave()
{
flags = (flags & ~MaskMasterServerPreference) | CommandFlags.PreferSlave;
}
internal void SetSource(ResultProcessor resultProcessor, ResultBox resultBox)
{ // note order here reversed to prevent overload resolution errors
this.resultBox = resultBox;
this.resultProcessor = resultProcessor;
}
internal void SetSource<T>(ResultBox<T> resultBox, ResultProcessor<T> resultProcessor)
{
this.resultBox = resultBox;
this.resultProcessor = resultProcessor;
}
internal abstract void WriteImpl(PhysicalConnection physical);
internal void WriteTo(PhysicalConnection physical)
{
try
{
WriteImpl(physical);
}
catch (RedisCommandException)
{ // these have specific meaning; don't wrap
throw;
}
catch (Exception ex)
{
physical?.OnInternalError(ex);
Fail(ConnectionFailureType.InternalFailure, ex);
}
}
internal abstract class CommandChannelBase : Message
{
protected readonly RedisChannel Channel;
public CommandChannelBase(int db, CommandFlags flags, RedisCommand command, RedisChannel channel) : base(db, flags, command)
{
channel.AssertNotNull();
Channel = channel;
}
public override string CommandAndKey => Command + " " + Channel;
}
internal abstract class CommandKeyBase : Message
{
protected readonly RedisKey Key;
public CommandKeyBase(int db, CommandFlags flags, RedisCommand command, RedisKey key) : base(db, flags, command)
{
key.AssertNotNull();
Key = key;
}
public override string CommandAndKey => Command + " " + (string)Key;
public override int GetHashSlot(ServerSelectionStrategy serverSelectionStrategy)
{
return serverSelectionStrategy.HashSlot(Key);
}
}
sealed class CommandChannelMessage : CommandChannelBase
{
public CommandChannelMessage(int db, CommandFlags flags, RedisCommand command, RedisChannel channel) : base(db, flags, command, channel)
{ }
internal override void WriteImpl(PhysicalConnection physical)
{
physical.WriteHeader(Command, 1);
physical.Write(Channel);
}
}
sealed class CommandChannelValueMessage : CommandChannelBase
{
private readonly RedisValue value;
public CommandChannelValueMessage(int db, CommandFlags flags, RedisCommand command, RedisChannel channel, RedisValue value) : base(db, flags, command, channel)
{
value.AssertNotNull();
this.value = value;
}
internal override void WriteImpl(PhysicalConnection physical)
{
physical.WriteHeader(Command, 2);
physical.Write(Channel);
physical.Write(value);
}
}
sealed class CommandKeyKeyKeyMessage : CommandKeyBase
{
private readonly RedisKey key1, key2;
public CommandKeyKeyKeyMessage(int db, CommandFlags flags, RedisCommand command, RedisKey key0, RedisKey key1, RedisKey key2) : base(db, flags, command, key0)
{
key1.AssertNotNull();
key2.AssertNotNull();
this.key1 = key1;
this.key2 = key2;
}
public override int GetHashSlot(ServerSelectionStrategy serverSelectionStrategy)
{
var slot = serverSelectionStrategy.HashSlot(Key);
slot = serverSelectionStrategy.CombineSlot(slot, key1);
slot = serverSelectionStrategy.CombineSlot(slot, key2);
return slot;
}
internal override void WriteImpl(PhysicalConnection physical)
{
physical.WriteHeader(Command, 3);
physical.Write(Key);
physical.Write(key1);
physical.Write(key2);
}
}
class CommandKeyKeyMessage : CommandKeyBase
{
protected readonly RedisKey key1;
public CommandKeyKeyMessage(int db, CommandFlags flags, RedisCommand command, RedisKey key0, RedisKey key1) : base(db, flags, command, key0)
{
key1.AssertNotNull();
this.key1 = key1;
}
public override int GetHashSlot(ServerSelectionStrategy serverSelectionStrategy)
{
var slot = serverSelectionStrategy.HashSlot(Key);
slot = serverSelectionStrategy.CombineSlot(slot, key1);
return slot;
}
internal override void WriteImpl(PhysicalConnection physical)
{
physical.WriteHeader(Command, 2);
physical.Write(Key);
physical.Write(key1);
}
}
sealed class CommandKeyKeysMessage : CommandKeyBase
{
private readonly RedisKey[] keys;
public CommandKeyKeysMessage(int db, CommandFlags flags, RedisCommand command, RedisKey key, RedisKey[] keys) : base(db, flags, command, key)
{
for (int i = 0; i < keys.Length; i++)
{
keys[i].AssertNotNull();
}
this.keys = keys;
}
public override int GetHashSlot(ServerSelectionStrategy serverSelectionStrategy)
{
var slot = serverSelectionStrategy.HashSlot(Key);
for (int i = 0; i < keys.Length; i++)
{
slot = serverSelectionStrategy.CombineSlot(slot, keys[i]);
}
return slot;
}
internal override void WriteImpl(PhysicalConnection physical)
{
physical.WriteHeader(command, keys.Length + 1);
physical.Write(Key);
for (int i = 0; i < keys.Length; i++)
{
physical.Write(keys[i]);
}
}
}
sealed class CommandKeyKeyValueMessage : CommandKeyKeyMessage
{
private readonly RedisValue value;
public CommandKeyKeyValueMessage(int db, CommandFlags flags, RedisCommand command, RedisKey key0, RedisKey key1, RedisValue value) : base(db, flags, command, key0, key1)
{
value.AssertNotNull();
this.value = value;
}
internal override void WriteImpl(PhysicalConnection physical)
{
physical.WriteHeader(Command, 3);
physical.Write(Key);
physical.Write(key1);
physical.Write(value);
}
}
sealed class CommandKeyMessage : CommandKeyBase
{
public CommandKeyMessage(int db, CommandFlags flags, RedisCommand command, RedisKey key) : base(db, flags, command, key)
{ }
internal override void WriteImpl(PhysicalConnection physical)
{
physical.WriteHeader(Command, 1);
physical.Write(Key);
}
}
sealed class CommandValuesMessage : Message
{
private readonly RedisValue[] values;
public CommandValuesMessage(int db, CommandFlags flags, RedisCommand command, RedisValue[] values) : base(db, flags, command)
{
for (int i = 0; i < values.Length; i++)
{
values[i].AssertNotNull();
}
this.values = values;
}
internal override void WriteImpl(PhysicalConnection physical)
{
physical.WriteHeader(command, values.Length);
for (int i = 0; i < values.Length; i++)
{
physical.Write(values[i]);
}
}
}
sealed class CommandKeysMessage : Message
{
private readonly RedisKey[] keys;
public CommandKeysMessage(int db, CommandFlags flags, RedisCommand command, RedisKey[] keys) : base(db, flags, command)
{
for (int i = 0; i < keys.Length; i++)
{
keys[i].AssertNotNull();
}
this.keys = keys;
}
public override int GetHashSlot(ServerSelectionStrategy serverSelectionStrategy)
{
int slot = ServerSelectionStrategy.NoSlot;
for(int i = 0; i < keys.Length; i++)
{
slot = serverSelectionStrategy.CombineSlot(slot, keys[i]);
}
return slot;
}
internal override void WriteImpl(PhysicalConnection physical)
{
physical.WriteHeader(command, keys.Length);
for (int i = 0; i < keys.Length; i++)
{
physical.Write(keys[i]);
}
}
}
sealed class CommandKeyValueMessage : CommandKeyBase
{
private readonly RedisValue value;
public CommandKeyValueMessage(int db, CommandFlags flags, RedisCommand command, RedisKey key, RedisValue value) : base(db, flags, command, key)
{
value.AssertNotNull();
this.value = value;
}
internal override void WriteImpl(PhysicalConnection physical)
{
physical.WriteHeader(Command, 2);
physical.Write(Key);
physical.Write(value);
}
}
sealed class CommandKeyValuesKeyMessage : CommandKeyBase
{
private readonly RedisKey key1;
private readonly RedisValue[] values;
public CommandKeyValuesKeyMessage(int db, CommandFlags flags, RedisCommand command, RedisKey key0, RedisValue[] values, RedisKey key1) : base(db, flags, command, key0)
{
for (int i = 0; i < values.Length; i++)
{
values[i].AssertNotNull();
}
this.values = values;
key1.AssertNotNull();
this.key1 = key1;
}
public override int GetHashSlot(ServerSelectionStrategy serverSelectionStrategy)
{
var slot = base.GetHashSlot(serverSelectionStrategy);
return serverSelectionStrategy.CombineSlot(slot, key1);
}
internal override void WriteImpl(PhysicalConnection physical)
{
physical.WriteHeader(Command, values.Length + 2);
physical.Write(Key);
for (int i = 0; i < values.Length; i++) physical.Write(values[i]);
physical.Write(key1);
}
}
sealed class CommandKeyValuesMessage : CommandKeyBase
{
private readonly RedisValue[] values;
public CommandKeyValuesMessage(int db, CommandFlags flags, RedisCommand command, RedisKey key, RedisValue[] values) : base(db, flags, command, key)
{
for (int i = 0; i < values.Length; i++)
{
values[i].AssertNotNull();
}
this.values = values;
}
internal override void WriteImpl(PhysicalConnection physical)
{
physical.WriteHeader(Command, values.Length + 1);
physical.Write(Key);
for (int i = 0; i < values.Length; i++) physical.Write(values[i]);
}
}
sealed class CommandKeyValueValueMessage : CommandKeyBase
{
private readonly RedisValue value0, value1;
public CommandKeyValueValueMessage(int db, CommandFlags flags, RedisCommand command, RedisKey key, RedisValue value0, RedisValue value1) : base(db, flags, command, key)
{
value0.AssertNotNull();
value1.AssertNotNull();
this.value0 = value0;
this.value1 = value1;
}
internal override void WriteImpl(PhysicalConnection physical)
{
physical.WriteHeader(Command, 3);
physical.Write(Key);
physical.Write(value0);
physical.Write(value1);
}
}
sealed class CommandKeyValueValueValueMessage : CommandKeyBase
{
private readonly RedisValue value0, value1, value2;
public CommandKeyValueValueValueMessage(int db, CommandFlags flags, RedisCommand command, RedisKey key, RedisValue value0, RedisValue value1, RedisValue value2) : base(db, flags, command, key)
{
value0.AssertNotNull();
value1.AssertNotNull();
value2.AssertNotNull();
this.value0 = value0;
this.value1 = value1;
this.value2 = value2;
}
internal override void WriteImpl(PhysicalConnection physical)
{
physical.WriteHeader(Command, 4);
physical.Write(Key);
physical.Write(value0);
physical.Write(value1);
physical.Write(value2);
}
}
sealed class CommandKeyValueValueValueValueMessage : CommandKeyBase
{
private readonly RedisValue value0, value1, value2, value3;
public CommandKeyValueValueValueValueMessage(int db, CommandFlags flags, RedisCommand command, RedisKey key, RedisValue value0, RedisValue value1, RedisValue value2, RedisValue value3) : base(db, flags, command, key)
{
value0.AssertNotNull();
value1.AssertNotNull();
value2.AssertNotNull();
value3.AssertNotNull();
this.value0 = value0;
this.value1 = value1;
this.value2 = value2;
this.value3 = value3;
}
internal override void WriteImpl(PhysicalConnection physical)
{
physical.WriteHeader(Command, 5);
physical.Write(Key);
physical.Write(value0);
physical.Write(value1);
physical.Write(value2);
physical.Write(value3);
}
}
sealed class CommandMessage : Message
{
public CommandMessage(int db, CommandFlags flags, RedisCommand command) : base(db, flags, command) { }
internal override void WriteImpl(PhysicalConnection physical)
{
physical.WriteHeader(Command, 0);
}
}
private class CommandSlotValuesMessage : Message
{
private readonly int slot;
private readonly RedisValue[] values;
public CommandSlotValuesMessage(int db, int slot, CommandFlags flags, RedisCommand command, RedisValue[] values)
: base(db, flags, command)
{
this.slot = slot;
for (int i = 0; i < values.Length; i++)
{
values[i].AssertNotNull();
}
this.values = values;
}
public override int GetHashSlot(ServerSelectionStrategy serverSelectionStrategy)
{
return slot;
}
internal override void WriteImpl(PhysicalConnection physical)
{
physical.WriteHeader(command, values.Length);
for (int i = 0; i < values.Length; i++)
{
physical.Write(values[i]);
}
}
}
sealed class CommandValueChannelMessage : CommandChannelBase
{
private readonly RedisValue value;
public CommandValueChannelMessage(int db, CommandFlags flags, RedisCommand command, RedisValue value, RedisChannel channel) : base(db, flags, command, channel)
{
value.AssertNotNull();
this.value = value;
}
internal override void WriteImpl(PhysicalConnection physical)
{
physical.WriteHeader(Command, 2);
physical.Write(value);
physical.Write(Channel);
}
}
sealed class CommandValueKeyMessage : CommandKeyBase
{
private readonly RedisValue value;
public CommandValueKeyMessage(int db, CommandFlags flags, RedisCommand command, RedisValue value, RedisKey key) : base(db, flags, command, key)
{
value.AssertNotNull();
this.value = value;
}
public override void AppendStormLog(StringBuilder sb)
{
base.AppendStormLog(sb);
sb.Append(" (").Append((string)value).Append(')');
}
internal override void WriteImpl(PhysicalConnection physical)
{
physical.WriteHeader(Command, 2);
physical.Write(value);
physical.Write(Key);
}
}
sealed class CommandValueMessage : Message
{
private readonly RedisValue value;
public CommandValueMessage(int db, CommandFlags flags, RedisCommand command, RedisValue value) : base(db, flags, command)
{
value.AssertNotNull();
this.value = value;
}
internal override void WriteImpl(PhysicalConnection physical)
{
physical.WriteHeader(Command, 1);
physical.Write(value);
}
}
sealed class CommandValueValueMessage : Message
{
private readonly RedisValue value0, value1;
public CommandValueValueMessage(int db, CommandFlags flags, RedisCommand command, RedisValue value0, RedisValue value1) : base(db, flags, command)
{
value0.AssertNotNull();
value1.AssertNotNull();
this.value0 = value0;
this.value1 = value1;
}
internal override void WriteImpl(PhysicalConnection physical)
{
physical.WriteHeader(Command, 2);
physical.Write(value0);
physical.Write(value1);
}
}
sealed class CommandValueValueValueMessage : Message
{
private readonly RedisValue value0, value1, value2;
public CommandValueValueValueMessage(int db, CommandFlags flags, RedisCommand command, RedisValue value0, RedisValue value1, RedisValue value2) : base(db, flags, command)
{
value0.AssertNotNull();
value1.AssertNotNull();
value2.AssertNotNull();
this.value0 = value0;
this.value1 = value1;
this.value2 = value2;
}
internal override void WriteImpl(PhysicalConnection physical)
{
physical.WriteHeader(Command, 3);
physical.Write(value0);
physical.Write(value1);
physical.Write(value2);
}
}
sealed class CommandValueValueValueValueValueMessage : Message
{
private readonly RedisValue value0, value1, value2, value3, value4;
public CommandValueValueValueValueValueMessage(int db, CommandFlags flags, RedisCommand command, RedisValue value0, RedisValue value1, RedisValue value2, RedisValue value3, RedisValue value4) : base(db, flags, command)
{
value0.AssertNotNull();
value1.AssertNotNull();
value2.AssertNotNull();
value3.AssertNotNull();
value4.AssertNotNull();
this.value0 = value0;
this.value1 = value1;
this.value2 = value2;
this.value3 = value3;
this.value4 = value4;
}
internal override void WriteImpl(PhysicalConnection physical)
{
physical.WriteHeader(Command, 5);
physical.Write(value0);
physical.Write(value1);
physical.Write(value2);
physical.Write(value3);
physical.Write(value4);
}
}
sealed class SelectMessage : Message
{
public SelectMessage(int db, CommandFlags flags) : base(db, flags, RedisCommand.SELECT)
{
}
internal override void WriteImpl(PhysicalConnection physical)
{
physical.WriteHeader(Command, 1);
physical.Write(Db);
}
}
}
}
}
performance?.SetCompleted();
return ret;
}
else
{
ConnectionMultiplexer.TraceWithoutContext("No result-box to complete for " + Command, "Message");
performance?.SetCompleted();
return true;
}
}
internal static Message Create(int db, CommandFlags flags, RedisCommand command, RedisKey key, RedisKey[] keys)
{
switch (keys.Length)
{
case 0: return new CommandKeyMessage(db, flags, command, key);
case 1: return new CommandKeyKeyMessage(db, flags, command, key, keys[0]);
case 2: return new CommandKeyKeyKeyMessage(db, flags, command, key, keys[0], keys[1]);
default: return new CommandKeyKeysMessage(db, flags, command, key, keys);
}
}
internal static Message Create(int db, CommandFlags flags, RedisCommand command, IList<RedisKey> keys)
{
switch (keys.Count)
{
case 0: return new CommandMessage(db, flags, command);
case 1: return new CommandKeyMessage(db, flags, command, keys[0]);
case 2: return new CommandKeyKeyMessage(db, flags, command, keys[0], keys[1]);
case 3: return new CommandKeyKeyKeyMessage(db, flags, command, keys[0], keys[1], keys[2]);
default: return new CommandKeysMessage(db, flags, command, (keys as RedisKey[]) ?? keys.ToArray());
}
}
internal static Message Create(int db, CommandFlags flags, RedisCommand command, IList<RedisValue> values)
{
switch (values.Count)
{
case 0: return new CommandMessage(db, flags, command);
case 1: return new CommandValueMessage(db, flags, command, values[0]);
case 2: return new CommandValueValueMessage(db, flags, command, values[0], values[1]);
case 3: return new CommandValueValueValueMessage(db, flags, command, values[0], values[1], values[2]);
// no 4; not worth adding
case 5: return new CommandValueValueValueValueValueMessage(db, flags, command, values[0], values[1], values[2], values[3], values[4]);
default: return new CommandValuesMessage(db, flags, command, (values as RedisValue[]) ?? values.ToArray());
}
}
internal static Message Create(int db, CommandFlags flags, RedisCommand command, RedisKey key, RedisValue[] values)
{
if (values == null) throw new ArgumentNullException(nameof(values));
switch (values.Length)
{
case 0: return new CommandKeyMessage(db, flags, command, key);
case 1: return new CommandKeyValueMessage(db, flags, command, key, values[0]);
case 2: return new CommandKeyValueValueMessage(db, flags, command, key, values[0], values[1]);
case 3: return new CommandKeyValueValueValueMessage(db, flags, command, key, values[0], values[1], values[2]);
case 4: return new CommandKeyValueValueValueValueMessage(db, flags, command, key, values[0], values[1], values[2], values[3]);
default: return new CommandKeyValuesMessage(db, flags, command, key, values);
}
}
internal static Message Create(int db, CommandFlags flags, RedisCommand command, RedisKey key0, RedisValue[] values, RedisKey key1)
{
if (values == null) throw new ArgumentNullException(nameof(values));
return new CommandKeyValuesKeyMessage(db, flags, command, key0, values, key1);
}
internal static CommandFlags GetMasterSlaveFlags(CommandFlags flags)
{
// for the purposes of the switch, we only care about two bits
return flags & MaskMasterServerPreference;
}
internal static bool RequiresDatabase(RedisCommand command)
{
switch (command)
{
case RedisCommand.ASKING:
case RedisCommand.AUTH:
case RedisCommand.BGREWRITEAOF:
case RedisCommand.BGSAVE:
case RedisCommand.CLIENT:
case RedisCommand.CLUSTER:
case RedisCommand.CONFIG:
case RedisCommand.DISCARD:
case RedisCommand.ECHO:
case RedisCommand.FLUSHALL:
case RedisCommand.INFO:
case RedisCommand.LASTSAVE:
case RedisCommand.MONITOR:
case RedisCommand.MULTI:
case RedisCommand.PING:
case RedisCommand.PUBLISH:
case RedisCommand.PUBSUB:
case RedisCommand.PUNSUBSCRIBE:
case RedisCommand.PSUBSCRIBE:
case RedisCommand.QUIT:
case RedisCommand.READONLY:
case RedisCommand.READWRITE:
case RedisCommand.SAVE:
case RedisCommand.SCRIPT:
case RedisCommand.SHUTDOWN:
case RedisCommand.SLAVEOF:
case RedisCommand.SLOWLOG:
case RedisCommand.SUBSCRIBE:
case RedisCommand.SYNC:
case RedisCommand.TIME:
case RedisCommand.UNSUBSCRIBE:
case RedisCommand.SENTINEL:
return false;
default:
return true;
}
}
internal static CommandFlags SetMasterSlaveFlags(CommandFlags everything, CommandFlags masterSlave)
{
// take away the two flags we don't want, and add back the ones we care about
return (everything & ~(CommandFlags.DemandMaster | CommandFlags.DemandSlave | CommandFlags.PreferMaster | CommandFlags.PreferSlave))
| masterSlave;
}
internal void Cancel()
{
resultProcessor?.SetException(this, new TaskCanceledException());
}
// true if ready to be completed (i.e. false if re-issued to another server)
internal bool ComputeResult(PhysicalConnection connection, RawResult result)
{
return resultProcessor == null || resultProcessor.SetResult(connection, this, result);
}
internal void Fail(ConnectionFailureType failure, Exception innerException)
{
PhysicalConnection.IdentifyFailureType(innerException, ref failure);
resultProcessor?.ConnectionFail(this, failure, innerException);
}
internal void SetEnqueued()
{
performance?.SetEnqueued();
}
internal void SetRequestSent()
{
performance?.SetRequestSent();
}
internal void SetAsking(bool value)
{
if (value) flags |= AskingFlag; // the bits giveth
else flags &= ~AskingFlag; // and the bits taketh away
}
internal void SetNoRedirect()
{
flags |= CommandFlags.NoRedirect;
}
internal void SetPreferMaster()
{
flags = (flags & ~MaskMasterServerPreference) | CommandFlags.PreferMaster;
}
internal void SetPreferSlave()
{
flags = (flags & ~MaskMasterServerPreference) | CommandFlags.PreferSlave;
}
internal void SetSource(ResultProcessor resultProcessor, ResultBox resultBox)
{ // note order here reversed to prevent overload resolution errors
this.resultBox = resultBox;
this.resultProcessor = resultProcessor;
}
internal void SetSource<T>(ResultBox<T> resultBox, ResultProcessor<T> resultProcessor)
{
this.resultBox = resultBox;
this.resultProcessor = resultProcessor;
}
internal abstract void WriteImpl(PhysicalConnection physical);
internal void WriteTo(PhysicalConnection physical)
{
try
{
WriteImpl(physical);
}
catch (RedisCommandException)
{ // these have specific meaning; don't wrap
throw;
}
catch (Exception ex)
{
physical?.OnInternalError(ex);
Fail(ConnectionFailureType.InternalFailure, ex);
}
}
internal abstract class CommandChannelBase : Message
{
protected readonly RedisChannel Channel;
public CommandChannelBase(int db, CommandFlags flags, RedisCommand command, RedisChannel channel) : base(db, flags, command)
{
channel.AssertNotNull();
Channel = channel;
}
public override string CommandAndKey => Command + " " + Channel;
}
internal abstract class CommandKeyBase : Message
{
protected readonly RedisKey Key;
public CommandKeyBase(int db, CommandFlags flags, RedisCommand command, RedisKey key) : base(db, flags, command)
{
key.AssertNotNull();
Key = key;
}
public override string CommandAndKey => Command + " " + (string)Key;
public override int GetHashSlot(ServerSelectionStrategy serverSelectionStrategy)
{
return serverSelectionStrategy.HashSlot(Key);
}
}
sealed class CommandChannelMessage : CommandChannelBase
{
public CommandChannelMessage(int db, CommandFlags flags, RedisCommand command, RedisChannel channel) : base(db, flags, command, channel)
{ }
internal override void WriteImpl(PhysicalConnection physical)
{
physical.WriteHeader(Command, 1);
physical.Write(Channel);
}
}
sealed class CommandChannelValueMessage : CommandChannelBase
{
private readonly RedisValue value;
public CommandChannelValueMessage(int db, CommandFlags flags, RedisCommand command, RedisChannel channel, RedisValue value) : base(db, flags, command, channel)
{
value.AssertNotNull();
this.value = value;
}
internal override void WriteImpl(PhysicalConnection physical)
{
physical.WriteHeader(Command, 2);
physical.Write(Channel);
physical.Write(value);
}
}
sealed class CommandKeyKeyKeyMessage : CommandKeyBase
{
private readonly RedisKey key1, key2;
public CommandKeyKeyKeyMessage(int db, CommandFlags flags, RedisCommand command, RedisKey key0, RedisKey key1, RedisKey key2) : base(db, flags, command, key0)
{
key1.AssertNotNull();
key2.AssertNotNull();
this.key1 = key1;
this.key2 = key2;
}
public override int GetHashSlot(ServerSelectionStrategy serverSelectionStrategy)
{
var slot = serverSelectionStrategy.HashSlot(Key);
slot = serverSelectionStrategy.CombineSlot(slot, key1);
slot = serverSelectionStrategy.CombineSlot(slot, key2);
return slot;
}
internal override void WriteImpl(PhysicalConnection physical)
{
physical.WriteHeader(Command, 3);
physical.Write(Key);
physical.Write(key1);
physical.Write(key2);
}
}
class CommandKeyKeyMessage : CommandKeyBase
{
protected readonly RedisKey key1;
public CommandKeyKeyMessage(int db, CommandFlags flags, RedisCommand command, RedisKey key0, RedisKey key1) : base(db, flags, command, key0)
{
key1.AssertNotNull();
this.key1 = key1;
}
public override int GetHashSlot(ServerSelectionStrategy serverSelectionStrategy)
{
var slot = serverSelectionStrategy.HashSlot(Key);
slot = serverSelectionStrategy.CombineSlot(slot, key1);
return slot;
}
internal override void WriteImpl(PhysicalConnection physical)
{
physical.WriteHeader(Command, 2);
physical.Write(Key);
physical.Write(key1);
}
}
sealed class CommandKeyKeysMessage : CommandKeyBase
{
private readonly RedisKey[] keys;
public CommandKeyKeysMessage(int db, CommandFlags flags, RedisCommand command, RedisKey key, RedisKey[] keys) : base(db, flags, command, key)
{
for (int i = 0; i < keys.Length; i++)
{
keys[i].AssertNotNull();
}
this.keys = keys;
}
public override int GetHashSlot(ServerSelectionStrategy serverSelectionStrategy)
{
var slot = serverSelectionStrategy.HashSlot(Key);
for (int i = 0; i < keys.Length; i++)
{
slot = serverSelectionStrategy.CombineSlot(slot, keys[i]);
}
return slot;
}
internal override void WriteImpl(PhysicalConnection physical)
{
physical.WriteHeader(command, keys.Length + 1);
physical.Write(Key);
for (int i = 0; i < keys.Length; i++)
{
physical.Write(keys[i]);
}
}
}
sealed class CommandKeyKeyValueMessage : CommandKeyKeyMessage
{
private readonly RedisValue value;
public CommandKeyKeyValueMessage(int db, CommandFlags flags, RedisCommand command, RedisKey key0, RedisKey key1, RedisValue value) : base(db, flags, command, key0, key1)
{
value.AssertNotNull();
this.value = value;
}
internal override void WriteImpl(PhysicalConnection physical)
{
physical.WriteHeader(Command, 3);
physical.Write(Key);
physical.Write(key1);
physical.Write(value);
}
}
sealed class CommandKeyMessage : CommandKeyBase
{
public CommandKeyMessage(int db, CommandFlags flags, RedisCommand command, RedisKey key) : base(db, flags, command, key)
{ }
internal override void WriteImpl(PhysicalConnection physical)
{
physical.WriteHeader(Command, 1);
physical.Write(Key);
}
}
sealed class CommandValuesMessage : Message
{
private readonly RedisValue[] values;
public CommandValuesMessage(int db, CommandFlags flags, RedisCommand command, RedisValue[] values) : base(db, flags, command)
{
for (int i = 0; i < values.Length; i++)
{
values[i].AssertNotNull();
}
this.values = values;
}
internal override void WriteImpl(PhysicalConnection physical)
{
physical.WriteHeader(command, values.Length);
for (int i = 0; i < values.Length; i++)
{
physical.Write(values[i]);
}
}
}
sealed class CommandKeysMessage : Message
{
private readonly RedisKey[] keys;
public CommandKeysMessage(int db, CommandFlags flags, RedisCommand command, RedisKey[] keys) : base(db, flags, command)
{
for (int i = 0; i < keys.Length; i++)
{
keys[i].AssertNotNull();
}
this.keys = keys;
}
public override int GetHashSlot(ServerSelectionStrategy serverSelectionStrategy)
{
int slot = ServerSelectionStrategy.NoSlot;
for (int i = 0; i < keys.Length; i++)
{
slot = serverSelectionStrategy.CombineSlot(slot, keys[i]);
}
return slot;
}
internal override void WriteImpl(PhysicalConnection physical)
{
physical.WriteHeader(command, keys.Length);
for (int i = 0; i < keys.Length; i++)
{
physical.Write(keys[i]);
}
}
}
sealed class CommandKeyValueMessage : CommandKeyBase
{
private readonly RedisValue value;
public CommandKeyValueMessage(int db, CommandFlags flags, RedisCommand command, RedisKey key, RedisValue value) : base(db, flags, command, key)
{
value.AssertNotNull();
this.value = value;
}
internal override void WriteImpl(PhysicalConnection physical)
{
physical.WriteHeader(Command, 2);
physical.Write(Key);
physical.Write(value);
}
}
sealed class CommandKeyValuesKeyMessage : CommandKeyBase
{
private readonly RedisKey key1;
private readonly RedisValue[] values;
public CommandKeyValuesKeyMessage(int db, CommandFlags flags, RedisCommand command, RedisKey key0, RedisValue[] values, RedisKey key1) : base(db, flags, command, key0)
{
for (int i = 0; i < values.Length; i++)
{
values[i].AssertNotNull();
}
this.values = values;
key1.AssertNotNull();
this.key1 = key1;
}
public override int GetHashSlot(ServerSelectionStrategy serverSelectionStrategy)
{
var slot = base.GetHashSlot(serverSelectionStrategy);
return serverSelectionStrategy.CombineSlot(slot, key1);
}
internal override void WriteImpl(PhysicalConnection physical)
{
physical.WriteHeader(Command, values.Length + 2);
physical.Write(Key);
for (int i = 0; i < values.Length; i++) physical.Write(values[i]);
physical.Write(key1);
}
}
sealed class CommandKeyValuesMessage : CommandKeyBase
{
private readonly RedisValue[] values;
public CommandKeyValuesMessage(int db, CommandFlags flags, RedisCommand command, RedisKey key, RedisValue[] values) : base(db, flags, command, key)
{
for (int i = 0; i < values.Length; i++)
{
values[i].AssertNotNull();
}
this.values = values;
}
internal override void WriteImpl(PhysicalConnection physical)
{
physical.WriteHeader(Command, values.Length + 1);
physical.Write(Key);
for (int i = 0; i < values.Length; i++) physical.Write(values[i]);
}
}
sealed class CommandKeyValueValueMessage : CommandKeyBase
{
private readonly RedisValue value0, value1;
public CommandKeyValueValueMessage(int db, CommandFlags flags, RedisCommand command, RedisKey key, RedisValue value0, RedisValue value1) : base(db, flags, command, key)
{
value0.AssertNotNull();
value1.AssertNotNull();
this.value0 = value0;
this.value1 = value1;
}
internal override void WriteImpl(PhysicalConnection physical)
{
physical.WriteHeader(Command, 3);
physical.Write(Key);
physical.Write(value0);
physical.Write(value1);
}
}
sealed class CommandKeyValueValueValueMessage : CommandKeyBase
{
private readonly RedisValue value0, value1, value2;
public CommandKeyValueValueValueMessage(int db, CommandFlags flags, RedisCommand command, RedisKey key, RedisValue value0, RedisValue value1, RedisValue value2) : base(db, flags, command, key)
{
value0.AssertNotNull();
value1.AssertNotNull();
value2.AssertNotNull();
this.value0 = value0;
this.value1 = value1;
this.value2 = value2;
}
internal override void WriteImpl(PhysicalConnection physical)
{
physical.WriteHeader(Command, 4);
physical.Write(Key);
physical.Write(value0);
physical.Write(value1);
physical.Write(value2);
}
}
sealed class CommandKeyValueValueValueValueMessage : CommandKeyBase
{
private readonly RedisValue value0, value1, value2, value3;
public CommandKeyValueValueValueValueMessage(int db, CommandFlags flags, RedisCommand command, RedisKey key, RedisValue value0, RedisValue value1, RedisValue value2, RedisValue value3) : base(db, flags, command, key)
{
value0.AssertNotNull();
value1.AssertNotNull();
value2.AssertNotNull();
value3.AssertNotNull();
this.value0 = value0;
this.value1 = value1;
this.value2 = value2;
this.value3 = value3;
}
internal override void WriteImpl(PhysicalConnection physical)
{
physical.WriteHeader(Command, 5);
physical.Write(Key);
physical.Write(value0);
physical.Write(value1);
physical.Write(value2);
physical.Write(value3);
}
}
sealed class CommandMessage : Message
{
public CommandMessage(int db, CommandFlags flags, RedisCommand command) : base(db, flags, command) { }
internal override void WriteImpl(PhysicalConnection physical)
{
physical.WriteHeader(Command, 0);
}
}
private class CommandSlotValuesMessage : Message
{
private readonly int slot;
private readonly RedisValue[] values;
public CommandSlotValuesMessage(int db, int slot, CommandFlags flags, RedisCommand command, RedisValue[] values)
: base(db, flags, command)
{
this.slot = slot;
for (int i = 0; i < values.Length; i++)
{
values[i].AssertNotNull();
}
this.values = values;
}
public override int GetHashSlot(ServerSelectionStrategy serverSelectionStrategy)
{
return slot;
}
internal override void WriteImpl(PhysicalConnection physical)
{
physical.WriteHeader(command, values.Length);
for (int i = 0; i < values.Length; i++)
{
physical.Write(values[i]);
}
}
}
sealed class CommandValueChannelMessage : CommandChannelBase
{
private readonly RedisValue value;
public CommandValueChannelMessage(int db, CommandFlags flags, RedisCommand command, RedisValue value, RedisChannel channel) : base(db, flags, command, channel)
{
value.AssertNotNull();
this.value = value;
}
internal override void WriteImpl(PhysicalConnection physical)
{
physical.WriteHeader(Command, 2);
physical.Write(value);
physical.Write(Channel);
}
}
sealed class CommandValueKeyMessage : CommandKeyBase
{
private readonly RedisValue value;
public CommandValueKeyMessage(int db, CommandFlags flags, RedisCommand command, RedisValue value, RedisKey key) : base(db, flags, command, key)
{
value.AssertNotNull();
this.value = value;
}
public override void AppendStormLog(StringBuilder sb)
{
base.AppendStormLog(sb);
sb.Append(" (").Append((string)value).Append(')');
}
internal override void WriteImpl(PhysicalConnection physical)
{
physical.WriteHeader(Command, 2);
physical.Write(value);
physical.Write(Key);
}
}
sealed class CommandValueMessage : Message
{
private readonly RedisValue value;
public CommandValueMessage(int db, CommandFlags flags, RedisCommand command, RedisValue value) : base(db, flags, command)
{
value.AssertNotNull();
this.value = value;
}
internal override void WriteImpl(PhysicalConnection physical)
{
physical.WriteHeader(Command, 1);
physical.Write(value);
}
}
sealed class CommandValueValueMessage : Message
{
private readonly RedisValue value0, value1;
public CommandValueValueMessage(int db, CommandFlags flags, RedisCommand command, RedisValue value0, RedisValue value1) : base(db, flags, command)
{
value0.AssertNotNull();
value1.AssertNotNull();
this.value0 = value0;
this.value1 = value1;
}
internal override void WriteImpl(PhysicalConnection physical)
{
physical.WriteHeader(Command, 2);
physical.Write(value0);
physical.Write(value1);
}
}
sealed class CommandValueValueValueMessage : Message
{
private readonly RedisValue value0, value1, value2;
public CommandValueValueValueMessage(int db, CommandFlags flags, RedisCommand command, RedisValue value0, RedisValue value1, RedisValue value2) : base(db, flags, command)
{
value0.AssertNotNull();
value1.AssertNotNull();
value2.AssertNotNull();
this.value0 = value0;
this.value1 = value1;
this.value2 = value2;
}
internal override void WriteImpl(PhysicalConnection physical)
{
physical.WriteHeader(Command, 3);
physical.Write(value0);
physical.Write(value1);
physical.Write(value2);
}
}
sealed class CommandValueValueValueValueValueMessage : Message
{
private readonly RedisValue value0, value1, value2, value3, value4;
public CommandValueValueValueValueValueMessage(int db, CommandFlags flags, RedisCommand command, RedisValue value0, RedisValue value1, RedisValue value2, RedisValue value3, RedisValue value4) : base(db, flags, command)
{
value0.AssertNotNull();
value1.AssertNotNull();
value2.AssertNotNull();
value3.AssertNotNull();
value4.AssertNotNull();
this.value0 = value0;
this.value1 = value1;
this.value2 = value2;
this.value3 = value3;
this.value4 = value4;
}
internal override void WriteImpl(PhysicalConnection physical)
{
physical.WriteHeader(Command, 5);
physical.Write(value0);
physical.Write(value1);
physical.Write(value2);
physical.Write(value3);
physical.Write(value4);
}
}
sealed class SelectMessage : Message
{
public SelectMessage(int db, CommandFlags flags) : base(db, flags, RedisCommand.SELECT)
{
}
internal override void WriteImpl(PhysicalConnection physical)
{
physical.WriteHeader(Command, 1);
physical.Write(Db);
}
}
}
}
......@@ -230,6 +230,64 @@ internal RedisValue[] GetItemsAsValues()
return arr;
}
}
static readonly string[] NilStrings = new string[0];
internal string[] GetItemsAsStrings()
{
RawResult[] items = GetItems();
if (items == null)
{
return null;
}
else if (items.Length == 0)
{
return NilStrings;
}
else
{
var arr = new string[items.Length];
for (int i = 0; i < arr.Length; i++)
{
arr[i] = (string)(items[i].AsRedisValue());
}
return arr;
}
}
internal GeoPosition?[] GetItemsAsGeoPositionArray()
{
RawResult[] items = GetItems();
if (items == null)
{
return null;
}
else if (items.Length == 0)
{
return new GeoPosition?[0];
}
else
{
var arr = new GeoPosition?[items.Length];
for (int i = 0; i < arr.Length; i++)
{
RawResult[] item = items[i].GetArrayOfRawResults();
if (item == null)
{
arr[i] = null;
}
else
{
arr[i] = new GeoPosition((double)item[0].AsRedisValue(), (double)item[1].AsRedisValue());
}
}
return arr;
}
}
internal RawResult[] GetItemsAsRawResults()
{
return GetItems();
}
// returns an array of RawResults
internal RawResult[] GetArrayOfRawResults()
......
......@@ -38,6 +38,13 @@ enum RedisCommand
FLUSHALL,
FLUSHDB,
GEOADD,
GEODIST,
GEOHASH,
GEOPOS,
GEORADIUS,
GEORADIUSBYMEMBER,
GET,
GETBIT,
GETRANGE,
......
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Threading.Tasks;
......@@ -45,6 +46,102 @@ public RedisValue DebugObject(RedisKey key, CommandFlags flags = CommandFlags.No
return ExecuteSync(msg, ResultProcessor.RedisValue);
}
public bool GeoAdd(RedisKey key, double longitude, double latitude, RedisValue member, CommandFlags flags = CommandFlags.None)
{
return GeoAdd(key, new GeoEntry(longitude, latitude, member), flags);
}
public bool GeoAdd(RedisKey key, GeoEntry value, CommandFlags flags = CommandFlags.None)
{
var msg = Message.Create(Database, flags, RedisCommand.GEOADD, key, value.Longitude, value.Latitude, value.Member);
return ExecuteSync(msg, ResultProcessor.Boolean);
}
public long GeoAdd(RedisKey key, GeoEntry[] values, CommandFlags flags = CommandFlags.None)
{
var msg = Message.Create(Database, flags, RedisCommand.GEOADD, key, values);
return ExecuteSync(msg, ResultProcessor.Int64);
}
public bool GeoRemove(RedisKey key, RedisValue member, CommandFlags flags = CommandFlags.None)
{
return SortedSetRemove(key, member, flags);
}
public double GeoDistance(RedisKey key, RedisValue value0, RedisValue value1, GeoUnit unit = GeoUnit.Meters, CommandFlags flags = CommandFlags.None)
{
var msg = Message.Create(Database, flags, RedisCommand.GEODIST, key, value0, value1, StackExchange.Redis.GeoPosition.GetRedisUnit(unit));
return (double)ExecuteSync(msg, ResultProcessor.RedisValue);
}
public string[] GeoHash(RedisKey key, RedisValue[] members, CommandFlags flags = CommandFlags.None)
{
if (members == null) throw new ArgumentNullException(nameof(members));
var redisValues = new RedisValue[members.Length];
for (var i = 0; i < members.Length; i++) redisValues[i] = members[i];
var msg = Message.Create(Database, flags, RedisCommand.GEOHASH, key, redisValues);
return ExecuteSync(msg, ResultProcessor.StringArray);
}
public string[] GeoHash(RedisKey key, RedisValue member, CommandFlags flags = CommandFlags.None)
{
return GeoHash(key, new[] { member }, flags);
}
public GeoPosition?[] GeoPosition(RedisKey key, RedisValue[] members, CommandFlags flags = CommandFlags.None)
{
if (members == null) throw new ArgumentNullException(nameof(members));
var redisValues = new RedisValue[members.Length];
for (var i = 0; i < members.Length; i++) redisValues[i] = members[i];
var msg = Message.Create(Database, flags, RedisCommand.GEOPOS, key, redisValues);
return ExecuteSync(msg, ResultProcessor.RedisGeoPosition);
}
public GeoPosition? GeoPosition(RedisKey key, RedisValue member, CommandFlags flags = CommandFlags.None)
{
return GeoPosition(key, new[] { member }, flags)[0];
}
private Message GetGeoRadiusMessage(RedisKey key, RedisValue? member, double longitude, double latitude, double radius, GeoUnit unit, int count, Order? order, GeoRadiusOptions options, CommandFlags flags)
{
var redisValues = new List<RedisValue>();
RedisCommand command;
if (member == null)
{
redisValues.Add(longitude);
redisValues.Add(latitude);
command = RedisCommand.GEORADIUS;
}
else
{
redisValues.Add(member.Value);
command = RedisCommand.GEORADIUSBYMEMBER;
}
redisValues.Add(radius);
redisValues.Add(StackExchange.Redis.GeoPosition.GetRedisUnit(unit));
if ((options & GeoRadiusOptions.WithCoordinates) != 0) redisValues.Add("WITHCOORD");
if ((options & GeoRadiusOptions.WithDistance) != 0) redisValues.Add("WITHDIST");
if ((options & GeoRadiusOptions.WithGeoHash) != 0) redisValues.Add("WITHHASH");
if (count > 0) redisValues.Add(count);
if (order != null)
{
switch (order.Value)
{
case Order.Ascending: redisValues.Add("ASC"); break;
case Order.Descending: redisValues.Add("DESC"); break;
default: throw new ArgumentOutOfRangeException(nameof(order));
}
}
return Message.Create(Database, flags, command, key, redisValues.ToArray());
}
public GeoRadiusResult[] GeoRadius(RedisKey key, RedisValue member, double radius, GeoUnit unit, int count, Order? order, GeoRadiusOptions options, CommandFlags flags)
{
return ExecuteSync(GetGeoRadiusMessage(key, member, double.NaN, double.NaN, radius, unit, count, order, options, flags), ResultProcessor.GeoRadiusArray(options));
}
public GeoRadiusResult[] GeoRadius(RedisKey key, double longitude, double latitude, double radius, GeoUnit unit, int count, Order? order, GeoRadiusOptions options, CommandFlags flags)
{
return ExecuteSync(GetGeoRadiusMessage(key, null, longitude, latitude, radius, unit, count, order, options, flags), ResultProcessor.GeoRadiusArray(options));
}
public Task<RedisValue> DebugObjectAsync(RedisKey key, CommandFlags flags = CommandFlags.None)
{
var msg = Message.Create(Database, flags, RedisCommand.DEBUG, RedisLiterals.OBJECT, key);
......@@ -310,7 +407,7 @@ public long HyperLogLogLength(RedisKey[] keys, CommandFlags flags = CommandFlags
var features = GetFeatures(Database, keys[0], flags, out server);
// technically a write / master-only command until 2.8.18
if (server != null && !features.HyperLogLogCountSlaveSafe) cmd.SetMasterOnly();
}
}
return ExecuteSync(cmd, ResultProcessor.Int64, server);
}
......@@ -926,7 +1023,6 @@ public Task<RedisResult> ScriptEvaluateAsync(LoadedLuaScript script, object para
{
return script.EvaluateAsync(this, parameters, null, flags);
}
public bool SetAdd(RedisKey key, RedisValue value, CommandFlags flags = CommandFlags.None)
{
var msg = Message.Create(Database, flags, RedisCommand.SADD, key, value);
......@@ -1685,8 +1781,9 @@ private Message GetHashSetMessage(RedisKey key, HashEntry[] hashFields, CommandF
switch (hashFields.Length)
{
case 0: return null;
case 1: return Message.Create(Database, flags, RedisCommand.HMSET, key,
hashFields[0].name, hashFields[0].value);
case 1:
return Message.Create(Database, flags, RedisCommand.HMSET, key,
hashFields[0].name, hashFields[0].value);
case 2:
return Message.Create(Database, flags, RedisCommand.HMSET, key,
hashFields[0].name, hashFields[0].value,
......@@ -2471,4 +2568,4 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes
}
}
}
}
}
......@@ -65,7 +65,14 @@ abstract class ResultProcessor
RedisValue = new RedisValueProcessor();
public static readonly ResultProcessor<RedisValue[]>
RedisValueArray = new RedisValueArrayProcessor();
RedisValueArray = new RedisValueArrayProcessor();
public static readonly ResultProcessor<string[]>
StringArray = new StringArrayProcessor();
public static readonly ResultProcessor<GeoPosition?[]>
RedisGeoPosition = new RedisValueGeoPositionProcessor();
public static readonly ResultProcessor<TimeSpan>
ResponseTimer = new TimingProcessor();
......@@ -75,6 +82,8 @@ abstract class ResultProcessor
public static readonly SortedSetEntryArrayProcessor
SortedSetWithScores = new SortedSetEntryArrayProcessor();
public static ResultProcessor<GeoRadiusResult[]> GeoRadiusArray(GeoRadiusOptions options) => GeoRadiusResultArrayProcessor.Get(options);
public static readonly ResultProcessor<string>
String = new StringProcessor(),
ClusterNodesRaw = new ClusterNodesRawProcessor();
......@@ -85,10 +94,10 @@ public static readonly SortedSetEntryArrayProcessor
SentinelMasterEndpoint = new SentinelGetMasterAddressByNameProcessor();
public static readonly ResultProcessor<KeyValuePair<string, string>[][]>
SentinelArrayOfArrays = new SentinelArrayOfArraysProcessor();
SentinelArrayOfArrays = new SentinelArrayOfArraysProcessor();
#endregion
public static readonly ResultProcessor<KeyValuePair<string, string>[]>
StringPairInterleaved = new StringPairInterleavedProcessor();
public static readonly TimeSpanProcessor
......@@ -96,8 +105,8 @@ public static readonly TimeSpanProcessor
TimeSpanFromSeconds = new TimeSpanProcessor(false);
public static readonly HashEntryArrayProcessor
HashEntryArray = new HashEntryArrayProcessor();
static readonly byte[] MOVED = Encoding.UTF8.GetBytes("MOVED "), ASK = Encoding.UTF8.GetBytes("ASK ");
static readonly byte[] MOVED = Encoding.UTF8.GetBytes("MOVED "), ASK = Encoding.UTF8.GetBytes("ASK ");
public void ConnectionFail(Message message, ConnectionFailureType fail, Exception innerException)
{
PhysicalConnection.IdentifyFailureType(innerException, ref fail);
......@@ -153,8 +162,8 @@ public virtual bool SetResult(PhysicalConnection connection, Message message, Ra
if (Format.TryParseInt32(parts[1], out hashSlot) &&
(endpoint = Format.TryParseEndPoint(parts[2])) != null)
{
// no point sending back to same server, and no point sending to a dead server
// no point sending back to same server, and no point sending to a dead server
if (!Equals(server.EndPoint, endpoint))
{
if (bridge.Multiplexer.TryResend(hashSlot, message, endpoint, isMoved))
......@@ -176,8 +185,8 @@ public virtual bool SetResult(PhysicalConnection connection, Message message, Ra
if (string.IsNullOrWhiteSpace(err))
{
err = result.GetString();
}
}
if (log)
{
bridge.Multiplexer.OnErrorMessage(server.EndPoint, err);
......@@ -204,7 +213,7 @@ public virtual bool SetResult(PhysicalConnection connection, Message message, Ra
private void UnexpectedResponse(Message message, RawResult result)
{
ConnectionMultiplexer.TraceWithoutContext("From " + GetType().Name, "Unexpected Response");
ConnectionFail(message, ConnectionFailureType.ProtocolFailure, "Unexpected response to " + (message?.Command.ToString() ?? "n/a") +": " + result.ToString());
ConnectionFail(message, ConnectionFailureType.ProtocolFailure, "Unexpected response to " + (message?.Command.ToString() ?? "n/a") + ": " + result.ToString());
}
public sealed class TimeSpanProcessor : ResultProcessor<TimeSpan?>
......@@ -314,7 +323,7 @@ public sealed class TrackSubscriptionsProcessor : ResultProcessor<bool>
{
protected override bool SetResultCore(PhysicalConnection connection, Message message, RawResult result)
{
if(result.Type == ResultType.MultiBulk)
if (result.Type == ResultType.MultiBulk)
{
var items = result.GetItems();
long count;
......@@ -503,7 +512,7 @@ sealed class AutoConfigureProcessor : ResultProcessor<bool>
static readonly byte[] READONLY = Encoding.UTF8.GetBytes("READONLY ");
public override bool SetResult(PhysicalConnection connection, Message message, RawResult result)
{
if(result.IsError && result.AssertStarts(READONLY))
if (result.IsError && result.AssertStarts(READONLY))
{
var server = connection.Bridge.ServerEndPoint;
server.Multiplexer.Trace("Auto-configured role: slave");
......@@ -584,7 +593,7 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes
break;
}
}
else if((val = Extract(line, "run_id:")) != null)
else if ((val = Extract(line, "run_id:")) != null)
{
server.RunId = val;
}
......@@ -927,11 +936,11 @@ class PubSubNumSubProcessor : Int64Processor
{
protected override bool SetResultCore(PhysicalConnection connection, Message message, RawResult result)
{
if(result.Type == ResultType.MultiBulk)
if (result.Type == ResultType.MultiBulk)
{
var arr = result.GetItems();
long val;
if(arr != null && arr.Length == 2 && arr[1].TryGetInt64(out val))
if (arr != null && arr.Length == 2 && arr[1].TryGetInt64(out val))
{
SetResult(message, val);
return true;
......@@ -950,7 +959,7 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes
case ResultType.Integer:
case ResultType.SimpleString:
case ResultType.BulkString:
if(result.IsNull)
if (result.IsNull)
{
SetResult(message, null);
return true;
......@@ -975,7 +984,7 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes
case ResultType.Integer:
case ResultType.SimpleString:
case ResultType.BulkString:
if(result.IsNull)
if (result.IsNull)
{
SetResult(message, null);
return true;
......@@ -1090,163 +1099,277 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes
}
return false;
}
}
sealed class RedisValueProcessor : ResultProcessor<RedisValue>
}
sealed class StringArrayProcessor : ResultProcessor<string[]>
{
protected override bool SetResultCore(PhysicalConnection connection, Message message, RawResult result)
{
switch(result.Type)
switch (result.Type)
{
case ResultType.Integer:
case ResultType.SimpleString:
case ResultType.BulkString:
SetResult(message, result.AsRedisValue());
return true;
}
return false;
}
}
private class ScriptResultProcessor : ResultProcessor<RedisResult>
{
static readonly byte[] NOSCRIPT = Encoding.UTF8.GetBytes("NOSCRIPT ");
public override bool SetResult(PhysicalConnection connection, Message message, RawResult result)
{
if (result.Type == ResultType.Error && result.AssertStarts(NOSCRIPT))
{ // scripts are not flushed individually, so assume the entire script cache is toast ("SCRIPT FLUSH")
connection.Bridge.ServerEndPoint.FlushScriptCache();
message.SetScriptUnavailable();
}
// and apply usual processing for the rest
return base.SetResult(connection, message, result);
}
case ResultType.MultiBulk:
var arr = result.GetItemsAsStrings();
// note that top-level error messages still get handled by SetResult, but nested errors
// (is that a thing?) will be wrapped in the RedisResult
protected override bool SetResultCore(PhysicalConnection connection, Message message, RawResult result)
{
var value = Redis.RedisResult.TryCreate(connection, result);
if (value != null)
{
SetResult(message, value);
return true;
SetResult(message, arr);
return true;
}
return false;
}
}
sealed class StringPairInterleavedProcessor : ValuePairInterleavedProcessorBase<KeyValuePair<string, string>>
{
protected override KeyValuePair<string, string> Parse(RawResult first, RawResult second)
{
return new KeyValuePair<string, string>(first.GetString(), second.GetString());
}
}
sealed class StringProcessor : ResultProcessor<string>
}
sealed class RedisValueGeoPositionProcessor : ResultProcessor<GeoPosition?[]>
{
protected override bool SetResultCore(PhysicalConnection connection, Message message, RawResult result)
{
switch (result.Type)
{
case ResultType.Integer:
case ResultType.SimpleString:
case ResultType.BulkString:
SetResult(message, result.GetString());
case ResultType.MultiBulk:
var arr = result.GetItemsAsGeoPositionArray();
SetResult(message, arr);
return true;
}
return false;
}
}
private class TracerProcessor : ResultProcessor<bool>
{
static readonly byte[]
authRequired = Encoding.UTF8.GetBytes("NOAUTH Authentication required."),
authFail = Encoding.UTF8.GetBytes("ERR operation not permitted"),
loading = Encoding.UTF8.GetBytes("LOADING ");
private readonly bool establishConnection;
public TracerProcessor(bool establishConnection)
{
this.establishConnection = establishConnection;
}
public override bool SetResult(PhysicalConnection connection, Message message, RawResult result)
{
var final = base.SetResult(connection, message, result);
if (result.IsError)
{
if (result.IsEqual(authFail) || result.IsEqual(authRequired))
{
connection.RecordConnectionFailed(ConnectionFailureType.AuthenticationFailure, new Exception(result.ToString() + " Verify if the Redis password provided is correct."));
}
else if (result.AssertStarts(loading))
{
connection.RecordConnectionFailed(ConnectionFailureType.Loading);
}
else
{
connection.RecordConnectionFailed(ConnectionFailureType.ProtocolFailure);
}
}
return final;
sealed class GeoRadiusResultArrayProcessor : ResultProcessor<GeoRadiusResult[]>
{
private static readonly GeoRadiusResultArrayProcessor[] instances;
private readonly GeoRadiusOptions options;
static GeoRadiusResultArrayProcessor()
{
instances = new GeoRadiusResultArrayProcessor[8];
for (int i = 0; i < 8; i++) instances[i] = new GeoRadiusResultArrayProcessor((GeoRadiusOptions)i);
}
protected override bool SetResultCore(PhysicalConnection connection, Message message, RawResult result)
{
bool happy;
switch(message.Command)
{
case RedisCommand.ECHO:
happy = result.Type == ResultType.BulkString && (!establishConnection || result.IsEqual(connection.Multiplexer.UniqueId));
break;
case RedisCommand.PING:
happy = result.Type == ResultType.SimpleString && result.IsEqual(RedisLiterals.BytesPONG);
break;
case RedisCommand.TIME:
happy = result.Type == ResultType.MultiBulk && result.GetItems().Length == 2;
break;
case RedisCommand.EXISTS:
happy = result.Type == ResultType.Integer;
break;
default:
happy = true;
break;
}
if(happy)
{
if(establishConnection) connection.Bridge.OnFullyEstablished(connection);
SetResult(message, happy);
return true;
}
else
{
connection.RecordConnectionFailed(ConnectionFailureType.ProtocolFailure);
return false;
}
public static GeoRadiusResultArrayProcessor Get(GeoRadiusOptions options)
{
int i = (int)options;
if (i < 0 || i >= instances.Length) throw new ArgumentOutOfRangeException(nameof(options));
return instances[i];
}
private GeoRadiusResultArrayProcessor(GeoRadiusOptions options)
{
this.options = options;
}
}
#region Sentinel
sealed class SentinelGetMasterAddressByNameProcessor : ResultProcessor<EndPoint>
{
protected override bool SetResultCore(PhysicalConnection connection, Message message, RawResult result)
{
switch (result.Type)
{
case ResultType.MultiBulk:
var arr = result.GetItemsAsValues();
var arr = result.GetItemsAsRawResults();
int port;
if (arr.Length == 2 && int.TryParse(arr[1], out port))
{
SetResult(message, Format.ParseEndPoint(arr[0], port));
return true;
}
else if (arr.Length == 0)
{
SetResult(message, null);
return true;
GeoRadiusResult[] typed;
if (arr == null)
{
typed = null;
}
else
{
var options = this.options;
typed = new GeoRadiusResult[arr.Length];
for (int i = 0; i < arr.Length; i++)
{
typed[i] = Parse(options, arr[i]);
}
}
SetResult(message, typed);
return true;
}
return false;
}
private static GeoRadiusResult Parse(GeoRadiusOptions options, RawResult item)
{
if (options == GeoRadiusOptions.None)
{
// Without any WITH option specified, the command just returns a linear array like ["New York","Milan","Paris"].
return new GeoRadiusResult(item.AsRedisValue(), null, null, null);
}
// If WITHCOORD, WITHDIST or WITHHASH options are specified, the command returns an array of arrays, where each sub-array represents a single item.
var arr = item.GetArrayOfRawResults();
int index = 0;
// the first item in the sub-array is always the name of the returned item.
var member = arr[index++].AsRedisValue();
/* The other information is returned in the following order as successive elements of the sub-array.
The distance from the center as a floating point number, in the same unit specified in the radius.
The geohash integer.
The coordinates as a two items x,y array (longitude,latitude).
*/
double? distance = null;
GeoPosition? position = null;
long? hash = null;
if ((options & GeoRadiusOptions.WithDistance) != 0) { distance = (double?)arr[index++].AsRedisValue(); }
if ((options & GeoRadiusOptions.WithGeoHash) != 0) { hash = (long?)arr[index++].AsRedisValue(); }
if ((options & GeoRadiusOptions.WithCoordinates) != 0)
{
var coords = arr[index++].GetArrayOfRawResults();
double longitude = (double)coords[0].AsRedisValue(), latitude = (double)coords[1].AsRedisValue();
position = new GeoPosition(longitude, latitude);
}
return new GeoRadiusResult(member, distance, hash, position);
}
}
sealed class RedisValueProcessor : ResultProcessor<RedisValue>
{
protected override bool SetResultCore(PhysicalConnection connection, Message message, RawResult result)
{
switch (result.Type)
{
case ResultType.Integer:
case ResultType.SimpleString:
case ResultType.BulkString:
SetResult(message, result.AsRedisValue());
return true;
}
return false;
}
}
private class ScriptResultProcessor : ResultProcessor<RedisResult>
{
static readonly byte[] NOSCRIPT = Encoding.UTF8.GetBytes("NOSCRIPT ");
public override bool SetResult(PhysicalConnection connection, Message message, RawResult result)
{
if (result.Type == ResultType.Error && result.AssertStarts(NOSCRIPT))
{ // scripts are not flushed individually, so assume the entire script cache is toast ("SCRIPT FLUSH")
connection.Bridge.ServerEndPoint.FlushScriptCache();
message.SetScriptUnavailable();
}
// and apply usual processing for the rest
return base.SetResult(connection, message, result);
}
// note that top-level error messages still get handled by SetResult, but nested errors
// (is that a thing?) will be wrapped in the RedisResult
protected override bool SetResultCore(PhysicalConnection connection, Message message, RawResult result)
{
var value = Redis.RedisResult.TryCreate(connection, result);
if (value != null)
{
SetResult(message, value);
return true;
}
return false;
}
}
sealed class StringPairInterleavedProcessor : ValuePairInterleavedProcessorBase<KeyValuePair<string, string>>
{
protected override KeyValuePair<string, string> Parse(RawResult first, RawResult second)
{
return new KeyValuePair<string, string>(first.GetString(), second.GetString());
}
}
sealed class StringProcessor : ResultProcessor<string>
{
protected override bool SetResultCore(PhysicalConnection connection, Message message, RawResult result)
{
switch (result.Type)
{
case ResultType.Integer:
case ResultType.SimpleString:
case ResultType.BulkString:
SetResult(message, result.GetString());
return true;
}
return false;
}
}
private class TracerProcessor : ResultProcessor<bool>
{
static readonly byte[]
authRequired = Encoding.UTF8.GetBytes("NOAUTH Authentication required."),
authFail = Encoding.UTF8.GetBytes("ERR operation not permitted"),
loading = Encoding.UTF8.GetBytes("LOADING ");
private readonly bool establishConnection;
public TracerProcessor(bool establishConnection)
{
this.establishConnection = establishConnection;
}
public override bool SetResult(PhysicalConnection connection, Message message, RawResult result)
{
var final = base.SetResult(connection, message, result);
if (result.IsError)
{
if (result.IsEqual(authFail) || result.IsEqual(authRequired))
{
connection.RecordConnectionFailed(ConnectionFailureType.AuthenticationFailure, new Exception(result.ToString() + " Verify if the Redis password provided is correct."));
}
else if (result.AssertStarts(loading))
{
connection.RecordConnectionFailed(ConnectionFailureType.Loading);
}
else
{
connection.RecordConnectionFailed(ConnectionFailureType.ProtocolFailure);
}
}
return final;
}
protected override bool SetResultCore(PhysicalConnection connection, Message message, RawResult result)
{
bool happy;
switch (message.Command)
{
case RedisCommand.ECHO:
happy = result.Type == ResultType.BulkString && (!establishConnection || result.IsEqual(connection.Multiplexer.UniqueId));
break;
case RedisCommand.PING:
happy = result.Type == ResultType.SimpleString && result.IsEqual(RedisLiterals.BytesPONG);
break;
case RedisCommand.TIME:
happy = result.Type == ResultType.MultiBulk && result.GetItems().Length == 2;
break;
case RedisCommand.EXISTS:
happy = result.Type == ResultType.Integer;
break;
default:
happy = true;
break;
}
if (happy)
{
if (establishConnection) connection.Bridge.OnFullyEstablished(connection);
SetResult(message, happy);
return true;
}
else
{
connection.RecordConnectionFailed(ConnectionFailureType.ProtocolFailure);
return false;
}
}
}
#region Sentinel
sealed class SentinelGetMasterAddressByNameProcessor : ResultProcessor<EndPoint>
{
protected override bool SetResultCore(PhysicalConnection connection, Message message, RawResult result)
{
switch (result.Type)
{
case ResultType.MultiBulk:
var arr = result.GetItemsAsValues();
int port;
if (arr.Length == 2 && int.TryParse(arr[1], out port))
{
SetResult(message, Format.ParseEndPoint(arr[0], port));
return true;
}
else if (arr.Length == 0)
{
SetResult(message, null);
return true;
}
break;
case ResultType.SimpleString:
......@@ -1254,55 +1377,55 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes
if (result.IsNull)
return true;
break;
}
return false;
}
}
sealed class SentinelArrayOfArraysProcessor : ResultProcessor<KeyValuePair<string, string>[][]>
{
protected override bool SetResultCore(PhysicalConnection connection, Message message, RawResult result)
{
var innerProcessor = StringPairInterleaved as StringPairInterleavedProcessor;
if (innerProcessor == null)
{
return false;
}
switch (result.Type)
{
case ResultType.MultiBulk:
var arrayOfArrays = result.GetArrayOfRawResults();
var returnArray = new KeyValuePair<string, string>[arrayOfArrays.Length][];
for (int i = 0; i < arrayOfArrays.Length; i++)
{
var rawInnerArray = arrayOfArrays[i];
KeyValuePair<string, string>[] kvpArray;
innerProcessor.TryParse(rawInnerArray, out kvpArray);
returnArray[i] = kvpArray;
}
SetResult(message, returnArray);
return true;
}
return false;
}
}
}
return false;
}
}
sealed class SentinelArrayOfArraysProcessor : ResultProcessor<KeyValuePair<string, string>[][]>
{
protected override bool SetResultCore(PhysicalConnection connection, Message message, RawResult result)
{
var innerProcessor = StringPairInterleaved as StringPairInterleavedProcessor;
if (innerProcessor == null)
{
return false;
}
switch (result.Type)
{
case ResultType.MultiBulk:
var arrayOfArrays = result.GetArrayOfRawResults();
var returnArray = new KeyValuePair<string, string>[arrayOfArrays.Length][];
for (int i = 0; i < arrayOfArrays.Length; i++)
{
var rawInnerArray = arrayOfArrays[i];
KeyValuePair<string, string>[] kvpArray;
innerProcessor.TryParse(rawInnerArray, out kvpArray);
returnArray[i] = kvpArray;
}
SetResult(message, returnArray);
return true;
}
return false;
}
}
#endregion
}
internal abstract class ResultProcessor<T> : ResultProcessor
{
protected void SetResult(Message message, T value)
{
if (message == null) return;
}
internal abstract class ResultProcessor<T> : ResultProcessor
{
protected void SetResult(Message message, T value)
{
if (message == null) return;
var box = message.ResultBox as ResultBox<T>;
message.SetResponseReceived();
box?.SetResult(value);
}
message.SetResponseReceived();
box?.SetResult(value);
}
}
}
......@@ -4,6 +4,10 @@
namespace StackExchange.Redis
{
/// <summary>
/// Describes a sorted-set element with the corresponding value
/// </summary>
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment