Commit 78a5239b authored by Nick Craver's avatar Nick Craver

Cleanup: RedisBase

parent c3f1f2e9
...@@ -42,30 +42,15 @@ public Task QuitAsync(CommandFlags flags = CommandFlags.None) ...@@ -42,30 +42,15 @@ public Task QuitAsync(CommandFlags flags = CommandFlags.None)
return ExecuteAsync(msg, ResultProcessor.DemandOK); return ExecuteAsync(msg, ResultProcessor.DemandOK);
} }
public override string ToString() public override string ToString() => multiplexer.ToString();
{
return multiplexer.ToString();
}
public bool TryWait(Task task) public bool TryWait(Task task) => task.Wait(multiplexer.TimeoutMilliseconds);
{
return task.Wait(multiplexer.TimeoutMilliseconds);
}
public void Wait(Task task) public void Wait(Task task) => multiplexer.Wait(task);
{
multiplexer.Wait(task);
}
public T Wait<T>(Task<T> task) public T Wait<T>(Task<T> task) => multiplexer.Wait(task);
{
return multiplexer.Wait(task);
}
public void WaitAll(params Task[] tasks) public void WaitAll(params Task[] tasks) => multiplexer.WaitAll(tasks);
{
multiplexer.WaitAll(tasks);
}
internal virtual Task<T> ExecuteAsync<T>(Message message, ResultProcessor<T> processor, ServerEndPoint server = null) internal virtual Task<T> ExecuteAsync<T>(Message message, ResultProcessor<T> processor, ServerEndPoint server = null)
{ {
...@@ -140,7 +125,6 @@ private ResultProcessor.TimingProcessor.TimerMessage GetTimerMessage(CommandFlag ...@@ -140,7 +125,6 @@ private ResultProcessor.TimingProcessor.TimerMessage GetTimerMessage(CommandFlag
return ResultProcessor.TimingProcessor.CreateMessage(0, flags, RedisCommand.EXISTS, (RedisValue)multiplexer.UniqueId); return ResultProcessor.TimingProcessor.CreateMessage(0, flags, RedisCommand.EXISTS, (RedisValue)multiplexer.UniqueId);
} }
internal static class CursorUtils internal static class CursorUtils
{ {
internal const int Origin = 0, DefaultPageSize = 10; internal const int Origin = 0, DefaultPageSize = 10;
...@@ -152,6 +136,7 @@ internal static bool IsNil(RedisValue pattern) ...@@ -152,6 +136,7 @@ internal static bool IsNil(RedisValue pattern)
return rawValue.Length == 1 && rawValue[0] == '*'; return rawValue.Length == 1 && rawValue[0] == '*';
} }
} }
internal abstract class CursorEnumerable<T> : IEnumerable<T>, IScanningCursor internal abstract class CursorEnumerable<T> : IEnumerable<T>, IScanningCursor
{ {
private readonly RedisBase redis; private readonly RedisBase redis;
...@@ -174,14 +159,10 @@ protected CursorEnumerable(RedisBase redis, ServerEndPoint server, int db, int p ...@@ -174,14 +159,10 @@ protected CursorEnumerable(RedisBase redis, ServerEndPoint server, int db, int p
initialOffset = pageOffset; initialOffset = pageOffset;
} }
public IEnumerator<T> GetEnumerator() public IEnumerator<T> GetEnumerator() => new CursorEnumerator(this);
{
return new CursorEnumerator(this); System.Collections.IEnumerator System.Collections.IEnumerable.GetEnumerator() => GetEnumerator();
}
System.Collections.IEnumerator System.Collections.IEnumerable.GetEnumerator()
{
return GetEnumerator();
}
internal struct ScanResult internal struct ScanResult
{ {
public readonly long Cursor; public readonly long Cursor;
...@@ -195,7 +176,6 @@ public ScanResult(long cursor, T[] values) ...@@ -195,7 +176,6 @@ public ScanResult(long cursor, T[] values)
protected abstract Message CreateMessage(long cursor); protected abstract Message CreateMessage(long cursor);
protected abstract ResultProcessor<ScanResult> Processor { get; } protected abstract ResultProcessor<ScanResult> Processor { get; }
protected ScanResult GetNextPageSync(IScanningCursor obj, long cursor) protected ScanResult GetNextPageSync(IScanningCursor obj, long cursor)
...@@ -203,25 +183,24 @@ protected ScanResult GetNextPageSync(IScanningCursor obj, long cursor) ...@@ -203,25 +183,24 @@ protected ScanResult GetNextPageSync(IScanningCursor obj, long cursor)
activeCursor = obj; activeCursor = obj;
return redis.ExecuteSync(CreateMessage(cursor), Processor, server); return redis.ExecuteSync(CreateMessage(cursor), Processor, server);
} }
protected Task<ScanResult> GetNextPageAsync(IScanningCursor obj, long cursor) protected Task<ScanResult> GetNextPageAsync(IScanningCursor obj, long cursor)
{ {
activeCursor = obj; activeCursor = obj;
return redis.ExecuteAsync(CreateMessage(cursor), Processor, server); return redis.ExecuteAsync(CreateMessage(cursor), Processor, server);
} }
protected ScanResult Wait(Task<ScanResult> pending)
{
return redis.Wait(pending);
}
class CursorEnumerator : IEnumerator<T>, IScanningCursor protected ScanResult Wait(Task<ScanResult> pending) => redis.Wait(pending);
private class CursorEnumerator : IEnumerator<T>, IScanningCursor
{ {
private CursorEnumerable<T> parent; private CursorEnumerable<T> parent;
public CursorEnumerator(CursorEnumerable<T> parent) public CursorEnumerator(CursorEnumerable<T> parent)
{ {
if (parent == null) throw new ArgumentNullException(nameof(parent)); this.parent = parent ?? throw new ArgumentNullException(nameof(parent));
this.parent = parent;
Reset(); Reset();
} }
public T Current => page[pageIndex]; public T Current => page[pageIndex];
void IDisposable.Dispose() { parent = null; state = State.Disposed; } void IDisposable.Dispose() { parent = null; state = State.Disposed; }
...@@ -245,9 +224,9 @@ private bool SimpleNext() ...@@ -245,9 +224,9 @@ private bool SimpleNext()
return false; return false;
} }
T[] page; private T[] page;
Task<ScanResult> pending; private Task<ScanResult> pending;
int pageIndex; private int pageIndex;
private long currentCursor, nextCursor; private long currentCursor, nextCursor;
private State state; private State state;
...@@ -259,7 +238,7 @@ private enum State : byte ...@@ -259,7 +238,7 @@ private enum State : byte
Disposed, Disposed,
} }
void ProcessReply(ScanResult result) private void ProcessReply(ScanResult result)
{ {
currentCursor = nextCursor; currentCursor = nextCursor;
nextCursor = result.Cursor; nextCursor = result.Cursor;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment