Commit 2c0707d0 authored by Marc Gravell's avatar Marc Gravell

Try to make sure that QUIT doesn't get stuck to one side with no chance of...

Try to make sure that QUIT doesn't get stuck to one side with no chance of completion; do a better job of completing messages that we never send; stamp time of ambient exceptions
parent 5afb8a5d
...@@ -52,7 +52,7 @@ internal static ConnectionMultiplexer GetWithFT(ITestOutputHelper output) ...@@ -52,7 +52,7 @@ internal static ConnectionMultiplexer GetWithFT(ITestOutputHelper output)
SyncTimeout = 15000, SyncTimeout = 15000,
}; };
var conn = ConnectionMultiplexer.Connect(options); var conn = ConnectionMultiplexer.Connect(options);
conn.MessageFaulted += (msg, ex, origin) => output.WriteLine($"Faulted from '{origin}': '{msg}' - {ex.Message}"); conn.MessageFaulted += (msg, ex, origin) => output.WriteLine($"Faulted from '{origin}': '{msg}' - '{(ex == null ? "(null)" : ex.Message)}'");
var server = conn.GetServer(ep); var server = conn.GetServer(ep);
var arr = (RedisResult[])server.Execute("module", "list"); var arr = (RedisResult[])server.Execute("module", "list");
bool found = false; bool found = false;
......
...@@ -83,13 +83,13 @@ static TestBase() ...@@ -83,13 +83,13 @@ static TestBase()
} }
}; };
} }
private string Time() => DateTime.UtcNow.ToString("T");
protected void OnConnectionFailed(object sender, ConnectionFailedEventArgs e) protected void OnConnectionFailed(object sender, ConnectionFailedEventArgs e)
{ {
Interlocked.Increment(ref privateFailCount); Interlocked.Increment(ref privateFailCount);
lock (privateExceptions) lock (privateExceptions)
{ {
privateExceptions.Add($"Connection failed ({e.FailureType}): {EndPointCollection.ToString(e.EndPoint)}/{e.ConnectionType}: {e.Exception}"); privateExceptions.Add($"{Time()}: Connection failed ({e.FailureType}): {EndPointCollection.ToString(e.EndPoint)}/{e.ConnectionType}: {e.Exception}");
} }
} }
...@@ -98,7 +98,7 @@ protected void OnInternalError(object sender, InternalErrorEventArgs e) ...@@ -98,7 +98,7 @@ protected void OnInternalError(object sender, InternalErrorEventArgs e)
Interlocked.Increment(ref privateFailCount); Interlocked.Increment(ref privateFailCount);
lock (privateExceptions) lock (privateExceptions)
{ {
privateExceptions.Add("Internal error: " + e.Origin + ", " + EndPointCollection.ToString(e.EndPoint) + "/" + e.ConnectionType); privateExceptions.Add(Time() + ": Internal error: " + e.Origin + ", " + EndPointCollection.ToString(e.EndPoint) + "/" + e.ConnectionType);
} }
} }
......
...@@ -128,7 +128,7 @@ internal static string TryGetAzureRoleInstanceIdNoThrow() ...@@ -128,7 +128,7 @@ internal static string TryGetAzureRoleInstanceIdNoThrow()
internal void OnConnectionFailed(EndPoint endpoint, ConnectionType connectionType, ConnectionFailureType failureType, Exception exception, bool reconfigure) internal void OnConnectionFailed(EndPoint endpoint, ConnectionType connectionType, ConnectionFailureType failureType, Exception exception, bool reconfigure)
{ {
if (isDisposed) return; if (_isDisposed) return;
var handler = ConnectionFailed; var handler = ConnectionFailed;
if (handler != null) if (handler != null)
{ {
...@@ -147,7 +147,7 @@ internal void OnInternalError(Exception exception, EndPoint endpoint = null, Con ...@@ -147,7 +147,7 @@ internal void OnInternalError(Exception exception, EndPoint endpoint = null, Con
try try
{ {
Trace("Internal error: " + origin + ", " + exception == null ? "unknown" : exception.Message); Trace("Internal error: " + origin + ", " + exception == null ? "unknown" : exception.Message);
if (isDisposed) return; if (_isDisposed) return;
var handler = InternalError; var handler = InternalError;
if (handler != null) if (handler != null)
{ {
...@@ -163,7 +163,7 @@ internal void OnInternalError(Exception exception, EndPoint endpoint = null, Con ...@@ -163,7 +163,7 @@ internal void OnInternalError(Exception exception, EndPoint endpoint = null, Con
internal void OnConnectionRestored(EndPoint endpoint, ConnectionType connectionType) internal void OnConnectionRestored(EndPoint endpoint, ConnectionType connectionType)
{ {
if (isDisposed) return; if (_isDisposed) return;
var handler = ConnectionRestored; var handler = ConnectionRestored;
if (handler != null) if (handler != null)
{ {
...@@ -176,7 +176,7 @@ internal void OnConnectionRestored(EndPoint endpoint, ConnectionType connectionT ...@@ -176,7 +176,7 @@ internal void OnConnectionRestored(EndPoint endpoint, ConnectionType connectionT
private void OnEndpointChanged(EndPoint endpoint, EventHandler<EndPointEventArgs> handler) private void OnEndpointChanged(EndPoint endpoint, EventHandler<EndPointEventArgs> handler)
{ {
if (isDisposed) return; if (_isDisposed) return;
if (handler != null) if (handler != null)
{ {
UnprocessableCompletionManager.CompleteSyncOrAsync( UnprocessableCompletionManager.CompleteSyncOrAsync(
...@@ -194,7 +194,7 @@ private void OnEndpointChanged(EndPoint endpoint, EventHandler<EndPointEventArgs ...@@ -194,7 +194,7 @@ private void OnEndpointChanged(EndPoint endpoint, EventHandler<EndPointEventArgs
public event EventHandler<RedisErrorEventArgs> ErrorMessage; public event EventHandler<RedisErrorEventArgs> ErrorMessage;
internal void OnErrorMessage(EndPoint endpoint, string message) internal void OnErrorMessage(EndPoint endpoint, string message)
{ {
if (isDisposed) return; if (_isDisposed) return;
var handler = ErrorMessage; var handler = ErrorMessage;
if (handler != null) if (handler != null)
{ {
...@@ -755,8 +755,8 @@ internal ServerEndPoint AnyConnected(ServerType serverType, uint startOffset, Re ...@@ -755,8 +755,8 @@ internal ServerEndPoint AnyConnected(ServerType serverType, uint startOffset, Re
return fallback; return fallback;
} }
private volatile bool isDisposed; private volatile bool _isDisposed;
internal bool IsDisposed => isDisposed; internal bool IsDisposed => _isDisposed;
/// <summary> /// <summary>
/// Create a new ConnectionMultiplexer instance /// Create a new ConnectionMultiplexer instance
...@@ -944,7 +944,7 @@ internal ServerEndPoint GetServerEndPoint(EndPoint endpoint, TextWriter log = nu ...@@ -944,7 +944,7 @@ internal ServerEndPoint GetServerEndPoint(EndPoint endpoint, TextWriter log = nu
server = (ServerEndPoint)servers[endpoint]; server = (ServerEndPoint)servers[endpoint];
if (server == null) if (server == null)
{ {
if (isDisposed) throw new ObjectDisposedException(ToString()); if (_isDisposed) throw new ObjectDisposedException(ToString());
server = new ServerEndPoint(this, endpoint); server = new ServerEndPoint(this, endpoint);
servers.Add(endpoint, server); servers.Add(endpoint, server);
...@@ -1314,7 +1314,7 @@ private void ActivateAllServers(TextWriter log) ...@@ -1314,7 +1314,7 @@ private void ActivateAllServers(TextWriter log)
} }
internal async Task<bool> ReconfigureAsync(bool first, bool reconfigureAll, TextWriter log, EndPoint blame, string cause, bool publishReconfigure = false, CommandFlags publishReconfigureFlags = CommandFlags.None) internal async Task<bool> ReconfigureAsync(bool first, bool reconfigureAll, TextWriter log, EndPoint blame, string cause, bool publishReconfigure = false, CommandFlags publishReconfigureFlags = CommandFlags.None)
{ {
if (isDisposed) throw new ObjectDisposedException(ToString()); if (_isDisposed) throw new ObjectDisposedException(ToString());
bool showStats = true; bool showStats = true;
if (log == null) if (log == null)
...@@ -1949,7 +1949,7 @@ public bool IsConnecting ...@@ -1949,7 +1949,7 @@ public bool IsConnecting
/// <param name="allowCommandsToComplete">Whether to allow all in-queue commands to complete first.</param> /// <param name="allowCommandsToComplete">Whether to allow all in-queue commands to complete first.</param>
public void Close(bool allowCommandsToComplete = true) public void Close(bool allowCommandsToComplete = true)
{ {
isDisposed = true; _isDisposed = true;
_profilingSessionProvider = null; _profilingSessionProvider = null;
using (var tmp = pulse) using (var tmp = pulse)
{ {
...@@ -2004,7 +2004,7 @@ private Task[] QuitAllServers() ...@@ -2004,7 +2004,7 @@ private Task[] QuitAllServers()
/// <param name="allowCommandsToComplete">Whether to allow all in-queue commands to complete first.</param> /// <param name="allowCommandsToComplete">Whether to allow all in-queue commands to complete first.</param>
public async Task CloseAsync(bool allowCommandsToComplete = true) public async Task CloseAsync(bool allowCommandsToComplete = true)
{ {
isDisposed = true; _isDisposed = true;
using (var tmp = pulse) using (var tmp = pulse)
{ {
pulse = null; pulse = null;
...@@ -2025,12 +2025,12 @@ public async Task CloseAsync(bool allowCommandsToComplete = true) ...@@ -2025,12 +2025,12 @@ public async Task CloseAsync(bool allowCommandsToComplete = true)
public void Dispose() public void Dispose()
{ {
GC.SuppressFinalize(this); GC.SuppressFinalize(this);
Close(!isDisposed); Close(!_isDisposed);
} }
internal Task<T> ExecuteAsyncImpl<T>(Message message, ResultProcessor<T> processor, object state, ServerEndPoint server) internal Task<T> ExecuteAsyncImpl<T>(Message message, ResultProcessor<T> processor, object state, ServerEndPoint server)
{ {
if (isDisposed) throw new ObjectDisposedException(ToString()); if (_isDisposed) throw new ObjectDisposedException(ToString());
if (message == null) if (message == null)
{ {
...@@ -2099,7 +2099,7 @@ internal static void ThrowFailed<T>(TaskCompletionSource<T> source, Exception un ...@@ -2099,7 +2099,7 @@ internal static void ThrowFailed<T>(TaskCompletionSource<T> source, Exception un
internal T ExecuteSyncImpl<T>(Message message, ResultProcessor<T> processor, ServerEndPoint server) internal T ExecuteSyncImpl<T>(Message message, ResultProcessor<T> processor, ServerEndPoint server)
{ {
if (isDisposed) throw new ObjectDisposedException(ToString()); if (_isDisposed) throw new ObjectDisposedException(ToString());
if (message == null) // fire-and forget could involve a no-op, represented by null - for example Increment by 0 if (message == null) // fire-and forget could involve a no-op, represented by null - for example Increment by 0
{ {
......
...@@ -126,7 +126,7 @@ public WriteResult TryWrite(Message message, bool isSlave) ...@@ -126,7 +126,7 @@ public WriteResult TryWrite(Message message, bool isSlave)
if (isDisposed) throw new ObjectDisposedException(Name); if (isDisposed) throw new ObjectDisposedException(Name);
if (!IsConnected) if (!IsConnected)
{ {
if (message.IsInternalCall) if (message.IsInternalCall && message.Command != RedisCommand.QUIT)
{ {
// you can go in the queue, but we won't be starting // you can go in the queue, but we won't be starting
// a worker, because the handshake has not completed // a worker, because the handshake has not completed
...@@ -141,12 +141,21 @@ public WriteResult TryWrite(Message message, bool isSlave) ...@@ -141,12 +141,21 @@ public WriteResult TryWrite(Message message, bool isSlave)
else else
{ {
// sorry, we're just not ready for you yet; // sorry, we're just not ready for you yet;
message.Cancel();
Multiplexer?.OnMessageFaulted(message, null);
this.CompleteSyncOrAsync(message);
return WriteResult.NoConnectionAvailable; return WriteResult.NoConnectionAvailable;
} }
} }
var physical = this.physical; var physical = this.physical;
if (physical == null) return WriteResult.NoConnectionAvailable; if (physical == null)
{
message.Cancel();
Multiplexer?.OnMessageFaulted(message, null);
this.CompleteSyncOrAsync(message);
return WriteResult.NoConnectionAvailable;
}
var result = WriteMessageTakingWriteLock(physical, message); var result = WriteMessageTakingWriteLock(physical, message);
LogNonPreferred(message.Flags, isSlave); LogNonPreferred(message.Flags, isSlave);
...@@ -519,23 +528,29 @@ internal bool TryEnqueue(List<Message> messages, bool isSlave) ...@@ -519,23 +528,29 @@ internal bool TryEnqueue(List<Message> messages, bool isSlave)
/// This writes a message to the output stream /// This writes a message to the output stream
/// </summary> /// </summary>
/// <param name="physical">The phsyical connection to write to.</param> /// <param name="physical">The phsyical connection to write to.</param>
/// <param name="next">The message to be written.</param> /// <param name="message">The message to be written.</param>
internal WriteResult WriteMessageTakingWriteLock(PhysicalConnection physical, Message next) internal WriteResult WriteMessageTakingWriteLock(PhysicalConnection physical, Message message)
{ {
Trace("Writing: " + next); Trace("Writing: " + message);
next.SetEnqueued(); message.SetEnqueued();
WriteResult result; WriteResult result;
bool haveLock = false; bool haveLock = false;
Monitor.TryEnter(WriteLock, TimeoutMilliseconds, ref haveLock); Monitor.TryEnter(WriteLock, TimeoutMilliseconds, ref haveLock);
if (!haveLock) return WriteResult.TimeoutBeforeWrite; if (!haveLock)
{
message.Cancel();
Multiplexer?.OnMessageFaulted(message, null);
this.CompleteSyncOrAsync(message);
return WriteResult.TimeoutBeforeWrite;
}
try try
{ {
var messageIsSent = false; var messageIsSent = false;
if (next is IMultiMessage) if (message is IMultiMessage)
{ {
SelectDatabaseInsideWriteLock(physical, next); // need to switch database *before* the transaction SelectDatabaseInsideWriteLock(physical, message); // need to switch database *before* the transaction
foreach (var subCommand in ((IMultiMessage)next).GetMessages(physical)) foreach (var subCommand in ((IMultiMessage)message).GetMessages(physical))
{ {
result = WriteMessageToServerInsideWriteLock(physical, subCommand); result = WriteMessageToServerInsideWriteLock(physical, subCommand);
if (result != WriteResult.Success) if (result != WriteResult.Success)
...@@ -543,24 +558,24 @@ internal WriteResult WriteMessageTakingWriteLock(PhysicalConnection physical, Me ...@@ -543,24 +558,24 @@ internal WriteResult WriteMessageTakingWriteLock(PhysicalConnection physical, Me
// we screwed up; abort; note that WriteMessageToServer already // we screwed up; abort; note that WriteMessageToServer already
// killed the underlying connection // killed the underlying connection
Trace("Unable to write to server"); Trace("Unable to write to server");
next.Fail(ConnectionFailureType.ProtocolFailure, null, "failure before write: " + result.ToString()); message.Fail(ConnectionFailureType.ProtocolFailure, null, "failure before write: " + result.ToString());
this.CompleteSyncOrAsync(next); this.CompleteSyncOrAsync(message);
return result; return result;
} }
//The parent message (next) may be returned from GetMessages //The parent message (next) may be returned from GetMessages
//and should not be marked as sent again below //and should not be marked as sent again below
messageIsSent = messageIsSent || subCommand == next; messageIsSent = messageIsSent || subCommand == message;
} }
if (!messageIsSent) if (!messageIsSent)
{ {
next.SetRequestSent(); // well, it was attempted, at least... message.SetRequestSent(); // well, it was attempted, at least...
} }
result = WriteResult.Success; result = WriteResult.Success;
} }
else else
{ {
result = WriteMessageToServerInsideWriteLock(physical, next); result = WriteMessageToServerInsideWriteLock(physical, message);
} }
result = physical.WakeWriterAndCheckForThrottle(); result = physical.WakeWriterAndCheckForThrottle();
} }
......
...@@ -259,10 +259,10 @@ internal bool Validate(ConnectionMultiplexer multiplexer, RedisChannel channel) ...@@ -259,10 +259,10 @@ internal bool Validate(ConnectionMultiplexer multiplexer, RedisChannel channel)
#endif #endif
[Conditional("TEST")] [Conditional("TEST")]
internal void OnMessageFaulted(Message msg, Exception fault, [CallerMemberName] string origin = null) internal void OnMessageFaulted(Message msg, Exception fault, [CallerMemberName] string origin = default, [CallerFilePath] string path = default, [CallerLineNumber] int lineNumber = default)
{ {
#if TEST #if TEST
MessageFaulted?.Invoke(msg?.CommandAndKey, fault, origin); MessageFaulted?.Invoke(msg?.CommandAndKey, fault, $"{origin} ({path}#{lineNumber})");
#endif #endif
} }
} }
......
...@@ -321,17 +321,24 @@ internal void AutoConfigure(PhysicalConnection connection) ...@@ -321,17 +321,24 @@ internal void AutoConfigure(PhysicalConnection connection)
=> (uint)System.Threading.Interlocked.Increment(ref _nextReplicaOffset); => (uint)System.Threading.Interlocked.Increment(ref _nextReplicaOffset);
internal Task Close(ConnectionType connectionType) internal Task Close(ConnectionType connectionType)
{
try
{ {
var tmp = GetBridge(connectionType, create: false); var tmp = GetBridge(connectionType, create: false);
if (tmp == null || !tmp.IsConnected || !Multiplexer.CommandMap.IsAvailable(RedisCommand.QUIT)) if (tmp == null || !tmp.IsConnected || !Multiplexer.CommandMap.IsAvailable(RedisCommand.QUIT))
{ {
return CompletedTask<bool>.Default(null); return Task.CompletedTask;
} }
else else
{ {
return WriteDirectAsync(Message.Create(-1, CommandFlags.None, RedisCommand.QUIT), ResultProcessor.DemandOK, bridge: tmp); return WriteDirectAsync(Message.Create(-1, CommandFlags.None, RedisCommand.QUIT), ResultProcessor.DemandOK, bridge: tmp);
} }
} }
catch (Exception ex)
{
return Task.FromException(ex);
}
}
internal void FlushScriptCache() internal void FlushScriptCache()
{ {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment