Commit 75208257 authored by Marc Gravell's avatar Marc Gravell

update to current lib

parent 8422a46c
...@@ -7,6 +7,10 @@ internal static class CompletionManagerHelpers ...@@ -7,6 +7,10 @@ internal static class CompletionManagerHelpers
{ {
public static void CompleteSyncOrAsync(this PhysicalBridge bridge, ICompletable operation) public static void CompleteSyncOrAsync(this PhysicalBridge bridge, ICompletable operation)
=> CompletionManager.CompleteSyncOrAsyncImpl(bridge?.completionManager, operation); => CompletionManager.CompleteSyncOrAsyncImpl(bridge?.completionManager, operation);
public static void CompleteAsync(this PhysicalBridge bridge, ICompletable operation)
=> CompletionManager.CompleteAsync(bridge?.completionManager, operation);
public static void CompleteSyncOrAsync(this CompletionManager manager, ICompletable operation) public static void CompleteSyncOrAsync(this CompletionManager manager, ICompletable operation)
=> CompletionManager.CompleteSyncOrAsyncImpl(manager, operation); => CompletionManager.CompleteSyncOrAsyncImpl(manager, operation);
} }
...@@ -19,6 +23,20 @@ internal static void CompleteSyncOrAsyncImpl(CompletionManager manager, IComplet ...@@ -19,6 +23,20 @@ internal static void CompleteSyncOrAsyncImpl(CompletionManager manager, IComplet
else SharedCompleteSyncOrAsync(operation); else SharedCompleteSyncOrAsync(operation);
} }
internal static void CompleteAsync(CompletionManager manager, ICompletable operation)
{
var sched = manager.multiplexer.SocketManager;
if (sched != null)
{
sched.ScheduleTask(s_AnyOrderCompletionHandler, operation);
Interlocked.Increment(ref manager.completedAsync);
}
else
{
SocketManager.Shared.ScheduleTask(s_AnyOrderCompletionHandler, operation);
}
}
private readonly ConnectionMultiplexer multiplexer; private readonly ConnectionMultiplexer multiplexer;
private readonly string name; private readonly string name;
......
...@@ -590,14 +590,8 @@ internal bool ComputeResult(PhysicalConnection connection, RawResult result) ...@@ -590,14 +590,8 @@ internal bool ComputeResult(PhysicalConnection connection, RawResult result)
if (box != null && box.IsFaulted) return false; // already failed (timeout, etc) if (box != null && box.IsFaulted) return false; // already failed (timeout, etc)
if (resultProcessor == null) return true; if (resultProcessor == null) return true;
var accepted = resultProcessor.SetResult(connection, this, result); // false here would be things like resends (MOVED) - the message is not yet complete
if (!accepted) return resultProcessor.SetResult(connection, this, result);
{
var ex = new InvalidOperationException("Message rejected");
ex.Data.Add("got", result.ToString());
connection?.BridgeCouldBeNull?.Multiplexer?.OnMessageFaulted(this, ex);
}
return accepted;
} }
catch (Exception ex) catch (Exception ex)
{ {
......
...@@ -1215,12 +1215,13 @@ internal async ValueTask<bool> ConnectedAsync(Socket socket, TextWriter log, Soc ...@@ -1215,12 +1215,13 @@ internal async ValueTask<bool> ConnectedAsync(Socket socket, TextWriter log, Soc
private void MatchResult(RawResult result) private void MatchResult(RawResult result)
{ {
var muxer = BridgeCouldBeNull?.Multiplexer;
if (muxer == null) return;
// check to see if it could be an out-of-band pubsub message // check to see if it could be an out-of-band pubsub message
if (connectionType == ConnectionType.Subscription && result.Type == ResultType.MultiBulk) if (connectionType == ConnectionType.Subscription && result.Type == ResultType.MultiBulk)
{ // out of band message does not match to a queued message {
var muxer = BridgeCouldBeNull?.Multiplexer;
if (muxer == null) return;
// out of band message does not match to a queued message
var items = result.GetItems(); var items = result.GetItems();
if (items.Length >= 3 && items[0].IsEqual(message)) if (items.Length >= 3 && items[0].IsEqual(message))
{ {
...@@ -1275,9 +1276,10 @@ private void MatchResult(RawResult result) ...@@ -1275,9 +1276,10 @@ private void MatchResult(RawResult result)
} }
Trace("Response to: " + msg); Trace("Response to: " + msg);
if (msg.ComputeResult(this, result)) if (msg.ComputeResult(this, result) && !msg.TryComplete(false))
{ { // got a result, and we couldn't complete it synchronously;
BridgeCouldBeNull.CompleteSyncOrAsync(msg); // note that we want to complete it async instead
BridgeCouldBeNull.CompleteAsync(msg);
} }
} }
......
using System; using System;
using System.Collections.Generic;
using System.IO; using System.IO;
using System.IO.Pipelines; using System.IO.Pipelines;
using System.Net; using System.Net;
...@@ -26,7 +27,7 @@ public sealed partial class SocketManager : IDisposable ...@@ -26,7 +27,7 @@ public sealed partial class SocketManager : IDisposable
/// </summary> /// </summary>
/// <param name="name">The name for this <see cref="SocketManager"/>.</param> /// <param name="name">The name for this <see cref="SocketManager"/>.</param>
public SocketManager(string name = null) public SocketManager(string name = null)
: this(name, false, DEFAULT_MIN_THREADS, DEFAULT_MAX_THREADS) { } : this(name, false, DEFAULT_WORKERS) { }
/// <summary> /// <summary>
/// Default / shared socket manager /// Default / shared socket manager
...@@ -40,7 +41,7 @@ public static SocketManager Shared ...@@ -40,7 +41,7 @@ public static SocketManager Shared
try try
{ {
// note: we'll allow a higher max thread count on the shared one // note: we'll allow a higher max thread count on the shared one
shared = new SocketManager("DefaultSocketManager", false, DEFAULT_MIN_THREADS, DEFAULT_MAX_THREADS * 2); shared = new SocketManager("DefaultSocketManager", false, DEFAULT_WORKERS * 2);
if (Interlocked.CompareExchange(ref _shared, shared, null) == null) if (Interlocked.CompareExchange(ref _shared, shared, null) == null)
shared = null; shared = null;
} }
...@@ -67,11 +68,11 @@ public override string ToString() ...@@ -67,11 +68,11 @@ public override string ToString()
/// <param name="name">The name for this <see cref="SocketManager"/>.</param> /// <param name="name">The name for this <see cref="SocketManager"/>.</param>
/// <param name="useHighPrioritySocketThreads">Whether this <see cref="SocketManager"/> should use high priority sockets.</param> /// <param name="useHighPrioritySocketThreads">Whether this <see cref="SocketManager"/> should use high priority sockets.</param>
public SocketManager(string name, bool useHighPrioritySocketThreads) public SocketManager(string name, bool useHighPrioritySocketThreads)
: this(name, useHighPrioritySocketThreads, DEFAULT_MIN_THREADS, DEFAULT_MAX_THREADS) { } : this(name, useHighPrioritySocketThreads, DEFAULT_WORKERS) { }
private const int DEFAULT_MIN_THREADS = 1, DEFAULT_MAX_THREADS = 5, MINIMUM_SEGMENT_SIZE = 8 * 1024; private const int DEFAULT_WORKERS = 5, MINIMUM_SEGMENT_SIZE = 8 * 1024;
private SocketManager(string name, bool useHighPrioritySocketThreads, int minThreads, int maxThreads) private SocketManager(string name, bool useHighPrioritySocketThreads, int workerCount)
{ {
if (string.IsNullOrWhiteSpace(name)) name = GetType().Name; if (string.IsNullOrWhiteSpace(name)) name = GetType().Name;
Name = name; Name = name;
...@@ -81,12 +82,12 @@ private SocketManager(string name, bool useHighPrioritySocketThreads, int minThr ...@@ -81,12 +82,12 @@ private SocketManager(string name, bool useHighPrioritySocketThreads, int minThr
var defaultPipeOptions = PipeOptions.Default; var defaultPipeOptions = PipeOptions.Default;
_schedulerPool = new DedicatedThreadPoolPipeScheduler(name + ":IO", _schedulerPool = new DedicatedThreadPoolPipeScheduler(name + ":IO",
minWorkers: minThreads, maxWorkers: maxThreads, workerCount: workerCount,
priority: useHighPrioritySocketThreads ? ThreadPriority.AboveNormal : ThreadPriority.Normal); priority: useHighPrioritySocketThreads ? ThreadPriority.AboveNormal : ThreadPriority.Normal);
SendPipeOptions = new PipeOptions( SendPipeOptions = new PipeOptions(
pool: defaultPipeOptions.Pool, pool: defaultPipeOptions.Pool,
readerScheduler: _schedulerPool, // copying from the outbound Pipe to the socket should happen on the worker, to release the lock ASAP readerScheduler: _schedulerPool, // copying from the outbound Pipe to the socket should happen on the worker, to release the lock ASAP
writerScheduler: PipeScheduler.Inline, // it is fine for FlushAsync to run inline - after the handshake, we just `Wait()` on this, not `await` writerScheduler: _schedulerPool,
pauseWriterThreshold: defaultPipeOptions.PauseWriterThreshold, pauseWriterThreshold: defaultPipeOptions.PauseWriterThreshold,
resumeWriterThreshold: defaultPipeOptions.ResumeWriterThreshold, resumeWriterThreshold: defaultPipeOptions.ResumeWriterThreshold,
minimumSegmentSize: Math.Max(defaultPipeOptions.MinimumSegmentSize, MINIMUM_SEGMENT_SIZE), minimumSegmentSize: Math.Max(defaultPipeOptions.MinimumSegmentSize, MINIMUM_SEGMENT_SIZE),
...@@ -94,14 +95,14 @@ private SocketManager(string name, bool useHighPrioritySocketThreads, int minThr ...@@ -94,14 +95,14 @@ private SocketManager(string name, bool useHighPrioritySocketThreads, int minThr
ReceivePipeOptions = new PipeOptions( ReceivePipeOptions = new PipeOptions(
pool: defaultPipeOptions.Pool, pool: defaultPipeOptions.Pool,
readerScheduler: PipeScheduler.Inline, // let the IO thread stomp all over the place on the receives readerScheduler: PipeScheduler.Inline, // let the IO thread stomp all over the place on the receives
writerScheduler: PipeScheduler.Inline, // let the IO thread stomp all over the place on the receive writerScheduler: _schedulerPool,
pauseWriterThreshold: Receive_PauseWriterThreshold, pauseWriterThreshold: Receive_PauseWriterThreshold,
resumeWriterThreshold: Receive_ResumeWriterThreshold, resumeWriterThreshold: Receive_ResumeWriterThreshold,
minimumSegmentSize: Math.Max(defaultPipeOptions.MinimumSegmentSize, MINIMUM_SEGMENT_SIZE), minimumSegmentSize: Math.Max(defaultPipeOptions.MinimumSegmentSize, MINIMUM_SEGMENT_SIZE),
useSynchronizationContext: false); useSynchronizationContext: false);
_completionPool = new DedicatedThreadPoolPipeScheduler(name + ":Completion", _completionPool = new DedicatedThreadPoolPipeScheduler(name + ":Completion",
minWorkers: 1, maxWorkers: maxThreads, useThreadPoolQueueLength: 1); workerCount: workerCount, useThreadPoolQueueLength: 1);
} }
private DedicatedThreadPoolPipeScheduler _schedulerPool, _completionPool; private DedicatedThreadPoolPipeScheduler _schedulerPool, _completionPool;
...@@ -162,7 +163,7 @@ internal static Socket CreateSocket(EndPoint endpoint) ...@@ -162,7 +163,7 @@ internal static Socket CreateSocket(EndPoint endpoint)
internal string GetState() internal string GetState()
{ {
var s = _schedulerPool; var s = _schedulerPool;
return s == null ? null : $"{s.BusyCount} of {s.WorkerCount} busy ({s.MaxWorkerCount} max)"; return s == null ? null : $"{s.AvailableCount} of {s.WorkerCount} available";
} }
internal void ScheduleTask(Action<object> action, object state) internal void ScheduleTask(Action<object> action, object state)
......
...@@ -19,7 +19,7 @@ ...@@ -19,7 +19,7 @@
</PropertyGroup> </PropertyGroup>
<ItemGroup> <ItemGroup>
<PackageReference Include="Pipelines.Sockets.Unofficial" Version="0.2.1-alpha.84" /> <PackageReference Include="Pipelines.Sockets.Unofficial" Version="0.2.1-alpha.86" />
<PackageReference Include="System.Diagnostics.PerformanceCounter" Version="4.5.0" /> <PackageReference Include="System.Diagnostics.PerformanceCounter" Version="4.5.0" />
<PackageReference Include="System.IO.Pipelines" Version="4.5.0" /> <PackageReference Include="System.IO.Pipelines" Version="4.5.0" />
<PackageReference Include="System.Threading.Channels" Version="4.5.0" /> <PackageReference Include="System.Threading.Channels" Version="4.5.0" />
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment