Commit a4fd24e1 authored by Marc Gravell's avatar Marc Gravell

Pick up scans at the right item

parent f5f4285c
...@@ -26,7 +26,7 @@ public void KeysScan(bool supported) ...@@ -26,7 +26,7 @@ public void KeysScan(bool supported)
db.StringSet("KeysScan:" + i, Guid.NewGuid().ToString(), flags: CommandFlags.FireAndForget); db.StringSet("KeysScan:" + i, Guid.NewGuid().ToString(), flags: CommandFlags.FireAndForget);
} }
var seq = server.Keys(DB, pageSize:50); var seq = server.Keys(DB, pageSize:50);
bool isScanning = seq is IScanning; bool isScanning = seq is IScanningCursor;
Assert.AreEqual(supported, isScanning, "scanning"); Assert.AreEqual(supported, isScanning, "scanning");
Assert.AreEqual(100, seq.Distinct().Count()); Assert.AreEqual(100, seq.Distinct().Count());
Assert.AreEqual(100, seq.Distinct().Count()); Assert.AreEqual(100, seq.Distinct().Count());
...@@ -52,30 +52,24 @@ public void ScansIScanning() ...@@ -52,30 +52,24 @@ public void ScansIScanning()
var seq = server.Keys(DB, pageSize: 15); var seq = server.Keys(DB, pageSize: 15);
using(var iter = seq.GetEnumerator()) using(var iter = seq.GetEnumerator())
{ {
IScanning s0 = (IScanning)seq, s1 = (IScanning)iter; IScanningCursor s0 = (IScanningCursor)seq, s1 = (IScanningCursor)iter;
Assert.AreEqual(15, s0.PageSize); Assert.AreEqual(15, s0.PageSize);
Assert.AreEqual(15, s1.PageSize); Assert.AreEqual(15, s1.PageSize);
// start at zero // start at zero
Assert.AreEqual(0, s0.CurrentCursor); Assert.AreEqual(0, s0.Cursor);
Assert.AreEqual(0, s0.NextCursor); Assert.AreEqual(s0.Cursor, s1.Cursor);
Assert.AreEqual(s0.CurrentCursor, s1.CurrentCursor);
Assert.AreEqual(s0.NextCursor, s1.NextCursor);
for(int i = 0 ; i < 47 ; i++) for(int i = 0 ; i < 47 ; i++)
{ {
Assert.IsTrue(iter.MoveNext()); Assert.IsTrue(iter.MoveNext());
} }
// non-zero in the middle // non-zero in the middle
Assert.AreNotEqual(0, s0.CurrentCursor); Assert.AreNotEqual(0, s0.Cursor);
Assert.AreNotEqual(0, s0.NextCursor); Assert.AreEqual(s0.Cursor, s1.Cursor);
Assert.AreEqual(s0.CurrentCursor, s1.CurrentCursor);
Assert.AreEqual(s0.NextCursor, s1.NextCursor);
Assert.AreNotEqual(s1.CurrentCursor, s1.NextCursor, "iter");
Assert.AreNotEqual(s0.CurrentCursor, s0.NextCursor, "seq");
for (int i = 0; i < 53; i++) for (int i = 0; i < 53; i++)
{ {
Assert.IsTrue(iter.MoveNext()); Assert.IsTrue(iter.MoveNext());
...@@ -83,10 +77,8 @@ public void ScansIScanning() ...@@ -83,10 +77,8 @@ public void ScansIScanning()
// zero "next" at the end // zero "next" at the end
Assert.IsFalse(iter.MoveNext()); Assert.IsFalse(iter.MoveNext());
Assert.AreEqual(0, s0.NextCursor); Assert.AreNotEqual(0, s0.Cursor);
Assert.AreEqual(0, s1.NextCursor); Assert.AreNotEqual(0, s1.Cursor);
Assert.AreNotEqual(0, s0.CurrentCursor);
Assert.AreNotEqual(0, s1.CurrentCursor);
} }
} }
} }
...@@ -106,7 +98,8 @@ public void ScanResume() ...@@ -106,7 +98,8 @@ public void ScanResume()
} }
var expected = new HashSet<string>(); var expected = new HashSet<string>();
long snap = 0; long snapCursor = 0;
int snapOffset = 0;
i = 0; i = 0;
var seq = server.Keys(DB, pageSize: 15); var seq = server.Keys(DB, pageSize: 15);
...@@ -116,14 +109,16 @@ public void ScanResume() ...@@ -116,14 +109,16 @@ public void ScanResume()
if (i < 57) continue; if (i < 57) continue;
if (i == 57) if (i == 57)
{ {
snap = ((IScanning)seq).CurrentCursor; snapCursor = ((IScanningCursor)seq).Cursor;
snapOffset = ((IScanningCursor)seq).PageOffset;
} }
expected.Add((string)key); expected.Add((string)key);
} }
Assert.AreNotEqual(43, expected.Count); Assert.AreNotEqual(43, expected.Count);
Assert.AreNotEqual(0, snap); Assert.AreNotEqual(0, snapCursor);
Assert.AreEqual(11, snapOffset);
seq = server.Keys(DB, pageSize: 15, cursor: snap); seq = server.Keys(DB, pageSize: 15, cursor: snapCursor, pageOffset: snapOffset);
int count = 0; int count = 0;
foreach(var key in seq) foreach(var key in seq)
{ {
...@@ -131,7 +126,7 @@ public void ScanResume() ...@@ -131,7 +126,7 @@ public void ScanResume()
count++; count++;
} }
Assert.AreEqual(0, expected.Count); Assert.AreEqual(0, expected.Count);
Assert.AreEqual(55, count); // expect some overlap due to paged, etc Assert.AreEqual(44, count); // expect the initial item to be repeated
} }
} }
......
...@@ -141,11 +141,11 @@ public interface IDatabase : IRedis, IDatabaseAsync ...@@ -141,11 +141,11 @@ public interface IDatabase : IRedis, IDatabaseAsync
IEnumerable<HashEntry> HashScan(RedisKey key, RedisValue pattern, int pageSize, CommandFlags flags); IEnumerable<HashEntry> HashScan(RedisKey key, RedisValue pattern, int pageSize, CommandFlags flags);
/// <summary> /// <summary>
/// The HSCAN command is used to incrementally iterate over a hash; note: to resume an iteration via <i>cursor</i>, cast the original enumerable or enumerator to <i>IScanning</i>. /// The HSCAN command is used to incrementally iterate over a hash; note: to resume an iteration via <i>cursor</i>, cast the original enumerable or enumerator to <i>IScanningCursor</i>.
/// </summary> /// </summary>
/// <returns>yields all elements of the hash.</returns> /// <returns>yields all elements of the hash.</returns>
/// <remarks>http://redis.io/commands/hscan</remarks> /// <remarks>http://redis.io/commands/hscan</remarks>
IEnumerable<HashEntry> HashScan(RedisKey key, RedisValue pattern = default(RedisValue), int pageSize = RedisBase.CursorUtils.DefaultPageSize, long cursor = RedisBase.CursorUtils.Origin, CommandFlags flags = CommandFlags.None); IEnumerable<HashEntry> HashScan(RedisKey key, RedisValue pattern = default(RedisValue), int pageSize = RedisBase.CursorUtils.DefaultPageSize, long cursor = RedisBase.CursorUtils.Origin, int pageOffset = 0, CommandFlags flags = CommandFlags.None);
/// <summary> /// <summary>
/// Sets the specified fields to their respective values in the hash stored at key. This command overwrites any existing fields in the hash. If key does not exist, a new key holding a hash is created. /// Sets the specified fields to their respective values in the hash stored at key. This command overwrites any existing fields in the hash. If key does not exist, a new key holding a hash is created.
...@@ -589,11 +589,11 @@ public interface IDatabase : IRedis, IDatabaseAsync ...@@ -589,11 +589,11 @@ public interface IDatabase : IRedis, IDatabaseAsync
IEnumerable<RedisValue> SetScan(RedisKey key, RedisValue pattern, int pageSize, CommandFlags flags); IEnumerable<RedisValue> SetScan(RedisKey key, RedisValue pattern, int pageSize, CommandFlags flags);
/// <summary> /// <summary>
/// The SSCAN command is used to incrementally iterate over set; note: to resume an iteration via <i>cursor</i>, cast the original enumerable or enumerator to <i>IScanning</i>. /// The SSCAN command is used to incrementally iterate over set; note: to resume an iteration via <i>cursor</i>, cast the original enumerable or enumerator to <i>IScanningCursor</i>.
/// </summary> /// </summary>
/// <returns>yields all elements of the set.</returns> /// <returns>yields all elements of the set.</returns>
/// <remarks>http://redis.io/commands/sscan</remarks> /// <remarks>http://redis.io/commands/sscan</remarks>
IEnumerable<RedisValue> SetScan(RedisKey key, RedisValue pattern = default(RedisValue), int pageSize = RedisBase.CursorUtils.DefaultPageSize, long cursor = RedisBase.CursorUtils.Origin, CommandFlags flags = CommandFlags.None); IEnumerable<RedisValue> SetScan(RedisKey key, RedisValue pattern = default(RedisValue), int pageSize = RedisBase.CursorUtils.DefaultPageSize, long cursor = RedisBase.CursorUtils.Origin, int pageOffset = 0, CommandFlags flags = CommandFlags.None);
/// <summary> /// <summary>
/// Sorts a list, set or sorted set (numerically or alphabetically, ascending by default); By default, the elements themselves are compared, but the values can also be /// Sorts a list, set or sorted set (numerically or alphabetically, ascending by default); By default, the elements themselves are compared, but the values can also be
...@@ -782,11 +782,11 @@ public interface IDatabase : IRedis, IDatabaseAsync ...@@ -782,11 +782,11 @@ public interface IDatabase : IRedis, IDatabaseAsync
IEnumerable<SortedSetEntry> SortedSetScan(RedisKey key, RedisValue pattern, int pageSize, CommandFlags flags); IEnumerable<SortedSetEntry> SortedSetScan(RedisKey key, RedisValue pattern, int pageSize, CommandFlags flags);
/// <summary> /// <summary>
/// The ZSCAN command is used to incrementally iterate over a sorted set; note: to resume an iteration via <i>cursor</i>, cast the original enumerable or enumerator to <i>IScanning</i>. /// The ZSCAN command is used to incrementally iterate over a sorted set; note: to resume an iteration via <i>cursor</i>, cast the original enumerable or enumerator to <i>IScanningCursor</i>.
/// </summary> /// </summary>
/// <returns>yields all elements of the sorted set.</returns> /// <returns>yields all elements of the sorted set.</returns>
/// <remarks>http://redis.io/commands/zscan</remarks> /// <remarks>http://redis.io/commands/zscan</remarks>
IEnumerable<SortedSetEntry> SortedSetScan(RedisKey key, RedisValue pattern = default(RedisValue), int pageSize = RedisBase.CursorUtils.DefaultPageSize, long cursor = RedisBase.CursorUtils.Origin, CommandFlags flags = CommandFlags.None); IEnumerable<SortedSetEntry> SortedSetScan(RedisKey key, RedisValue pattern = default(RedisValue), int pageSize = RedisBase.CursorUtils.DefaultPageSize, long cursor = RedisBase.CursorUtils.Origin, int pageOffset = 0, CommandFlags flags = CommandFlags.None);
/// <summary> /// <summary>
/// Returns the score of member in the sorted set at key; If member does not exist in the sorted set, or key does not exist, nil is returned. /// Returns the score of member in the sorted set at key; If member does not exist in the sorted set, or key does not exist, nil is returned.
/// </summary> /// </summary>
......
...@@ -19,22 +19,22 @@ public partial interface IRedis : IRedisAsync ...@@ -19,22 +19,22 @@ public partial interface IRedis : IRedisAsync
/// <summary> /// <summary>
/// Represents a resumable, cursor-based scanning operation /// Represents a resumable, cursor-based scanning operation
/// </summary> /// </summary>
public interface IScanning public interface IScanningCursor
{ {
/// <summary> /// <summary>
/// Returns the cursor that represents the *active* page of results (not the pending/next page of results) /// Returns the cursor that represents the *active* page of results (not the pending/next page of results as returned by SCAN/HSCAN/ZSCAN/SSCAN)
/// </summary> /// </summary>
long CurrentCursor { get; } long Cursor { get; }
/// <summary> /// <summary>
/// Returns the cursor for the *pending/next* page of results /// The page size of the current operation
/// </summary> /// </summary>
long NextCursor { get; } int PageSize { get; }
/// <summary> /// <summary>
/// The page size of the current operation /// The offset into the current page
/// </summary> /// </summary>
int PageSize { get; } int PageOffset { get; }
} }
[Conditional("DEBUG")] [Conditional("DEBUG")]
......
...@@ -232,12 +232,12 @@ public partial interface IServer : IRedis ...@@ -232,12 +232,12 @@ public partial interface IServer : IRedis
IEnumerable<RedisKey> Keys(int database, RedisValue pattern, int pageSize, CommandFlags flags); IEnumerable<RedisKey> Keys(int database, RedisValue pattern, int pageSize, CommandFlags flags);
/// <summary> /// <summary>
/// Returns all keys matching pattern; the KEYS or SCAN commands will be used based on the server capabilities; note: to resume an iteration via <i>cursor</i>, cast the original enumerable or enumerator to <i>IScanning</i>. /// Returns all keys matching pattern; the KEYS or SCAN commands will be used based on the server capabilities; note: to resume an iteration via <i>cursor</i>, cast the original enumerable or enumerator to <i>IScanningCursor</i>.
/// </summary> /// </summary>
/// <remarks>Warning: consider KEYS as a command that should only be used in production environments with extreme care.</remarks> /// <remarks>Warning: consider KEYS as a command that should only be used in production environments with extreme care.</remarks>
/// <remarks>http://redis.io/commands/keys</remarks> /// <remarks>http://redis.io/commands/keys</remarks>
/// <remarks>http://redis.io/commands/scan</remarks> /// <remarks>http://redis.io/commands/scan</remarks>
IEnumerable<RedisKey> Keys(int database = 0, RedisValue pattern = default(RedisValue), int pageSize = RedisBase.CursorUtils.DefaultPageSize, long cursor = RedisBase.CursorUtils.Origin, CommandFlags flags = CommandFlags.None); IEnumerable<RedisKey> Keys(int database = 0, RedisValue pattern = default(RedisValue), int pageSize = RedisBase.CursorUtils.DefaultPageSize, long cursor = RedisBase.CursorUtils.Origin, int pageOffset = 0, CommandFlags flags = CommandFlags.None);
/// <summary> /// <summary>
/// Return the time of the last DB save executed with success. A client may check if a BGSAVE command succeeded reading the LASTSAVE value, then issuing a BGSAVE command and checking at regular intervals every N seconds if LASTSAVE changed. /// Return the time of the last DB save executed with success. A client may check if a BGSAVE command succeeded reading the LASTSAVE value, then issuing a BGSAVE command and checking at regular intervals every N seconds if LASTSAVE changed.
......
...@@ -616,29 +616,29 @@ public TimeSpan Ping(CommandFlags flags = CommandFlags.None) ...@@ -616,29 +616,29 @@ public TimeSpan Ping(CommandFlags flags = CommandFlags.None)
IEnumerable<HashEntry> IDatabase.HashScan(RedisKey key, RedisValue pattern, int pageSize, CommandFlags flags) IEnumerable<HashEntry> IDatabase.HashScan(RedisKey key, RedisValue pattern, int pageSize, CommandFlags flags)
{ {
return HashScan(key, pattern, pageSize, RedisBase.CursorUtils.Origin, flags); return HashScan(key, pattern, pageSize, RedisBase.CursorUtils.Origin, 0, flags);
} }
public IEnumerable<HashEntry> HashScan(RedisKey key, RedisValue pattern = default(RedisValue), int pageSize = RedisBase.CursorUtils.DefaultPageSize, long cursor = RedisBase.CursorUtils.Origin, CommandFlags flags = CommandFlags.None) public IEnumerable<HashEntry> HashScan(RedisKey key, RedisValue pattern = default(RedisValue), int pageSize = RedisBase.CursorUtils.DefaultPageSize, long cursor = RedisBase.CursorUtils.Origin, int pageOffset = 0, CommandFlags flags = CommandFlags.None)
{ {
return this.Inner.HashScan(this.ToInner(key), pattern, pageSize, flags); return this.Inner.HashScan(this.ToInner(key), pattern, pageSize, cursor, pageOffset, flags);
} }
IEnumerable<RedisValue> IDatabase.SetScan(RedisKey key, RedisValue pattern, int pageSize, CommandFlags flags) IEnumerable<RedisValue> IDatabase.SetScan(RedisKey key, RedisValue pattern, int pageSize, CommandFlags flags)
{ {
return SetScan(key, pattern, pageSize, RedisBase.CursorUtils.Origin, flags); return SetScan(key, pattern, pageSize, RedisBase.CursorUtils.Origin, 0, flags);
} }
public IEnumerable<RedisValue> SetScan(RedisKey key, RedisValue pattern = default(RedisValue), int pageSize = RedisBase.CursorUtils.DefaultPageSize, long cursor = RedisBase.CursorUtils.Origin, CommandFlags flags = CommandFlags.None) public IEnumerable<RedisValue> SetScan(RedisKey key, RedisValue pattern = default(RedisValue), int pageSize = RedisBase.CursorUtils.DefaultPageSize, long cursor = RedisBase.CursorUtils.Origin, int pageOffset = 0, CommandFlags flags = CommandFlags.None)
{ {
return this.Inner.SetScan(this.ToInner(key), pattern, pageSize, cursor, flags); return this.Inner.SetScan(this.ToInner(key), pattern, pageSize, cursor, pageOffset, flags);
} }
IEnumerable<SortedSetEntry> IDatabase.SortedSetScan(RedisKey key, RedisValue pattern, int pageSize, CommandFlags flags) IEnumerable<SortedSetEntry> IDatabase.SortedSetScan(RedisKey key, RedisValue pattern, int pageSize, CommandFlags flags)
{ {
return SortedSetScan(key, pattern, pageSize, RedisBase.CursorUtils.Origin, flags); return SortedSetScan(key, pattern, pageSize, RedisBase.CursorUtils.Origin, 0, flags);
} }
public IEnumerable<SortedSetEntry> SortedSetScan(RedisKey key, RedisValue pattern = default(RedisValue), int pageSize = RedisBase.CursorUtils.DefaultPageSize, long cursor = RedisBase.CursorUtils.Origin, CommandFlags flags = CommandFlags.None) public IEnumerable<SortedSetEntry> SortedSetScan(RedisKey key, RedisValue pattern = default(RedisValue), int pageSize = RedisBase.CursorUtils.DefaultPageSize, long cursor = RedisBase.CursorUtils.Origin, int pageOffset = 0, CommandFlags flags = CommandFlags.None)
{ {
return this.Inner.SortedSetScan(this.ToInner(key), pattern, pageSize, flags); return this.Inner.SortedSetScan(this.ToInner(key), pattern, pageSize, cursor, pageOffset, flags);
} }
......
...@@ -152,16 +152,17 @@ internal static bool IsNil(RedisValue pattern) ...@@ -152,16 +152,17 @@ internal static bool IsNil(RedisValue pattern)
return rawValue.Length == 1 && rawValue[0] == '*'; return rawValue.Length == 1 && rawValue[0] == '*';
} }
} }
internal abstract class CursorEnumerable<T> : IEnumerable<T>, IScanning internal abstract class CursorEnumerable<T> : IEnumerable<T>, IScanningCursor
{ {
private readonly RedisBase redis; private readonly RedisBase redis;
private readonly ServerEndPoint server; private readonly ServerEndPoint server;
protected readonly int db; protected readonly int db;
protected readonly CommandFlags flags; protected readonly CommandFlags flags;
protected readonly int pageSize; protected readonly int pageSize, initialOffset;
protected readonly long initialCursor; protected readonly long initialCursor;
private volatile IScanningCursor activeCursor;
protected CursorEnumerable(RedisBase redis, ServerEndPoint server, int db, int pageSize, long cursor, CommandFlags flags) protected CursorEnumerable(RedisBase redis, ServerEndPoint server, int db, int pageSize, long cursor, int pageOffset, CommandFlags flags)
{ {
this.redis = redis; this.redis = redis;
this.server = server; this.server = server;
...@@ -169,6 +170,7 @@ protected CursorEnumerable(RedisBase redis, ServerEndPoint server, int db, int p ...@@ -169,6 +170,7 @@ protected CursorEnumerable(RedisBase redis, ServerEndPoint server, int db, int p
this.pageSize = pageSize; this.pageSize = pageSize;
this.flags = flags; this.flags = flags;
this.initialCursor = cursor; this.initialCursor = cursor;
this.initialOffset = pageOffset;
} }
public IEnumerator<T> GetEnumerator() public IEnumerator<T> GetEnumerator()
...@@ -192,22 +194,17 @@ public ScanResult(long cursor, T[] values) ...@@ -192,22 +194,17 @@ public ScanResult(long cursor, T[] values)
protected abstract Message CreateMessage(long cursor); protected abstract Message CreateMessage(long cursor);
private long currentCursor, nextCursor;
internal void SetPosition(long current, long next)
{
Interlocked.Exchange(ref currentCursor, current);
Interlocked.Exchange(ref nextCursor, next);
}
protected abstract ResultProcessor<ScanResult> Processor { get; } protected abstract ResultProcessor<ScanResult> Processor { get; }
protected ScanResult GetNextPageSync(long cursor) protected ScanResult GetNextPageSync(IScanningCursor obj, long cursor)
{ {
this.activeCursor = obj;
return redis.ExecuteSync(CreateMessage(cursor), Processor, server); return redis.ExecuteSync(CreateMessage(cursor), Processor, server);
} }
protected Task<ScanResult> GetNextPageAsync(long cursor) protected Task<ScanResult> GetNextPageAsync(IScanningCursor obj, long cursor)
{ {
this.activeCursor = obj;
return redis.ExecuteAsync(CreateMessage(cursor), Processor, server); return redis.ExecuteAsync(CreateMessage(cursor), Processor, server);
} }
protected ScanResult Wait(Task<ScanResult> pending) protected ScanResult Wait(Task<ScanResult> pending)
...@@ -215,7 +212,7 @@ protected ScanResult Wait(Task<ScanResult> pending) ...@@ -215,7 +212,7 @@ protected ScanResult Wait(Task<ScanResult> pending)
return redis.Wait(pending); return redis.Wait(pending);
} }
class CursorEnumerator : IEnumerator<T>, IScanning class CursorEnumerator : IEnumerator<T>, IScanningCursor
{ {
private CursorEnumerable<T> parent; private CursorEnumerable<T> parent;
public CursorEnumerator(CursorEnumerable<T> parent) public CursorEnumerator(CursorEnumerable<T> parent)
...@@ -233,7 +230,13 @@ public T Current ...@@ -233,7 +230,13 @@ public T Current
object System.Collections.IEnumerator.Current object System.Collections.IEnumerator.Current
{ {
get { return page[pageIndex]; ; } get { return page[pageIndex]; }
}
private void LoadNextPageAsync()
{
if(pending == null && nextCursor != 0)
pending = parent.GetNextPageAsync(this, nextCursor);
} }
private bool SimpleNext() private bool SimpleNext()
...@@ -241,10 +244,7 @@ private bool SimpleNext() ...@@ -241,10 +244,7 @@ private bool SimpleNext()
if (page != null && ++pageIndex < page.Length) if (page != null && ++pageIndex < page.Length)
{ {
// first of a new page? cool; start a new background op, because we're about to exit the iterator // first of a new page? cool; start a new background op, because we're about to exit the iterator
if (pageIndex == 0 && pending == null && nextCursor != 0) if (pageIndex == 0) LoadNextPageAsync();
{
pending = parent.GetNextPageAsync(nextCursor);
}
return true; return true;
} }
return false; return false;
...@@ -266,10 +266,11 @@ private enum State : byte ...@@ -266,10 +266,11 @@ private enum State : byte
void ProcessReply(ScanResult result) void ProcessReply(ScanResult result)
{ {
pending = null; currentCursor = nextCursor;
page = result.Values; nextCursor = result.Cursor;
pageIndex = -1; pageIndex = -1;
parent.SetPosition(currentCursor = nextCursor, nextCursor = result.Cursor); page = result.Values;
pending = null;
} }
public bool MoveNext() public bool MoveNext()
...@@ -279,8 +280,10 @@ public bool MoveNext() ...@@ -279,8 +280,10 @@ public bool MoveNext()
case State.Complete: case State.Complete:
return false; return false;
case State.Initial: case State.Initial:
ProcessReply(parent.GetNextPageSync(nextCursor)); ProcessReply(parent.GetNextPageSync(this, nextCursor));
pageIndex = parent.initialOffset - 1;
state = State.Running; state = State.Running;
LoadNextPageAsync();
goto case State.Running; goto case State.Running;
case State.Running: case State.Running:
// are we working through the current buffer? // are we working through the current buffer?
...@@ -296,7 +299,7 @@ public bool MoveNext() ...@@ -296,7 +299,7 @@ public bool MoveNext()
// nothing outstanding? wait synchronously // nothing outstanding? wait synchronously
while(nextCursor != 0) while(nextCursor != 0)
{ {
ProcessReply(parent.GetNextPageSync(nextCursor)); ProcessReply(parent.GetNextPageSync(this, nextCursor));
if (SimpleNext()) return true; if (SimpleNext()) return true;
} }
...@@ -313,41 +316,40 @@ public void Reset() ...@@ -313,41 +316,40 @@ public void Reset()
{ {
if(state == State.Disposed) throw new ObjectDisposedException(GetType().Name); if(state == State.Disposed) throw new ObjectDisposedException(GetType().Name);
nextCursor = currentCursor = parent.initialCursor; nextCursor = currentCursor = parent.initialCursor;
pageIndex = parent.initialOffset - 1;
state = State.Initial; state = State.Initial;
page = null; page = null;
pageIndex = -1;
pending = null; pending = null;
} }
long IScanning.CurrentCursor long IScanningCursor.Cursor
{ {
get { return currentCursor; } get { return currentCursor; }
} }
long IScanning.NextCursor int IScanningCursor.PageSize
{ {
get { return nextCursor; } get { return parent.pageSize; }
} }
int IScanning.PageSize int IScanningCursor.PageOffset
{ {
get { return parent.pageSize; } get { return pageIndex; }
} }
} }
long IScanning.CurrentCursor long IScanningCursor.Cursor
{ {
get { return Interlocked.Read(ref currentCursor); } get { var tmp = activeCursor; return tmp == null ? CursorUtils.Origin : tmp.Cursor; }
} }
long IScanning.NextCursor int IScanningCursor.PageSize
{ {
get { return Interlocked.Read(ref nextCursor); } get { return pageSize; }
} }
int IScanningCursor.PageOffset
int IScanning.PageSize
{ {
get { return pageSize; } get { var tmp = activeCursor; return tmp == null ? 0 : tmp.PageOffset; }
} }
} }
} }
......
...@@ -195,15 +195,15 @@ public Task<long> HashLengthAsync(RedisKey key, CommandFlags flags = CommandFlag ...@@ -195,15 +195,15 @@ public Task<long> HashLengthAsync(RedisKey key, CommandFlags flags = CommandFlag
IEnumerable<HashEntry> IDatabase.HashScan(RedisKey key, RedisValue pattern, int pageSize, CommandFlags flags) IEnumerable<HashEntry> IDatabase.HashScan(RedisKey key, RedisValue pattern, int pageSize, CommandFlags flags)
{ {
return HashScan(key, pattern, pageSize, CursorUtils.Origin, flags); return HashScan(key, pattern, pageSize, CursorUtils.Origin, 0, flags);
} }
public IEnumerable<HashEntry> HashScan(RedisKey key, RedisValue pattern = default(RedisValue), int pageSize = CursorUtils.DefaultPageSize, long cursor = CursorUtils.Origin, CommandFlags flags = CommandFlags.None) public IEnumerable<HashEntry> HashScan(RedisKey key, RedisValue pattern = default(RedisValue), int pageSize = CursorUtils.DefaultPageSize, long cursor = CursorUtils.Origin, int pageOffset = 0, CommandFlags flags = CommandFlags.None)
{ {
var scan = TryScan<HashEntry>(key, pattern, pageSize, cursor, flags, RedisCommand.HSCAN, HashScanResultProcessor.Default); var scan = TryScan<HashEntry>(key, pattern, pageSize, cursor, pageOffset, flags, RedisCommand.HSCAN, HashScanResultProcessor.Default);
if (scan != null) return scan; if (scan != null) return scan;
if (cursor != 0) throw ExceptionFactory.NoCursor(RedisCommand.HGETALL); if (cursor != 0 || pageOffset != 0) throw ExceptionFactory.NoCursor(RedisCommand.HGETALL);
if (pattern.IsNull) return HashGetAll(key, flags); if (pattern.IsNull) return HashGetAll(key, flags);
throw ExceptionFactory.NotSupported(true, RedisCommand.HSCAN); throw ExceptionFactory.NotSupported(true, RedisCommand.HSCAN);
} }
...@@ -1041,15 +1041,15 @@ public Task<long> SetRemoveAsync(RedisKey key, RedisValue[] values, CommandFlags ...@@ -1041,15 +1041,15 @@ public Task<long> SetRemoveAsync(RedisKey key, RedisValue[] values, CommandFlags
IEnumerable<RedisValue> IDatabase.SetScan(RedisKey key, RedisValue pattern, int pageSize, CommandFlags flags) IEnumerable<RedisValue> IDatabase.SetScan(RedisKey key, RedisValue pattern, int pageSize, CommandFlags flags)
{ {
return SetScan(key, pattern, pageSize, CursorUtils.Origin, flags); return SetScan(key, pattern, pageSize, CursorUtils.Origin, 0, flags);
} }
public IEnumerable<RedisValue> SetScan(RedisKey key, RedisValue pattern = default(RedisValue), int pageSize = CursorUtils.DefaultPageSize, long cursor = CursorUtils.Origin, CommandFlags flags = CommandFlags.None) public IEnumerable<RedisValue> SetScan(RedisKey key, RedisValue pattern = default(RedisValue), int pageSize = CursorUtils.DefaultPageSize, long cursor = CursorUtils.Origin, int pageOffset = 0, CommandFlags flags = CommandFlags.None)
{ {
var scan = TryScan<RedisValue>(key, pattern, pageSize, cursor, flags, RedisCommand.SSCAN, SetScanResultProcessor.Default); var scan = TryScan<RedisValue>(key, pattern, pageSize, cursor, pageOffset, flags, RedisCommand.SSCAN, SetScanResultProcessor.Default);
if (scan != null) return scan; if (scan != null) return scan;
if(cursor != 0) throw ExceptionFactory.NoCursor(RedisCommand.SMEMBERS); if(cursor != 0 || pageOffset != 0) throw ExceptionFactory.NoCursor(RedisCommand.SMEMBERS);
if (pattern.IsNull) return SetMembers(key, flags); if (pattern.IsNull) return SetMembers(key, flags);
throw ExceptionFactory.NotSupported(true, RedisCommand.SSCAN); throw ExceptionFactory.NotSupported(true, RedisCommand.SSCAN);
} }
...@@ -1269,15 +1269,15 @@ public Task<long> SortedSetRemoveRangeByScoreAsync(RedisKey key, double start, d ...@@ -1269,15 +1269,15 @@ public Task<long> SortedSetRemoveRangeByScoreAsync(RedisKey key, double start, d
IEnumerable<SortedSetEntry> IDatabase.SortedSetScan(RedisKey key, RedisValue pattern, int pageSize, CommandFlags flags) IEnumerable<SortedSetEntry> IDatabase.SortedSetScan(RedisKey key, RedisValue pattern, int pageSize, CommandFlags flags)
{ {
return SortedSetScan(key, pattern, pageSize, CursorUtils.Origin, flags); return SortedSetScan(key, pattern, pageSize, CursorUtils.Origin, 0, flags);
} }
public IEnumerable<SortedSetEntry> SortedSetScan(RedisKey key, RedisValue pattern = default(RedisValue), int pageSize = CursorUtils.DefaultPageSize, long cursor = CursorUtils.Origin, CommandFlags flags = CommandFlags.None) public IEnumerable<SortedSetEntry> SortedSetScan(RedisKey key, RedisValue pattern = default(RedisValue), int pageSize = CursorUtils.DefaultPageSize, long cursor = CursorUtils.Origin, int pageOffset = 0, CommandFlags flags = CommandFlags.None)
{ {
var scan = TryScan<SortedSetEntry>(key, pattern, pageSize, cursor, flags, RedisCommand.ZSCAN, SortedSetScanResultProcessor.Default); var scan = TryScan<SortedSetEntry>(key, pattern, pageSize, cursor, pageOffset, flags, RedisCommand.ZSCAN, SortedSetScanResultProcessor.Default);
if (scan != null) return scan; if (scan != null) return scan;
if (cursor != 0) throw ExceptionFactory.NoCursor(RedisCommand.ZRANGE); if (cursor != 0 || pageOffset != 0) throw ExceptionFactory.NoCursor(RedisCommand.ZRANGE);
if (pattern.IsNull) return SortedSetRangeByRankWithScores(key, flags: flags); if (pattern.IsNull) return SortedSetRangeByRankWithScores(key, flags: flags);
throw ExceptionFactory.NotSupported(true, RedisCommand.ZSCAN); throw ExceptionFactory.NotSupported(true, RedisCommand.ZSCAN);
} }
...@@ -1989,7 +1989,7 @@ private RedisCommand SetOperationCommand(SetOperation operation, bool store) ...@@ -1989,7 +1989,7 @@ private RedisCommand SetOperationCommand(SetOperation operation, bool store)
} }
} }
private IEnumerable<T> TryScan<T>(RedisKey key, RedisValue pattern, int pageSize, long cursor, CommandFlags flags, RedisCommand command, ResultProcessor<ScanIterator<T>.ScanResult> processor) private IEnumerable<T> TryScan<T>(RedisKey key, RedisValue pattern, int pageSize, long cursor, int pageOffset, CommandFlags flags, RedisCommand command, ResultProcessor<ScanIterator<T>.ScanResult> processor)
{ {
if (pageSize <= 0) throw new ArgumentOutOfRangeException("pageSize"); if (pageSize <= 0) throw new ArgumentOutOfRangeException("pageSize");
if (!multiplexer.CommandMap.IsAvailable(command)) return null; if (!multiplexer.CommandMap.IsAvailable(command)) return null;
...@@ -1999,7 +1999,7 @@ private IEnumerable<T> TryScan<T>(RedisKey key, RedisValue pattern, int pageSize ...@@ -1999,7 +1999,7 @@ private IEnumerable<T> TryScan<T>(RedisKey key, RedisValue pattern, int pageSize
if (!features.Scan) return null; if (!features.Scan) return null;
if (CursorUtils.IsNil(pattern)) pattern = (byte[])null; if (CursorUtils.IsNil(pattern)) pattern = (byte[])null;
return new ScanIterator<T>(this, server, key, pattern, pageSize, cursor, flags, command, processor); return new ScanIterator<T>(this, server, key, pattern, pageSize, cursor, pageOffset, flags, command, processor);
} }
private Message GetLexMessage(RedisCommand command, RedisKey key, RedisValue min, RedisValue max, Exclude exclude, long skip, long take, CommandFlags flags) private Message GetLexMessage(RedisCommand command, RedisKey key, RedisValue min, RedisValue max, Exclude exclude, long skip, long take, CommandFlags flags)
...@@ -2055,9 +2055,9 @@ internal class ScanIterator<T> : CursorEnumerable<T> ...@@ -2055,9 +2055,9 @@ internal class ScanIterator<T> : CursorEnumerable<T>
private readonly RedisCommand command; private readonly RedisCommand command;
private readonly ResultProcessor<ScanResult> processor; private readonly ResultProcessor<ScanResult> processor;
public ScanIterator(RedisDatabase database, ServerEndPoint server, RedisKey key, RedisValue pattern, int pageSize, long cursor, CommandFlags flags, public ScanIterator(RedisDatabase database, ServerEndPoint server, RedisKey key, RedisValue pattern, int pageSize, long cursor, int pageOffset, CommandFlags flags,
RedisCommand command, ResultProcessor<ScanResult> processor) RedisCommand command, ResultProcessor<ScanResult> processor)
: base(database, server, database.Database, pageSize, cursor, flags) : base(database, server, database.Database, pageSize, cursor, pageOffset, flags)
{ {
this.key = key; this.key = key;
this.pattern = pattern; this.pattern = pattern;
......
...@@ -271,10 +271,10 @@ public Task<string> InfoRawAsync(RedisValue section = default(RedisValue), Comma ...@@ -271,10 +271,10 @@ public Task<string> InfoRawAsync(RedisValue section = default(RedisValue), Comma
IEnumerable<RedisKey> IServer.Keys(int database, RedisValue pattern, int pageSize, CommandFlags flags) IEnumerable<RedisKey> IServer.Keys(int database, RedisValue pattern, int pageSize, CommandFlags flags)
{ {
return Keys(database, pattern, pageSize, CursorUtils.Origin, flags); return Keys(database, pattern, pageSize, CursorUtils.Origin, 0, flags);
} }
public IEnumerable<RedisKey> Keys(int database = 0, RedisValue pattern = default(RedisValue), int pageSize = CursorUtils.DefaultPageSize, long cursor = CursorUtils.Origin, CommandFlags flags = CommandFlags.None) public IEnumerable<RedisKey> Keys(int database = 0, RedisValue pattern = default(RedisValue), int pageSize = CursorUtils.DefaultPageSize, long cursor = CursorUtils.Origin, int pageOffset = 0, CommandFlags flags = CommandFlags.None)
{ {
if (pageSize <= 0) throw new ArgumentOutOfRangeException("pageSize"); if (pageSize <= 0) throw new ArgumentOutOfRangeException("pageSize");
if (CursorUtils.IsNil(pattern)) pattern = RedisLiterals.Wildcard; if (CursorUtils.IsNil(pattern)) pattern = RedisLiterals.Wildcard;
...@@ -283,10 +283,10 @@ public IEnumerable<RedisKey> Keys(int database = 0, RedisValue pattern = default ...@@ -283,10 +283,10 @@ public IEnumerable<RedisKey> Keys(int database = 0, RedisValue pattern = default
{ {
var features = server.GetFeatures(); var features = server.GetFeatures();
if (features.Scan) return new KeysScanEnumerable(this, database, pattern, pageSize, cursor, flags); if (features.Scan) return new KeysScanEnumerable(this, database, pattern, pageSize, cursor, pageOffset, flags);
} }
if (cursor != 0) throw ExceptionFactory.NoCursor(RedisCommand.KEYS); if (cursor != 0 || pageOffset != 0) throw ExceptionFactory.NoCursor(RedisCommand.KEYS);
Message msg = Message.Create(database, flags, RedisCommand.KEYS, pattern); Message msg = Message.Create(database, flags, RedisCommand.KEYS, pattern);
return ExecuteSync(msg, ResultProcessor.RedisKeyArray); return ExecuteSync(msg, ResultProcessor.RedisKeyArray);
} }
...@@ -644,8 +644,8 @@ sealed class KeysScanEnumerable : CursorEnumerable<RedisKey> ...@@ -644,8 +644,8 @@ sealed class KeysScanEnumerable : CursorEnumerable<RedisKey>
{ {
private readonly RedisValue pattern; private readonly RedisValue pattern;
public KeysScanEnumerable(RedisServer server, int db, RedisValue pattern, int pageSize, long cursor, CommandFlags flags) public KeysScanEnumerable(RedisServer server, int db, RedisValue pattern, int pageSize, long cursor, int pageOffset, CommandFlags flags)
: base(server, server.server, db, pageSize, cursor, flags) : base(server, server.server, db, pageSize, cursor, pageOffset, flags)
{ {
this.pattern = pattern; this.pattern = pattern;
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment