Unverified Commit 4949fc4c authored by Marc Gravell's avatar Marc Gravell Committed by GitHub

make transaction inner tasks use RunContinuationsAsynchronously (#950)

* make transaction inner tasks use RunContinuationsAsynchronously (for discussion, see issue #943); detect RunContinuationsAsynchronously in ResultBox completion, and set directly rather than deferred
parent 6dd621b5
...@@ -73,7 +73,7 @@ internal override Task<T> ExecuteAsync<T>(Message message, ResultProcessor<T> pr ...@@ -73,7 +73,7 @@ internal override Task<T> ExecuteAsync<T>(Message message, ResultProcessor<T> pr
} }
else else
{ {
var tcs = TaskSource.Create<T>(asyncState); var tcs = TaskSource.Create<T>(asyncState, TaskCreationOptions.RunContinuationsAsynchronously);
var source = ResultBox<T>.Get(tcs); var source = ResultBox<T>.Get(tcs);
message.SetSource(source, processor); message.SetSource(source, processor);
task = tcs.Task; task = tcs.Task;
......
...@@ -98,8 +98,11 @@ public override bool TryComplete(bool isAsync) ...@@ -98,8 +98,11 @@ public override bool TryComplete(bool isAsync)
{ {
if (stateOrCompletionSource is TaskCompletionSource<T> tcs) if (stateOrCompletionSource is TaskCompletionSource<T> tcs)
{ {
if (isAsync) if (isAsync || (tcs.Task.CreationOptions & TaskCreationOptions.RunContinuationsAsynchronously) != 0)
{ {
// either on the async completion step, or the task is guarded
// againsts thread-stealing; complete it directly
// (note: RunContinuationsAsynchronously is only usable from NET46)
UnwrapAndRecycle(this, true, out T val, out Exception ex); UnwrapAndRecycle(this, true, out T val, out Exception ex);
if (ex == null) if (ex == null)
...@@ -117,7 +120,8 @@ public override bool TryComplete(bool isAsync) ...@@ -117,7 +120,8 @@ public override bool TryComplete(bool isAsync)
return true; return true;
} }
else else
{ // looks like continuations; push to async to preserve the reader thread {
// could be thread-stealing continuations; push to async to preserve the reader thread
return false; return false;
} }
} }
......
...@@ -9,7 +9,8 @@ internal static class TaskSource ...@@ -9,7 +9,8 @@ internal static class TaskSource
/// </summary> /// </summary>
/// <typeparam name="T">The type for the created <see cref="TaskCompletionSource{TResult}"/>.</typeparam> /// <typeparam name="T">The type for the created <see cref="TaskCompletionSource{TResult}"/>.</typeparam>
/// <param name="asyncState">The state for the created <see cref="TaskCompletionSource{TResult}"/>.</param> /// <param name="asyncState">The state for the created <see cref="TaskCompletionSource{TResult}"/>.</param>
public static TaskCompletionSource<T> Create<T>(object asyncState) /// <param name="options">The options to apply to the task</param>
=> new TaskCompletionSource<T>(asyncState, TaskCreationOptions.None); public static TaskCompletionSource<T> Create<T>(object asyncState, TaskCreationOptions options = TaskCreationOptions.None)
=> new TaskCompletionSource<T>(asyncState, options);
} }
} }
...@@ -981,6 +981,8 @@ public async Task ExecCompletes_Issue943() ...@@ -981,6 +981,8 @@ public async Task ExecCompletes_Issue943()
} }
Writer.WriteLine($"hash hit: {hashHit}, miss: {hashMiss}; expire hit: {expireHit}, miss: {expireMiss}"); Writer.WriteLine($"hash hit: {hashHit}, miss: {hashMiss}; expire hit: {expireHit}, miss: {expireMiss}");
Assert.Equal(0, hashMiss);
Assert.Equal(0, expireMiss);
} }
} }
} }
...@@ -219,7 +219,7 @@ public bool RemoveClient(RedisClient client) ...@@ -219,7 +219,7 @@ public bool RemoveClient(RedisClient client)
} }
} }
private readonly TaskCompletionSource<ShutdownReason> _shutdown = new TaskCompletionSource<ShutdownReason>(TaskCreationOptions.RunContinuationsAsynchronously); private readonly TaskCompletionSource<ShutdownReason> _shutdown = TaskSource.Create<ShutdownReason>(null, TaskCreationOptions.RunContinuationsAsynchronously);
private bool _isShutdown; private bool _isShutdown;
protected void ThrowIfShutdown() protected void ThrowIfShutdown()
{ {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment