Commit 7b6c2b78 authored by Nick Craver's avatar Nick Craver

Cleanup: PhysicalBridge

parent c2dd3a4d
...@@ -8,7 +8,7 @@ ...@@ -8,7 +8,7 @@
namespace StackExchange.Redis namespace StackExchange.Redis
{ {
enum WriteResult internal enum WriteResult
{ {
QueueEmptyAfterWrite, QueueEmptyAfterWrite,
NothingToDo, NothingToDo,
...@@ -16,36 +16,35 @@ enum WriteResult ...@@ -16,36 +16,35 @@ enum WriteResult
CompetingWriter, CompetingWriter,
NoConnection, NoConnection,
} }
sealed partial class PhysicalBridge : IDisposable internal sealed partial class PhysicalBridge : IDisposable
{ {
internal readonly string Name; internal readonly string Name;
internal int inWriteQueue = 0; internal int inWriteQueue = 0;
const int ProfileLogSamples = 10; private const int ProfileLogSamples = 10;
const double ProfileLogSeconds = (ConnectionMultiplexer.MillisecondsPerHeartbeat * ProfileLogSamples) / 1000.0; private const double ProfileLogSeconds = (ConnectionMultiplexer.MillisecondsPerHeartbeat * ProfileLogSamples) / 1000.0;
private static readonly Message ReusableAskingCommand = Message.Create(-1, CommandFlags.FireAndForget, RedisCommand.ASKING); private static readonly Message ReusableAskingCommand = Message.Create(-1, CommandFlags.FireAndForget, RedisCommand.ASKING);
private readonly CompletionManager completionManager; private readonly CompletionManager completionManager;
readonly long[] profileLog = new long[ProfileLogSamples]; private readonly long[] profileLog = new long[ProfileLogSamples];
private readonly MessageQueue queue = new MessageQueue(); private readonly MessageQueue queue = new MessageQueue();
int activeWriters = 0; private int activeWriters = 0;
private int beating; private int beating;
int failConnectCount = 0; private int failConnectCount = 0;
volatile bool isDisposed; private volatile bool isDisposed;
long nonPreferredEndpointCount; private long nonPreferredEndpointCount;
//private volatile int missedHeartbeats; //private volatile int missedHeartbeats;
private long operationCount, socketCount; private long operationCount, socketCount;
private volatile PhysicalConnection physical; private volatile PhysicalConnection physical;
private long profileLastLog;
long profileLastLog; private int profileLogIndex;
int profileLogIndex; private volatile bool reportNextFailure = true, reconfigureNextFailure = false;
volatile bool reportNextFailure = true, reconfigureNextFailure = false;
private volatile int state = (int)State.Disconnected; private volatile int state = (int)State.Disconnected;
public PhysicalBridge(ServerEndPoint serverEndPoint, ConnectionType type) public PhysicalBridge(ServerEndPoint serverEndPoint, ConnectionType type)
...@@ -108,15 +107,9 @@ public void ReportNextFailure() ...@@ -108,15 +107,9 @@ public void ReportNextFailure()
reportNextFailure = true; reportNextFailure = true;
} }
public override string ToString() public override string ToString() => ConnectionType + "/" + Format.ToString(ServerEndPoint.EndPoint);
{
return ConnectionType + "/" + Format.ToString(ServerEndPoint.EndPoint);
}
public void TryConnect(TextWriter log) public void TryConnect(TextWriter log) => GetConnection(log);
{
GetConnection(log);
}
public bool TryEnqueue(Message message, bool isSlave) public bool TryEnqueue(Message message, bool isSlave)
{ {
...@@ -149,9 +142,10 @@ public bool TryEnqueue(Message message, bool isSlave) ...@@ -149,9 +142,10 @@ public bool TryEnqueue(Message message, bool isSlave)
} }
return true; return true;
} }
internal void AppendProfile(StringBuilder sb) internal void AppendProfile(StringBuilder sb)
{ {
long[] clone = new long[ProfileLogSamples + 1]; var clone = new long[ProfileLogSamples + 1];
for (int i = 0; i < ProfileLogSamples; i++) for (int i = 0; i < ProfileLogSamples; i++)
{ {
clone[i] = Interlocked.Read(ref profileLog[i]); clone[i] = Interlocked.Read(ref profileLog[i]);
...@@ -289,7 +283,6 @@ internal void OnConnected(PhysicalConnection connection, TextWriter log) ...@@ -289,7 +283,6 @@ internal void OnConnected(PhysicalConnection connection, TextWriter log)
} }
} }
internal void ResetNonConnected() internal void ResetNonConnected()
{ {
var tmp = physical; var tmp = physical;
...@@ -316,8 +309,7 @@ internal void OnDisconnected(ConnectionFailureType failureType, PhysicalConnecti ...@@ -316,8 +309,7 @@ internal void OnDisconnected(ConnectionFailureType failureType, PhysicalConnecti
Trace("OnDisconnected"); Trace("OnDisconnected");
// if the next thing in the pipe is a PING, we can tell it that we failed (this really helps spot doomed connects) // if the next thing in the pipe is a PING, we can tell it that we failed (this really helps spot doomed connects)
int count; var ping = queue.DequeueUnsentPing(out int count);
var ping = queue.DequeueUnsentPing(out count);
if (ping != null) if (ping != null)
{ {
Trace("Marking PING as failed (queue length: " + count + ")"); Trace("Marking PING as failed (queue length: " + count + ")");
...@@ -366,7 +358,7 @@ internal void OnFullyEstablished(PhysicalConnection connection) ...@@ -366,7 +358,7 @@ internal void OnFullyEstablished(PhysicalConnection connection)
private int connectStartTicks; private int connectStartTicks;
private long connectTimeoutRetryCount = 0; private long connectTimeoutRetryCount = 0;
internal void OnHeartbeat(bool ifConnectedOnly) internal void OnHeartbeat(bool ifConnectedOnly)
{ {
bool runThisTime = false; bool runThisTime = false;
...@@ -392,9 +384,7 @@ internal void OnHeartbeat(bool ifConnectedOnly) ...@@ -392,9 +384,7 @@ internal void OnHeartbeat(bool ifConnectedOnly)
Trace("Aborting connect"); Trace("Aborting connect");
// abort and reconnect // abort and reconnect
var snapshot = physical; var snapshot = physical;
bool isCurrent; OnDisconnected(ConnectionFailureType.UnableToConnect, snapshot, out bool isCurrent, out State oldState);
State oldState;
OnDisconnected(ConnectionFailureType.UnableToConnect, snapshot, out isCurrent, out oldState);
using (snapshot) { } // dispose etc using (snapshot) { } // dispose etc
TryConnect(null); TryConnect(null);
} }
...@@ -432,9 +422,7 @@ internal void OnHeartbeat(bool ifConnectedOnly) ...@@ -432,9 +422,7 @@ internal void OnHeartbeat(bool ifConnectedOnly)
} }
else else
{ {
bool ignore; OnDisconnected(ConnectionFailureType.SocketFailure, tmp, out bool ignore, out State oldState);
State oldState;
OnDisconnected(ConnectionFailureType.SocketFailure, tmp, out ignore, out oldState);
} }
} }
else if (!queue.Any() && tmp.GetSentAwaitingResponseCount() != 0) else if (!queue.Any() && tmp.GetSentAwaitingResponseCount() != 0)
...@@ -451,7 +439,7 @@ internal void OnHeartbeat(bool ifConnectedOnly) ...@@ -451,7 +439,7 @@ internal void OnHeartbeat(bool ifConnectedOnly)
if (!ifConnectedOnly) if (!ifConnectedOnly)
{ {
AbortUnsent(); AbortUnsent();
Multiplexer.Trace("Resurrecting " + this.ToString()); Multiplexer.Trace("Resurrecting " + ToString());
GetConnection(null); GetConnection(null);
} }
break; break;
...@@ -735,10 +723,12 @@ private void LogNonPreferred(CommandFlags flags, bool isSlave) ...@@ -735,10 +723,12 @@ private void LogNonPreferred(CommandFlags flags, bool isSlave)
} }
} }
} }
private void OnInternalError(Exception exception, [CallerMemberName] string origin = null) private void OnInternalError(Exception exception, [CallerMemberName] string origin = null)
{ {
Multiplexer.OnInternalError(exception, ServerEndPoint.EndPoint, ConnectionType, origin); Multiplexer.OnInternalError(exception, ServerEndPoint.EndPoint, ConnectionType, origin);
} }
private void SelectDatabase(PhysicalConnection connection, Message message) private void SelectDatabase(PhysicalConnection connection, Message message)
{ {
int db = message.Db; int db = message.Db;
...@@ -754,6 +744,7 @@ private void SelectDatabase(PhysicalConnection connection, Message message) ...@@ -754,6 +744,7 @@ private void SelectDatabase(PhysicalConnection connection, Message message)
} }
} }
} }
private bool WriteMessageToServer(PhysicalConnection connection, Message message) private bool WriteMessageToServer(PhysicalConnection connection, Message message)
{ {
if (message == null) return true; if (message == null) return true;
...@@ -833,7 +824,7 @@ private bool WriteMessageToServer(PhysicalConnection connection, Message message ...@@ -833,7 +824,7 @@ private bool WriteMessageToServer(PhysicalConnection connection, Message message
CompleteSyncOrAsync(message); CompleteSyncOrAsync(message);
// this failed without actually writing; we're OK with that... unless there's a transaction // this failed without actually writing; we're OK with that... unless there's a transaction
if (connection != null && connection.TransactionActive) if (connection?.TransactionActive == true)
{ {
// we left it in a broken state; need to kill the connection // we left it in a broken state; need to kill the connection
connection.RecordConnectionFailed(ConnectionFailureType.ProtocolFailure, ex); connection.RecordConnectionFailed(ConnectionFailureType.ProtocolFailure, ex);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment