Commit 97d16d37 authored by Marc Gravell's avatar Marc Gravell

ZSCAN|HSCAN|SSCAN resumable iterators

parent e815fcb0
......@@ -148,7 +148,7 @@ public void HashLength()
[Test]
public void HashScan()
{
wrapper.HashScan("key", "pattern", 123, CommandFlags.HighPriority);
wrapper.HashScan("key", "pattern", 123, flags: CommandFlags.HighPriority);
mock.Verify(_ => _.HashScan("prefix:key", "pattern", 123, CommandFlags.HighPriority));
}
......@@ -603,7 +603,7 @@ public void SetRemove_2()
[Test]
public void SetScan()
{
wrapper.SetScan("key", "pattern", 123, CommandFlags.HighPriority);
wrapper.SetScan("key", "pattern", 123, flags: CommandFlags.HighPriority);
mock.Verify(_ => _.SetScan("prefix:key", "pattern", 123, CommandFlags.HighPriority));
}
......@@ -773,7 +773,7 @@ public void SortedSetRemoveRangeByValue()
[Test]
public void SortedSetScan()
{
wrapper.SortedSetScan("key", "pattern", 123, CommandFlags.HighPriority);
wrapper.SortedSetScan("key", "pattern", 123, flags: CommandFlags.HighPriority);
mock.Verify(_ => _.SortedSetScan("prefix:key", "pattern", 123, CommandFlags.HighPriority));
}
......
......@@ -82,6 +82,12 @@ internal static Exception NotSupported(bool includeDetail, RedisCommand command)
if (includeDetail) AddDetail(ex, null, null, s);
return ex;
}
internal static Exception NoCursor(RedisCommand command)
{
string s = GetLabel(false, command, null);
var ex = new RedisCommandException("Command cannot be used with a cursor: " + s);
return ex;
}
internal static Exception Timeout(bool includeDetail, string errorMessage, Message message, ServerEndPoint server)
{
......
......@@ -138,7 +138,14 @@ public interface IDatabase : IRedis, IDatabaseAsync
/// </summary>
/// <returns>yields all elements of the hash.</returns>
/// <remarks>http://redis.io/commands/hscan</remarks>
IEnumerable<HashEntry> HashScan(RedisKey key, RedisValue pattern = default(RedisValue), int pageSize = RedisDatabase.ScanUtils.DefaultPageSize, CommandFlags flags = CommandFlags.None);
IEnumerable<HashEntry> HashScan(RedisKey key, RedisValue pattern, int pageSize, CommandFlags flags);
/// <summary>
/// The HSCAN command is used to incrementally iterate over a hash
/// </summary>
/// <returns>yields all elements of the hash.</returns>
/// <remarks>http://redis.io/commands/hscan</remarks>
IEnumerable<HashEntry> HashScan(RedisKey key, RedisValue pattern = default(RedisValue), int pageSize = RedisBase.CursorEnumerable.DefaultPageSize, long cursor = RedisBase.CursorEnumerable.Origin, CommandFlags flags = CommandFlags.None);
/// <summary>
/// Sets the specified fields to their respective values in the hash stored at key. This command overwrites any existing fields in the hash. If key does not exist, a new key holding a hash is created.
......@@ -572,7 +579,14 @@ public interface IDatabase : IRedis, IDatabaseAsync
/// </summary>
/// <returns>yields all elements of the set.</returns>
/// <remarks>http://redis.io/commands/sscan</remarks>
IEnumerable<RedisValue> SetScan(RedisKey key, RedisValue pattern = default(RedisValue), int pageSize = RedisDatabase.ScanUtils.DefaultPageSize, CommandFlags flags = CommandFlags.None);
IEnumerable<RedisValue> SetScan(RedisKey key, RedisValue pattern, int pageSize, CommandFlags flags);
/// <summary>
/// The SSCAN command is used to incrementally iterate over set
/// </summary>
/// <returns>yields all elements of the set.</returns>
/// <remarks>http://redis.io/commands/sscan</remarks>
IEnumerable<RedisValue> SetScan(RedisKey key, RedisValue pattern = default(RedisValue), int pageSize = RedisBase.CursorEnumerable.DefaultPageSize, long cursor = RedisBase.CursorEnumerable.Origin, CommandFlags flags = CommandFlags.None);
/// <summary>
/// Sorts a list, set or sorted set (numerically or alphabetically, ascending by default); By default, the elements themselves are compared, but the values can also be
......@@ -758,7 +772,14 @@ public interface IDatabase : IRedis, IDatabaseAsync
/// </summary>
/// <returns>yields all elements of the sorted set.</returns>
/// <remarks>http://redis.io/commands/zscan</remarks>
IEnumerable<SortedSetEntry> SortedSetScan(RedisKey key, RedisValue pattern = default(RedisValue), int pageSize = RedisDatabase.ScanUtils.DefaultPageSize, CommandFlags flags = CommandFlags.None);
IEnumerable<SortedSetEntry> SortedSetScan(RedisKey key, RedisValue pattern, int pageSize, CommandFlags flags);
/// <summary>
/// The ZSCAN command is used to incrementally iterate over a sorted set
/// </summary>
/// <returns>yields all elements of the sorted set.</returns>
/// <remarks>http://redis.io/commands/zscan</remarks>
IEnumerable<SortedSetEntry> SortedSetScan(RedisKey key, RedisValue pattern = default(RedisValue), int pageSize = RedisBase.CursorEnumerable.DefaultPageSize, long cursor = RedisBase.CursorEnumerable.Origin, CommandFlags flags = CommandFlags.None);
/// <summary>
/// Returns the score of member in the sorted set at key; If member does not exist in the sorted set, or key does not exist, nil is returned.
/// </summary>
......
......@@ -237,7 +237,7 @@ public partial interface IServer : IRedis
/// <remarks>Warning: consider KEYS as a command that should only be used in production environments with extreme care.</remarks>
/// <remarks>http://redis.io/commands/keys</remarks>
/// <remarks>http://redis.io/commands/scan</remarks>
IEnumerable<RedisKey> Keys(int database = 0, RedisValue pattern = default(RedisValue), int pageSize = 10, long cursor = 0, CommandFlags flags = CommandFlags.None);
IEnumerable<RedisKey> Keys(int database = 0, RedisValue pattern = default(RedisValue), int pageSize = RedisBase.CursorEnumerable.DefaultPageSize, long cursor = RedisBase.CursorEnumerable.Origin, CommandFlags flags = CommandFlags.None);
/// <summary>
/// Return the time of the last DB save executed with success. A client may check if a BGSAVE command succeeded reading the LASTSAVE value, then issuing a BGSAVE command and checking at regular intervals every N seconds if LASTSAVE changed.
......
......@@ -608,17 +608,30 @@ public TimeSpan Ping(CommandFlags flags = CommandFlags.None)
return this.Inner.Ping(flags);
}
public IEnumerable<HashEntry> HashScan(RedisKey key, RedisValue pattern = default(RedisValue), int pageSize = 10, CommandFlags flags = CommandFlags.None)
IEnumerable<HashEntry> IDatabase.HashScan(RedisKey key, RedisValue pattern, int pageSize, CommandFlags flags)
{
return HashScan(key, pattern, pageSize, RedisBase.CursorEnumerable.Origin, flags);
}
public IEnumerable<HashEntry> HashScan(RedisKey key, RedisValue pattern = default(RedisValue), int pageSize = RedisBase.CursorEnumerable.DefaultPageSize, long cursor = RedisBase.CursorEnumerable.Origin, CommandFlags flags = CommandFlags.None)
{
return this.Inner.HashScan(this.ToInner(key), pattern, pageSize, flags);
}
public IEnumerable<RedisValue> SetScan(RedisKey key, RedisValue pattern = default(RedisValue), int pageSize = 10, CommandFlags flags = CommandFlags.None)
IEnumerable<RedisValue> IDatabase.SetScan(RedisKey key, RedisValue pattern, int pageSize, CommandFlags flags)
{
return this.Inner.SetScan(this.ToInner(key), pattern, pageSize, flags);
return SetScan(key, pattern, pageSize, RedisBase.CursorEnumerable.Origin, flags);
}
public IEnumerable<RedisValue> SetScan(RedisKey key, RedisValue pattern = default(RedisValue), int pageSize = RedisBase.CursorEnumerable.DefaultPageSize, long cursor = RedisBase.CursorEnumerable.Origin, CommandFlags flags = CommandFlags.None)
{
return this.Inner.SetScan(this.ToInner(key), pattern, pageSize, cursor, flags);
}
public IEnumerable<SortedSetEntry> SortedSetScan(RedisKey key, RedisValue pattern = default(RedisValue), int pageSize = 10, CommandFlags flags = CommandFlags.None)
IEnumerable<SortedSetEntry> IDatabase.SortedSetScan(RedisKey key, RedisValue pattern, int pageSize, CommandFlags flags)
{
return SortedSetScan(key, pattern, pageSize, RedisBase.CursorEnumerable.Origin, flags);
}
public IEnumerable<SortedSetEntry> SortedSetScan(RedisKey key, RedisValue pattern = default(RedisValue), int pageSize = RedisBase.CursorEnumerable.DefaultPageSize, long cursor = RedisBase.CursorEnumerable.Origin, CommandFlags flags = CommandFlags.None)
{
return this.Inner.SortedSetScan(this.ToInner(key), pattern, pageSize, flags);
}
......
......@@ -141,9 +141,9 @@ private ResultProcessor.TimingProcessor.TimerMessage GetTimerMessage(CommandFlag
}
internal abstract class CursorEnumerableBase
internal abstract class CursorEnumerable
{
internal const int DefaultPageSize = 10;
internal const int Origin = 0, DefaultPageSize = 10;
internal static bool IsNil(RedisValue pattern)
{
if (pattern.IsNullOrEmpty) return true;
......@@ -152,7 +152,7 @@ internal static bool IsNil(RedisValue pattern)
return rawValue.Length == 1 && rawValue[0] == '*';
}
}
internal abstract class CursorEnumerableBase<T> : CursorEnumerableBase, IEnumerable<T>, IScanning
internal abstract class CursorEnumerable<T> : CursorEnumerable, IEnumerable<T>, IScanning
{
private readonly RedisBase redis;
private readonly ServerEndPoint server;
......@@ -161,7 +161,7 @@ internal abstract class CursorEnumerableBase<T> : CursorEnumerableBase, IEnumera
protected readonly int pageSize;
protected readonly long initialCursor;
protected CursorEnumerableBase(RedisBase redis, ServerEndPoint server, int db, int pageSize, long cursor, CommandFlags flags)
protected CursorEnumerable(RedisBase redis, ServerEndPoint server, int db, int pageSize, long cursor, CommandFlags flags)
{
this.redis = redis;
this.server = server;
......@@ -217,8 +217,8 @@ protected ScanResult Wait(Task<ScanResult> pending)
class CursorEnumerator : IEnumerator<T>, IScanning
{
private CursorEnumerableBase<T> parent;
public CursorEnumerator(CursorEnumerableBase<T> parent)
private CursorEnumerable<T> parent;
public CursorEnumerator(CursorEnumerable<T> parent)
{
if (parent == null) throw new ArgumentNullException("parent");
this.parent = parent;
......
......@@ -193,11 +193,17 @@ public Task<long> HashLengthAsync(RedisKey key, CommandFlags flags = CommandFlag
return ExecuteAsync(msg, ResultProcessor.Int64);
}
public IEnumerable<HashEntry> HashScan(RedisKey key, RedisValue pattern = default(RedisValue), int pageSize = RedisDatabase.ScanUtils.DefaultPageSize, CommandFlags flags = CommandFlags.None)
IEnumerable<HashEntry> IDatabase.HashScan(RedisKey key, RedisValue pattern, int pageSize, CommandFlags flags)
{
var scan = TryScan<HashEntry>(key, pattern, pageSize, flags, RedisCommand.HSCAN, HashScanResultProcessor.Default);
return HashScan(key, pattern, pageSize, CursorEnumerable.Origin, flags);
}
public IEnumerable<HashEntry> HashScan(RedisKey key, RedisValue pattern = default(RedisValue), int pageSize = CursorEnumerable.DefaultPageSize, long cursor = CursorEnumerable.Origin, CommandFlags flags = CommandFlags.None)
{
var scan = TryScan<HashEntry>(key, pattern, pageSize, cursor, flags, RedisCommand.HSCAN, HashScanResultProcessor.Default);
if (scan != null) return scan;
if (cursor != 0) throw ExceptionFactory.NoCursor(RedisCommand.HGETALL);
if (pattern.IsNull) return HashGetAll(key, flags);
throw ExceptionFactory.NotSupported(true, RedisCommand.HSCAN);
}
......@@ -1017,11 +1023,17 @@ public Task<long> SetRemoveAsync(RedisKey key, RedisValue[] values, CommandFlags
return ExecuteAsync(msg, ResultProcessor.Int64);
}
public IEnumerable<RedisValue> SetScan(RedisKey key, RedisValue pattern = default(RedisValue), int pageSize = RedisDatabase.ScanUtils.DefaultPageSize, CommandFlags flags = CommandFlags.None)
IEnumerable<RedisValue> IDatabase.SetScan(RedisKey key, RedisValue pattern, int pageSize, CommandFlags flags)
{
var scan = TryScan<RedisValue>(key, pattern, pageSize, flags, RedisCommand.SSCAN, SetScanResultProcessor.Default);
return SetScan(key, pattern, pageSize, CursorEnumerable.Origin, flags);
}
public IEnumerable<RedisValue> SetScan(RedisKey key, RedisValue pattern = default(RedisValue), int pageSize = CursorEnumerable.DefaultPageSize, long cursor = CursorEnumerable.Origin, CommandFlags flags = CommandFlags.None)
{
var scan = TryScan<RedisValue>(key, pattern, pageSize, cursor, flags, RedisCommand.SSCAN, SetScanResultProcessor.Default);
if (scan != null) return scan;
if(cursor != 0) throw ExceptionFactory.NoCursor(RedisCommand.SMEMBERS);
if (pattern.IsNull) return SetMembers(key, flags);
throw ExceptionFactory.NotSupported(true, RedisCommand.SSCAN);
}
......@@ -1239,11 +1251,17 @@ public Task<long> SortedSetRemoveRangeByScoreAsync(RedisKey key, double start, d
return ExecuteAsync(msg, ResultProcessor.Int64);
}
public IEnumerable<SortedSetEntry> SortedSetScan(RedisKey key, RedisValue pattern = default(RedisValue), int pageSize = RedisDatabase.ScanUtils.DefaultPageSize, CommandFlags flags = CommandFlags.None)
IEnumerable<SortedSetEntry> IDatabase.SortedSetScan(RedisKey key, RedisValue pattern, int pageSize, CommandFlags flags)
{
return SortedSetScan(key, pattern, pageSize, CursorEnumerable.Origin, flags);
}
public IEnumerable<SortedSetEntry> SortedSetScan(RedisKey key, RedisValue pattern = default(RedisValue), int pageSize = CursorEnumerable.DefaultPageSize, long cursor = CursorEnumerable.Origin, CommandFlags flags = CommandFlags.None)
{
var scan = TryScan<SortedSetEntry>(key, pattern, pageSize, flags, RedisCommand.ZSCAN, SortedSetScanResultProcessor.Default);
var scan = TryScan<SortedSetEntry>(key, pattern, pageSize, cursor, flags, RedisCommand.ZSCAN, SortedSetScanResultProcessor.Default);
if (scan != null) return scan;
if (cursor != 0) throw ExceptionFactory.NoCursor(RedisCommand.ZRANGE);
if (pattern.IsNull) return SortedSetRangeByRankWithScores(key, flags: flags);
throw ExceptionFactory.NotSupported(true, RedisCommand.ZSCAN);
}
......@@ -1955,7 +1973,7 @@ private RedisCommand SetOperationCommand(SetOperation operation, bool store)
}
}
private IEnumerable<T> TryScan<T>(RedisKey key, RedisValue pattern, int pageSize, CommandFlags flags, RedisCommand command, ResultProcessor<ScanIterator<T>.ScanResult> processor)
private IEnumerable<T> TryScan<T>(RedisKey key, RedisValue pattern, int pageSize, long cursor, CommandFlags flags, RedisCommand command, ResultProcessor<ScanIterator<T>.ScanResult> processor)
{
if (pageSize <= 0) throw new ArgumentOutOfRangeException("pageSize");
if (!multiplexer.CommandMap.IsAvailable(command)) return null;
......@@ -1964,8 +1982,8 @@ private IEnumerable<T> TryScan<T>(RedisKey key, RedisValue pattern, int pageSize
var features = GetFeatures(Db, key, flags, out server);
if (!features.Scan) return null;
if (ScanUtils.IsNil(pattern)) pattern = (byte[])null;
return new ScanIterator<T>(this, server, key, pattern, pageSize, flags, command, processor).Read();
if (CursorEnumerable.IsNil(pattern)) pattern = (byte[])null;
return new ScanIterator<T>(this, server, key, pattern, pageSize, cursor, flags, command, processor);
}
private Message GetLexMessage(RedisCommand command, RedisKey key, RedisValue min, RedisValue max, Exclude exclude, long skip, long take, CommandFlags flags)
......@@ -2013,110 +2031,51 @@ public Task<long> SortedSetRemoveRangeByValueAsync(RedisKey key, RedisValue min,
return ExecuteAsync(msg, ResultProcessor.Int64);
}
internal static class ScanUtils
{
public const int DefaultPageSize = 10;
public static bool IsNil(RedisValue pattern)
{
if (pattern.IsNullOrEmpty) return true;
if (pattern.IsInteger) return false;
byte[] rawValue = pattern;
return rawValue.Length == 1 && rawValue[0] == '*';
}
}
internal class ScanIterator<T>
internal class ScanIterator<T> : CursorEnumerable<T>
{
private readonly RedisCommand command;
private readonly RedisDatabase database;
private readonly CommandFlags flags;
private readonly RedisKey key;
private readonly int pageSize;
private readonly RedisValue pattern;
private readonly RedisCommand command;
private readonly ResultProcessor<ScanResult> processor;
private readonly ServerEndPoint server;
public ScanIterator(RedisDatabase database, ServerEndPoint server, RedisKey key, RedisValue pattern, int pageSize, CommandFlags flags,
public ScanIterator(RedisDatabase database, ServerEndPoint server, RedisKey key, RedisValue pattern, int pageSize, long cursor, CommandFlags flags,
RedisCommand command, ResultProcessor<ScanResult> processor)
: base(database, server, database.Database, pageSize, cursor, flags)
{
this.key = key;
this.pageSize = pageSize;
this.database = database;
this.pattern = pattern;
this.flags = flags;
this.server = server;
this.command = command;
this.processor = processor;
}
public IEnumerable<T> Read()
{
var msg = CreateMessage(0, false);
ScanResult current = database.ExecuteSync(msg, processor, server);
Task<ScanResult> pending;
do
{
// kick off the next immediately, but don't wait for it yet
msg = CreateMessage(current.Cursor, true);
pending = msg == null ? null : database.ExecuteAsync(msg, processor, server);
// now we can iterate the rows
var values = current.Values;
for (int i = 0; i < values.Length; i++)
yield return values[i];
// wait for the next, if any
if (pending != null)
protected override ResultProcessor<CursorEnumerable<T>.ScanResult> Processor
{
current = database.Wait(pending);
}
} while (pending != null);
get { return processor; }
}
Message CreateMessage(long cursor, bool running)
protected override Message CreateMessage(long cursor)
{
if (cursor == 0 && running) return null; // end of the line
if (ScanUtils.IsNil(pattern))
if (IsNil(pattern))
{
if (pageSize == ScanUtils.DefaultPageSize)
if (pageSize == DefaultPageSize)
{
return Message.Create(database.Database, flags, command, key, cursor);
return Message.Create(db, flags, command, key, cursor);
}
else
{
return Message.Create(database.Database, flags, command, key, cursor, RedisLiterals.COUNT, pageSize);
return Message.Create(db, flags, command, key, cursor, RedisLiterals.COUNT, pageSize);
}
}
else
{
if (pageSize == ScanUtils.DefaultPageSize)
if (pageSize == DefaultPageSize)
{
return Message.Create(database.Database, flags, command, key, cursor, RedisLiterals.MATCH, pattern);
return Message.Create(db, flags, command, key, cursor, RedisLiterals.MATCH, pattern);
}
else
{
return Message.Create(database.Database, flags, command, key, new RedisValue[] { cursor, RedisLiterals.MATCH, pattern, RedisLiterals.COUNT, pageSize });
}
return Message.Create(db, flags, command, key, new RedisValue[] { cursor, RedisLiterals.MATCH, pattern, RedisLiterals.COUNT, pageSize });
}
}
internal struct ScanResult
{
public readonly long Cursor;
public readonly T[] Values;
public ScanResult(long cursor, T[] values)
{
this.Cursor = cursor;
this.Values = values;
}
}
}
......
......@@ -271,13 +271,13 @@ public Task<string> InfoRawAsync(RedisValue section = default(RedisValue), Comma
IEnumerable<RedisKey> IServer.Keys(int database, RedisValue pattern, int pageSize, CommandFlags flags)
{
return Keys(database, pattern, pageSize, 0, flags);
return Keys(database, pattern, pageSize, CursorEnumerable.Origin, flags);
}
public IEnumerable<RedisKey> Keys(int database = 0, RedisValue pattern = default(RedisValue), int pageSize = CursorEnumerableBase.DefaultPageSize, long cursor = 0, CommandFlags flags = CommandFlags.None)
public IEnumerable<RedisKey> Keys(int database = 0, RedisValue pattern = default(RedisValue), int pageSize = CursorEnumerable.DefaultPageSize, long cursor = CursorEnumerable.Origin, CommandFlags flags = CommandFlags.None)
{
if (pageSize <= 0) throw new ArgumentOutOfRangeException("pageSize");
if (CursorEnumerableBase.IsNil(pattern)) pattern = RedisLiterals.Wildcard;
if (CursorEnumerable.IsNil(pattern)) pattern = RedisLiterals.Wildcard;
if (multiplexer.CommandMap.IsAvailable(RedisCommand.SCAN))
{
......@@ -286,7 +286,7 @@ public IEnumerable<RedisKey> Keys(int database = 0, RedisValue pattern = default
if (features.Scan) return new KeysScanEnumerable(this, database, pattern, pageSize, cursor, flags);
}
if (cursor != 0) throw new InvalidOperationException("A cursor cannot be used with KEYS");
if (cursor != 0) throw ExceptionFactory.NoCursor(RedisCommand.KEYS);
Message msg = Message.Create(database, flags, RedisCommand.KEYS, pattern);
return ExecuteSync(msg, ResultProcessor.RedisKeyArray);
}
......@@ -640,7 +640,7 @@ public static RedisValue Hash(string value)
}
}
sealed class KeysScanEnumerable : CursorEnumerableBase<RedisKey>
sealed class KeysScanEnumerable : CursorEnumerable<RedisKey>
{
private readonly RedisValue pattern;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment