Commit 928f8257 authored by Marc Gravell's avatar Marc Gravell

track the current message so we can give a good error in the case of fail;...

track the current message so we can give a good error in the case of fail; prefer Volatile.Read over CEX /null/null
parent 37fb348e
...@@ -1216,7 +1216,7 @@ internal bool ReconfigureIfNeeded(EndPoint blame, bool fromBroadcast, string cau ...@@ -1216,7 +1216,7 @@ internal bool ReconfigureIfNeeded(EndPoint blame, bool fromBroadcast, string cau
{ {
OnConfigurationChangedBroadcast(blame); OnConfigurationChangedBroadcast(blame);
} }
string activeCause = Interlocked.CompareExchange(ref activeConfigCause, null, null); string activeCause = Volatile.Read(ref activeConfigCause);
if (activeCause == null) if (activeCause == null)
{ {
bool reconfigureAll = fromBroadcast || publishReconfigure; bool reconfigureAll = fromBroadcast || publishReconfigure;
...@@ -2082,8 +2082,9 @@ internal Exception GetException(WriteResult result, Message message, ServerEndPo ...@@ -2082,8 +2082,9 @@ internal Exception GetException(WriteResult result, Message message, ServerEndPo
var bridge = server.GetBridge(message.Command, false); var bridge = server.GetBridge(message.Command, false);
if (bridge != null) if (bridge != null)
{ {
var active = bridge.GetActiveMessage();
bridge.GetOutstandingCount(out var inst, out var qs, out var @in); bridge.GetOutstandingCount(out var inst, out var qs, out var @in);
counters = $", inst={inst}, qs={qs}, in={@in}"; counters = $", inst={inst}, qs={qs}, in={@in}, active={active}";
} }
} }
catch { } catch { }
...@@ -2271,8 +2272,7 @@ private static int GetThreadPoolStats(out string iocp, out string worker) ...@@ -2271,8 +2272,7 @@ private static int GetThreadPoolStats(out string iocp, out string worker)
/// </summary> /// </summary>
public string GetStormLog() public string GetStormLog()
{ {
var result = Interlocked.CompareExchange(ref stormLogSnapshot, null, null); return Volatile.Read(ref stormLogSnapshot);
return result;
} }
/// <summary> /// <summary>
/// Resets the log of unusual busy patterns /// Resets the log of unusual busy patterns
......
...@@ -541,7 +541,7 @@ internal bool TryEnqueue(List<Message> messages, bool isSlave) ...@@ -541,7 +541,7 @@ internal bool TryEnqueue(List<Message> messages, bool isSlave)
private readonly object SingleWriterLock = new object(); private readonly object SingleWriterLock = new object();
private int reentrantCount = 0; private Message _activeMesssage;
/// <summary> /// <summary>
/// This writes a message to the output stream /// This writes a message to the output stream
/// </summary> /// </summary>
...@@ -565,9 +565,11 @@ internal WriteResult WriteMessageTakingWriteLock(PhysicalConnection physical, Me ...@@ -565,9 +565,11 @@ internal WriteResult WriteMessageTakingWriteLock(PhysicalConnection physical, Me
return WriteResult.TimeoutBeforeWrite; return WriteResult.TimeoutBeforeWrite;
} }
if(reentrantCount++ != 0) var existingMessage = Interlocked.CompareExchange(ref _activeMesssage, message, null);
if (existingMessage != null)
{ {
Multiplexer?.OnInfoMessage("reentrant call to WriteMessageTakingWriteLock for " + message.CommandAndKey); Multiplexer?.OnInfoMessage($"reentrant call to WriteMessageTakingWriteLock for {message.CommandAndKey}, {existingMessage.CommandAndKey} is still active");
return WriteResult.NoConnectionAvailable;
} }
physical.SetWriting(); physical.SetWriting();
var messageIsSent = false; var messageIsSent = false;
...@@ -608,7 +610,7 @@ internal WriteResult WriteMessageTakingWriteLock(PhysicalConnection physical, Me ...@@ -608,7 +610,7 @@ internal WriteResult WriteMessageTakingWriteLock(PhysicalConnection physical, Me
{ {
if (haveLock) if (haveLock)
{ {
reentrantCount--; Interlocked.CompareExchange(ref _activeMesssage, null, message); // remove if it is us
Monitor.Exit(SingleWriterLock); Monitor.Exit(SingleWriterLock);
} }
} }
...@@ -834,5 +836,7 @@ internal void SimulateConnectionFailure() ...@@ -834,5 +836,7 @@ internal void SimulateConnectionFailure()
} }
physical?.RecordConnectionFailed(ConnectionFailureType.SocketFailure); physical?.RecordConnectionFailed(ConnectionFailureType.SocketFailure);
} }
internal RedisCommand? GetActiveMessage() => Volatile.Read(ref _activeMesssage)?.Command;
} }
} }
...@@ -56,7 +56,7 @@ private static readonly Message ...@@ -56,7 +56,7 @@ private static readonly Message
private IDuplexPipe _ioPipe; private IDuplexPipe _ioPipe;
private Socket _socket; private Socket _socket;
private Socket VolatileSocket => Interlocked.CompareExchange(ref _socket, null, null); private Socket VolatileSocket => Volatile.Read(ref _socket);
public PhysicalConnection(PhysicalBridge bridge) public PhysicalConnection(PhysicalBridge bridge)
{ {
...@@ -1345,7 +1345,7 @@ private void OnDebugAbort() ...@@ -1345,7 +1345,7 @@ private void OnDebugAbort()
// this CEX is just a hardcore "seriously, read the actual value" - there's no // this CEX is just a hardcore "seriously, read the actual value" - there's no
// convenient "Thread.VolatileRead<T>(ref T field) where T : class", and I don't // convenient "Thread.VolatileRead<T>(ref T field) where T : class", and I don't
// want to make the field volatile just for this one place that needs it // want to make the field volatile just for this one place that needs it
if (isReading && Interlocked.CompareExchange(ref _ioPipe, null, null) == null) if (isReading && Volatile.Read(ref _ioPipe) == null)
{ {
// yeah, that's fine... don't worry about it // yeah, that's fine... don't worry about it
} }
......
...@@ -216,7 +216,7 @@ public Task UnsubscribeFromServer(RedisChannel channel, CommandFlags flags, obje ...@@ -216,7 +216,7 @@ public Task UnsubscribeFromServer(RedisChannel channel, CommandFlags flags, obje
return oldOwner.WriteDirectAsync(msg, ResultProcessor.TrackSubscriptions, asyncState); return oldOwner.WriteDirectAsync(msg, ResultProcessor.TrackSubscriptions, asyncState);
} }
internal ServerEndPoint GetOwner() => Interlocked.CompareExchange(ref owner, null, null); internal ServerEndPoint GetOwner() => Volatile.Read(ref owner);
internal void Resubscribe(RedisChannel channel, ServerEndPoint server) internal void Resubscribe(RedisChannel channel, ServerEndPoint server)
{ {
...@@ -232,7 +232,7 @@ internal void Resubscribe(RedisChannel channel, ServerEndPoint server) ...@@ -232,7 +232,7 @@ internal void Resubscribe(RedisChannel channel, ServerEndPoint server)
internal bool Validate(ConnectionMultiplexer multiplexer, RedisChannel channel) internal bool Validate(ConnectionMultiplexer multiplexer, RedisChannel channel)
{ {
bool changed = false; bool changed = false;
var oldOwner = Interlocked.CompareExchange(ref owner, null, null); var oldOwner = Volatile.Read(ref owner);
if (oldOwner != null && !oldOwner.IsSelectable(RedisCommand.PSUBSCRIBE)) if (oldOwner != null && !oldOwner.IsSelectable(RedisCommand.PSUBSCRIBE))
{ {
if (UnsubscribeFromServer(channel, CommandFlags.FireAndForget, null, true) != null) if (UnsubscribeFromServer(channel, CommandFlags.FireAndForget, null, true) != null)
......
...@@ -42,7 +42,7 @@ internal static SocketManager Shared ...@@ -42,7 +42,7 @@ internal static SocketManager Shared
shared = null; shared = null;
} }
finally { shared?.Dispose(); } finally { shared?.Dispose(); }
return Interlocked.CompareExchange(ref _shared, null, null); return Volatile.Read(ref _shared);
} }
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment