Commit 6aee9105 authored by Marc Gravell's avatar Marc Gravell

remove all the `asyncCompletionQueue` stuff; no longer makes sense

parent 0222f5d8
...@@ -42,9 +42,6 @@ public async Task MassiveBulkOpsAsync(bool withContinuation) ...@@ -42,9 +42,6 @@ public async Task MassiveBulkOpsAsync(bool withContinuation)
Output.WriteLine("{2}: Time for {0} ops: {1}ms ({3}, {4}); ops/s: {5}", AsyncOpsQty, watch.ElapsedMilliseconds, Me(), Output.WriteLine("{2}: Time for {0} ops: {1}ms ({3}, {4}); ops/s: {5}", AsyncOpsQty, watch.ElapsedMilliseconds, Me(),
withContinuation ? "with continuation" : "no continuation", "any order", withContinuation ? "with continuation" : "no continuation", "any order",
AsyncOpsQty / watch.Elapsed.TotalSeconds); AsyncOpsQty / watch.Elapsed.TotalSeconds);
#if DEBUG
Output.WriteLine("Async completion workers: " + (ConnectionMultiplexer.GetAsyncCompletionWorkerCount() - oldAsyncCompletionCount));
#endif
} }
} }
......
...@@ -7,8 +7,6 @@ namespace StackExchange.Redis ...@@ -7,8 +7,6 @@ namespace StackExchange.Redis
{ {
internal sealed partial class CompletionManager internal sealed partial class CompletionManager
{ {
private readonly Queue<ICompletable> asyncCompletionQueue = new Queue<ICompletable>();
private readonly ConnectionMultiplexer multiplexer; private readonly ConnectionMultiplexer multiplexer;
private readonly string name; private readonly string name;
...@@ -38,39 +36,11 @@ public void CompleteSyncOrAsync(ICompletable operation) ...@@ -38,39 +36,11 @@ public void CompleteSyncOrAsync(ICompletable operation)
internal void GetCounters(ConnectionCounters counters) internal void GetCounters(ConnectionCounters counters)
{ {
lock (asyncCompletionQueue)
{
counters.ResponsesAwaitingAsyncCompletion = asyncCompletionQueue.Count;
}
counters.CompletedSynchronously = Interlocked.Read(ref completedSync); counters.CompletedSynchronously = Interlocked.Read(ref completedSync);
counters.CompletedAsynchronously = Interlocked.Read(ref completedAsync); counters.CompletedAsynchronously = Interlocked.Read(ref completedAsync);
counters.FailedAsynchronously = Interlocked.Read(ref failedAsync); counters.FailedAsynchronously = Interlocked.Read(ref failedAsync);
} }
internal int GetOutstandingCount()
{
lock(asyncCompletionQueue)
{
return asyncCompletionQueue.Count;
}
}
internal void GetStormLog(StringBuilder sb)
{
lock(asyncCompletionQueue)
{
if (asyncCompletionQueue.Count == 0) return;
sb.Append("Response awaiting completion: ").Append(asyncCompletionQueue.Count).AppendLine();
int total = 0;
foreach(var item in asyncCompletionQueue)
{
if (++total >= 500) break;
item.AppendStormLog(sb);
sb.AppendLine();
}
}
}
private static readonly Action<object> s_AnyOrderCompletionHandler = AnyOrderCompletionHandler; private static readonly Action<object> s_AnyOrderCompletionHandler = AnyOrderCompletionHandler;
private static void AnyOrderCompletionHandler(object state) private static void AnyOrderCompletionHandler(object state)
{ {
...@@ -84,8 +54,5 @@ private static void AnyOrderCompletionHandler(object state) ...@@ -84,8 +54,5 @@ private static void AnyOrderCompletionHandler(object state)
ConnectionMultiplexer.TraceWithoutContext("Async completion error: " + ex.Message); ConnectionMultiplexer.TraceWithoutContext("Async completion error: " + ex.Message);
} }
} }
partial void OnCompletedAsync();
} }
} }
...@@ -2041,11 +2041,9 @@ void add(string lk, string sk, string v) ...@@ -2041,11 +2041,9 @@ void add(string lk, string sk, string v)
} }
} }
int queue = server.GetOutstandingCount(message.Command, out int inst, out int qs, out int qc, out int @in); server.GetOutstandingCount(message.Command, out int inst, out int qs, out int @in);
add("Instantaneous", "inst", inst.ToString()); add("Instantaneous", "inst", inst.ToString());
add("Queue-Length", "queue", queue.ToString());
add("Queue-Awaiting-Response", "qs", qs.ToString()); add("Queue-Awaiting-Response", "qs", qs.ToString());
add("Queue-Completion-Outstanding", "qc", qc.ToString());
add("Inbound-Bytes", "in", @in.ToString()); add("Inbound-Bytes", "in", @in.ToString());
add("Manager", "mgr", SocketManager?.GetState()); add("Manager", "mgr", SocketManager?.GetState());
...@@ -2071,7 +2069,7 @@ void add(string lk, string sk, string v) ...@@ -2071,7 +2069,7 @@ void add(string lk, string sk, string v)
sb.Append(timeoutHelpLink); sb.Append(timeoutHelpLink);
sb.Append(")"); sb.Append(")");
errMessage = sb.ToString(); errMessage = sb.ToString();
if (StormLogThreshold >= 0 && queue >= StormLogThreshold && Interlocked.CompareExchange(ref haveStormLog, 1, 0) == 0) if (StormLogThreshold >= 0 && qs >= StormLogThreshold && Interlocked.CompareExchange(ref haveStormLog, 1, 0) == 0)
{ {
var log = server.GetStormLog(message.Command); var log = server.GetStormLog(message.Command);
if (string.IsNullOrWhiteSpace(log)) Interlocked.Exchange(ref haveStormLog, 0); if (string.IsNullOrWhiteSpace(log)) Interlocked.Exchange(ref haveStormLog, 0);
......
...@@ -129,17 +129,6 @@ void IServer.Hang(TimeSpan duration, CommandFlags flags) ...@@ -129,17 +129,6 @@ void IServer.Hang(TimeSpan duration, CommandFlags flags)
} }
} }
internal partial class CompletionManager
{
private static long asyncCompletionWorkerCount;
#pragma warning disable RCS1047 // Non-asynchronous method name should not end with 'Async'.
partial void OnCompletedAsync() => Interlocked.Increment(ref asyncCompletionWorkerCount);
#pragma warning restore RCS1047 // Non-asynchronous method name should not end with 'Async'.
internal static long GetAsyncCompletionWorkerCount() => Interlocked.Read(ref asyncCompletionWorkerCount);
}
public partial class ConnectionMultiplexer public partial class ConnectionMultiplexer
{ {
/// <summary> /// <summary>
...@@ -147,11 +136,6 @@ public partial class ConnectionMultiplexer ...@@ -147,11 +136,6 @@ public partial class ConnectionMultiplexer
/// </summary> /// </summary>
public static long GetResultBoxAllocationCount() => ResultBox.GetAllocationCount(); public static long GetResultBoxAllocationCount() => ResultBox.GetAllocationCount();
/// <summary>
/// Gets how many async completion workers were queueud
/// </summary>
public static long GetAsyncCompletionWorkerCount() => CompletionManager.GetAsyncCompletionWorkerCount();
private volatile bool allowConnect = true, private volatile bool allowConnect = true,
ignoreConnect = false; ignoreConnect = false;
......
...@@ -177,7 +177,7 @@ internal void GetCounters(ConnectionCounters counters) ...@@ -177,7 +177,7 @@ internal void GetCounters(ConnectionCounters counters)
physical?.GetCounters(counters); physical?.GetCounters(counters);
} }
internal int GetOutstandingCount(out int inst, out int qs, out int qc, out int @in) internal void GetOutstandingCount(out int inst, out int qs, out int @in)
{// defined as: PendingUnsentItems + SentItemsAwaitingResponse + ResponsesAwaitingAsyncCompletion {// defined as: PendingUnsentItems + SentItemsAwaitingResponse + ResponsesAwaitingAsyncCompletion
inst = (int)(Interlocked.Read(ref operationCount) - Interlocked.Read(ref profileLastLog)); inst = (int)(Interlocked.Read(ref operationCount) - Interlocked.Read(ref profileLastLog));
var tmp = physical; var tmp = physical;
...@@ -190,8 +190,6 @@ internal int GetOutstandingCount(out int inst, out int qs, out int qc, out int @ ...@@ -190,8 +190,6 @@ internal int GetOutstandingCount(out int inst, out int qs, out int qc, out int @
qs = tmp.GetSentAwaitingResponseCount(); qs = tmp.GetSentAwaitingResponseCount();
@in = tmp.GetAvailableInboundBytes(); @in = tmp.GetAvailableInboundBytes();
} }
qc = completionManager.GetOutstandingCount();
return qs + qc;
} }
internal string GetStormLog() internal string GetStormLog()
...@@ -200,7 +198,6 @@ internal string GetStormLog() ...@@ -200,7 +198,6 @@ internal string GetStormLog()
.Append(" at ").Append(DateTime.UtcNow) .Append(" at ").Append(DateTime.UtcNow)
.AppendLine().AppendLine(); .AppendLine().AppendLine();
physical?.GetStormLog(sb); physical?.GetStormLog(sb);
completionManager.GetStormLog(sb);
sb.Append("Circular op-count snapshot:"); sb.Append("Circular op-count snapshot:");
AppendProfile(sb); AppendProfile(sb);
sb.AppendLine(); sb.AppendLine();
......
...@@ -86,10 +86,7 @@ internal Task RemoveAllSubscriptions(CommandFlags flags, object asyncState) ...@@ -86,10 +86,7 @@ internal Task RemoveAllSubscriptions(CommandFlags flags, object asyncState)
foreach (var pair in subscriptions) foreach (var pair in subscriptions)
{ {
var msg = pair.Value.ForSyncShutdown(); var msg = pair.Value.ForSyncShutdown();
if(msg != null) if(msg != null) UnprocessableCompletionManager?.CompleteSyncOrAsync(msg);
{
// TODO: execute
}
pair.Value.Remove(true, null); pair.Value.Remove(true, null);
pair.Value.Remove(false, null); pair.Value.Remove(false, null);
......
...@@ -367,15 +367,14 @@ internal ServerCounters GetCounters() ...@@ -367,15 +367,14 @@ internal ServerCounters GetCounters()
return counters; return counters;
} }
internal int GetOutstandingCount(RedisCommand command, out int inst, out int qs, out int qc, out int @in) internal void GetOutstandingCount(RedisCommand command, out int inst, out int qs, out int @in)
{ {
var bridge = GetBridge(command, false); var bridge = GetBridge(command, false);
if (bridge == null) if (bridge == null)
{ {
return inst = qs = qc = @in = 0; inst = qs = @in = 0;
} }
bridge.GetOutstandingCount(out inst, out qs, out @in);
return bridge.GetOutstandingCount(out inst, out qs, out qc, out @in);
} }
internal string GetProfile() internal string GetProfile()
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment