Commit 16363206 authored by Marc Gravell's avatar Marc Gravell

make RunCompetingBatchesOnSameMuxer the console runner

parent dad32414
...@@ -28,7 +28,10 @@ public sealed partial class SocketManager : IDisposable ...@@ -28,7 +28,10 @@ public sealed partial class SocketManager : IDisposable
public SocketManager(string name = null) public SocketManager(string name = null)
: this(name, false, DEFAULT_MIN_THREADS, DEFAULT_MAX_THREADS) { } : this(name, false, DEFAULT_MIN_THREADS, DEFAULT_MAX_THREADS) { }
internal static SocketManager Shared /// <summary>
/// Default / shared socket manager
/// </summary>
public static SocketManager Shared
{ {
get get
{ {
...@@ -46,6 +49,16 @@ internal static SocketManager Shared ...@@ -46,6 +49,16 @@ internal static SocketManager Shared
} }
} }
/// <summary>Returns a string that represents the current object.</summary>
/// <returns>A string that represents the current object.</returns>
public override string ToString()
{
var scheduler = SchedulerPool;
var comp = CompletionPool;
return $"scheduler - queue: {scheduler?.TotalServicedByQueue}, pool: {scheduler?.TotalServicedByPool}; completion - queue: {comp ?.TotalServicedByQueue}, pool: {comp?.TotalServicedByPool}";
}
private static SocketManager _shared; private static SocketManager _shared;
/// <summary> /// <summary>
......
using System; using System;
using System.Diagnostics;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
using StackExchange.Redis; using StackExchange.Redis;
namespace TestConsole namespace TestConsole
...@@ -7,97 +11,74 @@ internal static class Program ...@@ -7,97 +11,74 @@ internal static class Program
{ {
private static void Main() private static void Main()
{ {
using (var muxer = ConnectionMultiplexer.Connect("localhost:6379", Console.Out)) RunCompetingBatchesOnSameMuxer();
{
muxer.GetDatabase().Ping();
}
} }
//private static async Task Main2() static ConnectionMultiplexer Create()
//{ {
// const int ClientCount = 150, ConnectionCount = 10; var muxer = ConnectionMultiplexer.Connect("localhost:6379");
// CancellationTokenSource cancel = new CancellationTokenSource(); muxer.GetDatabase().Ping();
return muxer;
// var config = new ConfigurationOptions }
// { private const int IterationCount = 5000, InnerCount = 20;
// EndPoints = { new IPEndPoint(IPAddress.Loopback, 6379) } public static void RunCompetingBatchesOnSameMuxer()
// }; {
// var muxers = new ConnectionMultiplexer[ConnectionCount]; using (var muxer = Create())
// try {
// { var db = muxer.GetDatabase();
// for (int i = 0; i < muxers.Length; i++)
// {
// muxers[i] = await ConnectionMultiplexer.ConnectAsync(config);
// }
// var tasks = new Task[ClientCount + 1];
// tasks[0] = Task.Run(() => ShowState(cancel.Token));
// for (int i = 1; i < tasks.Length; i++)
// {
// var db = muxers[i % muxers.Length].GetDatabase();
// int seed = i;
// var key = "test_client_" + i;
// tasks[i] = Task.Run(() => RunClient(key, seed, db, cancel.Token));
// }
// Console.ReadLine();
// cancel.Cancel();
// await Task.WhenAll(tasks);
// }
// finally
// {
// for (int i = 0; i < muxers.Length; i++)
// {
// try { muxers[i]?.Dispose(); } catch { }
// }
// }
//}
//private static int clients; Thread x = new Thread(state => BatchRunPings((IDatabase)state));
//private static long totalPings, pings, lastTicks; x.Name = nameof(BatchRunPings);
//private static async Task ShowState(CancellationToken cancellation) Thread y = new Thread(state => BatchRunIntegers((IDatabase)state));
//{ y.Name = nameof(BatchRunIntegers);
// while (!cancellation.IsCancellationRequested)
// {
// await Task.Delay(2000);
// var nowTicks = DateTime.UtcNow.Ticks;
// var thenTicks = Interlocked.Exchange(ref lastTicks, nowTicks);
// long pingsInInterval = Interlocked.Exchange(ref pings, 0);
// var newTotalPings = Interlocked.Add(ref totalPings, pingsInInterval);
// var deltaTicks = nowTicks - thenTicks; var watch = Stopwatch.StartNew();
x.Start(db);
y.Start(db);
x.Join();
y.Join();
watch.Stop();
Console.WriteLine($"{watch.ElapsedMilliseconds}ms");
Console.WriteLine(muxer.GetCounters().Interactive);
Console.WriteLine($"Service Counts: {SocketManager.Shared}");
}
}
// Console.WriteLine($"[{Thread.VolatileRead(ref clients)}], Pings: {newTotalPings} ({pingsInInterval}, {Rate(pingsInInterval, deltaTicks)}/s)"); static RedisKey Me([CallerMemberName]string caller = null) => caller;
// }
//}
//private static string Rate(long pingsInInterval, long deltaTicks) private static void BatchRunIntegers(IDatabase db)
//{ {
// if (deltaTicks == 0) return "n/a"; var key = Me();
// if (pingsInInterval == 0) return "0"; db.KeyDelete(key);
db.StringSet(key, 1);
Task[] tasks = new Task[InnerCount];
for (int i = 0; i < IterationCount; i++)
{
var batch = db.CreateBatch();
for (int j = 0; j < tasks.Length; j++)
{
tasks[j] = batch.StringIncrementAsync(key);
}
batch.Execute();
db.Multiplexer.WaitAll(tasks);
}
// var seconds = ((decimal)deltaTicks) / TimeSpan.TicksPerSecond; var count = (long)db.StringGet(key);
// return (pingsInInterval / seconds).ToString("0.0"); Console.WriteLine($"tally: {count}");
//} }
//private static async Task RunClient(RedisKey key, int seed, IDatabase db, CancellationToken cancellation) private static void BatchRunPings(IDatabase db)
//{ {
// Interlocked.Increment(ref clients); Task[] tasks = new Task[InnerCount];
// try for (int i = 0; i < IterationCount; i++)
// { {
// while (!cancellation.IsCancellationRequested) var batch = db.CreateBatch();
// { for (int j = 0; j < tasks.Length; j++)
// await db.PingAsync(); {
// Interlocked.Increment(ref pings); tasks[j] = batch.PingAsync();
// } }
// } batch.Execute();
// catch (Exception ex) db.Multiplexer.WaitAll(tasks);
// { }
// Console.Error.WriteLine(ex.Message); }
// }
// finally
// {
// Interlocked.Decrement(ref clients);
// }
//}
} }
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment