Commit 9a65655b authored by Marc Gravell's avatar Marc Gravell

fix the thread-race around conditions/pending-work in RedisTransaction; this...

fix the thread-race around conditions/pending-work in RedisTransaction; this is *not* the problem we're looking for, it just needs fixing
parent 07c16dd4
...@@ -79,8 +79,15 @@ internal static ConnectionMultiplexer GetWithFT(ITestOutputHelper output) ...@@ -79,8 +79,15 @@ internal static ConnectionMultiplexer GetWithFT(ITestOutputHelper output)
{ {
var i = config.LastIndexOf('/'); var i = config.LastIndexOf('/');
var modulePath = config.Substring(0, i + 1) + "redisearch.so"; var modulePath = config.Substring(0, i + 1) + "redisearch.so";
try
{
var result = server.Execute("module", "load", modulePath); var result = server.Execute("module", "load", modulePath);
output?.WriteLine((string)result); output?.WriteLine((string)result);
} catch(RedisServerException err)
{
// *probably* duplicate load; we'll try the tests anyways!
output?.WriteLine(err.Message);
}
} }
} }
return conn; return conn;
......
...@@ -8,9 +8,9 @@ namespace StackExchange.Redis ...@@ -8,9 +8,9 @@ namespace StackExchange.Redis
{ {
internal class RedisTransaction : RedisDatabase, ITransaction internal class RedisTransaction : RedisDatabase, ITransaction
{ {
private List<ConditionResult> conditions; private List<ConditionResult> _conditions;
private List<QueuedMessage> _pending;
private List<QueuedMessage> pending; private object SyncLock => this;
public RedisTransaction(RedisDatabase wrapped, object asyncState) : base(wrapped.multiplexer, wrapped.Database, asyncState ?? wrapped.AsyncState) public RedisTransaction(RedisDatabase wrapped, object asyncState) : base(wrapped.multiplexer, wrapped.Database, asyncState ?? wrapped.AsyncState)
{ {
...@@ -26,18 +26,21 @@ public ConditionResult AddCondition(Condition condition) ...@@ -26,18 +26,21 @@ public ConditionResult AddCondition(Condition condition)
if (condition == null) throw new ArgumentNullException(nameof(condition)); if (condition == null) throw new ArgumentNullException(nameof(condition));
var commandMap = multiplexer.CommandMap; var commandMap = multiplexer.CommandMap;
if (conditions == null) lock (SyncLock)
{
if (_conditions == null)
{ {
// we don't demand these unless the user is requesting conditions, but we need both... // we don't demand these unless the user is requesting conditions, but we need both...
commandMap.AssertAvailable(RedisCommand.WATCH); commandMap.AssertAvailable(RedisCommand.WATCH);
commandMap.AssertAvailable(RedisCommand.UNWATCH); commandMap.AssertAvailable(RedisCommand.UNWATCH);
conditions = new List<ConditionResult>(); _conditions = new List<ConditionResult>();
} }
condition.CheckCommands(commandMap); condition.CheckCommands(commandMap);
var result = new ConditionResult(condition); var result = new ConditionResult(condition);
conditions.Add(result); _conditions.Add(result);
return result; return result;
} }
}
public void Execute() public void Execute()
{ {
...@@ -83,7 +86,9 @@ internal override Task<T> ExecuteAsync<T>(Message message, ResultProcessor<T> pr ...@@ -83,7 +86,9 @@ internal override Task<T> ExecuteAsync<T>(Message message, ResultProcessor<T> pr
// store it, and return the task of the *outer* command // store it, and return the task of the *outer* command
// (there is no task for the inner command) // (there is no task for the inner command)
(pending ?? (pending = new List<QueuedMessage>())).Add(queued); lock (SyncLock)
{
(_pending ?? (_pending = new List<QueuedMessage>())).Add(queued);
switch (message.Command) switch (message.Command)
{ {
...@@ -97,9 +102,10 @@ internal override Task<T> ExecuteAsync<T>(Message message, ResultProcessor<T> pr ...@@ -97,9 +102,10 @@ internal override Task<T> ExecuteAsync<T>(Message message, ResultProcessor<T> pr
queued = new QueuedMessage(sel); queued = new QueuedMessage(sel);
wasQueued = ResultBox<bool>.Get(null); wasQueued = ResultBox<bool>.Get(null);
queued.SetSource(wasQueued, QueuedProcessor.Default); queued.SetSource(wasQueued, QueuedProcessor.Default);
pending.Add(queued); _pending.Add(queued);
break; break;
} }
}
return task; return task;
} }
...@@ -110,11 +116,15 @@ internal override T ExecuteSync<T>(Message message, ResultProcessor<T> processor ...@@ -110,11 +116,15 @@ internal override T ExecuteSync<T>(Message message, ResultProcessor<T> processor
private Message CreateMessage(CommandFlags flags, out ResultProcessor<bool> processor) private Message CreateMessage(CommandFlags flags, out ResultProcessor<bool> processor)
{ {
var work = pending; List<ConditionResult> cond;
pending = null; // any new operations go into a different queue List<QueuedMessage> work;
var cond = conditions; lock (SyncLock)
conditions = null; // any new conditions go into a different queue {
work = _pending;
_pending = null; // any new operations go into a different queue
cond = _conditions;
_conditions = null; // any new conditions go into a different queue
}
if ((work == null || work.Count == 0) && (cond == null || cond.Count == 0)) if ((work == null || work.Count == 0) && (cond == null || cond.Count == 0))
{ {
if ((flags & CommandFlags.FireAndForget) != 0) if ((flags & CommandFlags.FireAndForget) != 0)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment