Commit 460bf46a authored by Marc Gravell's avatar Marc Gravell

MIGRATE command

parent bc31336f
using NUnit.Framework;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace StackExchange.Redis.Tests
{
public class Migrate : TestBase
{
public void Basic()
{
var fromConfig = new ConfigurationOptions { EndPoints = { { PrimaryServer, SecurePort } }, Password = SecurePassword };
var toConfig = new ConfigurationOptions { EndPoints = { { PrimaryServer, PrimaryPort } } };
using (var from = ConnectionMultiplexer.Connect(fromConfig))
using (var to = ConnectionMultiplexer.Connect(toConfig))
{
RedisKey key = Me();
var fromDb = from.GetDatabase();
var toDb = to.GetDatabase();
fromDb.KeyDelete(key);
toDb.KeyDelete(key);
fromDb.StringSet(key, "foo");
var dest = to.GetEndPoints(true).Single();
fromDb.KeyMigrate(key, dest);
Assert.IsFalse(fromDb.KeyExists(key));
Assert.IsTrue(toDb.KeyExists(key));
string s = toDb.StringGet(key);
Assert.AreEqual("foo", s);
}
}
}
}
......@@ -80,6 +80,7 @@
<Compile Include="Lex.cs" />
<Compile Include="Lists.cs" />
<Compile Include="Locking.cs" />
<Compile Include="Migrate.cs" />
<Compile Include="MultiAdd.cs" />
<Compile Include="MultiMaster.cs" />
<Compile Include="Naming.cs" />
......
......@@ -66,6 +66,7 @@
<Compile Include="StackExchange\Redis\ExtensionMethods.cs" />
<Compile Include="StackExchange\Redis\HashEntry.cs" />
<Compile Include="StackExchange\Redis\InternalErrorEventArgs.cs" />
<Compile Include="StackExchange\Redis\MigrateOptions.cs" />
<Compile Include="StackExchange\Redis\RedisChannel.cs" />
<Compile Include="StackExchange\Redis\Bitwise.cs" />
<Compile Include="StackExchange\Redis\ClientFlags.cs" />
......
......@@ -23,6 +23,14 @@ public interface IDatabase : IRedis, IDatabaseAsync
[IgnoreNamePrefix]
IBatch CreateBatch(object asyncState = null);
/// <summary>
/// Atomically transfer a key from a source Redis instance to a destination Redis instance. On success the key is deleted from the original instance by default, and is guaranteed to exist in the target instance.
/// </summary>
/// <remarks>http://redis.io/commands/MIGRATE</remarks>
void KeyMigrate(RedisKey key, EndPoint toServer, int toDatabase = 0, int timeoutMilliseconds = 0, MigrateOptions migrateOptions = MigrateOptions.None, CommandFlags flags = CommandFlags.None);
/// <summary>
/// Allows creation of a group of operations that will be sent to the server as a single unit,
/// and processed on the server as a single unit.
......
......@@ -222,6 +222,13 @@ public interface IDatabaseAsync : IRedisAsync
/// <remarks>http://redis.io/commands/persist</remarks>
Task<bool> KeyExpireAsync(RedisKey key, DateTime? expiry, CommandFlags flags = CommandFlags.None);
/// <summary>
/// Atomically transfer a key from a source Redis instance to a destination Redis instance. On success the key is deleted from the original instance by default, and is guaranteed to exist in the target instance.
/// </summary>
/// <remarks>http://redis.io/commands/MIGRATE</remarks>
Task KeyMigrateAsync(RedisKey key, EndPoint toServer, int toDatabase = 0, int timeoutMilliseconds = 0, MigrateOptions migrateOptions = MigrateOptions.None, CommandFlags flags = CommandFlags.None);
/// <summary>
/// Move key from the currently selected database (see SELECT) to the specified destination database. When key already exists in the destination database, or it does not exist in the source database, it does nothing. It is possible to use MOVE as a locking primitive because of this.
/// </summary>
......
using System;
namespace StackExchange.Redis
{
/// <summary>
/// Additional options for the MIGRATE command
/// </summary>
[Flags]
public enum MigrateOptions
{
/// <summary>
/// No options specified
/// </summary>
None = 0,
/// <summary>
/// Do not remove the key from the local instance.
/// </summary>
Copy = 1,
/// <summary>
/// Replace existing key on the remote instance.
/// </summary>
Replace = 2
}
}
......@@ -408,6 +408,56 @@ public Task<bool> KeyExpireAsync(RedisKey key, DateTime? expiry, CommandFlags fl
return ExecuteAsync(msg, ResultProcessor.Boolean, server: server);
}
public void KeyMigrate(RedisKey key, EndPoint toServer, int toDatabase = 0, int timeoutMilliseconds = 0, MigrateOptions migrateOptions = MigrateOptions.None, CommandFlags flags = CommandFlags.None)
{
if (timeoutMilliseconds <= 0) timeoutMilliseconds = multiplexer.TimeoutMilliseconds;
var msg = new KeyMigrateCommandMessage(Db, key, toServer, toDatabase, timeoutMilliseconds, migrateOptions, flags);
ExecuteSync(msg, ResultProcessor.DemandOK);
}
public Task KeyMigrateAsync(RedisKey key, EndPoint toServer, int toDatabase = 0, int timeoutMilliseconds = 0, MigrateOptions migrateOptions = MigrateOptions.None, CommandFlags flags = CommandFlags.None)
{
if (timeoutMilliseconds <= 0) timeoutMilliseconds = multiplexer.TimeoutMilliseconds;
var msg = new KeyMigrateCommandMessage(Db, key, toServer, toDatabase, timeoutMilliseconds, migrateOptions, flags);
return ExecuteAsync(msg, ResultProcessor.DemandOK);
}
sealed class KeyMigrateCommandMessage : Message.CommandKeyBase // MIGRATE is atypical
{
private MigrateOptions migrateOptions;
private int timeoutMilliseconds;
private int toDatabase;
RedisValue toHost, toPort;
public KeyMigrateCommandMessage(int db, RedisKey key, EndPoint toServer, int toDatabase, int timeoutMilliseconds, MigrateOptions migrateOptions, CommandFlags flags)
: base(db, flags, RedisCommand.MIGRATE, key)
{
if (toServer == null) throw new ArgumentNullException("server");
string toHost;
int toPort;
if (!Format.TryGetHostPort(toServer, out toHost, out toPort)) throw new ArgumentException("toServer");
this.toHost = toHost;
this.toPort = toPort;
if (toDatabase < 0) throw new ArgumentOutOfRangeException("toDatabase");
this.toDatabase = toDatabase;
this.timeoutMilliseconds = timeoutMilliseconds;
this.migrateOptions = migrateOptions;
}
internal override void WriteImpl(PhysicalConnection physical)
{
bool isCopy = (migrateOptions & MigrateOptions.Copy) != 0;
bool isReplace = (migrateOptions & MigrateOptions.Replace) != 0;
physical.WriteHeader(Command, 5 + (isCopy ? 1 : 0) + (isReplace ? 1 : 0));
physical.Write(toHost);
physical.Write(toPort);
physical.Write(Key);
physical.Write(toDatabase);
physical.Write(timeoutMilliseconds);
if (isCopy) physical.Write(RedisLiterals.COPY);
if (isReplace) physical.Write(RedisLiterals.REPLACE);
}
}
public bool KeyMove(RedisKey key, int database, CommandFlags flags = CommandFlags.None)
{
var msg = Message.Create(Db, flags, RedisCommand.MOVE, key, database);
......
......@@ -52,6 +52,8 @@ public static readonly RedisValue
EXISTS = "EXISTS",
FLUSH = "FLUSH",
PING = "PING",
COPY = "COPY",
REPLACE = "REPLACE",
// DO NOT CHANGE CASE: these are configuration settings and MUST be as-is
databases = "databases",
......
......@@ -103,6 +103,7 @@
<Compile Include="..\StackExchange.Redis\StackExchange\Redis\Message.cs" />
<Compile Include="..\StackExchange.Redis\StackExchange\Redis\MessageCompletable.cs" />
<Compile Include="..\StackExchange.Redis\StackExchange\Redis\MessageQueue.cs" />
<Compile Include="..\StackExchange.Redis\StackExchange\Redis\MigrateOptions.cs" />
<Compile Include="..\StackExchange.Redis\StackExchange\Redis\Order.cs" />
<Compile Include="..\StackExchange.Redis\StackExchange\Redis\PhysicalBridge.cs" />
<Compile Include="..\StackExchange.Redis\StackExchange\Redis\PhysicalConnection.cs" />
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment