Commit aaade660 authored by Nick Craver's avatar Nick Craver

Remove ChannelMessageQueue.IsCompleted

Now uses ChannelMessageQueue.Completion.IsCompleted and should be consistent. Because _completion on UnboundedChannel respect no-synchronous-continuations, the old way doesn't behave as expected (IsCompleted isn't instantly set).
parent c7a017aa
...@@ -359,7 +359,7 @@ public async Task PubSubGetAllCorrectOrder() ...@@ -359,7 +359,7 @@ public async Task PubSubGetAllCorrectOrder()
async Task RunLoop() async Task RunLoop()
{ {
while (!subChannel.IsCompleted) while (!subChannel.Completion.IsCompleted)
{ {
var work = await subChannel.ReadAsync().ForAwait(); var work = await subChannel.ReadAsync().ForAwait();
int i = int.Parse(Encoding.UTF8.GetString(work.Message)); int i = int.Parse(Encoding.UTF8.GetString(work.Message));
...@@ -456,7 +456,7 @@ public async Task PubSubGetAllCorrectOrder_OnMessage_Sync() ...@@ -456,7 +456,7 @@ public async Task PubSubGetAllCorrectOrder_OnMessage_Sync()
} }
await subChannel.Completion; await subChannel.Completion;
Assert.True(subChannel.IsCompleted); Assert.True(subChannel.Completion.IsCompleted);
await Assert.ThrowsAsync<ChannelClosedException>(async delegate await Assert.ThrowsAsync<ChannelClosedException>(async delegate
{ {
var final = await subChannel.ReadAsync().ForAwait(); var final = await subChannel.ReadAsync().ForAwait();
...@@ -517,7 +517,7 @@ public async Task PubSubGetAllCorrectOrder_OnMessage_Async() ...@@ -517,7 +517,7 @@ public async Task PubSubGetAllCorrectOrder_OnMessage_Async()
} }
await subChannel.Completion; await subChannel.Completion;
Assert.True(subChannel.IsCompleted); Assert.True(subChannel.Completion.IsCompleted);
await Assert.ThrowsAsync<ChannelClosedException>(async delegate await Assert.ThrowsAsync<ChannelClosedException>(async delegate
{ {
var final = await subChannel.ReadAsync().ForAwait(); var final = await subChannel.ReadAsync().ForAwait();
......
...@@ -69,24 +69,17 @@ public sealed class ChannelMessageQueue ...@@ -69,24 +69,17 @@ public sealed class ChannelMessageQueue
public override string ToString() => (string)Channel; public override string ToString() => (string)Channel;
/// <summary> /// <summary>
/// Indicates if all messages that will be received have been drained from this channel /// An awaitable task the indicates completion of the queue (including drain of data)
/// </summary> /// </summary>
public bool IsCompleted { get; private set; } public Task Completion => _queue.Reader.Completion;
internal ChannelMessageQueue(RedisChannel redisChannel, RedisSubscriber parent) internal ChannelMessageQueue(RedisChannel redisChannel, RedisSubscriber parent)
{ {
Channel = redisChannel; Channel = redisChannel;
_parent = parent; _parent = parent;
_queue = System.Threading.Channels.Channel.CreateUnbounded<ChannelMessage>(s_ChannelOptions); _queue = System.Threading.Channels.Channel.CreateUnbounded<ChannelMessage>(s_ChannelOptions);
_queue.Reader.Completion.ContinueWith(
(_, state) => ((ChannelMessageQueue)state).IsCompleted = true, this, TaskContinuationOptions.ExecuteSynchronously);
} }
/// <summary>
/// An awaitable task the indicates completion of the queue (including drain of data)
/// </summary>
public Task Completion => _queue.Reader.Completion;
private static readonly UnboundedChannelOptions s_ChannelOptions = new UnboundedChannelOptions private static readonly UnboundedChannelOptions s_ChannelOptions = new UnboundedChannelOptions
{ {
SingleWriter = true, SingleWriter = true,
...@@ -165,7 +158,7 @@ public void OnMessage(Action<ChannelMessage> handler) ...@@ -165,7 +158,7 @@ public void OnMessage(Action<ChannelMessage> handler)
private async void OnMessageSyncImpl() private async void OnMessageSyncImpl()
{ {
var handler = (Action<ChannelMessage>)_onMessageHandler; var handler = (Action<ChannelMessage>)_onMessageHandler;
while (!IsCompleted) while (!Completion.IsCompleted)
{ {
ChannelMessage next; ChannelMessage next;
try { if (!TryRead(out next)) next = await ReadAsync().ConfigureAwait(false); } try { if (!TryRead(out next)) next = await ReadAsync().ConfigureAwait(false); }
...@@ -195,7 +188,7 @@ public void OnMessage(Func<ChannelMessage, Task> handler) ...@@ -195,7 +188,7 @@ public void OnMessage(Func<ChannelMessage, Task> handler)
private async void OnMessageAsyncImpl() private async void OnMessageAsyncImpl()
{ {
var handler = (Func<ChannelMessage, Task>)_onMessageHandler; var handler = (Func<ChannelMessage, Task>)_onMessageHandler;
while (!IsCompleted) while (!Completion.IsCompleted)
{ {
ChannelMessage next; ChannelMessage next;
try { if (!TryRead(out next)) next = await ReadAsync().ConfigureAwait(false); } try { if (!TryRead(out next)) next = await ReadAsync().ConfigureAwait(false); }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment