Unverified Commit fbe89e2a authored by Marc Gravell's avatar Marc Gravell Committed by GitHub

Async enumerable (#1087)

* first steps into async-enumerable

* move to the old iterator

* make it compile

* add missing APIs

* re-hide CursorEnumerable<T> - invent dummy enumerable APIs for now; implement KeysAsync; make pageOffset work for all; increase the default library page size, because 10 is *way* too small - noting that we still need to compare to 10 when building messages

* missing awaits

* fix range error in ValuePairInterleavedProcessorBase<T>; fix HSCAN tests

* async streams is "8.0", not "preview"

* update async enumerable API

* eek, the special compiler trick for spans only apples to byte - presumably due to endianness; this was allocating *lots*

* fix page size merge fail

* lib updates

* fix mock tests

* fix async signature detection re IAsyncEnumerable

* fix bug in interleaved pair processor

* fix more scanning glitches

* fix resume on scans

* detect and warn on thread-theft

* more logs in NoticesConnectFail
parent 9b8bdf36
......@@ -27,7 +27,7 @@
<IsPackable>false</IsPackable>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Nerdbank.GitVersioning" Version="3.0.28" PrivateAssets="all" />
<PackageReference Include="Nerdbank.GitVersioning" Version="3.0.50" PrivateAssets="all" />
<PackageReference Include="Microsoft.SourceLink.GitHub" Version="1.0.0" PrivateAssets="all"/>
<PackageReference Include="Microsoft.NETFramework.ReferenceAssemblies" Version="1.0.0" PrivateAssets="all" />
</ItemGroup>
......
......@@ -25,7 +25,7 @@ to run whatever it is that your continuation wanted. In our case, that would be
as that would mean that the dedicated reader loop (that is meant to be processing results
from redis) is now running your application logic instead; this is **thread theft**, and
would exhibit as lots of timeouts with `rs: CompletePendingMessage` in the information (`rs`
is the **r**eader **s**tate; you shouldn't often observe it in the `CompletePendingMessage`
is the **r**eader **s**tate; you shouldn't often observe it in the `CompletePendingMessage*`
step, because it is meant to be very fast; if you are seeing it often it probably means
that the reader is being hijacked when trying to set results).
......
......@@ -5,7 +5,7 @@ Similarly, verify you are not getting CPU bound on client or on the server box w
Are you experiencing "thread theft" of the reader?
---------------
The parameter “`qs`” in the error message tells you the state of the reader; if this is frquently reporting `CompletePendingMessage`,
The parameter “`qs`” in the error message tells you the state of the reader; if this is frquently reporting `CompletePendingMessage*`,
it is possible that the reader loop has been hijacked; see [Thread Theft](ThreadTheft) for specific guidance.
Are there commands taking a long time to process on the redis-server?
......
This diff is collapsed.
......@@ -246,6 +246,13 @@ void add(string lk, string sk, string v)
if (server != null)
{
server.GetOutstandingCount(message.Command, out int inst, out int qs, out long @in, out int qu, out bool aw, out long toRead, out long toWrite, out var bs, out var rs, out var ws);
switch(rs)
{
case PhysicalConnection.ReadStatus.CompletePendingMessageAsync:
case PhysicalConnection.ReadStatus.CompletePendingMessageSync:
sb.Append(" ** possible thread-theft indicated; see https://stackexchange.github.io/StackExchange.Redis/ThreadTheft ** ");
break;
}
add("OpsSinceLastHeartbeat", "inst", inst.ToString());
add("Queue-Awaiting-Write", "qu", qu.ToString());
add("Queue-Awaiting-Response", "qs", qs.ToString());
......
......@@ -334,7 +334,7 @@ public interface IDatabase : IRedis, IDatabaseAsync
/// <param name="flags">The flags to use for this operation.</param>
/// <returns>Yields all elements of the hash matching the pattern.</returns>
/// <remarks>https://redis.io/commands/hscan</remarks>
IEnumerable<HashEntry> HashScan(RedisKey key, RedisValue pattern = default(RedisValue), int pageSize = RedisBase.CursorUtils.DefaultPageSize, long cursor = RedisBase.CursorUtils.Origin, int pageOffset = 0, CommandFlags flags = CommandFlags.None);
IEnumerable<HashEntry> HashScan(RedisKey key, RedisValue pattern = default(RedisValue), int pageSize = RedisBase.CursorUtils.DefaultLibraryPageSize, long cursor = RedisBase.CursorUtils.Origin, int pageOffset = 0, CommandFlags flags = CommandFlags.None);
/// <summary>
/// Sets the specified fields to their respective values in the hash stored at key. This command overwrites any specified fields that already exist in the hash, leaving other unspecified fields untouched. If key does not exist, a new key holding a hash is created.
......@@ -1075,7 +1075,7 @@ public interface IDatabase : IRedis, IDatabaseAsync
/// <param name="flags">The flags to use for this operation.</param>
/// <returns>Yields all matching elements of the set.</returns>
/// <remarks>https://redis.io/commands/sscan</remarks>
IEnumerable<RedisValue> SetScan(RedisKey key, RedisValue pattern = default(RedisValue), int pageSize = RedisBase.CursorUtils.DefaultPageSize, long cursor = RedisBase.CursorUtils.Origin, int pageOffset = 0, CommandFlags flags = CommandFlags.None);
IEnumerable<RedisValue> SetScan(RedisKey key, RedisValue pattern = default(RedisValue), int pageSize = RedisBase.CursorUtils.DefaultLibraryPageSize, long cursor = RedisBase.CursorUtils.Origin, int pageOffset = 0, CommandFlags flags = CommandFlags.None);
/// <summary>
/// Sorts a list, set or sorted set (numerically or alphabetically, ascending by default); By default, the elements themselves are compared, but the values can also be
......@@ -1446,7 +1446,7 @@ public interface IDatabase : IRedis, IDatabaseAsync
/// <remarks>https://redis.io/commands/zscan</remarks>
IEnumerable<SortedSetEntry> SortedSetScan(RedisKey key,
RedisValue pattern = default(RedisValue),
int pageSize = RedisBase.CursorUtils.DefaultPageSize,
int pageSize = RedisBase.CursorUtils.DefaultLibraryPageSize,
long cursor = RedisBase.CursorUtils.Origin,
int pageOffset = 0,
CommandFlags flags = CommandFlags.None);
......
......@@ -300,6 +300,19 @@ public interface IDatabaseAsync : IRedisAsync
/// <remarks>https://redis.io/commands/hlen</remarks>
Task<long> HashLengthAsync(RedisKey key, CommandFlags flags = CommandFlags.None);
/// <summary>
/// The HSCAN command is used to incrementally iterate over a hash; note: to resume an iteration via <i>cursor</i>, cast the original enumerable or enumerator to <i>IScanningCursor</i>.
/// </summary>
/// <param name="key">The key of the hash.</param>
/// <param name="pattern">The pattern of keys to get entries for.</param>
/// <param name="pageSize">The page size to iterate by.</param>
/// <param name="cursor">The cursor position to start at.</param>
/// <param name="pageOffset">The page offset to start at.</param>
/// <param name="flags">The flags to use for this operation.</param>
/// <returns>Yields all elements of the hash matching the pattern.</returns>
/// <remarks>https://redis.io/commands/hscan</remarks>
IAsyncEnumerable<HashEntry> HashScanAsync(RedisKey key, RedisValue pattern = default(RedisValue), int pageSize = RedisBase.CursorUtils.DefaultLibraryPageSize, long cursor = RedisBase.CursorUtils.Origin, int pageOffset = 0, CommandFlags flags = CommandFlags.None);
/// <summary>
/// Sets the specified fields to their respective values in the hash stored at key. This command overwrites any specified fields that already exist in the hash, leaving other unspecified fields untouched. If key does not exist, a new key holding a hash is created.
/// </summary>
......@@ -1362,6 +1375,32 @@ public interface IDatabaseAsync : IRedisAsync
/// <remarks>https://redis.io/commands/zremrangebylex</remarks>
Task<long> SortedSetRemoveRangeByValueAsync(RedisKey key, RedisValue min, RedisValue max, Exclude exclude = Exclude.None, CommandFlags flags = CommandFlags.None);
/// <summary>
/// The SSCAN command is used to incrementally iterate over set; note: to resume an iteration via <i>cursor</i>, cast the original enumerable or enumerator to <i>IScanningCursor</i>.
/// </summary>
/// <param name="key">The key of the set.</param>
/// <param name="pattern">The pattern to match.</param>
/// <param name="pageSize">The page size to iterate by.</param>
/// <param name="cursor">The cursor position to start at.</param>
/// <param name="pageOffset">The page offset to start at.</param>
/// <param name="flags">The flags to use for this operation.</param>
/// <returns>Yields all matching elements of the set.</returns>
/// <remarks>https://redis.io/commands/sscan</remarks>
IAsyncEnumerable<RedisValue> SetScanAsync(RedisKey key, RedisValue pattern = default(RedisValue), int pageSize = RedisBase.CursorUtils.DefaultLibraryPageSize, long cursor = RedisBase.CursorUtils.Origin, int pageOffset = 0, CommandFlags flags = CommandFlags.None);
/// <summary>
/// The ZSCAN command is used to incrementally iterate over a sorted set
/// </summary>
/// <param name="key">The key of the sorted set.</param>
/// <param name="pattern">The pattern to match.</param>
/// <param name="pageSize">The page size to iterate by.</param>
/// <param name="flags">The flags to use for this operation.</param>
/// <param name="cursor">The cursor position to start at.</param>
/// <param name="pageOffset">The page offset to start at.</param>
/// <returns>Yields all matching elements of the sorted set.</returns>
/// <remarks>https://redis.io/commands/zscan</remarks>
IAsyncEnumerable<SortedSetEntry> SortedSetScanAsync(RedisKey key, RedisValue pattern = default(RedisValue), int pageSize = RedisBase.CursorUtils.DefaultLibraryPageSize, long cursor = RedisBase.CursorUtils.Origin, int pageOffset = 0, CommandFlags flags = CommandFlags.None);
/// <summary>
/// Returns the score of member in the sorted set at key; If member does not exist in the sorted set, or key does not exist, nil is returned.
/// </summary>
......
......@@ -371,7 +371,21 @@ public partial interface IServer : IRedis
/// <remarks>Warning: consider KEYS as a command that should only be used in production environments with extreme care.</remarks>
/// <remarks>https://redis.io/commands/keys</remarks>
/// <remarks>https://redis.io/commands/scan</remarks>
IEnumerable<RedisKey> Keys(int database = 0, RedisValue pattern = default(RedisValue), int pageSize = RedisBase.CursorUtils.DefaultPageSize, long cursor = RedisBase.CursorUtils.Origin, int pageOffset = 0, CommandFlags flags = CommandFlags.None);
IEnumerable<RedisKey> Keys(int database = 0, RedisValue pattern = default(RedisValue), int pageSize = RedisBase.CursorUtils.DefaultLibraryPageSize, long cursor = RedisBase.CursorUtils.Origin, int pageOffset = 0, CommandFlags flags = CommandFlags.None);
/// <summary>
/// Returns all keys matching pattern; the KEYS or SCAN commands will be used based on the server capabilities; note: to resume an iteration via <i>cursor</i>, cast the original enumerable or enumerator to <i>IScanningCursor</i>.
/// </summary>
/// <param name="database">The database ID.</param>
/// <param name="pattern">The pattern to use.</param>
/// <param name="pageSize">The page size to iterate by.</param>
/// <param name="cursor">The cursor position to resume at.</param>
/// <param name="pageOffset">The page offset to start at.</param>
/// <param name="flags">The command flags to use.</param>
/// <remarks>Warning: consider KEYS as a command that should only be used in production environments with extreme care.</remarks>
/// <remarks>https://redis.io/commands/keys</remarks>
/// <remarks>https://redis.io/commands/scan</remarks>
IAsyncEnumerable<RedisKey> KeysAsync(int database = 0, RedisValue pattern = default(RedisValue), int pageSize = RedisBase.CursorUtils.DefaultLibraryPageSize, long cursor = RedisBase.CursorUtils.Origin, int pageOffset = 0, CommandFlags flags = CommandFlags.None);
/// <summary>
/// Return the time of the last DB save executed with success. A client may check if a BGSAVE command succeeded reading the LASTSAVE value, then issuing a BGSAVE command and checking at regular intervals every N seconds if LASTSAVE changed.
......
......@@ -867,34 +867,22 @@ public TimeSpan Ping(CommandFlags flags = CommandFlags.None)
}
IEnumerable<HashEntry> IDatabase.HashScan(RedisKey key, RedisValue pattern, int pageSize, CommandFlags flags)
{
return HashScan(key, pattern, pageSize, RedisBase.CursorUtils.Origin, 0, flags);
}
=> Inner.HashScan(ToInner(key), pattern, pageSize, flags);
public IEnumerable<HashEntry> HashScan(RedisKey key, RedisValue pattern = default(RedisValue), int pageSize = RedisBase.CursorUtils.DefaultPageSize, long cursor = RedisBase.CursorUtils.Origin, int pageOffset = 0, CommandFlags flags = CommandFlags.None)
{
return Inner.HashScan(ToInner(key), pattern, pageSize, cursor, pageOffset, flags);
}
IEnumerable<HashEntry> IDatabase.HashScan(RedisKey key, RedisValue pattern, int pageSize, long cursor, int pageOffset, CommandFlags flags)
=> Inner.HashScan(ToInner(key), pattern, pageSize, cursor, pageOffset, flags);
IEnumerable<RedisValue> IDatabase.SetScan(RedisKey key, RedisValue pattern, int pageSize, CommandFlags flags)
{
return SetScan(key, pattern, pageSize, RedisBase.CursorUtils.Origin, 0, flags);
}
=> Inner.SetScan(ToInner(key), pattern, pageSize, flags);
public IEnumerable<RedisValue> SetScan(RedisKey key, RedisValue pattern = default(RedisValue), int pageSize = RedisBase.CursorUtils.DefaultPageSize, long cursor = RedisBase.CursorUtils.Origin, int pageOffset = 0, CommandFlags flags = CommandFlags.None)
{
return Inner.SetScan(ToInner(key), pattern, pageSize, cursor, pageOffset, flags);
}
IEnumerable<RedisValue> IDatabase.SetScan(RedisKey key, RedisValue pattern, int pageSize, long cursor, int pageOffset, CommandFlags flags)
=> Inner.SetScan(ToInner(key), pattern, pageSize, cursor, pageOffset, flags);
IEnumerable<SortedSetEntry> IDatabase.SortedSetScan(RedisKey key, RedisValue pattern, int pageSize, CommandFlags flags)
{
return SortedSetScan(key, pattern, pageSize, RedisBase.CursorUtils.Origin, 0, flags);
}
=> Inner.SortedSetScan(ToInner(key), pattern, pageSize, flags);
public IEnumerable<SortedSetEntry> SortedSetScan(RedisKey key, RedisValue pattern = default(RedisValue), int pageSize = RedisBase.CursorUtils.DefaultPageSize, long cursor = RedisBase.CursorUtils.Origin, int pageOffset = 0, CommandFlags flags = CommandFlags.None)
{
return Inner.SortedSetScan(ToInner(key), pattern, pageSize, cursor, pageOffset, flags);
}
IEnumerable<SortedSetEntry> IDatabase.SortedSetScan(RedisKey key, RedisValue pattern, int pageSize, long cursor, int pageOffset, CommandFlags flags)
=> Inner.SortedSetScan(ToInner(key), pattern, pageSize, cursor, pageOffset, flags);
public bool KeyTouch(RedisKey key, CommandFlags flags = CommandFlags.None)
{
......@@ -905,6 +893,5 @@ public long KeyTouch(RedisKey[] keys, CommandFlags flags = CommandFlags.None)
{
return Inner.KeyTouch(ToInner(keys), flags);
}
}
}
......@@ -123,6 +123,9 @@ public Task<long> HashLengthAsync(RedisKey key, CommandFlags flags = CommandFlag
return Inner.HashLengthAsync(ToInner(key), flags);
}
public IAsyncEnumerable<HashEntry> HashScanAsync(RedisKey key, RedisValue pattern, int pageSize, long cursor, int pageOffset, CommandFlags flags)
=> Inner.HashScanAsync(ToInner(key), pattern, pageSize, cursor, pageOffset, flags);
public Task<bool> HashSetAsync(RedisKey key, RedisValue hashField, RedisValue value, When when = When.Always, CommandFlags flags = CommandFlags.None)
{
return Inner.HashSetAsync(ToInner(key), hashField, value, when, flags);
......@@ -466,6 +469,9 @@ public Task<long> SetRemoveAsync(RedisKey key, RedisValue[] values, CommandFlags
return Inner.SetRemoveAsync(ToInner(key), values, flags);
}
public IAsyncEnumerable<RedisValue> SetScanAsync(RedisKey key, RedisValue pattern, int pageSize, long cursor, int pageOffset, CommandFlags flags)
=> Inner.SetScanAsync(ToInner(key), pattern, pageSize, cursor, pageOffset, flags);
public Task<bool> SetRemoveAsync(RedisKey key, RedisValue value, CommandFlags flags = CommandFlags.None)
{
return Inner.SetRemoveAsync(ToInner(key), value, flags);
......@@ -596,6 +602,9 @@ public Task<long> SortedSetRemoveRangeByValueAsync(RedisKey key, RedisValue min,
return Inner.SortedSetScoreAsync(ToInner(key), member, flags);
}
public IAsyncEnumerable<SortedSetEntry> SortedSetScanAsync(RedisKey key, RedisValue pattern, int pageSize, long cursor, int pageOffset, CommandFlags flags)
=> Inner.SortedSetScanAsync(ToInner(key), pattern, pageSize, cursor, pageOffset, flags);
public Task<SortedSetEntry?> SortedSetPopAsync(RedisKey key, Order order = Order.Ascending, CommandFlags flags = CommandFlags.None)
{
return Inner.SortedSetPopAsync(ToInner(key), order, flags);
......
......@@ -115,7 +115,10 @@ private ResultProcessor.TimingProcessor.TimerMessage GetTimerMessage(CommandFlag
internal static class CursorUtils
{
internal const int Origin = 0, DefaultPageSize = 10;
internal const int
Origin = 0,
DefaultRedisPageSize = 10,
DefaultLibraryPageSize = 250;
internal static bool IsNil(in RedisValue pattern)
{
if (pattern.IsNullOrEmpty) return true;
......@@ -124,199 +127,5 @@ internal static bool IsNil(in RedisValue pattern)
return rawValue.Length == 1 && rawValue[0] == '*';
}
}
internal abstract class CursorEnumerable<T> : IEnumerable<T>, IScanningCursor
{
private readonly RedisBase redis;
private readonly ServerEndPoint server;
protected readonly int db;
protected readonly CommandFlags flags;
protected readonly int pageSize, initialOffset;
protected readonly long initialCursor;
private volatile IScanningCursor activeCursor;
protected CursorEnumerable(RedisBase redis, ServerEndPoint server, int db, int pageSize, long cursor, int pageOffset, CommandFlags flags)
{
if (pageOffset < 0) throw new ArgumentOutOfRangeException(nameof(pageOffset));
this.redis = redis;
this.server = server;
this.db = db;
this.pageSize = pageSize;
this.flags = flags;
initialCursor = cursor;
initialOffset = pageOffset;
}
public IEnumerator<T> GetEnumerator() => new CursorEnumerator(this);
System.Collections.IEnumerator System.Collections.IEnumerable.GetEnumerator() => GetEnumerator();
internal readonly struct ScanResult
{
public readonly long Cursor;
public readonly T[] Values;
public ScanResult(long cursor, T[] values)
{
Cursor = cursor;
Values = values;
}
}
protected abstract Message CreateMessage(long cursor);
protected abstract ResultProcessor<ScanResult> Processor { get; }
protected ScanResult GetNextPageSync(IScanningCursor obj, long cursor)
{
activeCursor = obj;
return redis.ExecuteSync(CreateMessage(cursor), Processor, server);
}
protected Task<ScanResult> GetNextPageAsync(IScanningCursor obj, long cursor, out Message message)
{
activeCursor = obj;
message = CreateMessage(cursor);
return redis.ExecuteAsync(message, Processor, server);
}
protected bool TryWait(Task<ScanResult> pending) => redis.TryWait(pending);
private class CursorEnumerator : IEnumerator<T>, IScanningCursor
{
private CursorEnumerable<T> parent;
public CursorEnumerator(CursorEnumerable<T> parent)
{
this.parent = parent ?? throw new ArgumentNullException(nameof(parent));
Reset();
}
public T Current => page[pageIndex];
void IDisposable.Dispose() { parent = null; state = State.Disposed; }
object System.Collections.IEnumerator.Current => page[pageIndex];
private void LoadNextPageAsync()
{
if (pending == null && nextCursor != 0)
{
pending = parent.GetNextPageAsync(this, nextCursor, out var message);
pendingMessage = message;
}
}
private bool SimpleNext()
{
if (page != null && ++pageIndex < page.Length)
{
// first of a new page? cool; start a new background op, because we're about to exit the iterator
if (pageIndex == 0) LoadNextPageAsync();
return true;
}
return false;
}
private T[] page;
private Task<ScanResult> pending;
private Message pendingMessage;
private int pageIndex;
private long currentCursor, nextCursor;
private State state;
private enum State : byte
{
Initial,
Running,
Complete,
Disposed,
}
private void ProcessReply(in ScanResult result)
{
currentCursor = nextCursor;
nextCursor = result.Cursor;
pageIndex = -1;
page = result.Values;
pending = null;
pendingMessage = null;
}
public bool MoveNext()
{
switch(state)
{
case State.Complete:
return false;
case State.Initial:
ProcessReply(parent.GetNextPageSync(this, nextCursor));
pageIndex = parent.initialOffset - 1; // will be incremented in a moment
state = State.Running;
LoadNextPageAsync();
goto case State.Running;
case State.Running:
// are we working through the current buffer?
if (SimpleNext()) return true;
// do we have an outstanding operation? wait for the background task to finish
while (pending != null)
{
if (parent.TryWait(pending))
{
ProcessReply(pending.Result);
}
else
{
throw ExceptionFactory.Timeout(parent.redis.multiplexer, null, pendingMessage, parent.server);
}
if (SimpleNext()) return true;
}
// nothing outstanding? wait synchronously
while(nextCursor != 0)
{
ProcessReply(parent.GetNextPageSync(this, nextCursor));
if (SimpleNext()) return true;
}
// we're exhausted
state = State.Complete;
return false;
case State.Disposed:
default:
throw new ObjectDisposedException(GetType().Name);
}
}
public void Reset()
{
if(state == State.Disposed) throw new ObjectDisposedException(GetType().Name);
nextCursor = currentCursor = parent.initialCursor;
pageIndex = parent.initialOffset; // don't -1 here; this makes it look "right" before incremented
state = State.Initial;
page = null;
pending = null;
pendingMessage = null;
}
long IScanningCursor.Cursor => currentCursor;
int IScanningCursor.PageSize => parent.pageSize;
int IScanningCursor.PageOffset => pageIndex;
}
long IScanningCursor.Cursor
{
get { var tmp = activeCursor; return tmp?.Cursor ?? initialCursor; }
}
int IScanningCursor.PageSize => pageSize;
int IScanningCursor.PageOffset
{
get { var tmp = activeCursor; return tmp?.PageOffset ?? initialOffset; }
}
}
}
}
This diff is collapsed.
using System;
using System.Buffers;
using System.Collections.Generic;
using System.IO;
using System.Linq;
......@@ -6,6 +7,7 @@
using System.Security.Cryptography;
using System.Text;
using System.Threading.Tasks;
using Pipelines.Sockets.Unofficial.Arenas;
using static StackExchange.Redis.ConnectionMultiplexer;
#pragma warning disable RCS1231 // Make parameter ref read-only.
......@@ -286,11 +288,15 @@ public Task<string> InfoRawAsync(RedisValue section = default(RedisValue), Comma
}
IEnumerable<RedisKey> IServer.Keys(int database, RedisValue pattern, int pageSize, CommandFlags flags)
{
return Keys(database, pattern, pageSize, CursorUtils.Origin, 0, flags);
}
=> KeysAsync(database, pattern, pageSize, CursorUtils.Origin, 0, flags);
IEnumerable<RedisKey> IServer.Keys(int database, RedisValue pattern, int pageSize, long cursor, int pageOffset, CommandFlags flags)
=> KeysAsync(database, pattern, pageSize, cursor, pageOffset, flags);
IAsyncEnumerable<RedisKey> IServer.KeysAsync(int database, RedisValue pattern, int pageSize, long cursor, int pageOffset, CommandFlags flags)
=> KeysAsync(database, pattern, pageSize, cursor, pageOffset, flags);
public IEnumerable<RedisKey> Keys(int database = 0, RedisValue pattern = default(RedisValue), int pageSize = CursorUtils.DefaultPageSize, long cursor = CursorUtils.Origin, int pageOffset = 0, CommandFlags flags = CommandFlags.None)
private CursorEnumerable<RedisKey> KeysAsync(int database, RedisValue pattern, int pageSize, long cursor, int pageOffset, CommandFlags flags)
{
if (pageSize <= 0) throw new ArgumentOutOfRangeException(nameof(pageSize));
if (CursorUtils.IsNil(pattern)) pattern = RedisLiterals.Wildcard;
......@@ -302,9 +308,9 @@ public IEnumerable<RedisKey> Keys(int database = 0, RedisValue pattern = default
if (features.Scan) return new KeysScanEnumerable(this, database, pattern, pageSize, cursor, pageOffset, flags);
}
if (cursor != 0 || pageOffset != 0) throw ExceptionFactory.NoCursor(RedisCommand.KEYS);
if (cursor != 0) throw ExceptionFactory.NoCursor(RedisCommand.KEYS);
Message msg = Message.Create(database, flags, RedisCommand.KEYS, pattern);
return ExecuteSync(msg, ResultProcessor.RedisKeyArray);
return CursorEnumerable<RedisKey>.From(this, server, ExecuteAsync(msg, ResultProcessor.RedisKeyArray), pageOffset);
}
public DateTime LastSave(CommandFlags flags = CommandFlags.None)
......@@ -721,11 +727,11 @@ public KeysScanEnumerable(RedisServer server, int db, RedisValue pattern, int pa
this.pattern = pattern;
}
protected override Message CreateMessage(long cursor)
private protected override Message CreateMessage(long cursor)
{
if (CursorUtils.IsNil(pattern))
{
if (pageSize == CursorUtils.DefaultPageSize)
if (pageSize == CursorUtils.DefaultRedisPageSize)
{
return Message.Create(db, flags, RedisCommand.SCAN, cursor);
}
......@@ -736,7 +742,7 @@ protected override Message CreateMessage(long cursor)
}
else
{
if (pageSize == CursorUtils.DefaultPageSize)
if (pageSize == CursorUtils.DefaultRedisPageSize)
{
return Message.Create(db, flags, RedisCommand.SCAN, cursor, RedisLiterals.MATCH, pattern);
}
......@@ -747,7 +753,7 @@ protected override Message CreateMessage(long cursor)
}
}
protected override ResultProcessor<ScanResult> Processor => processor;
private protected override ResultProcessor<ScanResult> Processor => processor;
public static readonly ResultProcessor<ScanResult> processor = new KeysResultProcessor();
private class KeysResultProcessor : ResultProcessor<ScanResult>
......@@ -762,7 +768,21 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes
RawResult inner;
if (arr.Length == 2 && (inner = arr[1]).Type == ResultType.MultiBulk && arr[0].TryGetInt64(out i64))
{
var keysResult = new ScanResult(i64, inner.GetItemsAsKeys());
var items = inner.GetItems();
RedisKey[] keys;
int count;
if (items.IsEmpty)
{
keys = Array.Empty<RedisKey>();
count = 0;
}
else
{
count = (int)items.Length;
keys = ArrayPool<RedisKey>.Shared.Rent(count);
items.CopyTo(keys, (in RawResult r) => r.AsRedisKey());
}
var keysResult = new ScanResult(i64, keys, count, true);
SetResult(message, keysResult);
return true;
}
......
......@@ -552,7 +552,11 @@ protected override HashEntry Parse(in RawResult first, in RawResult second)
internal abstract class ValuePairInterleavedProcessorBase<T> : ResultProcessor<T[]>
{
public bool TryParse(in RawResult result, out T[] pairs)
=> TryParse(result, out pairs, false, out _);
public bool TryParse(in RawResult result, out T[] pairs, bool allowOversized, out int count)
{
count = 0;
switch (result.Type)
{
case ResultType.MultiBulk:
......@@ -563,19 +567,19 @@ public bool TryParse(in RawResult result, out T[] pairs)
}
else
{
int count = (int)arr.Length / 2;
count = (int)arr.Length / 2;
if (count == 0)
{
pairs = Array.Empty<T>();
}
else
{
pairs = new T[count];
pairs = allowOversized ? ArrayPool<T>.Shared.Rent(count) : new T[count];
if (arr.IsSingleSegment)
{
var span = arr.FirstSpan;
int offset = 0;
for (int i = 0; i < pairs.Length; i++)
for (int i = 0; i < count; i++)
{
pairs[i] = Parse(span[offset++], span[offset++]);
}
......@@ -583,7 +587,7 @@ public bool TryParse(in RawResult result, out T[] pairs)
else
{
var iter = arr.GetEnumerator(); // simplest way of getting successive values
for (int i = 0; i < pairs.Length; i++)
for (int i = 0; i < count; i++)
{
pairs[i] = Parse(iter.GetNext(), iter.GetNext());
}
......
......@@ -9,42 +9,42 @@ internal sealed class ServerSelectionStrategy
public const int NoSlot = -1, MultipleSlots = -2;
private const int RedisClusterSlotCount = 16384;
#pragma warning disable IDE1006 // Naming Styles
private static ReadOnlySpan<ushort> s_crc16tab => new ushort[]
private static readonly ushort[] s_crc16tab = new ushort[]
{
0x0000,0x1021,0x2042,0x3063,0x4084,0x50a5,0x60c6,0x70e7,
0x8108,0x9129,0xa14a,0xb16b,0xc18c,0xd1ad,0xe1ce,0xf1ef,
0x1231,0x0210,0x3273,0x2252,0x52b5,0x4294,0x72f7,0x62d6,
0x9339,0x8318,0xb37b,0xa35a,0xd3bd,0xc39c,0xf3ff,0xe3de,
0x2462,0x3443,0x0420,0x1401,0x64e6,0x74c7,0x44a4,0x5485,
0xa56a,0xb54b,0x8528,0x9509,0xe5ee,0xf5cf,0xc5ac,0xd58d,
0x3653,0x2672,0x1611,0x0630,0x76d7,0x66f6,0x5695,0x46b4,
0xb75b,0xa77a,0x9719,0x8738,0xf7df,0xe7fe,0xd79d,0xc7bc,
0x48c4,0x58e5,0x6886,0x78a7,0x0840,0x1861,0x2802,0x3823,
0xc9cc,0xd9ed,0xe98e,0xf9af,0x8948,0x9969,0xa90a,0xb92b,
0x5af5,0x4ad4,0x7ab7,0x6a96,0x1a71,0x0a50,0x3a33,0x2a12,
0xdbfd,0xcbdc,0xfbbf,0xeb9e,0x9b79,0x8b58,0xbb3b,0xab1a,
0x6ca6,0x7c87,0x4ce4,0x5cc5,0x2c22,0x3c03,0x0c60,0x1c41,
0xedae,0xfd8f,0xcdec,0xddcd,0xad2a,0xbd0b,0x8d68,0x9d49,
0x7e97,0x6eb6,0x5ed5,0x4ef4,0x3e13,0x2e32,0x1e51,0x0e70,
0xff9f,0xefbe,0xdfdd,0xcffc,0xbf1b,0xaf3a,0x9f59,0x8f78,
0x9188,0x81a9,0xb1ca,0xa1eb,0xd10c,0xc12d,0xf14e,0xe16f,
0x1080,0x00a1,0x30c2,0x20e3,0x5004,0x4025,0x7046,0x6067,
0x83b9,0x9398,0xa3fb,0xb3da,0xc33d,0xd31c,0xe37f,0xf35e,
0x02b1,0x1290,0x22f3,0x32d2,0x4235,0x5214,0x6277,0x7256,
0xb5ea,0xa5cb,0x95a8,0x8589,0xf56e,0xe54f,0xd52c,0xc50d,
0x34e2,0x24c3,0x14a0,0x0481,0x7466,0x6447,0x5424,0x4405,
0xa7db,0xb7fa,0x8799,0x97b8,0xe75f,0xf77e,0xc71d,0xd73c,
0x26d3,0x36f2,0x0691,0x16b0,0x6657,0x7676,0x4615,0x5634,
0xd94c,0xc96d,0xf90e,0xe92f,0x99c8,0x89e9,0xb98a,0xa9ab,
0x5844,0x4865,0x7806,0x6827,0x18c0,0x08e1,0x3882,0x28a3,
0xcb7d,0xdb5c,0xeb3f,0xfb1e,0x8bf9,0x9bd8,0xabbb,0xbb9a,
0x4a75,0x5a54,0x6a37,0x7a16,0x0af1,0x1ad0,0x2ab3,0x3a92,
0xfd2e,0xed0f,0xdd6c,0xcd4d,0xbdaa,0xad8b,0x9de8,0x8dc9,
0x7c26,0x6c07,0x5c64,0x4c45,0x3ca2,0x2c83,0x1ce0,0x0cc1,
0xef1f,0xff3e,0xcf5d,0xdf7c,0xaf9b,0xbfba,0x8fd9,0x9ff8,
0x6e17,0x7e36,0x4e55,0x5e74,0x2e93,0x3eb2,0x0ed1,0x1ef0
};
#pragma warning restore IDE1006 // Naming Styles
{ // this syntax allows a special-case population implementation by the compiler/JIT
0x0000,0x1021,0x2042,0x3063,0x4084,0x50a5,0x60c6,0x70e7,
0x8108,0x9129,0xa14a,0xb16b,0xc18c,0xd1ad,0xe1ce,0xf1ef,
0x1231,0x0210,0x3273,0x2252,0x52b5,0x4294,0x72f7,0x62d6,
0x9339,0x8318,0xb37b,0xa35a,0xd3bd,0xc39c,0xf3ff,0xe3de,
0x2462,0x3443,0x0420,0x1401,0x64e6,0x74c7,0x44a4,0x5485,
0xa56a,0xb54b,0x8528,0x9509,0xe5ee,0xf5cf,0xc5ac,0xd58d,
0x3653,0x2672,0x1611,0x0630,0x76d7,0x66f6,0x5695,0x46b4,
0xb75b,0xa77a,0x9719,0x8738,0xf7df,0xe7fe,0xd79d,0xc7bc,
0x48c4,0x58e5,0x6886,0x78a7,0x0840,0x1861,0x2802,0x3823,
0xc9cc,0xd9ed,0xe98e,0xf9af,0x8948,0x9969,0xa90a,0xb92b,
0x5af5,0x4ad4,0x7ab7,0x6a96,0x1a71,0x0a50,0x3a33,0x2a12,
0xdbfd,0xcbdc,0xfbbf,0xeb9e,0x9b79,0x8b58,0xbb3b,0xab1a,
0x6ca6,0x7c87,0x4ce4,0x5cc5,0x2c22,0x3c03,0x0c60,0x1c41,
0xedae,0xfd8f,0xcdec,0xddcd,0xad2a,0xbd0b,0x8d68,0x9d49,
0x7e97,0x6eb6,0x5ed5,0x4ef4,0x3e13,0x2e32,0x1e51,0x0e70,
0xff9f,0xefbe,0xdfdd,0xcffc,0xbf1b,0xaf3a,0x9f59,0x8f78,
0x9188,0x81a9,0xb1ca,0xa1eb,0xd10c,0xc12d,0xf14e,0xe16f,
0x1080,0x00a1,0x30c2,0x20e3,0x5004,0x4025,0x7046,0x6067,
0x83b9,0x9398,0xa3fb,0xb3da,0xc33d,0xd31c,0xe37f,0xf35e,
0x02b1,0x1290,0x22f3,0x32d2,0x4235,0x5214,0x6277,0x7256,
0xb5ea,0xa5cb,0x95a8,0x8589,0xf56e,0xe54f,0xd52c,0xc50d,
0x34e2,0x24c3,0x14a0,0x0481,0x7466,0x6447,0x5424,0x4405,
0xa7db,0xb7fa,0x8799,0x97b8,0xe75f,0xf77e,0xc71d,0xd73c,
0x26d3,0x36f2,0x0691,0x16b0,0x6657,0x7676,0x4615,0x5634,
0xd94c,0xc96d,0xf90e,0xe92f,0x99c8,0x89e9,0xb98a,0xa9ab,
0x5844,0x4865,0x7806,0x6827,0x18c0,0x08e1,0x3882,0x28a3,
0xcb7d,0xdb5c,0xeb3f,0xfb1e,0x8bf9,0x9bd8,0xabbb,0xbb9a,
0x4a75,0x5a54,0x6a37,0x7a16,0x0af1,0x1ad0,0x2ab3,0x3a92,
0xfd2e,0xed0f,0xdd6c,0xcd4d,0xbdaa,0xad8b,0x9de8,0x8dc9,
0x7c26,0x6c07,0x5c64,0x4c45,0x3ca2,0x2c83,0x1ce0,0x0cc1,
0xef1f,0xff3e,0xcf5d,0xdf7c,0xaf9b,0xbfba,0x8fd9,0x9ff8,
0x6e17,0x7e36,0x4e55,0x5e74,0x2e93,0x3eb2,0x0ed1,0x1ef0
};
private readonly ConnectionMultiplexer multiplexer;
private int anyStartOffset;
......
......@@ -19,5 +19,7 @@
<PackageReference Include="System.Threading.Channels" Version="4.7.0" />
<!-- net472 needs this for ZipArchive; I have no idea why this changed, but... meh; note this also demands SDK 2.1.400 -->
<PackageReference Include="System.IO.Compression" Version="4.3.0" Condition="'$(TargetFramework)' == 'net472'" />
<PackageReference Include="Microsoft.Bcl.AsyncInterfaces" Version="1.1.0" />
</ItemGroup>
</Project>
\ No newline at end of file
......@@ -24,14 +24,19 @@ public async Task NoticesConnectFail()
// No need to delay, we're going to try a disconnected connection immediately so it'll fail...
conn.IgnoreConnect = true;
Log("simulating failure");
server.SimulateConnectionFailure();
Log("simulated failure");
conn.IgnoreConnect = false;
Log("pinging - expect failure");
Assert.Throws<RedisConnectionException>(() => server.Ping());
Log("pinged");
// Heartbeat should reconnect by now
await Task.Delay(5000).ConfigureAwait(false);
Log("pinging - expect success");
var time = server.Ping();
Log("pinged");
Log(time.ToString());
}
}
......
......@@ -12,7 +12,7 @@ namespace StackExchange.Redis.Tests
public sealed class DatabaseWrapperTests
{
private readonly Mock<IDatabase> mock;
private readonly DatabaseWrapper wrapper;
private readonly IDatabase wrapper;
public DatabaseWrapperTests()
{
......@@ -148,7 +148,14 @@ public void HashLength()
public void HashScan()
{
wrapper.HashScan("key", "pattern", 123, flags: CommandFlags.None);
mock.Verify(_ => _.HashScan("prefix:key", "pattern", 123, 0, 0, CommandFlags.None));
mock.Verify(_ => _.HashScan("prefix:key", "pattern", 123, CommandFlags.None));
}
[Fact]
public void HashScan_Full()
{
wrapper.HashScan("key", "pattern", 123, 42, 64, flags: CommandFlags.None);
mock.Verify(_ => _.HashScan("prefix:key", "pattern", 123, 42, 64, CommandFlags.None));
}
[Fact]
......@@ -619,7 +626,14 @@ public void SetRemove_2()
public void SetScan()
{
wrapper.SetScan("key", "pattern", 123, flags: CommandFlags.None);
mock.Verify(_ => _.SetScan("prefix:key", "pattern", 123, 0, 0, CommandFlags.None));
mock.Verify(_ => _.SetScan("prefix:key", "pattern", 123, CommandFlags.None));
}
[Fact]
public void SetScan_Full()
{
wrapper.SetScan("key", "pattern", 123, 42, 64, flags: CommandFlags.None);
mock.Verify(_ => _.SetScan("prefix:key", "pattern", 123, 42, 64, CommandFlags.None));
}
[Fact]
......@@ -796,7 +810,14 @@ public void SortedSetRemoveRangeByValue()
public void SortedSetScan()
{
wrapper.SortedSetScan("key", "pattern", 123, flags: CommandFlags.None);
mock.Verify(_ => _.SortedSetScan("prefix:key", "pattern", 123, 0, 0, CommandFlags.None));
mock.Verify(_ => _.SortedSetScan("prefix:key", "pattern", 123, CommandFlags.None));
}
[Fact]
public void SortedSetScan_Full()
{
wrapper.SortedSetScan("key", "pattern", 123, 42, 64, flags: CommandFlags.None);
mock.Verify(_ => _.SortedSetScan("prefix:key", "pattern", 123, 42, 64, CommandFlags.None));
}
[Fact]
......
......@@ -39,6 +39,55 @@ public async Task TestIncrBy()
}
}
[Fact]
public async Task ScanAsync()
{
using (var muxer = Create())
{
Skip.IfMissingFeature(muxer, nameof(RedisFeatures.Scan), r => r.Scan);
var conn = muxer.GetDatabase();
var key = Me();
await conn.KeyDeleteAsync(key);
for(int i = 0; i < 200; i++)
{
await conn.HashSetAsync(key, "key" + i, "value " + i);
}
int count = 0;
// works for async
await foreach(var item in conn.HashScanAsync(key, pageSize: 20))
{
count++;
}
Assert.Equal(200, count);
// and sync=>async (via cast)
count = 0;
await foreach (var item in (IAsyncEnumerable<HashEntry>)conn.HashScan(key, pageSize: 20))
{
count++;
}
Assert.Equal(200, count);
// and sync (native)
count = 0;
foreach (var item in conn.HashScan(key, pageSize: 20))
{
count++;
}
Assert.Equal(200, count);
// and async=>sync (via cast)
count = 0;
foreach (var item in (IEnumerable<HashEntry>)conn.HashScanAsync(key, pageSize: 20))
{
count++;
}
Assert.Equal(200, count);
}
}
[Fact]
public void Scan()
{
......
......@@ -228,13 +228,30 @@ private void CheckMethod(MethodInfo method, bool isAsync)
Assert.False(shortName.Contains("If"), fullName + ":If"); // should probably be a When option
var returnType = method.ReturnType ?? typeof(void);
if (isAsync)
{
Assert.True(typeof(Task).IsAssignableFrom(returnType), fullName + ":Task");
Assert.True(IsAsyncMethod(returnType), fullName + ":Task");
}
else
{
Assert.False(typeof(Task).IsAssignableFrom(returnType), fullName + ":Task");
Assert.False(IsAsyncMethod(returnType), fullName + ":Task");
}
static bool IsAsyncMethod(Type returnType)
{
if (returnType == typeof(Task)) return true;
if (returnType == typeof(ValueTask)) return true;
if (returnType.IsGenericType)
{
var genDef = returnType.GetGenericTypeDefinition();
if (genDef == typeof(Task<>)) return true;
if (genDef == typeof(ValueTask<>)) return true;
if (genDef == typeof(IAsyncEnumerable<>)) return true;
}
return false;
}
}
......
......@@ -29,8 +29,19 @@ public void KeysScan(bool supported)
db.StringSet(prefix + i, Guid.NewGuid().ToString(), flags: CommandFlags.FireAndForget);
}
var seq = server.Keys(dbId, pageSize: 50);
bool isScanning = seq is IScanningCursor;
Assert.Equal(supported, isScanning);
var cur = seq as IScanningCursor;
Assert.NotNull(cur);
Log($"Cursor: {cur.Cursor}, PageOffset: {cur.PageOffset}, PageSize: {cur.PageSize}");
Assert.Equal(0, cur.PageOffset);
Assert.Equal(0, cur.Cursor);
if (supported)
{
Assert.Equal(50, cur.PageSize);
}
else
{
Assert.Equal(int.MaxValue, cur.PageSize);
}
Assert.Equal(100, seq.Distinct().Count());
Assert.Equal(100, seq.Distinct().Count());
Assert.Equal(100, server.Keys(dbId, prefix + "*").Distinct().Count());
......@@ -61,7 +72,7 @@ public void ScansIScanning()
Assert.Equal(15, s0.PageSize);
Assert.Equal(15, s1.PageSize);
// start at zero
// start at zero
Assert.Equal(0, s0.Cursor);
Assert.Equal(s0.Cursor, s1.Cursor);
......
......@@ -5,6 +5,7 @@
<GenerateRuntimeConfigurationFiles>true</GenerateRuntimeConfigurationFiles>
<SignAssembly>true</SignAssembly>
<DebugType>full</DebugType>
<LangVersion>8.0</LangVersion>
</PropertyGroup>
<ItemGroup>
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment