Commit e3493310 authored by Nick Craver's avatar Nick Craver

Cleanup

parent 9426e1af
...@@ -119,9 +119,9 @@ public async Task<int> ExecuteIncrByAsync() ...@@ -119,9 +119,9 @@ public async Task<int> ExecuteIncrByAsync()
{ {
int x = rand.Next(50); int x = rand.Next(50);
expected += x; expected += x;
await db.StringIncrementAsync(IncrByKey, x, CommandFlags.FireAndForget); await db.StringIncrementAsync(IncrByKey, x, CommandFlags.FireAndForget).ConfigureAwait(false);
} }
int actual = (int)await db.StringGetAsync(IncrByKey); int actual = (int)await db.StringGetAsync(IncrByKey).ConfigureAwait(false);
if (actual != expected) throw new InvalidOperationException($"expected: {expected}, actual: {actual}"); if (actual != expected) throw new InvalidOperationException($"expected: {expected}, actual: {actual}");
return actual; return actual;
} }
...@@ -160,7 +160,7 @@ public async Task<int> ExecuteGeoRadiusAsync() ...@@ -160,7 +160,7 @@ public async Task<int> ExecuteGeoRadiusAsync()
for (int i = 0; i < COUNT; i++) for (int i = 0; i < COUNT; i++)
{ {
var results = await db.GeoRadiusAsync(GeoKey, 15, 37, 200, GeoUnit.Kilometers, var results = await db.GeoRadiusAsync(GeoKey, 15, 37, 200, GeoUnit.Kilometers,
options: GeoRadiusOptions.WithCoordinates | GeoRadiusOptions.WithDistance | GeoRadiusOptions.WithGeoHash); options: GeoRadiusOptions.WithCoordinates | GeoRadiusOptions.WithDistance | GeoRadiusOptions.WithGeoHash).ConfigureAwait(false);
total += results.Length; total += results.Length;
} }
return total; return total;
...@@ -195,7 +195,7 @@ public async Task LoadAsync() ...@@ -195,7 +195,7 @@ public async Task LoadAsync()
{ {
for (int i = 0; i < max; ++i) for (int i = 0; i < max; ++i)
{ {
await db.StringSetAsync(i.ToString(), i); await db.StringSetAsync(i.ToString(), i).ConfigureAwait(false);
} }
} }
[Benchmark(OperationsPerInvoke = max)] [Benchmark(OperationsPerInvoke = max)]
...@@ -224,7 +224,7 @@ public async Task SampleAsync() ...@@ -224,7 +224,7 @@ public async Task SampleAsync()
{ {
var r = rnd.Next(0, max - 1); var r = rnd.Next(0, max - 1);
var rv = await db.StringGetAsync(r.ToString()); var rv = await db.StringGetAsync(r.ToString()).ConfigureAwait(false);
if (rv != r) if (rv != r)
{ {
throw new Exception($"Unexpected {rv}, expected {r}"); throw new Exception($"Unexpected {rv}, expected {r}");
......
...@@ -23,10 +23,7 @@ public static void Main(string[] args) ...@@ -23,10 +23,7 @@ public static void Main(string[] args)
options.ListenLocalhost(5000); options.ListenLocalhost(5000);
// TCP 6379 // TCP 6379
options.ListenLocalhost(6379, builder => options.ListenLocalhost(6379, builder => builder.UseConnectionHandler<RedisConnectionHandler>());
{
builder.UseConnectionHandler<RedisConnectionHandler>();
});
}).UseStartup<Startup>(); }).UseStartup<Startup>();
} }
} }
...@@ -14,7 +14,7 @@ public override async Task OnConnectedAsync(ConnectionContext connection) ...@@ -14,7 +14,7 @@ public override async Task OnConnectedAsync(ConnectionContext connection)
{ {
try try
{ {
await _server.RunClientAsync(connection.Transport); await _server.RunClientAsync(connection.Transport).ConfigureAwait(false);
} }
catch (IOException io) when (io.InnerException is UvException uv && uv.StatusCode == -4077) catch (IOException io) when (io.InnerException is UvException uv && uv.StatusCode == -4077)
{ } //swallow libuv disconnect { } //swallow libuv disconnect
......
...@@ -52,10 +52,7 @@ internal static ConnectionMultiplexer GetWithFT(ITestOutputHelper output) ...@@ -52,10 +52,7 @@ internal static ConnectionMultiplexer GetWithFT(ITestOutputHelper output)
SyncTimeout = 15000, SyncTimeout = 15000,
}; };
var conn = ConnectionMultiplexer.Connect(options); var conn = ConnectionMultiplexer.Connect(options);
conn.MessageFaulted += (msg, ex, origin) => conn.MessageFaulted += (msg, ex, origin) => output.WriteLine($"Faulted from '{origin}': '{msg}' - {ex.Message}");
{
output.WriteLine($"Faulted from '{origin}': '{msg}' - {ex.Message}");
};
var server = conn.GetServer(ep); var server = conn.GetServer(ep);
var arr = (RedisResult[])server.Execute("module", "list"); var arr = (RedisResult[])server.Execute("module", "list");
bool found = false; bool found = false;
......
...@@ -158,9 +158,12 @@ public RespCommand Resolve(in RedisRequest request) ...@@ -158,9 +158,12 @@ public RespCommand Resolve(in RedisRequest request)
public TypedRedisValue Execute(RedisClient client, RedisRequest request) public TypedRedisValue Execute(RedisClient client, RedisRequest request)
{ {
var args = request.Count; var args = request.Count;
if (!CheckArity(request.Count)) return IsSubCommand if (!CheckArity(request.Count))
? request.UnknownSubcommandOrArgumentCount() {
: request.WrongArgCount(); return IsSubCommand
? request.UnknownSubcommandOrArgumentCount()
: request.WrongArgCount();
}
return _operation(client, request); return _operation(client, request);
} }
...@@ -222,7 +225,7 @@ protected void ThrowIfShutdown() ...@@ -222,7 +225,7 @@ protected void ThrowIfShutdown()
{ {
if (_isShutdown) throw new InvalidOperationException("The server is shutting down"); if (_isShutdown) throw new InvalidOperationException("The server is shutting down");
} }
protected void DoShutdown(ShutdownReason reason, PipeScheduler scheduler = null) protected void DoShutdown(ShutdownReason reason)
{ {
if (_isShutdown) return; if (_isShutdown) return;
Log("Server shutting down..."); Log("Server shutting down...");
...@@ -250,11 +253,11 @@ public async Task RunClientAsync(IDuplexPipe pipe) ...@@ -250,11 +253,11 @@ public async Task RunClientAsync(IDuplexPipe pipe)
client = AddClient(); client = AddClient();
while (!client.Closed) while (!client.Closed)
{ {
var readResult = await pipe.Input.ReadAsync(); var readResult = await pipe.Input.ReadAsync().ConfigureAwait(false);
var buffer = readResult.Buffer; var buffer = readResult.Buffer;
bool makingProgress = false; bool makingProgress = false;
while (!client.Closed && await TryProcessRequestAsync(ref buffer, client, pipe.Output)) while (!client.Closed && await TryProcessRequestAsync(ref buffer, client, pipe.Output).ConfigureAwait(false))
{ {
makingProgress = true; makingProgress = true;
} }
...@@ -358,7 +361,7 @@ void WritePrefix(PipeWriter ooutput, char pprefix) ...@@ -358,7 +361,7 @@ void WritePrefix(PipeWriter ooutput, char pprefix)
throw new InvalidOperationException( throw new InvalidOperationException(
"Unexpected result type: " + value.Type); "Unexpected result type: " + value.Type);
} }
await output.FlushAsync(); await output.FlushAsync().ConfigureAwait(false);
} }
public static bool TryParseRequest(ref ReadOnlySequence<byte> buffer, out RedisRequest request) public static bool TryParseRequest(ref ReadOnlySequence<byte> buffer, out RedisRequest request)
{ {
......
...@@ -116,7 +116,6 @@ public void SocketFailureError() ...@@ -116,7 +116,6 @@ public void SocketFailureError()
} }
Assert.False(true); // force fail Assert.False(true); // force fail
} }
} }
#if DEBUG // needs AllowConnect, which is DEBUG only #if DEBUG // needs AllowConnect, which is DEBUG only
[Fact] [Fact]
......
...@@ -161,7 +161,7 @@ public void ManyContexts() ...@@ -161,7 +161,7 @@ public void ManyContexts()
for (var i = 0; i < tasks.Length; i++) for (var i = 0; i < tasks.Length; i++)
{ {
var ix = i; var ix = i;
var task = Task.Run(async () => tasks[ix] = Task.Run(async () =>
{ {
var db = conn.GetDatabase(ix); var db = conn.GetDatabase(ix);
...@@ -178,8 +178,6 @@ public void ManyContexts() ...@@ -178,8 +178,6 @@ public void ManyContexts()
results[ix] = profiler.GetSession().FinishProfiling(); results[ix] = profiler.GetSession().FinishProfiling();
}); });
tasks[ix] = task;
} }
Task.WhenAll(tasks).Wait(); Task.WhenAll(tasks).Wait();
......
...@@ -221,15 +221,15 @@ public async Task BasicTranWithHashEqualsCondition(string expected, string value ...@@ -221,15 +221,15 @@ public async Task BasicTranWithHashEqualsCondition(string expected, string value
} }
} }
static TaskStatus SafeStatus(Task task) private static TaskStatus SafeStatus(Task task)
{ {
if(task.Status == TaskStatus.WaitingForActivation) if (task.Status == TaskStatus.WaitingForActivation)
{ {
try try
{ {
if (!task.Wait(1000)) throw new TimeoutException("timeout waiting for task to complete"); if (!task.Wait(1000)) throw new TimeoutException("timeout waiting for task to complete");
} }
catch(AggregateException ex) catch (AggregateException ex)
when (ex.InnerException is TaskCanceledException when (ex.InnerException is TaskCanceledException
|| (ex.InnerExceptions.Count == 1 && ex.InnerException is TaskCanceledException)) || (ex.InnerExceptions.Count == 1 && ex.InnerException is TaskCanceledException))
{ {
......
...@@ -155,6 +155,7 @@ public void OnMessage(Action<ChannelMessage> handler) ...@@ -155,6 +155,7 @@ public void OnMessage(Action<ChannelMessage> handler)
ThreadPool.QueueUserWorkItem( ThreadPool.QueueUserWorkItem(
state => ((ChannelMessageQueue)state).OnMessageSyncImpl(), this); state => ((ChannelMessageQueue)state).OnMessageSyncImpl(), this);
} }
private async void OnMessageSyncImpl() private async void OnMessageSyncImpl()
{ {
var handler = (Action<ChannelMessage>)_onMessageHandler; var handler = (Action<ChannelMessage>)_onMessageHandler;
...@@ -207,6 +208,7 @@ private async void OnMessageAsyncImpl() ...@@ -207,6 +208,7 @@ private async void OnMessageAsyncImpl()
catch { } // matches MessageCompletable catch { } // matches MessageCompletable
} }
} }
internal void UnsubscribeImpl(Exception error = null, CommandFlags flags = CommandFlags.None) internal void UnsubscribeImpl(Exception error = null, CommandFlags flags = CommandFlags.None)
{ {
var parent = _parent; var parent = _parent;
...@@ -216,15 +218,15 @@ internal void UnsubscribeImpl(Exception error = null, CommandFlags flags = Comma ...@@ -216,15 +218,15 @@ internal void UnsubscribeImpl(Exception error = null, CommandFlags flags = Comma
parent.UnsubscribeAsync(Channel, HandleMessage, flags); parent.UnsubscribeAsync(Channel, HandleMessage, flags);
} }
_queue.Writer.TryComplete(error); _queue.Writer.TryComplete(error);
} }
internal async Task UnsubscribeAsyncImpl(Exception error = null, CommandFlags flags = CommandFlags.None) internal async Task UnsubscribeAsyncImpl(Exception error = null, CommandFlags flags = CommandFlags.None)
{ {
var parent = _parent; var parent = _parent;
_parent = null; _parent = null;
if (parent != null) if (parent != null)
{ {
await parent.UnsubscribeAsync(Channel, HandleMessage, flags).ConfigureAwait(false); await parent.UnsubscribeAsync(Channel, HandleMessage, flags).ConfigureAwait(false);
} }
_queue.Writer.TryComplete(error); _queue.Writer.TryComplete(error);
} }
......
...@@ -12,7 +12,6 @@ public static void CompleteSyncOrAsync(this CompletionManager manager, ICompleta ...@@ -12,7 +12,6 @@ public static void CompleteSyncOrAsync(this CompletionManager manager, ICompleta
} }
internal sealed partial class CompletionManager internal sealed partial class CompletionManager
{ {
internal static void CompleteSyncOrAsyncImpl(CompletionManager manager, ICompletable operation) internal static void CompleteSyncOrAsyncImpl(CompletionManager manager, ICompletable operation)
{ {
if (operation == null) return; if (operation == null) return;
......
...@@ -86,6 +86,7 @@ public bool MoveNext() ...@@ -86,6 +86,7 @@ public bool MoveNext()
/// <summary> /// <summary>
/// Returns the number of commands captured in this snapshot that match a condition /// Returns the number of commands captured in this snapshot that match a condition
/// </summary> /// </summary>
/// <param name="predicate">The predicate to match.</param>
public int Count(Func<IProfiledCommand, bool> predicate) public int Count(Func<IProfiledCommand, bool> predicate)
{ {
if (predicate == null) throw new ArgumentNullException(nameof(predicate)); if (predicate == null) throw new ArgumentNullException(nameof(predicate));
......
...@@ -29,7 +29,7 @@ protected static void IncrementAllocationCount() ...@@ -29,7 +29,7 @@ protected static void IncrementAllocationCount()
// that call Cancel are transactions etc - TCS-based, and we detect // that call Cancel are transactions etc - TCS-based, and we detect
// that and use TrySetCanceled instead // that and use TrySetCanceled instead
// about any confusion in stack-trace // about any confusion in stack-trace
static readonly Exception s_cancelled = new TaskCanceledException(); private static readonly Exception s_cancelled = new TaskCanceledException();
} }
internal sealed class ResultBox<T> : ResultBox internal sealed class ResultBox<T> : ResultBox
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment