Commit d5024f5e authored by Marc Gravell's avatar Marc Gravell

transactions: don't log the connect crap; use a StringBuilder to buffer the...

transactions: don't log the connect crap; use a StringBuilder to buffer the tran log to avoid lots of sync blocks
parent d86195d2
using System; using System;
using System.IO; using System.IO;
using System.Runtime.CompilerServices;
using System.Threading; using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
using Xunit; using Xunit;
...@@ -9,6 +10,13 @@ namespace StackExchange.Redis.Tests ...@@ -9,6 +10,13 @@ namespace StackExchange.Redis.Tests
{ {
public class Transactions : TestBase public class Transactions : TestBase
{ {
protected override ConnectionMultiplexer Create(string clientName = null, int? syncTimeout = null, bool? allowAdmin = null, int? keepAlive = null, int? connectTimeout = null, string password = null, string tieBreaker = null, TextWriter log = null, bool fail = true, string[] disabledCommands = null, string[] enabledCommands = null, bool checkConnect = true, string failMessage = null, string channelPrefix = null, Proxy? proxy = null, string configuration = null, [CallerMemberName] string caller = null)
{
return base.Create(clientName, syncTimeout, allowAdmin, keepAlive, connectTimeout, password, tieBreaker,
TextWriter.Null, // <== the one I care about
fail, disabledCommands, enabledCommands, checkConnect, failMessage, channelPrefix, proxy, configuration, caller);
}
public Transactions(ITestOutputHelper output) : base(output) { } public Transactions(ITestOutputHelper output) : base(output) { }
[Fact] [Fact]
...@@ -764,7 +772,7 @@ public async Task BasicTranWithSortedSetContainsCondition(bool demandKeyExists, ...@@ -764,7 +772,7 @@ public async Task BasicTranWithSortedSetContainsCondition(bool demandKeyExists,
[InlineData("", ComparisonType.GreaterThan, 0L, false)] [InlineData("", ComparisonType.GreaterThan, 0L, false)]
public async Task BasicTranWithListLengthCondition(string value, ComparisonType type, long length, bool expectTranResult) public async Task BasicTranWithListLengthCondition(string value, ComparisonType type, long length, bool expectTranResult)
{ {
using (var muxer = Create(log: TextWriter.Null)) using (var muxer = Create())
{ {
RedisKey key = Me(), key2 = Me() + "2"; RedisKey key = Me(), key2 = Me() + "2";
var db = muxer.GetDatabase(); var db = muxer.GetDatabase();
...@@ -903,7 +911,7 @@ public async Task ParallelTransactionsWithConditions() ...@@ -903,7 +911,7 @@ public async Task ParallelTransactionsWithConditions()
try try
{ {
for (int i = 0; i < Muxers; i++) for (int i = 0; i < Muxers; i++)
muxers[i] = Create(log: TextWriter.Null); muxers[i] = Create();
RedisKey hits = Me(), trigger = Me() + "3"; RedisKey hits = Me(), trigger = Me() + "3";
int expectedSuccess = 0; int expectedSuccess = 0;
...@@ -958,8 +966,8 @@ public async Task ParallelTransactionsWithConditions() ...@@ -958,8 +966,8 @@ public async Task ParallelTransactionsWithConditions()
[Fact] [Fact]
public async Task WatchAbort_StringEqual() public async Task WatchAbort_StringEqual()
{ {
using (var vic = Create(log: TextWriter.Null)) using (var vic = Create())
using (var perp = Create(log: TextWriter.Null)) using (var perp = Create())
{ {
var key = Me(); var key = Me();
var db = vic.GetDatabase(); var db = vic.GetDatabase();
...@@ -983,8 +991,8 @@ public async Task WatchAbort_StringEqual() ...@@ -983,8 +991,8 @@ public async Task WatchAbort_StringEqual()
[Fact] [Fact]
public async Task WatchAbort_HashLengthEqual() public async Task WatchAbort_HashLengthEqual()
{ {
using (var vic = Create(log: TextWriter.Null)) using (var vic = Create())
using (var perp = Create(log: TextWriter.Null)) using (var perp = Create())
{ {
var key = Me(); var key = Me();
var db = vic.GetDatabase(); var db = vic.GetDatabase();
......
...@@ -233,151 +233,159 @@ public IEnumerable<Message> GetMessages(PhysicalConnection connection) ...@@ -233,151 +233,159 @@ public IEnumerable<Message> GetMessages(PhysicalConnection connection)
bool explicitCheckForQueued = !bridge.ServerEndPoint.GetFeatures().ExecAbort; bool explicitCheckForQueued = !bridge.ServerEndPoint.GetFeatures().ExecAbort;
var multiplexer = bridge.Multiplexer; var multiplexer = bridge.Multiplexer;
var sb = new StringBuilder();
try try
{ {
// Important: if the server supports EXECABORT, then we can check the pre-conditions (pause there), try
// which will usually be pretty small and cheap to do - if that passes, we can just isue all the commands
// and rely on EXECABORT to kick us if we are being idiotic inside the MULTI. However, if the server does
// *not* support EXECABORT, then we need to explicitly check for QUEUED anyway; we might as well defer
// checking the preconditions to the same time to avoid having to pause twice. This will mean that on
// up-version servers, pre-condition failures exit with UNWATCH; and on down-version servers pre-condition
// failures exit with DISCARD - but that's ok : both work fine
// PART 1: issue the pre-conditions
if (!IsAborted && conditions.Length != 0)
{ {
multiplexer.OnTransactionLog($"issuing conditions..."); // Important: if the server supports EXECABORT, then we can check the pre-conditions (pause there),
int cmdCount = 0; // which will usually be pretty small and cheap to do - if that passes, we can just isue all the commands
for (int i = 0; i < conditions.Length; i++) // and rely on EXECABORT to kick us if we are being idiotic inside the MULTI. However, if the server does
// *not* support EXECABORT, then we need to explicitly check for QUEUED anyway; we might as well defer
// checking the preconditions to the same time to avoid having to pause twice. This will mean that on
// up-version servers, pre-condition failures exit with UNWATCH; and on down-version servers pre-condition
// failures exit with DISCARD - but that's ok : both work fine
// PART 1: issue the pre-conditions
if (!IsAborted && conditions.Length != 0)
{ {
// need to have locked them before sending them sb.AppendLine($"issuing conditions...");
// to guarantee that we see the pulse int cmdCount = 0;
ResultBox latestBox = conditions[i].GetBox(); for (int i = 0; i < conditions.Length; i++)
Monitor.Enter(latestBox);
if (lastBox != null) Monitor.Exit(lastBox);
lastBox = latestBox;
foreach (var msg in conditions[i].CreateMessages(Db))
{ {
msg.SetNoRedirect(); // need to keep them in the current context only // need to have locked them before sending them
yield return msg; // to guarantee that we see the pulse
multiplexer.OnTransactionLog($"issuing {msg.CommandAndKey}"); ResultBox latestBox = conditions[i].GetBox();
cmdCount++; Monitor.Enter(latestBox);
if (lastBox != null) Monitor.Exit(lastBox);
lastBox = latestBox;
foreach (var msg in conditions[i].CreateMessages(Db))
{
msg.SetNoRedirect(); // need to keep them in the current context only
yield return msg;
sb.AppendLine($"issuing {msg.CommandAndKey}");
cmdCount++;
}
} }
} sb.AppendLine($"issued {conditions.Length} conditions ({cmdCount} commands)");
multiplexer.OnTransactionLog($"issued {conditions.Length} conditions ({cmdCount} commands)");
if (!explicitCheckForQueued && lastBox != null) if (!explicitCheckForQueued && lastBox != null)
{
multiplexer.OnTransactionLog($"checking conditions in the *early* path");
// need to get those sent ASAP; if they are stuck in the buffers, we die
multiplexer.Trace("Flushing and waiting for precondition responses");
connection.FlushAsync().Wait();
if (Monitor.Wait(lastBox, multiplexer.TimeoutMilliseconds))
{ {
if (!AreAllConditionsSatisfied(multiplexer)) sb.AppendLine($"checking conditions in the *early* path");
command = RedisCommand.UNWATCH; // somebody isn't happy // need to get those sent ASAP; if they are stuck in the buffers, we die
multiplexer.Trace("Flushing and waiting for precondition responses");
connection.FlushAsync().Wait();
if (Monitor.Wait(lastBox, multiplexer.TimeoutMilliseconds))
{
if (!AreAllConditionsSatisfied(multiplexer))
command = RedisCommand.UNWATCH; // somebody isn't happy
multiplexer.OnTransactionLog($"after condition check, we are {command}"); sb.AppendLine($"after condition check, we are {command}");
} }
else else
{ // timeout running pre-conditions { // timeout running pre-conditions
multiplexer.Trace("Timeout checking preconditions"); multiplexer.Trace("Timeout checking preconditions");
command = RedisCommand.UNWATCH; command = RedisCommand.UNWATCH;
multiplexer.OnTransactionLog($"timeout waiting for conditions, we are {command}"); sb.AppendLine($"timeout waiting for conditions, we are {command}");
}
Monitor.Exit(lastBox);
lastBox = null;
} }
Monitor.Exit(lastBox);
lastBox = null;
} }
}
// PART 2: begin the transaction // PART 2: begin the transaction
if (!IsAborted) if (!IsAborted)
{
multiplexer.Trace("Begining transaction");
yield return Message.Create(-1, CommandFlags.None, RedisCommand.MULTI);
multiplexer.OnTransactionLog($"issued MULTI");
}
// PART 3: issue the commands
if (!IsAborted && InnerOperations.Length != 0)
{
multiplexer.Trace("Issuing operations...");
foreach (var op in InnerOperations)
{ {
if (explicitCheckForQueued) multiplexer.Trace("Begining transaction");
{ // need to have locked them before sending them yield return Message.Create(-1, CommandFlags.None, RedisCommand.MULTI);
// to guarantee that we see the pulse sb.AppendLine($"issued MULTI");
ResultBox thisBox = op.ResultBox;
if (thisBox != null)
{
Monitor.Enter(thisBox);
if (lastBox != null) Monitor.Exit(lastBox);
lastBox = thisBox;
}
}
yield return op;
multiplexer.OnTransactionLog($"issued {op.CommandAndKey}");
} }
multiplexer.OnTransactionLog($"issued {InnerOperations.Length} operations");
if (explicitCheckForQueued && lastBox != null) // PART 3: issue the commands
if (!IsAborted && InnerOperations.Length != 0)
{ {
multiplexer.OnTransactionLog($"checking conditions in the *late* path"); multiplexer.Trace("Issuing operations...");
multiplexer.Trace("Flushing and waiting for precondition+queued responses"); foreach (var op in InnerOperations)
connection.FlushAsync().Wait(); // make sure they get sent, so we can check for QUEUED (and the pre-conditions if necessary)
if (Monitor.Wait(lastBox, multiplexer.TimeoutMilliseconds))
{ {
if (!AreAllConditionsSatisfied(multiplexer)) if (explicitCheckForQueued)
{ { // need to have locked them before sending them
command = RedisCommand.DISCARD; // to guarantee that we see the pulse
ResultBox thisBox = op.ResultBox;
if (thisBox != null)
{
Monitor.Enter(thisBox);
if (lastBox != null) Monitor.Exit(lastBox);
lastBox = thisBox;
}
} }
else yield return op;
sb.AppendLine($"issued {op.CommandAndKey}");
}
sb.AppendLine($"issued {InnerOperations.Length} operations");
if (explicitCheckForQueued && lastBox != null)
{
sb.AppendLine($"checking conditions in the *late* path");
multiplexer.Trace("Flushing and waiting for precondition+queued responses");
connection.FlushAsync().Wait(); // make sure they get sent, so we can check for QUEUED (and the pre-conditions if necessary)
if (Monitor.Wait(lastBox, multiplexer.TimeoutMilliseconds))
{ {
foreach (var op in InnerOperations) if (!AreAllConditionsSatisfied(multiplexer))
{
command = RedisCommand.DISCARD;
}
else
{ {
if (!op.WasQueued) foreach (var op in InnerOperations)
{ {
multiplexer.Trace("Aborting: operation was not queued: " + op.Command); if (!op.WasQueued)
multiplexer.OnTransactionLog($"command was not issued: {op.CommandAndKey}"); {
command = RedisCommand.DISCARD; multiplexer.Trace("Aborting: operation was not queued: " + op.Command);
break; sb.AppendLine($"command was not issued: {op.CommandAndKey}");
command = RedisCommand.DISCARD;
break;
}
} }
} }
multiplexer.Trace("Confirmed: QUEUED x " + InnerOperations.Length);
sb.AppendLine($"after condition check, we are {command}");
} }
multiplexer.Trace("Confirmed: QUEUED x " + InnerOperations.Length); else
multiplexer.OnTransactionLog($"after condition check, we are {command}"); {
} multiplexer.Trace("Aborting: timeout checking queued messages");
else command = RedisCommand.DISCARD;
{ sb.AppendLine($"timeout waiting for conditions, we are {command}");
multiplexer.Trace("Aborting: timeout checking queued messages"); }
command = RedisCommand.DISCARD; Monitor.Exit(lastBox);
multiplexer.OnTransactionLog($"timeout waiting for conditions, we are {command}"); lastBox = null;
} }
Monitor.Exit(lastBox);
lastBox = null;
} }
} }
finally
{
if (lastBox != null) Monitor.Exit(lastBox);
}
if (IsAborted)
{
sb.AppendLine($"aborting {InnerOperations.Length} wrapped commands...");
connection.Trace("Aborting: canceling wrapped messages");
foreach (var op in InnerOperations)
{
op.Wrapped.Cancel();
bridge.CompleteSyncOrAsync(op.Wrapped);
}
}
connection.Trace("End of transaction: " + Command);
sb.AppendLine($"issuing {Command}");
yield return this; // acts as either an EXEC or an UNWATCH, depending on "aborted"
} }
finally finally
{ {
if (lastBox != null) Monitor.Exit(lastBox); multiplexer.OnTransactionLog(sb.ToString());
}
if (IsAborted)
{
multiplexer.OnTransactionLog($"aborting {InnerOperations.Length} wrapped commands...");
connection.Trace("Aborting: canceling wrapped messages");
foreach (var op in InnerOperations)
{
op.Wrapped.Cancel();
bridge.CompleteSyncOrAsync(op.Wrapped);
}
} }
connection.Trace("End of transaction: " + Command);
multiplexer.OnTransactionLog($"issuing {Command}");
yield return this; // acts as either an EXEC or an UNWATCH, depending on "aborted"
} }
protected override void WriteImpl(PhysicalConnection physical) protected override void WriteImpl(PhysicalConnection physical)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment