Commit 63c80596 authored by Marc Gravell's avatar Marc Gravell

test invalidating transactions *right before* we issue the exec

parent f2e00539
...@@ -9,7 +9,7 @@ namespace StackExchange.Redis.Tests ...@@ -9,7 +9,7 @@ namespace StackExchange.Redis.Tests
{ {
public class Transactions : TestBase public class Transactions : TestBase
{ {
public Transactions(ITestOutputHelper output) : base (output) { } public Transactions(ITestOutputHelper output) : base(output) { }
[Fact] [Fact]
public void BasicEmptyTran() public void BasicEmptyTran()
...@@ -924,7 +924,7 @@ public async Task ParallelTransactionsWithConditions() ...@@ -924,7 +924,7 @@ public async Task ParallelTransactionsWithConditions()
tran.AddCondition(Condition.StringEqual(trigger, oldVal)); tran.AddCondition(Condition.StringEqual(trigger, oldVal));
var x = tran.StringIncrementAsync(trigger); var x = tran.StringIncrementAsync(trigger);
var y = tran.StringIncrementAsync(hits); var y = tran.StringIncrementAsync(hits);
if(await tran.ExecuteAsync()) if (await tran.ExecuteAsync())
{ {
Interlocked.Increment(ref expectedSuccess); Interlocked.Increment(ref expectedSuccess);
await x; await x;
...@@ -942,7 +942,7 @@ public async Task ParallelTransactionsWithConditions() ...@@ -942,7 +942,7 @@ public async Task ParallelTransactionsWithConditions()
{ {
await tasks[i]; await tasks[i];
} }
var actual = (int) await muxers[0].GetDatabase().StringGetAsync(hits); var actual = (int)await muxers[0].GetDatabase().StringGetAsync(hits);
Assert.Equal(expectedSuccess, actual); Assert.Equal(expectedSuccess, actual);
Writer.WriteLine($"success: {actual} out of {Workers * PerThread} attempts"); Writer.WriteLine($"success: {actual} out of {Workers * PerThread} attempts");
} }
...@@ -954,5 +954,56 @@ public async Task ParallelTransactionsWithConditions() ...@@ -954,5 +954,56 @@ public async Task ParallelTransactionsWithConditions()
} }
} }
} }
[Fact]
public async Task WatchAbort_StringEqual()
{
using (var vic = Create(log: TextWriter.Null))
using (var perp = Create(log: TextWriter.Null))
{
var key = Me();
var db = vic.GetDatabase();
// expect foo, change to bar at the last minute
vic.PreTransactionExec += cmd =>
{
Writer.WriteLine($"'{cmd}' detected; changing it...");
perp.GetDatabase().StringSet(key, "bar");
};
db.KeyDelete(key);
db.StringSet(key, "foo");
var tran = db.CreateTransaction();
tran.AddCondition(Condition.StringEqual(key, "foo"));
var pong = tran.PingAsync();
Assert.False(await tran.ExecuteAsync());
await Assert.ThrowsAsync<TaskCanceledException>(() => pong);
}
}
[Fact]
public async Task WatchAbort_HashLengthEqual()
{
using (var vic = Create(log: TextWriter.Null))
using (var perp = Create(log: TextWriter.Null))
{
var key = Me();
var db = vic.GetDatabase();
// expect foo, change to bar at the last minute
vic.PreTransactionExec += cmd =>
{
Writer.WriteLine($"'{cmd}' detected; changing it...");
perp.GetDatabase().HashSet(key, "bar", "def");
};
db.KeyDelete(key);
db.HashSet(key, "foo", "abc");
var tran = db.CreateTransaction();
tran.AddCondition(Condition.HashLengthEqual(key, 1));
var pong = tran.PingAsync();
Assert.False(await tran.ExecuteAsync());
await Assert.ThrowsAsync<TaskCanceledException>(() => pong);
}
}
} }
} }
...@@ -704,7 +704,16 @@ private WriteResult WriteMessageToServerInsideWriteLock(PhysicalConnection conne ...@@ -704,7 +704,16 @@ private WriteResult WriteMessageToServerInsideWriteLock(PhysicalConnection conne
{ {
throw ExceptionFactory.MasterOnly(Multiplexer.IncludeDetailInExceptions, message.Command, message, ServerEndPoint); throw ExceptionFactory.MasterOnly(Multiplexer.IncludeDetailInExceptions, message.Command, message, ServerEndPoint);
} }
if (cmd == RedisCommand.QUIT) connection.RecordQuit(); switch(cmd)
{
case RedisCommand.QUIT:
connection.RecordQuit();
break;
case RedisCommand.EXEC:
Multiplexer.OnPreTransactionExec(message); // testing purposes, to force certain errors
break;
}
SelectDatabaseInsideWriteLock(connection, message); SelectDatabaseInsideWriteLock(connection, message);
if (!connection.TransactionActive) if (!connection.TransactionActive)
......
...@@ -254,6 +254,7 @@ internal string GetConnectionName(EndPoint endPoint, ConnectionType connectionTy ...@@ -254,6 +254,7 @@ internal string GetConnectionName(EndPoint endPoint, ConnectionType connectionTy
internal event Action<string, Exception, string> MessageFaulted; internal event Action<string, Exception, string> MessageFaulted;
internal event Action<bool> Closing; internal event Action<bool> Closing;
internal event Action<string> PreTransactionExec;
internal event Action<EndPoint, ConnectionType> Connecting; internal event Action<EndPoint, ConnectionType> Connecting;
internal event Action<EndPoint, ConnectionType> Resurrecting; internal event Action<EndPoint, ConnectionType> Resurrecting;
...@@ -276,6 +277,11 @@ internal void OnResurrecting(EndPoint endpoint, ConnectionType connectionType) ...@@ -276,6 +277,11 @@ internal void OnResurrecting(EndPoint endpoint, ConnectionType connectionType)
{ {
Resurrecting.Invoke(endpoint, connectionType); Resurrecting.Invoke(endpoint, connectionType);
} }
internal void OnPreTransactionExec(Message message)
{
PreTransactionExec?.Invoke(message.CommandAndKey);
}
} }
internal sealed class RedisSubscriber : RedisBase, ISubscriber internal sealed class RedisSubscriber : RedisBase, ISubscriber
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment