Commit 3cd098b4 authored by Marc Gravell's avatar Marc Gravell

handle AggregateException it SafeStatus (transaction tests); be more...

handle AggregateException it SafeStatus (transaction tests); be more aggressive about completing the queue in ChannelMessageQueue.Unsubscribe[Async]Impl
parent c1b1f25e
......@@ -229,6 +229,12 @@ static TaskStatus SafeStatus(Task task)
{
if (!task.Wait(1000)) throw new TimeoutException("timeout waiting for task to complete");
}
catch(AggregateException ex)
when (ex.InnerException is TaskCanceledException
|| (ex.InnerExceptions.Count == 1 && ex.InnerException is TaskCanceledException))
{
return TaskStatus.Canceled;
}
catch (TaskCanceledException)
{
return TaskStatus.Canceled;
......
......@@ -210,22 +210,23 @@ private async void OnMessageAsyncImpl()
internal void UnsubscribeImpl(Exception error = null, CommandFlags flags = CommandFlags.None)
{
var parent = _parent;
_parent = null;
if (parent != null)
{
parent.UnsubscribeAsync(Channel, HandleMessage, flags);
_parent = null;
_queue.Writer.TryComplete(error);
}
_queue.Writer.TryComplete(error);
}
internal async Task UnsubscribeAsyncImpl(Exception error = null, CommandFlags flags = CommandFlags.None)
{
var parent = _parent;
_parent = null;
if (parent != null)
{
await parent.UnsubscribeAsync(Channel, HandleMessage, flags).ConfigureAwait(false);
_parent = null;
_queue.Writer.TryComplete(error);
await parent.UnsubscribeAsync(Channel, HandleMessage, flags).ConfigureAwait(false);
}
_queue.Writer.TryComplete(error);
}
internal static bool IsOneOf(Action<RedisChannel, RedisValue> handler)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment