Commit 7171938d authored by Marc Gravell's avatar Marc Gravell

make sure PhysicalConnection doesn't root the multiplexer; this, however,...

make sure PhysicalConnection doesn't root the multiplexer; this, however, means that every check to the bridge needs to be null-checked
parent fd7f7a3a
......@@ -42,8 +42,6 @@ public void MuxerIsCollected()
int after = ConnectionMultiplexer.CollectedWithoutDispose;
Thread.Sleep(TimeSpan.FromSeconds(60));
Assert.Null(wr.Target);
Assert.Equal(before + 1, after);
}
......
......@@ -77,13 +77,14 @@ internal partial class PhysicalConnection
partial void OnDebugAbort()
{
if (!Multiplexer.AllowConnect)
var bridge = BridgeCouldBeNull;
if (bridge == null || !bridge.Multiplexer.AllowConnect)
{
throw new RedisConnectionException(ConnectionFailureType.InternalFailure, "debugging");
}
}
public bool IgnoreConnect => Multiplexer.IgnoreConnect;
public bool IgnoreConnect => BridgeCouldBeNull?.Multiplexer?.IgnoreConnect ?? false;
private static volatile bool emulateStaleConnection;
public static bool EmulateStaleConnection
......
......@@ -35,7 +35,8 @@ protected override void WriteImpl(PhysicalConnection physical)
{
try
{
physical.Multiplexer.LogLocked(log, "Writing to {0}: {1}", physical.Bridge, tail.CommandAndKey);
var bridge = physical.BridgeCouldBeNull;
bridge?.Multiplexer?.LogLocked(log, "Writing to {0}: {1}", bridge, tail.CommandAndKey);
}
catch { }
tail.WriteTo(physical);
......
......@@ -98,8 +98,19 @@ public void Dispose()
{
physical = null;
}
GC.SuppressFinalize(this);
}
~PhysicalBridge()
{
// shouldn't *really* touch managed objects
// in a finalizer, but we need to kill that socket,
// and this is the first place that isn't going to
// be rooted by the socket async bits
try {
var tmp = physical;
tmp?.Shutdown();
} catch { }
}
public void ReportNextFailure()
{
reportNextFailure = true;
......@@ -399,7 +410,7 @@ internal void OnHeartbeat(bool ifConnectedOnly)
if (state == (int)State.ConnectedEstablished)
{
Interlocked.Exchange(ref connectTimeoutRetryCount, 0);
tmp.Bridge.ServerEndPoint.ClearUnselectable(UnselectableFlags.DidNotRespond);
tmp.BridgeCouldBeNull?.ServerEndPoint?.ClearUnselectable(UnselectableFlags.DidNotRespond);
}
tmp.OnBridgeHeartbeat();
int writeEverySeconds = ServerEndPoint.WriteEverySeconds,
......
......@@ -80,24 +80,30 @@ public PhysicalConnection(PhysicalBridge bridge)
lastWriteTickCount = lastReadTickCount = Environment.TickCount;
lastBeatTickCount = 0;
connectionType = bridge.ConnectionType;
Multiplexer = bridge.Multiplexer;
ChannelPrefix = Multiplexer.RawConfig.ChannelPrefix;
_bridge = new WeakReference(bridge);
ChannelPrefix = bridge.Multiplexer.RawConfig.ChannelPrefix;
if (ChannelPrefix?.Length == 0) ChannelPrefix = null; // null tests are easier than null+empty
var endpoint = bridge.ServerEndPoint.EndPoint;
physicalName = connectionType + "#" + Interlocked.Increment(ref totalCount) + "@" + Format.ToString(endpoint);
Bridge = bridge;
OnCreateEcho();
}
internal async void BeginConnectAsync(TextWriter log)
{
Thread.VolatileWrite(ref firstUnansweredWriteTickCount, 0);
var endpoint = Bridge.ServerEndPoint.EndPoint;
var bridge = BridgeCouldBeNull;
var endpoint = bridge?.ServerEndPoint?.EndPoint;
if(endpoint == null)
{
log?.WriteLine("No endpoint");
}
Multiplexer.Trace("Connecting...", physicalName);
Trace("Connecting...");
_socket = SocketManager.CreateSocket(endpoint);
Multiplexer.LogLocked(log, "BeginConnect: {0}", Format.ToString(endpoint));
bridge.Multiplexer.LogLocked(log, "BeginConnect: {0}", Format.ToString(endpoint));
var awaitable = new SocketAwaitable();
_socketArgs = new SocketAsyncEventArgs
......@@ -111,7 +117,7 @@ internal async void BeginConnectAsync(TextWriter log)
{
if (_socket.ConnectAsync(_socketArgs))
{ // asynchronous operation is pending
timeoutSource = ConfigureTimeout(_socketArgs, Multiplexer.RawConfig.ConnectTimeout);
timeoutSource = ConfigureTimeout(_socketArgs, bridge.Multiplexer.RawConfig.ConnectTimeout);
}
else
{ // completed synchronously
......@@ -128,9 +134,9 @@ internal async void BeginConnectAsync(TextWriter log)
await awaitable; // wait for the connect to complete or fail (will throw)
timeoutSource?.Cancel();
if (await ConnectedAsync(_socket, log, Multiplexer.SocketManager).ForAwait())
if (await ConnectedAsync(_socket, log, bridge.Multiplexer.SocketManager).ForAwait())
{
Multiplexer.LogLocked(log, "Starting read");
bridge.Multiplexer.LogLocked(log, "Starting read");
try
{
StartReading();
......@@ -150,7 +156,7 @@ internal async void BeginConnectAsync(TextWriter log)
}
catch (ObjectDisposedException)
{
Multiplexer.LogLocked(log, "(socket shutdown)");
bridge.Multiplexer.LogLocked(log, "(socket shutdown)");
try { Error(); }
catch (Exception inner)
{
......@@ -203,11 +209,22 @@ private enum ReadMode : byte
ReadWrite
}
public PhysicalBridge Bridge { get; }
private readonly WeakReference _bridge;
public PhysicalBridge BridgeCouldBeNull => (PhysicalBridge)_bridge.Target;
public long LastWriteSecondsAgo => unchecked(Environment.TickCount - Thread.VolatileRead(ref lastWriteTickCount)) / 1000;
public ConnectionMultiplexer Multiplexer { get; }
private bool IncludeDetailInExceptions
{
get
{
var bridge = BridgeCouldBeNull;
return bridge == null ? false : bridge.Multiplexer.IncludeDetailInExceptions;
}
}
[Conditional("VERBOSE")]
internal void Trace(string message) => BridgeCouldBeNull?.Multiplexer?.Trace(message, physicalName);
public long SubscriptionCount { get; set; }
......@@ -216,13 +233,15 @@ private enum ReadMode : byte
partial void ShouldIgnoreConnect(ref bool ignore);
[System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Usage", "CA2202:Do not dispose objects multiple times")]
private void Shutdown()
internal void Shutdown()
{
if (_socket != null)
var socket = _socket;
socket = null;
if (socket != null)
{
try { _socket.Shutdown(SocketShutdown.Both); } catch { }
try { _socket.Close(); } catch { }
try { _socket.Dispose(); } catch { }
try { socket.Shutdown(SocketShutdown.Both); } catch { }
try { socket.Close(); } catch { }
try { socket.Dispose(); } catch { }
}
try { using (_socketArgs) { } } catch { }
}
......@@ -233,7 +252,7 @@ public void Dispose()
_ioPipe = null;
if (ioPipe != null)
{
Multiplexer.Trace("Disconnecting...", physicalName);
Trace("Disconnecting...");
try { ioPipe.Input?.CancelPendingRead(); } catch { }
try { ioPipe.Input?.Complete(); } catch { }
try { ioPipe.Output?.CancelPendingFlush(); } catch { }
......@@ -245,11 +264,12 @@ public void Dispose()
if (_socket != null)
{
Shutdown();
_socket = default;
Multiplexer.Trace("Disconnected", physicalName);
Trace("Disconnected");
RecordConnectionFailed(ConnectionFailureType.ConnectionDisposed);
}
OnCloseEcho();
GC.SuppressFinalize(this);
}
private async Task AwaitedFlush(ValueTask<FlushResult> flush)
......@@ -278,9 +298,11 @@ public void RecordConnectionFailed(ConnectionFailureType failureType, Exception
if (failureType == ConnectionFailureType.InternalFailure) OnInternalError(innerException, origin);
// stop anything new coming in...
Bridge.Trace("Failed: " + failureType);
BridgeCouldBeNull?.Trace("Failed: " + failureType);
int @in = -1;
Bridge.OnDisconnected(failureType, this, out bool isCurrent, out PhysicalBridge.State oldState);
PhysicalBridge.State oldState = PhysicalBridge.State.Disconnected;
bool isCurrent = false;
BridgeCouldBeNull?.OnDisconnected(failureType, this, out isCurrent, out oldState);
if (oldState == PhysicalBridge.State.ConnectedEstablished)
{
try
......@@ -299,36 +321,40 @@ public void RecordConnectionFailed(ConnectionFailureType failureType, Exception
var exMessage = new StringBuilder(failureType.ToString());
var data = new List<Tuple<string, string>>();
if (Multiplexer.IncludeDetailInExceptions)
if (IncludeDetailInExceptions)
{
exMessage.Append(" on " + Format.ToString(Bridge.ServerEndPoint.EndPoint) + "/" + connectionType);
var bridge = BridgeCouldBeNull;
if (bridge != null)
{
exMessage.Append(" on " + Format.ToString(bridge.ServerEndPoint?.EndPoint) + "/" + connectionType);
data.Add(Tuple.Create("FailureType", failureType.ToString()));
data.Add(Tuple.Create("EndPoint", Format.ToString(Bridge.ServerEndPoint.EndPoint)));
data.Add(Tuple.Create("FailureType", failureType.ToString()));
data.Add(Tuple.Create("EndPoint", Format.ToString(bridge.ServerEndPoint?.EndPoint)));
void add(string lk, string sk, string v)
{
data.Add(Tuple.Create(lk, v));
exMessage.Append(", ").Append(sk).Append(": ").Append(v);
}
void add(string lk, string sk, string v)
{
data.Add(Tuple.Create(lk, v));
exMessage.Append(", ").Append(sk).Append(": ").Append(v);
}
add("Origin", "origin", origin);
// add("Input-Buffer", "input-buffer", _ioPipe.Input);
add("Outstanding-Responses", "outstanding", GetSentAwaitingResponseCount().ToString());
add("Last-Read", "last-read", (unchecked(now - lastRead) / 1000) + "s ago");
add("Last-Write", "last-write", (unchecked(now - lastWrite) / 1000) + "s ago");
add("Unanswered-Write", "unanswered-write", (unchecked(now - unansweredRead) / 1000) + "s ago");
add("Keep-Alive", "keep-alive", Bridge.ServerEndPoint.WriteEverySeconds + "s");
add("Previous-Physical-State", "state", oldState.ToString());
add("Manager", "mgr", Multiplexer?.SocketManager?.GetState());
if (@in >= 0)
{
add("Inbound-Bytes", "in", @in.ToString());
}
add("Origin", "origin", origin);
// add("Input-Buffer", "input-buffer", _ioPipe.Input);
add("Outstanding-Responses", "outstanding", GetSentAwaitingResponseCount().ToString());
add("Last-Read", "last-read", (unchecked(now - lastRead) / 1000) + "s ago");
add("Last-Write", "last-write", (unchecked(now - lastWrite) / 1000) + "s ago");
add("Unanswered-Write", "unanswered-write", (unchecked(now - unansweredRead) / 1000) + "s ago");
add("Keep-Alive", "keep-alive", bridge.ServerEndPoint?.WriteEverySeconds + "s");
add("Previous-Physical-State", "state", oldState.ToString());
add("Manager", "mgr", bridge.Multiplexer.SocketManager?.GetState());
if (@in >= 0)
{
add("Inbound-Bytes", "in", @in.ToString());
}
add("Last-Heartbeat", "last-heartbeat", (lastBeat == 0 ? "never" : ((unchecked(now - lastBeat) / 1000) + "s ago")) + (Bridge.IsBeating ? " (mid-beat)" : ""));
add("Last-Multiplexer-Heartbeat", "last-mbeat", Multiplexer.LastHeartbeatSecondsAgo + "s ago");
add("Last-Global-Heartbeat", "global", ConnectionMultiplexer.LastGlobalHeartbeatSecondsAgo + "s ago");
add("Last-Heartbeat", "last-heartbeat", (lastBeat == 0 ? "never" : ((unchecked(now - lastBeat) / 1000) + "s ago")) + (BridgeCouldBeNull.IsBeating ? " (mid-beat)" : ""));
add("Last-Multiplexer-Heartbeat", "last-mbeat", bridge.Multiplexer.LastHeartbeatSecondsAgo + "s ago");
add("Last-Global-Heartbeat", "global", ConnectionMultiplexer.LastGlobalHeartbeatSecondsAgo + "s ago");
}
}
outerException = innerException == null
......@@ -340,19 +366,19 @@ void add(string lk, string sk, string v)
outerException.Data["Redis-" + kv.Item1] = kv.Item2;
}
Bridge.OnConnectionFailed(this, failureType, outerException);
BridgeCouldBeNull?.OnConnectionFailed(this, failureType, outerException);
}
// cleanup
lock (_writtenAwaitingResponse)
{
Bridge.Trace(_writtenAwaitingResponse.Count != 0, "Failing outstanding messages: " + _writtenAwaitingResponse.Count);
BridgeCouldBeNull?.Trace(_writtenAwaitingResponse.Count != 0, "Failing outstanding messages: " + _writtenAwaitingResponse.Count);
while (_writtenAwaitingResponse.Count != 0)
{
var next = _writtenAwaitingResponse.Dequeue();
Bridge.Trace("Failing: " + next);
BridgeCouldBeNull?.Trace("Failing: " + next);
next.SetException(innerException is RedisException ? innerException : outerException);
Bridge.CompleteSyncOrAsync(next);
BridgeCouldBeNull?.CompleteSyncOrAsync(next);
}
}
......@@ -397,8 +423,8 @@ internal void GetCounters(ConnectionCounters counters)
internal Message GetReadModeCommand(bool isMasterOnly)
{
var serverEndpoint = Bridge.ServerEndPoint;
if (serverEndpoint.RequiresReadMode)
var serverEndpoint = BridgeCouldBeNull?.ServerEndPoint;
if (serverEndpoint != null && serverEndpoint.RequiresReadMode)
{
ReadMode requiredReadMode = isMasterOnly ? ReadMode.ReadWrite : ReadMode.ReadOnly;
if (requiredReadMode != currentReadMode)
......@@ -425,7 +451,8 @@ internal Message GetSelectDatabaseCommand(int targetDatabase, Message message)
if (targetDatabase < 0) return null;
if (targetDatabase != currentDatabase)
{
var serverEndpoint = Bridge.ServerEndPoint;
var serverEndpoint = BridgeCouldBeNull?.ServerEndPoint;
if (serverEndpoint == null) return null;
int available = serverEndpoint.Databases;
if (!serverEndpoint.HasDatabases) // only db0 is available on cluster/twemproxy
......@@ -440,7 +467,7 @@ internal Message GetSelectDatabaseCommand(int targetDatabase, Message message)
if (message.Command == RedisCommand.SELECT)
{
// this could come from an EVAL/EVALSHA inside a transaction, for example; we'll accept it
Bridge.Trace("Switching database: " + targetDatabase);
BridgeCouldBeNull?.Trace("Switching database: " + targetDatabase);
currentDatabase = targetDatabase;
return null;
}
......@@ -452,9 +479,9 @@ internal Message GetSelectDatabaseCommand(int targetDatabase, Message message)
if (available != 0 && targetDatabase >= available) // we positively know it is out of range
{
throw ExceptionFactory.DatabaseOutfRange(Multiplexer.IncludeDetailInExceptions, targetDatabase, message, serverEndpoint);
throw ExceptionFactory.DatabaseOutfRange(IncludeDetailInExceptions, targetDatabase, message, serverEndpoint);
}
Bridge.Trace("Switching database: " + targetDatabase);
BridgeCouldBeNull?.Trace("Switching database: " + targetDatabase);
currentDatabase = targetDatabase;
return GetSelectDatabaseCommand(targetDatabase);
}
......@@ -501,17 +528,21 @@ internal void OnBridgeHeartbeat()
{
if (_writtenAwaitingResponse.Count != 0)
{
bool includeDetail = Multiplexer.IncludeDetailInExceptions;
var server = Bridge.ServerEndPoint;
var timeout = Multiplexer.AsyncTimeoutMilliseconds;
var bridge = BridgeCouldBeNull;
if (bridge == null) return;
bool includeDetail = bridge.Multiplexer.IncludeDetailInExceptions;
var server = bridge?.ServerEndPoint;
var timeout = bridge.Multiplexer.AsyncTimeoutMilliseconds;
foreach (var msg in _writtenAwaitingResponse)
{
if (msg.HasAsyncTimedOut(now, timeout, out var elapsed))
{
var timeoutEx = ExceptionFactory.Timeout(includeDetail, $"Timeout awaiting response ({elapsed}ms elapsed, timeout is {timeout}ms)", msg, server);
msg.SetException(timeoutEx); // tell the message that it is doomed
Bridge.CompleteSyncOrAsync(msg); // prod it - kicks off async continuations etc
Multiplexer.OnAsyncTimeout();
bridge.CompleteSyncOrAsync(msg); // prod it - kicks off async continuations etc
bridge.Multiplexer.OnAsyncTimeout();
}
// note: it is important that we **do not** remove the message unless we're tearing down the socket; that
// would disrupt the chain for MatchResult; we just pre-emptively abort the message from the caller's
......@@ -523,7 +554,11 @@ internal void OnBridgeHeartbeat()
internal void OnInternalError(Exception exception, [CallerMemberName] string origin = null)
{
Multiplexer.OnInternalError(exception, Bridge.ServerEndPoint.EndPoint, connectionType, origin);
var bridge = BridgeCouldBeNull;
if(bridge != null)
{
bridge.Multiplexer.OnInternalError(exception, bridge.ServerEndPoint.EndPoint, connectionType, origin);
}
}
internal void SetUnknownDatabase()
......@@ -573,10 +608,13 @@ internal void Write(RedisValue value)
internal void WriteHeader(RedisCommand command, int arguments)
{
var commandBytes = Multiplexer.CommandMap.GetBytes(command);
var bridge = BridgeCouldBeNull;
if (bridge == null) throw new ObjectDisposedException(physicalName);
var commandBytes = bridge.Multiplexer.CommandMap.GetBytes(command);
if (commandBytes == null)
{
throw ExceptionFactory.CommandDisabled(Multiplexer.IncludeDetailInExceptions, command, null, Bridge.ServerEndPoint);
throw ExceptionFactory.CommandDisabled(IncludeDetailInExceptions, command, null, bridge.ServerEndPoint);
}
WriteHeader(commandBytes, arguments);
}
......@@ -585,11 +623,13 @@ internal void WriteHeader(RedisCommand command, int arguments)
internal void WriteHeader(string command, int arguments)
{
var bridge = BridgeCouldBeNull;
if (bridge == null) throw new ObjectDisposedException(physicalName);
if (arguments >= REDIS_MAX_ARGS) // using >= here because we will be adding 1 for the command itself (which is an arg for the purposes of the multi-bulk protocol)
{
throw ExceptionFactory.TooManyArgs(Multiplexer.IncludeDetailInExceptions, command, null, Bridge.ServerEndPoint, arguments + 1);
throw ExceptionFactory.TooManyArgs(bridge.Multiplexer.IncludeDetailInExceptions, command, null, bridge.ServerEndPoint, arguments + 1);
}
var commandBytes = Multiplexer.CommandMap.GetBytes(command);
var commandBytes = bridge.Multiplexer.CommandMap.GetBytes(command);
WriteHeader(commandBytes, arguments);
}
......@@ -1023,6 +1063,9 @@ private static LocalCertificateSelectionCallback GetAmbientClientCertificateCall
internal async ValueTask<bool> ConnectedAsync(Socket socket, TextWriter log, SocketManager manager)
{
var bridge = BridgeCouldBeNull;
if (bridge == null) return false;
try
{
// disallow connection in some cases
......@@ -1031,14 +1074,15 @@ internal async ValueTask<bool> ConnectedAsync(Socket socket, TextWriter log, Soc
// the order is important here:
// non-TLS: [Socket]<==[SocketConnection:IDuplexPipe]
// TLS: [Socket]<==[NetworkStream]<==[SslStream]<==[StreamConnection:IDuplexPipe]
var config = Multiplexer.RawConfig;
var config = bridge.Multiplexer.RawConfig;
IDuplexPipe pipe;
if (config.Ssl)
{
Multiplexer.LogLocked(log, "Configuring SSL");
bridge.Multiplexer.LogLocked(log, "Configuring SSL");
var host = config.SslHost;
if (string.IsNullOrWhiteSpace(host)) host = Format.ToStringHostOnly(Bridge.ServerEndPoint.EndPoint);
if (string.IsNullOrWhiteSpace(host)) host = Format.ToStringHostOnly(bridge.ServerEndPoint.EndPoint);
var ssl = new SslStream(new NetworkStream(socket), false,
config.CertificateValidationCallback ?? GetAmbientIssuerCertificateCallback(),
......@@ -1053,36 +1097,36 @@ internal async ValueTask<bool> ConnectedAsync(Socket socket, TextWriter log, Soc
catch(Exception ex)
{
Debug.WriteLine(ex.Message);
Multiplexer?.SetAuthSuspect();
bridge.Multiplexer?.SetAuthSuspect();
throw;
}
Multiplexer.LogLocked(log, $"SSL connection established successfully using protocol: {ssl.SslProtocol}");
bridge.Multiplexer.LogLocked(log, $"SSL connection established successfully using protocol: {ssl.SslProtocol}");
}
catch (AuthenticationException authexception)
{
RecordConnectionFailed(ConnectionFailureType.AuthenticationFailure, authexception);
Multiplexer.Trace("Encryption failure");
bridge.Multiplexer.Trace("Encryption failure");
return false;
}
pipe = StreamConnection.GetDuplex(ssl, manager.SendPipeOptions, manager.ReceivePipeOptions, name: Bridge.Name);
pipe = StreamConnection.GetDuplex(ssl, manager.SendPipeOptions, manager.ReceivePipeOptions, name: bridge.Name);
}
else
{
pipe = SocketConnection.Create(socket, manager.SendPipeOptions, manager.ReceivePipeOptions, name: Bridge.Name);
pipe = SocketConnection.Create(socket, manager.SendPipeOptions, manager.ReceivePipeOptions, name: bridge.Name);
}
OnWrapForLogging(ref pipe, physicalName, manager);
_ioPipe = pipe;
Multiplexer.LogLocked(log, "Connected {0}", Bridge);
bridge.Multiplexer.LogLocked(log, "Connected {0}", bridge);
await Bridge.OnConnectedAsync(this, log).ForAwait();
await bridge.OnConnectedAsync(this, log).ForAwait();
return true;
}
catch (Exception ex)
{
RecordConnectionFailed(ConnectionFailureType.InternalFailure, ex); // includes a bridge.OnDisconnected
Multiplexer.Trace("Could not connect: " + ex.Message, physicalName);
bridge.Multiplexer.Trace("Could not connect: " + ex.Message, physicalName);
return false;
}
}
......@@ -1094,6 +1138,9 @@ internal void Error()
private void MatchResult(RawResult result)
{
var muxer = BridgeCouldBeNull?.Multiplexer;
if (muxer == null) return;
// check to see if it could be an out-of-band pubsub message
if (connectionType == ConnectionType.Subscription && result.Type == ResultType.MultiBulk)
{ // out of band message does not match to a queued message
......@@ -1101,7 +1148,8 @@ private void MatchResult(RawResult result)
if (items.Length >= 3 && items[0].IsEqual(message))
{
// special-case the configuration change broadcasts (we don't keep that in the usual pub/sub registry)
var configChanged = Multiplexer.ConfigurationChangedChannel;
var configChanged = muxer.ConfigurationChangedChannel;
if (configChanged != null && items[1].IsEqual(configChanged))
{
EndPoint blame = null;
......@@ -1113,34 +1161,34 @@ private void MatchResult(RawResult result)
}
}
catch { /* no biggie */ }
Multiplexer.Trace("Configuration changed: " + Format.ToString(blame), physicalName);
Multiplexer.ReconfigureIfNeeded(blame, true, "broadcast");
Trace("Configuration changed: " + Format.ToString(blame));
muxer.ReconfigureIfNeeded(blame, true, "broadcast");
}
// invoke the handlers
var channel = items[1].AsRedisChannel(ChannelPrefix, RedisChannel.PatternMode.Literal);
Multiplexer.Trace("MESSAGE: " + channel, physicalName);
Trace("MESSAGE: " + channel);
if (!channel.IsNull)
{
Multiplexer.OnMessage(channel, channel, items[2].AsRedisValue());
muxer.OnMessage(channel, channel, items[2].AsRedisValue());
}
return; // AND STOP PROCESSING!
}
else if (items.Length >= 4 && items[0].IsEqual(pmessage))
{
var channel = items[2].AsRedisChannel(ChannelPrefix, RedisChannel.PatternMode.Literal);
Multiplexer.Trace("PMESSAGE: " + channel, physicalName);
Trace("PMESSAGE: " + channel);
if (!channel.IsNull)
{
var sub = items[1].AsRedisChannel(ChannelPrefix, RedisChannel.PatternMode.Pattern);
Multiplexer.OnMessage(sub, channel, items[3].AsRedisValue());
muxer.OnMessage(sub, channel, items[3].AsRedisValue());
}
return; // AND STOP PROCESSING!
}
// if it didn't look like "[p]message", then we still need to process the pending queue
}
Multiplexer.Trace("Matching result...", physicalName);
Trace("Matching result...");
Message msg;
lock (_writtenAwaitingResponse)
{
......@@ -1150,14 +1198,13 @@ private void MatchResult(RawResult result)
// be even remotely close
Monitor.Wait(_writtenAwaitingResponse, 500);
}
Multiplexer.Trace(_writtenAwaitingResponse.Count == 0, "Nothing to respond to!", physicalName);
msg = _writtenAwaitingResponse.Dequeue();
}
Multiplexer.Trace("Response to: " + msg, physicalName);
Trace("Response to: " + msg);
if (msg.ComputeResult(this, result))
{
Bridge.CompleteSyncOrAsync(msg);
BridgeCouldBeNull?.CompleteSyncOrAsync(msg);
}
}
......@@ -1170,7 +1217,7 @@ internal void OnHeartbeat()
{
try
{
Bridge.OnHeartbeat(true); // all the fun code is here
BridgeCouldBeNull?.OnHeartbeat(true); // all the fun code is here
}
catch (Exception ex)
{
......@@ -1206,7 +1253,7 @@ internal void OnHeartbeat()
allowSyncRead = handled != 0;
Multiplexer.Trace($"Processed {handled} messages", physicalName);
Trace($"Processed {handled} messages");
input.AdvanceTo(buffer.Start, buffer.End);
if (handled == 0 && readResult.IsCompleted)
......@@ -1214,12 +1261,12 @@ internal void OnHeartbeat()
break; // no more data, or trailing incomplete messages
}
}
Multiplexer.Trace("EOF", physicalName);
Trace("EOF");
RecordConnectionFailed(ConnectionFailureType.SocketClosed);
}
catch (Exception ex)
{
Multiplexer.Trace("Faulted", physicalName);
Trace("Faulted");
RecordConnectionFailed(ConnectionFailureType.InternalFailure, ex);
}
}
......@@ -1239,7 +1286,7 @@ private int ProcessBuffer(ref ReadOnlySequence<byte> buffer)
buffer = reader.SliceFromCurrent();
messageCount++;
Multiplexer.Trace(result.ToString(), physicalName);
Trace(result.ToString());
MatchResult(result);
}
else
......@@ -1284,7 +1331,7 @@ private RawResult ReadArray(in ReadOnlySequence<byte> buffer, ref BufferReader r
var itemCount = ReadLineTerminatedString(ResultType.Integer, in buffer, ref reader);
if (itemCount.HasValue)
{
if (!itemCount.TryGetInt64(out long i64)) throw ExceptionFactory.ConnectionFailure(Multiplexer.IncludeDetailInExceptions, ConnectionFailureType.ProtocolFailure, "Invalid array length", Bridge.ServerEndPoint);
if (!itemCount.TryGetInt64(out long i64)) throw ExceptionFactory.ConnectionFailure(IncludeDetailInExceptions, ConnectionFailureType.ProtocolFailure, "Invalid array length", BridgeCouldBeNull?.ServerEndPoint);
int itemCountActual = checked((int)i64);
if (itemCountActual < 0)
......@@ -1318,7 +1365,7 @@ private RawResult ReadBulkString(in ReadOnlySequence<byte> buffer, ref BufferRea
var prefix = ReadLineTerminatedString(ResultType.Integer, in buffer, ref reader);
if (prefix.HasValue)
{
if (!prefix.TryGetInt64(out long i64)) throw ExceptionFactory.ConnectionFailure(Multiplexer.IncludeDetailInExceptions, ConnectionFailureType.ProtocolFailure, "Invalid bulk string length", Bridge.ServerEndPoint);
if (!prefix.TryGetInt64(out long i64)) throw ExceptionFactory.ConnectionFailure(IncludeDetailInExceptions, ConnectionFailureType.ProtocolFailure, "Invalid bulk string length", BridgeCouldBeNull?.ServerEndPoint);
int bodySize = checked((int)i64);
if (bodySize < 0)
{
......@@ -1334,7 +1381,7 @@ private RawResult ReadBulkString(in ReadOnlySequence<byte> buffer, ref BufferRea
case ConsumeResult.Success:
return new RawResult(ResultType.BulkString, payload, false);
default:
throw ExceptionFactory.ConnectionFailure(Multiplexer.IncludeDetailInExceptions, ConnectionFailureType.ProtocolFailure, "Invalid bulk string terminator", Bridge.ServerEndPoint);
throw ExceptionFactory.ConnectionFailure(IncludeDetailInExceptions, ConnectionFailureType.ProtocolFailure, "Invalid bulk string terminator", BridgeCouldBeNull?.ServerEndPoint);
}
}
}
......
......@@ -3303,11 +3303,13 @@ public override int GetHashSlot(ServerSelectionStrategy serverSelectionStrategy)
public IEnumerable<Message> GetMessages(PhysicalConnection connection)
{
if (script != null && connection.Multiplexer.CommandMap.IsAvailable(RedisCommand.SCRIPT)
PhysicalBridge bridge;
if (script != null && (bridge = connection.BridgeCouldBeNull) != null
&& bridge.Multiplexer.CommandMap.IsAvailable(RedisCommand.SCRIPT)
&& (Flags & CommandFlags.NoScriptCache) == 0)
{
// a script was provided (rather than a hash); check it is known and supported
asciiHash = connection.Bridge.ServerEndPoint.GetScriptHash(script, command);
asciiHash = bridge.ServerEndPoint.GetScriptHash(script, command);
if (asciiHash == null)
{
......
......@@ -223,8 +223,11 @@ public IEnumerable<Message> GetMessages(PhysicalConnection connection)
// up-version servers, pre-condition failures exit with UNWATCH; and on down-version servers pre-condition
// failures exit with DISCARD - but that's ok : both work fine
bool explicitCheckForQueued = !connection.Bridge.ServerEndPoint.GetFeatures().ExecAbort;
var multiplexer = connection.Multiplexer;
var bridge = connection.BridgeCouldBeNull;
if (bridge == null) throw new ObjectDisposedException(connection.ToString());
bool explicitCheckForQueued = !bridge.ServerEndPoint.GetFeatures().ExecAbort;
var multiplexer = bridge.Multiplexer;
// PART 1: issue the pre-conditions
if (!IsAborted && conditions.Length != 0)
......@@ -332,15 +335,15 @@ public IEnumerable<Message> GetMessages(PhysicalConnection connection)
}
if (IsAborted)
{
connection.Multiplexer.Trace("Aborting: canceling wrapped messages");
var bridge = connection.Bridge;
connection.Trace("Aborting: canceling wrapped messages");
var bridge = connection.BridgeCouldBeNull;
foreach (var op in InnerOperations)
{
op.Wrapped.Cancel();
bridge.CompleteSyncOrAsync(op.Wrapped);
bridge?.CompleteSyncOrAsync(op.Wrapped);
}
}
connection.Multiplexer.Trace("End ot transaction: " + Command);
connection.Trace("End of transaction: " + Command);
yield return this; // acts as either an EXEC or an UNWATCH, depending on "aborted"
}
......@@ -378,11 +381,11 @@ public override bool SetResult(PhysicalConnection connection, Message message, R
if (result.IsError && message is TransactionMessage tran)
{
string error = result.GetString();
var bridge = connection.Bridge;
var bridge = connection.BridgeCouldBeNull;
foreach (var op in tran.InnerOperations)
{
ServerFail(op.Wrapped, error);
bridge.CompleteSyncOrAsync(op.Wrapped);
bridge?.CompleteSyncOrAsync(op.Wrapped);
}
}
return base.SetResult(connection, message, result);
......@@ -392,26 +395,26 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes
{
if (message is TransactionMessage tran)
{
var bridge = connection.Bridge;
var bridge = connection.BridgeCouldBeNull;
var wrapped = tran.InnerOperations;
switch (result.Type)
{
case ResultType.SimpleString:
if (tran.IsAborted && result.IsEqual(RedisLiterals.BytesOK))
{
connection.Multiplexer.Trace("Acknowledging UNWATCH (aborted electively)");
connection.Trace("Acknowledging UNWATCH (aborted electively)");
SetResult(message, false);
return true;
}
//EXEC returned with a NULL
if (!tran.IsAborted && result.IsNull)
{
connection.Multiplexer.Trace("Server aborted due to failed EXEC");
connection.Trace("Server aborted due to failed EXEC");
//cancel the commands in the transaction and mark them as complete with the completion manager
foreach (var op in wrapped)
{
op.Wrapped.Cancel();
bridge.CompleteSyncOrAsync(op.Wrapped);
bridge?.CompleteSyncOrAsync(op.Wrapped);
}
SetResult(message, false);
return true;
......@@ -423,23 +426,23 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes
var arr = result.GetItems();
if (result.IsNull)
{
connection.Multiplexer.Trace("Server aborted due to failed WATCH");
connection.Trace("Server aborted due to failed WATCH");
foreach (var op in wrapped)
{
op.Wrapped.Cancel();
bridge.CompleteSyncOrAsync(op.Wrapped);
bridge?.CompleteSyncOrAsync(op.Wrapped);
}
SetResult(message, false);
return true;
}
else if (wrapped.Length == arr.Length)
{
connection.Multiplexer.Trace("Server committed; processing nested replies");
connection.Trace("Server committed; processing nested replies");
for (int i = 0; i < arr.Length; i++)
{
if (wrapped[i].Wrapped.ComputeResult(connection, arr[i]))
{
bridge.CompleteSyncOrAsync(wrapped[i].Wrapped);
bridge?.CompleteSyncOrAsync(wrapped[i].Wrapped);
}
}
SetResult(message, true);
......
......@@ -166,18 +166,19 @@ public void SetException(Message message, Exception ex)
// true if ready to be completed (i.e. false if re-issued to another server)
public virtual bool SetResult(PhysicalConnection connection, Message message, RawResult result)
{
var bridge = connection.BridgeCouldBeNull;
if (message is LoggingMessage logging)
{
try
{
connection.Multiplexer.LogLocked(logging.Log, "Response from {0} / {1}: {2}", connection.Bridge, message.CommandAndKey, result);
bridge?.Multiplexer?.LogLocked(logging.Log, "Response from {0} / {1}: {2}", bridge, message.CommandAndKey, result);
}
catch { }
}
if (result.IsError)
{
if (result.AssertStarts(NOAUTH)) connection?.Multiplexer?.SetAuthSuspect();
var bridge = connection.Bridge;
if (result.AssertStarts(NOAUTH)) bridge?.Multiplexer?.SetAuthSuspect();
var server = bridge.ServerEndPoint;
bool log = !message.IsInternalCall;
bool isMoved = result.AssertStarts(MOVED);
......@@ -196,9 +197,11 @@ public virtual bool SetResult(PhysicalConnection connection, Message message, Ra
// no point sending back to same server, and no point sending to a dead server
if (!Equals(server.EndPoint, endpoint))
{
if (bridge.Multiplexer.TryResend(hashSlot, message, endpoint, isMoved))
if (bridge == null)
{ } // already toast
else if (bridge.Multiplexer.TryResend(hashSlot, message, endpoint, isMoved))
{
connection.Multiplexer.Trace(message.Command + " re-issued to " + endpoint, isMoved ? "MOVED" : "ASK");
bridge.Multiplexer.Trace(message.Command + " re-issued to " + endpoint, isMoved ? "MOVED" : "ASK");
return false;
}
else
......@@ -227,7 +230,7 @@ public virtual bool SetResult(PhysicalConnection connection, Message message, Ra
{
bridge.Multiplexer.OnErrorMessage(server.EndPoint, err);
}
connection.Multiplexer.Trace("Completed with error: " + err + " (" + GetType().Name + ")", ToString());
bridge?.Multiplexer?.Trace("Completed with error: " + err + " (" + GetType().Name + ")", ToString());
if (unableToConnectError)
{
ConnectionFail(message, ConnectionFailureType.UnableToConnect, err);
......@@ -242,7 +245,7 @@ public virtual bool SetResult(PhysicalConnection connection, Message message, Ra
bool coreResult = SetResultCore(connection, message, result);
if (coreResult)
{
connection.Multiplexer.Trace("Completed with success: " + result.ToString() + " (" + GetType().Name + ")", ToString());
bridge?.Multiplexer?.Trace("Completed with success: " + result.ToString() + " (" + GetType().Name + ")", ToString());
}
else
{
......@@ -481,7 +484,7 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes
var sl = message as RedisDatabase.ScriptLoadMessage;
if (sl != null)
{
connection.Bridge.ServerEndPoint.AddScript(sl.Script, asciiHash);
connection.BridgeCouldBeNull?.ServerEndPoint?.AddScript(sl.Script, asciiHash);
}
SetResult(message, hash);
return true;
......@@ -561,16 +564,21 @@ public override bool SetResult(PhysicalConnection connection, Message message, R
{
if (result.IsError && result.AssertStarts(READONLY))
{
var server = connection.Bridge.ServerEndPoint;
server.Multiplexer.Trace("Auto-configured role: slave");
server.IsSlave = true;
var bridge = connection.BridgeCouldBeNull;
if(bridge != null)
{
var server = bridge.ServerEndPoint;
server.Multiplexer.Trace("Auto-configured role: slave");
server.IsSlave = true;
}
}
return base.SetResult(connection, message, result);
}
protected override bool SetResultCore(PhysicalConnection connection, Message message, RawResult result)
{
var server = connection.Bridge.ServerEndPoint;
var server = connection.BridgeCouldBeNull?.ServerEndPoint;
if (server == null) return false;
switch (result.Type)
{
case ResultType.BulkString:
......@@ -777,8 +785,10 @@ private sealed class ClusterNodesProcessor : ResultProcessor<ClusterConfiguratio
{
internal static ClusterConfiguration Parse(PhysicalConnection connection, string nodes)
{
var server = connection.Bridge.ServerEndPoint;
var config = new ClusterConfiguration(connection.Multiplexer.ServerSelectionStrategy, nodes, server.EndPoint);
var bridge = connection.BridgeCouldBeNull;
if (bridge == null) throw new ObjectDisposedException(connection.ToString());
var server = bridge.ServerEndPoint;
var config = new ClusterConfiguration(bridge.Multiplexer.ServerSelectionStrategy, nodes, server.EndPoint);
server.SetClusterConfiguration(config);
return config;
}
......@@ -789,7 +799,8 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes
{
case ResultType.BulkString:
string nodes = result.GetString();
connection.Bridge.ServerEndPoint.ServerType = ServerType.Cluster;
var bridge = connection.BridgeCouldBeNull;
if (bridge != null) bridge.ServerEndPoint.ServerType = ServerType.Cluster;
var config = Parse(connection, nodes);
SetResult(message, config);
return true;
......@@ -823,7 +834,7 @@ private sealed class ConnectionIdentityProcessor : ResultProcessor<EndPoint>
{
protected override bool SetResultCore(PhysicalConnection connection, Message message, RawResult result)
{
SetResult(message, connection.Bridge.ServerEndPoint.EndPoint);
SetResult(message, connection.BridgeCouldBeNull?.ServerEndPoint?.EndPoint);
return true;
}
}
......@@ -918,7 +929,7 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes
SetResult(message, true);
return true;
}
if(message.Command == RedisCommand.AUTH) connection?.Multiplexer?.SetAuthSuspect();
if(message.Command == RedisCommand.AUTH) connection?.BridgeCouldBeNull?.Multiplexer?.SetAuthSuspect();
return false;
}
}
......@@ -1312,7 +1323,7 @@ public override bool SetResult(PhysicalConnection connection, Message message, R
{
if (result.Type == ResultType.Error && result.AssertStarts(NOSCRIPT))
{ // scripts are not flushed individually, so assume the entire script cache is toast ("SCRIPT FLUSH")
connection.Bridge.ServerEndPoint.FlushScriptCache();
connection.BridgeCouldBeNull?.ServerEndPoint?.FlushScriptCache();
message.SetScriptUnavailable();
}
// and apply usual processing for the rest
......@@ -1837,7 +1848,7 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes
switch (message.Command)
{
case RedisCommand.ECHO:
happy = result.Type == ResultType.BulkString && (!establishConnection || result.IsEqual(connection.Multiplexer.UniqueId));
happy = result.Type == ResultType.BulkString && (!establishConnection || result.IsEqual(connection.BridgeCouldBeNull?.Multiplexer?.UniqueId));
break;
case RedisCommand.PING:
happy = result.Type == ResultType.SimpleString && result.IsEqual(RedisLiterals.BytesPONG);
......@@ -1854,7 +1865,7 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes
}
if (happy)
{
if (establishConnection) connection.Bridge.OnFullyEstablished(connection);
if (establishConnection) connection.BridgeCouldBeNull?.OnFullyEstablished(connection);
SetResult(message, happy);
return true;
}
......
......@@ -480,12 +480,15 @@ internal void OnFullyEstablished(PhysicalConnection connection)
try
{
if (connection == null) return;
var bridge = connection.Bridge;
if (bridge == subscription)
var bridge = connection.BridgeCouldBeNull;
if (bridge != null)
{
Multiplexer.ResendSubscriptions(this);
if (bridge == subscription)
{
Multiplexer.ResendSubscriptions(this);
}
Multiplexer.OnConnectionRestored(EndPoint, bridge.ConnectionType);
}
Multiplexer.OnConnectionRestored(EndPoint, bridge.ConnectionType);
}
catch (Exception ex)
{
......@@ -630,7 +633,16 @@ internal void WriteDirectOrQueueFireAndForget<T>(PhysicalConnection connection,
else
{
Multiplexer.Trace("Writing direct: " + message);
connection.Bridge.WriteMessageTakingWriteLock(connection, message);
var bridge = connection.BridgeCouldBeNull;
if (bridge == null)
{
throw new ObjectDisposedException(connection.ToString());
}
else
{
bridge.WriteMessageTakingWriteLock(connection, message);
}
}
}
}
......@@ -676,14 +688,19 @@ private Task HandshakeAsync(PhysicalConnection connection, TextWriter log)
}
}
var connType = connection.Bridge.ConnectionType;
var bridge = connection.BridgeCouldBeNull;
if (bridge == null)
{
return Task.CompletedTask;
}
var connType = bridge.ConnectionType;
if (connType == ConnectionType.Interactive)
{
Multiplexer.LogLocked(log, "Auto-configure...");
AutoConfigure(connection);
}
Multiplexer.LogLocked(log, "Sending critical tracer: {0}", connection.Bridge);
Multiplexer.LogLocked(log, "Sending critical tracer: {0}", bridge);
var tracer = GetTracerMessage(true);
tracer = LoggingMessage.Create(log, tracer);
WriteDirectOrQueueFireAndForget(connection, tracer, ResultProcessor.EstablishConnection);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment