Commit 86ee27f4 authored by Marc Gravell's avatar Marc Gravell

shave some yaks on ChannelMessageQueue

parent fa123558
using System;
using System.Reflection;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
......@@ -10,6 +11,20 @@ namespace StackExchange.Redis
/// </summary>
public readonly struct ChannelMessage
{
/// <summary>
/// See Object.ToString
/// </summary>
public override string ToString() => ((string)Channel) + ":" + ((string)Value);
/// <summary>
/// See Object.GetHashCode
/// </summary>
public override int GetHashCode() => Channel.GetHashCode() ^ Value.GetHashCode();
/// <summary>
/// See Object.Equals
/// </summary>
public override bool Equals(object obj) => obj is ChannelMessage cm
&& cm.Channel == Channel && cm.Value == Value;
internal ChannelMessage(RedisChannel channel, RedisValue value)
{
Channel = channel;
......@@ -37,6 +52,11 @@ public sealed class ChannelMessageQueue
private readonly RedisChannel _redisChannel;
private RedisSubscriber _parent;
/// <summary>
/// See Object.ToString
/// </summary>
public override string ToString() => (string)_redisChannel;
/// <summary>
/// Indicates if all messages that will be received have been drained from this channel
/// </summary>
......@@ -84,6 +104,26 @@ public ValueTask<ChannelMessage> ReadAsync(CancellationToken cancellationToken =
/// </summary>
public bool TryRead(out ChannelMessage item) => _channel.Reader.TryRead(out item);
/// <summary>
/// Attempt to query the backlog length of the queue
/// </summary>
public bool TryGetCount(out int count)
{
// get this using the reflection
try
{
var prop = _channel.GetType().GetProperty("ItemsCountForDebugger", BindingFlags.Instance | BindingFlags.NonPublic);
if (prop != null)
{
count = (int)prop.GetValue(_channel);
return true;
}
}
catch { }
count = default;
return false;
}
private Delegate _onMessageHandler;
private void AssertOnMessage(Delegate handler)
{
......@@ -107,7 +147,7 @@ private async void OnMessageSyncImpl()
while (!IsCompleted)
{
ChannelMessage next;
try { if(!TryRead(out next)) next = await ReadAsync(); }
try { if (!TryRead(out next)) next = await ReadAsync().ConfigureAwait(false); }
catch (ChannelClosedException) { break; } // expected
catch (Exception ex)
{
......@@ -136,7 +176,7 @@ private async void OnMessageAsyncImpl()
while (!IsCompleted)
{
ChannelMessage next;
try { if (!TryRead(out next)) next = await ReadAsync(); }
try { if (!TryRead(out next)) next = await ReadAsync().ConfigureAwait(false); }
catch (ChannelClosedException) { break; } // expected
catch (Exception ex)
{
......@@ -147,7 +187,7 @@ private async void OnMessageAsyncImpl()
try
{
var task = handler.Invoke(next.Channel, next.Value);
if (task != null) await task;
if (task != null) await task.ConfigureAwait(false);
}
catch { } // matches MessageCompletable
}
......@@ -167,7 +207,7 @@ internal async Task UnsubscribeAsyncImpl(Exception error = null, CommandFlags fl
var parent = _parent;
if (parent != null)
{
await parent.UnsubscribeAsync(_redisChannel, HandleMessage, flags);
await parent.UnsubscribeAsync(_redisChannel, HandleMessage, flags).ConfigureAwait(false);
_parent = null;
_channel.Writer.TryComplete(error);
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment