Commit daec342d authored by Nick Craver's avatar Nick Craver

Cleanup: PhysicalConnection

parent 7b6c2b78
......@@ -16,10 +16,8 @@
namespace StackExchange.Redis
{
internal sealed partial class PhysicalConnection : IDisposable, ISocketCallback
{
internal readonly byte[] ChannelPrefix;
private const int DefaultRedisDatabaseCount = 16;
......@@ -27,7 +25,7 @@ internal sealed partial class PhysicalConnection : IDisposable, ISocketCallback
private static readonly byte[] Crlf = Encoding.ASCII.GetBytes("\r\n");
#if NETSTANDARD1_5
readonly Action<Task<int>> endRead;
private readonly Action<Task<int>> endRead;
private static Action<Task<int>> EndReadFactory(PhysicalConnection physical)
{
return result =>
......@@ -48,7 +46,7 @@ private static Action<Task<int>> EndReadFactory(PhysicalConnection physical)
};
}
#else
static readonly AsyncCallback endRead = result =>
private static readonly AsyncCallback endRead = result =>
{
PhysicalConnection physical;
if (result.CompletedSynchronously || (physical = result.AsyncState as PhysicalConnection) == null) return;
......@@ -66,7 +64,7 @@ private static Action<Task<int>> EndReadFactory(PhysicalConnection physical)
private static readonly byte[] message = Encoding.UTF8.GetBytes("message"), pmessage = Encoding.UTF8.GetBytes("pmessage");
static readonly Message[] ReusableChangeDatabaseCommands = Enumerable.Range(0, DefaultRedisDatabaseCount).Select(
private static readonly Message[] ReusableChangeDatabaseCommands = Enumerable.Range(0, DefaultRedisDatabaseCount).Select(
i => Message.Create(i, CommandFlags.FireAndForget, RedisCommand.SELECT)).ToArray();
private static readonly Message
......@@ -80,20 +78,20 @@ private static readonly Message
// things sent to this physical, but not yet received
private readonly Queue<Message> outstanding = new Queue<Message>();
readonly string physicalName;
private readonly string physicalName;
volatile int currentDatabase = 0;
private volatile int currentDatabase = 0;
ReadMode currentReadMode = ReadMode.NotSpecified;
private ReadMode currentReadMode = ReadMode.NotSpecified;
int failureReported;
private int failureReported;
byte[] ioBuffer = new byte[512];
private byte[] ioBuffer = new byte[512];
int ioBufferBytes = 0;
private int ioBufferBytes = 0;
int lastWriteTickCount, lastReadTickCount, lastBeatTickCount;
int firstUnansweredWriteTickCount;
private int lastWriteTickCount, lastReadTickCount, lastBeatTickCount;
private int firstUnansweredWriteTickCount;
private Stream netStream, outStream;
......@@ -103,13 +101,13 @@ public PhysicalConnection(PhysicalBridge bridge)
{
lastWriteTickCount = lastReadTickCount = Environment.TickCount;
lastBeatTickCount = 0;
this.connectionType = bridge.ConnectionType;
this.Multiplexer = bridge.Multiplexer;
this.ChannelPrefix = Multiplexer.RawConfig.ChannelPrefix;
if (this.ChannelPrefix != null && this.ChannelPrefix.Length == 0) this.ChannelPrefix = null; // null tests are easier than null+empty
connectionType = bridge.ConnectionType;
Multiplexer = bridge.Multiplexer;
ChannelPrefix = Multiplexer.RawConfig.ChannelPrefix;
if (ChannelPrefix?.Length == 0) ChannelPrefix = null; // null tests are easier than null+empty
var endpoint = bridge.ServerEndPoint.EndPoint;
physicalName = connectionType + "#" + Interlocked.Increment(ref totalCount) + "@" + Format.ToString(endpoint);
this.Bridge = bridge;
Bridge = bridge;
#if NETSTANDARD1_5
endRead = EndReadFactory(this);
#endif
......@@ -119,10 +117,10 @@ public PhysicalConnection(PhysicalBridge bridge)
public void BeginConnect(TextWriter log)
{
VolatileWrapper.Write(ref firstUnansweredWriteTickCount, 0);
var endpoint = this.Bridge.ServerEndPoint.EndPoint;
var endpoint = Bridge.ServerEndPoint.EndPoint;
Multiplexer.Trace("Connecting...", physicalName);
this.socketToken = Multiplexer.SocketManager.BeginConnect(endpoint, this, Multiplexer, log);
socketToken = Multiplexer.SocketManager.BeginConnect(endpoint, this, Multiplexer, log);
}
private enum ReadMode : byte
......@@ -180,11 +178,13 @@ public void Flush()
Interlocked.Exchange(ref lastWriteTickCount, Environment.TickCount);
}
}
public void RecordConnectionFailed(ConnectionFailureType failureType, Exception innerException = null, [CallerMemberName] string origin = null)
{
SocketManager.ManagerState mgrState = SocketManager.ManagerState.CheckForStaleConnections;
var mgrState = SocketManager.ManagerState.CheckForStaleConnections;
RecordConnectionFailed(failureType, ref mgrState, innerException, origin);
}
public void RecordConnectionFailed(ConnectionFailureType failureType, ref SocketManager.ManagerState managerState, Exception innerException = null, [CallerMemberName] string origin = null)
{
IdentifyFailureType(innerException, ref failureType);
......@@ -194,12 +194,10 @@ public void RecordConnectionFailed(ConnectionFailureType failureType, ref Socket
// stop anything new coming in...
Bridge.Trace("Failed: " + failureType);
bool isCurrent;
PhysicalBridge.State oldState;
int @in = -1, ar = -1;
managerState = SocketManager.ManagerState.RecordConnectionFailed_OnDisconnected;
Bridge.OnDisconnected(failureType, this, out isCurrent, out oldState);
if(oldState == PhysicalBridge.State.ConnectedEstablished)
Bridge.OnDisconnected(failureType, this, out bool isCurrent, out PhysicalBridge.State oldState);
if (oldState == PhysicalBridge.State.ConnectedEstablished)
{
try
{
......@@ -221,29 +219,29 @@ public void RecordConnectionFailed(ConnectionFailureType failureType, ref Socket
Tuple.Create("FailureType", failureType.ToString()),
Tuple.Create("EndPoint", Format.ToString(Bridge.ServerEndPoint.EndPoint))
};
Action<string, string, string> add = (lk, sk, v) =>
void add(string lk, string sk, string v)
{
data.Add(Tuple.Create(lk, v));
exMessage.Append(", " + sk + ": " + v);
};
exMessage.Append(", ").Append(sk).Append(": ").Append(v);
}
add("Origin", "origin", origin);
add("Input-Buffer", "input-buffer", ioBufferBytes.ToString());
add("Outstanding-Responses", "outstanding", GetSentAwaitingResponseCount().ToString());
add("Last-Read", "last-read", unchecked(now - lastRead) / 1000 + "s ago");
add("Last-Write", "last-write", unchecked(now - lastWrite) / 1000 + "s ago");
add("Unanswered-Write", "unanswered-write", unchecked(now - unansweredRead) / 1000 + "s ago");
add("Last-Read", "last-read", (unchecked(now - lastRead) / 1000) + "s ago");
add("Last-Write", "last-write", (unchecked(now - lastWrite) / 1000) + "s ago");
add("Unanswered-Write", "unanswered-write", (unchecked(now - unansweredRead) / 1000) + "s ago");
add("Keep-Alive", "keep-alive", Bridge.ServerEndPoint.WriteEverySeconds + "s");
add("Pending", "pending", Bridge.GetPendingCount().ToString());
add("Previous-Physical-State", "state", oldState.ToString());
if(@in >= 0)
if (@in >= 0)
{
add("Inbound-Bytes", "in", @in.ToString());
add("Active-Readers", "ar", ar.ToString());
}
add("Last-Heartbeat", "last-heartbeat", (lastBeat == 0 ? "never" : (unchecked(now - lastBeat)/1000 + "s ago"))+ (Bridge.IsBeating ? " (mid-beat)" : "") );
add("Last-Heartbeat", "last-heartbeat", (lastBeat == 0 ? "never" : ((unchecked(now - lastBeat) / 1000) + "s ago")) + (Bridge.IsBeating ? " (mid-beat)" : ""));
add("Last-Multiplexer-Heartbeat", "last-mbeat", Multiplexer.LastHeartbeatSecondsAgo + "s ago");
add("Last-Global-Heartbeat", "global", ConnectionMultiplexer.LastGlobalHeartbeatSecondsAgo + "s ago");
#if FEATURE_SOCKET_MODE_POLL
......@@ -360,7 +358,7 @@ internal Message GetSelectDatabaseCommand(int targetDatabase, Message message)
return null;
}
if(message.Command == RedisCommand.SELECT)
if (message.Command == RedisCommand.SELECT)
{
// this could come from an EVAL/EVALSHA inside a transaction, for example; we'll accept it
Bridge.Trace("Switching database: " + targetDatabase);
......@@ -383,6 +381,7 @@ internal Message GetSelectDatabaseCommand(int targetDatabase, Message message)
}
return null;
}
internal static Message GetSelectDatabaseCommand(int targetDatabase)
{
return targetDatabase < DefaultRedisDatabaseCount
......@@ -474,11 +473,12 @@ internal void WriteHeader(RedisCommand command, int arguments)
WriteRaw(outStream, arguments + 1);
WriteUnified(outStream, commandBytes);
}
internal const int REDIS_MAX_ARGS = 1024 * 1024; // there is a <= 1024*1024 max constraint inside redis itself: https://github.com/antirez/redis/blob/6c60526db91e23fb2d666fc52facc9a11780a2a3/src/networking.c#L1024
internal void WriteHeader(string command, int arguments)
{
if(arguments >= REDIS_MAX_ARGS) // using >= here because we will be adding 1 for the command itself (which is an arg for the purposes of the multi-bulk protocol)
if (arguments >= REDIS_MAX_ARGS) // using >= here because we will be adding 1 for the command itself (which is an arg for the purposes of the multi-bulk protocol)
{
throw ExceptionFactory.TooManyArgs(Multiplexer.IncludeDetailInExceptions, command, null, Bridge.ServerEndPoint, arguments + 1);
}
......@@ -496,7 +496,7 @@ internal void WriteHeader(string command, int arguments)
WriteUnified(outStream, commandBytes);
}
static void WriteRaw(Stream stream, long value, bool withLengthPrefix = false)
private static void WriteRaw(Stream stream, long value, bool withLengthPrefix = false)
{
if (value >= 0 && value <= 9)
{
......@@ -514,8 +514,8 @@ static void WriteRaw(Stream stream, long value, bool withLengthPrefix = false)
stream.WriteByte((byte)'2');
stream.Write(Crlf, 0, 2);
}
stream.WriteByte((byte)((int)'0' + (int)value / 10));
stream.WriteByte((byte)((int)'0' + (int)value % 10));
stream.WriteByte((byte)((int)'0' + ((int)value / 10)));
stream.WriteByte((byte)((int)'0' + ((int)value % 10)));
}
else if (value >= 100 && value < 1000)
{
......@@ -551,8 +551,8 @@ static void WriteRaw(Stream stream, long value, bool withLengthPrefix = false)
}
value = -value;
stream.WriteByte((byte)'-');
stream.WriteByte((byte)((int)'0' + (int)value / 10));
stream.WriteByte((byte)((int)'0' + (int)value % 10));
stream.WriteByte((byte)((int)'0' + ((int)value / 10)));
stream.WriteByte((byte)((int)'0' + ((int)value % 10)));
}
else
{
......@@ -566,7 +566,7 @@ static void WriteRaw(Stream stream, long value, bool withLengthPrefix = false)
stream.Write(Crlf, 0, 2);
}
static void WriteUnified(Stream stream, byte[] value)
private static void WriteUnified(Stream stream, byte[] value)
{
stream.WriteByte((byte)'$');
if (value == null)
......@@ -588,10 +588,11 @@ internal void WriteAsHex(byte[] value)
if (value == null)
{
WriteRaw(stream, -1);
} else
}
else
{
WriteRaw(stream, value.Length * 2);
for(int i = 0; i < value.Length; i++)
for (int i = 0; i < value.Length; i++)
{
stream.WriteByte(ToHexNibble(value[i] >> 4));
stream.WriteByte(ToHexNibble(value[i] & 15));
......@@ -599,12 +600,13 @@ internal void WriteAsHex(byte[] value)
stream.Write(Crlf, 0, 2);
}
}
internal static byte ToHexNibble(int value)
{
return value < 10 ? (byte)('0' + value) : (byte)('a' - 10 + value);
}
void WriteUnified(Stream stream, byte[] prefix, string value)
private void WriteUnified(Stream stream, byte[] prefix, string value)
{
stream.WriteByte((byte)'$');
if (value == null)
......@@ -628,9 +630,9 @@ void WriteUnified(Stream stream, byte[] prefix, string value)
stream.Write(Crlf, 0, 2);
}
}
}
unsafe void WriteRaw(Stream stream, string value, int encodedLength)
private unsafe void WriteRaw(Stream stream, string value, int encodedLength)
{
if (encodedLength <= ScratchSize)
{
......@@ -669,11 +671,12 @@ unsafe void WriteRaw(Stream stream, string value, int encodedLength)
#endif
}
}
const int ScratchSize = 512;
static readonly int Scratch_CharsPerBlock = ScratchSize / Encoding.UTF8.GetMaxByteCount(1);
private const int ScratchSize = 512;
private static readonly int Scratch_CharsPerBlock = ScratchSize / Encoding.UTF8.GetMaxByteCount(1);
private readonly byte[] outScratch = new byte[ScratchSize];
private readonly Encoder outEncoder = Encoding.UTF8.GetEncoder();
static void WriteUnified(Stream stream, byte[] prefix, byte[] value)
private static void WriteUnified(Stream stream, byte[] prefix, byte[] value)
{
stream.WriteByte((byte)'$');
if (value == null)
......@@ -695,7 +698,7 @@ static void WriteUnified(Stream stream, byte[] prefix, byte[] value)
}
}
static void WriteUnified(Stream stream, long value)
private static void WriteUnified(Stream stream, long value)
{
// note from specification: A client sends to the Redis server a RESP Array consisting of just Bulk Strings.
// (i.e. we can't just send ":123\r\n", we need to send "$3\r\n123\r\n"
......@@ -703,7 +706,7 @@ static void WriteUnified(Stream stream, long value)
WriteRaw(stream, value, withLengthPrefix: true);
}
void BeginReading()
private void BeginReading()
{
bool keepReading;
try
......@@ -747,15 +750,16 @@ void BeginReading()
Multiplexer.Trace("Could not connect: " + ex.Message, physicalName);
}
}
int haveReader;
private int haveReader;
internal int GetAvailableInboundBytes(out int activeReaders)
{
activeReaders = Interlocked.CompareExchange(ref haveReader, 0, 0);
return this.socketToken.Available;
return socketToken.Available;
}
static LocalCertificateSelectionCallback GetAmbientCertificateCallback()
private static LocalCertificateSelectionCallback GetAmbientCertificateCallback()
{
try
{
......@@ -773,10 +777,12 @@ static LocalCertificateSelectionCallback GetAmbientCertificateCallback()
{
return delegate { return new X509Certificate2(pfxPath, pfxPassword ?? "", flags ?? X509KeyStorageFlags.DefaultKeySet); };
}
} catch
}
catch
{ }
return null;
}
SocketMode ISocketCallback.Connected(Stream stream, TextWriter log)
{
try
......@@ -790,7 +796,7 @@ SocketMode ISocketCallback.Connected(Stream stream, TextWriter log)
// [network]<==[ssl]<==[logging]<==[buffered]
var config = Multiplexer.RawConfig;
if(config.Ssl)
if (config.Ssl)
{
Multiplexer.LogLocked(log, "Configuring SSL");
var host = config.SslHost;
......@@ -820,8 +826,8 @@ SocketMode ISocketCallback.Connected(Stream stream, TextWriter log)
OnWrapForLogging(ref stream, physicalName);
int bufferSize = config.WriteBuffer;
this.netStream = stream;
this.outStream = bufferSize <= 0 ? stream : new BufferedStream(stream, bufferSize);
netStream = stream;
outStream = bufferSize <= 0 ? stream : new BufferedStream(stream, bufferSize);
Multiplexer.LogLocked(log, "Connected {0}", Bridge);
Bridge.OnConnected(this, log);
......@@ -865,7 +871,7 @@ private bool EndReading(IAsyncResult result)
}
}
#endif
int EnsureSpaceAndComputeBytesToRead()
private int EnsureSpaceAndComputeBytesToRead()
{
int space = ioBuffer.Length - ioBufferBytes;
if (space == 0)
......@@ -880,7 +886,8 @@ void ISocketCallback.Error()
{
RecordConnectionFailed(ConnectionFailureType.SocketFailure);
}
void MatchResult(RawResult result)
private void MatchResult(RawResult result)
{
// check to see if it could be an out-of-band pubsub message
if (connectionType == ConnectionType.Subscription && result.Type == ResultType.MultiBulk)
......@@ -942,6 +949,7 @@ void MatchResult(RawResult result)
Bridge.CompleteSyncOrAsync(msg);
}
}
partial void OnCloseEcho();
partial void OnCreateEcho();
......@@ -981,6 +989,7 @@ private int ProcessBuffer(byte[] underlying, ref int offset, ref int count)
} while (result.HasValue);
return messageCount;
}
private bool ProcessReadBytes(int bytesRead)
{
if (bytesRead <= 0)
......@@ -1033,7 +1042,8 @@ void ISocketCallback.Read()
catch (Exception ex)
{
RecordConnectionFailed(ConnectionFailureType.InternalFailure, ex);
}finally
}
finally
{
Interlocked.Decrement(ref haveReader);
}
......@@ -1047,24 +1057,24 @@ bool ISocketCallback.IsDataAvailable
catch { return false; }
}
}
private RawResult ReadArray(byte[] buffer, ref int offset, ref int count)
{
var itemCount = ReadLineTerminatedString(ResultType.Integer, buffer, ref offset, ref count);
if (itemCount.HasValue)
{
long i64;
if (!itemCount.TryGetInt64(out i64)) throw ExceptionFactory.ConnectionFailure(Multiplexer.IncludeDetailInExceptions, ConnectionFailureType.ProtocolFailure, "Invalid array length", Bridge.ServerEndPoint);
if (!itemCount.TryGetInt64(out long i64)) throw ExceptionFactory.ConnectionFailure(Multiplexer.IncludeDetailInExceptions, ConnectionFailureType.ProtocolFailure, "Invalid array length", Bridge.ServerEndPoint);
int itemCountActual = checked((int)i64);
if (itemCountActual < 0)
{
//for null response by command like EXEC, RESP array: *-1\r\n
return new RawResult(ResultType.SimpleString, null, 0, 0);
return new RawResult(ResultType.SimpleString, null, 0, 0);
}
else if (itemCountActual == 0)
{
//for zero array response by command like SCAN, Resp array: *0\r\n
return RawResult.EmptyArray;
return RawResult.EmptyArray;
}
var arr = new RawResult[itemCountActual];
......@@ -1083,8 +1093,7 @@ private RawResult ReadBulkString(byte[] buffer, ref int offset, ref int count)
var prefix = ReadLineTerminatedString(ResultType.Integer, buffer, ref offset, ref count);
if (prefix.HasValue)
{
long i64;
if (!prefix.TryGetInt64(out i64)) throw ExceptionFactory.ConnectionFailure(Multiplexer.IncludeDetailInExceptions, ConnectionFailureType.ProtocolFailure, "Invalid bulk string length", Bridge.ServerEndPoint);
if (!prefix.TryGetInt64(out long i64)) throw ExceptionFactory.ConnectionFailure(Multiplexer.IncludeDetailInExceptions, ConnectionFailureType.ProtocolFailure, "Invalid bulk string length", Bridge.ServerEndPoint);
int bodySize = checked((int)i64);
if (bodySize < 0)
{
......@@ -1126,13 +1135,14 @@ void ISocketCallback.StartReading()
{
BeginReading();
}
RawResult TryParseResult(byte[] buffer, ref int offset, ref int count)
private RawResult TryParseResult(byte[] buffer, ref int offset, ref int count)
{
if(count == 0) return RawResult.Nil;
if (count == 0) return RawResult.Nil;
char resultType = (char)buffer[offset++];
count--;
switch(resultType)
switch (resultType)
{
case '+': // simple string
return ReadLineTerminatedString(ResultType.SimpleString, buffer, ref offset, ref count);
......@@ -1159,12 +1169,10 @@ public void CheckForStaleConnection(ref SocketManager.ManagerState state)
int now = Environment.TickCount;
if (firstUnansweredWrite != 0 && (now - firstUnansweredWrite) > this.Multiplexer.RawConfig.ResponseTimeout)
if (firstUnansweredWrite != 0 && (now - firstUnansweredWrite) > Multiplexer.RawConfig.ResponseTimeout)
{
this.RecordConnectionFailed(ConnectionFailureType.SocketFailure, ref state, origin: "CheckForStaleConnection");
RecordConnectionFailed(ConnectionFailureType.SocketFailure, ref state, origin: "CheckForStaleConnection");
}
}
}
}
}
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment