Commit ee6c021d authored by Nick Craver's avatar Nick Craver

Cleanup: PhysicalBridge

parent deec3fec
...@@ -62,8 +62,8 @@ public enum State : byte ...@@ -62,8 +62,8 @@ public enum State : byte
ConnectedEstablishing, ConnectedEstablishing,
ConnectedEstablished, ConnectedEstablished,
Disconnected Disconnected
} }
public Exception LastException { get; private set; } public Exception LastException { get; private set; }
public ConnectionType ConnectionType { get; } public ConnectionType ConnectionType { get; }
...@@ -122,7 +122,7 @@ public bool TryEnqueue(Message message, bool isSlave) ...@@ -122,7 +122,7 @@ public bool TryEnqueue(Message message, bool isSlave)
{ {
// you can go in the queue, but we won't be starting // you can go in the queue, but we won't be starting
// a worker, because the handshake has not completed // a worker, because the handshake has not completed
queue.Push(message); queue.Push(message);
message.SetEnqueued(); message.SetEnqueued();
return true; return true;
} }
...@@ -133,7 +133,7 @@ public bool TryEnqueue(Message message, bool isSlave) ...@@ -133,7 +133,7 @@ public bool TryEnqueue(Message message, bool isSlave)
} }
} }
bool reqWrite = queue.Push(message); bool reqWrite = queue.Push(message);
message.SetEnqueued(); message.SetEnqueued();
LogNonPreferred(message.Flags, isSlave); LogNonPreferred(message.Flags, isSlave);
Trace("Now pending: " + GetPendingCount()); Trace("Now pending: " + GetPendingCount());
...@@ -144,7 +144,7 @@ public bool TryEnqueue(Message message, bool isSlave) ...@@ -144,7 +144,7 @@ public bool TryEnqueue(Message message, bool isSlave)
} }
return true; return true;
} }
internal void AppendProfile(StringBuilder sb) internal void AppendProfile(StringBuilder sb)
{ {
var clone = new long[ProfileLogSamples + 1]; var clone = new long[ProfileLogSamples + 1];
...@@ -299,7 +299,7 @@ internal void OnConnectionFailed(PhysicalConnection connection, ConnectionFailur ...@@ -299,7 +299,7 @@ internal void OnConnectionFailed(PhysicalConnection connection, ConnectionFailur
{ {
if (reportNextFailure) if (reportNextFailure)
{ {
LastException = innerException; LastException = innerException;
reportNextFailure = false; // until it is restored reportNextFailure = false; // until it is restored
var endpoint = ServerEndPoint.EndPoint; var endpoint = ServerEndPoint.EndPoint;
Multiplexer.OnConnectionFailed(endpoint, ConnectionType, failureType, innerException, reconfigureNextFailure); Multiplexer.OnConnectionFailed(endpoint, ConnectionType, failureType, innerException, reconfigureNextFailure);
...@@ -381,7 +381,7 @@ internal void OnHeartbeat(bool ifConnectedOnly) ...@@ -381,7 +381,7 @@ internal void OnHeartbeat(bool ifConnectedOnly)
bool shouldRetry = Multiplexer.RawConfig.ReconnectRetryPolicy.ShouldRetry(Interlocked.Read(ref connectTimeoutRetryCount), connectTimeMilliseconds); bool shouldRetry = Multiplexer.RawConfig.ReconnectRetryPolicy.ShouldRetry(Interlocked.Read(ref connectTimeoutRetryCount), connectTimeMilliseconds);
if (shouldRetry) if (shouldRetry)
{ {
Interlocked.Increment(ref connectTimeoutRetryCount); Interlocked.Increment(ref connectTimeoutRetryCount);
LastException = ExceptionFactory.UnableToConnect(Multiplexer.RawConfig.AbortOnConnectFail, "ConnectTimeout"); LastException = ExceptionFactory.UnableToConnect(Multiplexer.RawConfig.AbortOnConnectFail, "ConnectTimeout");
Trace("Aborting connect"); Trace("Aborting connect");
// abort and reconnect // abort and reconnect
...@@ -534,8 +534,8 @@ internal bool WriteMessageDirect(PhysicalConnection tmp, Message next) ...@@ -534,8 +534,8 @@ internal bool WriteMessageDirect(PhysicalConnection tmp, Message next)
next.Fail(ConnectionFailureType.ProtocolFailure, null); next.Fail(ConnectionFailureType.ProtocolFailure, null);
CompleteSyncOrAsync(next); CompleteSyncOrAsync(next);
return false; return false;
} }
//The parent message (next) may be returned from GetMessages //The parent message (next) may be returned from GetMessages
//and should not be marked as sent again below //and should not be marked as sent again below
messageIsSent = messageIsSent || subCommand == next; messageIsSent = messageIsSent || subCommand == next;
} }
...@@ -725,12 +725,12 @@ private void LogNonPreferred(CommandFlags flags, bool isSlave) ...@@ -725,12 +725,12 @@ private void LogNonPreferred(CommandFlags flags, bool isSlave)
} }
} }
} }
private void OnInternalError(Exception exception, [CallerMemberName] string origin = null) private void OnInternalError(Exception exception, [CallerMemberName] string origin = null)
{ {
Multiplexer.OnInternalError(exception, ServerEndPoint.EndPoint, ConnectionType, origin); Multiplexer.OnInternalError(exception, ServerEndPoint.EndPoint, ConnectionType, origin);
} }
private void SelectDatabase(PhysicalConnection connection, Message message) private void SelectDatabase(PhysicalConnection connection, Message message)
{ {
int db = message.Db; int db = message.Db;
...@@ -740,13 +740,13 @@ private void SelectDatabase(PhysicalConnection connection, Message message) ...@@ -740,13 +740,13 @@ private void SelectDatabase(PhysicalConnection connection, Message message)
if (sel != null) if (sel != null)
{ {
connection.Enqueue(sel); connection.Enqueue(sel);
sel.WriteImpl(connection); sel.WriteImpl(connection);
sel.SetRequestSent(); sel.SetRequestSent();
IncrementOpCount(); IncrementOpCount();
} }
} }
} }
private bool WriteMessageToServer(PhysicalConnection connection, Message message) private bool WriteMessageToServer(PhysicalConnection connection, Message message)
{ {
if (message == null) return true; if (message == null) return true;
...@@ -768,7 +768,7 @@ private bool WriteMessageToServer(PhysicalConnection connection, Message message ...@@ -768,7 +768,7 @@ private bool WriteMessageToServer(PhysicalConnection connection, Message message
if (readmode != null) if (readmode != null)
{ {
connection.Enqueue(readmode); connection.Enqueue(readmode);
readmode.WriteTo(connection); readmode.WriteTo(connection);
readmode.SetRequestSent(); readmode.SetRequestSent();
IncrementOpCount(); IncrementOpCount();
} }
...@@ -777,7 +777,7 @@ private bool WriteMessageToServer(PhysicalConnection connection, Message message ...@@ -777,7 +777,7 @@ private bool WriteMessageToServer(PhysicalConnection connection, Message message
{ {
var asking = ReusableAskingCommand; var asking = ReusableAskingCommand;
connection.Enqueue(asking); connection.Enqueue(asking);
asking.WriteImpl(connection); asking.WriteImpl(connection);
asking.SetRequestSent(); asking.SetRequestSent();
IncrementOpCount(); IncrementOpCount();
} }
...@@ -796,8 +796,8 @@ private bool WriteMessageToServer(PhysicalConnection connection, Message message ...@@ -796,8 +796,8 @@ private bool WriteMessageToServer(PhysicalConnection connection, Message message
} }
connection.Enqueue(message); connection.Enqueue(message);
message.WriteImpl(connection); message.WriteImpl(connection);
message.SetRequestSent(); message.SetRequestSent();
IncrementOpCount(); IncrementOpCount();
// some commands smash our ability to trust the database; some commands // some commands smash our ability to trust the database; some commands
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment