Commit a52077c2 authored by Marc Gravell's avatar Marc Gravell

make roslynator happy; update sockets-unofficial

parent e75d0746
......@@ -6,6 +6,7 @@
<GenerateDocumentationFile>true</GenerateDocumentationFile>
<AssemblyOriginatorKeyFile>../StackExchange.Redis.snk</AssemblyOriginatorKeyFile>
<PackageId>$(AssemblyName)</PackageId>
<Features>strict</Features>
<Authors>Stack Exchange, Inc.; marc.gravell</Authors>
<AllowUnsafeBlocks>true</AllowUnsafeBlocks>
<CodeAnalysisRuleset>$(MSBuildThisFileDirectory)Shared.ruleset</CodeAnalysisRuleset>
......
......@@ -21,7 +21,9 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "RedisConfigs", "RedisConfig
RedisConfigs\cli-secure.cmd = RedisConfigs\cli-secure.cmd
RedisConfigs\cli-slave.cmd = RedisConfigs\cli-slave.cmd
RedisConfigs\start-all.cmd = RedisConfigs\start-all.cmd
RedisConfigs\start-all.sh = RedisConfigs\start-all.sh
RedisConfigs\start-basic.cmd = RedisConfigs\start-basic.cmd
RedisConfigs\start-basic.sh = RedisConfigs\start-basic.sh
RedisConfigs\start-cluster.cmd = RedisConfigs\start-cluster.cmd
RedisConfigs\start-sentinel.cmd = RedisConfigs\start-sentinel.cmd
EndProjectSection
......
......@@ -26,7 +26,7 @@
</ItemGroup>
<ItemGroup>
<PackageReference Include="Pipelines.Sockets.Unofficial" Version="0.2.1-alpha.72" />
<PackageReference Include="Pipelines.Sockets.Unofficial" Version="0.2.1-alpha.73" />
<PackageReference Include="System.Threading.Channels" Version="4.5.0" />
</ItemGroup>
</Project>
\ No newline at end of file
......@@ -125,8 +125,7 @@ internal static Exception NoConnectionAvailable(bool includeDetail, bool include
internal static Exception PopulateInnerExceptions(ReadOnlySpan<ServerEndPoint> serverSnapshot)
{
var innerExceptions = new List<Exception>();
if (serverSnapshot != null)
{
if (serverSnapshot.Length > 0 && serverSnapshot[0].Multiplexer.LastException != null)
{
innerExceptions.Add(serverSnapshot[0].Multiplexer.LastException);
......@@ -140,7 +139,7 @@ internal static Exception PopulateInnerExceptions(ReadOnlySpan<ServerEndPoint> s
innerExceptions.Add(lastException);
}
}
}
if (innerExceptions.Count == 1)
{
return innerExceptions[0];
......
......@@ -280,6 +280,7 @@ internal void ResetNonConnected()
internal void OnConnectionFailed(PhysicalConnection connection, ConnectionFailureType failureType, Exception innerException)
{
Trace($"OnConnectionFailed: {connection}");
AbandonPendingBacklog(innerException);
if (reportNextFailure)
{
......@@ -292,7 +293,7 @@ internal void OnConnectionFailed(PhysicalConnection connection, ConnectionFailur
internal void OnDisconnected(ConnectionFailureType failureType, PhysicalConnection connection, out bool isCurrent, out State oldState)
{
Trace("OnDisconnected");
Trace($"OnDisconnected: {failureType}");
oldState = default(State); // only defined when isCurrent = true
if (isCurrent = (physical == connection))
......
......@@ -277,7 +277,6 @@ public Task FlushAsync()
public void RecordConnectionFailed(ConnectionFailureType failureType, Exception innerException = null, [CallerMemberName] string origin = null, bool isInitialConnect = false)
{
Exception outerException = innerException;
IdentifyFailureType(innerException, ref failureType);
var bridge = BridgeCouldBeNull;
......@@ -831,8 +830,7 @@ private static int AppendToSpanCommand(Span<byte> span, CommandBytes value, int
offset = WriteRaw(span, len, offset: offset);
value.CopyTo(span.Slice(offset, len));
offset += value.Length;
offset = WriteCrlf(span, offset);
return offset;
return WriteCrlf(span, offset);
}
private static int AppendToSpanSpan(Span<byte> span, ReadOnlySpan<byte> value, int offset = 0)
......@@ -840,8 +838,7 @@ private static int AppendToSpanSpan(Span<byte> span, ReadOnlySpan<byte> value, i
offset = WriteRaw(span, value.Length, offset: offset);
value.CopyTo(span.Slice(offset, value.Length));
offset += value.Length;
offset = WriteCrlf(span, offset);
return offset;
return WriteCrlf(span, offset);
}
internal void WriteSha1AsHex(byte[] value)
......@@ -1342,7 +1339,7 @@ private int ProcessBuffer(ref ReadOnlySequence<byte> buffer)
private static RawResult ReadArray(in ReadOnlySequence<byte> buffer, ref BufferReader reader, bool includeDetailInExceptions, ServerEndPoint server)
{
var itemCount = ReadLineTerminatedString(ResultType.Integer, in buffer, ref reader);
var itemCount = ReadLineTerminatedString(ResultType.Integer, ref reader);
if (itemCount.HasValue)
{
if (!itemCount.TryGetInt64(out long i64)) throw ExceptionFactory.ConnectionFailure(includeDetailInExceptions, ConnectionFailureType.ProtocolFailure, "Invalid array length", server);
......@@ -1374,9 +1371,9 @@ private static RawResult ReadArray(in ReadOnlySequence<byte> buffer, ref BufferR
return RawResult.Nil;
}
private static RawResult ReadBulkString(in ReadOnlySequence<byte> buffer, ref BufferReader reader, bool includeDetailInExceptions, ServerEndPoint server)
private static RawResult ReadBulkString(ref BufferReader reader, bool includeDetailInExceptions, ServerEndPoint server)
{
var prefix = ReadLineTerminatedString(ResultType.Integer, in buffer, ref reader);
var prefix = ReadLineTerminatedString(ResultType.Integer, ref reader);
if (prefix.HasValue)
{
if (!prefix.TryGetInt64(out long i64)) throw ExceptionFactory.ConnectionFailure(includeDetailInExceptions, ConnectionFailureType.ProtocolFailure, "Invalid bulk string length", server);
......@@ -1402,7 +1399,7 @@ private static RawResult ReadBulkString(in ReadOnlySequence<byte> buffer, ref Bu
return RawResult.Nil;
}
private static RawResult ReadLineTerminatedString(ResultType type, in ReadOnlySequence<byte> buffer, ref BufferReader reader)
private static RawResult ReadLineTerminatedString(ResultType type, ref BufferReader reader)
{
int crlfOffsetFromCurrent = BufferReader.FindNextCrLf(reader);
if (crlfOffsetFromCurrent < 0) return RawResult.Nil;
......@@ -1424,21 +1421,21 @@ private static RawResult ReadLineTerminatedString(ResultType type, in ReadOnlySe
{
case '+': // simple string
reader.Consume(1);
return ReadLineTerminatedString(ResultType.SimpleString, in buffer, ref reader);
return ReadLineTerminatedString(ResultType.SimpleString, ref reader);
case '-': // error
reader.Consume(1);
return ReadLineTerminatedString(ResultType.Error, in buffer, ref reader);
return ReadLineTerminatedString(ResultType.Error, ref reader);
case ':': // integer
reader.Consume(1);
return ReadLineTerminatedString(ResultType.Integer, in buffer, ref reader);
return ReadLineTerminatedString(ResultType.Integer, ref reader);
case '$': // bulk string
reader.Consume(1);
return ReadBulkString(in buffer, ref reader, includeDetilInExceptions, server);
return ReadBulkString(ref reader, includeDetilInExceptions, server);
case '*': // array
reader.Consume(1);
return ReadArray(in buffer, ref reader, includeDetilInExceptions, server);
default:
if (allowInlineProtocol) return ParseInlineProtocol(ReadLineTerminatedString(ResultType.SimpleString, in buffer, ref reader));
if (allowInlineProtocol) return ParseInlineProtocol(ReadLineTerminatedString(ResultType.SimpleString, ref reader));
throw new InvalidOperationException("Unexpected response prefix: " + (char)prefix);
}
}
......
......@@ -1731,18 +1731,13 @@ protected NameValueEntry[] ParseStreamEntryValues(RawResult result)
// 3) "temperature"
// 4) "18.2"
if (result.Type != ResultType.MultiBulk)
if (result.Type != ResultType.MultiBulk || result.IsNull)
{
return null;
}
var arr = result.GetItems();
if (arr == null)
{
return null;
}
// Calculate how many name/value pairs are in the stream entry.
int count = arr.Length / 2;
......
using System;
using System.Net;
using System.Threading;
using System.Threading.Tasks;
using StackExchange.Redis;
internal static class Program
namespace TestConsole
{
internal static class Program
{
private static void Main()
{
using (var muxer = ConnectionMultiplexer.Connect("localhost:6379", Console.Out))
......@@ -13,91 +12,92 @@ private static void Main()
muxer.GetDatabase().Ping();
}
}
private static async Task Main2()
{
const int ClientCount = 150, ConnectionCount = 10;
CancellationTokenSource cancel = new CancellationTokenSource();
//private static async Task Main2()
//{
// const int ClientCount = 150, ConnectionCount = 10;
// CancellationTokenSource cancel = new CancellationTokenSource();
var config = new ConfigurationOptions
{
EndPoints = { new IPEndPoint(IPAddress.Loopback, 6379) }
};
var muxers = new ConnectionMultiplexer[ConnectionCount];
try
{
for(int i = 0; i < muxers.Length; i++)
{
muxers[i] = await ConnectionMultiplexer.ConnectAsync(config);
}
var tasks = new Task[ClientCount + 1];
tasks[0] = Task.Run(() => ShowState(cancel.Token));
// var config = new ConfigurationOptions
// {
// EndPoints = { new IPEndPoint(IPAddress.Loopback, 6379) }
// };
// var muxers = new ConnectionMultiplexer[ConnectionCount];
// try
// {
// for (int i = 0; i < muxers.Length; i++)
// {
// muxers[i] = await ConnectionMultiplexer.ConnectAsync(config);
// }
// var tasks = new Task[ClientCount + 1];
// tasks[0] = Task.Run(() => ShowState(cancel.Token));
for (int i = 1; i < tasks.Length; i++)
{
var db = muxers[i % muxers.Length].GetDatabase();
int seed = i;
var key = "test_client_" + i;
tasks[i] = Task.Run(() => RunClient(key, seed, db, cancel.Token));
}
// for (int i = 1; i < tasks.Length; i++)
// {
// var db = muxers[i % muxers.Length].GetDatabase();
// int seed = i;
// var key = "test_client_" + i;
// tasks[i] = Task.Run(() => RunClient(key, seed, db, cancel.Token));
// }
Console.ReadLine();
cancel.Cancel();
await Task.WhenAll(tasks);
}
finally
{
for (int i = 0; i < muxers.Length; i++)
{
try { muxers[i]?.Dispose(); } catch { }
}
}
}
// Console.ReadLine();
// cancel.Cancel();
// await Task.WhenAll(tasks);
// }
// finally
// {
// for (int i = 0; i < muxers.Length; i++)
// {
// try { muxers[i]?.Dispose(); } catch { }
// }
// }
//}
private static int clients;
private static long totalPings, pings, lastTicks;
private static async Task ShowState(CancellationToken cancellation)
{
while (!cancellation.IsCancellationRequested)
{
await Task.Delay(2000);
var nowTicks = DateTime.UtcNow.Ticks;
var thenTicks = Interlocked.Exchange(ref lastTicks, nowTicks);
long pingsInInterval = Interlocked.Exchange(ref pings, 0);
var newTotalPings = Interlocked.Add(ref totalPings, pingsInInterval);
//private static int clients;
//private static long totalPings, pings, lastTicks;
//private static async Task ShowState(CancellationToken cancellation)
//{
// while (!cancellation.IsCancellationRequested)
// {
// await Task.Delay(2000);
// var nowTicks = DateTime.UtcNow.Ticks;
// var thenTicks = Interlocked.Exchange(ref lastTicks, nowTicks);
// long pingsInInterval = Interlocked.Exchange(ref pings, 0);
// var newTotalPings = Interlocked.Add(ref totalPings, pingsInInterval);
var deltaTicks = nowTicks - thenTicks;
// var deltaTicks = nowTicks - thenTicks;
Console.WriteLine($"[{Thread.VolatileRead(ref clients)}], Pings: {newTotalPings} ({pingsInInterval}, {Rate(pingsInInterval, deltaTicks)}/s)");
}
}
// Console.WriteLine($"[{Thread.VolatileRead(ref clients)}], Pings: {newTotalPings} ({pingsInInterval}, {Rate(pingsInInterval, deltaTicks)}/s)");
// }
//}
private static string Rate(long pingsInInterval, long deltaTicks)
{
if (deltaTicks == 0) return "n/a";
if (pingsInInterval == 0) return "0";
//private static string Rate(long pingsInInterval, long deltaTicks)
//{
// if (deltaTicks == 0) return "n/a";
// if (pingsInInterval == 0) return "0";
var seconds = ((decimal)deltaTicks) / TimeSpan.TicksPerSecond;
return (pingsInInterval / seconds).ToString("0.0");
}
// var seconds = ((decimal)deltaTicks) / TimeSpan.TicksPerSecond;
// return (pingsInInterval / seconds).ToString("0.0");
//}
private static async Task RunClient(RedisKey key, int seed, IDatabase db, CancellationToken cancellation)
{
Interlocked.Increment(ref clients);
try
{
while (!cancellation.IsCancellationRequested)
{
await db.PingAsync();
Interlocked.Increment(ref pings);
}
}
catch(Exception ex)
{
Console.Error.WriteLine(ex.Message);
}
finally
{
Interlocked.Decrement(ref clients);
}
//private static async Task RunClient(RedisKey key, int seed, IDatabase db, CancellationToken cancellation)
//{
// Interlocked.Increment(ref clients);
// try
// {
// while (!cancellation.IsCancellationRequested)
// {
// await db.PingAsync();
// Interlocked.Increment(ref pings);
// }
// }
// catch (Exception ex)
// {
// Console.Error.WriteLine(ex.Message);
// }
// finally
// {
// Interlocked.Decrement(ref clients);
// }
//}
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment