Commit 85165681 authored by Marc Gravell's avatar Marc Gravell

experimental: transaction logging to find this cursed race condition

parent be7d81c4
...@@ -261,6 +261,7 @@ protected IServer GetAnyMaster(ConnectionMultiplexer muxer) ...@@ -261,6 +261,7 @@ protected IServer GetAnyMaster(ConnectionMultiplexer muxer)
muxer.ConnectionFailed += OnConnectionFailed; muxer.ConnectionFailed += OnConnectionFailed;
muxer.MessageFaulted += (msg, ex, origin) => Writer?.WriteLine($"Faulted from '{origin}': '{msg}' - '{(ex == null ? "(null)" : ex.Message)}'"); muxer.MessageFaulted += (msg, ex, origin) => Writer?.WriteLine($"Faulted from '{origin}': '{msg}' - '{(ex == null ? "(null)" : ex.Message)}'");
muxer.Connecting += (e, t) => Writer.WriteLine($"Connecting to {Format.ToString(e)} as {t}"); muxer.Connecting += (e, t) => Writer.WriteLine($"Connecting to {Format.ToString(e)} as {t}");
muxer.TransactionLog += msg => Writer.WriteLine("tran: " + msg);
muxer.Resurrecting += (e, t) => Writer.WriteLine($"Resurrecting {Format.ToString(e)} as {t}"); muxer.Resurrecting += (e, t) => Writer.WriteLine($"Resurrecting {Format.ToString(e)} as {t}");
muxer.Closing += complete => Writer.WriteLine(complete ? "Closed" : "Closing..."); muxer.Closing += complete => Writer.WriteLine(complete ? "Closed" : "Closing...");
return muxer; return muxer;
......
...@@ -764,7 +764,7 @@ public async Task BasicTranWithSortedSetContainsCondition(bool demandKeyExists, ...@@ -764,7 +764,7 @@ public async Task BasicTranWithSortedSetContainsCondition(bool demandKeyExists,
[InlineData("", ComparisonType.GreaterThan, 0L, false)] [InlineData("", ComparisonType.GreaterThan, 0L, false)]
public async Task BasicTranWithListLengthCondition(string value, ComparisonType type, long length, bool expectTranResult) public async Task BasicTranWithListLengthCondition(string value, ComparisonType type, long length, bool expectTranResult)
{ {
using (var muxer = Create()) using (var muxer = Create(log: TextWriter.Null))
{ {
RedisKey key = Me(), key2 = Me() + "2"; RedisKey key = Me(), key2 = Me() + "2";
var db = muxer.GetDatabase(); var db = muxer.GetDatabase();
......
...@@ -254,7 +254,7 @@ internal string GetConnectionName(EndPoint endPoint, ConnectionType connectionTy ...@@ -254,7 +254,7 @@ internal string GetConnectionName(EndPoint endPoint, ConnectionType connectionTy
internal event Action<string, Exception, string> MessageFaulted; internal event Action<string, Exception, string> MessageFaulted;
internal event Action<bool> Closing; internal event Action<bool> Closing;
internal event Action<string> PreTransactionExec; internal event Action<string> PreTransactionExec, TransactionLog;
internal event Action<EndPoint, ConnectionType> Connecting; internal event Action<EndPoint, ConnectionType> Connecting;
internal event Action<EndPoint, ConnectionType> Resurrecting; internal event Action<EndPoint, ConnectionType> Resurrecting;
...@@ -282,6 +282,11 @@ internal void OnPreTransactionExec(Message message) ...@@ -282,6 +282,11 @@ internal void OnPreTransactionExec(Message message)
{ {
PreTransactionExec?.Invoke(message.CommandAndKey); PreTransactionExec?.Invoke(message.CommandAndKey);
} }
internal void OnTransactionLog(string message)
{
TransactionLog?.Invoke(message);
}
} }
internal sealed class RedisSubscriber : RedisBase, ISubscriber internal sealed class RedisSubscriber : RedisBase, ISubscriber
......
...@@ -224,6 +224,11 @@ public override int GetHashSlot(ServerSelectionStrategy serverSelectionStrategy) ...@@ -224,6 +224,11 @@ public override int GetHashSlot(ServerSelectionStrategy serverSelectionStrategy)
public IEnumerable<Message> GetMessages(PhysicalConnection connection) public IEnumerable<Message> GetMessages(PhysicalConnection connection)
{ {
ResultBox lastBox = null; ResultBox lastBox = null;
var bridge = connection.BridgeCouldBeNull;
if (bridge == null) throw new ObjectDisposedException(connection.ToString());
bool explicitCheckForQueued = !bridge.ServerEndPoint.GetFeatures().ExecAbort;
var multiplexer = bridge.Multiplexer;
try try
{ {
// Important: if the server supports EXECABORT, then we can check the pre-conditions (pause there), // Important: if the server supports EXECABORT, then we can check the pre-conditions (pause there),
...@@ -234,15 +239,11 @@ public IEnumerable<Message> GetMessages(PhysicalConnection connection) ...@@ -234,15 +239,11 @@ public IEnumerable<Message> GetMessages(PhysicalConnection connection)
// up-version servers, pre-condition failures exit with UNWATCH; and on down-version servers pre-condition // up-version servers, pre-condition failures exit with UNWATCH; and on down-version servers pre-condition
// failures exit with DISCARD - but that's ok : both work fine // failures exit with DISCARD - but that's ok : both work fine
var bridge = connection.BridgeCouldBeNull;
if (bridge == null) throw new ObjectDisposedException(connection.ToString());
bool explicitCheckForQueued = !bridge.ServerEndPoint.GetFeatures().ExecAbort;
var multiplexer = bridge.Multiplexer;
// PART 1: issue the pre-conditions // PART 1: issue the pre-conditions
if (!IsAborted && conditions.Length != 0) if (!IsAborted && conditions.Length != 0)
{ {
multiplexer.OnTransactionLog($"issueing conditions...");
int cmdCount = 0;
for (int i = 0; i < conditions.Length; i++) for (int i = 0; i < conditions.Length; i++)
{ {
// need to have locked them before sending them // need to have locked them before sending them
...@@ -255,11 +256,15 @@ public IEnumerable<Message> GetMessages(PhysicalConnection connection) ...@@ -255,11 +256,15 @@ public IEnumerable<Message> GetMessages(PhysicalConnection connection)
{ {
msg.SetNoRedirect(); // need to keep them in the current context only msg.SetNoRedirect(); // need to keep them in the current context only
yield return msg; yield return msg;
multiplexer.OnTransactionLog($"issuing {msg.CommandAndKey}");
cmdCount++;
} }
} }
multiplexer.OnTransactionLog($"issued {conditions.Length} conditions ({cmdCount} commands)");
if (!explicitCheckForQueued && lastBox != null) if (!explicitCheckForQueued && lastBox != null)
{ {
multiplexer.OnTransactionLog($"checking conditions in the *early* path");
// need to get those sent ASAP; if they are stuck in the buffers, we die // need to get those sent ASAP; if they are stuck in the buffers, we die
multiplexer.Trace("Flushing and waiting for precondition responses"); multiplexer.Trace("Flushing and waiting for precondition responses");
connection.FlushAsync().Wait(); connection.FlushAsync().Wait();
...@@ -267,11 +272,15 @@ public IEnumerable<Message> GetMessages(PhysicalConnection connection) ...@@ -267,11 +272,15 @@ public IEnumerable<Message> GetMessages(PhysicalConnection connection)
{ {
if (!AreAllConditionsSatisfied(multiplexer)) if (!AreAllConditionsSatisfied(multiplexer))
command = RedisCommand.UNWATCH; // somebody isn't happy command = RedisCommand.UNWATCH; // somebody isn't happy
multiplexer.OnTransactionLog($"after condition check, we are {command}");
} }
else else
{ // timeout running pre-conditions { // timeout running pre-conditions
multiplexer.Trace("Timeout checking preconditions"); multiplexer.Trace("Timeout checking preconditions");
command = RedisCommand.UNWATCH; command = RedisCommand.UNWATCH;
multiplexer.OnTransactionLog($"timeout waiting for conditions, we are {command}");
} }
Monitor.Exit(lastBox); Monitor.Exit(lastBox);
lastBox = null; lastBox = null;
...@@ -283,12 +292,13 @@ public IEnumerable<Message> GetMessages(PhysicalConnection connection) ...@@ -283,12 +292,13 @@ public IEnumerable<Message> GetMessages(PhysicalConnection connection)
{ {
multiplexer.Trace("Begining transaction"); multiplexer.Trace("Begining transaction");
yield return Message.Create(-1, CommandFlags.None, RedisCommand.MULTI); yield return Message.Create(-1, CommandFlags.None, RedisCommand.MULTI);
multiplexer.OnTransactionLog($"issued MULTI");
} }
// PART 3: issue the commands // PART 3: issue the commands
if (!IsAborted && InnerOperations.Length != 0) if (!IsAborted && InnerOperations.Length != 0)
{ {
multiplexer.Trace("Issuing transaction operations"); multiplexer.Trace("Issuing operations...");
foreach (var op in InnerOperations) foreach (var op in InnerOperations)
{ {
...@@ -304,10 +314,15 @@ public IEnumerable<Message> GetMessages(PhysicalConnection connection) ...@@ -304,10 +314,15 @@ public IEnumerable<Message> GetMessages(PhysicalConnection connection)
} }
} }
yield return op; yield return op;
multiplexer.OnTransactionLog($"issued {op.CommandAndKey}");
} }
multiplexer.OnTransactionLog($"issued {InnerOperations.Length} operations");
if (explicitCheckForQueued && lastBox != null) if (explicitCheckForQueued && lastBox != null)
{ {
multiplexer.OnTransactionLog($"checking conditions in the *late* path");
multiplexer.Trace("Flushing and waiting for precondition+queued responses"); multiplexer.Trace("Flushing and waiting for precondition+queued responses");
connection.FlushAsync().Wait(); // make sure they get sent, so we can check for QUEUED (and the pre-conditions if necessary) connection.FlushAsync().Wait(); // make sure they get sent, so we can check for QUEUED (and the pre-conditions if necessary)
if (Monitor.Wait(lastBox, multiplexer.TimeoutMilliseconds)) if (Monitor.Wait(lastBox, multiplexer.TimeoutMilliseconds))
...@@ -323,17 +338,20 @@ public IEnumerable<Message> GetMessages(PhysicalConnection connection) ...@@ -323,17 +338,20 @@ public IEnumerable<Message> GetMessages(PhysicalConnection connection)
if (!op.WasQueued) if (!op.WasQueued)
{ {
multiplexer.Trace("Aborting: operation was not queued: " + op.Command); multiplexer.Trace("Aborting: operation was not queued: " + op.Command);
multiplexer.OnTransactionLog($"command was not issued: {op.CommandAndKey}");
command = RedisCommand.DISCARD; command = RedisCommand.DISCARD;
break; break;
} }
} }
} }
multiplexer.Trace("Confirmed: QUEUED x " + InnerOperations.Length); multiplexer.Trace("Confirmed: QUEUED x " + InnerOperations.Length);
multiplexer.OnTransactionLog($"after condition check, we are {command}");
} }
else else
{ {
multiplexer.Trace("Aborting: timeout checking queued messages"); multiplexer.Trace("Aborting: timeout checking queued messages");
command = RedisCommand.DISCARD; command = RedisCommand.DISCARD;
multiplexer.OnTransactionLog($"timeout waiting for conditions, we are {command}");
} }
Monitor.Exit(lastBox); Monitor.Exit(lastBox);
lastBox = null; lastBox = null;
...@@ -346,8 +364,8 @@ public IEnumerable<Message> GetMessages(PhysicalConnection connection) ...@@ -346,8 +364,8 @@ public IEnumerable<Message> GetMessages(PhysicalConnection connection)
} }
if (IsAborted) if (IsAborted)
{ {
multiplexer.OnTransactionLog($"aborting {InnerOperations.Length} wrapped commands...");
connection.Trace("Aborting: canceling wrapped messages"); connection.Trace("Aborting: canceling wrapped messages");
var bridge = connection.BridgeCouldBeNull;
foreach (var op in InnerOperations) foreach (var op in InnerOperations)
{ {
op.Wrapped.Cancel(); op.Wrapped.Cancel();
...@@ -355,6 +373,7 @@ public IEnumerable<Message> GetMessages(PhysicalConnection connection) ...@@ -355,6 +373,7 @@ public IEnumerable<Message> GetMessages(PhysicalConnection connection)
} }
} }
connection.Trace("End of transaction: " + Command); connection.Trace("End of transaction: " + Command);
multiplexer.OnTransactionLog($"issuing {this.Command}");
yield return this; // acts as either an EXEC or an UNWATCH, depending on "aborted" yield return this; // acts as either an EXEC or an UNWATCH, depending on "aborted"
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment