Commit dd808a7f authored by mgravell's avatar mgravell

test and fix for #1101 - Unsubscribe[Async] with null arg should wipe both...

test and fix for #1101 - Unsubscribe[Async] with null arg should wipe both trusted and non-trusted subscribers
parent 4db837ee
...@@ -97,8 +97,7 @@ internal Task RemoveAllSubscriptions(CommandFlags flags, object asyncState) ...@@ -97,8 +97,7 @@ internal Task RemoveAllSubscriptions(CommandFlags flags, object asyncState)
{ {
var msg = pair.Value.ForSyncShutdown(); var msg = pair.Value.ForSyncShutdown();
if (msg != null && !msg.TryComplete(false)) ConnectionMultiplexer.CompleteAsWorker(msg); if (msg != null && !msg.TryComplete(false)) ConnectionMultiplexer.CompleteAsWorker(msg);
pair.Value.Remove(true, null); pair.Value.Remove(default, null); // when passing null, it wipes both sync+async
pair.Value.Remove(false, null);
var task = pair.Value.UnsubscribeFromServer(pair.Key, flags, asyncState, false); var task = pair.Value.UnsubscribeFromServer(pair.Key, flags, asyncState, false);
if (task != null) last = task; if (task != null) last = task;
...@@ -190,8 +189,8 @@ public bool Remove(bool asAsync, Action<RedisChannel, RedisValue> value) ...@@ -190,8 +189,8 @@ public bool Remove(bool asAsync, Action<RedisChannel, RedisValue> value)
{ {
if (value == null) if (value == null)
{ // treat as blanket wipe { // treat as blanket wipe
if (asAsync) _asyncHandler = null; _asyncHandler = null;
else _syncHandler = null; _syncHandler = null;
} }
else else
{ {
......
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Xunit;
using Xunit.Abstractions;
namespace StackExchange.Redis.Tests.Issues
{
public class Issue1101 : TestBase
{
public Issue1101(ITestOutputHelper output) : base(output) { }
[Fact]
public async Task ExecuteWithUnsubscribeViaChannel()
{
using (var muxer = Create())
{
RedisChannel name = Me();
var pubsub = muxer.GetSubscriber();
// subscribe and check we get data
var channel = await pubsub.SubscribeAsync(name);
List<string> values = new List<string>();
channel.OnMessage(x =>
{
lock(values) { values.Add(x.Message); }
return Task.CompletedTask;
});
await Task.Delay(100);
await pubsub.PublishAsync(name, "abc");
await Task.Delay(100);
lock (values)
{
Assert.Equal("abc", Assert.Single(values));
}
var subs = muxer.GetServer(muxer.GetEndPoints().Single()).SubscriptionSubscriberCount(name);
Assert.Equal(1, subs);
await channel.UnsubscribeAsync();
await Task.Delay(100);
await pubsub.PublishAsync(name, "def");
await Task.Delay(100);
lock (values)
{
Assert.Equal("abc", Assert.Single(values));
}
subs = muxer.GetServer(muxer.GetEndPoints().Single()).SubscriptionSubscriberCount(name);
Assert.Equal(0, subs);
}
}
[Fact]
public async Task ExecuteWithUnsubscribeViaSubscriber()
{
using (var muxer = Create())
{
RedisChannel name = Me();
var pubsub = muxer.GetSubscriber();
// subscribe and check we get data
var channel = await pubsub.SubscribeAsync(name);
List<string> values = new List<string>();
channel.OnMessage(x =>
{
lock (values) { values.Add(x.Message); }
return Task.CompletedTask;
});
await Task.Delay(100);
await pubsub.PublishAsync(name, "abc");
await Task.Delay(100);
lock (values)
{
Assert.Equal("abc", Assert.Single(values));
}
var subs = muxer.GetServer(muxer.GetEndPoints().Single()).SubscriptionSubscriberCount(name);
Assert.Equal(1, subs);
await pubsub.UnsubscribeAsync(name);
await Task.Delay(100);
await pubsub.PublishAsync(name, "def");
await Task.Delay(100);
lock (values)
{
Assert.Equal("abc", Assert.Single(values));
}
subs = muxer.GetServer(muxer.GetEndPoints().Single()).SubscriptionSubscriberCount(name);
Assert.Equal(0, subs);
}
}
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment