Commit 20fcc512 authored by Marc Gravell's avatar Marc Gravell

server: swallow lib uv disconnect; client: ping test

parent 74f81517
using System.Threading.Tasks; using System.IO;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Connections; using Microsoft.AspNetCore.Connections;
using Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Internal.Networking;
using StackExchange.Redis.Server; using StackExchange.Redis.Server;
namespace KestrelRedisServer namespace KestrelRedisServer
...@@ -8,7 +10,14 @@ public class RedisConnectionHandler : ConnectionHandler ...@@ -8,7 +10,14 @@ public class RedisConnectionHandler : ConnectionHandler
{ {
private readonly RespServer _server; private readonly RespServer _server;
public RedisConnectionHandler(RespServer server) => _server = server; public RedisConnectionHandler(RespServer server) => _server = server;
public override Task OnConnectedAsync(ConnectionContext connection) public override async Task OnConnectedAsync(ConnectionContext connection)
=> _server.RunClientAsync(connection.Transport); {
try
{
await _server.RunClientAsync(connection.Transport);
}
catch (IOException io) when (io.InnerException is UvException uv && uv.StatusCode == -4077)
{ } //swallow libuv disconnect
}
} }
} }
...@@ -8,79 +8,80 @@ static class Program ...@@ -8,79 +8,80 @@ static class Program
{ {
static async Task Main() static async Task Main()
{ {
const int ClientCount = 50; const int ClientCount = 150, ConnectionCount = 10;
CancellationTokenSource cancel = new CancellationTokenSource(); CancellationTokenSource cancel = new CancellationTokenSource();
var config = new ConfigurationOptions var config = new ConfigurationOptions
{ {
EndPoints = { new IPEndPoint(IPAddress.Loopback, 6379) } EndPoints = { new IPEndPoint(IPAddress.Loopback, 6379) }
}; };
var muxers = new ConnectionMultiplexer[ConnectionCount];
using (var muxer = await ConnectionMultiplexer.ConnectAsync(config)) try
{ {
for(int i = 0; i < muxers.Length; i++)
{
muxers[i] = await ConnectionMultiplexer.ConnectAsync(config);
}
var tasks = new Task[ClientCount + 1]; var tasks = new Task[ClientCount + 1];
tasks[0] = Task.Run(() => ShowState(cancel.Token)); tasks[0] = Task.Run(() => ShowState(cancel.Token));
for (int i = 1; i < tasks.Length; i++) for (int i = 1; i < tasks.Length; i++)
{ {
var db = muxers[i % muxers.Length].GetDatabase();
int seed = i; int seed = i;
var key = "test_client_" + i; var key = "test_client_" + i;
tasks[i] = Task.Run(() => RunClient(key, seed, muxer, cancel.Token)); tasks[i] = Task.Run(() => RunClient(key, seed, db, cancel.Token));
} }
Console.ReadLine(); Console.ReadLine();
cancel.Cancel(); cancel.Cancel();
await Task.WhenAll(tasks); await Task.WhenAll(tasks);
} }
finally
{
for (int i = 0; i < muxers.Length; i++)
{
try { muxers[i]?.Dispose(); } catch { }
}
}
} }
static int success, failure, clients; static int clients;
static long bytesChecked; static long totalPings, pings, lastTicks;
static async Task ShowState(CancellationToken cancellation) static async Task ShowState(CancellationToken cancellation)
{ {
while (!cancellation.IsCancellationRequested) while (!cancellation.IsCancellationRequested)
{ {
await Task.Delay(2000); await Task.Delay(2000);
Console.WriteLine($"[{Thread.VolatileRead(ref clients)}] Success: {Thread.VolatileRead(ref success)}, Failure: {Thread.VolatileRead(ref failure)}, Bytes: {Format(Thread.VolatileRead(ref bytesChecked))}"); var nowTicks = DateTime.UtcNow.Ticks;
var thenTicks = Interlocked.Exchange(ref lastTicks, nowTicks);
long pingsInInterval = Interlocked.Exchange(ref pings, 0);
var newTotalPings = Interlocked.Add(ref totalPings, pingsInInterval);
var deltaTicks = nowTicks - thenTicks;
Console.WriteLine($"[{Thread.VolatileRead(ref clients)}], Pings: {newTotalPings} ({pingsInInterval}, {Rate(pingsInInterval, deltaTicks)}/s)");
} }
} }
static string Format(long value)
private static string Rate(long pingsInInterval, long deltaTicks)
{ {
if (value < 1024) return value + " B"; if (deltaTicks == 0) return "n/a";
if (value < (1024 * 1024)) return (value >> 10) + " KiB"; if (pingsInInterval == 0) return "0";
if (value < (1024 * 1024 * 1024)) return (value >> 20) + " MiB";
return (value >> 30) + " GiB"; var seconds = ((decimal)deltaTicks) / TimeSpan.TicksPerSecond;
return (pingsInInterval / seconds).ToString("0.0");
} }
static async Task RunClient(RedisKey key, int seed, ConnectionMultiplexer client, CancellationToken cancellation)
static async Task RunClient(RedisKey key, int seed, IDatabase db, CancellationToken cancellation)
{ {
Interlocked.Increment(ref clients); Interlocked.Increment(ref clients);
try try
{ {
var rand = new Random(seed);
byte[] payload = new byte[65536];
while (!cancellation.IsCancellationRequested) while (!cancellation.IsCancellationRequested)
{ {
var db = client.GetDatabase(rand.Next(0, 10)); await db.PingAsync();
Interlocked.Increment(ref pings);
rand.NextBytes(payload);
int len = rand.Next(1024, payload.Length);
var memory = new ReadOnlyMemory<byte>(payload, 0, len);
var set = db.StringSetAsync(key, memory);
var get = db.StringGetAsync(key);
await set;
ReadOnlyMemory<byte> result = await get;
Interlocked.Add(ref bytesChecked, len);
if (memory.Span.SequenceEqual(result.Span))
{
Interlocked.Increment(ref success);
}
else
{
Interlocked.Increment(ref failure);
Console.Error.WriteLine("Expectation failed on " + key.ToString());
}
} }
} }
catch(Exception ex) catch(Exception ex)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment