Commit 83e7d4e7 authored by Marc Gravell's avatar Marc Gravell

attempt to repro #923

parent 0c0b9b6b
using System;
using System.Diagnostics;
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Tasks;
using StackExchange.Redis;
......@@ -10,96 +6,47 @@ namespace TestConsole
{
internal static class Program
{
private static void Main()
private static async Task Main()
{
Console.WriteLine($"{Environment.OSVersion} / {Environment.Version} / {(Environment.Is64BitProcess ? "64" : "32")}");
Console.WriteLine(RuntimeInformation.FrameworkDescription);
DateTime stop = DateTime.UtcNow.AddSeconds(30);
int i = 0;
do
using (var conn = Create())
{
Console.WriteLine(i++);
RunCompetingBatchesOnSameMuxer();
} while (DateTime.UtcNow < stop);
Console.WriteLine($"Completed {i} iterations, {2 * i * IterationCount * InnerCount} operations");
var sub = conn.GetSubscriber();
sub.Subscribe("foo", (channel, value) => Console.WriteLine($"{channel}: {value}"));
sub.Ping();
await RunPub().ConfigureAwait(false);
}
await Console.Out.WriteLineAsync("Waiting a minute...").ConfigureAwait(false);
await Task.Delay(60 * 1000).ConfigureAwait(false);
}
private static ConnectionMultiplexer Create()
{
var options = new ConfigurationOptions
{
KeepAlive = 5,
EndPoints = { "localhost:6379" },
SyncTimeout = int.MaxValue,
// CommandMap = CommandMap.Create(new HashSet<string> { "subscribe", "psubscsribe", "publish" }, false),
};
var muxer = ConnectionMultiplexer.Connect(options);
muxer.ConnectionFailed += (s, a) => Console.WriteLine($"Failed: {a.ConnectionType}, {a.EndPoint}, {a.FailureType}, {a.Exception}");
muxer.ConnectionRestored += (s, a) => Console.WriteLine($"Restored: {a.ConnectionType}, {a.EndPoint}, {a.FailureType}, {a.Exception}");
muxer.GetDatabase().Ping();
return muxer;
}
private const int IterationCount = 500, InnerCount = 20;
public static void RunCompetingBatchesOnSameMuxer()
{
using (var muxer = Create())
{
var db = muxer.GetDatabase();
Thread x = new Thread(state => BatchRunPings((IDatabase)state))
{
Name = nameof(BatchRunPings)
};
Thread y = new Thread(state => BatchRunIntegers((IDatabase)state))
{
Name = nameof(BatchRunIntegers)
};
var watch = Stopwatch.StartNew();
x.Start(db);
y.Start(db);
x.Join();
y.Join();
watch.Stop();
Console.WriteLine($"{watch.ElapsedMilliseconds}ms");
Console.WriteLine(muxer.GetCounters().Interactive);
Console.WriteLine($"Service Counts: {SocketManager.Shared}");
}
}
private static RedisKey Me([CallerMemberName]string caller = null) => caller;
private static void BatchRunIntegers(IDatabase db)
public static async Task RunPub()
{
var key = Me();
db.KeyDelete(key);
db.StringSet(key, 1);
Task[] tasks = new Task[InnerCount];
for (int i = 0; i < IterationCount; i++)
using (var conn = Create())
{
var batch = db.CreateBatch();
for (int j = 0; j < tasks.Length; j++)
var pub = conn.GetSubscriber();
for (int i = 0; i < 100; i++)
{
tasks[j] = batch.StringIncrementAsync(key);
await pub.PublishAsync("foo", i).ConfigureAwait(false);
}
batch.Execute();
db.Multiplexer.WaitAll(tasks);
if (i % 1000 == 0) Console.WriteLine(i);
}
var count = (long)db.StringGet(key);
Console.WriteLine($"tally: {count}");
}
private static void BatchRunPings(IDatabase db)
{
Task[] tasks = new Task[InnerCount];
for (int i = 0; i < IterationCount; i++)
{
var batch = db.CreateBatch();
for (int j = 0; j < tasks.Length; j++)
{
tasks[j] = batch.ExecuteAsync("echo", "echo" + j);
}
batch.Execute();
db.Multiplexer.WaitAll(tasks);
if (i % 1000 == 0) Console.WriteLine(i);
await Console.Out.WriteLineAsync("Waiting a minute...").ConfigureAwait(false);
await Task.Delay(60 * 1000).ConfigureAwait(false);
}
}
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment