Commit 9bc2defd authored by Marc Gravell's avatar Marc Gravell

SSCAN, HSCAN, ZSCAN

parent 9b5cdfe2
......@@ -10,4 +10,5 @@ _ReSharper.*
Mono/
*.sln.ide
*.rdb
*.orig
\ No newline at end of file
*.orig
redis-cli.exe
\ No newline at end of file
......@@ -15,4 +15,5 @@ _ReSharper.*
Mono/
*.sln.ide
*.rdb
*.orig
\ No newline at end of file
*.orig
redis-cli.exe
\ No newline at end of file
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using NUnit.Framework;
namespace StackExchange.Redis.Tests
{
[TestFixture]
public class Scans : TestBase
{
protected override string GetConfiguration()
{
return "ubuntu";
}
[Test]
[TestCase(true)]
[TestCase(false)]
public void SetScan(bool supported)
{
string[] disabledCommands = supported ? null : new[] { "sscan" };
using(var conn = Create(disabledCommands: disabledCommands))
{
RedisKey key = Me();
var db = conn.GetDatabase();
db.KeyDelete(key);
db.SetAdd(key, "a");
db.SetAdd(key, "b");
db.SetAdd(key, "c");
var arr = db.SetScan(key).ToArray();
Assert.AreEqual(3, arr.Length);
Assert.IsTrue(arr.Contains("a"), "a");
Assert.IsTrue(arr.Contains("b"), "b");
Assert.IsTrue(arr.Contains("c"), "c");
}
}
[Test]
[TestCase(true)]
[TestCase(false)]
public void SortedSetScan(bool supported)
{
string[] disabledCommands = supported ? null : new[] { "zscan" };
using (var conn = Create(disabledCommands: disabledCommands))
{
RedisKey key = Me();
var db = conn.GetDatabase();
db.KeyDelete(key);
db.SortedSetAdd(key, "a", 1);
db.SortedSetAdd(key, "b", 2);
db.SortedSetAdd(key, "c", 3);
var arr = db.SortedSetScan(key).ToArray();
Assert.AreEqual(3, arr.Length);
Assert.IsTrue(arr.Any(x => x.Key == "a" && x.Value == 1), "a");
Assert.IsTrue(arr.Any(x => x.Key == "b" && x.Value == 2), "b");
Assert.IsTrue(arr.Any(x => x.Key == "c" && x.Value == 3), "c");
}
}
[Test]
[TestCase(true)]
[TestCase(false)]
public void HashScan(bool supported)
{
string[] disabledCommands = supported ? null : new[] { "hscan" };
using (var conn = Create(disabledCommands: disabledCommands))
{
RedisKey key = Me();
var db = conn.GetDatabase();
db.KeyDelete(key);
db.HashSet(key, "a", "1");
db.HashSet(key, "b", "2");
db.HashSet(key, "c", "3");
var arr = db.HashScan(key).ToArray();
Assert.AreEqual(3, arr.Length);
Assert.IsTrue(arr.Any(x => x.Key == "a" && x.Value == "1"), "a");
Assert.IsTrue(arr.Any(x => x.Key == "b" && x.Value == "2"), "b");
Assert.IsTrue(arr.Any(x => x.Key == "c" && x.Value == "3"), "c");
}
}
}
}
......@@ -81,6 +81,7 @@
<Compile Include="Properties\AssemblyInfo.cs" />
<Compile Include="PubSub.cs" />
<Compile Include="RealWorld.cs" />
<Compile Include="Scans.cs" />
<Compile Include="Scripting.cs" />
<Compile Include="Secure.cs" />
<Compile Include="Sets.cs" />
......
......@@ -94,5 +94,13 @@ internal static Exception ConnectionFailure(bool includeDetail, ConnectionFailur
if (includeDetail) AddDetail(ex, null, server, null);
return ex;
}
internal static Exception NotSupported(bool includeDetail, RedisCommand command)
{
string s = GetLabel(includeDetail, command, null);
var ex = new RedisCommandException("Command is not available on your server: " + s);
if (includeDetail) AddDetail(ex, null, null, s);
return ex;
}
}
}
......@@ -507,11 +507,26 @@ public interface IDatabase : IRedis, IDatabaseAsync
long SetRemove(RedisKey key, RedisValue[] values, CommandFlags flags = CommandFlags.None);
/// <summary>
/// The SSCAN command is used to incrementally iterate over a collection of elements.
/// The SSCAN command is used to incrementally iterate over set
/// </summary>
/// <returns>yields all elements of the set.</returns>
/// <remarks>http://redis.io/commands/sscan</remarks>
IEnumerable<RedisValue> SetScan(RedisKey key, RedisValue pattern = default(RedisValue), int pageSize = RedisDatabase.SetScanIterator.DefaultPageSize, CommandFlags flags = CommandFlags.None);
IEnumerable<RedisValue> SetScan(RedisKey key, RedisValue pattern = default(RedisValue), int pageSize = RedisDatabase.ScanIterator.DefaultPageSize, CommandFlags flags = CommandFlags.None);
/// <summary>
/// The ZSCAN command is used to incrementally iterate over a sorted set
/// </summary>
/// <returns>yields all elements of the sorted set.</returns>
/// <remarks>http://redis.io/commands/zscan</remarks>
IEnumerable<KeyValuePair<RedisValue, double>> SortedSetScan(RedisKey key, RedisValue pattern = default(RedisValue), int pageSize = RedisDatabase.ScanIterator.DefaultPageSize, CommandFlags flags = CommandFlags.None);
/// <summary>
/// The HSCAN command is used to incrementally iterate over a hash
/// </summary>
/// <returns>yields all elements of the hash.</returns>
/// <remarks>http://redis.io/commands/hscan</remarks>
IEnumerable<KeyValuePair<RedisValue, RedisValue>> HashScan(RedisKey key, RedisValue pattern = default(RedisValue), int pageSize = RedisDatabase.ScanIterator.DefaultPageSize, CommandFlags flags = CommandFlags.None);
/// <summary>
/// Sorts a list, set or sorted set (numerically or alphabetically, ascending by default); By default, the elements themselves are compared, but the values can also be
/// used to perform external key-lookups using the <c>by</c> parameter. By default, the elements themselves are returned, but external key-lookups (one or many) can
......
......@@ -866,22 +866,42 @@ public Task<long> SetRemoveAsync(RedisKey key, RedisValue[] values, CommandFlags
return ExecuteAsync(msg, ResultProcessor.Int64);
}
public IEnumerable<RedisValue> SetScan(RedisKey key, RedisValue pattern = default(RedisValue), int pageSize = RedisDatabase.SetScanIterator.DefaultPageSize, CommandFlags flags = CommandFlags.None)
public IEnumerable<RedisValue> SetScan(RedisKey key, RedisValue pattern = default(RedisValue), int pageSize = RedisDatabase.ScanIterator.DefaultPageSize, CommandFlags flags = CommandFlags.None)
{
var scan = TryScan(key, pattern, pageSize, flags, RedisCommand.SSCAN, SetScanResultProcessor.Default);
if (scan != null) return scan;
if (pattern.IsNull) return SetMembers(key, flags);
throw ExceptionFactory.NotSupported(true, RedisCommand.SSCAN);
}
public IEnumerable<KeyValuePair<RedisValue,RedisValue>> HashScan(RedisKey key, RedisValue pattern = default(RedisValue), int pageSize = RedisDatabase.ScanIterator.DefaultPageSize, CommandFlags flags = CommandFlags.None)
{
var scan = TryScan(key, pattern, pageSize, flags, RedisCommand.HSCAN, HashScanResultProcessor.Default);
if (scan != null) return scan;
if (pattern.IsNull) return HashGetAll(key, flags);
throw ExceptionFactory.NotSupported(true, RedisCommand.HSCAN);
}
public IEnumerable<KeyValuePair<RedisValue, double>> SortedSetScan(RedisKey key, RedisValue pattern = default(RedisValue), int pageSize = RedisDatabase.ScanIterator.DefaultPageSize, CommandFlags flags = CommandFlags.None)
{
var scan = TryScan(key, pattern, pageSize, flags, RedisCommand.ZSCAN, SortedSetScanResultProcessor.Default);
if (scan != null) return scan;
if (pattern.IsNull) return SortedSetRangeByRankWithScores(key, flags: flags);
throw ExceptionFactory.NotSupported(true, RedisCommand.ZSCAN);
}
private IEnumerable<T> TryScan<T>(RedisKey key, RedisValue pattern, int pageSize, CommandFlags flags, RedisCommand command, ResultProcessor<ScanIterator<T>.ScanResult> processor)
{
if (pageSize <= 0) throw new ArgumentOutOfRangeException("pageSize");
if (SetScanIterator.IsNil(pattern)) pattern = (byte[])null;
if (!multiplexer.CommandMap.IsAvailable(command)) return null;
multiplexer.CommandMap.AssertAvailable(RedisCommand.SCAN);
ServerEndPoint server;
var features = GetFeatures(Db, key, flags, out server);
if (!features.Scan)
{
if (pattern.IsNull)
{
return SetMembers(key, flags);
}
}
return new SetScanIterator(this, server, key, pattern, pageSize, flags).Read();
if (!features.Scan) return null;
if (ScanIterator.IsNil(pattern)) pattern = (byte[])null;
return new ScanIterator<T>(this, server, key, pattern, pageSize, flags, command, processor).Read();
}
public RedisValue[] Sort(RedisKey key, long skip = 0, long take = -1, Order order = Order.Ascending, SortType sortType = SortType.Numeric, RedisValue by = default(RedisValue), RedisValue[] get = null, CommandFlags flags = CommandFlags.None)
......@@ -1765,37 +1785,7 @@ private RedisCommand SetOperationCommand(SetOperation operation, bool store)
default: throw new ArgumentOutOfRangeException("operation");
}
}
struct SetScanResult
{
public static readonly ResultProcessor<SetScanResult> Processor = new SetScanResultProcessor();
public readonly long Cursor;
public readonly RedisValue[] Values;
public SetScanResult(long cursor, RedisValue[] values)
{
this.Cursor = cursor;
this.Values = values;
}
private class SetScanResultProcessor : ResultProcessor<SetScanResult>
{
protected override bool SetResultCore(PhysicalConnection connection, Message message, RawResult result)
{
switch (result.Type)
{
case ResultType.MultiBulk:
var arr = result.GetItems();
long i64;
if (arr.Length == 2 && arr[1].Type == ResultType.MultiBulk && arr[0].TryGetInt64(out i64))
{
var sscanResult = new SetScanResult(i64, arr[1].GetItemsAsValues());
SetResult(message, sscanResult);
return true;
}
break;
}
return false;
}
}
}
internal sealed class ScriptLoadMessage : Message
{
......@@ -1812,22 +1802,96 @@ internal override void WriteImpl(PhysicalConnection physical)
physical.Write((RedisValue)Script);
}
}
private abstract class ScanResultProcessor<T> : ResultProcessor<ScanIterator<T>.ScanResult>
{
protected abstract T[] Parse(RawResult result);
internal sealed class SetScanIterator
protected override bool SetResultCore(PhysicalConnection connection, Message message, RawResult result)
{
switch (result.Type)
{
case ResultType.MultiBulk:
var arr = result.GetItems();
long i64;
if (arr.Length == 2 && arr[1].Type == ResultType.MultiBulk && arr[0].TryGetInt64(out i64))
{
var sscanResult = new ScanIterator<T>.ScanResult(i64, Parse(arr[1]));
SetResult(message, sscanResult);
return true;
}
break;
}
return false;
}
}
sealed class SetScanResultProcessor : ScanResultProcessor<RedisValue>
{
internal const int DefaultPageSize = 10;
public static readonly ResultProcessor<ScanIterator<RedisValue>.ScanResult> Default = new SetScanResultProcessor();
private SetScanResultProcessor() { }
protected override RedisValue[] Parse(RawResult result)
{
return result.GetItemsAsValues();
}
}
sealed class HashScanResultProcessor : ScanResultProcessor<KeyValuePair<RedisValue,RedisValue>>
{
public static readonly ResultProcessor<ScanIterator<KeyValuePair<RedisValue, RedisValue>>.ScanResult> Default = new HashScanResultProcessor();
private HashScanResultProcessor() { }
protected override KeyValuePair<RedisValue, RedisValue>[] Parse(RawResult result)
{
KeyValuePair<RedisValue, RedisValue>[] pairs;
if (!ValuePairInterleaved.TryParse(result, out pairs)) pairs = null;
return pairs;
}
}
sealed class SortedSetScanResultProcessor : ScanResultProcessor<KeyValuePair<RedisValue, double>>
{
public static readonly ResultProcessor<ScanIterator<KeyValuePair<RedisValue, double>>.ScanResult> Default = new SortedSetScanResultProcessor();
private SortedSetScanResultProcessor() { }
protected override KeyValuePair<RedisValue, double>[] Parse(RawResult result)
{
KeyValuePair<RedisValue, double>[] pairs;
if (!SortedSetWithScores.TryParse(result, out pairs)) pairs = null;
return pairs;
}
}
internal static class ScanIterator
{
public const int DefaultPageSize = 10;
public static bool IsNil(RedisValue pattern)
{
if (pattern.IsNullOrEmpty) return true;
if (pattern.IsInteger) return false;
byte[] rawValue = pattern;
return rawValue.Length == 1 && rawValue[0] == '*';
}
}
internal class ScanIterator<T>
{
internal struct ScanResult
{
public readonly long Cursor;
public readonly T[] Values;
public ScanResult(long cursor, T[] values)
{
this.Cursor = cursor;
this.Values = values;
}
}
private readonly RedisDatabase database;
private readonly CommandFlags flags;
private readonly RedisCommand command;
private readonly RedisKey key;
private readonly int pageSize;
private readonly RedisValue pattern;
private readonly ServerEndPoint server;
private readonly ResultProcessor<ScanResult> processor;
public SetScanIterator(RedisDatabase database, ServerEndPoint server, RedisKey key, RedisValue pattern, int pageSize, CommandFlags flags)
public ScanIterator(RedisDatabase database, ServerEndPoint server, RedisKey key, RedisValue pattern, int pageSize, CommandFlags flags,
RedisCommand command, ResultProcessor<ScanResult> processor)
{
this.key = key;
this.pageSize = pageSize;
......@@ -1835,25 +1899,19 @@ public SetScanIterator(RedisDatabase database, ServerEndPoint server, RedisKey k
this.pattern = pattern;
this.flags = flags;
this.server = server;
this.command = command;
this.processor = processor;
}
public static bool IsNil(RedisValue pattern)
{
if (pattern.IsNullOrEmpty) return true;
if (pattern.IsInteger) return false;
byte[] rawValue = pattern;
return rawValue.Length == 1 && rawValue[0] == '*';
}
public IEnumerable<RedisValue> Read()
public IEnumerable<T> Read()
{
var msg = CreateMessage(0, false);
SetScanResult current = database.ExecuteSync(msg, SetScanResult.Processor, server);
Task<SetScanResult> pending;
ScanResult current = database.ExecuteSync(msg, processor, server);
Task<ScanResult> pending;
do
{
// kick off the next immediately, but don't wait for it yet
msg = CreateMessage(current.Cursor, true);
pending = msg == null ? null : database.ExecuteAsync(msg, SetScanResult.Processor, server);
pending = msg == null ? null : database.ExecuteAsync(msg, processor, server);
// now we can iterate the rows
var values = current.Values;
......@@ -1872,30 +1930,32 @@ public IEnumerable<RedisValue> Read()
Message CreateMessage(long cursor, bool running)
{
if (cursor == 0 && running) return null; // end of the line
if (IsNil(pattern))
if (ScanIterator.IsNil(pattern))
{
if (pageSize == DefaultPageSize)
if (pageSize == ScanIterator.DefaultPageSize)
{
return Message.Create(database.Database, flags, RedisCommand.SSCAN, key, cursor);
return Message.Create(database.Database, flags, command, key, cursor);
}
else
{
return Message.Create(database.Database, flags, RedisCommand.SSCAN, key, cursor, RedisLiterals.COUNT, pageSize);
return Message.Create(database.Database, flags, command, key, cursor, RedisLiterals.COUNT, pageSize);
}
}
else
{
if (pageSize == DefaultPageSize)
if (pageSize == ScanIterator.DefaultPageSize)
{
return Message.Create(database.Database, flags, RedisCommand.SSCAN, key, cursor, RedisLiterals.MATCH, pattern);
return Message.Create(database.Database, flags, command, key, cursor, RedisLiterals.MATCH, pattern);
}
else
{
return Message.Create(database.Database, flags, RedisCommand.SSCAN, key, new RedisValue[] { cursor, RedisLiterals.MATCH, pattern, RedisLiterals.COUNT, pageSize });
return Message.Create(database.Database, flags, command, key, new RedisValue[] { cursor, RedisLiterals.MATCH, pattern, RedisLiterals.COUNT, pageSize });
}
}
}
}
private sealed class ScriptEvalMessage : Message, IMultiMessage
{
private readonly RedisKey[] keys;
......
......@@ -75,10 +75,10 @@ abstract class ResultProcessor
public static readonly TimeSpanProcessor
TimeSpanFromMilliseconds = new TimeSpanProcessor(true),
TimeSpanFromSeconds = new TimeSpanProcessor(false);
public static readonly ResultProcessor<KeyValuePair<RedisValue, RedisValue>[]>
public static readonly ValuePairInterleavedProcessor
ValuePairInterleaved = new ValuePairInterleavedProcessor();
public static readonly ResultProcessor<KeyValuePair<RedisValue, double>[]>
public static readonly SortedSetWithScoresProcessor
SortedSetWithScores = new SortedSetWithScoresProcessor();
public static readonly ResultProcessor<RedisResult>
......@@ -928,65 +928,68 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes
}
}
sealed class ValuePairInterleavedProcessor : ValuePairInterleavedProcessorBase<RedisValue, RedisValue>
internal sealed class ValuePairInterleavedProcessor : ValuePairInterleavedProcessorBase<RedisValue, RedisValue>
{
protected override RedisValue ParseKey(RawResult key) { return key.AsRedisValue(); }
protected override RedisValue ParseValue(RawResult key) { return key.AsRedisValue(); }
}
abstract class ValuePairInterleavedProcessorBase<TKey, TValue> : ResultProcessor<KeyValuePair<TKey, TValue>[]>
internal sealed class SortedSetWithScoresProcessor : ValuePairInterleavedProcessorBase<RedisValue, double>
{
protected override RedisValue ParseKey(RawResult key) { return key.AsRedisValue(); }
protected override double ParseValue(RawResult value)
{
double val;
return value.TryGetDouble(out val) ? val: double.NaN;
}
}
internal abstract class ValuePairInterleavedProcessorBase<TKey, TValue> : ResultProcessor<KeyValuePair<TKey, TValue>[]>
{
static readonly KeyValuePair<TKey, TValue>[] nix = new KeyValuePair<TKey, TValue>[0];
protected abstract TKey ParseKey(RawResult key);
protected abstract TValue ParseValue(RawResult value);
protected override bool SetResultCore(PhysicalConnection connection, Message message, RawResult result)
public bool TryParse(RawResult result, out KeyValuePair<TKey, TValue>[] pairs)
{
switch (result.Type)
{
case ResultType.MultiBulk:
var arr = result.GetItems();
int count = arr.Length / 2;
KeyValuePair<TKey, TValue>[] pairs;
if (count == 0)
if (arr == null)
{
pairs = nix;
pairs = null;
}
else
{
pairs = new KeyValuePair<TKey, TValue>[count];
int offset = 0;
for (int i = 0; i < pairs.Length; i++)
int count = arr.Length / 2;
if (count == 0)
{
pairs = nix;
}
else
{
var setting = ParseKey(arr[offset++]);
var value = ParseValue(arr[offset++]);
pairs[i] = new KeyValuePair<TKey, TValue>(setting, value);
pairs = new KeyValuePair<TKey, TValue>[count];
int offset = 0;
for (int i = 0; i < pairs.Length; i++)
{
var setting = ParseKey(arr[offset++]);
var value = ParseValue(arr[offset++]);
pairs[i] = new KeyValuePair<TKey, TValue>(setting, value);
}
}
}
SetResult(message, pairs);
return true;
default:
pairs = null;
return false;
}
}
}
private class SortedSetWithScoresProcessor : ResultProcessor<KeyValuePair<RedisValue, double>[]>
{
protected abstract TKey ParseKey(RawResult key);
protected abstract TValue ParseValue(RawResult value);
protected override bool SetResultCore(PhysicalConnection connection, Message message, RawResult result)
{
if(result.Type == ResultType.MultiBulk)
KeyValuePair<TKey, TValue>[] arr;
if(TryParse(result, out arr))
{
var items = result.GetItems();
var arr = new KeyValuePair<RedisValue, double>[items.Length / 2];
int index = 0;
for(int i = 0; i < arr.Length; i++)
{
var member = items[index++].AsRedisValue();
double score;
if (!items[index++].TryGetDouble(out score)) return false;
arr[i] = new KeyValuePair<RedisValue, double>(member, score);
}
SetResult(message, arr);
return true;
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment