Commit fd7f7a3a authored by Marc Gravell's avatar Marc Gravell

add GC test; add indirection between heartbeat and muxer

parent 6724ed4e
using System;
using System.Threading;
using Xunit;
using Xunit.Abstractions;
namespace StackExchange.Redis.Tests
{
[Collection(NonParallelCollection.Name)] // because I need to measure some things that could get confused
public class GarbageCollectionTests : TestBase
{
public GarbageCollectionTests(ITestOutputHelper helper) : base(helper) { }
private static void ForceGC()
{
for(int i = 0; i < 3; i++)
{
GC.Collect(GC.MaxGeneration, GCCollectionMode.Forced);
GC.WaitForPendingFinalizers();
}
}
#if DEBUG
[Fact]
public void MuxerIsCollected()
{
// first check WeakReference works like we expect
var obj = new object();
var wr = new WeakReference(obj);
obj = null;
ForceGC();
Assert.Null(wr.Target);
var muxer = Create(); // deliberately not "using"
muxer.GetDatabase().Ping();
ForceGC();
int before = ConnectionMultiplexer.CollectedWithoutDispose;
wr = new WeakReference(muxer);
muxer = null;
ForceGC();
int after = ConnectionMultiplexer.CollectedWithoutDispose;
Thread.Sleep(TimeSpan.FromSeconds(60));
Assert.Null(wr.Target);
Assert.Equal(before + 1, after);
}
#endif
}
}
......@@ -23,6 +23,18 @@ public sealed partial class ConnectionMultiplexer : IConnectionMultiplexer, IDis
private static TaskFactory _factory = null;
#if DEBUG
private static int _collectedWithoutDispose;
internal static int CollectedWithoutDispose => Thread.VolatileRead(ref _collectedWithoutDispose);
/// <summary>
/// Invoked by the garbage collector
/// </summary>
~ConnectionMultiplexer()
{
Interlocked.Increment(ref _collectedWithoutDispose);
}
#endif
/// <summary>
/// Provides a way of overriding the default Task Factory. If not set, it will use the default Task.Factory.
/// Useful when top level code sets it's own factory which may interfere with Redis queries.
......@@ -911,7 +923,7 @@ internal EndPoint[] GetEndPoints()
if (_count == 0) return Array.Empty<EndPoint>();
var arr = new EndPoint[_count];
for(int i = 0; i < _count; i++)
for (int i = 0; i < _count; i++)
{
arr[i] = _arr[i].EndPoint;
}
......@@ -978,9 +990,46 @@ private ConnectionMultiplexer(ConfigurationOptions configuration)
partial void OnCreateReaderWriter(ConfigurationOptions configuration);
internal const int MillisecondsPerHeartbeat = 1000;
private sealed class TimerToken
{
public TimerToken(ConnectionMultiplexer muxer)
{
_ref = new WeakReference(muxer);
}
private Timer _timer;
public void SetTimer(Timer timer) => _timer = timer;
private readonly WeakReference _ref;
private static readonly TimerCallback Heartbeat = state =>
{
var token = (TimerToken)state;
var muxer = (ConnectionMultiplexer)(token._ref?.Target);
if (muxer != null)
{
muxer.OnHeartbeat();
}
else
{
// the muxer got disposed from out of us; kill the timer
var tmp = token._timer;
token._timer = null;
if (tmp != null) try { tmp.Dispose(); } catch { }
}
};
internal static IDisposable Create(ConnectionMultiplexer connection)
{
var token = new TimerToken(connection);
var timer = new Timer(Heartbeat, token, MillisecondsPerHeartbeat, MillisecondsPerHeartbeat);
token.SetTimer(timer);
return timer;
}
}
private static readonly TimerCallback heartbeat = state => ((ConnectionMultiplexer)state).OnHeartbeat();
private int _activeHeartbeatErrors;
private void OnHeartbeat()
......@@ -1556,7 +1605,7 @@ internal async Task<bool> ReconfigureAsync(bool first, bool reconfigureAll, Text
if (first)
{
LogLocked(log, "Starting heartbeat...");
pulse = new Timer(heartbeat, this, MillisecondsPerHeartbeat, MillisecondsPerHeartbeat);
pulse = TimerToken.Create(this);
}
if (publishReconfigure)
{
......@@ -1773,7 +1822,7 @@ internal void UpdateClusterRange(ClusterConfiguration configuration)
}
}
private Timer pulse;
private IDisposable pulse;
internal ServerEndPoint SelectServer(Message message)
{
......@@ -1976,6 +2025,7 @@ public async Task CloseAsync(bool allowCommandsToComplete = true)
/// </summary>
public void Dispose()
{
GC.SuppressFinalize(this);
Close(!isDisposed);
}
......@@ -2015,7 +2065,7 @@ internal Exception GetException(WriteResult result, Message message, ServerEndPo
return ExceptionFactory.NoConnectionAvailable(IncludeDetailInExceptions, IncludePerformanceCountersInExceptions, message.Command, message, server, GetServerSnapshot());
case WriteResult.TimeoutBeforeWrite:
return ExceptionFactory.Timeout(IncludeDetailInExceptions, "The timeout was reached before the message could be written to the output buffer, and it was not sent ("
+ Format.ToString(TimeoutMilliseconds)+ "ms)", message, server);
+ Format.ToString(TimeoutMilliseconds) + "ms)", message, server);
case WriteResult.WriteFailure:
default:
return ExceptionFactory.ConnectionFailure(IncludeDetailInExceptions, ConnectionFailureType.ProtocolFailure, "An unknown error occurred when writing the message", server);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment