Commit dd3c07ce authored by Marc Gravell's avatar Marc Gravell

Make locking *mostly* work with twemproxy (and more generally: without...

Make locking *mostly* work with twemproxy (and more generally: without multi/exec if they aren't available)
parent ddffd780
...@@ -2,6 +2,7 @@ ...@@ -2,6 +2,7 @@
using System.Threading; using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
using NUnit.Framework; using NUnit.Framework;
using System.Collections.Generic;
namespace StackExchange.Redis.Tests namespace StackExchange.Redis.Tests
{ {
...@@ -9,13 +10,14 @@ namespace StackExchange.Redis.Tests ...@@ -9,13 +10,14 @@ namespace StackExchange.Redis.Tests
public class Locking : TestBase public class Locking : TestBase
{ {
[Test] [Test]
public void AggressiveParallel() [TestCaseSource("TestModes")]
public void AggressiveParallel(TestMode testMode)
{ {
int count = 2; int count = 2;
int errorCount = 0; int errorCount = 0;
ManualResetEvent evt = new ManualResetEvent(false); ManualResetEvent evt = new ManualResetEvent(false);
using (var c1 = Create()) using (var c1 = Create(testMode))
using (var c2 = Create()) using (var c2 = Create(testMode))
{ {
WaitCallback cb = obj => WaitCallback cb = obj =>
{ {
...@@ -29,7 +31,7 @@ public void AggressiveParallel() ...@@ -29,7 +31,7 @@ public void AggressiveParallel()
conn.Ping(); conn.Ping();
if (Interlocked.Decrement(ref count) == 0) evt.Set(); if (Interlocked.Decrement(ref count) == 0) evt.Set();
}; };
const int db = 2; int db = testMode == TestMode.Twemproxy ? 0 : 2;
ThreadPool.QueueUserWorkItem(cb, c1.GetDatabase(db)); ThreadPool.QueueUserWorkItem(cb, c1.GetDatabase(db));
ThreadPool.QueueUserWorkItem(cb, c2.GetDatabase(db)); ThreadPool.QueueUserWorkItem(cb, c2.GetDatabase(db));
evt.WaitOne(8000); evt.WaitOne(8000);
...@@ -104,15 +106,42 @@ public void TestLockOpCountByVersion(ConnectionMultiplexer conn, int expected, b ...@@ -104,15 +106,42 @@ public void TestLockOpCountByVersion(ConnectionMultiplexer conn, int expected, b
// note we get a ping from GetCounters // note we get a ping from GetCounters
} }
private ConnectionMultiplexer Create(TestMode mode)
{
switch(mode)
{
case TestMode.MultiExec:
return Create();
case TestMode.NoMultiExec:
return Create(disabledCommands: new[] { "multi", "exec" });
case TestMode.Twemproxy:
return Create(proxy: Proxy.Twemproxy);
default:
throw new NotSupportedException(mode.ToString());
}
}
public enum TestMode
{
MultiExec,
NoMultiExec,
Twemproxy
}
public IEnumerable<TestMode> TestModes()
{
return (TestMode[])Enum.GetValues(typeof(TestMode));
}
[Test] [Test]
public void TakeLockAndExtend() [TestCaseSource("TestModes")]
public void TakeLockAndExtend(TestMode mode)
{ {
using (var conn = Create()) bool withTran = mode == TestMode.MultiExec;
using (var conn = Create(mode))
{ {
RedisValue right = Guid.NewGuid().ToString(), RedisValue right = Guid.NewGuid().ToString(),
wrong = Guid.NewGuid().ToString(); wrong = Guid.NewGuid().ToString();
const int DB = 7; int DB = mode == TestMode.Twemproxy ? 0 : 7;
RedisKey Key = "lock-key"; RedisKey Key = "lock-key";
var db = conn.GetDatabase(DB); var db = conn.GetDatabase(DB);
...@@ -123,9 +152,9 @@ public void TakeLockAndExtend() ...@@ -123,9 +152,9 @@ public void TakeLockAndExtend()
var t1 = db.LockTakeAsync(Key, right, TimeSpan.FromSeconds(20)); var t1 = db.LockTakeAsync(Key, right, TimeSpan.FromSeconds(20));
var t1b = db.LockTakeAsync(Key, wrong, TimeSpan.FromSeconds(10)); var t1b = db.LockTakeAsync(Key, wrong, TimeSpan.FromSeconds(10));
var t2 = db.LockQueryAsync(Key); var t2 = db.LockQueryAsync(Key);
var t3 = db.LockReleaseAsync(Key, wrong); var t3 = withTran ? db.LockReleaseAsync(Key, wrong) : null;
var t4 = db.LockQueryAsync(Key); var t4 = db.LockQueryAsync(Key);
var t5 = db.LockExtendAsync(Key, wrong, TimeSpan.FromSeconds(60)); var t5 = withTran ? db.LockExtendAsync(Key, wrong, TimeSpan.FromSeconds(60)) : null;
var t6 = db.LockQueryAsync(Key); var t6 = db.LockQueryAsync(Key);
var t7 = db.KeyTimeToLiveAsync(Key); var t7 = db.KeyTimeToLiveAsync(Key);
var t8 = db.LockExtendAsync(Key, right, TimeSpan.FromSeconds(60)); var t8 = db.LockExtendAsync(Key, right, TimeSpan.FromSeconds(60));
...@@ -142,9 +171,9 @@ public void TakeLockAndExtend() ...@@ -142,9 +171,9 @@ public void TakeLockAndExtend()
Assert.IsTrue(conn.Wait(t1), "1"); Assert.IsTrue(conn.Wait(t1), "1");
Assert.IsFalse(conn.Wait(t1b), "1b"); Assert.IsFalse(conn.Wait(t1b), "1b");
Assert.AreEqual(right, conn.Wait(t2), "2"); Assert.AreEqual(right, conn.Wait(t2), "2");
Assert.IsFalse(conn.Wait(t3), "3"); if(withTran) Assert.IsFalse(conn.Wait(t3), "3");
Assert.AreEqual(right, conn.Wait(t4), "4"); Assert.AreEqual(right, conn.Wait(t4), "4");
Assert.IsFalse(conn.Wait(t5), "5"); if (withTran) Assert.IsFalse(conn.Wait(t5), "5");
Assert.AreEqual(right, conn.Wait(t6), "6"); Assert.AreEqual(right, conn.Wait(t6), "6");
var ttl = conn.Wait(t7).Value.TotalSeconds; var ttl = conn.Wait(t7).Value.TotalSeconds;
Assert.IsTrue(ttl > 0 && ttl <= 20, "7"); Assert.IsTrue(ttl > 0 && ttl <= 20, "7");
...@@ -190,9 +219,10 @@ public void TakeLockAndExtend() ...@@ -190,9 +219,10 @@ public void TakeLockAndExtend()
[Test] [Test]
public void TestBasicLockNotTaken() [TestCaseSource("TestModes")]
public void TestBasicLockNotTaken(TestMode testMode)
{ {
using (var conn = Create()) using (var conn = Create(testMode))
{ {
int errorCount = 0; int errorCount = 0;
conn.ErrorMessage += delegate { Interlocked.Increment(ref errorCount); }; conn.ErrorMessage += delegate { Interlocked.Increment(ref errorCount); };
...@@ -219,9 +249,10 @@ public void TestBasicLockNotTaken() ...@@ -219,9 +249,10 @@ public void TestBasicLockNotTaken()
} }
[Test] [Test]
public void TestBasicLockTaken() [TestCaseSource("TestModes")]
public void TestBasicLockTaken(TestMode testMode)
{ {
using (var conn = Create()) using (var conn = Create(testMode))
{ {
var db = conn.GetDatabase(0); var db = conn.GetDatabase(0);
db.KeyDelete("lock-exists"); db.KeyDelete("lock-exists");
......
...@@ -30,6 +30,16 @@ public ITransaction CreateTransaction(object asyncState) ...@@ -30,6 +30,16 @@ public ITransaction CreateTransaction(object asyncState)
return new RedisTransaction(this, asyncState); return new RedisTransaction(this, asyncState);
} }
private ITransaction CreateTransactionIfAvailable(object asyncState)
{
var map = multiplexer.CommandMap;
if(!map.IsAvailable(RedisCommand.MULTI) || !map.IsAvailable(RedisCommand.EXEC))
{
return null;
}
return CreateTransaction(asyncState);
}
public RedisValue DebugObject(RedisKey key, CommandFlags flags = CommandFlags.None) public RedisValue DebugObject(RedisKey key, CommandFlags flags = CommandFlags.None)
{ {
var msg = Message.Create(Db, flags, RedisCommand.DEBUG, RedisLiterals.OBJECT, key); var msg = Message.Create(Db, flags, RedisCommand.DEBUG, RedisLiterals.OBJECT, key);
...@@ -773,16 +783,24 @@ public Task ListTrimAsync(RedisKey key, long start, long stop, CommandFlags flag ...@@ -773,16 +783,24 @@ public Task ListTrimAsync(RedisKey key, long start, long stop, CommandFlags flag
public bool LockExtend(RedisKey key, RedisValue value, TimeSpan expiry, CommandFlags flags = CommandFlags.None) public bool LockExtend(RedisKey key, RedisValue value, TimeSpan expiry, CommandFlags flags = CommandFlags.None)
{ {
if (value.IsNull) throw new ArgumentNullException("value");
var tran = GetLockExtendTransaction(key, value, expiry); var tran = GetLockExtendTransaction(key, value, expiry);
return tran.Execute(flags); if(tran != null) return tran.Execute(flags);
// without transactions (twemproxy etc), we can't enforce the "value" part
return KeyExpire(key, expiry, flags);
} }
public Task<bool> LockExtendAsync(RedisKey key, RedisValue value, TimeSpan expiry, CommandFlags flags = CommandFlags.None) public Task<bool> LockExtendAsync(RedisKey key, RedisValue value, TimeSpan expiry, CommandFlags flags = CommandFlags.None)
{ {
if (value.IsNull) throw new ArgumentNullException("value");
var tran = GetLockExtendTransaction(key, value, expiry); var tran = GetLockExtendTransaction(key, value, expiry);
return tran.ExecuteAsync(flags); if(tran != null) return tran.ExecuteAsync(flags);
}
// without transactions (twemproxy etc), we can't enforce the "value" part
return KeyExpireAsync(key, expiry, flags);
}
public RedisValue LockQuery(RedisKey key, CommandFlags flags = CommandFlags.None) public RedisValue LockQuery(RedisKey key, CommandFlags flags = CommandFlags.None)
{ {
return StringGet(key, flags); return StringGet(key, flags);
...@@ -795,23 +813,33 @@ public Task<RedisValue> LockQueryAsync(RedisKey key, CommandFlags flags = Comman ...@@ -795,23 +813,33 @@ public Task<RedisValue> LockQueryAsync(RedisKey key, CommandFlags flags = Comman
public bool LockRelease(RedisKey key, RedisValue value, CommandFlags flags = CommandFlags.None) public bool LockRelease(RedisKey key, RedisValue value, CommandFlags flags = CommandFlags.None)
{ {
if (value.IsNull) throw new ArgumentNullException("value");
var tran = GetLockReleaseTransaction(key, value); var tran = GetLockReleaseTransaction(key, value);
return tran.Execute(flags); if(tran != null) return tran.Execute(flags);
// without transactions (twemproxy etc), we can't enforce the "value" part
return KeyDelete(key, flags);
} }
public Task<bool> LockReleaseAsync(RedisKey key, RedisValue value, CommandFlags flags = CommandFlags.None) public Task<bool> LockReleaseAsync(RedisKey key, RedisValue value, CommandFlags flags = CommandFlags.None)
{ {
if (value.IsNull) throw new ArgumentNullException("value");
var tran = GetLockReleaseTransaction(key, value); var tran = GetLockReleaseTransaction(key, value);
return tran.ExecuteAsync(flags); if(tran != null) return tran.ExecuteAsync(flags);
// without transactions (twemproxy etc), we can't enforce the "value" part
return KeyDeleteAsync(key, flags);
} }
public bool LockTake(RedisKey key, RedisValue value, TimeSpan expiry, CommandFlags flags = CommandFlags.None) public bool LockTake(RedisKey key, RedisValue value, TimeSpan expiry, CommandFlags flags = CommandFlags.None)
{ {
if (value.IsNull) throw new ArgumentNullException("value");
return StringSet(key, value, expiry, When.NotExists, flags); return StringSet(key, value, expiry, When.NotExists, flags);
} }
public Task<bool> LockTakeAsync(RedisKey key, RedisValue value, TimeSpan expiry, CommandFlags flags = CommandFlags.None) public Task<bool> LockTakeAsync(RedisKey key, RedisValue value, TimeSpan expiry, CommandFlags flags = CommandFlags.None)
{ {
if (value.IsNull) throw new ArgumentNullException("value");
return StringSetAsync(key, value, expiry, When.NotExists, flags); return StringSetAsync(key, value, expiry, When.NotExists, flags);
} }
...@@ -1615,17 +1643,23 @@ private Message GetHashSetMessage(RedisKey key, HashEntry[] hashFields, CommandF ...@@ -1615,17 +1643,23 @@ private Message GetHashSetMessage(RedisKey key, HashEntry[] hashFields, CommandF
ITransaction GetLockExtendTransaction(RedisKey key, RedisValue value, TimeSpan expiry) ITransaction GetLockExtendTransaction(RedisKey key, RedisValue value, TimeSpan expiry)
{ {
var tran = CreateTransaction(asyncState); var tran = CreateTransactionIfAvailable(asyncState);
tran.AddCondition(Condition.StringEqual(key, value)); if (tran != null)
tran.KeyExpireAsync(key, expiry, CommandFlags.FireAndForget); {
tran.AddCondition(Condition.StringEqual(key, value));
tran.KeyExpireAsync(key, expiry, CommandFlags.FireAndForget);
}
return tran; return tran;
} }
ITransaction GetLockReleaseTransaction(RedisKey key, RedisValue value) ITransaction GetLockReleaseTransaction(RedisKey key, RedisValue value)
{ {
var tran = CreateTransaction(asyncState); var tran = CreateTransactionIfAvailable(asyncState);
tran.AddCondition(Condition.StringEqual(key, value)); if (tran != null)
tran.KeyDeleteAsync(key, CommandFlags.FireAndForget); {
tran.AddCondition(Condition.StringEqual(key, value));
tran.KeyDeleteAsync(key, CommandFlags.FireAndForget);
}
return tran; return tran;
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment