Commit 9ebe3e46 authored by Marc Gravell's avatar Marc Gravell

Fixup PUBSUB command (NUMSUB and CHANNELS)

parent 0190e1a0
using NUnit.Framework;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.IO;
namespace StackExchange.Redis.Tests
{
[TestFixture]
public class PubSubCommand : TestBase
{
[Test]
public void SubscriberCount()
{
using(var conn = Create())
{
RedisChannel channel = Me() + Guid.NewGuid();
var server = conn.GetServer(conn.GetEndPoints()[0]);
var channels = server.SubscriptionChannels(Me() + "*");
Assert.IsFalse(channels.Contains(channel));
long justWork = server.SubscriptionPatternCount();
var count = server.SubscriptionSubscriberCount(channel);
Assert.AreEqual(0, count);
conn.GetSubscriber().Subscribe(channel, delegate { });
count = server.SubscriptionSubscriberCount(channel);
Assert.AreEqual(1, count);
channels = server.SubscriptionChannels(Me() + "*");
Assert.IsTrue(channels.Contains(channel));
}
}
protected override string GetConfiguration()
{
return "ubuntu";
}
}
}
...@@ -92,6 +92,7 @@ ...@@ -92,6 +92,7 @@
<Compile Include="PreserveOrder.cs" /> <Compile Include="PreserveOrder.cs" />
<Compile Include="Properties\AssemblyInfo.cs" /> <Compile Include="Properties\AssemblyInfo.cs" />
<Compile Include="PubSub.cs" /> <Compile Include="PubSub.cs" />
<Compile Include="PubSubCommand.cs" />
<Compile Include="RealWorld.cs" /> <Compile Include="RealWorld.cs" />
<Compile Include="Scans.cs" /> <Compile Include="Scans.cs" />
<Compile Include="Scripting.cs" /> <Compile Include="Scripting.cs" />
......
...@@ -467,13 +467,13 @@ public Task<long> SubscriptionPatternCountAsync(CommandFlags flags = CommandFlag ...@@ -467,13 +467,13 @@ public Task<long> SubscriptionPatternCountAsync(CommandFlags flags = CommandFlag
public long SubscriptionSubscriberCount(RedisChannel channel, CommandFlags flags = CommandFlags.None) public long SubscriptionSubscriberCount(RedisChannel channel, CommandFlags flags = CommandFlags.None)
{ {
var msg = Message.Create(-1, flags, RedisCommand.PUBSUB, RedisLiterals.NUMSUB, channel); var msg = Message.Create(-1, flags, RedisCommand.PUBSUB, RedisLiterals.NUMSUB, channel);
return ExecuteSync(msg, ResultProcessor.Int64); return ExecuteSync(msg, ResultProcessor.PubSubNumSub);
} }
public Task<long> SubscriptionSubscriberCountAsync(RedisChannel channel, CommandFlags flags = CommandFlags.None) public Task<long> SubscriptionSubscriberCountAsync(RedisChannel channel, CommandFlags flags = CommandFlags.None)
{ {
var msg = Message.Create(-1, flags, RedisCommand.PUBSUB, RedisLiterals.NUMSUB, channel); var msg = Message.Create(-1, flags, RedisCommand.PUBSUB, RedisLiterals.NUMSUB, channel);
return ExecuteAsync(msg, ResultProcessor.Int64); return ExecuteAsync(msg, ResultProcessor.PubSubNumSub);
} }
public DateTime Time(CommandFlags flags = CommandFlags.None) public DateTime Time(CommandFlags flags = CommandFlags.None)
......
...@@ -41,7 +41,8 @@ abstract class ResultProcessor ...@@ -41,7 +41,8 @@ abstract class ResultProcessor
Info = new InfoProcessor(); Info = new InfoProcessor();
public static readonly ResultProcessor<long> public static readonly ResultProcessor<long>
Int64 = new Int64Processor(); Int64 = new Int64Processor(),
PubSubNumSub = new PubSubNumSubProcessor();
public static readonly ResultProcessor<double?> public static readonly ResultProcessor<double?>
NullableDouble = new NullableDoubleProcessor(); NullableDouble = new NullableDoubleProcessor();
...@@ -879,7 +880,7 @@ static string Normalize(string category) ...@@ -879,7 +880,7 @@ static string Normalize(string category)
} }
} }
sealed class Int64Processor : ResultProcessor<long> class Int64Processor : ResultProcessor<long>
{ {
protected override bool SetResultCore(PhysicalConnection connection, Message message, RawResult result) protected override bool SetResultCore(PhysicalConnection connection, Message message, RawResult result)
{ {
...@@ -899,6 +900,23 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes ...@@ -899,6 +900,23 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes
return false; return false;
} }
} }
class PubSubNumSubProcessor : Int64Processor
{
protected override bool SetResultCore(PhysicalConnection connection, Message message, RawResult result)
{
if(result.Type == ResultType.MultiBulk)
{
var arr = result.GetItems();
long val;
if(arr != null && arr.Length == 2 && arr[1].TryGetInt64(out val))
{
SetResult(message, val);
return true;
}
}
return base.SetResultCore(connection, message, result);
}
}
sealed class NullableDoubleProcessor : ResultProcessor<double?> sealed class NullableDoubleProcessor : ResultProcessor<double?>
{ {
...@@ -970,7 +988,7 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes ...@@ -970,7 +988,7 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes
byte[] channelPrefix = connection.ChannelPrefix; byte[] channelPrefix = connection.ChannelPrefix;
for (int i = 0; i < final.Length; i++) for (int i = 0; i < final.Length; i++)
{ {
final[i] = result.AsRedisChannel(channelPrefix); final[i] = arr[i].AsRedisChannel(channelPrefix);
} }
} }
SetResult(message, final); SetResult(message, final);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment