Commit 1c2f2834 authored by Marc Gravell's avatar Marc Gravell

implement ChannelMessage / ChannelMessageChannel to illustrate API;...

implement ChannelMessage / ChannelMessageChannel to illustrate API; PubSubGetAllCorrectOrder shows usage, although it doesn't work yet (order is wrong) - I need to implement sync enqueue
parent e30a829e
...@@ -75,8 +75,7 @@ private void TestMassivePublish(ISubscriber conn, string channel, string caption ...@@ -75,8 +75,7 @@ private void TestMassivePublish(ISubscriber conn, string channel, string caption
Assert.True(withFAF.ElapsedMilliseconds < withAsync.ElapsedMilliseconds, caption); Assert.True(withFAF.ElapsedMilliseconds < withAsync.ElapsedMilliseconds, caption);
} }
// [FactLongRunning] [FactLongRunning]
[Fact]
public async Task PubSubGetAllAnyOrder() public async Task PubSubGetAllAnyOrder()
{ {
using (var muxer = GetRemoteConnection(waitForOpen: true, using (var muxer = GetRemoteConnection(waitForOpen: true,
...@@ -125,6 +124,66 @@ public async Task PubSubGetAllAnyOrder() ...@@ -125,6 +124,66 @@ public async Task PubSubGetAllAnyOrder()
} }
} }
[FactLongRunning]
public async Task PubSubGetAllCorrectOrder()
{
using (var muxer = GetRemoteConnection(waitForOpen: true,
syncTimeout: 20000))
{
var sub = muxer.GetSubscriber();
RedisChannel channel = Me();
const int count = 500000;
var syncLock = new object();
var dataList = new List<int>(count);
var dataHash = new HashSet<int>();
var subChannel = await sub.SubscribeAsync(channel);
await sub.PingAsync();
async Task RunLoop()
{
while (!subChannel.IsComplete)
{
var work = await subChannel.ReadAsync();
int i = int.Parse(Encoding.UTF8.GetString(work.Value));
lock (dataList)
{
dataList.Add(i);
dataHash.Add(i);
if (dataList.Count == count) break;
if ((dataList.Count % 10) == 99) Output.WriteLine(dataList.Count.ToString());
}
}
lock (syncLock)
{
Monitor.PulseAll(syncLock);
}
}
lock (syncLock)
{
Task.Run(RunLoop);
for (int i = 0; i < count; i++)
{
sub.Publish(channel, i.ToString(), CommandFlags.FireAndForget);
}
sub.Ping();
// subChannel.Unsubscribe();
if (!Monitor.Wait(syncLock, 20000))
{
throw new TimeoutException("Items: " + dataList.Count);
}
for (int i = 0; i < count; i++)
{
Assert.Contains(i, dataHash);
// Assert.Equal(i, data[i]);
}
}
}
}
[Fact] [Fact]
public void TestPublishWithSubscribers() public void TestPublishWithSubscribers()
{ {
......
...@@ -25,5 +25,6 @@ ...@@ -25,5 +25,6 @@
<ItemGroup> <ItemGroup>
<PackageReference Include="Pipelines.Sockets.Unofficial" Version="0.2.1-alpha.58" /> <PackageReference Include="Pipelines.Sockets.Unofficial" Version="0.2.1-alpha.58" />
<PackageReference Include="System.Threading.Channels" Version="4.5.0" />
</ItemGroup> </ItemGroup>
</Project> </Project>
\ No newline at end of file
using System;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
namespace StackExchange.Redis
{
/// <summary>
/// Represents a message that is broadcast via pub/sub
/// </summary>
public readonly struct ChannelMessage
{
internal ChannelMessage(RedisChannel channel, RedisValue value)
{
Channel = channel;
Value = value;
}
/// <summary>
/// The channel that the message was broadcast to
/// </summary>
public RedisChannel Channel { get; }
/// <summary>
/// The value that was broadcast
/// </summary>
public RedisValue Value { get; }
}
/// <summary>
/// Represents a message queue of pub/sub notifications
/// </summary>
public sealed class ChannelMessageChannel
{
private readonly Channel<ChannelMessage> _channel;
private readonly RedisChannel _redisChannel;
private ISubscriber _parent;
/// <summary>
/// Indicates if all messages that will be received have been drained from this channel
/// </summary>
public bool IsComplete { get; private set; }
internal ChannelMessageChannel(RedisChannel redisChannel, ISubscriber parent)
{
_redisChannel = redisChannel;
_parent = parent;
_channel = Channel.CreateUnbounded<ChannelMessage>();
_channel.Reader.Completion.ContinueWith(
(t, state) => ((ChannelMessageChannel)state).IsComplete = true, this, TaskContinuationOptions.ExecuteSynchronously);
}
internal void Subscribe(CommandFlags flags) => _parent.Subscribe(_redisChannel, HandleMessage, flags);
internal Task SubscribeAsync(CommandFlags flags) => _parent.SubscribeAsync(_redisChannel, HandleMessage, flags);
private void HandleMessage(RedisChannel channel, RedisValue value)
=> _channel.Writer.TryWrite(new ChannelMessage(channel, value));
/// <summary>
/// Consume a message from the channel
/// </summary>
public ValueTask<ChannelMessage> ReadAsync(CancellationToken cancellationToken = default)
=> _channel.Reader.ReadAsync(cancellationToken);
internal void UnsubscribeImpl(Exception error = null, CommandFlags flags = CommandFlags.None)
{
var parent = _parent;
if (parent != null)
{
_parent.UnsubscribeAsync(_redisChannel, HandleMessage, flags);
_parent = null;
_channel.Writer.TryComplete(error);
}
}
internal async Task UnsubscribeAsyncImpl(Exception error = null, CommandFlags flags = CommandFlags.None)
{
var parent = _parent;
if (parent != null)
{
await _parent.UnsubscribeAsync(_redisChannel, HandleMessage, flags);
_parent = null;
_channel.Writer.TryComplete(error);
}
}
/// <summary>
/// Stop receiving messages on this channel
/// </summary>
public void Unsubscribe(CommandFlags flags = CommandFlags.None) => UnsubscribeImpl(null, flags);
/// <summary>
/// Stop receiving messages on this channel
/// </summary>
public Task UnsubscribeAsync(CommandFlags flags = CommandFlags.None) => UnsubscribeAsyncImpl(null, flags);
}
}
using System; using System;
using System.Net; using System.Net;
using System.Threading.Channels;
using System.Threading.Tasks; using System.Threading.Tasks;
namespace StackExchange.Redis namespace StackExchange.Redis
...@@ -64,6 +65,16 @@ public interface ISubscriber : IRedis ...@@ -64,6 +65,16 @@ public interface ISubscriber : IRedis
/// <remarks>https://redis.io/commands/psubscribe</remarks> /// <remarks>https://redis.io/commands/psubscribe</remarks>
void Subscribe(RedisChannel channel, Action<RedisChannel, RedisValue> handler, CommandFlags flags = CommandFlags.None); void Subscribe(RedisChannel channel, Action<RedisChannel, RedisValue> handler, CommandFlags flags = CommandFlags.None);
/// <summary>
/// Subscribe to perform some operation when a change to the preferred/active node is broadcast, as a channel.
/// </summary>
/// <param name="channel">The redis channel to subscribe to.</param>
/// <param name="flags">The command flags to use.</param>
/// <returns>A channel that represents this source</returns>
/// <remarks>https://redis.io/commands/subscribe</remarks>
/// <remarks>https://redis.io/commands/psubscribe</remarks>
ChannelMessageChannel Subscribe(RedisChannel channel, CommandFlags flags = CommandFlags.None);
/// <summary> /// <summary>
/// Subscribe to perform some operation when a change to the preferred/active node is broadcast. /// Subscribe to perform some operation when a change to the preferred/active node is broadcast.
/// </summary> /// </summary>
...@@ -74,6 +85,16 @@ public interface ISubscriber : IRedis ...@@ -74,6 +85,16 @@ public interface ISubscriber : IRedis
/// <remarks>https://redis.io/commands/psubscribe</remarks> /// <remarks>https://redis.io/commands/psubscribe</remarks>
Task SubscribeAsync(RedisChannel channel, Action<RedisChannel, RedisValue> handler, CommandFlags flags = CommandFlags.None); Task SubscribeAsync(RedisChannel channel, Action<RedisChannel, RedisValue> handler, CommandFlags flags = CommandFlags.None);
/// <summary>
/// Subscribe to perform some operation when a change to the preferred/active node is broadcast, as a channel.
/// </summary>
/// <param name="channel">The redis channel to subscribe to.</param>
/// <param name="flags">The command flags to use.</param>
/// <returns>A channel that represents this source</returns>
/// <remarks>https://redis.io/commands/subscribe</remarks>
/// <remarks>https://redis.io/commands/psubscribe</remarks>
Task<ChannelMessageChannel> SubscribeAsync(RedisChannel channel, CommandFlags flags = CommandFlags.None);
/// <summary> /// <summary>
/// Inidicate to which redis server we are actively subscribed for a given channel; returns null if /// Inidicate to which redis server we are actively subscribed for a given channel; returns null if
/// the channel is not actively subscribed /// the channel is not actively subscribed
...@@ -122,4 +143,4 @@ public interface ISubscriber : IRedis ...@@ -122,4 +143,4 @@ public interface ISubscriber : IRedis
/// <remarks>https://redis.io/commands/punsubscribe</remarks> /// <remarks>https://redis.io/commands/punsubscribe</remarks>
Task UnsubscribeAsync(RedisChannel channel, Action<RedisChannel, RedisValue> handler = null, CommandFlags flags = CommandFlags.None); Task UnsubscribeAsync(RedisChannel channel, Action<RedisChannel, RedisValue> handler = null, CommandFlags flags = CommandFlags.None);
} }
} }
\ No newline at end of file
...@@ -287,12 +287,26 @@ public void Subscribe(RedisChannel channel, Action<RedisChannel, RedisValue> han ...@@ -287,12 +287,26 @@ public void Subscribe(RedisChannel channel, Action<RedisChannel, RedisValue> han
if ((flags & CommandFlags.FireAndForget) == 0) Wait(task); if ((flags & CommandFlags.FireAndForget) == 0) Wait(task);
} }
public ChannelMessageChannel Subscribe(RedisChannel channel, CommandFlags flags = CommandFlags.None)
{
var c = new ChannelMessageChannel(channel, this);
c.Subscribe(flags);
return c;
}
public Task SubscribeAsync(RedisChannel channel, Action<RedisChannel, RedisValue> handler, CommandFlags flags = CommandFlags.None) public Task SubscribeAsync(RedisChannel channel, Action<RedisChannel, RedisValue> handler, CommandFlags flags = CommandFlags.None)
{ {
if (channel.IsNullOrEmpty) throw new ArgumentNullException(nameof(channel)); if (channel.IsNullOrEmpty) throw new ArgumentNullException(nameof(channel));
return multiplexer.AddSubscription(channel, handler, flags, asyncState); return multiplexer.AddSubscription(channel, handler, flags, asyncState);
} }
public async Task<ChannelMessageChannel> SubscribeAsync(RedisChannel channel, CommandFlags flags = CommandFlags.None)
{
var c = new ChannelMessageChannel(channel, this);
await c.SubscribeAsync(flags);
return c;
}
public EndPoint SubscribedEndpoint(RedisChannel channel) public EndPoint SubscribedEndpoint(RedisChannel channel)
{ {
var server = multiplexer.GetSubscribedServer(channel); var server = multiplexer.GetSubscribedServer(channel);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment