Commit e9e23102 authored by Marc Gravell's avatar Marc Gravell

add parallel transaction test

parent 4fe6bb2d
using System;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
using Xunit;
using Xunit.Abstractions;
......@@ -891,5 +893,66 @@ public async Task CombineFireAndForgetAndRegularAsyncInTransaction()
Assert.Equal(30, count);
}
}
[Fact]
public async Task ParallelTransactionsWithConditions()
{
const int Muxers = 4, Workers = 20, PerThread = 250;
var muxers = new ConnectionMultiplexer[Muxers];
try
{
for (int i = 0; i < Muxers; i++)
muxers[i] = Create(log: TextWriter.Null);
RedisKey hits = Me(), trigger = Me() + "3";
int expectedSuccess = 0;
await muxers[0].GetDatabase().KeyDeleteAsync(new[] { hits, trigger });
Task[] tasks = new Task[Workers];
for (int i = 0; i < tasks.Length; i++)
{
var scopedDb = muxers[i % Muxers].GetDatabase();
var rand = new Random(i);
tasks[i] = Task.Run(async () =>
{
for (int j = 0; j < PerThread; j++)
{
var oldVal = await scopedDb.StringGetAsync(trigger);
var tran = scopedDb.CreateTransaction();
tran.AddCondition(Condition.StringEqual(trigger, oldVal));
var x = tran.StringIncrementAsync(trigger);
var y = tran.StringIncrementAsync(hits);
if(await tran.ExecuteAsync())
{
Interlocked.Increment(ref expectedSuccess);
await x;
await y;
}
else
{
await Assert.ThrowsAsync<TaskCanceledException>(() => x);
await Assert.ThrowsAsync<TaskCanceledException>(() => y);
}
}
});
}
for (int i = tasks.Length - 1; i >= 0; i--)
{
await tasks[i];
}
var actual = (int) await muxers[0].GetDatabase().StringGetAsync(hits);
Assert.Equal(expectedSuccess, actual);
Writer.WriteLine($"success: {actual} out of {(Workers * PerThread)} attempts");
}
finally
{
for (int i = 0; i < muxers.Length; i++)
{
try { muxers[i]?.Dispose(); } catch { }
}
}
}
}
}
......@@ -27,11 +27,13 @@ internal sealed class ResultBox<T> : ResultBox
{
private static readonly ResultBox<T>[] store = new ResultBox<T>[64];
private object stateOrCompletionSource;
private int _usageCount;
private T value;
public ResultBox(object stateOrCompletionSource)
{
this.stateOrCompletionSource = stateOrCompletionSource;
_usageCount = 1;
}
public static ResultBox<T> Get(object stateOrCompletionSource)
......@@ -64,6 +66,9 @@ public static void UnwrapAndRecycle(ResultBox<T> box, bool recycle, out T value,
box._exception = null;
if (recycle)
{
var newCount = Interlocked.Decrement(ref box._usageCount);
if (newCount != 0)
throw new InvalidOperationException($"Result box count error: is {newCount} in UnwrapAndRecycle (should be 0)");
for (int i = 0; i < store.Length; i++)
{
if (Interlocked.CompareExchange(ref store[i], box, null) == null) return;
......@@ -119,6 +124,8 @@ public override bool TryComplete(bool isAsync)
private void Reset(object stateOrCompletionSource)
{
var newCount = Interlocked.Increment(ref _usageCount);
if (newCount != 1) throw new InvalidOperationException($"Result box count error: is {newCount} in Reset (should be 1)");
value = default(T);
_exception = null;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment