Commit 7e06b342 authored by Nick Craver's avatar Nick Craver

Cleanup: ServerEndPoint

parent 05554dd8
...@@ -19,7 +19,6 @@ internal enum UnselectableFlags ...@@ -19,7 +19,6 @@ internal enum UnselectableFlags
RedundantMaster = 1, RedundantMaster = 1,
DidNotRespond = 2, DidNotRespond = 2,
ServerType = 4 ServerType = 4
} }
internal sealed partial class ServerEndPoint : IDisposable internal sealed partial class ServerEndPoint : IDisposable
...@@ -30,30 +29,23 @@ internal sealed partial class ServerEndPoint : IDisposable ...@@ -30,30 +29,23 @@ internal sealed partial class ServerEndPoint : IDisposable
private static readonly ServerEndPoint[] NoSlaves = new ServerEndPoint[0]; private static readonly ServerEndPoint[] NoSlaves = new ServerEndPoint[0];
private readonly EndPoint endpoint; private readonly EndPoint endpoint;
private readonly Hashtable knownScripts = new Hashtable(StringComparer.Ordinal); private readonly Hashtable knownScripts = new Hashtable(StringComparer.Ordinal);
private readonly ConnectionMultiplexer multiplexer; private readonly ConnectionMultiplexer multiplexer;
private int databases, writeEverySeconds; private int databases, writeEverySeconds;
private PhysicalBridge interactive, subscription; private PhysicalBridge interactive, subscription;
private bool isDisposed;
bool isDisposed; private ServerType serverType;
ServerType serverType;
private bool slaveReadOnly, isSlave; private bool slaveReadOnly, isSlave;
private volatile UnselectableFlags unselectableReasons; private volatile UnselectableFlags unselectableReasons;
private Version version; private Version version;
internal void ResetNonConnected() internal void ResetNonConnected()
{ {
interactive?.ResetNonConnected(); interactive?.ResetNonConnected();
subscription?.ResetNonConnected(); subscription?.ResetNonConnected();
} }
public ServerEndPoint(ConnectionMultiplexer multiplexer, EndPoint endpoint, TextWriter log) public ServerEndPoint(ConnectionMultiplexer multiplexer, EndPoint endpoint, TextWriter log)
{ {
this.multiplexer = multiplexer; this.multiplexer = multiplexer;
...@@ -83,14 +75,7 @@ public ServerEndPoint(ConnectionMultiplexer multiplexer, EndPoint endpoint, Text ...@@ -83,14 +75,7 @@ public ServerEndPoint(ConnectionMultiplexer multiplexer, EndPoint endpoint, Text
public bool HasDatabases => serverType == ServerType.Standalone; public bool HasDatabases => serverType == ServerType.Standalone;
public bool IsConnected public bool IsConnected => interactive?.IsConnected == true;
{
get
{
var tmp = interactive;
return tmp != null && tmp.IsConnected;
}
}
internal Exception LastException internal Exception LastException
{ {
...@@ -100,12 +85,9 @@ internal Exception LastException ...@@ -100,12 +85,9 @@ internal Exception LastException
var tmp2 = subscription; var tmp2 = subscription;
//check if subscription endpoint has a better lastexception //check if subscription endpoint has a better lastexception
if (tmp2 != null && tmp2.LastException != null) if (tmp2?.LastException != null && tmp2.LastException.Data.Contains("Redis-FailureType") && !tmp2.LastException.Data["Redis-FailureType"].ToString().Equals(nameof(ConnectionFailureType.UnableToConnect)))
{ {
if (tmp2.LastException.Data.Contains("Redis-FailureType") && !tmp2.LastException.Data["Redis-FailureType"].ToString().Equals(ConnectionFailureType.UnableToConnect.ToString())) return tmp2.LastException;
{
return tmp2.LastException;
}
} }
return tmp1?.LastException; return tmp1?.LastException;
} }
...@@ -185,6 +167,7 @@ public PhysicalBridge GetBridge(ConnectionType type, bool create = true, TextWri ...@@ -185,6 +167,7 @@ public PhysicalBridge GetBridge(ConnectionType type, bool create = true, TextWri
} }
return null; return null;
} }
public PhysicalBridge GetBridge(RedisCommand command, bool create = true) public PhysicalBridge GetBridge(RedisCommand command, bool create = true)
{ {
if (isDisposed) return null; if (isDisposed) return null;
...@@ -207,7 +190,6 @@ public RedisFeatures GetFeatures() ...@@ -207,7 +190,6 @@ public RedisFeatures GetFeatures()
public void SetClusterConfiguration(ClusterConfiguration configuration) public void SetClusterConfiguration(ClusterConfiguration configuration)
{ {
ClusterConfiguration = configuration; ClusterConfiguration = configuration;
if (configuration != null) if (configuration != null)
...@@ -251,16 +233,10 @@ public void SetUnselectable(UnselectableFlags flags) ...@@ -251,16 +233,10 @@ public void SetUnselectable(UnselectableFlags flags)
} }
} }
} }
public override string ToString()
{
return Format.ToString(EndPoint);
}
public bool TryEnqueue(Message message) public override string ToString() => Format.ToString(EndPoint);
{
var bridge = GetBridge(message.Command); public bool TryEnqueue(Message message) => GetBridge(message.Command)?.TryEnqueue(message, isSlave) == true;
return bridge != null && bridge.TryEnqueue(message, isSlave);
}
internal void Activate(ConnectionType type, TextWriter log) internal void Activate(ConnectionType type, TextWriter log)
{ {
...@@ -341,23 +317,21 @@ internal void AutoConfigure(PhysicalConnection connection) ...@@ -341,23 +317,21 @@ internal void AutoConfigure(PhysicalConnection connection)
} }
} }
int _nextReplicaOffset; private int _nextReplicaOffset;
internal uint NextReplicaOffset() // used to round-robin between multiple replicas internal uint NextReplicaOffset() // used to round-robin between multiple replicas
=> (uint) System.Threading.Interlocked.Increment(ref _nextReplicaOffset); => (uint) System.Threading.Interlocked.Increment(ref _nextReplicaOffset);
internal Task Close() internal Task Close()
{ {
var tmp = interactive; var tmp = interactive;
Task result;
if (tmp == null || !tmp.IsConnected || !multiplexer.CommandMap.IsAvailable(RedisCommand.QUIT)) if (tmp == null || !tmp.IsConnected || !multiplexer.CommandMap.IsAvailable(RedisCommand.QUIT))
{ {
result = CompletedTask<bool>.Default(null); return CompletedTask<bool>.Default(null);
} }
else else
{ {
result = QueueDirectAsync(Message.Create(-1, CommandFlags.None, RedisCommand.QUIT), ResultProcessor.DemandOK, bridge: interactive); return QueueDirectAsync(Message.Create(-1, CommandFlags.None, RedisCommand.QUIT), ResultProcessor.DemandOK, bridge: interactive);
} }
return result;
} }
internal void FlushScriptCache() internal void FlushScriptCache()
...@@ -473,7 +447,7 @@ internal Message GetTracerMessage(bool assertIdentity) ...@@ -473,7 +447,7 @@ internal Message GetTracerMessage(bool assertIdentity)
internal bool IsSelectable(RedisCommand command) internal bool IsSelectable(RedisCommand command)
{ {
var bridge = unselectableReasons == 0 ? GetBridge(command, false) : null; var bridge = unselectableReasons == 0 ? GetBridge(command, false) : null;
return bridge != null && bridge.IsConnected; return bridge?.IsConnected == true;
} }
internal void OnEstablishing(PhysicalConnection connection, TextWriter log) internal void OnEstablishing(PhysicalConnection connection, TextWriter log)
...@@ -506,7 +480,7 @@ internal void OnFullyEstablished(PhysicalConnection connection) ...@@ -506,7 +480,7 @@ internal void OnFullyEstablished(PhysicalConnection connection)
connection.RecordConnectionFailed(ConnectionFailureType.InternalFailure, ex); connection.RecordConnectionFailed(ConnectionFailureType.InternalFailure, ex);
} }
} }
internal int LastInfoReplicationCheckSecondsAgo internal int LastInfoReplicationCheckSecondsAgo
{ {
get { return unchecked(Environment.TickCount - VolatileWrapper.Read(ref lastInfoReplicationCheckTicks)) / 1000; } get { return unchecked(Environment.TickCount - VolatileWrapper.Read(ref lastInfoReplicationCheckTicks)) / 1000; }
...@@ -519,7 +493,6 @@ public EndPoint MasterEndPoint ...@@ -519,7 +493,6 @@ public EndPoint MasterEndPoint
set { SetConfig(ref masterEndPoint, value); } set { SetConfig(ref masterEndPoint, value); }
} }
internal bool CheckInfoReplication() internal bool CheckInfoReplication()
{ {
lastInfoReplicationCheckTicks = Environment.TickCount; lastInfoReplicationCheckTicks = Environment.TickCount;
...@@ -534,6 +507,7 @@ internal bool CheckInfoReplication() ...@@ -534,6 +507,7 @@ internal bool CheckInfoReplication()
} }
return false; return false;
} }
private int lastInfoReplicationCheckTicks; private int lastInfoReplicationCheckTicks;
private int _heartBeatActive; private int _heartBeatActive;
...@@ -544,8 +518,6 @@ internal void OnHeartbeat() ...@@ -544,8 +518,6 @@ internal void OnHeartbeat()
{ {
try try
{ {
interactive?.OnHeartbeat(false); interactive?.OnHeartbeat(false);
subscription?.OnHeartbeat(false); subscription?.OnHeartbeat(false);
} }
...@@ -600,7 +572,6 @@ internal string Summary() ...@@ -600,7 +572,6 @@ internal string Summary()
{ {
var sb = new StringBuilder(Format.ToString(endpoint)) var sb = new StringBuilder(Format.ToString(endpoint))
.Append(": ").Append(serverType).Append(" v").Append(version).Append(", ").Append(isSlave ? "slave" : "master"); .Append(": ").Append(serverType).Append(" v").Append(version).Append(", ").Append(isSlave ? "slave" : "master");
if (databases > 0) sb.Append("; ").Append(databases).Append(" databases"); if (databases > 0) sb.Append("; ").Append(databases).Append(" databases");
if (writeEverySeconds > 0) if (writeEverySeconds > 0)
...@@ -628,6 +599,7 @@ internal string Summary() ...@@ -628,6 +599,7 @@ internal string Summary()
} }
return sb.ToString(); return sb.ToString();
} }
internal void WriteDirectOrQueueFireAndForget<T>(PhysicalConnection connection, Message message, ResultProcessor<T> processor) internal void WriteDirectOrQueueFireAndForget<T>(PhysicalConnection connection, Message message, ResultProcessor<T> processor)
{ {
if (message != null) if (message != null)
...@@ -653,7 +625,8 @@ private PhysicalBridge CreateBridge(ConnectionType type, TextWriter log) ...@@ -653,7 +625,8 @@ private PhysicalBridge CreateBridge(ConnectionType type, TextWriter log)
bridge.TryConnect(log); bridge.TryConnect(log);
return bridge; return bridge;
} }
void Handshake(PhysicalConnection connection, TextWriter log)
private void Handshake(PhysicalConnection connection, TextWriter log)
{ {
multiplexer.LogLocked(log, "Server handshake"); multiplexer.LogLocked(log, "Server handshake");
if (connection == null) if (connection == null)
...@@ -698,7 +671,6 @@ void Handshake(PhysicalConnection connection, TextWriter log) ...@@ -698,7 +671,6 @@ void Handshake(PhysicalConnection connection, TextWriter log)
tracer = LoggingMessage.Create(log, tracer); tracer = LoggingMessage.Create(log, tracer);
WriteDirectOrQueueFireAndForget(connection, tracer, ResultProcessor.EstablishConnection); WriteDirectOrQueueFireAndForget(connection, tracer, ResultProcessor.EstablishConnection);
// note: this **must** be the last thing on the subscription handshake, because after this // note: this **must** be the last thing on the subscription handshake, because after this
// we will be in subscriber mode: regular commands cannot be sent // we will be in subscriber mode: regular commands cannot be sent
if (connType == ConnectionType.Subscription) if (connType == ConnectionType.Subscription)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment