Commit 0badeaef authored by Marc Gravell's avatar Marc Gravell

add aggressive tests that stress batches and transactions, sync and async

parent d7c2d80d
......@@ -11,7 +11,6 @@ public class AggresssiveTests : TestBase
{
public AggresssiveTests(ITestOutputHelper output) : base(output) { }
[Fact]
public async Task ParallelTransactionsWithConditions()
{
......@@ -72,5 +71,248 @@ public async Task ParallelTransactionsWithConditions()
}
}
}
private const int IterationCount = 5000, InnerCount = 20;
[FactLongRunning]
public void RunCompetingBatchesOnSameMuxer()
{
using (var muxer = Create())
{
var db = muxer.GetDatabase();
Thread x = new Thread(state => BatchRunPings((IDatabase)state));
x.Name = nameof(BatchRunPings);
Thread y = new Thread(state => BatchRunIntegers((IDatabase)state));
y.Name = nameof(BatchRunIntegers);
x.Start(db);
y.Start(db);
x.Join();
y.Join();
Writer.WriteLine(muxer.GetCounters().Interactive);
}
}
private void BatchRunIntegers(IDatabase db)
{
var key = Me();
db.KeyDelete(key);
db.StringSet(key, 1);
Task[] tasks = new Task[InnerCount];
for(int i = 0; i < IterationCount; i++)
{
var batch = db.CreateBatch();
for (int j = 0; j < tasks.Length; j++)
{
tasks[j] = batch.StringIncrementAsync(key);
}
batch.Execute();
db.Multiplexer.WaitAll(tasks);
}
var count = (long)db.StringGet(key);
Writer.WriteLine($"tally: {count}");
}
private void BatchRunPings(IDatabase db)
{
Task[] tasks = new Task[InnerCount];
for (int i = 0; i < IterationCount; i++)
{
var batch = db.CreateBatch();
for (int j = 0; j < tasks.Length; j++)
{
tasks[j] = batch.PingAsync();
}
batch.Execute();
db.Multiplexer.WaitAll(tasks);
}
}
[FactLongRunning]
public async Task RunCompetingBatchesOnSameMuxerAsync()
{
using (var muxer = Create())
{
var db = muxer.GetDatabase();
var x = Task.Run(() => BatchRunPingsAsync(db));
var y = Task.Run(() => BatchRunIntegersAsync(db));
await x;
await y;
Writer.WriteLine(muxer.GetCounters().Interactive);
}
}
private async Task BatchRunIntegersAsync(IDatabase db)
{
var key = Me();
await db.KeyDeleteAsync(key);
await db.StringSetAsync(key, 1);
Task[] tasks = new Task[InnerCount];
for (int i = 0; i < IterationCount; i++)
{
var batch = db.CreateBatch();
for (int j = 0; j < tasks.Length; j++)
{
tasks[j] = batch.StringIncrementAsync(key);
}
batch.Execute();
for(int j = tasks.Length - 1; j >= 0;j--)
{
await tasks[j];
}
}
var count = (long)await db.StringGetAsync(key);
Writer.WriteLine($"tally: {count}");
}
private async Task BatchRunPingsAsync(IDatabase db)
{
Task[] tasks = new Task[InnerCount];
for (int i = 0; i < IterationCount; i++)
{
var batch = db.CreateBatch();
for (int j = 0; j < tasks.Length; j++)
{
tasks[j] = batch.PingAsync();
}
batch.Execute();
for (int j = tasks.Length - 1; j >= 0; j--)
{
await tasks[j];
}
}
}
[FactLongRunning]
public void RunCompetingTransactionsOnSameMuxer()
{
using (var muxer = Create(logTransactionData: false))
{
var db = muxer.GetDatabase();
Thread x = new Thread(state => TranRunPings((IDatabase)state));
x.Name = nameof(BatchRunPings);
Thread y = new Thread(state => TranRunIntegers((IDatabase)state));
y.Name = nameof(BatchRunIntegers);
x.Start(db);
y.Start(db);
x.Join();
y.Join();
Writer.WriteLine(muxer.GetCounters().Interactive);
}
}
private void TranRunIntegers(IDatabase db)
{
var key = Me();
db.KeyDelete(key);
db.StringSet(key, 1);
Task[] tasks = new Task[InnerCount];
for (int i = 0; i < IterationCount; i++)
{
var batch = db.CreateTransaction();
batch.AddCondition(Condition.KeyExists(key));
for (int j = 0; j < tasks.Length; j++)
{
tasks[j] = batch.StringIncrementAsync(key);
}
batch.Execute();
db.Multiplexer.WaitAll(tasks);
}
var count = (long)db.StringGet(key);
Writer.WriteLine($"tally: {count}");
}
private void TranRunPings(IDatabase db)
{
var key = Me();
db.KeyDelete(key);
Task[] tasks = new Task[InnerCount];
for (int i = 0; i < IterationCount; i++)
{
var batch = db.CreateTransaction();
batch.AddCondition(Condition.KeyNotExists(key));
for (int j = 0; j < tasks.Length; j++)
{
tasks[j] = batch.PingAsync();
}
batch.Execute();
db.Multiplexer.WaitAll(tasks);
}
}
[FactLongRunning]
public async Task RunCompetingTransactionsOnSameMuxerAsync()
{
using (var muxer = Create(logTransactionData: false))
{
var db = muxer.GetDatabase();
var x = Task.Run(() => TranRunPingsAsync(db));
var y = Task.Run(() => TranRunIntegersAsync(db));
await x;
await y;
Writer.WriteLine(muxer.GetCounters().Interactive);
}
}
private async Task TranRunIntegersAsync(IDatabase db)
{
var key = Me();
await db.KeyDeleteAsync(key);
await db.StringSetAsync(key, 1);
Task[] tasks = new Task[InnerCount];
for (int i = 0; i < IterationCount; i++)
{
var batch = db.CreateTransaction();
batch.AddCondition(Condition.KeyExists(key));
for (int j = 0; j < tasks.Length; j++)
{
tasks[j] = batch.StringIncrementAsync(key);
}
await batch.ExecuteAsync();
for (int j = tasks.Length - 1; j >= 0; j--)
{
await tasks[j];
}
}
var count = (long)await db.StringGetAsync(key);
Writer.WriteLine($"tally: {count}");
}
private async Task TranRunPingsAsync(IDatabase db)
{
var key = Me();
db.KeyDelete(key);
Task[] tasks = new Task[InnerCount];
for (int i = 0; i < IterationCount; i++)
{
var batch = db.CreateTransaction();
batch.AddCondition(Condition.KeyNotExists(key));
for (int j = 0; j < tasks.Length; j++)
{
tasks[j] = batch.PingAsync();
}
await batch.ExecuteAsync();
for (int j = tasks.Length - 1; j >= 0; j--)
{
await tasks[j];
}
}
}
}
}
......@@ -213,7 +213,7 @@ protected IServer GetAnyMaster(ConnectionMultiplexer muxer)
bool fail = true, string[] disabledCommands = null, string[] enabledCommands = null,
bool checkConnect = true, string failMessage = null,
string channelPrefix = null, Proxy? proxy = null,
string configuration = null,
string configuration = null, bool logTransactionData = true,
[CallerMemberName] string caller = null)
{
StringWriter localLog = null;
......@@ -288,7 +288,10 @@ protected IServer GetAnyMaster(ConnectionMultiplexer muxer)
}
};
muxer.Connecting += (e, t) => Writer.WriteLine($"Connecting to {Format.ToString(e)} as {t}");
muxer.TransactionLog += msg => { lock (Writer) { Writer.WriteLine("tran: " + msg); } };
if (logTransactionData)
{
muxer.TransactionLog += msg => { lock (Writer) { Writer.WriteLine("tran: " + msg); } };
}
muxer.InfoMessage += msg => Writer.WriteLine(msg);
muxer.Resurrecting += (e, t) => Writer.WriteLine($"Resurrecting {Format.ToString(e)} as {t}");
muxer.Closing += complete => Writer.WriteLine(complete ? "Closed" : "Closing...");
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment