Commit 693f3e44 authored by Marc Gravell's avatar Marc Gravell

move to a shared socketmanager by default; remove existing use of...

move to a shared socketmanager by default; remove existing use of SocketManager in tests - prefer the default behaviour - TODO: add explicit tests that focus on the new SocketManager reality
parent d2a8296d
...@@ -27,8 +27,7 @@ static BookSleeveTestBase() ...@@ -27,8 +27,7 @@ static BookSleeveTestBase()
public static string CreateUniqueName() => Guid.NewGuid().ToString("N"); public static string CreateUniqueName() => Guid.NewGuid().ToString("N");
internal static IServer GetServer(ConnectionMultiplexer conn) => conn.GetServer(conn.GetEndPoints()[0]); internal static IServer GetServer(ConnectionMultiplexer conn) => conn.GetServer(conn.GetEndPoints()[0]);
private static readonly SocketManager socketManager = new SocketManager(nameof(BookSleeveTestBase));
internal static ConnectionMultiplexer GetRemoteConnection(bool open = true, bool allowAdmin = false, bool waitForOpen = false, int syncTimeout = 5000, int ioTimeout = 5000) internal static ConnectionMultiplexer GetRemoteConnection(bool open = true, bool allowAdmin = false, bool waitForOpen = false, int syncTimeout = 5000, int ioTimeout = 5000)
{ {
return GetConnection(TestConfig.Current.RemoteServer, TestConfig.Current.RemotePort, open, allowAdmin, waitForOpen, syncTimeout, ioTimeout); return GetConnection(TestConfig.Current.RemoteServer, TestConfig.Current.RemotePort, open, allowAdmin, waitForOpen, syncTimeout, ioTimeout);
...@@ -41,7 +40,6 @@ private static ConnectionMultiplexer GetConnection(string host, int port, bool o ...@@ -41,7 +40,6 @@ private static ConnectionMultiplexer GetConnection(string host, int port, bool o
EndPoints = { { host, port } }, EndPoints = { { host, port } },
AllowAdmin = allowAdmin, AllowAdmin = allowAdmin,
SyncTimeout = syncTimeout, SyncTimeout = syncTimeout,
SocketManager = socketManager,
ResponseTimeout = ioTimeout ResponseTimeout = ioTimeout
}; };
var conn = ConnectionMultiplexer.Connect(options); var conn = ConnectionMultiplexer.Connect(options);
...@@ -67,7 +65,6 @@ internal static ConnectionMultiplexer GetSecuredConnection() ...@@ -67,7 +65,6 @@ internal static ConnectionMultiplexer GetSecuredConnection()
EndPoints = { { TestConfig.Current.SecureServer, TestConfig.Current.SecurePort } }, EndPoints = { { TestConfig.Current.SecureServer, TestConfig.Current.SecurePort } },
Password = "changeme", Password = "changeme",
SyncTimeout = 6000, SyncTimeout = 6000,
SocketManager = socketManager
}; };
var conn = ConnectionMultiplexer.Connect(options); var conn = ConnectionMultiplexer.Connect(options);
conn.InternalError += (s, args) => Trace.WriteLine(args.Exception.Message, args.Origin); conn.InternalError += (s, args) => Trace.WriteLine(args.Exception.Message, args.Origin);
......
...@@ -169,18 +169,16 @@ public async Task DeslaveGoesToPrimary() ...@@ -169,18 +169,16 @@ public async Task DeslaveGoesToPrimary()
} }
} }
[Theory] [Fact]
[InlineData(false)] public async Task SubscriptionsSurviveMasterSwitchAsync()
[InlineData(true)]
public async Task SubscriptionsSurviveMasterSwitchAsync(bool useSharedSocketManager)
{ {
if (RunningInCI) if (RunningInCI)
{ {
Skip.Inconclusive("TODO: Fix race in broadcast reconfig a zero latency."); Skip.Inconclusive("TODO: Fix race in broadcast reconfig a zero latency.");
} }
using (var a = Create(allowAdmin: true, useSharedSocketManager: useSharedSocketManager)) using (var a = Create(allowAdmin: true))
using (var b = Create(allowAdmin: true, useSharedSocketManager: useSharedSocketManager)) using (var b = Create(allowAdmin: true))
{ {
RedisChannel channel = Me(); RedisChannel channel = Me();
var subA = a.GetSubscriber(); var subA = a.GetSubscriber();
......
...@@ -26,7 +26,7 @@ public void SingleKeyLength() ...@@ -26,7 +26,7 @@ public void SingleKeyLength()
[Fact] [Fact]
public void MultiKeyLength() public void MultiKeyLength()
{ {
using (var conn = Create(useSharedSocketManager: true)) using (var conn = Create())
{ {
var db = conn.GetDatabase(); var db = conn.GetDatabase();
RedisKey[] keys = { "hll1", "hll2", "hll3" }; RedisKey[] keys = { "hll1", "hll2", "hll3" };
...@@ -39,4 +39,4 @@ public void MultiKeyLength() ...@@ -39,4 +39,4 @@ public void MultiKeyLength()
} }
} }
} }
} }
\ No newline at end of file
...@@ -33,7 +33,7 @@ public void FlushFetchRandomKey() ...@@ -33,7 +33,7 @@ public void FlushFetchRandomKey()
{ {
using (var conn = Create(allowAdmin: true)) using (var conn = Create(allowAdmin: true))
{ {
var db = conn.GetDatabase(7); var db = conn.GetDatabase(14);
conn.GetServer(TestConfig.Current.MasterServerAndPort).FlushDatabase(); conn.GetServer(TestConfig.Current.MasterServerAndPort).FlushDatabase();
string anyKey = db.KeyRandom(); string anyKey = db.KeyRandom();
......
...@@ -25,7 +25,6 @@ protected TestBase(ITestOutputHelper output) ...@@ -25,7 +25,6 @@ protected TestBase(ITestOutputHelper output)
Output = output; Output = output;
Output.WriteFrameworkVersion(); Output.WriteFrameworkVersion();
Writer = new TextWriterOutputHelper(output); Writer = new TextWriterOutputHelper(output);
socketManager = new SocketManager(GetType().Name);
ClearAmbientFailures(); ClearAmbientFailures();
} }
...@@ -38,12 +37,9 @@ protected void CollectGarbage() ...@@ -38,12 +37,9 @@ protected void CollectGarbage()
} }
} }
private readonly SocketManager socketManager;
[System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Design", "CA1063:ImplementIDisposableCorrectly")] [System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Design", "CA1063:ImplementIDisposableCorrectly")]
public void Dispose() public void Dispose()
{ {
socketManager?.Dispose();
Teardown(); Teardown();
} }
...@@ -199,7 +195,7 @@ protected IServer GetAnyMaster(ConnectionMultiplexer muxer) ...@@ -199,7 +195,7 @@ protected IServer GetAnyMaster(ConnectionMultiplexer muxer)
int? connectTimeout = null, string password = null, string tieBreaker = null, TextWriter log = null, int? connectTimeout = null, string password = null, string tieBreaker = null, TextWriter log = null,
bool fail = true, string[] disabledCommands = null, string[] enabledCommands = null, bool fail = true, string[] disabledCommands = null, string[] enabledCommands = null,
bool checkConnect = true, bool pause = true, string failMessage = null, bool checkConnect = true, bool pause = true, string failMessage = null,
string channelPrefix = null, bool useSharedSocketManager = true, Proxy? proxy = null, string channelPrefix = null, Proxy? proxy = null,
[CallerMemberName] string caller = null) [CallerMemberName] string caller = null)
{ {
if (pause) Thread.Sleep(250); // get a lot of glitches when hammering new socket creations etc; pace it out a bit if (pause) Thread.Sleep(250); // get a lot of glitches when hammering new socket creations etc; pace it out a bit
...@@ -219,7 +215,6 @@ protected IServer GetAnyMaster(ConnectionMultiplexer muxer) ...@@ -219,7 +215,6 @@ protected IServer GetAnyMaster(ConnectionMultiplexer muxer)
syncTimeout = int.MaxValue; syncTimeout = int.MaxValue;
} }
if (useSharedSocketManager) config.SocketManager = socketManager;
if (channelPrefix != null) config.ChannelPrefix = channelPrefix; if (channelPrefix != null) config.ChannelPrefix = channelPrefix;
if (tieBreaker != null) config.TieBreaker = tieBreaker; if (tieBreaker != null) config.TieBreaker = tieBreaker;
if (password != null) config.Password = string.IsNullOrEmpty(password) ? null : password; if (password != null) config.Password = string.IsNullOrEmpty(password) ? null : password;
......
...@@ -24,6 +24,6 @@ ...@@ -24,6 +24,6 @@
</ItemGroup> </ItemGroup>
<ItemGroup> <ItemGroup>
<PackageReference Include="Pipelines.Sockets.Unofficial" Version="0.2.1-alpha.52" /> <PackageReference Include="Pipelines.Sockets.Unofficial" Version="0.2.1-alpha.53" />
</ItemGroup> </ItemGroup>
</Project> </Project>
\ No newline at end of file
...@@ -2,21 +2,16 @@ ...@@ -2,21 +2,16 @@
{ {
public partial class ConnectionMultiplexer public partial class ConnectionMultiplexer
{ {
internal SocketManager SocketManager => socketManager; internal SocketManager SocketManager { get; private set; }
private SocketManager socketManager;
private bool ownsSocketManager;
partial void OnCreateReaderWriter(ConfigurationOptions configuration) partial void OnCreateReaderWriter(ConfigurationOptions configuration)
{ {
ownsSocketManager = configuration.SocketManager == null; SocketManager = configuration.SocketManager ?? SocketManager.Shared;
socketManager = configuration.SocketManager ?? new SocketManager(ClientName, configuration.HighPrioritySocketThreads);
} }
partial void OnCloseReaderWriter() partial void OnCloseReaderWriter()
{ {
if (ownsSocketManager) socketManager?.Dispose(); SocketManager = null;
socketManager = null;
} }
partial void OnWriterCreated(); partial void OnWriterCreated();
} }
......
...@@ -598,7 +598,7 @@ private static bool AllComplete(Task[] tasks) ...@@ -598,7 +598,7 @@ private static bool AllComplete(Task[] tasks)
return true; return true;
} }
private async Task<bool> WaitAllIgnoreErrorsAsync(Task[] tasks, int timeoutMilliseconds, TextWriter log) private async Task<bool> WaitAllIgnoreErrorsAsync(Task[] tasks, int timeoutMilliseconds, TextWriter log, [CallerMemberName] string caller = null, [CallerLineNumber] int callerLineNumber = 0)
{ {
if (tasks == null) throw new ArgumentNullException(nameof(tasks)); if (tasks == null) throw new ArgumentNullException(nameof(tasks));
if (tasks.Length == 0) if (tasks.Length == 0)
...@@ -628,7 +628,7 @@ private async Task<bool> WaitAllIgnoreErrorsAsync(Task[] tasks, int timeoutMilli ...@@ -628,7 +628,7 @@ private async Task<bool> WaitAllIgnoreErrorsAsync(Task[] tasks, int timeoutMilli
var allTasks = Task.WhenAll(tasks).ObserveErrors(); var allTasks = Task.WhenAll(tasks).ObserveErrors();
var any = Task.WhenAny(allTasks, Task.Delay(remaining)).ObserveErrors(); var any = Task.WhenAny(allTasks, Task.Delay(remaining)).ObserveErrors();
bool all = await any.ForAwait() == allTasks; bool all = await any.ForAwait() == allTasks;
LogLockedWithThreadPoolStats(log, all ? "All tasks completed cleanly" : "Not all tasks completed cleanly", out busyWorkerCount); LogLockedWithThreadPoolStats(log, all ? "All tasks completed cleanly" : $"Not all tasks completed cleanly (from {caller}#{callerLineNumber}, timeout {timeoutMilliseconds}ms)", out busyWorkerCount);
return all; return all;
} }
catch catch
...@@ -2015,8 +2015,11 @@ internal T ExecuteSyncImpl<T>(Message message, ResultProcessor<T> processor, Ser ...@@ -2015,8 +2015,11 @@ internal T ExecuteSyncImpl<T>(Message message, ResultProcessor<T> processor, Ser
data = new List<Tuple<string, string>> { Tuple.Create("Message", message.CommandAndKey) }; data = new List<Tuple<string, string>> { Tuple.Create("Message", message.CommandAndKey) };
void add(string lk, string sk, string v) void add(string lk, string sk, string v)
{ {
data.Add(Tuple.Create(lk, v)); if (v != null)
sb.Append(", ").Append(sk).Append(": ").Append(v); {
data.Add(Tuple.Create(lk, v));
sb.Append(", ").Append(sk).Append(": ").Append(v);
}
} }
int queue = server.GetOutstandingCount(message.Command, out int inst, out int qs, out int qc, out int @in); int queue = server.GetOutstandingCount(message.Command, out int inst, out int qs, out int qc, out int @in);
...@@ -2025,6 +2028,7 @@ void add(string lk, string sk, string v) ...@@ -2025,6 +2028,7 @@ void add(string lk, string sk, string v)
add("Queue-Awaiting-Response", "qs", qs.ToString()); add("Queue-Awaiting-Response", "qs", qs.ToString());
add("Queue-Completion-Outstanding", "qc", qc.ToString()); add("Queue-Completion-Outstanding", "qc", qc.ToString());
add("Inbound-Bytes", "in", @in.ToString()); add("Inbound-Bytes", "in", @in.ToString());
add("Manager", "mgr", SocketManager?.GetState());
add("Client-Name", "clientName", ClientName); add("Client-Name", "clientName", ClientName);
add("Server-Endpoint", "serverEndpoint", server.EndPoint.ToString()); add("Server-Endpoint", "serverEndpoint", server.EndPoint.ToString());
......
...@@ -213,7 +213,7 @@ void add(string lk, string sk, string v) ...@@ -213,7 +213,7 @@ void add(string lk, string sk, string v)
add("Unanswered-Write", "unanswered-write", (unchecked(now - unansweredRead) / 1000) + "s ago"); add("Unanswered-Write", "unanswered-write", (unchecked(now - unansweredRead) / 1000) + "s ago");
add("Keep-Alive", "keep-alive", Bridge.ServerEndPoint.WriteEverySeconds + "s"); add("Keep-Alive", "keep-alive", Bridge.ServerEndPoint.WriteEverySeconds + "s");
add("Previous-Physical-State", "state", oldState.ToString()); add("Previous-Physical-State", "state", oldState.ToString());
add("Manager", "mgr", Multiplexer?.SocketManager?.GetState());
if (@in >= 0) if (@in >= 0)
{ {
add("Inbound-Bytes", "in", @in.ToString()); add("Inbound-Bytes", "in", @in.ToString());
......
...@@ -105,7 +105,7 @@ internal enum ManagerState ...@@ -105,7 +105,7 @@ internal enum ManagerState
ProcessReadQueue, ProcessReadQueue,
ProcessErrorQueue, ProcessErrorQueue,
} }
/// <summary> /// <summary>
/// Gets the name of this SocketManager instance /// Gets the name of this SocketManager instance
/// </summary> /// </summary>
...@@ -115,7 +115,26 @@ internal enum ManagerState ...@@ -115,7 +115,26 @@ internal enum ManagerState
/// Creates a new (optionally named) <see cref="SocketManager"/> instance /// Creates a new (optionally named) <see cref="SocketManager"/> instance
/// </summary> /// </summary>
/// <param name="name">The name for this <see cref="SocketManager"/>.</param> /// <param name="name">The name for this <see cref="SocketManager"/>.</param>
public SocketManager(string name = null) : this(name, true) { } public SocketManager(string name = null)
: this(name, false, DEFAULT_MIN_THREADS, DEFAULT_MAX_THREADS) { }
internal static SocketManager Shared
{
get
{
var shared = _shared;
if (shared != null) return _shared;
try
{
// note: we'll allow a higher max thread count on the shared one
shared = new SocketManager("DefaultSocketManager", false, DEFAULT_MIN_THREADS, DEFAULT_MAX_THREADS * 2);
if (Interlocked.CompareExchange(ref _shared, shared, null) == null)
shared = null;
} finally { shared?.Dispose(); }
return Interlocked.CompareExchange(ref _shared, null, null);
}
}
private static SocketManager _shared;
/// <summary> /// <summary>
/// Creates a new <see cref="SocketManager"/> instance /// Creates a new <see cref="SocketManager"/> instance
...@@ -123,6 +142,10 @@ internal enum ManagerState ...@@ -123,6 +142,10 @@ internal enum ManagerState
/// <param name="name">The name for this <see cref="SocketManager"/>.</param> /// <param name="name">The name for this <see cref="SocketManager"/>.</param>
/// <param name="useHighPrioritySocketThreads">Whether this <see cref="SocketManager"/> should use high priority sockets.</param> /// <param name="useHighPrioritySocketThreads">Whether this <see cref="SocketManager"/> should use high priority sockets.</param>
public SocketManager(string name, bool useHighPrioritySocketThreads) public SocketManager(string name, bool useHighPrioritySocketThreads)
: this(name, useHighPrioritySocketThreads, DEFAULT_MIN_THREADS, DEFAULT_MAX_THREADS) {}
const int DEFAULT_MIN_THREADS = 1, DEFAULT_MAX_THREADS = 5;
private SocketManager(string name, bool useHighPrioritySocketThreads, int minThreads, int maxThreads)
{ {
if (string.IsNullOrWhiteSpace(name)) name = GetType().Name; if (string.IsNullOrWhiteSpace(name)) name = GetType().Name;
Name = name; Name = name;
...@@ -130,7 +153,9 @@ public SocketManager(string name, bool useHighPrioritySocketThreads) ...@@ -130,7 +153,9 @@ public SocketManager(string name, bool useHighPrioritySocketThreads)
const int Receive_PauseWriterThreshold = 1024 * 1024 * 1024; // let's give it up to 1GiB of buffer for now const int Receive_PauseWriterThreshold = 1024 * 1024 * 1024; // let's give it up to 1GiB of buffer for now
var defaultPipeOptions = PipeOptions.Default; var defaultPipeOptions = PipeOptions.Default;
_scheduler = new DedicatedThreadPoolPipeScheduler(name, priority: useHighPrioritySocketThreads ? ThreadPriority.AboveNormal : ThreadPriority.Normal); _scheduler = new DedicatedThreadPoolPipeScheduler(name,
minWorkers: minThreads, maxWorkers: maxThreads,
priority: useHighPrioritySocketThreads ? ThreadPriority.AboveNormal :ThreadPriority.Normal);
SendPipeOptions = new PipeOptions( SendPipeOptions = new PipeOptions(
defaultPipeOptions.Pool, _scheduler, _scheduler, defaultPipeOptions.Pool, _scheduler, _scheduler,
pauseWriterThreshold: defaultPipeOptions.PauseWriterThreshold, pauseWriterThreshold: defaultPipeOptions.PauseWriterThreshold,
...@@ -144,7 +169,7 @@ public SocketManager(string name, bool useHighPrioritySocketThreads) ...@@ -144,7 +169,7 @@ public SocketManager(string name, bool useHighPrioritySocketThreads)
defaultPipeOptions.MinimumSegmentSize, defaultPipeOptions.MinimumSegmentSize,
useSynchronizationContext: false); useSynchronizationContext: false);
} }
readonly DedicatedThreadPoolPipeScheduler _scheduler; DedicatedThreadPoolPipeScheduler _scheduler;
internal readonly PipeOptions SendPipeOptions, ReceivePipeOptions; internal readonly PipeOptions SendPipeOptions, ReceivePipeOptions;
private enum CallbackOperation private enum CallbackOperation
...@@ -156,11 +181,25 @@ private enum CallbackOperation ...@@ -156,11 +181,25 @@ private enum CallbackOperation
/// <summary> /// <summary>
/// Releases all resources associated with this instance /// Releases all resources associated with this instance
/// </summary> /// </summary>
public void Dispose() public void Dispose() => Dispose(true);
private void Dispose(bool disposing)
{ {
_scheduler?.Dispose(); // note: the scheduler *can't* be collected by itself - there will
OnDispose(); // be threads, and those threads will be rooting the DedicatedThreadPool;
// but: we can lend a hand! We need to do this even in the finalizer
try { _scheduler?.Dispose(); } catch { }
_scheduler = null;
if (disposing)
{
GC.SuppressFinalize(this);
OnDispose();
}
} }
/// <summary>
/// Releases *appropriate* resources associated with this instance
/// </summary>
~SocketManager() => Dispose(false);
internal SocketToken BeginConnect(EndPoint endpoint, ISocketCallback callback, ConnectionMultiplexer multiplexer, TextWriter log) internal SocketToken BeginConnect(EndPoint endpoint, ISocketCallback callback, ConnectionMultiplexer multiplexer, TextWriter log)
{ {
void RunWithCompletionType(Func<AsyncCallback, IAsyncResult> beginAsync, AsyncCallback asyncCallback) void RunWithCompletionType(Func<AsyncCallback, IAsyncResult> beginAsync, AsyncCallback asyncCallback)
...@@ -303,5 +342,11 @@ private void Shutdown(Socket socket) ...@@ -303,5 +342,11 @@ private void Shutdown(Socket socket)
try { socket.Dispose(); } catch { } try { socket.Dispose(); } catch { }
} }
} }
internal string GetState()
{
var s = _scheduler;
return s == null ? null : $"{s.BusyCount} of {s.WorkerCount} busy ({s.MaxWorkerCount} max)";
}
} }
} }
...@@ -14,7 +14,7 @@ ...@@ -14,7 +14,7 @@
<ProjectReference Include="..\StackExchange.Redis\StackExchange.Redis.csproj" /> <ProjectReference Include="..\StackExchange.Redis\StackExchange.Redis.csproj" />
</ItemGroup> </ItemGroup>
<ItemGroup> <ItemGroup>
<PackageReference Include="Pipelines.Sockets.Unofficial" Version="0.2.1-alpha.52" /> <PackageReference Include="Pipelines.Sockets.Unofficial" Version="0.2.1-alpha.53" />
</ItemGroup> </ItemGroup>
</Project> </Project>
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment