Commit 8e6c8c0d authored by Marc Gravell's avatar Marc Gravell

PFCOUNT can be master-only or master/slave

parent c94917a1
...@@ -171,30 +171,33 @@ protected Message(int db, CommandFlags flags, RedisCommand command) ...@@ -171,30 +171,33 @@ protected Message(int db, CommandFlags flags, RedisCommand command)
} }
} }
if (IsMasterOnly(command)) bool masterOnly = IsMasterOnly(command);
{
switch (GetMasterSlaveFlags(flags))
{
case CommandFlags.DemandSlave:
throw ExceptionFactory.MasterOnly(false, command, null, null);
case CommandFlags.DemandMaster:
// already fine as-is
break;
case CommandFlags.PreferMaster:
case CommandFlags.PreferSlave:
default: // we will run this on the master, then
flags = SetMasterSlaveFlags(flags, CommandFlags.DemandMaster);
break;
}
}
this.Db = db; this.Db = db;
this.command = command; this.command = command;
this.flags = flags & UserSelectableFlags; this.flags = flags & UserSelectableFlags;
if (masterOnly) SetMasterOnly();
createdDateTime = DateTime.UtcNow; createdDateTime = DateTime.UtcNow;
createdTimestamp = System.Diagnostics.Stopwatch.GetTimestamp(); createdTimestamp = System.Diagnostics.Stopwatch.GetTimestamp();
} }
internal void SetMasterOnly()
{
switch (GetMasterSlaveFlags(this.flags))
{
case CommandFlags.DemandSlave:
throw ExceptionFactory.MasterOnly(false, this.command, null, null);
case CommandFlags.DemandMaster:
// already fine as-is
break;
case CommandFlags.PreferMaster:
case CommandFlags.PreferSlave:
default: // we will run this on the master, then
this.flags = SetMasterSlaveFlags(this.flags, CommandFlags.DemandMaster);
break;
}
}
internal void SetProfileStorage(ProfileStorage storage) internal void SetProfileStorage(ProfileStorage storage)
{ {
performance = storage; performance = storage;
...@@ -418,7 +421,6 @@ public static bool IsMasterOnly(RedisCommand command) ...@@ -418,7 +421,6 @@ public static bool IsMasterOnly(RedisCommand command)
case RedisCommand.PEXPIRE: case RedisCommand.PEXPIRE:
case RedisCommand.PEXPIREAT: case RedisCommand.PEXPIREAT:
case RedisCommand.PFADD: case RedisCommand.PFADD:
case RedisCommand.PFCOUNT: // technically a write command
case RedisCommand.PFMERGE: case RedisCommand.PFMERGE:
case RedisCommand.PSETEX: case RedisCommand.PSETEX:
case RedisCommand.RENAME: case RedisCommand.RENAME:
......
...@@ -294,30 +294,50 @@ public Task<bool> HyperLogLogAddAsync(RedisKey key, RedisValue[] values, Command ...@@ -294,30 +294,50 @@ public Task<bool> HyperLogLogAddAsync(RedisKey key, RedisValue[] values, Command
public long HyperLogLogLength(RedisKey key, CommandFlags flags = CommandFlags.None) public long HyperLogLogLength(RedisKey key, CommandFlags flags = CommandFlags.None)
{ {
ServerEndPoint server;
var features = GetFeatures(Db, key, flags, out server);
var cmd = Message.Create(Db, flags, RedisCommand.PFCOUNT, key); var cmd = Message.Create(Db, flags, RedisCommand.PFCOUNT, key);
return ExecuteSync(cmd, ResultProcessor.Int64); // technically a write / master-only command until 2.8.18
if (server != null && !features.HyperLogLogCountSlaveSafe) cmd.SetMasterOnly();
return ExecuteSync(cmd, ResultProcessor.Int64, server);
} }
public long HyperLogLogLength(RedisKey[] keys, CommandFlags flags = CommandFlags.None) public long HyperLogLogLength(RedisKey[] keys, CommandFlags flags = CommandFlags.None)
{ {
if (keys == null) throw new ArgumentNullException("keys"); if (keys == null) throw new ArgumentNullException("keys");
ServerEndPoint server = null;
var cmd = Message.Create(Db, flags, RedisCommand.PFCOUNT, keys); var cmd = Message.Create(Db, flags, RedisCommand.PFCOUNT, keys);
return ExecuteSync(cmd, ResultProcessor.Int64); if (keys.Length != 0)
{
var features = GetFeatures(Db, keys[0], flags, out server);
// technically a write / master-only command until 2.8.18
if (server != null && !features.HyperLogLogCountSlaveSafe) cmd.SetMasterOnly();
}
return ExecuteSync(cmd, ResultProcessor.Int64, server);
} }
public Task<long> HyperLogLogLengthAsync(RedisKey key, CommandFlags flags = CommandFlags.None) public Task<long> HyperLogLogLengthAsync(RedisKey key, CommandFlags flags = CommandFlags.None)
{ {
ServerEndPoint server;
var features = GetFeatures(Db, key, flags, out server);
var cmd = Message.Create(Db, flags, RedisCommand.PFCOUNT, key); var cmd = Message.Create(Db, flags, RedisCommand.PFCOUNT, key);
return ExecuteAsync(cmd, ResultProcessor.Int64); // technically a write / master-only command until 2.8.18
if (server != null && !features.HyperLogLogCountSlaveSafe) cmd.SetMasterOnly();
return ExecuteAsync(cmd, ResultProcessor.Int64, server);
} }
public Task<long> HyperLogLogLengthAsync(RedisKey[] keys, CommandFlags flags = CommandFlags.None) public Task<long> HyperLogLogLengthAsync(RedisKey[] keys, CommandFlags flags = CommandFlags.None)
{ {
if (keys == null) throw new ArgumentNullException("keys"); if (keys == null) throw new ArgumentNullException("keys");
ServerEndPoint server = null;
var cmd = Message.Create(Db, flags, RedisCommand.PFCOUNT, keys); var cmd = Message.Create(Db, flags, RedisCommand.PFCOUNT, keys);
return ExecuteAsync(cmd, ResultProcessor.Int64); if (keys.Length != 0)
{
var features = GetFeatures(Db, keys[0], flags, out server);
// technically a write / master-only command until 2.8.18
if (server != null && !features.HyperLogLogCountSlaveSafe) cmd.SetMasterOnly();
}
return ExecuteAsync(cmd, ResultProcessor.Int64, server);
} }
public void HyperLogLogMerge(RedisKey destination, RedisKey first, RedisKey second, CommandFlags flags = CommandFlags.None) public void HyperLogLogMerge(RedisKey destination, RedisKey first, RedisKey second, CommandFlags flags = CommandFlags.None)
...@@ -561,7 +581,7 @@ public Task KeyRestoreAsync(RedisKey key, byte[] value, TimeSpan? expiry = null, ...@@ -561,7 +581,7 @@ public Task KeyRestoreAsync(RedisKey key, byte[] value, TimeSpan? expiry = null,
if (server != null && features.MillisecondExpiry && multiplexer.CommandMap.IsAvailable(RedisCommand.PTTL)) if (server != null && features.MillisecondExpiry && multiplexer.CommandMap.IsAvailable(RedisCommand.PTTL))
{ {
msg = Message.Create(Db, flags, RedisCommand.PTTL, key); msg = Message.Create(Db, flags, RedisCommand.PTTL, key);
return ExecuteSync(msg, ResultProcessor.TimeSpanFromMilliseconds); return ExecuteSync(msg, ResultProcessor.TimeSpanFromMilliseconds, server);
} }
msg = Message.Create(Db, flags, RedisCommand.TTL, key); msg = Message.Create(Db, flags, RedisCommand.TTL, key);
return ExecuteSync(msg, ResultProcessor.TimeSpanFromSeconds); return ExecuteSync(msg, ResultProcessor.TimeSpanFromSeconds);
...@@ -575,7 +595,7 @@ public Task KeyRestoreAsync(RedisKey key, byte[] value, TimeSpan? expiry = null, ...@@ -575,7 +595,7 @@ public Task KeyRestoreAsync(RedisKey key, byte[] value, TimeSpan? expiry = null,
if (server != null && features.MillisecondExpiry && multiplexer.CommandMap.IsAvailable(RedisCommand.PTTL)) if (server != null && features.MillisecondExpiry && multiplexer.CommandMap.IsAvailable(RedisCommand.PTTL))
{ {
msg = Message.Create(Db, flags, RedisCommand.PTTL, key); msg = Message.Create(Db, flags, RedisCommand.PTTL, key);
return ExecuteAsync(msg, ResultProcessor.TimeSpanFromMilliseconds); return ExecuteAsync(msg, ResultProcessor.TimeSpanFromMilliseconds, server);
} }
msg = Message.Create(Db, flags, RedisCommand.TTL, key); msg = Message.Create(Db, flags, RedisCommand.TTL, key);
return ExecuteAsync(msg, ResultProcessor.TimeSpanFromSeconds); return ExecuteAsync(msg, ResultProcessor.TimeSpanFromSeconds);
......
...@@ -25,6 +25,7 @@ public struct RedisFeatures ...@@ -25,6 +25,7 @@ public struct RedisFeatures
v2_6_12 = new Version(2, 6, 12), v2_6_12 = new Version(2, 6, 12),
v2_8_0 = new Version(2, 8, 0), v2_8_0 = new Version(2, 8, 0),
v2_8_12 = new Version(2, 8, 12), v2_8_12 = new Version(2, 8, 12),
v2_8_18 = new Version(2, 8, 18),
v2_9_5 = new Version(2, 9, 5); v2_9_5 = new Version(2, 9, 5);
private readonly Version version; private readonly Version version;
...@@ -137,6 +138,11 @@ public RedisFeatures(Version version) ...@@ -137,6 +138,11 @@ public RedisFeatures(Version version)
/// </summary> /// </summary>
public bool ScriptingDatabaseSafe { get { return Version >= v2_8_12; } } public bool ScriptingDatabaseSafe { get { return Version >= v2_8_12; } }
/// <summary>
/// Is PFCOUNT supported on slaves?
/// </summary>
public bool HyperLogLogCountSlaveSafe { get { return Version >= v2_8_18; } }
/// <summary> /// <summary>
/// The Redis version of the server /// The Redis version of the server
/// </summary> /// </summary>
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment