Commit 5b5be771 authored by Nick Craver's avatar Nick Craver

Cleanup pass

parent e898ec45
......@@ -8,14 +8,15 @@
<PackageId>$(AssemblyName)</PackageId>
<Authors>Stack Exchange, Inc.; marc.gravell</Authors>
<AllowUnsafeBlocks>true</AllowUnsafeBlocks>
<CodeAnalysisRuleset>$(MSBuildThisFileDirectory)Shared.ruleset</CodeAnalysisRuleset>
<PackageReleaseNotes>https://stackexchange.github.io/StackExchange.Redis/ReleaseNotes</PackageReleaseNotes>
<PackageProjectUrl>https://github.com/StackExchange/StackExchange.Redis/</PackageProjectUrl>
<PackageLicenseUrl>https://raw.github.com/StackExchange/StackExchange.Redis/master/LICENSE</PackageLicenseUrl>
<RepositoryType>git</RepositoryType>
<RepositoryUrl>https://github.com/StackExchange/StackExchange.Redis/</RepositoryUrl>
<DebugSymbols>true</DebugSymbols>
<DebugType>embedded</DebugType>
<DefaultLanguage>en-US</DefaultLanguage>
......@@ -27,7 +28,7 @@
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Nerdbank.GitVersioning" Version="2.1.23" PrivateAssets="all" />
<PackageReference Include="SourceLink.Create.GitHub" Version="2.8.2" PrivateAssets="All" />
<PackageReference Include="SourceLink.Create.GitHub" Version="2.8.2" PrivateAssets="All" />
<DotNetCliToolReference Include="dotnet-sourcelink" Version="2.8.2" />
<DotNetCliToolReference Include="dotnet-sourcelink-git" Version="2.8.2" />
</ItemGroup>
......
<?xml version="1.0" encoding="utf-8"?>
<RuleSet Name="Rules for StackOverflow" Description="Code analysis rules for the StackOverflow solution." ToolsVersion="15.0">
<IncludeAll Action="Warning" />
<Rules AnalyzerId="Roslynator.CSharp.Analyzers" RuleNamespace="Roslynator.CSharp.Analyzers">
<Rule Id="RCS1057" Action="None" />
</Rules>
</RuleSet>
\ No newline at end of file
......@@ -10,6 +10,7 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution
Directory.build.props = Directory.build.props
global.json = global.json
NuGet.Config = NuGet.Config
Shared.ruleset = Shared.ruleset
version.json = version.json
EndProjectSection
EndProject
......
......@@ -38,7 +38,7 @@ internal sealed partial class PhysicalBridge : IDisposable
private int profileLogIndex;
private volatile bool reportNextFailure = true, reconfigureNextFailure = false;
private volatile int state = (int)State.Disconnected;
public PhysicalBridge(ServerEndPoint serverEndPoint, ConnectionType type, int timeoutMilliseconds)
......
......@@ -69,7 +69,7 @@ private static readonly Message
private int lastWriteTickCount, lastReadTickCount, lastBeatTickCount;
private int firstUnansweredWriteTickCount;
IDuplexPipe _ioPipe;
private IDuplexPipe _ioPipe;
private SocketToken socketToken;
......@@ -115,7 +115,6 @@ private enum ReadMode : byte
public void Dispose()
{
var ioPipe = _ioPipe;
_ioPipe = null;
if(ioPipe != null)
......@@ -138,11 +137,13 @@ public void Dispose()
}
OnCloseEcho();
}
private async Task AwaitedFlush(ValueTask<FlushResult> flush)
{
await flush;
Interlocked.Exchange(ref lastWriteTickCount, Environment.TickCount);
}
public Task FlushAsync()
{
var tmp = _ioPipe?.Output;
......@@ -439,7 +440,6 @@ internal void Write(RedisValue value)
break;
default:
throw new InvalidOperationException($"Unexpected {value.Type} value: '{value}'");
}
}
......@@ -453,7 +453,6 @@ internal void WriteHeader(RedisCommand command, int arguments)
WriteHeader(commandBytes, arguments);
}
internal const int REDIS_MAX_ARGS = 1024 * 1024; // there is a <= 1024*1024 max constraint inside redis itself: https://github.com/antirez/redis/blob/6c60526db91e23fb2d666fc52facc9a11780a2a3/src/networking.c#L1024
internal void WriteHeader(string command, int arguments)
......@@ -465,10 +464,10 @@ internal void WriteHeader(string command, int arguments)
var commandBytes = Multiplexer.CommandMap.GetBytes(command);
WriteHeader(commandBytes, arguments);
}
private void WriteHeader(byte[] commandBytes, int arguments)
{
// remember the time of the first write that still not followed by read
Interlocked.CompareExchange(ref firstUnansweredWriteTickCount, Environment.TickCount, 0);
......@@ -484,26 +483,28 @@ private void WriteHeader(byte[] commandBytes, int arguments)
_ioPipe.Output.Advance(offset);
}
internal const int
MaxInt32TextLen = 11, // -2,147,483,648 (not including the commas)
MaxInt64TextLen = 20; // -9,223,372,036,854,775,808 (not including the commas)
[MethodImpl(MethodImplOptions.AggressiveInlining)]
static int WriteCrlf(Span<byte> span, int offset)
private static int WriteCrlf(Span<byte> span, int offset)
{
span[offset++] = (byte)'\r';
span[offset++] = (byte)'\n';
return offset;
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
static void WriteCrlf(PipeWriter writer)
private static void WriteCrlf(PipeWriter writer)
{
var span = writer.GetSpan(2);
span[0] = (byte)'\r';
span[1] = (byte)'\n';
writer.Advance(2);
}
internal static int WriteRaw(Span<byte> span, long value, bool withLengthPrefix = false, int offset = 0)
{
if (value >= 0 && value <= 9)
......@@ -593,7 +594,7 @@ internal static int WriteRaw(Span<byte> span, long value, bool withLengthPrefix
offset += formattedLength;
}
}
return WriteCrlf(span, offset);
}
......@@ -603,7 +604,8 @@ internal void WakeWriterAndCheckForThrottle()
if (!flush.IsCompletedSuccessfully) flush.AsTask().Wait();
}
static readonly byte[] NullBulkString = Encoding.ASCII.GetBytes("$-1\r\n"), EmptyBulkString = Encoding.ASCII.GetBytes("$0\r\n\r\n");
private static readonly byte[] NullBulkString = Encoding.ASCII.GetBytes("$-1\r\n"), EmptyBulkString = Encoding.ASCII.GetBytes("$0\r\n\r\n");
private static void WriteUnified(PipeWriter writer, byte[] value)
{
if (value == null)
......@@ -615,8 +617,8 @@ private static void WriteUnified(PipeWriter writer, byte[] value)
{
WriteUnified(writer, new ReadOnlySpan<byte>(value));
}
}
private static void WriteUnified(PipeWriter writer, ReadOnlySpan<byte> value)
{
// ${len}\r\n = 3 + MaxInt32TextLen
......@@ -648,6 +650,7 @@ private static void WriteUnified(PipeWriter writer, ReadOnlySpan<byte> value)
WriteCrlf(writer);
}
}
private static int WriteUnified(Span<byte> span, byte[] value, int offset = 0)
{
span[offset++] = (byte)'$';
......@@ -661,6 +664,7 @@ private static int WriteUnified(Span<byte> span, byte[] value, int offset = 0)
}
return offset;
}
private static int WriteUnified(Span<byte> span, ReadOnlySpan<byte> value, int offset = 0)
{
offset = WriteRaw(span, value.Length, offset: offset);
......@@ -745,7 +749,7 @@ private void WriteUnified(PipeWriter writer, byte[] prefix, string value)
}
}
}
private unsafe void WriteRaw(PipeWriter writer, string value, int encodedLength)
{
const int MaxQuickEncodeSize = 512;
......@@ -787,7 +791,9 @@ private unsafe void WriteRaw(PipeWriter writer, string value, int encodedLength)
Debug.Assert(totalBytes == encodedLength);
}
}
private readonly Encoder outEncoder = Encoding.UTF8.GetEncoder();
private static void WriteUnified(PipeWriter writer, byte[] prefix, byte[] value)
{
// ${total-len}\r\n
......@@ -857,7 +863,7 @@ async ValueTask<SocketMode> ISocketCallback.ConnectedAsync(Socket socket, TextWr
{
try
{
var socketMode = SocketMode.Async;
const SocketMode socketMode = SocketMode.Async;
// disallow connection in some cases
OnDebugAbort();
......@@ -869,7 +875,6 @@ async ValueTask<SocketMode> ISocketCallback.ConnectedAsync(Socket socket, TextWr
IDuplexPipe pipe;
if (config.Ssl)
{
Multiplexer.LogLocked(log, "Configuring SSL");
var host = config.SslHost;
if (string.IsNullOrWhiteSpace(host)) host = Format.ToStringHostOnly(Bridge.ServerEndPoint.EndPoint);
......@@ -991,6 +996,7 @@ private void MatchResult(RawResult result)
partial void OnCreateEcho();
partial void OnDebugAbort();
void ISocketCallback.OnHeartbeat()
{
try
......@@ -1004,6 +1010,7 @@ void ISocketCallback.OnHeartbeat()
}
partial void OnWrapForLogging(ref IDuplexPipe pipe, string name);
private async void ReadFromPipe() // yes it is an async void; deal with it!
{
try
......@@ -1020,7 +1027,7 @@ void ISocketCallback.OnHeartbeat()
{
readResult = await input.ReadAsync();
}
if (readResult.IsCompleted && readResult.Buffer.IsEmpty)
{
break; // we're all done
......@@ -1166,7 +1173,7 @@ private RawResult ReadBulkString(in ReadOnlySequence<byte> buffer, ref BufferRea
return new RawResult(ResultType.BulkString, payload, false);
default:
throw ExceptionFactory.ConnectionFailure(Multiplexer.IncludeDetailInExceptions, ConnectionFailureType.ProtocolFailure, "Invalid bulk string terminator", Bridge.ServerEndPoint);
}
}
}
}
return RawResult.Nil;
......@@ -1212,7 +1219,8 @@ public enum ConsumeResult
Success,
NeedMoreData,
}
ref struct BufferReader
private ref struct BufferReader
{
private ReadOnlySequence<byte>.Enumerator _iterator;
private ReadOnlySpan<byte> _current;
......@@ -1225,7 +1233,8 @@ public enum ConsumeResult
public int RemainingThisSpan { get; private set; }
public bool IsEmpty => RemainingThisSpan == 0;
bool FetchNextSegment()
private bool FetchNextSegment()
{
do
{
......@@ -1242,6 +1251,7 @@ bool FetchNextSegment()
return true;
}
public BufferReader(ReadOnlySequence<byte> buffer)
{
_iterator = buffer.GetEnumerator();
......@@ -1250,8 +1260,8 @@ public BufferReader(ReadOnlySequence<byte> buffer)
FetchNextSegment();
}
static readonly byte[] CRLF = { (byte)'\r', (byte)'\n' };
private static readonly byte[] CRLF = { (byte)'\r', (byte)'\n' };
/// <summary>
/// Note that in results other than success, no guarantees are made about final state; if you care: snapshot
......@@ -1278,7 +1288,6 @@ public ConsumeResult TryConsumeCRLF()
}
}
public bool TryConsume(int count)
{
if (count < 0) throw new ArgumentOutOfRangeException(nameof(count));
......@@ -1295,27 +1304,29 @@ public bool TryConsume(int count)
if (count == available) FetchNextSegment(); // burned all of it; fetch next
return true;
}
// consume all of this span
TotalConsumed += available;
count -= available;
} while (FetchNextSegment());
return false;
}
public void Consume(int count)
public void Consume(int count)
{
if (!TryConsume(count)) throw new EndOfStreamException();
}
internal static int FindNextCrLf(BufferReader reader) // very deliberately not ref; want snapshot
{
// is it in the current span? (we need to handle the offsets differently if so)
int totalSkipped = 0;
bool haveTrailingCR = false;
do
{
if (reader.RemainingThisSpan == 0) continue;
var span = reader.SlicedSpan;
if (haveTrailingCR)
{
......
......@@ -54,6 +54,7 @@ internal partial interface ISocketCallback
internal struct SocketToken
{
internal readonly Socket Socket;
public SocketToken(Socket socket)
{
Socket = socket;
......@@ -130,10 +131,12 @@ internal static SocketManager Shared
shared = new SocketManager("DefaultSocketManager", false, DEFAULT_MIN_THREADS, DEFAULT_MAX_THREADS * 2);
if (Interlocked.CompareExchange(ref _shared, shared, null) == null)
shared = null;
} finally { shared?.Dispose(); }
}
finally { shared?.Dispose(); }
return Interlocked.CompareExchange(ref _shared, null, null);
}
}
private static SocketManager _shared;
/// <summary>
......@@ -142,21 +145,22 @@ internal static SocketManager Shared
/// <param name="name">The name for this <see cref="SocketManager"/>.</param>
/// <param name="useHighPrioritySocketThreads">Whether this <see cref="SocketManager"/> should use high priority sockets.</param>
public SocketManager(string name, bool useHighPrioritySocketThreads)
: this(name, useHighPrioritySocketThreads, DEFAULT_MIN_THREADS, DEFAULT_MAX_THREADS) {}
: this(name, useHighPrioritySocketThreads, DEFAULT_MIN_THREADS, DEFAULT_MAX_THREADS) { }
private const int DEFAULT_MIN_THREADS = 1, DEFAULT_MAX_THREADS = 5;
const int DEFAULT_MIN_THREADS = 1, DEFAULT_MAX_THREADS = 5;
private SocketManager(string name, bool useHighPrioritySocketThreads, int minThreads, int maxThreads)
{
if (string.IsNullOrWhiteSpace(name)) name = GetType().Name;
Name = name;
const long Receive_PauseWriterThreshold = 4L * 1024 * 1024 * 1024; // let's give it up to 4GiB of buffer for now
const long Receive_ResumeWriterThreshold = 3L * 1024 * 1024 * 1024;
var defaultPipeOptions = PipeOptions.Default;
_scheduler = new DedicatedThreadPoolPipeScheduler(name,
minWorkers: minThreads, maxWorkers: maxThreads,
priority: useHighPrioritySocketThreads ? ThreadPriority.AboveNormal :ThreadPriority.Normal);
priority: useHighPrioritySocketThreads ? ThreadPriority.AboveNormal : ThreadPriority.Normal);
SendPipeOptions = new PipeOptions(
defaultPipeOptions.Pool, _scheduler, _scheduler,
pauseWriterThreshold: defaultPipeOptions.PauseWriterThreshold,
......@@ -170,7 +174,8 @@ private SocketManager(string name, bool useHighPrioritySocketThreads, int minThr
defaultPipeOptions.MinimumSegmentSize,
useSynchronizationContext: false);
}
DedicatedThreadPoolPipeScheduler _scheduler;
private DedicatedThreadPoolPipeScheduler _scheduler;
internal readonly PipeOptions SendPipeOptions, ReceivePipeOptions;
private enum CallbackOperation
......@@ -183,6 +188,7 @@ private enum CallbackOperation
/// Releases all resources associated with this instance
/// </summary>
public void Dispose() => Dispose(true);
private void Dispose(bool disposing)
{
// note: the scheduler *can't* be collected by itself - there will
......@@ -195,12 +201,13 @@ private void Dispose(bool disposing)
GC.SuppressFinalize(this);
OnDispose();
}
}
/// <summary>
/// Releases *appropriate* resources associated with this instance
/// </summary>
~SocketManager() => Dispose(false);
internal SocketToken BeginConnect(EndPoint endpoint, ISocketCallback callback, ConnectionMultiplexer multiplexer, TextWriter log)
{
void RunWithCompletionType(Func<AsyncCallback, IAsyncResult> beginAsync, AsyncCallback asyncCallback)
......@@ -221,7 +228,6 @@ void proxyCallback(IAsyncResult ar)
}
}
var addressFamily = endpoint.AddressFamily == AddressFamily.Unspecified ? AddressFamily.InterNetwork : endpoint.AddressFamily;
var protocolType = addressFamily == AddressFamily.Unix ? ProtocolType.Unspecified : ProtocolType.Tcp;
var socket = new Socket(addressFamily, SocketType.Stream, protocolType);
......@@ -237,7 +243,8 @@ void proxyCallback(IAsyncResult ar)
{
RunWithCompletionType(
cb => socket.BeginConnect(dnsEndpoint.Host, dnsEndpoint.Port, cb, tuple),
ar => {
ar =>
{
multiplexer.LogLocked(log, "EndConnect: {0}", formattedEndpoint);
EndConnectImpl(ar, multiplexer, log, tuple);
multiplexer.LogLocked(log, "Connect complete: {0}", formattedEndpoint);
......@@ -247,7 +254,8 @@ void proxyCallback(IAsyncResult ar)
{
RunWithCompletionType(
cb => socket.BeginConnect(endpoint, cb, tuple),
ar => {
ar =>
{
multiplexer.LogLocked(log, "EndConnect: {0}", formattedEndpoint);
EndConnectImpl(ar, multiplexer, log, tuple);
multiplexer.LogLocked(log, "Connect complete: {0}", formattedEndpoint);
......@@ -313,7 +321,7 @@ private async void EndConnectImpl(IAsyncResult ar, ConnectionMultiplexer multipl
}
}
}
catch(Exception outer)
catch (Exception outer)
{
ConnectionMultiplexer.TraceWithoutContext(outer.Message);
if (callback != null)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment