Commit 21898f22 authored by Marc Gravell's avatar Marc Gravell

fix "transaction" tests; issue is actually a test expectation, not the

code; the question is whether we guarantee that the completion has
*already run* or not - we don't want to do this on the processor, as
anything that involves changing Task status can lead to thread theft, so
it is done via the custom thread pool; the fix here is simply to check
that the task *becomes* cancelled (in a timely fashion), rather than that
the task *is* cancelled; anyone using "await" or ".Wait()" ".Result" will
observe that *anyway*; also tidied up some efficiency code to reduce exception
allocations in this scenario
parent 1a2b3737
...@@ -74,7 +74,7 @@ public async Task BasicTranWithExistsCondition(bool demandKeyExists, bool keyExi ...@@ -74,7 +74,7 @@ public async Task BasicTranWithExistsCondition(bool demandKeyExists, bool keyExi
{ {
Assert.False(await exec, "neq: exec"); Assert.False(await exec, "neq: exec");
Assert.False(cond.WasSatisfied, "neq: was satisfied"); Assert.False(cond.WasSatisfied, "neq: was satisfied");
Assert.Equal(TaskStatus.Canceled, incr.Status); // neq: incr Assert.Equal(TaskStatus.Canceled, SafeStatus(incr)); // neq: incr
Assert.Equal(0, (long)get); // neq: get Assert.Equal(0, (long)get); // neq: get
} }
} }
...@@ -123,7 +123,7 @@ public async Task BasicTranWithEqualsCondition(string expected, string value, bo ...@@ -123,7 +123,7 @@ public async Task BasicTranWithEqualsCondition(string expected, string value, bo
{ {
Assert.False(await exec, "neq: exec"); Assert.False(await exec, "neq: exec");
Assert.False(cond.WasSatisfied, "neq: was satisfied"); Assert.False(cond.WasSatisfied, "neq: was satisfied");
Assert.Equal(TaskStatus.Canceled, incr.Status); // neq: incr Assert.Equal(TaskStatus.Canceled, SafeStatus(incr)); // neq: incr
Assert.Equal(0, (long)get); // neq: get Assert.Equal(0, (long)get); // neq: get
} }
} }
...@@ -165,7 +165,7 @@ public async Task BasicTranWithHashExistsCondition(bool demandKeyExists, bool ke ...@@ -165,7 +165,7 @@ public async Task BasicTranWithHashExistsCondition(bool demandKeyExists, bool ke
{ {
Assert.False(await exec, "neq: exec"); Assert.False(await exec, "neq: exec");
Assert.False(cond.WasSatisfied, "neq: was satisfied"); Assert.False(cond.WasSatisfied, "neq: was satisfied");
Assert.Equal(TaskStatus.Canceled, incr.Status); // neq: incr Assert.Equal(TaskStatus.Canceled, SafeStatus(incr)); // neq: incr
Assert.Equal(0, (long)get); // neq: get Assert.Equal(0, (long)get); // neq: get
} }
} }
...@@ -215,12 +215,28 @@ public async Task BasicTranWithHashEqualsCondition(string expected, string value ...@@ -215,12 +215,28 @@ public async Task BasicTranWithHashEqualsCondition(string expected, string value
{ {
Assert.False(await exec, "neq: exec"); Assert.False(await exec, "neq: exec");
Assert.False(cond.WasSatisfied, "neq: was satisfied"); Assert.False(cond.WasSatisfied, "neq: was satisfied");
Assert.Equal(TaskStatus.Canceled, incr.Status); // neq: incr Assert.Equal(TaskStatus.Canceled, SafeStatus(incr)); // neq: incr
Assert.Equal(0, (long)get); // neq: get Assert.Equal(0, (long)get); // neq: get
} }
} }
} }
static TaskStatus SafeStatus(Task task)
{
if(task.Status == TaskStatus.WaitingForActivation)
{
try
{
if (!task.Wait(1000)) throw new TimeoutException("timeout waiting for task to complete");
}
catch (TaskCanceledException)
{
return TaskStatus.Canceled;
}
}
return task.Status;
}
[Theory] [Theory]
[InlineData(false, false, true)] [InlineData(false, false, true)]
[InlineData(false, true, false)] [InlineData(false, true, false)]
...@@ -256,7 +272,7 @@ public async Task BasicTranWithListExistsCondition(bool demandKeyExists, bool ke ...@@ -256,7 +272,7 @@ public async Task BasicTranWithListExistsCondition(bool demandKeyExists, bool ke
{ {
Assert.False(await exec, "neq: exec"); Assert.False(await exec, "neq: exec");
Assert.False(cond.WasSatisfied, "neq: was satisfied"); Assert.False(cond.WasSatisfied, "neq: was satisfied");
Assert.Equal(TaskStatus.Canceled, push.Status); // neq: push Assert.Equal(TaskStatus.Canceled, SafeStatus(push)); // neq: push
Assert.Null((string)get); // neq: get Assert.Null((string)get); // neq: get
} }
} }
...@@ -305,7 +321,7 @@ public async Task BasicTranWithListEqualsCondition(string expected, string value ...@@ -305,7 +321,7 @@ public async Task BasicTranWithListEqualsCondition(string expected, string value
{ {
Assert.False(await exec, "neq: exec"); Assert.False(await exec, "neq: exec");
Assert.False(cond.WasSatisfied, "neq: was satisfied"); Assert.False(cond.WasSatisfied, "neq: was satisfied");
Assert.Equal(TaskStatus.Canceled, push.Status); // neq: push Assert.Equal(TaskStatus.Canceled, SafeStatus(push)); // neq: push
Assert.Null((string)get); // neq: get Assert.Null((string)get); // neq: get
} }
} }
...@@ -396,7 +412,7 @@ public async Task BasicTranWithStringLengthCondition(string value, ComparisonTyp ...@@ -396,7 +412,7 @@ public async Task BasicTranWithStringLengthCondition(string value, ComparisonTyp
{ {
Assert.False(await exec, "neq: exec"); Assert.False(await exec, "neq: exec");
Assert.False(cond.WasSatisfied, "neq: was satisfied"); Assert.False(cond.WasSatisfied, "neq: was satisfied");
Assert.Equal(TaskStatus.Canceled, push.Status); // neq: push Assert.Equal(TaskStatus.Canceled, SafeStatus(push)); // neq: push
Assert.Equal(0, get); // neq: get Assert.Equal(0, get); // neq: get
} }
} }
...@@ -474,7 +490,7 @@ public async Task BasicTranWithHashLengthCondition(string value, ComparisonType ...@@ -474,7 +490,7 @@ public async Task BasicTranWithHashLengthCondition(string value, ComparisonType
{ {
Assert.False(await exec, "neq: exec"); Assert.False(await exec, "neq: exec");
Assert.False(cond.WasSatisfied, "neq: was satisfied"); Assert.False(cond.WasSatisfied, "neq: was satisfied");
Assert.Equal(TaskStatus.Canceled, push.Status); // neq: push Assert.Equal(TaskStatus.Canceled, SafeStatus(push)); // neq: push
Assert.Equal(0, get); // neq: get Assert.Equal(0, get); // neq: get
} }
} }
...@@ -552,7 +568,7 @@ public async Task BasicTranWithSetCardinalityCondition(string value, ComparisonT ...@@ -552,7 +568,7 @@ public async Task BasicTranWithSetCardinalityCondition(string value, ComparisonT
{ {
Assert.False(await exec, "neq: exec"); Assert.False(await exec, "neq: exec");
Assert.False(cond.WasSatisfied, "neq: was satisfied"); Assert.False(cond.WasSatisfied, "neq: was satisfied");
Assert.Equal(TaskStatus.Canceled, push.Status); // neq: push Assert.Equal(TaskStatus.Canceled, SafeStatus(push)); // neq: push
Assert.Equal(0, get); // neq: get Assert.Equal(0, get); // neq: get
} }
} }
...@@ -594,7 +610,7 @@ public async Task BasicTranWithSetContainsCondition(bool demandKeyExists, bool k ...@@ -594,7 +610,7 @@ public async Task BasicTranWithSetContainsCondition(bool demandKeyExists, bool k
{ {
Assert.False(await exec, "neq: exec"); Assert.False(await exec, "neq: exec");
Assert.False(cond.WasSatisfied, "neq: was satisfied"); Assert.False(cond.WasSatisfied, "neq: was satisfied");
Assert.Equal(TaskStatus.Canceled, incr.Status); // neq: incr Assert.Equal(TaskStatus.Canceled, SafeStatus(incr)); // neq: incr
Assert.Equal(0, (long)get); // neq: get Assert.Equal(0, (long)get); // neq: get
} }
} }
...@@ -672,7 +688,7 @@ public async Task BasicTranWithSortedSetCardinalityCondition(string value, Compa ...@@ -672,7 +688,7 @@ public async Task BasicTranWithSortedSetCardinalityCondition(string value, Compa
{ {
Assert.False(await exec, "neq: exec"); Assert.False(await exec, "neq: exec");
Assert.False(cond.WasSatisfied, "neq: was satisfied"); Assert.False(cond.WasSatisfied, "neq: was satisfied");
Assert.Equal(TaskStatus.Canceled, push.Status); // neq: push Assert.Equal(TaskStatus.Canceled, SafeStatus(push)); // neq: push
Assert.Equal(0, get); // neq: get Assert.Equal(0, get); // neq: get
} }
} }
...@@ -714,7 +730,7 @@ public async Task BasicTranWithSortedSetContainsCondition(bool demandKeyExists, ...@@ -714,7 +730,7 @@ public async Task BasicTranWithSortedSetContainsCondition(bool demandKeyExists,
{ {
Assert.False(await exec, "neq: exec"); Assert.False(await exec, "neq: exec");
Assert.False(cond.WasSatisfied, "neq: was satisfied"); Assert.False(cond.WasSatisfied, "neq: was satisfied");
Assert.Equal(TaskStatus.Canceled, incr.Status); // neq: incr Assert.Equal(TaskStatus.Canceled, SafeStatus(incr)); // neq: incr
Assert.Equal(0, (long)get); // neq: get Assert.Equal(0, (long)get); // neq: get
} }
} }
...@@ -792,7 +808,7 @@ public async Task BasicTranWithListLengthCondition(string value, ComparisonType ...@@ -792,7 +808,7 @@ public async Task BasicTranWithListLengthCondition(string value, ComparisonType
{ {
Assert.False(await exec, "neq: exec"); Assert.False(await exec, "neq: exec");
Assert.False(cond.WasSatisfied, "neq: was satisfied"); Assert.False(cond.WasSatisfied, "neq: was satisfied");
Assert.Equal(TaskStatus.Canceled, push.Status); // neq: push Assert.Equal(TaskStatus.Canceled, SafeStatus(push)); // neq: push
Assert.Equal(0, get); // neq: get Assert.Equal(0, get); // neq: get
} }
} }
......
...@@ -584,10 +584,7 @@ internal static CommandFlags SetMasterSlaveFlags(CommandFlags everything, Comman ...@@ -584,10 +584,7 @@ internal static CommandFlags SetMasterSlaveFlags(CommandFlags everything, Comman
| masterSlave; | masterSlave;
} }
internal void Cancel(Exception ex = null) internal void Cancel() => resultBox?.Cancel();
{
resultProcessor?.SetException(this, ex ?? new TaskCanceledException());
}
// true if ready to be completed (i.e. false if re-issued to another server) // true if ready to be completed (i.e. false if re-issued to another server)
internal bool ComputeResult(PhysicalConnection connection, RawResult result) internal bool ComputeResult(PhysicalConnection connection, RawResult result)
......
...@@ -346,7 +346,7 @@ private void AbandonPendingBacklog(Exception ex) ...@@ -346,7 +346,7 @@ private void AbandonPendingBacklog(Exception ex)
next = DequeueNextPendingBacklog(); next = DequeueNextPendingBacklog();
if(next != null) if(next != null)
{ {
next.Cancel(ex); next.SetException(ex);
CompleteSyncOrAsync(next); CompleteSyncOrAsync(next);
} }
} while (next != null); } while (next != null);
......
...@@ -11,7 +11,7 @@ internal abstract partial class ResultBox ...@@ -11,7 +11,7 @@ internal abstract partial class ResultBox
public abstract bool IsAsync { get; } public abstract bool IsAsync { get; }
public bool IsFaulted => _exception != null; public bool IsFaulted => _exception != null;
public void SetException(Exception exception) => _exception = exception; public void SetException(Exception exception) => _exception = exception ?? s_cancelled;
public abstract bool TryComplete(bool isAsync); public abstract bool TryComplete(bool isAsync);
...@@ -22,6 +22,14 @@ protected static void IncrementAllocationCount() ...@@ -22,6 +22,14 @@ protected static void IncrementAllocationCount()
} }
static partial void OnAllocated(); static partial void OnAllocated();
public void Cancel() => _exception = s_cancelled;
// in theory nobody should directly observe this; the only things
// that call Cancel are transactions etc - TCS-based, and we detect
// that and use TrySetCanceled instead
// about any confusion in stack-trace
static readonly Exception s_cancelled = new TaskCanceledException();
} }
internal sealed class ResultBox<T> : ResultBox internal sealed class ResultBox<T> : ResultBox
...@@ -35,11 +43,6 @@ public ResultBox(object stateOrCompletionSource) ...@@ -35,11 +43,6 @@ public ResultBox(object stateOrCompletionSource)
this.stateOrCompletionSource = stateOrCompletionSource; this.stateOrCompletionSource = stateOrCompletionSource;
} }
public object AsyncState =>
stateOrCompletionSource is TaskCompletionSource<T>
? ((TaskCompletionSource<T>) stateOrCompletionSource).Task.AsyncState
: stateOrCompletionSource;
public static ResultBox<T> Get(object stateOrCompletionSource) public static ResultBox<T> Get(object stateOrCompletionSource)
{ {
ResultBox<T> found; ResultBox<T> found;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment